1 /*
2  *	aprsc
3  *
4  *	(c) Matti Aarnio, OH2MQK, <oh2mqk@sral.fi>
5  *	(c) Heikki Hannikainen, OH7LZB
6  *
7  *     This program is licensed under the BSD license, which can be found
8  *     in the file LICENSE.
9  *
10  */
11 
12 /*
13  *	uplink thread: create uplink connections as necessary
14  *	(and tear them down)
15  */
16 
#include <stdio.h>
#include <string.h>
#include <errno.h>
#include <signal.h>
#include <ctype.h>
#ifdef HAVE_ALLOCA_H
#include <alloca.h>
#endif
#include <time.h>
#include <stdlib.h>
#include <fcntl.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
31 
32 #include "config.h"
33 #include "version.h"
34 #include "status.h"
35 #include "uplink.h"
36 #include "hmalloc.h"
37 #include "hlog.h"
38 #include "worker.h"
39 #include "login.h"
40 #include "incoming.h"
41 #include "outgoing.h"
42 #include "filter.h"
43 #include "ssl.h"
44 
/* Number of slots in the uplink_client[] table; make_uplink() refuses to
 * create a new uplink when every slot is in use.
 */
#define MAX_UPLINKS 32

/* Set to 1 when the uplink thread should (re)apply its configuration;
 * cleared by uplink_thread() once the new configuration is installed.
 */
int uplink_reconfiguring;
/* Set to 1 by uplink_stop() to ask uplink_thread() to exit its main loop. */
int uplink_shutting_down;

/* Guards uplink_client[]; taken by close_uplinkers(), uplink_close(),
 * uplink_thread() and uplink_stop() in this file.
 */
pthread_mutex_t uplink_client_mutex = PTHREAD_MUTEX_INITIALIZER;
/* One slot per active uplink, indexed by client_t.uplink_index; NULL = free. */
struct client_t *uplink_client[MAX_UPLINKS];

/* Non-zero after uplink_start(); reset to 0 by uplink_stop() after a successful join. */
int uplink_running;
/* Thread handle of the uplink maintainer thread. */
pthread_t uplink_th;

struct uplink_config_t *uplink_config; /* currently running uplink config */

/* global uplink connects, and protocol traffic accounters */
/* gauge is incremented in make_uplink() and decremented in uplink_close();
 * counter and refcount are incremented for every established uplink.
 */

