1 /*
2 * PgBouncer - Lightweight connection pooler for PostgreSQL.
3 *
4 * Copyright (c) 2007-2009 Marko Kreen, Skype Technologies OÜ
5 *
6 * Permission to use, copy, modify, and/or distribute this software for any
7 * purpose with or without fee is hereby granted, provided that the above
8 * copyright notice and this permission notice appear in all copies.
9 *
10 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17 */
18
19 /*
20 * Herding objects between lists happens here.
21 */
22
23 #include "bouncer.h"
24 #include "scram.h"
25
/* objects on these lists are allocated as needed and never freed */
STATLIST(user_list);
STATLIST(database_list);
STATLIST(pool_list);

/* All locally defined users (in auth_file) are kept here. */
struct AATree user_tree;

/*
 * All PAM users are kept here. Two separate user trees are needed to
 * avoid name clashes between users of different authentication types,
 * and because pam_user_tree is closer to PgDatabase.user_tree in
 * logic.
 */
struct AATree pam_user_tree;

/*
 * Client and server objects are pre-allocated; they are always in
 * either an active or a free list, in addition to others.
 */
STATLIST(login_client_list);

/*
 * Slab caches for the object types handled in this file.
 * user_cache, db_cache and pool_cache are created in init_objects(),
 * the remaining three in init_caches().
 */
struct Slab *server_cache;
struct Slab *client_cache;
struct Slab *db_cache;
struct Slab *pool_cache;
struct Slab *user_cache;
struct Slab *iobuf_cache;

/*
 * libevent may still report events when event_del()
 * is called from somewhere else. So hide just-freed
 * PgSockets for one loop.
 */
static STATLIST(justfree_client_list);
static STATLIST(justfree_server_list);

/* idle auto-databases; find_database() moves a match back to database_list */
STATLIST(autodatabase_idle_list);
66
67 /* fast way to get number of active clients */
get_active_client_count(void)68 int get_active_client_count(void)
69 {
70 return slab_active_count(client_cache);
71 }
72
73 /* fast way to get number of active servers */
get_active_server_count(void)74 int get_active_server_count(void)
75 {
76 return slab_active_count(server_cache);
77 }
78
construct_client(void * obj)79 static void construct_client(void *obj)
80 {
81 PgSocket *client = obj;
82
83 memset(client, 0, sizeof(PgSocket));
84 list_init(&client->head);
85 sbuf_init(&client->sbuf, client_proto);
86 client->state = CL_FREE;
87 }
88
construct_server(void * obj)89 static void construct_server(void *obj)
90 {
91 PgSocket *server = obj;
92
93 memset(server, 0, sizeof(PgSocket));
94 list_init(&server->head);
95 sbuf_init(&server->sbuf, server_proto);
96 server->state = SV_FREE;
97 }
98
99 /* compare string with PgUser->name, for usage with btree */
user_node_cmp(uintptr_t userptr,struct AANode * node)100 static int user_node_cmp(uintptr_t userptr, struct AANode *node)
101 {
102 const char *name = (const char *)userptr;
103 PgUser *user = container_of(node, PgUser, tree_node);
104 return strcmp(name, user->name);
105 }
106
107 /* destroy PgUser, for usage with btree */
user_node_release(struct AANode * node,void * arg)108 static void user_node_release(struct AANode *node, void *arg)
109 {
110 PgUser *user = container_of(node, PgUser, tree_node);
111 slab_free(user_cache, user);
112 }
113
114 /* initialization before config loading */
init_objects(void)115 void init_objects(void)
116 {
117 aatree_init(&user_tree, user_node_cmp, NULL);
118 aatree_init(&pam_user_tree, user_node_cmp, NULL);
119 user_cache = slab_create("user_cache", sizeof(PgUser), 0, NULL, USUAL_ALLOC);
120 db_cache = slab_create("db_cache", sizeof(PgDatabase), 0, NULL, USUAL_ALLOC);
121 pool_cache = slab_create("pool_cache", sizeof(PgPool), 0, NULL, USUAL_ALLOC);
122
123 if (!user_cache || !db_cache || !pool_cache)
124 fatal("cannot create initial caches");
125 }
126
do_iobuf_reset(void * arg)127 static void do_iobuf_reset(void *arg)
128 {
129 IOBuf *io = arg;
130 iobuf_reset(io);
131 }
132
133 /* initialization after config loading */
init_caches(void)134 void init_caches(void)
135 {
136 server_cache = slab_create("server_cache", sizeof(PgSocket), 0, construct_server, USUAL_ALLOC);
137 client_cache = slab_create("client_cache", sizeof(PgSocket), 0, construct_client, USUAL_ALLOC);
138 iobuf_cache = slab_create("iobuf_cache", IOBUF_SIZE, 0, do_iobuf_reset, USUAL_ALLOC);
139 }
140
/*
 * Move a client socket into a new state.
 *
 * A state change means moving between lists: the socket is first
 * unlinked from the list that belongs to its current state, then the
 * new state is stored and the socket is linked into the list that
 * belongs to it.
 *
 * Two requested transitions are remapped on the way:
 *   CL_LOGIN         -> CL_WAITING  becomes  CL_WAITING_LOGIN
 *   CL_WAITING_LOGIN -> CL_ACTIVE   becomes  CL_LOGIN
 *
 * Moving to CL_FREE cleans the variable cache and returns the object
 * to client_cache.  An unknown old or new state is fatal.
 */
void change_client_state(PgSocket *client, SocketState newstate)
{
	/* read before the switch; used by the pool-owned lists below */
	PgPool *pool = client->pool;

	/* remove from old location */
	switch (client->state) {
	case CL_FREE:
		/* not on any list */
		break;
	case CL_JUSTFREE:
		statlist_remove(&justfree_client_list, &client->head);
		break;
	case CL_LOGIN:
		/* waiting while still in login is tracked as its own state */
		if (newstate == CL_WAITING)
			newstate = CL_WAITING_LOGIN;
		statlist_remove(&login_client_list, &client->head);
		break;
	case CL_WAITING_LOGIN:
		/* activation of a login-phase waiter goes back to CL_LOGIN */
		if (newstate == CL_ACTIVE)
			newstate = CL_LOGIN;
		/* fallthrough */
	case CL_WAITING:
		statlist_remove(&pool->waiting_client_list, &client->head);
		break;
	case CL_ACTIVE:
		statlist_remove(&pool->active_client_list, &client->head);
		break;
	case CL_CANCEL:
		statlist_remove(&pool->cancel_req_list, &client->head);
		break;
	default:
		fatal("bad cur client state: %d", client->state);
	}

	client->state = newstate;

	/* put to new location */
	switch (client->state) {
	case CL_FREE:
		/* hand the object back to the slab cache */
		varcache_clean(&client->vars);
		slab_free(client_cache, client);
		break;
	case CL_JUSTFREE:
		statlist_append(&justfree_client_list, &client->head);
		break;
	case CL_LOGIN:
		statlist_append(&login_client_list, &client->head);
		break;
	case CL_WAITING:
	case CL_WAITING_LOGIN:
		/* both waiting states share the pool's waiting list; start the wait clock */
		client->wait_start = get_cached_time();
		statlist_append(&pool->waiting_client_list, &client->head);
		break;
	case CL_ACTIVE:
		statlist_append(&pool->active_client_list, &client->head);
		break;
	case CL_CANCEL:
		statlist_append(&pool->cancel_req_list, &client->head);
		break;
	default:
		fatal("bad new client state: %d", client->state);
	}
}
204
/*
 * Move a server socket into a new state.
 *
 * A state change means moving between lists: the socket is unlinked
 * from the list that belongs to its current state, the new state is
 * stored, and the socket is linked into the list that belongs to it.
 *
 * Moving to SV_FREE cleans the variable cache and returns the object
 * to server_cache.  An unknown old or new state is fatal.
 */
