1 /*
2  * Copyright (c) 2017, 2019, Oracle and/or its affiliates. All rights reserved.
3  *
4  * This program is free software; you can redistribute it and/or modify
5  * it under the terms of the GNU General Public License, version 2.0,
6  * as published by the Free Software Foundation.
7  *
8  * This program is also distributed with certain software (including
9  * but not limited to OpenSSL) that is licensed under separate terms,
10  * as designated in a particular file or component or in included license
11  * documentation.  The authors of MySQL hereby grant you an additional
12  * permission to link the program and your derivative works with the
13  * separately licensed software that they have included with MySQL.
14  *
15  * This program is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  * GNU General Public License, version 2.0, for more details.
19  *
20  * You should have received a copy of the GNU General Public License
21  * along with this program; if not, write to the Free Software
22  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
23  */
24 
25 #include "plugin/x/tests/driver/connector/connection_manager.h"
26 
27 #include <sstream>
28 #include <stdexcept>
29 #include <utility>
30 #include <vector>
31 
32 #include "my_dbug.h"
33 
34 #include "plugin/x/tests/driver/processor/variable_names.h"
35 
36 google::protobuf::LogHandler *g_lh = nullptr;
37 
Connection_manager(const Connection_options & co,Variable_container * variables,const Console & console)38 Connection_manager::Connection_manager(const Connection_options &co,
39                                        Variable_container *variables,
40                                        const Console &console)
41     : m_default_connection_options(co),
42       m_variables(variables),
43       m_console(console) {
44   g_lh = google::protobuf::SetLogHandler([](google::protobuf::LogLevel level,
45                                             const char *filename, int line,
46                                             const std::string &message) {
47     if (g_lh) g_lh(level, filename, line, message);
48     DBUG_LOG("debug",
49              "Protobuf error (level:" << level << ", filename:" << filename
50                                       << ":" << line << ", text:" << message);
51   });
52   m_variables->make_special_variable(
53       k_variable_option_user,
54       new Variable_dynamic_string(m_default_connection_options.user));
55 
56   m_variables->make_special_variable(
57       k_variable_option_pass,
58       new Variable_dynamic_string(m_default_connection_options.password));
59 
60   m_variables->make_special_variable(
61       k_variable_option_host,
62       new Variable_dynamic_string(m_default_connection_options.host));
63 
64   m_variables->make_special_variable(
65       k_variable_option_socket,
66       new Variable_dynamic_string(m_default_connection_options.socket));
67 
68   m_variables->make_special_variable(
69       k_variable_option_schema,
70       new Variable_dynamic_string(m_default_connection_options.schema));
71 
72   m_variables->make_special_variable(
73       k_variable_option_port,
74       new Variable_dynamic_int(m_default_connection_options.port));
75 
76   m_variables->make_special_variable(
77       k_variable_option_ssl_mode,
78       new Variable_dynamic_string(m_default_connection_options.ssl_mode));
79 
80   m_variables->make_special_variable(
81       k_variable_option_ssl_cipher,
82       new Variable_dynamic_string(m_default_connection_options.ssl_cipher));
83 
84   m_variables->make_special_variable(
85       k_variable_option_tls_version,
86       new Variable_dynamic_string(m_default_connection_options.allowed_tls));
87 
88   m_variables->make_special_variable(
89       k_variable_option_compression_algorithm,
90       new Variable_dynamic_array_of_strings(
91           m_default_connection_options.compression_algorithm));
92 
93   m_variables->make_special_variable(
94       k_variable_option_compression_combine_mixed_messages,
95       new Variable_string_readonly(
96           m_default_connection_options.compression_combine_mixed_messages));
97 
98   m_variables->make_special_variable(
99       k_variable_option_compression_max_combine_messages,
100       new Variable_string_readonly(
101           m_default_connection_options.compression_max_combine_messages));
102 
103   const std::string level =
104       m_default_connection_options.compression_level.has_value()
105           ? std::to_string(
106                 m_default_connection_options.compression_level.value())
107           : std::string("DEFAULT");
108 
109   m_variables->make_special_variable(k_variable_option_compression_level,
110                                      new Variable_string_readonly(level));
111 
112   m_active_holder.reset(new Session_holder(xcl::create_session(), m_console,
113                                            m_default_connection_options));
114 
115   m_session_holders[""] = m_active_holder;
116 }
117 
~Connection_manager()118 Connection_manager::~Connection_manager() {
119   std::vector<std::string> disconnect_following;
120 
121   for (const auto &connection : m_session_holders) {
122     if (connection.first != "") {
123       disconnect_following.push_back(connection.first);
124     }
125   }
126 
127   for (auto &name : disconnect_following) {
128     safe_close(name);
129   }
130 
131   if (m_session_holders.count("") > 0) {
132     safe_close("");
133   }
134 }
135 
get_credentials(std::string * ret_user,std::string * ret_pass)136 void Connection_manager::get_credentials(std::string *ret_user,
137                                          std::string *ret_pass) {
138   assert(ret_user);
139   assert(ret_pass);
140 
141   *ret_user = m_default_connection_options.user;
142   *ret_pass = m_default_connection_options.password;
143 }
144 
safe_close(const std::string & name)145 void Connection_manager::safe_close(const std::string &name) {
146   try {
147     set_active(name, true);
148     close_active(true, true);
149   } catch (const std::exception &) {
150   } catch (const xcl::XError &) {
151   }
152 }
153 
connect_default(const bool send_cap_password_expired,const bool client_interactive,const bool no_auth,const bool connect_attrs)154 void Connection_manager::connect_default(const bool send_cap_password_expired,
155                                          const bool client_interactive,
156                                          const bool no_auth,
157                                          const bool connect_attrs) {
158   m_console.print_verbose("Connecting...\n");
159 
160   auto session = m_active_holder->get_session();
161 
162   if (send_cap_password_expired) {
163     session->set_capability(
164         xcl::XSession::Capability_can_handle_expired_password, true);
165   }
166 
167   if (client_interactive) {
168     session->set_capability(xcl::XSession::Capability_client_interactive, true);
169   }
170 
171   if (connect_attrs) {
172     auto attrs = session->get_connect_attrs();
173     attrs.emplace_back("program_name", xcl::Argument_value{"mysqlxtest"});
174     session->set_capability(xcl::XSession::Capability_session_connect_attrs,
175                             attrs, false);
176   }
177 
178   xcl::XError error = m_active_holder->connect(no_auth);
179 
180   if (error) {
181     // In case of configuration error, lets do safe_close to synchronize
182     // closing of socket with exist of mysqlxtest (in other case mysqlxtest
183     // is going to exit and after a while the connection is going to be
184     // accepted on server side).
185     if (CR_X_TLS_WRONG_CONFIGURATION != error.error() || no_auth) {
186       session->get_protocol().get_connection().close();
187     }
188 
189     throw error;
190   }
191 
192   setup_variables(session);
193 
194   m_console.print_verbose("Connected client #", session->client_id(), "\n");
195 }
196 
create(const std::string & name,const std::string & user,const std::string & password,const std::string & db,const std::vector<std::string> & auth_methods,const bool is_raw_connection)197 void Connection_manager::create(const std::string &name,
198                                 const std::string &user,
199                                 const std::string &password,
200                                 const std::string &db,
201                                 const std::vector<std::string> &auth_methods,
202                                 const bool is_raw_connection) {
203   if (m_session_holders.count(name))
204     throw std::runtime_error("a session named " + name + " already exists");
205 
206   m_console.print_verbose("Connecting...\n");
207 
208   Connection_options co = m_default_connection_options;
209 
210   if (!user.empty()) {
211     co.user = user;
212     co.password = password;
213   }
214 
215   if (!db.empty()) {
216     co.schema = db;
217   }
218 
219   if (!auth_methods.empty()) {
220     co.auth_methods = auth_methods;
221   }
222 
223   auto session = xcl::create_session();
224   std::shared_ptr<Session_holder> holder{
225       new Session_holder(std::move(session), m_console, co)};
226 
227   xcl::XError error = holder->connect(is_raw_connection);
228 
229   if (error) {
230     throw error;
231   }
232 
233   m_active_holder = holder;
234   m_session_holders[name] = holder;
235   m_active_session_name = name;
236 
237   setup_variables(active_xsession());
238 
239   m_console.print_verbose("Connected client #", active_xsession()->client_id(),
240                           "\n");
241 }
242 
abort_active()243 void Connection_manager::abort_active() {
244   if (m_active_holder) {
245     if (!m_active_session_name.empty())
246       std::cout << "aborting session " << m_active_session_name << "\n";
247     /* Close connection first, to stop XSession from executing
248      Disconnection flow */
249     active_xconnection()->close();
250     m_active_holder.reset();
251     m_session_holders.erase(m_active_session_name);
252     if (m_active_session_name != "") set_active("");
253   } else {
254     throw std::runtime_error("no active session");
255   }
256 }
257 
is_default_active()258 bool Connection_manager::is_default_active() {
259   return m_active_session_name.empty();
260 }
261 
close_active(const bool shutdown,const bool be_quiet)262 void Connection_manager::close_active(const bool shutdown,
263                                       const bool be_quiet) {
264   if (m_active_holder) {
265     if (m_active_session_name.empty() && !shutdown)
266       throw std::runtime_error("cannot close default session");
267     try {
268       if (!m_active_session_name.empty() && !be_quiet)
269         m_console.print("closing session ", m_active_session_name, "\n");
270 
271       if (active_xconnection()->state().is_connected()) {
272         // send a close message and wait for the corresponding Ok message
273         active_xprotocol()->send(Mysqlx::Connection::Close());
274         xcl::XProtocol::Server_message_type_id msgid;
275         xcl::XError error;
276         Message_ptr msg{
277             active_xprotocol()->recv_single_message(&msgid, &error)};
278 
279         if (error) throw error;
280 
281         if (!be_quiet) m_console.print(*msg);
282         if (Mysqlx::ServerMessages::OK != msgid)
283           throw xcl::XError(CR_COMMANDS_OUT_OF_SYNC,
284                             "Disconnect was expecting Mysqlx.Ok(bye!), but "
285                             "got the one above (one or more calls to -->recv "
286                             "are probably missing)");
287 
288         std::string text = static_cast<Mysqlx::Ok *>(msg.get())->msg();
289         if (text != "bye!" && text != "tchau!")
290           throw xcl::XError(CR_COMMANDS_OUT_OF_SYNC,
291                             "Disconnect was expecting Mysqlx.Ok(bye!), but "
292                             "got the one above (one or more calls to -->recv "
293                             "are probably missing)");
294 
295         if (!m_default_connection_options.dont_wait_for_disconnect) {
296           Message_ptr msg{
297               active_xprotocol()->recv_single_message(&msgid, &error)};
298 
299           if (!error && !be_quiet) {
300             m_console.print_error("Was expecting closure but got message:",
301                                   *msg);
302           }
303         }
304 
305         active_xconnection()->close();
306       }
307       m_session_holders.erase(m_active_session_name);
308       if (!shutdown) set_active("", be_quiet);
309     } catch (const std::exception &error) {
310       active_xconnection()->close();
311       m_session_holders.erase(m_active_session_name);
312       if (!shutdown) set_active("", be_quiet);
313       throw error;
314     } catch (const xcl::XError &error) {
315       active_xconnection()->close();
316       m_session_holders.erase(m_active_session_name);
317       if (!shutdown) set_active("", be_quiet);
318       throw error;
319     }
320   } else {
321     if (!shutdown) throw std::runtime_error("no active session");
322   }
323 }
324 
set_active(const std::string & name,const bool be_quiet)325 void Connection_manager::set_active(const std::string &name,
326                                     const bool be_quiet) {
327   if (m_session_holders.count(name) == 0) {
328     std::string slist;
329     for (auto it = m_session_holders.begin(); it != m_session_holders.end();
330          ++it)
331       slist.append(it->first).append(", ");
332     if (!slist.empty()) slist.resize(slist.length() - 2);
333     throw std::runtime_error("no session named '" + name + "': " + slist);
334   }
335   m_active_holder = m_session_holders[name];
336   m_active_session_name = name;
337 
338   setup_variables(active_xsession());
339 
340   if (!be_quiet)
341     m_console.print(
342         "switched to session ",
343         (m_active_session_name.empty() ? "default" : m_active_session_name),
344         "\n");
345 }
346 
active_holder()347 Session_holder &Connection_manager::active_holder() {
348   if (!m_active_holder) throw std::runtime_error("no active session");
349 
350   return *m_active_holder;
351 }
352 
active_xsession()353 xcl::XSession *Connection_manager::active_xsession() {
354   if (!m_active_holder) throw std::runtime_error("no active session");
355   return m_active_holder->get_session();
356 }
357 
active_xprotocol()358 xcl::XProtocol *Connection_manager::active_xprotocol() {
359   return &active_xsession()->get_protocol();
360 }
361 
active_xconnection()362 xcl::XConnection *Connection_manager::active_xconnection() {
363   return &active_xprotocol()->get_connection();
364 }
365 
active_session_messages_received(const std::string & message_name) const366 uint64_t Connection_manager::active_session_messages_received(
367     const std::string &message_name) const {
368   uint64_t result = 0;
369   m_active_holder->try_get_number_of_received_messages(message_name, &result);
370 
371   return result;
372 }
373 
setup_variables(xcl::XSession * session)374 void Connection_manager::setup_variables(xcl::XSession *session) {
375   auto &connection = session->get_protocol().get_connection();
376   m_variables->set(k_variable_active_client_id,
377                    std::to_string(session->client_id()));
378 
379   m_variables->set(k_variable_active_socket_id,
380                    std::to_string(connection.get_socket_fd()));
381 }
382