1 /*
2  * PgBouncer - Lightweight connection pooler for PostgreSQL.
3  *
4  * Copyright (c) 2007-2009  Marko Kreen, Skype Technologies OÜ
5  *
6  * Permission to use, copy, modify, and/or distribute this software for any
7  * purpose with or without fee is hereby granted, provided that the above
8  * copyright notice and this permission notice appear in all copies.
9  *
10  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15  * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16  * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17  */
18 
19 /*
20  * Herding objects between lists happens here.
21  */
22 
23 #include "bouncer.h"
24 #include "scram.h"
25 
/*
 * Global lists of users, databases and pools.  Items are allocated as
 * needed; add_user(), add_database() and new_pool() keep each list sorted
 * by name with put_in_order().
 */
STATLIST(user_list);
STATLIST(database_list);
STATLIST(pool_list);

/* All locally defined users (in auth_file) are kept here, keyed by name. */
struct AATree user_tree;

/*
 * All PAM users are kept here. We need to differentiate two user
 * lists to avoid user clashing for different authentication types,
 * and because pam_user_tree is closer to PgDatabase.user_tree in
 * logic.
 */
struct AATree pam_user_tree;

/*
 * Clients currently in CL_LOGIN state (see change_client_state()).
 * Client and server objects themselves come from the client_cache and
 * server_cache slabs below.  Apart from CL_FREE, every socket state has
 * a list that the socket is linked into.
 */
STATLIST(login_client_list);

/*
 * Slab allocators for the core object types.  user/db/pool caches are
 * created by init_objects(); server/client/iobuf caches by init_caches().
 */
struct Slab *server_cache;
struct Slab *client_cache;
struct Slab *db_cache;
struct Slab *pool_cache;
struct Slab *user_cache;
struct Slab *iobuf_cache;

/*
 * libevent may still report events when event_del()
 * is called from somewhere else.  So hide just freed
 * PgSockets for one loop.
 */
static STATLIST(justfree_client_list);
static STATLIST(justfree_server_list);

/*
 * Auto-databases that are currently idle.  find_database() moves a
 * matching entry back into database_list when it is looked up again.
 */
STATLIST(autodatabase_idle_list);
66 
67 /* fast way to get number of active clients */
get_active_client_count(void)68 int get_active_client_count(void)
69 {
70 	return slab_active_count(client_cache);
71 }
72 
73 /* fast way to get number of active servers */
get_active_server_count(void)74 int get_active_server_count(void)
75 {
76 	return slab_active_count(server_cache);
77 }
78 
construct_client(void * obj)79 static void construct_client(void *obj)
80 {
81 	PgSocket *client = obj;
82 
83 	memset(client, 0, sizeof(PgSocket));
84 	list_init(&client->head);
85 	sbuf_init(&client->sbuf, client_proto);
86 	client->state = CL_FREE;
87 }
88 
construct_server(void * obj)89 static void construct_server(void *obj)
90 {
91 	PgSocket *server = obj;
92 
93 	memset(server, 0, sizeof(PgSocket));
94 	list_init(&server->head);
95 	sbuf_init(&server->sbuf, server_proto);
96 	server->state = SV_FREE;
97 }
98 
99 /* compare string with PgUser->name, for usage with btree */
user_node_cmp(uintptr_t userptr,struct AANode * node)100 static int user_node_cmp(uintptr_t userptr, struct AANode *node)
101 {
102 	const char *name = (const char *)userptr;
103 	PgUser *user = container_of(node, PgUser, tree_node);
104 	return strcmp(name, user->name);
105 }
106 
107 /* destroy PgUser, for usage with btree */
user_node_release(struct AANode * node,void * arg)108 static void user_node_release(struct AANode *node, void *arg)
109 {
110 	PgUser *user = container_of(node, PgUser, tree_node);
111 	slab_free(user_cache, user);
112 }
113 
114 /* initialization before config loading */
init_objects(void)115 void init_objects(void)
116 {
117 	aatree_init(&user_tree, user_node_cmp, NULL);
118 	aatree_init(&pam_user_tree, user_node_cmp, NULL);
119 	user_cache = slab_create("user_cache", sizeof(PgUser), 0, NULL, USUAL_ALLOC);
120 	db_cache = slab_create("db_cache", sizeof(PgDatabase), 0, NULL, USUAL_ALLOC);
121 	pool_cache = slab_create("pool_cache", sizeof(PgPool), 0, NULL, USUAL_ALLOC);
122 
123 	if (!user_cache || !db_cache || !pool_cache)
124 		fatal("cannot create initial caches");
125 }
126 
do_iobuf_reset(void * arg)127 static void do_iobuf_reset(void *arg)
128 {
129 	IOBuf *io = arg;
130 	iobuf_reset(io);
131 }
132 
133 /* initialization after config loading */
init_caches(void)134 void init_caches(void)
135 {
136 	server_cache = slab_create("server_cache", sizeof(PgSocket), 0, construct_server, USUAL_ALLOC);
137 	client_cache = slab_create("client_cache", sizeof(PgSocket), 0, construct_client, USUAL_ALLOC);
138 	iobuf_cache = slab_create("iobuf_cache", IOBUF_SIZE, 0, do_iobuf_reset, USUAL_ALLOC);
139 }
140 
/*
 * State change means moving between lists.
 *
 * The client is unlinked from the list that belongs to its current state
 * and then linked into the list for newstate.  Two transitions are
 * remapped on the way:
 *   CL_LOGIN         -> CL_WAITING  is stored as CL_WAITING_LOGIN
 *   CL_WAITING_LOGIN -> CL_ACTIVE   is stored as CL_LOGIN
 * As a result, a client that had to wait for a server during login returns
 * to login_client_list rather than the pool's active list.
 *
 * Moving to CL_FREE returns the object to client_cache, so the pointer
 * must not be used afterwards.  An unknown old or new state is fatal.
 */
void change_client_state(PgSocket *client, SocketState newstate)
{
	PgPool *pool = client->pool;

	/* remove from old location */
	switch (client->state) {
	case CL_FREE:
		/* free objects are not linked into any list */
		break;
	case CL_JUSTFREE:
		statlist_remove(&justfree_client_list, &client->head);
		break;
	case CL_LOGIN:
		/* waiting while still in login is tracked as its own state */
		if (newstate == CL_WAITING)
			newstate = CL_WAITING_LOGIN;
		statlist_remove(&login_client_list, &client->head);
		break;
	case CL_WAITING_LOGIN:
		/* got a server while logging in: go back to login, not active */
		if (newstate == CL_ACTIVE)
			newstate = CL_LOGIN;
		/* fallthrough */
	case CL_WAITING:
		statlist_remove(&pool->waiting_client_list, &client->head);
		break;
	case CL_ACTIVE:
		statlist_remove(&pool->active_client_list, &client->head);
		break;
	case CL_CANCEL:
		statlist_remove(&pool->cancel_req_list, &client->head);
		break;
	default:
		fatal("bad cur client state: %d", client->state);
	}

	client->state = newstate;

	/* put to new location */
	switch (client->state) {
	case CL_FREE:
		/* object goes back to the slab; client is invalid from here on */
		varcache_clean(&client->vars);
		slab_free(client_cache, client);
		break;
	case CL_JUSTFREE:
		statlist_append(&justfree_client_list, &client->head);
		break;
	case CL_LOGIN:
		statlist_append(&login_client_list, &client->head);
		break;
	case CL_WAITING:
	case CL_WAITING_LOGIN:
		/* activate_client() adds the elapsed wait to pool stats */
		client->wait_start = get_cached_time();
		statlist_append(&pool->waiting_client_list, &client->head);
		break;
	case CL_ACTIVE:
		statlist_append(&pool->active_client_list, &client->head);
		break;
	case CL_CANCEL:
		statlist_append(&pool->cancel_req_list, &client->head);
		break;
	default:
		fatal("bad new client state: %d", client->state);
	}
}
204 
/*
 * State change means moving between lists.
 *
 * The server is unlinked from the list for its current state and linked
 * into the list for newstate.  SV_USED and (normally) SV_IDLE servers are
 * prepended, so the most recently released connection is picked first.
 * An idle server with close_needed set, or any idle server when
 * cf_server_round_robin is set, is appended instead.  Moving to SV_FREE
 * returns the object to server_cache, so the pointer must not be used
 * afterwards.  An unknown old or new state is fatal.
 */
