1 /*
2 * Copyright (C) 2018 Codership Oy <info@codership.com>
3 *
4 * This file is part of wsrep-lib.
5 *
6 * Wsrep-lib is free software: you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation, either version 2 of the License, or
9 * (at your option) any later version.
10 *
11 * Wsrep-lib is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License
17 * along with wsrep-lib. If not, see <https://www.gnu.org/licenses/>.
18 */
19
20 #include "db_client.hpp"
21 #include "db_server.hpp"
22
23 #include "wsrep/logger.hpp"
24
client(db::server & server,wsrep::client_id client_id,enum wsrep::client_state::mode mode,const db::params & params)25 db::client::client(db::server& server,
26 wsrep::client_id client_id,
27 enum wsrep::client_state::mode mode,
28 const db::params& params)
29 : mutex_()
30 , cond_()
31 , params_(params)
32 , server_(server)
33 , server_state_(server.server_state())
34 , client_state_(mutex_, cond_, server_state_, client_service_, client_id, mode)
35 , client_service_(*this)
36 , se_trx_(server.storage_engine())
37 , data_()
38 , random_device_()
39 , random_engine_(random_device_())
40 , stats_()
41 {
42 data_.resize(params.max_data_size);
43 }
44
start()45 void db::client::start()
46 {
47 client_state_.open(client_state_.id());
48 for (size_t i(0); i < params_.n_transactions; ++i)
49 {
50 run_one_transaction();
51 report_progress(i + 1);
52 }
53 client_state_.close();
54 client_state_.cleanup();
55 }
56
bf_abort(wsrep::seqno seqno)57 bool db::client::bf_abort(wsrep::seqno seqno)
58 {
59 return client_state_.bf_abort(seqno);
60 }
61
62 ////////////////////////////////////////////////////////////////////////////////
63 // Private //
64 ////////////////////////////////////////////////////////////////////////////////
65
66 template <class F>
client_command(F f)67 int db::client::client_command(F f)
68 {
69 int err(client_state_.before_command());
70 // wsrep::log_debug() << "before_command: " << err;
71 // If err != 0, transaction was BF aborted while client idle
72 if (err == 0)
73 {
74 err = client_state_.before_statement();
75 if (err == 0)
76 {
77 err = f();
78 }
79 client_state_.after_statement();
80 }
81 client_state_.after_command_before_result();
82 if (client_state_.current_error())
83 {
84 // wsrep::log_info() << "Current error";
85 assert(client_state_.transaction().state() ==
86 wsrep::transaction::s_aborted);
87 err = 1;
88 }
89 client_state_.after_command_after_result();
90 // wsrep::log_info() << "client_command(): " << err;
91 return err;
92 }
93
run_one_transaction()94 void db::client::run_one_transaction()
95 {
96 if (params_.sync_wait)
97 {
98 if (client_state_.sync_wait(5))
99 {
100 throw wsrep::runtime_error("Sync wait failed");
101 }
102 }
103 client_state_.reset_error();
104 int err = client_command(
105 [&]()
106 {
107 // wsrep::log_debug() << "Start transaction";
108 err = client_state_.start_transaction(
109 wsrep::transaction_id(server_.next_transaction_id()));
110 assert(err == 0);
111 se_trx_.start(this);
112 return err;
113 });
114
115 const wsrep::transaction& transaction(
116 client_state_.transaction());
117
118 err = err || client_command(
119 [&]()
120 {
121 // wsrep::log_debug() << "Generate write set";
122 assert(transaction.active());
123 assert(err == 0);
124 std::uniform_int_distribution<size_t> uniform_dist(0, params_.n_rows);
125 const size_t randkey(uniform_dist(random_engine_));
126 ::memcpy(data_.data(), &randkey,
127 std::min(sizeof(randkey), data_.size()));
128 wsrep::key key(wsrep::key::exclusive);
129 key.append_key_part("dbms", 4);
130 unsigned long long client_key(client_state_.id().get());
131 key.append_key_part(&client_key, sizeof(client_key));
132 key.append_key_part(&randkey, sizeof(randkey));
133 err = client_state_.append_key(key);
134 size_t bytes_to_append(data_.size());
135 if (params_.random_data_size)
136 {
137 bytes_to_append = std::uniform_int_distribution<size_t>(
138 1, data_.size())(random_engine_);
139 }
140 err = err || client_state_.append_data(
141 wsrep::const_buffer(data_.data(), bytes_to_append));
142 return err;
143 });
144
145 err = err || client_command(
146 [&]()
147 {
148 // wsrep::log_debug() << "Commit";
149 assert(err == 0);
150 if (do_2pc())
151 {
152 err = err || client_state_.before_prepare();
153 err = err || client_state_.after_prepare();
154 }
155 err = err || client_state_.before_commit();
156 if (err == 0) se_trx_.commit(transaction.ws_meta().gtid());
157 err = err || client_state_.ordered_commit();
158 err = err || client_state_.after_commit();
159 if (err)
160 {
161 client_state_.before_rollback();
162 se_trx_.rollback();
163 client_state_.after_rollback();
164 }
165 return err;
166 });
167
168 assert(err ||
169 transaction.state() == wsrep::transaction::s_aborted ||
170 transaction.state() == wsrep::transaction::s_committed);
171 assert(se_trx_.active() == false);
172 assert(transaction.active() == false);
173
174 switch (transaction.state())
175 {
176 case wsrep::transaction::s_committed:
177 ++stats_.commits;
178 break;
179 case wsrep::transaction::s_aborted:
180 ++stats_.rollbacks;
181 break;
182 default:
183 assert(0);
184 }
185 }
186
report_progress(size_t i) const187 void db::client::report_progress(size_t i) const
188 {
189 if ((i % 1000) == 0)
190 {
191 wsrep::log_info() << "client: " << client_state_.id().get()
192 << " transactions: " << i
193 << " " << 100*double(i)/double(params_.n_transactions) << "%";
194 }
195 }
196