1 /**
2  * client.c -- thrulay library, client API implementation.
3  *
4  * Written by Stanislav Shalunov, http://www.internet2.edu/~shalunov/
5  *            Bernhard Lutzmann, belu@users.sf.net
6  *            Federico Montesino Pouzols, fedemp@altern.org
7  *
8  * @(#) $Id: client.c,v 1.1.2.13 2006/08/20 18:06:19 fedemp Exp $
9  *
10  * Copyright 2003, 2006 Internet2.
11  * Legal conditions are in file LICENSE
12  * (MD5 = ecfa50d1b0bfbb81b658c810d0476a52).
13  */
14 
15 #ifdef HAVE_CONFIG_H
16 #include <config.h>
17 #endif
18 
19 #ifdef HAVE_STDINT_H
20 #include <stdint.h>
21 #endif
22 #include <stdio.h>
23 #include <stdlib.h>
24 #include <limits.h>
25 #include <unistd.h>
26 #include <sys/types.h>
27 #include <assert.h>
28 #include <signal.h>
29 #include <sys/param.h>
30 #include <time.h>
31 #include <sys/time.h>
32 #include <fcntl.h>
33 #include <string.h>
34 #include <strings.h>
35 #include <math.h>
36 #include <errno.h>
37 #ifndef WIN32
38 #ifdef HAVE_NETDB_H
39 #include <netdb.h>
40 #endif
41 #include <sys/socket.h>
42 #include <netinet/in.h>
43 #include <netinet/in_systm.h>	/* need this under FreeBSD for
44 				 * <netinet/ip.h> */
45 #include <netinet/ip.h>
46 #include <netinet/tcp.h>
47 #include <arpa/inet.h>
48 #else
49 #define _WIN32_WINNT 0x0501   /* Will only work in Windows XP or later */
50 #include <winsock2.h>
51 #include <ws2tcpip.h>
52 #define sleep(t)    _sleep(t)
53 #define close(s) closesocket(s)  /* If close is only used with sockets*/
54 #endif /* ndef WIN32 */
55 
56 #include "client.h"
57 #include "reporting.h"
58 #include "rcs.h"
59 #include "util.h"
60 
61 #define THRULAY_GREET		THRULAY_VERSION "+"
62 
63 #define DEFAULT_PORT		5003
64 #define UDP_PORT		5003
65 #define TRY_UDP_PORTS		1000
66 
67 #define IPV4_HEADER_SIZE	20
68 #define IPV6_HEADER_SIZE	40
69 #define UDP_HEADER_SIZE		8
70 #define UDP_PAYLOAD_SIZE	24
71 
72 #ifndef SOL_IP
73 #ifdef IPPROTO_IP
74 #define SOL_IP			IPPROTO_IP
75 #endif
76 #endif
77 
78 #ifndef SOL_TCP
79 #ifdef IPPROTO_TCP
80 #define SOL_TCP			IPPROTO_TCP
81 #endif
82 #endif
83 
84 /* IP_MTU not defined in <bits/in.h> under Linux */
85 #if defined (__LINUX__) && !defined(IP_MTU)
86 #define IP_MTU 14
87 #endif
88 
89 #if defined (__SOLARIS__)
90 #define RANDOM_MAX		4294967295UL	/* 2**32-1 */
91 #elif defined (__DARWIN__)
92 #define RANDOM_MAX		LONG_MAX	/* Darwin */
93 #else
94 #define RANDOM_MAX		RAND_MAX	/* Linux, FreeBSD, Windows */
95 #endif
96 
97 RCS_ID("@(#) $Id: client.c,v 1.1.2.13 2006/08/20 18:06:19 fedemp Exp $")
98 
/* TCP test API. */
int
thrulay_tcp_init(void);

int
thrulay_tcp_init_id(int id);

void
thrulay_tcp_exit(void);

void
thrulay_tcp_exit_id(int id);

int
thrulay_tcp_start(void);

void
thrulay_tcp_stop(void);

int
thrulay_tcp_report(void);

int
thrulay_tcp_report_id(int id);

void
thrulay_tcp_report_final(void);

void
thrulay_tcp_report_final_id(int id);

/* UDP test API. */
int
thrulay_udp_init(void);

void
thrulay_udp_exit(void);

int
thrulay_udp_start(void);

int
thrulay_udp_report_final(void);

/* Statistics */

/* Quantile sequence indices.  Stream `id' owns two sequences: 2*id for the
   per-interval reports and 2*id+1 for its final report.  The argument is
   parenthesized so that expressions such as (id + 1) expand correctly. */
#define STREAM_PER_INTERVAL_QUANTILE_SEQ(id)  (2 * (id))
#define STREAM_FINAL_QUANTILE_SEQ(id)         (2 * (id) + 1)

/* Get how many quantile sequences are required to keep statistics for
   a given number of streams. */
int
required_quantile_seqs(int num_streams);

int
tcp_stats_init(void);

/* Timer */
int
timer_start(void);

int
timer_stop(void);

int
timer_check(void);

int
timer_report(struct timeval *now);

void
timer_end(struct timeval *now);
169 
170 /* Client options */
171 static thrulay_opt_t thrulay_opt;
172 
173 static fd_set rfds_orig, wfds_orig;
174 static int maxfd = 0;
175 static char *block;
176 static int stop_test = 0;
177 static int server_block_size;
178 static int local_window, server_window;
179 static int mtu, mss;
180 
181 /* UDP test */
182 static unsigned int client_port, server_port;
183 static unsigned int packet_size;
184 static unsigned int protocol_rate;	/* In packets per 1000 seconds. */
185 static int tcp_sock, udp_sock;
186 static uint64_t npackets;
187 static struct sockaddr *server = NULL;
188 static struct sockaddr *udp_destination = NULL;
189 static socklen_t udp_destination_len;
190 
191 /* Stream information */
192 static struct _stream {
193 	/* Connection socket */
194 	int sock;
195 	/* Counters for tracking send/recv progress with non-blocking I/O */
196 	size_t wcount;
197 	size_t rcount;
198 } stream[STREAMS_MAX];
199 
200 /* Statistics information (per stream) */
201 static struct _stat {
202 	/* Block counter */
203 	unsigned int blocks_since_first;
204 	unsigned int blocks_since_last;
205 
206 	/* RTT */
207 	double min_rtt_since_first;
208 	double min_rtt_since_last;
209 	double max_rtt_since_first;
210 	double max_rtt_since_last;
211 	double tot_rtt_since_first;
212 	double tot_rtt_since_last;
213 } stats[STREAMS_MAX];
214 
215 /* Timer information */
216 static struct _timer {
217 	struct timeval start;
218 	struct timeval stop;
219 	struct timeval next;
220 	struct timeval last;
221 	double runtime;
222 } timer;
223 
/*
 * Human-readable messages for the client error codes.  Entry i describes
 * error code -i, so the order of this table must match the negative codes
 * returned throughout this file.  Be very careful with error code numbers!
 */
static const char *const thrulay_client_error_s[] = {
	"No error", /* 0 */
	"gettimeofday(): failed", /* 1 */
	"could not initialize UDP test", /* 2 */
	"getaddrinfo(): failed for multicast group", /* 3 */
	"malloc(): failed", /* 4 */
	"different local window", /* 5 */
	"gettimeofday() failed, unable to start timer", /* 6 */
	"gettimeofday() failed, unable to process new timestamp", /* 7 */
	"gettimeofday() failed, unable to print stream stats.", /* 8 */
	"getaddrinfo(): failed for server name resolution while initializing a TCP test", /* 9 */
	"getaddrinfo(): failed for server name resolution while initializing an UDP test", /* 10 */
	"could not establish connection to server", /* 11 */
	"could not read server banner", /* 12 */
	"not a thrulay server responded", /* 13 */
	"could not read rejection reason", /* 14 */
	"stop (connection rejected)", /* 15 */
	"could not send session proposal", /* 16 */
	"could not read session response", /* 17 */
	"server closed connection after proposal", /* 18 */
	"could not send terminating message", /* 19 */
	"could not recv session results", /* 20 */
	"server rejected our UDP proposal", /* 21 */
	"malformed session response from server", /* 22 */
	"socket address family not supported", /* 23 */
	"socket address family not supported for multicast", /* 24 */
	"nanosleep() failed", /* 25 */
	"sending UDP packet failed",  /* 26 */
	"server gave block size less than MIN_BLOCK", /* 27 */
	"server gave block size too large", /* 28 */
	"server gave ridiculously small window", /* 29 */
	"different server window", /* 30 */
	"different server block size", /* 31 */
	"different MSS", /* 32 */
	"different MTU", /* 33 */
	"select(): failed", /* 34 */
	"WSAStartup failed while initializing client", /* 35 */
	"error in quantile computation" /* 36 */
};

/* Largest valid (absolute) error code.  Derived from the table size so the
   bound and the table cannot drift apart. */
