1 //
2 // Copyright (C) 2010-2020 Codership Oy <info@codership.com>
3 //
4
5 #include "galera_common.hpp"
6 #include "replicator_smm.hpp"
7 #include "gcs_action_source.hpp"
8 #include "galera_exception.hpp"
9
10 #include "galera_info.hpp"
11
12 #include <gu_debug_sync.hpp>
13 #include <gu_abort.h>
14
15 #include <sstream>
16 #include <iostream>
17
18
19 #define TX_SET_STATE(t_,s_) (t_).set_state(s_, __LINE__)
20
21
22 wsrep_cap_t
capabilities(int protocol_version)23 galera::ReplicatorSMM::capabilities(int protocol_version)
24 {
25 static uint64_t const v4_caps(WSREP_CAP_MULTI_MASTER |
26 WSREP_CAP_CERTIFICATION |
27 WSREP_CAP_PARALLEL_APPLYING |
28 WSREP_CAP_TRX_REPLAY |
29 WSREP_CAP_ISOLATION |
30 WSREP_CAP_PAUSE |
31 WSREP_CAP_CAUSAL_READS);
32
33 static uint64_t const v5_caps(WSREP_CAP_INCREMENTAL_WRITESET |
34 WSREP_CAP_UNORDERED |
35 WSREP_CAP_PREORDERED);
36
37 static uint64_t const v8_caps(WSREP_CAP_STREAMING);
38
39 static uint64_t const v9_caps(WSREP_CAP_NBO);
40
41 if (protocol_version == -1) return 0;
42
43 assert(protocol_version >= 4);
44
45 uint64_t caps(v4_caps);
46
47 if (protocol_version >= 5) caps |= v5_caps;
48 if (protocol_version >= 8) caps |= v8_caps;
49 if (protocol_version >= 9) caps |= v9_caps;
50
51 return caps;
52 }
53
54
operator <<(std::ostream & os,ReplicatorSMM::State state)55 std::ostream& galera::operator<<(std::ostream& os, ReplicatorSMM::State state)
56 {
57 switch (state)
58 {
59 case ReplicatorSMM::S_DESTROYED: return (os << "DESTROYED");
60 case ReplicatorSMM::S_CLOSED: return (os << "CLOSED");
61 case ReplicatorSMM::S_CONNECTED: return (os << "CONNECTED");
62 case ReplicatorSMM::S_JOINING: return (os << "JOINING");
63 case ReplicatorSMM::S_JOINED: return (os << "JOINED");
64 case ReplicatorSMM::S_SYNCED: return (os << "SYNCED");
65 case ReplicatorSMM::S_DONOR: return (os << "DONOR");
66 }
67
68 gu_throw_fatal << "invalid state " << static_cast<int>(state);
69 }
70
71 //////////////////////////////////////////////////////////////////////
72 //////////////////////////////////////////////////////////////////////
73 // Public
74 //////////////////////////////////////////////////////////////////////
75 //////////////////////////////////////////////////////////////////////
76
/* Replicator state machine constructor.
 *
 * Wires together all provider subsystems - configuration, SSL, GCache,
 * GCS stack, certification, IST sender/receiver, ordering monitors -
 * and restores the initial replication position from the on-disk state
 * file (and/or the position recovered by the application, if provided
 * in args->state_id).
 *
 * NOTE: initializer order carries real dependencies (e.g. gcs_ uses
 * gcache_, service_thd_ uses gcs_ and gcache_); it must match the
 * member declaration order in the header. */
galera::ReplicatorSMM::ReplicatorSMM(const struct wsrep_init_args* args)
    :
    ist_event_queue_    (),
    init_lib_           (reinterpret_cast<gu_log_cb_t>(args->logger_cb)),
    config_             (),
    init_config_        (config_, args->node_address, args->data_dir),
    parse_options_      (*this, config_, args->options),
    init_ssl_           (config_),
    protocol_version_   (-1),
    proto_max_          (gu::from_string<int>(config_.get(Param::proto_max))),
    state_              (S_CLOSED),
    closing_mutex_      (),
    closing_cond_       (),
    closing_            (false),
    sst_state_          (SST_NONE),
    co_mode_            (CommitOrder::from_string(
                             config_.get(Param::commit_order))),
    state_file_         (config_.get(BASE_DIR)+'/'+GALERA_STATE_FILE),
    st_                 (state_file_),
    safe_to_bootstrap_  (true),
    trx_params_         (config_.get(BASE_DIR), -1,
                         KeySet::version(config_.get(Param::key_format)),
                         TrxHandleMaster::Defaults.record_set_ver_,
                         gu::from_string<int>(config_.get(
                             Param::max_write_set_size))),
    uuid_               (WSREP_UUID_UNDEFINED),
    state_uuid_         (WSREP_UUID_UNDEFINED),
    state_uuid_str_     (),
    cc_seqno_           (WSREP_SEQNO_UNDEFINED),
    cc_lowest_trx_seqno_(WSREP_SEQNO_UNDEFINED),
    pause_seqno_        (WSREP_SEQNO_UNDEFINED),
    app_ctx_            (args->app_ctx),
    connected_cb_       (args->connected_cb),
    view_cb_            (args->view_cb),
    sst_request_cb_     (args->sst_request_cb),
    apply_cb_           (args->apply_cb),
    unordered_cb_       (args->unordered_cb),
    sst_donate_cb_      (args->sst_donate_cb),
    synced_cb_          (args->synced_cb),
    sst_donor_          (),
    sst_uuid_           (WSREP_UUID_UNDEFINED),
    sst_seqno_          (WSREP_SEQNO_UNDEFINED),
    sst_mutex_          (),
    sst_cond_           (),
    sst_retry_sec_      (1),
    sst_received_       (false),
    gcache_             (config_, config_.get(BASE_DIR)),
    gcs_                (config_, gcache_, proto_max_, args->proto_ver,
                         args->node_name, args->node_incoming),
    service_thd_        (gcs_, gcache_),
    slave_pool_         (sizeof(TrxHandleSlave), 1024, "TrxHandleSlave"),
    as_                 (new GcsActionSource(slave_pool_, gcs_, *this, gcache_)),
    ist_receiver_       (config_, gcache_, slave_pool_,*this,args->node_address),
    ist_senders_        (gcache_),
    wsdb_               (),
    cert_               (config_, &service_thd_),
    pending_cert_queue_ (gcache_),
    local_monitor_      (),
    apply_monitor_      (),
    commit_monitor_     (),
    causal_read_timeout_(config_.get(Param::causal_read_timeout)),
    receivers_          (),
    replicated_         (),
    replicated_bytes_   (),
    keys_count_         (),
    keys_bytes_         (),
    data_bytes_         (),
    unrd_bytes_         (),
    local_commits_      (),
    local_rollbacks_    (),
    local_cert_failures_(),
    local_replays_      (),
    causal_reads_       (),
    preordered_id_      (),
    incoming_list_      (""),
    incoming_mutex_     (),
    wsrep_stats_        ()
{
    // @todo add guards (and perhaps actions)
    //
    // Declare the full set of legal node state transitions; any
    // transition not listed here will be rejected by the FSM.
    state_.add_transition(Transition(S_CLOSED, S_DESTROYED));
    state_.add_transition(Transition(S_CLOSED, S_CONNECTED));

    state_.add_transition(Transition(S_CONNECTED, S_CLOSED));
    state_.add_transition(Transition(S_CONNECTED, S_CONNECTED));
    state_.add_transition(Transition(S_CONNECTED, S_JOINING));
    // the following is possible only when bootstrapping new cluster
    // (trivial wsrep_cluster_address)
    state_.add_transition(Transition(S_CONNECTED, S_JOINED));
    // the following are possible on PC remerge
    state_.add_transition(Transition(S_CONNECTED, S_DONOR));
    state_.add_transition(Transition(S_CONNECTED, S_SYNCED));

    state_.add_transition(Transition(S_JOINING, S_CLOSED));
    // the following is possible if one non-prim conf follows another
    state_.add_transition(Transition(S_JOINING, S_CONNECTED));
    state_.add_transition(Transition(S_JOINING, S_JOINED));

    state_.add_transition(Transition(S_JOINED, S_CLOSED));
    state_.add_transition(Transition(S_JOINED, S_CONNECTED));
    state_.add_transition(Transition(S_JOINED, S_SYNCED));
    // the following is possible if one desync() immediately follows another
    state_.add_transition(Transition(S_JOINED, S_DONOR));

    state_.add_transition(Transition(S_SYNCED, S_CLOSED));
    state_.add_transition(Transition(S_SYNCED, S_CONNECTED));
    state_.add_transition(Transition(S_SYNCED, S_DONOR));

    state_.add_transition(Transition(S_DONOR, S_CLOSED));
    state_.add_transition(Transition(S_DONOR, S_CONNECTED));
    state_.add_transition(Transition(S_DONOR, S_JOINED));

    local_monitor_.set_initial_position(WSREP_UUID_UNDEFINED, 0);

    // Restore position saved in the state file (grastate).
    wsrep_uuid_t  uuid;
    wsrep_seqno_t seqno;

    st_.get (uuid, seqno, safe_to_bootstrap_);

    if (0 != args->state_id &&
        args->state_id->uuid != WSREP_UUID_UNDEFINED &&
        args->state_id->uuid == uuid                 &&
        seqno               == WSREP_SEQNO_UNDEFINED)
    {
        /* non-trivial recovery information provided on startup, and db is safe
         * so use recovered seqno value */
        seqno = args->state_id->seqno;
    }

    if (seqno >= 0) // non-trivial starting position
    {
        assert(uuid != WSREP_UUID_UNDEFINED);
        cc_seqno_ = seqno; // is it needed here?

        log_debug << "ReplicatorSMM() initial position: "
                  << uuid << ':' << seqno;
        set_initial_position(uuid, seqno);
        cert_.assign_initial_position(gu::GTID(uuid, seqno),
                                      trx_params_.version_);
        gcache_.seqno_reset(gu::GTID(uuid, seqno));
        // update gcache position to one supplied by app.
    }

    build_stats_vars(wsrep_stats_);
}
221
start_closing()222 void galera::ReplicatorSMM::start_closing()
223 {
224 assert(closing_mutex_.locked());
225 assert(state_() >= S_CONNECTED);
226 if (!closing_)
227 {
228 closing_ = true;
229 gcs_.close();
230 }
231 }
232
/* Complete the shutdown started by start_closing(): move the FSM to
 * S_CLOSED, persist the final position in the state file, and reset
 * per-connection state so the provider can be re-opened later.
 * Must be called with closing_mutex_ held and closing_ set. */
void galera::ReplicatorSMM::shift_to_CLOSED()
{
    assert(closing_mutex_.locked());
    assert(closing_);

    state_.shift_to(S_CLOSED);

    // Persist last committed position only if we ever had a valid one.
    if (state_uuid_ != WSREP_UUID_UNDEFINED)
    {
        st_.set (state_uuid_, last_committed(), safe_to_bootstrap_);
    }

    /* Cleanup for re-opening. */
    uuid_    = WSREP_UUID_UNDEFINED;
    closing_ = false;
    if (st_.corrupt())
    {
        /* this is a synchronization hack to make sure all receivers are done
         * with their work and won't access cert module any more. The usual
         * monitor drain is not enough here. */
        while (receivers_() > 1) usleep(1000);

        // this should erase the memory of a pre-existing state.
        set_initial_position(WSREP_UUID_UNDEFINED, WSREP_SEQNO_UNDEFINED);
        cert_.assign_initial_position(gu::GTID(GU_UUID_NIL, -1),
                                      trx_params_.version_);
        sst_uuid_            = WSREP_UUID_UNDEFINED;
        sst_seqno_           = WSREP_SEQNO_UNDEFINED;
        cc_seqno_            = WSREP_SEQNO_UNDEFINED;
        cc_lowest_trx_seqno_ = WSREP_SEQNO_UNDEFINED;
        pause_seqno_         = WSREP_SEQNO_UNDEFINED;
    }

    // Wake up any thread blocked in wait_for_CLOSED().
    closing_cond_.broadcast();
}
268
wait_for_CLOSED(gu::Lock & lock)269 void galera::ReplicatorSMM::wait_for_CLOSED(gu::Lock& lock)
270 {
271 assert(closing_mutex_.locked());
272 assert(closing_);
273 while (state_() > S_CLOSED) lock.wait(closing_cond_);
274 assert(!closing_);
275 assert(WSREP_UUID_UNDEFINED == uuid_);
276 }
277
/* Destructor: drives the provider through a graceful close if it is
 * still connected, cancels IST senders, and releases the action
 * source. Assumes no application threads are using the provider. */
galera::ReplicatorSMM::~ReplicatorSMM()
{
    log_info << "dtor state: " << state_();

    gu::Lock lock(closing_mutex_);

    switch (state_())
    {
    case S_CONNECTED:
    case S_JOINING:
    case S_JOINED:
    case S_SYNCED:
    case S_DONOR:
        // still a cluster member: close gcs and wait for receivers
        start_closing();
        wait_for_CLOSED(lock);
        // fall through
    case S_CLOSED:
        ist_senders_.cancel();
        break;
    case S_DESTROYED:
        break;
    }

    delete as_;
}
303
304
/* Connect the node to a cluster.
 *
 * @param cluster_name  cluster name to join
 * @param cluster_url   cluster address ("gcomm://..." - trivial address
 *                      means bootstrapping a new cluster)
 * @param state_donor   preferred SST donor name (may be empty)
 * @param bootstrap     true to force bootstrapping a new primary component
 * @return WSREP_OK on success, WSREP_NODE_FAIL on any failure
 */
wsrep_status_t galera::ReplicatorSMM::connect(const std::string& cluster_name,
                                              const std::string& cluster_url,
                                              const std::string& state_donor,
                                              bool  const        bootstrap)
{
    sst_donor_ = state_donor;
    service_thd_.reset();

    // make sure there was a proper initialization/cleanup
    assert(WSREP_UUID_UNDEFINED == uuid_);

    ssize_t err = 0;
    wsrep_status_t ret(WSREP_OK);
    // Use recorded position only if there is a valid seqno; otherwise
    // start from an undefined GTID.
    wsrep_seqno_t const seqno(last_committed());
    wsrep_uuid_t  const gcs_uuid(seqno < 0 ? WSREP_UUID_UNDEFINED :state_uuid_);
    gu::GTID      const inpos(gcs_uuid, seqno);

    log_info << "Setting GCS initial position to " << inpos;

    // Refuse to bootstrap from a node which was not the last to leave
    // the cluster (guards against data loss; see grastate.dat).
    if ((bootstrap == true || cluster_url == "gcomm://")
        && safe_to_bootstrap_ == false)
    {
        log_error << "It may not be safe to bootstrap the cluster from this node. "
                  << "It was not the last one to leave the cluster and may "
                  << "not contain all the updates. To force cluster bootstrap "
                  << "with this node, edit the grastate.dat file manually and "
                  << "set safe_to_bootstrap to 1 .";
        ret = WSREP_NODE_FAIL;
    }

    if (ret == WSREP_OK && (err = gcs_.set_initial_position(inpos)) != 0)
    {
        log_error << "gcs init failed:" << strerror(-err);
        ret = WSREP_NODE_FAIL;
    }

    if (ret == WSREP_OK &&
        (err = gcs_.connect(cluster_name, cluster_url, bootstrap)) != 0)
    {
        log_error << "gcs connect failed: " << strerror(-err);
        ret = WSREP_NODE_FAIL;
    }

    if (ret == WSREP_OK)
    {
        state_.shift_to(S_CONNECTED);
    }

    return ret;
}
355
356
close()357 wsrep_status_t galera::ReplicatorSMM::close()
358 {
359 gu::Lock lock(closing_mutex_);
360
361 if (state_() > S_CLOSED)
362 {
363 start_closing();
364 wait_for_CLOSED(lock);
365 }
366
367 return WSREP_OK;
368 }
369
370
/* Main loop of a slave (applier) thread.
 *
 * Pulls actions from the GCS action source and processes them until
 * the provider closes, an error occurs, or the application requests
 * the thread to exit. The last receiver thread is responsible for
 * driving the provider to CLOSED state on error.
 *
 * @param recv_ctx opaque application context passed back to callbacks
 * @return WSREP_OK on clean exit, WSREP_FATAL/WSREP_CONN_FAIL on error
 */
