1 /*
2  * PgBouncer - Lightweight connection pooler for PostgreSQL.
3  *
4  * Copyright (c) 2007-2009  Marko Kreen, Skype Technologies OÜ
5  *
6  * Permission to use, copy, modify, and/or distribute this software for any
7  * purpose with or without fee is hereby granted, provided that the above
8  * copyright notice and this permission notice appear in all copies.
9  *
10  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15  * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16  * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17  */
18 
19 /*
20  * Handling of pooler listening sockets
21  */
22 
23 #include "bouncer.h"
24 
25 #include <usual/netdb.h>
26 
/* One pooler listening socket; every instance is linked into sock_list. */
struct ListenSocket {
	struct List node;	/* link in sock_list */
	int fd;			/* listening socket descriptor */
	bool active;		/* true while ev is registered (see resume_pooler/suspend_pooler) */
	struct event ev;	/* persistent read event whose callback is pool_accept() */
	PgAddr addr;		/* listen address; unix and service-manager sockets only record family (and a port) */
};
34 
/* all listening sockets (struct ListenSocket), linked via ListenSocket.node */
static STATLIST(sock_list);

/*
 * hints for getaddrinfo(listen_addr); together with a NULL node
 * ("*" in listen_addr) AI_PASSIVE asks for wildcard addresses
 */
static const struct addrinfo hints = {
	.ai_family = AF_UNSPEC,
	.ai_socktype = SOCK_STREAM,
	.ai_protocol = IPPROTO_TCP,
	.ai_flags = AI_PASSIVE,
};

/*
 * should listening sockets be active or suspended?
 * (set unconditionally by resume_pooler() / suspend_pooler())
 */
static bool need_active = false;
/*
 * is it actually active or suspended?  Only updated after every socket
 * was switched successfully; per_loop_pooler_maint() retries when it
 * disagrees with need_active.
 */
static bool pooler_active = false;

/*
 * on accept() failure sleep 5 seconds: pool_accept() arms ev_err with
 * err_timeout, and err_wait_func() then resumes the pooler
 */
static struct event ev_err;
static struct timeval err_timeout = {5, 0};

