1 /*
2 * Copyright (c) 2017, 2019, Oracle and/or its affiliates. All rights reserved.
3 *
4 * This program is free software; you can redistribute it and/or modify
5 * it under the terms of the GNU General Public License, version 2.0,
6 * as published by the Free Software Foundation.
7 *
8 * This program is also distributed with certain software (including
9 * but not limited to OpenSSL) that is licensed under separate terms,
10 * as designated in a particular file or component or in included license
11 * documentation. The authors of MySQL hereby grant you an additional
12 * permission to link the program and your derivative works with the
13 * separately licensed software that they have included with MySQL.
14 *
15 * This program is distributed in the hope that it will be useful,
16 * but WITHOUT ANY WARRANTY; without even the implied warranty of
17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 * GNU General Public License, version 2.0, for more details.
19 *
20 * You should have received a copy of the GNU General Public License
21 * along with this program; if not, write to the Free Software
22 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
23 */
24
25 #include "plugin/x/tests/driver/connector/connection_manager.h"
26
27 #include <sstream>
28 #include <stdexcept>
29 #include <utility>
30 #include <vector>
31
32 #include "my_dbug.h"
33
34 #include "plugin/x/tests/driver/processor/variable_names.h"
35
36 google::protobuf::LogHandler *g_lh = nullptr;
37
Connection_manager(const Connection_options & co,Variable_container * variables,const Console & console)38 Connection_manager::Connection_manager(const Connection_options &co,
39 Variable_container *variables,
40 const Console &console)
41 : m_default_connection_options(co),
42 m_variables(variables),
43 m_console(console) {
44 g_lh = google::protobuf::SetLogHandler([](google::protobuf::LogLevel level,
45 const char *filename, int line,
46 const std::string &message) {
47 if (g_lh) g_lh(level, filename, line, message);
48 DBUG_LOG("debug",
49 "Protobuf error (level:" << level << ", filename:" << filename
50 << ":" << line << ", text:" << message);
51 });
52 m_variables->make_special_variable(
53 k_variable_option_user,
54 new Variable_dynamic_string(m_default_connection_options.user));
55
56 m_variables->make_special_variable(
57 k_variable_option_pass,
58 new Variable_dynamic_string(m_default_connection_options.password));
59
60 m_variables->make_special_variable(
61 k_variable_option_host,
62 new Variable_dynamic_string(m_default_connection_options.host));
63
64 m_variables->make_special_variable(
65 k_variable_option_socket,
66 new Variable_dynamic_string(m_default_connection_options.socket));
67
68 m_variables->make_special_variable(
69 k_variable_option_schema,
70 new Variable_dynamic_string(m_default_connection_options.schema));
71
72 m_variables->make_special_variable(
73 k_variable_option_port,
74 new Variable_dynamic_int(m_default_connection_options.port));
75
76 m_variables->make_special_variable(
77 k_variable_option_ssl_mode,
78 new Variable_dynamic_string(m_default_connection_options.ssl_mode));
79
80 m_variables->make_special_variable(
81 k_variable_option_ssl_cipher,
82 new Variable_dynamic_string(m_default_connection_options.ssl_cipher));
83
84 m_variables->make_special_variable(
85 k_variable_option_tls_version,
86 new Variable_dynamic_string(m_default_connection_options.allowed_tls));
87
88 m_variables->make_special_variable(
89 k_variable_option_compression_algorithm,
90 new Variable_dynamic_array_of_strings(
91 m_default_connection_options.compression_algorithm));
92
93 m_variables->make_special_variable(
94 k_variable_option_compression_combine_mixed_messages,
95 new Variable_string_readonly(
96 m_default_connection_options.compression_combine_mixed_messages));
97
98 m_variables->make_special_variable(
99 k_variable_option_compression_max_combine_messages,
100 new Variable_string_readonly(
101 m_default_connection_options.compression_max_combine_messages));
102
103 const std::string level =
104 m_default_connection_options.compression_level.has_value()
105 ? std::to_string(
106 m_default_connection_options.compression_level.value())
107 : std::string("DEFAULT");
108
109 m_variables->make_special_variable(k_variable_option_compression_level,
110 new Variable_string_readonly(level));
111
112 m_active_holder.reset(new Session_holder(xcl::create_session(), m_console,
113 m_default_connection_options));
114
115 m_session_holders[""] = m_active_holder;
116 }
117
~Connection_manager()118 Connection_manager::~Connection_manager() {
119 std::vector<std::string> disconnect_following;
120
121 for (const auto &connection : m_session_holders) {
122 if (connection.first != "") {
123 disconnect_following.push_back(connection.first);
124 }
125 }
126
127 for (auto &name : disconnect_following) {
128 safe_close(name);
129 }
130
131 if (m_session_holders.count("") > 0) {
132 safe_close("");
133 }
134 }
135
get_credentials(std::string * ret_user,std::string * ret_pass)136 void Connection_manager::get_credentials(std::string *ret_user,
137 std::string *ret_pass) {
138 assert(ret_user);
139 assert(ret_pass);
140
141 *ret_user = m_default_connection_options.user;
142 *ret_pass = m_default_connection_options.password;
143 }
144
safe_close(const std::string & name)145 void Connection_manager::safe_close(const std::string &name) {
146 try {
147 set_active(name, true);
148 close_active(true, true);
149 } catch (const std::exception &) {
150 } catch (const xcl::XError &) {
151 }
152 }
153
connect_default(const bool send_cap_password_expired,const bool client_interactive,const bool no_auth,const bool connect_attrs)154 void Connection_manager::connect_default(const bool send_cap_password_expired,
155 const bool client_interactive,
156 const bool no_auth,
157 const bool connect_attrs) {
158 m_console.print_verbose("Connecting...\n");
159
160 auto session = m_active_holder->get_session();
161
162 if (send_cap_password_expired) {
163 session->set_capability(
164 xcl::XSession::Capability_can_handle_expired_password, true);
165 }
166
167 if (client_interactive) {
168 session->set_capability(xcl::XSession::Capability_client_interactive, true);
169 }
170
171 if (connect_attrs) {
172 auto attrs = session->get_connect_attrs();
173 attrs.emplace_back("program_name", xcl::Argument_value{"mysqlxtest"});
174 session->set_capability(xcl::XSession::Capability_session_connect_attrs,
175 attrs, false);
176 }
177
178 xcl::XError error = m_active_holder->connect(no_auth);
179
180 if (error) {
181 // In case of configuration error, lets do safe_close to synchronize
182 // closing of socket with exist of mysqlxtest (in other case mysqlxtest
183 // is going to exit and after a while the connection is going to be
184 // accepted on server side).
185 if (CR_X_TLS_WRONG_CONFIGURATION != error.error() || no_auth) {
186 session->get_protocol().get_connection().close();
187 }
188
189 throw error;
190 }
191
192 setup_variables(session);
193
194 m_console.print_verbose("Connected client #", session->client_id(), "\n");
195 }
196
create(const std::string & name,const std::string & user,const std::string & password,const std::string & db,const std::vector<std::string> & auth_methods,const bool is_raw_connection)197 void Connection_manager::create(const std::string &name,
198 const std::string &user,
199 const std::string &password,
200 const std::string &db,
201 const std::vector<std::string> &auth_methods,
202 const bool is_raw_connection) {
203 if (m_session_holders.count(name))
204 throw std::runtime_error("a session named " + name + " already exists");
205
206 m_console.print_verbose("Connecting...\n");
207
208 Connection_options co = m_default_connection_options;
209
210 if (!user.empty()) {
211 co.user = user;
212 co.password = password;
213 }
214
215 if (!db.empty()) {
216 co.schema = db;
217 }
218
219 if (!auth_methods.empty()) {
220 co.auth_methods = auth_methods;
221 }
222
223 auto session = xcl::create_session();
224 std::shared_ptr<Session_holder> holder{
225 new Session_holder(std::move(session), m_console, co)};
226
227 xcl::XError error = holder->connect(is_raw_connection);
228
229 if (error) {
230 throw error;
231 }
232
233 m_active_holder = holder;
234 m_session_holders[name] = holder;
235 m_active_session_name = name;
236
237 setup_variables(active_xsession());
238
239 m_console.print_verbose("Connected client #", active_xsession()->client_id(),
240 "\n");
241 }
242
abort_active()243 void Connection_manager::abort_active() {
244 if (m_active_holder) {
245 if (!m_active_session_name.empty())
246 std::cout << "aborting session " << m_active_session_name << "\n";
247 /* Close connection first, to stop XSession from executing
248 Disconnection flow */
249 active_xconnection()->close();
250 m_active_holder.reset();
251 m_session_holders.erase(m_active_session_name);
252 if (m_active_session_name != "") set_active("");
253 } else {
254 throw std::runtime_error("no active session");
255 }
256 }
257
is_default_active()258 bool Connection_manager::is_default_active() {
259 return m_active_session_name.empty();
260 }
261
close_active(const bool shutdown,const bool be_quiet)262 void Connection_manager::close_active(const bool shutdown,
263 const bool be_quiet) {
264 if (m_active_holder) {
265 if (m_active_session_name.empty() && !shutdown)
266 throw std::runtime_error("cannot close default session");
267 try {
268 if (!m_active_session_name.empty() && !be_quiet)
269 m_console.print("closing session ", m_active_session_name, "\n");
270
271 if (active_xconnection()->state().is_connected()) {
272 // send a close message and wait for the corresponding Ok message
273 active_xprotocol()->send(Mysqlx::Connection::Close());
274 xcl::XProtocol::Server_message_type_id msgid;
275 xcl::XError error;
276 Message_ptr msg{
277 active_xprotocol()->recv_single_message(&msgid, &error)};
278
279 if (error) throw error;
280
281 if (!be_quiet) m_console.print(*msg);
282 if (Mysqlx::ServerMessages::OK != msgid)
283 throw xcl::XError(CR_COMMANDS_OUT_OF_SYNC,
284 "Disconnect was expecting Mysqlx.Ok(bye!), but "
285 "got the one above (one or more calls to -->recv "
286 "are probably missing)");
287
288 std::string text = static_cast<Mysqlx::Ok *>(msg.get())->msg();
289 if (text != "bye!" && text != "tchau!")
290 throw xcl::XError(CR_COMMANDS_OUT_OF_SYNC,
291 "Disconnect was expecting Mysqlx.Ok(bye!), but "
292 "got the one above (one or more calls to -->recv "
293 "are probably missing)");
294
295 if (!m_default_connection_options.dont_wait_for_disconnect) {
296 Message_ptr msg{
297 active_xprotocol()->recv_single_message(&msgid, &error)};
298
299 if (!error && !be_quiet) {
300 m_console.print_error("Was expecting closure but got message:",
301 *msg);
302 }
303 }
304
305 active_xconnection()->close();
306 }
307 m_session_holders.erase(m_active_session_name);
308 if (!shutdown) set_active("", be_quiet);
309 } catch (const std::exception &error) {
310 active_xconnection()->close();
311 m_session_holders.erase(m_active_session_name);
312 if (!shutdown) set_active("", be_quiet);
313 throw error;
314 } catch (const xcl::XError &error) {
315 active_xconnection()->close();
316 m_session_holders.erase(m_active_session_name);
317 if (!shutdown) set_active("", be_quiet);
318 throw error;
319 }
320 } else {
321 if (!shutdown) throw std::runtime_error("no active session");
322 }
323 }
324
set_active(const std::string & name,const bool be_quiet)325 void Connection_manager::set_active(const std::string &name,
326 const bool be_quiet) {
327 if (m_session_holders.count(name) == 0) {
328 std::string slist;
329 for (auto it = m_session_holders.begin(); it != m_session_holders.end();
330 ++it)
331 slist.append(it->first).append(", ");
332 if (!slist.empty()) slist.resize(slist.length() - 2);
333 throw std::runtime_error("no session named '" + name + "': " + slist);
334 }
335 m_active_holder = m_session_holders[name];
336 m_active_session_name = name;
337
338 setup_variables(active_xsession());
339
340 if (!be_quiet)
341 m_console.print(
342 "switched to session ",
343 (m_active_session_name.empty() ? "default" : m_active_session_name),
344 "\n");
345 }
346
active_holder()347 Session_holder &Connection_manager::active_holder() {
348 if (!m_active_holder) throw std::runtime_error("no active session");
349
350 return *m_active_holder;
351 }
352
active_xsession()353 xcl::XSession *Connection_manager::active_xsession() {
354 if (!m_active_holder) throw std::runtime_error("no active session");
355 return m_active_holder->get_session();
356 }
357
active_xprotocol()358 xcl::XProtocol *Connection_manager::active_xprotocol() {
359 return &active_xsession()->get_protocol();
360 }
361
active_xconnection()362 xcl::XConnection *Connection_manager::active_xconnection() {
363 return &active_xprotocol()->get_connection();
364 }
365
active_session_messages_received(const std::string & message_name) const366 uint64_t Connection_manager::active_session_messages_received(
367 const std::string &message_name) const {
368 uint64_t result = 0;
369 m_active_holder->try_get_number_of_received_messages(message_name, &result);
370
371 return result;
372 }
373
setup_variables(xcl::XSession * session)374 void Connection_manager::setup_variables(xcl::XSession *session) {
375 auto &connection = session->get_protocol().get_connection();
376 m_variables->set(k_variable_active_client_id,
377 std::to_string(session->client_id()));
378
379 m_variables->set(k_variable_active_socket_id,
380 std::to_string(connection.get_socket_fd()));
381 }
382