1 /*
2  * Copyright (C) 2018 Codership Oy <info@codership.com>
3  *
4  * This file is part of wsrep-lib.
5  *
6  * Wsrep-lib is free software: you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation, either version 2 of the License, or
9  * (at your option) any later version.
10  *
11  * Wsrep-lib is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with wsrep-lib.  If not, see <https://www.gnu.org/licenses/>.
18  */
19 
20 #include "db_storage_engine.hpp"
21 #include "db_client.hpp"
22 
start(db::client * cc)23 void db::storage_engine::transaction::start(db::client* cc)
24 {
25     wsrep::unique_lock<wsrep::mutex> lock(se_.mutex_);
26     if (se_.transactions_.insert(cc).second == false)
27     {
28         ::abort();
29     }
30     cc_ = cc;
31 }
32 
apply(const wsrep::transaction & transaction)33 void db::storage_engine::transaction::apply(
34     const wsrep::transaction& transaction)
35 {
36     assert(cc_);
37     se_.bf_abort_some(transaction);
38 }
39 
commit(const wsrep::gtid & gtid)40 void db::storage_engine::transaction::commit(const wsrep::gtid& gtid)
41 {
42     if (cc_)
43     {
44         wsrep::unique_lock<wsrep::mutex> lock(se_.mutex_);
45         se_.transactions_.erase(cc_);
46         se_.store_position(gtid);
47     }
48     cc_ = nullptr;
49 }
50 
51 
rollback()52 void db::storage_engine::transaction::rollback()
53 {
54     if (cc_)
55     {
56         wsrep::unique_lock<wsrep::mutex> lock(se_.mutex_);
57         se_.transactions_.erase(cc_);
58     }
59     cc_ = nullptr;
60 }
61 
bf_abort_some(const wsrep::transaction & txc)62 void db::storage_engine::bf_abort_some(const wsrep::transaction& txc)
63 {
64     std::uniform_int_distribution<size_t> uniform_dist(0, alg_freq_);
65     wsrep::unique_lock<wsrep::mutex> lock(mutex_);
66     if (alg_freq_ && uniform_dist(random_engine_) == 0)
67     {
68         if (transactions_.empty() == false)
69         {
70             for (auto victim : transactions_)
71             {
72                 wsrep::client_state& cc(victim->client_state());
73                 if (cc.mode() == wsrep::client_state::m_local)
74                 {
75                     if (victim->bf_abort(txc.seqno()))
76                     {
77                         ++bf_aborts_;
78                     }
79                     break;
80                 }
81             }
82         }
83     }
84 }
85 
store_position(const wsrep::gtid & gtid)86 void db::storage_engine::store_position(const wsrep::gtid& gtid)
87 {
88     validate_position(gtid);
89     position_ = gtid;
90 }
91 
get_position() const92 wsrep::gtid db::storage_engine::get_position() const
93 {
94     return position_;
95 }
96 
store_view(const wsrep::view & view)97 void db::storage_engine::store_view(const wsrep::view& view)
98 {
99     view_ = view;
100 }
101 
get_view() const102 wsrep::view db::storage_engine::get_view() const
103 {
104     return view_;
105 }
106 
validate_position(const wsrep::gtid & gtid) const107 void db::storage_engine::validate_position(const wsrep::gtid& gtid) const
108 {
109     using std::rel_ops::operator<=;
110     if (position_.id() == gtid.id() && gtid.seqno() <= position_.seqno())
111     {
112         std::ostringstream os;
113         os << "Invalid position submitted, position seqno "
114            << position_.seqno()
115            << " is greater than submitted seqno "
116            << gtid.seqno();
117         throw wsrep::runtime_error(os.str());
118     }
119 }
120