wsrep_status_t galera::ReplicatorSMM::async_recv(void* recv_ctx)
{
    if (state_() <= S_CLOSED)
    {
        log_error <<"async recv cannot start, provider in CLOSED state";
        return WSREP_FATAL;
    }

    ++receivers_;

    bool exit_loop(false);
    wsrep_status_t retval(WSREP_OK);

    while (WSREP_OK == retval && state_() > S_CLOSED)
    {
        GU_DBUG_SYNC_EXECUTE("before_async_recv_process_sync", sleep(5););

        ssize_t rc;

        // -ECANCELED from the action source means gcs processing was
        // paused for IST: serve IST and retry.
        while (gu_unlikely((rc = as_->process(recv_ctx, exit_loop))
                           == -ECANCELED))
        {
            recv_IST(recv_ctx);
            // hack: prevent fast looping until ist controlling thread
            // resumes gcs prosessing
            usleep(10000);
        }

        if (gu_unlikely(rc <= 0))
        {
            if (GcsActionSource::INCONSISTENCY_CODE == rc)
            {
                st_.mark_corrupt();
                retval = WSREP_FATAL;
            }
            else
            {
                retval = WSREP_CONN_FAIL;
            }
        }
        else if (gu_unlikely(exit_loop == true))
        {
            assert(WSREP_OK == retval);

            // At least one receiver must keep running; only let this
            // thread exit if it is not the last one.
            if (receivers_.sub_and_fetch(1) > 0)
            {
                log_info << "Slave thread exiting on request.";
                break;
            }

            ++receivers_;
            log_warn << "Refusing exit for the last slave thread.";
        }
    }

    /* exiting loop already did proper checks */
    if (!exit_loop && receivers_.sub_and_fetch(1) == 0)
    {
        gu::Lock lock(closing_mutex_);
        if (state_() > S_CLOSED && !closing_)
        {
            assert(WSREP_CONN_FAIL == retval);
            /* Last recv thread exiting due to error but replicator is not
             * closed. We need to at least gracefully leave the cluster.*/

            if (WSREP_OK == retval)
            {
                log_warn << "Broken shutdown sequence, provider state: "
                         << state_() << ", retval: " << retval;
                assert (0);
            }

            start_closing();

            // Generate zero view before exit to notify application
            gcs_act_cchange const cc;
            wsrep_uuid_t tmp(uuid_);
            wsrep_view_info_t* const err_view
                (galera_view_info_create(cc, capabilities(cc.repl_proto_ver),
                                         -1, tmp));
            view_cb_(app_ctx_, recv_ctx, err_view, 0, 0);
            free(err_view);

            shift_to_CLOSED();
        }
    }

    log_debug << "Slave thread exit. Return code: " << retval;

    return retval;
}
462
/* Apply a replicated (slave) write set in total order.
 *
 * Enters the apply monitor, invokes the application's apply callback,
 * marks the transaction committed in certification, and reports the
 * safe-to-discard seqno. TOI actions and NBO starts mark the state
 * file unsafe for the duration of the operation.
 *
 * @param recv_ctx receiver thread context for application callbacks
 * @param ts       slave transaction handle to apply
 */
void galera::ReplicatorSMM::apply_trx(void* recv_ctx, TrxHandleSlave& ts)
{
    assert(ts.global_seqno() > 0);
    assert(!ts.is_committed());
    if (!ts.skip_event())
    {
        assert(ts.trx_id() != uint64_t(-1) || ts.is_toi());
        assert(ts.certified() /*Repl*/ || ts.preordered() /*IST*/);
        assert(ts.local() == false || ts.nbo_end() ||
               (ts.flags() & TrxHandle::F_COMMIT) ||
               (ts.flags() & TrxHandle::F_ROLLBACK));
        assert(ts.nbo_end() == false || ts.is_dummy());
    }

    ApplyException ae;

    ApplyOrder ao(ts);
    CommitOrder co(ts, co_mode_);

    TX_SET_STATE(ts, TrxHandle::S_APPLYING);

    gu_trace(apply_monitor_.enter(ao));

    if (gu_unlikely(ts.nbo_start() == true))
    {
        // Non-blocking operation start, mark state unsafe.
        st_.mark_unsafe();
    }

    wsrep_trx_meta_t meta = { { state_uuid_,    ts.global_seqno() },
                              { ts.source_id(), ts.trx_id(), ts.conn_id() },
                              ts.depends_seqno() };

    if (ts.is_toi())
    {
        log_debug << "Executing TO isolated action: " << ts;
        st_.mark_unsafe();
    }

    wsrep_bool_t exit_loop(false);

    try { gu_trace(ts.apply(recv_ctx, apply_cb_, meta, exit_loop)); }
    catch (ApplyException& e)
    {
        assert(0 != e.status());
        assert(NULL != e.data() || 0 == e.data_len());
        assert(0 != e.data_len() || NULL == e.data());

        if (!st_.corrupt())
        {
            assert(0 == e.data_len());
            /* non-empty error must be handled in handle_apply_error(), while
             * still in commit monitor. */
            on_inconsistency();
        }
    }
    /* at this point any other exception is fatal, not catching anything else.*/

    if (ts.local() == false)
    {
        GU_DBUG_SYNC_WAIT("after_commit_slave_sync");
    }

    wsrep_seqno_t const safe_to_discard(cert_.set_trx_committed(ts));

    /* For now need to keep it inside apply monitor to ensure all processing
     * ends by the time monitors are drained because of potential gcache
     * cleanup (and loss of the writeset buffer). Perhaps unordered monitor
     * is needed here. */
    ts.unordered(recv_ctx, unordered_cb_);

    apply_monitor_.leave(ao);

    if (ts.is_toi())
    {
        log_debug << "Done executing TO isolated action: "
                  << ts.global_seqno();
        st_.mark_safe();
    }

    if (gu_likely(ts.local_seqno() != -1))
    {
        // trx with local seqno -1 originates from IST (or other source not gcs)
        report_last_committed(safe_to_discard);
    }

    ts.set_exit_loop(exit_loop);
}
551
552
/* Send a write set fragment to the group without assigning it a global
 * seqno on this side (used for streaming replication fragments,
 * including SR rollbacks).
 *
 * @param trx  local master transaction holding the write set
 * @param meta unused here (kept for interface symmetry)
 * @return WSREP_OK on success, WSREP_TRX_FAIL otherwise
 */
wsrep_status_t galera::ReplicatorSMM::send(TrxHandleMaster& trx,
                                           wsrep_trx_meta_t* meta)
{
    assert(trx.locked());
    if (state_() < S_JOINED) return WSREP_TRX_FAIL;

    // SR rollback
    const bool rollback(trx.flags() & TrxHandle::F_ROLLBACK);

    if (rollback)
    {
        assert(trx.state() == TrxHandle::S_ABORTING);
        assert((trx.flags() & TrxHandle::F_BEGIN) == 0);
        // attach a placeholder slave handle for the rollback fragment
        TrxHandleSlavePtr ts(TrxHandleSlave::New(true, slave_pool_),
                             TrxHandleSlaveDeleter());
        ts->set_global_seqno(0);
        trx.add_replicated(ts);
    }

    WriteSetNG::GatherVector actv;

    size_t act_size = trx.gather(actv);

    ssize_t rcode(0);
    do
    {
        const bool scheduled(!rollback);

        if (scheduled)
        {
            const ssize_t gcs_handle(gcs_.schedule());

            if (gu_unlikely(gcs_handle < 0))
            {
                log_debug << "gcs schedule " << strerror(-gcs_handle);
                rcode = gcs_handle;
                goto out;
            }
            trx.set_gcs_handle(gcs_handle);
        }

        trx.finalize(last_committed());
        // trx lock is released for the duration of the (potentially
        // blocking) group send and re-taken afterwards
        trx.unlock();
        // On rollback fragment, we instruct sendv to use gcs_sm_grab()
        // to avoid the scenario where trx is BF aborted but can't send
        // ROLLBACK fragment due to flow control, which results in
        // deadlock.
        // Otherwise sendv call was scheduled above, and we instruct
        // the call to use regular gcs_sm_enter()
        const bool grab(rollback);
        rcode = gcs_.sendv(actv, act_size,
                           GCS_ACT_WRITESET,
                           scheduled, grab);
        GU_DBUG_SYNC_WAIT("after_send_sync");
        trx.lock();
    }
    // TODO: Break loop after some timeout
    while (rcode == -EAGAIN && (usleep(1000), true));

    trx.set_gcs_handle(-1);

out:

    if (rcode <= 0)
    {
        log_debug << "ReplicatorSMM::send failed: " << -rcode;
    }

    return (rcode > 0 ? WSREP_OK : WSREP_TRX_FAIL);
}
623
624
/* Replicate a local transaction's write set to the group and assign it
 * a global seqno.
 *
 * On success the trx gains a TrxHandleSlave (ts) representing the
 * ordered write set and moves to S_REPLICATING. Handles BF abort races
 * during and after the group call, the ROLLBACK-fragment shortcut, and
 * certification of already-aborted transactions.
 *
 * @param trx  local master transaction (must be locked by caller)
 * @param meta output: GTID and depends_on are filled in when non-NULL
 * @return WSREP_OK, WSREP_BF_ABORT, WSREP_TRX_FAIL, WSREP_CONN_FAIL
 *         or WSREP_NODE_FAIL
 */
wsrep_status_t galera::ReplicatorSMM::replicate(TrxHandleMaster& trx,
                                                wsrep_trx_meta_t* meta)
{
    assert(trx.locked());
    assert(!(trx.flags() & TrxHandle::F_ROLLBACK));
    assert(trx.state() == TrxHandle::S_EXECUTING ||
           trx.state() == TrxHandle::S_MUST_ABORT);

    if (state_() < S_JOINED || trx.state() == TrxHandle::S_MUST_ABORT)
    {
        // Common bail-out path: move trx to ABORTING and report failure.
    must_abort:
        if (trx.state() == TrxHandle::S_EXECUTING ||
            trx.state() == TrxHandle::S_REPLICATING)
            TX_SET_STATE(trx, TrxHandle::S_MUST_ABORT);

        TX_SET_STATE(trx, TrxHandle::S_ABORTING);

        if (trx.ts() != 0)
        {
            assert(trx.ts()->state() == TrxHandle::S_COMMITTED);
            trx.reset_ts();
        }

        return (st_.corrupt() ? WSREP_NODE_FAIL : WSREP_CONN_FAIL);
    }

    WriteSetNG::GatherVector actv;

    gcs_action act;
    act.type = GCS_ACT_WRITESET;
#ifndef NDEBUG
    act.seqno_g = GCS_SEQNO_ILL;
#endif

    act.buf  = NULL;
    act.size = trx.gather(actv);
    TX_SET_STATE(trx, TrxHandle::S_REPLICATING);

    ssize_t rcode(-1);

    // Retry the group call on -EAGAIN (flow control) unless BF aborted.
    do
    {
        assert(act.seqno_g == GCS_SEQNO_ILL);

        const ssize_t gcs_handle(gcs_.schedule());

        if (gu_unlikely(gcs_handle < 0))
        {
            log_debug << "gcs schedule " << strerror(-gcs_handle);
            goto must_abort;
        }

        trx.set_gcs_handle(gcs_handle);

        trx.finalize(last_committed());
        // trx lock released across the blocking group call
        trx.unlock();
        assert (act.buf == NULL); // just a sanity check
        rcode = gcs_.replv(actv, act, true);

        GU_DBUG_SYNC_WAIT("after_replicate_sync")
        trx.lock();
    }
    while (rcode == -EAGAIN && trx.state() != TrxHandle::S_MUST_ABORT &&
           (usleep(1000), true));

    trx.set_gcs_handle(-1);

    if (rcode < 0)
    {
        if (rcode != -EINTR)
        {
            log_debug << "gcs_repl() failed with " << strerror(-rcode)
                      << " for trx " << trx;
        }

        // -EINTR is expected only as the result of a BF abort interrupt
        assert(rcode != -EINTR || trx.state() == TrxHandle::S_MUST_ABORT);
        assert(act.seqno_l == GCS_SEQNO_ILL && act.seqno_g == GCS_SEQNO_ILL);
        assert(NULL == act.buf);

        if (trx.state() != TrxHandle::S_MUST_ABORT)
        {
            TX_SET_STATE(trx, TrxHandle::S_MUST_ABORT);
        }

        goto must_abort;
    }

    // Group call succeeded: the write set now has global/local seqnos.
    assert(act.buf != NULL);
    assert(act.size == rcode);
    assert(act.seqno_l > 0);
    assert(act.seqno_g > 0);

    TrxHandleSlavePtr ts(TrxHandleSlave::New(true, slave_pool_),
                         TrxHandleSlaveDeleter());

    gu_trace(ts->unserialize<true>(act));
    ts->set_local(true);

    ts->update_stats(keys_count_, keys_bytes_, data_bytes_, unrd_bytes_);

    trx.add_replicated(ts);

    ++replicated_;
    replicated_bytes_ += rcode;

    assert(trx.source_id() == ts->source_id());
    assert(trx.conn_id()   == ts->conn_id());
    assert(trx.trx_id()    == ts->trx_id());

    assert(ts->global_seqno() == act.seqno_g);
    assert(ts->last_seen_seqno() >= 0);

    assert(trx.ts() == ts);

    wsrep_status_t retval(WSREP_TRX_FAIL);

    // ROLLBACK event shortcut to avoid blocking in monitors or
    // getting BF aborted inside provider
    if (gu_unlikely(ts->flags() & TrxHandle::F_ROLLBACK))
    {
        // ROLLBACK fragments should be replicated through ReplicatorSMM::send(),
        // assert here for debug builds to catch if this is not a case.
        assert(0);
        assert(ts->depends_seqno() > 0); // must be set at unserialization
        ts->cert_bypass(true);
        ts->mark_certified();

        TX_SET_STATE(trx, TrxHandle::S_MUST_ABORT);
        TX_SET_STATE(trx, TrxHandle::S_ABORTING);

        // certification is skipped, but seqnos must still be released
        // in order: queue the ts and cancel its monitors
        pending_cert_queue_.push(ts);
        cancel_monitors_for_local(*ts);

        goto out;
    }

    if (gu_unlikely(trx.state() == TrxHandle::S_MUST_ABORT))
    {
        // BF aborted while the group call was in progress: certify the
        // write set anyway to decide between replay and failure.
        retval = cert_for_aborted(ts);

        if (retval != WSREP_BF_ABORT)
        {
            assert(trx.state() == TrxHandle::S_MUST_ABORT);
            TX_SET_STATE(trx, TrxHandle::S_ABORTING);

            pending_cert_queue_.push(ts);
            cancel_monitors_for_local(*ts);

            assert(ts->is_dummy());
            assert(WSREP_OK != retval);
        }
        else
        {
            // If the transaction was committing, it must replay.
            if (ts->flags() & TrxHandle::F_COMMIT)
            {
                TX_SET_STATE(trx, TrxHandle::S_MUST_REPLAY);
            }
            else
            {
                TX_SET_STATE(trx, TrxHandle::S_ABORTING);

                pending_cert_queue_.push(ts);
                cancel_monitors_for_local(*ts);

                retval = WSREP_TRX_FAIL;
            }
        }
    }
    else
    {
        assert(trx.state() == TrxHandle::S_REPLICATING);
        retval = WSREP_OK;
    }

out:
    assert(trx.state() != TrxHandle::S_MUST_ABORT);
    assert(ts->global_seqno() >  0);
    assert(ts->global_seqno() == act.seqno_g);

    if (meta != 0) // whatever the retval, we must update GTID in meta
    {
        meta->gtid.uuid  = state_uuid_;
        meta->gtid.seqno = ts->global_seqno();
        meta->depends_on = ts->depends_seqno();
    }

    return retval;
}
814
/* Brute-force (BF) abort a local victim transaction on behalf of a
 * higher-priority transaction with seqno bf_seqno.
 *
 * The action taken depends on the victim's current state: interrupt
 * the relevant monitor or the pending group call, or just mark it
 * MUST_ABORT. Committing transactions already ordered before bf_seqno
 * cannot be aborted.
 *
 * @param trx          victim transaction (locked by caller)
 * @param bf_seqno     global seqno of the aborting transaction
 * @param victim_seqno output: victim's global seqno when known
 * @return WSREP_OK if the abort was initiated, WSREP_NOT_ALLOWED if
 *         the victim may not be aborted
 */
wsrep_status_t
galera::ReplicatorSMM::abort_trx(TrxHandleMaster& trx, wsrep_seqno_t bf_seqno,
                                 wsrep_seqno_t* victim_seqno)
{
    assert(trx.local() == true);
    assert(trx.locked());

    const TrxHandleSlavePtr ts(trx.ts());

    if (ts)
    {
        log_debug << "aborting ts  " << *ts;
        assert(ts->global_seqno() != WSREP_SEQNO_UNDEFINED);
        // a committing victim ordered before the aborter wins
        if (ts->global_seqno() < bf_seqno &&
            (ts->flags() & TrxHandle::F_COMMIT))
        {
            log_debug << "seqno " << bf_seqno
                      << " trying to abort seqno " << ts->global_seqno();
            *victim_seqno = ts->global_seqno();
            return WSREP_NOT_ALLOWED;
        }
    }
    else
    {
        log_debug << "aborting trx " << trx;
    }

    wsrep_status_t retval(WSREP_OK);
    switch (trx.state())
    {
    case TrxHandle::S_MUST_ABORT:
    case TrxHandle::S_ABORTING:
    case TrxHandle::S_MUST_REPLAY:
        // victim trx was already BF aborted or it failed certification
        retval = WSREP_NOT_ALLOWED;
        break;
    case TrxHandle::S_EXECUTING:
        TX_SET_STATE(trx, TrxHandle::S_MUST_ABORT);
        break;
    case TrxHandle::S_REPLICATING:
    {
        // @note: it is important to place set_state() into beginning of
        // every case, because state must be changed AFTER switch() and
        // BEFORE entering monitors or taking any other action.
        TX_SET_STATE(trx, TrxHandle::S_MUST_ABORT);
        int rc;
        // interrupt the pending gcs_.replv() call, if any
        if (trx.gcs_handle() > 0 &&
            ((rc = gcs_.interrupt(trx.gcs_handle()))) != 0)
        {
            log_debug << "gcs_interrupt(): handle "
                      << trx.gcs_handle()
                      << " trx id " << trx.trx_id()
                      << ": " << strerror(-rc);
        }
        break;
    }
    case TrxHandle::S_CERTIFYING:
    {
        // trx is waiting in local monitor
        assert(ts);
        assert(ts->global_seqno() > 0);
        log_debug << "aborting ts: " << *ts << "; BF seqno: " << bf_seqno
                  << "; local position: " << local_monitor_.last_left();
        TX_SET_STATE(trx, TrxHandle::S_MUST_ABORT);
        LocalOrder lo(*ts);
        local_monitor_.interrupt(lo);
        break;
    }
    case TrxHandle::S_APPLYING:
    {
        // trx is waiting in apply monitor
        assert(ts);
        assert(ts->global_seqno() > 0);
        log_debug << "aborting ts: " << *ts << "; BF seqno: " << bf_seqno
                  << "; apply window: " << apply_monitor_.last_left() << " - "
                  << apply_monitor_.last_entered();
        TX_SET_STATE(trx, TrxHandle::S_MUST_ABORT);
        ApplyOrder ao(*ts);
        apply_monitor_.interrupt(ao);
        break;
    }
    case TrxHandle::S_COMMITTING:
    {
        // Trx is waiting in commit monitor
        assert(ts);
        assert(ts->global_seqno() > 0);
        log_debug << "aborting ts: " << *ts << "; BF seqno: " << bf_seqno
                  << "; commit position: " << last_committed();
        if (co_mode_ != CommitOrder::BYPASS)
        {
            CommitOrder co(*ts, co_mode_);
            bool const interrupted(commit_monitor_.interrupt(co));
            if (interrupted || !(ts->flags() & TrxHandle::F_COMMIT))
            {
                TX_SET_STATE(trx, TrxHandle::S_MUST_ABORT);
            }
            else
            {
                // already entered commit monitor with a commit fragment:
                // too late to abort
                retval = WSREP_NOT_ALLOWED;
            }
        }
        break;
    }
    case TrxHandle::S_COMMITTED:
        assert(ts);
        assert(ts->global_seqno() > 0);
        if (ts->global_seqno() < bf_seqno &&
            (ts->flags() & TrxHandle::F_COMMIT))
        {
            retval = WSREP_NOT_ALLOWED;
        }
        else
        {
            retval = WSREP_OK;
        }
        break;
    case TrxHandle::S_ROLLING_BACK:
        log_error << "Attempt to enter commit monitor while holding "
            "locks in rollback by " << trx;
        // fallthrough
    default:
        log_warn << "invalid state " << trx.state()
                 << " in abort_trx for trx"
                 << trx;
        assert(0);
    }
    if (retval == WSREP_OK || retval == WSREP_NOT_ALLOWED)
    {
        *victim_seqno = (ts != 0 ? ts->global_seqno() : WSREP_SEQNO_UNDEFINED);
    }
    return retval;
}
947
948
/* Certify a replicated local transaction and, on success, enter the
 * apply monitor on its behalf.
 *
 * @param trx  local master transaction in S_REPLICATING
 * @param meta in/out trx meta; depends_on is updated on success
 * @return WSREP_OK on certification success, WSREP_BF_ABORT if the trx
 *         was BF aborted and must replay, WSREP_TRX_FAIL on
 *         certification failure
 */
