1 #include <sys/types.h> 2 #include <sys/event.h> 3 #include <sys/ioctl.h> 4 #include <sys/socket.h> 5 #include <sys/sysctl.h> 6 7 #include <arpa/inet.h> 8 #include <netinet/in.h> 9 10 #include <err.h> 11 #include <errno.h> 12 #include <pthread.h> 13 #include <pthread_np.h> 14 #include <signal.h> 15 #include <stdio.h> 16 #include <stdint.h> 17 #include <stdlib.h> 18 #include <string.h> 19 #include <unistd.h> 20 21 #include "kq_sendrecv_proto.h" 22 23 #define RECV_EVENT_MAX 64 24 #define RECV_BUFLEN (128 * 1024) 25 26 struct recv_thrctx { 27 int t_id; 28 struct sockaddr_in t_in; 29 30 pthread_mutex_t t_lock; 31 pthread_cond_t t_cond; 32 33 pthread_t t_tid; 34 }; 35 36 static void *recv_thread(void *); 37 38 static int recv_buflen = RECV_BUFLEN; 39 static int recv_reuseport = 0; 40 static int recv_bindcpu = 0; 41 42 static void 43 usage(const char *cmd) 44 { 45 fprintf(stderr, "%s [-4 addr4] [-p port] [-t nthreads] [-D] [-R] [-B] " 46 "[-b buflen]\n", cmd); 47 exit(2); 48 } 49 50 int 51 main(int argc, char *argv[]) 52 { 53 struct recv_thrctx *ctx_arr; 54 struct recv_info *info; 55 struct sockaddr_in in; 56 sigset_t sigset; 57 int opt, s, on, nthr, i, info_sz, do_daemon; 58 size_t sz; 59 60 sigemptyset(&sigset); 61 sigaddset(&sigset, SIGPIPE); 62 if (sigprocmask(SIG_BLOCK, &sigset, NULL) < 0) 63 err(1, "sigprocmask failed"); 64 65 sz = sizeof(nthr); 66 if (sysctlbyname("hw.ncpu", &nthr, &sz, NULL, 0) < 0) 67 err(1, "sysctl hw.ncpu failed"); 68 69 memset(&in, 0, sizeof(in)); 70 in.sin_family = AF_INET; 71 in.sin_addr.s_addr = htonl(INADDR_ANY); 72 in.sin_port = htons(RECV_PORT); 73 74 do_daemon = 1; 75 76 while ((opt = getopt(argc, argv, "4:BDRb:p:t:")) != -1) { 77 switch (opt) { 78 case '4': 79 if (inet_pton(AF_INET, optarg, &in.sin_addr) <= 0) 80 errx(1, "inet_pton failed %s", optarg); 81 break; 82 83 case 'B': 84 recv_bindcpu = 1; 85 break; 86 87 case 'D': 88 do_daemon = 0; 89 break; 90 91 case 'R': 92 #ifdef __DragonFly__ 93 recv_reuseport = 1; 94 #else 95 /* Not supported on other BSDs */ 96 #endif 97 break; 98 99 case 'b': 100 recv_buflen = strtol(optarg, NULL, 10); 101 if (recv_buflen <= 0) 102 errx(1, "invalid -b"); 103 break; 104 105 case 'p': 106 in.sin_port = htons(strtoul(optarg, NULL, 10)); 107 break; 108 109 case 't': 110 nthr = strtol(optarg, NULL, 10); 111 if (nthr <= 0) 112 errx(1, "invalid -t"); 113 break; 114 115 default: 116 usage(argv[0]); 117 } 118 } 119 120 s = socket(AF_INET, SOCK_STREAM, 0); 121 if (s < 0) 122 err(1, "socket failed"); 123 124 on = 1; 125 if (setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) < 0) 126 err(1, "setsockopt(REUSEPADDR) failed"); 127 128 if (bind(s, (const struct sockaddr *)&in, sizeof(in)) < 0) 129 err(1, "bind failed"); 130 131 if (listen(s, -1) < 0) 132 err(1, "listen failed"); 133 134 ctx_arr = calloc(nthr, sizeof(struct recv_thrctx)); 135 if (ctx_arr == NULL) 136 err(1, "calloc failed"); 137 138 info_sz = __offsetof(struct recv_info, dport[nthr]); 139 info = calloc(1, info_sz); 140 if (info == NULL) 141 err(1, "calloc failed"); 142 info->ndport = nthr; 143 144 if (do_daemon) 145 daemon(0, 0); 146 147 pthread_set_name_np(pthread_self(), "main"); 148 149 for (i = 0; i < nthr; ++i) { 150 struct recv_thrctx *ctx = &ctx_arr[i]; 151 int error; 152 153 ctx->t_in = in; 154 ctx->t_in.sin_port = 0; 155 156 ctx->t_id = i; 157 pthread_mutex_init(&ctx->t_lock, NULL); 158 pthread_cond_init(&ctx->t_cond, NULL); 159 160 /* Start receiver */ 161 error = pthread_create(&ctx->t_tid, NULL, recv_thread, ctx); 162 if (error) 163 errc(1, error, "pthread_create %d failed", i); 164 165 /* 166 * Wait for the receiver to select a proper data port 167 * and start a listen socket on the data port. 168 */ 169 pthread_mutex_lock(&ctx->t_lock); 170 while (ctx->t_in.sin_port == 0) 171 pthread_cond_wait(&ctx->t_cond, &ctx->t_lock); 172 pthread_mutex_unlock(&ctx->t_lock); 173 174 info->dport[i] = ctx->t_in.sin_port; 175 } 176 177 /* 178 * Send information, e.g. data ports, back to the clients. 179 */ 180 for (;;) { 181 int s1; 182 183 s1 = accept(s, NULL, NULL); 184 if (s1 < 0) 185 continue; 186 write(s1, info, info_sz); 187 close(s1); 188 } 189 190 /* NEVER REACHED */ 191 exit(0); 192 } 193 194 static void * 195 recv_thread(void *xctx) 196 { 197 struct recv_thrctx *ctx = xctx; 198 struct kevent change_evt0[RECV_EVENT_MAX]; 199 struct conn_ack ack; 200 uint8_t *buf; 201 char name[32]; 202 u_short port; 203 int s, kq, nchange; 204 205 /* 206 * Select a proper data port and create a listen socket on it. 207 */ 208 if (recv_reuseport) 209 port = RECV_PORT; 210 else 211 port = RECV_PORT + ctx->t_id; 212 for (;;) { 213 struct sockaddr_in in = ctx->t_in; 214 int on; 215 216 ++port; 217 if (port < RECV_PORT) 218 errx(1, "failed to find a data port"); 219 in.sin_port = htons(port); 220 221 s = socket(AF_INET, SOCK_STREAM, 0); 222 if (s < 0) 223 err(1, "socket failed"); 224 225 on = 1; 226 if (recv_reuseport) { 227 if (setsockopt(s, SOL_SOCKET, SO_REUSEPORT, &on, 228 sizeof(on))) 229 err(1, "setsockopt(REUSEPORT) failed"); 230 } else { 231 if (setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &on, 232 sizeof(on))) 233 err(1, "setsockopt(REUSEADDR) failed"); 234 } 235 236 on = 1; 237 if (ioctl(s, FIONBIO, &on, sizeof(on)) < 0) 238 err(1, "ioctl(FIONBIO) failed"); 239 240 if (bind(s, (const struct sockaddr *)&in, sizeof(in)) < 0) { 241 close(s); 242 continue; 243 } 244 245 if (listen(s, -1) < 0) 246 err(1, "listen failed"); 247 248 if (recv_bindcpu) { 249 int cpu = -1, error; 250 #ifdef __FreeBSD__ 251 cpuset_t mask; 252 #else 253 cpu_set_t mask; 254 #endif 255 256 #ifdef __DragonFly__ 257 if (recv_reuseport) { 258 socklen_t olen; 259 260 olen = sizeof(cpu); 261 if (getsockopt(s, SOL_SOCKET, SO_CPUHINT, 262 &cpu, &olen) < 0) 263 err(1, "getsockopt(CPUHINT) failed"); 264 } 265 #endif 266 if (cpu < 0) { 267 int ncpus; 268 size_t len; 269 270 len = sizeof(ncpus); 271 if (sysctlbyname("hw.ncpu", &ncpus, &len, 272 NULL, 0) < 0) 273 err(1, "sysctlbyname hw.ncpu failed"); 274 cpu = ctx->t_id % ncpus; 275 } 276 277 CPU_ZERO(&mask); 278 CPU_SET(cpu, &mask); 279 error = pthread_setaffinity_np(pthread_self(), 280 sizeof(mask), &mask); 281 if (error) { 282 errc(1, error, "pthread_setaffinity_np cpu%d " 283 "failed", cpu); 284 } 285 } 286 break; 287 } 288 289 kq = kqueue(); 290 if (kq < 0) 291 err(1, "kqueue failed"); 292 293 buf = malloc(recv_buflen); 294 if (buf == NULL) 295 err(1, "malloc %d failed", recv_buflen); 296 297 memset(&ack, 0, sizeof(ack)); 298 299 snprintf(name, sizeof(name), "rcv%d %d", ctx->t_id, port); 300 pthread_set_name_np(pthread_self(), name); 301 302 /* 303 * Inform the main thread that we are ready. 304 */ 305 pthread_mutex_lock(&ctx->t_lock); 306 ctx->t_in.sin_port = htons(port); 307 pthread_mutex_unlock(&ctx->t_lock); 308 pthread_cond_signal(&ctx->t_cond); 309 310 EV_SET(&change_evt0[0], s, EVFILT_READ, EV_ADD, 0, 0, NULL); 311 nchange = 1; 312 313 for (;;) { 314 const struct kevent *change_evt = NULL; 315 struct kevent evt[RECV_EVENT_MAX]; 316 int nevt, i; 317 318 if (nchange > 0) 319 change_evt = change_evt0; 320 321 nevt = kevent(kq, change_evt, nchange, evt, RECV_EVENT_MAX, 322 NULL); 323 if (nevt < 0) 324 err(1, "kevent failed"); 325 nchange = 0; 326 327 for (i = 0; i < nevt; ++i) { 328 int n; 329 330 if (evt[i].ident == (u_int)s) { 331 while (nchange < RECV_EVENT_MAX) { 332 int s1; 333 334 s1 = accept(s, NULL, NULL); 335 if (s1 < 0) 336 break; 337 338 /* TODO: keepalive */ 339 340 n = write(s1, &ack, sizeof(ack)); 341 if (n != sizeof(ack)) { 342 close(s1); 343 continue; 344 } 345 346 EV_SET(&change_evt0[nchange], s1, 347 EVFILT_READ, EV_ADD, 0, 0, NULL); 348 ++nchange; 349 } 350 } else { 351 n = read(evt[i].ident, buf, recv_buflen); 352 if (n <= 0) { 353 if (n == 0 || errno != EAGAIN) 354 close(evt[i].ident); 355 } 356 } 357 } 358 } 359 360 /* NEVER REACHED */ 361 return NULL; 362 } 363