1 /*
2 * aprsc
3 *
4 * (c) Heikki Hannikainen, OH7LZB <hessu@hes.iki.fi>
5 *
6 * This program is licensed under the BSD license, which can be found
7 * in the file LICENSE.
8 *
9 */
10
11 /*
12 * worker.c: the worker thread
13 */
14
15 #include <errno.h>
16 #include <string.h>
17 #include <signal.h>
18 #include <time.h>
19 #include <stdlib.h>
20 #include <netinet/in.h>
21 #include <fcntl.h>
22
23 #include "worker.h"
24
25 #include "config.h"
26 #include "hlog.h"
27 #include "hmalloc.h"
28 #include "login.h"
29 #include "uplink.h"
30 #include "incoming.h"
31 #include "outgoing.h"
32 #include "filter.h"
33 #include "dupecheck.h"
34 #include "clientlist.h"
35 #include "client_heard.h"
36 #include "cellmalloc.h"
37 #include "version.h"
38 #include "status.h"
39 #include "sctp.h"
40
41
/* Time bases shared by all workers; both are maintained by the main
 * thread so workers never need to call time() themselves. */
time_t now;	/* current time, updated by the main thread, MAY be spun around by NTP */
time_t tick;	/* monotonous clock, may or may not be wallclock */

struct worker_t *worker_threads;	/* linked list of worker thread states */
struct client_udp_t *udppeers;	/* list of listening/receiving UDP peer sockets */

struct client_udp_t *udpclients;	/* list of listening/receiving UDP client sockets */
/* mutex to protect udpclient chain refcounts */
pthread_mutex_t udpclient_mutex = PTHREAD_MUTEX_INITIALIZER;

int workers_running;
int sock_write_expire = 25;	/* 25 seconds, smaller than the 30-second dupe check window. */
int keepalive_interval = 20;	/* 20 seconds for individual socket, NOT all in sync! */
#define KEEPALIVE_POLL_FREQ 2	/* keepalive analysis scan interval */
int obuf_writes_threshold = 16;	/* This many writes per keepalive scan interval switch socket
				   output to buffered. */

int obuf_writes_threshold_hys = 6;	/* Less than this, and switch back. */

/* global packet buffer: singly-linked chains of received packets, with
 * tail pointers (prevp) for O(1) append; protected by the rwlock */
rwlock_t pbuf_global_rwlock = RWL_INITIALIZER;
struct pbuf_t *pbuf_global = NULL;
struct pbuf_t **pbuf_global_prevp = &pbuf_global;
struct pbuf_t *pbuf_global_dupe = NULL;
struct pbuf_t **pbuf_global_dupe_prevp = &pbuf_global_dupe;


/* global inbound connects, and protocol traffic accounters */

struct portaccount_t inbound_connects = {
  .mutex    = PTHREAD_MUTEX_INITIALIZER,
  .refcount = 99,	/* Global static blocks have extra-high initial refcount */
};

/* global byte/packet counters per protocol */
struct portaccount_t client_connects_tcp = {
  .mutex    = PTHREAD_MUTEX_INITIALIZER,
  .refcount = 99,	/* Global static blocks have extra-high initial refcount */
};
struct portaccount_t client_connects_udp = {
  .mutex    = PTHREAD_MUTEX_INITIALIZER,
  .refcount = 99,	/* Global static blocks have extra-high initial refcount */
};
#ifdef USE_SCTP
struct portaccount_t client_connects_sctp = {
  .mutex    = PTHREAD_MUTEX_INITIALIZER,
  .refcount = 99,	/* Global static blocks have extra-high initial refcount */
};
#endif

/* UDP core peer clients, up to MAX_COREPEERS of them */
int worker_corepeer_client_count = 0;
struct client_t *worker_corepeer_clients[MAX_COREPEERS];

#ifndef _FOR_VALGRIND_
cellarena_t *client_cells;	/* cellmalloc arena for struct client_t */
#endif

/* clientlist collected at shutdown for live upgrade */
cJSON *worker_shutdown_clients = NULL;

static struct cJSON *worker_client_json(struct client_t *c, int liveup_info);
102
103 /* port accounters */
port_accounter_alloc(void)104 struct portaccount_t *port_accounter_alloc(void)
105 {
106 struct portaccount_t *p;
107
108 p = hmalloc(sizeof(*p));
109 memset(p, 0, sizeof(*p));
110
111 p->refcount = 1;
112 pthread_mutex_init( & p->mutex, NULL );
113
114 //hlog(LOG_DEBUG, "new port_accounter %p", p);
115
116 return p;
117 }
118
port_accounter_reject(struct portaccount_t * p)119 static void port_accounter_reject(struct portaccount_t *p)
120 {
121 int i;
122 if (!p) return;
123
124 if ((i = pthread_mutex_lock(&p->mutex))) {
125 hlog(LOG_ERR, "port_accounter_reject: could not lock portaccount: %s", strerror(i));
126 return;
127 }
128
129 ++ p->counter;
130
131 if ((i = pthread_mutex_unlock(&p->mutex))) {
132 hlog(LOG_ERR, "port_accounter_reject: could not unlock portaccount: %s", strerror(i));
133 return;
134 }
135 }
136
port_accounter_add(struct portaccount_t * p)137 static void port_accounter_add(struct portaccount_t *p)
138 {
139 int i;
140 if (!p) return;
141
142 if ((i = pthread_mutex_lock(&p->mutex))) {
143 hlog(LOG_ERR, "port_accounter_add: could not lock portaccount: %s", strerror(i));
144 return;
145 }
146
147 //hlog(LOG_DEBUG, "port_accounter_add %p", p);
148
149 ++ p->refcount;
150 ++ p->counter;
151 ++ p->gauge;
152
153 if (p->gauge > p->gauge_max)
154 p->gauge_max = p->gauge;
155
156 if ((i = pthread_mutex_unlock(&p->mutex))) {
157 hlog(LOG_ERR, "port_accounter_add: could not unlock portaccount: %s", strerror(i));
158 return;
159 }
160 }
161
port_accounter_drop(struct portaccount_t * p)162 void port_accounter_drop(struct portaccount_t *p)
163 {
164 int i, r;
165 if (!p) return;
166
167 if ((i = pthread_mutex_lock(&p->mutex))) {
168 hlog(LOG_ERR, "port_accounter_drop: could not lock portaccount: %s", strerror(i));
169 return;
170 }
171
172 -- p->refcount;
173 -- p->gauge;
174
175 r = p->refcount;
176
177 if ((i = pthread_mutex_unlock(&p->mutex))) {
178 hlog(LOG_ERR, "port_accounter_drop: could not unlock portaccount: %s", strerror(i));
179 return;
180 }
181
182 //hlog(LOG_DEBUG, "port_accounter_drop(%p) refcount=%d", p, r);
183
184 if (r == 0) {
185 /* Last reference is being destroyed */
186 hfree(p);
187 }
188 }
189
190 /*
191 * Global and port specific port usage counters
192 */
193
/*
 * Account one inbound connection event, both in the global
 * inbound_connects accounter and (optionally) in a port-specific
 * accounter p.
 *
 * The 'add' parameter encodes the event:
 *   -1 : rejected connect (count it, no gauge change)
 *    0 : client disconnect (gauge down)
 *    1 : client connect (gauge up, track max)
 *    2 : UDP "client" socket drop (per-port accounter only)
 *    3 : UDP "client" socket add (per-port accounter only)
 */
void inbound_connects_account(const int add, struct portaccount_t *p)
{	/* add == 2/3 --> UDP "client" socket drop/add, -1 --> rejected connect */
	int i;
	/* The global gauge is only touched for connection-level events
	 * (add < 2); UDP socket add/drop affects only the per-port
	 * accounter handled further below. */
	if (add < 2) {
		if ((i = pthread_mutex_lock(& inbound_connects.mutex ))) {
			hlog(LOG_ERR, "inbound_connects_account: could not lock inbound_connects: %s", strerror(i));
			return;
		}

		if (add == -1) {
			/* just increment connects, it was discarded */
			++ inbound_connects.counter;
		} else if (add) {
			++ inbound_connects.counter;
			++ inbound_connects.gauge;
			if (inbound_connects.gauge > inbound_connects.gauge_max)
				inbound_connects.gauge_max = inbound_connects.gauge;
		} else {
			-- inbound_connects.gauge;
		}

		if ((i = pthread_mutex_unlock(& inbound_connects.mutex )))
			hlog(LOG_ERR, "inbound_connects_account: could not unlock inbound_connects: %s", strerror(i));
	}

	/* per-port accounting: -1 rejects, odd values (1/3) add,
	 * even values (0/2) drop */
	if ( p ) {
		if ( add == -1 ) {
			port_accounter_reject(p);
		} else if ( add & 1 ) {
			port_accounter_add(p);
		} else {
			port_accounter_drop(p);
		}
	}

	// hlog( LOG_DEBUG, "inbound_connects_account(), count=%d gauge=%d max=%d",
	//       inbound_connects.count, inbound_connects.gauge, inbound_connects.gauge_max );
}
232
233 /* object alloc/free */
234
client_udp_free(struct client_udp_t * u)235 void client_udp_free(struct client_udp_t *u)
236 {
237 int i;
238
239 if (!u) return;
240
241 if ((i = pthread_mutex_lock(& udpclient_mutex ))) {
242 hlog(LOG_ERR, "client_udp_free: could not lock udpclient_mutex: %s", strerror(i));
243 return;
244 }
245
246 -- u->refcount;
247
248 if (u)
249 hlog(LOG_DEBUG, "client_udp_free %p port %d refcount now: %d", u, u->portnum, u->refcount);
250
251 if ( u->refcount == 0 ) {
252 hlog(LOG_DEBUG, "client_udp_free %p port %d FREEING", u, u->portnum);
253 /* Unchain, and destroy.. */
254 if (u->next)
255 u->next->prevp = u->prevp;
256 *u->prevp = u->next;
257
258 close(u->fd);
259
260 hfree(u);
261 }
262
263 if ((i = pthread_mutex_unlock(& udpclient_mutex )))
264 hlog(LOG_ERR, "client_udp_free: could not unlock udpclient_mutex: %s", strerror(i));
265 }
266
client_udp_find(struct client_udp_t * root,const int af,const int portnum)267 struct client_udp_t *client_udp_find(struct client_udp_t *root, const int af, const int portnum)
268 {
269 struct client_udp_t *u;
270 int i;
271
272 if ((i = pthread_mutex_lock(& udpclient_mutex ))) {
273 hlog(LOG_ERR, "client_udp_find: could not lock udpclient_mutex: %s", strerror(i));
274 return NULL;
275 }
276
277 for (u = root ; u ; u = u->next ) {
278 if (u->portnum == portnum && u->af == af) {
279 ++ u->refcount;
280 break;
281 }
282 }
283
284 //if (u)
285 // hlog(LOG_DEBUG, "client_udp_find %u port %d refcount now: %d", u, u->portnum, u->refcount);
286
287 if ((i = pthread_mutex_unlock(& udpclient_mutex )))
288 hlog(LOG_ERR, "client_udp_find: could not unlock udpclient_mutex: %s", strerror(i));
289
290 return u;
291 }
292
293
client_udp_alloc(struct client_udp_t ** root,int fd,int portnum)294 struct client_udp_t *client_udp_alloc(struct client_udp_t **root, int fd, int portnum)
295 {
296 struct client_udp_t *c;
297 int i;
298
299 /* TODO: hm, could maybe lock a bit later, just before adding to the udpclient list? */
300 if ((i = pthread_mutex_lock(& udpclient_mutex ))) {
301 hlog(LOG_ERR, "client_udp_alloc: could not lock udpclient_mutex: %s", strerror(i));
302 return NULL;
303 }
304
305 c = hmalloc(sizeof(*c));
306 c->polled = 0;
307 c->fd = fd;
308 c->refcount = 1; /* One reference already on creation */
309 c->portnum = portnum;
310
311 /* Add this to special list of UDP sockets */
312 c->next = *root;
313 c->prevp = root;
314 if (c->next)
315 c->next->prevp = &c->next;
316 *root = c;
317
318 //hlog(LOG_DEBUG, "client_udp_alloc %u port %d refcount now: %d", c, c->portnum, c->refcount);
319
320 if ((i = pthread_mutex_unlock(& udpclient_mutex )))
321 hlog(LOG_ERR, "client_udp_alloc: could not lock udpclient_mutex: %s", strerror(i));
322
323 return c;
324 }
325
326 /*
327 * Close and free all UDP core peers
328 */
329
corepeer_close_all(struct worker_t * self)330 static void corepeer_close_all(struct worker_t *self)
331 {
332 int i;
333 struct client_t *c;
334
335 for (i = 0; i < worker_corepeer_client_count; i++) {
336 c = worker_corepeer_clients[i];
337 client_close(self, c, CLIOK_PEERS_CLOSING);
338 worker_corepeer_clients[i] = NULL;
339 }
340
341 worker_corepeer_client_count = 0;
342 }
343
344
345 /*
346 * set up cellmalloc for clients
347 */
348
/*
 * Set up the cellmalloc arena used for allocating client structures.
 * With valgrind builds the arena is skipped and plain hmalloc is used
 * instead (see client_alloc).
 */
