1 /*
2 * aprsc
3 *
4 * (c) Matti Aarnio, OH2MQK, <oh2mqk@sral.fi>
5 * (c) Heikki Hannikainen, OH7LZB
6 *
7 * This program is licensed under the BSD license, which can be found
8 * in the file LICENSE.
9 *
10 */
11
12 /*
13 * uplink thread: create uplink connections as necessary
14 * (and tear them down)
15 */
16
17 #include <string.h>
18 #include <errno.h>
19 #include <signal.h>
20 #include <ctype.h>
21 #ifdef HAVE_ALLOCA_H
22 #include <alloca.h>
23 #endif
24 #include <time.h>
25 #include <stdlib.h>
26 #include <fcntl.h>
27 #include <sys/types.h>
28 #include <sys/socket.h>
29 #include <netinet/in.h>
30 #include <netinet/tcp.h>
31
32 #include "config.h"
33 #include "version.h"
34 #include "status.h"
35 #include "uplink.h"
36 #include "hmalloc.h"
37 #include "hlog.h"
38 #include "worker.h"
39 #include "login.h"
40 #include "incoming.h"
41 #include "outgoing.h"
42 #include "filter.h"
43 #include "ssl.h"
44
45 #define MAX_UPLINKS 32
46
47 int uplink_reconfiguring;
48 int uplink_shutting_down;
49
50 pthread_mutex_t uplink_client_mutex = PTHREAD_MUTEX_INITIALIZER;
51 struct client_t *uplink_client[MAX_UPLINKS];
52
53 int uplink_running;
54 pthread_t uplink_th;
55
56 struct uplink_config_t *uplink_config; /* currently running uplink config */
57
58 /* global uplink connects, and protocol traffic accounters */
59
60 struct portaccount_t uplink_connects = {
61 .mutex = PTHREAD_MUTEX_INITIALIZER,
62 .refcount = 99, /* Global static blocks have extra-high initial refcount */
63 };
64
65 /*
66 * Close uplinking sockets
67 */
68
close_uplinkers(void)69 void close_uplinkers(void)
70 {
71 int rc;
72
73 hlog(LOG_DEBUG, "Closing all uplinks");
74
75 if ((rc = pthread_mutex_lock(&uplink_client_mutex))) {
76 hlog( LOG_ERR, "close_uplinkers(): could not lock uplink_client_mutex: %s", strerror(rc) );
77 return;
78 }
79
80 int i;
81 for (i = 0; i < MAX_UPLINKS; i++) {
82 if ((uplink_client[i]) && uplink_client[i]->fd >= 0) {
83 hlog( LOG_DEBUG, "Closing uplinking socket %d (fd %d) %s ...", i, uplink_client[i]->fd, uplink_client[i]->addr_rem );
84 shutdown(uplink_client[i]->fd, SHUT_RDWR);
85 }
86 }
87
88 if ((rc = pthread_mutex_unlock(&uplink_client_mutex))) {
89 hlog( LOG_ERR, "close_uplinkers(): could not unlock uplink_client_mutex: %s", strerror(rc) );
90 return;
91 }
92 return;
93 }
94
95 /*
96 * Log and handle the closing of an uplink.
97 */
98
uplink_close(struct client_t * c,int errnum)99 void uplink_close(struct client_t *c, int errnum)
100 {
101 int rc;
102
103 hlog(LOG_INFO, "%s: Uplink [%d] has been closed: %s", c->addr_rem, c->uplink_index, aprsc_strerror(errnum));
104
105 if ((rc = pthread_mutex_lock(&uplink_client_mutex))) {
106 hlog(LOG_ERR, "uplink_close(): could not lock uplink_client_mutex: %s", strerror(rc));
107 return;
108 }
109
110 if ((rc = pthread_mutex_lock(& uplink_connects.mutex )))
111 hlog(LOG_ERR, "uplink_close: could not lock uplink_connects: %s", strerror(rc));
112 -- uplink_connects.gauge;
113 if ((rc = pthread_mutex_unlock(& uplink_connects.mutex )))
114 hlog(LOG_ERR, "uplink_close: could not unlock uplink_connects: %s", strerror(rc));
115
116 struct uplink_config_t *l = uplink_config;
117 for (; l; l = l->next) {
118 if (l->client_ptr == (void *)c) {
119 hlog(LOG_DEBUG, "found the link to disconnect");
120 l->state = UPLINK_ST_NOT_LINKED;
121 l->client_ptr = NULL;
122 }
123 }
124
125 uplink_client[c->uplink_index] = NULL; // there can be only one!
126
127 if ((rc = pthread_mutex_unlock(&uplink_client_mutex))) {
128 hlog(LOG_ERR, "close_uplinkers(): could not unlock uplink_client_mutex: %s", strerror(rc));
129 return;
130 }
131 return;
132 }
133
134 /*
135 * uplink_logresp_handler parses the "# logresp" string given by
136 * an upstream server after our "user" command has been sent.
137 */
138
uplink_logresp_handler(struct worker_t * self,struct client_t * c,int l4proto,char * s,int len)139 int uplink_logresp_handler(struct worker_t *self, struct client_t *c, int l4proto, char *s, int len)
140 {
141 int argc;
142 char *argv[256];
143 char *p;
144
145 hlog_packet(LOG_INFO, s, len, "%s: Uplink server login response: ", c->addr_rem);
146
147 /* parse to arguments */
148 /* make it null-terminated for our string processing */
149 char *e = s + len;
150 *e = 0;
151 if ((argc = parse_args_noshell(argv, s)) == 0 || *argv[0] != '#') {
152 hlog(LOG_ERR, "%s: Uplink's logresp message is not recognized: no # in beginning (protocol incompatibility)", c->addr_rem);
153 client_close(self, c, CLIERR_UPLINK_LOGIN_PROTO_ERR);
154 return 0;
155 }
156
157 if (argc < 6) {
158 hlog(LOG_ERR, "%s: Uplink's logresp message does not have enough arguments (protocol incompatibility)", c->addr_rem);
159 client_close(self, c, CLIERR_UPLINK_LOGIN_PROTO_ERR);
160 return 0;
161 }
162
163 if (strcmp(argv[1], "logresp") != 0) {
164 hlog(LOG_ERR, "%s: Uplink's logresp message does not say 'logresp' (protocol incompatibility)", c->addr_rem);
165 client_close(self, c, CLIERR_UPLINK_LOGIN_PROTO_ERR);
166 return 0;
167 }
168
169 if (strcmp(argv[2], serverid) != 0) {
170 hlog(LOG_ERR, "%s: Uplink's logresp message does not have my callsign '%s' on it (protocol incompatibility)", c->addr_rem, serverid);
171 client_close(self, c, CLIERR_UPLINK_LOGIN_PROTO_ERR);
172 return 0;
173 }
174
175 if (strcmp(argv[3], "verified,") != 0) {
176 hlog(LOG_ERR, "%s: Uplink's logresp message does not say I'm verified (wrong passcode in my configuration?)", c->addr_rem);
177 client_close(self, c, CLIERR_UPLINK_LOGIN_NOT_VERIFIED);
178 return 0;
179 }
180
181 if (strcmp(argv[4], "server") != 0) {
182 hlog(LOG_ERR, "%s: Uplink's logresp message does not contain 'server' (protocol incompatibility)", c->addr_rem);
183 client_close(self, c, CLIERR_UPLINK_LOGIN_PROTO_ERR);
184 return 0;
185 }
186
187 p = strchr(argv[5], ',');
188 if (p)
189 *p = 0;
190
191 if (strlen(argv[5]) > CALLSIGNLEN_MAX) {
192 hlog(LOG_ERR, "%s: Uplink's server name is too long: '%s'", c->addr_rem, argv[5]);
193 client_close(self, c, CLIERR_UPLINK_LOGIN_PROTO_ERR);
194 return 0;
195 }
196
197 if (strcasecmp(argv[5], serverid) == 0) {
198 hlog(LOG_ERR, "%s: Uplink's server name is same as ours: '%s'", c->addr_rem, argv[5]);
199 client_close(self, c, CLIERR_UPLINK_LOGIN_PROTO_ERR);
200 return 0;
201 }
202
203 /* todo: validate server callsign with the q valid path algorithm */
204
205 /* store the remote server's callsign as the "client username" */
206 strncpy(c->username, argv[5], sizeof(c->username));
207 c->username[sizeof(c->username)-1] = 0;
208
209 /* uplink servers are always "validated" */
210 c->validated = VALIDATED_WEAK;
211
212 /* check the server name against certificate */
213 #ifdef USE_SSL
214 if (c->ssl_con && c->ssl_con->validate) {
215 hlog(LOG_DEBUG, "%s/%s: Uplink: Validating SSL server cert subject", c->addr_rem, c->username);
216 int ssl_res = ssl_validate_peer_cert_phase2(c);
217
218 if (ssl_res != 0) {
219 hlog(LOG_WARNING, "%s/%s: SSL server cert validation failed: %s", c->addr_rem, c->username, ssl_strerror(ssl_res));
220 client_close(self, c, CLIERR_UPLINK_PEER_CERT_FAIL);
221 return 0;
222 }
223
224 c->validated = VALIDATED_STRONG;
225 }
226 #endif
227
228 hlog(LOG_INFO, "%s: Uplink logged in to server %s", c->addr_rem, c->username);
229
230 c->handler_line_in = incoming_handler;
231
232 /* mark as connected and classify */
233 worker_mark_client_connected(self, c);
234
235 return 0;
236 }
237
238 /*
239 * uplink_login_handler parses the "# <software> <version" string given by
240 * an upstream server.
241 */
242
uplink_login_handler(struct worker_t * self,struct client_t * c,int l4proto,char * s,int len)243 int uplink_login_handler(struct worker_t *self, struct client_t *c, int l4proto, char *s, int len)
244 {
245 char buf[1000];
246 int rc;
247 int argc;
248 char *argv[256];
249
250 hlog_packet(LOG_INFO, s, len, "%s: Uplink server software: ", c->addr_rem);
251
252 #ifdef USE_SSL
253 if (c->ssl_con && c->ssl_con->validate) {
254 hlog(LOG_DEBUG, "%s/%s: Uplink: Validating SSL server cert against CA", c->addr_rem, c->username);
255 int ssl_res = ssl_validate_peer_cert_phase1(c);
256
257 if (ssl_res != 0) {
258 hlog(LOG_WARNING, "%s/%s: SSL server cert validation failed: %s", c->addr_rem, c->username, ssl_strerror(ssl_res));
259 client_close(self, c, CLIERR_UPLINK_PEER_CERT_FAIL);
260 return 0;
261 }
262 }
263 #endif
264
265 /* parse to arguments */
266 /* make it null-terminated for our string processing */
267 char *e = s + len;
268 *e = 0;
269 if ((argc = parse_args_noshell(argv, s)) == 0 || *argv[0] != '#') {
270 hlog(LOG_ERR, "%s: Uplink's welcome message is not recognized: no # in beginning", c->addr_rem);
271 client_close(self, c, CLIERR_UPLINK_LOGIN_PROTO_ERR);
272 return 0;
273 }
274
275 if (argc >= 3) {
276 strncpy(c->app_name, argv[1], sizeof(c->app_name));
277 c->app_name[sizeof(c->app_name)-1] = 0;
278 strncpy(c->app_version, argv[2], sizeof(c->app_version));
279 c->app_version[sizeof(c->app_version)-1] = 0;
280 }
281
282 // TODO: The uplink login command here could maybe be improved to send a filter command.
283 len = sprintf(buf, "user %s pass %s vers %s\r\n", serverid, passcode, verstr_aprsis);
284
285 hlog(LOG_DEBUG, "%s: my login string: \"%.*s\"", c->addr_rem, len-2, buf, len);
286
287 rc = c->write(self, c, buf, len);
288 if (rc < -2) return rc; // the client was destroyed by client_write, don't touch it
289
290 c->handler_line_in = uplink_logresp_handler;
291 c->state = CSTATE_LOGRESP;
292
293 hlog(LOG_INFO, "%s: Connected to server, logging in", c->addr_rem);
294
295 return 0;
296 }
297
298 #ifdef USE_SSL
config_uplink_ssl_setup(struct uplink_config_t * l)299 int config_uplink_ssl_setup(struct uplink_config_t *l)
300 {
301 l->ssl = ssl_alloc();
302
303 if (ssl_create(l->ssl, (void *)l)) {
304 hlog(LOG_ERR, "Uplink: Failed to create SSL context for '%s*'", l->name);
305 return -1;
306 }
307
308 /* optional client cert for server-side validation */
309 if (l->certfile && l->keyfile) {
310 if (ssl_certificate(l->ssl, l->certfile, l->keyfile)) {
311 hlog(LOG_ERR, "Uplink '%s': Failed to load SSL certificatess", l->name);
312 ssl_free(l->ssl);
313 l->ssl = NULL;
314 return -1;
315 }
316 }
317
318 /* optional server cert validation */
319 if (l->cafile) {
320 if (ssl_ca_certificate(l->ssl, l->cafile, 2)) {
321 hlog(LOG_ERR, "Uplink '%s': Failed to load trusted SSL CA certificates", l->name);
322 ssl_free(l->ssl);
323 l->ssl = NULL;
324 return -1;
325 }
326 }
327
328 hlog(LOG_INFO, "Uplink %s: SSL initialized%s%s",
329 l->name,
330 (l->cafile) ? ", server validated" : "",
331 (l->certfile) ? ", client cert loaded" : "");
332
333 return 0;
334 }
335 #endif
336
337
338 /*
339 * Uplink a single connection
340 */
341
make_uplink(struct uplink_config_t * l)342 int make_uplink(struct uplink_config_t *l)
343 {
344 int fd, i, addrc, arg;
345 int uplink_index;
346 union sockaddr_u sa; /* large enough for also IPv6 address */
347 socklen_t addr_len;
348 struct addrinfo *ai, *a, *ap[21];
349 struct addrinfo req;
350 char *addr_s = NULL;
351 int port;
352 struct sockaddr *srcaddr;
353 socklen_t srcaddr_len;
354
355 memset(&req, 0, sizeof(req));
356 req.ai_family = 0;
357 req.ai_socktype = SOCK_STREAM;
358 req.ai_protocol = IPPROTO_TCP;
359 req.ai_flags = AI_ADDRCONFIG;
360 ai = NULL;
361
362 #ifdef USE_SSL
363 /* SSL requires both a cert and a key, or none at all */
364 if ((l->certfile && !l->keyfile) || (l->keyfile && !l->certfile)) {
365 hlog(LOG_ERR, "Uplink %s: Only one of sslkey and sslcert defined - both needed for SSL authentication", l->name);
366 return -2;
367 }
368
369 /* todo: allow triggering SSL without client auth */
370 if (l->keyfile && l->certfile) {
371 if (!l->ssl) {
372 if (config_uplink_ssl_setup(l)) {
373 hlog(LOG_ERR, "Uplink '%s': SSL setup failed", l->name);
374 return -2;
375 }
376 }
377 }
378 #endif
379
380 /* find a free uplink slot */
381 for (uplink_index = 0; uplink_index < MAX_UPLINKS; uplink_index++) {
382 if (!uplink_client[uplink_index])
383 break;
384 }
385 if (uplink_index == MAX_UPLINKS) {
386 hlog(LOG_ERR, "Uplink %s: No available uplink slots, %d used", l->name, MAX_UPLINKS);
387 return -2;
388 }
389
390 if (strcasecmp(l->proto, "tcp") == 0) {
391 // well, do nothing for now.
392 } else if (strcasecmp(l->proto, "udp") == 0) {
393 req.ai_socktype = SOCK_DGRAM;
394 req.ai_protocol = IPPROTO_UDP;
395 #ifdef USE_SCTP
396 } else if (strcasecmp(l->proto, "sctp") == 0) {
397 req.ai_socktype = SOCK_STREAM;
398 req.ai_protocol = IPPROTO_SCTP;
399 #endif
400 } else {
401 hlog(LOG_ERR, "Uplink %s: Unsupported protocol '%s'\n", l->name, l->proto);
402 return -2;
403 }
404
405 port = atoi(l->port);
406 if (port < 1 || port > 65535) {
407 hlog(LOG_ERR, "Uplink %s: unsupported port number '%s'\n", l->name, l->port);
408 return -2;
409 }
410
411 l->state = UPLINK_ST_CONNECTING;
412 i = getaddrinfo(l->host, l->port, &req, &ai);
413 if (i != 0) {
414 hlog(LOG_INFO, "Uplink %s: address resolving failure of '%s' '%s': %s", l->name, l->host, l->port, gai_strerror(i));
415 l->state = UPLINK_ST_NOT_LINKED;
416 return -2;
417 }
418
419 /* Count the amount of addresses in response */
420 addrc = 0;
421 for (a = ai; a && addrc < 20 ; a = a->ai_next, ++addrc) {
422 ap[addrc] = a; /* Up to 20 first addresses */
423 }
424 ap[addrc] = NULL;
425
426 if (addrc == 0) {
427 hlog(LOG_INFO, "Uplink %s: address resolving of '%s' '%s': returned 0 addresses", l->name, l->host, l->port);
428 l->state = UPLINK_ST_NOT_LINKED;
429 return -2;
430 }
431
432 /* Pick random address to start from */
433 // coverity[dont_call] // squelch warning: not security sensitive use of random()
434 i = random() % addrc;
435
436 /* Then lets try making socket and connection in address order */
437 /* TODO: BUG: If the TCP connection succeeds, but the server rejects our
438 * login due to a bad source address (like, IPv4 would be allowed but our
439 * IPv6 address is not in the server's ACL), this currently does not switch
440 * to the next destination address.
441 * Instead it'll wait for the retry timer and then try a random
442 * destination address, and eventually succeed (unless very unlucky).
443 */
444 fd = -1;
445 while ((a = ap[i])) {
446 ap[i] = NULL;
447 addr_s = strsockaddr(a->ai_addr, a->ai_addrlen);
448
449 hlog(LOG_INFO, "Uplink %s: Connecting to %s:%s (%s) [link %d, addr %d/%d]",
450 l->name, l->host, l->port, addr_s, uplink_index, i+1, addrc);
451 i++;
452 if (i == addrc)
453 i = 0;
454
455 if ((fd = socket(a->ai_family, a->ai_socktype, a->ai_protocol)) < 0) {
456 hlog(LOG_CRIT, "Uplink %s: socket(): %s\n", l->name, strerror(errno));
457 hfree(addr_s);
458 continue;
459 }
460
461 arg = 1;
462 if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (char *)&arg, sizeof(arg)))
463 hlog(LOG_ERR, "Uplink %s: Failed to set SO_REUSEADDR on new socket: %s", l->name, strerror(errno));
464
465 /* bind source address */
466 srcaddr_len = 0;
467 if (a->ai_family == AF_INET && uplink_bind_v4_len != 0) {
468 srcaddr = (struct sockaddr *)&uplink_bind_v4;
469 srcaddr_len = uplink_bind_v4_len;
470 } else if (a->ai_family == AF_INET6 && uplink_bind_v6_len != 0) {
471 srcaddr = (struct sockaddr *)&uplink_bind_v6;
472 srcaddr_len = uplink_bind_v6_len;
473 }
474
475 if (srcaddr_len) {
476 if (bind(fd, srcaddr, srcaddr_len)) {
477 char *s = strsockaddr(srcaddr, srcaddr_len);
478 hlog(LOG_ERR, "Uplink %s: Failed to bind source address '%s': %s", l->name, s, strerror(errno));
479 hfree(s);
480 goto connerr;
481 }
482 }
483
484 /* set non-blocking mode at this point, so that we can make a
485 * non-blocking connect() with a short timeout
486 */
487 if (fcntl(fd, F_SETFL, O_NONBLOCK)) {
488 hlog(LOG_CRIT, "Uplink %s: Failed to set non-blocking mode on new socket: %s", l->name, strerror(errno));
489 goto connerr;
490 }
491
492 /* Use TCP_NODELAY for APRS-IS sockets. High delays can cause packets getting past
493 * the dupe filters.
494 */
495 #ifdef TCP_NODELAY
496 if (a->ai_protocol == IPPROTO_TCP) {
497 int arg = 1;
498 if (setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, (void *)&arg, sizeof(arg)))
499 hlog(LOG_ERR, "Uplink %s: %s: setsockopt(TCP_NODELAY, %d) failed: %s", l->name, addr_s, arg, strerror(errno));
500 }
501 #endif
502
503 if (connect(fd, a->ai_addr, a->ai_addrlen) && errno != EINPROGRESS) {
504 hlog(LOG_ERR, "Uplink %s: connect(%s) failed: %s", l->name, addr_s, strerror(errno));
505 goto connerr;
506 }
507
508 /* Only wait a few seconds for the connection to be created.
509 * If the connection setup is very slow, it is unlikely to
510 * perform well enough anyway.
511 */
512 struct pollfd connect_fd;
513 connect_fd.fd = fd;
514 connect_fd.events = POLLOUT;
515 connect_fd.revents = 0;
516
517 int r = poll(&connect_fd, 1, 3000);
518 hlog(LOG_DEBUG, "Uplink %s: poll after connect returned %d, revents %d", l->name, r, connect_fd.revents);
519
520 if (r < 0) {
521 hlog(LOG_ERR, "Uplink %s: connect to %s: poll failed: %s", l->name, addr_s, strerror(errno));
522 goto connerr;
523 }
524
525 if (r < 1) {
526 hlog(LOG_ERR, "Uplink %s: connect to %s timed out", l->name, addr_s);
527 goto connerr;
528 }
529
530 socklen_t optlen = sizeof(arg);
531 if (getsockopt(fd, SOL_SOCKET, SO_ERROR, (char *)&arg, &optlen) == -1) {
532 hlog(LOG_ERR, "Uplink %s: getsockopt() after connect failed: %s", l->name, strerror(errno));
533 goto connerr;
534 } else if (arg == 0) {
535 /* Successful connect! */
536 hlog(LOG_DEBUG, "Uplink %s: successful connect", l->name);
537 break;
538 }
539
540 hlog(LOG_ERR, "Uplink %s: connect to %s failed: %s", l->name, addr_s, strerror(arg));
541 connerr:
542 close(fd);
543 fd = -1;
544 hfree(addr_s);
545 }
546
547 freeaddrinfo(ai); /* Not needed anymore.. */
548
549 if (fd < 0) {
550 l->state = UPLINK_ST_NOT_LINKED;
551 return -3; /* No successfull connection at any address.. */
552 }
553
554 struct client_t *c = client_alloc();
555 if (!c) {
556 hlog(LOG_ERR, "Uplink %s: client_alloc() failed, too many clients", l->name);
557 close(fd);
558 l->state = UPLINK_ST_NOT_LINKED;
559 return -3; /* No successfull connection at any address.. */
560 }
561
562 l->client_ptr = (void *)c;
563 c->uplink_index = uplink_index;
564 c->fd = fd;
565 c->ai_protocol = req.ai_protocol;
566 c->state = CSTATE_INIT;
567 /* use the default login handler */
568 c->handler_line_in = &uplink_login_handler;
569 c->flags = l->client_flags;
570 c->keepalive = tick;
571 c->last_read = tick;
572 c->connect_time = now;
573 strncpy(c->username, l->name, sizeof(c->username));
574 c->username[sizeof(c->username)-1] = 0;
575 c->username_len = strlen(c->username);
576
577 /* These peer/sock name calls can not fail -- or the socket closed
578 on us in which case it gets abandoned a bit further below. */
579
580 addr_len = sizeof(sa);
581 getpeername(fd, (struct sockaddr *)&sa, &addr_len);
582 //s = strsockaddr( &sa.sa, addr_len ); /* server side address */
583 strncpy(c->addr_rem, addr_s, sizeof(c->addr_rem));
584 c->addr_rem[sizeof(c->addr_rem)-1] = 0;
585 hfree(addr_s);
586 c->addr = sa;
587
588 /* hex format of client's IP address + port */
589
590 char *s = hexsockaddr( &sa.sa, addr_len );
591 strncpy(c->addr_hex, s, sizeof(c->addr_hex));
592 c->addr_hex[sizeof(c->addr_hex)-1] = 0;
593 hfree(s);
594
595 addr_len = sizeof(sa);
596 getsockname(fd, (struct sockaddr *)&sa, &addr_len);
597 s = strsockaddr( &sa.sa, addr_len ); /* client side address */
598 strncpy(c->addr_loc, s, sizeof(c->addr_loc));
599 c->addr_loc[sizeof(c->addr_loc)-1] = 0;
600 hfree(s);
601
602 hlog(LOG_INFO, "Uplink %s: %s: Connection established on fd %d using source address %s", l->name, c->addr_rem, c->fd, c->addr_loc);
603
604 if (set_client_sockopt(c) < 0)
605 goto err;
606
607 uplink_client[uplink_index] = c;
608 l->state = UPLINK_ST_CONNECTED;
609
610 /* set up SSL if necessary */
611 #ifdef USE_SSL
612 if (l->ssl) {
613 if (ssl_create_connection(l->ssl, c, 1))
614 goto err;
615 }
616 #endif
617
618 /* Push it on the first worker, which ever it is..
619 */
620
621 if (pass_client_to_worker(worker_threads, c))
622 goto err;
623
624 if ((i = pthread_mutex_lock(& uplink_connects.mutex )))
625 hlog(LOG_ERR, "make_uplink: could not lock uplink_connects: %s", strerror(i));
626 ++ uplink_connects.gauge;
627 ++ uplink_connects.counter;
628 ++ uplink_connects.refcount; /* <-- that does not get decremented at any time.. */
629 if ((i = pthread_mutex_unlock(& uplink_connects.mutex )))
630 hlog(LOG_ERR, "make_uplink: could not unlock uplink_connects: %s", strerror(i));
631
632 c->portaccount = & uplink_connects; /* calculate traffic bytes/packets */
633
634 return 0;
635
636 err:
637 client_free(c);
638 uplink_client[uplink_index] = NULL;
639 l->state = UPLINK_ST_NOT_LINKED;
640 return -1;
641 }
642
643
644 /*
645 * Uplink thread
646 */
647
uplink_thread(void * asdf)648 void uplink_thread(void *asdf)
649 {
650 sigset_t sigs_to_block;
651 int rc;
652 int next_uplink = -1; /* the index to the next regular uplink candidate */
653 int uplink_error_set = -1;
654
655 pthreads_profiling_reset("uplink");
656
657 sigemptyset(&sigs_to_block);
658 sigaddset(&sigs_to_block, SIGALRM);
659 sigaddset(&sigs_to_block, SIGINT);
660 sigaddset(&sigs_to_block, SIGTERM);
661 sigaddset(&sigs_to_block, SIGQUIT);
662 sigaddset(&sigs_to_block, SIGHUP);
663 sigaddset(&sigs_to_block, SIGURG);
664 sigaddset(&sigs_to_block, SIGPIPE);
665 sigaddset(&sigs_to_block, SIGUSR1);
666 sigaddset(&sigs_to_block, SIGUSR2);
667 pthread_sigmask(SIG_BLOCK, &sigs_to_block, NULL);
668
669 //hlog(LOG_INFO, "Uplink thread starting...");
670
671 uplink_reconfiguring = 1;
672 while (!uplink_shutting_down) {
673 if (uplink_reconfiguring || uplink_config_updated) {
674 hlog(LOG_INFO, "Uplink thread applying new configuration...");
675 __sync_synchronize();
676 uplink_reconfiguring = 0;
677 close_uplinkers();
678
679 free_uplink_config(&uplink_config);
680 uplink_config = uplink_config_install;
681 uplink_config_install = NULL;
682 if (uplink_config)
683 uplink_config->prevp = &uplink_config;
684
685 uplink_config_updated = 0;
686
687 hlog(LOG_INFO, "Uplink thread configured.");
688 }
689
690 /* speed up shutdown */
691 if (uplink_shutting_down || uplink_reconfiguring)
692 continue;
693
694 if (uplink_config_updated) {
695 uplink_reconfiguring = 1;
696 continue;
697 }
698
699 if ((rc = pthread_mutex_lock(&uplink_client_mutex))) {
700 hlog(LOG_ERR, "uplink_thread(): could not lock uplink_client_mutex: %s", strerror(rc));
701 continue;
702 }
703
704 /* Check if all we have a single regular uplink connection up, out of all
705 * the configured ones. Also, check that all the UPLINKMULTI links are
706 * connected.
707 */
708
709 int has_uplink = 0; /* do we have a single regular uplink? */
710 int avail_uplink = 0; /* how many regular uplinks are configured? */
711
712 struct uplink_config_t *l = uplink_config;
713 for (; l; l = l->next) {
714 if (l->client_flags & CLFLAGS_UPLINKMULTI) {
715 /* MULTI uplink, needs to be up */
716 if (l->state < UPLINK_ST_CONNECTING)
717 make_uplink(l);
718 } else {
719 /* regular uplink, need to have one connected */
720 if (l->state >= UPLINK_ST_CONNECTING)
721 has_uplink++;
722 avail_uplink++;
723 }
724 }
725
726 if (avail_uplink && !has_uplink) {
727 hlog(LOG_INFO, "Uplink: %d uplinks configured, %d are connected, need to pick new", avail_uplink, has_uplink);
728 /* we have regular uplinks but none are connected,
729 * pick the next one and connect */
730 next_uplink++;
731 if (next_uplink >= avail_uplink)
732 next_uplink = 0;
733 //hlog(LOG_DEBUG, "Uplink: picked uplink %d as the new candidate", next_uplink);
734 l = uplink_config;
735 int i = 0;
736 while ((l) && i < next_uplink) {
737 if (!(l->client_flags & CLFLAGS_UPLINKMULTI))
738 i++;
739 l = l->next;
740 }
741 if (l) {
742 hlog(LOG_DEBUG, "Uplink: trying %s (%s:%s)", l->name, l->host, l->port);
743 make_uplink(l);
744 }
745 }
746
747 if ((rc = pthread_mutex_unlock(&uplink_client_mutex))) {
748 hlog(LOG_CRIT, "close_uplinkers(): could not unlock uplink_client_mutex: %s", strerror(rc));
749 continue;
750 }
751
752 if (avail_uplink && !has_uplink) {
753 status_error(3600, "no_uplink");
754 uplink_error_set = 1;
755 } else {
756 if (uplink_error_set == 1) {
757 status_error(-1, "no_uplink");
758 uplink_error_set = 0;
759 }
760 }
761
762 /* sleep for 4 seconds between successful rounds */
763 for (rc = 0; (!uplink_shutting_down) && rc < 4000/200; rc++)
764 if (poll(NULL, 0, 200) == -1 && errno != EINTR)
765 hlog(LOG_WARNING, "uplink: poll sleep failed: %s", strerror(errno));
766 }
767
768 hlog(LOG_DEBUG, "Uplink thread shutting down uplinking sockets...");
769 close_uplinkers();
770 }
771
772
773 /*
774 * Start / stop the uplinks maintainer thread
775 */
776
uplink_start(void)777 void uplink_start(void)
778 {
779 if (uplink_running)
780 return;
781
782 uplink_shutting_down = 0;
783
784 if (pthread_create(&uplink_th, &pthr_attrs, (void *)uplink_thread, NULL))
785 perror("pthread_create failed for uplink_thread");
786
787 uplink_running = 1;
788 }
789
uplink_stop(void)790 void uplink_stop(void)
791 {
792 int i, e;
793
794 if (!uplink_running)
795 return;
796
797 hlog(LOG_INFO, "Signalling uplink_thread to shut down...");
798 uplink_shutting_down = 1;
799
800 if ((e = pthread_join(uplink_th, NULL))) {
801 hlog(LOG_ERR, "Could not pthread_join uplink_th: %s", strerror(e));
802 } else {
803 hlog(LOG_INFO, "Uplink thread has terminated.");
804 uplink_running = 0;
805 }
806
807 /* free uplink config - and clean up the uplink_client indexed pointers
808 * which refer to the configs
809 */
810 if ((e = pthread_mutex_lock(&uplink_client_mutex))) {
811 hlog( LOG_ERR, "uplink_stop(): could not lock uplink_client_mutex: %s", strerror(e) );
812 return;
813 }
814
815 for (i = 0; i < MAX_UPLINKS; i++)
816 uplink_client[i] = NULL;
817
818 free_uplink_config(&uplink_config);
819
820 if ((e = pthread_mutex_unlock(&uplink_client_mutex))) {
821 hlog( LOG_ERR, "uplink_stop(): could not unlock uplink_client_mutex: %s", strerror(e) );
822 return;
823 }
824 }
825