1 /*
2 * Copyright (C) 2018 Codership Oy <info@codership.com>
3 *
4 * This file is part of wsrep-lib.
5 *
6 * Wsrep-lib is free software: you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation, either version 2 of the License, or
9 * (at your option) any later version.
10 *
11 * Wsrep-lib is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License
17 * along with wsrep-lib. If not, see <https://www.gnu.org/licenses/>.
18 */
19
20 #include "wsrep/transaction.hpp"
21 #include "wsrep/client_state.hpp"
22 #include "wsrep/server_state.hpp"
23 #include "wsrep/storage_service.hpp"
24 #include "wsrep/high_priority_service.hpp"
25 #include "wsrep/key.hpp"
26 #include "wsrep/logger.hpp"
27 #include "wsrep/compiler.hpp"
28
29 #include <sstream>
30 #include <memory>
31
32 namespace
33 {
34 class storage_service_deleter
35 {
36 public:
storage_service_deleter(wsrep::server_service & server_service)37 storage_service_deleter(wsrep::server_service& server_service)
38 : server_service_(server_service)
39 { }
operator ()(wsrep::storage_service * storage_service)40 void operator()(wsrep::storage_service* storage_service)
41 {
42 server_service_.release_storage_service(storage_service);
43 }
44 private:
45 wsrep::server_service& server_service_;
46 };
47
48 template <class D>
49 class scoped_storage_service
50 {
51 public:
scoped_storage_service(wsrep::client_service & client_service,wsrep::storage_service * storage_service,D deleter)52 scoped_storage_service(wsrep::client_service& client_service,
53 wsrep::storage_service* storage_service,
54 D deleter)
55 : client_service_(client_service)
56 , storage_service_(storage_service)
57 , deleter_(deleter)
58 {
59 if (storage_service_ == 0)
60 {
61 throw wsrep::runtime_error("Null client_state provided");
62 }
63 client_service_.reset_globals();
64 storage_service_->store_globals();
65 }
66
storage_service()67 wsrep::storage_service& storage_service()
68 {
69 return *storage_service_;
70 }
71
~scoped_storage_service()72 ~scoped_storage_service()
73 {
74 deleter_(storage_service_);
75 client_service_.store_globals();
76 }
77 private:
78 scoped_storage_service(const scoped_storage_service&);
79 scoped_storage_service& operator=(const scoped_storage_service&);
80 wsrep::client_service& client_service_;
81 wsrep::storage_service* storage_service_;
82 D deleter_;
83 };
84 }
85
86 // Public
87
// Construct a transaction object bound to the given client state.
// References to the server service and client service are cached from
// the client state. All transaction state is initialized to "no
// active transaction": undefined transaction id, empty write set
// handle/meta, s_executing state and cleared streaming context.
wsrep::transaction::transaction(
    wsrep::client_state& client_state)
    : server_service_(client_state.server_state().server_service())
    , client_service_(client_state.client_service())
    , client_state_(client_state)
    , server_id_()
    , id_(transaction_id::undefined())
    , state_(s_executing)
    , state_hist_()
    , bf_abort_state_(s_executing)
    , bf_abort_provider_status_()
    , bf_abort_client_state_()
    , bf_aborted_in_total_order_()
    , ws_handle_()
    , ws_meta_()
    , flags_()
    , implicit_deps_(false)
    , certified_(false)
    , fragments_certified_for_statement_()
    , streaming_context_()
    , sr_keys_()
    , apply_error_buf_()
    , xid_()
    , streaming_rollback_in_progress_(false)
{ }
113
114
// Out-of-line destructor. Intentionally empty; members clean up
// through their own destructors.
wsrep::transaction::~transaction()
{
}
118
start_transaction(const wsrep::transaction_id & id)119 int wsrep::transaction::start_transaction(
120 const wsrep::transaction_id& id)
121 {
122 debug_log_state("start_transaction enter");
123 assert(active() == false);
124 assert(is_xa() == false);
125 assert(flags() == 0);
126 server_id_ = client_state_.server_state().id();
127 id_ = id;
128 state_ = s_executing;
129 state_hist_.clear();
130 ws_handle_ = wsrep::ws_handle(id);
131 flags(wsrep::provider::flag::start_transaction);
132 switch (client_state_.mode())
133 {
134 case wsrep::client_state::m_high_priority:
135 debug_log_state("start_transaction success");
136 return 0;
137 case wsrep::client_state::m_local:
138 debug_log_state("start_transaction success");
139 return provider().start_transaction(ws_handle_);
140 default:
141 debug_log_state("start_transaction error");
142 assert(0);
143 return 1;
144 }
145 }
146
start_transaction(const wsrep::ws_handle & ws_handle,const wsrep::ws_meta & ws_meta)147 int wsrep::transaction::start_transaction(
148 const wsrep::ws_handle& ws_handle,
149 const wsrep::ws_meta& ws_meta)
150 {
151 debug_log_state("start_transaction enter");
152 if (state() != s_replaying)
153 {
154 // assert(ws_meta.flags());
155 assert(active() == false);
156 assert(flags() == 0);
157 server_id_ = ws_meta.server_id();
158 id_ = ws_meta.transaction_id();
159 assert(client_state_.mode() == wsrep::client_state::m_high_priority);
160 state_ = s_executing;
161 state_hist_.clear();
162 ws_handle_ = ws_handle;
163 ws_meta_ = ws_meta;
164 flags(wsrep::provider::flag::start_transaction);
165 certified_ = true;
166 }
167 else
168 {
169 ws_meta_ = ws_meta;
170 assert(ws_meta_.flags() & wsrep::provider::flag::commit);
171 assert(active());
172 assert(client_state_.mode() == wsrep::client_state::m_high_priority);
173 assert(state() == s_replaying);
174 assert(ws_meta_.seqno().is_undefined() == false);
175 certified_ = true;
176 }
177 debug_log_state("start_transaction leave");
178 return 0;
179 }
180
// Adopt the write set meta data for the next streaming fragment to
// be processed. Always returns zero.
int wsrep::transaction::next_fragment(
    const wsrep::ws_meta& ws_meta)
{
    debug_log_state("next_fragment enter");
    ws_meta_ = ws_meta;
    debug_log_state("next_fragment leave");
    return 0;
}
189
// Adopt another (streaming) transaction into this one: starts a new
// transaction with the same id, then copies over the server id,
// flags and streaming context of the source transaction.
void wsrep::transaction::adopt(const wsrep::transaction& transaction)
{
    debug_log_state("adopt enter");
    assert(transaction.is_streaming());
    start_transaction(transaction.id());
    server_id_ = transaction.server_id_;
    flags_ = transaction.flags();
    streaming_context_ = transaction.streaming_context();
    debug_log_state("adopt leave");
}
200
// Record that a streaming fragment with the given seqno has been
// applied into the streaming context.
void wsrep::transaction::fragment_applied(wsrep::seqno seqno)
{
    assert(active());
    streaming_context_.applied(seqno);
}
206
prepare_for_ordering(const wsrep::ws_handle & ws_handle,const wsrep::ws_meta & ws_meta,bool is_commit)207 int wsrep::transaction::prepare_for_ordering(
208 const wsrep::ws_handle& ws_handle,
209 const wsrep::ws_meta& ws_meta,
210 bool is_commit)
211 {
212 assert(active());
213
214 if (state_ != s_replaying)
215 {
216 ws_handle_ = ws_handle;
217 ws_meta_ = ws_meta;
218 certified_ = is_commit;
219 }
220 return 0;
221 }
222
assign_read_view(const wsrep::gtid * const gtid)223 int wsrep::transaction::assign_read_view(const wsrep::gtid* const gtid)
224 {
225 try
226 {
227 return provider().assign_read_view(ws_handle_, gtid);
228 }
229 catch (...)
230 {
231 wsrep::log_error() << "Failed to assign read view";
232 return 1;
233 }
234 }
235
append_key(const wsrep::key & key)236 int wsrep::transaction::append_key(const wsrep::key& key)
237 {
238 assert(active());
239 try
240 {
241 debug_log_key_append(key);
242 sr_keys_.insert(key);
243 return provider().append_key(ws_handle_, key);
244 }
245 catch (...)
246 {
247 wsrep::log_error() << "Failed to append key";
248 return 1;
249 }
250 }
251
append_data(const wsrep::const_buffer & data)252 int wsrep::transaction::append_data(const wsrep::const_buffer& data)
253 {
254 assert(active());
255 return provider().append_data(ws_handle_, data);
256 }
257
after_row()258 int wsrep::transaction::after_row()
259 {
260 wsrep::unique_lock<wsrep::mutex> lock(client_state_.mutex());
261 debug_log_state("after_row_enter");
262 int ret(0);
263 if (streaming_context_.fragment_size() &&
264 streaming_context_.fragment_unit() != streaming_context::statement)
265 {
266 ret = streaming_step(lock);
267 }
268 debug_log_state("after_row_leave");
269 return ret;
270 }
271
// Run the pre-prepare phase of the transaction.
//
// For a local client this certifies the commit (or, for XA,
// replicates the prepare fragment) and moves the transaction into
// s_preparing on success. For a high priority client it only performs
// the state transition. The lock over the client state mutex must be
// held on entry; it is temporarily released while doing fragment
// removal, so a BF abort may be observed after re-acquiring it.
//
// Returns zero on success, 1 on failure (client error is overridden
// accordingly).
int wsrep::transaction::before_prepare(
    wsrep::unique_lock<wsrep::mutex>& lock)
{
    assert(lock.owns_lock());
    int ret(0);
    debug_log_state("before_prepare_enter");
    assert(state() == s_executing || state() == s_must_abort ||
           state() == s_replaying);

    if (state() == s_must_abort)
    {
        // BF aborted before prepare was reached; report deadlock.
        assert(client_state_.mode() == wsrep::client_state::m_local);
        client_state_.override_error(wsrep::e_deadlock_error);
        return 1;
    }

    switch (client_state_.mode())
    {
    case wsrep::client_state::m_local:
        if (is_streaming())
        {
            client_service_.debug_crash(
                "crash_last_fragment_commit_before_fragment_removal");
            // Lock is released for the duration of the statement
            // check and fragment removal.
            lock.unlock();
            if (client_service_.statement_allowed_for_streaming() == false)
            {
                client_state_.override_error(
                    wsrep::e_error_during_commit,
                    wsrep::provider::error_not_allowed);
                ret = 1;
            }
            else if (!is_xa())
            {
                // Note: we can't remove fragments here for XA,
                // the transaction has already issued XA END and
                // is in IDLE state, no more changes allowed!
                ret = client_service_.remove_fragments();
                if (ret)
                {
                    client_state_.override_error(wsrep::e_deadlock_error);
                }
            }
            lock.lock();
            client_service_.debug_crash(
                "crash_last_fragment_commit_after_fragment_removal");
            // A BF abort may have happened while the lock was
            // released above.
            if (state() == s_must_abort)
            {
                client_state_.override_error(wsrep::e_deadlock_error);
                ret = 1;
            }
        }

        if (ret == 0)
        {
            if (is_xa())
            {
                // Force fragment replication on XA prepare
                flags(flags() | wsrep::provider::flag::prepare);
                pa_unsafe(true);
                append_sr_keys_for_commit();
                const bool force_streaming_step = true;
                ret = streaming_step(lock, force_streaming_step);
                if (ret == 0)
                {
                    assert(state() == s_executing);
                    state(lock, s_preparing);
                }
            }
            else
            {
                // Non-XA: certify the commit fragment.
                ret = certify_commit(lock);
            }

            assert((ret == 0 && state() == s_preparing) ||
                   (state() == s_must_abort ||
                    state() == s_must_replay ||
                    state() == s_cert_failed));

            if (ret)
            {
                assert(state() == s_must_replay ||
                       client_state_.current_error());
                ret = 1;
            }
        }

        break;
    case wsrep::client_state::m_high_priority:
        // Note: fragment removal is done from applying
        // context for high priority mode.
        if (is_xa())
        {
            assert(state() == s_executing ||
                   state() == s_replaying);
            if (state() == s_replaying)
            {
                break;
            }
        }
        state(lock, s_preparing);
        break;
    default:
        assert(0);
        break;
    }

    assert(state() == s_preparing ||
           (is_xa() && state() == s_replaying) ||
           (ret && (state() == s_must_abort ||
                    state() == s_must_replay ||
                    state() == s_cert_failed ||
                    state() == s_aborted)));
    debug_log_state("before_prepare_leave");
    return ret;
}
387
// Run the post-prepare phase. For XA transactions this finalizes the
// prepare step (s_preparing -> s_prepared) or detects a BF abort that
// arrived while preparing. For non-XA transactions the state moves
// from s_preparing to s_committing, or to s_must_replay if a BF abort
// was detected after successful certification.
//
// Returns zero on success, 1 if the transaction must be aborted or
// replayed.
int wsrep::transaction::after_prepare(
    wsrep::unique_lock<wsrep::mutex>& lock)
{
    assert(lock.owns_lock());

    int ret = 0;
    debug_log_state("after_prepare_enter");
    if (is_xa())
    {
        switch (state())
        {
        case s_preparing:
            assert(client_state_.mode() == wsrep::client_state::m_local ||
                   (certified() && ordered()));
            state(lock, s_prepared);
            break;
        case s_must_abort:
            // prepared locally, but client has not received
            // a result yet. We can still abort.
            assert(client_state_.mode() == wsrep::client_state::m_local);
            client_state_.override_error(wsrep::e_deadlock_error);
            ret = 1;
            break;
        case s_replaying:
            assert(client_state_.mode() == wsrep::client_state::m_high_priority);
            break;
        default:
            assert(0);
        }
    }
    else
    {
        assert(certified() && ordered());
        assert(state() == s_preparing || state() == s_must_abort);

        if (state() == s_must_abort)
        {
            // BF aborted after certification succeeded: the write set
            // has been ordered, so the transaction must be replayed.
            assert(client_state_.mode() == wsrep::client_state::m_local);
            state(lock, s_must_replay);
            ret = 1;
        }
        else
        {
            state(lock, s_committing);
        }
    }
    debug_log_state("after_prepare_leave");
    return ret;
}
437
// Run the before-commit phase: certify the transaction if not yet
// certified (2PC prepare may have happened earlier) and enter the
// provider commit order critical section.
//
// Returns zero on success. On failure the client error is overridden
// and/or the state is changed to s_must_replay / s_aborting as
// appropriate.
int wsrep::transaction::before_commit()
{
    int ret(1);

    wsrep::unique_lock<wsrep::mutex> lock(client_state_.mutex());
    debug_log_state("before_commit_enter");
    assert(client_state_.mode() != wsrep::client_state::m_toi);
    assert(state() == s_executing ||
           state() == s_prepared ||
           state() == s_committing ||
           state() == s_must_abort ||
           state() == s_replaying);
    assert((state() != s_committing && state() != s_replaying) ||
           certified());

    switch (client_state_.mode())
    {
    case wsrep::client_state::m_local:
        if (state() == s_executing)
        {
            // 1PC: run prepare phases as part of commit.
            ret = before_prepare(lock) || after_prepare(lock);
            assert((ret == 0 &&
                    (state() == s_committing || state() == s_prepared))
                   ||
                   (state() == s_must_abort ||
                    state() == s_must_replay ||
                    state() == s_cert_failed ||
                    state() == s_aborted));
        }
        else if (state() != s_committing && state() != s_prepared)
        {
            assert(state() == s_must_abort);
            if (certified() ||
                (is_xa() && is_streaming()))
            {
                state(lock, s_must_replay);
            }
            else
            {
                client_state_.override_error(wsrep::e_deadlock_error);
            }
        }
        else
        {
            // 2PC commit, prepare was done before
            ret = 0;
        }

        if (ret == 0 && state() == s_prepared)
        {
            // XA 2PC: certify the commit fragment now.
            ret = certify_commit(lock);
            assert((ret == 0 && state() == s_committing) ||
                   (state() == s_must_abort ||
                    state() == s_must_replay ||
                    state() == s_cert_failed ||
                    state() == s_prepared));
        }

        if (ret == 0)
        {
            assert(certified());
            assert(ordered());
            // Lock is released over the provider call; a BF abort
            // from the provider is reported via the return status.
            lock.unlock();
            client_service_.debug_sync("wsrep_before_commit_order_enter");
            enum wsrep::provider::status
                status(provider().commit_order_enter(ws_handle_, ws_meta_));
            lock.lock();
            switch (status)
            {
            case wsrep::provider::success:
                break;
            case wsrep::provider::error_bf_abort:
                // BF aborted while entering commit order: the
                // transaction has been ordered, so it must replay.
                if (state() != s_must_abort)
                {
                    state(lock, s_must_abort);
                }
                state(lock, s_must_replay);
                ret = 1;
                break;
            default:
                ret = 1;
                assert(0);
                break;
            }
        }
        break;
    case wsrep::client_state::m_high_priority:
        assert(certified());
        assert(ordered());
        if (is_xa())
        {
            assert(state() == s_prepared ||
                   state() == s_replaying);
            state(lock, s_committing);
            ret = 0;
        }
        else if (state() == s_executing || state() == s_replaying)
        {
            ret = before_prepare(lock) || after_prepare(lock);
        }
        else
        {
            ret = 0;
        }
        lock.unlock();
        ret = ret || provider().commit_order_enter(ws_handle_, ws_meta_);
        lock.lock();
        if (ret)
        {
            state(lock, s_must_abort);
            state(lock, s_aborting);
        }
        break;
    default:
        assert(0);
        break;
    }
    debug_log_state("before_commit_leave");
    return ret;
}
558
// Leave the provider commit order critical section after the commit
// has been ordered. On success the state moves to s_ordered_commit.
//
// Returns the provider commit_order_leave() status.
int wsrep::transaction::ordered_commit()
{
    wsrep::unique_lock<wsrep::mutex> lock(client_state_.mutex());
    debug_log_state("ordered_commit_enter");
    assert(state() == s_committing);
    assert(ordered());
    client_service_.debug_sync("wsrep_before_commit_order_leave");
    int ret(provider().commit_order_leave(ws_handle_, ws_meta_,
                                          apply_error_buf_));
    client_service_.debug_sync("wsrep_after_commit_order_leave");
    // Should always succeed:
    // 1) If before commit before succeeds, the transaction handle
    //    in the provider is guaranteed to exist and the commit
    //    has been ordered
    // 2) The transaction which has been ordered for commit cannot be BF
    //    aborted anymore
    // 3) The provider should always guarantee that the transactions which
    //    have been ordered for commit can finish committing.
    //
    // The exception here is a storage service transaction which is running
    // in high priority mode. The fragment storage commit may get BF
    // aborted in the provider after commit ordering has been
    // established since the transaction is operating in streaming
    // mode.
    if (ret)
    {
        assert(client_state_.mode() == wsrep::client_state::m_high_priority);
        state(lock, s_must_abort);
        state(lock, s_aborting);
    }
    else
    {
        state(lock, s_ordered_commit);
    }
    debug_log_state("ordered_commit_leave");
    return ret;
}
596
// Run the post-commit phase: remove stored streaming fragments for
// XA transactions, stop the streaming client, release the provider
// transaction handle (local mode) and move the state to s_committed.
//
// Returns the provider release() status (asserted to be zero).
int wsrep::transaction::after_commit()
{
    int ret(0);

    wsrep::unique_lock<wsrep::mutex> lock(client_state_.mutex());
    debug_log_state("after_commit_enter");
    assert(state() == s_ordered_commit);

    if (is_streaming())
    {
        assert(client_state_.mode() == wsrep::client_state::m_local ||
               client_state_.mode() == wsrep::client_state::m_high_priority);

        if (is_xa())
        {
            // XA fragment removal happens here,
            // see comment in before_prepare
            lock.unlock();
            // Scope guard: switches thread globals to the storage
            // service for the duration of the fragment removal.
            scoped_storage_service<storage_service_deleter>
                sr_scope(
                    client_service_,
                    server_service_.storage_service(client_service_),
                    storage_service_deleter(server_service_));
            wsrep::storage_service& storage_service(
                sr_scope.storage_service());
            storage_service.adopt_transaction(*this);
            storage_service.remove_fragments();
            storage_service.commit(wsrep::ws_handle(), wsrep::ws_meta());
            lock.lock();
        }

        if (client_state_.mode() == wsrep::client_state::m_local)
        {
            lock.unlock();
            client_state_.server_state_.stop_streaming_client(&client_state_);
            lock.lock();
        }
        streaming_context_.cleanup();
    }

    switch (client_state_.mode())
    {
    case wsrep::client_state::m_local:
        ret = provider().release(ws_handle_);
        break;
    case wsrep::client_state::m_high_priority:
        break;
    default:
        assert(0);
        break;
    }
    assert(ret == 0);
    state(lock, s_committed);

    debug_log_state("after_commit_leave");
    return ret;
}
654
// Run the pre-rollback phase. Depending on the current state this
// moves the transaction towards s_aborting, or towards s_must_replay
// if the transaction was BF aborted after certification. For
// streaming transactions a streaming rollback (rollback fragment
// replication) is initiated where needed. Always returns zero.
int wsrep::transaction::before_rollback()
{
    wsrep::unique_lock<wsrep::mutex> lock(client_state_.mutex());
    debug_log_state("before_rollback_enter");
    assert(state() == s_executing ||
           state() == s_preparing ||
           state() == s_prepared ||
           state() == s_must_abort ||
           // Background rollbacker or rollback initiated from SE
           state() == s_aborting ||
           state() == s_cert_failed ||
           state() == s_must_replay);

    switch (client_state_.mode())
    {
    case wsrep::client_state::m_local:
        if (is_streaming())
        {
            client_service_.debug_sync("wsrep_before_SR_rollback");
        }
        switch (state())
        {
        case s_preparing:
            // Error detected during prepare phase
            state(lock, s_must_abort);
            WSREP_FALLTHROUGH;
        case s_prepared:
            WSREP_FALLTHROUGH;
        case s_executing:
            // Voluntary rollback
            if (is_streaming())
            {
                streaming_rollback(lock);
            }
            state(lock, s_aborting);
            break;
        case s_must_abort:
            if (certified())
            {
                // Ordered already; must replay instead of abort.
                state(lock, s_must_replay);
            }
            else
            {
                if (is_streaming())
                {
                    streaming_rollback(lock);
                }
                state(lock, s_aborting);
            }
            break;
        case s_cert_failed:
            if (is_streaming())
            {
                streaming_rollback(lock);
            }
            state(lock, s_aborting);
            break;
        case s_aborting:
            if (is_streaming())
            {
                streaming_rollback(lock);
            }
            break;
        case s_must_replay:
            break;
        default:
            assert(0);
            break;
        }
        break;
    case wsrep::client_state::m_high_priority:
        // Rollback by rollback write set or BF abort
        assert(state_ == s_executing || state_ == s_prepared || state_ == s_aborting);
        if (state_ != s_aborting)
        {
            state(lock, s_aborting);
        }
        break;
    default:
        assert(0);
        break;
    }

    debug_log_state("before_rollback_leave");
    return 0;
}
741
// Run the post-rollback phase. If a streaming transaction was BF
// aborted in total order, its stored fragments are removed here via
// a storage service. Moves s_aborting to s_aborted. Always returns
// zero.
int wsrep::transaction::after_rollback()
{
    wsrep::unique_lock<wsrep::mutex> lock(client_state_.mutex());
    debug_log_state("after_rollback_enter");
    assert(state() == s_aborting ||
           state() == s_must_replay);

    if (is_streaming() && bf_aborted_in_total_order_)
    {
        lock.unlock();
        // Storage service scope
        {
            scoped_storage_service<storage_service_deleter>
                sr_scope(
                    client_service_,
                    server_service_.storage_service(client_service_),
                    storage_service_deleter(server_service_));
            wsrep::storage_service& storage_service(
                sr_scope.storage_service());
            storage_service.adopt_transaction(*this);
            storage_service.remove_fragments();
            storage_service.commit(wsrep::ws_handle(), wsrep::ws_meta());
        }
        lock.lock();
        streaming_context_.cleanup();
    }

    // Keep the streaming context if the transaction will replay.
    if (is_streaming() && state() != s_must_replay)
    {
        streaming_context_.cleanup();
    }

    if (state() == s_aborting)
    {
        state(lock, s_aborted);
    }

    // Releasing the transaction from provider is postponed into
    // after_statement() hook. Depending on DBMS system all the
    // resources acquired by transaction may or may not be released
    // during actual rollback. If the transaction has been ordered,
    // releasing the commit ordering critical section should be
    // also postponed until all resources have been released.
    debug_log_state("after_rollback_leave");
    return 0;
}
788
release_commit_order(wsrep::unique_lock<wsrep::mutex> & lock)789 int wsrep::transaction::release_commit_order(
790 wsrep::unique_lock<wsrep::mutex>& lock)
791 {
792 lock.unlock();
793 int ret(provider().commit_order_enter(ws_handle_, ws_meta_));
794 lock.lock();
795 if (!ret)
796 {
797 server_service_.set_position(client_service_, ws_meta_.gtid());
798 ret = provider().commit_order_leave(ws_handle_, ws_meta_,
799 apply_error_buf_);
800 }
801 return ret;
802 }
803
// Run the end-of-statement processing for a local client. Handles
// statement based streaming fragmenting, rolls back / replays a BF
// aborted transaction, releases commit ordering and the provider
// handle for an aborted transaction, and cleans up terminated
// transactions.
//
// Returns zero on success; non-zero if the statement failed (the
// transaction ends up in s_aborted in that case).
int wsrep::transaction::after_statement()
{
    int ret(0);
    wsrep::unique_lock<wsrep::mutex> lock(client_state_.mutex());
    debug_log_state("after_statement_enter");
    assert(client_state_.mode() == wsrep::client_state::m_local);
    assert(state() == s_executing ||
           state() == s_prepared ||
           state() == s_committed ||
           state() == s_aborted ||
           state() == s_must_abort ||
           state() == s_cert_failed ||
           state() == s_must_replay);

    // Statement based fragment unit: replicate the next fragment
    // at statement boundary.
    if (state() == s_executing &&
        streaming_context_.fragment_size() &&
        streaming_context_.fragment_unit() == streaming_context::statement)
    {
        ret = streaming_step(lock);
    }

    switch (state())
    {
    case s_executing:
        // ?
        break;
    case s_prepared:
        assert(is_xa());
        break;
    case s_committed:
        assert(is_streaming() == false);
        break;
    case s_must_abort:
    case s_cert_failed:
        // Error may be set already. For example, if fragment size
        // exceeded the maximum size in certify_fragment(), then
        // we already have wsrep::e_error_during_commit
        if (client_state_.current_error() == wsrep::e_success)
        {
            client_state_.override_error(wsrep::e_deadlock_error);
        }
        lock.unlock();
        ret = client_service_.bf_rollback();
        lock.lock();
        if (state() != s_must_replay)
        {
            break;
        }
        // Continue to replay if rollback() changed the state to s_must_replay
        WSREP_FALLTHROUGH;
    case s_must_replay:
    {
        if (is_xa() && !ordered())
        {
            ret = xa_replay_commit(lock);
        }
        else
        {
            ret = replay(lock);
        }
        break;
    }
    case s_aborted:
        // Raise a deadlock error if the transaction was BF aborted and
        // rolled back by client outside of transaction hooks.
        if (bf_aborted() && client_state_.current_error() == wsrep::e_success &&
            !client_service_.is_xa_rollback())
        {
            client_state_.override_error(wsrep::e_deadlock_error);
        }
        break;
    default:
        assert(0);
        break;
    }

    assert(state() == s_executing ||
           state() == s_prepared ||
           state() == s_committed ||
           state() == s_aborted ||
           state() == s_must_replay);

    if (state() == s_aborted)
    {
        // Release commit ordering (if the write set was ordered) and
        // the provider transaction handle.
        if (ordered())
        {
            ret = release_commit_order(lock);
        }
        lock.unlock();
        provider().release(ws_handle_);
        lock.lock();
    }

    // Keep transaction state for explicit XA transactions until the
    // client quits, otherwise clean up terminated transactions.
    if (state() != s_executing &&
        (!client_service_.is_explicit_xa() ||
         client_state_.state() == wsrep::client_state::s_quitting))
    {
        cleanup();
    }
    fragments_certified_for_statement_ = 0;
    debug_log_state("after_statement_leave");
    assert(ret == 0 || state() == s_aborted);
    return ret;
}
908
// Handle a transaction which was found in s_must_abort state after a
// client command: rolls it back, and for streaming XA transactions
// initiates replay; otherwise overrides the client error with a
// deadlock error.
void wsrep::transaction::after_command_must_abort(
    wsrep::unique_lock<wsrep::mutex>& lock)
{
    debug_log_state("after_command_must_abort enter");
    assert(active());
    assert(state_ == s_must_abort);

    // Streaming XA: the transaction must be replayed, mark the
    // state before releasing the lock for rollback.
    if (is_xa() && is_streaming())
    {
        state(lock, s_must_replay);
    }

    lock.unlock();
    client_service_.bf_rollback();
    lock.lock();

    if (is_xa() && is_streaming())
    {
        xa_replay(lock);
    }
    else
    {
        client_state_.override_error(wsrep::e_deadlock_error);
    }

    debug_log_state("after_command_must_abort leave");
}
936
// Run the post-apply processing for a high priority (applier)
// transaction. Terminated transactions are cleaned up; a streaming
// applier (state still s_executing or s_prepared) keeps its state but
// resets the write set meta data.
void wsrep::transaction::after_applying()
{
    wsrep::unique_lock<wsrep::mutex> lock(client_state_.mutex_);
    debug_log_state("after_applying enter");
    assert(state_ == s_executing ||
           state_ == s_prepared ||
           state_ == s_committed ||
           state_ == s_aborted ||
           state_ == s_replaying);

    if (state_ != s_executing && state_ != s_prepared)
    {
        // A replaying transaction which reaches this point has been
        // rolled back; normalize the state before cleanup.
        if (state_ == s_replaying) state(lock, s_aborted);
        cleanup();
    }
    else
    {
        // State remains executing or prepared, so this is a streaming applier.
        // Reset the meta data to avoid releasing commit order
        // critical section above if the next fragment is rollback
        // fragment. Rollback fragment ordering will be handled by
        // another instance while removing the fragments from
        // storage.
        ws_meta_ = wsrep::ws_meta();
    }
    debug_log_state("after_applying leave");
}
964
// Brute force abort this transaction on behalf of a higher priority
// write set with seqno bf_seqno. The client state mutex must be held
// by the caller.
//
// If the provider BF abort succeeds, the transaction state moves to
// s_must_abort (recording the state at entry in bf_abort_state_) and
// possibly further to s_aborting/s_must_replay when the rollback is
// delegated to the background rollbacker.
//
// Returns true if the transaction was successfully BF aborted.
bool wsrep::transaction::bf_abort(
    wsrep::unique_lock<wsrep::mutex>& lock,
    wsrep::seqno bf_seqno)
{
    bool ret(false);
    const enum wsrep::transaction::state state_at_enter(state());
    assert(lock.owns_lock());

    if (active() == false)
    {
        WSREP_LOG_DEBUG(client_state_.debug_log_level(),
                        wsrep::log::debug_level_transaction,
                        "Transaction not active, skipping bf abort");
    }
    else
    {
        switch (state_at_enter)
        {
        // BF abort is attempted in the provider only in these states.
        case s_executing:
        case s_preparing:
        case s_prepared:
        case s_certifying:
        case s_committing:
        {
            wsrep::seqno victim_seqno;
            enum wsrep::provider::status
                status(client_state_.provider().bf_abort(
                           bf_seqno, id_, victim_seqno));
            switch (status)
            {
            case wsrep::provider::success:
                WSREP_LOG_DEBUG(client_state_.debug_log_level(),
                                wsrep::log::debug_level_transaction,
                                "Seqno " << bf_seqno
                                << " successfully BF aborted " << id_
                                << " victim_seqno " << victim_seqno);
                bf_abort_state_ = state_at_enter;
                state(lock, s_must_abort);
                ret = true;
                break;
            default:
                WSREP_LOG_DEBUG(client_state_.debug_log_level(),
                                wsrep::log::debug_level_transaction,
                                "Seqno " << bf_seqno
                                << " failed to BF abort " << id_
                                << " with status " << status
                                << " victim_seqno " << victim_seqno);
                break;
            }
            break;
        }
        default:
            WSREP_LOG_DEBUG(client_state_.debug_log_level(),
                            wsrep::log::debug_level_transaction,
                            "BF abort not allowed in state "
                            << wsrep::to_string(state_at_enter));
            break;
        }
    }

    if (ret)
    {
        bf_abort_client_state_ = client_state_.state();
        // If the transaction is in executing state, we must initiate
        // streaming rollback to ensure that the rollback fragment gets
        // replicated before the victim starts to roll back and release locks.
        // In other states the BF abort will be detected outside of
        // storage engine operations and streaming rollback will be
        // handled from before_rollback() call.
        if (client_state_.mode() == wsrep::client_state::m_local &&
            is_streaming() && state_at_enter == s_executing)
        {
            streaming_rollback(lock);
        }

        if ((client_state_.state() == wsrep::client_state::s_idle &&
             client_state_.server_state().rollback_mode() ==
             wsrep::server_state::rm_sync) // locally processing idle
            ||
            // high priority streaming
            (client_state_.mode() == wsrep::client_state::m_high_priority &&
             is_streaming()))
        {
            // We need to change the state to aborting under the
            // lock protection to avoid a race between client thread,
            // otherwise it could happen that the client gains control
            // between releasing the lock and before background
            // rollbacker gets control.
            if (is_xa() && state_at_enter == s_prepared)
            {
                state(lock, s_must_replay);
                client_state_.set_rollbacker_active(true);
            }
            else
            {
                state(lock, s_aborting);
                client_state_.set_rollbacker_active(true);
                if (client_state_.mode() == wsrep::client_state::m_high_priority)
                {
                    lock.unlock();
                    client_state_.server_state().stop_streaming_applier(
                        server_id_, id_);
                    lock.lock();
                }
            }

            lock.unlock();
            server_service_.background_rollback(client_state_);
        }
    }
    return ret;
}
1077
total_order_bf_abort(wsrep::unique_lock<wsrep::mutex> & lock WSREP_UNUSED,wsrep::seqno bf_seqno)1078 bool wsrep::transaction::total_order_bf_abort(
1079 wsrep::unique_lock<wsrep::mutex>& lock WSREP_UNUSED,
1080 wsrep::seqno bf_seqno)
1081 {
1082 bool ret(bf_abort(lock, bf_seqno));
1083 if (ret)
1084 {
1085 bf_aborted_in_total_order_ = true;
1086 }
1087 return ret;
1088 }
1089
clone_for_replay(const wsrep::transaction & other)1090 void wsrep::transaction::clone_for_replay(const wsrep::transaction& other)
1091 {
1092 assert(other.state() == s_replaying);
1093 id_ = other.id_;
1094 xid_ = other.xid_;
1095 server_id_ = other.server_id_;
1096 ws_handle_ = other.ws_handle_;
1097 ws_meta_ = other.ws_meta_;
1098 streaming_context_ = other.streaming_context_;
1099 state_ = s_replaying;
1100 }
1101
// Assign an XA transaction id to an active transaction. The
// transaction must not already be in XA mode.
void wsrep::transaction::assign_xid(const wsrep::xid& xid)
{
    assert(active());
    assert(!is_xa());
    xid_ = xid;
}
1108
// Restore an empty transaction to prepared state with the given XA
// id. Used to reconstruct the prepared state without replicating a
// new write set: the start_transaction flag is cleared and the state
// is driven through the normal transition path into s_prepared.
// Always returns zero.
int wsrep::transaction::restore_to_prepared_state(const wsrep::xid& xid)
{
    wsrep::unique_lock<wsrep::mutex> lock(client_state_.mutex_);
    assert(active());
    assert(is_empty());
    assert(state() == s_executing || state() == s_replaying);
    flags(flags() & ~wsrep::provider::flag::start_transaction);
    // Walk a legal state transition path into s_prepared.
    if (state() == s_executing)
    {
        state(lock, s_certifying);
    }
    else
    {
        state(lock, s_preparing);
    }
    state(lock, s_prepared);
    xid_ = xid;
    return 0;
}
1128
// Commit or roll back a detached XA transaction identified by xid.
// The transaction itself is owned by a streaming applier; this client
// replicates a commit (or rollback) fragment on its behalf through
// the provider certify call.
//
// Returns zero on success, 1 if the streaming applier was not found
// or certification failed (client error is overridden).
int wsrep::transaction::commit_or_rollback_by_xid(const wsrep::xid& xid,
                                                  bool commit)
{
    debug_log_state("commit_or_rollback_by_xid enter");
    wsrep::unique_lock<wsrep::mutex> lock(client_state_.mutex_);
    wsrep::server_state& server_state(client_state_.server_state());
    wsrep::high_priority_service* sa(server_state.find_streaming_applier(xid));

    if (!sa)
    {
        // No streaming applier holds a transaction with this xid.
        assert(sa);
        client_state_.override_error(wsrep::e_error_during_commit);
        return 1;
    }

    if (commit)
    {
        flags(wsrep::provider::flag::commit);
    }
    else
    {
        flags(wsrep::provider::flag::rollback);
    }
    pa_unsafe(true);
    // The fragment is replicated with the identity (server id,
    // transaction id) of the detached transaction.
    wsrep::stid stid(sa->transaction().server_id(),
                     sa->transaction().id(),
                     client_state_.id());
    wsrep::ws_meta meta(stid);

    const enum wsrep::provider::status cert_ret(
        provider().certify(client_state_.id(),
                           ws_handle_,
                           flags(),
                           meta));

    int ret;
    if (cert_ret == wsrep::provider::success)
    {
        if (commit)
        {
            state(lock, s_certifying);
            state(lock, s_committing);
            state(lock, s_committed);
        }
        else
        {
            state(lock, s_aborting);
            state(lock, s_aborted);
        }
        ret = 0;
    }
    else
    {
        client_state_.override_error(wsrep::e_error_during_commit,
                                     cert_ret);
        wsrep::log_error() << "Failed to commit_or_rollback_by_xid,"
                           << " xid: " << xid
                           << " error: " << cert_ret;
        ret = 1;
    }
    debug_log_state("commit_or_rollback_by_xid leave");
    return ret;
}
1192
// Detach a prepared XA transaction from this client. Ownership of the
// transaction is transferred to a high priority applier created by
// the server state, so that the transaction survives the client and
// can later be terminated via commit_or_rollback_by_xid(). The client
// side transaction object is then cleaned up and left in s_aborted
// state. Note the ordering: the conversion and client service cleanup
// happen before the client state lock is taken.
void wsrep::transaction::xa_detach()
{
    debug_log_state("xa_detach enter");
    assert(state() == s_prepared);
    wsrep::server_state& server_state(client_state_.server_state());
    server_state.convert_streaming_client_to_applier(&client_state_);
    client_service_.store_globals();
    client_service_.cleanup_transaction();
    wsrep::unique_lock<wsrep::mutex> lock(client_state_.mutex_);
    streaming_context_.cleanup();
    // From this client's point of view the transaction is over.
    state(lock, s_aborting);
    state(lock, s_aborted);
    provider().release(ws_handle_);
    cleanup();
    debug_log_state("xa_detach leave");
}
1209
// Common part of XA replay, shared by xa_replay() and
// xa_replay_commit(). The BF aborted streaming XA transaction is
// handed over to a high priority applier which performs the unordered
// replay through the client service. Called and returns with lock
// held; the lock is dropped around the server state and replay calls.
// If the replay fails, the process is shut down via the client
// service's emergency_shutdown().
void wsrep::transaction::xa_replay_common(wsrep::unique_lock<wsrep::mutex>& lock)
{
    assert(lock.owns_lock());
    assert(is_xa());
    assert(is_streaming());
    assert(state() == s_must_replay);
    assert(bf_aborted());

    state(lock, s_replaying);

    enum wsrep::provider::status status;
    wsrep::server_state& server_state(client_state_.server_state());

    // Server state and replay calls are made without the client
    // state lock.
    lock.unlock();
    server_state.convert_streaming_client_to_applier(&client_state_);
    status = client_service_.replay_unordered();
    client_service_.store_globals();
    lock.lock();

    if (status != wsrep::provider::success)
    {
        client_service_.emergency_shutdown();
    }
}
1234
// Replay a BF aborted idle XA transaction. The replay itself is done
// through an applier in xa_replay_common(); this client side object
// is then finished as aborted and cleaned up. Called with lock held.
// Always returns 0.
int wsrep::transaction::xa_replay(wsrep::unique_lock<wsrep::mutex>& lock)
{
    debug_log_state("xa_replay enter");
    xa_replay_common(lock);
    state(lock, s_aborted);
    streaming_context_.cleanup();
    provider().release(ws_handle_);
    cleanup();
    // Wake up any thread waiting for this replay to complete.
    client_service_.signal_replayed();
    debug_log_state("xa_replay leave");
    return 0;
}
1247
// Replay a BF aborted XA transaction that was attempting to commit.
// After the unordered replay in xa_replay_common(), the commit is
// retried by xid through the client service. On success the
// transaction is finished as committed and cleaned up; on failure it
// is moved back to s_prepared so the client may retry the commit.
// Called with lock held. Returns zero on success, non-zero on failure.
int wsrep::transaction::xa_replay_commit(wsrep::unique_lock<wsrep::mutex>& lock)
{
    debug_log_state("xa_replay_commit enter");
    xa_replay_common(lock);
    // The commit by xid is performed without the client state lock.
    lock.unlock();
    enum wsrep::provider::status status(client_service_.commit_by_xid());
    lock.lock();
    int ret(1);
    switch (status)
    {
    case wsrep::provider::success:
        state(lock, s_committed);
        streaming_context_.cleanup();
        provider().release(ws_handle_);
        cleanup();
        ret = 0;
        break;
    default:
        log_warning() << "Failed to commit by xid during replay";
        // Commit by xid failed, return a commit
        // error and let the client retry
        state(lock, s_preparing);
        state(lock, s_prepared);
        client_state_.override_error(wsrep::e_error_during_commit, status);
    }

    client_service_.signal_replayed();
    debug_log_state("xa_replay_commit leave");
    return ret;
}
1278
1279 ////////////////////////////////////////////////////////////////////////////////
1280 // Private //
1281 ////////////////////////////////////////////////////////////////////////////////
1282
provider()1283 inline wsrep::provider& wsrep::transaction::provider()
1284 {
1285 return client_state_.server_state().provider();
1286 }
1287
// Perform a transaction state transition, validated against the
// allowed[][] transition table below. A disallowed transition is
// logged and fires an assertion in debug builds (release builds
// proceed regardless). Must be called with the client state lock
// held; normally only the thread owning the client state may change
// the state, with the BF-abort exceptions asserted below.
void wsrep::transaction::state(
    wsrep::unique_lock<wsrep::mutex>& lock __attribute__((unused)),
    enum wsrep::transaction::state next_state)
{
    WSREP_LOG_DEBUG(client_state_.debug_log_level(),
                    wsrep::log::debug_level_transaction,
                    "client: " << client_state_.id().get()
                    << " txc: " << id().get()
                    << " state: " << to_string(state_)
                    << " -> " << to_string(next_state));