void client_init(void)
{
#ifndef _FOR_VALGRIND_
	client_cells  = cellinit( "clients",
				  sizeof(struct client_t),
				  __alignof__(struct client_t), CELLMALLOC_POLICY_FIFO,
				  4096 /* 4 MB at the time */, 0 /* minfree */ );
	/* 4 MB arena size -> about 100 clients per single arena
	   .. with 40 arenas -> 4000 clients max. */
#endif
}
360
/*
 * Allocate and zero-initialize a client structure, set up its I/O
 * buffers and time stamps. Returns NULL if the cellmalloc arena is
 * exhausted. The returned client starts in CSTATE_INIT with no socket
 * (fd == -1).
 */
struct client_t *client_alloc(void)
{
#ifndef _FOR_VALGRIND_
	struct client_t *c = cellmalloc(client_cells);
	if (!c) {
		hlog(LOG_ERR, "client_alloc: cellmalloc failed");
		return NULL;
	}
#else
	struct client_t *c = hmalloc(sizeof(*c));
#endif
	memset((void *)c, 0, sizeof(*c));
	c->fd = -1;
	c->state = CSTATE_INIT;

#ifdef FIXED_IOBUFS
	/* I/O buffers are embedded arrays within struct client_t */
	c->ibuf_size = sizeof(c->ibuf);
	c->obuf_size = sizeof(c->obuf);
#else
	/* I/O buffers are allocated separately, sized by globals */
	c->ibuf_size = ibuf_size;
	c->obuf_size = obuf_size;

	c->ibuf      = hmalloc(c->ibuf_size);
	c->obuf      = hmalloc(c->obuf_size);
#endif

	c->connect_time = now;
	c->connect_tick = tick;
	c->obuf_wtime = tick;

	/* deadline for cleaning up a client that never completes login */
	c->cleanup = tick + 120;

	return c;
}
395
/*
 * Free a client structure and everything hanging off it: socket,
 * I/O buffers, filters, heard lists, UDP socket reference, clientlist
 * entry and (when built with SSL) the SSL connection state.
 * The memory is wiped before being returned to the allocator.
 */
void client_free(struct client_t *c)
{
	//hlog(LOG_DEBUG, "client_free %p: fd %d name %s addr_loc %s udpclient %p", c, c->fd, c->username, c->addr_loc, c->udpclient);

	if (c->fd >= 0)	 close(c->fd);
#ifndef FIXED_IOBUFS
	if (c->ibuf)	 hfree(c->ibuf);
	if (c->obuf)	 hfree(c->obuf);
#endif

	/* release all four filter chains */
	filter_free(c->posdefaultfilters);
	filter_free(c->negdefaultfilters);
	filter_free(c->posuserfilters);
	filter_free(c->neguserfilters);

	client_heard_free(c);

	/* drops one reference on the shared UDP socket, if any */
	client_udp_free(c->udpclient);
	clientlist_remove(c);

#ifdef USE_SSL
	if (c->ssl_con)
		ssl_free_connection(c);
#endif

	/* wipe the structure before returning it to the pool, so that a
	 * stale pointer dereference fails loudly instead of reading old
	 * client data */
	memset(c, 0, sizeof(*c));

#ifndef _FOR_VALGRIND_
	cellfree(client_cells, c);
#else
	hfree(c);
#endif
}
429
430 /*
431 * Set up a pseudoclient for UDP and HTTP submitted packets
432 */
433
pseudoclient_setup(int portnum)434 struct client_t *pseudoclient_setup(int portnum)
435 {
436 struct client_t *c;
437
438 c = client_alloc();
439 if (!c) {
440 hlog(LOG_ERR, "pseudoclient_setup: client_alloc returned NULL");
441 abort();
442 }
443 c->fd = -1;
444 c->portnum = portnum;
445 c->state = CSTATE_CONNECTED;
446 c->flags = CLFLAGS_INPORT|CLFLAGS_CLIENTONLY;
447 c->validated = VALIDATED_WEAK; // we will validate on every packet
448 //c->portaccount = l->portaccount;
449 c->keepalive = tick;
450 c->last_read = tick;
451
452 //hlog(LOG_DEBUG, "pseudoclient setup %p: fd %d name %s addr_loc %s udpclient %p", c, c->fd, c->username, c->addr_loc, c->udpclient);
453
454 return c;
455 }
456
457 /*
458 * set client socket options, return -1 on serious errors, just log smaller ones
459 */
460
/*
 * Set socket options on a newly accepted client socket: non-blocking
 * mode and TCP keepalives (SCTP clients get their own option set).
 * Returns -1 only on the serious failure of setting non-blocking
 * mode; keepalive setsockopt failures are just logged.
 */
int set_client_sockopt(struct client_t *c)
{
	/* set non-blocking mode */
	if (fcntl(c->fd, F_SETFL, O_NONBLOCK)) {
		hlog(LOG_ERR, "%s - Failed to set non-blocking mode on socket: %s", c->addr_rem, strerror(errno));
		return -1;
	}

#ifdef USE_SCTP
	/* set socket options specific to SCTP clients */
	if (c->ai_protocol == IPPROTO_SCTP)
		return sctp_set_client_sockopt(c);
#endif

	/* Set up TCP keepalives, so that we'll notice idle clients.
	 * I'm not sure if this is absolutely required, since we send
	 * keepalive datetime messages every 30 seconds from the application,
	 * but it can't hurt.
	 */
#ifdef SO_KEEPALIVE
	int keepalive_arg;
#ifdef TCP_KEEPIDLE
	/* start sending keepalives after socket has been idle for 10 minutes */
	keepalive_arg = 10*60;
	if (setsockopt(c->fd, IPPROTO_TCP, TCP_KEEPIDLE, (void *)&keepalive_arg, sizeof(keepalive_arg)))
		hlog(LOG_ERR, "%s - setsockopt(TCP_KEEPIDLE, %d) failed: %s", c->addr_rem, keepalive_arg, strerror(errno));
#endif
#ifdef TCP_KEEPINTVL
	/* send keepalive probes every 20 seconds after the idle time has passed */
	keepalive_arg = 20;
	if (setsockopt(c->fd, IPPROTO_TCP, TCP_KEEPINTVL, (void *)&keepalive_arg, sizeof(keepalive_arg)))
		hlog(LOG_ERR, "%s - setsockopt(TCP_KEEPINTVL, %d) failed: %s", c->addr_rem, keepalive_arg, strerror(errno));
#endif
#ifdef TCP_KEEPCNT
	/* send 3 probes before giving up */
	keepalive_arg = 3;
	if (setsockopt(c->fd, IPPROTO_TCP, TCP_KEEPCNT, (void *)&keepalive_arg, sizeof(keepalive_arg)))
		hlog(LOG_ERR, "%s - setsockopt(TCP_KEEPCNT, %d) failed: %s", c->addr_rem, keepalive_arg, strerror(errno));
#endif
	/* enable TCP keepalives */
	keepalive_arg = 1;
	if (setsockopt(c->fd, SOL_SOCKET, SO_KEEPALIVE, (void *)&keepalive_arg, sizeof(keepalive_arg)))
		hlog(LOG_ERR, "%s - setsockopt(TCP_KEEPALIVE, %d) failed: %s", c->addr_rem, keepalive_arg, strerror(errno));
#endif

	return 0;
}
508
/*
 * Socket options applied after the client has logged in.
 * Currently only enables TCP_NODELAY; a failure is logged but not
 * treated as fatal (always returns 0).
 */
int set_client_sockopt_post_login(struct client_t *c)
{
	/* Use TCP_NODELAY for APRS-IS sockets. High delays can cause packets getting past
	 * the dupe filters.
	 */
#ifdef TCP_NODELAY
	int arg = 1;
	if (setsockopt(c->fd, IPPROTO_TCP, TCP_NODELAY, (void *)&arg, sizeof(arg)))
		hlog(LOG_ERR, "%s - setsockopt(TCP_NODELAY, %d) failed: %s", c->addr_rem, arg, strerror(errno));
#endif

	return 0;
}
522
523
524 /*
525 * Pass a new client to a worker thread
526 */
527
pass_client_to_worker(struct worker_t * wc,struct client_t * c)528 int pass_client_to_worker(struct worker_t *wc, struct client_t *c)
529 {
530 int pe;
531
532 hlog(LOG_DEBUG, "pass_client_to_worker: client on fd %d to thread %d with %d users", c->fd, wc->id, wc->client_count);
533
534 if ((pe = pthread_mutex_lock(&wc->new_clients_mutex))) {
535 hlog(LOG_ERR, "pass_client_to_worker(): could not lock new_clients_mutex: %s", strerror(pe));
536 return -1;
537 }
538
539 /* push the client in the worker's queue */
540 c->next = NULL;
541
542 if (wc->new_clients_last) {
543 wc->new_clients_last->next = c;
544 c->prevp = &wc->new_clients_last->next;
545 } else {
546 wc->new_clients = c;
547 c->prevp = &wc->new_clients;
548 }
549
550 wc->new_clients_last = c;
551
552 /* unlock the queue */
553 if ((pe = pthread_mutex_unlock(&wc->new_clients_mutex))) {
554 hlog(LOG_ERR, "pass_client_to_worker(): could not unlock new_clients_mutex: %s", strerror(pe));
555 return -1;
556 }
557
558 return 0;
559 }
560
strsockaddr(const struct sockaddr * sa,const int addr_len)561 char *strsockaddr(const struct sockaddr *sa, const int addr_len)
562 {
563 char eb[200], *s;
564 char sbuf[20];
565 union sockaddr_u su, *sup;
566
567 sup = (union sockaddr_u *)sa;
568 #ifdef IN6_IS_ADDR_V4MAPPED
569 if ( sa->sa_family == AF_INET6 &&
570 ( IN6_IS_ADDR_V4MAPPED(&(sup->si6.sin6_addr)) ||
571 IN6_IS_ADDR_V4COMPAT(&(sup->si6.sin6_addr)) ) ) {
572
573 memset(&su, 0, sizeof(su));
574 su.si.sin_family = AF_INET;
575 su.si.sin_port = sup->si6.sin6_port;
576 memcpy(& su.si.sin_addr, &((uint32_t*)(&(sup->si6.sin6_addr)))[3], 4);
577 sa = &su.sa;
578 // sup = NULL;
579 // hlog(LOG_DEBUG, "Translating v4 mapped/compat address..");
580 }
581 #endif
582
583
584 if ( sa->sa_family == AF_INET ) {
585 eb[0] = 0;
586 sbuf[0] = 0;
587
588 getnameinfo( sa, addr_len,
589 eb, sizeof(eb), sbuf, sizeof(sbuf), NI_NUMERICHOST|NI_NUMERICSERV);
590 s = eb + strlen(eb);
591
592 sprintf(s, ":%s", sbuf);
593 } else {
594 /* presumption: IPv6 */
595 eb[0] = '[';
596 eb[1] = 0;
597 sbuf[0] = 0;
598
599 getnameinfo( sa, addr_len,
600 eb+1, sizeof(eb)-1, sbuf, sizeof(sbuf), NI_NUMERICHOST|NI_NUMERICSERV);
601 s = eb + strlen(eb);
602
603 sprintf(s, "]:%s", sbuf);
604 }
605
606 // if (!sup) hlog(LOG_DEBUG, "... to: %s", eb);
607
608 return hstrdup(eb);
609 }
610
611 /*
612 * Generate a hexadecimal representation of the socket
613 * address, as used in the APRS-IS Q construct.
614 */
615
/*
 * Generate the hexadecimal representation of a socket address, as
 * used in the APRS-IS Q construct. V4-mapped/compat IPv6 addresses
 * are first translated to plain IPv4. Returns an hstrdup()ed string.
 */
