1 /**
2 * client.c -- thrulay library, client API implementation.
3 *
4 * Written by Stanislav Shalunov, http://www.internet2.edu/~shalunov/
5 * Bernhard Lutzmann, belu@users.sf.net
6 * Federico Montesino Pouzols, fedemp@altern.org
7 *
8 * @(#) $Id: client.c,v 1.1.2.13 2006/08/20 18:06:19 fedemp Exp $
9 *
10 * Copyright 2003, 2006 Internet2.
11 * Legal conditions are in file LICENSE
12 * (MD5 = ecfa50d1b0bfbb81b658c810d0476a52).
13 */
14
15 #ifdef HAVE_CONFIG_H
16 #include <config.h>
17 #endif
18
19 #ifdef HAVE_STDINT_H
20 #include <stdint.h>
21 #endif
22 #include <stdio.h>
23 #include <stdlib.h>
24 #include <limits.h>
25 #include <unistd.h>
26 #include <sys/types.h>
27 #include <assert.h>
28 #include <signal.h>
29 #include <sys/param.h>
30 #include <time.h>
31 #include <sys/time.h>
32 #include <fcntl.h>
33 #include <string.h>
34 #include <strings.h>
35 #include <math.h>
36 #include <errno.h>
37 #ifndef WIN32
38 #ifdef HAVE_NETDB_H
39 #include <netdb.h>
40 #endif
41 #include <sys/socket.h>
42 #include <netinet/in.h>
43 #include <netinet/in_systm.h> /* need this under FreeBSD for
44 * <netinet/ip.h> */
45 #include <netinet/ip.h>
46 #include <netinet/tcp.h>
47 #include <arpa/inet.h>
48 #else
49 #define _WIN32_WINNT 0x0501 /* Will only work in Windows XP or later */
50 #include <winsock2.h>
51 #include <ws2tcpip.h>
52 #define sleep(t) _sleep(t)
53 #define close(s) closesocket(s) /* If close is only used with sockets*/
54 #endif /* ndef WIN32 */
55
56 #include "client.h"
57 #include "reporting.h"
58 #include "rcs.h"
59 #include "util.h"
60
61 #define THRULAY_GREET THRULAY_VERSION "+"
62
63 #define DEFAULT_PORT 5003
64 #define UDP_PORT 5003
65 #define TRY_UDP_PORTS 1000
66
67 #define IPV4_HEADER_SIZE 20
68 #define IPV6_HEADER_SIZE 40
69 #define UDP_HEADER_SIZE 8
70 #define UDP_PAYLOAD_SIZE 24
71
72 #ifndef SOL_IP
73 #ifdef IPPROTO_IP
74 #define SOL_IP IPPROTO_IP
75 #endif
76 #endif
77
78 #ifndef SOL_TCP
79 #ifdef IPPROTO_TCP
80 #define SOL_TCP IPPROTO_TCP
81 #endif
82 #endif
83
84 /* IP_MTU not defined in <bits/in.h> under Linux */
85 #if defined (__LINUX__) && !defined(IP_MTU)
86 #define IP_MTU 14
87 #endif
88
89 #if defined (__SOLARIS__)
90 #define RANDOM_MAX 4294967295UL /* 2**32-1 */
91 #elif defined (__DARWIN__)
92 #define RANDOM_MAX LONG_MAX /* Darwin */
93 #else
94 #define RANDOM_MAX RAND_MAX /* Linux, FreeBSD, Windows */
95 #endif
96
97 RCS_ID("@(#) $Id: client.c,v 1.1.2.13 2006/08/20 18:06:19 fedemp Exp $")
98
99 int
100 thrulay_tcp_init(void);
101
102 int
103 thrulay_tcp_init_id(int);
104
105 void
106 thrulay_tcp_exit(void);
107
108 void
109 thrulay_tcp_exit_id(int);
110
111 int
112 thrulay_tcp_start(void);
113
114 void
115 thrulay_tcp_stop(void);
116
117 int
118 thrulay_tcp_report(void);
119
120 int
121 thrulay_tcp_report_id(int);
122
123 void
124 thrulay_tcp_report_final(void);
125
126 void
127 thrulay_tcp_report_final_id(int);
128
129 int
130 thrulay_udp_init(void);
131
132 void
133 thrulay_udp_exit(void);
134
135 int
136 thrulay_udp_start(void);
137
138 int
139 thrulay_udp_report_final(void);
140
141 /* Statistics */
142
/* Map a stream id to the indices of its two quantile sequences: the even
 * index holds the per-interval sequence, the following odd index holds the
 * sequence for the stream's final report.  The argument is parenthesized
 * so that expression arguments such as `id + 1' expand correctly. */
#define STREAM_PER_INTERVAL_QUANTILE_SEQ(id)	(2 * (id))
#define STREAM_FINAL_QUANTILE_SEQ(id)		(2 * (id) + 1)
145
146 /* Get how many quantile sequences are required to keep statistics for
147 a given number of streams. */
148 int
149 required_quantile_seqs(int num_streams);
150
151 int
152 tcp_stats_init(void);
153
154 /* Timer */
155 int
156 timer_start(void);
157
158 int
159 timer_stop(void);
160
161 int
162 timer_check(void);
163
164 int
165 timer_report(struct timeval *);
166
167 void
168 timer_end(struct timeval *);
169
/* Client options */
/* Options consulted by the functions in this file (num_streams,
 * test_duration, reporting_interval, reporting_verbosity, ...). */
static thrulay_opt_t thrulay_opt;

/* NOTE(review): rfds_orig, wfds_orig, maxfd, block and stop_test are not
 * referenced in the part of the file reviewed here; presumably they
 * belong to the select()-driven TCP test loop -- confirm. */
static fd_set rfds_orig, wfds_orig;
static int maxfd = 0;
static char *block;
static int stop_test = 0;
/* Block size used to convert block counts into throughput in the TCP
 * reports (blocks * server_block_size * 8 / 1000000 / seconds). */
static int server_block_size;
/* NOTE(review): presumably the negotiated window sizes and path MTU/MSS,
 * compared across streams (see the "different ..." error strings) --
 * confirm against the TCP initialization code. */
static int local_window, server_window;
static int mtu, mss;

/* UDP test */
/* client_port is sent in the UDP proposal; server_port is parsed from the
 * server's response and becomes the destination port. */
static unsigned int client_port, server_port;
/* Parsed from the server's response; the UDP payload length sent is
 * packet_size minus the IP and UDP header sizes. */
static unsigned int packet_size;
static unsigned int protocol_rate; /* In packets per 1000 seconds. */
/* tcp_sock carries the proposal, response and final results of a UDP
 * test; udp_sock is the socket the test packets are sent on. */
static int tcp_sock, udp_sock;
/* Number of UDP packets to send; proposed by the client and overwritten
 * with the value returned by the server. */
static uint64_t npackets;
/* Heap-allocated socket addresses, released in thrulay_udp_exit(). */
static struct sockaddr *server = NULL;
static struct sockaddr *udp_destination = NULL;
static socklen_t udp_destination_len;

/* Stream information */
static struct _stream {
	/* Connection socket */
	int sock;
	/* Counters for tracking send/recv progress with non-blocking I/O */
	size_t wcount;
	size_t rcount;
} stream[STREAMS_MAX];

/* Statistics information (per stream) */
/* The `_since_first' members accumulate over the whole test; the
 * `_since_last' members are reset after every interval report.  The
 * minima start at 1000.0 and the maxima at -1000.0 (see
 * tcp_stats_init()) so that the first sample replaces them. */
static struct _stat {
	/* Block counter */
	unsigned int blocks_since_first;
	unsigned int blocks_since_last;

	/* RTT */
	/* Values come from time_diff(); reports multiply them by 1000.0,
	 * so they are presumably seconds printed as milliseconds. */
	double min_rtt_since_first;
	double min_rtt_since_last;
	double max_rtt_since_first;
	double max_rtt_since_last;
	double tot_rtt_since_first;
	double tot_rtt_since_last;
} stats[STREAMS_MAX];

/* Timer information */
static struct _timer {
	struct timeval start;	/* Time the test started. */
	struct timeval stop;	/* Time the test should stop. */
	struct timeval next;	/* Time the next progress report is due. */
	struct timeval last;	/* Time of the previous progress report. */
	double runtime;		/* Set by timer_stop(): time_diff(start, now). */
} timer;
223
/* Human-readable messages for the client error codes.  Functions in this
 * file return the NEGATED index of the matching message, so the position
 * of every entry is part of the API: only ever append to this table. */