void change_server_state(PgSocket *server, SocketState newstate)
{
	/* read before the switch; used by the pool-owned lists below */
	PgPool *pool = server->pool;

	/* remove from old location */
	switch (server->state) {
	case SV_FREE:
		/* not on any list */
		break;
	case SV_JUSTFREE:
		statlist_remove(&justfree_server_list, &server->head);
		break;
	case SV_LOGIN:
		statlist_remove(&pool->new_server_list, &server->head);
		break;
	case SV_USED:
		statlist_remove(&pool->used_server_list, &server->head);
		break;
	case SV_TESTED:
		statlist_remove(&pool->tested_server_list, &server->head);
		break;
	case SV_IDLE:
		statlist_remove(&pool->idle_server_list, &server->head);
		break;
	case SV_ACTIVE:
		statlist_remove(&pool->active_server_list, &server->head);
		break;
	default:
		fatal("bad old server state: %d", server->state);
	}

	server->state = newstate;

	/* put to new location */
	switch (server->state) {
	case SV_FREE:
		/* hand the object back to the slab cache */
		varcache_clean(&server->vars);
		slab_free(server_cache, server);
		break;
	case SV_JUSTFREE:
		statlist_append(&justfree_server_list, &server->head);
		break;
	case SV_LOGIN:
		statlist_append(&pool->new_server_list, &server->head);
		break;
	case SV_USED:
		/* use LIFO */
		statlist_prepend(&pool->used_server_list, &server->head);
		break;
	case SV_TESTED:
		statlist_append(&pool->tested_server_list, &server->head);
		break;
	case SV_IDLE:
		if (server->close_needed || cf_server_round_robin) {
			/* try to avoid immediate usage then */
			statlist_append(&pool->idle_server_list, &server->head);
		} else {
			/* otherwise use LIFO */
			statlist_prepend(&pool->idle_server_list, &server->head);
		}
		break;
	case SV_ACTIVE:
		statlist_append(&pool->active_server_list, &server->head);
		break;
	default:
		fatal("bad server state: %d", server->state);
	}
}
273
274 /* compare pool names, for use with put_in_order */
cmp_pool(struct List * i1,struct List * i2)275 static int cmp_pool(struct List *i1, struct List *i2)
276 {
277 PgPool *p1 = container_of(i1, PgPool, head);
278 PgPool *p2 = container_of(i2, PgPool, head);
279 if (p1->db != p2->db)
280 return strcmp(p1->db->name, p2->db->name);
281 if (p1->user != p2->user)
282 return strcmp(p1->user->name, p2->user->name);
283 return 0;
284 }
285
286 /* compare user names, for use with put_in_order */
cmp_user(struct List * i1,struct List * i2)287 static int cmp_user(struct List *i1, struct List *i2)
288 {
289 PgUser *u1 = container_of(i1, PgUser, head);
290 PgUser *u2 = container_of(i2, PgUser, head);
291 return strcmp(u1->name, u2->name);
292 }
293
294 /* compare db names, for use with put_in_order */
cmp_database(struct List * i1,struct List * i2)295 static int cmp_database(struct List *i1, struct List *i2)
296 {
297 PgDatabase *db1 = container_of(i1, PgDatabase, head);
298 PgDatabase *db2 = container_of(i2, PgDatabase, head);
299 return strcmp(db1->name, db2->name);
300 }
301
302 /* put elem into list in correct pos */
put_in_order(struct List * newitem,struct StatList * list,int (* cmpfn)(struct List *,struct List *))303 static void put_in_order(struct List *newitem, struct StatList *list,
304 int (*cmpfn)(struct List *, struct List *))
305 {
306 int res;
307 struct List *item;
308
309 statlist_for_each(item, list) {
310 res = cmpfn(item, newitem);
311 if (res == 0) {
312 fatal("put_in_order: found existing elem");
313 } else if (res > 0) {
314 statlist_put_before(list, newitem, item);
315 return;
316 }
317 }
318 statlist_append(list, newitem);
319 }
320
321 /* create new object if new, then return it */
add_database(const char * name)322 PgDatabase *add_database(const char *name)
323 {
324 PgDatabase *db = find_database(name);
325
326 /* create new object if needed */
327 if (db == NULL) {
328 db = slab_alloc(db_cache);
329 if (!db)
330 return NULL;
331
332 list_init(&db->head);
333 if (strlcpy(db->name, name, sizeof(db->name)) >= sizeof(db->name)) {
334 log_warning("too long db name: %s", name);
335 slab_free(db_cache, db);
336 return NULL;
337 }
338 aatree_init(&db->user_tree, user_node_cmp, user_node_release);
339 put_in_order(&db->head, &database_list, cmp_database);
340 }
341
342 return db;
343 }
344
345 /* register new auto database */
register_auto_database(const char * name)346 PgDatabase *register_auto_database(const char *name)
347 {
348 PgDatabase *db;
349
350 if (!cf_autodb_connstr)
351 return NULL;
352
353 if (!parse_database(NULL, name, cf_autodb_connstr))
354 return NULL;
355
356 db = find_database(name);
357 if (db) {
358 db->db_auto = true;
359 }
360
361 return db;
362 }
363
364 /* add or update client users */
add_user(const char * name,const char * passwd)365 PgUser *add_user(const char *name, const char *passwd)
366 {
367 PgUser *user = find_user(name);
368
369 if (user == NULL) {
370 user = slab_alloc(user_cache);
371 if (!user)
372 return NULL;
373
374 list_init(&user->head);
375 list_init(&user->pool_list);
376 safe_strcpy(user->name, name, sizeof(user->name));
377 put_in_order(&user->head, &user_list, cmp_user);
378
379 aatree_insert(&user_tree, (uintptr_t)user->name, &user->tree_node);
380 user->pool_mode = POOL_INHERIT;
381 }
382 safe_strcpy(user->passwd, passwd, sizeof(user->passwd));
383 return user;
384 }
385
386 /* add or update db users */
add_db_user(PgDatabase * db,const char * name,const char * passwd)387 PgUser *add_db_user(PgDatabase *db, const char *name, const char *passwd)
388 {
389 PgUser *user = NULL;
390 struct AANode *node;
391
392 node = aatree_search(&db->user_tree, (uintptr_t)name);
393 user = node ? container_of(node, PgUser, tree_node) : NULL;
394
395 if (user == NULL) {
396 user = slab_alloc(user_cache);
397 if (!user)
398 return NULL;
399
400 list_init(&user->head);
401 list_init(&user->pool_list);
402 safe_strcpy(user->name, name, sizeof(user->name));
403
404 aatree_insert(&db->user_tree, (uintptr_t)user->name, &user->tree_node);
405 user->pool_mode = POOL_INHERIT;
406 }
407 safe_strcpy(user->passwd, passwd, sizeof(user->passwd));
408 return user;
409 }
410
411 /* Add PAM user. The logic is same as in add_db_user */
add_pam_user(const char * name,const char * passwd)412 PgUser *add_pam_user(const char *name, const char *passwd)
413 {
414 PgUser *user = NULL;
415 struct AANode *node;
416
417 node = aatree_search(&pam_user_tree, (uintptr_t)name);
418 user = node ? container_of(node, PgUser, tree_node) : NULL;
419
420 if (user == NULL) {
421 user = slab_alloc(user_cache);
422 if (!user)
423 return NULL;
424
425 list_init(&user->head);
426 list_init(&user->pool_list);
427 safe_strcpy(user->name, name, sizeof(user->name));
428
429 aatree_insert(&pam_user_tree, (uintptr_t)user->name, &user->tree_node);
430 user->pool_mode = POOL_INHERIT;
431 }
432 if (passwd)
433 safe_strcpy(user->passwd, passwd, sizeof(user->passwd));
434 return user;
435 }
436
437 /* create separate user object for storing server user info */
force_user(PgDatabase * db,const char * name,const char * passwd)438 PgUser *force_user(PgDatabase *db, const char *name, const char *passwd)
439 {
440 PgUser *user = db->forced_user;
441 if (!user) {
442 user = slab_alloc(user_cache);
443 if (!user)
444 return NULL;
445 list_init(&user->head);
446 list_init(&user->pool_list);
447 user->pool_mode = POOL_INHERIT;
448 }
449 safe_strcpy(user->name, name, sizeof(user->name));
450 safe_strcpy(user->passwd, passwd, sizeof(user->passwd));
451 db->forced_user = user;
452 return user;
453 }
454
455 /* find an existing database */
find_database(const char * name)456 PgDatabase *find_database(const char *name)
457 {
458 struct List *item, *tmp;
459 PgDatabase *db;
460 statlist_for_each(item, &database_list) {
461 db = container_of(item, PgDatabase, head);
462 if (strcmp(db->name, name) == 0)
463 return db;
464 }
465 /* also trying to find in idle autodatabases list */
466 statlist_for_each_safe(item, &autodatabase_idle_list, tmp) {
467 db = container_of(item, PgDatabase, head);
468 if (strcmp(db->name, name) == 0) {
469 db->inactive_time = 0;
470 statlist_remove(&autodatabase_idle_list, &db->head);
471 put_in_order(&db->head, &database_list, cmp_database);
472 return db;
473 }
474 }
475 return NULL;
476 }
477
478 /* find existing user */
find_user(const char * name)479 PgUser *find_user(const char *name)
480 {
481 PgUser *user = NULL;
482 struct AANode *node;
483
484 node = aatree_search(&user_tree, (uintptr_t)name);
485 user = node ? container_of(node, PgUser, tree_node) : NULL;
486 return user;
487 }
488
489 /* create new pool object */
new_pool(PgDatabase * db,PgUser * user)490 static PgPool *new_pool(PgDatabase *db, PgUser *user)
491 {
492 PgPool *pool;
493
494 pool = slab_alloc(pool_cache);
495 if (!pool)
496 return NULL;
497
498 list_init(&pool->head);
499 list_init(&pool->map_head);
500
501 pool->user = user;
502 pool->db = db;
503
504 statlist_init(&pool->active_client_list, "active_client_list");
505 statlist_init(&pool->waiting_client_list, "waiting_client_list");
506 statlist_init(&pool->active_server_list, "active_server_list");
507 statlist_init(&pool->idle_server_list, "idle_server_list");
508 statlist_init(&pool->tested_server_list, "tested_server_list");
509 statlist_init(&pool->used_server_list, "used_server_list");
510 statlist_init(&pool->new_server_list, "new_server_list");
511 statlist_init(&pool->cancel_req_list, "cancel_req_list");
512
513 list_append(&user->pool_list, &pool->map_head);
514
515 /* keep pools in db/user order to make stats faster */
516 put_in_order(&pool->head, &pool_list, cmp_pool);
517
518 return pool;
519 }
520
521 /* find pool object, create if needed */
get_pool(PgDatabase * db,PgUser * user)522 PgPool *get_pool(PgDatabase *db, PgUser *user)
523 {
524 struct List *item;
525 PgPool *pool;
526
527 if (!db || !user)
528 return NULL;
529
530 list_for_each(item, &user->pool_list) {
531 pool = container_of(item, PgPool, map_head);
532 if (pool->db == db)
533 return pool;
534 }
535
536 return new_pool(db, user);
537 }
538
539 /* deactivate socket and put into wait queue */
pause_client(PgSocket * client)540 static void pause_client(PgSocket *client)
541 {
542 Assert(client->state == CL_ACTIVE || client->state == CL_LOGIN);
543
544 slog_debug(client, "pause_client");
545 change_client_state(client, CL_WAITING);
546 if (!sbuf_pause(&client->sbuf))
547 disconnect_client(client, true, "pause failed");
548 }
549
550 /* wake client from wait */
activate_client(PgSocket * client)551 void activate_client(PgSocket *client)
552 {
553 Assert(client->state == CL_WAITING || client->state == CL_WAITING_LOGIN);
554
555 Assert(client->wait_start > 0);
556
557 /* acount for time client spent waiting for server */
558 client->pool->stats.wait_time += (get_cached_time() - client->wait_start);
559
560 slog_debug(client, "activate_client");
561 change_client_state(client, CL_ACTIVE);
562 sbuf_continue(&client->sbuf);
563 }
564
565 /*
566 * Don't let clients queue at all if there is no working server connection.
567 *
568 * It must still allow following cases:
569 * - empty pool on startup
570 * - idle pool where all servers are removed
571 *
572 * Current assumptions:
573 * - old server connections will be dropped by query_timeout
574 * - new server connections fail due to server_connect_timeout, or other failure
575 *
576 * So here we drop client if all server connections have been dropped
577 * and new ones fail.
578 *
579 * Return true if the client connection should be allowed, false if it
580 * should be rejected.
581 */
check_fast_fail(PgSocket * client)582 bool check_fast_fail(PgSocket *client)
583 {
584 int cnt;
585 PgPool *pool = client->pool;
586
587 /* Could be mock authentication, proceed normally */
588 if (!pool)
589 return true;
590
591 /* If last login succeeded, client can go ahead. */
592 if (!pool->last_login_failed)
593 return true;
594
595 /* If there are servers available, client can go ahead. */
596 cnt = pool_server_count(pool) - statlist_count(&pool->new_server_list);
597 if (cnt)
598 return true;
599
600 /* Else we fail the client. */
601 disconnect_client(client, true, "pgbouncer cannot connect to server");
602
603 /*
604 * Try to launch a new connection. (launch_new_connection()
605 * will check for server_login_retry etc.) The usual relaunch
606 * from janitor.c won't do anything, as there are no waiting
607 * clients, so we need to do it here to get any new servers
608 * eventually.
609 */
610 launch_new_connection(pool);
611
612 return false;
613 }
614
/*
 * Link the client to a server if one is available, otherwise put the
 * client into the wait queue.
 *
 * Returns true when the client is linked to a server and its data can
 * be processed right away (this includes the case where it was linked
 * already).  Returns false when the client was rejected by
 * check_fast_fail(), was put into the wait queue, or was linked but
 * has to wait until pending variable changes are applied on the
 * server.
 */
