1 /*---------------------------------------------------------------
2  * Copyright (c) 1999,2000,2001,2002,2003
3  * The Board of Trustees of the University of Illinois
4  * All Rights Reserved.
5  *---------------------------------------------------------------
6  * Permission is hereby granted, free of charge, to any person
7  * obtaining a copy of this software (Iperf) and associated
8  * documentation files (the "Software"), to deal in the Software
9  * without restriction, including without limitation the
10  * rights to use, copy, modify, merge, publish, distribute,
11  * sublicense, and/or sell copies of the Software, and to permit
12  * persons to whom the Software is furnished to do
13  * so, subject to the following conditions:
14  *
15  *
16  * Redistributions of source code must retain the above
17  * copyright notice, this list of conditions and
18  * the following disclaimers.
19  *
20  *
21  * Redistributions in binary form must reproduce the above
22  * copyright notice, this list of conditions and the following
23  * disclaimers in the documentation and/or other materials
24  * provided with the distribution.
25  *
26  *
27  * Neither the names of the University of Illinois, NCSA,
28  * nor the names of its contributors may be used to endorse
29  * or promote products derived from this Software without
30  * specific prior written permission.
31  *
32  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
33  * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
34  * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
35  * NONINFRINGEMENT. IN NO EVENT SHALL THE CONTIBUTORS OR COPYRIGHT
36  * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
37  * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
38  * ARISING FROM, OUT OF OR IN CONNECTION WITH THE
39  * SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
40  * ________________________________________________________________
41  * National Laboratory for Applied Network Research
42  * National Center for Supercomputing Applications
43  * University of Illinois at Urbana-Champaign
44  * http://www.ncsa.uiuc.edu
45  * ________________________________________________________________
46  *
47  * Server.cpp
48  * by Mark Gates <mgates@nlanr.net>
49  *     Ajay Tirumala (tirumala@ncsa.uiuc.edu>.
50  * -------------------------------------------------------------------
51  * A server thread is initiated for each connection accept() returns.
52  * Handles sending and receiving data, and then closes socket.
53  * Changes to this version : The server can be run as a daemon
54  * ------------------------------------------------------------------- */
55 
56 #define HEADERS()
57 
58 #include "headers.h"
59 #include "Server.hpp"
60 #include "active_hosts.h"
61 #include "Extractor.h"
62 #include "Reporter.h"
63 #include "Locale.h"
64 #include "delay.h"
65 #include "PerfSocket.hpp"
66 #include "SocketAddr.h"
67 #include "payloads.h"
68 #include <cmath>
69 #if defined(HAVE_LINUX_FILTER_H) && defined(HAVE_AF_PACKET)
70 #include "checksums.h"
71 #endif
72 
73 
74 /* -------------------------------------------------------------------
75  * Stores connected socket and socket info.
76  * ------------------------------------------------------------------- */
77 
Server(thread_Settings * inSettings)78 Server::Server (thread_Settings *inSettings) {
79 #ifdef HAVE_THREAD_DEBUG
80     thread_debug("Server constructor with thread=%p sum=%p (sock=%d)", (void *) inSettings, (void *)inSettings->mSumReport, inSettings->mSock);
81 #endif
82     mSettings = inSettings;
83     myJob = NULL;
84     reportstruct = &scratchpad;
85     memset(&scratchpad, 0, sizeof(struct ReportStruct));
86     mySocket = inSettings->mSock;
87     peerclose = false;
88 #if defined(HAVE_LINUX_FILTER_H) && defined(HAVE_AF_PACKET)
89     myDropSocket = inSettings->mSockDrop;
90     if (isL2LengthCheck(mSettings)) {
91 	// For L2 UDP make sure we can receive a full ethernet packet plus a bit more
92 	if (mSettings->mBufLen < (2 * ETHER_MAX_LEN)) {
93 	    mSettings->mBufLen = (2 * ETHER_MAX_LEN);
94 	}
95     }
96 #endif
97     // Enable kernel level timestamping if available
98     InitKernelTimeStamping();
99     int sorcvtimer = 0;
100     // sorcvtimer units microseconds convert to that
101     // minterval double, units seconds
102     // mAmount integer, units 10 milliseconds
103     // divide by two so timeout is 1/2 the interval
104     if (mSettings->mInterval && (mSettings->mIntervalMode == kInterval_Time)) {
105 	sorcvtimer = static_cast<int>(round(1000000.0 * mSettings->mInterval / 2.0));
106     } else if (isServerModeTime(mSettings)) {
107 	sorcvtimer = static_cast<int>(round(mSettings->mAmount * 10000) / 2);
108     }
109     isburst = (isIsochronous(mSettings) || isPeriodicBurst(mSettings) || (isTripTime(mSettings) && !isUDP(mSettings)));
110     if (isburst && (mSettings->mFPS > 0.0)) {
111 	sorcvtimer = static_cast<int>(round(2000000.0 / mSettings->mFPS));
112     }
113     if (sorcvtimer > 0) {
114 	SetSocketOptionsReceiveTimeout(mSettings, sorcvtimer);
115     }
116 }
117 
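/* -------------------------------------------------------------------
 * Destructor. Closes the L2 drop socket (AF_PACKET builds only); the
 * connected data socket is closed at the end of RunTCP()/RunUDP()
 * when EndJob() indicates it should be.
 * ------------------------------------------------------------------- */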
~Server()121 Server::~Server () {
122 #if HAVE_THREAD_DEBUG
123     thread_debug("Server destructor sock=%d fullduplex=%s", mySocket, (isFullDuplex(mSettings) ? "true" : "false"));
124 #endif
125 #if defined(HAVE_LINUX_FILTER_H) && defined(HAVE_AF_PACKET)
126     if (myDropSocket != INVALID_SOCKET) {
127 	int rc = close(myDropSocket);
128 	WARN_errno(rc == SOCKET_ERROR, "server close drop");
129 	myDropSocket = INVALID_SOCKET;
130     }
131 #endif
132 }
133 
InProgress()134 inline bool Server::InProgress () {
135     return !(sInterupted || peerclose ||
136 	((isServerModeTime(mSettings) || (isModeTime(mSettings) && isReverse(mSettings))) && mEndTime.before(reportstruct->packetTime)));
137 }
138 
/* -------------------------------------------------------------------
 * Receive TCP data from the (connected) socket until the test ends
 * (interrupt, peer close, end time, or expected byte count on reverse).
 * Closes the socket at the end when EndJob() indicates it should.
 * ------------------------------------------------------------------- */
RunTCP()144 void Server::RunTCP () {
145     long currLen;
146     intmax_t totLen = 0;
147     struct TCP_burst_payload burst_info;
148     Timestamp time1, time2;
149     double tokens=0.000004;
150 
151     if (!InitTrafficLoop())
152 	return;
153     myReport->info.ts.prevsendTime = myReport->info.ts.startTime;
154 
155     int burst_nleft = 0;
156     burst_info.burst_id = 0;
157 
158     burst_info.send_tt.write_tv_sec = 0;
159     burst_info.send_tt.write_tv_usec = 0;
160     now.setnow();
161     reportstruct->packetTime.tv_sec = now.getSecs();
162     reportstruct->packetTime.tv_usec = now.getUsecs();
163     while (InProgress()) {
164 //	printf("***** bid expect = %u\n", burstid_expect);
165 	reportstruct->emptyreport=1;
166 	currLen = 0;
167 	// perform read
168 	if (isBWSet(mSettings)) {
169 	    time2.setnow();
170 	    tokens += time2.subSec(time1) * (mSettings->mAppRate / 8.0);
171 	    time1 = time2;
172 	}
173 	reportstruct->transit_ready = 0;
174 	if (tokens >= 0.0) {
175 	    int n = 0;
176 	    int readLen = mSettings->mBufLen;
177 	    if (burst_nleft > 0)
178 		readLen = (mSettings->mBufLen < burst_nleft) ? mSettings->mBufLen : burst_nleft;
179 	    reportstruct->emptyreport=1;
180 	    if (isburst && (burst_nleft == 0)) {
181 		if ((n = recvn(mSettings->mSock, reinterpret_cast<char *>(&burst_info), sizeof(struct TCP_burst_payload), 0)) == sizeof(struct TCP_burst_payload)) {
182 		    // burst_info.typelen.type = ntohl(burst_info.typelen.type);
183 		    // burst_info.typelen.length = ntohl(burst_info.typelen.length);
184 		    burst_info.flags = ntohl(burst_info.flags);
185 		    burst_info.burst_size = ntohl(burst_info.burst_size);
186 		    assert(burst_info.burst_size > 0);
187 		    reportstruct->burstsize = burst_info.burst_size;
188 		    burst_info.burst_id = ntohl(burst_info.burst_id);
189 		    reportstruct->frameID = burst_info.burst_id;
190 		    if (isTripTime(mSettings)) {
191 			reportstruct->sentTime.tv_sec = ntohl(burst_info.send_tt.write_tv_sec);
192 			reportstruct->sentTime.tv_usec = ntohl(burst_info.send_tt.write_tv_usec);
193 		    } else {
194 			now.setnow();
195 			reportstruct->sentTime.tv_sec = now.getSecs();
196 			reportstruct->sentTime.tv_usec = now.getUsecs();
197 		    }
198 		    // This is the first stamp of the burst
199 		    myReport->info.ts.prevsendTime = reportstruct->sentTime;
200 		    burst_nleft = burst_info.burst_size - n;
201 		    if (burst_nleft == 0) {
202 			reportstruct->prevSentTime = myReport->info.ts.prevsendTime;
203 			reportstruct->transit_ready = 1;
204 		    }
205 		    currLen += n;
206 		    readLen = (mSettings->mBufLen < burst_nleft) ? mSettings->mBufLen : burst_nleft;
207 		    WARN(burst_nleft <= 0, "invalid burst read req size");
208 		    // thread_debug("***read burst header size %d id=%d", burst_info.burst_size, burst_info.burst_id);
209 		} else {
210 		    if (n > 0) {
211 		        WARN(1, "partial readn");
212 #ifdef HAVE_THREAD_DEBUG
213 		        thread_debug("TCP burst partial read of %d wanted %d", n, sizeof(struct TCP_burst_payload));
214 		    } else {
215 		        thread_debug("Detected peer close");
216 #endif
217 		    }
218 		    goto Done;
219 		}
220 	    }
221 	    if (!reportstruct->transit_ready) {
222 		n = recv(mSettings->mSock, mSettings->mBuf, readLen, 0);
223 		if (n > 0) {
224 		    reportstruct->emptyreport = 0;
225 		    if (isburst) {
226 			burst_nleft -= n;
227 			if (burst_nleft == 0) {
228 			    reportstruct->prevSentTime = myReport->info.ts.prevsendTime;
229 			    reportstruct->transit_ready = 1;
230 			}
231 		    }
232 		} else if (n == 0) {
233 		    peerclose = true;
234 #ifdef HAVE_THREAD_DEBUG
235 		    thread_debug("Server thread detected EOF on socket %d", mSettings->mSock);
236 #endif
237 		} else if ((n < 0) && (FATALTCPREADERR(errno))) {
238 		    WARN_errno(1, "recv");
239 		    peerclose = true;
240 		    n = 0;
241 		}
242 		currLen += n;
243 	    }
244 	    now.setnow();
245 	    reportstruct->packetTime.tv_sec = now.getSecs();
246 	    reportstruct->packetTime.tv_usec = now.getUsecs();
247 	    totLen += currLen;
248 	    if (isBWSet(mSettings))
249 		tokens -= currLen;
250 
251 	    reportstruct->packetLen = currLen;
252 #ifdef HAVE_STRUCT_TCP_INFO_TCPI_TOTAL_RETRANS
253 	    ReportPacket(myReport, reportstruct, NULL);
254 #else
255 	    ReportPacket(myReport, reportstruct);
256 #endif
257 	    // Check for reverse and amount where
258 	    // the server stops after receiving
259 	    // the expected byte count
260 	    if (isReverse(mSettings) && !isModeTime(mSettings) && (totLen >= static_cast<intmax_t>(mSettings->mAmount))) {
261 	        break;
262 	    }
263 	} else {
264 	    // Use a 4 usec delay to fill tokens
265 	    delay_loop(4);
266 	}
267     }
268   Done:
269     disarm_itimer();
270     // stop timing
271     now.setnow();
272     reportstruct->packetTime.tv_sec = now.getSecs();
273     reportstruct->packetTime.tv_usec = now.getUsecs();
274     reportstruct->packetLen = 0;
275     if (EndJob(myJob, reportstruct)) {
276 #if HAVE_THREAD_DEBUG
277 	thread_debug("tcp close sock=%d", mySocket);
278 #endif
279 	int rc = close(mySocket);
280 	WARN_errno(rc == SOCKET_ERROR, "server close");
281     }
282     Iperf_remove_host(mSettings);
283     FreeReport(myJob);
284 }
285 
InitKernelTimeStamping()286 void Server::InitKernelTimeStamping () {
287 #if HAVE_DECL_SO_TIMESTAMP
288     iov[0].iov_base=mSettings->mBuf;
289     iov[0].iov_len=mSettings->mBufLen;
290 
291     message.msg_iov=iov;
292     message.msg_iovlen=1;
293     message.msg_name=&srcaddr;
294     message.msg_namelen=sizeof(srcaddr);
295 
296     message.msg_control = (char *) ctrl;
297     message.msg_controllen = sizeof(ctrl);
298 
299     int timestampOn = 1;
300     if (setsockopt(mSettings->mSock, SOL_SOCKET, SO_TIMESTAMP, &timestampOn, sizeof(timestampOn)) < 0) {
301 	WARN_errno(mSettings->mSock == SO_TIMESTAMP, "socket");
302     }
303 #endif
304 }
305 
//
// Set the report start times and next report times. The start time is
// the client's sent timestamp, else the accept time, else the current time
//
SetFullDuplexReportStartTime()310 inline void Server::SetFullDuplexReportStartTime () {
311     assert(myReport->FullDuplexReport != NULL);
312     struct TransferInfo *fullduplexstats = &myReport->FullDuplexReport->info;
313     assert(fullduplexstats != NULL);
314     if (TimeZero(fullduplexstats->ts.startTime)) {
315 	fullduplexstats->ts.startTime = myReport->info.ts.startTime;
316 	if (isModeTime(mSettings)) {
317 	    fullduplexstats->ts.nextTime = myReport->info.ts.nextTime;
318 	}
319     }
320 #ifdef HAVE_THREAD_DEBUG
321     thread_debug("Server fullduplex report start=%ld.%ld next=%ld.%ld", fullduplexstats->ts.startTime.tv_sec, fullduplexstats->ts.startTime.tv_usec, fullduplexstats->ts.nextTime.tv_sec, fullduplexstats->ts.nextTime.tv_usec);
322 #endif
323 }
324 
SetReportStartTime()325 inline void Server::SetReportStartTime () {
326     if (TimeZero(myReport->info.ts.startTime)) {
327 	if (!TimeZero(mSettings->sent_time) && !isTxStartTime(mSettings)) {
328 	    // Servers that aren't full duplex use the accept timestamp for start
329 	    myReport->info.ts.startTime.tv_sec = mSettings->sent_time.tv_sec;
330 	    myReport->info.ts.startTime.tv_usec = mSettings->sent_time.tv_usec;
331 	} else if (!TimeZero(mSettings->accept_time) && !isTxStartTime(mSettings)) {
332 	    // Servers that aren't full duplex use the accept timestamp for start
333 	    myReport->info.ts.startTime.tv_sec = mSettings->accept_time.tv_sec;
334 	    myReport->info.ts.startTime.tv_usec = mSettings->accept_time.tv_usec;
335 	} else {
336 	    now.setnow();
337 	    myReport->info.ts.startTime.tv_sec = now.getSecs();
338 	    myReport->info.ts.startTime.tv_usec = now.getUsecs();
339 	}
340     }
341     myReport->info.ts.IPGstart = myReport->info.ts.startTime;
342 
343     if (!TimeZero(myReport->info.ts.intervalTime)) {
344 	myReport->info.ts.nextTime = myReport->info.ts.startTime;
345 	TimeAdd(myReport->info.ts.nextTime, myReport->info.ts.intervalTime);
346     }
347     if (myReport->GroupSumReport) {
348 	struct TransferInfo *sumstats = &myReport->GroupSumReport->info;
349 	assert(sumstats != NULL);
350 	Mutex_Lock(&myReport->GroupSumReport->reference.lock);
351 	if (TimeZero(sumstats->ts.startTime)) {
352 	    sumstats->ts.startTime = myReport->info.ts.startTime;
353 	    if (isModeTime(mSettings)) {
354 		sumstats->ts.nextTime = myReport->info.ts.nextTime;
355 	    }
356 	}
357 	Mutex_Unlock(&myReport->GroupSumReport->reference.lock);
358     }
359 #ifdef HAVE_THREAD_DEBUG
360     thread_debug("Server(%d) report start=%ld.%ld next=%ld.%ld", mSettings->mSock, myReport->info.ts.startTime.tv_sec, myReport->info.ts.startTime.tv_usec, myReport->info.ts.nextTime.tv_sec, myReport->info.ts.nextTime.tv_usec);
361 #endif
362 }
363 
ClientReverseFirstRead(void)364 void Server::ClientReverseFirstRead (void) {
365     // Handle the case when the client spawns a server (no listener) and need the initial header
366     // Case of --trip-times and --reverse or --fullduplex, listener handles normal case
367     // Handle the case when the client spawns a server (no listener) and need the initial header
368     // Case of --trip-times and --reverse or --fullduplex, listener handles normal case
369     if (isReverse(mSettings) && (isTripTime(mSettings) || isPeriodicBurst(mSettings) || isIsochronous(mSettings))) {
370         int nread = 0;
371 	uint32_t flags = 0;
372 	int readlen = 0;
373 	if (isUDP(mSettings)) {
374 	    nread = recvn(mSettings->mSock, mSettings->mBuf, mSettings->mBufLen, 0);
375 	    switch (nread) {
376 	    case 0:
377 		//peer closed the socket, with no writes e.g. a connect-only test
378 		peerclose = true;
379 		break;
380 	    case -1 :
381 		FAIL_errno(1, "recvn-reverse", mSettings);
382 		break;
383 	    default :
384 		struct client_udp_testhdr *udp_pkt = reinterpret_cast<struct client_udp_testhdr *>(mSettings->mBuf);
385 		flags = ntohl(udp_pkt->base.flags);
386 		mSettings->sent_time.tv_sec = ntohl(udp_pkt->start_fq.start_tv_sec);
387 		mSettings->sent_time.tv_usec = ntohl(udp_pkt->start_fq.start_tv_usec);
388 		reportstruct->packetLen = nread;
389 		reportstruct->packetID = 1;
390 		break;
391 	    }
392 	} else {
393 	    nread = recvn(mSettings->mSock, mSettings->mBuf, sizeof(uint32_t), 0);
394 	    if (nread == 0) {
395 		fprintf(stderr, "WARN: zero read on header flags\n");
396 		//peer closed the socket, with no writes e.g. a connect-only test
397 		peerclose = true;
398 	    }
399 	    FAIL_errno((nread < (int) sizeof(uint32_t)), "client read tcp flags", mSettings);
400 	    reportstruct->packetID = 1;
401 	    struct client_tcp_testhdr *tcp_pkt = reinterpret_cast<struct client_tcp_testhdr *>(mSettings->mBuf);
402 	    flags = ntohl(tcp_pkt->base.flags);
403 	    // figure out the length of the test header
404 	    if ((readlen = Settings_ClientTestHdrLen(flags, mSettings)) > 0) {
405 		// read the test settings passed to the mSettings by the client
406 	        int adj = (readlen - sizeof(uint32_t));
407 	        nread = recvn(mSettings->mSock, (mSettings->mBuf + sizeof(uint32_t)), adj, 0);
408 		if (nread == 0) {
409 		    peerclose = true;
410 		}
411 		FAIL_errno((nread < adj), "client read tcp test info", mSettings);
412 		if (nread > 0) {
413 		    struct client_tcp_testhdr *tcp_pkt = reinterpret_cast<struct client_tcp_testhdr *>(mSettings->mBuf);
414 		    mSettings->sent_time.tv_sec = ntohl(tcp_pkt->start_fq.start_tv_sec);
415 		    mSettings->sent_time.tv_usec = ntohl(tcp_pkt->start_fq.start_tv_usec);
416 		}
417 		mSettings->firstreadbytes = readlen;
418 	    }
419 	}
420     }
421 }
422 
InitTrafficLoop(void)423 bool Server::InitTrafficLoop (void) {
424     myJob = InitIndividualReport(mSettings);
425     myReport = static_cast<struct ReporterData *>(myJob->this_report);
426     assert(myJob != NULL);
427     //  copy the thread drop socket to this object such
428     //  that the destructor can close it if needed
429 #if defined(HAVE_LINUX_FILTER_H) && defined(HAVE_AF_PACKET)
430     if (mSettings->mSockDrop > 0)
431         myDropSocket = mSettings->mSockDrop;
432 #endif
433     // Initialze the reportstruct scratchpad
434     reportstruct = &scratchpad;
435     reportstruct->packetID = 0;
436     reportstruct->l2len = 0;
437     reportstruct->l2errors = 0x0;
438 
439     int setfullduplexflag = 0;
440     if (isFullDuplex(mSettings) && !isServerReverse(mSettings)) {
441 	assert(mSettings->mFullDuplexReport != NULL);
442 	if ((setfullduplexflag = fullduplex_start_barrier(&mSettings->mFullDuplexReport->fullduplex_barrier)) < 0)
443 	    exit(-1);
444     }
445     Timestamp now;
446     if (isReverse(mSettings)) {
447 	mSettings->accept_time.tv_sec = now.getSecs();
448 	mSettings->accept_time.tv_usec = now.getUsecs();
449 	ClientReverseFirstRead();
450     }
451     if (isTripTime(mSettings)) {
452 	if ((abs(now.getSecs() - mSettings->sent_time.tv_sec)) > MAXDIFFTIMESTAMPSECS) {
453 	    unsetTripTime(mSettings);
454 	    fprintf(stdout,"WARN: ignore --trip-times because client didn't provide valid start timestamp within %d seconds of now\n", MAXDIFFTIMESTAMPSECS);
455 	    mSettings->accept_time.tv_sec = now.getSecs();
456 	    mSettings->accept_time.tv_usec = now.getUsecs();
457 	}
458     }
459     SetReportStartTime();
460     reportstruct->prevPacketTime = myReport->info.ts.startTime;
461 
462     if (setfullduplexflag)
463 	SetFullDuplexReportStartTime();
464 
465     if (isServerModeTime(mSettings) || (isModeTime(mSettings) && (isServerReverse(mSettings) || isFullDuplex(mSettings) || isReverse(mSettings)))) {
466 	if (isServerReverse(mSettings) || isFullDuplex(mSettings) || isReverse(mSettings))
467 	   mSettings->mAmount += (SLOPSECS * 100);  // add 2 sec for slop on reverse, units are 10 ms
468 #ifdef HAVE_SETITIMER
469         int err;
470         struct itimerval it;
471 	memset (&it, 0, sizeof (it));
472 	it.it_value.tv_sec = static_cast<int>(mSettings->mAmount / 100.0);
473 	it.it_value.tv_usec = static_cast<int>(10000 * (mSettings->mAmount -
474 					      it.it_value.tv_sec * 100.0));
475 	err = setitimer(ITIMER_REAL, &it, NULL);
476 	FAIL_errno(err != 0, "setitimer", mSettings);
477 #endif
478         mEndTime.setnow();
479         mEndTime.add(mSettings->mAmount / 100.0);
480     }
481     if (!isSingleUDP(mSettings))
482 	PostReport(myJob);
483     // The first payload is different for TCP so read it and report it
484     // before entering the main loop
485 
486     if (mSettings->firstreadbytes > 0) {
487 	// printf("**** burst size = %d id = %d\n", burst_info.burst_size, burst_info.burst_id);
488 	reportstruct->frameID = 0;
489 	reportstruct->sentTime.tv_sec = myReport->info.ts.startTime.tv_sec;
490 	reportstruct->sentTime.tv_usec = myReport->info.ts.startTime.tv_usec;
491 	reportstruct->packetTime = reportstruct->sentTime;
492 	reportstruct->packetLen = mSettings->firstreadbytes;
493 	if (isUDP(mSettings)) {
494 	    ReadPacketID();
495 	}
496 #ifdef HAVE_STRUCT_TCP_INFO_TCPI_TOTAL_RETRANS
497 	ReportPacket(myReport, reportstruct, NULL);
498 #else
499 	ReportPacket(myReport, reportstruct);
500 #endif
501     }
502     return true;
503 }
504 
ReadWithRxTimestamp()505 inline int Server::ReadWithRxTimestamp () {
506     long currLen;
507     int tsdone = 0;
508 
509 #if HAVE_DECL_SO_TIMESTAMP
510     cmsg = reinterpret_cast<struct cmsghdr *>(&ctrl);
511     currLen = recvmsg(mSettings->mSock, &message, mSettings->recvflags);
512     if (currLen > 0) {
513 	if (cmsg->cmsg_level == SOL_SOCKET &&
514 	    cmsg->cmsg_type  == SCM_TIMESTAMP &&
515 	    cmsg->cmsg_len   == CMSG_LEN(sizeof(struct timeval))) {
516 	    memcpy(&(reportstruct->packetTime), CMSG_DATA(cmsg), sizeof(struct timeval));
517 	    tsdone = 1;
518 	}
519     }
520 #else
521     currLen = recv(mSettings->mSock, mSettings->mBuf, mSettings->mBufLen, mSettings->recvflags);
522 #endif
523     if (currLen <=0) {
524 	// Socket read timeout or read error
525 	reportstruct->emptyreport=1;
526 	if (currLen == 0) {
527 	    peerclose = true;
528 	} else if (FATALUDPREADERR(errno)) {
529 	    WARN_errno(1, "recvmsg");
530 	    currLen = 0;
531 	    peerclose = true;
532 	}
533     } else if (TimeZero(myReport->info.ts.prevpacketTime)) {
534 	myReport->info.ts.prevpacketTime = reportstruct->packetTime;
535     }
536     if (!tsdone) {
537 	now.setnow();
538 	reportstruct->packetTime.tv_sec = now.getSecs();
539 	reportstruct->packetTime.tv_usec = now.getUsecs();
540     }
541     return currLen;
542 }
543 
544 // Returns true if the client has indicated this is the final packet
ReadPacketID()545 inline bool Server::ReadPacketID () {
546     bool terminate = false;
547     struct UDP_datagram* mBuf_UDP  = reinterpret_cast<struct UDP_datagram*>(mSettings->mBuf + mSettings->l4payloadoffset);
548 
549     // terminate when datagram begins with negative index
550     // the datagram ID should be correct, just negated
551 
552     if (isSeqNo64b(mSettings)) {
553       // New client - Signed PacketID packed into unsigned id2,id
554       reportstruct->packetID = (static_cast<uint32_t>(ntohl(mBuf_UDP->id))) | (static_cast<uintmax_t>(ntohl(mBuf_UDP->id2)) << 32);
555 
556 #ifdef HAVE_PACKET_DEBUG
557       printf("id 0x%x, 0x%x -> %" PRIdMAX " (0x%" PRIxMAX ")\n",
558 	     ntohl(mBuf_UDP->id), ntohl(mBuf_UDP->id2), reportstruct->packetID, reportstruct->packetID);
559 #endif
560     } else {
561       // Old client - Signed PacketID in Signed id
562       reportstruct->packetID = static_cast<int32_t>(ntohl(mBuf_UDP->id));
563 #ifdef HAVE_PACKET_DEBUG
564       printf("id 0x%x -> %" PRIdMAX " (0x%" PRIxMAX ")\n",
565 	     ntohl(mBuf_UDP->id), reportstruct->packetID, reportstruct->packetID);
566 #endif
567     }
568     if (reportstruct->packetID < 0) {
569       reportstruct->packetID = - reportstruct->packetID;
570       terminate = true;
571     }
572     // read the sent timestamp from the rx packet
573     reportstruct->sentTime.tv_sec = ntohl(mBuf_UDP->tv_sec);
574     reportstruct->sentTime.tv_usec = ntohl(mBuf_UDP->tv_usec);
575     return terminate;
576 }
577 
L2_processing()578 void Server::L2_processing () {
579 #if (HAVE_LINUX_FILTER_H) && (HAVE_AF_PACKET)
580     eth_hdr = reinterpret_cast<struct ether_header *>(mSettings->mBuf);
581     ip_hdr = reinterpret_cast<struct iphdr *>(mSettings->mBuf + sizeof(struct ether_header));
582     // L4 offest is set by the listener and depends upon IPv4 or IPv6
583     udp_hdr = reinterpret_cast<struct udphdr *>(mSettings->mBuf + mSettings->l4offset);
584     // Read the packet to get the UDP length
585     int udplen = ntohs(udp_hdr->len);
586     //
587     // in the event of an L2 error, double check the packet before passing it to the reporter,
588     // i.e. no reason to run iperf accounting on a packet that has no reasonable L3 or L4 headers
589     //
590     reportstruct->packetLen = udplen - sizeof(struct udphdr);
591     reportstruct->expected_l2len = reportstruct->packetLen + mSettings->l4offset + sizeof(struct udphdr);
592     if (reportstruct->l2len != reportstruct->expected_l2len) {
593 	reportstruct->l2errors |= L2LENERR;
594 	if (L2_quintuple_filter() != 0) {
595 	    reportstruct->l2errors |= L2UNKNOWN;
596 	    reportstruct->l2errors |= L2CSUMERR;
597 	    reportstruct->emptyreport = 1;
598 	}
599     }
600     if (!(reportstruct->l2errors & L2UNKNOWN)) {
601 	// perform UDP checksum test, returns zero on success
602 	int rc;
603 	rc = udpchecksum((void *)ip_hdr, (void *)udp_hdr, udplen, (isIPV6(mSettings) ? 1 : 0));
604 	if (rc) {
605 	    reportstruct->l2errors |= L2CSUMERR;
606 	    if ((!(reportstruct->l2errors & L2LENERR)) && (L2_quintuple_filter() != 0)) {
607 		reportstruct->emptyreport = 1;
608 		reportstruct->l2errors |= L2UNKNOWN;
609 	    }
610 	}
611     }
612 #endif // HAVE_AF_PACKET
613 }
614 
// Run the L2 packet through a quintuple check, i.e. ethertype/ip src/ip dst/src port/dst port,
// and return zero if there is a match, otherwise return nonzero
L2_quintuple_filter()617 int Server::L2_quintuple_filter () {
618 #if defined(HAVE_LINUX_FILTER_H) && defined(HAVE_AF_PACKET)
619 
620 #define IPV4SRCOFFSET 12  // the ipv4 source address offset from the l3 pdu
621 #define IPV6SRCOFFSET 8 // the ipv6 source address offset
622 
623     // Get the expected values from the sockaddr structures
624     // Note: it's expected the initiating socket has aready "connected"
625     // and the sockaddr structs have been populated
626     // 2nd Note:  sockaddr structs are in network byte order
627     struct sockaddr *p = reinterpret_cast<sockaddr *>(&mSettings->peer);
628     struct sockaddr *l = reinterpret_cast<sockaddr *>(&mSettings->local);
629     // make sure sa_family is coherent for both src and dst
630     if (!(((l->sa_family == AF_INET) && (p->sa_family == AF_INET)) || ((l->sa_family == AF_INET6) && (p->sa_family == AF_INET6)))) {
631 	return -1;
632     }
633 
634     // check the L2 ethertype
635     struct ether_header *l2hdr = reinterpret_cast<struct ether_header *>(mSettings->mBuf);
636 
637     if (!isIPV6(mSettings)) {
638 	if (ntohs(l2hdr->ether_type) != ETHERTYPE_IP)
639 	    return -1;
640     } else {
641 	if (ntohs(l2hdr->ether_type) != ETHERTYPE_IPV6)
642 	    return -1;
643     }
644     // check the ip src/dst
645     const uint32_t *data;
646     udp_hdr = reinterpret_cast<struct udphdr *>(mSettings->mBuf + mSettings->l4offset);
647 
648     // Check plain old v4 using v4 addr structs
649     if (l->sa_family == AF_INET) {
650 	data = reinterpret_cast<const uint32_t *>(mSettings->mBuf + sizeof(struct ether_header) + IPV4SRCOFFSET);
651 	if ((reinterpret_cast<struct sockaddr_in *>(p))->sin_addr.s_addr != *data++)
652 	    return -1;
653 	if ((reinterpret_cast<struct sockaddr_in *>(l))->sin_addr.s_addr != *data)
654 	    return -1;
655 	if (udp_hdr->source != (reinterpret_cast<struct sockaddr_in *>(p))->sin_port)
656 	    return -1;
657 	if (udp_hdr->dest != (reinterpret_cast<struct sockaddr_in *>(l))->sin_port)
658 	    return -1;
659     } else {
660 	// Using the v6 addr structures
661 #  ifdef HAVE_IPV6
662 	struct in6_addr *v6peer = SockAddr_get_in6_addr(&mSettings->peer);
663 	struct in6_addr *v6local = SockAddr_get_in6_addr(&mSettings->local);
664 	if (isIPV6(mSettings)) {
665 	    int i;
666 	    data = reinterpret_cast<const uint32_t *>(mSettings->mBuf + sizeof(struct ether_header) + IPV6SRCOFFSET);
667 	    // check for v6 src/dst address match
668 	    for (i = 0; i < 4; i++) {
669 		if (v6peer->s6_addr32[i] != *data++)
670 		    return -1;
671 	    }
672 	    for (i = 0; i < 4; i++) {
673 		if (v6local->s6_addr32[i] != *data++)
674 		    return -1;
675 	    }
676 	} else { // v4 addr in v6 family struct
677 	    data = reinterpret_cast<const uint32_t *>(mSettings->mBuf + sizeof(struct ether_header) + IPV4SRCOFFSET);
678 	    if (v6peer->s6_addr32[3] != *data++)
679 		return -1;
680 	    if (v6peer->s6_addr32[3] != *data)
681 		return -1;
682 	}
683 	// check udp ports
684 	if (udp_hdr->source != (reinterpret_cast<struct sockaddr_in6 *>(p))->sin6_port)
685 	    return -1;
686 	if (udp_hdr->dest != (reinterpret_cast<struct sockaddr_in6 *>(l))->sin6_port)
687 	    return -1;
688 #  endif // HAVE_IPV6
689     }
690 #endif // HAVE_AF_PACKET
691     // made it through all the checks
692     return 0;
693 }
694 
udp_isoch_processing(int rxlen)695 inline void Server::udp_isoch_processing (int rxlen) {
696     // Ignore runt sized isoch packets
697     if (rxlen < static_cast<int>(sizeof(struct UDP_datagram) +  sizeof(struct client_hdr_v1) + sizeof(struct client_hdrext) + sizeof(struct isoch_payload))) {
698 	reportstruct->burstsize = 0;
699 	reportstruct->remaining = 0;
700 	reportstruct->frameID = 0;
701     } else {
702 	struct client_udp_testhdr *udp_pkt = reinterpret_cast<struct client_udp_testhdr *>(mSettings->mBuf);
703 	reportstruct->isochStartTime.tv_sec = ntohl(udp_pkt->isoch.start_tv_sec);
704 	reportstruct->isochStartTime.tv_usec = ntohl(udp_pkt->isoch.start_tv_usec);
705 	reportstruct->frameID = ntohl(udp_pkt->isoch.frameid);
706 	reportstruct->prevframeID = ntohl(udp_pkt->isoch.prevframeid);
707 	reportstruct->burstsize = ntohl(udp_pkt->isoch.burstsize);
708 	reportstruct->burstperiod = ntohl(udp_pkt->isoch.burstperiod);
709 	reportstruct->remaining = ntohl(udp_pkt->isoch.remaining);
710 	if ((reportstruct->remaining == rxlen) && ((reportstruct->frameID - reportstruct->prevframeID) == 1)) {
711 	    reportstruct->transit_ready = 1;
712 	}
713     }
714 }
715 
/* -------------------------------------------------------------------
 * Receive UDP data from the (connected) socket until the test ends.
 * At the end, sends a UDP ack/FIN back to the client (unless receiving
 * multicast or --no-udp-fin) and closes the socket when EndJob()
 * indicates it should.
 * ------------------------------------------------------------------- */
RunUDP()721 void Server::RunUDP () {
722     int rxlen;
723     bool lastpacket = false;
724 
725     if (!InitTrafficLoop())
726 	return;
727 
728     // Exit loop on three conditions
729     // 1) Fatal read error
730     // 2) Last packet of traffic flow sent by client
731     // 3) -t timer expires
732     while (InProgress() && !lastpacket) {
733 	// The emptyreport flag can be set
734 	// by any of the packet processing routines
735 	// If it's set the iperf reporter won't do
736 	// bandwidth accounting, basically it's indicating
737 	// that the reportstruct itself couldn't be
738 	// completely filled out.
739 	reportstruct->emptyreport=1;
740 	reportstruct->packetLen=0;
741 	// read the next packet with timestamp
742 	// will also set empty report or not
743 	rxlen=ReadWithRxTimestamp();
744 	if (!peerclose && (rxlen > 0)) {
745 	    reportstruct->emptyreport = 0;
746 	    reportstruct->packetLen = rxlen;
747 	    if (isL2LengthCheck(mSettings)) {
748 		reportstruct->l2len = rxlen;
749 		// L2 processing will set the reportstruct packet length with the length found in the udp header
750 		// and also set the expected length in the report struct.  The reporter thread
751 		// will do the compare and account and print l2 errors
752 		reportstruct->l2errors = 0x0;
753 		L2_processing();
754 	    }
755 	    if (!(reportstruct->l2errors & L2UNKNOWN)) {
756 		// ReadPacketID returns true if this is the last UDP packet sent by the client
757 		// also sets the packet rx time in the reportstruct
758 		reportstruct->prevSentTime = myReport->info.ts.prevsendTime;
759 		reportstruct->prevPacketTime = myReport->info.ts.prevpacketTime;
760 		lastpacket = ReadPacketID();
761 		myReport->info.ts.prevsendTime = reportstruct->sentTime;
762 		myReport->info.ts.prevpacketTime = reportstruct->packetTime;
763 		if (isIsochronous(mSettings)) {
764 		    udp_isoch_processing(rxlen);
765 		}
766 	    }
767 	}
768 #ifdef HAVE_STRUCT_TCP_INFO_TCPI_TOTAL_RETRANS
769 	ReportPacket(myReport, reportstruct, NULL);
770 #else
771 	ReportPacket(myReport, reportstruct);
772 #endif
773     }
774     disarm_itimer();
775     int do_close = EndJob(myJob, reportstruct);
776     if (!isMulticast(mSettings) && !isNoUDPfin(mSettings)) {
777 	// send a UDP acknowledgement back except when:
778 	// 1) we're NOT receiving multicast
779 	// 2) the user requested no final exchange
780 	// 3) this is a full duplex test
781 	write_UDP_AckFIN(&myReport->info, mSettings->mBufLen);
782     }
783     if (do_close) {
784 #if HAVE_THREAD_DEBUG
785 	thread_debug("udp close sock=%d", mySocket);
786 #endif
787 	int rc = close(mySocket);
788 	WARN_errno(rc == SOCKET_ERROR, "server close");
789     }
790     Iperf_remove_host(mSettings);
791     FreeReport(myJob);
792 }
793 // end Recv
794