1 /*
2   Copyright 2011 Kristian Nielsen and Monty Program Ab.
3 
4   This file is free software; you can redistribute it and/or
5   modify it under the terms of the GNU Lesser General Public
6   License as published by the Free Software Foundation; either
7   version 2.1 of the License, or (at your option) any later version.
8 
9   This library is distributed in the hope that it will be useful,
10   but WITHOUT ANY WARRANTY; without even the implied warranty of
11   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12   Lesser General Public License for more details.
13 
14   You should have received a copy of the GNU General Public License
15   along with this.  If not, see <http://www.gnu.org/licenses/>.
16 */
17 
18 
19 /*
20   Run a set of queries in parallel against a server using the non-blocking
21   API, and compare to running same queries with the normal blocking API.
22 */
23 
24 #include <my_global.h>
25 #include <my_sys.h>
26 #include <mysql.h>
27 #include <my_getopt.h>
28 
29 #include <sys/time.h>
30 #include <stdlib.h>
31 #include <stdio.h>
32 #include <string.h>
33 
34 #include <event.h>
35 
36 
/* Expands to two comma-separated arguments: the expression and its sizeof. */
#define SL(s) (s), sizeof(s)

/* NULL-terminated group list handed to mysql_library_init() in main(). */
static const char *my_groups[]=
{
  "client",
  NULL
};
39 
/*
  The list of queries still waiting to be run.

  It is a singly linked list: add_query() appends at the tail, and the
  connection state machine removes entries from the head.
*/
struct query_entry {
  struct query_entry *next;                  /* Following entry, or NULL */
  char *query;                               /* Query text */
  int index;                                 /* Number assigned by add_query() */
};

/* Head of the list; NULL when there is nothing left to run. */
static struct query_entry *query_list;
/* Where add_query() stores the pointer to the next entry it creates. */
static struct query_entry **tail_ptr= &query_list;
/* Source of the index values; incremented once per added query. */
static int query_counter= 0;
49 
50 
51 /* State kept for each connection. */
52 struct state_data {
53   int ST;                                    /* State machine current state */
54   struct event ev_mysql;
55   MYSQL mysql;
56   MYSQL_RES *result;
57   MYSQL *ret;
58   int err;
59   MYSQL_ROW row;
60   struct query_entry *query_element;
61   int index;
62 };
63 
64 
/* Connection parameters, passed on to mysql_real_connect_start(). */
static const char *opt_host= NULL;
static const char *opt_user= NULL;
static const char *opt_password= NULL;
static const char *opt_db= NULL;
static unsigned int opt_port= 0;
static const char *opt_socket= NULL;

/* Set when -p was given without a value; main() then asks on the tty. */
static int tty_password= 0;

/* How many connections main() opens to run the queries. */
static unsigned int opt_connections= 5;

/* Optional file from which main() reads one query per line. */
static const char *opt_query_file= NULL;
74 
75 static struct my_option options[] =
76 {
77   {"database", 'D', "Database to use", &opt_db, &opt_db,
78    0, GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0},
79   {"help", '?', "Display this help and exit", 0, 0, 0, GET_NO_ARG, NO_ARG, 0,
80    0, 0, 0, 0, 0},
81   {"host", 'h', "Connect to host", &opt_host, &opt_host,
82    0, GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0},
83   {"password", 'p',
84    "Password to use when connecting to server. If password is not given it's asked from the tty.",
85    0, 0, 0, GET_STR, OPT_ARG, 0, 0, 0, 0, 0, 0},
86   {"port", 'P', "Port number to use for connection.",
87    &opt_port, &opt_port, 0, GET_UINT, REQUIRED_ARG, 0, 0, 0, 0, 0, 0},
88   {"socket", 'S', "Socket file to use for connection",
89    &opt_socket, &opt_socket, 0, GET_STR,
90    REQUIRED_ARG, 0, 0, 0, 0, 0, 0},
91   {"user", 'u', "User for login if not current user", &opt_user,
92    &opt_user, 0, GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0},
93   {"connections", 'n', "Number of simultaneous connections/queries.",
94    &opt_connections, &opt_connections, 0, GET_UINT, REQUIRED_ARG,
95    5, 0, 0, 0, 0, 0},
96   {"queryfile", 'q', "Name of file containing extra queries to run",
97    &opt_query_file, &opt_query_file, 0, GET_STR, REQUIRED_ARG,
98    0, 0, 0, 0, 0, 0},
99   { 0, 0, 0, 0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0}
100 };
101 
102 static void
fatal(struct state_data * sd,const char * msg)103 fatal(struct state_data *sd, const char *msg)
104 {
105   fprintf(stderr, "%s: %s\n", msg, (sd ? mysql_error(&sd->mysql) : ""));
106   exit(1);
107 }
108 
109 
110 static void state_machine_handler(int fd, short event, void *arg);
111 
112 static void
next_event(int new_st,int status,struct state_data * sd)113 next_event(int new_st, int status, struct state_data *sd)
114 {
115   short wait_event= 0;
116   struct timeval tv, *ptv;
117   int fd;
118 
119   if (status & MYSQL_WAIT_READ)
120     wait_event|= EV_READ;
121   if (status & MYSQL_WAIT_WRITE)
122     wait_event|= EV_WRITE;
123   if (wait_event)
124     fd= mysql_get_socket(&sd->mysql);
125   else
126     fd= -1;
127   if (status & MYSQL_WAIT_TIMEOUT)
128   {
129     tv.tv_sec= mysql_get_timeout_value(&sd->mysql);
130     tv.tv_usec= 0;
131     ptv= &tv;
132   }
133   else
134     ptv= NULL;
135   event_set(&sd->ev_mysql, fd, wait_event, state_machine_handler, sd);
136   event_add(&sd->ev_mysql, ptv);
137   sd->ST= new_st;
138 }
139 
140 static int
mysql_status(short event)141 mysql_status(short event)
142 {
143   int status= 0;
144   if (event & EV_READ)
145     status|= MYSQL_WAIT_READ;
146   if (event & EV_WRITE)
147     status|= MYSQL_WAIT_WRITE;
148   if (event & EV_TIMEOUT)
149     status|= MYSQL_WAIT_TIMEOUT;
150   return status;
151 }
152 
153 
/*
  Number of connections that have been started but have not yet reached the
  final state of the state machine.
*/
static int num_active_connections;