bool find_server(PgSocket *client)
{
	PgPool *pool = client->pool;
	PgSocket *server;
	bool res;
	bool varchange = false;

	Assert(client->state == CL_ACTIVE || client->state == CL_LOGIN);

	/* no wait by default */
	client->wait_start = 0;

	/* already linked, nothing to do */
	if (client->link)
		return true;

	/* try to get idle server, if allowed */
	if (cf_pause_mode == P_PAUSE || pool->db->db_paused) {
		server = NULL;
	} else {
		/* drop unusable idle servers until a usable one shows up or the list is empty */
		while (1) {
			server = first_socket(&pool->idle_server_list);
			if (!server) {
				break;
			} else if (server->close_needed) {
				disconnect_server(server, true, "obsolete connection");
			} else if (!server->ready) {
				disconnect_server(server, true, "idle server got dirty");
			} else {
				break;
			}
		}

		/* no server: check_fast_fail() decides whether queueing is allowed at all */
		if (!server && !check_fast_fail(client))
			return false;

	}
	Assert(!server || server->state == SV_IDLE);

	/* send var changes */
	if (server) {
		res = varcache_apply(server, client, &varchange);
		if (!res) {
			disconnect_server(server, true, "var change failed");
			server = NULL;
		}
	}

	/* link or send to waiters list */
	if (server) {
		client->link = server;
		server->link = client;
		change_server_state(server, SV_ACTIVE);
		if (varchange) {
			/* hold the client back until the server is done setting vars */
			server->setting_vars = true;
			server->ready = false;
			res = false; /* don't process client data yet */
			if (!sbuf_pause(&client->sbuf))
				disconnect_client(client, true, "pause failed");
		} else {
			res = true;
		}
	} else {
		pause_client(client);
		res = false;
	}
	return res;
}
683
684 /* pick waiting client */
reuse_on_release(PgSocket * server)685 static bool reuse_on_release(PgSocket *server)
686 {
687 bool res = true;
688 PgPool *pool = server->pool;
689 PgSocket *client = first_socket(&pool->waiting_client_list);
690 if (client) {
691 activate_client(client);
692
693 /*
694 * As the activate_client() does full read loop,
695 * then it may happen that linked client closing
696 * causes server closing. Report it.
697 */
698 if (server->state == SV_FREE || server->state == SV_JUSTFREE)
699 res = false;
700 }
701 return res;
702 }
703
704 /* send reset query */
reset_on_release(PgSocket * server)705 static bool reset_on_release(PgSocket *server)
706 {
707 bool res;
708
709 Assert(server->state == SV_TESTED);
710
711 slog_debug(server, "resetting: %s", cf_server_reset_query);
712 SEND_generic(res, server, 'Q', "s", cf_server_reset_query);
713 if (!res)
714 disconnect_server(server, false, "reset query failed");
715 return res;
716 }
717
/*
 * Decide whether a server connection should be closed because of
 * server_lifetime.
 *
 * A server younger than cf_server_lifetime is never over.  An older
 * one is over only if enough time has passed since the pool's last
 * lifetime disconnect: at least cf_server_lifetime divided by the pool
 * size (no minimum gap when the pool size is not positive).
 */