static const char *thrulay_client_error_s[] = {
	/*  0 */ "No error",
	/*  1 */ "gettimeofday(): failed",
	/*  2 */ "could not initialize UDP test",
	/*  3 */ "getaddrinfo(): failed for multicast group",
	/*  4 */ "malloc(): failed",
	/*  5 */ "different local window",
	/*  6 */ "gettimeofday() failed, unable to start timer",
	/*  7 */ "gettimeofday() failed, unable to process new timestamp",
	/*  8 */ "gettimeofday() failed, unable to print stream stats.",
	/*  9 */ "getaddrinfo(): failed for server name resolution while initializing a TCP test",
	/* 10 */ "getaddrinfo(): failed for server name resolution while initializing an UDP test",
	/* 11 */ "could not establish connection to server",
	/* 12 */ "could not read server banner",
	/* 13 */ "not a thrulay server responded",
	/* 14 */ "could not read rejection reason",
	/* 15 */ "stop (connection rejected",
	/* 16 */ "could not send session proposal",
	/* 17 */ "could not read session response",
	/* 18 */ "server closed connection after proposal",
	/* 19 */ "could not send terminating message",
	/* 20 */ "could not recv session results",
	/* 21 */ "server rejected our UDP proposal",
	/* 22 */ "malformed session response from server",
	/* 23 */ "socket address family not supported",
	/* 24 */ "socket address family not supported for multicast",
	/* 25 */ "nanosleep() failed",
	/* 26 */ "sending UDP packet failed",
	/* 27 */ "server gave block size less than MIN_BLOCK",
	/* 28 */ "server gave block size too large",
	/* 29 */ "server gave ridiculously small window",
	/* 30 */ "different server window",
	/* 31 */ "different server block size",
	/* 32 */ "different MSS",
	/* 33 */ "different MTU",
	/* 34 */ "select(): failed",
	/* 35 */ "WSAStartup failed while initializing client",
	/* 36 */ "error in quantile computation"
};

/* Largest valid index into the table above (36). */
static const int max_thrulay_client_error =
	(int)(sizeof(thrulay_client_error_s) /
	      sizeof(thrulay_client_error_s[0])) - 1;

/*
 * Translate an error code returned by the client API into a message.
 *
 * errorcode: zero or a negative code in [-max_thrulay_client_error, 0].
 * Returns the matching static string, or NULL if the code is out of range
 * (including every positive value).
 */
const char *
thrulay_client_strerror(int errorcode)
{
	if (errorcode > 0 || errorcode < -max_thrulay_client_error)
		return NULL;

	return thrulay_client_error_s[-errorcode];
}
276
277 /* Set default options. */
278 void
thrulay_client_options_init(thrulay_opt_t * opt)279 thrulay_client_options_init (thrulay_opt_t *opt)
280 {
281 if(NULL == opt)
282 return;
283
284 opt->server_name = NULL;
285 opt->num_streams = 1;
286 opt->test_duration = 60;
287 opt->reporting_interval = 1;
288 opt->reporting_verbosity = 0;
289 opt->window = 4194304;
290 opt->block_size = 0;
291 opt->port = DEFAULT_PORT;
292 opt->rate = 0;
293 opt->dscp = 0;
294 opt->busywait = 0;
295 opt->ttl = 1;
296 opt->mcast_group = NULL;
297 }
298
299 int
tcp_stats_init(void)300 tcp_stats_init (void)
301 {
302 int rc, id;
303
304 for (id = 0; id < thrulay_opt.num_streams; id++)
305 {
306 /* Blocks */
307 stats[id].blocks_since_first = 0;
308 stats[id].blocks_since_last = 0;
309
310 /* RTT */
311 stats[id].min_rtt_since_first = 1000.0;
312 stats[id].min_rtt_since_last = 1000.0;
313 stats[id].max_rtt_since_first = -1000.0;
314 stats[id].max_rtt_since_last = -1000.0;
315 stats[id].tot_rtt_since_first = 0.0;
316 stats[id].tot_rtt_since_last = 0.0;
317 }
318
319 rc = quantile_init(required_quantile_seqs(thrulay_opt.num_streams),
320 QUANTILE_EPS, 1024 * 1024);
321 if (-1 == rc) {
322 return -4;
323 }
324
325 return 0;
326 }
327
/*
 * Release the quantile sequences set up by tcp_stats_init().
 *
 * Declared with an explicit (void) parameter list so that the definition
 * is a real prototype and calls with stray arguments are diagnosed.
 */
void
tcp_stats_exit(void)
{
	/* Deinitialize quantile sequences. */
	quantile_exit();
}
334
/*
 * Number of quantile sequences needed for `num_streams' streams.
 *
 * Every stream uses two sequences (interval reports and its final report)
 * and one more sequence holds the global statistics.  With exactly one
 * stream the stream's final sequence doubles as the global one, so two
 * sequences are enough.
 */
