1 #include <sys/param.h>
2 #include <sys/event.h>
3 #include <sys/ioctl.h>
4 #include <sys/queue.h>
5 #include <sys/socket.h>
6 #include <sys/sysctl.h>
7 #include <sys/time.h>
8 
9 #include <machine/atomic.h>
10 #ifdef __FreeBSD__
11 #include <machine/cpu.h>
12 #endif
13 #include <machine/cpufunc.h>
14 
15 #include <arpa/inet.h>
16 #include <netinet/in.h>
17 
18 #include <err.h>
19 #include <errno.h>
20 #include <pthread.h>
21 #include <pthread_np.h>
22 #include <signal.h>
23 #include <stdbool.h>
24 #include <stdio.h>
25 #include <stdint.h>
26 #include <stdlib.h>
27 #include <string.h>
28 #include <unistd.h>
29 
30 #include "kq_sendrecv_proto.h"
31 
32 /*
33  * Note about the sender start synchronization.
34  *
35  * We apply two stage synchronization.  The first stage uses pthread
36  * condition (it sleeps), which waits for the establishment for all
37  * connections, which could be slow.  The second stage uses g_nwait
38  * of send_globctx; all relevant threads spin on g_nwait.  The main
39  * thread spin-waits for all senders to increase g_nwait.  The sender
40  * thread increases the g_nwait, then it spin-waits for main thread
41  * to reset g_nwait.  In this way, we can make sure that all senders
42  * start roughly at the same time.
43  */
44 
/* Change to "#if 1" to print the address list and per-connection setup. */
#if 0
#define SEND_DEBUG
#endif
/* Change to "#if 1" to print every sender's start/end timestamps. */
#if 0
#define SEND_TIME_DEBUG
#endif

#define SEND_DUR		10		/* default of -l, unit: sec */
#define SEND_EVENT_MAX		64		/* events per kevent(2) call */
#define SEND_BUFLEN		(128 * 1024)	/* default of -b, unit: byte */

/*
 * A successful 3-way handshake does not mean that the remote
 * application will accept(2) the connection.  Even worse, the remote
 * network stack may drop the connection silently, i.e. without sending
 * RST.  If that happens, a blocking read(2) would not return until TCP
 * keepalive kicks in, which could take quite some time.  That is not
 * what we want here, so a synthetic timeout (SO_RCVTIMEO) is applied to
 * the blocking read(2); when it expires, the connection is closed and
 * the whole exchange is retried.  This is the default of -r.
 */
#define SEND_READTO_MS		1000		/* unit: ms */

/* Alignment of struct conn_ctx: the CPU cache line size, where known. */
#if defined(__DragonFly__)
#define SEND_CONN_CTX_ALIGN	__VM_CACHELINE_SIZE
#elif defined(__FreeBSD__)
#define SEND_CONN_CTX_ALIGN	CACHE_LINE_SIZE
#else
#define SEND_CONN_CTX_ALIGN	64	/* XXX assumed cache line size */
#endif
75 
/*
 * Per-connection state.  Allocated by main(), queued to exactly one
 * sender thread through c_link and kept on the global result list
 * through c_glob_link.  An entry whose c_in.sin_port is 0 is not a real
 * connection; it is the per-thread "end of connection list" marker,
 * which the sender thread then reuses as the kqueue timer udata.
 *
 * NOTE(review): the cache line alignment is presumably there to avoid
 * false sharing between sender threads updating c_stat -- confirm.
 */
struct conn_ctx {
	int			c_s;		/* socket; -1: none/closed, -2: timer */
	int			c_err;		/* errno that closed it, 0: no error */
	uint64_t		c_stat;		/* bytes sent, unit: byte */
	struct timespec		c_terr;		/* CLOCK_MONOTONIC_PRECISE at error */

	STAILQ_ENTRY(conn_ctx)	c_glob_link;	/* on send_globctx.g_conn */
	STAILQ_ENTRY(conn_ctx)	c_link;		/* on send_thrctx.t_conn */
	struct sockaddr_in	c_in;		/* receiver address and data port */
	int			c_thr_id;	/* index of the owning sender thread */
} __aligned(SEND_CONN_CTX_ALIGN);

