1 /*
2  *	aprsc
3  *
4  *	(c) Heikki Hannikainen, OH7LZB <hessu@hes.iki.fi>
5  *
6  *	This program is licensed under the BSD license, which can be found
7  *	in the file LICENSE.
8  */
9 
10 /*
11  *	accept.c: the connection accepting thread
12  */
13 
14 #include <stdio.h>
15 #include <errno.h>
16 #include <string.h>
17 #include <signal.h>
18 #include <time.h>
19 #include <stdlib.h>
20 #include <unistd.h>
21 #include <fcntl.h>
22 #include <sys/types.h>
23 #include <sys/poll.h>
24 #include <sys/socket.h>
25 #include <netinet/in.h>
26 #include <netinet/tcp.h>
27 #include <arpa/inet.h>
28 #include <netdb.h>
29 
30 #include "accept.h"
31 #include "config.h"
32 #include "cfgfile.h"
33 #include "hlog.h"
34 #include "hmalloc.h"
35 #include "netlib.h"
36 #include "worker.h"
37 #include "dupecheck.h"
38 #include "filter.h"
39 #include "login.h"
40 #include "http.h"
41 #include "incoming.h" /* incoming_handler prototype */
42 #include "uplink.h"
43 #include "status.h"
44 #include "clientlist.h"
45 #include "client_heard.h"
46 #include "keyhash.h"
47 #include "ssl.h"
48 #include "sctp.h"
49 
50 #ifdef USE_SCTP
51 #include <netinet/sctp.h>
52 #endif
53 
/* Head of the doubly-linked list of open listeners. open_listener()
 * pushes new entries to the front, listener_free() unlinks them. */
static struct listen_t *listen_list;

//  pthread_mutex_t mt_servercount = PTHREAD_MUTEX_INITIALIZER;

/* Control flags for the accept thread. NOTE(review): presumably set from
 * elsewhere to request a shutdown / reconfiguration -- confirm with the
 * code that reads them. */
int accept_shutting_down;
int accept_reconfiguring;
/* Tick value used to schedule a reconfiguration retry: open_listener()
 * sets it to tick + 30 when a listener could not be opened. */
time_t accept_reconfigure_after_tick = 0;

/* pseudoworker + pseudoclient for incoming UDP packets
 * (used by accept_process_udpsubmit() to push submitted packets) */
