1 // 2 // Copyright (C) 2010-2021 Codership Oy <info@codership.com> 3 // 4 5 //! @file replicator_smm.hpp 6 // 7 // @brief Galera Synchronous Multi-Master replicator 8 // 9 10 #ifndef GALERA_REPLICATOR_SMM_HPP 11 #define GALERA_REPLICATOR_SMM_HPP 12 13 #include "replicator.hpp" 14 15 #include "gu_init.h" 16 #include "GCache.hpp" 17 #include "gcs.hpp" 18 #include "monitor.hpp" 19 #include "wsdb.hpp" 20 #include "certification.hpp" 21 #include "trx_handle.hpp" 22 #include "write_set.hpp" 23 #include "galera_service_thd.hpp" 24 #include "fsm.hpp" 25 #include "gcs_action_source.hpp" 26 #include "ist.hpp" 27 #include "gu_atomic.hpp" 28 #include "saved_state.hpp" 29 #include "gu_debug_sync.hpp" 30 31 32 #include <map> 33 34 namespace galera 35 { 36 class ReplicatorSMM : public Replicator 37 { 38 public: 39 40 typedef enum 41 { 42 SST_NONE, 43 SST_WAIT, 44 SST_JOIN_SENT, 45 SST_REQ_FAILED, 46 SST_FAILED 47 } SstState; 48 49 static const size_t N_STATES = S_DONOR + 1; 50 51 ReplicatorSMM(const wsrep_init_args* args); 52 53 ~ReplicatorSMM(); 54 trx_proto_ver() const55 int trx_proto_ver() const { return trx_params_.version_; } repl_proto_ver() const56 int repl_proto_ver() const{ return protocol_version_; } 57 58 wsrep_status_t connect(const std::string& cluster_name, 59 const std::string& cluster_url, 60 const std::string& state_donor, 61 bool bootstrap); 62 wsrep_status_t close(); 63 wsrep_status_t async_recv(void* recv_ctx); 64 get_local_trx(wsrep_trx_id_t trx_id,bool create=false)65 TrxHandle* get_local_trx(wsrep_trx_id_t trx_id, bool create = false) 66 { 67 return wsdb_.get_trx(trx_params_, uuid_, trx_id, create); 68 } 69 unref_local_trx(TrxHandle * trx)70 void unref_local_trx(TrxHandle* trx) 71 { 72 assert(trx->refcnt() > 1); 73 trx->unref(); 74 } 75 discard_local_trx(TrxHandle * trx)76 void discard_local_trx(TrxHandle* trx) 77 { 78 trx->release_write_set_out(); 79 wsdb_.discard_trx(trx->trx_id()); 80 } 81 local_conn_trx(wsrep_conn_id_t conn_id,bool create)82 TrxHandle* local_conn_trx(wsrep_conn_id_t conn_id, bool create) 83 { 84 return wsdb_.get_conn_query(trx_params_, uuid_, conn_id, create); 85 } 86 discard_local_conn_trx(wsrep_conn_id_t conn_id)87 void discard_local_conn_trx(wsrep_conn_id_t conn_id) 88 { 89 wsdb_.discard_conn_query(conn_id); 90 } 91 92 void apply_trx(void* recv_ctx, TrxHandle* trx); 93 94 wsrep_status_t replicate(TrxHandle* trx, wsrep_trx_meta_t*); 95 void abort_trx(TrxHandle* trx) ; 96 wsrep_status_t pre_commit(TrxHandle* trx, wsrep_trx_meta_t*); 97 wsrep_status_t replay_trx(TrxHandle* trx, void* replay_ctx); 98 99 wsrep_status_t post_commit(TrxHandle* trx); 100 wsrep_status_t post_rollback(TrxHandle* trx); 101 102 wsrep_status_t causal_read(wsrep_gtid_t*); 103 wsrep_status_t to_isolation_begin(TrxHandle* trx, wsrep_trx_meta_t*); 104 wsrep_status_t to_isolation_end(TrxHandle* trx); 105 wsrep_status_t preordered_collect(wsrep_po_handle_t& handle, 106 const struct wsrep_buf* data, 107 size_t count, 108 bool copy); 109 wsrep_status_t preordered_commit(wsrep_po_handle_t& handle, 110 const wsrep_uuid_t& source, 111 uint64_t flags, 112 int pa_range, 113 bool commit); 114 wsrep_status_t sst_sent(const wsrep_gtid_t& state_id, int rcode); 115 wsrep_status_t sst_received(const wsrep_gtid_t& state_id, 116 const void* state, 117 size_t state_len, 118 int rcode); 119 120 void process_trx(void* recv_ctx, TrxHandle* trx); 121 void process_commit_cut(wsrep_seqno_t seq, wsrep_seqno_t seqno_l); 122 void process_conf_change(void* recv_ctx, 123 const wsrep_view_info_t& view, 124 int repl_proto, 125 State next_state, 126 wsrep_seqno_t seqno_l); 127 void process_state_req(void* recv_ctx, const void* req, 128 size_t req_size, wsrep_seqno_t seqno_l, 129 wsrep_seqno_t donor_seq); 130 void process_join(wsrep_seqno_t seqno, wsrep_seqno_t seqno_l); 131 void process_sync(wsrep_seqno_t seqno_l); 132 133 const struct wsrep_stats_var* stats_get() const; 134 void stats_reset(); 135 void stats_free(struct wsrep_stats_var*); 136 137 /*! @throws NotFound */ 138 void set_param (const std::string& key, 139 const std::string& value); 140 141 /*! @throws NotFound */ 142 void param_set (const std::string& key, 143 const std::string& value); 144 145 std::string param_get (const std::string& key) const; 146 params() const147 const gu::Config& params() const { return config_; } 148 149 wsrep_seqno_t pause(); 150 void resume(); 151 152 void desync(); 153 void resync(); 154 155 struct InitConfig 156 { 157 InitConfig(gu::Config&, const char* node_address, const char *base_dir); 158 }; 159 160 private: 161 162 ReplicatorSMM(const ReplicatorSMM&); 163 void operator=(const ReplicatorSMM&); 164 165 struct Param 166 { 167 static const std::string base_host; 168 static const std::string base_port; 169 static const std::string base_dir; 170 static const std::string proto_max; 171 static const std::string key_format; 172 static const std::string commit_order; 173 static const std::string causal_read_timeout; 174 static const std::string max_write_set_size; 175 }; 176 177 typedef std::pair<std::string, std::string> Default; 178 179 struct Defaults 180 { 181 std::map<std::string, std::string> map_; 182 Defaults (); 183 }; 184 185 static const Defaults defaults; 186 // both a list of parameters and a list of default values 187 last_committed()188 wsrep_seqno_t last_committed() 189 { 190 return co_mode_ != CommitOrder::BYPASS ? 191 commit_monitor_.last_left() : apply_monitor_.last_left(); 192 } 193 report_last_committed(wsrep_seqno_t purge_seqno)194 void report_last_committed(wsrep_seqno_t purge_seqno) 195 { 196 if (gu_unlikely(purge_seqno != -1)) 197 { 198 service_thd_.report_last_committed(purge_seqno); 199 } 200 } 201 202 wsrep_status_t cert(TrxHandle* trx); 203 wsrep_status_t cert_and_catch(TrxHandle* trx); 204 wsrep_status_t cert_for_aborted(TrxHandle* trx); 205 206 void update_state_uuid (const wsrep_uuid_t& u); 207 void update_incoming_list (const wsrep_view_info_t& v); 208 209 /* aborts/exits the program in a clean way */ 210 void abort() GU_NORETURN; 211 212 class LocalOrder 213 { 214 public: 215 LocalOrder(TrxHandle & trx)216 LocalOrder(TrxHandle& trx) 217 : 218 seqno_(trx.local_seqno()), 219 trx_(&trx) 220 { } 221 LocalOrder(wsrep_seqno_t seqno)222 LocalOrder(wsrep_seqno_t seqno) 223 : 224 seqno_(seqno), 225 trx_(0) 226 { } 227 lock()228 void lock() { if (trx_ != 0) trx_->lock(); } unlock()229 void unlock() { if (trx_ != 0) trx_->unlock(); } 230 seqno() const231 wsrep_seqno_t seqno() const { return seqno_; } 232 condition(wsrep_seqno_t last_entered,wsrep_seqno_t last_left) const233 bool condition(wsrep_seqno_t last_entered, 234 wsrep_seqno_t last_left) const 235 { 236 return (last_left + 1 == seqno_); 237 } 238 239 #ifdef GU_DBUG_ON debug_sync(gu::Mutex & mutex)240 void debug_sync(gu::Mutex& mutex) 241 { 242 if (trx_ != 0 && trx_->is_local()) 243 { 244 unlock(); 245 mutex.unlock(); 246 GU_DBUG_SYNC_WAIT("local_monitor_enter_sync"); 247 mutex.lock(); 248 lock(); 249 } 250 } 251 #endif // GU_DBUG_ON 252 private: 253 LocalOrder(const LocalOrder&); 254 wsrep_seqno_t seqno_; 255 TrxHandle* trx_; 256 }; 257 258 class ApplyOrder 259 { 260 public: 261 ApplyOrder(TrxHandle & trx)262 ApplyOrder(TrxHandle& trx) : trx_(trx) { } 263 lock()264 void lock() { trx_.lock(); } unlock()265 void unlock() { trx_.unlock(); } 266 seqno() const267 wsrep_seqno_t seqno() const { return trx_.global_seqno(); } 268 condition(wsrep_seqno_t last_entered,wsrep_seqno_t last_left) const269 bool condition(wsrep_seqno_t last_entered, 270 wsrep_seqno_t last_left) const 271 { 272 return (trx_.is_local() == true || 273 last_left >= trx_.depends_seqno()); 274 } 275 276 #ifdef GU_DBUG_ON debug_sync(gu::Mutex & mutex)277 void debug_sync(gu::Mutex& mutex) 278 { 279 if (trx_.is_local()) 280 { 281 unlock(); 282 mutex.unlock(); 283 GU_DBUG_SYNC_WAIT("apply_monitor_enter_sync"); 284 mutex.lock(); 285 lock(); 286 } 287 else 288 { 289 unlock(); 290 mutex.unlock(); 291 GU_DBUG_SYNC_WAIT("apply_monitor_slave_enter_sync"); 292 mutex.lock(); 293 lock(); 294 } 295 } 296 #endif // GU_DBUG_ON 297 298 private: 299 ApplyOrder(const ApplyOrder&); 300 TrxHandle& trx_; 301 }; 302 303 public: 304 305 class CommitOrder 306 { 307 public: 308 typedef enum 309 { 310 BYPASS = 0, 311 OOOC = 1, 312 LOCAL_OOOC = 2, 313 NO_OOOC = 3 314 } Mode; 315 from_string(const std::string & str)316 static Mode from_string(const std::string& str) 317 { 318 int ret(gu::from_string<int>(str)); 319 switch (ret) 320 { 321 case BYPASS: 322 case OOOC: 323 case LOCAL_OOOC: 324 case NO_OOOC: 325 break; 326 default: 327 gu_throw_error(EINVAL) 328 << "invalid value " << str << " for commit order mode"; 329 } 330 return static_cast<Mode>(ret); 331 } 332 CommitOrder(TrxHandle & trx,Mode mode)333 CommitOrder(TrxHandle& trx, Mode mode) 334 : 335 trx_ (trx ), 336 mode_(mode) 337 { } 338 lock()339 void lock() { trx_.lock(); } unlock()340 void unlock() { trx_.unlock(); } seqno() const341 wsrep_seqno_t seqno() const { return trx_.global_seqno(); } condition(wsrep_seqno_t last_entered,wsrep_seqno_t last_left) const342 bool condition(wsrep_seqno_t last_entered, 343 wsrep_seqno_t last_left) const 344 { 345 switch (mode_) 346 { 347 case BYPASS: 348 gu_throw_fatal 349 << "commit order condition called in bypass mode"; 350 case OOOC: 351 return true; 352 case LOCAL_OOOC: 353 return trx_.is_local(); 354 // in case of remote trx fall through 355 case NO_OOOC: 356 return (last_left + 1 == trx_.global_seqno()); 357 } 358 gu_throw_fatal << "invalid commit mode value " << mode_; 359 } 360 361 #ifdef GU_DBUG_ON debug_sync(gu::Mutex & mutex)362 void debug_sync(gu::Mutex& mutex) 363 { 364 if (trx_.is_local()) 365 { 366 unlock(); 367 mutex.unlock(); 368 GU_DBUG_SYNC_WAIT("commit_monitor_enter_sync"); 369 mutex.lock(); 370 lock(); 371 } 372 } 373 #endif // GU_DBUG_ON 374 375 private: 376 CommitOrder(const CommitOrder&); 377 TrxHandle& trx_; 378 const Mode mode_; 379 }; 380 381 class StateRequest 382 { 383 public: 384 virtual const void* req () const = 0; 385 virtual ssize_t len () const = 0; 386 virtual const void* sst_req () const = 0; 387 virtual ssize_t sst_len () const = 0; 388 virtual const void* ist_req () const = 0; 389 virtual ssize_t ist_len () const = 0; ~StateRequest()390 virtual ~StateRequest() {} 391 }; 392 393 private: 394 // state machine 395 class Transition 396 { 397 public: 398 Transition(State const from,State const to)399 Transition(State const from, State const to) : 400 from_(from), 401 to_(to) 402 { } 403 from() const404 State from() const { return from_; } to() const405 State to() const { return to_; } 406 operator ==(Transition const & other) const407 bool operator==(Transition const& other) const 408 { 409 return (from_ == other.from_ && to_ == other.to_); 410 } 411 412 class Hash 413 { 414 public: operator ()(Transition const & tr) const415 size_t operator()(Transition const& tr) const 416 { 417 return (gu::HashValue(static_cast<int>(tr.from_)) 418 ^ gu::HashValue(static_cast<int>(tr.to_))); 419 } 420 }; 421 422 private: 423 424 State from_; 425 State to_; 426 }; 427 428 429 void build_stats_vars (std::vector<struct wsrep_stats_var>& stats); 430 431 void establish_protocol_versions (int version); 432 433 bool state_transfer_required(const wsrep_view_info_t& view_info); 434 435 void prepare_for_IST (void*& req, ssize_t& req_len, 436 const wsrep_uuid_t& group_uuid, 437 wsrep_seqno_t group_seqno); 438 439 void recv_IST(void* recv_ctx); 440 441 StateRequest* prepare_state_request (const void* sst_req, 442 ssize_t sst_req_len, 443 const wsrep_uuid_t& group_uuid, 444 wsrep_seqno_t group_seqno); 445 446 void send_state_request (const StateRequest* req); 447 448 void request_state_transfer (void* recv_ctx, 449 const wsrep_uuid_t& group_uuid, 450 wsrep_seqno_t group_seqno, 451 const void* sst_req, 452 ssize_t sst_req_len); 453 454 wsrep_seqno_t donate_sst(void* recv_ctx, const StateRequest& streq, 455 const wsrep_gtid_t& state_id, bool bypass); 456 457 /* local state seqno for internal use (macro mock up) */ STATE_SEQNO(void)458 wsrep_seqno_t STATE_SEQNO(void) { return apply_monitor_.last_left(); } 459 460 class InitLib /* Library initialization routines */ 461 { 462 public: InitLib(gu_log_cb_t cb)463 InitLib (gu_log_cb_t cb) { gu_init(cb); } 464 }; 465 466 InitLib init_lib_; 467 gu::Config config_; 468 469 InitConfig 470 init_config_; // registers configurable parameters and defaults 471 472 struct ParseOptions 473 { 474 ParseOptions(Replicator& repl, gu::Config&, const char* opts); 475 } 476 parse_options_; // parse option string supplied on initialization 477 478 class InitSSL 479 { 480 public: InitSSL(gu::Config & conf)481 InitSSL(gu::Config& conf) { gu::ssl_init_options(conf); } 482 } init_ssl_; // initialize global SSL parameters 483 484 static int const MAX_PROTO_VER; 485 /* 486 * |--------------------------------------------------------------------| 487 * | protocol_version_ | trx version | str_proto_ver_ | record_set_ver_ | 488 * |--------------------------------------------------------------------| 489 * | 1 | 1 | 0 | 1 | 490 * | 2 | 1 | 1 | 1 | 491 * | 3 | 2 | 1 | 1 | 492 * | 4 | 2 | 1 | 1 | 493 * | 5 | 3 | 1 | 1 | 494 * | 6 | 3 | 2 | 1 | 495 * | 7 | 3 | 2 | 1 | 496 * | 8 | 3 | 2 | 2 | 497 * | 9 | 4 | 2 | 2 | 498 * |--------------------------------------------------------------------| 499 */ 500 501 int str_proto_ver_;// state transfer request protocol 502 int protocol_version_;// general repl layer proto 503 int proto_max_; // maximum allowed proto version 504 505 FSM<State, Transition> state_; 506 SstState sst_state_; 507 508 // configurable params 509 const CommitOrder::Mode co_mode_; // commit order mode 510 511 // persistent data location 512 std::string state_file_; 513 SavedState st_; 514 515 // boolean telling if the node is safe to use for bootstrapping 516 // a new primary component 517 bool safe_to_bootstrap_; 518 519 // currently installed trx parameters 520 TrxHandle::Params trx_params_; 521 522 // identifiers 523 wsrep_uuid_t uuid_; 524 wsrep_uuid_t const state_uuid_; 525 const char state_uuid_str_[37]; 526 wsrep_seqno_t cc_seqno_; // seqno of last CC 527 wsrep_seqno_t pause_seqno_; // local seqno of last pause call 528 529 // application callbacks 530 void* app_ctx_; 531 wsrep_view_cb_t view_cb_; 532 wsrep_apply_cb_t apply_cb_; 533 wsrep_commit_cb_t commit_cb_; 534 wsrep_unordered_cb_t unordered_cb_; 535 wsrep_sst_donate_cb_t sst_donate_cb_; 536 wsrep_synced_cb_t synced_cb_; 537 538 // SST 539 std::string sst_donor_; 540 wsrep_uuid_t sst_uuid_; 541 wsrep_seqno_t sst_seqno_; 542 gu::Mutex sst_mutex_; 543 gu::Cond sst_cond_; 544 int sst_retry_sec_; 545 enum st_type 546 { 547 ST_TYPE_NONE, 548 ST_TYPE_SST, 549 ST_TYPE_IST 550 } last_st_type_; 551 552 // services 553 gcache::GCache gcache_; 554 GCS_IMPL gcs_; 555 ServiceThd service_thd_; 556 557 // action sources 558 TrxHandle::SlavePool slave_pool_; 559 ActionSource* as_; 560 GcsActionSource gcs_as_; 561 ist::Receiver ist_receiver_; 562 ist::AsyncSenderMap ist_senders_; 563 564 // trx processing 565 Wsdb wsdb_; 566 Certification cert_; 567 568 // concurrency control 569 Monitor<LocalOrder> local_monitor_; 570 Monitor<ApplyOrder> apply_monitor_; 571 Monitor<CommitOrder> commit_monitor_; 572 gu::datetime::Period causal_read_timeout_; 573 574 // counters 575 gu::Atomic<size_t> receivers_; 576 gu::Atomic<long long> replicated_; 577 gu::Atomic<long long> replicated_bytes_; 578 gu::Atomic<long long> keys_count_; 579 gu::Atomic<long long> keys_bytes_; 580 gu::Atomic<long long> data_bytes_; 581 gu::Atomic<long long> unrd_bytes_; 582 gu::Atomic<long long> local_commits_; 583 gu::Atomic<long long> local_rollbacks_; 584 gu::Atomic<long long> local_cert_failures_; 585 gu::Atomic<long long> local_replays_; 586 gu::Atomic<long long> causal_reads_; 587 588 gu::Atomic<long long> preordered_id_; // temporary preordered ID 589 590 // non-atomic stats 591 std::string incoming_list_; 592 mutable gu::Mutex incoming_mutex_; 593 594 mutable std::vector<struct wsrep_stats_var> wsrep_stats_; 595 }; 596 597 std::ostream& operator<<(std::ostream& os, ReplicatorSMM::State state); 598 } 599 600 #endif /* GALERA_REPLICATOR_SMM_HPP */ 601