STAILQ_HEAD(conn_ctx_list, conn_ctx);
89 
/*
 * State shared between main() and all sender threads.
 */
struct send_globctx {
	struct conn_ctx_list	g_conn;		/* all connections, for results */

	int			g_dur;		/* sending duration, unit: sec */
	int			g_nconn;	/* connections not yet established */
	pthread_mutex_t		g_lock;		/* protects g_nconn */
	pthread_cond_t		g_cond;		/* broadcast when g_nconn drops */

	volatile u_int		g_nwait;	/* start sync stage 2 counter */
	int			g_readto_ms;	/* unit: ms */
	int			g_buflen;	/* bytes per write/sendfile call */
	bool			g_sendfile;	/* sendfile(2) instead of write(2) */
};
103 
/*
 * Per sender thread state; main() keeps one entry per thread in ctx_arr[].
 */
struct send_thrctx {
	struct conn_ctx_list	t_conn;		/* connections queued by main() */
	pthread_mutex_t		t_lock;		/* protects t_conn */
	pthread_cond_t		t_cond;		/* signalled when t_conn grows */

	struct send_globctx	*t_glob;
	struct timespec		t_start;	/* sending began (monotonic) */
	struct timespec		t_end;		/* sending ended (monotonic) */
	double			t_run_us;	/* t_end - t_start, unit: us */

	pthread_t		t_tid;
	int			t_id;		/* index in main()'s ctx_arr[] */
};
117 
/* Local functions defined after main(). */
static void	send_build_addrlist(const struct sockaddr_in *, int,
		    const struct sockaddr_in **, int *, int);
static void	*send_thread(void *);
121 
/*
 * One iteration of a busy-wait loop: issue the platform's CPU pause
 * hint where one is available, otherwise do nothing at all.
 */
static __inline void
send_spinwait(void)
{
#if defined(__DragonFly__)
	cpu_pause();		/* DragonFly spelling of the pause hint */
#elif defined(__FreeBSD__)
	cpu_spinwait();		/* FreeBSD spelling of the pause hint */
#endif
	/* Other platforms: plain spin, no hint. */
}
133 
/*
 * Print the command line synopsis for program name 'cmd' to stderr
 * and terminate the process with exit status 2.
 */
