1 #include <sys/param.h>
2 #include <sys/event.h>
3 #include <sys/ioctl.h>
4 #include <sys/queue.h>
5 #include <sys/socket.h>
6 #include <sys/sysctl.h>
7 #include <sys/time.h>
8 
9 #include <machine/atomic.h>
10 #ifdef __FreeBSD__
11 #include <machine/cpu.h>
12 #endif
13 #include <machine/cpufunc.h>
14 
15 #include <arpa/inet.h>
16 #include <netinet/in.h>
17 
18 #include <err.h>
19 #include <errno.h>
20 #include <pthread.h>
21 #include <pthread_np.h>
22 #include <signal.h>
23 #include <stdio.h>
24 #include <stdint.h>
25 #include <stdlib.h>
26 #include <string.h>
27 #include <unistd.h>
28 
29 #include "kq_sendrecv_proto.h"
30 
31 /*
32  * Note about the sender start synchronization.
33  *
34  * We apply two stage synchronization.  The first stage uses pthread
35  * condition (it sleeps), which waits for the establishment for all
36  * connections, which could be slow.  The second stage uses g_nwait
37  * of send_globctx; all relevant threads spin on g_nwait.  The main
38  * thread spin-waits for all senders to increase g_nwait.  The sender
39  * thread increases the g_nwait, then it spin-waits for main thread
40  * to reset g_nwait.  In this way, we can make sure that all senders
41  * start roughly at the same time.
42  */
43 
/*
 * Two-operand timespec subtraction used throughout this file:
 * *vvp -= *uvp, borrowing one second when tv_nsec goes negative
 * (both operands are expected to be normalized on entry).
 *
 * Some systems (e.g. FreeBSD 12 and later) export a three-operand
 * timespecsub(tsp, usp, vsp) from <sys/time.h> to userland.  Guarding
 * this definition with '#ifndef timespecsub' would then silently keep
 * the system macro and break every two-operand call in this file, so
 * always replace any existing definition (#undef of an undefined macro
 * is harmless).
 */
#undef timespecsub
#define timespecsub(vvp, uvp)						\
	do {								\
		(vvp)->tv_sec -= (uvp)->tv_sec;				\
		(vvp)->tv_nsec -= (uvp)->tv_nsec;			\
		if ((vvp)->tv_nsec < 0) {				\
			(vvp)->tv_sec--;				\
			(vvp)->tv_nsec += 1000000000;			\
		}							\
	} while (0)

/*
 * Compare two timespecs with the relational operator 'cmp',
 * e.g. timespeccmp(&a, &b, <).
 */
#ifndef timespeccmp
#define	timespeccmp(tvp, uvp, cmp)					\
	(((tvp)->tv_sec == (uvp)->tv_sec) ?				\
	    ((tvp)->tv_nsec cmp (uvp)->tv_nsec) :			\
	    ((tvp)->tv_sec cmp (uvp)->tv_sec))
#endif
62 
/* Change to '#if 1' to trace address lists and connection setup. */
#if 0
#define SEND_DEBUG
#endif
/* Change to '#if 1' to print per-thread start/end timestamps. */
#if 0
#define SEND_TIME_DEBUG
#endif

#define SEND_DUR		10		/* default test length (-l), seconds */
#define SEND_EVENT_MAX		64		/* max events per kevent(2) call */
#define SEND_BUFLEN		(128 * 1024)	/* bytes offered per write(2) */

/*
 * A successful 3-way handshake does not mean that the remote
 * application can accept(2) this connection.  Even worse, the remote
 * network stack may drop the connection silently, i.e. w/o RST.  If
 * that happens, a blocking read(2) would not return until keepalive
 * kicks in, which takes quite some time.  That is obviously not what
 * we want here, so a synthetic timeout (SO_RCVTIMEO) is applied to the
 * blocking read(2), and the connection is retried when it times out.
 */
#define SEND_READTO_MS		1000		/* unit: ms */

