1 /*
2  * Copyright (C) 2018-2019 Codership Oy <info@codership.com>
3  *
4  * This file is part of wsrep-lib.
5  *
6  * Wsrep-lib is free software: you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation, either version 2 of the License, or
9  * (at your option) any later version.
10  *
11  * Wsrep-lib is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with wsrep-lib.  If not, see <https://www.gnu.org/licenses/>.
18  */
19 
20 #include "wsrep/server_state.hpp"
21 #include "wsrep/client_state.hpp"
22 #include "wsrep/server_service.hpp"
23 #include "wsrep/high_priority_service.hpp"
24 #include "wsrep/transaction.hpp"
25 #include "wsrep/view.hpp"
26 #include "wsrep/logger.hpp"
27 #include "wsrep/compiler.hpp"
28 #include "wsrep/id.hpp"
29 
30 #include <cassert>
31 #include <sstream>
32 #include <algorithm>
33 
34 
35 //////////////////////////////////////////////////////////////////////////////
36 //                               Helpers                                    //
37 //////////////////////////////////////////////////////////////////////////////
38 
39 
40 //
41 // This helper deals with the historical burden of several
42 // ways to bootstrap the cluster. Bootstrap happens if
43 //
44 // * bootstrap option is given
45 // * cluster_address is "gcomm://" (Galera provider)
46 //
47 static bool is_bootstrap(const std::string& cluster_address, bool bootstrap)
48 {
49     return (bootstrap || cluster_address == "gcomm://");
50 }
51 
52 // Helper to log a detailed error message if adopting a transaction
53 // for fragment removal fails.
54 static void log_adopt_error(const wsrep::transaction& transaction)
55 {
56     wsrep::log_warning() << "Adopting a transaction ("
57                          << transaction.server_id() << "," << transaction.id()
58                          << ") for rollback failed, "
59                          << "this may leave stale entries in the streaming log "
60                          << "which may need to be removed manually.";
61 }
62 
63 // Resolve which of the two errors to return to the caller.
64 static inline int resolve_return_error(bool const vote,
65                                        int  const vote_err,
66                                        int  const apply_err)
67 {
68     if (vote) return vote_err;
69     return vote_err != 0 ? vote_err : apply_err;
70 }
71 
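// Stop the streaming applier identified by ws_meta, release the
// high priority service back to the server service and restore
// the calling thread's globals.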
72 static void
73 discard_streaming_applier(wsrep::server_state& server_state,
74                           wsrep::high_priority_service& high_priority_service,
75                           wsrep::high_priority_service* streaming_applier,
76                           const wsrep::ws_meta& ws_meta)
77 {
78     server_state.stop_streaming_applier(
79         ws_meta.server_id(), ws_meta.transaction_id());
80     server_state.server_service().release_high_priority_service(
81         streaming_applier);
82     high_priority_service.store_globals();
83 }
84 
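// Apply a single fragment of a streaming transaction in the streaming
// applier's context. On success the fragment is persisted via
// append_fragment_and_commit(). On apply failure the transaction is
// rolled back and either its stored fragments are removed or a dummy
// write set is logged, after which the streaming applier is discarded.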
85 static int apply_fragment(wsrep::server_state& server_state,
86                           wsrep::high_priority_service& high_priority_service,
87                           wsrep::high_priority_service* streaming_applier,
88                           const wsrep::ws_handle& ws_handle,
89                           const wsrep::ws_meta& ws_meta,
90                           const wsrep::const_buffer& data)
91 {
92     int ret(0);
93     int apply_err;
94     wsrep::mutable_buffer err;
95     {
96         wsrep::high_priority_switch sw(high_priority_service,
97                                        *streaming_applier);
98         apply_err = streaming_applier->apply_write_set(ws_meta, data, err);
99         if (!apply_err)
100         {
101             assert(err.size() == 0);
102             streaming_applier->after_apply();
103         }
104         else
105         {
106             bool const remove_fragments(streaming_applier->transaction(
107                 ).streaming_context().fragments().size() > 0);
108             ret = streaming_applier->rollback(ws_handle, ws_meta);
109             ret = ret || (streaming_applier->after_apply(), 0);
110 
111             if (remove_fragments)
112             {
113                 ret = ret || streaming_applier->start_transaction(ws_handle,
114                                                                   ws_meta);
115                 ret = ret || (streaming_applier->adopt_apply_error(err), 0);
116                 ret = ret || streaming_applier->remove_fragments(ws_meta);
117                 ret = ret || streaming_applier->commit(ws_handle, ws_meta);
118                 ret = ret || (streaming_applier->after_apply(), 0);
119             }
120             else
121             {
122                 ret = streaming_applier->log_dummy_write_set(ws_handle,
123                                                              ws_meta, err);
124             }
125         }
126     }
127 
128     if (!ret)
129     {
130         if (!apply_err)
131         {
132             high_priority_service.debug_crash("crash_apply_cb_before_append_frag");
133             const wsrep::xid xid(streaming_applier->transaction().xid());
134             ret = high_priority_service.append_fragment_and_commit(
135                 ws_handle, ws_meta, data, xid);
136             high_priority_service.debug_crash("crash_apply_cb_after_append_frag");
137             ret = ret || (high_priority_service.after_apply(), 0);
138         }
139         else
140         {
141             discard_streaming_applier(server_state,
142                                       high_priority_service,
143                                       streaming_applier,
144                                       ws_meta);
145             ret = resolve_return_error(err.size() > 0, ret, apply_err);
146         }
147     }
148 
149     return ret;
150 }
151 
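// Apply and commit the final fragment of a streaming transaction.
// Fragment removal for XA transactions is deferred to after_commit().
// On success the streaming applier is discarded.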
152 static int commit_fragment(wsrep::server_state& server_state,
153                            wsrep::high_priority_service& high_priority_service,
154                            wsrep::high_priority_service* streaming_applier,
155                            const wsrep::ws_handle& ws_handle,
156                            const wsrep::ws_meta& ws_meta,
157                            const wsrep::const_buffer& data)
158 {
159     int ret(0);
160     {
161         wsrep::high_priority_switch sw(
162             high_priority_service, *streaming_applier);
163         wsrep::mutable_buffer err;
164         int const apply_err(
165             streaming_applier->apply_write_set(ws_meta, data, err));
166         if (apply_err)
167         {
168             assert(streaming_applier->transaction(
169                 ).streaming_context().fragments().size() > 0);
170             ret = streaming_applier->rollback(ws_handle, ws_meta);
171             ret = ret || (streaming_applier->after_apply(), 0);
172             ret = ret || streaming_applier->start_transaction(
173                 ws_handle, ws_meta);
174             ret = ret || (streaming_applier->adopt_apply_error(err),0);
175         }
176         else
177         {
178             assert(err.size() == 0);
179         }
180 
181         const wsrep::transaction& trx(streaming_applier->transaction());
182         // Fragment removal for XA is going to happen in after_commit
183         if (trx.state() != wsrep::transaction::s_prepared)
184         {
185             streaming_applier->debug_crash(
186                 "crash_apply_cb_before_fragment_removal");
187 
188             ret = ret || streaming_applier->remove_fragments(ws_meta);
189 
190             streaming_applier->debug_crash(
191                 "crash_apply_cb_after_fragment_removal");
192         }
193 
194         streaming_applier->debug_crash(
195             "crash_commit_cb_before_last_fragment_commit");
196         ret = ret || streaming_applier->commit(ws_handle, ws_meta);
197         streaming_applier->debug_crash(
198             "crash_commit_cb_last_fragment_commit_success");
199         ret = ret || (streaming_applier->after_apply(), 0);
200         ret = resolve_return_error(err.size() > 0, ret, apply_err);
201     }
202 
203     if (!ret)
204     {
205         discard_streaming_applier(server_state, high_priority_service,
206                                   streaming_applier, ws_meta);
207     }
208 
209     return ret;
210 }
211 
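// Roll back a streaming transaction in response to a rollback fragment.
// If stored fragments exist, the transaction is adopted so that the
// fragments can be removed in commit order; otherwise a dummy write set
// is logged for the ordered rollback fragment.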
212 static int rollback_fragment(wsrep::server_state& server_state,
213                              wsrep::high_priority_service& high_priority_service,
214                              wsrep::high_priority_service* streaming_applier,
215                              const wsrep::ws_handle& ws_handle,
216                              const wsrep::ws_meta& ws_meta)
217 {
218     int ret(0);
219     int adopt_error(0);
220     bool const remove_fragments(streaming_applier->transaction().
221                                 streaming_context().fragments().size() > 0);
222     // If fragment removal is needed, adopt transaction state
223     // and start a transaction for it.
224     if (remove_fragments &&
225         (adopt_error = high_priority_service.adopt_transaction(
226           streaming_applier->transaction())))
227     {
228         log_adopt_error(streaming_applier->transaction());
229     }
230     // Even if the adopt above fails, we roll back the streaming transaction.
231     // Adopt failure will leave stale entries in the streaming log which can
232     // be removed manually.
233     wsrep::const_buffer no_error;
234     {
235         wsrep::high_priority_switch ws(
236             high_priority_service, *streaming_applier);
237         // Streaming applier rolls back out of order. Fragment
238         // removal grabs commit order below.
239         ret = streaming_applier->rollback(wsrep::ws_handle(), wsrep::ws_meta());
240         ret = ret || (streaming_applier->after_apply(), 0);
241     }
242 
243     if (!ret)
244     {
245         discard_streaming_applier(server_state, high_priority_service,
246                                   streaming_applier, ws_meta);
247 
248         if (adopt_error == 0)
249         {
250             if (remove_fragments)
251             {
252                 high_priority_service.remove_fragments(ws_meta);
253                 high_priority_service.commit(ws_handle, ws_meta);
254                 high_priority_service.after_apply();
255             }
256             else
257             {
258                 if (ws_meta.ordered())
259                 {
260                     wsrep::mutable_buffer no_error;
261                     ret = high_priority_service.log_dummy_write_set(
262                         ws_handle, ws_meta, no_error);
263                 }
264             }
265         }
266     }
267     return ret;
268 }
269 
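// Dispatch a replicated write set according to ws_meta flags: rollback
// fragments, complete (start + commit) write sets, starting fragments,
// intermediate or prepare fragments, and commit fragments of streaming
// transactions are each handled separately.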
270 static int apply_write_set(wsrep::server_state& server_state,
271                            wsrep::high_priority_service& high_priority_service,
272                            const wsrep::ws_handle& ws_handle,
273                            const wsrep::ws_meta& ws_meta,
274                            const wsrep::const_buffer& data)
275 {
276     int ret(0);
277     if (wsrep::rolls_back_transaction(ws_meta.flags()))
278     {
279         wsrep::mutable_buffer no_error;
280         if (wsrep::starts_transaction(ws_meta.flags()))
281         {
282             // No transaction existed before, log a dummy write set
283             ret = high_priority_service.log_dummy_write_set(
284                 ws_handle, ws_meta, no_error);
285         }
286         else
287         {
288             wsrep::high_priority_service* sa(
289                 server_state.find_streaming_applier(
290                     ws_meta.server_id(), ws_meta.transaction_id()));
291             if (sa == 0)
292             {
293                 // It is a known limitation that the Galera provider
294                 // cannot always determine whether the certification
295                 // test for an interrupted transaction will pass or fail
296                 // (see comments in transaction::certify_fragment()).
297                 // As a consequence, unnecessary rollback fragments
298                 // may be delivered here. The message below has
299                 // intentionally been made a debug message rather
300                 // than a warning.
301                  WSREP_LOG_DEBUG(wsrep::log::debug_log_level(),
302                                  wsrep::log::debug_level_server_state,
303                                  "Could not find applier context for "
304                                  << ws_meta.server_id()
305                                  << ": " << ws_meta.transaction_id());
306                 ret = high_priority_service.log_dummy_write_set(
307                     ws_handle, ws_meta, no_error);
308             }
309             else
310             {
311                 // rollback_fragment() consumes sa
312                 ret = rollback_fragment(server_state,
313                                         high_priority_service,
314                                         sa,
315                                         ws_handle,
316                                         ws_meta);
317             }
318         }
319     }
320     else if (wsrep::starts_transaction(ws_meta.flags()) &&
321              wsrep::commits_transaction(ws_meta.flags()))
322     {
323         ret = high_priority_service.start_transaction(ws_handle, ws_meta);
324         if (!ret)
325         {
326             wsrep::mutable_buffer err;
327             int const apply_err(high_priority_service.apply_write_set(
328                 ws_meta, data, err));
329             if (!apply_err)
330             {
331                 assert(err.size() == 0);
332                 ret = high_priority_service.commit(ws_handle, ws_meta);
333                 ret = ret || (high_priority_service.after_apply(), 0);
334             }
335             else
336             {
337                 ret = high_priority_service.rollback(ws_handle, ws_meta);
338                 ret = ret || (high_priority_service.after_apply(), 0);
339                 ret = ret || high_priority_service.log_dummy_write_set(
340                     ws_handle, ws_meta, err);
341                 ret = resolve_return_error(err.size() > 0, ret, apply_err);
342             }
343         }
344     }
345     else if (wsrep::starts_transaction(ws_meta.flags()))
346     {
347         assert(server_state.find_streaming_applier(
348                    ws_meta.server_id(), ws_meta.transaction_id()) == 0);
349         wsrep::high_priority_service* sa(
350             server_state.server_service().streaming_applier_service(
351                 high_priority_service));
352         server_state.start_streaming_applier(
353             ws_meta.server_id(), ws_meta.transaction_id(), sa);
354         sa->start_transaction(ws_handle, ws_meta);
355         ret = apply_fragment(server_state,
356                              high_priority_service,
357                              sa,
358                              ws_handle,
359                              ws_meta,
360                              data);
361     }
362     else if (ws_meta.flags() == 0 || ws_meta.flags() == wsrep::provider::flag::pa_unsafe ||
363              wsrep::prepares_transaction(ws_meta.flags()))
364     {
365         wsrep::high_priority_service* sa(
366             server_state.find_streaming_applier(
367                 ws_meta.server_id(), ws_meta.transaction_id()));
368         if (sa == 0)
369         {
370             // It is possible that rapid group membership changes
371             // may cause a streaming transaction to be rolled back before
372             // its commit fragment comes in. Although this is a valid
373             // situation, log a warning if a streaming applier cannot be
374             // found, as it may also indicate a bug.
375             wsrep::log_warning() << "Could not find applier context for "
376                                  << ws_meta.server_id()
377                                  << ": " << ws_meta.transaction_id();
378             wsrep::mutable_buffer no_error;
379             ret = high_priority_service.log_dummy_write_set(
380                 ws_handle, ws_meta, no_error);
381         }
382         else
383         {
384             sa->next_fragment(ws_meta);
385             ret = apply_fragment(server_state,
386                                  high_priority_service,
387                                  sa,
388                                  ws_handle,
389                                  ws_meta,
390                                  data);
391         }
392     }
393     else if (wsrep::commits_transaction(ws_meta.flags()))
394     {
395         if (high_priority_service.is_replaying())
396         {
397             wsrep::mutable_buffer unused;
398             ret = high_priority_service.start_transaction(
399                 ws_handle, ws_meta) ||
400                 high_priority_service.apply_write_set(ws_meta, data, unused) ||
401                 high_priority_service.commit(ws_handle, ws_meta);
402         }
403         else
404         {
405             wsrep::high_priority_service* sa(
406                 server_state.find_streaming_applier(
407                     ws_meta.server_id(), ws_meta.transaction_id()));
408             if (sa == 0)
409             {
410                 // It is possible that rapid group membership changes
411                 // may cause a streaming transaction to be rolled back
412                 // before its commit fragment comes in. Although this is
413                 // a valid situation, log a warning if a streaming applier
414                 // cannot be found, as it may also indicate a bug.
415                 wsrep::log_warning()
416                     << "Could not find applier context for "
417                     << ws_meta.server_id()
418                     << ": " << ws_meta.transaction_id();
419                 wsrep::mutable_buffer no_error;
420                 ret = high_priority_service.log_dummy_write_set(
421                     ws_handle, ws_meta, no_error);
422             }
423             else
424             {
425                 // Commit fragment consumes sa
426                 sa->next_fragment(ws_meta);
427                 ret = commit_fragment(server_state,
428                                       high_priority_service,
429                                       sa,
430                                       ws_handle,
431                                       ws_meta,
432                                       data);
433             }
434         }
435     }
436     else
437     {
438         assert(0);
439     }
440     if (ret)
441     {
442         wsrep::log_error() << "Failed to apply write set: " << ws_meta;
443     }
444     return ret;
445 }
446 
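// Apply a total order isolated (TOI) event or an NBO begin event within
// commit order. NBO end events are ignored here, as they are handled
// via local TOI calls.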
447 static int apply_toi(wsrep::provider& provider,
448                      wsrep::high_priority_service& high_priority_service,
449                      const wsrep::ws_handle& ws_handle,
450                      const wsrep::ws_meta& ws_meta,
451                      const wsrep::const_buffer& data)
452 {
453     if (wsrep::starts_transaction(ws_meta.flags()) &&
454         wsrep::commits_transaction(ws_meta.flags()))
455     {
456         //
457         // Regular TOI.
458         //
459         provider.commit_order_enter(ws_handle, ws_meta);
460         wsrep::mutable_buffer err;
461         int const apply_err(high_priority_service.apply_toi(ws_meta,data,err));
462         int const vote_err(provider.commit_order_leave(ws_handle, ws_meta,err));
463         return resolve_return_error(err.size() > 0, vote_err, apply_err);
464     }
465     else if (wsrep::starts_transaction(ws_meta.flags()))
466     {
467         provider.commit_order_enter(ws_handle, ws_meta);
468         wsrep::mutable_buffer err;
469         int const apply_err(high_priority_service.apply_nbo_begin(ws_meta, data, err));
470         int const vote_err(provider.commit_order_leave(ws_handle, ws_meta, err));
471         return resolve_return_error(err.size() > 0, vote_err, apply_err);
472     }
473     else if (wsrep::commits_transaction(ws_meta.flags()))
474     {
475         // The NBO end event is ignored here; both local and applied
476         // NBO ends are handled via local TOI calls.
477         provider.commit_order_enter(ws_handle, ws_meta);
478         wsrep::mutable_buffer err;
479         provider.commit_order_leave(ws_handle, ws_meta, err);
480         return 0;
481     }
482     else
483     {
484         assert(0);
485         return 0;
486     }
487 }
488 
489 //////////////////////////////////////////////////////////////////////////////
490 //                            Server State                                  //
491 //////////////////////////////////////////////////////////////////////////////
492 
493 int wsrep::server_state::load_provider(
494     const std::string& provider_spec, const std::string& provider_options,
495     const wsrep::provider::services& services)
496 {
497     wsrep::log_info() << "Loading provider " << provider_spec
498                       << " initial position: " << initial_position_;
499 
500     provider_ = wsrep::provider::make_provider(*this,
501                                                provider_spec,
502                                                provider_options,
503                                                services);
504     return (provider_ ? 0 : 1);
505 }
506 
507 void wsrep::server_state::unload_provider()
508 {
509     delete provider_;
510     provider_ = 0;
511 }
512 
513 int wsrep::server_state::connect(const std::string& cluster_name,
514                                    const std::string& cluster_address,
515                                    const std::string& state_donor,
516                                    bool bootstrap)
517 {
518     bootstrap_ = is_bootstrap(cluster_address, bootstrap);
519     wsrep::log_info() << "Connecting with bootstrap option: " << bootstrap_;
520     return provider().connect(cluster_name, cluster_address, state_donor,
521                               bootstrap_);
522 }
523 
524 int wsrep::server_state::disconnect()
525 {
526     {
527         wsrep::unique_lock<wsrep::mutex> lock(mutex_);
528         // In failure situations caused by the provider being shut
529         // down, a failing operation may also try to shut down
530         // replication. Check the state here and return success if
531         // the provider disconnect is already in progress or has
532         // completed.
533         if (state(lock) == s_disconnecting || state(lock) == s_disconnected)
534         {
535             return 0;
536         }
537         state(lock, s_disconnecting);
538         interrupt_state_waiters(lock);
539     }
540     return provider().disconnect();
541 }
542 
543 wsrep::server_state::~server_state()
544 {
545     delete provider_;
546 }
547 
548 std::vector<wsrep::provider::status_variable>
549 wsrep::server_state::status() const
550 {
551     return provider().status();
552 }
553 
554 
555 wsrep::seqno wsrep::server_state::pause()
556 {
557     wsrep::unique_lock<wsrep::mutex> lock(mutex_);
558     // Disallow concurrent calls to pause() in order to have non-concurrent
559     // access to desynced_on_pause_, which is checked in the resume() call.
560     wsrep::log_info() << "pause";
561     while (pause_count_ > 0)
562     {
563         cond_.wait(lock);
564     }
565     ++pause_count_;
566     assert(pause_seqno_.is_undefined());
567     lock.unlock();
568     pause_seqno_ = provider().pause();
569     lock.lock();
570     if (pause_seqno_.is_undefined())
571     {
572         --pause_count_;
573     }
574     return pause_seqno_;
575 }
576 
577 void wsrep::server_state::resume()
578 {
579     wsrep::unique_lock<wsrep::mutex> lock(mutex_);
580     wsrep::log_info() << "resume";
581     assert(pause_seqno_.is_undefined() == false);
582     assert(pause_count_ == 1);
583     if (provider().resume())
584     {
585         throw wsrep::runtime_error("Failed to resume provider");
586     }
587     pause_seqno_ = wsrep::seqno::undefined();
588     --pause_count_;
589     cond_.notify_all();
590 }
591 
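// Desync the provider from the group and pause it. Intended to be
// paired with resume_and_resync() below.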
592 wsrep::seqno wsrep::server_state::desync_and_pause()
593 {
594     wsrep::log_info() << "Desyncing and pausing the provider";
595     // Temporary variable to store the desync() return status. This is
596     // assigned to desynced_on_pause_ after the pause() call to prevent
597     // concurrent access to the member variable desynced_on_pause_.
598     bool desync_successful;
599     if (desync())
600     {
601         // Desync may give a transient error if the provider cannot
602         // communicate with the rest of the cluster. However, this
603         // error can be tolerated if the provider can be paused
604         // successfully below.
605         WSREP_LOG_DEBUG(wsrep::log::debug_log_level(),
606                         wsrep::log::debug_level_server_state,
607                         "Failed to desync server before pause");
608         desync_successful = false;
609     }
610     else
611     {
612         desync_successful = true;
613     }
614     wsrep::seqno ret(pause());
615     if (ret.is_undefined())
616     {
617         wsrep::log_warning() << "Failed to pause provider";
618         resync();
619         return wsrep::seqno::undefined();
620     }
621     else
622     {
623         desynced_on_pause_ = desync_successful;
624     }
625     wsrep::log_info() << "Provider paused at: " << ret;
626     return ret;
627 }
628 
629 void wsrep::server_state::resume_and_resync()
630 {
631     wsrep::log_info() << "Resuming and resyncing the provider";
632     try
633     {
634         // Assign desynced_on_pause_ to a local variable before resuming
635         // in order to avoid concurrent access to the desynced_on_pause_
636         // member variable.
637         bool do_resync = desynced_on_pause_;
638         desynced_on_pause_ = false;
639         resume();
640         if (do_resync)
641         {
642             resync();
643         }
644     }
645     catch (const wsrep::runtime_error& e)
646     {
647         wsrep::log_warning()
648             << "Resume and resync failed, server may have to be restarted";
649     }
650 }
651 
652 std::string wsrep::server_state::prepare_for_sst()
653 {
654     wsrep::unique_lock<wsrep::mutex> lock(mutex_);
655     state(lock, s_joiner);
656     lock.unlock();
657     return server_service_.sst_request();
658 }
659 
660 int wsrep::server_state::start_sst(const std::string& sst_request,
661                                    const wsrep::gtid& gtid,
662                                    bool bypass)
663 {
664     wsrep::unique_lock<wsrep::mutex> lock(mutex_);
665     state(lock, s_donor);
666     int ret(0);
667     lock.unlock();
668     if (server_service_.start_sst(sst_request, gtid, bypass))
669     {
670         lock.lock();
671         wsrep::log_warning() << "SST preparation failed";
672         // The v26 API does not have a JOINED event, so in anticipation
673         // of SYNCED we must do it here.
674         state(lock, s_joined);
675         ret = 1;
676     }
677     return ret;
678 }
679 
680 void wsrep::server_state::sst_sent(const wsrep::gtid& gtid, int error)
681 {
682     if (0 == error)
683         wsrep::log_info() << "SST sent: " << gtid;
684     else
685         wsrep::log_info() << "SST sending failed: " << error;
686 
687     wsrep::unique_lock<wsrep::mutex> lock(mutex_);
688     // The v26 API does not have a JOINED event, so in anticipation of
689     // SYNCED we must do it here.
690     state(lock, s_joined);
691     lock.unlock();
692     enum provider::status const retval(provider().sst_sent(gtid, error));
693     if (retval != provider::success)
694     {
695         std::string msg("wsrep::sst_sent() returned an error: ");
696         msg += wsrep::provider::to_string(retval);
697         server_service_.log_message(wsrep::log::warning, msg.c_str());
698     }
699 }
700 
701 void wsrep::server_state::sst_received(wsrep::client_service& cs,
702                                        int const error)
703 {
704     wsrep::log_info() << "SST received";
705     wsrep::gtid gtid(wsrep::gtid::undefined());
706     wsrep::unique_lock<wsrep::mutex> lock(mutex_);
707     assert(state_ == s_joiner || state_ == s_initialized);
708 
709     // Run initialization only if the SST was successful.
710     // In case of SST failure the system is in an undefined state
711     // and may not be recoverable.
712     if (error == 0)
713     {
714         if (server_service_.sst_before_init())
715         {
716             if (init_initialized_ == false)
717             {
718                 state(lock, s_initializing);
719                 lock.unlock();
720                 server_service_.debug_sync("on_view_wait_initialized");
721                 lock.lock();
722                 wait_until_state(lock, s_initialized);
723                 assert(init_initialized_);
724             }
725         }
726         lock.unlock();
727 
728         if (id_.is_undefined())
729         {
730             assert(0);
731             throw wsrep::runtime_error(
732                 "wsrep::sst_received() called before connection to cluster");
733         }
734 
735         gtid = server_service_.get_position(cs);
736         wsrep::log_info() << "Recovered position from storage: " << gtid;
737 
738         lock.lock();
739         if (gtid.seqno() >= connected_gtid().seqno())
740         {
741             /* Now the node has all the data the cluster has: part in
742              * storage, part in replication event queue. */
743             state(lock, s_joined);
744         }
745         lock.unlock();
746 
747         wsrep::view const v(server_service_.get_view(cs, id_));
748         wsrep::log_info() << "Recovered view from SST:\n" << v;
749 
750         /*
751          * If the state ID of the recovered view is undefined, we may
752          * be upgrading from an earlier version which does not store
753          * the view in stable storage. In this case we skip the sanity
754          * checks and the assignment of the current view, and wait
755          * until the first view delivery.
756          */
757         if (v.state_id().id().is_undefined() == false)
758         {
759             if (v.state_id().id() != gtid.id() ||
760                 v.state_id().seqno() > gtid.seqno())
761             {
762                 /* Since in general we may not be able to recover the SST
763                  * GTID from the state data, we have to rely on the SST
764                  * script passing the GTID value explicitly.
765                  * Here we check that the passed GTID makes sense: it should
766                  * have the same UUID as, and a seqno greater than or equal
767                  * to, the last logged view. */
768                 std::ostringstream msg;
769                 msg << "SST script passed bogus GTID: " << gtid
770                     << ". Preceding view GTID: " << v.state_id();
771                 throw wsrep::runtime_error(msg.str());
772             }
773 
774             if (current_view_.status() == wsrep::view::primary)
775             {
776                 previous_primary_view_ = current_view_;
777             }
778             current_view_ = v;
779             server_service_.log_view(NULL /* this view is stored already */, v);
780         }
781         else
782         {
783             wsrep::log_warning()
784                 << "View recovered from stable storage was empty. If the "
785                 << "server is doing rolling upgrade from previous version "
786                 << "which does not support storing view info into stable "
787                 << "storage, this is ok. Otherwise this may be a sign of "
788                 << "malfunction.";
789         }
790         lock.lock();
791         recover_streaming_appliers_if_not_recovered(lock, cs);
792         lock.unlock();
793     }
794 
795     enum provider::status const retval(provider().sst_received(gtid, error));
796     if (retval != provider::success)
797     {
798         std::string msg("wsrep::sst_received() failed: ");
799         msg += wsrep::provider::to_string(retval);
800         throw wsrep::runtime_error(msg);
801     }
802 }
803 
804 void wsrep::server_state::initialized()
805 {
806     wsrep::unique_lock<wsrep::mutex> lock(mutex_);
807     wsrep::log_info() << "Server initialized";
808     init_initialized_ = true;
809     if (server_service_.sst_before_init())
810     {
811         state(lock, s_initialized);
812     }
813     else
814     {
815         state(lock, s_initializing);
816         state(lock, s_initialized);
817     }
818 }
819 
820 enum wsrep::provider::status
821 wsrep::server_state::wait_for_gtid(const wsrep::gtid& gtid, int timeout)
822     const
823 {
824     return provider().wait_for_gtid(gtid, timeout);
825 }
826 
827 int
828 wsrep::server_state::set_encryption_key(std::vector<unsigned char>& key)
829 {
830     encryption_key_ = key;
831     if (state_ != s_disconnected)
832     {
833         wsrep::const_buffer const key(encryption_key_.data(),
834                                       encryption_key_.size());
835         enum provider::status const retval(provider_->enc_set_key(key));
836         if (retval != provider::success)
837         {
838             wsrep::log_error() << "Failed to set encryption key: "
839                                << provider::to_string(retval);
840             return 1;
841         }
842     }
843     return 0;
844 }
845 
846 std::pair<wsrep::gtid, enum wsrep::provider::status>
847 wsrep::server_state::causal_read(int timeout) const
848 {
849     return provider().causal_read(timeout);
850 }
851 
852 void wsrep::server_state::on_connect(const wsrep::view& view)
853 {
854     // Sanity checks
855     if (view.own_index() < 0 ||
856         size_t(view.own_index()) >= view.members().size())
857     {
858         std::ostringstream os;
859         os << "Invalid view on connect: own index out of range: " << view;
860 #ifndef NDEBUG
861         wsrep::log_error() << os.str();
862         assert(0);
863 #endif
864         throw wsrep::runtime_error(os.str());
865     }
866 
867     const size_t own_index(static_cast<size_t>(view.own_index()));
868     if (id_.is_undefined() == false && id_ != view.members()[own_index].id())
869     {
870         std::ostringstream os;
871         os << "Connection in connected state.\n"
872            << "Connected view:\n" << view
873            << "Previous view:\n" << current_view_
874            << "Current own ID: " << id_;
875 #ifndef NDEBUG
876         wsrep::log_error() << os.str();
877         assert(0);
878 #endif
879         throw wsrep::runtime_error(os.str());
880     }
881     else
882     {
883         id_ = view.members()[own_index].id();
884     }
885 
886     wsrep::log_info() << "Server "
887                       << name_
888                       << " connected to cluster at position "
889                       << view.state_id()
890                       << " with ID "
891                       << id_;
892 
893     wsrep::unique_lock<wsrep::mutex> lock(mutex_);
894     connected_gtid_ = view.state_id();
895     state(lock, s_connected);
896 }
897 
898 void wsrep::server_state::on_primary_view(
899     const wsrep::view& view,
900     wsrep::high_priority_service* high_priority_service)
901 {
902     wsrep::unique_lock<wsrep::mutex> lock(mutex_);
903     assert(view.final() == false);
904     //
905     // Reached primary from connected state. This may mean the following:
906     //
907     // 1) Server was joined to the cluster and got an SST transfer
908     // 2) Server was partitioned from the cluster and got back
909     // 3) A new cluster was bootstrapped from a non-prim cluster
910     //
911     // There is not enough information here about what caused the
912     // primary component, so we need to walk through all states
913     // leading to joined in order to notify possible state waiters
914     // in other threads.
915     //
916     if (server_service_.sst_before_init())
917     {
918         if (state_ == s_connected)
919         {
920             state(lock, s_joiner);
921             // We need to assign init_initialized_ to a local
922             // variable here. If the value was false, we need to skip
923             // the initializing -> initialized -> joined state cycle
924             // below. However, if we don't assign the value to a
925             // local, it is possible that the main thread gets control
926             // between changing the state to initializing and checking
927             // the initialized flag, which may cause the initializing ->
928             // initialized state change to be executed even when it should not be.
929             const bool was_initialized(init_initialized_);
930             state(lock, s_initializing);
931             if (was_initialized)
932             {
933                 // If server side has already been initialized,
934                 // skip directly to s_joined.
935                 state(lock, s_initialized);
936             }
937         }
938     }
939     else
940     {
941         if (state_ == s_connected)
942         {
943             state(lock, s_joiner);
944         }
945     }
946     if (init_initialized_ == false)
947     {
948         lock.unlock();
949         server_service_.debug_sync("on_view_wait_initialized");
950         lock.lock();
951         wait_until_state(lock, s_initialized);
952     }
953     assert(init_initialized_);
954 
955     if (bootstrap_)
956     {
957         server_service_.bootstrap();
958         bootstrap_ = false;
959     }
960 
961     assert(high_priority_service);
962 
963     if (high_priority_service)
964     {
965         recover_streaming_appliers_if_not_recovered(lock,
966                                                     *high_priority_service);
967         close_orphaned_sr_transactions(lock, *high_priority_service);
968     }
969 
970     if (state(lock) < s_joined &&
971         view.state_id().seqno() >= connected_gtid().seqno())
972     {
973         // If we progressed beyond connected seqno, it means we have full state
974         state(lock, s_joined);
975     }
976 }
977 
978 void wsrep::server_state::on_non_primary_view(
979     const wsrep::view& view,
980     wsrep::high_priority_service* high_priority_service)
981 {
982         wsrep::unique_lock<wsrep::mutex> lock(mutex_);
983         wsrep::log_info() << "Non-primary view";
984         if (view.final())
985         {
986             go_final(lock, view, high_priority_service);
987         }
988         else if (state_ != s_disconnecting)
989         {
990             state(lock, s_connected);
991         }
992 }
993 
994 void wsrep::server_state::go_final(wsrep::unique_lock<wsrep::mutex>& lock,
995                                    const wsrep::view& view,
996                                    wsrep::high_priority_service* hps)
997 {
998     (void)view; // avoid compiler warning "unused parameter 'view'"
999     assert(view.final());
1000     assert(hps);
1001     if (hps)
1002     {
1003         close_transactions_at_disconnect(*hps);
1004     }
1005     state(lock, s_disconnected);
1006     id_ = wsrep::id::undefined();
1007 }
1008 
1009 void wsrep::server_state::on_view(const wsrep::view& view,
1010                                   wsrep::high_priority_service* high_priority_service)
1011 {
1012     wsrep::log_info()
1013         << "================================================\nView:\n"
1014         << view
1015         << "=================================================";
1016     if (current_view_.status() == wsrep::view::primary)
1017     {
1018         previous_primary_view_ = current_view_;
1019     }
1020     current_view_ = view;
1021     switch (view.status())
1022     {
1023     case wsrep::view::primary:
1024         on_primary_view(view, high_priority_service);
1025         break;
1026     case wsrep::view::non_primary:
1027         on_non_primary_view(view, high_priority_service);
1028         break;
1029     case wsrep::view::disconnected:
1030     {
1031         wsrep::unique_lock<wsrep::mutex> lock(mutex_);
1032         go_final(lock, view, high_priority_service);
1033         break;
1034     }
1035     default:
1036         wsrep::log_warning() << "Unrecognized view status: " << view.status();
1037         assert(0);
1038     }
1039 
1040     server_service_.log_view(high_priority_service, view);
1041 }
1042 
1043 void wsrep::server_state::on_sync()
1044 {
1045     wsrep::log_info() << "Server " << name_ << " synced with group";
1046     wsrep::unique_lock<wsrep::mutex> lock(mutex_);
1047 
1048     // Initial sync
1049     if (server_service_.sst_before_init() && init_synced_ == false)
1050     {
1051         switch (state_)
1052         {
1053         case s_synced:
1054             break;
1055         case s_connected:                 // Seed node path: provider becomes
1056             state(lock, s_joiner);        // synced with itself before anything
1057             WSREP_FALLTHROUGH;            // else. Then goes DB initialization.
1058         case s_joiner:                    // |
1059             state(lock, s_initializing);  // V
1060             break;
1061         case s_donor:
1062             assert(false); // this should never happen
1063             state(lock, s_joined);
1064             state(lock, s_synced);
1065             break;
1066         case s_initialized:
1067             state(lock, s_joined);
1068             WSREP_FALLTHROUGH;
1069         default:
1070             /* State */
1071             state(lock, s_synced);
1072         };
1073     }
1074     else
1075     {
1076         // Calls to on_sync() in synced state are possible if the
1077         // server desyncs itself from the group. The provider does not
1078         // inform about this through callbacks.
1079         if (state_ != s_synced)
1080         {
1081             state(lock, s_synced);
1082         }
1083     }
1084     init_synced_ = true;
1085 
1086     enum wsrep::provider::status status(send_pending_rollback_events(lock));
1087     if (status)
1088     {
1089         // TODO should be retried?
1090         wsrep::log_warning()
1091             << "Failed to flush rollback event cache: " << status;
1092     }
1093 }
1094 
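// Entry point for write sets and TOI events delivered by the provider.
// Dispatches to apply_toi() or apply_write_set() based on ws_meta flags.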
1095 int wsrep::server_state::on_apply(
1096     wsrep::high_priority_service& high_priority_service,
1097     const wsrep::ws_handle& ws_handle,
1098     const wsrep::ws_meta& ws_meta,
1099     const wsrep::const_buffer& data)
1100 {
1101     if (is_toi(ws_meta.flags()))
1102     {
1103         return apply_toi(provider(), high_priority_service,
1104                          ws_handle, ws_meta, data);
1105     }
1106     else if (is_commutative(ws_meta.flags()) || is_native(ws_meta.flags()))
1107     {
1108         // Not implemented yet.
1109         assert(0);
1110         return 0;
1111     }
1112     else
1113     {
1114         return apply_write_set(*this, high_priority_service,
1115                                ws_handle, ws_meta, data);
1116     }
1117 }
1118 
1119 void wsrep::server_state::start_streaming_client(
1120     wsrep::client_state* client_state)
1121 {
1122     wsrep::unique_lock<wsrep::mutex> lock(mutex_);
1123     WSREP_LOG_DEBUG(wsrep::log::debug_log_level(),
1124                     wsrep::log::debug_level_server_state,
1125                     "Start streaming client: " << client_state->id());
1126     if (streaming_clients_.insert(
1127             std::make_pair(client_state->id(), client_state)).second == false)
1128     {
1129         wsrep::log_warning() << "Failed to insert streaming client "
1130                              << client_state->id();
1131         assert(0);
1132     }
1133 }
1134 
1135 void wsrep::server_state::convert_streaming_client_to_applier(
1136     wsrep::client_state* client_state)
1137 {
1138     // Create the streaming applier beforehand, as the server_state lock
1139     // should not be held when calling server_service methods.
1140     wsrep::high_priority_service* streaming_applier(
1141         server_service_.streaming_applier_service(
1142             client_state->client_service()));
1143 
1144     wsrep::unique_lock<wsrep::mutex> lock(mutex_);
1145     WSREP_LOG_DEBUG(wsrep::log::debug_log_level(),
1146                     wsrep::log::debug_level_server_state,
1147                     "Convert streaming client to applier "
1148                     << client_state->id());
1149     streaming_clients_map::iterator i(
1150         streaming_clients_.find(client_state->id()));
1151     assert(i != streaming_clients_.end());
1152     if (i == streaming_clients_.end())
1153     {
1154         wsrep::log_warning() << "Unable to find streaming client "
1155                              << client_state->id();
1156         assert(0);
1157     }
1158     else
1159     {
1160         streaming_clients_.erase(i);
1161     }
1162 
1163     // Convert to applier only if the state is not disconnected. In
1164     // disconnected state the applier map is supposed to be empty
1165     // and it will be reconstructed from fragment storage when
1166     // joining back to cluster.
1167     if (state(lock) != s_disconnected)
1168     {
1169         if (streaming_applier->adopt_transaction(client_state->transaction()))
1170         {
1171             log_adopt_error(client_state->transaction());
1172             streaming_applier->after_apply();
1173             server_service_.release_high_priority_service(streaming_applier);
1174             return;
1175         }
1176         if (streaming_appliers_.insert(
1177                 std::make_pair(
1178                     std::make_pair(client_state->transaction().server_id(),
1179                                    client_state->transaction().id()),
1180                     streaming_applier)).second == false)
1181         {
1182             wsrep::log_warning() << "Could not insert streaming applier "
1183                                  << id_
1184                                  << ", "
1185                                  << client_state->transaction().id();
1186             assert(0);
1187         }
1188     }
1189     else
1190     {
1191         server_service_.release_high_priority_service(streaming_applier);
1192         client_state->client_service().store_globals();
1193     }
1194 }
1195 
1196 
1197 void wsrep::server_state::stop_streaming_client(
1198     wsrep::client_state* client_state)
1199 {
1200     wsrep::unique_lock<wsrep::mutex> lock(mutex_);
1201      WSREP_LOG_DEBUG(wsrep::log::debug_log_level(),
1202                      wsrep::log::debug_level_server_state,
1203                      "Stop streaming client: " << client_state->id());
1204     streaming_clients_map::iterator i(
1205         streaming_clients_.find(client_state->id()));
1206     assert(i != streaming_clients_.end());
1207     if (i == streaming_clients_.end())
1208     {
1209         wsrep::log_warning() << "Unable to find streaming client "
1210                              << client_state->id();
1211         assert(0);
1212         return;
1213     }
1214     else
1215     {
1216         streaming_clients_.erase(i);
1217         cond_.notify_all();
1218     }
1219 }
1220 
1221 void wsrep::server_state::start_streaming_applier(
1222     const wsrep::id& server_id,
1223     const wsrep::transaction_id& transaction_id,
1224     wsrep::high_priority_service* sa)
1225 {
1226     wsrep::unique_lock<wsrep::mutex> lock(mutex_);
1227     if (streaming_appliers_.insert(
1228             std::make_pair(std::make_pair(server_id, transaction_id),
1229                            sa)).second == false)
1230     {
1231         wsrep::log_error() << "Could not insert streaming applier";
1232         throw wsrep::fatal_error();
1233     }
1234 }
1235 
1236 void wsrep::server_state::stop_streaming_applier(
1237     const wsrep::id& server_id,
1238     const wsrep::transaction_id& transaction_id)
1239 {
1240     wsrep::unique_lock<wsrep::mutex> lock(mutex_);
1241     streaming_appliers_map::iterator i(
1242         streaming_appliers_.find(std::make_pair(server_id, transaction_id)));
1243     assert(i != streaming_appliers_.end());
1244     if (i == streaming_appliers_.end())
1245     {
1246         wsrep::log_warning() << "Could not find streaming applier for "
1247                              << server_id << ":" << transaction_id;
1248     }
1249     else
1250     {
1251         streaming_appliers_.erase(i);
1252         cond_.notify_all();
1253     }
1254 }
1255 
1256 wsrep::high_priority_service* wsrep::server_state::find_streaming_applier(
1257     const wsrep::id& server_id,
1258     const wsrep::transaction_id& transaction_id) const
1259 {
1260     wsrep::unique_lock<wsrep::mutex> lock(mutex_);
1261     streaming_appliers_map::const_iterator i(
1262         streaming_appliers_.find(std::make_pair(server_id, transaction_id)));
1263     return (i == streaming_appliers_.end() ? 0 : i->second);
1264 }
1265 
1266 wsrep::high_priority_service* wsrep::server_state::find_streaming_applier(
1267     const wsrep::xid& xid) const
1268 {
1269     wsrep::unique_lock<wsrep::mutex> lock(mutex_);
1270     streaming_appliers_map::const_iterator i(streaming_appliers_.begin());
1271     while (i != streaming_appliers_.end())
1272     {
1273         wsrep::high_priority_service* sa(i->second);
1274         if (sa->transaction().xid() == xid)
1275         {
1276             return sa;
1277         }
1278         i++;
1279     }
1280     return NULL;
1281 }
1282 
1283 //////////////////////////////////////////////////////////////////////////////
1284 //                              Private                                     //
1285 //////////////////////////////////////////////////////////////////////////////
1286 
1287 int wsrep::server_state::desync(wsrep::unique_lock<wsrep::mutex>& lock)
1288 {
1289     assert(lock.owns_lock());
1290     ++desync_count_;
1291     lock.unlock();
1292     int ret(provider().desync());
1293     lock.lock();
1294     if (ret)
1295     {
1296         --desync_count_;
1297     }
1298     return ret;
1299 }
1300 
1301 void wsrep::server_state::resync(wsrep::unique_lock<wsrep::mutex>&
1302                                  lock WSREP_UNUSED)
1303 {
1304     assert(lock.owns_lock());
1305     assert(desync_count_ > 0);
1306     if (desync_count_ > 0)
1307     {
1308         --desync_count_;
1309         if (provider().resync())
1310         {
1311             throw wsrep::runtime_error("Failed to resync");
1312         }
1313     }
1314     else
1315     {
1316         wsrep::log_warning() << "desync_count " << desync_count_
1317                              << " on resync";
1318     }
1319 }
1320 
1321 
1322 void wsrep::server_state::state(
1323     wsrep::unique_lock<wsrep::mutex>& lock WSREP_UNUSED,
1324     enum wsrep::server_state::state state)
1325 {
1326     assert(lock.owns_lock());
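    // Allowed state transitions: rows are the current state, columns the
    // target state (abbreviations as in the header comment below). A value
    // of 1 permits the transition; anything else logs a warning and
    // triggers an assertion in debug builds.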
1327     static const char allowed[n_states_][n_states_] =
1328         {
1329             /* dis, ing, ized, cted, jer, jed, dor, sed, ding to/from */
1330             {  0,   1,   0,    1,    0,   0,   0,   0,   0}, /* dis */
1331             {  1,   0,   1,    0,    0,   0,   0,   0,   1}, /* ing */
1332             {  1,   0,   0,    1,    0,   1,   0,   0,   1}, /* ized */
1333             {  1,   0,   0,    1,    1,   0,   0,   1,   1}, /* cted */
1334             {  1,   1,   0,    0,    0,   1,   0,   0,   1}, /* jer */
1335             {  1,   0,   0,    1,    0,   0,   1,   1,   1}, /* jed */
1336             {  1,   0,   0,    1,    0,   1,   0,   0,   1}, /* dor */
1337             {  1,   0,   0,    1,    0,   1,   1,   0,   1}, /* sed */
1338             {  1,   0,   0,    0,    0,   0,   0,   0,   0}  /* ding */
1339         };
1340 
1341     if (allowed[state_][state] == false)
1342     {
1343         std::ostringstream os;
1344         os << "server: " << name_ << " unallowed state transition: "
1345            << wsrep::to_string(state_) << " -> " << wsrep::to_string(state);
1346         wsrep::log_warning() << os.str() << "\n";
1347         assert(0);
1348     }
1349 
1350     WSREP_LOG_DEBUG(wsrep::log::debug_log_level(),
1351                     wsrep::log::debug_level_server_state,
1352                     "server " << name_ << " state change: "
1353                     << to_c_string(state_) << " -> "
1354                     << to_c_string(state));
1355     state_hist_.push_back(state_);
1356     server_service_.log_state_change(state_, state);
1357     state_ = state;
1358     cond_.notify_all();
1359     while (state_waiters_[state_])
1360     {
1361         cond_.wait(lock);
1362     }
1363 }
1364 
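// Block until the server reaches the given state. The wait is registered
// in state_waiters_ so that state() can detect pending waiters. The wait
// is aborted with an exception if the server moves to s_disconnecting
// while some other state is being waited for.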
1365 void wsrep::server_state::wait_until_state(
1366     wsrep::unique_lock<wsrep::mutex>& lock,
1367     enum wsrep::server_state::state state) const
1368 {
1369     ++state_waiters_[state];
1370     while (state_ != state)
1371     {
1372         cond_.wait(lock);
1373         // If the waiter waits for any other state than disconnecting
1374         // or disconnected and the state has been changed to disconnecting,
1375         // this usually means that some error was encountered
1376         if (state != s_disconnecting && state != s_disconnected
1377             && state_ == s_disconnecting)
1378         {
1379             throw wsrep::runtime_error("State wait was interrupted");
1380         }
1381     }
1382     --state_waiters_[state];
1383     cond_.notify_all();
1384 }

void wsrep::server_state::interrupt_state_waiters(
    wsrep::unique_lock<wsrep::mutex>& lock WSREP_UNUSED)
{
    assert(lock.owns_lock());
    cond_.notify_all();
}

template <class C>
void wsrep::server_state::recover_streaming_appliers_if_not_recovered(
    wsrep::unique_lock<wsrep::mutex>& lock, C& c)
{
    assert(lock.owns_lock());
    if (streaming_appliers_recovered_ == false)
    {
        lock.unlock();
        server_service_.recover_streaming_appliers(c);
        lock.lock();
    }
    streaming_appliers_recovered_ = true;
}
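// Unary predicate which matches clients whose current transaction is in the
// given state. Used below with std::find_if_not to pick streaming clients
// whose transaction is not in prepared state.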
class transaction_state_cmp
{
public:
    transaction_state_cmp(const enum wsrep::transaction::state s)
        : state_(s) { }
    bool operator()(const std::pair<wsrep::client_id,
                    wsrep::client_state*>& vt) const
    {
        return vt.second->transaction().state() == state_;
    }
private:
    enum wsrep::transaction::state state_;
};

void wsrep::server_state::close_orphaned_sr_transactions(
    wsrep::unique_lock<wsrep::mutex>& lock,
    wsrep::high_priority_service& high_priority_service)
{
    assert(lock.owns_lock());

    // When the originator of an SR transaction leaves the primary
    // component of the cluster, that SR must be rolled back. When two
    // consecutive primary views have the same membership, the system
    // may have been in a state with no primary components.
    // Example with a 2 node cluster:
    // - (1,2 primary)
    // - (1 non-primary) and (2 non-primary)
    // - (1,2 primary)
    // We need to roll back SRs owned by both 1 and 2.
    // Notice that since the introduction of rollback_event_queue_,
    // checking for equal consecutive views is no longer needed.
    // However, we must keep it here for the time being, for backwards
    // compatibility.
    const bool equal_consecutive_views =
        current_view_.equal_membership(previous_primary_view_);

    if (current_view_.own_index() == -1 || equal_consecutive_views)
    {
        streaming_clients_map::iterator i;
        transaction_state_cmp prepared_state_cmp(wsrep::transaction::s_prepared);
        while ((i = std::find_if_not(streaming_clients_.begin(),
                                     streaming_clients_.end(),
                                     prepared_state_cmp))
               != streaming_clients_.end())
        {
            wsrep::client_id client_id(i->first);
            wsrep::transaction_id transaction_id(i->second->transaction().id());
            // It is safe to unlock the server state temporarily here.
            // The processing happens inside the view handler which is
            // protected by the provider commit ordering critical
            // section. The lock must be unlocked temporarily to
            // allow converting the current client to a streaming
            // applier in transaction::streaming_rollback().
            // The iterator i may be invalidated while the server state
            // remains unlocked, so it must not be accessed after
            // the BF abort call.
            lock.unlock();
            i->second->total_order_bf_abort(current_view_.view_seqno());
            lock.lock();
            streaming_clients_map::const_iterator found_i;
            while ((found_i = streaming_clients_.find(client_id)) !=
                   streaming_clients_.end() &&
                   found_i->second->transaction().id() == transaction_id)
            {
                cond_.wait(lock);
            }
        }
    }

    streaming_appliers_map::iterator i(streaming_appliers_.begin());
    while (i != streaming_appliers_.end())
    {
        wsrep::high_priority_service* streaming_applier(i->second);

        // Roll back the SR on equal consecutive primary views or if its
        // originator is not in the current view.
        // Transactions in prepared state must be committed or
        // rolled back explicitly; those are never rolled back here.
        if ((streaming_applier->transaction().state() !=
             wsrep::transaction::s_prepared) &&
            (equal_consecutive_views ||
             not current_view_.is_member(
                 streaming_applier->transaction().server_id())))
        {
            WSREP_LOG_DEBUG(wsrep::log::debug_log_level(),
                            wsrep::log::debug_level_server_state,
                            "Removing SR fragments for "
                            << i->first.first
                            << ", " << i->first.second);
            wsrep::id server_id(i->first.first);
            wsrep::transaction_id transaction_id(i->first.second);
            int adopt_error;
            if ((adopt_error = high_priority_service.adopt_transaction(
                     streaming_applier->transaction())))
            {
                log_adopt_error(streaming_applier->transaction());
            }
            // Even if the transaction adopt above fails, we roll back
            // the transaction. An adopt error will leave stale entries
            // in the streaming log which can be removed manually.
            {
                wsrep::high_priority_switch sw(high_priority_service,
                                               *streaming_applier);
                streaming_applier->rollback(
                    wsrep::ws_handle(), wsrep::ws_meta());
                streaming_applier->after_apply();
            }

            streaming_appliers_.erase(i++);
            server_service_.release_high_priority_service(streaming_applier);
            high_priority_service.store_globals();
            wsrep::ws_meta ws_meta(
                wsrep::gtid(),
                wsrep::stid(server_id, transaction_id, wsrep::client_id()),
                wsrep::seqno::undefined(), 0);
            lock.unlock();
            if (adopt_error == 0)
            {
                high_priority_service.remove_fragments(ws_meta);
                high_priority_service.commit(wsrep::ws_handle(transaction_id, 0),
                                             ws_meta);
            }
            high_priority_service.after_apply();
            lock.lock();
        }
        else
        {
            ++i;
        }
    }
}

void wsrep::server_state::close_transactions_at_disconnect(
    wsrep::high_priority_service& high_priority_service)
{
    // Close streaming appliers without removing fragments
    // from the fragment storage. When the server is started again,
    // it must be able to recover ongoing streaming transactions.
    streaming_appliers_map::iterator i(streaming_appliers_.begin());
    while (i != streaming_appliers_.end())
    {
        wsrep::high_priority_service* streaming_applier(i->second);
        {
            wsrep::high_priority_switch sw(high_priority_service,
                                           *streaming_applier);
            streaming_applier->rollback(
                wsrep::ws_handle(), wsrep::ws_meta());
            streaming_applier->after_apply();
        }
        streaming_appliers_.erase(i++);
        server_service_.release_high_priority_service(streaming_applier);
        high_priority_service.store_globals();
    }
    streaming_appliers_recovered_ = false;
}

//
// Rollback event queue
//

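// Transaction ids queued by queue_rollback_event() are delivered to the
// provider in FIFO order by send_pending_rollback_events(). If delivering
// an event fails, it and the remaining events stay in the queue so that
// the flush can be retried later. Illustrative flow (sketch only):
//
//   queue_rollback_event(trx_id);     // e.g. from a streaming rollback
//   /* ... */
//   send_pending_rollback_events();   // flush once delivery is possible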
void wsrep::server_state::queue_rollback_event(
    const wsrep::transaction_id& id)
{
    wsrep::unique_lock<wsrep::mutex> lock(mutex_);
#ifndef NDEBUG
    // Make sure we don't have duplicate transaction ids in the
    // rollback event queue. There is no need to do this in a release
    // build given that the caller (streaming_rollback()) should
    // avoid duplicates.
    for (auto i : rollback_event_queue_)
    {
        assert(id != i);
    }
#endif
    rollback_event_queue_.push_back(id);
}

enum wsrep::provider::status
wsrep::server_state::send_pending_rollback_events(
    wsrep::unique_lock<wsrep::mutex>& lock WSREP_UNUSED)
{
    assert(lock.owns_lock());
    while (not rollback_event_queue_.empty())
    {
        const wsrep::transaction_id& id(rollback_event_queue_.front());
        const enum wsrep::provider::status status(provider().rollback(id));
        if (status)
        {
            return status;
        }
        rollback_event_queue_.pop_front();
    }
    return wsrep::provider::success;
}

enum wsrep::provider::status
wsrep::server_state::send_pending_rollback_events()
{
    wsrep::unique_lock<wsrep::mutex> lock(mutex_);
    return send_pending_rollback_events(lock);
}