static const int max_thrulay_client_error =
	(int)(sizeof(thrulay_client_error_s) /
	      sizeof(thrulay_client_error_s[0])) - 1;

/*
 * Map a client error code (0 or negative) to its message.
 * Returns NULL for codes outside [-max_thrulay_client_error, 0].
 */
const char *
thrulay_client_strerror(int errorcode)
{
	/* Range check first, so -errorcode is never evaluated for INT_MIN. */
	if (errorcode > 0 || errorcode < -max_thrulay_client_error)
		return NULL;
	return thrulay_client_error_s[-errorcode];
}
276 
277 /* Set default options. */
278 void
thrulay_client_options_init(thrulay_opt_t * opt)279 thrulay_client_options_init (thrulay_opt_t *opt)
280 {
281 	if(NULL == opt)
282 		return;
283 
284 	opt->server_name = NULL;
285 	opt->num_streams = 1;
286 	opt->test_duration = 60;
287 	opt->reporting_interval = 1;
288 	opt->reporting_verbosity = 0;
289 	opt->window = 4194304;
290 	opt->block_size = 0;
291 	opt->port = DEFAULT_PORT;
292 	opt->rate = 0;
293 	opt->dscp = 0;
294 	opt->busywait = 0;
295 	opt->ttl = 1;
296 	opt->mcast_group = NULL;
297 }
298 
299 int
tcp_stats_init(void)300 tcp_stats_init (void)
301 {
302 	int rc, id;
303 
304 	for (id = 0; id < thrulay_opt.num_streams; id++)
305 	{
306 		/* Blocks */
307 		stats[id].blocks_since_first = 0;
308 		stats[id].blocks_since_last = 0;
309 
310 		/* RTT */
311 		stats[id].min_rtt_since_first = 1000.0;
312 		stats[id].min_rtt_since_last = 1000.0;
313 		stats[id].max_rtt_since_first = -1000.0;
314 		stats[id].max_rtt_since_last = -1000.0;
315 		stats[id].tot_rtt_since_first = 0.0;
316 		stats[id].tot_rtt_since_last = 0.0;
317 	}
318 
319 	rc = quantile_init(required_quantile_seqs(thrulay_opt.num_streams),
320 			   QUANTILE_EPS, 1024 * 1024);
321 	if (-1 == rc) {
322 		return -4;
323 	}
324 
325 	return 0;
326 }
327 
/* Tear down the quantile sequences set up by tcp_stats_init(). */
void
tcp_stats_exit(void)
{
	quantile_exit();
}
334 
/*
 * Number of quantile sequences needed for `num_streams' streams.  Every
 * stream owns two (interval report and final report), and one extra global
 * sequence is appended for the overall report.  A single stream needs no
 * extra sequence, because its final sequence doubles as the global one.
 */