char *hexsockaddr(const struct sockaddr *sa, const int addr_len)
{
	char eb[200];
	union sockaddr_u su, *sup;
	struct sockaddr_in *sa_in;
	uint8_t *in6;

	sup = (union sockaddr_u *)sa;
#ifdef IN6_IS_ADDR_V4MAPPED
	if ( sa->sa_family == AF_INET6 &&
	     ( IN6_IS_ADDR_V4MAPPED(&(sup->si6.sin6_addr)) ||
	       IN6_IS_ADDR_V4COMPAT(&(sup->si6.sin6_addr)) ) ) {

		/* rewrite as a plain IPv4 sockaddr: the v4 address lives
		 * in the last 4 bytes of the v6 address */
		memset(&su, 0, sizeof(su));
		su.si.sin_family = AF_INET;
		su.si.sin_port   = sup->si6.sin6_port;
		memcpy(& su.si.sin_addr, &((uint32_t*)(&(sup->si6.sin6_addr)))[3], 4);
		sa = &su.sa;
	}
#endif

	/* As per the original implementation's example, the hex address is in upper case.
	 * For IPv6, there's simply more bytes in there.
	 *
	 * NOTE(review): the IPv4 branch extracts bytes from s_addr (which
	 * is in network byte order) with shifts, so the printed byte order
	 * depends on host endianness - on a little-endian host this yields
	 * network order. Confirm behavior if ever built for big-endian.
	 */
	if ( sa->sa_family == AF_INET ) {
		sa_in = (struct sockaddr_in *)sa;
		sprintf(eb, "%02X%02X%02X%02X",
			sa_in->sin_addr.s_addr & 0xff,
			(sa_in->sin_addr.s_addr >> 8) & 0xff,
			(sa_in->sin_addr.s_addr >> 16) & 0xff,
			(sa_in->sin_addr.s_addr >> 24) & 0xff
			);
	} else {
		/* presumption: IPv6 */
		in6 = (uint8_t *)&sup->si6.sin6_addr.s6_addr;
		sprintf(eb, "%02X%02X%02X%02X%02X%02X%02X%02X%02X%02X%02X%02X%02X%02X%02X%02X",
			in6[0], in6[1], in6[2], in6[3], in6[4], in6[5], in6[6], in6[7],
			in6[8], in6[9], in6[10], in6[11], in6[12], in6[13], in6[14], in6[15]);
	}

	return hstrdup(eb);
}
662
/*
 * Add received/transmitted byte and packet counts to a client's
 * accounting: the worker-local per-client counters (lock-free), the
 * per-port accounter, and the global per-protocol accounter.
 *
 * A negative rxerr marks one dropped packet; its absolute value
 * selects the rxerrs[] error bucket (values below INERR_MIN map to
 * INERR_UNKNOWN, which is 0).
 */
void clientaccount_add(struct client_t *c, int l4proto, int rxbytes, int rxpackets, int txbytes, int txpackets, int rxerr, int rxdupes)
{
	struct portaccount_t *pa = NULL;
	int rxdrops = 0;

	if (rxerr < 0) {
		rxdrops = 1;
		if (rxerr < INERR_MIN)
			rxerr = INERR_UNKNOWN; /* which is 0 */
		rxerr *= -1;
	}

	/* worker local accounters do not need locks */
	c->localaccount.rxbytes   += rxbytes;
	c->localaccount.txbytes   += txbytes;
	c->localaccount.rxpackets += rxpackets;
	c->localaccount.txpackets += txpackets;
	c->localaccount.rxdupes   += rxdupes;
	if (rxdrops) {
		c->localaccount.rxdrops += 1;
		c->localaccount.rxerrs[rxerr] += 1;
	}

	/* pick the port accounter: UDP traffic goes to the UDP socket's
	 * accounter if present, otherwise the client's own */
	if (l4proto == IPPROTO_UDP && c->udpclient && c->udpclient->portaccount) {
		pa = c->udpclient->portaccount;
	} else if (c->portaccount) {
		pa = c->portaccount;
	}

	if (pa) {
#ifdef HAVE_SYNC_FETCH_AND_ADD
		/* shared between workers: use atomic increments when available */
		__sync_fetch_and_add(&pa->rxbytes, rxbytes);
		__sync_fetch_and_add(&pa->txbytes, txbytes);
		__sync_fetch_and_add(&pa->rxpackets, rxpackets);
		__sync_fetch_and_add(&pa->txpackets, txpackets);
		__sync_fetch_and_add(&pa->rxdupes, rxdupes);
		if (rxdrops) {
			__sync_fetch_and_add(&pa->rxdrops, 1);
			__sync_fetch_and_add(&pa->rxerrs[rxerr], 1);
		}
#else
		// FIXME: MUTEX !! -- this may or may not need locks..
		pa->rxbytes   += rxbytes;
		pa->txbytes   += txbytes;
		pa->rxpackets += rxpackets;
		pa->txpackets += txpackets;
		pa->rxdupes   += rxdupes;
		if (rxdrops) {
			pa->rxdrops += 1;
			pa->rxerrs[rxerr] += 1;
		}
#endif
	}

	/* select the global per-protocol accounter; unknown protocols
	 * are not accounted */
	struct portaccount_t *proto;

	if (l4proto == IPPROTO_TCP)
		proto = &client_connects_tcp;
	else if (l4proto == IPPROTO_UDP)
		proto = &client_connects_udp;
#ifdef USE_SCTP
	else if (l4proto == IPPROTO_SCTP)
		proto = &client_connects_sctp;
#endif
	else
		return;

#ifdef HAVE_SYNC_FETCH_AND_ADD
	__sync_fetch_and_add(&proto->rxbytes, rxbytes);
	__sync_fetch_and_add(&proto->txbytes, txbytes);
	__sync_fetch_and_add(&proto->rxpackets, rxpackets);
	__sync_fetch_and_add(&proto->txpackets, txpackets);
	if (rxdrops) {
		__sync_fetch_and_add(&proto->rxdrops, 1);
		__sync_fetch_and_add(&proto->rxerrs[rxerr], 1);
	}
#else
	// FIXME: MUTEX !! -- this may or may not need locks..
	proto->rxbytes   += rxbytes;
	proto->txbytes   += txbytes;
	proto->rxpackets += rxpackets;
	proto->txpackets += txpackets;
	if (rxdrops) {
		proto->rxdrops += 1;
		proto->rxerrs[rxerr] += 1;
	}
#endif
}
751
protocol_str(struct client_t * c)752 static const char *protocol_str(struct client_t *c)
753 {
754 static const char unknown[] = "UNKNOWN-PROTOCOL";
755 static const char tcp[] = "TCP";
756 static const char tcp_udp[] = "TCP+UDP";
757 static const char udp[] = "UDP";
758
759 if (c->ai_protocol == IPPROTO_TCP) {
760 if (c->udp_port)
761 return tcp_udp;
762
763 return tcp;
764 }
765
766 if (c->ai_protocol == IPPROTO_UDP)
767 return udp;
768
769 #ifdef USE_SCTP
770 static const char sctp[] = "SCTP";
771
772 if (c->ai_protocol == IPPROTO_SCTP)
773 return sctp;
774 #endif
775
776 return unknown;
777 }
778
779 /*
780 * close and forget a client connection
781 */
782
/*
 * Close a client connection: log its statistics, remove it from the
 * polling list and the worker's client lists, do connection
 * accounting, and free the client structure.
 *
 * errnum >= 0 is a system errno; negative values are aprsc-internal
 * codes decoded by aprsc_strerror().
 *
 * WARNING: the client structure is destroyed - the caller must not
 * touch c after this returns.
 */
void client_close(struct worker_t *self, struct client_t *c, int errnum)
{
	int pe;

	// TODO: log validation status, ssl status, ssl cert info, tcp/sctp

	hlog( LOG_INFO, "%s %s %s (%s) closed after %d s: %s, tx/rx %lld/%lld bytes %lld/%lld pkts, dropped %lld, fd %d, worker %d%s%s%s%s",
	      ( (c->flags & CLFLAGS_UPLINKPORT)
	          ? ((c->state == CSTATE_COREPEER) ? "Peer" : "Uplink") : "Client" ),
	      protocol_str(c),
	      c->addr_rem,
	      ((c->username[0]) ? c->username : "?"),
	      tick - c->connect_tick,
	      ((errnum >= 0) ? strerror(errnum) : aprsc_strerror(errnum)),
	      c->localaccount.txbytes,
	      c->localaccount.rxbytes,
	      c->localaccount.txpackets,
	      c->localaccount.rxpackets,
	      c->localaccount.rxdrops,
	      c->fd,
	      self->id,
	      (*c->app_name) ? " app " : "",
	      (*c->app_name) ? c->app_name : "",
	      (*c->app_version) ? " ver " : "",
	      (*c->app_version) ? c->app_version : ""
	);

	/* log a breakdown of dropped-packet reasons, if any were dropped */
	if (c->localaccount.rxdrops) {
		char s[256] = "";
		int p = 0;
		int i;

		for (i = 0; i < INERR_BUCKETS; i++) {
			if (c->localaccount.rxerrs[i]) {
				p += snprintf(s+p, 256-p-2, "%s%s %lld",
					(p == 0) ? "" : ", ",
					inerr_labels[i], c->localaccount.rxerrs[i]);
			}
		}

		hlog(LOG_INFO, "%s (%s) rx drops: %s",
			c->addr_rem, c->username, s);
	}

	/* remove from polling list */
	if (c->xfd) {
		//hlog(LOG_DEBUG, "client_close: xpoll_remove %p fd %d", c->xfd, c->xfd->fd);
		xpoll_remove(&self->xp, c->xfd);
	}

	/* close */
	if (c->fd >= 0) {
		close(c->fd);
	}

	c->fd = -1;

	/* If this thread already owns the mutex (we're closing the socket
	 * while traversing the thread's client list), FreeBSD's mutex lock
	 * will fail with EDEADLK:
	 *
	 * 2012/08/15 10:03:34.065703 aprsc[41159:800f12fc0] ERROR:
	 * client_close(worker 1): could not lock clients_mutex: Resource deadlock
	 * avoided
	 *
	 * If this happens, let's remember we've locked the mutex earlier,
	 * and let's not unlock it either.
	 *
	 * The current example of this is when collect_new_clients() sends
	 * the "# aprsc VERSION" login string to new clients. The client_printf()
	 * may fail if the client connects and disconnects very quickly, and
	 * this will cause it to client_close() during the collection loop.
	 */
	if ((pe = pthread_mutex_lock(&self->clients_mutex)) && pe != EDEADLK) {
		hlog(LOG_ERR, "client_close(worker %d): could not lock clients_mutex: %s", self->id, strerror(pe));
		return;
	}
	if (pe == EDEADLK) {
		hlog(LOG_ERR, "client_close(worker %d): could not lock clients_mutex (ignoring): %s", self->id, strerror(pe));
	}

	/* link the list together over this node */
	if (c->next)
		c->next->prevp = c->prevp;
	*c->prevp = c->next;

	/* link the classified clients list together over this node, but only if
	 * the client has fully logged in and classification has been done
	 */
	if (c->class_prevp) {
		*c->class_prevp = c->class_next;
		if (c->class_next)
			c->class_next->class_prevp = c->class_prevp;
	}

	/* If this happens to be the uplink, tell the uplink connection
	 * setup module that the connection has gone away.
	 */
	if (c->flags & CLFLAGS_UPLINKPORT && c->state != CSTATE_COREPEER)
		uplink_close(c, errnum);

	if (c->portaccount) {
		/* If port accounting is done, handle population accounting... */
		//hlog(LOG_DEBUG, "client_close dropping inbound_connects_account %p", c->portaccount);
		inbound_connects_account(0, c->portaccount);
		c->portaccount = NULL;
	} else {
		hlog(LOG_DEBUG, "client_close: has no portaccount");
	}

	/* a TCP client with a UDP downstream also drops the UDP gauge */
	if (c->udp_port && c->udpclient->portaccount) {
		inbound_connects_account(2, c->udpclient->portaccount); /* udp client count goes down */
	}

	/* free it up */
	client_free(c);

	/* if we held the lock before locking, let's not unlock it either */
	if (pe == EDEADLK) {
		hlog(LOG_ERR, "client_close(worker %d): closed client while holding clients_mutex", self->id);
	} else {
		if ((pe = pthread_mutex_unlock(&self->clients_mutex))) {
			hlog(LOG_ERR, "client_close(worker %d): could not unlock clients_mutex: %s", self->id, strerror(pe));
			exit(1);
		}
	}

	/* reduce client counter */
	self->client_count--;
}
913
/*
 * Transmit a packet to a client's UDP submission port.
 * The packet in p is CRLF-terminated, but the trailing 2 bytes are
 * NOT sent over UDP. Returns sendto()'s return value (bytes sent,
 * or negative on error).
 */
