1 #include <sys/types.h>
2 #include <sys/event.h>
3 #include <sys/ioctl.h>
4 #include <sys/socket.h>
5 #include <sys/sysctl.h>
6 
7 #include <arpa/inet.h>
8 #include <netinet/in.h>
9 
10 #include <err.h>
11 #include <errno.h>
12 #include <pthread.h>
13 #include <pthread_np.h>
14 #include <signal.h>
15 #include <stdio.h>
16 #include <stdint.h>
17 #include <stdlib.h>
18 #include <string.h>
19 #include <unistd.h>
20 
21 #include "kq_sendrecv_proto.h"
22 
23 #define RECV_EVENT_MAX		64
24 #define RECV_BUFLEN		(128 * 1024)
25 
/*
 * Per-receiver-thread context.  One instance is handed to each
 * recv_thread(); main() and the thread use t_lock/t_cond to hand over
 * the data port the thread ended up listening on.
 */
struct recv_thrctx {
	/* Index of this receiver, assigned by main(). */
	int			t_id;

	/*
	 * Address the receiver binds to.  main() clears sin_port before
	 * starting the thread; the thread stores the data port it chose
	 * (network byte order) here once its listen socket is ready.
	 */
	struct sockaddr_in	t_in;

	/* Protect and announce the t_in.sin_port update. */
	pthread_mutex_t		t_lock;
	pthread_cond_t		t_cond;

	/* Thread handle filled in by pthread_create(). */
	pthread_t		t_tid;
};
35 
36 static void	*recv_thread(void *);
37 
/*
 * Print the command line synopsis for `cmd' on stderr and terminate
 * the process with exit status 2.  Does not return.
 */
static void
usage(const char *cmd)
{
	(void)fprintf(stderr,
	    "%s [-4 addr4] [-p port] [-t nthreads] [-D]\n", cmd);
	exit(2);
}
44 
/*
 * Control server entry point.
 *
 * Blocks SIGPIPE, parses the command line, opens the control listen
 * socket, starts one receiver thread per requested thread (default:
 * hw.ncpu) and then answers every control connection with a recv_info
 * listing the data ports of all receiver threads.
 *
 * Options:
 *   -4 addr4     IPv4 address to bind to (default INADDR_ANY)
 *   -p port      control port, 0-65535 (default RECV_PORT)
 *   -t nthreads  number of receiver threads, must be > 0
 *   -D           do not daemonize
 *
 * Never returns; any setup failure terminates the process via err(3).
 */