void change_server_state(PgSocket *server, SocketState newstate)
{
	PgPool *pool = server->pool;

	/* remove from old location */
	switch (server->state) {
	case SV_FREE:
		/* free objects are not linked into any list */
		break;
	case SV_JUSTFREE:
		statlist_remove(&justfree_server_list, &server->head);
		break;
	case SV_LOGIN:
		statlist_remove(&pool->new_server_list, &server->head);
		break;
	case SV_USED:
		statlist_remove(&pool->used_server_list, &server->head);
		break;
	case SV_TESTED:
		statlist_remove(&pool->tested_server_list, &server->head);
		break;
	case SV_IDLE:
		statlist_remove(&pool->idle_server_list, &server->head);
		break;
	case SV_ACTIVE:
		statlist_remove(&pool->active_server_list, &server->head);
		break;
	default:
		fatal("bad old server state: %d", server->state);
	}

	server->state = newstate;

	/* put to new location */
	switch (server->state) {
	case SV_FREE:
		/* object goes back to the slab; server is invalid from here on */
		varcache_clean(&server->vars);
		slab_free(server_cache, server);
		break;
	case SV_JUSTFREE:
		statlist_append(&justfree_server_list, &server->head);
		break;
	case SV_LOGIN:
		statlist_append(&pool->new_server_list, &server->head);
		break;
	case SV_USED:
		/* use LIFO */
		statlist_prepend(&pool->used_server_list, &server->head);
		break;
	case SV_TESTED:
		statlist_append(&pool->tested_server_list, &server->head);
		break;
	case SV_IDLE:
		if (server->close_needed || cf_server_round_robin) {
			/* try to avoid immediate usage then */
			statlist_append(&pool->idle_server_list, &server->head);
		} else {
			/* otherwise use LIFO */
			statlist_prepend(&pool->idle_server_list, &server->head);
		}
		break;
	case SV_ACTIVE:
		statlist_append(&pool->active_server_list, &server->head);
		break;
	default:
		fatal("bad server state: %d", server->state);
	}
}
273 
274 /* compare pool names, for use with put_in_order */
cmp_pool(struct List * i1,struct List * i2)275 static int cmp_pool(struct List *i1, struct List *i2)
276 {
277 	PgPool *p1 = container_of(i1, PgPool, head);
278 	PgPool *p2 = container_of(i2, PgPool, head);
279 	if (p1->db != p2->db)
280 		return strcmp(p1->db->name, p2->db->name);
281 	if (p1->user != p2->user)
282 		return strcmp(p1->user->name, p2->user->name);
283 	return 0;
284 }
285 
286 /* compare user names, for use with put_in_order */
cmp_user(struct List * i1,struct List * i2)287 static int cmp_user(struct List *i1, struct List *i2)
288 {
289 	PgUser *u1 = container_of(i1, PgUser, head);
290 	PgUser *u2 = container_of(i2, PgUser, head);
291 	return strcmp(u1->name, u2->name);
292 }
293 
294 /* compare db names, for use with put_in_order */
cmp_database(struct List * i1,struct List * i2)295 static int cmp_database(struct List *i1, struct List *i2)
296 {
297 	PgDatabase *db1 = container_of(i1, PgDatabase, head);
298 	PgDatabase *db2 = container_of(i2, PgDatabase, head);
299 	return strcmp(db1->name, db2->name);
300 }
301 
302 /* put elem into list in correct pos */
put_in_order(struct List * newitem,struct StatList * list,int (* cmpfn)(struct List *,struct List *))303 static void put_in_order(struct List *newitem, struct StatList *list,
304 			 int (*cmpfn)(struct List *, struct List *))
305 {
306 	int res;
307 	struct List *item;
308 
309 	statlist_for_each(item, list) {
310 		res = cmpfn(item, newitem);
311 		if (res == 0) {
312 			fatal("put_in_order: found existing elem");
313 		} else if (res > 0) {
314 			statlist_put_before(list, newitem, item);
315 			return;
316 		}
317 	}
318 	statlist_append(list, newitem);
319 }
320 
321 /* create new object if new, then return it */
add_database(const char * name)322 PgDatabase *add_database(const char *name)
323 {
324 	PgDatabase *db = find_database(name);
325 
326 	/* create new object if needed */
327 	if (db == NULL) {
328 		db = slab_alloc(db_cache);
329 		if (!db)
330 			return NULL;
331 
332 		list_init(&db->head);
333 		if (strlcpy(db->name, name, sizeof(db->name)) >= sizeof(db->name)) {
334 			log_warning("too long db name: %s", name);
335 			slab_free(db_cache, db);
336 			return NULL;
337 		}
338 		aatree_init(&db->user_tree, user_node_cmp, user_node_release);
339 		put_in_order(&db->head, &database_list, cmp_database);
340 	}
341 
342 	return db;
343 }
344 
345 /* register new auto database */
register_auto_database(const char * name)346 PgDatabase *register_auto_database(const char *name)
347 {
348 	PgDatabase *db;
349 
350 	if (!cf_autodb_connstr)
351 		return NULL;
352 
353 	if (!parse_database(NULL, name, cf_autodb_connstr))
354 		return NULL;
355 
356 	db = find_database(name);
357 	if (db) {
358 		db->db_auto = true;
359 	}
360 
361 	return db;
362 }
363 
364 /* add or update client users */
add_user(const char * name,const char * passwd)365 PgUser *add_user(const char *name, const char *passwd)
366 {
367 	PgUser *user = find_user(name);
368 
369 	if (user == NULL) {
370 		user = slab_alloc(user_cache);
371 		if (!user)
372 			return NULL;
373 
374 		list_init(&user->head);
375 		list_init(&user->pool_list);
376 		safe_strcpy(user->name, name, sizeof(user->name));
377 		put_in_order(&user->head, &user_list, cmp_user);
378 
379 		aatree_insert(&user_tree, (uintptr_t)user->name, &user->tree_node);
380 		user->pool_mode = POOL_INHERIT;
381 	}
382 	safe_strcpy(user->passwd, passwd, sizeof(user->passwd));
383 	return user;
384 }
385 
386 /* add or update db users */
add_db_user(PgDatabase * db,const char * name,const char * passwd)387 PgUser *add_db_user(PgDatabase *db, const char *name, const char *passwd)
388 {
389 	PgUser *user = NULL;
390 	struct AANode *node;
391 
392 	node = aatree_search(&db->user_tree, (uintptr_t)name);
393 	user = node ? container_of(node, PgUser, tree_node) : NULL;
394 
395 	if (user == NULL) {
396 		user = slab_alloc(user_cache);
397 		if (!user)
398 			return NULL;
399 
400 		list_init(&user->head);
401 		list_init(&user->pool_list);
402 		safe_strcpy(user->name, name, sizeof(user->name));
403 
404 		aatree_insert(&db->user_tree, (uintptr_t)user->name, &user->tree_node);
405 		user->pool_mode = POOL_INHERIT;
406 	}
407 	safe_strcpy(user->passwd, passwd, sizeof(user->passwd));
408 	return user;
409 }
410 
411 /* Add PAM user. The logic is same as in add_db_user */
add_pam_user(const char * name,const char * passwd)412 PgUser *add_pam_user(const char *name, const char *passwd)
413 {
414 	PgUser *user = NULL;
415 	struct AANode *node;
416 
417 	node = aatree_search(&pam_user_tree, (uintptr_t)name);
418 	user = node ? container_of(node, PgUser, tree_node) : NULL;
419 
420 	if (user == NULL) {
421 		user = slab_alloc(user_cache);
422 		if (!user)
423 			return NULL;
424 
425 		list_init(&user->head);
426 		list_init(&user->pool_list);
427 		safe_strcpy(user->name, name, sizeof(user->name));
428 
429 		aatree_insert(&pam_user_tree, (uintptr_t)user->name, &user->tree_node);
430 		user->pool_mode = POOL_INHERIT;
431 	}
432 	if (passwd)
433 		safe_strcpy(user->passwd, passwd, sizeof(user->passwd));
434 	return user;
435 }
436 
437 /* create separate user object for storing server user info */
force_user(PgDatabase * db,const char * name,const char * passwd)438 PgUser *force_user(PgDatabase *db, const char *name, const char *passwd)
439 {
440 	PgUser *user = db->forced_user;
441 	if (!user) {
442 		user = slab_alloc(user_cache);
443 		if (!user)
444 			return NULL;
445 		list_init(&user->head);
446 		list_init(&user->pool_list);
447 		user->pool_mode = POOL_INHERIT;
448 	}
449 	safe_strcpy(user->name, name, sizeof(user->name));
450 	safe_strcpy(user->passwd, passwd, sizeof(user->passwd));
451 	db->forced_user = user;
452 	return user;
453 }
454 
455 /* find an existing database */
find_database(const char * name)456 PgDatabase *find_database(const char *name)
457 {
458 	struct List *item, *tmp;
459 	PgDatabase *db;
460 	statlist_for_each(item, &database_list) {
461 		db = container_of(item, PgDatabase, head);
462 		if (strcmp(db->name, name) == 0)
463 			return db;
464 	}
465 	/* also trying to find in idle autodatabases list */
466 	statlist_for_each_safe(item, &autodatabase_idle_list, tmp) {
467 		db = container_of(item, PgDatabase, head);
468 		if (strcmp(db->name, name) == 0) {
469 			db->inactive_time = 0;
470 			statlist_remove(&autodatabase_idle_list, &db->head);
471 			put_in_order(&db->head, &database_list, cmp_database);
472 			return db;
473 		}
474 	}
475 	return NULL;
476 }
477 
478 /* find existing user */
find_user(const char * name)479 PgUser *find_user(const char *name)
480 {
481 	PgUser *user = NULL;
482 	struct AANode *node;
483 
484 	node = aatree_search(&user_tree, (uintptr_t)name);
485 	user = node ? container_of(node, PgUser, tree_node) : NULL;
486 	return user;
487 }
488 
489 /* create new pool object */
new_pool(PgDatabase * db,PgUser * user)490 static PgPool *new_pool(PgDatabase *db, PgUser *user)
491 {
492 	PgPool *pool;
493 
494 	pool = slab_alloc(pool_cache);
495 	if (!pool)
496 		return NULL;
497 
498 	list_init(&pool->head);
499 	list_init(&pool->map_head);
500 
501 	pool->user = user;
502 	pool->db = db;
503 
504 	statlist_init(&pool->active_client_list, "active_client_list");
505 	statlist_init(&pool->waiting_client_list, "waiting_client_list");
506 	statlist_init(&pool->active_server_list, "active_server_list");
507 	statlist_init(&pool->idle_server_list, "idle_server_list");
508 	statlist_init(&pool->tested_server_list, "tested_server_list");
509 	statlist_init(&pool->used_server_list, "used_server_list");
510 	statlist_init(&pool->new_server_list, "new_server_list");
511 	statlist_init(&pool->cancel_req_list, "cancel_req_list");
512 
513 	list_append(&user->pool_list, &pool->map_head);
514 
515 	/* keep pools in db/user order to make stats faster */
516 	put_in_order(&pool->head, &pool_list, cmp_pool);
517 
518 	return pool;
519 }
520 
521 /* find pool object, create if needed */
get_pool(PgDatabase * db,PgUser * user)522 PgPool *get_pool(PgDatabase *db, PgUser *user)
523 {
524 	struct List *item;
525 	PgPool *pool;
526 
527 	if (!db || !user)
528 		return NULL;
529 
530 	list_for_each(item, &user->pool_list) {
531 		pool = container_of(item, PgPool, map_head);
532 		if (pool->db == db)
533 			return pool;
534 	}
535 
536 	return new_pool(db, user);
537 }
538 
539 /* deactivate socket and put into wait queue */
pause_client(PgSocket * client)540 static void pause_client(PgSocket *client)
541 {
542 	Assert(client->state == CL_ACTIVE || client->state == CL_LOGIN);
543 
544 	slog_debug(client, "pause_client");
545 	change_client_state(client, CL_WAITING);
546 	if (!sbuf_pause(&client->sbuf))
547 		disconnect_client(client, true, "pause failed");
548 }
549 
550 /* wake client from wait */
activate_client(PgSocket * client)551 void activate_client(PgSocket *client)
552 {
553 	Assert(client->state == CL_WAITING || client->state == CL_WAITING_LOGIN);
554 
555 	Assert(client->wait_start > 0);
556 
557 	/* acount for time client spent waiting for server */
558 	client->pool->stats.wait_time += (get_cached_time() - client->wait_start);
559 
560 	slog_debug(client, "activate_client");
561 	change_client_state(client, CL_ACTIVE);
562 	sbuf_continue(&client->sbuf);
563 }
564 
565 /*
566  * Don't let clients queue at all if there is no working server connection.
567  *
568  * It must still allow following cases:
569  * - empty pool on startup
570  * - idle pool where all servers are removed
571  *
572  * Current assumptions:
573  * - old server connections will be dropped by query_timeout
574  * - new server connections fail due to server_connect_timeout, or other failure
575  *
576  * So here we drop client if all server connections have been dropped
577  * and new ones fail.
578  *
579  * Return true if the client connection should be allowed, false if it
580  * should be rejected.
581  */
check_fast_fail(PgSocket * client)582 bool check_fast_fail(PgSocket *client)
583 {
584 	int cnt;
585 	PgPool *pool = client->pool;
586 
587 	/* Could be mock authentication, proceed normally */
588 	if (!pool)
589 		return true;
590 
591 	/* If last login succeeded, client can go ahead. */
592 	if (!pool->last_login_failed)
593 		return true;
594 
595 	/* If there are servers available, client can go ahead. */
596 	cnt = pool_server_count(pool) - statlist_count(&pool->new_server_list);
597 	if (cnt)
598 		return true;
599 
600 	/* Else we fail the client. */
601 	disconnect_client(client, true, "pgbouncer cannot connect to server");
602 
603 	/*
604 	 * Try to launch a new connection.  (launch_new_connection()
605 	 * will check for server_login_retry etc.)  The usual relaunch
606 	 * from janitor.c won't do anything, as there are no waiting
607 	 * clients, so we need to do it here to get any new servers
608 	 * eventually.
609 	 */
610 	launch_new_connection(pool);
611 
612 	return false;
613 }
614 
/*
 * Link the client to a server if possible, otherwise put it into the
 * wait queue.
 *
 * Returns true if the client is (already or now) linked and its data can
 * be processed right away.  Returns false if:
 *  - the client was rejected by check_fast_fail() (it is disconnected),
 *  - no usable idle server exists (or pooling is paused) and the client
 *    was paused into the waiting list,
 *  - a server was linked but parameter changes must first be applied on
 *    it, in which case the client's buffer is paused until that is done.
 */
