1 //
2 // Copyright (C) 2010-2021 Codership Oy <info@codership.com>
3 //
4 
5 //! @file replicator_smm.hpp
6 //
7 // @brief Galera Synchronous Multi-Master replicator
8 //
9 
10 #ifndef GALERA_REPLICATOR_SMM_HPP
11 #define GALERA_REPLICATOR_SMM_HPP
12 
13 #include "replicator.hpp"
14 
15 #include "gu_init.h"
16 #include "GCache.hpp"
17 #include "gcs.hpp"
18 #include "monitor.hpp"
19 #include "wsdb.hpp"
20 #include "certification.hpp"
21 #include "trx_handle.hpp"
22 #include "write_set.hpp"
23 #include "galera_service_thd.hpp"
24 #include "fsm.hpp"
25 #include "gcs_action_source.hpp"
26 #include "ist.hpp"
27 #include "gu_atomic.hpp"
28 #include "saved_state.hpp"
29 #include "gu_debug_sync.hpp"
30 
31 
32 #include <map>
33 
34 namespace galera
35 {
36     class ReplicatorSMM : public Replicator
37     {
38     public:
39 
        /*! State of a state snapshot transfer (SST); this is the type of
         *  the sst_state_ member. */
        typedef enum
        {
            SST_NONE,
            SST_WAIT,
            SST_JOIN_SENT,
            SST_REQ_FAILED,
            SST_FAILED
        } SstState;
48 
        // number of replicator states; presumably S_DONOR is the last
        // (highest-valued) State enumerator - NOTE(review): confirm in
        // replicator.hpp
        static const size_t N_STATES = S_DONOR + 1;

        // constructs the replicator from wsrep initialization arguments
        ReplicatorSMM(const wsrep_init_args* args);

        ~ReplicatorSMM();
54 
trx_proto_ver() const55         int trx_proto_ver() const { return trx_params_.version_; }
repl_proto_ver() const56         int repl_proto_ver() const{ return protocol_version_; }
57 
        /*! Connect to the cluster.
         *  Parameter roles below are inferred from their names only -
         *  confirm against the implementation:
         *  @param cluster_name name of the cluster to join
         *  @param cluster_url  address(es) used to reach the cluster
         *  @param state_donor  preferred state transfer donor
         *  @param bootstrap    whether to bootstrap a new cluster */
        wsrep_status_t connect(const std::string& cluster_name,
                               const std::string& cluster_url,
                               const std::string& state_donor,
                               bool               bootstrap);
        /*! Close the connection (counterpart of connect()) */
        wsrep_status_t close();
        /*! Receive entry point; recv_ctx is an opaque caller-supplied
         *  context (presumably handed on to the processing calls that take
         *  a recv_ctx argument - TODO confirm) */
        wsrep_status_t async_recv(void* recv_ctx);
64 
get_local_trx(wsrep_trx_id_t trx_id,bool create=false)65         TrxHandle* get_local_trx(wsrep_trx_id_t trx_id, bool create = false)
66         {
67             return wsdb_.get_trx(trx_params_, uuid_, trx_id, create);
68         }
69 
unref_local_trx(TrxHandle * trx)70         void unref_local_trx(TrxHandle* trx)
71         {
72             assert(trx->refcnt() > 1);
73             trx->unref();
74         }
75 
discard_local_trx(TrxHandle * trx)76         void discard_local_trx(TrxHandle* trx)
77         {
78             trx->release_write_set_out();
79             wsdb_.discard_trx(trx->trx_id());
80         }
81 
local_conn_trx(wsrep_conn_id_t conn_id,bool create)82         TrxHandle* local_conn_trx(wsrep_conn_id_t conn_id, bool create)
83         {
84             return wsdb_.get_conn_query(trx_params_, uuid_, conn_id, create);
85         }
86 
discard_local_conn_trx(wsrep_conn_id_t conn_id)87         void discard_local_conn_trx(wsrep_conn_id_t conn_id)
88         {
89             wsdb_.discard_conn_query(conn_id);
90         }
91 
        // NOTE(review): the summaries in this section are derived from the
        // names and signatures of the declarations only; the definitions
        // are not part of this header.

        // apply a transaction in the given receiver context
        void apply_trx(void* recv_ctx, TrxHandle* trx);

        // local transaction processing steps
        wsrep_status_t replicate(TrxHandle* trx, wsrep_trx_meta_t*);
        void abort_trx(TrxHandle* trx) ;
        wsrep_status_t pre_commit(TrxHandle*  trx, wsrep_trx_meta_t*);
        wsrep_status_t replay_trx(TrxHandle* trx, void* replay_ctx);

        wsrep_status_t post_commit(TrxHandle* trx);
        wsrep_status_t post_rollback(TrxHandle* trx);

        // causal read and total order isolation (TOI) begin/end
        wsrep_status_t causal_read(wsrep_gtid_t*);
        wsrep_status_t to_isolation_begin(TrxHandle* trx, wsrep_trx_meta_t*);
        wsrep_status_t to_isolation_end(TrxHandle* trx);
        // preordered events: collect `count` buffers pointed to by `data`
        // into `handle`, then commit (or not, see `commit`) the collection
        wsrep_status_t preordered_collect(wsrep_po_handle_t&      handle,
                                          const struct wsrep_buf* data,
                                          size_t                  count,
                                          bool                    copy);
        wsrep_status_t preordered_commit(wsrep_po_handle_t&      handle,
                                         const wsrep_uuid_t&     source,
                                         uint64_t                flags,
                                         int                     pa_range,
                                         bool                    commit);
        // state snapshot transfer completion notifications; rcode is the
        // result code reported by the caller
        wsrep_status_t sst_sent(const wsrep_gtid_t& state_id, int rcode);
        wsrep_status_t sst_received(const wsrep_gtid_t& state_id,
                                    const void*         state,
                                    size_t              state_len,
                                    int                 rcode);
119 
        // Handlers for the different kinds of incoming events.
        // NOTE(review): presumably invoked by the action source (see as_ /
        // gcs_as_); seqno_l looks like the local order seqno of the event -
        // confirm against the implementation.
        void process_trx(void* recv_ctx, TrxHandle* trx);
        void process_commit_cut(wsrep_seqno_t seq, wsrep_seqno_t seqno_l);
        void process_conf_change(void* recv_ctx,
                                 const wsrep_view_info_t& view,
                                 int repl_proto,
                                 State next_state,
                                 wsrep_seqno_t seqno_l);
        void process_state_req(void* recv_ctx, const void* req,
                               size_t req_size, wsrep_seqno_t seqno_l,
                               wsrep_seqno_t donor_seq);
        void process_join(wsrep_seqno_t seqno, wsrep_seqno_t seqno_l);
        void process_sync(wsrep_seqno_t seqno_l);
132 
        // status variables: stats_get() returns an array of wsrep_stats_var
        // which is presumably to be handed back to stats_free() when no
        // longer needed - TODO confirm ownership in the implementation
        const struct wsrep_stats_var* stats_get()  const;
        void                          stats_reset();
        void                   stats_free(struct wsrep_stats_var*);
136 
        // NOTE(review): set_param() and param_set() have identical
        // signatures; how they differ is not visible from this header.

        /*! Set parameter `key` to `value`.
         *  @throws NotFound */
        void           set_param (const std::string& key,
                                  const std::string& value);

        /*! Set parameter `key` to `value`.
         *  @throws NotFound */
        void           param_set (const std::string& key,
                                  const std::string& value);

        /*! @return value of parameter `key` as a string */
        std::string    param_get (const std::string& key) const;
146 
params() const147         const gu::Config& params() const { return config_; }
148 
        // pause()/resume() and desync()/resync() are paired operations;
        // pause() returns a seqno (see also pause_seqno_)
        wsrep_seqno_t pause();
        void          resume();

        void          desync();
        void          resync();
154 
        /*! Helper whose constructor registers configurable parameters and
         *  their defaults in the given gu::Config (see the init_config_
         *  member). The struct itself holds no data. */
        struct InitConfig
        {
            InitConfig(gu::Config&, const char* node_address, const char *base_dir);
        };
159 
160     private:
161 
        // copy constructor and assignment are declared private:
        // the replicator is not meant to be copied
        ReplicatorSMM(const ReplicatorSMM&);
        void operator=(const ReplicatorSMM&);
164 
        /*! String constants for the parameters of the replicator
         *  (presumably the configuration key names); values are defined
         *  out of line. */
        struct Param
        {
            static const std::string base_host;
            static const std::string base_port;
            static const std::string base_dir;
            static const std::string proto_max;
            static const std::string key_format;
            static const std::string commit_order;
            static const std::string causal_read_timeout;
            static const std::string max_write_set_size;
        };
176 
        // a (string, string) pair; presumably parameter name and default
        // value - TODO confirm
        typedef std::pair<std::string, std::string> Default;

        // string-to-string map filled in by the constructor
        // (defined out of line)
        struct Defaults
        {
            std::map<std::string, std::string> map_;
            Defaults ();
        };

        static const Defaults defaults;
        // both a list of parameters and a list of default values
187 
last_committed()188         wsrep_seqno_t last_committed()
189         {
190             return co_mode_ != CommitOrder::BYPASS ?
191                    commit_monitor_.last_left() : apply_monitor_.last_left();
192         }
193 
report_last_committed(wsrep_seqno_t purge_seqno)194         void report_last_committed(wsrep_seqno_t purge_seqno)
195         {
196             if (gu_unlikely(purge_seqno != -1))
197             {
198                 service_thd_.report_last_committed(purge_seqno);
199             }
200         }
201 
        // certification entry points (defined out of line)
        wsrep_status_t cert(TrxHandle* trx);
        wsrep_status_t cert_and_catch(TrxHandle* trx);
        wsrep_status_t cert_for_aborted(TrxHandle* trx);

        // state UUID and incoming list updates (see state_uuid_ and
        // incoming_list_)
        void update_state_uuid (const wsrep_uuid_t& u);
        void update_incoming_list (const wsrep_view_info_t& v);

        /* aborts/exits the program in a clean way */
        void abort() GU_NORETURN;
211 
212         class LocalOrder
213         {
214         public:
215 
LocalOrder(TrxHandle & trx)216             LocalOrder(TrxHandle& trx)
217                 :
218                 seqno_(trx.local_seqno()),
219                 trx_(&trx)
220             { }
221 
LocalOrder(wsrep_seqno_t seqno)222             LocalOrder(wsrep_seqno_t seqno)
223                 :
224                 seqno_(seqno),
225                 trx_(0)
226             { }
227 
lock()228             void lock()   { if (trx_ != 0) trx_->lock();   }
unlock()229             void unlock() { if (trx_ != 0) trx_->unlock(); }
230 
seqno() const231             wsrep_seqno_t seqno() const { return seqno_; }
232 
condition(wsrep_seqno_t last_entered,wsrep_seqno_t last_left) const233             bool condition(wsrep_seqno_t last_entered,
234                            wsrep_seqno_t last_left) const
235             {
236                 return (last_left + 1 == seqno_);
237             }
238 
239 #ifdef GU_DBUG_ON
debug_sync(gu::Mutex & mutex)240             void debug_sync(gu::Mutex& mutex)
241             {
242                 if (trx_ != 0 && trx_->is_local())
243                 {
244                     unlock();
245                     mutex.unlock();
246                     GU_DBUG_SYNC_WAIT("local_monitor_enter_sync");
247                     mutex.lock();
248                     lock();
249                 }
250             }
251 #endif // GU_DBUG_ON
252         private:
253             LocalOrder(const LocalOrder&);
254             wsrep_seqno_t seqno_;
255             TrxHandle*    trx_;
256         };
257 
258         class ApplyOrder
259         {
260         public:
261 
ApplyOrder(TrxHandle & trx)262             ApplyOrder(TrxHandle& trx) : trx_(trx) { }
263 
lock()264             void lock()   { trx_.lock();   }
unlock()265             void unlock() { trx_.unlock(); }
266 
seqno() const267             wsrep_seqno_t seqno() const { return trx_.global_seqno(); }
268 
condition(wsrep_seqno_t last_entered,wsrep_seqno_t last_left) const269             bool condition(wsrep_seqno_t last_entered,
270                            wsrep_seqno_t last_left) const
271             {
272                 return (trx_.is_local() == true ||
273                         last_left >= trx_.depends_seqno());
274             }
275 
276 #ifdef GU_DBUG_ON
debug_sync(gu::Mutex & mutex)277             void debug_sync(gu::Mutex& mutex)
278             {
279                 if (trx_.is_local())
280                 {
281                     unlock();
282                     mutex.unlock();
283                     GU_DBUG_SYNC_WAIT("apply_monitor_enter_sync");
284                     mutex.lock();
285                     lock();
286                 }
287                 else
288                 {
289                     unlock();
290                     mutex.unlock();
291                     GU_DBUG_SYNC_WAIT("apply_monitor_slave_enter_sync");
292                     mutex.lock();
293                     lock();
294                 }
295             }
296 #endif // GU_DBUG_ON
297 
298         private:
299             ApplyOrder(const ApplyOrder&);
300             TrxHandle& trx_;
301         };
302 
303     public:
304 
305         class CommitOrder
306         {
307         public:
308             typedef enum
309             {
310                 BYPASS     = 0,
311                 OOOC       = 1,
312                 LOCAL_OOOC = 2,
313                 NO_OOOC    = 3
314             } Mode;
315 
from_string(const std::string & str)316             static Mode from_string(const std::string& str)
317             {
318                 int ret(gu::from_string<int>(str));
319                 switch (ret)
320                 {
321                 case BYPASS:
322                 case OOOC:
323                 case LOCAL_OOOC:
324                 case NO_OOOC:
325                     break;
326                 default:
327                     gu_throw_error(EINVAL)
328                         << "invalid value " << str << " for commit order mode";
329                 }
330                 return static_cast<Mode>(ret);
331             }
332 
CommitOrder(TrxHandle & trx,Mode mode)333             CommitOrder(TrxHandle& trx, Mode mode)
334                 :
335                 trx_ (trx ),
336                 mode_(mode)
337             { }
338 
lock()339             void lock()   { trx_.lock();   }
unlock()340             void unlock() { trx_.unlock(); }
seqno() const341             wsrep_seqno_t seqno() const { return trx_.global_seqno(); }
condition(wsrep_seqno_t last_entered,wsrep_seqno_t last_left) const342             bool condition(wsrep_seqno_t last_entered,
343                            wsrep_seqno_t last_left) const
344             {
345                 switch (mode_)
346                 {
347                 case BYPASS:
348                     gu_throw_fatal
349                         << "commit order condition called in bypass mode";
350                 case OOOC:
351                     return true;
352                 case LOCAL_OOOC:
353                     return trx_.is_local();
354                     // in case of remote trx fall through
355                 case NO_OOOC:
356                     return (last_left + 1 == trx_.global_seqno());
357                 }
358                 gu_throw_fatal << "invalid commit mode value " << mode_;
359             }
360 
361 #ifdef GU_DBUG_ON
debug_sync(gu::Mutex & mutex)362             void debug_sync(gu::Mutex& mutex)
363             {
364                 if (trx_.is_local())
365                 {
366                     unlock();
367                     mutex.unlock();
368                     GU_DBUG_SYNC_WAIT("commit_monitor_enter_sync");
369                     mutex.lock();
370                     lock();
371                 }
372             }
373 #endif // GU_DBUG_ON
374 
375         private:
376             CommitOrder(const CommitOrder&);
377             TrxHandle& trx_;
378             const Mode mode_;
379         };
380 
        /*! Abstract interface to a state transfer request.
         *  Exposes the request as a whole (req/len) and two parts of it,
         *  each as a pointer/length pair: the SST part and the IST part.
         *  NOTE(review): whether the parts point into the req() buffer is
         *  up to the implementation and not visible here. */
        class StateRequest
        {
        public:
            virtual const void* req     () const = 0;
            virtual ssize_t     len     () const = 0;
            virtual const void* sst_req () const = 0;
            virtual ssize_t     sst_len () const = 0;
            virtual const void* ist_req () const = 0;
            virtual ssize_t     ist_len () const = 0;
            // virtual: the class is a polymorphic base
            virtual ~StateRequest() {}
        };
392 
393     private:
394         // state machine
395         class Transition
396         {
397         public:
398 
Transition(State const from,State const to)399             Transition(State const from, State const to) :
400                 from_(from),
401                 to_(to)
402             { }
403 
from() const404             State from() const { return from_; }
to() const405             State to()   const { return to_;   }
406 
operator ==(Transition const & other) const407             bool operator==(Transition const& other) const
408             {
409                 return (from_ == other.from_ && to_ == other.to_);
410             }
411 
412             class Hash
413             {
414             public:
operator ()(Transition const & tr) const415                 size_t operator()(Transition const& tr) const
416                 {
417                     return (gu::HashValue(static_cast<int>(tr.from_))
418                             ^ gu::HashValue(static_cast<int>(tr.to_)));
419                 }
420             };
421 
422         private:
423 
424             State from_;
425             State to_;
426         };
427 
428 
        // NOTE(review): the summaries below are derived from the names and
        // signatures only; the definitions are not part of this header.

        // fills `stats` with status variable descriptors
        void build_stats_vars (std::vector<struct wsrep_stats_var>& stats);

        // sets up protocol versions for the given replication protocol
        // version (see the version table next to MAX_PROTO_VER)
        void establish_protocol_versions (int version);

        bool state_transfer_required(const wsrep_view_info_t& view_info);

        // req and req_len are output parameters (passed by reference)
        void prepare_for_IST (void*& req, ssize_t& req_len,
                              const wsrep_uuid_t& group_uuid,
                              wsrep_seqno_t       group_seqno);

        void recv_IST(void* recv_ctx);

        // returns a StateRequest object; presumably owned by the caller -
        // TODO confirm
        StateRequest* prepare_state_request (const void* sst_req,
                                             ssize_t     sst_req_len,
                                             const wsrep_uuid_t& group_uuid,
                                             wsrep_seqno_t       group_seqno);

        void send_state_request (const StateRequest* req);

        void request_state_transfer (void* recv_ctx,
                                     const wsrep_uuid_t& group_uuid,
                                     wsrep_seqno_t       group_seqno,
                                     const void*         sst_req,
                                     ssize_t             sst_req_len);

        wsrep_seqno_t donate_sst(void* recv_ctx, const StateRequest& streq,
                                 const wsrep_gtid_t& state_id, bool bypass);
456 
457         /* local state seqno for internal use (macro mock up) */
STATE_SEQNO(void)458         wsrep_seqno_t STATE_SEQNO(void) { return apply_monitor_.last_left(); }
459 
460         class InitLib /* Library initialization routines */
461         {
462         public:
InitLib(gu_log_cb_t cb)463             InitLib (gu_log_cb_t cb) { gu_init(cb); }
464         };
465 
        // Non-static data members are initialized in declaration order, so
        // the library is initialized before config_ and every member below.
        InitLib                init_lib_;
        gu::Config             config_;

        InitConfig
            init_config_; // registers configurable parameters and defaults
471 
        // Helper member whose constructor does the work; it is declared
        // after config_ and init_config_ and is therefore constructed
        // after them.
        struct ParseOptions
        {
            ParseOptions(Replicator& repl, gu::Config&, const char* opts);
        }
            parse_options_; // parse option string supplied on initialization
477 
478         class InitSSL
479         {
480         public:
InitSSL(gu::Config & conf)481             InitSSL(gu::Config& conf) { gu::ssl_init_options(conf); }
482         } init_ssl_; // initialize global SSL parameters
483 
        // presumably the highest replication protocol version supported by
        // this implementation (value defined out of line) - TODO confirm
        static int const       MAX_PROTO_VER;
        /*
         * Versions that go together with each protocol_version_ value:
         *
         * |--------------------------------------------------------------------|
         * | protocol_version_ | trx version | str_proto_ver_ | record_set_ver_ |
         * |--------------------------------------------------------------------|
         * |                 1 |           1 |              0 |               1 |
         * |                 2 |           1 |              1 |               1 |
         * |                 3 |           2 |              1 |               1 |
         * |                 4 |           2 |              1 |               1 |
         * |                 5 |           3 |              1 |               1 |
         * |                 6 |           3 |              2 |               1 |
         * |                 7 |           3 |              2 |               1 |
         * |                 8 |           3 |              2 |               2 |
         * |                 9 |           4 |              2 |               2 |
         * |--------------------------------------------------------------------|
         */

        int                    str_proto_ver_;// state transfer request protocol
        int                    protocol_version_;// general repl layer proto
        int                    proto_max_;    // maximum allowed proto version
504 
        // replicator state machine: states are State values, transitions
        // are Transition objects
        FSM<State, Transition> state_;
        SstState               sst_state_; // see SstState

        // configurable params
        const CommitOrder::Mode co_mode_; // commit order mode

        // persistent data location
        std::string           state_file_;
        SavedState            st_;

        // boolean telling if the node is safe to use for bootstrapping
        // a new primary component
        bool safe_to_bootstrap_;

        // currently installed trx parameters
        TrxHandle::Params     trx_params_;

        // identifiers
        wsrep_uuid_t          uuid_;
        wsrep_uuid_t const    state_uuid_;
        // presumably the textual form of state_uuid_ (36 characters plus
        // the terminating NUL) - TODO confirm
        const char            state_uuid_str_[37];
        wsrep_seqno_t         cc_seqno_; // seqno of last CC
        wsrep_seqno_t         pause_seqno_; // local seqno of last pause call
528 
        // application callbacks
        void*                 app_ctx_; // opaque application context
        wsrep_view_cb_t       view_cb_;
        wsrep_apply_cb_t      apply_cb_;
        wsrep_commit_cb_t     commit_cb_;
        wsrep_unordered_cb_t  unordered_cb_;
        wsrep_sst_donate_cb_t sst_donate_cb_;
        wsrep_synced_cb_t     synced_cb_;

        // SST
        std::string   sst_donor_;
        wsrep_uuid_t  sst_uuid_;
        wsrep_seqno_t sst_seqno_;
        // presumably sst_mutex_/sst_cond_ are used to wait for and signal
        // SST progress - TODO confirm in the implementation
        gu::Mutex     sst_mutex_;
        gu::Cond      sst_cond_;
        int           sst_retry_sec_;
        // type of the last state transfer
        enum st_type
        {
            ST_TYPE_NONE,
            ST_TYPE_SST,
            ST_TYPE_IST
        } last_st_type_;
551 
        // services
        gcache::GCache gcache_;
        GCS_IMPL       gcs_;
        ServiceThd     service_thd_; // receives last committed reports

        // action sources
        TrxHandle::SlavePool slave_pool_;
        ActionSource*        as_;
        GcsActionSource      gcs_as_;
        ist::Receiver        ist_receiver_;
        ist::AsyncSenderMap  ist_senders_;

        // trx processing
        Wsdb            wsdb_; // local trx and connection query handles
        Certification   cert_;

        // concurrency control: one monitor per ordering class
        // (see LocalOrder, ApplyOrder and CommitOrder)
        Monitor<LocalOrder>  local_monitor_;
        Monitor<ApplyOrder>  apply_monitor_;
        Monitor<CommitOrder> commit_monitor_;
        gu::datetime::Period causal_read_timeout_;
573 
        // counters (atomic)
        gu::Atomic<size_t>    receivers_;
        gu::Atomic<long long> replicated_;
        gu::Atomic<long long> replicated_bytes_;
        gu::Atomic<long long> keys_count_;
        gu::Atomic<long long> keys_bytes_;
        gu::Atomic<long long> data_bytes_;
        gu::Atomic<long long> unrd_bytes_;
        gu::Atomic<long long> local_commits_;
        gu::Atomic<long long> local_rollbacks_;
        gu::Atomic<long long> local_cert_failures_;
        gu::Atomic<long long> local_replays_;
        gu::Atomic<long long> causal_reads_;

        gu::Atomic<long long> preordered_id_; // temporary preordered ID

        // non-atomic stats
        std::string           incoming_list_;
        // mutable so that it can be locked from const member functions;
        // presumably protects incoming_list_ - TODO confirm
        mutable gu::Mutex     incoming_mutex_;

        // mutable so that const member functions can modify it
        mutable std::vector<struct wsrep_stats_var> wsrep_stats_;
595     };
596 
    // stream output for replicator states (defined out of line)
    std::ostream& operator<<(std::ostream& os, ReplicatorSMM::State state);
598 }
599 
600 #endif /* GALERA_REPLICATOR_SMM_HPP */
601