1 #include <sys/param.h>
2 #include <sys/event.h>
3 #include <sys/ioctl.h>
4 #include <sys/queue.h>
5 #include <sys/socket.h>
6 #include <sys/sysctl.h>
7 #include <sys/time.h>
8 
9 #include <machine/atomic.h>
10 #ifdef __FreeBSD__
11 #include <machine/cpu.h>
12 #endif
13 #include <machine/cpufunc.h>
14 
15 #include <arpa/inet.h>
16 #include <netinet/in.h>
17 
18 #include <err.h>
19 #include <errno.h>
20 #include <pthread.h>
21 #include <pthread_np.h>
22 #include <signal.h>
23 #include <stdbool.h>
24 #include <stdio.h>
25 #include <stdint.h>
26 #include <stdlib.h>
27 #include <string.h>
28 #include <unistd.h>
29 
30 #include "kq_sendrecv_proto.h"
31 
/*
 * Note about the sender start synchronization.
 *
 * We apply a two-stage synchronization.  The first stage uses a pthread
 * condition variable (it sleeps) to wait for the establishment of all
 * connections, which could be slow.  The second stage uses g_nwait
 * of send_globctx; all relevant threads spin on g_nwait.  The main
 * thread spin-waits for all senders to increase g_nwait.  Each sender
 * thread increases g_nwait, then spin-waits for the main thread
 * to reset g_nwait.  In this way, we can make sure that all senders
 * start roughly at the same time.
 */
44 
#ifndef timespecsub
/*
 * In-place timespec subtraction: *vvp -= *uvp.
 *
 * A negative nanosecond difference is fixed up by borrowing one second.
 * Both arguments are evaluated several times, so do not pass expressions
 * with side effects.
 */
#define timespecsub(vvp, uvp)						\
	do {								\
		(vvp)->tv_nsec -= (uvp)->tv_nsec;			\
		(vvp)->tv_sec -= (uvp)->tv_sec;				\
		if ((vvp)->tv_nsec < 0) {				\
			(vvp)->tv_nsec += 1000000000;			\
			(vvp)->tv_sec -= 1;				\
		}							\
	} while (0)
#endif
56 
#ifndef timespeccmp
/*
 * Compare two timespecs using the relational operator `cmp'.
 *
 * The seconds decide, unless they are equal, in which case the
 * nanoseconds decide.  The arguments are evaluated more than once.
 */
#define	timespeccmp(tvp, uvp, cmp)					\
	(((tvp)->tv_sec != (uvp)->tv_sec) ?				\
	    ((tvp)->tv_sec cmp (uvp)->tv_sec) :				\
	    ((tvp)->tv_nsec cmp (uvp)->tv_nsec))
#endif
63 
64 #if 0
65 #define SEND_DEBUG
66 #endif
67 #if 0
68 #define SEND_TIME_DEBUG
69 #endif
70 
/* Default test duration (-l), unit: seconds. */
#define SEND_DUR		10
/* Maximum number of events fetched by a single kevent(2) call. */
#define SEND_EVENT_MAX		64
/* Default size of the per-thread send buffer (-b), unit: bytes. */
#define SEND_BUFLEN		(128 * 1024)

/*
 * A completed 3-way handshake does not guarantee that the remote
 * application is able to accept(2) the connection.  Even worse, the
 * remote network stack may drop the connection silently, i.e. without
 * sending an RST.  In that case a blocking read(2) would not return
 * until the keepalive kicked in, which would take quite some time.
 * To avoid that, a synthetic timeout is applied to the blocking
 * read(2), and the connection is retried whenever the read(2) times
 * out.  This is the default for that timeout (-r).
 */
#define SEND_READTO_MS		1000		/* unit: ms */
86 
/*
 * Alignment of struct conn_ctx: the platform's cache line size where a
 * constant for it is known, otherwise a guess of 64 bytes.
 */