bool find_server(PgSocket *client)
{
	PgPool *pool = client->pool;
	PgSocket *server;
	bool res;
	bool varchange = false;

	Assert(client->state == CL_ACTIVE || client->state == CL_LOGIN);

	/* no wait by default */
	client->wait_start = 0;

	if (client->link)
		return true;

	/* try to get idle server, if allowed */
	if (cf_pause_mode == P_PAUSE || pool->db->db_paused) {
		server = NULL;
	} else {
		/* drop idle servers that are obsolete or not ready until a good one is found */
		while (1) {
			server = first_socket(&pool->idle_server_list);
			if (!server) {
				break;
			} else if (server->close_needed) {
				disconnect_server(server, true, "obsolete connection");
			} else if (!server->ready) {
				disconnect_server(server, true, "idle server got dirty");
			} else {
				break;
			}
		}

		if (!server && !check_fast_fail(client))
			return false;

	}
	Assert(!server || server->state == SV_IDLE);

	/* send var changes */
	if (server) {
		res = varcache_apply(server, client, &varchange);
		if (!res) {
			disconnect_server(server, true, "var change failed");
			server = NULL;
		}
	}

	/* link or send to waiters list */
	if (server) {
		client->link = server;
		server->link = client;
		change_server_state(server, SV_ACTIVE);
		if (varchange) {
			/* server is busy applying the parameter changes first */
			server->setting_vars = true;
			server->ready = false;
			res = false; /* don't process client data yet */
			if (!sbuf_pause(&client->sbuf))
				disconnect_client(client, true, "pause failed");
		} else {
			res = true;
		}
	} else {
		pause_client(client);
		res = false;
	}
	return res;
}
683 
684 /* pick waiting client */
reuse_on_release(PgSocket * server)685 static bool reuse_on_release(PgSocket *server)
686 {
687 	bool res = true;
688 	PgPool *pool = server->pool;
689 	PgSocket *client = first_socket(&pool->waiting_client_list);
690 	if (client) {
691 		activate_client(client);
692 
693 		/*
694 		 * As the activate_client() does full read loop,
695 		 * then it may happen that linked client closing
696 		 * causes server closing.  Report it.
697 		 */
698 		if (server->state == SV_FREE || server->state == SV_JUSTFREE)
699 			res = false;
700 	}
701 	return res;
702 }
703 
704 /* send reset query */
reset_on_release(PgSocket * server)705 static bool reset_on_release(PgSocket *server)
706 {
707 	bool res;
708 
709 	Assert(server->state == SV_TESTED);
710 
711 	slog_debug(server, "resetting: %s", cf_server_reset_query);
712 	SEND_generic(res, server, 'Q', "s", cf_server_reset_query);
713 	if (!res)
714 		disconnect_server(server, false, "reset query failed");
715 	return res;
716 }
717 
life_over(PgSocket * server)718 static bool life_over(PgSocket *server)
719 {
720 	PgPool *pool = server->pool;
721 	usec_t lifetime_kill_gap = 0;
722 	usec_t now = get_cached_time();
723 	usec_t age = now - server->connect_time;
724 	usec_t last_kill = now - pool->last_lifetime_disconnect;
725 
726 	if (age < cf_server_lifetime)
727 		return false;
728 
729 	if (pool_pool_size(pool) > 0)
730 		lifetime_kill_gap = cf_server_lifetime / pool_pool_size(pool);
731 
732 	if (last_kill >= lifetime_kill_gap)
733 		return true;
734 
735 	return false;
736 }
737 
/*
 * connecting/active -> idle, unlink if needed.
 *
 * From SV_ACTIVE the client link is cleared and the next state is chosen:
 * SV_TESTED when server_reset_query must be sent (always, or in session
 * pooling), SV_USED for the deprecated server_check_delay=0 setup, and
 * SV_IDLE otherwise.  SV_USED and SV_TESTED servers are released to
 * SV_IDLE.  From SV_LOGIN the pool's login/connect failure flags are
 * cleared.
 *
 * Returns false if the server was disconnected here (server_lifetime,
 * close_needed, failed reset query) or was closed while being handed to
 * a waiting client; true otherwise.
 */
