1 /*
2 * aprsc
3 *
4 * (c) Heikki Hannikainen, OH7LZB <hessu@hes.iki.fi>
5 *
6 * This program is licensed under the BSD license, which can be found
7 * in the file LICENSE.
8 */
9
10 /*
11 * accept.c: the connection accepting thread
12 */
13
14 #include <stdio.h>
15 #include <errno.h>
16 #include <string.h>
17 #include <signal.h>
18 #include <time.h>
19 #include <stdlib.h>
20 #include <unistd.h>
21 #include <fcntl.h>
22 #include <sys/types.h>
23 #include <sys/poll.h>
24 #include <sys/socket.h>
25 #include <netinet/in.h>
26 #include <netinet/tcp.h>
27 #include <arpa/inet.h>
28 #include <netdb.h>
29
30 #include "accept.h"
31 #include "config.h"
32 #include "cfgfile.h"
33 #include "hlog.h"
34 #include "hmalloc.h"
35 #include "netlib.h"
36 #include "worker.h"
37 #include "dupecheck.h"
38 #include "filter.h"
39 #include "login.h"
40 #include "http.h"
41 #include "incoming.h" /* incoming_handler prototype */
42 #include "uplink.h"
43 #include "status.h"
44 #include "clientlist.h"
45 #include "client_heard.h"
46 #include "keyhash.h"
47 #include "ssl.h"
48 #include "sctp.h"
49
50 #ifdef USE_SCTP
51 #include <netinet/sctp.h>
52 #endif
53
/* Head of the linked list of listeners currently open in this process. */
static struct listen_t *listen_list = NULL;

/* Control flags for the accept thread (presumably set by other threads; confirm against callers). */
int accept_shutting_down = 0;
int accept_reconfiguring = 0;

/*
 * Tick value after which a reconfiguration should be attempted again;
 * open_listener() sets this when binding a listener fails.
 */
time_t accept_reconfigure_after_tick = 0;