int udp_client_write(struct worker_t *self, struct client_t *c, char *p, int len)
{
	/* Every packet ends with CRLF, but they are not sent over UDP ! */
	/* Existing system doesn't send keepalives via UDP.. */
	int i = sendto( c->udpclient->fd, p, len-2, MSG_DONTWAIT,
			&c->udpaddr.sa, c->udpaddrlen );

	if (i < 0) {
		hlog(LOG_ERR, "UDP transmit error to %s udp port %d: %s",
			c->addr_rem, c->udp_port, strerror(errno));
	} else if (i != len -2) {
		/* NOTE(review): errno is likely stale in this branch - a
		 * short sendto() succeeds and does not set errno. */
		hlog(LOG_ERR, "UDP transmit incomplete to %s udp port %d: wrote %d of %d bytes, errno: %s",
			c->addr_rem, c->udp_port, i, len-2, strerror(errno));
	}

	// hlog( LOG_DEBUG, "UDP from %d to client port %d, sendto rc=%d", c->udpclient->portnum, c->udp_port, i );

	/* only account bytes actually transmitted */
	if (i > 0)
		clientaccount_add( c, IPPROTO_UDP, 0, 0, i, 0, 0, 0);

	return i;
}
936
937 /*
938 * Put outgoing data in obuf
939 */
940
/*
 * Append len bytes from p to client c's output buffer, first
 * compacting the buffer (moving unsent data to the front) if the new
 * data would not fit at the end. Returns 0 on success; if the data
 * cannot fit even after compaction, the client is closed (and its
 * structure FREED) and -12 is returned.
 */
static int client_buffer_outgoing_data(struct worker_t *self, struct client_t *c, char *p, int len)
{
	if (c->obuf_end + len > c->obuf_size) {
		/* Oops, cannot append the data to the output buffer.
		 * Check if we can make space for it by moving data
		 * towards the beginning of the buffer?
		 */
		if (len > c->obuf_size - (c->obuf_end - c->obuf_start)) {
			/* Oh crap, the data will not fit even if we move stuff. */
			hlog(LOG_DEBUG, "client_write(%s) can not fit new data in buffer; disconnecting", c->addr_rem);
			client_close(self, c, CLIERR_OUTPUT_BUFFER_FULL);
			return -12;
		}

		/* okay, move stuff to the beginning to make space in the end */
		if (c->obuf_start > 0)
			memmove((void *)c->obuf, (void *)c->obuf + c->obuf_start, c->obuf_end - c->obuf_start);
		c->obuf_end -= c->obuf_start;
		c->obuf_start = 0;
	}

	/* copy data to the output buffer */
	if (len > 0)
		memcpy((void *)c->obuf + c->obuf_end, p, len);
	c->obuf_end += len;

	return 0;
}
969
970 /*
971 * write data to a client (well, at least put it in the output buffer)
972 * (this is also used with len=0 to flush current buffer)
973 */
974
#ifdef USE_SSL
/*
 * Write data to an SSL client: buffer it, and hand off to ssl_write()
 * once the buffer passes the flush threshold, or when called with
 * len == 0 (explicit flush) and buffered data is pending. Otherwise
 * the data is just buffered and the poller is told there is outgoing
 * data. Returns len on buffer, ssl_write()'s result on flush, or -12
 * if the buffer overflowed (client destroyed).
 */
static int ssl_client_write(struct worker_t *self, struct client_t *c, char *p, int len)
{
	/* counted so the keepalive scan can decide on buffered vs
	 * immediate output mode */
	c->obuf_writes++;

	if (len > 0)
		clientaccount_add( c, c->ai_protocol, 0, 0, len, 0, 0, 0);

	if (client_buffer_outgoing_data(self, c, p, len) == -12)
		return -12;

	if (c->obuf_end > c->obuf_flushsize || ((len == 0) && (c->obuf_end > c->obuf_start)))
		return ssl_write(self, c);

	/* tell the poller that we have outgoing data */
	xpoll_outgoing(&self->xp, c->xfd, 1);

	/* just buffer */
	return len;
}
#endif
996
/*
 * Write data to a TCP client: buffer it in obuf, and flush to the
 * socket when past the flush threshold or when called with len == 0
 * (explicit flush). A TCP client with a UDP downstream socket gets
 * non-comment packets via udp_client_write() instead.
 *
 * Returns len on success/buffering, -1 on a transient EAGAIN, and
 * -9/-11/-12 on failures that destroy the client structure.
 */
static int tcp_client_write(struct worker_t *self, struct client_t *c, char *p, int len)
{
	int i, e;

	//hlog(LOG_DEBUG, "client_write: %*s\n", len, p);

	/* a TCP client with a udp downstream socket? Comments ('#'
	 * keepalives) still go via TCP. */
	if (c->udp_port && c->udpclient && len > 0 && *p != '#') {
		return udp_client_write(self, c, p, len);
	}

	/* Count the number of writes towards this client,  the keepalive
	   manager monitors this counter to determine if the socket should be
	   kept in BUFFERED mode, or written immediately every time.
	   Buffer flushing is done every KEEPALIVE_POLL_FREQ (2) seconds.
	*/
	c->obuf_writes++;

	if (len > 0) {
		/* Here, we only increment the bytes counter. Packets counter
		 * will be incremented only when we actually transmit a packet
		 * instead of a keepalive.
		 */
		clientaccount_add( c, c->ai_protocol, 0, 0, len, 0, 0, 0);
	}

	if (client_buffer_outgoing_data(self, c, p, len) == -12)
		return -12;

	/* Is it over the flush size ? */
	if (c->obuf_end > c->obuf_flushsize || ((len == 0) && (c->obuf_end > c->obuf_start))) {
		/* TODO: move this code to client_try_write and call it */

		/*if (c->obuf_end > c->obuf_flushsize)
		 *	hlog(LOG_DEBUG, "flushing fd %d since obuf_end %d > %d", c->fd, c->obuf_end, c->obuf_flushsize);
		 */
	write_retry_2:;
		i = write(c->fd, c->obuf + c->obuf_start, c->obuf_end - c->obuf_start);
		e = errno;
		/* restart the write if a signal interrupted it */
		if (i < 0 && e == EINTR)
			goto write_retry_2;
		if (i < 0 && e == EPIPE) {
			/* Remote socket closed.. */
			hlog(LOG_DEBUG, "client_write(%s) fails/2 EPIPE; disconnecting; %s", c->addr_rem, strerror(e));
			// WARNING: This also destroys the client object!
			client_close(self, c, e);
			return -9;
		}
		if (i < 0 && (e == EAGAIN || e == EWOULDBLOCK)) {
			/* Kernel's transmit buffer is full (per-socket or some more global resource).
			 * This happens even with small amounts of data in real world:
			 * aprsc INFO: Client xx.yy.zz.ff:22823 (XXXXX) closed after 1 s:
			 * Resource temporarily unavailable, tx/rx 735/51 bytes 8/0 pkts,
			 * dropped 0, fd 59, worker 1 app aprx ver 2.00
			 */
			hlog(LOG_DEBUG, "client_write(%s) fails/2c; %s", c->addr_rem, strerror(e));
			return -1;
		}
		if (i < 0 && len != 0) {
			/* any other write error on a real write (not a pure
			 * flush) disconnects the client */
			hlog(LOG_DEBUG, "client_write(%s) fails/2d; disconnecting; %s", c->addr_rem, strerror(e));
			client_close(self, c, e);
			return -11;
		}
		if (i > 0) {
			//hlog(LOG_DEBUG, "client_write(%s) wrote %d", c->addr_rem, i);
			c->obuf_start += i;
			c->obuf_wtime = tick;
		}
	}

	/* All done ? Reset the buffer to empty. */
	if (c->obuf_start >= c->obuf_end) {
		//hlog(LOG_DEBUG, "client_write(%s) obuf empty", c->addr_rem);
		c->obuf_start = 0;
		c->obuf_end   = 0;
		return len;
	}

	/* tell the poller that we have outgoing data */
	xpoll_outgoing(&self->xp, c->xfd, 1);

	return len;
}
1080
1081 /*
1082 * printf to a client
1083 */
1084
client_printf(struct worker_t * self,struct client_t * c,const char * fmt,...)1085 int client_printf(struct worker_t *self, struct client_t *c, const char *fmt, ...)
1086 {
1087 va_list args;
1088 char s[PACKETLEN_MAX];
1089 int i;
1090
1091 va_start(args, fmt);
1092 i = vsnprintf(s, PACKETLEN_MAX, fmt, args);
1093 va_end(args);
1094
1095 if (i < 0 || i >= PACKETLEN_MAX) {
1096 hlog(LOG_ERR, "client_printf vsnprintf failed to %s: '%s'", c->addr_rem, fmt);
1097 return -1;
1098 }
1099
1100 return c->write(self, c, s, i);
1101 }
1102
1103 /*
1104 * tell the client once that it has bad filter definition
1105 */
client_bad_filter_notify(struct worker_t * self,struct client_t * c,const char * filt)1106 int client_bad_filter_notify(struct worker_t *self, struct client_t *c, const char *filt)
1107 {
1108 if (!c->warned) {
1109 c->warned = 1;
1110 return client_printf(self, c, "# Warning: Bad filter: %s\r\n", filt);
1111 }
1112 return 0;
1113 }
1114
1115 /*
1116 * Receive UDP packets from a core peer
1117 */
1118
/*
 * Receive a UDP packet from the core peergroup socket, match it to the
 * configured peer by source address/port, and pass a single line of it
 * to the line handler. Always returns 0 (errors are only logged - a
 * bad datagram must not kill the shared listener socket).
 */