bool release_server(PgSocket *server)
{
	PgPool *pool = server->pool;
	SocketState newstate = SV_IDLE;

	Assert(server->ready);

	/* remove from old list */
	switch (server->state) {
	case SV_ACTIVE:
		server->link->link = NULL;
		server->link = NULL;

		if (*cf_server_reset_query && (cf_server_reset_query_always ||
					       pool_pool_mode(pool) == POOL_SESSION))
		{
			/* notify reset is required */
			newstate = SV_TESTED;
		} else if (cf_server_check_delay == 0 && *cf_server_check_query) {
			/*
			 * deprecated: before reset_query, the check_delay = 0
			 * was used to get same effect.  This if() can be removed
			 * after couple of releases.
			 */
			newstate = SV_USED;
		}
		/* fallthrough */
	case SV_USED:
	case SV_TESTED:
		break;
	case SV_LOGIN:
		pool->last_login_failed = false;
		pool->last_connect_failed = false;
		break;
	default:
		fatal("bad server state: %d", server->state);
	}

	/* enforce lifetime immediately on release */
	if (server->state != SV_LOGIN && life_over(server)) {
		disconnect_server(server, true, "server_lifetime");
		pool->last_lifetime_disconnect = get_cached_time();
		return false;
	}

	/* enforce close request */
	if (server->close_needed) {
		disconnect_server(server, true, "close_needed");
		return false;
	}

	Assert(server->link == NULL);
	slog_noise(server, "release_server: new state=%d", newstate);
	change_server_state(server, newstate);

	if (newstate == SV_IDLE) {
		/* immediately process waiters, to give fair chance */
		return reuse_on_release(server);
	} else if (newstate == SV_TESTED) {
		return reset_on_release(server);
	}

	return true;
}
802 
/*
 * Drop a server connection.
 *
 * reason is a printf-style format; the formatted text is truncated to a
 * 128-byte buffer.  If the server is active and linked, the linked
 * client is disconnected too, with the same reason.  When notify is true
 * a Terminate ('X') packet is sent first, except for a login-phase socket
 * that was already ready (used for query cancellation).  Login-phase
 * disconnects also update the pool's login/connect failure flags.
 * Afterwards the server is moved to SV_JUSTFREE and its buffer closed.
 */