/* pseudoworker + pseudoclient for incoming UDP packets */
struct worker_t *udp_worker = NULL;
struct client_t *udp_pseudoclient = NULL;
65
66 /* structure allocator/free */
67
listener_alloc(void)68 static struct listen_t *listener_alloc(void)
69 {
70 struct listen_t *l = hmalloc(sizeof(*l));
71 memset( l, 0, sizeof(*l) );
72 l->fd = -1;
73 l->id = -1;
74 l->listener_id = -1;
75
76 return l;
77 }
78
listener_free(struct listen_t * l)79 static void listener_free(struct listen_t *l)
80 {
81 int i;
82
83 hlog(LOG_DEBUG, "Freeing listener %d '%s': %s", l->id, l->name, l->addr_s);
84
85 if (l->udp) {
86 l->fd = -1;
87
88 client_udp_free(l->udp);
89 }
90
91 port_accounter_drop(l->portaccount); /* The last reference, perhaps. */
92
93 if (l->fd >= 0) close(l->fd);
94 if (l->addr_s) hfree(l->addr_s);
95 if (l->name) hfree(l->name);
96
97 for (i = 0; i < (sizeof(l->filters)/sizeof(l->filters[0])); ++i)
98 if (l->filters[i])
99 hfree(l->filters[i]);
100
101 if (l->filter_s)
102 hfree(l->filter_s);
103
104 if (l->acl)
105 acl_free(l->acl);
106
107 /* merge listener list around this node */
108 if (l->next)
109 l->next->prevp = l->prevp;
110 if (l->prevp)
111 *(l->prevp) = l->next;
112
113 hfree(l);
114 }
115
116 /*
117 * Copy / duplicate filters from listener config to an actual listener
118 * Free old filters if any are set.
119 */
120
listener_copy_filters(struct listen_t * l,struct listen_config_t * lc)121 static void listener_copy_filters(struct listen_t *l, struct listen_config_t *lc)
122 {
123 int i;
124 char filter_s[FILTER_S_SIZE] = "";
125 int filter_s_l = 0;
126
127 for (i = 0; i < (sizeof(l->filters)/sizeof(l->filters[0])); ++i) {
128 if (l->filters[i]) {
129 hfree(l->filters[i]);
130 l->filters[i] = NULL;
131 }
132
133 if (i < (sizeof(lc->filters)/sizeof(lc->filters[0]))) {
134 if (!lc->filters[i])
135 continue;
136
137 l->filters[i] = hstrdup(lc->filters[i]);
138
139 int len = strlen(l->filters[i]);
140 if (filter_s_l + len + 2 < FILTER_S_SIZE) {
141 if (filter_s_l)
142 filter_s[filter_s_l++] = ' ';
143
144 memcpy(filter_s + filter_s_l, l->filters[i], len);
145 filter_s_l += len;
146 filter_s[filter_s_l] = 0;
147 }
148 }
149 }
150
151 if (l->filter_s) {
152 hfree(l->filter_s);
153 l->filter_s = NULL;
154 }
155
156 if (filter_s_l == 0)
157 return;
158
159 sanitize_ascii_string(filter_s);
160 l->filter_s = hstrdup(filter_s);
161 }
162
163 /*
164 * Open the TCP/SCTP listening socket
165 */
166
open_tcp_listener(struct listen_t * l,const struct addrinfo * ai,char * which)167 static int open_tcp_listener(struct listen_t *l, const struct addrinfo *ai, char *which)
168 {
169 int arg;
170 int f;
171
172 hlog(LOG_INFO, "Binding listening %s socket: %s", which, l->addr_s);
173
174 if ((f = socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol)) < 0) {
175 hlog(LOG_CRIT, "socket(): %s\n", strerror(errno));
176 return -1;
177 }
178
179 arg = 1;
180 if (setsockopt(f, SOL_SOCKET, SO_REUSEADDR, (char *)&arg, sizeof(arg)) == -1)
181 hlog(LOG_ERR, "setsockopt(%s, SO_REUSEADDR) failed for listener: %s", l->addr_s, strerror(errno));
182 #ifdef SO_REUSEPORT
183 if (setsockopt(f, SOL_SOCKET, SO_REUSEPORT, (char *)&arg, sizeof(arg)) == -1)
184 hlog(LOG_ERR, "setsockopt(%s, SO_REUSEPORT) failed for listener: %s", l->addr_s, strerror(errno));
185 #endif
186
187 if (bind(f, ai->ai_addr, ai->ai_addrlen)) {
188 hlog(LOG_CRIT, "bind(%s): %s", l->addr_s, strerror(errno));
189 close(f);
190 return -1;
191 }
192
193 if (listen(f, SOMAXCONN)) {
194 hlog(LOG_CRIT, "listen(%s) failed: %s", l->addr_s, strerror(errno));
195 close(f);
196 return -1;
197 }
198
199 l->fd = f;
200
201 return f;
202 }
203
204 /*
205 * Open the UDP receiving socket
206 */
207
open_udp_listener(struct listen_t * l,const struct addrinfo * ai)208 static int open_udp_listener(struct listen_t *l, const struct addrinfo *ai)
209 {
210 int arg;
211 int fd;
212 struct client_udp_t *c;
213 union sockaddr_u sa; /* large enough for also IPv6 address */
214
215 hlog(LOG_INFO, "Binding listening UDP socket: %s", l->addr_s);
216
217 if ((fd = socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol)) < 0) {
218 hlog(LOG_CRIT, "socket(): %s\n", strerror(errno));
219 return -1;
220 }
221
222 arg = 1;
223 if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (char *)&arg, sizeof(arg)) == -1) {
224 hlog(LOG_ERR, "setsockopt(%s, SO_REUSEADDR) failed: %s", l->addr_s, strerror(errno));
225 }
226 #ifdef SO_REUSEPORT
227 if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, (char *)&arg, sizeof(arg)) == -1) {
228 hlog(LOG_ERR, "setsockopt(%s, SO_REUSEPORT) failed: %s", l->addr_s, strerror(errno));
229 }
230 #endif
231
232 memcpy( &sa, ai->ai_addr, ai->ai_addrlen );
233
234 if (bind(fd, ai->ai_addr, ai->ai_addrlen)) {
235 hlog(LOG_CRIT, "bind(%s): %s", l->addr_s, strerror(errno));
236 close(fd);
237 return -1;
238 }
239
240 c = client_udp_alloc((l->corepeer) ? &udppeers : &udpclients, fd, l->portnum);
241 c->af = ai->ai_family;
242 c->portaccount = l->portaccount;
243
244 if (1) {
245 int len, arg;
246 /* Set bigger socket buffer sizes for the UDP port..
247 * The current settings are quite large just to accommodate
248 * load testing at high packet rates without packet loss.
249 */
250 len = sizeof(arg);
251 arg = 64*1024; /* 20 Kbytes is good for about 5 seconds of APRS-IS full feed */
252 if (setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &arg, len)) {
253 hlog(LOG_ERR, "UDP listener setup: setsockopt(SO_RCVBUF, %d) failed: %s", arg, strerror(errno));
254 }
255 arg = 128*1024; /* This one needs to fit packets going to *all* UDP clients */
256 if (setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &arg, len)) {
257 hlog(LOG_ERR, "UDP listener setup: setsockopt(SO_SNDBUF, %d) failed: %s", arg, strerror(errno));
258 }
259 }
260
261 /* set non-blocking mode */
262 if (fcntl(c->fd, F_SETFL, O_NONBLOCK) == -1) {
263 /* it really shouldn't fail.. and socket usage is "sendto(... MSG_DONTWAIT)",
264 * so it doesn't really even matter if it fails. */
265 hlog(LOG_ERR, "UDP listener setup: fcntl(F_SETFL, O_NONBLOCK) failed: %s", strerror(errno));
266 }
267
268 l->udp = c;
269 /* l->fd = fd -- except that close on it will kill working client setups.. */
270
271 return fd;
272 }
273
open_listener(struct listen_config_t * lc)274 static int open_listener(struct listen_config_t *lc)
275 {
276 struct listen_t *l;
277 int i;
278
279 l = listener_alloc();
280 l->id = lc->id;
281 l->hidden = lc->hidden;
282 l->corepeer = lc->corepeer;
283 l->client_flags = lc->client_flags;
284 l->clients_max = lc->clients_max;
285
286 l->portaccount = port_accounter_alloc();
287
288 /* Pick first of the AIs for this listen definition */
289 l->addr_s = strsockaddr( lc->ai->ai_addr, lc->ai->ai_addrlen );
290 l->name = hstrdup(lc->name);
291 l->portnum = lc->portnum;
292 l->ai_protocol = lc->ai->ai_protocol;
293 l->listener_id = keyhash(l->addr_s, strlen(l->addr_s), 0);
294 l->listener_id = keyhash(&lc->ai->ai_socktype, sizeof(lc->ai->ai_socktype), l->listener_id);
295 l->listener_id = keyhash(&lc->ai->ai_protocol, sizeof(lc->ai->ai_protocol), l->listener_id);
296 hlog(LOG_DEBUG, "Opening listener %d/%d '%s': %s", lc->id, l->listener_id, lc->name, l->addr_s);
297
298 if (lc->ai->ai_socktype == SOCK_DGRAM &&
299 lc->ai->ai_protocol == IPPROTO_UDP) {
300 /* UDP listening is not quite same as TCP listening.. */
301 i = open_udp_listener(l, lc->ai);
302 } else if (lc->ai->ai_socktype == SOCK_STREAM && lc->ai->ai_protocol == IPPROTO_TCP) {
303 /* TCP listening... */
304 i = open_tcp_listener(l, lc->ai, "TCP");
305 #ifdef USE_SCTP
306 } else if (lc->ai->ai_socktype == SOCK_STREAM &&
307 lc->ai->ai_protocol == IPPROTO_SCTP) {
308 i = open_tcp_listener(l, lc->ai, "SCTP");
309 if (i >= 0)
310 i = sctp_set_listen_params(l);
311 #endif
312 } else {
313 hlog(LOG_ERR, "Unsupported listener protocol for '%s'", l->name);
314 listener_free(l);
315 return -1;
316 }
317
318 if (i < 0) {
319 hlog(LOG_DEBUG, "... failed");
320 listener_free(l);
321 /* trigger reconfiguration after 30 seconds; probably an IP
322 * address that we tried to bind was not yet configured and
323 * it'll appear later in the boot process
324 */
325 accept_reconfigure_after_tick = tick + 30;
326 return -1;
327 }
328
329 hlog(LOG_DEBUG, "... ok, bound");
330
331 /* Set up an SSL context if necessary */
332 #ifdef USE_SSL
333 if (lc->keyfile && lc->certfile) {
334 l->ssl = ssl_alloc();
335
336 if (ssl_create(l->ssl, (void *)l)) {
337 hlog(LOG_ERR, "Failed to create SSL context for '%s*': %s", lc->name, l->addr_s);
338 listener_free(l);
339 return -1;
340 }
341
342 if (ssl_certificate(l->ssl, lc->certfile, lc->keyfile)) {
343 hlog(LOG_ERR, "Failed to load SSL key and certificates for '%s*': %s", lc->name, l->addr_s);
344 listener_free(l);
345 return -1;
346 }
347
348 /* optional client cert validation */
349 if (lc->cafile) {
350 if (ssl_ca_certificate(l->ssl, lc->cafile, 2)) {
351 hlog(LOG_ERR, "Failed to load trusted SSL CA certificates for '%s*': %s", lc->name, l->addr_s);
352 listener_free(l);
353 return -1;
354 }
355 }
356
357 hlog(LOG_INFO, "SSL initialized for '%s': %s%s", lc->name, l->addr_s, (lc->cafile) ? " (client validation enabled)" : "");
358 }
359 #endif
360
361 /* Copy access lists */
362 if (lc->acl)
363 l->acl = acl_dup(lc->acl);
364
365 /* Copy filter definitions */
366 listener_copy_filters(l, lc);
367
368 hlog(LOG_DEBUG, "... adding %s to listened sockets", l->addr_s);
369 // put (first) in the list of listening sockets
370 l->next = listen_list;
371 l->prevp = &listen_list;
372 if (listen_list)
373 listen_list->prevp = &l->next;
374 listen_list = l;
375
376 return 0;
377 }
378
find_listener_random_id(int id)379 static struct listen_t *find_listener_random_id(int id)
380 {
381 struct listen_t *l = listen_list;
382
383 while (l) {
384 if (l->id == id)
385 return l;
386 l = l->next;
387 }
388
389 return NULL;
390 }
391
find_listener_hash_id(int id)392 static struct listen_t *find_listener_hash_id(int id)
393 {
394 struct listen_t *l = listen_list;
395
396 while (l) {
397 if (l->listener_id == id)
398 return l;
399 l = l->next;
400 }
401
402 return NULL;
403 }
404
rescan_client_acls(void)405 static int rescan_client_acls(void)
406 {
407 struct worker_t *w = worker_threads;
408 struct client_t *c;
409 struct listen_t *l;
410 int pe;
411
412 hlog(LOG_DEBUG, "Scanning old clients against new ACLs and listeners");
413
414 while (w) {
415 if ((pe = pthread_mutex_lock(&w->clients_mutex))) {
416 hlog(LOG_ERR, "rescan_client_acls(worker %d): could not lock clients_mutex: %s", w->id, strerror(pe));
417 return -1;
418 }
419
420 for (c = w->clients; (c); c = c->next) {
421 /* do not disconnect uplinks at this point */
422 if (!(c->flags & CLFLAGS_INPORT))
423 continue;
424
425 l = find_listener_hash_id(c->listener_id);
426 if (!l) {
427 /* listener is not there any more */
428 hlog(LOG_INFO, "%s - Closing client on fd %d from %s (listener has been removed)", c->addr_loc, c->fd, c->addr_rem);
429 shutdown(c->fd, SHUT_RDWR);
430 continue;
431 }
432
433 /* is there an acl? */
434 if (!l->acl)
435 continue;
436
437 /* there is, check */
438 if (!acl_check(l->acl, (struct sockaddr *)&c->addr, sizeof(c->addr))) {
439 hlog(LOG_INFO, "%s - Denying client on fd %d from %s (new ACL)", c->addr_loc, c->fd, c->addr_rem);
440 shutdown(c->fd, SHUT_RDWR);
441 continue;
442 }
443 }
444
445 if ((pe = pthread_mutex_unlock(&w->clients_mutex))) {
446 hlog(LOG_ERR, "rescan_client_acls(worker %d): could not unlock clients_mutex: %s", w->id, strerror(pe));
447 /* we'd going to deadlock here... */
448 exit(1);
449 }
450
451 w = w->next;
452 }
453
454
455 return 0;
456 }
457
458
listener_update_config(struct listen_t * l,struct listen_config_t * lc)459 static void listener_update_config(struct listen_t *l, struct listen_config_t *lc)
460 {
461 hlog(LOG_DEBUG, "listener_update_config: %d '%s': %s:%d", lc->id, lc->name, lc->host, lc->portnum);
462
463 /* basic flags which can be changed on the fly */
464 l->clients_max = lc->clients_max; /* could drop clients when decreasing maxclients (done in worker) */
465 l->hidden = lc->hidden; /* could mark old clients on port hidden, too - needs to be done in worker */
466 l->client_flags = lc->client_flags; /* this one must not change old clients */
467
468 /* Filters */
469 listener_copy_filters(l, lc);
470
471 /* ACLs */
472 if (l->acl) {
473 acl_free(l->acl);
474 l->acl = NULL;
475 }
476 if (lc->acl) {
477 l->acl = acl_dup(lc->acl);
478 }
479
480 /* Listen address change? Rebind? */
481 }
482
open_missing_listeners(void)483 static int open_missing_listeners(void)
484 {
485 struct listen_config_t *lc;
486 struct listen_t *l;
487 int opened = 0;
488
489 for (lc = listen_config; (lc); lc = lc->next) {
490 if ((l = find_listener_random_id(lc->id))) {
491 hlog(LOG_DEBUG, "open_missing_listeners: already listening %d '%s': %s:%d", lc->id, lc->name, lc->host, lc->portnum);
492 listener_update_config(l, lc);
493 continue;
494 }
495
496 if (open_listener(lc) == 0)
497 opened++;
498 }
499
500 return opened;
501 }
502
close_removed_listeners(void)503 static int close_removed_listeners(void)
504 {
505 int closed = 0;
506
507 hlog(LOG_DEBUG, "Closing removed listening sockets....");
508 struct listen_t *l, *next;
509 next = listen_list;
510 while (next) {
511 l = next;
512 next = l->next;
513
514 struct listen_config_t *lc = find_listen_config_id(listen_config, l->id);
515 if (!lc) {
516 hlog(LOG_INFO, "Listener %d (%s) no longer in configuration, closing port....",
517 l->id, l->addr_s);
518 listener_free(l);
519 closed++;
520 }
521 }
522
523 return closed;
524 }
525
close_listeners(void)526 static void close_listeners(void)
527 {
528 if (!listen_list)
529 return;
530
531 hlog(LOG_DEBUG, "Closing listening sockets....");
532 while (listen_list)
533 listener_free(listen_list);
534 }
535
536 /*
537 * Generate UDP peer "clients"
538 */
539
peerip_clients_config(void)540 static void peerip_clients_config(void)
541 {
542 struct client_t *c;
543 struct peerip_config_t *pe;
544 struct client_udp_t *udpclient;
545 char *s;
546 union sockaddr_u sa; /* large enough for also IPv6 address */
547 socklen_t addr_len = sizeof(sa);
548
549 for (pe = peerip_config; (pe); pe = pe->next) {
550 hlog(LOG_DEBUG, "Setting up UDP peer %s (%s)", pe->name, pe->host);
551 udpclient = client_udp_find(udppeers, pe->af, pe->local_port);
552
553 if (!udpclient) {
554 hlog(LOG_ERR, "Failed to find UDP socket on port %d for peer %s (%s)", pe->local_port, pe->name, pe->host);
555 continue;
556 }
557
558 c = client_alloc();
559 if (!c) {
560 hlog(LOG_ERR, "peerip_clients_config: client_alloc returned NULL");
561 abort();
562 }
563 c->fd = -1; // Right, this client will never have a socket of it's own.
564 c->ai_protocol = IPPROTO_UDP;
565 c->portnum = pe->local_port; // local port
566 c->state = CSTATE_COREPEER;
567 c->validated = VALIDATED_WEAK;
568 c->flags = CLFLAGS_UPLINKPORT;
569 c->handler_line_in = &incoming_handler;
570 memcpy((void *)&c->udpaddr.sa, (void *)pe->ai->ai_addr, pe->ai->ai_addrlen);
571 c->udpaddrlen = pe->ai->ai_addrlen;
572 c->udp_port = pe->remote_port; // remote port
573 c->addr = c->udpaddr;
574 c->udpclient = udpclient;
575 //c->portaccount = l->portaccount;
576 c->keepalive = tick + keepalive_interval;
577 c->last_read = tick; /* not simulated time */
578
579 inbound_connects_account(3, c->udpclient->portaccount); /* "3" = udp, not listening.. */
580
581 /* set up peer serverid to username */
582 strncpy(c->username, pe->serverid, sizeof(c->username));
583 c->username[sizeof(c->username)-1] = 0;
584 c->username_len = strlen(c->username);
585
586 /* convert client address to string */
587 s = strsockaddr( &c->udpaddr.sa, c->udpaddrlen );
588
589 /* text format of client's IP address + port */
590 strncpy(c->addr_rem, s, sizeof(c->addr_rem));
591 c->addr_rem[sizeof(c->addr_rem)-1] = 0;
592 hfree(s);
593
594 /* hex format of client's IP address + port */
595 s = hexsockaddr( &c->udpaddr.sa, c->udpaddrlen );
596
597 strncpy(c->addr_hex, s, sizeof(c->addr_hex));
598 c->addr_hex[sizeof(c->addr_hex)-1] = 0;
599 hfree(s);
600
601 /* text format of servers' connected IP address + port */
602 addr_len = sizeof(sa);
603 if (getsockname(c->udpclient->fd, &sa.sa, &addr_len) == 0) { /* Fails very rarely.. */
604 /* present my socket end address as a malloced string... */
605 s = strsockaddr( &sa.sa, addr_len );
606 } else {
607 hlog(LOG_ERR, "Peer config: getsockname on udpclient->fd failed: %s", strerror(errno));
608 s = hstrdup( "um" ); /* Server's bound IP address.. TODO: what? */
609 }
610 strncpy(c->addr_loc, s, sizeof(c->addr_loc));
611 c->addr_loc[sizeof(c->addr_loc)-1] = 0;
612 hfree(s);
613
614 /* pass the client to the first worker thread */
615 if (pass_client_to_worker(worker_threads, c)) {
616 hlog(LOG_ERR, "Failed to pass UDP peer %s (%s) to worker", pe->name, pe->host);
617 client_free(c);
618 }
619 }
620 }
621
622 /*
623 * Close and free UDP peer "clients"
624 */
625
peerip_clients_close(void)626 static void peerip_clients_close(void)
627 {
628 struct client_t *c;
629
630 c = client_alloc();
631 if (!c) {
632 hlog(LOG_ERR, "peerip_clients_close: client_alloc returned NULL");
633 abort();
634 }
635
636 c->fd = -2; // Magic FD to close them all
637 c->state = CSTATE_COREPEER;
638 sprintf(c->filter_s, "peerip_clients_close"); // debugging
639
640 if (pass_client_to_worker(worker_threads, c)) {
641 hlog(LOG_ERR, "Failed to pass magic peerip_clients_close message pseudoclient to worker");
642 client_free(c);
643 }
644 }
645
646 /*
647 * Process an incoming UDP packet submission
648 */
649
accept_process_udpsubmit(struct listen_t * l,char * buf,int len,char * remote_host)650 static void accept_process_udpsubmit(struct listen_t *l, char *buf, int len, char *remote_host)
651 {
652 int packet_len;
653 char *login_string = NULL;
654 char *packet = NULL;
655 char *username = NULL;
656 char validated;
657 int e;
658
659 //hlog(LOG_DEBUG, "got udp submit: %.*s", len, buf);
660
661 packet_len = loginpost_split(buf, len, &login_string, &packet);
662 if (packet_len == -1) {
663 hlog(LOG_DEBUG, "UDP submit [%s]: No newline (LF) found in data", remote_host);
664 return;
665 }
666
667 if (!login_string) {
668 hlog(LOG_DEBUG, "UDP submit [%s]: No login string in data", remote_host);
669 return;
670 }
671
672 if (!packet) {
673 hlog(LOG_DEBUG, "UDP submit [%s]: No packet data found in data", remote_host);
674 return;
675 }
676
677 hlog(LOG_DEBUG, "UDP submit [%s]: login string: %s", remote_host, login_string);
678 hlog(LOG_DEBUG, "UDP submit [%s]: packet: %s", remote_host, packet);
679
680 /* process the login string */
681 validated = http_udp_upload_login(remote_host, login_string, &username, "UDP submit");
682 if (validated < 0) {
683 hlog(LOG_DEBUG, "UDP submit [%s]: Invalid login string", remote_host);
684 return;
685 }
686
687 if (validated != 1) {
688 hlog(LOG_DEBUG, "UDP submit [%s]: Invalid passcode for user %s", remote_host, username);
689 return;
690 }
691
692 /* packet size limits */
693 if (packet_len < PACKETLEN_MIN) {
694 hlog(LOG_DEBUG, "UDP submit [%s]: Packet too short: %d bytes", remote_host, packet_len);
695 return;
696 }
697
698 if (packet_len > PACKETLEN_MAX-2) {
699 hlog(LOG_DEBUG, "UDP submit [%s]: Packet too long: %d bytes", remote_host, packet_len);
700 return;
701 }
702
703 udp_pseudoclient->portaccount = l->portaccount;
704 e = pseudoclient_push_packet(udp_worker, udp_pseudoclient, username, packet, packet_len);
705 clientaccount_add(udp_pseudoclient, IPPROTO_UDP, len, 1, 0, 0, (e < 0) ? e : 0, 0);
706 udp_pseudoclient->portaccount = NULL;
707
708 if (e < 0)
709 hlog(LOG_DEBUG, "UDP submit [%s]: Incoming packet parse failure code %d: %s", remote_host, e, packet);
710 else
711 hlog(LOG_DEBUG, "UDP submit [%s]: Incoming packet parsed, code %d: %s", remote_host, e, packet);
712 }
713
714 /*
715 * Receive UDP packets from an UDP listener
716 */
717
accept_udp_recv(struct listen_t * l)718 static void accept_udp_recv(struct listen_t *l)
719 {
720 union sockaddr_u addr;
721 socklen_t addrlen;
722 char buf[2000];
723 int i;
724 char *addrs;
725
726 /* Receive as much as there is -- that is, LOOP... */
727 addrlen = sizeof(addr);
728
729 while ((i = recvfrom( l->udp->fd, buf, sizeof(buf)-1, MSG_DONTWAIT|MSG_TRUNC, (struct sockaddr *)&addr, &addrlen )) >= 0) {
730 if (!(l->client_flags & CLFLAGS_UDPSUBMIT)) {
731 hlog(LOG_DEBUG, "accept thread discarded an UDP packet on a listening socket");
732 continue;
733 }
734
735 addrs = strsockaddr(&addr.sa, addrlen);
736 accept_process_udpsubmit(l, buf, i, addrs);
737 hfree(addrs);
738 }
739 }
740
741 /*
742 * Accept a single client
743 */
744
accept_client_for_listener(struct listen_t * l,int fd,char * addr_s,union sockaddr_u * sa,unsigned addr_len)745 struct client_t *accept_client_for_listener(struct listen_t *l, int fd, char *addr_s, union sockaddr_u *sa, unsigned addr_len)
746 {
747 struct client_t *c;
748 char *s;
749 int i;
750 union sockaddr_u sa_loc; /* local address */
751 socklen_t addr_len_loc = sizeof(sa_loc);
752
753 c = client_alloc();
754 if (!c)
755 return NULL;
756
757 c->fd = fd;
758 c->listener_id = l->listener_id;
759 c->addr = *sa;
760 c->ai_protocol = l->ai_protocol;
761 c->portnum = l->portnum;
762 c->hidden = l->hidden;
763 c->flags = l->client_flags;
764 c->udpclient = client_udp_find(udpclients, sa->sa.sa_family, l->portnum);
765 c->portaccount = l->portaccount;
766 c->last_read = tick; /* not simulated time */
767 inbound_connects_account(1, c->portaccount); /* account all ports + port-specifics */
768
769 /* text format of client's IP address + port */
770 strncpy(c->addr_rem, addr_s, sizeof(c->addr_rem));
771 c->addr_rem[sizeof(c->addr_rem)-1] = 0;
772
773 /* hex format of client's IP address + port */
774 s = hexsockaddr( &sa->sa, addr_len );
775 strncpy(c->addr_hex, s, sizeof(c->addr_hex));
776 c->addr_hex[sizeof(c->addr_hex)-1] = 0;
777 hfree(s);
778
779 /* text format of servers' connected IP address + port */
780 if (getsockname(fd, &sa_loc.sa, &addr_len_loc) == 0) { /* Fails very rarely.. */
781 if (addr_len_loc > sizeof(sa_loc))
782 hlog(LOG_ERR, "accept_client_for_listener: getsockname for client %s truncated local address of %d to %d bytes", c->addr_rem, addr_len_loc, sizeof(sa_loc));
783 /* present my socket end address as a malloced string... */
784 s = strsockaddr( &sa_loc.sa, addr_len_loc );
785 } else {
786 s = hstrdup( l->addr_s ); /* Server's bound IP address */
787 hlog(LOG_ERR, "accept_client_for_listener: getsockname for client %s failed: %s (using '%s' instead)", c->addr_rem, strerror(errno), s);
788 }
789 strncpy(c->addr_loc, s, sizeof(c->addr_loc));
790 c->addr_loc[sizeof(c->addr_loc)-1] = 0;
791 hfree(s);
792
793 /* apply predefined filters */
794 for (i = 0; i < (sizeof(l->filters)/sizeof(l->filters[0])); ++i) {
795 if (l->filters[i]) {
796 if (filter_parse(c, l->filters[i], 0) < 0) { /* system filters */
797 hlog(LOG_ERR, "Bad system filter definition: %s", l->filters[i]);
798 }
799 }
800 }
801 if (l->filter_s) {
802 strncpy(c->filter_s, l->filter_s, sizeof(c->filter_s));
803 c->filter_s[FILTER_S_SIZE-1] = 0;
804 }
805
806 return c;
807 }
808
809 /*
810 * Pick next worker to feed a client to
811 */
812
pick_next_worker(void)813 static struct worker_t *pick_next_worker(void)
814 {
815 static int next_receiving_worker;
816 struct worker_t *w, *wc;
817 int i;
818
819 #if 1
820 /* Use simple round-robin on client feeding. Least clients is
821 * quite attractive idea, but when clients arrive at huge bursts
822 * they tend to move in big bunches, and it takes quite some while
823 * before the worker updates its client-counters.
824 */
825 for (i = 0, w = worker_threads; w ; w = w->next, i++) {
826 if ( i >= next_receiving_worker)
827 break;
828 }
829 wc = w;
830 if (! w) {
831 wc = worker_threads; // ran out of the worker chain, back to the first..
832 next_receiving_worker = 0; // and reset the index too
833 }
834 // in every case, increment the next receiver index for the next call.
835 ++next_receiving_worker;
836
837 #else
838 /* find the worker with least clients...
839 * This isn't strictly accurate, since the threads could change their
840 * client counts during scanning, but we don't really care if the load distribution
841 * is _exactly_ fair.
842 */
843
844 int client_min = -1;
845 for (wc = w = worker_threads; (w); w = w->next)
846 if (w->client_count < client_min || client_min == -1) {
847 wc = w;
848 client_min = w->client_count;
849 }
850 #endif
851 return wc;
852 }
853
854 /*
855 * Accept a single connection
856 */
857
do_accept(struct listen_t * l)858 static void do_accept(struct listen_t *l)
859 {
860 int fd;
861 struct client_t *c;
862 union sockaddr_u sa; /* large enough for also IPv6 address */
863 socklen_t addr_len = sizeof(sa);
864 static time_t last_EMFILE_report;
865 char *s;
866
867 if ((fd = accept(l->fd, (struct sockaddr*)&sa, &addr_len)) < 0) {
868 int e = errno;
869 switch (e) {
870 /* Errors reporting really bad internal (programming) bugs */
871 case EBADF:
872 case EINVAL:
873 #ifdef ENOTSOCK
874 case ENOTSOCK: /* Not a socket */
875 #endif
876 #ifdef EOPNOTSUPP
877 case EOPNOTSUPP: /* Not a SOCK_STREAM */
878 #endif
879 #ifdef ESOCKTNOSUPPORT
880 case ESOCKTNOSUPPORT: /* Linux errors ? */
881 #endif
882 #ifdef EPROTONOSUPPORT
883 case EPROTONOSUPPORT: /* Linux errors ? */
884 #endif
885
886 hlog(LOG_CRIT, "accept() failed: %s (giving up)", strerror(e));
887 exit(1); // ABORT with core-dump ??
888
889 break;
890
891 /* Too many open files -- rate limit the reporting -- every 10th second or so.. */
892 case EMFILE:
893 if (last_EMFILE_report + 10 <= tick) {
894 last_EMFILE_report = tick;
895 hlog(LOG_ERR, "accept() failed: %s (continuing)", strerror(e));
896 }
897 return;
898 /* Errors reporting system internal/external glitches */
899 default:
900 hlog(LOG_ERR, "accept() failed: %s (continuing)", strerror(e));
901 return;
902 }
903 }
904
905 /* convert client address to string */
906 s = strsockaddr( &sa.sa, addr_len );
907
908 /* TODO: the dropped connections here are not accounted. */
909
910 /* Limit amount of connections per port, and globally.
911 * Error messages written just before closing the socet may or may not get
912 * to the user, but at least we try.
913 */
914 if (l->portaccount->gauge >= l->clients_max || inbound_connects.gauge >= maxclients) {
915 if (inbound_connects.gauge >= maxclients) {
916 hlog(LOG_INFO, "%s - Denied client on fd %d from %s: MaxClients reached (%d)", l->addr_s, fd, s, inbound_connects.gauge);
917 /* The "if" is here only to silence a compiler warning
918 * about ignoring the result value. We're really
919 * disconnecting the client right now, so we don't care.
920 */
921 if (write(fd, "# Server full\r\n", 15)) {};
922 } else {
923 hlog(LOG_INFO, "%s - Denied client on fd %d from %s: Too many clients on Listener (%d)", l->addr_s, fd, s, l->portaccount->gauge);
924 if (write(fd, "# Port full\r\n", 13)) {};
925 }
926 close(fd);
927 hfree(s);
928 inbound_connects_account(-1, l->portaccount); /* account rejected connection */
929 return;
930 }
931
932 /* match against acl... could probably have an error message to the client */
933 if (l->acl) {
934 if (!acl_check(l->acl, (struct sockaddr *)&sa, addr_len)) {
935 hlog(LOG_INFO, "%s - Denied client on fd %d from %s (ACL)", l->addr_s, fd, s);
936 close(fd);
937 hfree(s);
938 inbound_connects_account(-1, l->portaccount); /* account rejected connection */
939 return;
940 }
941 }
942
943 c = accept_client_for_listener(l, fd, s, &sa, addr_len);
944 if (!c) {
945 hlog(LOG_ERR, "%s - client_alloc returned NULL, too many clients. Denied client on fd %d from %s", l->addr_s, fd, s);
946 close(fd);
947 hfree(s);
948 inbound_connects_account(-1, l->portaccount); /* account rejected connection */
949 return;
950 }
951 hfree(s);
952
953 c->state = CSTATE_LOGIN;
954 /* use the default login handler */
955 c->handler_line_in = &login_handler;
956 c->keepalive = tick + keepalive_interval;
957
958 #ifdef USE_SSL
959 if (l->ssl) {
960 if (ssl_create_connection(l->ssl, c, 0)) {
961 close(fd);
962 inbound_connects_account(-1, l->portaccount); /* account rejected connection */
963 return;
964 }
965 }
966 #endif
967
968 hlog(LOG_DEBUG, "%s - Accepted client on fd %d from %s", c->addr_loc, c->fd, c->addr_rem);
969
970 /* set client socket options, return -1 on serious errors */
971 if (set_client_sockopt(c) != 0)
972 goto err;
973
974 /* ok, found it... lock the new client queue and pass the client */
975 if (pass_client_to_worker(pick_next_worker(), c))
976 goto err;
977
978 return;
979
980 err:
981
982 inbound_connects_account(0, c->portaccount); /* something failed, remove this from accounts.. */
983 client_free(c);
984 return;
985 }
986
987 /*
988 * Find a listener which this client is connected on
989 */
990
liveupgrade_find_listener(int listener_id)991 struct listen_t *liveupgrade_find_listener(int listener_id)
992 {
993 struct listen_t *l;
994
995 l = listen_list;
996
997 while (l) {
998 if (l->listener_id == listener_id)
999 break;
1000 l = l->next;
1001 }
1002
1003 return l;
1004 }
1005
1006 /*
1007 * Map old rxerrs counter values to new ones based on the string mapping
1008 */
1009
accept_rx_err_map(cJSON * rx_err_labels,int * old_rxerrs_len)1010 static int *accept_rx_err_map(cJSON *rx_err_labels, int *old_rxerrs_len)
1011 {
1012 int *rxerr_map = NULL;
1013 int i, j;
1014
1015 *old_rxerrs_len = cJSON_GetArraySize(rx_err_labels);
1016
1017 if (*old_rxerrs_len > 0) {
1018 rxerr_map = hmalloc(sizeof(*rxerr_map) * *old_rxerrs_len);
1019 for (i = 0; i < *old_rxerrs_len; i++) {
1020 rxerr_map[i] = -1; // default: no mapping
1021
1022 cJSON *rxerr = cJSON_GetArrayItem(rx_err_labels, i);
1023 if (!rxerr || rxerr->type != cJSON_String)
1024 continue;
1025
1026 for (j = 0; j < INERR_BUCKETS; j++) {
1027 if (strcmp(inerr_labels[j], rxerr->valuestring) == 0) {
1028 //hlog(LOG_DEBUG, "Mapped old rxerr index %d with new index %d: %s", i, j, rxerr->valuestring);
1029 rxerr_map[i] = j;
1030 }
1031 }
1032 }
1033 }
1034
1035 return rxerr_map;
1036 }
1037
/*
 *	Load old rx error counter values to a client, using the index
 *	mapping produced by accept_rx_err_map().
 *
 *	Only as many entries as both the map and the value array have are
 *	considered. Unmapped entries, non-numeric values and values which
 *	are not above zero are skipped.
 */
