1 /*---------------------------------------------------------------
2 * Copyright (c) 1999,2000,2001,2002,2003
3 * The Board of Trustees of the University of Illinois
4 * All Rights Reserved.
5 *---------------------------------------------------------------
6 * Permission is hereby granted, free of charge, to any person
7 * obtaining a copy of this software (Iperf) and associated
8 * documentation files (the "Software"), to deal in the Software
9 * without restriction, including without limitation the
10 * rights to use, copy, modify, merge, publish, distribute,
11 * sublicense, and/or sell copies of the Software, and to permit
12 * persons to whom the Software is furnished to do
13 * so, subject to the following conditions:
14 *
15 *
16 * Redistributions of source code must retain the above
17 * copyright notice, this list of conditions and
18 * the following disclaimers.
19 *
20 *
21 * Redistributions in binary form must reproduce the above
22 * copyright notice, this list of conditions and the following
23 * disclaimers in the documentation and/or other materials
24 * provided with the distribution.
25 *
26 *
27 * Neither the names of the University of Illinois, NCSA,
28 * nor the names of its contributors may be used to endorse
29 * or promote products derived from this Software without
30 * specific prior written permission.
31 *
32 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
33 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
34 * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
35 * NONINFRINGEMENT. IN NO EVENT SHALL THE CONTIBUTORS OR COPYRIGHT
36 * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
37 * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
38 * ARISING FROM, OUT OF OR IN CONNECTION WITH THE
39 * SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
40 * ________________________________________________________________
41 * National Laboratory for Applied Network Research
42 * National Center for Supercomputing Applications
43 * University of Illinois at Urbana-Champaign
44 * http://www.ncsa.uiuc.edu
45 * ________________________________________________________________
46 *
47 * Client.cpp
48 * by Mark Gates <mgates@nlanr.net>
49 * -------------------------------------------------------------------
50 * A client thread initiates a connect to the server and handles
51 * sending and receiving data, then closes the socket.
52 * ------------------------------------------------------------------- */
#include <ctime>
#include <cmath>
#include <climits>
#include "headers.h"
#include "Client.hpp"
#include "Thread.h"
#include "SocketAddr.h"
#include "PerfSocket.hpp"
#include "Extractor.h"
#include "delay.h"
#include "util.h"
#include "Locale.h"
#include "isochronous.hpp"
#include "pdfs.h"
#include "version.h"
#include "payloads.h"
#include "active_hosts.h"
69
// Unit-conversion constants
// const double kSecs_to_usecs = 1e6;
const double kSecs_to_nsecs = 1e9;   // multiply seconds by this to get nanoseconds
const int kBytes_to_Bits = 8;        // multiply bytes by this to get bits