static bool life_over(PgSocket *server)
{
	PgPool *pool = server->pool;
	usec_t now = get_cached_time();
	usec_t age = now - server->connect_time;
	usec_t last_kill = now - pool->last_lifetime_disconnect;
	usec_t min_gap = 0;

	if (age < cf_server_lifetime)
		return false;

	if (pool_pool_size(pool) > 0)
		min_gap = cf_server_lifetime / pool_pool_size(pool);

	return last_kill >= min_gap;
}
737
/*
 * Release a server: connecting/active -> idle, unlinking it from its
 * client if needed.
 *
 * Depending on configuration an active server may go to SV_TESTED
 * (reset query is sent) or SV_USED instead of SV_IDLE.  The server is
 * closed instead when its lifetime is over (not checked for SV_LOGIN)
 * or when close_needed is set.
 *
 * Returns false when the server was disconnected here, or when
 * reuse_on_release() / reset_on_release() report failure; true
 * otherwise.  A server in any other state than SV_ACTIVE, SV_USED,
 * SV_TESTED or SV_LOGIN is fatal.
 */
bool release_server(PgSocket *server)
{
	PgPool *pool = server->pool;
	SocketState newstate = SV_IDLE;

	Assert(server->ready);

	/* remove from old list */
	switch (server->state) {
	case SV_ACTIVE:
		/* break the client <-> server link in both directions */
		server->link->link = NULL;
		server->link = NULL;

		if (*cf_server_reset_query && (cf_server_reset_query_always ||
					       pool_pool_mode(pool) == POOL_SESSION))
		{
			/* notify reset is required */
			newstate = SV_TESTED;
		} else if (cf_server_check_delay == 0 && *cf_server_check_query) {
			/*
			 * deprecated: before reset_query, the check_delay = 0
			 * was used to get same effect. This if() can be removed
			 * after couple of releases.
			 */
			newstate = SV_USED;
		}
		/* fallthrough */
	case SV_USED:
	case SV_TESTED:
		break;
	case SV_LOGIN:
		/* a server that finished login proves that connecting works */
		pool->last_login_failed = false;
		pool->last_connect_failed = false;
		break;
	default:
		fatal("bad server state: %d", server->state);
	}

	/* enforce lifetime immediately on release */
	if (server->state != SV_LOGIN && life_over(server)) {
		disconnect_server(server, true, "server_lifetime");
		/* remembered so that life_over() can space out lifetime kills */
		pool->last_lifetime_disconnect = get_cached_time();
		return false;
	}

	/* enforce close request */
	if (server->close_needed) {
		disconnect_server(server, true, "close_needed");
		return false;
	}

	Assert(server->link == NULL);
	slog_noise(server, "release_server: new state=%d", newstate);
	change_server_state(server, newstate);

	if (newstate == SV_IDLE) {
		/* immediately process waiters, to give fair chance */
		return reuse_on_release(server);
	} else if (newstate == SV_TESTED) {
		return reset_on_release(server);
	}

	return true;
}
802
/*
 * Drop a server connection.
 *
 * reason is a printf-style format; the formatted text (truncated to
 * the local buffer) is logged when cf_log_disconnections is set and is
 * passed on to a linked client, which is disconnected as well.
 *
 * When notify is set, a Terminate ('X') packet is sent to the server
 * first, except for a SV_LOGIN server that is already marked ready
 * (see the comment in that case below).  The socket is then moved to
 * SV_JUSTFREE and closed.  A server in an unexpected state is fatal.
 */