void disconnect_server(PgSocket *server, bool notify, const char *reason, ...)
{
	PgPool *pool = server->pool;
	PgSocket *client;
	static const uint8_t pkt_term[] = {'X', 0,0,0,4};
	bool send_term = true;
	usec_t now = get_cached_time();
	char buf[128];
	va_list ap;

	va_start(ap, reason);
	vsnprintf(buf, sizeof(buf), reason, ap);
	va_end(ap);
	reason = buf;

	if (cf_log_disconnections)
		slog_info(server, "closing because: %s (age=%" PRIu64 "s)", reason,
			  (now - server->connect_time) / USEC);

	switch (server->state) {
	case SV_ACTIVE:
		/* unlink first so the client side does not touch this server */
		client = server->link;
		if (client) {
			client->link = NULL;
			server->link = NULL;
			disconnect_client(client, true, "%s", reason);
		}
		break;
	case SV_TESTED:
	case SV_USED:
	case SV_IDLE:
		break;
	case SV_LOGIN:
		/*
		 * usually disconnect means problems in startup phase,
		 * except when sending cancel packet
		 */
		if (!server->ready)
		{
			pool->last_login_failed = true;
			pool->last_connect_failed = true;
		}
		else
		{
			/*
			 * We did manage to connect and used the connection for query
			 * cancellation, so to the best of our knowledge we can connect to
			 * the server, reset last_connect_failed accordingly.
			 */
			pool->last_connect_failed = false;
			send_term = false;
		}
		break;
	default:
		fatal("bad server state: %d", server->state);
	}

	Assert(server->link == NULL);

	/* notify server and close connection */
	if (send_term && notify) {
		/* best effort: the connection is being closed regardless */
		bool _ignore = sbuf_answer(&server->sbuf, pkt_term, sizeof(pkt_term));
		(void) _ignore;
	}

	/* abort a DNS lookup that is still pending for this server */
	if (server->dns_token) {
		adns_cancel(adns, server->dns_token);
		server->dns_token = NULL;
	}

	free_scram_state(&server->scram_state);

	server->pool->db->connection_count--;
	server->pool->user->connection_count--;

	change_server_state(server, SV_JUSTFREE);
	if (!sbuf_close(&server->sbuf))
		log_noise("sbuf_close failed, retry later");
}
883 
884 /* drop client connection */
disconnect_client(PgSocket * client,bool notify,const char * reason,...)885 void disconnect_client(PgSocket *client, bool notify, const char *reason, ...)
886 {
887 	usec_t now = get_cached_time();
888 
889 	if (reason) {
890 		char buf[128];
891 		va_list ap;
892 
893 		va_start(ap, reason);
894 		vsnprintf(buf, sizeof(buf), reason, ap);
895 		va_end(ap);
896 		reason = buf;
897 	}
898 
899 	if (cf_log_disconnections && reason)
900 		slog_info(client, "closing because: %s (age=%" PRIu64 "s)", reason,
901 			  (now - client->connect_time) / USEC);
902 
903 	switch (client->state) {
904 	case CL_ACTIVE:
905 	case CL_LOGIN:
906 		if (client->link) {
907 			PgSocket *server = client->link;
908 			if (!server->ready) {
909 				server->link = NULL;
910 				client->link = NULL;
911 				/*
912 				 * This can happen if the client
913 				 * connection is normally closed while
914 				 * the server has a transaction block
915 				 * open.  Then there is no way for us
916 				 * to reset the server other than by
917 				 * closing it.  Perhaps it would be
918 				 * worth tracking this separately to
919 				 * make the error message more
920 				 * precise and less scary.
921 				 */
922 				disconnect_server(server, true, "client disconnect while server was not ready");
923 			} else if (!sbuf_is_empty(&server->sbuf)) {
924 				/* ->ready may be set before all is sent */
925 				server->link = NULL;
926 				client->link = NULL;
927 				disconnect_server(server, true, "client disconnect before everything was sent to the server");
928 			} else {
929 				/* retval does not matter here */
930 				release_server(server);
931 			}
932 		}
933 	case CL_WAITING:
934 	case CL_WAITING_LOGIN:
935 	case CL_CANCEL:
936 		break;
937 	default:
938 		fatal("bad client state: %d", client->state);
939 	}
940 
941 	/* send reason to client */
942 	if (notify && reason && client->state != CL_CANCEL) {
943 		/*
944 		 * don't send Ready pkt here, or client won't notice
945 		 * closed connection
946 		 */
947 		send_pooler_error(client, false, true, reason);
948 	}
949 
950 	free_scram_state(&client->scram_state);
951 	if (client->login_user && client->login_user->mock_auth) {
952 		free(client->login_user);
953 		client->login_user = NULL;
954 	}
955 
956 	change_client_state(client, CL_JUSTFREE);
957 	if (!sbuf_close(&client->sbuf))
958 		log_noise("sbuf_close failed, retry later");
959 }
960 
961 /*
962  * Connection creation utilities
963  */
964 
connect_server(struct PgSocket * server,const struct sockaddr * sa,int salen)965 static void connect_server(struct PgSocket *server, const struct sockaddr *sa, int salen)
966 {
967 	bool res;
968 
969 	/* fill remote_addr */
970 	memset(&server->remote_addr, 0, sizeof(server->remote_addr));
971 	if (sa->sa_family == AF_UNIX) {
972 		pga_set(&server->remote_addr, AF_UNIX, server->pool->db->port);
973 	} else {
974 		pga_copy(&server->remote_addr, sa);
975 	}
976 
977 	slog_debug(server, "launching new connection to server");
978 
979 	/* start connecting */
980 	res = sbuf_connect(&server->sbuf, sa, salen,
981 			   cf_server_connect_timeout / USEC);
982 	if (!res)
983 		log_noise("failed to launch new connection");
984 }
985 
dns_callback(void * arg,const struct sockaddr * sa,int salen)986 static void dns_callback(void *arg, const struct sockaddr *sa, int salen)
987 {
988 	struct PgSocket *server = arg;
989 	struct PgDatabase *db = server->pool->db;
990 	struct sockaddr_in sa_in;
991 	struct sockaddr_in6 sa_in6;
992 
993 	server->dns_token = NULL;
994 
995 	if (!sa) {
996 		disconnect_server(server, true, "server DNS lookup failed");
997 		return;
998 	} else if (sa->sa_family == AF_INET) {
999 		char buf[64];
1000 		memcpy(&sa_in, sa, sizeof(sa_in));
1001 		sa_in.sin_port = htons(db->port);
1002 		sa = (struct sockaddr *)&sa_in;
1003 		salen = sizeof(sa_in);
1004 		slog_debug(server, "dns_callback: inet4: %s",
1005 			   sa2str(sa, buf, sizeof(buf)));
1006 	} else if (sa->sa_family == AF_INET6) {
1007 		char buf[64];
1008 		memcpy(&sa_in6, sa, sizeof(sa_in6));
1009 		sa_in6.sin6_port = htons(db->port);
1010 		sa = (struct sockaddr *)&sa_in6;
1011 		salen = sizeof(sa_in6);
1012 		slog_debug(server, "dns_callback: inet6: %s",
1013 			   sa2str(sa, buf, sizeof(buf)));
1014 	} else {
1015 		disconnect_server(server, true, "unknown address family: %d", sa->sa_family);
1016 		return;
1017 	}
1018 
1019 	connect_server(server, sa, salen);
1020 }
1021 
dns_connect(struct PgSocket * server)1022 static void dns_connect(struct PgSocket *server)
1023 {
1024 	struct sockaddr_un sa_un;
1025 	struct sockaddr_in sa_in;
1026 	struct sockaddr_in6 sa_in6;
1027 	struct sockaddr *sa;
1028 	struct PgDatabase *db = server->pool->db;
1029 	const char *host = db->host;
1030 	const char *unix_dir;
1031 	int sa_len;
1032 	int res;
1033 
1034 	if (!host || host[0] == '/' || host[0] == '@') {
1035 		memset(&sa_un, 0, sizeof(sa_un));
1036 		sa_un.sun_family = AF_UNIX;
1037 		unix_dir = host ? host : cf_unix_socket_dir;
1038 		if (!unix_dir || !*unix_dir) {
1039 			log_error("unix socket dir not configured: %s", db->name);
1040 			disconnect_server(server, false, "cannot connect");
1041 			return;
1042 		}
1043 		snprintf(sa_un.sun_path, sizeof(sa_un.sun_path),
1044 			 "%s/.s.PGSQL.%d", unix_dir, db->port);
1045 		slog_noise(server, "unix socket: %s", sa_un.sun_path);
1046 		if (unix_dir[0] == '@') {
1047 			/*
1048 			 * By convention, for abstract Unix sockets,
1049 			 * only the length of the string is the
1050 			 * sockaddr length.
1051 			 */
1052 			sa_len = offsetof(struct sockaddr_un, sun_path) + strlen(sa_un.sun_path);
1053 			sa_un.sun_path[0] = '\0';
1054 		}
1055 		else {
1056 			sa_len = sizeof(sa_un);
1057 		}
1058 		sa = (struct sockaddr *)&sa_un;
1059 		res = 1;
1060 	} else if (strchr(host, ':')) {  /* assume IPv6 address on any : in addr */
1061 		slog_noise(server, "inet6 socket: %s", db->host);
1062 		memset(&sa_in6, 0, sizeof(sa_in6));
1063 		sa_in6.sin6_family = AF_INET6;
1064 		res = inet_pton(AF_INET6, db->host, &sa_in6.sin6_addr);
1065 		sa_in6.sin6_port = htons(db->port);
1066 		sa = (struct sockaddr *)&sa_in6;
1067 		sa_len = sizeof(sa_in6);
1068 	} else { /* else try IPv4 */
1069 		slog_noise(server, "inet socket: %s", db->host);
1070 		memset(&sa_in, 0, sizeof(sa_in));
1071 		sa_in.sin_family = AF_INET;
1072 		res = inet_pton(AF_INET, db->host, &sa_in.sin_addr);
1073 		sa_in.sin_port = htons(db->port);
1074 		sa = (struct sockaddr *)&sa_in;
1075 		sa_len = sizeof(sa_in);
1076 	}
1077 
1078 	/* if simple parse failed, use DNS */
1079 	if (res != 1) {
1080 		struct DNSToken *tk;
1081 		slog_noise(server, "dns socket: %s", db->host);
1082 		/* launch dns lookup */
1083 		tk = adns_resolve(adns, db->host, dns_callback, server);
1084 		if (tk)
1085 			server->dns_token = tk;
1086 		return;
1087 	}
1088 
1089 	connect_server(server, sa, sa_len);
1090 }
1091 
/*
 * Return whichever of two sockets has the smaller request_time.
 * A NULL argument never wins; on equal times rhs is returned.
 */
PgSocket *compare_connections_by_time(PgSocket *lhs, PgSocket *rhs)
{
	if (lhs == NULL || rhs == NULL)
		return lhs ? lhs : rhs;

	if (lhs->request_time < rhs->request_time)
		return lhs;
	return rhs;
}
1100 
1101 /* evict the single most idle connection from among all pools to make room in the db */
evict_connection(PgDatabase * db)1102 bool evict_connection(PgDatabase *db)
1103 {
1104 	struct List *item;
1105 	PgPool *pool;
1106 	PgSocket *oldest_connection = NULL;
1107 
1108 	statlist_for_each(item, &pool_list) {
1109 		pool = container_of(item, PgPool, head);
1110 		if (pool->db != db)
1111 			continue;
1112 		oldest_connection = compare_connections_by_time(oldest_connection, last_socket(&pool->idle_server_list));
1113 		/* only evict testing connections if nobody's waiting */
1114 		if (statlist_empty(&pool->waiting_client_list)) {
1115 			oldest_connection = compare_connections_by_time(oldest_connection, last_socket(&pool->used_server_list));
1116 			oldest_connection = compare_connections_by_time(oldest_connection, last_socket(&pool->tested_server_list));
1117 		}
1118 	}
1119 
1120 	if (oldest_connection) {
1121 		disconnect_server(oldest_connection, true, "evicted");
1122 		return true;
1123 	}
1124 	return false;
1125 }
1126 
1127 /* evict the single most idle connection from among all pools to make room in the user */
evict_user_connection(PgUser * user)1128 bool evict_user_connection(PgUser *user)
1129 {
1130 	struct List *item;
1131 	PgPool *pool;
1132 	PgSocket *oldest_connection = NULL;
1133 
1134 	statlist_for_each(item, &pool_list) {
1135 		pool = container_of(item, PgPool, head);
1136 		if (pool->user != user)
1137 			continue;
1138 		oldest_connection = compare_connections_by_time(oldest_connection, last_socket(&pool->idle_server_list));
1139 		/* only evict testing connections if nobody's waiting */
1140 		if (statlist_empty(&pool->waiting_client_list)) {
1141 			oldest_connection = compare_connections_by_time(oldest_connection, last_socket(&pool->used_server_list));
1142 			oldest_connection = compare_connections_by_time(oldest_connection, last_socket(&pool->tested_server_list));
1143 		}
1144 	}
1145 
1146 	if (oldest_connection) {
1147 		disconnect_server(oldest_connection, true, "evicted");
1148 		return true;
1149 	}
1150 	return false;
1151 }
1152 
/*
 * The pool needs a new server connection; launch one if the limits allow.
 *
 * Checks, in order:
 *   - only one connection attempt may be in flight per pool
 *     (any socket in new_server_list => return);
 *   - after a failed connect, wait cf_server_login_retry before retrying;
 *   - with a queued cancel request the pool may grow up to twice its
 *     pool size, and the database/user limits below are skipped
 *     (goto force_new);
 *   - once pool size is reached (and welcome_msg_ready is set), the
 *     reserve pool is used only when the first waiting client has waited
 *     at least cf_res_pool_timeout;
 *   - the database and user connection limits, evicting server
 *     connections via evict_connection()/evict_user_connection() first.
 * When all checks pass, a server object is allocated in SV_LOGIN state,
 * the db and user connection counters are incremented and dns_connect()
 * starts the connection.
 */
