1 /*---------------------------------------------------------------
2  * Copyright (c) 1999,2000,2001,2002,2003
3  * The Board of Trustees of the University of Illinois
4  * All Rights Reserved.
5  *---------------------------------------------------------------
6  * Permission is hereby granted, free of charge, to any person
7  * obtaining a copy of this software (Iperf) and associated
8  * documentation files (the "Software"), to deal in the Software
9  * without restriction, including without limitation the
10  * rights to use, copy, modify, merge, publish, distribute,
11  * sublicense, and/or sell copies of the Software, and to permit
12  * persons to whom the Software is furnished to do
13  * so, subject to the following conditions:
14  *
15  *
16  * Redistributions of source code must retain the above
17  * copyright notice, this list of conditions and
18  * the following disclaimers.
19  *
20  *
21  * Redistributions in binary form must reproduce the above
22  * copyright notice, this list of conditions and the following
23  * disclaimers in the documentation and/or other materials
24  * provided with the distribution.
25  *
26  *
27  * Neither the names of the University of Illinois, NCSA,
28  * nor the names of its contributors may be used to endorse
29  * or promote products derived from this Software without
30  * specific prior written permission.
31  *
32  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
33  * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
34  * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
35  * NONINFRINGEMENT. IN NO EVENT SHALL THE CONTIBUTORS OR COPYRIGHT
36  * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
37  * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
38  * ARISING FROM, OUT OF OR IN CONNECTION WITH THE
39  * SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
40  * ________________________________________________________________
41  * National Laboratory for Applied Network Research
42  * National Center for Supercomputing Applications
43  * University of Illinois at Urbana-Champaign
44  * http://www.ncsa.uiuc.edu
45  * ________________________________________________________________
46  *
47  * Client.cpp
48  * by Mark Gates <mgates@nlanr.net>
49  * -------------------------------------------------------------------
50  * A client thread initiates a connect to the server and handles
51  * sending and receiving data, then closes the socket.
52  * ------------------------------------------------------------------- */
#include <cmath>
#include <ctime>
#include <limits>
#include "headers.h"
#include "Client.hpp"
#include "Thread.h"
#include "SocketAddr.h"
#include "PerfSocket.hpp"
#include "Extractor.h"
#include "delay.h"
#include "util.h"
#include "Locale.h"
#include "isochronous.hpp"
#include "pdfs.h"
#include "version.h"
#include "payloads.h"
#include "active_hosts.h"
69 
// const double kSecs_to_usecs = 1e6;
// Multiplier to convert a value in seconds to nanoseconds.
const double kSecs_to_nsecs = 1e9;
// Multiplier to convert a byte count to a bit count.
const int    kBytes_to_Bits = 8;

