1 /**
2  * @file fg_rpc_server.c
3  * @brief Flowgrindd rpcserver implementation
4  */
5 
6 /*
7  * Copyright (C) 2013-2014 Alexander Zimmermann <alexander.zimmermann@netapp.com>
8  * Copyright (C) 2010-2014 Arnd Hannemann <arnd@arndnet.de>
9  * Copyright (C) 2010-2013 Christian Samsel <christian.samsel@rwth-aachen.de>
10  * Copyright (C) 2009 Tim Kosse <tim.kosse@gmx.de>
11  * Copyright (C) 2007-2008 Daniel Schaffrath <daniel.schaffrath@mac.com>
12  *
13  * This file is part of Flowgrind.
14  *
15  * Flowgrind is free software: you can redistribute it and/or modify
16  * it under the terms of the GNU General Public License as published by
17  * the Free Software Foundation, either version 3 of the License, or
18  * (at your option) any later version.
19  *
20  * Flowgrind is distributed in the hope that it will be useful,
21  * but WITHOUT ANY WARRANTY; without even the implied warranty of
22  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
23  * GNU General Public License for more details.
24  *
25  * You should have received a copy of the GNU General Public License
26  * along with Flowgrind.  If not, see <http://www.gnu.org/licenses/>.
27  *
28  */
29 
30 #ifdef HAVE_CONFIG_H
31 #include "config.h"
32 #endif /* HAVE_CONFIG_H */
33 
34 #include <sys/utsname.h>
35 /* for log levels */
36 #include <syslog.h>
37 
38 #include "common.h"
39 #include "daemon.h"
40 #include "fg_log.h"
41 #include "fg_error.h"
42 #include "fg_definitions.h"
43 #include "debug.h"
44 #include "fg_rpc_server.h"
45 
46 /**
47  * Prepare data connection for source endpoint.
48  *
49  * Flowgrind rpc server decode the information from the controller XML-RPC and
50  * construct the request data structure to add flow in the source daemon. The
51  * request is dispatched to source daemon. The source daemon execute the request
52  * and send back the executed result in the request reply to the flowgrind rpc
53  * server. Flowgrind rpc server then encode the request reply information from
54  * the daemon and send back the data to the flowgrind controller through
55  * XML-RPC connection
56  *
57  * @param[in,out] env XML-RPC environment object
58  * @param[in,out] param_array XML-RPC value
59  * @param[in,out] user_data unused arg
60  * return xmlrpc_value XML-RPC value
61  */
add_flow_source(xmlrpc_env * const env,xmlrpc_value * const param_array,void * const user_data)62 static xmlrpc_value * add_flow_source(xmlrpc_env * const env,
63 		   xmlrpc_value * const param_array,
64 		   void * const user_data)
65 {
66 	UNUSED_ARGUMENT(user_data);
67 
68 	int rc, i;
69 	xmlrpc_value *ret = 0;
70 	char* destination_host = 0;
71 	char* cc_alg = 0;
72 	char* bind_address = 0;
73 	xmlrpc_value* extra_options = 0;
74 
75 	struct flow_settings settings;
76 	struct flow_source_settings source_settings;
77 
78 	struct request_add_flow_source* request = 0;
79 
80 	DEBUG_MSG(LOG_WARNING, "method add_flow_source called");
81 
82 	/* Parse our argument array. */
83 	xmlrpc_decompose_value(env, param_array,
84 		"("
85 		"{s:s,*}"
86 		"{s:i,*}"
87 		"{s:d,s:d,s:d,s:d,s:d,*}"
88 		"{s:i,s:i,*}"
89 		"{s:i,*}"
90 		"{s:b,s:b,s:b,s:b,s:b,*}"
91 		"{s:i,s:i,*}"
92 		"{s:i,s:d,s:d,*}" /* request */
93 		"{s:i,s:d,s:d,*}" /* response */
94 		"{s:i,s:d,s:d,*}" /* interpacket_gap */
95 		"{s:b,s:b,s:i,s:i,*}"
96 		"{s:s,*}"
97 		"{s:i,s:i,s:i,s:i,s:i,*}"
98 		"{s:s,*}" /* for LIBPCAP dumps */
99 		"{s:i,s:A,*}"
100 		"{s:s,s:i,s:i,*}"
101 		")",
102 
103 		/* general settings */
104 		"bind_address", &bind_address,
105 
106 		"flow_id", &settings.flow_id,
107 
108 		"write_delay", &settings.delay[WRITE],
109 		"write_duration", &settings.duration[WRITE],
110 		"read_delay", &settings.delay[READ],
111 		"read_duration", &settings.duration[READ],
112 		"reporting_interval", &settings.reporting_interval,
113 
114 		"requested_send_buffer_size", &settings.requested_send_buffer_size,
115 		"requested_read_buffer_size", &settings.requested_read_buffer_size,
116 
117 		"maximum_block_size", &settings.maximum_block_size,
118 
119 		"traffic_dump", &settings.traffic_dump,
120 		"so_debug", &settings.so_debug,
121 		"route_record", &settings.route_record,
122 		"pushy", &settings.pushy,
123 		"shutdown", &settings.shutdown,
124 
125 		"write_rate", &settings.write_rate,
126 		"random_seed",&settings.random_seed,
127 
128 		"traffic_generation_request_distribution", &settings.request_trafgen_options.distribution,
129 		"traffic_generation_request_param_one", &settings.request_trafgen_options.param_one,
130 		"traffic_generation_request_param_two", &settings.request_trafgen_options.param_two,
131 
132 		"traffic_generation_response_distribution", &settings.response_trafgen_options.distribution,
133 		"traffic_generation_response_param_one", &settings.response_trafgen_options.param_one,
134 		"traffic_generation_response_param_two", &settings.response_trafgen_options.param_two,
135 
136 		"traffic_generation_gap_distribution", &settings.interpacket_gap_trafgen_options.distribution,
137 		"traffic_generation_gap_param_one", &settings.interpacket_gap_trafgen_options.param_one,
138 		"traffic_generation_gap_param_two", &settings.interpacket_gap_trafgen_options.param_two,
139 
140 		"flow_control", &settings.flow_control,
141 		"byte_counting", &settings.byte_counting,
142 		"cork", &settings.cork,
143 		"nonagle", &settings.nonagle,
144 
145 		"cc_alg", &cc_alg,
146 
147 		"elcn", &settings.elcn,
148 		"lcd",  &settings.lcd,
149 		"mtcp", &settings.mtcp,
150 		"dscp", &settings.dscp,
151 		"ipmtudiscover", &settings.ipmtudiscover,
152 		"dump_prefix", &dump_prefix,
153 		"num_extra_socket_options", &settings.num_extra_socket_options,
154 		"extra_socket_options", &extra_options,
155 
156 		/* source settings */
157 		"destination_address", &destination_host,
158 		"destination_port", &source_settings.destination_port,
159 		"late_connect", &source_settings.late_connect);
160 
161 	if (env->fault_occurred)
162 		goto cleanup;
163 
164 #ifndef HAVE_LIBPCAP
165 	if (settings.traffic_dump)
166 		XMLRPC_FAIL(env, XMLRPC_TYPE_ERROR, "Daemon was asked to dump traffic, but wasn't compiled with libpcap support");
167 #endif
168 
169 	/* Check for sanity */
170 	if (strlen(bind_address) >= sizeof(settings.bind_address) - 1 ||
171 		settings.delay[WRITE] < 0 || settings.duration[WRITE] < 0 ||
172 		settings.delay[READ] < 0 || settings.duration[READ] < 0 ||
173 		settings.requested_send_buffer_size < 0 || settings.requested_read_buffer_size < 0 ||
174 		settings.maximum_block_size < MIN_BLOCK_SIZE ||
175 		strlen(destination_host) >= sizeof(source_settings.destination_host) - 1||
176 		source_settings.destination_port <= 0 || source_settings.destination_port > 65535 ||
177 		strlen(cc_alg) > TCP_CA_NAME_MAX ||
178 		settings.num_extra_socket_options < 0 || settings.num_extra_socket_options > MAX_EXTRA_SOCKET_OPTIONS ||
179 		xmlrpc_array_size(env, extra_options) != settings.num_extra_socket_options ||
180 		settings.dscp < 0 || settings.dscp > 255 ||
181 		settings.write_rate < 0 ||
182 		settings.reporting_interval < 0) {
183 		XMLRPC_FAIL(env, XMLRPC_TYPE_ERROR, "Flow settings incorrect");
184 	}
185 
186 	/* Parse extra socket options */
187 	for (i = 0; i < settings.num_extra_socket_options; i++) {
188 
189 		const unsigned char* buffer = 0;
190 		size_t len;
191 		xmlrpc_value *option, *level = 0, *optname = 0, *value = 0;
192 		xmlrpc_array_read_item(env, extra_options, i, &option);
193 
194 		if (!env->fault_occurred)
195 			xmlrpc_struct_read_value(env, option, "level", &level);
196 		if (!env->fault_occurred)
197 			xmlrpc_struct_read_value(env, option, "optname", &optname);
198 		if (!env->fault_occurred)
199 			xmlrpc_struct_read_value(env, option, "value", &value);
200 		if (!env->fault_occurred)
201 			xmlrpc_read_int(env, level, &settings.extra_socket_options[i].level);
202 		if (!env->fault_occurred)
203 			xmlrpc_read_int(env, optname, &settings.extra_socket_options[i].optname);
204 		if (!env->fault_occurred)
205 			xmlrpc_read_base64(env, value, &len, &buffer);
206 		if (level)
207 			xmlrpc_DECREF(level);
208 		if (optname)
209 			xmlrpc_DECREF(optname);
210 		if (value)
211 			xmlrpc_DECREF(value);
212 		if (!env->fault_occurred) {
213 			if (len > MAX_EXTRA_SOCKET_OPTION_VALUE_LENGTH) {
214 				free((void *)buffer);
215 				XMLRPC_FAIL(env, XMLRPC_TYPE_ERROR, "Too long extra socket option length");
216 			}
217 			settings.extra_socket_options[i].optlen = len;
218 			memcpy(settings.extra_socket_options[i].optval, buffer, len);
219 			free((void *)buffer);
220 		}
221 		if (env->fault_occurred)
222 			goto cleanup;
223 	}
224 
225 	strcpy(source_settings.destination_host, destination_host);
226 	strcpy(settings.cc_alg, cc_alg);
227 	strcpy(settings.bind_address, bind_address);
228 
229 	request = malloc(sizeof(struct request_add_flow_source));
230 	request->settings = settings;
231 	request->source_settings = source_settings;
232 	rc = dispatch_request((struct request*)request, REQUEST_ADD_SOURCE);
233 
234 	if (rc == -1)
235 		XMLRPC_FAIL(env, XMLRPC_INTERNAL_ERROR, request->r.error); /* goto cleanup on failure */
236 
237 	/* Return our result. */
238 	ret = xmlrpc_build_value(env, "{s:i,s:s,s:i,s:i}",
239 		"flow_id", request->flow_id,
240 		"cc_alg", request->cc_alg,
241 		"real_send_buffer_size", request->real_send_buffer_size,
242 		"real_read_buffer_size", request->real_read_buffer_size);
243 
244 cleanup:
245 	if (request)
246 		free_all(request->r.error, request);
247 	free_all(destination_host, cc_alg, bind_address);
248 
249 	if (extra_options)
250 		xmlrpc_DECREF(extra_options);
251 
252 	if (env->fault_occurred)
253 		logging(LOG_WARNING, "method add_flow_source failed: %s",
254 			env->fault_string);
255 	else
256 		DEBUG_MSG(LOG_WARNING, "method add_flow_source successful");
257 
258 	return ret;
259 }
260 
261 /**
262  * Prepare data connection for destination endpoint.
263  *
264  * Flowgrind rpc server decode the information from the controller XML-RPC and
265  * construct the request data structure to add flow in the destination daemon.
266  * The request is dispatched to destination daemon. The destination daemon execute
267  * the request and send back the executed result in the request reply to the
268  * flowgrind rpc server. Flowgrind rpc server then encode the request reply
269  * information from the daemon and send back the data to the flowgrind
270  * controller through XML-RPC connection
271  *
272  * @param[in,out] env XML-RPC environment object
273  * @param[in,out] param_array XML-RPC value
274  * @param[in,out] user_data unused arg
275  * return xmlrpc_value XML-RPC value
276  */
add_flow_destination(xmlrpc_env * const env,xmlrpc_value * const param_array,void * const user_data)277 static xmlrpc_value * add_flow_destination(xmlrpc_env * const env,
278 		   xmlrpc_value * const param_array,
279 		   void * const user_data)
280 {
281 	UNUSED_ARGUMENT(user_data);
282 
283 	int rc, i;
284 	xmlrpc_value *ret = 0;
285 	char* cc_alg = 0;
286 	char* bind_address = 0;
287 	xmlrpc_value* extra_options = 0;
288 
289 	struct flow_settings settings;
290 
291 	struct request_add_flow_destination* request = 0;
292 
293 	DEBUG_MSG(LOG_WARNING, "method add_flow_destination called");
294 
295 	/* Parse our argument array. */
296 	xmlrpc_decompose_value(env, param_array,
297 		"("
298 		"{s:s,*}"
299 		"{s:i,*}"
300 		"{s:d,s:d,s:d,s:d,s:d,*}"
301 		"{s:i,s:i,*}"
302 		"{s:i,*}"
303 		"{s:b,s:b,s:b,s:b,s:b,*}"
304 		"{s:i,s:i,*}"
305 		"{s:i,s:d,s:d,*}" /* request */
306 		"{s:i,s:d,s:d,*}" /* response */
307 		"{s:i,s:d,s:d,*}" /* interpacket_gap */
308 		"{s:b,s:b,s:i,s:i,*}"
309 		"{s:s,*}"
310 		"{s:i,s:i,s:i,s:i,s:i,*}"
311 		"{s:s,*}" /* For libpcap dumps */
312 		"{s:i,s:A,*}"
313 		")",
314 
315 		/* general settings */
316 		"bind_address", &bind_address,
317 
318 		"flow_id", &settings.flow_id,
319 
320 		"write_delay", &settings.delay[WRITE],
321 		"write_duration", &settings.duration[WRITE],
322 		"read_delay", &settings.delay[READ],
323 		"read_duration", &settings.duration[READ],
324 		"reporting_interval", &settings.reporting_interval,
325 
326 		"requested_send_buffer_size", &settings.requested_send_buffer_size,
327 		"requested_read_buffer_size", &settings.requested_read_buffer_size,
328 
329 		"maximum_block_size", &settings.maximum_block_size,
330 
331 		"traffic_dump", &settings.traffic_dump,
332 		"so_debug", &settings.so_debug,
333 		"route_record", &settings.route_record,
334 		"pushy", &settings.pushy,
335 		"shutdown", &settings.shutdown,
336 
337 		"write_rate", &settings.write_rate,
338 		"random_seed",&settings.random_seed,
339 
340 		"traffic_generation_request_distribution", &settings.request_trafgen_options.distribution,
341 		"traffic_generation_request_param_one", &settings.request_trafgen_options.param_one,
342 		"traffic_generation_request_param_two", &settings.request_trafgen_options.param_two,
343 
344 		"traffic_generation_response_distribution", &settings.response_trafgen_options.distribution,
345 		"traffic_generation_response_param_one", &settings.response_trafgen_options.param_one,
346 		"traffic_generation_response_param_two", &settings.response_trafgen_options.param_two,
347 
348 		"traffic_generation_gap_distribution", &settings.interpacket_gap_trafgen_options.distribution,
349 		"traffic_generation_gap_param_one", &settings.interpacket_gap_trafgen_options.param_one,
350 		"traffic_generation_gap_param_two", &settings.interpacket_gap_trafgen_options.param_two,
351 
352 		"flow_control", &settings.flow_control,
353 		"byte_counting", &settings.byte_counting,
354 		"cork", &settings.cork,
355 		"nonagle", &settings.nonagle,
356 
357 		"cc_alg", &cc_alg,
358 
359 		"elcn", &settings.elcn,
360 		"lcd", &settings.lcd,
361 		"mtcp", &settings.mtcp,
362 		"dscp", &settings.dscp,
363 		"ipmtudiscover", &settings.ipmtudiscover,
364 		"dump_prefix", &dump_prefix,
365 		"num_extra_socket_options", &settings.num_extra_socket_options,
366 		"extra_socket_options", &extra_options);
367 
368 	if (env->fault_occurred)
369 		goto cleanup;
370 
371 #ifndef HAVE_LIBPCAP
372 	if (settings.traffic_dump)
373 		XMLRPC_FAIL(env, XMLRPC_TYPE_ERROR, "Daemon was asked to dump traffic, but wasn't compiled with libpcap support");
374 #endif
375 
376 	/* Check for sanity */
377 	if (strlen(bind_address) >= sizeof(settings.bind_address) - 1 ||
378 		settings.delay[WRITE] < 0 || settings.duration[WRITE] < 0 ||
379 		settings.delay[READ] < 0 || settings.duration[READ] < 0 ||
380 		settings.requested_send_buffer_size < 0 || settings.requested_read_buffer_size < 0 ||
381 		settings.maximum_block_size < MIN_BLOCK_SIZE ||
382 		settings.write_rate < 0 ||
383 		strlen(cc_alg) > TCP_CA_NAME_MAX ||
384 		settings.num_extra_socket_options < 0 || settings.num_extra_socket_options > MAX_EXTRA_SOCKET_OPTIONS ||
385 		xmlrpc_array_size(env, extra_options) != settings.num_extra_socket_options) {
386 		XMLRPC_FAIL(env, XMLRPC_TYPE_ERROR, "Flow settings incorrect");
387 	}
388 
389 	/* Parse extra socket options */
390 	for (i = 0; i < settings.num_extra_socket_options; i++) {
391 
392 		const unsigned char* buffer = 0;
393 		size_t len;
394 		xmlrpc_value *option, *level = 0, *optname = 0, *value = 0;
395 		xmlrpc_array_read_item(env, extra_options, i, &option);
396 
397 		if (!env->fault_occurred)
398 			xmlrpc_struct_read_value(env, option, "level", &level);
399 		if (!env->fault_occurred)
400 			xmlrpc_struct_read_value(env, option, "optname", &optname);
401 		if (!env->fault_occurred)
402 			xmlrpc_struct_read_value(env, option, "value", &value);
403 		if (!env->fault_occurred)
404 			xmlrpc_read_int(env, level, &settings.extra_socket_options[i].level);
405 		if (!env->fault_occurred)
406 			xmlrpc_read_int(env, optname, &settings.extra_socket_options[i].optname);
407 		if (!env->fault_occurred)
408 			xmlrpc_read_base64(env, value, &len, &buffer);
409 		if (level)
410 			xmlrpc_DECREF(level);
411 		if (optname)
412 			xmlrpc_DECREF(optname);
413 		if (value)
414 			xmlrpc_DECREF(value);
415 		if (!env->fault_occurred) {
416 			if (len > MAX_EXTRA_SOCKET_OPTION_VALUE_LENGTH) {
417 				free((void *)buffer);
418 				XMLRPC_FAIL(env, XMLRPC_TYPE_ERROR, "Too long extra socket option length");
419 			}
420 			settings.extra_socket_options[i].optlen = len;
421 			memcpy(settings.extra_socket_options[i].optval, buffer, len);
422 			free((void *)buffer);
423 		}
424 		if (env->fault_occurred)
425 			goto cleanup;
426 	}
427 
428 	strcpy(settings.cc_alg, cc_alg);
429 	strcpy(settings.bind_address, bind_address);
430 	DEBUG_MSG(LOG_WARNING, "bind_address=%s", bind_address);
431 	request = malloc(sizeof(struct request_add_flow_destination));
432 	request->settings = settings;
433 	rc = dispatch_request((struct request*)request, REQUEST_ADD_DESTINATION);
434 
435 	if (rc == -1)
436 		XMLRPC_FAIL(env, XMLRPC_INTERNAL_ERROR, request->r.error); /* goto cleanup on failure */
437 
438 	/* Return our result. */
439 	ret = xmlrpc_build_value(env, "{s:i,s:i,s:i,s:i}",
440 		"flow_id", request->flow_id,
441 		"listen_data_port", request->listen_data_port,
442 		"real_listen_send_buffer_size", request->real_listen_send_buffer_size,
443 		"real_listen_read_buffer_size", request->real_listen_read_buffer_size);
444 
445 cleanup:
446 	if (request)
447 		free_all(request->r.error, request);
448 	free_all(cc_alg, bind_address);
449 
450 	if (extra_options)
451 		xmlrpc_DECREF(extra_options);
452 
453 	if (env->fault_occurred)
454 		logging(LOG_WARNING, "method add_flow_destination failed: %s",
455 			env->fault_string);
456 	else
457 		DEBUG_MSG(LOG_WARNING, "method add_flow_destination successful");
458 
459 	return ret;
460 }
461 
/**
 * Start the flows.
 *
 * The start timestamp is decoded from the XML-RPC parameters, stored in a
 * start-flows request and the request is handed to dispatch_request().
 *
 * @param[in,out] env XML-RPC environment object, carries the fault on failure
 * @param[in] param_array XML-RPC array holding a struct with "start_timestamp"
 * @param[in] user_data unused arg
 * @return XML-RPC integer 0 on success; 0 (null pointer) if a fault occurred
 */