void launch_new_connection(PgPool *pool)
{
	PgSocket *server;
	/* reused: pool server count, then db limit, then user limit */
	int max;

	/* allow only one connection attempt at a time per pool */
	if (!statlist_empty(&pool->new_server_list)) {
		log_debug("launch_new_connection: already progress");
		return;
	}

	/* if server bounces, don't retry too fast */
	if (pool->last_connect_failed) {
		usec_t now = get_cached_time();
		if (now - pool->last_connect_time < cf_server_login_retry) {
			log_debug("launch_new_connection: last failed, not launching new connection yet, still waiting %" PRIu64 " s",
				  (cf_server_login_retry - (now - pool->last_connect_time)) / USEC);
			return;
		}
	}

	max = pool_server_count(pool);

	/* when a cancel request is queued allow connections up to twice the pool size */
	if (!statlist_empty(&pool->cancel_req_list) && max < (2 * pool_pool_size(pool))) {
		log_debug("launch_new_connection: bypass pool limitations for cancel request");
		/* this also skips the database and user limit checks below */
		goto force_new;
	}

	/*
	 * is it allowed to add servers?
	 * The pool size limit is only enforced once welcome_msg_ready is set.
	 */
	if (max >= pool_pool_size(pool) && pool->welcome_msg_ready) {
		/* should we use reserve pool? */
		if (cf_res_pool_timeout && pool_res_pool_size(pool)) {
			usec_t now = get_cached_time();
			PgSocket *c = first_socket(&pool->waiting_client_list);
			/* the first waiting client must have waited at least cf_res_pool_timeout */
			if (c && (now - c->request_time) >= cf_res_pool_timeout) {
				if (max < pool_pool_size(pool) + pool_res_pool_size(pool)) {
					slog_warning(c, "taking connection from reserve_pool");
					goto allow_new;
				}
			}
		}
		log_debug("launch_new_connection: pool full (%d >= %d)",
				max, pool_pool_size(pool));
		return;
	}

allow_new:
	/* database-wide limit; a value <= 0 means no limit is enforced here */
	max = database_max_connections(pool->db);
	if (max > 0) {
		/*
		 * try to evict unused connections first
		 * NOTE(review): termination relies on disconnect_server()
		 * decrementing db->connection_count on each eviction — confirm.
		 */
		while (pool->db->connection_count >= max) {
			if (!evict_connection(pool->db)) {
				break;
			}
		}
		if (pool->db->connection_count >= max) {
			log_debug("launch_new_connection: database '%s' full (%d >= %d)",
				  pool->db->name, pool->db->connection_count, max);
			return;
		}
	}

	/* per-user limit; a value <= 0 means no limit is enforced here */
	max = user_max_connections(pool->user);
	if (max > 0) {
		/*
		 * try to evict unused connection first
		 * NOTE(review): same assumption as above, for user->connection_count.
		 */
		while (pool->user->connection_count >= max) {
			if (!evict_user_connection(pool->user)) {
				break;
			}
		}
		if (pool->user->connection_count >= max) {
			log_debug("launch_new_connection: user '%s' full (%d >= %d)",
				  pool->user->name, pool->user->connection_count, max);
			return;
		}
	}

force_new:
	/* get free conn object */
	server = slab_alloc(server_cache);
	if (!server) {
		log_debug("launch_new_connection: no memory");
		return;
	}

	/* initialize it */
	server->pool = pool;
	server->login_user = server->pool->user;
	server->connect_time = get_cached_time();
	/* consulted by the retry throttle at the top of this function */
	pool->last_connect_time = get_cached_time();
	change_server_state(server, SV_LOGIN);
	/* presumably balanced by decrements when the server is disconnected — confirm */
	pool->db->connection_count++;
	pool->user->connection_count++;

	/* resolve db->host (possibly asynchronously) and start connecting */
	dns_connect(server);
}
1251 
1252 /* new client connection attempt */
accept_client(int sock,bool is_unix)1253 PgSocket *accept_client(int sock, bool is_unix)
1254 {
1255 	bool res;
1256 	PgSocket *client;
1257 
1258 	/* get free PgSocket */
1259 	client = slab_alloc(client_cache);
1260 	if (!client) {
1261 		log_warning("cannot allocate client struct");
1262 		safe_close(sock);
1263 		return NULL;
1264 	}
1265 
1266 	client->connect_time = client->request_time = get_cached_time();
1267 	client->query_start = 0;
1268 
1269 	/* FIXME: take local and remote address from pool_accept() */
1270 	fill_remote_addr(client, sock, is_unix);
1271 	fill_local_addr(client, sock, is_unix);
1272 
1273 	change_client_state(client, CL_LOGIN);
1274 
1275 	res = sbuf_accept(&client->sbuf, sock, is_unix);
1276 	if (!res) {
1277 		if (cf_log_connections)
1278 			slog_debug(client, "failed connection attempt");
1279 		return NULL;
1280 	}
1281 
1282 	return client;
1283 }
1284 
1285 /* send cached parameters to client to pretend being server */
1286 /* client managed to authenticate, send welcome msg and accept queries */
finish_client_login(PgSocket * client)1287 bool finish_client_login(PgSocket *client)
1288 {
1289 	switch (client->state) {
1290 	case CL_LOGIN:
1291 		change_client_state(client, CL_ACTIVE);
1292 	case CL_ACTIVE:
1293 		break;
1294 	default:
1295 		fatal("bad client state: %d", client->state);
1296 	}
1297 
1298 	client->wait_for_auth = false;
1299 
1300 	/* check if we know server signature */
1301 	if (!client->pool->welcome_msg_ready) {
1302 		log_debug("finish_client_login: no welcome message, pause");
1303 		client->wait_for_welcome = true;
1304 		pause_client(client);
1305 		if (cf_pause_mode == P_NONE)
1306 			launch_new_connection(client->pool);
1307 		return false;
1308 	}
1309 	client->wait_for_welcome = false;
1310 
1311 	/* send the message */
1312 	if (!welcome_client(client))
1313 		return false;
1314 
1315 	slog_debug(client, "logged in");
1316 
1317 	return true;
1318 }
1319 
1320 /* client->cancel_key has requested client key */
accept_cancel_request(PgSocket * req)1321 void accept_cancel_request(PgSocket *req)
1322 {
1323 	struct List *pitem, *citem;
1324 	PgPool *pool = NULL;
1325 	PgSocket *server = NULL, *client, *main_client = NULL;
1326 
1327 	Assert(req->state == CL_LOGIN);
1328 
1329 	/* find real client this is for */
1330 	statlist_for_each(pitem, &pool_list) {
1331 		pool = container_of(pitem, PgPool, head);
1332 		statlist_for_each(citem, &pool->active_client_list) {
1333 			client = container_of(citem, PgSocket, head);
1334 			if (memcmp(client->cancel_key, req->cancel_key, 8) == 0) {
1335 				main_client = client;
1336 				goto found;
1337 			}
1338 		}
1339 		statlist_for_each(citem, &pool->waiting_client_list) {
1340 			client = container_of(citem, PgSocket, head);
1341 			if (memcmp(client->cancel_key, req->cancel_key, 8) == 0) {
1342 				main_client = client;
1343 				goto found;
1344 			}
1345 		}
1346 	}
1347 found:
1348 
1349 	/* wrong key */
1350 	if (!main_client) {
1351 		disconnect_client(req, false, "failed cancel request");
1352 		return;
1353 	}
1354 
1355 	/* not linked client, just drop it then */
1356 	if (!main_client->link) {
1357 		/* let administrative cancel be handled elsewhere */
1358 		if (main_client->pool->db->admin) {
1359 			disconnect_client(req, false, "cancel request for console client");
1360 			admin_handle_cancel(main_client);
1361 			return;
1362 		}
1363 
1364 		disconnect_client(req, false, "cancel request for idle client");
1365 
1366 		return;
1367 	}
1368 
1369 	/* drop the connection, if fails, retry later in justfree list */
1370 	if (!sbuf_close(&req->sbuf))
1371 		log_noise("sbuf_close failed, retry later");
1372 
1373 	/* remember server key */
1374 	server = main_client->link;
1375 	memcpy(req->cancel_key, server->cancel_key, 8);
1376 
1377 	/* attach to target pool */
1378 	req->pool = pool;
1379 	change_client_state(req, CL_CANCEL);
1380 
1381 	/* need fresh connection */
1382 	launch_new_connection(pool);
1383 }
1384 
/*
 * Send the first queued cancel request of the server's pool over this
 * freshly connected server (which must still be in SV_LOGIN), then release
 * the request socket by moving it to CL_JUSTFREE.
 * A send failure is only logged.
 */