wsrep_status_t galera::ReplicatorSMM::certify(TrxHandleMaster& trx,
                                              wsrep_trx_meta_t* meta)
{
    assert(trx.state() == TrxHandle::S_REPLICATING);

    TrxHandleSlavePtr ts(trx.ts());
    assert(ts->state() == TrxHandle::S_REPLICATING);

    // Rollback should complete with post_rollback
    assert((ts->flags() & TrxHandle::F_ROLLBACK) == 0);

    assert(ts->local_seqno()     > 0);
    assert(ts->global_seqno()    > 0);
    assert(ts->last_seen_seqno() >= 0);
    assert(ts->depends_seqno()   >= -1);

    if (meta != 0)
    {
        assert(meta->gtid.uuid  == state_uuid_);
        assert(meta->gtid.seqno == ts->global_seqno());
        assert(meta->depends_on == ts->depends_seqno());
    }
    // State should not be checked here: If trx has been replicated,
    // it has to be certified and potentially applied. #528
    // if (state_() < S_JOINED) return WSREP_TRX_FAIL;

    wsrep_status_t retval(cert_and_catch(&trx, ts));

    assert((ts->flags() & TrxHandle::F_ROLLBACK) == 0 ||
           trx.state() == TrxHandle::S_ABORTING);

    if (gu_unlikely(retval != WSREP_OK))
    {
        switch(retval)
        {
        case WSREP_BF_ABORT:
            assert(ts->depends_seqno() >= 0);
            assert(trx.state() == TrxHandle::S_MUST_REPLAY ||
                   !(ts->flags() & TrxHandle::F_COMMIT));
            assert(ts->state() == TrxHandle::S_REPLICATING ||
                   ts->state() == TrxHandle::S_CERTIFYING);
            // apply monitor will be entered in due course during replay
            break;
        case WSREP_TRX_FAIL:
            /* committing fragment fails certification or non-committing BF'ed */
            // If the ts was queued, the depends seqno cannot be trusted
            // as it may be modified concurrently.
            assert(ts->queued() || ts->is_dummy() ||
                   (ts->flags() & TrxHandle::F_COMMIT) == 0);
            assert(ts->state() == TrxHandle::S_CERTIFYING ||
                   ts->state() == TrxHandle::S_REPLICATING);
            if (ts->state() == TrxHandle::S_REPLICATING)
                TX_SET_STATE(*ts, TrxHandle::S_CERTIFYING);
            break;
        default:
            assert(0);
        }

        return retval;
    }
    else
    {
        if (meta) meta->depends_on = ts->depends_seqno();
        if (enter_apply_monitor_for_local(trx, ts))
        {
            TX_SET_STATE(*ts, TrxHandle::S_APPLYING);
            // a BF abort may have arrived while waiting for the monitor
            if (trx.state() == TrxHandle::S_MUST_ABORT)
                return WSREP_BF_ABORT;
            else
                return WSREP_OK;
        }
        else
        {
            return handle_apply_monitor_interrupted(trx, ts);
        }
    }
}
1026
1027
/* Replay a BF-aborted local transaction.
 *
 * Resumes processing from wherever the transaction was interrupted
 * (the switch cases fall through: certification -> apply monitor ->
 * application replay via the apply callback). Failure to replay own
 * trx implies node inconsistency.
 *
 * @param trx     master transaction in S_MUST_REPLAY (or S_MUST_ABORT)
 * @param lock    trx lock, released around the apply callback
 * @param trx_ctx application context for the apply callback
 * @return WSREP_OK on successful replay, WSREP_TRX_FAIL if
 *         certification failed, WSREP_NODE_FAIL on replay failure
 */