#if defined(__FreeBSD__)
#define SEND_CONN_CTX_ALIGN	CACHE_LINE_SIZE
#elif defined(__DragonFly__)
#define SEND_CONN_CTX_ALIGN	__VM_CACHELINE_SIZE
#else
#define SEND_CONN_CTX_ALIGN	64	/* XXX */
#endif
94 
/*
 * Per-connection state.
 *
 * A connection is created by the main thread, handed over to exactly one
 * sender thread through c_link, and is additionally kept on the global
 * list (c_glob_link), from which the main thread gathers the results
 * once the senders have been joined.
 *
 * NOTE(review): the structure is aligned to SEND_CONN_CTX_ALIGN,
 * presumably to keep the per-connection counters of different senders
 * on different cache lines.
 */
struct conn_ctx {
	/*
	 * Socket of the connection.  -1: not connected yet, or closed after
	 * a send error.  -2: this is not a connection, but the udata of the
	 * sender's kqueue timer.
	 */
	int			c_s;
	/* errno of the failed send; 0 if the connection had no error. */
	int			c_err;
	/* Number of bytes sent on this connection. */
	uint64_t		c_stat;
	/* Time at which c_err was recorded. */
	struct timespec		c_terr;

	/* Link on send_globctx.g_conn. */
	STAILQ_ENTRY(conn_ctx)	c_glob_link;
	/* Link on send_thrctx.t_conn. */
	STAILQ_ENTRY(conn_ctx)	c_link;
	/*
	 * Address of the receiver.  A sin_port of 0 marks the 'end of
	 * connection list' marker.
	 */
	struct sockaddr_in	c_in;
	/* t_id of the sender thread that owns this connection. */
	int			c_thr_id;
} __aligned(SEND_CONN_CTX_ALIGN);

/* List of connections. */
STAILQ_HEAD(conn_ctx_list, conn_ctx);
108 
109 struct send_globctx {
110 	struct conn_ctx_list	g_conn;
111 
112 	int			g_dur;
113 	int			g_nconn;
114 	pthread_mutex_t		g_lock;
115 	pthread_cond_t		g_cond;
116 
117 	volatile u_int		g_nwait;
118 	int			g_readto_ms;	/* unit: ms */
119 	int			g_buflen;
120 	bool			g_sendfile;
121 };
122 
123 struct send_thrctx {
124 	struct conn_ctx_list	t_conn;
125 	pthread_mutex_t		t_lock;
126 	pthread_cond_t		t_cond;
127 
128 	struct send_globctx	*t_glob;
129 	struct timespec		t_start;
130 	struct timespec		t_end;
131 	double			t_run_us;	/* unit: us */
132 
133 	pthread_t		t_tid;
134 	int			t_id;
135 };
136 
/* Query the receivers for the addresses the data should be sent to. */
static void	send_build_addrlist(const struct sockaddr_in *in_arr,
		    int in_arr_cnt, const struct sockaddr_in **daddr0,
		    int *ndaddr0, int readto_ms);
/* Sender thread entry point; the argument is a struct send_thrctx. */
static void	*send_thread(void *xctx);
140 
/*
 * Body of the busy-wait loops: use the platform's spin-wait hint if one
 * is known, otherwise do nothing.
 */
static __inline void
send_spinwait(void)
{
#if defined(__FreeBSD__)
	cpu_spinwait();
#elif defined(__DragonFly__)
	cpu_pause();
#endif
	/* XXX nothing on other platforms */
}
152 
/*
 * Print the command line synopsis to stderr and terminate the process
 * with exit status 2.
 */
