1 //
2 // Copyright (C) 2010-2021 Codership Oy <info@codership.com>
3 //
4 
5 #include "galera_common.hpp"
6 #include "replicator_smm.hpp"
7 #include "galera_exception.hpp"
8 #include "uuid.hpp"
9 
10 #include "galera_info.hpp"
11 
12 #include <gu_debug_sync.hpp>
13 #include <gu_abort.h>
14 
15 #include <sstream>
16 #include <iostream>
17 
18 
19 static void
apply_trx_ws(void * recv_ctx,wsrep_apply_cb_t apply_cb,wsrep_commit_cb_t commit_cb,const galera::TrxHandle & trx,const wsrep_trx_meta_t & meta)20 apply_trx_ws(void*                    recv_ctx,
21              wsrep_apply_cb_t         apply_cb,
22              wsrep_commit_cb_t        commit_cb,
23              const galera::TrxHandle& trx,
24              const wsrep_trx_meta_t&  meta)
25 {
26     using galera::TrxHandle;
27     static const size_t max_apply_attempts(4);
28     size_t attempts(1);
29 
30     do
31     {
32         try
33         {
34             gu_trace(trx.apply(recv_ctx, apply_cb, meta));
35             break;
36         }
37         catch (galera::ApplyException& e)
38         {
39             if (trx.is_toi())
40             {
41                 log_warn << "Ignoring error for TO isolated action: " << trx;
42                 break;
43             }
44             else
45             {
46                 int const err(e.status());
47 
48                 if (err > 0)
49                 {
50                     wsrep_bool_t unused(false);
51                     wsrep_cb_status const rcode(
52                         commit_cb(
53                             recv_ctx,
54                             TrxHandle::trx_flags_to_wsrep_flags(trx.flags()),
55                             &meta,
56                             &unused,
57                             false));
58                     if (WSREP_CB_SUCCESS != rcode)
59                     {
60                         gu_throw_fatal << "Rollback failed. Trx: " << trx;
61                     }
62 
63                     ++attempts;
64 
65                     if (attempts <= max_apply_attempts)
66                     {
67                         log_warn << e.what()
68                                  << "\nRetrying " << attempts << "th time";
69                     }
70                 }
71                 else
72                 {
73                     GU_TRACE(e);
74                     throw;
75                 }
76             }
77         }
78     }
79     while (attempts <= max_apply_attempts);
80 
81     if (gu_unlikely(attempts > max_apply_attempts))
82     {
83         std::ostringstream msg;
84 
85         msg << "Failed to apply trx " << trx.global_seqno() << " "
86             << max_apply_attempts << " times";
87 
88         throw galera::ApplyException(msg.str(), WSREP_CB_FAILURE);
89     }
90 
91     return;
92 }
93 
94 
operator <<(std::ostream & os,ReplicatorSMM::State state)95 std::ostream& galera::operator<<(std::ostream& os, ReplicatorSMM::State state)
96 {
97     switch (state)
98     {
99     case ReplicatorSMM::S_DESTROYED: return (os << "DESTROYED");
100     case ReplicatorSMM::S_CLOSED:    return (os << "CLOSED");
101     case ReplicatorSMM::S_CLOSING:   return (os << "CLOSING");
102     case ReplicatorSMM::S_CONNECTED: return (os << "CONNECTED");
103     case ReplicatorSMM::S_JOINING:   return (os << "JOINING");
104     case ReplicatorSMM::S_JOINED:    return (os << "JOINED");
105     case ReplicatorSMM::S_SYNCED:    return (os << "SYNCED");
106     case ReplicatorSMM::S_DONOR:     return (os << "DONOR");
107     }
108 
109     gu_throw_fatal << "invalid state " << static_cast<int>(state);
110 }
111 
112 //////////////////////////////////////////////////////////////////////
113 //////////////////////////////////////////////////////////////////////
114 //                           Public
115 //////////////////////////////////////////////////////////////////////
116 //////////////////////////////////////////////////////////////////////
117 
ReplicatorSMM(const struct wsrep_init_args * args)118 galera::ReplicatorSMM::ReplicatorSMM(const struct wsrep_init_args* args)
119     :
120     init_lib_           (reinterpret_cast<gu_log_cb_t>(args->logger_cb)),
121     config_             (),
122     init_config_        (config_, args->node_address, args->data_dir),
123     parse_options_      (*this, config_, args->options),
124     init_ssl_           (config_),
125     str_proto_ver_      (-1),
126     protocol_version_   (-1),
127     proto_max_          (gu::from_string<int>(config_.get(Param::proto_max))),
128     state_              (S_CLOSED),
129     sst_state_          (SST_NONE),
130     co_mode_            (CommitOrder::from_string(
131                              config_.get(Param::commit_order))),
132     state_file_         (config_.get(BASE_DIR)+'/'+GALERA_STATE_FILE),
133     st_                 (state_file_),
134     safe_to_bootstrap_  (true),
135     trx_params_         (config_.get(BASE_DIR), -1,
136                          KeySet::version(config_.get(Param::key_format)),
137                          TrxHandle::Defaults.record_set_ver_,
138                          gu::from_string<int>(config_.get(
139                              Param::max_write_set_size))),
140     uuid_               (WSREP_UUID_UNDEFINED),
141     state_uuid_         (WSREP_UUID_UNDEFINED),
142     state_uuid_str_     (),
143     cc_seqno_           (WSREP_SEQNO_UNDEFINED),
144     pause_seqno_        (WSREP_SEQNO_UNDEFINED),
145     app_ctx_            (args->app_ctx),
146     view_cb_            (args->view_handler_cb),
147     apply_cb_           (args->apply_cb),
148     commit_cb_          (args->commit_cb),
149     unordered_cb_       (args->unordered_cb),
150     sst_donate_cb_      (args->sst_donate_cb),
151     synced_cb_          (args->synced_cb),
152     sst_donor_          (),
153     sst_uuid_           (WSREP_UUID_UNDEFINED),
154     sst_seqno_          (WSREP_SEQNO_UNDEFINED),
155     sst_mutex_          (),
156     sst_cond_           (),
157     sst_retry_sec_      (1),
158     last_st_type_       (ST_TYPE_NONE),
159     gcache_             (config_, config_.get(BASE_DIR)),
160     gcs_                (config_, gcache_, proto_max_, args->proto_ver,
161                          args->node_name, args->node_incoming),
162     service_thd_        (gcs_, gcache_),
163     slave_pool_         (sizeof(TrxHandle), 1024, "SlaveTrxHandle"),
164     as_                 (0),
165     gcs_as_             (slave_pool_, gcs_, *this, gcache_),
166     ist_receiver_       (config_, slave_pool_, args->node_address),
167     ist_senders_        (gcs_, gcache_),
168     wsdb_               (),
169     cert_               (config_, service_thd_),
170     local_monitor_      (),
171     apply_monitor_      (),
172     commit_monitor_     (),
173     causal_read_timeout_(config_.get(Param::causal_read_timeout)),
174     receivers_          (),
175     replicated_         (),
176     replicated_bytes_   (),
177     keys_count_         (),
178     keys_bytes_         (),
179     data_bytes_         (),
180     unrd_bytes_         (),
181     local_commits_      (),
182     local_rollbacks_    (),
183     local_cert_failures_(),
184     local_replays_      (),
185     causal_reads_       (),
186     preordered_id_      (),
187     incoming_list_      (""),
188     incoming_mutex_     (),
189     wsrep_stats_        ()
190 {
191     // @todo add guards (and perhaps actions)
192     state_.add_transition(Transition(S_CLOSED,  S_DESTROYED));
193     state_.add_transition(Transition(S_CLOSED,  S_CONNECTED));
194     state_.add_transition(Transition(S_CLOSING, S_CLOSED));
195 
196     state_.add_transition(Transition(S_CONNECTED, S_CLOSING));
197     state_.add_transition(Transition(S_CONNECTED, S_CONNECTED));
198     state_.add_transition(Transition(S_CONNECTED, S_JOINING));
199     // the following is possible only when bootstrapping new cluster
200     // (trivial wsrep_cluster_address)
201     state_.add_transition(Transition(S_CONNECTED, S_JOINED));
202     // the following are possible on PC remerge
203     state_.add_transition(Transition(S_CONNECTED, S_DONOR));
204     state_.add_transition(Transition(S_CONNECTED, S_SYNCED));
205 
206     state_.add_transition(Transition(S_JOINING, S_CLOSING));
207     // the following is possible if one non-prim conf follows another
208     state_.add_transition(Transition(S_JOINING, S_CONNECTED));
209     state_.add_transition(Transition(S_JOINING, S_JOINED));
210 
211     state_.add_transition(Transition(S_JOINED, S_CLOSING));
212     state_.add_transition(Transition(S_JOINED, S_CONNECTED));
213     state_.add_transition(Transition(S_JOINED, S_SYNCED));
214     // the following is possible if one desync() immediately follows another
215     state_.add_transition(Transition(S_JOINED, S_DONOR));
216 
217     state_.add_transition(Transition(S_SYNCED, S_CLOSING));
218     state_.add_transition(Transition(S_SYNCED, S_CONNECTED));
219     state_.add_transition(Transition(S_SYNCED, S_DONOR));
220 
221     state_.add_transition(Transition(S_DONOR, S_CLOSING));
222     state_.add_transition(Transition(S_DONOR, S_CONNECTED));
223     state_.add_transition(Transition(S_DONOR, S_JOINED));
224 
225     local_monitor_.set_initial_position(0);
226 
227     wsrep_uuid_t  uuid;
228     wsrep_seqno_t seqno;
229 
230     st_.get (uuid, seqno, safe_to_bootstrap_);
231 
232     if (0 != args->state_id &&
233         args->state_id->uuid != WSREP_UUID_UNDEFINED &&
234         args->state_id->uuid == uuid                 &&
235         seqno                == WSREP_SEQNO_UNDEFINED)
236     {
237         /* non-trivial recovery information provided on startup, and db is safe
238          * so use recovered seqno value */
239         seqno = args->state_id->seqno;
240     }
241 
242     log_debug << "End state: " << uuid << ':' << seqno << " #################";
243 
244     update_state_uuid (uuid);
245     gcache_.seqno_reset(to_gu_uuid(uuid), seqno);
246     // update gcache position to one supplied by app.
247 
248     cc_seqno_ = seqno; // is it needed here?
249 
250     // the following initialization is needed only to pass seqno to
251     // connect() call. Ideally this should be done only on receving conf change.
252     apply_monitor_.set_initial_position(seqno);
253     if (co_mode_ != CommitOrder::BYPASS)
254         commit_monitor_.set_initial_position(seqno);
255     cert_.assign_initial_position(seqno, trx_proto_ver());
256 
257     build_stats_vars(wsrep_stats_);
258 }
259 
~ReplicatorSMM()260 galera::ReplicatorSMM::~ReplicatorSMM()
261 {
262     log_info << "dtor state: " << state_();
263     switch (state_())
264     {
265     case S_CONNECTED:
266     case S_JOINING:
267     case S_JOINED:
268     case S_SYNCED:
269     case S_DONOR:
270         close();
271         // fall through
272     case S_CLOSING:
273         // @todo wait that all users have left the building
274     case S_CLOSED:
275         ist_senders_.cancel();
276         break;
277     case S_DESTROYED:
278         break;
279     }
280 }
281 
282 
connect(const std::string & cluster_name,const std::string & cluster_url,const std::string & state_donor,bool const bootstrap)283 wsrep_status_t galera::ReplicatorSMM::connect(const std::string& cluster_name,
284                                               const std::string& cluster_url,
285                                               const std::string& state_donor,
286                                               bool  const        bootstrap)
287 {
288     sst_donor_ = state_donor;
289     service_thd_.reset();
290 
291     ssize_t err;
292     wsrep_status_t ret(WSREP_OK);
293     wsrep_seqno_t const seqno(STATE_SEQNO());
294     wsrep_uuid_t  const gcs_uuid(seqno < 0 ? WSREP_UUID_UNDEFINED :state_uuid_);
295 
296     log_info << "Setting initial position to " << gcs_uuid << ':' << seqno;
297 
298     if ((bootstrap == true || cluster_url == "gcomm://")
299         && safe_to_bootstrap_ == false)
300     {
301         log_error << "It may not be safe to bootstrap the cluster from this node. "
302                   << "It was not the last one to leave the cluster and may "
303                   << "not contain all the updates. To force cluster bootstrap "
304                   << "with this node, edit the grastate.dat file manually and "
305                   << "set safe_to_bootstrap to 1 .";
306         ret = WSREP_NODE_FAIL;
307     }
308 
309     if (ret == WSREP_OK &&
310         (err = gcs_.set_initial_position(gcs_uuid, seqno)) != 0)
311     {
312         log_error << "gcs init failed:" << strerror(-err);
313         ret = WSREP_NODE_FAIL;
314     }
315 
316     if (ret == WSREP_OK &&
317         (err = gcs_.connect(cluster_name, cluster_url, bootstrap)) != 0)
318     {
319         log_error << "gcs connect failed: " << strerror(-err);
320         ret = WSREP_NODE_FAIL;
321     }
322 
323     if (ret == WSREP_OK)
324     {
325         state_.shift_to(S_CONNECTED);
326     }
327 
328     return ret;
329 }
330 
331 
close()332 wsrep_status_t galera::ReplicatorSMM::close()
333 {
334     if (state_() != S_CLOSED)
335     {
336         gcs_.close();
337     }
338 
339     return WSREP_OK;
340 }
341 
342 
async_recv(void * recv_ctx)343 wsrep_status_t galera::ReplicatorSMM::async_recv(void* recv_ctx)
344 {
345     if (state_() == S_CLOSED || state_() == S_CLOSING)
346     {
347         log_error <<"async recv cannot start, provider in closed/closing state";
348         return WSREP_FATAL;
349     }
350 
351     ++receivers_;
352     as_ = &gcs_as_;
353 
354     bool exit_loop(false);
355     wsrep_status_t retval(WSREP_OK);
356 
357     while (WSREP_OK == retval && state_() != S_CLOSING)
358     {
359         GU_DBUG_SYNC_EXECUTE("before_async_recv_process_sync", sleep(5););
360 
361         ssize_t rc;
362 
363         while (gu_unlikely((rc = as_->process(recv_ctx, exit_loop))
364                            == -ECANCELED))
365         {
366             recv_IST(recv_ctx);
367             // hack: prevent fast looping until ist controlling thread
368             // resumes gcs prosessing
369             usleep(10000);
370         }
371 
372         if (gu_unlikely(rc <= 0))
373         {
374             if (GcsActionSource::INCONSISTENCY_CODE == rc)
375             {
376                 st_.mark_corrupt();
377                 retval = WSREP_FATAL;
378             }
379             else
380             {
381                 retval = WSREP_CONN_FAIL;
382             }
383         }
384         else if (gu_unlikely(exit_loop == true))
385         {
386             assert(WSREP_OK == retval);
387 
388             if (receivers_.sub_and_fetch(1) > 0)
389             {
390                 log_info << "Slave thread exiting on request.";
391                 break;
392             }
393 
394             ++receivers_;
395             log_warn << "Refusing exit for the last slave thread.";
396         }
397     }
398 
399     /* exiting loop already did proper checks */
400     if (!exit_loop && receivers_.sub_and_fetch(1) == 0)
401     {
402         if (state_() != S_CLOSING)
403         {
404             if (retval == WSREP_OK)
405             {
406                 log_warn << "Broken shutdown sequence, provider state: "
407                          << state_() << ", retval: " << retval;
408                 assert (0);
409             }
410             else
411             {
412                 // Generate zero view before exit to notify application
413                 wsrep_view_info_t* err_view(galera_view_info_create(0, false));
414                 void* fake_sst_req(0);
415                 size_t fake_sst_req_len(0);
416                 view_cb_(app_ctx_, recv_ctx, err_view, 0, 0,
417                          &fake_sst_req, &fake_sst_req_len);
418                 free(err_view);
419             }
420             /* avoid abort in production */
421             state_.shift_to(S_CLOSING);
422         }
423         state_.shift_to(S_CLOSED);
424     }
425 
426     log_debug << "Slave thread exit. Return code: " << retval;
427 
428     return retval;
429 }
430 
431 
apply_trx(void * recv_ctx,TrxHandle * trx)432 void galera::ReplicatorSMM::apply_trx(void* recv_ctx, TrxHandle* trx)
433 {
434     assert(trx != 0);
435     assert(trx->global_seqno() > 0);
436     assert(trx->is_certified() == true);
437     assert(trx->global_seqno() > STATE_SEQNO());
438     assert(trx->is_local() == false);
439 
440     ApplyOrder ao(*trx);
441     CommitOrder co(*trx, co_mode_);
442 
443     gu_trace(apply_monitor_.enter(ao));
444     trx->set_state(TrxHandle::S_APPLYING);
445 
446     wsrep_trx_meta_t meta = {{state_uuid_, trx->global_seqno() },
447                              trx->depends_seqno()};
448 
449     if (trx->is_toi())
450     {
451         log_debug << "Executing TO isolated action: " << *trx;
452         st_.mark_unsafe();
453     }
454 
455     gu_trace(apply_trx_ws(recv_ctx, apply_cb_, commit_cb_, *trx, meta));
456     /* at this point any exception in apply_trx_ws() is fatal, not
457      * catching anything. */
458 
459     if (gu_likely(co_mode_ != CommitOrder::BYPASS))
460     {
461         gu_trace(commit_monitor_.enter(co));
462     }
463     trx->set_state(TrxHandle::S_COMMITTING);
464 
465     wsrep_bool_t exit_loop(false);
466     wsrep_cb_status_t const rcode(
467         commit_cb_(
468             recv_ctx,
469             TrxHandle::trx_flags_to_wsrep_flags(trx->flags()),
470             &meta,
471             &exit_loop,
472             true));
473 
474     if (gu_unlikely (rcode != WSREP_CB_SUCCESS))
475         gu_throw_fatal << "Commit failed. Trx: " << trx;
476 
477     if (gu_likely(co_mode_ != CommitOrder::BYPASS))
478     {
479         commit_monitor_.leave(co);
480     }
481     trx->set_state(TrxHandle::S_COMMITTED);
482 
483     if (trx->local_seqno() != -1)
484     {
485         // trx with local seqno -1 originates from IST (or other source not gcs)
486         report_last_committed(cert_.set_trx_committed(trx));
487     }
488 
489     /* For now need to keep it inside apply monitor to ensure all processing
490      * ends by the time monitors are drained because of potential gcache
491      * cleanup (and loss of the writeset buffer). Perhaps unordered monitor
492      * is needed here. */
493     trx->unordered(recv_ctx, unordered_cb_);
494 
495     apply_monitor_.leave(ao);
496 
497     if (trx->is_toi())
498     {
499         log_debug << "Done executing TO isolated action: "
500                   << trx->global_seqno();
501         st_.mark_safe();
502     }
503 
504     trx->set_exit_loop(exit_loop);
505 }
506 
507 
/*
 * Replicates a local trx write set through GCS.
 *
 * Returns WSREP_TRX_FAIL without touching trx when the provider is below
 * S_JOINED. If trx is (or becomes, by a concurrent abort or a replication
 * error) S_MUST_ABORT before its write set is delivered, it is moved to
 * S_ABORTING and WSREP_TRX_FAIL is returned.
 *
 * On successful delivery trx is left in S_REPLICATING with its local and
 * global seqnos set, and WSREP_OK is returned. If trx was BF-aborted while
 * replicating, the result of cert_for_aborted() is returned instead:
 *  - for anything but WSREP_BF_ABORT, trx's slots in the local, apply and
 *    (unless BYPASS) commit monitors are self-cancelled;
 *  - for WSREP_BF_ABORT, meta (if given) is filled with the trx GTID and
 *    dependency.
 * If trx is still S_MUST_ABORT afterwards it is moved to S_ABORTING.
 */
