1 #include <sys/param.h>
2 #include <sys/event.h>
3 #include <sys/ioctl.h>
4 #include <sys/queue.h>
5 #include <sys/socket.h>
6 #include <sys/sysctl.h>
7 #include <sys/time.h>
8
9 #include <machine/atomic.h>
10 #ifdef __FreeBSD__
11 #include <machine/cpu.h>
12 #endif
13 #include <machine/cpufunc.h>
14
15 #include <arpa/inet.h>
16 #include <netinet/in.h>
17
18 #include <err.h>
19 #include <errno.h>
20 #include <pthread.h>
21 #include <pthread_np.h>
22 #include <signal.h>
23 #include <stdbool.h>
24 #include <stdio.h>
25 #include <stdint.h>
26 #include <stdlib.h>
27 #include <string.h>
28 #include <unistd.h>
29
30 #include "kq_sendrecv_proto.h"
31
/*
 * Note about the sender start synchronization.
 *
 * We apply a two-stage synchronization.  The first stage uses a pthread
 * condition variable (it sleeps) to wait for the establishment of all
 * connections, which could be slow.  The second stage uses g_nwait
 * of send_globctx; all relevant threads spin on g_nwait.  The main
 * thread spin-waits for all senders to increase g_nwait.  Each sender
 * thread increases g_nwait, then spin-waits for the main thread
 * to reset g_nwait.  In this way, we can make sure that all senders
 * start at roughly the same time.
 */
44
45 #if 0
46 #define SEND_DEBUG
47 #endif
48 #if 0
49 #define SEND_TIME_DEBUG
50 #endif
51
52 #define SEND_DUR 10
53 #define SEND_EVENT_MAX 64
54 #define SEND_BUFLEN (128 * 1024)
55
56 /*
57 * The successful 3-way handshake on the connection does not mean the
58 * remote application can accept(2) this connection. Even worse, the
59 * remote side's network stack may drop the connection silently, i.e.
60 * w/o RST. If this happened, the blocking read(2) would not return,
61 * until the keepalive kicked in, which would take quite some time.
62 * This is obviously not what we want here, so use synthetic timeout
63 * for blocking read(2). Here, we will retry if a blocking read(2)
64 * times out.
65 */
66 #define SEND_READTO_MS 1000 /* unit: ms */
67
68 #if defined(__DragonFly__)
69 #define SEND_CONN_CTX_ALIGN __VM_CACHELINE_SIZE
70 #elif defined(__FreeBSD__)
71 #define SEND_CONN_CTX_ALIGN CACHE_LINE_SIZE
72 #else
73 #define SEND_CONN_CTX_ALIGN 64 /* XXX */
74 #endif
75
/*
 * Per-connection state.
 *
 * Allocated by the main thread and handed to exactly one sender thread.
 * A context whose c_in.sin_port is 0 is not a real connection: it is the
 * 'end of connection list' marker, which the sender later reuses as the
 * udata of its kqueue timer.
 */
struct conn_ctx {
	/* Socket; -1 when not (or no longer) open, -2 on the timer marker. */
	int			c_s;
	/* errno of the failed send; 0 if the connection had no error. */
	int			c_err;
	/* Number of bytes sent on this connection. */
	uint64_t		c_stat;
	/* Time at which the error recorded in c_err happened. */
	struct timespec		c_terr;

	/* Linkage on send_globctx.g_conn (results gathering). */
	STAILQ_ENTRY(conn_ctx)	c_glob_link;
	/* Linkage on send_thrctx.t_conn (hand-off to the sender). */
	STAILQ_ENTRY(conn_ctx)	c_link;
	/* Receiver address this connection connects to. */
	struct sockaddr_in	c_in;
	/* t_id of the sender thread owning this connection. */
	int			c_thr_id;
} __aligned(SEND_CONN_CTX_ALIGN);
87
88 STAILQ_HEAD(conn_ctx_list, conn_ctx);
89
90 struct send_globctx {
91 struct conn_ctx_list g_conn;
92
93 int g_dur;
94 int g_nconn;
95 pthread_mutex_t g_lock;
96 pthread_cond_t g_cond;
97
98 volatile u_int g_nwait;
99 int g_readto_ms; /* unit: ms */
100 int g_buflen;
101 bool g_sendfile;
102 };
103
104 struct send_thrctx {
105 struct conn_ctx_list t_conn;
106 pthread_mutex_t t_lock;
107 pthread_cond_t t_cond;
108
109 struct send_globctx *t_glob;
110 struct timespec t_start;
111 struct timespec t_end;
112 double t_run_us; /* unit: us */
113
114 pthread_t t_tid;
115 int t_id;
116 };
117
118 static void send_build_addrlist(const struct sockaddr_in *, int,
119 const struct sockaddr_in **, int *, int);
120 static void *send_thread(void *);
121
/*
 * Executed once per iteration of a busy-wait loop.
 *
 * Issues the platform's spin-wait hint where one is known
 * (cpu_pause() on DragonFly, cpu_spinwait() on FreeBSD); does
 * nothing on any other platform.
 */