wsrep_status_t galera::ReplicatorSMM::replay_trx(TrxHandleMaster& trx,
                                                 TrxHandleLock& lock,
                                                 void* const trx_ctx)
{
    TrxHandleSlavePtr tsp(trx.ts());
    assert(tsp);
    TrxHandleSlave& ts(*tsp);

    assert(ts.global_seqno() > last_committed());

    log_debug << "replay trx: " << trx << " ts: " << ts;

    if (trx.state() == TrxHandle::S_MUST_ABORT)
    {
        // BF aborted outside of provider.
        TX_SET_STATE(trx, TrxHandle::S_MUST_REPLAY);
    }

    assert(trx.state() == TrxHandle::S_MUST_REPLAY);
    assert(trx.trx_id() != static_cast<wsrep_trx_id_t>(-1));

    wsrep_status_t retval(WSREP_OK);

    // Note: We set submit NULL trx pointer below to avoid
    // interrupting replaying in any monitor during replay.

    switch (ts.state())
    {
    case TrxHandle::S_REPLICATING:
        // was aborted before certification: certify first
        retval = cert_and_catch(&trx, tsp);
        assert(ts.state() == TrxHandle::S_CERTIFYING);
        if (retval != WSREP_OK)
        {
            assert(retval == WSREP_TRX_FAIL);
            assert(ts.is_dummy());
            break;
        }
        // fall through
    case TrxHandle::S_CERTIFYING:
    {
        // enter apply monitor before replaying
        assert(ts.state() == TrxHandle::S_CERTIFYING);

        ApplyOrder ao(ts);
        assert(apply_monitor_.entered(ao) == false);
        gu_trace(apply_monitor_.enter(ao));
        TX_SET_STATE(ts, TrxHandle::S_APPLYING);
    }
    // fall through
    case TrxHandle::S_APPLYING:
        //
        // Commit monitor will be entered from commit_order_enter_remote.
        //
        // fall through
    case TrxHandle::S_COMMITTING:
        ++local_replays_;

        // safety measure to make sure that all preceding trxs are
        // ordered for commit before replaying
        commit_monitor_.wait(ts.global_seqno() - 1);

        TX_SET_STATE(trx, TrxHandle::S_REPLAYING);
        try
        {
            // Only committing transactions should be replayed
            assert(ts.flags() & TrxHandle::F_COMMIT);

            wsrep_trx_meta_t meta = {{ state_uuid_,    ts.global_seqno() },
                                     { ts.source_id(), ts.trx_id(),
                                       ts.conn_id()                      },
                                     ts.depends_seqno()};

            /* failure to replay own trx is certainly a sign of inconsistency,
             * not trying to catch anything here */
            assert(trx.owned());
            bool unused(false);
            lock.unlock();
            gu_trace(ts.apply(trx_ctx, apply_cb_, meta, unused));
            lock.lock();
            assert(false == unused);
            log_debug << "replayed " << ts.global_seqno();
            assert(ts.state() == TrxHandle::S_COMMITTED);
            assert(trx.state() == TrxHandle::S_COMMITTED);
        }
        catch (gu::Exception& e)
        {
            on_inconsistency();
            return WSREP_NODE_FAIL;
        }

        // apply, commit monitors are released in post commit
        return WSREP_OK;
    default:
        assert(0);
        gu_throw_fatal << "Invalid state in replay for trx " << trx;
    }

    log_debug << "replaying failed for trx " << trx;
    assert(trx.state() == TrxHandle::S_ABORTING);

    return retval;
}
1129
1130 static void
dump_buf(std::ostream & os,const void * const buf,size_t const buf_len)1131 dump_buf(std::ostream& os, const void* const buf, size_t const buf_len)
1132 {
1133 std::ios_base::fmtflags const saved_flags(os.flags());
1134 char const saved_fill (os.fill('0'));
1135
1136 os << std::oct;
1137
1138 const char* const str(static_cast<const char*>(buf));
1139 for (size_t i(0); i < buf_len; ++i)
1140 {
1141 char const c(str[i]);
1142
1143 if ('\0' == c) break;
1144
1145 try
1146 {
1147 if (isprint(c) || isspace(c))
1148 {
1149 os.put(c);
1150 }
1151 else
1152 {
1153 os << '\\' << std::setw(2) << int(c);
1154 }
1155 }
1156 catch (std::ios_base::failure& f)
1157 {
1158 log_warn << "Failed to dump " << i << "th byte: " << f.what();
1159 break;
1160 }
1161 }
1162
1163 os.flags(saved_flags);
1164 os.fill (saved_fill);
1165 }
1166
1167 wsrep_status_t
handle_commit_interrupt(TrxHandleMaster & trx,const TrxHandleSlave & ts)1168 galera::ReplicatorSMM::handle_commit_interrupt(TrxHandleMaster& trx,
1169 const TrxHandleSlave& ts)
1170 {
1171 assert(trx.state() == TrxHandle::S_MUST_ABORT);
1172
1173 if (ts.flags() & TrxHandle::F_COMMIT)
1174 {
1175 TX_SET_STATE(trx, TrxHandle::S_MUST_REPLAY);
1176 return WSREP_BF_ABORT;
1177 }
1178 else
1179 {
1180 TX_SET_STATE(trx, TrxHandle::S_ABORTING);
1181 return WSREP_TRX_FAIL;
1182 }
1183 }
1184
wsrep_status_t
galera::ReplicatorSMM::commit_order_enter_local(TrxHandleMaster& trx)
{
    // Enter the commit order monitor for a locally originated transaction.
    // On success the master trx moves to S_COMMITTING (or S_ROLLING_BACK)
    // and the slave ts to S_COMMITTING. May return WSREP_BF_ABORT or
    // WSREP_TRX_FAIL via handle_commit_interrupt() if the trx was BF
    // aborted while waiting for the monitor.
    assert(trx.local());
    assert(trx.ts() && trx.ts()->global_seqno() > 0);
    assert(trx.locked());

    assert(trx.state() == TrxHandle::S_APPLYING  ||
           trx.state() == TrxHandle::S_ABORTING ||
           trx.state() == TrxHandle::S_REPLAYING);

    TrxHandleSlavePtr tsp(trx.ts());
    TrxHandleSlave& ts(*tsp);

    if (trx.state() != TrxHandle::S_APPLYING)
    {
        // Transactions which are rolling back or replaying
        // may not have grabbed apply monitor so far. Do it
        // before proceeding.
        enter_apply_monitor_for_local_not_committing(trx, ts);
    }
#ifndef NDEBUG
    {
        ApplyOrder ao(ts);
        assert(apply_monitor_.entered(ao));
    }
#endif // NDEBUG

    // Aborting trxs end up rolling back, everything else commits.
    TrxHandle::State const next_state
        (trx.state() == TrxHandle::S_ABORTING ?
         TrxHandle::S_ROLLING_BACK : TrxHandle::S_COMMITTING);

    TX_SET_STATE(trx, next_state);

    if (co_mode_ == CommitOrder::BYPASS)
    {
        TX_SET_STATE(ts, TrxHandle::S_COMMITTING);
        return WSREP_OK;
    }

    CommitOrder co(ts, co_mode_);
    if (ts.state() < TrxHandle::S_COMMITTING)
    {
        assert(!commit_monitor_.entered(co));
    }
    else
    {
        // was BF'ed after having entered commit monitor. This may happen
        // for SR fragment.
        assert(commit_monitor_.entered(co));
        return WSREP_OK;
    }

    try
    {
        // Unlock the trx while blocking on the monitor so a BF aborter
        // can interrupt the wait (surfaces as EINTR below).
        trx.unlock();
        GU_DBUG_SYNC_WAIT("before_local_commit_monitor_enter");
        gu_trace(commit_monitor_.enter(co));
        assert(commit_monitor_.entered(co));
        trx.lock();

        TX_SET_STATE(ts, TrxHandle::S_COMMITTING);

        /* non-committing fragments may be interrupted after having entered
         * commit_monitor_ */
        if (0 == (ts.flags() & TrxHandle::F_COMMIT) &&
            trx.state() == TrxHandle::S_MUST_ABORT)
            return handle_commit_interrupt(trx, ts);

        assert(trx.state() == TrxHandle::S_COMMITTING ||
               trx.state() == TrxHandle::S_ROLLING_BACK);

    }
    catch (gu::Exception& e)
    {
        assert(!commit_monitor_.entered(co));
        assert(next_state != TrxHandle::S_ROLLING_BACK);
        // trx was unlocked above before the monitor wait; re-acquire.
        trx.lock();
        if (e.get_errno() == EINTR)
        {
            // Monitor wait was interrupted by a BF abort.
            return handle_commit_interrupt(trx, ts);
        }
        else throw;
    }

    assert(ts.global_seqno() > last_committed());
    assert(trx.locked());

    assert(trx.state() == TrxHandle::S_COMMITTING ||
           trx.state() == TrxHandle::S_ROLLING_BACK);

    return WSREP_OK;
}
1278
wsrep_status_t
galera::ReplicatorSMM::commit_order_enter_remote(TrxHandleSlave& trx)
{
    // Enter the commit order monitor for a remote (applier-side)
    // transaction. On return the ts state is S_COMMITTING.
    assert(trx.global_seqno() > 0);
    assert(trx.state() == TrxHandle::S_APPLYING  ||
           trx.state() == TrxHandle::S_ABORTING);

#ifndef NDEBUG
    // NOTE(review): this branch appears unreachable -- the assert above
    // restricts state to S_APPLYING/S_ABORTING, never S_REPLAYING.
    // Looks like a leftover from an earlier code path; confirm.
    if (trx.state() == TrxHandle::S_REPLAYING)
    {
        assert(trx.local());
        assert((trx.flags() & TrxHandle::F_ROLLBACK) == 0);

        ApplyOrder ao(trx);
        assert(apply_monitor_.entered(ao));
    }
#endif /* NDEBUG */

    CommitOrder co(trx, co_mode_);

    assert(!commit_monitor_.entered(co));

    if (gu_likely(co_mode_ != CommitOrder::BYPASS))
    {
        gu_trace(commit_monitor_.enter(co));
    }

    TX_SET_STATE(trx, TrxHandle::S_COMMITTING);

    return WSREP_OK;
}
1310
process_apply_error(TrxHandleSlave & trx,const wsrep_buf_t & error)1311 void galera::ReplicatorSMM::process_apply_error(TrxHandleSlave& trx,
1312 const wsrep_buf_t& error)
1313 {
1314 gu::GTID const gtid(state_uuid_, trx.global_seqno());
1315 int res;
1316
1317 if (trx.local_seqno() != -1 || trx.nbo_end())
1318 {
1319 /* this must be done IN ORDER to avoid multiple elections, hence
1320 * anything else but LOCAL_OOOC and NO_OOOC is potentially broken */
1321 res = gcs_.vote(gtid, -1, error.ptr, error.len);
1322 }
1323 else res = 2;
1324
1325 if (res != 0)
1326 {
1327 std::ostringstream os;
1328
1329 switch (res)
1330 {
1331 case 2:
1332 os << "Failed on preordered " << gtid << ": inconsistency.";
1333 break;
1334 case 1:
1335 os << "Inconsistent by consensus on " << gtid;
1336 break;
1337 default:
1338 os << "Could not reach consensus on " << gtid
1339 << " (rcode: " << res << "), assuming inconsistency.";
1340 }
1341
1342 galera::ApplyException ae(os.str(), NULL, error.ptr, error.len);
1343 GU_TRACE(ae);
1344 throw ae;
1345 }
1346 else
1347 {
1348 /* mark action as invalid (skip seqno) and return normally */
1349 gcache_.seqno_skip(trx.action().first,
1350 trx.global_seqno(), GCS_ACT_WRITESET);
1351 }
1352 }
1353
wsrep_status_t
galera::ReplicatorSMM::handle_apply_error(TrxHandleSlave&    ts,
                                          const wsrep_buf_t& error,
                                          const std::string& custom_msg)
{
    // Report an apply error and delegate the consistency decision to
    // process_apply_error(). Returns WSREP_OK if the group voted the
    // error consistent (or state is already marked corrupt),
    // WSREP_NODE_FAIL if this node is found inconsistent.
    assert(error.len > 0);

    std::ostringstream os;

    os << custom_msg << ts.global_seqno() << ", error: ";
    dump_buf(os, error.ptr, error.len);
    log_debug << "handle_apply_error(): " << os.str();

    try
    {
        // If state is already corrupt there is no point in voting again.
        if (!st_.corrupt())
            gu_trace(process_apply_error(ts, error));
        return WSREP_OK;
    }
    catch (ApplyException& e)
    {
        log_error << "Inconsistency detected: " << e.what();
        on_inconsistency();
    }
    catch (gu::Exception& e)
    {
        // Voting is not expected to throw anything else; treat as fatal.
        log_error << "Unexpected exception: " << e.what();
        assert(0);
        abort();
    }
    catch (...)
    {
        log_error << "Unknown exception";
        assert(0);
        abort();
    }

    return WSREP_NODE_FAIL;
}
1393
wsrep_status_t
galera::ReplicatorSMM::commit_order_leave(TrxHandleSlave& ts,
                                          const wsrep_buf_t* const error)
{
    // Leave the commit order monitor after applying/committing, handling
    // a possible apply error first (while still in commit order, so error
    // voting stays ordered). The ts ends up in S_COMMITTED.
    assert(ts.state() == TrxHandle::S_COMMITTING);

#ifndef NDEBUG
    {
        CommitOrder co(ts, co_mode_);
        assert(co_mode_ != CommitOrder::BYPASS || commit_monitor_.entered(co));
    }
#endif

    wsrep_status_t retval(WSREP_OK);

    if (gu_unlikely(error != NULL && error->ptr != NULL))
    {
        retval = handle_apply_error(ts, *error, "Failed to apply writeset ");
    }

    if (gu_likely(co_mode_ != CommitOrder::BYPASS))
    {
        CommitOrder co(ts, co_mode_);
        commit_monitor_.leave(co);
    }

    TX_SET_STATE(ts, TrxHandle::S_COMMITTED);
    /* master state will be set upon release */

    return retval;
}
1425
release_commit(TrxHandleMaster & trx)1426 wsrep_status_t galera::ReplicatorSMM::release_commit(TrxHandleMaster& trx)
1427 {
1428 TrxHandleSlavePtr tsp(trx.ts());
1429 assert(tsp);
1430 TrxHandleSlave& ts(*tsp);
1431
1432 #ifndef NDEBUG
1433 {
1434 CommitOrder co(ts, co_mode_);
1435 assert(co_mode_ == CommitOrder::BYPASS ||
1436 commit_monitor_.entered(co) == false);
1437 }
1438 #endif
1439
1440 log_debug << "release_commit() for trx: " << trx << " ts: " << ts;
1441
1442 assert((ts.flags() & TrxHandle::F_ROLLBACK) == 0);
1443 assert(ts.local_seqno() > 0 && ts.global_seqno() > 0);
1444 assert(ts.state() == TrxHandle::S_COMMITTED);
1445 // Streaming transaction may enter here in aborting state if the
1446 // BF abort happens during fragment commit ordering. Otherwise
1447 // should always be committed.
1448 assert(trx.state() == TrxHandle::S_COMMITTED ||
1449 (trx.state() == TrxHandle::S_ABORTING &&
1450 (ts.flags() & TrxHandle::F_COMMIT) == 0));
1451 assert(!ts.is_committed());
1452
1453 wsrep_seqno_t const safe_to_discard(cert_.set_trx_committed(ts));
1454
1455 ApplyOrder ao(ts);
1456 apply_monitor_.leave(ao);
1457
1458 if ((ts.flags() & TrxHandle::F_COMMIT) == 0 &&
1459 trx.state() == TrxHandle::S_COMMITTED)
1460 {
1461 // continue streaming
1462 TX_SET_STATE(trx, TrxHandle::S_EXECUTING);
1463 }
1464
1465 trx.reset_ts();
1466
1467 ++local_commits_;
1468
1469 report_last_committed(safe_to_discard);
1470
1471 return WSREP_OK;
1472 }
1473
1474
release_rollback(TrxHandleMaster & trx)1475 wsrep_status_t galera::ReplicatorSMM::release_rollback(TrxHandleMaster& trx)
1476 {
1477 assert(trx.locked());
1478
1479 if (trx.state() == TrxHandle::S_MUST_ABORT) // BF abort before replicaiton
1480 TX_SET_STATE(trx, TrxHandle::S_ABORTING);
1481
1482 if (trx.state() == TrxHandle::S_ABORTING ||
1483 trx.state() == TrxHandle::S_EXECUTING)
1484 TX_SET_STATE(trx, TrxHandle::S_ROLLED_BACK);
1485
1486 assert(trx.state() == TrxHandle::S_ROLLED_BACK);
1487
1488 TrxHandleSlavePtr tsp(trx.ts());
1489 if (tsp)
1490 {
1491 TrxHandleSlave& ts(*tsp);
1492 log_debug << "release_rollback() trx: " << trx << ", ts: " << ts;
1493 assert(ts.global_seqno() > 0);
1494 if (ts.global_seqno() > 0)
1495 {
1496 ApplyOrder ao(ts.global_seqno(), 0, ts.local());
1497
1498 // Enter and leave monitors if they were not entered/canceled
1499 // already.
1500 if (ts.state() < TrxHandle::S_COMMITTED)
1501 {
1502 if (ts.state() < TrxHandle::S_CERTIFYING)
1503 {
1504 TX_SET_STATE(ts, TrxHandle::S_CERTIFYING);
1505 }
1506 if (ts.state() < TrxHandle::S_APPLYING)
1507 {
1508 apply_monitor_.enter(ao);
1509 TX_SET_STATE(ts, TrxHandle::S_APPLYING);
1510 }
1511 CommitOrder co(ts, co_mode_);
1512 if (ts.state() < TrxHandle::S_COMMITTING)
1513 {
1514 commit_monitor_.enter(co);
1515 TX_SET_STATE(ts, TrxHandle::S_COMMITTING);
1516 }
1517 commit_monitor_.leave(co);
1518 assert(commit_monitor_.last_left() >= ts.global_seqno());
1519 TX_SET_STATE(ts, TrxHandle::S_COMMITTED);
1520 }
1521
1522 /* Queued transactions will be set committed in the queue */
1523 wsrep_seqno_t const safe_to_discard
1524 (ts.queued() ?
1525 WSREP_SEQNO_UNDEFINED : cert_.set_trx_committed(ts));
1526 apply_monitor_.leave(ao);
1527 if (safe_to_discard != WSREP_SEQNO_UNDEFINED)
1528 report_last_committed(safe_to_discard);
1529 }
1530 }
1531 else
1532 {
1533 log_debug << "release_rollback() trx: " << trx << ", ts: nil";
1534 }
1535
1536 // Trx was either rolled back by user or via certification failure,
1537 // last committed report not needed since cert index state didn't change.
1538 // report_last_committed();
1539
1540 trx.reset_ts();
1541 ++local_rollbacks_;
1542
1543 return WSREP_OK;
1544 }
1545
1546
sync_wait(wsrep_gtid_t * upto,int tout,wsrep_gtid_t * gtid)1547 wsrep_status_t galera::ReplicatorSMM::sync_wait(wsrep_gtid_t* upto,
1548 int tout,
1549 wsrep_gtid_t* gtid)
1550 {
1551 gu::GTID wait_gtid;
1552 gu::datetime::Date wait_until(gu::datetime::Date::calendar() +
1553 ((tout == -1) ?
1554 gu::datetime::Period(causal_read_timeout_) :
1555 gu::datetime::Period(tout * gu::datetime::Sec)));
1556
1557 if (upto == 0)
1558 {
1559 try
1560 {
1561 gcs_.caused(wait_gtid, wait_until);
1562 }
1563 catch (gu::Exception& e)
1564 {
1565 log_warn << "gcs_caused() returned " << -e.get_errno()
1566 << " (" << strerror(e.get_errno()) << ")";
1567 return WSREP_TRX_FAIL;
1568 }
1569 }
1570 else
1571 {
1572 wait_gtid.set(upto->uuid, upto->seqno);
1573 }
1574
1575 try
1576 {
1577 // @note: Using timed wait for monitor is currently a hack
1578 // to avoid deadlock resulting from race between monitor wait
1579 // and drain during configuration change. Instead of this,
1580 // monitor should have proper mechanism to interrupt waiters
1581 // at monitor drain and disallowing further waits until
1582 // configuration change related operations (SST etc) have been
1583 // finished.
1584
1585 // Note: Since wsrep API 26 application may request release of
1586 // commit monitor before the commit actually happens (commit
1587 // may have been ordered/queued on application side for later
1588 // processing). Therefore we now rely on apply_monitor on sync
1589 // wait. This is sufficient since apply_monitor is always released
1590 // only after the whole transaction is over.
1591 apply_monitor_.wait(wait_gtid, wait_until);
1592
1593 if (gtid != 0)
1594 {
1595 (void)last_committed_id(gtid);
1596 }
1597 ++causal_reads_;
1598 return WSREP_OK;
1599 }
1600 catch (gu::NotFound& e)
1601 {
1602 log_debug << "monitor wait failed for sync_wait: UUID mismatch";
1603 return WSREP_TRX_MISSING;
1604 }
1605 catch (gu::Exception& e)
1606 {
1607 log_debug << "monitor wait failed for sync_wait: " << e.what();
1608 return WSREP_TRX_FAIL;
1609 }
1610 }
1611
1612
last_committed_id(wsrep_gtid_t * gtid) const1613 wsrep_status_t galera::ReplicatorSMM::last_committed_id(wsrep_gtid_t* gtid) const
1614 {
1615 // Note that we need to use apply monitor to determine last committed
1616 // here. Due to group commit implementation, the commit monitor may
1617 // be released before the commit has finished and the changes
1618 // made by the transaction have become visible. Therefore we rely
1619 // on apply monitor since it remains grabbed until the whole
1620 // commit is over.
1621 apply_monitor_.last_left_gtid(*gtid);
1622 return WSREP_OK;
1623 }
1624
1625
wait_nbo_end(TrxHandleMaster * trx,wsrep_trx_meta_t * meta)1626 wsrep_status_t galera::ReplicatorSMM::wait_nbo_end(TrxHandleMaster* trx,
1627 wsrep_trx_meta_t* meta)
1628 {
1629 gu::shared_ptr<NBOCtx>::type
1630 nbo_ctx(cert_.nbo_ctx(meta->gtid.seqno));
1631
1632 // Send end message
1633 trx->set_state(TrxHandle::S_REPLICATING);
1634
1635 WriteSetNG::GatherVector actv;
1636 size_t const actv_size(
1637 trx->write_set_out().gather(trx->source_id(),
1638 trx->conn_id(),
1639 trx->trx_id(),
1640 actv));
1641 resend:
1642 wsrep_seqno_t lc(last_committed());
1643 if (lc == WSREP_SEQNO_UNDEFINED)
1644 {
1645 // Provider has been closed
1646 return WSREP_NODE_FAIL;
1647 }
1648 trx->finalize(lc);
1649
1650 trx->unlock();
1651 int err(gcs_.sendv(actv, actv_size, GCS_ACT_WRITESET, false, false));
1652 trx->lock();
1653
1654 if (err == -EAGAIN || err == -ENOTCONN || err == -EINTR)
1655 {
1656 // Send was either interrupted due to states excahnge (EAGAIN),
1657 // due to non-prim (ENOTCONN) or due to timeout in send monitor
1658 // (EINTR).
1659 return WSREP_CONN_FAIL;
1660 }
1661 else if (err < 0)
1662 {
1663 log_error << "Failed to send NBO-end: " << err << ": "
1664 << ::strerror(-err);
1665 return WSREP_NODE_FAIL;
1666 }
1667
1668 TrxHandleSlavePtr end_ts;
1669 while ((end_ts = nbo_ctx->wait_ts()) == 0)
1670 {
1671 if (closing_ || state_() == S_CLOSED)
1672 {
1673 log_error << "Closing during nonblocking operation. "
1674 "Node will be left in inconsistent state and must be "
1675 "re-initialized either by full SST or from backup.";
1676 return WSREP_FATAL;
1677 }
1678
1679 if (nbo_ctx->aborted())
1680 {
1681 log_debug << "NBO wait aborted, retrying send";
1682 // Wait was aborted by view change, resend message
1683 goto resend;
1684 }
1685 }
1686
1687 assert(end_ts->ends_nbo() != WSREP_SEQNO_UNDEFINED);
1688
1689 trx->add_replicated(end_ts);
1690
1691 meta->gtid.uuid = state_uuid_;
1692 meta->gtid.seqno = end_ts->global_seqno();
1693 meta->depends_on = end_ts->depends_seqno();
1694
1695 ApplyOrder ao(*end_ts);
1696 apply_monitor_.enter(ao);
1697
1698 CommitOrder co(*end_ts, co_mode_);
1699 if (co_mode_ != CommitOrder::BYPASS)
1700 {
1701 commit_monitor_.enter(co);
1702 }
1703 end_ts->set_state(TrxHandle::S_APPLYING);
1704 end_ts->set_state(TrxHandle::S_COMMITTING);
1705
1706 trx->set_state(TrxHandle::S_CERTIFYING);
1707 trx->set_state(TrxHandle::S_APPLYING);
1708 trx->set_state(TrxHandle::S_COMMITTING);
1709
1710 // Unref
1711 cert_.erase_nbo_ctx(end_ts->ends_nbo());
1712
1713 return WSREP_OK;
1714 }
1715
1716
to_isolation_begin(TrxHandleMaster & trx,wsrep_trx_meta_t * meta)1717 wsrep_status_t galera::ReplicatorSMM::to_isolation_begin(TrxHandleMaster& trx,
1718 wsrep_trx_meta_t* meta)
1719 {
1720 assert(trx.locked());
1721
1722 if (trx.nbo_end())
1723 {
1724 return wait_nbo_end(&trx, meta);
1725 }
1726
1727 TrxHandleSlavePtr ts_ptr(trx.ts());
1728 TrxHandleSlave& ts(*ts_ptr);
1729
1730 if (meta != 0)
1731 {
1732 assert(meta->gtid.seqno > 0);
1733 assert(meta->gtid.seqno == ts.global_seqno());
1734 assert(meta->depends_on == ts.depends_seqno());
1735 }
1736
1737 assert(trx.state() == TrxHandle::S_REPLICATING);
1738 assert(trx.trx_id() == static_cast<wsrep_trx_id_t>(-1));
1739 assert(ts.local_seqno() > -1 && ts.global_seqno() > -1);
1740 assert(ts.global_seqno() > last_committed());
1741
1742 CommitOrder co(ts, co_mode_);
1743 wsrep_status_t const retval(cert_and_catch(&trx, ts_ptr));
1744
1745 ApplyOrder ao(ts);
1746 gu_trace(apply_monitor_.enter(ao));
1747
1748 switch (retval)
1749 {
1750 case WSREP_OK:
1751 {
1752 TX_SET_STATE(trx, TrxHandle::S_APPLYING);
1753 TX_SET_STATE(ts, TrxHandle::S_APPLYING);
1754 TX_SET_STATE(trx, TrxHandle::S_COMMITTING);
1755 TX_SET_STATE(ts, TrxHandle::S_COMMITTING);
1756 break;
1757 }
1758 case WSREP_TRX_FAIL:
1759 break;
1760 default:
1761 assert(0);
1762 gu_throw_fatal << "unrecognized retval " << retval
1763 << " for to isolation certification for " << ts;
1764 break;
1765 }
1766
1767 if (co_mode_ != CommitOrder::BYPASS)
1768 try
1769 {
1770 commit_monitor_.enter(co);
1771
1772 if (ts.state() == TrxHandle::S_COMMITTING)
1773 {
1774 log_debug << "Executing TO isolated action: " << ts;
1775 st_.mark_unsafe();
1776 }
1777 else
1778 {
1779 log_debug << "Grabbed TO for failed isolated action: " << ts;
1780 assert(trx.state() == TrxHandle::S_ABORTING);
1781 }
1782 }
1783 catch (...)
1784 {
1785 gu_throw_fatal << "unable to enter commit monitor: " << ts;
1786 }
1787
1788 return retval;
1789 }
1790
1791
wsrep_status_t
galera::ReplicatorSMM::to_isolation_end(TrxHandleMaster&         trx,
                                        const wsrep_buf_t* const err)
{
    // End a TOI action started by to_isolation_begin(): handle a possible
    // execution error, release commit and apply monitors, and drive both
    // handles to their terminal states.
    TrxHandleSlavePtr ts_ptr(trx.ts());
    TrxHandleSlave& ts(*ts_ptr);

    log_debug << "Done executing TO isolated action: " << ts;

    assert(trx.state() == TrxHandle::S_COMMITTING ||
           trx.state() == TrxHandle::S_ABORTING);
    assert(ts.state() == TrxHandle::S_COMMITTING ||
           ts.state() == TrxHandle::S_CERTIFYING);

    wsrep_status_t ret(WSREP_OK);
    if (NULL != err && NULL != err->ptr)
    {
        // Error voting must happen while still in commit order.
        ret = handle_apply_error(ts, *err, "Failed to execute TOI action ");
    }

    CommitOrder co(ts, co_mode_);
    if (co_mode_ != CommitOrder::BYPASS) commit_monitor_.leave(co);

    wsrep_seqno_t const safe_to_discard(cert_.set_trx_committed(ts));

    ApplyOrder ao(ts);
    apply_monitor_.leave(ao);

    if (ts.state() == TrxHandle::S_COMMITTING)
    {
        // Action was certified and executed.
        assert(trx.state() == TrxHandle::S_COMMITTING);
        TX_SET_STATE(trx, TrxHandle::S_COMMITTED);
        TX_SET_STATE(ts, TrxHandle::S_COMMITTED);

        // NBO-start leaves state unsafe until the matching NBO-end.
        if (trx.nbo_start() == false) st_.mark_safe();
    }
    else
    {
        // Certification failed; step the ts through the remaining states
        // to its terminal S_COMMITTED.
        assert(trx.state() == TrxHandle::S_ABORTING);
        assert(ts.state() == TrxHandle::S_CERTIFYING);
        TX_SET_STATE(trx, TrxHandle::S_ROLLED_BACK);
        TX_SET_STATE(ts, TrxHandle::S_APPLYING);
        TX_SET_STATE(ts, TrxHandle::S_COMMITTING);
        TX_SET_STATE(ts, TrxHandle::S_COMMITTED);
    }

    report_last_committed(safe_to_discard);

    return ret;
}
1842
1843
1844 namespace galera
1845 {
1846
1847 static WriteSetOut*
writeset_from_handle(wsrep_po_handle_t & handle,const TrxHandleMaster::Params & trx_params)1848 writeset_from_handle (wsrep_po_handle_t& handle,
1849 const TrxHandleMaster::Params& trx_params)
1850 {
1851 WriteSetOut* ret = static_cast<WriteSetOut*>(handle.opaque);
1852
1853 if (NULL == ret)
1854 {
1855 try
1856 {
1857 ret = new WriteSetOut(
1858 // gu::String<256>(trx_params.working_dir_) << '/' << &handle,
1859 trx_params.working_dir_, wsrep_trx_id_t(&handle),
1860 /* key format is not essential since we're not adding keys */
1861 KeySet::version(trx_params.key_format_), NULL, 0, 0,
1862 trx_params.record_set_ver_,
1863 WriteSetNG::MAX_VERSION, DataSet::MAX_VERSION, DataSet::MAX_VERSION,
1864 trx_params.max_write_set_size_);
1865
1866 handle.opaque = ret;
1867 }
1868 catch (std::bad_alloc& ba)
1869 {
1870 gu_throw_error(ENOMEM) << "Could not create WriteSetOut";
1871 }
1872 }
1873
1874 return ret;
1875 }
1876
1877 } /* namespace galera */
1878
1879 wsrep_status_t
preordered_collect(wsrep_po_handle_t & handle,const struct wsrep_buf * const data,size_t const count,bool const copy)1880 galera::ReplicatorSMM::preordered_collect(wsrep_po_handle_t& handle,
1881 const struct wsrep_buf* const data,
1882 size_t const count,
1883 bool const copy)
1884 {
1885 WriteSetOut* const ws(writeset_from_handle(handle, trx_params_));
1886
1887 for (size_t i(0); i < count; ++i)
1888 {
1889 ws->append_data(data[i].ptr, data[i].len, copy);
1890 }
1891
1892 return WSREP_OK;
1893 }
1894
1895
wsrep_status_t
galera::ReplicatorSMM::preordered_commit(wsrep_po_handle_t&  handle,
                                        const wsrep_uuid_t& source,
                                        uint64_t const      flags,
                                        int const           pa_range,
                                        bool const          commit)
{
    // Finalize a preordered writeset: if commit is requested, replicate
    // it through GCS (retrying on EAGAIN); in all cases the writeset is
    // destroyed and the handle cleared.
    WriteSetOut* const ws(writeset_from_handle(handle, trx_params_));

    if (gu_likely(true == commit))
    {
        assert(source != WSREP_UUID_UNDEFINED);

        ws->set_flags (WriteSetNG::wsrep_flags_to_ws_flags(flags) |
                       WriteSetNG::F_PREORDERED);

        /* by looking at trx_id we should be able to detect gaps / lost events
         * (however resending is not implemented yet). Something like
         *
         * wsrep_trx_id_t const trx_id(cert_.append_preordered(source, ws));
         *
         * begs to be here. */
        wsrep_trx_id_t const trx_id(preordered_id_.add_and_fetch(1));

        WriteSetNG::GatherVector actv;

        size_t const actv_size(ws->gather(source, 0, trx_id, actv));

        ws->finalize_preordered(pa_range); // also adds checksum

        // Retry the send while the group is busy with state exchange.
        int rcode;
        do
        {
            rcode = gcs_.sendv(actv, actv_size, GCS_ACT_WRITESET, false, false);
        }
        while (rcode == -EAGAIN && (usleep(1000), true));

        if (rcode < 0)
            gu_throw_error(-rcode)
                << "Replication of preordered writeset failed.";
    }

    delete ws; // cleanup regardless of commit flag

    handle.opaque = NULL;

    return WSREP_OK;
}
1944
1945
wsrep_status_t
galera::ReplicatorSMM::sst_sent(const wsrep_gtid_t& state_id, int rcode)
{
    // Called by the donor after an SST attempt to report the result to
    // the group. rcode == 0 means success (state_id carries the sent
    // position), rcode < 0 an error.
    assert (rcode <= 0);
    assert (rcode == 0 || state_id.seqno == WSREP_SEQNO_UNDEFINED);
    assert (rcode != 0 || state_id.seqno >= 0);

    if (state_() != S_DONOR)
    {
        log_error << "sst sent called when not SST donor, state " << state_();
        return WSREP_CONN_FAIL;
    }

    // Given the asserts above, rcode >= 0 effectively means rcode == 0 here.
    if (state_id.uuid != state_uuid_ && rcode >= 0)
    {
        // state we have sent no longer corresponds to the current group state
        // mark an error
        rcode = -EREMCHG;
    }

    try {
        if (rcode == 0)
            gcs_.join(gu::GTID(state_id.uuid, state_id.seqno), rcode);
        else
            /* stamp error message with the current state */
            gcs_.join(gu::GTID(state_uuid_, commit_monitor_.last_left()), rcode);

        return WSREP_OK;
    }
    catch (gu::Exception& e)
    {
        log_error << "failed to recover from DONOR state: " << e.what();
        return WSREP_CONN_FAIL;
    }
}
1981
// Checks if the seqno has been assigned for the gcache buffer.
// If yes, discard the old and use the one assigned in IST.
// This is required to make the correct gcache buffer associated
// with certification index entries.
galera::TrxHandleSlavePtr
galera::ReplicatorSMM::get_real_ts_with_gcache_buffer(
    const TrxHandleSlavePtr& ts)
{
    try
    {
        ssize_t size;
        const void* buf(gcache_.seqno_get_ptr(ts->global_seqno(), size));
        // GCache seqno_get_ptr() did not throw, so there was a matching
        // entry in GCache. Construct a new TrxHandleSlavePtr from
        // existing gcache buffer and discard the old one.
        TrxHandleSlavePtr ret(TrxHandleSlave::New(false, slave_pool_),
                              TrxHandleSlaveDeleter());
        if (size > 0)
        {
            // Re-read the writeset from the IST-assigned GCache buffer.
            gu_trace(ret->unserialize<false>(
                         gcs_action{ts->global_seqno(), WSREP_SEQNO_UNDEFINED,
                                 buf, int32_t(size), GCS_ACT_WRITESET}));
            ret->set_local(false);
            assert(ret->global_seqno() == ts->global_seqno());
            assert(ret->depends_seqno() >= 0 || ts->nbo_end());
            assert(ret->action().first && ret->action().second);
            ret->verify_checksum();
        }
        else
        {
            // Zero-sized buffer: represent it as a dummy writeset.
            ret->set_global_seqno(ts->global_seqno());
            ret->mark_dummy_with_action(buf);
        }

        // The bufs should never match as the seqno should not have been
        // yet assigned to buf on this codepath.
        assert(ts->action().first != buf);
        // Free duplicate buffer.
        if (ts->action().first != buf)
        {
            gcache_.free(const_cast<void*>(ts->action().first));
        }
        return ret;
    }
    catch (const gu::NotFound&)
    {
        // Seqno was not assigned to this buffer, so it was not part of
        // IST processing and was allocated by GCS.
        gcache_.seqno_assign(ts->action().first, ts->global_seqno(),
                             GCS_ACT_WRITESET, false);
        return ts;
    }
}
2035
handle_trx_overlapping_ist(const TrxHandleSlavePtr & ts)2036 void galera::ReplicatorSMM::handle_trx_overlapping_ist(
2037 const TrxHandleSlavePtr& ts)
2038 {
2039 // Out of order processing. IST has already applied the
2040 // trx.
2041 assert (ts->global_seqno() <= apply_monitor_.last_left());
2042
2043 assert(not ts->local());
2044 // Use local seqno from original ts for local monitor.
2045 LocalOrder lo(ts->local_seqno(), ts.get());
2046
2047 // Get real_ts pointing to GCache buffer which will not be discarded
2048 // if there is overlap. Do not try to access ts after this line.
2049 auto real_ts(get_real_ts_with_gcache_buffer(ts));
2050 local_monitor_.enter(lo);
2051 // If global seqno is higher than certification position, this
2052 // trx was not part of the preload ad must be appended to
2053 // certification index.
2054 if (real_ts->global_seqno() > cert_.position())
2055 {
2056 // We don't care about the result, just populate the index
2057 // and mark trx committed in certification.
2058 // see skip_prim_conf_change() for analogous logic
2059 (void)cert_.append_trx(real_ts);
2060 report_last_committed(cert_.set_trx_committed(*real_ts));
2061 }
2062 local_monitor_.leave(lo);
2063 }
2064
process_trx(void * recv_ctx,const TrxHandleSlavePtr & ts_ptr)2065 void galera::ReplicatorSMM::process_trx(void* recv_ctx,
2066 const TrxHandleSlavePtr& ts_ptr)
2067 {
2068 assert(recv_ctx != 0);
2069 assert(ts_ptr != 0);
2070
2071 TrxHandleSlave& ts(*ts_ptr);
2072
2073 assert(ts.local_seqno() > 0);
2074 assert(ts.global_seqno() > 0);
2075 assert(ts.last_seen_seqno() >= 0);
2076 assert(ts.depends_seqno() == -1 || ts.version() >= 4);
2077 assert(ts.state() == TrxHandle::S_REPLICATING);
2078
2079 // SST thread drains monitors after IST, so this should be
2080 // safe way to check if the ts was contained in IST.
2081 if (ts.global_seqno() <= apply_monitor_.last_left())
2082 {
2083 handle_trx_overlapping_ist(ts_ptr);
2084 return;
2085 }
2086
2087 wsrep_status_t const retval(cert_and_catch(0, ts_ptr));
2088
2089 switch (retval)
2090 {
2091 case WSREP_TRX_FAIL:
2092 assert(ts.is_dummy());
2093 /* fall through to apply_trx() */
2094 case WSREP_OK:
2095 try
2096 {
2097 if (ts.nbo_end() == true)
2098 {
2099 // NBO-end events are for internal operation only, not to be
2100 // consumed by application. If the NBO end happens with
2101 // different seqno than the current event's global seqno,
2102 // release monitors. In other case monitors will be grabbed
2103 // by local NBO handler threads.
2104 if (ts.ends_nbo() == WSREP_SEQNO_UNDEFINED)
2105 {
2106 assert(WSREP_OK != retval);
2107 assert(ts.is_dummy());
2108 }
2109 else
2110 {
2111 assert(WSREP_OK == retval);
2112 assert(ts.ends_nbo() > 0);
2113 // Signal NBO waiter here after leaving local ordering
2114 // critical section.
2115 gu::shared_ptr<NBOCtx>::type nbo_ctx(
2116 cert_.nbo_ctx(ts.ends_nbo()));
2117 assert(nbo_ctx != 0);
2118 nbo_ctx->set_ts(ts_ptr);
2119 break;
2120 }
2121 }
2122
2123 gu_trace(apply_trx(recv_ctx, ts));
2124 }
2125 catch (std::exception& e)
2126 {
2127 log_fatal << "Failed to apply trx: " << ts;
2128 log_fatal << e.what();
2129 log_fatal << "Node consistency compromized, leaving cluster...";
2130 mark_corrupt_and_close();
2131 assert(0); // this is an unexpected exception
2132 // keep processing events from the queue until provider is closed
2133 }
2134 break;
2135 default:
2136 // this should not happen for remote actions
2137 gu_throw_error(EINVAL)
2138 << "unrecognized retval for remote trx certification: "
2139 << retval << " trx: " << ts;
2140 }
2141 }
2142
2143
process_commit_cut(wsrep_seqno_t const seq,wsrep_seqno_t const seqno_l)2144 void galera::ReplicatorSMM::process_commit_cut(wsrep_seqno_t const seq,
2145 wsrep_seqno_t const seqno_l)
2146 {
2147 assert(seq > 0);
2148 assert(seqno_l > 0);
2149
2150 LocalOrder lo(seqno_l);
2151
2152 gu_trace(local_monitor_.enter(lo));
2153
2154 if (seq >= cc_seqno_) /* Refs #782. workaround for
2155 * assert(seqno >= seqno_released_) in gcache. */
2156 cert_.purge_trxs_upto(seq, true);
2157
2158 local_monitor_.leave(lo);
2159 log_debug << "Got commit cut from GCS: " << seq;
2160 }
2161
2162
/* NB: the only use for this method is in cancel_seqnos() below */
// Self-cancel the apply (and, unless bypassed, commit) monitor slots for
// a single global seqno so following transactions are not blocked on it.
void galera::ReplicatorSMM::cancel_seqno(wsrep_seqno_t const seqno)
{
    assert(seqno > 0);

    ApplyOrder ao(seqno, seqno - 1);
    apply_monitor_.self_cancel(ao);

    if (co_mode_ != CommitOrder::BYPASS)
    {
        CommitOrder co(seqno, co_mode_);
        commit_monitor_.self_cancel(co);
    }
}
2177
/* NB: the only use for this method is to dismiss the slave queue
 * in corrupt state */
