1 /*---------------------------------------------------------------
2 * Copyright (c) 1999,2000,2001,2002,2003
3 * The Board of Trustees of the University of Illinois
4 * All Rights Reserved.
5 *---------------------------------------------------------------
6 * Permission is hereby granted, free of charge, to any person
7 * obtaining a copy of this software (Iperf) and associated
8 * documentation files (the "Software"), to deal in the Software
9 * without restriction, including without limitation the
10 * rights to use, copy, modify, merge, publish, distribute,
11 * sublicense, and/or sell copies of the Software, and to permit
12 * persons to whom the Software is furnished to do
13 * so, subject to the following conditions:
14 *
15 *
16 * Redistributions of source code must retain the above
17 * copyright notice, this list of conditions and
18 * the following disclaimers.
19 *
20 *
21 * Redistributions in binary form must reproduce the above
22 * copyright notice, this list of conditions and the following
23 * disclaimers in the documentation and/or other materials
24 * provided with the distribution.
25 *
26 *
27 * Neither the names of the University of Illinois, NCSA,
28 * nor the names of its contributors may be used to endorse
29 * or promote products derived from this Software without
30 * specific prior written permission.
31 *
32 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
33 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
34 * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
35 * NONINFRINGEMENT. IN NO EVENT SHALL THE CONTIBUTORS OR COPYRIGHT
36 * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
37 * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
38 * ARISING FROM, OUT OF OR IN CONNECTION WITH THE
39 * SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
40 * ________________________________________________________________
41 * National Laboratory for Applied Network Research
42 * National Center for Supercomputing Applications
43 * University of Illinois at Urbana-Champaign
44 * http://www.ncsa.uiuc.edu
45 * ________________________________________________________________
46 *
47 * Server.cpp
48 * by Mark Gates <mgates@nlanr.net>
 *    Ajay Tirumala <tirumala@ncsa.uiuc.edu>.
50 * -------------------------------------------------------------------
51 * A server thread is initiated for each connection accept() returns.
52 * Handles sending and receiving data, and then closes socket.
53 * Changes to this version : The server can be run as a daemon
54 * ------------------------------------------------------------------- */
55
56 #define HEADERS()
57
58 #include "headers.h"
59 #include "Server.hpp"
60 #include "active_hosts.h"
61 #include "Extractor.h"
62 #include "Reporter.h"
63 #include "Locale.h"
64 #include "delay.h"
65 #include "PerfSocket.hpp"
66 #include "SocketAddr.h"
67 #include "payloads.h"
68 #include <cmath>
69 #if defined(HAVE_LINUX_FILTER_H) && defined(HAVE_AF_PACKET)
70 #include "checksums.h"
71 #endif
72
73
74 /* -------------------------------------------------------------------
75 * Stores connected socket and socket info.
76 * ------------------------------------------------------------------- */
77
Server(thread_Settings * inSettings)78 Server::Server (thread_Settings *inSettings) {
79 #ifdef HAVE_THREAD_DEBUG
80 thread_debug("Server constructor with thread=%p sum=%p (sock=%d)", (void *) inSettings, (void *)inSettings->mSumReport, inSettings->mSock);
81 #endif
82 mSettings = inSettings;
83 myJob = NULL;
84 reportstruct = &scratchpad;
85 memset(&scratchpad, 0, sizeof(struct ReportStruct));
86 mySocket = inSettings->mSock;
87 peerclose = false;
88 #if defined(HAVE_LINUX_FILTER_H) && defined(HAVE_AF_PACKET)
89 myDropSocket = inSettings->mSockDrop;
90 if (isL2LengthCheck(mSettings)) {
91 // For L2 UDP make sure we can receive a full ethernet packet plus a bit more
92 if (mSettings->mBufLen < (2 * ETHER_MAX_LEN)) {
93 mSettings->mBufLen = (2 * ETHER_MAX_LEN);
94 }
95 }
96 #endif
97 // Enable kernel level timestamping if available
98 InitKernelTimeStamping();
99 int sorcvtimer = 0;
100 // sorcvtimer units microseconds convert to that
101 // minterval double, units seconds
102 // mAmount integer, units 10 milliseconds
103 // divide by two so timeout is 1/2 the interval
104 if (mSettings->mInterval && (mSettings->mIntervalMode == kInterval_Time)) {
105 sorcvtimer = static_cast<int>(round(1000000.0 * mSettings->mInterval / 2.0));
106 } else if (isServerModeTime(mSettings)) {
107 sorcvtimer = static_cast<int>(round(mSettings->mAmount * 10000) / 2);
108 }
109 isburst = (isIsochronous(mSettings) || isPeriodicBurst(mSettings) || (isTripTime(mSettings) && !isUDP(mSettings)));
110 if (isburst && (mSettings->mFPS > 0.0)) {
111 sorcvtimer = static_cast<int>(round(2000000.0 / mSettings->mFPS));
112 }
113 if (sorcvtimer > 0) {
114 SetSocketOptionsReceiveTimeout(mSettings, sorcvtimer);
115 }
116 }
117
/* -------------------------------------------------------------------
 * Destructor. Closes the L2 drop socket if one is open; the traffic
 * socket is not closed here.
 * ------------------------------------------------------------------- */