// Used by the rate limited TCP loop when the offered load is varied.
#define VARYLOAD_PERIOD 0.1 // recompute the variable load every n seconds
// NOTE(review): not referenced in the visible part of this file; presumably
// the largest UDP payload in bytes -- confirm against its users.
#define MAXUDPBUF 1470
76 
Client(thread_Settings * inSettings)77 Client::Client (thread_Settings *inSettings) {
78 #ifdef HAVE_THREAD_DEBUG
79   thread_debug("Client constructor with thread %p sum=%p (flags=%x)", (void *) inSettings, (void *)inSettings->mSumReport, inSettings->flags);
80 #endif
81     mSettings = inSettings;
82     myJob = NULL;
83     myReport = NULL;
84     framecounter = NULL;
85     one_report = false;
86     udp_payload_minimum = 1;
87     apply_first_udppkt_delay = false;
88 
89     memset(&scratchpad, 0, sizeof(struct ReportStruct));
90     reportstruct = &scratchpad;
91     reportstruct->packetID = 1;
92     mySocket = isServerReverse(mSettings) ? mSettings->mSock : INVALID_SOCKET;
93     connected = isServerReverse(mSettings);
94     if (isCompat(mSettings) && isPeerVerDetect(mSettings)) {
95 	fprintf(stderr, "%s", warn_compat_and_peer_exchange);
96 	unsetPeerVerDetect(mSettings);
97     }
98 
99     pattern(mSettings->mBuf, mSettings->mBufLen);
100     if (isFileInput(mSettings)) {
101         if (!isSTDIN(mSettings))
102             Extractor_Initialize(mSettings->mFileName, mSettings->mBufLen, mSettings);
103         else
104             Extractor_InitializeFile(stdin, mSettings->mBufLen, mSettings);
105 
106         if (!Extractor_canRead(mSettings)) {
107             unsetFileInput(mSettings);
108         }
109     }
110     if (isIsochronous(mSettings)) {
111 	FAIL_errno(!(mSettings->mFPS > 0.0), "Invalid value for frames per second in the isochronous settings\n", mSettings);
112     }
113     peerclose = false;
114     isburst = (isIsochronous(mSettings) || isPeriodicBurst(mSettings) || (isTripTime(mSettings) && !isUDP(mSettings)));
115 } // end Client
116 
117 /* -------------------------------------------------------------------
118  * Destructor
119  * ------------------------------------------------------------------- */
~Client()120 Client::~Client () {
121 #if HAVE_THREAD_DEBUG
122     thread_debug("Client destructor sock=%d report=%p server-reverse=%s fullduplex=%s", \
123 		 mySocket, (void *) mSettings->reporthdr, \
124 		 (isServerReverse(mSettings) ? "true" : "false"), (isFullDuplex(mSettings) ? "true" : "false"));
125 #endif
126     DELETE_PTR(framecounter);
127 } // end ~Client
128 
129 
130 /* -------------------------------------------------------------------
131  * Setup a socket connected to a server.
132  * If inLocalhost is not null, bind to that address, specifying
133  * which outgoing interface to use.
134  * ------------------------------------------------------------------- */
my_connect(bool close_on_fail)135 bool Client::my_connect (bool close_on_fail) {
136     int rc;
137     double connecttime = -1.0;
138     // create an internet socket
139     int type = (isUDP(mSettings) ? SOCK_DGRAM : SOCK_STREAM);
140     int domain = (SockAddr_isIPv6(&mSettings->peer) ?
141 #ifdef HAVE_IPV6
142                   AF_INET6
143 #else
144                   AF_INET
145 #endif
146                   : AF_INET);
147 
148     mySocket = socket(domain, type, 0);
149     WARN_errno(mySocket == INVALID_SOCKET, "socket");
150     // Socket is carried both by the object and the thread
151     mSettings->mSock=mySocket;
152     SetSocketOptions(mSettings);
153     SockAddr_localAddr(mSettings);
154     SockAddr_remoteAddr(mSettings);
155     if (mSettings->mLocalhost != NULL) {
156         // bind socket to local address
157         rc = bind(mySocket, reinterpret_cast<sockaddr*>(&mSettings->local),
158 		  SockAddr_get_sizeof_sockaddr(&mSettings->local));
159         WARN_errno(rc == SOCKET_ERROR, "bind");
160     }
161 
162     // connect socket
163     connected = false;
164     if (!isUDP(mSettings)) {
165 	int trycnt = mSettings->mConnectRetries + 1;
166 	while (trycnt > 0) {
167 	    connect_start.setnow();
168 	    rc = connect(mySocket, reinterpret_cast<sockaddr*>(&mSettings->peer),
169 			 SockAddr_get_sizeof_sockaddr(&mSettings->peer));
170 	    WARN_errno((rc == SOCKET_ERROR), "tcp connect");
171 	    if (rc == SOCKET_ERROR) {
172 		if ((--trycnt) <= 0) {
173 		    if (close_on_fail) {
174 			close(mySocket);
175 			mySocket = INVALID_SOCKET;
176 		    }
177 		} else {
178 		    delay_loop(200000);
179 		}
180 	    } else {
181 		connect_done.setnow();
182 		connecttime = 1e3 * connect_done.subSec(connect_start);
183 		mSettings->connecttime = connecttime;
184 		connected = true;
185 		break;
186 	    }
187 	}
188     } else {
189 	rc = connect(mySocket, reinterpret_cast<sockaddr*>(&mSettings->peer),
190 		     SockAddr_get_sizeof_sockaddr(&mSettings->peer));
191 	connecttime = 0.0; // UDP doesn't have a 3WHS
192         WARN_errno((rc == SOCKET_ERROR), "udp connect");
193 	if (rc != SOCKET_ERROR)
194 	    connected = true;
195     }
196     if (connected) {
197 	// Set the send timeout for the very first write which has the test exchange
198 	int sosndtimer = TESTEXCHANGETIMEOUT; // 4 sec in usecs
199 	SetSocketOptionsSendTimeout(mSettings, sosndtimer);
200 	getsockname(mySocket, reinterpret_cast<sockaddr*>(&mSettings->local), &mSettings->size_local);
201 	getpeername(mySocket, reinterpret_cast<sockaddr*>(&mSettings->peer), &mSettings->size_peer);
202 	SockAddr_Ifrname(mSettings);
203 	if (isUDP(mSettings) && !isIsochronous(mSettings) && !isIPG(mSettings)) {
204 	    mSettings->mBurstIPG = get_delay_target() / 1e3; // this is being set for the settings report only
205 	}
206     } else {
207 	connecttime = -1;
208 	if (mySocket != INVALID_SOCKET) {
209 	    int rc = close(mySocket);
210 	    WARN_errno(rc == SOCKET_ERROR, "client connect close");
211 	    mySocket = INVALID_SOCKET;
212 	}
213     }
214     if (isReport(mSettings) && isSettingsReport(mSettings)) {
215 	struct ReportHeader *tmp = InitSettingsReport(mSettings);
216 	assert(tmp!=NULL);
217 	PostReport(tmp);
218 	setNoSettReport(mSettings);
219     }
220     // Post the connect report unless peer version exchange is set
221     if (isConnectionReport(mSettings) && !isSumOnly(mSettings)) {
222 	if (connected) {
223 	    struct ReportHeader *reporthdr = InitConnectionReport(mSettings, connecttime);
224 	    struct ConnectionInfo *cr = static_cast<struct ConnectionInfo *>(reporthdr->this_report);
225 	    cr->connect_timestamp.tv_sec = connect_start.getSecs();
226 	    cr->connect_timestamp.tv_usec = connect_start.getUsecs();
227 	    assert(reporthdr);
228 	    PostReport(reporthdr);
229 	} else {
230 	    PostReport(InitConnectionReport(mSettings, -1));
231 	}
232     }
233     return connected;
234 } // end Connect
235 
isConnected() const236 bool Client::isConnected () const {
237 #ifdef HAVE_THREAD_DEBUG
238   // thread_debug("Client is connected %d", connected);
239 #endif
240     return connected;
241 }
242 
TxDelay()243 void Client::TxDelay () {
244     if (isTxHoldback(mSettings)) {
245 	clock_usleep(&mSettings->txholdback_timer);
246     }
247 }
248 
249 // return true of tcpi stats were sampled
250 #ifdef HAVE_STRUCT_TCP_INFO_TCPI_TOTAL_RETRANS
myReportPacket(bool sample_tcpi)251 inline bool Client::myReportPacket (bool sample_tcpi) {
252     bool rc = false;
253     if (sample_tcpi) {
254 	rc = ReportPacket(myReport, reportstruct, &my_tcpi_stats);
255     } else {
256 	ReportPacket(myReport, reportstruct, NULL);
257     }
258     reportstruct->packetLen = 0;
259     return rc;
260 }
myReportPacket()261 inline void Client::myReportPacket () {
262     ReportPacket(myReport, reportstruct, NULL);
263     reportstruct->packetLen = 0;
264 }
265 #else
myReportPacket(void)266 inline void Client::myReportPacket (void) {
267     ReportPacket(myReport, reportstruct);
268     reportstruct->packetLen = 0;
269 }
270 #endif
271 
272 
273 // There are multiple startup synchronizations, this code
274 // handles them all. The caller decides to apply them
275 // either before connect() or after connect() and before writes()
StartSynch()276 int Client::StartSynch () {
277 #ifdef HAVE_THREAD_DEBUG
278     thread_debug("Client start sync enterred");
279 #endif
280 
281     myJob = InitIndividualReport(mSettings);
282     myReport = static_cast<struct ReporterData *>(myJob->this_report);
283     myReport->info.common->socket=mySocket;
284 
285     // Perform delays, usually between connect() and data xfer though before connect
286     // Two delays are supported:
287     // o First is an absolute start time per unix epoch format
288     // o Second is a holdback, a relative amount of seconds between the connect and data xfers
289     // check for an epoch based start time
290     reportstruct->packetLen = 0;
291     if (!isServerReverse(mSettings)) {
292 	if (!isCompat(mSettings)) {
293 	    reportstruct->packetLen = SendFirstPayload();
294 	    // Reverse UDP tests need to retry "first sends" a few times
295 	    // before going to server or read mode
296 	    if (isReverse(mSettings) && isUDP(mSettings)) {
297 		reportstruct->packetLen = 0;
298 		fd_set set;
299 		struct timeval timeout;
300 		int resend_udp = 100;
301 		while (--resend_udp > 0) {
302 		    FD_ZERO(&set);
303 		    FD_SET(mySocket, &set);
304 		    timeout.tv_sec = 0;
305 		    timeout.tv_usec = rand() % 20000; // randomize IPG a bit
306 		    if (select(mySocket + 1, &set, NULL, NULL, &timeout) == 0) {
307 			reportstruct->packetLen = SendFirstPayload();
308 			// printf("**** resend sock=%d count=%d\n", mySocket, resend_udp);
309 		    } else {
310 			break;
311 		    }
312 		}
313 	    }
314 	}
315 	if (isTxStartTime(mSettings)) {
316 	    clock_usleep_abstime(&mSettings->txstart_epoch);
317 	} else if (isTxHoldback(mSettings)) {
318 	    TxDelay();
319 	}
320 	// Server side client
321     } else if (isTripTime(mSettings) || isPeriodicBurst(mSettings)) {
322 	reportstruct->packetLen = SendFirstPayload();
323     }
324     if (isIsochronous(mSettings) || isPeriodicBurst(mSettings)) {
325         Timestamp tmp;
326         tmp.set(mSettings->txstart_epoch.tv_sec, mSettings->txstart_epoch.tv_usec);
327         framecounter = new Isochronous::FrameCounter(mSettings->mFPS, tmp);
328     }
329     int setfullduplexflag = 0;
330     if (isFullDuplex(mSettings) && !isServerReverse(mSettings)) {
331 	assert(mSettings->mFullDuplexReport != NULL);
332 	if ((setfullduplexflag = fullduplex_start_barrier(&mSettings->mFullDuplexReport->fullduplex_barrier)) < 0)
333 	    return -1;
334     }
335     SetReportStartTime();
336     if (reportstruct->packetLen > 0) {
337 	reportstruct->packetTime = myReport->info.ts.startTime;
338 	reportstruct->sentTime = reportstruct->packetTime;
339 	reportstruct->prevSentTime = reportstruct->packetTime;
340 	reportstruct->prevPacketTime = myReport->info.ts.prevpacketTime;
341 	myReportPacket();
342 	myReport->info.ts.prevpacketTime = reportstruct->packetTime;
343 	reportstruct->packetID++;
344     }
345     if (setfullduplexflag) {
346 	SetFullDuplexReportStartTime();
347     }
348     // Full duplex sockets need to be syncronized
349 #ifdef HAVE_THREAD_DEBUG
350     thread_debug("Client start sync exited");
351 #endif
352     return 0;
353 }
354 
SetFullDuplexReportStartTime()355 inline void Client::SetFullDuplexReportStartTime () {
356     assert(myReport->FullDuplexReport != NULL);
357     struct TransferInfo *fullduplexstats = &myReport->FullDuplexReport->info;
358     assert(fullduplexstats != NULL);
359     if (TimeZero(fullduplexstats->ts.startTime)) {
360 	fullduplexstats->ts.startTime = myReport->info.ts.startTime;
361 	if (isModeTime(mSettings)) {
362 	    fullduplexstats->ts.nextTime = myReport->info.ts.nextTime;
363 	}
364     }
365 #ifdef HAVE_THREAD_DEBUG
366     thread_debug("Client fullduplex report start=%ld.%ld next=%ld.%ld", fullduplexstats->ts.startTime.tv_sec, fullduplexstats->ts.startTime.tv_usec, fullduplexstats->ts.nextTime.tv_sec, fullduplexstats->ts.nextTime.tv_usec);
367 #endif
368 }
SetReportStartTime()369 inline void Client::SetReportStartTime () {
370     assert(myReport!=NULL);
371     now.setnow();
372     myReport->info.ts.startTime.tv_sec = now.getSecs();
373     myReport->info.ts.startTime.tv_usec = now.getUsecs();
374     myReport->info.ts.IPGstart = myReport->info.ts.startTime;
375     myReport->info.ts.prevpacketTime = myReport->info.ts.startTime;
376     if (!TimeZero(myReport->info.ts.intervalTime)) {
377 	myReport->info.ts.nextTime = myReport->info.ts.startTime;
378 	TimeAdd(myReport->info.ts.nextTime, myReport->info.ts.intervalTime);
379 #ifdef HAVE_STRUCT_TCP_INFO_TCPI_TOTAL_RETRANS
380 	myReport->info.ts.nextTCPStampleTime = myReport->info.ts.nextTime;
381 #endif
382     }
383     if (myReport->GroupSumReport) {
384 	struct TransferInfo *sumstats = &myReport->GroupSumReport->info;
385 	assert(sumstats != NULL);
386 	Mutex_Lock(&myReport->GroupSumReport->reference.lock);
387 	if (TimeZero(sumstats->ts.startTime)) {
388 	    sumstats->ts.startTime = myReport->info.ts.startTime;
389 	    if (isModeTime(mSettings)) {
390 		sumstats->ts.nextTime = myReport->info.ts.nextTime;
391 	    }
392 #ifdef HAVE_THREAD_DEBUG
393 	    thread_debug("Client group sum report start=%ld.%ld next=%ld.%ld", sumstats->ts.startTime.tv_sec, sumstats->ts.startTime.tv_usec, sumstats->ts.nextTime.tv_sec, sumstats->ts.nextTime.tv_usec);
394 #endif
395 	}
396 	Mutex_Unlock(&myReport->GroupSumReport->reference.lock);
397     }
398 #ifdef HAVE_THREAD_DEBUG
399     thread_debug("Client(%d) report start/ipg=%ld.%ld next=%ld.%ld", mSettings->mSock, myReport->info.ts.startTime.tv_sec, myReport->info.ts.startTime.tv_usec, myReport->info.ts.nextTime.tv_sec, myReport->info.ts.nextTime.tv_usec);
400 #endif
401 }
402 
ConnectPeriodic()403 void Client::ConnectPeriodic () {
404     Timestamp end;
405     Timestamp next;
406     unsigned int amount_usec = 1000000;
407     if (isModeTime(mSettings)) {
408 	amount_usec = (mSettings->mAmount * 10000);
409 	end.add(amount_usec); // add in micro seconds
410     }
411     setNoConnectSync(mSettings);
412     int num_connects = -1;
413     if (!(mSettings->mInterval > 0)) {
414 	if (mSettings->connectonly_count < 0)
415 	    num_connects = 10;
416 	else if (mSettings->connectonly_count > 0)
417 	    num_connects = mSettings->connectonly_count;
418     }
419 
420     do {
421 	if (my_connect(false)){
422 	    int rc = close(mySocket);
423 	    WARN_errno(rc == SOCKET_ERROR, "client close");
424 	    mySocket = INVALID_SOCKET;
425 	}
426 	if (mSettings->mInterval > 0) {
427 	    now.setnow();
428 	    do {
429 		next.add(mSettings->mInterval);
430 	    } while (next.before(now));
431 	    if (next.before(end)) {
432 		struct timeval tmp;
433 		tmp.tv_sec = next.getSecs();
434 		tmp.tv_usec = next.getUsecs();
435 		clock_usleep_abstime(&tmp);
436 	    }
437 	}
438 	if (num_connects > 0) {
439 	    --num_connects;
440 	}
441     } while (num_connects && !sInterupted && (next.before(end) || (isModeTime(mSettings) && !(mSettings->mInterval > 0))));
442 }
443 /* -------------------------------------------------------------------
444  * Common traffic loop intializations
445  * ------------------------------------------------------------------- */
InitTrafficLoop()446 void Client::InitTrafficLoop () {
447     //  Enable socket write timeouts for responsive reporting
448     //  Do this after the connection establishment
449     //  and after Client::InitiateServer as during these
450     //  default socket timeouts are preferred.
451     int sosndtimer = 0;
452     // sosndtimer units microseconds
453     // mInterval units are microseconds, mAmount units is 10 ms
454     // SetSocketOptionsSendTimeout takes microseconds
455     // Set the timeout value to 1/2 the interval (per -i) or 1/2 the -t value
456     if (isPeriodicBurst(mSettings) && (mSettings->mFPS > 0.0)) {
457 	sosndtimer = static_cast<int>(round(250000.0 / mSettings->mFPS));
458     } else if (mSettings->mInterval > 0) {
459 	sosndtimer = static_cast<int>(mSettings->mInterval / 2);
460     } else {
461 	sosndtimer = static_cast<int>((mSettings->mAmount * 10000) / 2);
462     }
463     SetSocketOptionsSendTimeout(mSettings, sosndtimer);
464     // set the lower bounds delay based of the socket timeout timer
465     // units needs to be in nanoseconds
466     delay_lower_bounds = static_cast<double>(sosndtimer) * -1e3;
467 
468     if (isIsochronous(mSettings))
469 	myReport->info.matchframeID = 1;
470 
471     // set the total bytes sent to zero
472     totLen = 0;
473     if (isModeTime(mSettings)) {
474         mEndTime.setnow();
475         mEndTime.add(mSettings->mAmount / 100.0);
476     }
477     readAt = mSettings->mBuf;
478     lastPacketTime.set(myReport->info.ts.startTime.tv_sec, myReport->info.ts.startTime.tv_usec);
479     reportstruct->errwrite=WriteNoErr;
480     reportstruct->emptyreport=0;
481     reportstruct->packetLen = 0;
482     // Finally, post this thread's "job report" which the reporter thread
483     // will continuously process as long as there are packets flowing
484     // right now the ring is empty
485     if (!isReverse(mSettings) && !isSingleUDP(mSettings) && isDataReport(mSettings)) {
486         assert(myJob!=NULL);
487         assert(myReport!=NULL);
488         PostReport(myJob);
489     }
490     one_report = (!isUDP(mSettings) && !isEnhanced(mSettings) && (mSettings->mIntervalMode != kInterval_Time) \
491 		   && !isIsochronous(mSettings) && !isPeriodicBurst(mSettings) && !isTripTime(mSettings) && !isReverse(mSettings));
492 }
493 
494 /* -------------------------------------------------------------------
495  * Run the appropriate send loop between
496  *
497  * 1) TCP without rate limiting
498  * 2) TCP with rate limiting
499  * 3) UDP
500  * 4) UDP isochronous w/vbr
501  *
502  * ------------------------------------------------------------------- */
Run()503 void Client::Run () {
504     // Initialize the report struct scratch pad
505     // Peform common traffic setup
506     InitTrafficLoop();
507     /*
508      * UDP
509      */
510     if (isUDP(mSettings)) {
511 	if (isFileInput(mSettings)) {
512 	    // Due to the UDP timestamps etc, included
513 	    // reduce the read size by an amount
514 	    // equal to the header size
515 	    Extractor_reduceReadSize(sizeof(struct UDP_datagram), mSettings);
516 	    readAt += sizeof(struct UDP_datagram);
517 	}
518 	// Launch the approprate UDP traffic loop
519 	if (isIsochronous(mSettings)) {
520 	    RunUDPIsochronous();
521 	} else {
522 	    RunUDP();
523 	}
524     } else {
525 	// Launch the approprate TCP traffic loop
526 	if (mSettings->mAppRate > 0) {
527 	    RunRateLimitedTCP();
528 	} else if (isNearCongest(mSettings)) {
529 	    RunNearCongestionTCP();
530 #if HAVE_DECL_TCP_NOTSENT_LOWAT
531 	} else if (isWritePrefetch(mSettings) && \
532 		   !isIsochronous(mSettings) && !isPeriodicBurst(mSettings)) {
533 	    RunWriteEventsTCP();
534 #endif
535 	} else {
536 	    RunTCP();
537 	}
538     }
539 }
540 
541 /*
542  * TCP send loop
543  */
RunTCP()544 void Client::RunTCP () {
545     int burst_remaining = 0;
546     int burst_id = 1;
547     int writelen = mSettings->mBufLen;
548     now.setnow();
549     reportstruct->packetTime.tv_sec = now.getSecs();
550     reportstruct->packetTime.tv_usec = now.getUsecs();
551     while (InProgress()) {
552         if (isModeAmount(mSettings)) {
553 	    writelen = ((mSettings->mAmount < static_cast<unsigned>(mSettings->mBufLen)) ? mSettings->mAmount : mSettings->mBufLen);
554 	}
555 	if (isburst && !(burst_remaining > 0)) {
556 	    if (isIsochronous(mSettings)) {
557 		assert(mSettings->mMean);
558 		burst_remaining = static_cast<int>(lognormal(mSettings->mMean,mSettings->mVariance)) / (mSettings->mFPS * 8);
559 	    } else if (isPeriodicBurst(mSettings)){
560 		assert(mSettings->mBurstSize);
561 		burst_remaining = mSettings->mBurstSize;
562 	    } else {
563 		burst_remaining = mSettings->mBufLen;
564 	    }
565 	    // check for TCP minimum payload
566 	    if (burst_remaining < static_cast<int>(sizeof(struct TCP_burst_payload)))
567 		burst_remaining = static_cast<int>(sizeof(struct TCP_burst_payload));
568 	    // apply scheduling if needed
569 	    if (framecounter) {
570 		burst_id = framecounter->wait_tick();
571 		if (isPeriodicBurst(mSettings)) {
572 		    // low duty cycle traffic needs special event handling
573 		    now.setnow();
574 		    reportstruct->packetTime.tv_sec = now.getSecs();
575 		    reportstruct->packetTime.tv_usec = now.getUsecs();
576 		    if (!InProgress()) {
577 			reportstruct->packetLen = 0;
578 			reportstruct->emptyreport = 1;
579 			// wait may have crossed the termination boundry
580 			break;
581 		    } else {
582 			//time interval crossings may have occurred during the wait
583 			//post a null event to cause the report to flush the packet ring
584 			PostNullEvent();
585 		    }
586 		}
587 #if HAVE_DECL_TCP_NOTSENT_LOWAT
588 		if (isWritePrefetch(mSettings)) {
589 		    AwaitWriteSelectEventTCP();
590 		}
591 #endif
592 	    }
593 	    now.setnow();
594 	    reportstruct->packetTime.tv_sec = now.getSecs();
595 	    reportstruct->packetTime.tv_usec = now.getUsecs();
596 	    WriteTcpTxHdr(reportstruct, burst_remaining, burst_id++);
597 	    reportstruct->sentTime = reportstruct->packetTime;
598 	    myReport->info.ts.prevsendTime = reportstruct->packetTime;
599 	    writelen = (mSettings->mBufLen > burst_remaining) ? burst_remaining : mSettings->mBufLen;
600 	    // perform write, full header must succeed
601 	    reportstruct->packetLen = writen(mySocket, mSettings->mBuf, writelen);
602 	    FAIL_errno(reportstruct->packetLen < (intmax_t) sizeof(struct TCP_burst_payload), "burst written", mSettings);
603 	} else {
604 	    // printf("pl=%ld\n",reportstruct->packetLen);
605 	    // perform write
606 	    if (isburst)
607 		writelen = (mSettings->mBufLen > burst_remaining) ? burst_remaining : mSettings->mBufLen;
608 #if HAVE_DECL_TCP_NOTSENT_LOWAT
609 	    if (isWritePrefetch(mSettings)) {
610 		AwaitWriteSelectEventTCP();
611 	    }
612 #endif
613 	    reportstruct->packetLen = write(mySocket, mSettings->mBuf, writelen);
614 	    now.setnow();
615 	    reportstruct->packetTime.tv_sec = now.getSecs();
616 	    reportstruct->packetTime.tv_usec = now.getUsecs();
617 	    reportstruct->sentTime = reportstruct->packetTime;
618 	}
619 	if (reportstruct->packetLen <= 0) {
620 	    if (reportstruct->packetLen == 0) {
621 		peerclose = true;
622 	    } else if (NONFATALTCPWRITERR(errno)) {
623 		reportstruct->errwrite=WriteErrAccount;
624 	    } else if (FATALTCPWRITERR(errno)) {
625 		reportstruct->errwrite=WriteErrFatal;
626 		WARN_errno(1, "tcp write");
627 		break;
628 	    } else {
629 		reportstruct->errwrite=WriteErrNoAccount;
630 	    }
631 	    reportstruct->packetLen = 0;
632 	    reportstruct->emptyreport = 1;
633 	} else {
634 	    reportstruct->emptyreport = 0;
635 	    totLen += reportstruct->packetLen;
636 	    reportstruct->errwrite=WriteNoErr;
637 	    if (isburst) {
638 		burst_remaining -= reportstruct->packetLen;
639 		if (burst_remaining > 0) {
640 		    reportstruct->transit_ready = 0;
641 		} else {
642 		    reportstruct->transit_ready = 1;
643 		}
644 	    }
645 	}
646 	if (isModeAmount(mSettings) && !reportstruct->emptyreport) {
647 	    /* mAmount may be unsigned, so don't let it underflow! */
648 	    if (mSettings->mAmount >= static_cast<unsigned long>(reportstruct->packetLen)) {
649 		mSettings->mAmount -= static_cast<unsigned long>(reportstruct->packetLen);
650 	    } else {
651 		mSettings->mAmount = 0;
652 	    }
653 	}
654 	if (!one_report) {
655 	    myReportPacket();
656 	}
657     }
658     FinishTrafficActions();
659 }
660 
661 /*
662  * TCP send loop
663  */
RunNearCongestionTCP()664 void Client::RunNearCongestionTCP () {
665     int burst_remaining = 0;
666     int burst_id = 1;
667     now.setnow();
668     reportstruct->packetTime.tv_sec = now.getSecs();
669     reportstruct->packetTime.tv_usec = now.getUsecs();
670     while (InProgress()) {
671         if (isModeAmount(mSettings)) {
672 	    reportstruct->packetLen = ((mSettings->mAmount < static_cast<unsigned>(mSettings->mBufLen)) ? mSettings->mAmount : mSettings->mBufLen);
673 	} else {
674 	    reportstruct->packetLen = mSettings->mBufLen;
675 	}
676 	if (!burst_remaining) {
677 	    burst_remaining = mSettings->mBufLen;
678 	    // mAmount check
679 	    now.setnow();
680 	    reportstruct->packetTime.tv_sec = now.getSecs();
681 	    reportstruct->packetTime.tv_usec = now.getUsecs();
682 	    WriteTcpTxHdr(reportstruct, burst_remaining, burst_id++);
683 	    reportstruct->sentTime = reportstruct->packetTime;
684 	    myReport->info.ts.prevsendTime = reportstruct->packetTime;
685 	    // perform write
686 	    int writelen = (mSettings->mBufLen > burst_remaining) ? burst_remaining : mSettings->mBufLen;
687 	    reportstruct->packetLen = write(mySocket, mSettings->mBuf, writelen);
688 	    assert(reportstruct->packetLen >= (intmax_t) sizeof(struct TCP_burst_payload));
689 	    goto ReportNow;
690 	}
691 	if (reportstruct->packetLen > burst_remaining) {
692 	    reportstruct->packetLen = burst_remaining;
693 	}
694 	// printf("pl=%ld\n",reportstruct->packetLen);
695 	// perform write
696 	reportstruct->packetLen = write(mySocket, mSettings->mBuf, reportstruct->packetLen);
697 	now.setnow();
698 	reportstruct->packetTime.tv_sec = now.getSecs();
699 	reportstruct->packetTime.tv_usec = now.getUsecs();
700 	reportstruct->sentTime = reportstruct->packetTime;
701       ReportNow:
702 	reportstruct->transit_ready = 0;
703 	if (reportstruct->packetLen < 0) {
704 	    if (NONFATALTCPWRITERR(errno)) {
705 		reportstruct->errwrite=WriteErrAccount;
706 	    } else if (FATALTCPWRITERR(errno)) {
707 		reportstruct->errwrite=WriteErrFatal;
708 		WARN_errno(1, "tcp write");
709 		break;
710 	    } else {
711 		reportstruct->errwrite=WriteErrNoAccount;
712 	    }
713 	    reportstruct->packetLen = 0;
714 	    reportstruct->emptyreport = 1;
715 	} else {
716 	    reportstruct->emptyreport = 0;
717 	    totLen += reportstruct->packetLen;
718 	    reportstruct->errwrite=WriteNoErr;
719 	    burst_remaining -= reportstruct->packetLen;
720 	    if (burst_remaining <= 0) {
721 		reportstruct->transit_ready = 1;
722 	    }
723 	}
724 	if (isModeAmount(mSettings) && !reportstruct->emptyreport) {
725 	    /* mAmount may be unsigned, so don't let it underflow! */
726 	    if (mSettings->mAmount >= static_cast<unsigned long>(reportstruct->packetLen)) {
727 		mSettings->mAmount -= static_cast<unsigned long>(reportstruct->packetLen);
728 	    } else {
729 		mSettings->mAmount = 0;
730 	    }
731 	}
732 #ifdef HAVE_STRUCT_TCP_INFO_TCPI_TOTAL_RETRANS
733 	// apply placing after write burst completes
734 	if (reportstruct->transit_ready && myReportPacket(true)) {
735 	    int pacing_timer = static_cast<int>(std::ceil(static_cast<double>(my_tcpi_stats.tcpi_rtt) * mSettings->rtt_nearcongest_divider));
736 //		printf("**** delaytime = %d\n", delaytime);
737 	    delay_loop(pacing_timer);
738 	} else
739 #endif
740         {
741 	   myReportPacket();
742         }
743     }
744     FinishTrafficActions();
745 }
746 
747 /*
748  * A version of the transmit loop that supports TCP rate limiting using a token bucket
749  */
RunRateLimitedTCP()750 void Client::RunRateLimitedTCP () {
751     double tokens = 0;
752     Timestamp time1, time2;
753     int burst_size = mSettings->mBufLen;
754     int burst_remaining = 0;
755     int burst_id = 1;
756 
757     long var_rate = mSettings->mAppRate;
758     int fatalwrite_err = 0;
759 
760     now.setnow();
761     reportstruct->packetTime.tv_sec = now.getSecs();
762     reportstruct->packetTime.tv_usec = now.getUsecs();
763     while (InProgress() && !fatalwrite_err) {
764 	// Add tokens per the loop time
765 	time2.setnow();
766         if (isVaryLoad(mSettings)) {
767 	    static Timestamp time3;
768 	    if (time2.subSec(time3) >= VARYLOAD_PERIOD) {
769 		var_rate = lognormal(mSettings->mAppRate,mSettings->mVariance);
770 		time3 = time2;
771 		if (var_rate < 0)
772 		    var_rate = 0;
773 	    }
774 	}
775 	tokens += time2.subSec(time1) * (var_rate / 8.0);
776 	time1 = time2;
777 	if (tokens >= 0.0) {
778 	    if (isModeAmount(mSettings)) {
779 	        reportstruct->packetLen = ((mSettings->mAmount < static_cast<unsigned>(mSettings->mBufLen)) ? mSettings->mAmount : mSettings->mBufLen);
780 	    } else {
781 	        reportstruct->packetLen = mSettings->mBufLen;
782 	    }
783 	    // perform write
784 	    int n = 0;
785 	    if (isTripTime(mSettings)) {
786 		if (burst_remaining == 0) {
787 		    now.setnow();
788 		    reportstruct->packetTime.tv_sec = now.getSecs();
789 		    reportstruct->packetTime.tv_usec = now.getUsecs();
790 		    WriteTcpTxHdr(reportstruct, burst_size, burst_id++);
791 		    reportstruct->sentTime = reportstruct->packetTime;
792 		    burst_remaining = burst_size;
793 		    // perform write
794 		    n = writen(mySocket, mSettings->mBuf, sizeof(struct TCP_burst_payload));
795 		    WARN(n != sizeof(struct TCP_burst_payload), "burst hdr write failed");
796 		    burst_remaining -= n;
797 		    reportstruct->packetLen -= n;
798 		    // thread_debug("***write burst header %d id=%d", burst_size, (burst_id - 1));
799 		} else if (reportstruct->packetLen > burst_remaining) {
800 		    reportstruct->packetLen = burst_remaining;
801 		}
802 	    }
803 	    int len = write(mySocket, mSettings->mBuf, reportstruct->packetLen);
804 	    if (len < 0) {
805 	        if (NONFATALTCPWRITERR(errno)) {
806 		    reportstruct->errwrite=WriteErrAccount;
807 		} else if (FATALTCPWRITERR(errno)) {
808 		    reportstruct->errwrite=WriteErrFatal;
809 		    WARN_errno(1, "write");
810 		    fatalwrite_err = 1;
811 		    break;
812 		} else {
813 		    reportstruct->errwrite=WriteErrNoAccount;
814 	        }
815 		len = 0;
816 	    } else {
817 		// Consume tokens per the transmit
818 	        tokens -= (len + n);
819 	        totLen += (len + n);;
820 		reportstruct->errwrite=WriteNoErr;
821 	    }
822 	    if (isTripTime(mSettings))
823 		burst_remaining -= len;
824 
825 	    time2.setnow();
826 	    reportstruct->packetLen = len + n;
827 	    reportstruct->packetTime.tv_sec = time2.getSecs();
828 	    reportstruct->packetTime.tv_usec = time2.getUsecs();
829 	    reportstruct->sentTime = reportstruct->packetTime;
830 	    if (isModeAmount(mSettings)) {
831 		/* mAmount may be unsigned, so don't let it underflow! */
832 		if (mSettings->mAmount >= static_cast<unsigned long>(reportstruct->packetLen)) {
833 		    mSettings->mAmount -= static_cast<unsigned long>(reportstruct->packetLen);
834 		} else {
835 		    mSettings->mAmount = 0;
836 		}
837 	    }
838 	    if (!one_report) {
839 		myReportPacket();
840 	    }
841         } else {
842 	    // Use a 4 usec delay to fill tokens
843 	    delay_loop(4);
844 	}
845     }
846     FinishTrafficActions();
847 }
848 
849 #if HAVE_DECL_TCP_NOTSENT_LOWAT
AwaitWriteSelectEventTCP(void)850 inline bool Client::AwaitWriteSelectEventTCP (void) {
851     int rc;
852     struct timeval timeout;
853     fd_set writeset;
854     FD_ZERO(&writeset);
855     FD_SET(mySocket, &writeset);
856     if (isModeTime(mSettings)) {
857         Timestamp write_event_timeout(0,0);
858 	if (mSettings->mInterval && (mSettings->mIntervalMode == kInterval_Time)) {
859 	    write_event_timeout.add((double) mSettings->mInterval / 1e6 * 2.0);
860 	} else {
861 	    write_event_timeout.add((double) mSettings->mAmount / 1e2 * 4.0);
862 	}
863 	timeout.tv_sec = write_event_timeout.getSecs();
864         timeout.tv_usec = write_event_timeout.getUsecs();
865     } else {
866 	timeout.tv_sec = 10; // longest is 10 seconds
867 	timeout.tv_usec = 0;
868     }
869 
870     Timestamp t1;
871     if ((rc = select(mySocket + 1, NULL, &writeset, NULL, &timeout)) <= 0) {
872         reportstruct->select_delay = -1;
873 	WARN_errno((rc < 0), "select");
874 #ifdef HAVE_THREAD_DEBUG
875 	if (rc == 0)
876 	    thread_debug("AwaitWrite timeout");
877 #endif
878 	return false;
879     }
880     Timestamp t2;
881     reportstruct->select_delay = t2.subSec(t1);
882     //    printf("*****t1 = %f\n", t2.subSec(t1));
883     return true;
884 }
885 
RunWriteEventsTCP()886 void Client::RunWriteEventsTCP () {
887     int burst_id = 0;
888     int writelen = mSettings->mBufLen;
889 
890     now.setnow();
891     reportstruct->packetTime.tv_sec = now.getSecs();
892     reportstruct->packetTime.tv_usec = now.getUsecs();
893     while (InProgress()) {
894         if (isModeAmount(mSettings)) {
895 	    writelen = ((mSettings->mAmount < static_cast<unsigned>(mSettings->mBufLen)) ? mSettings->mAmount : mSettings->mBufLen);
896 	}
897 	now.setnow();
898 	bool rc = AwaitWriteSelectEventTCP();
899 	reportstruct->emptyreport = (rc == false) ? 1 : 0;
900         if (rc) {
901 	    now.setnow();
902 	    reportstruct->packetTime.tv_sec = now.getSecs();
903 	    reportstruct->packetTime.tv_usec = now.getUsecs();
904 	    WriteTcpTxHdr(reportstruct, writelen, ++burst_id);
905 	    reportstruct->sentTime = reportstruct->packetTime;
906 	    reportstruct->packetLen = writen(mySocket, mSettings->mBuf, writelen);
907 	    if (reportstruct->packetLen <= 0) {
908 		WARN_errno((reportstruct->packetLen < 0), "event writen()");
909 		if (reportstruct->packetLen == 0) {
910 		    peerclose = true;
911 		}
912 		reportstruct->packetLen = 0;
913 		reportstruct->emptyreport = 1;
914 	    }
915 	}
916 	if (isModeAmount(mSettings) && !reportstruct->emptyreport) {
917 	    /* mAmount may be unsigned, so don't let it underflow! */
918 	    if (mSettings->mAmount >= static_cast<unsigned long>(reportstruct->packetLen)) {
919 		mSettings->mAmount -= static_cast<unsigned long>(reportstruct->packetLen);
920 	    } else {
921 		mSettings->mAmount = 0;
922 	    }
923 	}
924 	if (!one_report) {
925 	    myReportPacket();
926 	}
927     }
928     FinishTrafficActions();
929 }
930 #endif
931 /*
932  * UDP send loop
933  */
get_delay_target()934 double Client::get_delay_target () {
935     double delay_target;
936     if (isIPG(mSettings)) {
937 	delay_target = mSettings->mBurstIPG * 1000000;  // convert from milliseconds to nanoseconds
938     } else {
939 	// compute delay target in units of nanoseconds
940 	if (mSettings->mAppRateUnits == kRate_BW) {
941 	    // compute delay for bandwidth restriction, constrained to [0,1] seconds
942 	    delay_target = (mSettings->mBufLen * ((kSecs_to_nsecs * kBytes_to_Bits)
943 							   / mSettings->mAppRate));
944 	} else {
945 	    delay_target = 1e9 / mSettings->mAppRate;
946 	}
947     }
948     return delay_target;
949 }
950 
RunUDP()951 void Client::RunUDP () {
952     struct UDP_datagram* mBuf_UDP = reinterpret_cast<struct UDP_datagram*>(mSettings->mBuf);
953     int currLen;
954 
955     double delay_target = get_delay_target();
956     double delay = 0;
957     double adjust = 0;
958 
959     // Set this to > 0 so first loop iteration will delay the IPG
960     currLen = 1;
961     double variance = mSettings->mVariance;
962     if (apply_first_udppkt_delay && (delay_target > 100000)) {
963 	//the case when a UDP first packet went out in SendFirstPayload
964 	delay_loop(static_cast<unsigned long>(delay_target / 1000));
965     }
966 
967     while (InProgress()) {
968         // Test case: drop 17 packets and send 2 out-of-order:
969         // sequence 51, 52, 70, 53, 54, 71, 72
970         //switch(datagramID) {
971         //  case 53: datagramID = 70; break;
972         //  case 71: datagramID = 53; break;
973         //  case 55: datagramID = 71; break;
974         //  default: break;
975         //}
976 	now.setnow();
977 	reportstruct->packetTime.tv_sec = now.getSecs();
978 	reportstruct->packetTime.tv_usec = now.getUsecs();
979 	reportstruct->sentTime = reportstruct->packetTime;
980         if (isVaryLoad(mSettings) && mSettings->mAppRateUnits == kRate_BW) {
981 	    static Timestamp time3;
982 	    if (now.subSec(time3) >= VARYLOAD_PERIOD) {
983 		long var_rate = lognormal(mSettings->mAppRate,variance);
984 		if (var_rate < 0)
985 		    var_rate = 0;
986 		delay_target = (mSettings->mBufLen * ((kSecs_to_nsecs * kBytes_to_Bits) / var_rate));
987 		time3 = now;
988 	    }
989 	}
990 	// store datagram ID into buffer
991 	WritePacketID(reportstruct->packetID);
992 	mBuf_UDP->tv_sec  = htonl(reportstruct->packetTime.tv_sec);
993 	mBuf_UDP->tv_usec = htonl(reportstruct->packetTime.tv_usec);
994 
995 	// Adjustment for the running delay
996 	// o measure how long the last loop iteration took
997 	// o calculate the delay adjust
998 	//   - If write succeeded, adjust = target IPG - the loop time
999 	//   - If write failed, adjust = the loop time
1000 	// o then adjust the overall running delay
1001 	// Note: adjust units are nanoseconds,
1002 	//       packet timestamps are microseconds
1003 	if (currLen > 0)
1004 	    adjust = delay_target + \
1005 		(1000.0 * lastPacketTime.subUsec(reportstruct->packetTime));
1006 	else
1007 	    adjust = 1000.0 * lastPacketTime.subUsec(reportstruct->packetTime);
1008 
1009 	lastPacketTime.set(reportstruct->packetTime.tv_sec, reportstruct->packetTime.tv_usec);
1010 	// Since linux nanosleep/busyloop can exceed delay
1011 	// there are two possible equilibriums
1012 	//  1)  Try to perserve inter packet gap
1013 	//  2)  Try to perserve requested transmit rate
1014 	// The latter seems preferred, hence use a running delay
1015 	// that spans the life of the thread and constantly adjust.
1016 	// A negative delay means the iperf app is behind.
1017 	delay += adjust;
1018 	// Don't let delay grow unbounded
1019 	if (delay < delay_lower_bounds) {
1020 	    delay = delay_target;
1021 	}
1022 
1023 	reportstruct->errwrite = WriteNoErr;
1024 	reportstruct->emptyreport = 0;
1025 	// perform write
1026 	if (isModeAmount(mSettings)) {
1027 	    currLen = write(mySocket, mSettings->mBuf, (mSettings->mAmount < static_cast<unsigned>(mSettings->mBufLen)) ? mSettings->mAmount : mSettings->mBufLen);
1028 	} else {
1029 	    currLen = write(mySocket, mSettings->mBuf, mSettings->mBufLen);
1030 	}
1031 	if (currLen < 0) {
1032 	    reportstruct->packetID--;
1033 	    if (FATALUDPWRITERR(errno)) {
1034 	        reportstruct->errwrite = WriteErrFatal;
1035 	        WARN_errno(1, "write");
1036 		break;
1037 	    } else {
1038 	        reportstruct->errwrite = WriteErrAccount;
1039 	        currLen = 0;
1040 	    }
1041 	    reportstruct->emptyreport = 1;
1042 	}
1043 
1044 	if (isModeAmount(mSettings)) {
1045 	    /* mAmount may be unsigned, so don't let it underflow! */
1046 	    if (mSettings->mAmount >= static_cast<unsigned long>(currLen)) {
1047 	        mSettings->mAmount -= static_cast<unsigned long>(currLen);
1048 	    } else {
1049 	        mSettings->mAmount = 0;
1050 	    }
1051 	}
1052 
1053 	// report packets
1054 	reportstruct->packetLen = static_cast<unsigned long>(currLen);
1055 	reportstruct->prevPacketTime = myReport->info.ts.prevpacketTime;
1056 	myReportPacket();
1057 	reportstruct->packetID++;
1058 	myReport->info.ts.prevpacketTime = reportstruct->packetTime;
1059 	// Insert delay here only if the running delay is greater than 100 usec,
1060 	// otherwise don't delay and immediately continue with the next tx.
1061 	if (delay >= 100000) {
1062 	    // Convert from nanoseconds to microseconds
1063 	    // and invoke the microsecond delay
1064 	    delay_loop(static_cast<unsigned long>(delay / 1000));
1065 	}
1066     }
1067     FinishTrafficActions();
1068 }
1069 
1070 /*
1071  * UDP isochronous send loop
1072  */
RunUDPIsochronous()1073 void Client::RunUDPIsochronous () {
1074     struct UDP_datagram* mBuf_UDP = reinterpret_cast<struct UDP_datagram*>(mSettings->mBuf);
1075     // skip over the UDP datagram (seq no, timestamp) to reach the isoch fields
1076     struct client_udp_testhdr *udp_payload = reinterpret_cast<client_udp_testhdr *>(mSettings->mBuf);
1077 
1078     double delay_target = mSettings->mBurstIPG * 1000000;  // convert from milliseconds to nanoseconds
1079     double delay = 0;
1080     double adjust = 0;
1081     int currLen = 1;
1082     int frameid=0;
1083     Timestamp t1;
1084 
1085     // make sure the packet can carry the isoch payload
1086     if (!framecounter) {
1087 	framecounter = new Isochronous::FrameCounter(mSettings->mFPS);
1088     }
1089     udp_payload->isoch.burstperiod = htonl(framecounter->period_us());
1090 
1091     int initdone = 0;
1092     int fatalwrite_err = 0;
1093     while (InProgress() && !fatalwrite_err) {
1094 	int bytecnt = static_cast<int>(lognormal(mSettings->mMean,mSettings->mVariance)) / (mSettings->mFPS * 8);
1095 	if (bytecnt < udp_payload_minimum)
1096 	    bytecnt = udp_payload_minimum;
1097 	delay = 0;
1098 
1099 	// printf("bits=%d\n", (int) (mSettings->mFPS * bytecnt * 8));
1100 	udp_payload->isoch.burstsize  = htonl(bytecnt);
1101 	udp_payload->isoch.prevframeid  = htonl(frameid);
1102 	reportstruct->burstsize=bytecnt;
1103 	frameid =  framecounter->wait_tick();
1104 	udp_payload->isoch.frameid  = htonl(frameid);
1105 	lastPacketTime.setnow();
1106 	if (!initdone) {
1107 	    initdone = 1;
1108 	    udp_payload->isoch.start_tv_sec = htonl(framecounter->getSecs());
1109 	    udp_payload->isoch.start_tv_usec = htonl(framecounter->getUsecs());
1110 	}
1111 	while ((bytecnt > 0) && InProgress()) {
1112 	    t1.setnow();
1113 	    reportstruct->packetTime.tv_sec = t1.getSecs();
1114 	    reportstruct->packetTime.tv_usec = t1.getUsecs();
1115 	    reportstruct->sentTime = reportstruct->packetTime;
1116 	    mBuf_UDP->tv_sec  = htonl(reportstruct->packetTime.tv_sec);
1117 	    mBuf_UDP->tv_usec = htonl(reportstruct->packetTime.tv_usec);
1118 	    WritePacketID(reportstruct->packetID);
1119 
1120 	    // Adjustment for the running delay
1121 	    // o measure how long the last loop iteration took
1122 	    // o calculate the delay adjust
1123 	    //   - If write succeeded, adjust = target IPG - the loop time
1124 	    //   - If write failed, adjust = the loop time
1125 	    // o then adjust the overall running delay
1126 	    // Note: adjust units are nanoseconds,
1127 	    //       packet timestamps are microseconds
1128 	    if (currLen > 0)
1129 		adjust = delay_target + \
1130 		    (1000.0 * lastPacketTime.subUsec(reportstruct->packetTime));
1131 	    else
1132 		adjust = 1000.0 * lastPacketTime.subUsec(reportstruct->packetTime);
1133 
1134 	    lastPacketTime.set(reportstruct->packetTime.tv_sec, reportstruct->packetTime.tv_usec);
1135 	    // Since linux nanosleep/busyloop can exceed delay
1136 	    // there are two possible equilibriums
1137 	    //  1)  Try to perserve inter packet gap
1138 	    //  2)  Try to perserve requested transmit rate
1139 	    // The latter seems preferred, hence use a running delay
1140 	    // that spans the life of the thread and constantly adjust.
1141 	    // A negative delay means the iperf app is behind.
1142 	    delay += adjust;
1143 	    // Don't let delay grow unbounded
1144 	    // if (delay < delay_lower_bounds) {
1145 	    //	  delay = delay_target;
1146 	    // }
1147 
1148 	    reportstruct->errwrite = WriteNoErr;
1149 	    reportstruct->emptyreport = 0;
1150 
1151 	    // perform write
1152 	    if (isModeAmount(mSettings) && (mSettings->mAmount < static_cast<unsigned>(mSettings->mBufLen))) {
1153 	        udp_payload->isoch.remaining = htonl(mSettings->mAmount);
1154 		reportstruct->remaining=mSettings->mAmount;
1155 	        currLen = write(mySocket, mSettings->mBuf, mSettings->mAmount);
1156 	    } else {
1157 	        udp_payload->isoch.remaining = htonl(bytecnt);
1158 		reportstruct->remaining=bytecnt;
1159 	        currLen = write(mySocket, mSettings->mBuf, (bytecnt < mSettings->mBufLen) ? bytecnt : mSettings->mBufLen);
1160 	    }
1161 
1162 	    if (currLen < 0) {
1163 	        reportstruct->packetID--;
1164 		reportstruct->emptyreport = 1;
1165 		currLen = 0;
1166 		if (FATALUDPWRITERR(errno)) {
1167 	            reportstruct->errwrite = WriteErrFatal;
1168 	            WARN_errno(1, "write");
1169 		    fatalwrite_err = 1;
1170 	        } else {
1171 		    reportstruct->errwrite = WriteErrAccount;
1172 		}
1173 	    } else {
1174 		bytecnt -= currLen;
1175 		if (!bytecnt)
1176 		    reportstruct->transit_ready = 1;
1177 		else
1178 		    reportstruct->transit_ready = 0;
1179 		// adjust bytecnt so last packet of burst is greater or equal to min packet
1180 		if ((bytecnt > 0) && (bytecnt < udp_payload_minimum)) {
1181 		    bytecnt = udp_payload_minimum;
1182 		    udp_payload->isoch.burstsize  = htonl(bytecnt);
1183 		    reportstruct->burstsize=bytecnt;
1184 		}
1185 	    }
1186 	    if (isModeAmount(mSettings)) {
1187 	        /* mAmount may be unsigned, so don't let it underflow! */
1188 	        if (mSettings->mAmount >= static_cast<unsigned long>(currLen)) {
1189 		    mSettings->mAmount -= static_cast<unsigned long>(currLen);
1190 		} else {
1191 		    mSettings->mAmount = 0;
1192 		}
1193 	    }
1194 	    // report packets
1195 
1196 	    reportstruct->frameID=frameid;
1197 	    reportstruct->packetLen = static_cast<unsigned long>(currLen);
1198 	    reportstruct->prevPacketTime = myReport->info.ts.prevpacketTime;
1199 	    myReportPacket();
1200 	    reportstruct->packetID++;
1201 	    myReport->info.ts.prevpacketTime = reportstruct->packetTime;
1202 	    // Insert delay here only if the running delay is greater than 1 usec,
1203 	    // otherwise don't delay and immediately continue with the next tx.
1204 	    if (delay >= 1000) {
1205 		// Convert from nanoseconds to microseconds
1206 		// and invoke the microsecond delay
1207 		delay_loop(static_cast<unsigned long>(delay / 1000));
1208 	    }
1209 	}
1210     }
1211     FinishTrafficActions();
1212 }
1213 // end RunUDPIsoch
1214 
WritePacketID(intmax_t packetID)1215 inline void Client::WritePacketID (intmax_t packetID) {
1216     struct UDP_datagram * mBuf_UDP = reinterpret_cast<struct UDP_datagram *>(mSettings->mBuf);
1217     // store datagram ID into buffer
1218 #ifdef HAVE_INT64_T
1219     // Pack signed 64bit packetID into unsigned 32bit id1 + unsigned
1220     // 32bit id2.  A legacy server reading only id1 will still be able
1221     // to reconstruct a valid signed packet ID number up to 2^31.
1222     uint32_t id1, id2;
1223     id1 = packetID & 0xFFFFFFFFLL;
1224     id2 = (packetID  & 0xFFFFFFFF00000000LL) >> 32;
1225 
1226     mBuf_UDP->id = htonl(id1);
1227     mBuf_UDP->id2 = htonl(id2);
1228 
1229 #ifdef HAVE_PACKET_DEBUG
1230     printf("id %" PRIdMAX " (0x%" PRIxMAX ") -> 0x%x, 0x%x\n",
1231 	   packetID, packetID, id1, id2);
1232 #endif
1233 #else
1234     mBuf_UDP->id = htonl((reportstruct->packetID));
1235 #endif
1236 }
1237 
WriteTcpTxHdr(struct ReportStruct * reportstruct,int burst_size,int burst_id)1238 inline void Client::WriteTcpTxHdr (struct ReportStruct *reportstruct, int burst_size, int burst_id) {
1239     struct TCP_burst_payload * mBuf_burst = reinterpret_cast<struct TCP_burst_payload *>(mSettings->mBuf);
1240     // store packet ID into buffer
1241     reportstruct->packetID += burst_size;
1242     mBuf_burst->start_tv_sec = htonl(myReport->info.ts.startTime.tv_sec);
1243     mBuf_burst->start_tv_usec = htonl(myReport->info.ts.startTime.tv_usec);
1244 
1245 #ifdef HAVE_INT64_T
1246     // Pack signed 64bit packetID into unsigned 32bit id1 + unsigned
1247     // 32bit id2.  A legacy server reading only id1 will still be able
1248     // to reconstruct a valid signed packet ID number up to 2^31.
1249     uint32_t id1, id2;
1250     id1 = reportstruct->packetID & 0xFFFFFFFFLL;
1251     id2 = (reportstruct->packetID  & 0xFFFFFFFF00000000LL) >> 32;
1252 
1253     mBuf_burst->seqno_lower = htonl(id1);
1254     mBuf_burst->seqno_upper = htonl(id2);
1255 
1256 #ifdef HAVE_PACKET_DEBUG
1257     printf("id %" PRIdMAX " (0x%" PRIxMAX ") -> 0x%x, 0x%x\n",
1258 	   reportstruct->packetID, reportstruct->packetID, id1, id2);
1259 #endif
1260 #else
1261     mBuf_burst->seqno_lower = htonl((reportstruct->packetID));
1262     mBuf_burst->seqno_upper = htonl(0x0);
1263 #endif
1264     mBuf_burst->send_tt.write_tv_sec  = htonl(reportstruct->packetTime.tv_sec);
1265     mBuf_burst->send_tt.write_tv_usec  = htonl(reportstruct->packetTime.tv_usec);
1266     mBuf_burst->burst_id  = htonl((uint32_t)burst_id);
1267     mBuf_burst->burst_size  = htonl((uint32_t)burst_size);
1268     mBuf_burst->burst_period_s  = htonl(0x0);
1269     mBuf_burst->burst_period_us  = htonl(0x0);
1270     reportstruct->frameID=burst_id;
1271     reportstruct->burstsize=burst_size;
1272 //    printf("**** Write tcp burst header size= %d id = %d\n", burst_size, burst_id);
1273 }
1274 
InProgress()1275 inline bool Client::InProgress () {
1276     // Read the next data block from
1277     // the file if it's file input
1278     if (isFileInput(mSettings)) {
1279 	Extractor_getNextDataBlock(readAt, mSettings);
1280         return Extractor_canRead(mSettings) != 0;
1281     }
1282     return !(sInterupted || peerclose || \
1283 	(isModeTime(mSettings) && mEndTime.before(reportstruct->packetTime))  ||
1284 	(isModeAmount(mSettings) && (mSettings->mAmount <= 0)));
1285 }
1286 
1287 /*
1288  * Common things to do to finish a traffic thread
1289  *
1290  * Notes on the negative packet count or seq no:
1291  * A negative packet id is used to tell the server
1292  * this UDP stream is terminating.  The server will remove
1293  * the sign.  So a decrement will be seen as increments by
1294  * the server (e.g, -1000, -1001, -1002 as 1000, 1001, 1002)
1295  * If the retries weren't decrement here the server can get out
1296  * of order packets per these retries actually being received
1297  * by the server (e.g. -1000, -1000, -1000)
1298  */
FinishTrafficActions()1299 void Client::FinishTrafficActions () {
1300     disarm_itimer();
1301     // Shutdown the TCP socket's writes as the event for the server to end its traffic loop
1302     if (!isUDP(mSettings)) {
1303 	if ((mySocket != INVALID_SOCKET) && isConnected()) {
1304 #ifdef HAVE_STRUCT_TCP_INFO_TCPI_TOTAL_RETRANS
1305 	  // gettcpistats(myReport, true, NULL);
1306 #endif
1307 	    int rc = shutdown(mySocket, SHUT_WR);
1308 #ifdef HAVE_THREAD_DEBUG
1309 	    thread_debug("Client calls shutdown() SHUTW_WR on tcp socket %d", mySocket);
1310 #endif
1311 	    WARN_errno(rc == SOCKET_ERROR, "shutdown");
1312 	    if (!rc && !isFullDuplex(mSettings))
1313 		AwaitServerCloseEvent();
1314 	}
1315 	now.setnow();
1316 	reportstruct->packetTime.tv_sec = now.getSecs();
1317 	reportstruct->packetTime.tv_usec = now.getUsecs();
1318 	if (one_report) {
1319 	    /*
1320 	     *  For TCP and if not doing interval or enhanced reporting (needed for write accounting),
1321 	     *  then report the entire transfer as one big packet
1322 	     *
1323 	     */
1324 	    reportstruct->packetLen = totLen;
1325 	}
1326     } else {
1327 	// stop timing
1328 	now.setnow();
1329 	reportstruct->packetTime.tv_sec = now.getSecs();
1330 	reportstruct->packetTime.tv_usec = now.getUsecs();
1331 	reportstruct->sentTime = reportstruct->packetTime;
1332 	// send a final terminating datagram
1333 	// Don't count in the mTotalLen. The server counts this one,
1334 	// but didn't count our first datagram, so we're even now.
1335 	// The negative datagram ID signifies termination to the server.
1336 	WritePacketID(-reportstruct->packetID);
1337 	struct UDP_datagram * mBuf_UDP = reinterpret_cast<struct UDP_datagram *>(mSettings->mBuf);
1338 	mBuf_UDP->tv_sec = htonl(reportstruct->packetTime.tv_sec);
1339 	mBuf_UDP->tv_usec = htonl(reportstruct->packetTime.tv_usec);
1340 	int len = write(mySocket, mSettings->mBuf, mSettings->mBufLen);
1341 #ifdef HAVE_THREAD_DEBUG
1342 	thread_debug("UDP client sent final packet per negative seqno %ld", -reportstruct->packetID);
1343 #endif
1344 	if (len > 0) {
1345 	    reportstruct->packetLen = len;
1346 	    myReportPacket();
1347 	}
1348 	reportstruct->packetLen = 0;
1349     }
1350     int do_close = EndJob(myJob, reportstruct);
1351     if (isUDP(mSettings) && !isMulticast(mSettings) && !isNoUDPfin(mSettings)) {
1352 	/*
1353 	 *  For UDP, there is a final handshake between the client and the server,
1354 	 *  do that now (unless requested no to)
1355 	 */
1356 	AwaitServerFinPacket();
1357     }
1358     if (do_close) {
1359 #if HAVE_THREAD_DEBUG
1360 	thread_debug("client close sock=%d", mySocket);
1361 #endif
1362 	int rc = close(mySocket);
1363 	WARN_errno(rc == SOCKET_ERROR, "client close");
1364     }
1365     Iperf_remove_host(mSettings);
1366     FreeReport(myJob);
1367     if (framecounter)
1368 	DELETE_PTR(framecounter);
1369 }
1370 
1371 /* -------------------------------------------------------------------
1372  * Await for the server's fin packet which also has the server
1373  * stats to displayed on the client.  Attempt to re-transmit
1374  * until the fin is received
1375  * ------------------------------------------------------------------- */
1376 #define RETRYTIMER 10000 //units of us
1377 #define RETRYCOUNT (2 * 1000000 / RETRYTIMER) // 2 seconds worth of retries
AwaitServerFinPacket()1378 void Client::AwaitServerFinPacket () {
1379     int rc;
1380     fd_set readSet;
1381     struct timeval timeout;
1382     int ack_success = 0;
1383     int count = RETRYCOUNT;
1384     while (--count >= 0) {
1385         // wait until the socket is readable, or our timeout expires
1386         FD_ZERO(&readSet);
1387         FD_SET(mySocket, &readSet);
1388         timeout.tv_sec  = 0;
1389         timeout.tv_usec = RETRYTIMER;
1390         rc = select(mySocket+1, &readSet, NULL, NULL, &timeout);
1391         FAIL_errno(rc == SOCKET_ERROR, "select", mSettings);
1392         // rc= zero means select's read timed out
1393 	if (rc == 0) {
1394 	    // try to trigger another FIN by resending a negative seq no
1395 	    WritePacketID(-(++reportstruct->packetID));
1396 	    // write data
1397 	    rc = write(mySocket, mSettings->mBuf, mSettings->mBufLen);
1398 	    WARN_errno(rc < 0, "write-fin");
1399 #ifdef HAVE_THREAD_DEBUG
1400 	    thread_debug("UDP client retransmit final packet per negative seqno %ld", -reportstruct->packetID);
1401 #endif
1402 	} else {
1403             // socket ready to read, this packet size
1404 	    // is set by the server.  Assume it's large enough
1405 	    // to contain the final server packet
1406 	    rc = read(mySocket, mSettings->mBuf, MAXUDPBUF);
1407 
1408 	    // dump any 2.0.13 client acks sent at the start of traffic
1409 	    if (rc == sizeof(client_hdr_ack)) {
1410 		struct client_hdr_ack *ack =  reinterpret_cast<struct client_hdr_ack *>(mSettings->mBuf);
1411 		if (ntohl(ack->typelen.type) == CLIENTHDRACK) {
1412 		    // printf("**** dump stale ack \n");
1413 		    continue;
1414 		}
1415 	    }
1416 
1417 	    WARN_errno(rc < 0, "read");
1418 	    if (rc > 0) {
1419 		ack_success = 1;
1420 #ifdef HAVE_THREAD_DEBUG
1421 		thread_debug("UDP client received server relay report ack (%d)", -reportstruct->packetID);
1422 #endif
1423 		if (mSettings->mReportMode != kReport_CSV) {
1424 		    PostReport(InitServerRelayUDPReport(mSettings, reinterpret_cast<server_hdr*>(reinterpret_cast<UDP_datagram*>(mSettings->mBuf) + 1)));
1425 		}
1426 		break;
1427 	    }
1428         }
1429     }
1430     if ((!ack_success) && (mSettings->mReportMode != kReport_CSV))
1431 	fprintf(stderr, warn_no_ack, mySocket, (isModeTime(mSettings) ? 10 : 1));
1432 }
1433 
1434 
PostNullEvent()1435 void Client::PostNullEvent () {
1436     assert(myReport!=NULL);
1437     // push a nonevent into the packet ring
1438     // this will cause the reporter to process
1439     // up to this event
1440     memset(reportstruct, 0, sizeof(struct ReportStruct));
1441     now.setnow();
1442     reportstruct->packetTime.tv_sec = now.getSecs();
1443     reportstruct->packetTime.tv_usec = now.getUsecs();
1444     reportstruct->emptyreport=1;
1445     myReportPacket();
1446 }
1447 
1448 // The client end timer is based upon the final fin, fin-ack w/the server
1449 // A way to detect this is to hang a recv and wait for the zero byte
1450 // return indicating the socket is closed for recv per the server
1451 // closing it's socket
1452 #define MINAWAITCLOSEUSECS 2000000
AwaitServerCloseEvent()1453 void Client::AwaitServerCloseEvent () {
1454     // the await detection can take awhile so post a non event ahead of it
1455     PostNullEvent();
1456     unsigned int amount_usec = \
1457 	(isModeTime(mSettings) ? static_cast<int>(mSettings->mAmount * 10000) : MINAWAITCLOSEUSECS);
1458     if (amount_usec < MINAWAITCLOSEUSECS)
1459 	amount_usec = MINAWAITCLOSEUSECS;
1460     SetSocketOptionsReceiveTimeout(mSettings, amount_usec);
1461     int rc;
1462     while ((rc = recv(mySocket, mSettings->mBuf, mSettings->mBufLen, 0) > 0)) {};
1463     if (rc < 0)
1464 	WARN_errno(1, "client await server close");
1465 #ifdef HAVE_THREAD_DEBUG
1466     if (rc==0)
1467 	thread_debug("Client detected server close %d", mySocket);
1468 #endif
1469 }
1470 
SendFirstPayload()1471 int Client::SendFirstPayload () {
1472     int pktlen = 0;
1473     if (!isConnectOnly(mSettings)) {
1474 	if (myReport && !TimeZero(myReport->info.ts.startTime) && !(mSettings->mMode == kTest_TradeOff)) {
1475 	    reportstruct->packetTime = myReport->info.ts.startTime;
1476 	} else {
1477 	    now.setnow();
1478 	    reportstruct->packetTime.tv_sec = now.getSecs();
1479 	    reportstruct->packetTime.tv_usec = now.getUsecs();
1480 	}
1481 	if (isTxStartTime(mSettings)) {
1482 	    pktlen = Settings_GenerateClientHdr(mSettings, (void *) mSettings->mBuf, mSettings->txstart_epoch);
1483 	} else {
1484 	    pktlen = Settings_GenerateClientHdr(mSettings, (void *) mSettings->mBuf, reportstruct->packetTime);
1485 	}
1486 	if (pktlen > 0) {
1487 	    if (isUDP(mSettings)) {
1488 		struct client_udp_testhdr *tmphdr = reinterpret_cast<struct client_udp_testhdr *>(mSettings->mBuf);
1489 		WritePacketID(reportstruct->packetID);
1490 		tmphdr->seqno_ts.tv_sec  = htonl(reportstruct->packetTime.tv_sec);
1491 		tmphdr->seqno_ts.tv_usec = htonl(reportstruct->packetTime.tv_usec);
1492 		udp_payload_minimum = pktlen;
1493 #if HAVE_DECL_MSG_DONTWAIT
1494 		pktlen = send(mySocket, mSettings->mBuf, (pktlen > mSettings->mBufLen) ? pktlen : mSettings->mBufLen, MSG_DONTWAIT);
1495 #else
1496 		pktlen = send(mySocket, mSettings->mBuf, (pktlen > mSettings->mBufLen) ? pktlen : mSettings->mBufLen, 0);
1497 #endif
1498 		apply_first_udppkt_delay = true;
1499 	    } else {
1500 #if HAVE_DECL_TCP_NODELAY
1501 		if (!isNoDelay(mSettings) && isPeerVerDetect(mSettings) && isTripTime(mSettings)) {
1502 		    int optflag=1;
1503 		    int rc;
1504 		    // Disable Nagle to reduce latency of this intial message
1505 		    if ((rc = setsockopt(mSettings->mSock, IPPROTO_TCP, TCP_NODELAY, reinterpret_cast<char *>(&optflag), sizeof(int))) < 0) {
1506 			WARN_errno(rc < 0, "tcpnodelay");
1507 		    }
1508 		}
1509 #endif
1510 #if HAVE_DECL_MSG_DONTWAIT
1511 		pktlen = send(mySocket, mSettings->mBuf, pktlen, MSG_DONTWAIT);
1512 #else
1513 		pktlen = send(mySocket, mSettings->mBuf, pktlen, 0);
1514 #endif
1515 		if (isPeerVerDetect(mSettings) && !isServerReverse(mSettings)) {
1516 		    PeerXchange();
1517 		}
1518 #if HAVE_DECL_TCP_NODELAY
1519 		if (!isNoDelay(mSettings) && isPeerVerDetect(mSettings) && isTripTime(mSettings)) {
1520 		    int optflag=0;
1521 		    int rc;
1522 		    // Disable Nagle to reduce latency of this intial message
1523 		    if ((rc = setsockopt(mSettings->mSock, IPPROTO_TCP, TCP_NODELAY, reinterpret_cast<char *>(&optflag), sizeof(int))) < 0) {
1524 			WARN_errno(rc < 0, "tcpnodelay");
1525 		    }
1526 		}
1527 #endif
1528 	    }
1529 	    WARN_errno(pktlen < 0, "send_hdr");
1530 	}
1531     }
1532     return pktlen;
1533 }
1534 
PeerXchange()1535 void Client::PeerXchange () {
1536     int n;
1537     client_hdr_ack ack;
1538     /*
1539      * Hang read and see if this is a header ack message
1540      */
1541     int readlen = isTripTime(mSettings) ? sizeof(struct client_hdr_ack) : (sizeof(struct client_hdr_ack) - sizeof(struct client_hdr_ack_ts));
1542     if ((n = recvn(mySocket, reinterpret_cast<char *>(&ack), readlen, 0)) == readlen) {
1543 	if (ntohl(ack.typelen.type) == CLIENTHDRACK) {
1544 	    mSettings->peer_version_u = ntohl(ack.version_u);
1545 	    mSettings->peer_version_l = ntohl(ack.version_l);
1546 	    if (isTripTime(mSettings)) {
1547 		Timestamp now;
1548 		Timestamp senttx(ntohl(ack.ts.sent_tv_sec), ntohl(ack.ts.sent_tv_usec));
1549 		Timestamp sentrx(ntohl(ack.ts.sentrx_tv_sec), ntohl(ack.ts.sentrx_tv_usec));
1550 		Timestamp acktx(ntohl(ack.ts.ack_tv_sec), ntohl(ack.ts.ack_tv_usec));
1551 		Timestamp ackrx(now.getSecs(), now.getUsecs());
1552 		double str = (sentrx.get() - senttx.get()) * 1e3;
1553 		double atr = (now.get() - acktx.get()) * 1e3;
1554 		double rtt = str + atr;
1555 		double halfrtt = rtt / 2.0;
1556 		fprintf(stderr,"%sClock sync check (ms): RTT/Half=(%0.3f/%0.3f) OWD-send/ack/asym=(%0.3f/%0.3f/%0.3f)\n",mSettings->mTransferIDStr, rtt, halfrtt, str, atr, abs(str-atr));
1557 	    }
1558 	}
1559     } else {
1560 	WARN_errno(1, "recvack");
1561     }
1562 }
1563 
1564 /*
1565  * BarrierClient allows for multiple stream clients to be syncronized
1566  */
BarrierClient(struct BarrierMutex * barrier)1567 int Client::BarrierClient (struct BarrierMutex *barrier) {
1568     int last = 0;
1569 #ifdef HAVE_THREAD
1570     assert(barrier != NULL);
1571     Condition_Lock(barrier->await);
1572     if (--barrier->count <= 0) {
1573 	// store the barrier release timer
1574 #ifdef HAVE_CLOCK_GETTIME
1575 	struct timespec t1;
1576 	clock_gettime(CLOCK_REALTIME, &t1);
1577 	barrier->release_time.tv_sec  = t1.tv_sec;
1578 	barrier->release_time.tv_usec = t1.tv_nsec / 1000;
1579 #else
1580 	gettimeofday(&barrier->release_time, NULL);
1581 #endif
1582 	last = 1;
1583 	// last one wake's up everyone else
1584 	Condition_Broadcast(&barrier->await);
1585 #ifdef HAVE_THREAD_DEBUG
1586 	thread_debug("Barrier BROADCAST on condition %p", (void *)&barrier->await);
1587 #endif
1588     } else {
1589 #ifdef HAVE_THREAD_DEBUG
1590         thread_debug("Barrier WAIT on condition %p count=%d", (void *)&barrier->await, barrier->count);
1591 #endif
1592         Condition_Wait(&barrier->await);
1593     }
1594     Condition_Unlock(barrier->await);
1595 #ifdef HAVE_THREAD_DEBUG
1596     thread_debug("Barrier EXIT on condition %p", (void *)&barrier->await);
1597 #endif
1598 #else
1599     last = 1;
1600 #endif // HAVE_THREAD
1601     return last;
1602 }
1603