1 /*
2 Copyright 2011 Kristian Nielsen and Monty Program Ab.
3
4 This file is free software; you can redistribute it and/or
5 modify it under the terms of the GNU Lesser General Public
6 License as published by the Free Software Foundation; either
7 version 2.1 of the License, or (at your option) any later version.
8
9 This library is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 Lesser General Public License for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this. If not, see <http://www.gnu.org/licenses/>.
16 */
17
18
19 /*
20 Run a set of queries in parallel against a server using the non-blocking
21 API, and compare to running same queries with the normal blocking API.
22 */
23
24 #include <my_global.h>
25 #include <my_sys.h>
26 #include <mysql.h>
27 #include <my_getopt.h>
28
29 #include <sys/time.h>
30 #include <stdlib.h>
31 #include <stdio.h>
32 #include <string.h>
33
34 #include <event.h>
35
36
/*
  Expands to two comma-separated items: the argument itself followed by
  sizeof(argument). Not referenced by the code visible in this file.
*/
#define SL(s) (s), sizeof(s)
/* NULL-terminated group list handed to mysql_library_init() in main(). */
static const char *my_groups[]= { "client", NULL };
39
/*
  Maintaining a list of queries to run.

  Entries are appended by add_query() and removed from the head by the
  state machine (state 10), which also frees them (state 20).
*/
struct query_entry {
  struct query_entry *next;   /* Next entry in the list, NULL at the end */
  char *query;                /* Query text, allocated in add_query() */
  int index;                  /* Number taken from query_counter; printed
                                 as the prefix of every output line */
};
/* Head of the list of queries that have not been started yet. */
static struct query_entry *query_list;
/* Address of the link that add_query() fills in with the next new entry. */
static struct query_entry **tail_ptr= &query_list;
/* Source of query_entry.index values; incremented once per add_query(). */
static int query_counter= 0;
49
50
/*
  State kept for each connection.

  main() allocates one of these per connection and passes it as the
  callback argument to state_machine_handler().
*/
struct state_data {
  int ST;                       /* State machine current state */
  struct event ev_mysql;        /* Event (re)armed by next_event() */
  MYSQL mysql;                  /* Connection handle, set up in main() */
  MYSQL_RES *result;            /* Result set from mysql_use_result() */
  MYSQL *ret;                   /* Output of mysql_real_connect_start/cont();
                                   NULL is treated as connect failure */
  int err;                      /* Output of mysql_real_query_start/cont();
                                   non-zero is treated as query failure */
  MYSQL_ROW row;                /* Output of mysql_fetch_row_start/cont();
                                   NULL means no (more) rows */
  struct query_entry *query_element; /* Entry taken from query_list */
  int index;                    /* Copy of query_element->index; the entry
                                   is freed before the rows are printed */
};
63
64
/*
  Values of the command line options. All except tty_password are filled in
  through the pointers stored in the options[] table below.
*/
static const char *opt_db= NULL;
static const char *opt_user= NULL;
/* Set in handle_option() ('p' with argument) or read from the tty in main(). */
static const char *opt_password= NULL;
/* Set in handle_option() when 'p' is given without an argument. */
static int tty_password= 0;
static const char *opt_host= NULL;
static const char *opt_socket= NULL;
static unsigned int opt_port= 0;
/* Number of struct state_data connections that main() allocates and starts. */
static unsigned int opt_connections= 5;
/* When set, main() reads one query per line from this file. */
static const char *opt_query_file= NULL;

