1 /*
2 * PgBouncer - Lightweight connection pooler for PostgreSQL.
3 *
4 * Copyright (c) 2007-2009 Marko Kreen, Skype Technologies OÜ
5 *
6 * Permission to use, copy, modify, and/or distribute this software for any
7 * purpose with or without fee is hereby granted, provided that the above
8 * copyright notice and this permission notice appear in all copies.
9 *
10 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17 */
18
19 /*
20 * Handling of pooler listening sockets
21 */
22
23 #include "bouncer.h"
24
25 #include <usual/netdb.h>
26
27 struct ListenSocket {
28 struct List node;
29 int fd;
30 bool active;
31 struct event ev;
32 PgAddr addr;
33 };
34
35 static STATLIST(sock_list);
36
37 /* hints for getaddrinfo(listen_addr) */
38 static const struct addrinfo hints = {
39 .ai_family = AF_UNSPEC,
40 .ai_socktype = SOCK_STREAM,
41 .ai_protocol = IPPROTO_TCP,
42 .ai_flags = AI_PASSIVE,
43 };
44
45 /* should listening sockets be active or suspended? */
46 static bool need_active = false;
47 /* is it actually active or suspended? */
48 static bool pooler_active = false;
49
50 /* on accept() failure sleep 5 seconds */
51 static struct event ev_err;
52 static struct timeval err_timeout = {5, 0};
53
54 static void tune_accept(int sock, bool on);
55
56 /* atexit() cleanup func */
cleanup_sockets(void)57 static void cleanup_sockets(void)
58 {
59 struct ListenSocket *ls;
60 struct List *el;
61
62 /* avoid cleanup if exit() while suspended */
63 if (cf_pause_mode == P_SUSPEND)
64 return;
65
66 while ((el = statlist_pop(&sock_list)) != NULL) {
67 ls = container_of(el, struct ListenSocket, node);
68 if (ls->fd > 0) {
69 safe_close(ls->fd);
70 ls->fd = 0;
71 }
72 if (pga_is_unix(&ls->addr) && cf_unix_socket_dir[0] != '@') {
73 char buf[sizeof(struct sockaddr_un) + 20];
74 snprintf(buf, sizeof(buf), "%s/.s.PGSQL.%d", cf_unix_socket_dir, cf_listen_port);
75 unlink(buf);
76 }
77 statlist_remove(&sock_list, &ls->node);
78 free(ls);
79 }
80 }
81
82 /*
83 * initialize another listening socket.
84 */
add_listen(int af,const struct sockaddr * sa,int salen)85 static bool add_listen(int af, const struct sockaddr *sa, int salen)
86 {
87 struct ListenSocket *ls;
88 int sock, res;
89 char buf[128];
90 const char *errpos;
91
92 log_debug("add_listen: %s", sa2str(sa, buf, sizeof(buf)));
93
94 /* create socket */
95 errpos = "socket";
96 sock = socket(af, SOCK_STREAM, 0);
97 if (sock < 0)
98 goto failed;
99
100 /* SO_REUSEADDR behaviour it default in WIN32. */
101 #ifndef WIN32
102 /* relaxed binding */
103 if (af != AF_UNIX) {
104 int val = 1;
105 errpos = "setsockopt";
106 res = setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val));
107 if (res < 0)
108 goto failed;
109 }
110 #endif
111
112 #ifdef IPV6_V6ONLY
113 /* avoid ipv6 socket's attempt to takeover ipv4 port */
114 if (af == AF_INET6) {
115 int val = 1;
116 errpos = "setsockopt/IPV6_V6ONLY";
117 res = setsockopt(sock, IPPROTO_IPV6, IPV6_V6ONLY, &val, sizeof(val));
118 if (res < 0)
119 goto failed;
120 }
121 #endif
122
123 /*
124 * If configured, set SO_REUSEPORT or equivalent. If it's not
125 * enabled, just leave the socket alone. (We could also unset
126 * the socket option in that case, but this area is fairly
127 * unportable, so perhaps better to avoid it.)
128 */
129 if (af != AF_UNIX && cf_so_reuseport) {
130 #if defined(SO_REUSEPORT_LB)
131 int val = 1;
132 errpos = "setsockopt/SO_REUSEPORT_LB";
133 res = setsockopt(sock, SOL_SOCKET, SO_REUSEPORT_LB, &val, sizeof(val));
134 if (res < 0)
135 goto failed;
136 #elif defined(SO_REUSEPORT)
137 int val = 1;
138 errpos = "setsockopt/SO_REUSEPORT";
139 res = setsockopt(sock, SOL_SOCKET, SO_REUSEPORT, &val, sizeof(val));
140 if (res < 0)
141 goto failed;
142 #else
143 die("so_reuseport not supported on this platform");
144 #endif
145 }
146
147 /* bind it */
148 errpos = "bind";
149 res = bind(sock, sa, salen);
150 if (res < 0)
151 goto failed;
152
153 /* set common options */
154 errpos = "tune_socket";
155 if (!tune_socket(sock, (af == AF_UNIX)))
156 goto failed;
157
158 /* finally, accept connections */
159 errpos = "listen";
160 res = listen(sock, cf_listen_backlog);
161 if (res < 0)
162 goto failed;
163
164 errpos = "calloc";
165 ls = calloc(1, sizeof(*ls));
166 if (!ls)
167 goto failed;
168
169 list_init(&ls->node);
170 ls->fd = sock;
171 if (sa->sa_family == AF_UNIX) {
172 pga_set(&ls->addr, AF_UNIX, cf_listen_port);
173 } else {
174 pga_copy(&ls->addr, sa);
175 }
176
177 if (af == AF_UNIX) {
178 #ifndef WIN32
179 if (cf_unix_socket_dir[0] != '@') {
180 struct sockaddr_un *un = (struct sockaddr_un *)sa;
181 change_file_mode(un->sun_path, cf_unix_socket_mode, NULL, cf_unix_socket_group);
182 }
183 #endif
184 } else {
185 tune_accept(sock, cf_tcp_defer_accept);
186 }
187
188 log_info("listening on %s", sa2str(sa, buf, sizeof(buf)));
189 statlist_append(&sock_list, &ls->node);
190 return true;
191
192 failed:
193 log_warning("cannot listen on %s: %s(): %s",
194 sa2str(sa, buf, sizeof(buf)),
195 errpos, strerror(errno));
196 if (sock >= 0)
197 safe_close(sock);
198 return false;
199 }
200
/*
 * Build the Unix socket address "<socket_dir>/.s.PGSQL.<listen_port>"
 * and hand it to add_listen().
 *
 * A socket_dir starting with '@' selects an abstract socket: the first
 * path byte is replaced with NUL and only the used part of the path
 * counts towards the address length.  For a regular socket the
 * function die()s when "<path>.lock" exists, and unlinks any stale
 * socket file before listening.
 */