/*
  Shortcut for going to a new state immediately without waiting for an event.

  Stores NEW_ST in the ST member of the structure SD_ points to and jumps to
  the label `again', which the enclosing function must provide.  Both
  arguments are parenthesized so that any pointer expression (for example
  &var) and any integer expression can be passed.
*/
#define NEXT_IMMEDIATE(sd_, new_st) \
  do { (sd_)->ST= (new_st); goto again; } while (0)
158 
159 static void
state_machine_handler(int fd,short event,void * arg)160 state_machine_handler(int fd __attribute__((unused)), short event, void *arg)
161 {
162   struct state_data *sd= arg;
163   int status;
164 
165 again:
166   switch(sd->ST)
167   {
168   case 0:
169     /* Initial state, start making the connection. */
170     status= mysql_real_connect_start(&sd->ret, &sd->mysql, opt_host, opt_user, opt_password, opt_db, opt_port, opt_socket, 0);
171     if (status)
172       /* Wait for connect to complete. */
173       next_event(1, status, sd);
174     else
175       NEXT_IMMEDIATE(sd, 9);
176     break;
177 
178   case 1:
179     status= mysql_real_connect_cont(&sd->ret, &sd->mysql, mysql_status(event));
180     if (status)
181       next_event(1, status, sd);
182     else
183       NEXT_IMMEDIATE(sd, 9);
184     break;
185 
186   case 9:
187     if (!sd->ret)
188       fatal(sd, "Failed to mysql_real_connect()");
189     NEXT_IMMEDIATE(sd, 10);
190     break;
191 
192   case 10:
193     /* Now run the next query. */
194     sd->query_element= query_list;
195     if (!sd->query_element)
196     {
197       /* No more queries, end the connection. */
198       NEXT_IMMEDIATE(sd, 40);
199     }
200     query_list= query_list->next;
201 
202     sd->index= sd->query_element->index;
203     printf("%d ! %s\n", sd->index, sd->query_element->query);
204     status= mysql_real_query_start(&sd->err, &sd->mysql, sd->query_element->query,
205                                    strlen(sd->query_element->query));
206     if (status)
207       next_event(11, status, sd);
208     else
209       NEXT_IMMEDIATE(sd, 20);
210     break;
211 
212   case 11:
213     status= mysql_real_query_cont(&sd->err, &sd->mysql, mysql_status(event));
214     if (status)
215       next_event(11, status, sd);
216     else
217       NEXT_IMMEDIATE(sd, 20);
218     break;
219 
220   case 20:
221     my_free(sd->query_element->query);
222     my_free(sd->query_element);
223     if (sd->err)
224     {
225       printf("%d | Error: %s\n", sd->index, mysql_error(&sd->mysql));
226       NEXT_IMMEDIATE(sd, 10);
227     }
228     else
229     {
230       sd->result= mysql_use_result(&sd->mysql);
231       if (!sd->result)
232         fatal(sd, "mysql_use_result() returns error");
233       NEXT_IMMEDIATE(sd, 30);
234     }
235     break;
236 
237   case 30:
238     status= mysql_fetch_row_start(&sd->row, sd->result);
239     if (status)
240       next_event(31, status, sd);
241     else
242       NEXT_IMMEDIATE(sd, 39);
243     break;
244 
245   case 31:
246     status= mysql_fetch_row_cont(&sd->row, sd->result, mysql_status(event));
247     if (status)
248       next_event(31, status, sd);
249     else
250       NEXT_IMMEDIATE(sd, 39);
251     break;
252 
253   case 39:
254     if (sd->row)
255     {
256       /* Got a row. */
257       unsigned int i;
258       printf("%d - ", sd->index);
259       for (i= 0; i < mysql_num_fields(sd->result); i++)
260         printf("%s%s", (i ? "\t" : ""), (sd->row[i] ? sd->row[i] : "(null)"));
261       printf ("\n");
262       NEXT_IMMEDIATE(sd, 30);
263     }
264     else
265     {
266       if (mysql_errno(&sd->mysql))
267       {
268         /* An error occurred. */
269         printf("%d | Error: %s\n", sd->index, mysql_error(&sd->mysql));
270       }
271       else
272       {
273         /* EOF. */
274         printf("%d | EOF\n", sd->index);
275       }
276       mysql_free_result(sd->result);
277       NEXT_IMMEDIATE(sd, 10);
278     }
279     break;
280 
281   case 40:
282     status= mysql_close_start(&sd->mysql);
283     if (status)
284       next_event(41, status, sd);
285     else
286       NEXT_IMMEDIATE(sd, 50);
287     break;
288 
289   case 41:
290     status= mysql_close_cont(&sd->mysql, mysql_status(event));
291     if (status)
292       next_event(41, status, sd);
293     else
294       NEXT_IMMEDIATE(sd, 50);
295     break;
296 
297   case 50:
298     /* We are done! */
299     num_active_connections--;
300     if (num_active_connections == 0)
301       event_loopbreak();
302     break;
303 
304   default:
305     abort();
306   }
307 }
308 
309 
310 void
add_query(const char * q)311 add_query(const char *q)
312 {
313   struct query_entry *e;
314   char *q2;
315   size_t len;
316 
317   e= my_malloc(PSI_NOT_INSTRUMENTED, sizeof(*e), MYF(0));
318   q2= my_strdup(PSI_NOT_INSTRUMENTED, q, MYF(0));
319   if (!e || !q2)
320     fatal(NULL, "Out of memory");
321 
322   /* Remove any trailing newline. */
323   len= strlen(q2);
324   if (q2[len] == '\n')
325     q2[len--]= '\0';
326   if (q2[len] == '\r')
327     q2[len--]= '\0';
328 
329   e->next= NULL;
330   e->query= q2;
331   e->index= query_counter++;
332   *tail_ptr= e;
333   tail_ptr= &e->next;
334 }
335 
336 
337 static my_bool
handle_option(const struct my_option * opt,const char * arg,const char * filename)338 handle_option(const struct my_option *opt, const char *arg,
339               const char *filename __attribute__((unused)))
340 {
341   switch (opt->id)
342   {
343   case '?':
344     printf("Usage: async_queries [OPTIONS] query ...\n");
345     my_print_help(options);
346     my_print_variables(options);
347     exit(0);
348     break;
349 
350   case 'p':
351     if (arg)
352       opt_password= arg;
353     else
354       tty_password= 1;
355     break;
356   }
357 
358   return 0;
359 }
360 
361 
362 int
main(int argc,char * argv[])363 main(int argc, char *argv[])
364 {
365   struct state_data *sds;
366   unsigned int i;
367   int err;
368   struct event_base *libevent_base;
369 
370   err= handle_options(&argc, &argv, options, handle_option);
371   if (err)
372     exit(err);
373   if (tty_password)
374     opt_password= get_tty_password(NullS);
375 
376   if (opt_query_file)
377   {
378     FILE *f= fopen(opt_query_file, "r");
379     char buf[65536];
380     if (!f)
381       fatal(NULL, "Cannot open query file");
382     while (!feof(f))
383     {
384       if (!fgets(buf, sizeof(buf), f))
385         break;
386       add_query(buf);
387     }
388     fclose(f);
389   }
390   /* Add extra queries directly on command line. */
391   while (argc > 0)
392   {
393     --argc;
394     add_query(*argv++);
395   }
396 
397   sds= my_malloc(PSI_NOT_INSTRUMENTED, opt_connections * sizeof(*sds), MYF(0));
398   if (!sds)
399     fatal(NULL, "Out of memory");
400 
401   libevent_base= event_init();
402 
403   err= mysql_library_init(argc, argv, (char **)my_groups);
404   if (err)
405   {
406     fprintf(stderr, "Fatal: mysql_library_init() returns error: %d\n", err);
407     exit(1);
408   }
409 
410   num_active_connections= 0;
411   for (i= 0; i < opt_connections; i++)
412   {
413     mysql_init(&sds[i].mysql);
414     mysql_options(&sds[i].mysql, MYSQL_OPT_NONBLOCK, 0);
415     mysql_options(&sds[i].mysql, MYSQL_READ_DEFAULT_GROUP, "async_queries");
416 
417     /*
418       We put the initial connect call in the first state 0 of the state machine
419       and run that manually, just to have everything in one place.
420     */
421     sds[i].ST= 0;
422     num_active_connections++;
423     state_machine_handler(-1, -1, &sds[i]);
424   }
425 
426   event_dispatch();
427 
428   my_free(sds);
429 
430   mysql_library_end();
431 
432   event_base_free(libevent_base);
433 
434   return 0;
435 }
436