/*
 * Alignment of struct conn_ctx.  Presumably chosen as the CPU cache
 * line size so that connections served by different sender threads do
 * not share a cache line; the fallback value of 64 is a guess.
 */
#if defined(__DragonFly__)
#define SEND_CONN_CTX_ALIGN	__VM_CACHELINE_SIZE
#elif defined(__FreeBSD__)
#define SEND_CONN_CTX_ALIGN	CACHE_LINE_SIZE
#else
#define SEND_CONN_CTX_ALIGN	64	/* XXX */
#endif
93 
/*
 * Per-connection state.  Each connection is handed to exactly one
 * sender thread (c_thr_id) through that thread's t_conn list, and is
 * also linked on the global g_conn list so that main() can gather the
 * results after all senders have been joined.  Entries whose c_in has
 * a zero sin_port are not real connections but end-of-list markers,
 * later reused as kqueue timer udata.
 */
struct conn_ctx {
	int			c_s;		/* socket; -1 if none/closed, -2 timer marker */
	int			c_err;		/* errno of the failed write(2), 0 if none */
	uint64_t		c_stat;		/* bytes successfully written */
	struct timespec		c_terr;		/* CLOCK_MONOTONIC_PRECISE time of the error */

	STAILQ_ENTRY(conn_ctx)	c_glob_link;	/* link on send_globctx.g_conn */
	STAILQ_ENTRY(conn_ctx)	c_link;		/* link on send_thrctx.t_conn */
	struct sockaddr_in	c_in;		/* receiver address; port 0 = end marker */
	int			c_thr_id;	/* t_id of the owning sender thread */
} __aligned(SEND_CONN_CTX_ALIGN);

/* List of struct conn_ctx, used for both g_conn and t_conn. */
STAILQ_HEAD(conn_ctx_list, conn_ctx);
107 
/*
 * State shared by the main thread and all sender threads.
 */
struct send_globctx {
	struct conn_ctx_list	g_conn;		/* all connections, via c_glob_link */

	int			g_dur;		/* test duration, seconds (-l) */
	int			g_nconn;	/* connections not yet reported set up */
	pthread_mutex_t		g_lock;		/* protects g_nconn */
	pthread_cond_t		g_cond;		/* broadcast when g_nconn drops */

	/*
	 * Start sync stage 2: starts at 1 (main thread), each ready
	 * sender atomically adds 1, main resets it to 0 to release them.
	 */
	volatile u_int		g_nwait;
	int			g_readto_ms;	/* unit: ms */
};
119 
/*
 * Per sender thread state; main() allocates one per thread.
 */
struct send_thrctx {
	struct conn_ctx_list	t_conn;		/* connections from main, via c_link */
	pthread_mutex_t		t_lock;		/* protects t_conn */
	pthread_cond_t		t_cond;		/* signalled when t_conn grows */

	struct send_globctx	*t_glob;
	struct timespec		t_start;	/* start of the sending phase */
	struct timespec		t_end;		/* end of the sending phase */
	double			t_run_us;	/* unit: us; t_end - t_start, set by main */

	pthread_t		t_tid;
	int			t_id;		/* index of this context in main's array */
};
133 
/* Forward declarations; both functions are defined after main(). */
static void	send_build_addrlist(const struct sockaddr_in *in_arr,
		    int in_arr_cnt, const struct sockaddr_in **daddr0,
		    int *ndaddr0, int readto_ms);
static void	*send_thread(void *xctx);
137 
/*
 * Body of one busy-wait iteration: give the CPU a spin-loop hint where
 * the platform provides one, otherwise do nothing.
 */
static __inline void
send_spinwait(void)
{
#if defined(__FreeBSD__)
	cpu_spinwait();
#elif defined(__DragonFly__)
	cpu_pause();
#endif
	/* XXX other platforms: plain busy loop, no hint */
}
149 
/*
 * Print the command synopsis to stderr and terminate with status 2.
 */