struct portaccount_t uplink_connects = {
  .mutex    = PTHREAD_MUTEX_INITIALIZER,
  .refcount = 99,	/* Global static blocks have extra-high initial refcount */
};
64 
65 /*
66  *	Close uplinking sockets
67  */
68 
close_uplinkers(void)69 void close_uplinkers(void)
70 {
71 	int rc;
72 
73 	hlog(LOG_DEBUG, "Closing all uplinks");
74 
75 	if ((rc = pthread_mutex_lock(&uplink_client_mutex))) {
76 		hlog( LOG_ERR, "close_uplinkers(): could not lock uplink_client_mutex: %s", strerror(rc) );
77 		return;
78 	}
79 
80 	int i;
81 	for (i = 0; i < MAX_UPLINKS; i++) {
82 		if ((uplink_client[i]) && uplink_client[i]->fd >= 0) {
83 			hlog( LOG_DEBUG, "Closing uplinking socket %d (fd %d) %s ...", i, uplink_client[i]->fd, uplink_client[i]->addr_rem );
84 			shutdown(uplink_client[i]->fd, SHUT_RDWR);
85 		}
86 	}
87 
88 	if ((rc = pthread_mutex_unlock(&uplink_client_mutex))) {
89 		hlog( LOG_ERR, "close_uplinkers(): could not unlock uplink_client_mutex: %s", strerror(rc) );
90 		return;
91 	}
92 	return;
93 }
94 
95 /*
96  *	Log and handle the closing of an uplink.
97  */
98 
uplink_close(struct client_t * c,int errnum)99 void uplink_close(struct client_t *c, int errnum)
100 {
101 	int rc;
102 
103 	hlog(LOG_INFO, "%s: Uplink [%d] has been closed: %s", c->addr_rem, c->uplink_index, aprsc_strerror(errnum));
104 
105 	if ((rc = pthread_mutex_lock(&uplink_client_mutex))) {
106 		hlog(LOG_ERR, "uplink_close(): could not lock uplink_client_mutex: %s", strerror(rc));
107 		return;
108 	}
109 
110 	if ((rc = pthread_mutex_lock(& uplink_connects.mutex )))
111 		hlog(LOG_ERR, "uplink_close: could not lock uplink_connects: %s", strerror(rc));
112 	-- uplink_connects.gauge;
113 	if ((rc = pthread_mutex_unlock(& uplink_connects.mutex )))
114 		hlog(LOG_ERR, "uplink_close: could not unlock uplink_connects: %s", strerror(rc));
115 
116 	struct uplink_config_t *l = uplink_config;
117 	for (; l; l = l->next) {
118 		if (l->client_ptr == (void *)c) {
119 			hlog(LOG_DEBUG, "found the link to disconnect");
120 			l->state = UPLINK_ST_NOT_LINKED;
121 			l->client_ptr = NULL;
122 		}
123 	}
124 
125 	uplink_client[c->uplink_index] = NULL; // there can be only one!
126 
127 	if ((rc = pthread_mutex_unlock(&uplink_client_mutex))) {
128 		hlog(LOG_ERR, "close_uplinkers(): could not unlock uplink_client_mutex: %s", strerror(rc));
129 		return;
130 	}
131 	return;
132 }
133 
134 /*
135  *	uplink_logresp_handler parses the "# logresp" string given by
136  *	an upstream server after our "user" command has been sent.
137  */
138 
uplink_logresp_handler(struct worker_t * self,struct client_t * c,int l4proto,char * s,int len)139 int uplink_logresp_handler(struct worker_t *self, struct client_t *c, int l4proto, char *s, int len)
140 {
141 	int argc;
142 	char *argv[256];
143 	char *p;
144 
145 	hlog_packet(LOG_INFO, s, len, "%s: Uplink server login response: ", c->addr_rem);
146 
147 	/* parse to arguments */
148 	/* make it null-terminated for our string processing */
149 	char *e = s + len;
150 	*e = 0;
151 	if ((argc = parse_args_noshell(argv, s)) == 0 || *argv[0] != '#') {
152 		hlog(LOG_ERR, "%s: Uplink's logresp message is not recognized: no # in beginning (protocol incompatibility)", c->addr_rem);
153 		client_close(self, c, CLIERR_UPLINK_LOGIN_PROTO_ERR);
154 		return 0;
155 	}
156 
157 	if (argc < 6) {
158 		hlog(LOG_ERR, "%s: Uplink's logresp message does not have enough arguments (protocol incompatibility)", c->addr_rem);
159 		client_close(self, c, CLIERR_UPLINK_LOGIN_PROTO_ERR);
160 		return 0;
161 	}
162 
163 	if (strcmp(argv[1], "logresp") != 0) {
164 		hlog(LOG_ERR, "%s: Uplink's logresp message does not say 'logresp' (protocol incompatibility)", c->addr_rem);
165 		client_close(self, c, CLIERR_UPLINK_LOGIN_PROTO_ERR);
166 		return 0;
167 	}
168 
169 	if (strcmp(argv[2], serverid) != 0) {
170 		hlog(LOG_ERR, "%s: Uplink's logresp message does not have my callsign '%s' on it (protocol incompatibility)", c->addr_rem, serverid);
171 		client_close(self, c, CLIERR_UPLINK_LOGIN_PROTO_ERR);
172 		return 0;
173 	}
174 
175 	if (strcmp(argv[3], "verified,") != 0) {
176 		hlog(LOG_ERR, "%s: Uplink's logresp message does not say I'm verified (wrong passcode in my configuration?)", c->addr_rem);
177 		client_close(self, c, CLIERR_UPLINK_LOGIN_NOT_VERIFIED);
178 		return 0;
179 	}
180 
181 	if (strcmp(argv[4], "server") != 0) {
182 		hlog(LOG_ERR, "%s: Uplink's logresp message does not contain 'server' (protocol incompatibility)", c->addr_rem);
183 		client_close(self, c, CLIERR_UPLINK_LOGIN_PROTO_ERR);
184 		return 0;
185 	}
186 
187 	p = strchr(argv[5], ',');
188 	if (p)
189 		*p = 0;
190 
191 	if (strlen(argv[5]) > CALLSIGNLEN_MAX) {
192 		hlog(LOG_ERR, "%s: Uplink's server name is too long: '%s'", c->addr_rem, argv[5]);
193 		client_close(self, c, CLIERR_UPLINK_LOGIN_PROTO_ERR);
194 		return 0;
195 	}
196 
197 	if (strcasecmp(argv[5], serverid) == 0) {
198 		hlog(LOG_ERR, "%s: Uplink's server name is same as ours: '%s'", c->addr_rem, argv[5]);
199 		client_close(self, c, CLIERR_UPLINK_LOGIN_PROTO_ERR);
200 		return 0;
201 	}
202 
203 	/* todo: validate server callsign with the q valid path algorithm */
204 
205 	/* store the remote server's callsign as the "client username" */
206 	strncpy(c->username, argv[5], sizeof(c->username));
207 	c->username[sizeof(c->username)-1] = 0;
208 
209 	/* uplink servers are always "validated" */
210 	c->validated = VALIDATED_WEAK;
211 
212 	/* check the server name against certificate */
213 #ifdef USE_SSL
214 	if (c->ssl_con && c->ssl_con->validate) {
215 		hlog(LOG_DEBUG, "%s/%s: Uplink: Validating SSL server cert subject", c->addr_rem, c->username);
216 		int ssl_res = ssl_validate_peer_cert_phase2(c);
217 
218 		if (ssl_res != 0) {
219 			hlog(LOG_WARNING, "%s/%s: SSL server cert validation failed: %s", c->addr_rem, c->username, ssl_strerror(ssl_res));
220 			client_close(self, c, CLIERR_UPLINK_PEER_CERT_FAIL);
221 			return 0;
222 		}
223 
224 		c->validated = VALIDATED_STRONG;
225 	}
226 #endif
227 
228 	hlog(LOG_INFO, "%s: Uplink logged in to server %s", c->addr_rem, c->username);
229 
230 	c->handler_line_in = incoming_handler;
231 
232 	/* mark as connected and classify */
233 	worker_mark_client_connected(self, c);
234 
235 	return 0;
236 }
237 
238 /*
239  *	uplink_login_handler parses the "# <software> <version" string given by
240  *	an upstream server.
241  */
242 
uplink_login_handler(struct worker_t * self,struct client_t * c,int l4proto,char * s,int len)243 int uplink_login_handler(struct worker_t *self, struct client_t *c, int l4proto, char *s, int len)
244 {
245 	char buf[1000];
246 	int rc;
247 	int argc;
248 	char *argv[256];
249 
250 	hlog_packet(LOG_INFO, s, len, "%s: Uplink server software: ", c->addr_rem);
251 
252 #ifdef USE_SSL
253 	if (c->ssl_con && c->ssl_con->validate) {
254 		hlog(LOG_DEBUG, "%s/%s: Uplink: Validating SSL server cert against CA", c->addr_rem, c->username);
255 		int ssl_res = ssl_validate_peer_cert_phase1(c);
256 
257 		if (ssl_res != 0) {
258 			hlog(LOG_WARNING, "%s/%s: SSL server cert validation failed: %s", c->addr_rem, c->username, ssl_strerror(ssl_res));
259 			client_close(self, c, CLIERR_UPLINK_PEER_CERT_FAIL);
260 			return 0;
261 		}
262 	}
263 #endif
264 
265 	/* parse to arguments */
266 	/* make it null-terminated for our string processing */
267 	char *e = s + len;
268 	*e = 0;
269 	if ((argc = parse_args_noshell(argv, s)) == 0 || *argv[0] != '#') {
270 		hlog(LOG_ERR, "%s: Uplink's welcome message is not recognized: no # in beginning", c->addr_rem);
271 		client_close(self, c, CLIERR_UPLINK_LOGIN_PROTO_ERR);
272 		return 0;
273 	}
274 
275 	if (argc >= 3) {
276 		strncpy(c->app_name, argv[1], sizeof(c->app_name));
277 		c->app_name[sizeof(c->app_name)-1] = 0;
278 		strncpy(c->app_version, argv[2], sizeof(c->app_version));
279 		c->app_version[sizeof(c->app_version)-1] = 0;
280 	}
281 
282 	// TODO: The uplink login command here could maybe be improved to send a filter command.
283 	len = sprintf(buf, "user %s pass %s vers %s\r\n", serverid, passcode, verstr_aprsis);
284 
285 	hlog(LOG_DEBUG, "%s: my login string: \"%.*s\"", c->addr_rem, len-2, buf, len);
286 
287 	rc = c->write(self, c, buf, len);
288 	if (rc < -2) return rc; // the client was destroyed by client_write, don't touch it
289 
290 	c->handler_line_in = uplink_logresp_handler;
291 	c->state   = CSTATE_LOGRESP;
292 
293 	hlog(LOG_INFO, "%s: Connected to server, logging in", c->addr_rem);
294 
295 	return 0;
296 }
297 
#ifdef USE_SSL
/*
 *	Set up the SSL context of a single uplink configuration entry.
 *
 *	Allocates l->ssl, creates the context, and loads the optional client
 *	certificate + key (when both certfile and keyfile are set) and the
 *	optional CA certificate used for validating the server (cafile).
 *
 *	Returns 0 on success. On any failure returns -1 with l->ssl set to
 *	NULL, so that make_uplink(), which only runs the setup when l->ssl is
 *	NULL, retries it on the next attempt instead of using a half-built
 *	context.
 */
