1 #include <sys/types.h>
2 #include <sys/socket.h>
3 #include <netinet/in.h>
4 #include <arpa/inet.h>
5 #include <assert.h>
6 #include <fcntl.h>
7 #include <poll.h>
8 #include <pthread.h>
9 #include <signal.h>
10 #include <stdio.h>
11 #include <stdint.h>
12 #include <stdlib.h>
13 #include <string.h>
14 #include <time.h>
15 #include <unistd.h>
16 
17 #include <machine/cpufunc.h>
18 
/*
 * Test kinds accepted by the -k option and passed to run_test().
 * TEST_KIND_ALL makes main() sweep kinds 1..TEST_KIND_MAX for each thread
 * count.
 */
#define TEST_KIND_ALL           0   /* run every kind up to TEST_KIND_MAX */
#define TEST_KIND_UNCONNECTED   1   /* sendto() on unconnected sockets */
#define TEST_KIND_CONNECTED     2   /* connect() first, then send() */
#define TEST_KIND_HALFCONN      3   /* bound receive socket + connected send socket */
/*
 * NOTE(review): the sweep stops at TEST_KIND_CONNECTED, so TEST_KIND_HALFCONN
 * only runs when it is requested explicitly -- confirm this is intended.
 */
#define TEST_KIND_MAX           TEST_KIND_CONNECTED
24 
/* Run-time configuration; main() fills it in from the command line. */
struct tconf {
    int nthreads_max;       /* last thread count to test (-M) */
    int nthreads_min;       /* first thread count to test (-m) */
    int paylen_min;         /* smallest datagram payload, in bytes (-p) */
    int paylen_max;         /* largest datagram payload, in bytes (-P) */
    const char *dstaddr;    /* base destination address as dotted-quad text */
    int dstnetpref;         /* prefix length taken from "addr/len" */
    int test_kind;          /* one of TEST_KIND_* (-k) */
    uint64_t magic;         /* tag copied into every packet and checked on receive */
};
35 
36 static int
genrandomdest(struct tconf * cfp,struct sockaddr_in * s_in)37 genrandomdest(struct tconf *cfp, struct sockaddr_in *s_in)
38 {
39     struct in_addr raddr;
40     long rnum;
41     uint16_t rport;
42 
43     assert(s_in->sin_family == AF_INET);
44 
45     if (inet_aton(cfp->dstaddr, &raddr) == 0) {
46         return (-1);
47     }
48     rnum = random() >> cfp->dstnetpref;
49     raddr.s_addr |= htonl(rnum);
50     do {
51         rport = (uint16_t)(random());
52     } while (rport < 1000);
53     s_in->sin_addr = raddr;
54     s_in->sin_port = htons(rport);
55     return (0);
56 }
57 
/* Header stored at the start of every test datagram. */
struct pktdata {
    uint64_t magic;     /* sender's tconf.magic; receiver drops mismatches */
    uint64_t send_ts;   /* rdtsc() value taken right before the send call */
    int idx;            /* index of the destination inside its workset */
};
63 
64 union pkt {
65     unsigned char d[256];
66     struct pktdata pd;
67 };
68 
69 struct destination
70 {
71     int sin;
72     int sout;
73     int sconnected;
74     struct sockaddr_in daddr;
75     int buflen;
76     union pkt buf;
77 };
78 
79 struct workset
80 {
81     pthread_t tid;
82     int nreps;
83     int ndest;
84     double stime;
85     double etime;
86     struct destination dests[0];
87 };
88 
/*
 * State of one receiver thread.  Allocated as a single chunk with ndest
 * pollfd records following the fixed part.
 */