static void accept_rx_err_load(struct client_t *c, cJSON *rx_errs, int *rxerr_map, int rxerr_map_len)
{
	int limit = cJSON_GetArraySize(rx_errs);
	int i;

	if (rxerr_map_len < limit)
		limit = rxerr_map_len;

	for (i = 0; i < limit; i++) {
		int slot = rxerr_map[i];

		if (slot < 0 || slot >= INERR_BUCKETS)
			continue;

		cJSON *val = cJSON_GetArrayItem(rx_errs, i);
		if (val == NULL || val->type != cJSON_Number)
			continue;

		if (val->valuedouble > 0)
			c->localaccount.rxerrs[slot] = val->valuedouble;
	}
}
1051
1052 /*
1053 * Live upgrade: accept old clients
1054 */
1055
/*
 *	Fetch the member 'key' from the JSON object 'tree' and check that
 *	it has the cJSON type 'type'.
 *
 *	Returns the item on success. Returns NULL, after logging an error
 *	which identifies the client by 'logid', when the member is missing
 *	or has a different type.
 */
static cJSON *accept_liveupgrade_cJSON_get(cJSON *tree, const char *key, int type, const char *logid)
{
	cJSON *item = cJSON_GetObjectItem(tree, key);

	if (item == NULL) {
		hlog(LOG_ERR, "Live upgrade: Client '%s' JSON: Field '%s' missing", logid, key);
		return NULL;
	}

	if (item->type == type)
		return item;

	hlog(LOG_ERR, "Live upgrade: Client '%s' JSON: Field '%s' has incorrect type %d, expected %d", logid, key, item->type, type);
	return NULL;
}
1074
/*
 *	Live upgrade: re-create one client from its JSON description.
 *
 *	client:         JSON object describing one old client
 *	rxerr_map:      old-to-new rx error index mapping, may be NULL
 *	rxerr_map_len:  number of entries in rxerr_map
 *
 *	The peer address is read from the inherited file descriptor with
 *	getpeername(), the listener is looked up by its listener_id, and a
 *	client structure is set up with the state, counters, filters and
 *	buffers taken from the JSON. The client is then added to the client
 *	list and handed to a worker.
 *
 *	Returns 0 when the client was passed to a worker, -1 otherwise.
 *	A client with a missing or negative fd is ignored without closing
 *	anything; the other failure paths close the client descriptor.
 */
