1 //
2 // Copyright (C) 2010-2021 Codership Oy <info@codership.com>
3 //
4
5 #include "galera_common.hpp"
6 #include "replicator_smm.hpp"
7 #include "galera_exception.hpp"
8 #include "uuid.hpp"
9
10 #include "galera_info.hpp"
11
12 #include <gu_debug_sync.hpp>
13 #include <gu_abort.h>
14
15 #include <sstream>
16 #include <iostream>
17
18
19 static void
apply_trx_ws(void * recv_ctx,wsrep_apply_cb_t apply_cb,wsrep_commit_cb_t commit_cb,const galera::TrxHandle & trx,const wsrep_trx_meta_t & meta)20 apply_trx_ws(void* recv_ctx,
21 wsrep_apply_cb_t apply_cb,
22 wsrep_commit_cb_t commit_cb,
23 const galera::TrxHandle& trx,
24 const wsrep_trx_meta_t& meta)
25 {
26 using galera::TrxHandle;
27 static const size_t max_apply_attempts(4);
28 size_t attempts(1);
29
30 do
31 {
32 try
33 {
34 gu_trace(trx.apply(recv_ctx, apply_cb, meta));
35 break;
36 }
37 catch (galera::ApplyException& e)
38 {
39 if (trx.is_toi())
40 {
41 log_warn << "Ignoring error for TO isolated action: " << trx;
42 break;
43 }
44 else
45 {
46 int const err(e.status());
47
48 if (err > 0)
49 {
50 wsrep_bool_t unused(false);
51 wsrep_cb_status const rcode(
52 commit_cb(
53 recv_ctx,
54 TrxHandle::trx_flags_to_wsrep_flags(trx.flags()),
55 &meta,
56 &unused,
57 false));
58 if (WSREP_CB_SUCCESS != rcode)
59 {
60 gu_throw_fatal << "Rollback failed. Trx: " << trx;
61 }
62
63 ++attempts;
64
65 if (attempts <= max_apply_attempts)
66 {
67 log_warn << e.what()
68 << "\nRetrying " << attempts << "th time";
69 }
70 }
71 else
72 {
73 GU_TRACE(e);
74 throw;
75 }
76 }
77 }
78 }
79 while (attempts <= max_apply_attempts);
80
81 if (gu_unlikely(attempts > max_apply_attempts))
82 {
83 std::ostringstream msg;
84
85 msg << "Failed to apply trx " << trx.global_seqno() << " "
86 << max_apply_attempts << " times";
87
88 throw galera::ApplyException(msg.str(), WSREP_CB_FAILURE);
89 }
90
91 return;
92 }
93
94
operator <<(std::ostream & os,ReplicatorSMM::State state)95 std::ostream& galera::operator<<(std::ostream& os, ReplicatorSMM::State state)
96 {
97 switch (state)
98 {
99 case ReplicatorSMM::S_DESTROYED: return (os << "DESTROYED");
100 case ReplicatorSMM::S_CLOSED: return (os << "CLOSED");
101 case ReplicatorSMM::S_CLOSING: return (os << "CLOSING");
102 case ReplicatorSMM::S_CONNECTED: return (os << "CONNECTED");
103 case ReplicatorSMM::S_JOINING: return (os << "JOINING");
104 case ReplicatorSMM::S_JOINED: return (os << "JOINED");
105 case ReplicatorSMM::S_SYNCED: return (os << "SYNCED");
106 case ReplicatorSMM::S_DONOR: return (os << "DONOR");
107 }
108
109 gu_throw_fatal << "invalid state " << static_cast<int>(state);
110 }
111
112 //////////////////////////////////////////////////////////////////////
113 //////////////////////////////////////////////////////////////////////
114 // Public
115 //////////////////////////////////////////////////////////////////////
116 //////////////////////////////////////////////////////////////////////
117
ReplicatorSMM(const struct wsrep_init_args * args)118 galera::ReplicatorSMM::ReplicatorSMM(const struct wsrep_init_args* args)
119 :
120 init_lib_ (reinterpret_cast<gu_log_cb_t>(args->logger_cb)),
121 config_ (),
122 init_config_ (config_, args->node_address, args->data_dir),
123 parse_options_ (*this, config_, args->options),
124 init_ssl_ (config_),
125 str_proto_ver_ (-1),
126 protocol_version_ (-1),
127 proto_max_ (gu::from_string<int>(config_.get(Param::proto_max))),
128 state_ (S_CLOSED),
129 sst_state_ (SST_NONE),
130 co_mode_ (CommitOrder::from_string(
131 config_.get(Param::commit_order))),
132 state_file_ (config_.get(BASE_DIR)+'/'+GALERA_STATE_FILE),
133 st_ (state_file_),
134 safe_to_bootstrap_ (true),
135 trx_params_ (config_.get(BASE_DIR), -1,
136 KeySet::version(config_.get(Param::key_format)),
137 TrxHandle::Defaults.record_set_ver_,
138 gu::from_string<int>(config_.get(
139 Param::max_write_set_size))),
140 uuid_ (WSREP_UUID_UNDEFINED),
141 state_uuid_ (WSREP_UUID_UNDEFINED),
142 state_uuid_str_ (),
143 cc_seqno_ (WSREP_SEQNO_UNDEFINED),
144 pause_seqno_ (WSREP_SEQNO_UNDEFINED),
145 app_ctx_ (args->app_ctx),
146 view_cb_ (args->view_handler_cb),
147 apply_cb_ (args->apply_cb),
148 commit_cb_ (args->commit_cb),
149 unordered_cb_ (args->unordered_cb),
150 sst_donate_cb_ (args->sst_donate_cb),
151 synced_cb_ (args->synced_cb),
152 sst_donor_ (),
153 sst_uuid_ (WSREP_UUID_UNDEFINED),
154 sst_seqno_ (WSREP_SEQNO_UNDEFINED),
155 sst_mutex_ (),
156 sst_cond_ (),
157 sst_retry_sec_ (1),
158 last_st_type_ (ST_TYPE_NONE),
159 gcache_ (config_, config_.get(BASE_DIR)),
160 gcs_ (config_, gcache_, proto_max_, args->proto_ver,
161 args->node_name, args->node_incoming),
162 service_thd_ (gcs_, gcache_),
163 slave_pool_ (sizeof(TrxHandle), 1024, "SlaveTrxHandle"),
164 as_ (0),
165 gcs_as_ (slave_pool_, gcs_, *this, gcache_),
166 ist_receiver_ (config_, slave_pool_, args->node_address),
167 ist_senders_ (gcs_, gcache_),
168 wsdb_ (),
169 cert_ (config_, service_thd_),
170 local_monitor_ (),
171 apply_monitor_ (),
172 commit_monitor_ (),
173 causal_read_timeout_(config_.get(Param::causal_read_timeout)),
174 receivers_ (),
175 replicated_ (),
176 replicated_bytes_ (),
177 keys_count_ (),
178 keys_bytes_ (),
179 data_bytes_ (),
180 unrd_bytes_ (),
181 local_commits_ (),
182 local_rollbacks_ (),
183 local_cert_failures_(),
184 local_replays_ (),
185 causal_reads_ (),
186 preordered_id_ (),
187 incoming_list_ (""),
188 incoming_mutex_ (),
189 wsrep_stats_ ()
190 {
191 // @todo add guards (and perhaps actions)
192 state_.add_transition(Transition(S_CLOSED, S_DESTROYED));
193 state_.add_transition(Transition(S_CLOSED, S_CONNECTED));
194 state_.add_transition(Transition(S_CLOSING, S_CLOSED));
195
196 state_.add_transition(Transition(S_CONNECTED, S_CLOSING));
197 state_.add_transition(Transition(S_CONNECTED, S_CONNECTED));
198 state_.add_transition(Transition(S_CONNECTED, S_JOINING));
199 // the following is possible only when bootstrapping new cluster
200 // (trivial wsrep_cluster_address)
201 state_.add_transition(Transition(S_CONNECTED, S_JOINED));
202 // the following are possible on PC remerge
203 state_.add_transition(Transition(S_CONNECTED, S_DONOR));
204 state_.add_transition(Transition(S_CONNECTED, S_SYNCED));
205
206 state_.add_transition(Transition(S_JOINING, S_CLOSING));
207 // the following is possible if one non-prim conf follows another
208 state_.add_transition(Transition(S_JOINING, S_CONNECTED));
209 state_.add_transition(Transition(S_JOINING, S_JOINED));
210
211 state_.add_transition(Transition(S_JOINED, S_CLOSING));
212 state_.add_transition(Transition(S_JOINED, S_CONNECTED));
213 state_.add_transition(Transition(S_JOINED, S_SYNCED));
214 // the following is possible if one desync() immediately follows another
215 state_.add_transition(Transition(S_JOINED, S_DONOR));
216
217 state_.add_transition(Transition(S_SYNCED, S_CLOSING));
218 state_.add_transition(Transition(S_SYNCED, S_CONNECTED));
219 state_.add_transition(Transition(S_SYNCED, S_DONOR));
220
221 state_.add_transition(Transition(S_DONOR, S_CLOSING));
222 state_.add_transition(Transition(S_DONOR, S_CONNECTED));
223 state_.add_transition(Transition(S_DONOR, S_JOINED));
224
225 local_monitor_.set_initial_position(0);
226
227 wsrep_uuid_t uuid;
228 wsrep_seqno_t seqno;
229
230 st_.get (uuid, seqno, safe_to_bootstrap_);
231
232 if (0 != args->state_id &&
233 args->state_id->uuid != WSREP_UUID_UNDEFINED &&
234 args->state_id->uuid == uuid &&
235 seqno == WSREP_SEQNO_UNDEFINED)
236 {
237 /* non-trivial recovery information provided on startup, and db is safe
238 * so use recovered seqno value */
239 seqno = args->state_id->seqno;
240 }
241
242 log_debug << "End state: " << uuid << ':' << seqno << " #################";
243
244 update_state_uuid (uuid);
245 gcache_.seqno_reset(to_gu_uuid(uuid), seqno);
246 // update gcache position to one supplied by app.
247
248 cc_seqno_ = seqno; // is it needed here?
249
250 // the following initialization is needed only to pass seqno to
251 // connect() call. Ideally this should be done only on receving conf change.
252 apply_monitor_.set_initial_position(seqno);
253 if (co_mode_ != CommitOrder::BYPASS)
254 commit_monitor_.set_initial_position(seqno);
255 cert_.assign_initial_position(seqno, trx_proto_ver());
256
257 build_stats_vars(wsrep_stats_);
258 }
259
~ReplicatorSMM()260 galera::ReplicatorSMM::~ReplicatorSMM()
261 {
262 log_info << "dtor state: " << state_();
263 switch (state_())
264 {
265 case S_CONNECTED:
266 case S_JOINING:
267 case S_JOINED:
268 case S_SYNCED:
269 case S_DONOR:
270 close();
271 // fall through
272 case S_CLOSING:
273 // @todo wait that all users have left the building
274 case S_CLOSED:
275 ist_senders_.cancel();
276 break;
277 case S_DESTROYED:
278 break;
279 }
280 }
281
282
connect(const std::string & cluster_name,const std::string & cluster_url,const std::string & state_donor,bool const bootstrap)283 wsrep_status_t galera::ReplicatorSMM::connect(const std::string& cluster_name,
284 const std::string& cluster_url,
285 const std::string& state_donor,
286 bool const bootstrap)
287 {
288 sst_donor_ = state_donor;
289 service_thd_.reset();
290
291 ssize_t err;
292 wsrep_status_t ret(WSREP_OK);
293 wsrep_seqno_t const seqno(STATE_SEQNO());
294 wsrep_uuid_t const gcs_uuid(seqno < 0 ? WSREP_UUID_UNDEFINED :state_uuid_);
295
296 log_info << "Setting initial position to " << gcs_uuid << ':' << seqno;
297
298 if ((bootstrap == true || cluster_url == "gcomm://")
299 && safe_to_bootstrap_ == false)
300 {
301 log_error << "It may not be safe to bootstrap the cluster from this node. "
302 << "It was not the last one to leave the cluster and may "
303 << "not contain all the updates. To force cluster bootstrap "
304 << "with this node, edit the grastate.dat file manually and "
305 << "set safe_to_bootstrap to 1 .";
306 ret = WSREP_NODE_FAIL;
307 }
308
309 if (ret == WSREP_OK &&
310 (err = gcs_.set_initial_position(gcs_uuid, seqno)) != 0)
311 {
312 log_error << "gcs init failed:" << strerror(-err);
313 ret = WSREP_NODE_FAIL;
314 }
315
316 if (ret == WSREP_OK &&
317 (err = gcs_.connect(cluster_name, cluster_url, bootstrap)) != 0)
318 {
319 log_error << "gcs connect failed: " << strerror(-err);
320 ret = WSREP_NODE_FAIL;
321 }
322
323 if (ret == WSREP_OK)
324 {
325 state_.shift_to(S_CONNECTED);
326 }
327
328 return ret;
329 }
330
331
close()332 wsrep_status_t galera::ReplicatorSMM::close()
333 {
334 if (state_() != S_CLOSED)
335 {
336 gcs_.close();
337 }
338
339 return WSREP_OK;
340 }
341
342
async_recv(void * recv_ctx)343 wsrep_status_t galera::ReplicatorSMM::async_recv(void* recv_ctx)
344 {
345 if (state_() == S_CLOSED || state_() == S_CLOSING)
346 {
347 log_error <<"async recv cannot start, provider in closed/closing state";
348 return WSREP_FATAL;
349 }
350
351 ++receivers_;
352 as_ = &gcs_as_;
353
354 bool exit_loop(false);
355 wsrep_status_t retval(WSREP_OK);
356
357 while (WSREP_OK == retval && state_() != S_CLOSING)
358 {
359 GU_DBUG_SYNC_EXECUTE("before_async_recv_process_sync", sleep(5););
360
361 ssize_t rc;
362
363 while (gu_unlikely((rc = as_->process(recv_ctx, exit_loop))
364 == -ECANCELED))
365 {
366 recv_IST(recv_ctx);
367 // hack: prevent fast looping until ist controlling thread
368 // resumes gcs prosessing
369 usleep(10000);
370 }
371
372 if (gu_unlikely(rc <= 0))
373 {
374 if (GcsActionSource::INCONSISTENCY_CODE == rc)
375 {
376 st_.mark_corrupt();
377 retval = WSREP_FATAL;
378 }
379 else
380 {
381 retval = WSREP_CONN_FAIL;
382 }
383 }
384 else if (gu_unlikely(exit_loop == true))
385 {
386 assert(WSREP_OK == retval);
387
388 if (receivers_.sub_and_fetch(1) > 0)
389 {
390 log_info << "Slave thread exiting on request.";
391 break;
392 }
393
394 ++receivers_;
395 log_warn << "Refusing exit for the last slave thread.";
396 }
397 }
398
399 /* exiting loop already did proper checks */
400 if (!exit_loop && receivers_.sub_and_fetch(1) == 0)
401 {
402 if (state_() != S_CLOSING)
403 {
404 if (retval == WSREP_OK)
405 {
406 log_warn << "Broken shutdown sequence, provider state: "
407 << state_() << ", retval: " << retval;
408 assert (0);
409 }
410 else
411 {
412 // Generate zero view before exit to notify application
413 wsrep_view_info_t* err_view(galera_view_info_create(0, false));
414 void* fake_sst_req(0);
415 size_t fake_sst_req_len(0);
416 view_cb_(app_ctx_, recv_ctx, err_view, 0, 0,
417 &fake_sst_req, &fake_sst_req_len);
418 free(err_view);
419 }
420 /* avoid abort in production */
421 state_.shift_to(S_CLOSING);
422 }
423 state_.shift_to(S_CLOSED);
424 }
425
426 log_debug << "Slave thread exit. Return code: " << retval;
427
428 return retval;
429 }
430
431
apply_trx(void * recv_ctx,TrxHandle * trx)432 void galera::ReplicatorSMM::apply_trx(void* recv_ctx, TrxHandle* trx)
433 {
434 assert(trx != 0);
435 assert(trx->global_seqno() > 0);
436 assert(trx->is_certified() == true);
437 assert(trx->global_seqno() > STATE_SEQNO());
438 assert(trx->is_local() == false);
439
440 ApplyOrder ao(*trx);
441 CommitOrder co(*trx, co_mode_);
442
443 gu_trace(apply_monitor_.enter(ao));
444 trx->set_state(TrxHandle::S_APPLYING);
445
446 wsrep_trx_meta_t meta = {{state_uuid_, trx->global_seqno() },
447 trx->depends_seqno()};
448
449 if (trx->is_toi())
450 {
451 log_debug << "Executing TO isolated action: " << *trx;
452 st_.mark_unsafe();
453 }
454
455 gu_trace(apply_trx_ws(recv_ctx, apply_cb_, commit_cb_, *trx, meta));
456 /* at this point any exception in apply_trx_ws() is fatal, not
457 * catching anything. */
458
459 if (gu_likely(co_mode_ != CommitOrder::BYPASS))
460 {
461 gu_trace(commit_monitor_.enter(co));
462 }
463 trx->set_state(TrxHandle::S_COMMITTING);
464
465 wsrep_bool_t exit_loop(false);
466 wsrep_cb_status_t const rcode(
467 commit_cb_(
468 recv_ctx,
469 TrxHandle::trx_flags_to_wsrep_flags(trx->flags()),
470 &meta,
471 &exit_loop,
472 true));
473
474 if (gu_unlikely (rcode != WSREP_CB_SUCCESS))
475 gu_throw_fatal << "Commit failed. Trx: " << trx;
476
477 if (gu_likely(co_mode_ != CommitOrder::BYPASS))
478 {
479 commit_monitor_.leave(co);
480 }
481 trx->set_state(TrxHandle::S_COMMITTED);
482
483 if (trx->local_seqno() != -1)
484 {
485 // trx with local seqno -1 originates from IST (or other source not gcs)
486 report_last_committed(cert_.set_trx_committed(trx));
487 }
488
489 /* For now need to keep it inside apply monitor to ensure all processing
490 * ends by the time monitors are drained because of potential gcache
491 * cleanup (and loss of the writeset buffer). Perhaps unordered monitor
492 * is needed here. */
493 trx->unordered(recv_ctx, unordered_cb_);
494
495 apply_monitor_.leave(ao);
496
497 if (trx->is_toi())
498 {
499 log_debug << "Done executing TO isolated action: "
500 << trx->global_seqno();
501 st_.mark_safe();
502 }
503
504 trx->set_exit_loop(exit_loop);
505 }
506
507
replicate(TrxHandle * trx,wsrep_trx_meta_t * meta)508 wsrep_status_t galera::ReplicatorSMM::replicate(TrxHandle* trx,
509 wsrep_trx_meta_t* meta)
510 {
511 if (state_() < S_JOINED) return WSREP_TRX_FAIL;
512
513 assert(trx->state() == TrxHandle::S_EXECUTING ||
514 trx->state() == TrxHandle::S_MUST_ABORT);
515 assert(trx->local_seqno() == WSREP_SEQNO_UNDEFINED &&
516 trx->global_seqno() == WSREP_SEQNO_UNDEFINED);
517
518 wsrep_status_t retval(WSREP_TRX_FAIL);
519
520 if (trx->state() == TrxHandle::S_MUST_ABORT)
521 {
522 must_abort:
523 trx->set_state(TrxHandle::S_ABORTING);
524 return retval;
525 }
526
527 WriteSetNG::GatherVector actv;
528
529 gcs_action act;
530 act.type = GCS_ACT_TORDERED;
531 #ifndef NDEBUG
532 act.seqno_g = GCS_SEQNO_ILL;
533 #endif
534
535 if (trx->new_version())
536 {
537 act.buf = NULL;
538 act.size = trx->write_set_out().gather(trx->source_id(),
539 trx->conn_id(),
540 trx->trx_id(),
541 actv);
542 }
543 else
544 {
545 trx->set_last_seen_seqno(last_committed());
546 assert (trx->last_seen_seqno() >= 0);
547 trx->flush(0);
548
549 const MappedBuffer& wscoll(trx->write_set_collection());
550
551 act.buf = &wscoll[0];
552 act.size = wscoll.size();
553
554 assert (act.buf != NULL);
555 assert (act.size > 0);
556 }
557
558 trx->set_state(TrxHandle::S_REPLICATING);
559
560 ssize_t rcode(-1);
561
562 do
563 {
564 assert(act.seqno_g == GCS_SEQNO_ILL);
565
566 const ssize_t gcs_handle(gcs_.schedule());
567
568 if (gu_unlikely(gcs_handle < 0))
569 {
570 log_debug << "gcs schedule " << strerror(-gcs_handle);
571 trx->set_state(TrxHandle::S_MUST_ABORT);
572 goto must_abort;
573 }
574
575 trx->set_gcs_handle(gcs_handle);
576
577 if (trx->new_version())
578 {
579 trx->set_last_seen_seqno(last_committed());
580 assert(trx->last_seen_seqno() >= 0);
581 trx->unlock();
582 assert (act.buf == NULL); // just a sanity check
583 rcode = gcs_.replv(actv, act, true);
584 }
585 else
586 {
587 assert(trx->last_seen_seqno() >= 0);
588 trx->unlock();
589 assert (act.buf != NULL);
590 rcode = gcs_.repl(act, true);
591 }
592
593 GU_DBUG_SYNC_WAIT("after_replicate_sync")
594 trx->lock();
595 }
596 while (rcode == -EAGAIN && trx->state() != TrxHandle::S_MUST_ABORT &&
597 (usleep(1000), true));
598
599 assert(trx->last_seen_seqno() >= 0);
600
601 if (rcode < 0)
602 {
603 if (rcode != -EINTR)
604 {
605 log_debug << "gcs_repl() failed with " << strerror(-rcode)
606 << " for trx " << *trx;
607 }
608
609 assert(rcode != -EINTR || trx->state() == TrxHandle::S_MUST_ABORT);
610 assert(act.seqno_l == GCS_SEQNO_ILL && act.seqno_g == GCS_SEQNO_ILL);
611 assert(NULL == act.buf || !trx->new_version());
612
613 if (trx->state() != TrxHandle::S_MUST_ABORT)
614 {
615 trx->set_state(TrxHandle::S_MUST_ABORT);
616 }
617
618 trx->set_gcs_handle(-1);
619 goto must_abort;
620 }
621
622 assert(act.buf != NULL);
623 assert(act.size == rcode);
624 assert(act.seqno_l != GCS_SEQNO_ILL);
625 assert(act.seqno_g != GCS_SEQNO_ILL);
626
627 ++replicated_;
628 replicated_bytes_ += rcode;
629 trx->set_gcs_handle(-1);
630
631 if (trx->new_version())
632 {
633 gu_trace(trx->unserialize(static_cast<const gu::byte_t*>(act.buf),
634 act.size, 0));
635 trx->update_stats(keys_count_, keys_bytes_, data_bytes_, unrd_bytes_);
636 }
637
638 trx->set_received(act.buf, act.seqno_l, act.seqno_g);
639
640 if (trx->state() == TrxHandle::S_MUST_ABORT)
641 {
642 retval = cert_for_aborted(trx);
643
644 if (retval != WSREP_BF_ABORT)
645 {
646 LocalOrder lo(*trx);
647 ApplyOrder ao(*trx);
648 CommitOrder co(*trx, co_mode_);
649 local_monitor_.self_cancel(lo);
650 apply_monitor_.self_cancel(ao);
651 if (co_mode_ !=CommitOrder::BYPASS) commit_monitor_.self_cancel(co);
652 }
653 else if (meta != 0)
654 {
655 meta->gtid.uuid = state_uuid_;
656 meta->gtid.seqno = trx->global_seqno();
657 meta->depends_on = trx->depends_seqno();
658 }
659
660 if (trx->state() == TrxHandle::S_MUST_ABORT) goto must_abort;
661 }
662 else
663 {
664 retval = WSREP_OK;
665 }
666
667 assert(trx->last_seen_seqno() >= 0);
668
669 return retval;
670 }
671
672 void
abort_trx(TrxHandle * trx)673 galera::ReplicatorSMM::abort_trx(TrxHandle* trx)
674 {
675 assert(trx != 0);
676 assert(trx->is_local() == true);
677
678 log_debug << "aborting trx " << *trx << " " << trx;
679
680
681 switch (trx->state())
682 {
683 case TrxHandle::S_MUST_ABORT:
684 case TrxHandle::S_ABORTING: // guess this is here because we can have a race
685 return;
686 case TrxHandle::S_EXECUTING:
687 trx->set_state(TrxHandle::S_MUST_ABORT);
688 break;
689 case TrxHandle::S_REPLICATING:
690 {
691 trx->set_state(TrxHandle::S_MUST_ABORT);
692 // trx is in gcs repl
693 int rc;
694 if (trx->gcs_handle() > 0 &&
695 ((rc = gcs_.interrupt(trx->gcs_handle()))) != 0)
696 {
697 log_debug << "gcs_interrupt(): handle "
698 << trx->gcs_handle()
699 << " trx id " << trx->trx_id()
700 << ": " << strerror(-rc);
701 }
702 break;
703 }
704 case TrxHandle::S_CERTIFYING:
705 {
706 trx->set_state(TrxHandle::S_MUST_ABORT);
707 // trx is waiting in local monitor
708 LocalOrder lo(*trx);
709 trx->unlock();
710 local_monitor_.interrupt(lo);
711 trx->lock();
712 break;
713 }
714 case TrxHandle::S_APPLYING:
715 {
716 trx->set_state(TrxHandle::S_MUST_ABORT);
717 // trx is waiting in apply monitor
718 ApplyOrder ao(*trx);
719 trx->unlock();
720 apply_monitor_.interrupt(ao);
721 trx->lock();
722 break;
723 }
724 case TrxHandle::S_COMMITTING:
725 trx->set_state(TrxHandle::S_MUST_ABORT);
726 if (co_mode_ != CommitOrder::BYPASS)
727 {
728 // trx waiting in commit monitor
729 CommitOrder co(*trx, co_mode_);
730 trx->unlock();
731 commit_monitor_.interrupt(co);
732 trx->lock();
733 }
734 break;
735 default:
736 gu_throw_fatal << "invalid state " << trx->state();
737 }
738 }
739
740
pre_commit(TrxHandle * trx,wsrep_trx_meta_t * meta)741 wsrep_status_t galera::ReplicatorSMM::pre_commit(TrxHandle* trx,
742 wsrep_trx_meta_t* meta)
743 {
744 assert(trx->state() == TrxHandle::S_REPLICATING);
745 assert(trx->local_seqno() > -1);
746 assert(trx->global_seqno() > -1);
747 assert(trx->last_seen_seqno() >= 0);
748
749 if (meta != 0)
750 {
751 meta->gtid.uuid = state_uuid_;
752 meta->gtid.seqno = trx->global_seqno();
753 meta->depends_on = trx->depends_seqno();
754 }
755 // State should not be checked here: If trx has been replicated,
756 // it has to be certified and potentially applied. #528
757 // if (state_() < S_JOINED) return WSREP_TRX_FAIL;
758
759 wsrep_status_t retval(cert_and_catch(trx));
760
761 if (gu_unlikely(retval != WSREP_OK))
762 {
763 assert(trx->state() == TrxHandle::S_MUST_ABORT ||
764 trx->state() == TrxHandle::S_MUST_REPLAY_AM ||
765 trx->state() == TrxHandle::S_MUST_CERT_AND_REPLAY);
766
767 if (trx->state() == TrxHandle::S_MUST_ABORT)
768 {
769 trx->set_state(TrxHandle::S_ABORTING);
770 }
771
772 return retval;
773 }
774
775 assert(trx->state() == TrxHandle::S_CERTIFYING);
776 assert(trx->global_seqno() > STATE_SEQNO());
777 trx->set_state(TrxHandle::S_APPLYING);
778
779 ApplyOrder ao(*trx);
780 CommitOrder co(*trx, co_mode_);
781 bool interrupted(false);
782
783 try
784 {
785 gu_trace(apply_monitor_.enter(ao));
786 }
787 catch (gu::Exception& e)
788 {
789 if (e.get_errno() == EINTR) { interrupted = true; }
790 else throw;
791 }
792
793 if (gu_unlikely(interrupted) || trx->state() == TrxHandle::S_MUST_ABORT)
794 {
795 assert(trx->state() == TrxHandle::S_MUST_ABORT);
796 if (interrupted) trx->set_state(TrxHandle::S_MUST_REPLAY_AM);
797 else trx->set_state(TrxHandle::S_MUST_REPLAY_CM);
798 retval = WSREP_BF_ABORT;
799 }
800 else if ((trx->flags() & TrxHandle::F_COMMIT) != 0)
801 {
802 trx->set_state(TrxHandle::S_COMMITTING);
803 if (co_mode_ != CommitOrder::BYPASS)
804 {
805 try
806 {
807 gu_trace(commit_monitor_.enter(co));
808 }
809 catch (gu::Exception& e)
810 {
811 if (e.get_errno() == EINTR) { interrupted = true; }
812 else throw;
813 }
814
815 if (gu_unlikely(interrupted) ||
816 trx->state() == TrxHandle::S_MUST_ABORT)
817 {
818 assert(trx->state() == TrxHandle::S_MUST_ABORT);
819 if (interrupted) trx->set_state(TrxHandle::S_MUST_REPLAY_CM);
820 else trx->set_state(TrxHandle::S_MUST_REPLAY);
821 retval = WSREP_BF_ABORT;
822 }
823 }
824 }
825 else
826 {
827 trx->set_state(TrxHandle::S_EXECUTING);
828 }
829
830 assert((retval == WSREP_OK && (trx->state() == TrxHandle::S_COMMITTING ||
831 trx->state() == TrxHandle::S_EXECUTING))
832 ||
833 (retval == WSREP_TRX_FAIL && trx->state() == TrxHandle::S_ABORTING)
834 ||
835 (retval == WSREP_BF_ABORT && (
836 trx->state() == TrxHandle::S_MUST_REPLAY_AM ||
837 trx->state() == TrxHandle::S_MUST_REPLAY_CM ||
838 trx->state() == TrxHandle::S_MUST_REPLAY)));
839
840 return retval;
841 }
842
replay_trx(TrxHandle * trx,void * trx_ctx)843 wsrep_status_t galera::ReplicatorSMM::replay_trx(TrxHandle* trx, void* trx_ctx)
844 {
845 assert(trx->state() == TrxHandle::S_MUST_CERT_AND_REPLAY ||
846 trx->state() == TrxHandle::S_MUST_REPLAY_AM ||
847 trx->state() == TrxHandle::S_MUST_REPLAY_CM ||
848 trx->state() == TrxHandle::S_MUST_REPLAY);
849 assert(trx->trx_id() != static_cast<wsrep_trx_id_t>(-1));
850 assert(trx->global_seqno() > STATE_SEQNO());
851
852 wsrep_status_t retval(WSREP_OK);
853
854 switch (trx->state())
855 {
856 case TrxHandle::S_MUST_CERT_AND_REPLAY:
857 retval = cert_and_catch(trx);
858 if (retval != WSREP_OK)
859 {
860 // apply monitor is self canceled in cert
861 break;
862 }
863 trx->set_state(TrxHandle::S_MUST_REPLAY_AM);
864 // fall through
865 case TrxHandle::S_MUST_REPLAY_AM:
866 {
867 // safety measure to make sure that all preceding trxs finish before
868 // replaying
869 trx->set_depends_seqno(trx->global_seqno() - 1);
870 ApplyOrder ao(*trx);
871 gu_trace(apply_monitor_.enter(ao));
872 trx->set_state(TrxHandle::S_MUST_REPLAY_CM);
873 }
874 // fall through
875 case TrxHandle::S_MUST_REPLAY_CM:
876 if (co_mode_ != CommitOrder::BYPASS)
877 {
878 CommitOrder co(*trx, co_mode_);
879 gu_trace(commit_monitor_.enter(co));
880 }
881 trx->set_state(TrxHandle::S_MUST_REPLAY);
882 // fall through
883 case TrxHandle::S_MUST_REPLAY:
884 ++local_replays_;
885 trx->set_state(TrxHandle::S_REPLAYING);
886
887 try
888 {
889 wsrep_trx_meta_t meta = {{state_uuid_, trx->global_seqno() },
890 trx->depends_seqno()};
891
892 gu_trace(apply_trx_ws(trx_ctx, apply_cb_, commit_cb_, *trx, meta));
893
894 wsrep_bool_t unused(false);
895 wsrep_cb_status_t rcode(
896 commit_cb_(
897 trx_ctx,
898 TrxHandle::trx_flags_to_wsrep_flags(trx->flags()),
899 &meta,
900 &unused,
901 true));
902
903 if (gu_unlikely(rcode != WSREP_CB_SUCCESS))
904 gu_throw_fatal << "Commit failed. Trx: " << trx;
905 }
906 catch (gu::Exception& e)
907 {
908 st_.mark_corrupt();
909 throw;
910 }
911
912 // apply, commit monitors are released in post commit
913 return WSREP_OK;
914 default:
915 gu_throw_fatal << "Invalid state in replay for trx " << *trx;
916 }
917
918 log_debug << "replaying failed for trx " << *trx;
919 trx->set_state(TrxHandle::S_ABORTING);
920
921 return retval;
922 }
923
924
post_commit(TrxHandle * trx)925 wsrep_status_t galera::ReplicatorSMM::post_commit(TrxHandle* trx)
926 {
927 if (trx->state() == TrxHandle::S_MUST_ABORT)
928 {
929 // This is possible in case of ALG: BF applier BF aborts
930 // trx that has already grabbed commit monitor and is committing.
931 // However, this should be acceptable assuming that commit
932 // operation does not reserve any more resources and is able
933 // to release already reserved resources.
934 log_debug << "trx was BF aborted during commit: " << *trx;
935 // manipulate state to avoid crash
936 trx->set_state(TrxHandle::S_MUST_REPLAY);
937 trx->set_state(TrxHandle::S_REPLAYING);
938 }
939 assert(trx->state() == TrxHandle::S_COMMITTING ||
940 trx->state() == TrxHandle::S_REPLAYING);
941 assert(trx->local_seqno() > -1 && trx->global_seqno() > -1);
942
943 CommitOrder co(*trx, co_mode_);
944 if (co_mode_ != CommitOrder::BYPASS) commit_monitor_.leave(co);
945
946 ApplyOrder ao(*trx);
947 report_last_committed(cert_.set_trx_committed(trx));
948 apply_monitor_.leave(ao);
949
950 trx->set_state(TrxHandle::S_COMMITTED);
951
952 ++local_commits_;
953
954 return WSREP_OK;
955 }
956
957
post_rollback(TrxHandle * trx)958 wsrep_status_t galera::ReplicatorSMM::post_rollback(TrxHandle* trx)
959 {
960 if (trx->state() == TrxHandle::S_MUST_ABORT)
961 {
962 trx->set_state(TrxHandle::S_ABORTING);
963 }
964
965 assert(trx->state() == TrxHandle::S_ABORTING ||
966 trx->state() == TrxHandle::S_EXECUTING);
967
968 trx->set_state(TrxHandle::S_ROLLED_BACK);
969
970 // Trx was either rolled back by user or via certification failure,
971 // last committed report not needed since cert index state didn't change.
972 // report_last_committed();
973 ++local_rollbacks_;
974
975 return WSREP_OK;
976 }
977
978
causal_read(wsrep_gtid_t * gtid)979 wsrep_status_t galera::ReplicatorSMM::causal_read(wsrep_gtid_t* gtid)
980 {
981 wsrep_seqno_t cseq;
982 gu::datetime::Date wait_until(gu::datetime::Date::calendar() +
983 causal_read_timeout_);
984
985 try
986 {
987 gcs_.caused(cseq, wait_until);
988 assert(cseq >= 0);
989 }
990 catch (gu::Exception& e)
991 {
992 log_warn << "gcs_caused() returned " << -e.get_errno()
993 << " (" << strerror(e.get_errno()) << ")";
994 return WSREP_TRX_FAIL;
995 }
996
997 try
998 {
999 // @note: Using timed wait for monitor is currently a hack
1000 // to avoid deadlock resulting from race between monitor wait
1001 // and drain during configuration change. Instead of this,
1002 // monitor should have proper mechanism to interrupt waiters
1003 // at monitor drain and disallowing further waits until
1004 // configuration change related operations (SST etc) have been
1005 // finished.
1006 if (gu_likely(co_mode_ != CommitOrder::BYPASS))
1007 {
1008 commit_monitor_.wait(cseq, wait_until);
1009 }
1010 else
1011 {
1012 apply_monitor_.wait(cseq, wait_until);
1013 }
1014 if (gtid != 0)
1015 {
1016 gtid->uuid = state_uuid_;
1017 gtid->seqno = cseq;
1018 }
1019 ++causal_reads_;
1020 return WSREP_OK;
1021 }
1022 catch (gu::Exception& e)
1023 {
1024 log_debug << "monitor wait failed for causal read: " << e.what();
1025 return WSREP_TRX_FAIL;
1026 }
1027 }
1028
1029
to_isolation_begin(TrxHandle * trx,wsrep_trx_meta_t * meta)1030 wsrep_status_t galera::ReplicatorSMM::to_isolation_begin(TrxHandle* trx,
1031 wsrep_trx_meta_t* meta)
1032 {
1033 if (meta != 0)
1034 {
1035 meta->gtid.uuid = state_uuid_;
1036 meta->gtid.seqno = trx->global_seqno();
1037 meta->depends_on = trx->depends_seqno();
1038 }
1039
1040 assert(trx->state() == TrxHandle::S_REPLICATING);
1041 assert(trx->trx_id() == static_cast<wsrep_trx_id_t>(-1));
1042 assert(trx->local_seqno() > -1 && trx->global_seqno() > -1);
1043 assert(trx->global_seqno() > STATE_SEQNO());
1044
1045 wsrep_status_t retval;
1046 switch ((retval = cert_and_catch(trx)))
1047 {
1048 case WSREP_OK:
1049 {
1050 ApplyOrder ao(*trx);
1051 CommitOrder co(*trx, co_mode_);
1052
1053 gu_trace(apply_monitor_.enter(ao));
1054
1055 if (co_mode_ != CommitOrder::BYPASS)
1056 try
1057 {
1058 commit_monitor_.enter(co);
1059 }
1060 catch (...)
1061 {
1062 gu_throw_fatal << "unable to enter commit monitor: " << *trx;
1063 }
1064
1065 trx->set_state(TrxHandle::S_APPLYING);
1066 log_debug << "Executing TO isolated action: " << *trx;
1067 st_.mark_unsafe();
1068 break;
1069 }
1070 case WSREP_TRX_FAIL:
1071 // Apply monitor is released in cert() in case of failure.
1072 trx->set_state(TrxHandle::S_ABORTING);
1073 break;
1074 default:
1075 log_error << "unrecognized retval "
1076 << retval
1077 << " for to isolation certification for "
1078 << *trx;
1079 retval = WSREP_FATAL;
1080 break;
1081 }
1082
1083 return retval;
1084 }
1085
1086
to_isolation_end(TrxHandle * trx)1087 wsrep_status_t galera::ReplicatorSMM::to_isolation_end(TrxHandle* trx)
1088 {
1089 assert(trx->state() == TrxHandle::S_APPLYING);
1090
1091 log_debug << "Done executing TO isolated action: " << *trx;
1092
1093 CommitOrder co(*trx, co_mode_);
1094 if (co_mode_ != CommitOrder::BYPASS) commit_monitor_.leave(co);
1095 ApplyOrder ao(*trx);
1096 report_last_committed(cert_.set_trx_committed(trx));
1097 apply_monitor_.leave(ao);
1098
1099 st_.mark_safe();
1100
1101 return WSREP_OK;
1102 }
1103
1104 namespace galera
1105 {
1106
1107 static WriteSetOut*
writeset_from_handle(wsrep_po_handle_t & handle,const TrxHandle::Params & trx_params)1108 writeset_from_handle (wsrep_po_handle_t& handle,
1109 const TrxHandle::Params& trx_params)
1110 {
1111 WriteSetOut* ret = reinterpret_cast<WriteSetOut*>(handle.opaque);
1112
1113 if (NULL == ret)
1114 {
1115 try
1116 {
1117 ret = new WriteSetOut(
1118 // gu::String<256>(trx_params.working_dir_) << '/' << &handle,
1119 trx_params.working_dir_, wsrep_trx_id_t(&handle),
1120 /* key format is not essential since we're not adding keys */
1121 KeySet::version(trx_params.key_format_), NULL, 0, 0,
1122 trx_params.record_set_ver_,
1123 WriteSetNG::MAX_VERSION, DataSet::MAX_VERSION, DataSet::MAX_VERSION,
1124 trx_params.max_write_set_size_);
1125
1126 handle.opaque = ret;
1127 }
1128 catch (std::bad_alloc& ba)
1129 {
1130 gu_throw_error(ENOMEM) << "Could not create WriteSetOut";
1131 }
1132 }
1133
1134 return ret;
1135 }
1136
1137 } /* namespace galera */
1138
1139 wsrep_status_t
preordered_collect(wsrep_po_handle_t & handle,const struct wsrep_buf * const data,size_t const count,bool const copy)1140 galera::ReplicatorSMM::preordered_collect(wsrep_po_handle_t& handle,
1141 const struct wsrep_buf* const data,
1142 size_t const count,
1143 bool const copy)
1144 {
1145 if (gu_unlikely(trx_params_.version_ < WS_NG_VERSION))
1146 return WSREP_NOT_IMPLEMENTED;
1147
1148 WriteSetOut* const ws(writeset_from_handle(handle, trx_params_));
1149
1150 for (size_t i(0); i < count; ++i)
1151 {
1152 ws->append_data(data[i].ptr, data[i].len, copy);
1153 }
1154
1155 return WSREP_OK;
1156 }
1157
1158
1159 wsrep_status_t
preordered_commit(wsrep_po_handle_t & handle,const wsrep_uuid_t & source,uint64_t const flags,int const pa_range,bool const commit)1160 galera::ReplicatorSMM::preordered_commit(wsrep_po_handle_t& handle,
1161 const wsrep_uuid_t& source,
1162 uint64_t const flags,
1163 int const pa_range,
1164 bool const commit)
1165 {
1166 if (gu_unlikely(trx_params_.version_ < WS_NG_VERSION))
1167 return WSREP_NOT_IMPLEMENTED;
1168
1169 WriteSetOut* const ws(writeset_from_handle(handle, trx_params_));
1170
1171 if (gu_likely(true == commit))
1172 {
1173 ws->set_flags (WriteSetNG::wsrep_flags_to_ws_flags(flags));
1174
1175 /* by loooking at trx_id we should be able to detect gaps / lost events
1176 * (however resending is not implemented yet). Something like
1177 *
1178 * wsrep_trx_id_t const trx_id(cert_.append_preordered(source, ws));
1179 *
1180 * begs to be here. */
1181 wsrep_trx_id_t const trx_id(preordered_id_.add_and_fetch(1));
1182
1183 WriteSetNG::GatherVector actv;
1184
1185 size_t const actv_size(ws->gather(source, 0, trx_id, actv));
1186
1187 ws->set_preordered (pa_range); // also adds CRC
1188
1189 int rcode;
1190 do
1191 {
1192 rcode = gcs_.sendv(actv, actv_size, GCS_ACT_TORDERED, false);
1193 }
1194 while (rcode == -EAGAIN && (usleep(1000), true));
1195
1196 if (rcode < 0)
1197 gu_throw_error(-rcode)
1198 << "Replication of preordered writeset failed.";
1199 }
1200
1201 delete ws;
1202 handle.opaque = NULL;
1203
1204 return WSREP_OK;
1205 }
1206
1207
1208 wsrep_status_t
sst_sent(const wsrep_gtid_t & state_id,int const rcode)1209 galera::ReplicatorSMM::sst_sent(const wsrep_gtid_t& state_id, int const rcode)
1210 {
1211 assert (rcode <= 0);
1212 assert (rcode == 0 || state_id.seqno == WSREP_SEQNO_UNDEFINED);
1213 assert (rcode != 0 || state_id.seqno >= 0);
1214
1215 if (state_() != S_DONOR)
1216 {
1217 log_error << "sst sent called when not SST donor, state " << state_();
1218 return WSREP_CONN_FAIL;
1219 }
1220
1221 gcs_seqno_t seqno(rcode ? rcode : state_id.seqno);
1222
1223 if (state_id.uuid != state_uuid_ && seqno >= 0)
1224 {
1225 // state we have sent no longer corresponds to the current group state
1226 // mark an error
1227 seqno = -EREMCHG;
1228 }
1229
1230 try {
1231 gcs_.join(seqno);
1232 return WSREP_OK;
1233 }
1234 catch (gu::Exception& e)
1235 {
1236 log_error << "failed to recover from DONOR state: " << e.what();
1237 return WSREP_CONN_FAIL;
1238 }
1239 }
1240
1241
process_trx(void * recv_ctx,TrxHandle * trx)1242 void galera::ReplicatorSMM::process_trx(void* recv_ctx, TrxHandle* trx)
1243 {
1244 assert(recv_ctx != 0);
1245 assert(trx != 0);
1246 assert(trx->local_seqno() > 0);
1247 assert(trx->global_seqno() > 0);
1248 assert(trx->last_seen_seqno() >= 0);
1249 assert(trx->depends_seqno() == -1);
1250 assert(trx->state() == TrxHandle::S_REPLICATING);
1251
1252 wsrep_status_t const retval(cert_and_catch(trx));
1253
1254 switch (retval)
1255 {
1256 case WSREP_OK:
1257 try
1258 {
1259 gu_trace(apply_trx(recv_ctx, trx));
1260 }
1261 catch (std::exception& e)
1262 {
1263 st_.mark_corrupt();
1264
1265 log_fatal << "Failed to apply trx: " << *trx;
1266 log_fatal << e.what();
1267 log_fatal << "Node consistency compromised, aborting...";
1268 abort();
1269 }
1270 break;
1271 case WSREP_TRX_FAIL:
1272 // certification failed, apply monitor has been canceled
1273 trx->set_state(TrxHandle::S_ABORTING);
1274 trx->set_state(TrxHandle::S_ROLLED_BACK);
1275 break;
1276 default:
1277 // this should not happen for remote actions
1278 gu_throw_error(EINVAL)
1279 << "unrecognized retval for remote trx certification: "
1280 << retval << " trx: " << *trx;
1281 }
1282 }
1283
1284
process_commit_cut(wsrep_seqno_t seq,wsrep_seqno_t seqno_l)1285 void galera::ReplicatorSMM::process_commit_cut(wsrep_seqno_t seq,
1286 wsrep_seqno_t seqno_l)
1287 {
1288 assert(seq > 0);
1289 assert(seqno_l > 0);
1290 LocalOrder lo(seqno_l);
1291
1292 gu_trace(local_monitor_.enter(lo));
1293
1294 if (seq >= cc_seqno_) /* Refs #782. workaround for
1295 * assert(seqno >= seqno_released_) in gcache. */
1296 cert_.purge_trxs_upto(seq, true);
1297
1298 local_monitor_.leave(lo);
1299 log_debug << "Got commit cut from GCS: " << seq;
1300 }
1301
establish_protocol_versions(int proto_ver)1302 void galera::ReplicatorSMM::establish_protocol_versions (int proto_ver)
1303 {
1304 trx_params_.record_set_ver_ = gu::RecordSet::VER1;
1305
1306 switch (proto_ver)
1307 {
1308 case 1:
1309 trx_params_.version_ = 1;
1310 str_proto_ver_ = 0;
1311 break;
1312 case 2:
1313 trx_params_.version_ = 1;
1314 str_proto_ver_ = 1;
1315 break;
1316 case 3:
1317 case 4:
1318 trx_params_.version_ = 2;
1319 str_proto_ver_ = 1;
1320 break;
1321 case 5:
1322 trx_params_.version_ = 3;
1323 str_proto_ver_ = 1;
1324 break;
1325 case 6:
1326 trx_params_.version_ = 3;
1327 str_proto_ver_ = 2; // gcs intelligent donor selection.
1328 // include handling dangling comma in donor string.
1329 break;
1330 case 7:
1331 // Protocol upgrade to handle IST SSL backwards compatibility,
1332 // no effect to TRX or STR protocols.
1333 trx_params_.version_ = 3;
1334 str_proto_ver_ = 2;
1335 break;
1336 case 8:
1337 // Protocol upgrade to enforce 8-byte alignment in writesets.
1338 trx_params_.version_ = 3;
1339 trx_params_.record_set_ver_ = gu::RecordSet::VER2;
1340 str_proto_ver_ = 2;
1341 break;
1342 case 9:
1343 // Protocol upgrade to enable support for semi-shared key type.
1344 trx_params_.version_ = 4;
1345 trx_params_.record_set_ver_ = gu::RecordSet::VER2;
1346 str_proto_ver_ = 2;
1347 break;
1348 default:
1349 log_fatal << "Configuration change resulted in an unsupported protocol "
1350 "version: " << proto_ver << ". Can't continue.";
1351 abort();
1352 };
1353
1354 protocol_version_ = proto_ver;
1355 log_info << "REPL Protocols: " << protocol_version_ << " ("
1356 << trx_params_.version_ << ", " << str_proto_ver_ << ")";
1357 }
1358
1359 static bool
app_wants_state_transfer(const void * const req,ssize_t const req_len)1360 app_wants_state_transfer (const void* const req, ssize_t const req_len)
1361 {
1362 return (req_len != (strlen(WSREP_STATE_TRANSFER_NONE) + 1) ||
1363 memcmp(req, WSREP_STATE_TRANSFER_NONE, req_len));
1364 }
1365
1366 void
update_incoming_list(const wsrep_view_info_t & view)1367 galera::ReplicatorSMM::update_incoming_list(const wsrep_view_info_t& view)
1368 {
1369 static char const separator(',');
1370
1371 ssize_t new_size(0);
1372
1373 if (view.memb_num > 0)
1374 {
1375 new_size += view.memb_num - 1; // separators
1376
1377 for (int i = 0; i < view.memb_num; ++i)
1378 {
1379 new_size += strlen(view.members[i].incoming);
1380 }
1381 }
1382
1383 gu::Lock lock(incoming_mutex_);
1384
1385 incoming_list_.clear();
1386 incoming_list_.resize(new_size);
1387
1388 if (new_size <= 0) return;
1389
1390 incoming_list_ = view.members[0].incoming;
1391
1392 for (int i = 1; i < view.memb_num; ++i)
1393 {
1394 incoming_list_ += separator;
1395 incoming_list_ += view.members[i].incoming;
1396 }
1397 }
1398
/*
 * Processes a configuration (view) change.
 *
 * The list of incoming addresses is refreshed first. Everything else runs
 * inside local_monitor_ (entered with seqno_l): the apply monitor and, unless
 * commit ordering is bypassed, the commit monitor are drained up to the
 * current certification position, the application view callback is invoked,
 * and then the view is handled either as a primary configuration
 * (view_info.view >= 0) or as a non-primary one. On the way out the local
 * monitor is left, GCS receiving is resumed and the request buffer returned
 * by the view callback is freed.
 *
 * @param recv_ctx   receiver context, passed to the view callback and to
 *                   request_state_transfer()
 * @param view_info  the new view; its state_gap field is overwritten here
 * @param repl_proto replicator protocol version, passed to
 *                   establish_protocol_versions() when view_info.view >= 0
 * @param next_state state to shift to; see the primary / non-primary
 *                   branches below for how it is used
 * @param seqno_l    local seqno used for ordering in local_monitor_
 *
 * Aborts if the view callback fails, if state transfer is required from a
 * different state UUID but the callback returned an empty request, or if
 * next_state is unexpected for a non-primary configuration.
 */