int config_uplink_ssl_setup(struct uplink_config_t *l)
{
	l->ssl = ssl_alloc();
	if (!l->ssl) {
		hlog(LOG_ERR, "Uplink '%s': Failed to allocate SSL context", l->name);
		return -1;
	}

	if (ssl_create(l->ssl, (void *)l)) {
		hlog(LOG_ERR, "Uplink: Failed to create SSL context for '%s*'", l->name);
		/* NOTE(review): assumes ssl_free() accepts a context whose
		 * ssl_create() failed - confirm against ssl.c
		 */
		ssl_free(l->ssl);
		l->ssl = NULL;
		return -1;
	}

	/* optional client cert for server-side validation */
	if (l->certfile && l->keyfile) {
		if (ssl_certificate(l->ssl, l->certfile, l->keyfile)) {
			hlog(LOG_ERR, "Uplink '%s': Failed to load SSL certificatess", l->name);
			ssl_free(l->ssl);
			l->ssl = NULL;
			return -1;
		}
	}

	/* optional server cert validation */
	if (l->cafile) {
		if (ssl_ca_certificate(l->ssl, l->cafile, 2)) {
			hlog(LOG_ERR, "Uplink '%s': Failed to load trusted SSL CA certificates", l->name);
			ssl_free(l->ssl);
			l->ssl = NULL;
			return -1;
		}
	}

	hlog(LOG_INFO, "Uplink %s: SSL initialized%s%s",
		l->name,
		(l->cafile) ? ", server validated" : "",
		(l->certfile) ? ", client cert loaded" : "");

	return 0;
}
#endif
336 
337 
/*
 *	Uplink a single connection
 *
 *	Resolves the host of the given uplink configuration entry, tries to
 *	connect to the resolved addresses (starting from a random one, up to
 *	20 addresses, 3 second connect timeout each), allocates a client for
 *	the connected socket and passes it to the first worker thread.
 *
 *	uplink_thread() calls this while holding uplink_client_mutex, which
 *	covers the uplink_client[] slot lookup and assignment done here.
 *
 *	Returns:
 *	   0  uplink connected and handed over to a worker
 *	  -1  setup of the connected client failed (socket options, SSL,
 *	      passing to worker)
 *	  -2  configuration problem, no free uplink slot, or address
 *	      resolving failure
 *	  -3  could not connect to any address, or client allocation failed
 */

