1 /**
2  * @file flowgrind.c
3  * @brief Flowgrind controller
4  */
5 
6 /*
7  * Copyright (C) 2013-2014 Alexander Zimmermann <alexander.zimmermann@netapp.com>
8  * Copyright (C) 2010-2013 Arnd Hannemann <arnd@arndnet.de>
9  * Copyright (C) 2010-2013 Christian Samsel <christian.samsel@rwth-aachen.de>
10  * Copyright (C) 2009 Tim Kosse <tim.kosse@gmx.de>
11  * Copyright (C) 2007-2008 Daniel Schaffrath <daniel.schaffrath@mac.com>
12  *
13  * This file is part of Flowgrind.
14  *
15  * Flowgrind is free software: you can redistribute it and/or modify
16  * it under the terms of the GNU General Public License as published by
17  * the Free Software Foundation, either version 3 of the License, or
18  * (at your option) any later version.
19  *
20  * Flowgrind is distributed in the hope that it will be useful,
21  * but WITHOUT ANY WARRANTY; without even the implied warranty of
22  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
23  * GNU General Public License for more details.
24  *
25  * You should have received a copy of the GNU General Public License
26  * along with Flowgrind.  If not, see <http://www.gnu.org/licenses/>.
27  *
28  */
29 
30 #ifdef HAVE_CONFIG_H
31 #include "config.h"
32 #endif /* HAVE_CONFIG_H */
33 
34 #include <assert.h>
35 #include <errno.h>
36 #include <limits.h>
37 #include <math.h>
38 #include <sys/types.h>
39 /* for AF_INET6 */
40 #include <sys/socket.h>
41 #include <arpa/inet.h>
42 #include <netinet/in.h>
43 #include <netinet/ip.h>
44 /* for CA states (on Linux only) */
45 #include <netinet/tcp.h>
46 #include <signal.h>
47 #include <stdio.h>
48 #include <stdlib.h>
49 #include <string.h>
50 #include <sys/param.h>
51 #include <sys/uio.h>
52 #include <sys/utsname.h>
53 #include <time.h>
54 #include <unistd.h>
55 #include <fcntl.h>
56 #include <syslog.h>
57 /* xmlrpc-c */
58 #include <xmlrpc-c/base.h>
59 #include <xmlrpc-c/client.h>
60 
61 #include "flowgrind.h"
62 #include "common.h"
63 #include "fg_error.h"
64 #include "fg_progname.h"
65 #include "fg_time.h"
66 #include "fg_definitions.h"
67 #include "fg_string.h"
68 #include "debug.h"
69 #include "fg_rpc_client.h"
70 #include "fg_argparser.h"
71 #include "fg_log.h"
72 
/** Make the listed intermediated interval report columns visible. */
#define SHOW_COLUMNS(...) \
	(set_column_visibility(true, NARGS(__VA_ARGS__), __VA_ARGS__))

/** Make the listed intermediated interval report columns invisible. */
#define HIDE_COLUMNS(...) \
	(set_column_visibility(false, NARGS(__VA_ARGS__), __VA_ARGS__))

/** Assign @p unit to the listed intermediated interval report columns. */
#define SET_COLUMN_UNIT(unit, ...) \
	(set_column_unit(unit, NARGS(__VA_ARGS__), __VA_ARGS__))

/**
 * Command line parsing error: emit the error message via errx(), then call
 * usage() with EXIT_FAILURE.
 */