int
required_quantile_seqs(int num_streams)
{
	return (1 == num_streams) ? 2 : 2 * num_streams + 1;
}
352
353 /* Process new timestamp. */
354 int
new_timestamp(int id,struct timeval * tv)355 new_timestamp(int id, struct timeval *tv)
356 {
357 struct timeval this;
358 double relative;
359 int rc;
360
361 if (tsc_gettimeofday(&this) == -1) {
362 perror("gettimeofday");
363 return -7;
364 }
365 normalize_tv(&this);
366
367 relative = time_diff(tv, &this);
368 if (relative < 0) {
369 error(ERR_WARNING, "negative delay, ignoring block");
370 return 0;
371 }
372
373 /* quantile sequence for stream interval report. */
374 rc = quantile_value_checkin(STREAM_PER_INTERVAL_QUANTILE_SEQ(id),
375 relative);
376 if (rc < 0)
377 return -36;
378 /* quantile sequence for stream final report. */
379 rc = quantile_value_checkin(STREAM_FINAL_QUANTILE_SEQ(id), relative);
380 if (rc < 0)
381 return -36;
382 /* If there is more than one stream, we use an additional
383 global sequence for the global final report. */
384 if (thrulay_opt.num_streams > 1) {
385 rc = quantile_value_checkin(2 * thrulay_opt.num_streams,
386 relative);
387 if (rc < 0)
388 return -36;
389 }
390
391 /* Update statistics for stream. */
392 stats[id].blocks_since_first++;
393 stats[id].blocks_since_last++;
394 if (stats[id].min_rtt_since_first > relative)
395 stats[id].min_rtt_since_first = relative;
396 if (stats[id].min_rtt_since_last > relative)
397 stats[id].min_rtt_since_last = relative;
398 if (stats[id].max_rtt_since_first < relative)
399 stats[id].max_rtt_since_first = relative;
400 if (stats[id].max_rtt_since_last < relative)
401 stats[id].max_rtt_since_last = relative;
402 stats[id].tot_rtt_since_first += relative;
403 stats[id].tot_rtt_since_last += relative;
404
405 return 0;
406 }
407
408 /* If test duration is over, stops TCP test. */
409 void
timer_end(struct timeval * now)410 timer_end (struct timeval *now)
411 {
412 if (now->tv_sec > timer.stop.tv_sec
413 || (now->tv_sec == timer.stop.tv_sec
414 && now->tv_usec >= timer.stop.tv_usec)) {
415 thrulay_tcp_stop();
416 }
417 }
418
419 /* If progress report should be displayed does so and updates timer values. */
420 int
timer_report(struct timeval * now)421 timer_report (struct timeval *now)
422 {
423 int rc;
424
425 if (now->tv_sec > timer.next.tv_sec
426 || (now->tv_sec == timer.next.tv_sec
427 && now->tv_usec >= timer.next.tv_usec)) {
428 rc = thrulay_tcp_report();
429 if (rc < 0)
430 return rc;
431
432 timer.last.tv_sec = now->tv_sec;
433 timer.last.tv_usec = now->tv_usec;
434
435 while (timer.next.tv_sec < now->tv_sec
436 || (timer.next.tv_sec == now->tv_sec
437 && timer.next.tv_usec <= now->tv_usec))
438 timer.next.tv_sec += thrulay_opt.reporting_interval;
439 }
440
441 return 0;
442 }
443
444 int
timer_check(void)445 timer_check (void)
446 {
447 int rc;
448 struct timeval this;
449
450 if (tsc_gettimeofday(&this) == -1) {
451 perror("gettimeofday");
452 return -1;
453 }
454 normalize_tv(&this);
455
456 if (0 != thrulay_opt.reporting_interval) {
457 rc = timer_report(&this);
458 if (rc < 0)
459 return rc;
460 }
461 timer_end(&this);
462
463 return 0;
464 }
465
466 /* Stop timer. Calculates runtime and saves this to `timer.runtime'. */
467 int
timer_stop(void)468 timer_stop (void)
469 {
470 struct timeval this;
471
472 if (tsc_gettimeofday(&this) == -1) {
473 perror("gettimeofday");
474 return -1;
475 }
476 normalize_tv(&this);
477
478 /* Set final runtime. */
479 timer.runtime = time_diff(&timer.start, &this);
480
481 return 0;
482 }
483
484 /* Start timer. This saves the starting time to `timer.start', saves time when
485 * next progress report should be displayed in `timer.next' and time when
486 * test should stop in `timer.stop'. This are all timeval structures. */
487 int
timer_start(void)488 timer_start (void)
489 {
490 if (tsc_gettimeofday(&timer.start) == -1) {
491 perror("gettimeofday");
492 return -6;
493 }
494 normalize_tv(&timer.start);
495
496 timer.stop.tv_sec = timer.start.tv_sec + thrulay_opt.test_duration;
497 timer.stop.tv_usec = timer.start.tv_usec;
498
499 if (0 != thrulay_opt.reporting_interval) {
500 /* if intermediate reports not disabled */
501 timer.last.tv_sec = timer.start.tv_sec;
502 timer.last.tv_usec = timer.start.tv_usec;
503
504 timer.next.tv_sec = timer.start.tv_sec +
505 thrulay_opt.reporting_interval;
506 timer.next.tv_usec = timer.start.tv_usec;
507 } else {
508 /* if disabled. Unnecessary, just to keep consistency */
509 timer.last.tv_sec = timer.stop.tv_sec + 1;
510 timer.last.tv_usec = 0;
511
512 timer.next.tv_sec = timer.stop.tv_sec + 1;
513 timer.next.tv_usec = 0;
514 }
515
516 return 0;
517 }
518
519 void
thrulay_tcp_report_final_id(int id)520 thrulay_tcp_report_final_id(int id)
521 {
522 int rc, quantile_seq;
523 double quantile_25, quantile_50, quantile_75;
524
525 if (stats[id].blocks_since_first == 0) {
526 /* This stream was not very active :) */
527 if (thrulay_opt.reporting_verbosity > 0) {
528 printf("#(%2d) %8.3f %8.3f %8.3f %8.3f %8.3f %8.3f "
529 "%8.3f %8.3f\n",
530 id, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0);
531 } else {
532 printf("#(%2d) %8.3f %8.3f %8.3f %8.3f %8.3f %8.3f\n",
533 id, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0);
534 }
535 return;
536 }
537
538 if (stats[id].blocks_since_first > 3 ) {
539 quantile_seq = STREAM_FINAL_QUANTILE_SEQ(id);
540 /* Finish stream final sequence. */
541 rc = quantile_finish(quantile_seq);
542 /* Get results. */
543 quantile_output(quantile_seq, stats[id].blocks_since_first,
544 0.25, &quantile_25);
545 quantile_output(quantile_seq, stats[id].blocks_since_first,
546 0.50, &quantile_50);
547 quantile_output(quantile_seq, stats[id].blocks_since_first,
548 0.75, &quantile_75);
549 } else {
550 if (1 == stats[id].blocks_since_first) {
551 quantile_25 = quantile_50 = quantile_75 =
552 stats[id].min_rtt_since_first;
553 } else if (2 == stats[id].blocks_since_first) {
554 quantile_25 = quantile_50 =
555 stats[id].min_rtt_since_first;
556 quantile_75 =
557 stats[id].max_rtt_since_first;
558 } else {
559 quantile_25 = stats[id].min_rtt_since_first;
560 quantile_50 = stats[id].tot_rtt_since_first -
561 stats[id].max_rtt_since_first -
562 stats[id].min_rtt_since_first;
563 quantile_75 = stats[id].max_rtt_since_first;
564 }
565 }
566
567 printf("#(%2d) %8.3f %8.3f %8.3f %8.3f %8.3f",
568 id,
569 0.0,
570 timer.runtime,
571 (double)stats[id].blocks_since_first *
572 (double)server_block_size * 8.0 / 1000000.0 /
573 timer.runtime,
574 1000.0 * quantile_50, /* delay */
575 1000.0 * (quantile_75 - quantile_25) /* jitter */
576 );
577
578 /* Verbose output shows min., avg. and max. */
579 if (thrulay_opt.reporting_verbosity > 0) {
580 printf(" %8.3f %8.3f %8.3f\n",
581 stats[id].min_rtt_since_first * 1000.0,
582 stats[id].tot_rtt_since_first * 1000.0 /
583 (double)stats[id].blocks_since_first,
584 stats[id].max_rtt_since_first * 1000.0);
585 } else {
586 printf("\n");
587 }
588 }
589
590
591 void
thrulay_tcp_report_final(void)592 thrulay_tcp_report_final(void)
593 {
594 double mbs = 0.0;
595 double min_rtt = 1000.0;
596 double max_rtt = -1000.0;
597 double tot_rtt = 0.0;
598 double avg_rtt_sum = 0.0;
599 uint64_t total_blocks = 0;
600 double quantile_25, quantile_50, quantile_75;
601 int id, gseq;
602
603
604 /* If just one stream, no need to compute any more. */
605 if (thrulay_opt.num_streams > 1) {
606 for (id = 0; id < thrulay_opt.num_streams; id++)
607 thrulay_tcp_report_final_id(id);
608 }
609
610 /* Now calculate global statistics. */
611 for (id = 0; id < thrulay_opt.num_streams; id++) {
612 /* Total number of blocks */
613 total_blocks += stats[id].blocks_since_first;
614
615 /* Calculate throughput of all streams together. */
616 mbs += (double)stats[id].blocks_since_first *
617 (double)server_block_size * 8.0 / 1000000.0 /
618 timer.runtime;
619
620 /* Calculate minimum RTT */
621 min_rtt = (min_rtt < stats[id].min_rtt_since_first ?
622 min_rtt : stats[id].min_rtt_since_first);
623
624 /* Calculate maximum RTT */
625 max_rtt = (stats[id].max_rtt_since_first < max_rtt ?
626 max_rtt : stats[id].max_rtt_since_first);
627
628 /* Calculate sum of average RTT */
629 if (stats[id].blocks_since_first != 0) {
630 tot_rtt += stats[id].tot_rtt_since_first;
631 avg_rtt_sum += stats[id].tot_rtt_since_first *
632 1000.0 /
633 (double)stats[id].blocks_since_first;
634 }
635 }
636
637 /* Finish global sequence. */
638 if (thrulay_opt.num_streams > 1 ) {
639 gseq = 2 * thrulay_opt.num_streams;
640 } else {
641 /* Just 1 stream, so the global sequence is the same
642 as the stream total sequence. */
643 gseq = 1;
644 }
645
646 if (total_blocks > 3) {
647 quantile_finish(gseq);
648 /* Get global quantiles. */
649 quantile_output(gseq, total_blocks, 0.25, &quantile_25);
650 quantile_output(gseq, total_blocks, 0.50, &quantile_50);
651 quantile_output(gseq, total_blocks, 0.75, &quantile_75);
652 } else {
653 if (1 == total_blocks) {
654 quantile_25 = quantile_50 = quantile_75 = min_rtt;
655 } else if (2 == total_blocks) {
656 quantile_25 = quantile_50 = min_rtt;
657 quantile_75 = max_rtt;
658 } else {
659 quantile_25 = min_rtt;
660 quantile_50 = tot_rtt - max_rtt - min_rtt;
661 quantile_75 = max_rtt;
662 }
663 }
664
665 printf("#(**) %8.3f %8.3f %8.3f %8.3f %8.3f",
666 0.0,
667 timer.runtime, /* global runtime */
668 mbs, /* MB/s */
669 1000.0 * quantile_50, /* delay */
670 1000.0 * (quantile_75 - quantile_25) /* jitter */
671 );
672
673 /* Verbose output shows min., avg. and max. */
674 if (thrulay_opt.reporting_verbosity > 0) {
675 printf(" %8.3f %8.3f %8.3f\n",
676 min_rtt * 1000.0, /* minimal RTT */
677 avg_rtt_sum / thrulay_opt.num_streams, /* avg. RTT */
678 max_rtt * 1000.0 /* maximal RTT */
679 );
680 } else {
681 printf("\n");
682 }
683 }
684
/* Variables for progress reporting. */
/* One progress report is assembled here, one line per stream, and then
 * written to stdout in a single call by thrulay_tcp_report().  The size
 * budgets 80 bytes per stream. */
char report_buffer[STREAMS_MAX * 80];
/* Write position inside report_buffer; reset by thrulay_tcp_report() and
 * advanced by thrulay_tcp_report_id(). */