int make_uplink(struct uplink_config_t *l)
{
	int fd, i, addrc, arg;
	int uplink_index;
	union sockaddr_u sa; /* large enough for also IPv6 address */
	socklen_t addr_len;
	struct addrinfo *ai, *a, *ap[21];
	struct addrinfo req;
	char *addr_s = NULL;
	int port;
	struct sockaddr *srcaddr;
	socklen_t srcaddr_len;

	memset(&req, 0, sizeof(req));
	req.ai_family   = 0;
	req.ai_socktype = SOCK_STREAM;
	req.ai_protocol = IPPROTO_TCP;
	req.ai_flags    = AI_ADDRCONFIG;
	ai = NULL;

#ifdef USE_SSL
	/* SSL requires both a cert and a key, or none at all */
	if ((l->certfile && !l->keyfile) || (l->keyfile && !l->certfile)) {
		hlog(LOG_ERR, "Uplink %s: Only one of sslkey and sslcert defined - both needed for SSL authentication", l->name);
		return -2;
	}

	/* todo: allow triggering SSL without client auth */
	if (l->keyfile && l->certfile) {
		/* the SSL context is created once and reused for later connects */
		if (!l->ssl) {
			if (config_uplink_ssl_setup(l)) {
				hlog(LOG_ERR, "Uplink '%s': SSL setup failed", l->name);
				return -2;
			}
		}
	}
#endif

	/* find a free uplink slot */
	for (uplink_index = 0; uplink_index < MAX_UPLINKS; uplink_index++) {
		if (!uplink_client[uplink_index])
			break;
	}
	if (uplink_index == MAX_UPLINKS) {
		hlog(LOG_ERR, "Uplink %s: No available uplink slots, %d used", l->name, MAX_UPLINKS);
		return -2;
	}

	if (strcasecmp(l->proto, "tcp") == 0) {
		// well, do nothing for now.
	} else if (strcasecmp(l->proto, "udp") == 0) {
		req.ai_socktype = SOCK_DGRAM;
		req.ai_protocol = IPPROTO_UDP;
#ifdef USE_SCTP
	} else if (strcasecmp(l->proto, "sctp") == 0) {
		req.ai_socktype = SOCK_STREAM;
		req.ai_protocol = IPPROTO_SCTP;
#endif
	} else {
		hlog(LOG_ERR, "Uplink %s: Unsupported protocol '%s'\n", l->name, l->proto);
		return -2;
	}

	port = atoi(l->port);
	if (port < 1 || port > 65535) {
		hlog(LOG_ERR, "Uplink %s: unsupported port number '%s'\n", l->name, l->port);
		return -2;
	}

	l->state = UPLINK_ST_CONNECTING;
	i = getaddrinfo(l->host, l->port, &req, &ai);
	if (i != 0) {
		hlog(LOG_INFO, "Uplink %s: address resolving failure of '%s' '%s': %s", l->name, l->host, l->port, gai_strerror(i));
		l->state = UPLINK_ST_NOT_LINKED;
		return -2;
	}

	/* Count the amount of addresses in response */
	addrc = 0;
	for (a = ai; a && addrc < 20 ; a = a->ai_next, ++addrc) {
		ap[addrc] = a; /* Up to 20 first addresses */
	}
	ap[addrc] = NULL;

	if (addrc == 0) {
		hlog(LOG_INFO, "Uplink %s: address resolving of '%s' '%s': returned 0 addresses", l->name, l->host, l->port);
		l->state = UPLINK_ST_NOT_LINKED;
		return -2;
	}

	/* Pick random address to start from */
	// coverity[dont_call]  // squelch warning: not security sensitive use of random()
	i = random() % addrc;

	/* Then lets try making socket and connection in address order */
	/* TODO: BUG: If the TCP connection succeeds, but the server rejects our
	 * login due to a bad source address (like, IPv4 would be allowed but our
	 * IPv6 address is not in the server's ACL), this currently does not switch
	 * to the next destination address.
	 * Instead it'll wait for the retry timer and then try a random
	 * destination address, and eventually succeed (unless very unlucky).
	 */
	fd = -1;
	while ((a = ap[i])) {
		/* Each address is tried once: clearing the entry terminates
		 * the loop when the index wraps around to it again.
		 */
		ap[i] = NULL;
		addr_s = strsockaddr(a->ai_addr, a->ai_addrlen);

		hlog(LOG_INFO, "Uplink %s: Connecting to %s:%s (%s) [link %d, addr %d/%d]",
			l->name, l->host, l->port, addr_s, uplink_index, i+1, addrc);
		i++;
		if (i == addrc)
			i = 0;

		if ((fd = socket(a->ai_family, a->ai_socktype, a->ai_protocol)) < 0) {
			hlog(LOG_CRIT, "Uplink %s: socket(): %s\n", l->name, strerror(errno));
			hfree(addr_s);
			continue;
		}

		arg = 1;
		if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (char *)&arg, sizeof(arg)))
			hlog(LOG_ERR, "Uplink %s: Failed to set SO_REUSEADDR on new socket: %s", l->name, strerror(errno));

		/* bind source address, if one is configured for this address family */
		srcaddr = NULL;
		srcaddr_len = 0;
		if (a->ai_family == AF_INET && uplink_bind_v4_len != 0) {
			srcaddr = (struct sockaddr *)&uplink_bind_v4;
			srcaddr_len = uplink_bind_v4_len;
		} else if (a->ai_family == AF_INET6 && uplink_bind_v6_len != 0) {
			srcaddr = (struct sockaddr *)&uplink_bind_v6;
			srcaddr_len = uplink_bind_v6_len;
		}

		if (srcaddr_len) {
			if (bind(fd, srcaddr, srcaddr_len)) {
				char *s = strsockaddr(srcaddr, srcaddr_len);
				hlog(LOG_ERR, "Uplink %s: Failed to bind source address '%s': %s", l->name, s, strerror(errno));
				hfree(s);
				goto connerr;
			}
		}

		/* set non-blocking mode at this point, so that we can make a
		 * non-blocking connect() with a short timeout
		 */
		if (fcntl(fd, F_SETFL, O_NONBLOCK)) {
			hlog(LOG_CRIT, "Uplink %s: Failed to set non-blocking mode on new socket: %s", l->name, strerror(errno));
			goto connerr;
		}

		/* Use TCP_NODELAY for APRS-IS sockets. High delays can cause packets getting past
		 * the dupe filters.
		 */