static void
usage(const char *cmd)
{
	(void)fprintf(stderr,
	    "%s -4 addr4 [-4 addr4 ...] [-p port] -c conns "
	    "[-t nthreads] [-l sec] [-r readto_ms] [-E]\n",
	    cmd);
	exit(2);
}
157 
158 int
159 main(int argc, char *argv[])
160 {
161 	struct send_globctx glob;
162 	struct send_thrctx *ctx_arr, *ctx;
163 	struct sockaddr_in *in_arr, *in;
164 	const struct sockaddr_in *daddr;
165 	struct timespec run, end, start;
166 	double total_run_us, total, conn_min, conn_max;
167 	double jain, jain_res;
168 	int jain_cnt;
169 	struct conn_ctx *conn;
170 	sigset_t sigset;
171 	int opt, i;
172 	int in_arr_cnt, in_arr_sz, ndaddr;
173 	int nthr, nconn, dur, readto_ms;
174 	int log_err, err_cnt, has_minmax;
175 	u_short port = RECV_PORT;
176 	uint32_t idx;
177 	size_t sz;
178 
179 	sigemptyset(&sigset);
180 	sigaddset(&sigset, SIGPIPE);
181 	if (sigprocmask(SIG_BLOCK, &sigset, NULL) < 0)
182 		err(1, "sigprocmask failed");
183 
184 	sz = sizeof(nthr);
185 	if (sysctlbyname("hw.ncpu", &nthr, &sz, NULL, 0) < 0)
186 		err(1, "sysctl hw.ncpu failed");
187 
188 	in_arr_sz = 4;
189 	in_arr_cnt = 0;
190 	in_arr = malloc(in_arr_sz * sizeof(struct sockaddr_in));
191 	if (in_arr == NULL)
192 		err(1, "malloc failed");
193 
194 	log_err = 0;
195 	nconn = 0;
196 	dur = SEND_DUR;
197 	readto_ms = SEND_READTO_MS;
198 
199 	while ((opt = getopt(argc, argv, "4:Ec:l:p:r:t:")) != -1) {
200 		switch (opt) {
201 		case '4':
202 			if (in_arr_cnt == in_arr_sz) {
203 				in_arr_sz *= 2;
204 				in_arr = reallocf(in_arr,
205 				    in_arr_sz * sizeof(struct sockaddr_in));
206 				if (in_arr == NULL)
207 					err(1, "reallocf failed");
208 			}
209 			in = &in_arr[in_arr_cnt];
210 			++in_arr_cnt;
211 
212 			memset(in, 0, sizeof(*in));
213 			in->sin_family = AF_INET;
214 			if (inet_pton(AF_INET, optarg, &in->sin_addr) <= 0)
215 				errx(1, "inet_pton failed %s", optarg);
216 			break;
217 
218 		case 'E':
219 			log_err = 1;
220 			break;
221 
222 		case 'c':
223 			nconn = strtol(optarg, NULL, 10);
224 			if (nconn <= 0)
225 				errx(1, "invalid -c");
226 			break;
227 
228 		case 'l':
229 			dur = strtoul(optarg, NULL, 10);
230 			if (dur == 0)
231 				errx(1, "invalid -l");
232 			break;
233 
234 		case 'p':
235 			port = strtoul(optarg, NULL, 10);
236 			break;
237 
238 		case 'r':
239 			readto_ms = strtol(optarg, NULL, 10);
240 			if (readto_ms <= 0)
241 				errx(1, "invalid -r");
242 			break;
243 
244 		case 't':
245 			nthr = strtol(optarg, NULL, 10);
246 			if (nthr <= 0)
247 				errx(1, "invalid -t");
248 			break;
249 
250 		default:
251 			usage(argv[0]);
252 		}
253 	}
254 	if (in_arr_cnt == 0 || nconn == 0)
255 		errx(1, "neither -4 nor -c are specified");
256 
257 	if (nthr > nconn)
258 		nthr = nconn;
259 
260 	for (i = 0; i < in_arr_cnt; ++i)
261 		in_arr[i].sin_port = htons(port);
262 
263 	ctx_arr = calloc(nthr, sizeof(struct send_thrctx));
264 	if (ctx_arr == NULL)
265 		err(1, "calloc failed");
266 
267 	memset(&glob, 0, sizeof(glob));
268 	STAILQ_INIT(&glob.g_conn);
269 	glob.g_nconn = nconn;
270 	glob.g_nwait = 1; /* count self in */
271 	glob.g_dur = dur;
272 	glob.g_readto_ms = readto_ms;
273 	pthread_mutex_init(&glob.g_lock, NULL);
274 	pthread_cond_init(&glob.g_cond, NULL);
275 
276 	pthread_set_name_np(pthread_self(), "main");
277 
278 	/* Build receiver address list */
279 	send_build_addrlist(in_arr, in_arr_cnt, &daddr, &ndaddr, readto_ms);
280 
281 	/*
282 	 * Start senders.
283 	 */
284 	for (i = 0; i < nthr; ++i) {
285 		int error;
286 
287 		ctx = &ctx_arr[i];
288 		STAILQ_INIT(&ctx->t_conn);
289 		ctx->t_id = i;
290 		ctx->t_glob = &glob;
291 		pthread_mutex_init(&ctx->t_lock, NULL);
292 		pthread_cond_init(&ctx->t_cond, NULL);
293 
294 		error = pthread_create(&ctx->t_tid, NULL, send_thread, ctx);
295 		if (error)
296 			errc(1, error, "pthread_create failed");
297 	}
298 
299 	/*
300 	 * Distribute connections to senders.
301 	 *
302 	 * NOTE:
303 	 * We start from a random position in the address list, so that the
304 	 * first several receiving servers will not be abused, if the number
305 	 * of connections is small and there are many clients.
306 	 */
307 	idx = arc4random_uniform(ndaddr);
308 	for (i = 0; i < nconn; ++i) {
309 		const struct sockaddr_in *da;
310 
311 		da = &daddr[idx % ndaddr];
312 		++idx;
313 
314 		conn = aligned_alloc(SEND_CONN_CTX_ALIGN, sizeof(*conn));
315 		if (conn == NULL)
316 			err(1, "aligned_alloc failed");
317 		memset(conn, 0, sizeof(*conn));
318 		conn->c_in = *da;
319 		conn->c_s = -1;
320 
321 		ctx = &ctx_arr[i % nthr];
322 		conn->c_thr_id = ctx->t_id;
323 
324 		pthread_mutex_lock(&ctx->t_lock);
325 		STAILQ_INSERT_TAIL(&ctx->t_conn, conn, c_link);
326 		pthread_mutex_unlock(&ctx->t_lock);
327 		pthread_cond_signal(&ctx->t_cond);
328 
329 		/* Add to the global list for results gathering */
330 		STAILQ_INSERT_TAIL(&glob.g_conn, conn, c_glob_link);
331 	}
332 
333 	/*
334 	 * No more connections; notify the senders.
335 	 *
336 	 * NOTE:
337 	 * The marker for 'the end of connection list' has 0 in its
338 	 * c_in.sin_port.
339 	 */
340 	for (i = 0; i < nthr; ++i) {
341 		conn = aligned_alloc(SEND_CONN_CTX_ALIGN, sizeof(*conn));
342 		if (conn == NULL)
343 			err(1, "aligned_alloc failed");
344 		memset(conn, 0, sizeof(*conn));
345 		conn->c_s = -1;
346 
347 		ctx = &ctx_arr[i];
348 		pthread_mutex_lock(&ctx->t_lock);
349 		STAILQ_INSERT_TAIL(&ctx->t_conn, conn, c_link);
350 		pthread_mutex_unlock(&ctx->t_lock);
351 		pthread_cond_signal(&ctx->t_cond);
352 	}
353 
354 	/*
355 	 * Sender start sync, stage 1:
356 	 * Wait for connections establishment (slow).
357 	 */
358 	pthread_mutex_lock(&glob.g_lock);
359 	while (glob.g_nconn != 0)
360 		pthread_cond_wait(&glob.g_cond, &glob.g_lock);
361 	pthread_mutex_unlock(&glob.g_lock);
362 
363 	/*
364 	 * Sender start sync, stage 2:
365 	 * Wait for senders to spin-wait; and once all senders spin-wait,
366 	 * release them by resetting g_nwait.
367 	 */
368 	while (atomic_cmpset_int(&glob.g_nwait, nthr + 1, 0) == 0)
369 		send_spinwait();
370 
371 	fprintf(stderr, "start %d seconds sending test: %d threads, "
372 	    "%d connections\n", dur, nthr, nconn);
373 
374 	/*
375 	 * Wait for the senders to finish and gather the results.
376 	 */
377 
378 	memset(&end, 0, sizeof(end));		/* XXX stupid gcc warning */
379 	memset(&start, 0, sizeof(start));	/* XXX stupid gcc warning */
380 
381 	for (i = 0; i < nthr; ++i) {
382 		ctx = &ctx_arr[i];
383 		pthread_join(ctx->t_tid, NULL);
384 
385 		run = ctx->t_end;
386 		timespecsub(&run, &ctx->t_start);
387 		ctx->t_run_us = ((double)run.tv_sec * 1000000.0) +
388 		    ((double)run.tv_nsec / 1000.0);
389 
390 		if (i == 0) {
391 			start = ctx->t_start;
392 			end = ctx->t_end;
393 		} else {
394 			if (timespeccmp(&start, &ctx->t_start, >))
395 				start = ctx->t_start;
396 			if (timespeccmp(&end, &ctx->t_end, <))
397 				end = ctx->t_end;
398 		}
399 
400 #ifdef SEND_TIME_DEBUG
401 		fprintf(stderr, "start %ld.%ld, end %ld.%ld\n",
402 		    ctx->t_start.tv_sec, ctx->t_start.tv_nsec,
403 		    ctx->t_end.tv_sec, ctx->t_end.tv_nsec);
404 #endif
405 	}
406 
407 #ifdef SEND_TIME_DEBUG
408 	fprintf(stderr, "start %ld.%ld, end %ld.%ld (final)\n",
409 	    start.tv_sec, start.tv_nsec, end.tv_sec, end.tv_nsec);
410 #endif
411 
412 	run = end;
413 	timespecsub(&run, &start);
414 	total_run_us = ((double)run.tv_sec * 1000000.0) +
415 	    ((double)run.tv_nsec / 1000.0);
416 	total = 0.0;
417 
418 	err_cnt = 0;
419 	has_minmax = 0;
420 	conn_min = 0.0;
421 	conn_max = 0.0;
422 
423 	jain = 0.0;
424 	jain_res = 0.0;
425 	jain_cnt = 0;
426 
427 	STAILQ_FOREACH(conn, &glob.g_conn, c_glob_link) {
428 		total += conn->c_stat;
429 		if (conn->c_err == 0) {
430 			double perf;	/* unit: Mbps */
431 
432 			perf = (conn->c_stat * 8.0) /
433 			    ctx_arr[conn->c_thr_id].t_run_us;
434 			if (!has_minmax) {
435 				conn_min = perf;
436 				conn_max = perf;
437 				has_minmax = 1;
438 			} else {
439 				if (perf > conn_max)
440 					conn_max = perf;
441 				if (perf < conn_min)
442 					conn_min = perf;
443 			}
444 			jain += (perf * perf);
445 			jain_res += perf;
446 			++jain_cnt;
447 		} else {
448 			++err_cnt;
449 		}
450 	}
451 
452 	jain *= jain_cnt;
453 	jain = (jain_res * jain_res) / jain;
454 
455 	printf("Total: %.2lf Mbps, min/max %.2lf Mbps/%.2lf Mbps, jain %.2lf, "
456 	    "error %d\n", (total * 8.0) / total_run_us, conn_min, conn_max,
457 	    jain, err_cnt);
458 
459 	if (log_err && err_cnt) {
460 		STAILQ_FOREACH(conn, &glob.g_conn, c_glob_link) {
461 			char name[INET_ADDRSTRLEN];
462 			double tmp_run;
463 
464 			if (conn->c_err == 0)
465 				continue;
466 
467 			run = conn->c_terr;
468 			timespecsub(&run, &ctx_arr[conn->c_thr_id].t_start);
469 			tmp_run = ((double)run.tv_sec * 1000000.0) +
470 			    ((double)run.tv_nsec / 1000.0);
471 			fprintf(stderr, "snd%d ->%s:%d, %ld sec, %.2lf Mbps, "
472 			    "errno %d\n",
473 			    conn->c_thr_id,
474 			    inet_ntop(AF_INET, &conn->c_in.sin_addr,
475 			        name, sizeof(name)),
476 			    ntohs(conn->c_in.sin_port),
477 			    run.tv_sec, (conn->c_stat * 8.0) / tmp_run,
478 			    conn->c_err);
479 			--err_cnt;
480 			if (err_cnt == 0)
481 				break;
482 		}
483 	}
484 
485 	exit(0);
486 }
487 
488 static void
489 send_build_addrlist(const struct sockaddr_in *in_arr, int in_arr_cnt,
490     const struct sockaddr_in **daddr0, int *ndaddr0, int readto_ms)
491 {
492 	struct sockaddr_in *daddr;
493 	struct timeval readto;
494 	int i, ndaddr;
495 
496 	daddr = NULL;
497 	ndaddr = 0;
498 
499 	memset(&readto, 0, sizeof(readto));
500 	readto.tv_sec = readto_ms / 1000;
501 	readto.tv_usec = (readto_ms % 1000) * 1000;
502 
503 	for (i = 0; i < in_arr_cnt; ++i) {
504 		const struct sockaddr_in *in = &in_arr[i];
505 		struct recv_info info_hdr;
506 		uint16_t *ports;
507 		int s, n, ports_sz, d;
508 
509 again:
510 		s = socket(AF_INET, SOCK_STREAM, 0);
511 		if (s < 0)
512 			err(1, "socket failed");
513 
514 		if (connect(s, (const struct sockaddr *)in, sizeof(*in)) < 0)
515 			err(1, "connect failed");
516 
517 		if (setsockopt(s, SOL_SOCKET, SO_RCVTIMEO,
518 		    &readto, sizeof(readto)) < 0)
519 			err(1, "setsockopt(RCVTIMEO) failed");
520 
521 		n = read(s, &info_hdr, sizeof(info_hdr));
522 		if (n != sizeof(info_hdr)) {
523 			if (n < 0) {
524 				if (errno == EAGAIN) {
525 					close(s);
526 					goto again;
527 				}
528 				err(1, "read info hdr failed");
529 			} else {
530 				errx(1, "read truncated info hdr");
531 			}
532 		}
533 		if (info_hdr.ndport == 0) {
534 			close(s);
535 			continue;
536 		}
537 
538 		ports_sz = info_hdr.ndport * sizeof(uint16_t);
539 		ports = malloc(ports_sz);
540 		if (ports == NULL)
541 			err(1, "malloc failed");
542 
543 		n = read(s, ports, ports_sz);
544 		if (n != ports_sz) {
545 			if (n < 0) {
546 				if (errno == EAGAIN) {
547 					free(ports);
548 					close(s);
549 					goto again;
550 				}
551 				err(1, "read ports failed");
552 			} else {
553 				errx(1, "read truncated ports");
554 			}
555 		}
556 
557 		daddr = reallocf(daddr,
558 		    (ndaddr + info_hdr.ndport) * sizeof(struct sockaddr_in));
559 		if (daddr == NULL)
560 			err(1, "reallocf failed");
561 
562 		for (d = ndaddr; d < ndaddr + info_hdr.ndport; ++d) {
563 			struct sockaddr_in *da = &daddr[d];
564 
565 			*da = *in;
566 			da->sin_port = ports[d - ndaddr];
567 		}
568 		ndaddr += info_hdr.ndport;
569 
570 		free(ports);
571 		close(s);
572 	}
573 
574 #ifdef SEND_DEBUG
575 	for (i = 0; i < ndaddr; ++i) {
576 		const struct sockaddr_in *da = &daddr[i];
577 		char name[INET_ADDRSTRLEN];
578 
579 		fprintf(stderr, "%s:%d\n",
580 		    inet_ntop(AF_INET, &da->sin_addr, name, sizeof(name)),
581 		    ntohs(da->sin_port));
582 	}
583 #endif
584 
585 	*daddr0 = daddr;
586 	*ndaddr0 = ndaddr;
587 }
588 
/*
 * Sender thread body; 'xctx' is this thread's struct send_thrctx.
 *
 * Takes the connections main() queues on t_conn one by one, connects
 * each of them, reads a struct conn_ack from the receiver (reconnecting
 * if the read times out), switches the socket to non-blocking mode and
 * registers it for EVFILT_WRITE on a private kqueue.  The entry whose
 * c_in.sin_port is 0 ends the list and is reused as the udata of a
 * one-shot timer.  After the two-stage start synchronization it writes
 * SEND_BUFLEN-byte chunks to every writable connection until the
 * g_dur-second timer fires, recording t_start/t_end around that phase.
 */