void
galera::ReplicatorSMM::process_conf_change(void*                    recv_ctx,
                                           const wsrep_view_info_t& view_info,
                                           int                      repl_proto,
                                           State                    next_state,
                                           wsrep_seqno_t            seqno_l)
{
    assert(repl_proto >= 0 || view_info.status != WSREP_VIEW_PRIMARY);
    assert(seqno_l > -1);

    update_incoming_list(view_info);

    LocalOrder lo(seqno_l);
    gu_trace(local_monitor_.enter(lo));

    // position up to which apply/commit monitors are drained below
    wsrep_seqno_t const upto(cert_.position());

    if (view_info.status == WSREP_VIEW_PRIMARY)
    {
        // flag is set only when this node is the sole member of the view
        safe_to_bootstrap_ = (view_info.memb_num == 1);
    }

    apply_monitor_.drain(upto);

    if (co_mode_ != CommitOrder::BYPASS) commit_monitor_.drain(upto);

    if (view_info.my_idx >= 0)
    {
        // remember own member ID as listed in the view
        uuid_ = view_info.members[view_info.my_idx].id;
    }

    bool const          st_required(state_transfer_required(view_info));
    wsrep_seqno_t const group_seqno(view_info.state_id.seqno);
    const wsrep_uuid_t& group_uuid (view_info.state_id.uuid);

    if (st_required)
    {
        log_info << "State transfer required: "
                 << "\n\tGroup state: " << group_uuid << ":" << group_seqno
                 << "\n\tLocal state: " << state_uuid_<< ":" << STATE_SEQNO();

        if (S_CONNECTED != state_()) state_.shift_to(S_CONNECTED);
    }

    // must establish protocols before calling view_cb()
    if (view_info.view >= 0) establish_protocol_versions (repl_proto);

    // state transfer request buffer and length filled in by the view callback
    void*  app_req(0);
    size_t app_req_len(0);

    // view_info is const for callers, but state_gap is an output of this
    // function that the callback needs to see
    const_cast<wsrep_view_info_t&>(view_info).state_gap = st_required;
    wsrep_cb_status_t const rcode(
        view_cb_(app_ctx_, recv_ctx, &view_info, 0, 0, &app_req, &app_req_len));

    if (WSREP_CB_SUCCESS != rcode)
    {
        // NOTE(review): app_req_len is size_t, so this can only check == 0
        assert(app_req_len <= 0);
        log_fatal << "View callback failed. This is unrecoverable, "
                  << "restart required.";
        close();
        abort();
    }
    else if (st_required && 0 == app_req_len && state_uuid_ != group_uuid)
    {
        log_fatal << "Local state UUID " << state_uuid_
                  << " is different from group state UUID " << group_uuid
                  << ", and SST request is null: restart required.";
        close();
        abort();
    }

    if (view_info.view >= 0) // Primary configuration
    {
        GU_DBUG_SYNC_WAIT("process_primary_configuration");

        // we have to reset cert initial position here, SST does not contain
        // cert index yet (see #197).
        // Also this must be done before releasing GCache buffers.
        cert_.assign_initial_position(group_seqno, trx_params_.version_);

        if (STATE_SEQNO() > 0) service_thd_.release_seqno(STATE_SEQNO());
        // make sure all gcache buffers are released

        // at this point there is no ongoing master or slave transactions
        // and no new requests to service thread should be possible
        service_thd_.flush(); // make sure service thd is idle

        // record state seqno, needed for IST on DONOR
        cc_seqno_ = group_seqno;

        bool const app_wants_st(app_wants_state_transfer(app_req, app_req_len));

        if (st_required && app_wants_st)
        {
            // GCache::Seqno_reset() happens here
            request_state_transfer (recv_ctx,
                                    group_uuid, group_seqno, app_req,
                                    app_req_len);
        }
        else
        {
            // No state transfer is requested: for the view with ID 1, or
            // when the application explicitly declined state transfer,
            // adopt the group state as the local starting point.
            if (view_info.view == 1 || !app_wants_st)
            {
                update_state_uuid (group_uuid);
                gcache_.seqno_reset(to_gu_uuid(group_uuid), group_seqno);
                apply_monitor_.set_initial_position(group_seqno);
                if (co_mode_ != CommitOrder::BYPASS)
                    commit_monitor_.set_initial_position(group_seqno);
            }

            // state is shifted according to next_state only from
            // S_CONNECTED or S_DONOR
            if (state_() == S_CONNECTED || state_() == S_DONOR)
            {
                switch (next_state)
                {
                case S_JOINING:
                    state_.shift_to(S_JOINING);
                    break;
                case S_DONOR:
                    // already being S_DONOR requires no shift
                    if (state_() == S_CONNECTED)
                    {
                        state_.shift_to(S_DONOR);
                    }
                    break;
                case S_JOINED:
                    state_.shift_to(S_JOINED);
                    break;
                case S_SYNCED:
                    state_.shift_to(S_SYNCED);
                    synced_cb_(app_ctx_);
                    break;
                default:
                    log_debug << "next_state " << next_state;
                    break;
                }
            }

            st_.set(state_uuid_, WSREP_SEQNO_UNDEFINED, safe_to_bootstrap_);
        }

        if (state_() == S_JOINING && sst_state_ != SST_NONE)
        {
            /* There are two reasons we can be here:
             * 1) we just got state transfer in request_state_transfer() above;
             * 2) we failed here previously (probably due to partition).
             */
            try {
                gcs_.join(sst_seqno_);
                sst_state_ = SST_JOIN_SENT;
            }
            catch (gu::Exception& e)
            {
                // NOTE(review): exception details (e.what()) are not logged;
                // sst_state_ is left unchanged when the join throws.
                log_error << "Failed to JOIN the cluster after SST";
            }
        }
    }
    else
    {
        // Non-primary configuration
        if (state_uuid_ != WSREP_UUID_UNDEFINED && next_state == S_CLOSING)
        {
            // closing with a defined state: record current state position
            st_.set (state_uuid_, STATE_SEQNO(), safe_to_bootstrap_);
        }

        if (next_state != S_CONNECTED && next_state != S_CLOSING)
        {
            log_fatal << "Internal error: unexpected next state for "
                      << "non-prim: " << next_state << ". Restart required.";
            close();
            abort();
        }

        state_.shift_to(next_state);
    }

    local_monitor_.leave(lo);
    gcs_.resume_recv();
    // NOTE(review): presumably the view callback allocates app_req with
    // malloc(); it is not released if anything above throws — confirm.
    free(app_req);
}
1577
1578
process_join(wsrep_seqno_t seqno_j,wsrep_seqno_t seqno_l)1579 void galera::ReplicatorSMM::process_join(wsrep_seqno_t seqno_j,
1580 wsrep_seqno_t seqno_l)
1581 {
1582 LocalOrder lo(seqno_l);
1583
1584 gu_trace(local_monitor_.enter(lo));
1585
1586 wsrep_seqno_t const upto(cert_.position());
1587
1588 apply_monitor_.drain(upto);
1589
1590 if (co_mode_ != CommitOrder::BYPASS) commit_monitor_.drain(upto);
1591
1592 if (seqno_j < 0 && S_JOINING == state_())
1593 {
1594 // #595, @todo: find a way to re-request state transfer
1595 log_fatal << "Failed to receive state transfer: " << seqno_j
1596 << " (" << strerror (-seqno_j) << "), need to restart.";
1597 abort();
1598 }
1599 else
1600 {
1601 state_.shift_to(S_JOINED);
1602 sst_state_ = SST_NONE;
1603 }
1604
1605 local_monitor_.leave(lo);
1606 }
1607
1608
process_sync(wsrep_seqno_t seqno_l)1609 void galera::ReplicatorSMM::process_sync(wsrep_seqno_t seqno_l)
1610 {
1611 LocalOrder lo(seqno_l);
1612
1613 gu_trace(local_monitor_.enter(lo));
1614
1615 wsrep_seqno_t const upto(cert_.position());
1616
1617 apply_monitor_.drain(upto);
1618
1619 if (co_mode_ != CommitOrder::BYPASS) commit_monitor_.drain(upto);
1620
1621 state_.shift_to(S_SYNCED);
1622 synced_cb_(app_ctx_);
1623 local_monitor_.leave(lo);
1624 }
1625
pause()1626 wsrep_seqno_t galera::ReplicatorSMM::pause()
1627 {
1628 // Grab local seqno for local_monitor_
1629 wsrep_seqno_t const local_seqno(
1630 static_cast<wsrep_seqno_t>(gcs_.local_sequence()));
1631 LocalOrder lo(local_seqno);
1632 local_monitor_.enter(lo);
1633
1634 // Local monitor should take care that concurrent
1635 // pause requests are enqueued
1636 assert(pause_seqno_ == WSREP_SEQNO_UNDEFINED);
1637 pause_seqno_ = local_seqno;
1638
1639 // Get drain seqno from cert index
1640 wsrep_seqno_t const upto(cert_.position());
1641 apply_monitor_.drain(upto);
1642 assert (apply_monitor_.last_left() >= upto);
1643
1644 if (co_mode_ != CommitOrder::BYPASS)
1645 {
1646 commit_monitor_.drain(upto);
1647 assert (commit_monitor_.last_left() >= upto);
1648 assert (commit_monitor_.last_left() == apply_monitor_.last_left());
1649 }
1650
1651 wsrep_seqno_t const ret(STATE_SEQNO());
1652 st_.set(state_uuid_, ret, safe_to_bootstrap_);
1653
1654 log_info << "Provider paused at " << state_uuid_ << ':' << ret
1655 << " (" << pause_seqno_ << ")";
1656
1657 return ret;
1658 }
1659
resume()1660 void galera::ReplicatorSMM::resume()
1661 {
1662 if (pause_seqno_ == WSREP_SEQNO_UNDEFINED)
1663 {
1664 log_warn << "tried to resume unpaused provider";
1665 return;
1666 }
1667
1668 st_.set(state_uuid_, WSREP_SEQNO_UNDEFINED, safe_to_bootstrap_);
1669 log_info << "resuming provider at " << pause_seqno_;
1670 LocalOrder lo(pause_seqno_);
1671 pause_seqno_ = WSREP_SEQNO_UNDEFINED;
1672 local_monitor_.leave(lo);
1673 log_info << "Provider resumed.";
1674 }
1675
desync()1676 void galera::ReplicatorSMM::desync()
1677 {
1678 wsrep_seqno_t seqno_l;
1679
1680 ssize_t const ret(gcs_.desync(&seqno_l));
1681
1682 if (seqno_l > 0)
1683 {
1684 LocalOrder lo(seqno_l); // need to process it regardless of ret value
1685
1686 if (ret == 0)
1687 {
1688 /* #706 - the check below must be state request-specific. We are not holding
1689 any locks here and must be able to wait like any other action.
1690 However practice may prove different, leaving it here as a reminder.
1691 if (local_monitor_.would_block(seqno_l))
1692 {
1693 gu_throw_error (-EDEADLK) << "Ran out of resources waiting to "
1694 << "desync the node. "
1695 << "The node must be restarted.";
1696 }
1697 */
1698 local_monitor_.enter(lo);
1699 if (state_() != S_DONOR) state_.shift_to(S_DONOR);
1700 local_monitor_.leave(lo);
1701 }
1702 else
1703 {
1704 local_monitor_.self_cancel(lo);
1705 }
1706 }
1707
1708 if (ret)
1709 {
1710 gu_throw_error (-ret) << "Node desync failed.";
1711 }
1712 }
1713
/*
 * Rejoins the group after desync by sending a join with the last seqno
 * that left the commit monitor.
 *
 * NOTE(review): other code paths skip commit_monitor_ when co_mode_ is
 * CommitOrder::BYPASS — confirm last_left() is still meaningful in that mode.
 */