int
main(int argc, char *argv[])
{
	struct recv_thrctx *ctx_arr;
	struct recv_info *info;
	struct sockaddr_in in;
	sigset_t sigset;
	unsigned long ulval;
	long lval;
	char *endp;
	int opt, s, on, nthr, i, info_sz, do_daemon;
	size_t sz;

	/* A peer closing early must yield EPIPE from write(), not kill us. */
	sigemptyset(&sigset);
	sigaddset(&sigset, SIGPIPE);
	if (sigprocmask(SIG_BLOCK, &sigset, NULL) < 0)
		err(1, "sigprocmask failed");

	/* Default to one receiver thread per CPU. */
	sz = sizeof(nthr);
	if (sysctlbyname("hw.ncpu", &nthr, &sz, NULL, 0) < 0)
		err(1, "sysctl hw.ncpu failed");

	memset(&in, 0, sizeof(in));
	in.sin_family = AF_INET;
	in.sin_addr.s_addr = htonl(INADDR_ANY);
	in.sin_port = htons(RECV_PORT);

	do_daemon = 1;

	while ((opt = getopt(argc, argv, "4:Dp:t:")) != -1) {
		switch (opt) {
		case '4':
			if (inet_pton(AF_INET, optarg, &in.sin_addr) <= 0)
				errx(1, "inet_pton failed %s", optarg);
			break;

		case 'D':
			do_daemon = 0;
			break;

		case 'p':
			/*
			 * Reject non-numeric input and anything that does
			 * not fit a 16-bit port instead of silently
			 * truncating it in htons().
			 */
			errno = 0;
			ulval = strtoul(optarg, &endp, 10);
			if (errno != 0 || endp == optarg || *endp != '\0' ||
			    ulval > 65535)
				errx(1, "invalid -p");
			in.sin_port = htons((uint16_t)ulval);
			break;

		case 't':
			/* Range-check before narrowing the long to int. */
			errno = 0;
			lval = strtol(optarg, &endp, 10);
			if (errno != 0 || endp == optarg || *endp != '\0' ||
			    lval <= 0 || lval > INT32_MAX)
				errx(1, "invalid -t");
			nthr = (int)lval;
			break;

		default:
			usage(argv[0]);
		}
	}

	s = socket(AF_INET, SOCK_STREAM, 0);
	if (s < 0)
		err(1, "socket failed");

	on = 1;
	if (setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) < 0)
		err(1, "setsockopt(REUSEADDR) failed");

	if (bind(s, (const struct sockaddr *)&in, sizeof(in)) < 0)
		err(1, "bind failed");

	if (listen(s, -1) < 0)
		err(1, "listen failed");

	ctx_arr = calloc(nthr, sizeof(struct recv_thrctx));
	if (ctx_arr == NULL)
		err(1, "calloc failed");

	/* recv_info ends in a dport[] array with one slot per thread. */
	info_sz = __offsetof(struct recv_info, dport[nthr]);
	info = calloc(1, info_sz);
	if (info == NULL)
		err(1, "calloc failed");
	info->ndport = nthr;

	if (do_daemon && daemon(0, 0) < 0)
		err(1, "daemon failed");

	pthread_set_name_np(pthread_self(), "main");

	for (i = 0; i < nthr; ++i) {
		struct recv_thrctx *ctx = &ctx_arr[i];
		int error;

		/*
		 * Port 0 means "not ready yet"; the receiver replaces it
		 * with the data port it actually bound.
		 */
		ctx->t_in = in;
		ctx->t_in.sin_port = 0;

		ctx->t_id = i;
		error = pthread_mutex_init(&ctx->t_lock, NULL);
		if (error)
			errc(1, error, "pthread_mutex_init %d failed", i);
		error = pthread_cond_init(&ctx->t_cond, NULL);
		if (error)
			errc(1, error, "pthread_cond_init %d failed", i);

		/* Start receiver */
		error = pthread_create(&ctx->t_tid, NULL, recv_thread, ctx);
		if (error)
			errc(1, error, "pthread_create %d failed", i);

		/*
		 * Wait for the receiver to select a proper data port
		 * and start a listen socket on the data port.
		 */
		pthread_mutex_lock(&ctx->t_lock);
		while (ctx->t_in.sin_port == 0)
			pthread_cond_wait(&ctx->t_cond, &ctx->t_lock);
		pthread_mutex_unlock(&ctx->t_lock);

		/* Stored as published by the receiver: network byte order. */
		info->dport[i] = ctx->t_in.sin_port;
	}

	/*
	 * Send information, e.g. data ports, back to the clients.
	 */
	for (;;) {
		int s1;

		s1 = accept(s, NULL, NULL);
		if (s1 < 0)
			continue;
		/*
		 * Best effort: a client that went away before the write
		 * simply does not get the information.
		 */
		write(s1, info, info_sz);
		close(s1);
	}

	/* NEVER REACHED */
	exit(0);
}
170 
/*
 * Create a non-blocking listen socket for the receiver described by
 * `ctx' on the first data port that can be bound.
 *
 * The search starts at RECV_PORT + t_id + 1 and walks upwards; the
 * process is terminated if the 16-bit port number wraps around without
 * a successful bind.  The chosen port (host byte order) is stored in
 * *portp and the listen socket is returned.
 */
static int
recv_listen_dataport(const struct recv_thrctx *ctx, u_short *portp)
{
	u_short port;

	port = RECV_PORT + ctx->t_id;
	for (;;) {
		struct sockaddr_in in = ctx->t_in;
		int s, on;

		++port;
		if (port < RECV_PORT)
			errx(1, "failed to find a data port");
		in.sin_port = htons(port);

		s = socket(AF_INET, SOCK_STREAM, 0);
		if (s < 0)
			err(1, "socket failed");

		on = 1;
		if (setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &on,
		    sizeof(on)) < 0)
			err(1, "setsockopt(REUSEADDR) failed");

		/* FIONBIO takes a single int pointer argument. */
		on = 1;
		if (ioctl(s, FIONBIO, &on) < 0)
			err(1, "ioctl(FIONBIO) failed");

		if (bind(s, (const struct sockaddr *)&in, sizeof(in)) < 0) {
			/* Could not bind this port; try the next one. */
			close(s);
			continue;
		}

		if (listen(s, -1) < 0)
			err(1, "listen failed");

		*portp = port;
		return s;
	}
}

