1772669ffSSepherosa Ziehau #include <sys/param.h>
2772669ffSSepherosa Ziehau #include <sys/event.h>
3772669ffSSepherosa Ziehau #include <sys/ioctl.h>
4772669ffSSepherosa Ziehau #include <sys/queue.h>
5772669ffSSepherosa Ziehau #include <sys/socket.h>
6772669ffSSepherosa Ziehau #include <sys/sysctl.h>
7772669ffSSepherosa Ziehau #include <sys/time.h>
8772669ffSSepherosa Ziehau 
9772669ffSSepherosa Ziehau #include <machine/atomic.h>
10772669ffSSepherosa Ziehau #ifdef __FreeBSD__
11772669ffSSepherosa Ziehau #include <machine/cpu.h>
12772669ffSSepherosa Ziehau #endif
13772669ffSSepherosa Ziehau #include <machine/cpufunc.h>
14772669ffSSepherosa Ziehau 
15772669ffSSepherosa Ziehau #include <arpa/inet.h>
16772669ffSSepherosa Ziehau #include <netinet/in.h>
17772669ffSSepherosa Ziehau 
18772669ffSSepherosa Ziehau #include <err.h>
19772669ffSSepherosa Ziehau #include <errno.h>
20772669ffSSepherosa Ziehau #include <pthread.h>
21772669ffSSepherosa Ziehau #include <pthread_np.h>
22772669ffSSepherosa Ziehau #include <signal.h>
23dbeb2594SSepherosa Ziehau #include <stdbool.h>
24772669ffSSepherosa Ziehau #include <stdio.h>
25772669ffSSepherosa Ziehau #include <stdint.h>
26772669ffSSepherosa Ziehau #include <stdlib.h>
27772669ffSSepherosa Ziehau #include <string.h>
28772669ffSSepherosa Ziehau #include <unistd.h>
29772669ffSSepherosa Ziehau 
30772669ffSSepherosa Ziehau #include "kq_sendrecv_proto.h"
31772669ffSSepherosa Ziehau 
32772669ffSSepherosa Ziehau /*
33772669ffSSepherosa Ziehau  * Note about the sender start synchronization.
34772669ffSSepherosa Ziehau  *
35772669ffSSepherosa Ziehau  * We apply two stage synchronization.  The first stage uses pthread
36772669ffSSepherosa Ziehau  * condition (it sleeps), which waits for the establishment for all
37772669ffSSepherosa Ziehau  * connections, which could be slow.  The second stage uses g_nwait
38772669ffSSepherosa Ziehau  * of send_globctx; all relevant threads spin on g_nwait.  The main
39772669ffSSepherosa Ziehau  * thread spin-waits for all senders to increase g_nwait.  The sender
40772669ffSSepherosa Ziehau  * thread increases the g_nwait, then it spin-waits for main thread
41772669ffSSepherosa Ziehau  * to reset g_nwait.  In this way, we can make sure that all senders
42772669ffSSepherosa Ziehau  * start roughly at the same time.
43772669ffSSepherosa Ziehau  */
44772669ffSSepherosa Ziehau 
45772669ffSSepherosa Ziehau #if 0
46772669ffSSepherosa Ziehau #define SEND_DEBUG
47772669ffSSepherosa Ziehau #endif
48772669ffSSepherosa Ziehau #if 0
49772669ffSSepherosa Ziehau #define SEND_TIME_DEBUG
50772669ffSSepherosa Ziehau #endif
51772669ffSSepherosa Ziehau 
52772669ffSSepherosa Ziehau #define SEND_DUR		10
53772669ffSSepherosa Ziehau #define SEND_EVENT_MAX		64
54772669ffSSepherosa Ziehau #define SEND_BUFLEN		(128 * 1024)
55772669ffSSepherosa Ziehau 
56772669ffSSepherosa Ziehau /*
57772669ffSSepherosa Ziehau  * The successful 3-way handshake on the connection does not mean the
58772669ffSSepherosa Ziehau  * remote application can accept(2) this connection.  Even worse, the
59772669ffSSepherosa Ziehau  * remote side's network stack may drop the connection silently, i.e.
60772669ffSSepherosa Ziehau  * w/o RST.  If this happened, the blocking read(2) would not return,
61772669ffSSepherosa Ziehau  * until the keepalive kicked in, which would take quite some time.
62772669ffSSepherosa Ziehau  * This is obviously not what we want here, so use synthetic timeout
63772669ffSSepherosa Ziehau  * for blocking read(2).  Here, we will retry if a blocking read(2)
64772669ffSSepherosa Ziehau  * times out.
65772669ffSSepherosa Ziehau  */
66772669ffSSepherosa Ziehau #define SEND_READTO_MS		1000		/* unit: ms */
67772669ffSSepherosa Ziehau 
68772669ffSSepherosa Ziehau #if defined(__DragonFly__)
69772669ffSSepherosa Ziehau #define SEND_CONN_CTX_ALIGN	__VM_CACHELINE_SIZE
70772669ffSSepherosa Ziehau #elif defined(__FreeBSD__)
71772669ffSSepherosa Ziehau #define SEND_CONN_CTX_ALIGN	CACHE_LINE_SIZE
72772669ffSSepherosa Ziehau #else
73772669ffSSepherosa Ziehau #define SEND_CONN_CTX_ALIGN	64	/* XXX */
74772669ffSSepherosa Ziehau #endif
75772669ffSSepherosa Ziehau 
76772669ffSSepherosa Ziehau struct conn_ctx {
77772669ffSSepherosa Ziehau 	int			c_s;
78772669ffSSepherosa Ziehau 	int			c_err;
79772669ffSSepherosa Ziehau 	uint64_t		c_stat;
80772669ffSSepherosa Ziehau 	struct timespec		c_terr;
81772669ffSSepherosa Ziehau 
82772669ffSSepherosa Ziehau 	STAILQ_ENTRY(conn_ctx)	c_glob_link;
83772669ffSSepherosa Ziehau 	STAILQ_ENTRY(conn_ctx)	c_link;
84772669ffSSepherosa Ziehau 	struct sockaddr_in	c_in;
85772669ffSSepherosa Ziehau 	int			c_thr_id;
86772669ffSSepherosa Ziehau } __aligned(SEND_CONN_CTX_ALIGN);
87772669ffSSepherosa Ziehau 
88772669ffSSepherosa Ziehau STAILQ_HEAD(conn_ctx_list, conn_ctx);
89772669ffSSepherosa Ziehau 
90772669ffSSepherosa Ziehau struct send_globctx {
91772669ffSSepherosa Ziehau 	struct conn_ctx_list	g_conn;
92772669ffSSepherosa Ziehau 
93772669ffSSepherosa Ziehau 	int			g_dur;
94772669ffSSepherosa Ziehau 	int			g_nconn;
95772669ffSSepherosa Ziehau 	pthread_mutex_t		g_lock;
96772669ffSSepherosa Ziehau 	pthread_cond_t		g_cond;
97772669ffSSepherosa Ziehau 
98772669ffSSepherosa Ziehau 	volatile u_int		g_nwait;
99772669ffSSepherosa Ziehau 	int			g_readto_ms;	/* unit: ms */
100a06d7982SSepherosa Ziehau 	int			g_buflen;
101dbeb2594SSepherosa Ziehau 	bool			g_sendfile;
102772669ffSSepherosa Ziehau };
103772669ffSSepherosa Ziehau 
104772669ffSSepherosa Ziehau struct send_thrctx {
105772669ffSSepherosa Ziehau 	struct conn_ctx_list	t_conn;
106772669ffSSepherosa Ziehau 	pthread_mutex_t		t_lock;
107772669ffSSepherosa Ziehau 	pthread_cond_t		t_cond;
108772669ffSSepherosa Ziehau 
109772669ffSSepherosa Ziehau 	struct send_globctx	*t_glob;
110772669ffSSepherosa Ziehau 	struct timespec		t_start;
111772669ffSSepherosa Ziehau 	struct timespec		t_end;
112772669ffSSepherosa Ziehau 	double			t_run_us;	/* unit: us */
113772669ffSSepherosa Ziehau 
114772669ffSSepherosa Ziehau 	pthread_t		t_tid;
115772669ffSSepherosa Ziehau 	int			t_id;
116772669ffSSepherosa Ziehau };
117772669ffSSepherosa Ziehau 
118772669ffSSepherosa Ziehau static void	send_build_addrlist(const struct sockaddr_in *, int,
119772669ffSSepherosa Ziehau 		    const struct sockaddr_in **, int *, int);
120772669ffSSepherosa Ziehau static void	*send_thread(void *);
121772669ffSSepherosa Ziehau 
122772669ffSSepherosa Ziehau static __inline void
send_spinwait(void)123772669ffSSepherosa Ziehau send_spinwait(void)
124772669ffSSepherosa Ziehau {
125772669ffSSepherosa Ziehau #if defined(__DragonFly__)
126772669ffSSepherosa Ziehau 	cpu_pause();
127772669ffSSepherosa Ziehau #elif defined(__FreeBSD__)
128772669ffSSepherosa Ziehau 	cpu_spinwait();
129772669ffSSepherosa Ziehau #else
130772669ffSSepherosa Ziehau 	/* XXX nothing */
131772669ffSSepherosa Ziehau #endif
132772669ffSSepherosa Ziehau }
133772669ffSSepherosa Ziehau 
134772669ffSSepherosa Ziehau static void
usage(const char * cmd)135772669ffSSepherosa Ziehau usage(const char *cmd)
136772669ffSSepherosa Ziehau {
137772669ffSSepherosa Ziehau 	fprintf(stderr, "%s -4 addr4 [-4 addr4 ...] [-p port] "
138a06d7982SSepherosa Ziehau 	    "-c conns [-t nthreads] [-l sec] [-r readto_ms] [-S] [-E] "
1392fe73a22SSepherosa Ziehau 	    "[-b buflen] [-B]\n", cmd);
140772669ffSSepherosa Ziehau 	exit(2);
141772669ffSSepherosa Ziehau }
142772669ffSSepherosa Ziehau 
143772669ffSSepherosa Ziehau int
main(int argc,char * argv[])144772669ffSSepherosa Ziehau main(int argc, char *argv[])
145772669ffSSepherosa Ziehau {
146772669ffSSepherosa Ziehau 	struct send_globctx glob;
147772669ffSSepherosa Ziehau 	struct send_thrctx *ctx_arr, *ctx;
148772669ffSSepherosa Ziehau 	struct sockaddr_in *in_arr, *in;
149772669ffSSepherosa Ziehau 	const struct sockaddr_in *daddr;
150772669ffSSepherosa Ziehau 	struct timespec run, end, start;
151772669ffSSepherosa Ziehau 	double total_run_us, total, conn_min, conn_max;
152772669ffSSepherosa Ziehau 	double jain, jain_res;
153772669ffSSepherosa Ziehau 	int jain_cnt;
154772669ffSSepherosa Ziehau 	struct conn_ctx *conn;
155772669ffSSepherosa Ziehau 	sigset_t sigset;
1562fe73a22SSepherosa Ziehau 	int opt, i, ncpus;
157772669ffSSepherosa Ziehau 	int in_arr_cnt, in_arr_sz, ndaddr;
158a06d7982SSepherosa Ziehau 	int nthr, nconn, dur, readto_ms, buflen;
159772669ffSSepherosa Ziehau 	int log_err, err_cnt, has_minmax;
160772669ffSSepherosa Ziehau 	u_short port = RECV_PORT;
161772669ffSSepherosa Ziehau 	uint32_t idx;
162772669ffSSepherosa Ziehau 	size_t sz;
1632fe73a22SSepherosa Ziehau 	bool do_sendfile = false, bindcpu = false;
164772669ffSSepherosa Ziehau 
165772669ffSSepherosa Ziehau 	sigemptyset(&sigset);
166772669ffSSepherosa Ziehau 	sigaddset(&sigset, SIGPIPE);
167772669ffSSepherosa Ziehau 	if (sigprocmask(SIG_BLOCK, &sigset, NULL) < 0)
168772669ffSSepherosa Ziehau 		err(1, "sigprocmask failed");
169772669ffSSepherosa Ziehau 
1702fe73a22SSepherosa Ziehau 	sz = sizeof(ncpus);
1712fe73a22SSepherosa Ziehau 	if (sysctlbyname("hw.ncpu", &ncpus, &sz, NULL, 0) < 0)
172772669ffSSepherosa Ziehau 		err(1, "sysctl hw.ncpu failed");
1732fe73a22SSepherosa Ziehau 	nthr = ncpus;
174772669ffSSepherosa Ziehau 
175772669ffSSepherosa Ziehau 	in_arr_sz = 4;
176772669ffSSepherosa Ziehau 	in_arr_cnt = 0;
177772669ffSSepherosa Ziehau 	in_arr = malloc(in_arr_sz * sizeof(struct sockaddr_in));
178772669ffSSepherosa Ziehau 	if (in_arr == NULL)
179772669ffSSepherosa Ziehau 		err(1, "malloc failed");
180772669ffSSepherosa Ziehau 
181772669ffSSepherosa Ziehau 	log_err = 0;
182772669ffSSepherosa Ziehau 	nconn = 0;
183772669ffSSepherosa Ziehau 	dur = SEND_DUR;
184772669ffSSepherosa Ziehau 	readto_ms = SEND_READTO_MS;
185a06d7982SSepherosa Ziehau 	buflen = SEND_BUFLEN;
186772669ffSSepherosa Ziehau 
1872fe73a22SSepherosa Ziehau 	while ((opt = getopt(argc, argv, "4:BESb:c:l:p:r:t:")) != -1) {
188772669ffSSepherosa Ziehau 		switch (opt) {
189772669ffSSepherosa Ziehau 		case '4':
190772669ffSSepherosa Ziehau 			if (in_arr_cnt == in_arr_sz) {
191772669ffSSepherosa Ziehau 				in_arr_sz *= 2;
192772669ffSSepherosa Ziehau 				in_arr = reallocf(in_arr,
193772669ffSSepherosa Ziehau 				    in_arr_sz * sizeof(struct sockaddr_in));
194772669ffSSepherosa Ziehau 				if (in_arr == NULL)
195772669ffSSepherosa Ziehau 					err(1, "reallocf failed");
196772669ffSSepherosa Ziehau 			}
197772669ffSSepherosa Ziehau 			in = &in_arr[in_arr_cnt];
198772669ffSSepherosa Ziehau 			++in_arr_cnt;
199772669ffSSepherosa Ziehau 
200772669ffSSepherosa Ziehau 			memset(in, 0, sizeof(*in));
201772669ffSSepherosa Ziehau 			in->sin_family = AF_INET;
202772669ffSSepherosa Ziehau 			if (inet_pton(AF_INET, optarg, &in->sin_addr) <= 0)
203772669ffSSepherosa Ziehau 				errx(1, "inet_pton failed %s", optarg);
204772669ffSSepherosa Ziehau 			break;
205772669ffSSepherosa Ziehau 
2062fe73a22SSepherosa Ziehau 		case 'B':
2072fe73a22SSepherosa Ziehau 			bindcpu = true;
2082fe73a22SSepherosa Ziehau 			break;
2092fe73a22SSepherosa Ziehau 
210772669ffSSepherosa Ziehau 		case 'E':
211772669ffSSepherosa Ziehau 			log_err = 1;
212772669ffSSepherosa Ziehau 			break;
213772669ffSSepherosa Ziehau 
214dbeb2594SSepherosa Ziehau 		case 'S':
215dbeb2594SSepherosa Ziehau 			do_sendfile = true;
216dbeb2594SSepherosa Ziehau 			break;
217dbeb2594SSepherosa Ziehau 
218a06d7982SSepherosa Ziehau 		case 'b':
219a06d7982SSepherosa Ziehau 			buflen = strtol(optarg, NULL, 10);
220a06d7982SSepherosa Ziehau 			if (buflen <= 0)
221a06d7982SSepherosa Ziehau 				errx(1, "invalid -b");
222a06d7982SSepherosa Ziehau 			break;
223a06d7982SSepherosa Ziehau 
224772669ffSSepherosa Ziehau 		case 'c':
225772669ffSSepherosa Ziehau 			nconn = strtol(optarg, NULL, 10);
226772669ffSSepherosa Ziehau 			if (nconn <= 0)
227772669ffSSepherosa Ziehau 				errx(1, "invalid -c");
228772669ffSSepherosa Ziehau 			break;
229772669ffSSepherosa Ziehau 
230772669ffSSepherosa Ziehau 		case 'l':
231772669ffSSepherosa Ziehau 			dur = strtoul(optarg, NULL, 10);
232772669ffSSepherosa Ziehau 			if (dur == 0)
233772669ffSSepherosa Ziehau 				errx(1, "invalid -l");
234772669ffSSepherosa Ziehau 			break;
235772669ffSSepherosa Ziehau 
236772669ffSSepherosa Ziehau 		case 'p':
237772669ffSSepherosa Ziehau 			port = strtoul(optarg, NULL, 10);
238772669ffSSepherosa Ziehau 			break;
239772669ffSSepherosa Ziehau 
240772669ffSSepherosa Ziehau 		case 'r':
241772669ffSSepherosa Ziehau 			readto_ms = strtol(optarg, NULL, 10);
242772669ffSSepherosa Ziehau 			if (readto_ms <= 0)
243772669ffSSepherosa Ziehau 				errx(1, "invalid -r");
244772669ffSSepherosa Ziehau 			break;
245772669ffSSepherosa Ziehau 
246772669ffSSepherosa Ziehau 		case 't':
247772669ffSSepherosa Ziehau 			nthr = strtol(optarg, NULL, 10);
248772669ffSSepherosa Ziehau 			if (nthr <= 0)
249772669ffSSepherosa Ziehau 				errx(1, "invalid -t");
250772669ffSSepherosa Ziehau 			break;
251772669ffSSepherosa Ziehau 
252772669ffSSepherosa Ziehau 		default:
253772669ffSSepherosa Ziehau 			usage(argv[0]);
254772669ffSSepherosa Ziehau 		}
255772669ffSSepherosa Ziehau 	}
256772669ffSSepherosa Ziehau 	if (in_arr_cnt == 0 || nconn == 0)
257dbeb2594SSepherosa Ziehau 		errx(1, "either -4 or -c are specified");
258772669ffSSepherosa Ziehau 
259772669ffSSepherosa Ziehau 	if (nthr > nconn)
260772669ffSSepherosa Ziehau 		nthr = nconn;
261772669ffSSepherosa Ziehau 
262772669ffSSepherosa Ziehau 	for (i = 0; i < in_arr_cnt; ++i)
263772669ffSSepherosa Ziehau 		in_arr[i].sin_port = htons(port);
264772669ffSSepherosa Ziehau 
265772669ffSSepherosa Ziehau 	ctx_arr = calloc(nthr, sizeof(struct send_thrctx));
266772669ffSSepherosa Ziehau 	if (ctx_arr == NULL)
267772669ffSSepherosa Ziehau 		err(1, "calloc failed");
268772669ffSSepherosa Ziehau 
269772669ffSSepherosa Ziehau 	memset(&glob, 0, sizeof(glob));
270772669ffSSepherosa Ziehau 	STAILQ_INIT(&glob.g_conn);
271772669ffSSepherosa Ziehau 	glob.g_nconn = nconn;
272772669ffSSepherosa Ziehau 	glob.g_nwait = 1; /* count self in */
273772669ffSSepherosa Ziehau 	glob.g_dur = dur;
274772669ffSSepherosa Ziehau 	glob.g_readto_ms = readto_ms;
275dbeb2594SSepherosa Ziehau 	glob.g_sendfile = do_sendfile;
276a06d7982SSepherosa Ziehau 	glob.g_buflen = buflen;
277772669ffSSepherosa Ziehau 	pthread_mutex_init(&glob.g_lock, NULL);
278772669ffSSepherosa Ziehau 	pthread_cond_init(&glob.g_cond, NULL);
279772669ffSSepherosa Ziehau 
280772669ffSSepherosa Ziehau 	pthread_set_name_np(pthread_self(), "main");
281772669ffSSepherosa Ziehau 
282772669ffSSepherosa Ziehau 	/* Build receiver address list */
283772669ffSSepherosa Ziehau 	send_build_addrlist(in_arr, in_arr_cnt, &daddr, &ndaddr, readto_ms);
284772669ffSSepherosa Ziehau 
285772669ffSSepherosa Ziehau 	/*
286772669ffSSepherosa Ziehau 	 * Start senders.
287772669ffSSepherosa Ziehau 	 */
288772669ffSSepherosa Ziehau 	for (i = 0; i < nthr; ++i) {
2892fe73a22SSepherosa Ziehau 		pthread_attr_t attr;
290772669ffSSepherosa Ziehau 		int error;
291772669ffSSepherosa Ziehau 
292772669ffSSepherosa Ziehau 		ctx = &ctx_arr[i];
293772669ffSSepherosa Ziehau 		STAILQ_INIT(&ctx->t_conn);
294772669ffSSepherosa Ziehau 		ctx->t_id = i;
295772669ffSSepherosa Ziehau 		ctx->t_glob = &glob;
296772669ffSSepherosa Ziehau 		pthread_mutex_init(&ctx->t_lock, NULL);
297772669ffSSepherosa Ziehau 		pthread_cond_init(&ctx->t_cond, NULL);
298772669ffSSepherosa Ziehau 
2992fe73a22SSepherosa Ziehau 		pthread_attr_init(&attr);
3002fe73a22SSepherosa Ziehau 		if (bindcpu) {
30112439f34SSepherosa Ziehau #ifdef __FreeBSD__
30212439f34SSepherosa Ziehau 			cpuset_t mask;
30312439f34SSepherosa Ziehau #else
3042fe73a22SSepherosa Ziehau 			cpu_set_t mask;
30512439f34SSepherosa Ziehau #endif
3062fe73a22SSepherosa Ziehau 
3072fe73a22SSepherosa Ziehau 			CPU_ZERO(&mask);
3082fe73a22SSepherosa Ziehau 			CPU_SET(i % ncpus, &mask);
3092fe73a22SSepherosa Ziehau 			error = pthread_attr_setaffinity_np(&attr,
3102fe73a22SSepherosa Ziehau 			    sizeof(mask), &mask);
3112fe73a22SSepherosa Ziehau 			if (error) {
3122fe73a22SSepherosa Ziehau 				errc(1, error, "pthread_attr_setaffinity_np "
3132fe73a22SSepherosa Ziehau 				    "failed");
3142fe73a22SSepherosa Ziehau 			}
3152fe73a22SSepherosa Ziehau 		}
3162fe73a22SSepherosa Ziehau 
3172fe73a22SSepherosa Ziehau 		error = pthread_create(&ctx->t_tid, &attr, send_thread, ctx);
318772669ffSSepherosa Ziehau 		if (error)
319772669ffSSepherosa Ziehau 			errc(1, error, "pthread_create failed");
3202fe73a22SSepherosa Ziehau 		pthread_attr_destroy(&attr);
321772669ffSSepherosa Ziehau 	}
322772669ffSSepherosa Ziehau 
323772669ffSSepherosa Ziehau 	/*
324772669ffSSepherosa Ziehau 	 * Distribute connections to senders.
325772669ffSSepherosa Ziehau 	 *
326772669ffSSepherosa Ziehau 	 * NOTE:
327772669ffSSepherosa Ziehau 	 * We start from a random position in the address list, so that the
328772669ffSSepherosa Ziehau 	 * first several receiving servers will not be abused, if the number
329772669ffSSepherosa Ziehau 	 * of connections is small and there are many clients.
330772669ffSSepherosa Ziehau 	 */
331772669ffSSepherosa Ziehau 	idx = arc4random_uniform(ndaddr);
332772669ffSSepherosa Ziehau 	for (i = 0; i < nconn; ++i) {
333772669ffSSepherosa Ziehau 		const struct sockaddr_in *da;
334772669ffSSepherosa Ziehau 
335772669ffSSepherosa Ziehau 		da = &daddr[idx % ndaddr];
336772669ffSSepherosa Ziehau 		++idx;
337772669ffSSepherosa Ziehau 
338772669ffSSepherosa Ziehau 		conn = aligned_alloc(SEND_CONN_CTX_ALIGN, sizeof(*conn));
339772669ffSSepherosa Ziehau 		if (conn == NULL)
340772669ffSSepherosa Ziehau 			err(1, "aligned_alloc failed");
341772669ffSSepherosa Ziehau 		memset(conn, 0, sizeof(*conn));
342772669ffSSepherosa Ziehau 		conn->c_in = *da;
343772669ffSSepherosa Ziehau 		conn->c_s = -1;
344772669ffSSepherosa Ziehau 
345772669ffSSepherosa Ziehau 		ctx = &ctx_arr[i % nthr];
346772669ffSSepherosa Ziehau 		conn->c_thr_id = ctx->t_id;
347772669ffSSepherosa Ziehau 
348772669ffSSepherosa Ziehau 		pthread_mutex_lock(&ctx->t_lock);
349772669ffSSepherosa Ziehau 		STAILQ_INSERT_TAIL(&ctx->t_conn, conn, c_link);
350772669ffSSepherosa Ziehau 		pthread_mutex_unlock(&ctx->t_lock);
351772669ffSSepherosa Ziehau 		pthread_cond_signal(&ctx->t_cond);
352772669ffSSepherosa Ziehau 
353772669ffSSepherosa Ziehau 		/* Add to the global list for results gathering */
354772669ffSSepherosa Ziehau 		STAILQ_INSERT_TAIL(&glob.g_conn, conn, c_glob_link);
355772669ffSSepherosa Ziehau 	}
356772669ffSSepherosa Ziehau 
357772669ffSSepherosa Ziehau 	/*
358772669ffSSepherosa Ziehau 	 * No more connections; notify the senders.
359772669ffSSepherosa Ziehau 	 *
360772669ffSSepherosa Ziehau 	 * NOTE:
361772669ffSSepherosa Ziehau 	 * The marker for 'the end of connection list' has 0 in its
362772669ffSSepherosa Ziehau 	 * c_in.sin_port.
363772669ffSSepherosa Ziehau 	 */
364772669ffSSepherosa Ziehau 	for (i = 0; i < nthr; ++i) {
365772669ffSSepherosa Ziehau 		conn = aligned_alloc(SEND_CONN_CTX_ALIGN, sizeof(*conn));
366772669ffSSepherosa Ziehau 		if (conn == NULL)
367772669ffSSepherosa Ziehau 			err(1, "aligned_alloc failed");
368772669ffSSepherosa Ziehau 		memset(conn, 0, sizeof(*conn));
369772669ffSSepherosa Ziehau 		conn->c_s = -1;
370772669ffSSepherosa Ziehau 
371772669ffSSepherosa Ziehau 		ctx = &ctx_arr[i];
372772669ffSSepherosa Ziehau 		pthread_mutex_lock(&ctx->t_lock);
373772669ffSSepherosa Ziehau 		STAILQ_INSERT_TAIL(&ctx->t_conn, conn, c_link);
374772669ffSSepherosa Ziehau 		pthread_mutex_unlock(&ctx->t_lock);
375772669ffSSepherosa Ziehau 		pthread_cond_signal(&ctx->t_cond);
376772669ffSSepherosa Ziehau 	}
377772669ffSSepherosa Ziehau 
378772669ffSSepherosa Ziehau 	/*
379772669ffSSepherosa Ziehau 	 * Sender start sync, stage 1:
380772669ffSSepherosa Ziehau 	 * Wait for connections establishment (slow).
381772669ffSSepherosa Ziehau 	 */
382772669ffSSepherosa Ziehau 	pthread_mutex_lock(&glob.g_lock);
383772669ffSSepherosa Ziehau 	while (glob.g_nconn != 0)
384772669ffSSepherosa Ziehau 		pthread_cond_wait(&glob.g_cond, &glob.g_lock);
385772669ffSSepherosa Ziehau 	pthread_mutex_unlock(&glob.g_lock);
386772669ffSSepherosa Ziehau 
387772669ffSSepherosa Ziehau 	/*
388772669ffSSepherosa Ziehau 	 * Sender start sync, stage 2:
389772669ffSSepherosa Ziehau 	 * Wait for senders to spin-wait; and once all senders spin-wait,
390772669ffSSepherosa Ziehau 	 * release them by resetting g_nwait.
391772669ffSSepherosa Ziehau 	 */
392772669ffSSepherosa Ziehau 	while (atomic_cmpset_int(&glob.g_nwait, nthr + 1, 0) == 0)
393772669ffSSepherosa Ziehau 		send_spinwait();
394772669ffSSepherosa Ziehau 
395772669ffSSepherosa Ziehau 	fprintf(stderr, "start %d seconds sending test: %d threads, "
396772669ffSSepherosa Ziehau 	    "%d connections\n", dur, nthr, nconn);
397772669ffSSepherosa Ziehau 
398772669ffSSepherosa Ziehau 	/*
399772669ffSSepherosa Ziehau 	 * Wait for the senders to finish and gather the results.
400772669ffSSepherosa Ziehau 	 */
401772669ffSSepherosa Ziehau 
402772669ffSSepherosa Ziehau 	memset(&end, 0, sizeof(end));		/* XXX stupid gcc warning */
403772669ffSSepherosa Ziehau 	memset(&start, 0, sizeof(start));	/* XXX stupid gcc warning */
404772669ffSSepherosa Ziehau 
405772669ffSSepherosa Ziehau 	for (i = 0; i < nthr; ++i) {
406772669ffSSepherosa Ziehau 		ctx = &ctx_arr[i];
407772669ffSSepherosa Ziehau 		pthread_join(ctx->t_tid, NULL);
408772669ffSSepherosa Ziehau 
409*944cd60cSSascha Wildner 		timespecsub(&ctx->t_end, &ctx->t_start, &run);
410772669ffSSepherosa Ziehau 		ctx->t_run_us = ((double)run.tv_sec * 1000000.0) +
411772669ffSSepherosa Ziehau 		    ((double)run.tv_nsec / 1000.0);
412772669ffSSepherosa Ziehau 
413772669ffSSepherosa Ziehau 		if (i == 0) {
414772669ffSSepherosa Ziehau 			start = ctx->t_start;
415772669ffSSepherosa Ziehau 			end = ctx->t_end;
416772669ffSSepherosa Ziehau 		} else {
417772669ffSSepherosa Ziehau 			if (timespeccmp(&start, &ctx->t_start, >))
418772669ffSSepherosa Ziehau 				start = ctx->t_start;
419772669ffSSepherosa Ziehau 			if (timespeccmp(&end, &ctx->t_end, <))
420772669ffSSepherosa Ziehau 				end = ctx->t_end;
421772669ffSSepherosa Ziehau 		}
422772669ffSSepherosa Ziehau 
423772669ffSSepherosa Ziehau #ifdef SEND_TIME_DEBUG
424772669ffSSepherosa Ziehau 		fprintf(stderr, "start %ld.%ld, end %ld.%ld\n",
425772669ffSSepherosa Ziehau 		    ctx->t_start.tv_sec, ctx->t_start.tv_nsec,
426772669ffSSepherosa Ziehau 		    ctx->t_end.tv_sec, ctx->t_end.tv_nsec);
427772669ffSSepherosa Ziehau #endif
428772669ffSSepherosa Ziehau 	}
429772669ffSSepherosa Ziehau 
430772669ffSSepherosa Ziehau #ifdef SEND_TIME_DEBUG
431772669ffSSepherosa Ziehau 	fprintf(stderr, "start %ld.%ld, end %ld.%ld (final)\n",
432772669ffSSepherosa Ziehau 	    start.tv_sec, start.tv_nsec, end.tv_sec, end.tv_nsec);
433772669ffSSepherosa Ziehau #endif
434772669ffSSepherosa Ziehau 
435*944cd60cSSascha Wildner 	timespecsub(&end, &start, &run);
436772669ffSSepherosa Ziehau 	total_run_us = ((double)run.tv_sec * 1000000.0) +
437772669ffSSepherosa Ziehau 	    ((double)run.tv_nsec / 1000.0);
438772669ffSSepherosa Ziehau 	total = 0.0;
439772669ffSSepherosa Ziehau 
440772669ffSSepherosa Ziehau 	err_cnt = 0;
441772669ffSSepherosa Ziehau 	has_minmax = 0;
442772669ffSSepherosa Ziehau 	conn_min = 0.0;
443772669ffSSepherosa Ziehau 	conn_max = 0.0;
444772669ffSSepherosa Ziehau 
445772669ffSSepherosa Ziehau 	jain = 0.0;
446772669ffSSepherosa Ziehau 	jain_res = 0.0;
447772669ffSSepherosa Ziehau 	jain_cnt = 0;
448772669ffSSepherosa Ziehau 
449772669ffSSepherosa Ziehau 	STAILQ_FOREACH(conn, &glob.g_conn, c_glob_link) {
450772669ffSSepherosa Ziehau 		total += conn->c_stat;
451772669ffSSepherosa Ziehau 		if (conn->c_err == 0) {
452772669ffSSepherosa Ziehau 			double perf;	/* unit: Mbps */
453772669ffSSepherosa Ziehau 
454772669ffSSepherosa Ziehau 			perf = (conn->c_stat * 8.0) /
455772669ffSSepherosa Ziehau 			    ctx_arr[conn->c_thr_id].t_run_us;
456772669ffSSepherosa Ziehau 			if (!has_minmax) {
457772669ffSSepherosa Ziehau 				conn_min = perf;
458772669ffSSepherosa Ziehau 				conn_max = perf;
459772669ffSSepherosa Ziehau 				has_minmax = 1;
460772669ffSSepherosa Ziehau 			} else {
461772669ffSSepherosa Ziehau 				if (perf > conn_max)
462772669ffSSepherosa Ziehau 					conn_max = perf;
463772669ffSSepherosa Ziehau 				if (perf < conn_min)
464772669ffSSepherosa Ziehau 					conn_min = perf;
465772669ffSSepherosa Ziehau 			}
466772669ffSSepherosa Ziehau 			jain += (perf * perf);
467772669ffSSepherosa Ziehau 			jain_res += perf;
468772669ffSSepherosa Ziehau 			++jain_cnt;
469772669ffSSepherosa Ziehau 		} else {
470772669ffSSepherosa Ziehau 			++err_cnt;
471772669ffSSepherosa Ziehau 		}
472772669ffSSepherosa Ziehau 	}
473772669ffSSepherosa Ziehau 
474772669ffSSepherosa Ziehau 	jain *= jain_cnt;
475772669ffSSepherosa Ziehau 	jain = (jain_res * jain_res) / jain;
476772669ffSSepherosa Ziehau 
477772669ffSSepherosa Ziehau 	printf("Total: %.2lf Mbps, min/max %.2lf Mbps/%.2lf Mbps, jain %.2lf, "
478772669ffSSepherosa Ziehau 	    "error %d\n", (total * 8.0) / total_run_us, conn_min, conn_max,
479772669ffSSepherosa Ziehau 	    jain, err_cnt);
480772669ffSSepherosa Ziehau 
481772669ffSSepherosa Ziehau 	if (log_err && err_cnt) {
482772669ffSSepherosa Ziehau 		STAILQ_FOREACH(conn, &glob.g_conn, c_glob_link) {
483772669ffSSepherosa Ziehau 			char name[INET_ADDRSTRLEN];
484772669ffSSepherosa Ziehau 			double tmp_run;
485772669ffSSepherosa Ziehau 
486772669ffSSepherosa Ziehau 			if (conn->c_err == 0)
487772669ffSSepherosa Ziehau 				continue;
488772669ffSSepherosa Ziehau 
489772669ffSSepherosa Ziehau 			run = conn->c_terr;
490*944cd60cSSascha Wildner 			timespecsub(&conn->c_terr,
491*944cd60cSSascha Wildner 			    &ctx_arr[conn->c_thr_id].t_start, &run);
492772669ffSSepherosa Ziehau 			tmp_run = ((double)run.tv_sec * 1000000.0) +
493772669ffSSepherosa Ziehau 			    ((double)run.tv_nsec / 1000.0);
494772669ffSSepherosa Ziehau 			fprintf(stderr, "snd%d ->%s:%d, %ld sec, %.2lf Mbps, "
495772669ffSSepherosa Ziehau 			    "errno %d\n",
496772669ffSSepherosa Ziehau 			    conn->c_thr_id,
497772669ffSSepherosa Ziehau 			    inet_ntop(AF_INET, &conn->c_in.sin_addr,
498772669ffSSepherosa Ziehau 			        name, sizeof(name)),
499772669ffSSepherosa Ziehau 			    ntohs(conn->c_in.sin_port),
500772669ffSSepherosa Ziehau 			    run.tv_sec, (conn->c_stat * 8.0) / tmp_run,
501772669ffSSepherosa Ziehau 			    conn->c_err);
502772669ffSSepherosa Ziehau 			--err_cnt;
503772669ffSSepherosa Ziehau 			if (err_cnt == 0)
504772669ffSSepherosa Ziehau 				break;
505772669ffSSepherosa Ziehau 		}
506772669ffSSepherosa Ziehau 	}
507772669ffSSepherosa Ziehau 
508772669ffSSepherosa Ziehau 	exit(0);
509772669ffSSepherosa Ziehau }
510772669ffSSepherosa Ziehau 
511772669ffSSepherosa Ziehau static void
send_build_addrlist(const struct sockaddr_in * in_arr,int in_arr_cnt,const struct sockaddr_in ** daddr0,int * ndaddr0,int readto_ms)512772669ffSSepherosa Ziehau send_build_addrlist(const struct sockaddr_in *in_arr, int in_arr_cnt,
513772669ffSSepherosa Ziehau     const struct sockaddr_in **daddr0, int *ndaddr0, int readto_ms)
514772669ffSSepherosa Ziehau {
515772669ffSSepherosa Ziehau 	struct sockaddr_in *daddr;
516772669ffSSepherosa Ziehau 	struct timeval readto;
517772669ffSSepherosa Ziehau 	int i, ndaddr;
518772669ffSSepherosa Ziehau 
519772669ffSSepherosa Ziehau 	daddr = NULL;
520772669ffSSepherosa Ziehau 	ndaddr = 0;
521772669ffSSepherosa Ziehau 
522772669ffSSepherosa Ziehau 	memset(&readto, 0, sizeof(readto));
523772669ffSSepherosa Ziehau 	readto.tv_sec = readto_ms / 1000;
524772669ffSSepherosa Ziehau 	readto.tv_usec = (readto_ms % 1000) * 1000;
525772669ffSSepherosa Ziehau 
526772669ffSSepherosa Ziehau 	for (i = 0; i < in_arr_cnt; ++i) {
527772669ffSSepherosa Ziehau 		const struct sockaddr_in *in = &in_arr[i];
528772669ffSSepherosa Ziehau 		struct recv_info info_hdr;
529772669ffSSepherosa Ziehau 		uint16_t *ports;
530772669ffSSepherosa Ziehau 		int s, n, ports_sz, d;
531772669ffSSepherosa Ziehau 
532772669ffSSepherosa Ziehau again:
533772669ffSSepherosa Ziehau 		s = socket(AF_INET, SOCK_STREAM, 0);
534772669ffSSepherosa Ziehau 		if (s < 0)
535772669ffSSepherosa Ziehau 			err(1, "socket failed");
536772669ffSSepherosa Ziehau 
537772669ffSSepherosa Ziehau 		if (connect(s, (const struct sockaddr *)in, sizeof(*in)) < 0)
538772669ffSSepherosa Ziehau 			err(1, "connect failed");
539772669ffSSepherosa Ziehau 
540772669ffSSepherosa Ziehau 		if (setsockopt(s, SOL_SOCKET, SO_RCVTIMEO,
541772669ffSSepherosa Ziehau 		    &readto, sizeof(readto)) < 0)
542772669ffSSepherosa Ziehau 			err(1, "setsockopt(RCVTIMEO) failed");
543772669ffSSepherosa Ziehau 
544772669ffSSepherosa Ziehau 		n = read(s, &info_hdr, sizeof(info_hdr));
545772669ffSSepherosa Ziehau 		if (n != sizeof(info_hdr)) {
546772669ffSSepherosa Ziehau 			if (n < 0) {
547772669ffSSepherosa Ziehau 				if (errno == EAGAIN) {
548772669ffSSepherosa Ziehau 					close(s);
549772669ffSSepherosa Ziehau 					goto again;
550772669ffSSepherosa Ziehau 				}
551772669ffSSepherosa Ziehau 				err(1, "read info hdr failed");
552772669ffSSepherosa Ziehau 			} else {
553772669ffSSepherosa Ziehau 				errx(1, "read truncated info hdr");
554772669ffSSepherosa Ziehau 			}
555772669ffSSepherosa Ziehau 		}
556772669ffSSepherosa Ziehau 		if (info_hdr.ndport == 0) {
557772669ffSSepherosa Ziehau 			close(s);
558772669ffSSepherosa Ziehau 			continue;
559772669ffSSepherosa Ziehau 		}
560772669ffSSepherosa Ziehau 
561772669ffSSepherosa Ziehau 		ports_sz = info_hdr.ndport * sizeof(uint16_t);
562772669ffSSepherosa Ziehau 		ports = malloc(ports_sz);
563772669ffSSepherosa Ziehau 		if (ports == NULL)
564772669ffSSepherosa Ziehau 			err(1, "malloc failed");
565772669ffSSepherosa Ziehau 
566772669ffSSepherosa Ziehau 		n = read(s, ports, ports_sz);
567772669ffSSepherosa Ziehau 		if (n != ports_sz) {
568772669ffSSepherosa Ziehau 			if (n < 0) {
569772669ffSSepherosa Ziehau 				if (errno == EAGAIN) {
570772669ffSSepherosa Ziehau 					free(ports);
571772669ffSSepherosa Ziehau 					close(s);
572772669ffSSepherosa Ziehau 					goto again;
573772669ffSSepherosa Ziehau 				}
574772669ffSSepherosa Ziehau 				err(1, "read ports failed");
575772669ffSSepherosa Ziehau 			} else {
576772669ffSSepherosa Ziehau 				errx(1, "read truncated ports");
577772669ffSSepherosa Ziehau 			}
578772669ffSSepherosa Ziehau 		}
579772669ffSSepherosa Ziehau 
580772669ffSSepherosa Ziehau 		daddr = reallocf(daddr,
581772669ffSSepherosa Ziehau 		    (ndaddr + info_hdr.ndport) * sizeof(struct sockaddr_in));
582772669ffSSepherosa Ziehau 		if (daddr == NULL)
583772669ffSSepherosa Ziehau 			err(1, "reallocf failed");
584772669ffSSepherosa Ziehau 
585772669ffSSepherosa Ziehau 		for (d = ndaddr; d < ndaddr + info_hdr.ndport; ++d) {
586772669ffSSepherosa Ziehau 			struct sockaddr_in *da = &daddr[d];
587772669ffSSepherosa Ziehau 
588772669ffSSepherosa Ziehau 			*da = *in;
589772669ffSSepherosa Ziehau 			da->sin_port = ports[d - ndaddr];
590772669ffSSepherosa Ziehau 		}
591772669ffSSepherosa Ziehau 		ndaddr += info_hdr.ndport;
592772669ffSSepherosa Ziehau 
593772669ffSSepherosa Ziehau 		free(ports);
594772669ffSSepherosa Ziehau 		close(s);
595772669ffSSepherosa Ziehau 	}
596772669ffSSepherosa Ziehau 
597772669ffSSepherosa Ziehau #ifdef SEND_DEBUG
598772669ffSSepherosa Ziehau 	for (i = 0; i < ndaddr; ++i) {
599772669ffSSepherosa Ziehau 		const struct sockaddr_in *da = &daddr[i];
600772669ffSSepherosa Ziehau 		char name[INET_ADDRSTRLEN];
601772669ffSSepherosa Ziehau 
602772669ffSSepherosa Ziehau 		fprintf(stderr, "%s:%d\n",
603772669ffSSepherosa Ziehau 		    inet_ntop(AF_INET, &da->sin_addr, name, sizeof(name)),
604772669ffSSepherosa Ziehau 		    ntohs(da->sin_port));
605772669ffSSepherosa Ziehau 	}
606772669ffSSepherosa Ziehau #endif
607772669ffSSepherosa Ziehau 
608772669ffSSepherosa Ziehau 	*daddr0 = daddr;
609772669ffSSepherosa Ziehau 	*ndaddr0 = ndaddr;
610772669ffSSepherosa Ziehau }
611772669ffSSepherosa Ziehau 
612772669ffSSepherosa Ziehau static void *
send_thread(void * xctx)613772669ffSSepherosa Ziehau send_thread(void *xctx)
614772669ffSSepherosa Ziehau {
615772669ffSSepherosa Ziehau 	struct send_thrctx *ctx = xctx;
616772669ffSSepherosa Ziehau 	struct conn_ctx *timeo;
617772669ffSSepherosa Ziehau 	struct kevent chg_evt;
618772669ffSSepherosa Ziehau 	uint8_t *buf;
619a06d7982SSepherosa Ziehau 	int nconn = 0, kq, n, fd = -1, buflen;
620772669ffSSepherosa Ziehau 	char name[32];
621772669ffSSepherosa Ziehau 
622772669ffSSepherosa Ziehau 	snprintf(name, sizeof(name), "snd%d", ctx->t_id);
623772669ffSSepherosa Ziehau 	pthread_set_name_np(pthread_self(), name);
624772669ffSSepherosa Ziehau 
625a06d7982SSepherosa Ziehau 	buflen = ctx->t_glob->g_buflen;
626a06d7982SSepherosa Ziehau 	buf = malloc(buflen);
627772669ffSSepherosa Ziehau 	if (buf == NULL)
628a06d7982SSepherosa Ziehau 		err(1, "malloc(%d) failed", buflen);
629772669ffSSepherosa Ziehau 
630dbeb2594SSepherosa Ziehau 	if (ctx->t_glob->g_sendfile) {
631dbeb2594SSepherosa Ziehau 		char filename[] = "sendtmpXXX";
632dbeb2594SSepherosa Ziehau 
633dbeb2594SSepherosa Ziehau 		fd = mkstemp(filename);
634dbeb2594SSepherosa Ziehau 		if (fd < 0)
635dbeb2594SSepherosa Ziehau 			err(1, "mkstemp failed");
636a06d7982SSepherosa Ziehau 		if (write(fd, buf, buflen) != buflen)
637dbeb2594SSepherosa Ziehau 			err(1, "write to file failed");
638dbeb2594SSepherosa Ziehau 		unlink(filename);
639dbeb2594SSepherosa Ziehau 		free(buf);
640dbeb2594SSepherosa Ziehau 		buf = NULL;
641dbeb2594SSepherosa Ziehau 	}
642dbeb2594SSepherosa Ziehau 
643772669ffSSepherosa Ziehau 	kq = kqueue();
644772669ffSSepherosa Ziehau 	if (kq < 0)
645772669ffSSepherosa Ziehau 		err(1, "kqueue failed");
646772669ffSSepherosa Ziehau 
647772669ffSSepherosa Ziehau 	/*
648772669ffSSepherosa Ziehau 	 * Establish the connections assigned to us and add the
649772669ffSSepherosa Ziehau 	 * established connections to kqueue.
650772669ffSSepherosa Ziehau 	 */
651772669ffSSepherosa Ziehau 	for (;;) {
652772669ffSSepherosa Ziehau #ifdef SEND_DEBUG
653772669ffSSepherosa Ziehau 		char addr_name[INET_ADDRSTRLEN];
654772669ffSSepherosa Ziehau #endif
655772669ffSSepherosa Ziehau 		struct timeval readto;
656772669ffSSepherosa Ziehau 		struct conn_ctx *conn;
657772669ffSSepherosa Ziehau 		struct conn_ack ack;
658772669ffSSepherosa Ziehau 		int on;
659772669ffSSepherosa Ziehau 
660772669ffSSepherosa Ziehau 		pthread_mutex_lock(&ctx->t_lock);
661772669ffSSepherosa Ziehau 		while (STAILQ_EMPTY(&ctx->t_conn))
662772669ffSSepherosa Ziehau 			pthread_cond_wait(&ctx->t_cond, &ctx->t_lock);
663772669ffSSepherosa Ziehau 		conn = STAILQ_FIRST(&ctx->t_conn);
664772669ffSSepherosa Ziehau 		STAILQ_REMOVE_HEAD(&ctx->t_conn, c_link);
665772669ffSSepherosa Ziehau 		pthread_mutex_unlock(&ctx->t_lock);
666772669ffSSepherosa Ziehau 
667772669ffSSepherosa Ziehau 		if (conn->c_in.sin_port == 0) {
668772669ffSSepherosa Ziehau 			/*
669772669ffSSepherosa Ziehau 			 * The marker for 'the end of connection list'.
670772669ffSSepherosa Ziehau 			 * See the related comment in main thread.
671772669ffSSepherosa Ziehau 			 *
672772669ffSSepherosa Ziehau 			 * NOTE:
673772669ffSSepherosa Ziehau 			 * We reuse the marker as the udata for the
674772669ffSSepherosa Ziehau 			 * kqueue timer.
675772669ffSSepherosa Ziehau 			 */
676772669ffSSepherosa Ziehau 			timeo = conn;
677772669ffSSepherosa Ziehau 			break;
678772669ffSSepherosa Ziehau 		}
679772669ffSSepherosa Ziehau 
680772669ffSSepherosa Ziehau 		++nconn;
681772669ffSSepherosa Ziehau #ifdef SEND_DEBUG
682772669ffSSepherosa Ziehau 		fprintf(stderr, "%s %s:%d\n", name,
683772669ffSSepherosa Ziehau 		    inet_ntop(AF_INET, &conn->c_in.sin_addr,
684772669ffSSepherosa Ziehau 		        addr_name, sizeof(addr_name)),
685772669ffSSepherosa Ziehau 		    ntohs(conn->c_in.sin_port));
686772669ffSSepherosa Ziehau #endif
687772669ffSSepherosa Ziehau 
688772669ffSSepherosa Ziehau again:
689772669ffSSepherosa Ziehau 		conn->c_s = socket(AF_INET, SOCK_STREAM, 0);
690772669ffSSepherosa Ziehau 		if (conn->c_s < 0)
691772669ffSSepherosa Ziehau 			err(1, "socket failed");
692772669ffSSepherosa Ziehau 
693772669ffSSepherosa Ziehau 		if (connect(conn->c_s, (const struct sockaddr *)&conn->c_in,
694772669ffSSepherosa Ziehau 		    sizeof(conn->c_in)) < 0)
695772669ffSSepherosa Ziehau 			err(1, "connect failed");
696772669ffSSepherosa Ziehau 
697772669ffSSepherosa Ziehau 		memset(&readto, 0, sizeof(readto));
698772669ffSSepherosa Ziehau 		readto.tv_sec = ctx->t_glob->g_readto_ms / 1000;
699772669ffSSepherosa Ziehau 		readto.tv_usec = (ctx->t_glob->g_readto_ms % 1000) * 1000;
700772669ffSSepherosa Ziehau 		if (setsockopt(conn->c_s, SOL_SOCKET, SO_RCVTIMEO, &readto,
701772669ffSSepherosa Ziehau 		    sizeof(readto)) < 0)
702772669ffSSepherosa Ziehau 			err(1, "setsockopt(RCVTIMEO) failed");
703772669ffSSepherosa Ziehau 
704772669ffSSepherosa Ziehau 		n = read(conn->c_s, &ack, sizeof(ack));
705772669ffSSepherosa Ziehau 		if (n != sizeof(ack)) {
706772669ffSSepherosa Ziehau 			if (n < 0) {
707772669ffSSepherosa Ziehau 				if (errno == EAGAIN) {
708772669ffSSepherosa Ziehau 					close(conn->c_s);
709772669ffSSepherosa Ziehau 					goto again;
710772669ffSSepherosa Ziehau 				}
711772669ffSSepherosa Ziehau 				err(1, "read ack failed");
712772669ffSSepherosa Ziehau 			} else {
713772669ffSSepherosa Ziehau 				errx(1, "read truncated ack");
714772669ffSSepherosa Ziehau 			}
715772669ffSSepherosa Ziehau 		}
716772669ffSSepherosa Ziehau 
717772669ffSSepherosa Ziehau 		on = 1;
718772669ffSSepherosa Ziehau 		if (ioctl(conn->c_s, FIONBIO, &on, sizeof(on)) < 0)
719772669ffSSepherosa Ziehau 			err(1, "ioctl(FIONBIO) failed");
720772669ffSSepherosa Ziehau 
721772669ffSSepherosa Ziehau 		EV_SET(&chg_evt, conn->c_s, EVFILT_WRITE, EV_ADD, 0, 0, conn);
722772669ffSSepherosa Ziehau 		n = kevent(kq, &chg_evt, 1, NULL, 0, NULL);
723772669ffSSepherosa Ziehau 		if (n < 0)
724772669ffSSepherosa Ziehau 			err(1, "kevent add failed");
725772669ffSSepherosa Ziehau 	}
726772669ffSSepherosa Ziehau #ifdef SEND_DEBUG
727772669ffSSepherosa Ziehau 	fprintf(stderr, "%s conn %d\n", name, nconn);
728772669ffSSepherosa Ziehau #endif
729772669ffSSepherosa Ziehau 
730772669ffSSepherosa Ziehau 	/*
731772669ffSSepherosa Ziehau 	 * Sender start sync, stage 1:
732772669ffSSepherosa Ziehau 	 * Wait for connections establishment (slow).
733772669ffSSepherosa Ziehau 	 */
734772669ffSSepherosa Ziehau 	pthread_mutex_lock(&ctx->t_glob->g_lock);
735772669ffSSepherosa Ziehau 	ctx->t_glob->g_nconn -= nconn;
736772669ffSSepherosa Ziehau 	pthread_cond_broadcast(&ctx->t_glob->g_cond);
737772669ffSSepherosa Ziehau 	while (ctx->t_glob->g_nconn != 0)
738772669ffSSepherosa Ziehau 		pthread_cond_wait(&ctx->t_glob->g_cond, &ctx->t_glob->g_lock);
739772669ffSSepherosa Ziehau 	pthread_mutex_unlock(&ctx->t_glob->g_lock);
740772669ffSSepherosa Ziehau 
741772669ffSSepherosa Ziehau 	/*
742772669ffSSepherosa Ziehau 	 * Sender start sync, stage2.
743772669ffSSepherosa Ziehau 	 */
744772669ffSSepherosa Ziehau 	/* Increase the g_nwait. */
745772669ffSSepherosa Ziehau 	atomic_add_int(&ctx->t_glob->g_nwait, 1);
746772669ffSSepherosa Ziehau 	/* Spin-wait for main thread to release us (reset g_nwait). */
747772669ffSSepherosa Ziehau 	while (ctx->t_glob->g_nwait)
748772669ffSSepherosa Ziehau 		send_spinwait();
749772669ffSSepherosa Ziehau 
750772669ffSSepherosa Ziehau #ifdef SEND_DEBUG
751772669ffSSepherosa Ziehau 	fprintf(stderr, "%s start\n", name);
752772669ffSSepherosa Ziehau #endif
753772669ffSSepherosa Ziehau 
754772669ffSSepherosa Ziehau 	/*
755772669ffSSepherosa Ziehau 	 * Wire a kqueue timer, so that the sending can be terminated
756772669ffSSepherosa Ziehau 	 * as requested.
757772669ffSSepherosa Ziehau 	 *
758772669ffSSepherosa Ziehau 	 * NOTE:
759772669ffSSepherosa Ziehau 	 * Set -2 to c_s for timer udata, so we could distinguish it
760772669ffSSepherosa Ziehau 	 * from real connections.
761772669ffSSepherosa Ziehau 	 */
762772669ffSSepherosa Ziehau 	timeo->c_s = -2;
763772669ffSSepherosa Ziehau 	EV_SET(&chg_evt, 0, EVFILT_TIMER, EV_ADD | EV_ONESHOT, 0,
764772669ffSSepherosa Ziehau 	    ctx->t_glob->g_dur * 1000L, timeo);
765772669ffSSepherosa Ziehau 	n = kevent(kq, &chg_evt, 1, NULL, 0, NULL);
766772669ffSSepherosa Ziehau 	if (n < 0)
767772669ffSSepherosa Ziehau 		err(1, "kevent add failed");
768772669ffSSepherosa Ziehau 
769772669ffSSepherosa Ziehau 	clock_gettime(CLOCK_MONOTONIC_PRECISE, &ctx->t_start);
770772669ffSSepherosa Ziehau 	for (;;) {
771772669ffSSepherosa Ziehau 		struct kevent evt[SEND_EVENT_MAX];
772772669ffSSepherosa Ziehau 		int nevt, i;
773772669ffSSepherosa Ziehau 
774772669ffSSepherosa Ziehau 		nevt = kevent(kq, NULL, 0, evt, SEND_EVENT_MAX, NULL);
775772669ffSSepherosa Ziehau 		if (nevt < 0)
776772669ffSSepherosa Ziehau 			err(1, "kevent failed");
777772669ffSSepherosa Ziehau 
778772669ffSSepherosa Ziehau 		for (i = 0; i < nevt; ++i) {
779772669ffSSepherosa Ziehau 			struct conn_ctx *conn = evt[i].udata;
780772669ffSSepherosa Ziehau 
781772669ffSSepherosa Ziehau 			if (conn->c_s < 0) {
782772669ffSSepherosa Ziehau 				if (conn->c_s == -2) {
783772669ffSSepherosa Ziehau 					/* Timer expired */
784772669ffSSepherosa Ziehau 					goto done;
785772669ffSSepherosa Ziehau 				}
786772669ffSSepherosa Ziehau 				continue;
787772669ffSSepherosa Ziehau 			}
788772669ffSSepherosa Ziehau 
789dbeb2594SSepherosa Ziehau 			if (fd >= 0) {
790dbeb2594SSepherosa Ziehau 				off_t m, off;
791dbeb2594SSepherosa Ziehau 				size_t len;
792dbeb2594SSepherosa Ziehau 
793a06d7982SSepherosa Ziehau 				off = conn->c_stat % buflen;
794a06d7982SSepherosa Ziehau 				len = buflen - off;
795dbeb2594SSepherosa Ziehau 
796dbeb2594SSepherosa Ziehau 				n = sendfile(fd, conn->c_s, off, len, NULL,
797dbeb2594SSepherosa Ziehau 				    &m, 0);
798dbeb2594SSepherosa Ziehau 				if (n == 0 || (n < 0 && errno == EAGAIN))
799dbeb2594SSepherosa Ziehau 					n = m;
800dbeb2594SSepherosa Ziehau 			} else {
801a06d7982SSepherosa Ziehau 				n = write(conn->c_s, buf, buflen);
802dbeb2594SSepherosa Ziehau 			}
803dbeb2594SSepherosa Ziehau 
804772669ffSSepherosa Ziehau 			if (n < 0) {
805772669ffSSepherosa Ziehau 				if (errno != EAGAIN) {
806772669ffSSepherosa Ziehau 					conn->c_err = errno;
807772669ffSSepherosa Ziehau 					clock_gettime(CLOCK_MONOTONIC_PRECISE,
808772669ffSSepherosa Ziehau 					    &conn->c_terr);
809772669ffSSepherosa Ziehau 					close(conn->c_s);
810772669ffSSepherosa Ziehau 					conn->c_s = -1;
811772669ffSSepherosa Ziehau 				}
812772669ffSSepherosa Ziehau 			} else {
813772669ffSSepherosa Ziehau 				conn->c_stat += n;
814772669ffSSepherosa Ziehau 			}
815772669ffSSepherosa Ziehau 		}
816772669ffSSepherosa Ziehau 	}
817772669ffSSepherosa Ziehau done:
818772669ffSSepherosa Ziehau 	clock_gettime(CLOCK_MONOTONIC_PRECISE, &ctx->t_end);
819dbeb2594SSepherosa Ziehau 
820dbeb2594SSepherosa Ziehau 	if (fd >= 0)
821dbeb2594SSepherosa Ziehau 		close(fd);
822dbeb2594SSepherosa Ziehau 	if (buf != NULL)
823dbeb2594SSepherosa Ziehau 		free(buf);
824772669ffSSepherosa Ziehau 	return NULL;
825772669ffSSepherosa Ziehau }
826