1 /**
2 * @file flowgrind.c
3 * @brief Flowgrind controller
4 */
5
6 /*
7 * Copyright (C) 2013-2014 Alexander Zimmermann <alexander.zimmermann@netapp.com>
8 * Copyright (C) 2010-2013 Arnd Hannemann <arnd@arndnet.de>
9 * Copyright (C) 2010-2013 Christian Samsel <christian.samsel@rwth-aachen.de>
10 * Copyright (C) 2009 Tim Kosse <tim.kosse@gmx.de>
11 * Copyright (C) 2007-2008 Daniel Schaffrath <daniel.schaffrath@mac.com>
12 *
13 * This file is part of Flowgrind.
14 *
15 * Flowgrind is free software: you can redistribute it and/or modify
16 * it under the terms of the GNU General Public License as published by
17 * the Free Software Foundation, either version 3 of the License, or
18 * (at your option) any later version.
19 *
20 * Flowgrind is distributed in the hope that it will be useful,
21 * but WITHOUT ANY WARRANTY; without even the implied warranty of
22 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23 * GNU General Public License for more details.
24 *
25 * You should have received a copy of the GNU General Public License
26 * along with Flowgrind. If not, see <http://www.gnu.org/licenses/>.
27 *
28 */
29
30 #ifdef HAVE_CONFIG_H
31 #include "config.h"
32 #endif /* HAVE_CONFIG_H */
33
34 #include <assert.h>
35 #include <errno.h>
36 #include <limits.h>
37 #include <math.h>
38 #include <sys/types.h>
39 /* for AF_INET6 */
40 #include <sys/socket.h>
41 #include <arpa/inet.h>
42 #include <netinet/in.h>
43 #include <netinet/ip.h>
44 /* for CA states (on Linux only) */
45 #include <netinet/tcp.h>
46 #include <signal.h>
47 #include <stdio.h>
48 #include <stdlib.h>
49 #include <string.h>
50 #include <sys/param.h>
51 #include <sys/uio.h>
52 #include <sys/utsname.h>
53 #include <time.h>
54 #include <unistd.h>
55 #include <fcntl.h>
56 #include <syslog.h>
57 /* xmlrpc-c */
58 #include <xmlrpc-c/base.h>
59 #include <xmlrpc-c/client.h>
60
61 #include "flowgrind.h"
62 #include "common.h"
63 #include "fg_error.h"
64 #include "fg_progname.h"
65 #include "fg_time.h"
66 #include "fg_definitions.h"
67 #include "fg_string.h"
68 #include "debug.h"
69 #include "fg_rpc_client.h"
70 #include "fg_argparser.h"
71 #include "fg_log.h"
72
/**
 * To show intermediate interval report columns.
 *
 * Forwards the given column IDs to set_column_visibility() with visibility
 * set to true; NARGS() supplies the number of IDs passed.
 */
#define SHOW_COLUMNS(...)						\
	(set_column_visibility(true, NARGS(__VA_ARGS__), __VA_ARGS__))

/**
 * To hide intermediate interval report columns.
 *
 * Forwards the given column IDs to set_column_visibility() with visibility
 * set to false; NARGS() supplies the number of IDs passed.
 */
#define HIDE_COLUMNS(...)						\
	(set_column_visibility(false, NARGS(__VA_ARGS__), __VA_ARGS__))

/**
 * To set the unit of intermediate interval report columns.
 *
 * Forwards @p unit and the given column IDs to set_column_unit(); NARGS()
 * supplies the number of IDs passed.
 */
#define SET_COLUMN_UNIT(unit, ...)					\
	(set_column_unit(unit, NARGS(__VA_ARGS__), __VA_ARGS__))

/**
 * Report a command line parsing error and bail out.
 *
 * Passes the message to errx() and then calls usage(EXIT_FAILURE). The
 * '##__VA_ARGS__' form is a GNU extension that drops the preceding comma
 * when no variadic arguments are given. Wrapped in do { } while (0) so the
 * macro behaves like a single statement.
 */
#define PARSE_ERR(err_msg, ...) do {	\
	errx(err_msg, ##__VA_ARGS__);	\
	usage(EXIT_FAILURE);		\
} while (0)
90
91 /* External global variables */
92 extern const char *progname;
93
94 /** Logfile for measurement output. */
95 static FILE *log_stream = NULL;
96
97 /** Name of logfile. */
98 static char *log_filename = NULL;
99
100 /** SIGINT (CTRL-C) received? */
101 static bool sigint_caught = false;
102
103 /* XML-RPC environment object that contains any error that has occurred. */
104 static xmlrpc_env rpc_env;
105
106 /** Global linked list to the flow endpoints XML RPC connection information. */
107 static struct linked_list flows_rpc_info;
108
109 /** Global linked list to the daemons containing UUID and daemons flowgrind version. */
110 static struct linked_list unique_daemons;
111
112 /** Command line option parser. */
113 static struct arg_parser parser;
114
115 /** Controller options. */
116 static struct controller_options copt;
117
118 /** Infos about all flows including flow options. */
119 static struct cflow cflow[MAX_FLOWS_CONTROLLER];
120
121 /** Command line option parser. */
122 static struct arg_parser parser;
123
124 /** Number of currently active flows. */
125 static unsigned short active_flows = 0;
126
/* To cover a gcc bug (http://gcc.gnu.org/bugzilla/show_bug.cgi?id=36446):
 * silence -Wmissing-field-initializers for the designated initializers
 * below; the previous diagnostic state is restored after the table */
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wmissing-field-initializers"
/**
 * Infos about the intermediate interval report columns.
 *
 * Each entry carries the column type, the header name and unit strings and
 * whether the column is visible by default. The table is indexed directly
 * by column ID in set_column_visibility() and set_column_unit(), so the
 * entry order presumably has to match enum column_id - verify against the
 * enum before reordering. The status column only exists in DEBUG builds.
 */
static struct column column_info[] = {
	{.type = COL_FLOW_ID, .header.name = "# ID",
	 .header.unit = "# ", .state.visible = true},
	{.type = COL_BEGIN, .header.name = "begin",
	 .header.unit = "[s]", .state.visible = true},
	{.type = COL_END, .header.name = "end",
	 .header.unit = "[s]", .state.visible = true},
	{.type = COL_THROUGH, .header.name = "through",
	 .header.unit = "[Mbit/s]", .state.visible = true},
	{.type = COL_TRANSAC, .header.name = "transac",
	 .header.unit = "[#/s]", .state.visible = true},
	{.type = COL_BLOCK_REQU, .header.name = "requ",
	 .header.unit = "[#]", .state.visible = false},
	{.type = COL_BLOCK_RESP, .header.name = "resp",
	 .header.unit = "[#]", .state.visible = false},
	{.type = COL_RTT_MIN, .header.name = "min RTT",
	 .header.unit = "[ms]", .state.visible = false},
	{.type = COL_RTT_AVG, .header.name = "avg RTT",
	 .header.unit = "[ms]", .state.visible = false},
	{.type = COL_RTT_MAX, .header.name = "max RTT",
	 .header.unit = "[ms]", .state.visible = false},
	{.type = COL_IAT_MIN, .header.name = "min IAT",
	 .header.unit = "[ms]", .state.visible = true},
	{.type = COL_IAT_AVG, .header.name = "avg IAT",
	 .header.unit = "[ms]", .state.visible = true},
	{.type = COL_IAT_MAX, .header.name = "max IAT",
	 .header.unit = "[ms]", .state.visible = true},
	{.type = COL_DLY_MIN, .header.name = "min DLY",
	 .header.unit = "[ms]", .state.visible = false},
	{.type = COL_DLY_AVG, .header.name = "avg DLY",
	 .header.unit = "[ms]", .state.visible = false},
	{.type = COL_DLY_MAX, .header.name = "max DLY",
	 .header.unit = "[ms]", .state.visible = false},
	{.type = COL_TCP_CWND, .header.name = "cwnd",
	 .header.unit = "[#]", .state.visible = true},
	{.type = COL_TCP_SSTH, .header.name = "ssth",
	 .header.unit = "[#]", .state.visible = true},
	{.type = COL_TCP_UACK, .header.name = "uack",
	 .header.unit = "[#]", .state.visible = true},
	{.type = COL_TCP_SACK, .header.name = "sack",
	 .header.unit = "[#]", .state.visible = true},
	{.type = COL_TCP_LOST, .header.name = "lost",
	 .header.unit = "[#]", .state.visible = true},
	{.type = COL_TCP_RETR, .header.name = "retr",
	 .header.unit = "[#]", .state.visible = true},
	{.type = COL_TCP_TRET, .header.name = "tret",
	 .header.unit = "[#]", .state.visible = true},
	{.type = COL_TCP_FACK, .header.name = "fack",
	 .header.unit = "[#]", .state.visible = true},
	{.type = COL_TCP_REOR, .header.name = "reor",
	 .header.unit = "[#]", .state.visible = true},
	{.type = COL_TCP_BKOF, .header.name = "bkof",
	 .header.unit = "[#]", .state.visible = true},
	{.type = COL_TCP_RTT, .header.name = "rtt",
	 .header.unit = "[ms]", .state.visible = true},
	{.type = COL_TCP_RTTVAR, .header.name = "rttvar",
	 .header.unit = "[ms]", .state.visible = true},
	{.type = COL_TCP_RTO, .header.name = "rto",
	 .header.unit = "[ms]", .state.visible = true},
	{.type = COL_TCP_CA_STATE, .header.name = "ca state",
	 .header.unit = "", .state.visible = true},
	{.type = COL_SMSS, .header.name = "smss",
	 .header.unit = "[B]", .state.visible = true},
	{.type = COL_PMTU, .header.name = "pmtu",
	 .header.unit = "[B]", .state.visible = true},
#ifdef DEBUG
	{.type = COL_STATUS, .header.name = "status",
	 .header.unit = "", .state.visible = false}
#endif /* DEBUG */
};
#pragma GCC diagnostic pop
202
/* Forward declarations */

/* The three help printers never return; each one ends in exit() */
static void usage(short status)
	__attribute__((noreturn));
static void usage_sockopt(void)
	__attribute__((noreturn));
static void usage_trafgenopt(void)
	__attribute__((noreturn));
/* Let the compiler check the arguments against the printf-style format */
inline static void print_output(const char *fmt, ...)
	__attribute__((format(printf, 1, 2)));
static void fetch_reports(xmlrpc_client *);
static void report_flow(struct report* report);
static void print_interval_report(unsigned short flow_id, enum endpoint_t e,
				  struct report *report);