wsrep_status_t galera::ReplicatorSMM::replicate(TrxHandle* trx,
                                                wsrep_trx_meta_t* meta)
{
    if (state_() < S_JOINED) return WSREP_TRX_FAIL;

    assert(trx->state() == TrxHandle::S_EXECUTING ||
           trx->state() == TrxHandle::S_MUST_ABORT);
    assert(trx->local_seqno() == WSREP_SEQNO_UNDEFINED &&
           trx->global_seqno() == WSREP_SEQNO_UNDEFINED);

    wsrep_status_t retval(WSREP_TRX_FAIL);

    if (trx->state() == TrxHandle::S_MUST_ABORT)
    {
    // common abort exit, also reached via goto from the paths below;
    // returns whatever retval holds at that point
    must_abort:
        trx->set_state(TrxHandle::S_ABORTING);
        return retval;
    }

    WriteSetNG::GatherVector actv;

    gcs_action act;
    act.type = GCS_ACT_TORDERED;
#ifndef NDEBUG
    act.seqno_g = GCS_SEQNO_ILL;
#endif

    if (trx->new_version())
    {
        // new write set format: gathered into actv, act.buf stays NULL
        // until delivery
        act.buf  = NULL;
        act.size = trx->write_set_out().gather(trx->source_id(),
                                               trx->conn_id(),
                                               trx->trx_id(),
                                               actv);
    }
    else
    {
        // old format: flushed into a contiguous collection buffer
        trx->set_last_seen_seqno(last_committed());
        assert (trx->last_seen_seqno() >= 0);
        trx->flush(0);

        const MappedBuffer& wscoll(trx->write_set_collection());

        act.buf  = &wscoll[0];
        act.size = wscoll.size();

        assert (act.buf != NULL);
        assert (act.size > 0);
    }

    trx->set_state(TrxHandle::S_REPLICATING);

    ssize_t rcode(-1);

    do
    {
        assert(act.seqno_g == GCS_SEQNO_ILL);

        const ssize_t gcs_handle(gcs_.schedule());

        if (gu_unlikely(gcs_handle < 0))
        {
            log_debug << "gcs schedule " << strerror(-gcs_handle);
            trx->set_state(TrxHandle::S_MUST_ABORT);
            goto must_abort;
        }

        // recorded so that abort_trx() can interrupt this replication
        trx->set_gcs_handle(gcs_handle);

        // trx lock is released for the duration of the GCS call and
        // re-acquired after it
        if (trx->new_version())
        {
            trx->set_last_seen_seqno(last_committed());
            assert(trx->last_seen_seqno() >= 0);
            trx->unlock();
            assert (act.buf == NULL); // just a sanity check
            rcode = gcs_.replv(actv, act, true);
        }
        else
        {
            assert(trx->last_seen_seqno() >= 0);
            trx->unlock();
            assert (act.buf != NULL);
            rcode = gcs_.repl(act, true);
        }

        GU_DBUG_SYNC_WAIT("after_replicate_sync")
        trx->lock();
    }
    // retry on -EAGAIN (sleeping 1 ms between attempts via the comma
    // expression) unless the trx has been marked for abort meanwhile
    while (rcode == -EAGAIN && trx->state() != TrxHandle::S_MUST_ABORT &&
           (usleep(1000), true));

    assert(trx->last_seen_seqno() >= 0);

    if (rcode < 0)
    {
        // replication failed or was interrupted (-EINTR) by abort_trx()
        if (rcode != -EINTR)
        {
            log_debug << "gcs_repl() failed with " << strerror(-rcode)
                      << " for trx " << *trx;
        }

        assert(rcode != -EINTR || trx->state() == TrxHandle::S_MUST_ABORT);
        assert(act.seqno_l == GCS_SEQNO_ILL && act.seqno_g == GCS_SEQNO_ILL);
        assert(NULL == act.buf || !trx->new_version());

        if (trx->state() != TrxHandle::S_MUST_ABORT)
        {
            trx->set_state(TrxHandle::S_MUST_ABORT);
        }

        trx->set_gcs_handle(-1);
        goto must_abort;
    }

    assert(act.buf != NULL);
    assert(act.size == rcode);
    assert(act.seqno_l != GCS_SEQNO_ILL);
    assert(act.seqno_g != GCS_SEQNO_ILL);

    ++replicated_;
    replicated_bytes_ += rcode;
    trx->set_gcs_handle(-1);

    if (trx->new_version())
    {
        // parse the delivered buffer back into trx and account stats
        gu_trace(trx->unserialize(static_cast<const gu::byte_t*>(act.buf),
                                  act.size, 0));
        trx->update_stats(keys_count_, keys_bytes_, data_bytes_, unrd_bytes_);
    }

    trx->set_received(act.buf, act.seqno_l, act.seqno_g);

    if (trx->state() == TrxHandle::S_MUST_ABORT)
    {
        // BF-aborted after the write set was ordered
        retval = cert_for_aborted(trx);

        if (retval != WSREP_BF_ABORT)
        {
            // release this trx's positions so later trxs are not blocked
            LocalOrder  lo(*trx);
            ApplyOrder  ao(*trx);
            CommitOrder co(*trx, co_mode_);
            local_monitor_.self_cancel(lo);
            apply_monitor_.self_cancel(ao);
            if (co_mode_ !=CommitOrder::BYPASS) commit_monitor_.self_cancel(co);
        }
        else if (meta != 0)
        {
            meta->gtid.uuid  = state_uuid_;
            meta->gtid.seqno = trx->global_seqno();
            meta->depends_on = trx->depends_seqno();
        }

        if (trx->state() == TrxHandle::S_MUST_ABORT) goto must_abort;
    }
    else
    {
        retval = WSREP_OK;
    }

    assert(trx->last_seen_seqno() >= 0);

    return retval;
}
671 
672 void
abort_trx(TrxHandle * trx)673 galera::ReplicatorSMM::abort_trx(TrxHandle* trx)
674 {
675     assert(trx != 0);
676     assert(trx->is_local() == true);
677 
678     log_debug << "aborting trx " << *trx << " " << trx;
679 
680 
681     switch (trx->state())
682     {
683     case TrxHandle::S_MUST_ABORT:
684     case TrxHandle::S_ABORTING: // guess this is here because we can have a race
685         return;
686     case TrxHandle::S_EXECUTING:
687         trx->set_state(TrxHandle::S_MUST_ABORT);
688         break;
689     case TrxHandle::S_REPLICATING:
690     {
691         trx->set_state(TrxHandle::S_MUST_ABORT);
692         // trx is in gcs repl
693         int rc;
694         if (trx->gcs_handle() > 0 &&
695             ((rc = gcs_.interrupt(trx->gcs_handle()))) != 0)
696         {
697             log_debug << "gcs_interrupt(): handle "
698                       << trx->gcs_handle()
699                       << " trx id " << trx->trx_id()
700                       << ": " << strerror(-rc);
701         }
702         break;
703     }
704     case TrxHandle::S_CERTIFYING:
705     {
706         trx->set_state(TrxHandle::S_MUST_ABORT);
707         // trx is waiting in local monitor
708         LocalOrder lo(*trx);
709         trx->unlock();
710         local_monitor_.interrupt(lo);
711         trx->lock();
712         break;
713     }
714     case TrxHandle::S_APPLYING:
715     {
716         trx->set_state(TrxHandle::S_MUST_ABORT);
717         // trx is waiting in apply monitor
718         ApplyOrder ao(*trx);
719         trx->unlock();
720         apply_monitor_.interrupt(ao);
721         trx->lock();
722         break;
723     }
724     case TrxHandle::S_COMMITTING:
725         trx->set_state(TrxHandle::S_MUST_ABORT);
726         if (co_mode_ != CommitOrder::BYPASS)
727         {
728             // trx waiting in commit monitor
729             CommitOrder co(*trx, co_mode_);
730             trx->unlock();
731             commit_monitor_.interrupt(co);
732             trx->lock();
733         }
734         break;
735     default:
736         gu_throw_fatal << "invalid state " << trx->state();
737     }
738 }
739 
740 
/*
 * Certifies a replicated local trx and takes its place in total order.
 *
 * Expects trx in S_REPLICATING with valid local/global seqnos. If meta is
 * non-null it is filled with the trx GTID and dependency seqno up front.
 *
 * Returns:
 *  - WSREP_OK with trx in S_COMMITTING (F_COMMIT set: apply monitor and,
 *    unless BYPASS, commit monitor entered) or S_EXECUTING (F_COMMIT not
 *    set: only the apply monitor entered);
 *  - the non-OK cert_and_catch() status on certification failure, with a
 *    trx in S_MUST_ABORT moved to S_ABORTING;
 *  - WSREP_BF_ABORT with trx in S_MUST_REPLAY_AM / S_MUST_REPLAY_CM /
 *    S_MUST_REPLAY when it was BF-aborted while entering the apply or
 *    commit monitor. The state records which monitors still need to be
 *    entered on replay.
 * Monitor-entry exceptions other than EINTR are propagated.
 */