static int accept_liveupgrade_single(cJSON *client, int *rxerr_map, int rxerr_map_len)
{
	cJSON *fd, *listener_id, *username, *time_connect, *tick_connect;
	cJSON *state;
	cJSON *addr_loc;
	cJSON *udp_port;
	cJSON *app_name, *app_version;
	cJSON *verified;
	cJSON *obuf_q;
	cJSON *bytes_rx, *bytes_tx;
	cJSON *pkts_rx, *pkts_tx, *pkts_ign;
	cJSON *rx_errs;
	cJSON *filter;
	cJSON *ibuf, *obuf;
	cJSON *client_heard;
	cJSON *lat, *lng;
	unsigned addr_len;
	union sockaddr_u sa;
	char *argv[256];
	int i, argc;
	const char *username_s = "unknown";
	
	/* get username first, so we can log it later */
	username = accept_liveupgrade_cJSON_get(client, "username", cJSON_String, username_s);
	if (username)
		username_s = username->valuestring;
	
	fd = accept_liveupgrade_cJSON_get(client, "fd", cJSON_Number, username_s);
	int fd_i = -1;
	if (fd)
		fd_i = fd->valueint;
	
	if (fd_i < 0) {
		hlog(LOG_INFO, "Live upgrade: Client '%s' has negative fd %d, ignoring (corepeer?)", username_s, fd_i);
		return -1;
	}
	
	/* mandatory fields; each getter logs its own error when the field is bad */
	listener_id = accept_liveupgrade_cJSON_get(client, "listener_id", cJSON_Number, username_s);
	state = accept_liveupgrade_cJSON_get(client, "state", cJSON_String, username_s);
	time_connect = accept_liveupgrade_cJSON_get(client, "t_connect", cJSON_Number, username_s);
	addr_loc = accept_liveupgrade_cJSON_get(client, "addr_loc", cJSON_String, username_s);
	app_name = accept_liveupgrade_cJSON_get(client, "app_name", cJSON_String, username_s);
	app_version = accept_liveupgrade_cJSON_get(client, "app_version", cJSON_String, username_s);
	verified = accept_liveupgrade_cJSON_get(client, "verified", cJSON_Number, username_s);
	obuf_q = accept_liveupgrade_cJSON_get(client, "obuf_q", cJSON_Number, username_s);
	bytes_rx = accept_liveupgrade_cJSON_get(client, "bytes_rx", cJSON_Number, username_s);
	bytes_tx = accept_liveupgrade_cJSON_get(client, "bytes_tx", cJSON_Number, username_s);
	pkts_rx = accept_liveupgrade_cJSON_get(client, "pkts_rx", cJSON_Number, username_s);
	pkts_tx = accept_liveupgrade_cJSON_get(client, "pkts_tx", cJSON_Number, username_s);
	pkts_ign = accept_liveupgrade_cJSON_get(client, "pkts_ign", cJSON_Number, username_s);
	rx_errs = accept_liveupgrade_cJSON_get(client, "rx_errs", cJSON_Array, username_s);
	filter = accept_liveupgrade_cJSON_get(client, "filter", cJSON_String, username_s);
	
	/* optional */
	tick_connect = cJSON_GetObjectItem(client, "t_connect_tick");
	udp_port = cJSON_GetObjectItem(client, "udp_port");
	ibuf = cJSON_GetObjectItem(client, "ibuf");
	obuf = cJSON_GetObjectItem(client, "obuf");
	client_heard = cJSON_GetObjectItem(client, "client_heard");
	lat = cJSON_GetObjectItem(client, "lat");
	lng = cJSON_GetObjectItem(client, "lng");
	
	if (!(
		(fd)
		&& (listener_id)
		&& (state)
		&& (username)
		&& (time_connect)
		&& (addr_loc)
		&& (app_name)
		&& (app_version)
		&& (verified)
		&& (obuf_q)
		&& (bytes_rx)
		&& (bytes_tx)
		&& (pkts_rx)
		&& (pkts_tx)
		&& (pkts_ign)
		&& (rx_errs)
		&& (filter)
		)) {
			hlog(LOG_ERR, "Live upgrade: Fields missing from client JSON, discarding client fd %d", fd_i);
			if (fd_i >= 0)
				close(fd_i);
			return -1;
	}
	
	hlog(LOG_DEBUG, "Old client on fd %d: %s", fd->valueint, username->valuestring);
	
	/* fetch peer address from the fd instead of parsing it from text */
	addr_len = sizeof(sa);
	if (getpeername(fd->valueint, &sa.sa, &addr_len) != 0) {
		/* Sometimes clients disconnect during upgrade, especially on slow RPi servers... */
		if (errno == ENOTCONN)
			hlog(LOG_INFO, "Live upgrade: Client %s on fd %d has disconnected during upgrade (%s)",
				username->valuestring, fd->valueint, strerror(errno));
		else
			hlog(LOG_ERR, "Live upgrade: getpeername client fd %d failed: %s", fd->valueint, strerror(errno));
		close(fd->valueint);
		return -1;
	}
	
	/* convert client address to string */
	char *client_addr_s = strsockaddr( &sa.sa, addr_len );
	
	/* find the right listener for this client, for configuration and accounting */
	struct listen_t *l = liveupgrade_find_listener(listener_id->valueint);
	if (!l) {
		hlog(LOG_INFO, "Live upgrade: Listener has been removed for fd %d (%s - local %s): disconnecting %s",
			fd->valueint, client_addr_s, addr_loc->valuestring, username->valuestring);
		close(fd->valueint);
		hfree(client_addr_s);
		return -1;
	}
	
	struct client_t *c = accept_client_for_listener(l, fd->valueint, client_addr_s, &sa, addr_len);
	if (!c) {
		hlog(LOG_ERR, "Live upgrade - client_alloc returned NULL, too many clients. Denied client %s on fd %d from %s",
			username->valuestring, fd->valueint, client_addr_s);
		close(fd->valueint);
		hfree(client_addr_s);
		return -1;
	}
	
	hfree(client_addr_s);
	
	if (strcmp(state->valuestring, "connected") == 0) {
		c->state = CSTATE_CONNECTED;
		c->handler_line_in = &incoming_handler;
		strncpy(c->username, username->valuestring, sizeof(c->username));
		c->username[sizeof(c->username)-1] = 0;
		c->username_len = strlen(c->username);
	} else if (strcmp(state->valuestring, "login") == 0) {
		c->state = CSTATE_LOGIN;
		c->handler_line_in = &login_handler;
	} else {
		/* identify the client (username and its own fd), not the listener
		 * it arrived on: the listener's socket is not the one at fault
		 */
		hlog(LOG_ERR, "Live upgrade: Client %s is in invalid state '%s' (fd %d)", username->valuestring, state->valuestring, fd->valueint);
		goto err;
	}
	/* distribute keepalive intervals for the existing old clients
	 * but send them rather sooner than later */
	// coverity[dont_call]  // squelch warning: not security sensitive use of random(): load distribution
	c->keepalive = tick + (random() % (keepalive_interval/2));
	/* distribute cleanup intervals over the next 2 minutes */
	// coverity[dont_call]  // squelch warning: not security sensitive use of random(): load distribution
	c->cleanup = tick + (random() % 120);
	
	c->connect_time = time_connect->valueint;
	/* live upgrade / backward compatibility: upgrading from <= 1.8.2 requires the 'else' path */
	if (tick_connect && tick_connect->type == cJSON_Number)
		c->connect_tick = tick_connect->valueint;
	else /* convert to monotonic time */
		c->connect_tick = tick - (now - c->connect_time);
	
	c->validated = verified->valueint;
	c->localaccount.rxbytes = bytes_rx->valuedouble;
	c->localaccount.txbytes = bytes_tx->valuedouble;
	c->localaccount.rxpackets = pkts_rx->valuedouble;
	c->localaccount.txpackets = pkts_tx->valuedouble;
	c->localaccount.rxdrops = pkts_ign->valuedouble;
	
	login_set_app_name(c, app_name->valuestring, app_version->valuestring);
	
	// handle client's filter setting
	if (c->flags & CLFLAGS_USERFILTEROK && (filter) && (filter->valuestring) && *(filter->valuestring)) {
		// archive a copy of the filters, for status display
		strncpy(c->filter_s, filter->valuestring, FILTER_S_SIZE);
		c->filter_s[FILTER_S_SIZE-1] = 0;
		sanitize_ascii_string(c->filter_s);
		
		/* parse_args() is handed a private copy of the filter string */
		char *f = hstrdup(filter->valuestring);
		argc = parse_args(argv, f);
		for (i = 0; i < argc; ++i) {
			filter_parse(c, argv[i], 1);
		}
		hfree(f);
	}
	
	// set up UDP downstream if necessary
	if (udp_port && udp_port->type == cJSON_Number && udp_port->valueint > 1024 && udp_port->valueint < 65536) {
		if (login_setup_udp_feed(c, udp_port->valueint) != 0) {
			hlog(LOG_DEBUG, "%s/%s: Requested UDP on client port with no UDP configured", c->addr_rem, c->username);
		}
	}
	
	// fill up ibuf
	if (ibuf && ibuf->type == cJSON_String && ibuf->valuestring) {
		int decoded = hex_decode(c->ibuf, c->ibuf_size, ibuf->valuestring);
		if (decoded < 0) {
			hlog(LOG_ERR, "Live upgrade: %s/%s: Failed to decode ibuf: %s", c->addr_rem, c->username, ibuf->valuestring);
		} else {
			c->ibuf_end = decoded;
			hlog(LOG_DEBUG, "Live upgrade: Decoded ibuf %d bytes: '%.*s'", decoded, decoded, c->ibuf);
			hlog(LOG_DEBUG, "Hex: %s", ibuf->valuestring);
		}
	}
	
	// fill up obuf
	if (obuf && obuf->type == cJSON_String && obuf->valuestring) {
		int decoded = hex_decode(c->obuf, c->obuf_size, obuf->valuestring);
		if (decoded < 0) {
			hlog(LOG_ERR, "Live upgrade: %s/%s: Failed to decode obuf: %s", c->addr_rem, c->username, obuf->valuestring);
		} else {
			c->obuf_start = 0;
			c->obuf_end = decoded;
			hlog(LOG_DEBUG, "Live upgrade: Decoded obuf %d bytes: '%.*s'", decoded, decoded, c->obuf);
			hlog(LOG_DEBUG, "Hex: %s", obuf->valuestring);
		}
	}
	
	/* load list of stations heard by this client, to immediately support
	 * messaging
	 */
	if (client_heard && client_heard->type == cJSON_Array)
		client_heard_json_load(c, client_heard);
	
	/* load rxerrs counters, with error name string mapping to support
	 * adding/reordering of error counters
	 */
	if (rx_errs && rx_errs->type == cJSON_Array && rxerr_map && rxerr_map_len > 0)
		accept_rx_err_load(c, rx_errs, rxerr_map, rxerr_map_len);
	
	/* set client lat/lon, if they're given
	 */
	if (lat && lng && lat->type == cJSON_Number && lng->type == cJSON_Number) {
		c->loc_known = 1;
		c->lat = lat->valuedouble;
		c->lng = lng->valuedouble;
	}
	
	hlog(LOG_DEBUG, "%s - Accepted live upgrade client on fd %d from %s", c->addr_loc, c->fd, c->addr_rem);
	
	/* set client socket options, return -1 on serious errors */
	if (set_client_sockopt(c) != 0)
		goto err;
	
	/* Add the client to the client list. */
	int old_fd = clientlist_add(c);
	if (c->validated && old_fd != -1) {
		/* TODO: If old connection is SSL validated, and this one is not, do not disconnect it. */
		hlog(LOG_INFO, "fd %d: Disconnecting duplicate validated client with username '%s'", old_fd, c->username);
		shutdown(old_fd, SHUT_RDWR);
	}
	
	/* ok, found it... lock the new client queue and pass the client */
	if (pass_client_to_worker(pick_next_worker(), c))
		goto err;
	
	return 0;
	
err:
	close(c->fd);
	inbound_connects_account(0, c->portaccount); /* something failed, remove this from accounts.. */
	client_free(c);
	return -1;
}
1331
accept_liveupgrade_accept(void)1332 static void accept_liveupgrade_accept(void)
1333 {
1334 int clen, i;
1335 int accepted = 0;
1336 int old_rxerrs_len = 0;
1337 int *rxerr_map = NULL;
1338
1339 hlog(LOG_INFO, "Accept: Collecting live upgrade clients...");
1340
1341 /* Create mapping for rx_errs table indexes, so that rxerrs can be
1342 * loaded right even if new ones were added in the middle, or if
1343 * the error counters were reordered
1344 */
1345 cJSON *rx_err_labels = cJSON_GetObjectItem(liveupgrade_status, "rx_errs");
1346 if ((rx_err_labels) && rx_err_labels->type == cJSON_Array)
1347 rxerr_map = accept_rx_err_map(rx_err_labels, &old_rxerrs_len);
1348
1349 cJSON *clients = cJSON_GetObjectItem(liveupgrade_status, "clients");
1350 if (!clients || clients->type != cJSON_Array) {
1351 hlog(LOG_ERR, "Accept: Live upgrade JSON does not contain 'clients' array!");
1352 } else {
1353 clen = cJSON_GetArraySize(clients);
1354 hlog(LOG_DEBUG, "Clients array length %d", clen);
1355 for (i = 0; i < clen; i++) {
1356 cJSON *client = cJSON_GetArrayItem(clients, i);
1357 if (!client || client->type != cJSON_Object) {
1358 hlog(LOG_ERR, "Accept: Live upgrade JSON file, get client %d failed", i);
1359 continue;
1360 }
1361 if (accept_liveupgrade_single(client, rxerr_map, old_rxerrs_len) == 0)
1362 accepted++;
1363 }
1364 hlog(LOG_INFO, "Accepted %d of %d old clients in live upgrade", accepted, clen);
1365 if (accepted != clen)
1366 hlog(LOG_ERR, "Live upgrade: Failed to accept %d old clients, see above for reasons", clen-accepted);
1367 }
1368
1369 cJSON_Delete(liveupgrade_status);
1370 liveupgrade_status = NULL;
1371
1372 if (rxerr_map)
1373 hfree(rxerr_map);
1374 }
1375
1376 /*
1377 * Accept thread
1378 */
1379
/*
 *	Main function of the accept thread.
 *
 *	Blocks a set of signals for this thread, allocates the UDP
 *	pseudo-worker and pseudo-client, and then loops until
 *	accept_shutting_down becomes nonzero. Each round optionally
 *	(re)configures the listeners and the poll() list, then polls the
 *	listening sockets and dispatches to do_accept() or
 *	accept_udp_recv(). On exit it stops the uplink, dupecheck and
 *	worker threads and frees what it allocated.
 *
 *	asdf: not used by this function.
 */