static void create_unix_socket(const char *socket_dir, int listen_port)
{
	const bool abstract = (socket_dir[0] == '@');
	struct sockaddr_un addr;
	int addrlen;

	/* fill sockaddr struct */
	memset(&addr, 0, sizeof(addr));
	addr.sun_family = AF_UNIX;
	snprintf(addr.sun_path, sizeof(addr.sun_path),
		 "%s/.s.PGSQL.%d", socket_dir, listen_port);

	if (abstract) {
		/*
		 * By convention, for abstract Unix sockets, only the
		 * length of the string is the sockaddr length.
		 */
		addrlen = offsetof(struct sockaddr_un, sun_path) + strlen(addr.sun_path);
		addr.sun_path[0] = '\0';
	} else {
		char lockfile[sizeof(struct sockaddr_un) + 10];
		struct stat st;

		addrlen = sizeof(addr);

		/* check for lockfile */
		snprintf(lockfile, sizeof(lockfile), "%s.lock", addr.sun_path);
		if (lstat(lockfile, &st) == 0)
			die("unix port %d is in use", listen_port);

		/* expect old bouncer gone */
		unlink(addr.sun_path);
	}

	add_listen(AF_UNIX, (const struct sockaddr *)&addr, addrlen);
}
239
/*
 * Notify the pooler about a new connection only once data has also
 * arrived on it.
 *
 * optval specifies how long after the connection attempt to wait for data.
 *
 * Related to the tcp_synack_retries sysctl, default 5 (corresponds to 180 secs).
 *
 * SO_ACCEPTFILTER needs to be set after listen(), maybe TCP_DEFER_ACCEPT too.
 *
 * Failure to change the option is only logged as a warning.
 */