static void
usage(const char *cmd)
{
	(void)fprintf(stderr,
	    "%s -4 addr4 [-4 addr4 ...] [-p port] "
	    "-c conns [-t nthreads] [-l sec] [-r readto_ms] [-S] [-E] "
	    "[-b buflen] [-B]\n",
	    cmd);
	exit(2);
	/* NOTREACHED */
}
142 
143 int
144 main(int argc, char *argv[])
145 {
146 	struct send_globctx glob;
147 	struct send_thrctx *ctx_arr, *ctx;
148 	struct sockaddr_in *in_arr, *in;
149 	const struct sockaddr_in *daddr;
150 	struct timespec run, end, start;
151 	double total_run_us, total, conn_min, conn_max;
152 	double jain, jain_res;
153 	int jain_cnt;
154 	struct conn_ctx *conn;
155 	sigset_t sigset;
156 	int opt, i, ncpus;
157 	int in_arr_cnt, in_arr_sz, ndaddr;
158 	int nthr, nconn, dur, readto_ms, buflen;
159 	int log_err, err_cnt, has_minmax;
160 	u_short port = RECV_PORT;
161 	uint32_t idx;
162 	size_t sz;
163 	bool do_sendfile = false, bindcpu = false;
164 
165 	sigemptyset(&sigset);
166 	sigaddset(&sigset, SIGPIPE);
167 	if (sigprocmask(SIG_BLOCK, &sigset, NULL) < 0)
168 		err(1, "sigprocmask failed");
169 
170 	sz = sizeof(ncpus);
171 	if (sysctlbyname("hw.ncpu", &ncpus, &sz, NULL, 0) < 0)
172 		err(1, "sysctl hw.ncpu failed");
173 	nthr = ncpus;
174 
175 	in_arr_sz = 4;
176 	in_arr_cnt = 0;
177 	in_arr = malloc(in_arr_sz * sizeof(struct sockaddr_in));
178 	if (in_arr == NULL)
179 		err(1, "malloc failed");
180 
181 	log_err = 0;
182 	nconn = 0;
183 	dur = SEND_DUR;
184 	readto_ms = SEND_READTO_MS;
185 	buflen = SEND_BUFLEN;
186 
187 	while ((opt = getopt(argc, argv, "4:BESb:c:l:p:r:t:")) != -1) {
188 		switch (opt) {
189 		case '4':
190 			if (in_arr_cnt == in_arr_sz) {
191 				in_arr_sz *= 2;
192 				in_arr = reallocf(in_arr,
193 				    in_arr_sz * sizeof(struct sockaddr_in));
194 				if (in_arr == NULL)
195 					err(1, "reallocf failed");
196 			}
197 			in = &in_arr[in_arr_cnt];
198 			++in_arr_cnt;
199 
200 			memset(in, 0, sizeof(*in));
201 			in->sin_family = AF_INET;
202 			if (inet_pton(AF_INET, optarg, &in->sin_addr) <= 0)
203 				errx(1, "inet_pton failed %s", optarg);
204 			break;
205 
206 		case 'B':
207 			bindcpu = true;
208 			break;
209 
210 		case 'E':
211 			log_err = 1;
212 			break;
213 
214 		case 'S':
215 			do_sendfile = true;
216 			break;
217 
218 		case 'b':
219 			buflen = strtol(optarg, NULL, 10);
220 			if (buflen <= 0)
221 				errx(1, "invalid -b");
222 			break;
223 
224 		case 'c':
225 			nconn = strtol(optarg, NULL, 10);
226 			if (nconn <= 0)
227 				errx(1, "invalid -c");
228 			break;
229 
230 		case 'l':
231 			dur = strtoul(optarg, NULL, 10);
232 			if (dur == 0)
233 				errx(1, "invalid -l");
234 			break;
235 
236 		case 'p':
237 			port = strtoul(optarg, NULL, 10);
238 			break;
239 
240 		case 'r':
241 			readto_ms = strtol(optarg, NULL, 10);
242 			if (readto_ms <= 0)
243 				errx(1, "invalid -r");
244 			break;
245 
246 		case 't':
247 			nthr = strtol(optarg, NULL, 10);
248 			if (nthr <= 0)
249 				errx(1, "invalid -t");
250 			break;
251 
252 		default:
253 			usage(argv[0]);
254 		}
255 	}
256 	if (in_arr_cnt == 0 || nconn == 0)
257 		errx(1, "either -4 or -c are specified");
258 
259 	if (nthr > nconn)
260 		nthr = nconn;
261 
262 	for (i = 0; i < in_arr_cnt; ++i)
263 		in_arr[i].sin_port = htons(port);
264 
265 	ctx_arr = calloc(nthr, sizeof(struct send_thrctx));
266 	if (ctx_arr == NULL)
267 		err(1, "calloc failed");
268 
269 	memset(&glob, 0, sizeof(glob));
270 	STAILQ_INIT(&glob.g_conn);
271 	glob.g_nconn = nconn;
272 	glob.g_nwait = 1; /* count self in */
273 	glob.g_dur = dur;
274 	glob.g_readto_ms = readto_ms;
275 	glob.g_sendfile = do_sendfile;
276 	glob.g_buflen = buflen;
277 	pthread_mutex_init(&glob.g_lock, NULL);
278 	pthread_cond_init(&glob.g_cond, NULL);
279 
280 	pthread_set_name_np(pthread_self(), "main");
281 
282 	/* Build receiver address list */
283 	send_build_addrlist(in_arr, in_arr_cnt, &daddr, &ndaddr, readto_ms);
284 
285 	/*
286 	 * Start senders.
287 	 */
288 	for (i = 0; i < nthr; ++i) {
289 		pthread_attr_t attr;
290 		int error;
291 
292 		ctx = &ctx_arr[i];
293 		STAILQ_INIT(&ctx->t_conn);
294 		ctx->t_id = i;
295 		ctx->t_glob = &glob;
296 		pthread_mutex_init(&ctx->t_lock, NULL);
297 		pthread_cond_init(&ctx->t_cond, NULL);
298 
299 		pthread_attr_init(&attr);
300 		if (bindcpu) {
301 #ifdef __FreeBSD__
302 			cpuset_t mask;
303 #else
304 			cpu_set_t mask;
305 #endif
306 
307 			CPU_ZERO(&mask);
308 			CPU_SET(i % ncpus, &mask);
309 			error = pthread_attr_setaffinity_np(&attr,
310 			    sizeof(mask), &mask);
311 			if (error) {
312 				errc(1, error, "pthread_attr_setaffinity_np "
313 				    "failed");
314 			}
315 		}
316 
317 		error = pthread_create(&ctx->t_tid, &attr, send_thread, ctx);
318 		if (error)
319 			errc(1, error, "pthread_create failed");
320 		pthread_attr_destroy(&attr);
321 	}
322 
323 	/*
324 	 * Distribute connections to senders.
325 	 *
326 	 * NOTE:
327 	 * We start from a random position in the address list, so that the
328 	 * first several receiving servers will not be abused, if the number
329 	 * of connections is small and there are many clients.
330 	 */
331 	idx = arc4random_uniform(ndaddr);
332 	for (i = 0; i < nconn; ++i) {
333 		const struct sockaddr_in *da;
334 
335 		da = &daddr[idx % ndaddr];
336 		++idx;
337 
338 		conn = aligned_alloc(SEND_CONN_CTX_ALIGN, sizeof(*conn));
339 		if (conn == NULL)
340 			err(1, "aligned_alloc failed");
341 		memset(conn, 0, sizeof(*conn));
342 		conn->c_in = *da;
343 		conn->c_s = -1;
344 
345 		ctx = &ctx_arr[i % nthr];
346 		conn->c_thr_id = ctx->t_id;
347 
348 		pthread_mutex_lock(&ctx->t_lock);
349 		STAILQ_INSERT_TAIL(&ctx->t_conn, conn, c_link);
350 		pthread_mutex_unlock(&ctx->t_lock);
351 		pthread_cond_signal(&ctx->t_cond);
352 
353 		/* Add to the global list for results gathering */
354 		STAILQ_INSERT_TAIL(&glob.g_conn, conn, c_glob_link);
355 	}
356 
357 	/*
358 	 * No more connections; notify the senders.
359 	 *
360 	 * NOTE:
361 	 * The marker for 'the end of connection list' has 0 in its
362 	 * c_in.sin_port.
363 	 */
364 	for (i = 0; i < nthr; ++i) {
365 		conn = aligned_alloc(SEND_CONN_CTX_ALIGN, sizeof(*conn));
366 		if (conn == NULL)
367 			err(1, "aligned_alloc failed");
368 		memset(conn, 0, sizeof(*conn));
369 		conn->c_s = -1;
370 
371 		ctx = &ctx_arr[i];
372 		pthread_mutex_lock(&ctx->t_lock);
373 		STAILQ_INSERT_TAIL(&ctx->t_conn, conn, c_link);
374 		pthread_mutex_unlock(&ctx->t_lock);
375 		pthread_cond_signal(&ctx->t_cond);
376 	}
377 
378 	/*
379 	 * Sender start sync, stage 1:
380 	 * Wait for connections establishment (slow).
381 	 */
382 	pthread_mutex_lock(&glob.g_lock);
383 	while (glob.g_nconn != 0)
384 		pthread_cond_wait(&glob.g_cond, &glob.g_lock);
385 	pthread_mutex_unlock(&glob.g_lock);
386 
387 	/*
388 	 * Sender start sync, stage 2:
389 	 * Wait for senders to spin-wait; and once all senders spin-wait,
390 	 * release them by resetting g_nwait.
391 	 */
392 	while (atomic_cmpset_int(&glob.g_nwait, nthr + 1, 0) == 0)
393 		send_spinwait();
394 
395 	fprintf(stderr, "start %d seconds sending test: %d threads, "
396 	    "%d connections\n", dur, nthr, nconn);
397 
398 	/*
399 	 * Wait for the senders to finish and gather the results.
400 	 */
401 
402 	memset(&end, 0, sizeof(end));		/* XXX stupid gcc warning */
403 	memset(&start, 0, sizeof(start));	/* XXX stupid gcc warning */
404 
405 	for (i = 0; i < nthr; ++i) {
406 		ctx = &ctx_arr[i];
407 		pthread_join(ctx->t_tid, NULL);
408 
409 		timespecsub(&ctx->t_end, &ctx->t_start, &run);
410 		ctx->t_run_us = ((double)run.tv_sec * 1000000.0) +
411 		    ((double)run.tv_nsec / 1000.0);
412 
413 		if (i == 0) {
414 			start = ctx->t_start;
415 			end = ctx->t_end;
416 		} else {
417 			if (timespeccmp(&start, &ctx->t_start, >))
418 				start = ctx->t_start;
419 			if (timespeccmp(&end, &ctx->t_end, <))
420 				end = ctx->t_end;
421 		}
422 
423 #ifdef SEND_TIME_DEBUG
424 		fprintf(stderr, "start %ld.%ld, end %ld.%ld\n",
425 		    ctx->t_start.tv_sec, ctx->t_start.tv_nsec,
426 		    ctx->t_end.tv_sec, ctx->t_end.tv_nsec);
427 #endif
428 	}
429 
430 #ifdef SEND_TIME_DEBUG
431 	fprintf(stderr, "start %ld.%ld, end %ld.%ld (final)\n",
432 	    start.tv_sec, start.tv_nsec, end.tv_sec, end.tv_nsec);
433 #endif
434 
435 	timespecsub(&end, &start, &run);
436 	total_run_us = ((double)run.tv_sec * 1000000.0) +
437 	    ((double)run.tv_nsec / 1000.0);
438 	total = 0.0;
439 
440 	err_cnt = 0;
441 	has_minmax = 0;
442 	conn_min = 0.0;
443 	conn_max = 0.0;
444 
445 	jain = 0.0;
446 	jain_res = 0.0;
447 	jain_cnt = 0;
448 
449 	STAILQ_FOREACH(conn, &glob.g_conn, c_glob_link) {
450 		total += conn->c_stat;
451 		if (conn->c_err == 0) {
452 			double perf;	/* unit: Mbps */
453 
454 			perf = (conn->c_stat * 8.0) /
455 			    ctx_arr[conn->c_thr_id].t_run_us;
456 			if (!has_minmax) {
457 				conn_min = perf;
458 				conn_max = perf;
459 				has_minmax = 1;
460 			} else {
461 				if (perf > conn_max)
462 					conn_max = perf;
463 				if (perf < conn_min)
464 					conn_min = perf;
465 			}
466 			jain += (perf * perf);
467 			jain_res += perf;
468 			++jain_cnt;
469 		} else {
470 			++err_cnt;
471 		}
472 	}
473 
474 	jain *= jain_cnt;
475 	jain = (jain_res * jain_res) / jain;
476 
477 	printf("Total: %.2lf Mbps, min/max %.2lf Mbps/%.2lf Mbps, jain %.2lf, "
478 	    "error %d\n", (total * 8.0) / total_run_us, conn_min, conn_max,
479 	    jain, err_cnt);
480 
481 	if (log_err && err_cnt) {
482 		STAILQ_FOREACH(conn, &glob.g_conn, c_glob_link) {
483 			char name[INET_ADDRSTRLEN];
484 			double tmp_run;
485 
486 			if (conn->c_err == 0)
487 				continue;
488 
489 			run = conn->c_terr;
490 			timespecsub(&conn->c_terr,
491 			    &ctx_arr[conn->c_thr_id].t_start, &run);
492 			tmp_run = ((double)run.tv_sec * 1000000.0) +
493 			    ((double)run.tv_nsec / 1000.0);
494 			fprintf(stderr, "snd%d ->%s:%d, %ld sec, %.2lf Mbps, "
495 			    "errno %d\n",
496 			    conn->c_thr_id,
497 			    inet_ntop(AF_INET, &conn->c_in.sin_addr,
498 			        name, sizeof(name)),
499 			    ntohs(conn->c_in.sin_port),
500 			    run.tv_sec, (conn->c_stat * 8.0) / tmp_run,
501 			    conn->c_err);
502 			--err_cnt;
503 			if (err_cnt == 0)
504 				break;
505 		}
506 	}
507 
508 	exit(0);
509 }
510 
511 static void
512 send_build_addrlist(const struct sockaddr_in *in_arr, int in_arr_cnt,
513     const struct sockaddr_in **daddr0, int *ndaddr0, int readto_ms)
514 {
515 	struct sockaddr_in *daddr;
516 	struct timeval readto;
517 	int i, ndaddr;
518 
519 	daddr = NULL;
520 	ndaddr = 0;
521 
522 	memset(&readto, 0, sizeof(readto));
523 	readto.tv_sec = readto_ms / 1000;
524 	readto.tv_usec = (readto_ms % 1000) * 1000;
525 
526 	for (i = 0; i < in_arr_cnt; ++i) {
527 		const struct sockaddr_in *in = &in_arr[i];
528 		struct recv_info info_hdr;
529 		uint16_t *ports;
530 		int s, n, ports_sz, d;
531 
532 again:
533 		s = socket(AF_INET, SOCK_STREAM, 0);
534 		if (s < 0)
535 			err(1, "socket failed");
536 
537 		if (connect(s, (const struct sockaddr *)in, sizeof(*in)) < 0)
538 			err(1, "connect failed");
539 
540 		if (setsockopt(s, SOL_SOCKET, SO_RCVTIMEO,
541 		    &readto, sizeof(readto)) < 0)
542 			err(1, "setsockopt(RCVTIMEO) failed");
543 
544 		n = read(s, &info_hdr, sizeof(info_hdr));
545 		if (n != sizeof(info_hdr)) {
546 			if (n < 0) {
547 				if (errno == EAGAIN) {
548 					close(s);
549 					goto again;
550 				}
551 				err(1, "read info hdr failed");
552 			} else {
553 				errx(1, "read truncated info hdr");
554 			}
555 		}
556 		if (info_hdr.ndport == 0) {
557 			close(s);
558 			continue;
559 		}
560 
561 		ports_sz = info_hdr.ndport * sizeof(uint16_t);
562 		ports = malloc(ports_sz);
563 		if (ports == NULL)
564 			err(1, "malloc failed");
565 
566 		n = read(s, ports, ports_sz);
567 		if (n != ports_sz) {
568 			if (n < 0) {
569 				if (errno == EAGAIN) {
570 					free(ports);
571 					close(s);
572 					goto again;
573 				}
574 				err(1, "read ports failed");
575 			} else {
576 				errx(1, "read truncated ports");
577 			}
578 		}
579 
580 		daddr = reallocf(daddr,
581 		    (ndaddr + info_hdr.ndport) * sizeof(struct sockaddr_in));
582 		if (daddr == NULL)
583 			err(1, "reallocf failed");
584 
585 		for (d = ndaddr; d < ndaddr + info_hdr.ndport; ++d) {
586 			struct sockaddr_in *da = &daddr[d];
587 
588 			*da = *in;
589 			da->sin_port = ports[d - ndaddr];
590 		}
591 		ndaddr += info_hdr.ndport;
592 
593 		free(ports);
594 		close(s);
595 	}
596 
597 #ifdef SEND_DEBUG
598 	for (i = 0; i < ndaddr; ++i) {
599 		const struct sockaddr_in *da = &daddr[i];
600 		char name[INET_ADDRSTRLEN];
601 
602 		fprintf(stderr, "%s:%d\n",
603 		    inet_ntop(AF_INET, &da->sin_addr, name, sizeof(name)),
604 		    ntohs(da->sin_port));
605 	}
606 #endif
607 
608 	*daddr0 = daddr;
609 	*ndaddr0 = ndaddr;
610 }
611 
612 static void *
613 send_thread(void *xctx)
614 {
615 	struct send_thrctx *ctx = xctx;
616 	struct conn_ctx *timeo;
617 	struct kevent chg_evt;
618 	uint8_t *buf;
619 	int nconn = 0, kq, n, fd = -1, buflen;
620 	char name[32];
621 
622 	snprintf(name, sizeof(name), "snd%d", ctx->t_id);
623 	pthread_set_name_np(pthread_self(), name);
624 
625 	buflen = ctx->t_glob->g_buflen;
626 	buf = malloc(buflen);
627 	if (buf == NULL)
628 		err(1, "malloc(%d) failed", buflen);
629 
630 	if (ctx->t_glob->g_sendfile) {
631 		char filename[] = "sendtmpXXX";
632 
633 		fd = mkstemp(filename);
634 		if (fd < 0)
635 			err(1, "mkstemp failed");
636 		if (write(fd, buf, buflen) != buflen)
637 			err(1, "write to file failed");
638 		unlink(filename);
639 		free(buf);
640 		buf = NULL;
641 	}
642 
643 	kq = kqueue();
644 	if (kq < 0)
645 		err(1, "kqueue failed");
646 
647 	/*
648 	 * Establish the connections assigned to us and add the
649 	 * established connections to kqueue.
650 	 */
651 	for (;;) {
652 #ifdef SEND_DEBUG
653 		char addr_name[INET_ADDRSTRLEN];
654 #endif
655 		struct timeval readto;
656 		struct conn_ctx *conn;
657 		struct conn_ack ack;
658 		int on;
659 
660 		pthread_mutex_lock(&ctx->t_lock);
661 		while (STAILQ_EMPTY(&ctx->t_conn))
662 			pthread_cond_wait(&ctx->t_cond, &ctx->t_lock);
663 		conn = STAILQ_FIRST(&ctx->t_conn);
664 		STAILQ_REMOVE_HEAD(&ctx->t_conn, c_link);
665 		pthread_mutex_unlock(&ctx->t_lock);
666 
667 		if (conn->c_in.sin_port == 0) {
668 			/*
669 			 * The marker for 'the end of connection list'.
670 			 * See the related comment in main thread.
671 			 *
672 			 * NOTE:
673 			 * We reuse the marker as the udata for the
674 			 * kqueue timer.
675 			 */
676 			timeo = conn;
677 			break;
678 		}
679 
680 		++nconn;
681 #ifdef SEND_DEBUG
682 		fprintf(stderr, "%s %s:%d\n", name,
683 		    inet_ntop(AF_INET, &conn->c_in.sin_addr,
684 		        addr_name, sizeof(addr_name)),
685 		    ntohs(conn->c_in.sin_port));
686 #endif
687 
688 again:
689 		conn->c_s = socket(AF_INET, SOCK_STREAM, 0);
690 		if (conn->c_s < 0)
691 			err(1, "socket failed");
692 
693 		if (connect(conn->c_s, (const struct sockaddr *)&conn->c_in,
694 		    sizeof(conn->c_in)) < 0)
695 			err(1, "connect failed");
696 
697 		memset(&readto, 0, sizeof(readto));
698 		readto.tv_sec = ctx->t_glob->g_readto_ms / 1000;
699 		readto.tv_usec = (ctx->t_glob->g_readto_ms % 1000) * 1000;
700 		if (setsockopt(conn->c_s, SOL_SOCKET, SO_RCVTIMEO, &readto,
701 		    sizeof(readto)) < 0)
702 			err(1, "setsockopt(RCVTIMEO) failed");
703 
704 		n = read(conn->c_s, &ack, sizeof(ack));
705 		if (n != sizeof(ack)) {
706 			if (n < 0) {
707 				if (errno == EAGAIN) {
708 					close(conn->c_s);
709 					goto again;
710 				}
711 				err(1, "read ack failed");
712 			} else {
713 				errx(1, "read truncated ack");
714 			}
715 		}
716 
717 		on = 1;
718 		if (ioctl(conn->c_s, FIONBIO, &on, sizeof(on)) < 0)
719 			err(1, "ioctl(FIONBIO) failed");
720 
721 		EV_SET(&chg_evt, conn->c_s, EVFILT_WRITE, EV_ADD, 0, 0, conn);
722 		n = kevent(kq, &chg_evt, 1, NULL, 0, NULL);
723 		if (n < 0)
724 			err(1, "kevent add failed");
725 	}
726 #ifdef SEND_DEBUG
727 	fprintf(stderr, "%s conn %d\n", name, nconn);
728 #endif
729 
730 	/*
731 	 * Sender start sync, stage 1:
732 	 * Wait for connections establishment (slow).
733 	 */
734 	pthread_mutex_lock(&ctx->t_glob->g_lock);
735 	ctx->t_glob->g_nconn -= nconn;
736 	pthread_cond_broadcast(&ctx->t_glob->g_cond);
737 	while (ctx->t_glob->g_nconn != 0)
738 		pthread_cond_wait(&ctx->t_glob->g_cond, &ctx->t_glob->g_lock);
739 	pthread_mutex_unlock(&ctx->t_glob->g_lock);
740 
741 	/*
742 	 * Sender start sync, stage2.
743 	 */
744 	/* Increase the g_nwait. */
745 	atomic_add_int(&ctx->t_glob->g_nwait, 1);
746 	/* Spin-wait for main thread to release us (reset g_nwait). */
747 	while (ctx->t_glob->g_nwait)
748 		send_spinwait();
749 
750 #ifdef SEND_DEBUG
751 	fprintf(stderr, "%s start\n", name);
752 #endif
753 
754 	/*
755 	 * Wire a kqueue timer, so that the sending can be terminated
756 	 * as requested.
757 	 *
758 	 * NOTE:
759 	 * Set -2 to c_s for timer udata, so we could distinguish it
760 	 * from real connections.
761 	 */
762 	timeo->c_s = -2;
763 	EV_SET(&chg_evt, 0, EVFILT_TIMER, EV_ADD | EV_ONESHOT, 0,
764 	    ctx->t_glob->g_dur * 1000L, timeo);
765 	n = kevent(kq, &chg_evt, 1, NULL, 0, NULL);
766 	if (n < 0)
767 		err(1, "kevent add failed");
768 
769 	clock_gettime(CLOCK_MONOTONIC_PRECISE, &ctx->t_start);
770 	for (;;) {
771 		struct kevent evt[SEND_EVENT_MAX];
772 		int nevt, i;
773 
774 		nevt = kevent(kq, NULL, 0, evt, SEND_EVENT_MAX, NULL);
775 		if (nevt < 0)
776 			err(1, "kevent failed");
777 
778 		for (i = 0; i < nevt; ++i) {
779 			struct conn_ctx *conn = evt[i].udata;
780 
781 			if (conn->c_s < 0) {
782 				if (conn->c_s == -2) {
783 					/* Timer expired */
784 					goto done;
785 				}
786 				continue;
787 			}
788 
789 			if (fd >= 0) {
790 				off_t m, off;
791 				size_t len;
792 
793 				off = conn->c_stat % buflen;
794 				len = buflen - off;
795 
796 				n = sendfile(fd, conn->c_s, off, len, NULL,
797 				    &m, 0);
798 				if (n == 0 || (n < 0 && errno == EAGAIN))
799 					n = m;
800 			} else {
801 				n = write(conn->c_s, buf, buflen);
802 			}
803 
804 			if (n < 0) {
805 				if (errno != EAGAIN) {
806 					conn->c_err = errno;
807 					clock_gettime(CLOCK_MONOTONIC_PRECISE,
808 					    &conn->c_terr);
809 					close(conn->c_s);
810 					conn->c_s = -1;
811 				}
812 			} else {
813 				conn->c_stat += n;
814 			}
815 		}
816 	}
817 done:
818 	clock_gettime(CLOCK_MONOTONIC_PRECISE, &ctx->t_end);
819 
820 	if (fd >= 0)
821 		close(fd);
822 	if (buf != NULL)
823 		free(buf);
824 	return NULL;
825 }
826