void accept_thread(void *asdf)
{
	sigset_t sigs_to_block;
	int e, n;
	struct pollfd *acceptpfd = NULL;	/* poll() array, rebuilt on reconfigure */
	struct listen_t **acceptpl = NULL;	/* listener for each acceptpfd[] slot */
	int listen_n = 0;			/* running count of open listeners */
	int poll_n = 0;				/* number of entries in acceptpfd/acceptpl */
	struct listen_t *l;
	
	pthreads_profiling_reset("accept");
	
	/* block these signals in this thread */
	sigemptyset(&sigs_to_block);
	sigaddset(&sigs_to_block, SIGALRM);
	sigaddset(&sigs_to_block, SIGINT);
	sigaddset(&sigs_to_block, SIGTERM);
	sigaddset(&sigs_to_block, SIGQUIT);
	sigaddset(&sigs_to_block, SIGHUP);
	sigaddset(&sigs_to_block, SIGURG);
	sigaddset(&sigs_to_block, SIGPIPE);
	sigaddset(&sigs_to_block, SIGUSR1);
	sigaddset(&sigs_to_block, SIGUSR2);
	pthread_sigmask(SIG_BLOCK, &sigs_to_block, NULL);
	
	/* start the accept thread, which will start server threads */
	hlog(LOG_INFO, "Accept thread starting...");
	
	/* we allocate a worker structure to be used within the accept thread
	 * for parsing incoming UDP packets and passing them on to the dupecheck
	 * thread.
	 */
	udp_worker = worker_alloc();
	udp_worker->id = 81;
	
	/* we also need a pseudo-client structure, flagged CLFLAGS_UDPSUBMIT.
	 * NOTE(review): the earlier comment here spoke of "incoming HTTP
	 * position uploads"; given the flag and the udp_ prefix this is
	 * presumably the source client for UDP-submitted packets - confirm.
	 */
	udp_pseudoclient = pseudoclient_setup(81);
	udp_pseudoclient->flags |= CLFLAGS_UDPSUBMIT;
	
	/* force a full listener configuration on the first round */
	accept_reconfiguring = 1;
	while (!accept_shutting_down) {
		if (accept_reconfiguring) {
			accept_reconfiguring = 0;
			listen_n -= close_removed_listeners();
			
			/* start listening on the sockets */
			listen_n += open_missing_listeners();
			
			if (listen_n < 1) {
				hlog(LOG_CRIT, "Failed to listen on any ports.");
				exit(2);
			}
			
			/* reconfiguration must scan old clients against ACL */
			rescan_client_acls();
			
			/* how many are we polling */
			poll_n = 0;
			for (l = listen_list; (l); l = l->next)
				if (!l->corepeer)
					poll_n++;
			
			hlog(LOG_DEBUG, "Generating polling list for %d/%d listeners...", poll_n, listen_n);
			
			/* array of FDs for poll() */
			if (acceptpfd)
				hfree(acceptpfd);
			acceptpfd = hmalloc(poll_n * sizeof(*acceptpfd));
			
			/* array of listeners */
			if (acceptpl)
				hfree(acceptpl);
			acceptpl = hmalloc(poll_n * sizeof(*acceptpl));
			
			/* fill both arrays in parallel: slot n of acceptpfd
			 * belongs to the listener in slot n of acceptpl
			 */
			n = 0;
			int has_filtered_listeners_now = 0;
			for (l = listen_list; (l); l = l->next) {
				/* The accept thread does not poll() UDP sockets for core peers.
				 * Worker 0 takes care of that, and processes the incoming packets.
				 */
				if (l->corepeer) {
					hlog(LOG_DEBUG, "... %d: fd %d (%s) - not polled, is corepeer", n, (l->udp) ? l->udp->fd : l->fd, l->addr_s);
					continue;
				}
				
				/* UDP listeners keep their socket in l->udp */
				int fd;
				if (l->udp) {
					l->udp->polled = 1;
					fd = l->udp->fd;
				} else {
					fd = l->fd;
				}
				
				/* a listener with a filter string or with user filters
				 * allowed counts as a filtered listener
				 */
				if ((l->filter_s) || (l->client_flags & CLFLAGS_USERFILTEROK))
					has_filtered_listeners_now = 1;
				
				hlog(LOG_DEBUG, "... %d: fd %d (%s)", n, fd, l->addr_s);
				acceptpfd[n].fd = fd;
				acceptpfd[n].events = POLLIN|POLLPRI|POLLERR|POLLHUP;
				acceptpl[n] = l;
				n++;
			}
			hlog(LOG_INFO, "Accept thread ready.");
			have_filtered_listeners = has_filtered_listeners_now;
			if (!have_filtered_listeners)
				hlog(LOG_INFO, "Disabled historydb, listeners do not have filtering enabled.");
			
			/* stop the dupechecking and uplink threads while adjusting
			 * the amount of workers... they walk the worker list, and
			 * might get confused when workers are stopped or started.
			 */
			if (workers_running != workers_configured) {
				uplink_stop();
				dupecheck_stop();
				workers_start();
				dupecheck_start();
				uplink_start();
			}
			
			/*
			 * generate UDP peer clients
			 */
			peerip_clients_close();
			if (peerip_config)
				peerip_clients_config();
			
			/* accept liveupgrade clients */
			if (liveupgrade_status)
				accept_liveupgrade_accept();
		} else if (accept_reconfigure_after_tick != 0 && accept_reconfigure_after_tick <= tick) {
			/* a delayed retry was scheduled and its time has come:
			 * request a reconfiguration on the next round
			 */
			hlog(LOG_INFO, "Trying to reconfigure listeners due to a previous failure");
			accept_reconfiguring = 1;
			accept_reconfigure_after_tick = 0;
		}
		
		/* check for new connections, waiting at most 200 ms so that
		 * the flags tested at the top of the loop are seen promptly
		 */
		e = poll(acceptpfd, poll_n, 200);
		if (e == 0)
			continue;
		if (e < 0) {
			if (errno == EINTR)
				continue;
			hlog(LOG_ERR, "poll() on accept failed: %s (continuing)", strerror(errno));
			continue;
		}
		
		/* now, which socket was that on? */
		for (n = 0; n < poll_n; n++) {
			l = acceptpl[n];
			/* sanity check: the two arrays must still describe the same socket */
			if (!(l) || (l->udp ? l->udp->fd : l->fd) != acceptpfd[n].fd) {
				hlog(LOG_CRIT, "accept_thread: polling list and listener list do mot match!");
				exit(1);
			}
			if (acceptpfd[n].revents) {
				if (l->udp)
					accept_udp_recv(l); /* receive UDP packets */
				else
					do_accept(l); /* accept a single connection */
			}
		}
	}
	
	/* shutdown mode 2 gets an array for the workers' clients
	 * (presumably filled in by the workers for a live upgrade - confirm)
	 */
	if (accept_shutting_down == 2)
		worker_shutdown_clients = cJSON_CreateArray();
	
	hlog(LOG_DEBUG, "Accept thread shutting down listening sockets and worker threads...");
	uplink_stop();
	close_listeners();
	dupecheck_stop();
	http_shutting_down = 1;
	workers_stop(accept_shutting_down);
	hfree(acceptpfd);
	hfree(acceptpl);
	acceptpfd = NULL;
	acceptpl = NULL;
	
	/* free up the pseudo-client */
	client_free(udp_pseudoclient);
	udp_pseudoclient = NULL;
	
	/* free up the pseudo-worker structure, after dupecheck is long dead */
	worker_free_buffers(udp_worker);
	hfree(udp_worker);
	udp_worker = NULL;
}
1566
1567 /*
1568 * generate status information in status.json about the listeners
1569 */
1570
/*
 *	Append one JSON object per visible listener to the 'listeners'
 *	array, and add the "clients_max", "clients" and "connects" totals
 *	to the 'totals' object. Listeners marked corepeer or hidden are
 *	left out. Only non-UDP listeners contribute to the client and
 *	connect totals.
 *
 *	Byte and packet totals are deliberately not produced here: the
 *	per-listener counters cover clients only, not uplinks.
 *
 *	Always returns 0.
 */
