1 #include <sys/types.h>
2 #include <sys/event.h>
3 #include <sys/ioctl.h>
4 #include <sys/socket.h>
5 #include <sys/sysctl.h>
6 
7 #include <arpa/inet.h>
8 #include <netinet/in.h>
9 
10 #include <err.h>
11 #include <errno.h>
12 #include <pthread.h>
13 #include <pthread_np.h>
14 #include <signal.h>
15 #include <stdio.h>
16 #include <stdint.h>
17 #include <stdlib.h>
18 #include <string.h>
19 #include <unistd.h>
20 
21 #include "kq_sendrecv_proto.h"
22 
23 #define RECV_EVENT_MAX		64
24 #define RECV_BUFLEN		(128 * 1024)
25 
26 struct recv_thrctx {
27 	int			t_id;
28 	struct sockaddr_in	t_in;
29 
30 	pthread_mutex_t		t_lock;
31 	pthread_cond_t		t_cond;
32 
33 	pthread_t		t_tid;
34 };
35 
36 static void	*recv_thread(void *);
37 
38 static int	recv_buflen = RECV_BUFLEN;
39 static int	recv_reuseport = 0;
40 static int	recv_bindcpu = 0;
41 
42 static void
43 usage(const char *cmd)
44 {
45 	fprintf(stderr, "%s [-4 addr4] [-p port] [-t nthreads] [-D] [-R] [-B] "
46 	    "[-b buflen]\n", cmd);
47 	exit(2);
48 }
49 
50 int
51 main(int argc, char *argv[])
52 {
53 	struct recv_thrctx *ctx_arr;
54 	struct recv_info *info;
55 	struct sockaddr_in in;
56 	sigset_t sigset;
57 	int opt, s, on, nthr, i, info_sz, do_daemon;
58 	size_t sz;
59 
60 	sigemptyset(&sigset);
61 	sigaddset(&sigset, SIGPIPE);
62 	if (sigprocmask(SIG_BLOCK, &sigset, NULL) < 0)
63 		err(1, "sigprocmask failed");
64 
65 	sz = sizeof(nthr);
66 	if (sysctlbyname("hw.ncpu", &nthr, &sz, NULL, 0) < 0)
67 		err(1, "sysctl hw.ncpu failed");
68 
69 	memset(&in, 0, sizeof(in));
70 	in.sin_family = AF_INET;
71 	in.sin_addr.s_addr = htonl(INADDR_ANY);
72 	in.sin_port = htons(RECV_PORT);
73 
74 	do_daemon = 1;
75 
76 	while ((opt = getopt(argc, argv, "4:BDRb:p:t:")) != -1) {
77 		switch (opt) {
78 		case '4':
79 			if (inet_pton(AF_INET, optarg, &in.sin_addr) <= 0)
80 				errx(1, "inet_pton failed %s", optarg);
81 			break;
82 
83 		case 'B':
84 			recv_bindcpu = 1;
85 			break;
86 
87 		case 'D':
88 			do_daemon = 0;
89 			break;
90 
91 		case 'R':
92 #ifdef __DragonFly__
93 			recv_reuseport = 1;
94 #else
95 			/* Not supported on other BSDs */
96 #endif
97 			break;
98 
99 		case 'b':
100 			recv_buflen = strtol(optarg, NULL, 10);
101 			if (recv_buflen <= 0)
102 				errx(1, "invalid -b");
103 			break;
104 
105 		case 'p':
106 			in.sin_port = htons(strtoul(optarg, NULL, 10));
107 			break;
108 
109 		case 't':
110 			nthr = strtol(optarg, NULL, 10);
111 			if (nthr <= 0)
112 				errx(1, "invalid -t");
113 			break;
114 
115 		default:
116 			usage(argv[0]);
117 		}
118 	}
119 
120 	s = socket(AF_INET, SOCK_STREAM, 0);
121 	if (s < 0)
122 		err(1, "socket failed");
123 
124 	on = 1;
125 	if (setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) < 0)
126 		err(1, "setsockopt(REUSEPADDR) failed");
127 
128 	if (bind(s, (const struct sockaddr *)&in, sizeof(in)) < 0)
129 		err(1, "bind failed");
130 
131 	if (listen(s, -1) < 0)
132 		err(1, "listen failed");
133 
134 	ctx_arr = calloc(nthr, sizeof(struct recv_thrctx));
135 	if (ctx_arr == NULL)
136 		err(1, "calloc failed");
137 
138 	info_sz = __offsetof(struct recv_info, dport[nthr]);
139 	info = calloc(1, info_sz);
140 	if (info == NULL)
141 		err(1, "calloc failed");
142 	info->ndport = nthr;
143 
144 	if (do_daemon)
145 		daemon(0, 0);
146 
147 	pthread_set_name_np(pthread_self(), "main");
148 
149 	for (i = 0; i < nthr; ++i) {
150 		struct recv_thrctx *ctx = &ctx_arr[i];
151 		int error;
152 
153 		ctx->t_in = in;
154 		ctx->t_in.sin_port = 0;
155 
156 		ctx->t_id = i;
157 		pthread_mutex_init(&ctx->t_lock, NULL);
158 		pthread_cond_init(&ctx->t_cond, NULL);
159 
160 		/* Start receiver */
161 		error = pthread_create(&ctx->t_tid, NULL, recv_thread, ctx);
162 		if (error)
163 			errc(1, error, "pthread_create %d failed", i);
164 
165 		/*
166 		 * Wait for the receiver to select a proper data port
167 		 * and start a listen socket on the data port.
168 		 */
169 		pthread_mutex_lock(&ctx->t_lock);
170 		while (ctx->t_in.sin_port == 0)
171 			pthread_cond_wait(&ctx->t_cond, &ctx->t_lock);
172 		pthread_mutex_unlock(&ctx->t_lock);
173 
174 		info->dport[i] = ctx->t_in.sin_port;
175 	}
176 
177 	/*
178 	 * Send information, e.g. data ports, back to the clients.
179 	 */
180 	for (;;) {
181 		int s1;
182 
183 		s1 = accept(s, NULL, NULL);
184 		if (s1 < 0)
185 			continue;
186 		write(s1, info, info_sz);
187 		close(s1);
188 	}
189 
190 	/* NEVER REACHED */
191 	exit(0);
192 }
193 
194 static void *
195 recv_thread(void *xctx)
196 {
197 	struct recv_thrctx *ctx = xctx;
198 	struct kevent change_evt0[RECV_EVENT_MAX];
199 	struct conn_ack ack;
200 	uint8_t *buf;
201 	char name[32];
202 	u_short port;
203 	int s, kq, nchange;
204 
205 	/*
206 	 * Select a proper data port and create a listen socket on it.
207 	 */
208 	if (recv_reuseport)
209 		port = RECV_PORT;
210 	else
211 		port = RECV_PORT + ctx->t_id;
212 	for (;;) {
213 		struct sockaddr_in in = ctx->t_in;
214 		int on;
215 
216 		++port;
217 		if (port < RECV_PORT)
218 			errx(1, "failed to find a data port");
219 		in.sin_port = htons(port);
220 
221 		s = socket(AF_INET, SOCK_STREAM, 0);
222 		if (s < 0)
223 			err(1, "socket failed");
224 
225 		on = 1;
226 		if (recv_reuseport) {
227 			if (setsockopt(s, SOL_SOCKET, SO_REUSEPORT, &on,
228 			    sizeof(on)))
229 				err(1, "setsockopt(REUSEPORT) failed");
230 		} else {
231 			if (setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &on,
232 			    sizeof(on)))
233 				err(1, "setsockopt(REUSEADDR) failed");
234 		}
235 
236 		on = 1;
237 		if (ioctl(s, FIONBIO, &on, sizeof(on)) < 0)
238 			err(1, "ioctl(FIONBIO) failed");
239 
240 		if (bind(s, (const struct sockaddr *)&in, sizeof(in)) < 0) {
241 			close(s);
242 			continue;
243 		}
244 
245 		if (listen(s, -1) < 0)
246 			err(1, "listen failed");
247 
248 		if (recv_bindcpu) {
249 			int cpu = -1, error;
250 #ifdef __FreeBSD__
251 			cpuset_t mask;
252 #else
253 			cpu_set_t mask;
254 #endif
255 
256 #ifdef __DragonFly__
257 			if (recv_reuseport) {
258 				socklen_t olen;
259 
260 				olen = sizeof(cpu);
261 				if (getsockopt(s, SOL_SOCKET, SO_CPUHINT,
262 				    &cpu, &olen) < 0)
263 					err(1, "getsockopt(CPUHINT) failed");
264 			}
265 #endif
266 			if (cpu < 0) {
267 				int ncpus;
268 				size_t len;
269 
270 				len = sizeof(ncpus);
271 				if (sysctlbyname("hw.ncpu", &ncpus, &len,
272 				    NULL, 0) < 0)
273 					err(1, "sysctlbyname hw.ncpu failed");
274 				cpu = ctx->t_id % ncpus;
275 			}
276 
277 			CPU_ZERO(&mask);
278 			CPU_SET(cpu, &mask);
279 			error = pthread_setaffinity_np(pthread_self(),
280 			    sizeof(mask), &mask);
281 			if (error) {
282 				errc(1, error, "pthread_setaffinity_np cpu%d "
283 				    "failed", cpu);
284 			}
285 		}
286 		break;
287 	}
288 
289 	kq = kqueue();
290 	if (kq < 0)
291 		err(1, "kqueue failed");
292 
293 	buf = malloc(recv_buflen);
294 	if (buf == NULL)
295 		err(1, "malloc %d failed", recv_buflen);
296 
297 	memset(&ack, 0, sizeof(ack));
298 
299 	snprintf(name, sizeof(name), "rcv%d %d", ctx->t_id, port);
300 	pthread_set_name_np(pthread_self(), name);
301 
302 	/*
303 	 * Inform the main thread that we are ready.
304 	 */
305 	pthread_mutex_lock(&ctx->t_lock);
306 	ctx->t_in.sin_port = htons(port);
307 	pthread_mutex_unlock(&ctx->t_lock);
308 	pthread_cond_signal(&ctx->t_cond);
309 
310 	EV_SET(&change_evt0[0], s, EVFILT_READ, EV_ADD, 0, 0, NULL);
311 	nchange = 1;
312 
313 	for (;;) {
314 		const struct kevent *change_evt = NULL;
315 		struct kevent evt[RECV_EVENT_MAX];
316 		int nevt, i;
317 
318 		if (nchange > 0)
319 			change_evt = change_evt0;
320 
321 		nevt = kevent(kq, change_evt, nchange, evt, RECV_EVENT_MAX,
322 		    NULL);
323 		if (nevt < 0)
324 			err(1, "kevent failed");
325 		nchange = 0;
326 
327 		for (i = 0; i < nevt; ++i) {
328 			int n;
329 
330 			if (evt[i].ident == (u_int)s) {
331 				while (nchange < RECV_EVENT_MAX) {
332 					int s1;
333 
334 					s1 = accept(s, NULL, NULL);
335 					if (s1 < 0)
336 						break;
337 
338 					/* TODO: keepalive */
339 
340 					n = write(s1, &ack, sizeof(ack));
341 					if (n != sizeof(ack)) {
342 						close(s1);
343 						continue;
344 					}
345 
346 					EV_SET(&change_evt0[nchange], s1,
347 					    EVFILT_READ, EV_ADD, 0, 0, NULL);
348 					++nchange;
349 				}
350 			} else {
351 				n = read(evt[i].ident, buf, recv_buflen);
352 				if (n <= 0) {
353 					if (n == 0 || errno != EAGAIN)
354 						close(evt[i].ident);
355 				}
356 			}
357 		}
358 	}
359 
360 	/* NEVER REACHED */
361 	return NULL;
362 }
363