1 /*
2  * Copyright (C) 2018 Codership Oy <info@codership.com>
3  *
4  * This file is part of wsrep-lib.
5  *
6  * Wsrep-lib is free software: you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation, either version 2 of the License, or
9  * (at your option) any later version.
10  *
11  * Wsrep-lib is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with wsrep-lib.  If not, see <https://www.gnu.org/licenses/>.
18  */
19 
20 #include "wsrep/transaction.hpp"
21 #include "wsrep/client_state.hpp"
22 #include "wsrep/server_state.hpp"
23 #include "wsrep/storage_service.hpp"
24 #include "wsrep/high_priority_service.hpp"
25 #include "wsrep/key.hpp"
26 #include "wsrep/logger.hpp"
27 #include "wsrep/compiler.hpp"
28 
29 #include <sstream>
30 #include <memory>
31 
32 namespace
33 {
34     class storage_service_deleter
35     {
36     public:
storage_service_deleter(wsrep::server_service & server_service)37         storage_service_deleter(wsrep::server_service& server_service)
38             : server_service_(server_service)
39         { }
operator ()(wsrep::storage_service * storage_service)40         void operator()(wsrep::storage_service* storage_service)
41         {
42             server_service_.release_storage_service(storage_service);
43         }
44     private:
45         wsrep::server_service& server_service_;
46     };
47 
48     template <class D>
49     class scoped_storage_service
50     {
51     public:
scoped_storage_service(wsrep::client_service & client_service,wsrep::storage_service * storage_service,D deleter)52         scoped_storage_service(wsrep::client_service& client_service,
53                                wsrep::storage_service* storage_service,
54                                D deleter)
55             : client_service_(client_service)
56             , storage_service_(storage_service)
57             , deleter_(deleter)
58         {
59             if (storage_service_ == 0)
60             {
61                 throw wsrep::runtime_error("Null client_state provided");
62             }
63             client_service_.reset_globals();
64             storage_service_->store_globals();
65         }
66 
storage_service()67         wsrep::storage_service& storage_service()
68         {
69             return *storage_service_;
70         }
71 
~scoped_storage_service()72         ~scoped_storage_service()
73         {
74             deleter_(storage_service_);
75             client_service_.store_globals();
76         }
77     private:
78         scoped_storage_service(const scoped_storage_service&);
79         scoped_storage_service& operator=(const scoped_storage_service&);
80         wsrep::client_service& client_service_;
81         wsrep::storage_service* storage_service_;
82         D deleter_;
83     };
84 }
85 
86 // Public
87 
// Construct a transaction object bound to client_state.
//
// The server service and client service references are taken from
// client_state. The transaction id is initialized to
// transaction_id::undefined() and the state to s_executing; the real
// id and write set handle are assigned later by start_transaction().
wsrep::transaction::transaction(
    wsrep::client_state& client_state)
    : server_service_(client_state.server_state().server_service())
    , client_service_(client_state.client_service())
    , client_state_(client_state)
    , server_id_()
    , id_(transaction_id::undefined())
    , state_(s_executing)
    , state_hist_()
    , bf_abort_state_(s_executing)
    , bf_abort_provider_status_()
    , bf_abort_client_state_()
    , bf_aborted_in_total_order_()
    , ws_handle_()
    , ws_meta_()
    , flags_()
    , implicit_deps_(false)
    , certified_(false)
    , fragments_certified_for_statement_()
    , streaming_context_()
    , sr_keys_()
    , apply_error_buf_()
    , xid_()
    , streaming_rollback_in_progress_(false)
{ }
113 
114 
~transaction()115 wsrep::transaction::~transaction()
116 {
117 }
118 
start_transaction(const wsrep::transaction_id & id)119 int wsrep::transaction::start_transaction(
120     const wsrep::transaction_id& id)
121 {
122     debug_log_state("start_transaction enter");
123     assert(active() == false);
124     assert(is_xa() == false);
125     assert(flags() == 0);
126     server_id_ = client_state_.server_state().id();
127     id_ = id;
128     state_ = s_executing;
129     state_hist_.clear();
130     ws_handle_ = wsrep::ws_handle(id);
131     flags(wsrep::provider::flag::start_transaction);
132     switch (client_state_.mode())
133     {
134     case wsrep::client_state::m_high_priority:
135         debug_log_state("start_transaction success");
136         return 0;
137     case wsrep::client_state::m_local:
138         debug_log_state("start_transaction success");
139         return provider().start_transaction(ws_handle_);
140     default:
141         debug_log_state("start_transaction error");
142         assert(0);
143         return 1;
144     }
145 }
146 
start_transaction(const wsrep::ws_handle & ws_handle,const wsrep::ws_meta & ws_meta)147 int wsrep::transaction::start_transaction(
148     const wsrep::ws_handle& ws_handle,
149     const wsrep::ws_meta& ws_meta)
150 {
151     debug_log_state("start_transaction enter");
152     if (state() != s_replaying)
153     {
154         // assert(ws_meta.flags());
155         assert(active() == false);
156         assert(flags() == 0);
157         server_id_ = ws_meta.server_id();
158         id_ = ws_meta.transaction_id();
159         assert(client_state_.mode() == wsrep::client_state::m_high_priority);
160         state_ = s_executing;
161         state_hist_.clear();
162         ws_handle_ = ws_handle;
163         ws_meta_ = ws_meta;
164         flags(wsrep::provider::flag::start_transaction);
165         certified_ = true;
166     }
167     else
168     {
169         ws_meta_ = ws_meta;
170         assert(ws_meta_.flags() & wsrep::provider::flag::commit);
171         assert(active());
172         assert(client_state_.mode() == wsrep::client_state::m_high_priority);
173         assert(state() == s_replaying);
174         assert(ws_meta_.seqno().is_undefined() == false);
175         certified_ = true;
176     }
177     debug_log_state("start_transaction leave");
178     return 0;
179 }
180 
next_fragment(const wsrep::ws_meta & ws_meta)181 int wsrep::transaction::next_fragment(
182     const wsrep::ws_meta& ws_meta)
183 {
184     debug_log_state("next_fragment enter");
185     ws_meta_ = ws_meta;
186     debug_log_state("next_fragment leave");
187     return 0;
188 }
189 
adopt(const wsrep::transaction & transaction)190 void wsrep::transaction::adopt(const wsrep::transaction& transaction)
191 {
192     debug_log_state("adopt enter");
193     assert(transaction.is_streaming());
194     start_transaction(transaction.id());
195     server_id_ = transaction.server_id_;
196     flags_  = transaction.flags();
197     streaming_context_ = transaction.streaming_context();
198     debug_log_state("adopt leave");
199 }
200 
fragment_applied(wsrep::seqno seqno)201 void wsrep::transaction::fragment_applied(wsrep::seqno seqno)
202 {
203     assert(active());
204     streaming_context_.applied(seqno);
205 }
206 
prepare_for_ordering(const wsrep::ws_handle & ws_handle,const wsrep::ws_meta & ws_meta,bool is_commit)207 int wsrep::transaction::prepare_for_ordering(
208     const wsrep::ws_handle& ws_handle,
209     const wsrep::ws_meta& ws_meta,
210     bool is_commit)
211 {
212     assert(active());
213 
214     if (state_ != s_replaying)
215     {
216         ws_handle_ = ws_handle;
217         ws_meta_ = ws_meta;
218         certified_ = is_commit;
219     }
220     return 0;
221 }
222 
assign_read_view(const wsrep::gtid * const gtid)223 int wsrep::transaction::assign_read_view(const wsrep::gtid* const gtid)
224 {
225     try
226     {
227         return provider().assign_read_view(ws_handle_, gtid);
228     }
229     catch (...)
230     {
231         wsrep::log_error() << "Failed to assign read view";
232         return 1;
233     }
234 }
235 
append_key(const wsrep::key & key)236 int wsrep::transaction::append_key(const wsrep::key& key)
237 {
238     assert(active());
239     try
240     {
241         debug_log_key_append(key);
242         sr_keys_.insert(key);
243         return provider().append_key(ws_handle_, key);
244     }
245     catch (...)
246     {
247         wsrep::log_error() << "Failed to append key";
248         return 1;
249     }
250 }
251 
append_data(const wsrep::const_buffer & data)252 int wsrep::transaction::append_data(const wsrep::const_buffer& data)
253 {
254     assert(active());
255     return provider().append_data(ws_handle_, data);
256 }
257 
after_row()258 int wsrep::transaction::after_row()
259 {
260     wsrep::unique_lock<wsrep::mutex> lock(client_state_.mutex());
261     debug_log_state("after_row_enter");
262     int ret(0);
263     if (streaming_context_.fragment_size() &&
264         streaming_context_.fragment_unit() != streaming_context::statement)
265     {
266         ret = streaming_step(lock);
267     }
268     debug_log_state("after_row_leave");
269     return ret;
270 }
271 
// Prepare the transaction for commit.
//
// Must be called with the client state mutex held through `lock`.
//
// Local mode:
//   - For a streaming transaction, the current statement must be
//     allowed for streaming. For non-XA streaming transactions the
//     stored fragments are removed here. The lock is released around
//     these client service calls, so a BF abort (s_must_abort) is
//     re-checked after re-acquiring it.
//   - An XA transaction replicates a final fragment with the prepare
//     flag set (forced streaming step) and moves to s_preparing.
//   - A non-XA transaction is certified with certify_commit().
// High priority mode:
//   - Moves to s_preparing, except that a replaying XA transaction is
//     left in s_replaying. Fragment removal for this mode is done from
//     the applying context, not here.
//
// Returns 0 on success and 1 on failure. On failure the client error
// is overridden, or the transaction is left in a state such as
// s_must_replay.
int wsrep::transaction::before_prepare(
    wsrep::unique_lock<wsrep::mutex>& lock)
{
    assert(lock.owns_lock());
    int ret(0);
    debug_log_state("before_prepare_enter");
    assert(state() == s_executing || state() == s_must_abort ||
           state() == s_replaying);

    // BF aborted before prepare started: report a deadlock.
    if (state() == s_must_abort)
    {
        assert(client_state_.mode() == wsrep::client_state::m_local);
        client_state_.override_error(wsrep::e_deadlock_error);
        return 1;
    }

    switch (client_state_.mode())
    {
    case wsrep::client_state::m_local:
        if (is_streaming())
        {
            client_service_.debug_crash(
                "crash_last_fragment_commit_before_fragment_removal");
            // The client state lock is released while calling into the
            // client service.
            lock.unlock();
            if (client_service_.statement_allowed_for_streaming() == false)
            {
                client_state_.override_error(
                    wsrep::e_error_during_commit,
                    wsrep::provider::error_not_allowed);
                ret = 1;
            }
            else if (!is_xa())
            {
                // Note: we can't remove fragments here for XA,
                // the transaction has already issued XA END and
                // is in IDLE state, no more changes allowed!
                ret = client_service_.remove_fragments();
                if (ret)
                {
                    client_state_.override_error(wsrep::e_deadlock_error);
                }
            }
            lock.lock();
            client_service_.debug_crash(
                "crash_last_fragment_commit_after_fragment_removal");
            // The transaction may have been BF aborted while the lock
            // was released.
            if (state() == s_must_abort)
            {
                client_state_.override_error(wsrep::e_deadlock_error);
                ret = 1;
            }
        }

        if (ret == 0)
        {
            if (is_xa())
            {
                // Force fragment replication on XA prepare
                flags(flags() | wsrep::provider::flag::prepare);
                pa_unsafe(true);
                append_sr_keys_for_commit();
                const bool force_streaming_step = true;
                ret = streaming_step(lock, force_streaming_step);
                if (ret == 0)
                {
                    assert(state() == s_executing);
                    state(lock, s_preparing);
                }
            }
            else
            {
                ret = certify_commit(lock);
            }

            assert((ret == 0 && state() == s_preparing) ||
                   (state() == s_must_abort ||
                    state() == s_must_replay ||
                    state() == s_cert_failed));

            if (ret)
            {
                assert(state() == s_must_replay ||
                       client_state_.current_error());
                // Normalize any non-zero result to 1.
                ret = 1;
            }
        }

        break;
    case wsrep::client_state::m_high_priority:
        // Note: fragment removal is done from applying
        // context for high priority mode.
        if (is_xa())
        {
            assert(state() == s_executing ||
                   state() == s_replaying);
            if (state() == s_replaying)
            {
                break;
            }
        }
        state(lock, s_preparing);
        break;
    default:
        assert(0);
        break;
    }

    // NOTE(review): if ret was set to 1 by the streaming checks above
    // (statement not allowed, or fragment removal failed) while the
    // state stayed s_executing, this assertion does not hold; confirm
    // whether that path is reachable.
    assert(state() == s_preparing ||
           (is_xa() && state() == s_replaying) ||
           (ret && (state() == s_must_abort ||
                    state() == s_must_replay ||
                    state() == s_cert_failed ||
                    state() == s_aborted)));
    debug_log_state("before_prepare_leave");
    return ret;
}
387 
after_prepare(wsrep::unique_lock<wsrep::mutex> & lock)388 int wsrep::transaction::after_prepare(
389     wsrep::unique_lock<wsrep::mutex>& lock)
390 {
391     assert(lock.owns_lock());
392 
393     int ret = 0;
394     debug_log_state("after_prepare_enter");
395     if (is_xa())
396     {
397         switch (state())
398         {
399         case s_preparing:
400             assert(client_state_.mode() == wsrep::client_state::m_local ||
401                    (certified() && ordered()));
402             state(lock, s_prepared);
403             break;
404         case s_must_abort:
405             // prepared locally, but client has not received
406             // a result yet. We can still abort.
407             assert(client_state_.mode() == wsrep::client_state::m_local);
408             client_state_.override_error(wsrep::e_deadlock_error);
409             ret = 1;
410             break;
411         case s_replaying:
412             assert(client_state_.mode() == wsrep::client_state::m_high_priority);
413             break;
414         default:
415             assert(0);
416         }
417     }
418     else
419     {
420         assert(certified() && ordered());
421         assert(state() == s_preparing || state() == s_must_abort);
422 
423         if (state() == s_must_abort)
424         {
425             assert(client_state_.mode() == wsrep::client_state::m_local);
426             state(lock, s_must_replay);
427             ret = 1;
428         }
429         else
430         {
431             state(lock, s_committing);
432         }
433     }
434     debug_log_state("after_prepare_leave");
435     return ret;
436 }
437 
// Commit hook called before the DBMS commits. Takes the client state
// mutex itself.
//
// Local mode:
//   - An executing transaction goes through before_prepare() and
//     after_prepare() (one phase commit).
//   - A BF aborted transaction (s_must_abort) is marked for replay if it
//     is certified (or is a streaming XA transaction); otherwise a
//     deadlock error is reported.
//   - A prepared transaction (s_prepared) is certified here.
//   - On success, the provider commit order critical section is entered
//     with the lock released. A BF abort reported by the provider while
//     entering moves the transaction to s_must_replay.
// High priority mode:
//   - Completes the prepare phase if needed, then enters commit order.
//     On any failure the transaction moves to s_aborting.
//
// Returns 0 when the commit may proceed, non-zero otherwise.
int wsrep::transaction::before_commit()
{
    // Assume failure until one of the paths below succeeds.
    int ret(1);

    wsrep::unique_lock<wsrep::mutex> lock(client_state_.mutex());
    debug_log_state("before_commit_enter");
    assert(client_state_.mode() != wsrep::client_state::m_toi);
    assert(state() == s_executing ||
           state() == s_prepared ||
           state() == s_committing ||
           state() == s_must_abort ||
           state() == s_replaying);
    assert((state() != s_committing && state() != s_replaying) ||
           certified());

    switch (client_state_.mode())
    {
    case wsrep::client_state::m_local:
        if (state() == s_executing)
        {
            ret = before_prepare(lock) || after_prepare(lock);
            assert((ret == 0 &&
                    (state() == s_committing || state() == s_prepared))
                   ||
                   (state() == s_must_abort ||
                    state() == s_must_replay ||
                    state() == s_cert_failed ||
                    state() == s_aborted));
        }
        else if (state() != s_committing && state() != s_prepared)
        {
            // BF aborted before commit: replay if already certified (or
            // streaming XA), otherwise fail with a deadlock error.
            // ret stays 1 on both paths.
            assert(state() == s_must_abort);
            if (certified() ||
                (is_xa() && is_streaming()))
            {
                state(lock, s_must_replay);
            }
            else
            {
                client_state_.override_error(wsrep::e_deadlock_error);
            }
        }
        else
        {
            // 2PC commit, prepare was done before
            ret = 0;
        }

        if (ret == 0 && state() == s_prepared)
        {
            ret = certify_commit(lock);
            assert((ret == 0 && state() == s_committing) ||
                   (state() == s_must_abort ||
                    state() == s_must_replay ||
                    state() == s_cert_failed ||
                    state() == s_prepared));
        }

        if (ret == 0)
        {
            assert(certified());
            assert(ordered());
            // Do not hold the client state lock while waiting for
            // commit order.
            lock.unlock();
            client_service_.debug_sync("wsrep_before_commit_order_enter");
            enum wsrep::provider::status
                status(provider().commit_order_enter(ws_handle_, ws_meta_));
            lock.lock();
            switch (status)
            {
            case wsrep::provider::success:
                break;
            case wsrep::provider::error_bf_abort:
                if (state() != s_must_abort)
                {
                    state(lock, s_must_abort);
                }
                state(lock, s_must_replay);
                ret = 1;
                break;
            default:
                ret = 1;
                assert(0);
                break;
            }
        }
        break;
    case wsrep::client_state::m_high_priority:
        assert(certified());
        assert(ordered());
        if (is_xa())
        {
            assert(state() == s_prepared ||
                   state() == s_replaying);
            state(lock, s_committing);
            ret = 0;
        }
        else if (state() == s_executing || state() == s_replaying)
        {
            ret = before_prepare(lock) || after_prepare(lock);
        }
        else
        {
            ret = 0;
        }
        lock.unlock();
        // commit_order_enter() is only attempted if the steps above
        // succeeded (short-circuit ||).
        ret = ret || provider().commit_order_enter(ws_handle_, ws_meta_);
        lock.lock();
        if (ret)
        {
            state(lock, s_must_abort);
            state(lock, s_aborting);
        }
        break;
    default:
        assert(0);
        break;
    }
    debug_log_state("before_commit_leave");
    return ret;
}
558 
ordered_commit()559 int wsrep::transaction::ordered_commit()
560 {
561     wsrep::unique_lock<wsrep::mutex> lock(client_state_.mutex());
562     debug_log_state("ordered_commit_enter");
563     assert(state() == s_committing);
564     assert(ordered());
565     client_service_.debug_sync("wsrep_before_commit_order_leave");
566     int ret(provider().commit_order_leave(ws_handle_, ws_meta_,
567                                           apply_error_buf_));
568     client_service_.debug_sync("wsrep_after_commit_order_leave");
569     // Should always succeed:
570     // 1) If before commit before succeeds, the transaction handle
571     //    in the provider is guaranteed to exist and the commit
572     //    has been ordered
573     // 2) The transaction which has been ordered for commit cannot be BF
574     //    aborted anymore
575     // 3) The provider should always guarantee that the transactions which
576     //    have been ordered for commit can finish committing.
577     //
578     // The exception here is a storage service transaction which is running
579     // in high priority mode. The fragment storage commit may get BF
580     // aborted in the provider after commit ordering has been
581     // established since the transaction is operating in streaming
582     // mode.
583     if (ret)
584     {
585         assert(client_state_.mode() == wsrep::client_state::m_high_priority);
586         state(lock, s_must_abort);
587         state(lock, s_aborting);
588     }
589     else
590     {
591         state(lock, s_ordered_commit);
592     }
593     debug_log_state("ordered_commit_leave");
594     return ret;
595 }
596 
// Commit hook called after the DBMS has committed. Takes the client
// state mutex itself. Requires s_ordered_commit and leaves the
// transaction in s_committed.
//
// For streaming transactions:
//   - XA: the stored fragments are removed here in a separate storage
//     service transaction, with the client state lock released.
//   - Local mode: the streaming client is unregistered from the server
//     state (lock released around the call).
//   - The streaming context is cleaned up.
// In local mode the write set handle is then released in the provider.
//
// Returns 0, or the provider release result in local mode (asserted to
// be 0).
int wsrep::transaction::after_commit()
{
    int ret(0);

    wsrep::unique_lock<wsrep::mutex> lock(client_state_.mutex());
    debug_log_state("after_commit_enter");
    assert(state() == s_ordered_commit);

    if (is_streaming())
    {
        assert(client_state_.mode() == wsrep::client_state::m_local ||
               client_state_.mode() == wsrep::client_state::m_high_priority);

        if (is_xa())
        {
            // XA fragment removal happens here,
            // see comment in before_prepare
            // NOTE(review): the results of remove_fragments() and
            // commit() are not checked here.
            lock.unlock();
            scoped_storage_service<storage_service_deleter>
                sr_scope(
                    client_service_,
                    server_service_.storage_service(client_service_),
                    storage_service_deleter(server_service_));
            wsrep::storage_service& storage_service(
                sr_scope.storage_service());
            storage_service.adopt_transaction(*this);
            storage_service.remove_fragments();
            storage_service.commit(wsrep::ws_handle(), wsrep::ws_meta());
            lock.lock();
        }

        if (client_state_.mode() == wsrep::client_state::m_local)
        {
            lock.unlock();
            client_state_.server_state_.stop_streaming_client(&client_state_);
            lock.lock();
        }
        streaming_context_.cleanup();
    }

    switch (client_state_.mode())
    {
    case wsrep::client_state::m_local:
        ret = provider().release(ws_handle_);
        break;
    case wsrep::client_state::m_high_priority:
        break;
    default:
        assert(0);
        break;
    }
    assert(ret == 0);
    state(lock, s_committed);

    debug_log_state("after_commit_leave");
    return ret;
}
654 
// Rollback hook called before the DBMS rolls back the transaction.
// Takes the client state mutex itself.
//
// Local mode: moves the transaction to s_aborting, rolling back stored
// streaming fragments with streaming_rollback() where applicable. A BF
// aborted transaction (s_must_abort) that is already certified is
// marked for replay (s_must_replay) instead. s_must_replay is left as
// is.
// High priority mode: rollback by a rollback write set or BF abort;
// the transaction is moved to s_aborting if it is not there already.
//
// Always returns 0.
int wsrep::transaction::before_rollback()
{
    wsrep::unique_lock<wsrep::mutex> lock(client_state_.mutex());
    debug_log_state("before_rollback_enter");
    assert(state() == s_executing ||
           state() == s_preparing ||
           state() == s_prepared ||
           state() == s_must_abort ||
           // Background rollbacker or rollback initiated from SE
           state() == s_aborting ||
           state() == s_cert_failed ||
           state() == s_must_replay);

    switch (client_state_.mode())
    {
    case wsrep::client_state::m_local:
        if (is_streaming())
        {
            client_service_.debug_sync("wsrep_before_SR_rollback");
        }
        switch (state())
        {
        case s_preparing:
            // Error detected during prepare phase
            state(lock, s_must_abort);
            WSREP_FALLTHROUGH;
        case s_prepared:
            WSREP_FALLTHROUGH;
        case s_executing:
            // Voluntary rollback
            if (is_streaming())
            {
                streaming_rollback(lock);
            }
            state(lock, s_aborting);
            break;
        case s_must_abort:
            // BF aborted: a certified transaction must be replayed
            // rather than rolled back.
            if (certified())
            {
                state(lock, s_must_replay);
            }
            else
            {
                if (is_streaming())
                {
                    streaming_rollback(lock);
                }
                state(lock, s_aborting);
            }
            break;
        case s_cert_failed:
            if (is_streaming())
            {
                streaming_rollback(lock);
            }
            state(lock, s_aborting);
            break;
        case s_aborting:
            if (is_streaming())
            {
                streaming_rollback(lock);
            }
            break;
        case s_must_replay:
            break;
        default:
            assert(0);
            break;
        }
        break;
    case wsrep::client_state::m_high_priority:
        // Rollback by rollback write set or BF abort
        assert(state_ == s_executing || state_ == s_prepared || state_ == s_aborting);
        if (state_ != s_aborting)
        {
            state(lock, s_aborting);
        }
        break;
    default:
        assert(0);
        break;
    }

    debug_log_state("before_rollback_leave");
    return 0;
}
741 
// Rollback hook called after the DBMS has rolled back the transaction.
// Takes the client state mutex itself.
//
// If a streaming transaction has bf_aborted_in_total_order_ set, its
// stored fragments are removed in a separate storage service
// transaction, with the client state lock released. The streaming
// context is cleaned up unless the transaction must replay, and a
// transaction in s_aborting moves to s_aborted.
//
// Always returns 0.
int wsrep::transaction::after_rollback()
{
    wsrep::unique_lock<wsrep::mutex> lock(client_state_.mutex());
    debug_log_state("after_rollback_enter");
    assert(state() == s_aborting ||
           state() == s_must_replay);

    if (is_streaming() && bf_aborted_in_total_order_)
    {
        lock.unlock();
        // Storage service scope
        {
            scoped_storage_service<storage_service_deleter>
                sr_scope(
                    client_service_,
                    server_service_.storage_service(client_service_),
                    storage_service_deleter(server_service_));
            wsrep::storage_service& storage_service(
                sr_scope.storage_service());
            storage_service.adopt_transaction(*this);
            storage_service.remove_fragments();
            storage_service.commit(wsrep::ws_handle(), wsrep::ws_meta());
        }
        lock.lock();
        streaming_context_.cleanup();
    }

    // A transaction which must replay keeps its streaming context.
    if (is_streaming() && state() != s_must_replay)
    {
        streaming_context_.cleanup();
    }

    if (state() == s_aborting)
    {
        state(lock, s_aborted);
    }

    // Releasing the transaction from provider is postponed into
    // after_statement() hook. Depending on DBMS system all the
    // resources acquired by transaction may or may not be released
    // during actual rollback. If the transaction has been ordered,
    // releasing the commit ordering critical section should be
    // also postponed until all resources have been released.
    debug_log_state("after_rollback_leave");
    return 0;
}
788 
release_commit_order(wsrep::unique_lock<wsrep::mutex> & lock)789 int wsrep::transaction::release_commit_order(
790     wsrep::unique_lock<wsrep::mutex>& lock)
791 {
792     lock.unlock();
793     int ret(provider().commit_order_enter(ws_handle_, ws_meta_));
794     lock.lock();
795     if (!ret)
796     {
797         server_service_.set_position(client_service_, ws_meta_.gtid());
798         ret = provider().commit_order_leave(ws_handle_, ws_meta_,
799                                             apply_error_buf_);
800     }
801     return ret;
802 }
803 
// Statement hook called after each statement in local mode. Takes the
// client state mutex itself.
//
// - Runs a streaming step if streaming with the statement fragment unit.
// - s_must_abort / s_cert_failed: ensures an error is reported, then
//   rolls back via client_service_.bf_rollback() (lock released). If
//   the rollback leaves the transaction in s_must_replay, it is
//   replayed.
// - s_must_replay: replays the transaction (XA commit replay for an XA
//   transaction which has not been ordered).
// - s_aborted: passes through the commit order critical section if the
//   transaction was ordered, and releases the write set handle.
// - Cleans up the transaction once it is no longer executing, unless it
//   is an explicit XA transaction and the client is not quitting.
//
// Returns 0 on success; a non-zero result is only expected when the
// transaction ended up in s_aborted.
int wsrep::transaction::after_statement()
{
    int ret(0);
    wsrep::unique_lock<wsrep::mutex> lock(client_state_.mutex());
    debug_log_state("after_statement_enter");
    assert(client_state_.mode() == wsrep::client_state::m_local);
    assert(state() == s_executing ||
           state() == s_prepared ||
           state() == s_committed ||
           state() == s_aborted ||
           state() == s_must_abort ||
           state() == s_cert_failed ||
           state() == s_must_replay);

    if (state() == s_executing &&
        streaming_context_.fragment_size() &&
        streaming_context_.fragment_unit() == streaming_context::statement)
    {
        ret = streaming_step(lock);
    }

    switch (state())
    {
    case s_executing:
        // ?
        break;
    case s_prepared:
        assert(is_xa());
        break;
    case s_committed:
        assert(is_streaming() == false);
        break;
    case s_must_abort:
    case s_cert_failed:
        // Error may be set already. For example, if fragment size
        // exceeded the maximum size in certify_fragment(), then
        // we already have wsrep::e_error_during_commit
        if (client_state_.current_error() == wsrep::e_success)
        {
            client_state_.override_error(wsrep::e_deadlock_error);
        }
        lock.unlock();
        ret = client_service_.bf_rollback();
        lock.lock();
        if (state() != s_must_replay)
        {
            break;
        }
        // Continue to replay if rollback() changed the state to s_must_replay
        WSREP_FALLTHROUGH;
    case s_must_replay:
    {
        if (is_xa() && !ordered())
        {
            ret = xa_replay_commit(lock);
        }
        else
        {
            ret = replay(lock);
        }
        break;
    }
    case s_aborted:
        // Raise a deadlock error if the transaction was BF aborted and
        // rolled back by client outside of transaction hooks.
        if (bf_aborted() && client_state_.current_error() == wsrep::e_success &&
            !client_service_.is_xa_rollback())
        {
            client_state_.override_error(wsrep::e_deadlock_error);
        }
        break;
    default:
        assert(0);
        break;
    }

    assert(state() == s_executing ||
           state() == s_prepared ||
           state() == s_committed ||
           state() == s_aborted   ||
           state() == s_must_replay);

    // Deferred from after_rollback(): release commit order (if ordered)
    // and the provider write set handle.
    if (state() == s_aborted)
    {
        if (ordered())
        {
            ret = release_commit_order(lock);
        }
        lock.unlock();
        provider().release(ws_handle_);
        lock.lock();
    }

    if (state() != s_executing &&
        (!client_service_.is_explicit_xa() ||
         client_state_.state() == wsrep::client_state::s_quitting))
    {
        cleanup();
    }
    fragments_certified_for_statement_ = 0;
    debug_log_state("after_statement_leave");
    assert(ret == 0 || state() == s_aborted);
    return ret;
}
908 
// Handle a transaction found in s_must_abort after a client command
// (presumably BF aborted while the client was between statements;
// confirm with callers). Must be called with `lock` held; the lock is
// released around client_service_.bf_rollback().
//
// A streaming XA transaction is marked for replay before the rollback
// and replayed with xa_replay() afterwards. Any other transaction gets
// a deadlock error.
void wsrep::transaction::after_command_must_abort(
    wsrep::unique_lock<wsrep::mutex>& lock)
{
    debug_log_state("after_command_must_abort enter");
    assert(active());
    assert(state_ == s_must_abort);

    if (is_xa() && is_streaming())
    {
        state(lock, s_must_replay);
    }

    lock.unlock();
    client_service_.bf_rollback();
    lock.lock();

    // Condition is re-evaluated after the rollback on purpose: the
    // lock was released in between.
    if (is_xa() && is_streaming())
    {
        xa_replay(lock);
    }
    else
    {
        client_state_.override_error(wsrep::e_deadlock_error);
    }

    debug_log_state("after_command_must_abort leave");
}
936 
after_applying()937 void wsrep::transaction::after_applying()
938 {
939     wsrep::unique_lock<wsrep::mutex> lock(client_state_.mutex_);
940     debug_log_state("after_applying enter");
941     assert(state_ == s_executing ||
942            state_ == s_prepared ||
943            state_ == s_committed ||
944            state_ == s_aborted ||
945            state_ == s_replaying);
946 
947     if (state_ != s_executing && state_ != s_prepared)
948     {
949         if (state_ == s_replaying) state(lock, s_aborted);
950         cleanup();
951     }
952     else
953     {
954         // State remains executing or prepared, so this is a streaming applier.
955         // Reset the meta data to avoid releasing commit order
956         // critical section above if the next fragment is rollback
957         // fragment. Rollback fragment ordering will be handled by
958         // another instance while removing the fragments from
959         // storage.
960         ws_meta_ = wsrep::ws_meta();
961     }
962     debug_log_state("after_applying leave");
963 }
964 
/**
 * Attempt to BF (brute force) abort this transaction on behalf of a
 * conflicting high priority operation with seqno bf_seqno.
 *
 * The abort is attempted only if the transaction is active and in one
 * of the states s_executing, s_preparing, s_prepared, s_certifying or
 * s_committing. The provider is asked to abort the transaction first.
 * Only if the provider reports success is the transaction moved to
 * s_must_abort.
 *
 * On success the rollback is handed to the background rollbacker
 * (server_service_.background_rollback()) in two cases:
 * - the client is idle and the server rollback mode is rm_sync;
 * - the client is a high priority (applier) client of a streaming
 *   transaction.
 *
 * @param lock Lock on the client state mutex. Must be held on entry.
 *             NOTE: on the background rollback path the lock is released
 *             before returning and is NOT re-acquired.
 * @param bf_seqno Seqno of the aborting operation.
 * @return true if the transaction was BF aborted, false otherwise.
 */