void galera::ReplicatorSMM::resync()
{
    gcs_.join(commit_monitor_.last_left());
}
1718
1719
1720 //////////////////////////////////////////////////////////////////////
1721 //////////////////////////////////////////////////////////////////////
1722 //// Private
1723 //////////////////////////////////////////////////////////////////////
1724 //////////////////////////////////////////////////////////////////////
1725
/* don't use this directly, use cert_and_catch() instead */
/*
 * Certifies a transaction in local order.
 *
 * Enters local_monitor_ (an EINTR from the monitor is remembered as
 * "interrupted" rather than propagated), then:
 *  - if the trx seqno is not above the current state seqno, the trx is not
 *    applicable: it is either used to preload the cert index (when its seqno
 *    falls within the range covered by the last SST) or its buffer is freed,
 *    and WSREP_TRX_FAIL is returned;
 *  - otherwise, if not interrupted, the trx is appended to the cert index
 *    and the result is mapped to WSREP_OK / WSREP_BF_ABORT / WSREP_TRX_FAIL;
 *  - otherwise (interrupted) the result comes from cert_for_aborted().
 *
 * For an applicable trx that ends up with WSREP_TRX_FAIL, the apply and
 * (unless bypassed) commit monitors are self-cancelled.
 *
 * @param trx transaction handle, asserted to be in S_REPLICATING or
 *            S_MUST_CERT_AND_REPLAY state
 * @return WSREP_OK, WSREP_BF_ABORT or WSREP_TRX_FAIL
 */