~Server()121 Server::~Server () {
122 #if HAVE_THREAD_DEBUG
123 thread_debug("Server destructor sock=%d fullduplex=%s", mySocket, (isFullDuplex(mSettings) ? "true" : "false"));
124 #endif
125 #if defined(HAVE_LINUX_FILTER_H) && defined(HAVE_AF_PACKET)
126 if (myDropSocket != INVALID_SOCKET) {
127 int rc = close(myDropSocket);
128 WARN_errno(rc == SOCKET_ERROR, "server close drop");
129 myDropSocket = INVALID_SOCKET;
130 }
131 #endif
132 }
133
InProgress()134 inline bool Server::InProgress () {
135 return !(sInterupted || peerclose ||
136 ((isServerModeTime(mSettings) || (isModeTime(mSettings) && isReverse(mSettings))) && mEndTime.before(reportstruct->packetTime)));
137 }
138
/* -------------------------------------------------------------------
 * Receive TCP data from the (connected) socket until the test ends,
 * then finish the report. The socket is closed here only when
 * EndJob() indicates that it should be.
 * ------------------------------------------------------------------- */
RunTCP()144 void Server::RunTCP () {
145 long currLen;
146 intmax_t totLen = 0;
147 struct TCP_burst_payload burst_info;
148 Timestamp time1, time2;
149 double tokens=0.000004;
150
151 if (!InitTrafficLoop())
152 return;
153 myReport->info.ts.prevsendTime = myReport->info.ts.startTime;
154
155 int burst_nleft = 0;
156 burst_info.burst_id = 0;
157
158 burst_info.send_tt.write_tv_sec = 0;
159 burst_info.send_tt.write_tv_usec = 0;
160 now.setnow();
161 reportstruct->packetTime.tv_sec = now.getSecs();
162 reportstruct->packetTime.tv_usec = now.getUsecs();
163 while (InProgress()) {
164 // printf("***** bid expect = %u\n", burstid_expect);
165 reportstruct->emptyreport=1;
166 currLen = 0;
167 // perform read
168 if (isBWSet(mSettings)) {
169 time2.setnow();
170 tokens += time2.subSec(time1) * (mSettings->mAppRate / 8.0);
171 time1 = time2;
172 }
173 reportstruct->transit_ready = 0;
174 if (tokens >= 0.0) {
175 int n = 0;
176 int readLen = mSettings->mBufLen;
177 if (burst_nleft > 0)
178 readLen = (mSettings->mBufLen < burst_nleft) ? mSettings->mBufLen : burst_nleft;
179 reportstruct->emptyreport=1;
180 if (isburst && (burst_nleft == 0)) {
181 if ((n = recvn(mSettings->mSock, reinterpret_cast<char *>(&burst_info), sizeof(struct TCP_burst_payload), 0)) == sizeof(struct TCP_burst_payload)) {
182 // burst_info.typelen.type = ntohl(burst_info.typelen.type);
183 // burst_info.typelen.length = ntohl(burst_info.typelen.length);
184 burst_info.flags = ntohl(burst_info.flags);
185 burst_info.burst_size = ntohl(burst_info.burst_size);
186 assert(burst_info.burst_size > 0);
187 reportstruct->burstsize = burst_info.burst_size;
188 burst_info.burst_id = ntohl(burst_info.burst_id);
189 reportstruct->frameID = burst_info.burst_id;
190 if (isTripTime(mSettings)) {
191 reportstruct->sentTime.tv_sec = ntohl(burst_info.send_tt.write_tv_sec);
192 reportstruct->sentTime.tv_usec = ntohl(burst_info.send_tt.write_tv_usec);
193 } else {
194 now.setnow();
195 reportstruct->sentTime.tv_sec = now.getSecs();
196 reportstruct->sentTime.tv_usec = now.getUsecs();
197 }
198 // This is the first stamp of the burst
199 myReport->info.ts.prevsendTime = reportstruct->sentTime;
200 burst_nleft = burst_info.burst_size - n;
201 if (burst_nleft == 0) {
202 reportstruct->prevSentTime = myReport->info.ts.prevsendTime;
203 reportstruct->transit_ready = 1;
204 }
205 currLen += n;
206 readLen = (mSettings->mBufLen < burst_nleft) ? mSettings->mBufLen : burst_nleft;
207 WARN(burst_nleft <= 0, "invalid burst read req size");
208 // thread_debug("***read burst header size %d id=%d", burst_info.burst_size, burst_info.burst_id);
209 } else {
210 if (n > 0) {
211 WARN(1, "partial readn");
212 #ifdef HAVE_THREAD_DEBUG
213 thread_debug("TCP burst partial read of %d wanted %d", n, sizeof(struct TCP_burst_payload));
214 } else {
215 thread_debug("Detected peer close");
216 #endif
217 }
218 goto Done;
219 }
220 }
221 if (!reportstruct->transit_ready) {
222 n = recv(mSettings->mSock, mSettings->mBuf, readLen, 0);
223 if (n > 0) {
224 reportstruct->emptyreport = 0;
225 if (isburst) {
226 burst_nleft -= n;
227 if (burst_nleft == 0) {
228 reportstruct->prevSentTime = myReport->info.ts.prevsendTime;
229 reportstruct->transit_ready = 1;
230 }
231 }
232 } else if (n == 0) {
233 peerclose = true;
234 #ifdef HAVE_THREAD_DEBUG
235 thread_debug("Server thread detected EOF on socket %d", mSettings->mSock);
236 #endif
237 } else if ((n < 0) && (FATALTCPREADERR(errno))) {
238 WARN_errno(1, "recv");
239 peerclose = true;
240 n = 0;
241 }
242 currLen += n;
243 }
244 now.setnow();
245 reportstruct->packetTime.tv_sec = now.getSecs();
246 reportstruct->packetTime.tv_usec = now.getUsecs();
247 totLen += currLen;
248 if (isBWSet(mSettings))
249 tokens -= currLen;
250
251 reportstruct->packetLen = currLen;
252 #ifdef HAVE_STRUCT_TCP_INFO_TCPI_TOTAL_RETRANS
253 ReportPacket(myReport, reportstruct, NULL);
254 #else
255 ReportPacket(myReport, reportstruct);
256 #endif
257 // Check for reverse and amount where
258 // the server stops after receiving
259 // the expected byte count
260 if (isReverse(mSettings) && !isModeTime(mSettings) && (totLen >= static_cast<intmax_t>(mSettings->mAmount))) {
261 break;
262 }
263 } else {
264 // Use a 4 usec delay to fill tokens
265 delay_loop(4);
266 }
267 }
268 Done:
269 disarm_itimer();
270 // stop timing
271 now.setnow();
272 reportstruct->packetTime.tv_sec = now.getSecs();
273 reportstruct->packetTime.tv_usec = now.getUsecs();
274 reportstruct->packetLen = 0;
275 if (EndJob(myJob, reportstruct)) {
276 #if HAVE_THREAD_DEBUG
277 thread_debug("tcp close sock=%d", mySocket);
278 #endif
279 int rc = close(mySocket);
280 WARN_errno(rc == SOCKET_ERROR, "server close");
281 }
282 Iperf_remove_host(mSettings);
283 FreeReport(myJob);
284 }
285
InitKernelTimeStamping()286 void Server::InitKernelTimeStamping () {
287 #if HAVE_DECL_SO_TIMESTAMP
288 iov[0].iov_base=mSettings->mBuf;
289 iov[0].iov_len=mSettings->mBufLen;
290
291 message.msg_iov=iov;
292 message.msg_iovlen=1;
293 message.msg_name=&srcaddr;
294 message.msg_namelen=sizeof(srcaddr);
295
296 message.msg_control = (char *) ctrl;
297 message.msg_controllen = sizeof(ctrl);
298
299 int timestampOn = 1;
300 if (setsockopt(mSettings->mSock, SOL_SOCKET, SO_TIMESTAMP, ×tampOn, sizeof(timestampOn)) < 0) {
301 WARN_errno(mSettings->mSock == SO_TIMESTAMP, "socket");
302 }
303 #endif
304 }
305
306 //
307 // Set the report start times and next report times, options
308 // are now, the accept time or the first write time
309 //
SetFullDuplexReportStartTime()310 inline void Server::SetFullDuplexReportStartTime () {
311 assert(myReport->FullDuplexReport != NULL);
312 struct TransferInfo *fullduplexstats = &myReport->FullDuplexReport->info;
313 assert(fullduplexstats != NULL);
314 if (TimeZero(fullduplexstats->ts.startTime)) {
315 fullduplexstats->ts.startTime = myReport->info.ts.startTime;
316 if (isModeTime(mSettings)) {
317 fullduplexstats->ts.nextTime = myReport->info.ts.nextTime;
318 }
319 }
320 #ifdef HAVE_THREAD_DEBUG
321 thread_debug("Server fullduplex report start=%ld.%ld next=%ld.%ld", fullduplexstats->ts.startTime.tv_sec, fullduplexstats->ts.startTime.tv_usec, fullduplexstats->ts.nextTime.tv_sec, fullduplexstats->ts.nextTime.tv_usec);
322 #endif
323 }
324
SetReportStartTime()325 inline void Server::SetReportStartTime () {
326 if (TimeZero(myReport->info.ts.startTime)) {
327 if (!TimeZero(mSettings->sent_time) && !isTxStartTime(mSettings)) {
328 // Servers that aren't full duplex use the accept timestamp for start
329 myReport->info.ts.startTime.tv_sec = mSettings->sent_time.tv_sec;
330 myReport->info.ts.startTime.tv_usec = mSettings->sent_time.tv_usec;
331 } else if (!TimeZero(mSettings->accept_time) && !isTxStartTime(mSettings)) {
332 // Servers that aren't full duplex use the accept timestamp for start
333 myReport->info.ts.startTime.tv_sec = mSettings->accept_time.tv_sec;
334 myReport->info.ts.startTime.tv_usec = mSettings->accept_time.tv_usec;
335 } else {
336 now.setnow();
337 myReport->info.ts.startTime.tv_sec = now.getSecs();
338 myReport->info.ts.startTime.tv_usec = now.getUsecs();
339 }
340 }
341 myReport->info.ts.IPGstart = myReport->info.ts.startTime;
342
343 if (!TimeZero(myReport->info.ts.intervalTime)) {
344 myReport->info.ts.nextTime = myReport->info.ts.startTime;
345 TimeAdd(myReport->info.ts.nextTime, myReport->info.ts.intervalTime);
346 }
347 if (myReport->GroupSumReport) {
348 struct TransferInfo *sumstats = &myReport->GroupSumReport->info;
349 assert(sumstats != NULL);
350 Mutex_Lock(&myReport->GroupSumReport->reference.lock);
351 if (TimeZero(sumstats->ts.startTime)) {
352 sumstats->ts.startTime = myReport->info.ts.startTime;
353 if (isModeTime(mSettings)) {
354 sumstats->ts.nextTime = myReport->info.ts.nextTime;
355 }
356 }
357 Mutex_Unlock(&myReport->GroupSumReport->reference.lock);
358 }
359 #ifdef HAVE_THREAD_DEBUG
360 thread_debug("Server(%d) report start=%ld.%ld next=%ld.%ld", mSettings->mSock, myReport->info.ts.startTime.tv_sec, myReport->info.ts.startTime.tv_usec, myReport->info.ts.nextTime.tv_sec, myReport->info.ts.nextTime.tv_usec);
361 #endif
362 }
363
ClientReverseFirstRead(void)364 void Server::ClientReverseFirstRead (void) {
365 // Handle the case when the client spawns a server (no listener) and need the initial header
366 // Case of --trip-times and --reverse or --fullduplex, listener handles normal case
367 // Handle the case when the client spawns a server (no listener) and need the initial header
368 // Case of --trip-times and --reverse or --fullduplex, listener handles normal case
369 if (isReverse(mSettings) && (isTripTime(mSettings) || isPeriodicBurst(mSettings) || isIsochronous(mSettings))) {
370 int nread = 0;
371 uint32_t flags = 0;
372 int readlen = 0;
373 if (isUDP(mSettings)) {
374 nread = recvn(mSettings->mSock, mSettings->mBuf, mSettings->mBufLen, 0);
375 switch (nread) {
376 case 0:
377 //peer closed the socket, with no writes e.g. a connect-only test
378 peerclose = true;
379 break;
380 case -1 :
381 FAIL_errno(1, "recvn-reverse", mSettings);
382 break;
383 default :
384 struct client_udp_testhdr *udp_pkt = reinterpret_cast<struct client_udp_testhdr *>(mSettings->mBuf);
385 flags = ntohl(udp_pkt->base.flags);
386 mSettings->sent_time.tv_sec = ntohl(udp_pkt->start_fq.start_tv_sec);
387 mSettings->sent_time.tv_usec = ntohl(udp_pkt->start_fq.start_tv_usec);
388 reportstruct->packetLen = nread;
389 reportstruct->packetID = 1;
390 break;
391 }
392 } else {
393 nread = recvn(mSettings->mSock, mSettings->mBuf, sizeof(uint32_t), 0);
394 if (nread == 0) {
395 fprintf(stderr, "WARN: zero read on header flags\n");
396 //peer closed the socket, with no writes e.g. a connect-only test
397 peerclose = true;
398 }
399 FAIL_errno((nread < (int) sizeof(uint32_t)), "client read tcp flags", mSettings);
400 reportstruct->packetID = 1;
401 struct client_tcp_testhdr *tcp_pkt = reinterpret_cast<struct client_tcp_testhdr *>(mSettings->mBuf);
402 flags = ntohl(tcp_pkt->base.flags);
403 // figure out the length of the test header
404 if ((readlen = Settings_ClientTestHdrLen(flags, mSettings)) > 0) {
405 // read the test settings passed to the mSettings by the client
406 int adj = (readlen - sizeof(uint32_t));
407 nread = recvn(mSettings->mSock, (mSettings->mBuf + sizeof(uint32_t)), adj, 0);
408 if (nread == 0) {
409 peerclose = true;
410 }
411 FAIL_errno((nread < adj), "client read tcp test info", mSettings);
412 if (nread > 0) {
413 struct client_tcp_testhdr *tcp_pkt = reinterpret_cast<struct client_tcp_testhdr *>(mSettings->mBuf);
414 mSettings->sent_time.tv_sec = ntohl(tcp_pkt->start_fq.start_tv_sec);
415 mSettings->sent_time.tv_usec = ntohl(tcp_pkt->start_fq.start_tv_usec);
416 }
417 mSettings->firstreadbytes = readlen;
418 }
419 }
420 }
421 }
422
InitTrafficLoop(void)423 bool Server::InitTrafficLoop (void) {
424 myJob = InitIndividualReport(mSettings);
425 myReport = static_cast<struct ReporterData *>(myJob->this_report);
426 assert(myJob != NULL);
427 // copy the thread drop socket to this object such
428 // that the destructor can close it if needed
429 #if defined(HAVE_LINUX_FILTER_H) && defined(HAVE_AF_PACKET)
430 if (mSettings->mSockDrop > 0)
431 myDropSocket = mSettings->mSockDrop;
432 #endif
433 // Initialze the reportstruct scratchpad
434 reportstruct = &scratchpad;
435 reportstruct->packetID = 0;
436 reportstruct->l2len = 0;
437 reportstruct->l2errors = 0x0;
438
439 int setfullduplexflag = 0;
440 if (isFullDuplex(mSettings) && !isServerReverse(mSettings)) {
441 assert(mSettings->mFullDuplexReport != NULL);
442 if ((setfullduplexflag = fullduplex_start_barrier(&mSettings->mFullDuplexReport->fullduplex_barrier)) < 0)
443 exit(-1);
444 }
445 Timestamp now;
446 if (isReverse(mSettings)) {
447 mSettings->accept_time.tv_sec = now.getSecs();
448 mSettings->accept_time.tv_usec = now.getUsecs();
449 ClientReverseFirstRead();
450 }
451 if (isTripTime(mSettings)) {
452 if ((abs(now.getSecs() - mSettings->sent_time.tv_sec)) > MAXDIFFTIMESTAMPSECS) {
453 unsetTripTime(mSettings);
454 fprintf(stdout,"WARN: ignore --trip-times because client didn't provide valid start timestamp within %d seconds of now\n", MAXDIFFTIMESTAMPSECS);
455 mSettings->accept_time.tv_sec = now.getSecs();
456 mSettings->accept_time.tv_usec = now.getUsecs();
457 }
458 }
459 SetReportStartTime();
460 reportstruct->prevPacketTime = myReport->info.ts.startTime;
461
462 if (setfullduplexflag)
463 SetFullDuplexReportStartTime();
464
465 if (isServerModeTime(mSettings) || (isModeTime(mSettings) && (isServerReverse(mSettings) || isFullDuplex(mSettings) || isReverse(mSettings)))) {
466 if (isServerReverse(mSettings) || isFullDuplex(mSettings) || isReverse(mSettings))
467 mSettings->mAmount += (SLOPSECS * 100); // add 2 sec for slop on reverse, units are 10 ms
468 #ifdef HAVE_SETITIMER
469 int err;
470 struct itimerval it;
471 memset (&it, 0, sizeof (it));
472 it.it_value.tv_sec = static_cast<int>(mSettings->mAmount / 100.0);
473 it.it_value.tv_usec = static_cast<int>(10000 * (mSettings->mAmount -
474 it.it_value.tv_sec * 100.0));
475 err = setitimer(ITIMER_REAL, &it, NULL);
476 FAIL_errno(err != 0, "setitimer", mSettings);
477 #endif
478 mEndTime.setnow();
479 mEndTime.add(mSettings->mAmount / 100.0);
480 }
481 if (!isSingleUDP(mSettings))
482 PostReport(myJob);
483 // The first payload is different for TCP so read it and report it
484 // before entering the main loop
485
486 if (mSettings->firstreadbytes > 0) {
487 // printf("**** burst size = %d id = %d\n", burst_info.burst_size, burst_info.burst_id);
488 reportstruct->frameID = 0;
489 reportstruct->sentTime.tv_sec = myReport->info.ts.startTime.tv_sec;
490 reportstruct->sentTime.tv_usec = myReport->info.ts.startTime.tv_usec;
491 reportstruct->packetTime = reportstruct->sentTime;
492 reportstruct->packetLen = mSettings->firstreadbytes;
493 if (isUDP(mSettings)) {
494 ReadPacketID();
495 }
496 #ifdef HAVE_STRUCT_TCP_INFO_TCPI_TOTAL_RETRANS
497 ReportPacket(myReport, reportstruct, NULL);
498 #else
499 ReportPacket(myReport, reportstruct);
500 #endif
501 }
502 return true;
503 }
504
ReadWithRxTimestamp()505 inline int Server::ReadWithRxTimestamp () {
506 long currLen;
507 int tsdone = 0;
508
509 #if HAVE_DECL_SO_TIMESTAMP
510 cmsg = reinterpret_cast<struct cmsghdr *>(&ctrl);
511 currLen = recvmsg(mSettings->mSock, &message, mSettings->recvflags);
512 if (currLen > 0) {
513 if (cmsg->cmsg_level == SOL_SOCKET &&
514 cmsg->cmsg_type == SCM_TIMESTAMP &&
515 cmsg->cmsg_len == CMSG_LEN(sizeof(struct timeval))) {
516 memcpy(&(reportstruct->packetTime), CMSG_DATA(cmsg), sizeof(struct timeval));
517 tsdone = 1;
518 }
519 }
520 #else
521 currLen = recv(mSettings->mSock, mSettings->mBuf, mSettings->mBufLen, mSettings->recvflags);
522 #endif
523 if (currLen <=0) {
524 // Socket read timeout or read error
525 reportstruct->emptyreport=1;
526 if (currLen == 0) {
527 peerclose = true;
528 } else if (FATALUDPREADERR(errno)) {
529 WARN_errno(1, "recvmsg");
530 currLen = 0;
531 peerclose = true;
532 }
533 } else if (TimeZero(myReport->info.ts.prevpacketTime)) {
534 myReport->info.ts.prevpacketTime = reportstruct->packetTime;
535 }
536 if (!tsdone) {
537 now.setnow();
538 reportstruct->packetTime.tv_sec = now.getSecs();
539 reportstruct->packetTime.tv_usec = now.getUsecs();
540 }
541 return currLen;
542 }
543
544 // Returns true if the client has indicated this is the final packet
ReadPacketID()545 inline bool Server::ReadPacketID () {
546 bool terminate = false;
547 struct UDP_datagram* mBuf_UDP = reinterpret_cast<struct UDP_datagram*>(mSettings->mBuf + mSettings->l4payloadoffset);
548
549 // terminate when datagram begins with negative index
550 // the datagram ID should be correct, just negated
551
552 if (isSeqNo64b(mSettings)) {
553 // New client - Signed PacketID packed into unsigned id2,id
554 reportstruct->packetID = (static_cast<uint32_t>(ntohl(mBuf_UDP->id))) | (static_cast<uintmax_t>(ntohl(mBuf_UDP->id2)) << 32);
555
556 #ifdef HAVE_PACKET_DEBUG
557 printf("id 0x%x, 0x%x -> %" PRIdMAX " (0x%" PRIxMAX ")\n",
558 ntohl(mBuf_UDP->id), ntohl(mBuf_UDP->id2), reportstruct->packetID, reportstruct->packetID);
559 #endif
560 } else {
561 // Old client - Signed PacketID in Signed id
562 reportstruct->packetID = static_cast<int32_t>(ntohl(mBuf_UDP->id));
563 #ifdef HAVE_PACKET_DEBUG
564 printf("id 0x%x -> %" PRIdMAX " (0x%" PRIxMAX ")\n",
565 ntohl(mBuf_UDP->id), reportstruct->packetID, reportstruct->packetID);
566 #endif
567 }
568 if (reportstruct->packetID < 0) {
569 reportstruct->packetID = - reportstruct->packetID;
570 terminate = true;
571 }
572 // read the sent timestamp from the rx packet
573 reportstruct->sentTime.tv_sec = ntohl(mBuf_UDP->tv_sec);
574 reportstruct->sentTime.tv_usec = ntohl(mBuf_UDP->tv_usec);
575 return terminate;
576 }
577
L2_processing()578 void Server::L2_processing () {
579 #if (HAVE_LINUX_FILTER_H) && (HAVE_AF_PACKET)
580 eth_hdr = reinterpret_cast<struct ether_header *>(mSettings->mBuf);
581 ip_hdr = reinterpret_cast<struct iphdr *>(mSettings->mBuf + sizeof(struct ether_header));
582 // L4 offest is set by the listener and depends upon IPv4 or IPv6
583 udp_hdr = reinterpret_cast<struct udphdr *>(mSettings->mBuf + mSettings->l4offset);
584 // Read the packet to get the UDP length
585 int udplen = ntohs(udp_hdr->len);
586 //
587 // in the event of an L2 error, double check the packet before passing it to the reporter,
588 // i.e. no reason to run iperf accounting on a packet that has no reasonable L3 or L4 headers
589 //
590 reportstruct->packetLen = udplen - sizeof(struct udphdr);
591 reportstruct->expected_l2len = reportstruct->packetLen + mSettings->l4offset + sizeof(struct udphdr);
592 if (reportstruct->l2len != reportstruct->expected_l2len) {
593 reportstruct->l2errors |= L2LENERR;
594 if (L2_quintuple_filter() != 0) {
595 reportstruct->l2errors |= L2UNKNOWN;
596 reportstruct->l2errors |= L2CSUMERR;
597 reportstruct->emptyreport = 1;
598 }
599 }
600 if (!(reportstruct->l2errors & L2UNKNOWN)) {
601 // perform UDP checksum test, returns zero on success
602 int rc;
603 rc = udpchecksum((void *)ip_hdr, (void *)udp_hdr, udplen, (isIPV6(mSettings) ? 1 : 0));
604 if (rc) {
605 reportstruct->l2errors |= L2CSUMERR;
606 if ((!(reportstruct->l2errors & L2LENERR)) && (L2_quintuple_filter() != 0)) {
607 reportstruct->emptyreport = 1;
608 reportstruct->l2errors |= L2UNKNOWN;
609 }
610 }
611 }
612 #endif // HAVE_AF_PACKET
613 }
614
// Run the L2 packet through a quintuple check, i.e. proto/ip src/ip dst/src port/dst port
// and return zero if there is a match, otherwise return nonzero
L2_quintuple_filter()617 int Server::L2_quintuple_filter () {
618 #if defined(HAVE_LINUX_FILTER_H) && defined(HAVE_AF_PACKET)
619
620 #define IPV4SRCOFFSET 12 // the ipv4 source address offset from the l3 pdu
621 #define IPV6SRCOFFSET 8 // the ipv6 source address offset
622
623 // Get the expected values from the sockaddr structures
624 // Note: it's expected the initiating socket has aready "connected"
625 // and the sockaddr structs have been populated
626 // 2nd Note: sockaddr structs are in network byte order
627 struct sockaddr *p = reinterpret_cast<sockaddr *>(&mSettings->peer);
628 struct sockaddr *l = reinterpret_cast<sockaddr *>(&mSettings->local);
629 // make sure sa_family is coherent for both src and dst
630 if (!(((l->sa_family == AF_INET) && (p->sa_family == AF_INET)) || ((l->sa_family == AF_INET6) && (p->sa_family == AF_INET6)))) {
631 return -1;
632 }
633
634 // check the L2 ethertype
635 struct ether_header *l2hdr = reinterpret_cast<struct ether_header *>(mSettings->mBuf);
636
637 if (!isIPV6(mSettings)) {
638 if (ntohs(l2hdr->ether_type) != ETHERTYPE_IP)
639 return -1;
640 } else {
641 if (ntohs(l2hdr->ether_type) != ETHERTYPE_IPV6)
642 return -1;
643 }
644 // check the ip src/dst
645 const uint32_t *data;
646 udp_hdr = reinterpret_cast<struct udphdr *>(mSettings->mBuf + mSettings->l4offset);
647
648 // Check plain old v4 using v4 addr structs
649 if (l->sa_family == AF_INET) {
650 data = reinterpret_cast<const uint32_t *>(mSettings->mBuf + sizeof(struct ether_header) + IPV4SRCOFFSET);
651 if ((reinterpret_cast<struct sockaddr_in *>(p))->sin_addr.s_addr != *data++)
652 return -1;
653 if ((reinterpret_cast<struct sockaddr_in *>(l))->sin_addr.s_addr != *data)
654 return -1;
655 if (udp_hdr->source != (reinterpret_cast<struct sockaddr_in *>(p))->sin_port)
656 return -1;
657 if (udp_hdr->dest != (reinterpret_cast<struct sockaddr_in *>(l))->sin_port)
658 return -1;
659 } else {
660 // Using the v6 addr structures
661 # ifdef HAVE_IPV6
662 struct in6_addr *v6peer = SockAddr_get_in6_addr(&mSettings->peer);
663 struct in6_addr *v6local = SockAddr_get_in6_addr(&mSettings->local);
664 if (isIPV6(mSettings)) {
665 int i;
666 data = reinterpret_cast<const uint32_t *>(mSettings->mBuf + sizeof(struct ether_header) + IPV6SRCOFFSET);
667 // check for v6 src/dst address match
668 for (i = 0; i < 4; i++) {
669 if (v6peer->s6_addr32[i] != *data++)
670 return -1;
671 }
672 for (i = 0; i < 4; i++) {
673 if (v6local->s6_addr32[i] != *data++)
674 return -1;
675 }
676 } else { // v4 addr in v6 family struct
677 data = reinterpret_cast<const uint32_t *>(mSettings->mBuf + sizeof(struct ether_header) + IPV4SRCOFFSET);
678 if (v6peer->s6_addr32[3] != *data++)
679 return -1;
680 if (v6peer->s6_addr32[3] != *data)
681 return -1;
682 }
683 // check udp ports
684 if (udp_hdr->source != (reinterpret_cast<struct sockaddr_in6 *>(p))->sin6_port)
685 return -1;
686 if (udp_hdr->dest != (reinterpret_cast<struct sockaddr_in6 *>(l))->sin6_port)
687 return -1;
688 # endif // HAVE_IPV6
689 }
690 #endif // HAVE_AF_PACKET
691 // made it through all the checks
692 return 0;
693 }
694
udp_isoch_processing(int rxlen)695 inline void Server::udp_isoch_processing (int rxlen) {
696 // Ignore runt sized isoch packets
697 if (rxlen < static_cast<int>(sizeof(struct UDP_datagram) + sizeof(struct client_hdr_v1) + sizeof(struct client_hdrext) + sizeof(struct isoch_payload))) {
698 reportstruct->burstsize = 0;
699 reportstruct->remaining = 0;
700 reportstruct->frameID = 0;
701 } else {
702 struct client_udp_testhdr *udp_pkt = reinterpret_cast<struct client_udp_testhdr *>(mSettings->mBuf);
703 reportstruct->isochStartTime.tv_sec = ntohl(udp_pkt->isoch.start_tv_sec);
704 reportstruct->isochStartTime.tv_usec = ntohl(udp_pkt->isoch.start_tv_usec);
705 reportstruct->frameID = ntohl(udp_pkt->isoch.frameid);
706 reportstruct->prevframeID = ntohl(udp_pkt->isoch.prevframeid);
707 reportstruct->burstsize = ntohl(udp_pkt->isoch.burstsize);
708 reportstruct->burstperiod = ntohl(udp_pkt->isoch.burstperiod);
709 reportstruct->remaining = ntohl(udp_pkt->isoch.remaining);
710 if ((reportstruct->remaining == rxlen) && ((reportstruct->frameID - reportstruct->prevframeID) == 1)) {
711 reportstruct->transit_ready = 1;
712 }
713 }
714 }
715
/* -------------------------------------------------------------------
 * Receive UDP data from the (connected) socket until the test ends,
 * then finish the report and, unless disabled, send the final
 * acknowledgement. The socket is closed here only when EndJob()
 * indicates that it should be.
 * ------------------------------------------------------------------- */