static void
usage(const char *cmd)
{
	static const char synopsis[] =
	    "-4 addr4 [-4 addr4 ...] [-p port] "
	    "-c conns [-t nthreads] [-l sec] [-r readto_ms] [-S] [-E] "
	    "[-b buflen] [-B]";

	fprintf(stderr, "%s %s\n", cmd, synopsis);
	exit(2);
}
161 
162 int
163 main(int argc, char *argv[])
164 {
165 	struct send_globctx glob;
166 	struct send_thrctx *ctx_arr, *ctx;
167 	struct sockaddr_in *in_arr, *in;
168 	const struct sockaddr_in *daddr;
169 	struct timespec run, end, start;
170 	double total_run_us, total, conn_min, conn_max;
171 	double jain, jain_res;
172 	int jain_cnt;
173 	struct conn_ctx *conn;
174 	sigset_t sigset;
175 	int opt, i, ncpus;
176 	int in_arr_cnt, in_arr_sz, ndaddr;
177 	int nthr, nconn, dur, readto_ms, buflen;
178 	int log_err, err_cnt, has_minmax;
179 	u_short port = RECV_PORT;
180 	uint32_t idx;
181 	size_t sz;
182 	bool do_sendfile = false, bindcpu = false;
183 
184 	sigemptyset(&sigset);
185 	sigaddset(&sigset, SIGPIPE);
186 	if (sigprocmask(SIG_BLOCK, &sigset, NULL) < 0)
187 		err(1, "sigprocmask failed");
188 
189 	sz = sizeof(ncpus);
190 	if (sysctlbyname("hw.ncpu", &ncpus, &sz, NULL, 0) < 0)
191 		err(1, "sysctl hw.ncpu failed");
192 	nthr = ncpus;
193 
194 	in_arr_sz = 4;
195 	in_arr_cnt = 0;
196 	in_arr = malloc(in_arr_sz * sizeof(struct sockaddr_in));
197 	if (in_arr == NULL)
198 		err(1, "malloc failed");
199 
200 	log_err = 0;
201 	nconn = 0;
202 	dur = SEND_DUR;
203 	readto_ms = SEND_READTO_MS;
204 	buflen = SEND_BUFLEN;
205 
206 	while ((opt = getopt(argc, argv, "4:BESb:c:l:p:r:t:")) != -1) {
207 		switch (opt) {
208 		case '4':
209 			if (in_arr_cnt == in_arr_sz) {
210 				in_arr_sz *= 2;
211 				in_arr = reallocf(in_arr,
212 				    in_arr_sz * sizeof(struct sockaddr_in));
213 				if (in_arr == NULL)
214 					err(1, "reallocf failed");
215 			}
216 			in = &in_arr[in_arr_cnt];
217 			++in_arr_cnt;
218 
219 			memset(in, 0, sizeof(*in));
220 			in->sin_family = AF_INET;
221 			if (inet_pton(AF_INET, optarg, &in->sin_addr) <= 0)
222 				errx(1, "inet_pton failed %s", optarg);
223 			break;
224 
225 		case 'B':
226 			bindcpu = true;
227 			break;
228 
229 		case 'E':
230 			log_err = 1;
231 			break;
232 
233 		case 'S':
234 			do_sendfile = true;
235 			break;
236 
237 		case 'b':
238 			buflen = strtol(optarg, NULL, 10);
239 			if (buflen <= 0)
240 				errx(1, "invalid -b");
241 			break;
242 
243 		case 'c':
244 			nconn = strtol(optarg, NULL, 10);
245 			if (nconn <= 0)
246 				errx(1, "invalid -c");
247 			break;
248 
249 		case 'l':
250 			dur = strtoul(optarg, NULL, 10);
251 			if (dur == 0)
252 				errx(1, "invalid -l");
253 			break;
254 
255 		case 'p':
256 			port = strtoul(optarg, NULL, 10);
257 			break;
258 
259 		case 'r':
260 			readto_ms = strtol(optarg, NULL, 10);
261 			if (readto_ms <= 0)
262 				errx(1, "invalid -r");
263 			break;
264 
265 		case 't':
266 			nthr = strtol(optarg, NULL, 10);
267 			if (nthr <= 0)
268 				errx(1, "invalid -t");
269 			break;
270 
271 		default:
272 			usage(argv[0]);
273 		}
274 	}
275 	if (in_arr_cnt == 0 || nconn == 0)
276 		errx(1, "either -4 or -c are specified");
277 
278 	if (nthr > nconn)
279 		nthr = nconn;
280 
281 	for (i = 0; i < in_arr_cnt; ++i)
282 		in_arr[i].sin_port = htons(port);
283 
284 	ctx_arr = calloc(nthr, sizeof(struct send_thrctx));
285 	if (ctx_arr == NULL)
286 		err(1, "calloc failed");
287 
288 	memset(&glob, 0, sizeof(glob));
289 	STAILQ_INIT(&glob.g_conn);
290 	glob.g_nconn = nconn;
291 	glob.g_nwait = 1; /* count self in */
292 	glob.g_dur = dur;
293 	glob.g_readto_ms = readto_ms;
294 	glob.g_sendfile = do_sendfile;
295 	glob.g_buflen = buflen;
296 	pthread_mutex_init(&glob.g_lock, NULL);
297 	pthread_cond_init(&glob.g_cond, NULL);
298 
299 	pthread_set_name_np(pthread_self(), "main");
300 
301 	/* Build receiver address list */
302 	send_build_addrlist(in_arr, in_arr_cnt, &daddr, &ndaddr, readto_ms);
303 
304 	/*
305 	 * Start senders.
306 	 */
307 	for (i = 0; i < nthr; ++i) {
308 		pthread_attr_t attr;
309 		int error;
310 
311 		ctx = &ctx_arr[i];
312 		STAILQ_INIT(&ctx->t_conn);
313 		ctx->t_id = i;
314 		ctx->t_glob = &glob;
315 		pthread_mutex_init(&ctx->t_lock, NULL);
316 		pthread_cond_init(&ctx->t_cond, NULL);
317 
318 		pthread_attr_init(&attr);
319 		if (bindcpu) {
320 #ifdef __FreeBSD__
321 			cpuset_t mask;
322 #else
323 			cpu_set_t mask;
324 #endif
325 
326 			CPU_ZERO(&mask);
327 			CPU_SET(i % ncpus, &mask);
328 			error = pthread_attr_setaffinity_np(&attr,
329 			    sizeof(mask), &mask);
330 			if (error) {
331 				errc(1, error, "pthread_attr_setaffinity_np "
332 				    "failed");
333 			}
334 		}
335 
336 		error = pthread_create(&ctx->t_tid, &attr, send_thread, ctx);
337 		if (error)
338 			errc(1, error, "pthread_create failed");
339 		pthread_attr_destroy(&attr);
340 	}
341 
342 	/*
343 	 * Distribute connections to senders.
344 	 *
345 	 * NOTE:
346 	 * We start from a random position in the address list, so that the
347 	 * first several receiving servers will not be abused, if the number
348 	 * of connections is small and there are many clients.
349 	 */
350 	idx = arc4random_uniform(ndaddr);
351 	for (i = 0; i < nconn; ++i) {
352 		const struct sockaddr_in *da;
353 
354 		da = &daddr[idx % ndaddr];
355 		++idx;
356 
357 		conn = aligned_alloc(SEND_CONN_CTX_ALIGN, sizeof(*conn));
358 		if (conn == NULL)
359 			err(1, "aligned_alloc failed");
360 		memset(conn, 0, sizeof(*conn));
361 		conn->c_in = *da;
362 		conn->c_s = -1;
363 
364 		ctx = &ctx_arr[i % nthr];
365 		conn->c_thr_id = ctx->t_id;
366 
367 		pthread_mutex_lock(&ctx->t_lock);
368 		STAILQ_INSERT_TAIL(&ctx->t_conn, conn, c_link);
369 		pthread_mutex_unlock(&ctx->t_lock);
370 		pthread_cond_signal(&ctx->t_cond);
371 
372 		/* Add to the global list for results gathering */
373 		STAILQ_INSERT_TAIL(&glob.g_conn, conn, c_glob_link);
374 	}
375 
376 	/*
377 	 * No more connections; notify the senders.
378 	 *
379 	 * NOTE:
380 	 * The marker for 'the end of connection list' has 0 in its
381 	 * c_in.sin_port.
382 	 */
383 	for (i = 0; i < nthr; ++i) {
384 		conn = aligned_alloc(SEND_CONN_CTX_ALIGN, sizeof(*conn));
385 		if (conn == NULL)
386 			err(1, "aligned_alloc failed");
387 		memset(conn, 0, sizeof(*conn));
388 		conn->c_s = -1;
389 
390 		ctx = &ctx_arr[i];
391 		pthread_mutex_lock(&ctx->t_lock);
392 		STAILQ_INSERT_TAIL(&ctx->t_conn, conn, c_link);
393 		pthread_mutex_unlock(&ctx->t_lock);
394 		pthread_cond_signal(&ctx->t_cond);
395 	}
396 
397 	/*
398 	 * Sender start sync, stage 1:
399 	 * Wait for connections establishment (slow).
400 	 */
401 	pthread_mutex_lock(&glob.g_lock);
402 	while (glob.g_nconn != 0)
403 		pthread_cond_wait(&glob.g_cond, &glob.g_lock);
404 	pthread_mutex_unlock(&glob.g_lock);
405 
406 	/*
407 	 * Sender start sync, stage 2:
408 	 * Wait for senders to spin-wait; and once all senders spin-wait,
409 	 * release them by resetting g_nwait.
410 	 */
411 	while (atomic_cmpset_int(&glob.g_nwait, nthr + 1, 0) == 0)
412 		send_spinwait();
413 
414 	fprintf(stderr, "start %d seconds sending test: %d threads, "
415 	    "%d connections\n", dur, nthr, nconn);
416 
417 	/*
418 	 * Wait for the senders to finish and gather the results.
419 	 */
420 
421 	memset(&end, 0, sizeof(end));		/* XXX stupid gcc warning */
422 	memset(&start, 0, sizeof(start));	/* XXX stupid gcc warning */
423 
424 	for (i = 0; i < nthr; ++i) {
425 		ctx = &ctx_arr[i];
426 		pthread_join(ctx->t_tid, NULL);
427 
428 		run = ctx->t_end;
429 		timespecsub(&run, &ctx->t_start);
430 		ctx->t_run_us = ((double)run.tv_sec * 1000000.0) +
431 		    ((double)run.tv_nsec / 1000.0);
432 
433 		if (i == 0) {
434 			start = ctx->t_start;
435 			end = ctx->t_end;
436 		} else {
437 			if (timespeccmp(&start, &ctx->t_start, >))
438 				start = ctx->t_start;
439 			if (timespeccmp(&end, &ctx->t_end, <))
440 				end = ctx->t_end;
441 		}
442 
443 #ifdef SEND_TIME_DEBUG
444 		fprintf(stderr, "start %ld.%ld, end %ld.%ld\n",
445 		    ctx->t_start.tv_sec, ctx->t_start.tv_nsec,
446 		    ctx->t_end.tv_sec, ctx->t_end.tv_nsec);
447 #endif
448 	}
449 
450 #ifdef SEND_TIME_DEBUG
451 	fprintf(stderr, "start %ld.%ld, end %ld.%ld (final)\n",
452 	    start.tv_sec, start.tv_nsec, end.tv_sec, end.tv_nsec);
453 #endif
454 
455 	run = end;
456 	timespecsub(&run, &start);
457 	total_run_us = ((double)run.tv_sec * 1000000.0) +
458 	    ((double)run.tv_nsec / 1000.0);
459 	total = 0.0;
460 
461 	err_cnt = 0;
462 	has_minmax = 0;
463 	conn_min = 0.0;
464 	conn_max = 0.0;
465 
466 	jain = 0.0;
467 	jain_res = 0.0;
468 	jain_cnt = 0;
469 
470 	STAILQ_FOREACH(conn, &glob.g_conn, c_glob_link) {
471 		total += conn->c_stat;
472 		if (conn->c_err == 0) {
473 			double perf;	/* unit: Mbps */
474 
475 			perf = (conn->c_stat * 8.0) /
476 			    ctx_arr[conn->c_thr_id].t_run_us;
477 			if (!has_minmax) {
478 				conn_min = perf;
479 				conn_max = perf;
480 				has_minmax = 1;
481 			} else {
482 				if (perf > conn_max)
483 					conn_max = perf;
484 				if (perf < conn_min)
485 					conn_min = perf;
486 			}
487 			jain += (perf * perf);
488 			jain_res += perf;
489 			++jain_cnt;
490 		} else {
491 			++err_cnt;
492 		}
493 	}
494 
495 	jain *= jain_cnt;
496 	jain = (jain_res * jain_res) / jain;
497 
498 	printf("Total: %.2lf Mbps, min/max %.2lf Mbps/%.2lf Mbps, jain %.2lf, "
499 	    "error %d\n", (total * 8.0) / total_run_us, conn_min, conn_max,
500 	    jain, err_cnt);
501 
502 	if (log_err && err_cnt) {
503 		STAILQ_FOREACH(conn, &glob.g_conn, c_glob_link) {
504 			char name[INET_ADDRSTRLEN];
505 			double tmp_run;
506 
507 			if (conn->c_err == 0)
508 				continue;
509 
510 			run = conn->c_terr;
511 			timespecsub(&run, &ctx_arr[conn->c_thr_id].t_start);
512 			tmp_run = ((double)run.tv_sec * 1000000.0) +
513 			    ((double)run.tv_nsec / 1000.0);
514 			fprintf(stderr, "snd%d ->%s:%d, %ld sec, %.2lf Mbps, "
515 			    "errno %d\n",
516 			    conn->c_thr_id,
517 			    inet_ntop(AF_INET, &conn->c_in.sin_addr,
518 			        name, sizeof(name)),
519 			    ntohs(conn->c_in.sin_port),
520 			    run.tv_sec, (conn->c_stat * 8.0) / tmp_run,
521 			    conn->c_err);
522 			--err_cnt;
523 			if (err_cnt == 0)
524 				break;
525 		}
526 	}
527 
528 	exit(0);
529 }
530 
531 static void
532 send_build_addrlist(const struct sockaddr_in *in_arr, int in_arr_cnt,
533     const struct sockaddr_in **daddr0, int *ndaddr0, int readto_ms)
534 {
535 	struct sockaddr_in *daddr;
536 	struct timeval readto;
537 	int i, ndaddr;
538 
539 	daddr = NULL;
540 	ndaddr = 0;
541 
542 	memset(&readto, 0, sizeof(readto));
543 	readto.tv_sec = readto_ms / 1000;
544 	readto.tv_usec = (readto_ms % 1000) * 1000;
545 
546 	for (i = 0; i < in_arr_cnt; ++i) {
547 		const struct sockaddr_in *in = &in_arr[i];
548 		struct recv_info info_hdr;
549 		uint16_t *ports;
550 		int s, n, ports_sz, d;
551 
552 again:
553 		s = socket(AF_INET, SOCK_STREAM, 0);
554 		if (s < 0)
555 			err(1, "socket failed");
556 
557 		if (connect(s, (const struct sockaddr *)in, sizeof(*in)) < 0)
558 			err(1, "connect failed");
559 
560 		if (setsockopt(s, SOL_SOCKET, SO_RCVTIMEO,
561 		    &readto, sizeof(readto)) < 0)
562 			err(1, "setsockopt(RCVTIMEO) failed");
563 
564 		n = read(s, &info_hdr, sizeof(info_hdr));
565 		if (n != sizeof(info_hdr)) {
566 			if (n < 0) {
567 				if (errno == EAGAIN) {
568 					close(s);
569 					goto again;
570 				}
571 				err(1, "read info hdr failed");
572 			} else {
573 				errx(1, "read truncated info hdr");
574 			}
575 		}
576 		if (info_hdr.ndport == 0) {
577 			close(s);
578 			continue;
579 		}
580 
581 		ports_sz = info_hdr.ndport * sizeof(uint16_t);
582 		ports = malloc(ports_sz);
583 		if (ports == NULL)
584 			err(1, "malloc failed");
585 
586 		n = read(s, ports, ports_sz);
587 		if (n != ports_sz) {
588 			if (n < 0) {
589 				if (errno == EAGAIN) {
590 					free(ports);
591 					close(s);
592 					goto again;
593 				}
594 				err(1, "read ports failed");
595 			} else {
596 				errx(1, "read truncated ports");
597 			}
598 		}
599 
600 		daddr = reallocf(daddr,
601 		    (ndaddr + info_hdr.ndport) * sizeof(struct sockaddr_in));
602 		if (daddr == NULL)
603 			err(1, "reallocf failed");
604 
605 		for (d = ndaddr; d < ndaddr + info_hdr.ndport; ++d) {
606 			struct sockaddr_in *da = &daddr[d];
607 
608 			*da = *in;
609 			da->sin_port = ports[d - ndaddr];
610 		}
611 		ndaddr += info_hdr.ndport;
612 
613 		free(ports);
614 		close(s);
615 	}
616 
617 #ifdef SEND_DEBUG
618 	for (i = 0; i < ndaddr; ++i) {
619 		const struct sockaddr_in *da = &daddr[i];
620 		char name[INET_ADDRSTRLEN];
621 
622 		fprintf(stderr, "%s:%d\n",
623 		    inet_ntop(AF_INET, &da->sin_addr, name, sizeof(name)),
624 		    ntohs(da->sin_port));
625 	}
626 #endif
627 
628 	*daddr0 = daddr;
629 	*ndaddr0 = ndaddr;
630 }
631 
632 static void *
633 send_thread(void *xctx)
634 {
635 	struct send_thrctx *ctx = xctx;
636 	struct conn_ctx *timeo;
637 	struct kevent chg_evt;
638 	uint8_t *buf;
639 	int nconn = 0, kq, n, fd = -1, buflen;
640 	char name[32];
641 
642 	snprintf(name, sizeof(name), "snd%d", ctx->t_id);
643 	pthread_set_name_np(pthread_self(), name);
644 
645 	buflen = ctx->t_glob->g_buflen;
646 	buf = malloc(buflen);
647 	if (buf == NULL)
648 		err(1, "malloc(%d) failed", buflen);
649 
650 	if (ctx->t_glob->g_sendfile) {
651 		char filename[] = "sendtmpXXX";
652 
653 		fd = mkstemp(filename);
654 		if (fd < 0)
655 			err(1, "mkstemp failed");
656 		if (write(fd, buf, buflen) != buflen)
657 			err(1, "write to file failed");
658 		unlink(filename);
659 		free(buf);
660 		buf = NULL;
661 	}
662 
663 	kq = kqueue();
664 	if (kq < 0)
665 		err(1, "kqueue failed");
666 
667 	/*
668 	 * Establish the connections assigned to us and add the
669 	 * established connections to kqueue.
670 	 */
671 	for (;;) {
672 #ifdef SEND_DEBUG
673 		char addr_name[INET_ADDRSTRLEN];
674 #endif
675 		struct timeval readto;
676 		struct conn_ctx *conn;
677 		struct conn_ack ack;
678 		int on;
679 
680 		pthread_mutex_lock(&ctx->t_lock);
681 		while (STAILQ_EMPTY(&ctx->t_conn))
682 			pthread_cond_wait(&ctx->t_cond, &ctx->t_lock);
683 		conn = STAILQ_FIRST(&ctx->t_conn);
684 		STAILQ_REMOVE_HEAD(&ctx->t_conn, c_link);
685 		pthread_mutex_unlock(&ctx->t_lock);
686 
687 		if (conn->c_in.sin_port == 0) {
688 			/*
689 			 * The marker for 'the end of connection list'.
690 			 * See the related comment in main thread.
691 			 *
692 			 * NOTE:
693 			 * We reuse the marker as the udata for the
694 			 * kqueue timer.
695 			 */
696 			timeo = conn;
697 			break;
698 		}
699 
700 		++nconn;
701 #ifdef SEND_DEBUG
702 		fprintf(stderr, "%s %s:%d\n", name,
703 		    inet_ntop(AF_INET, &conn->c_in.sin_addr,
704 		        addr_name, sizeof(addr_name)),
705 		    ntohs(conn->c_in.sin_port));
706 #endif
707 
708 again:
709 		conn->c_s = socket(AF_INET, SOCK_STREAM, 0);
710 		if (conn->c_s < 0)
711 			err(1, "socket failed");
712 
713 		if (connect(conn->c_s, (const struct sockaddr *)&conn->c_in,
714 		    sizeof(conn->c_in)) < 0)
715 			err(1, "connect failed");
716 
717 		memset(&readto, 0, sizeof(readto));
718 		readto.tv_sec = ctx->t_glob->g_readto_ms / 1000;
719 		readto.tv_usec = (ctx->t_glob->g_readto_ms % 1000) * 1000;
720 		if (setsockopt(conn->c_s, SOL_SOCKET, SO_RCVTIMEO, &readto,
721 		    sizeof(readto)) < 0)
722 			err(1, "setsockopt(RCVTIMEO) failed");
723 
724 		n = read(conn->c_s, &ack, sizeof(ack));
725 		if (n != sizeof(ack)) {
726 			if (n < 0) {
727 				if (errno == EAGAIN) {
728 					close(conn->c_s);
729 					goto again;
730 				}
731 				err(1, "read ack failed");
732 			} else {
733 				errx(1, "read truncated ack");
734 			}
735 		}
736 
737 		on = 1;
738 		if (ioctl(conn->c_s, FIONBIO, &on, sizeof(on)) < 0)
739 			err(1, "ioctl(FIONBIO) failed");
740 
741 		EV_SET(&chg_evt, conn->c_s, EVFILT_WRITE, EV_ADD, 0, 0, conn);
742 		n = kevent(kq, &chg_evt, 1, NULL, 0, NULL);
743 		if (n < 0)
744 			err(1, "kevent add failed");
745 	}
746 #ifdef SEND_DEBUG
747 	fprintf(stderr, "%s conn %d\n", name, nconn);
748 #endif
749 
750 	/*
751 	 * Sender start sync, stage 1:
752 	 * Wait for connections establishment (slow).
753 	 */
754 	pthread_mutex_lock(&ctx->t_glob->g_lock);
755 	ctx->t_glob->g_nconn -= nconn;
756 	pthread_cond_broadcast(&ctx->t_glob->g_cond);
757 	while (ctx->t_glob->g_nconn != 0)
758 		pthread_cond_wait(&ctx->t_glob->g_cond, &ctx->t_glob->g_lock);
759 	pthread_mutex_unlock(&ctx->t_glob->g_lock);
760 
761 	/*
762 	 * Sender start sync, stage2.
763 	 */
764 	/* Increase the g_nwait. */
765 	atomic_add_int(&ctx->t_glob->g_nwait, 1);
766 	/* Spin-wait for main thread to release us (reset g_nwait). */
767 	while (ctx->t_glob->g_nwait)
768 		send_spinwait();
769 
770 #ifdef SEND_DEBUG
771 	fprintf(stderr, "%s start\n", name);
772 #endif
773 
774 	/*
775 	 * Wire a kqueue timer, so that the sending can be terminated
776 	 * as requested.
777 	 *
778 	 * NOTE:
779 	 * Set -2 to c_s for timer udata, so we could distinguish it
780 	 * from real connections.
781 	 */
782 	timeo->c_s = -2;
783 	EV_SET(&chg_evt, 0, EVFILT_TIMER, EV_ADD | EV_ONESHOT, 0,
784 	    ctx->t_glob->g_dur * 1000L, timeo);
785 	n = kevent(kq, &chg_evt, 1, NULL, 0, NULL);
786 	if (n < 0)
787 		err(1, "kevent add failed");
788 
789 	clock_gettime(CLOCK_MONOTONIC_PRECISE, &ctx->t_start);
790 	for (;;) {
791 		struct kevent evt[SEND_EVENT_MAX];
792 		int nevt, i;
793 
794 		nevt = kevent(kq, NULL, 0, evt, SEND_EVENT_MAX, NULL);
795 		if (nevt < 0)
796 			err(1, "kevent failed");
797 
798 		for (i = 0; i < nevt; ++i) {
799 			struct conn_ctx *conn = evt[i].udata;
800 
801 			if (conn->c_s < 0) {
802 				if (conn->c_s == -2) {
803 					/* Timer expired */
804 					goto done;
805 				}
806 				continue;
807 			}
808 
809 			if (fd >= 0) {
810 				off_t m, off;
811 				size_t len;
812 
813 				off = conn->c_stat % buflen;
814 				len = buflen - off;
815 
816 				n = sendfile(fd, conn->c_s, off, len, NULL,
817 				    &m, 0);
818 				if (n == 0 || (n < 0 && errno == EAGAIN))
819 					n = m;
820 			} else {
821 				n = write(conn->c_s, buf, buflen);
822 			}
823 
824 			if (n < 0) {
825 				if (errno != EAGAIN) {
826 					conn->c_err = errno;
827 					clock_gettime(CLOCK_MONOTONIC_PRECISE,
828 					    &conn->c_terr);
829 					close(conn->c_s);
830 					conn->c_s = -1;
831 				}
832 			} else {
833 				conn->c_stat += n;
834 			}
835 		}
836 	}
837 done:
838 	clock_gettime(CLOCK_MONOTONIC_PRECISE, &ctx->t_end);
839 
840 	if (fd >= 0)
841 		close(fd);
842 	if (buf != NULL)
843 		free(buf);
844 	return NULL;
845 }
846