bool wsrep::transaction::bf_abort(
    wsrep::unique_lock<wsrep::mutex>& lock,
    wsrep::seqno bf_seqno)
{
    bool ret(false);
    const enum wsrep::transaction::state state_at_enter(state());
    assert(lock.owns_lock());

    if (active() == false)
    {
         WSREP_LOG_DEBUG(client_state_.debug_log_level(),
                         wsrep::log::debug_level_transaction,
                         "Transaction not active, skipping bf abort");
    }
    else
    {
        switch (state_at_enter)
        {
        // States in which a BF abort is attempted.
        case s_executing:
        case s_preparing:
        case s_prepared:
        case s_certifying:
        case s_committing:
        {
            wsrep::seqno victim_seqno;
            // The provider decides whether the abort succeeds.
            enum wsrep::provider::status
                status(client_state_.provider().bf_abort(
                           bf_seqno, id_, victim_seqno));
            switch (status)
            {
            case wsrep::provider::success:
                WSREP_LOG_DEBUG(client_state_.debug_log_level(),
                                wsrep::log::debug_level_transaction,
                                "Seqno " << bf_seqno
                                << " successfully BF aborted " << id_
                                << " victim_seqno " << victim_seqno);
                // Remember the state in which the abort hit.
                bf_abort_state_ = state_at_enter;
                state(lock, s_must_abort);
                ret = true;
                break;
            default:
                WSREP_LOG_DEBUG(client_state_.debug_log_level(),
                                wsrep::log::debug_level_transaction,
                                "Seqno " << bf_seqno
                                << " failed to BF abort " << id_
                                << " with status " << status
                                << " victim_seqno " << victim_seqno);
                break;
            }
            break;
        }
        default:
                WSREP_LOG_DEBUG(client_state_.debug_log_level(),
                                wsrep::log::debug_level_transaction,
                                "BF abort not allowed in state "
                                << wsrep::to_string(state_at_enter));
            break;
        }
    }

    if (ret)
    {
        // Remember the client state at the time of the abort.
        bf_abort_client_state_ = client_state_.state();
        // If the transaction is in executing state, we must initiate
        // streaming rollback to ensure that the rollback fragment gets
        // replicated before the victim starts to roll back and release locks.
        // In other states the BF abort will be detected outside of
        // storage engine operations and streaming rollback will be
        // handled from before_rollback() call.
        if (client_state_.mode() == wsrep::client_state::m_local &&
            is_streaming() && state_at_enter == s_executing)
        {
            streaming_rollback(lock);
        }

        if ((client_state_.state() == wsrep::client_state::s_idle &&
             client_state_.server_state().rollback_mode() ==
             wsrep::server_state::rm_sync) // locally processing idle
            ||
            // high priority streaming
            (client_state_.mode() == wsrep::client_state::m_high_priority &&
             is_streaming()))
        {
            // We need to change the state to aborting under the
            // lock protection to avoid a race between client thread,
            // otherwise it could happen that the client gains control
            // between releasing the lock and before background
            // rollbacker gets control.
            if (is_xa() && state_at_enter == s_prepared)
            {
                // A prepared XA transaction is replayed, not aborted.
                state(lock, s_must_replay);
                client_state_.set_rollbacker_active(true);
            }
            else
            {
                state(lock, s_aborting);
                client_state_.set_rollbacker_active(true);
                if (client_state_.mode() == wsrep::client_state::m_high_priority)
                {
                    // Streaming applier: detach it from the server state
                    // without holding the client mutex.
                    lock.unlock();
                    client_state_.server_state().stop_streaming_applier(
                        server_id_, id_);
                    lock.lock();
                }
            }

            // The lock is intentionally left released on return from
            // this path.
            lock.unlock();
            server_service_.background_rollback(client_state_);
        }
    }
    return ret;
}
1077 
total_order_bf_abort(wsrep::unique_lock<wsrep::mutex> & lock WSREP_UNUSED,wsrep::seqno bf_seqno)1078 bool wsrep::transaction::total_order_bf_abort(
1079     wsrep::unique_lock<wsrep::mutex>& lock WSREP_UNUSED,
1080     wsrep::seqno bf_seqno)
1081 {
1082     bool ret(bf_abort(lock, bf_seqno));
1083     if (ret)
1084     {
1085         bf_aborted_in_total_order_ = true;
1086     }
1087     return ret;
1088 }
1089 
clone_for_replay(const wsrep::transaction & other)1090 void wsrep::transaction::clone_for_replay(const wsrep::transaction& other)
1091 {
1092     assert(other.state() == s_replaying);
1093     id_ = other.id_;
1094     xid_ = other.xid_;
1095     server_id_ = other.server_id_;
1096     ws_handle_ = other.ws_handle_;
1097     ws_meta_ = other.ws_meta_;
1098     streaming_context_ = other.streaming_context_;
1099     state_ = s_replaying;
1100 }
1101 
assign_xid(const wsrep::xid & xid)1102 void wsrep::transaction::assign_xid(const wsrep::xid& xid)
1103 {
1104     assert(active());
1105     assert(!is_xa());
1106     xid_ = xid;
1107 }
1108 
restore_to_prepared_state(const wsrep::xid & xid)1109 int wsrep::transaction::restore_to_prepared_state(const wsrep::xid& xid)
1110 {
1111     wsrep::unique_lock<wsrep::mutex> lock(client_state_.mutex_);
1112     assert(active());
1113     assert(is_empty());
1114     assert(state() == s_executing || state() == s_replaying);
1115     flags(flags() & ~wsrep::provider::flag::start_transaction);
1116     if (state() == s_executing)
1117     {
1118         state(lock, s_certifying);
1119     }
1120     else
1121     {
1122         state(lock, s_preparing);
1123     }
1124     state(lock, s_prepared);
1125     xid_ = xid;
1126     return 0;
1127 }
1128 
/**
 * Commit or roll back, by XID, a prepared XA transaction for which a
 * streaming applier is registered on this server (looked up with
 * server_state.find_streaming_applier()).
 *
 * A commit or rollback write set (flag commit or rollback, pa_unsafe) is
 * certified through the provider. The write set uses the server id and
 * transaction id of the streaming applier's transaction. If
 * certification succeeds, this transaction object is walked to
 * s_committed (commit) or s_aborted (rollback).
 *
 * @param xid XID of the prepared transaction.
 * @param commit true to commit, false to roll back.
 * @return 0 on success. 1 if no streaming applier was found or
 *         certification failed; in both cases the client error is set to
 *         e_error_during_commit.
 */