RunUDP()721 void Server::RunUDP () {
722 int rxlen;
723 bool lastpacket = false;
724
725 if (!InitTrafficLoop())
726 return;
727
728 // Exit loop on three conditions
729 // 1) Fatal read error
730 // 2) Last packet of traffic flow sent by client
731 // 3) -t timer expires
732 while (InProgress() && !lastpacket) {
733 // The emptyreport flag can be set
734 // by any of the packet processing routines
735 // If it's set the iperf reporter won't do
736 // bandwidth accounting, basically it's indicating
737 // that the reportstruct itself couldn't be
738 // completely filled out.
739 reportstruct->emptyreport=1;
740 reportstruct->packetLen=0;
741 // read the next packet with timestamp
742 // will also set empty report or not
743 rxlen=ReadWithRxTimestamp();
744 if (!peerclose && (rxlen > 0)) {
745 reportstruct->emptyreport = 0;
746 reportstruct->packetLen = rxlen;
747 if (isL2LengthCheck(mSettings)) {
748 reportstruct->l2len = rxlen;
749 // L2 processing will set the reportstruct packet length with the length found in the udp header
750 // and also set the expected length in the report struct. The reporter thread
751 // will do the compare and account and print l2 errors
752 reportstruct->l2errors = 0x0;
753 L2_processing();
754 }
755 if (!(reportstruct->l2errors & L2UNKNOWN)) {
756 // ReadPacketID returns true if this is the last UDP packet sent by the client
757 // also sets the packet rx time in the reportstruct
758 reportstruct->prevSentTime = myReport->info.ts.prevsendTime;
759 reportstruct->prevPacketTime = myReport->info.ts.prevpacketTime;
760 lastpacket = ReadPacketID();
761 myReport->info.ts.prevsendTime = reportstruct->sentTime;
762 myReport->info.ts.prevpacketTime = reportstruct->packetTime;
763 if (isIsochronous(mSettings)) {
764 udp_isoch_processing(rxlen);
765 }
766 }
767 }
768 #ifdef HAVE_STRUCT_TCP_INFO_TCPI_TOTAL_RETRANS
769 ReportPacket(myReport, reportstruct, NULL);
770 #else
771 ReportPacket(myReport, reportstruct);
772 #endif
773 }
774 disarm_itimer();
775 int do_close = EndJob(myJob, reportstruct);
776 if (!isMulticast(mSettings) && !isNoUDPfin(mSettings)) {
777 // send a UDP acknowledgement back except when:
778 // 1) we're NOT receiving multicast
779 // 2) the user requested no final exchange
780 // 3) this is a full duplex test
781 write_UDP_AckFIN(&myReport->info, mSettings->mBufLen);
782 }
783 if (do_close) {
784 #if HAVE_THREAD_DEBUG
785 thread_debug("udp close sock=%d", mySocket);
786 #endif
787 int rc = close(mySocket);
788 WARN_errno(rc == SOCKET_ERROR, "server close");
789 }
790 Iperf_remove_host(mSettings);
791 FreeReport(myJob);
792 }
793 // end Recv
794