struct worker_t *udp_worker = NULL;
struct client_t *udp_pseudoclient = NULL;
65 
66 /* structure allocator/free */
67 
listener_alloc(void)68 static struct listen_t *listener_alloc(void)
69 {
70 	struct listen_t *l = hmalloc(sizeof(*l));
71 	memset( l, 0, sizeof(*l) );
72 	l->fd = -1;
73 	l->id = -1;
74 	l->listener_id = -1;
75 
76 	return l;
77 }
78 
listener_free(struct listen_t * l)79 static void listener_free(struct listen_t *l)
80 {
81 	int i;
82 
83 	hlog(LOG_DEBUG, "Freeing listener %d '%s': %s", l->id, l->name, l->addr_s);
84 
85 	if (l->udp) {
86 		l->fd = -1;
87 
88 		client_udp_free(l->udp);
89 	}
90 
91 	port_accounter_drop(l->portaccount); /* The last reference, perhaps. */
92 
93 	if (l->fd >= 0)	close(l->fd);
94 	if (l->addr_s)	hfree(l->addr_s);
95 	if (l->name)	hfree(l->name);
96 
97 	for (i = 0; i < (sizeof(l->filters)/sizeof(l->filters[0])); ++i)
98 		if (l->filters[i])
99 			hfree(l->filters[i]);
100 
101 	if (l->filter_s)
102 		hfree(l->filter_s);
103 
104 	if (l->acl)
105 		acl_free(l->acl);
106 
107 	/* merge listener list around this node */
108 	if (l->next)
109 		l->next->prevp = l->prevp;
110 	if (l->prevp)
111 		*(l->prevp) = l->next;
112 
113 	hfree(l);
114 }
115 
116 /*
117  *	Copy / duplicate filters from listener config to an actual listener
118  *	Free old filters if any are set.
119  */
120 
listener_copy_filters(struct listen_t * l,struct listen_config_t * lc)121 static void listener_copy_filters(struct listen_t *l, struct listen_config_t *lc)
122 {
123 	int i;
124 	char filter_s[FILTER_S_SIZE] = "";
125 	int filter_s_l = 0;
126 
127 	for (i = 0; i < (sizeof(l->filters)/sizeof(l->filters[0])); ++i) {
128 		if (l->filters[i]) {
129 			hfree(l->filters[i]);
130 			l->filters[i] = NULL;
131 		}
132 
133 		if (i < (sizeof(lc->filters)/sizeof(lc->filters[0]))) {
134 			if (!lc->filters[i])
135 				continue;
136 
137 			l->filters[i] = hstrdup(lc->filters[i]);
138 
139 			int len = strlen(l->filters[i]);
140 			if (filter_s_l + len + 2 < FILTER_S_SIZE) {
141 				if (filter_s_l)
142 					filter_s[filter_s_l++] = ' ';
143 
144 				memcpy(filter_s + filter_s_l, l->filters[i], len);
145 				filter_s_l += len;
146 				filter_s[filter_s_l] = 0;
147 			}
148 		}
149 	}
150 
151 	if (l->filter_s) {
152 		hfree(l->filter_s);
153 		l->filter_s = NULL;
154 	}
155 
156 	if (filter_s_l == 0)
157 		return;
158 
159 	sanitize_ascii_string(filter_s);
160 	l->filter_s = hstrdup(filter_s);
161 }
162 
163 /*
164  *	Open the TCP/SCTP listening socket
165  */
166 
open_tcp_listener(struct listen_t * l,const struct addrinfo * ai,char * which)167 static int open_tcp_listener(struct listen_t *l, const struct addrinfo *ai, char *which)
168 {
169 	int arg;
170 	int f;
171 
172 	hlog(LOG_INFO, "Binding listening %s socket: %s", which, l->addr_s);
173 
174 	if ((f = socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol)) < 0) {
175 		hlog(LOG_CRIT, "socket(): %s\n", strerror(errno));
176 		return -1;
177 	}
178 
179 	arg = 1;
180 	if (setsockopt(f, SOL_SOCKET, SO_REUSEADDR, (char *)&arg, sizeof(arg)) == -1)
181 		hlog(LOG_ERR, "setsockopt(%s, SO_REUSEADDR) failed for listener: %s", l->addr_s, strerror(errno));
182 #ifdef SO_REUSEPORT
183 	if (setsockopt(f, SOL_SOCKET, SO_REUSEPORT, (char *)&arg, sizeof(arg)) == -1)
184 		hlog(LOG_ERR, "setsockopt(%s, SO_REUSEPORT) failed for listener: %s", l->addr_s, strerror(errno));
185 #endif
186 
187 	if (bind(f, ai->ai_addr, ai->ai_addrlen)) {
188 		hlog(LOG_CRIT, "bind(%s): %s", l->addr_s, strerror(errno));
189 		close(f);
190 		return -1;
191 	}
192 
193 	if (listen(f, SOMAXCONN)) {
194 		hlog(LOG_CRIT, "listen(%s) failed: %s", l->addr_s, strerror(errno));
195 		close(f);
196 		return -1;
197 	}
198 
199 	l->fd = f;
200 
201 	return f;
202 }
203 
204 /*
205  *	Open the UDP receiving socket
206  */
207 
open_udp_listener(struct listen_t * l,const struct addrinfo * ai)208 static int open_udp_listener(struct listen_t *l, const struct addrinfo *ai)
209 {
210 	int arg;
211 	int fd;
212 	struct client_udp_t *c;
213 	union sockaddr_u sa; /* large enough for also IPv6 address */
214 
215 	hlog(LOG_INFO, "Binding listening UDP socket: %s", l->addr_s);
216 
217 	if ((fd = socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol)) < 0) {
218 		hlog(LOG_CRIT, "socket(): %s\n", strerror(errno));
219 		return -1;
220 	}
221 
222 	arg = 1;
223 	if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (char *)&arg, sizeof(arg)) == -1) {
224 		hlog(LOG_ERR, "setsockopt(%s, SO_REUSEADDR) failed: %s", l->addr_s, strerror(errno));
225 	}
226 #ifdef SO_REUSEPORT
227 	if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, (char *)&arg, sizeof(arg)) == -1) {
228 		hlog(LOG_ERR, "setsockopt(%s, SO_REUSEPORT) failed: %s", l->addr_s, strerror(errno));
229 	}
230 #endif
231 
232 	memcpy( &sa, ai->ai_addr,  ai->ai_addrlen );
233 
234 	if (bind(fd, ai->ai_addr, ai->ai_addrlen)) {
235 		hlog(LOG_CRIT, "bind(%s): %s", l->addr_s, strerror(errno));
236 		close(fd);
237 		return -1;
238 	}
239 
240 	c = client_udp_alloc((l->corepeer) ? &udppeers : &udpclients, fd, l->portnum);
241 	c->af = ai->ai_family;
242 	c->portaccount = l->portaccount;
243 
244 	if (1) {
245 		int len, arg;
246 		/* Set bigger socket buffer sizes for the UDP port..
247 		 * The current settings are quite large just to accommodate
248 		 * load testing at high packet rates without packet loss.
249 		 */
250 		len = sizeof(arg);
251 		arg = 64*1024; /* 20 Kbytes is good for about 5 seconds of APRS-IS full feed */
252 		if (setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &arg, len)) {
253 			hlog(LOG_ERR, "UDP listener setup: setsockopt(SO_RCVBUF, %d) failed: %s", arg, strerror(errno));
254 		}
255 		arg = 128*1024; /* This one needs to fit packets going to *all* UDP clients */
256 		if (setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &arg, len)) {
257 			hlog(LOG_ERR, "UDP listener setup: setsockopt(SO_SNDBUF, %d) failed: %s", arg, strerror(errno));
258 		}
259 	}
260 
261 	/* set non-blocking mode */
262 	if (fcntl(c->fd, F_SETFL, O_NONBLOCK) == -1) {
263 		/* it really shouldn't fail.. and socket usage is "sendto(... MSG_DONTWAIT)",
264 		 * so it doesn't really even matter if it fails. */
265 		hlog(LOG_ERR, "UDP listener setup: fcntl(F_SETFL, O_NONBLOCK) failed: %s", strerror(errno));
266 	}
267 
268 	l->udp = c;
269 	/* l->fd = fd -- except that close on it will kill working client setups.. */
270 
271 	return fd;
272 }
273 
open_listener(struct listen_config_t * lc)274 static int open_listener(struct listen_config_t *lc)
275 {
276 	struct listen_t *l;
277 	int i;
278 
279 	l = listener_alloc();
280 	l->id = lc->id;
281 	l->hidden = lc->hidden;
282 	l->corepeer = lc->corepeer;
283 	l->client_flags = lc->client_flags;
284 	l->clients_max = lc->clients_max;
285 
286 	l->portaccount = port_accounter_alloc();
287 
288 	/* Pick first of the AIs for this listen definition */
289 	l->addr_s = strsockaddr( lc->ai->ai_addr, lc->ai->ai_addrlen );
290 	l->name   = hstrdup(lc->name);
291 	l->portnum = lc->portnum;
292 	l->ai_protocol = lc->ai->ai_protocol;
293 	l->listener_id = keyhash(l->addr_s, strlen(l->addr_s), 0);
294 	l->listener_id = keyhash(&lc->ai->ai_socktype, sizeof(lc->ai->ai_socktype), l->listener_id);
295 	l->listener_id = keyhash(&lc->ai->ai_protocol, sizeof(lc->ai->ai_protocol), l->listener_id);
296 	hlog(LOG_DEBUG, "Opening listener %d/%d '%s': %s", lc->id, l->listener_id, lc->name, l->addr_s);
297 
298 	if (lc->ai->ai_socktype == SOCK_DGRAM &&
299 	    lc->ai->ai_protocol == IPPROTO_UDP) {
300 		/* UDP listening is not quite same as TCP listening.. */
301 		i = open_udp_listener(l, lc->ai);
302 	} else if (lc->ai->ai_socktype == SOCK_STREAM && lc->ai->ai_protocol == IPPROTO_TCP) {
303 		/* TCP listening... */
304 		i = open_tcp_listener(l, lc->ai, "TCP");
305 #ifdef USE_SCTP
306 	} else if (lc->ai->ai_socktype == SOCK_STREAM &&
307 		   lc->ai->ai_protocol == IPPROTO_SCTP) {
308 		i = open_tcp_listener(l, lc->ai, "SCTP");
309 		if (i >= 0)
310 			i = sctp_set_listen_params(l);
311 #endif
312 	} else {
313 		hlog(LOG_ERR, "Unsupported listener protocol for '%s'", l->name);
314 		listener_free(l);
315 		return -1;
316 	}
317 
318 	if (i < 0) {
319 		hlog(LOG_DEBUG, "... failed");
320 		listener_free(l);
321 		/* trigger reconfiguration after 30 seconds; probably an IP
322 		 * address that we tried to bind was not yet configured and
323 		 * it'll appear later in the boot process
324 		 */
325 		accept_reconfigure_after_tick = tick + 30;
326 		return -1;
327 	}
328 
329 	hlog(LOG_DEBUG, "... ok, bound");
330 
331 	/* Set up an SSL context if necessary */
332 #ifdef USE_SSL
333 	if (lc->keyfile && lc->certfile) {
334 		l->ssl = ssl_alloc();
335 
336 		if (ssl_create(l->ssl, (void *)l)) {
337 			hlog(LOG_ERR, "Failed to create SSL context for '%s*': %s", lc->name, l->addr_s);
338 			listener_free(l);
339 			return -1;
340 		}
341 
342 		if (ssl_certificate(l->ssl, lc->certfile, lc->keyfile)) {
343 			hlog(LOG_ERR, "Failed to load SSL key and certificates for '%s*': %s", lc->name, l->addr_s);
344 			listener_free(l);
345 			return -1;
346 		}
347 
348 		/* optional client cert validation */
349 		if (lc->cafile) {
350 			if (ssl_ca_certificate(l->ssl, lc->cafile, 2)) {
351 				hlog(LOG_ERR, "Failed to load trusted SSL CA certificates for '%s*': %s", lc->name, l->addr_s);
352 				listener_free(l);
353 				return -1;
354 			}
355 		}
356 
357 		hlog(LOG_INFO, "SSL initialized for '%s': %s%s", lc->name, l->addr_s, (lc->cafile) ? " (client validation enabled)" : "");
358 	}
359 #endif
360 
361 	/* Copy access lists */
362 	if (lc->acl)
363 		l->acl = acl_dup(lc->acl);
364 
365 	/* Copy filter definitions */
366 	listener_copy_filters(l, lc);
367 
368 	hlog(LOG_DEBUG, "... adding %s to listened sockets", l->addr_s);
369 	// put (first) in the list of listening sockets
370 	l->next = listen_list;
371 	l->prevp = &listen_list;
372 	if (listen_list)
373 		listen_list->prevp = &l->next;
374 	listen_list = l;
375 
376 	return 0;
377 }
378 
find_listener_random_id(int id)379 static struct listen_t *find_listener_random_id(int id)
380 {
381 	struct listen_t *l = listen_list;
382 
383 	while (l) {
384 		if (l->id == id)
385 			return l;
386 		l = l->next;
387 	}
388 
389 	return NULL;
390 }
391 
find_listener_hash_id(int id)392 static struct listen_t *find_listener_hash_id(int id)
393 {
394 	struct listen_t *l = listen_list;
395 
396 	while (l) {
397 		if (l->listener_id == id)
398 			return l;
399 		l = l->next;
400 	}
401 
402 	return NULL;
403 }
404 
rescan_client_acls(void)405 static int rescan_client_acls(void)
406 {
407 	struct worker_t *w = worker_threads;
408 	struct client_t *c;
409 	struct listen_t *l;
410 	int pe;
411 
412 	hlog(LOG_DEBUG, "Scanning old clients against new ACLs and listeners");
413 
414 	while (w) {
415 		if ((pe = pthread_mutex_lock(&w->clients_mutex))) {
416 			hlog(LOG_ERR, "rescan_client_acls(worker %d): could not lock clients_mutex: %s", w->id, strerror(pe));
417 			return -1;
418 		}
419 
420 		for (c = w->clients; (c); c = c->next) {
421 			/* do not disconnect uplinks at this point */
422 			if (!(c->flags & CLFLAGS_INPORT))
423 				continue;
424 
425 			l = find_listener_hash_id(c->listener_id);
426 			if (!l) {
427 				/* listener is not there any more */
428 				hlog(LOG_INFO, "%s - Closing client on fd %d from %s (listener has been removed)", c->addr_loc, c->fd, c->addr_rem);
429 				shutdown(c->fd, SHUT_RDWR);
430 				continue;
431 			}
432 
433 			/* is there an acl? */
434 			if (!l->acl)
435 				continue;
436 
437 			/* there is, check */
438 			if (!acl_check(l->acl, (struct sockaddr *)&c->addr, sizeof(c->addr))) {
439 				hlog(LOG_INFO, "%s - Denying client on fd %d from %s (new ACL)", c->addr_loc, c->fd, c->addr_rem);
440 				shutdown(c->fd, SHUT_RDWR);
441 				continue;
442 			}
443 		}
444 
445 		if ((pe = pthread_mutex_unlock(&w->clients_mutex))) {
446 			hlog(LOG_ERR, "rescan_client_acls(worker %d): could not unlock clients_mutex: %s", w->id, strerror(pe));
447 			/* we'd going to deadlock here... */
448 			exit(1);
449 		}
450 
451 		w = w->next;
452 	}
453 
454 
455 	return 0;
456 }
457 
458 
listener_update_config(struct listen_t * l,struct listen_config_t * lc)459 static void listener_update_config(struct listen_t *l, struct listen_config_t *lc)
460 {
461 	hlog(LOG_DEBUG, "listener_update_config: %d '%s': %s:%d", lc->id, lc->name, lc->host, lc->portnum);
462 
463 	/* basic flags which can be changed on the fly */
464 	l->clients_max = lc->clients_max; /* could drop clients when decreasing maxclients (done in worker) */
465 	l->hidden = lc->hidden; /* could mark old clients on port hidden, too - needs to be done in worker */
466 	l->client_flags = lc->client_flags; /* this one must not change old clients */
467 
468 	/* Filters */
469 	listener_copy_filters(l, lc);
470 
471 	/* ACLs */
472 	if (l->acl) {
473 		acl_free(l->acl);
474 		l->acl = NULL;
475 	}
476 	if (lc->acl) {
477 		l->acl = acl_dup(lc->acl);
478 	}
479 
480 	/* Listen address change? Rebind? */
481 }
482 
open_missing_listeners(void)483 static int open_missing_listeners(void)
484 {
485 	struct listen_config_t *lc;
486 	struct listen_t *l;
487 	int opened = 0;
488 
489 	for (lc = listen_config; (lc); lc = lc->next) {
490 		if ((l = find_listener_random_id(lc->id))) {
491 			hlog(LOG_DEBUG, "open_missing_listeners: already listening %d '%s': %s:%d", lc->id, lc->name, lc->host, lc->portnum);
492 			listener_update_config(l, lc);
493 			continue;
494 		}
495 
496 		if (open_listener(lc) == 0)
497 			opened++;
498 	}
499 
500 	return opened;
501 }
502 
close_removed_listeners(void)503 static int close_removed_listeners(void)
504 {
505 	int closed = 0;
506 
507 	hlog(LOG_DEBUG, "Closing removed listening sockets....");
508 	struct listen_t *l, *next;
509 	next = listen_list;
510 	while (next) {
511 		l = next;
512 		next = l->next;
513 
514 		struct listen_config_t *lc = find_listen_config_id(listen_config, l->id);
515 		if (!lc) {
516 			hlog(LOG_INFO, "Listener %d (%s) no longer in configuration, closing port....",
517 				l->id, l->addr_s);
518 			listener_free(l);
519 			closed++;
520 		}
521 	}
522 
523 	return closed;
524 }
525 
close_listeners(void)526 static void close_listeners(void)
527 {
528 	if (!listen_list)
529 		return;
530 
531 	hlog(LOG_DEBUG, "Closing listening sockets....");
532 	while (listen_list)
533 		listener_free(listen_list);
534 }
535 
536 /*
537  *	Generate UDP peer "clients"
538  */
539 
peerip_clients_config(void)540 static void peerip_clients_config(void)
541 {
542 	struct client_t *c;
543 	struct peerip_config_t *pe;
544 	struct client_udp_t *udpclient;
545 	char *s;
546 	union sockaddr_u sa; /* large enough for also IPv6 address */
547 	socklen_t addr_len = sizeof(sa);
548 
549 	for (pe = peerip_config; (pe); pe = pe->next) {
550 		hlog(LOG_DEBUG, "Setting up UDP peer %s (%s)", pe->name, pe->host);
551 		udpclient = client_udp_find(udppeers, pe->af, pe->local_port);
552 
553 		if (!udpclient) {
554 			hlog(LOG_ERR, "Failed to find UDP socket on port %d for peer %s (%s)", pe->local_port, pe->name, pe->host);
555 			continue;
556 		}
557 
558 		c = client_alloc();
559 		if (!c) {
560 			hlog(LOG_ERR, "peerip_clients_config: client_alloc returned NULL");
561 			abort();
562 		}
563 		c->fd = -1; // Right, this client will never have a socket of it's own.
564 		c->ai_protocol = IPPROTO_UDP;
565 		c->portnum = pe->local_port; // local port
566 		c->state = CSTATE_COREPEER;
567 		c->validated = VALIDATED_WEAK;
568 		c->flags = CLFLAGS_UPLINKPORT;
569 		c->handler_line_in = &incoming_handler;
570 		memcpy((void *)&c->udpaddr.sa, (void *)pe->ai->ai_addr, pe->ai->ai_addrlen);
571 		c->udpaddrlen = pe->ai->ai_addrlen;
572 		c->udp_port = pe->remote_port; // remote port
573 		c->addr = c->udpaddr;
574 		c->udpclient = udpclient;
575 		//c->portaccount = l->portaccount;
576 		c->keepalive = tick + keepalive_interval;
577 		c->last_read = tick; /* not simulated time */
578 
579 		inbound_connects_account(3, c->udpclient->portaccount); /* "3" = udp, not listening..  */
580 
581 		/* set up peer serverid to username */
582 		strncpy(c->username, pe->serverid, sizeof(c->username));
583 		c->username[sizeof(c->username)-1] = 0;
584 		c->username_len = strlen(c->username);
585 
586 		/* convert client address to string */
587 		s = strsockaddr( &c->udpaddr.sa, c->udpaddrlen );
588 
589 		/* text format of client's IP address + port */
590 		strncpy(c->addr_rem, s, sizeof(c->addr_rem));
591 		c->addr_rem[sizeof(c->addr_rem)-1] = 0;
592 		hfree(s);
593 
594 		/* hex format of client's IP address + port */
595 		s = hexsockaddr( &c->udpaddr.sa, c->udpaddrlen );
596 
597 		strncpy(c->addr_hex, s, sizeof(c->addr_hex));
598 		c->addr_hex[sizeof(c->addr_hex)-1] = 0;
599 		hfree(s);
600 
601 		/* text format of servers' connected IP address + port */
602 		addr_len = sizeof(sa);
603 		if (getsockname(c->udpclient->fd, &sa.sa, &addr_len) == 0) { /* Fails very rarely.. */
604 			/* present my socket end address as a malloced string... */
605 			s = strsockaddr( &sa.sa, addr_len );
606 		} else {
607 			hlog(LOG_ERR, "Peer config: getsockname on udpclient->fd failed: %s", strerror(errno));
608 			s = hstrdup( "um" ); /* Server's bound IP address.. TODO: what? */
609 		}
610 		strncpy(c->addr_loc, s, sizeof(c->addr_loc));
611 		c->addr_loc[sizeof(c->addr_loc)-1] = 0;
612 		hfree(s);
613 
614 		/* pass the client to the first worker thread */
615 		if (pass_client_to_worker(worker_threads, c)) {
616 			hlog(LOG_ERR, "Failed to pass UDP peer %s (%s) to worker", pe->name, pe->host);
617 			client_free(c);
618 		}
619 	}
620 }
621 
622 /*
623  *	Close and free UDP peer "clients"
624  */
625 
peerip_clients_close(void)626 static void peerip_clients_close(void)
627 {
628 	struct client_t *c;
629 
630 	c = client_alloc();
631 	if (!c) {
632 		hlog(LOG_ERR, "peerip_clients_close: client_alloc returned NULL");
633 		abort();
634 	}
635 
636 	c->fd = -2; // Magic FD to close them all
637 	c->state = CSTATE_COREPEER;
638 	sprintf(c->filter_s, "peerip_clients_close"); // debugging
639 
640 	if (pass_client_to_worker(worker_threads, c)) {
641 		hlog(LOG_ERR, "Failed to pass magic peerip_clients_close message pseudoclient to worker");
642 		client_free(c);
643 	}
644 }
645 
646 /*
647  *	Process an incoming UDP packet submission
648  */
649 
accept_process_udpsubmit(struct listen_t * l,char * buf,int len,char * remote_host)650 static void accept_process_udpsubmit(struct listen_t *l, char *buf, int len, char *remote_host)
651 {
652 	int packet_len;
653 	char *login_string = NULL;
654 	char *packet = NULL;
655 	char *username = NULL;
656 	char validated;
657 	int e;
658 
659 	//hlog(LOG_DEBUG, "got udp submit: %.*s", len, buf);
660 
661 	packet_len = loginpost_split(buf, len, &login_string, &packet);
662 	if (packet_len == -1) {
663 		hlog(LOG_DEBUG, "UDP submit [%s]: No newline (LF) found in data", remote_host);
664 		return;
665 	}
666 
667 	if (!login_string) {
668 		hlog(LOG_DEBUG, "UDP submit [%s]: No login string in data", remote_host);
669 		return;
670 	}
671 
672 	if (!packet) {
673 		hlog(LOG_DEBUG, "UDP submit [%s]: No packet data found in data", remote_host);
674 		return;
675 	}
676 
677 	hlog(LOG_DEBUG, "UDP submit [%s]: login string: %s", remote_host, login_string);
678 	hlog(LOG_DEBUG, "UDP submit [%s]: packet: %s", remote_host, packet);
679 
680 	/* process the login string */
681 	validated = http_udp_upload_login(remote_host, login_string, &username, "UDP submit");
682 	if (validated < 0) {
683 		hlog(LOG_DEBUG, "UDP submit [%s]: Invalid login string", remote_host);
684 		return;
685 	}
686 
687 	if (validated != 1) {
688 		hlog(LOG_DEBUG, "UDP submit [%s]: Invalid passcode for user %s", remote_host, username);
689 		return;
690 	}
691 
692 	/* packet size limits */
693 	if (packet_len < PACKETLEN_MIN) {
694 		hlog(LOG_DEBUG, "UDP submit [%s]: Packet too short: %d bytes", remote_host, packet_len);
695 		return;
696 	}
697 
698 	if (packet_len > PACKETLEN_MAX-2) {
699 		hlog(LOG_DEBUG, "UDP submit [%s]: Packet too long: %d bytes", remote_host, packet_len);
700 		return;
701 	}
702 
703 	udp_pseudoclient->portaccount = l->portaccount;
704 	e = pseudoclient_push_packet(udp_worker, udp_pseudoclient, username, packet, packet_len);
705 	clientaccount_add(udp_pseudoclient, IPPROTO_UDP, len, 1, 0, 0, (e < 0) ? e : 0, 0);
706 	udp_pseudoclient->portaccount = NULL;
707 
708 	if (e < 0)
709 		hlog(LOG_DEBUG, "UDP submit [%s]: Incoming packet parse failure code %d: %s", remote_host, e, packet);
710 	else
711 		hlog(LOG_DEBUG, "UDP submit [%s]: Incoming packet parsed, code %d: %s", remote_host, e, packet);
712 }
713 
714 /*
715  *	Receive UDP packets from an UDP listener
716  */
717 
accept_udp_recv(struct listen_t * l)718 static void accept_udp_recv(struct listen_t *l)
719 {
720 	union sockaddr_u addr;
721 	socklen_t addrlen;
722 	char buf[2000];
723 	int i;
724 	char *addrs;
725 
726 	/* Receive as much as there is -- that is, LOOP...  */
727 	addrlen = sizeof(addr);
728 
729 	while ((i = recvfrom( l->udp->fd, buf, sizeof(buf)-1, MSG_DONTWAIT|MSG_TRUNC, (struct sockaddr *)&addr, &addrlen )) >= 0) {
730 		if (!(l->client_flags & CLFLAGS_UDPSUBMIT)) {
731 			hlog(LOG_DEBUG, "accept thread discarded an UDP packet on a listening socket");
732 			continue;
733 		}
734 
735 		addrs = strsockaddr(&addr.sa, addrlen);
736 		accept_process_udpsubmit(l, buf, i, addrs);
737 		hfree(addrs);
738 	}
739 }
740 
741 /*
742  *	Accept a single client
743  */
744 
accept_client_for_listener(struct listen_t * l,int fd,char * addr_s,union sockaddr_u * sa,unsigned addr_len)745 struct client_t *accept_client_for_listener(struct listen_t *l, int fd, char *addr_s, union sockaddr_u *sa, unsigned addr_len)
746 {
747 	struct client_t *c;
748 	char *s;
749 	int i;
750 	union sockaddr_u sa_loc; /* local address */
751 	socklen_t addr_len_loc = sizeof(sa_loc);
752 
753 	c = client_alloc();
754 	if (!c)
755 		return NULL;
756 
757 	c->fd    = fd;
758 	c->listener_id = l->listener_id;
759 	c->addr  = *sa;
760 	c->ai_protocol = l->ai_protocol;
761 	c->portnum = l->portnum;
762 	c->hidden  = l->hidden;
763 	c->flags   = l->client_flags;
764 	c->udpclient = client_udp_find(udpclients, sa->sa.sa_family, l->portnum);
765 	c->portaccount = l->portaccount;
766 	c->last_read = tick; /* not simulated time */
767 	inbound_connects_account(1, c->portaccount); /* account all ports + port-specifics */
768 
769 	/* text format of client's IP address + port */
770 	strncpy(c->addr_rem, addr_s, sizeof(c->addr_rem));
771 	c->addr_rem[sizeof(c->addr_rem)-1] = 0;
772 
773 	/* hex format of client's IP address + port */
774 	s = hexsockaddr( &sa->sa, addr_len );
775 	strncpy(c->addr_hex, s, sizeof(c->addr_hex));
776 	c->addr_hex[sizeof(c->addr_hex)-1] = 0;
777 	hfree(s);
778 
779 	/* text format of servers' connected IP address + port */
780 	if (getsockname(fd, &sa_loc.sa, &addr_len_loc) == 0) { /* Fails very rarely.. */
781 		if (addr_len_loc > sizeof(sa_loc))
782 			hlog(LOG_ERR, "accept_client_for_listener: getsockname for client %s truncated local address of %d to %d bytes", c->addr_rem, addr_len_loc, sizeof(sa_loc));
783 		/* present my socket end address as a malloced string... */
784 		s = strsockaddr( &sa_loc.sa, addr_len_loc );
785 	} else {
786 		s = hstrdup( l->addr_s ); /* Server's bound IP address */
787 		hlog(LOG_ERR, "accept_client_for_listener: getsockname for client %s failed: %s (using '%s' instead)", c->addr_rem, strerror(errno), s);
788 	}
789 	strncpy(c->addr_loc, s, sizeof(c->addr_loc));
790 	c->addr_loc[sizeof(c->addr_loc)-1] = 0;
791 	hfree(s);
792 
793 	/* apply predefined filters */
794 	for (i = 0; i < (sizeof(l->filters)/sizeof(l->filters[0])); ++i) {
795 		if (l->filters[i]) {
796 			if (filter_parse(c, l->filters[i], 0) < 0) { /* system filters */
797 				hlog(LOG_ERR, "Bad system filter definition: %s", l->filters[i]);
798 			}
799 		}
800 	}
801 	if (l->filter_s) {
802 		strncpy(c->filter_s, l->filter_s, sizeof(c->filter_s));
803 		c->filter_s[FILTER_S_SIZE-1] = 0;
804 	}
805 
806 	return c;
807 }
808 
809 /*
810  *	Pick next worker to feed a client to
811  */
812 
pick_next_worker(void)813 static struct worker_t *pick_next_worker(void)
814 {
815 	static int next_receiving_worker;
816 	struct worker_t *w, *wc;
817 	int i;
818 
819 #if 1
820 	/* Use simple round-robin on client feeding.  Least clients is
821 	 * quite attractive idea, but when clients arrive at huge bursts
822 	 * they tend to move in big bunches, and it takes quite some while
823 	 * before the worker updates its client-counters.
824 	 */
825 	for (i = 0, w = worker_threads; w ;  w = w->next, i++) {
826 		if ( i >= next_receiving_worker)
827 			break;
828 	}
829 	wc = w;
830 	if (! w) {
831 		wc = worker_threads;       // ran out of the worker chain, back to the first..
832 		next_receiving_worker = 0; // and reset the index too
833 	}
834 	// in every case, increment the next receiver index for the next call.
835 	++next_receiving_worker;
836 
837 #else
838 	/* find the worker with least clients...
839 	 * This isn't strictly accurate, since the threads could change their
840 	 * client counts during scanning, but we don't really care if the load distribution
841 	 * is _exactly_ fair.
842 	 */
843 
844 	int client_min = -1;
845 	for (wc = w = worker_threads; (w); w = w->next)
846 		if (w->client_count < client_min || client_min == -1) {
847 			wc = w;
848 			client_min = w->client_count;
849 		}
850 #endif
851 	return wc;
852 }
853 
854 /*
855  *	Accept a single connection
856  */
857 
do_accept(struct listen_t * l)858 static void do_accept(struct listen_t *l)
859 {
860 	int fd;
861 	struct client_t *c;
862 	union sockaddr_u sa; /* large enough for also IPv6 address */
863 	socklen_t addr_len = sizeof(sa);
864 	static time_t last_EMFILE_report;
865 	char *s;
866 
867 	if ((fd = accept(l->fd, (struct sockaddr*)&sa, &addr_len)) < 0) {
868 		int e = errno;
869 		switch (e) {
870 			/* Errors reporting really bad internal (programming) bugs */
871 			case EBADF:
872 			case EINVAL:
873 #ifdef ENOTSOCK
874 			case ENOTSOCK: /* Not a socket */
875 #endif
876 #ifdef EOPNOTSUPP
877 			case EOPNOTSUPP: /* Not a SOCK_STREAM */
878 #endif
879 #ifdef ESOCKTNOSUPPORT
880 			case ESOCKTNOSUPPORT: /* Linux errors ? */
881 #endif
882 #ifdef EPROTONOSUPPORT
883 			case EPROTONOSUPPORT: /* Linux errors ? */
884 #endif
885 
886 				hlog(LOG_CRIT, "accept() failed: %s (giving up)", strerror(e));
887 				exit(1); // ABORT with core-dump ??
888 
889 				break;
890 
891 			/* Too many open files -- rate limit the reporting -- every 10th second or so.. */
892 			case EMFILE:
893 				if (last_EMFILE_report + 10 <= tick) {
894 					last_EMFILE_report = tick;
895 					hlog(LOG_ERR, "accept() failed: %s (continuing)", strerror(e));
896 				}
897 				return;
898 			/* Errors reporting system internal/external glitches */
899 			default:
900 				hlog(LOG_ERR, "accept() failed: %s (continuing)", strerror(e));
901 				return;
902 		}
903 	}
904 
905 	/* convert client address to string */
906 	s = strsockaddr( &sa.sa, addr_len );
907 
908 	/* TODO: the dropped connections here are not accounted. */
909 
910 	/* Limit amount of connections per port, and globally.
911 	 * Error messages written just before closing the socet may or may not get
912 	 * to the user, but at least we try.
913 	 */
914 	if (l->portaccount->gauge >= l->clients_max || inbound_connects.gauge >= maxclients) {
915 		if (inbound_connects.gauge >= maxclients) {
916 			hlog(LOG_INFO, "%s - Denied client on fd %d from %s: MaxClients reached (%d)", l->addr_s, fd, s, inbound_connects.gauge);
917 			/* The "if" is here only to silence a compiler warning
918 			 * about ignoring the result value. We're really
919 			 * disconnecting the client right now, so we don't care.
920 			 */
921 			if (write(fd, "# Server full\r\n", 15)) {};
922 		} else {
923 			hlog(LOG_INFO, "%s - Denied client on fd %d from %s: Too many clients on Listener (%d)", l->addr_s, fd, s, l->portaccount->gauge);
924 			if (write(fd, "# Port full\r\n", 13)) {};
925 		}
926 		close(fd);
927 		hfree(s);
928 		inbound_connects_account(-1, l->portaccount); /* account rejected connection */
929 		return;
930 	}
931 
932 	/* match against acl... could probably have an error message to the client */
933 	if (l->acl) {
934 		if (!acl_check(l->acl, (struct sockaddr *)&sa, addr_len)) {
935 			hlog(LOG_INFO, "%s - Denied client on fd %d from %s (ACL)", l->addr_s, fd, s);
936 			close(fd);
937 			hfree(s);
938 			inbound_connects_account(-1, l->portaccount); /* account rejected connection */
939 			return;
940 		}
941 	}
942 
943 	c = accept_client_for_listener(l, fd, s, &sa, addr_len);
944 	if (!c) {
945 		hlog(LOG_ERR, "%s - client_alloc returned NULL, too many clients. Denied client on fd %d from %s", l->addr_s, fd, s);
946 		close(fd);
947 		hfree(s);
948 		inbound_connects_account(-1, l->portaccount); /* account rejected connection */
949 		return;
950 	}
951 	hfree(s);
952 
953 	c->state   = CSTATE_LOGIN;
954 	/* use the default login handler */
955 	c->handler_line_in = &login_handler;
956 	c->keepalive = tick + keepalive_interval;
957 
958 #ifdef USE_SSL
959 	if (l->ssl) {
960 		if (ssl_create_connection(l->ssl, c, 0)) {
961 			close(fd);
962 			inbound_connects_account(-1, l->portaccount); /* account rejected connection */
963 			return;
964 		}
965 	}
966 #endif
967 
968 	hlog(LOG_DEBUG, "%s - Accepted client on fd %d from %s", c->addr_loc, c->fd, c->addr_rem);
969 
970 	/* set client socket options, return -1 on serious errors */
971 	if (set_client_sockopt(c) != 0)
972 		goto err;
973 
974 	/* ok, found it... lock the new client queue and pass the client */
975 	if (pass_client_to_worker(pick_next_worker(), c))
976 		goto err;
977 
978 	return;
979 
980 err:
981 
982 	inbound_connects_account(0, c->portaccount); /* something failed, remove this from accounts.. */
983 	client_free(c);
984 	return;
985 }
986 
987 /*
988  *	Find a listener which this client is connected on
989  */
990 
liveupgrade_find_listener(int listener_id)991 struct listen_t *liveupgrade_find_listener(int listener_id)
992 {
993 	struct listen_t *l;
994 
995 	l = listen_list;
996 
997 	while (l) {
998 		if (l->listener_id == listener_id)
999 			break;
1000 		l = l->next;
1001 	}
1002 
1003 	return l;
1004 }
1005 
1006 /*
1007  *	Map old rxerrs counter values to new ones based on the string mapping
1008  */
1009 
/*
 *	Build a translation table from the rx error label array of the old
 *	process to the indexes of inerr_labels[] in this process.
 *
 *	rx_err_labels  - JSON array of label strings
 *	old_rxerrs_len - out: number of elements in rx_err_labels
 *
 *	Returns a table allocated with hmalloc() holding one entry per old
 *	label: the matching new index, or -1 when the label is not a string or
 *	is not found in inerr_labels[]. Returns NULL when the array is empty.
 *	The caller owns the returned table.
 */