int accept_listener_status(cJSON *listeners, cJSON *totals)
{
	struct listen_t *lp;
	long clients_sum = 0;
	long connects_sum = 0;

	for (lp = listen_list; lp != NULL; lp = lp->next) {
		const char *proto;
		cJSON *entry;

		if (lp->corepeer || lp->hidden)
			continue;

		if (lp->udp)
			proto = "udp";
		else
			proto = "tcp";

		entry = cJSON_CreateObject();
		cJSON_AddNumberToObject(entry, "fd", lp->fd);
		cJSON_AddNumberToObject(entry, "id", lp->id);
		cJSON_AddStringToObject(entry, "name", lp->name);
		cJSON_AddStringToObject(entry, "proto", proto);
		cJSON_AddStringToObject(entry, "addr", lp->addr_s);
		if (lp->filter_s)
			cJSON_AddStringToObject(entry, "filter", lp->filter_s);
		cJSON_AddNumberToObject(entry, "clients", lp->portaccount->gauge);
		cJSON_AddNumberToObject(entry, "clients_peak", lp->portaccount->gauge_max);
		cJSON_AddNumberToObject(entry, "clients_max", lp->clients_max);
		cJSON_AddNumberToObject(entry, "connects", lp->portaccount->counter);
		cJSON_AddNumberToObject(entry, "bytes_rx", lp->portaccount->rxbytes);
		cJSON_AddNumberToObject(entry, "bytes_tx", lp->portaccount->txbytes);
		cJSON_AddNumberToObject(entry, "pkts_rx", lp->portaccount->rxpackets);
		cJSON_AddNumberToObject(entry, "pkts_tx", lp->portaccount->txpackets);
		cJSON_AddNumberToObject(entry, "pkts_ign", lp->portaccount->rxdrops);
		cJSON_AddNumberToObject(entry, "pkts_dup", lp->portaccount->rxdupes);
		json_add_rxerrs(entry, "rx_errs", lp->portaccount->rxerrs);
		cJSON_AddItemToArray(listeners, entry);

		/* UDP listeners are not counted in the totals */
		if (lp->udp == NULL) {
			clients_sum += lp->portaccount->gauge;
			connects_sum += lp->portaccount->counter;
		}
	}

	cJSON_AddNumberToObject(totals, "clients_max", maxclients);
	cJSON_AddNumberToObject(totals, "clients", clients_sum);
	cJSON_AddNumberToObject(totals, "connects", connects_sum);

	return 0;
}
1634