char *report_buffer_ptr = NULL;
/* Number of bytes of report_buffer filled so far. */
int report_buffer_len = 0;
689
690 int
thrulay_tcp_report_id(int id)691 thrulay_tcp_report_id (int id)
692 {
693 struct timeval this;
694 double diff_first_last;
695 double relative; /* Time difference between now and last */
696 int qseq = STREAM_PER_INTERVAL_QUANTILE_SEQ(id);
697 int rc, n = 0;
698
699 if (tsc_gettimeofday(&this) == -1) {
700 perror("gettimeofday");
701 return -8;
702 }
703 normalize_tv(&this);
704
705 diff_first_last = time_diff(&timer.start, &timer.last);
706 relative = time_diff(&timer.last, &this);
707
708 /* Time must be monotonically increasing, at least
709 * roughly, for this program to work. */
710
711 if (stats[id].blocks_since_last == 0) {
712 n = sprintf(report_buffer_ptr,
713 " (%2d) %8.3f %8.3f %8.3f %8.3f %8.3f",
714 id,
715 diff_first_last,
716 diff_first_last + relative,
717 0.0, /* MB/s */
718 0.0, /* delay */
719 0.0 /* jitter */
720 );
721 if (thrulay_opt.reporting_verbosity > 0) {
722 n += sprintf(report_buffer_ptr + n,
723 " %8.3f %8.3f %8.3f\n",
724 0.0, /* min RTT since last */
725 0.0, /* avg RTT since last */
726 0.0); /* max RTT since last */
727 } else {
728 n += sprintf(report_buffer_ptr + n, "\n");
729 }
730 } else {
731 double quantile_25, quantile_50, quantile_75;
732
733 if (stats[id].blocks_since_last > 3) {
734
735 /* Finish interval quantile sequence. */
736 rc = quantile_finish(qseq);
737 if (rc < 0)
738 return -36;
739 /* Get results. */
740 rc = quantile_output(qseq, stats[id].blocks_since_last,
741 0.25,
742 &quantile_25);
743 if (rc < 0)
744 return -36;
745 rc = quantile_output(qseq, stats[id].blocks_since_last,
746 0.50, &quantile_50);
747 if (rc < 0)
748 return -36;
749 rc = quantile_output(qseq, stats[id].blocks_since_last,
750 0.75, &quantile_75);
751 if (rc < 0)
752 return -36;
753 } else {
754 if (1 == stats[id].blocks_since_last) {
755 quantile_25 = quantile_50 = quantile_75 =
756 stats[id].min_rtt_since_last;
757 } else if (2 == stats[id].blocks_since_last) {
758 quantile_25 = quantile_50 =
759 stats[id].min_rtt_since_last;
760 quantile_75 =
761 stats[id].max_rtt_since_last;
762 } else {
763 quantile_25 = stats[id].min_rtt_since_last;
764 quantile_50 = stats[id].tot_rtt_since_last -
765 stats[id].max_rtt_since_last -
766 stats[id].min_rtt_since_last;
767 quantile_75 = stats[id].max_rtt_since_last;
768 }
769 }
770
771 n = sprintf(report_buffer_ptr,
772 " (%2d) %8.3f %8.3f %8.3f %8.3f %8.3f",
773 id,
774 diff_first_last,
775 diff_first_last + relative,
776 (double)stats[id].blocks_since_last *
777 (double)server_block_size *
778 8.0 / 1000000.0 / relative,
779 1000.0 * quantile_50, /*delay*/
780 1000.0 * (quantile_75 - quantile_25) /*jitter*/
781 );
782
783 /* Verbose output shows min., avg. and max. */
784 if (thrulay_opt.reporting_verbosity > 0) {
785 n += sprintf(report_buffer_ptr + n,
786 " %8.3f %8.3f %8.3f\n",
787 stats[id].min_rtt_since_last*1000.0,
788 stats[id].tot_rtt_since_last*1000.0 /
789 (double)stats[id].blocks_since_last,
790 stats[id].max_rtt_since_last*1000.0);
791 } else {
792 n += sprintf(report_buffer_ptr + n, "\n");
793 }
794 }
795 report_buffer_ptr += n;
796 report_buffer_len += n;
797
798 stats[id].blocks_since_last = 0;
799 stats[id].min_rtt_since_last = 1000.0;
800 stats[id].max_rtt_since_last = -1000.0;
801 stats[id].tot_rtt_since_last = 0.0;
802
803 /* Reinit interval quantile sequence. */
804 quantile_exit_seq(qseq);
805 quantile_init_seq(qseq);
806
807
808 return 0;
809 }
810
811 int
thrulay_tcp_report(void)812 thrulay_tcp_report (void)
813 {
814 int id;
815
816 report_buffer_ptr = &report_buffer[0];
817 report_buffer_len = 0;
818
819 for (id = 0; id < thrulay_opt.num_streams; id++) {
820 int rc = thrulay_tcp_report_id(id);
821 if (rc < 0)
822 return rc;
823 }
824
825 /* Display progress report. */
826 write_exactly(STDOUT_FILENO, report_buffer, report_buffer_len);
827
828 return 0;
829 }
830
/*
 * Return a socket connected to the right TCP port on the right
 * server.  Every address returned by getaddrinfo() is tried in turn.
 *
 * server_name: host name or address to resolve.
 * port:        TCP port to connect to.
 * window:      if non-zero, the window size to request locally on each
 *              candidate socket before connecting; the value returned by
 *              set_window_size() is stored in *real_window.
 * saptr, lenp: if both are non-NULL, *saptr receives a malloc()ed copy of
 *              the server's socket address (the caller frees it) and
 *              *lenp its size.
 *
 * Returns the connected socket, or a negative error code:
 *   -9   getaddrinfo() failed,
 *   -11  no address could be connected to,
 *   -4   out of memory while copying the address.
 * The address list, and on failure the socket, are released on every
 * path.
 */
int
name2socket(char *server_name, int port, int window, int *real_window,
	    void **saptr, socklen_t *lenp)
{
	int sockfd = -1, n;
	struct addrinfo hints, *res, *ressave;
	char service[12];	/* Fits any int, so the port is never cut. */

	memset(&hints, 0, sizeof(struct addrinfo));
	hints.ai_family = AF_UNSPEC;
	hints.ai_socktype = SOCK_STREAM;

	snprintf(service, sizeof(service), "%d", port);

	if ((n = getaddrinfo(server_name, service, &hints, &ressave)) != 0) {
		fprintf(stderr, "getaddrinfo(): %s\n", gai_strerror(n));
		return -9;
	}

	for (res = ressave; res != NULL; res = res->ai_next) {
		sockfd = socket(res->ai_family, res->ai_socktype,
				res->ai_protocol);
		if (sockfd < 0)
			continue;	/* ignore this one */

		if (window)
			*real_window = set_window_size(sockfd, window);

		if (connect(sockfd, res->ai_addr, res->ai_addrlen) == 0)
			break;		/* success */

		close(sockfd);
	}

	if (res == NULL) {
		/* Every candidate failed; its socket is already closed. */
		freeaddrinfo(ressave);
		return -11;
	}

	if (saptr && lenp) {
		*saptr = malloc(res->ai_addrlen);
		if (*saptr == NULL) {
			/* Report first: close() may overwrite errno. */
			perror("malloc");
			close(sockfd);
			freeaddrinfo(ressave);
			return -4;
		}
		memcpy(*saptr, res->ai_addr, res->ai_addrlen);
		*lenp = res->ai_addrlen;
	}

	freeaddrinfo(ressave);

	return sockfd;
}
891
892 /* Read the thrulay greeting from a TCP connection associated with socket s. */
893 int
read_greeting(int s)894 read_greeting(int s)
895 {
896 char buf[1024];
897 int rc;
898 size_t greetlen = sizeof(THRULAY_GREET) - 1;
899
900 rc = recv_exactly(s, buf, greetlen);
901 assert(rc <= (int) greetlen);
902 if (rc != (int) greetlen) {
903 if (rc == -1)
904 perror("recv");
905 return -12;
906 }
907 if (strncmp(buf, THRULAY_VERSION, sizeof(THRULAY_VERSION) - 1) != 0)
908 return -13;
909 if (buf[greetlen - 1] != '+') {
910 error(ERR_WARNING, "connection rejected");
911 rc = recv(s, buf, sizeof(buf) - 1, 0);
912 buf[sizeof(buf) - 1] = '\0';
913 if (rc == -1) {
914 perror("reading rejection reason");
915 return -14;
916 }
917 assert(rc < (int) sizeof(buf));
918 buf[rc] = '\0';
919 fprintf(stderr, "server said: %s", buf);
920 if (buf[rc-1] != '\n')
921 fprintf(stderr, "\n");
922 return -15;
923 }
924
925 return 0;
926 }
927
/*
 * Send the `proposal_size' bytes at `proposal' over socket s.
 *
 * Returns 0 once everything has been sent, -16 otherwise (send_exactly()
 * reported fewer bytes than requested, or -1).
 */
int
send_proposal(int s, char *proposal, int proposal_size)
{
	int sent = send_exactly(s, proposal, (size_t) proposal_size);

	assert(sent <= proposal_size);
	if (sent >= proposal_size)
		return 0;

	if (-1 == sent)
		perror("send");
	return -16;
}
943
/*
 * Read the response to a proposal from socket s into buf, which holds
 * `max' bytes.  At most max - 1 bytes are read and the result is
 * NUL-terminated.
 *
 * Returns the size of the response, -17 if recv() fails, or -18 if the
 * peer closed the connection.
 */