static int *accept_rx_err_map(cJSON *rx_err_labels, int *old_rxerrs_len)
{
	int count = cJSON_GetArraySize(rx_err_labels);
	int *map;
	int old_idx, new_idx;

	*old_rxerrs_len = count;
	if (count <= 0)
		return NULL;

	map = hmalloc(sizeof(*map) * count);

	for (old_idx = 0; old_idx < count; old_idx++) {
		cJSON *label = cJSON_GetArrayItem(rx_err_labels, old_idx);

		/* until proven otherwise, the old counter has no new home */
		map[old_idx] = -1;

		if (label == NULL || label->type != cJSON_String)
			continue;

		/* every bucket is compared; a later match overrides an earlier one */
		for (new_idx = 0; new_idx < INERR_BUCKETS; new_idx++) {
			if (strcmp(inerr_labels[new_idx], label->valuestring) != 0)
				continue;
			map[old_idx] = new_idx;
		}
	}

	return map;
}
1037 
/*
 *	Copy rx error counters from the JSON array rx_errs into the client's
 *	localaccount.rxerrs[], translating each array index through rxerr_map.
 *
 *	Only the first min(rxerr_map_len, array size) elements are considered.
 *	An element is skipped when its mapping is outside 0..INERR_BUCKETS-1,
 *	when it is not a JSON number, or when its value is not positive.
 */