void disconnect_server(PgSocket *server, bool notify, const char *reason, ...)
{
	PgPool *pool = server->pool;
	PgSocket *client;
	static const uint8_t pkt_term[] = {'X', 0,0,0,4};
	bool send_term = true;
	usec_t now = get_cached_time();
	char buf[128];
	va_list ap;

	/* expand the format once; from here on reason is plain text */
	va_start(ap, reason);
	vsnprintf(buf, sizeof(buf), reason, ap);
	va_end(ap);
	reason = buf;

	if (cf_log_disconnections)
		slog_info(server, "closing because: %s (age=%" PRIu64 "s)", reason,
			  (now - server->connect_time) / USEC);

	switch (server->state) {
	case SV_ACTIVE:
		/* unlink first, then take the client down with the same reason */
		client = server->link;
		if (client) {
			client->link = NULL;
			server->link = NULL;
			disconnect_client(client, true, "%s", reason);
		}
		break;
	case SV_TESTED:
	case SV_USED:
	case SV_IDLE:
		break;
	case SV_LOGIN:
		/*
		 * usually disconnect means problems in startup phase,
		 * except when sending cancel packet
		 */
		if (!server->ready)
		{
			pool->last_login_failed = true;
			pool->last_connect_failed = true;
		}
		else
		{
			/*
			 * We did manage to connect and used the connection for query
			 * cancellation, so to the best of our knowledge we can connect to
			 * the server, reset last_connect_failed accordingly.
			 */
			pool->last_connect_failed = false;
			send_term = false;
		}
		break;
	default:
		fatal("bad server state: %d", server->state);
	}

	Assert(server->link == NULL);

	/* notify server and close connection */
	if (send_term && notify) {
		/* best effort: the result of sending Terminate is deliberately ignored */
		bool _ignore = sbuf_answer(&server->sbuf, pkt_term, sizeof(pkt_term));
		(void) _ignore;
	}

	/* a DNS lookup may still be pending for this server */
	if (server->dns_token) {
		adns_cancel(adns, server->dns_token);
		server->dns_token = NULL;
	}

	free_scram_state(&server->scram_state);

	/* this connection no longer counts against its database and user */
	server->pool->db->connection_count--;
	server->pool->user->connection_count--;

	change_server_state(server, SV_JUSTFREE);
	if (!sbuf_close(&server->sbuf))
		log_noise("sbuf_close failed, retry later");
}
883
884 /* drop client connection */
disconnect_client(PgSocket * client,bool notify,const char * reason,...)885 void disconnect_client(PgSocket *client, bool notify, const char *reason, ...)
886 {
887 usec_t now = get_cached_time();
888
889 if (reason) {
890 char buf[128];
891 va_list ap;
892
893 va_start(ap, reason);
894 vsnprintf(buf, sizeof(buf), reason, ap);
895 va_end(ap);
896 reason = buf;
897 }
898
899 if (cf_log_disconnections && reason)
900 slog_info(client, "closing because: %s (age=%" PRIu64 "s)", reason,
901 (now - client->connect_time) / USEC);
902
903 switch (client->state) {
904 case CL_ACTIVE:
905 case CL_LOGIN:
906 if (client->link) {
907 PgSocket *server = client->link;
908 if (!server->ready) {
909 server->link = NULL;
910 client->link = NULL;
911 /*
912 * This can happen if the client
913 * connection is normally closed while
914 * the server has a transaction block
915 * open. Then there is no way for us
916 * to reset the server other than by
917 * closing it. Perhaps it would be
918 * worth tracking this separately to
919 * make the error message more
920 * precise and less scary.
921 */
922 disconnect_server(server, true, "client disconnect while server was not ready");
923 } else if (!sbuf_is_empty(&server->sbuf)) {
924 /* ->ready may be set before all is sent */
925 server->link = NULL;
926 client->link = NULL;
927 disconnect_server(server, true, "client disconnect before everything was sent to the server");
928 } else {
929 /* retval does not matter here */
930 release_server(server);
931 }
932 }
933 case CL_WAITING:
934 case CL_WAITING_LOGIN:
935 case CL_CANCEL:
936 break;
937 default:
938 fatal("bad client state: %d", client->state);
939 }
940
941 /* send reason to client */
942 if (notify && reason && client->state != CL_CANCEL) {
943 /*
944 * don't send Ready pkt here, or client won't notice
945 * closed connection
946 */
947 send_pooler_error(client, false, true, reason);
948 }
949
950 free_scram_state(&client->scram_state);
951 if (client->login_user && client->login_user->mock_auth) {
952 free(client->login_user);
953 client->login_user = NULL;
954 }
955
956 change_client_state(client, CL_JUSTFREE);
957 if (!sbuf_close(&client->sbuf))
958 log_noise("sbuf_close failed, retry later");
959 }
960
961 /*
962 * Connection creation utilities
963 */
964
connect_server(struct PgSocket * server,const struct sockaddr * sa,int salen)965 static void connect_server(struct PgSocket *server, const struct sockaddr *sa, int salen)
966 {
967 bool res;
968
969 /* fill remote_addr */
970 memset(&server->remote_addr, 0, sizeof(server->remote_addr));
971 if (sa->sa_family == AF_UNIX) {
972 pga_set(&server->remote_addr, AF_UNIX, server->pool->db->port);
973 } else {
974 pga_copy(&server->remote_addr, sa);
975 }
976
977 slog_debug(server, "launching new connection to server");
978
979 /* start connecting */
980 res = sbuf_connect(&server->sbuf, sa, salen,
981 cf_server_connect_timeout / USEC);
982 if (!res)
983 log_noise("failed to launch new connection");
984 }
985
dns_callback(void * arg,const struct sockaddr * sa,int salen)986 static void dns_callback(void *arg, const struct sockaddr *sa, int salen)
987 {
988 struct PgSocket *server = arg;
989 struct PgDatabase *db = server->pool->db;
990 struct sockaddr_in sa_in;
991 struct sockaddr_in6 sa_in6;
992
993 server->dns_token = NULL;
994
995 if (!sa) {
996 disconnect_server(server, true, "server DNS lookup failed");
997 return;
998 } else if (sa->sa_family == AF_INET) {
999 char buf[64];
1000 memcpy(&sa_in, sa, sizeof(sa_in));
1001 sa_in.sin_port = htons(db->port);
1002 sa = (struct sockaddr *)&sa_in;
1003 salen = sizeof(sa_in);
1004 slog_debug(server, "dns_callback: inet4: %s",
1005 sa2str(sa, buf, sizeof(buf)));
1006 } else if (sa->sa_family == AF_INET6) {
1007 char buf[64];
1008 memcpy(&sa_in6, sa, sizeof(sa_in6));
1009 sa_in6.sin6_port = htons(db->port);
1010 sa = (struct sockaddr *)&sa_in6;
1011 salen = sizeof(sa_in6);
1012 slog_debug(server, "dns_callback: inet6: %s",
1013 sa2str(sa, buf, sizeof(buf)));
1014 } else {
1015 disconnect_server(server, true, "unknown address family: %d", sa->sa_family);
1016 return;
1017 }
1018
1019 connect_server(server, sa, salen);
1020 }
1021
dns_connect(struct PgSocket * server)1022 static void dns_connect(struct PgSocket *server)
1023 {
1024 struct sockaddr_un sa_un;
1025 struct sockaddr_in sa_in;
1026 struct sockaddr_in6 sa_in6;
1027 struct sockaddr *sa;
1028 struct PgDatabase *db = server->pool->db;
1029 const char *host = db->host;
1030 const char *unix_dir;
1031 int sa_len;
1032 int res;
1033
1034 if (!host || host[0] == '/' || host[0] == '@') {
1035 memset(&sa_un, 0, sizeof(sa_un));
1036 sa_un.sun_family = AF_UNIX;
1037 unix_dir = host ? host : cf_unix_socket_dir;
1038 if (!unix_dir || !*unix_dir) {
1039 log_error("unix socket dir not configured: %s", db->name);
1040 disconnect_server(server, false, "cannot connect");
1041 return;
1042 }
1043 snprintf(sa_un.sun_path, sizeof(sa_un.sun_path),
1044 "%s/.s.PGSQL.%d", unix_dir, db->port);
1045 slog_noise(server, "unix socket: %s", sa_un.sun_path);
1046 if (unix_dir[0] == '@') {
1047 /*
1048 * By convention, for abstract Unix sockets,
1049 * only the length of the string is the
1050 * sockaddr length.
1051 */
1052 sa_len = offsetof(struct sockaddr_un, sun_path) + strlen(sa_un.sun_path);
1053 sa_un.sun_path[0] = '\0';
1054 }
1055 else {
1056 sa_len = sizeof(sa_un);
1057 }
1058 sa = (struct sockaddr *)&sa_un;
1059 res = 1;
1060 } else if (strchr(host, ':')) { /* assume IPv6 address on any : in addr */
1061 slog_noise(server, "inet6 socket: %s", db->host);
1062 memset(&sa_in6, 0, sizeof(sa_in6));
1063 sa_in6.sin6_family = AF_INET6;
1064 res = inet_pton(AF_INET6, db->host, &sa_in6.sin6_addr);
1065 sa_in6.sin6_port = htons(db->port);
1066 sa = (struct sockaddr *)&sa_in6;
1067 sa_len = sizeof(sa_in6);
1068 } else { /* else try IPv4 */
1069 slog_noise(server, "inet socket: %s", db->host);
1070 memset(&sa_in, 0, sizeof(sa_in));
1071 sa_in.sin_family = AF_INET;
1072 res = inet_pton(AF_INET, db->host, &sa_in.sin_addr);
1073 sa_in.sin_port = htons(db->port);
1074 sa = (struct sockaddr *)&sa_in;
1075 sa_len = sizeof(sa_in);
1076 }
1077
1078 /* if simple parse failed, use DNS */
1079 if (res != 1) {
1080 struct DNSToken *tk;
1081 slog_noise(server, "dns socket: %s", db->host);
1082 /* launch dns lookup */
1083 tk = adns_resolve(adns, db->host, dns_callback, server);
1084 if (tk)
1085 server->dns_token = tk;
1086 return;
1087 }
1088
1089 connect_server(server, sa, sa_len);
1090 }
1091
/*
 * Pick the socket with the smaller request_time.
 *
 * A NULL argument means "no candidate": the other argument is returned
 * as is (which may be NULL too).  On equal timestamps rhs wins.
 */