// Self-cancel the local monitor slot for seqno_l (if positive) and the
// apply/commit monitor slots for seqno_g (if positive).
void galera::ReplicatorSMM::cancel_seqnos(wsrep_seqno_t const seqno_l,
                                          wsrep_seqno_t const seqno_g)
{
    if (seqno_l > 0)
    {
        LocalOrder lo(seqno_l);
        local_monitor_.self_cancel(lo);
    }

    if (seqno_g > 0) cancel_seqno(seqno_g);
}
2191
2192
drain_monitors(wsrep_seqno_t const upto)2193 void galera::ReplicatorSMM::drain_monitors(wsrep_seqno_t const upto)
2194 {
2195 apply_monitor_.drain(upto);
2196 if (co_mode_ != CommitOrder::BYPASS) commit_monitor_.drain(upto);
2197 }
2198
2199
/* Processes a vote event for writeset seqno_g, delivered in local order
 * seqno_l.
 *
 * @param seqno_g global seqno of the writeset being voted on
 * @param seqno_l local ordering seqno of this vote event
 * @param code    > 0 : vote request - this node must cast its own vote
 *                < 0 : group voted against a writeset this node applied
 *                      successfully => this node is inconsistent
 *                == 0: this node is part of the majority, nothing to do
 */
void galera::ReplicatorSMM::process_vote(wsrep_seqno_t const seqno_g,
                                         wsrep_seqno_t const seqno_l,
                                         int64_t const code)
{
    assert(seqno_g > 0);
    assert(seqno_l > 0);

    std::ostringstream msg;

    LocalOrder lo(seqno_l);
    gu_trace(local_monitor_.enter(lo));

    gu::GTID const gtid(state_uuid_, seqno_g);

    if (code > 0) /* vote request */
    {
        assert(GCS_VOTE_REQUEST == code);
        // TODO(review): this log line was marked for removal in the original
        log_info << "Got vote request for seqno " << gtid;
        /* make sure WS was either successfully applied or already voted */
        if (last_committed() < seqno_g) drain_monitors(seqno_g);
        if (st_.corrupt()) goto out;

        int const ret(gcs_.vote(gtid, 0, NULL, 0));

        switch (ret)
        {
        case 0:         /* majority agrees */
            log_info << "Vote 0 (success) on " << gtid
                     << " is consistent with group. Continue.";
            goto out;
        case -EALREADY: /* already voted */
            log_info << gtid << " already voted on. Continue.";
            goto out;
        case 1:         /* majority disagrees */
            msg << "Vote 0 (success) on " << gtid
                << " is inconsistent with group. Leaving cluster.";
            goto fail;
        default:        /* general error */
            assert(ret < 0);
            msg << "Failed to vote on request for " << gtid << ": "
                << -ret << " (" << ::strerror(-ret) << "). "
                "Assuming inconsistency";
            goto fail;
        }
    }
    else if (code < 0)
    {
        msg << "Got negative vote on successfully applied " << gtid;
    fail:
        log_error << msg.str();
        on_inconsistency();
    }
    else
    {
        /* seems we are in majority */
    }
out:
    local_monitor_.leave(lo);
}
2259
2260
set_initial_position(const wsrep_uuid_t & uuid,wsrep_seqno_t const seqno)2261 void galera::ReplicatorSMM::set_initial_position(const wsrep_uuid_t& uuid,
2262 wsrep_seqno_t const seqno)
2263 {
2264 update_state_uuid(uuid);
2265
2266 apply_monitor_.set_initial_position(uuid, seqno);
2267 if (co_mode_ != CommitOrder::BYPASS)
2268 commit_monitor_.set_initial_position(uuid, seqno);
2269 }
2270
2271 std::tuple<int, enum gu::RecordSet::Version>
get_trx_protocol_versions(int proto_ver)2272 galera::get_trx_protocol_versions(int proto_ver)
2273 {
2274 enum gu::RecordSet::Version record_set_ver(gu::RecordSet::EMPTY);
2275 int trx_ver(-1);
2276 switch (proto_ver)
2277 {
2278 case 1:
2279 trx_ver = 1;
2280 record_set_ver = gu::RecordSet::VER1;
2281 break;
2282 case 2:
2283 trx_ver = 1;
2284 record_set_ver = gu::RecordSet::VER1;
2285 break;
2286 case 3:
2287 case 4:
2288 trx_ver = 2;
2289 record_set_ver = gu::RecordSet::VER1;
2290 break;
2291 case 5:
2292 trx_ver = 3;
2293 record_set_ver = gu::RecordSet::VER1;
2294 break;
2295 case 6:
2296 trx_ver = 3;
2297 record_set_ver = gu::RecordSet::VER1;
2298 break;
2299 case 7:
2300 // Protocol upgrade to handle IST SSL backwards compatibility,
2301 // no effect to TRX or STR protocols.
2302 trx_ver = 3;
2303 record_set_ver = gu::RecordSet::VER1;
2304 break;
2305 case 8:
2306 // Protocol upgrade to enforce 8-byte alignment in writesets and CCs
2307 trx_ver = 3;
2308 record_set_ver = gu::RecordSet::VER2;
2309 break;
2310 case 9:
2311 // Protocol upgrade to enable support for semi-shared key type.
2312 trx_ver = 4;
2313 record_set_ver = gu::RecordSet::VER2;
2314 break;
2315 case 10:
2316 // Protocol upgrade to enable support for:
2317 trx_ver = 5;// PA range preset in the writeset,
2318 // WSREP_KEY_UPDATE support (API v26)
2319 record_set_ver = gu::RecordSet::VER2;
2320 break;
2321 default:
2322 gu_throw_error(EPROTO)
2323 << "Configuration change resulted in an unsupported protocol "
2324 "version: " << proto_ver << ". Can't continue.";
2325 };
2326 return std::make_tuple(trx_ver, record_set_ver);
2327 }
2328
establish_protocol_versions(int proto_ver)2329 void galera::ReplicatorSMM::establish_protocol_versions (int proto_ver)
2330 {
2331 try
2332 {
2333 const auto trx_versions(get_trx_protocol_versions(proto_ver));
2334 trx_params_.version_ = std::get<0>(trx_versions);
2335 trx_params_.record_set_ver_ = std::get<1>(trx_versions);
2336 protocol_version_ = proto_ver;
2337 log_info << "REPL Protocols: " << protocol_version_ << " ("
2338 << trx_params_.version_ << ")";
2339 }
2340 catch (const gu::Exception& e)
2341 {
2342 log_fatal << "Caught exception: " << e.what();
2343 abort();
2344 }
2345 }
2346
record_cc_seqnos(wsrep_seqno_t cc_seqno,const char * source)2347 void galera::ReplicatorSMM::record_cc_seqnos(wsrep_seqno_t cc_seqno,
2348 const char* source)
2349 {
2350 cc_seqno_ = cc_seqno;
2351 cc_lowest_trx_seqno_ = cert_.lowest_trx_seqno();
2352 log_info << "Lowest cert index boundary for CC from " << source
2353 << ": " << cc_lowest_trx_seqno_;;
2354 log_info << "Min available from gcache for CC from " << source
2355 << ": " << gcache_.seqno_min();
2356 // Lowest TRX must not have been released from gcache at this
2357 // point.
2358 assert(cc_lowest_trx_seqno_ >= gcache_.seqno_min());
2359 }
2360
2361 void
update_incoming_list(const wsrep_view_info_t & view)2362 galera::ReplicatorSMM::update_incoming_list(const wsrep_view_info_t& view)
2363 {
2364 static char const separator(',');
2365
2366 ssize_t new_size(0);
2367
2368 if (view.memb_num > 0)
2369 {
2370 new_size += view.memb_num - 1; // separators
2371
2372 for (int i = 0; i < view.memb_num; ++i)
2373 {
2374 new_size += strlen(view.members[i].incoming);
2375 }
2376 }
2377
2378 gu::Lock lock(incoming_mutex_);
2379
2380 incoming_list_.clear();
2381 incoming_list_.resize(new_size);
2382
2383 if (new_size <= 0) return;
2384
2385 incoming_list_ = view.members[0].incoming;
2386
2387 for (int i = 1; i < view.memb_num; ++i)
2388 {
2389 incoming_list_ += separator;
2390 incoming_list_ += view.members[i].incoming;
2391 }
2392 }
2393
/* Maps a GCS node state to the corresponding replicator state.
 * Note: my_idx is currently unused; only the GCS state determines the
 * result. Throws fatally on an out-of-range state value. */
