1772669ffSSepherosa Ziehau #include <sys/param.h>
2772669ffSSepherosa Ziehau #include <sys/event.h>
3772669ffSSepherosa Ziehau #include <sys/ioctl.h>
4772669ffSSepherosa Ziehau #include <sys/queue.h>
5772669ffSSepherosa Ziehau #include <sys/socket.h>
6772669ffSSepherosa Ziehau #include <sys/sysctl.h>
7772669ffSSepherosa Ziehau #include <sys/time.h>
8772669ffSSepherosa Ziehau
9772669ffSSepherosa Ziehau #include <machine/atomic.h>
10772669ffSSepherosa Ziehau #ifdef __FreeBSD__
11772669ffSSepherosa Ziehau #include <machine/cpu.h>
12772669ffSSepherosa Ziehau #endif
13772669ffSSepherosa Ziehau #include <machine/cpufunc.h>
14772669ffSSepherosa Ziehau
15772669ffSSepherosa Ziehau #include <arpa/inet.h>
16772669ffSSepherosa Ziehau #include <netinet/in.h>
17772669ffSSepherosa Ziehau
18772669ffSSepherosa Ziehau #include <err.h>
19772669ffSSepherosa Ziehau #include <errno.h>
20772669ffSSepherosa Ziehau #include <pthread.h>
21772669ffSSepherosa Ziehau #include <pthread_np.h>
22772669ffSSepherosa Ziehau #include <signal.h>
23dbeb2594SSepherosa Ziehau #include <stdbool.h>
24772669ffSSepherosa Ziehau #include <stdio.h>
25772669ffSSepherosa Ziehau #include <stdint.h>
26772669ffSSepherosa Ziehau #include <stdlib.h>
27772669ffSSepherosa Ziehau #include <string.h>
28772669ffSSepherosa Ziehau #include <unistd.h>
29772669ffSSepherosa Ziehau
30772669ffSSepherosa Ziehau #include "kq_sendrecv_proto.h"
31772669ffSSepherosa Ziehau
/*
 * Note about the sender start synchronization.
 *
 * We apply a two-stage synchronization.  The first stage uses a pthread
 * condition variable (it sleeps) to wait for the establishment of all
 * connections, which could be slow.  The second stage uses g_nwait
 * of send_globctx; all relevant threads spin on g_nwait.  The main
 * thread spin-waits for all senders to increase g_nwait.  Each sender
 * thread increases g_nwait, then spin-waits for the main thread
 * to reset g_nwait.  In this way, we can make sure that all senders
 * start roughly at the same time.
 */
44772669ffSSepherosa Ziehau
/* Compile-time debug switches; change "#if 0" to "#if 1" to enable. */
#if 0
#define SEND_DEBUG
#endif
#if 0
#define SEND_TIME_DEBUG
#endif

/* Default test duration, in seconds (overridden by -l). */
#define SEND_DUR		10
/* NOTE(review): not used in the visible part of the file; presumably
 * the kevent batch size of the sender loop -- confirm. */
#define SEND_EVENT_MAX		64
/* Default per-thread send buffer length, in bytes (overridden by -b). */
#define SEND_BUFLEN		(128 * 1024)

/*
 * A successful 3-way handshake on a connection does not mean that the
 * remote application can accept(2) this connection.  Even worse, the
 * remote side's network stack may drop the connection silently, i.e.
 * w/o RST.  If that happened, a blocking read(2) would not return
 * until the keepalive kicked in, which would take quite some time.
 * This is obviously not what we want here, so a synthetic timeout is
 * used for blocking read(2), and the operation is retried if the
 * blocking read(2) times out.
 */
#define SEND_READTO_MS		1000	/* unit: ms */

/* Alignment of struct conn_ctx: the platform's cache line size. */
#if defined(__DragonFly__)
#define SEND_CONN_CTX_ALIGN	__VM_CACHELINE_SIZE
#elif defined(__FreeBSD__)
#define SEND_CONN_CTX_ALIGN	CACHE_LINE_SIZE
#else
#define SEND_CONN_CTX_ALIGN	64	/* XXX */
#endif
75772669ffSSepherosa Ziehau
76772669ffSSepherosa Ziehau struct conn_ctx {
77772669ffSSepherosa Ziehau int c_s;
78772669ffSSepherosa Ziehau int c_err;
79772669ffSSepherosa Ziehau uint64_t c_stat;
80772669ffSSepherosa Ziehau struct timespec c_terr;
81772669ffSSepherosa Ziehau
82772669ffSSepherosa Ziehau STAILQ_ENTRY(conn_ctx) c_glob_link;
83772669ffSSepherosa Ziehau STAILQ_ENTRY(conn_ctx) c_link;
84772669ffSSepherosa Ziehau struct sockaddr_in c_in;
85772669ffSSepherosa Ziehau int c_thr_id;
86772669ffSSepherosa Ziehau } __aligned(SEND_CONN_CTX_ALIGN);
87772669ffSSepherosa Ziehau
88772669ffSSepherosa Ziehau STAILQ_HEAD(conn_ctx_list, conn_ctx);
89772669ffSSepherosa Ziehau
90772669ffSSepherosa Ziehau struct send_globctx {
91772669ffSSepherosa Ziehau struct conn_ctx_list g_conn;
92772669ffSSepherosa Ziehau
93772669ffSSepherosa Ziehau int g_dur;
94772669ffSSepherosa Ziehau int g_nconn;
95772669ffSSepherosa Ziehau pthread_mutex_t g_lock;
96772669ffSSepherosa Ziehau pthread_cond_t g_cond;
97772669ffSSepherosa Ziehau
98772669ffSSepherosa Ziehau volatile u_int g_nwait;
99772669ffSSepherosa Ziehau int g_readto_ms; /* unit: ms */
100a06d7982SSepherosa Ziehau int g_buflen;
101dbeb2594SSepherosa Ziehau bool g_sendfile;
102772669ffSSepherosa Ziehau };
103772669ffSSepherosa Ziehau
104772669ffSSepherosa Ziehau struct send_thrctx {
105772669ffSSepherosa Ziehau struct conn_ctx_list t_conn;
106772669ffSSepherosa Ziehau pthread_mutex_t t_lock;
107772669ffSSepherosa Ziehau pthread_cond_t t_cond;
108772669ffSSepherosa Ziehau
109772669ffSSepherosa Ziehau struct send_globctx *t_glob;
110772669ffSSepherosa Ziehau struct timespec t_start;
111772669ffSSepherosa Ziehau struct timespec t_end;
112772669ffSSepherosa Ziehau double t_run_us; /* unit: us */
113772669ffSSepherosa Ziehau
114772669ffSSepherosa Ziehau pthread_t t_tid;
115772669ffSSepherosa Ziehau int t_id;
116772669ffSSepherosa Ziehau };
117772669ffSSepherosa Ziehau
/* Forward declarations of the helpers defined after main(). */
static void	send_build_addrlist(const struct sockaddr_in *in_arr,
		    int in_arr_cnt, const struct sockaddr_in **daddr0,
		    int *ndaddr0, int readto_ms);