PgSocket *compare_connections_by_time(PgSocket *lhs, PgSocket *rhs)
{
	if (lhs && rhs) {
		if (lhs->request_time < rhs->request_time)
			return lhs;
		return rhs;
	}
	return lhs ? lhs : rhs;
}
1100
1101 /* evict the single most idle connection from among all pools to make room in the db */
evict_connection(PgDatabase * db)1102 bool evict_connection(PgDatabase *db)
1103 {
1104 struct List *item;
1105 PgPool *pool;
1106 PgSocket *oldest_connection = NULL;
1107
1108 statlist_for_each(item, &pool_list) {
1109 pool = container_of(item, PgPool, head);
1110 if (pool->db != db)
1111 continue;
1112 oldest_connection = compare_connections_by_time(oldest_connection, last_socket(&pool->idle_server_list));
1113 /* only evict testing connections if nobody's waiting */
1114 if (statlist_empty(&pool->waiting_client_list)) {
1115 oldest_connection = compare_connections_by_time(oldest_connection, last_socket(&pool->used_server_list));
1116 oldest_connection = compare_connections_by_time(oldest_connection, last_socket(&pool->tested_server_list));
1117 }
1118 }
1119
1120 if (oldest_connection) {
1121 disconnect_server(oldest_connection, true, "evicted");
1122 return true;
1123 }
1124 return false;
1125 }
1126
1127 /* evict the single most idle connection from among all pools to make room in the user */
evict_user_connection(PgUser * user)1128 bool evict_user_connection(PgUser *user)
1129 {
1130 struct List *item;
1131 PgPool *pool;
1132 PgSocket *oldest_connection = NULL;
1133
1134 statlist_for_each(item, &pool_list) {
1135 pool = container_of(item, PgPool, head);
1136 if (pool->user != user)
1137 continue;
1138 oldest_connection = compare_connections_by_time(oldest_connection, last_socket(&pool->idle_server_list));
1139 /* only evict testing connections if nobody's waiting */
1140 if (statlist_empty(&pool->waiting_client_list)) {
1141 oldest_connection = compare_connections_by_time(oldest_connection, last_socket(&pool->used_server_list));
1142 oldest_connection = compare_connections_by_time(oldest_connection, last_socket(&pool->tested_server_list));
1143 }
1144 }
1145
1146 if (oldest_connection) {
1147 disconnect_server(oldest_connection, true, "evicted");
1148 return true;
1149 }
1150 return false;
1151 }
1152
1153 /* the pool needs new connection, if possible */
launch_new_connection(PgPool * pool)1154 void launch_new_connection(PgPool *pool)
1155 {
1156 PgSocket *server;
1157 int max;
1158
1159 /* allow only small number of connection attempts at a time */
1160 if (!statlist_empty(&pool->new_server_list)) {
1161 log_debug("launch_new_connection: already progress");
1162 return;
1163 }
1164
1165 /* if server bounces, don't retry too fast */
1166 if (pool->last_connect_failed) {
1167 usec_t now = get_cached_time();
1168 if (now - pool->last_connect_time < cf_server_login_retry) {
1169 log_debug("launch_new_connection: last failed, not launching new connection yet, still waiting %" PRIu64 " s",
1170 (cf_server_login_retry - (now - pool->last_connect_time)) / USEC);
1171 return;
1172 }
1173 }
1174
1175 max = pool_server_count(pool);
1176
1177 /* when a cancel request is queued allow connections up to twice the pool size */
1178 if (!statlist_empty(&pool->cancel_req_list) && max < (2 * pool_pool_size(pool))) {
1179 log_debug("launch_new_connection: bypass pool limitations for cancel request");
1180 goto force_new;
1181 }
1182
1183 /* is it allowed to add servers? */
1184 if (max >= pool_pool_size(pool) && pool->welcome_msg_ready) {
1185 /* should we use reserve pool? */
1186 if (cf_res_pool_timeout && pool_res_pool_size(pool)) {
1187 usec_t now = get_cached_time();
1188 PgSocket *c = first_socket(&pool->waiting_client_list);
1189 if (c && (now - c->request_time) >= cf_res_pool_timeout) {
1190 if (max < pool_pool_size(pool) + pool_res_pool_size(pool)) {
1191 slog_warning(c, "taking connection from reserve_pool");
1192 goto allow_new;
1193 }
1194 }
1195 }
1196 log_debug("launch_new_connection: pool full (%d >= %d)",
1197 max, pool_pool_size(pool));
1198 return;
1199 }
1200
1201 allow_new:
1202 max = database_max_connections(pool->db);
1203 if (max > 0) {
1204 /* try to evict unused connections first */
1205 while (pool->db->connection_count >= max) {
1206 if (!evict_connection(pool->db)) {
1207 break;
1208 }
1209 }
1210 if (pool->db->connection_count >= max) {
1211 log_debug("launch_new_connection: database '%s' full (%d >= %d)",
1212 pool->db->name, pool->db->connection_count, max);
1213 return;
1214 }
1215 }
1216
1217 max = user_max_connections(pool->user);
1218 if (max > 0) {
1219 /* try to evict unused connection first */
1220 while (pool->user->connection_count >= max) {
1221 if (!evict_user_connection(pool->user)) {
1222 break;
1223 }
1224 }
1225 if (pool->user->connection_count >= max) {
1226 log_debug("launch_new_connection: user '%s' full (%d >= %d)",
1227 pool->user->name, pool->user->connection_count, max);
1228 return;
1229 }
1230 }
1231
1232 force_new:
1233 /* get free conn object */
1234 server = slab_alloc(server_cache);
1235 if (!server) {
1236 log_debug("launch_new_connection: no memory");
1237 return;
1238 }
1239
1240 /* initialize it */
1241 server->pool = pool;
1242 server->login_user = server->pool->user;
1243 server->connect_time = get_cached_time();
1244 pool->last_connect_time = get_cached_time();
1245 change_server_state(server, SV_LOGIN);
1246 pool->db->connection_count++;
1247 pool->user->connection_count++;
1248
1249 dns_connect(server);
1250 }
1251
1252 /* new client connection attempt */
accept_client(int sock,bool is_unix)1253 PgSocket *accept_client(int sock, bool is_unix)
1254 {
1255 bool res;
1256 PgSocket *client;
1257
1258 /* get free PgSocket */
1259 client = slab_alloc(client_cache);
1260 if (!client) {
1261 log_warning("cannot allocate client struct");
1262 safe_close(sock);
1263 return NULL;
1264 }
1265
1266 client->connect_time = client->request_time = get_cached_time();
1267 client->query_start = 0;
1268
1269 /* FIXME: take local and remote address from pool_accept() */
1270 fill_remote_addr(client, sock, is_unix);
1271 fill_local_addr(client, sock, is_unix);
1272
1273 change_client_state(client, CL_LOGIN);
1274
1275 res = sbuf_accept(&client->sbuf, sock, is_unix);
1276 if (!res) {
1277 if (cf_log_connections)
1278 slog_debug(client, "failed connection attempt");
1279 return NULL;
1280 }
1281
1282 return client;
1283 }
1284
1285 /* send cached parameters to client to pretend being server */
1286 /* client managed to authenticate, send welcome msg and accept queries */
finish_client_login(PgSocket * client)1287 bool finish_client_login(PgSocket *client)
1288 {
1289 switch (client->state) {
1290 case CL_LOGIN:
1291 change_client_state(client, CL_ACTIVE);
1292 case CL_ACTIVE:
1293 break;
1294 default:
1295 fatal("bad client state: %d", client->state);
1296 }
1297
1298 client->wait_for_auth = false;
1299
1300 /* check if we know server signature */
1301 if (!client->pool->welcome_msg_ready) {
1302 log_debug("finish_client_login: no welcome message, pause");
1303 client->wait_for_welcome = true;
1304 pause_client(client);
1305 if (cf_pause_mode == P_NONE)
1306 launch_new_connection(client->pool);
1307 return false;
1308 }
1309 client->wait_for_welcome = false;
1310
1311 /* send the message */
1312 if (!welcome_client(client))
1313 return false;
1314
1315 slog_debug(client, "logged in");
1316
1317 return true;
1318 }
1319
1320 /* client->cancel_key has requested client key */
accept_cancel_request(PgSocket * req)1321 void accept_cancel_request(PgSocket *req)
1322 {
1323 struct List *pitem, *citem;
1324 PgPool *pool = NULL;
1325 PgSocket *server = NULL, *client, *main_client = NULL;
1326
1327 Assert(req->state == CL_LOGIN);
1328
1329 /* find real client this is for */
1330 statlist_for_each(pitem, &pool_list) {
1331 pool = container_of(pitem, PgPool, head);
1332 statlist_for_each(citem, &pool->active_client_list) {
1333 client = container_of(citem, PgSocket, head);
1334 if (memcmp(client->cancel_key, req->cancel_key, 8) == 0) {
1335 main_client = client;
1336 goto found;
1337 }
1338 }
1339 statlist_for_each(citem, &pool->waiting_client_list) {
1340 client = container_of(citem, PgSocket, head);
1341 if (memcmp(client->cancel_key, req->cancel_key, 8) == 0) {
1342 main_client = client;
1343 goto found;
1344 }
1345 }
1346 }
1347 found:
1348
1349 /* wrong key */
1350 if (!main_client) {
1351 disconnect_client(req, false, "failed cancel request");
1352 return;
1353 }
1354
1355 /* not linked client, just drop it then */
1356 if (!main_client->link) {
1357 /* let administrative cancel be handled elsewhere */
1358 if (main_client->pool->db->admin) {
1359 disconnect_client(req, false, "cancel request for console client");
1360 admin_handle_cancel(main_client);
1361 return;
1362 }
1363
1364 disconnect_client(req, false, "cancel request for idle client");
1365
1366 return;
1367 }
1368
1369 /* drop the connection, if fails, retry later in justfree list */
1370 if (!sbuf_close(&req->sbuf))
1371 log_noise("sbuf_close failed, retry later");
1372
1373 /* remember server key */
1374 server = main_client->link;
1375 memcpy(req->cancel_key, server->cancel_key, 8);
1376
1377 /* attach to target pool */
1378 req->pool = pool;
1379 change_client_state(req, CL_CANCEL);
1380
1381 /* need fresh connection */
1382 launch_new_connection(pool);
1383 }
1384
/*
 * Send the first queued cancel request of the server's pool over the
 * given server connection, which must be in SV_LOGIN state.
 *
 * The request is moved to CL_JUSTFREE afterwards regardless of whether
 * sending worked; a send failure is only logged.
 */