static galera::Replicator::State state2repl(gcs_node_state const my_state,
                                            int const my_idx)
{
    switch (my_state)
    {
    case GCS_NODE_STATE_NON_PRIM:
    case GCS_NODE_STATE_PRIM:
        return galera::Replicator::S_CONNECTED;
    case GCS_NODE_STATE_JOINER:
        return galera::Replicator::S_JOINING;
    case GCS_NODE_STATE_JOINED:
        return galera::Replicator::S_JOINED;
    case GCS_NODE_STATE_SYNCED:
        return galera::Replicator::S_SYNCED;
    case GCS_NODE_STATE_DONOR:
        return galera::Replicator::S_DONOR;
    case GCS_NODE_STATE_MAX:
        assert(0); // not a real state, only an enum bound
    }

    // Reached only with a corrupted/unknown state value.
    gu_throw_fatal << "unhandled gcs state: " << my_state;
    GU_DEBUG_NORETURN;
}
2417
2418 void
submit_view_info(void * recv_ctx,const wsrep_view_info_t * view_info)2419 galera::ReplicatorSMM::submit_view_info(void* recv_ctx,
2420 const wsrep_view_info_t* view_info)
2421 {
2422 wsrep_cb_status_t const rcode
2423 (view_cb_(app_ctx_, recv_ctx, view_info, 0, 0));
2424
2425 if (WSREP_CB_SUCCESS != rcode)
2426 {
2427 gu_throw_fatal << "View callback failed. "
2428 "This is unrecoverable, restart required.";
2429 }
2430 }
2431
/* Processes a configuration change (CC) action received live from GCS
 * (never from IST - see the assert on seqno_l).
 *
 * Holds the local monitor for the whole processing to keep CC handling
 * in total order with other local events. Note: for CC actions
 * cc.seqno_g is passed down as the member index parameter - see
 * process_{non_,}prim_conf_change() (presumably GCS encodes the node
 * index there for conf changes; confirm against gcs docs).
 */
void
galera::ReplicatorSMM::process_conf_change(void* recv_ctx,
                                           const struct gcs_action& cc)
{
    assert(cc.seqno_l > 0); // Must not be from IST

    gcs_act_cchange const conf(cc.buf, cc.size);

    LocalOrder lo(cc.seqno_l);
    local_monitor_.enter(lo);

    // Flush writesets pending certification before this local seqno.
    process_pending_queue(cc.seqno_l);

    if (conf.conf_id < 0)
    {
        // Non-primary view: buffer is never kept in gcache.
        process_non_prim_conf_change(recv_ctx, conf, cc.seqno_g);
        gcache_.free(const_cast<void*>(cc.buf));
    }
    else
    {
        // Primary view: callee decides whether to keep or free cc.buf.
        process_prim_conf_change(recv_ctx, conf, cc.seqno_g,
                                 const_cast<void*>(cc.buf));
    }

    resume_recv();

    local_monitor_.leave(lo);

    if (conf.memb.size() == 0)
    {
        log_debug << "Received SELF-LEAVE. Connection closed.";
        assert(conf.conf_id < 0 && cc.seqno_g < 0);
        gu::Lock lock(closing_mutex_);
        shift_to_CLOSED();
    }
}
2468
/* Drains apply/commit monitors up to the current certification position
 * before a locally processed configuration change.
 * The else branch is unreachable in debug builds (see the assert); in
 * release builds it logs instead of draining with a bogus bound. */
void galera::ReplicatorSMM::drain_monitors_for_local_conf_change()
{
    wsrep_seqno_t const upto(cert_.position());
    assert(upto >= last_committed());
    if (upto >= last_committed())
    {
        log_debug << "Drain monitors from " << last_committed()
                  << " up to " << upto;
        gu_trace(drain_monitors(upto));
    }
    else
    {
        log_warn << "Cert position " << upto << " less than last committed "
                 << last_committed();
    }
}
2485
/* Processes a non-primary configuration change: notifies the application
 * of the non-prim view and demotes the replicator state to S_CONNECTED.
 *
 * @param recv_ctx application receive context, passed to the view callback
 * @param conf     decoded configuration change (conf_id must be undefined)
 * @param my_index this node's index in the view
 */
void galera::ReplicatorSMM::process_non_prim_conf_change(
    void* recv_ctx,
    const gcs_act_cchange& conf,
    int const my_index)
{
    assert(conf.conf_id == WSREP_SEQNO_UNDEFINED);

    /* ignore outdated non-prim configuration change */
    if (conf.uuid == state_uuid_ && conf.seqno < sst_seqno_) return;

    wsrep_uuid_t new_uuid(uuid_);
    // view_info is malloc'ed by galera_view_info_create() and must be
    // released with free() on every path below.
    wsrep_view_info_t* const view_info
        (galera_view_info_create(conf,
                                 capabilities(conf.repl_proto_ver),
                                 my_index, new_uuid));
    // Non-prim should not change UUID
    assert(uuid_ == WSREP_UUID_UNDEFINED || new_uuid == uuid_);
    assert(view_info->status == WSREP_VIEW_NON_PRIMARY);

    // Draining monitors could hang when the state is corrupt as
    // there may be blocked appliers.
    if (not st_.corrupt())
    {
        drain_monitors_for_local_conf_change();
    }

    update_incoming_list(*view_info);

    try
    {
        submit_view_info(recv_ctx, view_info);
        free(view_info);
    }
    catch (gu::Exception& e)
    {
        free(view_info);
        log_fatal << e.what();
        abort();
    }

    {
        gu::Lock lock(closing_mutex_);
        // Drop back to S_CONNECTED from any higher (joined/synced/donor)
        // state; lower states (closing/closed) are left alone.
        if (state_() > S_CONNECTED)
        {
            state_.shift_to(S_CONNECTED);
        }
    }
}
2534
validate_local_prim_view_info(const wsrep_view_info_t * view_info,const wsrep_uuid_t & my_uuid)2535 static void validate_local_prim_view_info(const wsrep_view_info_t* view_info,
2536 const wsrep_uuid_t& my_uuid)
2537 {
2538 assert(view_info->status == WSREP_VIEW_PRIMARY);
2539 if (view_info->memb_num > 0 &&
2540 (view_info->my_idx < 0 || view_info->my_idx >= view_info->memb_num))
2541 // something went wrong, member must be present in own view
2542 {
2543 std::ostringstream msg;
2544 msg << "Node UUID " << my_uuid << " is absent from the view:\n";
2545 for (int m(0); m < view_info->memb_num; ++m)
2546 {
2547 msg << '\t' << view_info->members[m].id << '\n';
2548 }
2549 msg << "most likely due to unexpected node identity change. "
2550 "Aborting.";
2551 log_fatal << msg.str();
2552 abort();
2553 }
2554 }
2555
/* Decides what to do with a primary CC that was already contained in the
 * SST and therefore needs no regular processing.
 *
 * If the CC lies beyond the current cert position it was not part of the
 * IST cert index preload, so the cert index must still be adjusted here,
 * and the CC buffer must be kept in gcache.
 *
 * @return true if the caller should keep the CC buffer in gcache
 */
bool galera::ReplicatorSMM::skip_prim_conf_change(
    const wsrep_view_info_t& view_info, int const proto_ver)
{
    auto cc_seqno(WSREP_SEQNO_UNDEFINED);
    bool keep(false); // keep in cache

    if (proto_ver >= PROTO_VER_ORDERED_CC)
    {
        cc_seqno = view_info.state_id.seqno;

        if (cc_seqno > cert_.position())
        {
            // was not part of IST preload, adjust cert. index
            // see handle_trx_overlapping_ist() for analogous logic
            assert(cc_seqno == cert_.position() + 1);
            const auto trx_ver
                (std::get<0>(get_trx_protocol_versions(proto_ver)));
            cert_.adjust_position(view_info,
                                  gu::GTID(view_info.state_id.uuid, cc_seqno),
                                  trx_ver);
            keep = true;
        }
    }

    log_info << "####### skipping local CC " << cc_seqno << ", keep in cache: "
             << (keep ? "true" : "false");

    return keep;
}
2585
process_first_view(const wsrep_view_info_t * view_info,const wsrep_uuid_t & new_uuid)2586 void galera::ReplicatorSMM::process_first_view(
2587 const wsrep_view_info_t* view_info, const wsrep_uuid_t& new_uuid)
2588 {
2589 assert(uuid_ == WSREP_UUID_UNDEFINED && new_uuid != WSREP_UUID_UNDEFINED);
2590 assert(view_info->state_id.uuid != WSREP_UUID_UNDEFINED);
2591 uuid_ = new_uuid;
2592 log_info << "Process first view: " << view_info->state_id.uuid
2593 << " my uuid: " << new_uuid;
2594 if (connected_cb_)
2595 {
2596 wsrep_cb_status_t cret(connected_cb_(app_ctx_, view_info));
2597 if (cret != WSREP_CB_SUCCESS)
2598 {
2599 log_fatal << "Application returned error "
2600 << cret
2601 << " from connect callback, aborting";
2602 abort();
2603 }
2604 }
2605 }
2606
process_group_change(const wsrep_view_info_t * view_info)2607 void galera::ReplicatorSMM::process_group_change(
2608 const wsrep_view_info_t* view_info)
2609 {
2610 assert(state_uuid_ != view_info->state_id.uuid);
2611 log_info << "Process group change: "
2612 << state_uuid_ << " -> " << view_info->state_id.uuid;
2613 if (connected_cb_)
2614 {
2615 wsrep_cb_status_t cret(connected_cb_(app_ctx_, view_info));
2616 if (cret != WSREP_CB_SUCCESS)
2617 {
2618 log_fatal << "Application returned error "
2619 << cret
2620 << " from connect callback, aborting";
2621 abort();
2622 }
2623 }
2624 }
2625
/* Performs the state transfer path of a primary conf change: obtains an
 * SST request from the application and runs the state transfer.
 * Aborts on unrecoverable conditions (failed request callback, or a
 * group UUID mismatch with an empty request). */
void galera::ReplicatorSMM::process_st_required(
    void* recv_ctx,
    int const group_proto_ver,
    const wsrep_view_info_t* view_info)
{
    const wsrep_seqno_t group_seqno(view_info->state_id.seqno);
    const wsrep_uuid_t& group_uuid (view_info->state_id.uuid);

    void* app_req(0);     // request buffer malloc'ed by the callback
    size_t app_req_len(0);
#ifndef NDEBUG
    bool app_waits_sst(false);
#endif
    log_info << "State transfer required: "
             << "\n\tGroup state: " << group_uuid << ":" << group_seqno
             << "\n\tLocal state: " << state_uuid_<< ":" << last_committed();

    if (S_CONNECTED != state_()) state_.shift_to(S_CONNECTED);

    wsrep_cb_status_t const rcode(sst_request_cb_(app_ctx_,
                                                  &app_req, &app_req_len));

    if (WSREP_CB_SUCCESS != rcode)
    {
        assert(app_req_len <= 0);
        log_fatal << "SST request callback failed. This is unrecoverable, "
                  << "restart required.";
        abort();
    }
    else if (0 == app_req_len && state_uuid_ != group_uuid)
    {
        log_fatal << "Local state UUID " << state_uuid_
                  << " is different from group state UUID " << group_uuid
                  << ", and SST request is null: restart required.";
        abort();
    }
#ifndef NDEBUG
    // A request other than the magic "none" string means the application
    // expects a full SST rather than IST only.
    app_waits_sst = (app_req_len > 0) &&
        (app_req_len != (strlen(WSREP_STATE_TRANSFER_NONE) + 1) ||
         memcmp(app_req, WSREP_STATE_TRANSFER_NONE, app_req_len));
    log_info << "App waits SST: " << app_waits_sst;
#endif
    // GCache::seqno_reset() happens here
    request_state_transfer (recv_ctx,
                            group_proto_ver, group_uuid, group_seqno, app_req,
                            app_req_len);
    free(app_req);

    finish_local_prim_conf_change(group_proto_ver, group_seqno, "sst");
    // No need to submit view info. It is always contained either
    // in SST or applied in IST.
}
2678
/* Resets the certification index (and clears the pending cert queue) when
 * required by the conf change being processed: old protocol, protocol
 * upgrade, or an upcoming state transfer. Otherwise a no-op. */