// Period, in seconds, between recomputations of the variable (lognormal)
// load; compared against Timestamp::subSec() in RunRateLimitedTCP
#define VARYLOAD_PERIOD 0.1 // recompute the variable load every VARYLOAD_PERIOD seconds
// NOTE(review): not referenced in this part of the file; presumably a
// maximum UDP payload size in bytes -- confirm against its users
#define MAXUDPBUF 1470
76
Client(thread_Settings * inSettings)77 Client::Client (thread_Settings *inSettings) {
78 #ifdef HAVE_THREAD_DEBUG
79 thread_debug("Client constructor with thread %p sum=%p (flags=%x)", (void *) inSettings, (void *)inSettings->mSumReport, inSettings->flags);
80 #endif
81 mSettings = inSettings;
82 myJob = NULL;
83 myReport = NULL;
84 framecounter = NULL;
85 one_report = false;
86 udp_payload_minimum = 1;
87 apply_first_udppkt_delay = false;
88
89 memset(&scratchpad, 0, sizeof(struct ReportStruct));
90 reportstruct = &scratchpad;
91 reportstruct->packetID = 1;
92 mySocket = isServerReverse(mSettings) ? mSettings->mSock : INVALID_SOCKET;
93 connected = isServerReverse(mSettings);
94 if (isCompat(mSettings) && isPeerVerDetect(mSettings)) {
95 fprintf(stderr, "%s", warn_compat_and_peer_exchange);
96 unsetPeerVerDetect(mSettings);
97 }
98
99 pattern(mSettings->mBuf, mSettings->mBufLen);
100 if (isFileInput(mSettings)) {
101 if (!isSTDIN(mSettings))
102 Extractor_Initialize(mSettings->mFileName, mSettings->mBufLen, mSettings);
103 else
104 Extractor_InitializeFile(stdin, mSettings->mBufLen, mSettings);
105
106 if (!Extractor_canRead(mSettings)) {
107 unsetFileInput(mSettings);
108 }
109 }
110 if (isIsochronous(mSettings)) {
111 FAIL_errno(!(mSettings->mFPS > 0.0), "Invalid value for frames per second in the isochronous settings\n", mSettings);
112 }
113 peerclose = false;
114 isburst = (isIsochronous(mSettings) || isPeriodicBurst(mSettings) || (isTripTime(mSettings) && !isUDP(mSettings)));
115 } // end Client
116
117 /* -------------------------------------------------------------------
118 * Destructor
119 * ------------------------------------------------------------------- */
~Client()120 Client::~Client () {
121 #if HAVE_THREAD_DEBUG
122 thread_debug("Client destructor sock=%d report=%p server-reverse=%s fullduplex=%s", \
123 mySocket, (void *) mSettings->reporthdr, \
124 (isServerReverse(mSettings) ? "true" : "false"), (isFullDuplex(mSettings) ? "true" : "false"));
125 #endif
126 DELETE_PTR(framecounter);
127 } // end ~Client
128
129
130 /* -------------------------------------------------------------------
131 * Setup a socket connected to a server.
132 * If inLocalhost is not null, bind to that address, specifying
133 * which outgoing interface to use.
134 * ------------------------------------------------------------------- */
my_connect(bool close_on_fail)135 bool Client::my_connect (bool close_on_fail) {
136 int rc;
137 double connecttime = -1.0;
138 // create an internet socket
139 int type = (isUDP(mSettings) ? SOCK_DGRAM : SOCK_STREAM);
140 int domain = (SockAddr_isIPv6(&mSettings->peer) ?
141 #ifdef HAVE_IPV6
142 AF_INET6
143 #else
144 AF_INET
145 #endif
146 : AF_INET);
147
148 mySocket = socket(domain, type, 0);
149 WARN_errno(mySocket == INVALID_SOCKET, "socket");
150 // Socket is carried both by the object and the thread
151 mSettings->mSock=mySocket;
152 SetSocketOptions(mSettings);
153 SockAddr_localAddr(mSettings);
154 SockAddr_remoteAddr(mSettings);
155 if (mSettings->mLocalhost != NULL) {
156 // bind socket to local address
157 rc = bind(mySocket, reinterpret_cast<sockaddr*>(&mSettings->local),
158 SockAddr_get_sizeof_sockaddr(&mSettings->local));
159 WARN_errno(rc == SOCKET_ERROR, "bind");
160 }
161
162 // connect socket
163 connected = false;
164 if (!isUDP(mSettings)) {
165 int trycnt = mSettings->mConnectRetries + 1;
166 while (trycnt > 0) {
167 connect_start.setnow();
168 rc = connect(mySocket, reinterpret_cast<sockaddr*>(&mSettings->peer),
169 SockAddr_get_sizeof_sockaddr(&mSettings->peer));
170 WARN_errno((rc == SOCKET_ERROR), "tcp connect");
171 if (rc == SOCKET_ERROR) {
172 if ((--trycnt) <= 0) {
173 if (close_on_fail) {
174 close(mySocket);
175 mySocket = INVALID_SOCKET;
176 }
177 } else {
178 delay_loop(200000);
179 }
180 } else {
181 connect_done.setnow();
182 connecttime = 1e3 * connect_done.subSec(connect_start);
183 mSettings->connecttime = connecttime;
184 connected = true;
185 break;
186 }
187 }
188 } else {
189 rc = connect(mySocket, reinterpret_cast<sockaddr*>(&mSettings->peer),
190 SockAddr_get_sizeof_sockaddr(&mSettings->peer));
191 connecttime = 0.0; // UDP doesn't have a 3WHS
192 WARN_errno((rc == SOCKET_ERROR), "udp connect");
193 if (rc != SOCKET_ERROR)
194 connected = true;
195 }
196 if (connected) {
197 // Set the send timeout for the very first write which has the test exchange
198 int sosndtimer = TESTEXCHANGETIMEOUT; // 4 sec in usecs
199 SetSocketOptionsSendTimeout(mSettings, sosndtimer);
200 getsockname(mySocket, reinterpret_cast<sockaddr*>(&mSettings->local), &mSettings->size_local);
201 getpeername(mySocket, reinterpret_cast<sockaddr*>(&mSettings->peer), &mSettings->size_peer);
202 SockAddr_Ifrname(mSettings);
203 if (isUDP(mSettings) && !isIsochronous(mSettings) && !isIPG(mSettings)) {
204 mSettings->mBurstIPG = get_delay_target() / 1e3; // this is being set for the settings report only
205 }
206 } else {
207 connecttime = -1;
208 if (mySocket != INVALID_SOCKET) {
209 int rc = close(mySocket);
210 WARN_errno(rc == SOCKET_ERROR, "client connect close");
211 mySocket = INVALID_SOCKET;
212 }
213 }
214 if (isReport(mSettings) && isSettingsReport(mSettings)) {
215 struct ReportHeader *tmp = InitSettingsReport(mSettings);
216 assert(tmp!=NULL);
217 PostReport(tmp);
218 setNoSettReport(mSettings);
219 }
220 // Post the connect report unless peer version exchange is set
221 if (isConnectionReport(mSettings) && !isSumOnly(mSettings)) {
222 if (connected) {
223 struct ReportHeader *reporthdr = InitConnectionReport(mSettings, connecttime);
224 struct ConnectionInfo *cr = static_cast<struct ConnectionInfo *>(reporthdr->this_report);
225 cr->connect_timestamp.tv_sec = connect_start.getSecs();
226 cr->connect_timestamp.tv_usec = connect_start.getUsecs();
227 assert(reporthdr);
228 PostReport(reporthdr);
229 } else {
230 PostReport(InitConnectionReport(mSettings, -1));
231 }
232 }
233 return connected;
234 } // end Connect
235
isConnected() const236 bool Client::isConnected () const {
237 #ifdef HAVE_THREAD_DEBUG
238 // thread_debug("Client is connected %d", connected);
239 #endif
240 return connected;
241 }
242
TxDelay()243 void Client::TxDelay () {
244 if (isTxHoldback(mSettings)) {
245 clock_usleep(&mSettings->txholdback_timer);
246 }
247 }
248
249 // return true of tcpi stats were sampled
250 #ifdef HAVE_STRUCT_TCP_INFO_TCPI_TOTAL_RETRANS
myReportPacket(bool sample_tcpi)251 inline bool Client::myReportPacket (bool sample_tcpi) {
252 bool rc = false;
253 if (sample_tcpi) {
254 rc = ReportPacket(myReport, reportstruct, &my_tcpi_stats);
255 } else {
256 ReportPacket(myReport, reportstruct, NULL);
257 }
258 reportstruct->packetLen = 0;
259 return rc;
260 }
myReportPacket()261 inline void Client::myReportPacket () {
262 ReportPacket(myReport, reportstruct, NULL);
263 reportstruct->packetLen = 0;
264 }
265 #else
myReportPacket(void)266 inline void Client::myReportPacket (void) {
267 ReportPacket(myReport, reportstruct);
268 reportstruct->packetLen = 0;
269 }
270 #endif
271
272
// There are multiple startup synchronizations, this code
// handles them all. The caller decides to apply them
// either before connect() or after connect() and before writes()
//
// Creates this thread's individual report (myJob/myReport), sends the
// first payload where applicable, applies the start-time/holdback delay,
// creates the frame counter for isochronous/periodic-burst runs, waits on
// the full-duplex barrier, then stamps the report start time and reports
// the first payload (if any bytes were sent).
// Returns 0 on success, -1 if the full-duplex start barrier fails.
int Client::StartSynch () {
#ifdef HAVE_THREAD_DEBUG
    thread_debug("Client start sync enterred");
#endif

    myJob = InitIndividualReport(mSettings);
    myReport = static_cast<struct ReporterData *>(myJob->this_report);
    myReport->info.common->socket=mySocket;

    // Perform delays, usually between connect() and data xfer though before connect
    // Two delays are supported:
    // o First is an absolute start time per unix epoch format
    // o Second is a holdback, a relative amount of seconds between the connect and data xfers
    // check for an epoch based start time
    reportstruct->packetLen = 0;
    if (!isServerReverse(mSettings)) {
        if (!isCompat(mSettings)) {
            reportstruct->packetLen = SendFirstPayload();
            // Reverse UDP tests need to retry "first sends" a few times
            // before going to server or read mode
            if (isReverse(mSettings) && isUDP(mSettings)) {
                // Note: the initial send's length is discarded here; only a
                // resend's length is kept for the first-packet report below
                reportstruct->packetLen = 0;
                fd_set set;
                struct timeval timeout;
                int resend_udp = 100;
                // Resend after each randomized (< 20 ms) select() timeout;
                // stop once the socket is readable or select() errors out
                while (--resend_udp > 0) {
                    FD_ZERO(&set);
                    FD_SET(mySocket, &set);
                    timeout.tv_sec = 0;
                    timeout.tv_usec = rand() % 20000; // randomize IPG a bit
                    if (select(mySocket + 1, &set, NULL, NULL, &timeout) == 0) {
                        reportstruct->packetLen = SendFirstPayload();
                        // printf("**** resend sock=%d count=%d\n", mySocket, resend_udp);
                    } else {
                        break;
                    }
                }
            }
        }
        if (isTxStartTime(mSettings)) {
            clock_usleep_abstime(&mSettings->txstart_epoch);
        } else if (isTxHoldback(mSettings)) {
            TxDelay();
        }
        // Server side client
    } else if (isTripTime(mSettings) || isPeriodicBurst(mSettings)) {
        reportstruct->packetLen = SendFirstPayload();
    }
    // Frame scheduling for isochronous / periodic-burst traffic, anchored at txstart_epoch
    if (isIsochronous(mSettings) || isPeriodicBurst(mSettings)) {
        Timestamp tmp;
        tmp.set(mSettings->txstart_epoch.tv_sec, mSettings->txstart_epoch.tv_usec);
        framecounter = new Isochronous::FrameCounter(mSettings->mFPS, tmp);
    }
    // Full duplex sockets need to be synchronized: wait at the barrier; a
    // nonzero result makes this thread also seed the full-duplex start time
    int setfullduplexflag = 0;
    if (isFullDuplex(mSettings) && !isServerReverse(mSettings)) {
        assert(mSettings->mFullDuplexReport != NULL);
        if ((setfullduplexflag = fullduplex_start_barrier(&mSettings->mFullDuplexReport->fullduplex_barrier)) < 0)
            return -1;
    }
    SetReportStartTime();
    // Report the first payload (if any) as sent at the report start time
    if (reportstruct->packetLen > 0) {
        reportstruct->packetTime = myReport->info.ts.startTime;
        reportstruct->sentTime = reportstruct->packetTime;
        reportstruct->prevSentTime = reportstruct->packetTime;
        reportstruct->prevPacketTime = myReport->info.ts.prevpacketTime;
        myReportPacket();
        myReport->info.ts.prevpacketTime = reportstruct->packetTime;
        reportstruct->packetID++;
    }
    if (setfullduplexflag) {
        SetFullDuplexReportStartTime();
    }
#ifdef HAVE_THREAD_DEBUG
    thread_debug("Client start sync exited");
#endif
    return 0;
}
354
SetFullDuplexReportStartTime()355 inline void Client::SetFullDuplexReportStartTime () {
356 assert(myReport->FullDuplexReport != NULL);
357 struct TransferInfo *fullduplexstats = &myReport->FullDuplexReport->info;
358 assert(fullduplexstats != NULL);
359 if (TimeZero(fullduplexstats->ts.startTime)) {
360 fullduplexstats->ts.startTime = myReport->info.ts.startTime;
361 if (isModeTime(mSettings)) {
362 fullduplexstats->ts.nextTime = myReport->info.ts.nextTime;
363 }
364 }
365 #ifdef HAVE_THREAD_DEBUG
366 thread_debug("Client fullduplex report start=%ld.%ld next=%ld.%ld", fullduplexstats->ts.startTime.tv_sec, fullduplexstats->ts.startTime.tv_usec, fullduplexstats->ts.nextTime.tv_sec, fullduplexstats->ts.nextTime.tv_usec);
367 #endif
368 }
SetReportStartTime()369 inline void Client::SetReportStartTime () {
370 assert(myReport!=NULL);
371 now.setnow();
372 myReport->info.ts.startTime.tv_sec = now.getSecs();
373 myReport->info.ts.startTime.tv_usec = now.getUsecs();
374 myReport->info.ts.IPGstart = myReport->info.ts.startTime;
375 myReport->info.ts.prevpacketTime = myReport->info.ts.startTime;
376 if (!TimeZero(myReport->info.ts.intervalTime)) {
377 myReport->info.ts.nextTime = myReport->info.ts.startTime;
378 TimeAdd(myReport->info.ts.nextTime, myReport->info.ts.intervalTime);
379 #ifdef HAVE_STRUCT_TCP_INFO_TCPI_TOTAL_RETRANS
380 myReport->info.ts.nextTCPStampleTime = myReport->info.ts.nextTime;
381 #endif
382 }
383 if (myReport->GroupSumReport) {
384 struct TransferInfo *sumstats = &myReport->GroupSumReport->info;
385 assert(sumstats != NULL);
386 Mutex_Lock(&myReport->GroupSumReport->reference.lock);
387 if (TimeZero(sumstats->ts.startTime)) {
388 sumstats->ts.startTime = myReport->info.ts.startTime;
389 if (isModeTime(mSettings)) {
390 sumstats->ts.nextTime = myReport->info.ts.nextTime;
391 }
392 #ifdef HAVE_THREAD_DEBUG
393 thread_debug("Client group sum report start=%ld.%ld next=%ld.%ld", sumstats->ts.startTime.tv_sec, sumstats->ts.startTime.tv_usec, sumstats->ts.nextTime.tv_sec, sumstats->ts.nextTime.tv_usec);
394 #endif
395 }
396 Mutex_Unlock(&myReport->GroupSumReport->reference.lock);
397 }
398 #ifdef HAVE_THREAD_DEBUG
399 thread_debug("Client(%d) report start/ipg=%ld.%ld next=%ld.%ld", mSettings->mSock, myReport->info.ts.startTime.tv_sec, myReport->info.ts.startTime.tv_usec, myReport->info.ts.nextTime.tv_sec, myReport->info.ts.nextTime.tv_usec);
400 #endif
401 }
402
ConnectPeriodic()403 void Client::ConnectPeriodic () {
404 Timestamp end;
405 Timestamp next;
406 unsigned int amount_usec = 1000000;
407 if (isModeTime(mSettings)) {
408 amount_usec = (mSettings->mAmount * 10000);
409 end.add(amount_usec); // add in micro seconds
410 }
411 setNoConnectSync(mSettings);
412 int num_connects = -1;
413 if (!(mSettings->mInterval > 0)) {
414 if (mSettings->connectonly_count < 0)
415 num_connects = 10;
416 else if (mSettings->connectonly_count > 0)
417 num_connects = mSettings->connectonly_count;
418 }
419
420 do {
421 if (my_connect(false)){
422 int rc = close(mySocket);
423 WARN_errno(rc == SOCKET_ERROR, "client close");
424 mySocket = INVALID_SOCKET;
425 }
426 if (mSettings->mInterval > 0) {
427 now.setnow();
428 do {
429 next.add(mSettings->mInterval);
430 } while (next.before(now));
431 if (next.before(end)) {
432 struct timeval tmp;
433 tmp.tv_sec = next.getSecs();
434 tmp.tv_usec = next.getUsecs();
435 clock_usleep_abstime(&tmp);
436 }
437 }
438 if (num_connects > 0) {
439 --num_connects;
440 }
441 } while (num_connects && !sInterupted && (next.before(end) || (isModeTime(mSettings) && !(mSettings->mInterval > 0))));
442 }
443 /* -------------------------------------------------------------------
444 * Common traffic loop intializations
445 * ------------------------------------------------------------------- */
InitTrafficLoop()446 void Client::InitTrafficLoop () {
447 // Enable socket write timeouts for responsive reporting
448 // Do this after the connection establishment
449 // and after Client::InitiateServer as during these
450 // default socket timeouts are preferred.
451 int sosndtimer = 0;
452 // sosndtimer units microseconds
453 // mInterval units are microseconds, mAmount units is 10 ms
454 // SetSocketOptionsSendTimeout takes microseconds
455 // Set the timeout value to 1/2 the interval (per -i) or 1/2 the -t value
456 if (isPeriodicBurst(mSettings) && (mSettings->mFPS > 0.0)) {
457 sosndtimer = static_cast<int>(round(250000.0 / mSettings->mFPS));
458 } else if (mSettings->mInterval > 0) {
459 sosndtimer = static_cast<int>(mSettings->mInterval / 2);
460 } else {
461 sosndtimer = static_cast<int>((mSettings->mAmount * 10000) / 2);
462 }
463 SetSocketOptionsSendTimeout(mSettings, sosndtimer);
464 // set the lower bounds delay based of the socket timeout timer
465 // units needs to be in nanoseconds
466 delay_lower_bounds = static_cast<double>(sosndtimer) * -1e3;
467
468 if (isIsochronous(mSettings))
469 myReport->info.matchframeID = 1;
470
471 // set the total bytes sent to zero
472 totLen = 0;
473 if (isModeTime(mSettings)) {
474 mEndTime.setnow();
475 mEndTime.add(mSettings->mAmount / 100.0);
476 }
477 readAt = mSettings->mBuf;
478 lastPacketTime.set(myReport->info.ts.startTime.tv_sec, myReport->info.ts.startTime.tv_usec);
479 reportstruct->errwrite=WriteNoErr;
480 reportstruct->emptyreport=0;
481 reportstruct->packetLen = 0;
482 // Finally, post this thread's "job report" which the reporter thread
483 // will continuously process as long as there are packets flowing
484 // right now the ring is empty
485 if (!isReverse(mSettings) && !isSingleUDP(mSettings) && isDataReport(mSettings)) {
486 assert(myJob!=NULL);
487 assert(myReport!=NULL);
488 PostReport(myJob);
489 }
490 one_report = (!isUDP(mSettings) && !isEnhanced(mSettings) && (mSettings->mIntervalMode != kInterval_Time) \
491 && !isIsochronous(mSettings) && !isPeriodicBurst(mSettings) && !isTripTime(mSettings) && !isReverse(mSettings));
492 }
493
494 /* -------------------------------------------------------------------
495 * Run the appropriate send loop between
496 *
497 * 1) TCP without rate limiting
498 * 2) TCP with rate limiting
499 * 3) UDP
500 * 4) UDP isochronous w/vbr
501 *
502 * ------------------------------------------------------------------- */
Run()503 void Client::Run () {
504 // Initialize the report struct scratch pad
505 // Peform common traffic setup
506 InitTrafficLoop();
507 /*
508 * UDP
509 */
510 if (isUDP(mSettings)) {
511 if (isFileInput(mSettings)) {
512 // Due to the UDP timestamps etc, included
513 // reduce the read size by an amount
514 // equal to the header size
515 Extractor_reduceReadSize(sizeof(struct UDP_datagram), mSettings);
516 readAt += sizeof(struct UDP_datagram);
517 }
518 // Launch the approprate UDP traffic loop
519 if (isIsochronous(mSettings)) {
520 RunUDPIsochronous();
521 } else {
522 RunUDP();
523 }
524 } else {
525 // Launch the approprate TCP traffic loop
526 if (mSettings->mAppRate > 0) {
527 RunRateLimitedTCP();
528 } else if (isNearCongest(mSettings)) {
529 RunNearCongestionTCP();
530 #if HAVE_DECL_TCP_NOTSENT_LOWAT
531 } else if (isWritePrefetch(mSettings) && \
532 !isIsochronous(mSettings) && !isPeriodicBurst(mSettings)) {
533 RunWriteEventsTCP();
534 #endif
535 } else {
536 RunTCP();
537 }
538 }
539 }
540
/*
 * TCP send loop (default, not rate limited)
 *
 * Each pass writes up to mBufLen bytes (bounded by the remaining mAmount
 * in amount mode) and, unless one_report is set, posts it to the report.
 * When isburst is set, traffic goes out in bursts: a burst starts with a
 * TCP_burst_payload header written with writen() (a short header write is
 * fatal via FAIL_errno), its size comes from the isochronous lognormal
 * draw, mBurstSize, or mBufLen (at least the header size), and it is
 * paced by framecounter->wait_tick() when a frame counter exists.
 * A zero-length write sets peerclose; fatal write errors end the loop.
 */