static int handle_corepeer_readable(struct worker_t *self, struct client_t *c)
{
	struct client_t *rc = NULL; // real client
	union sockaddr_u addr;
	socklen_t addrlen;
	int i;
	int r;
	char *addrs;

	addrlen = sizeof(addr);
	/* MSG_TRUNC: r reports the full datagram length even if it did not fit in ibuf */
	r = recvfrom( c->udpclient->fd, c->ibuf, c->ibuf_size-1,
		MSG_DONTWAIT|MSG_TRUNC, (struct sockaddr *)&addr, &addrlen );

	if (r < 0) {
		if (errno == EINTR || errno == EAGAIN)
			return 0; /* D'oh.. return again latter */

		hlog( LOG_DEBUG, "recv: Error from corepeer UDP socket fd %d (%s): %s",
			c->udpclient->fd, c->addr_rem, strerror(errno));

		return 0;
	}

	if (r == 0) {
		hlog( LOG_DEBUG, "recv: EOF from corepeer UDP socket fd %d (%s)",
			c->udpclient->fd, c->addr_rem);
		return 0;
	}

	// Figure the correct client/peer based on the remote IP address.
	// Compare address family, address and port; break out on a match.
	for (i = 0; i < worker_corepeer_client_count; i++) {
		rc = worker_corepeer_clients[i];

		if (rc->udpaddrlen != addrlen)
			continue;
		if (rc->udpaddr.sa.sa_family != addr.sa.sa_family)
			continue;

		if (addr.sa.sa_family == AF_INET) {
			if (memcmp(&rc->udpaddr.si.sin_addr, &addr.si.sin_addr, sizeof(addr.si.sin_addr)) != 0)
				continue;
			if (rc->udpaddr.si.sin_port != addr.si.sin_port)
				continue;

			break;
		} else if (addr.sa.sa_family == AF_INET6) {
			if (memcmp(&rc->udpaddr.si6.sin6_addr, &addr.si6.sin6_addr, sizeof(addr.si6.sin6_addr)) != 0)
				continue;
			if (rc->udpaddr.si6.sin6_port != addr.si6.sin6_port)
				continue;

			break;
		}
	}

	/* no matching configured peer: drop the packet, log the source */
	if (i == worker_corepeer_client_count || !rc) {
		addrs = strsockaddr(&addr.sa, addrlen);
		hlog(LOG_INFO, "recv: Received UDP peergroup packet from unknown peer address %s: %*s", addrs, r, c->ibuf);
		hfree(addrs);
		return 0;
	}

	/*
	addrs = strsockaddr(&addr.sa, addrlen);
	hlog(LOG_DEBUG, "worker thread passing UDP packet from %s to handler: %*s", addrs, r, c->ibuf);
	hfree(addrs);
	*/
	clientaccount_add( rc, IPPROTO_UDP, r, 0, 0, 0, 0, 0); /* Account byte count. incoming_handler() will account packets. */
	rc->last_read = tick;

	/* Ignore CRs and LFs in UDP input packet - the current core peer system puts 1 APRS packet in each
	 * UDP frame.
	 * TODO: consider processing multiple packets from an UDP frame, split up by CRLF.
	 */
	for (i = 0; i < r; i++) {
		if (c->ibuf[i] == '\r' || c->ibuf[i] == '\n') {
			r = i;
			break;
		}
	}

	/* note: data was read into c->ibuf (listener), but is accounted
	 * and handled on behalf of the matched peer rc */
	c->handler_line_in(self, rc, IPPROTO_UDP, c->ibuf, r);

	return 0;
}
1204
1205 /*
1206 * Process incoming data from client after reading
1207 */
1208
/*
 * Process incoming data from a client after reading r new bytes into
 * c->ibuf: split the buffer into CR/LF-terminated lines and pass each
 * line (without the line ending) to the client's line handler.
 * A trailing partial line is kept in ibuf for the next read.
 *
 * Returns 0, or -1 if the line handler failed - in which case the
 * client object may have been destroyed and must not be touched.
 */
int client_postread(struct worker_t *self, struct client_t *c, int r)
{
	char *s;
	char *ibuf_end;
	char *row_start;

	clientaccount_add(c, c->ai_protocol, r, 0, 0, 0, 0, 0); /* Number of packets is now unknown,
								   byte count is collected.
								   The incoming_handler() will account
								   packets. */

	c->ibuf_end += r;
	// hlog( LOG_DEBUG, "read: %d bytes from client fd %d (%s) - %d in ibuf",
	//       r, c->fd, c->addr_rem, c->ibuf_end);

	/* parse out rows ending in CR and/or LF and pass them to the handler
	 * without the CRLF (we accept either CR or LF or both, but make sure
	 * to always output CRLF
	 */
	ibuf_end = c->ibuf + c->ibuf_end;
	row_start = c->ibuf;
	c->last_read = tick; /* not simulated time */

	for (s = c->ibuf; s < ibuf_end; s++) {
		if (*s == '\r' || *s == '\n') {
			/* found EOL */
			if (s - row_start > 0) {
				// int ch = *s;
				// *s = 0;
				// hlog( LOG_DEBUG, "got: %s\n", row_start );
				// *s = ch;

				/* NOTE: handler call CAN destroy the c-> object ! */
				if (c->handler_line_in(self, c, c->ai_protocol, row_start, s - row_start) < 0)
					return -1;
			}
			/* skip the first, just-found part of EOL, which might have been
			 * NULled by the login handler (TODO: make it not NUL it) */
			s++;
			/* skip the rest of EOL */
			while (s < ibuf_end && (*s == '\r' || *s == '\n'))
				s++;
			row_start = s;
		}
	}

	if (row_start >= ibuf_end) {
		/* ok, we processed the whole buffer, just mark it empty */
		c->ibuf_end = 0;
	} else if (row_start != c->ibuf) {
		/* ok, we found data... move the buffer contents to the beginning */
		c->ibuf_end = ibuf_end - row_start;
		memmove(c->ibuf, row_start, c->ibuf_end);
	}

	return 0;
}
1266
1267
1268
1269 /*
1270 * handle an event on an fd
1271 */
1272
handle_client_readable(struct worker_t * self,struct client_t * c)1273 static int handle_client_readable(struct worker_t *self, struct client_t *c)
1274 {
1275 int r;
1276
1277 r = read(c->fd, c->ibuf + c->ibuf_end, c->ibuf_size - c->ibuf_end - 1);
1278
1279 if (r == 0) {
1280 hlog( LOG_DEBUG, "read: EOF from socket fd %d (%s @ %s)",
1281 c->fd, c->addr_rem, c->addr_loc );
1282 client_close(self, c, CLIERR_EOF);
1283 return -1;
1284 }
1285
1286 if (r < 0) {
1287 if (errno == EINTR || errno == EAGAIN)
1288 return 0; /* D'oh.. return again later */
1289
1290 hlog( LOG_DEBUG, "read: Error from socket fd %d (%s): %s",
1291 c->fd, c->addr_rem, strerror(errno));
1292 hlog( LOG_DEBUG, " .. ibuf=%p ibuf_end=%d ibuf_size=%d",
1293 c->ibuf, c->ibuf_end, c->ibuf_size-c->ibuf_end-1);
1294 client_close(self, c, errno);
1295 return -1;
1296 }
1297
1298 return client_postread(self, c, r);
1299 }
1300
1301 /*
1302 * client fd is now writaable
1303 */
1304
handle_client_writable(struct worker_t * self,struct client_t * c)1305 static int handle_client_writable(struct worker_t *self, struct client_t *c)
1306 {
1307 int r;
1308
1309 /* TODO: call client_try_write */
1310 r = write(c->fd, c->obuf + c->obuf_start, c->obuf_end - c->obuf_start);
1311 if (r < 0) {
1312 if (errno == EINTR || errno == EAGAIN) {
1313 hlog(LOG_DEBUG, "writable: Would block fd %d (%s): %s", c->fd, c->addr_rem, strerror(errno));
1314 return 0;
1315 }
1316
1317 hlog(LOG_DEBUG, "writable: Error from socket fd %d (%s): %s", c->fd, c->addr_rem, strerror(errno));
1318 client_close(self, c, errno);
1319 return -1;
1320 }
1321
1322 c->obuf_start += r;
1323 //hlog(LOG_DEBUG, "writable: %d bytes to socket fd %d (%s) - %d in obuf", r, c->fd, c->addr_rem, c->obuf_end - c->obuf_start);
1324
1325 if (c->obuf_start == c->obuf_end) {
1326 xpoll_outgoing(&self->xp, c->xfd, 0);
1327 c->obuf_start = c->obuf_end = 0;
1328 }
1329
1330 return 0;
1331 }
1332
/*
 * xpoll callback: dispatch poll events on a client fd to the client's
 * readable/writable handlers. Output is serviced before input so that
 * pending data drains first. Always returns 0.
 */
static int handle_client_event(struct xpoll_t *xp, struct xpoll_fd_t *xfd)
{
	struct worker_t *self = (struct worker_t *)xp->tp;
	struct client_t *c = (struct client_t *)xfd->p;

	//hlog(LOG_DEBUG, "handle_client_event(%d): %d", xfd->fd, xfd->result);

	if (xfd->result & XP_OUT) {  /* priorize doing output */
		/* ah, the client is writable */

		if (c->obuf_start == c->obuf_end) {
			/* there is nothing to write any more */
			//hlog(LOG_DEBUG, "client writable: nothing to write on fd %d (%s)", c->fd, c->addr_rem);
			xpoll_outgoing(&self->xp, c->xfd, 0);
			c->obuf_start = c->obuf_end = 0;
		} else {
			/* writable handler < 0 means the client was closed and
			 * destroyed - must not fall through to the read path */
			if (c->handler_client_writable(self, c) < 0)
				return 0;
		}
	}

	if (xfd->result & XP_IN) {  /* .. before doing input */
		/* Is this really necessary any more?
		if (c->fd < 0) {
			hlog(LOG_DEBUG, "fd %d: socket no longer alive, closing (%s)", c->fd, c->addr_rem);
			client_close(self, c, CLIERR_FD_NUM_INVALID);
			return -1;
		}
		*/

		/* ok, read */
		c->handler_client_readable(self, c);
		/* c might be invalid now, don't touch it */
	}

	return 0;
}
1370
1371 /*
1372 * Classify a client and put it in the correct outgoing processor class
1373 * list. This will enable outgoing packet transmission for the client.
1374 */
1375
worker_classify_client(struct worker_t * self,struct client_t * c)1376 static void worker_classify_client(struct worker_t *self, struct client_t *c)
1377 {
1378 struct client_t *class_next = NULL;
1379 struct client_t **class_prevp = NULL;
1380
1381 if (c->flags & CLFLAGS_PORT_RO) {
1382 //hlog(LOG_DEBUG, "classify_client(worker %d): client fd %d classified readonly", self->id, c->fd);
1383 class_next = self->clients_ro;
1384 class_prevp = &self->clients_ro;
1385 } else if (c->state == CSTATE_COREPEER || (c->flags & CLFLAGS_UPLINKPORT)) {
1386 //hlog(LOG_DEBUG, "classify_client(worker %d): client fd %d classified upstream/peer", self->id, c->fd);
1387 class_next = self->clients_ups;
1388 class_prevp = &self->clients_ups;
1389 } else if (c->flags & CLFLAGS_DUPEFEED) {
1390 //hlog(LOG_DEBUG, "classify_client(worker %d): client fd %d classified dupefeed", self->id, c->fd);
1391 class_next = self->clients_dupe;
1392 class_prevp = &self->clients_dupe;
1393 } else if (c->flags & CLFLAGS_INPORT) {
1394 //hlog(LOG_DEBUG, "classify_client(worker %d): client fd %d classified other", self->id, c->fd);
1395 class_next = self->clients_other;
1396 class_prevp = &self->clients_other;
1397 } else {
1398 hlog(LOG_ERR, "classify_client(worker %d): client fd %d NOT CLASSIFIED - will not get any packets", self->id, c->fd);
1399 return;
1400 }
1401
1402 c->class_next = class_next;
1403 if (class_next)
1404 class_next->class_prevp = &c->class_next;
1405 *class_prevp = c;
1406 c->class_prevp = class_prevp;
1407 }
1408
1409 /*
1410 * Mark the client connected and do whatever processing is needed
1411 * to start transmitting data to it.
1412 */
1413
worker_mark_client_connected(struct worker_t * self,struct client_t * c)1414 void worker_mark_client_connected(struct worker_t *self, struct client_t *c)
1415 {
1416 c->state = CSTATE_CONNECTED;
1417
1418 set_client_sockopt_post_login(c);
1419
1420 /* classify the client and put it in the right list of clients for
1421 * outgoing data to start flowing.
1422 */
1423 worker_classify_client(self, c);
1424 }
1425
1426 /*
1427 * move new clients from the new clients queue to the worker thread
1428 */
1429
/*
 * Move new clients from the worker's new-clients queue (filled by the
 * accept thread) into the worker's own client list, set up their
 * read/write handlers and add them to the polling list.
 *
 * Locking order: new_clients_mutex is taken and released first (short
 * critical section just to detach the queue), then clients_mutex is
 * held for the whole list manipulation.
 */