void galera::ReplicatorSMM::reset_index_if_needed(
    const wsrep_view_info_t* view_info,
    int const prev_protocol_version,
    int const next_protocol_version,
    bool const st_required)
{
    const wsrep_seqno_t group_seqno(view_info->state_id.seqno);
    const wsrep_uuid_t& group_uuid (view_info->state_id.uuid);

    //
    // Starting from protocol_version_ 8 joiner's cert index is rebuilt
    // from IST.
    //
    // The reasons to reset cert index:
    // - Protocol version lower than PROTO_VER_ORDERED_CC (ALL)
    // - Protocol upgrade (ALL)
    // - State transfer will take place (JOINER)
    //
    bool index_reset(next_protocol_version < PROTO_VER_ORDERED_CC ||
                     prev_protocol_version != next_protocol_version ||
                     // this last condition is a bit too strict. In fact
                     // checking for app_waits_sst would be enough, but in
                     // that case we'd have to skip cert index rebuilding
                     // when there is none.
                     // This would complicate the logic with little to no
                     // benefits...
                     st_required);

    if (index_reset)
    {
        gu::GTID position;
        int trx_proto_ver;
        if (next_protocol_version < PROTO_VER_ORDERED_CC)
        {
            position.set(group_uuid, group_seqno);
            trx_proto_ver = std::get<0>(get_trx_protocol_versions(
                                            next_protocol_version));
        }
        else
        {
            position = gu::GTID(); // undefined position, seqno 0
            // With PROTO_VER_ORDERED_CC/index preload the cert protocol version
            // is adjusted during IST/cert index preload.
            // See process_ist_conf_change().
            trx_proto_ver = -1;
        }
        // Index will be reset, so all write sets preceding this CC in
        // local order must be discarded. Therefore the pending cert queue
        // must also be cleared.
        pending_cert_queue_.clear();
        /* 2 reasons for this here:
         * 1 - compatibility with protocols < PROTO_VER_ORDERED_CC
         * 2 - preparing cert index for preloading by setting seqno to 0 */
        log_info << "Cert index reset to " << position << " (proto: "
                 << next_protocol_version << "), state transfer needed: "
                 << (st_required ? "yes" : "no");
        /* flushes service thd, must be called before gcache_.seqno_reset()*/
        cert_.assign_initial_position(position, trx_proto_ver);
    }
    else
    {
        log_info << "Skipping cert index reset";
    }

}
2744
/* Shifts the replicator state machine towards next_state after a primary
 * conf change. Transitions are applied only from S_CONNECTED or S_DONOR;
 * in other states only the saved-state file is refreshed. */
void galera::ReplicatorSMM::shift_to_next_state(Replicator::State next_state)
{
    if (state_() == S_CONNECTED || state_() == S_DONOR)
    {
        switch (next_state)
        {
        case S_JOINING:
            state_.shift_to(S_JOINING);
            break;
        case S_DONOR:
            // S_DONOR -> S_DONOR is not a transition; only enter from
            // S_CONNECTED.
            if (state_() == S_CONNECTED)
            {
                state_.shift_to(S_DONOR);
            }
            break;
        case S_JOINED:
            state_.shift_to(S_JOINED);
            break;
        case S_SYNCED:
            state_.shift_to(S_SYNCED);
            if (synced_cb_(app_ctx_) != WSREP_CB_SUCCESS)
            {
                log_fatal << "Synced callback failed. This is "
                          << "unrecoverable, restart required.";
                abort();
            }
            break;
        default:
            log_debug << "next_state " << next_state;
            break;
        }
    }
    // Invalidate the stored seqno: state file is only meaningful for a
    // cleanly closed provider.
    st_.set(state_uuid_, WSREP_SEQNO_UNDEFINED, safe_to_bootstrap_);
}
2779
/* Sends a JOIN message to the group after a completed state transfer,
 * if this node is still in S_JOINING with a pending SST state.
 * Failure to join due to non-prim is tolerated and merely logged. */
void galera::ReplicatorSMM::become_joined_if_needed()
{
    if (state_() == S_JOINING && sst_state_ != SST_NONE)
    {
        /* There are two reasons we can be here:
         * 1) we just got state transfer in request_state_transfer();
         * 2) we failed here previously (probably due to partition).
         */
        try {
            gcs_.join(gu::GTID(state_uuid_, sst_seqno_), 0);
            sst_state_ = SST_JOIN_SENT;
        }
        catch (gu::Exception& e)
        {
            if (e.get_errno() == ENOTCONN)
            {
                log_info << "Failed to JOIN due to non-Prim";
            }
            else
            {
                log_warn << "Failed to JOIN the cluster after SST "
                         << e.what();
            }
        }
    }
}
2806
submit_ordered_view_info(void * recv_ctx,const wsrep_view_info_t * view_info)2807 void galera::ReplicatorSMM::submit_ordered_view_info(
2808 void* recv_ctx,
2809 const wsrep_view_info_t* view_info)
2810 {
2811 try
2812 {
2813 submit_view_info(recv_ctx, view_info);
2814 }
2815 catch (gu::Exception& e)
2816 {
2817 log_fatal << e.what();
2818 abort();
2819 }
2820 }
2821
/* Common tail of local primary conf change processing: sends JOIN if a
 * state transfer just completed and records the CC bookkeeping seqnos.
 *
 * @param group_proto_ver group protocol version (used in debug checks only)
 * @param seqno           seqno of the processed CC
 * @param context         origin label for logging ("sst" or "group")
 */
void galera::ReplicatorSMM::finish_local_prim_conf_change(
    int const group_proto_ver __attribute__((unused)),
    wsrep_seqno_t const seqno,
    const char* context)
{
    become_joined_if_needed();
    record_cc_seqnos(seqno, context);
    // GCache must contain some actions, at least this CC
    bool const ordered __attribute__((unused))
        (group_proto_ver >= PROTO_VER_ORDERED_CC);
    assert(gcache_.seqno_min() > 0 || not ordered);
}
2834
namespace {
    // Deleter for view_info: buffers returned by galera_view_info_create()
    // are released with ::free() (see process_non_prim_conf_change()),
    // so a unique_ptr holding one needs a free()-based deleter.
    struct ViewInfoDeleter { void operator()(void* ptr) { ::free(ptr); } };
}
2839
/* Processes a primary configuration change delivered live from GCS.
 *
 * Responsibilities, in order: validate the view, decide whether the CC
 * is already covered by SST, drain monitors, handle first-view/group
 * change notifications, decide on state transfer, adjust certification
 * position, shift replicator state and deliver the view to the
 * application. The CC buffer is either kept in gcache (ordered CC) or
 * freed automatically by the RAII helper below.
 *
 * @param recv_ctx application receive context
 * @param conf     decoded configuration change (conf_id >= 0)
 * @param my_index this node's index in the new configuration
 * @param cc_buf   raw CC action buffer owned by gcache
 */
void galera::ReplicatorSMM::process_prim_conf_change(void* recv_ctx,
                                                     const gcs_act_cchange& conf,
                                                     int const my_index,
                                                     void* cc_buf)
{
    assert(conf.seqno > 0);
    assert(my_index >= 0);

    GU_DBUG_SYNC_WAIT("process_primary_configuration");
    // Helper to discard cc_buf automatically when it goes out of scope.
    // Method keep() should be called if the buffer should be kept in
    // gcache.
    class CcBufDiscard
    {
    public:
        CcBufDiscard(gcache::GCache& gcache, void* cc_buf)
            : gcache_(gcache)
            , cc_buf_(cc_buf) { }
        CcBufDiscard(const CcBufDiscard&) = delete;
        CcBufDiscard& operator=(const CcBufDiscard&) = delete;
        ~CcBufDiscard()
        {
            // Free the buffer unless ownership was passed to gcache via
            // keep().
            if (cc_buf_) gcache_.free(cc_buf_);
        }
        void keep(wsrep_seqno_t const cc_seqno) // keep cc_buf_ in gcache_
        {
            gu_trace(gcache_.seqno_assign(cc_buf_, cc_seqno,
                                          GCS_ACT_CCHANGE, false));
            cc_buf_ = 0;
        }
    private:
        gcache::GCache& gcache_;
        void* cc_buf_;
    } cc_buf_discard(gcache_, cc_buf);

    // Processing local primary conf change, so this node must always
    // be in conf change as indicated by my_index.
    assert(my_index >= 0);
    int const group_proto_version(conf.repl_proto_ver);

    wsrep_uuid_t new_uuid(uuid_);
    auto const view_info(std::unique_ptr<wsrep_view_info_t, ViewInfoDeleter>(
                             galera_view_info_create(
                                 conf,
                                 capabilities(group_proto_version),
                                 my_index, new_uuid)));
    assert(view_info->my_idx == my_index);
    // Will abort if validation fails
    validate_local_prim_view_info(view_info.get(), uuid_);

    bool const ordered(group_proto_version >= PROTO_VER_ORDERED_CC);
    const wsrep_uuid_t& group_uuid (view_info->state_id.uuid);
    wsrep_seqno_t const group_seqno(view_info->state_id.seqno);

    assert(group_seqno == conf.seqno);
    assert(not ordered || group_seqno > 0);

    /* Invalidate sst_seqno_ in case of group change. */
    if (state_uuid_ != group_uuid) sst_seqno_ = WSREP_SEQNO_UNDEFINED;

    if (conf.seqno <= sst_seqno_)
    {
        // contained already in SST, skip any further processing
        if (skip_prim_conf_change(*view_info, group_proto_version))
        {
            // was not part of IST, don't discard
            cc_buf_discard.keep(conf.seqno);
        }
        return;
    }

    log_info << "####### processing CC " << group_seqno
             << ", local"
             << (ordered ? ", ordered" : ", unordered");

    drain_monitors_for_local_conf_change();

    int const prev_protocol_version(protocol_version_);

    // First primary view ever seen by this node?
    const bool first_view(uuid_ == WSREP_UUID_UNDEFINED);
    if (first_view)
    {
        process_first_view(view_info.get(), new_uuid);
    }
    else if (state_uuid_ != group_uuid)
    {
        process_group_change(view_info.get());
    }

    log_info << "####### My UUID: " << uuid_;

    // A single-node cluster can always be bootstrapped from.
    safe_to_bootstrap_ = (view_info->memb_num == 1);

    gcs_node_state_t const my_state(conf.memb[my_index].state_);

    assert(my_state > GCS_NODE_STATE_NON_PRIM);
    assert(my_state < GCS_NODE_STATE_MAX);

    update_incoming_list(*view_info);

    bool const st_required
        (state_transfer_required(*view_info, group_proto_version,
                                 my_state == GCS_NODE_STATE_PRIM));
    Replicator::State const next_state(state2repl(my_state, my_index));

    // if protocol version >= PROTO_VER_ORDERED_CC, first CC already
    // carries seqno 1, so it can't be less than 1. For older protocols
    // it can be 0.
    assert(group_seqno >= (group_proto_version >= PROTO_VER_ORDERED_CC));

    reset_index_if_needed(view_info.get(),
                          prev_protocol_version,
                          group_proto_version,
                          st_required);

    if (st_required)
    {
        process_st_required(recv_ctx, group_proto_version, view_info.get());
        // Rolling upgrade from earlier version. Group protocol version
        // PROTO_VER_GALERA_3_MAX and below do not get CCs from the IST,
        // so protocol versions are not established at this point yet.
        // Do it now before continuing.
        if (group_proto_version <= PROTO_VER_GALERA_3_MAX)
        {
            establish_protocol_versions(group_proto_version);
        }
        return;
    }

    // From this point on the CC is known to be processed in order.
    assert(group_seqno > cert_.position());

    // This CC is processed in order. Establish protocol versions,
    // it must be done before cert_.adjust_position().
    establish_protocol_versions (group_proto_version);
    /* since CC does not pass certification, need to adjust cert
     * position explicitly (when processed in order) */
    /* flushes service thd, must be called before gcache_.seqno_reset()*/
    cert_.adjust_position(*view_info,
                          gu::GTID(group_uuid, group_seqno),
                          trx_params_.version_);

    if (first_view)
    {
        /* if CC is ordered need to use preceding seqno */
        set_initial_position(group_uuid, group_seqno - ordered);
        gcache_.seqno_reset(gu::GTID(group_uuid, group_seqno - ordered));
    }
    else
    {
        // Note: Monitor initial position setting is not needed as this CC
        // is processed in order.
        assert(state_uuid_ == group_uuid);
        update_state_uuid(group_uuid);
    }

    /* CCs from IST already have seqno assigned and cert. position
     * adjusted */
    if (ordered)
    {
        cc_buf_discard.keep(group_seqno);
    }
    shift_to_next_state(next_state);

    submit_ordered_view_info(recv_ctx, view_info.get());

    finish_local_prim_conf_change(group_proto_version, group_seqno, "group");

    // Cancel monitors after view event has been processed by the
    // application. Otherwise last_committed_id() will return incorrect
    // value if called from view callback.
    if (ordered)
        cancel_seqno(group_seqno);
}
3014
/* Processes a JOIN event from the group.
 *
 * @param seqno_j joined-at seqno; a negative value is an error code
 *                indicating a failed state transfer
 * @param seqno_l local ordering seqno of the event
 */
void galera::ReplicatorSMM::process_join(wsrep_seqno_t seqno_j,
                                         wsrep_seqno_t seqno_l)
{
    LocalOrder lo(seqno_l);

    gu_trace(local_monitor_.enter(lo));

    wsrep_seqno_t const upto(cert_.position());
    drain_monitors(upto);

    if (seqno_j < 0 && S_JOINING == state_())
    {
        // #595, @todo: find a way to re-request state transfer
        log_fatal << "Failed to receive state transfer: " << seqno_j
                  << " (" << strerror (-seqno_j) << "), need to restart.";
        abort();
    }
    else
    {
        state_.shift_to(S_JOINED);
        sst_state_ = SST_NONE;
    }

    local_monitor_.leave(lo);
}
3040
3041
/* Processes a SYNC event from the group: drains the monitors up to the
 * current cert position, shifts to S_SYNCED and notifies the
 * application. Aborts if the synced callback fails. */
void galera::ReplicatorSMM::process_sync(wsrep_seqno_t seqno_l)
{
    LocalOrder lo(seqno_l);

    gu_trace(local_monitor_.enter(lo));

    wsrep_seqno_t const upto(cert_.position());
    drain_monitors(upto);

    state_.shift_to(S_SYNCED);
    if (synced_cb_(app_ctx_) != WSREP_CB_SUCCESS)
    {
        log_fatal << "Synced callback failed. This is unrecoverable, "
                  << "restart required.";
        abort();
    }
    local_monitor_.leave(lo);
}
3060
/* Pauses the provider: enters the local monitor (and keeps it entered
 * until resume()), drains apply/commit monitors, and persists the
 * current position in the saved-state file.
 *
 * @return last committed seqno at the moment of pausing
 */
wsrep_seqno_t galera::ReplicatorSMM::pause()
{
    // Grab local seqno for local_monitor_
    wsrep_seqno_t const local_seqno(
        static_cast<wsrep_seqno_t>(gcs_.local_sequence()));
    LocalOrder lo(local_seqno);
    local_monitor_.enter(lo);

    // Local monitor should take care that concurrent
    // pause requests are enqueued
    assert(pause_seqno_ == WSREP_SEQNO_UNDEFINED);
    pause_seqno_ = local_seqno; // remembered for the matching resume()

    // Get drain seqno from cert index
    wsrep_seqno_t const upto(cert_.position());
    drain_monitors(upto);

    assert (apply_monitor_.last_left() >= upto);
    if (co_mode_ != CommitOrder::BYPASS)
    {
        assert (commit_monitor_.last_left() >= upto);
        assert (commit_monitor_.last_left() == apply_monitor_.last_left());
    }

    wsrep_seqno_t const ret(last_committed());
    // Persist position: provider is in a consistent, quiescent state.
    st_.set(state_uuid_, ret, safe_to_bootstrap_);

    log_info << "Provider paused at " << state_uuid_ << ':' << ret
             << " (" << pause_seqno_ << ")";

    return ret;
}
3093
/* Resumes a provider previously paused by pause(): invalidates the
 * persisted position and leaves the local monitor slot taken in pause().
 * A no-op (with a warning) if the provider is not paused. */
void galera::ReplicatorSMM::resume()
{
    if (pause_seqno_ == WSREP_SEQNO_UNDEFINED)
    {
        log_warn << "tried to resume unpaused provider";
        return;
    }

    st_.set(state_uuid_, WSREP_SEQNO_UNDEFINED, safe_to_bootstrap_);
    log_info << "resuming provider at " << pause_seqno_;
    // Capture the paused slot before clearing pause_seqno_.
    LocalOrder lo(pause_seqno_);
    pause_seqno_ = WSREP_SEQNO_UNDEFINED;
    local_monitor_.leave(lo);
    log_info << "Provider resumed.";
}
3109
/* Requests the node to desync from the group (shift to S_DONOR without
 * donating). The local seqno assigned by gcs_.desync() must be processed
 * through the local monitor regardless of the call's outcome; on failure
 * it is self-cancelled. Throws on desync failure. */
void galera::ReplicatorSMM::desync()
{
    wsrep_seqno_t seqno_l;

    ssize_t const ret(gcs_.desync(seqno_l));

    if (seqno_l > 0)
    {
        LocalOrder lo(seqno_l); // need to process it regardless of ret value

        if (ret == 0)
        {
            /* #706 - the check below must be state request-specific. We are not holding
               any locks here and must be able to wait like any other action.
               However practice may prove different, leaving it here as a reminder.
            if (local_monitor_.would_block(seqno_l))
            {
                gu_throw_error (-EDEADLK) << "Ran out of resources waiting to "
                                          << "desync the node. "
                                          << "The node must be restarted.";
            }
            */
            local_monitor_.enter(lo);
            if (state_() != S_DONOR) state_.shift_to(S_DONOR);
            local_monitor_.leave(lo);
        }
        else
        {
            local_monitor_.self_cancel(lo);
        }
    }

    if (ret)
    {
        gu_throw_error (-ret) << "Node desync failed.";
    }
}
3147
/* Requests the node to resync with the group after desync(): sends a
 * JOIN message at the last position that left the commit monitor. */