#ifdef TCP_NODELAY
		if (a->ai_protocol == IPPROTO_TCP) {
			arg = 1;
			if (setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, (void *)&arg, sizeof(arg)))
				hlog(LOG_ERR, "Uplink %s: %s: setsockopt(TCP_NODELAY, %d) failed: %s", l->name, addr_s, arg, strerror(errno));
		}
#endif

		/* non-blocking connect: EINPROGRESS is the normal case */
		if (connect(fd, a->ai_addr, a->ai_addrlen) && errno != EINPROGRESS) {
			hlog(LOG_ERR, "Uplink %s: connect(%s) failed: %s", l->name, addr_s, strerror(errno));
			goto connerr;
		}

		/* Only wait a few seconds for the connection to be created.
		 * If the connection setup is very slow, it is unlikely to
		 * perform well enough anyway.
		 */
		struct pollfd connect_fd;
		connect_fd.fd = fd;
		connect_fd.events = POLLOUT;
		connect_fd.revents = 0;

		int r = poll(&connect_fd, 1, 3000);
		hlog(LOG_DEBUG, "Uplink %s: poll after connect returned %d, revents %d", l->name, r, connect_fd.revents);

		if (r < 0) {
			hlog(LOG_ERR, "Uplink %s: connect to %s: poll failed: %s", l->name, addr_s, strerror(errno));
			goto connerr;
		}

		if (r < 1) {
			hlog(LOG_ERR, "Uplink %s: connect to %s timed out", l->name, addr_s);
			goto connerr;
		}

		/* the result of the asynchronous connect is in SO_ERROR */
		socklen_t optlen = sizeof(arg);
		if (getsockopt(fd, SOL_SOCKET, SO_ERROR, (char *)&arg, &optlen) == -1) {
			hlog(LOG_ERR, "Uplink %s: getsockopt() after connect failed: %s", l->name, strerror(errno));
			goto connerr;
		} else if (arg == 0) {
			/* Successful connect! addr_s stays allocated for use below. */
			hlog(LOG_DEBUG, "Uplink %s: successful connect", l->name);
			break;
		}

		hlog(LOG_ERR, "Uplink %s: connect to %s failed: %s", l->name, addr_s, strerror(arg));