static void tune_accept(int sock, bool on)
{
	const char *action = on ? "install" : "uninstall";
	int rc = 0;

#ifdef TCP_DEFER_ACCEPT
	int optval = 45; /* FIXME: proper value */
	socklen_t optlen = sizeof(optval);

	/* the old value is only logged; this result is overwritten below */
	rc = getsockopt(sock, IPPROTO_TCP, TCP_DEFER_ACCEPT, &optval, &optlen);
	log_noise("old TCP_DEFER_ACCEPT on %d = %d", sock, optval);

	optval = on ? 1 : 0;
	log_noise("%s TCP_DEFER_ACCEPT on %d", action, sock);
	rc = setsockopt(sock, IPPROTO_TCP, TCP_DEFER_ACCEPT, &optval, sizeof(optval));
#else
#if 0
#ifdef SO_ACCEPTFILTER
	struct accept_filter_arg af, *afp = on ? &af : NULL;
	socklen_t af_len = on ? sizeof(af) : 0;
	memset(&af, 0, sizeof(af));
	strcpy(af.af_name, "dataready");
	log_noise("%s SO_ACCEPTFILTER on %d", action, sock);
	rc = setsockopt(sock, SOL_SOCKET, SO_ACCEPTFILTER, afp, af_len);
#endif
#endif
#endif

	if (rc < 0) {
		log_warning("tune_accept: %s TCP_DEFER_ACCEPT/SO_ACCEPTFILTER: %s",
			    action, strerror(errno));
	}
}
277
pooler_tune_accept(bool on)278 void pooler_tune_accept(bool on)
279 {
280 struct List *el;
281 struct ListenSocket *ls;
282 statlist_for_each(el, &sock_list) {
283 ls = container_of(el, struct ListenSocket, node);
284 if (!pga_is_unix(&ls->addr))
285 tune_accept(ls->fd, on);
286 }
287 }
288
/*
 * Timer callback armed after an accept() failure: resume accepting,
 * unless the pooler is in P_SUSPEND mode.  All arguments are unused.
 */