/* defined further below; add_listen() needs it earlier */
static void tune_accept(int sock, bool on);
55 
56 /* atexit() cleanup func */
cleanup_sockets(void)57 static void cleanup_sockets(void)
58 {
59 	struct ListenSocket *ls;
60 	struct List *el;
61 
62 	/* avoid cleanup if exit() while suspended */
63 	if (cf_pause_mode == P_SUSPEND)
64 		return;
65 
66 	while ((el = statlist_pop(&sock_list)) != NULL) {
67 		ls = container_of(el, struct ListenSocket, node);
68 		if (ls->fd > 0) {
69 			safe_close(ls->fd);
70 			ls->fd = 0;
71 		}
72 		if (pga_is_unix(&ls->addr) && cf_unix_socket_dir[0] != '@') {
73 			char buf[sizeof(struct sockaddr_un) + 20];
74 			snprintf(buf, sizeof(buf), "%s/.s.PGSQL.%d", cf_unix_socket_dir, cf_listen_port);
75 			unlink(buf);
76 		}
77 		statlist_remove(&sock_list, &ls->node);
78 		free(ls);
79 	}
80 }
81 
82 /*
83  * initialize another listening socket.
84  */
add_listen(int af,const struct sockaddr * sa,int salen)85 static bool add_listen(int af, const struct sockaddr *sa, int salen)
86 {
87 	struct ListenSocket *ls;
88 	int sock, res;
89 	char buf[128];
90 	const char *errpos;
91 
92 	log_debug("add_listen: %s", sa2str(sa, buf, sizeof(buf)));
93 
94 	/* create socket */
95 	errpos = "socket";
96 	sock = socket(af, SOCK_STREAM, 0);
97 	if (sock < 0)
98 		goto failed;
99 
100 	/* SO_REUSEADDR behaviour it default in WIN32.  */
101 #ifndef WIN32
102 	/* relaxed binding */
103 	if (af != AF_UNIX) {
104 		int val = 1;
105 		errpos = "setsockopt";
106 		res = setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val));
107 		if (res < 0)
108 			goto failed;
109 	}
110 #endif
111 
112 #ifdef IPV6_V6ONLY
113 	/* avoid ipv6 socket's attempt to takeover ipv4 port */
114 	if (af == AF_INET6) {
115 		int val = 1;
116 		errpos = "setsockopt/IPV6_V6ONLY";
117 		res = setsockopt(sock, IPPROTO_IPV6, IPV6_V6ONLY, &val, sizeof(val));
118 		if (res < 0)
119 			goto failed;
120 	}
121 #endif
122 
123 	/*
124 	 * If configured, set SO_REUSEPORT or equivalent.  If it's not
125 	 * enabled, just leave the socket alone.  (We could also unset
126 	 * the socket option in that case, but this area is fairly
127 	 * unportable, so perhaps better to avoid it.)
128 	 */
129 	if (af != AF_UNIX && cf_so_reuseport) {
130 #if defined(SO_REUSEPORT_LB)
131 		int val = 1;
132 		errpos = "setsockopt/SO_REUSEPORT_LB";
133 		res = setsockopt(sock, SOL_SOCKET, SO_REUSEPORT_LB, &val, sizeof(val));
134 		if (res < 0)
135 			goto failed;
136 #elif defined(SO_REUSEPORT)
137 		int val = 1;
138 		errpos = "setsockopt/SO_REUSEPORT";
139 		res = setsockopt(sock, SOL_SOCKET, SO_REUSEPORT, &val, sizeof(val));
140 		if (res < 0)
141 			goto failed;
142 #else
143 		die("so_reuseport not supported on this platform");
144 #endif
145 	}
146 
147 	/* bind it */
148 	errpos = "bind";
149 	res = bind(sock, sa, salen);
150 	if (res < 0)
151 		goto failed;
152 
153 	/* set common options */
154 	errpos = "tune_socket";
155 	if (!tune_socket(sock, (af == AF_UNIX)))
156 		goto failed;
157 
158 	/* finally, accept connections */
159 	errpos = "listen";
160 	res = listen(sock, cf_listen_backlog);
161 	if (res < 0)
162 		goto failed;
163 
164 	errpos = "calloc";
165 	ls = calloc(1, sizeof(*ls));
166 	if (!ls)
167 		goto failed;
168 
169 	list_init(&ls->node);
170 	ls->fd = sock;
171 	if (sa->sa_family == AF_UNIX) {
172 		pga_set(&ls->addr, AF_UNIX, cf_listen_port);
173 	} else {
174 		pga_copy(&ls->addr, sa);
175 	}
176 
177 	if (af == AF_UNIX) {
178 #ifndef WIN32
179 		if (cf_unix_socket_dir[0] != '@') {
180 			struct sockaddr_un *un = (struct sockaddr_un *)sa;
181 			change_file_mode(un->sun_path, cf_unix_socket_mode, NULL, cf_unix_socket_group);
182 		}
183 #endif
184 	} else {
185 		tune_accept(sock, cf_tcp_defer_accept);
186 	}
187 
188 	log_info("listening on %s", sa2str(sa, buf, sizeof(buf)));
189 	statlist_append(&sock_list, &ls->node);
190 	return true;
191 
192 failed:
193 	log_warning("cannot listen on %s: %s(): %s",
194 		    sa2str(sa, buf, sizeof(buf)),
195 		    errpos, strerror(errno));
196 	if (sock >= 0)
197 		safe_close(sock);
198 	return false;
199 }
200 
create_unix_socket(const char * socket_dir,int listen_port)201 static void create_unix_socket(const char *socket_dir, int listen_port)
202 {
203 	struct sockaddr_un un;
204 	int addrlen;
205 	int res;
206 	char lockfile[sizeof(struct sockaddr_un) + 10];
207 	struct stat st;
208 
209 	/* fill sockaddr struct */
210 	memset(&un, 0, sizeof(un));
211 	un.sun_family = AF_UNIX;
212 	snprintf(un.sun_path, sizeof(un.sun_path),
213 		"%s/.s.PGSQL.%d", socket_dir, listen_port);
214 	if (socket_dir[0] == '@') {
215 		/*
216 		 * By convention, for abstract Unix sockets, only the
217 		 * length of the string is the sockaddr length.
218 		 */
219 		addrlen = offsetof(struct sockaddr_un, sun_path) + strlen(un.sun_path);
220 		un.sun_path[0] = '\0';
221 	}
222 	else {
223 		addrlen = sizeof(un);
224 	}
225 
226 	if (socket_dir[0] != '@') {
227 		/* check for lockfile */
228 		snprintf(lockfile, sizeof(lockfile), "%s.lock", un.sun_path);
229 		res = lstat(lockfile, &st);
230 		if (res == 0)
231 			die("unix port %d is in use", listen_port);
232 
233 		/* expect old bouncer gone */
234 		unlink(un.sun_path);
235 	}
236 
237 	add_listen(AF_UNIX, (const struct sockaddr *)&un, addrlen);
238 }
239 
240 /*
241  * Notify pooler only when also data is arrived.
242  *
243  * optval specifies how long after connection attempt to wait for data.
244  *
245  * Related to tcp_synack_retries sysctl, default 5 (corresponds 180 secs).
246  *
247  * SO_ACCEPTFILTER needs to be set after listen(), maybe TCP_DEFER_ACCEPT too.
248  */
tune_accept(int sock,bool on)249 static void tune_accept(int sock, bool on)
250 {
251 	const char *act = on ? "install" : "uninstall";
252 	int res = 0;
253 #ifdef TCP_DEFER_ACCEPT
254 	int val = 45; /* FIXME: proper value */
255 	socklen_t vlen = sizeof(val);
256 	res = getsockopt(sock, IPPROTO_TCP, TCP_DEFER_ACCEPT, &val, &vlen);
257 	log_noise("old TCP_DEFER_ACCEPT on %d = %d", sock, val);
258 	val = on ? 1 : 0;
259 	log_noise("%s TCP_DEFER_ACCEPT on %d", act, sock);
260 	res = setsockopt(sock, IPPROTO_TCP, TCP_DEFER_ACCEPT, &val, sizeof(val));
261 #else
262 #if 0
263 #ifdef SO_ACCEPTFILTER
264 	struct accept_filter_arg af, *afp = on ? &af : NULL;
265 	socklen_t af_len = on ? sizeof(af) : 0;
266 	memset(&af, 0, sizeof(af));
267 	strcpy(af.af_name, "dataready");
268 	log_noise("%s SO_ACCEPTFILTER on %d", act, sock);
269 	res = setsockopt(sock, SOL_SOCKET, SO_ACCEPTFILTER, afp, af_len);
270 #endif
271 #endif
272 #endif
273 	if (res < 0)
274 		log_warning("tune_accept: %s TCP_DEFER_ACCEPT/SO_ACCEPTFILTER: %s",
275 			    act, strerror(errno));
276 }
277 
pooler_tune_accept(bool on)278 void pooler_tune_accept(bool on)
279 {
280 	struct List *el;
281 	struct ListenSocket *ls;
282 	statlist_for_each(el, &sock_list) {
283 		ls = container_of(el, struct ListenSocket, node);
284 		if (!pga_is_unix(&ls->addr))
285 			tune_accept(ls->fd, on);
286 	}
287 }
288 
err_wait_func(evutil_socket_t sock,short flags,void * arg)289 static void err_wait_func(evutil_socket_t sock, short flags, void *arg)
290 {
291 	if (cf_pause_mode != P_SUSPEND)
292 		resume_pooler();
293 }
294 
addrpair(const PgAddr * src,const PgAddr * dst)295 static const char *addrpair(const PgAddr *src, const PgAddr *dst)
296 {
297 	static char ip1buf[PGADDR_BUF], ip2buf[PGADDR_BUF],
298 	            buf[2*PGADDR_BUF + 16];
299 	const char *ip1, *ip2;
300 	if (pga_is_unix(src))
301 		return "unix->unix";
302 
303 	ip1 = pga_ntop(src, ip1buf, sizeof(ip1buf));
304 	ip2 = pga_ntop(src, ip2buf, sizeof(ip2buf));
305 	snprintf(buf, sizeof(buf), "%s:%d -> %s:%d",
306 		 ip1, pga_port(src), ip2, pga_port(dst));
307 	return buf;
308 }
309 
conninfo(const PgSocket * sk)310 static const char *conninfo(const PgSocket *sk)
311 {
312 	if (is_server_socket(sk)) {
313 		return addrpair(&sk->local_addr, &sk->remote_addr);
314 	} else {
315 		return addrpair(&sk->remote_addr, &sk->local_addr);
316 	}
317 }
318 
319 /* got new connection, associate it with client struct */
pool_accept(evutil_socket_t sock,short flags,void * arg)320 static void pool_accept(evutil_socket_t sock, short flags, void *arg)
321 {
322 	struct ListenSocket *ls = arg;
323 	int fd;
324 	PgSocket *client;
325 	union {
326 		struct sockaddr_in in;
327 		struct sockaddr_in6 in6;
328 		struct sockaddr_un un;
329 		struct sockaddr sa;
330 	} raddr;
331 	socklen_t len = sizeof(raddr);
332 	bool is_unix = pga_is_unix(&ls->addr);
333 
334 	if(!(flags & EV_READ)) {
335 		log_warning("no EV_READ in pool_accept");
336 		return;
337 	}
338 loop:
339 	/* get fd */
340 	fd = safe_accept(sock, &raddr.sa, &len);
341 	if (fd < 0) {
342 		if (errno == EAGAIN)
343 			return;
344 		else if (errno == ECONNABORTED)
345 			return;
346 
347 		/*
348 		 * probably fd limit, pointless to try often
349 		 * wait a bit, hope that admin resolves somehow
350 		 */
351 		log_error("accept() failed: %s", strerror(errno));
352 		evtimer_assign(&ev_err, pgb_event_base, err_wait_func, NULL);
353 		safe_evtimer_add(&ev_err, &err_timeout);
354 		suspend_pooler();
355 		return;
356 	}
357 
358 	log_noise("new fd from accept=%d", fd);
359 	if (is_unix) {
360 		client = accept_client(fd, true);
361 	} else {
362 		client = accept_client(fd, false);
363 	}
364 
365 	if (client)
366 		slog_debug(client, "P: got connection: %s", conninfo(client));
367 
368 	/*
369 	 * there may be several clients waiting,
370 	 * avoid context switch by looping
371 	 */
372 	goto loop;
373 }
374 
use_pooler_socket(int sock,bool is_unix)375 bool use_pooler_socket(int sock, bool is_unix)
376 {
377 	struct ListenSocket *ls;
378 	int res;
379 	char buf[PGADDR_BUF];
380 
381 	if (!tune_socket(sock, is_unix))
382 		return false;
383 
384 	ls = calloc(1, sizeof(*ls));
385 	if (!ls)
386 		return false;
387 	ls->fd = sock;
388 	if (is_unix) {
389 		pga_set(&ls->addr, AF_UNIX, cf_listen_port);
390 	} else {
391 		struct sockaddr_storage ss;
392 		socklen_t len = sizeof(ss);
393 		res = getsockname(sock, (struct sockaddr *)&ss, &len);
394 		if (res < 0) {
395 			log_error("getsockname failed");
396 			free(ls);
397 			return false;
398 		}
399 		pga_copy(&ls->addr, (struct sockaddr *)&ss);
400 	}
401 	log_info("got pooler socket: %s", pga_str(&ls->addr, buf, sizeof(buf)));
402 	statlist_append(&sock_list, &ls->node);
403 	return true;
404 }
405 
suspend_pooler(void)406 void suspend_pooler(void)
407 {
408 	struct List *el;
409 	struct ListenSocket *ls;
410 
411 	need_active = false;
412 	statlist_for_each(el, &sock_list) {
413 		ls = container_of(el, struct ListenSocket, node);
414 		if (!ls->active)
415 			continue;
416 		if (event_del(&ls->ev) < 0) {
417 			log_warning("suspend_pooler, event_del: %s", strerror(errno));
418 			return;
419 		}
420 		ls->active = false;
421 	}
422 	pooler_active = false;
423 }
424 
resume_pooler(void)425 void resume_pooler(void)
426 {
427 	struct List *el;
428 	struct ListenSocket *ls;
429 
430 	need_active = true;
431 	statlist_for_each(el, &sock_list) {
432 		ls = container_of(el, struct ListenSocket, node);
433 		if (ls->active)
434 			continue;
435 		event_assign(&ls->ev, pgb_event_base, ls->fd, EV_READ | EV_PERSIST, pool_accept, ls);
436 		if (event_add(&ls->ev, NULL) < 0) {
437 			log_warning("event_add failed: %s", strerror(errno));
438 			return;
439 		}
440 		ls->active = true;
441 	}
442 	pooler_active = true;
443 }
444 
445 /* retry previously failed suspend_pooler() / resume_pooler() */
per_loop_pooler_maint(void)446 void per_loop_pooler_maint(void)
447 {
448 	if (need_active && !pooler_active)
449 		resume_pooler();
450 	else if (!need_active && pooler_active)
451 		suspend_pooler();
452 }
453 
parse_addr(void * arg,const char * addr)454 static bool parse_addr(void *arg, const char *addr)
455 {
456 	int res;
457 	char service[64];
458 	struct addrinfo *ai, *gaires = NULL;
459 	bool ok;
460 
461 	if (!*addr)
462 		return true;
463 	if (strcmp(addr, "*") == 0)
464 		addr = NULL;
465 	snprintf(service, sizeof(service), "%d", cf_listen_port);
466 
467 	res = getaddrinfo(addr, service, &hints, &gaires);
468 	if (res != 0) {
469 		die("getaddrinfo('%s', '%d') = %s [%d]", addr ? addr : "*",
470 		      cf_listen_port, gai_strerror(res), res);
471 	}
472 
473 	for (ai = gaires; ai; ai = ai->ai_next) {
474 		ok = add_listen(ai->ai_family, ai->ai_addr, ai->ai_addrlen);
475 		/* it's unclear whether all or only first result should be used */
476 		if (0 && ok)
477 			break;
478 	}
479 
480 	freeaddrinfo(gaires);
481 	return true;
482 }
483 
484 /* listen on socket - should happen after all other initializations */
pooler_setup(void)485 void pooler_setup(void)
486 {
487 	int n;
488 
489 	n = sd_listen_fds(0);
490 	if (n > 0) {
491 		if (cf_listen_addr && *cf_listen_addr)
492 			log_warning("sockets passed from service manager, cf_listen_addr ignored");
493 		if (cf_unix_socket_dir && *cf_unix_socket_dir && strcmp(cf_unix_socket_dir, DEFAULT_UNIX_SOCKET_DIR) != 0)
494 			log_warning("sockets passed from service manager, cf_unix_socket_dir ignored");
495 
496 		for (int i = 0; i < n; i++) {
497 			int fd = SD_LISTEN_FDS_START + i;
498 			struct ListenSocket *ls;
499 			bool ok = true;
500 
501 			ls = calloc(1, sizeof(*ls));
502 			if (!ls)
503 				die("out of memory");
504 			list_init(&ls->node);
505 			ls->fd = fd;
506 			if (sd_is_socket(fd, AF_UNIX, 0, -1)) {
507 				pga_set(&ls->addr, AF_UNIX, 0);
508 				if (!tune_socket(fd, true))
509 					ok = false;
510 			} else if (sd_is_socket(fd, AF_INET, 0, -1)) {
511 				pga_set(&ls->addr, AF_INET, 0);
512 				if (!tune_socket(fd, false))
513 					ok = false;
514 				tune_accept(fd, cf_tcp_defer_accept);
515 			} else if (sd_is_socket(fd, AF_INET6, 0, -1)) {
516 				pga_set(&ls->addr, AF_INET6, 0);
517 				if (!tune_socket(fd, false))
518 					ok = false;
519 				tune_accept(fd, cf_tcp_defer_accept);
520 			}
521 			if (!ok)
522 				die("failed to set up socket passed from service manager (fd %d)", fd);
523 			log_info("socket passed from service manager (fd %d)", fd);
524 			statlist_append(&sock_list, &ls->node);
525 		}
526 	} else {
527 		bool ok;
528 		static bool init_done = false;
529 
530 		if (!init_done) {
531 			/* remove socket on shutdown */
532 			atexit(cleanup_sockets);
533 			init_done = true;
534 		}
535 
536 		ok = parse_word_list(cf_listen_addr, parse_addr, NULL);
537 		if (!ok)
538 			die("failed to parse listen_addr list: %s", cf_listen_addr);
539 
540 		if (cf_unix_socket_dir && *cf_unix_socket_dir)
541 			create_unix_socket(cf_unix_socket_dir, cf_listen_port);
542 	}
543 
544 	if (!statlist_count(&sock_list))
545 		die("nowhere to listen on");
546 
547 	resume_pooler();
548 }
549 
for_each_pooler_fd(pooler_cb cbfunc,void * arg)550 bool for_each_pooler_fd(pooler_cb cbfunc, void *arg)
551 {
552 	struct List *el;
553 	struct ListenSocket *ls;
554 	bool ok;
555 
556 	statlist_for_each(el, &sock_list) {
557 		ls = container_of(el, struct ListenSocket, node);
558 		ok = cbfunc(arg, ls->fd, &ls->addr);
559 		if (!ok)
560 			return false;
561 	}
562 	return true;
563 }
564