inline
wsrep_status_t galera::ReplicatorSMM::cert(TrxHandle* trx)
{
    assert(trx->state() == TrxHandle::S_REPLICATING ||
           trx->state() == TrxHandle::S_MUST_CERT_AND_REPLAY);

    assert(trx->local_seqno()     != WSREP_SEQNO_UNDEFINED);
    assert(trx->global_seqno()    != WSREP_SEQNO_UNDEFINED);
    assert(trx->last_seen_seqno() >= 0);
    assert(trx->last_seen_seqno() <  trx->global_seqno());

    trx->set_state(TrxHandle::S_CERTIFYING);

    // order objects for the three monitors this trx passes through
    LocalOrder  lo(*trx);
    ApplyOrder  ao(*trx);
    CommitOrder co(*trx, co_mode_);

    // set when waiting for the local monitor was interrupted (EINTR)
    bool interrupted(false);

    try
    {
        gu_trace(local_monitor_.enter(lo));
    }
    catch (gu::Exception& e)
    {
        if (e.get_errno() == EINTR) { interrupted = true; }
        else throw;
    }

    wsrep_status_t retval(WSREP_OK);
    // IST should have drained the monitors, so STATE_SEQNO() should be current
    bool const applicable(trx->global_seqno() > STATE_SEQNO());

    if (!applicable)
    {
        // this can happen after state transfer position has been submitted
        // but not all actions preceding it have been processed.
        //
        // Cert index preload after SST:
        // ----------------------------
        // If the trx global seqno is in the half open range
        // (cc_seqno_ , sst_seqno_], the write set was contained in the SST.
        // In this case do the certification for trx to populate the index,
        // but ignore the result. Always set state as S_MUST_ABORT and
        // return WSREP_TRX_FAIL to make calling code to discard this trx.
        if (last_st_type_ == ST_TYPE_SST &&
            cc_seqno_ < trx->global_seqno() &&
            trx->global_seqno() <= sst_seqno_)
        {
            (void)cert_.append_trx(trx);
            trx->verify_checksum();
            gcache_.seqno_assign (trx->action(),
                                  trx->global_seqno(),
                                  trx->depends_seqno());
            cert_.set_trx_committed(trx);
        }
        else
        {
            // trx is of no further use: return its buffer to gcache
            gcache_.free(const_cast<void*>(trx->action()));
        }
        trx->set_state(TrxHandle::S_MUST_ABORT);
        // the monitor was never entered if the wait was interrupted, so the
        // slot is cancelled instead of left
        if (interrupted)
            local_monitor_.self_cancel(lo);
        else
            local_monitor_.leave(lo);
        return WSREP_TRX_FAIL;
    }

    if (gu_likely (!interrupted))
    {
        // NOTE(review): no default case — any other TestResult value leaves
        // retval at WSREP_OK.
        switch (cert_.append_trx(trx))
        {
        case Certification::TEST_OK:
            if (trx->state() == TrxHandle::S_CERTIFYING)
            {
                retval = WSREP_OK;
            }
            else
            {
                // state changed while certifying: trx passed certification
                // but was marked to abort, so it has to be replayed
                assert(trx->state() == TrxHandle::S_MUST_ABORT);
                trx->set_state(TrxHandle::S_MUST_REPLAY_AM);
                retval = WSREP_BF_ABORT;
            }
            break;
        case Certification::TEST_FAILED:
            if (gu_unlikely(trx->is_toi())) // small sanity check
            {
                // may happen on configuration change
                log_warn << "Certification failed for TO isolated action: "
                         << *trx;
                assert(0);
            }
            // counts only local transactions (is_local() adds 0 or 1)
            local_cert_failures_ += trx->is_local();
            trx->set_state(TrxHandle::S_MUST_ABORT);
            retval = WSREP_TRX_FAIL;
            break;
        }

        if (gu_unlikely(WSREP_TRX_FAIL == retval))
        {
            report_last_committed(cert_.set_trx_committed(trx));
        }

        // at this point we are about to leave local_monitor_. Make sure
        // trx checksum was alright before that.
        trx->verify_checksum();

        // we must do it 'in order' for std::map reasons, so keeping
        // it inside the monitor
        gcache_.seqno_assign (trx->action(),
                              trx->global_seqno(),
                              trx->depends_seqno());

        local_monitor_.leave(lo);
    }
    else
    {
        retval = cert_for_aborted(trx);

        // On WSREP_BF_ABORT the local monitor slot is deliberately neither
        // left nor cancelled here.
        // NOTE(review): presumably the replay path handles it — confirm.
        if (WSREP_TRX_FAIL == retval)
        {
            local_monitor_.self_cancel(lo);
        }
        else
        {
            assert(WSREP_BF_ABORT == retval);
        }
    }

    if (gu_unlikely(WSREP_TRX_FAIL == retval))
    {
        // applicable but failed certification: self-cancel monitors
        apply_monitor_.self_cancel(ao);
        if (co_mode_ != CommitOrder::BYPASS) commit_monitor_.self_cancel(co);
    }

    assert(applicable);

    return retval;
}
1867
1868 /* pretty much any exception in cert() is fatal as it blocks local_monitor_ */
cert_and_catch(TrxHandle * trx)1869 wsrep_status_t galera::ReplicatorSMM::cert_and_catch(TrxHandle* trx)
1870 {
1871 try
1872 {
1873 return cert(trx);
1874 }
1875 catch (std::exception& e)
1876 {
1877 log_fatal << "Certification exception: " << e.what();
1878 }
1879 catch (...)
1880 {
1881 log_fatal << "Unknown certification exception";
1882 }
1883 abort();
1884 }
1885
1886 /* This must be called BEFORE local_monitor_.self_cancel() due to
1887 * gcache_.seqno_assign() */
cert_for_aborted(TrxHandle * trx)1888 wsrep_status_t galera::ReplicatorSMM::cert_for_aborted(TrxHandle* trx)
1889 {
1890 Certification::TestResult const res(cert_.test(trx, false));
1891
1892 switch (res)
1893 {
1894 case Certification::TEST_OK:
1895 trx->set_state(TrxHandle::S_MUST_CERT_AND_REPLAY);
1896 return WSREP_BF_ABORT;
1897
1898 case Certification::TEST_FAILED:
1899 if (trx->state() != TrxHandle::S_MUST_ABORT)
1900 {
1901 trx->set_state(TrxHandle::S_MUST_ABORT);
1902 }
1903 // Mext step will be monitors release. Make sure that ws was not
1904 // corrupted and cert failure is real before procedeing with that.
1905 trx->verify_checksum();
1906 gcache_.seqno_assign (trx->action(), trx->global_seqno(), -1);
1907 return WSREP_TRX_FAIL;
1908
1909 default:
1910 log_fatal << "Unexpected return value from Certification::test(): "
1911 << res;
1912 abort();
1913 }
1914 }
1915
1916
1917 void
update_state_uuid(const wsrep_uuid_t & uuid)1918 galera::ReplicatorSMM::update_state_uuid (const wsrep_uuid_t& uuid)
1919 {
1920 if (state_uuid_ != uuid)
1921 {
1922 *(const_cast<wsrep_uuid_t*>(&state_uuid_)) = uuid;
1923
1924 std::ostringstream os; os << state_uuid_;
1925 // Copy only non-nil terminated part of the source string
1926 // and terminate the string explicitly to silence a warning
1927 // generated by Wstringop-truncation
1928 char* str(const_cast<char*>(state_uuid_str_));
1929 strncpy(str, os.str().c_str(), sizeof(state_uuid_str_) - 1);
1930 str[sizeof(state_uuid_str_) - 1] = '\0';
1931 }
1932
1933 st_.set(uuid, WSREP_SEQNO_UNDEFINED, safe_to_bootstrap_);
1934 }
1935
/*
 * Closes the GCS connection and then calls gu_abort().
 *
 * NOTE(review): callers in this file place no return after abort(), so
 * gu_abort() presumably never returns — confirm against its declaration.
 */
void
galera::ReplicatorSMM::abort()
{
    gcs_.close();
    gu_abort();
}
1942