1 /*
2  * Copyright (C) 2018 Codership Oy <info@codership.com>
3  *
4  * This file is part of wsrep-lib.
5  *
6  * Wsrep-lib is free software: you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation, either version 2 of the License, or
9  * (at your option) any later version.
10  *
11  * Wsrep-lib is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with wsrep-lib.  If not, see <https://www.gnu.org/licenses/>.
18  */
19 
20 #include "db_client.hpp"
21 #include "db_server.hpp"
22 
23 #include "wsrep/logger.hpp"
24 
client(db::server & server,wsrep::client_id client_id,enum wsrep::client_state::mode mode,const db::params & params)25 db::client::client(db::server& server,
26                    wsrep::client_id client_id,
27                    enum wsrep::client_state::mode mode,
28                    const db::params& params)
29     : mutex_()
30     , cond_()
31     , params_(params)
32     , server_(server)
33     , server_state_(server.server_state())
34     , client_state_(mutex_, cond_, server_state_, client_service_, client_id, mode)
35     , client_service_(*this)
36     , se_trx_(server.storage_engine())
37     , data_()
38     , random_device_()
39     , random_engine_(random_device_())
40     , stats_()
41 {
42     data_.resize(params.max_data_size);
43 }
44 
start()45 void db::client::start()
46 {
47     client_state_.open(client_state_.id());
48     for (size_t i(0); i < params_.n_transactions; ++i)
49     {
50         run_one_transaction();
51         report_progress(i + 1);
52     }
53     client_state_.close();
54     client_state_.cleanup();
55 }
56 
bf_abort(wsrep::seqno seqno)57 bool db::client::bf_abort(wsrep::seqno seqno)
58 {
59     return client_state_.bf_abort(seqno);
60 }
61 
62 ////////////////////////////////////////////////////////////////////////////////
63 //                              Private                                       //
64 ////////////////////////////////////////////////////////////////////////////////
65 
66 template <class F>
client_command(F f)67 int db::client::client_command(F f)
68 {
69     int err(client_state_.before_command());
70     // wsrep::log_debug() << "before_command: " << err;
71     // If err != 0, transaction was BF aborted while client idle
72     if (err == 0)
73     {
74         err = client_state_.before_statement();
75         if (err == 0)
76         {
77             err = f();
78         }
79         client_state_.after_statement();
80     }
81     client_state_.after_command_before_result();
82     if (client_state_.current_error())
83     {
84         // wsrep::log_info() << "Current error";
85         assert(client_state_.transaction().state() ==
86                wsrep::transaction::s_aborted);
87         err = 1;
88     }
89     client_state_.after_command_after_result();
90     // wsrep::log_info() << "client_command(): " << err;
91     return err;
92 }
93 
run_one_transaction()94 void db::client::run_one_transaction()
95 {
96     if (params_.sync_wait)
97     {
98         if (client_state_.sync_wait(5))
99         {
100             throw wsrep::runtime_error("Sync wait failed");
101         }
102     }
103     client_state_.reset_error();
104     int err = client_command(
105         [&]()
106         {
107             // wsrep::log_debug() << "Start transaction";
108             err = client_state_.start_transaction(
109                 wsrep::transaction_id(server_.next_transaction_id()));
110             assert(err == 0);
111             se_trx_.start(this);
112             return err;
113         });
114 
115     const wsrep::transaction& transaction(
116         client_state_.transaction());
117 
118     err = err || client_command(
119         [&]()
120         {
121             // wsrep::log_debug() << "Generate write set";
122             assert(transaction.active());
123             assert(err == 0);
124             std::uniform_int_distribution<size_t> uniform_dist(0, params_.n_rows);
125             const size_t randkey(uniform_dist(random_engine_));
126             ::memcpy(data_.data(), &randkey,
127                      std::min(sizeof(randkey), data_.size()));
128             wsrep::key key(wsrep::key::exclusive);
129             key.append_key_part("dbms", 4);
130             unsigned long long client_key(client_state_.id().get());
131             key.append_key_part(&client_key, sizeof(client_key));
132             key.append_key_part(&randkey, sizeof(randkey));
133             err = client_state_.append_key(key);
134             size_t bytes_to_append(data_.size());
135             if (params_.random_data_size)
136             {
137                 bytes_to_append = std::uniform_int_distribution<size_t>(
138                     1, data_.size())(random_engine_);
139             }
140             err = err || client_state_.append_data(
141                 wsrep::const_buffer(data_.data(), bytes_to_append));
142             return err;
143         });
144 
145     err = err || client_command(
146         [&]()
147         {
148             // wsrep::log_debug() << "Commit";
149             assert(err == 0);
150             if (do_2pc())
151             {
152                 err = err || client_state_.before_prepare();
153                 err = err || client_state_.after_prepare();
154             }
155             err = err || client_state_.before_commit();
156             if (err == 0) se_trx_.commit(transaction.ws_meta().gtid());
157             err = err || client_state_.ordered_commit();
158             err = err || client_state_.after_commit();
159             if (err)
160             {
161                 client_state_.before_rollback();
162                 se_trx_.rollback();
163                 client_state_.after_rollback();
164             }
165             return err;
166         });
167 
168     assert(err ||
169            transaction.state() == wsrep::transaction::s_aborted ||
170            transaction.state() == wsrep::transaction::s_committed);
171     assert(se_trx_.active() == false);
172     assert(transaction.active() == false);
173 
174     switch (transaction.state())
175     {
176     case wsrep::transaction::s_committed:
177         ++stats_.commits;
178         break;
179     case wsrep::transaction::s_aborted:
180         ++stats_.rollbacks;
181         break;
182     default:
183         assert(0);
184     }
185 }
186 
report_progress(size_t i) const187 void db::client::report_progress(size_t i) const
188 {
189     if ((i % 1000) == 0)
190     {
191         wsrep::log_info() << "client: " << client_state_.id().get()
192                           << " transactions: " << i
193                           << " " << 100*double(i)/double(params_.n_transactions) << "%";
194     }
195 }
196