connerr:
		close(fd);
		fd = -1;
		hfree(addr_s);
	}

	freeaddrinfo(ai); /* Not needed anymore.. */

	if (fd < 0) {
		l->state = UPLINK_ST_NOT_LINKED;
		return -3; /* No successfull connection at any address.. */
	}

	struct client_t *c = client_alloc();
	if (!c) {
		hlog(LOG_ERR, "Uplink %s: client_alloc() failed, too many clients", l->name);
		close(fd);
		/* the address string of the successful connect is still ours to free */
		hfree(addr_s);
		l->state = UPLINK_ST_NOT_LINKED;
		return -3; /* No successfull connection at any address.. */
	}

	l->client_ptr = (void *)c;
	c->uplink_index = uplink_index;
	c->fd    = fd;
	c->ai_protocol = req.ai_protocol;
	c->state = CSTATE_INIT;
	/* use the default login handler */
	c->handler_line_in = &uplink_login_handler;
	c->flags    = l->client_flags;
	c->keepalive = tick;
	c->last_read = tick;
	c->connect_time = now;
	/* the uplink's configured name is used as the username until the
	 * server tells its own name in the login response
	 */
	strncpy(c->username, l->name, sizeof(c->username));
	c->username[sizeof(c->username)-1] = 0;
	c->username_len = strlen(c->username);

	/* These peer/sock name calls can not fail -- or the socket closed
	   on us in which case it gets abandoned a bit further below. */

	addr_len = sizeof(sa);
	getpeername(fd, (struct sockaddr *)&sa, &addr_len);
	//s = strsockaddr( &sa.sa, addr_len ); /* server side address */
	strncpy(c->addr_rem, addr_s, sizeof(c->addr_rem));
	c->addr_rem[sizeof(c->addr_rem)-1] = 0;
	hfree(addr_s);
	c->addr  = sa;

	/* hex format of client's IP address + port */

	char *s = hexsockaddr( &sa.sa, addr_len );
	strncpy(c->addr_hex, s, sizeof(c->addr_hex));
	c->addr_hex[sizeof(c->addr_hex)-1] = 0;
	hfree(s);

	addr_len = sizeof(sa);
	getsockname(fd, (struct sockaddr *)&sa, &addr_len);
	s = strsockaddr( &sa.sa, addr_len ); /* client side address */
	strncpy(c->addr_loc, s, sizeof(c->addr_loc));
	c->addr_loc[sizeof(c->addr_loc)-1] = 0;
	hfree(s);

	hlog(LOG_INFO, "Uplink %s: %s: Connection established on fd %d using source address %s", l->name, c->addr_rem, c->fd, c->addr_loc);

	if (set_client_sockopt(c) < 0)
		goto err;

	uplink_client[uplink_index] = c;
	l->state = UPLINK_ST_CONNECTED;

	/* set up SSL if necessary */