static xmlrpc_value * start_flows(xmlrpc_env * const env,
		   xmlrpc_value * const param_array,
		   void * const user_data)
{
	UNUSED_ARGUMENT(user_data);

	int rc;
	xmlrpc_value *ret = 0;
	int start_timestamp;
	struct request_start_flows *request = 0;

	DEBUG_MSG(LOG_WARNING, "method start_flows called");

	/* Parse our argument array. */
	xmlrpc_decompose_value(env, param_array, "({s:i,*})",

		/* general settings */
		"start_timestamp", &start_timestamp);

	if (env->fault_occurred)
		goto cleanup;

	request = calloc(1, sizeof(*request));
	if (!request)
		XMLRPC_FAIL(env, XMLRPC_INTERNAL_ERROR, "Could not allocate memory for request");
	request->start_timestamp = start_timestamp;
	rc = dispatch_request((struct request*)request, REQUEST_START_FLOWS);

	if (rc == -1)
		XMLRPC_FAIL(env, XMLRPC_INTERNAL_ERROR, request->r.error); /* goto cleanup on failure */

	/* Return our result. */
	ret = xmlrpc_build_value(env, "i", 0);

cleanup:
	if (request)
		free_all(request->r.error, request);

	if (env->fault_occurred)
		logging(LOG_WARNING, "method start_flows failed: %s",
			env->fault_string);
	else
		DEBUG_MSG(LOG_WARNING, "method start_flows successful");

	return ret;
}
506 
507 /**
508  * To get the reports from the daemon.
509  *
510  * Flowgrind rpc server get the reports from the daemon and encode the information
511  * and send the report data to the controller through XML-RPC connection
512  *
513  * @param[in,out] env XML-RPC environment object
514  * @param[in,out] param_array unused arg
515  * @param[in,out] user_data unused arg
516  * return xmlrpc_value XML-RPC value
517  */
method_get_reports(xmlrpc_env * const env,xmlrpc_value * const param_array,void * const user_data)518 static xmlrpc_value * method_get_reports(xmlrpc_env * const env,
519 		   xmlrpc_value * const param_array,
520 		   void * const user_data)
521 {
522 	int has_more;
523 	xmlrpc_value *ret = 0, *item = 0;
524 
525 	UNUSED_ARGUMENT(param_array);
526 	UNUSED_ARGUMENT(user_data);
527 
528 	DEBUG_MSG(LOG_NOTICE, "method get_reports called");
529 
530 	struct report *report = get_reports(&has_more);
531 
532 	ret = xmlrpc_array_new(env);
533 
534 	/* Add information if there's more reports pending */
535 	item = xmlrpc_int_new(env, has_more);
536 	xmlrpc_array_append_item(env, ret, item);
537 	xmlrpc_DECREF(item);
538 
539 	while (report) {
540 		xmlrpc_value *rv = xmlrpc_build_value(env,
541 			"("
542 			"{s:i,s:i,s:i,s:i,s:i,s:i,s:i}" /* Report data & timeval */
543 			"{s:i,s:i,s:i,s:i}" /* bytes */
544 			"{s:i,s:i,s:i,s:i}" /* block counts */
545 			"{s:d,s:d,s:d,s:d,s:d,s:d,s:d,s:d,s:d}" /* RTT, IAT, Delay */
546 			"{s:i,s:i}" /* MTU */
547 			"{s:i,s:i,s:i,s:i,s:i}" /* TCP info */
548 			"{s:i,s:i,s:i,s:i,s:i}" /* ...      */
549 			"{s:i,s:i,s:i,s:i,s:i}" /* ...      */
550 			"{s:i}"
551 			")",
552 
553 			"id", report->id,
554 			"endpoint",report->endpoint,
555 			"type", report->type,
556 			"begin_tv_sec", (int)report->begin.tv_sec,
557 			"begin_tv_nsec", (int)report->begin.tv_nsec,
558 			"end_tv_sec", (int)report->end.tv_sec,
559 			"end_tv_nsec", (int)report->end.tv_nsec,
560 
561 			"bytes_read_high", (int32_t)(report->bytes_read >> 32),
562 			"bytes_read_low", (int32_t)(report->bytes_read & 0xFFFFFFFF),
563 			"bytes_written_high", (int32_t)(report->bytes_written >> 32),
564 			"bytes_written_low", (int32_t)(report->bytes_written & 0xFFFFFFFF),
565 
566 			"request_blocks_read", report->request_blocks_read,
567 			"request_blocks_written", report->request_blocks_written,
568 			"response_blocks_read", report->response_blocks_read,
569 			"response_blocks_written", report->response_blocks_written,
570 
571 			"rtt_min", report->rtt_min,
572 			"rtt_max", report->rtt_max,
573 			"rtt_sum", report->rtt_sum,
574 			"iat_min", report->iat_min,
575 			"iat_max", report->iat_max,
576 			"iat_sum", report->iat_sum,
577 			"delay_min", report->delay_min,
578 			"delay_max", report->delay_max,
579 			"delay_sum", report->delay_sum,
580 
581 			"pmtu", report->pmtu,
582 			"imtu", report->imtu,
583 
584 /* Currently, not all members of the TCP_INFO socket option are used by the
585  * FreeBSD kernel. Other members will contain zeroes */
586 			"tcpi_snd_cwnd", (int)report->tcp_info.tcpi_snd_cwnd,
587 			"tcpi_snd_ssthresh", (int)report->tcp_info.tcpi_snd_ssthresh,
588 			"tcpi_unacked", (int)report->tcp_info.tcpi_unacked,
589 			"tcpi_sacked", (int)report->tcp_info.tcpi_sacked,
590 			"tcpi_lost", (int)report->tcp_info.tcpi_lost,
591 			"tcpi_retrans", (int)report->tcp_info.tcpi_retrans,
592 			"tcpi_retransmits", (int)report->tcp_info.tcpi_retransmits,
593 			"tcpi_fackets", (int)report->tcp_info.tcpi_fackets,
594 			"tcpi_reordering", (int)report->tcp_info.tcpi_reordering,
595 			"tcpi_rtt", (int)report->tcp_info.tcpi_rtt,
596 			"tcpi_rttvar", (int)report->tcp_info.tcpi_rttvar,
597 			"tcpi_rto", (int)report->tcp_info.tcpi_rto,
598 			"tcpi_backoff", (int)report->tcp_info.tcpi_backoff,
599 			"tcpi_ca_state", (int)report->tcp_info.tcpi_ca_state,
600 			"tcpi_snd_mss", (int)report->tcp_info.tcpi_snd_mss,
601 
602 			"status", report->status
603 		);
604 
605 		xmlrpc_array_append_item(env, ret, rv);
606 
607 		xmlrpc_DECREF(rv);
608 
609 		struct report *next = report->next;
610 		free(report);
611 		report = next;
612 	}
613 
614 	if (env->fault_occurred)
615 		logging(LOG_WARNING, "method get_reports failed: %s",
616 			env->fault_string);
617 	else
618 		DEBUG_MSG(LOG_WARNING, "method get_reports successful");
619 
620 	return ret;
621 }
622 
/**
 * Stop a single flow.
 *
 * The flow id is decoded from the XML-RPC parameters, stored in a stop-flow
 * request and the request is handed to dispatch_request().
 *
 * @param[in,out] env XML-RPC environment object, carries the fault on failure
 * @param[in] param_array XML-RPC array holding a struct with "flow_id"
 * @param[in] user_data unused arg
 * @return empty XML-RPC array on success; 0 if a fault occurred
 */