/*
 * Receiver thread body; `xctx' is the thread's struct recv_thrctx.
 *
 * Sets up a data listen socket and a kqueue, publishes the chosen data
 * port to main() through ctx->t_in.sin_port, and then loops forever:
 * new connections are sent a conn_ack and registered for read events,
 * and everything readable on an established connection is read into a
 * scratch buffer and discarded.  Never returns.
 */
static void *
recv_thread(void *xctx)
{
	struct recv_thrctx *ctx = xctx;
	struct kevent change_evt0[RECV_EVENT_MAX];
	struct conn_ack ack;
	uint8_t *buf;
	char name[32];
	u_short port;
	int s, kq, nchange;

	/*
	 * Select a proper data port and create a listen socket on it.
	 */
	s = recv_listen_dataport(ctx, &port);

	kq = kqueue();
	if (kq < 0)
		err(1, "kqueue failed");

	buf = malloc(RECV_BUFLEN);
	if (buf == NULL)
		err(1, "malloc %d failed", RECV_BUFLEN);

	memset(&ack, 0, sizeof(ack));

	snprintf(name, sizeof(name), "rcv%d %d", ctx->t_id, port);
	pthread_set_name_np(pthread_self(), name);

	/*
	 * Inform the main thread that we are ready.
	 */
	pthread_mutex_lock(&ctx->t_lock);
	ctx->t_in.sin_port = htons(port);
	pthread_mutex_unlock(&ctx->t_lock);
	pthread_cond_signal(&ctx->t_cond);

	/* The first kevent() call registers the listen socket. */
	EV_SET(&change_evt0[0], s, EVFILT_READ, EV_ADD, 0, 0, NULL);
	nchange = 1;

	for (;;) {
		const struct kevent *change_evt = NULL;
		struct kevent evt[RECV_EVENT_MAX];
		int nevt, i;

		if (nchange > 0)
			change_evt = change_evt0;

		nevt = kevent(kq, change_evt, nchange, evt, RECV_EVENT_MAX,
		    NULL);
		if (nevt < 0)
			err(1, "kevent failed");
		nchange = 0;

		for (i = 0; i < nevt; ++i) {
			int fd = (int)evt[i].ident;
			ssize_t n;

			/*
			 * A changelist entry that could not be applied is
			 * reported with EV_ERROR and the errno in data.
			 * Without the listen socket registered this thread
			 * can never make progress; an unregistered
			 * connection would be leaked, so drop it.
			 */
			if (evt[i].flags & EV_ERROR) {
				if (fd == s) {
					errc(1, (int)evt[i].data,
					    "kevent failed");
				}
				close(fd);
				continue;
			}

			if (fd == s) {
				/*
				 * Accept until the backlog is drained or the
				 * change list is full; connections left over
				 * are picked up on the next listen event.
				 */
				while (nchange < RECV_EVENT_MAX) {
					int s1;

					s1 = accept(s, NULL, NULL);
					if (s1 < 0)
						break;

					/* TODO: keepalive */

					n = write(s1, &ack, sizeof(ack));
					if (n != (ssize_t)sizeof(ack)) {
						close(s1);
						continue;
					}

					EV_SET(&change_evt0[nchange], s1,
					    EVFILT_READ, EV_ADD, 0, 0, NULL);
					++nchange;
				}
			} else {
				/*
				 * Drain and discard.  Close on EOF or on a
				 * hard error; EAGAIN and EINTR leave the
				 * connection usable.
				 */
				n = read(fd, buf, RECV_BUFLEN);
				if (n == 0 || (n < 0 && errno != EAGAIN &&
				    errno != EINTR))
					close(fd);
			}
		}
	}

	/* NEVER REACHED */
	return NULL;
}
292