void galera::ReplicatorSMM::resync()
{
    gcs_.join(gu::GTID(state_uuid_, commit_monitor_.last_left()), 0);
}
3152
3153
3154 //////////////////////////////////////////////////////////////////////
3155 //////////////////////////////////////////////////////////////////////
3156 //// Private
3157 //////////////////////////////////////////////////////////////////////
3158 //////////////////////////////////////////////////////////////////////
3159
/* process pending queue events scheduled before seqno */
void galera::ReplicatorSMM::process_pending_queue(wsrep_seqno_t local_seqno)
{
    // This method should be called only from code paths of local
    // processing, i.e. events from group.
    assert(local_seqno > 0);
    // pending_cert_queue_ contains all writesets that:
    //   a) were BF aborted before being certified
    //   b) are not going to be replayed even though
    //      cert_for_aborted() returned TEST_OK for them
    //
    // Before certifying the current seqno, check if
    // pending_cert_queue contains any smaller seqno.
    // This avoids the certification index to diverge
    // across nodes.
    TrxHandleSlavePtr queued_ts;
    while ((queued_ts = pending_cert_queue_.must_cert_next(local_seqno)) != 0)
    {
        log_debug << "must cert next " << local_seqno
                  << " aborted ts " << *queued_ts;

        Certification::TestResult const result(cert_.append_trx(queued_ts));

        log_debug << "trx in pending cert queue certified, result: " << result;

        // Certification of a cert-bypassing writeset must always succeed.
        assert(!queued_ts->cert_bypass() ||
               Certification::TestResult::TEST_OK == result);

        // Mark the buffer skipped if certification failed, except for
        // explicit rollbacks which bypass certification.
        bool const skip(Certification::TestResult::TEST_FAILED == result &&
                        !(queued_ts->cert_bypass()/* expl. ROLLBACK */));

        /* at this point we are still assigning seqno to buffer in order */
        gcache_.seqno_assign(queued_ts->action().first,
                             queued_ts->global_seqno(),
                             GCS_ACT_WRITESET, skip);

        cert_.set_trx_committed(*queued_ts);
    }
}
3199
/* Enter the local (certification) monitor for writeset ts, releasing the
 * master trx lock for the duration of the wait.
 *
 * @param trx local master handle, or 0 when processing a remote writeset
 * @param ts  slave (writeset) handle to be certified
 * @return true on success, false if the wait was interrupted (EINTR) by a
 *         BF abort — in that case the monitor was NOT entered and the caller
 *         must go through handle_local_monitor_interrupted(). */
bool galera::ReplicatorSMM::enter_local_monitor_for_cert(
    TrxHandleMaster* trx,
    const TrxHandleSlavePtr& ts)
{
    bool in_replay(trx != 0 &&
                   trx->state() == TrxHandle::S_MUST_REPLAY);

    bool interrupted(false);
    try
    {
        if (trx != 0)
        {
            // In replay the trx state is left as S_MUST_REPLAY.
            if (in_replay == false) TX_SET_STATE(*trx, TrxHandle::S_CERTIFYING);
            // Unlock so a concurrent BF abort can interrupt the wait below.
            trx->unlock();
        }
        LocalOrder lo(*ts);
        // In replay the monitor may have been entered already before the
        // BF abort — re-enter only if it was not.
        if (in_replay == false || local_monitor_.entered(lo) == false)
        {
            gu_trace(local_monitor_.enter(lo));
        }

        if (trx != 0) trx->lock();

        // A BF abort while we were waiting may have moved the trx to
        // S_MUST_ABORT or S_MUST_REPLAY.
        assert(trx == 0 ||
               (trx->state() == TrxHandle::S_CERTIFYING ||
                trx->state() == TrxHandle::S_MUST_ABORT ||
                trx->state() == TrxHandle::S_MUST_REPLAY));

        TX_SET_STATE(*ts, TrxHandle::S_CERTIFYING);
    }
    catch (gu::Exception& e)
    {
        // Reacquire the trx lock before reporting — caller expects it held.
        if (trx != 0) trx->lock();
        if (e.get_errno() == EINTR) { interrupted = true; }
        else throw; // anything but an interrupt is fatal further up
    }
    return (not interrupted);
}
3238
/* Handle a BF-aborted wait on the local monitor (monitor was never entered).
 *
 * Decides, via cert_for_aborted(), whether the trx must replay
 * (WSREP_BF_ABORT, committing fragment — monitor is left untouched for the
 * replay stage) or roll back (WSREP_TRX_FAIL — monitor slot is self-canceled
 * and the writeset is queued for in-order certification by
 * process_pending_queue()). */
wsrep_status_t galera::ReplicatorSMM::handle_local_monitor_interrupted(
    TrxHandleMaster* trx,
    const TrxHandleSlavePtr& ts)
{
    assert(trx != 0);
    // Did not enter local monitor.
    assert(ts->state() == TrxHandle::S_REPLICATING);
    wsrep_status_t retval(cert_for_aborted(ts));

    if (WSREP_TRX_FAIL != retval)
    {
        assert(ts->state() == TrxHandle::S_REPLICATING ||
               ts->state() == TrxHandle::S_CERTIFYING);
        assert(WSREP_BF_ABORT == retval);
        assert(trx != 0);

        // If the transaction was committing, it must replay.
        if (ts->flags() & TrxHandle::F_COMMIT)
        {
            // Return immediately without canceling local monitor,
            // it needs to be grabbed again in replay stage.
            TX_SET_STATE(*trx, TrxHandle::S_MUST_REPLAY);
            return retval;
        }
        // if not - we need to rollback, so pretend that certification
        // failed, but still update cert index to match slaves
        else
        {
            pending_cert_queue_.push(ts);
            retval = WSREP_TRX_FAIL;
        }
    }
    else
    {
        // Real certification failure: still must be appended to the cert
        // index in order, hence queued.
        assert(ts->is_dummy());
        pending_cert_queue_.push(ts);
    }

    assert(WSREP_TRX_FAIL == retval);
    TX_SET_STATE(*trx, TrxHandle::S_ABORTING);

    // Release our slot so later writesets can pass the local monitor.
    LocalOrder lo(*ts);
    local_monitor_.self_cancel(lo);
    // Cert for aborted returned certification failure, so this
    // trx will roll back. Mark it as certified to denote that
    // local monitor must not be accessed again.
    TX_SET_STATE(*ts, TrxHandle::S_CERTIFYING);

    assert((retval == WSREP_TRX_FAIL && ts->is_dummy()) ||
           retval == WSREP_BF_ABORT || ts->queued());
    return retval;
}
3291
/* Complete certification while holding the local monitor: drain the pending
 * cert queue, append ts to the certification index, assign its seqno in
 * GCache (in order), and leave the monitor.
 *
 * @return WSREP_OK on success, WSREP_TRX_FAIL on cert failure,
 *         WSREP_BF_ABORT if the trx was BF aborted during certification. */
wsrep_status_t galera::ReplicatorSMM::finish_cert(
    TrxHandleMaster* trx,
    const TrxHandleSlavePtr& ts)
{
    assert(ts->state() == TrxHandle::S_CERTIFYING);

    // First certify any earlier BF-aborted writesets so the index
    // stays in global-seqno order.
    gu_trace(process_pending_queue(ts->local_seqno()));

    // Write sets which would overlap with IST must have already been
    // filtered out before getting here.
    assert(ts->global_seqno() == cert_.position() + 1);

    wsrep_status_t retval;
    switch (cert_.append_trx(ts))
    {
    case Certification::TEST_OK:
        // NBO_END should certify positively only if it ends NBO
        assert(ts->ends_nbo() > 0 || !ts->nbo_end());
        if (trx != 0 && trx->state() == TrxHandle::S_MUST_ABORT)
        {
            // BF aborted while waiting for the monitor, but cert passed.
            if (ts->flags() & TrxHandle::F_COMMIT)
            {
                TX_SET_STATE(*trx, TrxHandle::S_MUST_REPLAY);
                // apply monitor will be entered during replay
            }
            else
            {
                // Abort the transaction if non-committing
                // fragment was BF aborted during certification.
                TX_SET_STATE(*trx, TrxHandle::S_ABORTING);
            }
            retval = WSREP_BF_ABORT;
        }
        else
        {
            retval = WSREP_OK;
        }
        assert(!ts->is_dummy());
        break;
    case Certification::TEST_FAILED:
        assert(ts->is_dummy());
        if (ts->nbo_end()) assert(ts->ends_nbo() == WSREP_SEQNO_UNDEFINED);
        // Count only locally-originated failures in the stats.
        local_cert_failures_ += ts->local();
        if (trx != 0) TX_SET_STATE(*trx, TrxHandle::S_ABORTING);
        retval = WSREP_TRX_FAIL;
        break;
    default:
        retval = WSREP_TRX_FAIL;
        assert(0);
        break;
    }

    // at this point we are about to leave local_monitor_. Make sure
    // trx checksum was alright before that.
    ts->verify_checksum();

    // we must do seqno assignment 'in order' for std::map reasons,
    // so keeping it inside the monitor. NBO end should never be skipped.
    bool const skip(ts->is_dummy() && !ts->nbo_end());
    gcache_.seqno_assign (ts->action().first, ts->global_seqno(),
                          GCS_ACT_WRITESET, skip);

    LocalOrder lo(*ts);
    local_monitor_.leave(lo);

    return retval;
}
3359
3360 /* don't use this directly, use cert_and_catch() instead */
3361 inline
cert(TrxHandleMaster * trx,const TrxHandleSlavePtr & ts)3362 wsrep_status_t galera::ReplicatorSMM::cert(TrxHandleMaster* trx,
3363 const TrxHandleSlavePtr& ts)
3364 {
3365 assert(trx == 0 ||
3366 (trx->state() == TrxHandle::S_REPLICATING ||
3367 trx->state() == TrxHandle::S_MUST_REPLAY));
3368 assert(ts->state() == TrxHandle::S_REPLICATING);
3369
3370 assert(ts->local_seqno() != WSREP_SEQNO_UNDEFINED);
3371 assert(ts->global_seqno() != WSREP_SEQNO_UNDEFINED);
3372 assert(ts->last_seen_seqno() >= 0);
3373 assert(ts->last_seen_seqno() < ts->global_seqno());
3374
3375 LocalOrder lo(*ts);
3376 // Local monitor is either released or canceled in
3377 // handle_local_monitor_interrupted(), finish_cert().
3378 bool interrupted(not enter_local_monitor_for_cert(trx, ts));
3379
3380 if (gu_unlikely (interrupted))
3381 {
3382 return handle_local_monitor_interrupted(trx, ts);
3383 }
3384 else
3385 {
3386 return finish_cert(trx, ts);
3387 }
3388 }
3389
3390 /* pretty much any exception in cert() is fatal as it blocks local_monitor_ */
cert_and_catch(TrxHandleMaster * trx,const TrxHandleSlavePtr & ts)3391 wsrep_status_t galera::ReplicatorSMM::cert_and_catch(
3392 TrxHandleMaster* trx,
3393 const TrxHandleSlavePtr& ts)
3394 {
3395 try
3396 {
3397 return cert(trx, ts);
3398 }
3399 catch (std::exception& e)
3400 {
3401 log_fatal << "Certification exception: " << e.what();
3402 }
3403 catch (...)
3404 {
3405 log_fatal << "Unknown certification exception";
3406 }
3407 assert(0);
3408 abort();
3409 }
3410
/* This must be called BEFORE local_monitor_.self_cancel() due to
 * gcache_.seqno_assign() */
/* Test certification outcome for a BF-aborted writeset.
 * @return WSREP_BF_ABORT if it would have certified (caller may replay),
 *         WSREP_TRX_FAIL on certification failure; aborts the node on any
 *         other Certification::test() result. */
wsrep_status_t galera::ReplicatorSMM::cert_for_aborted(
    const TrxHandleSlavePtr& ts)
{
    // trx was BF aborted either while it was replicating or
    // while it was waiting for local monitor
    assert(ts->state() == TrxHandle::S_REPLICATING ||
           ts->state() == TrxHandle::S_CERTIFYING);

    // Non-storing test: does not modify the certification index.
    Certification::TestResult const res(cert_.test(ts, false));

    switch (res)
    {
    case Certification::TEST_OK:
        return WSREP_BF_ABORT;

    case Certification::TEST_FAILED:
        // Next step will be monitors release. Make sure that ws was not
        // corrupted and cert failure is real before proceeding with that.
        //gcf788 - this must be moved to cert(), the caller method
        assert(ts->is_dummy());
        ts->verify_checksum();
        assert(!ts->nbo_end()); // should never be skipped in seqno_assign()
        return WSREP_TRX_FAIL;

    default:
        log_fatal << "Unexpected return value from Certification::test(): "
                  << res;
        abort();
    }
}
3443
/* Enter the apply monitor for a local transaction, releasing the trx lock
 * while waiting.
 * @return true on success, false if the wait was interrupted (EINTR) by a
 *         BF abort — handled by handle_apply_monitor_interrupted(). */
bool galera::ReplicatorSMM::enter_apply_monitor_for_local(
    TrxHandleMaster& trx,
    const TrxHandleSlavePtr& ts)
{
    assert(ts->global_seqno() > last_committed());
    assert(ts->depends_seqno() >= 0);

    TX_SET_STATE(trx, TrxHandle::S_APPLYING);

    ApplyOrder ao(*ts);
    bool interrupted(false);

    try
    {
        // Unlock so a concurrent BF abort can interrupt the wait below.
        trx.unlock();
        GU_DBUG_SYNC_WAIT("before_certify_apply_monitor_enter");
        gu_trace(apply_monitor_.enter(ao));
        GU_DBUG_SYNC_WAIT("after_certify_apply_monitor_enter");
        trx.lock();
        // S_MUST_ABORT if a BF abort arrived while we were waiting.
        assert(trx.state() == TrxHandle::S_APPLYING ||
               trx.state() == TrxHandle::S_MUST_ABORT);
    }
    catch (gu::Exception& e)
    {
        // Reacquire the trx lock before reporting — caller expects it held.
        trx.lock();
        if (e.get_errno() == EINTR)
        {
            interrupted = true;
        }
        else throw; // anything but an interrupt is fatal further up
    }
    return (not interrupted);
}
3477
handle_apply_monitor_interrupted(TrxHandleMaster & trx,const TrxHandleSlavePtr & ts)3478 wsrep_status_t galera::ReplicatorSMM::handle_apply_monitor_interrupted(
3479 TrxHandleMaster& trx,
3480 const TrxHandleSlavePtr& ts)
3481 {
3482 assert(trx.state() == TrxHandle::S_MUST_ABORT);
3483 assert(ts->state() == TrxHandle::S_CERTIFYING);
3484
3485 wsrep_status_t retval;
3486 if (ts->flags() & TrxHandle::F_COMMIT)
3487 {
3488 TX_SET_STATE(trx, TrxHandle::S_MUST_REPLAY);
3489 retval = WSREP_BF_ABORT;
3490 }
3491 else
3492 {
3493 TX_SET_STATE(trx, TrxHandle::S_ABORTING);
3494 retval = WSREP_TRX_FAIL;
3495 }
3496 return retval;
3497 }
3498
/* Enter the apply monitor for a local transaction that is not committing
 * (rolling back or replaying), advancing ts through the states it may have
 * been left in when the transaction was interrupted. */
void galera::ReplicatorSMM::enter_apply_monitor_for_local_not_committing(
    const TrxHandleMaster& trx,
    TrxHandleSlave& ts)
{
    assert(trx.state() == TrxHandle::S_ABORTING ||
           trx.state() == TrxHandle::S_REPLAYING);
    assert(ts.state() < TrxHandle::S_COMMITTING);
    switch (ts.state())
    {
    case TrxHandle::S_REPLICATING:
        // Not yet certified: move through certifying before applying.
        TX_SET_STATE(ts, TrxHandle::S_CERTIFYING);
        // fall through
    case TrxHandle::S_CERTIFYING:
    {
        ApplyOrder ao(ts);
        apply_monitor_.enter(ao);
        TX_SET_STATE(ts, TrxHandle::S_APPLYING);
        break;
    }
    case TrxHandle::S_APPLYING:
        // Apply monitor already entered — nothing to do.
        break;
    default:
        assert(0); // Should never happen
    }
}
3524
3525 void
update_state_uuid(const wsrep_uuid_t & uuid)3526 galera::ReplicatorSMM::update_state_uuid (const wsrep_uuid_t& uuid)
3527 {
3528 if (state_uuid_ != uuid)
3529 {
3530 *(const_cast<wsrep_uuid_t*>(&state_uuid_)) = uuid;
3531
3532 std::ostringstream os; os << state_uuid_;
3533 // Copy only non-nil terminated part of the source string
3534 // and terminate the string explicitly to silence a warning
3535 // generated by Wstringop-truncation
3536 char* str(const_cast<char*>(state_uuid_str_));
3537 strncpy(str, os.str().c_str(), sizeof(state_uuid_str_) - 1);
3538 str[sizeof(state_uuid_str_) - 1] = '\0';
3539 }
3540
3541 st_.set(uuid, WSREP_SEQNO_UNDEFINED, safe_to_bootstrap_);
3542 }
3543
/* Terminate the process after closing the GCS connection so the group
 * notices this node leaving promptly. Does not return (gu_abort()). */
void
galera::ReplicatorSMM::abort()
{
    log_info << "ReplicatorSMM::abort()";
    gcs_.close();
    gu_abort();
}
3551