static void accept_rx_err_load(struct client_t *c, cJSON *rx_errs, int *rxerr_map, int rxerr_map_len)
{
	int limit = cJSON_GetArraySize(rx_errs);
	int idx;

	if (limit > rxerr_map_len)
		limit = rxerr_map_len;

	for (idx = 0; idx < limit; idx++) {
		int bucket = rxerr_map[idx];
		cJSON *counter;

		if (bucket < 0 || bucket >= INERR_BUCKETS)
			continue;

		counter = cJSON_GetArrayItem(rx_errs, idx);
		if (counter == NULL || counter->type != cJSON_Number)
			continue;

		if (counter->valuedouble > 0)
			c->localaccount.rxerrs[bucket] = counter->valuedouble;
	}
}
1051 
1052 /*
1053  *	Live upgrade: accept old clients
1054  */
1055 
/*
 *	Fetch a mandatory, typed field from a live upgrade client object.
 *
 *	tree  - JSON object to look in
 *	key   - name of the field
 *	type  - cJSON type the field must have
 *	logid - client identifier used in the error log lines
 *
 *	Returns the item, or NULL (after logging at LOG_ERR) when the field is
 *	absent or has a different type.
 */
static cJSON *accept_liveupgrade_cJSON_get(cJSON *tree, const char *key, int type, const char *logid)
{
	cJSON *item = cJSON_GetObjectItem(tree, key);

	if (item == NULL) {
		hlog(LOG_ERR, "Live upgrade: Client '%s' JSON: Field '%s' missing", logid, key);
	} else if (item->type != type) {
		hlog(LOG_ERR, "Live upgrade: Client '%s' JSON: Field '%s' has incorrect type %d, expected %d", logid, key, item->type, type);
		item = NULL;
	}

	return item;
}
1074 
/*
 *	Re-create one client from its live upgrade JSON object and pass it
 *	to a worker thread.
 *
 *	client        - JSON object describing a single client
 *	rxerr_map     - index translation table for the rx error counters
 *	                (see accept_rx_err_map()), may be NULL
 *	rxerr_map_len - number of entries in rxerr_map
 *
 *	Returns 0 when the client was passed to a worker, -1 otherwise.
 *	A client with a missing or negative fd is skipped without touching
 *	any descriptor; on every other failure path the client's fd is closed.
 */