void forward_cancel_request(PgSocket *server)
{
	PgSocket *req = first_socket(&server->pool->cancel_req_list);
	bool sent;

	Assert(req != NULL && req->state == CL_CANCEL);
	Assert(server->state == SV_LOGIN);

	SEND_CancelRequest(sent, server, req->cancel_key);
	if (!sent)
		log_warning("sending cancel request failed: %s", strerror(errno));

	change_client_state(req, CL_JUSTFREE);
}
1399
/*
 * Adopt an already connected client socket described by the arguments.
 *
 * The database is looked up by dbname and registered as an auto
 * database if unknown.  When SCRAM keys are supplied they are validated
 * and stored on the user.  The client is then created with
 * accept_client(), marked suspended, attached to its pool with
 * set_pool(), made active, and given the old cancel key, the old
 * fd numbers and the four cached parameter values.
 *
 * Returns false on failure.  Returns true on success, and also when
 * the database can neither be found nor auto-registered.
 */
bool use_client_socket(int fd, PgAddr *addr,
		       const char *dbname, const char *username,
		       uint64_t ckey, int oldfd, int linkfd,
		       const char *client_enc, const char *std_string,
		       const char *datestyle, const char *timezone,
		       const char *password,
		       const char *scram_client_key, int scram_client_key_len,
		       const char *scram_server_key, int scram_server_key_len)
{
	PgDatabase *db;
	PgSocket *client;
	PktBuf keybuf;

	/* unknown database: it's an auto database, try registering it */
	db = find_database(dbname);
	if (db == NULL)
		db = register_auto_database(dbname);
	if (db == NULL)
		return true;

	if (scram_client_key || scram_server_key) {
		PgUser *user;

		/* both keys or none */
		if (!scram_client_key || !scram_server_key) {
			log_error("incomplete SCRAM key data");
			return false;
		}
		/* lengths must match the storage in PgUser exactly */
		if (sizeof(user->scram_ClientKey) != scram_client_key_len
		    || sizeof(user->scram_ServerKey) != scram_server_key_len) {
			log_error("incompatible SCRAM key data");
			return false;
		}
		if (db->forced_user) {
			log_error("SCRAM key data received for forced user");
			return false;
		}
		if (cf_auth_type == AUTH_PAM) {
			log_error("SCRAM key data received for PAM user");
			return false;
		}

		user = find_user(username);
		if (!user && db->auth_user)
			user = add_db_user(db, username, password);
		if (!user)
			return false;

		memcpy(user->scram_ClientKey, scram_client_key, sizeof(user->scram_ClientKey));
		memcpy(user->scram_ServerKey, scram_server_key, sizeof(user->scram_ServerKey));
		user->has_scram_keys = true;
	}

	client = accept_client(fd, pga_is_unix(addr));
	if (!client)
		return false;
	client->suspended = true;

	if (!set_pool(client, dbname, username, password, true))
		return false;

	change_client_state(client, CL_ACTIVE);

	/* put the old cancel key into client->cancel_key */
	pktbuf_static(&keybuf, client->cancel_key, 8);
	pktbuf_put_uint64(&keybuf, ckey);

	/* remember old fds */
	client->tmp_sk_oldfd = oldfd;
	client->tmp_sk_linkfd = linkfd;

	/* restore the cached parameter values */
	varcache_set(&client->vars, "client_encoding", client_enc);
	varcache_set(&client->vars, "standard_conforming_strings", std_string);
	varcache_set(&client->vars, "datestyle", datestyle);
	varcache_set(&client->vars, "timezone", timezone);

	return true;
}
1477
/*
 * Adopt an already connected server socket described by the arguments.
 *
 * The database is looked up by dbname and registered as an auto
 * database if unknown.  The user is the database's forced user, a PAM
 * user, or a user found by name; if none is found and the database has
 * an auth_user, the user is added with add_db_user().  The server
 * object is attached to the matching pool and placed in SV_ACTIVE
 * (linkfd set, not ready) or SV_IDLE (no linkfd, ready) state, and
 * gets the old cancel key, the old fd numbers and the four cached
 * parameter values.
 *
 * The adopted connection is counted against both the database and the
 * user, the same way launch_new_connection() counts the connections it
 * creates.
 *
 * Returns false on failure.  Returns true on success, and also when
 * the database can neither be found nor auto-registered.
 *
 * The scram_* arguments are accepted but not used here.
 */
bool use_server_socket(int fd, PgAddr *addr,
		       const char *dbname, const char *username,
		       uint64_t ckey, int oldfd, int linkfd,
		       const char *client_enc, const char *std_string,
		       const char *datestyle, const char *timezone,
		       const char *password,
		       const char *scram_client_key, int scram_client_key_len,
		       const char *scram_server_key, int scram_server_key_len)
{
	PgDatabase *db = find_database(dbname);
	PgUser *user;
	PgPool *pool;
	PgSocket *server;
	PktBuf tmp;
	bool res;

	/* if the database not found, it's an auto database -> registering... */
	if (!db) {
		db = register_auto_database(dbname);
		if (!db)
			return true;
	}

	/* pick the user the same way a regular login would */
	if (db->forced_user) {
		user = db->forced_user;
	} else if (cf_auth_type == AUTH_PAM) {
		user = add_pam_user(username, password);
	} else {
		user = find_user(username);
	}
	if (!user && db->auth_user)
		user = add_db_user(db, username, password);

	pool = get_pool(db, user);
	if (!pool)
		return false;

	server = slab_alloc(server_cache);
	if (!server)
		return false;

	res = sbuf_accept(&server->sbuf, fd, pga_is_unix(addr));
	if (!res)
		return false;

	/*
	 * Account for the connection in both counters.  Eviction in
	 * launch_new_connection() relies on the user counter going down
	 * when a server is disconnected, so a server that was never
	 * counted here would leave the user counter too low.
	 */
	db->connection_count++;
	pool->user->connection_count++;

	server->suspended = true;
	server->pool = pool;
	server->login_user = user;
	server->connect_time = server->request_time = get_cached_time();
	server->query_start = 0;

	fill_remote_addr(server, fd, pga_is_unix(addr));
	fill_local_addr(server, fd, pga_is_unix(addr));

	/* a linked server was in use at handover, an unlinked one is idle */
	if (linkfd) {
		server->ready = false;
		change_server_state(server, SV_ACTIVE);
	} else {
		server->ready = true;
		change_server_state(server, SV_IDLE);
	}

	/* store old cancel key */
	pktbuf_static(&tmp, server->cancel_key, 8);
	pktbuf_put_uint64(&tmp, ckey);

	/* store old fds */
	server->tmp_sk_oldfd = oldfd;
	server->tmp_sk_linkfd = linkfd;

	/* restore the cached parameter values */
	varcache_set(&server->vars, "client_encoding", client_enc);
	varcache_set(&server->vars, "standard_conforming_strings", std_string);
	varcache_set(&server->vars, "datestyle", datestyle);
	varcache_set(&server->vars, "timezone", timezone);

	return true;
}
1557
/*
 * Call func on every server socket of the pool.
 *
 * The lists are walked in this order: idle, used, tested, active, new.
 * The walk is not removal-safe, so func must not take sockets off
 * the list being walked.
 */
