1 //
2 // Copyright (C) 2010-2020 Codership Oy <info@codership.com>
3 //
4 
5 #include "galera_common.hpp"
6 #include "replicator_smm.hpp"
7 #include "gcs_action_source.hpp"
8 #include "galera_exception.hpp"
9 
10 #include "galera_info.hpp"
11 
12 #include <gu_debug_sync.hpp>
13 #include <gu_abort.h>
14 
15 #include <sstream>
16 #include <iostream>
17 
18 
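// Record the caller's source line with every transaction state change so
// that unexpected state transitions can be traced back to their origin.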
19 #define TX_SET_STATE(t_,s_) (t_).set_state(s_, __LINE__)
20 
21 
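// Map a replicator protocol version to the wsrep capability flags it
// provides. A protocol version of -1 (no protocol established yet)
// advertises no capabilities; versions 5, 8 and 9 add to the version 4 set.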
22 wsrep_cap_t
23 galera::ReplicatorSMM::capabilities(int protocol_version)
24 {
25     static uint64_t const v4_caps(WSREP_CAP_MULTI_MASTER         |
26                                   WSREP_CAP_CERTIFICATION        |
27                                   WSREP_CAP_PARALLEL_APPLYING    |
28                                   WSREP_CAP_TRX_REPLAY           |
29                                   WSREP_CAP_ISOLATION            |
30                                   WSREP_CAP_PAUSE                |
31                                   WSREP_CAP_CAUSAL_READS);
32 
33     static uint64_t const v5_caps(WSREP_CAP_INCREMENTAL_WRITESET |
34                                   WSREP_CAP_UNORDERED            |
35                                   WSREP_CAP_PREORDERED);
36 
37     static uint64_t const v8_caps(WSREP_CAP_STREAMING);
38 
39     static uint64_t const v9_caps(WSREP_CAP_NBO);
40 
41     if (protocol_version == -1) return 0;
42 
43     assert(protocol_version >= 4);
44 
45     uint64_t caps(v4_caps);
46 
47     if (protocol_version >= 5) caps |= v5_caps;
48     if (protocol_version >= 8) caps |= v8_caps;
49     if (protocol_version >= 9) caps |= v9_caps;
50 
51     return caps;
52 }
53 
54 
55 std::ostream& galera::operator<<(std::ostream& os, ReplicatorSMM::State state)
56 {
57     switch (state)
58     {
59     case ReplicatorSMM::S_DESTROYED: return (os << "DESTROYED");
60     case ReplicatorSMM::S_CLOSED:    return (os << "CLOSED");
61     case ReplicatorSMM::S_CONNECTED: return (os << "CONNECTED");
62     case ReplicatorSMM::S_JOINING:   return (os << "JOINING");
63     case ReplicatorSMM::S_JOINED:    return (os << "JOINED");
64     case ReplicatorSMM::S_SYNCED:    return (os << "SYNCED");
65     case ReplicatorSMM::S_DONOR:     return (os << "DONOR");
66     }
67 
68     gu_throw_fatal << "invalid state " << static_cast<int>(state);
69 }
70 
71 //////////////////////////////////////////////////////////////////////
72 //////////////////////////////////////////////////////////////////////
73 //                           Public
74 //////////////////////////////////////////////////////////////////////
75 //////////////////////////////////////////////////////////////////////
76 
77 galera::ReplicatorSMM::ReplicatorSMM(const struct wsrep_init_args* args)
78     :
79     ist_event_queue_    (),
80     init_lib_           (reinterpret_cast<gu_log_cb_t>(args->logger_cb)),
81     config_             (),
82     init_config_        (config_, args->node_address, args->data_dir),
83     parse_options_      (*this, config_, args->options),
84     init_ssl_           (config_),
85     protocol_version_   (-1),
86     proto_max_          (gu::from_string<int>(config_.get(Param::proto_max))),
87     state_              (S_CLOSED),
88     closing_mutex_      (),
89     closing_cond_       (),
90     closing_            (false),
91     sst_state_          (SST_NONE),
92     co_mode_            (CommitOrder::from_string(
93                              config_.get(Param::commit_order))),
94     state_file_         (config_.get(BASE_DIR)+'/'+GALERA_STATE_FILE),
95     st_                 (state_file_),
96     safe_to_bootstrap_  (true),
97     trx_params_         (config_.get(BASE_DIR), -1,
98                          KeySet::version(config_.get(Param::key_format)),
99                          TrxHandleMaster::Defaults.record_set_ver_,
100                          gu::from_string<int>(config_.get(
101                              Param::max_write_set_size))),
102     uuid_               (WSREP_UUID_UNDEFINED),
103     state_uuid_         (WSREP_UUID_UNDEFINED),
104     state_uuid_str_     (),
105     cc_seqno_           (WSREP_SEQNO_UNDEFINED),
106     cc_lowest_trx_seqno_(WSREP_SEQNO_UNDEFINED),
107     pause_seqno_        (WSREP_SEQNO_UNDEFINED),
108     app_ctx_            (args->app_ctx),
109     connected_cb_       (args->connected_cb),
110     view_cb_            (args->view_cb),
111     sst_request_cb_     (args->sst_request_cb),
112     apply_cb_           (args->apply_cb),
113     unordered_cb_       (args->unordered_cb),
114     sst_donate_cb_      (args->sst_donate_cb),
115     synced_cb_          (args->synced_cb),
116     sst_donor_          (),
117     sst_uuid_           (WSREP_UUID_UNDEFINED),
118     sst_seqno_          (WSREP_SEQNO_UNDEFINED),
119     sst_mutex_          (),
120     sst_cond_           (),
121     sst_retry_sec_      (1),
122     sst_received_       (false),
123     gcache_             (config_, config_.get(BASE_DIR)),
124     gcs_                (config_, gcache_, proto_max_, args->proto_ver,
125                          args->node_name, args->node_incoming),
126     service_thd_        (gcs_, gcache_),
127     slave_pool_         (sizeof(TrxHandleSlave), 1024, "TrxHandleSlave"),
128     as_                 (new GcsActionSource(slave_pool_, gcs_, *this, gcache_)),
129     ist_receiver_       (config_, gcache_, slave_pool_,*this,args->node_address),
130     ist_senders_        (gcache_),
131     wsdb_               (),
132     cert_               (config_, &service_thd_),
133     pending_cert_queue_ (gcache_),
134     local_monitor_      (),
135     apply_monitor_      (),
136     commit_monitor_     (),
137     causal_read_timeout_(config_.get(Param::causal_read_timeout)),
138     receivers_          (),
139     replicated_         (),
140     replicated_bytes_   (),
141     keys_count_         (),
142     keys_bytes_         (),
143     data_bytes_         (),
144     unrd_bytes_         (),
145     local_commits_      (),
146     local_rollbacks_    (),
147     local_cert_failures_(),
148     local_replays_      (),
149     causal_reads_       (),
150     preordered_id_      (),
151     incoming_list_      (""),
152     incoming_mutex_     (),
153     wsrep_stats_        ()
154 {
155     // @todo add guards (and perhaps actions)
156     state_.add_transition(Transition(S_CLOSED,  S_DESTROYED));
157     state_.add_transition(Transition(S_CLOSED,  S_CONNECTED));
158 
159     state_.add_transition(Transition(S_CONNECTED, S_CLOSED));
160     state_.add_transition(Transition(S_CONNECTED, S_CONNECTED));
161     state_.add_transition(Transition(S_CONNECTED, S_JOINING));
162     // the following is possible only when bootstrapping new cluster
163     // (trivial wsrep_cluster_address)
164     state_.add_transition(Transition(S_CONNECTED, S_JOINED));
165     // the following are possible on PC remerge
166     state_.add_transition(Transition(S_CONNECTED, S_DONOR));
167     state_.add_transition(Transition(S_CONNECTED, S_SYNCED));
168 
169     state_.add_transition(Transition(S_JOINING, S_CLOSED));
170     // the following is possible if one non-prim conf follows another
171     state_.add_transition(Transition(S_JOINING, S_CONNECTED));
172     state_.add_transition(Transition(S_JOINING, S_JOINED));
173 
174     state_.add_transition(Transition(S_JOINED, S_CLOSED));
175     state_.add_transition(Transition(S_JOINED, S_CONNECTED));
176     state_.add_transition(Transition(S_JOINED, S_SYNCED));
177     // the following is possible if one desync() immediately follows another
178     state_.add_transition(Transition(S_JOINED, S_DONOR));
179 
180     state_.add_transition(Transition(S_SYNCED, S_CLOSED));
181     state_.add_transition(Transition(S_SYNCED, S_CONNECTED));
182     state_.add_transition(Transition(S_SYNCED, S_DONOR));
183 
184     state_.add_transition(Transition(S_DONOR, S_CLOSED));
185     state_.add_transition(Transition(S_DONOR, S_CONNECTED));
186     state_.add_transition(Transition(S_DONOR, S_JOINED));
187 
188     local_monitor_.set_initial_position(WSREP_UUID_UNDEFINED, 0);
189 
190     wsrep_uuid_t  uuid;
191     wsrep_seqno_t seqno;
192 
193     st_.get (uuid, seqno, safe_to_bootstrap_);
194 
195     if (0 != args->state_id &&
196         args->state_id->uuid != WSREP_UUID_UNDEFINED &&
197         args->state_id->uuid == uuid                 &&
198         seqno                == WSREP_SEQNO_UNDEFINED)
199     {
200         /* non-trivial recovery information provided on startup, and db is safe
201          * so use recovered seqno value */
202         seqno = args->state_id->seqno;
203     }
204 
205     if (seqno >= 0) // non-trivial starting position
206     {
207         assert(uuid != WSREP_UUID_UNDEFINED);
208         cc_seqno_ = seqno; // is it needed here?
209 
210         log_debug << "ReplicatorSMM() initial position: "
211                   << uuid << ':' << seqno;
212         set_initial_position(uuid, seqno);
213         cert_.assign_initial_position(gu::GTID(uuid, seqno),
214                                       trx_params_.version_);
215         gcache_.seqno_reset(gu::GTID(uuid, seqno));
216         // update gcache position to one supplied by app.
217     }
218 
219     build_stats_vars(wsrep_stats_);
220 }
221 
222 void galera::ReplicatorSMM::start_closing()
223 {
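    // Initiate shutdown; safe to call more than once, the group connection
    // is closed only on the first call.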
224     assert(closing_mutex_.locked());
225     assert(state_() >= S_CONNECTED);
226     if (!closing_)
227     {
228         closing_ = true;
229         gcs_.close();
230     }
231 }
232 
233 void galera::ReplicatorSMM::shift_to_CLOSED()
234 {
235     assert(closing_mutex_.locked());
236     assert(closing_);
237 
238     state_.shift_to(S_CLOSED);
239 
240     if (state_uuid_ != WSREP_UUID_UNDEFINED)
241     {
242         st_.set (state_uuid_, last_committed(), safe_to_bootstrap_);
243     }
244 
245     /* Cleanup for re-opening. */
246     uuid_ = WSREP_UUID_UNDEFINED;
247     closing_ = false;
248     if (st_.corrupt())
249     {
250         /* this is a synchronization hack to make sure all receivers are done
251          * with their work and won't access cert module any more. The usual
252          * monitor drain is not enough here. */
253         while (receivers_() > 1) usleep(1000);
254 
255         // this should erase the memory of a pre-existing state.
256         set_initial_position(WSREP_UUID_UNDEFINED, WSREP_SEQNO_UNDEFINED);
257         cert_.assign_initial_position(gu::GTID(GU_UUID_NIL, -1),
258                                       trx_params_.version_);
259         sst_uuid_            = WSREP_UUID_UNDEFINED;
260         sst_seqno_           = WSREP_SEQNO_UNDEFINED;
261         cc_seqno_            = WSREP_SEQNO_UNDEFINED;
262         cc_lowest_trx_seqno_ = WSREP_SEQNO_UNDEFINED;
263         pause_seqno_         = WSREP_SEQNO_UNDEFINED;
264     }
265 
266     closing_cond_.broadcast();
267 }
268 
269 void galera::ReplicatorSMM::wait_for_CLOSED(gu::Lock& lock)
270 {
271     assert(closing_mutex_.locked());
272     assert(closing_);
273     while (state_() > S_CLOSED) lock.wait(closing_cond_);
274     assert(!closing_);
275     assert(WSREP_UUID_UNDEFINED == uuid_);
276 }
277 
278 galera::ReplicatorSMM::~ReplicatorSMM()
279 {
280     log_info << "dtor state: " << state_();
281 
282     gu::Lock lock(closing_mutex_);
283 
284     switch (state_())
285     {
286     case S_CONNECTED:
287     case S_JOINING:
288     case S_JOINED:
289     case S_SYNCED:
290     case S_DONOR:
291         start_closing();
292         wait_for_CLOSED(lock);
293         // fall through
294     case S_CLOSED:
295         ist_senders_.cancel();
296         break;
297     case S_DESTROYED:
298         break;
299     }
300 
301     delete as_;
302 }
303 
304 
305 wsrep_status_t galera::ReplicatorSMM::connect(const std::string& cluster_name,
306                                               const std::string& cluster_url,
307                                               const std::string& state_donor,
308                                               bool  const        bootstrap)
309 {
310     sst_donor_ = state_donor;
311     service_thd_.reset();
312 
313     // make sure there was a proper initialization/cleanup
314     assert(WSREP_UUID_UNDEFINED == uuid_);
315 
316     ssize_t err = 0;
317     wsrep_status_t ret(WSREP_OK);
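    // Derive the initial GCS position from the locally recorded commit
    // position; if nothing has been committed yet (seqno < 0), the UUID is
    // left undefined.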
318     wsrep_seqno_t const seqno(last_committed());
319     wsrep_uuid_t  const gcs_uuid(seqno < 0 ? WSREP_UUID_UNDEFINED :state_uuid_);
320     gu::GTID      const inpos(gcs_uuid, seqno);
321 
322     log_info << "Setting GCS initial position to " << inpos;
323 
324     if ((bootstrap == true || cluster_url == "gcomm://")
325         && safe_to_bootstrap_ == false)
326     {
327         log_error << "It may not be safe to bootstrap the cluster from this node. "
328                   << "It was not the last one to leave the cluster and may "
329                   << "not contain all the updates. To force cluster bootstrap "
330                   << "with this node, edit the grastate.dat file manually and "
331                   << "set safe_to_bootstrap to 1.";
332         ret = WSREP_NODE_FAIL;
333     }
334 
335     if (ret == WSREP_OK && (err = gcs_.set_initial_position(inpos)) != 0)
336     {
337         log_error << "gcs init failed:" << strerror(-err);
338         ret = WSREP_NODE_FAIL;
339     }
340 
341     if (ret == WSREP_OK &&
342         (err = gcs_.connect(cluster_name, cluster_url, bootstrap)) != 0)
343     {
344         log_error << "gcs connect failed: " << strerror(-err);
345         ret = WSREP_NODE_FAIL;
346     }
347 
348     if (ret == WSREP_OK)
349     {
350         state_.shift_to(S_CONNECTED);
351     }
352 
353     return ret;
354 }
355 
356 
357 wsrep_status_t galera::ReplicatorSMM::close()
358 {
359     gu::Lock lock(closing_mutex_);
360 
361     if (state_() > S_CLOSED)
362     {
363         start_closing();
364         wait_for_CLOSED(lock);
365     }
366 
367     return WSREP_OK;
368 }
369 
370 
371 wsrep_status_t galera::ReplicatorSMM::async_recv(void* recv_ctx)
372 {
373     if (state_() <= S_CLOSED)
374     {
375         log_error << "async recv cannot start, provider in CLOSED state";
376         return WSREP_FATAL;
377     }
378 
379     ++receivers_;
380 
381     bool exit_loop(false);
382     wsrep_status_t retval(WSREP_OK);
383 
384     while (WSREP_OK == retval && state_() > S_CLOSED)
385     {
386         GU_DBUG_SYNC_EXECUTE("before_async_recv_process_sync", sleep(5););
387 
388         ssize_t rc;
389 
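        // as_->process() returns -ECANCELED while GCS processing is
        // suspended for IST; in that case consume IST events and retry.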
390         while (gu_unlikely((rc = as_->process(recv_ctx, exit_loop))
391                            == -ECANCELED))
392         {
393             recv_IST(recv_ctx);
394             // hack: prevent fast looping until the IST controlling thread
395             // resumes GCS processing
396             usleep(10000);
397         }
398 
399         if (gu_unlikely(rc <= 0))
400         {
401             if (GcsActionSource::INCONSISTENCY_CODE == rc)
402             {
403                 st_.mark_corrupt();
404                 retval = WSREP_FATAL;
405             }
406             else
407             {
408                 retval = WSREP_CONN_FAIL;
409             }
410         }
411         else if (gu_unlikely(exit_loop == true))
412         {
413             assert(WSREP_OK == retval);
414 
415             if (receivers_.sub_and_fetch(1) > 0)
416             {
417                 log_info << "Slave thread exiting on request.";
418                 break;
419             }
420 
421             ++receivers_;
422             log_warn << "Refusing exit for the last slave thread.";
423         }
424     }
425 
426     /* exiting loop already did proper checks */
427     if (!exit_loop && receivers_.sub_and_fetch(1) == 0)
428     {
429         gu::Lock lock(closing_mutex_);
430         if (state_() > S_CLOSED && !closing_)
431         {
432             assert(WSREP_CONN_FAIL == retval);
433             /* Last recv thread exiting due to error but replicator is not
434              * closed. We need to at least gracefully leave the cluster.*/
435 
436             if (WSREP_OK == retval)
437             {
438                 log_warn << "Broken shutdown sequence, provider state: "
439                          << state_() << ", retval: " << retval;
440                 assert (0);
441             }
442 
443             start_closing();
444 
445             // Generate zero view before exit to notify application
446             gcs_act_cchange const cc;
447             wsrep_uuid_t tmp(uuid_);
448             wsrep_view_info_t* const err_view
449                 (galera_view_info_create(cc, capabilities(cc.repl_proto_ver),
450                                          -1, tmp));
451             view_cb_(app_ctx_, recv_ctx, err_view, 0, 0);
452             free(err_view);
453 
454             shift_to_CLOSED();
455         }
456     }
457 
458     log_debug << "Slave thread exit. Return code: " << retval;
459 
460     return retval;
461 }
462 
463 void galera::ReplicatorSMM::apply_trx(void* recv_ctx, TrxHandleSlave& ts)
464 {
465     assert(ts.global_seqno() > 0);
466     assert(!ts.is_committed());
467     if (!ts.skip_event())
468     {
469         assert(ts.trx_id() != uint64_t(-1) || ts.is_toi());
470         assert(ts.certified() /*Repl*/ || ts.preordered() /*IST*/);
471         assert(ts.local() == false || ts.nbo_end() ||
472                (ts.flags() & TrxHandle::F_COMMIT) ||
473                (ts.flags() & TrxHandle::F_ROLLBACK));
474         assert(ts.nbo_end() == false || ts.is_dummy());
475     }
476 
477     ApplyException ae;
478 
479     ApplyOrder ao(ts);
480     CommitOrder co(ts, co_mode_);
481 
482     TX_SET_STATE(ts, TrxHandle::S_APPLYING);
483 
484     gu_trace(apply_monitor_.enter(ao));
485 
486     if (gu_unlikely(ts.nbo_start() == true))
487     {
488         // Non-blocking operation start, mark state unsafe.
489         st_.mark_unsafe();
490     }
491 
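    // Meta data passed to the application's apply callback: the writeset
    // GTID, the originating source/transaction/connection ids, and its
    // dependency seqno.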
492     wsrep_trx_meta_t meta = { { state_uuid_,    ts.global_seqno() },
493                               { ts.source_id(), ts.trx_id(), ts.conn_id() },
494                               ts.depends_seqno() };
495 
496     if (ts.is_toi())
497     {
498         log_debug << "Executing TO isolated action: " << ts;
499         st_.mark_unsafe();
500     }
501 
502     wsrep_bool_t exit_loop(false);
503 
504     try { gu_trace(ts.apply(recv_ctx, apply_cb_, meta, exit_loop)); }
505     catch (ApplyException& e)
506     {
507         assert(0 != e.status());
508         assert(NULL != e.data() || 0 == e.data_len());
509         assert(0 != e.data_len() || NULL == e.data());
510 
511         if (!st_.corrupt())
512         {
513             assert(0 == e.data_len());
514             /* non-empty error must be handled in handle_apply_error(), while
515              * still in commit monitor. */
516             on_inconsistency();
517         }
518     }
519     /* at this point any other exception is fatal, not catching anything else.*/
520 
521     if (ts.local() == false)
522     {
523         GU_DBUG_SYNC_WAIT("after_commit_slave_sync");
524     }
525 
526     wsrep_seqno_t const safe_to_discard(cert_.set_trx_committed(ts));
527 
528     /* For now need to keep it inside apply monitor to ensure all processing
529      * ends by the time monitors are drained because of potential gcache
530      * cleanup (and loss of the writeset buffer). Perhaps unordered monitor
531      * is needed here. */
532     ts.unordered(recv_ctx, unordered_cb_);
533 
534     apply_monitor_.leave(ao);
535 
536     if (ts.is_toi())
537     {
538         log_debug << "Done executing TO isolated action: "
539                   << ts.global_seqno();
540         st_.mark_safe();
541     }
542 
543     if (gu_likely(ts.local_seqno() != -1))
544     {
545         // trx with local seqno -1 originates from IST (or other source not gcs)
546         report_last_committed(safe_to_discard);
547     }
548 
549     ts.set_exit_loop(exit_loop);
550 }
551 
552 
553 wsrep_status_t galera::ReplicatorSMM::send(TrxHandleMaster& trx,
554                                            wsrep_trx_meta_t* meta)
555 {
556     assert(trx.locked());
557     if (state_() < S_JOINED) return WSREP_TRX_FAIL;
558 
559     // SR rollback
560     const bool rollback(trx.flags() & TrxHandle::F_ROLLBACK);
561 
562     if (rollback)
563     {
564         assert(trx.state() == TrxHandle::S_ABORTING);
565         assert((trx.flags() & TrxHandle::F_BEGIN) == 0);
566         TrxHandleSlavePtr ts(TrxHandleSlave::New(true, slave_pool_),
567                              TrxHandleSlaveDeleter());
568         ts->set_global_seqno(0);
569         trx.add_replicated(ts);
570     }
571 
572     WriteSetNG::GatherVector actv;
573 
574     size_t act_size = trx.gather(actv);
575 
576     ssize_t rcode(0);
577     do
578     {
579         const bool scheduled(!rollback);
580 
581         if (scheduled)
582         {
583             const ssize_t gcs_handle(gcs_.schedule());
584 
585             if (gu_unlikely(gcs_handle < 0))
586             {
587                 log_debug << "gcs schedule " << strerror(-gcs_handle);
588                 rcode = gcs_handle;
589                 goto out;
590             }
591             trx.set_gcs_handle(gcs_handle);
592         }
593 
594         trx.finalize(last_committed());
595         trx.unlock();
596         // On rollback fragment, we instruct sendv to use gcs_sm_grab()
597         // to avoid the scenario where trx is BF aborted but can't send
598         // ROLLBACK fragment due to flow control, which results in
599         // deadlock.
600         // Otherwise sendv call was scheduled above, and we instruct
601         // the call to use regular gcs_sm_enter()
602         const bool grab(rollback);
603         rcode = gcs_.sendv(actv, act_size,
604                            GCS_ACT_WRITESET,
605                            scheduled, grab);
606         GU_DBUG_SYNC_WAIT("after_send_sync");
607         trx.lock();
608     }
609     // TODO: Break loop after some timeout
610     while (rcode == -EAGAIN && (usleep(1000), true));
611 
612     trx.set_gcs_handle(-1);
613 
614 out:
615 
616     if (rcode <= 0)
617     {
618         log_debug << "ReplicatorSMM::send failed: " << -rcode;
619     }
620 
621     return (rcode > 0 ? WSREP_OK : WSREP_TRX_FAIL);
622 }
623 
624 
625 wsrep_status_t galera::ReplicatorSMM::replicate(TrxHandleMaster& trx,
626                                                 wsrep_trx_meta_t* meta)
627 {
628     assert(trx.locked());
629     assert(!(trx.flags() & TrxHandle::F_ROLLBACK));
630     assert(trx.state() == TrxHandle::S_EXECUTING ||
631            trx.state() == TrxHandle::S_MUST_ABORT);
632 
633     if (state_() < S_JOINED || trx.state() == TrxHandle::S_MUST_ABORT)
634     {
635     must_abort:
636         if (trx.state() == TrxHandle::S_EXECUTING ||
637             trx.state() == TrxHandle::S_REPLICATING)
638             TX_SET_STATE(trx, TrxHandle::S_MUST_ABORT);
639 
640         TX_SET_STATE(trx, TrxHandle::S_ABORTING);
641 
642         if (trx.ts() != 0)
643         {
644             assert(trx.ts()->state() == TrxHandle::S_COMMITTED);
645             trx.reset_ts();
646         }
647 
648         return (st_.corrupt() ? WSREP_NODE_FAIL : WSREP_CONN_FAIL);
649     }
650 
651     WriteSetNG::GatherVector actv;
652 
653     gcs_action act;
654     act.type = GCS_ACT_WRITESET;
655 #ifndef NDEBUG
656     act.seqno_g = GCS_SEQNO_ILL;
657 #endif
658 
659     act.buf  = NULL;
660     act.size = trx.gather(actv);
661     TX_SET_STATE(trx, TrxHandle::S_REPLICATING);
662 
663     ssize_t rcode(-1);
664 
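    // Replicate the gathered writeset through GCS, retrying on -EAGAIN
    // (e.g. due to flow control) unless the transaction gets BF aborted
    // in the meantime.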
665     do
666     {
667         assert(act.seqno_g == GCS_SEQNO_ILL);
668 
669         const ssize_t gcs_handle(gcs_.schedule());
670 
671         if (gu_unlikely(gcs_handle < 0))
672         {
673             log_debug << "gcs schedule " << strerror(-gcs_handle);
674             goto must_abort;
675         }
676 
677         trx.set_gcs_handle(gcs_handle);
678 
679         trx.finalize(last_committed());
680         trx.unlock();
681         assert (act.buf == NULL); // just a sanity check
682         rcode = gcs_.replv(actv, act, true);
683 
684         GU_DBUG_SYNC_WAIT("after_replicate_sync")
685         trx.lock();
686     }
687     while (rcode == -EAGAIN && trx.state() != TrxHandle::S_MUST_ABORT &&
688            (usleep(1000), true));
689 
690     trx.set_gcs_handle(-1);
691 
692     if (rcode < 0)
693     {
694         if (rcode != -EINTR)
695         {
696             log_debug << "gcs_repl() failed with " << strerror(-rcode)
697                       << " for trx " << trx;
698         }
699 
700         assert(rcode != -EINTR || trx.state() == TrxHandle::S_MUST_ABORT);
701         assert(act.seqno_l == GCS_SEQNO_ILL && act.seqno_g == GCS_SEQNO_ILL);
702         assert(NULL == act.buf);
703 
704         if (trx.state() != TrxHandle::S_MUST_ABORT)
705         {
706             TX_SET_STATE(trx, TrxHandle::S_MUST_ABORT);
707         }
708 
709         goto must_abort;
710     }
711 
712     assert(act.buf != NULL);
713     assert(act.size == rcode);
714     assert(act.seqno_l > 0);
715     assert(act.seqno_g > 0);
716 
717     TrxHandleSlavePtr ts(TrxHandleSlave::New(true, slave_pool_),
718                          TrxHandleSlaveDeleter());
719 
720     gu_trace(ts->unserialize<true>(act));
721     ts->set_local(true);
722 
723     ts->update_stats(keys_count_, keys_bytes_, data_bytes_, unrd_bytes_);
724 
725     trx.add_replicated(ts);
726 
727     ++replicated_;
728     replicated_bytes_ += rcode;
729 
730     assert(trx.source_id() == ts->source_id());
731     assert(trx.conn_id()   == ts->conn_id());
732     assert(trx.trx_id()    == ts->trx_id());
733 
734     assert(ts->global_seqno() == act.seqno_g);
735     assert(ts->last_seen_seqno() >= 0);
736 
737     assert(trx.ts() == ts);
738 
739     wsrep_status_t retval(WSREP_TRX_FAIL);
740 
741     // ROLLBACK event shortcut to avoid blocking in monitors or
742     // getting BF aborted inside provider
743     if (gu_unlikely(ts->flags() & TrxHandle::F_ROLLBACK))
744     {
745         // ROLLBACK fragments should be replicated through ReplicatorSMM::send(),
746         // assert here in debug builds to catch if this is not the case.
747         assert(0);
748         assert(ts->depends_seqno() > 0); // must be set at unserialization
749         ts->cert_bypass(true);
750         ts->mark_certified();
751 
752         TX_SET_STATE(trx, TrxHandle::S_MUST_ABORT);
753         TX_SET_STATE(trx, TrxHandle::S_ABORTING);
754 
755         pending_cert_queue_.push(ts);
756         cancel_monitors_for_local(*ts);
757 
758         goto out;
759     }
760 
761     if (gu_unlikely(trx.state() == TrxHandle::S_MUST_ABORT))
762     {
763         retval = cert_for_aborted(ts);
764 
765         if (retval != WSREP_BF_ABORT)
766         {
767             assert(trx.state() == TrxHandle::S_MUST_ABORT);
768             TX_SET_STATE(trx, TrxHandle::S_ABORTING);
769 
770             pending_cert_queue_.push(ts);
771             cancel_monitors_for_local(*ts);
772 
773             assert(ts->is_dummy());
774             assert(WSREP_OK != retval);
775         }
776         else
777         {
778             // If the transaction was committing, it must replay.
779             if (ts->flags() & TrxHandle::F_COMMIT)
780             {
781                 TX_SET_STATE(trx, TrxHandle::S_MUST_REPLAY);
782             }
783             else
784             {
785                 TX_SET_STATE(trx, TrxHandle::S_ABORTING);
786 
787                 pending_cert_queue_.push(ts);
788                 cancel_monitors_for_local(*ts);
789 
790                 retval = WSREP_TRX_FAIL;
791             }
792         }
793     }
794     else
795     {
796         assert(trx.state() == TrxHandle::S_REPLICATING);
797         retval = WSREP_OK;
798     }
799 
800 out:
801     assert(trx.state() != TrxHandle::S_MUST_ABORT);
802     assert(ts->global_seqno() >  0);
803     assert(ts->global_seqno() == act.seqno_g);
804 
805     if (meta != 0) // whatever the retval, we must update GTID in meta
806     {
807         meta->gtid.uuid  = state_uuid_;
808         meta->gtid.seqno = ts->global_seqno();
809         meta->depends_on = ts->depends_seqno();
810     }
811 
812     return retval;
813 }
814 
815 wsrep_status_t
816 galera::ReplicatorSMM::abort_trx(TrxHandleMaster& trx, wsrep_seqno_t bf_seqno,
817                                  wsrep_seqno_t* victim_seqno)
818 {
819     assert(trx.local() == true);
820     assert(trx.locked());
821 
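    // Try to brute-force (BF) abort a local transaction on behalf of a
    // higher-priority writeset (bf_seqno). What can be done depends on how
    // far the victim transaction has progressed.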
822     const TrxHandleSlavePtr ts(trx.ts());
823 
824     if (ts)
825     {
826         log_debug << "aborting ts  " << *ts;
827         assert(ts->global_seqno() != WSREP_SEQNO_UNDEFINED);
828         if (ts->global_seqno() < bf_seqno &&
829             (ts->flags() & TrxHandle::F_COMMIT))
830         {
831             log_debug << "seqno " << bf_seqno
832                       << " trying to abort seqno " << ts->global_seqno();
833             *victim_seqno = ts->global_seqno();
834             return WSREP_NOT_ALLOWED;
835         }
836     }
837     else
838     {
839         log_debug << "aborting trx " << trx;
840     }
841 
842     wsrep_status_t retval(WSREP_OK);
843     switch (trx.state())
844     {
845     case TrxHandle::S_MUST_ABORT:
846     case TrxHandle::S_ABORTING:
847     case TrxHandle::S_MUST_REPLAY:
848         // victim trx was already BF aborted or it failed certification
849         retval = WSREP_NOT_ALLOWED;
850         break;
851     case TrxHandle::S_EXECUTING:
852         TX_SET_STATE(trx, TrxHandle::S_MUST_ABORT);
853         break;
854     case TrxHandle::S_REPLICATING:
855     {
856         // @note: it is important to place set_state() at the beginning of
857         // every case, because the state must be changed AFTER the switch()
858         // decision and BEFORE entering monitors or taking any other action.
859         TX_SET_STATE(trx, TrxHandle::S_MUST_ABORT);
860         int rc;
861         if (trx.gcs_handle() > 0 &&
862             ((rc = gcs_.interrupt(trx.gcs_handle()))) != 0)
863         {
864             log_debug << "gcs_interrupt(): handle "
865                       << trx.gcs_handle()
866                       << " trx id " << trx.trx_id()
867                       << ": " << strerror(-rc);
868         }
869         break;
870     }
871     case TrxHandle::S_CERTIFYING:
872     {
873         // trx is waiting in local monitor
874         assert(ts);
875         assert(ts->global_seqno() > 0);
876         log_debug << "aborting ts: " << *ts << "; BF seqno: " << bf_seqno
877                   << "; local position: " << local_monitor_.last_left();
878         TX_SET_STATE(trx, TrxHandle::S_MUST_ABORT);
879         LocalOrder lo(*ts);
880         local_monitor_.interrupt(lo);
881         break;
882     }
883     case TrxHandle::S_APPLYING:
884     {
885         // trx is waiting in apply monitor
886         assert(ts);
887         assert(ts->global_seqno() > 0);
888         log_debug << "aborting ts: " << *ts << "; BF seqno: " << bf_seqno
889                   << "; apply window: " << apply_monitor_.last_left() << " - "
890                   << apply_monitor_.last_entered();
891         TX_SET_STATE(trx, TrxHandle::S_MUST_ABORT);
892         ApplyOrder ao(*ts);
893         apply_monitor_.interrupt(ao);
894         break;
895     }
896     case TrxHandle::S_COMMITTING:
897     {
898         // Trx is waiting in commit monitor
899         assert(ts);
900         assert(ts->global_seqno() > 0);
901         log_debug << "aborting ts: " << *ts << "; BF seqno: " << bf_seqno
902                   << "; commit position: " << last_committed();
903         if (co_mode_ != CommitOrder::BYPASS)
904         {
905             CommitOrder co(*ts, co_mode_);
906             bool const interrupted(commit_monitor_.interrupt(co));
907             if (interrupted || !(ts->flags() & TrxHandle::F_COMMIT))
908             {
909                 TX_SET_STATE(trx, TrxHandle::S_MUST_ABORT);
910             }
911             else
912             {
913                 retval = WSREP_NOT_ALLOWED;
914             }
915         }
916         break;
917     }
918     case TrxHandle::S_COMMITTED:
919         assert(ts);
920         assert(ts->global_seqno() > 0);
921         if (ts->global_seqno() < bf_seqno &&
922             (ts->flags() & TrxHandle::F_COMMIT))
923         {
924             retval = WSREP_NOT_ALLOWED;
925         }
926         else
927         {
928             retval = WSREP_OK;
929         }
930         break;
931     case TrxHandle::S_ROLLING_BACK:
932         log_error << "Attempt to enter commit monitor while holding "
933             "locks in rollback by " << trx;
934         // fallthrough
935     default:
936         log_warn << "invalid state " << trx.state()
937                  << " in abort_trx for trx"
938                  << trx;
939         assert(0);
940     }
941     if (retval == WSREP_OK || retval == WSREP_NOT_ALLOWED)
942     {
943         *victim_seqno = (ts != 0 ? ts->global_seqno() : WSREP_SEQNO_UNDEFINED);
944     }
945     return retval;
946 }
947 
948 
949 wsrep_status_t galera::ReplicatorSMM::certify(TrxHandleMaster&  trx,
950                                               wsrep_trx_meta_t* meta)
951 {
952     assert(trx.state() == TrxHandle::S_REPLICATING);
953 
954     TrxHandleSlavePtr ts(trx.ts());
955     assert(ts->state() == TrxHandle::S_REPLICATING);
956 
957     // Rollback should complete with post_rollback
958     assert((ts->flags() & TrxHandle::F_ROLLBACK) == 0);
959 
960     assert(ts->local_seqno()  > 0);
961     assert(ts->global_seqno() > 0);
962     assert(ts->last_seen_seqno() >= 0);
963     assert(ts->depends_seqno() >= -1);
964 
965     if (meta != 0)
966     {
967         assert(meta->gtid.uuid  == state_uuid_);
968         assert(meta->gtid.seqno == ts->global_seqno());
969         assert(meta->depends_on == ts->depends_seqno());
970     }
971     // State should not be checked here: If trx has been replicated,
972     // it has to be certified and potentially applied. #528
973     // if (state_() < S_JOINED) return WSREP_TRX_FAIL;
974 
975     wsrep_status_t retval(cert_and_catch(&trx, ts));
976 
977     assert((ts->flags() & TrxHandle::F_ROLLBACK) == 0 ||
978            trx.state() == TrxHandle::S_ABORTING);
979 
980     if (gu_unlikely(retval != WSREP_OK))
981     {
982         switch(retval)
983         {
984         case WSREP_BF_ABORT:
985             assert(ts->depends_seqno() >= 0);
986             assert(trx.state() == TrxHandle::S_MUST_REPLAY ||
987                    !(ts->flags() & TrxHandle::F_COMMIT));
988             assert(ts->state() == TrxHandle::S_REPLICATING ||
989                    ts->state() == TrxHandle::S_CERTIFYING);
990             // apply monitor will be entered in due course during replay
991             break;
992         case WSREP_TRX_FAIL:
993             /* committing fragment fails certification or non-committing BF'ed */
994             // If the ts was queued, the depends seqno cannot be trusted
995             // as it may be modified concurrently.
996             assert(ts->queued() || ts->is_dummy() ||
997                    (ts->flags() & TrxHandle::F_COMMIT) == 0);
998             assert(ts->state() == TrxHandle::S_CERTIFYING ||
999                    ts->state() == TrxHandle::S_REPLICATING);
1000             if (ts->state() == TrxHandle::S_REPLICATING)
1001                 TX_SET_STATE(*ts, TrxHandle::S_CERTIFYING);
1002             break;
1003         default:
1004             assert(0);
1005         }
1006 
1007         return retval;
1008     }
1009     else
1010     {
1011         if (meta) meta->depends_on = ts->depends_seqno();
1012         if (enter_apply_monitor_for_local(trx, ts))
1013         {
1014             TX_SET_STATE(*ts, TrxHandle::S_APPLYING);
1015             if (trx.state() == TrxHandle::S_MUST_ABORT)
1016                 return WSREP_BF_ABORT;
1017             else
1018                 return WSREP_OK;
1019         }
1020         else
1021         {
1022             return handle_apply_monitor_interrupted(trx, ts);
1023         }
1024     }
1025 }
1026 
1027 
1028 wsrep_status_t galera::ReplicatorSMM::replay_trx(TrxHandleMaster& trx,
1029                                                  TrxHandleLock& lock,
1030                                                  void* const      trx_ctx)
1031 {
1032     TrxHandleSlavePtr tsp(trx.ts());
1033     assert(tsp);
1034     TrxHandleSlave& ts(*tsp);
1035 
1036     assert(ts.global_seqno() > last_committed());
1037 
1038     log_debug << "replay trx: " << trx << " ts: " << ts;
1039 
1040     if (trx.state() == TrxHandle::S_MUST_ABORT)
1041     {
1042         // BF aborted outside of provider.
1043         TX_SET_STATE(trx, TrxHandle::S_MUST_REPLAY);
1044     }
1045 
1046     assert(trx.state() == TrxHandle::S_MUST_REPLAY);
1047     assert(trx.trx_id() != static_cast<wsrep_trx_id_t>(-1));
1048 
1049     wsrep_status_t retval(WSREP_OK);
1050 
1051     // Note: we submit a NULL trx pointer below to avoid
1052     // interrupting replaying in any monitor during replay.
1053 
1054     switch (ts.state())
1055     {
1056     case TrxHandle::S_REPLICATING:
1057         retval = cert_and_catch(&trx, tsp);
1058         assert(ts.state() == TrxHandle::S_CERTIFYING);
1059         if (retval != WSREP_OK)
1060         {
1061             assert(retval == WSREP_TRX_FAIL);
1062             assert(ts.is_dummy());
1063             break;
1064         }
1065         // fall through
1066     case TrxHandle::S_CERTIFYING:
1067     {
1068         assert(ts.state() == TrxHandle::S_CERTIFYING);
1069 
1070         ApplyOrder ao(ts);
1071         assert(apply_monitor_.entered(ao) == false);
1072         gu_trace(apply_monitor_.enter(ao));
1073         TX_SET_STATE(ts, TrxHandle::S_APPLYING);
1074     }
1075     // fall through
1076     case TrxHandle::S_APPLYING:
1077         //
1078         // Commit monitor will be entered from commit_order_enter_remote.
1079         //
1080         // fall through
1081     case TrxHandle::S_COMMITTING:
1082         ++local_replays_;
1083 
1084         // safety measure to make sure that all preceding trxs are
1085         // ordered for commit before replaying
1086         commit_monitor_.wait(ts.global_seqno() - 1);
1087 
1088         TX_SET_STATE(trx, TrxHandle::S_REPLAYING);
1089         try
1090         {
1091             // Only committing transactions should be replayed
1092             assert(ts.flags() & TrxHandle::F_COMMIT);
1093 
1094             wsrep_trx_meta_t meta = {{ state_uuid_,    ts.global_seqno() },
1095                                      { ts.source_id(), ts.trx_id(),
1096                                        ts.conn_id()                      },
1097                                      ts.depends_seqno()};
1098 
1099             /* failure to replay own trx is certainly a sign of inconsistency,
1100              * not trying to catch anything here */
1101             assert(trx.owned());
1102             bool unused(false);
1103             lock.unlock();
1104             gu_trace(ts.apply(trx_ctx, apply_cb_, meta, unused));
1105             lock.lock();
1106             assert(false == unused);
1107             log_debug << "replayed " << ts.global_seqno();
1108             assert(ts.state() == TrxHandle::S_COMMITTED);
1109             assert(trx.state() == TrxHandle::S_COMMITTED);
1110         }
1111         catch (gu::Exception& e)
1112         {
1113             on_inconsistency();
1114             return WSREP_NODE_FAIL;
1115         }
1116 
1117         // apply, commit monitors are released in post commit
1118         return WSREP_OK;
1119     default:
1120         assert(0);
1121         gu_throw_fatal << "Invalid state in replay for trx " << trx;
1122     }
1123 
1124     log_debug << "replaying failed for trx " << trx;
1125     assert(trx.state() == TrxHandle::S_ABORTING);
1126 
1127     return retval;
1128 }
1129 
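// Dump a buffer to the stream as text, stopping at the first NUL byte and
// escaping non-printable characters as octal escape sequences.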
1130 static void
1131 dump_buf(std::ostream& os, const void* const buf, size_t const buf_len)
1132 {
1133     std::ios_base::fmtflags const saved_flags(os.flags());
1134     char                    const saved_fill (os.fill('0'));
1135 
1136     os << std::oct;
1137 
1138     const char* const str(static_cast<const char*>(buf));
1139     for (size_t i(0); i < buf_len; ++i)
1140     {
1141         char const c(str[i]);
1142 
1143         if ('\0' == c) break;
1144 
1145         try
1146         {
1147             if (isprint(c) || isspace(c))
1148             {
1149                 os.put(c);
1150             }
1151             else
1152             {
1153                 os << '\\' << std::setw(2) << int(c);
1154             }
1155         }
1156         catch (std::ios_base::failure& f)
1157         {
1158             log_warn << "Failed to dump " << i << "th byte: " << f.what();
1159             break;
1160         }
1161     }
1162 
1163     os.flags(saved_flags);
1164     os.fill (saved_fill);
1165 }
1166 
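// Decide the outcome of a BF abort detected while entering the commit
// monitor: a committing transaction must replay, while a non-committing
// (streaming) fragment fails and rolls back.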
1167 wsrep_status_t
1168 galera::ReplicatorSMM::handle_commit_interrupt(TrxHandleMaster& trx,
1169                                                const TrxHandleSlave& ts)
1170 {
1171     assert(trx.state() == TrxHandle::S_MUST_ABORT);
1172 
1173     if (ts.flags() & TrxHandle::F_COMMIT)
1174     {
1175         TX_SET_STATE(trx, TrxHandle::S_MUST_REPLAY);
1176         return WSREP_BF_ABORT;
1177     }
1178     else
1179     {
1180         TX_SET_STATE(trx, TrxHandle::S_ABORTING);
1181         return WSREP_TRX_FAIL;
1182     }
1183 }
1184 
1185 wsrep_status_t
1186 galera::ReplicatorSMM::commit_order_enter_local(TrxHandleMaster& trx)
1187 {
1188     assert(trx.local());
1189     assert(trx.ts() && trx.ts()->global_seqno() > 0);
1190     assert(trx.locked());
1191 
1192     assert(trx.state() == TrxHandle::S_APPLYING  ||
1193            trx.state() == TrxHandle::S_ABORTING  ||
1194            trx.state() == TrxHandle::S_REPLAYING);
1195 
1196     TrxHandleSlavePtr tsp(trx.ts());
1197     TrxHandleSlave& ts(*tsp);
1198 
1199     if (trx.state() != TrxHandle::S_APPLYING)
1200     {
1201         // Transactions which are rolling back or replaying
1202         // may not have grabbed apply monitor so far. Do it
1203         // before proceeding.
1204         enter_apply_monitor_for_local_not_committing(trx, ts);
1205     }
1206 #ifndef NDEBUG
1207     {
1208         ApplyOrder ao(ts);
1209         assert(apply_monitor_.entered(ao));
1210     }
1211 #endif // NDEBUG
1212 
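    // Aborting transactions proceed to an ordered rollback, everything else
    // to an ordered commit.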
1213     TrxHandle::State const next_state
1214         (trx.state() == TrxHandle::S_ABORTING ?
1215          TrxHandle::S_ROLLING_BACK : TrxHandle::S_COMMITTING);
1216 
1217     TX_SET_STATE(trx, next_state);
1218 
1219     if (co_mode_ == CommitOrder::BYPASS)
1220     {
1221         TX_SET_STATE(ts, TrxHandle::S_COMMITTING);
1222         return WSREP_OK;
1223     }
1224 
1225     CommitOrder co(ts, co_mode_);
1226     if (ts.state() < TrxHandle::S_COMMITTING)
1227     {
1228         assert(!commit_monitor_.entered(co));
1229     }
1230     else
1231     {
1232         // was BF'ed after having entered commit monitor. This may happen
1233         // for SR fragment.
1234         assert(commit_monitor_.entered(co));
1235         return WSREP_OK;
1236     }
1237 
1238     try
1239     {
1240         trx.unlock();
1241         GU_DBUG_SYNC_WAIT("before_local_commit_monitor_enter");
1242         gu_trace(commit_monitor_.enter(co));
1243         assert(commit_monitor_.entered(co));
1244         trx.lock();
1245 
1246         TX_SET_STATE(ts, TrxHandle::S_COMMITTING);
1247 
1248         /* non-committing fragments may be interrupted after having entered
1249          * commit_monitor_ */
1250         if (0 == (ts.flags() & TrxHandle::F_COMMIT) &&
1251             trx.state() == TrxHandle::S_MUST_ABORT)
1252             return handle_commit_interrupt(trx, ts);
1253 
1254         assert(trx.state() == TrxHandle::S_COMMITTING ||
1255                trx.state() == TrxHandle::S_ROLLING_BACK);
1256 
1257     }
1258     catch (gu::Exception& e)
1259     {
1260         assert(!commit_monitor_.entered(co));
1261         assert(next_state != TrxHandle::S_ROLLING_BACK);
1262         trx.lock();
1263         if (e.get_errno() == EINTR)
1264         {
1265             return handle_commit_interrupt(trx, ts);
1266         }
1267         else throw;
1268     }
1269 
1270     assert(ts.global_seqno() > last_committed());
1271     assert(trx.locked());
1272 
1273     assert(trx.state() == TrxHandle::S_COMMITTING ||
1274            trx.state() == TrxHandle::S_ROLLING_BACK);
1275 
1276     return WSREP_OK;
1277 }
1278 
1279 wsrep_status_t
1280 galera::ReplicatorSMM::commit_order_enter_remote(TrxHandleSlave& trx)
1281 {
1282     assert(trx.global_seqno() > 0);
1283     assert(trx.state() == TrxHandle::S_APPLYING  ||
1284            trx.state() == TrxHandle::S_ABORTING);
1285 
1286 #ifndef NDEBUG
1287     if (trx.state() == TrxHandle::S_REPLAYING)
1288     {
1289         assert(trx.local());
1290         assert((trx.flags() & TrxHandle::F_ROLLBACK) == 0);
1291 
1292         ApplyOrder ao(trx);
1293         assert(apply_monitor_.entered(ao));
1294     }
1295 #endif /* NDEBUG */
1296 
1297     CommitOrder co(trx, co_mode_);
1298 
1299     assert(!commit_monitor_.entered(co));
1300 
1301     if (gu_likely(co_mode_ != CommitOrder::BYPASS))
1302     {
1303         gu_trace(commit_monitor_.enter(co));
1304     }
1305 
1306     TX_SET_STATE(trx, TrxHandle::S_COMMITTING);
1307 
1308     return WSREP_OK;
1309 }
1310 
1311 void galera::ReplicatorSMM::process_apply_error(TrxHandleSlave& trx,
1312                                                 const wsrep_buf_t& error)
1313 {
1314     gu::GTID const gtid(state_uuid_, trx.global_seqno());
1315     int res;
1316 
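    /* Only writesets ordered through GCS (or NBO end events) are voted on;
     * anything else (local_seqno == -1, e.g. coming from IST) is treated as
     * an immediate inconsistency below (res = 2). */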
1317     if (trx.local_seqno() != -1 || trx.nbo_end())
1318     {
1319         /* this must be done IN ORDER to avoid multiple elections, hence
1320          * anything else but LOCAL_OOOC and NO_OOOC is potentially broken */
1321         res = gcs_.vote(gtid, -1, error.ptr, error.len);
1322     }
1323     else res = 2;
1324 
1325     if (res != 0)
1326     {
1327         std::ostringstream os;
1328 
1329         switch (res)
1330         {
1331         case 2:
1332             os << "Failed on preordered " << gtid << ": inconsistency.";
1333             break;
1334         case 1:
1335             os << "Inconsistent by consensus on " << gtid;
1336             break;
1337         default:
1338             os << "Could not reach consensus on " << gtid
1339                << " (rcode: " << res << "), assuming inconsistency.";
1340         }
1341 
1342         galera::ApplyException ae(os.str(), NULL, error.ptr, error.len);
1343         GU_TRACE(ae);
1344         throw ae;
1345     }
1346     else
1347     {
1348         /* mark action as invalid (skip seqno) and return normally */
1349         gcache_.seqno_skip(trx.action().first,
1350                            trx.global_seqno(), GCS_ACT_WRITESET);
1351     }
1352 }
1353 
1354 wsrep_status_t
1355 galera::ReplicatorSMM::handle_apply_error(TrxHandleSlave&    ts,
1356                                           const wsrep_buf_t& error,
1357                                           const std::string& custom_msg)
1358 {
1359     assert(error.len > 0);
1360 
1361     std::ostringstream os;
1362 
1363     os << custom_msg << ts.global_seqno() << ", error: ";
1364     dump_buf(os, error.ptr, error.len);
1365     log_debug << "handle_apply_error(): " << os.str();
1366 
1367     try
1368     {
1369         if (!st_.corrupt())
1370             gu_trace(process_apply_error(ts, error));
1371         return WSREP_OK;
1372     }
1373     catch (ApplyException& e)
1374     {
1375         log_error << "Inconsistency detected: " << e.what();
1376         on_inconsistency();
1377     }
1378     catch (gu::Exception& e)
1379     {
1380         log_error << "Unexpected exception: " << e.what();
1381         assert(0);
1382         abort();
1383     }
1384     catch (...)
1385     {
1386         log_error << "Unknown exception";
1387         assert(0);
1388         abort();
1389     }
1390 
1391     return WSREP_NODE_FAIL;
1392 }
1393 
1394 wsrep_status_t
1395 galera::ReplicatorSMM::commit_order_leave(TrxHandleSlave&          ts,
1396                                           const wsrep_buf_t* const error)
1397 {
1398     assert(ts.state() == TrxHandle::S_COMMITTING);
1399 
1400 #ifndef NDEBUG
1401     {
1402         CommitOrder co(ts, co_mode_);
1403         assert(co_mode_ != CommitOrder::BYPASS || commit_monitor_.entered(co));
1404     }
1405 #endif
1406 
1407     wsrep_status_t retval(WSREP_OK);
1408 
1409     if (gu_unlikely(error != NULL && error->ptr != NULL))
1410     {
1411         retval = handle_apply_error(ts, *error, "Failed to apply writeset ");
1412     }
1413 
1414     if (gu_likely(co_mode_ != CommitOrder::BYPASS))
1415     {
1416         CommitOrder co(ts, co_mode_);
1417         commit_monitor_.leave(co);
1418     }
1419 
1420     TX_SET_STATE(ts, TrxHandle::S_COMMITTED);
1421     /* master state will be set upon release */
1422 
1423     return retval;
1424 }
1425 
1426 wsrep_status_t galera::ReplicatorSMM::release_commit(TrxHandleMaster& trx)
1427 {
1428     TrxHandleSlavePtr tsp(trx.ts());
1429     assert(tsp);
1430     TrxHandleSlave& ts(*tsp);
1431 
1432 #ifndef NDEBUG
1433     {
1434         CommitOrder co(ts, co_mode_);
1435         assert(co_mode_ == CommitOrder::BYPASS ||
1436                commit_monitor_.entered(co) == false);
1437     }
1438 #endif
1439 
1440     log_debug << "release_commit() for trx: " << trx << " ts: " << ts;
1441 
1442     assert((ts.flags() & TrxHandle::F_ROLLBACK) == 0);
1443     assert(ts.local_seqno() > 0 && ts.global_seqno() > 0);
1444     assert(ts.state() == TrxHandle::S_COMMITTED);
1445     // Streaming transaction may enter here in aborting state if the
1446     // BF abort happens during fragment commit ordering. Otherwise
1447     // should always be committed.
1448     assert(trx.state() == TrxHandle::S_COMMITTED ||
1449            (trx.state() == TrxHandle::S_ABORTING &&
1450             (ts.flags() & TrxHandle::F_COMMIT) == 0));
1451     assert(!ts.is_committed());
1452 
1453     wsrep_seqno_t const safe_to_discard(cert_.set_trx_committed(ts));
1454 
1455     ApplyOrder ao(ts);
1456     apply_monitor_.leave(ao);
1457 
1458     if ((ts.flags() & TrxHandle::F_COMMIT) == 0 &&
1459         trx.state() == TrxHandle::S_COMMITTED)
1460     {
1461         // continue streaming
1462         TX_SET_STATE(trx, TrxHandle::S_EXECUTING);
1463     }
1464 
1465     trx.reset_ts();
1466 
1467     ++local_commits_;
1468 
1469     report_last_committed(safe_to_discard);
1470 
1471     return WSREP_OK;
1472 }
1473 
1474 
1475 wsrep_status_t galera::ReplicatorSMM::release_rollback(TrxHandleMaster& trx)
1476 {
1477     assert(trx.locked());
1478 
1479     if (trx.state() == TrxHandle::S_MUST_ABORT) // BF abort before replication
1480         TX_SET_STATE(trx, TrxHandle::S_ABORTING);
1481 
1482     if (trx.state() == TrxHandle::S_ABORTING ||
1483         trx.state() == TrxHandle::S_EXECUTING)
1484         TX_SET_STATE(trx, TrxHandle::S_ROLLED_BACK);
1485 
1486     assert(trx.state() == TrxHandle::S_ROLLED_BACK);
1487 
1488     TrxHandleSlavePtr tsp(trx.ts());
1489     if (tsp)
1490     {
1491         TrxHandleSlave& ts(*tsp);
1492         log_debug << "release_rollback() trx: " << trx << ", ts: " << ts;
1493         assert(ts.global_seqno() > 0);
1494         if (ts.global_seqno() > 0)
1495         {
1496             ApplyOrder ao(ts.global_seqno(), 0, ts.local());
1497 
1498             // Enter and leave monitors if they were not entered/canceled
1499             // already.
1500             if (ts.state() < TrxHandle::S_COMMITTED)
1501             {
1502                 if (ts.state() < TrxHandle::S_CERTIFYING)
1503                 {
1504                     TX_SET_STATE(ts, TrxHandle::S_CERTIFYING);
1505                 }
1506                 if (ts.state() < TrxHandle::S_APPLYING)
1507                 {
1508                     apply_monitor_.enter(ao);
1509                     TX_SET_STATE(ts, TrxHandle::S_APPLYING);
1510                 }
1511                 CommitOrder co(ts, co_mode_);
1512                 if (ts.state() < TrxHandle::S_COMMITTING)
1513                 {
1514                     commit_monitor_.enter(co);
1515                     TX_SET_STATE(ts, TrxHandle::S_COMMITTING);
1516                 }
1517                 commit_monitor_.leave(co);
1518                 assert(commit_monitor_.last_left() >= ts.global_seqno());
1519                 TX_SET_STATE(ts, TrxHandle::S_COMMITTED);
1520             }
1521 
1522             /* Queued transactions will be set committed in the queue */
1523             wsrep_seqno_t const safe_to_discard
1524                 (ts.queued() ?
1525                  WSREP_SEQNO_UNDEFINED : cert_.set_trx_committed(ts));
1526             apply_monitor_.leave(ao);
1527             if (safe_to_discard != WSREP_SEQNO_UNDEFINED)
1528                 report_last_committed(safe_to_discard);
1529         }
1530     }
1531     else
1532     {
1533         log_debug << "release_rollback() trx: " << trx << ", ts: nil";
1534     }
1535 
1536     // Trx was either rolled back by user or via certification failure,
1537     // last committed report not needed since cert index state didn't change.
1538     // report_last_committed();
1539 
1540     trx.reset_ts();
1541     ++local_rollbacks_;
1542 
1543     return WSREP_OK;
1544 }
1545 
1546 
1547 wsrep_status_t galera::ReplicatorSMM::sync_wait(wsrep_gtid_t* upto,
1548                                                 int           tout,
1549                                                 wsrep_gtid_t* gtid)
1550 {
1551     gu::GTID wait_gtid;
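    // Use the configured causal read timeout when the caller does not give
    // one (tout == -1), otherwise the caller-supplied timeout in seconds.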
1552     gu::datetime::Date wait_until(gu::datetime::Date::calendar() +
1553                                   ((tout == -1) ?
1554                                    gu::datetime::Period(causal_read_timeout_) :
1555                                    gu::datetime::Period(tout * gu::datetime::Sec)));
1556 
1557     if (upto == 0)
1558     {
1559         try
1560         {
1561             gcs_.caused(wait_gtid, wait_until);
1562         }
1563         catch (gu::Exception& e)
1564         {
1565             log_warn << "gcs_caused() returned " << -e.get_errno()
1566                      << " (" << strerror(e.get_errno()) << ")";
1567             return WSREP_TRX_FAIL;
1568         }
1569     }
1570     else
1571     {
1572         wait_gtid.set(upto->uuid, upto->seqno);
1573     }
1574 
1575     try
1576     {
1577         // @note: Using timed wait for monitor is currently a hack
1578         // to avoid deadlock resulting from race between monitor wait
1579         // and drain during configuration change. Instead of this,
1580         // monitor should have proper mechanism to interrupt waiters
1581         // at monitor drain and disallowing further waits until
1582         // configuration change related operations (SST etc) have been
1583         // finished.
1584 
1585         // Note: Since wsrep API 26 application may request release of
1586         // commit monitor before the commit actually happens (commit
1587         // may have been ordered/queued on application side for later
1588         // processing). Therefore we now rely on apply_monitor on sync
1589         // wait. This is sufficient since apply_monitor is always released
1590         // only after the whole transaction is over.
1591         apply_monitor_.wait(wait_gtid, wait_until);
1592 
1593         if (gtid != 0)
1594         {
1595             (void)last_committed_id(gtid);
1596         }
1597         ++causal_reads_;
1598         return WSREP_OK;
1599     }
1600     catch (gu::NotFound& e)
1601     {
1602         log_debug << "monitor wait failed for sync_wait: UUID mismatch";
1603         return WSREP_TRX_MISSING;
1604     }
1605     catch (gu::Exception& e)
1606     {
1607         log_debug << "monitor wait failed for sync_wait: " << e.what();
1608         return WSREP_TRX_FAIL;
1609     }
1610 }
1611 
1612 
1613 wsrep_status_t galera::ReplicatorSMM::last_committed_id(wsrep_gtid_t* gtid) const
1614 {
1615     // Note that we need to use apply monitor to determine last committed
1616     // here. Due to group commit implementation, the commit monitor may
1617     // be released before the commit has finished and the changes
1618     // made by the transaction have become visible. Therefore we rely
1619     // on apply monitor since it remains grabbed until the whole
1620     // commit is over.
1621     apply_monitor_.last_left_gtid(*gtid);
1622     return WSREP_OK;
1623 }
1624 
1625 
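// Non-blocking operation (NBO) end handling. The master side replicates an
// NBO-end writeset and then waits for the matching slave handle to arrive
// through the NBO context that was created when the operation started. Once
// it arrives, apply and commit monitors are entered on behalf of the ending
// operation so that to_isolation_end() can later release them in order. If
// the wait is aborted by a view change, the end message is resent.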
1626 wsrep_status_t galera::ReplicatorSMM::wait_nbo_end(TrxHandleMaster* trx,
1627                                                    wsrep_trx_meta_t* meta)
1628 {
1629     gu::shared_ptr<NBOCtx>::type
1630         nbo_ctx(cert_.nbo_ctx(meta->gtid.seqno));
1631 
1632     // Send end message
1633     trx->set_state(TrxHandle::S_REPLICATING);
1634 
1635     WriteSetNG::GatherVector actv;
1636     size_t const actv_size(
1637         trx->write_set_out().gather(trx->source_id(),
1638                                     trx->conn_id(),
1639                                     trx->trx_id(),
1640                                     actv));
1641   resend:
1642     wsrep_seqno_t lc(last_committed());
1643     if (lc == WSREP_SEQNO_UNDEFINED)
1644     {
1645         // Provider has been closed
1646         return WSREP_NODE_FAIL;
1647     }
1648     trx->finalize(lc);
1649 
1650     trx->unlock();
1651     int err(gcs_.sendv(actv, actv_size, GCS_ACT_WRITESET, false, false));
1652     trx->lock();
1653 
1654     if (err == -EAGAIN || err == -ENOTCONN || err == -EINTR)
1655     {
1656         // Send was either interrupted due to state exchange (EAGAIN),
1657         // due to non-prim (ENOTCONN) or due to timeout in send monitor
1658         // (EINTR).
1659         return WSREP_CONN_FAIL;
1660     }
1661     else if (err < 0)
1662     {
1663         log_error << "Failed to send NBO-end: " << err << ": "
1664                   << ::strerror(-err);
1665         return WSREP_NODE_FAIL;
1666     }
1667 
1668     TrxHandleSlavePtr end_ts;
1669     while ((end_ts = nbo_ctx->wait_ts()) == 0)
1670     {
1671         if (closing_ || state_() == S_CLOSED)
1672         {
1673             log_error << "Closing during nonblocking operation. "
1674                 "Node will be left in inconsistent state and must be "
1675                 "re-initialized either by full SST or from backup.";
1676             return WSREP_FATAL;
1677         }
1678 
1679         if (nbo_ctx->aborted())
1680         {
1681             log_debug << "NBO wait aborted, retrying send";
1682             // Wait was aborted by view change, resend message
1683             goto resend;
1684         }
1685     }
1686 
1687     assert(end_ts->ends_nbo() != WSREP_SEQNO_UNDEFINED);
1688 
1689     trx->add_replicated(end_ts);
1690 
1691     meta->gtid.uuid  = state_uuid_;
1692     meta->gtid.seqno = end_ts->global_seqno();
1693     meta->depends_on = end_ts->depends_seqno();
1694 
1695     ApplyOrder ao(*end_ts);
1696     apply_monitor_.enter(ao);
1697 
1698     CommitOrder co(*end_ts, co_mode_);
1699     if (co_mode_ != CommitOrder::BYPASS)
1700     {
1701         commit_monitor_.enter(co);
1702     }
1703     end_ts->set_state(TrxHandle::S_APPLYING);
1704     end_ts->set_state(TrxHandle::S_COMMITTING);
1705 
1706     trx->set_state(TrxHandle::S_CERTIFYING);
1707     trx->set_state(TrxHandle::S_APPLYING);
1708     trx->set_state(TrxHandle::S_COMMITTING);
1709 
1710     // Unref
1711     cert_.erase_nbo_ctx(end_ts->ends_nbo());
1712 
1713     return WSREP_OK;
1714 }
1715 
1716 
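// Total Order Isolation (TOI) bracket, as implemented below: the replicated
// TOI writeset is certified and the apply/commit monitors are entered in
// to_isolation_begin(); the application then executes the isolated action
// (e.g. DDL); finally to_isolation_end() releases the monitors and reports
// the safe-to-discard seqno. A rough, hypothetical caller-side sketch:
//
//   if (to_isolation_begin(trx, &meta) == WSREP_OK)
//   {
//       /* execute the action in total order */
//   }
//   to_isolation_end(trx, err); // err != NULL reports an execution error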
1717 wsrep_status_t galera::ReplicatorSMM::to_isolation_begin(TrxHandleMaster&  trx,
1718                                                          wsrep_trx_meta_t* meta)
1719 {
1720     assert(trx.locked());
1721 
1722     if (trx.nbo_end())
1723     {
1724         return wait_nbo_end(&trx, meta);
1725     }
1726 
1727     TrxHandleSlavePtr ts_ptr(trx.ts());
1728     TrxHandleSlave& ts(*ts_ptr);
1729 
1730     if (meta != 0)
1731     {
1732         assert(meta->gtid.seqno > 0);
1733         assert(meta->gtid.seqno == ts.global_seqno());
1734         assert(meta->depends_on == ts.depends_seqno());
1735     }
1736 
1737     assert(trx.state() == TrxHandle::S_REPLICATING);
1738     assert(trx.trx_id() == static_cast<wsrep_trx_id_t>(-1));
1739     assert(ts.local_seqno() > -1 && ts.global_seqno() > -1);
1740     assert(ts.global_seqno() > last_committed());
1741 
1742     CommitOrder co(ts, co_mode_);
1743     wsrep_status_t const retval(cert_and_catch(&trx, ts_ptr));
1744 
1745     ApplyOrder ao(ts);
1746     gu_trace(apply_monitor_.enter(ao));
1747 
1748     switch (retval)
1749     {
1750     case WSREP_OK:
1751     {
1752         TX_SET_STATE(trx, TrxHandle::S_APPLYING);
1753         TX_SET_STATE(ts, TrxHandle::S_APPLYING);
1754         TX_SET_STATE(trx, TrxHandle::S_COMMITTING);
1755         TX_SET_STATE(ts, TrxHandle::S_COMMITTING);
1756         break;
1757     }
1758     case WSREP_TRX_FAIL:
1759         break;
1760     default:
1761         assert(0);
1762         gu_throw_fatal << "unrecognized retval " << retval
1763                        << " for to isolation certification for " << ts;
1764         break;
1765     }
1766 
1767     if (co_mode_ != CommitOrder::BYPASS)
1768         try
1769         {
1770             commit_monitor_.enter(co);
1771 
1772             if (ts.state() == TrxHandle::S_COMMITTING)
1773             {
1774                 log_debug << "Executing TO isolated action: " << ts;
1775                 st_.mark_unsafe();
1776             }
1777             else
1778             {
1779                 log_debug << "Grabbed TO for failed isolated action: " << ts;
1780                 assert(trx.state() == TrxHandle::S_ABORTING);
1781             }
1782         }
1783         catch (...)
1784         {
1785             gu_throw_fatal << "unable to enter commit monitor: " << ts;
1786         }
1787 
1788     return retval;
1789 }
1790 
1791 
1792 wsrep_status_t
1793 galera::ReplicatorSMM::to_isolation_end(TrxHandleMaster&         trx,
1794                                         const wsrep_buf_t* const err)
1795 {
1796     TrxHandleSlavePtr ts_ptr(trx.ts());
1797     TrxHandleSlave& ts(*ts_ptr);
1798 
1799     log_debug << "Done executing TO isolated action: " << ts;
1800 
1801     assert(trx.state() == TrxHandle::S_COMMITTING ||
1802            trx.state() == TrxHandle::S_ABORTING);
1803     assert(ts.state() == TrxHandle::S_COMMITTING ||
1804            ts.state() == TrxHandle::S_CERTIFYING);
1805 
1806     wsrep_status_t ret(WSREP_OK);
1807     if (NULL != err && NULL != err->ptr)
1808     {
1809         ret = handle_apply_error(ts, *err, "Failed to execute TOI action ");
1810     }
1811 
1812     CommitOrder co(ts, co_mode_);
1813     if (co_mode_ != CommitOrder::BYPASS) commit_monitor_.leave(co);
1814 
1815     wsrep_seqno_t const safe_to_discard(cert_.set_trx_committed(ts));
1816 
1817     ApplyOrder ao(ts);
1818     apply_monitor_.leave(ao);
1819 
1820     if (ts.state() == TrxHandle::S_COMMITTING)
1821     {
1822         assert(trx.state() == TrxHandle::S_COMMITTING);
1823         TX_SET_STATE(trx, TrxHandle::S_COMMITTED);
1824         TX_SET_STATE(ts, TrxHandle::S_COMMITTED);
1825 
1826         if (trx.nbo_start() == false) st_.mark_safe();
1827     }
1828     else
1829     {
1830         assert(trx.state() == TrxHandle::S_ABORTING);
1831         assert(ts.state() == TrxHandle::S_CERTIFYING);
1832         TX_SET_STATE(trx, TrxHandle::S_ROLLED_BACK);
1833         TX_SET_STATE(ts, TrxHandle::S_APPLYING);
1834         TX_SET_STATE(ts, TrxHandle::S_COMMITTING);
1835         TX_SET_STATE(ts, TrxHandle::S_COMMITTED);
1836     }
1837 
1838     report_last_committed(safe_to_discard);
1839 
1840     return ret;
1841 }
1842 
1843 
1844 namespace galera
1845 {
1846 
1847 static WriteSetOut*
1848 writeset_from_handle (wsrep_po_handle_t&             handle,
1849                       const TrxHandleMaster::Params& trx_params)
1850 {
1851     WriteSetOut* ret = static_cast<WriteSetOut*>(handle.opaque);
1852 
1853     if (NULL == ret)
1854     {
1855         try
1856         {
1857             ret = new WriteSetOut(
1858 //                gu::String<256>(trx_params.working_dir_) << '/' << &handle,
1859                 trx_params.working_dir_, wsrep_trx_id_t(&handle),
1860                 /* key format is not essential since we're not adding keys */
1861                 KeySet::version(trx_params.key_format_), NULL, 0, 0,
1862                 trx_params.record_set_ver_,
1863                 WriteSetNG::MAX_VERSION, DataSet::MAX_VERSION, DataSet::MAX_VERSION,
1864                 trx_params.max_write_set_size_);
1865 
1866             handle.opaque = ret;
1867         }
1868         catch (std::bad_alloc& ba)
1869         {
1870             gu_throw_error(ENOMEM) << "Could not create WriteSetOut";
1871         }
1872     }
1873 
1874     return ret;
1875 }
1876 
1877 } /* namespace galera */
1878 
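// Preordered (externally ordered) writeset API. preordered_collect() below
// accumulates data buffers into a WriteSetOut attached to the handle, and
// preordered_commit() either replicates the collected writeset or drops it.
// A hypothetical driving sequence (buffers and flags are illustrative only):
//
//   wsrep_po_handle_t handle = { NULL };          // opaque starts as NULL
//   wsrep_buf_t buf = { event_ptr, event_len };
//   preordered_collect(handle, &buf, 1, true);    // may be called repeatedly
//   preordered_commit(handle, source_uuid, flags, pa_range, true);
//   // ... or preordered_commit(handle, ..., false) to discard the data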
1879 wsrep_status_t
1880 galera::ReplicatorSMM::preordered_collect(wsrep_po_handle_t&            handle,
1881                                           const struct wsrep_buf* const data,
1882                                           size_t                  const count,
1883                                           bool                    const copy)
1884 {
1885     WriteSetOut* const ws(writeset_from_handle(handle, trx_params_));
1886 
1887     for (size_t i(0); i < count; ++i)
1888     {
1889         ws->append_data(data[i].ptr, data[i].len, copy);
1890     }
1891 
1892     return WSREP_OK;
1893 }
1894 
1895 
1896 wsrep_status_t
1897 galera::ReplicatorSMM::preordered_commit(wsrep_po_handle_t&         handle,
1898                                          const wsrep_uuid_t&        source,
1899                                          uint64_t             const flags,
1900                                          int                  const pa_range,
1901                                          bool                 const commit)
1902 {
1903     WriteSetOut* const ws(writeset_from_handle(handle, trx_params_));
1904 
1905     if (gu_likely(true == commit))
1906     {
1907         assert(source != WSREP_UUID_UNDEFINED);
1908 
1909         ws->set_flags (WriteSetNG::wsrep_flags_to_ws_flags(flags) |
1910                        WriteSetNG::F_PREORDERED);
1911 
1912         /* by looking at trx_id we should be able to detect gaps / lost events
1913          * (however resending is not implemented yet). Something like
1914          *
1915          * wsrep_trx_id_t const trx_id(cert_.append_preordered(source, ws));
1916          *
1917          * begs to be here. */
1918         wsrep_trx_id_t const trx_id(preordered_id_.add_and_fetch(1));
1919 
1920         WriteSetNG::GatherVector actv;
1921 
1922         size_t const actv_size(ws->gather(source, 0, trx_id, actv));
1923 
1924         ws->finalize_preordered(pa_range); // also adds checksum
1925 
1926         int rcode;
1927         do
1928         {
1929             rcode = gcs_.sendv(actv, actv_size, GCS_ACT_WRITESET, false, false);
1930         }
1931         while (rcode == -EAGAIN && (usleep(1000), true));
1932 
1933         if (rcode < 0)
1934             gu_throw_error(-rcode)
1935                 << "Replication of preordered writeset failed.";
1936     }
1937 
1938     delete ws; // cleanup regardless of commit flag
1939 
1940     handle.opaque = NULL;
1941 
1942     return WSREP_OK;
1943 }
1944 
1945 
1946 wsrep_status_t
1947 galera::ReplicatorSMM::sst_sent(const wsrep_gtid_t& state_id, int rcode)
1948 {
1949     assert (rcode <= 0);
1950     assert (rcode == 0 || state_id.seqno == WSREP_SEQNO_UNDEFINED);
1951     assert (rcode != 0 || state_id.seqno >= 0);
1952 
1953     if (state_() != S_DONOR)
1954     {
1955         log_error << "sst sent called when not SST donor, state " << state_();
1956         return WSREP_CONN_FAIL;
1957     }
1958 
1959     if (state_id.uuid != state_uuid_ && rcode >= 0)
1960     {
1961         // state we have sent no longer corresponds to the current group state
1962         // mark an error
1963         rcode = -EREMCHG;
1964     }
1965 
1966     try {
1967         if (rcode == 0)
1968             gcs_.join(gu::GTID(state_id.uuid, state_id.seqno), rcode);
1969         else
1970             /* stamp error message with the current state */
1971             gcs_.join(gu::GTID(state_uuid_, commit_monitor_.last_left()), rcode);
1972 
1973         return WSREP_OK;
1974     }
1975     catch (gu::Exception& e)
1976     {
1977         log_error << "failed to recover from DONOR state: " << e.what();
1978         return WSREP_CONN_FAIL;
1979     }
1980 }
1981 
1982 // Checks if the seqno has already been assigned to a gcache buffer.
1983 // If yes, discard the old buffer and use the one assigned in IST.
1984 // This is required so that the correct gcache buffer is associated
1985 // with certification index entries.
1986 galera::TrxHandleSlavePtr
1987 galera::ReplicatorSMM::get_real_ts_with_gcache_buffer(
1988     const TrxHandleSlavePtr& ts)
1989 {
1990     try
1991     {
1992         ssize_t size;
1993         const void* buf(gcache_.seqno_get_ptr(ts->global_seqno(), size));
1994         // GCache seqno_get_ptr() did not throw, so there was a matching
1995         // entry in GCache. Construct a new TrxHandleSlavePtr from
1996         // existing gcache buffer and discard the old one.
1997         TrxHandleSlavePtr ret(TrxHandleSlave::New(false, slave_pool_),
1998                               TrxHandleSlaveDeleter());
1999         if (size > 0)
2000         {
2001             gu_trace(ret->unserialize<false>(
2002                          gcs_action{ts->global_seqno(), WSREP_SEQNO_UNDEFINED,
2003                                  buf, int32_t(size), GCS_ACT_WRITESET}));
2004             ret->set_local(false);
2005             assert(ret->global_seqno() == ts->global_seqno());
2006             assert(ret->depends_seqno() >= 0 || ts->nbo_end());
2007             assert(ret->action().first && ret->action().second);
2008             ret->verify_checksum();
2009         }
2010         else
2011         {
2012             ret->set_global_seqno(ts->global_seqno());
2013             ret->mark_dummy_with_action(buf);
2014         }
2015 
2016         // The bufs should never match as the seqno should not yet have
2017         // been assigned to buf on this codepath.
2018         assert(ts->action().first != buf);
2019         // Free duplicate buffer.
2020         if (ts->action().first != buf)
2021         {
2022             gcache_.free(const_cast<void*>(ts->action().first));
2023         }
2024         return ret;
2025     }
2026     catch (const gu::NotFound&)
2027     {
2028         // Seqno was not assigned to this buffer, so it was not part of
2029         // IST processing and was allocated by GCS.
2030         gcache_.seqno_assign(ts->action().first, ts->global_seqno(),
2031                              GCS_ACT_WRITESET, false);
2032         return ts;
2033     }
2034 }
2035 
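// Handles a writeset whose global seqno is at or below the apply monitor's
// last-left position, i.e. one that was already applied as part of IST but
// was also delivered by GCS. The writeset is not re-applied; it is only
// added to the certification index (when newer than the current cert
// position) so that the index matches what an IST-preloading joiner built.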
2036 void galera::ReplicatorSMM::handle_trx_overlapping_ist(
2037     const TrxHandleSlavePtr& ts)
2038 {
2039     // Out of order processing. IST has already applied the
2040     // trx.
2041     assert (ts->global_seqno() <= apply_monitor_.last_left());
2042 
2043     assert(not ts->local());
2044     // Use local seqno from original ts for local monitor.
2045     LocalOrder lo(ts->local_seqno(), ts.get());
2046 
2047     // Get real_ts pointing to GCache buffer which will not be discarded
2048     // if there is overlap. Do not try to access ts after this line.
2049     auto real_ts(get_real_ts_with_gcache_buffer(ts));
2050     local_monitor_.enter(lo);
2051     // If the global seqno is higher than the certification position, this
2052     // trx was not part of the preload and must be appended to the
2053     // certification index.
2054     if (real_ts->global_seqno() > cert_.position())
2055     {
2056         // We don't care about the result, just populate the index
2057         // and mark trx committed in certification.
2058         // see skip_prim_conf_change() for analogous logic
2059         (void)cert_.append_trx(real_ts);
2060         report_last_committed(cert_.set_trx_committed(*real_ts));
2061     }
2062     local_monitor_.leave(lo);
2063 }
2064 
2065 void galera::ReplicatorSMM::process_trx(void* recv_ctx,
2066                                         const TrxHandleSlavePtr& ts_ptr)
2067 {
2068     assert(recv_ctx != 0);
2069     assert(ts_ptr != 0);
2070 
2071     TrxHandleSlave& ts(*ts_ptr);
2072 
2073     assert(ts.local_seqno() > 0);
2074     assert(ts.global_seqno() > 0);
2075     assert(ts.last_seen_seqno() >= 0);
2076     assert(ts.depends_seqno() == -1 || ts.version() >= 4);
2077     assert(ts.state() == TrxHandle::S_REPLICATING);
2078 
2079     // SST thread drains monitors after IST, so this should be
2080     // a safe way to check whether the ts was contained in IST.
2081     if (ts.global_seqno() <= apply_monitor_.last_left())
2082     {
2083         handle_trx_overlapping_ist(ts_ptr);
2084         return;
2085     }
2086 
2087     wsrep_status_t const retval(cert_and_catch(0, ts_ptr));
2088 
2089     switch (retval)
2090     {
2091     case WSREP_TRX_FAIL:
2092         assert(ts.is_dummy());
2093         /* fall through to apply_trx() */
2094     case WSREP_OK:
2095         try
2096         {
2097             if (ts.nbo_end() == true)
2098             {
2099                 // NBO-end events are for internal operation only, not to be
2100                 // consumed by the application. If the NBO end happens with
2101                 // a different seqno than the current event's global seqno,
2102                 // release the monitors. Otherwise the monitors will be
2103                 // grabbed by local NBO handler threads.
2104                 if (ts.ends_nbo() == WSREP_SEQNO_UNDEFINED)
2105                 {
2106                     assert(WSREP_OK != retval);
2107                     assert(ts.is_dummy());
2108                 }
2109                 else
2110                 {
2111                     assert(WSREP_OK == retval);
2112                     assert(ts.ends_nbo() > 0);
2113                     // Signal NBO waiter here after leaving local ordering
2114                     // critical section.
2115                     gu::shared_ptr<NBOCtx>::type nbo_ctx(
2116                         cert_.nbo_ctx(ts.ends_nbo()));
2117                     assert(nbo_ctx != 0);
2118                     nbo_ctx->set_ts(ts_ptr);
2119                     break;
2120                 }
2121             }
2122 
2123             gu_trace(apply_trx(recv_ctx, ts));
2124         }
2125         catch (std::exception& e)
2126         {
2127             log_fatal << "Failed to apply trx: " << ts;
2128             log_fatal << e.what();
2129             log_fatal << "Node consistency compromised, leaving cluster...";
2130             mark_corrupt_and_close();
2131             assert(0); // this is an unexpected exception
2132             // keep processing events from the queue until provider is closed
2133         }
2134         break;
2135     default:
2136         // this should not happen for remote actions
2137         gu_throw_error(EINVAL)
2138             << "unrecognized retval for remote trx certification: "
2139             << retval << " trx: " << ts;
2140     }
2141 }
2142 
2143 
2144 void galera::ReplicatorSMM::process_commit_cut(wsrep_seqno_t const seq,
2145                                                wsrep_seqno_t const seqno_l)
2146 {
2147     assert(seq > 0);
2148     assert(seqno_l > 0);
2149 
2150     LocalOrder lo(seqno_l);
2151 
2152     gu_trace(local_monitor_.enter(lo));
2153 
2154     if (seq >= cc_seqno_) /* Refs #782. workaround for
2155                            * assert(seqno >= seqno_released_) in gcache. */
2156         cert_.purge_trxs_upto(seq, true);
2157 
2158     local_monitor_.leave(lo);
2159     log_debug << "Got commit cut from GCS: " << seq;
2160 }
2161 
2162 
2163 /* NB: the only use for this method is in cancel_seqnos() below */
2164 void galera::ReplicatorSMM::cancel_seqno(wsrep_seqno_t const seqno)
2165 {
2166     assert(seqno > 0);
2167 
2168     ApplyOrder ao(seqno, seqno - 1);
2169     apply_monitor_.self_cancel(ao);
2170 
2171     if (co_mode_ != CommitOrder::BYPASS)
2172     {
2173         CommitOrder co(seqno, co_mode_);
2174         commit_monitor_.self_cancel(co);
2175     }
2176 }
2177 
2178 /* NB: the only use for this method is to dismiss the slave queue
2179  *     in corrupt state */
2180 void galera::ReplicatorSMM::cancel_seqnos(wsrep_seqno_t const seqno_l,
2181                                           wsrep_seqno_t const seqno_g)
2182 {
2183     if (seqno_l > 0)
2184     {
2185         LocalOrder lo(seqno_l);
2186         local_monitor_.self_cancel(lo);
2187     }
2188 
2189     if (seqno_g > 0) cancel_seqno(seqno_g);
2190 }
2191 
2192 
2193 void galera::ReplicatorSMM::drain_monitors(wsrep_seqno_t const upto)
2194 {
2195     apply_monitor_.drain(upto);
2196     if (co_mode_ != CommitOrder::BYPASS) commit_monitor_.drain(upto);
2197 }
2198 
2199 
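// Vote message semantics handled below:
//   code > 0  : a vote request -- make sure the writeset at seqno_g has been
//               applied (unless the state is already corrupt), then cast a
//               success (0) vote and compare against the group outcome;
//   code < 0  : the group voted failure on a writeset this node applied
//               successfully -- the node is inconsistent with the group;
//   code == 0 : this node is with the majority, nothing to do.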
2200 void galera::ReplicatorSMM::process_vote(wsrep_seqno_t const seqno_g,
2201                                          wsrep_seqno_t const seqno_l,
2202                                          int64_t       const code)
2203 {
2204     assert(seqno_g > 0);
2205     assert(seqno_l > 0);
2206 
2207     std::ostringstream msg;
2208 
2209     LocalOrder lo(seqno_l);
2210     gu_trace(local_monitor_.enter(lo));
2211 
2212     gu::GTID const gtid(state_uuid_, seqno_g);
2213 
2214     if (code > 0)  /* vote request */
2215     {
2216         assert(GCS_VOTE_REQUEST == code);
2217         log_info << "Got vote request for seqno " << gtid; //remove
2218         /* make sure WS was either successfully applied or already voted */
2219         if (last_committed() < seqno_g) drain_monitors(seqno_g);
2220         if (st_.corrupt()) goto out;
2221 
2222         int const ret(gcs_.vote(gtid, 0, NULL, 0));
2223 
2224         switch (ret)
2225         {
2226         case 0:         /* majority agrees */
2227             log_info << "Vote 0 (success) on " << gtid
2228                      << " is consistent with group. Continue.";
2229             goto out;
2230         case -EALREADY: /* already voted */
2231             log_info << gtid << " already voted on. Continue.";
2232             goto out;
2233         case 1:         /* majority disagrees */
2234             msg << "Vote 0 (success) on " << gtid
2235                 << " is inconsistent with group. Leaving cluster.";
2236             goto fail;
2237         default:        /* general error */
2238             assert(ret < 0);
2239             msg << "Failed to vote on request for " << gtid << ": "
2240                 << -ret << " (" << ::strerror(-ret) << "). "
2241                 "Assuming inconsistency";
2242             goto fail;
2243         }
2244     }
2245     else if (code < 0)
2246     {
2247         msg << "Got negative vote on successfully applied " << gtid;
2248     fail:
2249         log_error << msg.str();
2250         on_inconsistency();
2251     }
2252     else
2253     {
2254         /* seems we are in majority */
2255     }
2256 out:
2257     local_monitor_.leave(lo);
2258 }
2259 
2260 
2261 void galera::ReplicatorSMM::set_initial_position(const wsrep_uuid_t&  uuid,
2262                                                  wsrep_seqno_t const seqno)
2263 {
2264     update_state_uuid(uuid);
2265 
2266     apply_monitor_.set_initial_position(uuid, seqno);
2267     if (co_mode_ != CommitOrder::BYPASS)
2268         commit_monitor_.set_initial_position(uuid, seqno);
2269 }
2270 
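// Summary of the mapping implemented by the switch below (replication
// protocol version -> writeset/trx protocol, record set version):
//
//   repl proto 1-2  -> trx 1, RecordSet VER1
//   repl proto 3-4  -> trx 2, RecordSet VER1
//   repl proto 5-7  -> trx 3, RecordSet VER1
//   repl proto 8    -> trx 3, RecordSet VER2 (8-byte alignment)
//   repl proto 9    -> trx 4, RecordSet VER2 (semi-shared key type)
//   repl proto 10   -> trx 5, RecordSet VER2 (PA range, WSREP_KEY_UPDATE)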
2271 std::tuple<int, enum gu::RecordSet::Version>
2272 galera::get_trx_protocol_versions(int proto_ver)
2273 {
2274     enum gu::RecordSet::Version record_set_ver(gu::RecordSet::EMPTY);
2275     int trx_ver(-1);
2276     switch (proto_ver)
2277     {
2278     case 1:
2279         trx_ver = 1;
2280         record_set_ver = gu::RecordSet::VER1;
2281         break;
2282     case 2:
2283         trx_ver = 1;
2284         record_set_ver = gu::RecordSet::VER1;
2285         break;
2286     case 3:
2287     case 4:
2288         trx_ver = 2;
2289         record_set_ver = gu::RecordSet::VER1;
2290         break;
2291     case 5:
2292         trx_ver = 3;
2293         record_set_ver = gu::RecordSet::VER1;
2294         break;
2295     case 6:
2296         trx_ver  = 3;
2297         record_set_ver = gu::RecordSet::VER1;
2298         break;
2299     case 7:
2300         // Protocol upgrade to handle IST SSL backwards compatibility,
2301         // no effect to TRX or STR protocols.
2302         trx_ver = 3;
2303         record_set_ver = gu::RecordSet::VER1;
2304         break;
2305     case 8:
2306         // Protocol upgrade to enforce 8-byte alignment in writesets and CCs
2307         trx_ver = 3;
2308         record_set_ver = gu::RecordSet::VER2;
2309         break;
2310     case 9:
2311         // Protocol upgrade to enable support for semi-shared key type.
2312         trx_ver = 4;
2313         record_set_ver = gu::RecordSet::VER2;
2314         break;
2315     case 10:
2316         // Protocol upgrade to enable support for:
2317         trx_ver = 5;        // PA range preset in the writeset,
2318                             // WSREP_KEY_UPDATE support (API v26)
2319         record_set_ver = gu::RecordSet::VER2;
2320         break;
2321     default:
2322         gu_throw_error(EPROTO)
2323             << "Configuration change resulted in an unsupported protocol "
2324             "version: " << proto_ver << ". Can't continue.";
2325     };
2326     return std::make_tuple(trx_ver, record_set_ver);
2327 }
2328 
2329 void galera::ReplicatorSMM::establish_protocol_versions (int proto_ver)
2330 {
2331     try
2332     {
2333         const auto trx_versions(get_trx_protocol_versions(proto_ver));
2334         trx_params_.version_ = std::get<0>(trx_versions);
2335         trx_params_.record_set_ver_ = std::get<1>(trx_versions);
2336         protocol_version_ = proto_ver;
2337         log_info << "REPL Protocols: " << protocol_version_ << " ("
2338                  << trx_params_.version_ << ")";
2339     }
2340     catch (const gu::Exception& e)
2341     {
2342         log_fatal << "Caught exception: " << e.what();
2343         abort();
2344     }
2345 }
2346 
2347 void galera::ReplicatorSMM::record_cc_seqnos(wsrep_seqno_t cc_seqno,
2348                                              const char* source)
2349 {
2350     cc_seqno_ = cc_seqno;
2351     cc_lowest_trx_seqno_ = cert_.lowest_trx_seqno();
2352     log_info << "Lowest cert index boundary for CC from " << source
2353              << ": " << cc_lowest_trx_seqno_;
2354     log_info << "Min available from gcache for CC from " << source
2355              << ": " << gcache_.seqno_min();
2356     // Lowest TRX must not have been released from gcache at this
2357     // point.
2358     assert(cc_lowest_trx_seqno_ >= gcache_.seqno_min());
2359 }
2360 
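// Rebuilds the comma-separated list of client addresses advertised by the
// members of the current view, e.g. (illustrative addresses only):
//   "10.0.0.1:3306,10.0.0.2:3306,10.0.0.3:3306"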
2361 void
2362 galera::ReplicatorSMM::update_incoming_list(const wsrep_view_info_t& view)
2363 {
2364     static char const separator(',');
2365 
2366     ssize_t new_size(0);
2367 
2368     if (view.memb_num > 0)
2369     {
2370         new_size += view.memb_num - 1; // separators
2371 
2372         for (int i = 0; i < view.memb_num; ++i)
2373         {
2374             new_size += strlen(view.members[i].incoming);
2375         }
2376     }
2377 
2378     gu::Lock lock(incoming_mutex_);
2379 
2380     incoming_list_.clear();
2381     incoming_list_.resize(new_size);
2382 
2383     if (new_size <= 0) return;
2384 
2385     incoming_list_ = view.members[0].incoming;
2386 
2387     for (int i = 1; i < view.memb_num; ++i)
2388     {
2389         incoming_list_ += separator;
2390         incoming_list_ += view.members[i].incoming;
2391     }
2392 }
2393 
2394 static galera::Replicator::State state2repl(gcs_node_state const my_state,
2395                                             int const            my_idx)
2396 {
2397     switch (my_state)
2398     {
2399     case GCS_NODE_STATE_NON_PRIM:
2400     case GCS_NODE_STATE_PRIM:
2401         return galera::Replicator::S_CONNECTED;
2402     case GCS_NODE_STATE_JOINER:
2403         return galera::Replicator::S_JOINING;
2404     case GCS_NODE_STATE_JOINED:
2405         return galera::Replicator::S_JOINED;
2406     case GCS_NODE_STATE_SYNCED:
2407         return galera::Replicator::S_SYNCED;
2408     case GCS_NODE_STATE_DONOR:
2409         return galera::Replicator::S_DONOR;
2410     case GCS_NODE_STATE_MAX:
2411         assert(0);
2412     }
2413 
2414     gu_throw_fatal << "unhandled gcs state: " << my_state;
2415     GU_DEBUG_NORETURN;
2416 }
2417 
2418 void
2419 galera::ReplicatorSMM::submit_view_info(void*                    recv_ctx,
2420                                         const wsrep_view_info_t* view_info)
2421 {
2422     wsrep_cb_status_t const rcode
2423         (view_cb_(app_ctx_, recv_ctx, view_info, 0, 0));
2424 
2425     if (WSREP_CB_SUCCESS != rcode)
2426     {
2427         gu_throw_fatal << "View callback failed. "
2428             "This is unrecoverable, restart required.";
2429     }
2430 }
2431 
2432 void
2433 galera::ReplicatorSMM::process_conf_change(void*                    recv_ctx,
2434                                            const struct gcs_action& cc)
2435 {
2436     assert(cc.seqno_l > 0); // Must not be from IST
2437 
2438     gcs_act_cchange const conf(cc.buf, cc.size);
2439 
2440     LocalOrder lo(cc.seqno_l);
2441     local_monitor_.enter(lo);
2442 
2443     process_pending_queue(cc.seqno_l);
2444 
2445     if (conf.conf_id < 0)
2446     {
2447         process_non_prim_conf_change(recv_ctx, conf, cc.seqno_g);
2448         gcache_.free(const_cast<void*>(cc.buf));
2449     }
2450     else
2451     {
2452         process_prim_conf_change(recv_ctx, conf, cc.seqno_g,
2453                                  const_cast<void*>(cc.buf));
2454     }
2455 
2456     resume_recv();
2457 
2458     local_monitor_.leave(lo);
2459 
2460     if (conf.memb.size() == 0)
2461     {
2462         log_debug << "Received SELF-LEAVE. Connection closed.";
2463         assert(conf.conf_id < 0 && cc.seqno_g < 0);
2464         gu::Lock lock(closing_mutex_);
2465         shift_to_CLOSED();
2466     }
2467 }
2468 
2469 void galera::ReplicatorSMM::drain_monitors_for_local_conf_change()
2470 {
2471     wsrep_seqno_t const upto(cert_.position());
2472     assert(upto >= last_committed());
2473     if (upto >= last_committed())
2474     {
2475         log_debug << "Drain monitors from " << last_committed()
2476                   << " up to " << upto;
2477         gu_trace(drain_monitors(upto));
2478     }
2479     else
2480     {
2481         log_warn << "Cert position " << upto << " less than last committed "
2482                  << last_committed();
2483     }
2484 }
2485 
2486 void galera::ReplicatorSMM::process_non_prim_conf_change(
2487     void* recv_ctx,
2488     const gcs_act_cchange& conf,
2489     int const my_index)
2490 {
2491     assert(conf.conf_id == WSREP_SEQNO_UNDEFINED);
2492 
2493     /* ignore outdated non-prim configuration change */
2494     if (conf.uuid == state_uuid_ && conf.seqno < sst_seqno_) return;
2495 
2496     wsrep_uuid_t new_uuid(uuid_);
2497     wsrep_view_info_t* const view_info
2498         (galera_view_info_create(conf,
2499                                  capabilities(conf.repl_proto_ver),
2500                                  my_index, new_uuid));
2501     // Non-prim should not change UUID
2502     assert(uuid_ == WSREP_UUID_UNDEFINED || new_uuid == uuid_);
2503     assert(view_info->status == WSREP_VIEW_NON_PRIMARY);
2504 
2505     // Draining monitors could hang when the state is corrupt as
2506     // there may be blocked appliers.
2507     if (not st_.corrupt())
2508     {
2509         drain_monitors_for_local_conf_change();
2510     }
2511 
2512     update_incoming_list(*view_info);
2513 
2514     try
2515     {
2516         submit_view_info(recv_ctx, view_info);
2517         free(view_info);
2518     }
2519     catch (gu::Exception& e)
2520     {
2521         free(view_info);
2522         log_fatal << e.what();
2523         abort();
2524     }
2525 
2526     {
2527         gu::Lock lock(closing_mutex_);
2528         if (state_() > S_CONNECTED)
2529         {
2530             state_.shift_to(S_CONNECTED);
2531         }
2532     }
2533 }
2534 
2535 static void validate_local_prim_view_info(const wsrep_view_info_t* view_info,
2536                                           const wsrep_uuid_t& my_uuid)
2537 {
2538     assert(view_info->status == WSREP_VIEW_PRIMARY);
2539     if (view_info->memb_num > 0 &&
2540         (view_info->my_idx < 0 || view_info->my_idx >= view_info->memb_num))
2541         // something went wrong, member must be present in own view
2542     {
2543         std::ostringstream msg;
2544         msg << "Node UUID " << my_uuid << " is absent from the view:\n";
2545         for (int m(0); m < view_info->memb_num; ++m)
2546         {
2547             msg << '\t' << view_info->members[m].id << '\n';
2548         }
2549         msg << "most likely due to unexpected node identity change. "
2550             "Aborting.";
2551         log_fatal << msg.str();
2552         abort();
2553     }
2554 }
2555 
2556 bool galera::ReplicatorSMM::skip_prim_conf_change(
2557     const wsrep_view_info_t& view_info, int const proto_ver)
2558 {
2559     auto cc_seqno(WSREP_SEQNO_UNDEFINED);
2560     bool keep(false); // keep in cache
2561 
2562     if (proto_ver >= PROTO_VER_ORDERED_CC)
2563     {
2564         cc_seqno = view_info.state_id.seqno;
2565 
2566         if (cc_seqno > cert_.position())
2567         {
2568             // was not part of IST preload, adjust cert. index
2569             // see handle_trx_overlapping_ist() for analogous logic
2570             assert(cc_seqno == cert_.position() + 1);
2571             const auto trx_ver
2572                 (std::get<0>(get_trx_protocol_versions(proto_ver)));
2573             cert_.adjust_position(view_info,
2574                                   gu::GTID(view_info.state_id.uuid, cc_seqno),
2575                                   trx_ver);
2576             keep = true;
2577         }
2578     }
2579 
2580     log_info << "####### skipping local CC " << cc_seqno << ", keep in cache: "
2581              << (keep ? "true" : "false");
2582 
2583     return keep;
2584 }
2585 
2586 void galera::ReplicatorSMM::process_first_view(
2587     const wsrep_view_info_t* view_info, const wsrep_uuid_t& new_uuid)
2588 {
2589     assert(uuid_ == WSREP_UUID_UNDEFINED && new_uuid != WSREP_UUID_UNDEFINED);
2590     assert(view_info->state_id.uuid != WSREP_UUID_UNDEFINED);
2591     uuid_ = new_uuid;
2592     log_info << "Process first view: " << view_info->state_id.uuid
2593              << " my uuid: " << new_uuid;
2594     if (connected_cb_)
2595     {
2596         wsrep_cb_status_t cret(connected_cb_(app_ctx_, view_info));
2597         if (cret != WSREP_CB_SUCCESS)
2598         {
2599             log_fatal << "Application returned error "
2600                       << cret
2601                       << " from connect callback, aborting";
2602             abort();
2603         }
2604     }
2605 }
2606 
2607 void galera::ReplicatorSMM::process_group_change(
2608     const wsrep_view_info_t* view_info)
2609 {
2610     assert(state_uuid_ != view_info->state_id.uuid);
2611     log_info << "Process group change: "
2612              << state_uuid_ << " -> " << view_info->state_id.uuid;
2613     if (connected_cb_)
2614     {
2615         wsrep_cb_status_t cret(connected_cb_(app_ctx_, view_info));
2616         if (cret != WSREP_CB_SUCCESS)
2617         {
2618             log_fatal << "Application returned error "
2619                       << cret
2620                       << " from connect callback, aborting";
2621             abort();
2622         }
2623     }
2624 }
2625 
2626 void galera::ReplicatorSMM::process_st_required(
2627     void* recv_ctx,
2628     int const group_proto_ver,
2629     const wsrep_view_info_t* view_info)
2630 {
2631     const wsrep_seqno_t group_seqno(view_info->state_id.seqno);
2632     const wsrep_uuid_t& group_uuid (view_info->state_id.uuid);
2633 
2634     void*  app_req(0);
2635     size_t app_req_len(0);
2636 #ifndef NDEBUG
2637     bool   app_waits_sst(false);
2638 #endif
2639     log_info << "State transfer required: "
2640              << "\n\tGroup state: " << group_uuid << ":" << group_seqno
2641              << "\n\tLocal state: " << state_uuid_<< ":" << last_committed();
2642 
2643     if (S_CONNECTED != state_()) state_.shift_to(S_CONNECTED);
2644 
2645     wsrep_cb_status_t const rcode(sst_request_cb_(app_ctx_,
2646                                                   &app_req, &app_req_len));
2647 
2648     if (WSREP_CB_SUCCESS != rcode)
2649     {
2650         assert(app_req_len <= 0);
2651         log_fatal << "SST request callback failed. This is unrecoverable, "
2652                   << "restart required.";
2653         abort();
2654     }
2655     else if (0 == app_req_len && state_uuid_ != group_uuid)
2656     {
2657         log_fatal << "Local state UUID " << state_uuid_
2658                   << " is different from group state UUID " << group_uuid
2659                   << ", and SST request is null: restart required.";
2660         abort();
2661     }
2662 #ifndef NDEBUG
2663     app_waits_sst = (app_req_len > 0) &&
2664         (app_req_len != (strlen(WSREP_STATE_TRANSFER_NONE) + 1) ||
2665          memcmp(app_req, WSREP_STATE_TRANSFER_NONE, app_req_len));
2666     log_info << "App waits SST: " << app_waits_sst;
2667 #endif
2668     // GCache::seqno_reset() happens here
2669     request_state_transfer (recv_ctx,
2670                             group_proto_ver, group_uuid, group_seqno, app_req,
2671                             app_req_len);
2672     free(app_req);
2673 
2674     finish_local_prim_conf_change(group_proto_ver, group_seqno, "sst");
2675     // No need to submit view info. It is always contained either
2676     // in SST or applied in IST.
2677 }
2678 
2679 void galera::ReplicatorSMM::reset_index_if_needed(
2680     const wsrep_view_info_t* view_info,
2681     int const prev_protocol_version,
2682     int const next_protocol_version,
2683     bool const st_required)
2684 {
2685     const wsrep_seqno_t group_seqno(view_info->state_id.seqno);
2686     const wsrep_uuid_t& group_uuid (view_info->state_id.uuid);
2687 
2688     //
2689     // Starting from protocol_version_ 8 joiner's cert index is rebuilt
2690     // from IST.
2691     //
2692     // The reasons to reset cert index:
2693     // - Protocol version lower than PROTO_VER_ORDERED_CC (ALL)
2694     // - Protocol upgrade                       (ALL)
2695     // - State transfer will take place         (JOINER)
2696     //
2697     bool index_reset(next_protocol_version < PROTO_VER_ORDERED_CC ||
2698                      prev_protocol_version != next_protocol_version ||
2699                      // this last condition is a bit too strict. In fact
2700                      // checking for app_waits_sst would be enough, but in
2701                      // that case we'd have to skip cert index rebuilding
2702                      // when there is none.
2703                      // This would complicate the logic with little to no
2704                      // benefits...
2705                      st_required);
2706 
2707     if (index_reset)
2708     {
2709         gu::GTID position;
2710         int trx_proto_ver;
2711         if (next_protocol_version < PROTO_VER_ORDERED_CC)
2712         {
2713             position.set(group_uuid, group_seqno);
2714             trx_proto_ver = std::get<0>(get_trx_protocol_versions(
2715                                             next_protocol_version));
2716         }
2717         else
2718         {
2719             position = gu::GTID();
2720             // With PROTO_VER_ORDERED_CC/index preload the cert protocol version
2721             // is adjusted during IST/cert index preload.
2722             // See process_ist_conf_change().
2723             trx_proto_ver = -1;
2724         }
2725         // Index will be reset, so all write sets preceding this CC in
2726         // local order must be discarded. Therefore the pending cert queue
2727         // must also be cleared.
2728         pending_cert_queue_.clear();
2729         /* 2 reasons for this here:
2730          * 1 - compatibility with protocols < PROTO_VER_ORDERED_CC
2731          * 2 - preparing cert index for preloading by setting seqno to 0 */
2732         log_info << "Cert index reset to " << position << " (proto: "
2733                  << next_protocol_version << "), state transfer needed: "
2734                  << (st_required ? "yes" : "no");
2735         /* flushes service thd, must be called before gcache_.seqno_reset()*/
2736         cert_.assign_initial_position(position, trx_proto_ver);
2737     }
2738     else
2739     {
2740         log_info << "Skipping cert index reset";
2741     }
2742 
2743 }
2744 
2745 void galera::ReplicatorSMM::shift_to_next_state(Replicator::State next_state)
2746 {
2747     if (state_() == S_CONNECTED || state_() == S_DONOR)
2748     {
2749         switch (next_state)
2750         {
2751         case S_JOINING:
2752             state_.shift_to(S_JOINING);
2753             break;
2754         case S_DONOR:
2755             if (state_() == S_CONNECTED)
2756             {
2757                 state_.shift_to(S_DONOR);
2758             }
2759             break;
2760         case S_JOINED:
2761             state_.shift_to(S_JOINED);
2762             break;
2763         case S_SYNCED:
2764             state_.shift_to(S_SYNCED);
2765             if (synced_cb_(app_ctx_) != WSREP_CB_SUCCESS)
2766             {
2767                 log_fatal << "Synced callback failed. This is "
2768                           << "unrecoverable, restart required.";
2769                 abort();
2770             }
2771             break;
2772         default:
2773             log_debug << "next_state " << next_state;
2774             break;
2775         }
2776     }
2777     st_.set(state_uuid_, WSREP_SEQNO_UNDEFINED, safe_to_bootstrap_);
2778 }
2779 
2780 void galera::ReplicatorSMM::become_joined_if_needed()
2781 {
2782     if (state_() == S_JOINING && sst_state_ != SST_NONE)
2783     {
2784         /* There are two reasons we can be here:
2785          * 1) we just got state transfer in request_state_transfer().
2786          * 2) we failed here previously (probably due to partition).
2787          */
2788         try {
2789             gcs_.join(gu::GTID(state_uuid_, sst_seqno_), 0);
2790             sst_state_ = SST_JOIN_SENT;
2791         }
2792         catch (gu::Exception& e)
2793         {
2794             if (e.get_errno() == ENOTCONN)
2795             {
2796                 log_info << "Failed to JOIN due to non-Prim";
2797             }
2798             else
2799             {
2800                 log_warn << "Failed to JOIN the cluster after SST "
2801                          << e.what();
2802             }
2803         }
2804     }
2805 }
2806 
2807 void galera::ReplicatorSMM::submit_ordered_view_info(
2808     void* recv_ctx,
2809     const wsrep_view_info_t* view_info)
2810 {
2811     try
2812     {
2813         submit_view_info(recv_ctx, view_info);
2814     }
2815     catch (gu::Exception& e)
2816     {
2817         log_fatal << e.what();
2818         abort();
2819     }
2820 }
2821 
2822 void galera::ReplicatorSMM::finish_local_prim_conf_change(
2823     int const group_proto_ver __attribute__((unused)),
2824     wsrep_seqno_t const seqno,
2825     const char* context)
2826 {
2827     become_joined_if_needed();
2828     record_cc_seqnos(seqno, context);
2829     // GCache must contain some actions, at least this CC
2830     bool const ordered __attribute__((unused))
2831         (group_proto_ver >= PROTO_VER_ORDERED_CC);
2832     assert(gcache_.seqno_min() > 0 || not ordered);
2833 }
2834 
2835 namespace {
2836     // Deleter for view_info.
2837     struct ViewInfoDeleter { void operator()(void* ptr) { ::free(ptr); } };
2838 }
2839 
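// Outline of ordered primary configuration change processing below:
//   1. wrap cc_buf into an RAII discarder so it is freed unless explicitly
//      kept in gcache,
//   2. build the view info and validate this node's presence in it,
//   3. skip CCs already contained in the SST, otherwise drain monitors,
//   4. handle first view / group change and decide whether state transfer
//      and/or certification index reset is needed,
//   5. establish protocol versions, adjust the cert position, keep an
//      ordered CC in gcache, shift the replicator state and submit the view
//      to the application.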
2840 void galera::ReplicatorSMM::process_prim_conf_change(void* recv_ctx,
2841                                                      const gcs_act_cchange& conf,
2842                                                      int const my_index,
2843                                                      void* cc_buf)
2844 {
2845     assert(conf.seqno > 0);
2846     assert(my_index >= 0);
2847 
2848     GU_DBUG_SYNC_WAIT("process_primary_configuration");
2849     // Helper to discard cc_buf automatically when it goes out of scope.
2850     // Method keep() should be called if the buffer should be kept in
2851     // gcache.
2852     class CcBufDiscard
2853     {
2854     public:
2855         CcBufDiscard(gcache::GCache& gcache, void* cc_buf)
2856             : gcache_(gcache)
2857             , cc_buf_(cc_buf) { }
2858         CcBufDiscard(const CcBufDiscard&) = delete;
2859         CcBufDiscard& operator=(const CcBufDiscard&) = delete;
2860         ~CcBufDiscard()
2861         {
2862             if (cc_buf_) gcache_.free(cc_buf_);
2863         }
2864         void keep(wsrep_seqno_t const cc_seqno) // keep cc_buf_ in gcache_
2865         {
2866             gu_trace(gcache_.seqno_assign(cc_buf_, cc_seqno,
2867                                           GCS_ACT_CCHANGE, false));
2868             cc_buf_ = 0;
2869         }
2870     private:
2871         gcache::GCache& gcache_;
2872         void* cc_buf_;
2873     } cc_buf_discard(gcache_, cc_buf);
2874 
2875     // Processing local primary conf change, so this node must always
2876     // be in conf change as indicated by my_index.
2877     assert(my_index >= 0);
2878     int const group_proto_version(conf.repl_proto_ver);
2879 
2880     wsrep_uuid_t new_uuid(uuid_);
2881     auto const view_info(std::unique_ptr<wsrep_view_info_t, ViewInfoDeleter>(
2882                              galera_view_info_create(
2883                                  conf,
2884                                  capabilities(group_proto_version),
2885                                  my_index, new_uuid)));
2886     assert(view_info->my_idx == my_index);
2887     // Will abort if validation fails
2888     validate_local_prim_view_info(view_info.get(), uuid_);
2889 
2890     bool const ordered(group_proto_version >= PROTO_VER_ORDERED_CC);
2891     const wsrep_uuid_t& group_uuid (view_info->state_id.uuid);
2892     wsrep_seqno_t const group_seqno(view_info->state_id.seqno);
2893 
2894     assert(group_seqno == conf.seqno);
2895     assert(not ordered || group_seqno > 0);
2896 
2897     /* Invalidate sst_seqno_ in case of group change. */
2898     if (state_uuid_ != group_uuid) sst_seqno_ = WSREP_SEQNO_UNDEFINED;
2899 
2900     if (conf.seqno <= sst_seqno_)
2901     {
2902         // contained already in SST, skip any further processing
2903         if (skip_prim_conf_change(*view_info, group_proto_version))
2904         {
2905             // was not part of IST, don't discard
2906             cc_buf_discard.keep(conf.seqno);
2907         }
2908         return;
2909     }
2910 
2911     log_info << "####### processing CC " << group_seqno
2912              << ", local"
2913              << (ordered ? ", ordered" : ", unordered");
2914 
2915     drain_monitors_for_local_conf_change();
2916 
2917     int const prev_protocol_version(protocol_version_);
2918 
2919     const bool first_view(uuid_ == WSREP_UUID_UNDEFINED);
2920     if (first_view)
2921     {
2922         process_first_view(view_info.get(), new_uuid);
2923     }
2924     else if (state_uuid_ != group_uuid)
2925     {
2926         process_group_change(view_info.get());
2927     }
2928 
2929     log_info << "####### My UUID: " << uuid_;
2930 
2931     safe_to_bootstrap_ = (view_info->memb_num == 1);
2932 
2933     gcs_node_state_t const my_state(conf.memb[my_index].state_);
2934 
2935     assert(my_state > GCS_NODE_STATE_NON_PRIM);
2936     assert(my_state < GCS_NODE_STATE_MAX);
2937 
2938     update_incoming_list(*view_info);
2939 
2940     bool const st_required
2941         (state_transfer_required(*view_info, group_proto_version,
2942                                  my_state == GCS_NODE_STATE_PRIM));
2943     Replicator::State const next_state(state2repl(my_state, my_index));
2944 
2945     // if protocol version >= PROTO_VER_ORDERED_CC, first CC already
2946     // carries seqno 1, so it can't be less than 1. For older protocols
2947     // it can be 0.
2948     assert(group_seqno >= (group_proto_version >= PROTO_VER_ORDERED_CC));
2949 
2950     reset_index_if_needed(view_info.get(),
2951                           prev_protocol_version,
2952                           group_proto_version,
2953                           st_required);
2954 
2955     if (st_required)
2956     {
2957         process_st_required(recv_ctx, group_proto_version, view_info.get());
2958         // Rolling upgrade from earlier version. Group protocol version
2959         // PROTO_VER_GALERA_3_MAX and below do not get CCs from the IST,
2960         // so protocol versions are not established at this point yet.
2961         // Do it now before continuing.
2962         if (group_proto_version <= PROTO_VER_GALERA_3_MAX)
2963         {
2964             establish_protocol_versions(group_proto_version);
2965         }
2966         return;
2967     }
2968 
2969     // From this point on the CC is known to be processed in order.
2970     assert(group_seqno > cert_.position());
2971 
2972     // This CC is processed in order. Establish protocol versions,
2973     // it must be done before cert_.adjust_position().
2974     establish_protocol_versions (group_proto_version);
2975     /* since CC does not pass certification, need to adjust cert
2976      * position explicitly (when processed in order) */
2977     /* flushes service thd, must be called before gcache_.seqno_reset()*/
2978     cert_.adjust_position(*view_info,
2979                           gu::GTID(group_uuid, group_seqno),
2980                           trx_params_.version_);
2981 
2982     if (first_view)
2983     {
2984         /* if CC is ordered need to use preceding seqno */
2985         set_initial_position(group_uuid, group_seqno - ordered);
2986         gcache_.seqno_reset(gu::GTID(group_uuid, group_seqno - ordered));
2987     }
2988     else
2989     {
2990         // Note: Monitor initial position setting is not needed as this CC
2991         // is processed in order.
2992         assert(state_uuid_ == group_uuid);
2993         update_state_uuid(group_uuid);
2994     }
2995 
2996     /* CCs from IST already have seqno assigned and cert. position
2997      * adjusted */
2998     if (ordered)
2999     {
3000         cc_buf_discard.keep(group_seqno);
3001     }
3002     shift_to_next_state(next_state);
3003 
3004     submit_ordered_view_info(recv_ctx, view_info.get());
3005 
3006     finish_local_prim_conf_change(group_proto_version, group_seqno, "group");
3007 
3008     // Cancel monitors after view event has been processed by the
3009     // application. Otherwise last_committed_id() will return incorrect
3010     // value if called from view callback.
3011     if (ordered)
3012         cancel_seqno(group_seqno);
3013 }
3014 
3015 void galera::ReplicatorSMM::process_join(wsrep_seqno_t seqno_j,
3016                                          wsrep_seqno_t seqno_l)
3017 {
3018     LocalOrder lo(seqno_l);
3019 
3020     gu_trace(local_monitor_.enter(lo));
3021 
3022     wsrep_seqno_t const upto(cert_.position());
3023     drain_monitors(upto);
3024 
3025     if (seqno_j < 0 && S_JOINING == state_())
3026     {
3027         // #595, @todo: find a way to re-request state transfer
3028         log_fatal << "Failed to receive state transfer: " << seqno_j
3029                   << " (" << strerror (-seqno_j) << "), need to restart.";
3030         abort();
3031     }
3032     else
3033     {
3034         state_.shift_to(S_JOINED);
3035         sst_state_ = SST_NONE;
3036     }
3037 
3038     local_monitor_.leave(lo);
3039 }
3040 
3041 
3042 void galera::ReplicatorSMM::process_sync(wsrep_seqno_t seqno_l)
3043 {
3044     LocalOrder lo(seqno_l);
3045 
3046     gu_trace(local_monitor_.enter(lo));
3047 
3048     wsrep_seqno_t const upto(cert_.position());
3049     drain_monitors(upto);
3050 
3051     state_.shift_to(S_SYNCED);
3052     if (synced_cb_(app_ctx_) != WSREP_CB_SUCCESS)
3053     {
3054         log_fatal << "Synced callback failed. This is unrecoverable, "
3055                   << "restart required.";
3056         abort();
3057     }
3058     local_monitor_.leave(lo);
3059 }
3060 
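// pause()/resume() usage sketch (hypothetical caller, e.g. a backup tool
// driving the provider through the wsrep API):
//
//   wsrep_seqno_t const paused_at(replicator->pause()); // blocks replication
//   /* take a consistent snapshot at state_uuid:paused_at */
//   replicator->resume();                                // release the pause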
3061 wsrep_seqno_t galera::ReplicatorSMM::pause()
3062 {
3063     // Grab local seqno for local_monitor_
3064     wsrep_seqno_t const local_seqno(
3065         static_cast<wsrep_seqno_t>(gcs_.local_sequence()));
3066     LocalOrder lo(local_seqno);
3067     local_monitor_.enter(lo);
3068 
3069     // Local monitor should take care that concurrent
3070     // pause requests are enqueued
3071     assert(pause_seqno_ == WSREP_SEQNO_UNDEFINED);
3072     pause_seqno_ = local_seqno;
3073 
3074     // Get drain seqno from cert index
3075     wsrep_seqno_t const upto(cert_.position());
3076     drain_monitors(upto);
3077 
3078     assert (apply_monitor_.last_left() >= upto);
3079     if (co_mode_ != CommitOrder::BYPASS)
3080     {
3081         assert (commit_monitor_.last_left() >= upto);
3082         assert (commit_monitor_.last_left() == apply_monitor_.last_left());
3083     }
3084 
3085     wsrep_seqno_t const ret(last_committed());
3086     st_.set(state_uuid_, ret, safe_to_bootstrap_);
3087 
3088     log_info << "Provider paused at " << state_uuid_ << ':' << ret
3089              << " (" << pause_seqno_ << ")";
3090 
3091     return ret;
3092 }
3093 
void galera::ReplicatorSMM::resume()
{
    if (pause_seqno_ == WSREP_SEQNO_UNDEFINED)
    {
        log_warn << "tried to resume unpaused provider";
        return;
    }

    st_.set(state_uuid_, WSREP_SEQNO_UNDEFINED, safe_to_bootstrap_);
    log_info << "resuming provider at " << pause_seqno_;
    LocalOrder lo(pause_seqno_);
    pause_seqno_ = WSREP_SEQNO_UNDEFINED;
    local_monitor_.leave(lo);
    log_info << "Provider resumed.";
}

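/* desync() detaches the node from group flow control via GCS. On success the
 * assigned local seqno is processed through the local monitor and the node
 * shifts to the DONOR state; on failure the monitor slot is self-cancelled
 * and the error is rethrown to the caller. */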
void galera::ReplicatorSMM::desync()
{
    wsrep_seqno_t seqno_l;

    ssize_t const ret(gcs_.desync(seqno_l));

    if (seqno_l > 0)
    {
        LocalOrder lo(seqno_l); // need to process it regardless of ret value

        if (ret == 0)
        {
/* #706 - the check below must be state request-specific. We are not holding
          any locks here and must be able to wait like any other action.
          However practice may prove different, leaving it here as a reminder.
            if (local_monitor_.would_block(seqno_l))
            {
                gu_throw_error (-EDEADLK) << "Ran out of resources waiting to "
                                          << "desync the node. "
                                          << "The node must be restarted.";
            }
*/
            local_monitor_.enter(lo);
            if (state_() != S_DONOR) state_.shift_to(S_DONOR);
            local_monitor_.leave(lo);
        }
        else
        {
            local_monitor_.self_cancel(lo);
        }
    }

    if (ret)
    {
        gu_throw_error (-ret) << "Node desync failed.";
    }
}

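/* resync() is the counterpart of desync(): it reports the node as joined
 * again to the group at the current commit position. */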
void galera::ReplicatorSMM::resync()
{
    gcs_.join(gu::GTID(state_uuid_, commit_monitor_.last_left()), 0);
}


//////////////////////////////////////////////////////////////////////
//////////////////////////////////////////////////////////////////////
////                           Private
//////////////////////////////////////////////////////////////////////
//////////////////////////////////////////////////////////////////////

/* process pending queue events scheduled before seqno */
void galera::ReplicatorSMM::process_pending_queue(wsrep_seqno_t local_seqno)
{
    // This method should be called only from code paths of local
    // processing, i.e. events from the group.
    assert(local_seqno > 0);
    // pending_cert_queue_ contains all writesets that:
    //   a) were BF aborted before being certified
    //   b) are not going to be replayed even though
    //      cert_for_aborted() returned TEST_OK for them
    //
    // Before certifying the current seqno, check if
    // pending_cert_queue contains any smaller seqno.
    // This prevents the certification index from diverging
    // across nodes.
    TrxHandleSlavePtr queued_ts;
    while ((queued_ts = pending_cert_queue_.must_cert_next(local_seqno)) != 0)
    {
        log_debug << "must cert next " << local_seqno
                  << " aborted ts " << *queued_ts;

        Certification::TestResult const result(cert_.append_trx(queued_ts));

        log_debug << "trx in pending cert queue certified, result: " << result;

        assert(!queued_ts->cert_bypass() ||
               Certification::TestResult::TEST_OK == result);

        bool const skip(Certification::TestResult::TEST_FAILED == result &&
                        !(queued_ts->cert_bypass()/* expl. ROLLBACK */));

        /* at this point we are still assigning seqno to buffer in order */
        gcache_.seqno_assign(queued_ts->action().first,
                             queued_ts->global_seqno(),
                             GCS_ACT_WRITESET, skip);

        cert_.set_trx_committed(*queued_ts);
    }
}

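/* Enters the local monitor in preparation for certification of ts. Returns
 * false if the wait was interrupted (EINTR) by a BF abort of the local
 * transaction, in which case the monitor has not been entered. */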
bool galera::ReplicatorSMM::enter_local_monitor_for_cert(
    TrxHandleMaster* trx,
    const TrxHandleSlavePtr& ts)
{
    bool       in_replay(trx != 0 &&
                         trx->state() == TrxHandle::S_MUST_REPLAY);

    bool interrupted(false);
    try
    {
        if (trx != 0)
        {
            if (in_replay == false) TX_SET_STATE(*trx, TrxHandle::S_CERTIFYING);
            trx->unlock();
        }
        LocalOrder lo(*ts);
        if (in_replay == false || local_monitor_.entered(lo) == false)
        {
            gu_trace(local_monitor_.enter(lo));
        }

        if (trx != 0) trx->lock();

        assert(trx == 0 ||
               (trx->state() == TrxHandle::S_CERTIFYING ||
                trx->state() == TrxHandle::S_MUST_ABORT ||
                trx->state() == TrxHandle::S_MUST_REPLAY));

        TX_SET_STATE(*ts, TrxHandle::S_CERTIFYING);
    }
    catch (gu::Exception& e)
    {
        if (trx != 0) trx->lock();
        if (e.get_errno() == EINTR) { interrupted = true; }
        else throw;
    }
    return (not interrupted);
}

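/* Handles the case where the wait for the local monitor was interrupted by
 * a BF abort. A committing fragment is marked for replay and returns without
 * cancelling its local monitor slot; any other writeset is pushed to
 * pending_cert_queue_, its slot is self-cancelled, and certification failure
 * is returned so that the transaction rolls back. */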
wsrep_status_t galera::ReplicatorSMM::handle_local_monitor_interrupted(
    TrxHandleMaster* trx,
    const TrxHandleSlavePtr& ts)
{
    assert(trx != 0);
    // Did not enter local monitor.
    assert(ts->state() == TrxHandle::S_REPLICATING);
    wsrep_status_t retval(cert_for_aborted(ts));

    if (WSREP_TRX_FAIL != retval)
    {
        assert(ts->state() == TrxHandle::S_REPLICATING ||
               ts->state() == TrxHandle::S_CERTIFYING);
        assert(WSREP_BF_ABORT == retval);
        assert(trx != 0);

        // If the transaction was committing, it must replay.
        if (ts->flags() & TrxHandle::F_COMMIT)
        {
            // Return immediately without canceling the local monitor;
            // it needs to be grabbed again in the replay stage.
            TX_SET_STATE(*trx, TrxHandle::S_MUST_REPLAY);
            return retval;
        }
        // If not, we need to roll back, so pretend that certification
        // failed, but still update the cert index to match the slaves.
        else
        {
            pending_cert_queue_.push(ts);
            retval = WSREP_TRX_FAIL;
        }
    }
    else
    {
        assert(ts->is_dummy());
        pending_cert_queue_.push(ts);
    }

    assert(WSREP_TRX_FAIL == retval);
    TX_SET_STATE(*trx, TrxHandle::S_ABORTING);

    LocalOrder lo(*ts);
    local_monitor_.self_cancel(lo);
    // Cert for aborted returned certification failure, so this
    // trx will roll back. Mark it as certified to denote that
    // the local monitor must not be accessed again.
    TX_SET_STATE(*ts, TrxHandle::S_CERTIFYING);

    assert((retval == WSREP_TRX_FAIL && ts->is_dummy()) ||
           retval == WSREP_BF_ABORT || ts->queued());
    return retval;
}

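/* Completes certification for ts while holding the local monitor: first
 * certifies any smaller seqnos waiting in pending_cert_queue_, then appends
 * ts to the certification index, assigns the seqno to its gcache buffer and
 * leaves the monitor. Returns WSREP_OK, WSREP_BF_ABORT or WSREP_TRX_FAIL. */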
wsrep_status_t galera::ReplicatorSMM::finish_cert(
    TrxHandleMaster* trx,
    const TrxHandleSlavePtr& ts)
{
    assert(ts->state() == TrxHandle::S_CERTIFYING);

    gu_trace(process_pending_queue(ts->local_seqno()));

    // Write sets which would overlap with IST must have already been
    // filtered out before getting here.
    assert(ts->global_seqno() == cert_.position() + 1);

    wsrep_status_t retval;
    switch (cert_.append_trx(ts))
    {
    case Certification::TEST_OK:
        // NBO_END should certify positively only if it ends NBO
        assert(ts->ends_nbo() > 0 || !ts->nbo_end());
        if (trx != 0 && trx->state() == TrxHandle::S_MUST_ABORT)
        {
            if (ts->flags() & TrxHandle::F_COMMIT)
            {
                TX_SET_STATE(*trx, TrxHandle::S_MUST_REPLAY);
                // apply monitor will be entered during replay
            }
            else
            {
                // Abort the transaction if a non-committing
                // fragment was BF aborted during certification.
                TX_SET_STATE(*trx, TrxHandle::S_ABORTING);
            }
            retval = WSREP_BF_ABORT;
        }
        else
        {
            retval = WSREP_OK;
        }
        assert(!ts->is_dummy());
        break;
    case Certification::TEST_FAILED:
        assert(ts->is_dummy());
        if (ts->nbo_end()) assert(ts->ends_nbo() == WSREP_SEQNO_UNDEFINED);
        local_cert_failures_ += ts->local();
        if (trx != 0) TX_SET_STATE(*trx, TrxHandle::S_ABORTING);
        retval = WSREP_TRX_FAIL;
        break;
    default:
        retval = WSREP_TRX_FAIL;
        assert(0);
        break;
    }

    // At this point we are about to leave local_monitor_. Make sure
    // the trx checksum was alright before that.
    ts->verify_checksum();

    // We must do seqno assignment 'in order' for std::map reasons,
    // so keep it inside the monitor. NBO end should never be skipped.
    bool const skip(ts->is_dummy() && !ts->nbo_end());
    gcache_.seqno_assign (ts->action().first, ts->global_seqno(),
                          GCS_ACT_WRITESET, skip);

    LocalOrder lo(*ts);
    local_monitor_.leave(lo);

    return retval;
}

/* don't use this directly, use cert_and_catch() instead */
inline
wsrep_status_t galera::ReplicatorSMM::cert(TrxHandleMaster* trx,
                                           const TrxHandleSlavePtr& ts)
{
    assert(trx == 0 ||
           (trx->state() == TrxHandle::S_REPLICATING ||
            trx->state() == TrxHandle::S_MUST_REPLAY));
    assert(ts->state() == TrxHandle::S_REPLICATING);

    assert(ts->local_seqno()     != WSREP_SEQNO_UNDEFINED);
    assert(ts->global_seqno()    != WSREP_SEQNO_UNDEFINED);
    assert(ts->last_seen_seqno() >= 0);
    assert(ts->last_seen_seqno() < ts->global_seqno());

    LocalOrder lo(*ts);
    // The local monitor is either released or canceled in
    // handle_local_monitor_interrupted() or finish_cert().
    bool interrupted(not enter_local_monitor_for_cert(trx, ts));

    if (gu_unlikely (interrupted))
    {
        return handle_local_monitor_interrupted(trx, ts);
    }
    else
    {
        return finish_cert(trx, ts);
    }
}

/* pretty much any exception in cert() is fatal as it blocks local_monitor_ */
wsrep_status_t galera::ReplicatorSMM::cert_and_catch(
    TrxHandleMaster* trx,
    const TrxHandleSlavePtr& ts)
{
    try
    {
        return cert(trx, ts);
    }
    catch (std::exception& e)
    {
        log_fatal << "Certification exception: " << e.what();
    }
    catch (...)
    {
        log_fatal << "Unknown certification exception";
    }
    assert(0);
    abort();
}

/* This must be called BEFORE local_monitor_.self_cancel() due to
 * gcache_.seqno_assign() */
wsrep_status_t galera::ReplicatorSMM::cert_for_aborted(
    const TrxHandleSlavePtr& ts)
{
    // trx was BF aborted either while it was replicating or
    // while it was waiting for local monitor
    assert(ts->state() == TrxHandle::S_REPLICATING ||
           ts->state() == TrxHandle::S_CERTIFYING);

    Certification::TestResult const res(cert_.test(ts, false));

    switch (res)
    {
    case Certification::TEST_OK:
        return WSREP_BF_ABORT;

    case Certification::TEST_FAILED:
        // Next step will be monitors release. Make sure that ws was not
        // corrupted and cert failure is real before proceeding with that.
        // gcf788 - this must be moved to cert(), the caller method
        assert(ts->is_dummy());
        ts->verify_checksum();
        assert(!ts->nbo_end()); // should never be skipped in seqno_assign()
        return WSREP_TRX_FAIL;

    default:
        log_fatal << "Unexpected return value from Certification::test(): "
                  << res;
        abort();
    }
}

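/* Enters the apply monitor for a locally processed transaction. Returns
 * false if the wait was interrupted (EINTR) by a BF abort, in which case
 * the monitor has not been entered. */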
bool galera::ReplicatorSMM::enter_apply_monitor_for_local(
    TrxHandleMaster& trx,
    const TrxHandleSlavePtr& ts)
{
    assert(ts->global_seqno() > last_committed());
    assert(ts->depends_seqno() >= 0);

    TX_SET_STATE(trx, TrxHandle::S_APPLYING);

    ApplyOrder ao(*ts);
    bool interrupted(false);

    try
    {
        trx.unlock();
        GU_DBUG_SYNC_WAIT("before_certify_apply_monitor_enter");
        gu_trace(apply_monitor_.enter(ao));
        GU_DBUG_SYNC_WAIT("after_certify_apply_monitor_enter");
        trx.lock();
        assert(trx.state() == TrxHandle::S_APPLYING ||
               trx.state() == TrxHandle::S_MUST_ABORT);
    }
    catch (gu::Exception& e)
    {
        trx.lock();
        if (e.get_errno() == EINTR)
        {
            interrupted = true;
        }
        else throw;
    }
    return (not interrupted);
}

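/* Maps an interrupted apply monitor wait to the outcome for the local
 * transaction: committing fragments must replay (WSREP_BF_ABORT), others
 * are aborted (WSREP_TRX_FAIL). */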
wsrep_status_t galera::ReplicatorSMM::handle_apply_monitor_interrupted(
    TrxHandleMaster& trx,
    const TrxHandleSlavePtr& ts)
{
    assert(trx.state() == TrxHandle::S_MUST_ABORT);
    assert(ts->state() == TrxHandle::S_CERTIFYING);

    wsrep_status_t retval;
    if (ts->flags() & TrxHandle::F_COMMIT)
    {
        TX_SET_STATE(trx, TrxHandle::S_MUST_REPLAY);
        retval = WSREP_BF_ABORT;
    }
    else
    {
        TX_SET_STATE(trx, TrxHandle::S_ABORTING);
        retval = WSREP_TRX_FAIL;
    }
    return retval;
}

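/* Enters the apply monitor for a local transaction that is not going to
 * commit (rollback or replay path), unless ts has already reached the
 * S_APPLYING state. */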
void galera::ReplicatorSMM::enter_apply_monitor_for_local_not_committing(
    const TrxHandleMaster& trx,
    TrxHandleSlave& ts)
{
    assert(trx.state() == TrxHandle::S_ABORTING ||
           trx.state() == TrxHandle::S_REPLAYING);
    assert(ts.state() < TrxHandle::S_COMMITTING);
    switch (ts.state())
    {
    case TrxHandle::S_REPLICATING:
        TX_SET_STATE(ts, TrxHandle::S_CERTIFYING);
        // fall through
    case TrxHandle::S_CERTIFYING:
    {
        ApplyOrder ao(ts);
        apply_monitor_.enter(ao);
        TX_SET_STATE(ts, TrxHandle::S_APPLYING);
        break;
    }
    case TrxHandle::S_APPLYING:
        break;
    default:
        assert(0); // Should never happen
    }
}

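/* Updates the cached state UUID and its string representation, and records
 * the new UUID in the saved state (st_). */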
void
galera::ReplicatorSMM::update_state_uuid (const wsrep_uuid_t& uuid)
{
    if (state_uuid_ != uuid)
    {
        *(const_cast<wsrep_uuid_t*>(&state_uuid_)) = uuid;

        std::ostringstream os; os << state_uuid_;
        // Copy at most sizeof(state_uuid_str_) - 1 bytes of the source
        // string and terminate it explicitly to silence a warning
        // generated by -Wstringop-truncation.
        char* str(const_cast<char*>(state_uuid_str_));
        strncpy(str, os.str().c_str(), sizeof(state_uuid_str_) - 1);
        str[sizeof(state_uuid_str_) - 1] = '\0';
    }

    st_.set(uuid, WSREP_SEQNO_UNDEFINED, safe_to_bootstrap_);
}

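/* Closes the group communication channel and terminates the process. */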
void
galera::ReplicatorSMM::abort()
{
    log_info << "ReplicatorSMM::abort()";
    gcs_.close();
    gu_abort();
}