static int accept_liveupgrade_single(cJSON *client, int *rxerr_map, int rxerr_map_len)
{
	cJSON *fd, *listener_id, *username, *time_connect, *tick_connect;
	cJSON *state;
	cJSON *addr_loc;
	cJSON *udp_port;
	cJSON *app_name, *app_version;
	cJSON *verified;
	cJSON *obuf_q;
	cJSON *bytes_rx, *bytes_tx;
	cJSON *pkts_rx, *pkts_tx, *pkts_ign;
	cJSON *rx_errs;
	cJSON *filter;
	cJSON *ibuf, *obuf;
	cJSON *client_heard;
	cJSON *lat, *lng;
	unsigned addr_len;
	union sockaddr_u sa;
	char *argv[256];
	int i, argc;
	const char *username_s = "unknown";

	/* get username first, so we can log it later */
	username = accept_liveupgrade_cJSON_get(client, "username", cJSON_String, username_s);
	if (username)
		username_s = username->valuestring;

	fd = accept_liveupgrade_cJSON_get(client, "fd", cJSON_Number, username_s);
	int fd_i = -1;
	if (fd)
		fd_i = fd->valueint;

	/* a missing "fd" field also ends up here, since fd_i stays at -1 */
	if (fd_i < 0) {
		hlog(LOG_INFO, "Live upgrade: Client '%s' has negative fd %d, ignoring (corepeer?)", username_s, fd_i);
		return -1;
	}

	/* mandatory fields: each lookup logs an error and yields NULL on failure */
	listener_id = accept_liveupgrade_cJSON_get(client, "listener_id", cJSON_Number, username_s);
	state = accept_liveupgrade_cJSON_get(client, "state", cJSON_String, username_s);
	time_connect = accept_liveupgrade_cJSON_get(client, "t_connect", cJSON_Number, username_s);
	addr_loc = accept_liveupgrade_cJSON_get(client, "addr_loc", cJSON_String, username_s);
	app_name = accept_liveupgrade_cJSON_get(client, "app_name", cJSON_String, username_s);
	app_version = accept_liveupgrade_cJSON_get(client, "app_version", cJSON_String, username_s);
	verified = accept_liveupgrade_cJSON_get(client, "verified", cJSON_Number, username_s);
	obuf_q = accept_liveupgrade_cJSON_get(client, "obuf_q", cJSON_Number, username_s);
	bytes_rx = accept_liveupgrade_cJSON_get(client, "bytes_rx", cJSON_Number, username_s);
	bytes_tx = accept_liveupgrade_cJSON_get(client, "bytes_tx", cJSON_Number, username_s);
	pkts_rx = accept_liveupgrade_cJSON_get(client, "pkts_rx", cJSON_Number, username_s);
	pkts_tx = accept_liveupgrade_cJSON_get(client, "pkts_tx", cJSON_Number, username_s);
	pkts_ign = accept_liveupgrade_cJSON_get(client, "pkts_ign", cJSON_Number, username_s);
	rx_errs = accept_liveupgrade_cJSON_get(client, "rx_errs", cJSON_Array, username_s);
	filter = accept_liveupgrade_cJSON_get(client, "filter", cJSON_String, username_s);

	/* optional */
	tick_connect = cJSON_GetObjectItem(client, "t_connect_tick");
	udp_port = cJSON_GetObjectItem(client, "udp_port");
	ibuf = cJSON_GetObjectItem(client, "ibuf");
	obuf = cJSON_GetObjectItem(client, "obuf");
	client_heard = cJSON_GetObjectItem(client, "client_heard");
	lat = cJSON_GetObjectItem(client, "lat");
	lng = cJSON_GetObjectItem(client, "lng");

	if (!(
		(fd)
		&& (listener_id)
		&& (state)
		&& (username)
		&& (time_connect)
		&& (addr_loc)
		&& (app_name)
		&& (app_version)
		&& (verified)
		&& (obuf_q)
		&& (bytes_rx)
		&& (bytes_tx)
		&& (pkts_rx)
		&& (pkts_tx)
		&& (pkts_ign)
		&& (rx_errs)
		&& (filter)
		)) {
			hlog(LOG_ERR, "Live upgrade: Fields missing from client JSON, discarding client fd %d", fd_i);
			/* fd_i is known to be >= 0 here, negative values returned above */
			close(fd_i);
			return -1;
	}

	hlog(LOG_DEBUG, "Old client on fd %d: %s", fd->valueint, username->valuestring);

	/* fetch peer address from the fd instead of parsing it from text */
	addr_len = sizeof(sa);
	if (getpeername(fd->valueint, &sa.sa, &addr_len) != 0) {
		/* Sometimes clients disconnect during upgrade, especially on slow RPi servers... */
		if (errno == ENOTCONN)
			hlog(LOG_INFO, "Live upgrade: Client %s on fd %d has disconnected during upgrade (%s)",
				username->valuestring, fd->valueint, strerror(errno));
		else
			hlog(LOG_ERR, "Live upgrade: getpeername client fd %d failed: %s", fd->valueint, strerror(errno));
		close(fd->valueint);
		return -1;
	}

	/* convert client address to string */
	char *client_addr_s = strsockaddr( &sa.sa, addr_len );

	/* find the right listener for this client, for configuration and accounting */
	struct listen_t *l = liveupgrade_find_listener(listener_id->valueint);
	if (!l) {
		hlog(LOG_INFO, "Live upgrade: Listener has been removed for fd %d (%s - local %s): disconnecting %s",
			fd->valueint, client_addr_s, addr_loc->valuestring, username->valuestring);
		close(fd->valueint);
		hfree(client_addr_s);
		return -1;
	}

	struct client_t *c = accept_client_for_listener(l, fd->valueint, client_addr_s, &sa, addr_len);
	if (!c) {
		hlog(LOG_ERR, "Live upgrade - client_alloc returned NULL, too many clients. Denied client %s on fd %d from %s",
			username->valuestring, fd->valueint, client_addr_s);
		close(fd->valueint);
		hfree(client_addr_s);
		return -1;
	}

	hfree(client_addr_s);

	if (strcmp(state->valuestring, "connected") == 0) {
		c->state   = CSTATE_CONNECTED;
		c->handler_line_in = &incoming_handler;
		strncpy(c->username, username->valuestring, sizeof(c->username));
		c->username[sizeof(c->username)-1] = 0;
		c->username_len = strlen(c->username);
	} else if (strcmp(state->valuestring, "login") == 0) {
		c->state   = CSTATE_LOGIN;
		c->handler_line_in = &login_handler;
	} else {
		/* identify the client (not the listener) in the log line */
		hlog(LOG_ERR, "Live upgrade: Client %s is in invalid state '%s' (fd %d)", username->valuestring, state->valuestring, fd->valueint);
		goto err;
	}
	/* distribute keepalive intervals for the existing old clients
	 * but send them rather sooner than later */
	// coverity[dont_call]  // squelch warning: not security sensitive use of random(): load distribution
	c->keepalive = tick + (random() % (keepalive_interval/2));
	/* distribute cleanup intervals over the next 2 minutes */
	// coverity[dont_call]  // squelch warning: not security sensitive use of random(): load distribution
	c->cleanup = tick + (random() % 120);

	c->connect_time = time_connect->valueint;
	/* live upgrade / backward compatibility: upgrading from <= 1.8.2 requires the 'else' path */
	if (tick_connect && tick_connect->type == cJSON_Number)
		c->connect_tick = tick_connect->valueint;
	else /* convert to monotonic time */
		c->connect_tick = tick - (now - c->connect_time);

	c->validated = verified->valueint;
	c->localaccount.rxbytes = bytes_rx->valuedouble;
	c->localaccount.txbytes = bytes_tx->valuedouble;
	c->localaccount.rxpackets = pkts_rx->valuedouble;
	c->localaccount.txpackets = pkts_tx->valuedouble;
	c->localaccount.rxdrops = pkts_ign->valuedouble;

	login_set_app_name(c, app_name->valuestring, app_version->valuestring);

	// handle client's filter setting
	if ((c->flags & CLFLAGS_USERFILTEROK) && (filter) && (filter->valuestring) && *(filter->valuestring)) {
		// archive a copy of the filters, for status display
		strncpy(c->filter_s, filter->valuestring, FILTER_S_SIZE);
		c->filter_s[FILTER_S_SIZE-1] = 0;
		sanitize_ascii_string(c->filter_s);

		/* parse_args() is given a private copy, which is released after parsing */
		char *f = hstrdup(filter->valuestring);
		argc = parse_args(argv, f);
		for (i = 0; i < argc; ++i) {
			filter_parse(c, argv[i], 1);
		}
		hfree(f);
	}

	// set up UDP downstream if necessary
	if (udp_port && udp_port->type == cJSON_Number && udp_port->valueint > 1024 && udp_port->valueint < 65536) {
		if (login_setup_udp_feed(c, udp_port->valueint) != 0) {
			hlog(LOG_DEBUG, "%s/%s: Requested UDP on client port with no UDP configured", c->addr_rem, c->username);
		}
	}

	// fill up ibuf
	if (ibuf && ibuf->type == cJSON_String && ibuf->valuestring) {
		int ibuf_len = hex_decode(c->ibuf, c->ibuf_size, ibuf->valuestring);
		if (ibuf_len < 0) {
			hlog(LOG_ERR, "Live upgrade: %s/%s: Failed to decode ibuf: %s", c->addr_rem, c->username, ibuf->valuestring);
		} else {
			c->ibuf_end = ibuf_len;
			hlog(LOG_DEBUG, "Live upgrade: Decoded ibuf %d bytes: '%.*s'", ibuf_len, ibuf_len, c->ibuf);
			hlog(LOG_DEBUG, "Hex: %s", ibuf->valuestring);
		}
	}

	// fill up obuf
	if (obuf && obuf->type == cJSON_String && obuf->valuestring) {
		int obuf_len = hex_decode(c->obuf, c->obuf_size, obuf->valuestring);
		if (obuf_len < 0) {
			hlog(LOG_ERR, "Live upgrade: %s/%s: Failed to decode obuf: %s", c->addr_rem, c->username, obuf->valuestring);
		} else {
			c->obuf_start = 0;
			c->obuf_end = obuf_len;
			hlog(LOG_DEBUG, "Live upgrade: Decoded obuf %d bytes: '%.*s'", obuf_len, obuf_len, c->obuf);
			hlog(LOG_DEBUG, "Hex: %s", obuf->valuestring);
		}
	}

	/* load list of stations heard by this client, to immediately support
	 * messaging
	 */
	if (client_heard && client_heard->type == cJSON_Array)
		client_heard_json_load(c, client_heard);

	/* load rxerrs counters, with error name string mapping to support
	 * adding/reordering of error counters
	 */
	if (rx_errs && rx_errs->type == cJSON_Array && rxerr_map && rxerr_map_len > 0)
		accept_rx_err_load(c, rx_errs, rxerr_map, rxerr_map_len);

	/* set client lat/lon, if they're given
	 */
	if (lat && lng && lat->type == cJSON_Number && lng->type == cJSON_Number) {
		c->loc_known = 1;
		c->lat = lat->valuedouble;
		c->lng = lng->valuedouble;
	}

	hlog(LOG_DEBUG, "%s - Accepted live upgrade client on fd %d from %s", c->addr_loc, c->fd, c->addr_rem);

	/* set client socket options, return -1 on serious errors */
	if (set_client_sockopt(c) != 0)
		goto err;

	/* Add the client to the client list. */
	int old_fd = clientlist_add(c);
	if (c->validated && old_fd != -1) {
		/* TODO: If old connection is SSL validated, and this one is not, do not disconnect it. */
		hlog(LOG_INFO, "fd %d: Disconnecting duplicate validated client with username '%s'", old_fd, c->username);
		shutdown(old_fd, SHUT_RDWR);
	}

	/* ok, found it... lock the new client queue and pass the client */
	if (pass_client_to_worker(pick_next_worker(), c))
		goto err;

	return 0;

err:
	/* NOTE(review): the error path of do_accept() calls client_free()
	 * without closing the fd first; confirm whether client_free() closes
	 * c->fd itself, in which case the close() below is a double close.
	 * NOTE(review): when pass_client_to_worker() fails, c has already been
	 * given to clientlist_add(); confirm that it gets removed from the
	 * client list before being freed.
	 */
	close(c->fd);
	inbound_connects_account(0, c->portaccount); /* something failed, remove this from accounts.. */
	client_free(c);
	return -1;
}
1331 
accept_liveupgrade_accept(void)1332 static void accept_liveupgrade_accept(void)
1333 {
1334 	int clen, i;
1335 	int accepted = 0;
1336 	int old_rxerrs_len = 0;
1337 	int *rxerr_map = NULL;
1338 
1339 	hlog(LOG_INFO, "Accept: Collecting live upgrade clients...");
1340 
1341 	/* Create mapping for rx_errs table indexes, so that rxerrs can be
1342 	 * loaded right even if new ones were added in the middle, or if
1343 	 * the error counters were reordered
1344 	 */
1345 	cJSON *rx_err_labels = cJSON_GetObjectItem(liveupgrade_status, "rx_errs");
1346 	if ((rx_err_labels) && rx_err_labels->type == cJSON_Array)
1347 		rxerr_map = accept_rx_err_map(rx_err_labels, &old_rxerrs_len);
1348 
1349 	cJSON *clients = cJSON_GetObjectItem(liveupgrade_status, "clients");
1350 	if (!clients || clients->type != cJSON_Array) {
1351 		hlog(LOG_ERR, "Accept: Live upgrade JSON does not contain 'clients' array!");
1352 	} else {
1353 		clen = cJSON_GetArraySize(clients);
1354 		hlog(LOG_DEBUG, "Clients array length %d", clen);
1355 		for (i = 0; i < clen; i++) {
1356 			cJSON *client = cJSON_GetArrayItem(clients, i);
1357 			if (!client || client->type != cJSON_Object) {
1358 				hlog(LOG_ERR, "Accept: Live upgrade JSON file, get client %d failed", i);
1359 				continue;
1360 			}
1361 			if (accept_liveupgrade_single(client, rxerr_map, old_rxerrs_len) == 0)
1362 				accepted++;
1363 		}
1364 		hlog(LOG_INFO, "Accepted %d of %d old clients in live upgrade", accepted, clen);
1365 		if (accepted != clen)
1366 			hlog(LOG_ERR, "Live upgrade: Failed to accept %d old clients, see above for reasons", clen-accepted);
1367 	}
1368 
1369 	cJSON_Delete(liveupgrade_status);
1370 	liveupgrade_status = NULL;
1371 
1372 	if (rxerr_map)
1373 		hfree(rxerr_map);
1374 }
1375 
1376 /*
1377  *	Accept thread
1378  */
1379 
/*
 *	Entry point of the accept thread.
 *
 *	Blocks the listed signals for this thread, sets up the UDP
 *	pseudo-worker and pseudo-client, and then loops until
 *	accept_shutting_down becomes non-zero. In the loop it (re)builds the
 *	listener polling list whenever accept_reconfiguring is set, and polls
 *	the listening sockets, dispatching to accept_udp_recv() or do_accept().
 *	On exit it stops the uplink, dupecheck and worker threads and frees
 *	what it allocated.
 *
 *	asdf - unused.
 */
