1 /*
2  * Copyright (c) 2015, 2021, Oracle and/or its affiliates.
3  *
4  * This program is free software; you can redistribute it and/or modify
5  * it under the terms of the GNU General Public License, version 2.0,
6  * as published by the Free Software Foundation.
7  *
8  * This program is also distributed with certain software (including
9  * but not limited to OpenSSL) that is licensed under separate terms,
10  * as designated in a particular file or component or in included license
11  * documentation.  The authors of MySQL hereby grant you an additional
12  * permission to link the program and your derivative works with the
13  * separately licensed software that they have included with MySQL.
14  *
15  * This program is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  * GNU General Public License, version 2.0, for more details.
19  *
20  * You should have received a copy of the GNU General Public License
21  * along with this program; if not, write to the Free Software
22  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
23  * 02110-1301  USA
24  */
25 
26 
27 #include <rapidjson/document.h>
28 #include <rapidjson/stringbuffer.h>
29 #include <rapidjson/writer.h>
30 #include <string.h>
31 #include <algorithm>
32 #include <cctype>
33 #include <fstream>
34 #include <ios>
35 #include <iostream>
36 #include <iterator>
37 #include <sstream>
38 #include <stdexcept>
39 
40 #include "dummy_stream.h"
41 #include "m_string.h" // needed by writer.h, but has to be included after expr_parser.h
42 #include "my_global.h"
43 #include "mysqlx_error.h"
44 #include "mysqlx_protocol.h"
45 #include "mysqlx_resultset.h"
46 #include "mysqlx_session.h"
47 #include "mysqlx_version.h"
48 #include "ngs_common/bind.h"
49 #include "mysqlxtest_error_names.h"
50 #include "common/utils_string_parsing.h"
51 #include "ngs_common/chrono.h"
52 #include "ngs_common/protocol_const.h"
53 #include "ngs_common/protocol_protobuf.h"
54 #include "ngs_common/to_string.h"
55 #include "utils_mysql_parsing.h"
56 #include "message_formatter.h"
57 #include "violite.h"
58 
59 #ifdef HAVE_SYS_UN_H
60 #include <sys/un.h>
61 #endif
62 
// String/character constants of the test driver; their uses are outside this
// part of the file.
// NOTE(review): presumably the keyword of a "be quiet" command argument -- confirm at the use site.
const char * const CMD_ARG_BE_QUIET = "be-quiet";
// NOTE(review): presumably the version reported by the tool -- confirm at the use site.
const char * const MYSQLXTEST_VERSION = "1.0";
// A single TAB character.
const char CMD_ARG_SEPARATOR = '\t';
66 
67 #include <mysql/service_my_snprintf.h>
68 #include <mysql.h>
69 
// On MSVC, save the current definition of the ERROR macro and undefine it.
// NOTE(review): presumably a Windows header defines ERROR and it clashes with
// identifiers used below -- confirm; the matching pop_macro is not visible here.
#ifdef _MSC_VER
#  pragma push_macro("ERROR")
#  undef ERROR
#endif

// Brings protobuf names (e.g. the unqualified `string` used by ErrorDumper) into scope.
using namespace google::protobuf;
76 
// Protobuf full name -> key used in the *_msgs_by_name maps
// (see message_to_bindump() and text_to_client_message()).
typedef std::map<std::string, std::string> Message_by_full_name;
static Message_by_full_name server_msgs_by_full_name;
static Message_by_full_name client_msgs_by_full_name;

// Message name -> (factory function creating an empty message, message type id).
typedef std::map<std::string, std::pair<mysqlx::Message* (*)(), int8_t> > Message_by_name;
// Callback receiving a single value as a string (used by print_result_set()).
typedef ngs::function<void (std::string)> Value_callback;
static Message_by_name server_msgs_by_name;
static Message_by_name client_msgs_by_name;

// Message type id -> (factory function, string).
// NOTE(review): the string is presumably the message name -- confirm where the map is filled.
typedef std::map<int8_t, std::pair<mysqlx::Message* (*)(), std::string> > Message_by_id;
static Message_by_id server_msgs_by_id;
static Message_by_id client_msgs_by_id;

// Owning pointer to a protocol message.
typedef ngs::unique_ptr<mysqlx::Message> Message_ptr;
91 
// Global option flags. Only the uses visible in this part of the file are
// described; the remaining flags are consumed elsewhere.
bool OPT_quiet = false;
bool OPT_bindump = false;
bool OPT_show_warnings = false;
// When true, Expected_error reports failure (returns false) for an unexpected error.
bool OPT_fatal_errors = true;
// Enables extra progress output such as "Connecting..." and "Execute <command>".
bool OPT_verbose = false;
// When false, get_stream_for_results() returns a Dummy_stream instead of std::cout.
bool OPT_query = true;
#ifndef _WIN32
// When true, error()/eoerr() surround error messages with terminal colour escapes.
bool OPT_color = false;
#endif
// Initial import path: FN_CURLIB followed by FN_LIBCHAR.
// NOTE(review): presumably "./" on POSIX systems -- confirm against the macro definitions.
const char current_dir[] = {FN_CURLIB, FN_LIBCHAR, '\0'};
std::string OPT_import_path(current_dir);
103 
class Expected_error;
// Pointer to the expected-error tracker; null here, assigned outside this part of the file.
static Expected_error *OPT_expect_error = 0;

// One entry of the script stack; error() prints each entry as "in <context>, line <line_number>:".
struct Stack_frame {
  int line_number;
  std::string context;
};
static std::list<Stack_frame> script_stack;

// Script variables: text to search for -> replacement value (see replace_variables()).
static std::map<std::string, std::string> variables;
// Keys of `variables` whose values unreplace_variables() turns back into the key text.
static std::list<std::string> variables_to_unreplace;
115 
/**
  Log sink that throws away everything it receives.

  NOTE(review): presumably installed as the logging callback of a linked
  library so that its traces stay out of the test output -- the registration
  is not visible in this part of the file.

  @param ll      log level (unused)
  @param format  printf-style format string (unused)
  @param args    arguments for the format string (unused)
*/
static void ignore_traces_from_libraries(enum loglevel ll, const char *format, va_list args)
{
  // Deliberately a no-op; the casts only state that the arguments are unused.
  (void)ll;
  (void)format;
  (void)args;
}
119 
get_stream_for_results(const bool force_quiet=false)120 static std::ostream &get_stream_for_results(const bool force_quiet = false)
121 {
122   if (OPT_query && !force_quiet)
123     return std::cout;
124 
125   static Dummy_stream dummy;
126 
127   return dummy;
128 }
129 
replace_variables(std::string & s)130 static void replace_variables(std::string &s)
131 {
132   for (std::map<std::string, std::string>::const_iterator sub = variables.begin();
133       sub != variables.end(); ++sub)
134   {
135     std::string tmp(sub->second);
136 
137     aux::replace_all(tmp, "\"", "\\\"");
138     aux::replace_all(tmp, "\n", "\\n");
139     aux::replace_all(s, sub->first, tmp);
140   }
141 }
142 
unreplace_variables(const std::string & in,bool clear)143 static std::string unreplace_variables(const std::string &in, bool clear)
144 {
145   std::string s = in;
146   for (std::list<std::string>::const_iterator sub = variables_to_unreplace.begin();
147       sub != variables_to_unreplace.end(); ++sub)
148   {
149     aux::replace_all(s, variables[*sub], *sub);
150   }
151   if (clear)
152     variables_to_unreplace.clear();
153   return s;
154 }
155 
error()156 static std::string error()
157 {
158   std::string context;
159 
160   for (std::list<Stack_frame>::const_reverse_iterator it = script_stack.rbegin(); it != script_stack.rend(); ++it)
161   {
162     char tmp[1024];
163     my_snprintf(tmp, sizeof(tmp), "in %s, line %i:", it->context.c_str(), it->line_number);
164     context.append(tmp);
165   }
166 
167 #ifndef _WIN32
168   if (OPT_color)
169     return std::string("\e[1;31m").append(context).append("ERROR: ");
170   else
171 #endif
172     return std::string(context).append("ERROR: ");
173 }
174 
eoerr()175 static std::string eoerr()
176 {
177 #ifndef _WIN32
178   if (OPT_color)
179     return "\e[0m\n";
180   else
181 #endif
182     return "\n";
183 }
184 
dumpx(const std::exception & exc)185 static void dumpx(const std::exception &exc)
186 {
187   std::cerr << error() << exc.what() << eoerr();
188 }
189 
dumpx(const mysqlx::Error & exc)190 static void dumpx(const mysqlx::Error &exc)
191 {
192   std::cerr << error() << exc.what() << " (code " << exc.error() << ")" << eoerr();
193 }
194 
// Forward declarations of the result printing helpers; their definitions are
// outside this part of the file.
static void print_columndata(const std::vector<mysqlx::ColumnMetadata> &meta);
static void print_result_set(mysqlx::Result &result);
// Overload taking a list of column names, an optional per-value callback
// (default: empty callback) and a quiet flag (default: false).
static void print_result_set(mysqlx::Result &result, const std::vector<std::string> &columns,
                             Value_callback value_callback = Value_callback(), bool quiet = false);
199 
200 //---------------------------------------------------------------------------------------------------------
201 
202 class Expected_error
203 {
204 public:
Expected_error()205   Expected_error() {}
206 
expect_errno(int err)207   void expect_errno(int err)
208   {
209     m_expect_errno.insert(err);
210   }
211 
check_error(const mysqlx::Error & err)212   bool check_error(const mysqlx::Error &err)
213   {
214     if (m_expect_errno.empty())
215     {
216       dumpx(err);
217       return !OPT_fatal_errors;
218     }
219 
220     return check(err);
221   }
222 
check_ok()223   bool check_ok()
224   {
225     if (m_expect_errno.empty())
226       return true;
227     return check(mysqlx::Error());
228   }
229 
230 private:
check(const mysqlx::Error & err)231   bool check(const mysqlx::Error &err)
232   {
233     if (m_expect_errno.find(err.error()) == m_expect_errno.end())
234     {
235       print_unexpected_error(err);
236       m_expect_errno.clear();
237       return !OPT_fatal_errors;
238     }
239 
240     print_expected_error(err);
241     m_expect_errno.clear();
242     return true;
243   }
244 
print_unexpected_error(const mysqlx::Error & err)245   void print_unexpected_error(const mysqlx::Error &err)
246   {
247     std::cerr << error() << "Got unexpected error";
248     print_error_msg(std::cerr, err);
249     std::cerr << "; expected was ";
250     if (m_expect_errno.size() > 1)
251       std::cerr << "one of: ";
252     print_expect_errors(std::cerr);
253     std::cerr << "\n";
254   }
255 
print_expected_error(const mysqlx::Error & err)256   void print_expected_error(const mysqlx::Error &err)
257   {
258     std::cout << "Got expected error";
259     if (m_expect_errno.size() == 1)
260       print_error_msg(std::cout, err);
261     else
262     {
263       std::cout << " (one of: ";
264       print_expect_errors(std::cout);
265       std::cout << ")";
266     }
267     std::cout << "\n";
268   }
269 
print_error_msg(std::ostream & os,const mysqlx::Error & err)270   void print_error_msg(std::ostream & os, const mysqlx::Error &err)
271   {
272     if (err.error())
273       os << ": " << err.what();
274     os << " (code " << err.error() << ")";
275   }
276 
print_expect_errors(std::ostream & os)277   void print_expect_errors(std::ostream & os)
278   {
279     std::copy(m_expect_errno.begin(),
280               m_expect_errno.end(),
281               std::ostream_iterator<int>(os, " "));
282   }
283 
284   std::set<int> m_expect_errno;
285 };
286 
287 //---------------------------------------------------------------------------------------------------------
288 
/**
  Plain bundle of the parameters needed to reach and log in to a server.
  All strings start empty and the port starts at 0.
*/
struct Connection_options
{
  Connection_options()
  : socket(), host(), port(0), user(), password(), schema()
  {
  }