static void	*send_thread(void *xctx);
121772669ffSSepherosa Ziehau
/*
 * One iteration of a busy-wait loop.  On DragonFly this calls
 * cpu_pause() and on FreeBSD cpu_spinwait(); on any other platform
 * it does nothing, so callers simply spin.
 */
static __inline void
send_spinwait(void)
{
#ifdef __DragonFly__
	cpu_pause();
#else
#ifdef __FreeBSD__
	cpu_spinwait();
#endif
	/* XXX nothing on other platforms */
#endif
}
133772669ffSSepherosa Ziehau
/*
 * Print the command line synopsis to stderr and terminate the process
 * with exit status 2.  'cmd' is the program name to show.
 */
static void
usage(const char *cmd)
{
	fprintf(stderr,
	    "%s -4 addr4 [-4 addr4 ...] [-p port] -c conns [-t nthreads] "
	    "[-l sec] [-r readto_ms] [-S] [-E] [-b buflen] [-B]\n",
	    cmd);
	exit(2);
}
142772669ffSSepherosa Ziehau
143772669ffSSepherosa Ziehau int
main(int argc,char * argv[])144772669ffSSepherosa Ziehau main(int argc, char *argv[])
145772669ffSSepherosa Ziehau {
146772669ffSSepherosa Ziehau struct send_globctx glob;
147772669ffSSepherosa Ziehau struct send_thrctx *ctx_arr, *ctx;
148772669ffSSepherosa Ziehau struct sockaddr_in *in_arr, *in;
149772669ffSSepherosa Ziehau const struct sockaddr_in *daddr;
150772669ffSSepherosa Ziehau struct timespec run, end, start;
151772669ffSSepherosa Ziehau double total_run_us, total, conn_min, conn_max;
152772669ffSSepherosa Ziehau double jain, jain_res;
153772669ffSSepherosa Ziehau int jain_cnt;
154772669ffSSepherosa Ziehau struct conn_ctx *conn;
155772669ffSSepherosa Ziehau sigset_t sigset;
1562fe73a22SSepherosa Ziehau int opt, i, ncpus;
157772669ffSSepherosa Ziehau int in_arr_cnt, in_arr_sz, ndaddr;
158a06d7982SSepherosa Ziehau int nthr, nconn, dur, readto_ms, buflen;
159772669ffSSepherosa Ziehau int log_err, err_cnt, has_minmax;
160772669ffSSepherosa Ziehau u_short port = RECV_PORT;
161772669ffSSepherosa Ziehau uint32_t idx;
162772669ffSSepherosa Ziehau size_t sz;
1632fe73a22SSepherosa Ziehau bool do_sendfile = false, bindcpu = false;
164772669ffSSepherosa Ziehau
165772669ffSSepherosa Ziehau sigemptyset(&sigset);
166772669ffSSepherosa Ziehau sigaddset(&sigset, SIGPIPE);
167772669ffSSepherosa Ziehau if (sigprocmask(SIG_BLOCK, &sigset, NULL) < 0)
168772669ffSSepherosa Ziehau err(1, "sigprocmask failed");
169772669ffSSepherosa Ziehau
1702fe73a22SSepherosa Ziehau sz = sizeof(ncpus);
1712fe73a22SSepherosa Ziehau if (sysctlbyname("hw.ncpu", &ncpus, &sz, NULL, 0) < 0)
172772669ffSSepherosa Ziehau err(1, "sysctl hw.ncpu failed");
1732fe73a22SSepherosa Ziehau nthr = ncpus;
174772669ffSSepherosa Ziehau
175772669ffSSepherosa Ziehau in_arr_sz = 4;
176772669ffSSepherosa Ziehau in_arr_cnt = 0;
177772669ffSSepherosa Ziehau in_arr = malloc(in_arr_sz * sizeof(struct sockaddr_in));
178772669ffSSepherosa Ziehau if (in_arr == NULL)
179772669ffSSepherosa Ziehau err(1, "malloc failed");
180772669ffSSepherosa Ziehau
181772669ffSSepherosa Ziehau log_err = 0;
182772669ffSSepherosa Ziehau nconn = 0;
183772669ffSSepherosa Ziehau dur = SEND_DUR;
184772669ffSSepherosa Ziehau readto_ms = SEND_READTO_MS;
185a06d7982SSepherosa Ziehau buflen = SEND_BUFLEN;
186772669ffSSepherosa Ziehau
1872fe73a22SSepherosa Ziehau while ((opt = getopt(argc, argv, "4:BESb:c:l:p:r:t:")) != -1) {
188772669ffSSepherosa Ziehau switch (opt) {
189772669ffSSepherosa Ziehau case '4':
190772669ffSSepherosa Ziehau if (in_arr_cnt == in_arr_sz) {
191772669ffSSepherosa Ziehau in_arr_sz *= 2;
192772669ffSSepherosa Ziehau in_arr = reallocf(in_arr,
193772669ffSSepherosa Ziehau in_arr_sz * sizeof(struct sockaddr_in));
194772669ffSSepherosa Ziehau if (in_arr == NULL)
195772669ffSSepherosa Ziehau err(1, "reallocf failed");
196772669ffSSepherosa Ziehau }
197772669ffSSepherosa Ziehau in = &in_arr[in_arr_cnt];
198772669ffSSepherosa Ziehau ++in_arr_cnt;
199772669ffSSepherosa Ziehau
200772669ffSSepherosa Ziehau memset(in, 0, sizeof(*in));
201772669ffSSepherosa Ziehau in->sin_family = AF_INET;
202772669ffSSepherosa Ziehau if (inet_pton(AF_INET, optarg, &in->sin_addr) <= 0)
203772669ffSSepherosa Ziehau errx(1, "inet_pton failed %s", optarg);
204772669ffSSepherosa Ziehau break;
205772669ffSSepherosa Ziehau
2062fe73a22SSepherosa Ziehau case 'B':
2072fe73a22SSepherosa Ziehau bindcpu = true;
2082fe73a22SSepherosa Ziehau break;
2092fe73a22SSepherosa Ziehau
210772669ffSSepherosa Ziehau case 'E':
211772669ffSSepherosa Ziehau log_err = 1;
212772669ffSSepherosa Ziehau break;
213772669ffSSepherosa Ziehau
214dbeb2594SSepherosa Ziehau case 'S':
215dbeb2594SSepherosa Ziehau do_sendfile = true;
216dbeb2594SSepherosa Ziehau break;
217dbeb2594SSepherosa Ziehau
218a06d7982SSepherosa Ziehau case 'b':
219a06d7982SSepherosa Ziehau buflen = strtol(optarg, NULL, 10);
220a06d7982SSepherosa Ziehau if (buflen <= 0)
221a06d7982SSepherosa Ziehau errx(1, "invalid -b");
222a06d7982SSepherosa Ziehau break;
223a06d7982SSepherosa Ziehau
224772669ffSSepherosa Ziehau case 'c':
225772669ffSSepherosa Ziehau nconn = strtol(optarg, NULL, 10);
226772669ffSSepherosa Ziehau if (nconn <= 0)
227772669ffSSepherosa Ziehau errx(1, "invalid -c");
228772669ffSSepherosa Ziehau break;
229772669ffSSepherosa Ziehau
230772669ffSSepherosa Ziehau case 'l':
231772669ffSSepherosa Ziehau dur = strtoul(optarg, NULL, 10);
232772669ffSSepherosa Ziehau if (dur == 0)
233772669ffSSepherosa Ziehau errx(1, "invalid -l");
234772669ffSSepherosa Ziehau break;
235772669ffSSepherosa Ziehau
236772669ffSSepherosa Ziehau case 'p':
237772669ffSSepherosa Ziehau port = strtoul(optarg, NULL, 10);
238772669ffSSepherosa Ziehau break;
239772669ffSSepherosa Ziehau
240772669ffSSepherosa Ziehau case 'r':
241772669ffSSepherosa Ziehau readto_ms = strtol(optarg, NULL, 10);
242772669ffSSepherosa Ziehau if (readto_ms <= 0)
243772669ffSSepherosa Ziehau errx(1, "invalid -r");
244772669ffSSepherosa Ziehau break;
245772669ffSSepherosa Ziehau
246772669ffSSepherosa Ziehau case 't':
247772669ffSSepherosa Ziehau nthr = strtol(optarg, NULL, 10);
248772669ffSSepherosa Ziehau if (nthr <= 0)
249772669ffSSepherosa Ziehau errx(1, "invalid -t");
250772669ffSSepherosa Ziehau break;
251772669ffSSepherosa Ziehau
252772669ffSSepherosa Ziehau default:
253772669ffSSepherosa Ziehau usage(argv[0]);
254772669ffSSepherosa Ziehau }
255772669ffSSepherosa Ziehau }
256772669ffSSepherosa Ziehau if (in_arr_cnt == 0 || nconn == 0)
257dbeb2594SSepherosa Ziehau errx(1, "either -4 or -c are specified");
258772669ffSSepherosa Ziehau
259772669ffSSepherosa Ziehau if (nthr > nconn)
260772669ffSSepherosa Ziehau nthr = nconn;
261772669ffSSepherosa Ziehau
262772669ffSSepherosa Ziehau for (i = 0; i < in_arr_cnt; ++i)
263772669ffSSepherosa Ziehau in_arr[i].sin_port = htons(port);
264772669ffSSepherosa Ziehau
265772669ffSSepherosa Ziehau ctx_arr = calloc(nthr, sizeof(struct send_thrctx));
266772669ffSSepherosa Ziehau if (ctx_arr == NULL)
267772669ffSSepherosa Ziehau err(1, "calloc failed");
268772669ffSSepherosa Ziehau
269772669ffSSepherosa Ziehau memset(&glob, 0, sizeof(glob));
270772669ffSSepherosa Ziehau STAILQ_INIT(&glob.g_conn);
271772669ffSSepherosa Ziehau glob.g_nconn = nconn;
272772669ffSSepherosa Ziehau glob.g_nwait = 1; /* count self in */
273772669ffSSepherosa Ziehau glob.g_dur = dur;
274772669ffSSepherosa Ziehau glob.g_readto_ms = readto_ms;
275dbeb2594SSepherosa Ziehau glob.g_sendfile = do_sendfile;
276a06d7982SSepherosa Ziehau glob.g_buflen = buflen;
277772669ffSSepherosa Ziehau pthread_mutex_init(&glob.g_lock, NULL);
278772669ffSSepherosa Ziehau pthread_cond_init(&glob.g_cond, NULL);
279772669ffSSepherosa Ziehau
280772669ffSSepherosa Ziehau pthread_set_name_np(pthread_self(), "main");
281772669ffSSepherosa Ziehau
282772669ffSSepherosa Ziehau /* Build receiver address list */
283772669ffSSepherosa Ziehau send_build_addrlist(in_arr, in_arr_cnt, &daddr, &ndaddr, readto_ms);
284772669ffSSepherosa Ziehau
285772669ffSSepherosa Ziehau /*
286772669ffSSepherosa Ziehau * Start senders.
287772669ffSSepherosa Ziehau */
288772669ffSSepherosa Ziehau for (i = 0; i < nthr; ++i) {
2892fe73a22SSepherosa Ziehau pthread_attr_t attr;
290772669ffSSepherosa Ziehau int error;
291772669ffSSepherosa Ziehau
292772669ffSSepherosa Ziehau ctx = &ctx_arr[i];
293772669ffSSepherosa Ziehau STAILQ_INIT(&ctx->t_conn);
294772669ffSSepherosa Ziehau ctx->t_id = i;
295772669ffSSepherosa Ziehau ctx->t_glob = &glob;
296772669ffSSepherosa Ziehau pthread_mutex_init(&ctx->t_lock, NULL);
297772669ffSSepherosa Ziehau pthread_cond_init(&ctx->t_cond, NULL);
298772669ffSSepherosa Ziehau
2992fe73a22SSepherosa Ziehau pthread_attr_init(&attr);
3002fe73a22SSepherosa Ziehau if (bindcpu) {
30112439f34SSepherosa Ziehau #ifdef __FreeBSD__
30212439f34SSepherosa Ziehau cpuset_t mask;
30312439f34SSepherosa Ziehau #else
3042fe73a22SSepherosa Ziehau cpu_set_t mask;
30512439f34SSepherosa Ziehau #endif
3062fe73a22SSepherosa Ziehau
3072fe73a22SSepherosa Ziehau CPU_ZERO(&mask);
3082fe73a22SSepherosa Ziehau CPU_SET(i % ncpus, &mask);
3092fe73a22SSepherosa Ziehau error = pthread_attr_setaffinity_np(&attr,
3102fe73a22SSepherosa Ziehau sizeof(mask), &mask);
3112fe73a22SSepherosa Ziehau if (error) {
3122fe73a22SSepherosa Ziehau errc(1, error, "pthread_attr_setaffinity_np "
3132fe73a22SSepherosa Ziehau "failed");
3142fe73a22SSepherosa Ziehau }
3152fe73a22SSepherosa Ziehau }
3162fe73a22SSepherosa Ziehau
3172fe73a22SSepherosa Ziehau error = pthread_create(&ctx->t_tid, &attr, send_thread, ctx);
318772669ffSSepherosa Ziehau if (error)
319772669ffSSepherosa Ziehau errc(1, error, "pthread_create failed");
3202fe73a22SSepherosa Ziehau pthread_attr_destroy(&attr);
321772669ffSSepherosa Ziehau }
322772669ffSSepherosa Ziehau
323772669ffSSepherosa Ziehau /*
324772669ffSSepherosa Ziehau * Distribute connections to senders.
325772669ffSSepherosa Ziehau *
326772669ffSSepherosa Ziehau * NOTE:
327772669ffSSepherosa Ziehau * We start from a random position in the address list, so that the
328772669ffSSepherosa Ziehau * first several receiving servers will not be abused, if the number
329772669ffSSepherosa Ziehau * of connections is small and there are many clients.
330772669ffSSepherosa Ziehau */
331772669ffSSepherosa Ziehau idx = arc4random_uniform(ndaddr);
332772669ffSSepherosa Ziehau for (i = 0; i < nconn; ++i) {
333772669ffSSepherosa Ziehau const struct sockaddr_in *da;
334772669ffSSepherosa Ziehau
335772669ffSSepherosa Ziehau da = &daddr[idx % ndaddr];
336772669ffSSepherosa Ziehau ++idx;
337772669ffSSepherosa Ziehau
338772669ffSSepherosa Ziehau conn = aligned_alloc(SEND_CONN_CTX_ALIGN, sizeof(*conn));
339772669ffSSepherosa Ziehau if (conn == NULL)
340772669ffSSepherosa Ziehau err(1, "aligned_alloc failed");
341772669ffSSepherosa Ziehau memset(conn, 0, sizeof(*conn));
342772669ffSSepherosa Ziehau conn->c_in = *da;
343772669ffSSepherosa Ziehau conn->c_s = -1;
344772669ffSSepherosa Ziehau
345772669ffSSepherosa Ziehau ctx = &ctx_arr[i % nthr];
346772669ffSSepherosa Ziehau conn->c_thr_id = ctx->t_id;
347772669ffSSepherosa Ziehau
348772669ffSSepherosa Ziehau pthread_mutex_lock(&ctx->t_lock);
349772669ffSSepherosa Ziehau STAILQ_INSERT_TAIL(&ctx->t_conn, conn, c_link);
350772669ffSSepherosa Ziehau pthread_mutex_unlock(&ctx->t_lock);
351772669ffSSepherosa Ziehau pthread_cond_signal(&ctx->t_cond);
352772669ffSSepherosa Ziehau
353772669ffSSepherosa Ziehau /* Add to the global list for results gathering */
354772669ffSSepherosa Ziehau STAILQ_INSERT_TAIL(&glob.g_conn, conn, c_glob_link);
355772669ffSSepherosa Ziehau }
356772669ffSSepherosa Ziehau
357772669ffSSepherosa Ziehau /*
358772669ffSSepherosa Ziehau * No more connections; notify the senders.
359772669ffSSepherosa Ziehau *
360772669ffSSepherosa Ziehau * NOTE:
361772669ffSSepherosa Ziehau * The marker for 'the end of connection list' has 0 in its
362772669ffSSepherosa Ziehau * c_in.sin_port.
363772669ffSSepherosa Ziehau */
364772669ffSSepherosa Ziehau for (i = 0; i < nthr; ++i) {
365772669ffSSepherosa Ziehau conn = aligned_alloc(SEND_CONN_CTX_ALIGN, sizeof(*conn));
366772669ffSSepherosa Ziehau if (conn == NULL)
367772669ffSSepherosa Ziehau err(1, "aligned_alloc failed");
368772669ffSSepherosa Ziehau memset(conn, 0, sizeof(*conn));
369772669ffSSepherosa Ziehau conn->c_s = -1;
370772669ffSSepherosa Ziehau
371772669ffSSepherosa Ziehau ctx = &ctx_arr[i];
372772669ffSSepherosa Ziehau pthread_mutex_lock(&ctx->t_lock);
373772669ffSSepherosa Ziehau STAILQ_INSERT_TAIL(&ctx->t_conn, conn, c_link);
374772669ffSSepherosa Ziehau pthread_mutex_unlock(&ctx->t_lock);
375772669ffSSepherosa Ziehau pthread_cond_signal(&ctx->t_cond);
376772669ffSSepherosa Ziehau }
377772669ffSSepherosa Ziehau
378772669ffSSepherosa Ziehau /*
379772669ffSSepherosa Ziehau * Sender start sync, stage 1:
380772669ffSSepherosa Ziehau * Wait for connections establishment (slow).
381772669ffSSepherosa Ziehau */
382772669ffSSepherosa Ziehau pthread_mutex_lock(&glob.g_lock);
383772669ffSSepherosa Ziehau while (glob.g_nconn != 0)
384772669ffSSepherosa Ziehau pthread_cond_wait(&glob.g_cond, &glob.g_lock);
385772669ffSSepherosa Ziehau pthread_mutex_unlock(&glob.g_lock);
386772669ffSSepherosa Ziehau
387772669ffSSepherosa Ziehau /*
388772669ffSSepherosa Ziehau * Sender start sync, stage 2:
389772669ffSSepherosa Ziehau * Wait for senders to spin-wait; and once all senders spin-wait,
390772669ffSSepherosa Ziehau * release them by resetting g_nwait.
391772669ffSSepherosa Ziehau */
392772669ffSSepherosa Ziehau while (atomic_cmpset_int(&glob.g_nwait, nthr + 1, 0) == 0)
393772669ffSSepherosa Ziehau send_spinwait();
394772669ffSSepherosa Ziehau
395772669ffSSepherosa Ziehau fprintf(stderr, "start %d seconds sending test: %d threads, "
396772669ffSSepherosa Ziehau "%d connections\n", dur, nthr, nconn);
397772669ffSSepherosa Ziehau
398772669ffSSepherosa Ziehau /*
399772669ffSSepherosa Ziehau * Wait for the senders to finish and gather the results.
400772669ffSSepherosa Ziehau */
401772669ffSSepherosa Ziehau
402772669ffSSepherosa Ziehau memset(&end, 0, sizeof(end)); /* XXX stupid gcc warning */
403772669ffSSepherosa Ziehau memset(&start, 0, sizeof(start)); /* XXX stupid gcc warning */
404772669ffSSepherosa Ziehau
405772669ffSSepherosa Ziehau for (i = 0; i < nthr; ++i) {
406772669ffSSepherosa Ziehau ctx = &ctx_arr[i];
407772669ffSSepherosa Ziehau pthread_join(ctx->t_tid, NULL);
408772669ffSSepherosa Ziehau
409*944cd60cSSascha Wildner timespecsub(&ctx->t_end, &ctx->t_start, &run);
410772669ffSSepherosa Ziehau ctx->t_run_us = ((double)run.tv_sec * 1000000.0) +
411772669ffSSepherosa Ziehau ((double)run.tv_nsec / 1000.0);
412772669ffSSepherosa Ziehau
413772669ffSSepherosa Ziehau if (i == 0) {
414772669ffSSepherosa Ziehau start = ctx->t_start;
415772669ffSSepherosa Ziehau end = ctx->t_end;
416772669ffSSepherosa Ziehau } else {
417772669ffSSepherosa Ziehau if (timespeccmp(&start, &ctx->t_start, >))
418772669ffSSepherosa Ziehau start = ctx->t_start;
419772669ffSSepherosa Ziehau if (timespeccmp(&end, &ctx->t_end, <))
420772669ffSSepherosa Ziehau end = ctx->t_end;
421772669ffSSepherosa Ziehau }
422772669ffSSepherosa Ziehau
423772669ffSSepherosa Ziehau #ifdef SEND_TIME_DEBUG
424772669ffSSepherosa Ziehau fprintf(stderr, "start %ld.%ld, end %ld.%ld\n",
425772669ffSSepherosa Ziehau ctx->t_start.tv_sec, ctx->t_start.tv_nsec,
426772669ffSSepherosa Ziehau ctx->t_end.tv_sec, ctx->t_end.tv_nsec);
427772669ffSSepherosa Ziehau #endif
428772669ffSSepherosa Ziehau }
429772669ffSSepherosa Ziehau
430772669ffSSepherosa Ziehau #ifdef SEND_TIME_DEBUG
431772669ffSSepherosa Ziehau fprintf(stderr, "start %ld.%ld, end %ld.%ld (final)\n",
432772669ffSSepherosa Ziehau start.tv_sec, start.tv_nsec, end.tv_sec, end.tv_nsec);
433772669ffSSepherosa Ziehau #endif
434772669ffSSepherosa Ziehau
435*944cd60cSSascha Wildner timespecsub(&end, &start, &run);
436772669ffSSepherosa Ziehau total_run_us = ((double)run.tv_sec * 1000000.0) +
437772669ffSSepherosa Ziehau ((double)run.tv_nsec / 1000.0);
438772669ffSSepherosa Ziehau total = 0.0;
439772669ffSSepherosa Ziehau
440772669ffSSepherosa Ziehau err_cnt = 0;
441772669ffSSepherosa Ziehau has_minmax = 0;
442772669ffSSepherosa Ziehau conn_min = 0.0;
443772669ffSSepherosa Ziehau conn_max = 0.0;
444772669ffSSepherosa Ziehau
445772669ffSSepherosa Ziehau jain = 0.0;
446772669ffSSepherosa Ziehau jain_res = 0.0;
447772669ffSSepherosa Ziehau jain_cnt = 0;
448772669ffSSepherosa Ziehau
449772669ffSSepherosa Ziehau STAILQ_FOREACH(conn, &glob.g_conn, c_glob_link) {
450772669ffSSepherosa Ziehau total += conn->c_stat;
451772669ffSSepherosa Ziehau if (conn->c_err == 0) {
452772669ffSSepherosa Ziehau double perf; /* unit: Mbps */
453772669ffSSepherosa Ziehau
454772669ffSSepherosa Ziehau perf = (conn->c_stat * 8.0) /
455772669ffSSepherosa Ziehau ctx_arr[conn->c_thr_id].t_run_us;
456772669ffSSepherosa Ziehau if (!has_minmax) {
457772669ffSSepherosa Ziehau conn_min = perf;
458772669ffSSepherosa Ziehau conn_max = perf;
459772669ffSSepherosa Ziehau has_minmax = 1;
460772669ffSSepherosa Ziehau } else {
461772669ffSSepherosa Ziehau if (perf > conn_max)
462772669ffSSepherosa Ziehau conn_max = perf;
463772669ffSSepherosa Ziehau if (perf < conn_min)
464772669ffSSepherosa Ziehau conn_min = perf;
465772669ffSSepherosa Ziehau }
466772669ffSSepherosa Ziehau jain += (perf * perf);
467772669ffSSepherosa Ziehau jain_res += perf;
468772669ffSSepherosa Ziehau ++jain_cnt;
469772669ffSSepherosa Ziehau } else {
470772669ffSSepherosa Ziehau ++err_cnt;
471772669ffSSepherosa Ziehau }
472772669ffSSepherosa Ziehau }
473772669ffSSepherosa Ziehau
474772669ffSSepherosa Ziehau jain *= jain_cnt;
475772669ffSSepherosa Ziehau jain = (jain_res * jain_res) / jain;
476772669ffSSepherosa Ziehau
477772669ffSSepherosa Ziehau printf("Total: %.2lf Mbps, min/max %.2lf Mbps/%.2lf Mbps, jain %.2lf, "
478772669ffSSepherosa Ziehau "error %d\n", (total * 8.0) / total_run_us, conn_min, conn_max,
479772669ffSSepherosa Ziehau jain, err_cnt);
480772669ffSSepherosa Ziehau
481772669ffSSepherosa Ziehau if (log_err && err_cnt) {
482772669ffSSepherosa Ziehau STAILQ_FOREACH(conn, &glob.g_conn, c_glob_link) {
483772669ffSSepherosa Ziehau char name[INET_ADDRSTRLEN];
484772669ffSSepherosa Ziehau double tmp_run;
485772669ffSSepherosa Ziehau
486772669ffSSepherosa Ziehau if (conn->c_err == 0)
487772669ffSSepherosa Ziehau continue;
488772669ffSSepherosa Ziehau
489772669ffSSepherosa Ziehau run = conn->c_terr;
490*944cd60cSSascha Wildner timespecsub(&conn->c_terr,
491*944cd60cSSascha Wildner &ctx_arr[conn->c_thr_id].t_start, &run);
492772669ffSSepherosa Ziehau tmp_run = ((double)run.tv_sec * 1000000.0) +
493772669ffSSepherosa Ziehau ((double)run.tv_nsec / 1000.0);
494772669ffSSepherosa Ziehau fprintf(stderr, "snd%d ->%s:%d, %ld sec, %.2lf Mbps, "
495772669ffSSepherosa Ziehau "errno %d\n",
496772669ffSSepherosa Ziehau conn->c_thr_id,
497772669ffSSepherosa Ziehau inet_ntop(AF_INET, &conn->c_in.sin_addr,
498772669ffSSepherosa Ziehau name, sizeof(name)),
499772669ffSSepherosa Ziehau ntohs(conn->c_in.sin_port),
500772669ffSSepherosa Ziehau run.tv_sec, (conn->c_stat * 8.0) / tmp_run,
501772669ffSSepherosa Ziehau conn->c_err);
502772669ffSSepherosa Ziehau --err_cnt;
503772669ffSSepherosa Ziehau if (err_cnt == 0)
504772669ffSSepherosa Ziehau break;
505772669ffSSepherosa Ziehau }
506772669ffSSepherosa Ziehau }
507772669ffSSepherosa Ziehau
508772669ffSSepherosa Ziehau exit(0);
509772669ffSSepherosa Ziehau }
510772669ffSSepherosa Ziehau
511772669ffSSepherosa Ziehau static void
send_build_addrlist(const struct sockaddr_in * in_arr,int in_arr_cnt,const struct sockaddr_in ** daddr0,int * ndaddr0,int readto_ms)512772669ffSSepherosa Ziehau send_build_addrlist(const struct sockaddr_in *in_arr, int in_arr_cnt,
513772669ffSSepherosa Ziehau const struct sockaddr_in **daddr0, int *ndaddr0, int readto_ms)
514772669ffSSepherosa Ziehau {
515772669ffSSepherosa Ziehau struct sockaddr_in *daddr;
516772669ffSSepherosa Ziehau struct timeval readto;
517772669ffSSepherosa Ziehau int i, ndaddr;
518772669ffSSepherosa Ziehau
519772669ffSSepherosa Ziehau daddr = NULL;
520772669ffSSepherosa Ziehau ndaddr = 0;
521772669ffSSepherosa Ziehau
522772669ffSSepherosa Ziehau memset(&readto, 0, sizeof(readto));
523772669ffSSepherosa Ziehau readto.tv_sec = readto_ms / 1000;
524772669ffSSepherosa Ziehau readto.tv_usec = (readto_ms % 1000) * 1000;
525772669ffSSepherosa Ziehau
526772669ffSSepherosa Ziehau for (i = 0; i < in_arr_cnt; ++i) {
527772669ffSSepherosa Ziehau const struct sockaddr_in *in = &in_arr[i];
528772669ffSSepherosa Ziehau struct recv_info info_hdr;
529772669ffSSepherosa Ziehau uint16_t *ports;
530772669ffSSepherosa Ziehau int s, n, ports_sz, d;
531772669ffSSepherosa Ziehau
532772669ffSSepherosa Ziehau again:
533772669ffSSepherosa Ziehau s = socket(AF_INET, SOCK_STREAM, 0);
534772669ffSSepherosa Ziehau if (s < 0)
535772669ffSSepherosa Ziehau err(1, "socket failed");
536772669ffSSepherosa Ziehau
537772669ffSSepherosa Ziehau if (connect(s, (const struct sockaddr *)in, sizeof(*in)) < 0)
538772669ffSSepherosa Ziehau err(1, "connect failed");
539772669ffSSepherosa Ziehau
540772669ffSSepherosa Ziehau if (setsockopt(s, SOL_SOCKET, SO_RCVTIMEO,
541772669ffSSepherosa Ziehau &readto, sizeof(readto)) < 0)
542772669ffSSepherosa Ziehau err(1, "setsockopt(RCVTIMEO) failed");
543772669ffSSepherosa Ziehau
544772669ffSSepherosa Ziehau n = read(s, &info_hdr, sizeof(info_hdr));
545772669ffSSepherosa Ziehau if (n != sizeof(info_hdr)) {
546772669ffSSepherosa Ziehau if (n < 0) {
547772669ffSSepherosa Ziehau if (errno == EAGAIN) {
548772669ffSSepherosa Ziehau close(s);
549772669ffSSepherosa Ziehau goto again;
550772669ffSSepherosa Ziehau }
551772669ffSSepherosa Ziehau err(1, "read info hdr failed");
552772669ffSSepherosa Ziehau } else {
553772669ffSSepherosa Ziehau errx(1, "read truncated info hdr");
554772669ffSSepherosa Ziehau }
555772669ffSSepherosa Ziehau }
556772669ffSSepherosa Ziehau if (info_hdr.ndport == 0) {
557772669ffSSepherosa Ziehau close(s);
558772669ffSSepherosa Ziehau continue;
559772669ffSSepherosa Ziehau }
560772669ffSSepherosa Ziehau
561772669ffSSepherosa Ziehau ports_sz = info_hdr.ndport * sizeof(uint16_t);
562772669ffSSepherosa Ziehau ports = malloc(ports_sz);
563772669ffSSepherosa Ziehau if (ports == NULL)
564772669ffSSepherosa Ziehau err(1, "malloc failed");
565772669ffSSepherosa Ziehau
566772669ffSSepherosa Ziehau n = read(s, ports, ports_sz);
567772669ffSSepherosa Ziehau if (n != ports_sz) {
568772669ffSSepherosa Ziehau if (n < 0) {
569772669ffSSepherosa Ziehau if (errno == EAGAIN) {
570772669ffSSepherosa Ziehau free(ports);
571772669ffSSepherosa Ziehau close(s);
572772669ffSSepherosa Ziehau goto again;
573772669ffSSepherosa Ziehau }
574772669ffSSepherosa Ziehau err(1, "read ports failed");
575772669ffSSepherosa Ziehau } else {
576772669ffSSepherosa Ziehau errx(1, "read truncated ports");
577772669ffSSepherosa Ziehau }
578772669ffSSepherosa Ziehau }
579772669ffSSepherosa Ziehau
580772669ffSSepherosa Ziehau daddr = reallocf(daddr,
581772669ffSSepherosa Ziehau (ndaddr + info_hdr.ndport) * sizeof(struct sockaddr_in));
582772669ffSSepherosa Ziehau if (daddr == NULL)
583772669ffSSepherosa Ziehau err(1, "reallocf failed");
584772669ffSSepherosa Ziehau
585772669ffSSepherosa Ziehau for (d = ndaddr; d < ndaddr + info_hdr.ndport; ++d) {
586772669ffSSepherosa Ziehau struct sockaddr_in *da = &daddr[d];
587772669ffSSepherosa Ziehau
588772669ffSSepherosa Ziehau *da = *in;
589772669ffSSepherosa Ziehau da->sin_port = ports[d - ndaddr];
590772669ffSSepherosa Ziehau }
591772669ffSSepherosa Ziehau ndaddr += info_hdr.ndport;
592772669ffSSepherosa Ziehau
593772669ffSSepherosa Ziehau free(ports);
594772669ffSSepherosa Ziehau close(s);
595772669ffSSepherosa Ziehau }
596772669ffSSepherosa Ziehau
597772669ffSSepherosa Ziehau #ifdef SEND_DEBUG
598772669ffSSepherosa Ziehau for (i = 0; i < ndaddr; ++i) {
599772669ffSSepherosa Ziehau const struct sockaddr_in *da = &daddr[i];
600772669ffSSepherosa Ziehau char name[INET_ADDRSTRLEN];
601772669ffSSepherosa Ziehau
602772669ffSSepherosa Ziehau fprintf(stderr, "%s:%d\n",
603772669ffSSepherosa Ziehau inet_ntop(AF_INET, &da->sin_addr, name, sizeof(name)),
604772669ffSSepherosa Ziehau ntohs(da->sin_port));
605772669ffSSepherosa Ziehau }
606772669ffSSepherosa Ziehau #endif
607772669ffSSepherosa Ziehau
608772669ffSSepherosa Ziehau *daddr0 = daddr;
609772669ffSSepherosa Ziehau *ndaddr0 = ndaddr;
610772669ffSSepherosa Ziehau }
611772669ffSSepherosa Ziehau
/*
 * Sender thread entry point (pthread start routine).
 *
 * xctx points to this thread's struct send_thrctx.  The thread:
 *
 *  1. Allocates the payload buffer of g_buflen bytes.  If g_sendfile is
 *     set, the payload is written to an unlinked temporary file instead
 *     and sendfile(2) is used for sending.
 *  2. Dequeues connections from ctx->t_conn until the end-of-list marker
 *     (sin_port == 0) is seen; each connection is established, its ack
 *     is read, and the socket is registered for EVFILT_WRITE in a
 *     thread-private kqueue.
 *  3. Goes through the two stage sender start synchronization
 *     (g_cond/g_nconn, then spinning on g_nwait).
 *  4. Sends on every connection reported writable until the one-shot
 *     kqueue timer fires.
 *
 * Results are left in the contexts: c_stat accumulates the bytes sent on
 * a connection, c_err/c_terr record the errno and time of a fatal send
 * error, and t_start/t_end bracket the sending phase.
 *
 * Always returns NULL.  Setup failures terminate the process through
 * err(3)/errx(3).
 */