216
217 /**
218 * Print usage or error message and exit.
219 *
220 * Depending on exit status @p status print either the usage or an error
221 * message. In all cases it call exit() with the given exit status @p status.
222 *
223 * @param[in] status exit status
224 */
usage(short status)225 static void usage(short status)
226 {
227 /* Syntax error. Emit 'try help' to stderr and exit */
228 if (status != EXIT_SUCCESS) {
229 fprintf(stderr, "Try '%s -h' for more information\n", progname);
230 exit(status);
231 }
232
233 fprintf(stdout,
234 "Usage: %1$s [OPTION]...\n"
235 "Advanced TCP traffic generator for Linux, FreeBSD, and Mac OS X.\n\n"
236
237 "Mandatory arguments to long options are mandatory for short options too.\n\n"
238
239 "General options:\n"
240 " -h, --help[=WHAT]\n"
241 " display help and exit. Optional WHAT can either be 'socket' for\n"
242 " help on socket options or 'traffic' traffic generation help\n"
243 " -v, --version print version information and exit\n\n"
244
245 "Controller options:\n"
246 " -c, --show-colon=TYPE[,TYPE]...\n"
247 " display intermediated interval report column TYPE in output.\n"
248 " Allowed values for TYPE are: 'interval', 'through', 'transac',\n"
249 " 'iat', 'kernel' (all show per default), and 'blocks', 'rtt',\n"
250 #ifdef DEBUG
251 " 'delay', 'status' (optional)\n"
252 #else /* DEBUG */
253 " 'delay' (optional)\n"
254 #endif /* DEBUG */
255 #ifdef DEBUG
256 " -d, --debug increase debugging verbosity. Add option multiple times to\n"
257 " increase the verbosity\n"
258 #endif /* DEBUG */
259 " -e, --dump-prefix=PRE\n"
260 " prepend prefix PRE to pcap dump filename (default: \"%3$s\")\n"
261 " -i, --report-interval=#.#\n"
262 " reporting interval, in seconds (default: 0.05s)\n"
263 " --log-file[=FILE]\n"
264 " write output to logfile FILE (default: %1$s-'timestamp'.log)\n"
265 " -m report throughput in 2**20 bytes/s (default: 10**6 bit/s)\n"
266 " -n, --flows=# number of test flows (default: 1)\n"
267 " -o overwrite existing log files (default: don't)\n"
268 " -p don't print symbolic values (like INT_MAX) instead of numbers\n"
269 " -q, --quiet be quiet, do not log to screen (default: off)\n"
270 " -s, --tcp-stack=TYPE\n"
271 " don't determine unit of source TCP stacks automatically. Force\n"
272 " unit to TYPE, where TYPE is 'segment' or 'byte'\n"
273 " -w write output to logfile (same as --log-file)\n\n"
274
275 "Flow options:\n"
276 " Some of these options take the flow endpoint as argument, denoted by 'x' in\n"
277 " the option syntax. 'x' needs to be replaced with either 's' for the source\n"
278 " endpoint, 'd' for the destination endpoint or 'b' for both endpoints. To\n"
279 " specify different values for each endpoints, separate them by comma. For\n"
280 " instance -W s=8192,d=4096 sets the advertised window to 8192 at the source\n"
281 " and 4096 at the destination.\n\n"
282 " -A x use minimal response size needed for RTT calculation\n"
283 " (same as -G s=p,C,%2$d)\n"
284 " -B x=# set requested sending buffer, in bytes\n"
285 " -C x stop flow if it is experiencing local congestion\n"
286 " -D x=DSCP DSCP value for TOS byte\n"
287 " -E enumerate bytes in payload instead of sending zeros\n"
288 " -F #[,#]... flow options following this option apply only to the given flow \n"
289 " IDs. Useful in combination with -n to set specific options\n"
290 " for certain flows. Numbering starts with 0, so -F 1 refers\n"
291 " to the second flow. With -1 all flow are refered\n"
292 #ifdef HAVE_LIBGSL
293 " -G x=(q|p|g):(C|U|E|N|L|P|W):#1:[#2]\n"
294 #else /* HAVE_LIBGSL */
295 " -G x=(q|p|g):(C|U):#1:[#2]\n"
296 #endif /* HAVE_LIBGSL */
297 " activate stochastic traffic generation and set parameters\n"
298 " according to the used distribution. For additional information \n"
299 " see 'flowgrind --help=traffic'\n"
300 " -H x=HOST[/CONTROL[:PORT]]\n"
301 " test from/to HOST. Optional argument is the address and port\n"
302 " for the CONTROL connection to the same host.\n"
303 " An endpoint that isn't specified is assumed to be localhost\n"
304 " -J # use random seed # (default: read /dev/urandom)\n"
305 " -I enable one-way delay calculation (no clock synchronization)\n"
306 " -L call connect() on test socket immediately before starting to\n"
307 " send data (late connect). If not specified the test connection\n"
308 " is established in the preparation phase before the test starts\n"
309 " -M x dump traffic using libpcap. flowgrindd must be run as root\n"
310 " -N shutdown() each socket direction after test flow\n"
311 " -O x=OPT set socket option OPT on test socket. For additional information\n"
312 " see 'flowgrind --help=socket'\n"
313 " -P x do not iterate through select() to continue sending in case\n"
314 " block size did not suffice to fill sending queue (pushy)\n"
315 " -Q summarize only, no intermediated interval reports are\n"
316 " computed (quiet)\n"
317 " -R x=#.#(z|k|M|G)(b|B)\n"
318 " send at specified rate per second, where: z = 2**0, k = 2**10,\n"
319 " M = 2**20, G = 2**30, and b = bits/s (default), B = bytes/s\n"
320 " -S x=# set block (message) size, in bytes (same as -G s=q,C,#)\n"
321 " -T x=#.# set flow duration, in seconds (default: s=10,d=0)\n"
322 " -U x=# set application buffer size, in bytes (default: 8192)\n"
323 " truncates values if used with stochastic traffic generation\n"
324 " -W x=# set requested receiver buffer (advertised window), in bytes\n"
325 " -Y x=#.# set initial delay before the host starts to send, in seconds\n"
326 /* " -Z x=#.# set amount of data to be send, in bytes (instead of -t)\n"*/,
327 progname,
328 MIN_BLOCK_SIZE
329 , copt.dump_prefix
330 );
331 exit(EXIT_SUCCESS);
332 }
333
334 /**
335 * Print help on flowgrind's socket options and exit with EXIT_SUCCESS.
336 */
usage_sockopt(void)337 static void usage_sockopt(void)
338 {
339 fprintf(stdout,
340 "%s allows to set the following standard and non-standard socket options. \n\n"
341
342 "All socket options take the flow endpoint as argument, denoted by 'x' in the\n"
343 "option syntax. 'x' needs to be replaced with either 's' for the source endpoint,\n"
344 "'d' for the destination endpoint or 'b' for both endpoints. To specify different\n"
345 "values for each endpoints, separate them by comma. Moreover, it is possible to\n"
346 "repeatedly pass the same endpoint in order to specify multiple socket options\n\n"
347
348 "Standard socket options:\n"
349 " -O x=TCP_CONGESTION=ALG\n"
350 " set congestion control algorithm ALG on test socket\n"
351 " -O x=TCP_CORK\n"
352 " set TCP_CORK on test socket\n"
353 " -O x=TCP_NODELAY\n"
354 " disable nagle algorithm on test socket\n"
355 " -O x=SO_DEBUG\n"
356 " set SO_DEBUG on test socket\n"
357 " -O x=IP_MTU_DISCOVER\n"
358 " set IP_MTU_DISCOVER on test socket if not already enabled by\n"
359 " system default\n"
360 " -O x=ROUTE_RECORD\n"
361 " set ROUTE_RECORD on test socket\n\n"
362
363 "Non-standard socket options:\n"
364 " -O x=TCP_MTCP\n"
365 " set TCP_MTCP (15) on test socket\n"
366 " -O x=TCP_ELCN\n"
367 " set TCP_ELCN (20) on test socket\n"
368 " -O x=TCP_LCD set TCP_LCD (21) on test socket\n\n"
369
370 "Examples:\n"
371 " -O s=TCP_CONGESTION=reno,d=SO_DEBUG\n"
372 " sets Reno TCP as congestion control algorithm at the source and\n"
373 " SO_DEBUG as socket option at the destinatio\n"
374 " -O s=SO_DEBUG,s=TCP_CORK\n"
375 " set SO_DEBUG and TCP_CORK as socket option at the source\n",
376 progname);
377 exit(EXIT_SUCCESS);
378 }
379
380 /**
381 * Print help on flowgrind's traffic generation facilities and exit with EXIT_SUCCESS.
382 */
usage_trafgenopt(void)383 static void usage_trafgenopt(void)
384 {
385 fprintf(stdout,
386 "%s supports stochastic traffic generation, which allows to conduct\n"
387 "besides normal bulk also advanced rate-limited and request-response data\n"
388 "transfers.\n\n"
389
390 "The stochastic traffic generation option '-G' takes the flow endpoint as\n"
391 "argument, denoted by 'x' in the option syntax. 'x' needs to be replaced with\n"
392 "either 's' for the source endpoint, 'd' for the destination endpoint or 'b' for\n"
393 "both endpoints. However, please note that bidirectional traffic generation can\n"
394 "lead to unexpected results. To specify different values for each endpoints,\n"
395 "separate them by comma.\n\n"
396
397 "Stochastic traffic generation:\n"
398 #ifdef HAVE_LIBGSL
399 " -G x=(q|p|g):(C|U|E|N|L|P|W):#1:[#2]\n"
400 #else /* HAVE_LIBGSL */
401 " -G x=(q|p|g):(C|U):#1:[#2]\n"
402 #endif /* HAVE_LIBGSL */
403 " Flow parameter:\n"
404 " q = request size (in bytes)\n"
405 " p = response size (in bytes)\n"
406 " g = request interpacket gap (in seconds)\n\n"
407
408 " Distributions:\n"
409 " C = constant (#1: value, #2: not used)\n"
410 " U = uniform (#1: min, #2: max)\n"
411 #ifdef HAVE_LIBGSL
412 " E = exponential (#1: lamba - lifetime, #2: not used)\n"
413 " N = normal (#1: mu - mean value, #2: sigma_square - variance)\n"
414 " L = lognormal (#1: zeta - mean, #2: sigma - std dev)\n"
415 " P = pareto (#1: k - shape, #2 x_min - scale)\n"
416 " W = weibull (#1: lambda - scale, #2: k - shape)\n"
417 #else /* HAVE_LIBGSL */
418 " advanced distributions are only available if compiled with libgsl\n"
419 #endif /* HAVE_LIBGSL */
420 " -U x=# specify a cap for the calculated values for request and response\n"
421 " size (not needed for constant values or uniform distribution),\n"
422 " values over this cap are recalculated\n\n"
423
424 "Examples:\n"
425 " -G s=q:C:40\n"
426 " use contant request size of 40 bytes\n"
427 " -G s=p:N:2000:50\n"
428 " use normal distributed response size with mean 2000 bytes and\n"
429 " variance 50\n"
430 " -G s=g:U:0.005:0.01\n"
431 " use uniform distributed interpacket gap with minimum 0.005s and\n"
432 " maximum 0.01s\n\n"
433
434 "Notes: \n"
435 " - The man page contains more explained examples\n"
436 " - Using bidirectional traffic generation can lead to unexpected results\n"
437 " - Usage of -G in conjunction with -A, -R, -S is not recommended, as they\n"
438 " overwrite each other. -A, -R and -S exist as shortcut only\n",
439 progname);
440 exit(EXIT_SUCCESS);
441 }
442
443 /**
444 * Signal handler to catching signals.
445 *
446 * @param[in] sig signal to catch
447 */
sighandler(int sig)448 static void sighandler(int sig)
449 {
450 UNUSED_ARGUMENT(sig);
451
452 DEBUG_MSG(LOG_ERR, "caught %s", strsignal(sig));
453
454 if (!sigint_caught) {
455 warnx("caught SIGINT, trying to gracefully close flows. "
456 "Press CTRL+C again to force termination \n");
457 sigint_caught = true;
458 } else {
459 exit(EXIT_FAILURE);
460 }
461 }
462
463 /**
464 * Initialization of general controller options.
465 */
init_controller_options(void)466 static void init_controller_options(void)
467 {
468 copt.num_flows = 1;
469 copt.reporting_interval = 0.05;
470 copt.log_to_stdout = true;
471 copt.log_to_file = false;
472 copt.dump_prefix = "flowgrind-";
473 copt.clobber = false;
474 copt.mbyte = false;
475 copt.symbolic = true;
476 copt.force_unit = INT_MAX;
477 }
478
479 /**
480 * Initilization the flow option to default values.
481 *
482 * Initializes the controller flow option settings,
483 * final report for both source and destination daemon
484 * in the flow.
485 */
init_flow_options(void)486 static void init_flow_options(void)
487 {
488 for (int id = 0; id < MAX_FLOWS_CONTROLLER; id++) {
489
490 cflow[id].proto = PROTO_TCP;
491
492 foreach(int *i, SOURCE, DESTINATION) {
493 cflow[id].settings[*i].requested_send_buffer_size = 0;
494 cflow[id].settings[*i].requested_read_buffer_size = 0;
495 cflow[id].settings[*i].delay[WRITE] = 0;
496 cflow[id].settings[*i].maximum_block_size = 8192;
497 cflow[id].settings[*i].request_trafgen_options.param_one = 8192;
498 cflow[id].settings[*i].response_trafgen_options.param_one = 0;
499 cflow[id].settings[*i].route_record = 0;
500 strcpy(cflow[id].endpoint[*i].test_address, "localhost");
501
502 /* Default daemon is localhost, set in parse_cmdline */
503 cflow[id].endpoint[*i].rpc_info = 0;
504 cflow[id].endpoint[*i].daemon = 0;
505
506 cflow[id].settings[*i].pushy = 0;
507 cflow[id].settings[*i].cork = 0;
508 cflow[id].settings[*i].cc_alg[0] = 0;
509 cflow[id].settings[*i].elcn = 0;
510 cflow[id].settings[*i].lcd = 0;
511 cflow[id].settings[*i].mtcp = 0;
512 cflow[id].settings[*i].nonagle = 0;
513 cflow[id].settings[*i].traffic_dump = 0;
514 cflow[id].settings[*i].so_debug = 0;
515 cflow[id].settings[*i].dscp = 0;
516 cflow[id].settings[*i].ipmtudiscover = 0;
517
518 cflow[id].settings[*i].num_extra_socket_options = 0;
519 }
520 cflow[id].settings[SOURCE].duration[WRITE] = 10.0;
521 cflow[id].settings[DESTINATION].duration[WRITE] = 0.0;
522
523 cflow[id].endpoint_id[0] = cflow[id].endpoint_id[1] = -1;
524 cflow[id].start_timestamp[0].tv_sec = 0;
525 cflow[id].start_timestamp[0].tv_nsec = 0;
526 cflow[id].start_timestamp[1].tv_sec = 0;
527 cflow[id].start_timestamp[1].tv_nsec = 0;
528
529 cflow[id].finished[0] = 0;
530 cflow[id].finished[1] = 0;
531 cflow[id].final_report[0] = NULL;
532 cflow[id].final_report[1] = NULL;
533
534 cflow[id].summarize_only = 0;
535 cflow[id].late_connect = 0;
536 cflow[id].shutdown = 0;
537 cflow[id].byte_counting = 0;
538 cflow[id].random_seed = 0;
539
540 int data = open("/dev/urandom", O_RDONLY);
541 int rc = read(data, &cflow[id].random_seed, sizeof (int) );
542 close(data);
543 if(rc == -1)
544 crit("read /dev/urandom failed");
545 }
546 }
547
548 /**
549 * Create a logfile for measurement output.
550 */
open_logfile(void)551 static void open_logfile(void)
552 {
553 if (!copt.log_to_file)
554 return;
555
556 /* Log filename is not given by cmdline */
557 if (!log_filename) {
558 if (asprintf(&log_filename, "%s-%s.log", progname,
559 ctimenow(false)) == -1)
560 critx("could not allocate memory for log filename");
561 }
562
563 if (!copt.clobber && access(log_filename, R_OK) == 0)
564 critx("log file exists");
565
566 log_stream = fopen(log_filename, "w");
567 if (!log_stream)
568 critx("could not open logfile '%s'", log_filename);
569
570 DEBUG_MSG(LOG_NOTICE, "logging to '%s'", log_filename);
571 }
572
573 /**
574 * Close measurement output file.
575 */
close_logfile(void)576 static void close_logfile(void)
577 {
578 if (!copt.log_to_file)
579 return;
580 if (fclose(log_stream) == -1)
581 critx("could not close logfile '%s'", log_filename);
582
583 free(log_filename);
584 }
585
586 /**
587 * Print measurement output to logfile and / or to stdout.
588 *
589 * @param[in] fmt format string
590 * @param[in] ... parameters used to fill fmt
591 */
print_output(const char * fmt,...)592 inline static void print_output(const char *fmt, ...)
593 {
594 va_list ap;
595
596 va_start(ap, fmt);
597 if (copt.log_to_stdout) {
598 vprintf(fmt, ap);
599 fflush(stdout);
600 }
601 if (copt.log_to_file) {
602 vfprintf(log_stream, fmt, ap);
603 fflush(log_stream);
604 }
605 va_end(ap);
606 }
607
/**
 * Report a pending XML-RPC fault as critical error.
 *
 * Nothing happens if @p env holds no fault. Otherwise the fault string and
 * fault code are passed to critx().
 *
 * @param[in] env XML-RPC environment to inspect
 */