  std::string socket;    // when non-empty, Connection_manager connects through this socket
  std::string host;      // used together with `port` when `socket` is empty
  int port;
  std::string user;
  std::string password;
  std::string schema;
};
303 
304 class Connection_manager
305 {
306 public:
Connection_manager(const std::string & uri,const Connection_options & co,const mysqlx::Ssl_config & ssl_config_,const std::size_t timeout_,const bool _dont_wait_for_disconnect,const mysqlx::Internet_protocol ip_mode)307   Connection_manager(const std::string &uri,
308                      const Connection_options &co,
309                      const mysqlx::Ssl_config &ssl_config_,
310                      const std::size_t timeout_,
311                      const bool _dont_wait_for_disconnect,
312                      const mysqlx::Internet_protocol ip_mode)
313   : connection_options(co),
314     ssl_config(ssl_config_),
315     timeout(timeout_),
316     dont_wait_for_disconnect(_dont_wait_for_disconnect),
317     m_ip_mode(ip_mode)
318   {
319     int pwdfound;
320     std::string proto;
321 
322     if (uri.length())
323     {
324       mysqlx::parse_mysql_connstring(uri, proto,
325           connection_options.user,
326           connection_options.password,
327           connection_options.host,
328           connection_options.port,
329           connection_options.socket,
330           connection_options.schema,
331           pwdfound);
332     }
333     variables["%OPTION_CLIENT_USER%"]     = connection_options.user;
334     variables["%OPTION_CLIENT_PASSWORD%"] = connection_options.password;
335     variables["%OPTION_CLIENT_HOST%"]     = connection_options.host;
336     variables["%OPTION_CLIENT_PORT%"]     = connection_options.port;
337     variables["%OPTION_CLIENT_SOCKET%"]   = connection_options.socket;
338     variables["%OPTION_CLIENT_SCHEMA%"]   = connection_options.schema;
339 
340     active_connection.reset(new mysqlx::XProtocol(ssl_config, timeout, dont_wait_for_disconnect, m_ip_mode));
341     connections[""] = active_connection;
342 
343     if (OPT_verbose)
344       std::cout << "Connecting...\n";
345 
346     make_connection(active_connection);
347   }
348 
get_credentials(std::string & ret_user,std::string & ret_pass)349   void get_credentials(std::string &ret_user, std::string &ret_pass)
350   {
351     ret_user = connection_options.user;
352     ret_pass = connection_options.password;
353   }
354 
connect_default(const bool send_cap_password_expired=false,bool use_plain_auth=false)355   void connect_default(const bool send_cap_password_expired = false, bool use_plain_auth = false)
356   {
357     if (send_cap_password_expired)
358       active_connection->setup_capability("client.pwd_expire_ok", true);
359 
360     if (use_plain_auth)
361       active_connection->authenticate_plain(connection_options.user, connection_options.password, connection_options.schema);
362     else
363       active_connection->authenticate(connection_options.user, connection_options.password, connection_options.schema);
364 
365     std::stringstream s;
366     s << active_connection->client_id();
367     variables["%ACTIVE_CLIENT_ID%"] = s.str();
368 
369     if (OPT_verbose)
370       std::cout << "Connected client #" << active_connection->client_id() << "\n";
371   }
372 
create(const std::string & name,const std::string & user,const std::string & password,const std::string & db,bool no_ssl)373   void create(const std::string &name,
374               const std::string &user, const std::string &password, const std::string &db,
375               bool no_ssl)
376   {
377     if (connections.find(name) != connections.end())
378       throw std::runtime_error("a session named "+name+" already exists");
379 
380     std::cout << "connecting...\n";
381 
382     ngs::shared_ptr<mysqlx::XProtocol> connection;
383     mysqlx::Ssl_config                    connection_ssl_config;
384 
385     if (!no_ssl)
386       connection_ssl_config = ssl_config;
387 
388     connection.reset(new mysqlx::XProtocol(connection_ssl_config, timeout, dont_wait_for_disconnect, m_ip_mode));
389 
390     make_connection(connection);
391 
392     if (user != "-")
393     {
394       if (user.empty())
395         connection->authenticate(connection_options.user, connection_options.password, db.empty() ? connection_options.schema : db);
396       else
397         connection->authenticate(user, password, db.empty() ? connection_options.schema : db);
398     }
399 
400     active_connection = connection;
401     active_connection_name = name;
402     connections[name] = active_connection;
403     std::stringstream s;
404     s << active_connection->client_id();
405     variables["%ACTIVE_CLIENT_ID%"] = s.str();
406     std::cout << "active session is now '" << name << "'\n";
407 
408     if (OPT_verbose)
409       std::cout << "Connected client #" << active_connection->client_id() << "\n";
410   }
411 
abort_active()412   void abort_active()
413   {
414     if (active_connection)
415     {
416       if (!active_connection_name.empty())
417         std::cout << "aborting session " << active_connection_name << "\n";
418       active_connection->set_closed();
419       active_connection.reset();
420       connections.erase(active_connection_name);
421       if (active_connection_name != "")
422         set_active("");
423     }
424     else
425       throw std::runtime_error("no active session");
426   }
427 
is_default_active()428   bool is_default_active()
429   {
430     return active_connection_name.empty();
431   }
432 
close_active(bool shutdown=false)433   void close_active(bool shutdown = false)
434   {
435     if (active_connection)
436     {
437       if (active_connection_name.empty() && !shutdown)
438         throw std::runtime_error("cannot close default session");
439       try
440       {
441         if (!active_connection_name.empty())
442           std::cout << "closing session " << active_connection_name << "\n";
443 
444         if (!active_connection->is_closed())
445         {
446           // send a close message and wait for the corresponding Ok message
447           active_connection->send(Mysqlx::Session::Close());
448           active_connection->set_closed();
449           int msgid;
450           Message_ptr msg(active_connection->recv_raw(msgid));
451           std::cout << formatter::message_to_text(*msg);
452           if (Mysqlx::ServerMessages::OK != msgid)
453             throw mysqlx::Error(CR_COMMANDS_OUT_OF_SYNC,
454                                 "Disconnect was expecting Mysqlx.Ok(bye!), but got the one above (one or more calls to -->recv are probably missing)");
455 
456           std::string text = static_cast<Mysqlx::Ok*>(msg.get())->msg();
457           if (text != "bye!" && text != "tchau!")
458             throw mysqlx::Error(CR_COMMANDS_OUT_OF_SYNC,
459                                 "Disconnect was expecting Mysqlx.Ok(bye!), but got the one above (one or more calls to -->recv are probably missing)");
460 
461           if (!dont_wait_for_disconnect)
462           {
463             try
464             {
465               Message_ptr msg(active_connection->recv_raw(msgid));
466 
467               std::cout << formatter::message_to_text(*msg);
468 
469               throw mysqlx::Error(CR_COMMANDS_OUT_OF_SYNC,
470                   "Was expecting closure but got the one above message");
471             }
472             catch (...)
473             {}
474           }
475         }
476         connections.erase(active_connection_name);
477         if (!shutdown)
478           set_active("");
479       }
480       catch (...)
481       {
482         connections.erase(active_connection_name);
483         if (!shutdown)
484           set_active("");
485         throw;
486       }
487     }
488     else if (!shutdown)
489       throw std::runtime_error("no active session");
490   }
491 
set_active(const std::string & name)492   void set_active(const std::string &name)
493   {
494     if (connections.find(name) == connections.end())
495     {
496       std::string slist;
497       for (std::map<std::string, ngs::shared_ptr<mysqlx::XProtocol> >::const_iterator it = connections.begin(); it != connections.end(); ++it)
498         slist.append(it->first).append(", ");
499       if (!slist.empty())
500         slist.resize(slist.length()-2);
501       throw std::runtime_error("no session named '"+name+"': " + slist);
502     }
503     active_connection = connections[name];
504     active_connection_name = name;
505     std::stringstream s;
506     s << active_connection->client_id();
507     variables["%ACTIVE_CLIENT_ID%"] = s.str();
508     std::cout << "switched to session " << (active_connection_name.empty() ? "default" : active_connection_name) << "\n";
509   }
510 
active()511   mysqlx::XProtocol* active()
512   {
513     if (!active_connection)
514       throw std::runtime_error("no active session");
515     return active_connection.get();
516   }
517 
518 private:
make_connection(ngs::shared_ptr<mysqlx::XProtocol> & connection)519   void make_connection(ngs::shared_ptr<mysqlx::XProtocol> &connection)
520   {
521     if (connection_options.socket.empty())
522       connection->connect(connection_options.host, connection_options.port);
523     else
524       connection->connect_to_localhost(connection_options.socket);
525   }
526 
527   std::map<std::string, ngs::shared_ptr<mysqlx::XProtocol> > connections;
528   ngs::shared_ptr<mysqlx::XProtocol> active_connection;
529   std::string active_connection_name;
530   Connection_options connection_options;
531 
532   mysqlx::Ssl_config ssl_config;
533   const std::size_t timeout;
534   const bool dont_wait_for_disconnect;
535   const mysqlx::Internet_protocol m_ip_mode;
536 };
537 
data_to_bindump(const std::string & bindump)538 static std::string data_to_bindump(const std::string &bindump)
539 {
540   std::string res;
541 
542   for (size_t i = 0; i < bindump.length(); i++)
543   {
544     unsigned char ch = bindump[i];
545 
546     if (i >= 5 && ch == '\\')
547     {
548       res.push_back('\\');
549       res.push_back('\\');
550     }
551     else if (i >= 5 && isprint(ch) && !isblank(ch))
552       res.push_back(ch);
553     else
554     {
555       res.append("\\x");
556       res.push_back(aux::ALLOWED_HEX_CHARACTERS[(ch >> 4) & 0xf]);
557       res.push_back(aux::ALLOWED_HEX_CHARACTERS[ch & 0xf]);
558     }
559   }
560 
561   return res;
562 }
563 
bindump_to_data(const std::string & bindump)564 static std::string bindump_to_data(const std::string &bindump)
565 {
566   std::string res;
567   for (size_t i = 0; i < bindump.length(); i++)
568   {
569     if (bindump[i] == '\\')
570     {
571       if (bindump[i+1] == '\\')
572       {
573         res.push_back('\\');
574         ++i;
575       }
576       else if (bindump[i+1] == 'x')
577       {
578         int value = 0;
579         const char *hex = aux::ALLOWED_HEX_CHARACTERS.c_str();
580         const char *p = strchr(hex, bindump[i+2]);
581         if (p)
582           value = (p - hex) << 4;
583         else
584         {
585           std::cerr << error() << "Invalid bindump char at " << i+2 << eoerr();
586           break;
587         }
588         p = strchr(hex, bindump[i+3]);
589         if (p)
590           value |= p - hex;
591         else
592         {
593           std::cerr << error() << "Invalid bindump char at " << i+3 << eoerr();
594           break;
595         }
596         i += 3;
597         res.push_back(value);
598       }
599     }
600     else
601       res.push_back(bindump[i]);
602   }
603   return res;
604 }
605 
message_to_bindump(const mysqlx::Message & message)606 static std::string message_to_bindump(const mysqlx::Message &message)
607 {
608   std::string res;
609   std::string out;
610 
611   message.SerializeToString(&out);
612 
613   res.resize(5);
614   *(uint32_t*)res.data() = static_cast<uint32_t>(out.size() + 1);
615 
616 #ifdef WORDS_BIGENDIAN
617   std::swap(res[0], res[3]);
618   std::swap(res[1], res[2]);
619 #endif
620 
621   res[4] = client_msgs_by_name[client_msgs_by_full_name[message.GetDescriptor()->full_name()]].second;
622   res.append(out);
623 
624   return data_to_bindump(res);
625 }
626 
627 class ErrorDumper : public ::google::protobuf::io::ErrorCollector
628 {
629   std::stringstream m_out;
630 
631 public:
AddError(int line,int column,const string & message)632   virtual void AddError(int line, int column, const string & message)
633   {
634     m_out << "ERROR in message: line " << line+1 << ": column " << column << ": " << message<<"\n";
635   }
636 
AddWarning(int line,int column,const string & message)637   virtual void AddWarning(int line, int column, const string & message)
638   {
639     m_out << "WARNING in message: line " << line+1 << ": column " << column << ": " << message<<"\n";
640   }
641 
str()642   std::string str() { return m_out.str(); }
643 };
644 
text_to_client_message(const std::string & name,const std::string & data,int8_t & msg_id)645 static mysqlx::Message *text_to_client_message(const std::string &name, const std::string &data, int8_t &msg_id)
646 {
647   if (client_msgs_by_full_name.find(name) == client_msgs_by_full_name.end())
648   {
649     std::cerr << error() << "Invalid message type " << name << eoerr();
650     return NULL;
651   }
652 
653   Message_by_name::const_iterator msg = client_msgs_by_name.find(client_msgs_by_full_name[name]);
654   if (msg == client_msgs_by_name.end())
655   {
656     std::cerr << error() << "Invalid message type " << name << eoerr();
657     return NULL;
658   }
659 
660   mysqlx::Message *message = msg->second.first();
661   msg_id = msg->second.second;
662 
663   google::protobuf::TextFormat::Parser parser;
664   ErrorDumper dumper;
665   parser.RecordErrorsTo(&dumper);
666   if (!parser.ParseFromString(data, message))
667   {
668     std::cerr << error() << "Invalid message in input: " << name << eoerr();
669     int i = 1;
670     for (std::string::size_type p = 0, n = data.find('\n', p+1);
671         p != std::string::npos;
672         p = (n == std::string::npos ? n : n+1), n = data.find('\n', p+1), ++i)
673     {
674       std::cerr << i << ": " << data.substr(p, n-p) << "\n";
675     }
676     std::cerr << "\n" << dumper.str();
677     delete message;
678     return NULL;
679   }
680 
681   return message;
682 }
683 
dump_notices(int type,const std::string & data)684 static bool dump_notices(int type, const std::string &data)
685 {
686   if (type == 3)
687   {
688     Mysqlx::Notice::SessionStateChanged change;
689     change.ParseFromString(data);
690     if (!change.IsInitialized())
691       std::cerr << "Invalid notice received from server " << change.InitializationErrorString() << "\n";
692     else
693     {
694       if (change.param() == Mysqlx::Notice::SessionStateChanged::ACCOUNT_EXPIRED)
695       {
696         std::cout << "NOTICE: Account password expired\n";
697         return true;
698       }
699     }
700   }
701   return false;
702 }
703 
704 //-----------------------------------------------------------------------------------
705 
706 class Execution_context
707 {
708 public:
Execution_context(std::istream & stream,Connection_manager * cm)709   Execution_context(std::istream &stream, Connection_manager *cm)
710   : m_stream(stream), m_cm(cm)
711   { }
712 
713   std::string         m_command_name;
714   std::istream       &m_stream;
715   Connection_manager *m_cm;
716 
connection()717   mysqlx::XProtocol *connection() { return m_cm->active(); }
718 };
719 
720 //---------------------------------------------------------------------------------------------------------
721 
722 class Macro
723 {
724 public:
Macro(const std::string & name,const std::list<std::string> & argnames)725   Macro(const std::string &name, const std::list<std::string> &argnames)
726   : m_name(name), m_args(argnames)
727   { }
728 
name() const729   std::string name() const { return m_name; }
730 
set_body(const std::string & body)731   void set_body(const std::string &body)
732   {
733     m_body = body;
734   }
735 
get(const std::list<std::string> & args) const736   std::string get(const std::list<std::string> &args) const
737   {
738     if (args.size() != m_args.size())
739     {
740       std::cerr << error() << "Invalid number of arguments for macro "+m_name << ", expected:" << m_args.size() << " actual:" << args.size() << eoerr();
741       return "";
742     }
743 
744     std::string text = m_body;
745     std::list<std::string>::const_iterator n = m_args.begin(), v = args.begin();
746     for (size_t i = 0; i < args.size(); i++)
747     {
748       aux::replace_all(text, *(n++), *(v++));
749     }
750     return text;
751   }
752 
753 public:
754   static std::list<ngs::shared_ptr<Macro> > macros;
755 
add(ngs::shared_ptr<Macro> macro)756   static void add(ngs::shared_ptr<Macro> macro)
757   {
758     macros.push_back(macro);
759   }
760 
get(const std::string & cmd,std::string & r_name)761   static std::string get(const std::string &cmd, std::string &r_name)
762   {
763     std::list<std::string> args;
764     std::string::size_type p = std::min(cmd.find(' '), cmd.find('\t'));
765     if (p == std::string::npos)
766       r_name = cmd;
767     else
768     {
769       r_name = cmd.substr(0, p);
770       std::string rest = cmd.substr(p+1);
771       aux::split(args, rest, "\t", true);
772     }
773     if (r_name.empty())
774     {
775       std::cerr << error() << "Missing macro name for macro call" << eoerr();
776       return "";
777     }
778 
779     for (std::list<ngs::shared_ptr<Macro> >::const_iterator iter = macros.begin(); iter != macros.end(); ++iter)
780     {
781       if ((*iter)->m_name == r_name)
782       {
783         return (*iter)->get(args);
784       }
785     }
786     std::cerr << error() << "Undefined macro " << r_name << eoerr();
787     return "";
788   }
789 
790   static bool call(Execution_context &context, const std::string &cmd);
791 
792 private:
793   std::string m_name;
794   std::list<std::string> m_args;
795   std::string m_body;
796 };
797 
798 std::list<ngs::shared_ptr<Macro> > Macro::macros;
799 
800 
801 //---------------------------------------------------------------------------------------------------------
802 
803 class Command
804 {
805 public:
806   enum Result {Continue, Stop_with_success, Stop_with_failure};
807 
Command()808   Command()
809   : m_cmd_prefix("-->")
810   {
811     m_commands["title "]      = &Command::cmd_title;
812     m_commands["echo "]       = &Command::cmd_echo;
813     m_commands["recvtype "]   = &Command::cmd_recvtype;
814     m_commands["recverror "]  = &Command::cmd_recverror;
815     m_commands["recvresult"]  = &Command::cmd_recvresult;
816     m_commands["recvtovar "]  = &Command::cmd_recvtovar;
817     m_commands["recvuntil "]  = &Command::cmd_recvuntil;
818     m_commands["recvuntildisc"] = &Command::cmd_recv_all_until_disc;
819     m_commands["enablessl"]   = &Command::cmd_enablessl;
820     m_commands["sleep "]      = &Command::cmd_sleep;
821     m_commands["login "]      = &Command::cmd_login;
822     m_commands["stmtadmin "]  = &Command::cmd_stmtadmin;
823     m_commands["stmtsql "]    = &Command::cmd_stmtsql;
824     m_commands["loginerror "] = &Command::cmd_loginerror;
825     m_commands["repeat "]     = &Command::cmd_repeat;
826     m_commands["endrepeat"]   = &Command::cmd_endrepeat;
827     m_commands["system "]     = &Command::cmd_system;
828     m_commands["peerdisc "]   = &Command::cmd_peerdisc;
829     m_commands["recv"]        = &Command::cmd_recv;
830     m_commands["exit"]        = &Command::cmd_exit;
831     m_commands["abort"]        = &Command::cmd_abort;
832     m_commands["nowarnings"]  = &Command::cmd_nowarnings;
833     m_commands["yeswarnings"] = &Command::cmd_yeswarnings;
834     m_commands["fatalerrors"] = &Command::cmd_fatalerrors;
835     m_commands["nofatalerrors"] = &Command::cmd_nofatalerrors;
836     m_commands["newsession "]  = &Command::cmd_newsession;
837     m_commands["newsessionplain "]  = &Command::cmd_newsessionplain;
838     m_commands["setsession "]  = &Command::cmd_setsession;
839     m_commands["setsession"]  = &Command::cmd_setsession; // for setsession with no args
840     m_commands["closesession"]= &Command::cmd_closesession;
841     m_commands["expecterror "] = &Command::cmd_expecterror;
842     m_commands["measure"]      = &Command::cmd_measure;
843     m_commands["endmeasure "]  = &Command::cmd_endmeasure;
844     m_commands["quiet"]        = &Command::cmd_quiet;
845     m_commands["noquiet"]      = &Command::cmd_noquiet;
846     m_commands["varfile "]     = &Command::cmd_varfile;
847     m_commands["varlet "]      = &Command::cmd_varlet;
848     m_commands["varinc "]      = &Command::cmd_varinc;
849     m_commands["varsub "]      = &Command::cmd_varsub;
850     m_commands["vargen "]      = &Command::cmd_vargen;
851     m_commands["binsend "]     = &Command::cmd_binsend;
852     m_commands["hexsend "]     = &Command::cmd_hexsend;
853     m_commands["binsendoffset "] = &Command::cmd_binsendoffset;
854     m_commands["callmacro "]   = &Command::cmd_callmacro;
855     m_commands["import "]      = &Command::cmd_import;
856     m_commands["assert_eq "]      = &Command::cmd_assert_eq;
857     m_commands["assert_gt "]      = &Command::cmd_assert_gt;
858     m_commands["assert_ge "]      = &Command::cmd_assert_ge;
859     m_commands["query_result"]    = &Command::cmd_query;
860     m_commands["noquery_result"]  = &Command::cmd_noquery;
861     m_commands["wait_for "]       = &Command::cmd_wait_for;
862     m_commands["received "]       = &Command::cmd_received;
863   }
864 
is_command_syntax(const std::string & cmd) const865   bool is_command_syntax(const std::string &cmd) const
866   {
867     return 0 == strncmp(cmd.c_str(), m_cmd_prefix.c_str(), m_cmd_prefix.length());
868   }
869 
process(Execution_context & context,const std::string & command)870   Result process(Execution_context &context, const std::string &command)
871   {
872     if (!is_command_syntax(command))
873       return Stop_with_failure;
874 
875     Command_map::iterator i = std::find_if(m_commands.begin(),
876                                            m_commands.end(),
877                                            ngs::bind(&Command::match_command_name, this, ngs::placeholders::_1, command));
878 
879     if (i == m_commands.end())
880     {
881       std::cerr << "Unknown command " << command << "\n";
882       return Stop_with_failure;
883     }
884 
885     if (OPT_verbose)
886       std::cout << "Execute " << command <<"\n";
887 
888     context.m_command_name = (*i).first;
889 
890     return (*this.*(*i).second)(context, command.c_str() + m_cmd_prefix.length() + (*i).first.length());
891   }
892 
private:
  // Maps a command name (as written after the command prefix) to the member
  // function implementing it.
  typedef std::map< std::string, Result (Command::*)(Execution_context &,const std::string &) > Command_map;
  typedef ::Mysqlx::Datatypes::Any Any;

  // State of one active "repeat" block (see cmd_repeat / cmd_endrepeat).
  struct Loop_do
  {
    std::streampos block_begin;   // stream position cmd_endrepeat seeks back to
    int            iterations;    // iterations left, decremented by cmd_endrepeat
    int            value;         // counter, starts at 0 and is incremented by cmd_endrepeat
    std::string    variable_name; // script variable receiving `value`, empty when unused
  };

  Command_map        m_commands;   // registered command names with their handlers
  std::list<Loop_do> m_loop_stack; // active repeat blocks, the innermost one at the back
  std::string        m_cmd_prefix; // text every command line has to start with
908 
match_command_name(const Command_map::value_type & command,const std::string & instruction)909   bool match_command_name(const Command_map::value_type &command, const std::string &instruction)
910   {
911     if (m_cmd_prefix.length() + command.first.length() > instruction.length())
912       return false;
913 
914     std::string::const_iterator i = std::find(instruction.begin(), instruction.end(), ' ');
915     std::string                 command_name(instruction.begin() + m_cmd_prefix.length(), i);
916 
917     if (0 != command.first.compare(command_name))
918     {
919       if (instruction.end() != i)
920       {
921         ++i;
922         return 0 == command.first.compare(std::string(instruction.begin() + m_cmd_prefix.length(), i));
923       }
924 
925       return false;
926     }
927 
928     return true;
929   }
930 
cmd_echo(Execution_context & context,const std::string & args)931   Result cmd_echo(Execution_context &context, const std::string &args)
932   {
933     std::string s = args;
934     replace_variables(s);
935     std::cout << s << "\n";
936 
937     return Continue;
938   }
939 
cmd_title(Execution_context & context,const std::string & args)940   Result cmd_title(Execution_context &context, const std::string &args)
941   {
942     if (!args.empty())
943     {
944       std::cout << "\n" << args.substr(1) << "\n";
945       std::string sep(args.length()-1, args[0]);
946       std::cout << sep << "\n";
947     }
948     else
949       std::cout << "\n\n";
950 
951     return Continue;
952   }
953 
cmd_recvtype(Execution_context & context,const std::string & args)954   Result cmd_recvtype(Execution_context &context, const std::string &args)
955   {
956     std::vector<std::string> vargs;
957     aux::split(vargs, args, " ", true);
958 
959     if (1 != vargs.size() &&
960         2 != vargs.size())
961     {
962       std::stringstream error_message;
963       error_message << "Received wrong number of arguments, got:"
964                     << vargs.size();
965       throw std::logic_error(error_message.str());
966     }
967 
968     bool be_quiet = false;
969     int msgid;
970     Message_ptr msg(context.connection()->recv_raw(msgid));
971 
972     if (1 < vargs.size())
973     {
974       if (vargs[1] == CMD_ARG_BE_QUIET)
975         be_quiet = true;
976     }
977 
978     if (NULL == msg.get())
979       return OPT_fatal_errors ? Stop_with_failure : Continue;
980 
981     try
982     {
983       const std::string message_in_text = unreplace_variables(formatter::message_to_text(*msg), true);
984 
985       if (msg->GetDescriptor()->full_name() != vargs[0])
986       {
987         std::cout << "Received unexpected message. Was expecting:\n    " << vargs[0] << "\nbut got:\n";
988         std::cout << message_in_text << "\n";
989 
990         return OPT_fatal_errors ? Stop_with_failure : Continue;
991       }
992 
993       std::ostream &out = get_stream_for_results(be_quiet);
994 
995       out << message_in_text << "\n";
996     }
997     catch (std::exception &e)
998     {
999       dumpx(e);
1000       if (OPT_fatal_errors)
1001         return Stop_with_success;
1002     }
1003 
1004     return Continue;
1005   }
1006 
cmd_recverror(Execution_context & context,const std::string & args)1007   Result cmd_recverror(Execution_context &context, const std::string &args)
1008   {
1009     int msgid;
1010     Message_ptr msg(context.connection()->recv_raw(msgid));
1011 
1012     if (msg.get())
1013     {
1014       bool failed = false;
1015       try
1016       {
1017         const int expected_error_code = mysqlxtest::get_error_code_by_text(args);
1018         if (msg->GetDescriptor()->full_name() != "Mysqlx.Error" ||
1019             expected_error_code != (int)static_cast<Mysqlx::Error*>(msg.get())->code())
1020         {
1021           std::cout << error() << "Was expecting Error " << args <<", but got:" << eoerr();
1022           failed = true;
1023         }
1024         else
1025         {
1026           std::cout << "Got expected error:\n";
1027         }
1028 
1029         std::cout << formatter::message_to_text(*msg) << "\n";
1030         if (failed && OPT_fatal_errors)
1031           return Stop_with_success;
1032       }
1033       catch (std::exception &e)
1034       {
1035         dumpx(e);
1036         if (OPT_fatal_errors)
1037           return Stop_with_success;
1038       }
1039     }
1040 
1041     return Continue;
1042   }
1043 
set_variable(std::string name,std::string value)1044   static void set_variable(std::string name, std::string value)
1045   {
1046     variables[name] = value;
1047   }
1048 
cmd_recvtovar(Execution_context & context,const std::string & args)1049   Result cmd_recvtovar(Execution_context &context, const std::string &args)
1050   {
1051     std::string args_cmd = args;
1052     std::vector<std::string> args_array;
1053     aux::trim(args_cmd);
1054 
1055     aux::split(args_array, args_cmd, " ", false);
1056 
1057     args_cmd = CMD_ARG_BE_QUIET;
1058 
1059     if (args_array.size() > 1)
1060     {
1061       args_cmd += " ";
1062       args_cmd += args_array.at(1);
1063     }
1064 
1065     cmd_recvresult(context, args_cmd, ngs::bind(&Command::set_variable, args_array.at(0), ngs::placeholders::_1));
1066 
1067     return Continue;
1068   }
1069 
cmd_recvresult(Execution_context & context,const std::string & args)1070   Result cmd_recvresult(Execution_context &context, const std::string &args)
1071   {
1072     return cmd_recvresult(context, args, Value_callback());
1073   }
1074 
cmd_recvresult(Execution_context & context,const std::string & args,Value_callback value_callback)1075   Result cmd_recvresult(Execution_context &context, const std::string &args, Value_callback value_callback)
1076   {
1077     ngs::shared_ptr<mysqlx::Result> result;
1078     try
1079     {
1080       std::vector<std::string> columns;
1081       std::string cmd_args = args;
1082 
1083       aux::trim(cmd_args);
1084 
1085       if (cmd_args.size())
1086         aux::split(columns, cmd_args, " ", false);
1087 
1088       std::vector<std::string>::iterator i = std::find(columns.begin(), columns.end(), "print-columnsinfo");
1089       const bool print_colinfo = i != columns.end();
1090       if (print_colinfo) columns.erase(i);
1091 
1092       i = std::find(columns.begin(), columns.end(), CMD_ARG_BE_QUIET);
1093       const bool quiet = i != columns.end();
1094       if (quiet) columns.erase(i);
1095 
1096       std::ostream &out = get_stream_for_results(quiet);
1097 
1098       result = context.connection()->recv_result();
1099       print_result_set(*result, columns, value_callback, quiet);
1100 
1101       if (print_colinfo)
1102         print_columndata(*result->columnMetadata());
1103 
1104       variables_to_unreplace.clear();
1105       int64_t x = result->affectedRows();
1106       if (x >= 0)
1107         out << x << " rows affected\n";
1108       else
1109         out << "command ok\n";
1110       if (result->lastInsertId() > 0)
1111         out << "last insert id: " << result->lastInsertId() << "\n";
1112       if (!result->infoMessage().empty())
1113         out << result->infoMessage() << "\n";
1114       {
1115         std::vector<mysqlx::Result::Warning> warnings(result->getWarnings());
1116         if (!warnings.empty())
1117           out << "Warnings generated:\n";
1118         for (std::vector<mysqlx::Result::Warning>::const_iterator w = warnings.begin();
1119             w != warnings.end(); ++w)
1120         {
1121           out << (w->is_note ? "NOTE" : "WARNING") << " | " << w->code << " | " << w->text << "\n";
1122         }
1123       }
1124 
1125       if (!OPT_expect_error->check_ok())
1126         return Stop_with_failure;
1127     }
1128     catch (mysqlx::Error &err)
1129     {
1130       if (result.get())
1131         result->mark_error();
1132       if (!OPT_expect_error->check_error(err))
1133         return Stop_with_failure;
1134     }
1135     return Continue;
1136   }
1137 
cmd_recvuntil(Execution_context & context,const std::string & args)1138   Result cmd_recvuntil(Execution_context &context, const std::string &args)
1139   {
1140     int msgid;
1141 
1142     std::vector<std::string> argl;
1143 
1144     aux::split(argl, args, " ", true);
1145 
1146     bool show = true, stop = false;
1147 
1148     if (argl.size() > 1)
1149     {
1150       const char *argument_do_not_print = argl[1].c_str();
1151       show = false;
1152 
1153       if (0 != strcmp(argument_do_not_print, "do_not_show_intermediate"))
1154       {
1155         std::cout << "Invalid argument received: " << argl[1] << "\n";
1156         return Stop_with_failure;
1157       }
1158     }
1159 
1160     Message_by_full_name::iterator iterator_msg_name = server_msgs_by_full_name.find(argl[0]);
1161 
1162     if (server_msgs_by_full_name.end() == iterator_msg_name)
1163     {
1164       std::cout << "Unknown message name: " << argl[0] << " " << server_msgs_by_full_name.size() << "\n";
1165       return Stop_with_failure;
1166     }
1167 
1168     Message_by_name::iterator iterator_msg_id = server_msgs_by_name.find(iterator_msg_name->second);
1169 
1170     if (server_msgs_by_name.end() == iterator_msg_id)
1171     {
1172       std::cout << "Invalid data in internal message list, entry not found:" << iterator_msg_name->second << "\n";
1173       return Stop_with_failure;
1174     }
1175 
1176     const int expected_msg_id = iterator_msg_id->second.second;
1177 
1178     do
1179     {
1180       Message_ptr msg(context.connection()->recv_raw(msgid));
1181 
1182       if (msg.get())
1183       {
1184         if (msg->GetDescriptor()->full_name() == argl[0] ||
1185             msgid == Mysqlx::ServerMessages::ERROR)
1186         {
1187           show = true;
1188           stop = true;
1189         }
1190 
1191         try
1192         {
1193           if (show)
1194             std::cout << formatter::message_to_text(*msg) << "\n";
1195         }
1196         catch (std::exception &e)
1197         {
1198           dumpx(e);
1199           if (OPT_fatal_errors)
1200             return Stop_with_success;
1201         }
1202       }
1203     }
1204     while (!stop);
1205 
1206     variables_to_unreplace.clear();
1207 
1208     if (Mysqlx::ServerMessages::ERROR == msgid &&
1209         Mysqlx::ServerMessages::ERROR != expected_msg_id)
1210       return Stop_with_failure;
1211 
1212     return Continue;
1213   }
1214 
cmd_enablessl(Execution_context & context,const std::string & args)1215   Result cmd_enablessl(Execution_context &context, const std::string &args)
1216   {
1217     try
1218     {
1219       context.connection()->enable_tls();
1220     }
1221     catch (const mysqlx::Error &err)
1222     {
1223       dumpx(err);
1224       return Stop_with_failure;
1225     }
1226 
1227     return Continue;
1228   }
1229 
cmd_stmtsql(Execution_context & context,const std::string & args)1230   Result cmd_stmtsql(Execution_context &context, const std::string &args)
1231   {
1232     Mysqlx::Sql::StmtExecute stmt;
1233 
1234     std::string command = args;
1235     replace_variables(command);
1236 
1237     stmt.set_stmt(command);
1238     stmt.set_namespace_("sql");
1239 
1240     context.connection()->send(stmt);
1241 
1242     if (!OPT_quiet)
1243       std::cout << "RUN " << command << "\n";
1244 
1245     return Continue;
1246   }
1247 
1248 
cmd_stmtadmin(Execution_context & context,const std::string & args)1249   Result cmd_stmtadmin(Execution_context &context, const std::string &args)
1250   {
1251     std::string tmp = args;
1252     replace_variables(tmp);
1253     std::vector<std::string> params;
1254     aux::split(params, tmp, "\t", true);
1255     if (params.empty())
1256     {
1257       std::cerr << "Invalid empty admin command\n";
1258       return Stop_with_failure;
1259     }
1260 
1261     aux::trim(params[0]);
1262 
1263     Mysqlx::Sql::StmtExecute stmt;
1264     stmt.set_stmt(params[0]);
1265     stmt.set_namespace_("mysqlx");
1266 
1267     if (params.size() == 2)
1268     {
1269       Any obj;
1270       if (!json_string_to_any(params[1], obj))
1271       {
1272         std::cerr << "Invalid argument for '" << params[0] << "' command; json object expected\n";
1273         return Stop_with_failure;
1274       }
1275       stmt.add_args()->CopyFrom(obj);
1276     }
1277 
1278     context.connection()->send(stmt);
1279 
1280     return Continue;
1281   }
1282 
1283 
  // Converts `json_string` into `any`; only declared here. cmd_stmtadmin
  // treats a false result as "json object expected" failure.
  bool json_string_to_any(const std::string &json_string, Any &any) const;
1285 
1286 
cmd_sleep(Execution_context & context,const std::string & args)1287   Result cmd_sleep(Execution_context &context, const std::string &args)
1288   {
1289     std::string tmp = args;
1290     replace_variables(tmp);
1291     const double delay_in_seconds = ngs::stod(tmp);
1292 #ifdef _WIN32
1293     const int delay_in_miliseconds = delay_in_seconds * 1000;
1294     Sleep(delay_in_miliseconds);
1295 #else
1296     const int delay_in_ultraseconds = delay_in_seconds * 1000000;
1297     usleep(delay_in_ultraseconds);
1298 #endif
1299     return Continue;
1300   }
1301 
cmd_login(Execution_context & context,const std::string & args)1302   Result cmd_login(Execution_context &context, const std::string &args)
1303   {
1304     std::string user, pass, db, auth_meth;
1305 
1306     if (args.empty())
1307       context.m_cm->get_credentials(user, pass);
1308     else
1309     {
1310       std::string s = args;
1311       replace_variables(s);
1312 
1313       std::string::size_type p = s.find(CMD_ARG_SEPARATOR);
1314       if (p != std::string::npos)
1315       {
1316         user = s.substr(0, p);
1317         s = s.substr(p+1);
1318         p = s.find(CMD_ARG_SEPARATOR);
1319         if (p != std::string::npos)
1320         {
1321           pass = s.substr(0, p);
1322           s = s.substr(p+1);
1323           p = s.find(CMD_ARG_SEPARATOR);
1324           if (p != std::string::npos)
1325           {
1326             db = s.substr(0, p);
1327             auth_meth = s.substr(p+1);
1328           }
1329           else
1330             db = s;
1331         }
1332         else
1333           pass = s;
1334       }
1335       else
1336         user = s;
1337     }
1338 
1339     void (mysqlx::XProtocol::*method)(const std::string &, const std::string &, const std::string &);
1340 
1341     method = &mysqlx::XProtocol::authenticate_mysql41;
1342 
1343     try
1344     {
1345       context.connection()->push_local_notice_handler(ngs::bind(dump_notices, ngs::placeholders::_1, ngs::placeholders::_2));
1346       //XXX
1347       // Prepered for method map
1348       if (0 == strncmp(auth_meth.c_str(), "plain", 5))
1349       {
1350         method = &mysqlx::XProtocol::authenticate_plain;
1351       }
1352       else if ( !(0 == strncmp(auth_meth.c_str(), "mysql41", 5) || 0 == auth_meth.length()))
1353         throw mysqlx::Error(CR_UNKNOWN_ERROR, "Wrong authentication method");
1354 
1355       (context.connection()->*method)(user, pass, db);
1356 
1357       context.connection()->pop_local_notice_handler();
1358 
1359       std::cout << "Login OK\n";
1360     }
1361     catch (mysqlx::Error &err)
1362     {
1363       context.connection()->pop_local_notice_handler();
1364       if (!OPT_expect_error->check_error(err))
1365         return Stop_with_failure;
1366     }
1367 
1368     return Continue;
1369   }
1370 
cmd_repeat(Execution_context & context,const std::string & args)1371   Result cmd_repeat(Execution_context &context, const std::string &args)
1372   {
1373     std::string variable_name = "";
1374     std::vector<std::string> argl;
1375 
1376     aux::split(argl, args, "\t", true);
1377 
1378     if (argl.size() > 1)
1379     {
1380       variable_name = argl[1];
1381     }
1382 
1383     // Allow use of variables as a source of number of iterations
1384     replace_variables(argl[0]);
1385 
1386     Loop_do loop = {context.m_stream.tellg(), ngs::stoi(argl[0]), 0, variable_name};
1387 
1388     m_loop_stack.push_back(loop);
1389 
1390     if (variable_name.length())
1391       variables[variable_name] = ngs::to_string(loop.value);
1392 
1393     return Continue;
1394   }
1395 
cmd_endrepeat(Execution_context & context,const std::string & args)1396   Result cmd_endrepeat(Execution_context &context, const std::string &args)
1397   {
1398     while (m_loop_stack.size())
1399     {
1400       Loop_do &ld = m_loop_stack.back();
1401 
1402       --ld.iterations;
1403       ++ld.value;
1404 
1405       if (ld.variable_name.length())
1406         variables[ld.variable_name] = ngs::to_string(ld.value);
1407 
1408       if (1 > ld.iterations)
1409       {
1410         m_loop_stack.pop_back();
1411         break;
1412       }
1413 
1414       context.m_stream.seekg(ld.block_begin);
1415       break;
1416     }
1417 
1418     return Continue;
1419   }
1420 
cmd_loginerror(Execution_context & context,const std::string & args)1421   Result cmd_loginerror(Execution_context &context, const std::string &args)
1422   {
1423     std::string s = args;
1424     std::string expected, user, pass, db;
1425     int expected_error_code = 0;
1426 
1427     replace_variables(s);
1428     std::string::size_type p = s.find('\t');
1429     if (p != std::string::npos)
1430     {
1431       expected = s.substr(0, p);
1432       s = s.substr(p+1);
1433       p = s.find('\t');
1434       if (p != std::string::npos)
1435       {
1436         user = s.substr(0, p);
1437         s = s.substr(p+1);
1438         p = s.find('\t');
1439         if (p != std::string::npos)
1440         {
1441           pass = s.substr(0, p+1);
1442           db = s.substr(p+1);
1443         }
1444         else
1445           pass = s;
1446       }
1447       else
1448         user = s;
1449     }
1450     else
1451     {
1452       std::cout << error() << "Missing arguments to -->loginerror" << eoerr();
1453       return Stop_with_failure;
1454     }
1455 
1456     try
1457     {
1458       replace_variables(expected);
1459       aux::trim(expected);
1460       expected_error_code = mysqlxtest::get_error_code_by_text(expected);
1461       context.connection()->push_local_notice_handler(ngs::bind(dump_notices, ngs::placeholders::_1, ngs::placeholders::_2));
1462 
1463       context.connection()->authenticate_mysql41(user, pass, db);
1464 
1465       context.connection()->pop_local_notice_handler();
1466 
1467       std::cout << error() << "Login succeeded, but an error was expected" << eoerr();
1468       if (OPT_fatal_errors)
1469         return Stop_with_failure;
1470     }
1471     catch (const std::exception &e)
1472     {
1473       std::cerr << e.what() << "\n";
1474 
1475       return Stop_with_failure;
1476     }
1477     catch (mysqlx::Error &err)
1478     {
1479       context.connection()->pop_local_notice_handler();
1480 
1481       if (err.error() == expected_error_code)
1482         std::cerr << "error (as expected): " << err.what() << " (code " << err.error() << ")\n";
1483       else
1484       {
1485         std::cerr << error() << "was expecting: " << expected_error_code << " but got: " << err.what() << " (code " << err.error() << ")" << eoerr();
1486         if (OPT_fatal_errors)
1487           return Stop_with_failure;
1488       }
1489     }
1490 
1491     return Continue;
1492   }
1493 
cmd_system(Execution_context & context,const std::string & args)1494   Result cmd_system(Execution_context &context, const std::string &args)
1495   {
1496     // command used only at dev level
1497     // example of usage
1498     // -->system (sleep 3; echo "Killing"; ps aux | grep mysqld | egrep -v "gdb .+mysqld" | grep -v  "kdeinit4"| awk '{print($2)}' | xargs kill -s SIGQUIT)&
1499     if (0 == system(args.c_str()))
1500       return Continue;
1501 
1502     return Stop_with_failure;
1503   }
1504 
cmd_recv_all_until_disc(Execution_context & context,const std::string & args)1505   Result cmd_recv_all_until_disc(Execution_context &context, const std::string &args)
1506   {
1507     int msgid;
1508     try
1509     {
1510       while(true)
1511       {
1512         Message_ptr msg(context.connection()->recv_raw(msgid));
1513 
1514         //TODO:
1515         // For now this command will be used in places where random messages
1516         // can reach mysqlxtest in different mtr rans
1517         // the random behavior of server in such cases should be fixed
1518         //if (msg.get())
1519         //  std::cout << unreplace_variables(message_to_text(*msg), true) << "\n";
1520       }
1521     }
1522     catch (mysqlx::Error&)
1523     {
1524       std::cerr << "Server disconnected\n";
1525     }
1526 
1527     if (context.m_cm->is_default_active())
1528       return Stop_with_success;
1529 
1530     context.m_cm->active()->set_closed();
1531     context.m_cm->close_active(false);
1532 
1533     return Continue;
1534   }
1535 
cmd_peerdisc(Execution_context & context,const std::string & args)1536   Result cmd_peerdisc(Execution_context &context, const std::string &args)
1537   {
1538     int expected_delta_time;
1539     int tolerance;
1540     int result = sscanf(args.c_str(),"%i %i", &expected_delta_time, &tolerance);
1541 
1542     if (result <1 || result > 2)
1543     {
1544       std::cerr << "ERROR: Invalid use of command\n";
1545 
1546       return Stop_with_failure;
1547     }
1548 
1549     if (1 == result)
1550     {
1551       tolerance = 10 * expected_delta_time / 100;
1552     }
1553 
1554     ngs::chrono::time_point start_time = ngs::chrono::now();
1555     try
1556     {
1557       int msgid;
1558 
1559       Message_ptr msg(context.connection()->recv_raw_with_deadline(msgid, 2 * expected_delta_time));
1560 
1561       if (msg.get())
1562       {
1563         std::cerr << "ERROR: Received unexpected message.\n";
1564         std::cerr << formatter::message_to_text(*msg) << "\n";
1565       }
1566       else
1567       {
1568         std::cerr << "ERROR: Timeout occur while waiting for disconnection.\n";
1569       }
1570 
1571       return Stop_with_failure;
1572     }
1573     catch (const mysqlx::Error &ec)
1574     {
1575       if (CR_SERVER_GONE_ERROR != ec.error())
1576       {
1577         dumpx(ec);
1578         return Stop_with_failure;
1579       }
1580     }
1581 
1582     int execution_delta_time = ngs::chrono::to_milliseconds(ngs::chrono::now() - start_time);
1583 
1584     if (abs(execution_delta_time - expected_delta_time) > tolerance)
1585     {
1586       std::cerr << "ERROR: Peer disconnected after: "<< execution_delta_time << "[ms], expected: " << expected_delta_time << "[ms]\n";
1587       return Stop_with_failure;
1588     }
1589 
1590     context.m_cm->active()->set_closed();
1591 
1592     if (context.m_cm->is_default_active())
1593       return Stop_with_success;
1594 
1595     context.m_cm->close_active(false);
1596 
1597     return Continue;
1598   }
1599 
cmd_recv(Execution_context & context,const std::string & args)1600   Result cmd_recv(Execution_context &context, const std::string &args)
1601   {
1602     int msgid;
1603     bool quiet = false;
1604     std::string args_copy(args);
1605 
1606     aux::trim(args_copy);
1607     if (args_copy == "quiet") {
1608       quiet = true;
1609       args_copy = "";
1610     }
1611 
1612     try
1613     {
1614       Message_ptr msg(context.connection()->recv_raw(msgid));
1615 
1616       std::ostream &out = get_stream_for_results(quiet);
1617 
1618       if (msg.get())
1619         out << unreplace_variables(formatter::message_to_text(*msg, args_copy), true) << "\n";
1620       if (!OPT_expect_error->check_ok())
1621         return Stop_with_failure;
1622     }
1623     catch (mysqlx::Error &e)
1624     {
1625       if (!quiet && !OPT_expect_error->check_error(e)) //TODO do we need this !quiet ?
1626         return Stop_with_failure;
1627     }
1628     catch (std::exception &e)
1629     {
1630       std::cerr << "ERROR: "<< e.what()<<"\n";
1631       if (OPT_fatal_errors)
1632         return Stop_with_failure;
1633     }
1634     return Continue;
1635   }
1636 
cmd_exit(Execution_context & context,const std::string & args)1637   Result cmd_exit(Execution_context &context, const std::string &args)
1638   {
1639     return Stop_with_success;
1640   }
1641 
cmd_abort(Execution_context & context,const std::string & args)1642   Result cmd_abort(Execution_context &context, const std::string &args)
1643   {
1644     exit(2);
1645     return Stop_with_success;
1646   }
1647 
cmd_nowarnings(Execution_context & context,const std::string & args)1648   Result cmd_nowarnings(Execution_context &context, const std::string &args)
1649   {
1650     OPT_show_warnings = false;
1651     return Continue;
1652   }
1653 
cmd_yeswarnings(Execution_context & context,const std::string & args)1654   Result cmd_yeswarnings(Execution_context &context, const std::string &args)
1655   {
1656     OPT_show_warnings = true;
1657     return Continue;
1658   }
1659 
cmd_fatalerrors(Execution_context & context,const std::string & args)1660   Result cmd_fatalerrors(Execution_context &context, const std::string &args)
1661   {
1662     OPT_fatal_errors = true;
1663     return Continue;
1664   }
1665 
cmd_nofatalerrors(Execution_context & context,const std::string & args)1666   Result cmd_nofatalerrors(Execution_context &context, const std::string &args)
1667   {
1668     OPT_fatal_errors = false;
1669     return Continue;
1670   }
1671 
cmd_newsessionplain(Execution_context & context,const std::string & args)1672   Result cmd_newsessionplain(Execution_context &context, const std::string &args)
1673   {
1674     return do_newsession(context, args, true);
1675   }
1676 
cmd_newsession(Execution_context & context,const std::string & args)1677   Result cmd_newsession(Execution_context &context, const std::string &args)
1678   {
1679     return do_newsession(context, args, false);
1680   }
1681 
do_newsession(Execution_context & context,const std::string & args,bool plain)1682   Result do_newsession(Execution_context &context, const std::string &args, bool plain)
1683   {
1684     std::string s = args;
1685     std::string user, pass, db, name;
1686 
1687     replace_variables(s);
1688 
1689     std::string::size_type p = s.find(CMD_ARG_SEPARATOR);
1690 
1691     if (p != std::string::npos)
1692     {
1693       name = s.substr(0, p);
1694       s = s.substr(p+1);
1695       p = s.find(CMD_ARG_SEPARATOR);
1696       if (p != std::string::npos)
1697       {
1698         user = s.substr(0, p);
1699         s = s.substr(p+1);
1700         p = s.find(CMD_ARG_SEPARATOR);
1701         if (p != std::string::npos)
1702         {
1703           pass = s.substr(0, p);
1704           db = s.substr(p+1);
1705         }
1706         else
1707           pass = s;
1708       }
1709       else
1710         user = s;
1711     }
1712     else
1713       name = s;
1714 
1715     try
1716     {
1717       context.m_cm->create(name, user, pass, db, plain);
1718       if (!OPT_expect_error->check_ok())
1719         return Stop_with_failure;
1720     }
1721     catch (mysqlx::Error &err)
1722     {
1723       if (!OPT_expect_error->check_error(err))
1724         return Stop_with_failure;
1725     }
1726 
1727     return Continue;
1728   }
1729 
cmd_setsession(Execution_context & context,const std::string & args)1730   Result cmd_setsession(Execution_context &context, const std::string &args)
1731   {
1732     std::string s = args;
1733 
1734     replace_variables(s);
1735 
1736     if (!s.empty() && (s[0] == ' ' || s[0] == '\t'))
1737       context.m_cm->set_active(s.substr(1));
1738     else
1739       context.m_cm->set_active(s);
1740     return Continue;
1741   }
1742 
cmd_closesession(Execution_context & context,const std::string & args)1743   Result cmd_closesession(Execution_context &context, const std::string &args)
1744   {
1745     try
1746     {
1747       if (args == " abort")
1748         context.m_cm->abort_active();
1749       else
1750         context.m_cm->close_active();
1751       if (!OPT_expect_error->check_ok())
1752         return Stop_with_failure;
1753     }
1754     catch (mysqlx::Error &err)
1755     {
1756       if (!OPT_expect_error->check_error(err))
1757         return Stop_with_failure;
1758     }
1759     return Continue;
1760   }
1761 
cmd_expecterror(Execution_context & context,const std::string & args)1762   Result cmd_expecterror(Execution_context &context, const std::string &args)
1763   {
1764     try
1765     {
1766       if (args.empty())
1767         throw std::logic_error("expecterror requires an errno argument");
1768 
1769       std::vector<std::string> argl;
1770       aux::split(argl, args, ",", true);
1771       for (std::vector<std::string>::const_iterator arg = argl.begin(); arg != argl.end(); ++arg)
1772       {
1773         std::string value = *arg;
1774 
1775         replace_variables(value);
1776         aux::trim(value);
1777 
1778         const int error_code = mysqlxtest::get_error_code_by_text(value);
1779 
1780         OPT_expect_error->expect_errno(error_code);
1781       }
1782     }
1783     catch(const std::exception &e)
1784     {
1785       std::cerr << e.what() << "\n";
1786 
1787       return Stop_with_failure;
1788     }
1789 
1790     return Continue;
1791   }
1792 
1793 
  // Start of the current time measurement: set by cmd_measure, checked and
  // reset by cmd_endmeasure.
  static ngs::chrono::time_point m_start_measure;
1795 
cmd_measure(Execution_context & context,const std::string & args)1796   Result cmd_measure(Execution_context &context, const std::string &args)
1797   {
1798     m_start_measure = ngs::chrono::now();
1799     return Continue;
1800   }
1801 
cmd_endmeasure(Execution_context & context,const std::string & args)1802   Result cmd_endmeasure(Execution_context &context, const std::string &args)
1803   {
1804     if (!ngs::chrono::is_valid(m_start_measure))
1805     {
1806       std::cerr << "Time measurement, wasn't initialized\n";
1807       return Stop_with_failure;
1808     }
1809 
1810     std::vector<std::string> argl;
1811     aux::split(argl, args, " ", true);
1812     if (argl.size() != 2 && argl.size() != 1)
1813     {
1814       std::cerr << "Invalid number of arguments for command endmeasure\n";
1815       return Stop_with_failure;
1816     }
1817 
1818     const int64_t expected_msec = ngs::stoi(argl[0]);
1819     const int64_t msec = ngs::chrono::to_milliseconds(ngs::chrono::now() - m_start_measure);
1820 
1821     int64_t tolerance = expected_msec * 10 / 100;
1822 
1823     if (2 == argl.size())
1824       tolerance = ngs::stoi(argl[1]);
1825 
1826     if (abs(expected_msec - msec) > tolerance)
1827     {
1828       std::cerr << "Timeout should occur after " << expected_msec << "ms, but it was " << msec <<"ms.  \n";
1829       return Stop_with_failure;
1830     }
1831 
1832     m_start_measure = ngs::chrono::time_point();
1833     return Continue;
1834   }
1835 
cmd_quiet(Execution_context & context,const std::string & args)1836   Result cmd_quiet(Execution_context &context, const std::string &args)
1837   {
1838     OPT_quiet = true;
1839 
1840     return Continue;
1841   }
1842 
cmd_noquiet(Execution_context & context,const std::string & args)1843   Result cmd_noquiet(Execution_context &context, const std::string &args)
1844   {
1845     OPT_quiet = false;
1846 
1847     return Continue;
1848   }
1849 
cmd_varsub(Execution_context & context,const std::string & args)1850   Result cmd_varsub(Execution_context &context, const std::string &args)
1851   {
1852     variables_to_unreplace.push_back(args);
1853     return Continue;
1854   }
1855 
cmd_varlet(Execution_context & context,const std::string & args)1856   Result cmd_varlet(Execution_context &context, const std::string &args)
1857   {
1858     std::string::size_type p = args.find(' ');
1859     if (p == std::string::npos)
1860     {
1861       variables[args] = "";
1862     }
1863     else
1864     {
1865       std::string value = args.substr(p+1);
1866       replace_variables(value);
1867       variables[args.substr(0, p)] = value;
1868     }
1869     return Continue;
1870   }
1871 
cmd_varinc(Execution_context & context,const std::string & args)1872   Result cmd_varinc(Execution_context &context, const std::string &args)
1873   {
1874     std::vector<std::string> argl;
1875     aux::split(argl, args, " ", true);
1876     if (argl.size() != 2)
1877     {
1878       std::cerr << "Invalid number of arguments for command varinc\n";
1879       return Stop_with_failure;
1880     }
1881 
1882     if (variables.find(argl[0]) == variables.end())
1883     {
1884       std::cerr << "Invalid variable " << argl[0] << "\n";
1885       return Stop_with_failure;
1886     }
1887 
1888     std::string val = variables[argl[0]];
1889     char* c;
1890     std::string inc_by = argl[1].c_str();
1891 
1892     replace_variables(inc_by);
1893 
1894     long int_val = strtol(val.c_str(), &c, 10);
1895     long int_n = strtol(inc_by.c_str(), &c, 10);
1896     int_val += int_n;
1897     val = ngs::to_string(int_val);
1898     variables[argl[0]] = val;
1899 
1900     return Continue;
1901   }
1902 
cmd_vargen(Execution_context & context,const std::string & args)1903   Result cmd_vargen(Execution_context &context, const std::string &args)
1904   {
1905     std::vector<std::string> argl;
1906     aux::split(argl, args, " ", true);
1907     if (argl.size() != 3)
1908     {
1909       std::cerr << "Invalid number of arguments for command vargen\n";
1910       return Stop_with_failure;
1911     }
1912     std::string data(ngs::stoi(argl[2]), *argl[1].c_str());
1913     variables[argl[0]] = data;
1914     return Continue;
1915   }
1916 
cmd_varfile(Execution_context & context,const std::string & args)1917   Result cmd_varfile(Execution_context &context, const std::string &args)
1918   {
1919     std::vector<std::string> argl;
1920     aux::split(argl, args, " ", true);
1921     if (argl.size() != 2)
1922     {
1923       std::cerr << "Invalid number of arguments for command varfile " << args << "\n";
1924       return Stop_with_failure;
1925     }
1926 
1927     std::string path_to_file = argl[1];
1928     replace_variables(path_to_file);
1929 
1930     std::ifstream file(path_to_file.c_str());
1931     if (!file.is_open())
1932     {
1933       std::cerr << "Couldn't not open file " << path_to_file <<"\n";
1934       return Stop_with_failure;
1935     }
1936 
1937     file.seekg(0, file.end);
1938     size_t len = file.tellg();
1939     file.seekg(0);
1940 
1941     char *buffer = new char[len];
1942     file.read(buffer, len);
1943     variables[argl[0]] = std::string(buffer, len);
1944     delete []buffer;
1945 
1946     return Continue;
1947   }
1948 
cmd_binsend(Execution_context & context,const std::string & args)1949   Result cmd_binsend(Execution_context &context, const std::string &args)
1950   {
1951     std::string args_copy = args;
1952     replace_variables(args_copy);
1953     std::string data = bindump_to_data(args_copy);
1954 
1955     std::cout << "Sending " << data.length() << " bytes raw data...\n";
1956     context.m_cm->active()->send_bytes(data);
1957     return Continue;
1958   }
1959 
cmd_hexsend(Execution_context & context,const std::string & args)1960   Result cmd_hexsend(Execution_context &context, const std::string &args)
1961   {
1962     std::string args_copy = args;
1963     replace_variables(args_copy);
1964 
1965     if (0 == args_copy.length())
1966     {
1967       std::cerr << "Data should not be present\n";
1968       return Stop_with_failure;
1969     }
1970 
1971     if (0 != args_copy.length() % 2)
1972     {
1973       std::cerr << "Size of data should be a multiplication of two, current length:" << args_copy.length()<<"\n";
1974       return Stop_with_failure;
1975     }
1976 
1977     std::string data;
1978     try
1979     {
1980       aux::unhex(args_copy, data);
1981     }
1982     catch(const std::exception&)
1983     {
1984       std::cerr << "Hex string is invalid\n";
1985       return Stop_with_failure;
1986     }
1987 
1988     std::cout << "Sending " << data.length() << " bytes raw data...\n";
1989     context.m_cm->active()->send_bytes(data);
1990     return Continue;
1991   }
1992 
value_to_offset(const std::string & data,const size_t maximum_value)1993   size_t value_to_offset(const std::string &data, const size_t maximum_value)
1994   {
1995     if ('%' == *data.rbegin())
1996     {
1997       size_t percent = ngs::stoi(data);
1998 
1999       return maximum_value * percent / 100;
2000     }
2001 
2002     return ngs::stoi(data);
2003   }
2004 
cmd_binsendoffset(Execution_context & context,const std::string & args)2005   Result cmd_binsendoffset(Execution_context &context, const std::string &args)
2006   {
2007     std::string args_copy = args;
2008     replace_variables(args_copy);
2009 
2010     std::vector<std::string> argl;
2011     aux::split(argl, args_copy, " ", true);
2012 
2013     size_t begin_bin = 0;
2014     size_t end_bin = 0;
2015     std::string data;
2016 
2017     try
2018     {
2019       data = bindump_to_data(argl[0]);
2020       end_bin = data.length();
2021 
2022       if (argl.size() > 1)
2023       {
2024         begin_bin = value_to_offset(argl[1], data.length());
2025         if (argl.size() > 2)
2026         {
2027           end_bin = value_to_offset(argl[2], data.length());
2028 
2029           if (argl.size() > 3)
2030             throw std::out_of_range("Too many arguments");
2031         }
2032       }
2033     }
2034     catch (const std::out_of_range&)
2035     {
2036       std::cerr << "Invalid number of arguments for command binsendoffset:" << argl.size() << "\n";
2037       return Stop_with_failure;
2038     }
2039 
2040     std::cout << "Sending " << end_bin << " bytes raw data...\n";
2041     context.m_cm->active()->send_bytes(data.substr(begin_bin, end_bin - begin_bin));
2042     return Continue;
2043   }
2044 
cmd_callmacro(Execution_context & context,const std::string & args)2045   Result cmd_callmacro(Execution_context &context, const std::string &args)
2046   {
2047     if (Macro::call(context, args))
2048       return Continue;
2049     return Stop_with_failure;
2050   }
2051 
cmd_assert_eq(Execution_context & context,const std::string & args)2052   Result cmd_assert_eq(Execution_context &context, const std::string &args)
2053   {
2054     std::vector<std::string> vargs;
2055 
2056     aux::split(vargs, args, "\t", true);
2057 
2058     if (2 != vargs.size())
2059     {
2060       std::cerr << "Specified invalid number of arguments for command assert_eq:" << vargs.size() << " expecting 2\n";
2061       return Stop_with_failure;
2062     }
2063 
2064     replace_variables(vargs[0]);
2065     replace_variables(vargs[1]);
2066 
2067     if (vargs[0] != vargs[1])
2068     {
2069       std::cerr << "Expecting '" << vargs[0] << "', but received '" << vargs[1] << "'\n";
2070       return Stop_with_failure;
2071     }
2072 
2073     return Continue;
2074   }
2075 
cmd_assert_gt(Execution_context & context,const std::string & args)2076   Result cmd_assert_gt(Execution_context &context, const std::string &args)
2077   {
2078     std::vector<std::string> vargs;
2079 
2080     aux::split(vargs, args, "\t", true);
2081 
2082     if (2 != vargs.size())
2083     {
2084       std::cerr << "Specified invalid number of arguments for command assert_gt:" << vargs.size() << " expecting 2\n";
2085       return Stop_with_failure;
2086     }
2087 
2088     replace_variables(vargs[0]);
2089     replace_variables(vargs[1]);
2090 
2091     if (ngs::stoi(vargs[0]) <= ngs::stoi(vargs[1]))
2092     {
2093       std::cerr << "Expecting '" << vargs[0] << "' to be greater than '" << vargs[1] << "'\n";
2094       return Stop_with_failure;
2095     }
2096 
2097     return Continue;
2098   }
2099 
cmd_assert_ge(Execution_context & context,const std::string & args)2100   Result cmd_assert_ge(Execution_context &context, const std::string &args)
2101   {
2102     std::vector<std::string> vargs;
2103     char *end_string = NULL;
2104 
2105     aux::split(vargs, args, "\t", true);
2106 
2107     if (2 != vargs.size())
2108     {
2109       std::cerr << "Specified invalid number of arguments for command assert_gt:" << vargs.size() << " expecting 2\n";
2110       return Stop_with_failure;
2111     }
2112 
2113     replace_variables(vargs[0]);
2114     replace_variables(vargs[1]);
2115 
2116     if (strtoll(vargs[0].c_str(), &end_string, 10) < strtoll(vargs[1].c_str(), &end_string, 10))
2117     {
2118       std::cerr << "assert_gt(" << args << ") failed!\n";
2119       std::cerr << "Expecting '" << vargs[0] << "' to be greater or equal to '" << vargs[1] << "'\n";
2120       return Stop_with_failure;
2121     }
2122 
2123     return Continue;
2124   }
2125 
cmd_query(Execution_context & context,const std::string & args)2126   Result cmd_query(Execution_context &context, const std::string &args)
2127   {
2128     OPT_query = true;
2129     return Continue;
2130   }
2131 
cmd_noquery(Execution_context & context,const std::string & args)2132   Result cmd_noquery(Execution_context &context, const std::string &args)
2133   {
2134     OPT_query = false;
2135     return Continue;
2136   }
2137 
put_variable_to(std::string & result,const std::string & value)2138   static void put_variable_to(std::string &result, const std::string &value)
2139   {
2140     result = value;
2141   }
2142 
try_result(Result result)2143   static void try_result(Result result)
2144   {
2145     if (result != Continue)
2146       throw result;
2147   }
2148 
2149   template <typename T>
2150   class Backup_and_restore
2151   {
2152   public:
Backup_and_restore(T & variable,const T & temporaru_value)2153     Backup_and_restore(T &variable, const T &temporaru_value)
2154     : m_variable(variable), m_value(variable)
2155     {
2156       m_variable = temporaru_value;
2157     }
2158 
~Backup_and_restore()2159     ~Backup_and_restore()
2160     {
2161       m_variable = m_value;
2162     }
2163 
2164   private:
2165     T &m_variable;
2166     T m_value;
2167   };
2168 
  // Repeatedly run a SQL statement until it yields an expected value.
  //
  // args holds two tab-separated fields (after replace_variables()): the
  // expected value and the statement to execute. The statement is tried up
  // to countdown_start_value (30) times.
  //
  // Returns Continue on a match, the failing Result when one of the nested
  // commands does not return Continue, and Stop_with_failure when the
  // argument count is wrong or no attempt produced the expected value.
  Result cmd_wait_for(Execution_context &context, const std::string &args)
  {
    bool match = false;
    const int countdown_start_value = 30;
    int  countdown_retries = countdown_start_value;

    std::string args_variables_replaced = args;
    std::vector<std::string> vargs;

    replace_variables(args_variables_replaced);
    aux::split(vargs, args_variables_replaced, "\t", true);

    if (2 != vargs.size())
    {
      std::cerr << "Specified invalid number of arguments for command wait_for:" << vargs.size() << " expecting 2\n";
      return Stop_with_failure;
    }

    const std::string &expected_value = vargs[0];
    std::string value;

    try
    {
      do
      {
        // Per-iteration overrides: each guard puts the previous setting back
        // when the iteration ends, including when try_result() throws.
        Backup_and_restore<bool>        backup_and_restore_fatal_errors(OPT_fatal_errors, true);
        Backup_and_restore<bool>        backup_and_restore_query(OPT_query, false);
        Backup_and_restore<std::string> backup_and_restore_command_name(context.m_command_name, "sql");

        // try_result() throws the Result of any step that is not Continue.
        try_result(cmd_stmtsql(context, vargs[1]));
        // The bound put_variable_to callback copies the received value into `value`.
        try_result(cmd_recvresult(context, "", ngs::bind(&Command::put_variable_to, ngs::ref(value), ngs::placeholders::_1)));
        // The pause is executed after every attempt, before the comparison.
        try_result(cmd_sleep(context,"1"));

        match = (value == expected_value);
      }
      while(!match && --countdown_retries);
    }
    catch(const Result result)
    {
      std::cerr << "'Wait_for' failed because one of subsequent commands failed\n";
      return  result;
    }

    if (!match)
    {
      std::cerr << "Query didn't return expected value, tried " << countdown_start_value << " times\n";
      std::cerr << "Expected '" << expected_value << "', received '" << value << "'\n";
      return Stop_with_failure;
    }

    return Continue;
  }
2221 
  // Declaration only; the definition is not part of the class body.
  Result cmd_import(Execution_context &context, const std::string &args);
2223 
cmd_received(Execution_context & context,const std::string & args)2224   Result cmd_received(Execution_context &context, const std::string &args)
2225   {
2226     std::string cargs(args);
2227     std::vector<std::string> vargs;
2228     aux::split(vargs, cargs, " \t", true);
2229     replace_variables(vargs[0]);
2230 
2231     if (2 != vargs.size())
2232     {
2233       std::cerr << "Specified invalid number of arguments for command received:"
2234                 << vargs.size() << " expecting 2\n";
2235       return Stop_with_failure;
2236     }
2237 
2238     set_variable(vargs[1],
2239                  ngs::to_string(
2240                      context.connection()->get_received_msg_counter(vargs[0])));
2241     return Continue;
2242   }
2243 };
2244 
// Out-of-class definition providing storage for the static member.
ngs::chrono::time_point Command::m_start_measure;
2246 
process_client_message(mysqlx::XProtocol * connection,int8_t msg_id,const mysqlx::Message & msg)2247 static int process_client_message(mysqlx::XProtocol *connection, int8_t msg_id, const mysqlx::Message &msg)
2248 {
2249   if (!OPT_quiet)
2250     std::cout << "send " << formatter::message_to_text(msg) << "\n";
2251 
2252   if (OPT_bindump)
2253     std::cout << message_to_bindump(msg) << "\n";
2254 
2255   try
2256   {
2257     // send request
2258     connection->send(msg_id, msg);
2259 
2260     if (!OPT_expect_error->check_ok())
2261       return 1;
2262   }
2263   catch (mysqlx::Error &err)
2264   {
2265     if (!OPT_expect_error->check_error(err))
2266       return 1;
2267   }
2268   return 0;
2269 }
2270 
print_result_set(mysqlx::Result & result)2271 static void print_result_set(mysqlx::Result &result)
2272 {
2273   std::vector<std::string> empty_column_array_print_all;
2274 
2275   print_result_set(result, empty_column_array_print_all);
2276 }
2277 
// Render any value that can be streamed with operator<< as a string.
template<typename T>
std::string get_object_value(const T &value)
{
  std::ostringstream text_stream;

  text_stream << value;
  return text_stream.str();
}
2286 
get_field_value(ngs::shared_ptr<mysqlx::Row> & row,const int field,ngs::shared_ptr<std::vector<mysqlx::ColumnMetadata>> & meta)2287 std::string get_field_value(ngs::shared_ptr<mysqlx::Row> &row, const int field, ngs::shared_ptr<std::vector<mysqlx::ColumnMetadata> > &meta)
2288 {
2289   if (row->isNullField(field))
2290   {
2291     return "null";
2292   }
2293 
2294   try
2295   {
2296     const mysqlx::ColumnMetadata &col(meta->at(field));
2297 
2298     switch (col.type)
2299     {
2300     case mysqlx::SINT:
2301       return ngs::to_string(row->sInt64Field(field));
2302 
2303     case mysqlx::UINT:
2304       return ngs::to_string(row->uInt64Field(field));
2305 
2306     case mysqlx::DOUBLE:
2307       if (col.fractional_digits < 31)
2308       {
2309         char buffer[100];
2310         my_fcvt(row->doubleField(field), col.fractional_digits, buffer, NULL);
2311         return buffer;
2312       }
2313       return ngs::to_string(row->doubleField(field));
2314 
2315     case mysqlx::FLOAT:
2316       if (col.fractional_digits < 31)
2317       {
2318         char buffer[100];
2319         my_fcvt(row->floatField(field), col.fractional_digits, buffer, NULL);
2320         return buffer;
2321       }
2322       return ngs::to_string(row->floatField(field));
2323 
2324     case mysqlx::BYTES:
2325     {
2326       std::string tmp(row->stringField(field));
2327       return unreplace_variables(tmp, false);
2328     }
2329 
2330     case mysqlx::TIME:
2331       return get_object_value(row->timeField(field));
2332 
2333     case mysqlx::DATETIME:
2334       return get_object_value(row->dateTimeField(field));
2335 
2336     case mysqlx::DECIMAL:
2337       return row->decimalField(field);
2338 
2339     case mysqlx::SET:
2340       return row->setFieldStr(field);
2341 
2342     case mysqlx::ENUM:
2343       return row->enumField(field);
2344 
2345     case mysqlx::BIT:
2346       return get_object_value(row->bitField(field));
2347     }
2348   }
2349   catch (std::exception &e)
2350   {
2351     std::cout << "ERROR: " << e.what() << "\n";
2352   }
2353 
2354   return "";
2355 }
2356 
2357 
2358 namespace
2359 {
2360 
get_typename(const mysqlx::FieldType & field)2361 inline std::string get_typename(const mysqlx::FieldType& field)
2362 {
2363   switch (field)
2364   {
2365   case mysqlx::SINT:
2366     return "SINT";
2367   case mysqlx::UINT:
2368     return "UINT";
2369   case mysqlx::DOUBLE:
2370     return "DOUBLE";
2371   case mysqlx::FLOAT:
2372     return "FLOAT";
2373   case mysqlx::BYTES:
2374     return "BYTES";
2375   case mysqlx::TIME:
2376     return "TIME";
2377   case mysqlx::DATETIME:
2378     return "DATETIME";
2379   case mysqlx::SET:
2380     return "SET";
2381   case mysqlx::ENUM:
2382     return "ENUM";
2383   case mysqlx::BIT:
2384     return "BIT";
2385   case mysqlx::DECIMAL:
2386     return "DECIMAL";
2387   }
2388   return "UNKNOWN";
2389 }
2390 
2391 
get_flags(const mysqlx::FieldType & field,uint32_t flags)2392 inline std::string get_flags(const mysqlx::FieldType& field, uint32_t flags)
2393 {
2394   std::string r;
2395 
2396   if (flags & MYSQLX_COLUMN_FLAGS_UINT_ZEROFILL) // and other equal 1
2397   {
2398     switch (field)
2399     {
2400     case mysqlx::SINT:
2401     case mysqlx::UINT:
2402       r += " ZEROFILL";
2403       break;
2404 
2405     case mysqlx::DOUBLE:
2406     case mysqlx::FLOAT:
2407     case mysqlx::DECIMAL:
2408       r += " UNSIGNED";
2409       break;
2410 
2411     case mysqlx::BYTES:
2412       r += " RIGHTPAD";
2413       break;
2414 
2415     case mysqlx::DATETIME:
2416       r += " TIMESTAMP";
2417       break;
2418 
2419     default:
2420       ;
2421     }
2422   }
2423   if (flags & MYSQLX_COLUMN_FLAGS_NOT_NULL)
2424     r += " NOT_NULL";
2425 
2426   if (flags & MYSQLX_COLUMN_FLAGS_PRIMARY_KEY)
2427     r += " PRIMARY_KEY";
2428 
2429   if (flags & MYSQLX_COLUMN_FLAGS_UNIQUE_KEY)
2430     r += " UNIQUE_KEY";
2431 
2432   if (flags & MYSQLX_COLUMN_FLAGS_MULTIPLE_KEY)
2433     r += " MULTIPLE_KEY";
2434 
2435   if (flags & MYSQLX_COLUMN_FLAGS_AUTO_INCREMENT)
2436     r += " AUTO_INCREMENT";
2437 
2438   return r;
2439 }
2440 
2441 
2442 } // namespace
2443 
2444 
print_columndata(const std::vector<mysqlx::ColumnMetadata> & meta)2445 static void print_columndata(const std::vector<mysqlx::ColumnMetadata> &meta)
2446 {
2447   for (std::vector<mysqlx::ColumnMetadata>::const_iterator col = meta.begin(); col != meta.end(); ++col)
2448   {
2449     std::cout << col->name << ":" << get_typename(col->type) << ':'
2450               << get_flags(col->type, col->flags) << '\n';
2451   }
2452 }
2453 
print_result_set(mysqlx::Result & result,const std::vector<std::string> & columns,Value_callback value_callback,bool quiet)2454 static void print_result_set(mysqlx::Result &result, const std::vector<std::string> &columns,
2455                              Value_callback value_callback, bool quiet)
2456 {
2457   ngs::shared_ptr<std::vector<mysqlx::ColumnMetadata> > meta(result.columnMetadata());
2458   std::vector<int> column_indexes;
2459   int column_index = -1;
2460   bool first = true;
2461 
2462   std::ostream &out = get_stream_for_results(quiet);
2463 
2464   for (std::vector<mysqlx::ColumnMetadata>::const_iterator col = meta->begin();
2465       col != meta->end(); ++col)
2466   {
2467     ++column_index;
2468 
2469     if (!first)
2470       out << "\t";
2471     else
2472       first = false;
2473 
2474     if (!columns.empty() && columns.end() == std::find(columns.begin(), columns.end(), col->name))
2475       continue;
2476 
2477     column_indexes.push_back(column_index);
2478     out << col->name;
2479   }
2480   out << "\n";
2481 
2482   for (;;)
2483   {
2484     ngs::shared_ptr<mysqlx::Row> row(result.next());
2485     if (!row.get())
2486       break;
2487 
2488     std::vector<int>::iterator i = column_indexes.begin();
2489     for (; i != column_indexes.end() && (*i) < row->numFields(); ++i)
2490     {
2491       int field = (*i);
2492       if (field != 0)
2493         out << "\t";
2494 
2495       std::string result = get_field_value(row, field, meta);
2496 
2497       if (value_callback)
2498       {
2499         value_callback(result);
2500         Value_callback().swap(value_callback);
2501       }
2502       out << result;
2503     }
2504     out << "\n";
2505   }
2506 }
2507 
run_sql_batch(mysqlx::XProtocol * conn,const std::string & sql_)2508 static int run_sql_batch(mysqlx::XProtocol *conn, const std::string &sql_)
2509 {
2510   std::string delimiter = ";";
2511   std::vector<std::pair<size_t, size_t> > ranges;
2512   std::stack<std::string> input_context_stack;
2513   std::string sql = sql_;
2514 
2515   replace_variables(sql);
2516 
2517   shcore::mysql::splitter::determineStatementRanges(sql.data(), sql.length(), delimiter,
2518                                                     ranges, "\n", input_context_stack);
2519 
2520   for (std::vector<std::pair<size_t, size_t> >::const_iterator st = ranges.begin(); st != ranges.end(); ++st)
2521   {
2522     try
2523     {
2524       if (!OPT_quiet)
2525         std::cout << "RUN " << sql.substr(st->first, st->second) << "\n";
2526       ngs::shared_ptr<mysqlx::Result> result(conn->execute_sql(sql.substr(st->first, st->second)));
2527       if (result.get())
2528       {
2529         do
2530         {
2531           print_result_set(*result.get());
2532         } while (result->nextDataSet());
2533 
2534         int64_t x = result->affectedRows();
2535         if (x >= 0)
2536           std::cout << x << " rows affected\n";
2537         if (result->lastInsertId() > 0)
2538           std::cout << "last insert id: " << result->lastInsertId() << "\n";
2539         if (!result->infoMessage().empty())
2540           std::cout << result->infoMessage() << "\n";
2541 
2542         if (OPT_show_warnings)
2543         {
2544           std::vector<mysqlx::Result::Warning> warnings(result->getWarnings());
2545           if (!warnings.empty())
2546             std::cout << "Warnings generated:\n";
2547           for (std::vector<mysqlx::Result::Warning>::const_iterator w = warnings.begin();
2548               w != warnings.end(); ++w)
2549           {
2550             std::cout << (w->is_note ? "NOTE" : "WARNING") << " | " << w->code << " | " << w->text << "\n";
2551           }
2552         }
2553       }
2554     }
2555     catch (mysqlx::Error &err)
2556     {
2557       variables_to_unreplace.clear();
2558 
2559       std::cerr << "While executing " << sql.substr(st->first, st->second) << ":\n";
2560       if (!OPT_expect_error->check_error(err))
2561         return 1;
2562     }
2563   }
2564   variables_to_unreplace.clear();
2565   return 0;
2566 }
2567 
// Outcome of offering one input line to a Block_processor.
enum Block_result
{
  // The processor is inside a multi-line block and wants the following lines.
  Block_result_feed_more,
  // The line was consumed and the processor needs no further lines.
  Block_result_eated_but_not_hungry,
  // The processor did not recognize the line.
  Block_result_not_hungry,
  // Processing of the line or block failed.
  Block_result_indigestion,
  // Returned by Single_command_processor when a command yields
  // Command::Stop_with_success.
  Block_result_everyone_not_hungry
};
2576 
2577 class Block_processor
2578 {
2579 public:
~Block_processor()2580   virtual ~Block_processor() {}
2581 
2582   virtual Block_result feed(std::istream &input, const char *linebuf) = 0;
feed_ended_is_state_ok()2583   virtual bool feed_ended_is_state_ok() { return true; }
2584 };
2585 
// Shared-ownership handle to a Block_processor.
typedef ngs::shared_ptr<Block_processor> Block_processor_ptr;
2587 
2588 class Sql_block_processor : public Block_processor
2589 {
2590 public:
Sql_block_processor(Connection_manager * cm)2591   Sql_block_processor(Connection_manager *cm)
2592   : m_cm(cm), m_sql(false)
2593   { }
2594 
feed(std::istream & input,const char * linebuf)2595   virtual Block_result feed(std::istream &input, const char *linebuf)
2596   {
2597     if (m_sql)
2598     {
2599       if (strcmp(linebuf, "-->endsql") == 0)
2600       {
2601         {
2602           int r = run_sql_batch(m_cm->active(), m_rawbuffer);
2603           if (r != 0)
2604           {
2605             return Block_result_indigestion;
2606           }
2607         }
2608         m_sql = false;
2609 
2610         return Block_result_eated_but_not_hungry;
2611       }
2612       else
2613         m_rawbuffer.append(linebuf).append("\n");
2614 
2615       return Block_result_feed_more;
2616     }
2617 
2618     // -->command
2619     if (strcmp(linebuf, "-->sql") == 0)
2620     {
2621       m_rawbuffer.clear();
2622       m_sql = true;
2623       // feed everything until -->endraw to the mysql client
2624 
2625       return Block_result_feed_more;
2626     }
2627 
2628     return Block_result_not_hungry;
2629   }
2630 
feed_ended_is_state_ok()2631   virtual bool feed_ended_is_state_ok()
2632   {
2633     if (m_sql)
2634     {
2635       std::cerr << error() << "Unclosed -->sql directive" << eoerr();
2636       return false;
2637     }
2638 
2639     return true;
2640   }
2641 
2642 private:
2643   Connection_manager *m_cm;
2644   std::string m_rawbuffer;
2645   bool m_sql;
2646 };
2647 
2648 class Macro_block_processor : public Block_processor
2649 {
2650 public:
Macro_block_processor(Connection_manager * cm)2651   Macro_block_processor(Connection_manager *cm)
2652   : m_cm(cm)
2653   { }
2654 
~Macro_block_processor()2655   ~Macro_block_processor()
2656   {
2657   }
2658 
feed(std::istream & input,const char * linebuf)2659   virtual Block_result feed(std::istream &input, const char *linebuf)
2660   {
2661     if (m_macro)
2662     {
2663       if (strcmp(linebuf, "-->endmacro") == 0)
2664       {
2665         m_macro->set_body(m_rawbuffer);
2666 
2667         Macro::add(m_macro);
2668         if (OPT_verbose)
2669           std::cout << "Macro " << m_macro->name() << " defined\n";
2670 
2671         m_macro.reset();
2672 
2673         return Block_result_eated_but_not_hungry;
2674       }
2675       else
2676         m_rawbuffer.append(linebuf).append("\n");
2677 
2678       return Block_result_feed_more;
2679     }
2680 
2681     // -->command
2682     const char *cmd = "-->macro ";
2683     if (strncmp(linebuf, cmd, strlen(cmd)) == 0)
2684     {
2685       std::list<std::string> args;
2686       std::string t(linebuf+strlen(cmd));
2687       aux::split(args, t, " \t", true);
2688 
2689       if (args.empty())
2690       {
2691         std::cerr << error() << "Missing macro name argument for -->macro" << eoerr();
2692         return Block_result_indigestion;
2693       }
2694 
2695       m_rawbuffer.clear();
2696       std::string name = args.front();
2697       args.pop_front();
2698       m_macro.reset(new Macro(name, args));
2699 
2700       return Block_result_feed_more;
2701     }
2702 
2703     return Block_result_not_hungry;
2704   }
2705 
feed_ended_is_state_ok()2706   virtual bool feed_ended_is_state_ok()
2707   {
2708     if (m_macro)
2709     {
2710       std::cerr << error() << "Unclosed -->macro directive" << eoerr();
2711       return false;
2712     }
2713 
2714     return true;
2715   }
2716 
2717 private:
2718   Connection_manager *m_cm;
2719   ngs::shared_ptr<Macro> m_macro;
2720   std::string m_rawbuffer;
2721 };
2722 
2723 class Single_command_processor: public Block_processor
2724 {
2725 public:
Single_command_processor(Connection_manager * cm)2726   Single_command_processor(Connection_manager *cm)
2727   : m_cm(cm)
2728   { }
2729 
feed(std::istream & input,const char * linebuf)2730   virtual Block_result feed(std::istream &input, const char *linebuf)
2731   {
2732     Execution_context context(input, m_cm);
2733 
2734     if (m_command.is_command_syntax(linebuf))
2735     {
2736       {
2737         Command::Result r = m_command.process(context, linebuf);
2738         if (Command::Stop_with_failure == r)
2739           return Block_result_indigestion;
2740         else if (Command::Stop_with_success == r)
2741           return Block_result_everyone_not_hungry;
2742       }
2743 
2744       return Block_result_eated_but_not_hungry;
2745     }
2746     // # comment
2747     else if (linebuf[0] == '#' || linebuf[0] == 0)
2748     {
2749       return Block_result_eated_but_not_hungry;
2750     }
2751 
2752     return Block_result_not_hungry;
2753   }
2754 
2755 private:
2756   Command m_command;
2757   Connection_manager *m_cm;
2758 };
2759 
2760 class Snd_message_block_processor: public Block_processor
2761 {
2762 public:
Snd_message_block_processor(Connection_manager * cm)2763   Snd_message_block_processor(Connection_manager *cm)
2764   : m_cm(cm)
2765   { }
2766 
feed(std::istream & input,const char * linebuf)2767   virtual Block_result feed(std::istream &input, const char *linebuf)
2768   {
2769     if (m_full_name.empty())
2770     {
2771       if (!(m_full_name = get_message_name(linebuf)).empty())
2772       {
2773         m_buffer.clear();
2774         return Block_result_feed_more;
2775       }
2776     }
2777     else
2778     {
2779       if (linebuf[0] == '}')
2780       {
2781         int8_t msg_id = 0;
2782         std::string processed_buffer = m_buffer;
2783         replace_variables(processed_buffer);
2784 
2785         Message_ptr msg(text_to_client_message(m_full_name, processed_buffer, msg_id));
2786 
2787         m_full_name.clear();
2788         if (!msg.get())
2789           return Block_result_indigestion;
2790 
2791         {
2792           int r = process(msg_id, *msg.get());
2793 
2794           if (r != 0)
2795             return Block_result_indigestion;
2796         }
2797 
2798         return Block_result_eated_but_not_hungry;
2799       }
2800       else
2801       {
2802         m_buffer.append(linebuf).append("\n");
2803         return Block_result_feed_more;
2804       }
2805     }
2806 
2807     return Block_result_not_hungry;
2808   }
2809 
feed_ended_is_state_ok()2810   virtual bool feed_ended_is_state_ok()
2811   {
2812     if (!m_full_name.empty())
2813     {
2814       std::cerr << error() << "Incomplete message " << m_full_name << eoerr();
2815       return false;
2816     }
2817 
2818     return true;
2819   }
2820 
2821 private:
get_message_name(const char * linebuf)2822   virtual std::string get_message_name(const char *linebuf)
2823   {
2824     const char *p;
2825     if ((p = strstr(linebuf, " {")))
2826     {
2827       return std::string(linebuf, p-linebuf);
2828     }
2829 
2830     return "";
2831   }
2832 
process(const int8_t msg_id,mysqlx::Message & message)2833   virtual int process(const int8_t msg_id, mysqlx::Message &message)
2834   {
2835     return process_client_message(m_cm->active(), msg_id, message);
2836   }
2837 
2838   Connection_manager *m_cm;
2839   std::string m_buffer;
2840   std::string m_full_name;
2841 };
2842 
2843 class Dump_message_block_processor: public Snd_message_block_processor
2844 {
2845 public:
Dump_message_block_processor(Connection_manager * cm)2846   Dump_message_block_processor(Connection_manager *cm)
2847   : Snd_message_block_processor(cm)
2848   { }
2849 
2850 private:
get_message_name(const char * linebuf)2851   virtual std::string get_message_name(const char *linebuf)
2852   {
2853     const char *command_dump = "-->binparse";
2854     std::vector<std::string> args;
2855 
2856     aux::split(args, linebuf, " ", true);
2857 
2858     if (4 != args.size())
2859       return "";
2860 
2861     if (args[0] == command_dump && args[3] == "{")
2862     {
2863       m_variable_name = args[1];
2864       return args[2];
2865     }
2866 
2867     return "";
2868   }
2869 
process(const int8_t msg_id,mysqlx::Message & message)2870   virtual int process(const int8_t msg_id, mysqlx::Message &message)
2871   {
2872     std::string bin_message = message_to_bindump(message);
2873 
2874     variables[m_variable_name] = bin_message;
2875 
2876     return 0;
2877   }
2878 
2879   std::string m_variable_name;
2880 };
2881 
process_client_input(std::istream & input,std::vector<Block_processor_ptr> & eaters)2882 static int process_client_input(std::istream &input, std::vector<Block_processor_ptr> &eaters)
2883 {
2884   const std::size_t buffer_length = 64*1024 + 1024;
2885   char              linebuf[buffer_length + 1];
2886 
2887   linebuf[buffer_length] = 0;
2888 
2889   if (!input.good())
2890   {
2891     std::cerr << "Input stream isn't valid\n";
2892 
2893     return 1;
2894   }
2895 
2896   Block_processor_ptr hungry_block_reader;
2897 
2898   while (!input.eof())
2899   {
2900     Block_result result = Block_result_not_hungry;
2901 
2902     input.getline(linebuf, buffer_length);
2903     script_stack.front().line_number++;
2904 
2905     if (!hungry_block_reader)
2906     {
2907       std::vector<Block_processor_ptr>::iterator i = eaters.begin();
2908 
2909       while (i != eaters.end() &&
2910           Block_result_not_hungry == result)
2911       {
2912         result = (*i)->feed(input, linebuf);
2913 
2914         if (Block_result_indigestion == result)
2915           return 1;
2916 
2917         if (Block_result_feed_more == result)
2918           hungry_block_reader = (*i);
2919 
2920         ++i;
2921       }
2922 
2923       if (Block_result_everyone_not_hungry == result)
2924         break;
2925 
2926       continue;
2927     }
2928 
2929     result = hungry_block_reader->feed(input, linebuf);
2930 
2931     if (Block_result_indigestion == result)
2932       return 1;
2933 
2934     if (Block_result_feed_more != result)
2935       hungry_block_reader.reset();
2936 
2937     if (Block_result_everyone_not_hungry == result)
2938       break;
2939   }
2940 
2941   std::vector<Block_processor_ptr>::iterator i = eaters.begin();
2942 
2943   while (i != eaters.end())
2944   {
2945     if (!(*i)->feed_ended_is_state_ok())
2946       return 1;
2947 
2948     ++i;
2949   }
2950 
2951   return 0;
2952 }
2953 
2954 #include "cmdline_options.h"
2955 
2956 class My_command_line_options : public Command_line_options
2957 {
2958 public:
2959   enum Run_mode{
2960     RunTest,
2961     RunTestWithoutAuth
2962   } run_mode;
2963 
2964   std::string run_file;
2965   bool        has_file;
2966   bool        cap_expired_password;
2967   bool        dont_wait_for_server_disconnect;
2968   bool        use_plain_auth;
2969 
2970   mysqlx::Internet_protocol ip_mode;
2971   int timeout;
2972   Connection_options connection;
2973 
2974   std::string uri;
2975   mysqlx::Ssl_config ssl;
2976   bool        daemon;
2977   std::string sql;
2978 
print_version()2979   void print_version()
2980   {
2981     printf("%s  Ver %s Distrib %s, for %s (%s)\n", my_progname, MYSQLXTEST_VERSION,
2982         MYSQL_SERVER_VERSION, SYSTEM_TYPE, MACHINE_TYPE);
2983   }
2984 
print_help()2985   void print_help()
2986   {
2987     std::cout << "mysqlxtest <options> [SCHEMA]\n";
2988     std::cout << "Options:\n";
2989     std::cout << "-f, --file=<file>     Reads input from file\n";
2990     std::cout << "-I, --import=<dir>    Reads macro files from dir; required by -->import\n";
2991     std::cout << "--sql=<SQL>           Use SQL as input and execute it like in -->sql block\n";
2992     std::cout << "-e=<SQL>, --execute=<SQL> Aliases for \"--sql\" option\n";
2993     std::cout << "-n, --no-auth         Skip authentication which is required by -->sql block (run mode)\n";
2994     std::cout << "--plain-auth          Use PLAIN text authentication mechanism\n";
2995     std::cout << "-u, --user=<user>     Connection user\n";
2996     std::cout << "-p, --password=<pass> Connection password\n";
2997     std::cout << "-h, --host=<host>     Connection host\n";
2998     std::cout << "-P, --port=<port>     Connection port (default:" << MYSQLX_TCP_PORT << ")\n";
2999     std::cout << "--ipv=<mode>          Force internet protocol (default:4):\n";
3000     std::cout << "                      0 - allow system to resolve IPv6 and IPv4, for example\n";
3001     std::cout << "                          resolving of 'localhost' can return both '::1' and '127.0.0.1'\n";
3002     std::cout << "                      4 - allow system to resolve only IPv4, for example\n";
3003     std::cout << "                          resolving of 'localhost' is going to return '127.0.0.1'\n";
3004     std::cout << "                      6 - allow system to resolve only IPv6, for example\n";
3005     std::cout << "                          resolving of 'localhost' is going to return '::1'\n";
3006     std::cout << "-t, --timeout=<ms>    I/O timeouts in milliseconds\n";
3007     std::cout << "--close-no-sync       Do not wait for connection to be closed by server(disconnect first)\n";
3008     std::cout << "--schema=<schema>     Default schema to connect to\n";
3009     std::cout << "--uri=<uri>           Connection URI\n";
3010     std::cout << "                      URI takes precedence before options like: user, host, password, port\n";
3011     std::cout << "--socket=<file>       Connection through UNIX socket\n";
3012     std::cout << "--use-socket          Connection through UNIX socket, using default file name '" << MYSQLX_UNIX_ADDR << "'\n";
3013     std::cout << "                      --use-socket* options take precedence before options like: uri, user,\n";
3014     std::cout << "                      host, password, port\n";
3015     std::cout << "--ssl-key             X509 key in PEM format\n";
3016     std::cout << "--ssl-ca              CA file in PEM format\n";
3017     std::cout << "--ssl-ca_path         CA directory\n";
3018     std::cout << "--ssl-cert            X509 cert in PEM format\n";
3019     std::cout << "--ssl-cipher          SSL cipher to use\n";
3020     std::cout << "--tls-version         TLS version to use\n";
3021     std::cout << "--connect-expired-password Allow expired password\n";
3022     std::cout << "--quiet               Don't print out messages sent\n";
3023     std::cout << "-vVARIABLE_NAME=VALUE Set variable VARIABLE_NAME from command line\n";
3024     std::cout << "--fatal-errors=<0|1>  Mysqlxtest is started with ignoring or stopping on fatal error (default: 1)\n";
3025     std::cout << "-B, --bindump         Dump binary representation of messages sent, in format suitable for\n";
3026     std::cout << "                      the \"-->binsend\" command\n";
3027     std::cout << "--verbose             Enable extra verbose messages\n";
3028     std::cout << "--daemon              Work as a daemon (unix only)\n";
3029     std::cout << "--help                Show command line help\n";
3030     std::cout << "--help-commands       Show help for input commands\n";
3031     std::cout << "-V, --version         Show version of mysqlxtest\n";
3032     std::cout << "\nOnly one option that changes run mode is allowed.\n";
3033   }
3034 
print_help_commands()3035   void print_help_commands()
3036   {
3037     std::cout << "Input may be a file (or if no --file is specified, it stdin will be used)\n";
3038     std::cout << "The following commands may appear in the input script:\n";
3039     std::cout << "-->echo <text>\n";
3040     std::cout << "  Prints the text (allows variables)\n";
3041     std::cout << "-->title <c><text>\n";
3042     std::cout << "  Prints the text with an underline, using the character <c>\n";
3043     std::cout << "-->sql\n";
3044     std::cout << "  Begins SQL block. SQL statements that appear will be executed and results printed (allows variables).\n";
3045     std::cout << "-->endsql\n";
3046     std::cout << "  End SQL block. End a block of SQL started by -->sql\n";
3047     std::cout << "-->macro <macroname> <argname1> ...\n";
3048     std::cout << "  Start a block of text to be defined as a macro. Must be terminated with -->endmacro\n";
3049     std::cout << "-->endmacro\n";
3050     std::cout << "  Ends a macro block\n";
3051     std::cout << "-->callmacro <macro>\t<argvalue1>\t...\n";
3052     std::cout << "  Executes the macro text, substituting argument values with the provided ones (args separated by tabs).\n";
3053     std::cout << "-->import <macrofile>\n";
3054     std::cout << "  Loads macros from the specified file. The file must be in the directory specified by --import option in command line.\n";
3055     std::cout << "-->enablessl\n";
3056     std::cout << "  Enables ssl on current connection\n";
3057     std::cout << "<protomsg>\n";
3058     std::cout << "  Encodes the text format protobuf message and sends it to the server (allows variables).\n";
3059     std::cout << "-->recv [quiet|<FIELD PATH>]\n";
3060     std::cout << "  quiet        - received message isn't printed\n";
3061     std::cout << "  <FIELD PATH> - print only selected part of the message using \"field-path\" filter:\n";
3062     std::cout << "                 field_name1\n";
3063     std::cout << "                 field_name1.field_name2\n";
3064     std::cout << "                 repeated_field_name1[1].field_name1.field_name2\n";
3065     std::cout << "-->recvresult [print-columnsinfo] [" << CMD_ARG_BE_QUIET << "]\n";
3066     std::cout << "  Read and print one resultset from the server; if print-columnsinfo is present also print short columns status\n";
3067     std::cout << "-->recvtovar <varname> [COLUMN_NAME]\n";
3068     std::cout << "  Read first row and first column (or column with name COLUMN_NAME) of resultset\n";
3069     std::cout << "  and set the variable <varname>\n";
3070     std::cout << "-->recverror <errno>\n";
3071     std::cout << "  Read a message and ensure that it's an error of the expected type\n";
3072     std::cout << "-->recvtype <msgtype> [" << CMD_ARG_BE_QUIET << "]\n";
3073     std::cout << "  Read one message and print it, checking that its type is the specified one\n";
3074     std::cout << "-->recvuntil <msgtype> [do_not_show_intermediate]\n";
3075     std::cout << "  Read messages and print them, until a msg of the specified type (or Error) is received\n";
3076     std::cout << "  do_not_show_intermediate - if this argument is present then printing of intermediate message should be omitted\n";
3077     std::cout << "-->repeat <N> [<VARIABLE_NAME>]\n";
3078     std::cout << "  Begin block of instructions that should be repeated N times\n";
3079     std::cout << "-->endrepeat\n";
3080     std::cout << "  End block of instructions that should be repeated - next iteration\n";
3081     std::cout << "-->stmtsql <CMD>\n";
3082     std::cout << "  Send StmtExecute with sql command\n";
3083     std::cout << "-->stmtadmin <CMD> [json_string]\n";
3084     std::cout << "  Send StmtExecute with admin command with given aguments (formated as json object)\n";
3085     std::cout << "-->system <CMD>\n";
3086     std::cout << "  Execute application or script (dev only)\n";
3087     std::cout << "-->exit\n";
3088     std::cout << "  Stops reading commands, disconnects and exits (same as <eof>/^D)\n";
3089     std::cout << "-->abort\n";
3090     std::cout << "  Exit immediately, without performing cleanup\n";
3091     std::cout << "-->nowarnings/-->yeswarnings\n";
3092     std::cout << "  Whether to print warnings generated by the statement (default no)\n";
3093     std::cout << "-->peerdisc <MILLISECONDS> [TOLERANCE]\n";
3094     std::cout << "  Expect that xplugin disconnects after given number of milliseconds and tolerance\n";
3095     std::cout << "-->sleep <SECONDS>\n";
3096     std::cout << "  Stops execution of mysqlxtest for given number of seconds (may be fractional)\n";
3097     std::cout << "-->login <user>\t<pass>\t<db>\t<mysql41|plain>]\n";
3098     std::cout << "  Performs authentication steps (use with --no-auth)\n";
3099     std::cout << "-->loginerror <errno>\t<user>\t<pass>\t<db>\n";
3100     std::cout << "  Performs authentication steps expecting an error (use with --no-auth)\n";
3101     std::cout << "-->fatalerrors/nofatalerrors\n";
3102     std::cout << "  Whether to immediately exit on MySQL errors\n";
3103     std::cout << "-->expecterror <errno>\n";
3104     std::cout << "  Expect a specific error for the next command and fail if something else occurs\n";
3105     std::cout << "  Works for: newsession, closesession, recvresult\n";
3106     std::cout << "-->newsession <name>\t<user>\t<pass>\t<db>\n";
3107     std::cout << "  Create a new connection with given name and account (use - as user for no-auth)\n";
3108     std::cout << "-->newsessionplain <name>\t<user>\t<pass>\t<db>\n";
3109     std::cout << "  Create a new connection with given name and account and force it to NOT use ssl, even if its generally enabled\n";
3110     std::cout << "-->setsession <name>\n";
3111     std::cout << "  Activate the named session\n";
3112     std::cout << "-->closesession [abort]\n";
3113     std::cout << "  Close the active session (unless its the default session)\n";
3114     std::cout << "-->wait_for <VALUE_EXPECTED>\t<SQL QUERY>\n";
3115     std::cout << "  Wait until SQL query returns value matches expected value (time limit 30 second)\n";
3116     std::cout << "-->assert_eq <VALUE_EXPECTED>\t<VALUE_TESTED>\n";
3117     std::cout << "  Ensure that 'TESTED' value equals 'EXPECTED' by comparing strings lexicographically\n";
3118     std::cout << "-->assert_gt <VALUE_EXPECTED>\t<VALUE_TESTED>\n";
3119     std::cout << "  Ensure that 'TESTED' value is greater than 'EXPECTED' (only when the both are numeric values)\n";
3120     std::cout << "-->assert_ge <VALUE_EXPECTED>\t<VALUE_TESTED>\n";
3121     std::cout << "  Ensure that 'TESTED' value is greater  or equal to 'EXPECTED' (only when the both are numeric values)\n";
3122     std::cout << "-->varfile <varname> <datafile>\n";
3123     std::cout << "  Assigns the contents of the file to the named variable\n";
3124     std::cout << "-->varlet <varname> <value>\n";
3125     std::cout << "  Assign the value (can be another variable) to the variable\n";
3126     std::cout << "-->varinc <varname> <n>\n";
3127     std::cout << "  Increment the value of varname by n (assuming both convert to integral)\n";
3128     std::cout << "-->varsub <varname>\n";
3129     std::cout << "  Add a variable to the list of variables to replace for the next recv or sql command (value is replaced by the name)\n";
3130     std::cout << "-->binsend <bindump>[<bindump>...]\n";
3131     std::cout << "  Sends one or more binary message dumps to the server (generate those with --bindump)\n";
3132     std::cout << "-->binsendoffset <srcvar> [offset-begin[percent]> [offset-end[percent]]]\n";
3133     std::cout << "  Same as binsend with begin and end offset of data to be send\n";
3134     std::cout << "-->binparse MESSAGE.NAME {\n";
3135     std::cout << "    MESSAGE.DATA\n";
3136     std::cout << "}\n";
3137     std::cout << "  Dump given message to variable %MESSAGE_DUMP%\n";
3138     std::cout << "-->quiet/noquiet\n";
3139     std::cout << "  Toggle verbose messages\n";
3140     std::cout << "-->query_result/noquery_result\n";
3141     std::cout << "  Toggle visibility for query results\n";
3142     std::cout << "-->received <msgtype>\t<varname>\n";
3143     std::cout << "  Assigns number of received messages of indicated type (in active session) to a variable\n";
3144     std::cout << "# comment\n";
3145   }
3146 
set_mode(Run_mode mode)3147   bool set_mode(Run_mode mode)
3148   {
3149     if (RunTest != run_mode)
3150       return false;
3151 
3152     run_mode = mode;
3153 
3154     return true;
3155   }
3156 
get_socket_name()3157   std::string get_socket_name()
3158   {
3159     return MYSQLX_UNIX_ADDR;
3160   }
3161 
My_command_line_options(int argc,char ** argv)3162   My_command_line_options(int argc, char **argv)
3163   : Command_line_options(argc, argv), run_mode(RunTest), has_file(false),
3164     cap_expired_password(false), dont_wait_for_server_disconnect(false),
3165     use_plain_auth(false), ip_mode(mysqlx::IPv4), timeout(0l), daemon(false)
3166   {
3167     std::string user;
3168 
3169     run_mode = RunTest; // run tests by default
3170 
3171     for (int i = 1; i < argc && exit_code == 0; i++)
3172     {
3173       char *value;
3174       if (check_arg_with_value(argv, i, "--file", "-f", value))
3175       {
3176         run_file = value;
3177         has_file = true;
3178       }
3179       else if (check_arg(argv, i, "--no-auth", "-n"))
3180       {
3181         if (!set_mode(RunTestWithoutAuth))
3182         {
3183           std::cerr << "Only one option that changes run mode is allowed.\n";
3184           exit_code = 1;
3185         }
3186       }
3187       else if (check_arg(argv, i, "--plain-auth", NULL))
3188       {
3189         use_plain_auth = true;
3190       }
3191       else if (check_arg_with_value(argv, i, "--sql", NULL, value))
3192       {
3193         sql = value;
3194       }
3195       else if (check_arg_with_value(argv, i, "--execute", "-e", value))
3196       {
3197         sql = value;
3198       }
3199       else if (check_arg_with_value(argv, i, "--password", "-p", value))
3200         connection.password = value;
3201       else if (check_arg_with_value(argv, i, "--ssl-key", NULL, value))
3202         ssl.key = value;
3203       else if (check_arg_with_value(argv, i, "--ssl-ca", NULL, value))
3204         ssl.ca = value;
3205       else if (check_arg_with_value(argv, i, "--ssl-ca_path", NULL, value))
3206         ssl.ca_path = value;
3207       else if (check_arg_with_value(argv, i, "--ssl-cert", NULL, value))
3208         ssl.cert = value;
3209       else if (check_arg_with_value(argv, i, "--ssl-cipher", NULL, value))
3210         ssl.cipher = value;
3211       else if (check_arg_with_value(argv, i, "--tls-version", NULL, value))
3212         ssl.tls_version = value;
3213       else if (check_arg_with_value(argv, i, "--host", "-h", value))
3214         connection.host = value;
3215       else if (check_arg_with_value(argv, i, "--user", "-u", value))
3216         connection.user = value;
3217       else if (check_arg_with_value(argv, i, "--uri", NULL, value))
3218         uri = value;
3219       else if (check_arg_with_value(argv, i, "--schema", NULL, value))
3220         connection.schema = value;
3221       else if (check_arg_with_value(argv, i, "--port", "-P", value))
3222         connection.port = ngs::stoi(value);
3223       else if (check_arg_with_value(argv, i, "--ipv", NULL, value))
3224       {
3225         ip_mode = set_protocol(ngs::stoi(value));
3226       }
3227       else if (check_arg_with_value(argv, i, "--timeout", "-t", value))
3228         timeout = ngs::stoi(value);
3229       else if (check_arg_with_value(argv, i, "--fatal-errors", NULL, value))
3230         OPT_fatal_errors = ngs::stoi(value);
3231       else if (check_arg_with_value(argv, i, "--password", "-p", value))
3232         connection.password = value;
3233       else if (check_arg_with_value(argv, i, "--socket", "-S", value))
3234         connection.socket = value;
3235       else if (check_arg_with_value(argv, i, NULL, "-v", value))
3236         set_variable_option(value);
3237       else if (check_arg(argv, i, "--use-socket", NULL))
3238         connection.socket = get_socket_name();
3239       else if (check_arg(argv, i, "--close-no-sync", NULL))
3240         dont_wait_for_server_disconnect = true;
3241       else if (check_arg(argv, i, "--bindump", "-B"))
3242         OPT_bindump = true;
3243       else if (check_arg(argv, i, "--connect-expired-password", NULL))
3244         cap_expired_password = true;
3245       else if (check_arg(argv, i, "--quiet", "-q"))
3246         OPT_quiet = true;
3247       else if (check_arg(argv, i, "--verbose", NULL))
3248         OPT_verbose = true;
3249       else if (check_arg(argv, i, "--daemon", NULL))
3250         daemon = true;
3251 #ifndef _WIN32
3252       else if (check_arg(argv, i, "--color", NULL))
3253         OPT_color = true;
3254 #endif
3255       else if (check_arg_with_value(argv, i, "--import", "-I", value))
3256       {
3257         OPT_import_path = value;
3258         if (*OPT_import_path.rbegin() != FN_LIBCHAR)
3259           OPT_import_path += FN_LIBCHAR;
3260       }
3261       else if (check_arg(argv, i, "--help", "--help"))
3262       {
3263         print_help();
3264         exit_code = 1;
3265       }
3266       else if (check_arg(argv, i, "--help-commands", "--help-commands"))
3267       {
3268         print_help_commands();
3269         exit_code = 1;
3270       }
3271       else if (check_arg(argv, i, "--version", "-V"))
3272       {
3273         print_version();
3274         exit_code = 1;
3275       }
3276       else if (exit_code == 0)
3277       {
3278         if (argc -1 == i && std::isalnum(argv[i][0]))
3279         {
3280           connection.schema = argv[i];
3281           break;
3282         }
3283 
3284         std::cerr << argv[0] << ": unknown option " << argv[i] << "\n";
3285         exit_code = 1;
3286         break;
3287       }
3288     }
3289 
3290     if (connection.port == 0)
3291       connection.port = MYSQLX_TCP_PORT;
3292     if (connection.host.empty())
3293       connection.host = "localhost";
3294   }
3295 
set_variable_option(const std::string & set_expression)3296   void set_variable_option(const std::string &set_expression)
3297   {
3298     std::vector<std::string> args;
3299 
3300     aux::split(args, set_expression, "=", false);
3301 
3302     if (2 != args.size())
3303     {
3304       std::cerr << "Wrong format expected NAME=VALUE\n";
3305       exit_code = 1;
3306       return;
3307     }
3308 
3309     variables[args[0]] = args[1];
3310   }
3311 
set_protocol(const int ip_mode)3312   mysqlx::Internet_protocol set_protocol(const int ip_mode)
3313   {
3314     switch(ip_mode)
3315     {
3316     case 0:
3317       return mysqlx::IP_any;
3318 
3319     case 4:
3320       return mysqlx::IPv4;
3321 
3322     case 6:
3323       return mysqlx::IPv6;
3324 
3325     default:
3326       std::cerr << "Wrong Internet protocol version\n";
3327       exit_code = 1;
3328       return mysqlx::IP_any;
3329     }
3330   }
3331 };
3332 
create_macro_block_processors(Connection_manager * cm)3333 static std::vector<Block_processor_ptr> create_macro_block_processors(Connection_manager *cm)
3334 {
3335   std::vector<Block_processor_ptr> result;
3336 
3337   result.push_back(ngs::make_shared<Sql_block_processor>(cm));
3338   result.push_back(ngs::make_shared<Dump_message_block_processor>(cm));
3339   result.push_back(ngs::make_shared<Single_command_processor>(cm));
3340   result.push_back(ngs::make_shared<Snd_message_block_processor>(cm));
3341 
3342   return result;
3343 }
3344 
create_block_processors(Connection_manager * cm)3345 static std::vector<Block_processor_ptr> create_block_processors(Connection_manager *cm)
3346 {
3347   std::vector<Block_processor_ptr> result;
3348 
3349   result.push_back(ngs::make_shared<Sql_block_processor>(cm));
3350   result.push_back(ngs::make_shared<Macro_block_processor>(cm));
3351   result.push_back(ngs::make_shared<Dump_message_block_processor>(cm));
3352   result.push_back(ngs::make_shared<Single_command_processor>(cm));
3353   result.push_back(ngs::make_shared<Snd_message_block_processor>(cm));
3354 
3355   return result;
3356 }
3357 
process_client_input_on_session(const My_command_line_options & options,std::istream & input)3358 static int process_client_input_on_session(const My_command_line_options &options, std::istream &input)
3359 {
3360   Connection_manager cm(options.uri, options.connection, options.ssl, options.timeout, options.dont_wait_for_server_disconnect, options.ip_mode);
3361   int r = 1;
3362 
3363   try
3364   {
3365     std::vector<Block_processor_ptr> eaters;
3366 
3367     cm.connect_default(options.cap_expired_password, options.use_plain_auth);
3368     eaters = create_block_processors(&cm);
3369     r = process_client_input(input, eaters);
3370     cm.close_active(true);
3371   }
3372   catch (mysqlx::Error &error)
3373   {
3374     dumpx(error);
3375     std::cerr << "not ok\n";
3376     return 1;
3377   }
3378 
3379   if (r == 0)
3380     std::cerr << "ok\n";
3381   else
3382     std::cerr << "not ok\n";
3383 
3384   return r;
3385 }
3386 
process_client_input_no_auth(const My_command_line_options & options,std::istream & input)3387 static int process_client_input_no_auth(const My_command_line_options &options, std::istream &input)
3388 {
3389   Connection_manager cm(options.uri, options.connection, options.ssl, options.timeout, options.dont_wait_for_server_disconnect, options.ip_mode);
3390   int r = 1;
3391 
3392   try
3393   {
3394     std::vector<Block_processor_ptr> eaters;
3395 
3396     cm.active()->set_closed();
3397     eaters = create_block_processors(&cm);
3398     r = process_client_input(input, eaters);
3399   }
3400   catch (mysqlx::Error &error)
3401   {
3402     dumpx(error);
3403     std::cerr << "not ok\n";
3404     return 1;
3405   }
3406 
3407   if (r == 0)
3408     std::cerr << "ok\n";
3409   else
3410     std::cerr << "not ok\n";
3411 
3412   return r;
3413 }
3414 
call(Execution_context & context,const std::string & cmd)3415 bool Macro::call(Execution_context &context, const std::string &cmd)
3416 {
3417   std::string name;
3418   std::string macro = get(cmd, name);
3419   if (macro.empty())
3420     return false;
3421 
3422   Stack_frame frame = {0, "macro "+name};
3423   script_stack.push_front(frame);
3424 
3425   std::stringstream stream(macro);
3426   std::vector<Block_processor_ptr> processors(create_macro_block_processors(context.m_cm));
3427 
3428   bool r = process_client_input(stream, processors) == 0;
3429 
3430   script_stack.pop_front();
3431 
3432   return r;
3433 }
3434 
3435 
3436 namespace
3437 {
3438 
3439 class Json_to_any_handler : public rapidjson::BaseReaderHandler<rapidjson::UTF8<>, Json_to_any_handler>
3440 {
3441 public:
3442   typedef ::Mysqlx::Datatypes::Any Any;
3443 
Json_to_any_handler(Any & any)3444   Json_to_any_handler(Any &any)
3445   {
3446     m_stack.push(&any);
3447   }
3448 
Key(const char * str,rapidjson::SizeType length,bool copy)3449   bool Key(const char *str, rapidjson::SizeType length, bool copy)
3450   {
3451     typedef ::Mysqlx::Datatypes::Object_ObjectField Field;
3452     Field *f = m_stack.top()->mutable_obj()->add_fld();
3453     f->set_key(str, length);
3454     m_stack.push(f->mutable_value());
3455     return true;
3456   }
3457 
Null()3458   bool Null()
3459   {
3460     get_scalar(::Mysqlx::Datatypes::Scalar_Type_V_NULL);
3461     return true;
3462   }
3463 
Bool(bool b)3464   bool Bool(bool b)
3465   {
3466     get_scalar(::Mysqlx::Datatypes::Scalar_Type_V_BOOL)->set_v_bool(b);
3467     return true;
3468   }
3469 
Int(int i)3470   bool Int(int i)
3471   {
3472     get_scalar(::Mysqlx::Datatypes::Scalar_Type_V_SINT)->set_v_signed_int(i);
3473     return true;
3474   }
3475 
Uint(unsigned u)3476   bool Uint(unsigned u)
3477   {
3478     get_scalar(::Mysqlx::Datatypes::Scalar_Type_V_UINT)->set_v_unsigned_int(u);
3479     return true;
3480   }
3481 
Int64(int64_t i)3482   bool Int64(int64_t i)
3483   {
3484     get_scalar(::Mysqlx::Datatypes::Scalar_Type_V_SINT)->set_v_signed_int(i);
3485     return true;
3486   }
3487 
Uint64(uint64_t u)3488   bool Uint64(uint64_t u)
3489   {
3490     get_scalar(::Mysqlx::Datatypes::Scalar_Type_V_UINT)->set_v_unsigned_int(u);
3491     return true;
3492   }
3493 
Double(double d,bool=false)3494   bool Double(double d, bool = false)
3495   {
3496     get_scalar(::Mysqlx::Datatypes::Scalar_Type_V_DOUBLE)->set_v_double(d);
3497     return true;
3498   }
3499 
String(const char * str,rapidjson::SizeType length,bool)3500   bool String(const char* str, rapidjson::SizeType length, bool)
3501   {
3502     get_scalar(::Mysqlx::Datatypes::Scalar_Type_V_STRING)->mutable_v_string()->set_value(str, length);
3503     return true;
3504   }
3505 
StartObject()3506   bool StartObject()
3507   {
3508     Any *any = m_stack.top();
3509     if (any->has_type() && any->type() == ::Mysqlx::Datatypes::Any_Type_ARRAY)
3510       m_stack.push(any->mutable_array()->add_value());
3511     m_stack.top()->set_type(::Mysqlx::Datatypes::Any_Type_OBJECT);
3512     m_stack.top()->mutable_obj();
3513     return true;
3514   }
3515 
EndObject(rapidjson::SizeType memberCount)3516   bool EndObject(rapidjson::SizeType memberCount)
3517   {
3518     m_stack.pop();
3519     return true;
3520   }
3521 
StartArray()3522   bool StartArray()
3523   {
3524     m_stack.top()->set_type(::Mysqlx::Datatypes::Any_Type_ARRAY);
3525     m_stack.top()->mutable_array();
3526     return true;
3527   }
3528 
EndArray(rapidjson::SizeType elementCount)3529   bool EndArray(rapidjson::SizeType elementCount)
3530   {
3531     m_stack.pop();
3532     return true;
3533   }
3534 
3535 private:
3536   typedef ::Mysqlx::Datatypes::Scalar Scalar;
3537 
get_scalar(Scalar::Type scalar_t)3538   Scalar *get_scalar(Scalar::Type scalar_t)
3539   {
3540     Any *any = m_stack.top();
3541     if (any->has_type() && any->type() == ::Mysqlx::Datatypes::Any_Type_ARRAY)
3542       any = any->mutable_array()->add_value();
3543     else
3544       m_stack.pop();
3545     any->set_type(::Mysqlx::Datatypes::Any_Type_SCALAR);
3546     Scalar *s = any->mutable_scalar();
3547     s->set_type(scalar_t);
3548     return s;
3549   }
3550 
3551   std::stack<Any*> m_stack;
3552 };
3553 
3554 } // namespace
3555 
3556 
json_string_to_any(const std::string & json_string,Any & any) const3557 bool Command::json_string_to_any(const std::string &json_string, Any &any) const
3558 {
3559   Json_to_any_handler handler(any);
3560   rapidjson::Reader reader;
3561   rapidjson::StringStream ss(json_string.c_str());
3562   return !reader.Parse(ss, handler).IsError();
3563 }
3564 
3565 
cmd_import(Execution_context & context,const std::string & args)3566 Command::Result Command::cmd_import(Execution_context &context,
3567                                     const std::string &args)
3568 {
3569   std::string varg(args);
3570   replace_variables(varg);
3571   const std::string filename = OPT_import_path + varg;
3572 
3573   std::ifstream fs(filename.c_str());
3574   if (!fs.good())
3575   {
3576     std::cerr << error() << "Could not open macro file " << args << " (aka "
3577               << filename << ")" << eoerr();
3578     return Stop_with_failure;
3579   }
3580 
3581   Stack_frame frame = {0, args};
3582   script_stack.push_front(frame);
3583 
3584   std::vector<Block_processor_ptr> processors;
3585   processors.push_back(ngs::make_shared<Macro_block_processor>(context.m_cm));
3586   bool r = process_client_input(fs, processors) == 0;
3587   script_stack.pop_front();
3588 
3589   return r ? Continue : Stop_with_failure;
3590 }
3591 
3592 typedef int (*Program_mode)(const My_command_line_options &, std::istream &input);
3593 
get_input(My_command_line_options & opt,std::ifstream & file,std::stringstream & string)3594 static std::istream &get_input(My_command_line_options &opt, std::ifstream &file, std::stringstream &string)
3595 {
3596   if (opt.has_file)
3597   {
3598     if (!opt.sql.empty())
3599     {
3600       std::cerr << "ERROR: specified file and sql to execute, please enter only one of those\n";
3601       opt.exit_code = 1;
3602     }
3603 
3604     file.open(opt.run_file.c_str());
3605     file.rdbuf()->pubsetbuf(NULL, 0);
3606 
3607     if (!file.is_open())
3608     {
3609       std::cerr << "ERROR: Could not open file " << opt.run_file << "\n";
3610       opt.exit_code = 1;
3611     }
3612 
3613     return file;
3614   }
3615 
3616   if (!opt.sql.empty())
3617   {
3618     std::streampos position = string.tellp();
3619 
3620     string << "-->sql\n";
3621     string << opt.sql << "\n";
3622     string << "-->endsql\n";
3623     string.seekp(position, std::ios::beg);
3624 
3625     return string;
3626   }
3627 
3628   return std::cin;
3629 }
3630 
3631 
/**
  Report that the process could not be moved to the background and
  terminate the program with exit status 2. Does not return.
*/
static void unable_daemonize()
{
  static const char message[] = "ERROR: Unable to put process in background\n";

  std::cerr << message;
  exit(2);
}
3637 
3638 
daemonize()3639 static void daemonize()
3640 {
3641 #ifdef WIN32
3642   unable_daemonize();
3643 #else
3644   if (getppid() == 1) // already a daemon
3645     exit(0);
3646   pid_t pid = fork();
3647   if (pid < 0)
3648     unable_daemonize();
3649   if (pid > 0)
3650     exit(0);
3651   if (setsid() < 0)
3652     unable_daemonize();
3653 #endif
3654 }
3655 
3656 
get_mode_function(const My_command_line_options & opt)3657 static Program_mode get_mode_function(const My_command_line_options &opt)
3658 {
3659   switch(opt.run_mode)
3660   {
3661   case My_command_line_options::RunTestWithoutAuth:
3662     return process_client_input_no_auth;
3663 
3664   case My_command_line_options::RunTest:
3665   default:
3666     return process_client_input_on_session;
3667   }
3668 }
3669 
3670 
main(int argc,char ** argv)3671 int main(int argc, char **argv)
3672 {
3673   MY_INIT(argv[0]);
3674   local_message_hook = ignore_traces_from_libraries;
3675 
3676   OPT_expect_error = new Expected_error();
3677   My_command_line_options options(argc, argv);
3678 
3679   if (options.exit_code != 0)
3680     return options.exit_code;
3681 
3682   if (options.daemon)
3683     daemonize();
3684 
3685   std::cout << std::unitbuf;
3686   std::ifstream fs;
3687   std::stringstream ss;
3688   std::istream &input = get_input(options, fs, ss);
3689   Program_mode  mode  = get_mode_function(options);
3690 
3691 #ifdef WIN32
3692   if (!have_tcpip)
3693   {
3694     std::cerr << "OS doesn't have tcpip\n";
3695     return 1;
3696   }
3697 #endif
3698 
3699   ssl_start();
3700 
3701   bool result = 0;
3702   try
3703   {
3704     Stack_frame frame = {0, "main"};
3705     script_stack.push_front(frame);
3706 
3707     result = mode(options, input);
3708   }
3709   catch (mysqlx::Error &e)
3710   {
3711     std::cerr << "ERROR: " << e.what() << "\n";
3712     result = 1;
3713   }
3714   catch (std::exception &e)
3715   {
3716     std::cerr << "ERROR: " << e.what() << "\n";
3717     result = 1;
3718   }
3719 
3720   vio_end();
3721   my_end(0);
3722   return result;
3723 }
3724 
3725 
3726 #include "mysqlx_all_msgs.h"
3727 
3728 #ifdef _MSC_VER
3729 #  pragma pop_macro("ERROR")
3730 #endif
3731