static void *
send_thread(void *xctx)
{
	struct send_thrctx *ctx = xctx;
	struct conn_ctx *timeo = NULL;
	struct kevent chg_evt;
	uint8_t *buf;
	int nconn = 0, kq, n, fd = -1, buflen;
	char name[32];

	snprintf(name, sizeof(name), "snd%d", ctx->t_id);
	pthread_set_name_np(pthread_self(), name);

	buflen = ctx->t_glob->g_buflen;
	buf = malloc(buflen);
	if (buf == NULL)
		err(1, "malloc(%d) failed", buflen);
	/* Do not push indeterminate heap contents to the file or the peer. */
	memset(buf, 0, buflen);

	if (ctx->t_glob->g_sendfile) {
		/* mkstemp(3) requires six trailing X's in the template. */
		char filename[] = "sendtmpXXXXXX";

		fd = mkstemp(filename);
		if (fd < 0)
			err(1, "mkstemp failed");
		if (write(fd, buf, buflen) != buflen)
			err(1, "write to file failed");
		/* The open descriptor keeps the file alive; drop the name. */
		unlink(filename);
		/* The in-memory payload is not used in sendfile mode. */
		free(buf);
		buf = NULL;
	}

	kq = kqueue();
	if (kq < 0)
		err(1, "kqueue failed");

	/*
	 * Establish the connections assigned to us and add the
	 * established connections to kqueue.
	 */
	for (;;) {
#ifdef SEND_DEBUG
		char addr_name[INET_ADDRSTRLEN];
#endif
		struct timeval readto;
		struct conn_ctx *conn;
		struct conn_ack ack;
		int on;

		/* Sleep until a connection is queued for us, then take it. */
		pthread_mutex_lock(&ctx->t_lock);
		while (STAILQ_EMPTY(&ctx->t_conn))
			pthread_cond_wait(&ctx->t_cond, &ctx->t_lock);
		conn = STAILQ_FIRST(&ctx->t_conn);
		STAILQ_REMOVE_HEAD(&ctx->t_conn, c_link);
		pthread_mutex_unlock(&ctx->t_lock);

		if (conn->c_in.sin_port == 0) {
			/*
			 * The marker for 'the end of connection list'.
			 * See the related comment in main thread.
			 *
			 * NOTE:
			 * We reuse the marker as the udata for the
			 * kqueue timer.
			 */
			timeo = conn;
			break;
		}

		++nconn;
#ifdef SEND_DEBUG
		fprintf(stderr, "%s %s:%d\n", name,
		    inet_ntop(AF_INET, &conn->c_in.sin_addr,
		    addr_name, sizeof(addr_name)),
		    ntohs(conn->c_in.sin_port));
#endif

again:
		conn->c_s = socket(AF_INET, SOCK_STREAM, 0);
		if (conn->c_s < 0)
			err(1, "socket failed");

		if (connect(conn->c_s, (const struct sockaddr *)&conn->c_in,
		    sizeof(conn->c_in)) < 0)
			err(1, "connect failed");

		/* Bound the wait for the ack by g_readto_ms. */
		memset(&readto, 0, sizeof(readto));
		readto.tv_sec = ctx->t_glob->g_readto_ms / 1000;
		readto.tv_usec = (ctx->t_glob->g_readto_ms % 1000) * 1000;
		if (setsockopt(conn->c_s, SOL_SOCKET, SO_RCVTIMEO, &readto,
		    sizeof(readto)) < 0)
			err(1, "setsockopt(RCVTIMEO) failed");

		n = read(conn->c_s, &ack, sizeof(ack));
		if (n != sizeof(ack)) {
			if (n < 0) {
				if (errno == EAGAIN) {
					/*
					 * No ack within the receive timeout;
					 * drop this socket and reconnect.
					 */
					close(conn->c_s);
					goto again;
				}
				err(1, "read ack failed");
			} else {
				errx(1, "read truncated ack");
			}
		}

		/* The sending phase uses non-blocking sockets. */
		on = 1;
		if (ioctl(conn->c_s, FIONBIO, &on) < 0)
			err(1, "ioctl(FIONBIO) failed");

		EV_SET(&chg_evt, conn->c_s, EVFILT_WRITE, EV_ADD, 0, 0, conn);
		n = kevent(kq, &chg_evt, 1, NULL, 0, NULL);
		if (n < 0)
			err(1, "kevent add failed");
	}
#ifdef SEND_DEBUG
	fprintf(stderr, "%s conn %d\n", name, nconn);
#endif

	/*
	 * Sender start sync, stage 1:
	 * Wait for connections establishment (slow).
	 *
	 * Each thread subtracts its own connection count from g_nconn and
	 * then sleeps until the counter reaches zero.
	 */
	pthread_mutex_lock(&ctx->t_glob->g_lock);
	ctx->t_glob->g_nconn -= nconn;
	pthread_cond_broadcast(&ctx->t_glob->g_cond);
	while (ctx->t_glob->g_nconn != 0)
		pthread_cond_wait(&ctx->t_glob->g_cond, &ctx->t_glob->g_lock);
	pthread_mutex_unlock(&ctx->t_glob->g_lock);

	/*
	 * Sender start sync, stage 2.
	 */
	/* Increase the g_nwait. */
	atomic_add_int(&ctx->t_glob->g_nwait, 1);
	/* Spin-wait for main thread to release us (reset g_nwait). */
	while (ctx->t_glob->g_nwait)
		send_spinwait();

#ifdef SEND_DEBUG
	fprintf(stderr, "%s start\n", name);
#endif

	/*
	 * Wire a kqueue timer, so that the sending can be terminated
	 * as requested.
	 *
	 * NOTE:
	 * Set -2 to c_s for timer udata, so we could distinguish it
	 * from real connections.
	 *
	 * NOTE(review): g_dur is scaled by 1000 here, so it is presumably
	 * in seconds with the timer running in milliseconds -- confirm.
	 */
	timeo->c_s = -2;
	EV_SET(&chg_evt, 0, EVFILT_TIMER, EV_ADD | EV_ONESHOT, 0,
	    ctx->t_glob->g_dur * 1000L, timeo);
	n = kevent(kq, &chg_evt, 1, NULL, 0, NULL);
	if (n < 0)
		err(1, "kevent add failed");

	clock_gettime(CLOCK_MONOTONIC_PRECISE, &ctx->t_start);
	for (;;) {
		struct kevent evt[SEND_EVENT_MAX];
		int nevt, i;

		nevt = kevent(kq, NULL, 0, evt, SEND_EVENT_MAX, NULL);
		if (nevt < 0)
			err(1, "kevent failed");

		for (i = 0; i < nevt; ++i) {
			struct conn_ctx *conn = evt[i].udata;

			if (conn->c_s < 0) {
				if (conn->c_s == -2) {
					/* Timer expired */
					goto done;
				}
				/* Connection was closed after an error. */
				continue;
			}

			if (fd >= 0) {
				off_t m = 0, off;
				size_t len;

				/*
				 * Resume within the file where the previous
				 * send on this connection stopped, wrapping
				 * at buflen.
				 */
				off = conn->c_stat % buflen;
				len = buflen - off;

				n = sendfile(fd, conn->c_s, off, len, NULL,
				    &m, 0);
				/*
				 * On success or EAGAIN, account the byte
				 * count reported through m.
				 */
				if (n == 0 || (n < 0 && errno == EAGAIN))
					n = m;
			} else {
				n = write(conn->c_s, buf, buflen);
			}

			if (n < 0) {
				if (errno != EAGAIN) {
					/*
					 * Fatal send error: record it with
					 * its time and retire the connection.
					 */
					conn->c_err = errno;
					clock_gettime(CLOCK_MONOTONIC_PRECISE,
					    &conn->c_terr);
					close(conn->c_s);
					conn->c_s = -1;
				}
			} else {
				conn->c_stat += n;
			}
		}
	}
done:
	/* Take the end timestamp before doing any cleanup. */
	clock_gettime(CLOCK_MONOTONIC_PRECISE, &ctx->t_end);

	close(kq);
	if (fd >= 0)
		close(fd);
	free(buf);
	return NULL;
}
826