1 //
2 // Copyright (C) 2010-2020 Codership Oy <info@codership.com>
3 //
4 
5 #include "replicator_smm.hpp"
6 #include "galera_info.hpp"
7 
8 #include <gu_abort.h>
9 #include <gu_throw.hpp>
10 
11 /*
12  * Decide STR protocol version based on group protocol version.
13  * For more information about versions, see the table in replicator_smm.hpp.
14  */
15 static int get_str_proto_ver(int const group_proto_ver)
16 {
17     switch (group_proto_ver)
18     {
19     case 1:
20         return 0;
21     case 2:
22     case 3:
23     case 4:
24     case 5:
25         return 1;
26     case 6:
27     case 7:
28     case 8:
29     case 9:
30         // GCS intelligent donor selection;
31         // includes handling of a dangling comma in the donor string.
32         return 2;
33     case 10:
34         // 4.x
35         // CC events in IST, certification index preload
36         return 3;
37     default:
38         gu_throw_error(EPROTO)
39             << "Can't find suitable STR protocol version based on "
40             << "group protocol version: " << group_proto_ver;
41     }
42 }
43 
44 namespace galera {
45 
46 bool
47 ReplicatorSMM::state_transfer_required(const wsrep_view_info_t& view_info,
48                                        int const group_proto_ver,
49                                        bool const rejoined)
50 {
51     const int str_proto_ver(get_str_proto_ver(group_proto_ver));
52     if (rejoined)
53     {
54         assert(view_info.view >= 0);
55 
56         if (state_uuid_ == view_info.state_id.uuid) // common history
57         {
58             wsrep_seqno_t const group_seqno(view_info.state_id.seqno);
59             wsrep_seqno_t const local_seqno(last_committed());
60 
61             if (str_proto_ver >= 3)
62                 return (local_seqno + 1 < group_seqno); // this CC will add 1
63             else
64                 return (local_seqno < group_seqno);
65         }
66 
67         return true;
68     }
69 
70     return false;
71 }
72 
73 wsrep_status_t
74 ReplicatorSMM::sst_received(const wsrep_gtid_t& state_id,
75                             const wsrep_buf_t* const state,
76                             int                const rcode)
77 {
78     log_info << "SST received: " << state_id.uuid << ':' << state_id.seqno;
79 
80     gu::Lock lock(sst_mutex_);
81 
82     if (state_() != S_JOINING)
83     {
84         log_error << "not JOINING when sst_received() called, state: "
85                   << state_();
86         return WSREP_CONN_FAIL;
87     }
88 
89     assert(rcode <= 0);
90     if (rcode) { assert(state_id.seqno == WSREP_SEQNO_UNDEFINED); }
91 
92     sst_uuid_  = state_id.uuid;
93     sst_seqno_ = rcode ? WSREP_SEQNO_UNDEFINED : state_id.seqno;
94     assert(false == sst_received_);
95     sst_received_ = true;
96     sst_cond_.signal();
97 
98     return WSREP_OK;
99 }
100 
101 
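// Version 0 state request: the buffer is passed through as the raw SST
// request; there is no IST part, so ist_req()/ist_len() always return 0.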
102 class StateRequest_v0 : public ReplicatorSMM::StateRequest
103 {
104 public:
105     StateRequest_v0 (const void* const sst_req, ssize_t const sst_req_len)
106         : req_(sst_req), len_(sst_req_len)
107     {}
108     ~StateRequest_v0 () {}
109     virtual int         version () const { return 0;    }
110     virtual const void* req     () const { return req_; }
111     virtual ssize_t     len     () const { return len_; }
112     virtual const void* sst_req () const { return req_; }
113     virtual ssize_t     sst_len () const { return len_; }
114     virtual const void* ist_req () const { return 0;    }
115     virtual ssize_t     ist_len () const { return 0;    }
116 private:
117     StateRequest_v0 (const StateRequest_v0&);
118     StateRequest_v0& operator = (const StateRequest_v0&);
119     const void* const req_;
120     ssize_t     const len_;
121 };
122 
123 
124 class StateRequest_v1 : public ReplicatorSMM::StateRequest
125 {
126 public:
127     static std::string const MAGIC;
128     StateRequest_v1 (const void* sst_req, ssize_t sst_req_len,
129                      const void* ist_req, ssize_t ist_req_len);
130     StateRequest_v1 (const void* str, ssize_t str_len);
131     ~StateRequest_v1 () { if (own_ && req_) free (req_); }
132     virtual int         version () const { return 1;    }
133     virtual const void* req     () const { return req_; }
134     virtual ssize_t     len     () const { return len_; }
135     virtual const void* sst_req () const { return req(sst_offset()); }
136     virtual ssize_t     sst_len () const { return len(sst_offset()); }
137     virtual const void* ist_req () const { return req(ist_offset()); }
138     virtual ssize_t     ist_len () const { return len(ist_offset()); }
139 private:
140     StateRequest_v1 (const StateRequest_v1&);
141     StateRequest_v1& operator = (const StateRequest_v1&);
142 
143     ssize_t sst_offset() const { return MAGIC.length() + 1; }
144     ssize_t ist_offset() const
145     {
146         return sst_offset() + sizeof(uint32_t) + sst_len();
147     }
148 
149     ssize_t len (ssize_t offset) const
150     {
151         int32_t ret;
152         gu::unserialize4(req_, offset, ret);
153         return ret;
154     }
155 
156     void*   req (ssize_t offset) const
157     {
158         if (len(offset) > 0)
159             return req_ + offset + sizeof(uint32_t);
160         else
161             return 0;
162     }
163 
164     ssize_t const len_;
165     char*   const req_;
166     bool    const own_;
167 };
168 
169 std::string const
170 StateRequest_v1::MAGIC("STRv1");
171 
172 #ifndef INT32_MAX
173 #define INT32_MAX 0x7fffffff
174 #endif
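/* For reference, the buffer built by the serializing constructor below is
 * laid out as follows (a sketch derived from the serialization code, not a
 * separate normative specification):
 *
 *   "STRv1\0" | uint32 sst_len | sst_req bytes | uint32 ist_len | ist_req bytes
 *
 * The parsing constructor and the req()/len() helpers above read back the
 * same layout. */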
175 
176 StateRequest_v1::StateRequest_v1 (
177     const void* const sst_req, ssize_t const sst_req_len,
178     const void* const ist_req, ssize_t const ist_req_len)
179     :
180     len_(MAGIC.length() + 1 +
181          sizeof(uint32_t) + sst_req_len +
182          sizeof(uint32_t) + ist_req_len),
183     req_(static_cast<char*>(malloc(len_))),
184     own_(true)
185 {
186     if (!req_)
187         gu_throw_error (ENOMEM) << "Could not allocate state request v1";
188 
189     if (sst_req_len > INT32_MAX || sst_req_len < 0)
190         gu_throw_error (EMSGSIZE) << "SST request length (" << sst_req_len
191                                << ") unrepresentable";
192 
193     if (ist_req_len > INT32_MAX || ist_req_len < 0)
194         gu_throw_error (EMSGSIZE) << "IST request length (" << ist_req_len
195                                << ") unrepresentable";
196 
197     char* ptr(req_);
198 
199     strcpy (ptr, MAGIC.c_str());
200     ptr += MAGIC.length() + 1;
201 
202     ptr += gu::serialize4(uint32_t(sst_req_len), ptr, 0);
203 
204     memcpy (ptr, sst_req, sst_req_len);
205     ptr += sst_req_len;
206 
207     ptr += gu::serialize4(uint32_t(ist_req_len), ptr, 0);
208 
209     memcpy (ptr, ist_req, ist_req_len);
210 
211     assert ((ptr - req_) == (len_ - ist_req_len));
212 }
213 
214 // takes ownership over str buffer
215 StateRequest_v1::StateRequest_v1 (const void* str, ssize_t str_len)
216 :
217     len_(str_len),
218     req_(static_cast<char*>(const_cast<void*>(str))),
219     own_(false)
220 {
221     if (sst_offset() + 2*sizeof(uint32_t) > size_t(len_))
222     {
223         assert(0);
224         gu_throw_error (EINVAL) << "State transfer request is too short: "
225                                 << len_ << ", must be at least: "
226                                 << (sst_offset() + 2*sizeof(uint32_t));
227     }
228 
229     if (strncmp (req_, MAGIC.c_str(), MAGIC.length()))
230     {
231         assert(0);
232         gu_throw_error (EINVAL) << "Wrong magic signature in state request v1.";
233     }
234 
235     if (sst_offset() + sst_len() + 2*sizeof(uint32_t) > size_t(len_))
236     {
237         gu_throw_error (EINVAL) << "Malformed state request v1: sst length: "
238                                 << sst_len() << ", total length: " << len_;
239     }
240 
241     if (ist_offset() + ist_len() + sizeof(uint32_t) != size_t(len_))
242     {
243         gu_throw_error (EINVAL) << "Malformed state request v1: parsed field "
244             "lengths (sst: " << sst_len() << ", ist: " << ist_len()
245             << ") are inconsistent with total request length " << len_;
246     }
247 }
248 
249 
250 static ReplicatorSMM::StateRequest*
251 read_state_request (const void* const req, size_t const req_len)
252 {
253     const char* const str(static_cast<const char*>(req));
254 
255     bool const v1(req_len > StateRequest_v1::MAGIC.length() &&
256                   !strncmp(str, StateRequest_v1::MAGIC.c_str(),
257                            StateRequest_v1::MAGIC.length()));
258 
259     log_info << "Detected STR version: " << int(v1) << ", req_len: "
260              << req_len << ", req: " << str;
261     if (v1)
262     {
263         return (new StateRequest_v1(req, req_len));
264     }
265     else
266     {
267         return (new StateRequest_v0(req, req_len));
268     }
269 }
270 
271 
272 class IST_request
273 {
274 public:
275     IST_request() : peer_(), uuid_(), last_applied_(), group_seqno_() { }
276     IST_request(const std::string& peer,
277                 const wsrep_uuid_t& uuid,
278                 wsrep_seqno_t last_applied,
279                 wsrep_seqno_t last_missing_seqno)
280         :
281         peer_(peer),
282         uuid_(uuid),
283         last_applied_(last_applied),
284         group_seqno_(last_missing_seqno)
285     { }
286     const std::string&  peer()  const { return peer_ ; }
287     const wsrep_uuid_t& uuid()  const { return uuid_ ; }
288     wsrep_seqno_t       last_applied() const { return last_applied_; }
289     wsrep_seqno_t       group_seqno()  const { return group_seqno_; }
290 private:
291     friend std::ostream& operator<<(std::ostream&, const IST_request&);
292     friend std::istream& operator>>(std::istream&, IST_request&);
293     std::string peer_;
294     wsrep_uuid_t uuid_;
295     wsrep_seqno_t last_applied_;
296     wsrep_seqno_t group_seqno_;
297 };
298 
299 std::ostream& operator<<(std::ostream& os, const IST_request& istr)
300 {
301     return (os
302             << istr.uuid_         << ":"
303             << istr.last_applied_ << "-"
304             << istr.group_seqno_  << "|"
305             << istr.peer_);
306 }
307 
308 std::istream& operator>>(std::istream& is, IST_request& istr)
309 {
310     char c;
311     return (is >> istr.uuid_ >> c >> istr.last_applied_
312             >> c >> istr.group_seqno_ >> c >> istr.peer_);
313 }
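/* The IST request travels between nodes as a plain text string of the form
 *   <uuid>:<last_applied>-<group_seqno>|<peer>
 * produced and parsed by the stream operators above; <peer> is the IST
 * receiver address prepared by the joiner in prepare_for_IST(). */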
314 
315 static void
316 get_ist_request(const ReplicatorSMM::StateRequest* str, IST_request* istr)
317 {
318   assert(str->ist_len());
319   std::string ist_str(static_cast<const char*>(str->ist_req()),
320                       str->ist_len());
321   std::istringstream is(ist_str);
322   is >> *istr;
323 }
324 
325 static bool
326 sst_is_trivial (const void* const req, size_t const len)
327 {
328     /* Check that the first string in request == ReplicatorSMM::TRIVIAL_SST */
329     static size_t const trivial_len(strlen(ReplicatorSMM::TRIVIAL_SST) + 1);
330     return (len >= trivial_len &&
331             !::memcmp(req, ReplicatorSMM::TRIVIAL_SST, trivial_len));
332 }
333 
334 static bool
335 no_sst (const void* const req, size_t const len)
336 {
337     /* Check that the first string in request == ReplicatorSMM::NO_SST */
338     static size_t const no_len(strlen(ReplicatorSMM::NO_SST) + 1);
339     return (len >= no_len &&
340             !::memcmp(req, ReplicatorSMM::NO_SST, no_len));
341 }
342 
343 wsrep_seqno_t
344 ReplicatorSMM::donate_sst(void* const         recv_ctx,
345                           const StateRequest& streq,
346                           const wsrep_gtid_t& state_id,
347                           bool const          bypass)
348 {
349     wsrep_buf_t const str = { streq.sst_req(), size_t(streq.sst_len()) };
350 
351     wsrep_cb_status const err(sst_donate_cb_(app_ctx_, recv_ctx, &str,
352                                              &state_id, NULL, bypass));
353 
354     wsrep_seqno_t const ret
355         (WSREP_CB_SUCCESS == err ? state_id.seqno : -ECANCELED);
356 
357     if (ret < 0)
358     {
359         log_error << "SST " << (bypass ? "bypass " : "") << "failed: " << err;
360     }
361 
362     return ret;
363 }
364 
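/* Scope guard for a GCache seqno lock: if unlock_ is set, the lock taken with
 * gcache_.seqno_lock() is released when the guard goes out of scope. On a
 * successful IST sender start the flag is cleared (see run_ist_senders()
 * below), since the lock is then released when the sender exits. */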
365 struct slg
366 {
367     gcache::GCache& gcache_;
368     bool            unlock_;
369 
370     slg(gcache::GCache& cache) : gcache_(cache), unlock_(false){}
371     ~slg() { if (unlock_) gcache_.seqno_unlock(); }
372 };
373 
374 static wsrep_seqno_t run_ist_senders(ist::AsyncSenderMap& ist_senders,
375                                      const gu::Config&    config,
376                                      const std::string&   peer,
377                                      wsrep_seqno_t const  preload_start,
378                                      wsrep_seqno_t const  cc_seqno,
379                                      wsrep_seqno_t const  cc_lowest,
380                                      int const            proto_ver,
381                                      slg&                 seqno_lock_guard,
382                                      wsrep_seqno_t const  rcode)
383 {
384     try
385     {
386         ist_senders.run(config,
387                         peer,
388                         preload_start,
389                         cc_seqno,
390                         cc_lowest,
391                         proto_ver);
392         // seqno will be unlocked when the sender exits
393         seqno_lock_guard.unlock_ = false;
394         return rcode;
395     }
396     catch (gu::Exception& e)
397     {
398         log_warn << "IST failed: " << e.what();
399         return -e.get_errno();
400     }
401 }
402 
403 void ReplicatorSMM::process_state_req(void*       recv_ctx,
404                                       const void* req,
405                                       size_t      req_size,
406                                       wsrep_seqno_t const seqno_l,
407                                       wsrep_seqno_t const donor_seq)
408 {
409     assert(recv_ctx != 0);
410     assert(seqno_l > -1);
411     assert(req != 0);
412 
413     StateRequest* const streq(read_state_request(req, req_size));
414     // Guess correct STR protocol version. Here we assume that the
415     // replicator protocol version didn't change between sending
416     // and receiving STR message. Unfortunately the protocol version
417     // is not yet available in STR message.
418     int const str_proto_ver(get_str_proto_ver(protocol_version_));
419 
420     LocalOrder lo(seqno_l);
421 
422     gu_trace(local_monitor_.enter(lo));
423     apply_monitor_.drain(donor_seq);
424 
425     if (co_mode_ != CommitOrder::BYPASS) commit_monitor_.drain(donor_seq);
426 
427     state_.shift_to(S_DONOR);
428 
429     // Somehow the following does not work: the string gets initialized
430     // beyond the first \0:
431     //std::string const req_str(static_cast<const char*>(streq->sst_req()),
432     //                          streq->sst_len());
433     // Have to resort to C ways instead.
434 
435     char* const tmp(strndup(static_cast<const char*>(streq->sst_req()),
436                             streq->sst_len()));
437     std::string const req_str(tmp);
438     free (tmp);
439 
440     bool const trivial_sst(sst_is_trivial(streq->sst_req(), streq->sst_len()));
441     bool const skip_sst(trivial_sst
442                         || no_sst(streq->sst_req(), streq->sst_len()));
443 
444     wsrep_seqno_t rcode (0);
445     bool join_now = true;
446 
447     if (not skip_sst)
448     {
449         slg seqno_lock_guard(gcache_);
450 
451         if (streq->ist_len())
452         {
453             IST_request istr;
454             get_ist_request(streq, &istr);
455 
456             if (istr.uuid() == state_uuid_ && istr.last_applied() >= 0)
457             {
458                 log_info << "IST request: " << istr;
459 
460                 wsrep_seqno_t const first
461                     ((str_proto_ver < 3 || cc_lowest_trx_seqno_ == 0) ?
462                     istr.last_applied() + 1 :
463                     std::min(cc_lowest_trx_seqno_, istr.last_applied()+1));
464 
465                 try
466                 {
467                     gcache_.seqno_lock(first);
468                     seqno_lock_guard.unlock_ = true;
469                 }
470                 catch(gu::NotFound& nf)
471                 {
472                     log_info << "IST first seqno " << istr.last_applied() + 1
473                              << " not found from cache, falling back to SST";
474                     // @todo: close IST channel explicitly
475                     goto full_sst;
476                 }
477 
478                 if (streq->sst_len()) // if joiner is waiting for SST, notify it
479                 {
480                     wsrep_gtid_t const state_id =
481                         { istr.uuid(), istr.last_applied() };
482 
483                     gu_trace(rcode = donate_sst(recv_ctx, *streq, state_id, true));
484 
485                     // we will join in sst_sent.
486                     join_now = false;
487                 }
488 
489                 if (rcode >= 0)
490                 {
491                     rcode = run_ist_senders(ist_senders_,
492                                             config_,
493                                             istr.peer(),
494                                             first,
495                                             cc_seqno_,
496                                             cc_lowest_trx_seqno_,
497                         /* Historically IST messages are versioned
498                          * with the global replicator protocol.
499                          * Need to keep it that way for backward
500                          * compatibility */
501                                             protocol_version_,
502                                             seqno_lock_guard,
503                                             rcode);
504                 }
505                 else
506                 {
507                     log_error << "Failed to bypass SST";
508                 }
509 
510                 goto out;
511             }
512         }
513 
514     full_sst:
515 
516         assert(!seqno_lock_guard.unlock_);
517 
518         if (cert_.nbo_size() > 0)
519         {
520             log_warn << "Non-blocking operation in progress, cannot donate SST";
521             rcode = -EAGAIN;
522         }
523         else if (streq->sst_len())
524         {
525             assert(0 == rcode);
526 
527             wsrep_gtid_t const state_id = { state_uuid_, donor_seq };
528 
529             if (str_proto_ver >= 3)
530             {
531                 if (streq->version() > 0)
532                 {
533                     if (streq->ist_len() <= 0)
534                     {
535                         if (not trivial_sst)
536                         {
537                             log_warn << "Joiner didn't provide IST connection "
538                                 "info - cert. index preload impossible, bailing "
539                                 "out.";
540                             rcode = -ENOMSG;
541                         }
542                         else
543                         {
544                             /* don't warn about trivial SST requests: such nodes
545                              * are not supposed to fully join the cluster, e.g.
546                              * garbd */
547                         }
548                         goto out;
549                     }
550 
551                     wsrep_seqno_t preload_start(cc_lowest_trx_seqno_);
552 
553                     try
554                     {
555                         if (preload_start <= 0)
556                         {
557                             preload_start = cc_seqno_;
558                         }
559 
560                         gcache_.seqno_lock(preload_start);
561                         seqno_lock_guard.unlock_ = true;
562                     }
563                     catch (gu::NotFound& nf)
564                     {
565                         log_warn << "Cert index preload first seqno "
566                                  << preload_start
567                                  << " not found from gcache (min available: "
568                                  << gcache_.seqno_min() << ')';
569                         rcode = -ENOMSG;
570                         goto out;
571                     }
572 
573                     log_info << "Cert index preload: " << preload_start
574                              << " -> " << cc_seqno_;
575 
576                     IST_request istr;
577                     get_ist_request(streq, &istr);
578                     // Send trxs to rebuild cert index.
579                     rcode = run_ist_senders(ist_senders_,
580                                             config_,
581                                             istr.peer(),
582                                             preload_start,
583                                             cc_seqno_,
584                                             preload_start,
585                         /* Historically IST messages are versioned
586                          * with the global replicator protocol.
587                          * Need to keep it that way for backward
588                          * compatibility */
589                                             protocol_version_,
590                                             seqno_lock_guard,
591                                             rcode);
592                     if (rcode < 0) goto out;
593                 }
594                 else /* streq->version() == 0 */
595                 {
596                     log_info << "STR v0: assuming backup request, skipping "
597                         "cert. index preload.";
598                 }
599             }
600 
601             rcode = donate_sst(recv_ctx, *streq, state_id, false);
602             // we will join in sst_sent.
603             join_now = false;
604         }
605         else
606         {
607             log_warn << "SST request is null, SST canceled.";
608             rcode = -ECANCELED;
609         }
610     }
611 
612 out:
613     delete streq;
614 
615     local_monitor_.leave(lo);
616 
617     if (join_now || rcode < 0)
618     {
619         gcs_.join(gu::GTID(state_uuid_, donor_seq), rcode);
620     }
621 }
622 
623 
624 void
625 ReplicatorSMM::prepare_for_IST (void*& ptr, ssize_t& len,
626                                 int const group_proto_ver,
627                                 int const str_proto_ver,
628                                 const wsrep_uuid_t& group_uuid,
629                                 wsrep_seqno_t const last_needed)
630 {
631     assert(group_uuid != GU_UUID_NIL);
632     // From STR protocol version 3 on, the joiner is assumed to be able to
633     // receive some transactions to rebuild the cert index, so the IST
634     // receiver must be prepared even if local and group states differ.
635     wsrep_seqno_t last_applied(last_committed());
636     ist_event_queue_.reset();
637     if (state_uuid_ != group_uuid)
638     {
639         if (str_proto_ver < 3)
640         {
641             gu_throw_error (EPERM) << "Local state UUID (" << state_uuid_
642                                    << ") does not match group state UUID ("
643                                    << group_uuid << ')';
644         }
645         else
646         {
647             last_applied = -1; // to cause full SST
648         }
649     }
650     else
651     {
652         assert(last_applied < last_needed);
653     }
654 
655     if (last_applied < 0 && str_proto_ver < 3)
656     {
657         gu_throw_error (EPERM) << "Local state seqno is undefined";
658     }
659 
660     wsrep_seqno_t const first_needed(last_applied + 1);
661 
662     log_info << "####### IST uuid:" << state_uuid_ << " f: " << first_needed
663              << ", l: " << last_needed << ", STRv: " << str_proto_ver; //remove
664 
665     /* Historically IST messages are versioned with the global replicator
666      * protocol. Need to keep it that way for backward compatibility */
667     std::string recv_addr(ist_receiver_.prepare(first_needed, last_needed,
668                                                 group_proto_ver, source_id()));
669 
670     std::ostringstream os;
671 
672     /* NOTE: in case last_applied is -1, first_needed is 0, but the first legal
673      * cached seqno is 1, so the donor will revert to SST anyway, as required */
674     os << IST_request(recv_addr, state_uuid_, last_applied, last_needed);
675 
676     char* str = strdup (os.str().c_str());
677 
678     // cppcheck-suppress nullPointer
679     if (!str) gu_throw_error (ENOMEM) << "Failed to allocate IST buffer.";
680 
681     log_debug << "Prepared IST request: " << str;
682 
683     len = strlen(str) + 1;
684 
685     ptr = str;
686 }
687 
688 
689 ReplicatorSMM::StateRequest*
690 ReplicatorSMM::prepare_state_request (const void*         sst_req,
691                                       ssize_t             sst_req_len,
692                                       int const           group_proto_ver,
693                                       int const           str_proto_ver,
694                                       const wsrep_uuid_t& group_uuid,
695                                       wsrep_seqno_t const last_needed_seqno)
696 {
697     try
698     {
699         // If there are ongoing NBOs, SST might not be possible because
700         // an ongoing NBO is blocking and waiting for NBO end events.
701         // Therefore, in the presence of ongoing NBOs, we set the SST
702         // request string to zero and hope that the donor can serve IST.
703 
704         size_t const nbo_size(cert_.nbo_size());
705         if (nbo_size)
706         {
707             log_info << "Non-blocking operation is ongoing. "
708                 "Node can receive IST only.";
709 
710             sst_req     = NULL;
711             sst_req_len = 0;
712         }
713 
714         switch (str_proto_ver)
715         {
716         case 0:
717             if (0 == sst_req_len)
718                 gu_throw_error(EPERM) << "SST is not possible.";
719             return new StateRequest_v0 (sst_req, sst_req_len);
720         case 1:
721         case 2:
722         case 3:
723         {
724             void*   ist_req(0);
725             ssize_t ist_req_len(0);
726 
727             try
728             {
729                 // Note: IST uses group protocol version.
730                 gu_trace(prepare_for_IST (ist_req, ist_req_len,
731                                           group_proto_ver,
732                                           str_proto_ver,
733                                           group_uuid, last_needed_seqno));
734                 assert(ist_req_len > 0);
735                 assert(NULL != ist_req);
736             }
737             catch (gu::Exception& e)
738             {
739                 log_warn
740                     << "Failed to prepare for incremental state transfer: "
741                     << e.what() << ". IST will be unavailable.";
742 
743                 if (0 == sst_req_len)
744                     gu_throw_error(EPERM) << "neither SST nor IST is possible.";
745             }
746 
747             StateRequest* ret = new StateRequest_v1 (sst_req, sst_req_len,
748                                                      ist_req, ist_req_len);
749             free (ist_req);
750             return ret;
751         }
752         default:
753             gu_throw_fatal << "Unsupported STR protocol: " << str_proto_ver;
754         }
755     }
756     catch (std::exception& e)
757     {
758         log_fatal << "State Transfer Request preparation failed: " << e.what()
759                   << " Can't continue, aborting.";
760     }
761     catch (...)
762     {
763         log_fatal << "State Transfer Request preparation failed: "
764             "unknown exception. Can't continue, aborting.";
765     }
766     abort();
767 }
768 
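// Transient group communication errors after which the state transfer
// request is retried (every sst_retry_sec_ seconds, see the retry loop in
// send_state_request() below).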
769 static bool
770 retry_str(int ret)
771 {
772     return (ret == -EAGAIN || ret == -ENOTCONN);
773 }
774 
775 void
776 ReplicatorSMM::send_state_request (const StateRequest* const req,
777                                    int const str_proto_ver)
778 {
779     long ret;
780     long tries = 0;
781 
782     gu_uuid_t ist_uuid = {{0, }};
783     gcs_seqno_t ist_seqno = GCS_SEQNO_ILL;
784 
785     if (req->ist_len())
786     {
787       IST_request istr;
788       get_ist_request(req, &istr);
789       ist_uuid  = istr.uuid();
790       ist_seqno = istr.last_applied();
791     }
792 
793     do
794     {
795         tries++;
796 
797         gcs_seqno_t seqno_l;
798 
799         ret = gcs_.request_state_transfer(str_proto_ver,
800                                           req->req(), req->len(), sst_donor_,
801                                           gu::GTID(ist_uuid, ist_seqno),seqno_l);
802         if (ret < 0)
803         {
804             if (!retry_str(ret))
805             {
806                 log_error << "Requesting state transfer failed: "
807                           << ret << "(" << strerror(-ret) << ")";
808             }
809             else if (1 == tries)
810             {
811                 log_info << "Requesting state transfer failed: "
812                          << ret << "(" << strerror(-ret) << "). "
813                          << "Will keep retrying every " << sst_retry_sec_
814                          << " second(s)";
815             }
816         }
817 
818         if (seqno_l != GCS_SEQNO_ILL)
819         {
820             /* Check that we're not running out of space in monitor. */
821             if (local_monitor_.would_block(seqno_l))
822             {
823                 log_error << "Slave queue grew too long while trying to "
824                           << "request state transfer " << tries << " time(s). "
825                           << "Please make sure that there is "
826                           << "at least one fully synced member in the group. "
827                           << "Application must be restarted.";
828                 ret = -EDEADLK;
829             }
830             else
831             {
832                 // we are already holding local monitor
833                 LocalOrder lo(seqno_l);
834                 local_monitor_.self_cancel(lo);
835             }
836         }
837     }
838     while (retry_str(ret) && (usleep(sst_retry_sec_ * 1000000), true));
839 
840     if (ret >= 0)
841     {
842         if (1 == tries)
843         {
844             log_info << "Requesting state transfer: success, donor: " << ret;
845         }
846         else
847         {
848             log_info << "Requesting state transfer: success after "
849                      << tries << " tries, donor: " << ret;
850         }
851     }
852     else
853     {
854         sst_state_ = SST_REQ_FAILED;
855 
856         st_.set(state_uuid_, last_committed(), safe_to_bootstrap_);
857         st_.mark_safe();
858 
859         gu::Lock lock(closing_mutex_);
860 
861         if (!closing_ && state_() > S_CLOSED)
862         {
863             log_fatal << "State transfer request failed unrecoverably: "
864                       << -ret << " (" << strerror(-ret) << "). Most likely "
865                       << "it is due to inability to communicate with the "
866                       << "cluster primary component. Restart required.";
867             abort();
868         }
869         else
870         {
871             // connection is being closed, send failure is expected
872         }
873     }
874 }
875 
876 
877 void
878 ReplicatorSMM::request_state_transfer (void* recv_ctx,
879                                        int           const group_proto_ver,
880                                        const wsrep_uuid_t& group_uuid,
881                                        wsrep_seqno_t const cc_seqno,
882                                        const void*   const sst_req,
883                                        ssize_t       const sst_req_len)
884 {
885     assert(sst_req_len >= 0);
886     int const str_proto_ver(get_str_proto_ver(group_proto_ver));
887 
888     StateRequest* const req(prepare_state_request(sst_req, sst_req_len,
889                                                   group_proto_ver,
890                                                   str_proto_ver,
891                                                   group_uuid, cc_seqno));
892     gu::Lock sst_lock(sst_mutex_);
893     sst_received_ = false;
894 
895     st_.mark_unsafe();
896 
897     GU_DBUG_SYNC_WAIT("before_send_state_request");
898     send_state_request(req, str_proto_ver);
899 
900     state_.shift_to(S_JOINING);
901     sst_state_ = SST_WAIT;
902     sst_seqno_ = WSREP_SEQNO_UNDEFINED;
903     GU_DBUG_SYNC_WAIT("after_shift_to_joining");
904 
905     /* There are two places where we may need to adjust GCache.
906      * This is the first one, which we can do while waiting for SST to complete.
907      * Here we reset seqno map completely if we have different histories.
908      * This MUST be done before IST starts. */
909     bool const first_reset
910         (state_uuid_ /* GCache has */ != group_uuid /* current PC has */);
911     if (first_reset)
912     {
913         log_info << "Resetting GCache seqno map due to different histories.";
914         gcache_.seqno_reset(gu::GTID(group_uuid, cc_seqno));
915     }
916 
917     if (sst_req_len != 0)
918     {
919         if (sst_is_trivial(sst_req, sst_req_len) ||
920             no_sst        (sst_req, sst_req_len))
921         {
922             sst_uuid_  = group_uuid;
923             sst_seqno_ = cc_seqno;
924             sst_received_ = true;
925         }
926         else
927         {
928             while (false == sst_received_) sst_lock.wait(sst_cond_);
929         }
930 
931         if (sst_uuid_ != group_uuid)
932         {
933             log_fatal << "Application received wrong state: "
934                       << "\n\tReceived: " << sst_uuid_
935                       << "\n\tRequired: " << group_uuid;
936             sst_state_ = SST_FAILED;
937             log_fatal << "Application state transfer failed. This is "
938                       << "unrecoverable condition, restart required.";
939 
940             st_.set(sst_uuid_, sst_seqno_, safe_to_bootstrap_);
941             st_.mark_safe();
942 
943             abort();
944         }
945         else
946         {
947             assert(sst_seqno_ != WSREP_SEQNO_UNDEFINED);
948 
949             /* There are two places where we may need to adjust GCache.
950              * This is the second one.
951              * Here we reset seqno map completely if we have gap in seqnos
952              * between the received snapshot and current GCache contents.
953              * This MUST be done before IST starts. */
954             // there may be a possible optimization here once cert index
955             // transfer is implemented (it may close the gap), but not by much.
956             if (!first_reset && (last_committed() /* GCache has */ !=
957                                  sst_seqno_    /* current state has */))
958             {
959                 log_info << "Resetting GCache seqno map due to seqno gap: "
960                          << last_committed() << ".." << sst_seqno_;
961                 gcache_.seqno_reset(gu::GTID(sst_uuid_, sst_seqno_));
962             }
963 
964             update_state_uuid (sst_uuid_);
965 
966             if (group_proto_ver < PROTO_VER_GALERA_3_MAX)
967             {
968                 log_error << "Rolling upgrade from group protocol version "
969                           << "earlier than "
970                           << PROTO_VER_GALERA_3_MAX
971                           << " is not supported. Please upgrade "
972                           << "Galera library to latest in Galera 3 series on "
973                           << "all of the nodes in the cluster before "
974                           << "continuing.";
975                 abort();
976             }
977             else if (group_proto_ver == PROTO_VER_GALERA_3_MAX)
978             {
979                 // Rolling upgrade from Galera 3 PROTO_VER_GALERA_3_MAX.
980                 gu::GTID const cert_position
981                     (sst_uuid_, std::max(cc_seqno, sst_seqno_));
982                 cert_.assign_initial_position(
983                     cert_position,
984                     std::get<0>(get_trx_protocol_versions(group_proto_ver)));
985                 // with higher versions this happens in cert index preload
986             }
987 
988             apply_monitor_.set_initial_position(WSREP_UUID_UNDEFINED, -1);
989             apply_monitor_.set_initial_position(sst_uuid_, sst_seqno_);
990 
991             if (co_mode_ != CommitOrder::BYPASS)
992             {
993                 commit_monitor_.set_initial_position(WSREP_UUID_UNDEFINED, -1);
994                 commit_monitor_.set_initial_position(sst_uuid_, sst_seqno_);
995             }
996 
997             log_info << "Installed new state from SST: " << state_uuid_ << ":"
998                      << sst_seqno_;
999         }
1000     }
1001     else
1002     {
1003         assert (state_uuid_ == group_uuid);
1004         sst_seqno_ = last_committed();
1005     }
1006 
1007     if (st_.corrupt())
1008     {
1009         if (sst_req_len != 0 && !sst_is_trivial(sst_req, sst_req_len))
1010         {
1011             // Note: not storing sst seqno in state file to avoid
1012             // recovering to incorrect state if the node is
1013             // killed during IST.
1014             st_.mark_uncorrupt(sst_uuid_, WSREP_SEQNO_UNDEFINED);
1015         }
1016         else
1017         {
1018             log_fatal << "Application state is corrupt and cannot "
1019                       << "be recovered. Restart required.";
1020             abort();
1021         }
1022     }
1023     else
1024     {
1025         // Clear seqno from state file. Otherwise if node gets killed
1026         // during IST, it may recover to incorrect position.
1027         st_.set(state_uuid_, WSREP_SEQNO_UNDEFINED, safe_to_bootstrap_);
1028         st_.mark_safe();
1029     }
1030 
1031     if (req->ist_len() > 0)
1032     {
1033         if (state_uuid_ != group_uuid)
1034         {
1035             log_fatal << "Sanity check failed: my state UUID " << state_uuid_
1036                       << " is different from group state UUID " << group_uuid
1037                       << ". Can't continue with IST. Aborting.";
1038             st_.set(state_uuid_, last_committed(), safe_to_bootstrap_);
1039             st_.mark_safe();
1040             abort();
1041         }
1042 
1043         // IST is prepared only with str proto ver 1 and above
1044         // IST is *always* prepared at str proto ver 3 or higher
1045         if (last_committed() < cc_seqno || str_proto_ver >= 3)
1046         {
1047             wsrep_seqno_t const ist_from(last_committed() + 1);
1048             wsrep_seqno_t const ist_to(cc_seqno);
1049             bool const do_ist(ist_from > 0 && ist_from <= ist_to);
1050 
1051             if (do_ist)
1052             {
1053                 log_info << "Receiving IST: " << (ist_to - ist_from + 1)
1054                          << " writesets, seqnos " << ist_from
1055                          << "-" << ist_to;
1056             }
1057             else
1058             {
1059                 log_info << "Cert. index preload up to "
1060                          << ist_from - 1;
1061             }
1062 
1063             ist_receiver_.ready(ist_from);
1064             recv_IST(recv_ctx);
1065 
1066             wsrep_seqno_t const ist_seqno(ist_receiver_.finished());
1067 
1068             if (do_ist)
1069             {
1070                 assert(ist_seqno > sst_seqno_); // must exceed sst_seqno_
1071                 sst_seqno_ = ist_seqno;
1072 
1073                 // Note: apply_monitor_ must be drained to avoid race between
1074                 // IST appliers and GCS appliers, GCS action source may
1075                 // provide actions that have already been applied via IST.
1076                 log_info << "Draining apply monitors after IST up to "
1077                          << sst_seqno_;
1078                 apply_monitor_.drain(sst_seqno_);
1079                 set_initial_position(group_uuid, sst_seqno_);
1080             }
1081             else
1082             {
1083                 assert(sst_seqno_ > 0); // must have been established via SST
1084                 assert(ist_seqno >= cc_seqno); // index must be rebuilt up to
1085                 assert(ist_seqno <= sst_seqno_);
1086             }
1087 
1088             if (ist_seqno == sst_seqno_)
1089             {
1090                 log_info << "IST received: " << state_uuid_ << ":" <<ist_seqno;
1091                 if (str_proto_ver < 3)
1092                 {
1093                     // see cert_.assign_initial_position() above
1094                     assert(cc_seqno == ist_seqno);
1095                     assert(cert_.lowest_trx_seqno() == ist_seqno);
1096                 }
1097             }
1098             else
1099                 log_info << "Cert. index preloaded up to " << ist_seqno;
1100         }
1101         else
1102         {
1103             (void)ist_receiver_.finished();
1104         }
1105     }
1106     else
1107     {
1108         // full SST can't be in the past
1109         assert(sst_seqno_ >= cc_seqno);
1110     }
1111 
1112 #ifndef NDEBUG
1113     {
1114         gu::Lock lock(closing_mutex_);
1115         assert(sst_seqno_ >= cc_seqno || closing_ || state_() == S_CLOSED);
1116     }
1117 #endif /* NDEBUG */
1118 
1119     delete req;
1120 }
1121 
1122 void ReplicatorSMM::process_IST_writeset(void* recv_ctx,
1123                                          const TrxHandleSlavePtr& ts_ptr)
1124 {
1125     TrxHandleSlave& ts(*ts_ptr);
1126 
1127     assert(ts.global_seqno() > 0);
1128     assert(ts.state() != TrxHandle::S_COMMITTED);
1129     assert(ts.state() != TrxHandle::S_ROLLED_BACK);
1130 
1131     bool const skip(ts.is_dummy());
1132 
1133     if (gu_likely(!skip))
1134     {
1135         ts.verify_checksum();
1136 
1137         assert(ts.certified());
1138         assert(ts.depends_seqno() >= 0);
1139     }
1140     else
1141     {
1142         assert(ts.is_dummy());
1143     }
1144 
1145     try
1146     {
1147         apply_trx(recv_ctx, ts);
1148     }
1149     catch (...)
1150     {
1151         st_.mark_corrupt();
1152         throw;
1153     }
1154     GU_DBUG_SYNC_WAIT("recv_IST_after_apply_trx");
1155 
1156     if (gu_unlikely
1157         (gu::Logger::no_log(gu::LOG_DEBUG) == false))
1158     {
1159         std::ostringstream os;
1160 
1161         if (gu_likely(!skip))
1162             os << "IST received trx body: " << ts;
1163         else
1164             os << "IST skipping trx " << ts.global_seqno();
1165 
1166         log_debug << os.str();
1167     }
1168 }
1169 
1170 
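// Consume events from ist_event_queue_ until the end-of-stream (T_NULL)
// event: write sets are applied, view changes are submitted, and on failure
// the error is logged and the node starts closing.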
1171 void ReplicatorSMM::recv_IST(void* recv_ctx)
1172 {
1173     ISTEvent::Type event_type(ISTEvent::T_NULL);
1174     TrxHandleSlavePtr ts;
1175     wsrep_view_info_t* view;
1176 
1177     try
1178     {
1179         bool exit_loop(false);
1180 
1181         while (exit_loop == false)
1182         {
1183             ISTEvent ev(ist_event_queue_.pop_front());
1184             event_type = ev.type();
1185             switch (event_type)
1186             {
1187             case ISTEvent::T_NULL:
1188                 exit_loop = true;
1189                 continue;
1190             case ISTEvent::T_TRX:
1191                 ts = ev.ts();
1192                 assert(ts);
1193                 process_IST_writeset(recv_ctx, ts);
1194                 exit_loop = ts->exit_loop();
1195                 continue;
1196             case ISTEvent::T_VIEW:
1197             {
1198                 view = ev.view();
1199                 wsrep_seqno_t const cs(view->state_id.seqno);
1200 
1201                 submit_view_info(recv_ctx, view);
1202 
1203                 ::free(view);
1204 
1205                 CommitOrder co(cs, CommitOrder::NO_OOOC);
1206                 commit_monitor_.leave(co);
1207                 ApplyOrder ao(cs, cs - 1, false);
1208                 apply_monitor_.leave(ao);
1209                 GU_DBUG_SYNC_WAIT("recv_IST_after_conf_change");
1210                 continue;
1211             }
1212             }
1213 
1214             gu_throw_fatal << "Unrecognized event of type " << ev.type();
1215         }
1216     }
1217     catch (gu::Exception& e)
1218     {
1219         std::ostringstream os;
1220         os << "Receiving IST failed, node restart required: " << e.what();
1221 
1222         switch (event_type)
1223         {
1224         case ISTEvent::T_NULL:
1225             os << ". Null event.";
1226             break;
1227         case ISTEvent::T_TRX:
1228             if (ts)
1229                 os << ". Failed writeset: " << *ts;
1230             else
1231                 os << ". Corrupt IST event queue.";
1232             break;
1233         case ISTEvent::T_VIEW:
1234             os << ". VIEW event";
1235             break;
1236         }
1237 
1238         log_fatal << os.str();
1239 
1240         gu::Lock lock(closing_mutex_);
1241         start_closing();
1242     }
1243 }
1244 
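// Handle an NBO start/end write set received via IST: when it must be
// applied, it goes through certification and is queued for the applier;
// during pure cert index preload it is skipped (see comments below).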
1245 void ReplicatorSMM::handle_ist_nbo(const TrxHandleSlavePtr& ts,
1246                                    bool must_apply, bool preload)
1247 {
1248     if (must_apply)
1249     {
1250         ts->verify_checksum();
1251         Certification::TestResult result(cert_.append_trx(ts));
1252         switch (result)
1253         {
1254         case Certification::TEST_OK:
1255             if (ts->nbo_end())
1256             {
1257                 // This is the same as in process_trx()
1258                 if (ts->ends_nbo() == WSREP_SEQNO_UNDEFINED)
1259                 {
1260                     assert(ts->is_dummy());
1261                 }
1262                 else
1263                 {
1264                     // Signal NBO waiter
1265                     gu::shared_ptr<NBOCtx>::type nbo_ctx(
1266                         cert_.nbo_ctx(ts->ends_nbo()));
1267                     assert(nbo_ctx != 0);
1268                     nbo_ctx->set_ts(ts);
1269                     return; // not pushing to queue below
1270                 }
1271             }
1272             break;
1273         case Certification::TEST_FAILED:
1274         {
1275             assert(ts->nbo_end()); // non-effective nbo_end
1276             assert(ts->is_dummy());
1277             break;
1278         }
1279         }
1280         /* regardless of certification outcome, event must be passed to
1281          * apply_trx() as it carries global seqno */
1282     }
1283     else
1284     {
1285         // Skipping NBO events in preload is fine since the joiner either
1286         // has all the events applied in case of pure IST or the
1287         // donor refuses to donate SST from the position with active NBO.
1288         assert(preload);
1289         log_debug << "Skipping NBO event: " << ts;
1290         wsrep_seqno_t const pos(cert_.increment_position());
1291         assert(ts->global_seqno() == pos);
1292         (void)pos;
1293     }
1294     if (gu_likely(must_apply == true))
1295     {
1296         ist_event_queue_.push_back(ts);
1297     }
1298 }
1299 
1300 // Append an IST trx to the certification index. As the trx has already
1301 // passed certification on the donor, certification is expected to pass here
1302 // too. If it fails, an exception is thrown as the state is unrecoverable.
1303 static void append_ist_trx(galera::Certification& cert,
1304                            const TrxHandleSlavePtr& ts)
1305 {
1306     Certification::TestResult result(cert.append_trx(ts));
1307     if (result != Certification::TEST_OK)
1308     {
1309         gu_throw_fatal << "Pre IST trx append returned unexpected "
1310                        << "certification result " << result
1311                        << ", expected " << Certification::TEST_OK
1312                        << ", must abort to maintain consistency,"
1313                        << " cert position: " << cert.position()
1314                        << ", ts: " << *ts;
1315     }
1316 }
1317 
1318 void ReplicatorSMM::handle_ist_trx_preload(const TrxHandleSlavePtr& ts,
1319                                            bool const must_apply)
1320 {
1321     if (not ts->is_dummy())
1322     {
1323         append_ist_trx(cert_, ts);
1324         if (not must_apply)
1325         {
1326             // Pure preload event will not get applied, so mark it committed
1327             // here for certification bookkeeping.
1328             cert_.set_trx_committed(*ts);
1329         }
1330     }
1331     else if (cert_.position() != WSREP_SEQNO_UNDEFINED)
1332     {
1333         // Increment position to keep track only if the initial
1334         // seqno has already been assigned.
1335         wsrep_seqno_t const pos __attribute__((unused))(
1336             cert_.increment_position());
1337         assert(ts->global_seqno() == pos);
1338     }
1339 }
1340 
1341 void ReplicatorSMM::handle_ist_trx(const TrxHandleSlavePtr& ts,
1342                                    bool must_apply, bool preload)
1343 {
1344     if (preload)
1345     {
1346         handle_ist_trx_preload(ts, must_apply);
1347     }
1348     if (must_apply)
1349     {
1350         ist_event_queue_.push_back(ts);
1351     }
1352 }
1353 
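// Entry point for write sets received via IST: decides whether the write set
// is used only to populate gcache, preloads the cert index, and/or is queued
// for applying, dispatching NBO events separately.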
1354 void ReplicatorSMM::ist_trx(const TrxHandleSlavePtr& ts, bool must_apply,
1355                             bool preload)
1356 {
1357     assert(ts != 0);
1358 
1359     assert(ts->depends_seqno() >= 0 || ts->is_dummy() || ts->nbo_end());
1360     assert(ts->local_seqno() == WSREP_SEQNO_UNDEFINED);
1361     assert(sst_seqno_ > 0);
1362 
1363     ts->verify_checksum();
1364 
1365     // Note: Write sets which have neither the preload nor the must_apply
1366     // flag set are only used to populate the joiner's gcache with enough
1367     // history to be able to donate IST to subsequent joiners. Therefore
1368     // they don't need to be applied or added to the certification
1369     // index and are simply skipped here.
1370     if (not (preload || must_apply))
1371     {
1372         return;
1373     }
1374 
1375     if (gu_unlikely(cert_.position() == WSREP_SEQNO_UNDEFINED))
1376     {
1377         if (not ts->is_dummy())
1378         {
1379             // This is the first pre IST event for rebuilding cert index.
1380             // Note that we skip dummy write sets here as they don't carry
1381             // version information. If the IST will contain only dummy
1382             // write sets, the last CC belonging into IST which corresponds
1383             // to CC which triggered STR, will initialize cert index.
1384             assert(ts->version() > 0);
1385             cert_.assign_initial_position(
1386                 /* proper UUID will be installed by CC */
1387                 gu::GTID(gu::UUID(), ts->global_seqno() - 1), ts->version());
1388         }
1389     }
1390 
1391     assert(ts->state() == TrxHandleSlave::S_REPLICATING);
1392     ts->set_state(TrxHandleSlave::S_CERTIFYING);
1393 
1394     if (ts->nbo_start() || ts->nbo_end())
1395     {
1396         handle_ist_nbo(ts, must_apply, preload);
1397     }
1398     else
1399     {
1400         handle_ist_trx(ts, must_apply, preload);
1401     }
1402 }
1403 
1404 void ReplicatorSMM::ist_end(int error)
1405 {
1406     ist_event_queue_.eof(error);
1407 }
1408 
1409 void galera::ReplicatorSMM::process_ist_conf_change(const gcs_act_cchange& conf)
1410 {
1411     // IST should contain only ordered CCs
1412     assert(conf.repl_proto_ver >= PROTO_VER_ORDERED_CC);
1413 
1414     // Drain monitors to make sure that all preceding IST events have
1415     // been applied.
1416     drain_monitors(conf.seqno - 1);
1417     // Create view info. This will be consumed by ist_event_queue_.push_back().
1418     wsrep_uuid_t uuid_undefined(WSREP_UUID_UNDEFINED);
1419     wsrep_view_info_t* const view_info
1420         (galera_view_info_create(conf,
1421                                  capabilities(conf.repl_proto_ver),
1422                                  -1, uuid_undefined));
1423     // IST view status should always be Primary
1424     assert(view_info->status == WSREP_VIEW_PRIMARY);
1425     // Establish protocol version before adjusting cert position as
1426     // trx_params_.version is decided there.
1427     establish_protocol_versions (conf.repl_proto_ver);
1428     cert_.adjust_position(*view_info, gu::GTID(conf.uuid, conf.seqno),
1429                           trx_params_.version_);
1430     update_incoming_list(*view_info);
1431     record_cc_seqnos(conf.seqno, "ist");
1432 
1433     // TO monitors need to be entered here to maintain critical
1434     // section over passing the view through the event queue to
1435     // an applier and ensure that the view is submitted in isolation.
1436     // Applier is to leave monitors and free the view after it is
1437     // submitted.
1438     ApplyOrder ao(conf.seqno, conf.seqno - 1, false);
1439     apply_monitor_.enter(ao);
1440     CommitOrder co(conf.seqno, CommitOrder::NO_OOOC);
1441     commit_monitor_.enter(co);
1442     ist_event_queue_.push_back(view_info);
1443 }
1444 
1445 void ReplicatorSMM::ist_cc(const gcs_action& act, bool must_apply,
1446                            bool preload)
1447 {
1448     assert(GCS_ACT_CCHANGE == act.type);
1449     assert(act.seqno_g > 0);
1450 
1451     gcs_act_cchange const conf(act.buf, act.size);
1452 
1453     assert(conf.conf_id >= 0); // Primary configuration
1454     assert(conf.seqno == act.seqno_g);
1455 
1456     if (gu_unlikely(cert_.position() == WSREP_SEQNO_UNDEFINED) &&
1457         (must_apply || preload))
1458     {
1459         // This is the first IST event for rebuilding cert index,
1460         // need to initialize certification
1461         establish_protocol_versions(conf.repl_proto_ver);
1462         cert_.assign_initial_position(gu::GTID(conf.uuid, conf.seqno - 1),
1463                                       trx_params_.version_);
1464     }
1465 
1466     if (must_apply == true)
1467     {
1468         // Will generate and queue view info. Monitors are handled by
1469         // slave appliers when the view_info is consumed.
1470         process_ist_conf_change(conf);
1471     }
1472     else
1473     {
1474         if (preload == true)
1475         {
1476             wsrep_uuid_t uuid_undefined(WSREP_UUID_UNDEFINED);
1477             wsrep_view_info_t* const view_info(
1478                 galera_view_info_create(conf, capabilities(conf.repl_proto_ver),
1479                                         -1, uuid_undefined));
1480             /* CC is part of index preload but won't be processed
1481              * by process_conf_change()
1482              * Order of these calls is essential: trx_params_.version_ may
1483              * be altered by establish_protocol_versions() */
1484             establish_protocol_versions(conf.repl_proto_ver);
1485             cert_.adjust_position(*view_info, gu::GTID(conf.uuid, conf.seqno),
1486                                   trx_params_.version_);
1487             // record CC related state seqnos, needed for IST on DONOR
1488             record_cc_seqnos(conf.seqno, "preload");
1489             ::free(view_info);
1490         }
1491     }
1492 }
1493 
1494 } /* namespace galera */
1495