int wsrep::transaction::commit_or_rollback_by_xid(const wsrep::xid& xid,
                                                  bool commit)
{
    debug_log_state("commit_or_rollback_by_xid enter");
    wsrep::unique_lock<wsrep::mutex> lock(client_state_.mutex_);
    wsrep::server_state& server_state(client_state_.server_state());
    wsrep::high_priority_service* sa(server_state.find_streaming_applier(xid));

    if (!sa)
    {
        // Not expected in debug builds; reported as an error otherwise.
        assert(sa);
        client_state_.override_error(wsrep::e_error_during_commit);
        return 1;
    }

    if (commit)
    {
        flags(wsrep::provider::flag::commit);
    }
    else
    {
        flags(wsrep::provider::flag::rollback);
    }
    pa_unsafe(true);
    // Identify the write set as belonging to the streaming applier's
    // transaction.
    wsrep::stid stid(sa->transaction().server_id(),
                     sa->transaction().id(),
                     client_state_.id());
    wsrep::ws_meta meta(stid);

    const enum wsrep::provider::status cert_ret(
        provider().certify(client_state_.id(),
                           ws_handle_,
                           flags(),
                           meta));

    int ret;
    if (cert_ret == wsrep::provider::success)
    {
        // Walk an allowed path in the state table to the final state.
        if (commit)
        {
            state(lock, s_certifying);
            state(lock, s_committing);
            state(lock, s_committed);
        }
        else
        {
            state(lock, s_aborting);
            state(lock, s_aborted);
        }
        ret = 0;
    }
    else
    {
        client_state_.override_error(wsrep::e_error_during_commit,
                                     cert_ret);
        wsrep::log_error() << "Failed to commit_or_rollback_by_xid,"
                           << " xid: " << xid
                           << " error: " << cert_ret;
        ret = 1;
    }
    debug_log_state("commit_or_rollback_by_xid leave");
    return ret;
}
1192 
/**
 * Detach a prepared XA transaction (state s_prepared) from this client.
 *
 * The streaming client is converted into a streaming applier in the
 * server state, and the client service transaction is cleaned up. This
 * transaction object then has its streaming context cleaned, is moved to
 * s_aborted, has its provider write set handle released, and is cleaned
 * up.
 */