static xmlrpc_value * method_stop_flow(xmlrpc_env * const env,
		   xmlrpc_value * const param_array,
		   void * const user_data)
{
	UNUSED_ARGUMENT(user_data);

	int rc;
	xmlrpc_value *ret = 0;
	int flow_id;
	struct request_stop_flow *request = 0;

	DEBUG_MSG(LOG_WARNING, "method stop_flow called");

	/* Parse our argument array. */
	xmlrpc_decompose_value(env, param_array, "({s:i,*})",

		/* flow id */
		"flow_id", &flow_id);

	if (env->fault_occurred)
		goto cleanup;

	request = calloc(1, sizeof(*request));
	if (!request)
		XMLRPC_FAIL(env, XMLRPC_INTERNAL_ERROR, "Could not allocate memory for request");
	request->flow_id = flow_id;
	rc = dispatch_request((struct request*)request, REQUEST_STOP_FLOW);

	if (rc == -1)
		XMLRPC_FAIL(env, XMLRPC_INTERNAL_ERROR, request->r.error); /* goto cleanup on failure */

	/* Return our result. */
	ret = xmlrpc_build_value(env, "()");

cleanup:
	if (request)
		free_all(request->r.error, request);

	if (env->fault_occurred)
		logging(LOG_WARNING, "method stop_flow failed: %s",
			env->fault_string);
	else
		DEBUG_MSG(LOG_WARNING, "method stop_flow successful");

	return ret;
}
667 
668 /* This method returns version information of flowgrindd and OS as an xmlrpc struct */
method_get_version(xmlrpc_env * const env,xmlrpc_value * const param_array,void * const user_data)669 static xmlrpc_value * method_get_version(xmlrpc_env * const env,
670 		   xmlrpc_value * const param_array,
671 		   void * const user_data)
672 {
673 	UNUSED_ARGUMENT(param_array);
674 	UNUSED_ARGUMENT(user_data);
675 	struct utsname buf;
676 
677 	xmlrpc_value *ret = 0;
678 
679 	DEBUG_MSG(LOG_WARNING, "method get_version called");
680 
681 	if (uname(&buf)) {
682 		logging(LOG_WARNING, "uname() failed %s", strerror(errno));
683 		exit(1);
684 	}
685 
686 	ret = xmlrpc_build_value(env, "{s:s,s:i,s:s,s:s}",
687 				 "version", FLOWGRIND_VERSION,
688 				 "api_version", FLOWGRIND_API_VERSION,
689 				 "os_name", buf.sysname,
690 				 "os_release", buf.release);
691 
692 	if (env->fault_occurred)
693 		logging(LOG_WARNING, "method get_version failed: %s",
694 			env->fault_string);
695 	else
696 		DEBUG_MSG(LOG_WARNING, "method get_version successful");
697 
698 	return ret;
699 }
700 
701 /* This method returns the number of flows and if actual test has started */
method_get_status(xmlrpc_env * const env,xmlrpc_value * const param_array,void * const user_data)702 static xmlrpc_value * method_get_status(xmlrpc_env * const env,
703 		   xmlrpc_value * const param_array,
704 		   void * const user_data)
705 {
706 	UNUSED_ARGUMENT(param_array);
707 	UNUSED_ARGUMENT(user_data);
708 
709 	int rc;
710 	xmlrpc_value *ret = 0;
711 	struct request_get_status *request = 0;
712 
713 	DEBUG_MSG(LOG_WARNING, "method get_status called");
714 
715 	request = malloc(sizeof(struct request_get_status));
716 	rc = dispatch_request((struct request*)request, REQUEST_GET_STATUS);
717 
718 	if (rc == -1)
719 		XMLRPC_FAIL(env, XMLRPC_INTERNAL_ERROR, request->r.error); /* goto cleanup on failure */
720 
721 	/* Return our result. */
722 	ret = xmlrpc_build_value(env, "{s:i,s:i}",
723 		"started", request->started,
724 		"num_flows", request->num_flows);
725 
726 cleanup:
727 	if (request)
728 		free_all(request->r.error, request);
729 
730 	if (env->fault_occurred)
731 		logging(LOG_WARNING, "method get_status failed: %s",
732 			env->fault_string);
733 	else
734 		DEBUG_MSG(LOG_WARNING, "method get_status successful");
735 
736 	return ret;
737 }
738 
739 /**
740  * To get the daemons UUID
741  *
742  * Flowgrind rpc server dispatch the request to get the daemon UUID based on the
743  * randomness. After getting these information flowgrind rpc server encode
744  * the information and send back the details to the controller through the
745  * XML-RPC connection.
746  *
747  * @param[in,out] env XML-RPC environment object
748  * @param[in.out] param_array unused arg
749  * @param[in,out] user_data unused arg
750  * return xmlrpc_value XML-RPC value
751  */
method_get_uuid(xmlrpc_env * const env,xmlrpc_value * const param_array,void * const user_data)752 static xmlrpc_value * method_get_uuid(xmlrpc_env * const env,
753 		   xmlrpc_value * const param_array,
754 		   void * const user_data)
755 {
756 	UNUSED_ARGUMENT(param_array);
757 	UNUSED_ARGUMENT(user_data);
758 
759 	DEBUG_MSG(LOG_WARNING, "Method get_uuid called");
760 
761 	xmlrpc_value *ret = 0;
762 	struct request_get_uuid *request = malloc(sizeof(struct request_get_uuid));
763 	int rc = dispatch_request((struct request*)request, REQUEST_GET_UUID);
764 
765 	if (rc == -1)
766 		XMLRPC_FAIL(env, XMLRPC_INTERNAL_ERROR, request->r.error); /* goto cleanup on failure */
767 
768 	/* Return our result. */
769 	ret = xmlrpc_build_value(env, "{s:s}", "server_uuid", request->server_uuid);
770 
771 cleanup:
772 	if (request)
773 		free_all(request->r.error, request);
774 
775 	if (env->fault_occurred)
776 		logging(LOG_WARNING, "Method get_uuid failed: %s", env->fault_string);
777 	else
778 		DEBUG_MSG(LOG_WARNING, "Method get_uuid successful");
779 
780 	return ret;
781 }
782 
/**
 * Creates the listen socket for the xmlrpc server.
 *
 * The addresses returned by getaddrinfo() are tried in order and the first
 * socket that can be bound is returned. The socket is only bound here,
 * listen() is not called in this function.
 *
 * @param[in] bind_addr address passed as node to getaddrinfo()
 * @param[in] port TCP port to bind to
 * @return bound socket descriptor, or -1 on failure
 */