static void *
send_thread(void *xctx)
{
	struct send_thrctx *ctx = xctx;
	struct conn_ctx *timeo;
	struct kevent chg_evt;
	uint8_t *buf;
	int nconn = 0, kq, n;
	char name[32];

	snprintf(name, sizeof(name), "snd%d", ctx->t_id);
	pthread_set_name_np(pthread_self(), name);

	buf = malloc(SEND_BUFLEN);
	if (buf == NULL)
		err(1, "malloc failed");
	/*
	 * The payload content is irrelevant, but do not put uninitialized
	 * heap memory on the wire.
	 */
	memset(buf, 0, SEND_BUFLEN);

	kq = kqueue();
	if (kq < 0)
		err(1, "kqueue failed");

	/*
	 * Establish the connections assigned to us and add the
	 * established connections to kqueue.
	 */
	for (;;) {
#ifdef SEND_DEBUG
		char addr_name[INET_ADDRSTRLEN];
#endif
		struct timeval readto;
		struct conn_ctx *conn;
		struct conn_ack ack;
		int on;

		pthread_mutex_lock(&ctx->t_lock);
		while (STAILQ_EMPTY(&ctx->t_conn))
			pthread_cond_wait(&ctx->t_cond, &ctx->t_lock);
		conn = STAILQ_FIRST(&ctx->t_conn);
		STAILQ_REMOVE_HEAD(&ctx->t_conn, c_link);
		pthread_mutex_unlock(&ctx->t_lock);

		if (conn->c_in.sin_port == 0) {
			/*
			 * The marker for 'the end of connection list'.
			 * See the related comment in main thread.
			 *
			 * NOTE:
			 * We reuse the marker as the udata for the
			 * kqueue timer.
			 */
			timeo = conn;
			break;
		}

		++nconn;
#ifdef SEND_DEBUG
		fprintf(stderr, "%s %s:%d\n", name,
		    inet_ntop(AF_INET, &conn->c_in.sin_addr,
		        addr_name, sizeof(addr_name)),
		    ntohs(conn->c_in.sin_port));
#endif

again:
		conn->c_s = socket(AF_INET, SOCK_STREAM, 0);
		if (conn->c_s < 0)
			err(1, "socket failed");

		if (connect(conn->c_s, (const struct sockaddr *)&conn->c_in,
		    sizeof(conn->c_in)) < 0)
			err(1, "connect failed");

		memset(&readto, 0, sizeof(readto));
		readto.tv_sec = ctx->t_glob->g_readto_ms / 1000;
		readto.tv_usec = (ctx->t_glob->g_readto_ms % 1000) * 1000;
		if (setsockopt(conn->c_s, SOL_SOCKET, SO_RCVTIMEO, &readto,
		    sizeof(readto)) < 0)
			err(1, "setsockopt(RCVTIMEO) failed");

		n = read(conn->c_s, &ack, sizeof(ack));
		if (n != sizeof(ack)) {
			if (n < 0) {
				if (errno == EAGAIN) {
					/* Timed out; reconnect. */
					close(conn->c_s);
					goto again;
				}
				err(1, "read ack failed");
			} else {
				errx(1, "read truncated ack");
			}
		}

		on = 1;
		if (ioctl(conn->c_s, FIONBIO, &on) < 0)
			err(1, "ioctl(FIONBIO) failed");

		EV_SET(&chg_evt, conn->c_s, EVFILT_WRITE, EV_ADD, 0, 0, conn);
		n = kevent(kq, &chg_evt, 1, NULL, 0, NULL);
		if (n < 0)
			err(1, "kevent add failed");
	}
#ifdef SEND_DEBUG
	fprintf(stderr, "%s conn %d\n", name, nconn);
#endif

	/*
	 * Sender start sync, stage 1:
	 * Wait for connections establishment (slow).
	 */
	pthread_mutex_lock(&ctx->t_glob->g_lock);
	ctx->t_glob->g_nconn -= nconn;
	pthread_cond_broadcast(&ctx->t_glob->g_cond);
	while (ctx->t_glob->g_nconn != 0)
		pthread_cond_wait(&ctx->t_glob->g_cond, &ctx->t_glob->g_lock);
	pthread_mutex_unlock(&ctx->t_glob->g_lock);

	/*
	 * Sender start sync, stage2.
	 */
	/* Increase the g_nwait. */
	atomic_add_int(&ctx->t_glob->g_nwait, 1);
	/* Spin-wait for main thread to release us (reset g_nwait). */
	while (ctx->t_glob->g_nwait)
		send_spinwait();

#ifdef SEND_DEBUG
	fprintf(stderr, "%s start\n", name);
#endif

	/*
	 * Wire a kqueue timer, so that the sending can be terminated
	 * as requested.
	 *
	 * NOTE:
	 * Set -2 to c_s for timer udata, so we could distinguish it
	 * from real connections.
	 */
	timeo->c_s = -2;
	EV_SET(&chg_evt, 0, EVFILT_TIMER, EV_ADD | EV_ONESHOT, 0,
	    ctx->t_glob->g_dur * 1000L, timeo);
	n = kevent(kq, &chg_evt, 1, NULL, 0, NULL);
	if (n < 0)
		err(1, "kevent add failed");

	clock_gettime(CLOCK_MONOTONIC_PRECISE, &ctx->t_start);
	for (;;) {
		struct kevent evt[SEND_EVENT_MAX];
		int nevt, i;

		nevt = kevent(kq, NULL, 0, evt, SEND_EVENT_MAX, NULL);
		if (nevt < 0)
			err(1, "kevent failed");

		for (i = 0; i < nevt; ++i) {
			struct conn_ctx *conn = evt[i].udata;

			if (conn->c_s < 0) {
				if (conn->c_s == -2) {
					/* Timer expired */
					goto done;
				}
				/* Connection already failed and closed. */
				continue;
			}

			n = write(conn->c_s, buf, SEND_BUFLEN);
			if (n < 0) {
				if (errno != EAGAIN) {
					conn->c_err = errno;
					clock_gettime(CLOCK_MONOTONIC_PRECISE,
					    &conn->c_terr);
					close(conn->c_s);
					conn->c_s = -1;
				}
			} else {
				conn->c_stat += n;
			}
		}
	}
done:
	clock_gettime(CLOCK_MONOTONIC_PRECISE, &ctx->t_end);

	/*
	 * Release thread-private resources.  The connection sockets are
	 * left open and their conn_ctx kept, since main() still reads
	 * them for the report; the timer marker is no longer referenced
	 * once the kqueue is closed.
	 */
	close(kq);
	free(timeo);
	free(buf);
	return NULL;
}
770