#define PARSE_ERR(err_msg, ...)				\
	do {						\
		errx(err_msg, ##__VA_ARGS__);		\
		usage(EXIT_FAILURE);			\
	} while (0)
90 
91 /* External global variables */
92 extern const char *progname;
93 
94 /** Logfile for measurement output. */
95 static FILE *log_stream = NULL;
96 
97 /** Name of logfile. */
98 static char *log_filename = NULL;
99 
100 /** SIGINT (CTRL-C) received? */
101 static bool sigint_caught = false;
102 
103 /* XML-RPC environment object that contains any error that has occurred. */
104 static xmlrpc_env rpc_env;
105 
106 /** Global linked list to the flow endpoints XML RPC connection information. */
107 static struct linked_list flows_rpc_info;
108 
109 /** Global linked list to the daemons containing UUID and daemons flowgrind version. */
110 static struct linked_list unique_daemons;
111 
112 /** Command line option parser. */
113 static struct arg_parser parser;
114 
115 /** Controller options. */
116 static struct controller_options copt;
117 
118 /** Infos about all flows including flow options. */
119 static struct cflow cflow[MAX_FLOWS_CONTROLLER];
120 
121 /** Command line option parser. */
122 static struct arg_parser parser;
123 
124 /** Number of currently active flows. */
125 static unsigned short active_flows = 0;
126 
127 /* To cover a gcc bug (http://gcc.gnu.org/bugzilla/show_bug.cgi?id=36446) */
128 #pragma GCC diagnostic push
129 #pragma GCC diagnostic ignored "-Wmissing-field-initializers"
130 /** Infos about the intermediated interval report columns. */
131 static struct column column_info[] = {
132 	{.type = COL_FLOW_ID, .header.name = "# ID",
133 	 .header.unit = "#   ", .state.visible = true},
134 	{.type = COL_BEGIN, .header.name = "begin",
135 	 .header.unit = "[s]", .state.visible = true},
136 	{.type = COL_END, .header.name = "end",
137 	 .header.unit = "[s]", .state.visible = true},
138 	{.type = COL_THROUGH, .header.name = "through",
139 	 .header.unit = "[Mbit/s]", .state.visible = true},
140 	{.type = COL_TRANSAC, .header.name = "transac",
141 	 .header.unit = "[#/s]", .state.visible = true},
142 	{.type = COL_BLOCK_REQU, .header.name = "requ",
143 	 .header.unit = "[#]", .state.visible = false},
144 	{.type = COL_BLOCK_RESP, .header.name = "resp",
145 	 .header.unit = "[#]", .state.visible = false},
146 	{.type = COL_RTT_MIN, .header.name = "min RTT",
147 	 .header.unit = "[ms]", .state.visible = false},
148 	{.type = COL_RTT_AVG, .header.name = "avg RTT",
149 	 .header.unit = "[ms]", .state.visible = false},
150 	{.type = COL_RTT_MAX, .header.name = "max RTT",
151 	 .header.unit = "[ms]", .state.visible = false},
152 	{.type = COL_IAT_MIN, .header.name = "min IAT",
153 	 .header.unit = "[ms]", .state.visible = true},
154 	{.type = COL_IAT_AVG, .header.name = "avg IAT",
155 	 .header.unit = "[ms]", .state.visible = true},
156 	{.type = COL_IAT_MAX, .header.name = "max IAT",
157 	 .header.unit = "[ms]", .state.visible = true},
158 	{.type = COL_DLY_MIN, .header.name = "min DLY",
159 	 .header.unit = "[ms]", .state.visible = false},
160 	{.type = COL_DLY_AVG, .header.name = "avg DLY",
161 	 .header.unit = "[ms]", .state.visible = false},
162 	{.type = COL_DLY_MAX, .header.name = "max DLY",
163 	 .header.unit = "[ms]", .state.visible = false},
164 	{.type = COL_TCP_CWND, .header.name = "cwnd",
165 	 .header.unit = "[#]", .state.visible = true},
166 	{.type = COL_TCP_SSTH, .header.name = "ssth",
167 	 .header.unit = "[#]", .state.visible = true},
168 	{.type = COL_TCP_UACK, .header.name = "uack",
169 	 .header.unit = "[#]", .state.visible = true},
170 	{.type = COL_TCP_SACK, .header.name = "sack",
171 	 .header.unit = "[#]", .state.visible = true},
172 	{.type = COL_TCP_LOST, .header.name = "lost",
173 	 .header.unit = "[#]", .state.visible = true},
174 	{.type = COL_TCP_RETR, .header.name = "retr",
175 	 .header.unit = "[#]", .state.visible = true},
176 	{.type = COL_TCP_TRET, .header.name = "tret",
177 	 .header.unit = "[#]", .state.visible = true},
178 	{.type = COL_TCP_FACK, .header.name = "fack",
179 	 .header.unit = "[#]", .state.visible = true},
180 	{.type = COL_TCP_REOR, .header.name = "reor",
181 	 .header.unit = "[#]", .state.visible = true},
182 	{.type = COL_TCP_BKOF, .header.name = "bkof",
183 	 .header.unit = "[#]", .state.visible = true},
184 	{.type = COL_TCP_RTT, .header.name = "rtt",
185 	 .header.unit = "[ms]", .state.visible = true},
186 	{.type = COL_TCP_RTTVAR, .header.name = "rttvar",
187 	 .header.unit = "[ms]", .state.visible = true},
188 	{.type = COL_TCP_RTO, .header.name = "rto",
189 	 .header.unit = "[ms]", .state.visible = true},
190 	{.type = COL_TCP_CA_STATE, .header.name = "ca state",
191 	 .header.unit = "", .state.visible = true},
192 	{.type = COL_SMSS, .header.name = "smss",
193 	 .header.unit = "[B]", .state.visible = true},
194 	{.type = COL_PMTU, .header.name = "pmtu",
195 	 .header.unit = "[B]", .state.visible = true},
196 #ifdef DEBUG
197 	{.type = COL_STATUS, .header.name = "status",
198 	 .header.unit = "", .state.visible = false}
199 #endif /* DEBUG */
200 };
201 #pragma GCC diagnostic pop
202 
203 /* Forward declarations */
204 static void usage(short status)
205 	__attribute__((noreturn));
206 static void usage_sockopt(void)
207 	__attribute__((noreturn));
208 static void usage_trafgenopt(void)
209 	__attribute__((noreturn));
210 inline static void print_output(const char *fmt, ...)
211 	__attribute__((format(printf, 1, 2)));
212 static void fetch_reports(xmlrpc_client *);
213 static void report_flow(struct report* report);
214 static void print_interval_report(unsigned short flow_id, enum endpoint_t e,
215 		                  struct report *report);
216 
217 /**
218  * Print usage or error message and exit.
219  *
220  * Depending on exit status @p status print either the usage or an error
221  * message. In all cases it call exit() with the given exit status @p status.
222  *
223  * @param[in] status exit status
224  */
usage(short status)225 static void usage(short status)
226 {
227 	/* Syntax error. Emit 'try help' to stderr and exit */
228 	if (status != EXIT_SUCCESS) {
229 		fprintf(stderr, "Try '%s -h' for more information\n", progname);
230 		exit(status);
231 	}
232 
233 	fprintf(stdout,
234 		"Usage: %1$s [OPTION]...\n"
235 		"Advanced TCP traffic generator for Linux, FreeBSD, and Mac OS X.\n\n"
236 
237 		"Mandatory arguments to long options are mandatory for short options too.\n\n"
238 
239 		"General options:\n"
240 		"  -h, --help[=WHAT]\n"
241 		"                 display help and exit. Optional WHAT can either be 'socket' for\n"
242 		"                 help on socket options or 'traffic' traffic generation help\n"
243 		"  -v, --version  print version information and exit\n\n"
244 
245 		"Controller options:\n"
246 		"  -c, --show-colon=TYPE[,TYPE]...\n"
247 		"                 display intermediated interval report column TYPE in output.\n"
248 		"                 Allowed values for TYPE are: 'interval', 'through', 'transac',\n"
249 		"                 'iat', 'kernel' (all show per default), and 'blocks', 'rtt',\n"
250 #ifdef DEBUG
251 		"                 'delay', 'status' (optional)\n"
252 #else /* DEBUG */
253 		"                 'delay' (optional)\n"
254 #endif /* DEBUG */
255 #ifdef DEBUG
256 		"  -d, --debug    increase debugging verbosity. Add option multiple times to\n"
257 		"                 increase the verbosity\n"
258 #endif /* DEBUG */
259 		"  -e, --dump-prefix=PRE\n"
260 		"                 prepend prefix PRE to pcap dump filename (default: \"%3$s\")\n"
261 		"  -i, --report-interval=#.#\n"
262 		"                 reporting interval, in seconds (default: 0.05s)\n"
263 		"      --log-file[=FILE]\n"
264 		"                 write output to logfile FILE (default: %1$s-'timestamp'.log)\n"
265 		"  -m             report throughput in 2**20 bytes/s (default: 10**6 bit/s)\n"
266 		"  -n, --flows=#  number of test flows (default: 1)\n"
267 		"  -o             overwrite existing log files (default: don't)\n"
268 		"  -p             don't print symbolic values (like INT_MAX) instead of numbers\n"
269 		"  -q, --quiet    be quiet, do not log to screen (default: off)\n"
270 		"  -s, --tcp-stack=TYPE\n"
271 		"                 don't determine unit of source TCP stacks automatically. Force\n"
272 		"                 unit to TYPE, where TYPE is 'segment' or 'byte'\n"
273 		"  -w             write output to logfile (same as --log-file)\n\n"
274 
275 		"Flow options:\n"
276 		"  Some of these options take the flow endpoint as argument, denoted by 'x' in\n"
277 		"  the option syntax. 'x' needs to be replaced with either 's' for the source\n"
278 		"  endpoint, 'd' for the destination endpoint or 'b' for both endpoints. To\n"
279 		"  specify different values for each endpoints, separate them by comma. For\n"
280 		"  instance -W s=8192,d=4096 sets the advertised window to 8192 at the source\n"
281 		"  and 4096 at the destination.\n\n"
282 		"  -A x           use minimal response size needed for RTT calculation\n"
283 		"                 (same as -G s=p,C,%2$d)\n"
284 		"  -B x=#         set requested sending buffer, in bytes\n"
285 		"  -C x           stop flow if it is experiencing local congestion\n"
286 		"  -D x=DSCP      DSCP value for TOS byte\n"
287 		"  -E             enumerate bytes in payload instead of sending zeros\n"
288 		"  -F #[,#]...    flow options following this option apply only to the given flow \n"
289 		"                 IDs. Useful in combination with -n to set specific options\n"
290 		"                 for certain flows. Numbering starts with 0, so -F 1 refers\n"
291 		"                 to the second flow. With -1 all flow are refered\n"
292 #ifdef HAVE_LIBGSL
293 		"  -G x=(q|p|g):(C|U|E|N|L|P|W):#1:[#2]\n"
294 #else /* HAVE_LIBGSL */
295 		"  -G x=(q|p|g):(C|U):#1:[#2]\n"
296 #endif /* HAVE_LIBGSL */
297 		"                 activate stochastic traffic generation and set parameters\n"
298 		"                 according to the used distribution. For additional information \n"
299 		"                 see 'flowgrind --help=traffic'\n"
300 		"  -H x=HOST[/CONTROL[:PORT]]\n"
301 		"                 test from/to HOST. Optional argument is the address and port\n"
302 		"                 for the CONTROL connection to the same host.\n"
303 		"                 An endpoint that isn't specified is assumed to be localhost\n"
304 		"  -J #           use random seed # (default: read /dev/urandom)\n"
305 		"  -I             enable one-way delay calculation (no clock synchronization)\n"
306 		"  -L             call connect() on test socket immediately before starting to\n"
307 		"                 send data (late connect). If not specified the test connection\n"
308 		"                 is established in the preparation phase before the test starts\n"
309 		"  -M x           dump traffic using libpcap. flowgrindd must be run as root\n"
310 		"  -N             shutdown() each socket direction after test flow\n"
311 		"  -O x=OPT       set socket option OPT on test socket. For additional information\n"
312 		"                 see 'flowgrind --help=socket'\n"
313 		"  -P x           do not iterate through select() to continue sending in case\n"
314 		"                 block size did not suffice to fill sending queue (pushy)\n"
315 		"  -Q             summarize only, no intermediated interval reports are\n"
316 		"                 computed (quiet)\n"
317 		"  -R x=#.#(z|k|M|G)(b|B)\n"
318 		"                 send at specified rate per second, where: z = 2**0, k = 2**10,\n"
319 		"                 M = 2**20, G = 2**30, and b = bits/s (default), B = bytes/s\n"
320 		"  -S x=#         set block (message) size, in bytes (same as -G s=q,C,#)\n"
321 		"  -T x=#.#       set flow duration, in seconds (default: s=10,d=0)\n"
322 		"  -U x=#         set application buffer size, in bytes (default: 8192)\n"
323 		"                 truncates values if used with stochastic traffic generation\n"
324 		"  -W x=#         set requested receiver buffer (advertised window), in bytes\n"
325 		"  -Y x=#.#       set initial delay before the host starts to send, in seconds\n"
326 /*		"  -Z x=#.#       set amount of data to be send, in bytes (instead of -t)\n"*/,
327 		progname,
328 		MIN_BLOCK_SIZE
329 		, copt.dump_prefix
330 		);
331 	exit(EXIT_SUCCESS);
332 }
333 
334 /**
335  * Print help on flowgrind's socket options and exit with EXIT_SUCCESS.
336  */
usage_sockopt(void)337 static void usage_sockopt(void)
338 {
339 	fprintf(stdout,
340 		"%s allows to set the following standard and non-standard socket options. \n\n"
341 
342 		"All socket options take the flow endpoint as argument, denoted by 'x' in the\n"
343 		"option syntax. 'x' needs to be replaced with either 's' for the source endpoint,\n"
344 		"'d' for the destination endpoint or 'b' for both endpoints. To specify different\n"
345 		"values for each endpoints, separate them by comma. Moreover, it is possible to\n"
346 		"repeatedly pass the same endpoint in order to specify multiple socket options\n\n"
347 
348 		"Standard socket options:\n"
349 		"  -O x=TCP_CONGESTION=ALG\n"
350 		"               set congestion control algorithm ALG on test socket\n"
351 		"  -O x=TCP_CORK\n"
352 		"               set TCP_CORK on test socket\n"
353 		"  -O x=TCP_NODELAY\n"
354 		"               disable nagle algorithm on test socket\n"
355 		"  -O x=SO_DEBUG\n"
356 		"               set SO_DEBUG on test socket\n"
357 		"  -O x=IP_MTU_DISCOVER\n"
358 		"               set IP_MTU_DISCOVER on test socket if not already enabled by\n"
359 		"               system default\n"
360 		"  -O x=ROUTE_RECORD\n"
361 		"               set ROUTE_RECORD on test socket\n\n"
362 
363 		"Non-standard socket options:\n"
364 		"  -O x=TCP_MTCP\n"
365 		"               set TCP_MTCP (15) on test socket\n"
366 		"  -O x=TCP_ELCN\n"
367 		"               set TCP_ELCN (20) on test socket\n"
368 		"  -O x=TCP_LCD set TCP_LCD (21) on test socket\n\n"
369 
370 		"Examples:\n"
371 		"  -O s=TCP_CONGESTION=reno,d=SO_DEBUG\n"
372 		"               sets Reno TCP as congestion control algorithm at the source and\n"
373 		"               SO_DEBUG as socket option at the destinatio\n"
374 		"  -O s=SO_DEBUG,s=TCP_CORK\n"
375 		"               set SO_DEBUG and TCP_CORK as socket option at the source\n",
376 		progname);
377 	exit(EXIT_SUCCESS);
378 }
379 
380 /**
381  * Print help on flowgrind's traffic generation facilities and exit with EXIT_SUCCESS.
382  */
usage_trafgenopt(void)383 static void usage_trafgenopt(void)
384 {
385 	fprintf(stdout,
386 		"%s supports stochastic traffic generation, which allows to conduct\n"
387 		"besides normal bulk also advanced rate-limited and request-response data\n"
388 		"transfers.\n\n"
389 
390 		"The stochastic traffic generation option '-G' takes the flow endpoint as\n"
391 		"argument, denoted by 'x' in the option syntax. 'x' needs to be replaced with\n"
392 		"either 's' for the source endpoint, 'd' for the destination endpoint or 'b' for\n"
393 		"both endpoints. However, please note that bidirectional traffic generation can\n"
394 		"lead to unexpected results. To specify different values for each endpoints,\n"
395 		"separate them by comma.\n\n"
396 
397 		"Stochastic traffic generation:\n"
398 #ifdef HAVE_LIBGSL
399 		"  -G x=(q|p|g):(C|U|E|N|L|P|W):#1:[#2]\n"
400 #else /* HAVE_LIBGSL */
401 		"  -G x=(q|p|g):(C|U):#1:[#2]\n"
402 #endif /* HAVE_LIBGSL */
403 		"               Flow parameter:\n"
404 		"                 q = request size (in bytes)\n"
405 		"                 p = response size (in bytes)\n"
406 		"                 g = request interpacket gap (in seconds)\n\n"
407 
408 		"               Distributions:\n"
409 		"                 C = constant (#1: value, #2: not used)\n"
410 		"                 U = uniform (#1: min, #2: max)\n"
411 #ifdef HAVE_LIBGSL
412 		"                 E = exponential (#1: lamba - lifetime, #2: not used)\n"
413 		"                 N = normal (#1: mu - mean value, #2: sigma_square - variance)\n"
414 		"                 L = lognormal (#1: zeta - mean, #2: sigma - std dev)\n"
415 		"                 P = pareto (#1: k - shape, #2 x_min - scale)\n"
416 		"                 W = weibull (#1: lambda - scale, #2: k - shape)\n"
417 #else /* HAVE_LIBGSL */
418 		"               advanced distributions are only available if compiled with libgsl\n"
419 #endif /* HAVE_LIBGSL */
420 		"  -U x=#       specify a cap for the calculated values for request and response\n"
421 		"               size (not needed for constant values or uniform distribution),\n"
422 		"               values over this cap are recalculated\n\n"
423 
424 		"Examples:\n"
425 		"  -G s=q:C:40\n"
426 		"               use contant request size of 40 bytes\n"
427 		"  -G s=p:N:2000:50\n"
428 		"               use normal distributed response size with mean 2000 bytes and\n"
429 		"               variance 50\n"
430 		"  -G s=g:U:0.005:0.01\n"
431 		"               use uniform distributed interpacket gap with minimum 0.005s and\n"
432 		"               maximum 0.01s\n\n"
433 
434 		"Notes: \n"
435 		"  - The man page contains more explained examples\n"
436 		"  - Using bidirectional traffic generation can lead to unexpected results\n"
437 		"  - Usage of -G in conjunction with -A, -R, -S is not recommended, as they\n"
438 		"    overwrite each other. -A, -R and -S exist as shortcut only\n",
439 		progname);
440 	exit(EXIT_SUCCESS);
441 }
442 
443 /**
444  * Signal handler to catching signals.
445  *
446  * @param[in] sig signal to catch
447  */
sighandler(int sig)448 static void sighandler(int sig)
449 {
450 	UNUSED_ARGUMENT(sig);
451 
452 	DEBUG_MSG(LOG_ERR, "caught %s", strsignal(sig));
453 
454 	if (!sigint_caught) {
455 		warnx("caught SIGINT, trying to gracefully close flows. "
456 		      "Press CTRL+C again to force termination \n");
457 		sigint_caught = true;
458 	} else {
459 		exit(EXIT_FAILURE);
460 	}
461 }
462 
463 /**
464  * Initialization of general controller options.
465  */
init_controller_options(void)466 static void init_controller_options(void)
467 {
468 	copt.num_flows = 1;
469 	copt.reporting_interval = 0.05;
470 	copt.log_to_stdout = true;
471 	copt.log_to_file = false;
472 	copt.dump_prefix = "flowgrind-";
473 	copt.clobber = false;
474 	copt.mbyte = false;
475 	copt.symbolic = true;
476 	copt.force_unit = INT_MAX;
477 }
478 
479 /**
480  * Initilization the flow option to default values.
481  *
482  * Initializes the controller flow option settings,
483  * final report for both source and destination daemon
484  * in the flow.
485  */
init_flow_options(void)486 static void init_flow_options(void)
487 {
488 	for (int id = 0; id < MAX_FLOWS_CONTROLLER; id++) {
489 
490 		cflow[id].proto = PROTO_TCP;
491 
492 		foreach(int *i, SOURCE, DESTINATION) {
493 			cflow[id].settings[*i].requested_send_buffer_size = 0;
494 			cflow[id].settings[*i].requested_read_buffer_size = 0;
495 			cflow[id].settings[*i].delay[WRITE] = 0;
496 			cflow[id].settings[*i].maximum_block_size = 8192;
497 			cflow[id].settings[*i].request_trafgen_options.param_one = 8192;
498 			cflow[id].settings[*i].response_trafgen_options.param_one = 0;
499 			cflow[id].settings[*i].route_record = 0;
500 			strcpy(cflow[id].endpoint[*i].test_address, "localhost");
501 
502 			/* Default daemon is localhost, set in parse_cmdline */
503 			cflow[id].endpoint[*i].rpc_info = 0;
504 			cflow[id].endpoint[*i].daemon = 0;
505 
506 			cflow[id].settings[*i].pushy = 0;
507 			cflow[id].settings[*i].cork = 0;
508 			cflow[id].settings[*i].cc_alg[0] = 0;
509 			cflow[id].settings[*i].elcn = 0;
510 			cflow[id].settings[*i].lcd = 0;
511 			cflow[id].settings[*i].mtcp = 0;
512 			cflow[id].settings[*i].nonagle = 0;
513 			cflow[id].settings[*i].traffic_dump = 0;
514 			cflow[id].settings[*i].so_debug = 0;
515 			cflow[id].settings[*i].dscp = 0;
516 			cflow[id].settings[*i].ipmtudiscover = 0;
517 
518 			cflow[id].settings[*i].num_extra_socket_options = 0;
519 		}
520 		cflow[id].settings[SOURCE].duration[WRITE] = 10.0;
521 		cflow[id].settings[DESTINATION].duration[WRITE] = 0.0;
522 
523 		cflow[id].endpoint_id[0] = cflow[id].endpoint_id[1] = -1;
524 		cflow[id].start_timestamp[0].tv_sec = 0;
525 		cflow[id].start_timestamp[0].tv_nsec = 0;
526 		cflow[id].start_timestamp[1].tv_sec = 0;
527 		cflow[id].start_timestamp[1].tv_nsec = 0;
528 
529 		cflow[id].finished[0] = 0;
530 		cflow[id].finished[1] = 0;
531 		cflow[id].final_report[0] = NULL;
532 		cflow[id].final_report[1] = NULL;
533 
534 		cflow[id].summarize_only = 0;
535 		cflow[id].late_connect = 0;
536 		cflow[id].shutdown = 0;
537 		cflow[id].byte_counting = 0;
538 		cflow[id].random_seed = 0;
539 
540 		int data = open("/dev/urandom", O_RDONLY);
541 		int rc = read(data, &cflow[id].random_seed, sizeof (int) );
542 		close(data);
543 		if(rc == -1)
544 			crit("read /dev/urandom failed");
545 	}
546 }
547 
548 /**
549  * Create a logfile for measurement output.
550  */
open_logfile(void)551 static void open_logfile(void)
552 {
553 	if (!copt.log_to_file)
554 		return;
555 
556 	/* Log filename is not given by cmdline */
557 	if (!log_filename) {
558 		if (asprintf(&log_filename, "%s-%s.log", progname,
559 			     ctimenow(false)) == -1)
560 			critx("could not allocate memory for log filename");
561 	}
562 
563 	if (!copt.clobber && access(log_filename, R_OK) == 0)
564 		critx("log file exists");
565 
566 	log_stream = fopen(log_filename, "w");
567 	if (!log_stream)
568 		critx("could not open logfile '%s'", log_filename);
569 
570 	DEBUG_MSG(LOG_NOTICE, "logging to '%s'", log_filename);
571 }
572 
573 /**
574  * Close measurement output file.
575  */
close_logfile(void)576 static void close_logfile(void)
577 {
578 	if (!copt.log_to_file)
579 		return;
580 	if (fclose(log_stream) == -1)
581 		critx("could not close logfile '%s'", log_filename);
582 
583 	free(log_filename);
584 }
585 
586 /**
587  * Print measurement output to logfile and / or to stdout.
588  *
589  * @param[in] fmt format string
590  * @param[in] ... parameters used to fill fmt
591  */
print_output(const char * fmt,...)592 inline static void print_output(const char *fmt, ...)
593 {
594 	va_list ap;
595 
596 	va_start(ap, fmt);
597 	if (copt.log_to_stdout) {
598 		vprintf(fmt, ap);
599 		fflush(stdout);
600 	}
601 	if (copt.log_to_file) {
602 		vfprintf(log_stream, fmt, ap);
603 		fflush(log_stream);
604 	}
605 	va_end(ap);
606 }
607 
/**
 * Report a pending XML-RPC fault through critx().
 *
 * @param[in] env XML-RPC environment to inspect
 */
inline static void die_if_fault_occurred(xmlrpc_env *env)
{
	/* Nothing recorded, nothing to do */
	if (!env->fault_occurred)
		return;

	critx("XML-RPC Fault: %s (%d)", env->fault_string, env->fault_code);
}
613 
614 /* creates an xmlrpc_client for connect to server, uses global env rpc_env */
prepare_xmlrpc_client(xmlrpc_client ** rpc_client)615 static void prepare_xmlrpc_client(xmlrpc_client **rpc_client)
616 {
617 	struct xmlrpc_clientparms clientParms;
618 	size_t clientParms_cpsize = XMLRPC_CPSIZE(transport);
619 
620 	/* Since version 1.21 xmlrpclib will automatically generate a
621 	 * rather long user_agent, we will do a lot of RPC calls so let's
622 	 * spare some bytes and omit this header */
623 #ifdef HAVE_STRUCT_XMLRPC_CURL_XPORTPARMS_DONT_ADVERTISE
624 	struct xmlrpc_curl_xportparms curlParms;
625 	memset(&curlParms, 0, sizeof(curlParms));
626 
627 	curlParms.dont_advertise = 1;
628 	clientParms.transportparmsP = &curlParms;
629 	clientParms.transportparm_size = XMLRPC_CXPSIZE(dont_advertise);
630 	clientParms_cpsize = XMLRPC_CPSIZE(transportparm_size);
631 #endif /* HAVE_STRUCT_XMLRPC_CURL_XPORTPARMS_DONT_ADVERTISE */
632 
633 	/* Force usage of curl transport, we require it in configure script
634 	 * anyway and at least FreeBSD 9.1 will use libwww otherwise */
635 	clientParms.transport = "curl";
636 
637 	DEBUG_MSG(LOG_WARNING, "prepare xmlrpc client");
638 	xmlrpc_client_create(&rpc_env, XMLRPC_CLIENT_NO_FLAGS, "Flowgrind",
639 			     FLOWGRIND_VERSION, &clientParms,
640 			     clientParms_cpsize, rpc_client);
641 }
642 
643 /**
644  * Checks all the daemons flowgrind version.
645  *
646  * Collect the daemons flowgrind version, XML-RPC API version,
647  * OS name and release details. Store these information in the
648  * daemons linked list for the result display
649  *
650  * @param[in,out] rpc_client to connect controller to daemon
651  */
check_version(xmlrpc_client * rpc_client)652 static void check_version(xmlrpc_client *rpc_client)
653 {
654 	xmlrpc_value * resultP = 0;
655 	char mismatch = 0;
656 
657 	const struct list_node *node = fg_list_front(&unique_daemons);
658 
659 	while (node) {
660 		if (sigint_caught)
661 			return;
662 
663 		struct daemon *daemon = node->data;
664 		node = node->next;
665 
666 		xmlrpc_client_call2f(&rpc_env, rpc_client, daemon->url,
667 					"get_version", &resultP, "()");
668 		if ((rpc_env.fault_occurred) && (strcasestr(rpc_env.fault_string,"response code is 400")))
669 			critx("node %s could not parse request.You are "
670 			      "probably trying to use a numeric IPv6 address "
671 			      "and the node's libxmlrpc is too old, please "
672 			      "upgrade!", daemon->url);
673 
674 		die_if_fault_occurred(&rpc_env);
675 
676 		/* Decomposes the xmlrpc value and extract the daemons data in
677 		 * it into controller local variable */
678 		if (resultP) {
679 			char* version;
680 			int api_version;
681 			char* os_name;
682 			char* os_release;
683 			xmlrpc_decompose_value(&rpc_env, resultP, "{s:s,s:i,s:s,s:s,*}",
684 						"version", &version,
685 						"api_version", &api_version,
686 						"os_name", &os_name,
687 						"os_release", &os_release);
688 			die_if_fault_occurred(&rpc_env);
689 
690 			if (strcmp(version, FLOWGRIND_VERSION)) {
691 				mismatch = 1;
692 				warnx("node %s uses version %s",
693 				      daemon->url, version);
694 			}
695 			/* Store the daemons XML RPC API version,
696 			 * OS name and release in daemons linked list */
697 			daemon->api_version = api_version;
698 			strncpy(daemon->os_name, os_name, 256);
699 			strncpy(daemon->os_release, os_release, 256);
700 			free_all(version, os_name, os_release);
701 			xmlrpc_DECREF(resultP);
702 		}
703 	}
704 
705 	if (mismatch) {
706 		warnx("our version is %s\n\nContinuing in 5 seconds", FLOWGRIND_VERSION);
707 		sleep(5);
708 	}
709 }
710 
711 /**
712  * Add daemon for controller flow by UUID.
713  *
714  * Stores the daemons data and push the data in linked list
715  * which contains daemons UUID and daemons XML RPC url
716  *
717  * @param[in,out] server_uuid UUID from daemons
718  * @param[in,out] daemon_url URL from daemons
719  */
add_daemon_by_uuid(const char * server_uuid,char * daemon_url)720 static struct daemon * add_daemon_by_uuid(const char* server_uuid,
721 		char* daemon_url)
722 {
723 	struct daemon *daemon;
724 	daemon = malloc((sizeof(struct daemon)));
725 
726 	if (!daemon) {
727 		logging(LOG_ALERT, "could not allocate memory for daemon");
728 		return 0;
729 	}
730 
731 	memset(daemon, 0, sizeof(struct daemon));
732 	strcpy(daemon->uuid, server_uuid);
733 	daemon->url = daemon_url;
734 	fg_list_push_back(&unique_daemons, daemon);
735 	return daemon;
736 }
737 
738 /**
739  * Determine the daemons for controller flow by UUID.
740  *
741  * Determine the daemons memory block size by number of server in
742  * the controller flow option.
743  *
744  * @param[in,out] server_uuid UUID from daemons
745  * @param[in,out] daemon_url URL from daemons
746  */
set_unique_daemon_by_uuid(const char * server_uuid,char * daemon_url)747 static struct daemon * set_unique_daemon_by_uuid(const char* server_uuid,
748 		char* daemon_url)
749 {
750 	/* Store the first daemon UUID and XML RPC url connection string.
751 	 * First daemon is used as reference to avoid the daemon duplication
752 	 * by their UUID */
753 	if (fg_list_size(&unique_daemons) == 0)
754 		return add_daemon_by_uuid(server_uuid, daemon_url);
755 
756 	/* Compare the incoming daemons UUID with all daemons UUID in
757 	 * memory in order to prevent dupliclity in storing the daemons.
758 	 * If the incoming daemon UUID is already present in the daemons list,
759 	 * then return existing daemon pointer to controller connection.
760 	 * This is because a single daemons can run and maintain mutliple
761 	 * data connection */
762 	const struct list_node *node = fg_list_front(&unique_daemons);
763 	while (node) {
764 		struct daemon *daemon = node->data;
765 		node = node->next;
766 		if (!strcmp(daemon->uuid, server_uuid))
767 			return daemon;
768 	}
769 
770 	return add_daemon_by_uuid(server_uuid, daemon_url);
771 }
772 
773 /**
774  * Set the daemon for controller flow endpoint.
775  *
776  * @param[in,out] server_uuid UUID from daemons
777  * @param[in,out] daemon_url URL from daemons
778  */
set_flow_endpoint_daemon(const char * server_uuid,char * server_url)779 static void set_flow_endpoint_daemon(const char* server_uuid, char* server_url)
780 {
781 	/* Determine the daemon in controller flow data by UUID
782 	 * This prevent the daemons duplication */
783 	for (unsigned id = 0; id < copt.num_flows; id++) {
784 		foreach(int *i, SOURCE, DESTINATION) {
785 			struct flow_endpoint* e = &cflow[id].endpoint[*i];
786 			if(!strcmp(e->rpc_info->server_url, server_url) && !e->daemon) {
787 				e->daemon = set_unique_daemon_by_uuid(server_uuid,
788 								      server_url);
789 			}
790 		}
791 	}
792 }
793 
794 /**
795 * Checks all daemons in flow option.
796 *
797 * Daemon UUID is retrieved and this information is used
798 * to determine daemon in the controller flow information.
799 *
800 * The UUID is used to detect if the same daemon instance is controlling
801 * flows via different IP adresses.
802 *
803 * @param[in,out] rpc_client to connect controller to daemon
804 */
find_daemon(xmlrpc_client * rpc_client)805 static void find_daemon(xmlrpc_client *rpc_client)
806 {
807 	xmlrpc_value * resultP = 0;
808 	const struct list_node *node = fg_list_front(&flows_rpc_info);
809 
810 	while (node) {
811 		if (sigint_caught)
812 			return;
813 
814 		struct rpc_info *flow_rpc_info= node->data;
815 		node = node->next;
816 		/* call daemons by flow option XML-RPC URL connection string */
817 		xmlrpc_client_call2f(&rpc_env, rpc_client,
818 				     flow_rpc_info->server_url,
819 				     "get_uuid", &resultP, "()");
820 		die_if_fault_occurred(&rpc_env);
821 
822 		/* Decomposes the xmlrpc_value and extract the daemon UUID
823 		 * in it into controller local variable */
824 		if (resultP) {
825 			char* server_uuid = 0;
826 
827 			xmlrpc_decompose_value(&rpc_env, resultP, "{s:s,*}",
828 					"server_uuid", &server_uuid);
829 			set_flow_endpoint_daemon(server_uuid, flow_rpc_info->server_url);
830 			die_if_fault_occurred(&rpc_env);
831 			xmlrpc_DECREF(resultP);
832 		}
833 	}
834 }
835 
836 /**
837 * Checks that all nodes are currently idle.
838 *
839 * Get the daemon's flow start status and number of flows running in
840 * a daemon. This piece of information is used to determine, whether the
841 * daemon in a node is busy or idle.
842 *
843 * @param[in,out] rpc_client to connect controller to daemon
844 */
check_idle(xmlrpc_client * rpc_client)845 static void check_idle(xmlrpc_client *rpc_client)
846 {
847 	xmlrpc_value * resultP = 0;
848 	const struct list_node *node = fg_list_front(&unique_daemons);
849 
850 	while (node) {
851 		if (sigint_caught)
852 			return;
853 
854 		struct daemon *daemon = node->data;
855 		node = node->next;
856 
857 		xmlrpc_client_call2f(&rpc_env, rpc_client,
858 				     daemon->url,
859 				     "get_status", &resultP, "()");
860 		die_if_fault_occurred(&rpc_env);
861 
862 		/* Decomposes the xmlrpc_value and extract the daemons data
863 		 * in it into controller local variable */
864 		if (resultP) {
865 			int started;
866 			int num_flows;
867 
868 			xmlrpc_decompose_value(&rpc_env, resultP,
869 					       "{s:i,s:i,*}", "started",
870 					       &started, "num_flows",
871 					       &num_flows);
872 			die_if_fault_occurred(&rpc_env);
873 
874 			/* Daemon start status and number of flows is used to
875 			 * determine node idle status */
876 			if (started || num_flows)
877 				critx("node %s is busy. %d flows, started=%d",
878 				       daemon->url, num_flows,
879 				       started);
880 			xmlrpc_DECREF(resultP);
881 		}
882 	}
883 }
884 
885 /**
886  * To show/hide intermediated interval report columns.
887  *
888  * @param[in] visibility show/hide column
889  * @param[in] nargs length of variable argument list
890  * @param[in] ... column IDs
891  * @see enum column_id
892  */
set_column_visibility(bool visibility,unsigned nargs,...)893 static void set_column_visibility(bool visibility, unsigned nargs, ...)
894 {
895         va_list ap;
896         enum column_id col_id;
897 
898         va_start(ap, nargs);
899         while (nargs--) {
900                 col_id = va_arg(ap, enum column_id);
901                 column_info[col_id].state.visible = visibility;
902         }
903         va_end(ap);
904 }
905 
906 /**
907  * To set the unit the in header of intermediated interval report columns.
908  *
909  * @param[in] unit unit of column header as string
910  * @param[in] nargs length of variable argument list
911  * @param[in] ... column IDs
912  * @see enum column_id
913  */
set_column_unit(const char * unit,unsigned nargs,...)914 static void set_column_unit(const char *unit, unsigned nargs, ...)
915 {
916         va_list ap;
917         enum column_id col_id;
918 
919         va_start(ap, nargs);
920         while (nargs--) {
921                 col_id = va_arg(ap, enum column_id);
922                 column_info[col_id].header.unit = unit;
923         }
924         va_end(ap);
925 }
926 
927 /**
928  * Print headline with various informations before the actual measurement will
929  * be begin.
930  */
print_headline(void)931 static void print_headline(void)
932 {
933 	/* Print headline */
934 	struct utsname me;
935 	int rc = uname(&me);
936 	print_output("# Date: %s, controlling host = %s, number of flows = %d, "
937 		     "reporting interval = %.2fs, [through] = %s (%s)\n",
938 		     ctimenow(false), (rc == -1 ? "(unknown)" : me.nodename),
939 		     copt.num_flows, copt.reporting_interval,
940 		     (copt.mbyte ? "2**20 bytes/second": "10**6 bit/second"),
941 		     FLOWGRIND_VERSION);
942 
943 	/* Prepare column visibility based on involved OSes */
944 	bool involved_os[] = {[0 ... NUM_OSes-1] = false};
945 	const struct list_node *node = fg_list_front(&unique_daemons);
946 	while (node) {
947 		struct daemon *daemon = node->data;
948 		node = node->next;
949 		if (!strcmp(daemon->os_name, "Linux"))
950 			involved_os[LINUX] = true;
951 		else if (!strcmp(daemon->os_name, "FreeBSD"))
952 			involved_os[FREEBSD] = true;
953 		else if (!strcmp(daemon->os_name, "Darwin"))
954 			involved_os[DARWIN] = true;
955 	}
956 
957 	/* No Linux OS is involved in the test */
958 	if (!involved_os[LINUX])
959 		HIDE_COLUMNS(COL_TCP_UACK, COL_TCP_SACK, COL_TCP_LOST,
960 			     COL_TCP_RETR, COL_TCP_TRET, COL_TCP_FACK,
961 			     COL_TCP_REOR, COL_TCP_BKOF, COL_TCP_CA_STATE,
962 			     COL_PMTU);
963 
964 	/* No Linux and FreeBSD OS is involved in the test */
965 	if (!involved_os[FREEBSD] && !involved_os[LINUX])
966 		HIDE_COLUMNS(COL_TCP_CWND, COL_TCP_SSTH, COL_TCP_RTT,
967 			     COL_TCP_RTTVAR, COL_TCP_RTO, COL_SMSS);
968 
969 	const struct list_node *firstnode = fg_list_front(&unique_daemons);
970 	struct daemon *daemon_firstnode = firstnode->data;
971 
972 	/* Set unit for kernel TCP metrics to bytes */
973 	if (copt.force_unit == BYTE_BASED || (copt.force_unit != SEGMENT_BASED &&
974 	    strcmp(daemon_firstnode->os_name, "Linux")))
975 		SET_COLUMN_UNIT(" [B]", COL_TCP_CWND, COL_TCP_SSTH,
976 				COL_TCP_UACK, COL_TCP_SACK, COL_TCP_LOST,
977 				COL_TCP_RETR, COL_TCP_TRET, COL_TCP_FACK,
978 				COL_TCP_REOR, COL_TCP_BKOF);
979 }
980 
981 /**
982  * Prepare test connection for a flow between source and destination daemons.
983  *
984  * Controller sends the flow option to source and destination daemons
985  * separately through XML RPC connection and get backs the flow id and
986  * snd/rcx buffer size from the daemons.
987  *
988  * @param[in] id flow id to prepare the test connection in daemons
989  * @param[in,out] rpc_client to connect controller to daemon
990  */
prepare_flow(int id,xmlrpc_client * rpc_client)991 static void prepare_flow(int id, xmlrpc_client *rpc_client)
992 {
993 	xmlrpc_value *resultP, *extra_options;
994 
995 	int listen_data_port;
996 	DEBUG_MSG(LOG_WARNING, "prepare flow %d destination", id);
997 
998 	/* Contruct extra socket options array */
999 	extra_options = xmlrpc_array_new(&rpc_env);
1000 	for (int i = 0; i < cflow[id].settings[DESTINATION].num_extra_socket_options; i++) {
1001 		xmlrpc_value *value;
1002 		xmlrpc_value *option = xmlrpc_build_value(&rpc_env, "{s:i,s:i}",
1003 			 "level", cflow[id].settings[DESTINATION].extra_socket_options[i].level,
1004 			 "optname", cflow[id].settings[DESTINATION].extra_socket_options[i].optname);
1005 
1006 		value = xmlrpc_base64_new(&rpc_env, cflow[id].settings[DESTINATION].extra_socket_options[i].optlen, (unsigned char*)cflow[id].settings[DESTINATION].extra_socket_options[i].optval);
1007 
1008 		xmlrpc_struct_set_value(&rpc_env, option, "value", value);
1009 
1010 		xmlrpc_array_append_item(&rpc_env, extra_options, option);
1011 		xmlrpc_DECREF(value);
1012 		xmlrpc_DECREF(option);
1013 	}
1014 	xmlrpc_client_call2f(&rpc_env, rpc_client,
1015 		cflow[id].endpoint[DESTINATION].rpc_info->server_url,
1016 		"add_flow_destination", &resultP,
1017 		"("
1018 		"{s:s}"
1019 		"{s:i}"
1020 		"{s:d,s:d,s:d,s:d,s:d}"
1021 		"{s:i,s:i}"
1022 		"{s:i}"
1023 		"{s:b,s:b,s:b,s:b,s:b}"
1024 		"{s:i,s:i}"
1025 		"{s:i,s:d,s:d}" /* request */
1026 		"{s:i,s:d,s:d}" /* response */
1027 		"{s:i,s:d,s:d}" /* interpacket_gap */
1028 		"{s:b,s:b,s:i,s:i}"
1029 		"{s:s}"
1030 		"{s:i,s:i,s:i,s:i,s:i}"
1031 		"{s:s}"
1032 		"{s:i,s:A}"
1033 		")",
1034 
1035 		/* general flow settings */
1036 		"bind_address", cflow[id].endpoint[DESTINATION].test_address,
1037 
1038 		"flow_id",id,
1039 
1040 		"write_delay", cflow[id].settings[DESTINATION].delay[WRITE],
1041 		"write_duration", cflow[id].settings[DESTINATION].duration[WRITE],
1042 		"read_delay", cflow[id].settings[SOURCE].delay[WRITE],
1043 		"read_duration", cflow[id].settings[SOURCE].duration[WRITE],
1044 		"reporting_interval", cflow[id].summarize_only ? 0 : copt.reporting_interval,
1045 
1046 		"requested_send_buffer_size", cflow[id].settings[DESTINATION].requested_send_buffer_size,
1047 		"requested_read_buffer_size", cflow[id].settings[DESTINATION].requested_read_buffer_size,
1048 
1049 		"maximum_block_size", cflow[id].settings[DESTINATION].maximum_block_size,
1050 
1051 		"traffic_dump", cflow[id].settings[DESTINATION].traffic_dump,
1052 		"so_debug", cflow[id].settings[DESTINATION].so_debug,
1053 		"route_record", (int)cflow[id].settings[DESTINATION].route_record,
1054 		"pushy", cflow[id].settings[DESTINATION].pushy,
1055 		"shutdown", (int)cflow[id].shutdown,
1056 
1057 		"write_rate", cflow[id].settings[DESTINATION].write_rate,
1058 		"random_seed",cflow[id].random_seed,
1059 
1060 		"traffic_generation_request_distribution", cflow[id].settings[DESTINATION].request_trafgen_options.distribution,
1061 		"traffic_generation_request_param_one", cflow[id].settings[DESTINATION].request_trafgen_options.param_one,
1062 		"traffic_generation_request_param_two", cflow[id].settings[DESTINATION].request_trafgen_options.param_two,
1063 
1064 		"traffic_generation_response_distribution", cflow[id].settings[DESTINATION].response_trafgen_options.distribution,
1065 		"traffic_generation_response_param_one", cflow[id].settings[DESTINATION].response_trafgen_options.param_one,
1066 		"traffic_generation_response_param_two", cflow[id].settings[DESTINATION].response_trafgen_options.param_two,
1067 
1068 		"traffic_generation_gap_distribution", cflow[id].settings[DESTINATION].interpacket_gap_trafgen_options.distribution,
1069 		"traffic_generation_gap_param_one", cflow[id].settings[DESTINATION].interpacket_gap_trafgen_options.param_one,
1070 		"traffic_generation_gap_param_two", cflow[id].settings[DESTINATION].interpacket_gap_trafgen_options.param_two,
1071 
1072 	"flow_control", cflow[id].settings[DESTINATION].flow_control,
1073 		"byte_counting", cflow[id].byte_counting,
1074 		"cork", (int)cflow[id].settings[DESTINATION].cork,
1075 		"nonagle", cflow[id].settings[DESTINATION].nonagle,
1076 
1077 		"cc_alg", cflow[id].settings[DESTINATION].cc_alg,
1078 
1079 		"elcn", cflow[id].settings[DESTINATION].elcn,
1080 		"lcd", cflow[id].settings[DESTINATION].lcd,
1081 		"mtcp", cflow[id].settings[DESTINATION].mtcp,
1082 		"dscp", (int)cflow[id].settings[DESTINATION].dscp,
1083 		"ipmtudiscover", cflow[id].settings[DESTINATION].ipmtudiscover,
1084 		"dump_prefix", copt.dump_prefix,
1085 		"num_extra_socket_options", cflow[id].settings[DESTINATION].num_extra_socket_options,
1086 		"extra_socket_options", extra_options);
1087 
1088 	die_if_fault_occurred(&rpc_env);
1089 
1090 	xmlrpc_parse_value(&rpc_env, resultP, "{s:i,s:i,s:i,s:i,*}",
1091 		"flow_id", &cflow[id].endpoint_id[DESTINATION],
1092 		"listen_data_port", &listen_data_port,
1093 		"real_listen_send_buffer_size", &cflow[id].endpoint[DESTINATION].send_buffer_size_real,
1094 		"real_listen_read_buffer_size", &cflow[id].endpoint[DESTINATION].receive_buffer_size_real);
1095 	die_if_fault_occurred(&rpc_env);
1096 
1097 	if (resultP)
1098 		xmlrpc_DECREF(resultP);
1099 
1100 	/* Contruct extra socket options array */
1101 	extra_options = xmlrpc_array_new(&rpc_env);
1102 	for (int i = 0; i < cflow[id].settings[SOURCE].num_extra_socket_options; i++) {
1103 
1104 		xmlrpc_value *value;
1105 		xmlrpc_value *option = xmlrpc_build_value(&rpc_env, "{s:i,s:i}",
1106 			 "level", cflow[id].settings[SOURCE].extra_socket_options[i].level,
1107 			 "optname", cflow[id].settings[SOURCE].extra_socket_options[i].optname);
1108 
1109 		value = xmlrpc_base64_new(&rpc_env, cflow[id].settings[SOURCE].extra_socket_options[i].optlen, (unsigned char*)cflow[id].settings[SOURCE].extra_socket_options[i].optval);
1110 
1111 		xmlrpc_struct_set_value(&rpc_env, option, "value", value);
1112 
1113 		xmlrpc_array_append_item(&rpc_env, extra_options, option);
1114 		xmlrpc_DECREF(value);
1115 		xmlrpc_DECREF(option);
1116 	}
1117 	DEBUG_MSG(LOG_WARNING, "prepare flow %d source", id);
1118 
1119 	xmlrpc_client_call2f(&rpc_env, rpc_client,
1120 		cflow[id].endpoint[SOURCE].rpc_info->server_url,
1121 		"add_flow_source", &resultP,
1122 		"("
1123 		"{s:s}"
1124 		"{s:i}"
1125 		"{s:d,s:d,s:d,s:d,s:d}"
1126 		"{s:i,s:i}"
1127 		"{s:i}"
1128 		"{s:b,s:b,s:b,s:b,s:b}"
1129 		"{s:i,s:i}"
1130 		"{s:i,s:d,s:d}" /* request */
1131 		"{s:i,s:d,s:d}" /* response */
1132 		"{s:i,s:d,s:d}" /* interpacket_gap */
1133 		"{s:b,s:b,s:i,s:i}"
1134 		"{s:s}"
1135 		"{s:i,s:i,s:i,s:i,s:i}"
1136 		"{s:s}"
1137 		"{s:i,s:A}"
1138 		"{s:s,s:i,s:i}"
1139 		")",
1140 
1141 		/* general flow settings */
1142 		"bind_address", cflow[id].endpoint[SOURCE].test_address,
1143 
1144 		"flow_id",id,
1145 
1146 		"write_delay", cflow[id].settings[SOURCE].delay[WRITE],
1147 		"write_duration", cflow[id].settings[SOURCE].duration[WRITE],
1148 		"read_delay", cflow[id].settings[DESTINATION].delay[WRITE],
1149 		"read_duration", cflow[id].settings[DESTINATION].duration[WRITE],
1150 		"reporting_interval", cflow[id].summarize_only ? 0 : copt.reporting_interval,
1151 
1152 		"requested_send_buffer_size", cflow[id].settings[SOURCE].requested_send_buffer_size,
1153 		"requested_read_buffer_size", cflow[id].settings[SOURCE].requested_read_buffer_size,
1154 
1155 		"maximum_block_size", cflow[id].settings[SOURCE].maximum_block_size,
1156 
1157 		"traffic_dump", cflow[id].settings[SOURCE].traffic_dump,
1158 		"so_debug", cflow[id].settings[SOURCE].so_debug,
1159 		"route_record", (int)cflow[id].settings[SOURCE].route_record,
1160 		"pushy", cflow[id].settings[SOURCE].pushy,
1161 		"shutdown", (int)cflow[id].shutdown,
1162 
1163 		"write_rate", cflow[id].settings[SOURCE].write_rate,
1164 		"random_seed",cflow[id].random_seed,
1165 
1166 		"traffic_generation_request_distribution", cflow[id].settings[SOURCE].request_trafgen_options.distribution,
1167 		"traffic_generation_request_param_one", cflow[id].settings[SOURCE].request_trafgen_options.param_one,
1168 		"traffic_generation_request_param_two", cflow[id].settings[SOURCE].request_trafgen_options.param_two,
1169 
1170 		"traffic_generation_response_distribution", cflow[id].settings[SOURCE].response_trafgen_options.distribution,
1171 		"traffic_generation_response_param_one", cflow[id].settings[SOURCE].response_trafgen_options.param_one,
1172 		"traffic_generation_response_param_two", cflow[id].settings[SOURCE].response_trafgen_options.param_two,
1173 
1174 		"traffic_generation_gap_distribution", cflow[id].settings[SOURCE].interpacket_gap_trafgen_options.distribution,
1175 		"traffic_generation_gap_param_one", cflow[id].settings[SOURCE].interpacket_gap_trafgen_options.param_one,
1176 		"traffic_generation_gap_param_two", cflow[id].settings[SOURCE].interpacket_gap_trafgen_options.param_two,
1177 
1178 
1179 		"flow_control", cflow[id].settings[SOURCE].flow_control,
1180 		"byte_counting", cflow[id].byte_counting,
1181 		"cork", (int)cflow[id].settings[SOURCE].cork,
1182 		"nonagle", (int)cflow[id].settings[SOURCE].nonagle,
1183 
1184 		"cc_alg", cflow[id].settings[SOURCE].cc_alg,
1185 
1186 		"elcn", cflow[id].settings[SOURCE].elcn,
1187 		"lcd", cflow[id].settings[SOURCE].lcd,
1188 		"mtcp", cflow[id].settings[SOURCE].mtcp,
1189 		"dscp", (int)cflow[id].settings[SOURCE].dscp,
1190 		"ipmtudiscover", cflow[id].settings[SOURCE].ipmtudiscover,
1191 		"dump_prefix", copt.dump_prefix,
1192 		"num_extra_socket_options", cflow[id].settings[SOURCE].num_extra_socket_options,
1193 		"extra_socket_options", extra_options,
1194 
1195 		/* source settings */
1196 		"destination_address", cflow[id].endpoint[DESTINATION].test_address,
1197 		"destination_port", listen_data_port,
1198 		"late_connect", (int)cflow[id].late_connect);
1199 	die_if_fault_occurred(&rpc_env);
1200 
1201 	xmlrpc_DECREF(extra_options);
1202 
1203 	xmlrpc_parse_value(&rpc_env, resultP, "{s:i,s:i,s:i,*}",
1204 		"flow_id", &cflow[id].endpoint_id[SOURCE],
1205 		"real_send_buffer_size", &cflow[id].endpoint[SOURCE].send_buffer_size_real,
1206 		"real_read_buffer_size", &cflow[id].endpoint[SOURCE].receive_buffer_size_real);
1207 	die_if_fault_occurred(&rpc_env);
1208 
1209 	if (resultP)
1210 		xmlrpc_DECREF(resultP);
1211 	DEBUG_MSG(LOG_WARNING, "prepare flow %d completed", id);
1212 }
1213 
1214 /**
1215  * Prepare test connection for all flows in a test
1216  *
1217  * @param[in,out] rpc_client to connect controller to daemon
1218  */
prepare_all_flows(xmlrpc_client * rpc_client)1219 static void prepare_all_flows(xmlrpc_client *rpc_client)
1220 {
1221 	/* prepare flows */
1222 	for (unsigned short id = 0; id < copt.num_flows; id++) {
1223 		if (sigint_caught)
1224 			return;
1225 		prepare_flow(id, rpc_client);
1226 	}
1227 }
1228 
1229 /**
1230  * Start test connections for all flows in a test
1231  *
1232  * All the test connection are started, but test connection flow in the
1233  * controller and in daemon are different. In the controller, test connection
1234  * are respective to number of flows in a test,but in daemons test connection
1235  * are respective to flow endpoints. Single daemons can maintain multiple flows
1236  * endpoints, So controller should start a daemon only once.
1237  *
1238  * @param[in,out] rpc_client to connect controller to daemon
1239  */
start_all_flows(xmlrpc_client * rpc_client)1240 static void start_all_flows(xmlrpc_client *rpc_client)
1241 {
1242 	xmlrpc_value * resultP = 0;
1243 
1244 	struct timespec lastreport_end;
1245 	struct timespec lastreport_begin;
1246 	struct timespec now;
1247 
1248 	gettime(&lastreport_end);
1249 	gettime(&lastreport_begin);
1250 	gettime(&now);
1251 
1252 	const struct list_node *node = fg_list_front(&unique_daemons);
1253 	while (node) {
1254 		if (sigint_caught)
1255 			return;
1256 		struct daemon *daemon = node->data;
1257 		node = node->next;
1258 
1259 		DEBUG_MSG(LOG_ERR, "starting flow on server with UUID %s",daemon->uuid);
1260 		xmlrpc_client_call2f(&rpc_env, rpc_client,
1261 				     daemon->url,
1262 				     "start_flows", &resultP, "({s:i})",
1263 				     "start_timestamp", now.tv_sec + 2);
1264 		die_if_fault_occurred(&rpc_env);
1265 		if (resultP)
1266 			xmlrpc_DECREF(resultP);
1267 	}
1268 
1269 	active_flows = copt.num_flows;
1270 
1271 	/* Reports are fetched from the daemons based on the
1272 	 * report interval duration */
1273 	while (!sigint_caught) {
1274 		if ( time_diff_now(&lastreport_begin) <  copt.reporting_interval ) {
1275 			usleep(copt.reporting_interval - time_diff(&lastreport_begin,&lastreport_end) );
1276 			continue;
1277 		}
1278 		gettime(&lastreport_begin);
1279 		fetch_reports(rpc_client);
1280 		gettime(&lastreport_end);
1281 
1282 		/* All flows have ended */
1283 		if (active_flows < 1)
1284 			return;
1285 	}
1286 }
1287 
1288 /**
1289  * Reports are fetched from the flow endpoint daemon.
1290  *
1291  * Single daemon can maintain multiple flows endpoints and daemons combine all
1292  * its flows reports and send them to the controller. So controller should call
1293  * a daemon in its flows only once.
1294  *
1295  * @param[in,out] rpc_client to connect controller to daemon
1296  */
fetch_reports(xmlrpc_client * rpc_client)1297 static void fetch_reports(xmlrpc_client *rpc_client)
1298 {
1299 
1300 	xmlrpc_value * resultP = 0;
1301 	const struct list_node *node = fg_list_front(&unique_daemons);
1302 
1303 	while (node) {
1304 		struct daemon *daemon = node->data;
1305 		node = node->next;
1306 		int array_size, has_more;
1307 		xmlrpc_value *rv = 0;
1308 
1309 has_more_reports:
1310 
1311 		xmlrpc_client_call2f(&rpc_env, rpc_client, daemon->url,
1312 			"get_reports", &resultP, "()");
1313 		if (rpc_env.fault_occurred) {
1314 			errx("XML-RPC fault: %s (%d)", rpc_env.fault_string,
1315 			      rpc_env.fault_code);
1316 			continue;
1317 		}
1318 
1319 		if (!resultP)
1320 			continue;
1321 
1322 		array_size = xmlrpc_array_size(&rpc_env, resultP);
1323 		if (!array_size) {
1324 			warnx("empty array in get_reports reply");
1325 			continue;
1326 		}
1327 
1328 		xmlrpc_array_read_item(&rpc_env, resultP, 0, &rv);
1329 		xmlrpc_read_int(&rpc_env, rv, &has_more);
1330 		if (rpc_env.fault_occurred) {
1331 			errx("XML-RPC fault: %s (%d)", rpc_env.fault_string,
1332 			      rpc_env.fault_code);
1333 			xmlrpc_DECREF(rv);
1334 			continue;
1335 		}
1336 		xmlrpc_DECREF(rv);
1337 
1338 		for (int i = 1; i < array_size; i++) {
1339 			xmlrpc_value *rv = 0;
1340 
1341 			xmlrpc_array_read_item(&rpc_env, resultP, i, &rv);
1342 			if (rv) {
1343 				struct report report;
1344 				int begin_sec, begin_nsec, end_sec, end_nsec;
1345 				int tcpi_snd_cwnd;
1346 				int tcpi_snd_ssthresh;
1347 				int tcpi_unacked;
1348 				int tcpi_sacked;
1349 				int tcpi_lost;
1350 				int tcpi_retrans;
1351 				int tcpi_retransmits;
1352 				int tcpi_fackets;
1353 				int tcpi_reordering;
1354 				int tcpi_rtt;
1355 				int tcpi_rttvar;
1356 				int tcpi_rto;
1357 				int tcpi_backoff;
1358 				int tcpi_ca_state;
1359 				int tcpi_snd_mss;
1360 				int bytes_read_low, bytes_read_high;
1361 				int bytes_written_low, bytes_written_high;
1362 
1363 				xmlrpc_decompose_value(&rpc_env, rv,
1364 					"("
1365 					"{s:i,s:i,s:i,s:i,s:i,s:i,s:i,*}" /* Report data & timeval */
1366 					"{s:i,s:i,s:i,s:i,*}" /* bytes */
1367 					"{s:i,s:i,s:i,s:i,*}" /* blocks */
1368 					"{s:d,s:d,s:d,s:d,s:d,s:d,s:d,s:d,s:d,*}" /* RTT, IAT, Delay */
1369 					"{s:i,s:i,*}" /* MTU */
1370 					"{s:i,s:i,s:i,s:i,s:i,*}" /* TCP info */
1371 					"{s:i,s:i,s:i,s:i,s:i,*}" /* ...      */
1372 					"{s:i,s:i,s:i,s:i,s:i,*}" /* ...      */
1373 					"{s:i,*}"
1374 					")",
1375 
1376 					"id", &report.id,
1377 					"endpoint", &report.endpoint,
1378 					"type", &report.type,
1379 					"begin_tv_sec", &begin_sec,
1380 					"begin_tv_nsec", &begin_nsec,
1381 					"end_tv_sec", &end_sec,
1382 					"end_tv_nsec", &end_nsec,
1383 
1384 					"bytes_read_high", &bytes_read_high,
1385 					"bytes_read_low", &bytes_read_low,
1386 					"bytes_written_high", &bytes_written_high,
1387 					"bytes_written_low", &bytes_written_low,
1388 
1389 					"request_blocks_read", &report.request_blocks_read,
1390 					"request_blocks_written", &report.request_blocks_written,
1391 					"response_blocks_read", &report.response_blocks_read,
1392 					"response_blocks_written", &report.response_blocks_written,
1393 
1394 					"rtt_min", &report.rtt_min,
1395 					"rtt_max", &report.rtt_max,
1396 					"rtt_sum", &report.rtt_sum,
1397 					"iat_min", &report.iat_min,
1398 					"iat_max", &report.iat_max,
1399 					"iat_sum", &report.iat_sum,
1400 					"delay_min", &report.delay_min,
1401 					"delay_max", &report.delay_max,
1402 					"delay_sum", &report.delay_sum,
1403 
1404 					"pmtu", &report.pmtu,
1405 					"imtu", &report.imtu,
1406 
1407 					"tcpi_snd_cwnd", &tcpi_snd_cwnd,
1408 					"tcpi_snd_ssthresh", &tcpi_snd_ssthresh,
1409 					"tcpi_unacked", &tcpi_unacked,
1410 					"tcpi_sacked", &tcpi_sacked,
1411 					"tcpi_lost", &tcpi_lost,
1412 
1413 					"tcpi_retrans", &tcpi_retrans,
1414 					"tcpi_retransmits", &tcpi_retransmits,
1415 					"tcpi_fackets", &tcpi_fackets,
1416 					"tcpi_reordering", &tcpi_reordering,
1417 					"tcpi_rtt", &tcpi_rtt,
1418 
1419 					"tcpi_rttvar", &tcpi_rttvar,
1420 					"tcpi_rto", &tcpi_rto,
1421 					"tcpi_backoff", &tcpi_backoff,
1422 					"tcpi_ca_state", &tcpi_ca_state,
1423 					"tcpi_snd_mss", &tcpi_snd_mss,
1424 
1425 					"status", &report.status
1426 				);
1427 				xmlrpc_DECREF(rv);
1428 #ifdef HAVE_UNSIGNED_LONG_LONG_INT
1429 				report.bytes_read = ((long long)bytes_read_high << 32) + (uint32_t)bytes_read_low;
1430 				report.bytes_written = ((long long)bytes_written_high << 32) + (uint32_t)bytes_written_low;
1431 #else /* HAVE_UNSIGNED_LONG_LONG_INT */
1432 				report.bytes_read = (uint32_t)bytes_read_low;
1433 				report.bytes_written = (uint32_t)bytes_written_low;
1434 #endif /* HAVE_UNSIGNED_LONG_LONG_INT */
1435 
1436 				/* FIXME Kernel metrics (tcp_info). Other OS than
1437 				 * Linux may not send valid values here. For
1438 				 * the moment we don't care and handle this in
1439 				 * the output/display routines. However, this
1440 				 * do not work in heterogeneous environments */
1441 				report.tcp_info.tcpi_snd_cwnd = tcpi_snd_cwnd;
1442 				report.tcp_info.tcpi_snd_ssthresh = tcpi_snd_ssthresh;
1443 				report.tcp_info.tcpi_unacked = tcpi_unacked;
1444 				report.tcp_info.tcpi_sacked = tcpi_sacked;
1445 				report.tcp_info.tcpi_lost = tcpi_lost;
1446 				report.tcp_info.tcpi_retrans = tcpi_retrans;
1447 				report.tcp_info.tcpi_retransmits = tcpi_retransmits;
1448 				report.tcp_info.tcpi_fackets = tcpi_fackets;
1449 				report.tcp_info.tcpi_reordering = tcpi_reordering;
1450 				report.tcp_info.tcpi_rtt = tcpi_rtt;
1451 				report.tcp_info.tcpi_rttvar = tcpi_rttvar;
1452 				report.tcp_info.tcpi_rto = tcpi_rto;
1453 				report.tcp_info.tcpi_backoff = tcpi_backoff;
1454 				report.tcp_info.tcpi_ca_state = tcpi_ca_state;
1455 				report.tcp_info.tcpi_snd_mss = tcpi_snd_mss;
1456 
1457 				report.begin.tv_sec = begin_sec;
1458 				report.begin.tv_nsec = begin_nsec;
1459 				report.end.tv_sec = end_sec;
1460 				report.end.tv_nsec = end_nsec;
1461 
1462 				report_flow(&report);
1463 			}
1464 		}
1465 		xmlrpc_DECREF(resultP);
1466 
1467 		if (has_more)
1468 			goto has_more_reports;
1469 	}
1470 }
1471 
1472 /**
1473  * Reports are fetched from the flow endpoint daemon
1474  *
1475  * Single daemon can maintain multiple flows endpoints and daemons combine all
1476  * it flows report and send the controller. So controller give the flow ID to
1477  * daemons, while prepare the flow.Controller flow ID is maintained by the
1478  * daemons to maintain its flow endpoints.So When getting back the reports from
1479  * the daemons, the controller use those flow ID registered for the daemon in
1480  * the prepare flow as reference to distinguish the @p report.
1481  * The daemon also send back the details regarding flow endpoints
1482  * i.e. source or destination. So this information is also used by the daemons
1483  * to distinguish the report in the report flow.
1484  *
1485  * @param[in] report report from the daemon
1486  */
report_flow(struct report * report)1487 static void report_flow(struct report* report)
1488 {
1489 	int *i = NULL;
1490 	unsigned short id;
1491 	struct cflow *f = NULL;
1492 
1493 	/* Get matching flow for report */
1494 	/* TODO Maybe just use compare daemon pointers? */
1495 	for (id = 0; id < copt.num_flows; id++) {
1496 		f = &cflow[id];
1497 
1498 		foreach(i, SOURCE, DESTINATION)
1499 			if (f->endpoint_id[*i] == report->id &&
1500 			    *i == (int)report->endpoint)
1501 				goto exit_outer_loop;
1502 	}
1503 exit_outer_loop:
1504 
1505 	if (f->start_timestamp[*i].tv_sec == 0)
1506 		f->start_timestamp[*i] = report->begin;
1507 
1508 	if (report->type == FINAL) {
1509 		DEBUG_MSG(LOG_DEBUG, "received final report for flow %d", id);
1510 		/* Final report, keep it for later */
1511 		free(f->final_report[*i]);
1512 		f->final_report[*i] = malloc(sizeof(struct report));
1513 		*f->final_report[*i] = *report;
1514 
1515 		if (!f->finished[*i]) {
1516 			f->finished[*i] = 1;
1517 			if (f->finished[1 - *i]) {
1518 				active_flows--;
1519 				DEBUG_MSG(LOG_DEBUG, "remaining active flows: "
1520 					  "%d", active_flows);
1521 				assert(active_flows >= 0);
1522 			}
1523 		}
1524 		return;
1525 	}
1526 	print_interval_report(id, *i, report);
1527 }
1528 
1529 /**
1530  * Stop test connections for all flows in a test
1531  *
1532  * All the test connection are stopped, but the test connection flow in the
1533  * controller and in daemon are different. In the controller, test connection
1534  * are respective to number of flows in a test,but in daemons test connection
1535  * are respective to flow endpoints. Single daemons can maintain multiple flows
1536  * endpoints, So controller should stop a daemon only once.
1537  */
close_all_flows(void)1538 static void close_all_flows(void)
1539 {
1540 	xmlrpc_env env;
1541 	xmlrpc_client *client;
1542 
1543 	for (unsigned short id = 0; id < copt.num_flows; id++) {
1544 		DEBUG_MSG(LOG_WARNING, "closing flow %u", id);
1545 
1546 		if (cflow[id].finished[SOURCE] && cflow[id].finished[DESTINATION])
1547 			continue;
1548 
1549 		/* We use new env and client, old one might be in fault condition */
1550 		xmlrpc_env_init(&env);
1551 		xmlrpc_client_create(&env, XMLRPC_CLIENT_NO_FLAGS, "Flowgrind", FLOWGRIND_VERSION, NULL, 0, &client);
1552 		die_if_fault_occurred(&env);
1553 		xmlrpc_env_clean(&env);
1554 
1555 		foreach(int *i, SOURCE, DESTINATION) {
1556 			xmlrpc_value * resultP = 0;
1557 
1558 			if (cflow[id].endpoint_id[*i] == -1 ||
1559 			    cflow[id].finished[*i])
1560 				/* Endpoint does not need closing */
1561 				continue;
1562 
1563 			cflow[id].finished[*i] = 1;
1564 
1565 			xmlrpc_env_init(&env);
1566 			xmlrpc_client_call2f(&env, client,
1567 					     cflow[id].endpoint[*i].rpc_info->server_url,
1568 					     "stop_flow", &resultP, "({s:i})",
1569 					     "flow_id", cflow[id].endpoint_id[*i]);
1570 			if (resultP)
1571 				xmlrpc_DECREF(resultP);
1572 
1573 			xmlrpc_env_clean(&env);
1574 		}
1575 
1576 		if (active_flows > 0)
1577 			active_flows--;
1578 
1579 		xmlrpc_client_destroy(client);
1580 		DEBUG_MSG(LOG_WARNING, "closed flow %u", id);
1581 	}
1582 }
1583 
/**
 * Determines the length of the integer part of a decimal number.
 *
 * The sign is ignored and the fractional part does not count, e.g. both
 * 0.5 and -9.99 have an integer part of length 1. The value is never
 * converted to an integer type, so numbers beyond the range of int are
 * handled as well. NaN and infinities yield 1.
 *
 * @param[in] value decimal number
 * @return length of integer part
 */
inline static size_t det_num_digits(double value)
{
	/* NaN and infinities have no integer part that could be measured */
	if (!isfinite(value))
		return 1;

	const double magnitude = value < 0 ? -value : value;
	size_t digits = 1;

	/* Count the powers of ten that fit into the magnitude. The loop
	 * ends for every finite value, at the latest when the bound
	 * overflows to infinity */
	for (double bound = 10.0; magnitude >= bound; bound *= 10.0)
		digits++;

	return digits;
}
1598 
1599 /**
1600  * Scale the given throughput @p thruput in either Mebibyte per seconds or in
1601  * Megabits per seconds.
1602  *
1603  * @param[in] thruput throughput in byte per seconds
1604  * @return scaled throughput in MiB/s or Mb/s
1605  */
scale_thruput(double thruput)1606 inline static double scale_thruput(double thruput)
1607 {
1608         if (copt.mbyte)
1609                 return thruput / (1<<20);
1610         return thruput / 1e6 * (1<<3);
1611 }
1612 
1613 /**
1614  * Determines if the current column width @p column_width is larger or smaller
1615  * than the old one and updates the state of column @p column accordingly.
1616  *
1617  * @param[in,out] column column that state to be updated
1618  * @param[in] column_width current column width
1619  * @return true if column state has been updated, false otherwise
1620  */
update_column_width(struct column * column,unsigned column_width)1621 static bool update_column_width(struct column *column, unsigned column_width)
1622 {
1623 	/* True if column width has changed */
1624 	bool has_changed = false;
1625 
1626 	if (column->state.last_width < column_width) {
1627 		/* Column too small */
1628 		has_changed = true;
1629 		column->state.last_width = column_width;
1630 		column->state.oversized = 0;
1631 	} else if (column->state.last_width > 1 + column_width) {
1632 		/* Column too big */
1633 		if (column->state.oversized >= MAX_COLUM_TOO_LARGE) {
1634 			/* Column too big for quite a while */
1635 			has_changed = true;
1636 			column->state.last_width = column_width;
1637 			column->state.oversized = 0;
1638 		} else {
1639 			(column->state.oversized)++;
1640 		}
1641 	} else {
1642 		/* This size was needed, keep it */
1643 		column->state.oversized = 0;
1644 	}
1645 
1646 	return has_changed;
1647 }
1648 
1649 /**
1650  * Append measured data for interval report column @p column_id to given strings.
1651  *
1652  * For the intermediated interval report column @p column_id, append measured
1653  * data @p value to the destination data string @p data, and the name and unit
1654  * of intermediated interval column header to @p header1 and @p header2.
1655  *
1656  * @param[in,out] header1 1st header string (name) to append to
1657  * @param[in,out] header2 2nd header string (unit) to append to
1658  * @param[in,out] data data value string to append to
1659  * @param[in] column_id ID of intermediated interval report column
1660  * @param[in] value measured data string to be append
1661  * @return true if column width has changed, false otherwise
1662  */
print_column_str(char ** header1,char ** header2,char ** data,enum column_id column_id,char * value)1663 static bool print_column_str(char **header1, char **header2, char **data,
1664 			     enum column_id column_id, char* value)
1665 {
1666 	/* Only for convenience */
1667 	struct column *column = &column_info[column_id];
1668 
1669 	if (!column->state.visible)
1670 		return false;
1671 
1672 	/* Get max column width */
1673 	unsigned data_len = strlen(value);
1674 	unsigned header_len = MAX(strlen(column->header.name),
1675 				  strlen(column->header.unit));
1676 	unsigned column_width = MAX(data_len, header_len);
1677 
1678 	/* Check if column width has changed */
1679 	bool has_changed = update_column_width(column, column_width);
1680 
1681 	/* Create format specifiers of right length */
1682 	char *fmt_str = NULL;
1683 	const size_t width = column->state.last_width;
1684 	if (asprintf(&fmt_str, "%%%zus", width + GUARDBAND) == -1)
1685 		critx("could not allocate memory for interval report");
1686 
1687 	/* Print data, 1st and 2nd header row */
1688 	asprintf_append(data, fmt_str, value);
1689 	asprintf_append(header1, fmt_str, column->header.name);
1690 	asprintf_append(header2, fmt_str, column->header.unit);
1691 
1692 	free(fmt_str);
1693 	return has_changed;
1694 }
1695 
1696 /**
1697  * Append measured data for interval report column @p column_id to given strings.
1698  *
1699  * For the intermediated interval report column @p column_id, append measured
1700  * data @p value to the destination data string @p data, and the name and unit
1701  * of intermediated interval column header to @p header1 and @p header2.
1702  *
1703  * @param[in,out] header1 1st header string (name) to append to
1704  * @param[in,out] header2 2nd header string (unit) to append to
1705  * @param[in,out] data data value string to append to
1706  * @param[in] column_id ID of intermediated interval report column
1707  * @param[in] value measured data value to be append
1708  * @param[in] accuracy number of decimal places to be append
1709  * @return true if column width has changed, false otherwise
1710  */
print_column(char ** header1,char ** header2,char ** data,enum column_id column_id,double value,unsigned accuracy)1711 static bool print_column(char **header1, char **header2, char **data,
1712 			 enum column_id column_id, double value,
1713 			 unsigned accuracy)
1714 {
1715 	/* Print symbolic values instead of numbers */
1716 	if (copt.symbolic) {
1717 		switch ((int)value) {
1718 		case INT_MAX:
1719 			return print_column_str(header1, header2, data,
1720 						column_id, "INT_MAX");
1721 		case USHRT_MAX:
1722 			return print_column_str(header1, header2, data,
1723 						column_id, "USHRT_MAX");
1724 		case UINT_MAX:
1725 			return print_column_str(header1, header2, data,
1726 						column_id, "UINT_MAX");
1727 		}
1728 	}
1729 
1730 	/* Only for convenience */
1731 	struct column *column = &column_info[column_id];
1732 
1733 	if (!column->state.visible)
1734 		return false;
1735 
1736 	/* Get max column width */
1737 	unsigned data_len = det_num_digits(value) + (accuracy ? accuracy + 1 : 0);
1738 	unsigned header_len = MAX(strlen(column->header.name),
1739 				  strlen(column->header.unit));
1740 	unsigned column_width = MAX(data_len, header_len);
1741 
1742 	/* Check if column width has changed */
1743 	bool has_changed = update_column_width(column, column_width);
1744 
1745 	/* Create format specifiers of right length */
1746 	char *fmt_num = NULL, *fmt_str = NULL;
1747 	const size_t width = column->state.last_width;
1748 	if (asprintf(&fmt_num, "%%%zu.%df", width + GUARDBAND, accuracy) == -1 ||
1749 	    asprintf(&fmt_str, "%%%zus", width + GUARDBAND) == -1)
1750 		critx("could not allocate memory for interval report");
1751 
1752 	/* Print data, 1st and 2nd header row */
1753 	asprintf_append(data, fmt_num, value);
1754 	asprintf_append(header1, fmt_str, column->header.name);
1755 	asprintf_append(header2, fmt_str, column->header.unit);
1756 
1757 	free_all(fmt_num, fmt_str);
1758 	return has_changed;
1759 }
1760 
1761 /**
1762  * Print interval report @p report for endpoint @p e of flow @p flow_id.
1763  *
1764  * In addition, if the width of one intermediated interval report columns has
1765  * been changed, the interval column header will be printed again.
1766  *
1767  * @param[in] flow_id flow an interval report will be created for
1768  * @param[in] e flow endpoint (SOURCE or DESTINATION)
1769  * @param[in] report interval report to be printed
1770  */
print_interval_report(unsigned short flow_id,enum endpoint_t e,struct report * report)1771 static void print_interval_report(unsigned short flow_id, enum endpoint_t e,
1772 				  struct report *report)
1773 {
1774 	/* Whether or not column width has been changed */
1775 	bool changed = false;
1776 	/* 1st header row, 2nd header row, and the actual measured data */
1777 	char *header1 = NULL, *header2 = NULL, *data = NULL;
1778 
1779 	/* Flow ID and endpoint (source or destination) */
1780 	if (asprintf(&header1, "%s", column_info[COL_FLOW_ID].header.name) == -1 ||
1781 	    asprintf(&header2, "%s", column_info[COL_FLOW_ID].header.unit) == -1 ||
1782 	    asprintf(&data, "%s%3d", e ? "D" : "S", flow_id) == -1)
1783 		critx("could not allocate memory for interval report");
1784 
1785 	/* Calculate time */
1786 	double diff_first_last = time_diff(&cflow[flow_id].start_timestamp[e],
1787 					   &report->begin);
1788 	double diff_first_now = time_diff(&cflow[flow_id].start_timestamp[e],
1789 					  &report->end);
1790 	changed |= print_column(&header1, &header2, &data, COL_BEGIN,
1791 				diff_first_last, 3);
1792 	changed |= print_column(&header1, &header2, &data, COL_END,
1793 				diff_first_now, 3);
1794 
1795 	/* Throughput */
1796 	double thruput = (double)report->bytes_written /
1797 			 (diff_first_now - diff_first_last);
1798 	thruput = scale_thruput(thruput);
1799 	changed |= print_column(&header1, &header2, &data, COL_THROUGH,
1800 				thruput, 6);
1801 
1802 	/* Transactions */
1803 	double transac = (double)report->response_blocks_read /
1804 			 (diff_first_now - diff_first_last);
1805 	changed |= print_column(&header1, &header2, &data, COL_TRANSAC,
1806 				transac, 2);
1807 
1808 	/* Blocks */
1809 	changed |= print_column(&header1, &header2, &data, COL_BLOCK_REQU,
1810 				report->request_blocks_written, 0);
1811 	changed |= print_column(&header1, &header2, &data, COL_BLOCK_RESP,
1812 				report->response_blocks_written, 0);
1813 
1814 	/* RTT */
1815 	double rtt_avg = 0.0;
1816 	if (report->response_blocks_read && report->rtt_sum)
1817 		rtt_avg = report->rtt_sum /
1818 			  (double)(report->response_blocks_read);
1819 	else
1820 		report->rtt_min = report->rtt_max = rtt_avg = INFINITY;
1821 	changed |= print_column(&header1, &header2, &data, COL_RTT_MIN,
1822 				report->rtt_min * 1e3, 3);
1823 	changed |= print_column(&header1, &header2, &data, COL_RTT_AVG,
1824 				rtt_avg * 1e3, 3);
1825 	changed |= print_column(&header1, &header2, &data, COL_RTT_MAX,
1826 				report->rtt_max * 1e3, 3);
1827 
1828 	/* IAT */
1829 	double iat_avg = 0.0;
1830 	if (report->request_blocks_read && report->iat_sum)
1831 		iat_avg = report->iat_sum /
1832 			  (double)(report->request_blocks_read);
1833 	else
1834 		report->iat_min = report->iat_max = iat_avg = INFINITY;
1835 	changed |= print_column(&header1, &header2, &data, COL_IAT_MIN,
1836 				report->rtt_min * 1e3, 3);
1837 	changed |= print_column(&header1, &header2, &data, COL_IAT_AVG,
1838 				iat_avg * 1e3, 3);
1839 	changed |= print_column(&header1, &header2, &data, COL_IAT_MAX,
1840 				report->iat_max * 1e3, 3);
1841 
1842 	/* Delay */
1843 	double delay_avg = 0.0;
1844 	if (report->request_blocks_read && report->delay_sum)
1845 		delay_avg = report->delay_sum /
1846 			    (double)(report->request_blocks_read);
1847 	else
1848 		report->delay_min = report->delay_max = delay_avg = INFINITY;
1849 	changed |= print_column(&header1, &header2, &data, COL_DLY_MIN,
1850 				report->delay_min * 1e3, 3);
1851 	changed |= print_column(&header1, &header2, &data, COL_DLY_AVG,
1852 				delay_avg * 1e3, 3);
1853 	changed |= print_column(&header1, &header2, &data, COL_DLY_MAX,
1854 				report->delay_max * 1e3, 3);
1855 
1856 	/* TCP info struct */
1857 	changed |= print_column(&header1, &header2, &data, COL_TCP_CWND,
1858 				report->tcp_info.tcpi_snd_cwnd, 0);
1859 	changed |= print_column(&header1, &header2, &data, COL_TCP_SSTH,
1860 				report->tcp_info.tcpi_snd_ssthresh, 0);
1861 	changed |= print_column(&header1, &header2, &data, COL_TCP_UACK,
1862 				report->tcp_info.tcpi_unacked, 0);
1863 	changed |= print_column(&header1, &header2, &data, COL_TCP_SACK,
1864 				report->tcp_info.tcpi_sacked, 0);
1865 	changed |= print_column(&header1, &header2, &data, COL_TCP_LOST,
1866 				report->tcp_info.tcpi_lost, 0);
1867 	changed |= print_column(&header1, &header2, &data, COL_TCP_RETR,
1868 				report->tcp_info.tcpi_retrans, 0);
1869 	changed |= print_column(&header1, &header2, &data, COL_TCP_TRET,
1870 				report->tcp_info.tcpi_retransmits, 0);
1871 	changed |= print_column(&header1, &header2, &data, COL_TCP_FACK,
1872 				report->tcp_info.tcpi_fackets, 0);
1873 	changed |= print_column(&header1, &header2, &data, COL_TCP_REOR,
1874 				report->tcp_info.tcpi_reordering, 0);
1875 	changed |= print_column(&header1, &header2, &data, COL_TCP_BKOF,
1876 				report->tcp_info.tcpi_backoff, 0);
1877 	changed |= print_column(&header1, &header2, &data, COL_TCP_RTT,
1878 				report->tcp_info.tcpi_rtt / 1e3, 1);
1879 	changed |= print_column(&header1, &header2, &data, COL_TCP_RTTVAR,
1880 				report->tcp_info.tcpi_rttvar / 1e3, 1);
1881 	changed |= print_column(&header1, &header2, &data, COL_TCP_RTO,
1882 				report->tcp_info.tcpi_rto / 1e3, 1);
1883 
1884 	/* TCP CA state */
1885 	char *ca_state = NULL;
1886 	switch (report->tcp_info.tcpi_ca_state) {
1887 	case TCP_CA_Open:
1888 		ca_state = "open";
1889 		break;
1890 	case TCP_CA_Disorder:
1891 		ca_state = "disorder";
1892 		break;
1893 	case TCP_CA_CWR:
1894 		ca_state = "cwr";
1895 		break;
1896 	case TCP_CA_Recovery:
1897 		ca_state = "recover";
1898 		break;
1899 	case TCP_CA_Loss:
1900 		ca_state = "loss";
1901 		break;
1902 	default:
1903 		ca_state = "unknown";
1904 	}
1905 	changed |= print_column_str(&header1, &header2, &data,
1906 				    COL_TCP_CA_STATE, ca_state);
1907 
1908 	/* SMSS & PMTU */
1909 	changed |= print_column(&header1, &header2, &data, COL_SMSS,
1910 				report->tcp_info.tcpi_snd_mss, 0);
1911 	changed |= print_column(&header1, &header2, &data, COL_PMTU,
1912 				report->pmtu, 0);
1913 
1914 /* Internal flowgrind state */
1915 #ifdef DEBUG
1916 	int rc = 0;
1917 	char *fg_state = NULL;
1918 	if (cflow[flow_id].finished[e]) {
1919 		rc = asprintf(&fg_state, "(stopped)");
1920 	} else {
1921 		/* Write status */
1922 		char ws = (char)(report->status & 0xFF);
1923 		if  (ws != 'd' || ws != 'l' || ws != 'o' || ws != 'f' ||
1924 		     ws != 'c' || ws != 'n')
1925 			ws = 'u';
1926 
1927 		/* Read status */
1928 		char rs = (char)(report->status >> 8);
1929 		if  (rs != 'd' || rs != 'l' || rs != 'o' || rs != 'f' ||
1930 		     rs != 'c' || rs != 'n')
1931 			rs = 'u';
1932 		rc = asprintf(&fg_state, "(%c/%c)", ws, rs);
1933 	}
1934 
1935 	if (rc == -1)
1936 		critx("could not allocate memory for flowgrind status string");
1937 
1938 	changed |= print_column_str(&header1, &header2, &data, COL_STATUS,
1939 				    fg_state);
1940 	free(fg_state);
1941 #endif /* DEBUG */
1942 
1943 	/* Print interval header again if either the column width has been
1944 	 * changed or MAX_REPORTS_BEFORE_HEADER reports have been emited
1945 	 * since last time header was printed */
1946 	static unsigned short printed_reports = 0;
1947 	if (changed || (printed_reports % MAX_REPORTS_IN_ROW) == 0) {
1948 		print_output("%s\n", header1);
1949 		print_output("%s\n", header2);
1950 	}
1951 
1952 	print_output("%s\n", data);
1953 	printed_reports++;
1954 	free_all(header1, header2, data);
1955 }
1956 
/**
 * Maps common MTU sizes to well-known network technologies.
 *
 * Only exact matches are recognized; every other MTU yields "unknown".
 *
 * @param[in] mtu MTU size
 * @return network technology as string (string literal, must not be freed)
 */
static char *guess_topology(unsigned mtu)
{
	switch (mtu) {
	case 65535:
		return "Hyperchannel";			/* RFC1374 */
	case 17914:
		return "16 MB/s Token Ring";
	case 16436:
		return "Linux Loopback device";
	case 16384:
		return "FreeBSD Loopback device";
	case 16352:
		return "Darwin Loopback device";
	case 9000:
		return "Gigabit Ethernet (Jumboframes)";
	case 8166:
		return "802.4 Token Bus";		/* RFC1042 */
	case 4464:
		return "4 MB/s Token Ring";
	case 4352:
		return "FDDI";				/* RFC1390 */
	case 1500:
		return "Ethernet/PPP";			/* RFC894, RFC1548 */
	case 1492:
		return "PPPoE";				/* RFC2516 */
	case 1472:
		return "IP-in-IP";			/* RFC1853 */
	case 1280:
		return "IPv6 Tunnel";			/* RFC4213 */
	case 1006:
		return "SLIP";				/* RFC1055 */
	case 576:
		return "X.25 & ISDN";			/* RFC1356 */
	case 296:
		return "PPP (low delay)";
	default:
		return "unknown";
	}
}
1995 
1996 /**
1997  * Print final report (i.e. summary line) for endpoint @p e of flow @p flow_id.
1998  *
1999  * @param[in] flow_id flow a final report will be created for
2000  * @param[in] e flow endpoint (SOURCE or DESTINATION)
2001  */
print_final_report(unsigned short flow_id,enum endpoint_t e)2002 static void print_final_report(unsigned short flow_id, enum endpoint_t e)
2003 {
2004 	/* To store the final report */
2005 	char *buf = NULL;
2006 
2007 	/* For convenience only */
2008 	struct flow_endpoint *endpoint = &cflow[flow_id].endpoint[e];
2009 	struct flow_settings *settings = &cflow[flow_id].settings[e];
2010 	struct report *report = cflow[flow_id].final_report[e];
2011 
2012 	/* Flow ID and endpoint (source or destination) */
2013 	if (asprintf(&buf, "# ID %3d %s: ", flow_id, e ? "D" : "S") == -1)
2014 		critx("could not allocate memory for final report");;
2015 
2016 	/* No final report received. Skip final report line for this endpoint */
2017 	if (!report) {
2018 		asprintf_append(&buf, "Error: no final report received");
2019 		goto out;
2020 	}
2021 
2022 	/* Infos about the test connections */
2023 	asprintf_append(&buf, "%s", endpoint->test_address);
2024 
2025 	if (strcmp(endpoint->rpc_info->server_name, endpoint->test_address) != 0)
2026 		asprintf_append(&buf, "/%s", endpoint->rpc_info->server_name);
2027 	if (endpoint->rpc_info->server_port != DEFAULT_LISTEN_PORT)
2028 		asprintf_append(&buf, ":%d", endpoint->rpc_info->server_port);
2029 
2030 	/* Infos about the daemon OS */
2031 	asprintf_append(&buf, " (%s %s), ",
2032 			endpoint->daemon->os_name, endpoint->daemon->os_release);
2033 
2034 	/* Random seed */
2035 	asprintf_append(&buf, "random seed: %u, ", cflow[flow_id].random_seed);
2036 
2037 	/* Sending & Receiving buffer */
2038 	asprintf_append(&buf, "sbuf = %d/%d [B] (real/req), ",
2039 			endpoint->send_buffer_size_real,
2040 			settings->requested_send_buffer_size);
2041 	asprintf_append(&buf, "rbuf = %d/%d [B] (real/req), ",
2042 			endpoint->receive_buffer_size_real,
2043 			settings->requested_read_buffer_size);
2044 
2045 	/* SMSS, Path MTU, Interface MTU */
2046 	if (report->tcp_info.tcpi_snd_mss > 0)
2047 		asprintf_append(&buf, "SMSS = %d [B], ",
2048 				report->tcp_info.tcpi_snd_mss);
2049 	if (report->pmtu > 0)
2050 		asprintf_append(&buf, "PMTU = %d [B], ", report->pmtu);
2051 	if (report->imtu > 0)
2052 		asprintf_append(&buf, "Interface MTU = %d (%s) [B], ",
2053 				report->imtu, guess_topology(report->imtu));
2054 
2055 	/* Congestion control algorithms */
2056 	if (settings->cc_alg[0])
2057 		asprintf_append(&buf, "CC = %s, ", settings->cc_alg);
2058 
2059 	/* Calculate time */
2060 	double report_time = time_diff(&report->begin, &report->end);
2061 	double delta_write = 0.0, delta_read = 0.0;
2062 	if (settings->duration[WRITE])
2063 		delta_write = report_time - settings->duration[WRITE]
2064 					  - settings->delay[SOURCE];
2065 	if (settings->duration[READ])
2066 		delta_read = report_time - settings->duration[READ]
2067 					 - settings->delay[DESTINATION];
2068 
2069 	/* Calculate delta target vs. real report time */
2070 	double real_write = settings->duration[WRITE] + delta_write;
2071 	double real_read = settings->duration[READ] + delta_read;
2072 	if (settings->duration[WRITE])
2073 		asprintf_append(&buf, "duration = %.3f/%.3f [s] (real/req), ",
2074 				real_write, settings->duration[WRITE]);
2075 	if (settings->delay[WRITE])
2076 		asprintf_append(&buf, "write delay = %.3f [s], ",
2077 				settings->delay[WRITE]);
2078 	if (settings->delay[READ])
2079 		asprintf_append(&buf, "read delay = %.3f [s], ",
2080 				settings->delay[READ]);
2081 
2082 	/* Throughput */
2083 	double thruput_read = report->bytes_read / MAX(real_read, real_write);
2084 	double thruput_write = report->bytes_written / MAX(real_read, real_write);
2085 	if (isnan(thruput_read))
2086 		thruput_read = 0.0;
2087 	if (isnan(thruput_write))
2088 		thruput_write = 0.0;
2089 
2090 	thruput_read = scale_thruput(thruput_read);
2091 	thruput_write = scale_thruput(thruput_write);
2092 
2093 	if (copt.mbyte)
2094 		asprintf_append(&buf, "through = %.6f/%.6f [MiB/s] (out/in)",
2095 				thruput_write, thruput_read);
2096 	else
2097 		asprintf_append(&buf, "through = " "%.6f/%.6f [Mbit/s] (out/in)",
2098 				thruput_write, thruput_read);
2099 
2100 	/* Transactions */
2101 	double trans = report->response_blocks_read / MAX(real_read, real_write);
2102 	if (isnan(trans))
2103 		trans = 0.0;
2104 	if (trans)
2105 		asprintf_append(&buf, ", transactions/s = %.2f [#]", trans);
2106 
2107 	/* Blocks */
2108 	if (report->request_blocks_written || report->request_blocks_read)
2109 		asprintf_append(&buf, ", request blocks = %u/%u [#] (out/in)",
2110 				report->request_blocks_written,
2111 				report->request_blocks_read);
2112 	if (report->response_blocks_written || report->response_blocks_read)
2113 		asprintf_append(&buf, ", response blocks = %u/%u [#] (out/in)",
2114 				report->response_blocks_written,
2115 				report->response_blocks_read);
2116 
2117 	/* RTT */
2118 	if (report->response_blocks_read) {
2119 		double rtt_avg = report->rtt_sum /
2120 				 (double)(report->response_blocks_read);
2121 		asprintf_append(&buf, ", RTT = %.3f/%.3f/%.3f [ms] (min/avg/max)",
2122 				report->rtt_min * 1e3, rtt_avg * 1e3,
2123 				report->rtt_max * 1e3);
2124 	}
2125 
2126 	/* IAT */
2127 	if (report->request_blocks_read) {
2128 		double iat_avg = report->iat_sum /
2129 				 (double)(report->request_blocks_read);
2130 		asprintf_append(&buf, ", IAT = %.3f/%.3f/%.3f [ms] (min/avg/max)",
2131 				report->iat_min * 1e3, iat_avg * 1e3,
2132 				report->iat_max * 1e3);
2133 	}
2134 
2135 	/* Delay */
2136 	if (report->request_blocks_read) {
2137 		double delay_avg = report->delay_sum /
2138 				   (double)(report->request_blocks_read);
2139 		asprintf_append(&buf, ", delay = %.3f/%.3f/%.3f [ms] (min/avg/max)",
2140 				report->delay_min * 1e3, delay_avg * 1e3,
2141 				report->delay_max * 1e3);
2142 	}
2143 
2144 	/* Fixed sending rate per second was set */
2145 	if (settings->write_rate_str)
2146 		asprintf_append(&buf, ", rate = %s", settings->write_rate_str);
2147 
2148 	/* Socket options */
2149 	if (settings->elcn)
2150 		asprintf_append(&buf, ", ELCN");
2151 	if (settings->cork)
2152 		asprintf_append(&buf, ", TCP_CORK");
2153 	if (settings->pushy)
2154 		asprintf_append(&buf, ", PUSHY");
2155 	if (settings->nonagle)
2156 		asprintf_append(&buf, ", TCP_NODELAY");
2157 	if (settings->mtcp)
2158 		asprintf_append(&buf, ", TCP_MTCP");
2159 	if (settings->dscp)
2160 		asprintf_append(&buf, ", dscp = 0x%02x", settings->dscp);
2161 
2162 	/* Other flow options */
2163 	if (cflow[flow_id].late_connect)
2164 		asprintf_append(&buf, ", late connecting");
2165 	if (cflow[flow_id].shutdown)
2166 		asprintf_append(&buf, ", calling shutdown");
2167 
2168 out:
2169 	print_output("%s\n", buf);
2170 	free(buf);
2171 }
2172 
2173 /**
2174  * Print final report (i.e. summary line) for all configured flows.
2175  */
print_all_final_reports(void)2176 static void print_all_final_reports(void)
2177 {
2178 	for (unsigned id = 0; id < copt.num_flows; id++) {
2179 		print_output("\n");
2180 		foreach(int *i, SOURCE, DESTINATION) {
2181 			print_final_report(id, *i);
2182 			free(cflow[id].final_report[*i]);
2183 		}
2184 	}
2185 }
2186 
2187 /**
2188  * Add the flow endpoint XML RPC data to the Global linked list.
2189  *
2190  * @param[in] XML-RPC connection url
2191  * @param[in] server_name flow endpoints IP address
2192  * @param[in] server_port controller - daemon XML-RPC connection port Nr
2193  * @return rpc_info flow endpoint XML RPC structure data
2194  */
add_flow_endpoint_by_url(const char * server_url,const char * server_name,unsigned short server_port)2195 static struct rpc_info * add_flow_endpoint_by_url(const char* server_url,
2196 					  const char* server_name,
2197 					  unsigned short server_port)
2198 {
2199 	struct rpc_info *flow_rpc_info;
2200 	flow_rpc_info = malloc((sizeof(struct rpc_info)));
2201 
2202 	if (!flow_rpc_info ) {
2203 		logging(LOG_ALERT, "could not allocate memory for flows rpc info");
2204 		return 0;
2205 	}
2206 
2207 	memset(flow_rpc_info, 0, sizeof(struct rpc_info));
2208 
2209 	strcpy(flow_rpc_info->server_url, server_url);
2210 	strcpy(flow_rpc_info->server_name, server_name);
2211 	flow_rpc_info->server_port = server_port;
2212 	fg_list_push_back(&flows_rpc_info, flow_rpc_info);
2213 	return flow_rpc_info;
2214 }
2215 
2216 /**
2217  * Set the flow endpoint XML RPC data for a given server_url.
2218  *
2219  * @param[in] XML-RPC connection url
2220  * @param[in] server_name flow endpoints IP address
2221  * @param[in] server_port controller - daemon XML-RPC connection port Nr
2222  * @return rpc_info flow endpoint XML RPC structure data
2223  */
set_rpc_info(const char * server_url,const char * server_name,unsigned short server_port)2224 static struct rpc_info * set_rpc_info(const char* server_url,
2225 					  const char* server_name,
2226 					  unsigned short server_port)
2227 {
2228 	if(fg_list_size(&flows_rpc_info) == 0)
2229 		return add_flow_endpoint_by_url(server_url,server_name, server_port);
2230 
2231 	/* If we have already stored flow info for this URL return a pointer to it */
2232 	const struct list_node *node = fg_list_front(&flows_rpc_info);
2233 	while (node) {
2234 		struct rpc_info *flow_rpc_info= node->data;
2235 		node = node->next;
2236 
2237 		if (!strcmp(flow_rpc_info->server_url, server_url))
2238 			return flow_rpc_info;
2239 	}
2240 	/* didn't find anything, seems to be a new one */
2241 	return add_flow_endpoint_by_url(server_url,server_name, server_port);
2242 }
2243 
2244 /**
2245  * Parse option for stochastic traffic generation (option -G).
2246  *
2247  * @param[in] params parameter string in the form 'x=(q|p|g):(C|U|E|N|L|P|W):#1:[#2]'
2248  * @param[in] flow_id ID of flow to apply option to
2249  * @param[in] endpoint_id endpoint to apply option to
2250  */
parse_trafgen_option(const char * params,int flow_id,int endpoint_id)2251 static void parse_trafgen_option(const char *params, int flow_id, int endpoint_id)
2252 {
2253 	int rc;
2254 	double param1 = 0, param2 = 0, unused;
2255 	char typechar, distchar;
2256 	enum distribution_t distr = CONSTANT;
2257 
2258 	rc = sscanf(params, "%c:%c:%lf:%lf:%lf", &typechar, &distchar,
2259 		    &param1, &param2, &unused);
2260 	if (rc != 3 && rc != 4)
2261 		PARSE_ERR("flow %i: option -G: malformed traffic generation "
2262 			  "parameters", flow_id);
2263 
2264 	switch (distchar) {
2265 	case 'N':
2266 		distr = NORMAL;
2267 		if (!param1 || !param2)
2268 			PARSE_ERR("flow %i: option -G: normal distribution "
2269 				  "needs two non-zero parameters", flow_id);
2270 		break;
2271 	case 'W':
2272 		distr = WEIBULL;
2273 		if (!param1 || !param2)
2274 			PARSE_ERR("flow %i: option -G: weibull distribution "
2275 				  "needs two non-zero parameters", flow_id);
2276 		break;
2277 	case 'U':
2278 		distr = UNIFORM;
2279 		if  (param1 <= 0 || param2 <= 0 || (param1 > param2))
2280 			PARSE_ERR("flow %i: option -G: uniform distribution "
2281 				  "needs two positive parameters", flow_id);
2282 		break;
2283 	case 'E':
2284 		distr = EXPONENTIAL;
2285 		if (param1 <= 0)
2286 			PARSE_ERR("flow %i: option -G: exponential distribution "
2287 				  "needs one positive parameter", flow_id);
2288 		break;
2289 	case 'P':
2290 		distr = PARETO;
2291 		if (!param1 || !param2)
2292 			PARSE_ERR("flow %i: option -G: pareto distribution "
2293 				  "needs two non-zero parameters", flow_id);
2294 		break;
2295 	case 'L':
2296 		distr = LOGNORMAL;
2297 		if (!param1 || !param2)
2298 			PARSE_ERR("flow %i: option -G: lognormal distribution "
2299 				  "needs two non-zero parameters", flow_id);
2300 		break;
2301 	case 'C':
2302 		distr = CONSTANT;
2303 		if (param1 <= 0)
2304 			PARSE_ERR("flow %i: option -G: constant distribution "
2305 				  "needs one positive parameters", flow_id);
2306 		break;
2307 	default:
2308 		PARSE_ERR("flow %i: option -G: syntax error: %c is not a "
2309 			  "distribution", flow_id, distchar);
2310 		break;
2311 	}
2312 
2313 	switch (typechar) {
2314 	case 'p':
2315 		cflow[flow_id].settings[endpoint_id].response_trafgen_options.distribution = distr;
2316 		cflow[flow_id].settings[endpoint_id].response_trafgen_options.param_one = param1;
2317 		cflow[flow_id].settings[endpoint_id].response_trafgen_options.param_two = param2;
2318 		break;
2319 	case 'q':
2320 		cflow[flow_id].settings[endpoint_id].request_trafgen_options.distribution = distr;
2321 		cflow[flow_id].settings[endpoint_id].request_trafgen_options.param_one = param1;
2322 		cflow[flow_id].settings[endpoint_id].request_trafgen_options.param_two = param2;
2323 		break;
2324 	case 'g':
2325 		cflow[flow_id].settings[endpoint_id].interpacket_gap_trafgen_options.distribution = distr;
2326 		cflow[flow_id].settings[endpoint_id].interpacket_gap_trafgen_options.param_one = param1;
2327 		cflow[flow_id].settings[endpoint_id].interpacket_gap_trafgen_options.param_two = param2;
2328 		break;
2329 	}
2330 
2331 	/* sanity check for max block size */
2332 	foreach(int *i, SOURCE, DESTINATION) {
2333 		if (distr == CONSTANT &&
2334 		    cflow[flow_id].settings[*i].maximum_block_size < param1)
2335 			cflow[flow_id].settings[*i].maximum_block_size = param1;
2336 		if (distr == UNIFORM &&
2337 		    cflow[flow_id].settings[*i].maximum_block_size < param2)
2338 			cflow[flow_id].settings[*i].maximum_block_size = param2;
2339 	}
2340 }
2341 
2342 /**
2343  * Parse argument for option -R, which specifies the rate the endpoint will send.
2344  *
2345  * @param[in] arg argument for option -R in form of #.#(z|k|M|G)(b|B|o)
2346  * @param[in] flow_id ID of flow to apply option to
2347  * @param[in] endpoint_id endpoint to apply option to
2348  */
parse_rate_option(const char * arg,int flow_id,int endpoint_id)2349 static void parse_rate_option(const char *arg, int flow_id, int endpoint_id)
2350 {
2351 	char unit = 0, type = 0;
2352 	double optdouble = 0.0;
2353 	/* last %c for catching wrong input... this is not nice. */
2354 	int rc = sscanf(arg, "%lf%c%c%c",
2355 			&optdouble, &unit, &type, &unit);
2356 	if (rc < 1 || rc > 4)
2357 		PARSE_ERR("flow %i: option -R: malformed rate", flow_id);
2358 
2359 	if (optdouble == 0.0)
2360 		PARSE_ERR("flow %i: option -R: rate of 0", flow_id);
2361 
2362 
2363 	switch (unit) {
2364 	case 0:
2365 	case 'z':
2366 		break;
2367 
2368 	case 'k':
2369 		optdouble *= 1<<10;
2370 		break;
2371 
2372 	case 'M':
2373 		optdouble *= 1<<20;
2374 		break;
2375 
2376 	case 'G':
2377 		optdouble *= 1<<30;
2378 		break;
2379 
2380 	default:
2381 		PARSE_ERR("flow %i: option -R: illegal unit specifier", flow_id);
2382 		break;
2383 	}
2384 
2385 	if (type != 'b' && type != 'B')
2386 		PARSE_ERR("flow %i: option -R: illegal type specifier "
2387 			  "(either 'b' or 'B')", flow_id);
2388 	if (type == 'b')
2389 		optdouble /=  8;
2390 
2391 	if (optdouble > 5e9)
2392 		warnx("rate of flow %d too high", flow_id);
2393 
2394 	cflow[flow_id].settings[endpoint_id].write_rate_str = strdup(arg);
2395 	cflow[flow_id].settings[endpoint_id].write_rate = optdouble;
2396 }
2397 
2398 
2399 
2400 /**
2401  * Parse argument for option -H, which specifies the endpoints of a flow.
2402  *
2403  * @param[in] hostarg argument for option -H in form of HOST[/CONTROL[:PORT]]
2404  *	- HOST: test address where the actual test connection goes to
2405  *	- CONTROL: RPC address, where this program connects to
2406  *	- PORT: port for the control connection
2407  * @param[in] flow_id ID of flow to apply option to
2408  * @param[in] endpoint_id endpoint to apply option to
2409  */
parse_host_option(const char * hostarg,int flow_id,int endpoint_id)2410 static void parse_host_option(const char* hostarg, int flow_id, int endpoint_id)
2411 {
2412 	struct sockaddr_in6 source_in6;
2413 	source_in6.sin6_family = AF_INET6;
2414 	int port = DEFAULT_LISTEN_PORT;
2415 	bool extra_rpc = false;
2416 	bool is_ipv6 = false;
2417 	char *rpc_address, *url = 0, *sepptr = 0;
2418 	char *arg = strdup(hostarg);
2419 	struct flow_endpoint* endpoint = &cflow[flow_id].endpoint[endpoint_id];
2420 
2421 	/* extra RPC address ? */
2422 	sepptr = strchr(arg, '/');
2423 	if (sepptr) {
2424 		*sepptr = '\0';
2425 		rpc_address = sepptr + 1;
2426 		extra_rpc = true;
2427 	} else {
2428 		rpc_address = arg;
2429 	}
2430 
2431 	/* IPv6 Address? */
2432 	if (strchr(arg, ':')) {
2433 		if (inet_pton(AF_INET6, arg, (char*)&source_in6.sin6_addr) <= 0)
2434 			PARSE_ERR("flow %i: invalid IPv6 address '%s' for "
2435 				  "test connection", flow_id, arg);
2436 
2437 		if (!extra_rpc)
2438 			is_ipv6 = true;
2439 	}
2440 
2441 	/* optional dedicated rpc address was supplied and needs to be parsed */
2442 	if (extra_rpc) {
2443 		parse_rpc_address(&rpc_address, &port, &is_ipv6);
2444 		if (is_ipv6 && (inet_pton(AF_INET6, rpc_address,
2445 				(char*)&source_in6.sin6_addr) <= 0))
2446 			PARSE_ERR("flow %i: invalid IPv6 address '%s' for RPC",
2447 				  flow_id, arg);
2448 		if (port < 1 || port > 65535)
2449 			PARSE_ERR("flow %i: invalid port for RPC", flow_id);
2450 	}
2451 
2452 	if (!*arg)
2453 		PARSE_ERR("flow %i: no test host given in argument", flow_id);
2454 
2455 	int rc = 0;
2456 	if (is_ipv6)
2457 		rc = asprintf(&url, "http://[%s]:%d/RPC2", rpc_address, port);
2458 	else
2459 		rc = asprintf(&url, "http://%s:%d/RPC2", rpc_address, port);
2460 
2461 	if (rc == -1)
2462 		critx("could not allocate memory for RPC URL");
2463 
2464 	/* Get flow endpoint server information for each flow */
2465 	endpoint->rpc_info  = set_rpc_info(url, rpc_address, port);
2466 	strcpy(endpoint->test_address, arg);
2467 	free_all(arg, url);
2468 }
2469 
2470 /**
2471  * Parse flow options with endpoint.
2472  *
2473  * @param[in] code the code of the cmdline option
2474  * @param[in] arg the argument of the cmdline option
2475  * @param[in] opt_string contains the real cmdline option string
2476  * @param[in] flow_id ID of flow to apply option to
2477  * @param[in] endpoint_id endpoint to apply option to
2478  */
parse_flow_option_endpoint(int code,const char * arg,const char * opt_string,int flow_id,int endpoint_id)2479 static void parse_flow_option_endpoint(int code, const char* arg,
2480 				       const char* opt_string, int flow_id,
2481 				       int endpoint_id)
2482 {
2483 	int optint = 0;
2484 	double optdouble = 0.0;
2485 
2486 	struct flow_settings* settings = &cflow[flow_id].settings[endpoint_id];
2487 
2488 	switch (code) {
2489 	case 'G':
2490 		parse_trafgen_option(arg, flow_id, endpoint_id);
2491 		break;
2492 	case 'A':
2493 		SHOW_COLUMNS(COL_RTT_MIN, COL_RTT_AVG, COL_RTT_MAX);
2494 		settings->response_trafgen_options.distribution = CONSTANT;
2495 		settings->response_trafgen_options.param_one = MIN_BLOCK_SIZE;
2496 		break;
2497 	case 'B':
2498 		if (sscanf(arg, "%u", &optint) != 1 || optint < 0)
2499 			PARSE_ERR("in flow %i: option %s needs positive integer",
2500 				  flow_id, opt_string);
2501 		settings->requested_send_buffer_size = optint;
2502 		break;
2503 	case 'C':
2504 		settings->flow_control= 1;
2505 		break;
2506 	case 'D':
2507 		if (sscanf(arg, "%x", &optint) != 1 || (optint & ~0x3f))
2508 			PARSE_ERR("in flow %i: option %s service code point "
2509 				  "is malformed", flow_id, opt_string);
2510 		settings->dscp = optint;
2511 		break;
2512 	case 'H':
2513 		parse_host_option(arg, flow_id, endpoint_id);
2514 		break;
2515 	case 'M':
2516 		settings->traffic_dump = 1;
2517 		break;
2518 	case 'O':
2519 		if (!*arg)
2520 			PARSE_ERR("in flow %i: option %s requires a value "
2521 				  "for each endpoint", flow_id, opt_string);
2522 
2523 		if (!strcmp(arg, "TCP_CORK")) {
2524 			settings->cork = 1;
2525 		} else if (!strcmp(arg, "TCP_ELCN")) {
2526 			settings->elcn = 1;
2527 		} else if (!strcmp(arg, "TCP_LCD")) {
2528 			settings->lcd = 1;
2529 		} else if (!strcmp(arg, "TCP_MTCP")) {
2530 			settings->mtcp = 1;
2531 		} else if (!strcmp(arg, "TCP_NODELAY")) {
2532 			settings->nonagle = 1;
2533 		} else if (!strcmp(arg, "ROUTE_RECORD")) {
2534 			settings->route_record = 1;
2535 		/* keep TCP_CONG_MODULE for backward compatibility */
2536 		} else if (!memcmp(arg, "TCP_CONG_MODULE=", 16)) {
2537 			if (strlen(arg + 16) >= sizeof(cflow[0].settings[SOURCE].cc_alg))
2538 				PARSE_ERR("in flow %i: option %s: too large "
2539 					  "string for TCP_CONG_MODULE",
2540 					  flow_id, opt_string);
2541 			strcpy(settings->cc_alg, arg + 16);
2542 		} else if (!memcmp(arg, "TCP_CONGESTION=", 15)) {
2543 			if (strlen(arg + 16) >= sizeof(cflow[0].settings[SOURCE].cc_alg))
2544 				PARSE_ERR("in flow %i: option %s: too large "
2545 					  "string for TCP_CONGESTION",
2546 					  flow_id, opt_string);
2547 			strcpy(settings->cc_alg, arg + 15);
2548 		} else if (!strcmp(arg, "SO_DEBUG")) {
2549 			settings->so_debug = 1;
2550 		} else if (!strcmp(arg, "IP_MTU_DISCOVER")) {
2551 			settings->ipmtudiscover = 1;
2552 		} else {
2553 			PARSE_ERR("in flow %i: option %s: unknown socket "
2554 				  "option or socket option not implemented",
2555 				  flow_id, opt_string);
2556 		}
2557 		break;
2558 	case 'P':
2559 		settings->pushy = 1;
2560 		break;
2561 	case 'R':
2562 		if (!*arg)
2563 			PARSE_ERR("in flow %i: option %s requires a value "
2564 				  "for each given endpoint", flow_id, opt_string);
2565 		parse_rate_option(arg, flow_id, endpoint_id);
2566 		break;
2567 	case 'S':
2568 		if (sscanf(arg, "%u", &optint) != 1 || optint < 0)
2569 			PARSE_ERR("in flow %i: option %s needs positive integer",
2570 				  flow_id, opt_string);
2571 		settings->request_trafgen_options.distribution = CONSTANT;
2572 		settings->request_trafgen_options.param_one = optint;
2573 		for (int id = 0; id < MAX_FLOWS_CONTROLLER; id++) {
2574 			foreach(int *i, SOURCE, DESTINATION) {
2575 				if ((signed)optint >
2576 				    cflow[id].settings[*i].maximum_block_size)
2577 					cflow[id].settings[*i].maximum_block_size =
2578 						(signed)optint;
2579 			}
2580 		}
2581 		break;
2582 	case 'T':
2583 		if (sscanf(arg, "%lf", &optdouble) != 1 || optdouble < 0)
2584 			PARSE_ERR("in flow %i: option %s needs positive number",
2585 				  flow_id, opt_string);
2586 		settings->duration[WRITE] = optdouble;
2587 		break;
2588 	case 'U':
2589 		if (sscanf(arg, "%u", &optint) != 1 || optint < 0)
2590 			PARSE_ERR("in flow %i: option %s needs positive integer",
2591 				  flow_id, opt_string);
2592 		settings->maximum_block_size = optint;
2593 		break;
2594 	case 'W':
2595 		if (sscanf(arg, "%u", &optint) != 1 || optint < 0)
2596 			PARSE_ERR("in flow %i: option %s needs non-negative number",
2597 				  flow_id, opt_string);
2598 		settings->requested_read_buffer_size = optint;
2599 		break;
2600 	case 'Y':
2601 		if (sscanf(arg, "%lf", &optdouble) != 1 || optdouble < 0)
2602 			PARSE_ERR("in flow %i: option %s needs non-negative number",
2603 				  flow_id, opt_string);
2604 		settings->delay[WRITE] = optdouble;
2605 		break;
2606 	}
2607 }
2608 
2609 /**
2610  * Parse flow options without endpoint.
2611  *
2612  * @param[in] code the code of the cmdline option
2613  * @param[in] arg the argument string of the cmdline option
2614  * @param[in] opt_string contains the real cmdline option string
2615  * @param[in] flow_id ID of flow to apply option to
2616  */
parse_flow_option(int code,const char * arg,const char * opt_string,int flow_id)2617 static void parse_flow_option(int code, const char* arg, const char* opt_string,
2618 			      int flow_id)
2619 {
2620 	unsigned optunsigned = 0;
2621 
2622 	switch (code) {
2623 	/* flow options w/o endpoint identifier */
2624 	case 'E':
2625 		cflow[flow_id].byte_counting = 1;
2626 		break;
2627 	case 'I':
2628 		SHOW_COLUMNS(COL_DLY_MIN, COL_DLY_AVG, COL_DLY_MAX);
2629 		break;
2630 	case 'J':
2631 		if (sscanf(arg, "%u", &optunsigned) != 1)
2632 			PARSE_ERR("option %s needs an integer argument",
2633 				  opt_string);
2634 		cflow[flow_id].random_seed = optunsigned;
2635 		break;
2636 	case 'L':
2637 		cflow[flow_id].late_connect = 1;
2638 		break;
2639 	case 'N':
2640 		cflow[flow_id].shutdown = 1;
2641 		break;
2642 	case 'Q':
2643 		cflow[flow_id].summarize_only = 1;
2644 		break;
2645 	}
2646 }
2647 
2648 /**
2649  * Parse argument for option -c to hide/show intermediated interval report
2650  * columns.
2651  *
2652  * @param[in] arg argument for option -c
2653  */
parse_colon_option(const char * arg)2654 static void parse_colon_option(const char *arg)
2655 {
2656 	/* To make it easy (independed of default values), hide all colons */
2657 	HIDE_COLUMNS(COL_BEGIN, COL_END, COL_THROUGH, COL_TRANSAC,
2658 		     COL_BLOCK_REQU, COL_BLOCK_RESP, COL_RTT_MIN, COL_RTT_AVG,
2659 		     COL_RTT_MAX, COL_IAT_MIN, COL_IAT_AVG, COL_IAT_MAX,
2660 		     COL_DLY_MIN, COL_DLY_AVG, COL_DLY_MAX, COL_TCP_CWND,
2661 		     COL_TCP_SSTH, COL_TCP_UACK, COL_TCP_SACK, COL_TCP_LOST,
2662 		     COL_TCP_RETR, COL_TCP_TRET, COL_TCP_FACK, COL_TCP_REOR,
2663 		     COL_TCP_BKOF, COL_TCP_RTT, COL_TCP_RTTVAR, COL_TCP_RTO,
2664 		     COL_TCP_CA_STATE, COL_SMSS, COL_PMTU);
2665 #ifdef DEBUG
2666 	HIDE_COLUMNS(COL_STATUS);
2667 #endif /* DEBUG */
2668 
2669 	/* Set colon visibility according option */
2670 	char *argcpy = strdup(arg);
2671 	for (char *token = strtok(argcpy, ","); token;
2672 	     token = strtok(NULL, ",")) {
2673 		if (!strcmp(token, "interval"))
2674 			SHOW_COLUMNS(COL_BEGIN, COL_END);
2675 		else if (!strcmp(token, "through"))
2676 			SHOW_COLUMNS(COL_THROUGH);
2677 		else if (!strcmp(token, "transac"))
2678 			SHOW_COLUMNS(COL_TRANSAC);
2679 		else if (!strcmp(token, "blocks"))
2680 			SHOW_COLUMNS(COL_BLOCK_REQU, COL_BLOCK_RESP);
2681 		else if (!strcmp(token, "rtt"))
2682 			SHOW_COLUMNS(COL_RTT_MIN, COL_RTT_AVG, COL_RTT_MAX);
2683 		else if (!strcmp(token, "iat"))
2684 			SHOW_COLUMNS(COL_IAT_MIN, COL_IAT_AVG, COL_IAT_MAX);
2685 		else if (!strcmp(token, "delay"))
2686 			SHOW_COLUMNS(COL_DLY_MIN, COL_DLY_AVG, COL_DLY_MAX);
2687 		else if (!strcmp(token, "kernel"))
2688 			SHOW_COLUMNS(COL_TCP_CWND, COL_TCP_SSTH, COL_TCP_UACK,
2689 				     COL_TCP_SACK, COL_TCP_LOST, COL_TCP_RETR,
2690 				     COL_TCP_TRET, COL_TCP_FACK, COL_TCP_REOR,
2691 				     COL_TCP_BKOF, COL_TCP_RTT, COL_TCP_RTTVAR,
2692 				     COL_TCP_RTO, COL_TCP_CA_STATE, COL_SMSS,
2693 				     COL_PMTU);
2694 #ifdef DEBUG
2695 		else if (!strcmp(token, "status"))
2696 			SHOW_COLUMNS(COL_STATUS);
2697 #endif /* DEBUG */
2698 		else
2699 			PARSE_ERR("%s", "malformed option '-c'");
2700 	}
2701 	free(argcpy);
2702 }
2703 
2704 /**
2705  * Parse general controller options given on the cmdline.
2706  *
2707  * @param[in] code the code of the cmdline option
2708  * @param[in] arg the argument string of the cmdline option
2709  * @param[in] opt_string contains the real cmdline option string
2710  */
parse_general_option(int code,const char * arg,const char * opt_string)2711 static void parse_general_option(int code, const char* arg, const char* opt_string)
2712 {
2713 
2714 	switch (code) {
2715 	case 0:
2716 		PARSE_ERR("invalid argument: %s", arg);
2717 	/* general options */
2718 	case 'h':
2719 		if (!arg || !strlen(arg))
2720 			usage(EXIT_SUCCESS);
2721 		else if (!strcmp(arg, "socket"))
2722 			usage_sockopt();
2723 		else if (!strcmp(arg, "traffic"))
2724 			usage_trafgenopt();
2725 		else
2726 			PARSE_ERR("invalid argument '%s' for %s", arg, opt_string);
2727 		break;
2728 	case 'v':
2729 		fprintf(stdout, "%s %s\n%s\n%s\n\n%s\n", progname,
2730 			FLOWGRIND_VERSION, FLOWGRIND_COPYRIGHT,
2731 			FLOWGRIND_COPYING, FLOWGRIND_AUTHORS);
2732 		exit(EXIT_SUCCESS);
2733 
2734 	/* controller options */
2735 	case 'c':
2736 		parse_colon_option(arg);
2737 		break;
2738 #ifdef DEBUG
2739 	case 'd':
2740 		increase_debuglevel();
2741 		break;
2742 #endif /* DEBUG */
2743 	case 'e':
2744 		copt.dump_prefix = strdup(arg);
2745 		break;
2746 	case 'i':
2747 		if (sscanf(arg, "%lf", &copt.reporting_interval) != 1 ||
2748 					copt.reporting_interval <= 0)
2749 			PARSE_ERR("option %s needs a positive number "
2750 				  "(in seconds)", opt_string);
2751 		break;
2752 	case LOG_FILE_OPTION:
2753 		copt.log_to_file = true;
2754 		if (arg)
2755 			log_filename = strdup(arg);
2756 		break;
2757 	case 'm':
2758 		copt.mbyte = true;
2759 		column_info[COL_THROUGH].header.unit = " [MiB/s]";
2760 		break;
2761 	case 'n':
2762 		if (sscanf(arg, "%hd", &copt.num_flows) != 1 ||
2763 			   copt.num_flows > MAX_FLOWS_CONTROLLER)
2764 			PARSE_ERR("option %s (number of flows) must be within "
2765 				  "[1..%d]", opt_string, MAX_FLOWS_CONTROLLER);
2766 		break;
2767 	case 'o':
2768 		copt.clobber = true;
2769 		break;
2770 	case 'p':
2771 		copt.symbolic = false;
2772 		break;
2773 	case 'q':
2774 		copt.log_to_stdout = false;
2775 		break;
2776 	case 's':
2777 		if (!strcmp(arg, "segment"))
2778 			copt.force_unit = SEGMENT_BASED;
2779 		else if (!strcmp(arg, "byte"))
2780 			copt.force_unit = BYTE_BASED;
2781 		else
2782 			PARSE_ERR("invalid argument '%s' for option %s",
2783 				  arg, opt_string);
2784 		break;
2785 	case 'w':
2786 		copt.log_to_file = true;
2787 		break;
2788 	/* unknown option or missing option-argument */
2789 	default:
2790 		PARSE_ERR("uncaught option: %s", arg);
2791 		break;
2792 	}
2793 
2794 }
2795 
2796 /**
2797  * Wrapper function for mutex checking and error message printing.
2798  *
2799  * Defines the cmdline options and distinguishes option types (flow, general, ...)
2800  * and tokenizes flow options which can have several endpoints.
2801  *
2802  * @param[in] ms array of mutex states
2803  * @param[in] context the mutex context of this option (see enum #mutex_contexts)
2804  * @param[in] argind option record index
2805  * @param[in] flow_id ID of the flow to show in error message
2806  */
check_mutex(struct ap_Mutex_state ms[],const enum mutex_context_t context,const int argind,int flow_id)2807 static void check_mutex(struct ap_Mutex_state ms[],
2808 			const enum mutex_context_t context,
2809 			const int argind, int flow_id)
2810 {
2811 	int mutex_index;
2812 	if (context == MUTEX_CONTEXT_CONTROLLER){
2813 		if (ap_set_check_mutex(&parser, &ms[context], argind, &mutex_index))
2814 			PARSE_ERR("Option %s conflicts with option %s",
2815 				ap_opt_string(&parser, argind),
2816 				ap_opt_string(&parser, mutex_index));
2817 	} else {
2818 		if (ap_set_check_mutex(&parser, &ms[context], argind, &mutex_index))
2819 			PARSE_ERR("In flow %i: option %s conflicts with option %s",
2820 				flow_id, ap_opt_string(&parser, argind),
2821 				ap_opt_string(&parser, mutex_index));
2822 	}
2823 }
2824 
2825 /**
2826  * Parse flow options for multiple endpoints.
2827  *
2828  * This iterates through the endpoints given in the argument string
2829  * (e.g. s=#,d=# or b=#).
2830  *
2831  * @param[in] code the code of the cmdline option
2832  * @param[in] arg the argument of the multi-endpoint flow option
2833  * @param[in] opt_string contains the real cmdline option string
2834  * @param[in] ms array of mutex states
2835  * @param[in] argind index of the option
2836  * @param[in] flow_id ID of flow to apply option to
2837  */
parse_multi_endpoint_option(int code,const char * arg,const char * opt_string,struct ap_Mutex_state ms[],int argind,int flow_id)2838 static void parse_multi_endpoint_option(int code, const char* arg,
2839 					const char* opt_string,
2840 					struct ap_Mutex_state ms[], int argind,
2841 					int flow_id)
2842 {
2843 	char *argcpy = strdup(arg);
2844 	for (char *token = strtok(argcpy, ","); token;
2845 	     token = strtok(NULL, ",")) {
2846 
2847 		char type = token[0];
2848 		char* arg;
2849 
2850 		if (token[1] == '=')
2851 			arg = token + 2;
2852 		else
2853 			arg = token + 1;
2854 
2855 		if (type != 's' && type != 'd' && type != 'b')
2856 			PARSE_ERR("Invalid endpoint specifier in Option %s",
2857 				  opt_string);
2858 
2859 		/* check mutex in context of current endpoint */
2860 		if (type == 's' || type == 'b') {
2861 			check_mutex(ms, MUTEX_CONTEXT_SOURCE, argind, flow_id);
2862 			parse_flow_option_endpoint(code, arg, opt_string,
2863 						   flow_id, SOURCE);
2864 		}
2865 		if (type == 'd' || type == 'b') {
2866 			check_mutex(ms, MUTEX_CONTEXT_DESTINATION, argind, flow_id);
2867 			parse_flow_option_endpoint(code, arg, opt_string,
2868 						   flow_id, DESTINATION);
2869 		}
2870 	}
2871 	free(argcpy);
2872 }
2873 
2874 /**
2875  * The main commandline argument parsing function.
2876  *
2877  * Defines the cmdline options and distinguishes option types (flow, general,
2878  * ...) and tokenizes flow options which can have several endpoints.
2879  *
2880  * @param[in] argc number of arguments (as in main())
2881  * @param[in] argv array of argument strings (as in main())
2882  */
parse_cmdline(int argc,char * argv[])2883 static void parse_cmdline(int argc, char *argv[])
2884 {
2885 	int rc = 0;
2886 	int cur_num_flows = 0;
2887 	int current_flow_ids[MAX_FLOWS_CONTROLLER];
2888 	int max_flow_specifier = 0;
2889 	int optint = 0;
2890 
2891 	const struct ap_Option options[] = {
2892 		{'c', "show-colon", ap_yes, OPT_CONTROLLER, 0},
2893 #ifdef DEBUG
2894 		{'d', "debug", ap_no, OPT_CONTROLLER, 0},
2895 #endif /* DEBUG */
2896 		{'e', "dump-prefix", ap_yes, OPT_CONTROLLER, 0},
2897 		{'h', "help", ap_maybe, OPT_CONTROLLER, 0},
2898 		{'i', "report-interval", ap_yes, OPT_CONTROLLER, 0},
2899 		{LOG_FILE_OPTION, "log-file", ap_maybe, OPT_CONTROLLER, 0},
2900 		{'m', 0, ap_no, OPT_CONTROLLER, 0},
2901 		{'n', "flows", ap_yes, OPT_CONTROLLER, 0},
2902 		{'o', 0, ap_no, OPT_CONTROLLER, 0},
2903 		{'p', 0, ap_no, OPT_CONTROLLER, 0},
2904 		{'q', "quiet", ap_no, OPT_CONTROLLER, 0},
2905 		{'s', "tcp-stack", ap_yes, OPT_CONTROLLER, 0},
2906 		{'v', "version", ap_no, OPT_CONTROLLER, 0},
2907 		{'w', 0, ap_no, OPT_CONTROLLER, 0},
2908 		{'A', 0, ap_yes, OPT_FLOW_ENDPOINT, (int[]){1,0}},
2909 		{'B', 0, ap_yes, OPT_FLOW_ENDPOINT, 0},
2910 		{'C', 0, ap_no, OPT_FLOW_ENDPOINT, 0},
2911 		{'D', 0, ap_yes, OPT_FLOW_ENDPOINT, 0},
2912 		{'E', 0, ap_no, OPT_FLOW, 0},
2913 		{'F', 0, ap_yes, OPT_SELECTOR, 0},
2914 		{'G', 0, ap_yes, OPT_FLOW_ENDPOINT, (int[]){1,2,3,0}},
2915 		{'H', 0, ap_yes, OPT_FLOW_ENDPOINT, 0},
2916 		{'I', 0, ap_no, OPT_FLOW, 0},
2917 		{'J', 0, ap_yes, OPT_FLOW, 0},
2918 		{'L', 0, ap_no, OPT_FLOW, 0},
2919 		{'M', 0, ap_yes, OPT_FLOW_ENDPOINT, 0},
2920 		{'N', 0, ap_no, OPT_FLOW, 0},
2921 		{'O', 0, ap_yes, OPT_FLOW_ENDPOINT, 0},
2922 		{'P', 0, ap_yes, OPT_FLOW_ENDPOINT, 0},
2923 		{'Q', 0, ap_no, OPT_FLOW, 0},
2924 		{'R', 0, ap_yes, OPT_FLOW_ENDPOINT, (int[]){2,0}},
2925 		{'S', 0, ap_yes, OPT_FLOW_ENDPOINT, (int[]){3,0}},
2926 		{'T', 0, ap_yes, OPT_FLOW_ENDPOINT, 0},
2927 		{'U', 0, ap_yes, OPT_FLOW_ENDPOINT, 0},
2928 		{'W', 0, ap_yes, OPT_FLOW_ENDPOINT, 0},
2929 		{'Y', 0, ap_yes, OPT_FLOW_ENDPOINT, 0},
2930 		{0, 0, ap_no, 0, 0}
2931 	};
2932 
2933 	if (!ap_init(&parser, argc, (const char* const*) argv, options, 0))
2934 		critx("could not allocate memory for option parser");
2935 	if (ap_error(&parser))
2936 		PARSE_ERR("%s", ap_error(&parser));
2937 
2938 	/* initialize 4 mutex contexts (for SOURCE+DESTINATION+CONTROLLER+BOTH ENDPOINTS) */
2939 	struct ap_Mutex_state ms[4];
2940 	foreach(int *i, MUTEX_CONTEXT_CONTROLLER, MUTEX_CONTEXT_TWO_SIDED,
2941 			MUTEX_CONTEXT_TWO_SIDED, MUTEX_CONTEXT_DESTINATION)
2942 		ap_init_mutex_state(&parser, &ms[*i]);
2943 
2944 	/* if no option -F is given, configure all flows*/
2945 	for (int i = 0; i < MAX_FLOWS_CONTROLLER; i++)
2946 		current_flow_ids[i] = i;
2947 	cur_num_flows = MAX_FLOWS_CONTROLLER;
2948 
2949 	/* parse command line */
2950 	for (int argind = 0; argind < ap_arguments(&parser); argind++) {
2951 		const int code = ap_code(&parser, argind);
2952 		const char *arg = ap_argument(&parser, argind);
2953 		const char *opt_string = ap_opt_string(&parser, argind);
2954 		int tag = ap_option(&parser, argind)->tag;
2955 
2956 		/* distinguish option types by tag first */
2957 		switch (tag) {
2958 		case OPT_CONTROLLER:
2959 			check_mutex(ms, MUTEX_CONTEXT_CONTROLLER, argind, 0);
2960 			parse_general_option(code, arg, opt_string);
2961 			break;
2962 		case OPT_SELECTOR:
2963 			cur_num_flows = 0;
2964 			char *argcpy = strdup(arg);
2965 			for (char *token = strtok(argcpy, ","); token;
2966 			     token = strtok(NULL, ",")) {
2967 				rc = sscanf(token, "%d", &optint);
2968 				if (rc != 1)
2969 					PARSE_ERR("%s", "Malformed flow specifier");
2970 
2971 				/* all flows */
2972 				if (optint == -1) {
2973 					for (int i = 0; i < MAX_FLOWS_CONTROLLER; i++)
2974 						current_flow_ids[i] = i;
2975 					cur_num_flows = MAX_FLOWS_CONTROLLER;
2976 					break;
2977 				}
2978 
2979 				current_flow_ids[cur_num_flows++] = optint;
2980 				ASSIGN_MAX(max_flow_specifier, optint);
2981 			}
2982 			free(argcpy);
2983 			/* reset mutex for each new flow */
2984 			foreach(int *i, MUTEX_CONTEXT_SOURCE,
2985 					MUTEX_CONTEXT_DESTINATION,
2986 					MUTEX_CONTEXT_TWO_SIDED)
2987 				ap_reset_mutex(&ms[*i]);
2988 			break;
2989 		case OPT_FLOW:
2990 			check_mutex(ms, MUTEX_CONTEXT_TWO_SIDED, argind,
2991 				    current_flow_ids[0]);
2992 			for (int i = 0; i < cur_num_flows; i++)
2993 				parse_flow_option(code, arg, opt_string,
2994 						current_flow_ids[i]);
2995 			break;
2996 		case OPT_FLOW_ENDPOINT:
2997 			for (int i = 0; i < cur_num_flows; i++)
2998 				parse_multi_endpoint_option(code, arg,
2999 							    opt_string, ms, argind,
3000 							    current_flow_ids[i]);
3001 			break;
3002 		default:
3003 			PARSE_ERR("%s", "uncaught option tag!");
3004 			break;
3005 		}
3006 	}
3007 
3008 	if (copt.num_flows <= max_flow_specifier)
3009 		PARSE_ERR("%s", "must not specify option for non-existing flow");
3010 
3011 #if 0
3012 	/* Demonstration how to set arbitary socket options. Note that this is
3013 	 * only intended for quickly testing new options without having to
3014 	 * recompile and restart the daemons. To add support for a particular
3015 	 * options in future flowgrind versions it's recommended to implement
3016 	 * them like the other options supported by the -O argument.
3017 	 */
3018 	{
3019 		assert(cflow[0].settings[SOURCE].num_extra_socket_options < MAX_EXTRA_SOCKET_OPTIONS);
3020 		struct extra_socket_options *option = &cflow[0].settings[SOURCE].extra_socket_options[cflow[0].settings[SOURCE].num_extra_socket_options++];
3021 		int v;
3022 
3023 		/* The value of the TCP_NODELAY constant gets passed to the daemons.
3024 		 * If daemons use a different system, constants may be different. In this case use
3025 		 * a value that matches the daemons'. */
3026 		option->optname = TCP_NODELAY; /* or option->optname = 12345; as explained above */
3027 
3028 		option->level = level_ipproto_tcp; /* See extra_socket_option_level enum in common.h */
3029 
3030 		/* Again, value needs to be of correct size for the daemons.
3031 		 * Particular pitfalls can be differences in integer sizes or endianess.
3032 		 */
3033 		assert(sizeof(v) < MAX_EXTRA_SOCKET_OPTION_VALUE_LENGTH);
3034 		option->optlen = sizeof(v);
3035 		memcpy(option->optval, &v, sizeof(v));
3036 	}
3037 #endif /* 0 */
3038 
3039 	for (unsigned short id = 0; id < copt.num_flows; id++) {
3040 		cflow[id].settings[SOURCE].duration[READ] = cflow[id].settings[DESTINATION].duration[WRITE];
3041 		cflow[id].settings[DESTINATION].duration[READ] = cflow[id].settings[SOURCE].duration[WRITE];
3042 		cflow[id].settings[SOURCE].delay[READ] = cflow[id].settings[DESTINATION].delay[WRITE];
3043 		cflow[id].settings[DESTINATION].delay[READ] = cflow[id].settings[SOURCE].delay[WRITE];
3044 
3045 		foreach(int *i, SOURCE, DESTINATION) {
3046 			/* Default to localhost, if no endpoints were set for a flow */
3047 			if (!cflow[id].endpoint[*i].rpc_info) {
3048 				cflow[id].endpoint[*i].rpc_info = set_rpc_info(
3049 					"http://localhost:5999/RPC2", "localhost", DEFAULT_LISTEN_PORT);
3050 			}
3051 		}
3052 	}
3053 
3054 	foreach(int *i, MUTEX_CONTEXT_CONTROLLER, MUTEX_CONTEXT_TWO_SIDED,
3055 			MUTEX_CONTEXT_TWO_SIDED, MUTEX_CONTEXT_DESTINATION)
3056 		ap_free_mutex_state(&ms[*i]);
3057 }
3058 
3059 /**
3060  * Sanity checking flow options.
3061  */
sanity_check(void)3062 static void sanity_check(void)
3063 {
3064 	for (unsigned short id = 0; id < copt.num_flows; id++) {
3065 		DEBUG_MSG(LOG_DEBUG, "sanity checking parameter set of flow %d", id);
3066 		if (cflow[id].settings[DESTINATION].duration[WRITE] > 0 &&
3067 		    cflow[id].late_connect &&
3068 		    cflow[id].settings[DESTINATION].delay[WRITE] <
3069 		    cflow[id].settings[SOURCE].delay[WRITE]) {
3070 			errx("server flow %d starts earlier than client "
3071 			      "flow while late connecting", id);
3072 			exit(EXIT_FAILURE);
3073 		}
3074 		if (cflow[id].settings[SOURCE].delay[WRITE] > 0 &&
3075 		    cflow[id].settings[SOURCE].duration[WRITE] == 0) {
3076 			errx("client flow %d has a delay but no runtime", id);
3077 			exit(EXIT_FAILURE);
3078 		}
3079 		if (cflow[id].settings[DESTINATION].delay[WRITE] > 0 &&
3080 		    cflow[id].settings[DESTINATION].duration[WRITE] == 0) {
3081 			errx("server flow %d has a delay but no runtime", id);
3082 			exit(EXIT_FAILURE);
3083 		}
3084 		if (!cflow[id].settings[DESTINATION].duration[WRITE] &&
3085 		    !cflow[id].settings[SOURCE].duration[WRITE]) {
3086 			errx("server and client flow have both zero runtime "
3087 			      "for flow %d", id);
3088 			exit(EXIT_FAILURE);
3089 		}
3090 
3091 		foreach(int *i, SOURCE, DESTINATION) {
3092 			if (cflow[id].settings[*i].flow_control &&
3093 			    !cflow[id].settings[*i].write_rate_str) {
3094 				errx("flow %d has flow control enabled but no "
3095 				      "rate", id);
3096 				exit(EXIT_FAILURE);
3097 			}
3098 
3099 			if (cflow[id].settings[*i].write_rate &&
3100 			    (cflow[id].settings[*i].write_rate /
3101 			     cflow[id].settings[*i].maximum_block_size) < 1) {
3102 				errx("client block size for flow %u is too big for "
3103 				      "specified rate", id);
3104 				exit(EXIT_FAILURE);
3105 			}
3106 		}
3107 		DEBUG_MSG(LOG_DEBUG, "sanity check parameter set of flow %d completed", id);
3108 	}
3109 }
3110 
main(int argc,char * argv[])3111 int main(int argc, char *argv[])
3112 {
3113 	struct sigaction sa;
3114 	sa.sa_handler = sighandler;
3115 	sa.sa_flags = 0;
3116 	sigemptyset (&sa.sa_mask);
3117 	if (sigaction(SIGINT, &sa, NULL))
3118 		critx("could not set handler for SIGINT");
3119 
3120 	xmlrpc_client *rpc_client = 0;
3121 	xmlrpc_env_init(&rpc_env);
3122 	xmlrpc_client_setup_global_const(&rpc_env);
3123 
3124 	fg_list_init(&flows_rpc_info);
3125 	fg_list_init(&unique_daemons);
3126 
3127 	set_progname(argv[0]);
3128 	init_controller_options();
3129 	init_flow_options();
3130 	parse_cmdline(argc, argv);
3131 	sanity_check();
3132 	open_logfile();
3133 	prepare_xmlrpc_client(&rpc_client);
3134 
3135 	DEBUG_MSG(LOG_WARNING, "check daemons in the flows");
3136 	if (!sigint_caught)
3137 		find_daemon(rpc_client);
3138 
3139 	DEBUG_MSG(LOG_WARNING, "check flowgrindds versions");
3140 	if (!sigint_caught)
3141 		check_version(rpc_client);
3142 
3143 	DEBUG_MSG(LOG_WARNING, "check if flowgrindds are idle");
3144 	if (!sigint_caught)
3145 		check_idle(rpc_client);
3146 
3147 	DEBUG_MSG(LOG_WARNING, "prepare all flows");
3148 	if (!sigint_caught)
3149 		prepare_all_flows(rpc_client);
3150 
3151 	DEBUG_MSG(LOG_WARNING, "print headline");
3152 	if (!sigint_caught)
3153 		print_headline();
3154 
3155 	DEBUG_MSG(LOG_WARNING, "start all flows");
3156 	if (!sigint_caught)
3157 		start_all_flows(rpc_client);
3158 
3159 	DEBUG_MSG(LOG_WARNING, "close all flows");
3160 	close_all_flows();
3161 
3162 	DEBUG_MSG(LOG_WARNING, "print all final report");
3163 	fetch_reports(rpc_client);
3164 	print_all_final_reports();
3165 
3166 	fg_list_clear(&flows_rpc_info);
3167 	fg_list_clear(&unique_daemons);
3168 
3169 	close_logfile();
3170 
3171 	xmlrpc_client_destroy(rpc_client);
3172 	xmlrpc_env_clean(&rpc_env);
3173 	xmlrpc_client_teardown_global_const();
3174 
3175 	ap_free(&parser);
3176 
3177 	DEBUG_MSG(LOG_WARNING, "bye");
3178 }
3179