static void collect_new_clients(struct worker_t *self)
{
	int pe, n, i;
	struct client_t *new_clients, *c;

	/* lock the queue */
	if ((pe = pthread_mutex_lock(&self->new_clients_mutex))) {
		hlog(LOG_ERR, "collect_new_clients(worker %d): could not lock new_clients_mutex: %s", self->id, strerror(pe));
		return;
	}

	/* quickly grab the new clients to a local variable */
	new_clients = self->new_clients;
	self->new_clients = NULL;
	self->new_clients_last = NULL;

	/* unlock */
	if ((pe = pthread_mutex_unlock(&self->new_clients_mutex))) {
		hlog(LOG_ERR, "collect_new_clients(worker %d): could not unlock new_clients_mutex: %s", self->id, strerror(pe));
		/* we'd be going to deadlock here... */
		exit(1);
	}

	if ((pe = pthread_mutex_lock(&self->clients_mutex))) {
		hlog(LOG_ERR, "collect_new_clients(worker %d): could not lock clients_mutex: %s", self->id, strerror(pe));
		return;
	}

	/* move the new clients to the thread local client list */
	n = self->xp.pollfd_used;
	i = 0;
	while (new_clients) {
		i++;
		c = new_clients;
		new_clients = c->next;

		/* fd < -1 is not a real client but a control message from the
		 * accept thread; -2 requests closing all peergroup peers */
		if (c->fd < -1) {
			if (c->fd == -2) {
				/* corepeer reconfig flag */
				hlog(LOG_DEBUG, "collect_new_clients(worker %d): closing all existing peergroup peers", self->id);
				corepeer_close_all(self);
			} else {
				hlog(LOG_NOTICE, "collect_new_clients(worker %d): odd fd on new client: %d", self->id, c->fd);
			}

			client_free(c);
			i--; /* don't count it in */
			continue;
		}

		self->client_count++;
		// hlog(LOG_DEBUG, "collect_new_clients(worker %d): got client fd %d", self->id, c->fd);
		/* push to the head of the worker's client list */
		c->next = self->clients;
		if (c->next)
			c->next->prevp = &c->next;
		self->clients = c;
		c->prevp = &self->clients;

		/* If this client is already in connected state, classify it
		 * (live upgrading). Also, if it's a corepeer, it's not going to
		 * "log in" later and it needs to be classified now.
		 */
		if (c->state == CSTATE_CONNECTED || c->state == CSTATE_COREPEER)
			worker_classify_client(self, c);

		/* If the new client is an UDP core peer, we will add its FD to the
		 * polling list, but only once. There is only a single listener socket
		 * for a single peer group.
		 */
		if (c->state == CSTATE_COREPEER) {
			/* add to corepeer client list and polling list */
			hlog(LOG_DEBUG, "collect_new_clients(worker %d): got core peergroup peer, UDP fd %d", self->id, c->udpclient->fd);

			if (worker_corepeer_client_count == MAX_COREPEERS) {
				hlog(LOG_ERR, "worker: Too many core peergroup peers (max %d)", MAX_COREPEERS);
				exit(1);
			}

			/* build a static array of clients, for quick searching based on address */
			/* synthetic negative fd (-100, -101, ...) marks the slot */
			c->fd = worker_corepeer_client_count * -1 - 100;
			worker_corepeer_clients[worker_corepeer_client_count] = c;
			worker_corepeer_client_count++;

			if (!c->udpclient->polled) {
				c->udpclient->polled = 1;
				c->xfd = xpoll_add(&self->xp, c->udpclient->fd, (void *)c);
				hlog(LOG_DEBUG, "collect_new_clients(worker %d): starting poll for UDP fd %d xfd %p", self->id, c->udpclient->fd, c->xfd);
			}

			c->handler_client_readable = &handle_corepeer_readable;
			c->write = &udp_client_write;

			continue;
		}

		/* add to polling list */
		c->xfd = xpoll_add(&self->xp, c->fd, (void *)c);
		hlog(LOG_DEBUG, "collect_new_clients(worker %d): added fd %d to polling list, xfd %p", self->id, c->fd, c->xfd);
		if (!c->xfd) {
			/* ouch, out of xfd space */
			shutdown(c->fd, SHUT_RDWR);
			continue;
		}

		/* select the I/O handler set by transport type */
#ifdef USE_SCTP
		if (c->ai_protocol == IPPROTO_SCTP) {
			c->handler_client_readable = &sctp_readable;
			c->handler_client_writable = &sctp_writable;
			c->write = &sctp_client_write;
		} else
#endif
#ifdef USE_SSL
		if (c->ssl_con) {
			hlog(LOG_DEBUG, "collect_new_clients(worker %d): fd %d uses SSL", self->id, c->fd);
			c->handler_client_readable = &ssl_readable;
			c->handler_client_writable = &ssl_writable;
			c->write = &ssl_client_write;
		} else
#endif
		{
			c->handler_client_readable = &handle_client_readable;
			c->handler_client_writable = &handle_client_writable;
			c->write = &tcp_client_write;
		}

		/* The new client may end up destroyed right away, never mind it here.
		 * We will notice it later and discard the client.
		 */

		/* According to http://www.aprs-is.net/ServerDesign.aspx, the server must
		 * initially transmit it's software name and version string.
		 * In case of a live upgrade, this should maybe be skipped, but
		 * I'll leave it in for now.
		 */
		if (c->flags & CLFLAGS_INPORT)
			client_printf(self, c, "# %s\r\n", (fake_version) ? fake_version : verstr_aprsis);

		/* If the write failed immediately, c is already invalid at this point. Don't touch it. */
	}

	if ((pe = pthread_mutex_unlock(&self->clients_mutex))) {
		hlog(LOG_ERR, "collect_new_clients(worker %d): could not unlock clients_mutex: %s", self->id, strerror(pe));
		exit(1);
	}

	hlog( LOG_DEBUG, "Worker %d accepted %d new clients, %d new connections, now total %d clients",
	      self->id, i, self->xp.pollfd_used - n, self->client_count );
}
1578
1579 /*
1580 * Send keepalives to client sockets, run this once a second
1581 * This watches also obuf_wtime becoming too old, and also about
1582 * the number of writes on socket in previous run interval to
1583 * auto-adjust socket buffering mode.
1584 */
/*
 * Send keepalives to client sockets; run once per KEEPALIVE_POLL_FREQ.
 * Also performs per-client housekeeping: heard-list expiry, login and
 * inactivity timeouts, write timeouts, and the buffered/unbuffered
 * output mode auto-adjustment based on write counts.
 *
 * NOTE: several calls below (c->write, client_close) can destroy the
 * client object; cnext is taken before any of them.
 */
static void send_keepalives(struct worker_t *self)
{
	struct client_t *c, *cnext;
	struct tm t;
	char buf[230], *s;
	int len0, len, rc;
	static const char *monthname[12] = {"Jan","Feb","Mar","Apr","May","Jun","Jul","Aug","Sep","Oct","Nov","Dec"};
	time_t w_expire = tick - sock_write_expire;

	// Example message:
	// # javAPRSSrvr 3.12b12 1 Mar 2008 15:11:20 GMT T2FINLAND 85.188.1.32:14580

	sprintf(buf, "# %.40s ", verstr_aprsis);
	s = buf + strlen(buf);

	memset(&t, 0, sizeof(t));
	gmtime_r(&now, &t);

	// s += strftime(s, 40, "%d %b %Y %T GMT", &t);
	// However that depends upon LOCALE, thus following:
	s += sprintf(s, "%d %s %d %02d:%02d:%02d GMT",
		     t.tm_mday, monthname[t.tm_mon], t.tm_year + 1900,
		     t.tm_hour, t.tm_min, t.tm_sec);

	s += sprintf(s, " %s ", serverid);

	/* len0: length of the common prefix; per-client suffix is appended at s */
	len0 = (s - buf);

	for (c = self->clients; (c); c = cnext) {
		// the c may get destroyed from underneath of ourselves!
		cnext = c->next;

		/* No keepalives on PEER links.. */
		if ( c->state == CSTATE_COREPEER )
			continue;

		/* is it time to clean up? (second test catches a clock jump backwards) */
		if (c->cleanup <= tick || c->cleanup > tick + 120+120) {
			c->cleanup = tick + 120;
			client_heard_expire(c);
		}

		/* Is it time for keepalive? Also send a keepalive if clock jumped backwards. */
		if ((c->keepalive <= tick)
		    || (c->keepalive > tick + keepalive_interval)) {
			int flushlevel = c->obuf_flushsize;
			c->keepalive = tick + keepalive_interval;

			len = len0 + sprintf(s, "%s\r\n", c->addr_loc);

			/* temporarily force unbuffered mode so the keepalive
			 * goes out immediately, then restore */
			c->obuf_flushsize = 0;
			/* Write out immediately */
			rc = c->write(self, c, buf, len);
			if (rc < -2) continue; // destroyed
			c->obuf_flushsize = flushlevel;
		} else {
			/* just fush if there was anything to write */
			if (c->ai_protocol == IPPROTO_TCP) {
				/* len == 0 requests a plain buffer flush */
				rc = c->write(self, c, buf, 0);
				if (rc < -2) continue; // destroyed..
			}
		}

		/* Check for input timeouts. These will currently also kick in if the
		 * real-time clock jumps backwards for some reason.
		 */
		if (c->flags & CLFLAGS_INPORT) {
			if (c->state != CSTATE_CONNECTED) {
				/* not logged in yet: apply login timeout */
				if (c->connect_tick <= tick - client_login_timeout) {
					hlog(LOG_DEBUG, "%s: Closing client fd %d due to login timeout (%d s)",
					     c->addr_rem, c->fd, client_login_timeout);
					client_close(self, c, CLIERR_LOGIN_TIMEOUT);
					continue;
				}
			} else {
				if (c->last_read <= tick - client_timeout) {
					hlog(LOG_DEBUG, "%s: Closing client fd %d due to inactivity (%d s)",
					     c->addr_rem, c->fd, client_timeout);
					client_close(self, c, CLIERR_INACTIVITY);
					continue;
				}
			}
		} else {
			/* uplink connections have their own timeout */
			if (c->last_read <= tick - upstream_timeout) {
				hlog(LOG_INFO, "%s: Closing uplink fd %d due to inactivity (%d s)",
				     c->addr_rem, c->fd, upstream_timeout);
				client_close(self, c, CLIERR_INACTIVITY);
				continue;
			}
		}

		/* check for write timeouts */
		if (c->obuf_wtime < w_expire && c->state != CSTATE_UDP) {
			// TOO OLD! Shutdown the client
			hlog(LOG_DEBUG, "%s: Closing connection fd %d due to obuf wtime timeout",
			     c->addr_rem, c->fd);
			client_close(self, c, CLIERR_OUTPUT_WRITE_TIMEOUT);
			continue;
		}

		/* Adjust buffering, try not to jump back and forth between buffered and unbuffered.
		 * Please note that the we always flush the buffer at the end of a round if the
		 * client socket is writable (OS buffer not full), so we don't really wait for
		 * obuf_flushsize to be reached. Buffering will just make a couple of packets sent
		 * go in the same write().
		 */
		if (c->obuf_writes > obuf_writes_threshold) {
			// Lots and lots of writes, switch to buffering...
			if (c->obuf_flushsize == 0) {
				c->obuf_flushsize = c->obuf_size / 2;
				//hlog( LOG_DEBUG,"Switch fd %d (%s) to buffered writes (%d writes), flush at %d",
				//      c->fd, c->addr_rem, c->obuf_writes, c->obuf_flushsize);
			}
		} else if (c->obuf_flushsize != 0 && c->obuf_writes < obuf_writes_threshold_hys) {
			// Not so much writes, back to "write immediate"
			//hlog( LOG_DEBUG,"Switch fd %d (%s) to unbuffered writes (%d writes)",
			//      c->fd, c->addr_rem, c->obuf_writes);
			c->obuf_flushsize = 0;
		}

		/* reset the per-interval write counter for the next scan */
		c->obuf_writes = 0;
	}
}
1708
1709
1710 /*
1711 * Worker thread
1712 */
1713
/*
 * Worker thread main loop: distribute outgoing packets from the global
 * packet buffer to clients, poll client sockets for I/O, flush the
 * local incoming packet queue to dupecheck, pick up new clients, and
 * run periodic keepalive/housekeeping. On shutdown (shutting_down == 1
 * normal stop, == 2 live upgrade) closes or hands over clients and
 * frees thread-local resources.
 */