int
read_response(int s, char *buf, int max)
{
	int got;

	/* XXX: Assume that few-byte session response will come in one
	   TCP packet and read in one block. */
	got = recv(s, buf, max - 1, 0);
	assert(got < max);

	if (-1 == got) {
		perror("recv");
		return -17;
	}
	if (0 == got)
		return -18;

	assert(got > 0);
	buf[got] = '\0';

	return got;
}
964
965 int
thrulay_udp_report_final(void)966 thrulay_udp_report_final (void)
967 {
968 int rc;
969 char buf[65536];
970
971 snprintf(buf, sizeof(buf), "+%llu:", (long long unsigned)npackets);
972
973 rc = send_exactly(tcp_sock, buf, strlen(buf));
974 if (rc == -1)
975 return -19;
976
977 while ((rc = recv(tcp_sock, buf, sizeof(buf) - 1, 0)) != 0) {
978 if (rc == -1) {
979 perror("recv");
980 return -20;
981 }
982 write_exactly(STDOUT_FILENO, buf, rc);
983 }
984
985 return 0;
986 }
987
988 int
thrulay_udp_start(void)989 thrulay_udp_start (void)
990 {
991 int rc;
992 int val;
993 char buf[65536];
994 char random_state[256];
995 int to_write;
996 uint64_t packet;
997 uint32_t msb, lsb;
998 char nonce[8];
999 int n, response_size;
1000 double urand;
1001 double erand;
1002 double emean;
1003 struct timespec req, rem;
1004 struct timeval this, next;
1005 long long unsigned npackets_llu;
1006 int header_size = 0;
1007
1008 to_write = snprintf(buf, sizeof(buf), "%s:u:%u:%u:%u:%llu+",
1009 THRULAY_VERSION, client_port,
1010 thrulay_opt.block_size, protocol_rate,
1011 (long long unsigned)npackets);
1012 rc = send_proposal(tcp_sock, buf, to_write);
1013 if (rc < 0)
1014 return rc;
1015
1016 rc = timer_start();
1017 if (rc < 0)
1018 return rc;
1019
1020 response_size = read_response(tcp_sock, buf, sizeof(buf));
1021 if (response_size < 0)
1022 return response_size;
1023
1024 if (strcmp(buf, "u:-") == 0)
1025 return -21;
1026 rc = sscanf(buf, "%u:%u:%u:%llu:%n", &server_port, &packet_size,
1027 &protocol_rate, &npackets_llu, &n);
1028 if ((rc != 4) || (response_size != n+9) || (buf[n+8] != '+') ||
1029 (packet_size > (int)sizeof(buf))) {
1030 return -22;
1031 }
1032 memcpy(nonce, buf+n, sizeof(nonce));
1033 npackets = (uint64_t)npackets_llu;
1034
1035 /* Set target UDP server port and header size used for sending the
1036 * UDP packets. */
1037 switch (udp_destination->sa_family) {
1038 case AF_INET:
1039 {
1040 struct sockaddr_in *sin =
1041 (struct sockaddr_in *)udp_destination;
1042 sin->sin_port = htons(server_port);
1043
1044 header_size = IPV4_HEADER_SIZE + UDP_HEADER_SIZE;
1045 }
1046 break;
1047 case AF_INET6:
1048 {
1049 struct sockaddr_in6 *sin6 =
1050 (struct sockaddr_in6 *)udp_destination;
1051 sin6->sin6_port = htons(server_port);
1052
1053 header_size = IPV6_HEADER_SIZE + UDP_HEADER_SIZE;
1054 }
1055 break;
1056 default:
1057 return -23;
1058 }
1059
1060 /* Disable keep-alives on the control TCP connection. */
1061 val = 0;
1062 rc = setsockopt(tcp_sock, SOL_SOCKET, SO_KEEPALIVE, (char *)&val,
1063 sizeof(val));
1064 if (rc == -1)
1065 error(ERR_WARNING, "failed to disable keep-alives");
1066
1067 #ifdef ENABLE_THRULAY_MULTICAST
1068 /* Set TTL field if requested. */
1069 if (1 != thrulay_opt.ttl) {
1070 switch (udp_destination->sa_family) {
1071 int rc;
1072 case AF_INET:
1073 rc = setsockopt(udp_sock, IPPROTO_IP,
1074 IP_MULTICAST_TTL,
1075 (void*)&thrulay_opt.ttl,
1076 sizeof(thrulay_opt.ttl));
1077 if (rc < 0) {
1078 error(ERR_WARNING,
1079 "setsockopt(IP_MULTICAST_TTL) failed, "
1080 "continuing.");
1081 }
1082 break;
1083 case AF_INET6:
1084 rc = setsockopt(udp_sock, IPPROTO_IPV6,
1085 IPV6_MULTICAST_HOPS,
1086 (void*)&thrulay_opt.ttl,
1087 sizeof(thrulay_opt.ttl));
1088 if (rc < 0) {
1089 error(ERR_WARNING,
1090 "setsockopt(IPV6_MULTICAST_HOPS) failed,"
1091 " continuing.");
1092 }
1093
1094 break;
1095 default:
1096 return -24;
1097 }
1098 }
1099 #endif
1100
1101 /* Increase the buffer space of the sending UDP socket. */
1102 set_window_size_directed(udp_sock, thrulay_opt.window, SO_SNDBUF);
1103
1104 memset(buf, '2', sizeof(buf)); /* Nothing special about '2'. */
1105 memcpy(buf, nonce, sizeof(nonce));
1106 #ifdef HAVE_INITSTATE
1107 initstate(getpid() + getppid() + time(NULL),
1108 random_state, sizeof(random_state));
1109 #else
1110 srand(time(NULL)+(unsigned int)random_state);
1111 #endif
1112
1113 /* RFC 2330: Chapter 11.1.3
1114 * lambda = 1/emean */
1115 emean = 1000.0/(double)protocol_rate;
1116
1117 /* Fill `next' timeval with current time */
1118 if (tsc_gettimeofday(&next) == -1) {
1119 perror("gettimeofday");
1120 return -1;
1121 }
1122 normalize_tv(&next);
1123
1124 for (packet = 0; packet < npackets; packet++) {
1125 /* RFC 2330: Chapter 11.1.3
1126 * (Generating Poisson Sampling Intervals)
1127 *
1128 * Calculate Ui, a uniformly distributed (pseudo) random number
1129 * between 0 and 1.
1130 */
1131 #ifdef HAVE_INITSTATE
1132 urand = (double)((random()+1.0)/(RANDOM_MAX+1.0));
1133 #else
1134 urand = (double)((rand()+1.0)/(RAND_MAX+1.0));
1135 #endif
1136 assert(urand > 0 && urand <= 1);
1137
1138 /* RFC 2330: Chapter 11.1.3
1139 *
1140 * Ei = -log(Ui) / lambda
1141 */
1142 erand = -log(urand) * emean;
1143
1144 /* Update `next' timeval. */
1145 next.tv_sec += floor(erand);
1146 next.tv_usec += (emean - floor(erand)) * 1000000;
1147 normalize_tv(&next);
1148
1149 /* Calculate and set sequence number */
1150 msb = htonl(packet >> 32); /* Most significant. */
1151 lsb = htonl(packet & ((1ULL<<32)-1));/* Least significant. */
1152 memcpy(buf+8, &msb, 4);
1153 memcpy(buf+12, &lsb, 4);
1154
1155 if (thrulay_opt.busywait) {
1156 /* Busy wait. */
1157
1158 do {
1159 if (tsc_gettimeofday(&this) == -1) {
1160 perror("gettimeofday");
1161 return -1;
1162 }
1163 normalize_tv(&this);
1164 } while (this.tv_sec < next.tv_sec ||
1165 (this.tv_sec == next.tv_sec &&
1166 this.tv_usec < next.tv_usec));
1167 } else {
1168 /* No busy wait. */
1169
1170 if (tsc_gettimeofday(&this) == -1) {
1171 perror("gettimeofday");
1172 return -1;
1173 }
1174 normalize_tv(&this);
1175
1176 /* Calculate how long to sleep. */
1177 req.tv_sec = next.tv_sec - this.tv_sec;
1178 req.tv_nsec = (next.tv_usec - this.tv_usec) * 1000;
1179 if (req.tv_nsec < 0) {
1180 --req.tv_sec;
1181 req.tv_nsec += 1000000000;
1182 }
1183
1184 /* Only sleep if requested sleeping time is positive.
1185 * If we have set a high rate with -u then we are
1186 * maybe late with this packet. */
1187 if (req.tv_sec >= 0) {
1188 /* According to nanosleep(2), the nanosecond
1189 * field has to be in the range 0 to 999999999.
1190 */
1191 assert(req.tv_nsec >= 0);
1192 assert(req.tv_nsec <= 999999999);
1193
1194 #ifndef WIN32
1195 do {
1196 rc = nanosleep(&req, &rem);
1197 if (rc == -1) {
1198 if (errno != EINTR) {
1199 perror("nanosleep");
1200 return -25;
1201 }
1202 req.tv_sec = rem.tv_sec;
1203 req.tv_nsec = rem.tv_nsec;
1204 }
1205 } while (rc != 0);
1206 #else
1207 Sleep(req.tv_sec*1000+req.tv_nsec/1000000);
1208 #endif
1209 }
1210 }
1211
1212 if (tsc_gettimeofday(&this) == -1) {
1213 perror("gettimeofday");
1214 return -1;
1215 }
1216 normalize_tv(&this);
1217
1218 tv2ntp(&this, buf+16);
1219
1220 rc = sendto(udp_sock, buf, packet_size - header_size, 0,
1221 udp_destination, udp_destination_len);
1222 if (rc == -1) {
1223 perror("sendto");
1224 return -26;
1225 }
1226 }
1227
1228 timer_stop();
1229
1230 sleep(1); /* Let the UDP traffic drain. */
1231 close(udp_sock);
1232
1233 if (thrulay_opt.reporting_verbosity > 0) {
1234 printf("Client runtime: %8.3fs\n", timer.runtime);
1235 }
1236
1237 /* UDP test completed successfully. */
1238 return 0;
1239 }
1240
1241 void
thrulay_udp_exit(void)1242 thrulay_udp_exit (void)
1243 {
1244 if (close(tcp_sock) == -1)
1245 error(ERR_WARNING, "thrulay_udp_exit(): Unable to close "
1246 "TCP connection socket.");
1247
1248 /* Free memory of server sockaddr */
1249 free(server);
1250 free(udp_destination);
1251 }
1252
/* Initialize a UDP test.
 *
 * Connects to the server over TCP and reads its greeting, binds a local
 * UDP socket to the first usable port above UDP_PORT (at most
 * TRY_UDP_PORTS attempts), stores the UDP destination address (multicast
 * group if one is configured, otherwise the server address) and derives
 * `protocol_rate' and `npackets' from the configured rate, block size and
 * test duration.
 *
 * Returns 0 on success and a negative value on failure:
 *   value from name2socket()/read_greeting() if those fail,
 *   -10 getaddrinfo() failed for the local port,
 *   -2  no local UDP port could be bound,
 *   -3  the multicast group could not be resolved,
 *   -4  out of memory.
 * On the -3 and -4 paths the already bound UDP socket is closed again.
 */