inline static void die_if_fault_occurred(xmlrpc_env *env)
{
	if (!env->fault_occurred)
		return;

	critx("XML-RPC Fault: %s (%d)", env->fault_string, env->fault_code);
}
613
614 /* creates an xmlrpc_client for connect to server, uses global env rpc_env */
prepare_xmlrpc_client(xmlrpc_client ** rpc_client)615 static void prepare_xmlrpc_client(xmlrpc_client **rpc_client)
616 {
617 struct xmlrpc_clientparms clientParms;
618 size_t clientParms_cpsize = XMLRPC_CPSIZE(transport);
619
620 /* Since version 1.21 xmlrpclib will automatically generate a
621 * rather long user_agent, we will do a lot of RPC calls so let's
622 * spare some bytes and omit this header */
623 #ifdef HAVE_STRUCT_XMLRPC_CURL_XPORTPARMS_DONT_ADVERTISE
624 struct xmlrpc_curl_xportparms curlParms;
625 memset(&curlParms, 0, sizeof(curlParms));
626
627 curlParms.dont_advertise = 1;
628 clientParms.transportparmsP = &curlParms;
629 clientParms.transportparm_size = XMLRPC_CXPSIZE(dont_advertise);
630 clientParms_cpsize = XMLRPC_CPSIZE(transportparm_size);
631 #endif /* HAVE_STRUCT_XMLRPC_CURL_XPORTPARMS_DONT_ADVERTISE */
632
633 /* Force usage of curl transport, we require it in configure script
634 * anyway and at least FreeBSD 9.1 will use libwww otherwise */
635 clientParms.transport = "curl";
636
637 DEBUG_MSG(LOG_WARNING, "prepare xmlrpc client");
638 xmlrpc_client_create(&rpc_env, XMLRPC_CLIENT_NO_FLAGS, "Flowgrind",
639 FLOWGRIND_VERSION, &clientParms,
640 clientParms_cpsize, rpc_client);
641 }
642
643 /**
644 * Checks all the daemons flowgrind version.
645 *
646 * Collect the daemons flowgrind version, XML-RPC API version,
647 * OS name and release details. Store these information in the
648 * daemons linked list for the result display
649 *
650 * @param[in,out] rpc_client to connect controller to daemon
651 */
check_version(xmlrpc_client * rpc_client)652 static void check_version(xmlrpc_client *rpc_client)
653 {
654 xmlrpc_value * resultP = 0;
655 char mismatch = 0;
656
657 const struct list_node *node = fg_list_front(&unique_daemons);
658
659 while (node) {
660 if (sigint_caught)
661 return;
662
663 struct daemon *daemon = node->data;
664 node = node->next;
665
666 xmlrpc_client_call2f(&rpc_env, rpc_client, daemon->url,
667 "get_version", &resultP, "()");
668 if ((rpc_env.fault_occurred) && (strcasestr(rpc_env.fault_string,"response code is 400")))
669 critx("node %s could not parse request.You are "
670 "probably trying to use a numeric IPv6 address "
671 "and the node's libxmlrpc is too old, please "
672 "upgrade!", daemon->url);
673
674 die_if_fault_occurred(&rpc_env);
675
676 /* Decomposes the xmlrpc value and extract the daemons data in
677 * it into controller local variable */
678 if (resultP) {
679 char* version;
680 int api_version;
681 char* os_name;
682 char* os_release;
683 xmlrpc_decompose_value(&rpc_env, resultP, "{s:s,s:i,s:s,s:s,*}",
684 "version", &version,
685 "api_version", &api_version,
686 "os_name", &os_name,
687 "os_release", &os_release);
688 die_if_fault_occurred(&rpc_env);
689
690 if (strcmp(version, FLOWGRIND_VERSION)) {
691 mismatch = 1;
692 warnx("node %s uses version %s",
693 daemon->url, version);
694 }
695 /* Store the daemons XML RPC API version,
696 * OS name and release in daemons linked list */
697 daemon->api_version = api_version;
698 strncpy(daemon->os_name, os_name, 256);
699 strncpy(daemon->os_release, os_release, 256);
700 free_all(version, os_name, os_release);
701 xmlrpc_DECREF(resultP);
702 }
703 }
704
705 if (mismatch) {
706 warnx("our version is %s\n\nContinuing in 5 seconds", FLOWGRIND_VERSION);
707 sleep(5);
708 }
709 }
710
711 /**
712 * Add daemon for controller flow by UUID.
713 *
714 * Stores the daemons data and push the data in linked list
715 * which contains daemons UUID and daemons XML RPC url
716 *
717 * @param[in,out] server_uuid UUID from daemons
718 * @param[in,out] daemon_url URL from daemons
719 */
add_daemon_by_uuid(const char * server_uuid,char * daemon_url)720 static struct daemon * add_daemon_by_uuid(const char* server_uuid,
721 char* daemon_url)
722 {
723 struct daemon *daemon;
724 daemon = malloc((sizeof(struct daemon)));
725
726 if (!daemon) {
727 logging(LOG_ALERT, "could not allocate memory for daemon");
728 return 0;
729 }
730
731 memset(daemon, 0, sizeof(struct daemon));
732 strcpy(daemon->uuid, server_uuid);
733 daemon->url = daemon_url;
734 fg_list_push_back(&unique_daemons, daemon);
735 return daemon;
736 }
737
738 /**
739 * Determine the daemons for controller flow by UUID.
740 *
741 * Determine the daemons memory block size by number of server in
742 * the controller flow option.
743 *
744 * @param[in,out] server_uuid UUID from daemons
745 * @param[in,out] daemon_url URL from daemons
746 */
set_unique_daemon_by_uuid(const char * server_uuid,char * daemon_url)747 static struct daemon * set_unique_daemon_by_uuid(const char* server_uuid,
748 char* daemon_url)
749 {
750 /* Store the first daemon UUID and XML RPC url connection string.
751 * First daemon is used as reference to avoid the daemon duplication
752 * by their UUID */
753 if (fg_list_size(&unique_daemons) == 0)
754 return add_daemon_by_uuid(server_uuid, daemon_url);
755
756 /* Compare the incoming daemons UUID with all daemons UUID in
757 * memory in order to prevent dupliclity in storing the daemons.
758 * If the incoming daemon UUID is already present in the daemons list,
759 * then return existing daemon pointer to controller connection.
760 * This is because a single daemons can run and maintain mutliple
761 * data connection */
762 const struct list_node *node = fg_list_front(&unique_daemons);
763 while (node) {
764 struct daemon *daemon = node->data;
765 node = node->next;
766 if (!strcmp(daemon->uuid, server_uuid))
767 return daemon;
768 }
769
770 return add_daemon_by_uuid(server_uuid, daemon_url);
771 }
772
773 /**
774 * Set the daemon for controller flow endpoint.
775 *
776 * @param[in,out] server_uuid UUID from daemons
777 * @param[in,out] daemon_url URL from daemons
778 */
set_flow_endpoint_daemon(const char * server_uuid,char * server_url)779 static void set_flow_endpoint_daemon(const char* server_uuid, char* server_url)
780 {
781 /* Determine the daemon in controller flow data by UUID
782 * This prevent the daemons duplication */
783 for (unsigned id = 0; id < copt.num_flows; id++) {
784 foreach(int *i, SOURCE, DESTINATION) {
785 struct flow_endpoint* e = &cflow[id].endpoint[*i];
786 if(!strcmp(e->rpc_info->server_url, server_url) && !e->daemon) {
787 e->daemon = set_unique_daemon_by_uuid(server_uuid,
788 server_url);
789 }
790 }
791 }
792 }
793
794 /**
795 * Checks all daemons in flow option.
796 *
797 * Daemon UUID is retrieved and this information is used
798 * to determine daemon in the controller flow information.
799 *
800 * The UUID is used to detect if the same daemon instance is controlling
801 * flows via different IP adresses.
802 *
803 * @param[in,out] rpc_client to connect controller to daemon
804 */
find_daemon(xmlrpc_client * rpc_client)805 static void find_daemon(xmlrpc_client *rpc_client)
806 {
807 xmlrpc_value * resultP = 0;
808 const struct list_node *node = fg_list_front(&flows_rpc_info);
809
810 while (node) {
811 if (sigint_caught)
812 return;
813
814 struct rpc_info *flow_rpc_info= node->data;
815 node = node->next;
816 /* call daemons by flow option XML-RPC URL connection string */
817 xmlrpc_client_call2f(&rpc_env, rpc_client,
818 flow_rpc_info->server_url,
819 "get_uuid", &resultP, "()");
820 die_if_fault_occurred(&rpc_env);
821
822 /* Decomposes the xmlrpc_value and extract the daemon UUID
823 * in it into controller local variable */
824 if (resultP) {
825 char* server_uuid = 0;
826
827 xmlrpc_decompose_value(&rpc_env, resultP, "{s:s,*}",
828 "server_uuid", &server_uuid);
829 set_flow_endpoint_daemon(server_uuid, flow_rpc_info->server_url);
830 die_if_fault_occurred(&rpc_env);
831 xmlrpc_DECREF(resultP);
832 }
833 }
834 }
835
836 /**
837 * Checks that all nodes are currently idle.
838 *
839 * Get the daemon's flow start status and number of flows running in
840 * a daemon. This piece of information is used to determine, whether the
841 * daemon in a node is busy or idle.
842 *
843 * @param[in,out] rpc_client to connect controller to daemon
844 */
check_idle(xmlrpc_client * rpc_client)845 static void check_idle(xmlrpc_client *rpc_client)
846 {
847 xmlrpc_value * resultP = 0;
848 const struct list_node *node = fg_list_front(&unique_daemons);
849
850 while (node) {
851 if (sigint_caught)
852 return;
853
854 struct daemon *daemon = node->data;
855 node = node->next;
856
857 xmlrpc_client_call2f(&rpc_env, rpc_client,
858 daemon->url,
859 "get_status", &resultP, "()");
860 die_if_fault_occurred(&rpc_env);
861
862 /* Decomposes the xmlrpc_value and extract the daemons data
863 * in it into controller local variable */
864 if (resultP) {
865 int started;
866 int num_flows;
867
868 xmlrpc_decompose_value(&rpc_env, resultP,
869 "{s:i,s:i,*}", "started",
870 &started, "num_flows",
871 &num_flows);
872 die_if_fault_occurred(&rpc_env);
873
874 /* Daemon start status and number of flows is used to
875 * determine node idle status */
876 if (started || num_flows)
877 critx("node %s is busy. %d flows, started=%d",
878 daemon->url, num_flows,
879 started);
880 xmlrpc_DECREF(resultP);
881 }
882 }
883 }
884
885 /**
886 * To show/hide intermediated interval report columns.
887 *
888 * @param[in] visibility show/hide column
889 * @param[in] nargs length of variable argument list
890 * @param[in] ... column IDs
891 * @see enum column_id
892 */
set_column_visibility(bool visibility,unsigned nargs,...)893 static void set_column_visibility(bool visibility, unsigned nargs, ...)
894 {
895 va_list ap;
896 enum column_id col_id;
897
898 va_start(ap, nargs);
899 while (nargs--) {
900 col_id = va_arg(ap, enum column_id);
901 column_info[col_id].state.visible = visibility;
902 }
903 va_end(ap);
904 }
905
906 /**
907 * To set the unit the in header of intermediated interval report columns.
908 *
909 * @param[in] unit unit of column header as string
910 * @param[in] nargs length of variable argument list
911 * @param[in] ... column IDs
912 * @see enum column_id
913 */
set_column_unit(const char * unit,unsigned nargs,...)914 static void set_column_unit(const char *unit, unsigned nargs, ...)
915 {
916 va_list ap;
917 enum column_id col_id;
918
919 va_start(ap, nargs);
920 while (nargs--) {
921 col_id = va_arg(ap, enum column_id);
922 column_info[col_id].header.unit = unit;
923 }
924 va_end(ap);
925 }
926
927 /**
928 * Print headline with various informations before the actual measurement will
929 * be begin.
930 */
print_headline(void)931 static void print_headline(void)
932 {
933 /* Print headline */
934 struct utsname me;
935 int rc = uname(&me);
936 print_output("# Date: %s, controlling host = %s, number of flows = %d, "
937 "reporting interval = %.2fs, [through] = %s (%s)\n",
938 ctimenow(false), (rc == -1 ? "(unknown)" : me.nodename),
939 copt.num_flows, copt.reporting_interval,
940 (copt.mbyte ? "2**20 bytes/second": "10**6 bit/second"),
941 FLOWGRIND_VERSION);
942
943 /* Prepare column visibility based on involved OSes */
944 bool involved_os[] = {[0 ... NUM_OSes-1] = false};
945 const struct list_node *node = fg_list_front(&unique_daemons);
946 while (node) {
947 struct daemon *daemon = node->data;
948 node = node->next;
949 if (!strcmp(daemon->os_name, "Linux"))
950 involved_os[LINUX] = true;
951 else if (!strcmp(daemon->os_name, "FreeBSD"))
952 involved_os[FREEBSD] = true;
953 else if (!strcmp(daemon->os_name, "Darwin"))
954 involved_os[DARWIN] = true;
955 }
956
957 /* No Linux OS is involved in the test */
958 if (!involved_os[LINUX])
959 HIDE_COLUMNS(COL_TCP_UACK, COL_TCP_SACK, COL_TCP_LOST,
960 COL_TCP_RETR, COL_TCP_TRET, COL_TCP_FACK,
961 COL_TCP_REOR, COL_TCP_BKOF, COL_TCP_CA_STATE,
962 COL_PMTU);
963
964 /* No Linux and FreeBSD OS is involved in the test */
965 if (!involved_os[FREEBSD] && !involved_os[LINUX])
966 HIDE_COLUMNS(COL_TCP_CWND, COL_TCP_SSTH, COL_TCP_RTT,
967 COL_TCP_RTTVAR, COL_TCP_RTO, COL_SMSS);
968
969 const struct list_node *firstnode = fg_list_front(&unique_daemons);
970 struct daemon *daemon_firstnode = firstnode->data;
971
972 /* Set unit for kernel TCP metrics to bytes */
973 if (copt.force_unit == BYTE_BASED || (copt.force_unit != SEGMENT_BASED &&
974 strcmp(daemon_firstnode->os_name, "Linux")))
975 SET_COLUMN_UNIT(" [B]", COL_TCP_CWND, COL_TCP_SSTH,
976 COL_TCP_UACK, COL_TCP_SACK, COL_TCP_LOST,
977 COL_TCP_RETR, COL_TCP_TRET, COL_TCP_FACK,
978 COL_TCP_REOR, COL_TCP_BKOF);
979 }
980
981 /**
982 * Prepare test connection for a flow between source and destination daemons.
983 *
984 * Controller sends the flow option to source and destination daemons
985 * separately through XML RPC connection and get backs the flow id and
986 * snd/rcx buffer size from the daemons.
987 *
988 * @param[in] id flow id to prepare the test connection in daemons
989 * @param[in,out] rpc_client to connect controller to daemon
990 */
prepare_flow(int id,xmlrpc_client * rpc_client)991 static void prepare_flow(int id, xmlrpc_client *rpc_client)
992 {
993 xmlrpc_value *resultP, *extra_options;
994
995 int listen_data_port;
996 DEBUG_MSG(LOG_WARNING, "prepare flow %d destination", id);
997
998 /* Contruct extra socket options array */
999 extra_options = xmlrpc_array_new(&rpc_env);
1000 for (int i = 0; i < cflow[id].settings[DESTINATION].num_extra_socket_options; i++) {
1001 xmlrpc_value *value;
1002 xmlrpc_value *option = xmlrpc_build_value(&rpc_env, "{s:i,s:i}",
1003 "level", cflow[id].settings[DESTINATION].extra_socket_options[i].level,
1004 "optname", cflow[id].settings[DESTINATION].extra_socket_options[i].optname);
1005
1006 value = xmlrpc_base64_new(&rpc_env, cflow[id].settings[DESTINATION].extra_socket_options[i].optlen, (unsigned char*)cflow[id].settings[DESTINATION].extra_socket_options[i].optval);
1007
1008 xmlrpc_struct_set_value(&rpc_env, option, "value", value);
1009
1010 xmlrpc_array_append_item(&rpc_env, extra_options, option);
1011 xmlrpc_DECREF(value);
1012 xmlrpc_DECREF(option);
1013 }
1014 xmlrpc_client_call2f(&rpc_env, rpc_client,
1015 cflow[id].endpoint[DESTINATION].rpc_info->server_url,
1016 "add_flow_destination", &resultP,
1017 "("
1018 "{s:s}"
1019 "{s:i}"
1020 "{s:d,s:d,s:d,s:d,s:d}"
1021 "{s:i,s:i}"
1022 "{s:i}"
1023 "{s:b,s:b,s:b,s:b,s:b}"
1024 "{s:i,s:i}"
1025 "{s:i,s:d,s:d}" /* request */
1026 "{s:i,s:d,s:d}" /* response */
1027 "{s:i,s:d,s:d}" /* interpacket_gap */
1028 "{s:b,s:b,s:i,s:i}"
1029 "{s:s}"
1030 "{s:i,s:i,s:i,s:i,s:i}"
1031 "{s:s}"
1032 "{s:i,s:A}"
1033 ")",
1034
1035 /* general flow settings */
1036 "bind_address", cflow[id].endpoint[DESTINATION].test_address,
1037
1038 "flow_id",id,
1039
1040 "write_delay", cflow[id].settings[DESTINATION].delay[WRITE],
1041 "write_duration", cflow[id].settings[DESTINATION].duration[WRITE],
1042 "read_delay", cflow[id].settings[SOURCE].delay[WRITE],
1043 "read_duration", cflow[id].settings[SOURCE].duration[WRITE],
1044 "reporting_interval", cflow[id].summarize_only ? 0 : copt.reporting_interval,
1045
1046 "requested_send_buffer_size", cflow[id].settings[DESTINATION].requested_send_buffer_size,
1047 "requested_read_buffer_size", cflow[id].settings[DESTINATION].requested_read_buffer_size,
1048
1049 "maximum_block_size", cflow[id].settings[DESTINATION].maximum_block_size,
1050
1051 "traffic_dump", cflow[id].settings[DESTINATION].traffic_dump,
1052 "so_debug", cflow[id].settings[DESTINATION].so_debug,
1053 "route_record", (int)cflow[id].settings[DESTINATION].route_record,
1054 "pushy", cflow[id].settings[DESTINATION].pushy,
1055 "shutdown", (int)cflow[id].shutdown,
1056
1057 "write_rate", cflow[id].settings[DESTINATION].write_rate,
1058 "random_seed",cflow[id].random_seed,
1059
1060 "traffic_generation_request_distribution", cflow[id].settings[DESTINATION].request_trafgen_options.distribution,
1061 "traffic_generation_request_param_one", cflow[id].settings[DESTINATION].request_trafgen_options.param_one,
1062 "traffic_generation_request_param_two", cflow[id].settings[DESTINATION].request_trafgen_options.param_two,
1063
1064 "traffic_generation_response_distribution", cflow[id].settings[DESTINATION].response_trafgen_options.distribution,
1065 "traffic_generation_response_param_one", cflow[id].settings[DESTINATION].response_trafgen_options.param_one,
1066 "traffic_generation_response_param_two", cflow[id].settings[DESTINATION].response_trafgen_options.param_two,
1067
1068 "traffic_generation_gap_distribution", cflow[id].settings[DESTINATION].interpacket_gap_trafgen_options.distribution,
1069 "traffic_generation_gap_param_one", cflow[id].settings[DESTINATION].interpacket_gap_trafgen_options.param_one,
1070 "traffic_generation_gap_param_two", cflow[id].settings[DESTINATION].interpacket_gap_trafgen_options.param_two,
1071
1072 "flow_control", cflow[id].settings[DESTINATION].flow_control,
1073 "byte_counting", cflow[id].byte_counting,
1074 "cork", (int)cflow[id].settings[DESTINATION].cork,
1075 "nonagle", cflow[id].settings[DESTINATION].nonagle,
1076
1077 "cc_alg", cflow[id].settings[DESTINATION].cc_alg,
1078
1079 "elcn", cflow[id].settings[DESTINATION].elcn,
1080 "lcd", cflow[id].settings[DESTINATION].lcd,
1081 "mtcp", cflow[id].settings[DESTINATION].mtcp,
1082 "dscp", (int)cflow[id].settings[DESTINATION].dscp,
1083 "ipmtudiscover", cflow[id].settings[DESTINATION].ipmtudiscover,
1084 "dump_prefix", copt.dump_prefix,
1085 "num_extra_socket_options", cflow[id].settings[DESTINATION].num_extra_socket_options,
1086 "extra_socket_options", extra_options);
1087
1088 die_if_fault_occurred(&rpc_env);
1089
1090 xmlrpc_parse_value(&rpc_env, resultP, "{s:i,s:i,s:i,s:i,*}",
1091 "flow_id", &cflow[id].endpoint_id[DESTINATION],
1092 "listen_data_port", &listen_data_port,
1093 "real_listen_send_buffer_size", &cflow[id].endpoint[DESTINATION].send_buffer_size_real,
1094 "real_listen_read_buffer_size", &cflow[id].endpoint[DESTINATION].receive_buffer_size_real);
1095 die_if_fault_occurred(&rpc_env);
1096
1097 if (resultP)
1098 xmlrpc_DECREF(resultP);
1099
1100 /* Contruct extra socket options array */
1101 extra_options = xmlrpc_array_new(&rpc_env);
1102 for (int i = 0; i < cflow[id].settings[SOURCE].num_extra_socket_options; i++) {
1103
1104 xmlrpc_value *value;
1105 xmlrpc_value *option = xmlrpc_build_value(&rpc_env, "{s:i,s:i}",
1106 "level", cflow[id].settings[SOURCE].extra_socket_options[i].level,
1107 "optname", cflow[id].settings[SOURCE].extra_socket_options[i].optname);
1108
1109 value = xmlrpc_base64_new(&rpc_env, cflow[id].settings[SOURCE].extra_socket_options[i].optlen, (unsigned char*)cflow[id].settings[SOURCE].extra_socket_options[i].optval);
1110
1111 xmlrpc_struct_set_value(&rpc_env, option, "value", value);
1112
1113 xmlrpc_array_append_item(&rpc_env, extra_options, option);
1114 xmlrpc_DECREF(value);
1115 xmlrpc_DECREF(option);
1116 }
1117 DEBUG_MSG(LOG_WARNING, "prepare flow %d source", id);
1118
1119 xmlrpc_client_call2f(&rpc_env, rpc_client,
1120 cflow[id].endpoint[SOURCE].rpc_info->server_url,
1121 "add_flow_source", &resultP,
1122 "("
1123 "{s:s}"
1124 "{s:i}"
1125 "{s:d,s:d,s:d,s:d,s:d}"
1126 "{s:i,s:i}"
1127 "{s:i}"
1128 "{s:b,s:b,s:b,s:b,s:b}"
1129 "{s:i,s:i}"
1130 "{s:i,s:d,s:d}" /* request */
1131 "{s:i,s:d,s:d}" /* response */
1132 "{s:i,s:d,s:d}" /* interpacket_gap */
1133 "{s:b,s:b,s:i,s:i}"
1134 "{s:s}"
1135 "{s:i,s:i,s:i,s:i,s:i}"
1136 "{s:s}"
1137 "{s:i,s:A}"
1138 "{s:s,s:i,s:i}"
1139 ")",
1140
1141 /* general flow settings */
1142 "bind_address", cflow[id].endpoint[SOURCE].test_address,
1143
1144 "flow_id",id,
1145
1146 "write_delay", cflow[id].settings[SOURCE].delay[WRITE],
1147 "write_duration", cflow[id].settings[SOURCE].duration[WRITE],
1148 "read_delay", cflow[id].settings[DESTINATION].delay[WRITE],
1149 "read_duration", cflow[id].settings[DESTINATION].duration[WRITE],
1150 "reporting_interval", cflow[id].summarize_only ? 0 : copt.reporting_interval,
1151
1152 "requested_send_buffer_size", cflow[id].settings[SOURCE].requested_send_buffer_size,
1153 "requested_read_buffer_size", cflow[id].settings[SOURCE].requested_read_buffer_size,
1154
1155 "maximum_block_size", cflow[id].settings[SOURCE].maximum_block_size,
1156
1157 "traffic_dump", cflow[id].settings[SOURCE].traffic_dump,
1158 "so_debug", cflow[id].settings[SOURCE].so_debug,
1159 "route_record", (int)cflow[id].settings[SOURCE].route_record,
1160 "pushy", cflow[id].settings[SOURCE].pushy,
1161 "shutdown", (int)cflow[id].shutdown,
1162
1163 "write_rate", cflow[id].settings[SOURCE].write_rate,
1164 "random_seed",cflow[id].random_seed,
1165
1166 "traffic_generation_request_distribution", cflow[id].settings[SOURCE].request_trafgen_options.distribution,
1167 "traffic_generation_request_param_one", cflow[id].settings[SOURCE].request_trafgen_options.param_one,
1168 "traffic_generation_request_param_two", cflow[id].settings[SOURCE].request_trafgen_options.param_two,
1169
1170 "traffic_generation_response_distribution", cflow[id].settings[SOURCE].response_trafgen_options.distribution,
1171 "traffic_generation_response_param_one", cflow[id].settings[SOURCE].response_trafgen_options.param_one,
1172 "traffic_generation_response_param_two", cflow[id].settings[SOURCE].response_trafgen_options.param_two,
1173
1174 "traffic_generation_gap_distribution", cflow[id].settings[SOURCE].interpacket_gap_trafgen_options.distribution,
1175 "traffic_generation_gap_param_one", cflow[id].settings[SOURCE].interpacket_gap_trafgen_options.param_one,
1176 "traffic_generation_gap_param_two", cflow[id].settings[SOURCE].interpacket_gap_trafgen_options.param_two,
1177
1178
1179 "flow_control", cflow[id].settings[SOURCE].flow_control,
1180 "byte_counting", cflow[id].byte_counting,
1181 "cork", (int)cflow[id].settings[SOURCE].cork,
1182 "nonagle", (int)cflow[id].settings[SOURCE].nonagle,
1183
1184 "cc_alg", cflow[id].settings[SOURCE].cc_alg,
1185
1186 "elcn", cflow[id].settings[SOURCE].elcn,
1187 "lcd", cflow[id].settings[SOURCE].lcd,
1188 "mtcp", cflow[id].settings[SOURCE].mtcp,
1189 "dscp", (int)cflow[id].settings[SOURCE].dscp,
1190 "ipmtudiscover", cflow[id].settings[SOURCE].ipmtudiscover,
1191 "dump_prefix", copt.dump_prefix,
1192 "num_extra_socket_options", cflow[id].settings[SOURCE].num_extra_socket_options,
1193 "extra_socket_options", extra_options,
1194
1195 /* source settings */
1196 "destination_address", cflow[id].endpoint[DESTINATION].test_address,
1197 "destination_port", listen_data_port,
1198 "late_connect", (int)cflow[id].late_connect);
1199 die_if_fault_occurred(&rpc_env);
1200
1201 xmlrpc_DECREF(extra_options);
1202
1203 xmlrpc_parse_value(&rpc_env, resultP, "{s:i,s:i,s:i,*}",
1204 "flow_id", &cflow[id].endpoint_id[SOURCE],
1205 "real_send_buffer_size", &cflow[id].endpoint[SOURCE].send_buffer_size_real,
1206 "real_read_buffer_size", &cflow[id].endpoint[SOURCE].receive_buffer_size_real);
1207 die_if_fault_occurred(&rpc_env);
1208
1209 if (resultP)
1210 xmlrpc_DECREF(resultP);
1211 DEBUG_MSG(LOG_WARNING, "prepare flow %d completed", id);
1212 }
1213
1214 /**
1215 * Prepare test connection for all flows in a test
1216 *
1217 * @param[in,out] rpc_client to connect controller to daemon
1218 */
prepare_all_flows(xmlrpc_client * rpc_client)1219 static void prepare_all_flows(xmlrpc_client *rpc_client)
1220 {
1221 /* prepare flows */
1222 for (unsigned short id = 0; id < copt.num_flows; id++) {
1223 if (sigint_caught)
1224 return;
1225 prepare_flow(id, rpc_client);
1226 }
1227 }
1228
1229 /**
1230 * Start test connections for all flows in a test
1231 *
1232 * All the test connection are started, but test connection flow in the
1233 * controller and in daemon are different. In the controller, test connection
1234 * are respective to number of flows in a test,but in daemons test connection
1235 * are respective to flow endpoints. Single daemons can maintain multiple flows
1236 * endpoints, So controller should start a daemon only once.
1237 *
1238 * @param[in,out] rpc_client to connect controller to daemon
1239 */
start_all_flows(xmlrpc_client * rpc_client)1240 static void start_all_flows(xmlrpc_client *rpc_client)
1241 {
1242 xmlrpc_value * resultP = 0;
1243
1244 struct timespec lastreport_end;
1245 struct timespec lastreport_begin;
1246 struct timespec now;
1247
1248 gettime(&lastreport_end);
1249 gettime(&lastreport_begin);
1250 gettime(&now);
1251
1252 const struct list_node *node = fg_list_front(&unique_daemons);
1253 while (node) {
1254 if (sigint_caught)
1255 return;
1256 struct daemon *daemon = node->data;
1257 node = node->next;
1258
1259 DEBUG_MSG(LOG_ERR, "starting flow on server with UUID %s",daemon->uuid);
1260 xmlrpc_client_call2f(&rpc_env, rpc_client,
1261 daemon->url,
1262 "start_flows", &resultP, "({s:i})",
1263 "start_timestamp", now.tv_sec + 2);
1264 die_if_fault_occurred(&rpc_env);
1265 if (resultP)
1266 xmlrpc_DECREF(resultP);
1267 }
1268
1269 active_flows = copt.num_flows;
1270
1271 /* Reports are fetched from the daemons based on the
1272 * report interval duration */
1273 while (!sigint_caught) {
1274 if ( time_diff_now(&lastreport_begin) < copt.reporting_interval ) {
1275 usleep(copt.reporting_interval - time_diff(&lastreport_begin,&lastreport_end) );
1276 continue;
1277 }
1278 gettime(&lastreport_begin);
1279 fetch_reports(rpc_client);
1280 gettime(&lastreport_end);
1281
1282 /* All flows have ended */
1283 if (active_flows < 1)
1284 return;
1285 }
1286 }
1287
1288 /**
1289 * Reports are fetched from the flow endpoint daemon.
1290 *
1291 * Single daemon can maintain multiple flows endpoints and daemons combine all
1292 * its flows reports and send them to the controller. So controller should call
1293 * a daemon in its flows only once.
1294 *
1295 * @param[in,out] rpc_client to connect controller to daemon
1296 */
fetch_reports(xmlrpc_client * rpc_client)1297 static void fetch_reports(xmlrpc_client *rpc_client)
1298 {
1299
1300 xmlrpc_value * resultP = 0;
1301 const struct list_node *node = fg_list_front(&unique_daemons);
1302
1303 while (node) {
1304 struct daemon *daemon = node->data;
1305 node = node->next;
1306 int array_size, has_more;
1307 xmlrpc_value *rv = 0;
1308
1309 has_more_reports:
1310
1311 xmlrpc_client_call2f(&rpc_env, rpc_client, daemon->url,
1312 "get_reports", &resultP, "()");
1313 if (rpc_env.fault_occurred) {
1314 errx("XML-RPC fault: %s (%d)", rpc_env.fault_string,
1315 rpc_env.fault_code);
1316 continue;
1317 }
1318
1319 if (!resultP)
1320 continue;
1321
1322 array_size = xmlrpc_array_size(&rpc_env, resultP);
1323 if (!array_size) {
1324 warnx("empty array in get_reports reply");
1325 continue;
1326 }
1327
1328 xmlrpc_array_read_item(&rpc_env, resultP, 0, &rv);
1329 xmlrpc_read_int(&rpc_env, rv, &has_more);
1330 if (rpc_env.fault_occurred) {
1331 errx("XML-RPC fault: %s (%d)", rpc_env.fault_string,
1332 rpc_env.fault_code);
1333 xmlrpc_DECREF(rv);
1334 continue;
1335 }
1336 xmlrpc_DECREF(rv);
1337
1338 for (int i = 1; i < array_size; i++) {
1339 xmlrpc_value *rv = 0;
1340
1341 xmlrpc_array_read_item(&rpc_env, resultP, i, &rv);
1342 if (rv) {
1343 struct report report;
1344 int begin_sec, begin_nsec, end_sec, end_nsec;
1345 int tcpi_snd_cwnd;
1346 int tcpi_snd_ssthresh;
1347 int tcpi_unacked;
1348 int tcpi_sacked;
1349 int tcpi_lost;
1350 int tcpi_retrans;
1351 int tcpi_retransmits;
1352 int tcpi_fackets;
1353 int tcpi_reordering;
1354 int tcpi_rtt;
1355 int tcpi_rttvar;
1356 int tcpi_rto;
1357 int tcpi_backoff;
1358 int tcpi_ca_state;
1359 int tcpi_snd_mss;
1360 int bytes_read_low, bytes_read_high;
1361 int bytes_written_low, bytes_written_high;
1362
1363 xmlrpc_decompose_value(&rpc_env, rv,
1364 "("
1365 "{s:i,s:i,s:i,s:i,s:i,s:i,s:i,*}" /* Report data & timeval */
1366 "{s:i,s:i,s:i,s:i,*}" /* bytes */
1367 "{s:i,s:i,s:i,s:i,*}" /* blocks */
1368 "{s:d,s:d,s:d,s:d,s:d,s:d,s:d,s:d,s:d,*}" /* RTT, IAT, Delay */
1369 "{s:i,s:i,*}" /* MTU */
1370 "{s:i,s:i,s:i,s:i,s:i,*}" /* TCP info */
1371 "{s:i,s:i,s:i,s:i,s:i,*}" /* ... */
1372 "{s:i,s:i,s:i,s:i,s:i,*}" /* ... */
1373 "{s:i,*}"
1374 ")",
1375
1376 "id", &report.id,
1377 "endpoint", &report.endpoint,
1378 "type", &report.type,
1379 "begin_tv_sec", &begin_sec,
1380 "begin_tv_nsec", &begin_nsec,
1381 "end_tv_sec", &end_sec,
1382 "end_tv_nsec", &end_nsec,
1383
1384 "bytes_read_high", &bytes_read_high,
1385 "bytes_read_low", &bytes_read_low,
1386 "bytes_written_high", &bytes_written_high,
1387 "bytes_written_low", &bytes_written_low,
1388
1389 "request_blocks_read", &report.request_blocks_read,
1390 "request_blocks_written", &report.request_blocks_written,
1391 "response_blocks_read", &report.response_blocks_read,
1392 "response_blocks_written", &report.response_blocks_written,
1393
1394 "rtt_min", &report.rtt_min,
1395 "rtt_max", &report.rtt_max,
1396 "rtt_sum", &report.rtt_sum,
1397 "iat_min", &report.iat_min,
1398 "iat_max", &report.iat_max,
1399 "iat_sum", &report.iat_sum,
1400 "delay_min", &report.delay_min,
1401 "delay_max", &report.delay_max,
1402 "delay_sum", &report.delay_sum,
1403
1404 "pmtu", &report.pmtu,
1405 "imtu", &report.imtu,
1406
1407 "tcpi_snd_cwnd", &tcpi_snd_cwnd,
1408 "tcpi_snd_ssthresh", &tcpi_snd_ssthresh,
1409 "tcpi_unacked", &tcpi_unacked,
1410 "tcpi_sacked", &tcpi_sacked,
1411 "tcpi_lost", &tcpi_lost,
1412
1413 "tcpi_retrans", &tcpi_retrans,
1414 "tcpi_retransmits", &tcpi_retransmits,
1415 "tcpi_fackets", &tcpi_fackets,
1416 "tcpi_reordering", &tcpi_reordering,
1417 "tcpi_rtt", &tcpi_rtt,
1418
1419 "tcpi_rttvar", &tcpi_rttvar,
1420 "tcpi_rto", &tcpi_rto,
1421 "tcpi_backoff", &tcpi_backoff,
1422 "tcpi_ca_state", &tcpi_ca_state,
1423 "tcpi_snd_mss", &tcpi_snd_mss,
1424
1425 "status", &report.status
1426 );
1427 xmlrpc_DECREF(rv);
1428 #ifdef HAVE_UNSIGNED_LONG_LONG_INT
1429 report.bytes_read = ((long long)bytes_read_high << 32) + (uint32_t)bytes_read_low;
1430 report.bytes_written = ((long long)bytes_written_high << 32) + (uint32_t)bytes_written_low;
1431 #else /* HAVE_UNSIGNED_LONG_LONG_INT */
1432 report.bytes_read = (uint32_t)bytes_read_low;
1433 report.bytes_written = (uint32_t)bytes_written_low;
1434 #endif /* HAVE_UNSIGNED_LONG_LONG_INT */
1435
1436 /* FIXME Kernel metrics (tcp_info). Other OS than
1437 * Linux may not send valid values here. For
1438 * the moment we don't care and handle this in
1439 * the output/display routines. However, this
1440 * do not work in heterogeneous environments */
1441 report.tcp_info.tcpi_snd_cwnd = tcpi_snd_cwnd;
1442 report.tcp_info.tcpi_snd_ssthresh = tcpi_snd_ssthresh;
1443 report.tcp_info.tcpi_unacked = tcpi_unacked;
1444 report.tcp_info.tcpi_sacked = tcpi_sacked;
1445 report.tcp_info.tcpi_lost = tcpi_lost;
1446 report.tcp_info.tcpi_retrans = tcpi_retrans;
1447 report.tcp_info.tcpi_retransmits = tcpi_retransmits;
1448 report.tcp_info.tcpi_fackets = tcpi_fackets;
1449 report.tcp_info.tcpi_reordering = tcpi_reordering;
1450 report.tcp_info.tcpi_rtt = tcpi_rtt;
1451 report.tcp_info.tcpi_rttvar = tcpi_rttvar;
1452 report.tcp_info.tcpi_rto = tcpi_rto;
1453 report.tcp_info.tcpi_backoff = tcpi_backoff;
1454 report.tcp_info.tcpi_ca_state = tcpi_ca_state;
1455 report.tcp_info.tcpi_snd_mss = tcpi_snd_mss;
1456
1457 report.begin.tv_sec = begin_sec;
1458 report.begin.tv_nsec = begin_nsec;
1459 report.end.tv_sec = end_sec;
1460 report.end.tv_nsec = end_nsec;
1461
1462 report_flow(&report);
1463 }
1464 }
1465 xmlrpc_DECREF(resultP);
1466
1467 if (has_more)
1468 goto has_more_reports;
1469 }
1470 }
1471
1472 /**
1473 * Reports are fetched from the flow endpoint daemon
1474 *
1475 * Single daemon can maintain multiple flows endpoints and daemons combine all
1476 * it flows report and send the controller. So controller give the flow ID to
1477 * daemons, while prepare the flow.Controller flow ID is maintained by the
1478 * daemons to maintain its flow endpoints.So When getting back the reports from
1479 * the daemons, the controller use those flow ID registered for the daemon in
1480 * the prepare flow as reference to distinguish the @p report.
1481 * The daemon also send back the details regarding flow endpoints
1482 * i.e. source or destination. So this information is also used by the daemons
1483 * to distinguish the report in the report flow.
1484 *
1485 * @param[in] report report from the daemon
1486 */
report_flow(struct report * report)1487 static void report_flow(struct report* report)
1488 {
1489 int *i = NULL;
1490 unsigned short id;
1491 struct cflow *f = NULL;
1492
1493 /* Get matching flow for report */
1494 /* TODO Maybe just use compare daemon pointers? */
1495 for (id = 0; id < copt.num_flows; id++) {
1496 f = &cflow[id];
1497
1498 foreach(i, SOURCE, DESTINATION)
1499 if (f->endpoint_id[*i] == report->id &&
1500 *i == (int)report->endpoint)
1501 goto exit_outer_loop;
1502 }
1503 exit_outer_loop:
1504
1505 if (f->start_timestamp[*i].tv_sec == 0)
1506 f->start_timestamp[*i] = report->begin;
1507
1508 if (report->type == FINAL) {
1509 DEBUG_MSG(LOG_DEBUG, "received final report for flow %d", id);
1510 /* Final report, keep it for later */
1511 free(f->final_report[*i]);
1512 f->final_report[*i] = malloc(sizeof(struct report));
1513 *f->final_report[*i] = *report;
1514
1515 if (!f->finished[*i]) {
1516 f->finished[*i] = 1;
1517 if (f->finished[1 - *i]) {
1518 active_flows--;
1519 DEBUG_MSG(LOG_DEBUG, "remaining active flows: "
1520 "%d", active_flows);
1521 assert(active_flows >= 0);
1522 }
1523 }
1524 return;
1525 }
1526 print_interval_report(id, *i, report);
1527 }
1528
1529 /**
1530 * Stop test connections for all flows in a test
1531 *
1532 * All the test connection are stopped, but the test connection flow in the
1533 * controller and in daemon are different. In the controller, test connection
1534 * are respective to number of flows in a test,but in daemons test connection
1535 * are respective to flow endpoints. Single daemons can maintain multiple flows
1536 * endpoints, So controller should stop a daemon only once.
1537 */
close_all_flows(void)1538 static void close_all_flows(void)
1539 {
1540 xmlrpc_env env;
1541 xmlrpc_client *client;
1542
1543 for (unsigned short id = 0; id < copt.num_flows; id++) {
1544 DEBUG_MSG(LOG_WARNING, "closing flow %u", id);
1545
1546 if (cflow[id].finished[SOURCE] && cflow[id].finished[DESTINATION])
1547 continue;
1548
1549 /* We use new env and client, old one might be in fault condition */
1550 xmlrpc_env_init(&env);
1551 xmlrpc_client_create(&env, XMLRPC_CLIENT_NO_FLAGS, "Flowgrind", FLOWGRIND_VERSION, NULL, 0, &client);
1552 die_if_fault_occurred(&env);
1553 xmlrpc_env_clean(&env);
1554
1555 foreach(int *i, SOURCE, DESTINATION) {
1556 xmlrpc_value * resultP = 0;
1557
1558 if (cflow[id].endpoint_id[*i] == -1 ||
1559 cflow[id].finished[*i])
1560 /* Endpoint does not need closing */
1561 continue;
1562
1563 cflow[id].finished[*i] = 1;
1564
1565 xmlrpc_env_init(&env);
1566 xmlrpc_client_call2f(&env, client,
1567 cflow[id].endpoint[*i].rpc_info->server_url,
1568 "stop_flow", &resultP, "({s:i})",
1569 "flow_id", cflow[id].endpoint_id[*i]);
1570 if (resultP)
1571 xmlrpc_DECREF(resultP);
1572
1573 xmlrpc_env_clean(&env);
1574 }
1575
1576 if (active_flows > 0)
1577 active_flows--;
1578
1579 xmlrpc_client_destroy(client);
1580 DEBUG_MSG(LOG_WARNING, "closed flow %u", id);
1581 }
1582 }
1583
/**
 * Determines the length of the integer part of a decimal number.
 *
 * The digits are counted on the double itself by comparing its magnitude
 * against increasing powers of ten. No conversion to int takes place, so
 * values outside the range of int (e.g. UINT_MAX) are handled as well. The
 * sign is not counted.
 *
 * @param[in] value decimal number
 * @return length of integer part; 1 if the integer part is zero, 3 for
 * infinities and NaN
 */