void forward_cancel_request(PgSocket *server)
{
	PgSocket *req = first_socket(&server->pool->cancel_req_list);
	bool sent;

	Assert(req != NULL && req->state == CL_CANCEL);
	Assert(server->state == SV_LOGIN);

	SEND_CancelRequest(sent, server, req->cancel_key);
	if (!sent)
		log_warning("sending cancel request failed: %s", strerror(errno));

	change_client_state(req, CL_JUSTFREE);
}
1399 
use_client_socket(int fd,PgAddr * addr,const char * dbname,const char * username,uint64_t ckey,int oldfd,int linkfd,const char * client_enc,const char * std_string,const char * datestyle,const char * timezone,const char * password,const char * scram_client_key,int scram_client_key_len,const char * scram_server_key,int scram_server_key_len)1400 bool use_client_socket(int fd, PgAddr *addr,
1401 		       const char *dbname, const char *username,
1402 		       uint64_t ckey, int oldfd, int linkfd,
1403 		       const char *client_enc, const char *std_string,
1404 		       const char *datestyle, const char *timezone,
1405 		       const char *password,
1406 		       const char *scram_client_key, int scram_client_key_len,
1407 		       const char *scram_server_key, int scram_server_key_len)
1408 {
1409 	PgDatabase *db = find_database(dbname);
1410 	PgSocket *client;
1411 	PktBuf tmp;
1412 
1413 	/* if the database not found, it's an auto database -> registering... */
1414 	if (!db) {
1415 		db = register_auto_database(dbname);
1416 		if (!db)
1417 			return true;
1418 	}
1419 
1420 	if (scram_client_key || scram_server_key) {
1421 		PgUser *user;
1422 
1423 		if (!scram_client_key || !scram_server_key) {
1424 			log_error("incomplete SCRAM key data");
1425 			return false;
1426 		}
1427 		if (sizeof(user->scram_ClientKey) != scram_client_key_len
1428 		    || sizeof(user->scram_ServerKey) != scram_server_key_len) {
1429 			log_error("incompatible SCRAM key data");
1430 			return false;
1431 		}
1432 		if (db->forced_user) {
1433 			log_error("SCRAM key data received for forced user");
1434 			return false;
1435 		}
1436 		if (cf_auth_type == AUTH_PAM) {
1437 			log_error("SCRAM key data received for PAM user");
1438 			return false;
1439 		}
1440 		user = find_user(username);
1441 		if (!user && db->auth_user)
1442 			user = add_db_user(db, username, password);
1443 
1444 		if (!user)
1445 			return false;
1446 
1447 		memcpy(user->scram_ClientKey, scram_client_key, sizeof(user->scram_ClientKey));
1448 		memcpy(user->scram_ServerKey, scram_server_key, sizeof(user->scram_ServerKey));
1449 		user->has_scram_keys = true;
1450 	}
1451 
1452 	client = accept_client(fd, pga_is_unix(addr));
1453 	if (client == NULL)
1454 		return false;
1455 	client->suspended = true;
1456 
1457 	if (!set_pool(client, dbname, username, password, true))
1458 		return false;
1459 
1460 	change_client_state(client, CL_ACTIVE);
1461 
1462 	/* store old cancel key */
1463 	pktbuf_static(&tmp, client->cancel_key, 8);
1464 	pktbuf_put_uint64(&tmp, ckey);
1465 
1466 	/* store old fds */
1467 	client->tmp_sk_oldfd = oldfd;
1468 	client->tmp_sk_linkfd = linkfd;
1469 
1470 	varcache_set(&client->vars, "client_encoding", client_enc);
1471 	varcache_set(&client->vars, "standard_conforming_strings", std_string);
1472 	varcache_set(&client->vars, "datestyle", datestyle);
1473 	varcache_set(&client->vars, "timezone", timezone);
1474 
1475 	return true;
1476 }
1477 
/*
 * Re-create a server connection from an already-open socket fd together
 * with its database/user names, previous cancel key, old fds and cached
 * parameters.  The SCRAM key arguments are accepted but not used here.
 *
 * An unknown dbname is registered as an auto-database; if that fails the
 * socket is skipped and true is returned.  Returns false when no pool or
 * server object can be obtained or sbuf_accept() fails.
 *
 * A non-zero linkfd means the server was linked to a client.  It is resumed
 * as SV_ACTIVE and not ready; otherwise it becomes SV_IDLE and ready.
 *
 * The server is counted against both the database and the user, exactly like
 * launch_new_connection() does for newly launched servers.  Without the user
 * count, adopted servers were invisible to the per-user connection limit and
 * the user counter became unbalanced when they were disconnected.
 */
bool use_server_socket(int fd, PgAddr *addr,
		       const char *dbname, const char *username,
		       uint64_t ckey, int oldfd, int linkfd,
		       const char *client_enc, const char *std_string,
		       const char *datestyle, const char *timezone,
		       const char *password,
		       const char *scram_client_key, int scram_client_key_len,
		       const char *scram_server_key, int scram_server_key_len)
{
	PgDatabase *db = find_database(dbname);
	PgUser *user;
	PgPool *pool;
	PgSocket *server;
	PktBuf tmp;
	bool res;

	/* if the database not found, it's an auto database -> registering... */
	if (!db) {
		db = register_auto_database(dbname);
		if (!db)
			return true;
	}

	if (db->forced_user) {
		user = db->forced_user;
	} else if (cf_auth_type == AUTH_PAM) {
		user = add_pam_user(username, password);
	} else {
		user = find_user(username);
	}
	if (!user && db->auth_user)
		user = add_db_user(db, username, password);

	pool = get_pool(db, user);
	if (!pool)
		return false;

	server = slab_alloc(server_cache);
	if (!server)
		return false;

	res = sbuf_accept(&server->sbuf, fd, pga_is_unix(addr));
	if (!res)
		return false;

	/* count the server against the db and the user, as launch_new_connection() does */
	db->connection_count++;
	pool->user->connection_count++;

	server->suspended = true;
	server->pool = pool;
	server->login_user = user;
	server->connect_time = server->request_time = get_cached_time();
	server->query_start = 0;

	fill_remote_addr(server, fd, pga_is_unix(addr));
	fill_local_addr(server, fd, pga_is_unix(addr));

	if (linkfd) {
		server->ready = false;
		change_server_state(server, SV_ACTIVE);
	} else {
		server->ready = true;
		change_server_state(server, SV_IDLE);
	}

	/* store old cancel key */
	pktbuf_static(&tmp, server->cancel_key, 8);
	pktbuf_put_uint64(&tmp, ckey);

	/* store old fds */
	server->tmp_sk_oldfd = oldfd;
	server->tmp_sk_linkfd = linkfd;

	varcache_set(&server->vars, "client_encoding", client_enc);
	varcache_set(&server->vars, "standard_conforming_strings", std_string);
	varcache_set(&server->vars, "datestyle", datestyle);
	varcache_set(&server->vars, "timezone", timezone);

	return true;
}
1557 
for_each_server(PgPool * pool,void (* func)(PgSocket * sk))1558 void for_each_server(PgPool *pool, void (*func)(PgSocket *sk))
1559 {
1560 	struct List *item;
1561 
1562 	statlist_for_each(item, &pool->idle_server_list)
1563 		func(container_of(item, PgSocket, head));
1564 
1565 	statlist_for_each(item, &pool->used_server_list)
1566 		func(container_of(item, PgSocket, head));
1567 
1568 	statlist_for_each(item, &pool->tested_server_list)
1569 		func(container_of(item, PgSocket, head));
1570 
1571 	statlist_for_each(item, &pool->active_server_list)
1572 		func(container_of(item, PgSocket, head));
1573 
1574 	statlist_for_each(item, &pool->new_server_list)
1575 		func(container_of(item, PgSocket, head));
1576 }
1577 
for_each_server_filtered(PgPool * pool,void (* func)(PgSocket * sk),bool (* filter)(PgSocket * sk,void * arg),void * filter_arg)1578 static void for_each_server_filtered(PgPool *pool, void (*func)(PgSocket *sk), bool (*filter)(PgSocket *sk, void *arg), void *filter_arg)
1579 {
1580 	struct List *item;
1581 	PgSocket *sk;
1582 
1583 	statlist_for_each(item, &pool->idle_server_list) {
1584 		sk = container_of(item, PgSocket, head);
1585 		if (filter(sk, filter_arg))
1586 			func(sk);
1587 	}
1588 
1589 	statlist_for_each(item, &pool->used_server_list) {
1590 		sk = container_of(item, PgSocket, head);
1591 		if (filter(sk, filter_arg))
1592 			func(sk);
1593 	}
1594 
1595 	statlist_for_each(item, &pool->tested_server_list) {
1596 		sk = container_of(item, PgSocket, head);
1597 		if (filter(sk, filter_arg))
1598 			func(sk);
1599 	}
1600 
1601 	statlist_for_each(item, &pool->active_server_list) {
1602 		sk = container_of(item, PgSocket, head);
1603 		if (filter(sk, filter_arg))
1604 			func(sk);
1605 	}
1606 
1607 	statlist_for_each(item, &pool->new_server_list) {
1608 		sk = container_of(item, PgSocket, head);
1609 		if (filter(sk, filter_arg))
1610 			func(sk);
1611 	}
1612 }
1613 
1614 
/* Mark a server socket so that it gets closed (sets close_needed). */
static void tag_dirty(PgSocket *sock)
{
	sock->close_needed = true;
}
1619 
/*
 * Invalidate a pool after its connection settings changed.
 *
 * Drops the cached welcome message, marks every server with close_needed
 * and immediately disconnects servers that are still logging in.
 * The admin pool is left untouched.
 */