static int bind_rpc_server(char *bind_addr, unsigned port) {
	int rc;
	int fd = -1;
	int optval;
	struct addrinfo hints, *res, *ressave;
	char tmp_port[100];

	memset(&hints, 0, sizeof(hints));
	hints.ai_flags = AI_PASSIVE | AI_NUMERICSERV;
	hints.ai_family = AF_UNSPEC;
	hints.ai_socktype = SOCK_STREAM;
	snprintf(tmp_port, sizeof(tmp_port), "%u", port);

	if ((rc = getaddrinfo(bind_addr, tmp_port,
				&hints, &ressave)) != 0) {
		critx( "Failed to find address to bind rpc_server: %s\n",
			gai_strerror(rc));
		return -1;
	}

	/* try to bind the first succeeding socket of
	   the returned addresses (libxmlrpc only supports one fd)
	*/
	for (res = ressave; res != NULL; res = res->ai_next) {
		fd = socket(res->ai_family, res->ai_socktype,
			    res->ai_protocol);
		if (fd < 0)
			continue;
		/* ignore old client connections in TIME_WAIT */
		optval = 1;
		setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval));
		/* Disable Nagle algorithm to reduce latency */
		setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &optval, sizeof(optval));

		if (bind(fd, res->ai_addr, res->ai_addrlen) == 0)
			break;

		close(fd);
		fd = -1;
	}

	if (fd < 0) {
		/* report first, as in the original order, then release the list */
		crit("failed to bind RPC listen socket");
		freeaddrinfo(ressave);
		return -1;
	}

	/* The address list is no longer needed once a socket is bound */
	freeaddrinfo(ressave);

	return fd;
}
833 
834 /* Initializes the xmlrpc server and registers exported methods */
init_rpc_server(struct fg_rpc_server * server,char * rpc_bind_addr,unsigned port)835 void init_rpc_server(struct fg_rpc_server *server, char *rpc_bind_addr, unsigned port)
836 {
837 	xmlrpc_registry * registryP;
838 	xmlrpc_env *env = &(server->env);
839 	memset(&(server->parms), 0, sizeof(server->parms));
840 
841 	xmlrpc_env_init(env);
842 	registryP = xmlrpc_registry_new(env);
843 
844 	xmlrpc_registry_add_method(env, registryP, NULL, "add_flow_destination", &add_flow_destination, NULL);
845 	xmlrpc_registry_add_method(env, registryP, NULL, "add_flow_source", &add_flow_source, NULL);
846 	xmlrpc_registry_add_method(env, registryP, NULL, "start_flows", &start_flows, NULL);
847 	xmlrpc_registry_add_method(env, registryP, NULL, "get_reports", &method_get_reports, NULL);
848 	xmlrpc_registry_add_method(env, registryP, NULL, "stop_flow", &method_stop_flow, NULL);
849 	xmlrpc_registry_add_method(env, registryP, NULL, "get_version", &method_get_version, NULL);
850 	xmlrpc_registry_add_method(env, registryP, NULL, "get_status", &method_get_status, NULL);
851 	xmlrpc_registry_add_method(env, registryP, NULL, "get_uuid", &method_get_uuid, NULL);
852 
853 	/* In the modern form of the Abyss API, we supply parameters in memory
854 	   like a normal API.  We select the modern form by setting
855 	   config_file_name to NULL:
856 	*/
857 	server->parms.config_file_name	= NULL;
858 	server->parms.registryP		= registryP;
859 	server->parms.socket_bound	= 1;
860 	server->parms.log_file_name	= NULL; /*"/tmp/xmlrpc_log";*/
861 
862 	/* Increase HTTP keep-alive duration. Using defaults the amount of
863 	 * sockets in TIME_WAIT state would become too high.
864 	 */
865 	server->parms.keepalive_timeout = 60;
866 	server->parms.keepalive_max_conn = 1000;
867 
868 	/* Disable introspection */
869 	server->parms.dont_advertise = 1;
870 
871 	logging(LOG_NOTICE, "running XML-RPC server on port %u", port);
872 	printf("Running XML-RPC server...\n");
873 
874 	server->parms.socket_handle = bind_rpc_server(rpc_bind_addr, port);
875 }
876 
877 /* Enters the XMLRPC Server main loop */
run_rpc_server(struct fg_rpc_server * server)878 void run_rpc_server(struct fg_rpc_server *server)
879 {
880 	xmlrpc_env *env = &(server->env);
881 	xmlrpc_server_abyss(env, &(server->parms), XMLRPC_APSIZE(socket_handle));
882 
883 	if (env->fault_occurred)
884 		logging(LOG_ALERT, "XML-RPC Fault: %s (%d)", env->fault_string,
885 			env->fault_code);
886 	/* xmlrpc_server_abyss() never returns */
887 }
888 
889