struct recvset
{
    pthread_t tid;              /* receiver thread, filled in by pthread_create() */
    int ndest;                  /* number of entries in pollset[] and nrecvd[] */
    /*
     * Per-socket packet counters, ndest of them.  This must be a pointer to
     * uint64_t: with the former "uint64_t **" type, nrecvd[i]++ performed
     * pointer arithmetic on the element instead of counting by one.
     */
    uint64_t *nrecvd;
    uint64_t nrecvd_total;      /* packets accepted across all sockets */
    uint64_t npolls;            /* number of poll() calls made */
    uint64_t rtt_total;         /* sum of (receive rdtsc - send_ts) over accepted packets */
    /*
     * Written by run_test() and read by the receiver thread without any
     * synchronisation.  NOTE(review): formally a data race.
     */
    int done;
    uint64_t magic;             /* expected pktdata.magic */
    /* C99 flexible array member; sizeof(struct recvset) is unchanged. */
    struct pollfd pollset[];
};
101 
102 static void
genrandombuf(struct destination * dp,int minlen,int maxlen)103 genrandombuf(struct destination *dp, int minlen, int maxlen)
104 {
105     unsigned int difflen;
106     int i;
107 
108     assert(minlen <= maxlen && maxlen <= sizeof(dp->buf));
109     difflen = maxlen - minlen;
110     if (difflen > 0) {
111         dp->buflen = minlen + (random() % (difflen + 1));
112     } else {
113         dp->buflen = minlen;
114     }
115     for (i = 0; i < dp->buflen; i++) {
116         dp->buf.d[i] = (unsigned char)random();
117     }
118 }
119 
/*
 * Create a non-blocking datagram socket in the given address family.
 *
 * Returns the descriptor, or -1 if the socket cannot be created or cannot be
 * switched to non-blocking mode (the descriptor is closed in that case).
 */
static int
socket_ctor(int domain)
{
    int s, flags;

    s = socket(domain, SOCK_DGRAM, 0);
    if (s == -1) {
        return (-1);
    }
    /*
     * Check both fcntl() calls: a failed F_GETFL returns -1, which must not
     * be fed back as a flag set, and a failed F_SETFL would hand the caller
     * a blocking socket.
     */
    flags = fcntl(s, F_GETFL);
    if (flags == -1 || fcntl(s, F_SETFL, flags | O_NONBLOCK) == -1) {
        close(s);
        return (-1);
    }
    return (s);
}
133 
134 static struct workset *
generate_workset(int setsize,struct tconf * cfp)135 generate_workset(int setsize, struct tconf *cfp)
136 {
137     struct workset *wp;
138     struct destination *dp;
139     size_t msize;
140     int i;
141 
142     msize = sizeof(struct workset) + (setsize * sizeof(struct destination));
143     wp = malloc(msize);
144     if (wp == NULL) {
145         return (NULL);
146     }
147     memset(wp, '\0', msize);
148     wp->ndest = setsize;
149     for (i = 0; i < setsize; i++) {
150         dp = &(wp->dests[i]);
151         dp->daddr.sin_family = AF_INET;
152         dp->sout = dp->sin = socket_ctor(dp->daddr.sin_family);
153         if (dp->sin == -1) {
154             goto e1;
155         }
156         genrandomdest(cfp, &dp->daddr);
157         genrandombuf(dp, cfp->paylen_min, cfp->paylen_max);
158         dp->buf.pd.magic = cfp->magic;
159         dp->buf.pd.idx = i;
160     }
161     return (wp);
162 e1:
163     for (i = i - 1; i >= 0; i--) {
164         close(wp->dests[i].sin);
165     }
166     free(wp);
167     return (NULL);
168 }
169 
/* Pointer casts between the sockaddr flavours used in this file. */
#ifndef sstosa
#define sstosa(ss)      ((struct sockaddr *)(ss))
#endif
#ifndef sstosin
#define sstosin(ss)      ((struct sockaddr_in *)(ss))
#endif

/*
 * Size of the concrete address structure behind a generic sockaddr
 * (SA_LEN) or a sockaddr_storage (SS_LEN).  Any family other than AF_INET
 * is sized as sockaddr_in6.  The guards keep an existing definition if one
 * is already in scope.
 */
#ifndef SA_LEN
#define SA_LEN(sa) \
  (((sa)->sa_family == AF_INET) ? \
  sizeof(struct sockaddr_in) : sizeof(struct sockaddr_in6))