static void err_wait_func(evutil_socket_t sock, short flags, void *arg)
{
	if (cf_pause_mode == P_SUSPEND)
		return;

	resume_pooler();
}
294
addrpair(const PgAddr * src,const PgAddr * dst)295 static const char *addrpair(const PgAddr *src, const PgAddr *dst)
296 {
297 static char ip1buf[PGADDR_BUF], ip2buf[PGADDR_BUF],
298 buf[2*PGADDR_BUF + 16];
299 const char *ip1, *ip2;
300 if (pga_is_unix(src))
301 return "unix->unix";
302
303 ip1 = pga_ntop(src, ip1buf, sizeof(ip1buf));
304 ip2 = pga_ntop(src, ip2buf, sizeof(ip2buf));
305 snprintf(buf, sizeof(buf), "%s:%d -> %s:%d",
306 ip1, pga_port(src), ip2, pga_port(dst));
307 return buf;
308 }
309
conninfo(const PgSocket * sk)310 static const char *conninfo(const PgSocket *sk)
311 {
312 if (is_server_socket(sk)) {
313 return addrpair(&sk->local_addr, &sk->remote_addr);
314 } else {
315 return addrpair(&sk->remote_addr, &sk->local_addr);
316 }
317 }
318
319 /* got new connection, associate it with client struct */
pool_accept(evutil_socket_t sock,short flags,void * arg)320 static void pool_accept(evutil_socket_t sock, short flags, void *arg)
321 {
322 struct ListenSocket *ls = arg;
323 int fd;
324 PgSocket *client;
325 union {
326 struct sockaddr_in in;
327 struct sockaddr_in6 in6;
328 struct sockaddr_un un;
329 struct sockaddr sa;
330 } raddr;
331 socklen_t len = sizeof(raddr);
332 bool is_unix = pga_is_unix(&ls->addr);
333
334 if(!(flags & EV_READ)) {
335 log_warning("no EV_READ in pool_accept");
336 return;
337 }
338 loop:
339 /* get fd */
340 fd = safe_accept(sock, &raddr.sa, &len);
341 if (fd < 0) {
342 if (errno == EAGAIN)
343 return;
344 else if (errno == ECONNABORTED)
345 return;
346
347 /*
348 * probably fd limit, pointless to try often
349 * wait a bit, hope that admin resolves somehow
350 */
351 log_error("accept() failed: %s", strerror(errno));
352 evtimer_assign(&ev_err, pgb_event_base, err_wait_func, NULL);
353 safe_evtimer_add(&ev_err, &err_timeout);
354 suspend_pooler();
355 return;
356 }
357
358 log_noise("new fd from accept=%d", fd);
359 if (is_unix) {
360 client = accept_client(fd, true);
361 } else {
362 client = accept_client(fd, false);
363 }
364
365 if (client)
366 slog_debug(client, "P: got connection: %s", conninfo(client));
367
368 /*
369 * there may be several clients waiting,
370 * avoid context switch by looping
371 */
372 goto loop;
373 }
374
use_pooler_socket(int sock,bool is_unix)375 bool use_pooler_socket(int sock, bool is_unix)
376 {
377 struct ListenSocket *ls;
378 int res;
379 char buf[PGADDR_BUF];
380
381 if (!tune_socket(sock, is_unix))
382 return false;
383
384 ls = calloc(1, sizeof(*ls));
385 if (!ls)
386 return false;
387 ls->fd = sock;
388 if (is_unix) {
389 pga_set(&ls->addr, AF_UNIX, cf_listen_port);
390 } else {
391 struct sockaddr_storage ss;
392 socklen_t len = sizeof(ss);
393 res = getsockname(sock, (struct sockaddr *)&ss, &len);
394 if (res < 0) {
395 log_error("getsockname failed");
396 free(ls);
397 return false;
398 }
399 pga_copy(&ls->addr, (struct sockaddr *)&ss);
400 }
401 log_info("got pooler socket: %s", pga_str(&ls->addr, buf, sizeof(buf)));
402 statlist_append(&sock_list, &ls->node);
403 return true;
404 }
405
suspend_pooler(void)406 void suspend_pooler(void)
407 {
408 struct List *el;
409 struct ListenSocket *ls;
410
411 need_active = false;
412 statlist_for_each(el, &sock_list) {
413 ls = container_of(el, struct ListenSocket, node);
414 if (!ls->active)
415 continue;
416 if (event_del(&ls->ev) < 0) {
417 log_warning("suspend_pooler, event_del: %s", strerror(errno));
418 return;
419 }
420 ls->active = false;
421 }
422 pooler_active = false;
423 }
424
resume_pooler(void)425 void resume_pooler(void)
426 {
427 struct List *el;
428 struct ListenSocket *ls;
429
430 need_active = true;
431 statlist_for_each(el, &sock_list) {
432 ls = container_of(el, struct ListenSocket, node);
433 if (ls->active)
434 continue;
435 event_assign(&ls->ev, pgb_event_base, ls->fd, EV_READ | EV_PERSIST, pool_accept, ls);
436 if (event_add(&ls->ev, NULL) < 0) {
437 log_warning("event_add failed: %s", strerror(errno));
438 return;
439 }
440 ls->active = true;
441 }
442 pooler_active = true;
443 }
444
445 /* retry previously failed suspend_pooler() / resume_pooler() */
per_loop_pooler_maint(void)446 void per_loop_pooler_maint(void)
447 {
448 if (need_active && !pooler_active)
449 resume_pooler();
450 else if (!need_active && pooler_active)
451 suspend_pooler();
452 }
453
/*
 * parse_word_list() callback: resolve one listen_addr entry and try to
 * listen on every address it resolves to.
 *
 * An empty entry is skipped; "*" is passed to getaddrinfo() as NULL.
 * A resolver failure is fatal (die()); failures of individual
 * add_listen() calls are ignored here.  Always returns true; 'arg' is
 * unused.
 */
