1 /*
2 * Copyright (C) 2018-2019 Codership Oy <info@codership.com>
3 *
4 * This file is part of wsrep-lib.
5 *
6 * Wsrep-lib is free software: you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation, either version 2 of the License, or
9 * (at your option) any later version.
10 *
11 * Wsrep-lib is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License
17 * along with wsrep-lib. If not, see <https://www.gnu.org/licenses/>.
18 */
19
20 #include "wsrep/server_state.hpp"
21 #include "wsrep/client_state.hpp"
22 #include "wsrep/server_service.hpp"
23 #include "wsrep/high_priority_service.hpp"
24 #include "wsrep/transaction.hpp"
25 #include "wsrep/view.hpp"
26 #include "wsrep/logger.hpp"
27 #include "wsrep/compiler.hpp"
28 #include "wsrep/id.hpp"
29
30 #include <cassert>
31 #include <sstream>
32 #include <algorithm>
33
34
35 //////////////////////////////////////////////////////////////////////////////
36 // Helpers //
37 //////////////////////////////////////////////////////////////////////////////
38
39
40 //
41 // This method is used to deal with historical burden of several
42 // ways to bootstrap the cluster. Bootstrap happens if
43 //
44 // * bootstrap option is given
45 // * cluster_address is "gcomm://" (Galera provider)
46 //
// Decide whether this connection bootstraps a new cluster. Historical
// burden: bootstrapping can be requested either via the explicit
// bootstrap option or by using the bare "gcomm://" cluster address
// (Galera provider convention).
static bool is_bootstrap(const std::string& cluster_address, bool bootstrap)
{
    if (bootstrap)
    {
        return true;
    }
    return cluster_address == "gcomm://";
}
51
// Helper method to provide detailed error message if transaction
// adopt for fragment removal fails.
// Logged as a warning only: the failure is survivable, but the
// streaming log may retain stale fragment entries that require
// manual cleanup.
static void log_adopt_error(const wsrep::transaction& transaction)
{
    wsrep::log_warning() << "Adopting a transaction ("
                         << transaction.server_id() << "," << transaction.id()
                         << ") for rollback failed, "
                         << "this may leave stale entries to streaming log "
                         << "which may need to be removed manually.";
}
62
// resolve which of the two errors return to caller
//
// The vote error takes precedence whenever a vote took place or the
// vote error itself is non-zero; otherwise the apply error is
// reported.
static inline int resolve_return_error(bool const vote,
                                       int const vote_err,
                                       int const apply_err)
{
    if (vote || vote_err != 0)
    {
        return vote_err;
    }
    return apply_err;
}
71
// Deregister a streaming applier from server_state and release the
// high priority service instance that hosted it. After the release,
// thread local storage is restored for the currently executing
// high_priority_service.
static void
discard_streaming_applier(wsrep::server_state& server_state,
                          wsrep::high_priority_service& high_priority_service,
                          wsrep::high_priority_service* streaming_applier,
                          const wsrep::ws_meta& ws_meta)
{
    // Remove the applier from the server state map keyed by
    // (server_id, transaction_id).
    server_state.stop_streaming_applier(
        ws_meta.server_id(), ws_meta.transaction_id());
    server_state.server_service().release_high_priority_service(
        streaming_applier);
    // The released applier may have overwritten thread local state;
    // restore it for the calling service.
    high_priority_service.store_globals();
}
84
// Apply a single fragment of a streaming transaction in the
// streaming applier context and persist it via
// append_fragment_and_commit() in the calling service context.
// On apply failure the streaming transaction is rolled back and
// either its already-replicated fragments are removed (in commit
// order) or a dummy write set is logged, after which the applier
// context is discarded.
static int apply_fragment(wsrep::server_state& server_state,
                          wsrep::high_priority_service& high_priority_service,
                          wsrep::high_priority_service* streaming_applier,
                          const wsrep::ws_handle& ws_handle,
                          const wsrep::ws_meta& ws_meta,
                          const wsrep::const_buffer& data)
{
    int ret(0);
    int apply_err;
    wsrep::mutable_buffer err;
    {
        // Switch thread context to the streaming applier for the
        // duration of this scope.
        wsrep::high_priority_switch sw(high_priority_service,
                                       *streaming_applier);
        apply_err = streaming_applier->apply_write_set(ws_meta, data, err);
        if (!apply_err)
        {
            assert(err.size() == 0);
            streaming_applier->after_apply();
        }
        else
        {
            // Check before rollback whether fragments were already
            // replicated; rollback clears the streaming context.
            bool const remove_fragments(streaming_applier->transaction(
                ).streaming_context().fragments().size() > 0);
            ret = streaming_applier->rollback(ws_handle, ws_meta);
            // after_apply() returns void; the comma expression keeps
            // the short-circuit chain on ret intact.
            ret = ret || (streaming_applier->after_apply(), 0);

            if (remove_fragments)
            {
                // Remove stored fragments in a new transaction which
                // commits in the order of this write set and carries
                // the apply error for provider voting.
                ret = ret || streaming_applier->start_transaction(ws_handle,
                                                                  ws_meta);
                ret = ret || (streaming_applier->adopt_apply_error(err), 0);
                ret = ret || streaming_applier->remove_fragments(ws_meta);
                ret = ret || streaming_applier->commit(ws_handle, ws_meta);
                ret = ret || (streaming_applier->after_apply(), 0);
            }
            else
            {
                // Nothing to remove: log a dummy write set carrying
                // the apply error.
                ret = streaming_applier->log_dummy_write_set(ws_handle,
                                                             ws_meta, err);
            }
        }
    }

    if (!ret)
    {
        if (!apply_err)
        {
            high_priority_service.debug_crash("crash_apply_cb_before_append_frag");
            const wsrep::xid xid(streaming_applier->transaction().xid());
            // Persist the fragment in the calling service context so
            // that it commits in this write set's order.
            ret = high_priority_service.append_fragment_and_commit(
                ws_handle, ws_meta, data, xid);
            high_priority_service.debug_crash("crash_apply_cb_after_append_frag");
            ret = ret || (high_priority_service.after_apply(), 0);
        }
        else
        {
            // Apply failed but cleanup above succeeded: drop the
            // applier context and resolve which error to return.
            discard_streaming_applier(server_state,
                                      high_priority_service,
                                      streaming_applier,
                                      ws_meta);
            ret = resolve_return_error(err.size() > 0, ret, apply_err);
        }
    }

    return ret;
}
151
// Apply and commit the final fragment of a streaming transaction.
// On apply failure the transaction is rolled back and restarted so
// that the stored fragments can be removed and the apply error
// adopted for provider voting. On success the streaming applier
// context is discarded.
static int commit_fragment(wsrep::server_state& server_state,
                           wsrep::high_priority_service& high_priority_service,
                           wsrep::high_priority_service* streaming_applier,
                           const wsrep::ws_handle& ws_handle,
                           const wsrep::ws_meta& ws_meta,
                           const wsrep::const_buffer& data)
{
    int ret(0);
    {
        // Execute within the streaming applier thread context.
        wsrep::high_priority_switch sw(
            high_priority_service, *streaming_applier);
        wsrep::mutable_buffer err;
        int const apply_err(
            streaming_applier->apply_write_set(ws_meta, data, err));
        if (apply_err)
        {
            // A commit fragment implies earlier fragments exist.
            assert(streaming_applier->transaction(
                ).streaming_context().fragments().size() > 0);
            ret = streaming_applier->rollback(ws_handle, ws_meta);
            ret = ret || (streaming_applier->after_apply(), 0);
            // Restart a transaction to carry the fragment removal and
            // the adopted apply error through the commit below.
            ret = ret || streaming_applier->start_transaction(
                ws_handle, ws_meta);
            ret = ret || (streaming_applier->adopt_apply_error(err),0);
        }
        else
        {
            assert(err.size() == 0);
        }

        const wsrep::transaction& trx(streaming_applier->transaction());
        // Fragment removal for XA is going to happen in after_commit
        if (trx.state() != wsrep::transaction::s_prepared)
        {
            streaming_applier->debug_crash(
                "crash_apply_cb_before_fragment_removal");

            ret = ret || streaming_applier->remove_fragments(ws_meta);

            streaming_applier->debug_crash(
                "crash_apply_cb_after_fragment_removal");
        }

        streaming_applier->debug_crash(
            "crash_commit_cb_before_last_fragment_commit");
        ret = ret || streaming_applier->commit(ws_handle, ws_meta);
        streaming_applier->debug_crash(
            "crash_commit_cb_last_fragment_commit_success");
        ret = ret || (streaming_applier->after_apply(), 0);
        // Decide which error (vote vs apply) is reported to the caller.
        ret = resolve_return_error(err.size() > 0, ret, apply_err);
    }

    if (!ret)
    {
        // Transaction is fully committed (or cleanly voted out):
        // release the streaming applier context.
        discard_streaming_applier(server_state, high_priority_service,
                                  streaming_applier, ws_meta);
    }

    return ret;
}
211
rollback_fragment(wsrep::server_state & server_state,wsrep::high_priority_service & high_priority_service,wsrep::high_priority_service * streaming_applier,const wsrep::ws_handle & ws_handle,const wsrep::ws_meta & ws_meta)212 static int rollback_fragment(wsrep::server_state& server_state,
213 wsrep::high_priority_service& high_priority_service,
214 wsrep::high_priority_service* streaming_applier,
215 const wsrep::ws_handle& ws_handle,
216 const wsrep::ws_meta& ws_meta)
217 {
218 int ret(0);
219 int adopt_error(0);
220 bool const remove_fragments(streaming_applier->transaction().
221 streaming_context().fragments().size() > 0);
222 // If fragment removal is needed, adopt transaction state
223 // and start a transaction for it.
224 if (remove_fragments &&
225 (adopt_error = high_priority_service.adopt_transaction(
226 streaming_applier->transaction())))
227 {
228 log_adopt_error(streaming_applier->transaction());
229 }
230 // Even if the adopt above fails we roll back the streaming transaction.
231 // Adopt failure will leave stale entries in streaming log which can
232 // be removed manually.
233 wsrep::const_buffer no_error;
234 {
235 wsrep::high_priority_switch ws(
236 high_priority_service, *streaming_applier);
237 // Streaming applier rolls back out of order. Fragment
238 // removal grabs commit order below.
239 ret = streaming_applier->rollback(wsrep::ws_handle(), wsrep::ws_meta());
240 ret = ret || (streaming_applier->after_apply(), 0);
241 }
242
243 if (!ret)
244 {
245 discard_streaming_applier(server_state, high_priority_service,
246 streaming_applier, ws_meta);
247
248 if (adopt_error == 0)
249 {
250 if (remove_fragments)
251 {
252 high_priority_service.remove_fragments(ws_meta);
253 high_priority_service.commit(ws_handle, ws_meta);
254 high_priority_service.after_apply();
255 }
256 else
257 {
258 if (ws_meta.ordered())
259 {
260 wsrep::mutable_buffer no_error;
261 ret = high_priority_service.log_dummy_write_set(
262 ws_handle, ws_meta, no_error);
263 }
264 }
265 }
266 }
267 return ret;
268 }
269
// Dispatch an incoming write set according to its flags:
// - rollback fragment: roll back a streaming transaction (or log a
//   dummy write set if no applier context exists)
// - start+commit: a regular one-shot transaction
// - start only: first fragment of a streaming transaction
// - no flags / pa_unsafe / prepare: an intermediate streaming fragment
// - commit only: final fragment of a streaming transaction, or a
//   replayed transaction
static int apply_write_set(wsrep::server_state& server_state,
                           wsrep::high_priority_service& high_priority_service,
                           const wsrep::ws_handle& ws_handle,
                           const wsrep::ws_meta& ws_meta,
                           const wsrep::const_buffer& data)
{
    int ret(0);
    if (wsrep::rolls_back_transaction(ws_meta.flags()))
    {
        wsrep::mutable_buffer no_error;
        if (wsrep::starts_transaction(ws_meta.flags()))
        {
            // No transaction existed before, log a dummy write set
            ret = high_priority_service.log_dummy_write_set(
                ws_handle, ws_meta, no_error);
        }
        else
        {
            wsrep::high_priority_service* sa(
                server_state.find_streaming_applier(
                    ws_meta.server_id(), ws_meta.transaction_id()));
            if (sa == 0)
            {
                // It is a known limitation that galera provider
                // cannot always determine if certification test
                // for interrupted transaction will pass or fail
                // (see comments in transaction::certify_fragment()).
                // As a consequence, unnecessary rollback fragments
                // may be delivered here. The message below has
                // been intentionally turned into a debug message,
                // rather than warning.
                WSREP_LOG_DEBUG(wsrep::log::debug_log_level(),
                                wsrep::log::debug_level_server_state,
                                "Could not find applier context for "
                                << ws_meta.server_id()
                                << ": " << ws_meta.transaction_id());
                ret = high_priority_service.log_dummy_write_set(
                    ws_handle, ws_meta, no_error);
            }
            else
            {
                // rollback_fragment() consumes sa
                ret = rollback_fragment(server_state,
                                        high_priority_service,
                                        sa,
                                        ws_handle,
                                        ws_meta);
            }
        }
    }
    else if (wsrep::starts_transaction(ws_meta.flags()) &&
             wsrep::commits_transaction(ws_meta.flags()))
    {
        // Regular non-streaming transaction: apply and commit in the
        // calling service context.
        ret = high_priority_service.start_transaction(ws_handle, ws_meta);
        if (!ret)
        {
            wsrep::mutable_buffer err;
            int const apply_err(high_priority_service.apply_write_set(
                ws_meta, data, err));
            if (!apply_err)
            {
                assert(err.size() == 0);
                ret = high_priority_service.commit(ws_handle, ws_meta);
                ret = ret || (high_priority_service.after_apply(), 0);
            }
            else
            {
                // Apply failed: roll back and log a dummy write set
                // carrying the error for provider voting.
                ret = high_priority_service.rollback(ws_handle, ws_meta);
                ret = ret || (high_priority_service.after_apply(), 0);
                ret = ret || high_priority_service.log_dummy_write_set(
                    ws_handle, ws_meta, err);
                ret = resolve_return_error(err.size() > 0, ret, apply_err);
            }
        }
    }
    else if (wsrep::starts_transaction(ws_meta.flags()))
    {
        // First fragment of a streaming transaction: allocate a
        // dedicated streaming applier context and register it.
        assert(server_state.find_streaming_applier(
                   ws_meta.server_id(), ws_meta.transaction_id()) == 0);
        wsrep::high_priority_service* sa(
            server_state.server_service().streaming_applier_service(
                high_priority_service));
        server_state.start_streaming_applier(
            ws_meta.server_id(), ws_meta.transaction_id(), sa);
        sa->start_transaction(ws_handle, ws_meta);
        ret = apply_fragment(server_state,
                             high_priority_service,
                             sa,
                             ws_handle,
                             ws_meta,
                             data);
    }
    else if (ws_meta.flags() == 0 || ws_meta.flags() == wsrep::provider::flag::pa_unsafe ||
             wsrep::prepares_transaction(ws_meta.flags()))
    {
        // Intermediate (or XA prepare) fragment of an already started
        // streaming transaction.
        wsrep::high_priority_service* sa(
            server_state.find_streaming_applier(
                ws_meta.server_id(), ws_meta.transaction_id()));
        if (sa == 0)
        {
            // It is possible that rapid group membership changes
            // may cause streaming transaction be rolled back before
            // commit fragment comes in. Although this is a valid
            // situation, log a warning if a sac cannot be found as
            // it may be an indication of a bug too.
            wsrep::log_warning() << "Could not find applier context for "
                                 << ws_meta.server_id()
                                 << ": " << ws_meta.transaction_id();
            wsrep::mutable_buffer no_error;
            ret = high_priority_service.log_dummy_write_set(
                ws_handle, ws_meta, no_error);
        }
        else
        {
            sa->next_fragment(ws_meta);
            ret = apply_fragment(server_state,
                                 high_priority_service,
                                 sa,
                                 ws_handle,
                                 ws_meta,
                                 data);
        }
    }
    else if (wsrep::commits_transaction(ws_meta.flags()))
    {
        if (high_priority_service.is_replaying())
        {
            // Replaying: the whole transaction is applied in the
            // calling service context.
            wsrep::mutable_buffer unused;
            ret = high_priority_service.start_transaction(
                ws_handle, ws_meta) ||
                high_priority_service.apply_write_set(ws_meta, data, unused) ||
                high_priority_service.commit(ws_handle, ws_meta);
        }
        else
        {
            wsrep::high_priority_service* sa(
                server_state.find_streaming_applier(
                    ws_meta.server_id(), ws_meta.transaction_id()));
            if (sa == 0)
            {
                // It is possible that rapid group membership changes
                // may cause streaming transaction be rolled back before
                // commit fragment comes in. Although this is a valid
                // situation, log a warning if a sac cannot be found as
                // it may be an indication of a bug too.
                wsrep::log_warning()
                    << "Could not find applier context for "
                    << ws_meta.server_id()
                    << ": " << ws_meta.transaction_id();
                wsrep::mutable_buffer no_error;
                ret = high_priority_service.log_dummy_write_set(
                    ws_handle, ws_meta, no_error);
            }
            else
            {
                // Commit fragment consumes sa
                sa->next_fragment(ws_meta);
                ret = commit_fragment(server_state,
                                      high_priority_service,
                                      sa,
                                      ws_handle,
                                      ws_meta,
                                      data);
            }
        }
    }
    else
    {
        // Unhandled flag combination: a bug in the provider or in
        // this dispatch logic.
        assert(0);
    }
    if (ret)
    {
        wsrep::log_error() << "Failed to apply write set: " << ws_meta;
    }
    return ret;
}
446
// Apply a total order isolation (TOI) or non-blocking operation (NBO)
// event. The event is always bracketed by provider commit order
// enter/leave so that the possible apply error can be voted on via
// the err buffer passed to commit_order_leave().
static int apply_toi(wsrep::provider& provider,
                     wsrep::high_priority_service& high_priority_service,
                     const wsrep::ws_handle& ws_handle,
                     const wsrep::ws_meta& ws_meta,
                     const wsrep::const_buffer& data)
{
    if (wsrep::starts_transaction(ws_meta.flags()) &&
        wsrep::commits_transaction(ws_meta.flags()))
    {
        //
        // Regular TOI.
        //
        provider.commit_order_enter(ws_handle, ws_meta);
        wsrep::mutable_buffer err;
        int const apply_err(high_priority_service.apply_toi(ws_meta,data,err));
        int const vote_err(provider.commit_order_leave(ws_handle, ws_meta,err));
        return resolve_return_error(err.size() > 0, vote_err, apply_err);
    }
    else if (wsrep::starts_transaction(ws_meta.flags()))
    {
        // NBO begin: only the start event is applied here.
        provider.commit_order_enter(ws_handle, ws_meta);
        wsrep::mutable_buffer err;
        int const apply_err(high_priority_service.apply_nbo_begin(ws_meta, data, err));
        int const vote_err(provider.commit_order_leave(ws_handle, ws_meta, err));
        return resolve_return_error(err.size() > 0, vote_err, apply_err);
    }
    else if (wsrep::commits_transaction(ws_meta.flags()))
    {
        // NBO end event is ignored here, both local and applied
        // have NBO end handled via local TOI calls.
        provider.commit_order_enter(ws_handle, ws_meta);
        wsrep::mutable_buffer err;
        provider.commit_order_leave(ws_handle, ws_meta, err);
        return 0;
    }
    else
    {
        // Unexpected flag combination for a TOI event.
        assert(0);
        return 0;
    }
}
488
489 //////////////////////////////////////////////////////////////////////////////
490 // Server State //
491 //////////////////////////////////////////////////////////////////////////////
492
load_provider(const std::string & provider_spec,const std::string & provider_options,const wsrep::provider::services & services)493 int wsrep::server_state::load_provider(
494 const std::string& provider_spec, const std::string& provider_options,
495 const wsrep::provider::services& services)
496 {
497 wsrep::log_info() << "Loading provider " << provider_spec
498 << " initial position: " << initial_position_;
499
500 provider_ = wsrep::provider::make_provider(*this,
501 provider_spec,
502 provider_options,
503 services);
504 return (provider_ ? 0 : 1);
505 }
506
unload_provider()507 void wsrep::server_state::unload_provider()
508 {
509 delete provider_;
510 provider_ = 0;
511 }
512
connect(const std::string & cluster_name,const std::string & cluster_address,const std::string & state_donor,bool bootstrap)513 int wsrep::server_state::connect(const std::string& cluster_name,
514 const std::string& cluster_address,
515 const std::string& state_donor,
516 bool bootstrap)
517 {
518 bootstrap_ = is_bootstrap(cluster_address, bootstrap);
519 wsrep::log_info() << "Connecting with bootstrap option: " << bootstrap_;
520 return provider().connect(cluster_name, cluster_address, state_donor,
521 bootstrap_);
522 }
523
// Initiate disconnect from the cluster. Idempotent: returns success
// immediately if a disconnect is already in progress or completed.
int wsrep::server_state::disconnect()
{
    {
        wsrep::unique_lock<wsrep::mutex> lock(mutex_);
        // In case of failure situations which are caused by provider
        // being shut down some failing operation may also try to shut
        // down the replication. Check the state here and
        // return success if the provider disconnect is already in progress
        // or has completed.
        if (state(lock) == s_disconnecting || state(lock) == s_disconnected)
        {
            return 0;
        }
        state(lock, s_disconnecting);
        // Wake up any threads waiting for a state change so that they
        // do not block the disconnect.
        interrupt_state_waiters(lock);
    }
    // Provider call is made outside the lock.
    return provider().disconnect();
}
542
wsrep::server_state::~server_state()
{
    // Release the provider in case unload_provider() was not called
    // before destruction (delete of null pointer is a no-op).
    delete provider_;
}
547
// Return the provider status variables (forwarded as-is).
std::vector<wsrep::provider::status_variable>
wsrep::server_state::status() const
{
    return provider().status();
}
553
554
// Pause the provider. Returns the seqno at which the provider was
// paused, or an undefined seqno on failure.
wsrep::seqno wsrep::server_state::pause()
{
    wsrep::unique_lock<wsrep::mutex> lock(mutex_);
    // Disallow concurrent calls to pause to in order to have non-concurrent
    // access to desynced_on_pause_ which is checked in resume() call.
    wsrep::log_info() << "pause";
    while (pause_count_ > 0)
    {
        cond_.wait(lock);
    }
    ++pause_count_;
    assert(pause_seqno_.is_undefined());
    // Release the lock for the duration of the provider call; the
    // pause_count_ increment above keeps other pause() callers out.
    lock.unlock();
    pause_seqno_ = provider().pause();
    lock.lock();
    if (pause_seqno_.is_undefined())
    {
        // Provider pause failed: allow the next pause() caller in.
        --pause_count_;
    }
    return pause_seqno_;
}
576
// Resume a previously paused provider. Must be called exactly once
// after a successful pause(); throws on provider resume failure.
void wsrep::server_state::resume()
{
    wsrep::unique_lock<wsrep::mutex> lock(mutex_);
    wsrep::log_info() << "resume";
    // pause() must have succeeded before resume() is called.
    assert(pause_seqno_.is_undefined() == false);
    assert(pause_count_ == 1);
    if (provider().resume())
    {
        throw wsrep::runtime_error("Failed to resume provider");
    }
    pause_seqno_ = wsrep::seqno::undefined();
    --pause_count_;
    // Wake up pause() callers waiting on pause_count_.
    cond_.notify_all();
}
591
desync_and_pause()592 wsrep::seqno wsrep::server_state::desync_and_pause()
593 {
594 wsrep::log_info() << "Desyncing and pausing the provider";
595 // Temporary variable to store desync() return status. This will be
596 // assigned to desynced_on_pause_ after pause() call to prevent
597 // concurrent access to member variable desynced_on_pause_.
598 bool desync_successful;
599 if (desync())
600 {
601 // Desync may give transient error if the provider cannot
602 // communicate with the rest of the cluster. However, this
603 // error can be tolerated because if the provider can be
604 // paused successfully below.
605 WSREP_LOG_DEBUG(wsrep::log::debug_log_level(),
606 wsrep::log::debug_level_server_state,
607 "Failed to desync server before pause");
608 desync_successful = false;
609 }
610 else
611 {
612 desync_successful = true;
613 }
614 wsrep::seqno ret(pause());
615 if (ret.is_undefined())
616 {
617 wsrep::log_warning() << "Failed to pause provider";
618 resync();
619 return wsrep::seqno::undefined();
620 }
621 else
622 {
623 desynced_on_pause_ = desync_successful;
624 }
625 wsrep::log_info() << "Provider paused at: " << ret;
626 return ret;
627 }
628
// Resume a paused provider and resync it with the cluster if the
// preceding desync_and_pause() had desynced successfully.
void wsrep::server_state::resume_and_resync()
{
    wsrep::log_info() << "Resuming and resyncing the provider";
    try
    {
        // Assign desynced_on_pause_ to local variable before resuming
        // in order to avoid concurrent access to desynced_on_pause_ member
        // variable.
        bool do_resync = desynced_on_pause_;
        desynced_on_pause_ = false;
        resume();
        if (do_resync)
        {
            resync();
        }
    }
    catch (const wsrep::runtime_error& e)
    {
        // resume() throws on provider failure; recovery is not
        // possible here, so only warn.
        wsrep::log_warning()
            << "Resume and resync failed, server may have to be restarted";
    }
}
651
// Transition to joiner state and produce the SST request string that
// is passed to the donor.
std::string wsrep::server_state::prepare_for_sst()
{
    wsrep::unique_lock<wsrep::mutex> lock(mutex_);
    state(lock, s_joiner);
    // Release the lock before calling into server_service_, which may
    // block or call back into this object.
    lock.unlock();
    return server_service_.sst_request();
}
659
// Start a state snapshot transfer as a donor.
//
// @param sst_request request string produced by the joiner
// @param gtid        position the transfer starts from
// @param bypass      true for IST-style bypass (no actual snapshot)
// @return 0 on success, 1 if SST preparation failed
int wsrep::server_state::start_sst(const std::string& sst_request,
                                   const wsrep::gtid& gtid,
                                   bool bypass)
{
    wsrep::unique_lock<wsrep::mutex> lock(mutex_);
    state(lock, s_donor);
    int ret(0);
    // Release the lock for the duration of the server service call.
    lock.unlock();
    if (server_service_.start_sst(sst_request, gtid, bypass))
    {
        lock.lock();
        wsrep::log_warning() << "SST preparation failed";
        // v26 API does not have JOINED event, so in anticipation of SYNCED
        // we must do it here.
        state(lock, s_joined);
        ret = 1;
    }
    return ret;
}
679
// Called by the donor after the SST has been sent (or failed).
// Transitions back to joined state and reports the result to the
// provider.
void wsrep::server_state::sst_sent(const wsrep::gtid& gtid, int error)
{
    if (0 == error)
        wsrep::log_info() << "SST sent: " << gtid;
    else
        wsrep::log_info() << "SST sending failed: " << error;

    wsrep::unique_lock<wsrep::mutex> lock(mutex_);
    // v26 API does not have JOINED event, so in anticipation of SYNCED
    // we must do it here.
    state(lock, s_joined);
    // Provider call is made outside the lock.
    lock.unlock();
    enum provider::status const retval(provider().sst_sent(gtid, error));
    if (retval != provider::success)
    {
        std::string msg("wsrep::sst_sent() returned an error: ");
        msg += wsrep::provider::to_string(retval);
        server_service_.log_message(wsrep::log::warning, msg.c_str());
    }
}
700
// Called by the joiner after the SST has been received (or failed).
// On success: waits for server initialization if needed, recovers the
// position and view from stable storage, sanity checks the recovered
// GTID against the view, and recovers streaming appliers. Finally the
// result is reported to the provider. Throws wsrep::runtime_error on
// bogus GTID, missing cluster connection, or provider failure.
void wsrep::server_state::sst_received(wsrep::client_service& cs,
                                       int const error)
{
    wsrep::log_info() << "SST received";
    wsrep::gtid gtid(wsrep::gtid::undefined());
    wsrep::unique_lock<wsrep::mutex> lock(mutex_);
    assert(state_ == s_joiner || state_ == s_initialized);

    // Run initialization only if the SST was successful.
    // In case of SST failure the system is in undefined state
    // may not be recoverable.
    if (error == 0)
    {
        if (server_service_.sst_before_init())
        {
            if (init_initialized_ == false)
            {
                state(lock, s_initializing);
                // Wait unlocked until the server side signals
                // initialized() (see wait_until_state below).
                lock.unlock();
                server_service_.debug_sync("on_view_wait_initialized");
                lock.lock();
                wait_until_state(lock, s_initialized);
                assert(init_initialized_);
            }
        }
        lock.unlock();

        if (id_.is_undefined())
        {
            assert(0);
            throw wsrep::runtime_error(
                "wsrep::sst_received() called before connection to cluster");
        }

        gtid = server_service_.get_position(cs);
        wsrep::log_info() << "Recovered position from storage: " << gtid;

        lock.lock();
        if (gtid.seqno() >= connected_gtid().seqno())
        {
            /* Now the node has all the data the cluster has: part in
             * storage, part in replication event queue. */
            state(lock, s_joined);
        }
        lock.unlock();

        wsrep::view const v(server_service_.get_view(cs, id_));
        wsrep::log_info() << "Recovered view from SST:\n" << v;

        /*
         * If the state id from recovered view has undefined ID, we may
         * be upgrading from earlier version which does not provide
         * view stored in stable storage. In this case we skip
         * sanity checks and assigning the current view and wait
         * until the first view delivery.
         */
        if (v.state_id().id().is_undefined() == false)
        {
            if (v.state_id().id() != gtid.id() ||
                v.state_id().seqno() > gtid.seqno())
            {
                /* Since IN GENERAL we may not be able to recover SST GTID from
                 * the state data, we have to rely on SST script passing the
                 * GTID value explicitly.
                 * Here we check if the passed GTID makes any sense: it should
                 * have the same UUID and greater or equal seqno than the last
                 * logged view. */
                std::ostringstream msg;
                msg << "SST script passed bogus GTID: " << gtid
                    << ". Preceding view GTID: " << v.state_id();
                throw wsrep::runtime_error(msg.str());
            }

            if (current_view_.status() == wsrep::view::primary)
            {
                previous_primary_view_ = current_view_;
            }
            current_view_ = v;
            server_service_.log_view(NULL /* this view is stored already */, v);
        }
        else
        {
            wsrep::log_warning()
                << "View recovered from stable storage was empty. If the "
                << "server is doing rolling upgrade from previous version "
                << "which does not support storing view info into stable "
                << "storage, this is ok. Otherwise this may be a sign of "
                << "malfunction.";
        }
        lock.lock();
        recover_streaming_appliers_if_not_recovered(lock, cs);
        lock.unlock();
    }

    // Report the result to the provider even if the SST failed; gtid
    // remains undefined in the failure case.
    enum provider::status const retval(provider().sst_received(gtid, error));
    if (retval != provider::success)
    {
        std::string msg("wsrep::sst_received() failed: ");
        msg += wsrep::provider::to_string(retval);
        throw wsrep::runtime_error(msg);
    }
}
803
initialized()804 void wsrep::server_state::initialized()
805 {
806 wsrep::unique_lock<wsrep::mutex> lock(mutex_);
807 wsrep::log_info() << "Server initialized";
808 init_initialized_ = true;
809 if (server_service_.sst_before_init())
810 {
811 state(lock, s_initialized);
812 }
813 else
814 {
815 state(lock, s_initializing);
816 state(lock, s_initialized);
817 }
818 }
819
// Wait until the given GTID has been committed, or timeout expires.
// Forwarded to the provider.
enum wsrep::provider::status
wsrep::server_state::wait_for_gtid(const wsrep::gtid& gtid, int timeout)
    const
{
    return provider().wait_for_gtid(gtid, timeout);
}
826
827 int
set_encryption_key(std::vector<unsigned char> & key)828 wsrep::server_state::set_encryption_key(std::vector<unsigned char>& key)
829 {
830 encryption_key_ = key;
831 if (state_ != s_disconnected)
832 {
833 wsrep::const_buffer const key(encryption_key_.data(),
834 encryption_key_.size());
835 enum provider::status const retval(provider_->enc_set_key(key));
836 if (retval != provider::success)
837 {
838 wsrep::log_error() << "Failed to set encryption key: "
839 << provider::to_string(retval);
840 return 1;
841 }
842 }
843 return 0;
844 }
845
// Perform a causal read barrier with the given timeout. Forwarded to
// the provider; returns the reached GTID and the provider status.
std::pair<wsrep::gtid, enum wsrep::provider::status>
wsrep::server_state::causal_read(int timeout) const
{
    return provider().causal_read(timeout);
}
851
// Called when the provider reports that the node has connected to a
// cluster. Validates the view, records this node's ID and the
// connected position, and transitions to s_connected. Throws
// wsrep::runtime_error on an invalid view or an ID mismatch with a
// previous connection.
void wsrep::server_state::on_connect(const wsrep::view& view)
{
    // Sanity checks
    if (view.own_index() < 0 ||
        size_t(view.own_index()) >= view.members().size())
    {
        std::ostringstream os;
        os << "Invalid view on connect: own index out of range: " << view;
#ifndef NDEBUG
        wsrep::log_error() << os.str();
        assert(0);
#endif
        throw wsrep::runtime_error(os.str());
    }

    const size_t own_index(static_cast<size_t>(view.own_index()));
    // A reconnect must come back with the same node ID as before.
    if (id_.is_undefined() == false && id_ != view.members()[own_index].id())
    {
        std::ostringstream os;
        os << "Connection in connected state.\n"
           << "Connected view:\n" << view
           << "Previous view:\n" << current_view_
           << "Current own ID: " << id_;
#ifndef NDEBUG
        wsrep::log_error() << os.str();
        assert(0);
#endif
        throw wsrep::runtime_error(os.str());
    }
    else
    {
        id_ = view.members()[own_index].id();
    }

    wsrep::log_info() << "Server "
                      << name_
                      << " connected to cluster at position "
                      << view.state_id()
                      << " with ID "
                      << id_;

    wsrep::unique_lock<wsrep::mutex> lock(mutex_);
    connected_gtid_ = view.state_id();
    state(lock, s_connected);
}
897
// Handle delivery of a primary component view. Walks the state
// machine from connected through joiner/initializing/initialized as
// required, performs a pending bootstrap, recovers and cleans up
// streaming appliers, and finally moves to joined if the view shows
// the node has caught up with the connected position.
void wsrep::server_state::on_primary_view(
    const wsrep::view& view,
    wsrep::high_priority_service* high_priority_service)
{
    wsrep::unique_lock<wsrep::mutex> lock(mutex_);
    assert(view.final() == false);
    //
    // Reached primary from connected state. This may mean the following
    //
    // 1) Server was joined to the cluster and got SST transfer
    // 2) Server was partitioned from the cluster and got back
    // 3) A new cluster was bootstrapped from non-prim cluster
    //
    // There is no enough information here what was the cause
    // of the primary component, so we need to walk through
    // all states leading to joined to notify possible state
    // waiters in other threads.
    //
    if (server_service_.sst_before_init())
    {
        if (state_ == s_connected)
        {
            state(lock, s_joiner);
            // We need to assign init_initialized_ here to local
            // variable. If the value here was false, we need to skip
            // the initializing -> initialized -> joined state cycle
            // below. However, if we don't assign the value to
            // local, it is possible that the main thread gets control
            // between changing the state to initializing and checking
            // initialized flag, which may cause the initialzing -> initialized
            // state change to be executed even if it should not be.
            const bool was_initialized(init_initialized_);
            state(lock, s_initializing);
            if (was_initialized)
            {
                // If server side has already been initialized,
                // skip directly to s_joined.
                state(lock, s_initialized);
            }
        }
    }
    else
    {
        if (state_ == s_connected)
        {
            state(lock, s_joiner);
        }
    }
    // Wait for the server side to reach initialized state before
    // proceeding further.
    if (init_initialized_ == false)
    {
        lock.unlock();
        server_service_.debug_sync("on_view_wait_initialized");
        lock.lock();
        wait_until_state(lock, s_initialized);
    }
    assert(init_initialized_);

    // Bootstrap was requested at connect time: perform it once in the
    // first primary view.
    if (bootstrap_)
    {
        server_service_.bootstrap();
        bootstrap_ = false;
    }

    assert(high_priority_service);

    if (high_priority_service)
    {
        recover_streaming_appliers_if_not_recovered(lock,
                                                    *high_priority_service);
        // Streaming transactions whose origin left the cluster must
        // be closed.
        close_orphaned_sr_transactions(lock, *high_priority_service);
    }

    if (state(lock) < s_joined &&
        view.state_id().seqno() >= connected_gtid().seqno())
    {
        // If we progressed beyond connected seqno, it means we have full state
        state(lock, s_joined);
    }
}
977
on_non_primary_view(const wsrep::view & view,wsrep::high_priority_service * high_priority_service)978 void wsrep::server_state::on_non_primary_view(
979 const wsrep::view& view,
980 wsrep::high_priority_service* high_priority_service)
981 {
982 wsrep::unique_lock<wsrep::mutex> lock(mutex_);
983 wsrep::log_info() << "Non-primary view";
984 if (view.final())
985 {
986 go_final(lock, view, high_priority_service);
987 }
988 else if (state_ != s_disconnecting)
989 {
990 state(lock, s_connected);
991 }
992 }
993
go_final(wsrep::unique_lock<wsrep::mutex> & lock,const wsrep::view & view,wsrep::high_priority_service * hps)994 void wsrep::server_state::go_final(wsrep::unique_lock<wsrep::mutex>& lock,
995 const wsrep::view& view,
996 wsrep::high_priority_service* hps)
997 {
998 (void)view; // avoid compiler warning "unused parameter 'view'"
999 assert(view.final());
1000 assert(hps);
1001 if (hps)
1002 {
1003 close_transactions_at_disconnect(*hps);
1004 }
1005 state(lock, s_disconnected);
1006 id_ = wsrep::id::undefined();
1007 }
1008
on_view(const wsrep::view & view,wsrep::high_priority_service * high_priority_service)1009 void wsrep::server_state::on_view(const wsrep::view& view,
1010 wsrep::high_priority_service* high_priority_service)
1011 {
1012 wsrep::log_info()
1013 << "================================================\nView:\n"
1014 << view
1015 << "=================================================";
1016 if (current_view_.status() == wsrep::view::primary)
1017 {
1018 previous_primary_view_ = current_view_;
1019 }
1020 current_view_ = view;
1021 switch (view.status())
1022 {
1023 case wsrep::view::primary:
1024 on_primary_view(view, high_priority_service);
1025 break;
1026 case wsrep::view::non_primary:
1027 on_non_primary_view(view, high_priority_service);
1028 break;
1029 case wsrep::view::disconnected:
1030 {
1031 wsrep::unique_lock<wsrep::mutex> lock(mutex_);
1032 go_final(lock, view, high_priority_service);
1033 break;
1034 }
1035 default:
1036 wsrep::log_warning() << "Unrecognized view status: " << view.status();
1037 assert(0);
1038 }
1039
1040 server_service_.log_view(high_priority_service, view);
1041 }
1042
// Provider callback: the server has become synced with the group.
// On the initial sync this may have to walk the state machine
// through several intermediate states (see the switch below);
// later calls simply move the server back to synced after a
// desync/resync cycle.
void wsrep::server_state::on_sync()
{
    wsrep::log_info() << "Server " << name_ << " synced with group";
    wsrep::unique_lock<wsrep::mutex> lock(mutex_);

    // Initial sync
    if (server_service_.sst_before_init() && init_synced_ == false)
    {
        switch (state_)
        {
        case s_synced:
            break;
        case s_connected:                // Seed node path: provider becomes
            state(lock, s_joiner);       // synced with itself before anything
            WSREP_FALLTHROUGH;           // else. Then goes DB initialization.
        case s_joiner:                   // |
            state(lock, s_initializing); // V
            break;
        case s_donor:
            assert(false); // this should never happen
            state(lock, s_joined);
            state(lock, s_synced);
            break;
        case s_initialized:
            state(lock, s_joined);
            WSREP_FALLTHROUGH;
        default:
            /* State */
            state(lock, s_synced);
        };
    }
    else
    {
        // Calls to on_sync() in synced state are possible if
        // server desyncs itself from the group. Provider does not
        // inform about this through callbacks.
        if (state_ != s_synced)
        {
            state(lock, s_synced);
        }
    }
    init_synced_ = true;

    // Flush rollback events which were queued while the server was
    // not able to send them to the provider.
    enum wsrep::provider::status status(send_pending_rollback_events(lock));
    if (status)
    {
        // TODO should be retried?
        wsrep::log_warning()
            << "Failed to flush rollback event cache: " << status;
    }
}
1094
on_apply(wsrep::high_priority_service & high_priority_service,const wsrep::ws_handle & ws_handle,const wsrep::ws_meta & ws_meta,const wsrep::const_buffer & data)1095 int wsrep::server_state::on_apply(
1096 wsrep::high_priority_service& high_priority_service,
1097 const wsrep::ws_handle& ws_handle,
1098 const wsrep::ws_meta& ws_meta,
1099 const wsrep::const_buffer& data)
1100 {
1101 if (is_toi(ws_meta.flags()))
1102 {
1103 return apply_toi(provider(), high_priority_service,
1104 ws_handle, ws_meta, data);
1105 }
1106 else if (is_commutative(ws_meta.flags()) || is_native(ws_meta.flags()))
1107 {
1108 // Not implemented yet.
1109 assert(0);
1110 return 0;
1111 }
1112 else
1113 {
1114 return apply_write_set(*this, high_priority_service,
1115 ws_handle, ws_meta, data);
1116 }
1117 }
1118
start_streaming_client(wsrep::client_state * client_state)1119 void wsrep::server_state::start_streaming_client(
1120 wsrep::client_state* client_state)
1121 {
1122 wsrep::unique_lock<wsrep::mutex> lock(mutex_);
1123 WSREP_LOG_DEBUG(wsrep::log::debug_log_level(),
1124 wsrep::log::debug_level_server_state,
1125 "Start streaming client: " << client_state->id());
1126 if (streaming_clients_.insert(
1127 std::make_pair(client_state->id(), client_state)).second == false)
1128 {
1129 wsrep::log_warning() << "Failed to insert streaming client "
1130 << client_state->id();
1131 assert(0);
1132 }
1133 }
1134
// Convert the client side streaming transaction into a high priority
// streaming applier. Called when the originating client no longer
// drives the transaction (e.g. on rollback by the provider) and the
// fragments must be handled by an applier instead.
void wsrep::server_state::convert_streaming_client_to_applier(
    wsrep::client_state* client_state)
{
    // create streaming_applier beforehand as server_state lock should
    // not be held when calling server_service methods
    wsrep::high_priority_service* streaming_applier(
        server_service_.streaming_applier_service(
            client_state->client_service()));

    wsrep::unique_lock<wsrep::mutex> lock(mutex_);
    WSREP_LOG_DEBUG(wsrep::log::debug_log_level(),
                    wsrep::log::debug_level_server_state,
                    "Convert streaming client to applier "
                    << client_state->id());
    // Deregister the client from the streaming clients map. Not
    // finding it indicates a bookkeeping bug, but in release builds
    // the conversion proceeds anyway.
    streaming_clients_map::iterator i(
        streaming_clients_.find(client_state->id()));
    assert(i != streaming_clients_.end());
    if (i == streaming_clients_.end())
    {
        wsrep::log_warning() << "Unable to find streaming client "
                             << client_state->id();
        assert(0);
    }
    else
    {
        streaming_clients_.erase(i);
    }

    // Convert to applier only if the state is not disconnected. In
    // disconnected state the applier map is supposed to be empty
    // and it will be reconstructed from fragment storage when
    // joining back to cluster.
    if (state(lock) != s_disconnected)
    {
        if (streaming_applier->adopt_transaction(client_state->transaction()))
        {
            // Adoption failed; log details, clean up the applier and
            // bail out without registering it.
            log_adopt_error(client_state->transaction());
            streaming_applier->after_apply();
            server_service_.release_high_priority_service(streaming_applier);
            return;
        }
        if (streaming_appliers_.insert(
                std::make_pair(
                    std::make_pair(client_state->transaction().server_id(),
                                   client_state->transaction().id()),
                    streaming_applier)).second == false)
        {
            wsrep::log_warning() << "Could not insert streaming applier "
                                 << id_
                                 << ", "
                                 << client_state->transaction().id();
            assert(0);
        }
    }
    else
    {
        // Disconnected: release the unused applier and restore the
        // calling client's thread local storage.
        server_service_.release_high_priority_service(streaming_applier);
        client_state->client_service().store_globals();
    }
}
1195
1196
stop_streaming_client(wsrep::client_state * client_state)1197 void wsrep::server_state::stop_streaming_client(
1198 wsrep::client_state* client_state)
1199 {
1200 wsrep::unique_lock<wsrep::mutex> lock(mutex_);
1201 WSREP_LOG_DEBUG(wsrep::log::debug_log_level(),
1202 wsrep::log::debug_level_server_state,
1203 "Stop streaming client: " << client_state->id());
1204 streaming_clients_map::iterator i(
1205 streaming_clients_.find(client_state->id()));
1206 assert(i != streaming_clients_.end());
1207 if (i == streaming_clients_.end())
1208 {
1209 wsrep::log_warning() << "Unable to find streaming client "
1210 << client_state->id();
1211 assert(0);
1212 return;
1213 }
1214 else
1215 {
1216 streaming_clients_.erase(i);
1217 cond_.notify_all();
1218 }
1219 }
1220
start_streaming_applier(const wsrep::id & server_id,const wsrep::transaction_id & transaction_id,wsrep::high_priority_service * sa)1221 void wsrep::server_state::start_streaming_applier(
1222 const wsrep::id& server_id,
1223 const wsrep::transaction_id& transaction_id,
1224 wsrep::high_priority_service* sa)
1225 {
1226 wsrep::unique_lock<wsrep::mutex> lock(mutex_);
1227 if (streaming_appliers_.insert(
1228 std::make_pair(std::make_pair(server_id, transaction_id),
1229 sa)).second == false)
1230 {
1231 wsrep::log_error() << "Could not insert streaming applier";
1232 throw wsrep::fatal_error();
1233 }
1234 }
1235
stop_streaming_applier(const wsrep::id & server_id,const wsrep::transaction_id & transaction_id)1236 void wsrep::server_state::stop_streaming_applier(
1237 const wsrep::id& server_id,
1238 const wsrep::transaction_id& transaction_id)
1239 {
1240 wsrep::unique_lock<wsrep::mutex> lock(mutex_);
1241 streaming_appliers_map::iterator i(
1242 streaming_appliers_.find(std::make_pair(server_id, transaction_id)));
1243 assert(i != streaming_appliers_.end());
1244 if (i == streaming_appliers_.end())
1245 {
1246 wsrep::log_warning() << "Could not find streaming applier for "
1247 << server_id << ":" << transaction_id;
1248 }
1249 else
1250 {
1251 streaming_appliers_.erase(i);
1252 cond_.notify_all();
1253 }
1254 }
1255
// Look up the streaming applier for (server_id, transaction_id).
// Returns nullptr if no applier is registered for the key.
wsrep::high_priority_service* wsrep::server_state::find_streaming_applier(
    const wsrep::id& server_id,
    const wsrep::transaction_id& transaction_id) const
{
    wsrep::unique_lock<wsrep::mutex> lock(mutex_);
    streaming_appliers_map::const_iterator i(
        streaming_appliers_.find(std::make_pair(server_id, transaction_id)));
    // nullptr instead of literal 0 for the not-found pointer result.
    return (i == streaming_appliers_.end() ? nullptr : i->second);
}
1265
// Look up a streaming applier by XID. Does a linear scan over the
// applier map as XID is not part of the map key.
// Returns nullptr if no applier with a matching XID exists.
wsrep::high_priority_service* wsrep::server_state::find_streaming_applier(
    const wsrep::xid& xid) const
{
    wsrep::unique_lock<wsrep::mutex> lock(mutex_);
    const streaming_appliers_map::const_iterator i(
        std::find_if(
            streaming_appliers_.begin(), streaming_appliers_.end(),
            [&xid](const streaming_appliers_map::value_type& entry)
            { return entry.second->transaction().xid() == xid; }));
    return (i == streaming_appliers_.end() ? nullptr : i->second);
}
1282
1283 //////////////////////////////////////////////////////////////////////////////
1284 // Private //
1285 //////////////////////////////////////////////////////////////////////////////
1286
desync(wsrep::unique_lock<wsrep::mutex> & lock)1287 int wsrep::server_state::desync(wsrep::unique_lock<wsrep::mutex>& lock)
1288 {
1289 assert(lock.owns_lock());
1290 ++desync_count_;
1291 lock.unlock();
1292 int ret(provider().desync());
1293 lock.lock();
1294 if (ret)
1295 {
1296 --desync_count_;
1297 }
1298 return ret;
1299 }
1300
resync(wsrep::unique_lock<wsrep::mutex> & lock WSREP_UNUSED)1301 void wsrep::server_state::resync(wsrep::unique_lock<wsrep::mutex>&
1302 lock WSREP_UNUSED)
1303 {
1304 assert(lock.owns_lock());
1305 assert(desync_count_ > 0);
1306 if (desync_count_ > 0)
1307 {
1308 --desync_count_;
1309 if (provider().resync())
1310 {
1311 throw wsrep::runtime_error("Failed to resync");
1312 }
1313 }
1314 else
1315 {
1316 wsrep::log_warning() << "desync_count " << desync_count_
1317 << " on resync";
1318 }
1319 }
1320
1321
// Change the server state to the given state. The transition is
// validated against the allowed transition table; a disallowed
// transition is logged and asserted on, but in release builds the
// state is changed anyway. Before returning, control is handed over
// to threads waiting for the new state in wait_until_state().
void wsrep::server_state::state(
    wsrep::unique_lock<wsrep::mutex>& lock WSREP_UNUSED,
    enum wsrep::server_state::state state)
{
    assert(lock.owns_lock());
    // Allowed transitions: rows index the current state, columns the
    // proposed state (see the to/from legend).
    static const char allowed[n_states_][n_states_] =
    {
        /* dis, ing, ized, cted, jer, jed, dor, sed, ding to/from */
        { 0, 1, 0, 1, 0, 0, 0, 0, 0}, /* dis */
        { 1, 0, 1, 0, 0, 0, 0, 0, 1}, /* ing */
        { 1, 0, 0, 1, 0, 1, 0, 0, 1}, /* ized */
        { 1, 0, 0, 1, 1, 0, 0, 1, 1}, /* cted */
        { 1, 1, 0, 0, 0, 1, 0, 0, 1}, /* jer */
        { 1, 0, 0, 1, 0, 0, 1, 1, 1}, /* jed */
        { 1, 0, 0, 1, 0, 1, 0, 0, 1}, /* dor */
        { 1, 0, 0, 1, 0, 1, 1, 0, 1}, /* sed */
        { 1, 0, 0, 0, 0, 0, 0, 0, 0} /* ding */
    };