void accept_thread(void *asdf)
{
	sigset_t sigs_to_block;
	int e, n;
	struct pollfd *acceptpfd = NULL;	/* poll() array, one entry per polled listener */
	struct listen_t **acceptpl = NULL;	/* listener for each acceptpfd entry, same index */
	int listen_n = 0;	/* running count of open listeners */
	int poll_n = 0;		/* number of entries in acceptpfd / acceptpl */
	struct listen_t *l;

	pthreads_profiling_reset("accept");

	sigemptyset(&sigs_to_block);
	sigaddset(&sigs_to_block, SIGALRM);
	sigaddset(&sigs_to_block, SIGINT);
	sigaddset(&sigs_to_block, SIGTERM);
	sigaddset(&sigs_to_block, SIGQUIT);
	sigaddset(&sigs_to_block, SIGHUP);
	sigaddset(&sigs_to_block, SIGURG);
	sigaddset(&sigs_to_block, SIGPIPE);
	sigaddset(&sigs_to_block, SIGUSR1);
	sigaddset(&sigs_to_block, SIGUSR2);
	pthread_sigmask(SIG_BLOCK, &sigs_to_block, NULL);

	/* start the accept thread, which will start server threads */
	hlog(LOG_INFO, "Accept thread starting...");

	/* we allocate a worker structure to be used within the accept thread
	 * for parsing incoming UDP packets and passing them on to the dupecheck
	 * thread.
	 */
	udp_worker = worker_alloc();
	udp_worker->id = 81;

	/* we also need a client structure to be used as the source of
	 * incoming UDP packets (flagged with CLFLAGS_UDPSUBMIT)
	 */
	udp_pseudoclient = pseudoclient_setup(81);
	udp_pseudoclient->flags |= CLFLAGS_UDPSUBMIT;

	/* force a full listener setup on the first pass through the loop */
	accept_reconfiguring = 1;
	while (!accept_shutting_down) {
		if (accept_reconfiguring) {
			accept_reconfiguring = 0;
			listen_n -= close_removed_listeners();

			/* start listening on the sockets */
			listen_n += open_missing_listeners();

			if (listen_n < 1) {
				hlog(LOG_CRIT, "Failed to listen on any ports.");
				exit(2);
			}

			/* reconfiguration must scan old clients against ACL */
			rescan_client_acls();

			/* how many are we polling */
			poll_n = 0;
			for (l = listen_list; (l); l = l->next)
				if (!l->corepeer)
					poll_n++;

			hlog(LOG_DEBUG, "Generating polling list for %d/%d listeners...", poll_n, listen_n);

			/* array of FDs for poll() */
			if (acceptpfd)
				hfree(acceptpfd);
			acceptpfd = hmalloc(poll_n * sizeof(*acceptpfd));

			/* array of listeners */
			if (acceptpl)
				hfree(acceptpl);
			acceptpl = hmalloc(poll_n * sizeof(*acceptpl));

			/* fill both arrays in listen_list order, skipping corepeers,
			 * and note whether any polled listener has filtering enabled
			 */
			n = 0;
			int has_filtered_listeners_now = 0;
			for (l = listen_list; (l); l = l->next) {
				/* The accept thread does not poll() UDP sockets for core peers.
				 * Worker 0 takes care of that, and processes the incoming packets.
				 */
				if (l->corepeer) {
					hlog(LOG_DEBUG, "... %d: fd %d (%s) - not polled, is corepeer", n, (l->udp) ? l->udp->fd : l->fd, l->addr_s);
					continue;
				}

				/* UDP listeners keep their socket in l->udp, others in l->fd */
				int fd;
				if (l->udp) {
					l->udp->polled = 1;
					fd = l->udp->fd;
				} else {
					fd = l->fd;
				}

				if ((l->filter_s) || (l->client_flags & CLFLAGS_USERFILTEROK))
					has_filtered_listeners_now = 1;

				hlog(LOG_DEBUG, "... %d: fd %d (%s)", n, fd, l->addr_s);
				acceptpfd[n].fd = fd;
				acceptpfd[n].events = POLLIN|POLLPRI|POLLERR|POLLHUP;
				acceptpl[n] = l;
				n++;
			}
			hlog(LOG_INFO, "Accept thread ready.");
			have_filtered_listeners = has_filtered_listeners_now;
			if (!have_filtered_listeners)
				hlog(LOG_INFO, "Disabled historydb, listeners do not have filtering enabled.");

			/* stop the dupechecking and uplink threads while adjusting
			 * the amount of workers... they walk the worker list, and
			 * might get confused when workers are stopped or started.
			 */
			if (workers_running != workers_configured) {
				uplink_stop();
				dupecheck_stop();
				workers_start();
				dupecheck_start();
				uplink_start();
			}

			/*
			 * generate UDP peer clients
			 */
			peerip_clients_close();
			if (peerip_config)
				peerip_clients_config();

			/* accept liveupgrade clients */
			if (liveupgrade_status)
				accept_liveupgrade_accept();
		} else if (accept_reconfigure_after_tick != 0 && accept_reconfigure_after_tick <= tick) {
			/* a retry was scheduled earlier and its time has come:
			 * the reconfiguration itself runs on the next loop iteration
			 */
			hlog(LOG_INFO, "Trying to reconfigure listeners due to a previous failure");
			accept_reconfiguring = 1;
			accept_reconfigure_after_tick = 0;
		}

		/* check for new connections; the 200 ms timeout bounds how long
		 * it takes to notice the shutdown and reconfigure flags
		 */
		e = poll(acceptpfd, poll_n, 200);
		if (e == 0)
			continue;
		if (e < 0) {
			if (errno == EINTR)
				continue;
			hlog(LOG_ERR, "poll() on accept failed: %s (continuing)", strerror(errno));
			continue;
		}

		/* now, which socket was that on? */
		for (n = 0; n < poll_n; n++) {
			l = acceptpl[n];
			/* sanity check: the listener's current fd must still be the
			 * one that was put in the polling list
			 */
			if (!(l) || (l->udp ? l->udp->fd : l->fd) != acceptpfd[n].fd) {
				hlog(LOG_CRIT, "accept_thread: polling list and listener list do mot match!");
				exit(1);
			}
			if (acceptpfd[n].revents) {
				if (l->udp)
					accept_udp_recv(l); /* receive UDP packets */
				else
					do_accept(l); /* accept a single connection */
			}
		}
	}

	/* NOTE(review): shutdown mode 2 presumably means live upgrade, with
	 * the workers recording their clients in this array - confirm.
	 */
	if (accept_shutting_down == 2)
		worker_shutdown_clients = cJSON_CreateArray();

	hlog(LOG_DEBUG, "Accept thread shutting down listening sockets and worker threads...");
	uplink_stop();
	close_listeners();
	dupecheck_stop();
	http_shutting_down = 1;
	workers_stop(accept_shutting_down);
	hfree(acceptpfd);
	hfree(acceptpl);
	acceptpfd = NULL;
	acceptpl = NULL;

	/* free up the pseudo-client */
	client_free(udp_pseudoclient);
	udp_pseudoclient = NULL;

	/* free up the pseudo-worker structure, after dupecheck is long dead */
	worker_free_buffers(udp_worker);
	hfree(udp_worker);
	udp_worker = NULL;
}
1566 
1567 /*
1568  *	generate status information in status.json about the listeners
1569  */
1570 
/*
 *	Append one JSON object per visible listener to the 'listeners' array
 *	and add the client totals to the 'totals' object.
 *
 *	Corepeer and hidden listeners are left out. Only non-UDP listeners
 *	contribute to the "clients" and "connects" totals. Byte and packet
 *	totals are deliberately not produced here: the per-listener counters
 *	cover clients only, not uplinks, so their sum would not be a true total.
 *
 *	Always returns 0.
 */
