1 //
2 // Copyright (C) 2010-2020 Codership Oy <info@codership.com>
3 //
4
5 #include "replicator_smm.hpp"
6 #include "galera_info.hpp"
7
8 #include <gu_abort.h>
9 #include <gu_throw.hpp>
10
/*
 * Decide STR protocol version based on group protocol version.
 * For more information about versions, see table in replicator_smm.hpp.
 */
get_str_proto_ver(int const group_proto_ver)15 static int get_str_proto_ver(int const group_proto_ver)
16 {
17 switch (group_proto_ver)
18 {
19 case 1:
20 return 0;
21 case 2:
22 case 3:
23 case 4:
24 case 5:
25 return 1;
26 case 6:
27 case 7:
28 case 8:
29 case 9:
30 // gcs intelligent donor selection.
31 // include handling dangling comma in donor string.
32 return 2;
33 case 10:
34 // 4.x
35 // CC events in IST, certification index preload
36 return 3;
37 default:
38 gu_throw_error(EPROTO)
39 << "Can't find suitable STR protocol version based on "
40 << "group protocol version: " << group_proto_ver;
41 }
42 }
43
44 namespace galera {
45
46 bool
state_transfer_required(const wsrep_view_info_t & view_info,int const group_proto_ver,bool const rejoined)47 ReplicatorSMM::state_transfer_required(const wsrep_view_info_t& view_info,
48 int const group_proto_ver,
49 bool const rejoined)
50 {
51 const int str_proto_ver(get_str_proto_ver(group_proto_ver));
52 if (rejoined)
53 {
54 assert(view_info.view >= 0);
55
56 if (state_uuid_ == view_info.state_id.uuid) // common history
57 {
58 wsrep_seqno_t const group_seqno(view_info.state_id.seqno);
59 wsrep_seqno_t const local_seqno(last_committed());
60
61 if (str_proto_ver >= 3)
62 return (local_seqno + 1 < group_seqno); // this CC will add 1
63 else
64 return (local_seqno < group_seqno);
65 }
66
67 return true;
68 }
69
70 return false;
71 }
72
73 wsrep_status_t
sst_received(const wsrep_gtid_t & state_id,const wsrep_buf_t * const state,int const rcode)74 ReplicatorSMM::sst_received(const wsrep_gtid_t& state_id,
75 const wsrep_buf_t* const state,
76 int const rcode)
77 {
78 log_info << "SST received: " << state_id.uuid << ':' << state_id.seqno;
79
80 gu::Lock lock(sst_mutex_);
81
82 if (state_() != S_JOINING)
83 {
84 log_error << "not JOINING when sst_received() called, state: "
85 << state_();
86 return WSREP_CONN_FAIL;
87 }
88
89 assert(rcode <= 0);
90 if (rcode) { assert(state_id.seqno == WSREP_SEQNO_UNDEFINED); }
91
92 sst_uuid_ = state_id.uuid;
93 sst_seqno_ = rcode ? WSREP_SEQNO_UNDEFINED : state_id.seqno;
94 assert(false == sst_received_);
95 sst_received_ = true;
96 sst_cond_.signal();
97
98 return WSREP_OK;
99 }
100
101
/* Version 0 state request: a single opaque SST request blob with no
 * IST part. Does not own the buffer - the caller retains ownership. */
class StateRequest_v0 : public ReplicatorSMM::StateRequest
{
public:
    StateRequest_v0 (const void* const sst_req, ssize_t const sst_req_len)
        : req_(sst_req), len_(sst_req_len)
    {}
    ~StateRequest_v0 () {}
    virtual int         version () const { return 0;    }
    virtual const void* req     () const { return req_; }
    virtual ssize_t     len     () const { return len_; }
    // In v0 the whole request is the SST request.
    virtual const void* sst_req () const { return req_; }
    virtual ssize_t     sst_len () const { return len_; }
    // v0 carries no IST request.
    virtual const void* ist_req () const { return 0;    }
    virtual ssize_t     ist_len () const { return 0;    }
private:
    StateRequest_v0 (const StateRequest_v0&);             // non-copyable
    StateRequest_v0& operator = (const StateRequest_v0&); // non-assignable
    const void* const req_;
    ssize_t     const len_;
};
122
123
/* Version 1 state request. Wire format:
 *   "STRv1\0" <uint32 sst_len> <sst blob> <uint32 ist_len> <ist blob>
 * The buffer may be owned (serializing ctor) or borrowed (parsing
 * ctor) - see own_. */
class StateRequest_v1 : public ReplicatorSMM::StateRequest
{
public:
    static std::string const MAGIC;
    StateRequest_v1 (const void* sst_req, ssize_t sst_req_len,
                     const void* ist_req, ssize_t ist_req_len);
    StateRequest_v1 (const void* str, ssize_t str_len);
    ~StateRequest_v1 () { if (own_ && req_) free (req_); }
    virtual int         version () const { return 1;    }
    virtual const void* req     () const { return req_; }
    virtual ssize_t     len     () const { return len_; }
    virtual const void* sst_req () const { return req(sst_offset()); }
    virtual ssize_t     sst_len () const { return len(sst_offset()); }
    virtual const void* ist_req () const { return req(ist_offset()); }
    virtual ssize_t     ist_len () const { return len(ist_offset()); }
private:
    StateRequest_v1 (const StateRequest_v1&);
    StateRequest_v1& operator = (const StateRequest_v1&);

    // SST length field starts right after the NUL-terminated magic.
    ssize_t sst_offset() const { return MAGIC.length() + 1; }
    // IST length field follows the SST length field and the SST blob.
    ssize_t ist_offset() const
    {
        return sst_offset() + sizeof(uint32_t) + sst_len();
    }

    // Read the uint32 length field stored at the given offset.
    ssize_t len (ssize_t offset) const
    {
        int32_t ret;
        gu::unserialize4(req_, offset, ret);
        return ret;
    }

    // Pointer to the blob following the length field at offset,
    // or 0 if that blob is empty.
    void* req (ssize_t offset) const
    {
        if (len(offset) > 0)
            return req_ + offset + sizeof(uint32_t);
        else
            return 0;
    }

    ssize_t const len_;
    char*   const req_;
    bool    const own_;
};
168
// Magic signature prefixed to every serialized v1 state request.
std::string const
StateRequest_v1::MAGIC("STRv1");

// Fallback for toolchains that do not expose INT32_MAX from <stdint.h>.
#ifndef INT32_MAX
#define INT32_MAX 0x7fffffff
#endif
175
StateRequest_v1(const void * const sst_req,ssize_t const sst_req_len,const void * const ist_req,ssize_t const ist_req_len)176 StateRequest_v1::StateRequest_v1 (
177 const void* const sst_req, ssize_t const sst_req_len,
178 const void* const ist_req, ssize_t const ist_req_len)
179 :
180 len_(MAGIC.length() + 1 +
181 sizeof(uint32_t) + sst_req_len +
182 sizeof(uint32_t) + ist_req_len),
183 req_(static_cast<char*>(malloc(len_))),
184 own_(true)
185 {
186 if (!req_)
187 gu_throw_error (ENOMEM) << "Could not allocate state request v1";
188
189 if (sst_req_len > INT32_MAX || sst_req_len < 0)
190 gu_throw_error (EMSGSIZE) << "SST request length (" << sst_req_len
191 << ") unrepresentable";
192
193 if (ist_req_len > INT32_MAX || ist_req_len < 0)
194 gu_throw_error (EMSGSIZE) << "IST request length (" << sst_req_len
195 << ") unrepresentable";
196
197 char* ptr(req_);
198
199 strcpy (ptr, MAGIC.c_str());
200 ptr += MAGIC.length() + 1;
201
202 ptr += gu::serialize4(uint32_t(sst_req_len), ptr, 0);
203
204 memcpy (ptr, sst_req, sst_req_len);
205 ptr += sst_req_len;
206
207 ptr += gu::serialize4(uint32_t(ist_req_len), ptr, 0);
208
209 memcpy (ptr, ist_req, ist_req_len);
210
211 assert ((ptr - req_) == (len_ - ist_req_len));
212 }
213
// takes ownership over str buffer
// NOTE(review): own_ is initialized to false below, so the destructor
// will NOT free the buffer despite the comment above - presumably the
// buffer lifetime is managed by the caller; confirm against callers
// before relying on either reading.
StateRequest_v1::StateRequest_v1 (const void* str, ssize_t str_len)
    :
    len_(str_len),
    req_(static_cast<char*>(const_cast<void*>(str))),
    own_(false)
{
    // Minimum sane size: magic + NUL plus the two uint32 length fields.
    if (sst_offset() + 2*sizeof(uint32_t) > size_t(len_))
    {
        assert(0);
        gu_throw_error (EINVAL) << "State transfer request is too short: "
                                << len_ << ", must be at least: "
                                << (sst_offset() + 2*sizeof(uint32_t));
    }

    if (strncmp (req_, MAGIC.c_str(), MAGIC.length()))
    {
        assert(0);
        gu_throw_error (EINVAL) << "Wrong magic signature in state request v1.";
    }

    // SST blob plus both length fields must fit into the buffer before
    // ist_offset()/ist_len() below may read past the SST section.
    if (sst_offset() + sst_len() + 2*sizeof(uint32_t) > size_t(len_))
    {
        gu_throw_error (EINVAL) << "Malformed state request v1: sst length: "
                                << sst_len() << ", total length: " << len_;
    }

    // Total must match exactly: IST length field + IST blob end the buffer.
    // NOTE(review): the message prints sst_len() although the failed check
    // involves the IST field - looks like it should be ist_len(); verify.
    if (ist_offset() + ist_len() + sizeof(uint32_t) != size_t(len_))
    {
        gu_throw_error (EINVAL) << "Malformed state request v1: parsed field "
            "length " << sst_len() << " is not equal to total request length "
            << len_;
    }
}
248
249
250 static ReplicatorSMM::StateRequest*
read_state_request(const void * const req,size_t const req_len)251 read_state_request (const void* const req, size_t const req_len)
252 {
253 const char* const str(static_cast<const char*>(req));
254
255 bool const v1(req_len > StateRequest_v1::MAGIC.length() &&
256 !strncmp(str, StateRequest_v1::MAGIC.c_str(),
257 StateRequest_v1::MAGIC.length()));
258
259 log_info << "Detected STR version: " << int(v1) << ", req_len: "
260 << req_len << ", req: " << str;
261 if (v1)
262 {
263 return (new StateRequest_v1(req, req_len));
264 }
265 else
266 {
267 return (new StateRequest_v0(req, req_len));
268 }
269 }
270
271
/* Joiner-side IST request: where the donor should connect (peer_),
 * which history (uuid_), the joiner's last applied seqno and the last
 * seqno the joiner is missing (group_seqno_).
 * Serialized as "uuid:last_applied-group_seqno|peer" (see the stream
 * operators below). */
class IST_request
{
public:
    IST_request() : peer_(), uuid_(), last_applied_(), group_seqno_() { }
    IST_request(const std::string& peer,
                const wsrep_uuid_t& uuid,
                wsrep_seqno_t last_applied,
                wsrep_seqno_t last_missing_seqno)
        :
        peer_(peer),
        uuid_(uuid),
        last_applied_(last_applied),
        group_seqno_(last_missing_seqno)
    { }
    const std::string&  peer() const { return peer_ ; }
    const wsrep_uuid_t& uuid() const { return uuid_ ; }
    wsrep_seqno_t last_applied() const { return last_applied_; }
    wsrep_seqno_t group_seqno()  const { return group_seqno_; }
private:
    friend std::ostream& operator<<(std::ostream&, const IST_request&);
    friend std::istream& operator>>(std::istream&, IST_request&);
    std::string   peer_;
    wsrep_uuid_t  uuid_;
    wsrep_seqno_t last_applied_;
    wsrep_seqno_t group_seqno_;
};
298
operator <<(std::ostream & os,const IST_request & istr)299 std::ostream& operator<<(std::ostream& os, const IST_request& istr)
300 {
301 return (os
302 << istr.uuid_ << ":"
303 << istr.last_applied_ << "-"
304 << istr.group_seqno_ << "|"
305 << istr.peer_);
306 }
307
operator >>(std::istream & is,IST_request & istr)308 std::istream& operator>>(std::istream& is, IST_request& istr)
309 {
310 char c;
311 return (is >> istr.uuid_ >> c >> istr.last_applied_
312 >> c >> istr.group_seqno_ >> c >> istr.peer_);
313 }
314
315 static void
get_ist_request(const ReplicatorSMM::StateRequest * str,IST_request * istr)316 get_ist_request(const ReplicatorSMM::StateRequest* str, IST_request* istr)
317 {
318 assert(str->ist_len());
319 std::string ist_str(static_cast<const char*>(str->ist_req()),
320 str->ist_len());
321 std::istringstream is(ist_str);
322 is >> *istr;
323 }
324
325 static bool
sst_is_trivial(const void * const req,size_t const len)326 sst_is_trivial (const void* const req, size_t const len)
327 {
328 /* Check that the first string in request == ReplicatorSMM::TRIVIAL_SST */
329 static size_t const trivial_len(strlen(ReplicatorSMM::TRIVIAL_SST) + 1);
330 return (len >= trivial_len &&
331 !::memcmp(req, ReplicatorSMM::TRIVIAL_SST, trivial_len));
332 }
333
334 static bool
no_sst(const void * const req,size_t const len)335 no_sst (const void* const req, size_t const len)
336 {
337 /* Check that the first string in request == ReplicatorSMM::NO_SST */
338 static size_t const no_len(strlen(ReplicatorSMM::NO_SST) + 1);
339 return (len >= no_len &&
340 !::memcmp(req, ReplicatorSMM::NO_SST, no_len));
341 }
342
343 wsrep_seqno_t
donate_sst(void * const recv_ctx,const StateRequest & streq,const wsrep_gtid_t & state_id,bool const bypass)344 ReplicatorSMM::donate_sst(void* const recv_ctx,
345 const StateRequest& streq,
346 const wsrep_gtid_t& state_id,
347 bool const bypass)
348 {
349 wsrep_buf_t const str = { streq.sst_req(), size_t(streq.sst_len()) };
350
351 wsrep_cb_status const err(sst_donate_cb_(app_ctx_, recv_ctx, &str,
352 &state_id, NULL, bypass));
353
354 wsrep_seqno_t const ret
355 (WSREP_CB_SUCCESS == err ? state_id.seqno : -ECANCELED);
356
357 if (ret < 0)
358 {
359 log_error << "SST " << (bypass ? "bypass " : "") << "failed: " << err;
360 }
361
362 return ret;
363 }
364
/* Scope guard for a gcache seqno lock: releases the lock on scope
 * exit, but only after unlock_ has been armed following a successful
 * seqno_lock(). Disarmed (set back to false) when ownership of the
 * lock passes to the IST sender. */
struct slg
{
    gcache::GCache& gcache_;
    bool unlock_;

    slg(gcache::GCache& cache) : gcache_(cache), unlock_(false){}
    ~slg() { if (unlock_) gcache_.seqno_unlock(); }
};
373
/* Start asynchronous IST senders towards peer. On success, ownership
 * of the gcache seqno lock passes to the sender (the guard is
 * disarmed) and rcode is passed through unchanged; on failure the
 * negative errno from the exception is returned instead. */
static wsrep_seqno_t run_ist_senders(ist::AsyncSenderMap& ist_senders,
                                     const gu::Config& config,
                                     const std::string& peer,
                                     wsrep_seqno_t const preload_start,
                                     wsrep_seqno_t const cc_seqno,
                                     wsrep_seqno_t const cc_lowest,
                                     int const proto_ver,
                                     slg& seqno_lock_guard,
                                     wsrep_seqno_t const rcode)
{
    try
    {
        ist_senders.run(config,
                        peer,
                        preload_start,
                        cc_seqno,
                        cc_lowest,
                        proto_ver);
        // seqno will be unlocked when sender exits
        seqno_lock_guard.unlock_ = false;
        return rcode;
    }
    catch (gu::Exception& e)
    {
        log_warn << "IST failed: " << e.what();
        return -e.get_errno();
    }
}
402
/* Donor-side handling of a state transfer request ordered at seqno_l.
 * Parses the request, drains the monitors, shifts to S_DONOR and then
 * serves, as appropriate: IST (possibly preceded by an SST bypass
 * notification), certification index preload, or a full SST. Joins
 * back to the group at the end unless joining was deferred to
 * sst_sent(). Control flow uses gotos (full_sst, out) - statement
 * order and guard arming are significant throughout. */
void ReplicatorSMM::process_state_req(void* recv_ctx,
                                      const void* req,
                                      size_t req_size,
                                      wsrep_seqno_t const seqno_l,
                                      wsrep_seqno_t const donor_seq)
{
    assert(recv_ctx != 0);
    assert(seqno_l > -1);
    assert(req != 0);

    StateRequest* const streq(read_state_request(req, req_size));
    // Guess correct STR protocol version. Here we assume that the
    // replicator protocol version didn't change between sending
    // and receiving STR message. Unfortunately the protocol version
    // is not yet available in STR message.
    int const str_proto_ver(get_str_proto_ver(protocol_version_));

    LocalOrder lo(seqno_l);

    gu_trace(local_monitor_.enter(lo));
    // drain monitors so the donated state is consistent up to donor_seq
    apply_monitor_.drain(donor_seq);

    if (co_mode_ != CommitOrder::BYPASS) commit_monitor_.drain(donor_seq);

    state_.shift_to(S_DONOR);

    // somehow the following does not work, string is initialized beyond
    // the first \0:
    //std::string const req_str(static_cast<const char*>(streq->sst_req()),
    //                          streq->sst_len());
    // have to resort to C ways.

    char* const tmp(strndup(static_cast<const char*>(streq->sst_req()),
                            streq->sst_len()));
    std::string const req_str(tmp);
    free (tmp);

    bool const trivial_sst(sst_is_trivial(streq->sst_req(), streq->sst_len()));
    bool const skip_sst(trivial_sst
                        || no_sst(streq->sst_req(), streq->sst_len()));

    wsrep_seqno_t rcode (0);
    bool join_now = true;

    if (not skip_sst)
    {
        // guards the gcache seqno lock taken below; armed only after a
        // successful seqno_lock(), disarmed if an IST sender takes over
        slg seqno_lock_guard(gcache_);

        if (streq->ist_len())
        {
            IST_request istr;
            get_ist_request(streq, &istr);

            // IST is possible only within the same history and with a
            // known joiner position
            if (istr.uuid() == state_uuid_ && istr.last_applied() >= 0)
            {
                log_info << "IST request: " << istr;

                // from str proto 3 on, start early enough to also
                // preload the certification index
                wsrep_seqno_t const first
                    ((str_proto_ver < 3 || cc_lowest_trx_seqno_ == 0) ?
                     istr.last_applied() + 1 :
                     std::min(cc_lowest_trx_seqno_, istr.last_applied()+1));

                try
                {
                    gcache_.seqno_lock(first);
                    seqno_lock_guard.unlock_ = true;
                }
                catch(gu::NotFound& nf)
                {
                    log_info << "IST first seqno " << istr.last_applied() + 1
                             << " not found from cache, falling back to SST";
                    // @todo: close IST channel explicitly
                    goto full_sst;
                }

                if (streq->sst_len()) // if joiner is waiting for SST, notify it
                {
                    wsrep_gtid_t const state_id =
                        { istr.uuid(), istr.last_applied() };

                    gu_trace(rcode = donate_sst(recv_ctx, *streq, state_id, true));

                    // we will join in sst_sent.
                    join_now = false;
                }

                if (rcode >= 0)
                {
                    rcode = run_ist_senders(ist_senders_,
                                            config_,
                                            istr.peer(),
                                            first,
                                            cc_seqno_,
                                            cc_lowest_trx_seqno_,
                                            /* Historically IST messages are versioned
                                             * with the global replicator protocol.
                                             * Need to keep it that way for backward
                                             * compatibility */
                                            protocol_version_,
                                            seqno_lock_guard,
                                            rcode);
                }
                else
                {
                    log_error << "Failed to bypass SST";
                }

                goto out;
            }
        }

    full_sst:

        assert(!seqno_lock_guard.unlock_);

        if (cert_.nbo_size() > 0)
        {
            log_warn << "Non-blocking operation in progress, cannot donate SST";
            rcode = -EAGAIN;
        }
        else if (streq->sst_len())
        {
            assert(0 == rcode);

            wsrep_gtid_t const state_id = { state_uuid_, donor_seq };

            if (str_proto_ver >= 3)
            {
                if (streq->version() > 0)
                {
                    // v1+ joiner without IST info: cert index preload
                    // cannot be served
                    if (streq->ist_len() <= 0)
                    {
                        if (not trivial_sst)
                        {
                            log_warn << "Joiner didn't provide IST connection "
                                "info - cert. index preload impossible, bailing "
                                "out.";
                            rcode = -ENOMSG;
                        }
                        else
                        {
                            /* don't warn about trivial SST requests: such nodes
                             * are not supposed to fully join the cluster, e,g,
                             * garbd */
                        }
                        goto out;
                    }

                    wsrep_seqno_t preload_start(cc_lowest_trx_seqno_);

                    try
                    {
                        if (preload_start <= 0)
                        {
                            preload_start = cc_seqno_;
                        }

                        gcache_.seqno_lock(preload_start);
                        seqno_lock_guard.unlock_ = true;
                    }
                    catch (gu::NotFound& nf)
                    {
                        log_warn << "Cert index preload first seqno "
                                 << preload_start
                                 << " not found from gcache (min available: "
                                 << gcache_.seqno_min() << ')';
                        rcode = -ENOMSG;
                        goto out;
                    }

                    log_info << "Cert index preload: " << preload_start
                             << " -> " << cc_seqno_;

                    IST_request istr;
                    get_ist_request(streq, &istr);
                    // Send trxs to rebuild cert index.
                    rcode = run_ist_senders(ist_senders_,
                                            config_,
                                            istr.peer(),
                                            preload_start,
                                            cc_seqno_,
                                            preload_start,
                                            /* Historically IST messages are versioned
                                             * with the global replicator protocol.
                                             * Need to keep it that way for backward
                                             * compatibility */
                                            protocol_version_,
                                            seqno_lock_guard,
                                            rcode);
                    if (rcode < 0) goto out;
                }
                else /* streq->version() == 0 */
                {
                    log_info << "STR v0: assuming backup request, skipping "
                        "cert. index preload.";
                }
            }

            rcode = donate_sst(recv_ctx, *streq, state_id, false);
            // we will join in sst_sent.
            join_now = false;
        }
        else
        {
            log_warn << "SST request is null, SST canceled.";
            rcode = -ECANCELED;
        }
    }

out:
    delete streq;

    local_monitor_.leave(lo);

    if (join_now || rcode < 0)
    {
        gcs_.join(gu::GTID(state_uuid_, donor_seq), rcode);
    }
}
622
623
/* Joiner side: prepare the IST receiver and serialize an IST_request
 * (receiver address + local position) into a malloc'd, NUL-terminated
 * buffer returned via ptr/len. The caller owns and frees the buffer.
 * Throws EPERM when IST is impossible (pre-v3 STR with mismatched
 * history or undefined local seqno) and ENOMEM on allocation failure. */
void
ReplicatorSMM::prepare_for_IST (void*& ptr, ssize_t& len,
                                int const group_proto_ver,
                                int const str_proto_ver,
                                const wsrep_uuid_t& group_uuid,
                                wsrep_seqno_t const last_needed)
{
    assert(group_uuid != GU_UUID_NIL);
    // Up from STR protocol version 3 joiner is assumed to be able receive
    // some transactions to rebuild cert index, so IST receiver must be
    // prepared regardless of the group.
    wsrep_seqno_t last_applied(last_committed());
    ist_event_queue_.reset();
    if (state_uuid_ != group_uuid)
    {
        if (str_proto_ver < 3)
        {
            gu_throw_error (EPERM) << "Local state UUID (" << state_uuid_
                                   << ") does not match group state UUID ("
                                   << group_uuid << ')';
        }
        else
        {
            last_applied = -1; // to cause full SST
        }
    }
    else
    {
        assert(last_applied < last_needed);
    }

    if (last_applied < 0 && str_proto_ver < 3)
    {
        gu_throw_error (EPERM) << "Local state seqno is undefined";
    }

    wsrep_seqno_t const first_needed(last_applied + 1);

    log_info << "####### IST uuid:" << state_uuid_ << " f: " << first_needed
             << ", l: " << last_needed << ", STRv: " << str_proto_ver; //remove

    /* Historically IST messages are versioned with the global replicator
     * protocol. Need to keep it that way for backward compatibility */
    std::string recv_addr(ist_receiver_.prepare(first_needed, last_needed,
                                                group_proto_ver, source_id()));

    std::ostringstream os;

    /* NOTE: in case last_applied is -1, first_needed is 0, but first legal
     * cached seqno is 1 so donor will revert to SST anyways, as is required */
    os << IST_request(recv_addr, state_uuid_, last_applied, last_needed);

    char* str = strdup (os.str().c_str());

    // cppcheck-suppress nullPointer
    if (!str) gu_throw_error (ENOMEM) << "Failed to allocate IST buffer.";

    log_debug << "Prepared IST request: " << str;

    len = strlen(str) + 1;

    ptr = str;
}
687
688
/* Build a versioned state request combining the application's SST
 * request with a freshly prepared IST request. With ongoing NBOs the
 * SST part is dropped (node can receive IST only). Never returns on
 * failure: logs fatally and aborts the process, since without a state
 * request the node cannot continue. */
ReplicatorSMM::StateRequest*
ReplicatorSMM::prepare_state_request (const void* sst_req,
                                      ssize_t sst_req_len,
                                      int const group_proto_ver,
                                      int const str_proto_ver,
                                      const wsrep_uuid_t& group_uuid,
                                      wsrep_seqno_t const last_needed_seqno)
{
    try
    {
        // IF there are ongoing NBO, SST might not be possible because
        // ongoing NBO is blocking and waiting for NBO end events.
        // Therefore in presence of ongoing NBOs we set SST request
        // string to zero and hope that donor can serve IST.

        size_t const nbo_size(cert_.nbo_size());
        if (nbo_size)
        {
            log_info << "Non-blocking operation is ongoing. "
                "Node can receive IST only.";

            sst_req = NULL;
            sst_req_len = 0;
        }

        switch (str_proto_ver)
        {
        case 0:
            if (0 == sst_req_len)
                gu_throw_error(EPERM) << "SST is not possible.";
            return new StateRequest_v0 (sst_req, sst_req_len);
        case 1:
        case 2:
        case 3:
        {
            void* ist_req(0);
            ssize_t ist_req_len(0);

            try
            {
                // Note: IST uses group protocol version.
                gu_trace(prepare_for_IST (ist_req, ist_req_len,
                                          group_proto_ver,
                                          str_proto_ver,
                                          group_uuid, last_needed_seqno));
                assert(ist_req_len > 0);
                assert(NULL != ist_req);
            }
            catch (gu::Exception& e)
            {
                log_warn
                    << "Failed to prepare for incremental state transfer: "
                    << e.what() << ". IST will be unavailable.";

                if (0 == sst_req_len)
                    gu_throw_error(EPERM) << "neither SST nor IST is possible.";
            }

            // v1 ctor copies both blobs; the IST buffer can be freed here
            StateRequest* ret = new StateRequest_v1 (sst_req, sst_req_len,
                                                     ist_req, ist_req_len);
            free (ist_req);
            return ret;
        }
        default:
            gu_throw_fatal << "Unsupported STR protocol: " << str_proto_ver;
        }
    }
    catch (std::exception& e)
    {
        log_fatal << "State Transfer Request preparation failed: " << e.what()
                  << " Can't continue, aborting.";
    }
    catch (...)
    {
        log_fatal << "State Transfer Request preparation failed: "
            "unknown exception. Can't continue, aborting.";
    }
    abort();
}
768
// A state transfer request should be retried when the group returned
// EAGAIN (no donor available right now) or ENOTCONN (not connected yet).
static bool
retry_str(int ret)
{
    return (-EAGAIN == ret) || (-ENOTCONN == ret);
}
774
/* Send the state transfer request to the group, retrying every
 * sst_retry_sec_ seconds on transient failures (see retry_str()).
 * Each attempt is assigned a local seqno which must be self-cancelled
 * in the local monitor. Aborts the process on unrecoverable failure
 * while the node is still open. */
void
ReplicatorSMM::send_state_request (const StateRequest* const req,
                                   int const str_proto_ver)
{
    long ret;
    long tries = 0;

    gu_uuid_t ist_uuid = {{0, }};
    gcs_seqno_t ist_seqno = GCS_SEQNO_ILL;

    if (req->ist_len())
    {
        IST_request istr;
        get_ist_request(req, &istr);
        ist_uuid = istr.uuid();
        ist_seqno = istr.last_applied();
    }

    do
    {
        tries++;

        // NOTE(review): assumed to be set (possibly to GCS_SEQNO_ILL) by
        // request_state_transfer() even on failure - confirm in gcs API
        gcs_seqno_t seqno_l;

        ret = gcs_.request_state_transfer(str_proto_ver,
                                          req->req(), req->len(), sst_donor_,
                                          gu::GTID(ist_uuid, ist_seqno),seqno_l);
        if (ret < 0)
        {
            if (!retry_str(ret))
            {
                // permanent failure: log, fall through and leave the loop
                log_error << "Requesting state transfer failed: "
                          << ret << "(" << strerror(-ret) << ")";
            }
            else if (1 == tries)
            {
                // transient failure: announce the retry policy only once
                log_info << "Requesting state transfer failed: "
                         << ret << "(" << strerror(-ret) << "). "
                         << "Will keep retrying every " << sst_retry_sec_
                         << " second(s)";
            }
        }

        if (seqno_l != GCS_SEQNO_ILL)
        {
            /* Check that we're not running out of space in monitor. */
            if (local_monitor_.would_block(seqno_l))
            {
                log_error << "Slave queue grew too long while trying to "
                          << "request state transfer " << tries << " time(s). "
                          << "Please make sure that there is "
                          << "at least one fully synced member in the group. "
                          << "Application must be restarted.";
                ret = -EDEADLK;
            }
            else
            {
                // we are already holding local monitor
                LocalOrder lo(seqno_l);
                local_monitor_.self_cancel(lo);
            }
        }
    }
    while (retry_str(ret) && (usleep(sst_retry_sec_ * 1000000), true));

    if (ret >= 0)
    {
        if (1 == tries)
        {
            log_info << "Requesting state transfer: success, donor: " << ret;
        }
        else
        {
            log_info << "Requesting state transfer: success after "
                     << tries << " tries, donor: " << ret;
        }
    }
    else
    {
        sst_state_ = SST_REQ_FAILED;

        // persist a safe recovery position before possibly aborting
        st_.set(state_uuid_, last_committed(), safe_to_bootstrap_);
        st_.mark_safe();

        gu::Lock lock(closing_mutex_);

        if (!closing_ && state_() > S_CLOSED)
        {
            log_fatal << "State transfer request failed unrecoverably: "
                      << -ret << " (" << strerror(-ret) << "). Most likely "
                      << "it is due to inability to communicate with the "
                      << "cluster primary component. Restart required.";
            abort();
        }
        else
        {
            // connection is being closed, send failure is expected
        }
    }
}
875
876
877 void
request_state_transfer(void * recv_ctx,int const group_proto_ver,const wsrep_uuid_t & group_uuid,wsrep_seqno_t const cc_seqno,const void * const sst_req,ssize_t const sst_req_len)878 ReplicatorSMM::request_state_transfer (void* recv_ctx,
879 int const group_proto_ver,
880 const wsrep_uuid_t& group_uuid,
881 wsrep_seqno_t const cc_seqno,
882 const void* const sst_req,
883 ssize_t const sst_req_len)
884 {
885 assert(sst_req_len >= 0);
886 int const str_proto_ver(get_str_proto_ver(group_proto_ver));
887
888 StateRequest* const req(prepare_state_request(sst_req, sst_req_len,
889 group_proto_ver,
890 str_proto_ver,
891 group_uuid, cc_seqno));
892 gu::Lock sst_lock(sst_mutex_);
893 sst_received_ = false;
894
895 st_.mark_unsafe();
896
897 GU_DBUG_SYNC_WAIT("before_send_state_request");
898 send_state_request(req, str_proto_ver);
899
900 state_.shift_to(S_JOINING);
901 sst_state_ = SST_WAIT;
902 sst_seqno_ = WSREP_SEQNO_UNDEFINED;
903 GU_DBUG_SYNC_WAIT("after_shift_to_joining");
904
905 /* There are two places where we may need to adjust GCache.
906 * This is the first one, which we can do while waiting for SST to complete.
907 * Here we reset seqno map completely if we have different histories.
908 * This MUST be done before IST starts. */
909 bool const first_reset
910 (state_uuid_ /* GCache has */ != group_uuid /* current PC has */);
911 if (first_reset)
912 {
913 log_info << "Resetting GCache seqno map due to different histories.";
914 gcache_.seqno_reset(gu::GTID(group_uuid, cc_seqno));
915 }
916
917 if (sst_req_len != 0)
918 {
919 if (sst_is_trivial(sst_req, sst_req_len) ||
920 no_sst (sst_req, sst_req_len))
921 {
922 sst_uuid_ = group_uuid;
923 sst_seqno_ = cc_seqno;
924 sst_received_ = true;
925 }
926 else
927 {
928 while (false == sst_received_) sst_lock.wait(sst_cond_);
929 }
930
931 if (sst_uuid_ != group_uuid)
932 {
933 log_fatal << "Application received wrong state: "
934 << "\n\tReceived: " << sst_uuid_
935 << "\n\tRequired: " << group_uuid;
936 sst_state_ = SST_FAILED;
937 log_fatal << "Application state transfer failed. This is "
938 << "unrecoverable condition, restart required.";
939
940 st_.set(sst_uuid_, sst_seqno_, safe_to_bootstrap_);
941 st_.mark_safe();
942
943 abort();
944 }
945 else
946 {
947 assert(sst_seqno_ != WSREP_SEQNO_UNDEFINED);
948
949 /* There are two places where we may need to adjust GCache.
950 * This is the second one.
951 * Here we reset seqno map completely if we have gap in seqnos
952 * between the received snapshot and current GCache contents.
953 * This MUST be done before IST starts. */
954 // there may be possible optimization to this when cert index
955 // transfer is implemented (it may close the gap), but not by much.
956 if (!first_reset && (last_committed() /* GCache has */ !=
957 sst_seqno_ /* current state has */))
958 {
959 log_info << "Resetting GCache seqno map due to seqno gap: "
960 << last_committed() << ".." << sst_seqno_;
961 gcache_.seqno_reset(gu::GTID(sst_uuid_, sst_seqno_));
962 }
963
964 update_state_uuid (sst_uuid_);
965
966 if (group_proto_ver < PROTO_VER_GALERA_3_MAX)
967 {
968 log_error << "Rolling upgrade from group protocol version "
969 << "earlier than "
970 << PROTO_VER_GALERA_3_MAX
971 << " is not supported. Please upgrade "
972 << "Galera library to latest in Galera 3 series on "
973 << "all of the nodes in the cluster before "
974 << "continuing.";
975 abort();
976 }
977 else if (group_proto_ver == PROTO_VER_GALERA_3_MAX)
978 {
979 // Rolling upgrade from Galera 3 PROTO_VER_GALERA_3_MAX.
980 gu::GTID const cert_position
981 (sst_uuid_, std::max(cc_seqno, sst_seqno_));
982 cert_.assign_initial_position(
983 cert_position,
984 std::get<0>(get_trx_protocol_versions(group_proto_ver)));
985 // with higher versions this happens in cert index preload
986 }
987
988 apply_monitor_.set_initial_position(WSREP_UUID_UNDEFINED, -1);
989 apply_monitor_.set_initial_position(sst_uuid_, sst_seqno_);
990
991 if (co_mode_ != CommitOrder::BYPASS)
992 {
993 commit_monitor_.set_initial_position(WSREP_UUID_UNDEFINED, -1);
994 commit_monitor_.set_initial_position(sst_uuid_, sst_seqno_);
995 }
996
997 log_info << "Installed new state from SST: " << state_uuid_ << ":"
998 << sst_seqno_;
999 }
1000 }
1001 else
1002 {
1003 assert (state_uuid_ == group_uuid);
1004 sst_seqno_ = last_committed();
1005 }
1006
1007 if (st_.corrupt())
1008 {
1009 if (sst_req_len != 0 && !sst_is_trivial(sst_req, sst_req_len))
1010 {
1011 // Note: not storing sst seqno in state file to avoid
1012 // recovering to incorrect state if the node is
1013 // killed during IST.
1014 st_.mark_uncorrupt(sst_uuid_, WSREP_SEQNO_UNDEFINED);
1015 }
1016 else
1017 {
1018 log_fatal << "Application state is corrupt and cannot "
1019 << "be recovered. Restart required.";
1020 abort();
1021 }
1022 }
1023 else
1024 {
1025 // Clear seqno from state file. Otherwise if node gets killed
1026 // during IST, it may recover to incorrect position.
1027 st_.set(state_uuid_, WSREP_SEQNO_UNDEFINED, safe_to_bootstrap_);
1028 st_.mark_safe();
1029 }
1030
1031 if (req->ist_len() > 0)
1032 {
1033 if (state_uuid_ != group_uuid)
1034 {
1035 log_fatal << "Sanity check failed: my state UUID " << state_uuid_
1036 << " is different from group state UUID " << group_uuid
1037 << ". Can't continue with IST. Aborting.";
1038 st_.set(state_uuid_, last_committed(), safe_to_bootstrap_);
1039 st_.mark_safe();
1040 abort();
1041 }
1042
1043 // IST is prepared only with str proto ver 1 and above
1044 // IST is *always* prepared at str proto ver 3 or higher
1045 if (last_committed() < cc_seqno || str_proto_ver >= 3)
1046 {
1047 wsrep_seqno_t const ist_from(last_committed() + 1);
1048 wsrep_seqno_t const ist_to(cc_seqno);
1049 bool const do_ist(ist_from > 0 && ist_from <= ist_to);
1050
1051 if (do_ist)
1052 {
1053 log_info << "Receiving IST: " << (ist_to - ist_from + 1)
1054 << " writesets, seqnos " << ist_from
1055 << "-" << ist_to;
1056 }
1057 else
1058 {
1059 log_info << "Cert. index preload up to "
1060 << ist_from - 1;
1061 }
1062
1063 ist_receiver_.ready(ist_from);
1064 recv_IST(recv_ctx);
1065
1066 wsrep_seqno_t const ist_seqno(ist_receiver_.finished());
1067
1068 if (do_ist)
1069 {
1070 assert(ist_seqno > sst_seqno_); // must exceed sst_seqno_
1071 sst_seqno_ = ist_seqno;
1072
1073 // Note: apply_monitor_ must be drained to avoid race between
1074 // IST appliers and GCS appliers, GCS action source may
1075 // provide actions that have already been applied via IST.
1076 log_info << "Draining apply monitors after IST up to "
1077 << sst_seqno_;
1078 apply_monitor_.drain(sst_seqno_);
1079 set_initial_position(group_uuid, sst_seqno_);
1080 }
1081 else
1082 {
1083                 assert(sst_seqno_ > 0); // must have been established via SST
1084 assert(ist_seqno >= cc_seqno); // index must be rebuilt up to
1085 assert(ist_seqno <= sst_seqno_);
1086 }
1087
1088 if (ist_seqno == sst_seqno_)
1089 {
1090 log_info << "IST received: " << state_uuid_ << ":" <<ist_seqno;
1091 if (str_proto_ver < 3)
1092 {
1093 // see cert_.assign_initial_position() above
1094 assert(cc_seqno == ist_seqno);
1095 assert(cert_.lowest_trx_seqno() == ist_seqno);
1096 }
1097 }
1098 else
1099 log_info << "Cert. index preloaded up to " << ist_seqno;
1100 }
1101 else
1102 {
1103 (void)ist_receiver_.finished();
1104 }
1105 }
1106 else
1107 {
1108 // full SST can't be in the past
1109 assert(sst_seqno_ >= cc_seqno);
1110 }
1111
1112 #ifndef NDEBUG
1113 {
1114 gu::Lock lock(closing_mutex_);
1115 assert(sst_seqno_ >= cc_seqno || closing_ || state_() == S_CLOSED);
1116 }
1117 #endif /* NDEBUG */
1118
1119 delete req;
1120 }
1121
process_IST_writeset(void * recv_ctx,const TrxHandleSlavePtr & ts_ptr)1122 void ReplicatorSMM::process_IST_writeset(void* recv_ctx,
1123 const TrxHandleSlavePtr& ts_ptr)
1124 {
1125 TrxHandleSlave& ts(*ts_ptr);
1126
1127 assert(ts.global_seqno() > 0);
1128 assert(ts.state() != TrxHandle::S_COMMITTED);
1129 assert(ts.state() != TrxHandle::S_ROLLED_BACK);
1130
1131 bool const skip(ts.is_dummy());
1132
1133 if (gu_likely(!skip))
1134 {
1135 ts.verify_checksum();
1136
1137 assert(ts.certified());
1138 assert(ts.depends_seqno() >= 0);
1139 }
1140 else
1141 {
1142 assert(ts.is_dummy());
1143 }
1144
1145 try
1146 {
1147 apply_trx(recv_ctx, ts);
1148 }
1149 catch (...)
1150 {
1151 st_.mark_corrupt();
1152 throw;
1153 }
1154 GU_DBUG_SYNC_WAIT("recv_IST_after_apply_trx");
1155
1156 if (gu_unlikely
1157 (gu::Logger::no_log(gu::LOG_DEBUG) == false))
1158 {
1159 std::ostringstream os;
1160
1161 if (gu_likely(!skip))
1162 os << "IST received trx body: " << ts;
1163 else
1164 os << "IST skipping trx " << ts.global_seqno();
1165
1166 log_debug << os.str();
1167 }
1168 }
1169
1170
// IST applier loop: consumes events from ist_event_queue_ until an EOF
// (T_NULL) event is seen or a write set carries the exit_loop flag.
// Write sets are applied via process_IST_writeset(); view events are
// submitted and the corresponding TO monitors (entered in
// process_ist_conf_change()) are released here. On failure the node is
// put into closing state, since IST cannot be resumed.
void ReplicatorSMM::recv_IST(void* recv_ctx)
{
    ISTEvent::Type event_type(ISTEvent::T_NULL);
    TrxHandleSlavePtr ts;
    wsrep_view_info_t* view;

    try
    {
        bool exit_loop(false);

        while (exit_loop == false)
        {
            ISTEvent ev(ist_event_queue_.pop_front());
            event_type = ev.type();
            switch (event_type)
            {
            case ISTEvent::T_NULL:
                // EOF marker pushed by ist_end().
                exit_loop = true;
                continue;
            case ISTEvent::T_TRX:
                ts = ev.ts();
                assert(ts);
                process_IST_writeset(recv_ctx, ts);
                exit_loop = ts->exit_loop();
                continue;
            case ISTEvent::T_VIEW:
            {
                view = ev.view();
                wsrep_seqno_t const cs(view->state_id.seqno);

                submit_view_info(recv_ctx, view);

                // View was queued by process_ist_conf_change(); it is
                // released here with ::free() (C allocation, see also the
                // preload path in ist_cc()).
                ::free(view);

                // Leave the TO monitors entered in
                // process_ist_conf_change(): the view has now been
                // submitted in isolation.
                CommitOrder co(cs, CommitOrder::NO_OOOC);
                commit_monitor_.leave(co);
                ApplyOrder ao(cs, cs - 1, false);
                apply_monitor_.leave(ao);
                GU_DBUG_SYNC_WAIT("recv_IST_after_conf_change");
                continue;
            }
            }

            // Reached only if the switch above did not match a known type.
            gu_throw_fatal << "Unrecognized event of type " << ev.type();
        }
    }
    catch (gu::Exception& e)
    {
        std::ostringstream os;
        os << "Receiving IST failed, node restart required: " << e.what();

        // Annotate the failure with the last event type seen.
        switch (event_type)
        {
        case ISTEvent::T_NULL:
            os << ". Null event.";
            break;
        case ISTEvent::T_TRX:
            if (ts)
                os << ". Failed writeset: " << *ts;
            else
                os << ". Corrupt IST event queue.";
            break;
        case ISTEvent::T_VIEW:
            os << ". VIEW event";
            break;
        }

        log_fatal << os.str();

        // IST failure is unrecoverable: initiate orderly shutdown.
        gu::Lock lock(closing_mutex_);
        start_closing();
    }
}
1244
// Handle an NBO (non-blocking operation) begin/end write set received
// via IST. Events which must be applied are certified here; effective
// NBO end events only signal the waiting NBO context and are NOT pushed
// to the applier queue. Pure preload events just advance the
// certification position.
void ReplicatorSMM::handle_ist_nbo(const TrxHandleSlavePtr& ts,
                                   bool must_apply, bool preload)
{
    if (must_apply)
    {
        ts->verify_checksum();
        Certification::TestResult result(cert_.append_trx(ts));
        switch (result)
        {
        case Certification::TEST_OK:
            if (ts->nbo_end())
            {
                // This is the same as in process_trx()
                if (ts->ends_nbo() == WSREP_SEQNO_UNDEFINED)
                {
                    // NBO end did not close any NBO (dummy write set),
                    // nothing to signal; falls through to the queue below.
                    assert(ts->is_dummy());
                }
                else
                {
                    // Signal NBO waiter
                    gu::shared_ptr<NBOCtx>::type nbo_ctx(
                        cert_.nbo_ctx(ts->ends_nbo()));
                    assert(nbo_ctx != 0);
                    nbo_ctx->set_ts(ts);
                    return; // not pushing to queue below
                }
            }
            break;
        case Certification::TEST_FAILED:
        {
            assert(ts->nbo_end()); // non-effective nbo_end
            assert(ts->is_dummy());
            break;
        }
        }
        /* regardless of certification outcome, event must be passed to
         * apply_trx() as it carries global seqno */
    }
    else
    {
        // Skipping NBO events in preload is fine since the joiner either
        // has all the events applied in case of pure IST or the
        // donor refuses to donate SST from the position with active NBO.
        assert(preload);
        log_debug << "Skipping NBO event: " << ts;
        wsrep_seqno_t const pos(cert_.increment_position());
        assert(ts->global_seqno() == pos);
        (void)pos;
    }
    if (gu_likely(must_apply == true))
    {
        ist_event_queue_.push_back(ts);
    }
}
1299
1300 // Append IST trx to certification index. As trx has passed certification
1301 // on donor, certification is expected to pass. If it fails, exception
1302 // is thrown as the state is unrecoverable.
append_ist_trx(galera::Certification & cert,const TrxHandleSlavePtr & ts)1303 static void append_ist_trx(galera::Certification& cert,
1304 const TrxHandleSlavePtr& ts)
1305 {
1306 Certification::TestResult result(cert.append_trx(ts));
1307 if (result != Certification::TEST_OK)
1308 {
1309 gu_throw_fatal << "Pre IST trx append returned unexpected "
1310 << "certification result " << result
1311 << ", expected " << Certification::TEST_OK
1312 << "must abort to maintain consistency, "
1313 << " cert position: " << cert.position()
1314 << " ts: " << *ts;
1315 }
1316 }
1317
handle_ist_trx_preload(const TrxHandleSlavePtr & ts,bool const must_apply)1318 void ReplicatorSMM::handle_ist_trx_preload(const TrxHandleSlavePtr& ts,
1319 bool const must_apply)
1320 {
1321 if (not ts->is_dummy())
1322 {
1323 append_ist_trx(cert_, ts);
1324 if (not must_apply)
1325 {
1326 // Pure preload event will not get applied, so mark it committed
1327 // here for certification bookkeeping.
1328 cert_.set_trx_committed(*ts);
1329 }
1330 }
1331 else if (cert_.position() != WSREP_SEQNO_UNDEFINED)
1332 {
1333 // Increment position to keep track only if the initial
1334 // seqno has already been assigned.
1335 wsrep_seqno_t const pos __attribute__((unused))(
1336 cert_.increment_position());
1337 assert(ts->global_seqno() == pos);
1338 }
1339 }
1340
handle_ist_trx(const TrxHandleSlavePtr & ts,bool must_apply,bool preload)1341 void ReplicatorSMM::handle_ist_trx(const TrxHandleSlavePtr& ts,
1342 bool must_apply, bool preload)
1343 {
1344 if (preload)
1345 {
1346 handle_ist_trx_preload(ts, must_apply);
1347 }
1348 if (must_apply)
1349 {
1350 ist_event_queue_.push_back(ts);
1351 }
1352 }
1353
// Entry point for a single write set received via IST.
//
// @param ts         write set handle, never null
// @param must_apply write set falls in the range this node must apply
// @param preload    write set is part of certification index preload
void ReplicatorSMM::ist_trx(const TrxHandleSlavePtr& ts, bool must_apply,
                            bool preload)
{
    assert(ts != 0);

    assert(ts->depends_seqno() >= 0 || ts->is_dummy() || ts->nbo_end());
    assert(ts->local_seqno() == WSREP_SEQNO_UNDEFINED);
    assert(sst_seqno_ > 0);

    ts->verify_checksum();

    // Note: Write sets which do not have preload or must_apply flag set
    // are used to populate joiner gcache to have enough history
    // to be able to donate IST for following joiners. Therefore
    // they don't need to be applied or used to populate certification
    // index and are just skipped here.
    if (not (preload || must_apply))
    {
        return;
    }

    if (gu_unlikely(cert_.position() == WSREP_SEQNO_UNDEFINED))
    {
        if (not ts->is_dummy())
        {
            // This is the first pre IST event for rebuilding cert index.
            // Note that we skip dummy write sets here as they don't carry
            // version information. If the IST will contain only dummy
            // write sets, the last CC belonging into IST which corresponds
            // to CC which triggered STR, will initialize cert index.
            assert(ts->version() > 0);
            cert_.assign_initial_position(
                /* proper UUID will be installed by CC */
                gu::GTID(gu::UUID(), ts->global_seqno() - 1), ts->version());
        }
    }

    // Move the write set into certifying state before dispatching;
    // certification itself happens in the handlers below.
    assert(ts->state() == TrxHandleSlave::S_REPLICATING);
    ts->set_state(TrxHandleSlave::S_CERTIFYING);

    if (ts->nbo_start() || ts->nbo_end())
    {
        handle_ist_nbo(ts, must_apply, preload);
    }
    else
    {
        handle_ist_trx(ts, must_apply, preload);
    }
}
1403
// Terminate the IST event stream by pushing an EOF marker to the
// applier queue (consumed as a T_NULL event in recv_IST()).
// The error code semantics are defined by the event queue's eof();
// presumably non-zero indicates a failed transfer — see ist_event_queue_.
void ReplicatorSMM::ist_end(int error)
{
    ist_event_queue_.eof(error);
}
1408
// Process an ordered configuration change received as part of IST:
// drain monitors up to the preceding seqno, rebuild view info and
// certification position, then enter the TO monitors and queue the view
// for the applier. The applier (recv_IST()) leaves the monitors and
// frees the view after submitting it.
void galera::ReplicatorSMM::process_ist_conf_change(const gcs_act_cchange& conf)
{
    // IST should contain only ordered CCs
    assert(conf.repl_proto_ver >= PROTO_VER_ORDERED_CC);

    // Drain monitors to make sure that all preceding IST events have
    // been applied.
    drain_monitors(conf.seqno - 1);
    // Create view info. This will be consumed by ist_event_queue_.push_back().
    // NOTE(review): the -1 member-index argument presumably means "own
    // index unknown here" — confirm against galera_view_info_create().
    wsrep_uuid_t uuid_undefined(WSREP_UUID_UNDEFINED);
    wsrep_view_info_t* const view_info
        (galera_view_info_create(conf,
                                 capabilities(conf.repl_proto_ver),
                                 -1, uuid_undefined));
    // IST view status should always be Primary
    assert(view_info->status == WSREP_VIEW_PRIMARY);
    // Establish protocol version before adjusting cert position as
    // trx_params_.version is decided there.
    establish_protocol_versions (conf.repl_proto_ver);
    cert_.adjust_position(*view_info, gu::GTID(conf.uuid, conf.seqno),
                          trx_params_.version_);
    update_incoming_list(*view_info);
    record_cc_seqnos(conf.seqno, "ist");

    // TO monitors need to be entered here to maintain critical
    // section over passing the view through the event queue to
    // an applier and ensure that the view is submitted in isolation.
    // Applier is to leave monitors and free the view after it is
    // submitted.
    ApplyOrder ao(conf.seqno, conf.seqno - 1, false);
    apply_monitor_.enter(ao);
    CommitOrder co(conf.seqno, CommitOrder::NO_OOOC);
    commit_monitor_.enter(co);
    ist_event_queue_.push_back(view_info);
}
1444
ist_cc(const gcs_action & act,bool must_apply,bool preload)1445 void ReplicatorSMM::ist_cc(const gcs_action& act, bool must_apply,
1446 bool preload)
1447 {
1448 assert(GCS_ACT_CCHANGE == act.type);
1449 assert(act.seqno_g > 0);
1450
1451 gcs_act_cchange const conf(act.buf, act.size);
1452
1453 assert(conf.conf_id >= 0); // Primary configuration
1454 assert(conf.seqno == act.seqno_g);
1455
1456 if (gu_unlikely(cert_.position() == WSREP_SEQNO_UNDEFINED) &&
1457 (must_apply || preload))
1458 {
1459 // This is the first IST event for rebuilding cert index,
1460 // need to initialize certification
1461 establish_protocol_versions(conf.repl_proto_ver);
1462 cert_.assign_initial_position(gu::GTID(conf.uuid, conf.seqno - 1),
1463 trx_params_.version_);
1464 }
1465
1466 if (must_apply == true)
1467 {
1468 // Will generate and queue view info. Monitors are handled by
1469 // slave appliers when the view_info is consumed.
1470 process_ist_conf_change(conf);
1471 }
1472 else
1473 {
1474 if (preload == true)
1475 {
1476 wsrep_uuid_t uuid_undefined(WSREP_UUID_UNDEFINED);
1477 wsrep_view_info_t* const view_info(
1478 galera_view_info_create(conf, capabilities(conf.repl_proto_ver),
1479 -1, uuid_undefined));
1480 /* CC is part of index preload but won't be processed
1481 * by process_conf_change()
1482 * Order of these calls is essential: trx_params_.version_ may
1483 * be altered by establish_protocol_versions() */
1484 establish_protocol_versions(conf.repl_proto_ver);
1485 cert_.adjust_position(*view_info, gu::GTID(conf.uuid, conf.seqno),
1486 trx_params_.version_);
1487 // record CC related state seqnos, needed for IST on DONOR
1488 record_cc_seqnos(conf.seqno, "preload");
1489 ::free(view_info);
1490 }
1491 }
1492 }
1493
1494 } /* namespace galera */
1495