void wsrep::transaction::xa_detach()
{
    debug_log_state("xa_detach enter");
    assert(state() == s_prepared);
    wsrep::server_state& server_state(client_state_.server_state());
    // These calls are made before the client state mutex is taken.
    server_state.convert_streaming_client_to_applier(&client_state_);
    client_service_.store_globals();
    client_service_.cleanup_transaction();
    wsrep::unique_lock<wsrep::mutex> lock(client_state_.mutex_);
    streaming_context_.cleanup();
    state(lock, s_aborting);
    state(lock, s_aborted);
    provider().release(ws_handle_);
    cleanup();
    debug_log_state("xa_detach leave");
}
1209 
/**
 * Common first part of replaying a BF aborted streaming XA transaction.
 *
 * Moves the state from s_must_replay to s_replaying. Then, with the
 * mutex released, converts the streaming client into a streaming applier
 * and replays via client_service_.replay_unordered(). A non-success
 * replay status triggers client_service_.emergency_shutdown().
 *
 * @param lock Lock on the client state mutex. Must be held on entry;
 *             it is held again on return.
 */
void wsrep::transaction::xa_replay_common(wsrep::unique_lock<wsrep::mutex>& lock)
{
    assert(lock.owns_lock());
    assert(is_xa());
    assert(is_streaming());
    assert(state() == s_must_replay);
    assert(bf_aborted());

    state(lock, s_replaying);

    enum wsrep::provider::status status;
    wsrep::server_state& server_state(client_state_.server_state());

    lock.unlock();
    server_state.convert_streaming_client_to_applier(&client_state_);
    status = client_service_.replay_unordered();
    client_service_.store_globals();
    lock.lock();

    // Failure to replay is treated as fatal.
    if (status != wsrep::provider::success)
    {
        client_service_.emergency_shutdown();
    }
}
1234 
/**
 * Replay a BF aborted streaming XA transaction, then finish this
 * transaction object as aborted.
 *
 * After xa_replay_common(), the state is moved to s_aborted, the
 * streaming context is cleaned, the provider write set handle is
 * released, cleanup() is called and the client service is signalled
 * that the replay is done.
 *
 * @param lock Lock on the client state mutex. Must be held on entry;
 *             it is held on return.
 * @return Always 0.
 */