static __inline void
send_spinwait(void)
{
#if !defined(__DragonFly__) && !defined(__FreeBSD__)
	/* XXX no spin-wait hint known for this platform */
#elif defined(__DragonFly__)
	cpu_pause();
#else
	cpu_spinwait();
#endif
}
133
/*
 * Print the command line synopsis to stderr and terminate the
 * process with exit status 2.
 *
 * cmd: program name to show in front of the synopsis.
 */
static void
usage(const char *cmd)
{
	(void)fprintf(stderr,
	    "%s -4 addr4 [-4 addr4 ...] [-p port] "
	    "-c conns [-t nthreads] [-l sec] [-r readto_ms] [-S] [-E] "
	    "[-b buflen] [-B]\n",
	    cmd);

	exit(2);
}
142
143 int
main(int argc,char * argv[])144 main(int argc, char *argv[])
145 {
146 struct send_globctx glob;
147 struct send_thrctx *ctx_arr, *ctx;
148 struct sockaddr_in *in_arr, *in;
149 const struct sockaddr_in *daddr;
150 struct timespec run, end, start;
151 double total_run_us, total, conn_min, conn_max;
152 double jain, jain_res;
153 int jain_cnt;
154 struct conn_ctx *conn;
155 sigset_t sigset;
156 int opt, i, ncpus;
157 int in_arr_cnt, in_arr_sz, ndaddr;
158 int nthr, nconn, dur, readto_ms, buflen;
159 int log_err, err_cnt, has_minmax;
160 u_short port = RECV_PORT;
161 uint32_t idx;
162 size_t sz;
163 bool do_sendfile = false, bindcpu = false;
164
165 sigemptyset(&sigset);
166 sigaddset(&sigset, SIGPIPE);
167 if (sigprocmask(SIG_BLOCK, &sigset, NULL) < 0)
168 err(1, "sigprocmask failed");
169
170 sz = sizeof(ncpus);
171 if (sysctlbyname("hw.ncpu", &ncpus, &sz, NULL, 0) < 0)
172 err(1, "sysctl hw.ncpu failed");
173 nthr = ncpus;
174
175 in_arr_sz = 4;
176 in_arr_cnt = 0;
177 in_arr = malloc(in_arr_sz * sizeof(struct sockaddr_in));
178 if (in_arr == NULL)
179 err(1, "malloc failed");
180
181 log_err = 0;
182 nconn = 0;
183 dur = SEND_DUR;
184 readto_ms = SEND_READTO_MS;
185 buflen = SEND_BUFLEN;
186
187 while ((opt = getopt(argc, argv, "4:BESb:c:l:p:r:t:")) != -1) {
188 switch (opt) {
189 case '4':
190 if (in_arr_cnt == in_arr_sz) {
191 in_arr_sz *= 2;
192 in_arr = reallocf(in_arr,
193 in_arr_sz * sizeof(struct sockaddr_in));
194 if (in_arr == NULL)
195 err(1, "reallocf failed");
196 }
197 in = &in_arr[in_arr_cnt];
198 ++in_arr_cnt;
199
200 memset(in, 0, sizeof(*in));
201 in->sin_family = AF_INET;
202 if (inet_pton(AF_INET, optarg, &in->sin_addr) <= 0)
203 errx(1, "inet_pton failed %s", optarg);
204 break;
205
206 case 'B':
207 bindcpu = true;
208 break;
209
210 case 'E':
211 log_err = 1;
212 break;
213
214 case 'S':
215 do_sendfile = true;
216 break;
217
218 case 'b':
219 buflen = strtol(optarg, NULL, 10);
220 if (buflen <= 0)
221 errx(1, "invalid -b");
222 break;
223
224 case 'c':
225 nconn = strtol(optarg, NULL, 10);
226 if (nconn <= 0)
227 errx(1, "invalid -c");
228 break;
229
230 case 'l':
231 dur = strtoul(optarg, NULL, 10);
232 if (dur == 0)
233 errx(1, "invalid -l");
234 break;
235
236 case 'p':
237 port = strtoul(optarg, NULL, 10);
238 break;
239
240 case 'r':
241 readto_ms = strtol(optarg, NULL, 10);
242 if (readto_ms <= 0)
243 errx(1, "invalid -r");
244 break;
245
246 case 't':
247 nthr = strtol(optarg, NULL, 10);
248 if (nthr <= 0)
249 errx(1, "invalid -t");
250 break;
251
252 default:
253 usage(argv[0]);
254 }
255 }
256 if (in_arr_cnt == 0 || nconn == 0)
257 errx(1, "either -4 or -c are specified");
258
259 if (nthr > nconn)
260 nthr = nconn;
261
262 for (i = 0; i < in_arr_cnt; ++i)
263 in_arr[i].sin_port = htons(port);
264
265 ctx_arr = calloc(nthr, sizeof(struct send_thrctx));
266 if (ctx_arr == NULL)
267 err(1, "calloc failed");
268
269 memset(&glob, 0, sizeof(glob));
270 STAILQ_INIT(&glob.g_conn);
271 glob.g_nconn = nconn;
272 glob.g_nwait = 1; /* count self in */
273 glob.g_dur = dur;
274 glob.g_readto_ms = readto_ms;
275 glob.g_sendfile = do_sendfile;
276 glob.g_buflen = buflen;
277 pthread_mutex_init(&glob.g_lock, NULL);
278 pthread_cond_init(&glob.g_cond, NULL);
279
280 pthread_set_name_np(pthread_self(), "main");
281
282 /* Build receiver address list */
283 send_build_addrlist(in_arr, in_arr_cnt, &daddr, &ndaddr, readto_ms);
284
285 /*
286 * Start senders.
287 */
288 for (i = 0; i < nthr; ++i) {
289 pthread_attr_t attr;
290 int error;
291
292 ctx = &ctx_arr[i];
293 STAILQ_INIT(&ctx->t_conn);
294 ctx->t_id = i;
295 ctx->t_glob = &glob;
296 pthread_mutex_init(&ctx->t_lock, NULL);
297 pthread_cond_init(&ctx->t_cond, NULL);
298
299 pthread_attr_init(&attr);
300 if (bindcpu) {
301 #ifdef __FreeBSD__
302 cpuset_t mask;
303 #else
304 cpu_set_t mask;
305 #endif
306
307 CPU_ZERO(&mask);
308 CPU_SET(i % ncpus, &mask);
309 error = pthread_attr_setaffinity_np(&attr,
310 sizeof(mask), &mask);
311 if (error) {
312 errc(1, error, "pthread_attr_setaffinity_np "
313 "failed");
314 }
315 }
316
317 error = pthread_create(&ctx->t_tid, &attr, send_thread, ctx);
318 if (error)
319 errc(1, error, "pthread_create failed");
320 pthread_attr_destroy(&attr);
321 }
322
323 /*
324 * Distribute connections to senders.
325 *
326 * NOTE:
327 * We start from a random position in the address list, so that the
328 * first several receiving servers will not be abused, if the number
329 * of connections is small and there are many clients.
330 */
331 idx = arc4random_uniform(ndaddr);
332 for (i = 0; i < nconn; ++i) {
333 const struct sockaddr_in *da;
334
335 da = &daddr[idx % ndaddr];
336 ++idx;
337
338 conn = aligned_alloc(SEND_CONN_CTX_ALIGN, sizeof(*conn));
339 if (conn == NULL)
340 err(1, "aligned_alloc failed");
341 memset(conn, 0, sizeof(*conn));
342 conn->c_in = *da;
343 conn->c_s = -1;
344
345 ctx = &ctx_arr[i % nthr];
346 conn->c_thr_id = ctx->t_id;
347
348 pthread_mutex_lock(&ctx->t_lock);
349 STAILQ_INSERT_TAIL(&ctx->t_conn, conn, c_link);
350 pthread_mutex_unlock(&ctx->t_lock);
351 pthread_cond_signal(&ctx->t_cond);
352
353 /* Add to the global list for results gathering */
354 STAILQ_INSERT_TAIL(&glob.g_conn, conn, c_glob_link);
355 }
356
357 /*
358 * No more connections; notify the senders.
359 *
360 * NOTE:
361 * The marker for 'the end of connection list' has 0 in its
362 * c_in.sin_port.
363 */
364 for (i = 0; i < nthr; ++i) {
365 conn = aligned_alloc(SEND_CONN_CTX_ALIGN, sizeof(*conn));
366 if (conn == NULL)
367 err(1, "aligned_alloc failed");
368 memset(conn, 0, sizeof(*conn));
369 conn->c_s = -1;
370
371 ctx = &ctx_arr[i];
372 pthread_mutex_lock(&ctx->t_lock);
373 STAILQ_INSERT_TAIL(&ctx->t_conn, conn, c_link);
374 pthread_mutex_unlock(&ctx->t_lock);
375 pthread_cond_signal(&ctx->t_cond);
376 }
377
378 /*
379 * Sender start sync, stage 1:
380 * Wait for connections establishment (slow).
381 */
382 pthread_mutex_lock(&glob.g_lock);
383 while (glob.g_nconn != 0)
384 pthread_cond_wait(&glob.g_cond, &glob.g_lock);
385 pthread_mutex_unlock(&glob.g_lock);
386
387 /*
388 * Sender start sync, stage 2:
389 * Wait for senders to spin-wait; and once all senders spin-wait,
390 * release them by resetting g_nwait.
391 */
392 while (atomic_cmpset_int(&glob.g_nwait, nthr + 1, 0) == 0)
393 send_spinwait();
394
395 fprintf(stderr, "start %d seconds sending test: %d threads, "
396 "%d connections\n", dur, nthr, nconn);
397
398 /*
399 * Wait for the senders to finish and gather the results.
400 */
401
402 memset(&end, 0, sizeof(end)); /* XXX stupid gcc warning */
403 memset(&start, 0, sizeof(start)); /* XXX stupid gcc warning */
404
405 for (i = 0; i < nthr; ++i) {
406 ctx = &ctx_arr[i];
407 pthread_join(ctx->t_tid, NULL);
408
409 timespecsub(&ctx->t_end, &ctx->t_start, &run);
410 ctx->t_run_us = ((double)run.tv_sec * 1000000.0) +
411 ((double)run.tv_nsec / 1000.0);
412
413 if (i == 0) {
414 start = ctx->t_start;
415 end = ctx->t_end;
416 } else {
417 if (timespeccmp(&start, &ctx->t_start, >))
418 start = ctx->t_start;
419 if (timespeccmp(&end, &ctx->t_end, <))
420 end = ctx->t_end;
421 }
422
423 #ifdef SEND_TIME_DEBUG
424 fprintf(stderr, "start %ld.%ld, end %ld.%ld\n",
425 ctx->t_start.tv_sec, ctx->t_start.tv_nsec,
426 ctx->t_end.tv_sec, ctx->t_end.tv_nsec);
427 #endif
428 }
429
430 #ifdef SEND_TIME_DEBUG
431 fprintf(stderr, "start %ld.%ld, end %ld.%ld (final)\n",
432 start.tv_sec, start.tv_nsec, end.tv_sec, end.tv_nsec);
433 #endif
434
435 timespecsub(&end, &start, &run);
436 total_run_us = ((double)run.tv_sec * 1000000.0) +
437 ((double)run.tv_nsec / 1000.0);
438 total = 0.0;
439
440 err_cnt = 0;
441 has_minmax = 0;
442 conn_min = 0.0;
443 conn_max = 0.0;
444
445 jain = 0.0;
446 jain_res = 0.0;
447 jain_cnt = 0;
448
449 STAILQ_FOREACH(conn, &glob.g_conn, c_glob_link) {
450 total += conn->c_stat;
451 if (conn->c_err == 0) {
452 double perf; /* unit: Mbps */
453
454 perf = (conn->c_stat * 8.0) /
455 ctx_arr[conn->c_thr_id].t_run_us;
456 if (!has_minmax) {
457 conn_min = perf;
458 conn_max = perf;
459 has_minmax = 1;
460 } else {
461 if (perf > conn_max)
462 conn_max = perf;
463 if (perf < conn_min)
464 conn_min = perf;
465 }
466 jain += (perf * perf);
467 jain_res += perf;
468 ++jain_cnt;
469 } else {
470 ++err_cnt;
471 }
472 }
473
474 jain *= jain_cnt;
475 jain = (jain_res * jain_res) / jain;
476
477 printf("Total: %.2lf Mbps, min/max %.2lf Mbps/%.2lf Mbps, jain %.2lf, "
478 "error %d\n", (total * 8.0) / total_run_us, conn_min, conn_max,
479 jain, err_cnt);
480
481 if (log_err && err_cnt) {
482 STAILQ_FOREACH(conn, &glob.g_conn, c_glob_link) {
483 char name[INET_ADDRSTRLEN];
484 double tmp_run;
485
486 if (conn->c_err == 0)
487 continue;
488
489 run = conn->c_terr;
490 timespecsub(&conn->c_terr,
491 &ctx_arr[conn->c_thr_id].t_start, &run);
492 tmp_run = ((double)run.tv_sec * 1000000.0) +
493 ((double)run.tv_nsec / 1000.0);
494 fprintf(stderr, "snd%d ->%s:%d, %ld sec, %.2lf Mbps, "
495 "errno %d\n",
496 conn->c_thr_id,
497 inet_ntop(AF_INET, &conn->c_in.sin_addr,
498 name, sizeof(name)),
499 ntohs(conn->c_in.sin_port),
500 run.tv_sec, (conn->c_stat * 8.0) / tmp_run,
501 conn->c_err);
502 --err_cnt;
503 if (err_cnt == 0)
504 break;
505 }
506 }
507
508 exit(0);
509 }
510
511 static void
send_build_addrlist(const struct sockaddr_in * in_arr,int in_arr_cnt,const struct sockaddr_in ** daddr0,int * ndaddr0,int readto_ms)512 send_build_addrlist(const struct sockaddr_in *in_arr, int in_arr_cnt,
513 const struct sockaddr_in **daddr0, int *ndaddr0, int readto_ms)
514 {
515 struct sockaddr_in *daddr;
516 struct timeval readto;
517 int i, ndaddr;
518
519 daddr = NULL;
520 ndaddr = 0;
521
522 memset(&readto, 0, sizeof(readto));
523 readto.tv_sec = readto_ms / 1000;
524 readto.tv_usec = (readto_ms % 1000) * 1000;
525
526 for (i = 0; i < in_arr_cnt; ++i) {
527 const struct sockaddr_in *in = &in_arr[i];
528 struct recv_info info_hdr;
529 uint16_t *ports;
530 int s, n, ports_sz, d;
531
532 again:
533 s = socket(AF_INET, SOCK_STREAM, 0);
534 if (s < 0)
535 err(1, "socket failed");
536
537 if (connect(s, (const struct sockaddr *)in, sizeof(*in)) < 0)
538 err(1, "connect failed");
539
540 if (setsockopt(s, SOL_SOCKET, SO_RCVTIMEO,
541 &readto, sizeof(readto)) < 0)
542 err(1, "setsockopt(RCVTIMEO) failed");
543
544 n = read(s, &info_hdr, sizeof(info_hdr));
545 if (n != sizeof(info_hdr)) {
546 if (n < 0) {
547 if (errno == EAGAIN) {
548 close(s);
549 goto again;
550 }
551 err(1, "read info hdr failed");
552 } else {
553 errx(1, "read truncated info hdr");
554 }
555 }
556 if (info_hdr.ndport == 0) {
557 close(s);
558 continue;
559 }
560
561 ports_sz = info_hdr.ndport * sizeof(uint16_t);
562 ports = malloc(ports_sz);
563 if (ports == NULL)
564 err(1, "malloc failed");
565
566 n = read(s, ports, ports_sz);
567 if (n != ports_sz) {
568 if (n < 0) {
569 if (errno == EAGAIN) {
570 free(ports);
571 close(s);
572 goto again;
573 }
574 err(1, "read ports failed");
575 } else {
576 errx(1, "read truncated ports");
577 }
578 }
579
580 daddr = reallocf(daddr,
581 (ndaddr + info_hdr.ndport) * sizeof(struct sockaddr_in));
582 if (daddr == NULL)
583 err(1, "reallocf failed");
584
585 for (d = ndaddr; d < ndaddr + info_hdr.ndport; ++d) {
586 struct sockaddr_in *da = &daddr[d];
587
588 *da = *in;
589 da->sin_port = ports[d - ndaddr];
590 }
591 ndaddr += info_hdr.ndport;
592
593 free(ports);
594 close(s);
595 }
596
597 #ifdef SEND_DEBUG
598 for (i = 0; i < ndaddr; ++i) {
599 const struct sockaddr_in *da = &daddr[i];
600 char name[INET_ADDRSTRLEN];
601
602 fprintf(stderr, "%s:%d\n",
603 inet_ntop(AF_INET, &da->sin_addr, name, sizeof(name)),
604 ntohs(da->sin_port));
605 }
606 #endif
607
608 *daddr0 = daddr;
609 *ndaddr0 = ndaddr;
610 }
611
612 static void *
send_thread(void * xctx)613 send_thread(void *xctx)
614 {
615 struct send_thrctx *ctx = xctx;
616 struct conn_ctx *timeo;
617 struct kevent chg_evt;
618 uint8_t *buf;
619 int nconn = 0, kq, n, fd = -1, buflen;
620 char name[32];
621
622 snprintf(name, sizeof(name), "snd%d", ctx->t_id);
623 pthread_set_name_np(pthread_self(), name);
624
625 buflen = ctx->t_glob->g_buflen;
626 buf = malloc(buflen);
627 if (buf == NULL)
628 err(1, "malloc(%d) failed", buflen);
629
630 if (ctx->t_glob->g_sendfile) {
631 char filename[] = "sendtmpXXX";
632
633 fd = mkstemp(filename);
634 if (fd < 0)
635 err(1, "mkstemp failed");
636 if (write(fd, buf, buflen) != buflen)
637 err(1, "write to file failed");
638 unlink(filename);
639 free(buf);
640 buf = NULL;
641 }
642
643 kq = kqueue();
644 if (kq < 0)
645 err(1, "kqueue failed");
646
647 /*
648 * Establish the connections assigned to us and add the
649 * established connections to kqueue.
650 */
651 for (;;) {
652 #ifdef SEND_DEBUG
653 char addr_name[INET_ADDRSTRLEN];
654 #endif
655 struct timeval readto;
656 struct conn_ctx *conn;
657 struct conn_ack ack;
658 int on;
659
660 pthread_mutex_lock(&ctx->t_lock);
661 while (STAILQ_EMPTY(&ctx->t_conn))
662 pthread_cond_wait(&ctx->t_cond, &ctx->t_lock);
663 conn = STAILQ_FIRST(&ctx->t_conn);
664 STAILQ_REMOVE_HEAD(&ctx->t_conn, c_link);
665 pthread_mutex_unlock(&ctx->t_lock);
666
667 if (conn->c_in.sin_port == 0) {
668 /*
669 * The marker for 'the end of connection list'.
670 * See the related comment in main thread.
671 *
672 * NOTE:
673 * We reuse the marker as the udata for the
674 * kqueue timer.
675 */
676 timeo = conn;
677 break;
678 }
679
680 ++nconn;
681 #ifdef SEND_DEBUG
682 fprintf(stderr, "%s %s:%d\n", name,
683 inet_ntop(AF_INET, &conn->c_in.sin_addr,
684 addr_name, sizeof(addr_name)),
685 ntohs(conn->c_in.sin_port));
686 #endif
687
688 again:
689 conn->c_s = socket(AF_INET, SOCK_STREAM, 0);
690 if (conn->c_s < 0)
691 err(1, "socket failed");
692
693 if (connect(conn->c_s, (const struct sockaddr *)&conn->c_in,
694 sizeof(conn->c_in)) < 0)
695 err(1, "connect failed");
696
697 memset(&readto, 0, sizeof(readto));
698 readto.tv_sec = ctx->t_glob->g_readto_ms / 1000;
699 readto.tv_usec = (ctx->t_glob->g_readto_ms % 1000) * 1000;
700 if (setsockopt(conn->c_s, SOL_SOCKET, SO_RCVTIMEO, &readto,
701 sizeof(readto)) < 0)
702 err(1, "setsockopt(RCVTIMEO) failed");
703
704 n = read(conn->c_s, &ack, sizeof(ack));
705 if (n != sizeof(ack)) {
706 if (n < 0) {
707 if (errno == EAGAIN) {
708 close(conn->c_s);
709 goto again;
710 }
711 err(1, "read ack failed");
712 } else {
713 errx(1, "read truncated ack");
714 }
715 }
716
717 on = 1;
718 if (ioctl(conn->c_s, FIONBIO, &on, sizeof(on)) < 0)
719 err(1, "ioctl(FIONBIO) failed");
720
721 EV_SET(&chg_evt, conn->c_s, EVFILT_WRITE, EV_ADD, 0, 0, conn);
722 n = kevent(kq, &chg_evt, 1, NULL, 0, NULL);
723 if (n < 0)
724 err(1, "kevent add failed");
725 }
726 #ifdef SEND_DEBUG
727 fprintf(stderr, "%s conn %d\n", name, nconn);
728 #endif
729
730 /*
731 * Sender start sync, stage 1:
732 * Wait for connections establishment (slow).
733 */
734 pthread_mutex_lock(&ctx->t_glob->g_lock);
735 ctx->t_glob->g_nconn -= nconn;
736 pthread_cond_broadcast(&ctx->t_glob->g_cond);
737 while (ctx->t_glob->g_nconn != 0)
738 pthread_cond_wait(&ctx->t_glob->g_cond, &ctx->t_glob->g_lock);
739 pthread_mutex_unlock(&ctx->t_glob->g_lock);
740
741 /*
742 * Sender start sync, stage2.
743 */
744 /* Increase the g_nwait. */
745 atomic_add_int(&ctx->t_glob->g_nwait, 1);
746 /* Spin-wait for main thread to release us (reset g_nwait). */
747 while (ctx->t_glob->g_nwait)
748 send_spinwait();
749
750 #ifdef SEND_DEBUG
751 fprintf(stderr, "%s start\n", name);
752 #endif
753
754 /*
755 * Wire a kqueue timer, so that the sending can be terminated
756 * as requested.
757 *
758 * NOTE:
759 * Set -2 to c_s for timer udata, so we could distinguish it
760 * from real connections.
761 */
762 timeo->c_s = -2;
763 EV_SET(&chg_evt, 0, EVFILT_TIMER, EV_ADD | EV_ONESHOT, 0,
764 ctx->t_glob->g_dur * 1000L, timeo);
765 n = kevent(kq, &chg_evt, 1, NULL, 0, NULL);
766 if (n < 0)
767 err(1, "kevent add failed");
768
769 clock_gettime(CLOCK_MONOTONIC_PRECISE, &ctx->t_start);
770 for (;;) {
771 struct kevent evt[SEND_EVENT_MAX];
772 int nevt, i;
773
774 nevt = kevent(kq, NULL, 0, evt, SEND_EVENT_MAX, NULL);
775 if (nevt < 0)
776 err(1, "kevent failed");
777
778 for (i = 0; i < nevt; ++i) {
779 struct conn_ctx *conn = evt[i].udata;
780
781 if (conn->c_s < 0) {
782 if (conn->c_s == -2) {
783 /* Timer expired */
784 goto done;
785 }
786 continue;
787 }
788
789 if (fd >= 0) {
790 off_t m, off;
791 size_t len;
792
793 off = conn->c_stat % buflen;
794 len = buflen - off;
795
796 n = sendfile(fd, conn->c_s, off, len, NULL,
797 &m, 0);
798 if (n == 0 || (n < 0 && errno == EAGAIN))
799 n = m;
800 } else {
801 n = write(conn->c_s, buf, buflen);
802 }
803
804 if (n < 0) {
805 if (errno != EAGAIN) {
806 conn->c_err = errno;
807 clock_gettime(CLOCK_MONOTONIC_PRECISE,
808 &conn->c_terr);
809 close(conn->c_s);
810 conn->c_s = -1;
811 }
812 } else {
813 conn->c_stat += n;
814 }
815 }
816 }
817 done:
818 clock_gettime(CLOCK_MONOTONIC_PRECISE, &ctx->t_end);
819
820 if (fd >= 0)
821 close(fd);
822 if (buf != NULL)
823 free(buf);
824 return NULL;
825 }
826