void for_each_server(PgPool *pool, void (*func)(PgSocket *sk))
{
	struct List *node;

	statlist_for_each(node, &pool->idle_server_list) {
		func(container_of(node, PgSocket, head));
	}
	statlist_for_each(node, &pool->used_server_list) {
		func(container_of(node, PgSocket, head));
	}
	statlist_for_each(node, &pool->tested_server_list) {
		func(container_of(node, PgSocket, head));
	}
	statlist_for_each(node, &pool->active_server_list) {
		func(container_of(node, PgSocket, head));
	}
	statlist_for_each(node, &pool->new_server_list) {
		func(container_of(node, PgSocket, head));
	}
}
1577
/*
 * Call func on every server socket of the pool that filter accepts.
 *
 * filter is invoked as filter(sk, filter_arg); func runs only for
 * sockets where it returns true.  The lists are walked in this order:
 * idle, used, tested, active, new.
 */
static void for_each_server_filtered(PgPool *pool, void (*func)(PgSocket *sk), bool (*filter)(PgSocket *sk, void *arg), void *filter_arg)
{
	struct List *node;
	PgSocket *candidate;

	statlist_for_each(node, &pool->idle_server_list) {
		candidate = container_of(node, PgSocket, head);
		if (!filter(candidate, filter_arg))
			continue;
		func(candidate);
	}

	statlist_for_each(node, &pool->used_server_list) {
		candidate = container_of(node, PgSocket, head);
		if (!filter(candidate, filter_arg))
			continue;
		func(candidate);
	}

	statlist_for_each(node, &pool->tested_server_list) {
		candidate = container_of(node, PgSocket, head);
		if (!filter(candidate, filter_arg))
			continue;
		func(candidate);
	}

	statlist_for_each(node, &pool->active_server_list) {
		candidate = container_of(node, PgSocket, head);
		if (!filter(candidate, filter_arg))
			continue;
		func(candidate);
	}

	statlist_for_each(node, &pool->new_server_list) {
		candidate = container_of(node, PgSocket, head);
		if (!filter(candidate, filter_arg))
			continue;
		func(candidate);
	}
}
1613
1614
/* Per-socket callback: flag the socket as needing to be closed. */
static void tag_dirty(PgSocket *sk)
{
	sk->close_needed = true;
}
1619
/*
 * Mark a pool as outdated: forget its welcome message, flag all of
 * its servers with close_needed and disconnect the servers that are
 * still in the new_server_list right away.
 *
 * Pools of the admin database are left untouched.
 */
void tag_pool_dirty(PgPool *pool)
{
	struct List *node, *next;
	struct PgSocket *login_server;

	/*
	 * The admin pool is not backed by an actual postgres server.
	 * Marking it dirty breaks future connections to the pgbouncer
	 * admin database, so skip it.
	 */
	if (pool->db->admin)
		return;

	/* throw away the cached welcome message */
	if (pool->welcome_msg) {
		pktbuf_free(pool->welcome_msg);
		pool->welcome_msg = NULL;
	}
	pool->welcome_msg_ready = false;

	/* existing servers get dropped ASAP */
	for_each_server(pool, tag_dirty);

	/* servers still logging in are dropped immediately */
	statlist_for_each_safe(node, &pool->new_server_list, next) {
		login_server = container_of(node, PgSocket, head);
		disconnect_server(login_server, true, "connect string changed");
	}
}
1649
/* Mark every pool that belongs to db as dirty. */
void tag_database_dirty(PgDatabase *db)
{
	struct List *node;

	statlist_for_each(node, &pool_list) {
		PgPool *candidate = container_of(node, PgPool, head);

		if (candidate->db != db)
			continue;
		tag_pool_dirty(candidate);
	}
}
1661
tag_autodb_dirty(void)1662 void tag_autodb_dirty(void)
1663 {
1664 struct List *item, *tmp;
1665 PgDatabase *db;
1666 PgPool *pool;
1667
1668 /*
1669 * reload databases.
1670 */
1671 statlist_for_each(item, &database_list) {
1672 db = container_of(item, PgDatabase, head);
1673 if (db->db_auto)
1674 register_auto_database(db->name);
1675 }
1676 statlist_for_each_safe(item, &autodatabase_idle_list, tmp) {
1677 db = container_of(item, PgDatabase, head);
1678 if (db->db_auto)
1679 register_auto_database(db->name);
1680 }
1681 /*
1682 * reload pools
1683 */
1684 statlist_for_each(item, &pool_list) {
1685 pool = container_of(item, PgPool, head);
1686 if (pool->db->db_auto)
1687 tag_pool_dirty(pool);
1688 }
1689 }
1690
/*
 * Filter for for_each_server_filtered(): accepts sockets whose
 * remote_addr compares equal to the PgAddr passed in arg.
 */
static bool server_remote_addr_filter(PgSocket *sk, void *arg)
{
	PgAddr *wanted = arg;
	int diff = pga_cmp_addr(&sk->remote_addr, wanted);

	return diff == 0;
}
1696
tag_host_addr_dirty(const char * host,const struct sockaddr * sa)1697 void tag_host_addr_dirty(const char *host, const struct sockaddr *sa)
1698 {
1699 struct List *item;
1700 PgPool *pool;
1701 PgAddr addr;
1702
1703 memset(&addr, 0, sizeof(addr));
1704 pga_copy(&addr, sa);
1705
1706 statlist_for_each(item, &pool_list) {
1707 pool = container_of(item, PgPool, head);
1708 if (pool->db->host && strcmp(host, pool->db->host) == 0) {
1709 for_each_server_filtered(pool, tag_dirty, server_remote_addr_filter, &addr);
1710 }
1711 }
1712 }
1713
1714
1715 /* move objects from justfree_* to free_* lists */
reuse_just_freed_objects(void)1716 void reuse_just_freed_objects(void)
1717 {
1718 struct List *tmp, *item;
1719 PgSocket *sk;
1720 bool close_works = true;
1721
1722 /*
1723 * event_del() may fail because of ENOMEM for event handlers
1724 * that need only changes sent to kernel on each loop.
1725 *
1726 * Keep open sbufs in justfree lists until successful.
1727 */
1728
1729 statlist_for_each_safe(item, &justfree_client_list, tmp) {
1730 sk = container_of(item, PgSocket, head);
1731 if (sbuf_is_closed(&sk->sbuf)) {
1732 change_client_state(sk, CL_FREE);
1733 } else if (close_works) {
1734 close_works = sbuf_close(&sk->sbuf);
1735 }
1736 }
1737 statlist_for_each_safe(item, &justfree_server_list, tmp) {
1738 sk = container_of(item, PgSocket, head);
1739 if (sbuf_is_closed(&sk->sbuf)) {
1740 change_server_state(sk, SV_FREE);
1741 } else if (close_works) {
1742 close_works = sbuf_close(&sk->sbuf);
1743 }
1744 }
1745 }
1746
objects_cleanup(void)1747 void objects_cleanup(void)
1748 {
1749 struct List *item, *tmp;
1750 PgDatabase *db;
1751
1752 /* close can be postpones, just in case call twice */
1753 reuse_just_freed_objects();
1754 reuse_just_freed_objects();
1755
1756 statlist_for_each_safe(item, &autodatabase_idle_list, tmp) {
1757 db = container_of(item, PgDatabase, head);
1758 kill_database(db);
1759 }
1760 statlist_for_each_safe(item, &database_list, tmp) {
1761 db = container_of(item, PgDatabase, head);
1762 kill_database(db);
1763 }
1764
1765 memset(&login_client_list, 0, sizeof login_client_list);
1766 memset(&user_list, 0, sizeof user_list);
1767 memset(&database_list, 0, sizeof database_list);
1768 memset(&pool_list, 0, sizeof pool_list);
1769 memset(&user_tree, 0, sizeof user_tree);
1770 memset(&autodatabase_idle_list, 0, sizeof autodatabase_idle_list);
1771
1772 slab_destroy(server_cache);
1773 server_cache = NULL;
1774 slab_destroy(client_cache);
1775 client_cache = NULL;
1776 slab_destroy(db_cache);
1777 db_cache = NULL;
1778 slab_destroy(pool_cache);
1779 pool_cache = NULL;
1780 slab_destroy(user_cache);
1781 user_cache = NULL;
1782 slab_destroy(iobuf_cache);
1783 iobuf_cache = NULL;
1784 }
1785