#endif
#ifndef SS_LEN
#define SS_LEN(ss) \
  (((ss)->ss_family == AF_INET) ? \
  sizeof(struct sockaddr_in) : sizeof(struct sockaddr_in6))
#endif
187 
/*
 * Find the local address the system would use to reach remote address ra:
 * connect a throw-away datagram socket to ra and read its local name back
 * into *la.
 *
 * Returns the getsockname() result (0 on success), or -1 if the socket
 * cannot be created or connected.
 */
int
local4remote(struct sockaddr *ra, struct sockaddr_storage *la)
{
    socklen_t lalen;
    int fd, rc;

    fd = socket(ra->sa_family, SOCK_DGRAM, 0);
    if (fd == -1) {
        return (-1);
    }
    rc = connect(fd, ra, SA_LEN(ra));
    if (rc != -1) {
        lalen = sizeof(*la);
        rc = getsockname(fd, sstosa(la), &lalen);
    }
    close(fd);
    return (rc);
}
207 
208 static int
connect_workset(struct workset * wp,int test_type)209 connect_workset(struct workset *wp, int test_type)
210 {
211     int i, r, reuse;
212     int rval;
213     socklen_t llen;
214     struct destination *dp;
215     struct sockaddr_storage la;
216     struct sockaddr_in *lip;
217 
218     rval = 0;
219     for (i = 0; i < wp->ndest; i++) {
220         dp = &(wp->dests[i]);
221         if (dp->sconnected == 0) {
222             if (test_type == TEST_KIND_HALFCONN) {
223                 dp->sout = socket_ctor(dp->daddr.sin_family);
224                 if (dp->sout == -1) {
225                     rval -= 1;
226                     dp->sout = dp->sin;
227                     continue;
228                 }
229 
230                 reuse = 1;
231                 r = setsockopt(dp->sin, SOL_SOCKET, SO_REUSEPORT, &reuse, sizeof(reuse));
232                 if (r == -1) {
233                     rval -= 1;
234                     continue;
235                 }
236 
237                 lip = sstosin(&la);
238                 lip->sin_addr.s_addr = INADDR_ANY;
239                 lip->sin_port = htons(0);
240                 llen = sizeof(struct sockaddr_in);
241                 r = bind(dp->sin, sstosa(&la), llen);
242                 if (r == -1) {
243                     rval -= 1;
244                     continue;
245                 }
246                 llen = sizeof(la);
247                 r = getsockname(dp->sin, sstosa(&la), &llen);
248                 if (r == -1) {
249                     rval -= 1;
250                     continue;
251                 }
252 
253                 reuse = 1;
254                 r = setsockopt(dp->sout, SOL_SOCKET, SO_REUSEPORT, &reuse, sizeof(reuse));
255                 if (r == -1) {
256                     rval -= 1;
257                     continue;
258                 }
259                 lip = sstosin(&la);
260 #if 0
261                 r = local4remote(sstosa(&dp->daddr), &lat);
262                 if (r == -1) {
263                     rval -= 1;
264                     continue;
265                 }
266                 lip->sin_addr.s_addr = sstosin(&lat)->sin_addr.s_addr;
267 #endif
268                 llen = sizeof(struct sockaddr_in);
269                 r = bind(dp->sout, sstosa(&la), llen);
270                 if (r == -1) {
271                     rval -= 1;
272                     continue;
273                 }
274             }
275             if (connect(dp->sout, sstosa(&dp->daddr), sizeof(dp->daddr)) != 0) {
276                 rval -= 1;
277                 continue;
278             }
279             if (test_type == TEST_KIND_HALFCONN) {
280 #if 0
281                 llen = sizeof(la);
282                 r = getsockname(dp->sout, sstosa(&la), &llen);
283                 if (r == -1) {
284                     rval -= 1;
285                     continue;
286                 }
287                 reuse = 1;
288                 r = setsockopt(dp->sin, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse));
289                 if (r == -1) {
290                     rval -= 1;
291                     continue;
292                 }
293                 lip = sstosin(&la);
294                 lip->sin_addr.s_addr = INADDR_ANY;
295                 llen = sizeof(struct sockaddr_in);
296                 r = bind(dp->sin, sstosa(&la), llen);
297                 if (r == -1) {
298                     rval -= 1;
299                     continue;
300                 }
301 #endif
302                 r = shutdown(dp->sout, SHUT_RD);
303                 if (r == -1) {
304                     rval -= 1;
305                     continue;
306                 }
307             }
308             dp->sconnected = 1;
309         }
310     }
311     return (rval);
312 }
313 
/*
 * Clock id used by getdtime(): CLOCK_UPTIME_PRECISE when the platform
 * defines it, else CLOCK_MONOTONIC_RAW, else CLOCK_MONOTONIC.
 */