int
required_quantile_seqs(int num_streams)
{
	if (num_streams != 1)
		return 2 * num_streams + 1;

	return 2;
}
352 
353 /* Process new timestamp. */
354 int
new_timestamp(int id,struct timeval * tv)355 new_timestamp(int id, struct timeval *tv)
356 {
357 	struct timeval this;
358 	double relative;
359 	int rc;
360 
361 	if (tsc_gettimeofday(&this) == -1) {
362 		perror("gettimeofday");
363 		return -7;
364 	}
365 	normalize_tv(&this);
366 
367 	relative = time_diff(tv, &this);
368 	if (relative < 0) {
369 		error(ERR_WARNING, "negative delay, ignoring block");
370 		return 0;
371 	}
372 
373 	/* quantile sequence for stream interval report. */
374 	rc = quantile_value_checkin(STREAM_PER_INTERVAL_QUANTILE_SEQ(id),
375 				    relative);
376 	if (rc < 0)
377 		return -36;
378 	/* quantile sequence for stream final report. */
379 	rc = quantile_value_checkin(STREAM_FINAL_QUANTILE_SEQ(id), relative);
380 	if (rc < 0)
381 		return -36;
382 	/* If there is more than one stream, we use an additional
383 	   global sequence for the global final report. */
384 	if (thrulay_opt.num_streams > 1) {
385 		rc = quantile_value_checkin(2 * thrulay_opt.num_streams,
386 					    relative);
387 		if (rc < 0)
388 			return -36;
389 	}
390 
391 	/* Update statistics for stream. */
392 	stats[id].blocks_since_first++;
393 	stats[id].blocks_since_last++;
394 	if (stats[id].min_rtt_since_first > relative)
395 		stats[id].min_rtt_since_first = relative;
396 	if (stats[id].min_rtt_since_last > relative)
397 		stats[id].min_rtt_since_last = relative;
398 	if (stats[id].max_rtt_since_first < relative)
399 		stats[id].max_rtt_since_first = relative;
400 	if (stats[id].max_rtt_since_last < relative)
401 		stats[id].max_rtt_since_last = relative;
402 	stats[id].tot_rtt_since_first += relative;
403 	stats[id].tot_rtt_since_last += relative;
404 
405 	return 0;
406 }
407 
408 /* If test duration is over, stops TCP test. */
409 void
timer_end(struct timeval * now)410 timer_end (struct timeval *now)
411 {
412 	if (now->tv_sec > timer.stop.tv_sec
413 			|| (now->tv_sec == timer.stop.tv_sec
414 				&& now->tv_usec >= timer.stop.tv_usec)) {
415 		thrulay_tcp_stop();
416 	}
417 }
418 
419 /* If progress report should be displayed does so and updates timer values. */
420 int
timer_report(struct timeval * now)421 timer_report (struct timeval *now)
422 {
423 	int rc;
424 
425 	if (now->tv_sec > timer.next.tv_sec
426 			|| (now->tv_sec == timer.next.tv_sec
427 				&& now->tv_usec >= timer.next.tv_usec)) {
428 		rc = thrulay_tcp_report();
429 		if (rc < 0)
430 			return rc;
431 
432 		timer.last.tv_sec = now->tv_sec;
433 		timer.last.tv_usec = now->tv_usec;
434 
435 		while (timer.next.tv_sec < now->tv_sec
436 				|| (timer.next.tv_sec == now->tv_sec
437 					&& timer.next.tv_usec <= now->tv_usec))
438 			timer.next.tv_sec += thrulay_opt.reporting_interval;
439 	}
440 
441 	return 0;
442 }
443 
444 int
timer_check(void)445 timer_check (void)
446 {
447 	int rc;
448 	struct timeval this;
449 
450 	if (tsc_gettimeofday(&this) == -1) {
451 		perror("gettimeofday");
452 		return -1;
453 	}
454 	normalize_tv(&this);
455 
456 	if (0 != thrulay_opt.reporting_interval) {
457 		rc = timer_report(&this);
458 		if (rc < 0)
459 			return rc;
460 	}
461 	timer_end(&this);
462 
463 	return 0;
464 }
465 
466 /* Stop timer. Calculates runtime and saves this to `timer.runtime'. */
467 int
timer_stop(void)468 timer_stop (void)
469 {
470 	struct timeval this;
471 
472 	if (tsc_gettimeofday(&this) == -1) {
473 		perror("gettimeofday");
474 		return -1;
475 	}
476 	normalize_tv(&this);
477 
478 	/* Set final runtime. */
479 	timer.runtime = time_diff(&timer.start, &this);
480 
481 	return 0;
482 }
483 
484 /* Start timer. This saves the starting time to `timer.start', saves time when
485  * next progress report should be displayed in `timer.next' and time when
486  * test should stop in `timer.stop'. This are all timeval structures. */
487 int
timer_start(void)488 timer_start (void)
489 {
490 	if (tsc_gettimeofday(&timer.start) == -1) {
491 		perror("gettimeofday");
492 		return -6;
493 	}
494 	normalize_tv(&timer.start);
495 
496 	timer.stop.tv_sec = timer.start.tv_sec + thrulay_opt.test_duration;
497 	timer.stop.tv_usec = timer.start.tv_usec;
498 
499 	if (0 != thrulay_opt.reporting_interval) {
500 		/* if intermediate reports not disabled */
501 		timer.last.tv_sec = timer.start.tv_sec;
502 		timer.last.tv_usec = timer.start.tv_usec;
503 
504 		timer.next.tv_sec = timer.start.tv_sec +
505 			thrulay_opt.reporting_interval;
506 		timer.next.tv_usec = timer.start.tv_usec;
507 	} else {
508 		/* if disabled. Unnecessary, just to keep consistency */
509 		timer.last.tv_sec = timer.stop.tv_sec + 1;
510 		timer.last.tv_usec = 0;
511 
512 		timer.next.tv_sec = timer.stop.tv_sec + 1;
513 		timer.next.tv_usec = 0;
514 	}
515 
516 	return 0;
517 }
518 
519 void
thrulay_tcp_report_final_id(int id)520 thrulay_tcp_report_final_id(int id)
521 {
522 	int rc, quantile_seq;
523 	double quantile_25, quantile_50, quantile_75;
524 
525 	if (stats[id].blocks_since_first == 0) {
526 		/* This stream was not very active :) */
527 		if (thrulay_opt.reporting_verbosity > 0) {
528 			printf("#(%2d) %8.3f %8.3f %8.3f %8.3f %8.3f %8.3f "
529 			       "%8.3f %8.3f\n",
530 			       id, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0);
531 		} else {
532 			printf("#(%2d) %8.3f %8.3f %8.3f %8.3f %8.3f %8.3f\n",
533 			       id, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0);
534 		}
535 		return;
536 	}
537 
538 	if (stats[id].blocks_since_first > 3 ) {
539 		quantile_seq = STREAM_FINAL_QUANTILE_SEQ(id);
540 		/* Finish stream final sequence. */
541 		rc = quantile_finish(quantile_seq);
542 		/* Get results. */
543 		quantile_output(quantile_seq, stats[id].blocks_since_first,
544 				0.25, &quantile_25);
545 		quantile_output(quantile_seq, stats[id].blocks_since_first,
546 				0.50, &quantile_50);
547 		quantile_output(quantile_seq, stats[id].blocks_since_first,
548 				0.75, &quantile_75);
549 	} else {
550 		if (1 == stats[id].blocks_since_first) {
551 			quantile_25 = quantile_50 = quantile_75 =
552 				stats[id].min_rtt_since_first;
553 		} else if (2 == stats[id].blocks_since_first) {
554 			quantile_25 = quantile_50 =
555 				stats[id].min_rtt_since_first;
556 			quantile_75 =
557 				stats[id].max_rtt_since_first;
558 		}  else {
559 			quantile_25 = stats[id].min_rtt_since_first;
560 			quantile_50 = stats[id].tot_rtt_since_first -
561 				stats[id].max_rtt_since_first -
562 				stats[id].min_rtt_since_first;
563 			quantile_75 = stats[id].max_rtt_since_first;
564 		}
565 	}
566 
567 	printf("#(%2d) %8.3f %8.3f %8.3f %8.3f %8.3f",
568 	       id,
569 	       0.0,
570 	       timer.runtime,
571 	       (double)stats[id].blocks_since_first *
572 	       (double)server_block_size * 8.0 / 1000000.0 /
573 	       timer.runtime,
574 	       1000.0 * quantile_50,                  /* delay */
575 	       1000.0 * (quantile_75 - quantile_25)   /* jitter */
576 	       );
577 
578 	/* Verbose output shows min., avg. and max. */
579 	if (thrulay_opt.reporting_verbosity > 0) {
580 		printf(" %8.3f %8.3f %8.3f\n",
581 		       stats[id].min_rtt_since_first * 1000.0,
582 		       stats[id].tot_rtt_since_first * 1000.0 /
583 		       (double)stats[id].blocks_since_first,
584 		       stats[id].max_rtt_since_first * 1000.0);
585 	} else {
586 		printf("\n");
587 	}
588 }
589 
590 
591 void
thrulay_tcp_report_final(void)592 thrulay_tcp_report_final(void)
593 {
594 	double mbs = 0.0;
595 	double min_rtt = 1000.0;
596 	double max_rtt = -1000.0;
597 	double tot_rtt = 0.0;
598 	double avg_rtt_sum = 0.0;
599 	uint64_t total_blocks = 0;
600 	double quantile_25, quantile_50, quantile_75;
601 	int id, gseq;
602 
603 
604 	/* If just one stream, no need to compute any more. */
605 	if (thrulay_opt.num_streams > 1) {
606 		for (id = 0; id < thrulay_opt.num_streams; id++)
607 			thrulay_tcp_report_final_id(id);
608 	}
609 
610 	/* Now calculate global statistics. */
611 	for (id = 0; id < thrulay_opt.num_streams; id++) {
612 		/* Total number of blocks */
613 		total_blocks += stats[id].blocks_since_first;
614 
615 		/* Calculate throughput of all streams together. */
616 		mbs += (double)stats[id].blocks_since_first *
617 			(double)server_block_size * 8.0 / 1000000.0 /
618 			timer.runtime;
619 
620 		/* Calculate minimum RTT */
621 		min_rtt = (min_rtt < stats[id].min_rtt_since_first ?
622 			   min_rtt : stats[id].min_rtt_since_first);
623 
624 		/* Calculate maximum RTT */
625 		max_rtt = (stats[id].max_rtt_since_first < max_rtt ?
626 			   max_rtt : stats[id].max_rtt_since_first);
627 
628 		/* Calculate sum of average RTT */
629 		if (stats[id].blocks_since_first != 0) {
630 			tot_rtt += stats[id].tot_rtt_since_first;
631 			avg_rtt_sum += stats[id].tot_rtt_since_first *
632 				1000.0 /
633 				(double)stats[id].blocks_since_first;
634 		}
635 	}
636 
637 	/* Finish global sequence. */
638 	if (thrulay_opt.num_streams > 1 ) {
639 		gseq = 2 * thrulay_opt.num_streams;
640 	} else {
641 		/* Just 1 stream, so the global sequence is the same
642 		   as the stream total sequence. */
643 		gseq = 1;
644 	}
645 
646 	if (total_blocks > 3) {
647 		quantile_finish(gseq);
648 		/* Get global quantiles. */
649 		quantile_output(gseq, total_blocks, 0.25, &quantile_25);
650 		quantile_output(gseq, total_blocks, 0.50, &quantile_50);
651 		quantile_output(gseq, total_blocks, 0.75, &quantile_75);
652 	} else {
653 		if (1 == total_blocks) {
654 			quantile_25 = quantile_50 = quantile_75 = min_rtt;
655 		} else if (2 == total_blocks) {
656 			quantile_25 = quantile_50 = min_rtt;
657 			quantile_75 = max_rtt;
658 		}  else {
659 			quantile_25 = min_rtt;
660 			quantile_50 = tot_rtt - max_rtt - min_rtt;
661 			quantile_75 = max_rtt;
662 		}
663 	}
664 
665 	printf("#(**) %8.3f %8.3f %8.3f %8.3f %8.3f",
666 	       0.0,
667 	       timer.runtime,			    /* global runtime */
668 	       mbs,			 	    /* MB/s */
669 	       1000.0 * quantile_50,                /* delay */
670 	       1000.0 * (quantile_75 - quantile_25) /* jitter */
671 	       );
672 
673 	/* Verbose output shows min., avg. and max. */
674 	if (thrulay_opt.reporting_verbosity > 0) {
675 		printf(" %8.3f %8.3f %8.3f\n",
676 		       min_rtt * 1000.0,		/* minimal RTT */
677 		       avg_rtt_sum / thrulay_opt.num_streams,	/* avg. RTT */
678 		       max_rtt * 1000.0 		/* maximal RTT */
679 		       );
680 	} else {
681 		printf("\n");
682 	}
683 }
684 
685 /* Variables for progress reporting. */
686 char report_buffer[STREAMS_MAX * 80];
687 char *report_buffer_ptr = NULL;
688 int report_buffer_len = 0;
689 
690 int
thrulay_tcp_report_id(int id)691 thrulay_tcp_report_id (int id)
692 {
693 	struct timeval this;
694 	double diff_first_last;
695 	double relative;	/* Time difference between now and last */
696 	int qseq = STREAM_PER_INTERVAL_QUANTILE_SEQ(id);
697 	int rc, n = 0;
698 
699 	if (tsc_gettimeofday(&this) == -1) {
700 		perror("gettimeofday");
701 		return -8;
702 	}
703 	normalize_tv(&this);
704 
705 	diff_first_last = time_diff(&timer.start, &timer.last);
706 	relative = time_diff(&timer.last, &this);
707 
708 	/* Time must be monotonically increasing, at least
709 	 * roughly, for this program to work. */
710 
711 	if (stats[id].blocks_since_last == 0) {
712 		n = sprintf(report_buffer_ptr,
713 			    " (%2d) %8.3f %8.3f %8.3f %8.3f %8.3f",
714 			    id,
715 			    diff_first_last,
716 			    diff_first_last + relative,
717 			    0.0,	/* MB/s */
718 			    0.0,        /* delay */
719 			    0.0         /* jitter */
720 			    );
721 		if (thrulay_opt.reporting_verbosity > 0) {
722 			n += sprintf(report_buffer_ptr + n,
723 				      " %8.3f %8.3f %8.3f\n",
724 				      0.0,	/* min RTT since last */
725 				      0.0,	/* avg RTT since last */
726 				      0.0);	/* max RTT since last */
727 		} else {
728 			n += sprintf(report_buffer_ptr + n, "\n");
729 		}
730 	} else {
731 		double quantile_25, quantile_50, quantile_75;
732 
733 		if (stats[id].blocks_since_last > 3) {
734 
735 			/* Finish interval quantile sequence. */
736 			rc = quantile_finish(qseq);
737 			if (rc < 0)
738 				return -36;
739 			/* Get results. */
740 			rc = quantile_output(qseq, stats[id].blocks_since_last,
741 					     0.25,
742 					     &quantile_25);
743 			if (rc < 0)
744 				return -36;
745 			rc = quantile_output(qseq, stats[id].blocks_since_last,
746 					     0.50, &quantile_50);
747 			if (rc < 0)
748 				return -36;
749 			rc = quantile_output(qseq, stats[id].blocks_since_last,
750 					     0.75, &quantile_75);
751 			if (rc < 0)
752 				return -36;
753 		} else {
754 			if (1 == stats[id].blocks_since_last) {
755 				quantile_25 = quantile_50 = quantile_75 =
756 					stats[id].min_rtt_since_last;
757 			} else if (2 == stats[id].blocks_since_last) {
758 				quantile_25 = quantile_50 =
759 					stats[id].min_rtt_since_last;
760 				quantile_75 =
761 					stats[id].max_rtt_since_last;
762 			} else {
763 				quantile_25 = stats[id].min_rtt_since_last;
764 				quantile_50 = stats[id].tot_rtt_since_last -
765 					stats[id].max_rtt_since_last -
766 					stats[id].min_rtt_since_last;
767 				quantile_75 = stats[id].max_rtt_since_last;
768 			}
769 		}
770 
771 		n = sprintf(report_buffer_ptr,
772 			     " (%2d) %8.3f %8.3f %8.3f %8.3f %8.3f",
773 			     id,
774 			     diff_first_last,
775 			     diff_first_last + relative,
776 			     (double)stats[id].blocks_since_last *
777 			     (double)server_block_size *
778 			     8.0 / 1000000.0 / relative,
779 			     1000.0 * quantile_50,                 /*delay*/
780 			     1000.0 * (quantile_75 - quantile_25)  /*jitter*/
781 			     );
782 
783 		/* Verbose output shows min., avg. and max. */
784 		if (thrulay_opt.reporting_verbosity > 0) {
785 			n += sprintf(report_buffer_ptr + n,
786 				     " %8.3f %8.3f %8.3f\n",
787 				     stats[id].min_rtt_since_last*1000.0,
788 				     stats[id].tot_rtt_since_last*1000.0 /
789 				     (double)stats[id].blocks_since_last,
790 				     stats[id].max_rtt_since_last*1000.0);
791 		} else {
792 			n += sprintf(report_buffer_ptr + n, "\n");
793 		}
794 	}
795 	report_buffer_ptr += n;
796 	report_buffer_len += n;
797 
798 	stats[id].blocks_since_last = 0;
799 	stats[id].min_rtt_since_last = 1000.0;
800 	stats[id].max_rtt_since_last = -1000.0;
801 	stats[id].tot_rtt_since_last = 0.0;
802 
803 	/* Reinit interval quantile sequence. */
804 	quantile_exit_seq(qseq);
805 	quantile_init_seq(qseq);
806 
807 
808 	return 0;
809 }
810 
811 int
thrulay_tcp_report(void)812 thrulay_tcp_report (void)
813 {
814 	int id;
815 
816 	report_buffer_ptr = &report_buffer[0];
817 	report_buffer_len = 0;
818 
819 	for (id = 0; id < thrulay_opt.num_streams; id++) {
820 		int rc = thrulay_tcp_report_id(id);
821 		if (rc < 0)
822 			return rc;
823 	}
824 
825 	/* Display progress report. */
826 	write_exactly(STDOUT_FILENO, report_buffer, report_buffer_len);
827 
828 	return 0;
829 }
830 
/*
 * Return a socket connected to `port' on `server_name', trying every
 * address getaddrinfo() yields until one connects.  If `window' is
 * non-zero, it is applied to each candidate socket and the value reported
 * by set_window_size() is stored in *real_window.  If both `saptr' and
 * `lenp' are non-NULL, a malloc()ed copy of the server address is stored in
 * *saptr (the caller owns it) and its size in *lenp.
 *
 * Returns the socket, or -9 (name resolution failed), -11 (no address
 * could be connected) or -4 (out of memory).  No socket or address list is
 * leaked on any error path.
 */
