1 /*
2  * Copyright (C) 2018 Codership Oy <info@codership.com>
3  *
4  * This file is part of wsrep-lib.
5  *
6  * Wsrep-lib is free software: you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation, either version 2 of the License, or
9  * (at your option) any later version.
10  *
11  * Wsrep-lib is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with wsrep-lib.  If not, see <https://www.gnu.org/licenses/>.
18  */
19 
20 #include "db_simulator.hpp"
21 #include "db_client.hpp"
22 #include "db_threads.hpp"
23 #include "db_tls.hpp"
24 
25 #include "wsrep/logger.hpp"
26 
27 #include <boost/filesystem.hpp>
28 #include <sstream>
29 
30 static db::ti thread_instrumentation;
31 static db::tls tls_service;
32 
run()33 void db::simulator::run()
34 {
35     start();
36     stop();
37     std::flush(std::cerr);
38     std::cout << "Results:\n";
39     std::cout << stats() << std::endl;
40     std::cout << db::ti::stats() << std::endl;
41     std::cout << db::tls::stats() << std::endl;
42 }
43 
sst(db::server & server,const std::string & request,const wsrep::gtid & gtid,bool bypass)44 void db::simulator::sst(db::server& server,
45                         const std::string& request,
46                         const wsrep::gtid& gtid,
47                         bool bypass)
48 {
49     // The request may contain extra trailing '\0' after it goes
50     // through the provider, strip it first.
51     std::string name(request);
52     name.erase(std::find(name.begin(), name.end(), '\0'), name.end());
53 
54     wsrep::unique_lock<wsrep::mutex> lock(mutex_);
55     auto i(servers_.find(name));
56     wsrep::log_info() << "SST request '" << name << "'";
57     if (i == servers_.end())
58     {
59         wsrep::log_error() << "Server " << request << " not found";
60         wsrep::log_info() << "servers:";
61         for (const auto& s : servers_)
62         {
63             wsrep::log_info() << "server: " << s.first;
64         }
65         throw wsrep::runtime_error("Server " + request + " not found");
66     }
67     if (bypass == false)
68     {
69         wsrep::log_info() << "SST "
70                           << server.server_state().name()
71                           << " -> " << request;
72         i->second->storage_engine().store_position(gtid);
73         i->second->storage_engine().store_view(
74             server.storage_engine().get_view());
75     }
76 
77     db::client dummy(*(i->second), wsrep::client_id(-1),
78                      wsrep::client_state::m_local, params());
79 
80     i->second->server_state().sst_received(dummy.client_service(), 0);
81     server.server_state().sst_sent(gtid, 0);
82 }
83 
stats() const84 std::string db::simulator::stats() const
85 {
86     auto duration(std::chrono::duration<double>(
87                       clients_stop_ - clients_start_).count());
88     long long transactions(stats_.commits + stats_.rollbacks);
89     long long bf_aborts(0);
90     for (const auto& s : servers_)
91     {
92         bf_aborts += s.second->storage_engine().bf_aborts();
93     }
94     std::ostringstream os;
95     os << "Number of transactions: " << transactions
96        << "\n"
97        << "Seconds: " << duration
98        << " \n"
99        << "Transactions per second: " << double(transactions)/double(duration)
100        << "\n"
101        << "BF aborts: "
102        << bf_aborts
103        << "\n"
104        << "Client commits: " << stats_.commits
105        << "\n"
106        << "Client rollbacks: " << stats_.rollbacks
107        << "\n"
108        << "Client replays: " << stats_.replays;
109     return os.str();
110 }
111 
112 ////////////////////////////////////////////////////////////////////////////////
113 //                              Private                                       //
114 ////////////////////////////////////////////////////////////////////////////////
115 
start()116 void db::simulator::start()
117 {
118     thread_instrumentation.level(params_.thread_instrumentation);
119     thread_instrumentation.cond_checks(params_.cond_checks);
120     tls_service.init(params_.tls_service);
121     wsrep::log_info() << "Provider: " << params_.wsrep_provider;
122 
123     std::string cluster_address(build_cluster_address());
124     wsrep::log_info() << "Cluster address: " << cluster_address;
125     for (size_t i(0); i < params_.n_servers; ++i)
126     {
127         std::ostringstream name_os;
128         name_os << (i + 1);
129         std::ostringstream id_os;
130         id_os << (i + 1);
131         std::ostringstream address_os;
132         address_os << "127.0.0.1:" << server_port(i);
133         wsrep::id server_id(id_os.str());
134         auto it(servers_.insert(
135                     std::make_pair(
136                         name_os.str(),
137                         std::make_unique<db::server>(
138                             *this,
139                             name_os.str(),
140                             address_os.str()))));
141         if (it.second == false)
142         {
143             throw wsrep::runtime_error("Failed to add server");
144         }
145         boost::filesystem::path dir("dbsim_" + id_os.str() + "_data");
146         boost::filesystem::create_directory(dir);
147 
148         db::server& server(*it.first->second);
149         server.server_state().debug_log_level(params_.debug_log_level);
150         std::string server_options(params_.wsrep_provider_options);
151 
152         wsrep::provider::services services;
153         services.thread_service = params_.thread_instrumentation
154                                       ? &thread_instrumentation
155                                       : nullptr;
156         services.tls_service = params_.tls_service
157             ? &tls_service
158             : nullptr;
159         if (server.server_state().load_provider(params_.wsrep_provider,
160                                                 server_options, services))
161         {
162             throw wsrep::runtime_error("Failed to load provider");
163         }
164         if (server.server_state().connect("sim_cluster", cluster_address, "",
165                                           i == 0))
166         {
167             throw wsrep::runtime_error("Failed to connect");
168         }
169         wsrep::log_debug() << "main: Starting applier";
170         server.start_applier();
171         wsrep::log_debug() << "main: Waiting initializing state";
172         server.server_state().wait_until_state(wsrep::server_state::s_initializing);
173         wsrep::log_debug() << "main: Calling initialized";
174         server.server_state().initialized();
175         wsrep::log_debug() << "main: Waiting for synced state";
176         server.server_state().wait_until_state(
177             wsrep::server_state::s_synced);
178         wsrep::log_debug() << "main: Server synced";
179     }
180 
181     // Start client threads
182     wsrep::log_info() << "####################### Starting client load";
183     clients_start_ = std::chrono::steady_clock::now();
184     size_t index(0);
185     for (auto& i : servers_)
186     {
187         if (params_.topology.size() == 0 || params_.topology[index]  == 'm')
188         {
189             i.second->start_clients();
190         }
191         ++index;
192     }
193 }
194 
stop()195 void db::simulator::stop()
196 {
197     for (auto& i : servers_)
198     {
199         db::server& server(*i.second);
200         server.stop_clients();
201     }
202     clients_stop_ = std::chrono::steady_clock::now();
203     wsrep::log_info() << "######## Stats ############";
204     wsrep::log_info()  << stats();
205     std::cout << db::ti::stats() << std::endl;
206     wsrep::log_info() << "######## Stats ############";
207     if (params_.fast_exit)
208     {
209         exit(0);
210     }
211     for (auto& i : servers_)
212     {
213         db::server& server(*i.second);
214         wsrep::log_info() << "Status for server: "
215                           << server.server_state().id();
216         auto status(server.server_state().provider().status());
217         for_each(status.begin(), status.end(),
218                  [](const wsrep::provider::status_variable& sv)
219                  {
220                      wsrep::log_info() << sv.name() << " = " << sv.value();
221                  });
222         server.server_state().disconnect();
223         server.server_state().wait_until_state(
224             wsrep::server_state::s_disconnected);
225         server.stop_applier();
226         server.server_state().unload_provider();
227     }
228 }
229 
server_port(size_t i) const230 std::string db::simulator::server_port(size_t i) const
231 {
232     std::ostringstream os;
233     os << (10000 + (i + 1)*10);
234     return os.str();
235 }
236 
build_cluster_address() const237 std::string db::simulator::build_cluster_address() const
238 {
239     std::string ret;
240     if (params_.wsrep_provider.find("galera_smm") != std::string::npos)
241     {
242         ret += "gcomm://";
243     }
244 
245     for (size_t i(0); i < params_.n_servers; ++i)
246     {
247         std::ostringstream sa_os;
248         sa_os << "127.0.0.1:";
249         sa_os << server_port(i);
250         ret += sa_os.str();
251         if (i < params_.n_servers - 1) ret += ",";
252     }
253     return ret;
254 }
255