#if defined(CLOCK_UPTIME_PRECISE)
# define RTPP_CLOCK CLOCK_UPTIME_PRECISE
#elif defined(CLOCK_MONOTONIC_RAW)
# define RTPP_CLOCK CLOCK_MONOTONIC_RAW
#else
# define RTPP_CLOCK CLOCK_MONOTONIC
#endif
323 
324 static double timespec2dtime(time_t, long);
325 
326 double
getdtime(void)327 getdtime(void)
328 {
329     struct timespec tp;
330 
331     if (clock_gettime(RTPP_CLOCK, &tp) == -1)
332         return (-1);
333 
334     return timespec2dtime(tp.tv_sec, tp.tv_nsec);
335 }
336 
/*
 * Combine the two halves of a timespec (whole seconds and nanoseconds) into
 * a single value in seconds.
 */
static double
timespec2dtime(time_t tv_sec, long tv_nsec)
{
    double whole, frac;

    whole = (double)tv_sec;
    frac = (double)tv_nsec / 1000000000.0;
    return (whole + frac);
}
343 
344 static void
process_workset(struct workset * wp)345 process_workset(struct workset *wp)
346 {
347     int i, j;
348     struct destination *dp;
349 
350     wp->stime = getdtime();
351     for (i = 0; i < wp->nreps; i++) {
352         for (j = 0; j < wp->ndest; j++) {
353             dp = &(wp->dests[j]);
354             dp->buf.pd.send_ts = rdtsc();
355             if (dp->sconnected == 0) {
356                 sendto(dp->sout, dp->buf.d, dp->buflen, 0, sstosa(&dp->daddr),
357                   sizeof(dp->daddr));
358             } else {
359                 send(dp->sout, dp->buf.d, dp->buflen, 0);
360             }
361         }
362     }
363     wp->etime = getdtime();
364 }
365 
366 static void
process_recvset(struct recvset * rp)367 process_recvset(struct recvset  *rp)
368 {
369     int nready, i, rval;
370     struct pollfd *pdp;
371     union pkt buf;
372     struct sockaddr_storage raddr;
373     socklen_t fromlen;
374     uint64_t rtime, rtt;
375 
376     for (;;) {
377         nready = poll(rp->pollset, rp->ndest, 100);
378         rp->npolls++;
379         if (rp->done != 0 && nready == 0) {
380             break;
381         }
382         if (nready <= 0) {
383             continue;
384         }
385         for (i = 0; i < rp->ndest && nready > 0; i++) {
386             pdp = &rp->pollset[i];
387             if ((pdp->revents & POLLIN) == 0) {
388                 continue;
389             }
390             fromlen = sizeof(raddr);
391             rval = recvfrom(pdp->fd, buf.d, sizeof(buf.d), 0, sstosa(&raddr),
392               &fromlen);
393             rtime = rdtsc();
394             if (rval > 0) {
395                 if (buf.pd.magic != rp->magic) {
396                     continue;
397                 }
398                 rtt = rtime - buf.pd.send_ts;
399                 rp->nrecvd[i]++;
400                 rp->nrecvd_total++;
401                 rp->rtt_total += rtt;
402             }
403             nready -= 1;
404         }
405     }
406 }
407 
408 
409 static void
release_workset(struct workset * wp)410 release_workset(struct workset *wp)
411 {
412     int i;
413     struct destination *dp;
414 
415     for (i = 0; i < wp->ndest; i++) {
416         dp = &(wp->dests[i]);
417         close(dp->sin);
418         if (dp->sout != dp->sin) {
419             close(dp->sout);
420         }
421     }
422     free(wp);
423 }
424 
425 static void
release_recvset(struct recvset * rp)426 release_recvset(struct recvset  *rp)
427 {
428 
429     free(rp->nrecvd);
430     free(rp);
431 }
432 
433 struct recvset *
generate_recvset(struct workset * wp,struct tconf * cfp)434 generate_recvset(struct workset *wp, struct tconf *cfp)
435 {
436     struct recvset *rp;
437     int msize, i;
438     struct pollfd *pdp;
439 
440     msize = sizeof(struct recvset) + (sizeof(struct pollfd) * wp->ndest);
441     rp = malloc(msize);
442     if (rp == NULL) {
443         return (NULL);
444     }
445     memset(rp, '\0', msize);
446     msize = sizeof(uint64_t) * wp->ndest;
447     rp->nrecvd = malloc(msize);
448     if (rp->nrecvd == NULL) {
449         free(rp);
450         return (NULL);
451     }
452     memset(rp->nrecvd, '\0', msize);
453     for (i = 0; i < wp->ndest; i++) {
454         pdp = &rp->pollset[i];
455         pdp->fd = wp->dests[i].sin;
456         pdp->events = POLLIN;
457     }
458     rp->ndest = wp->ndest;
459     rp->magic = cfp->magic;
460     return (rp);
461 }
462 
/* Results of one run_test() call; run_test() adds into the first two fields. */
struct tstats {
    double total_pps;           /* sum over threads of packets sent per second */
    double total_poll_rate;     /* per-thread poll() calls per second, averaged over threads */
    double ploss_rate;          /* (sent - received) / sent, as a fraction */
};
468 
469 static void
run_test(int nthreads,int test_type,struct tconf * cfp,struct tstats * tsp)470 run_test(int nthreads, int test_type, struct tconf *cfp, struct tstats *tsp)
471 {
472     int nreps = 10 * 100;
473     int npkts = 4000;
474     struct workset *wsp[32];
475     struct recvset *rsp[32];
476     int i;
477     double pps, tduration, poll_rate;
478     uint64_t nrecvd_total, nsent_total, rtt_total;
479 
480     for (i = 0; i < nthreads; i++) {
481         wsp[i] = generate_workset(npkts, cfp);
482         assert(wsp[i] != NULL);
483         wsp[i]->nreps = nreps;
484         if (test_type == TEST_KIND_CONNECTED || test_type == TEST_KIND_HALFCONN) {
485             if (connect_workset(wsp[i], test_type) != 0) {
486                 fprintf(stderr, "connect_workset() failed\n");
487                 abort();
488             }
489         }
490         rsp[i] = generate_recvset(wsp[i], cfp);
491     }
492     for (i = 0; i < nthreads; i++) {
493         pthread_create(&wsp[i]->tid, NULL, (void *(*)(void *))process_workset, wsp[i]);
494         pthread_create(&rsp[i]->tid, NULL, (void *(*)(void *))process_recvset, rsp[i]);
495     }
496     nrecvd_total = 0;
497     nsent_total = 0;
498     for (i = 0; i < nthreads; i++) {
499         pthread_join(wsp[i]->tid, NULL);
500         rsp[i]->done = 1;
501         pthread_join(rsp[i]->tid, NULL);
502         nsent_total += wsp[i]->nreps * wsp[i]->ndest;
503         pps = wsp[i]->nreps * wsp[i]->ndest;
504         tduration = wsp[i]->etime - wsp[i]->stime;
505         pps /= tduration;
506         tsp->total_pps += pps;
507         nrecvd_total += rsp[i]->nrecvd_total;
508         rtt_total += rsp[i]->rtt_total;
509         poll_rate = ((double)rsp[i]->npolls) / tduration;
510         tsp->total_poll_rate += poll_rate / (double)nthreads;
511         release_workset(wsp[i]);
512         release_recvset(rsp[i]);
513     }
514     fprintf(stderr, "nsent_total=%ju, nrecvd_total=%ju\n", (uintmax_t)nsent_total,
515       (uintmax_t)nrecvd_total);
516     tsp->ploss_rate = (double)(nsent_total - nrecvd_total) /
517       (double)(nsent_total);
518     return;
519 }
520 
/*
 * Print the accepted command-line syntax to stderr and terminate the
 * process with exit status 1.  Never returns.
 */