int
name2socket(char *server_name, int port, int window, int *real_window,
	void **saptr, socklen_t *lenp)
{
	struct addrinfo hints, *res, *ai;
	char service[16];	/* fits any int, sign included */
	int sockfd = -1;
	int n;

	memset(&hints, 0, sizeof(hints));
	hints.ai_family = AF_UNSPEC;
	hints.ai_socktype = SOCK_STREAM;

	snprintf(service, sizeof(service), "%d", port);

	if ((n = getaddrinfo(server_name, service, &hints, &res)) != 0) {
		fprintf(stderr, "getaddrinfo(): %s\n", gai_strerror(n));
		return -9;
	}

	for (ai = res; ai != NULL; ai = ai->ai_next) {
		sockfd = socket(ai->ai_family, ai->ai_socktype,
				ai->ai_protocol);
		if (sockfd < 0)
			continue;	/* ignore this one */

		if (window)
			*real_window = set_window_size(sockfd, window);

		if (connect(sockfd, ai->ai_addr, ai->ai_addrlen) == 0)
			break;		/* success */

		close(sockfd);
		sockfd = -1;
	}

	if (ai == NULL) {
		freeaddrinfo(res);
		return -11;
	}

	if (saptr && lenp) {
		*saptr = malloc(ai->ai_addrlen);
		if (*saptr == NULL) {
			perror("malloc");
			close(sockfd);
			freeaddrinfo(res);
			return -4;
		}
		memcpy(*saptr, ai->ai_addr, ai->ai_addrlen);
		*lenp = ai->ai_addrlen;
	}

	freeaddrinfo(res);

	return sockfd;
}
891 
892 /* Read the thrulay greeting from a TCP connection associated with socket s. */
893 int
read_greeting(int s)894 read_greeting(int s)
895 {
896 	char buf[1024];
897 	int rc;
898 	size_t greetlen = sizeof(THRULAY_GREET) - 1;
899 
900 	rc = recv_exactly(s, buf, greetlen);
901 	assert(rc <= (int) greetlen);
902 	if (rc != (int) greetlen) {
903 		if (rc == -1)
904 			perror("recv");
905 		return -12;
906 	}
907 	if (strncmp(buf, THRULAY_VERSION, sizeof(THRULAY_VERSION) - 1) != 0)
908 		return -13;
909 	if (buf[greetlen - 1] != '+') {
910 		error(ERR_WARNING, "connection rejected");
911 		rc = recv(s, buf, sizeof(buf) - 1, 0);
912 		buf[sizeof(buf) - 1] = '\0';
913 		if (rc == -1) {
914 			perror("reading rejection reason");
915 			return -14;
916 		}
917 		assert(rc < (int) sizeof(buf));
918 		buf[rc] = '\0';
919 		fprintf(stderr, "server said: %s", buf);
920 		if (buf[rc-1] != '\n')
921 			fprintf(stderr, "\n");
922 		return -15;
923 	}
924 
925 	return 0;
926 }
927 
/*
 * Send the whole session proposal (`proposal_size' bytes) on socket `s'.
 * Returns 0 on success, -16 if it could not be sent completely.
 */
int
send_proposal(int s, char *proposal, int proposal_size)
{
	int sent = send_exactly(s, proposal, (size_t) proposal_size);

	assert(sent <= proposal_size);
	if (sent >= proposal_size)
		return 0;

	if (sent == -1)
		perror("send");
	return -16;
}
943 
/*
 * Read the server's response to a proposal into `buf' (capacity `max',
 * NUL-terminated on success) with a single recv().
 * Returns the response length, -17 on a receive error, or -18 if the
 * server closed the connection.
 */