void worker_thread(struct worker_t *self)
{
	sigset_t sigs_to_block;
	time_t next_keepalive = tick + 2;
	time_t next_24h_cleanup = tick + 86400;
	char myname[20];
	struct pbuf_t *p;
#if 0
	time_t next_lag_query = tick + 10;
#endif
	/* t1..t7: timestamps around each loop stage, for delay reporting below */
	time_t t1, t2, t3, t4, t5, t6, t7;

	sprintf(myname,"worker %d", self->id);
	pthreads_profiling_reset(myname);

	/* signals are handled by the main thread; block them all here */
	sigemptyset(&sigs_to_block);
	sigaddset(&sigs_to_block, SIGALRM);
	sigaddset(&sigs_to_block, SIGINT);
	sigaddset(&sigs_to_block, SIGTERM);
	sigaddset(&sigs_to_block, SIGQUIT);
	sigaddset(&sigs_to_block, SIGHUP);
	sigaddset(&sigs_to_block, SIGURG);
	sigaddset(&sigs_to_block, SIGPIPE);
	sigaddset(&sigs_to_block, SIGUSR1);
	sigaddset(&sigs_to_block, SIGUSR2);
	pthread_sigmask(SIG_BLOCK, &sigs_to_block, NULL);

	hlog(LOG_DEBUG, "Worker %d started.", self->id);

	while (!self->shutting_down) {
		t1 = tick;

		/* if we have new stuff in the global packet buffer, process it */
		if (*self->pbuf_global_prevp || *self->pbuf_global_dupe_prevp)
			process_outgoing(self);

		t2 = tick;

		// TODO: calculate different delay based on outgoing lag ?
		/* poll for incoming traffic */
		xpoll(&self->xp, 30); // was 200, but gave too big latency

		/* if we have stuff in the local queue, try to flush it and make
		 * it available to the dupecheck thread
		 */
		t3 = tick;

		if (self->pbuf_incoming_local)
			incoming_flush(self);

		t4 = tick;

		if (self->new_clients)
			collect_new_clients(self);

		t5 = tick;

		/* time of next keepalive broadcast ? (second test catches a clock jump) */
		if (tick >= next_keepalive || next_keepalive > tick + KEEPALIVE_POLL_FREQ*2) {
			next_keepalive = tick + KEEPALIVE_POLL_FREQ; /* Run them every 2 seconds */
			send_keepalives(self);

			/* time of daily worker cleanup? */
			if (tick >= next_24h_cleanup || tick < next_24h_cleanup - 100000) {
				hlog(LOG_DEBUG, "worker %d: 24h clean up event", self->id);
				/* currently the cleanup is pretty lean. */
				self->internal_packet_drops = 0;
				next_24h_cleanup = tick + 86400;
			}
		}

		t6 = tick;
#if 0
		if (tick > next_lag_query) {
			int lag, lag1, lag2;
			next_lag_query += 10; // every 10 seconds..
			lag = outgoing_lag_report(self, &lag1, &lag2);
			hlog(LOG_DEBUG, "Thread %d pbuf lag %d, dupelag %d", self->id, lag1, lag2);
		}
#endif
		t7 = tick;

#if 1
		if (t7-t1 > 1 || t7-t1 < 0) // only report if the delay is over 1 seconds. they are a LOT rarer
			hlog( LOG_DEBUG, "Worker thread %d loop step delays:  dt2: %d  dt3: %d  dt4: %d  dt5: %d  dt6: %d  dt7: %d",
			      self->id, t2-t1, t3-t1, t4-t1, t5-t1, t6-t1, t7-t1 );
#endif
	}

	if (self->shutting_down == 2) {
		/* live upgrade: must free all UDP client structs - we need to close the UDP listener fd. */
		/* Must also disconnect all SSL clients - the SSL crypto state cannot be moved over. */
		struct client_t *c, *next;
		for (c = self->clients; (c); c = next) {
			next = c->next;
#ifdef USE_SSL
			/* SSL client? */
			if (c->ssl_con) {
				client_close(self, c, CLIOK_THREAD_SHUTDOWN);
				continue;
			}
#endif
			/* collect client state first before closing or freeing anything */
			if (worker_shutdown_clients && c->fd >= 0) {
				cJSON *jc = worker_client_json(c, 1);
				cJSON_AddItemToArray(worker_shutdown_clients, jc);
			}
			client_udp_free(c->udpclient);
			c->udpclient = NULL;
		}
	} else {
		/* close all clients, if not shutting down for a live upgrade */
		while (self->clients)
			client_close(self, self->clients, CLIOK_THREAD_SHUTDOWN);
	}

	/* stop polling */
	xpoll_free(&self->xp);
	memset(&self->xp,0,sizeof(self->xp));

	/* check if there is stuff in the incoming queue (not taken by dupecheck) */
	int pbuf_incoming_found = 0;
	for (p = self->pbuf_incoming; p; p = p->next) {
		pbuf_incoming_found++;
	}
	if (pbuf_incoming_found != self->pbuf_incoming_count) {
		hlog(LOG_ERR, "Worker %d: found %d packets in incoming queue, does not match count %d",
		     self->id, pbuf_incoming_found, self->pbuf_incoming_count);
	}
	if (self->pbuf_incoming_count)
		hlog(LOG_INFO, "Worker %d: %d packets left in incoming queue",
		     self->id, self->pbuf_incoming_count);

	/* clean up thread-local pbuf pools */
	worker_free_buffers(self);

	hlog(LOG_DEBUG, "Worker %d shut down%s.", self->id, (self->shutting_down == 2) ? " - clients left hanging" : "");
}
1852
1853 /*
1854 * Stop workers - runs from accept_thread
1855 * stop_all: 1 => stop all threads, 2 => stop all threads for live upgrade
1856 */
1857
/*
 * Stop workers - runs from accept_thread.
 * stop_all: 0 => stop only the threads above workers_configured,
 *           1 => stop all threads,
 *           2 => stop all threads for live upgrade (workers set
 *                shutting_down = 2 and leave clients hanging).
 * Joins each stopped thread and frees its worker struct.
 */
void workers_stop(int stop_all)
{
	struct worker_t *w;
	int e;
	int stopped = 0;

	hlog(LOG_INFO, "Stopping %d worker threads...",
	     (stop_all) ? workers_running : workers_running - workers_configured);
	while (workers_running > workers_configured || (stop_all && workers_running > 0)) {
		hlog(LOG_DEBUG, "Stopping a worker thread...");
		/* find the last worker thread and shut it down...
		 * could shut down the first one, but to reduce confusion
		 * will shut down the one with the largest worker id :)
		 *
		 * This could be done even more cleanly by moving the connected
		 * clients to the threads which are left running, but maybe
		 * that'd be way too cool and complicated to implement right now.
		 * It's cool enough to be able to reconfigure at all.
		 */
		w = worker_threads;
		if (w == NULL) {
			hlog(LOG_CRIT, "Cannot stop worker threads, none running");
			abort();
		}
		/* walk to the tail of the worker list */
		while ((w) && (w->next))
			w = w->next;

		/* the worker's main loop exits when it sees shutting_down set */
		w->shutting_down = (stop_all == 2) ? 2 : 1;
		if ((e = pthread_join(w->th, NULL))) {
			hlog(LOG_ERR, "Could not pthread_join worker %d: %s", w->id, strerror(e));
		} else {
			hlog(LOG_DEBUG, "Worker %d has terminated.", w->id);
			stopped++;
		}

		/* unlink from the list (w is the tail, so next is NULL) and free */
		*(w->prevp) = NULL;
		hfree(w);

		workers_running--;
	}
	hlog(LOG_INFO, "Stopped %d worker threads.", stopped);

}
1901
1902 /*
1903 * Allocate a worker structure.
1904 * This is also called from the http thread which acts as a
1905 * "worker" for incoming packets.
1906 */
1907
worker_alloc(void)1908 struct worker_t *worker_alloc(void)
1909 {
1910 struct worker_t *w;
1911 pthread_mutexattr_t mut_recursive;
1912 int e;
1913
1914 if ((e = pthread_mutexattr_init(&mut_recursive))) {
1915 hlog(LOG_ERR, "worker_alloc: pthread_mutexattr_init failed: %s", strerror(e));
1916 }
1917
1918 if ((e = pthread_mutexattr_settype(&mut_recursive, PTHREAD_MUTEX_RECURSIVE))) {
1919 hlog(LOG_ERR, "worker_alloc: pthread_mutexattr_settype PTHREAD_MUTEX_RECURSIVE failed: %s", strerror(e));
1920 }
1921
1922 w = hmalloc(sizeof(*w));
1923 memset(w, 0, sizeof(*w));
1924
1925 pthread_mutex_init(&w->clients_mutex, &mut_recursive);
1926 pthread_mutex_init(&w->new_clients_mutex, NULL);
1927
1928 w->pbuf_incoming_local = NULL;
1929 w->pbuf_incoming_local_last = &w->pbuf_incoming_local;
1930
1931 w->pbuf_incoming = NULL;
1932 w->pbuf_incoming_last = &w->pbuf_incoming;
1933 pthread_mutex_init(&w->pbuf_incoming_mutex, NULL);
1934
1935 w->pbuf_global_prevp = pbuf_global_prevp;
1936 w->pbuf_global_dupe_prevp = pbuf_global_dupe_prevp;
1937
1938 return w;
1939 }
1940
1941 /*
1942 * Free a worker's local buffers
1943 */
1944
worker_free_buffers(struct worker_t * self)1945 void worker_free_buffers(struct worker_t *self)
1946 {
1947 struct pbuf_t *p, *pn;
1948
1949 /* clean up thread-local pbuf pools */
1950 for (p = self->pbuf_free_small; p; p = pn) {
1951 pn = p->next;
1952 pbuf_free(NULL, p); // free to global pool
1953 }
1954 for (p = self->pbuf_free_medium; p; p = pn) {
1955 pn = p->next;
1956 pbuf_free(NULL, p); // free to global pool
1957 }
1958 for (p = self->pbuf_free_large; p; p = pn) {
1959 pn = p->next;
1960 pbuf_free(NULL, p); // free to global pool
1961 }
1962 }
1963
1964 /*
1965 * Start workers - runs from accept_thread
1966 */
1967
workers_start(void)1968 void workers_start(void)
1969 {
1970 int i;
1971 struct worker_t * volatile w;
1972 struct worker_t **prevp;
1973
1974 if (workers_running)
1975 workers_stop(0);
1976
1977 hlog(LOG_INFO, "Starting %d worker threads (configured: %d)...",
1978 workers_configured - workers_running, workers_configured);
1979
1980 while (workers_running < workers_configured) {
1981 hlog(LOG_DEBUG, "Starting a worker thread...");
1982 i = 0;
1983 prevp = &worker_threads;
1984 w = worker_threads;
1985 while (w) {
1986 prevp = &w->next;
1987 w = w->next;
1988 i++;
1989 }
1990
1991 w = worker_alloc();
1992 *prevp = w;
1993 w->prevp = prevp;
1994
1995 w->id = i;
1996 xpoll_initialize(&w->xp, (void *)w, &handle_client_event);
1997
1998 /* start the worker thread */
1999 if (pthread_create(&w->th, &pthr_attrs, (void *)worker_thread, w))
2000 perror("pthread_create failed for worker_thread");
2001
2002 workers_running++;
2003 }
2004 }
2005
2006 /*
2007 * Add an array of long longs to a JSON tree.
2008 */
2009
void json_add_rxerrs(cJSON *root, const char *key, long long vals[])
{
	double converted[INERR_BUCKETS];
	int n;

	/* cJSON offers no long long array constructor; big integers have
	 * to be passed in as doubles, so convert element by element.
	 */
	for (n = 0; n < INERR_BUCKETS; n++)
		converted[n] = (double)vals[n];

	cJSON_AddItemToObject(root, key, cJSON_CreateDoubleArray(converted, INERR_BUCKETS));
}
2023
2024 /*
2025 * Client state string
2026 */
2027
/*
 * Map a client connection state to its display name.
 * Any state not explicitly handled is reported as "unknown".
 */

static const char *client_state_string(CStateEnum state)
{
	switch (state) {
	case CSTATE_INIT:
		return "init";
	case CSTATE_LOGIN:
		return "login";
	case CSTATE_LOGRESP:
		return "logresp";
	case CSTATE_CONNECTED:
		return "connected";
	case CSTATE_UDP:
		return "udp";
	case CSTATE_COREPEER:
		return "corepeer";
	default:
		return "unknown";
	}
}
2057
2058 /*
2059 * Fill worker client list for status display
2060 * (called from another thread - watch out and lock!)
2061 */
2062
/*
 * Build a cJSON object describing one client connection for the status
 * display. When liveup_info is set, extra state needed for a live
 * upgrade (listener id, connection state, buffered I/O contents, heard
 * list) is included; that data is not published on the status page.
 * Ownership of the returned object passes to the caller.
 * NOTE(review): caller appears responsible for holding the worker's
 * clients_mutex while c is accessed — see worker_client_list.
 */