    if (allowed[state_][state] == false)
    {
        std::ostringstream os;
        os << "server: " << name_ << " unallowed state transition: "
           << wsrep::to_string(state_) << " -> " << wsrep::to_string(state);
        wsrep::log_warning() << os.str() << "\n";
        assert(0);
    }

    WSREP_LOG_DEBUG(wsrep::log::debug_log_level(),
                    wsrep::log::debug_level_server_state,
                    "server " << name_ << " state change: "
                    << to_c_string(state_) << " -> "
                    << to_c_string(state));
    state_hist_.push_back(state_);
    server_service_.log_state_change(state_, state);
    state_ = state;
    cond_.notify_all();
    // Hand over control to threads registered in wait_until_state()
    // for this state: block until all of them have observed the
    // state change and deregistered themselves.
    while (state_waiters_[state_])
    {
        cond_.wait(lock);
    }
}
1364
wait_until_state(wsrep::unique_lock<wsrep::mutex> & lock,enum wsrep::server_state::state state) const1365 void wsrep::server_state::wait_until_state(
1366 wsrep::unique_lock<wsrep::mutex>& lock,
1367 enum wsrep::server_state::state state) const
1368 {
1369 ++state_waiters_[state];
1370 while (state_ != state)
1371 {
1372 cond_.wait(lock);
1373 // If the waiter waits for any other state than disconnecting
1374 // or disconnected and the state has been changed to disconnecting,
1375 // this usually means that some error was encountered
1376 if (state != s_disconnecting && state != s_disconnected
1377 && state_ == s_disconnecting)
1378 {
1379 throw wsrep::runtime_error("State wait was interrupted");
1380 }
1381 }
1382 --state_waiters_[state];
1383 cond_.notify_all();
1384 }
1385
// Wake up all threads blocked in wait_until_state(). The waiters
// re-evaluate the current state and may throw if the server has
// moved to disconnecting (see wait_until_state()).
void wsrep::server_state::interrupt_state_waiters(
    wsrep::unique_lock<wsrep::mutex>& lock WSREP_UNUSED)
{
    assert(lock.owns_lock());
    cond_.notify_all();
}
1392
1393 template <class C>
recover_streaming_appliers_if_not_recovered(wsrep::unique_lock<wsrep::mutex> & lock,C & c)1394 void wsrep::server_state::recover_streaming_appliers_if_not_recovered(
1395 wsrep::unique_lock<wsrep::mutex>& lock, C& c)
1396 {
1397 assert(lock.owns_lock());
1398 if (streaming_appliers_recovered_ == false)
1399 {
1400 lock.unlock();
1401 server_service_.recover_streaming_appliers(c);
1402 lock.lock();
1403 }
1404 streaming_appliers_recovered_ = true;
1405 }
1406
1407 class transaction_state_cmp
1408 {
1409 public:
transaction_state_cmp(const enum wsrep::transaction::state s)1410 transaction_state_cmp(const enum wsrep::transaction::state s)
1411 : state_(s) { }
operator ()(const std::pair<wsrep::client_id,wsrep::client_state * > & vt) const1412 bool operator()(const std::pair<wsrep::client_id,
1413 wsrep::client_state*>& vt) const
1414 {
1415 return vt.second->transaction().state() == state_;
1416 }
1417 private:
1418 enum wsrep::transaction::state state_;
1419 };
1420
// Roll back streaming transactions whose originator is no longer a
// member of the primary component, both on the client side (locally
// originated SRs) and on the applier side (remote SRs).
void wsrep::server_state::close_orphaned_sr_transactions(
    wsrep::unique_lock<wsrep::mutex>& lock,
    wsrep::high_priority_service& high_priority_service)
{
    assert(lock.owns_lock());

    // When the originator of an SR transaction leaves the primary
    // component of the cluster, that SR must be rolled back. When two
    // consecutive primary views have the same membership, the system
    // may have been in a state with no primary components.
    // Example with 2 node cluster:
    // - (1,2 primary)
    // - (1 non-primary) and (2 non-primary)
    // - (1,2 primary)
    // We need to rollback SRs owned by both 1 and 2.
    // Notice that since the introduction of rollback_event_queue_,
    // checking for equal consecutive views is no longer needed.
    // However, we must keep it here for the time being, for backwards
    // compatibility.
    const bool equal_consecutive_views =
        current_view_.equal_membership(previous_primary_view_);

    if (current_view_.own_index() == -1 || equal_consecutive_views)
    {
        // BF abort all local streaming clients which are not in
        // prepared state. Prepared transactions must be committed or
        // rolled back explicitly, so they are left alone here.
        streaming_clients_map::iterator i;
        transaction_state_cmp prepared_state_cmp(wsrep::transaction::s_prepared);
        while ((i = std::find_if_not(streaming_clients_.begin(),
                                     streaming_clients_.end(),
                                     prepared_state_cmp))
               != streaming_clients_.end())
        {
            wsrep::client_id client_id(i->first);
            wsrep::transaction_id transaction_id(i->second->transaction().id());
            // It is safe to unlock the server state temporarily here.
            // The processing happens inside view handler which is
            // protected by the provider commit ordering critical
            // section. The lock must be unlocked temporarily to
            // allow converting the current client to streaming
            // applier in transaction::streaming_rollback().
            // The iterator i may be invalidated when the server state
            // remains unlocked, so it should not be accessed after
            // the bf abort call.
            lock.unlock();
            i->second->total_order_bf_abort(current_view_.view_seqno());
            lock.lock();
            // Wait until the client has been removed from the
            // streaming clients map (or replaced by a different
            // transaction) before picking the next one.
            streaming_clients_map::const_iterator found_i;
            while ((found_i = streaming_clients_.find(client_id)) !=
                   streaming_clients_.end() &&
                   found_i->second->transaction().id() == transaction_id)
            {
                cond_.wait(lock);
            }
        }
    }

    streaming_appliers_map::iterator i(streaming_appliers_.begin());
    while (i != streaming_appliers_.end())
    {
        wsrep::high_priority_service* streaming_applier(i->second);

        // Rollback SR on equal consecutive primary views or if its
        // originator is not in the current view.
        // Transactions in prepared state must be committed or
        // rolled back explicitly, those are never rolled back here.
        if ((streaming_applier->transaction().state() !=
             wsrep::transaction::s_prepared) &&
            (equal_consecutive_views ||
             not current_view_.is_member(
                 streaming_applier->transaction().server_id())))
        {
            WSREP_LOG_DEBUG(wsrep::log::debug_log_level(),
                            wsrep::log::debug_level_server_state,
                            "Removing SR fragments for "
                            << i->first.first
                            << ", " << i->first.second);
            // Copy the key parts before the applier is erased below.
            wsrep::id server_id(i->first.first);
            wsrep::transaction_id transaction_id(i->first.second);
            int adopt_error;
            if ((adopt_error = high_priority_service.adopt_transaction(
                     streaming_applier->transaction())))
            {
                log_adopt_error(streaming_applier->transaction());
            }
            // Even if the transaction adopt above fails, we roll back
            // the transaction. Adopt error will leave stale entries
            // in the streaming log which can be removed manually.
            {
                wsrep::high_priority_switch sw(high_priority_service,
                                               *streaming_applier);
                streaming_applier->rollback(
                    wsrep::ws_handle(), wsrep::ws_meta());
                streaming_applier->after_apply();
            }

            // Post-increment keeps the iterator valid across erase.
            streaming_appliers_.erase(i++);
            server_service_.release_high_priority_service(streaming_applier);
            high_priority_service.store_globals();
            wsrep::ws_meta ws_meta(
                wsrep::gtid(),
                wsrep::stid(server_id, transaction_id, wsrep::client_id()),
                wsrep::seqno::undefined(), 0);
            // Fragment removal and commit are done with the lock
            // released.
            lock.unlock();
            if (adopt_error == 0)
            {
                high_priority_service.remove_fragments(ws_meta);
                high_priority_service.commit(wsrep::ws_handle(transaction_id, 0),
                                             ws_meta);
            }
            high_priority_service.after_apply();
            lock.lock();
        }
        else
        {
            ++i;
        }
    }
}
1538
// Roll back and release all streaming appliers when the server
// disconnects from the cluster.
void wsrep::server_state::close_transactions_at_disconnect(
    wsrep::high_priority_service& high_priority_service)
{
    // Close streaming applier without removing fragments
    // from fragment storage. When the server is started again,
    // it must be able to recover ongoing streaming transactions.
    streaming_appliers_map::iterator i(streaming_appliers_.begin());
    while (i != streaming_appliers_.end())
    {
        wsrep::high_priority_service* streaming_applier(i->second);
        {
            // Switch execution context to the applier for the
            // duration of the rollback.
            wsrep::high_priority_switch sw(high_priority_service,
                                           *streaming_applier);
            streaming_applier->rollback(
                wsrep::ws_handle(), wsrep::ws_meta());
            streaming_applier->after_apply();
        }
        // Post-increment keeps the iterator valid across erase.
        streaming_appliers_.erase(i++);
        server_service_.release_high_priority_service(streaming_applier);
        high_priority_service.store_globals();
    }
    // Force recovery from fragment storage on the next join.
    streaming_appliers_recovered_ = false;
}
1562
1563 //
1564 // Rollback event queue
1565 //
1566
queue_rollback_event(const wsrep::transaction_id & id)1567 void wsrep::server_state::queue_rollback_event(
1568 const wsrep::transaction_id& id)
1569 {
1570 wsrep::unique_lock<wsrep::mutex> lock(mutex_);
1571 #ifndef NDEBUG
1572 // Make sure we don't have duplicate
1573 // transaction ids in rollback event queue.
1574 // There is no need to do this in release
1575 // build given that caller (streaming_rollback())
1576 // should avoid duplicates.
1577 for (auto i : rollback_event_queue_)
1578 {
1579 assert(id != i);
1580 }
1581 #endif
1582 rollback_event_queue_.push_back(id);
1583 }
1584
1585 enum wsrep::provider::status
send_pending_rollback_events(wsrep::unique_lock<wsrep::mutex> & lock WSREP_UNUSED)1586 wsrep::server_state::send_pending_rollback_events(
1587 wsrep::unique_lock<wsrep::mutex>& lock WSREP_UNUSED)
1588 {
1589 assert(lock.owns_lock());
1590 while (not rollback_event_queue_.empty())
1591 {
1592 const wsrep::transaction_id& id(rollback_event_queue_.front());
1593 const enum wsrep::provider::status status(provider().rollback(id));
1594 if (status)
1595 {
1596 return status;
1597 }
1598 rollback_event_queue_.pop_front();
1599 }
1600 return wsrep::provider::success;
1601 }
1602
1603 enum wsrep::provider::status
send_pending_rollback_events()1604 wsrep::server_state::send_pending_rollback_events()
1605 {
1606 wsrep::unique_lock<wsrep::mutex> lock(mutex_);
1607 return send_pending_rollback_events(lock);
1608 }
1609