int
read_response(int s, char *buf, int max)
{
	int got;

	/* XXX: Assume that few-byte session response will come in one
	   TCP packet and read in one block. */
	got = recv(s, buf, max - 1, 0);
	assert(got < max);

	switch (got) {
	case -1:
		perror("recv");
		return -17;
	case 0:
		return -18;
	default:
		break;
	}

	assert(got > 0);
	buf[got] = '\0';
	return got;
}
964 
965 int
thrulay_udp_report_final(void)966 thrulay_udp_report_final (void)
967 {
968 	int rc;
969 	char buf[65536];
970 
971 	snprintf(buf, sizeof(buf), "+%llu:", (long long unsigned)npackets);
972 
973 	rc = send_exactly(tcp_sock, buf, strlen(buf));
974 	if (rc == -1)
975 		return -19;
976 
977 	while ((rc = recv(tcp_sock, buf, sizeof(buf) - 1, 0)) != 0) {
978 		if (rc == -1) {
979 			perror("recv");
980 			return -20;
981 		}
982 		write_exactly(STDOUT_FILENO, buf, rc);
983 	}
984 
985 	return 0;
986 }
987 
988 int
thrulay_udp_start(void)989 thrulay_udp_start (void)
990 {
991 	int rc;
992 	int val;
993 	char buf[65536];
994 	char random_state[256];
995 	int to_write;
996 	uint64_t packet;
997 	uint32_t msb, lsb;
998 	char nonce[8];
999 	int n, response_size;
1000 	double urand;
1001 	double erand;
1002 	double emean;
1003 	struct timespec req, rem;
1004 	struct timeval this, next;
1005 	long long unsigned npackets_llu;
1006 	int header_size = 0;
1007 
1008 	to_write = snprintf(buf, sizeof(buf), "%s:u:%u:%u:%u:%llu+",
1009 			    THRULAY_VERSION, client_port,
1010 			    thrulay_opt.block_size, protocol_rate,
1011 			    (long long unsigned)npackets);
1012 	rc = send_proposal(tcp_sock, buf, to_write);
1013 	if (rc < 0)
1014 		return rc;
1015 
1016 	rc = timer_start();
1017 	if (rc < 0)
1018 		return rc;
1019 
1020 	response_size = read_response(tcp_sock, buf, sizeof(buf));
1021 	if (response_size < 0)
1022 		return response_size;
1023 
1024 	if (strcmp(buf, "u:-") == 0)
1025 		return -21;
1026 	rc = sscanf(buf, "%u:%u:%u:%llu:%n", &server_port, &packet_size,
1027 		    &protocol_rate, &npackets_llu, &n);
1028 	if ((rc != 4) || (response_size != n+9) || (buf[n+8] != '+') ||
1029 	    (packet_size > (int)sizeof(buf))) {
1030 		return -22;
1031 	}
1032 	memcpy(nonce, buf+n, sizeof(nonce));
1033 	npackets = (uint64_t)npackets_llu;
1034 
1035 	/* Set target UDP server port and header size used for sending the
1036 	 * UDP packets. */
1037 	switch (udp_destination->sa_family) {
1038 	case AF_INET:
1039 		{
1040 			struct sockaddr_in *sin =
1041 				(struct sockaddr_in *)udp_destination;
1042 			sin->sin_port = htons(server_port);
1043 
1044 			header_size = IPV4_HEADER_SIZE + UDP_HEADER_SIZE;
1045 		}
1046 		break;
1047 	case AF_INET6:
1048 		{
1049 			struct sockaddr_in6 *sin6 =
1050 				(struct sockaddr_in6 *)udp_destination;
1051 			sin6->sin6_port = htons(server_port);
1052 
1053 			header_size = IPV6_HEADER_SIZE + UDP_HEADER_SIZE;
1054 		}
1055 		break;
1056 	default:
1057 		return -23;
1058 	}
1059 
1060 	/* Disable keep-alives on the control TCP connection. */
1061 	val = 0;
1062 	rc = setsockopt(tcp_sock, SOL_SOCKET, SO_KEEPALIVE, (char *)&val,
1063 	                sizeof(val));
1064 	if (rc == -1)
1065 		error(ERR_WARNING, "failed to disable keep-alives");
1066 
1067 #ifdef ENABLE_THRULAY_MULTICAST
1068 	/* Set TTL field if requested. */
1069 	if (1 != thrulay_opt.ttl) {
1070 		switch (udp_destination->sa_family) {
1071 			int rc;
1072 		case AF_INET:
1073 			rc = setsockopt(udp_sock, IPPROTO_IP,
1074 					IP_MULTICAST_TTL,
1075 					(void*)&thrulay_opt.ttl,
1076 					sizeof(thrulay_opt.ttl));
1077 			if (rc < 0) {
1078 				error(ERR_WARNING,
1079 				      "setsockopt(IP_MULTICAST_TTL) failed, "
1080 				      "continuing.");
1081 			}
1082 			break;
1083 		case AF_INET6:
1084 			rc = setsockopt(udp_sock, IPPROTO_IPV6,
1085 					IPV6_MULTICAST_HOPS,
1086 					(void*)&thrulay_opt.ttl,
1087 					sizeof(thrulay_opt.ttl));
1088 			if (rc < 0) {
1089 				error(ERR_WARNING,
1090 				      "setsockopt(IPV6_MULTICAST_HOPS) failed,"
1091 				      " continuing.");
1092 			}
1093 
1094 			break;
1095 		default:
1096 			return -24;
1097 		}
1098 	}
1099 #endif
1100 
1101 	/* Increase the buffer space of the sending UDP socket. */
1102 	set_window_size_directed(udp_sock, thrulay_opt.window, SO_SNDBUF);
1103 
1104 	memset(buf, '2', sizeof(buf));	/* Nothing special about '2'. */
1105 	memcpy(buf, nonce, sizeof(nonce));
1106 #ifdef HAVE_INITSTATE
1107 	initstate(getpid() + getppid() + time(NULL),
1108 		random_state, sizeof(random_state));
1109 #else
1110 	srand(time(NULL)+(unsigned int)random_state);
1111 #endif
1112 
1113 	/* RFC 2330: Chapter 11.1.3
1114 	 * lambda = 1/emean */
1115 	emean = 1000.0/(double)protocol_rate;
1116 
1117 	/* Fill `next' timeval with current time */
1118 	if (tsc_gettimeofday(&next) == -1) {
1119 		perror("gettimeofday");
1120 		return -1;
1121 	}
1122 	normalize_tv(&next);
1123 
1124 	for (packet = 0; packet < npackets; packet++) {
1125 		/* RFC 2330: Chapter 11.1.3
1126 		 * (Generating Poisson Sampling Intervals)
1127 		 *
1128 		 * Calculate Ui, a uniformly distributed (pseudo) random number
1129 		 * between 0 and 1.
1130 		 */
1131 #ifdef HAVE_INITSTATE
1132 		urand = (double)((random()+1.0)/(RANDOM_MAX+1.0));
1133 #else
1134                 urand = (double)((rand()+1.0)/(RAND_MAX+1.0));
1135 #endif
1136 		assert(urand > 0 && urand <= 1);
1137 
1138 		/* RFC 2330: Chapter 11.1.3
1139 		 *
1140 		 * Ei = -log(Ui) / lambda
1141 		 */
1142 		erand = -log(urand) * emean;
1143 
1144 		/* Update `next' timeval. */
1145 		next.tv_sec += floor(erand);
1146 		next.tv_usec += (emean - floor(erand)) * 1000000;
1147 		normalize_tv(&next);
1148 
1149 		/* Calculate and set sequence number */
1150 		msb = htonl(packet >> 32);	/* Most significant. */
1151 		lsb = htonl(packet & ((1ULL<<32)-1));/* Least significant. */
1152 		memcpy(buf+8, &msb, 4);
1153 		memcpy(buf+12, &lsb, 4);
1154 
1155 		if (thrulay_opt.busywait) {
1156 			/* Busy wait. */
1157 
1158 			do {
1159 				if (tsc_gettimeofday(&this) == -1) {
1160 					perror("gettimeofday");
1161 					return -1;
1162 				}
1163 				normalize_tv(&this);
1164 			} while (this.tv_sec < next.tv_sec ||
1165 					(this.tv_sec == next.tv_sec &&
1166 					 this.tv_usec < next.tv_usec));
1167 		} else {
1168 			/* No busy wait. */
1169 
1170 			if (tsc_gettimeofday(&this) == -1) {
1171 				perror("gettimeofday");
1172 				return -1;
1173 			}
1174 			normalize_tv(&this);
1175 
1176 			/* Calculate how long to sleep. */
1177 			req.tv_sec = next.tv_sec - this.tv_sec;
1178 			req.tv_nsec = (next.tv_usec - this.tv_usec) * 1000;
1179 			if (req.tv_nsec < 0) {
1180 				--req.tv_sec;
1181 				req.tv_nsec += 1000000000;
1182 			}
1183 
1184 			/* Only sleep if requested sleeping time is positive.
1185 			 * If we have set a high rate with -u then we are
1186 			 * maybe late with this packet. */
1187 			if (req.tv_sec >= 0) {
1188 				/* According to nanosleep(2), the nanosecond
1189 				 * field has to be in the range 0 to 999999999.
1190 				 */
1191 				assert(req.tv_nsec >= 0);
1192 				assert(req.tv_nsec <= 999999999);
1193 
1194 #ifndef WIN32
1195 				do {
1196 					rc = nanosleep(&req, &rem);
1197 					if (rc == -1) {
1198 						if (errno != EINTR) {
1199 							perror("nanosleep");
1200 							return -25;
1201 						}
1202 						req.tv_sec = rem.tv_sec;
1203 						req.tv_nsec = rem.tv_nsec;
1204 					}
1205 				} while (rc != 0);
1206 #else
1207 				Sleep(req.tv_sec*1000+req.tv_nsec/1000000);
1208 #endif
1209 			}
1210 		}
1211 
1212 		if (tsc_gettimeofday(&this) == -1) {
1213 			perror("gettimeofday");
1214 			return -1;
1215 		}
1216 		normalize_tv(&this);
1217 
1218 		tv2ntp(&this, buf+16);
1219 
1220 		rc = sendto(udp_sock, buf, packet_size - header_size, 0,
1221 		            udp_destination, udp_destination_len);
1222 		if (rc == -1) {
1223 			perror("sendto");
1224 			return -26;
1225 		}
1226 	}
1227 
1228 	timer_stop();
1229 
1230 	sleep(1);	/* Let the UDP traffic drain. */
1231 	close(udp_sock);
1232 
1233 	if (thrulay_opt.reporting_verbosity > 0) {
1234 		printf("Client runtime: %8.3fs\n", timer.runtime);
1235 	}
1236 
1237 	/* UDP test completed successfully. */
1238 	return 0;
1239 }
1240 
1241 void
thrulay_udp_exit(void)1242 thrulay_udp_exit (void)
1243 {
1244 	if (close(tcp_sock) == -1)
1245 		error(ERR_WARNING, "thrulay_udp_exit(): Unable to close "
1246 			"TCP connection socket.");
1247 
1248 	/* Free memory of server sockaddr */
1249 	free(server);
1250 	free(udp_destination);
1251 }
1252 
/* Initialize a UDP test.
 *
 * Opens the TCP control connection to the server and reads its greeting,
 * binds a local UDP socket to the first usable port above UDP_PORT
 * (trying at most TRY_UDP_PORTS ports), chooses the UDP destination (the
 * multicast group when thrulay_opt.mcast_group is set, the server address
 * otherwise) and derives protocol_rate and npackets from thrulay_opt.
 *
 * Returns 0 on success or a negative error code:
 *   the error of name2socket()/read_greeting(), -10 if resolving a local
 *   port fails, -2 if no port could be bound, -4 on allocation failure,
 *   -3 if the multicast group cannot be resolved.
 */