void tag_pool_dirty(PgPool *pool)
{
	struct List *it, *next;

	/*
	 * Don't tag the admin pool as dirty, since this is not an actual postgres
	 * server. Marking it as dirty breaks connecting to the pgbouncer admin
	 * database on future connections.
	 */
	if (pool->db->admin)
		return;

	/* forget the cached server welcome message */
	if (pool->welcome_msg != NULL) {
		pktbuf_free(pool->welcome_msg);
		pool->welcome_msg = NULL;
	}
	pool->welcome_msg_ready = false;

	/* established servers are closed as soon as possible */
	for_each_server(pool, tag_dirty);

	/* servers still in login are dropped right away; safe iteration since they get unlinked */
	statlist_for_each_safe(it, &pool->new_server_list, next) {
		disconnect_server(container_of(it, PgSocket, head), true, "connect string changed");
	}
}
1649 
/* Apply tag_pool_dirty() to every pool that belongs to db. */
void tag_database_dirty(PgDatabase *db)
{
	struct List *it;

	statlist_for_each(it, &pool_list) {
		PgPool *p = container_of(it, PgPool, head);
		if (p->db != db)
			continue;
		tag_pool_dirty(p);
	}
}
1661 
tag_autodb_dirty(void)1662 void tag_autodb_dirty(void)
1663 {
1664 	struct List *item, *tmp;
1665 	PgDatabase *db;
1666 	PgPool *pool;
1667 
1668 	/*
1669 	 * reload databases.
1670 	 */
1671 	statlist_for_each(item, &database_list) {
1672 		db = container_of(item, PgDatabase, head);
1673 		if (db->db_auto)
1674 			register_auto_database(db->name);
1675 	}
1676 	statlist_for_each_safe(item, &autodatabase_idle_list, tmp) {
1677 		db = container_of(item, PgDatabase, head);
1678 		if (db->db_auto)
1679 			register_auto_database(db->name);
1680 	}
1681 	/*
1682 	 * reload pools
1683 	 */
1684 	statlist_for_each(item, &pool_list) {
1685 		pool = container_of(item, PgPool, head);
1686 		if (pool->db->db_auto)
1687 			tag_pool_dirty(pool);
1688 	}
1689 }
1690 
/* for_each_server_filtered() predicate: true when sk's remote_addr equals the PgAddr passed in arg. */
static bool server_remote_addr_filter(PgSocket *sk, void *arg)
{
	PgAddr *wanted = arg;

	return pga_cmp_addr(&sk->remote_addr, wanted) == 0;
}
1696 
tag_host_addr_dirty(const char * host,const struct sockaddr * sa)1697 void tag_host_addr_dirty(const char *host, const struct sockaddr *sa)
1698 {
1699 	struct List *item;
1700 	PgPool *pool;
1701 	PgAddr addr;
1702 
1703 	memset(&addr, 0, sizeof(addr));
1704 	pga_copy(&addr, sa);
1705 
1706 	statlist_for_each(item, &pool_list) {
1707 		pool = container_of(item, PgPool, head);
1708 		if (pool->db->host && strcmp(host, pool->db->host) == 0) {
1709 			for_each_server_filtered(pool, tag_dirty, server_remote_addr_filter, &addr);
1710 		}
1711 	}
1712 }
1713 
1714 
1715 /* move objects from justfree_* to free_* lists */
reuse_just_freed_objects(void)1716 void reuse_just_freed_objects(void)
1717 {
1718 	struct List *tmp, *item;
1719 	PgSocket *sk;
1720 	bool close_works = true;
1721 
1722 	/*
1723 	 * event_del() may fail because of ENOMEM for event handlers
1724 	 * that need only changes sent to kernel on each loop.
1725 	 *
1726 	 * Keep open sbufs in justfree lists until successful.
1727 	 */
1728 
1729 	statlist_for_each_safe(item, &justfree_client_list, tmp) {
1730 		sk = container_of(item, PgSocket, head);
1731 		if (sbuf_is_closed(&sk->sbuf)) {
1732 			change_client_state(sk, CL_FREE);
1733 		} else if (close_works) {
1734 			close_works = sbuf_close(&sk->sbuf);
1735 		}
1736 	}
1737 	statlist_for_each_safe(item, &justfree_server_list, tmp) {
1738 		sk = container_of(item, PgSocket, head);
1739 		if (sbuf_is_closed(&sk->sbuf)) {
1740 			change_server_state(sk, SV_FREE);
1741 		} else if (close_works) {
1742 			close_works = sbuf_close(&sk->sbuf);
1743 		}
1744 	}
1745 }
1746 
objects_cleanup(void)1747 void objects_cleanup(void)
1748 {
1749 	struct List *item, *tmp;
1750 	PgDatabase *db;
1751 
1752 	/* close can be postpones, just in case call twice */
1753 	reuse_just_freed_objects();
1754 	reuse_just_freed_objects();
1755 
1756 	statlist_for_each_safe(item, &autodatabase_idle_list, tmp) {
1757 		db = container_of(item, PgDatabase, head);
1758 		kill_database(db);
1759 	}
1760 	statlist_for_each_safe(item, &database_list, tmp) {
1761 		db = container_of(item, PgDatabase, head);
1762 		kill_database(db);
1763 	}
1764 
1765 	memset(&login_client_list, 0, sizeof login_client_list);
1766 	memset(&user_list, 0, sizeof user_list);
1767 	memset(&database_list, 0, sizeof database_list);
1768 	memset(&pool_list, 0, sizeof pool_list);
1769 	memset(&user_tree, 0, sizeof user_tree);
1770 	memset(&autodatabase_idle_list, 0, sizeof autodatabase_idle_list);
1771 
1772 	slab_destroy(server_cache);
1773 	server_cache = NULL;
1774 	slab_destroy(client_cache);
1775 	client_cache = NULL;
1776 	slab_destroy(db_cache);
1777 	db_cache = NULL;
1778 	slab_destroy(pool_cache);
1779 	pool_cache = NULL;
1780 	slab_destroy(user_cache);
1781 	user_cache = NULL;
1782 	slab_destroy(iobuf_cache);
1783 	iobuf_cache = NULL;
1784 }
1785