void Client::RunTCP () {
    int burst_remaining = 0;   // bytes left in the current burst (burst modes)
    int burst_id = 1;
    int writelen = mSettings->mBufLen;
    now.setnow();
    reportstruct->packetTime.tv_sec = now.getSecs();
    reportstruct->packetTime.tv_usec = now.getUsecs();
    while (InProgress()) {
        if (isModeAmount(mSettings)) {
            writelen = ((mSettings->mAmount < static_cast<unsigned>(mSettings->mBufLen)) ? mSettings->mAmount : mSettings->mBufLen);
        }
        // Start of a new burst: size it, pace it, and write its header
        if (isburst && !(burst_remaining > 0)) {
            if (isIsochronous(mSettings)) {
                assert(mSettings->mMean);
                // NOTE(review): presumably mMean is bits/sec, so this is bytes per
                // frame; the double quotient is truncated on assignment to int
                burst_remaining = static_cast<int>(lognormal(mSettings->mMean,mSettings->mVariance)) / (mSettings->mFPS * 8);
            } else if (isPeriodicBurst(mSettings)){
                assert(mSettings->mBurstSize);
                burst_remaining = mSettings->mBurstSize;
            } else {
                burst_remaining = mSettings->mBufLen;
            }
            // check for TCP minimum payload
            if (burst_remaining < static_cast<int>(sizeof(struct TCP_burst_payload)))
                burst_remaining = static_cast<int>(sizeof(struct TCP_burst_payload));
            // apply scheduling if needed
            if (framecounter) {
                burst_id = framecounter->wait_tick();
                if (isPeriodicBurst(mSettings)) {
                    // low duty cycle traffic needs special event handling
                    now.setnow();
                    reportstruct->packetTime.tv_sec = now.getSecs();
                    reportstruct->packetTime.tv_usec = now.getUsecs();
                    if (!InProgress()) {
                        reportstruct->packetLen = 0;
                        reportstruct->emptyreport = 1;
                        // wait may have crossed the termination boundary
                        break;
                    } else {
                        //time interval crossings may have occurred during the wait
                        //post a null event to cause the report to flush the packet ring
                        PostNullEvent();
                    }
                }
#if HAVE_DECL_TCP_NOTSENT_LOWAT
                if (isWritePrefetch(mSettings)) {
                    AwaitWriteSelectEventTCP();
                }
#endif
            }
            now.setnow();
            reportstruct->packetTime.tv_sec = now.getSecs();
            reportstruct->packetTime.tv_usec = now.getUsecs();
            WriteTcpTxHdr(reportstruct, burst_remaining, burst_id++);
            reportstruct->sentTime = reportstruct->packetTime;
            myReport->info.ts.prevsendTime = reportstruct->packetTime;
            writelen = (mSettings->mBufLen > burst_remaining) ? burst_remaining : mSettings->mBufLen;
            // perform write, full header must succeed
            reportstruct->packetLen = writen(mySocket, mSettings->mBuf, writelen);
            FAIL_errno(reportstruct->packetLen < (intmax_t) sizeof(struct TCP_burst_payload), "burst written", mSettings);
        } else {
            // Continuation of a burst, or plain streaming when not bursting
            // printf("pl=%ld\n",reportstruct->packetLen);
            // perform write
            if (isburst)
                writelen = (mSettings->mBufLen > burst_remaining) ? burst_remaining : mSettings->mBufLen;
#if HAVE_DECL_TCP_NOTSENT_LOWAT
            if (isWritePrefetch(mSettings)) {
                AwaitWriteSelectEventTCP();
            }
#endif
            reportstruct->packetLen = write(mySocket, mSettings->mBuf, writelen);
            now.setnow();
            reportstruct->packetTime.tv_sec = now.getSecs();
            reportstruct->packetTime.tv_usec = now.getUsecs();
            reportstruct->sentTime = reportstruct->packetTime;
        }
        // Classify the write result for the report
        if (reportstruct->packetLen <= 0) {
            if (reportstruct->packetLen == 0) {
                peerclose = true;
            } else if (NONFATALTCPWRITERR(errno)) {
                reportstruct->errwrite=WriteErrAccount;
            } else if (FATALTCPWRITERR(errno)) {
                reportstruct->errwrite=WriteErrFatal;
                WARN_errno(1, "tcp write");
                break;
            } else {
                reportstruct->errwrite=WriteErrNoAccount;
            }
            reportstruct->packetLen = 0;
            reportstruct->emptyreport = 1;
        } else {
            reportstruct->emptyreport = 0;
            totLen += reportstruct->packetLen;
            reportstruct->errwrite=WriteNoErr;
            if (isburst) {
                // transit_ready marks the write that completes a burst
                burst_remaining -= reportstruct->packetLen;
                if (burst_remaining > 0) {
                    reportstruct->transit_ready = 0;
                } else {
                    reportstruct->transit_ready = 1;
                }
            }
        }
        if (isModeAmount(mSettings) && !reportstruct->emptyreport) {
            /* mAmount may be unsigned, so don't let it underflow! */
            if (mSettings->mAmount >= static_cast<unsigned long>(reportstruct->packetLen)) {
                mSettings->mAmount -= static_cast<unsigned long>(reportstruct->packetLen);
            } else {
                mSettings->mAmount = 0;
            }
        }
        if (!one_report) {
            myReportPacket();
        }
    }
    FinishTrafficActions();
}
660
661 /*
662 * TCP send loop
663 */
RunNearCongestionTCP()664 void Client::RunNearCongestionTCP () {
665 int burst_remaining = 0;
666 int burst_id = 1;
667 now.setnow();
668 reportstruct->packetTime.tv_sec = now.getSecs();
669 reportstruct->packetTime.tv_usec = now.getUsecs();
670 while (InProgress()) {
671 if (isModeAmount(mSettings)) {
672 reportstruct->packetLen = ((mSettings->mAmount < static_cast<unsigned>(mSettings->mBufLen)) ? mSettings->mAmount : mSettings->mBufLen);
673 } else {
674 reportstruct->packetLen = mSettings->mBufLen;
675 }
676 if (!burst_remaining) {
677 burst_remaining = mSettings->mBufLen;
678 // mAmount check
679 now.setnow();
680 reportstruct->packetTime.tv_sec = now.getSecs();
681 reportstruct->packetTime.tv_usec = now.getUsecs();
682 WriteTcpTxHdr(reportstruct, burst_remaining, burst_id++);
683 reportstruct->sentTime = reportstruct->packetTime;
684 myReport->info.ts.prevsendTime = reportstruct->packetTime;
685 // perform write
686 int writelen = (mSettings->mBufLen > burst_remaining) ? burst_remaining : mSettings->mBufLen;
687 reportstruct->packetLen = write(mySocket, mSettings->mBuf, writelen);
688 assert(reportstruct->packetLen >= (intmax_t) sizeof(struct TCP_burst_payload));
689 goto ReportNow;
690 }
691 if (reportstruct->packetLen > burst_remaining) {
692 reportstruct->packetLen = burst_remaining;
693 }
694 // printf("pl=%ld\n",reportstruct->packetLen);
695 // perform write
696 reportstruct->packetLen = write(mySocket, mSettings->mBuf, reportstruct->packetLen);
697 now.setnow();
698 reportstruct->packetTime.tv_sec = now.getSecs();
699 reportstruct->packetTime.tv_usec = now.getUsecs();
700 reportstruct->sentTime = reportstruct->packetTime;
701 ReportNow:
702 reportstruct->transit_ready = 0;
703 if (reportstruct->packetLen < 0) {
704 if (NONFATALTCPWRITERR(errno)) {
705 reportstruct->errwrite=WriteErrAccount;
706 } else if (FATALTCPWRITERR(errno)) {
707 reportstruct->errwrite=WriteErrFatal;
708 WARN_errno(1, "tcp write");
709 break;
710 } else {
711 reportstruct->errwrite=WriteErrNoAccount;
712 }
713 reportstruct->packetLen = 0;
714 reportstruct->emptyreport = 1;
715 } else {
716 reportstruct->emptyreport = 0;
717 totLen += reportstruct->packetLen;
718 reportstruct->errwrite=WriteNoErr;
719 burst_remaining -= reportstruct->packetLen;
720 if (burst_remaining <= 0) {
721 reportstruct->transit_ready = 1;
722 }
723 }
724 if (isModeAmount(mSettings) && !reportstruct->emptyreport) {
725 /* mAmount may be unsigned, so don't let it underflow! */
726 if (mSettings->mAmount >= static_cast<unsigned long>(reportstruct->packetLen)) {
727 mSettings->mAmount -= static_cast<unsigned long>(reportstruct->packetLen);
728 } else {
729 mSettings->mAmount = 0;
730 }
731 }
732 #ifdef HAVE_STRUCT_TCP_INFO_TCPI_TOTAL_RETRANS
733 // apply placing after write burst completes
734 if (reportstruct->transit_ready && myReportPacket(true)) {
735 int pacing_timer = static_cast<int>(std::ceil(static_cast<double>(my_tcpi_stats.tcpi_rtt) * mSettings->rtt_nearcongest_divider));
736 // printf("**** delaytime = %d\n", delaytime);
737 delay_loop(pacing_timer);
738 } else
739 #endif
740 {
741 myReportPacket();
742 }
743 }
744 FinishTrafficActions();
745 }
746
747 /*
748 * A version of the transmit loop that supports TCP rate limiting using a token bucket
749 */
RunRateLimitedTCP()750 void Client::RunRateLimitedTCP () {
751 double tokens = 0;
752 Timestamp time1, time2;
753 int burst_size = mSettings->mBufLen;
754 int burst_remaining = 0;
755 int burst_id = 1;
756
757 long var_rate = mSettings->mAppRate;
758 int fatalwrite_err = 0;
759
760 now.setnow();
761 reportstruct->packetTime.tv_sec = now.getSecs();
762 reportstruct->packetTime.tv_usec = now.getUsecs();
763 while (InProgress() && !fatalwrite_err) {
764 // Add tokens per the loop time
765 time2.setnow();
766 if (isVaryLoad(mSettings)) {
767 static Timestamp time3;
768 if (time2.subSec(time3) >= VARYLOAD_PERIOD) {
769 var_rate = lognormal(mSettings->mAppRate,mSettings->mVariance);
770 time3 = time2;
771 if (var_rate < 0)
772 var_rate = 0;
773 }
774 }
775 tokens += time2.subSec(time1) * (var_rate / 8.0);
776 time1 = time2;
777 if (tokens >= 0.0) {
778 if (isModeAmount(mSettings)) {
779 reportstruct->packetLen = ((mSettings->mAmount < static_cast<unsigned>(mSettings->mBufLen)) ? mSettings->mAmount : mSettings->mBufLen);
780 } else {
781 reportstruct->packetLen = mSettings->mBufLen;
782 }
783 // perform write
784 int n = 0;
785 if (isTripTime(mSettings)) {
786 if (burst_remaining == 0) {
787 now.setnow();
788 reportstruct->packetTime.tv_sec = now.getSecs();
789 reportstruct->packetTime.tv_usec = now.getUsecs();
790 WriteTcpTxHdr(reportstruct, burst_size, burst_id++);
791 reportstruct->sentTime = reportstruct->packetTime;
792 burst_remaining = burst_size;
793 // perform write
794 n = writen(mySocket, mSettings->mBuf, sizeof(struct TCP_burst_payload));
795 WARN(n != sizeof(struct TCP_burst_payload), "burst hdr write failed");
796 burst_remaining -= n;
797 reportstruct->packetLen -= n;
798 // thread_debug("***write burst header %d id=%d", burst_size, (burst_id - 1));
799 } else if (reportstruct->packetLen > burst_remaining) {
800 reportstruct->packetLen = burst_remaining;
801 }
802 }
803 int len = write(mySocket, mSettings->mBuf, reportstruct->packetLen);
804 if (len < 0) {
805 if (NONFATALTCPWRITERR(errno)) {
806 reportstruct->errwrite=WriteErrAccount;
807 } else if (FATALTCPWRITERR(errno)) {
808 reportstruct->errwrite=WriteErrFatal;
809 WARN_errno(1, "write");
810 fatalwrite_err = 1;
811 break;
812 } else {
813 reportstruct->errwrite=WriteErrNoAccount;
814 }
815 len = 0;
816 } else {
817 // Consume tokens per the transmit
818 tokens -= (len + n);
819 totLen += (len + n);;
820 reportstruct->errwrite=WriteNoErr;
821 }
822 if (isTripTime(mSettings))
823 burst_remaining -= len;
824
825 time2.setnow();
826 reportstruct->packetLen = len + n;
827 reportstruct->packetTime.tv_sec = time2.getSecs();
828 reportstruct->packetTime.tv_usec = time2.getUsecs();
829 reportstruct->sentTime = reportstruct->packetTime;
830 if (isModeAmount(mSettings)) {
831 /* mAmount may be unsigned, so don't let it underflow! */
832 if (mSettings->mAmount >= static_cast<unsigned long>(reportstruct->packetLen)) {
833 mSettings->mAmount -= static_cast<unsigned long>(reportstruct->packetLen);
834 } else {
835 mSettings->mAmount = 0;
836 }
837 }
838 if (!one_report) {
839 myReportPacket();
840 }
841 } else {
842 // Use a 4 usec delay to fill tokens
843 delay_loop(4);
844 }
845 }
846 FinishTrafficActions();
847 }
848
849 #if HAVE_DECL_TCP_NOTSENT_LOWAT
AwaitWriteSelectEventTCP(void)850 inline bool Client::AwaitWriteSelectEventTCP (void) {
851 int rc;
852 struct timeval timeout;
853 fd_set writeset;
854 FD_ZERO(&writeset);
855 FD_SET(mySocket, &writeset);
856 if (isModeTime(mSettings)) {
857 Timestamp write_event_timeout(0,0);
858 if (mSettings->mInterval && (mSettings->mIntervalMode == kInterval_Time)) {
859 write_event_timeout.add((double) mSettings->mInterval / 1e6 * 2.0);
860 } else {
861 write_event_timeout.add((double) mSettings->mAmount / 1e2 * 4.0);
862 }
863 timeout.tv_sec = write_event_timeout.getSecs();
864 timeout.tv_usec = write_event_timeout.getUsecs();
865 } else {
866 timeout.tv_sec = 10; // longest is 10 seconds
867 timeout.tv_usec = 0;
868 }
869
870 Timestamp t1;
871 if ((rc = select(mySocket + 1, NULL, &writeset, NULL, &timeout)) <= 0) {
872 reportstruct->select_delay = -1;
873 WARN_errno((rc < 0), "select");
874 #ifdef HAVE_THREAD_DEBUG
875 if (rc == 0)
876 thread_debug("AwaitWrite timeout");
877 #endif
878 return false;
879 }
880 Timestamp t2;
881 reportstruct->select_delay = t2.subSec(t1);
882 // printf("*****t1 = %f\n", t2.subSec(t1));
883 return true;
884 }
885
RunWriteEventsTCP()886 void Client::RunWriteEventsTCP () {
887 int burst_id = 0;
888 int writelen = mSettings->mBufLen;
889
890 now.setnow();
891 reportstruct->packetTime.tv_sec = now.getSecs();
892 reportstruct->packetTime.tv_usec = now.getUsecs();
893 while (InProgress()) {
894 if (isModeAmount(mSettings)) {
895 writelen = ((mSettings->mAmount < static_cast<unsigned>(mSettings->mBufLen)) ? mSettings->mAmount : mSettings->mBufLen);
896 }
897 now.setnow();
898 bool rc = AwaitWriteSelectEventTCP();
899 reportstruct->emptyreport = (rc == false) ? 1 : 0;
900 if (rc) {
901 now.setnow();
902 reportstruct->packetTime.tv_sec = now.getSecs();
903 reportstruct->packetTime.tv_usec = now.getUsecs();
904 WriteTcpTxHdr(reportstruct, writelen, ++burst_id);
905 reportstruct->sentTime = reportstruct->packetTime;
906 reportstruct->packetLen = writen(mySocket, mSettings->mBuf, writelen);
907 if (reportstruct->packetLen <= 0) {
908 WARN_errno((reportstruct->packetLen < 0), "event writen()");
909 if (reportstruct->packetLen == 0) {
910 peerclose = true;
911 }
912 reportstruct->packetLen = 0;
913 reportstruct->emptyreport = 1;
914 }
915 }
916 if (isModeAmount(mSettings) && !reportstruct->emptyreport) {
917 /* mAmount may be unsigned, so don't let it underflow! */
918 if (mSettings->mAmount >= static_cast<unsigned long>(reportstruct->packetLen)) {
919 mSettings->mAmount -= static_cast<unsigned long>(reportstruct->packetLen);
920 } else {
921 mSettings->mAmount = 0;
922 }
923 }
924 if (!one_report) {
925 myReportPacket();
926 }
927 }
928 FinishTrafficActions();
929 }
930 #endif
931 /*
932 * UDP send loop
933 */
get_delay_target()934 double Client::get_delay_target () {
935 double delay_target;
936 if (isIPG(mSettings)) {
937 delay_target = mSettings->mBurstIPG * 1000000; // convert from milliseconds to nanoseconds
938 } else {
939 // compute delay target in units of nanoseconds
940 if (mSettings->mAppRateUnits == kRate_BW) {
941 // compute delay for bandwidth restriction, constrained to [0,1] seconds
942 delay_target = (mSettings->mBufLen * ((kSecs_to_nsecs * kBytes_to_Bits)
943 / mSettings->mAppRate));
944 } else {
945 delay_target = 1e9 / mSettings->mAppRate;
946 }
947 }
948 return delay_target;
949 }
950
RunUDP()951 void Client::RunUDP () {
952 struct UDP_datagram* mBuf_UDP = reinterpret_cast<struct UDP_datagram*>(mSettings->mBuf);
953 int currLen;
954
955 double delay_target = get_delay_target();
956 double delay = 0;
957 double adjust = 0;
958
959 // Set this to > 0 so first loop iteration will delay the IPG
960 currLen = 1;
961 double variance = mSettings->mVariance;
962 if (apply_first_udppkt_delay && (delay_target > 100000)) {
963 //the case when a UDP first packet went out in SendFirstPayload
964 delay_loop(static_cast<unsigned long>(delay_target / 1000));
965 }
966
967 while (InProgress()) {
968 // Test case: drop 17 packets and send 2 out-of-order:
969 // sequence 51, 52, 70, 53, 54, 71, 72
970 //switch(datagramID) {
971 // case 53: datagramID = 70; break;
972 // case 71: datagramID = 53; break;
973 // case 55: datagramID = 71; break;
974 // default: break;
975 //}
976 now.setnow();
977 reportstruct->packetTime.tv_sec = now.getSecs();
978 reportstruct->packetTime.tv_usec = now.getUsecs();
979 reportstruct->sentTime = reportstruct->packetTime;
980 if (isVaryLoad(mSettings) && mSettings->mAppRateUnits == kRate_BW) {
981 static Timestamp time3;
982 if (now.subSec(time3) >= VARYLOAD_PERIOD) {
983 long var_rate = lognormal(mSettings->mAppRate,variance);
984 if (var_rate < 0)
985 var_rate = 0;
986 delay_target = (mSettings->mBufLen * ((kSecs_to_nsecs * kBytes_to_Bits) / var_rate));
987 time3 = now;
988 }
989 }
990 // store datagram ID into buffer
991 WritePacketID(reportstruct->packetID);
992 mBuf_UDP->tv_sec = htonl(reportstruct->packetTime.tv_sec);
993 mBuf_UDP->tv_usec = htonl(reportstruct->packetTime.tv_usec);
994
995 // Adjustment for the running delay
996 // o measure how long the last loop iteration took
997 // o calculate the delay adjust
998 // - If write succeeded, adjust = target IPG - the loop time
999 // - If write failed, adjust = the loop time
1000 // o then adjust the overall running delay
1001 // Note: adjust units are nanoseconds,
1002 // packet timestamps are microseconds
1003 if (currLen > 0)
1004 adjust = delay_target + \
1005 (1000.0 * lastPacketTime.subUsec(reportstruct->packetTime));
1006 else
1007 adjust = 1000.0 * lastPacketTime.subUsec(reportstruct->packetTime);
1008
1009 lastPacketTime.set(reportstruct->packetTime.tv_sec, reportstruct->packetTime.tv_usec);
1010 // Since linux nanosleep/busyloop can exceed delay
1011 // there are two possible equilibriums
1012 // 1) Try to perserve inter packet gap
1013 // 2) Try to perserve requested transmit rate
1014 // The latter seems preferred, hence use a running delay
1015 // that spans the life of the thread and constantly adjust.
1016 // A negative delay means the iperf app is behind.
1017 delay += adjust;
1018 // Don't let delay grow unbounded
1019 if (delay < delay_lower_bounds) {
1020 delay = delay_target;
1021 }
1022
1023 reportstruct->errwrite = WriteNoErr;
1024 reportstruct->emptyreport = 0;
1025 // perform write
1026 if (isModeAmount(mSettings)) {
1027 currLen = write(mySocket, mSettings->mBuf, (mSettings->mAmount < static_cast<unsigned>(mSettings->mBufLen)) ? mSettings->mAmount : mSettings->mBufLen);
1028 } else {
1029 currLen = write(mySocket, mSettings->mBuf, mSettings->mBufLen);
1030 }
1031 if (currLen < 0) {
1032 reportstruct->packetID--;
1033 if (FATALUDPWRITERR(errno)) {
1034 reportstruct->errwrite = WriteErrFatal;
1035 WARN_errno(1, "write");
1036 break;
1037 } else {
1038 reportstruct->errwrite = WriteErrAccount;
1039 currLen = 0;
1040 }
1041 reportstruct->emptyreport = 1;
1042 }
1043
1044 if (isModeAmount(mSettings)) {
1045 /* mAmount may be unsigned, so don't let it underflow! */
1046 if (mSettings->mAmount >= static_cast<unsigned long>(currLen)) {
1047 mSettings->mAmount -= static_cast<unsigned long>(currLen);
1048 } else {
1049 mSettings->mAmount = 0;
1050 }
1051 }
1052
1053 // report packets
1054 reportstruct->packetLen = static_cast<unsigned long>(currLen);
1055 reportstruct->prevPacketTime = myReport->info.ts.prevpacketTime;
1056 myReportPacket();
1057 reportstruct->packetID++;
1058 myReport->info.ts.prevpacketTime = reportstruct->packetTime;
1059 // Insert delay here only if the running delay is greater than 100 usec,
1060 // otherwise don't delay and immediately continue with the next tx.
1061 if (delay >= 100000) {
1062 // Convert from nanoseconds to microseconds
1063 // and invoke the microsecond delay
1064 delay_loop(static_cast<unsigned long>(delay / 1000));
1065 }
1066 }
1067 FinishTrafficActions();
1068 }
1069
1070 /*
1071 * UDP isochronous send loop
1072 */
RunUDPIsochronous()1073 void Client::RunUDPIsochronous () {
1074 struct UDP_datagram* mBuf_UDP = reinterpret_cast<struct UDP_datagram*>(mSettings->mBuf);
1075 // skip over the UDP datagram (seq no, timestamp) to reach the isoch fields
1076 struct client_udp_testhdr *udp_payload = reinterpret_cast<client_udp_testhdr *>(mSettings->mBuf);
1077
1078 double delay_target = mSettings->mBurstIPG * 1000000; // convert from milliseconds to nanoseconds
1079 double delay = 0;
1080 double adjust = 0;
1081 int currLen = 1;
1082 int frameid=0;
1083 Timestamp t1;
1084
1085 // make sure the packet can carry the isoch payload
1086 if (!framecounter) {
1087 framecounter = new Isochronous::FrameCounter(mSettings->mFPS);
1088 }
1089 udp_payload->isoch.burstperiod = htonl(framecounter->period_us());
1090
1091 int initdone = 0;
1092 int fatalwrite_err = 0;
1093 while (InProgress() && !fatalwrite_err) {
1094 int bytecnt = static_cast<int>(lognormal(mSettings->mMean,mSettings->mVariance)) / (mSettings->mFPS * 8);
1095 if (bytecnt < udp_payload_minimum)
1096 bytecnt = udp_payload_minimum;
1097 delay = 0;
1098
1099 // printf("bits=%d\n", (int) (mSettings->mFPS * bytecnt * 8));
1100 udp_payload->isoch.burstsize = htonl(bytecnt);
1101 udp_payload->isoch.prevframeid = htonl(frameid);
1102 reportstruct->burstsize=bytecnt;
1103 frameid = framecounter->wait_tick();
1104 udp_payload->isoch.frameid = htonl(frameid);
1105 lastPacketTime.setnow();
1106 if (!initdone) {
1107 initdone = 1;
1108 udp_payload->isoch.start_tv_sec = htonl(framecounter->getSecs());
1109 udp_payload->isoch.start_tv_usec = htonl(framecounter->getUsecs());
1110 }
1111 while ((bytecnt > 0) && InProgress()) {
1112 t1.setnow();
1113 reportstruct->packetTime.tv_sec = t1.getSecs();
1114 reportstruct->packetTime.tv_usec = t1.getUsecs();
1115 reportstruct->sentTime = reportstruct->packetTime;
1116 mBuf_UDP->tv_sec = htonl(reportstruct->packetTime.tv_sec);
1117 mBuf_UDP->tv_usec = htonl(reportstruct->packetTime.tv_usec);
1118 WritePacketID(reportstruct->packetID);
1119
1120 // Adjustment for the running delay
1121 // o measure how long the last loop iteration took
1122 // o calculate the delay adjust
1123 // - If write succeeded, adjust = target IPG - the loop time
1124 // - If write failed, adjust = the loop time
1125 // o then adjust the overall running delay
1126 // Note: adjust units are nanoseconds,
1127 // packet timestamps are microseconds
1128 if (currLen > 0)
1129 adjust = delay_target + \
1130 (1000.0 * lastPacketTime.subUsec(reportstruct->packetTime));
1131 else
1132 adjust = 1000.0 * lastPacketTime.subUsec(reportstruct->packetTime);
1133
1134 lastPacketTime.set(reportstruct->packetTime.tv_sec, reportstruct->packetTime.tv_usec);
1135 // Since linux nanosleep/busyloop can exceed delay
1136 // there are two possible equilibriums
1137 // 1) Try to perserve inter packet gap
1138 // 2) Try to perserve requested transmit rate
1139 // The latter seems preferred, hence use a running delay
1140 // that spans the life of the thread and constantly adjust.
1141 // A negative delay means the iperf app is behind.
1142 delay += adjust;
1143 // Don't let delay grow unbounded
1144 // if (delay < delay_lower_bounds) {
1145 // delay = delay_target;
1146 // }
1147
1148 reportstruct->errwrite = WriteNoErr;
1149 reportstruct->emptyreport = 0;
1150
1151 // perform write
1152 if (isModeAmount(mSettings) && (mSettings->mAmount < static_cast<unsigned>(mSettings->mBufLen))) {
1153 udp_payload->isoch.remaining = htonl(mSettings->mAmount);
1154 reportstruct->remaining=mSettings->mAmount;
1155 currLen = write(mySocket, mSettings->mBuf, mSettings->mAmount);
1156 } else {
1157 udp_payload->isoch.remaining = htonl(bytecnt);
1158 reportstruct->remaining=bytecnt;
1159 currLen = write(mySocket, mSettings->mBuf, (bytecnt < mSettings->mBufLen) ? bytecnt : mSettings->mBufLen);
1160 }
1161
1162 if (currLen < 0) {
1163 reportstruct->packetID--;
1164 reportstruct->emptyreport = 1;
1165 currLen = 0;
1166 if (FATALUDPWRITERR(errno)) {
1167 reportstruct->errwrite = WriteErrFatal;
1168 WARN_errno(1, "write");
1169 fatalwrite_err = 1;
1170 } else {
1171 reportstruct->errwrite = WriteErrAccount;
1172 }
1173 } else {
1174 bytecnt -= currLen;
1175 if (!bytecnt)
1176 reportstruct->transit_ready = 1;
1177 else
1178 reportstruct->transit_ready = 0;
1179 // adjust bytecnt so last packet of burst is greater or equal to min packet
1180 if ((bytecnt > 0) && (bytecnt < udp_payload_minimum)) {
1181 bytecnt = udp_payload_minimum;
1182 udp_payload->isoch.burstsize = htonl(bytecnt);
1183 reportstruct->burstsize=bytecnt;
1184 }
1185 }
1186 if (isModeAmount(mSettings)) {
1187 /* mAmount may be unsigned, so don't let it underflow! */
1188 if (mSettings->mAmount >= static_cast<unsigned long>(currLen)) {
1189 mSettings->mAmount -= static_cast<unsigned long>(currLen);
1190 } else {
1191 mSettings->mAmount = 0;
1192 }
1193 }
1194 // report packets
1195
1196 reportstruct->frameID=frameid;
1197 reportstruct->packetLen = static_cast<unsigned long>(currLen);
1198 reportstruct->prevPacketTime = myReport->info.ts.prevpacketTime;
1199 myReportPacket();
1200 reportstruct->packetID++;
1201 myReport->info.ts.prevpacketTime = reportstruct->packetTime;
1202 // Insert delay here only if the running delay is greater than 1 usec,
1203 // otherwise don't delay and immediately continue with the next tx.
1204 if (delay >= 1000) {
1205 // Convert from nanoseconds to microseconds
1206 // and invoke the microsecond delay
1207 delay_loop(static_cast<unsigned long>(delay / 1000));
1208 }
1209 }
1210 }
1211 FinishTrafficActions();
1212 }
1213 // end RunUDPIsoch
1214
WritePacketID(intmax_t packetID)1215 inline void Client::WritePacketID (intmax_t packetID) {
1216 struct UDP_datagram * mBuf_UDP = reinterpret_cast<struct UDP_datagram *>(mSettings->mBuf);
1217 // store datagram ID into buffer
1218 #ifdef HAVE_INT64_T
1219 // Pack signed 64bit packetID into unsigned 32bit id1 + unsigned
1220 // 32bit id2. A legacy server reading only id1 will still be able
1221 // to reconstruct a valid signed packet ID number up to 2^31.
1222 uint32_t id1, id2;
1223 id1 = packetID & 0xFFFFFFFFLL;
1224 id2 = (packetID & 0xFFFFFFFF00000000LL) >> 32;
1225
1226 mBuf_UDP->id = htonl(id1);
1227 mBuf_UDP->id2 = htonl(id2);
1228
1229 #ifdef HAVE_PACKET_DEBUG
1230 printf("id %" PRIdMAX " (0x%" PRIxMAX ") -> 0x%x, 0x%x\n",
1231 packetID, packetID, id1, id2);
1232 #endif
1233 #else
1234 mBuf_UDP->id = htonl((reportstruct->packetID));
1235 #endif
1236 }
1237
WriteTcpTxHdr(struct ReportStruct * reportstruct,int burst_size,int burst_id)1238 inline void Client::WriteTcpTxHdr (struct ReportStruct *reportstruct, int burst_size, int burst_id) {
1239 struct TCP_burst_payload * mBuf_burst = reinterpret_cast<struct TCP_burst_payload *>(mSettings->mBuf);
1240 // store packet ID into buffer
1241 reportstruct->packetID += burst_size;
1242 mBuf_burst->start_tv_sec = htonl(myReport->info.ts.startTime.tv_sec);
1243 mBuf_burst->start_tv_usec = htonl(myReport->info.ts.startTime.tv_usec);
1244
1245 #ifdef HAVE_INT64_T
1246 // Pack signed 64bit packetID into unsigned 32bit id1 + unsigned
1247 // 32bit id2. A legacy server reading only id1 will still be able
1248 // to reconstruct a valid signed packet ID number up to 2^31.
1249 uint32_t id1, id2;
1250 id1 = reportstruct->packetID & 0xFFFFFFFFLL;
1251 id2 = (reportstruct->packetID & 0xFFFFFFFF00000000LL) >> 32;
1252
1253 mBuf_burst->seqno_lower = htonl(id1);
1254 mBuf_burst->seqno_upper = htonl(id2);
1255
1256 #ifdef HAVE_PACKET_DEBUG
1257 printf("id %" PRIdMAX " (0x%" PRIxMAX ") -> 0x%x, 0x%x\n",
1258 reportstruct->packetID, reportstruct->packetID, id1, id2);
1259 #endif
1260 #else
1261 mBuf_burst->seqno_lower = htonl((reportstruct->packetID));
1262 mBuf_burst->seqno_upper = htonl(0x0);
1263 #endif
1264 mBuf_burst->send_tt.write_tv_sec = htonl(reportstruct->packetTime.tv_sec);
1265 mBuf_burst->send_tt.write_tv_usec = htonl(reportstruct->packetTime.tv_usec);
1266 mBuf_burst->burst_id = htonl((uint32_t)burst_id);
1267 mBuf_burst->burst_size = htonl((uint32_t)burst_size);
1268 mBuf_burst->burst_period_s = htonl(0x0);
1269 mBuf_burst->burst_period_us = htonl(0x0);
1270 reportstruct->frameID=burst_id;
1271 reportstruct->burstsize=burst_size;
1272 // printf("**** Write tcp burst header size= %d id = %d\n", burst_size, burst_id);
1273 }
1274
InProgress()1275 inline bool Client::InProgress () {
1276 // Read the next data block from
1277 // the file if it's file input
1278 if (isFileInput(mSettings)) {
1279 Extractor_getNextDataBlock(readAt, mSettings);
1280 return Extractor_canRead(mSettings) != 0;
1281 }
1282 return !(sInterupted || peerclose || \
1283 (isModeTime(mSettings) && mEndTime.before(reportstruct->packetTime)) ||
1284 (isModeAmount(mSettings) && (mSettings->mAmount <= 0)));
1285 }
1286
1287 /*
1288 * Common things to do to finish a traffic thread
1289 *
1290 * Notes on the negative packet count or seq no:
1291 * A negative packet id is used to tell the server
1292 * this UDP stream is terminating. The server will remove
1293 * the sign. So a decrement will be seen as increments by
1294 * the server (e.g, -1000, -1001, -1002 as 1000, 1001, 1002)
 * If the retries weren't decremented here, the server could treat
 * these retries, when they are actually received, as out-of-order
 * packets (e.g. -1000, -1000, -1000)
1298 */
FinishTrafficActions()1299 void Client::FinishTrafficActions () {
1300 disarm_itimer();
1301 // Shutdown the TCP socket's writes as the event for the server to end its traffic loop
1302 if (!isUDP(mSettings)) {
1303 if ((mySocket != INVALID_SOCKET) && isConnected()) {
1304 #ifdef HAVE_STRUCT_TCP_INFO_TCPI_TOTAL_RETRANS
1305 // gettcpistats(myReport, true, NULL);
1306 #endif
1307 int rc = shutdown(mySocket, SHUT_WR);
1308 #ifdef HAVE_THREAD_DEBUG
1309 thread_debug("Client calls shutdown() SHUTW_WR on tcp socket %d", mySocket);
1310 #endif
1311 WARN_errno(rc == SOCKET_ERROR, "shutdown");
1312 if (!rc && !isFullDuplex(mSettings))
1313 AwaitServerCloseEvent();
1314 }
1315 now.setnow();
1316 reportstruct->packetTime.tv_sec = now.getSecs();
1317 reportstruct->packetTime.tv_usec = now.getUsecs();
1318 if (one_report) {
1319 /*
1320 * For TCP and if not doing interval or enhanced reporting (needed for write accounting),
1321 * then report the entire transfer as one big packet
1322 *
1323 */
1324 reportstruct->packetLen = totLen;
1325 }
1326 } else {
1327 // stop timing
1328 now.setnow();
1329 reportstruct->packetTime.tv_sec = now.getSecs();
1330 reportstruct->packetTime.tv_usec = now.getUsecs();
1331 reportstruct->sentTime = reportstruct->packetTime;
1332 // send a final terminating datagram
1333 // Don't count in the mTotalLen. The server counts this one,
1334 // but didn't count our first datagram, so we're even now.
1335 // The negative datagram ID signifies termination to the server.
1336 WritePacketID(-reportstruct->packetID);
1337 struct UDP_datagram * mBuf_UDP = reinterpret_cast<struct UDP_datagram *>(mSettings->mBuf);
1338 mBuf_UDP->tv_sec = htonl(reportstruct->packetTime.tv_sec);
1339 mBuf_UDP->tv_usec = htonl(reportstruct->packetTime.tv_usec);
1340 int len = write(mySocket, mSettings->mBuf, mSettings->mBufLen);
1341 #ifdef HAVE_THREAD_DEBUG
1342 thread_debug("UDP client sent final packet per negative seqno %ld", -reportstruct->packetID);
1343 #endif
1344 if (len > 0) {
1345 reportstruct->packetLen = len;
1346 myReportPacket();
1347 }
1348 reportstruct->packetLen = 0;
1349 }
1350 int do_close = EndJob(myJob, reportstruct);
1351 if (isUDP(mSettings) && !isMulticast(mSettings) && !isNoUDPfin(mSettings)) {
1352 /*
1353 * For UDP, there is a final handshake between the client and the server,
1354 * do that now (unless requested no to)
1355 */
1356 AwaitServerFinPacket();
1357 }
1358 if (do_close) {
1359 #if HAVE_THREAD_DEBUG
1360 thread_debug("client close sock=%d", mySocket);
1361 #endif
1362 int rc = close(mySocket);
1363 WARN_errno(rc == SOCKET_ERROR, "client close");
1364 }
1365 Iperf_remove_host(mSettings);
1366 FreeReport(myJob);
1367 if (framecounter)
1368 DELETE_PTR(framecounter);
1369 }
1370
1371 /* -------------------------------------------------------------------
 * Await the server's FIN packet, which also carries the server
 * stats to be displayed on the client. Re-transmit the final
 * datagram until the FIN is received or the retries run out.
1375 * ------------------------------------------------------------------- */
1376 #define RETRYTIMER 10000 //units of us
1377 #define RETRYCOUNT (2 * 1000000 / RETRYTIMER) // 2 seconds worth of retries
AwaitServerFinPacket()1378 void Client::AwaitServerFinPacket () {
1379 int rc;
1380 fd_set readSet;
1381 struct timeval timeout;
1382 int ack_success = 0;
1383 int count = RETRYCOUNT;
1384 while (--count >= 0) {
1385 // wait until the socket is readable, or our timeout expires
1386 FD_ZERO(&readSet);
1387 FD_SET(mySocket, &readSet);
1388 timeout.tv_sec = 0;
1389 timeout.tv_usec = RETRYTIMER;
1390 rc = select(mySocket+1, &readSet, NULL, NULL, &timeout);
1391 FAIL_errno(rc == SOCKET_ERROR, "select", mSettings);
1392 // rc= zero means select's read timed out
1393 if (rc == 0) {
1394 // try to trigger another FIN by resending a negative seq no
1395 WritePacketID(-(++reportstruct->packetID));
1396 // write data
1397 rc = write(mySocket, mSettings->mBuf, mSettings->mBufLen);
1398 WARN_errno(rc < 0, "write-fin");
1399 #ifdef HAVE_THREAD_DEBUG
1400 thread_debug("UDP client retransmit final packet per negative seqno %ld", -reportstruct->packetID);
1401 #endif
1402 } else {
1403 // socket ready to read, this packet size
1404 // is set by the server. Assume it's large enough
1405 // to contain the final server packet
1406 rc = read(mySocket, mSettings->mBuf, MAXUDPBUF);
1407
1408 // dump any 2.0.13 client acks sent at the start of traffic
1409 if (rc == sizeof(client_hdr_ack)) {
1410 struct client_hdr_ack *ack = reinterpret_cast<struct client_hdr_ack *>(mSettings->mBuf);
1411 if (ntohl(ack->typelen.type) == CLIENTHDRACK) {
1412 // printf("**** dump stale ack \n");
1413 continue;
1414 }
1415 }
1416
1417 WARN_errno(rc < 0, "read");
1418 if (rc > 0) {
1419 ack_success = 1;
1420 #ifdef HAVE_THREAD_DEBUG
1421 thread_debug("UDP client received server relay report ack (%d)", -reportstruct->packetID);
1422 #endif
1423 if (mSettings->mReportMode != kReport_CSV) {
1424 PostReport(InitServerRelayUDPReport(mSettings, reinterpret_cast<server_hdr*>(reinterpret_cast<UDP_datagram*>(mSettings->mBuf) + 1)));
1425 }
1426 break;
1427 }
1428 }
1429 }
1430 if ((!ack_success) && (mSettings->mReportMode != kReport_CSV))
1431 fprintf(stderr, warn_no_ack, mySocket, (isModeTime(mSettings) ? 10 : 1));
1432 }
1433
1434
PostNullEvent()1435 void Client::PostNullEvent () {
1436 assert(myReport!=NULL);
1437 // push a nonevent into the packet ring
1438 // this will cause the reporter to process
1439 // up to this event
1440 memset(reportstruct, 0, sizeof(struct ReportStruct));
1441 now.setnow();
1442 reportstruct->packetTime.tv_sec = now.getSecs();
1443 reportstruct->packetTime.tv_usec = now.getUsecs();
1444 reportstruct->emptyreport=1;
1445 myReportPacket();
1446 }
1447
1448 // The client end timer is based upon the final fin, fin-ack w/the server
1449 // A way to detect this is to hang a recv and wait for the zero byte
// return indicating the socket is closed for recv per the server
// closing its socket
1452 #define MINAWAITCLOSEUSECS 2000000
AwaitServerCloseEvent()1453 void Client::AwaitServerCloseEvent () {
1454 // the await detection can take awhile so post a non event ahead of it
1455 PostNullEvent();
1456 unsigned int amount_usec = \
1457 (isModeTime(mSettings) ? static_cast<int>(mSettings->mAmount * 10000) : MINAWAITCLOSEUSECS);
1458 if (amount_usec < MINAWAITCLOSEUSECS)
1459 amount_usec = MINAWAITCLOSEUSECS;
1460 SetSocketOptionsReceiveTimeout(mSettings, amount_usec);
1461 int rc;
1462 while ((rc = recv(mySocket, mSettings->mBuf, mSettings->mBufLen, 0) > 0)) {};
1463 if (rc < 0)
1464 WARN_errno(1, "client await server close");
1465 #ifdef HAVE_THREAD_DEBUG
1466 if (rc==0)
1467 thread_debug("Client detected server close %d", mySocket);
1468 #endif
1469 }
1470
SendFirstPayload()1471 int Client::SendFirstPayload () {
1472 int pktlen = 0;
1473 if (!isConnectOnly(mSettings)) {
1474 if (myReport && !TimeZero(myReport->info.ts.startTime) && !(mSettings->mMode == kTest_TradeOff)) {
1475 reportstruct->packetTime = myReport->info.ts.startTime;
1476 } else {
1477 now.setnow();
1478 reportstruct->packetTime.tv_sec = now.getSecs();
1479 reportstruct->packetTime.tv_usec = now.getUsecs();
1480 }
1481 if (isTxStartTime(mSettings)) {
1482 pktlen = Settings_GenerateClientHdr(mSettings, (void *) mSettings->mBuf, mSettings->txstart_epoch);
1483 } else {
1484 pktlen = Settings_GenerateClientHdr(mSettings, (void *) mSettings->mBuf, reportstruct->packetTime);
1485 }
1486 if (pktlen > 0) {
1487 if (isUDP(mSettings)) {
1488 struct client_udp_testhdr *tmphdr = reinterpret_cast<struct client_udp_testhdr *>(mSettings->mBuf);
1489 WritePacketID(reportstruct->packetID);
1490 tmphdr->seqno_ts.tv_sec = htonl(reportstruct->packetTime.tv_sec);
1491 tmphdr->seqno_ts.tv_usec = htonl(reportstruct->packetTime.tv_usec);
1492 udp_payload_minimum = pktlen;
1493 #if HAVE_DECL_MSG_DONTWAIT
1494 pktlen = send(mySocket, mSettings->mBuf, (pktlen > mSettings->mBufLen) ? pktlen : mSettings->mBufLen, MSG_DONTWAIT);
1495 #else
1496 pktlen = send(mySocket, mSettings->mBuf, (pktlen > mSettings->mBufLen) ? pktlen : mSettings->mBufLen, 0);
1497 #endif
1498 apply_first_udppkt_delay = true;
1499 } else {
1500 #if HAVE_DECL_TCP_NODELAY
1501 if (!isNoDelay(mSettings) && isPeerVerDetect(mSettings) && isTripTime(mSettings)) {
1502 int optflag=1;
1503 int rc;
1504 // Disable Nagle to reduce latency of this intial message
1505 if ((rc = setsockopt(mSettings->mSock, IPPROTO_TCP, TCP_NODELAY, reinterpret_cast<char *>(&optflag), sizeof(int))) < 0) {
1506 WARN_errno(rc < 0, "tcpnodelay");
1507 }
1508 }
1509 #endif
1510 #if HAVE_DECL_MSG_DONTWAIT
1511 pktlen = send(mySocket, mSettings->mBuf, pktlen, MSG_DONTWAIT);
1512 #else
1513 pktlen = send(mySocket, mSettings->mBuf, pktlen, 0);
1514 #endif
1515 if (isPeerVerDetect(mSettings) && !isServerReverse(mSettings)) {
1516 PeerXchange();
1517 }
1518 #if HAVE_DECL_TCP_NODELAY
1519 if (!isNoDelay(mSettings) && isPeerVerDetect(mSettings) && isTripTime(mSettings)) {
1520 int optflag=0;
1521 int rc;
1522 // Disable Nagle to reduce latency of this intial message
1523 if ((rc = setsockopt(mSettings->mSock, IPPROTO_TCP, TCP_NODELAY, reinterpret_cast<char *>(&optflag), sizeof(int))) < 0) {
1524 WARN_errno(rc < 0, "tcpnodelay");
1525 }
1526 }
1527 #endif
1528 }
1529 WARN_errno(pktlen < 0, "send_hdr");
1530 }
1531 }
1532 return pktlen;
1533 }
1534
PeerXchange()1535 void Client::PeerXchange () {
1536 int n;
1537 client_hdr_ack ack;
1538 /*
1539 * Hang read and see if this is a header ack message
1540 */
1541 int readlen = isTripTime(mSettings) ? sizeof(struct client_hdr_ack) : (sizeof(struct client_hdr_ack) - sizeof(struct client_hdr_ack_ts));
1542 if ((n = recvn(mySocket, reinterpret_cast<char *>(&ack), readlen, 0)) == readlen) {
1543 if (ntohl(ack.typelen.type) == CLIENTHDRACK) {
1544 mSettings->peer_version_u = ntohl(ack.version_u);
1545 mSettings->peer_version_l = ntohl(ack.version_l);
1546 if (isTripTime(mSettings)) {
1547 Timestamp now;
1548 Timestamp senttx(ntohl(ack.ts.sent_tv_sec), ntohl(ack.ts.sent_tv_usec));
1549 Timestamp sentrx(ntohl(ack.ts.sentrx_tv_sec), ntohl(ack.ts.sentrx_tv_usec));
1550 Timestamp acktx(ntohl(ack.ts.ack_tv_sec), ntohl(ack.ts.ack_tv_usec));
1551 Timestamp ackrx(now.getSecs(), now.getUsecs());
1552 double str = (sentrx.get() - senttx.get()) * 1e3;
1553 double atr = (now.get() - acktx.get()) * 1e3;
1554 double rtt = str + atr;
1555 double halfrtt = rtt / 2.0;
1556 fprintf(stderr,"%sClock sync check (ms): RTT/Half=(%0.3f/%0.3f) OWD-send/ack/asym=(%0.3f/%0.3f/%0.3f)\n",mSettings->mTransferIDStr, rtt, halfrtt, str, atr, abs(str-atr));
1557 }
1558 }
1559 } else {
1560 WARN_errno(1, "recvack");
1561 }
1562 }
1563
1564 /*
 * BarrierClient allows multiple stream clients to be synchronized
1566 */
BarrierClient(struct BarrierMutex * barrier)1567 int Client::BarrierClient (struct BarrierMutex *barrier) {
1568 int last = 0;
1569 #ifdef HAVE_THREAD
1570 assert(barrier != NULL);
1571 Condition_Lock(barrier->await);
1572 if (--barrier->count <= 0) {
1573 // store the barrier release timer
1574 #ifdef HAVE_CLOCK_GETTIME
1575 struct timespec t1;
1576 clock_gettime(CLOCK_REALTIME, &t1);
1577 barrier->release_time.tv_sec = t1.tv_sec;
1578 barrier->release_time.tv_usec = t1.tv_nsec / 1000;
1579 #else
1580 gettimeofday(&barrier->release_time, NULL);
1581 #endif
1582 last = 1;
1583 // last one wake's up everyone else
1584 Condition_Broadcast(&barrier->await);
1585 #ifdef HAVE_THREAD_DEBUG
1586 thread_debug("Barrier BROADCAST on condition %p", (void *)&barrier->await);
1587 #endif
1588 } else {
1589 #ifdef HAVE_THREAD_DEBUG
1590 thread_debug("Barrier WAIT on condition %p count=%d", (void *)&barrier->await, barrier->count);
1591 #endif
1592 Condition_Wait(&barrier->await);
1593 }
1594 Condition_Unlock(barrier->await);
1595 #ifdef HAVE_THREAD_DEBUG
1596 thread_debug("Barrier EXIT on condition %p", (void *)&barrier->await);
1597 #endif
1598 #else
1599 last = 1;
1600 #endif // HAVE_THREAD
1601 return last;
1602 }
1603