/*
  Option table passed to handle_options() in main() and to my_print_help() /
  my_print_variables() in handle_option(). The "help" and "password" entries
  carry no variable pointers; they are processed by handle_option() (ids '?'
  and 'p'). The table ends with an all-zero sentinel entry.
*/
static struct my_option options[] =
{
  {"database", 'D', "Database to use", &opt_db, &opt_db,
   0, GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0},
  {"help", '?', "Display this help and exit", 0, 0, 0, GET_NO_ARG, NO_ARG, 0,
   0, 0, 0, 0, 0},
  {"host", 'h', "Connect to host", &opt_host, &opt_host,
   0, GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0},
  {"password", 'p',
   "Password to use when connecting to server. If password is not given it's asked from the tty.",
   0, 0, 0, GET_STR, OPT_ARG, 0, 0, 0, 0, 0, 0},
  {"port", 'P', "Port number to use for connection.",
   &opt_port, &opt_port, 0, GET_UINT, REQUIRED_ARG, 0, 0, 0, 0, 0, 0},
  {"socket", 'S', "Socket file to use for connection",
   &opt_socket, &opt_socket, 0, GET_STR,
   REQUIRED_ARG, 0, 0, 0, 0, 0, 0},
  {"user", 'u', "User for login if not current user", &opt_user,
   &opt_user, 0, GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0},
  {"connections", 'n', "Number of simultaneous connections/queries.",
   &opt_connections, &opt_connections, 0, GET_UINT, REQUIRED_ARG,
   5, 0, 0, 0, 0, 0},
  {"queryfile", 'q', "Name of file containing extra queries to run",
   &opt_query_file, &opt_query_file, 0, GET_STR, REQUIRED_ARG,
   0, 0, 0, 0, 0, 0},
  { 0, 0, 0, 0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0}
};
101
102 static void
fatal(struct state_data * sd,const char * msg)103 fatal(struct state_data *sd, const char *msg)
104 {
105 fprintf(stderr, "%s: %s\n", msg, (sd ? mysql_error(&sd->mysql) : ""));
106 exit(1);
107 }
108
109
110 static void state_machine_handler(int fd, short event, void *arg);
111
112 static void
next_event(int new_st,int status,struct state_data * sd)113 next_event(int new_st, int status, struct state_data *sd)
114 {
115 short wait_event= 0;
116 struct timeval tv, *ptv;
117 int fd;
118
119 if (status & MYSQL_WAIT_READ)
120 wait_event|= EV_READ;
121 if (status & MYSQL_WAIT_WRITE)
122 wait_event|= EV_WRITE;
123 if (wait_event)
124 fd= mysql_get_socket(&sd->mysql);
125 else
126 fd= -1;
127 if (status & MYSQL_WAIT_TIMEOUT)
128 {
129 tv.tv_sec= mysql_get_timeout_value(&sd->mysql);
130 tv.tv_usec= 0;
131 ptv= &tv;
132 }
133 else
134 ptv= NULL;
135 event_set(&sd->ev_mysql, fd, wait_event, state_machine_handler, sd);
136 event_add(&sd->ev_mysql, ptv);
137 sd->ST= new_st;
138 }
139
140 static int
mysql_status(short event)141 mysql_status(short event)
142 {
143 int status= 0;
144 if (event & EV_READ)
145 status|= MYSQL_WAIT_READ;
146 if (event & EV_WRITE)
147 status|= MYSQL_WAIT_WRITE;
148 if (event & EV_TIMEOUT)
149 status|= MYSQL_WAIT_TIMEOUT;
150 return status;
151 }
152
153
154 static int num_active_connections;
155
156 /* Shortcut for going to new state immediately without waiting. */
157 #define NEXT_IMMEDIATE(sd_, new_st) do { sd_->ST= new_st; goto again; } while (0)
158
159 static void
state_machine_handler(int fd,short event,void * arg)160 state_machine_handler(int fd __attribute__((unused)), short event, void *arg)
161 {
162 struct state_data *sd= arg;
163 int status;
164
165 again:
166 switch(sd->ST)
167 {
168 case 0:
169 /* Initial state, start making the connection. */
170 status= mysql_real_connect_start(&sd->ret, &sd->mysql, opt_host, opt_user, opt_password, opt_db, opt_port, opt_socket, 0);
171 if (status)
172 /* Wait for connect to complete. */
173 next_event(1, status, sd);
174 else
175 NEXT_IMMEDIATE(sd, 9);
176 break;
177
178 case 1:
179 status= mysql_real_connect_cont(&sd->ret, &sd->mysql, mysql_status(event));
180 if (status)
181 next_event(1, status, sd);
182 else
183 NEXT_IMMEDIATE(sd, 9);
184 break;
185
186 case 9:
187 if (!sd->ret)
188 fatal(sd, "Failed to mysql_real_connect()");
189 NEXT_IMMEDIATE(sd, 10);
190 break;
191
192 case 10:
193 /* Now run the next query. */
194 sd->query_element= query_list;
195 if (!sd->query_element)
196 {
197 /* No more queries, end the connection. */
198 NEXT_IMMEDIATE(sd, 40);
199 }
200 query_list= query_list->next;
201
202 sd->index= sd->query_element->index;
203 printf("%d ! %s\n", sd->index, sd->query_element->query);
204 status= mysql_real_query_start(&sd->err, &sd->mysql, sd->query_element->query,
205 strlen(sd->query_element->query));
206 if (status)
207 next_event(11, status, sd);
208 else
209 NEXT_IMMEDIATE(sd, 20);
210 break;
211
212 case 11:
213 status= mysql_real_query_cont(&sd->err, &sd->mysql, mysql_status(event));
214 if (status)
215 next_event(11, status, sd);
216 else
217 NEXT_IMMEDIATE(sd, 20);
218 break;
219
220 case 20:
221 my_free(sd->query_element->query);
222 my_free(sd->query_element);
223 if (sd->err)
224 {
225 printf("%d | Error: %s\n", sd->index, mysql_error(&sd->mysql));
226 NEXT_IMMEDIATE(sd, 10);
227 }
228 else
229 {
230 sd->result= mysql_use_result(&sd->mysql);
231 if (!sd->result)
232 fatal(sd, "mysql_use_result() returns error");
233 NEXT_IMMEDIATE(sd, 30);
234 }
235 break;
236
237 case 30:
238 status= mysql_fetch_row_start(&sd->row, sd->result);
239 if (status)
240 next_event(31, status, sd);
241 else
242 NEXT_IMMEDIATE(sd, 39);
243 break;
244
245 case 31:
246 status= mysql_fetch_row_cont(&sd->row, sd->result, mysql_status(event));
247 if (status)
248 next_event(31, status, sd);
249 else
250 NEXT_IMMEDIATE(sd, 39);
251 break;
252
253 case 39:
254 if (sd->row)
255 {
256 /* Got a row. */
257 unsigned int i;
258 printf("%d - ", sd->index);
259 for (i= 0; i < mysql_num_fields(sd->result); i++)
260 printf("%s%s", (i ? "\t" : ""), (sd->row[i] ? sd->row[i] : "(null)"));
261 printf ("\n");
262 NEXT_IMMEDIATE(sd, 30);
263 }
264 else
265 {
266 if (mysql_errno(&sd->mysql))
267 {
268 /* An error occurred. */
269 printf("%d | Error: %s\n", sd->index, mysql_error(&sd->mysql));
270 }
271 else
272 {
273 /* EOF. */
274 printf("%d | EOF\n", sd->index);
275 }
276 mysql_free_result(sd->result);
277 NEXT_IMMEDIATE(sd, 10);
278 }
279 break;
280
281 case 40:
282 status= mysql_close_start(&sd->mysql);
283 if (status)
284 next_event(41, status, sd);
285 else
286 NEXT_IMMEDIATE(sd, 50);
287 break;
288
289 case 41:
290 status= mysql_close_cont(&sd->mysql, mysql_status(event));
291 if (status)
292 next_event(41, status, sd);
293 else
294 NEXT_IMMEDIATE(sd, 50);
295 break;
296
297 case 50:
298 /* We are done! */
299 num_active_connections--;
300 if (num_active_connections == 0)
301 event_loopbreak();
302 break;
303
304 default:
305 abort();
306 }
307 }
308
309
310 void
add_query(const char * q)311 add_query(const char *q)
312 {
313 struct query_entry *e;
314 char *q2;
315 size_t len;
316
317 e= my_malloc(sizeof(*e), MYF(0));
318 q2= my_strdup(q, MYF(0));
319 if (!e || !q2)
320 fatal(NULL, "Out of memory");
321
322 /* Remove any trailing newline. */
323 len= strlen(q2);
324 if (q2[len] == '\n')
325 q2[len--]= '\0';
326 if (q2[len] == '\r')
327 q2[len--]= '\0';
328
329 e->next= NULL;
330 e->query= q2;
331 e->index= query_counter++;
332 *tail_ptr= e;
333 tail_ptr= &e->next;
334 }
335
336
337 static my_bool
handle_option(int optid,const struct my_option * opt,char * arg)338 handle_option(int optid, const struct my_option *opt __attribute__((unused)),
339 char *arg)
340 {
341 switch (optid)
342 {
343 case '?':
344 printf("Usage: async_queries [OPTIONS] query ...\n");
345 my_print_help(options);
346 my_print_variables(options);
347 exit(0);
348 break;
349
350 case 'p':
351 if (arg)
352 opt_password= arg;
353 else
354 tty_password= 1;
355 break;
356 }
357
358 return 0;
359 }
360
361
362 int
main(int argc,char * argv[])363 main(int argc, char *argv[])
364 {
365 struct state_data *sds;
366 unsigned int i;
367 int err;
368 struct event_base *libevent_base;
369
370 err= handle_options(&argc, &argv, options, handle_option);
371 if (err)
372 exit(err);
373 if (tty_password)
374 opt_password= get_tty_password(NullS);
375
376 if (opt_query_file)
377 {
378 FILE *f= fopen(opt_query_file, "r");
379 char buf[65536];
380 if (!f)
381 fatal(NULL, "Cannot open query file");
382 while (!feof(f))
383 {
384 if (!fgets(buf, sizeof(buf), f))
385 break;
386 add_query(buf);
387 }
388 fclose(f);
389 }
390 /* Add extra queries directly on command line. */
391 while (argc > 0)
392 {
393 --argc;
394 add_query(*argv++);
395 }
396
397 sds= my_malloc(opt_connections * sizeof(*sds), MYF(0));
398 if (!sds)
399 fatal(NULL, "Out of memory");
400
401 libevent_base= event_init();
402
403 err= mysql_library_init(argc, argv, (char **)my_groups);
404 if (err)
405 {
406 fprintf(stderr, "Fatal: mysql_library_init() returns error: %d\n", err);
407 exit(1);
408 }
409
410 num_active_connections= 0;
411 for (i= 0; i < opt_connections; i++)
412 {
413 mysql_init(&sds[i].mysql);
414 mysql_options(&sds[i].mysql, MYSQL_OPT_NONBLOCK, 0);
415 mysql_options(&sds[i].mysql, MYSQL_READ_DEFAULT_GROUP, "async_queries");
416
417 /*
418 We put the initial connect call in the first state 0 of the state machine
419 and run that manually, just to have everything in one place.
420 */
421 sds[i].ST= 0;
422 num_active_connections++;
423 state_machine_handler(-1, -1, &sds[i]);
424 }
425
426 event_dispatch();
427
428 my_free(sds);
429
430 mysql_library_end();
431
432 event_base_free(libevent_base);
433
434 return 0;
435 }
436