wsrep_status_t galera::ReplicatorSMM::pre_commit(TrxHandle*        trx,
                                                 wsrep_trx_meta_t* meta)
{
    assert(trx->state() == TrxHandle::S_REPLICATING);
    assert(trx->local_seqno()  > -1);
    assert(trx->global_seqno() > -1);
    assert(trx->last_seen_seqno() >= 0);

    if (meta != 0)
    {
        meta->gtid.uuid  = state_uuid_;
        meta->gtid.seqno = trx->global_seqno();
        meta->depends_on = trx->depends_seqno();
    }
    // State should not be checked here: If trx has been replicated,
    // it has to be certified and potentially applied. #528
    // if (state_() < S_JOINED) return WSREP_TRX_FAIL;

    wsrep_status_t retval(cert_and_catch(trx));

    if (gu_unlikely(retval != WSREP_OK))
    {
        assert(trx->state() == TrxHandle::S_MUST_ABORT ||
               trx->state() == TrxHandle::S_MUST_REPLAY_AM ||
               trx->state() == TrxHandle::S_MUST_CERT_AND_REPLAY);

        if (trx->state() == TrxHandle::S_MUST_ABORT)
        {
            trx->set_state(TrxHandle::S_ABORTING);
        }

        return retval;
    }

    assert(trx->state() == TrxHandle::S_CERTIFYING);
    assert(trx->global_seqno() > STATE_SEQNO());
    trx->set_state(TrxHandle::S_APPLYING);

    ApplyOrder ao(*trx);
    CommitOrder co(*trx, co_mode_);
    bool interrupted(false);

    // EINTR here means abort_trx() interrupted the wait in the monitor
    try
    {
        gu_trace(apply_monitor_.enter(ao));
    }
    catch (gu::Exception& e)
    {
        if (e.get_errno() == EINTR) { interrupted = true; }
        else throw;
    }

    if (gu_unlikely(interrupted) || trx->state() == TrxHandle::S_MUST_ABORT)
    {
        // interrupted: apply monitor was not entered, replay from it;
        // otherwise apply monitor is held, replay from commit monitor
        assert(trx->state() == TrxHandle::S_MUST_ABORT);
        if (interrupted) trx->set_state(TrxHandle::S_MUST_REPLAY_AM);
        else             trx->set_state(TrxHandle::S_MUST_REPLAY_CM);
        retval = WSREP_BF_ABORT;
    }
    else if ((trx->flags() & TrxHandle::F_COMMIT) != 0)
    {
        trx->set_state(TrxHandle::S_COMMITTING);
        if (co_mode_ != CommitOrder::BYPASS)
        {
            try
            {
                gu_trace(commit_monitor_.enter(co));
            }
            catch (gu::Exception& e)
            {
                if (e.get_errno() == EINTR) { interrupted = true; }
                else throw;
            }

            if (gu_unlikely(interrupted) ||
                trx->state() == TrxHandle::S_MUST_ABORT)
            {
                // interrupted: commit monitor not entered; otherwise both
                // monitors are held and only the replay itself remains
                assert(trx->state() == TrxHandle::S_MUST_ABORT);
                if (interrupted) trx->set_state(TrxHandle::S_MUST_REPLAY_CM);
                else             trx->set_state(TrxHandle::S_MUST_REPLAY);
                retval = WSREP_BF_ABORT;
            }
        }
    }
    else
    {
        // no F_COMMIT flag: trx goes back to executing, holding only the
        // apply monitor
        trx->set_state(TrxHandle::S_EXECUTING);
    }

    assert((retval == WSREP_OK && (trx->state() == TrxHandle::S_COMMITTING ||
                                   trx->state() == TrxHandle::S_EXECUTING))
           ||
           (retval == WSREP_TRX_FAIL && trx->state() == TrxHandle::S_ABORTING)
           ||
           (retval == WSREP_BF_ABORT && (
               trx->state() == TrxHandle::S_MUST_REPLAY_AM ||
               trx->state() == TrxHandle::S_MUST_REPLAY_CM ||
               trx->state() == TrxHandle::S_MUST_REPLAY)));

    return retval;
}
842 
replay_trx(TrxHandle * trx,void * trx_ctx)843 wsrep_status_t galera::ReplicatorSMM::replay_trx(TrxHandle* trx, void* trx_ctx)
844 {
845     assert(trx->state() == TrxHandle::S_MUST_CERT_AND_REPLAY ||
846            trx->state() == TrxHandle::S_MUST_REPLAY_AM       ||
847            trx->state() == TrxHandle::S_MUST_REPLAY_CM       ||
848            trx->state() == TrxHandle::S_MUST_REPLAY);
849     assert(trx->trx_id() != static_cast<wsrep_trx_id_t>(-1));
850     assert(trx->global_seqno() > STATE_SEQNO());
851 
852     wsrep_status_t retval(WSREP_OK);
853 
854     switch (trx->state())
855     {
856     case TrxHandle::S_MUST_CERT_AND_REPLAY:
857         retval = cert_and_catch(trx);
858         if (retval != WSREP_OK)
859         {
860             // apply monitor is self canceled in cert
861             break;
862         }
863         trx->set_state(TrxHandle::S_MUST_REPLAY_AM);
864         // fall through
865     case TrxHandle::S_MUST_REPLAY_AM:
866     {
867         // safety measure to make sure that all preceding trxs finish before
868         // replaying
869         trx->set_depends_seqno(trx->global_seqno() - 1);
870         ApplyOrder ao(*trx);
871         gu_trace(apply_monitor_.enter(ao));
872         trx->set_state(TrxHandle::S_MUST_REPLAY_CM);
873     }
874     // fall through
875     case TrxHandle::S_MUST_REPLAY_CM:
876         if (co_mode_ != CommitOrder::BYPASS)
877         {
878             CommitOrder co(*trx, co_mode_);
879             gu_trace(commit_monitor_.enter(co));
880         }
881         trx->set_state(TrxHandle::S_MUST_REPLAY);
882         // fall through
883     case TrxHandle::S_MUST_REPLAY:
884         ++local_replays_;
885         trx->set_state(TrxHandle::S_REPLAYING);
886 
887         try
888         {
889             wsrep_trx_meta_t meta = {{state_uuid_, trx->global_seqno() },
890                                      trx->depends_seqno()};
891 
892             gu_trace(apply_trx_ws(trx_ctx, apply_cb_, commit_cb_, *trx, meta));
893 
894             wsrep_bool_t unused(false);
895             wsrep_cb_status_t rcode(
896                 commit_cb_(
897                     trx_ctx,
898                     TrxHandle::trx_flags_to_wsrep_flags(trx->flags()),
899                     &meta,
900                     &unused,
901                     true));
902 
903             if (gu_unlikely(rcode != WSREP_CB_SUCCESS))
904                 gu_throw_fatal << "Commit failed. Trx: " << trx;
905         }
906         catch (gu::Exception& e)
907         {
908             st_.mark_corrupt();
909             throw;
910         }
911 
912         // apply, commit monitors are released in post commit
913         return WSREP_OK;
914     default:
915         gu_throw_fatal << "Invalid state in replay for trx " << *trx;
916     }
917 
918     log_debug << "replaying failed for trx " << *trx;
919     trx->set_state(TrxHandle::S_ABORTING);
920 
921     return retval;
922 }
923 
924 
post_commit(TrxHandle * trx)925 wsrep_status_t galera::ReplicatorSMM::post_commit(TrxHandle* trx)
926 {
927     if (trx->state() == TrxHandle::S_MUST_ABORT)
928     {
929         // This is possible in case of ALG: BF applier BF aborts
930         // trx that has already grabbed commit monitor and is committing.
931         // However, this should be acceptable assuming that commit
932         // operation does not reserve any more resources and is able
933         // to release already reserved resources.
934         log_debug << "trx was BF aborted during commit: " << *trx;
935         // manipulate state to avoid crash
936         trx->set_state(TrxHandle::S_MUST_REPLAY);
937         trx->set_state(TrxHandle::S_REPLAYING);
938     }
939     assert(trx->state() == TrxHandle::S_COMMITTING ||
940            trx->state() == TrxHandle::S_REPLAYING);
941     assert(trx->local_seqno() > -1 && trx->global_seqno() > -1);
942 
943     CommitOrder co(*trx, co_mode_);
944     if (co_mode_ != CommitOrder::BYPASS) commit_monitor_.leave(co);
945 
946     ApplyOrder ao(*trx);
947     report_last_committed(cert_.set_trx_committed(trx));
948     apply_monitor_.leave(ao);
949 
950     trx->set_state(TrxHandle::S_COMMITTED);
951 
952     ++local_commits_;
953 
954     return WSREP_OK;
955 }
956 
957 
post_rollback(TrxHandle * trx)958 wsrep_status_t galera::ReplicatorSMM::post_rollback(TrxHandle* trx)
959 {
960     if (trx->state() == TrxHandle::S_MUST_ABORT)
961     {
962         trx->set_state(TrxHandle::S_ABORTING);
963     }
964 
965     assert(trx->state() == TrxHandle::S_ABORTING ||
966            trx->state() == TrxHandle::S_EXECUTING);
967 
968     trx->set_state(TrxHandle::S_ROLLED_BACK);
969 
970     // Trx was either rolled back by user or via certification failure,
971     // last committed report not needed since cert index state didn't change.
972     // report_last_committed();
973     ++local_rollbacks_;
974 
975     return WSREP_OK;
976 }
977 
978 
causal_read(wsrep_gtid_t * gtid)979 wsrep_status_t galera::ReplicatorSMM::causal_read(wsrep_gtid_t* gtid)
980 {
981     wsrep_seqno_t cseq;
982     gu::datetime::Date wait_until(gu::datetime::Date::calendar() +
983                                   causal_read_timeout_);
984 
985     try
986     {
987         gcs_.caused(cseq, wait_until);
988         assert(cseq >= 0);
989     }
990     catch (gu::Exception& e)
991     {
992         log_warn << "gcs_caused() returned " << -e.get_errno()
993                  << " (" << strerror(e.get_errno()) << ")";
994         return WSREP_TRX_FAIL;
995     }
996 
997     try
998     {
999         // @note: Using timed wait for monitor is currently a hack
1000         // to avoid deadlock resulting from race between monitor wait
1001         // and drain during configuration change. Instead of this,
1002         // monitor should have proper mechanism to interrupt waiters
1003         // at monitor drain and disallowing further waits until
1004         // configuration change related operations (SST etc) have been
1005         // finished.
1006         if (gu_likely(co_mode_ != CommitOrder::BYPASS))
1007         {
1008             commit_monitor_.wait(cseq, wait_until);
1009         }
1010         else
1011         {
1012             apply_monitor_.wait(cseq, wait_until);
1013         }
1014         if (gtid != 0)
1015         {
1016             gtid->uuid = state_uuid_;
1017             gtid->seqno = cseq;
1018         }
1019         ++causal_reads_;
1020         return WSREP_OK;
1021     }
1022     catch (gu::Exception& e)
1023     {
1024         log_debug << "monitor wait failed for causal read: " << e.what();
1025         return WSREP_TRX_FAIL;
1026     }
1027 }
1028 
1029 
to_isolation_begin(TrxHandle * trx,wsrep_trx_meta_t * meta)1030 wsrep_status_t galera::ReplicatorSMM::to_isolation_begin(TrxHandle*        trx,
1031                                                          wsrep_trx_meta_t* meta)
1032 {
1033     if (meta != 0)
1034     {
1035         meta->gtid.uuid  = state_uuid_;
1036         meta->gtid.seqno = trx->global_seqno();
1037         meta->depends_on = trx->depends_seqno();
1038     }
1039 
1040     assert(trx->state() == TrxHandle::S_REPLICATING);
1041     assert(trx->trx_id() == static_cast<wsrep_trx_id_t>(-1));
1042     assert(trx->local_seqno() > -1 && trx->global_seqno() > -1);
1043     assert(trx->global_seqno() > STATE_SEQNO());
1044 
1045     wsrep_status_t retval;
1046     switch ((retval = cert_and_catch(trx)))
1047     {
1048     case WSREP_OK:
1049     {
1050         ApplyOrder ao(*trx);
1051         CommitOrder co(*trx, co_mode_);
1052 
1053         gu_trace(apply_monitor_.enter(ao));
1054 
1055         if (co_mode_ != CommitOrder::BYPASS)
1056             try
1057             {
1058                 commit_monitor_.enter(co);
1059             }
1060             catch (...)
1061             {
1062                 gu_throw_fatal << "unable to enter commit monitor: " << *trx;
1063             }
1064 
1065         trx->set_state(TrxHandle::S_APPLYING);
1066         log_debug << "Executing TO isolated action: " << *trx;
1067         st_.mark_unsafe();
1068         break;
1069     }
1070     case WSREP_TRX_FAIL:
1071         // Apply monitor is released in cert() in case of failure.
1072         trx->set_state(TrxHandle::S_ABORTING);
1073         break;
1074     default:
1075         log_error << "unrecognized retval "
1076                   << retval
1077                   << " for to isolation certification for "
1078                   << *trx;
1079         retval = WSREP_FATAL;
1080         break;
1081     }
1082 
1083     return retval;
1084 }
1085 
1086 
to_isolation_end(TrxHandle * trx)1087 wsrep_status_t galera::ReplicatorSMM::to_isolation_end(TrxHandle* trx)
1088 {
1089     assert(trx->state() == TrxHandle::S_APPLYING);
1090 
1091     log_debug << "Done executing TO isolated action: " << *trx;
1092 
1093     CommitOrder co(*trx, co_mode_);
1094     if (co_mode_ != CommitOrder::BYPASS) commit_monitor_.leave(co);
1095     ApplyOrder ao(*trx);
1096     report_last_committed(cert_.set_trx_committed(trx));
1097     apply_monitor_.leave(ao);
1098 
1099     st_.mark_safe();
1100 
1101     return WSREP_OK;
1102 }
1103 
1104 namespace galera
1105 {
1106 
1107 static WriteSetOut*
writeset_from_handle(wsrep_po_handle_t & handle,const TrxHandle::Params & trx_params)1108 writeset_from_handle (wsrep_po_handle_t& handle,
1109                       const TrxHandle::Params& trx_params)
1110 {
1111     WriteSetOut* ret = reinterpret_cast<WriteSetOut*>(handle.opaque);
1112 
1113     if (NULL == ret)
1114     {
1115         try
1116         {
1117             ret = new WriteSetOut(
1118 //                gu::String<256>(trx_params.working_dir_) << '/' << &handle,
1119                 trx_params.working_dir_, wsrep_trx_id_t(&handle),
1120                 /* key format is not essential since we're not adding keys */
1121                 KeySet::version(trx_params.key_format_), NULL, 0, 0,
1122                 trx_params.record_set_ver_,
1123                 WriteSetNG::MAX_VERSION, DataSet::MAX_VERSION, DataSet::MAX_VERSION,
1124                 trx_params.max_write_set_size_);
1125 
1126             handle.opaque = ret;
1127         }
1128         catch (std::bad_alloc& ba)
1129         {
1130             gu_throw_error(ENOMEM) << "Could not create WriteSetOut";
1131         }
1132     }
1133 
1134     return ret;
1135 }
1136 
1137 } /* namespace galera */
1138 
1139 wsrep_status_t
preordered_collect(wsrep_po_handle_t & handle,const struct wsrep_buf * const data,size_t const count,bool const copy)1140 galera::ReplicatorSMM::preordered_collect(wsrep_po_handle_t&            handle,
1141                                           const struct wsrep_buf* const data,
1142                                           size_t                  const count,
1143                                           bool                    const copy)
1144 {
1145     if (gu_unlikely(trx_params_.version_ < WS_NG_VERSION))
1146         return WSREP_NOT_IMPLEMENTED;
1147 
1148     WriteSetOut* const ws(writeset_from_handle(handle, trx_params_));
1149 
1150     for (size_t i(0); i < count; ++i)
1151     {
1152         ws->append_data(data[i].ptr, data[i].len, copy);
1153     }
1154 
1155     return WSREP_OK;
1156 }
1157 
1158 
1159 wsrep_status_t
preordered_commit(wsrep_po_handle_t & handle,const wsrep_uuid_t & source,uint64_t const flags,int const pa_range,bool const commit)1160 galera::ReplicatorSMM::preordered_commit(wsrep_po_handle_t&            handle,
1161                                          const wsrep_uuid_t&           source,
1162                                          uint64_t                const flags,
1163                                          int                     const pa_range,
1164                                          bool                    const commit)
1165 {
1166     if (gu_unlikely(trx_params_.version_ < WS_NG_VERSION))
1167         return WSREP_NOT_IMPLEMENTED;
1168 
1169     WriteSetOut* const ws(writeset_from_handle(handle, trx_params_));
1170 
1171     if (gu_likely(true == commit))
1172     {
1173         ws->set_flags (WriteSetNG::wsrep_flags_to_ws_flags(flags));
1174 
1175         /* by loooking at trx_id we should be able to detect gaps / lost events
1176          * (however resending is not implemented yet). Something like
1177          *
1178          * wsrep_trx_id_t const trx_id(cert_.append_preordered(source, ws));
1179          *
1180          * begs to be here. */
1181         wsrep_trx_id_t const trx_id(preordered_id_.add_and_fetch(1));
1182 
1183         WriteSetNG::GatherVector actv;
1184 
1185         size_t const actv_size(ws->gather(source, 0, trx_id, actv));
1186 
1187         ws->set_preordered (pa_range); // also adds CRC
1188 
1189         int rcode;
1190         do
1191         {
1192             rcode = gcs_.sendv(actv, actv_size, GCS_ACT_TORDERED, false);
1193         }
1194         while (rcode == -EAGAIN && (usleep(1000), true));
1195 
1196         if (rcode < 0)
1197             gu_throw_error(-rcode)
1198                 << "Replication of preordered writeset failed.";
1199     }
1200 
1201     delete ws;
1202     handle.opaque = NULL;
1203 
1204     return WSREP_OK;
1205 }
1206 
1207 
1208 wsrep_status_t
sst_sent(const wsrep_gtid_t & state_id,int const rcode)1209 galera::ReplicatorSMM::sst_sent(const wsrep_gtid_t& state_id, int const rcode)
1210 {
1211     assert (rcode <= 0);
1212     assert (rcode == 0 || state_id.seqno == WSREP_SEQNO_UNDEFINED);
1213     assert (rcode != 0 || state_id.seqno >= 0);
1214 
1215     if (state_() != S_DONOR)
1216     {
1217         log_error << "sst sent called when not SST donor, state " << state_();
1218         return WSREP_CONN_FAIL;
1219     }
1220 
1221     gcs_seqno_t seqno(rcode ? rcode : state_id.seqno);
1222 
1223     if (state_id.uuid != state_uuid_ && seqno >= 0)
1224     {
1225         // state we have sent no longer corresponds to the current group state
1226         // mark an error
1227         seqno = -EREMCHG;
1228     }
1229 
1230     try {
1231         gcs_.join(seqno);
1232         return WSREP_OK;
1233     }
1234     catch (gu::Exception& e)
1235     {
1236         log_error << "failed to recover from DONOR state: " << e.what();
1237         return WSREP_CONN_FAIL;
1238     }
1239 }
1240 
1241 
process_trx(void * recv_ctx,TrxHandle * trx)1242 void galera::ReplicatorSMM::process_trx(void* recv_ctx, TrxHandle* trx)
1243 {
1244     assert(recv_ctx != 0);
1245     assert(trx != 0);
1246     assert(trx->local_seqno() > 0);
1247     assert(trx->global_seqno() > 0);
1248     assert(trx->last_seen_seqno() >= 0);
1249     assert(trx->depends_seqno() == -1);
1250     assert(trx->state() == TrxHandle::S_REPLICATING);
1251 
1252     wsrep_status_t const retval(cert_and_catch(trx));
1253 
1254     switch (retval)
1255     {
1256     case WSREP_OK:
1257         try
1258         {
1259             gu_trace(apply_trx(recv_ctx, trx));
1260         }
1261         catch (std::exception& e)
1262         {
1263             st_.mark_corrupt();
1264 
1265             log_fatal << "Failed to apply trx: " << *trx;
1266             log_fatal << e.what();
1267             log_fatal << "Node consistency compromised, aborting...";
1268             abort();
1269         }
1270         break;
1271     case WSREP_TRX_FAIL:
1272         // certification failed, apply monitor has been canceled
1273         trx->set_state(TrxHandle::S_ABORTING);
1274         trx->set_state(TrxHandle::S_ROLLED_BACK);
1275         break;
1276     default:
1277         // this should not happen for remote actions
1278         gu_throw_error(EINVAL)
1279             << "unrecognized retval for remote trx certification: "
1280             << retval << " trx: " << *trx;
1281     }
1282 }
1283 
1284 
process_commit_cut(wsrep_seqno_t seq,wsrep_seqno_t seqno_l)1285 void galera::ReplicatorSMM::process_commit_cut(wsrep_seqno_t seq,
1286                                                wsrep_seqno_t seqno_l)
1287 {
1288     assert(seq > 0);
1289     assert(seqno_l > 0);
1290     LocalOrder lo(seqno_l);
1291 
1292     gu_trace(local_monitor_.enter(lo));
1293 
1294     if (seq >= cc_seqno_) /* Refs #782. workaround for
1295                            * assert(seqno >= seqno_released_) in gcache. */
1296         cert_.purge_trxs_upto(seq, true);
1297 
1298     local_monitor_.leave(lo);
1299     log_debug << "Got commit cut from GCS: " << seq;
1300 }
1301 
establish_protocol_versions(int proto_ver)1302 void galera::ReplicatorSMM::establish_protocol_versions (int proto_ver)
1303 {
1304     trx_params_.record_set_ver_ = gu::RecordSet::VER1;
1305 
1306     switch (proto_ver)
1307     {
1308     case 1:
1309         trx_params_.version_ = 1;
1310         str_proto_ver_ = 0;
1311         break;
1312     case 2:
1313         trx_params_.version_ = 1;
1314         str_proto_ver_ = 1;
1315         break;
1316     case 3:
1317     case 4:
1318         trx_params_.version_ = 2;
1319         str_proto_ver_ = 1;
1320         break;
1321     case 5:
1322         trx_params_.version_ = 3;
1323         str_proto_ver_ = 1;
1324         break;
1325     case 6:
1326         trx_params_.version_  = 3;
1327         str_proto_ver_ = 2; // gcs intelligent donor selection.
1328         // include handling dangling comma in donor string.
1329         break;
1330     case 7:
1331         // Protocol upgrade to handle IST SSL backwards compatibility,
1332         // no effect to TRX or STR protocols.
1333         trx_params_.version_ = 3;
1334         str_proto_ver_ = 2;
1335         break;
1336     case 8:
1337         // Protocol upgrade to enforce 8-byte alignment in writesets.
1338         trx_params_.version_ = 3;
1339         trx_params_.record_set_ver_ = gu::RecordSet::VER2;
1340         str_proto_ver_ = 2;
1341         break;
1342     case 9:
1343         // Protocol upgrade to enable support for semi-shared key type.
1344         trx_params_.version_ = 4;
1345         trx_params_.record_set_ver_ = gu::RecordSet::VER2;
1346         str_proto_ver_ = 2;
1347         break;
1348     default:
1349         log_fatal << "Configuration change resulted in an unsupported protocol "
1350             "version: " << proto_ver << ". Can't continue.";
1351         abort();
1352     };
1353 
1354     protocol_version_ = proto_ver;
1355     log_info << "REPL Protocols: " << protocol_version_ << " ("
1356               << trx_params_.version_ << ", " << str_proto_ver_ << ")";
1357 }
1358 
1359 static bool
app_wants_state_transfer(const void * const req,ssize_t const req_len)1360 app_wants_state_transfer (const void* const req, ssize_t const req_len)
1361 {
1362     return (req_len != (strlen(WSREP_STATE_TRANSFER_NONE) + 1) ||
1363             memcmp(req, WSREP_STATE_TRANSFER_NONE, req_len));
1364 }
1365 
1366 void
update_incoming_list(const wsrep_view_info_t & view)1367 galera::ReplicatorSMM::update_incoming_list(const wsrep_view_info_t& view)
1368 {
1369     static char const separator(',');
1370 
1371     ssize_t new_size(0);
1372 
1373     if (view.memb_num > 0)
1374     {
1375         new_size += view.memb_num - 1; // separators
1376 
1377         for (int i = 0; i < view.memb_num; ++i)
1378         {
1379             new_size += strlen(view.members[i].incoming);
1380         }
1381     }
1382 
1383     gu::Lock lock(incoming_mutex_);
1384 
1385     incoming_list_.clear();
1386     incoming_list_.resize(new_size);
1387 
1388     if (new_size <= 0) return;
1389 
1390     incoming_list_ = view.members[0].incoming;
1391 
1392     for (int i = 1; i < view.memb_num; ++i)
1393     {
1394         incoming_list_ += separator;
1395         incoming_list_ += view.members[i].incoming;
1396     }
1397 }
1398 
1399 void
process_conf_change(void * recv_ctx,const wsrep_view_info_t & view_info,int repl_proto,State next_state,wsrep_seqno_t seqno_l)1400 galera::ReplicatorSMM::process_conf_change(void*                    recv_ctx,
1401                                            const wsrep_view_info_t& view_info,
1402                                            int                      repl_proto,
1403                                            State                    next_state,
1404                                            wsrep_seqno_t            seqno_l)
1405 {
1406     assert(repl_proto >= 0 || view_info.status != WSREP_VIEW_PRIMARY);
1407     assert(seqno_l > -1);
1408 
1409     update_incoming_list(view_info);
1410 
1411     LocalOrder lo(seqno_l);
1412     gu_trace(local_monitor_.enter(lo));
1413 
1414     wsrep_seqno_t const upto(cert_.position());
1415 
1416     if (view_info.status == WSREP_VIEW_PRIMARY)
1417     {
1418         safe_to_bootstrap_ = (view_info.memb_num == 1);
1419     }
1420 
1421     apply_monitor_.drain(upto);
1422 
1423     if (co_mode_ != CommitOrder::BYPASS) commit_monitor_.drain(upto);
1424 
1425     if (view_info.my_idx >= 0)
1426     {
1427         uuid_ = view_info.members[view_info.my_idx].id;
1428     }
1429 
1430     bool const          st_required(state_transfer_required(view_info));
1431     wsrep_seqno_t const group_seqno(view_info.state_id.seqno);
1432     const wsrep_uuid_t& group_uuid (view_info.state_id.uuid);
1433 
1434     if (st_required)
1435     {
1436         log_info << "State transfer required: "
1437                  << "\n\tGroup state: " << group_uuid << ":" << group_seqno
1438                  << "\n\tLocal state: " << state_uuid_<< ":" << STATE_SEQNO();
1439 
1440         if (S_CONNECTED != state_()) state_.shift_to(S_CONNECTED);
1441     }
1442 
1443     // must establish protocols before calling view_cb()
1444     if (view_info.view >= 0) establish_protocol_versions (repl_proto);
1445 
1446     void*  app_req(0);
1447     size_t app_req_len(0);
1448 
1449     const_cast<wsrep_view_info_t&>(view_info).state_gap = st_required;
1450     wsrep_cb_status_t const rcode(
1451         view_cb_(app_ctx_, recv_ctx, &view_info, 0, 0, &app_req, &app_req_len));
1452 
1453     if (WSREP_CB_SUCCESS != rcode)
1454     {
1455         assert(app_req_len <= 0);
1456         log_fatal << "View callback failed. This is unrecoverable, "
1457                   << "restart required.";
1458         close();
1459         abort();
1460     }
1461     else if (st_required && 0 == app_req_len && state_uuid_ != group_uuid)
1462     {
1463         log_fatal << "Local state UUID " << state_uuid_
1464                   << " is different from group state UUID " << group_uuid
1465                   << ", and SST request is null: restart required.";
1466         close();
1467         abort();
1468     }
1469 
1470     if (view_info.view >= 0) // Primary configuration
1471     {
1472         GU_DBUG_SYNC_WAIT("process_primary_configuration");
1473 
1474         // we have to reset cert initial position here, SST does not contain
1475         // cert index yet (see #197).
1476         // Also this must be done before releasing GCache buffers.
1477         cert_.assign_initial_position(group_seqno, trx_params_.version_);
1478 
1479         if (STATE_SEQNO() > 0) service_thd_.release_seqno(STATE_SEQNO());
1480         // make sure all gcache buffers are released
1481 
1482         // at this point there is no ongoing master or slave transactions
1483         // and no new requests to service thread should be possible
1484         service_thd_.flush();             // make sure service thd is idle
1485 
1486         // record state seqno, needed for IST on DONOR
1487         cc_seqno_ = group_seqno;
1488 
1489         bool const app_wants_st(app_wants_state_transfer(app_req, app_req_len));
1490 
1491         if (st_required && app_wants_st)
1492         {
1493             // GCache::Seqno_reset() happens here
1494             request_state_transfer (recv_ctx,
1495                                     group_uuid, group_seqno, app_req,
1496                                     app_req_len);
1497         }
1498         else
1499         {
1500             if (view_info.view == 1 || !app_wants_st)
1501             {
1502                 update_state_uuid (group_uuid);
1503                 gcache_.seqno_reset(to_gu_uuid(group_uuid), group_seqno);
1504                 apply_monitor_.set_initial_position(group_seqno);
1505                 if (co_mode_ != CommitOrder::BYPASS)
1506                     commit_monitor_.set_initial_position(group_seqno);
1507             }
1508 
1509             if (state_() == S_CONNECTED || state_() == S_DONOR)
1510             {
1511                 switch (next_state)
1512                 {
1513                 case S_JOINING:
1514                     state_.shift_to(S_JOINING);
1515                     break;
1516                 case S_DONOR:
1517                     if (state_() == S_CONNECTED)
1518                     {
1519                         state_.shift_to(S_DONOR);
1520                     }
1521                     break;
1522                 case S_JOINED:
1523                     state_.shift_to(S_JOINED);
1524                     break;
1525                 case S_SYNCED:
1526                     state_.shift_to(S_SYNCED);
1527                     synced_cb_(app_ctx_);
1528                     break;
1529                 default:
1530                     log_debug << "next_state " << next_state;
1531                     break;
1532                 }
1533             }
1534 
1535             st_.set(state_uuid_, WSREP_SEQNO_UNDEFINED, safe_to_bootstrap_);
1536         }
1537 
1538         if (state_() == S_JOINING && sst_state_ != SST_NONE)
1539         {
1540             /* There are two reasons we can be here:
1541              * 1) we just got state transfer in request_state_transfer() above;
1542              * 2) we failed here previously (probably due to partition).
1543              */
1544             try {
1545                 gcs_.join(sst_seqno_);
1546                 sst_state_ = SST_JOIN_SENT;
1547             }
1548             catch (gu::Exception& e)
1549             {
1550                 log_error << "Failed to JOIN the cluster after SST";
1551             }
1552         }
1553     }
1554     else
1555     {
1556         // Non-primary configuration
1557         if (state_uuid_ != WSREP_UUID_UNDEFINED && next_state == S_CLOSING)
1558         {
1559             st_.set (state_uuid_, STATE_SEQNO(), safe_to_bootstrap_);
1560         }
1561 
1562         if (next_state != S_CONNECTED && next_state != S_CLOSING)
1563         {
1564             log_fatal << "Internal error: unexpected next state for "
1565                       << "non-prim: " << next_state << ". Restart required.";
1566             close();
1567             abort();
1568         }
1569 
1570         state_.shift_to(next_state);
1571     }
1572 
1573     local_monitor_.leave(lo);
1574     gcs_.resume_recv();
1575     free(app_req);
1576 }
1577 
1578 
process_join(wsrep_seqno_t seqno_j,wsrep_seqno_t seqno_l)1579 void galera::ReplicatorSMM::process_join(wsrep_seqno_t seqno_j,
1580                                          wsrep_seqno_t seqno_l)
1581 {
1582     LocalOrder lo(seqno_l);
1583 
1584     gu_trace(local_monitor_.enter(lo));
1585 
1586     wsrep_seqno_t const upto(cert_.position());
1587 
1588     apply_monitor_.drain(upto);
1589 
1590     if (co_mode_ != CommitOrder::BYPASS) commit_monitor_.drain(upto);
1591 
1592     if (seqno_j < 0 && S_JOINING == state_())
1593     {
1594         // #595, @todo: find a way to re-request state transfer
1595         log_fatal << "Failed to receive state transfer: " << seqno_j
1596                   << " (" << strerror (-seqno_j) << "), need to restart.";
1597         abort();
1598     }
1599     else
1600     {
1601         state_.shift_to(S_JOINED);
1602         sst_state_ = SST_NONE;
1603     }
1604 
1605     local_monitor_.leave(lo);
1606 }
1607 
1608 
process_sync(wsrep_seqno_t seqno_l)1609 void galera::ReplicatorSMM::process_sync(wsrep_seqno_t seqno_l)
1610 {
1611     LocalOrder lo(seqno_l);
1612 
1613     gu_trace(local_monitor_.enter(lo));
1614 
1615     wsrep_seqno_t const upto(cert_.position());
1616 
1617     apply_monitor_.drain(upto);
1618 
1619     if (co_mode_ != CommitOrder::BYPASS) commit_monitor_.drain(upto);
1620 
1621     state_.shift_to(S_SYNCED);
1622     synced_cb_(app_ctx_);
1623     local_monitor_.leave(lo);
1624 }
1625 
pause()1626 wsrep_seqno_t galera::ReplicatorSMM::pause()
1627 {
1628     // Grab local seqno for local_monitor_
1629     wsrep_seqno_t const local_seqno(
1630         static_cast<wsrep_seqno_t>(gcs_.local_sequence()));
1631     LocalOrder lo(local_seqno);
1632     local_monitor_.enter(lo);
1633 
1634     // Local monitor should take care that concurrent
1635     // pause requests are enqueued
1636     assert(pause_seqno_ == WSREP_SEQNO_UNDEFINED);
1637     pause_seqno_ = local_seqno;
1638 
1639     // Get drain seqno from cert index
1640     wsrep_seqno_t const upto(cert_.position());
1641     apply_monitor_.drain(upto);
1642     assert (apply_monitor_.last_left() >= upto);
1643 
1644     if (co_mode_ != CommitOrder::BYPASS)
1645     {
1646         commit_monitor_.drain(upto);
1647         assert (commit_monitor_.last_left() >= upto);
1648         assert (commit_monitor_.last_left() == apply_monitor_.last_left());
1649     }
1650 
1651     wsrep_seqno_t const ret(STATE_SEQNO());
1652     st_.set(state_uuid_, ret, safe_to_bootstrap_);
1653 
1654     log_info << "Provider paused at " << state_uuid_ << ':' << ret
1655              << " (" << pause_seqno_ << ")";
1656 
1657     return ret;
1658 }
1659 
resume()1660 void galera::ReplicatorSMM::resume()
1661 {
1662     if (pause_seqno_ == WSREP_SEQNO_UNDEFINED)
1663     {
1664         log_warn << "tried to resume unpaused provider";
1665         return;
1666     }
1667 
1668     st_.set(state_uuid_, WSREP_SEQNO_UNDEFINED, safe_to_bootstrap_);
1669     log_info << "resuming provider at " << pause_seqno_;
1670     LocalOrder lo(pause_seqno_);
1671     pause_seqno_ = WSREP_SEQNO_UNDEFINED;
1672     local_monitor_.leave(lo);
1673     log_info << "Provider resumed.";
1674 }
1675 
desync()1676 void galera::ReplicatorSMM::desync()
1677 {
1678     wsrep_seqno_t seqno_l;
1679 
1680     ssize_t const ret(gcs_.desync(&seqno_l));
1681 
1682     if (seqno_l > 0)
1683     {
1684         LocalOrder lo(seqno_l); // need to process it regardless of ret value
1685 
1686         if (ret == 0)
1687         {
1688 /* #706 - the check below must be state request-specific. We are not holding
1689           any locks here and must be able to wait like any other action.
1690           However practice may prove different, leaving it here as a reminder.
1691             if (local_monitor_.would_block(seqno_l))
1692             {
1693                 gu_throw_error (-EDEADLK) << "Ran out of resources waiting to "
1694                                           << "desync the node. "
1695                                           << "The node must be restarted.";
1696             }
1697 */
1698             local_monitor_.enter(lo);
1699             if (state_() != S_DONOR) state_.shift_to(S_DONOR);
1700             local_monitor_.leave(lo);
1701         }
1702         else
1703         {
1704             local_monitor_.self_cancel(lo);
1705         }
1706     }
1707 
1708     if (ret)
1709     {
1710         gu_throw_error (-ret) << "Node desync failed.";
1711     }
1712 }
1713 
resync()1714 void galera::ReplicatorSMM::resync()
1715 {
1716     gcs_.join(commit_monitor_.last_left());
1717 }
1718 
1719 
1720 //////////////////////////////////////////////////////////////////////
1721 //////////////////////////////////////////////////////////////////////
1722 ////                           Private
1723 //////////////////////////////////////////////////////////////////////
1724 //////////////////////////////////////////////////////////////////////
1725 
1726 /* don't use this directly, use cert_and_catch() instead */
1727 inline
cert(TrxHandle * trx)1728 wsrep_status_t galera::ReplicatorSMM::cert(TrxHandle* trx)
1729 {
1730     assert(trx->state() == TrxHandle::S_REPLICATING ||
1731            trx->state() == TrxHandle::S_MUST_CERT_AND_REPLAY);
1732 
1733     assert(trx->local_seqno()     != WSREP_SEQNO_UNDEFINED);
1734     assert(trx->global_seqno()    != WSREP_SEQNO_UNDEFINED);
1735     assert(trx->last_seen_seqno() >= 0);
1736     assert(trx->last_seen_seqno() < trx->global_seqno());
1737 
1738     trx->set_state(TrxHandle::S_CERTIFYING);
1739 
1740     LocalOrder  lo(*trx);
1741     ApplyOrder  ao(*trx);
1742     CommitOrder co(*trx, co_mode_);
1743 
1744     bool interrupted(false);
1745 
1746     try
1747     {
1748         gu_trace(local_monitor_.enter(lo));
1749     }
1750     catch (gu::Exception& e)
1751     {
1752         if (e.get_errno() == EINTR) { interrupted = true; }
1753         else throw;
1754     }
1755 
1756     wsrep_status_t retval(WSREP_OK);
1757     // IST should have drained the monitors, so STATE_SEQNO() should be current
1758     bool const applicable(trx->global_seqno() > STATE_SEQNO());
1759 
1760     if (!applicable)
1761     {
1762         // this can happen after state transfer position has been submitted
1763         // but not all actions preceding it have been processed.
1764         //
1765         // Cert index preload after SST:
1766         // ----------------------------
1767         // If the trx global seqno is in the half open range
1768         // (cc_seqno_ , sst_seqno_], the write set was contained in the SST.
1769         // In this case do the certification for trx to populate the index,
1770         // but ignore the result. Always set state as S_MUST_ABORT and
1771         // return WSREP_TRX_FAIL to make calling code to discard this trx.
1772         if (last_st_type_ == ST_TYPE_SST &&
1773             cc_seqno_ < trx->global_seqno() &&
1774             trx->global_seqno() <= sst_seqno_)
1775         {
1776             (void)cert_.append_trx(trx);
1777             trx->verify_checksum();
1778             gcache_.seqno_assign (trx->action(),
1779                                   trx->global_seqno(),
1780                                   trx->depends_seqno());
1781             cert_.set_trx_committed(trx);
1782         }
1783         else
1784         {
1785             gcache_.free(const_cast<void*>(trx->action()));
1786         }
1787         trx->set_state(TrxHandle::S_MUST_ABORT);
1788         if (interrupted)
1789             local_monitor_.self_cancel(lo);
1790         else
1791             local_monitor_.leave(lo);
1792         return WSREP_TRX_FAIL;
1793     }
1794 
1795     if (gu_likely (!interrupted))
1796     {
1797         switch (cert_.append_trx(trx))
1798         {
1799         case Certification::TEST_OK:
1800             if (trx->state() == TrxHandle::S_CERTIFYING)
1801             {
1802                 retval = WSREP_OK;
1803             }
1804             else
1805             {
1806                 assert(trx->state() == TrxHandle::S_MUST_ABORT);
1807                 trx->set_state(TrxHandle::S_MUST_REPLAY_AM);
1808                 retval = WSREP_BF_ABORT;
1809             }
1810             break;
1811         case Certification::TEST_FAILED:
1812             if (gu_unlikely(trx->is_toi())) // small sanity check
1813             {
1814                 // may happen on configuration change
1815                 log_warn << "Certification failed for TO isolated action: "
1816                          << *trx;
1817                 assert(0);
1818             }
1819             local_cert_failures_ += trx->is_local();
1820             trx->set_state(TrxHandle::S_MUST_ABORT);
1821             retval = WSREP_TRX_FAIL;
1822             break;
1823         }
1824 
1825         if (gu_unlikely(WSREP_TRX_FAIL == retval))
1826         {
1827             report_last_committed(cert_.set_trx_committed(trx));
1828         }
1829 
1830         // at this point we are about to leave local_monitor_. Make sure
1831         // trx checksum was alright before that.
1832         trx->verify_checksum();
1833 
1834         // we must do it 'in order' for std::map reasons, so keeping
1835         // it inside the monitor
1836         gcache_.seqno_assign (trx->action(),
1837                               trx->global_seqno(),
1838                               trx->depends_seqno());
1839 
1840         local_monitor_.leave(lo);
1841     }
1842     else
1843     {
1844         retval = cert_for_aborted(trx);
1845 
1846         if (WSREP_TRX_FAIL == retval)
1847         {
1848             local_monitor_.self_cancel(lo);
1849         }
1850         else
1851         {
1852             assert(WSREP_BF_ABORT == retval);
1853         }
1854     }
1855 
1856     if (gu_unlikely(WSREP_TRX_FAIL == retval))
1857     {
1858         // applicable but failed certification: self-cancel monitors
1859         apply_monitor_.self_cancel(ao);
1860         if (co_mode_ != CommitOrder::BYPASS) commit_monitor_.self_cancel(co);
1861     }
1862 
1863     assert(applicable);
1864 
1865     return retval;
1866 }
1867 
1868 /* pretty much any exception in cert() is fatal as it blocks local_monitor_ */
cert_and_catch(TrxHandle * trx)1869 wsrep_status_t galera::ReplicatorSMM::cert_and_catch(TrxHandle* trx)
1870 {
1871     try
1872     {
1873         return cert(trx);
1874     }
1875     catch (std::exception& e)
1876     {
1877         log_fatal << "Certification exception: " << e.what();
1878     }
1879     catch (...)
1880     {
1881         log_fatal << "Unknown certification exception";
1882     }
1883     abort();
1884 }
1885 
1886 /* This must be called BEFORE local_monitor_.self_cancel() due to
1887  * gcache_.seqno_assign() */
cert_for_aborted(TrxHandle * trx)1888 wsrep_status_t galera::ReplicatorSMM::cert_for_aborted(TrxHandle* trx)
1889 {
1890     Certification::TestResult const res(cert_.test(trx, false));
1891 
1892     switch (res)
1893     {
1894     case Certification::TEST_OK:
1895         trx->set_state(TrxHandle::S_MUST_CERT_AND_REPLAY);
1896         return WSREP_BF_ABORT;
1897 
1898     case Certification::TEST_FAILED:
1899         if (trx->state() != TrxHandle::S_MUST_ABORT)
1900         {
1901             trx->set_state(TrxHandle::S_MUST_ABORT);
1902         }
1903         // Mext step will be monitors release. Make sure that ws was not
1904         // corrupted and cert failure is real before procedeing with that.
1905         trx->verify_checksum();
1906         gcache_.seqno_assign (trx->action(), trx->global_seqno(), -1);
1907         return WSREP_TRX_FAIL;
1908 
1909     default:
1910         log_fatal << "Unexpected return value from Certification::test(): "
1911                   << res;
1912         abort();
1913     }
1914 }
1915 
1916 
1917 void
update_state_uuid(const wsrep_uuid_t & uuid)1918 galera::ReplicatorSMM::update_state_uuid (const wsrep_uuid_t& uuid)
1919 {
1920     if (state_uuid_ != uuid)
1921     {
1922         *(const_cast<wsrep_uuid_t*>(&state_uuid_)) = uuid;
1923 
1924         std::ostringstream os; os << state_uuid_;
1925         // Copy only non-nil terminated part of the source string
1926         // and terminate the string explicitly to silence a warning
1927         // generated by Wstringop-truncation
1928         char* str(const_cast<char*>(state_uuid_str_));
1929         strncpy(str, os.str().c_str(), sizeof(state_uuid_str_) - 1);
1930         str[sizeof(state_uuid_str_) - 1] = '\0';
1931     }
1932 
1933     st_.set(uuid, WSREP_SEQNO_UNDEFINED, safe_to_bootstrap_);
1934 }
1935 
1936 void
abort()1937 galera::ReplicatorSMM::abort()
1938 {
1939     gcs_.close();
1940     gu_abort();
1941 }
1942