int
thrulay_udp_init (void)
{
	int rc;
	int tries;
	int bound = 0;
	struct addrinfo hints, *res, *ressave;
	char service[7];
	socklen_t server_len;

	tcp_sock = name2socket(thrulay_opt.server_name, thrulay_opt.port,
			       0, NULL, (void *)&server, &server_len);
	if (tcp_sock < 0)
		return tcp_sock;

	rc = read_greeting(tcp_sock);
	if (rc < 0)
		return rc;

	memset(&hints, 0, sizeof(struct addrinfo));
	hints.ai_flags = AI_PASSIVE;
	/* Use same address family as for TCP socket */
	hints.ai_family = server->sa_family;
	hints.ai_socktype = SOCK_DGRAM;

	/* Bind the UDP socket to the first usable port above UDP_PORT.
	 * Success is tracked with an explicit flag: `rc' may still hold 0
	 * from getaddrinfo() when socket() fails for every candidate. */
	client_port = UDP_PORT;
	for (tries = 0; !bound && tries < TRY_UDP_PORTS; tries++) {
		client_port++;

		snprintf(service, sizeof(service), "%d", client_port);

		if ((rc = getaddrinfo(NULL, service, &hints,
				      &ressave)) != 0) {
			fprintf(stderr, "getaddrinfo(): %s\n",
				gai_strerror(rc));
			return -10;
		}

		for (res = ressave; res != NULL; res = res->ai_next) {
			udp_sock = socket(res->ai_family, res->ai_socktype,
					  res->ai_protocol);
			if (udp_sock < 0)
				continue;

			/* Differentiated Services (DS) */
			if (thrulay_opt.dscp) {
				rc = set_dscp(udp_sock, thrulay_opt.dscp);
				if (rc == -1)
					error(ERR_WARNING,
					      "thrulay_udp_init(): "
					      "Unable to set DSCP "
					      "value.");
			}

			if (bind(udp_sock, res->ai_addr,
				 res->ai_addrlen) == 0) {
				bound = 1;	/* success */
				break;
			}

			close(udp_sock);
		}

		/* Release every list, including those of ports that could
		 * not be bound; nothing below refers to it. */
		freeaddrinfo(ressave);
	}
	if (!bound)
		return -2;

	/* Check whether test blocks should go to a multicast group or
	 * to the server address.  The buffer is sized to hold any socket
	 * address, independently of the (already released) bind list. */
	udp_destination = malloc(sizeof(struct sockaddr_storage));
	if (NULL == udp_destination) {
		perror("malloc");
		return -4;
	}
	if (NULL != thrulay_opt.mcast_group) {
		hints.ai_flags = AI_PASSIVE;
		hints.ai_family = server->sa_family;
		/* Test blocks are sent as datagrams. */
		hints.ai_socktype = SOCK_DGRAM;

		if ((rc = getaddrinfo(thrulay_opt.mcast_group, service, &hints,
				      &res)) != 0) {
			fprintf(stderr, "getaddrinfo(): %s\n",
				gai_strerror(rc));
			return -3;
		}
		memcpy(udp_destination, res->ai_addr, res->ai_addrlen);
		udp_destination_len = res->ai_addrlen;
		freeaddrinfo(res);
	} else {
		/* No multicast group. UDP Destination address is the
		   same as server address. */
		memcpy(udp_destination, server, server_len);
		udp_destination_len = server_len;
	}

	/* Protocol rate is in packets per 1000 seconds
	 *
	 * thrulay_opt.rate is in bits per second:
	 *  => (thrulay_opt.rate/8) = bytes/second
	 *  => (1000 * thrulay_opt.rate/8) = bytes per 1000 seconds
	 */
	protocol_rate = ((1000/8) * thrulay_opt.rate)/thrulay_opt.block_size;

	/* npackets is number of packets to send in test
	 *
	 * npackets = protocol_rate / 1000 * thrulay_opt.test_duration
	 */
	npackets = (thrulay_opt.test_duration*thrulay_opt.rate)/
		(8*thrulay_opt.block_size);

	/* Successful initialization */
	return 0;
}
1366 
1367 void
thrulay_tcp_stop_id(int id)1368 thrulay_tcp_stop_id (int id)
1369 {
1370 	/* Delete stream socket in FD set. So it won't be processed in test
1371 	 * loop. */
1372 	FD_CLR(stream[id].sock, &rfds_orig);
1373 	FD_CLR(stream[id].sock, &wfds_orig);
1374 
1375 	/* Close our testing socket. This tells the server that this test
1376 	 * has finished and the server will log test duration and average
1377 	 * throughput. */
1378 	thrulay_tcp_exit_id(id);
1379 }
1380 
1381 /* Stop TCP test. */
1382 void
thrulay_tcp_stop(void)1383 thrulay_tcp_stop (void)
1384 {
1385 	stop_test = 1;
1386 }
1387 
/* Some common MTU sizes with topology.  Entries are ordered from the
 * largest MTU to the smallest. */
struct _mtu_info {
	int mtu;
	char *top;
} mtu_list[] = {
	{ 65535,	"Hyperchannel" },		/* RFC1374 */
	{ 17914,	"16 MB/s Token Ring" },
	{ 16436,	"Linux Loopback device" },
	{ 9000,		"Ethernet, jumbo-frames" },	/* Internet2 */
	{ 8166,		"802.4 Token Bus" },		/* RFC1042 */
	{ 4464,		"4 MB/s Token Ring" },
	{ 4352,		"FDDI" },			/* RFC1390 */
	{ 1500,		"Ethernet (or PPP)" },		/* RFC894, RFC1548 */
	{ 1492,		"IEEE 802.3" },
	{ 1006,		"SLIP" },			/* RFC1055 */
	{ 576,		"X.25 & ISDN" },		/* RFC1356 */
	{ 296,		"PPP (low delay)" },
};
/* Number of entries in mtu_list.  Derived from the table itself so it
 * cannot drift out of sync with it (a hard-coded 11 used to hide the
 * last of the 12 entries). */