static struct cJSON *worker_client_json(struct client_t *c, int liveup_info)
{
	char addr_s[80];
	char *s;
	static const char *uplink_modes[] = {
		"ro",
		"multiro",
		"full",
		"peer"
	};
	const char *mode;
	
	cJSON *jc = cJSON_CreateObject();
	cJSON_AddNumberToObject(jc, "fd", c->fd);
	/* the client "id" is currently the fd; published under both keys */
	cJSON_AddNumberToObject(jc, "id", c->fd);
	
	/* additional information for live upgrade, not published */
	if (liveup_info) {
		cJSON_AddNumberToObject(jc, "listener_id", c->listener_id);
		cJSON_AddStringToObject(jc, "state", client_state_string(c->state));
		if (c->udp_port && c->udpclient)
			cJSON_AddNumberToObject(jc, "udp_port", c->udp_port);
		
		/* output buffer and input buffer data, hex-encoded so the
		 * raw bytes survive the JSON round-trip */
		if (c->obuf_end - c->obuf_start > 0) {
			s = hex_encode(c->obuf + c->obuf_start, c->obuf_end - c->obuf_start);
			cJSON_AddStringToObject(jc, "obuf", s);
			hfree(s);
		}
		
		if (c->ibuf_end > 0) {
			s = hex_encode(c->ibuf, c->ibuf_end);
			cJSON_AddStringToObject(jc, "ibuf", s);
			hlog(LOG_DEBUG, "Encoded ibuf %d bytes: '%.*s'", c->ibuf_end, c->ibuf_end, c->ibuf);
			hlog(LOG_DEBUG, "Hex: %s", s);
			hfree(s);
		}
		
		/* If message routing for stations heard by this client is enabled,
		 * dump the client_heard hash table, too.
		 */
		if (c->flags & CLFLAGS_IGATE)
			cJSON_AddItemToObject(jc, "client_heard", client_heard_json(c->client_heard));
	}
	
	if (c->state == CSTATE_COREPEER) {
		/* cut out ports in the name of security by obscurity:
		 * strip everything from the last ':' onwards */
		strncpy(addr_s, c->addr_rem, sizeof(addr_s));
		addr_s[sizeof(addr_s)-1] = 0; /* strncpy does not guarantee termination */
		if ((s = strrchr(addr_s, ':')))
			*s = 0;
		cJSON_AddStringToObject(jc, "addr_rem", addr_s);
		strncpy(addr_s, c->addr_loc, sizeof(addr_s));
		addr_s[sizeof(addr_s)-1] = 0;
		if ((s = strrchr(addr_s, ':')))
			*s = 0;
		cJSON_AddStringToObject(jc, "addr_loc", addr_s);
	} else {
		cJSON_AddStringToObject(jc, "addr_rem", c->addr_rem);
		cJSON_AddStringToObject(jc, "addr_loc", c->addr_loc);
	}
	
	//cJSON_AddStringToObject(jc, "addr_q", c->addr_hex);
	
	if (c->udp_port && c->udpclient)
		cJSON_AddNumberToObject(jc, "udp_downstream", 1);
	
	/* connection timing and per-client traffic counters */
	cJSON_AddNumberToObject(jc, "t_connect", c->connect_time);
	cJSON_AddNumberToObject(jc, "t_connect_tick", c->connect_tick);
	cJSON_AddNumberToObject(jc, "since_connect", tick - c->connect_tick);
	cJSON_AddNumberToObject(jc, "since_last_read", tick - c->last_read);
	cJSON_AddStringToObject(jc, "username", c->username);
	cJSON_AddStringToObject(jc, "app_name", c->app_name);
	cJSON_AddStringToObject(jc, "app_version", c->app_version);
	cJSON_AddNumberToObject(jc, "verified", c->validated);
	cJSON_AddNumberToObject(jc, "obuf_q", c->obuf_end - c->obuf_start);
	cJSON_AddNumberToObject(jc, "bytes_rx", c->localaccount.rxbytes);
	cJSON_AddNumberToObject(jc, "bytes_tx", c->localaccount.txbytes);
	cJSON_AddNumberToObject(jc, "pkts_rx", c->localaccount.rxpackets);
	cJSON_AddNumberToObject(jc, "pkts_tx", c->localaccount.txpackets);
	cJSON_AddNumberToObject(jc, "pkts_ign", c->localaccount.rxdrops);
	cJSON_AddNumberToObject(jc, "pkts_dup", c->localaccount.rxdupes);
	cJSON_AddNumberToObject(jc, "heard_count", c->client_heard_count);
	cJSON_AddNumberToObject(jc, "courtesy_count", c->client_courtesy_count);
	
	/* position is only emitted once the client's location is known */
	if (c->loc_known) {
		cJSON_AddNumberToObject(jc, "lat", c->lat);
		cJSON_AddNumberToObject(jc, "lng", c->lng);
	}
	
	if (c->quirks_mode)
		cJSON_AddNumberToObject(jc, "quirks_mode", c->quirks_mode);
	
	json_add_rxerrs(jc, "rx_errs", c->localaccount.rxerrs);
	
	/* mode/filter: peers and uplinks get a "mode" string, inbound
	 * clients get their filter string instead */
	if (c->state == CSTATE_COREPEER) {
		cJSON_AddStringToObject(jc, "mode", uplink_modes[3]);
	} else if (c->flags & CLFLAGS_INPORT) {
		/* client */
		cJSON_AddStringToObject(jc, "filter", c->filter_s);
	} else {
		if (c->flags & CLFLAGS_UPLINKMULTI)
			mode = uplink_modes[1];
		else if (c->flags & CLFLAGS_PORT_RO)
			mode = uplink_modes[0];
		else
			mode = uplink_modes[2];
		
		cJSON_AddStringToObject(jc, "mode", mode);
	}
	
#ifdef USE_SSL
	/* TLS certificate identity, when a cert was presented */
	if (c->cert_subject[0])
		cJSON_AddStringToObject(jc, "cert_subject", c->cert_subject);
	if (c->cert_issuer[0])
		cJSON_AddStringToObject(jc, "cert_issuer", c->cert_issuer);
#endif
	
	return jc;
}
2183
/*
 * Fill the status-display JSON: one entry per worker in 'workers', and
 * each client sorted into 'clients', 'uplinks' or 'peers'. Aggregate
 * traffic counters go into 'totals' and allocator statistics into
 * 'memory'. Called from the status thread, so each worker's client
 * chain is walked under that worker's clients_mutex.
 * Returns 0 on success, -1 if a worker's mutex could not be locked.
 */
int worker_client_list(cJSON *workers, cJSON *clients, cJSON *uplinks, cJSON *peers, cJSON *totals, cJSON *memory)
{
	struct worker_t *w = worker_threads;
	struct client_t *c;
	int pe;
	/* NOTE(review): these two totals are accumulated below but not
	 * emitted anywhere in this function — possibly vestigial. */
	int client_heard_count = 0;
	int client_courtesy_count = 0;
	
	while (w) {
		if ((pe = pthread_mutex_lock(&w->clients_mutex))) {
			hlog(LOG_ERR, "worker_client_list(worker %d): could not lock clients_mutex: %s", w->id, strerror(pe));
			return -1;
		}
		
		/* per-worker summary object */
		cJSON *jw = cJSON_CreateObject();
		cJSON_AddNumberToObject(jw, "id", w->id);
		cJSON_AddNumberToObject(jw, "clients", w->client_count);
		cJSON_AddNumberToObject(jw, "pbuf_incoming_count", w->pbuf_incoming_count);
		cJSON_AddNumberToObject(jw, "pbuf_incoming_local_count", w->pbuf_incoming_local_count);
		
		for (c = w->clients; (c); c = c->next) {
			client_heard_count += c->client_heard_count;
			client_courtesy_count += c->client_courtesy_count;
			
			/* clients on hidden listener sockets are not shown */
			/* if there are a huge amount of clients, don't list them
			 * - cJSON takes huge amounts of CPU to build the list
			 * - web browser will die due to the big blob
			 */
			if (c->hidden || w->client_count > 1000)
				continue;
			
			cJSON *jc = worker_client_json(c, 0);
			
			/* sort each client into the right output array */
			if (c->state == CSTATE_COREPEER) {
				cJSON_AddItemToArray(peers, jc);
			} else if (c->flags & CLFLAGS_INPORT) {
				cJSON_AddItemToArray(clients, jc);
			} else {
				cJSON_AddItemToArray(uplinks, jc);
			}
		}
		
		cJSON_AddItemToArray(workers, jw);
		
		if ((pe = pthread_mutex_unlock(&w->clients_mutex))) {
			hlog(LOG_ERR, "worker_client_list(worker %d): could not unlock clients_mutex: %s", w->id, strerror(pe));
			/* unlock failed: continuing would deadlock on the next lock, bail out */
			exit(1);
		}
		
		w = w->next;
	}
	
	/* process-wide traffic totals, per transport */
	cJSON_AddNumberToObject(totals, "tcp_bytes_rx", client_connects_tcp.rxbytes);
	cJSON_AddNumberToObject(totals, "tcp_bytes_tx", client_connects_tcp.txbytes);
	cJSON_AddNumberToObject(totals, "tcp_pkts_rx", client_connects_tcp.rxpackets);
	cJSON_AddNumberToObject(totals, "tcp_pkts_tx", client_connects_tcp.txpackets);
	cJSON_AddNumberToObject(totals, "tcp_pkts_ign", client_connects_tcp.rxdrops);
	cJSON_AddNumberToObject(totals, "udp_bytes_rx", client_connects_udp.rxbytes);
	cJSON_AddNumberToObject(totals, "udp_bytes_tx", client_connects_udp.txbytes);
	cJSON_AddNumberToObject(totals, "udp_pkts_rx", client_connects_udp.rxpackets);
	cJSON_AddNumberToObject(totals, "udp_pkts_tx", client_connects_udp.txpackets);
	cJSON_AddNumberToObject(totals, "udp_pkts_ign", client_connects_udp.rxdrops);
	json_add_rxerrs(totals, "tcp_rx_errs", client_connects_tcp.rxerrs);
	json_add_rxerrs(totals, "udp_rx_errs", client_connects_udp.rxerrs);
#ifdef USE_SCTP
	cJSON_AddNumberToObject(totals, "sctp_bytes_rx", client_connects_sctp.rxbytes);
	cJSON_AddNumberToObject(totals, "sctp_bytes_tx", client_connects_sctp.txbytes);
	cJSON_AddNumberToObject(totals, "sctp_pkts_rx", client_connects_sctp.rxpackets);
	cJSON_AddNumberToObject(totals, "sctp_pkts_tx", client_connects_sctp.txpackets);
	cJSON_AddNumberToObject(totals, "sctp_pkts_ign", client_connects_sctp.rxdrops);
	json_add_rxerrs(totals, "sctp_rx_errs", client_connects_sctp.rxerrs);
#endif
	
#ifndef _FOR_VALGRIND_
	/* client structure cell allocator statistics (cellmalloc only) */
	struct cellstatus_t cellst;
	cellstatus(client_cells, &cellst);
	int used = cellst.cellcount - cellst.freecount;
	cJSON_AddNumberToObject(memory, "client_cells_used", used);
	cJSON_AddNumberToObject(memory, "client_cells_free", cellst.freecount);
	cJSON_AddNumberToObject(memory, "client_used_bytes", used*cellst.cellsize_aligned);
	cJSON_AddNumberToObject(memory, "client_allocated_bytes", (long)cellst.blocks * (long)cellst.block_size);
	cJSON_AddNumberToObject(memory, "client_block_size", (long)cellst.block_size);
	cJSON_AddNumberToObject(memory, "client_blocks", (long)cellst.blocks);
	cJSON_AddNumberToObject(memory, "client_blocks_max", (long)cellst.blocks_max);
	cJSON_AddNumberToObject(memory, "client_cell_size", cellst.cellsize);
	cJSON_AddNumberToObject(memory, "client_cell_size_aligned", cellst.cellsize_aligned);
	cJSON_AddNumberToObject(memory, "client_cell_align", cellst.alignment);
#endif
	
	return 0;
}
2277