1 /*
2  *	aprsc
3  *
4  *	(c) Heikki Hannikainen, OH7LZB <hessu@hes.iki.fi>
5  *
6  *     This program is licensed under the BSD license, which can be found
7  *     in the file LICENSE.
8  *
9  */
10 
11 /*
12  *	worker.c: the worker thread
13  */
14 
15 #include <errno.h>
16 #include <string.h>
17 #include <signal.h>
18 #include <time.h>
19 #include <stdlib.h>
20 #include <netinet/in.h>
21 #include <fcntl.h>
22 
23 #include "worker.h"
24 
25 #include "config.h"
26 #include "hlog.h"
27 #include "hmalloc.h"
28 #include "login.h"
29 #include "uplink.h"
30 #include "incoming.h"
31 #include "outgoing.h"
32 #include "filter.h"
33 #include "dupecheck.h"
34 #include "clientlist.h"
35 #include "client_heard.h"
36 #include "cellmalloc.h"
37 #include "version.h"
38 #include "status.h"
39 #include "sctp.h"
40 
41 
42 time_t now;	/* current time, updated by the main thread, MAY be spun around by NTP */
43 time_t tick;	/* monotonous clock, may or may not be wallclock */
44 
45 struct worker_t *worker_threads;
46 struct client_udp_t *udppeers;	/* list of listening/receiving UDP peer sockets */
47 
48 struct client_udp_t *udpclients;	/* list of listening/receiving UDP client sockets */
49 /* mutex to protect udpclient chain refcounts */
50 pthread_mutex_t udpclient_mutex = PTHREAD_MUTEX_INITIALIZER;
51 
52 int workers_running;
53 int sock_write_expire  = 25;    /* 25 seconds, smaller than the 30-second dupe check window. */
54 int keepalive_interval = 20;    /* 20 seconds for individual socket, NOT all in sync! */
55 #define KEEPALIVE_POLL_FREQ 2	/* keepalive analysis scan interval */
56 int obuf_writes_threshold = 16;	/* This many writes per keepalive scan interval switch socket
57 				   output to buffered. */
58 int obuf_writes_threshold_hys = 6; /* Less than this, and switch back. */
59 
60 /* global packet buffer */
61 rwlock_t pbuf_global_rwlock = RWL_INITIALIZER;
62 struct pbuf_t  *pbuf_global       = NULL;
63 struct pbuf_t **pbuf_global_prevp = &pbuf_global;
64 struct pbuf_t  *pbuf_global_dupe       = NULL;
65 struct pbuf_t **pbuf_global_dupe_prevp = &pbuf_global_dupe;
66 
67 
68 /* global inbound connects, and protocol traffic accounters */
69 
70 struct portaccount_t inbound_connects = {
71   .mutex    = PTHREAD_MUTEX_INITIALIZER,
72   .refcount = 99,	/* Global static blocks have extra-high initial refcount */
73 };
74 
75 /* global byte/packet counters per protocol */
76 struct portaccount_t client_connects_tcp = {
77   .mutex    = PTHREAD_MUTEX_INITIALIZER,
78   .refcount = 99,	/* Global static blocks have extra-high initial refcount */
79 };
80 struct portaccount_t client_connects_udp = {
81   .mutex    = PTHREAD_MUTEX_INITIALIZER,
82   .refcount = 99,	/* Global static blocks have extra-high initial refcount */
83 };
84 #ifdef USE_SCTP
85 struct portaccount_t client_connects_sctp = {
86   .mutex    = PTHREAD_MUTEX_INITIALIZER,
87   .refcount = 99,	/* Global static blocks have extra-high initial refcount */
88 };
89 #endif
90 
91 int worker_corepeer_client_count = 0;
92 struct client_t *worker_corepeer_clients[MAX_COREPEERS];
93 
94 #ifndef _FOR_VALGRIND_
95 cellarena_t *client_cells;
96 #endif
97 
98 /* clientlist collected at shutdown for live upgrade */
99 cJSON *worker_shutdown_clients = NULL;
100 
101 static struct cJSON *worker_client_json(struct client_t *c, int liveup_info);
102 
103 /* port accounters */
port_accounter_alloc(void)104 struct portaccount_t *port_accounter_alloc(void)
105 {
106 	struct portaccount_t *p;
107 
108 	p = hmalloc(sizeof(*p));
109 	memset(p, 0, sizeof(*p));
110 
111 	p->refcount = 1;
112 	pthread_mutex_init( & p->mutex, NULL );
113 
114 	//hlog(LOG_DEBUG, "new port_accounter %p", p);
115 
116 	return p;
117 }
118 
port_accounter_reject(struct portaccount_t * p)119 static void port_accounter_reject(struct portaccount_t *p)
120 {
121 	int i;
122 	if (!p) return;
123 
124 	if ((i = pthread_mutex_lock(&p->mutex))) {
125 		hlog(LOG_ERR, "port_accounter_reject: could not lock portaccount: %s", strerror(i));
126 		return;
127 	}
128 
129 	++ p->counter;
130 
131 	if ((i = pthread_mutex_unlock(&p->mutex))) {
132 		hlog(LOG_ERR, "port_accounter_reject: could not unlock portaccount: %s", strerror(i));
133 		return;
134 	}
135 }
136 
port_accounter_add(struct portaccount_t * p)137 static void port_accounter_add(struct portaccount_t *p)
138 {
139 	int i;
140 	if (!p) return;
141 
142 	if ((i = pthread_mutex_lock(&p->mutex))) {
143 		hlog(LOG_ERR, "port_accounter_add: could not lock portaccount: %s", strerror(i));
144 		return;
145 	}
146 
147 	//hlog(LOG_DEBUG, "port_accounter_add %p", p);
148 
149 	++ p->refcount;
150 	++ p->counter;
151 	++ p->gauge;
152 
153 	if (p->gauge > p->gauge_max)
154 		p->gauge_max = p->gauge;
155 
156 	if ((i = pthread_mutex_unlock(&p->mutex))) {
157 		hlog(LOG_ERR, "port_accounter_add: could not unlock portaccount: %s", strerror(i));
158 		return;
159 	}
160 }
161 
port_accounter_drop(struct portaccount_t * p)162 void port_accounter_drop(struct portaccount_t *p)
163 {
164 	int i, r;
165 	if (!p) return;
166 
167 	if ((i = pthread_mutex_lock(&p->mutex))) {
168 		hlog(LOG_ERR, "port_accounter_drop: could not lock portaccount: %s", strerror(i));
169 		return;
170 	}
171 
172 	-- p->refcount;
173 	-- p->gauge;
174 
175 	r = p->refcount;
176 
177 	if ((i = pthread_mutex_unlock(&p->mutex))) {
178 		hlog(LOG_ERR, "port_accounter_drop: could not unlock portaccount: %s", strerror(i));
179 		return;
180 	}
181 
182 	//hlog(LOG_DEBUG, "port_accounter_drop(%p) refcount=%d", p, r);
183 
184 	if (r == 0) {
185 		/* Last reference is being destroyed */
186 		hfree(p);
187 	}
188 }
189 
190 /*
191  *	Global and port specific port usage counters
192  */
193 
inbound_connects_account(const int add,struct portaccount_t * p)194 void inbound_connects_account(const int add, struct portaccount_t *p)
195 {	/* add == 2/3  --> UDP "client" socket drop/add, -1 --> rejected connect */
196 	int i;
197 	if (add < 2) {
198 		if ((i = pthread_mutex_lock(& inbound_connects.mutex ))) {
199 			hlog(LOG_ERR, "inbound_connects_account: could not lock inbound_connects: %s", strerror(i));
200 			return;
201 		}
202 
203 		if (add == -1) {
204 			/* just increment connects, it was discarded */
205 			++ inbound_connects.counter;
206 		} else if (add) {
207 			++ inbound_connects.counter;
208 			++ inbound_connects.gauge;
209 			if (inbound_connects.gauge > inbound_connects.gauge_max)
210 				inbound_connects.gauge_max = inbound_connects.gauge;
211 		} else {
212 			-- inbound_connects.gauge;
213 		}
214 
215 		if ((i = pthread_mutex_unlock(& inbound_connects.mutex )))
216 			hlog(LOG_ERR, "inbound_connects_account: could not unlock inbound_connects: %s", strerror(i));
217 	}
218 
219 	if ( p ) {
220 		if ( add == -1 ) {
221 			port_accounter_reject(p);
222 		} else if ( add & 1 ) {
223 			port_accounter_add(p);
224 		} else {
225 			port_accounter_drop(p);
226 		}
227 	}
228 
229 	// hlog( LOG_DEBUG, "inbound_connects_account(), count=%d gauge=%d max=%d",
230 	//       inbound_connects.count, inbound_connects.gauge, inbound_connects.gauge_max );
231 }
232 
233 /* object alloc/free */
234 
client_udp_free(struct client_udp_t * u)235 void client_udp_free(struct client_udp_t *u)
236 {
237 	int i;
238 
239 	if (!u) return;
240 
241 	if ((i = pthread_mutex_lock(& udpclient_mutex ))) {
242 		hlog(LOG_ERR, "client_udp_free: could not lock udpclient_mutex: %s", strerror(i));
243 		return;
244 	}
245 
246 	-- u->refcount;
247 
248 	if (u)
249 		hlog(LOG_DEBUG, "client_udp_free %p port %d refcount now: %d", u, u->portnum, u->refcount);
250 
251 	if ( u->refcount   == 0 ) {
252 		hlog(LOG_DEBUG, "client_udp_free %p port %d FREEING", u, u->portnum);
253 		/* Unchain, and destroy.. */
254 		if (u->next)
255 			u->next->prevp = u->prevp;
256 		*u->prevp = u->next;
257 
258 		close(u->fd);
259 
260 		hfree(u);
261 	}
262 
263 	if ((i = pthread_mutex_unlock(& udpclient_mutex )))
264 		hlog(LOG_ERR, "client_udp_free: could not unlock udpclient_mutex: %s", strerror(i));
265 }
266 
client_udp_find(struct client_udp_t * root,const int af,const int portnum)267 struct client_udp_t *client_udp_find(struct client_udp_t *root, const int af, const int portnum)
268 {
269 	struct client_udp_t *u;
270 	int i;
271 
272 	if ((i = pthread_mutex_lock(& udpclient_mutex ))) {
273 		hlog(LOG_ERR, "client_udp_find: could not lock udpclient_mutex: %s", strerror(i));
274 		return NULL;
275 	}
276 
277 	for (u = root ; u ; u = u->next ) {
278 		if (u->portnum == portnum && u->af == af) {
279 			++ u->refcount;
280 			break;
281 		}
282 	}
283 
284 	//if (u)
285 	//	hlog(LOG_DEBUG, "client_udp_find %u port %d refcount now: %d", u, u->portnum, u->refcount);
286 
287 	if ((i = pthread_mutex_unlock(& udpclient_mutex )))
288 		hlog(LOG_ERR, "client_udp_find: could not unlock udpclient_mutex: %s", strerror(i));
289 
290 	return u;
291 }
292 
293 
client_udp_alloc(struct client_udp_t ** root,int fd,int portnum)294 struct client_udp_t *client_udp_alloc(struct client_udp_t **root, int fd, int portnum)
295 {
296 	struct client_udp_t *c;
297 	int i;
298 
299 	/* TODO: hm, could maybe lock a bit later, just before adding to the udpclient list? */
300 	if ((i = pthread_mutex_lock(& udpclient_mutex ))) {
301 		hlog(LOG_ERR, "client_udp_alloc: could not lock udpclient_mutex: %s", strerror(i));
302 		return NULL;
303 	}
304 
305 	c = hmalloc(sizeof(*c));
306 	c->polled     = 0;
307 	c->fd         = fd;
308 	c->refcount   = 1; /* One reference already on creation */
309 	c->portnum    = portnum;
310 
311 	/* Add this to special list of UDP sockets */
312 	c->next  = *root;
313 	c->prevp = root;
314 	if (c->next)
315 		c->next->prevp = &c->next;
316 	*root = c;
317 
318 	//hlog(LOG_DEBUG, "client_udp_alloc %u port %d refcount now: %d", c, c->portnum, c->refcount);
319 
320 	if ((i = pthread_mutex_unlock(& udpclient_mutex )))
321 		hlog(LOG_ERR, "client_udp_alloc: could not lock udpclient_mutex: %s", strerror(i));
322 
323 	return c;
324 }
325 
326 /*
327  *	Close and free all UDP core peers
328  */
329 
corepeer_close_all(struct worker_t * self)330 static void corepeer_close_all(struct worker_t *self)
331 {
332 	int i;
333 	struct client_t *c;
334 
335 	for (i = 0; i < worker_corepeer_client_count; i++) {
336 		c = worker_corepeer_clients[i];
337 		client_close(self, c, CLIOK_PEERS_CLOSING);
338 		worker_corepeer_clients[i] = NULL;
339 	}
340 
341 	worker_corepeer_client_count = 0;
342 }
343 
344 
345 /*
346  *	set up cellmalloc for clients
347  */
348 
client_init(void)349 void client_init(void)
350 {
351 #ifndef _FOR_VALGRIND_
352 	client_cells  = cellinit( "clients",
353 				  sizeof(struct client_t),
354 				  __alignof__(struct client_t), CELLMALLOC_POLICY_FIFO,
355 				  4096 /* 4 MB at the time */, 0 /* minfree */ );
356 	/* 4 MB arena size -> about 100 clients per single arena
357 	   .. with 40 arenas -> 4000 clients max. */
358 #endif
359 }
360 
client_alloc(void)361 struct client_t *client_alloc(void)
362 {
363 #ifndef _FOR_VALGRIND_
364 	struct client_t *c = cellmalloc(client_cells);
365 	if (!c) {
366 		hlog(LOG_ERR, "client_alloc: cellmalloc failed");
367 		return NULL;
368 	}
369 #else
370 	struct client_t *c = hmalloc(sizeof(*c));
371 #endif
372 	memset((void *)c, 0, sizeof(*c));
373 	c->fd = -1;
374 	c->state = CSTATE_INIT;
375 
376 #ifdef FIXED_IOBUFS
377 	c->ibuf_size = sizeof(c->ibuf);
378 	c->obuf_size = sizeof(c->obuf);
379 #else
380 	c->ibuf_size = ibuf_size;
381 	c->obuf_size = obuf_size;
382 
383 	c->ibuf      = hmalloc(c->ibuf_size);
384 	c->obuf      = hmalloc(c->obuf_size);
385 #endif
386 
387 	c->connect_time = now;
388 	c->connect_tick = tick;
389 	c->obuf_wtime = tick;
390 
391 	c->cleanup   = tick + 120;
392 
393 	return c;
394 }
395 
client_free(struct client_t * c)396 void client_free(struct client_t *c)
397 {
398 	//hlog(LOG_DEBUG, "client_free %p: fd %d name %s addr_loc %s udpclient %p", c, c->fd, c->username, c->addr_loc, c->udpclient);
399 
400 	if (c->fd >= 0)	 close(c->fd);
401 #ifndef FIXED_IOBUFS
402 	if (c->ibuf)     hfree(c->ibuf);
403 	if (c->obuf)     hfree(c->obuf);
404 #endif
405 
406 	filter_free(c->posdefaultfilters);
407 	filter_free(c->negdefaultfilters);
408 	filter_free(c->posuserfilters);
409 	filter_free(c->neguserfilters);
410 
411 	client_heard_free(c);
412 
413 	client_udp_free(c->udpclient);
414 	clientlist_remove(c);
415 
416 #ifdef USE_SSL
417 	if (c->ssl_con)
418 		ssl_free_connection(c);
419 #endif
420 
421 	memset(c, 0, sizeof(*c));
422 
423 #ifndef _FOR_VALGRIND_
424 	cellfree(client_cells, c);
425 #else
426 	hfree(c);
427 #endif
428 }
429 
430 /*
431  *	Set up a pseudoclient for UDP and HTTP submitted packets
432  */
433 
pseudoclient_setup(int portnum)434 struct client_t *pseudoclient_setup(int portnum)
435 {
436 	struct client_t *c;
437 
438 	c = client_alloc();
439 	if (!c) {
440 		hlog(LOG_ERR, "pseudoclient_setup: client_alloc returned NULL");
441 		abort();
442 	}
443 	c->fd    = -1;
444 	c->portnum = portnum;
445 	c->state = CSTATE_CONNECTED;
446 	c->flags = CLFLAGS_INPORT|CLFLAGS_CLIENTONLY;
447 	c->validated = VALIDATED_WEAK; // we will validate on every packet
448 	//c->portaccount = l->portaccount;
449 	c->keepalive = tick;
450 	c->last_read = tick;
451 
452 	//hlog(LOG_DEBUG, "pseudoclient setup %p: fd %d name %s addr_loc %s udpclient %p", c, c->fd, c->username, c->addr_loc, c->udpclient);
453 
454 	return c;
455 }
456 
457 /*
458  *	set client socket options, return -1 on serious errors, just log smaller ones
459  */
460 
set_client_sockopt(struct client_t * c)461 int set_client_sockopt(struct client_t *c)
462 {
463 	/* set non-blocking mode */
464 	if (fcntl(c->fd, F_SETFL, O_NONBLOCK)) {
465 		hlog(LOG_ERR, "%s - Failed to set non-blocking mode on socket: %s", c->addr_rem, strerror(errno));
466 		return -1;
467 	}
468 
469 #ifdef USE_SCTP
470 	/* set socket options specific to SCTP clients */
471 	if (c->ai_protocol == IPPROTO_SCTP)
472 		return sctp_set_client_sockopt(c);
473 #endif
474 
475 	/* Set up TCP keepalives, so that we'll notice idle clients.
476 	 * I'm not sure if this is absolutely required, since we send
477 	 * keepalive datetime messages every 30 seconds from the application,
478 	 * but it can't hurt.
479 	 */
480 #ifdef SO_KEEPALIVE
481 	int keepalive_arg;
482 #ifdef TCP_KEEPIDLE
483 	/* start sending keepalives after socket has been idle for 10 minutes */
484 	keepalive_arg = 10*60;
485 	if (setsockopt(c->fd, IPPROTO_TCP, TCP_KEEPIDLE, (void *)&keepalive_arg, sizeof(keepalive_arg)))
486 		hlog(LOG_ERR, "%s - setsockopt(TCP_KEEPIDLE, %d) failed: %s", c->addr_rem, keepalive_arg, strerror(errno));
487 #endif
488 #ifdef TCP_KEEPINTVL
489 	/* send keepalive probes every 20 seconds after the idle time has passed */
490 	keepalive_arg = 20;
491 	if (setsockopt(c->fd, IPPROTO_TCP, TCP_KEEPINTVL, (void *)&keepalive_arg, sizeof(keepalive_arg)))
492 		hlog(LOG_ERR, "%s - setsockopt(TCP_KEEPINTVL, %d) failed: %s", c->addr_rem, keepalive_arg, strerror(errno));
493 #endif
494 #ifdef TCP_KEEPCNT
495 	/* send 3 probes before giving up */
496 	keepalive_arg = 3;
497 	if (setsockopt(c->fd, IPPROTO_TCP, TCP_KEEPCNT, (void *)&keepalive_arg, sizeof(keepalive_arg)))
498 		hlog(LOG_ERR, "%s - setsockopt(TCP_KEEPCNT, %d) failed: %s", c->addr_rem, keepalive_arg, strerror(errno));
499 #endif
500 	/* enable TCP keepalives */
501 	keepalive_arg = 1;
502 	if (setsockopt(c->fd, SOL_SOCKET, SO_KEEPALIVE, (void *)&keepalive_arg, sizeof(keepalive_arg)))
503 		hlog(LOG_ERR, "%s - setsockopt(TCP_KEEPALIVE, %d) failed: %s", c->addr_rem, keepalive_arg, strerror(errno));
504 #endif
505 
506 	return 0;
507 }
508 
set_client_sockopt_post_login(struct client_t * c)509 int set_client_sockopt_post_login(struct client_t *c)
510 {
511 	/* Use TCP_NODELAY for APRS-IS sockets. High delays can cause packets getting past
512 	 * the dupe filters.
513 	 */
514 #ifdef TCP_NODELAY
515 	int arg = 1;
516 	if (setsockopt(c->fd, IPPROTO_TCP, TCP_NODELAY, (void *)&arg, sizeof(arg)))
517 		hlog(LOG_ERR, "%s - setsockopt(TCP_NODELAY, %d) failed: %s", c->addr_rem, arg, strerror(errno));
518 #endif
519 
520 	return 0;
521 }
522 
523 
524 /*
525  *	Pass a new client to a worker thread
526  */
527 
pass_client_to_worker(struct worker_t * wc,struct client_t * c)528 int pass_client_to_worker(struct worker_t *wc, struct client_t *c)
529 {
530 	int pe;
531 
532 	hlog(LOG_DEBUG, "pass_client_to_worker: client on fd %d to thread %d with %d users", c->fd, wc->id, wc->client_count);
533 
534 	if ((pe = pthread_mutex_lock(&wc->new_clients_mutex))) {
535 		hlog(LOG_ERR, "pass_client_to_worker(): could not lock new_clients_mutex: %s", strerror(pe));
536 		return -1;
537 	}
538 
539 	/* push the client in the worker's queue */
540 	c->next = NULL;
541 
542 	if (wc->new_clients_last) {
543 		wc->new_clients_last->next = c;
544 		c->prevp = &wc->new_clients_last->next;
545 	} else {
546 		wc->new_clients = c;
547 		c->prevp = &wc->new_clients;
548 	}
549 
550 	wc->new_clients_last = c;
551 
552 	/* unlock the queue */
553 	if ((pe = pthread_mutex_unlock(&wc->new_clients_mutex))) {
554 		hlog(LOG_ERR, "pass_client_to_worker(): could not unlock new_clients_mutex: %s", strerror(pe));
555 		return -1;
556 	}
557 
558 	return 0;
559 }
560 
strsockaddr(const struct sockaddr * sa,const int addr_len)561 char *strsockaddr(const struct sockaddr *sa, const int addr_len)
562 {
563 	char eb[200], *s;
564 	char sbuf[20];
565 	union sockaddr_u su, *sup;
566 
567 	sup = (union sockaddr_u *)sa;
568 #ifdef IN6_IS_ADDR_V4MAPPED
569 	if ( sa->sa_family == AF_INET6 &&
570 	     ( IN6_IS_ADDR_V4MAPPED(&(sup->si6.sin6_addr)) ||
571 	       IN6_IS_ADDR_V4COMPAT(&(sup->si6.sin6_addr)) ) ) {
572 
573 		memset(&su, 0, sizeof(su));
574 		su.si.sin_family = AF_INET;
575 		su.si.sin_port   = sup->si6.sin6_port;
576 		memcpy(& su.si.sin_addr, &((uint32_t*)(&(sup->si6.sin6_addr)))[3], 4);
577 		sa = &su.sa;
578 		// sup = NULL;
579 		// hlog(LOG_DEBUG, "Translating v4 mapped/compat address..");
580 	}
581 #endif
582 
583 
584 	if ( sa->sa_family == AF_INET ) {
585 		eb[0] = 0;
586 		sbuf[0] = 0;
587 
588 		getnameinfo( sa, addr_len,
589 			    eb, sizeof(eb), sbuf, sizeof(sbuf), NI_NUMERICHOST|NI_NUMERICSERV);
590 		s = eb + strlen(eb);
591 
592 		sprintf(s, ":%s", sbuf);
593 	} else {
594 		/* presumption: IPv6 */
595 		eb[0] = '[';
596 		eb[1] = 0;
597 		sbuf[0] = 0;
598 
599 		getnameinfo( sa, addr_len,
600 			    eb+1, sizeof(eb)-1, sbuf, sizeof(sbuf), NI_NUMERICHOST|NI_NUMERICSERV);
601 		s = eb + strlen(eb);
602 
603 		sprintf(s, "]:%s", sbuf);
604 	}
605 
606 	// if (!sup) hlog(LOG_DEBUG, "... to: %s", eb);
607 
608 	return hstrdup(eb);
609 }
610 
611 /*
612  *	Generate a hexadecimal representation of the socket
613  *	address, as used in the APRS-IS Q construct.
614  */
615 
hexsockaddr(const struct sockaddr * sa,const int addr_len)616 char *hexsockaddr(const struct sockaddr *sa, const int addr_len)
617 {
618 	char eb[200];
619 	union sockaddr_u su, *sup;
620 	struct sockaddr_in *sa_in;
621 	uint8_t *in6;
622 
623 	sup = (union sockaddr_u *)sa;
624 #ifdef IN6_IS_ADDR_V4MAPPED
625 	if ( sa->sa_family == AF_INET6 &&
626 	     ( IN6_IS_ADDR_V4MAPPED(&(sup->si6.sin6_addr)) ||
627 	       IN6_IS_ADDR_V4COMPAT(&(sup->si6.sin6_addr)) ) ) {
628 
629 		memset(&su, 0, sizeof(su));
630 		su.si.sin_family = AF_INET;
631 		su.si.sin_port   = sup->si6.sin6_port;
632 		memcpy(& su.si.sin_addr, &((uint32_t*)(&(sup->si6.sin6_addr)))[3], 4);
633 		sa = &su.sa;
634 		// sup = NULL;
635 		// hlog(LOG_DEBUG, "Translating v4 mapped/compat address..");
636 	}
637 #endif
638 
639 	/* As per the original implementation's example, the hex address is in upper case.
640 	 * For IPv6, there's simply more bytes in there.
641 	 */
642 	if ( sa->sa_family == AF_INET ) {
643 		sa_in = (struct sockaddr_in *)sa;
644 		sprintf(eb, "%02X%02X%02X%02X",
645 			sa_in->sin_addr.s_addr & 0xff,
646 			(sa_in->sin_addr.s_addr >> 8) & 0xff,
647 			(sa_in->sin_addr.s_addr >> 16) & 0xff,
648 			(sa_in->sin_addr.s_addr >> 24) & 0xff
649 			);
650 	} else {
651 		/* presumption: IPv6 */
652 		in6 = (uint8_t *)&sup->si6.sin6_addr.s6_addr;
653 		sprintf(eb, "%02X%02X%02X%02X%02X%02X%02X%02X%02X%02X%02X%02X%02X%02X%02X%02X",
654 			in6[0], in6[1], in6[2], in6[3], in6[4], in6[5], in6[6], in6[7],
655 			in6[8], in6[9], in6[10], in6[11], in6[12], in6[13], in6[14], in6[15]);
656 	}
657 
658 	// if (!sup) hlog(LOG_DEBUG, "... to: %s", eb);
659 
660 	return hstrdup(eb);
661 }
662 
clientaccount_add(struct client_t * c,int l4proto,int rxbytes,int rxpackets,int txbytes,int txpackets,int rxerr,int rxdupes)663 void clientaccount_add(struct client_t *c, int l4proto, int rxbytes, int rxpackets, int txbytes, int txpackets, int rxerr, int rxdupes)
664 {
665 	struct portaccount_t *pa = NULL;
666 	int rxdrops = 0;
667 
668 	if (rxerr < 0) {
669 		rxdrops = 1;
670 		if (rxerr < INERR_MIN)
671 			rxerr = INERR_UNKNOWN; /* which is 0 */
672 		rxerr *= -1;
673 	}
674 
675 	/* worker local accounters do not need locks */
676 	c->localaccount.rxbytes   += rxbytes;
677 	c->localaccount.txbytes   += txbytes;
678 	c->localaccount.rxpackets += rxpackets;
679 	c->localaccount.txpackets += txpackets;
680 	c->localaccount.rxdupes   += rxdupes;
681 	if (rxdrops) {
682 		c->localaccount.rxdrops += 1;
683 		c->localaccount.rxerrs[rxerr] += 1;
684 	}
685 
686 	if (l4proto == IPPROTO_UDP && c->udpclient && c->udpclient->portaccount) {
687 		pa = c->udpclient->portaccount;
688 	} else if (c->portaccount) {
689 		pa = c->portaccount;
690 	}
691 
692 	if (pa) {
693 #ifdef HAVE_SYNC_FETCH_AND_ADD
694 		__sync_fetch_and_add(&pa->rxbytes, rxbytes);
695 		__sync_fetch_and_add(&pa->txbytes, txbytes);
696 		__sync_fetch_and_add(&pa->rxpackets, rxpackets);
697 		__sync_fetch_and_add(&pa->txpackets, txpackets);
698 		__sync_fetch_and_add(&pa->rxdupes, rxdupes);
699 		if (rxdrops) {
700 			__sync_fetch_and_add(&pa->rxdrops, 1);
701 			__sync_fetch_and_add(&pa->rxerrs[rxerr], 1);
702 		}
703 #else
704 		// FIXME: MUTEX !! -- this may or may not need locks..
705 		pa->rxbytes   += rxbytes;
706 		pa->txbytes   += txbytes;
707 		pa->rxpackets += rxpackets;
708 		pa->txpackets += txpackets;
709 		pa->rxdupes   += rxdupes;
710 		if (rxdrops) {
711 			pa->rxdrops += 1;
712 			pa->rxerrs[rxerr] += 1;
713 		}
714 #endif
715 	}
716 
717 	struct portaccount_t *proto;
718 
719 	if (l4proto == IPPROTO_TCP)
720 		proto = &client_connects_tcp;
721 	else if (l4proto == IPPROTO_UDP)
722 		proto = &client_connects_udp;
723 #ifdef USE_SCTP
724 	else if (l4proto == IPPROTO_SCTP)
725 		proto = &client_connects_sctp;
726 #endif
727 	else
728 		return;
729 
730 #ifdef HAVE_SYNC_FETCH_AND_ADD
731 	__sync_fetch_and_add(&proto->rxbytes, rxbytes);
732 	__sync_fetch_and_add(&proto->txbytes, txbytes);
733 	__sync_fetch_and_add(&proto->rxpackets, rxpackets);
734 	__sync_fetch_and_add(&proto->txpackets, txpackets);
735 	if (rxdrops) {
736 		__sync_fetch_and_add(&proto->rxdrops, 1);
737 		__sync_fetch_and_add(&proto->rxerrs[rxerr], 1);
738 	}
739 #else
740 	// FIXME: MUTEX !! -- this may or may not need locks..
741 	proto->rxbytes   += rxbytes;
742 	proto->txbytes   += txbytes;
743 	proto->rxpackets += rxpackets;
744 	proto->txpackets += txpackets;
745 	if (rxdrops) {
746 		proto->rxdrops += 1;
747 		proto->rxerrs[rxerr] += 1;
748 	}
749 #endif
750 }
751 
protocol_str(struct client_t * c)752 static const char *protocol_str(struct client_t *c)
753 {
754 	static const char unknown[] = "UNKNOWN-PROTOCOL";
755 	static const char tcp[] = "TCP";
756 	static const char tcp_udp[] = "TCP+UDP";
757 	static const char udp[] = "UDP";
758 
759 	if (c->ai_protocol == IPPROTO_TCP) {
760 		if (c->udp_port)
761 			return tcp_udp;
762 
763 		return tcp;
764 	}
765 
766 	if (c->ai_protocol == IPPROTO_UDP)
767 		return udp;
768 
769 #ifdef USE_SCTP
770 	static const char sctp[] = "SCTP";
771 
772 	if (c->ai_protocol == IPPROTO_SCTP)
773 		return sctp;
774 #endif
775 
776 	return unknown;
777 }
778 
779 /*
780  *	close and forget a client connection
781  */
782 
client_close(struct worker_t * self,struct client_t * c,int errnum)783 void client_close(struct worker_t *self, struct client_t *c, int errnum)
784 {
785 	int pe;
786 
787 	// TODO: log validation status, ssl status, ssl cert info, tcp/sctp
788 
789 	hlog( LOG_INFO, "%s %s %s (%s) closed after %d s: %s, tx/rx %lld/%lld bytes %lld/%lld pkts, dropped %lld, fd %d, worker %d%s%s%s%s",
790 	      ( (c->flags & CLFLAGS_UPLINKPORT)
791 			  ? ((c->state == CSTATE_COREPEER) ? "Peer" : "Uplink") : "Client" ),
792 			  	protocol_str(c),
793 			  	c->addr_rem,
794 			  	((c->username[0]) ? c->username : "?"),
795 			  	tick - c->connect_tick,
796 			  	((errnum >= 0) ? strerror(errnum) : aprsc_strerror(errnum)),
797 			  	c->localaccount.txbytes,
798 			  	c->localaccount.rxbytes,
799 			  	c->localaccount.txpackets,
800 			  	c->localaccount.rxpackets,
801 			  	c->localaccount.rxdrops,
802 			  	c->fd,
803 			  	self->id,
804 			  	(*c->app_name) ? " app " : "",
805 			  	(*c->app_name) ? c->app_name : "",
806 			  	(*c->app_version) ? " ver " : "",
807 			  	(*c->app_version) ? c->app_version : ""
808 			  	);
809 
810 	if (c->localaccount.rxdrops) {
811 		char s[256] = "";
812 		int p = 0;
813 		int i;
814 
815 		for (i = 0; i < INERR_BUCKETS; i++) {
816 			if (c->localaccount.rxerrs[i]) {
817 				p += snprintf(s+p, 256-p-2, "%s%s %lld",
818 					(p == 0) ? "" : ", ",
819 					inerr_labels[i], c->localaccount.rxerrs[i]);
820 			}
821 		}
822 
823 		hlog(LOG_INFO, "%s (%s) rx drops: %s",
824 			c->addr_rem, c->username, s);
825 	}
826 
827 	/* remove from polling list */
828 	if (c->xfd) {
829 		//hlog(LOG_DEBUG, "client_close: xpoll_remove %p fd %d", c->xfd, c->xfd->fd);
830 		xpoll_remove(&self->xp, c->xfd);
831 	}
832 
833 	/* close */
834 	if (c->fd >= 0) {
835 		close(c->fd);
836 	}
837 
838 	c->fd = -1;
839 
840 	/* If this thread already owns the mutex (we're closing the socket
841 	 * while traversing the thread's client list), FreeBSD's mutex lock
842 	 * will fail with EDEADLK:
843 	 *
844 	 * 2012/08/15 10:03:34.065703 aprsc[41159:800f12fc0] ERROR:
845 	 * client_close(worker 1): could not lock clients_mutex: Resource deadlock
846 	 * avoided
847 	 *
848 	 * If this happens, let's remember we've locked the mutex earlier,
849 	 * and let's not unlock it either.
850 	 *
851 	 * The current example of this is when collect_new_clients() sends
852 	 * the "# aprsc VERSION" login string to new clients. The client_printf()
853 	 * may fail if the client connects and disconnects very quickly, and
854 	 * this will cause it to client_close() during the collection loop.
855 	 */
856 	if ((pe = pthread_mutex_lock(&self->clients_mutex)) && pe != EDEADLK) {
857 		hlog(LOG_ERR, "client_close(worker %d): could not lock clients_mutex: %s", self->id, strerror(pe));
858 		return;
859 	}
860 	if (pe == EDEADLK) {
861 		hlog(LOG_ERR, "client_close(worker %d): could not lock clients_mutex (ignoring): %s", self->id, strerror(pe));
862 	}
863 
864 	/* link the list together over this node */
865 	if (c->next)
866 		c->next->prevp = c->prevp;
867 	*c->prevp = c->next;
868 
869 	/* link the classified clients list together over this node, but only if
870 	 * the client has fully logged in and classification has been done
871 	 */
872 	if (c->class_prevp) {
873 		*c->class_prevp = c->class_next;
874 		if (c->class_next)
875 			c->class_next->class_prevp = c->class_prevp;
876 	}
877 
878 	/* If this happens to be the uplink, tell the uplink connection
879 	 * setup module that the connection has gone away.
880 	 */
881 	if (c->flags & CLFLAGS_UPLINKPORT && c->state != CSTATE_COREPEER)
882 		uplink_close(c, errnum);
883 
884 	if (c->portaccount) {
885 		/* If port accounting is done, handle population accounting... */
886 		//hlog(LOG_DEBUG, "client_close dropping inbound_connects_account %p", c->portaccount);
887 		inbound_connects_account(0, c->portaccount);
888 		c->portaccount = NULL;
889 	} else {
890 		hlog(LOG_DEBUG, "client_close: has no portaccount");
891 	}
892 
893 	if (c->udp_port && c->udpclient->portaccount) {
894 		inbound_connects_account(2, c->udpclient->portaccount); /* udp client count goes down */
895 	}
896 
897 	/* free it up */
898 	client_free(c);
899 
900 	/* if we held the lock before locking, let's not unlock it either */
901 	if (pe == EDEADLK) {
902 		hlog(LOG_ERR, "client_close(worker %d): closed client while holding clients_mutex", self->id);
903 	} else {
904 		if ((pe = pthread_mutex_unlock(&self->clients_mutex))) {
905 			hlog(LOG_ERR, "client_close(worker %d): could not unlock clients_mutex: %s", self->id, strerror(pe));
906 			exit(1);
907 		}
908 	}
909 
910 	/* reduce client counter */
911 	self->client_count--;
912 }
913 
udp_client_write(struct worker_t * self,struct client_t * c,char * p,int len)914 int udp_client_write(struct worker_t *self, struct client_t *c, char *p, int len)
915 {
916 	/* Every packet ends with CRLF, but they are not sent over UDP ! */
917 	/* Existing system doesn't send keepalives via UDP.. */
918 	int i = sendto( c->udpclient->fd, p, len-2, MSG_DONTWAIT,
919 		    &c->udpaddr.sa, c->udpaddrlen );
920 
921 	if (i < 0) {
922 		hlog(LOG_ERR, "UDP transmit error to %s udp port %d: %s",
923 			c->addr_rem, c->udp_port, strerror(errno));
924 	} else if (i != len -2) {
925 		hlog(LOG_ERR, "UDP transmit incomplete to %s udp port %d: wrote %d of %d bytes, errno: %s",
926 			c->addr_rem, c->udp_port, i, len-2, strerror(errno));
927 	}
928 
929 	// hlog( LOG_DEBUG, "UDP from %d to client port %d, sendto rc=%d", c->udpclient->portnum, c->udp_port, i );
930 
931 	if (i > 0)
932 		clientaccount_add( c, IPPROTO_UDP, 0, 0, i, 0, 0, 0);
933 
934 	return i;
935 }
936 
937 /*
938  *	Put outgoing data in obuf
939  */
940 
client_buffer_outgoing_data(struct worker_t * self,struct client_t * c,char * p,int len)941 static int client_buffer_outgoing_data(struct worker_t *self, struct client_t *c, char *p, int len)
942 {
943 	if (c->obuf_end + len > c->obuf_size) {
944 		/* Oops, cannot append the data to the output buffer.
945 		 * Check if we can make space for it by moving data
946 		 * towards the beginning of the buffer?
947 		 */
948 		if (len > c->obuf_size - (c->obuf_end - c->obuf_start)) {
949 			/* Oh crap, the data will not fit even if we move stuff. */
950 			hlog(LOG_DEBUG, "client_write(%s) can not fit new data in buffer; disconnecting", c->addr_rem);
951 			client_close(self, c, CLIERR_OUTPUT_BUFFER_FULL);
952 			return -12;
953 		}
954 
955 		/* okay, move stuff to the beginning to make space in the end */
956 		if (c->obuf_start > 0)
957 			memmove((void *)c->obuf, (void *)c->obuf + c->obuf_start, c->obuf_end - c->obuf_start);
958 		c->obuf_end  -= c->obuf_start;
959 		c->obuf_start = 0;
960 	}
961 
962 	/* copy data to the output buffer */
963 	if (len > 0)
964 		memcpy((void *)c->obuf + c->obuf_end, p, len);
965 	c->obuf_end += len;
966 
967 	return 0;
968 }
969 
970 /*
971  *	write data to a client (well, at least put it in the output buffer)
972  *	(this is also used with len=0 to flush current buffer)
973  */
974 
975 #ifdef USE_SSL
ssl_client_write(struct worker_t * self,struct client_t * c,char * p,int len)976 static int ssl_client_write(struct worker_t *self, struct client_t *c, char *p, int len)
977 {
978 	c->obuf_writes++;
979 
980 	if (len > 0)
981 		clientaccount_add( c, c->ai_protocol, 0, 0, len, 0, 0, 0);
982 
983 	if (client_buffer_outgoing_data(self, c, p, len) == -12)
984 		return -12;
985 
986 	if (c->obuf_end > c->obuf_flushsize || ((len == 0) && (c->obuf_end > c->obuf_start)))
987 		return ssl_write(self, c);
988 
989 	/* tell the poller that we have outgoing data */
990 	xpoll_outgoing(&self->xp, c->xfd, 1);
991 
992 	/* just buffer */
993 	return len;
994 }
995 #endif
996 
tcp_client_write(struct worker_t * self,struct client_t * c,char * p,int len)997 static int tcp_client_write(struct worker_t *self, struct client_t *c, char *p, int len)
998 {
999 	int i, e;
1000 
1001 	//hlog(LOG_DEBUG, "client_write: %*s\n", len, p);
1002 
1003 	/* a TCP client with a udp downstream socket? */
1004 	if (c->udp_port && c->udpclient && len > 0 && *p != '#') {
1005 		return udp_client_write(self, c, p, len);
1006 	}
1007 
1008 	/* Count the number of writes towards this client,  the keepalive
1009 	   manager monitors this counter to determine if the socket should be
1010 	   kept in BUFFERED mode, or written immediately every time.
1011 	   Buffer flushing is done every KEEPALIVE_POLL_FREQ (2) seconds.
1012 	*/
1013 	c->obuf_writes++;
1014 
1015 	if (len > 0) {
1016 		/* Here, we only increment the bytes counter. Packets counter
1017 		 * will be incremented only when we actually transmit a packet
1018 		 * instead of a keepalive.
1019 		 */
1020 		clientaccount_add( c, c->ai_protocol, 0, 0, len, 0, 0, 0);
1021 	}
1022 
1023 	if (client_buffer_outgoing_data(self, c, p, len) == -12)
1024 		return -12;
1025 
1026 	/* Is it over the flush size ? */
1027 	if (c->obuf_end > c->obuf_flushsize || ((len == 0) && (c->obuf_end > c->obuf_start))) {
1028 		/* TODO: move this code to client_try_write and call it */
1029 
1030 		/*if (c->obuf_end > c->obuf_flushsize)
1031 		 *	hlog(LOG_DEBUG, "flushing fd %d since obuf_end %d > %d", c->fd, c->obuf_end, c->obuf_flushsize);
1032 		 */
1033 	write_retry_2:;
1034 		i = write(c->fd, c->obuf + c->obuf_start, c->obuf_end - c->obuf_start);
1035 		e = errno;
1036 		if (i < 0 && e == EINTR)
1037 			goto write_retry_2;
1038 		if (i < 0 && e == EPIPE) {
1039 			/* Remote socket closed.. */
1040 			hlog(LOG_DEBUG, "client_write(%s) fails/2 EPIPE; disconnecting; %s", c->addr_rem, strerror(e));
1041 			// WARNING: This also destroys the client object!
1042 			client_close(self, c, e);
1043 			return -9;
1044 		}
1045 		if (i < 0 && (e == EAGAIN || e == EWOULDBLOCK)) {
1046 			/* Kernel's transmit buffer is full (per-socket or some more global resource).
1047 			 * This happens even with small amounts of data in real world:
1048 			 * aprsc INFO: Client xx.yy.zz.ff:22823 (XXXXX) closed after 1 s:
1049 			 *    Resource temporarily unavailable, tx/rx 735/51 bytes 8/0 pkts,
1050 			 *    dropped 0, fd 59, worker 1 app aprx ver 2.00
1051 			 */
1052 			hlog(LOG_DEBUG, "client_write(%s) fails/2c; %s", c->addr_rem, strerror(e));
1053 			return -1;
1054 		}
1055 		if (i < 0 && len != 0) {
1056 			hlog(LOG_DEBUG, "client_write(%s) fails/2d; disconnecting; %s", c->addr_rem, strerror(e));
1057 			client_close(self, c, e);
1058 			return -11;
1059 		}
1060 		if (i > 0) {
1061 			//hlog(LOG_DEBUG, "client_write(%s) wrote %d", c->addr_rem, i);
1062 			c->obuf_start += i;
1063 			c->obuf_wtime = tick;
1064 		}
1065 	}
1066 
1067 	/* All done ? */
1068 	if (c->obuf_start >= c->obuf_end) {
1069 		//hlog(LOG_DEBUG, "client_write(%s) obuf empty", c->addr_rem);
1070 		c->obuf_start = 0;
1071 		c->obuf_end   = 0;
1072 		return len;
1073 	}
1074 
1075 	/* tell the poller that we have outgoing data */
1076 	xpoll_outgoing(&self->xp, c->xfd, 1);
1077 
1078 	return len;
1079 }
1080 
1081 /*
1082  *	printf to a client
1083  */
1084 
client_printf(struct worker_t * self,struct client_t * c,const char * fmt,...)1085 int client_printf(struct worker_t *self, struct client_t *c, const char *fmt, ...)
1086 {
1087 	va_list args;
1088 	char s[PACKETLEN_MAX];
1089 	int i;
1090 
1091 	va_start(args, fmt);
1092 	i = vsnprintf(s, PACKETLEN_MAX, fmt, args);
1093 	va_end(args);
1094 
1095 	if (i < 0 || i >= PACKETLEN_MAX) {
1096 		hlog(LOG_ERR, "client_printf vsnprintf failed to %s: '%s'", c->addr_rem, fmt);
1097 		return -1;
1098 	}
1099 
1100 	return c->write(self, c, s, i);
1101 }
1102 
1103 /*
1104  *	tell the client once that it has bad filter definition
1105  */
client_bad_filter_notify(struct worker_t * self,struct client_t * c,const char * filt)1106 int client_bad_filter_notify(struct worker_t *self, struct client_t *c, const char *filt)
1107 {
1108 	if (!c->warned) {
1109 		c->warned = 1;
1110 		return client_printf(self, c, "# Warning: Bad filter: %s\r\n", filt);
1111 	}
1112 	return 0;
1113 }
1114 
1115 /*
1116  *	Receive UDP packets from a core peer
1117  */
1118 
handle_corepeer_readable(struct worker_t * self,struct client_t * c)1119 static int handle_corepeer_readable(struct worker_t *self, struct client_t *c)
1120 {
1121 	struct client_t *rc = NULL; // real client
1122 	union sockaddr_u addr;
1123 	socklen_t addrlen;
1124 	int i;
1125 	int r;
1126 	char *addrs;
1127 
1128 	addrlen = sizeof(addr);
1129 	r = recvfrom( c->udpclient->fd, c->ibuf, c->ibuf_size-1,
1130 		MSG_DONTWAIT|MSG_TRUNC, (struct sockaddr *)&addr, &addrlen );
1131 
1132 	if (r < 0) {
1133 		if (errno == EINTR || errno == EAGAIN)
1134 			return 0; /* D'oh..  return again latter */
1135 
1136 		hlog( LOG_DEBUG, "recv: Error from corepeer UDP socket fd %d (%s): %s",
1137 			c->udpclient->fd, c->addr_rem, strerror(errno));
1138 
1139 		return 0;
1140 	}
1141 
1142 	if (r == 0) {
1143 		hlog( LOG_DEBUG, "recv: EOF from corepeer UDP socket fd %d (%s)",
1144 			c->udpclient->fd, c->addr_rem);
1145 		return 0;
1146 	}
1147 
1148 	// Figure the correct client/peer based on the remote IP address.
1149 	for (i = 0; i < worker_corepeer_client_count; i++) {
1150 		rc = worker_corepeer_clients[i];
1151 
1152 		if (rc->udpaddrlen != addrlen)
1153 			continue;
1154 		if (rc->udpaddr.sa.sa_family != addr.sa.sa_family)
1155 			continue;
1156 
1157 		if (addr.sa.sa_family == AF_INET) {
1158 			if (memcmp(&rc->udpaddr.si.sin_addr, &addr.si.sin_addr, sizeof(addr.si.sin_addr)) != 0)
1159 				continue;
1160 			if (rc->udpaddr.si.sin_port != addr.si.sin_port)
1161 				continue;
1162 
1163 			break;
1164 		} else if (addr.sa.sa_family == AF_INET6) {
1165 			if (memcmp(&rc->udpaddr.si6.sin6_addr, &addr.si6.sin6_addr, sizeof(addr.si6.sin6_addr)) != 0)
1166 				continue;
1167 			if (rc->udpaddr.si6.sin6_port != addr.si6.sin6_port)
1168 				continue;
1169 
1170 			break;
1171 		}
1172 	}
1173 
1174 	if (i == worker_corepeer_client_count || !rc) {
1175 		addrs = strsockaddr(&addr.sa, addrlen);
1176 		hlog(LOG_INFO, "recv: Received UDP peergroup packet from unknown peer address %s: %*s", addrs, r, c->ibuf);
1177 		hfree(addrs);
1178 		return 0;
1179 	}
1180 
1181 	/*
1182 	addrs = strsockaddr(&addr.sa, addrlen);
1183 	hlog(LOG_DEBUG, "worker thread passing UDP packet from %s to handler: %*s", addrs, r, c->ibuf);
1184 	hfree(addrs);
1185 	*/
1186 	clientaccount_add( rc, IPPROTO_UDP, r, 0, 0, 0, 0, 0); /* Account byte count. incoming_handler() will account packets. */
1187 	rc->last_read = tick;
1188 
1189 	/* Ignore CRs and LFs in UDP input packet - the current core peer system puts 1 APRS packet in each
1190 	 * UDP frame.
1191 	 * TODO: consider processing multiple packets from an UDP frame, split up by CRLF.
1192 	 */
1193 	for (i = 0; i < r; i++) {
1194 		if (c->ibuf[i] == '\r' || c->ibuf[i] == '\n') {
1195 			r = i;
1196 			break;
1197 		}
1198 	}
1199 
1200 	c->handler_line_in(self, rc, IPPROTO_UDP, c->ibuf, r);
1201 
1202 	return 0;
1203 }
1204 
1205 /*
1206  *	Process incoming data from client after reading
1207  */
1208 
client_postread(struct worker_t * self,struct client_t * c,int r)1209 int client_postread(struct worker_t *self, struct client_t *c, int r)
1210 {
1211 	char *s;
1212 	char *ibuf_end;
1213 	char *row_start;
1214 
1215 	clientaccount_add(c, c->ai_protocol, r, 0, 0, 0, 0, 0); /* Number of packets is now unknown,
1216 					     byte count is collected.
1217 					     The incoming_handler() will account
1218 					     packets. */
1219 
1220 	c->ibuf_end += r;
1221 	// hlog( LOG_DEBUG, "read: %d bytes from client fd %d (%s) - %d in ibuf",
1222 	//       r, c->fd, c->addr_rem, c->ibuf_end);
1223 
1224 	/* parse out rows ending in CR and/or LF and pass them to the handler
1225 	 * without the CRLF (we accept either CR or LF or both, but make sure
1226 	 * to always output CRLF
1227 	 */
1228 	ibuf_end = c->ibuf + c->ibuf_end;
1229 	row_start = c->ibuf;
1230 	c->last_read = tick; /* not simulated time */
1231 
1232 	for (s = c->ibuf; s < ibuf_end; s++) {
1233 		if (*s == '\r' || *s == '\n') {
1234 			/* found EOL */
1235 			if (s - row_start > 0) {
1236 				// int ch = *s;
1237 				// *s = 0;
1238 				// hlog( LOG_DEBUG, "got: %s\n", row_start );
1239 				// *s = ch;
1240 
1241 				/* NOTE: handler call CAN destroy the c-> object ! */
1242 				if (c->handler_line_in(self, c, c->ai_protocol, row_start, s - row_start) < 0)
1243 					return -1;
1244 			}
1245 			/* skip the first, just-found part of EOL, which might have been
1246 			 * NULled by the login handler (TODO: make it not NUL it) */
1247 			s++;
1248 			/* skip the rest of EOL */
1249 			while (s < ibuf_end && (*s == '\r' || *s == '\n'))
1250 				s++;
1251 			row_start = s;
1252 		}
1253 	}
1254 
1255 	if (row_start >= ibuf_end) {
1256 		/* ok, we processed the whole buffer, just mark it empty */
1257 		c->ibuf_end = 0;
1258 	} else if (row_start != c->ibuf) {
1259 		/* ok, we found data... move the buffer contents to the beginning */
1260 		c->ibuf_end = ibuf_end - row_start;
1261 		memmove(c->ibuf, row_start, c->ibuf_end);
1262 	}
1263 
1264 	return 0;
1265 }
1266 
1267 
1268 
1269 /*
1270  *	handle an event on an fd
1271  */
1272 
handle_client_readable(struct worker_t * self,struct client_t * c)1273 static int handle_client_readable(struct worker_t *self, struct client_t *c)
1274 {
1275 	int r;
1276 
1277 	r = read(c->fd, c->ibuf + c->ibuf_end, c->ibuf_size - c->ibuf_end - 1);
1278 
1279 	if (r == 0) {
1280 		hlog( LOG_DEBUG, "read: EOF from socket fd %d (%s @ %s)",
1281 		      c->fd, c->addr_rem, c->addr_loc );
1282 		client_close(self, c, CLIERR_EOF);
1283 		return -1;
1284 	}
1285 
1286 	if (r < 0) {
1287 		if (errno == EINTR || errno == EAGAIN)
1288 			return 0; /* D'oh..  return again later */
1289 
1290 		hlog( LOG_DEBUG, "read: Error from socket fd %d (%s): %s",
1291 		      c->fd, c->addr_rem, strerror(errno));
1292 		hlog( LOG_DEBUG, " .. ibuf=%p  ibuf_end=%d  ibuf_size=%d",
1293 		      c->ibuf, c->ibuf_end, c->ibuf_size-c->ibuf_end-1);
1294 		client_close(self, c, errno);
1295 		return -1;
1296 	}
1297 
1298 	return client_postread(self, c, r);
1299 }
1300 
1301 /*
1302  *	client fd is now writaable
1303  */
1304 
handle_client_writable(struct worker_t * self,struct client_t * c)1305 static int handle_client_writable(struct worker_t *self, struct client_t *c)
1306 {
1307 	int r;
1308 
1309 	/* TODO: call client_try_write */
1310 	r = write(c->fd, c->obuf + c->obuf_start, c->obuf_end - c->obuf_start);
1311 	if (r < 0) {
1312 		if (errno == EINTR || errno == EAGAIN) {
1313 			hlog(LOG_DEBUG, "writable: Would block fd %d (%s): %s", c->fd, c->addr_rem, strerror(errno));
1314 			return 0;
1315 		}
1316 
1317 		hlog(LOG_DEBUG, "writable: Error from socket fd %d (%s): %s", c->fd, c->addr_rem, strerror(errno));
1318 		client_close(self, c, errno);
1319 		return -1;
1320 	}
1321 
1322 	c->obuf_start += r;
1323 	//hlog(LOG_DEBUG, "writable: %d bytes to socket fd %d (%s) - %d in obuf", r, c->fd, c->addr_rem, c->obuf_end - c->obuf_start);
1324 
1325 	if (c->obuf_start == c->obuf_end) {
1326 		xpoll_outgoing(&self->xp, c->xfd, 0);
1327 		c->obuf_start = c->obuf_end = 0;
1328 	}
1329 
1330 	return 0;
1331 }
1332 
handle_client_event(struct xpoll_t * xp,struct xpoll_fd_t * xfd)1333 static int handle_client_event(struct xpoll_t *xp, struct xpoll_fd_t *xfd)
1334 {
1335 	struct worker_t *self = (struct worker_t *)xp->tp;
1336 	struct client_t *c    = (struct client_t *)xfd->p;
1337 
1338 	//hlog(LOG_DEBUG, "handle_client_event(%d): %d", xfd->fd, xfd->result);
1339 
1340 	if (xfd->result & XP_OUT) {  /* priorize doing output */
1341 		/* ah, the client is writable */
1342 
1343 		if (c->obuf_start == c->obuf_end) {
1344 			/* there is nothing to write any more */
1345 			//hlog(LOG_DEBUG, "client writable: nothing to write on fd %d (%s)", c->fd, c->addr_rem);
1346 			xpoll_outgoing(&self->xp, c->xfd, 0);
1347 			c->obuf_start = c->obuf_end = 0;
1348 		} else {
1349 			if (c->handler_client_writable(self, c) < 0)
1350 				return 0;
1351 		}
1352 	}
1353 
1354 	if (xfd->result & XP_IN) {  /* .. before doing input */
1355 		/* Is this really necessary any more?
1356 		if (c->fd < 0) {
1357 			hlog(LOG_DEBUG, "fd %d: socket no longer alive, closing (%s)", c->fd, c->addr_rem);
1358 			client_close(self, c, CLIERR_FD_NUM_INVALID);
1359 			return -1;
1360 		}
1361 		*/
1362 
1363 		/* ok, read */
1364 		c->handler_client_readable(self, c);
1365 		/* c might be invalid now, don't touch it */
1366 	}
1367 
1368 	return 0;
1369 }
1370 
1371 /*
1372  *	Classify a client and put it in the correct outgoing processor class
1373  *	list. This will enable outgoing packet transmission for the client.
1374  */
1375 
worker_classify_client(struct worker_t * self,struct client_t * c)1376 static void worker_classify_client(struct worker_t *self, struct client_t *c)
1377 {
1378 	struct client_t *class_next = NULL;
1379 	struct client_t **class_prevp = NULL;
1380 
1381 	if (c->flags & CLFLAGS_PORT_RO) {
1382 		//hlog(LOG_DEBUG, "classify_client(worker %d): client fd %d classified readonly", self->id, c->fd);
1383 		class_next = self->clients_ro;
1384 		class_prevp = &self->clients_ro;
1385 	} else if (c->state == CSTATE_COREPEER || (c->flags & CLFLAGS_UPLINKPORT)) {
1386 		//hlog(LOG_DEBUG, "classify_client(worker %d): client fd %d classified upstream/peer", self->id, c->fd);
1387 		class_next = self->clients_ups;
1388 		class_prevp = &self->clients_ups;
1389 	} else if (c->flags & CLFLAGS_DUPEFEED) {
1390 		//hlog(LOG_DEBUG, "classify_client(worker %d): client fd %d classified dupefeed", self->id, c->fd);
1391 		class_next = self->clients_dupe;
1392 		class_prevp = &self->clients_dupe;
1393 	} else if (c->flags & CLFLAGS_INPORT) {
1394 		//hlog(LOG_DEBUG, "classify_client(worker %d): client fd %d classified other", self->id, c->fd);
1395 		class_next = self->clients_other;
1396 		class_prevp = &self->clients_other;
1397 	} else {
1398 		hlog(LOG_ERR, "classify_client(worker %d): client fd %d NOT CLASSIFIED - will not get any packets", self->id, c->fd);
1399 		return;
1400 	}
1401 
1402 	c->class_next = class_next;
1403 	if (class_next)
1404 		class_next->class_prevp = &c->class_next;
1405 	*class_prevp = c;
1406 	c->class_prevp = class_prevp;
1407 }
1408 
1409 /*
1410  *	Mark the client connected and do whatever processing is needed
1411  *	to start transmitting data to it.
1412  */
1413 
worker_mark_client_connected(struct worker_t * self,struct client_t * c)1414 void worker_mark_client_connected(struct worker_t *self, struct client_t *c)
1415 {
1416 	c->state = CSTATE_CONNECTED;
1417 
1418 	set_client_sockopt_post_login(c);
1419 
1420 	/* classify the client and put it in the right list of clients for
1421 	 * outgoing data to start flowing.
1422 	 */
1423 	worker_classify_client(self, c);
1424 }
1425 
1426 /*
1427  *	move new clients from the new clients queue to the worker thread
1428  */
1429 
collect_new_clients(struct worker_t * self)1430 static void collect_new_clients(struct worker_t *self)
1431 {
1432 	int pe, n, i;
1433 	struct client_t *new_clients, *c;
1434 
1435 	/* lock the queue */
1436 	if ((pe = pthread_mutex_lock(&self->new_clients_mutex))) {
1437 		hlog(LOG_ERR, "collect_new_clients(worker %d): could not lock new_clients_mutex: %s", self->id, strerror(pe));
1438 		return;
1439 	}
1440 
1441 	/* quickly grab the new clients to a local variable */
1442 	new_clients = self->new_clients;
1443 	self->new_clients = NULL;
1444 	self->new_clients_last = NULL;
1445 
1446 	/* unlock */
1447 	if ((pe = pthread_mutex_unlock(&self->new_clients_mutex))) {
1448 		hlog(LOG_ERR, "collect_new_clients(worker %d): could not unlock new_clients_mutex: %s", self->id, strerror(pe));
1449 		/* we'd be going to deadlock here... */
1450 		exit(1);
1451 	}
1452 
1453 	if ((pe = pthread_mutex_lock(&self->clients_mutex))) {
1454 		hlog(LOG_ERR, "collect_new_clients(worker %d): could not lock clients_mutex: %s", self->id, strerror(pe));
1455 		return;
1456 	}
1457 
1458 	/* move the new clients to the thread local client list */
1459 	n = self->xp.pollfd_used;
1460 	i = 0;
1461 	while (new_clients) {
1462 		i++;
1463 		c = new_clients;
1464 		new_clients = c->next;
1465 
1466 		if (c->fd < -1) {
1467 			if (c->fd == -2) {
1468 				/* corepeer reconfig flag */
1469 				hlog(LOG_DEBUG, "collect_new_clients(worker %d): closing all existing peergroup peers", self->id);
1470 				corepeer_close_all(self);
1471 			} else {
1472 				hlog(LOG_NOTICE, "collect_new_clients(worker %d): odd fd on new client: %d", self->id, c->fd);
1473 			}
1474 
1475 			client_free(c);
1476 			i--; /* don't count it in */
1477 			continue;
1478 		}
1479 
1480 		self->client_count++;
1481 		// hlog(LOG_DEBUG, "collect_new_clients(worker %d): got client fd %d", self->id, c->fd);
1482 		c->next = self->clients;
1483 		if (c->next)
1484 			c->next->prevp = &c->next;
1485 		self->clients = c;
1486 		c->prevp = &self->clients;
1487 
1488 		/* If this client is already in connected state, classify it
1489 		 * (live upgrading). Also, if it's a corepeer, it's not going to
1490 		 * "log in" later and it needs to be classified now.
1491 		 */
1492 		if (c->state == CSTATE_CONNECTED || c->state == CSTATE_COREPEER)
1493 			worker_classify_client(self, c);
1494 
1495 		/* If the new client is an UDP core peer, we will add its FD to the
1496 		 * polling list, but only once. There is only a single listener socket
1497 		 * for a single peer group.
1498 		 */
1499 		if (c->state == CSTATE_COREPEER) {
1500 			/* add to corepeer client list and polling list */
1501 			hlog(LOG_DEBUG, "collect_new_clients(worker %d): got core peergroup peer, UDP fd %d", self->id, c->udpclient->fd);
1502 
1503 			if (worker_corepeer_client_count == MAX_COREPEERS) {
1504 				hlog(LOG_ERR, "worker: Too many core peergroup peers (max %d)", MAX_COREPEERS);
1505 				exit(1);
1506 			}
1507 
1508 			/* build a static array of clients, for quick searching based on address */
1509 			c->fd = worker_corepeer_client_count * -1 - 100;
1510 			worker_corepeer_clients[worker_corepeer_client_count] = c;
1511 			worker_corepeer_client_count++;
1512 
1513 			if (!c->udpclient->polled) {
1514 				c->udpclient->polled = 1;
1515 				c->xfd = xpoll_add(&self->xp, c->udpclient->fd, (void *)c);
1516 				hlog(LOG_DEBUG, "collect_new_clients(worker %d): starting poll for UDP fd %d xfd %p", self->id, c->udpclient->fd, c->xfd);
1517 			}
1518 
1519 			c->handler_client_readable = &handle_corepeer_readable;
1520 			c->write = &udp_client_write;
1521 
1522 			continue;
1523 		}
1524 
1525 		/* add to polling list */
1526 		c->xfd = xpoll_add(&self->xp, c->fd, (void *)c);
1527 		hlog(LOG_DEBUG, "collect_new_clients(worker %d): added fd %d to polling list, xfd %p", self->id, c->fd, c->xfd);
1528 		if (!c->xfd) {
1529 			/* ouch, out of xfd space */
1530 			shutdown(c->fd, SHUT_RDWR);
1531 			continue;
1532 		}
1533 
1534 #ifdef USE_SCTP
1535 		if (c->ai_protocol == IPPROTO_SCTP) {
1536 			c->handler_client_readable = &sctp_readable;
1537 			c->handler_client_writable = &sctp_writable;
1538 			c->write = &sctp_client_write;
1539 		} else
1540 #endif
1541 #ifdef USE_SSL
1542 		if (c->ssl_con) {
1543 			hlog(LOG_DEBUG, "collect_new_clients(worker %d): fd %d uses SSL", self->id, c->fd);
1544 			c->handler_client_readable = &ssl_readable;
1545 			c->handler_client_writable = &ssl_writable;
1546 			c->write = &ssl_client_write;
1547 		} else
1548 #endif
1549 		{
1550 			c->handler_client_readable = &handle_client_readable;
1551 			c->handler_client_writable = &handle_client_writable;
1552 			c->write = &tcp_client_write;
1553 		}
1554 
1555 		/* The new client may end up destroyed right away, never mind it here.
1556 		 * We will notice it later and discard the client.
1557 		 */
1558 
1559 		/* According to http://www.aprs-is.net/ServerDesign.aspx, the server must
1560 		 * initially transmit it's software name and version string.
1561 		 * In case of a live upgrade, this should maybe be skipped, but
1562 		 * I'll leave it in for now.
1563 		 */
1564 		if (c->flags & CLFLAGS_INPORT)
1565 			client_printf(self, c, "# %s\r\n", (fake_version) ? fake_version : verstr_aprsis);
1566 
1567 		/* If the write failed immediately, c is already invalid at this point. Don't touch it. */
1568 	}
1569 
1570 	if ((pe = pthread_mutex_unlock(&self->clients_mutex))) {
1571 		hlog(LOG_ERR, "collect_new_clients(worker %d): could not unlock clients_mutex: %s", self->id, strerror(pe));
1572 		exit(1);
1573 	}
1574 
1575 	hlog( LOG_DEBUG, "Worker %d accepted %d new clients, %d new connections, now total %d clients",
1576 	      self->id, i, self->xp.pollfd_used - n, self->client_count );
1577 }
1578 
1579 /*
1580  *	Send keepalives to client sockets, run this once a second
1581  *	This watches also obuf_wtime becoming too old, and also about
1582  *	the number of writes on socket in previous run interval to
1583  *	auto-adjust socket buffering mode.
1584  */
send_keepalives(struct worker_t * self)1585 static void send_keepalives(struct worker_t *self)
1586 {
1587 	struct client_t *c, *cnext;
1588 	struct tm t;
1589 	char buf[230], *s;
1590 	int len0, len, rc;
1591 	static const char *monthname[12] = {"Jan","Feb","Mar","Apr","May","Jun","Jul","Aug","Sep","Oct","Nov","Dec"};
1592 	time_t w_expire    = tick - sock_write_expire;
1593 
1594 	// Example message:
1595 	// # javAPRSSrvr 3.12b12 1 Mar 2008 15:11:20 GMT T2FINLAND 85.188.1.32:14580
1596 
1597 	sprintf(buf, "# %.40s ", verstr_aprsis);
1598 	s = buf + strlen(buf);
1599 
1600 	memset(&t, 0, sizeof(t));
1601 	gmtime_r(&now, &t);
1602 
1603 	// s += strftime(s, 40, "%d %b %Y %T GMT", &t);
1604 	// However that depends upon LOCALE, thus following:
1605 	s += sprintf(s, "%d %s %d %02d:%02d:%02d GMT",
1606 		     t.tm_mday, monthname[t.tm_mon], t.tm_year + 1900,
1607 		     t.tm_hour, t.tm_min, t.tm_sec);
1608 
1609 	s += sprintf(s, " %s ", serverid);
1610 
1611 	len0 = (s - buf);
1612 
1613 	for (c = self->clients; (c); c = cnext) {
1614 		// the  c  may get destroyed from underneath of ourselves!
1615 		cnext = c->next;
1616 
1617 		/* No keepalives on PEER links.. */
1618 		if ( c->state == CSTATE_COREPEER )
1619 			continue;
1620 
1621 		/* is it time to clean up? */
1622 		if (c->cleanup <= tick || c->cleanup > tick + 120+120) {
1623 			c->cleanup = tick + 120;
1624 			client_heard_expire(c);
1625 		}
1626 
1627 		/* Is it time for keepalive? Also send a keepalive if clock jumped backwards. */
1628 		if ((c->keepalive <= tick)
1629 		    || (c->keepalive > tick + keepalive_interval)) {
1630 			int flushlevel = c->obuf_flushsize;
1631 			c->keepalive = tick + keepalive_interval;
1632 
1633 			len = len0 + sprintf(s, "%s\r\n", c->addr_loc);
1634 
1635 			c->obuf_flushsize = 0;
1636 			/* Write out immediately */
1637 			rc = c->write(self, c, buf, len);
1638 			if (rc < -2) continue; // destroyed
1639 			c->obuf_flushsize = flushlevel;
1640 		} else {
1641 			/* just fush if there was anything to write */
1642 			if (c->ai_protocol == IPPROTO_TCP) {
1643 				rc = c->write(self, c, buf, 0);
1644 				if (rc < -2) continue; // destroyed..
1645 			}
1646 		}
1647 
1648 		/* Check for input timeouts. These will currently also kick in if the
1649 		 * real-time clock jumps backwards for some reason.
1650 		 */
1651 		if (c->flags & CLFLAGS_INPORT) {
1652 			if (c->state != CSTATE_CONNECTED) {
1653 				if (c->connect_tick <= tick - client_login_timeout) {
1654 					hlog(LOG_DEBUG, "%s: Closing client fd %d due to login timeout (%d s)",
1655 					      c->addr_rem, c->fd, client_login_timeout);
1656 					client_close(self, c, CLIERR_LOGIN_TIMEOUT);
1657 					continue;
1658 				}
1659 			} else {
1660 				if (c->last_read <= tick - client_timeout) {
1661 					hlog(LOG_DEBUG, "%s: Closing client fd %d due to inactivity (%d s)",
1662 						c->addr_rem, c->fd, client_timeout);
1663 					client_close(self, c, CLIERR_INACTIVITY);
1664 					continue;
1665 				}
1666 			}
1667 		} else {
1668 			if (c->last_read <= tick - upstream_timeout) {
1669 				hlog(LOG_INFO, "%s: Closing uplink fd %d due to inactivity (%d s)",
1670 				      c->addr_rem, c->fd, upstream_timeout);
1671 				client_close(self, c, CLIERR_INACTIVITY);
1672 				continue;
1673 			}
1674 		}
1675 
1676 		/* check for write timeouts */
1677 		if (c->obuf_wtime < w_expire && c->state != CSTATE_UDP) {
1678 			// TOO OLD!  Shutdown the client
1679 			hlog(LOG_DEBUG, "%s: Closing connection fd %d due to obuf wtime timeout",
1680 			      c->addr_rem, c->fd);
1681 			client_close(self, c, CLIERR_OUTPUT_WRITE_TIMEOUT);
1682 			continue;
1683 		}
1684 
1685 		/* Adjust buffering, try not to jump back and forth between buffered and unbuffered.
1686 		 * Please note that the we always flush the buffer at the end of a round if the
1687 		 * client socket is writable (OS buffer not full), so we don't really wait for
1688 		 * obuf_flushsize to be reached. Buffering will just make a couple of packets sent
1689 		 * go in the same write().
1690 		 */
1691 		if (c->obuf_writes > obuf_writes_threshold) {
1692 			// Lots and lots of writes, switch to buffering...
1693 			if (c->obuf_flushsize == 0) {
1694 				c->obuf_flushsize = c->obuf_size / 2;
1695 				//hlog( LOG_DEBUG,"Switch fd %d (%s) to buffered writes (%d writes), flush at %d",
1696 				//	c->fd, c->addr_rem, c->obuf_writes, c->obuf_flushsize);
1697 			}
1698 		} else if (c->obuf_flushsize != 0 && c->obuf_writes < obuf_writes_threshold_hys) {
1699 			// Not so many writes, back to "write immediate"
1700 			//hlog( LOG_DEBUG,"Switch fd %d (%s) to unbuffered writes (%d writes)",
1701 			//	 c->fd, c->addr_rem, c->obuf_writes);
1702 			c->obuf_flushsize = 0;
1703 		}
1704 
1705 		c->obuf_writes = 0;
1706 	}
1707 }
1708 
1709 
1710 /*
1711  *	Worker thread
1712  */
1713 
1714 void worker_thread(struct worker_t *self)
1715 {
1716 	sigset_t sigs_to_block;
1717 	time_t next_keepalive = tick + 2;
1718 	time_t next_24h_cleanup = tick + 86400;
1719 	char myname[20];
1720 	struct pbuf_t *p;
1721 #if 0
1722 	time_t next_lag_query = tick + 10;
1723 #endif
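	/* Per-iteration timestamps (t1..t7 below) used to log how long each
	 * step of the worker main loop took; see the debug log at the end
	 * of the loop body.
	 */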
1724 	time_t t1, t2, t3, t4, t5, t6, t7;
1725 
1726 	sprintf(myname,"worker %d", self->id);
1727 	pthreads_profiling_reset(myname);
1728 
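	/* Block the signals handled elsewhere (typically by the main thread)
	 * so that they are not delivered to this worker thread.
	 */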
1729 	sigemptyset(&sigs_to_block);
1730 	sigaddset(&sigs_to_block, SIGALRM);
1731 	sigaddset(&sigs_to_block, SIGINT);
1732 	sigaddset(&sigs_to_block, SIGTERM);
1733 	sigaddset(&sigs_to_block, SIGQUIT);
1734 	sigaddset(&sigs_to_block, SIGHUP);
1735 	sigaddset(&sigs_to_block, SIGURG);
1736 	sigaddset(&sigs_to_block, SIGPIPE);
1737 	sigaddset(&sigs_to_block, SIGUSR1);
1738 	sigaddset(&sigs_to_block, SIGUSR2);
1739 	pthread_sigmask(SIG_BLOCK, &sigs_to_block, NULL);
1740 
1741 	hlog(LOG_DEBUG, "Worker %d started.", self->id);
1742 
1743 	while (!self->shutting_down) {
1744 		t1 = tick;
1745 
1746 		/* if we have new stuff in the global packet buffer, process it */
1747 		if (*self->pbuf_global_prevp || *self->pbuf_global_dupe_prevp)
1748 			process_outgoing(self);
1749 
1750 		t2 = tick;
1751 
1752 		// TODO: calculate different delay based on outgoing lag ?
1753 		/* poll for incoming traffic */
1754 		xpoll(&self->xp, 30); // was 200, but caused too much latency
1755 
1756 		/* if we have stuff in the local queue, try to flush it and make
1757 		 * it available to the dupecheck thread
1758 		 */
1759 		t3 = tick;
1760 
1761 		if (self->pbuf_incoming_local)
1762 			incoming_flush(self);
1763 
1764 		t4 = tick;
1765 
1766 		if (self->new_clients)
1767 			collect_new_clients(self);
1768 
1769 		t5 = tick;
1770 
1771 		/* time of next keepalive broadcast ? */
1772 		if (tick >= next_keepalive || next_keepalive > tick + KEEPALIVE_POLL_FREQ*2) {
1773 			next_keepalive = tick + KEEPALIVE_POLL_FREQ; /* Run them every 2 seconds */
1774 			send_keepalives(self);
1775 
1776 			/* time of daily worker cleanup? */
1777 			if (tick >= next_24h_cleanup || tick < next_24h_cleanup - 100000) {
1778 				hlog(LOG_DEBUG, "worker %d: 24h clean up event", self->id);
1779 				/* currently the cleanup is pretty lean. */
1780 				self->internal_packet_drops = 0;
1781 				next_24h_cleanup = tick + 86400;
1782 			}
1783 		}
1784 
1785 		t6 = tick;
1786 #if 0
1787 		if (tick > next_lag_query) {
1788 			int lag, lag1, lag2;
1789 			next_lag_query += 10; // every 10 seconds..
1790 			lag = outgoing_lag_report(self, &lag1, &lag2);
1791 			hlog(LOG_DEBUG, "Thread %d  pbuf lag %d,  dupelag %d", self->id, lag1, lag2);
1792 		}
1793 #endif
1794 		t7 = tick;
1795 
1796 #if 1
1797 		if (t7-t1 > 1 || t7-t1 < 0) // only report if the delay is over 1 second; such delays are a lot rarer
1798 		  hlog( LOG_DEBUG, "Worker thread %d loop step delays:  dt2: %d  dt3: %d  dt4: %d  dt5: %d  dt6: %d  dt7: %d",
1799 			self->id, t2-t1, t3-t1, t4-t1, t5-t1, t6-t1, t7-t1 );
1800 #endif
1801 	}
1802 
1803 	if (self->shutting_down == 2) {
1804 		/* live upgrade: must free all UDP client structs - we need to close the UDP listener fd. */
1805 		/* Must also disconnect all SSL clients - the SSL crypto state cannot be moved over. */
1806 		struct client_t *c, *next;
1807 		for (c = self->clients; (c); c = next) {
1808 			next = c->next;
1809 #ifdef USE_SSL
1810 			/* SSL client? */
1811 			if (c->ssl_con) {
1812 				client_close(self, c, CLIOK_THREAD_SHUTDOWN);
1813 				continue;
1814 			}
1815 #endif
1816 			/* collect client state first before closing or freeing anything */
1817 			if (worker_shutdown_clients && c->fd >= 0) {
1818 				cJSON *jc = worker_client_json(c, 1);
1819 				cJSON_AddItemToArray(worker_shutdown_clients, jc);
1820 			}
1821 			client_udp_free(c->udpclient);
1822 			c->udpclient = NULL;
1823 		}
1824 	} else {
1825 		/* close all clients, if not shutting down for a live upgrade */
1826 		while (self->clients)
1827 			client_close(self, self->clients, CLIOK_THREAD_SHUTDOWN);
1828 	}
1829 
1830 	/* stop polling */
1831 	xpoll_free(&self->xp);
1832 	memset(&self->xp,0,sizeof(self->xp));
1833 
1834 	/* check if there is stuff in the incoming queue (not taken by dupecheck) */
1835 	int pbuf_incoming_found = 0;
1836 	for (p = self->pbuf_incoming; p; p = p->next) {
1837 		pbuf_incoming_found++;
1838 	}
1839 	if (pbuf_incoming_found != self->pbuf_incoming_count) {
1840 		hlog(LOG_ERR, "Worker %d: found %d packets in incoming queue, does not match count %d",
1841 			self->id, pbuf_incoming_found, self->pbuf_incoming_count);
1842 	}
1843 	if (self->pbuf_incoming_count)
1844 		hlog(LOG_INFO, "Worker %d: %d packets left in incoming queue",
1845 			self->id, self->pbuf_incoming_count);
1846 
1847 	/* clean up thread-local pbuf pools */
1848 	worker_free_buffers(self);
1849 
1850 	hlog(LOG_DEBUG, "Worker %d shut down%s.", self->id, (self->shutting_down == 2) ? " - clients left hanging" : "");
1851 }
1852 
1853 /*
1854  *	Stop workers - runs from accept_thread
1855  *	stop_all: 1 => stop all threads, 2 => stop all threads for live upgrade
1856  */
1857 
1858 void workers_stop(int stop_all)
1859 {
1860 	struct worker_t *w;
1861 	int e;
1862 	int stopped = 0;
1863 
1864 	hlog(LOG_INFO, "Stopping %d worker threads...",
1865 		(stop_all) ? workers_running : workers_running - workers_configured);
1866 	while (workers_running > workers_configured || (stop_all && workers_running > 0)) {
1867 		hlog(LOG_DEBUG, "Stopping a worker thread...");
1868 		/* find the last worker thread and shut it down...
1869 		 * could shut down the first one, but to reduce confusion
1870 		 * will shut down the one with the largest worker id :)
1871 		 *
1872 		 * This could be done even more cleanly by moving the connected
1873 		 * clients to the threads which are left running, but maybe
1874 		 * that'd be way too cool and complicated to implement right now.
1875 		 * It's cool enough to be able to reconfigure at all.
1876 		 */
1877 		w = worker_threads;
1878 		if (w == NULL) {
1879 			hlog(LOG_CRIT, "Cannot stop worker threads, none running");
1880 			abort();
1881 		}
1882 		while ((w) && (w->next))
1883 			w = w->next;
1884 
1885 		w->shutting_down = (stop_all == 2) ? 2 : 1;
1886 		if ((e = pthread_join(w->th, NULL))) {
1887 			hlog(LOG_ERR, "Could not pthread_join worker %d: %s", w->id, strerror(e));
1888 		} else {
1889 			hlog(LOG_DEBUG, "Worker %d has terminated.", w->id);
1890 			stopped++;
1891 		}
1892 
1893 		*(w->prevp) = NULL;
1894 		hfree(w);
1895 
1896 		workers_running--;
1897 	}
1898 	hlog(LOG_INFO, "Stopped %d worker threads.", stopped);
1899 
1900 }
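
/*
 * Illustrative call patterns, based on the stop_all semantics above
 * (not an exhaustive list of call sites):
 *
 *	workers_stop(0);   // trim the pool down to workers_configured
 *	workers_stop(1);   // stop all workers, closing their clients
 *	workers_stop(2);   // stop all workers for a live upgrade, leaving client fds intact
 */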
1901 
1902 /*
1903  *	Allocate a worker structure.
1904  *	This is also called from the http thread which acts as a
1905  *	"worker" for incoming packets.
1906  */
1907 
1908 struct worker_t *worker_alloc(void)
1909 {
1910 	struct worker_t *w;
1911 	pthread_mutexattr_t mut_recursive;
1912 	int e;
1913 
1914 	if ((e = pthread_mutexattr_init(&mut_recursive))) {
1915 		hlog(LOG_ERR, "worker_alloc: pthread_mutexattr_init failed: %s", strerror(e));
1916 	}
1917 
1918 	if ((e = pthread_mutexattr_settype(&mut_recursive, PTHREAD_MUTEX_RECURSIVE))) {
1919 		hlog(LOG_ERR, "worker_alloc: pthread_mutexattr_settype PTHREAD_MUTEX_RECURSIVE failed: %s", strerror(e));
1920 	}
1921 
1922 	w = hmalloc(sizeof(*w));
1923 	memset(w, 0, sizeof(*w));
1924 
1925 	pthread_mutex_init(&w->clients_mutex, &mut_recursive);
1926 	pthread_mutex_init(&w->new_clients_mutex, NULL);
1927 
1928 	w->pbuf_incoming_local = NULL;
1929 	w->pbuf_incoming_local_last = &w->pbuf_incoming_local;
1930 
1931 	w->pbuf_incoming      = NULL;
1932 	w->pbuf_incoming_last = &w->pbuf_incoming;
1933 	pthread_mutex_init(&w->pbuf_incoming_mutex, NULL);
1934 
1935 	w->pbuf_global_prevp      = pbuf_global_prevp;
1936 	w->pbuf_global_dupe_prevp = pbuf_global_dupe_prevp;
1937 
1938 	return w;
1939 }
1940 
1941 /*
1942  *	Free a worker's local buffers
1943  */
1944 
1945 void worker_free_buffers(struct worker_t *self)
1946 {
1947 	struct pbuf_t *p, *pn;
1948 
1949 	/* clean up thread-local pbuf pools */
1950 	for (p = self->pbuf_free_small; p; p = pn) {
1951 		pn = p->next;
1952 		pbuf_free(NULL, p); // free to global pool
1953 	}
1954 	for (p = self->pbuf_free_medium; p; p = pn) {
1955 		pn = p->next;
1956 		pbuf_free(NULL, p); // free to global pool
1957 	}
1958 	for (p = self->pbuf_free_large; p; p = pn) {
1959 		pn = p->next;
1960 		pbuf_free(NULL, p); // free to global pool
1961 	}
1962 }
1963 
1964 /*
1965  *	Start workers - runs from accept_thread
1966  */
1967 
1968 void workers_start(void)
1969 {
1970 	int i;
1971 	struct worker_t * volatile w;
1972  	struct worker_t **prevp;
1973 
1974 	if (workers_running)
1975 		workers_stop(0);
1976 
1977 	hlog(LOG_INFO, "Starting %d worker threads (configured: %d)...",
1978 		workers_configured - workers_running, workers_configured);
1979 
1980 	while (workers_running < workers_configured) {
1981 		hlog(LOG_DEBUG, "Starting a worker thread...");
1982 		i = 0;
1983 		prevp = &worker_threads;
1984 		w = worker_threads;
1985 		while (w) {
1986 			prevp = &w->next;
1987 			w = w->next;
1988 			i++;
1989 		}
1990 
1991 		w = worker_alloc();
1992 		*prevp = w;
1993 		w->prevp = prevp;
1994 
1995 		w->id = i;
1996 		xpoll_initialize(&w->xp, (void *)w, &handle_client_event);
1997 
1998 		/* start the worker thread */
1999 		if (pthread_create(&w->th, &pthr_attrs, (void *)worker_thread, w))
2000 			perror("pthread_create failed for worker_thread");
2001 
2002 		workers_running++;
2003 	}
2004 }
2005 
2006 /*
2007  *	Add an array of long longs to a JSON tree.
2008  */
2009 
2010 void json_add_rxerrs(cJSON *root, const char *key, long long vals[])
2011 {
2012 	double vald[INERR_BUCKETS];
2013 	int i;
2014 
2015 	/* cJSON does not have a CreateLongLongArray; big integers are taken in
2016 	 * as floating point values. Strange, ain't it.
2017 	 */
2018 	for (i = 0; i < INERR_BUCKETS; i++)
2019 		vald[i] = vals[i];
2020 
2021 	cJSON_AddItemToObject(root, key, cJSON_CreateDoubleArray(vald, INERR_BUCKETS));
2022 }
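
/*
 * Usage sketch, mirroring the call in worker_client_json() below: publish a
 * client's receive-error histogram under the "rx_errs" key of a cJSON object.
 *
 *	json_add_rxerrs(jc, "rx_errs", c->localaccount.rxerrs);
 */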
2023 
2024 /*
2025  *	Client state string
2026  */
2027 
2028 static const char *client_state_string(CStateEnum state)
2029 {
2030 	static const char *states[] = {
2031 		"unknown",
2032 		"init",
2033 		"login",
2034 		"logresp",
2035 		"connected",
2036 		"udp",
2037 		"corepeer"
2038 	};
2039 
2040 	switch (state) {
2041 	case CSTATE_CONNECTED:
2042 		return states[4];
2043 	case CSTATE_INIT:
2044 		return states[1];
2045 	case CSTATE_LOGIN:
2046 		return states[2];
2047 	case CSTATE_LOGRESP:
2048 		return states[3];
2049 	case CSTATE_UDP:
2050 		return states[5];
2051 	case CSTATE_COREPEER:
2052 		return states[6];
2053 	}
2054 
2055 	return states[0];
2056 }
2057 
2058 /*
2059  *	Fill worker client list for status display
2060  *	(called from another thread - watch out and lock!)
2061  */
2062 
2063 static struct cJSON *worker_client_json(struct client_t *c, int liveup_info)
2064 {
2065 	char addr_s[80];
2066 	char *s;
2067 	static const char *uplink_modes[] = {
2068 		"ro",
2069 		"multiro",
2070 		"full",
2071 		"peer"
2072 	};
2073 	const char *mode;
2074 
2075 	cJSON *jc = cJSON_CreateObject();
2076 	cJSON_AddNumberToObject(jc, "fd", c->fd);
2077 	cJSON_AddNumberToObject(jc, "id", c->fd);
2078 
2079 	/* additional information for live upgrade, not published */
2080 	if (liveup_info) {
2081 		cJSON_AddNumberToObject(jc, "listener_id", c->listener_id);
2082 		cJSON_AddStringToObject(jc, "state", client_state_string(c->state));
2083 		if (c->udp_port && c->udpclient)
2084 			cJSON_AddNumberToObject(jc, "udp_port", c->udp_port);
2085 
2086 		/* output buffer and input buffer data */
2087 		if (c->obuf_end - c->obuf_start > 0) {
2088 			s = hex_encode(c->obuf + c->obuf_start, c->obuf_end - c->obuf_start);
2089 			cJSON_AddStringToObject(jc, "obuf", s);
2090 			hfree(s);
2091 		}
2092 
2093 		if (c->ibuf_end > 0) {
2094 			s = hex_encode(c->ibuf, c->ibuf_end);
2095 			cJSON_AddStringToObject(jc, "ibuf", s);
2096 			hlog(LOG_DEBUG, "Encoded ibuf %d bytes: '%.*s'", c->ibuf_end, c->ibuf_end, c->ibuf);
2097 			hlog(LOG_DEBUG, "Hex: %s", s);
2098 			hfree(s);
2099 		}
2100 
2101 		/* If message routing for stations heard by this client is enabled,
2102 		 * dump the client_heard hash table, too.
2103 		 */
2104 		if (c->flags & CLFLAGS_IGATE)
2105 			cJSON_AddItemToObject(jc, "client_heard", client_heard_json(c->client_heard));
2106 	}
2107 
2108 	if (c->state == CSTATE_COREPEER) {
2109 		/* cut out ports in the name of security by obscurity */
2110 		strncpy(addr_s, c->addr_rem, sizeof(addr_s));
2111 		addr_s[sizeof(addr_s)-1] = 0;
2112 		if ((s = strrchr(addr_s, ':')))
2113 			*s = 0;
2114 		cJSON_AddStringToObject(jc, "addr_rem", addr_s);
2115 		strncpy(addr_s, c->addr_loc, sizeof(addr_s));
2116 		addr_s[sizeof(addr_s)-1] = 0;
2117 		if ((s = strrchr(addr_s, ':')))
2118 			*s = 0;
2119 		cJSON_AddStringToObject(jc, "addr_loc", addr_s);
2120 	} else {
2121 		cJSON_AddStringToObject(jc, "addr_rem", c->addr_rem);
2122 		cJSON_AddStringToObject(jc, "addr_loc", c->addr_loc);
2123 	}
2124 
2125 	//cJSON_AddStringToObject(jc, "addr_q", c->addr_hex);
2126 
2127 	if (c->udp_port && c->udpclient)
2128 		cJSON_AddNumberToObject(jc, "udp_downstream", 1);
2129 
2130 	cJSON_AddNumberToObject(jc, "t_connect", c->connect_time);
2131 	cJSON_AddNumberToObject(jc, "t_connect_tick", c->connect_tick);
2132 	cJSON_AddNumberToObject(jc, "since_connect", tick - c->connect_tick);
2133 	cJSON_AddNumberToObject(jc, "since_last_read", tick - c->last_read);
2134 	cJSON_AddStringToObject(jc, "username", c->username);
2135 	cJSON_AddStringToObject(jc, "app_name", c->app_name);
2136 	cJSON_AddStringToObject(jc, "app_version", c->app_version);
2137 	cJSON_AddNumberToObject(jc, "verified", c->validated);
2138 	cJSON_AddNumberToObject(jc, "obuf_q", c->obuf_end - c->obuf_start);
2139 	cJSON_AddNumberToObject(jc, "bytes_rx", c->localaccount.rxbytes);
2140 	cJSON_AddNumberToObject(jc, "bytes_tx", c->localaccount.txbytes);
2141 	cJSON_AddNumberToObject(jc, "pkts_rx", c->localaccount.rxpackets);
2142 	cJSON_AddNumberToObject(jc, "pkts_tx", c->localaccount.txpackets);
2143 	cJSON_AddNumberToObject(jc, "pkts_ign", c->localaccount.rxdrops);
2144 	cJSON_AddNumberToObject(jc, "pkts_dup", c->localaccount.rxdupes);
2145 	cJSON_AddNumberToObject(jc, "heard_count", c->client_heard_count);
2146 	cJSON_AddNumberToObject(jc, "courtesy_count", c->client_courtesy_count);
2147 
2148 	if (c->loc_known) {
2149 		cJSON_AddNumberToObject(jc, "lat", c->lat);
2150 		cJSON_AddNumberToObject(jc, "lng", c->lng);
2151 	}
2152 
2153 	if (c->quirks_mode)
2154 		cJSON_AddNumberToObject(jc, "quirks_mode", c->quirks_mode);
2155 
2156 	json_add_rxerrs(jc, "rx_errs", c->localaccount.rxerrs);
2157 
2158 	if (c->state == CSTATE_COREPEER) {
2159 		cJSON_AddStringToObject(jc, "mode", uplink_modes[3]);
2160 	} else if (c->flags & CLFLAGS_INPORT) {
2161 		/* client */
2162 		cJSON_AddStringToObject(jc, "filter", c->filter_s);
2163 	} else {
2164 		if (c->flags & CLFLAGS_UPLINKMULTI)
2165 			mode = uplink_modes[1];
2166 		else if (c->flags & CLFLAGS_PORT_RO)
2167 			mode = uplink_modes[0];
2168 		else
2169 			mode = uplink_modes[2];
2170 
2171 		cJSON_AddStringToObject(jc, "mode", mode);
2172 	}
2173 
2174 #ifdef USE_SSL
2175 	if (c->cert_subject[0])
2176 		cJSON_AddStringToObject(jc, "cert_subject", c->cert_subject);
2177 	if (c->cert_issuer[0])
2178 		cJSON_AddStringToObject(jc, "cert_issuer", c->cert_issuer);
2179 #endif
2180 
2181 	return jc;
2182 }
2183 
2184 int worker_client_list(cJSON *workers, cJSON *clients, cJSON *uplinks, cJSON *peers, cJSON *totals, cJSON *memory)
2185 {
2186 	struct worker_t *w = worker_threads;
2187 	struct client_t *c;
2188 	int pe;
2189 	int client_heard_count = 0;
2190 	int client_courtesy_count = 0;
2191 
2192 	while (w) {
2193 		if ((pe = pthread_mutex_lock(&w->clients_mutex))) {
2194 			hlog(LOG_ERR, "worker_client_list(worker %d): could not lock clients_mutex: %s", w->id, strerror(pe));
2195 			return -1;
2196 		}
2197 
2198 		cJSON *jw = cJSON_CreateObject();
2199 		cJSON_AddNumberToObject(jw, "id", w->id);
2200 		cJSON_AddNumberToObject(jw, "clients", w->client_count);
2201 		cJSON_AddNumberToObject(jw, "pbuf_incoming_count", w->pbuf_incoming_count);
2202 		cJSON_AddNumberToObject(jw, "pbuf_incoming_local_count", w->pbuf_incoming_local_count);
2203 
2204 		for (c = w->clients; (c); c = c->next) {
2205 			client_heard_count += c->client_heard_count;
2206 			client_courtesy_count += c->client_courtesy_count;
2207 
2208 			/* clients on hidden listener sockets are not shown */
2209 			/* if there is a huge number of clients, don't list them
2210 			 * - cJSON takes huge amounts of CPU to build the list
2211 			 * - web browser will die due to the big blob
2212 			 */
2213 			if (c->hidden || w->client_count > 1000)
2214 				continue;
2215 
2216 			cJSON *jc = worker_client_json(c, 0);
2217 
2218 			if (c->state == CSTATE_COREPEER) {
2219 				cJSON_AddItemToArray(peers, jc);
2220 			} else if (c->flags & CLFLAGS_INPORT) {
2221 				cJSON_AddItemToArray(clients, jc);
2222 			} else {
2223 				cJSON_AddItemToArray(uplinks, jc);
2224 			}
2225 		}
2226 
2227 		cJSON_AddItemToArray(workers, jw);
2228 
2229 		if ((pe = pthread_mutex_unlock(&w->clients_mutex))) {
2230 			hlog(LOG_ERR, "worker_client_list(worker %d): could not unlock clients_mutex: %s", w->id, strerror(pe));
2231 			/* we'd be heading for a deadlock here... */
2232 			exit(1);
2233 		}
2234 
2235 		w = w->next;
2236 	}
2237 
2238 	cJSON_AddNumberToObject(totals, "tcp_bytes_rx", client_connects_tcp.rxbytes);
2239 	cJSON_AddNumberToObject(totals, "tcp_bytes_tx", client_connects_tcp.txbytes);
2240 	cJSON_AddNumberToObject(totals, "tcp_pkts_rx", client_connects_tcp.rxpackets);
2241 	cJSON_AddNumberToObject(totals, "tcp_pkts_tx", client_connects_tcp.txpackets);
2242 	cJSON_AddNumberToObject(totals, "tcp_pkts_ign", client_connects_tcp.rxdrops);
2243 	cJSON_AddNumberToObject(totals, "udp_bytes_rx", client_connects_udp.rxbytes);
2244 	cJSON_AddNumberToObject(totals, "udp_bytes_tx", client_connects_udp.txbytes);
2245 	cJSON_AddNumberToObject(totals, "udp_pkts_rx", client_connects_udp.rxpackets);
2246 	cJSON_AddNumberToObject(totals, "udp_pkts_tx", client_connects_udp.txpackets);
2247 	cJSON_AddNumberToObject(totals, "udp_pkts_ign", client_connects_udp.rxdrops);
2248 	json_add_rxerrs(totals, "tcp_rx_errs", client_connects_tcp.rxerrs);
2249 	json_add_rxerrs(totals, "udp_rx_errs", client_connects_udp.rxerrs);
2250 #ifdef USE_SCTP
2251 	cJSON_AddNumberToObject(totals, "sctp_bytes_rx", client_connects_sctp.rxbytes);
2252 	cJSON_AddNumberToObject(totals, "sctp_bytes_tx", client_connects_sctp.txbytes);
2253 	cJSON_AddNumberToObject(totals, "sctp_pkts_rx", client_connects_sctp.rxpackets);
2254 	cJSON_AddNumberToObject(totals, "sctp_pkts_tx", client_connects_sctp.txpackets);
2255 	cJSON_AddNumberToObject(totals, "sctp_pkts_ign", client_connects_sctp.rxdrops);
2256 	json_add_rxerrs(totals, "sctp_rx_errs", client_connects_sctp.rxerrs);
2257 #endif
2258 
2259 #ifndef _FOR_VALGRIND_
2260 	struct cellstatus_t cellst;
2261 	cellstatus(client_cells, &cellst);
2262 	int used = cellst.cellcount - cellst.freecount;
2263 	cJSON_AddNumberToObject(memory, "client_cells_used", used);
2264 	cJSON_AddNumberToObject(memory, "client_cells_free", cellst.freecount);
2265 	cJSON_AddNumberToObject(memory, "client_used_bytes", used*cellst.cellsize_aligned);
2266 	cJSON_AddNumberToObject(memory, "client_allocated_bytes", (long)cellst.blocks * (long)cellst.block_size);
2267 	cJSON_AddNumberToObject(memory, "client_block_size", (long)cellst.block_size);
2268 	cJSON_AddNumberToObject(memory, "client_blocks", (long)cellst.blocks);
2269 	cJSON_AddNumberToObject(memory, "client_blocks_max", (long)cellst.blocks_max);
2270 	cJSON_AddNumberToObject(memory, "client_cell_size", cellst.cellsize);
2271 	cJSON_AddNumberToObject(memory, "client_cell_size_aligned", cellst.cellsize_aligned);
2272 	cJSON_AddNumberToObject(memory, "client_cell_align", cellst.alignment);
2273 #endif
2274 
2275 	return 0;
2276 }
2277