#define MTU_LIST_NUM	((int)(sizeof(mtu_list) / sizeof(mtu_list[0])))
1407 
1408 /* Calculate MTU out of MSS.
1409  * Set's global variable `mtu' and returns pointer to topology info.
1410  *
1411  * According to RFC879:
1412  * 	MSS = MTU - sizeof(TCPHDR) - sizeof(IPHDR)
1413  *
1414  * Where:
1415  * 	20 <= sizeof(IPHDR) <= 60
1416  *	20 <= sizeof(TCPHDR) <= 60
1417  *
1418  * This implies:
1419  *	MSS + 40 <= MTU <= MSS + 120
1420  */
1421 char *
mtu_calc(int mss)1422 mtu_calc (int mss)
1423 {
1424 	int i;
1425 
1426 #ifdef IP_MTU
1427 	if (mtu) {
1428 		for (i = 0; i < MTU_LIST_NUM; i++) {
1429 			if (mtu == mtu_list[i].mtu) {
1430 				return (mtu_list[i].top);
1431 			}
1432 		}
1433 	}
1434 	return "unknown";
1435 #endif
1436 
1437 	for (i = 0; i < MTU_LIST_NUM; i++) {
1438 		if (((mss + 40) <= mtu_list[i].mtu)
1439 				&& (mtu_list[i].mtu <= (mss + 120))) {
1440 			mtu = mtu_list[i].mtu;
1441 			return (mtu_list[i].top);
1442 		}
1443 	}
1444 
1445 	/* No match. Return default one. */
1446 	mtu = mss + 40;
1447 	return "unknown";
1448 }
1449 
1450 /* Print test info before tests start. Displayed info includes local/remote
1451  * window, block size, MTU, MSS, test duration and reporting interval.
1452  */
1453 void
thrulay_tcp_info(void)1454 thrulay_tcp_info (void)
1455 {
1456 	char *str_top = NULL;
1457 
1458 	if (thrulay_opt.reporting_verbosity < 0)
1459 		return;
1460 
1461 	/* Print local/remote window and block size */
1462 	printf("# local window = %dB; remote window = %dB\n",
1463 			local_window, server_window);
1464 	if (thrulay_opt.block_size == server_block_size) {
1465 		printf("# block size = %dB\n", thrulay_opt.block_size);
1466 	} else {
1467 		printf("# requested block size = %dB; "
1468 				"actual block size = %dB\n",
1469 		       thrulay_opt.block_size, server_block_size);
1470 	}
1471 
1472 	str_top = mtu_calc(mss);
1473 
1474 	/* Print MTU, MSS, topology info */
1475 	printf("# MTU: %dB; MSS: %dB; Topology guess: %s\n", mtu, mss,
1476 			str_top);
1477 #ifdef IP_MTU
1478 	printf("# MTU = getsockopt(IP_MTU); MSS = getsockopt(TCP_MAXSEG)\n");
1479 #else
1480 	if (!strcmp(str_top, "unknown")) {
1481 		printf("# MTU = MSS + 40; MSS = getsockopt(TCP_MAXSEG)\n");
1482 	} else {
1483 		printf("# MTU = guessed out of MSS as in RFC 879; "
1484 				"MSS = getsockopt(TCP_MAXSEG)\n");
1485 	}
1486 #endif
1487 
1488 	/* Print test duration and reporting interval. */
1489 	printf("# test duration = %ds; ",thrulay_opt.test_duration);
1490 	if (0 < thrulay_opt.reporting_interval) {
1491 		printf("reporting interval = %ds\n",
1492 		       thrulay_opt.reporting_interval);
1493 	} else {
1494 		printf("intermediate reporting disabled\n");
1495 	}
1496 	printf("# delay (median) and jitter (interquartile spread of delay) "
1497 	       "are reported in ms\n");
1498 	if (thrulay_opt.reporting_verbosity > 0) {
1499 		printf("#(ID) begin, s   end, s   Mb/s  RTT delay,ms "
1500 		       "jitter     min      avg      max\n");
1501 	} else {
1502 		printf("#(ID) begin, s   end, s   Mb/s  RTT delay,ms "
1503 		       "jitter\n");
1504 	}
1505 
1506 	fflush(stdout);
1507 }
1508 
1509 /* Start TCP test. */
1510 int
thrulay_tcp_start(void)1511 thrulay_tcp_start (void)
1512 {
1513 	int rc;
1514 	int id;
1515 	struct timeval tv, timeout;
1516 	fd_set rfds, wfds;
1517 	char buf[STREAMS_MAX][1024];
1518 	int to_write;
1519 
1520 	for (id = 0; id < thrulay_opt.num_streams; id++) {
1521 		int my_server_window, my_server_block_size, my_mss = 0, sopt;
1522 		socklen_t len;
1523 #ifdef IP_MTU
1524 		int my_mtu;
1525 #endif
1526 
1527 		to_write = snprintf(buf[0], sizeof(buf[0]), "%s:t:%u:%u+",
1528 				    THRULAY_VERSION, thrulay_opt.window,
1529 				    thrulay_opt.block_size);
1530 		assert(to_write > 0 && to_write < (int) sizeof(buf[0]));
1531 
1532 		rc = send_proposal(stream[id].sock, buf[0], to_write);
1533 		if (rc < 0)
1534 			return rc;
1535 
1536 		rc = read_response(stream[id].sock, buf[0], sizeof(buf[0]));
1537 		if (rc < 0)
1538 			return rc;
1539 		my_server_window = my_server_block_size = -1;
1540 		/* XXX: Very long numbers will not, of course, be processed
1541 		 *      correctly by sscanf() below.  We could, if the number
1542 		 *      exceeds a certain value (e.g., has more than a certain
1543 		 *      number of characters, use some ``large'' -- still supported
1544 		 *      -- value for window or block size.  It's not worth the
1545 		 *      trouble. */
1546 		rc = sscanf(buf[0], "%d:%d+", &my_server_window,
1547 			    &my_server_block_size);
1548 		if (rc != 2)
1549 			return -22;
1550 		assert(my_server_window >= 0 && my_server_block_size >= 0);
1551 		if (my_server_block_size < MIN_BLOCK)
1552 			return -27;
1553 		if (my_server_block_size > MAX_BLOCK)
1554 			return -28;
1555 		if (my_server_window < 1500)
1556 			return -29;
1557 
1558 #ifdef IP_MTU
1559 		len = sizeof(my_mtu);
1560 		if (getsockopt(stream[id].sock, SOL_IP, IP_MTU,
1561 			       (char *)&my_mtu, &len) == -1) {
1562 			perror("getsockopt");
1563 			error(ERR_WARNING, "unable to determine Path MTU");
1564 		}
1565 #endif
1566 
1567 		/* Get Maximum Segment Size (MSS) */
1568 #ifdef TCP_MAXSEG
1569 		len = sizeof(my_mss);
1570 		if (getsockopt(stream[id].sock, SOL_TCP, TCP_MAXSEG,
1571 			       (char *)&my_mss, &len) == -1) {
1572 			perror("getsockopt");
1573 			error(ERR_WARNING, "unable to determine TCP_MAXSEG");
1574 		}
1575 #else
1576 		perror("getsockopt");
1577 		error(ERR_WARNING,
1578 		      "getsockopt(TCP_MAXSEG) not supported on Windows");
1579 #endif
1580 		/*
1581 		 * Check/set local/remote window, server block size, MSS.
1582 		 * As we display the local/remote window, server block size,
1583 		 * MSS, MTU only once, we check that this information is the
1584 		 * same for every stream.
1585 		 */
1586 		if (id == 0) {
1587 			/* If this is first stream, initialize global values.*/
1588 			server_window = my_server_window;
1589 			server_block_size = my_server_block_size;
1590 			mss = my_mss;
1591 #ifdef IP_MTU
1592 			mtu = my_mtu;
1593 #endif
1594 		}
1595 		if (server_window != my_server_window) {
1596 			return -30;
1597 		}
1598 		if (server_block_size != my_server_block_size) {
1599 			return -31;
1600 		}
1601 		if (mss != my_mss) {
1602 			return -32;
1603 		}
1604 #ifdef IP_MTU
1605 		if (mtu != my_mtu) {
1606 			return -33;
1607 		}
1608 #endif
1609 
1610 		/* Differentiated Services (DS) */
1611 		if (thrulay_opt.dscp) {
1612 			rc = set_dscp(stream[id].sock, thrulay_opt.dscp);
1613 			if (rc == -1)
1614 				error(ERR_WARNING, "thrulay_tcp_init_id(): "
1615 				      "Unable to set DSCP value.");
1616 		}
1617 
1618 #ifndef WIN32
1619 		assert((unsigned int)stream[id].sock < FD_SETSIZE);
1620 #endif
1621 
1622 		/* Set non-blocking IO. */
1623 #ifndef WIN32
1624 		sopt = fcntl(stream[id].sock, F_GETFL);
1625 
1626 		if (-1 == sopt) {
1627 			error(ERR_WARNING, "fcntl(F_GETFL): failed");
1628 		} else {
1629 			rc = fcntl(stream[id].sock,F_SETFL,sopt | O_NONBLOCK);
1630 			if (-1 == rc) {
1631 				error(ERR_WARNING, "fcntl(F_SETFL,O_NONBLOCK "
1632 				      "failed");
1633 			}
1634 		}
1635 #else
1636 		sopt = ioctlsocket(stream[id].sock, FIONBIO, (unsigned long*)1);
1637 		if (-1 == sopt) {
1638 			error(ERR_WARNING, "ioctlsocket(FIONBIO): failed");
1639 		}
1640 #endif /* ndef WIN32 */
1641 	}
1642 
1643 	/* Allocate memory for writing blocks. */
1644 	block = malloc((size_t)server_block_size +
1645 		       (thrulay_opt.num_streams -1) * BLOCK_HEADER);
1646 	if (block == NULL) {
1647 		return -4;
1648 	}
1649 
1650 	thrulay_tcp_info();
1651 
1652 	rc = timer_start();
1653 	if (rc < 0)
1654 		return rc;
1655 
1656 	stop_test = 0;
1657 	while (!stop_test) {
1658 		rfds = rfds_orig;
1659 		wfds = wfds_orig;
1660 
1661 		timeout.tv_sec = 0;
1662 		timeout.tv_usec = 1000;
1663 
1664 		rc = select(maxfd + 1, &rfds, &wfds, NULL, &timeout);
1665 
1666 		if (rc < 0) {
1667 			perror("select");
1668 			return -34;
1669 		}
1670 
1671 		if (rc == 0) {
1672 			/* Timeout occured. Check timer. */
1673 			timer_check();
1674 		}
1675 
1676 		if (rc > 0) {
1677 			/* Check all stream sockets. */
1678 			for (id = 0; id < thrulay_opt.num_streams; id++) {
1679 
1680 				/* Recv from socket. */
1681 				if (FD_ISSET(stream[id].sock, &rfds)) {
1682 					if (0 == stream[id].rcount &&
1683 					    tsc_gettimeofday(&tv) == -1) {
1684 						perror("gettimeofday");
1685 						return -1;
1686 					}
1687 
1688 					/* Non-blocking recv. */
1689 					rc = recv(stream[id].sock,
1690 						  buf[id]+stream[id].rcount,
1691 						  BLOCK_HEADER-
1692 						  stream[id].rcount, 0);
1693 
1694 					if (-1 == rc && EAGAIN != errno) {
1695 						perror("read");
1696 						error(ERR_WARNING, "premature "
1697 								"end of test");
1698 						thrulay_tcp_stop_id(id);
1699 						break;
1700 					} else if (0 < rc) {
1701 						stream[id].rcount += rc;
1702 						if ( BLOCK_HEADER ==
1703 						     stream[id].rcount ) {
1704 							/* Whole block read */
1705 							memcpy(&tv, buf[id],
1706 							       sizeof(tv));
1707 							rc = new_timestamp(id, &tv);
1708 							if (rc < 0)
1709 								return rc;
1710 							stream[id].rcount = 0;
1711 						}
1712 					}
1713 				}
1714 
1715 				/* Send to socket */
1716 				if (FD_ISSET(stream[id].sock, &wfds)) {
1717 					if (0 == stream[id].wcount) {
1718 						if (tsc_gettimeofday(&tv) ==
1719 						    -1) {
1720 							perror("gettimeofday");
1721 							return -1;
1722 						}
1723 						memcpy(block + id*BLOCK_HEADER,
1724 						       &tv, sizeof(tv));
1725 					}
1726 
1727 					/* Non-blocking send */
1728 					rc = send(stream[id].sock,
1729 						   block + id*BLOCK_HEADER +
1730 						   stream[id].wcount,
1731 						   (size_t)server_block_size-
1732 						   stream[id].wcount, 0);
1733 
1734 					if (rc == -1 && EAGAIN != errno) {
1735 						perror("send");
1736 						error(ERR_WARNING, "premature "
1737 						      "end of test");
1738 						thrulay_tcp_stop_id(id);
1739 					} else if (rc > 0) {
1740 						stream[id].wcount += rc;
1741 					}
1742 					if ( (size_t)server_block_size ==
1743 					     stream[id].wcount ) {
1744 						/* Whole block written */
1745 						stream[id].wcount = 0;
1746 					}
1747 				} /* Send to socket */
1748 			} /* for(id = 0; id < thrulay_opt.num_streams; id++) */
1749 
1750 			timer_check();
1751 		} /* if (rc > 0) { */
1752 	}
1753 	timer_stop();
1754 
1755 	/* Free memory of writing block. */
1756 	free(block);
1757 
1758 	/* TCP test completed successfully. */
1759 	return 0;
1760 }
1761 
1762 /* Deinitialize TCP stream with ID `id'. */
1763 void
thrulay_tcp_exit_id(int id)1764 thrulay_tcp_exit_id (int id)
1765 {
1766 	/* Close connection to server. */
1767 	if (close(stream[id].sock) == -1)
1768 		error(ERR_WARNING, "thrulay_tcp_exit_id(): Unable to close "
1769 				"server socket.");
1770 }
1771 
1772 /* Deinitialize all TCP streams. */
1773 void
thrulay_tcp_exit(void)1774 thrulay_tcp_exit (void)
1775 {
1776 	int id;
1777 
1778 	for (id = 0; id < thrulay_opt.num_streams; id++) {
1779 		thrulay_tcp_exit_id(id);
1780 	}
1781 
1782 	tcp_stats_exit();
1783 }
1784 
1785 /* Initialize TCP stream with ID `id'. */
1786 int
thrulay_tcp_init_id(int id)1787 thrulay_tcp_init_id (int id)
1788 {
1789 	int rc, my_local_window;
1790 	/* Open socket. Do not greet yet */
1791 	stream[id].wcount = stream[id].rcount = 0;
1792 	stream[id].sock = name2socket(thrulay_opt.server_name,
1793 				      thrulay_opt.port, thrulay_opt.window,
1794 				      &my_local_window, NULL, NULL);
1795 
1796 	if (stream[id].sock < 0)
1797 		return stream[id].sock;
1798 
1799 	rc = read_greeting(stream[id].sock);
1800 	if (rc < 0)
1801 		return rc;
1802 
1803 	if (id == 0) {
1804 		/* If this is first stream, initialize global values. */
1805 		local_window = my_local_window;
1806 	}
1807 	if (local_window != my_local_window) {
1808 		return -5;
1809 	}
1810 
1811 	memset(block + id*BLOCK_HEADER, '2', (size_t)server_block_size);
1812 
1813 	return 0;
1814 }
1815 
1816 /* Initialize all that has to do with TCP streams. */
1817 int
thrulay_tcp_init(void)1818 thrulay_tcp_init (void)
1819 {
1820 	int rc, id;
1821 
1822 	rc = tcp_stats_init();
1823 	if (rc < 0)
1824 		return rc;
1825 
1826 	/* Clean recv/send FD sets for select(). */
1827 	FD_ZERO(&rfds_orig);
1828 	FD_ZERO(&wfds_orig);
1829 
1830 	for (id = 0; id < thrulay_opt.num_streams; id++) {
1831 		/* Initialize TCP stream. */
1832 		rc = thrulay_tcp_init_id(id);
1833 		if (rc < 0)
1834 			return rc;
1835 
1836 		/* FD set */
1837 		FD_SET(stream[id].sock, &rfds_orig);
1838 		FD_SET(stream[id].sock, &wfds_orig);
1839 		maxfd = (stream[id].sock > maxfd ? stream[id].sock : maxfd);
1840 	}
1841 #ifdef WIN32
1842 	maxfd = FD_SETSIZE - 1;
1843 #endif
1844 
1845 	return 0;
1846 }
1847 
1848 /* Initialize thrulay. */
1849 int
thrulay_client_init(thrulay_opt_t opt)1850 thrulay_client_init (thrulay_opt_t opt)
1851 {
1852 	int rc;
1853 	thrulay_opt = opt;
1854 	tsc_init();
1855 
1856 #ifdef WIN32
1857 	{
1858 		WSADATA wsaData;
1859 		if (WSAStartup(MAKEWORD(2, 2), &wsaData)) {
1860 			return -35;
1861 		}
1862 	}
1863 #endif
1864 
1865 	/* If client has set no block size, set default ones. */
1866 	if (!thrulay_opt.block_size) {
1867 		if (thrulay_opt.rate) {
1868 			/* Set default UDP block size */
1869 			thrulay_opt.block_size = 1500;
1870 		} else {
1871 			/* Set default TCP block size */
1872 			thrulay_opt.block_size = 64 * 1024;
1873 		}
1874 	}
1875 
1876 	if (thrulay_opt.rate) {
1877 		rc = thrulay_udp_init();
1878 	} else {
1879 		rc = thrulay_tcp_init();
1880 	}
1881 
1882 	return rc;
1883 }
1884 
1885 /* Deinitialize thrulay. */
1886 void
thrulay_client_exit(void)1887 thrulay_client_exit (void)
1888 {
1889 
1890 	if (thrulay_opt.rate) {
1891 		thrulay_udp_exit();
1892 	} else {
1893 		thrulay_tcp_exit();
1894 	}
1895 
1896 #ifdef WIN32
1897 	WSACleanup();
1898 #endif
1899 }
1900 
1901 int
thrulay_client_start(void)1902 thrulay_client_start (void)
1903 {
1904 	int rc;
1905 
1906 	if (thrulay_opt.rate) {
1907 		rc = thrulay_udp_start();
1908 	} else {
1909 		rc = thrulay_tcp_start();
1910 	}
1911 
1912 	return rc;
1913 }
1914 
1915 int
thrulay_client_report_final(void)1916 thrulay_client_report_final (void)
1917 {
1918 	int rc = 0;
1919 
1920 	if (thrulay_opt.reporting_verbosity < 0)
1921 		return 0;
1922 
1923 	if (thrulay_opt.rate) {
1924 		rc = thrulay_udp_report_final();
1925 	} else {
1926 		thrulay_tcp_report_final();
1927 	}
1928 
1929 	return rc;
1930 }
1931