static bool parse_addr(void *arg, const char *addr)
{
	char service[64];
	struct addrinfo *results = NULL;
	const struct addrinfo *cur;
	int rc;

	if (addr[0] == '\0')
		return true;
	if (strcmp(addr, "*") == 0)
		addr = NULL;
	snprintf(service, sizeof(service), "%d", cf_listen_port);

	rc = getaddrinfo(addr, service, &hints, &results);
	if (rc != 0) {
		die("getaddrinfo('%s', '%d') = %s [%d]", addr ? addr : "*",
		    cf_listen_port, gai_strerror(rc), rc);
	}

	/*
	 * it's unclear whether all or only first result should be used;
	 * currently all of them are tried
	 */
	for (cur = results; cur != NULL; cur = cur->ai_next)
		add_listen(cur->ai_family, cur->ai_addr, cur->ai_addrlen);

	freeaddrinfo(results);
	return true;
}
483
484 /* listen on socket - should happen after all other initializations */
pooler_setup(void)485 void pooler_setup(void)
486 {
487 int n;
488
489 n = sd_listen_fds(0);
490 if (n > 0) {
491 if (cf_listen_addr && *cf_listen_addr)
492 log_warning("sockets passed from service manager, cf_listen_addr ignored");
493 if (cf_unix_socket_dir && *cf_unix_socket_dir && strcmp(cf_unix_socket_dir, DEFAULT_UNIX_SOCKET_DIR) != 0)
494 log_warning("sockets passed from service manager, cf_unix_socket_dir ignored");
495
496 for (int i = 0; i < n; i++) {
497 int fd = SD_LISTEN_FDS_START + i;
498 struct ListenSocket *ls;
499 bool ok = true;
500
501 ls = calloc(1, sizeof(*ls));
502 if (!ls)
503 die("out of memory");
504 list_init(&ls->node);
505 ls->fd = fd;
506 if (sd_is_socket(fd, AF_UNIX, 0, -1)) {
507 pga_set(&ls->addr, AF_UNIX, 0);
508 if (!tune_socket(fd, true))
509 ok = false;
510 } else if (sd_is_socket(fd, AF_INET, 0, -1)) {
511 pga_set(&ls->addr, AF_INET, 0);
512 if (!tune_socket(fd, false))
513 ok = false;
514 tune_accept(fd, cf_tcp_defer_accept);
515 } else if (sd_is_socket(fd, AF_INET6, 0, -1)) {
516 pga_set(&ls->addr, AF_INET6, 0);
517 if (!tune_socket(fd, false))
518 ok = false;
519 tune_accept(fd, cf_tcp_defer_accept);
520 }
521 if (!ok)
522 die("failed to set up socket passed from service manager (fd %d)", fd);
523 log_info("socket passed from service manager (fd %d)", fd);
524 statlist_append(&sock_list, &ls->node);
525 }
526 } else {
527 bool ok;
528 static bool init_done = false;
529
530 if (!init_done) {
531 /* remove socket on shutdown */
532 atexit(cleanup_sockets);
533 init_done = true;
534 }
535
536 ok = parse_word_list(cf_listen_addr, parse_addr, NULL);
537 if (!ok)
538 die("failed to parse listen_addr list: %s", cf_listen_addr);
539
540 if (cf_unix_socket_dir && *cf_unix_socket_dir)
541 create_unix_socket(cf_unix_socket_dir, cf_listen_port);
542 }
543
544 if (!statlist_count(&sock_list))
545 die("nowhere to listen on");
546
547 resume_pooler();
548 }
549
/*
 * Call cbfunc(arg, fd, &addr) for every listening socket.
 *
 * Stops at the first callback that returns false and returns false;
 * returns true when every callback succeeded (or the list is empty).
 */
bool for_each_pooler_fd(pooler_cb cbfunc, void *arg)
{
	struct List *item;

	statlist_for_each(item, &sock_list) {
		struct ListenSocket *ls = container_of(item, struct ListenSocket, node);

		if (!cbfunc(arg, ls->fd, &ls->addr))
			return false;
	}
	return true;
}
564