#ifdef USE_SSL
	if (l->ssl) {
		if (ssl_create_connection(l->ssl, c, 1))
			goto err;
	}
#endif

	/* Push it on the first worker, which ever it is..
	 */

	if (pass_client_to_worker(worker_threads, c))
		goto err;

	if ((i = pthread_mutex_lock(& uplink_connects.mutex )))
		hlog(LOG_ERR, "make_uplink: could not lock uplink_connects: %s", strerror(i));
	++ uplink_connects.gauge;
	++ uplink_connects.counter;
	++ uplink_connects.refcount;  /* <-- that does not get decremented at any time..  */
	if ((i = pthread_mutex_unlock(& uplink_connects.mutex )))
		hlog(LOG_ERR, "make_uplink: could not unlock uplink_connects: %s", strerror(i));

	c->portaccount = & uplink_connects; /* calculate traffic bytes/packets */

	return 0;

err:
	client_free(c);
	/* do not leave references to the freed client behind */
	l->client_ptr = NULL;
	uplink_client[uplink_index] = NULL;
	l->state = UPLINK_ST_NOT_LINKED;
	return -1;
}
642 
643 
644 /*
645  *	Uplink thread
646  */
647 
uplink_thread(void * asdf)648 void uplink_thread(void *asdf)
649 {
650 	sigset_t sigs_to_block;
651 	int rc;
652 	int next_uplink = -1; /* the index to the next regular uplink candidate */
653 	int uplink_error_set = -1;
654 
655 	pthreads_profiling_reset("uplink");
656 
657 	sigemptyset(&sigs_to_block);
658 	sigaddset(&sigs_to_block, SIGALRM);
659 	sigaddset(&sigs_to_block, SIGINT);
660 	sigaddset(&sigs_to_block, SIGTERM);
661 	sigaddset(&sigs_to_block, SIGQUIT);
662 	sigaddset(&sigs_to_block, SIGHUP);
663 	sigaddset(&sigs_to_block, SIGURG);
664 	sigaddset(&sigs_to_block, SIGPIPE);
665 	sigaddset(&sigs_to_block, SIGUSR1);
666 	sigaddset(&sigs_to_block, SIGUSR2);
667 	pthread_sigmask(SIG_BLOCK, &sigs_to_block, NULL);
668 
669 	//hlog(LOG_INFO, "Uplink thread starting...");
670 
671 	uplink_reconfiguring = 1;
672 	while (!uplink_shutting_down) {
673 		if (uplink_reconfiguring || uplink_config_updated) {
674 			hlog(LOG_INFO, "Uplink thread applying new configuration...");
675 			__sync_synchronize();
676 			uplink_reconfiguring = 0;
677 			close_uplinkers();
678 
679 			free_uplink_config(&uplink_config);
680 			uplink_config = uplink_config_install;
681 			uplink_config_install = NULL;
682 			if (uplink_config)
683 				uplink_config->prevp = &uplink_config;
684 
685 			uplink_config_updated = 0;
686 
687 			hlog(LOG_INFO, "Uplink thread configured.");
688 		}
689 
690 		/* speed up shutdown */
691 		if (uplink_shutting_down || uplink_reconfiguring)
692 			continue;
693 
694 		if (uplink_config_updated) {
695 			uplink_reconfiguring = 1;
696 			continue;
697 		}
698 
699 		if ((rc = pthread_mutex_lock(&uplink_client_mutex))) {
700 			hlog(LOG_ERR, "uplink_thread(): could not lock uplink_client_mutex: %s", strerror(rc));
701 			continue;
702 		}
703 
704 		/* Check if all we have a single regular uplink connection up, out of all
705 		 * the configured ones. Also, check that all the UPLINKMULTI links are
706 		 * connected.
707 		 */
708 
709 		int has_uplink = 0; /* do we have a single regular uplink? */
710 		int avail_uplink = 0; /* how many regular uplinks are configured? */
711 
712 		struct uplink_config_t *l = uplink_config;
713 		for (; l; l = l->next) {
714 			if (l->client_flags & CLFLAGS_UPLINKMULTI) {
715 				/* MULTI uplink, needs to be up */
716 				if (l->state < UPLINK_ST_CONNECTING)
717 					make_uplink(l);
718 			} else {
719 				/* regular uplink, need to have one connected */
720 				if (l->state >= UPLINK_ST_CONNECTING)
721 					has_uplink++;
722 				avail_uplink++;
723 			}
724 		}
725 
726 		if (avail_uplink && !has_uplink) {
727 			hlog(LOG_INFO, "Uplink: %d uplinks configured, %d are connected, need to pick new", avail_uplink, has_uplink);
728 			/* we have regular uplinks but none are connected,
729 			 * pick the next one and connect */
730 			next_uplink++;
731 			if (next_uplink >= avail_uplink)
732 				next_uplink = 0;
733 			//hlog(LOG_DEBUG, "Uplink: picked uplink %d as the new candidate", next_uplink);
734 			l = uplink_config;
735 			int i = 0;
736 			while ((l) && i < next_uplink) {
737 				if (!(l->client_flags & CLFLAGS_UPLINKMULTI))
738 					i++;
739 				l = l->next;
740 			}
741 			if (l) {
742 				hlog(LOG_DEBUG, "Uplink: trying %s (%s:%s)", l->name, l->host, l->port);
743 				make_uplink(l);
744 			}
745 		}
746 
747 		if ((rc = pthread_mutex_unlock(&uplink_client_mutex))) {
748 			hlog(LOG_CRIT, "close_uplinkers(): could not unlock uplink_client_mutex: %s", strerror(rc));
749 			continue;
750 		}
751 
752 		if (avail_uplink && !has_uplink) {
753 			status_error(3600, "no_uplink");
754 			uplink_error_set = 1;
755 		} else {
756 			if (uplink_error_set == 1) {
757 				status_error(-1, "no_uplink");
758 				uplink_error_set = 0;
759 			}
760 		}
761 
762 		/* sleep for 4 seconds between successful rounds */
763 		for (rc = 0; (!uplink_shutting_down) && rc < 4000/200; rc++)
764 			if (poll(NULL, 0, 200) == -1 && errno != EINTR)
765 				hlog(LOG_WARNING, "uplink: poll sleep failed: %s", strerror(errno));
766 	}
767 
768 	hlog(LOG_DEBUG, "Uplink thread shutting down uplinking sockets...");
769 	close_uplinkers();
770 }
771 
772 
773 /*
774  *	Start / stop the uplinks maintainer thread
775  */
776 
uplink_start(void)777 void uplink_start(void)
778 {
779 	if (uplink_running)
780 		return;
781 
782 	uplink_shutting_down = 0;
783 
784 	if (pthread_create(&uplink_th, &pthr_attrs, (void *)uplink_thread, NULL))
785 		perror("pthread_create failed for uplink_thread");
786 
787 	uplink_running = 1;
788 }
789 
uplink_stop(void)790 void uplink_stop(void)
791 {
792 	int i, e;
793 
794 	if (!uplink_running)
795 		return;
796 
797 	hlog(LOG_INFO, "Signalling uplink_thread to shut down...");
798 	uplink_shutting_down = 1;
799 
800 	if ((e = pthread_join(uplink_th, NULL))) {
801 		hlog(LOG_ERR, "Could not pthread_join uplink_th: %s", strerror(e));
802 	} else {
803 		hlog(LOG_INFO, "Uplink thread has terminated.");
804 		uplink_running = 0;
805 	}
806 
807 	/* free uplink config - and clean up the uplink_client indexed pointers
808 	 * which refer to the configs
809 	 */
810 	if ((e = pthread_mutex_lock(&uplink_client_mutex))) {
811 		hlog( LOG_ERR, "uplink_stop(): could not lock uplink_client_mutex: %s", strerror(e) );
812 		return;
813 	}
814 
815 	for (i = 0; i < MAX_UPLINKS; i++)
816 		uplink_client[i] = NULL;
817 
818 	free_uplink_config(&uplink_config);
819 
820 	if ((e = pthread_mutex_unlock(&uplink_client_mutex))) {
821 		hlog( LOG_ERR, "uplink_stop(): could not unlock uplink_client_mutex: %s", strerror(e) );
822 		return;
823 	}
824 }
825