inline static size_t det_num_digits(double value)
{
	const double magnitude = value < 0 ? -value : value;
	size_t digits = 1;

	/* printf() prints these as at least three characters (inf/nan) */
	if (!isfinite(value))
		return 3;

	/* One more digit for every power of ten the magnitude reaches */
	for (double limit = 10.0; magnitude >= limit; limit *= 10.0)
		digits++;

	return digits;
}
1598
1599 /**
1600 * Scale the given throughput @p thruput in either Mebibyte per seconds or in
1601 * Megabits per seconds.
1602 *
1603 * @param[in] thruput throughput in byte per seconds
1604 * @return scaled throughput in MiB/s or Mb/s
1605 */
scale_thruput(double thruput)1606 inline static double scale_thruput(double thruput)
1607 {
1608 if (copt.mbyte)
1609 return thruput / (1<<20);
1610 return thruput / 1e6 * (1<<3);
1611 }
1612
1613 /**
1614 * Determines if the current column width @p column_width is larger or smaller
1615 * than the old one and updates the state of column @p column accordingly.
1616 *
1617 * @param[in,out] column column that state to be updated
1618 * @param[in] column_width current column width
1619 * @return true if column state has been updated, false otherwise
1620 */
update_column_width(struct column * column,unsigned column_width)1621 static bool update_column_width(struct column *column, unsigned column_width)
1622 {
1623 /* True if column width has changed */
1624 bool has_changed = false;
1625
1626 if (column->state.last_width < column_width) {
1627 /* Column too small */
1628 has_changed = true;
1629 column->state.last_width = column_width;
1630 column->state.oversized = 0;
1631 } else if (column->state.last_width > 1 + column_width) {
1632 /* Column too big */
1633 if (column->state.oversized >= MAX_COLUM_TOO_LARGE) {
1634 /* Column too big for quite a while */
1635 has_changed = true;
1636 column->state.last_width = column_width;
1637 column->state.oversized = 0;
1638 } else {
1639 (column->state.oversized)++;
1640 }
1641 } else {
1642 /* This size was needed, keep it */
1643 column->state.oversized = 0;
1644 }
1645
1646 return has_changed;
1647 }
1648
1649 /**
1650 * Append measured data for interval report column @p column_id to given strings.
1651 *
1652 * For the intermediated interval report column @p column_id, append measured
1653 * data @p value to the destination data string @p data, and the name and unit
1654 * of intermediated interval column header to @p header1 and @p header2.
1655 *
1656 * @param[in,out] header1 1st header string (name) to append to
1657 * @param[in,out] header2 2nd header string (unit) to append to
1658 * @param[in,out] data data value string to append to
1659 * @param[in] column_id ID of intermediated interval report column
1660 * @param[in] value measured data string to be append
1661 * @return true if column width has changed, false otherwise
1662 */
print_column_str(char ** header1,char ** header2,char ** data,enum column_id column_id,char * value)1663 static bool print_column_str(char **header1, char **header2, char **data,
1664 enum column_id column_id, char* value)
1665 {
1666 /* Only for convenience */
1667 struct column *column = &column_info[column_id];
1668
1669 if (!column->state.visible)
1670 return false;
1671
1672 /* Get max column width */
1673 unsigned data_len = strlen(value);
1674 unsigned header_len = MAX(strlen(column->header.name),
1675 strlen(column->header.unit));
1676 unsigned column_width = MAX(data_len, header_len);
1677
1678 /* Check if column width has changed */
1679 bool has_changed = update_column_width(column, column_width);
1680
1681 /* Create format specifiers of right length */
1682 char *fmt_str = NULL;
1683 const size_t width = column->state.last_width;
1684 if (asprintf(&fmt_str, "%%%zus", width + GUARDBAND) == -1)
1685 critx("could not allocate memory for interval report");
1686
1687 /* Print data, 1st and 2nd header row */
1688 asprintf_append(data, fmt_str, value);
1689 asprintf_append(header1, fmt_str, column->header.name);
1690 asprintf_append(header2, fmt_str, column->header.unit);
1691
1692 free(fmt_str);
1693 return has_changed;
1694 }
1695
1696 /**
1697 * Append measured data for interval report column @p column_id to given strings.
1698 *
1699 * For the intermediated interval report column @p column_id, append measured
1700 * data @p value to the destination data string @p data, and the name and unit
1701 * of intermediated interval column header to @p header1 and @p header2.
1702 *
1703 * @param[in,out] header1 1st header string (name) to append to
1704 * @param[in,out] header2 2nd header string (unit) to append to
1705 * @param[in,out] data data value string to append to
1706 * @param[in] column_id ID of intermediated interval report column
1707 * @param[in] value measured data value to be append
1708 * @param[in] accuracy number of decimal places to be append
1709 * @return true if column width has changed, false otherwise
1710 */
print_column(char ** header1,char ** header2,char ** data,enum column_id column_id,double value,unsigned accuracy)1711 static bool print_column(char **header1, char **header2, char **data,
1712 enum column_id column_id, double value,
1713 unsigned accuracy)
1714 {
1715 /* Print symbolic values instead of numbers */
1716 if (copt.symbolic) {
1717 switch ((int)value) {
1718 case INT_MAX:
1719 return print_column_str(header1, header2, data,
1720 column_id, "INT_MAX");
1721 case USHRT_MAX:
1722 return print_column_str(header1, header2, data,
1723 column_id, "USHRT_MAX");
1724 case UINT_MAX:
1725 return print_column_str(header1, header2, data,
1726 column_id, "UINT_MAX");
1727 }
1728 }
1729
1730 /* Only for convenience */
1731 struct column *column = &column_info[column_id];
1732
1733 if (!column->state.visible)
1734 return false;
1735
1736 /* Get max column width */
1737 unsigned data_len = det_num_digits(value) + (accuracy ? accuracy + 1 : 0);
1738 unsigned header_len = MAX(strlen(column->header.name),
1739 strlen(column->header.unit));
1740 unsigned column_width = MAX(data_len, header_len);
1741
1742 /* Check if column width has changed */
1743 bool has_changed = update_column_width(column, column_width);
1744
1745 /* Create format specifiers of right length */
1746 char *fmt_num = NULL, *fmt_str = NULL;
1747 const size_t width = column->state.last_width;
1748 if (asprintf(&fmt_num, "%%%zu.%df", width + GUARDBAND, accuracy) == -1 ||
1749 asprintf(&fmt_str, "%%%zus", width + GUARDBAND) == -1)
1750 critx("could not allocate memory for interval report");
1751
1752 /* Print data, 1st and 2nd header row */
1753 asprintf_append(data, fmt_num, value);
1754 asprintf_append(header1, fmt_str, column->header.name);
1755 asprintf_append(header2, fmt_str, column->header.unit);
1756
1757 free_all(fmt_num, fmt_str);
1758 return has_changed;
1759 }
1760
1761 /**
1762 * Print interval report @p report for endpoint @p e of flow @p flow_id.
1763 *
1764 * In addition, if the width of one intermediated interval report columns has
1765 * been changed, the interval column header will be printed again.
1766 *
1767 * @param[in] flow_id flow an interval report will be created for
1768 * @param[in] e flow endpoint (SOURCE or DESTINATION)
1769 * @param[in] report interval report to be printed
1770 */
print_interval_report(unsigned short flow_id,enum endpoint_t e,struct report * report)1771 static void print_interval_report(unsigned short flow_id, enum endpoint_t e,
1772 struct report *report)
1773 {
1774 /* Whether or not column width has been changed */
1775 bool changed = false;
1776 /* 1st header row, 2nd header row, and the actual measured data */
1777 char *header1 = NULL, *header2 = NULL, *data = NULL;
1778
1779 /* Flow ID and endpoint (source or destination) */
1780 if (asprintf(&header1, "%s", column_info[COL_FLOW_ID].header.name) == -1 ||
1781 asprintf(&header2, "%s", column_info[COL_FLOW_ID].header.unit) == -1 ||
1782 asprintf(&data, "%s%3d", e ? "D" : "S", flow_id) == -1)
1783 critx("could not allocate memory for interval report");
1784
1785 /* Calculate time */
1786 double diff_first_last = time_diff(&cflow[flow_id].start_timestamp[e],
1787 &report->begin);
1788 double diff_first_now = time_diff(&cflow[flow_id].start_timestamp[e],
1789 &report->end);
1790 changed |= print_column(&header1, &header2, &data, COL_BEGIN,
1791 diff_first_last, 3);
1792 changed |= print_column(&header1, &header2, &data, COL_END,
1793 diff_first_now, 3);
1794
1795 /* Throughput */
1796 double thruput = (double)report->bytes_written /
1797 (diff_first_now - diff_first_last);
1798 thruput = scale_thruput(thruput);
1799 changed |= print_column(&header1, &header2, &data, COL_THROUGH,
1800 thruput, 6);
1801
1802 /* Transactions */
1803 double transac = (double)report->response_blocks_read /
1804 (diff_first_now - diff_first_last);
1805 changed |= print_column(&header1, &header2, &data, COL_TRANSAC,
1806 transac, 2);
1807
1808 /* Blocks */
1809 changed |= print_column(&header1, &header2, &data, COL_BLOCK_REQU,
1810 report->request_blocks_written, 0);
1811 changed |= print_column(&header1, &header2, &data, COL_BLOCK_RESP,
1812 report->response_blocks_written, 0);
1813
1814 /* RTT */
1815 double rtt_avg = 0.0;
1816 if (report->response_blocks_read && report->rtt_sum)
1817 rtt_avg = report->rtt_sum /
1818 (double)(report->response_blocks_read);
1819 else
1820 report->rtt_min = report->rtt_max = rtt_avg = INFINITY;
1821 changed |= print_column(&header1, &header2, &data, COL_RTT_MIN,
1822 report->rtt_min * 1e3, 3);
1823 changed |= print_column(&header1, &header2, &data, COL_RTT_AVG,
1824 rtt_avg * 1e3, 3);
1825 changed |= print_column(&header1, &header2, &data, COL_RTT_MAX,
1826 report->rtt_max * 1e3, 3);
1827
1828 /* IAT */
1829 double iat_avg = 0.0;
1830 if (report->request_blocks_read && report->iat_sum)
1831 iat_avg = report->iat_sum /
1832 (double)(report->request_blocks_read);
1833 else
1834 report->iat_min = report->iat_max = iat_avg = INFINITY;
1835 changed |= print_column(&header1, &header2, &data, COL_IAT_MIN,
1836 report->rtt_min * 1e3, 3);
1837 changed |= print_column(&header1, &header2, &data, COL_IAT_AVG,
1838 iat_avg * 1e3, 3);
1839 changed |= print_column(&header1, &header2, &data, COL_IAT_MAX,
1840 report->iat_max * 1e3, 3);
1841
1842 /* Delay */
1843 double delay_avg = 0.0;
1844 if (report->request_blocks_read && report->delay_sum)
1845 delay_avg = report->delay_sum /
1846 (double)(report->request_blocks_read);
1847 else
1848 report->delay_min = report->delay_max = delay_avg = INFINITY;
1849 changed |= print_column(&header1, &header2, &data, COL_DLY_MIN,
1850 report->delay_min * 1e3, 3);
1851 changed |= print_column(&header1, &header2, &data, COL_DLY_AVG,
1852 delay_avg * 1e3, 3);
1853 changed |= print_column(&header1, &header2, &data, COL_DLY_MAX,
1854 report->delay_max * 1e3, 3);
1855
1856 /* TCP info struct */
1857 changed |= print_column(&header1, &header2, &data, COL_TCP_CWND,
1858 report->tcp_info.tcpi_snd_cwnd, 0);
1859 changed |= print_column(&header1, &header2, &data, COL_TCP_SSTH,
1860 report->tcp_info.tcpi_snd_ssthresh, 0);
1861 changed |= print_column(&header1, &header2, &data, COL_TCP_UACK,
1862 report->tcp_info.tcpi_unacked, 0);
1863 changed |= print_column(&header1, &header2, &data, COL_TCP_SACK,
1864 report->tcp_info.tcpi_sacked, 0);
1865 changed |= print_column(&header1, &header2, &data, COL_TCP_LOST,
1866 report->tcp_info.tcpi_lost, 0);
1867 changed |= print_column(&header1, &header2, &data, COL_TCP_RETR,
1868 report->tcp_info.tcpi_retrans, 0);
1869 changed |= print_column(&header1, &header2, &data, COL_TCP_TRET,
1870 report->tcp_info.tcpi_retransmits, 0);
1871 changed |= print_column(&header1, &header2, &data, COL_TCP_FACK,
1872 report->tcp_info.tcpi_fackets, 0);
1873 changed |= print_column(&header1, &header2, &data, COL_TCP_REOR,
1874 report->tcp_info.tcpi_reordering, 0);
1875 changed |= print_column(&header1, &header2, &data, COL_TCP_BKOF,
1876 report->tcp_info.tcpi_backoff, 0);
1877 changed |= print_column(&header1, &header2, &data, COL_TCP_RTT,
1878 report->tcp_info.tcpi_rtt / 1e3, 1);
1879 changed |= print_column(&header1, &header2, &data, COL_TCP_RTTVAR,
1880 report->tcp_info.tcpi_rttvar / 1e3, 1);
1881 changed |= print_column(&header1, &header2, &data, COL_TCP_RTO,
1882 report->tcp_info.tcpi_rto / 1e3, 1);
1883
1884 /* TCP CA state */
1885 char *ca_state = NULL;
1886 switch (report->tcp_info.tcpi_ca_state) {
1887 case TCP_CA_Open:
1888 ca_state = "open";
1889 break;
1890 case TCP_CA_Disorder:
1891 ca_state = "disorder";
1892 break;
1893 case TCP_CA_CWR:
1894 ca_state = "cwr";
1895 break;
1896 case TCP_CA_Recovery:
1897 ca_state = "recover";
1898 break;
1899 case TCP_CA_Loss:
1900 ca_state = "loss";
1901 break;
1902 default:
1903 ca_state = "unknown";
1904 }
1905 changed |= print_column_str(&header1, &header2, &data,
1906 COL_TCP_CA_STATE, ca_state);
1907
1908 /* SMSS & PMTU */
1909 changed |= print_column(&header1, &header2, &data, COL_SMSS,
1910 report->tcp_info.tcpi_snd_mss, 0);
1911 changed |= print_column(&header1, &header2, &data, COL_PMTU,
1912 report->pmtu, 0);
1913
1914 /* Internal flowgrind state */
1915 #ifdef DEBUG
1916 int rc = 0;
1917 char *fg_state = NULL;
1918 if (cflow[flow_id].finished[e]) {
1919 rc = asprintf(&fg_state, "(stopped)");
1920 } else {
1921 /* Write status */
1922 char ws = (char)(report->status & 0xFF);
1923 if (ws != 'd' || ws != 'l' || ws != 'o' || ws != 'f' ||
1924 ws != 'c' || ws != 'n')
1925 ws = 'u';
1926
1927 /* Read status */
1928 char rs = (char)(report->status >> 8);
1929 if (rs != 'd' || rs != 'l' || rs != 'o' || rs != 'f' ||
1930 rs != 'c' || rs != 'n')
1931 rs = 'u';
1932 rc = asprintf(&fg_state, "(%c/%c)", ws, rs);
1933 }
1934
1935 if (rc == -1)
1936 critx("could not allocate memory for flowgrind status string");
1937
1938 changed |= print_column_str(&header1, &header2, &data, COL_STATUS,
1939 fg_state);
1940 free(fg_state);
1941 #endif /* DEBUG */
1942
1943 /* Print interval header again if either the column width has been
1944 * changed or MAX_REPORTS_BEFORE_HEADER reports have been emited
1945 * since last time header was printed */
1946 static unsigned short printed_reports = 0;
1947 if (changed || (printed_reports % MAX_REPORTS_IN_ROW) == 0) {
1948 print_output("%s\n", header1);
1949 print_output("%s\n", header2);
1950 }
1951
1952 print_output("%s\n", data);
1953 printed_reports++;
1954 free_all(header1, header2, data);
1955 }
1956
/**
 * Translate a well-known MTU size into the name of a network technology.
 *
 * The lookup is an exact match against a fixed table of common MTU sizes.
 *
 * @param[in] mtu MTU size
 * @return name of the matching network technology, or "unknown" if @p mtu is
 * not in the table
 */