    assert(lock.owns_lock());
    // BF aborter is allowed to change the state without gaining control
    // to the state if the next state is s_must_abort, s_aborting or
    // s_must_replay (for xa idle replay).
    assert(client_state_.owning_thread_id_ == wsrep::this_thread::get_id() ||
           next_state == s_must_abort ||
           next_state == s_must_replay ||
           next_state == s_aborting);

    // Transition matrix: rows are the current state, columns the
    // requested next state. Abbreviations follow the s_* enumerators
    // (ex=executing, pg=preparing, pd=prepared, ce=certifying,
    // co=committing, ct=committed, cf=cert_failed, ma=must_abort,
    // ab=aborting, ad=aborted, mr=must_replay, re=replaying;
    // oc presumably ordered commit -- confirm against the state enum).
    static const char allowed[n_states][n_states] =
        { /* ex pg pd ce co oc ct cf ma ab ad mr re */
            { 0, 1, 0, 1, 0, 0, 0, 0, 1, 1, 0, 0, 0}, /* ex */
            { 0, 0, 1, 0, 1, 0, 0, 0, 1, 0, 0, 0, 0}, /* pg */
            { 0, 0, 0, 1, 1, 0, 0, 0, 1, 1, 0, 0, 0}, /* pd */
            { 1, 1, 1, 0, 1, 0, 0, 1, 1, 0, 0, 0, 0}, /* ce */
            { 0, 0, 0, 0, 0, 1, 1, 0, 1, 0, 0, 0, 0}, /* co */
            { 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0}, /* oc */
            { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, /* ct */
            { 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0}, /* cf */
            { 0, 0, 0, 0, 0, 0, 0, 1, 0, 1, 0, 1, 0}, /* ma */
            { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0}, /* ab */
            { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, /* ad */
            { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1}, /* mr */
            { 0, 1, 0, 0, 1, 0, 1, 0, 0, 0, 1, 0, 0} /* re */
        };

    if (!allowed[state_][next_state])
    {
        wsrep::log_debug() << "unallowed state transition for transaction "
                           << id_ << ": " << wsrep::to_string(state_)
                           << " -> " << wsrep::to_string(next_state);
        assert(0);
    }

    // Keep a bounded history (11 entries) of previous states for
    // troubleshooting.
    state_hist_.push_back(state_);
    if (state_hist_.size() == 12)
    {
        state_hist_.erase(state_hist_.begin());
    }
    state_ = next_state;

    // Let the client service know that a replay will follow.
    if (state_ == s_must_replay)
    {
        client_service_.will_replay();
    }
}
1345
abort_or_interrupt(wsrep::unique_lock<wsrep::mutex> & lock)1346 bool wsrep::transaction::abort_or_interrupt(
1347 wsrep::unique_lock<wsrep::mutex>& lock)
1348 {
1349 assert(lock.owns_lock());
1350 if (state() == s_must_abort)
1351 {
1352 client_state_.override_error(wsrep::e_deadlock_error);
1353 return true;
1354 }
1355 else if (state() == s_aborting || state() == s_aborted)
1356 {
1357 // Somehow we got there after BF abort without setting error
1358 // status. This may happen if the DBMS side aborts the transaction
1359 // but still tries to continue processing rows. Raise the error here
1360 // and assert so that the error will be caught in debug builds.
1361 if (bf_abort_client_state_ &&
1362 client_state_.current_error() == wsrep::e_success)
1363 {
1364 client_state_.override_error(wsrep::e_deadlock_error);
1365 assert(0);
1366 }
1367 return true;
1368 }
1369 else if (client_service_.interrupted(lock))
1370 {
1371 client_state_.override_error(wsrep::e_interrupted_error);
1372 if (state() != s_must_abort)
1373 {
1374 state(lock, s_must_abort);
1375 }
1376 return true;
1377 }
1378 return false;
1379 }
1380
streaming_step(wsrep::unique_lock<wsrep::mutex> & lock,bool force)1381 int wsrep::transaction::streaming_step(wsrep::unique_lock<wsrep::mutex>& lock,
1382 bool force)
1383 {
1384 assert(lock.owns_lock());
1385 assert(streaming_context_.fragment_size() || is_xa());
1386
1387 if (client_service_.bytes_generated() <
1388 streaming_context_.log_position())
1389 {
1390 /* Something went wrong on DBMS side in keeping track of
1391 generated bytes. Return an error to abort the transaction. */
1392 wsrep::log_warning() << "Bytes generated "
1393 << client_service_.bytes_generated()
1394 << " less than bytes certified "
1395 << streaming_context_.log_position()
1396 << ", aborting streaming transaction";
1397 return 1;
1398 }
1399 int ret(0);
1400
1401 const size_t bytes_to_replicate(client_service_.bytes_generated() -
1402 streaming_context_.log_position());
1403
1404 switch (streaming_context_.fragment_unit())
1405 {
1406 case streaming_context::row:
1407 WSREP_FALLTHROUGH;
1408 case streaming_context::statement:
1409 streaming_context_.increment_unit_counter(1);
1410 break;
1411 case streaming_context::bytes:
1412 streaming_context_.set_unit_counter(bytes_to_replicate);
1413 break;
1414 }
1415
1416 // Some statements have no effect. Do not atttempt to
1417 // replicate a fragment if no data has been generated since
1418 // last fragment replication.
1419 // We use `force = true` on XA prepare: a fragment will be
1420 // generated even if no data is pending replication.
1421 if (bytes_to_replicate <= 0 && !force)
1422 {
1423 assert(bytes_to_replicate == 0);
1424 return ret;
1425 }
1426
1427 if (streaming_context_.fragment_size_exceeded() || force)
1428 {
1429 streaming_context_.reset_unit_counter();
1430 ret = certify_fragment(lock);
1431 }
1432
1433 return ret;
1434 }
1435
// Certify one streaming replication fragment: the fragment data is
// prepared by the client service, appended to the provider write set,
// written into stable fragment storage through a storage service, and
// certified through the provider. Called and returns with lock held;
// the lock is released and reacquired several times internally.
// Returns zero on success. On failure, returns non-zero with the
// transaction moved to s_must_abort and the client error set.
int wsrep::transaction::certify_fragment(
    wsrep::unique_lock<wsrep::mutex>& lock)
{
    assert(lock.owns_lock());

    assert(client_state_.mode() == wsrep::client_state::m_local);
    assert(streaming_context_.rolled_back() == false ||
           state() == s_must_abort);

    client_service_.wait_for_replayers(lock);
    if (abort_or_interrupt(lock))
    {
        return 1;
    }

    state(lock, s_certifying);
    lock.unlock();
    client_service_.debug_sync("wsrep_before_fragment_certification");

    // Make sure rollback events pending from earlier locally rolled
    // back transactions reach the cluster before this fragment.
    enum wsrep::provider::status status(
        client_state_.server_state_.send_pending_rollback_events());
    if (status)
    {
        wsrep::log_warning()
            << "Failed to replicate pending rollback events: "
            << status << " ("
            << wsrep::provider::to_string(status) << ")";
        lock.lock();
        state(lock, s_must_abort);
        return 1;
    }

    wsrep::mutable_buffer data;
    size_t log_position(0);
    if (client_service_.prepare_fragment_for_replication(data, log_position))
    {
        lock.lock();
        state(lock, s_must_abort);
        client_state_.override_error(wsrep::e_error_during_commit);
        return 1;
    }
    streaming_context_.set_log_position(log_position);

    if (data.size() == 0)
    {
        // Nothing to replicate after all; return to executing state.
        wsrep::log_warning() << "Attempt to replicate empty data buffer";
        lock.lock();
        state(lock, s_executing);
        return 0;
    }

    if (provider().append_data(ws_handle_,
                               wsrep::const_buffer(data.data(), data.size())))
    {
        lock.lock();
        state(lock, s_must_abort);
        client_state_.override_error(wsrep::e_error_during_commit);
        return 1;
    }

    if (is_xa())
    {
        // One more check to see if the transaction has been aborted.
        // This check is meaningful only here, after the append_data()
        // call above has made sure that the transaction exists in the
        // provider.
        lock.lock();
        if (abort_or_interrupt(lock))
        {
            return 1;
        }
        lock.unlock();
    }

    if (is_streaming() == false)
    {
        // First fragment for this transaction: register the client
        // as a streaming client in the server state.
        client_state_.server_state_.start_streaming_client(&client_state_);
    }

    if (implicit_deps())
    {
        flags(flags() | wsrep::provider::flag::implicit_deps);
    }

    int ret(0);
    enum wsrep::client_error error(wsrep::e_success);
    enum wsrep::provider::status cert_ret(wsrep::provider::success);
    // Storage service scope
    {
        scoped_storage_service<storage_service_deleter>
            sr_scope(
                client_service_,
                server_service_.storage_service(client_service_),
                storage_service_deleter(server_service_));
        wsrep::storage_service& storage_service(
            sr_scope.storage_service());

        // First the fragment is appended to the stable storage.
        // This is done to ensure that there is enough capacity
        // available to store the fragment. The fragment meta data
        // is updated after certification.
        wsrep::id server_id(client_state_.server_state().id());

        if (server_id.is_undefined()) {
            // Server disconnected from cluster, do not
            // append a fragment with undefined server_id.
            ret = 1;
            error = wsrep::e_append_fragment_error;
        }

        if (ret == 0 &&
            (storage_service.start_transaction(ws_handle_) ||
             storage_service.append_fragment(
                 server_id,
                 id(),
                 flags(),
                 wsrep::const_buffer(data.data(), data.size()),
                 xid())))
        {
            ret = 1;
            error = wsrep::e_append_fragment_error;
        }

        if (ret == 0)
        {
            client_service_.debug_crash(
                "crash_replicate_fragment_before_certify");

            wsrep::ws_meta sr_ws_meta;
            cert_ret = provider().certify(client_state_.id(),
                                          ws_handle_,
                                          flags(),
                                          sr_ws_meta);
            client_service_.debug_crash(
                "crash_replicate_fragment_after_certify");

            switch (cert_ret)
            {
            case wsrep::provider::success:
                ++fragments_certified_for_statement_;
                assert(sr_ws_meta.seqno().is_undefined() == false);
                streaming_context_.certified();
                // Update the stored fragment with certification meta
                // data and commit the storage transaction; on failure
                // roll the storage back out of order (see below).
                if (storage_service.update_fragment_meta(sr_ws_meta))
                {
                    storage_service.rollback(wsrep::ws_handle(),
                                             wsrep::ws_meta());
                    ret = 1;
                    error = wsrep::e_deadlock_error;
                    break;
                }
                if (storage_service.commit(ws_handle_, sr_ws_meta))
                {
                    ret = 1;
                    error = wsrep::e_deadlock_error;
                }
                else
                {
                    streaming_context_.stored(sr_ws_meta.seqno());
                }
                client_service_.debug_crash(
                    "crash_replicate_fragment_success");
                break;
            case wsrep::provider::error_bf_abort:
            case wsrep::provider::error_certification_failed:
                // Streaming transaction got BF aborted, so it must roll
                // back. Roll back the fragment storage operation out of
                // order as the commit order will be grabbed later on
                // during rollback process. Mark the fragment as certified
                // though in streaming context in order to enter streaming
                // rollback codepath.
                //
                // Note that despite we handle error_certification_failed
                // here, we mark the transaction as streaming. Apparently
                // the provider may return status corresponding to certification
                // failure even if the fragment has passed certification.
                // This may be a bug in provider implementation or a limitation
                // of error codes defined in wsrep-API. In order to make
                // sure that the transaction will be cleaned on other servers,
                // we take a risk of sending one rollback fragment for nothing.
                storage_service.rollback(wsrep::ws_handle(),
                                         wsrep::ws_meta());
                streaming_context_.certified();
                ret = 1;
                error = wsrep::e_deadlock_error;
                break;
            default:
                // Storage service rollback must be done out of order,
                // otherwise there may be a deadlock between BF aborter
                // and the rollback process.
                storage_service.rollback(wsrep::ws_handle(), wsrep::ws_meta());
                ret = 1;
                error = wsrep::e_deadlock_error;
                break;
            }
        }
    }

    // Note: This does not release the handle in the provider
    // since streaming is still on. However it is needed to
    // make provider internal state to transition for the
    // next fragment. If any of the operations above failed,
    // the handle needs to be left unreleased for the following
    // rollback process.
    if (ret == 0)
    {
        assert(error == wsrep::e_success);
        ret = provider().release(ws_handle_);
        if (ret)
        {
            error = wsrep::e_deadlock_error;
        }
    }
    lock.lock();
    if (ret)
    {
        assert(error != wsrep::e_success);
        // On failure: either deregister the streaming client (if no
        // fragment has been certified) or enter the streaming
        // rollback codepath, then move to s_must_abort.
        if (is_streaming() == false)
        {
            lock.unlock();
            client_state_.server_state_.stop_streaming_client(&client_state_);
            lock.lock();
        }
        else
        {
            streaming_rollback(lock);
        }
        if (state_ != s_must_abort)
        {
            state(lock, s_must_abort);
        }
        client_state_.override_error(error, cert_ret);
    }
    else if (state_ == s_must_abort)
    {
        // BF aborted while certifying the fragment.
        if (is_streaming())
        {
            streaming_rollback(lock);
        }
        client_state_.override_error(wsrep::e_deadlock_error, cert_ret);
        ret = 1;
    }
    else
    {
        // Success: return to executing state; subsequent fragments
        // must not carry the start_transaction or pa_unsafe flags.
        assert(state_ == s_certifying);
        state(lock, s_executing);
        flags(flags() & ~wsrep::provider::flag::start_transaction);
        flags(flags() & ~wsrep::provider::flag::pa_unsafe);
    }
    return ret;
}
1686
// Certify the final (commit) write set of the transaction through
// the provider. On successful certification the transaction proceeds
// towards commit (s_committing for XA, s_preparing otherwise). On
// failure the state and client error are set according to the
// provider status; a BF abort during certification leads to
// s_must_replay. Called and returns with lock held; the lock is
// dropped for the certification call itself. Returns zero on
// success, non-zero on failure.
int wsrep::transaction::certify_commit(
    wsrep::unique_lock<wsrep::mutex>& lock)
{
    assert(lock.owns_lock());
    assert(active());
    client_service_.wait_for_replayers(lock);

    assert(lock.owns_lock());

    if (abort_or_interrupt(lock))
    {
        // An aborted XA commit must go through replay.
        if (is_xa() && state() == s_must_abort)
        {
            state(lock, s_must_replay);
        }
        return 1;
    }

    state(lock, s_certifying);
    lock.unlock();

    // Make sure rollback events pending from earlier locally rolled
    // back transactions reach the cluster before this write set.
    enum wsrep::provider::status status(
        client_state_.server_state_.send_pending_rollback_events());
    if (status)
    {
        wsrep::log_warning()
            << "Failed to replicate pending rollback events: "
            << status << " ("
            << wsrep::provider::to_string(status) << ")";

        // We failed to replicate some pending rollback fragment.
        // Meaning that some transaction that was rolled back
        // locally might still be active out there in the cluster.
        // To avoid a potential BF-BF conflict, we need to abort
        // and give up on this one.
        // Notice that we can't abort a prepared XA that wants to
        // commit. Fortunately, there is no need to in this case:
        // the commit fragment for XA does not cause any changes and
        // can't possibly conflict with other transactions out there.
        if (!is_xa())
        {
            lock.lock();
            state(lock, s_must_abort);
            return 1;
        }
    }

    if (is_streaming())
    {
        // For a streaming transaction the keys of all previous
        // fragments are appended to the commit fragment (except for
        // XA) and the commit fragment must not apply in parallel
        // with the preceding fragments.
        if (!is_xa())
        {
            append_sr_keys_for_commit();
        }
        pa_unsafe(true);
    }

    if (implicit_deps())
    {
        flags(flags() | wsrep::provider::flag::implicit_deps);
    }

    flags(flags() | wsrep::provider::flag::commit);
    flags(flags() & ~wsrep::provider::flag::prepare);

    if (client_service_.prepare_data_for_replication())
    {
        lock.lock();
        // Here we fake that the size exceeded error came from provider,
        // even though it came from the client service. This requires
        // some consideration how to get meaningful error codes from
        // the client service.
        client_state_.override_error(wsrep::e_size_exceeded_error,
                                     wsrep::provider::error_size_exceeded);
        if (state_ != s_must_abort)
        {
            state(lock, s_must_abort);
        }
        return 1;
    }

    client_service_.debug_sync("wsrep_before_certification");
    enum wsrep::provider::status
        cert_ret(provider().certify(client_state_.id(),
                                    ws_handle_,
                                    flags(),
                                    ws_meta_));
    client_service_.debug_sync("wsrep_after_certification");

    lock.lock();

    assert(state() == s_certifying || state() == s_must_abort);

    // Map the provider certification status onto the transaction
    // state machine and the client error status.
    int ret(1);
    switch (cert_ret)
    {
    case wsrep::provider::success:
        assert(ordered());
        certified_ = true;
        ++fragments_certified_for_statement_;
        switch (state())
        {
        case s_certifying:
            if (is_xa())
            {
                state(lock, s_committing);
            }
            else
            {
                state(lock, s_preparing);
            }
            ret = 0;
            break;
        case s_must_abort:
            // We got BF aborted after successful certification
            // and before acquiring client state lock. The transaction
            // must be replayed.
            state(lock, s_must_replay);
            break;
        default:
            assert(0);
            break;
        }
        break;
    case wsrep::provider::error_warning:
        assert(ordered() == false);
        state(lock, s_must_abort);
        client_state_.override_error(wsrep::e_error_during_commit, cert_ret);
        break;
    case wsrep::provider::error_transaction_missing:
        state(lock, s_must_abort);
        // The execution should never reach this point if the
        // transaction has not generated any keys or data.
        wsrep::log_warning() << "Transaction was missing in provider";
        client_state_.override_error(wsrep::e_error_during_commit, cert_ret);
        break;
    case wsrep::provider::error_bf_abort:
        // Transaction was replicated successfully and it was either
        // certified successfully or the result of certifying is not
        // yet known. Therefore the transaction must roll back
        // and go through replay either to replay and commit the whole
        // transaction or to determine failed certification status.
        if (state() != s_must_abort)
        {
            state(lock, s_must_abort);
        }
        state(lock, s_must_replay);
        break;
    case wsrep::provider::error_certification_failed:
        state(lock, s_cert_failed);
        client_state_.override_error(wsrep::e_deadlock_error);
        break;
    case wsrep::provider::error_size_exceeded:
        state(lock, s_must_abort);
        client_state_.override_error(wsrep::e_error_during_commit, cert_ret);
        break;
    case wsrep::provider::error_connection_failed:
        // Galera provider may return CONN_FAIL if the trx is
        // BF aborted O_o. If we see here that the trx was BF aborted,
        // return deadlock error instead of error during commit
        // to reduce number of error state combinations elsewhere.
        if (state() == s_must_abort)
        {
            if (is_xa())
            {
                state(lock, s_must_replay);
            }
            client_state_.override_error(wsrep::e_deadlock_error);
        }
        else
        {
            client_state_.override_error(wsrep::e_error_during_commit,
                                         cert_ret);
            if (is_xa())
            {
                state(lock, s_prepared);
            }
            else
            {
                state(lock, s_must_abort);
            }
        }
        break;
    case wsrep::provider::error_provider_failed:
        if (state() != s_must_abort)
        {
            state(lock, s_must_abort);
        }
        client_state_.override_error(wsrep::e_error_during_commit, cert_ret);
        break;
    case wsrep::provider::error_fatal:
        client_state_.override_error(wsrep::e_error_during_commit, cert_ret);
        state(lock, s_must_abort);
        client_service_.emergency_shutdown();
        break;
    case wsrep::provider::error_not_implemented:
    case wsrep::provider::error_not_allowed:
        client_state_.override_error(wsrep::e_error_during_commit, cert_ret);
        state(lock, s_must_abort);
        wsrep::log_warning() << "Certification operation was not allowed: "
                             << "id: " << id().get()
                             << " flags: " << std::hex << flags() << std::dec;
        break;
    default:
        state(lock, s_must_abort);
        client_state_.override_error(wsrep::e_error_during_commit, cert_ret);
        break;
    }

    return ret;
}
1897
append_sr_keys_for_commit()1898 int wsrep::transaction::append_sr_keys_for_commit()
1899 {
1900 int ret(0);
1901 assert(client_state_.mode() == wsrep::client_state::m_local);
1902 for (wsrep::sr_key_set::branch_type::const_iterator
1903 i(sr_keys_.root().begin());
1904 ret == 0 && i != sr_keys_.root().end(); ++i)
1905 {
1906 for (wsrep::sr_key_set::leaf_type::const_iterator
1907 j(i->second.begin());
1908 ret == 0 && j != i->second.end(); ++j)
1909 {
1910 wsrep::key key(wsrep::key::shared);
1911 key.append_key_part(i->first.data(), i->first.size());
1912 key.append_key_part(j->data(), j->size());
1913 ret = provider().append_key(ws_handle_, key);
1914 }
1915 }
1916 return ret;
1917 }
1918
// Perform the streaming replication specific part of a rollback:
// either stop the streaming client (when the transaction was BF
// aborted in total order) or hand the transaction over to a high
// priority applier and replicate a rollback fragment so that other
// servers can clean up the transaction. Executed at most once per
// transaction (guarded by streaming_context_.rolled_back()).
// Called and returns with lock held; the lock is dropped around
// server state calls.
void wsrep::transaction::streaming_rollback(
    wsrep::unique_lock<wsrep::mutex>& lock)
{
    debug_log_state("streaming_rollback enter");
    assert(state_ != s_must_replay);
    assert(is_streaming());
    assert(lock.owns_lock());

    // Prevent streaming_rollback() to be executed simultaneously.
    // Notice that lock is unlocked when calling into server_state
    // methods, to avoid violating lock order.
    // The condition variable below prevents a thread to go
    // through streaming_rollback() while another thread is busy
    // stopping or converting the streaming_client().
    // This would be problematic if a thread is performing BF abort,
    // while the original client manages to complete its rollback
    // and therefore change the state of the transaction, causing
    // assertions to fire.
    while (streaming_rollback_in_progress_)
        client_state_.cond_.wait(lock);
    streaming_rollback_in_progress_ = true;

    if (streaming_context_.rolled_back() == false)
    {
        if (bf_aborted_in_total_order_)
        {
            lock.unlock();
            client_state_.server_state_.stop_streaming_client(&client_state_);
            lock.lock();
        }
        else
        {
            // Create a high priority applier which will handle the
            // rollback fragment or clean up on configuration change.
            // Adopt transaction will copy fragment set and appropriate
            // meta data.
            lock.unlock();
            server_service_.debug_sync("wsrep_streaming_rollback");
            client_state_.server_state_.convert_streaming_client_to_applier(
                &client_state_);
            lock.lock();
            streaming_context_.cleanup();

            // Replicate a rollback fragment; if that fails, queue the
            // rollback event to be sent later.
            enum wsrep::provider::status status(provider().rollback(id_));
            if (status)
            {
                lock.unlock();
                client_state_.server_state_.queue_rollback_event(id_);
                lock.lock();
                wsrep::log_debug()
                    << "Failed to replicate rollback fragment for " << id_
                    << ": " << status << " ( "
                    << wsrep::provider::to_string(status) << ")";
            }
        }

        // Mark the streaming context as rolled back,
        // so that this block is executed once.
        streaming_context_.rolled_back(id_);
    }

    debug_log_state("streaming_rollback leave");
    streaming_rollback_in_progress_ = false;
    client_state_.cond_.notify_all();
}
1984
replay(wsrep::unique_lock<wsrep::mutex> & lock)1985 int wsrep::transaction::replay(wsrep::unique_lock<wsrep::mutex>& lock)
1986 {
1987 int ret(0);
1988 state(lock, s_replaying);
1989 // Need to remember streaming state before replay, entering
1990 // after_commit() after succesful replay will clear
1991 // fragments.
1992 const bool was_streaming(is_streaming());
1993 lock.unlock();
1994 client_service_.debug_sync("wsrep_before_replay");
1995 enum wsrep::provider::status replay_ret(client_service_.replay());
1996 client_service_.signal_replayed();
1997 if (was_streaming)
1998 {
1999 client_state_.server_state_.stop_streaming_client(&client_state_);
2000 }
2001 lock.lock();
2002 switch (replay_ret)
2003 {
2004 case wsrep::provider::success:
2005 if (state() == s_replaying)
2006 {
2007 // Replay was done by using different client state, adjust state
2008 // to committed.
2009 state(lock, s_committed);
2010 }
2011 if (is_streaming())
2012 {
2013 streaming_context_.cleanup();
2014 }
2015 provider().release(ws_handle_);
2016 break;
2017 case wsrep::provider::error_certification_failed:
2018 client_state_.override_error(
2019 wsrep::e_deadlock_error);
2020 if (is_streaming())
2021 {
2022 client_service_.remove_fragments();
2023 streaming_context_.cleanup();
2024 }
2025 state(lock, s_aborted);
2026 ret = 1;
2027 break;
2028 default:
2029 client_service_.emergency_shutdown();
2030 break;
2031 }
2032
2033 WSREP_LOG_DEBUG(client_state_.debug_log_level(),
2034 wsrep::log::debug_level_transaction,
2035 "replay returned" << replay_ret);
2036 return ret;
2037 }
2038
// Reset the transaction object to its initial state after a commit
// or rollback has fully completed, so that the object can be reused
// for the next transaction on this client.
void wsrep::transaction::cleanup()
{
    debug_log_state("cleanup_enter");
    assert(is_streaming() == false);
    assert(state() == s_committed || state() == s_aborted);
    id_ = wsrep::transaction_id::undefined();
    ws_handle_ = wsrep::ws_handle();
    // Keep the state history for troubleshooting. Reset
    // at start_transaction().
    // state_hist_.clear();
    // Ordering: the last written GTID must be read from ws_meta_
    // before ws_meta_ is reset below.
    if (ordered())
    {
        client_state_.update_last_written_gtid(ws_meta_.gtid());
    }
    // Reset BF abort bookkeeping.
    bf_abort_state_ = s_executing;
    bf_abort_provider_status_ = wsrep::provider::success;
    bf_abort_client_state_ = 0;
    bf_aborted_in_total_order_ = false;
    ws_meta_ = wsrep::ws_meta();
    flags_ = 0;
    certified_ = false;
    implicit_deps_ = false;
    sr_keys_.clear();
    streaming_context_.cleanup();
    client_service_.cleanup_transaction();
    apply_error_buf_.clear();
    xid_.clear();
    debug_log_state("cleanup_leave");
}
2068
// Log a detailed snapshot of the transaction and its client state for
// debugging, tagged with the caller supplied context string. Emits
// nothing unless transaction debug logging is enabled at the current
// debug log level.
void wsrep::transaction::debug_log_state(
    const char* context) const
{
    WSREP_LOG_DEBUG(
        client_state_.debug_log_level(),
        wsrep::log::debug_level_transaction,
        context
        << "\n    server: " << server_id_
        << ", client: " << int64_t(client_state_.id().get())
        << ", state: " << wsrep::to_c_string(client_state_.state())
        << ", mode: " << wsrep::to_c_string(client_state_.mode())
        << "\n    trx_id: " << int64_t(id_.get())
        << ", seqno: " << ws_meta_.seqno().get()
        << ", flags: " << flags()
        << "\n"
        << "    state: " << wsrep::to_c_string(state_)
        << ", bfa_state: " << wsrep::to_c_string(bf_abort_state_)
        << ", error: " << wsrep::to_c_string(client_state_.current_error())
        << ", status: " << client_state_.current_error_status()
        << "\n"
        << "    is_sr: " << is_streaming()
        << ", frags: " << streaming_context_.fragments_certified()
        << ", frags size: " << streaming_context_.fragments().size()
        << ", unit: " << streaming_context_.fragment_unit()
        << ", size: " << streaming_context_.fragment_size()
        << ", counter: " << streaming_context_.unit_counter()
        << ", log_pos: " << streaming_context_.log_position()
        << ", sr_rb: " << streaming_context_.rolled_back()
        << "\n    own: " << (client_state_.owning_thread_id_ == wsrep::this_thread::get_id())
        << " thread_id: " << client_state_.owning_thread_id_
        << "");
}
2101
// Log a key that is about to be appended to the write set, together
// with the transaction id, for debugging. Emits nothing unless
// transaction debug logging is enabled at the current debug log
// level.
void wsrep::transaction::debug_log_key_append(const wsrep::key& key) const
{
    WSREP_LOG_DEBUG(client_state_.debug_log_level(),
                    wsrep::log::debug_level_transaction,
                    "key_append: "
                    << "trx_id: "
                    << int64_t(id().get())
                    << " append key:\n" << key);
}
2111