1 #include <sys/types.h> 2 #include <sys/event.h> 3 #include <sys/ioctl.h> 4 #include <sys/socket.h> 5 #include <sys/sysctl.h> 6 7 #include <arpa/inet.h> 8 #include <netinet/in.h> 9 10 #include <err.h> 11 #include <errno.h> 12 #include <pthread.h> 13 #include <pthread_np.h> 14 #include <signal.h> 15 #include <stdio.h> 16 #include <stdint.h> 17 #include <stdlib.h> 18 #include <string.h> 19 #include <unistd.h> 20 21 #include "kq_sendrecv_proto.h" 22 23 #define RECV_EVENT_MAX 64 24 #define RECV_BUFLEN (128 * 1024) 25 26 struct recv_thrctx { 27 int t_id; 28 struct sockaddr_in t_in; 29 30 pthread_mutex_t t_lock; 31 pthread_cond_t t_cond; 32 33 pthread_t t_tid; 34 }; 35 36 static void *recv_thread(void *); 37 38 static void 39 usage(const char *cmd) 40 { 41 fprintf(stderr, "%s [-4 addr4] [-p port] [-t nthreads] [-D]\n", cmd); 42 exit(2); 43 } 44 45 int 46 main(int argc, char *argv[]) 47 { 48 struct recv_thrctx *ctx_arr; 49 struct recv_info *info; 50 struct sockaddr_in in; 51 sigset_t sigset; 52 int opt, s, on, nthr, i, info_sz, do_daemon; 53 size_t sz; 54 55 sigemptyset(&sigset); 56 sigaddset(&sigset, SIGPIPE); 57 if (sigprocmask(SIG_BLOCK, &sigset, NULL) < 0) 58 err(1, "sigprocmask failed"); 59 60 sz = sizeof(nthr); 61 if (sysctlbyname("hw.ncpu", &nthr, &sz, NULL, 0) < 0) 62 err(1, "sysctl hw.ncpu failed"); 63 64 memset(&in, 0, sizeof(in)); 65 in.sin_family = AF_INET; 66 in.sin_addr.s_addr = htonl(INADDR_ANY); 67 in.sin_port = htons(RECV_PORT); 68 69 do_daemon = 1; 70 71 while ((opt = getopt(argc, argv, "4:Dp:t:")) != -1) { 72 switch (opt) { 73 case '4': 74 if (inet_pton(AF_INET, optarg, &in.sin_addr) <= 0) 75 errx(1, "inet_pton failed %s", optarg); 76 break; 77 78 case 'D': 79 do_daemon = 0; 80 break; 81 82 case 'p': 83 in.sin_port = htons(strtoul(optarg, NULL, 10)); 84 break; 85 86 case 't': 87 nthr = strtol(optarg, NULL, 10); 88 if (nthr <= 0) 89 errx(1, "invalid -t"); 90 break; 91 92 default: 93 usage(argv[0]); 94 } 95 } 96 97 s = socket(AF_INET, SOCK_STREAM, 0); 98 if (s < 0) 99 err(1, "socket failed"); 100 101 on = 1; 102 if (setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) < 0) 103 err(1, "setsockopt(REUSEPADDR) failed"); 104 105 if (bind(s, (const struct sockaddr *)&in, sizeof(in)) < 0) 106 err(1, "bind failed"); 107 108 if (listen(s, -1) < 0) 109 err(1, "listen failed"); 110 111 ctx_arr = calloc(nthr, sizeof(struct recv_thrctx)); 112 if (ctx_arr == NULL) 113 err(1, "calloc failed"); 114 115 info_sz = __offsetof(struct recv_info, dport[nthr]); 116 info = calloc(1, info_sz); 117 if (info == NULL) 118 err(1, "calloc failed"); 119 info->ndport = nthr; 120 121 if (do_daemon) 122 daemon(0, 0); 123 124 pthread_set_name_np(pthread_self(), "main"); 125 126 for (i = 0; i < nthr; ++i) { 127 struct recv_thrctx *ctx = &ctx_arr[i]; 128 int error; 129 130 ctx->t_in = in; 131 ctx->t_in.sin_port = 0; 132 133 ctx->t_id = i; 134 pthread_mutex_init(&ctx->t_lock, NULL); 135 pthread_cond_init(&ctx->t_cond, NULL); 136 137 /* Start receiver */ 138 error = pthread_create(&ctx->t_tid, NULL, recv_thread, ctx); 139 if (error) 140 errc(1, error, "pthread_create %d failed", i); 141 142 /* 143 * Wait for the receiver to select a proper data port 144 * and start a listen socket on the data port. 145 */ 146 pthread_mutex_lock(&ctx->t_lock); 147 while (ctx->t_in.sin_port == 0) 148 pthread_cond_wait(&ctx->t_cond, &ctx->t_lock); 149 pthread_mutex_unlock(&ctx->t_lock); 150 151 info->dport[i] = ctx->t_in.sin_port; 152 } 153 154 /* 155 * Send information, e.g. data ports, back to the clients. 156 */ 157 for (;;) { 158 int s1; 159 160 s1 = accept(s, NULL, NULL); 161 if (s1 < 0) 162 continue; 163 write(s1, info, info_sz); 164 close(s1); 165 } 166 167 /* NEVER REACHED */ 168 exit(0); 169 } 170 171 static void * 172 recv_thread(void *xctx) 173 { 174 struct recv_thrctx *ctx = xctx; 175 struct kevent change_evt0[RECV_EVENT_MAX]; 176 struct conn_ack ack; 177 uint8_t *buf; 178 char name[32]; 179 u_short port; 180 int s, kq, nchange; 181 182 /* 183 * Select a proper data port and create a listen socket on it. 184 */ 185 port = RECV_PORT + ctx->t_id; 186 for (;;) { 187 struct sockaddr_in in = ctx->t_in; 188 int on; 189 190 ++port; 191 if (port < RECV_PORT) 192 errx(1, "failed to find a data port"); 193 in.sin_port = htons(port); 194 195 s = socket(AF_INET, SOCK_STREAM, 0); 196 if (s < 0) 197 err(1, "socket failed"); 198 199 on = 1; 200 if (setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on))) 201 err(1, "setsockopt(REUSEADDR) failed"); 202 203 on = 1; 204 if (ioctl(s, FIONBIO, &on, sizeof(on)) < 0) 205 err(1, "ioctl(FIONBIO) failed"); 206 207 if (bind(s, (const struct sockaddr *)&in, sizeof(in)) < 0) { 208 close(s); 209 continue; 210 } 211 212 if (listen(s, -1) < 0) 213 err(1, "listen failed"); 214 215 break; 216 } 217 218 kq = kqueue(); 219 if (kq < 0) 220 err(1, "kqueue failed"); 221 222 buf = malloc(RECV_BUFLEN); 223 if (buf == NULL) 224 err(1, "malloc %d failed", RECV_BUFLEN); 225 226 memset(&ack, 0, sizeof(ack)); 227 228 snprintf(name, sizeof(name), "rcv%d %d", ctx->t_id, port); 229 pthread_set_name_np(pthread_self(), name); 230 231 /* 232 * Inform the main thread that we are ready. 233 */ 234 pthread_mutex_lock(&ctx->t_lock); 235 ctx->t_in.sin_port = htons(port); 236 pthread_mutex_unlock(&ctx->t_lock); 237 pthread_cond_signal(&ctx->t_cond); 238 239 EV_SET(&change_evt0[0], s, EVFILT_READ, EV_ADD, 0, 0, NULL); 240 nchange = 1; 241 242 for (;;) { 243 const struct kevent *change_evt = NULL; 244 struct kevent evt[RECV_EVENT_MAX]; 245 int nevt, i; 246 247 if (nchange > 0) 248 change_evt = change_evt0; 249 250 nevt = kevent(kq, change_evt, nchange, evt, RECV_EVENT_MAX, 251 NULL); 252 if (nevt < 0) 253 err(1, "kevent failed"); 254 nchange = 0; 255 256 for (i = 0; i < nevt; ++i) { 257 int n; 258 259 if (evt[i].ident == (u_int)s) { 260 while (nchange < RECV_EVENT_MAX) { 261 int s1; 262 263 s1 = accept(s, NULL, NULL); 264 if (s1 < 0) 265 break; 266 267 /* TODO: keepalive */ 268 269 n = write(s1, &ack, sizeof(ack)); 270 if (n != sizeof(ack)) { 271 close(s1); 272 continue; 273 } 274 275 EV_SET(&change_evt0[nchange], s1, 276 EVFILT_READ, EV_ADD, 0, 0, NULL); 277 ++nchange; 278 } 279 } else { 280 n = read(evt[i].ident, buf, RECV_BUFLEN); 281 if (n <= 0) { 282 if (n == 0 || errno != EAGAIN) 283 close(evt[i].ident); 284 } 285 } 286 } 287 } 288 289 /* NEVER REACHED */ 290 return NULL; 291 } 292