int
thrulay_udp_init (void)
{
	int rc;
	int tries;
	int bound = 0;
	struct addrinfo hints, *res, *ressave;
	struct addrinfo *mcast = NULL;
	const void *dest;
	socklen_t dest_len;
	socklen_t bound_len = 0;
	socklen_t alloc_len;
	char service[7];
	socklen_t server_len;

	tcp_sock = name2socket(thrulay_opt.server_name, thrulay_opt.port,
			       0, NULL, (void *)&server, &server_len);
	if (tcp_sock < 0)
		return tcp_sock;

	rc = read_greeting(tcp_sock);
	if (rc < 0)
		return rc;

	memset(&hints, 0, sizeof(struct addrinfo));
	hints.ai_flags = AI_PASSIVE;
	/* Use same address family as for TCP socket */
	hints.ai_family = server->sa_family;
	hints.ai_socktype = SOCK_DGRAM;

	client_port = UDP_PORT;
	tries = 0;
	do {
		client_port++;
		tries++;

		snprintf(service, sizeof(service), "%d", client_port);

		rc = getaddrinfo(NULL, service, &hints, &ressave);
		if (rc != 0) {
			fprintf(stderr, "getaddrinfo(): %s\n",
				gai_strerror(rc));
			return -10;
		}

		for (res = ressave; res != NULL; res = res->ai_next) {
			udp_sock = socket(res->ai_family, res->ai_socktype,
					  res->ai_protocol);
			if (udp_sock < 0)
				continue;

			/* Differentiated Services (DS) */
			if (thrulay_opt.dscp) {
				rc = set_dscp(udp_sock, thrulay_opt.dscp);
				if (rc == -1)
					error(ERR_WARNING,
					      "thrulay_udp_init(): "
					      "Unable to set DSCP "
					      "value.");
			}

			if (bind(udp_sock, res->ai_addr,
				 res->ai_addrlen) == 0) {
				/* Success.  Remember the address length
				 * now: the list is released below. */
				bound = 1;
				bound_len = res->ai_addrlen;
				break;
			}

			close(udp_sock);
		}

		/* Release the candidate list of this attempt before trying
		 * the next port (or leaving the loop). */
		freeaddrinfo(ressave);
	} while (!bound && (tries < TRY_UDP_PORTS));
	if (!bound)
		return -2;

	/* Check whether test blocks should go to a multicast group or
	   to the server address. */
	if (NULL != thrulay_opt.mcast_group) {
		hints.ai_flags = AI_PASSIVE;
		hints.ai_family = server->sa_family;
		/* NOTE(review): SOCK_STREAM is kept from the original code
		 * although the destination is used for UDP; only the
		 * resolved address is copied.  Confirm whether SOCK_DGRAM
		 * was intended. */
		hints.ai_socktype = SOCK_STREAM;

		rc = getaddrinfo(thrulay_opt.mcast_group, service, &hints,
				 &mcast);
		if (rc != 0) {
			fprintf(stderr, "getaddrinfo(): %s\n",
				gai_strerror(rc));
			close(udp_sock);
			return -3;
		}
		dest = mcast->ai_addr;
		dest_len = mcast->ai_addrlen;
	} else {
		/* No multicast group. UDP Destination address is the
		   same as server address. */
		dest = server;
		dest_len = server_len;
	}

	/* Never allocate less than what is copied below, and never less
	 * than the size of the locally bound address. */
	alloc_len = (dest_len > bound_len) ? dest_len : bound_len;
	udp_destination = malloc(alloc_len);
	if (NULL == udp_destination) {
		perror("malloc");
		if (mcast != NULL)
			freeaddrinfo(mcast);
		close(udp_sock);
		return -4;
	}
	memcpy(udp_destination, dest, dest_len);
	udp_destination_len = dest_len;

	if (mcast != NULL)
		freeaddrinfo(mcast);

	/* Protocol rate is in packets per 1000 seconds
	 *
	 * thrulay_opt.rate is in bits per second:
	 * => (thrulay_opt.rate/8) = bytes/second
	 * => (1000 * thrulay_opt.rate/8) = bytes per 1000 seconds
	 */
	protocol_rate = ((1000/8) * thrulay_opt.rate)/thrulay_opt.block_size;

	/* npackets is number of packets to send in test
	 *
	 * npackets = protocol_rate / 1000 * thrulay_opt.test_duration
	 */
	npackets = (thrulay_opt.test_duration*thrulay_opt.rate)/
		(8*thrulay_opt.block_size);

	/* Successful initialization */
	return 0;
}
1366
1367 void
thrulay_tcp_stop_id(int id)1368 thrulay_tcp_stop_id (int id)
1369 {
1370 /* Delete stream socket in FD set. So it won't be processed in test
1371 * loop. */
1372 FD_CLR(stream[id].sock, &rfds_orig);
1373 FD_CLR(stream[id].sock, &wfds_orig);
1374
1375 /* Close our testing socket. This tells the server that this test
1376 * has finished and the server will log test duration and average
1377 * throughput. */
1378 thrulay_tcp_exit_id(id);
1379 }
1380
1381 /* Stop TCP test. */
1382 void
thrulay_tcp_stop(void)1383 thrulay_tcp_stop (void)
1384 {
1385 stop_test = 1;
1386 }
1387
/* Some common MTU sizes with topology.
 * Entries are ordered from the largest MTU to the smallest. */
struct _mtu_info {
	int mtu;	/* MTU in bytes */
	char *top;	/* Human readable topology name */
} mtu_list[] = {
	{ 65535,	"Hyperchannel" },		/* RFC1374 */
	{ 17914,	"16 MB/s Token Ring" },
	{ 16436,	"Linux Loopback device" },
	{ 9000,		"Ethernet, jumbo-frames" },	/* Internet2 */
	{ 8166,		"802.4 Token Bus" },		/* RFC1042 */
	{ 4464,		"4 MB/s Token Ring" },
	{ 4352,		"FDDI" },			/* RFC1390 */
	{ 1500,		"Ethernet (or PPP)" },		/* RFC894, RFC1548 */
	{ 1492,		"IEEE 802.3" },
	{ 1006,		"SLIP" },			/* RFC1055 */
	{ 576,		"X.25 & ISDN" },		/* RFC1356 */
	{ 296,		"PPP (low delay)" },
};
/* Number of entries in mtu_list.  Derived from the array itself so that
 * it cannot fall out of step with the table (a hard-coded count would
 * silently hide trailing entries from lookups). */