static void
usage(void)
{

    fprintf(stderr, "usage: [-m nthreads_min] [-M nthreads_max] "
      "[-k test_kind] [-p paylen_min] [-P paylen_max] dstaddr[/netpref]\n");
    exit(1);
}
527 
528 int
main(int argc,char ** argv)529 main(int argc, char **argv)
530 {
531     struct tconf cfg;
532     int i, j, ch;
533     struct tstats tstats;
534     char *cp;
535 
536     memset(&cfg, '\0', sizeof(struct tconf));
537     cfg.nthreads_max = 10;
538     cfg.nthreads_min = 1;
539     cfg.dstaddr = "170.178.193.146";
540     cfg.dstnetpref = 32;
541     cfg.magic = ((uint64_t)random() << 32) | (uint64_t)random();
542     cfg.paylen_min = 30;
543     cfg.paylen_max = 170;
544 
545     while ((ch = getopt(argc, argv, "m:M:k:p:P:")) != -1) {
546         switch (ch) {
547         case 'm':
548             cfg.nthreads_min = atoi(optarg);
549             break;
550 
551         case 'M':
552             cfg.nthreads_max = atoi(optarg);
553             break;
554 
555         case 'k':
556             cfg.test_kind = atoi(optarg);
557             break;
558 
559         case 'p':
560             cfg.paylen_min = atoi(optarg);
561             if (cfg.paylen_min < sizeof(struct pktdata)) {
562                 usage();
563             }
564             break;
565 
566         case 'P':
567             cfg.paylen_max = atoi(optarg);
568             break;
569 
570         case '?':
571         default:
572             usage();
573         }
574     }
575     if (cfg.paylen_max < cfg.paylen_min) {
576         usage();
577     }
578     argc -= optind;
579     argv += optind;
580 
581     if (argc != 1) {
582         usage();
583     }
584     cfg.dstaddr = argv[0];
585     cp = strrchr(cfg.dstaddr, '/');
586     if (cp != NULL) {
587         cp[0] = '\0';
588         cfg.dstnetpref = atoi(cp + 1);
589         if (cfg.dstnetpref < 1 || cfg.dstnetpref > 32) {
590             usage();
591         }
592     }
593 
594     srandomdev();
595     for (i = cfg.nthreads_min; i <= cfg.nthreads_max; i++) {
596         if (cfg.test_kind != TEST_KIND_ALL) {
597             memset(&tstats, '\0', sizeof(struct tstats));
598             run_test(i, cfg.test_kind, &cfg, &tstats);
599             printf("nthreads = %d, connected = %d: total PPS = %f, "
600               "loss %f%%, poll rate %f\n", i, cfg.test_kind, tstats.total_pps,
601               tstats.ploss_rate * 100, tstats.total_poll_rate);
602             continue;
603         }
604         for (j = TEST_KIND_ALL + 1; j <= TEST_KIND_MAX; j++) {
605             memset(&tstats, '\0', sizeof(struct tstats));
606             run_test(i, j, &cfg, &tstats);
607             printf("nthreads = %d, connected = %d: total PPS = %f, "
608               "loss %f%%, poll rate %f\n", i, j, tstats.total_pps,
609               tstats.ploss_rate * 100, tstats.total_poll_rate);
610         }
611     }
612     exit(0);
613 }
614