static char *guess_topology(unsigned mtu)
{
	struct mtu_hint {
		unsigned mtu;
		char *topology;
	};

	/* Known MTU sizes, sorted from largest to smallest */
	static const struct mtu_hint mtu_hints[] = {
		{65535,	"Hyperchannel"},		/* RFC1374 */
		{17914,	"16 MB/s Token Ring"},
		{16436,	"Linux Loopback device"},
		{16384,	"FreeBSD Loopback device"},
		{16352,	"Darwin Loopback device"},
		{9000,	"Gigabit Ethernet (Jumboframes)"},
		{8166,	"802.4 Token Bus"},		/* RFC1042 */
		{4464,	"4 MB/s Token Ring"},
		{4352,	"FDDI"},			/* RFC1390 */
		{1500,	"Ethernet/PPP"},		/* RFC894, RFC1548 */
		{1492,	"PPPoE"},			/* RFC2516 */
		{1472,	"IP-in-IP"},			/* RFC1853 */
		{1280,	"IPv6 Tunnel"},			/* RFC4213 */
		{1006,	"SLIP"},			/* RFC1055 */
		{576,	"X.25 & ISDN"},			/* RFC1356 */
		{296,	"PPP (low delay)"},
	};

	const struct mtu_hint *const end =
		mtu_hints + sizeof(mtu_hints) / sizeof(mtu_hints[0]);

	/* Walk the table and stop at the first exact match */
	for (const struct mtu_hint *hint = mtu_hints; hint != end; hint++) {
		if (hint->mtu == mtu)
			return hint->topology;
	}

	return "unknown";
}
1995
1996 /**
1997 * Print final report (i.e. summary line) for endpoint @p e of flow @p flow_id.
1998 *
1999 * @param[in] flow_id flow a final report will be created for
2000 * @param[in] e flow endpoint (SOURCE or DESTINATION)
2001 */
print_final_report(unsigned short flow_id,enum endpoint_t e)2002 static void print_final_report(unsigned short flow_id, enum endpoint_t e)
2003 {
2004 /* To store the final report */
2005 char *buf = NULL;
2006
2007 /* For convenience only */
2008 struct flow_endpoint *endpoint = &cflow[flow_id].endpoint[e];
2009 struct flow_settings *settings = &cflow[flow_id].settings[e];
2010 struct report *report = cflow[flow_id].final_report[e];
2011
2012 /* Flow ID and endpoint (source or destination) */
2013 if (asprintf(&buf, "# ID %3d %s: ", flow_id, e ? "D" : "S") == -1)
2014 critx("could not allocate memory for final report");;
2015
2016 /* No final report received. Skip final report line for this endpoint */
2017 if (!report) {
2018 asprintf_append(&buf, "Error: no final report received");
2019 goto out;
2020 }
2021
2022 /* Infos about the test connections */
2023 asprintf_append(&buf, "%s", endpoint->test_address);
2024
2025 if (strcmp(endpoint->rpc_info->server_name, endpoint->test_address) != 0)
2026 asprintf_append(&buf, "/%s", endpoint->rpc_info->server_name);
2027 if (endpoint->rpc_info->server_port != DEFAULT_LISTEN_PORT)
2028 asprintf_append(&buf, ":%d", endpoint->rpc_info->server_port);
2029
2030 /* Infos about the daemon OS */
2031 asprintf_append(&buf, " (%s %s), ",
2032 endpoint->daemon->os_name, endpoint->daemon->os_release);
2033
2034 /* Random seed */
2035 asprintf_append(&buf, "random seed: %u, ", cflow[flow_id].random_seed);
2036
2037 /* Sending & Receiving buffer */
2038 asprintf_append(&buf, "sbuf = %d/%d [B] (real/req), ",
2039 endpoint->send_buffer_size_real,
2040 settings->requested_send_buffer_size);
2041 asprintf_append(&buf, "rbuf = %d/%d [B] (real/req), ",
2042 endpoint->receive_buffer_size_real,
2043 settings->requested_read_buffer_size);
2044
2045 /* SMSS, Path MTU, Interface MTU */
2046 if (report->tcp_info.tcpi_snd_mss > 0)
2047 asprintf_append(&buf, "SMSS = %d [B], ",
2048 report->tcp_info.tcpi_snd_mss);
2049 if (report->pmtu > 0)
2050 asprintf_append(&buf, "PMTU = %d [B], ", report->pmtu);
2051 if (report->imtu > 0)
2052 asprintf_append(&buf, "Interface MTU = %d (%s) [B], ",
2053 report->imtu, guess_topology(report->imtu));
2054
2055 /* Congestion control algorithms */
2056 if (settings->cc_alg[0])
2057 asprintf_append(&buf, "CC = %s, ", settings->cc_alg);
2058
2059 /* Calculate time */
2060 double report_time = time_diff(&report->begin, &report->end);
2061 double delta_write = 0.0, delta_read = 0.0;
2062 if (settings->duration[WRITE])
2063 delta_write = report_time - settings->duration[WRITE]
2064 - settings->delay[SOURCE];
2065 if (settings->duration[READ])
2066 delta_read = report_time - settings->duration[READ]
2067 - settings->delay[DESTINATION];
2068
2069 /* Calculate delta target vs. real report time */
2070 double real_write = settings->duration[WRITE] + delta_write;
2071 double real_read = settings->duration[READ] + delta_read;
2072 if (settings->duration[WRITE])
2073 asprintf_append(&buf, "duration = %.3f/%.3f [s] (real/req), ",
2074 real_write, settings->duration[WRITE]);
2075 if (settings->delay[WRITE])
2076 asprintf_append(&buf, "write delay = %.3f [s], ",
2077 settings->delay[WRITE]);
2078 if (settings->delay[READ])
2079 asprintf_append(&buf, "read delay = %.3f [s], ",
2080 settings->delay[READ]);
2081
2082 /* Throughput */
2083 double thruput_read = report->bytes_read / MAX(real_read, real_write);
2084 double thruput_write = report->bytes_written / MAX(real_read, real_write);
2085 if (isnan(thruput_read))
2086 thruput_read = 0.0;
2087 if (isnan(thruput_write))
2088 thruput_write = 0.0;
2089
2090 thruput_read = scale_thruput(thruput_read);
2091 thruput_write = scale_thruput(thruput_write);
2092
2093 if (copt.mbyte)
2094 asprintf_append(&buf, "through = %.6f/%.6f [MiB/s] (out/in)",
2095 thruput_write, thruput_read);
2096 else
2097 asprintf_append(&buf, "through = " "%.6f/%.6f [Mbit/s] (out/in)",
2098 thruput_write, thruput_read);
2099
2100 /* Transactions */
2101 double trans = report->response_blocks_read / MAX(real_read, real_write);
2102 if (isnan(trans))
2103 trans = 0.0;
2104 if (trans)
2105 asprintf_append(&buf, ", transactions/s = %.2f [#]", trans);
2106
2107 /* Blocks */
2108 if (report->request_blocks_written || report->request_blocks_read)
2109 asprintf_append(&buf, ", request blocks = %u/%u [#] (out/in)",
2110 report->request_blocks_written,
2111 report->request_blocks_read);
2112 if (report->response_blocks_written || report->response_blocks_read)
2113 asprintf_append(&buf, ", response blocks = %u/%u [#] (out/in)",
2114 report->response_blocks_written,
2115 report->response_blocks_read);
2116
2117 /* RTT */
2118 if (report->response_blocks_read) {
2119 double rtt_avg = report->rtt_sum /
2120 (double)(report->response_blocks_read);
2121 asprintf_append(&buf, ", RTT = %.3f/%.3f/%.3f [ms] (min/avg/max)",
2122 report->rtt_min * 1e3, rtt_avg * 1e3,
2123 report->rtt_max * 1e3);
2124 }
2125
2126 /* IAT */
2127 if (report->request_blocks_read) {
2128 double iat_avg = report->iat_sum /
2129 (double)(report->request_blocks_read);
2130 asprintf_append(&buf, ", IAT = %.3f/%.3f/%.3f [ms] (min/avg/max)",
2131 report->iat_min * 1e3, iat_avg * 1e3,
2132 report->iat_max * 1e3);
2133 }
2134
2135 /* Delay */
2136 if (report->request_blocks_read) {
2137 double delay_avg = report->delay_sum /
2138 (double)(report->request_blocks_read);
2139 asprintf_append(&buf, ", delay = %.3f/%.3f/%.3f [ms] (min/avg/max)",
2140 report->delay_min * 1e3, delay_avg * 1e3,
2141 report->delay_max * 1e3);
2142 }
2143
2144 /* Fixed sending rate per second was set */
2145 if (settings->write_rate_str)
2146 asprintf_append(&buf, ", rate = %s", settings->write_rate_str);
2147
2148 /* Socket options */
2149 if (settings->elcn)
2150 asprintf_append(&buf, ", ELCN");
2151 if (settings->cork)
2152 asprintf_append(&buf, ", TCP_CORK");
2153 if (settings->pushy)
2154 asprintf_append(&buf, ", PUSHY");
2155 if (settings->nonagle)
2156 asprintf_append(&buf, ", TCP_NODELAY");
2157 if (settings->mtcp)
2158 asprintf_append(&buf, ", TCP_MTCP");
2159 if (settings->dscp)
2160 asprintf_append(&buf, ", dscp = 0x%02x", settings->dscp);
2161
2162 /* Other flow options */
2163 if (cflow[flow_id].late_connect)
2164 asprintf_append(&buf, ", late connecting");
2165 if (cflow[flow_id].shutdown)
2166 asprintf_append(&buf, ", calling shutdown");
2167
2168 out:
2169 print_output("%s\n", buf);
2170 free(buf);
2171 }
2172
2173 /**
2174 * Print final report (i.e. summary line) for all configured flows.
2175 */
print_all_final_reports(void)2176 static void print_all_final_reports(void)
2177 {
2178 for (unsigned id = 0; id < copt.num_flows; id++) {
2179 print_output("\n");
2180 foreach(int *i, SOURCE, DESTINATION) {
2181 print_final_report(id, *i);
2182 free(cflow[id].final_report[*i]);
2183 }
2184 }
2185 }
2186
2187 /**
2188 * Add the flow endpoint XML RPC data to the Global linked list.
2189 *
2190 * @param[in] XML-RPC connection url
2191 * @param[in] server_name flow endpoints IP address
2192 * @param[in] server_port controller - daemon XML-RPC connection port Nr
2193 * @return rpc_info flow endpoint XML RPC structure data
2194 */
add_flow_endpoint_by_url(const char * server_url,const char * server_name,unsigned short server_port)2195 static struct rpc_info * add_flow_endpoint_by_url(const char* server_url,
2196 const char* server_name,
2197 unsigned short server_port)
2198 {
2199 struct rpc_info *flow_rpc_info;
2200 flow_rpc_info = malloc((sizeof(struct rpc_info)));
2201
2202 if (!flow_rpc_info ) {
2203 logging(LOG_ALERT, "could not allocate memory for flows rpc info");
2204 return 0;
2205 }
2206
2207 memset(flow_rpc_info, 0, sizeof(struct rpc_info));
2208
2209 strcpy(flow_rpc_info->server_url, server_url);
2210 strcpy(flow_rpc_info->server_name, server_name);
2211 flow_rpc_info->server_port = server_port;
2212 fg_list_push_back(&flows_rpc_info, flow_rpc_info);
2213 return flow_rpc_info;
2214 }
2215
2216 /**
2217 * Set the flow endpoint XML RPC data for a given server_url.
2218 *
2219 * @param[in] XML-RPC connection url
2220 * @param[in] server_name flow endpoints IP address
2221 * @param[in] server_port controller - daemon XML-RPC connection port Nr
2222 * @return rpc_info flow endpoint XML RPC structure data
2223 */
set_rpc_info(const char * server_url,const char * server_name,unsigned short server_port)2224 static struct rpc_info * set_rpc_info(const char* server_url,
2225 const char* server_name,
2226 unsigned short server_port)
2227 {
2228 if(fg_list_size(&flows_rpc_info) == 0)
2229 return add_flow_endpoint_by_url(server_url,server_name, server_port);
2230
2231 /* If we have already stored flow info for this URL return a pointer to it */
2232 const struct list_node *node = fg_list_front(&flows_rpc_info);
2233 while (node) {
2234 struct rpc_info *flow_rpc_info= node->data;
2235 node = node->next;
2236
2237 if (!strcmp(flow_rpc_info->server_url, server_url))
2238 return flow_rpc_info;
2239 }
2240 /* didn't find anything, seems to be a new one */
2241 return add_flow_endpoint_by_url(server_url,server_name, server_port);
2242 }
2243
2244 /**
2245 * Parse option for stochastic traffic generation (option -G).
2246 *
2247 * @param[in] params parameter string in the form 'x=(q|p|g):(C|U|E|N|L|P|W):#1:[#2]'
2248 * @param[in] flow_id ID of flow to apply option to
2249 * @param[in] endpoint_id endpoint to apply option to
2250 */
parse_trafgen_option(const char * params,int flow_id,int endpoint_id)2251 static void parse_trafgen_option(const char *params, int flow_id, int endpoint_id)
2252 {
2253 int rc;
2254 double param1 = 0, param2 = 0, unused;
2255 char typechar, distchar;
2256 enum distribution_t distr = CONSTANT;
2257
2258 rc = sscanf(params, "%c:%c:%lf:%lf:%lf", &typechar, &distchar,
2259 ¶m1, ¶m2, &unused);
2260 if (rc != 3 && rc != 4)
2261 PARSE_ERR("flow %i: option -G: malformed traffic generation "
2262 "parameters", flow_id);
2263
2264 switch (distchar) {
2265 case 'N':
2266 distr = NORMAL;
2267 if (!param1 || !param2)
2268 PARSE_ERR("flow %i: option -G: normal distribution "
2269 "needs two non-zero parameters", flow_id);
2270 break;
2271 case 'W':
2272 distr = WEIBULL;
2273 if (!param1 || !param2)
2274 PARSE_ERR("flow %i: option -G: weibull distribution "
2275 "needs two non-zero parameters", flow_id);
2276 break;
2277 case 'U':
2278 distr = UNIFORM;
2279 if (param1 <= 0 || param2 <= 0 || (param1 > param2))
2280 PARSE_ERR("flow %i: option -G: uniform distribution "
2281 "needs two positive parameters", flow_id);
2282 break;
2283 case 'E':
2284 distr = EXPONENTIAL;
2285 if (param1 <= 0)
2286 PARSE_ERR("flow %i: option -G: exponential distribution "
2287 "needs one positive parameter", flow_id);
2288 break;
2289 case 'P':
2290 distr = PARETO;
2291 if (!param1 || !param2)
2292 PARSE_ERR("flow %i: option -G: pareto distribution "
2293 "needs two non-zero parameters", flow_id);
2294 break;
2295 case 'L':
2296 distr = LOGNORMAL;
2297 if (!param1 || !param2)
2298 PARSE_ERR("flow %i: option -G: lognormal distribution "
2299 "needs two non-zero parameters", flow_id);
2300 break;
2301 case 'C':
2302 distr = CONSTANT;
2303 if (param1 <= 0)
2304 PARSE_ERR("flow %i: option -G: constant distribution "
2305 "needs one positive parameters", flow_id);
2306 break;
2307 default:
2308 PARSE_ERR("flow %i: option -G: syntax error: %c is not a "
2309 "distribution", flow_id, distchar);
2310 break;
2311 }
2312
2313 switch (typechar) {
2314 case 'p':
2315 cflow[flow_id].settings[endpoint_id].response_trafgen_options.distribution = distr;
2316 cflow[flow_id].settings[endpoint_id].response_trafgen_options.param_one = param1;
2317 cflow[flow_id].settings[endpoint_id].response_trafgen_options.param_two = param2;
2318 break;
2319 case 'q':
2320 cflow[flow_id].settings[endpoint_id].request_trafgen_options.distribution = distr;
2321 cflow[flow_id].settings[endpoint_id].request_trafgen_options.param_one = param1;
2322 cflow[flow_id].settings[endpoint_id].request_trafgen_options.param_two = param2;
2323 break;
2324 case 'g':
2325 cflow[flow_id].settings[endpoint_id].interpacket_gap_trafgen_options.distribution = distr;
2326 cflow[flow_id].settings[endpoint_id].interpacket_gap_trafgen_options.param_one = param1;
2327 cflow[flow_id].settings[endpoint_id].interpacket_gap_trafgen_options.param_two = param2;
2328 break;
2329 }
2330
2331 /* sanity check for max block size */
2332 foreach(int *i, SOURCE, DESTINATION) {
2333 if (distr == CONSTANT &&
2334 cflow[flow_id].settings[*i].maximum_block_size < param1)
2335 cflow[flow_id].settings[*i].maximum_block_size = param1;
2336 if (distr == UNIFORM &&
2337 cflow[flow_id].settings[*i].maximum_block_size < param2)
2338 cflow[flow_id].settings[*i].maximum_block_size = param2;
2339 }
2340 }
2341
2342 /**
2343 * Parse argument for option -R, which specifies the rate the endpoint will send.
2344 *
2345 * @param[in] arg argument for option -R in form of #.#(z|k|M|G)(b|B|o)
2346 * @param[in] flow_id ID of flow to apply option to
2347 * @param[in] endpoint_id endpoint to apply option to
2348 */
parse_rate_option(const char * arg,int flow_id,int endpoint_id)2349 static void parse_rate_option(const char *arg, int flow_id, int endpoint_id)
2350 {
2351 char unit = 0, type = 0;
2352 double optdouble = 0.0;
2353 /* last %c for catching wrong input... this is not nice. */
2354 int rc = sscanf(arg, "%lf%c%c%c",
2355 &optdouble, &unit, &type, &unit);
2356 if (rc < 1 || rc > 4)
2357 PARSE_ERR("flow %i: option -R: malformed rate", flow_id);
2358
2359 if (optdouble == 0.0)
2360 PARSE_ERR("flow %i: option -R: rate of 0", flow_id);
2361
2362
2363 switch (unit) {
2364 case 0:
2365 case 'z':
2366 break;
2367
2368 case 'k':
2369 optdouble *= 1<<10;
2370 break;
2371
2372 case 'M':
2373 optdouble *= 1<<20;
2374 break;
2375
2376 case 'G':
2377 optdouble *= 1<<30;
2378 break;
2379
2380 default:
2381 PARSE_ERR("flow %i: option -R: illegal unit specifier", flow_id);
2382 break;
2383 }
2384
2385 if (type != 'b' && type != 'B')
2386 PARSE_ERR("flow %i: option -R: illegal type specifier "
2387 "(either 'b' or 'B')", flow_id);
2388 if (type == 'b')
2389 optdouble /= 8;
2390
2391 if (optdouble > 5e9)
2392 warnx("rate of flow %d too high", flow_id);
2393
2394 cflow[flow_id].settings[endpoint_id].write_rate_str = strdup(arg);
2395 cflow[flow_id].settings[endpoint_id].write_rate = optdouble;
2396 }
2397
2398
2399
2400 /**
2401 * Parse argument for option -H, which specifies the endpoints of a flow.
2402 *
2403 * @param[in] hostarg argument for option -H in form of HOST[/CONTROL[:PORT]]
2404 * - HOST: test address where the actual test connection goes to
2405 * - CONTROL: RPC address, where this program connects to
2406 * - PORT: port for the control connection
2407 * @param[in] flow_id ID of flow to apply option to
2408 * @param[in] endpoint_id endpoint to apply option to
2409 */
parse_host_option(const char * hostarg,int flow_id,int endpoint_id)2410 static void parse_host_option(const char* hostarg, int flow_id, int endpoint_id)
2411 {
2412 struct sockaddr_in6 source_in6;
2413 source_in6.sin6_family = AF_INET6;
2414 int port = DEFAULT_LISTEN_PORT;
2415 bool extra_rpc = false;
2416 bool is_ipv6 = false;
2417 char *rpc_address, *url = 0, *sepptr = 0;
2418 char *arg = strdup(hostarg);
2419 struct flow_endpoint* endpoint = &cflow[flow_id].endpoint[endpoint_id];
2420
2421 /* extra RPC address ? */
2422 sepptr = strchr(arg, '/');
2423 if (sepptr) {
2424 *sepptr = '\0';
2425 rpc_address = sepptr + 1;
2426 extra_rpc = true;
2427 } else {
2428 rpc_address = arg;
2429 }
2430
2431 /* IPv6 Address? */
2432 if (strchr(arg, ':')) {
2433 if (inet_pton(AF_INET6, arg, (char*)&source_in6.sin6_addr) <= 0)
2434 PARSE_ERR("flow %i: invalid IPv6 address '%s' for "
2435 "test connection", flow_id, arg);
2436
2437 if (!extra_rpc)
2438 is_ipv6 = true;
2439 }
2440
2441 /* optional dedicated rpc address was supplied and needs to be parsed */
2442 if (extra_rpc) {
2443 parse_rpc_address(&rpc_address, &port, &is_ipv6);
2444 if (is_ipv6 && (inet_pton(AF_INET6, rpc_address,
2445 (char*)&source_in6.sin6_addr) <= 0))
2446 PARSE_ERR("flow %i: invalid IPv6 address '%s' for RPC",
2447 flow_id, arg);
2448 if (port < 1 || port > 65535)
2449 PARSE_ERR("flow %i: invalid port for RPC", flow_id);
2450 }
2451
2452 if (!*arg)
2453 PARSE_ERR("flow %i: no test host given in argument", flow_id);
2454
2455 int rc = 0;
2456 if (is_ipv6)
2457 rc = asprintf(&url, "http://[%s]:%d/RPC2", rpc_address, port);
2458 else
2459 rc = asprintf(&url, "http://%s:%d/RPC2", rpc_address, port);
2460
2461 if (rc == -1)
2462 critx("could not allocate memory for RPC URL");
2463
2464 /* Get flow endpoint server information for each flow */
2465 endpoint->rpc_info = set_rpc_info(url, rpc_address, port);
2466 strcpy(endpoint->test_address, arg);
2467 free_all(arg, url);
2468 }
2469
2470 /**
2471 * Parse flow options with endpoint.
2472 *
2473 * @param[in] code the code of the cmdline option
2474 * @param[in] arg the argument of the cmdline option
2475 * @param[in] opt_string contains the real cmdline option string
2476 * @param[in] flow_id ID of flow to apply option to
2477 * @param[in] endpoint_id endpoint to apply option to
2478 */
parse_flow_option_endpoint(int code,const char * arg,const char * opt_string,int flow_id,int endpoint_id)2479 static void parse_flow_option_endpoint(int code, const char* arg,
2480 const char* opt_string, int flow_id,
2481 int endpoint_id)
2482 {
2483 int optint = 0;
2484 double optdouble = 0.0;
2485
2486 struct flow_settings* settings = &cflow[flow_id].settings[endpoint_id];
2487
2488 switch (code) {
2489 case 'G':
2490 parse_trafgen_option(arg, flow_id, endpoint_id);
2491 break;
2492 case 'A':
2493 SHOW_COLUMNS(COL_RTT_MIN, COL_RTT_AVG, COL_RTT_MAX);
2494 settings->response_trafgen_options.distribution = CONSTANT;
2495 settings->response_trafgen_options.param_one = MIN_BLOCK_SIZE;
2496 break;
2497 case 'B':
2498 if (sscanf(arg, "%u", &optint) != 1 || optint < 0)
2499 PARSE_ERR("in flow %i: option %s needs positive integer",
2500 flow_id, opt_string);
2501 settings->requested_send_buffer_size = optint;
2502 break;
2503 case 'C':
2504 settings->flow_control= 1;
2505 break;
2506 case 'D':
2507 if (sscanf(arg, "%x", &optint) != 1 || (optint & ~0x3f))
2508 PARSE_ERR("in flow %i: option %s service code point "
2509 "is malformed", flow_id, opt_string);
2510 settings->dscp = optint;
2511 break;
2512 case 'H':
2513 parse_host_option(arg, flow_id, endpoint_id);
2514 break;
2515 case 'M':
2516 settings->traffic_dump = 1;
2517 break;
2518 case 'O':
2519 if (!*arg)
2520 PARSE_ERR("in flow %i: option %s requires a value "
2521 "for each endpoint", flow_id, opt_string);
2522
2523 if (!strcmp(arg, "TCP_CORK")) {
2524 settings->cork = 1;
2525 } else if (!strcmp(arg, "TCP_ELCN")) {
2526 settings->elcn = 1;
2527 } else if (!strcmp(arg, "TCP_LCD")) {
2528 settings->lcd = 1;
2529 } else if (!strcmp(arg, "TCP_MTCP")) {
2530 settings->mtcp = 1;
2531 } else if (!strcmp(arg, "TCP_NODELAY")) {
2532 settings->nonagle = 1;
2533 } else if (!strcmp(arg, "ROUTE_RECORD")) {
2534 settings->route_record = 1;
2535 /* keep TCP_CONG_MODULE for backward compatibility */
2536 } else if (!memcmp(arg, "TCP_CONG_MODULE=", 16)) {
2537 if (strlen(arg + 16) >= sizeof(cflow[0].settings[SOURCE].cc_alg))
2538 PARSE_ERR("in flow %i: option %s: too large "
2539 "string for TCP_CONG_MODULE",
2540 flow_id, opt_string);
2541 strcpy(settings->cc_alg, arg + 16);
2542 } else if (!memcmp(arg, "TCP_CONGESTION=", 15)) {
2543 if (strlen(arg + 16) >= sizeof(cflow[0].settings[SOURCE].cc_alg))
2544 PARSE_ERR("in flow %i: option %s: too large "
2545 "string for TCP_CONGESTION",
2546 flow_id, opt_string);
2547 strcpy(settings->cc_alg, arg + 15);
2548 } else if (!strcmp(arg, "SO_DEBUG")) {
2549 settings->so_debug = 1;
2550 } else if (!strcmp(arg, "IP_MTU_DISCOVER")) {
2551 settings->ipmtudiscover = 1;
2552 } else {
2553 PARSE_ERR("in flow %i: option %s: unknown socket "
2554 "option or socket option not implemented",
2555 flow_id, opt_string);
2556 }
2557 break;
2558 case 'P':
2559 settings->pushy = 1;
2560 break;
2561 case 'R':
2562 if (!*arg)
2563 PARSE_ERR("in flow %i: option %s requires a value "
2564 "for each given endpoint", flow_id, opt_string);
2565 parse_rate_option(arg, flow_id, endpoint_id);
2566 break;
2567 case 'S':
2568 if (sscanf(arg, "%u", &optint) != 1 || optint < 0)
2569 PARSE_ERR("in flow %i: option %s needs positive integer",
2570 flow_id, opt_string);
2571 settings->request_trafgen_options.distribution = CONSTANT;
2572 settings->request_trafgen_options.param_one = optint;
2573 for (int id = 0; id < MAX_FLOWS_CONTROLLER; id++) {
2574 foreach(int *i, SOURCE, DESTINATION) {
2575 if ((signed)optint >
2576 cflow[id].settings[*i].maximum_block_size)
2577 cflow[id].settings[*i].maximum_block_size =
2578 (signed)optint;
2579 }
2580 }
2581 break;
2582 case 'T':
2583 if (sscanf(arg, "%lf", &optdouble) != 1 || optdouble < 0)
2584 PARSE_ERR("in flow %i: option %s needs positive number",
2585 flow_id, opt_string);
2586 settings->duration[WRITE] = optdouble;
2587 break;
2588 case 'U':
2589 if (sscanf(arg, "%u", &optint) != 1 || optint < 0)
2590 PARSE_ERR("in flow %i: option %s needs positive integer",
2591 flow_id, opt_string);
2592 settings->maximum_block_size = optint;
2593 break;
2594 case 'W':
2595 if (sscanf(arg, "%u", &optint) != 1 || optint < 0)
2596 PARSE_ERR("in flow %i: option %s needs non-negative number",
2597 flow_id, opt_string);
2598 settings->requested_read_buffer_size = optint;
2599 break;
2600 case 'Y':
2601 if (sscanf(arg, "%lf", &optdouble) != 1 || optdouble < 0)
2602 PARSE_ERR("in flow %i: option %s needs non-negative number",
2603 flow_id, opt_string);
2604 settings->delay[WRITE] = optdouble;
2605 break;
2606 }
2607 }
2608
2609 /**
2610 * Parse flow options without endpoint.
2611 *
2612 * @param[in] code the code of the cmdline option
2613 * @param[in] arg the argument string of the cmdline option
2614 * @param[in] opt_string contains the real cmdline option string
2615 * @param[in] flow_id ID of flow to apply option to
2616 */
parse_flow_option(int code,const char * arg,const char * opt_string,int flow_id)2617 static void parse_flow_option(int code, const char* arg, const char* opt_string,
2618 int flow_id)
2619 {
2620 unsigned optunsigned = 0;
2621
2622 switch (code) {
2623 /* flow options w/o endpoint identifier */
2624 case 'E':
2625 cflow[flow_id].byte_counting = 1;
2626 break;
2627 case 'I':
2628 SHOW_COLUMNS(COL_DLY_MIN, COL_DLY_AVG, COL_DLY_MAX);
2629 break;
2630 case 'J':
2631 if (sscanf(arg, "%u", &optunsigned) != 1)
2632 PARSE_ERR("option %s needs an integer argument",
2633 opt_string);
2634 cflow[flow_id].random_seed = optunsigned;
2635 break;
2636 case 'L':
2637 cflow[flow_id].late_connect = 1;
2638 break;
2639 case 'N':
2640 cflow[flow_id].shutdown = 1;
2641 break;
2642 case 'Q':
2643 cflow[flow_id].summarize_only = 1;
2644 break;
2645 }
2646 }
2647
2648 /**
2649 * Parse argument for option -c to hide/show intermediated interval report
2650 * columns.
2651 *
2652 * @param[in] arg argument for option -c
2653 */
parse_colon_option(const char * arg)2654 static void parse_colon_option(const char *arg)
2655 {
2656 /* To make it easy (independed of default values), hide all colons */
2657 HIDE_COLUMNS(COL_BEGIN, COL_END, COL_THROUGH, COL_TRANSAC,
2658 COL_BLOCK_REQU, COL_BLOCK_RESP, COL_RTT_MIN, COL_RTT_AVG,
2659 COL_RTT_MAX, COL_IAT_MIN, COL_IAT_AVG, COL_IAT_MAX,
2660 COL_DLY_MIN, COL_DLY_AVG, COL_DLY_MAX, COL_TCP_CWND,
2661 COL_TCP_SSTH, COL_TCP_UACK, COL_TCP_SACK, COL_TCP_LOST,
2662 COL_TCP_RETR, COL_TCP_TRET, COL_TCP_FACK, COL_TCP_REOR,
2663 COL_TCP_BKOF, COL_TCP_RTT, COL_TCP_RTTVAR, COL_TCP_RTO,
2664 COL_TCP_CA_STATE, COL_SMSS, COL_PMTU);
2665 #ifdef DEBUG
2666 HIDE_COLUMNS(COL_STATUS);
2667 #endif /* DEBUG */
2668
2669 /* Set colon visibility according option */
2670 char *argcpy = strdup(arg);
2671 for (char *token = strtok(argcpy, ","); token;
2672 token = strtok(NULL, ",")) {
2673 if (!strcmp(token, "interval"))
2674 SHOW_COLUMNS(COL_BEGIN, COL_END);
2675 else if (!strcmp(token, "through"))
2676 SHOW_COLUMNS(COL_THROUGH);
2677 else if (!strcmp(token, "transac"))
2678 SHOW_COLUMNS(COL_TRANSAC);
2679 else if (!strcmp(token, "blocks"))
2680 SHOW_COLUMNS(COL_BLOCK_REQU, COL_BLOCK_RESP);
2681 else if (!strcmp(token, "rtt"))
2682 SHOW_COLUMNS(COL_RTT_MIN, COL_RTT_AVG, COL_RTT_MAX);
2683 else if (!strcmp(token, "iat"))
2684 SHOW_COLUMNS(COL_IAT_MIN, COL_IAT_AVG, COL_IAT_MAX);
2685 else if (!strcmp(token, "delay"))
2686 SHOW_COLUMNS(COL_DLY_MIN, COL_DLY_AVG, COL_DLY_MAX);
2687 else if (!strcmp(token, "kernel"))
2688 SHOW_COLUMNS(COL_TCP_CWND, COL_TCP_SSTH, COL_TCP_UACK,
2689 COL_TCP_SACK, COL_TCP_LOST, COL_TCP_RETR,
2690 COL_TCP_TRET, COL_TCP_FACK, COL_TCP_REOR,
2691 COL_TCP_BKOF, COL_TCP_RTT, COL_TCP_RTTVAR,
2692 COL_TCP_RTO, COL_TCP_CA_STATE, COL_SMSS,
2693 COL_PMTU);
2694 #ifdef DEBUG
2695 else if (!strcmp(token, "status"))
2696 SHOW_COLUMNS(COL_STATUS);
2697 #endif /* DEBUG */
2698 else
2699 PARSE_ERR("%s", "malformed option '-c'");
2700 }
2701 free(argcpy);
2702 }
2703
2704 /**
2705 * Parse general controller options given on the cmdline.
2706 *
2707 * @param[in] code the code of the cmdline option
2708 * @param[in] arg the argument string of the cmdline option
2709 * @param[in] opt_string contains the real cmdline option string
2710 */
parse_general_option(int code,const char * arg,const char * opt_string)2711 static void parse_general_option(int code, const char* arg, const char* opt_string)
2712 {
2713
2714 switch (code) {
2715 case 0:
2716 PARSE_ERR("invalid argument: %s", arg);
2717 /* general options */
2718 case 'h':
2719 if (!arg || !strlen(arg))
2720 usage(EXIT_SUCCESS);
2721 else if (!strcmp(arg, "socket"))
2722 usage_sockopt();
2723 else if (!strcmp(arg, "traffic"))
2724 usage_trafgenopt();
2725 else
2726 PARSE_ERR("invalid argument '%s' for %s", arg, opt_string);
2727 break;
2728 case 'v':
2729 fprintf(stdout, "%s %s\n%s\n%s\n\n%s\n", progname,
2730 FLOWGRIND_VERSION, FLOWGRIND_COPYRIGHT,
2731 FLOWGRIND_COPYING, FLOWGRIND_AUTHORS);
2732 exit(EXIT_SUCCESS);
2733
2734 /* controller options */
2735 case 'c':
2736 parse_colon_option(arg);
2737 break;
2738 #ifdef DEBUG
2739 case 'd':
2740 increase_debuglevel();
2741 break;
2742 #endif /* DEBUG */
2743 case 'e':
2744 copt.dump_prefix = strdup(arg);
2745 break;
2746 case 'i':
2747 if (sscanf(arg, "%lf", &copt.reporting_interval) != 1 ||
2748 copt.reporting_interval <= 0)
2749 PARSE_ERR("option %s needs a positive number "
2750 "(in seconds)", opt_string);
2751 break;
2752 case LOG_FILE_OPTION:
2753 copt.log_to_file = true;
2754 if (arg)
2755 log_filename = strdup(arg);
2756 break;
2757 case 'm':
2758 copt.mbyte = true;
2759 column_info[COL_THROUGH].header.unit = " [MiB/s]";
2760 break;
2761 case 'n':
2762 if (sscanf(arg, "%hd", &copt.num_flows) != 1 ||
2763 copt.num_flows > MAX_FLOWS_CONTROLLER)
2764 PARSE_ERR("option %s (number of flows) must be within "
2765 "[1..%d]", opt_string, MAX_FLOWS_CONTROLLER);
2766 break;
2767 case 'o':
2768 copt.clobber = true;
2769 break;
2770 case 'p':
2771 copt.symbolic = false;
2772 break;
2773 case 'q':
2774 copt.log_to_stdout = false;
2775 break;
2776 case 's':
2777 if (!strcmp(arg, "segment"))
2778 copt.force_unit = SEGMENT_BASED;
2779 else if (!strcmp(arg, "byte"))
2780 copt.force_unit = BYTE_BASED;
2781 else
2782 PARSE_ERR("invalid argument '%s' for option %s",
2783 arg, opt_string);
2784 break;
2785 case 'w':
2786 copt.log_to_file = true;
2787 break;
2788 /* unknown option or missing option-argument */
2789 default:
2790 PARSE_ERR("uncaught option: %s", arg);
2791 break;
2792 }
2793
2794 }
2795
2796 /**
2797 * Wrapper function for mutex checking and error message printing.
2798 *
2799 * Defines the cmdline options and distinguishes option types (flow, general, ...)
2800 * and tokenizes flow options which can have several endpoints.
2801 *
2802 * @param[in] ms array of mutex states
2803 * @param[in] context the mutex context of this option (see enum #mutex_contexts)
2804 * @param[in] argind option record index
2805 * @param[in] flow_id ID of the flow to show in error message
2806 */
check_mutex(struct ap_Mutex_state ms[],const enum mutex_context_t context,const int argind,int flow_id)2807 static void check_mutex(struct ap_Mutex_state ms[],
2808 const enum mutex_context_t context,
2809 const int argind, int flow_id)
2810 {
2811 int mutex_index;
2812 if (context == MUTEX_CONTEXT_CONTROLLER){
2813 if (ap_set_check_mutex(&parser, &ms[context], argind, &mutex_index))
2814 PARSE_ERR("Option %s conflicts with option %s",
2815 ap_opt_string(&parser, argind),
2816 ap_opt_string(&parser, mutex_index));
2817 } else {
2818 if (ap_set_check_mutex(&parser, &ms[context], argind, &mutex_index))
2819 PARSE_ERR("In flow %i: option %s conflicts with option %s",
2820 flow_id, ap_opt_string(&parser, argind),
2821 ap_opt_string(&parser, mutex_index));
2822 }
2823 }
2824
2825 /**
2826 * Parse flow options for multiple endpoints.
2827 *
2828 * This iterates through the endpoints given in the argument string
2829 * (e.g. s=#,d=# or b=#).
2830 *
2831 * @param[in] code the code of the cmdline option
2832 * @param[in] arg the argument of the multi-endpoint flow option
2833 * @param[in] opt_string contains the real cmdline option string
2834 * @param[in] ms array of mutex states
2835 * @param[in] argind index of the option
2836 * @param[in] flow_id ID of flow to apply option to
2837 */
parse_multi_endpoint_option(int code,const char * arg,const char * opt_string,struct ap_Mutex_state ms[],int argind,int flow_id)2838 static void parse_multi_endpoint_option(int code, const char* arg,
2839 const char* opt_string,
2840 struct ap_Mutex_state ms[], int argind,
2841 int flow_id)
2842 {
2843 char *argcpy = strdup(arg);
2844 for (char *token = strtok(argcpy, ","); token;
2845 token = strtok(NULL, ",")) {
2846
2847 char type = token[0];
2848 char* arg;
2849
2850 if (token[1] == '=')
2851 arg = token + 2;
2852 else
2853 arg = token + 1;
2854
2855 if (type != 's' && type != 'd' && type != 'b')
2856 PARSE_ERR("Invalid endpoint specifier in Option %s",
2857 opt_string);
2858
2859 /* check mutex in context of current endpoint */
2860 if (type == 's' || type == 'b') {
2861 check_mutex(ms, MUTEX_CONTEXT_SOURCE, argind, flow_id);
2862 parse_flow_option_endpoint(code, arg, opt_string,
2863 flow_id, SOURCE);
2864 }
2865 if (type == 'd' || type == 'b') {
2866 check_mutex(ms, MUTEX_CONTEXT_DESTINATION, argind, flow_id);
2867 parse_flow_option_endpoint(code, arg, opt_string,
2868 flow_id, DESTINATION);
2869 }
2870 }
2871 free(argcpy);
2872 }
2873
2874 /**
2875 * The main commandline argument parsing function.
2876 *
2877 * Defines the cmdline options and distinguishes option types (flow, general,
2878 * ...) and tokenizes flow options which can have several endpoints.
2879 *
2880 * @param[in] argc number of arguments (as in main())
2881 * @param[in] argv array of argument strings (as in main())
2882 */
parse_cmdline(int argc,char * argv[])2883 static void parse_cmdline(int argc, char *argv[])
2884 {
2885 int rc = 0;
2886 int cur_num_flows = 0;
2887 int current_flow_ids[MAX_FLOWS_CONTROLLER];
2888 int max_flow_specifier = 0;
2889 int optint = 0;
2890
2891 const struct ap_Option options[] = {
2892 {'c', "show-colon", ap_yes, OPT_CONTROLLER, 0},
2893 #ifdef DEBUG
2894 {'d', "debug", ap_no, OPT_CONTROLLER, 0},
2895 #endif /* DEBUG */
2896 {'e', "dump-prefix", ap_yes, OPT_CONTROLLER, 0},
2897 {'h', "help", ap_maybe, OPT_CONTROLLER, 0},
2898 {'i', "report-interval", ap_yes, OPT_CONTROLLER, 0},
2899 {LOG_FILE_OPTION, "log-file", ap_maybe, OPT_CONTROLLER, 0},
2900 {'m', 0, ap_no, OPT_CONTROLLER, 0},
2901 {'n', "flows", ap_yes, OPT_CONTROLLER, 0},
2902 {'o', 0, ap_no, OPT_CONTROLLER, 0},
2903 {'p', 0, ap_no, OPT_CONTROLLER, 0},
2904 {'q', "quiet", ap_no, OPT_CONTROLLER, 0},
2905 {'s', "tcp-stack", ap_yes, OPT_CONTROLLER, 0},
2906 {'v', "version", ap_no, OPT_CONTROLLER, 0},
2907 {'w', 0, ap_no, OPT_CONTROLLER, 0},
2908 {'A', 0, ap_yes, OPT_FLOW_ENDPOINT, (int[]){1,0}},
2909 {'B', 0, ap_yes, OPT_FLOW_ENDPOINT, 0},
2910 {'C', 0, ap_no, OPT_FLOW_ENDPOINT, 0},
2911 {'D', 0, ap_yes, OPT_FLOW_ENDPOINT, 0},
2912 {'E', 0, ap_no, OPT_FLOW, 0},
2913 {'F', 0, ap_yes, OPT_SELECTOR, 0},
2914 {'G', 0, ap_yes, OPT_FLOW_ENDPOINT, (int[]){1,2,3,0}},
2915 {'H', 0, ap_yes, OPT_FLOW_ENDPOINT, 0},
2916 {'I', 0, ap_no, OPT_FLOW, 0},
2917 {'J', 0, ap_yes, OPT_FLOW, 0},
2918 {'L', 0, ap_no, OPT_FLOW, 0},
2919 {'M', 0, ap_yes, OPT_FLOW_ENDPOINT, 0},
2920 {'N', 0, ap_no, OPT_FLOW, 0},
2921 {'O', 0, ap_yes, OPT_FLOW_ENDPOINT, 0},
2922 {'P', 0, ap_yes, OPT_FLOW_ENDPOINT, 0},
2923 {'Q', 0, ap_no, OPT_FLOW, 0},
2924 {'R', 0, ap_yes, OPT_FLOW_ENDPOINT, (int[]){2,0}},
2925 {'S', 0, ap_yes, OPT_FLOW_ENDPOINT, (int[]){3,0}},
2926 {'T', 0, ap_yes, OPT_FLOW_ENDPOINT, 0},
2927 {'U', 0, ap_yes, OPT_FLOW_ENDPOINT, 0},
2928 {'W', 0, ap_yes, OPT_FLOW_ENDPOINT, 0},
2929 {'Y', 0, ap_yes, OPT_FLOW_ENDPOINT, 0},
2930 {0, 0, ap_no, 0, 0}
2931 };
2932
2933 if (!ap_init(&parser, argc, (const char* const*) argv, options, 0))
2934 critx("could not allocate memory for option parser");
2935 if (ap_error(&parser))
2936 PARSE_ERR("%s", ap_error(&parser));
2937
2938 /* initialize 4 mutex contexts (for SOURCE+DESTINATION+CONTROLLER+BOTH ENDPOINTS) */
2939 struct ap_Mutex_state ms[4];
2940 foreach(int *i, MUTEX_CONTEXT_CONTROLLER, MUTEX_CONTEXT_TWO_SIDED,
2941 MUTEX_CONTEXT_TWO_SIDED, MUTEX_CONTEXT_DESTINATION)
2942 ap_init_mutex_state(&parser, &ms[*i]);
2943
2944 /* if no option -F is given, configure all flows*/
2945 for (int i = 0; i < MAX_FLOWS_CONTROLLER; i++)
2946 current_flow_ids[i] = i;
2947 cur_num_flows = MAX_FLOWS_CONTROLLER;
2948
2949 /* parse command line */
2950 for (int argind = 0; argind < ap_arguments(&parser); argind++) {
2951 const int code = ap_code(&parser, argind);
2952 const char *arg = ap_argument(&parser, argind);
2953 const char *opt_string = ap_opt_string(&parser, argind);
2954 int tag = ap_option(&parser, argind)->tag;
2955
2956 /* distinguish option types by tag first */
2957 switch (tag) {
2958 case OPT_CONTROLLER:
2959 check_mutex(ms, MUTEX_CONTEXT_CONTROLLER, argind, 0);
2960 parse_general_option(code, arg, opt_string);
2961 break;
2962 case OPT_SELECTOR:
2963 cur_num_flows = 0;
2964 char *argcpy = strdup(arg);
2965 for (char *token = strtok(argcpy, ","); token;
2966 token = strtok(NULL, ",")) {
2967 rc = sscanf(token, "%d", &optint);
2968 if (rc != 1)
2969 PARSE_ERR("%s", "Malformed flow specifier");
2970
2971 /* all flows */
2972 if (optint == -1) {
2973 for (int i = 0; i < MAX_FLOWS_CONTROLLER; i++)
2974 current_flow_ids[i] = i;
2975 cur_num_flows = MAX_FLOWS_CONTROLLER;
2976 break;
2977 }
2978
2979 current_flow_ids[cur_num_flows++] = optint;
2980 ASSIGN_MAX(max_flow_specifier, optint);
2981 }
2982 free(argcpy);
2983 /* reset mutex for each new flow */
2984 foreach(int *i, MUTEX_CONTEXT_SOURCE,
2985 MUTEX_CONTEXT_DESTINATION,
2986 MUTEX_CONTEXT_TWO_SIDED)
2987 ap_reset_mutex(&ms[*i]);
2988 break;
2989 case OPT_FLOW:
2990 check_mutex(ms, MUTEX_CONTEXT_TWO_SIDED, argind,
2991 current_flow_ids[0]);
2992 for (int i = 0; i < cur_num_flows; i++)
2993 parse_flow_option(code, arg, opt_string,
2994 current_flow_ids[i]);
2995 break;
2996 case OPT_FLOW_ENDPOINT:
2997 for (int i = 0; i < cur_num_flows; i++)
2998 parse_multi_endpoint_option(code, arg,
2999 opt_string, ms, argind,
3000 current_flow_ids[i]);
3001 break;
3002 default:
3003 PARSE_ERR("%s", "uncaught option tag!");
3004 break;
3005 }
3006 }
3007
3008 if (copt.num_flows <= max_flow_specifier)
3009 PARSE_ERR("%s", "must not specify option for non-existing flow");
3010
3011 #if 0
3012 /* Demonstration how to set arbitary socket options. Note that this is
3013 * only intended for quickly testing new options without having to
3014 * recompile and restart the daemons. To add support for a particular
3015 * options in future flowgrind versions it's recommended to implement
3016 * them like the other options supported by the -O argument.
3017 */
3018 {
3019 assert(cflow[0].settings[SOURCE].num_extra_socket_options < MAX_EXTRA_SOCKET_OPTIONS);
3020 struct extra_socket_options *option = &cflow[0].settings[SOURCE].extra_socket_options[cflow[0].settings[SOURCE].num_extra_socket_options++];
3021 int v;
3022
3023 /* The value of the TCP_NODELAY constant gets passed to the daemons.
3024 * If daemons use a different system, constants may be different. In this case use
3025 * a value that matches the daemons'. */
3026 option->optname = TCP_NODELAY; /* or option->optname = 12345; as explained above */
3027
3028 option->level = level_ipproto_tcp; /* See extra_socket_option_level enum in common.h */
3029
3030 /* Again, value needs to be of correct size for the daemons.
3031 * Particular pitfalls can be differences in integer sizes or endianess.
3032 */
3033 assert(sizeof(v) < MAX_EXTRA_SOCKET_OPTION_VALUE_LENGTH);
3034 option->optlen = sizeof(v);
3035 memcpy(option->optval, &v, sizeof(v));
3036 }
3037 #endif /* 0 */
3038
3039 for (unsigned short id = 0; id < copt.num_flows; id++) {
3040 cflow[id].settings[SOURCE].duration[READ] = cflow[id].settings[DESTINATION].duration[WRITE];
3041 cflow[id].settings[DESTINATION].duration[READ] = cflow[id].settings[SOURCE].duration[WRITE];
3042 cflow[id].settings[SOURCE].delay[READ] = cflow[id].settings[DESTINATION].delay[WRITE];
3043 cflow[id].settings[DESTINATION].delay[READ] = cflow[id].settings[SOURCE].delay[WRITE];
3044
3045 foreach(int *i, SOURCE, DESTINATION) {
3046 /* Default to localhost, if no endpoints were set for a flow */
3047 if (!cflow[id].endpoint[*i].rpc_info) {
3048 cflow[id].endpoint[*i].rpc_info = set_rpc_info(
3049 "http://localhost:5999/RPC2", "localhost", DEFAULT_LISTEN_PORT);
3050 }
3051 }
3052 }
3053
3054 foreach(int *i, MUTEX_CONTEXT_CONTROLLER, MUTEX_CONTEXT_TWO_SIDED,
3055 MUTEX_CONTEXT_TWO_SIDED, MUTEX_CONTEXT_DESTINATION)
3056 ap_free_mutex_state(&ms[*i]);
3057 }
3058
3059 /**
3060 * Sanity checking flow options.
3061 */
sanity_check(void)3062 static void sanity_check(void)
3063 {
3064 for (unsigned short id = 0; id < copt.num_flows; id++) {
3065 DEBUG_MSG(LOG_DEBUG, "sanity checking parameter set of flow %d", id);
3066 if (cflow[id].settings[DESTINATION].duration[WRITE] > 0 &&
3067 cflow[id].late_connect &&
3068 cflow[id].settings[DESTINATION].delay[WRITE] <
3069 cflow[id].settings[SOURCE].delay[WRITE]) {
3070 errx("server flow %d starts earlier than client "
3071 "flow while late connecting", id);
3072 exit(EXIT_FAILURE);
3073 }
3074 if (cflow[id].settings[SOURCE].delay[WRITE] > 0 &&
3075 cflow[id].settings[SOURCE].duration[WRITE] == 0) {
3076 errx("client flow %d has a delay but no runtime", id);
3077 exit(EXIT_FAILURE);
3078 }
3079 if (cflow[id].settings[DESTINATION].delay[WRITE] > 0 &&
3080 cflow[id].settings[DESTINATION].duration[WRITE] == 0) {
3081 errx("server flow %d has a delay but no runtime", id);
3082 exit(EXIT_FAILURE);
3083 }
3084 if (!cflow[id].settings[DESTINATION].duration[WRITE] &&
3085 !cflow[id].settings[SOURCE].duration[WRITE]) {
3086 errx("server and client flow have both zero runtime "
3087 "for flow %d", id);
3088 exit(EXIT_FAILURE);
3089 }
3090
3091 foreach(int *i, SOURCE, DESTINATION) {
3092 if (cflow[id].settings[*i].flow_control &&
3093 !cflow[id].settings[*i].write_rate_str) {
3094 errx("flow %d has flow control enabled but no "
3095 "rate", id);
3096 exit(EXIT_FAILURE);
3097 }
3098
3099 if (cflow[id].settings[*i].write_rate &&
3100 (cflow[id].settings[*i].write_rate /
3101 cflow[id].settings[*i].maximum_block_size) < 1) {
3102 errx("client block size for flow %u is too big for "
3103 "specified rate", id);
3104 exit(EXIT_FAILURE);
3105 }
3106 }
3107 DEBUG_MSG(LOG_DEBUG, "sanity check parameter set of flow %d completed", id);
3108 }
3109 }
3110
main(int argc,char * argv[])3111 int main(int argc, char *argv[])
3112 {
3113 struct sigaction sa;
3114 sa.sa_handler = sighandler;
3115 sa.sa_flags = 0;
3116 sigemptyset (&sa.sa_mask);
3117 if (sigaction(SIGINT, &sa, NULL))
3118 critx("could not set handler for SIGINT");
3119
3120 xmlrpc_client *rpc_client = 0;
3121 xmlrpc_env_init(&rpc_env);
3122 xmlrpc_client_setup_global_const(&rpc_env);
3123
3124 fg_list_init(&flows_rpc_info);
3125 fg_list_init(&unique_daemons);
3126
3127 set_progname(argv[0]);
3128 init_controller_options();
3129 init_flow_options();
3130 parse_cmdline(argc, argv);
3131 sanity_check();
3132 open_logfile();
3133 prepare_xmlrpc_client(&rpc_client);
3134
3135 DEBUG_MSG(LOG_WARNING, "check daemons in the flows");
3136 if (!sigint_caught)
3137 find_daemon(rpc_client);
3138
3139 DEBUG_MSG(LOG_WARNING, "check flowgrindds versions");
3140 if (!sigint_caught)
3141 check_version(rpc_client);
3142
3143 DEBUG_MSG(LOG_WARNING, "check if flowgrindds are idle");
3144 if (!sigint_caught)
3145 check_idle(rpc_client);
3146
3147 DEBUG_MSG(LOG_WARNING, "prepare all flows");
3148 if (!sigint_caught)
3149 prepare_all_flows(rpc_client);
3150
3151 DEBUG_MSG(LOG_WARNING, "print headline");
3152 if (!sigint_caught)
3153 print_headline();
3154
3155 DEBUG_MSG(LOG_WARNING, "start all flows");
3156 if (!sigint_caught)
3157 start_all_flows(rpc_client);
3158
3159 DEBUG_MSG(LOG_WARNING, "close all flows");
3160 close_all_flows();
3161
3162 DEBUG_MSG(LOG_WARNING, "print all final report");
3163 fetch_reports(rpc_client);
3164 print_all_final_reports();
3165
3166 fg_list_clear(&flows_rpc_info);
3167 fg_list_clear(&unique_daemons);
3168
3169 close_logfile();
3170
3171 xmlrpc_client_destroy(rpc_client);
3172 xmlrpc_env_clean(&rpc_env);
3173 xmlrpc_client_teardown_global_const();
3174
3175 ap_free(&parser);
3176
3177 DEBUG_MSG(LOG_WARNING, "bye");
3178 }
3179