#define MTU_LIST_NUM ((int)(sizeof(mtu_list) / sizeof(mtu_list[0])))
1407
1408 /* Calculate MTU out of MSS.
1409 * Set's global variable `mtu' and returns pointer to topology info.
1410 *
1411 * According to RFC879:
1412 * MSS = MTU - sizeof(TCPHDR) - sizeof(IPHDR)
1413 *
1414 * Where:
1415 * 20 <= sizeof(IPHDR) <= 60
1416 * 20 <= sizeof(TCPHDR) <= 60
1417 *
1418 * This implies:
1419 * MSS + 40 <= MTU <= MSS + 120
1420 */
1421 char *
mtu_calc(int mss)1422 mtu_calc (int mss)
1423 {
1424 int i;
1425
1426 #ifdef IP_MTU
1427 if (mtu) {
1428 for (i = 0; i < MTU_LIST_NUM; i++) {
1429 if (mtu == mtu_list[i].mtu) {
1430 return (mtu_list[i].top);
1431 }
1432 }
1433 }
1434 return "unknown";
1435 #endif
1436
1437 for (i = 0; i < MTU_LIST_NUM; i++) {
1438 if (((mss + 40) <= mtu_list[i].mtu)
1439 && (mtu_list[i].mtu <= (mss + 120))) {
1440 mtu = mtu_list[i].mtu;
1441 return (mtu_list[i].top);
1442 }
1443 }
1444
1445 /* No match. Return default one. */
1446 mtu = mss + 40;
1447 return "unknown";
1448 }
1449
1450 /* Print test info before tests start. Displayed info includes local/remote
1451 * window, block size, MTU, MSS, test duration and reporting interval.
1452 */
1453 void
thrulay_tcp_info(void)1454 thrulay_tcp_info (void)
1455 {
1456 char *str_top = NULL;
1457
1458 if (thrulay_opt.reporting_verbosity < 0)
1459 return;
1460
1461 /* Print local/remote window and block size */
1462 printf("# local window = %dB; remote window = %dB\n",
1463 local_window, server_window);
1464 if (thrulay_opt.block_size == server_block_size) {
1465 printf("# block size = %dB\n", thrulay_opt.block_size);
1466 } else {
1467 printf("# requested block size = %dB; "
1468 "actual block size = %dB\n",
1469 thrulay_opt.block_size, server_block_size);
1470 }
1471
1472 str_top = mtu_calc(mss);
1473
1474 /* Print MTU, MSS, topology info */
1475 printf("# MTU: %dB; MSS: %dB; Topology guess: %s\n", mtu, mss,
1476 str_top);
1477 #ifdef IP_MTU
1478 printf("# MTU = getsockopt(IP_MTU); MSS = getsockopt(TCP_MAXSEG)\n");
1479 #else
1480 if (!strcmp(str_top, "unknown")) {
1481 printf("# MTU = MSS + 40; MSS = getsockopt(TCP_MAXSEG)\n");
1482 } else {
1483 printf("# MTU = guessed out of MSS as in RFC 879; "
1484 "MSS = getsockopt(TCP_MAXSEG)\n");
1485 }
1486 #endif
1487
1488 /* Print test duration and reporting interval. */
1489 printf("# test duration = %ds; ",thrulay_opt.test_duration);
1490 if (0 < thrulay_opt.reporting_interval) {
1491 printf("reporting interval = %ds\n",
1492 thrulay_opt.reporting_interval);
1493 } else {
1494 printf("intermediate reporting disabled\n");
1495 }
1496 printf("# delay (median) and jitter (interquartile spread of delay) "
1497 "are reported in ms\n");
1498 if (thrulay_opt.reporting_verbosity > 0) {
1499 printf("#(ID) begin, s end, s Mb/s RTT delay,ms "
1500 "jitter min avg max\n");
1501 } else {
1502 printf("#(ID) begin, s end, s Mb/s RTT delay,ms "
1503 "jitter\n");
1504 }
1505
1506 fflush(stdout);
1507 }
1508
1509 /* Start TCP test. */
1510 int
thrulay_tcp_start(void)1511 thrulay_tcp_start (void)
1512 {
1513 int rc;
1514 int id;
1515 struct timeval tv, timeout;
1516 fd_set rfds, wfds;
1517 char buf[STREAMS_MAX][1024];
1518 int to_write;
1519
1520 for (id = 0; id < thrulay_opt.num_streams; id++) {
1521 int my_server_window, my_server_block_size, my_mss = 0, sopt;
1522 socklen_t len;
1523 #ifdef IP_MTU
1524 int my_mtu;
1525 #endif
1526
1527 to_write = snprintf(buf[0], sizeof(buf[0]), "%s:t:%u:%u+",
1528 THRULAY_VERSION, thrulay_opt.window,
1529 thrulay_opt.block_size);
1530 assert(to_write > 0 && to_write < (int) sizeof(buf[0]));
1531
1532 rc = send_proposal(stream[id].sock, buf[0], to_write);
1533 if (rc < 0)
1534 return rc;
1535
1536 rc = read_response(stream[id].sock, buf[0], sizeof(buf[0]));
1537 if (rc < 0)
1538 return rc;
1539 my_server_window = my_server_block_size = -1;
1540 /* XXX: Very long numbers will not, of course, be processed
1541 * correctly by sscanf() below. We could, if the number
1542 * exceeds a certain value (e.g., has more than a certain
1543 * number of characters, use some ``large'' -- still supported
1544 * -- value for window or block size. It's not worth the
1545 * trouble. */
1546 rc = sscanf(buf[0], "%d:%d+", &my_server_window,
1547 &my_server_block_size);
1548 if (rc != 2)
1549 return -22;
1550 assert(my_server_window >= 0 && my_server_block_size >= 0);
1551 if (my_server_block_size < MIN_BLOCK)
1552 return -27;
1553 if (my_server_block_size > MAX_BLOCK)
1554 return -28;
1555 if (my_server_window < 1500)
1556 return -29;
1557
1558 #ifdef IP_MTU
1559 len = sizeof(my_mtu);
1560 if (getsockopt(stream[id].sock, SOL_IP, IP_MTU,
1561 (char *)&my_mtu, &len) == -1) {
1562 perror("getsockopt");
1563 error(ERR_WARNING, "unable to determine Path MTU");
1564 }
1565 #endif
1566
1567 /* Get Maximum Segment Size (MSS) */
1568 #ifdef TCP_MAXSEG
1569 len = sizeof(my_mss);
1570 if (getsockopt(stream[id].sock, SOL_TCP, TCP_MAXSEG,
1571 (char *)&my_mss, &len) == -1) {
1572 perror("getsockopt");
1573 error(ERR_WARNING, "unable to determine TCP_MAXSEG");
1574 }
1575 #else
1576 perror("getsockopt");
1577 error(ERR_WARNING,
1578 "getsockopt(TCP_MAXSEG) not supported on Windows");
1579 #endif
1580 /*
1581 * Check/set local/remote window, server block size, MSS.
1582 * As we display the local/remote window, server block size,
1583 * MSS, MTU only once, we check that this information is the
1584 * same for every stream.
1585 */
1586 if (id == 0) {
1587 /* If this is first stream, initialize global values.*/
1588 server_window = my_server_window;
1589 server_block_size = my_server_block_size;
1590 mss = my_mss;
1591 #ifdef IP_MTU
1592 mtu = my_mtu;
1593 #endif
1594 }
1595 if (server_window != my_server_window) {
1596 return -30;
1597 }
1598 if (server_block_size != my_server_block_size) {
1599 return -31;
1600 }
1601 if (mss != my_mss) {
1602 return -32;
1603 }
1604 #ifdef IP_MTU
1605 if (mtu != my_mtu) {
1606 return -33;
1607 }
1608 #endif
1609
1610 /* Differentiated Services (DS) */
1611 if (thrulay_opt.dscp) {
1612 rc = set_dscp(stream[id].sock, thrulay_opt.dscp);
1613 if (rc == -1)
1614 error(ERR_WARNING, "thrulay_tcp_init_id(): "
1615 "Unable to set DSCP value.");
1616 }
1617
1618 #ifndef WIN32
1619 assert((unsigned int)stream[id].sock < FD_SETSIZE);
1620 #endif
1621
1622 /* Set non-blocking IO. */
1623 #ifndef WIN32
1624 sopt = fcntl(stream[id].sock, F_GETFL);
1625
1626 if (-1 == sopt) {
1627 error(ERR_WARNING, "fcntl(F_GETFL): failed");
1628 } else {
1629 rc = fcntl(stream[id].sock,F_SETFL,sopt | O_NONBLOCK);
1630 if (-1 == rc) {
1631 error(ERR_WARNING, "fcntl(F_SETFL,O_NONBLOCK "
1632 "failed");
1633 }
1634 }
1635 #else
1636 sopt = ioctlsocket(stream[id].sock, FIONBIO, (unsigned long*)1);
1637 if (-1 == sopt) {
1638 error(ERR_WARNING, "ioctlsocket(FIONBIO): failed");
1639 }
1640 #endif /* ndef WIN32 */
1641 }
1642
1643 /* Allocate memory for writing blocks. */
1644 block = malloc((size_t)server_block_size +
1645 (thrulay_opt.num_streams -1) * BLOCK_HEADER);
1646 if (block == NULL) {
1647 return -4;
1648 }
1649
1650 thrulay_tcp_info();
1651
1652 rc = timer_start();
1653 if (rc < 0)
1654 return rc;
1655
1656 stop_test = 0;
1657 while (!stop_test) {
1658 rfds = rfds_orig;
1659 wfds = wfds_orig;
1660
1661 timeout.tv_sec = 0;
1662 timeout.tv_usec = 1000;
1663
1664 rc = select(maxfd + 1, &rfds, &wfds, NULL, &timeout);
1665
1666 if (rc < 0) {
1667 perror("select");
1668 return -34;
1669 }
1670
1671 if (rc == 0) {
1672 /* Timeout occured. Check timer. */
1673 timer_check();
1674 }
1675
1676 if (rc > 0) {
1677 /* Check all stream sockets. */
1678 for (id = 0; id < thrulay_opt.num_streams; id++) {
1679
1680 /* Recv from socket. */
1681 if (FD_ISSET(stream[id].sock, &rfds)) {
1682 if (0 == stream[id].rcount &&
1683 tsc_gettimeofday(&tv) == -1) {
1684 perror("gettimeofday");
1685 return -1;
1686 }
1687
1688 /* Non-blocking recv. */
1689 rc = recv(stream[id].sock,
1690 buf[id]+stream[id].rcount,
1691 BLOCK_HEADER-
1692 stream[id].rcount, 0);
1693
1694 if (-1 == rc && EAGAIN != errno) {
1695 perror("read");
1696 error(ERR_WARNING, "premature "
1697 "end of test");
1698 thrulay_tcp_stop_id(id);
1699 break;
1700 } else if (0 < rc) {
1701 stream[id].rcount += rc;
1702 if ( BLOCK_HEADER ==
1703 stream[id].rcount ) {
1704 /* Whole block read */
1705 memcpy(&tv, buf[id],
1706 sizeof(tv));
1707 rc = new_timestamp(id, &tv);
1708 if (rc < 0)
1709 return rc;
1710 stream[id].rcount = 0;
1711 }
1712 }
1713 }
1714
1715 /* Send to socket */
1716 if (FD_ISSET(stream[id].sock, &wfds)) {
1717 if (0 == stream[id].wcount) {
1718 if (tsc_gettimeofday(&tv) ==
1719 -1) {
1720 perror("gettimeofday");
1721 return -1;
1722 }
1723 memcpy(block + id*BLOCK_HEADER,
1724 &tv, sizeof(tv));
1725 }
1726
1727 /* Non-blocking send */
1728 rc = send(stream[id].sock,
1729 block + id*BLOCK_HEADER +
1730 stream[id].wcount,
1731 (size_t)server_block_size-
1732 stream[id].wcount, 0);
1733
1734 if (rc == -1 && EAGAIN != errno) {
1735 perror("send");
1736 error(ERR_WARNING, "premature "
1737 "end of test");
1738 thrulay_tcp_stop_id(id);
1739 } else if (rc > 0) {
1740 stream[id].wcount += rc;
1741 }
1742 if ( (size_t)server_block_size ==
1743 stream[id].wcount ) {
1744 /* Whole block written */
1745 stream[id].wcount = 0;
1746 }
1747 } /* Send to socket */
1748 } /* for(id = 0; id < thrulay_opt.num_streams; id++) */
1749
1750 timer_check();
1751 } /* if (rc > 0) { */
1752 }
1753 timer_stop();
1754
1755 /* Free memory of writing block. */
1756 free(block);
1757
1758 /* TCP test completed successfully. */
1759 return 0;
1760 }
1761
1762 /* Deinitialize TCP stream with ID `id'. */
1763 void
thrulay_tcp_exit_id(int id)1764 thrulay_tcp_exit_id (int id)
1765 {
1766 /* Close connection to server. */
1767 if (close(stream[id].sock) == -1)
1768 error(ERR_WARNING, "thrulay_tcp_exit_id(): Unable to close "
1769 "server socket.");
1770 }
1771
1772 /* Deinitialize all TCP streams. */
1773 void
thrulay_tcp_exit(void)1774 thrulay_tcp_exit (void)
1775 {
1776 int id;
1777
1778 for (id = 0; id < thrulay_opt.num_streams; id++) {
1779 thrulay_tcp_exit_id(id);
1780 }
1781
1782 tcp_stats_exit();
1783 }
1784
1785 /* Initialize TCP stream with ID `id'. */
1786 int
thrulay_tcp_init_id(int id)1787 thrulay_tcp_init_id (int id)
1788 {
1789 int rc, my_local_window;
1790 /* Open socket. Do not greet yet */
1791 stream[id].wcount = stream[id].rcount = 0;
1792 stream[id].sock = name2socket(thrulay_opt.server_name,
1793 thrulay_opt.port, thrulay_opt.window,
1794 &my_local_window, NULL, NULL);
1795
1796 if (stream[id].sock < 0)
1797 return stream[id].sock;
1798
1799 rc = read_greeting(stream[id].sock);
1800 if (rc < 0)
1801 return rc;
1802
1803 if (id == 0) {
1804 /* If this is first stream, initialize global values. */
1805 local_window = my_local_window;
1806 }
1807 if (local_window != my_local_window) {
1808 return -5;
1809 }
1810
1811 memset(block + id*BLOCK_HEADER, '2', (size_t)server_block_size);
1812
1813 return 0;
1814 }
1815
1816 /* Initialize all that has to do with TCP streams. */
1817 int
thrulay_tcp_init(void)1818 thrulay_tcp_init (void)
1819 {
1820 int rc, id;
1821
1822 rc = tcp_stats_init();
1823 if (rc < 0)
1824 return rc;
1825
1826 /* Clean recv/send FD sets for select(). */
1827 FD_ZERO(&rfds_orig);
1828 FD_ZERO(&wfds_orig);
1829
1830 for (id = 0; id < thrulay_opt.num_streams; id++) {
1831 /* Initialize TCP stream. */
1832 rc = thrulay_tcp_init_id(id);
1833 if (rc < 0)
1834 return rc;
1835
1836 /* FD set */
1837 FD_SET(stream[id].sock, &rfds_orig);
1838 FD_SET(stream[id].sock, &wfds_orig);
1839 maxfd = (stream[id].sock > maxfd ? stream[id].sock : maxfd);
1840 }
1841 #ifdef WIN32
1842 maxfd = FD_SETSIZE - 1;
1843 #endif
1844
1845 return 0;
1846 }
1847
1848 /* Initialize thrulay. */
1849 int
thrulay_client_init(thrulay_opt_t opt)1850 thrulay_client_init (thrulay_opt_t opt)
1851 {
1852 int rc;
1853 thrulay_opt = opt;
1854 tsc_init();
1855
1856 #ifdef WIN32
1857 {
1858 WSADATA wsaData;
1859 if (WSAStartup(MAKEWORD(2, 2), &wsaData)) {
1860 return -35;
1861 }
1862 }
1863 #endif
1864
1865 /* If client has set no block size, set default ones. */
1866 if (!thrulay_opt.block_size) {
1867 if (thrulay_opt.rate) {
1868 /* Set default UDP block size */
1869 thrulay_opt.block_size = 1500;
1870 } else {
1871 /* Set default TCP block size */
1872 thrulay_opt.block_size = 64 * 1024;
1873 }
1874 }
1875
1876 if (thrulay_opt.rate) {
1877 rc = thrulay_udp_init();
1878 } else {
1879 rc = thrulay_tcp_init();
1880 }
1881
1882 return rc;
1883 }
1884
1885 /* Deinitialize thrulay. */
1886 void
thrulay_client_exit(void)1887 thrulay_client_exit (void)
1888 {
1889
1890 if (thrulay_opt.rate) {
1891 thrulay_udp_exit();
1892 } else {
1893 thrulay_tcp_exit();
1894 }
1895
1896 #ifdef WIN32
1897 WSACleanup();
1898 #endif
1899 }
1900
1901 int
thrulay_client_start(void)1902 thrulay_client_start (void)
1903 {
1904 int rc;
1905
1906 if (thrulay_opt.rate) {
1907 rc = thrulay_udp_start();
1908 } else {
1909 rc = thrulay_tcp_start();
1910 }
1911
1912 return rc;
1913 }
1914
1915 int
thrulay_client_report_final(void)1916 thrulay_client_report_final (void)
1917 {
1918 int rc = 0;
1919
1920 if (thrulay_opt.reporting_verbosity < 0)
1921 return 0;
1922
1923 if (thrulay_opt.rate) {
1924 rc = thrulay_udp_report_final();
1925 } else {
1926 thrulay_tcp_report_final();
1927 }
1928
1929 return rc;
1930 }
1931