int wsrep::transaction::xa_replay(wsrep::unique_lock<wsrep::mutex>& lock)
{
    debug_log_state("xa_replay enter");
    xa_replay_common(lock);
    state(lock, s_aborted);
    streaming_context_.cleanup();
    provider().release(ws_handle_);
    cleanup();
    client_service_.signal_replayed();
    debug_log_state("xa_replay leave");
    return 0;
}
1247 
/**
 * Replay a BF aborted streaming XA transaction, then commit it by XID.
 *
 * After xa_replay_common(), client_service_.commit_by_xid() is called
 * with the mutex released.
 * - On success: the transaction is moved to s_committed, the streaming
 *   context is cleaned, the provider handle is released and cleanup()
 *   is called.
 * - On failure: the transaction is moved back to s_prepared (via
 *   s_preparing) and e_error_during_commit is set, so that the client
 *   can retry.
 * In both cases the client service is signalled that the replay is done.
 *
 * @param lock Lock on the client state mutex. Must be held on entry;
 *             it is held on return.
 * @return 0 on successful commit, 1 otherwise.
 */
int wsrep::transaction::xa_replay_commit(wsrep::unique_lock<wsrep::mutex>& lock)
{
    debug_log_state("xa_replay_commit enter");
    xa_replay_common(lock);
    lock.unlock();
    enum wsrep::provider::status status(client_service_.commit_by_xid());
    lock.lock();
    int ret(1);
    switch (status)
    {
    case wsrep::provider::success:
        state(lock, s_committed);
        streaming_context_.cleanup();
        provider().release(ws_handle_);
        cleanup();
        ret = 0;
        break;
    default:
        log_warning() << "Failed to commit by xid during replay";
        // Commit by xid failed, return a commit
        // error and let the client retry
        state(lock, s_preparing);
        state(lock, s_prepared);
        client_state_.override_error(wsrep::e_error_during_commit, status);
    }

    client_service_.signal_replayed();
    debug_log_state("xa_replay_commit leave");
    return ret;
}
1278 
1279 ////////////////////////////////////////////////////////////////////////////////
1280 //                                 Private                                    //
1281 ////////////////////////////////////////////////////////////////////////////////
1282 
provider()1283 inline wsrep::provider& wsrep::transaction::provider()
1284 {
1285     return client_state_.server_state().provider();
1286 }
1287 
state(wsrep::unique_lock<wsrep::mutex> & lock,enum wsrep::transaction::state next_state)1288 void wsrep::transaction::state(
1289     wsrep::unique_lock<wsrep::mutex>& lock __attribute__((unused)),
1290     enum wsrep::transaction::state next_state)
1291 {
1292     WSREP_LOG_DEBUG(client_state_.debug_log_level(),
1293                     wsrep::log::debug_level_transaction,
1294                     "client: " << client_state_.id().get()
1295                     << " txc: " << id().get()
1296                     << " state: " << to_string(state_)
1297                     << " -> " << to_string(next_state));
1298 
1299     assert(lock.owns_lock());
1300     // BF aborter is allowed to change the state without gaining control
1301     // to the state if the next state is s_must_abort, s_aborting or
1302     // s_must_replay (for xa idle replay).
1303     assert(client_state_.owning_thread_id_ == wsrep::this_thread::get_id() ||
1304            next_state == s_must_abort ||
1305            next_state == s_must_replay ||
1306            next_state == s_aborting);
1307 
1308     static const char allowed[n_states][n_states] =
1309         { /*  ex pg pd ce co oc ct cf ma ab ad mr re */
1310             { 0, 1, 0, 1, 0, 0, 0, 0, 1, 1, 0, 0, 0}, /* ex */
1311             { 0, 0, 1, 0, 1, 0, 0, 0, 1, 0, 0, 0, 0}, /* pg */
1312             { 0, 0, 0, 1, 1, 0, 0, 0, 1, 1, 0, 0, 0}, /* pd */
1313             { 1, 1, 1, 0, 1, 0, 0, 1, 1, 0, 0, 0, 0}, /* ce */
1314             { 0, 0, 0, 0, 0, 1, 1, 0, 1, 0, 0, 0, 0}, /* co */
1315             { 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0}, /* oc */
1316             { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, /* ct */
1317             { 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0}, /* cf */
1318             { 0, 0, 0, 0, 0, 0, 0, 1, 0, 1, 0, 1, 0}, /* ma */
1319             { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0}, /* ab */
1320             { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, /* ad */
1321             { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1}, /* mr */
1322             { 0, 1, 0, 0, 1, 0, 1, 0, 0, 0, 1, 0, 0}  /* re */
1323         };
1324 
1325     if (!allowed[state_][next_state])
1326     {
1327         wsrep::log_debug() << "unallowed state transition for transaction "
1328                            << id_ << ": " << wsrep::to_string(state_)
1329                            << " -> " << wsrep::to_string(next_state);
1330         assert(0);
1331     }
1332 
1333     state_hist_.push_back(state_);
1334     if (state_hist_.size() == 12)
1335     {
1336         state_hist_.erase(state_hist_.begin());
1337     }
1338     state_ = next_state;
1339 
1340     if (state_ == s_must_replay)
1341     {
1342         client_service_.will_replay();
1343     }
1344 }
1345 
/**
 * Check whether the transaction must stop processing because it has
 * been BF aborted or the client has been interrupted.
 *
 * - s_must_abort: sets e_deadlock_error and returns true.
 * - s_aborting / s_aborted: returns true. If the transaction was BF
 *   aborted (bf_abort_client_state_ set) but no error has been raised,
 *   it raises e_deadlock_error and asserts.
 * - Client interrupted (client_service_.interrupted()): sets
 *   e_interrupted_error, moves to s_must_abort and returns true.
 *
 * @param lock Lock on the client state mutex, must be held.
 * @return true if the transaction must not continue, false otherwise.
 */
bool wsrep::transaction::abort_or_interrupt(
    wsrep::unique_lock<wsrep::mutex>& lock)
{
    assert(lock.owns_lock());
    if (state() == s_must_abort)
    {
        client_state_.override_error(wsrep::e_deadlock_error);
        return true;
    }
    else if (state() == s_aborting || state() == s_aborted)
    {
        // Somehow we got there after BF abort without setting error
        // status. This may happen if the DBMS side aborts the transaction
        // but still tries to continue processing rows. Raise the error here
        // and assert so that the error will be caught in debug builds.
        // (bf_abort_client_state_ is tested for being non-zero here.)
        if (bf_abort_client_state_ &&
            client_state_.current_error() == wsrep::e_success)
        {
            client_state_.override_error(wsrep::e_deadlock_error);
            assert(0);
        }
        return true;
    }
    else if (client_service_.interrupted(lock))
    {
        client_state_.override_error(wsrep::e_interrupted_error);
        // The state is re-checked since interrupted() receives the lock.
        if (state() != s_must_abort)
        {
            state(lock, s_must_abort);
        }
        return true;
    }
    return false;
}
1380 
streaming_step(wsrep::unique_lock<wsrep::mutex> & lock,bool force)1381 int wsrep::transaction::streaming_step(wsrep::unique_lock<wsrep::mutex>& lock,
1382                                        bool force)
1383 {
1384     assert(lock.owns_lock());
1385     assert(streaming_context_.fragment_size() || is_xa());
1386 
1387     if (client_service_.bytes_generated() <
1388         streaming_context_.log_position())
1389     {
1390         /* Something went wrong on DBMS side in keeping track of
1391            generated bytes. Return an error to abort the transaction. */
1392         wsrep::log_warning() << "Bytes generated "
1393                              << client_service_.bytes_generated()
1394                              << " less than bytes certified "
1395                              << streaming_context_.log_position()
1396                              << ", aborting streaming transaction";
1397         return 1;
1398     }
1399     int ret(0);
1400 
1401     const size_t bytes_to_replicate(client_service_.bytes_generated() -
1402                                     streaming_context_.log_position());
1403 
1404     switch (streaming_context_.fragment_unit())
1405     {
1406     case streaming_context::row:
1407         WSREP_FALLTHROUGH;
1408     case streaming_context::statement:
1409         streaming_context_.increment_unit_counter(1);
1410         break;
1411     case streaming_context::bytes:
1412         streaming_context_.set_unit_counter(bytes_to_replicate);
1413         break;
1414     }
1415 
1416     // Some statements have no effect. Do not atttempt to
1417     // replicate a fragment if no data has been generated since
1418     // last fragment replication.
1419     // We use `force = true` on XA prepare: a fragment will be
1420     // generated even if no data is pending replication.
1421     if (bytes_to_replicate <= 0 && !force)
1422     {
1423         assert(bytes_to_replicate == 0);
1424         return ret;
1425     }
1426 
1427     if (streaming_context_.fragment_size_exceeded() || force)
1428     {
1429         streaming_context_.reset_unit_counter();
1430         ret = certify_fragment(lock);
1431     }
1432 
1433     return ret;
1434 }
1435 
/**
 * Replicate the data generated since the previous fragment as a
 * streaming replication fragment.
 *
 * Steps, in order:
 * 1. Wait for replayers and check for BF abort or interrupt.
 * 2. Move to s_certifying and send pending rollback events.
 * 3. Collect the fragment data from the client service.
 * 4. Append the data to the provider write set.
 * 5. Store the fragment through a storage service and certify it with
 *    the provider.
 * 6. Update the fragment meta data and commit the storage transaction.
 * 7. Release the provider write set handle to prepare for the next
 *    fragment.
 *
 * On failure the transaction is moved to s_must_abort. If is_streaming(),
 * a streaming rollback is started; otherwise the streaming client is
 * stopped. On most failure paths an error is also set on the client
 * state. On success the state returns to s_executing. If the client
 * service produces no data, the function returns 0 without replicating.
 *
 * @param lock Lock on the client state mutex. Must be held on entry and
 *             is held on return; it is released while replicating.
 * @return 0 on success, non-zero on failure.
 */
int wsrep::transaction::certify_fragment(
    wsrep::unique_lock<wsrep::mutex>& lock)
{
    assert(lock.owns_lock());

    assert(client_state_.mode() == wsrep::client_state::m_local);
    assert(streaming_context_.rolled_back() == false ||
           state() == s_must_abort);

    client_service_.wait_for_replayers(lock);
    if (abort_or_interrupt(lock))
    {
        return 1;
    }

    state(lock, s_certifying);
    lock.unlock();
    client_service_.debug_sync("wsrep_before_fragment_certification");

    enum wsrep::provider::status status(
        client_state_.server_state_.send_pending_rollback_events());
    if (status)
    {
        wsrep::log_warning()
            << "Failed to replicate pending rollback events: "
            << status << " ("
            << wsrep::provider::to_string(status) << ")";
        lock.lock();
        state(lock, s_must_abort);
        return 1;
    }

    // Obtain the fragment payload and the log position it corresponds to.
    wsrep::mutable_buffer data;
    size_t log_position(0);
    if (client_service_.prepare_fragment_for_replication(data, log_position))
    {
        lock.lock();
        state(lock, s_must_abort);
        client_state_.override_error(wsrep::e_error_during_commit);
        return 1;
    }
    streaming_context_.set_log_position(log_position);

    if (data.size() == 0)
    {
        wsrep::log_warning() << "Attempt to replicate empty data buffer";
        lock.lock();
        state(lock, s_executing);
        return 0;
    }

    if (provider().append_data(ws_handle_,
                               wsrep::const_buffer(data.data(), data.size())))
    {
        lock.lock();
        state(lock, s_must_abort);
        client_state_.override_error(wsrep::e_error_during_commit);
        return 1;
    }

    if (is_xa())
    {
        // One more check to see if the transaction has been aborted.
        // This is necessary because the transaction is only guaranteed
        // to exist in the provider after the append_data call above.
        lock.lock();
        if (abort_or_interrupt(lock))
        {
            return 1;
        }
        lock.unlock();
    }

    // First fragment: register this client as a streaming client.
    if (is_streaming() == false)
    {
        client_state_.server_state_.start_streaming_client(&client_state_);
    }

    if (implicit_deps())
    {
        flags(flags() | wsrep::provider::flag::implicit_deps);
    }

    int ret(0);
    enum wsrep::client_error error(wsrep::e_success);
    enum wsrep::provider::status cert_ret(wsrep::provider::success);
    // Storage service scope
    {
        scoped_storage_service<storage_service_deleter>
            sr_scope(
                client_service_,
                server_service_.storage_service(client_service_),
                storage_service_deleter(server_service_));
        wsrep::storage_service& storage_service(
            sr_scope.storage_service());

        // First the fragment is appended to the stable storage.
        // This is done to ensure that there is enough capacity
        // available to store the fragment. The fragment meta data
        // is updated after certification.
        wsrep::id server_id(client_state_.server_state().id());

        if (server_id.is_undefined()) {
            // Server disconnected from cluster, do not
            // append a fragment with undefined server_id.
            ret = 1;
            error = wsrep::e_append_fragment_error;
        }

        if (ret == 0 &&
            (storage_service.start_transaction(ws_handle_) ||
             storage_service.append_fragment(
                 server_id,
                 id(),
                 flags(),
                 wsrep::const_buffer(data.data(), data.size()),
                 xid())))
        {
            ret = 1;
            error = wsrep::e_append_fragment_error;
        }

        if (ret == 0)
        {
            client_service_.debug_crash(
                "crash_replicate_fragment_before_certify");

            wsrep::ws_meta sr_ws_meta;
            cert_ret = provider().certify(client_state_.id(),
                                          ws_handle_,
                                          flags(),
                                          sr_ws_meta);
            client_service_.debug_crash(
                "crash_replicate_fragment_after_certify");

            switch (cert_ret)
            {
            case wsrep::provider::success:
                ++fragments_certified_for_statement_;
                assert(sr_ws_meta.seqno().is_undefined() == false);
                streaming_context_.certified();
                if (storage_service.update_fragment_meta(sr_ws_meta))
                {
                    storage_service.rollback(wsrep::ws_handle(),
                                             wsrep::ws_meta());
                    ret = 1;
                    error = wsrep::e_deadlock_error;
                    break;
                }
                if (storage_service.commit(ws_handle_, sr_ws_meta))
                {
                    ret = 1;
                    error = wsrep::e_deadlock_error;
                }
                else
                {
                    streaming_context_.stored(sr_ws_meta.seqno());
                }
                client_service_.debug_crash(
                    "crash_replicate_fragment_success");
                break;
            case wsrep::provider::error_bf_abort:
            case wsrep::provider::error_certification_failed:
                // Streaming transaction got BF aborted, so it must roll
                // back. Roll back the fragment storage operation out of
                // order as the commit order will be grabbed later on
                // during rollback process. Mark the fragment as certified
                // though in streaming context in order to enter streaming
                // rollback codepath.
                //
                // Note that despite we handle error_certification_failed
                // here, we mark the transaction as streaming. Apparently
                // the provider may return status corresponding to certification
                // failure even if the fragment has passed certification.
                // This may be a bug in provider implementation or a limitation
                // of error codes defined in wsrep-API. In order to make
                // sure that the transaction will be cleaned on other servers,
                // we take a risk of sending one rollback fragment for nothing.
                storage_service.rollback(wsrep::ws_handle(),
                                         wsrep::ws_meta());
                streaming_context_.certified();
                ret = 1;
                error = wsrep::e_deadlock_error;
                break;
            default:
                // Storage service rollback must be done out of order,
                // otherwise there may be a deadlock between BF aborter
                // and the rollback process.
                storage_service.rollback(wsrep::ws_handle(), wsrep::ws_meta());
                ret = 1;
                error = wsrep::e_deadlock_error;
                break;
            }
        }
    }

    // Note: This does not release the handle in the provider
    // since streaming is still on. However it is needed to
    // make provider internal state to transition for the
    // next fragment. If any of the operations above failed,
    // the handle needs to be left unreleased for the following
    // rollback process.
    if (ret == 0)
    {
        assert(error == wsrep::e_success);
        ret = provider().release(ws_handle_);
        if (ret)
        {
            error = wsrep::e_deadlock_error;
        }
    }
    lock.lock();
    if (ret)
    {
        // Failure: undo streaming registration, or start streaming
        // rollback if fragments were already replicated.
        assert(error != wsrep::e_success);
        if (is_streaming() == false)
        {
            lock.unlock();
            client_state_.server_state_.stop_streaming_client(&client_state_);
            lock.lock();
        }
        else
        {
            streaming_rollback(lock);
        }
        if (state_ != s_must_abort)
        {
            state(lock, s_must_abort);
        }
        client_state_.override_error(error, cert_ret);
    }
    else if (state_ == s_must_abort)
    {
        // Replication succeeded but the transaction was BF aborted
        // while the lock was released.
        if (is_streaming())
        {
            streaming_rollback(lock);
        }
        client_state_.override_error(wsrep::e_deadlock_error, cert_ret);
        ret = 1;
    }
    else
    {
        // Success: continue executing; subsequent fragments are neither
        // the transaction start nor PA unsafe.
        assert(state_ == s_certifying);
        state(lock, s_executing);
        flags(flags() & ~wsrep::provider::flag::start_transaction);
        flags(flags() & ~wsrep::provider::flag::pa_unsafe);
    }
    return ret;
}
1686 
certify_commit(wsrep::unique_lock<wsrep::mutex> & lock)1687 int wsrep::transaction::certify_commit(
1688     wsrep::unique_lock<wsrep::mutex>& lock)
1689 {
1690     assert(lock.owns_lock());
1691     assert(active());
1692     client_service_.wait_for_replayers(lock);
1693 
1694     assert(lock.owns_lock());
1695 
1696     if (abort_or_interrupt(lock))
1697     {
1698         if (is_xa() && state() == s_must_abort)
1699         {
1700             state(lock, s_must_replay);
1701         }
1702         return 1;
1703     }
1704 
1705     state(lock, s_certifying);
1706     lock.unlock();
1707 
1708     enum wsrep::provider::status status(
1709         client_state_.server_state_.send_pending_rollback_events());
1710     if (status)
1711     {
1712         wsrep::log_warning()
1713             << "Failed to replicate pending rollback events: "
1714             << status << " ("
1715             << wsrep::provider::to_string(status) << ")";
1716 
1717         // We failed to replicate some pending rollback fragment.
1718         // Meaning that some transaction that was rolled back
1719         // locally might still be active out there in the cluster.
1720         // To avoid a potential BF-BF conflict, we need to abort
1721         // and give up on this one.
1722         // Notice that we can't abort a prepared XA that wants to
1723         // commit. Fortunately, there is no need to in this case:
1724         // the commit fragment for XA does not cause any changes and
1725         // can't possibly conflict with other transactions out there.
1726         if (!is_xa())
1727         {
1728             lock.lock();
1729             state(lock, s_must_abort);
1730             return 1;
1731         }
1732     }
1733 
1734     if (is_streaming())
1735     {
1736         if (!is_xa())
1737         {
1738             append_sr_keys_for_commit();
1739         }
1740         pa_unsafe(true);
1741     }
1742 
1743     if (implicit_deps())
1744     {
1745         flags(flags() | wsrep::provider::flag::implicit_deps);
1746     }
1747 
1748     flags(flags() | wsrep::provider::flag::commit);
1749     flags(flags() & ~wsrep::provider::flag::prepare);
1750 
1751     if (client_service_.prepare_data_for_replication())
1752     {
1753         lock.lock();
1754         // Here we fake that the size exceeded error came from provider,
1755         // even though it came from the client service. This requires
1756         // some consideration how to get meaningful error codes from
1757         // the client service.
1758         client_state_.override_error(wsrep::e_size_exceeded_error,
1759                                      wsrep::provider::error_size_exceeded);
1760         if (state_ != s_must_abort)
1761         {
1762             state(lock, s_must_abort);
1763         }
1764         return 1;
1765     }
1766 
1767     client_service_.debug_sync("wsrep_before_certification");
1768     enum wsrep::provider::status
1769         cert_ret(provider().certify(client_state_.id(),
1770                                    ws_handle_,
1771                                    flags(),
1772                                    ws_meta_));
1773     client_service_.debug_sync("wsrep_after_certification");
1774 
1775     lock.lock();
1776 
1777     assert(state() == s_certifying || state() == s_must_abort);
1778 
1779     int ret(1);
1780     switch (cert_ret)
1781     {
1782     case wsrep::provider::success:
1783         assert(ordered());
1784         certified_ = true;
1785         ++fragments_certified_for_statement_;
1786         switch (state())
1787         {
1788         case s_certifying:
1789             if (is_xa())
1790             {
1791                 state(lock, s_committing);
1792             }
1793             else
1794             {
1795                 state(lock, s_preparing);
1796             }
1797             ret = 0;
1798             break;
1799         case s_must_abort:
1800             // We got BF aborted after succesful certification
1801             // and before acquiring client state lock. The trasaction
1802             // must be replayed.
1803             state(lock, s_must_replay);
1804             break;
1805         default:
1806             assert(0);
1807             break;
1808         }
1809         break;
1810     case wsrep::provider::error_warning:
1811         assert(ordered() == false);
1812         state(lock, s_must_abort);
1813         client_state_.override_error(wsrep::e_error_during_commit, cert_ret);
1814         break;
1815     case wsrep::provider::error_transaction_missing:
1816         state(lock, s_must_abort);
1817         // The execution should never reach this point if the
1818         // transaction has not generated any keys or data.
1819         wsrep::log_warning() << "Transaction was missing in provider";
1820         client_state_.override_error(wsrep::e_error_during_commit, cert_ret);
1821         break;
1822     case wsrep::provider::error_bf_abort:
1823         // Transaction was replicated successfully and it was either
1824         // certified successfully or the result of certifying is not
1825         // yet known. Therefore the transaction must roll back
1826         // and go through replay either to replay and commit the whole
1827         // transaction or to determine failed certification status.
1828         if (state() != s_must_abort)
1829         {
1830             state(lock, s_must_abort);
1831         }
1832         state(lock, s_must_replay);
1833         break;
1834     case wsrep::provider::error_certification_failed:
1835         state(lock, s_cert_failed);
1836         client_state_.override_error(wsrep::e_deadlock_error);
1837         break;
1838     case wsrep::provider::error_size_exceeded:
1839         state(lock, s_must_abort);
1840         client_state_.override_error(wsrep::e_error_during_commit, cert_ret);
1841         break;
1842     case wsrep::provider::error_connection_failed:
1843         // Galera provider may return CONN_FAIL if the trx is
1844         // BF aborted O_o. If we see here that the trx was BF aborted,
1845         // return deadlock error instead of error during commit
1846         // to reduce number of error state combinations elsewhere.
1847         if (state() == s_must_abort)
1848         {
1849             if (is_xa())
1850             {
1851                 state(lock, s_must_replay);
1852             }
1853             client_state_.override_error(wsrep::e_deadlock_error);
1854         }
1855         else
1856         {
1857             client_state_.override_error(wsrep::e_error_during_commit,
1858                                          cert_ret);
1859             if (is_xa())
1860             {
1861                 state(lock, s_prepared);
1862             }
1863             else
1864             {
1865                 state(lock, s_must_abort);
1866             }
1867         }
1868         break;
1869     case wsrep::provider::error_provider_failed:
1870         if (state() != s_must_abort)
1871         {
1872             state(lock, s_must_abort);
1873         }
1874         client_state_.override_error(wsrep::e_error_during_commit, cert_ret);
1875         break;
1876     case wsrep::provider::error_fatal:
1877         client_state_.override_error(wsrep::e_error_during_commit, cert_ret);
1878         state(lock, s_must_abort);
1879         client_service_.emergency_shutdown();
1880         break;
1881     case wsrep::provider::error_not_implemented:
1882     case wsrep::provider::error_not_allowed:
1883         client_state_.override_error(wsrep::e_error_during_commit, cert_ret);
1884         state(lock, s_must_abort);
1885         wsrep::log_warning() << "Certification operation was not allowed: "
1886                              << "id: " << id().get()
1887                              << " flags: " << std::hex << flags() << std::dec;
1888         break;
1889     default:
1890         state(lock, s_must_abort);
1891         client_state_.override_error(wsrep::e_error_during_commit, cert_ret);
1892         break;
1893     }
1894 
1895     return ret;
1896 }
1897 
append_sr_keys_for_commit()1898 int wsrep::transaction::append_sr_keys_for_commit()
1899 {
1900     int ret(0);
1901     assert(client_state_.mode() == wsrep::client_state::m_local);
1902     for (wsrep::sr_key_set::branch_type::const_iterator
1903              i(sr_keys_.root().begin());
1904          ret == 0 && i != sr_keys_.root().end(); ++i)
1905     {
1906         for (wsrep::sr_key_set::leaf_type::const_iterator
1907                  j(i->second.begin());
1908              ret == 0 && j != i->second.end(); ++j)
1909         {
1910             wsrep::key key(wsrep::key::shared);
1911             key.append_key_part(i->first.data(), i->first.size());
1912             key.append_key_part(j->data(), j->size());
1913             ret = provider().append_key(ws_handle_, key);
1914         }
1915     }
1916     return ret;
1917 }
1918 
streaming_rollback(wsrep::unique_lock<wsrep::mutex> & lock)1919 void wsrep::transaction::streaming_rollback(
1920     wsrep::unique_lock<wsrep::mutex>& lock)
1921 {
1922     debug_log_state("streaming_rollback enter");
1923     assert(state_ != s_must_replay);
1924     assert(is_streaming());
1925     assert(lock.owns_lock());
1926 
1927     // Prevent streaming_rollback() to be executed simultaneously.
1928     // Notice that lock is unlocked when calling into server_state
1929     // methods, to avoid violating lock order.
1930     // The condition variable below prevents a thread to go
1931     // through streaming_rollback() while another thread is busy
1932     // stopping or converting the streaming_client().
1933     // This would be problematic if a thread is performing BF abort,
1934     // while the original client manages to complete its rollback
1935     // and therefore change the state of the transaction, causing
1936     // assertions to fire.
1937     while (streaming_rollback_in_progress_)
1938         client_state_.cond_.wait(lock);
1939     streaming_rollback_in_progress_ = true;
1940 
1941     if (streaming_context_.rolled_back() == false)
1942     {
1943         if (bf_aborted_in_total_order_)
1944         {
1945             lock.unlock();
1946             client_state_.server_state_.stop_streaming_client(&client_state_);
1947             lock.lock();
1948         }
1949         else
1950         {
1951             // Create a high priority applier which will handle the
1952             // rollback fragment or clean up on configuration change.
1953             // Adopt transaction will copy fragment set and appropriate
1954             // meta data.
1955             lock.unlock();
1956             server_service_.debug_sync("wsrep_streaming_rollback");
1957             client_state_.server_state_.convert_streaming_client_to_applier(
1958                 &client_state_);
1959             lock.lock();
1960             streaming_context_.cleanup();
1961 
1962             enum wsrep::provider::status status(provider().rollback(id_));
1963             if (status)
1964             {
1965                 lock.unlock();
1966                 client_state_.server_state_.queue_rollback_event(id_);
1967                 lock.lock();
1968                 wsrep::log_debug()
1969                     << "Failed to replicate rollback fragment for " << id_
1970                     << ": " << status << " ( "
1971                     << wsrep::provider::to_string(status) << ")";
1972             }
1973         }
1974 
1975         // Mark the streaming context as rolled back,
1976         // so that this block is executed once.
1977         streaming_context_.rolled_back(id_);
1978     }
1979 
1980     debug_log_state("streaming_rollback leave");
1981     streaming_rollback_in_progress_ = false;
1982     client_state_.cond_.notify_all();
1983 }
1984 
replay(wsrep::unique_lock<wsrep::mutex> & lock)1985 int wsrep::transaction::replay(wsrep::unique_lock<wsrep::mutex>& lock)
1986 {
1987     int ret(0);
1988     state(lock, s_replaying);
1989     // Need to remember streaming state before replay, entering
1990     // after_commit() after succesful replay will clear
1991     // fragments.
1992     const bool was_streaming(is_streaming());
1993     lock.unlock();
1994     client_service_.debug_sync("wsrep_before_replay");
1995     enum wsrep::provider::status replay_ret(client_service_.replay());
1996     client_service_.signal_replayed();
1997     if (was_streaming)
1998     {
1999         client_state_.server_state_.stop_streaming_client(&client_state_);
2000     }
2001     lock.lock();
2002     switch (replay_ret)
2003     {
2004     case wsrep::provider::success:
2005         if (state() == s_replaying)
2006         {
2007             // Replay was done by using different client state, adjust state
2008             // to committed.
2009             state(lock, s_committed);
2010         }
2011         if (is_streaming())
2012         {
2013             streaming_context_.cleanup();
2014         }
2015         provider().release(ws_handle_);
2016         break;
2017     case wsrep::provider::error_certification_failed:
2018         client_state_.override_error(
2019             wsrep::e_deadlock_error);
2020         if (is_streaming())
2021         {
2022             client_service_.remove_fragments();
2023             streaming_context_.cleanup();
2024         }
2025         state(lock, s_aborted);
2026         ret = 1;
2027         break;
2028     default:
2029         client_service_.emergency_shutdown();
2030         break;
2031     }
2032 
2033     WSREP_LOG_DEBUG(client_state_.debug_log_level(),
2034                     wsrep::log::debug_level_transaction,
2035                     "replay returned" << replay_ret);
2036     return ret;
2037 }
2038 
cleanup()2039 void wsrep::transaction::cleanup()
2040 {
2041     debug_log_state("cleanup_enter");
2042     assert(is_streaming() == false);
2043     assert(state() == s_committed || state() == s_aborted);
2044     id_ = wsrep::transaction_id::undefined();
2045     ws_handle_ = wsrep::ws_handle();
2046     // Keep the state history for troubleshooting. Reset
2047     // at start_transaction().
2048     // state_hist_.clear();
2049     if (ordered())
2050     {
2051         client_state_.update_last_written_gtid(ws_meta_.gtid());
2052     }
2053     bf_abort_state_ = s_executing;
2054     bf_abort_provider_status_ = wsrep::provider::success;
2055     bf_abort_client_state_ = 0;
2056     bf_aborted_in_total_order_ = false;
2057     ws_meta_ = wsrep::ws_meta();
2058     flags_ = 0;
2059     certified_ = false;
2060     implicit_deps_ = false;
2061     sr_keys_.clear();
2062     streaming_context_.cleanup();
2063     client_service_.cleanup_transaction();
2064     apply_error_buf_.clear();
2065     xid_.clear();
2066     debug_log_state("cleanup_leave");
2067 }
2068 
debug_log_state(const char * context) const2069 void wsrep::transaction::debug_log_state(
2070     const char* context) const
2071 {
2072     WSREP_LOG_DEBUG(
2073         client_state_.debug_log_level(),
2074         wsrep::log::debug_level_transaction,
2075         context
2076         << "\n    server: " << server_id_
2077         << ", client: " << int64_t(client_state_.id().get())
2078         << ", state: " << wsrep::to_c_string(client_state_.state())
2079         << ", mode: " << wsrep::to_c_string(client_state_.mode())
2080         << "\n    trx_id: " << int64_t(id_.get())
2081         << ", seqno: " << ws_meta_.seqno().get()
2082         << ", flags: " << flags()
2083         << "\n"
2084         << "    state: " << wsrep::to_c_string(state_)
2085         << ", bfa_state: " << wsrep::to_c_string(bf_abort_state_)
2086         << ", error: " << wsrep::to_c_string(client_state_.current_error())
2087         << ", status: " << client_state_.current_error_status()
2088         << "\n"
2089         << "    is_sr: " << is_streaming()
2090         << ", frags: " << streaming_context_.fragments_certified()
2091         << ", frags size: " << streaming_context_.fragments().size()
2092         << ", unit: " << streaming_context_.fragment_unit()
2093         << ", size: " << streaming_context_.fragment_size()
2094         << ", counter: " << streaming_context_.unit_counter()
2095         << ", log_pos: " << streaming_context_.log_position()
2096         << ", sr_rb: " << streaming_context_.rolled_back()
2097         << "\n    own: " << (client_state_.owning_thread_id_ == wsrep::this_thread::get_id())
2098         << " thread_id: " << client_state_.owning_thread_id_
2099         << "");
2100 }
2101 
debug_log_key_append(const wsrep::key & key) const2102 void wsrep::transaction::debug_log_key_append(const wsrep::key& key) const
2103 {
2104     WSREP_LOG_DEBUG(client_state_.debug_log_level(),
2105                     wsrep::log::debug_level_transaction,
2106                     "key_append: "
2107                     << "trx_id: "
2108                     << int64_t(id().get())
2109                     << " append key:\n" << key);
2110 }
2111