int accept_listener_status(cJSON *listeners, cJSON *totals)
{
	struct listen_t *lp = listen_list;
	long sum_clients = 0;
	long sum_connects = 0;

	while (lp != NULL) {
		if (!lp->corepeer && !lp->hidden) {
			const char *proto = "tcp";
			cJSON *entry = cJSON_CreateObject();

			if (lp->udp)
				proto = "udp";

			/* identity of the listener */
			cJSON_AddNumberToObject(entry, "fd", lp->fd);
			cJSON_AddNumberToObject(entry, "id", lp->id);
			cJSON_AddStringToObject(entry, "name", lp->name);
			cJSON_AddStringToObject(entry, "proto", proto);
			cJSON_AddStringToObject(entry, "addr", lp->addr_s);
			if (lp->filter_s)
				cJSON_AddStringToObject(entry, "filter", lp->filter_s);

			/* connection counters */
			cJSON_AddNumberToObject(entry, "clients", lp->portaccount->gauge);
			cJSON_AddNumberToObject(entry, "clients_peak", lp->portaccount->gauge_max);
			cJSON_AddNumberToObject(entry, "clients_max", lp->clients_max);
			cJSON_AddNumberToObject(entry, "connects", lp->portaccount->counter);

			/* traffic counters */
			cJSON_AddNumberToObject(entry, "bytes_rx", lp->portaccount->rxbytes);
			cJSON_AddNumberToObject(entry, "bytes_tx", lp->portaccount->txbytes);
			cJSON_AddNumberToObject(entry, "pkts_rx", lp->portaccount->rxpackets);
			cJSON_AddNumberToObject(entry, "pkts_tx", lp->portaccount->txpackets);
			cJSON_AddNumberToObject(entry, "pkts_ign", lp->portaccount->rxdrops);
			cJSON_AddNumberToObject(entry, "pkts_dup", lp->portaccount->rxdupes);
			json_add_rxerrs(entry, "rx_errs", lp->portaccount->rxerrs);

			cJSON_AddItemToArray(listeners, entry);

			if (lp->udp == NULL) {
				sum_clients += lp->portaccount->gauge;
				sum_connects += lp->portaccount->counter;
			}
		}

		lp = lp->next;
	}

	cJSON_AddNumberToObject(totals, "clients_max", maxclients);
	cJSON_AddNumberToObject(totals, "clients", sum_clients);
	cJSON_AddNumberToObject(totals, "connects", sum_connects);

	return 0;
}
1634