/*
 * PgBouncer - Lightweight connection pooler for PostgreSQL.
 *
 * Copyright (c) 2007-2009  Marko Kreen, Skype Technologies OÜ
 *
 * Permission to use, copy, modify, and/or distribute this software for any
 * purpose with or without fee is hereby granted, provided that the above
 * copyright notice and this permission notice appear in all copies.
 *
 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
 */

/*
 * Handling of server connections
 */

#include "bouncer.h"

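/*
 * Store a ParameterStatus ('S') value from the server: cache it in the
 * server's varcache, mirror it to the linked client (if any), and during
 * the login phase also record it in the pool's welcome message so it can
 * be replayed to later clients.
 */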
static bool load_parameter(PgSocket *server, PktHdr *pkt, bool startup)
{
	const char *key, *val;
	PgSocket *client = server->link;

	/*
	 * Want to see complete packet.  That means SMALL_PKT
	 * in sbuf.c must be larger than max param pkt.
	 */
	if (incomplete_pkt(pkt))
		return false;

	if (!mbuf_get_string(&pkt->data, &key))
		goto failed;
	if (!mbuf_get_string(&pkt->data, &val))
		goto failed;
	slog_debug(server, "S: param: %s = %s", key, val);

	varcache_set(&server->vars, key, val);

	if (client) {
		slog_debug(client, "setting client var: %s='%s'", key, val);
		varcache_set(&client->vars, key, val);
	}

	if (startup) {
		if (!add_welcome_parameter(server->pool, key, val))
			goto failed_store;
	}

	return true;
failed:
	disconnect_server(server, true, "broken ParameterStatus packet");
	return false;
failed_store:
	disconnect_server(server, true, "failed to store ParameterStatus");
	return false;
}

/* we cannot log in at all, notify clients */
void kill_pool_logins(PgPool *pool, const char *msg)
{
	struct List *item, *tmp;
	PgSocket *client;

	statlist_for_each_safe(item, &pool->waiting_client_list, tmp) {
		client = container_of(item, PgSocket, head);
		if (!client->wait_for_welcome)
			continue;

		disconnect_client(client, true, "%s", msg);
	}
}

/* we cannot log in at all, notify clients with server error */
static void kill_pool_logins_server_error(PgPool *pool, PktHdr *errpkt)
{
	const char *level, *msg;

	parse_server_error(errpkt, &level, &msg);
	log_warning("server login failed: %s %s", level, msg);
	kill_pool_logins(pool, msg);
}

/* process packets on server auth phase */
static bool handle_server_startup(PgSocket *server, PktHdr *pkt)
{
	SBuf *sbuf = &server->sbuf;
	bool res = false;
	const uint8_t *ckey;

	if (incomplete_pkt(pkt)) {
		disconnect_server(server, true, "partial pkt in login phase");
		return false;
	}

	/* ignore most of what happens during connect_query */
	if (server->exec_on_connect) {
		switch (pkt->type) {
		case 'Z':
		case 'S':	/* handle them below */
			break;

		case 'E':	/* log & ignore errors */
			log_server_error("S: error while executing exec_on_query", pkt);
			/* fallthrough */
		default:	/* ignore rest */
			sbuf_prepare_skip(sbuf, pkt->len);
			return true;
		}
	}

	switch (pkt->type) {
	default:
		slog_error(server, "unknown pkt from server: '%c'", pkt_desc(pkt));
		disconnect_server(server, true, "unknown pkt from server");
		break;

	case 'E':		/* ErrorResponse */
		if (!server->pool->welcome_msg_ready)
			kill_pool_logins_server_error(server->pool, pkt);
		else
			log_server_error("S: login failed", pkt);

		disconnect_server(server, true, "login failed");
		break;

	/* packets that need closer look */
	case 'R':		/* AuthenticationXXX */
		slog_debug(server, "calling login_answer");
		res = answer_authreq(server, pkt);
		if (!res)
			disconnect_server(server, false, "failed to answer authreq");
		break;

	case 'S':		/* ParameterStatus */
		res = load_parameter(server, pkt, true);
		break;

	case 'Z':		/* ReadyForQuery */
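		/*
		 * First ReadyForQuery after login: if the database has a
		 * connect_query configured (for example, a [databases] line
		 * such as "mydb = host=127.0.0.1 connect_query='SELECT 1'"),
		 * send it now and wait for its own ReadyForQuery before
		 * declaring the server ready.
		 */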
		if (server->exec_on_connect) {
			server->exec_on_connect = false;
			/* deliberately ignore transaction status */
		} else if (server->pool->db->connect_query) {
			server->exec_on_connect = true;
			slog_debug(server, "server connect ok, send exec_on_connect");
			SEND_generic(res, server, 'Q', "s", server->pool->db->connect_query);
			if (!res)
				disconnect_server(server, false, "exec_on_connect query failed");
			break;
		}

		/* login ok */
		slog_debug(server, "server login ok, start accepting queries");
		server->ready = true;

		/* got all params */
		finish_welcome_msg(server);

		/* need to notify sbuf if server was closed */
		res = release_server(server);

		/* let the takeover process handle it */
		if (res && server->pool->db->admin)
			res = takeover_login(server);
		break;

	/* ignorable packets */
	case 'K':		/* BackendKeyData */
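		/*
		 * Remember the backend's cancel key; it is needed later to
		 * build the CancelRequest that gets forwarded to this
		 * PostgreSQL server.
		 */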
		if (!mbuf_get_bytes(&pkt->data, BACKENDKEY_LEN, &ckey)) {
			disconnect_server(server, true, "bad cancel key");
			return false;
		}
		memcpy(server->cancel_key, ckey, BACKENDKEY_LEN);
		res = true;
		break;

	case 'N':		/* NoticeResponse */
		slog_noise(server, "skipping pkt: %c", pkt_desc(pkt));
		res = true;
		break;
	}

	if (res)
		sbuf_prepare_skip(sbuf, pkt->len);

	return res;
}

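/*
 * Effective pool mode for a pool: the per-user setting wins, then the
 * per-database setting, then the global pool_mode; POOL_INHERIT means
 * "fall through to the next level".
 */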
int pool_pool_mode(PgPool *pool)
{
	int pool_mode = pool->user->pool_mode;
	if (pool_mode == POOL_INHERIT)
		pool_mode = pool->db->pool_mode;
	if (pool_mode == POOL_INHERIT)
		pool_mode = cf_pool_mode;
	return pool_mode;
}

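/* effective pool size: per-database value, or default_pool_size when unset */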
int pool_pool_size(PgPool *pool)
{
	if (pool->db->pool_size < 0)
		return cf_default_pool_size;
	else
		return pool->db->pool_size;
}

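/* effective minimum pool size: per-database value, or the global min_pool_size when unset */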
int pool_min_pool_size(PgPool *pool)
{
	if (pool->db->min_pool_size < 0)
		return cf_min_pool_size;
	else
		return pool->db->min_pool_size;
}

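/* effective reserve pool size: per-database value, or the global setting when unset */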
int pool_res_pool_size(PgPool *pool)
{
	if (pool->db->res_pool_size < 0)
		return cf_res_pool_size;
	else
		return pool->db->res_pool_size;
}

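/* per-database connection limit, falling back to the global max_db_connections */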
int database_max_connections(PgDatabase *db)
{
	if (db->max_db_connections <= 0) {
		return cf_max_db_connections;
	} else {
		return db->max_db_connections;
	}
}

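/* per-user connection limit, falling back to the global max_user_connections */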
int user_max_connections(PgUser *user)
{
	if (user->max_user_connections <= 0) {
		return cf_max_user_connections;
	} else {
		return user->max_user_connections;
	}
}

/* process packets on logged in connection */
static bool handle_server_work(PgSocket *server, PktHdr *pkt)
{
	bool ready = false;
	bool idle_tx = false;
	char state;
	SBuf *sbuf = &server->sbuf;
	PgSocket *client = server->link;
	bool async_response = false;

	Assert(!server->pool->db->admin);

	switch (pkt->type) {
	default:
		slog_error(server, "unknown pkt: '%c'", pkt_desc(pkt));
		disconnect_server(server, true, "unknown pkt");
		return false;

	/* pooling decisions will be based on this packet */
	case 'Z':		/* ReadyForQuery */

		/* if partial pkt, wait */
		if (!mbuf_get_char(&pkt->data, &state))
			return false;

		/* set ready only if no tx */
		if (state == 'I')
			ready = true;
		else if (pool_pool_mode(server->pool) == POOL_STMT) {
			disconnect_server(server, true, "transaction blocks not allowed in statement pooling mode");
			return false;
		} else if (state == 'T' || state == 'E') {
			idle_tx = true;
		}

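		/*
		 * expect_rfq_count is (roughly) the number of ReadyForQuery
		 * packets the linked client is still owed, maintained by the
		 * client-side packet handler for each Query/Sync it sends.
		 * An unexpected ReadyForQuery on an active link is only
		 * logged for debugging.
		 */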
		if (client && !server->setting_vars) {
			if (client->expect_rfq_count > 0) {
				client->expect_rfq_count--;
			} else if (server->state == SV_ACTIVE) {
				slog_debug(client, "unexpected ReadyForQuery - expect_rfq_count=%d", client->expect_rfq_count);
			}
		}
		break;

	case 'S':		/* ParameterStatus */
		if (!load_parameter(server, pkt, false))
			return false;
		break;

	/*
	 * 'E' and 'N' packets currently set ->ready to false.  The correct
	 * behaviour would be to leave ->ready as-is, because the overall TX
	 * state stays the same.  It matters for connections in IDLE or USED
	 * state, which suddenly get marked dirty although they are still
	 * usable.
	 *
	 * But an 'E' or 'N' packet between transactions probably signifies a
	 * dying backend.  It is better to tag the server as dirty and drop
	 * it later.
	 */
	case 'E':		/* ErrorResponse */
		if (server->setting_vars) {
			/*
			 * The SET and the user query will be in different
			 * transactions, so we cannot report the SET error
			 * to the user.
			 */
			log_server_error("varcache_apply failed", pkt);

			/*
			 * The client probably gave invalid values in the
			 * startup pkt.
			 *
			 * No reason to keep such a client.
			 */
			disconnect_server(server, true, "invalid server parameter");
			return false;
		}
		/* fallthrough */
	case 'C':		/* CommandComplete */

		/* ErrorResponse and CommandComplete show end of copy mode */
		if (server->copy_mode) {
			server->copy_mode = false;

			/* it's impossible to track sync count over copy */
			if (client)
				client->expect_rfq_count = 0;
		}
		break;

	case 'N':		/* NoticeResponse */
		break;

	/* reply to LISTEN, don't change connection state */
	case 'A':		/* NotificationResponse */
		idle_tx = server->idle_tx;
		ready = server->ready;
		async_response = true;
		break;

	/* copy mode */
	case 'G':		/* CopyInResponse */
	case 'H':		/* CopyOutResponse */
		server->copy_mode = true;
		break;
	/* chat packets */
	case '2':		/* BindComplete */
	case '3':		/* CloseComplete */
	case 'c':		/* CopyDone(F/B) */
	case 'f':		/* CopyFail(F/B) */
	case 'I':		/* EmptyQueryResponse == CommandComplete */
	case 'V':		/* FunctionCallResponse */
	case 'n':		/* NoData */
	case '1':		/* ParseComplete */
	case 's':		/* PortalSuspended */

	/* data packets, there will be more coming */
	case 'd':		/* CopyData(F/B) */
	case 'D':		/* DataRow */
	case 't':		/* ParameterDescription */
	case 'T':		/* RowDescription */
		break;
	}
	server->idle_tx = idle_tx;
	server->ready = ready;
	server->pool->stats.server_bytes += pkt->len;

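	/*
	 * While PgBouncer itself is applying cached variables to this
	 * server, the responses belong to PgBouncer rather than the client,
	 * so they are consumed silently.
	 */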
	if (server->setting_vars) {
		Assert(client);
		sbuf_prepare_skip(sbuf, pkt->len);
	} else if (client) {
		if (client->state == CL_LOGIN) {
			return handle_auth_query_response(client, pkt);
		} else {
			sbuf_prepare_send(sbuf, &client->sbuf, pkt->len);

			/*
			 * Compute query and transaction times
			 *
			 * For pipelined overlapping commands, we wait until
			 * the last command is done (expect_rfq_count==0).
			 * That means, we count the time that PgBouncer is
			 * occupied in a query or transaction, not the total
			 * time that all queries/transactions take
			 * individually.  For that, we would have to track the
			 * start time of each query separately in a queue or
			 * similar, not only per client.
			 */
			if (client->expect_rfq_count == 0) {
				/* every statement (independent or in a transaction) counts as a query */
				if (ready || idle_tx) {
					if (client->query_start) {
						usec_t total;
						total = get_cached_time() - client->query_start;
						client->query_start = 0;
						server->pool->stats.query_time += total;
						slog_debug(client, "query time: %d us", (int)total);
					} else if (!async_response) {
						slog_warning(client, "FIXME: query end, but query_start == 0");
					}
				}

				/* statement ending in "idle" ends a transaction */
				if (ready) {
					if (client->xact_start) {
						usec_t total;
						total = get_cached_time() - client->xact_start;
						client->xact_start = 0;
						server->pool->stats.xact_time += total;
						slog_debug(client, "transaction time: %d us", (int)total);
					} else if (!async_response) {
						/* XXX This happens during takeover if the new process
						 * continues a transaction. */
						slog_warning(client, "FIXME: transaction end, but xact_start == 0");
					}
				}
			}
		}
	} else {
		if (server->state != SV_TESTED)
			slog_warning(server,
				     "got packet '%c' from server when not linked",
				     pkt_desc(pkt));
		sbuf_prepare_skip(sbuf, pkt->len);
	}

	return true;
}

/* got connection, decide what to do */
static bool handle_connect(PgSocket *server)
{
	bool res = false;
	PgPool *pool = server->pool;
	char buf[PGADDR_BUF + 32];
	bool is_unix = pga_is_unix(&server->remote_addr);

	fill_local_addr(server, sbuf_socket(&server->sbuf), is_unix);

	if (cf_log_connections) {
		if (pga_is_unix(&server->remote_addr))
			slog_info(server, "new connection to server");
		else
			slog_info(server, "new connection to server (from %s)",
				  pga_str(&server->local_addr, buf, sizeof(buf)));
	}

	if (!statlist_empty(&pool->cancel_req_list)) {
		slog_debug(server, "use it for pending cancel req");
		/* if pending cancel req, send it */
		forward_cancel_request(server);
		/* notify disconnect_server() that connect did not fail */
		server->ready = true;
		disconnect_server(server, false, "sent cancel req");
	} else {
		/* proceed with login */
		if (server_connect_sslmode > SSLMODE_DISABLED && !is_unix) {
			slog_noise(server, "P: SSL request");
			res = send_sslreq_packet(server);
			if (res)
				server->wait_sslchar = true;
		} else {
			slog_noise(server, "P: startup");
			res = send_startup_packet(server);
		}
		if (!res)
			disconnect_server(server, false, "startup pkt failed");
	}
	return res;
}

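/*
 * Handle the one-byte reply to our SSLRequest: 'S' means the server agreed
 * to TLS and the handshake is started, 'N' means it refused.  A refusal is
 * fatal if sslmode requires TLS, otherwise we fall back to a plain startup
 * packet.
 */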
static bool handle_sslchar(PgSocket *server, struct MBuf *data)
{
	uint8_t schar = '?';
	bool ok;

	server->wait_sslchar = false;

	ok = mbuf_get_byte(data, &schar);
	if (!ok || (schar != 'S' && schar != 'N') || mbuf_avail_for_read(data) != 0) {
		disconnect_server(server, false, "bad sslreq answer");
		return false;
	}

	if (schar == 'S') {
		slog_noise(server, "launching tls");
		ok = sbuf_tls_connect(&server->sbuf, server->pool->db->host);
	} else if (server_connect_sslmode >= SSLMODE_REQUIRE) {
		disconnect_server(server, false, "server refused SSL");
		return false;
	} else {
		/* proceed with non-TLS connection */
		ok = send_startup_packet(server);
	}

	if (ok) {
		sbuf_prepare_skip(&server->sbuf, 1);
	} else {
		disconnect_server(server, false, "sslreq processing failed");
	}
	return ok;
}

/* callback from SBuf */
bool server_proto(SBuf *sbuf, SBufEvent evtype, struct MBuf *data)
{
	bool res = false;
	PgSocket *server = container_of(sbuf, PgSocket, sbuf);
	PgPool *pool = server->pool;
	PktHdr pkt;
	char infobuf[96];

	Assert(is_server_socket(server));
	Assert(server->state != SV_FREE);

	/* may happen if close failed */
	if (server->state == SV_JUSTFREE)
		return false;

	switch (evtype) {
	case SBUF_EV_RECV_FAILED:
		disconnect_server(server, false, "server conn crashed?");
		break;
	case SBUF_EV_SEND_FAILED:
		disconnect_client(server->link, false, "unexpected eof");
		break;
	case SBUF_EV_READ:
		if (server->wait_sslchar) {
			res = handle_sslchar(server, data);
			break;
		}
		if (incomplete_header(data)) {
			slog_noise(server, "S: got partial header, trying to wait a bit");
			break;
		}

		/* parse pkt header */
		if (!get_header(data, &pkt)) {
			disconnect_server(server, true, "bad pkt header");
			break;
		}
		slog_noise(server, "read pkt='%c', len=%d", pkt_desc(&pkt), pkt.len);

		server->request_time = get_cached_time();
		switch (server->state) {
		case SV_LOGIN:
			res = handle_server_startup(server, &pkt);
			break;
		case SV_TESTED:
		case SV_USED:
		case SV_ACTIVE:
		case SV_IDLE:
			res = handle_server_work(server, &pkt);
			break;
		default:
			fatal("server_proto: server in bad state: %d", server->state);
		}
		break;
	case SBUF_EV_CONNECT_FAILED:
		Assert(server->state == SV_LOGIN);
		disconnect_server(server, false, "connect failed");
		break;
	case SBUF_EV_CONNECT_OK:
		slog_debug(server, "S: connect ok");
		Assert(server->state == SV_LOGIN);
		server->request_time = get_cached_time();
		res = handle_connect(server);
		break;
	case SBUF_EV_FLUSH:
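		/*
		 * Outgoing data has been fully flushed.  Decide whether the
		 * server can be released back to the pool; in session
		 * pooling it normally stays linked unless it was just
		 * tested or is being reset.
		 */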
		res = true;
		if (!server->ready)
			break;

		if (server->setting_vars) {
			PgSocket *client = server->link;
			Assert(client);

			server->setting_vars = false;
			sbuf_continue(&client->sbuf);
			break;
		}

		if (pool_pool_mode(pool) != POOL_SESSION || server->state == SV_TESTED || server->resetting) {
			server->resetting = false;
			switch (server->state) {
			case SV_ACTIVE:
			case SV_TESTED:
				/* keep link if client expects more Syncs */
				if (server->link) {
					if (server->link->expect_rfq_count > 0)
						break;
				}

				/* retval does not matter here */
				release_server(server);
				break;
			default:
				slog_warning(server, "EV_FLUSH with state=%d", server->state);
			case SV_IDLE:
				break;
			}
		}
		break;
	case SBUF_EV_PKT_CALLBACK:
		slog_warning(server, "SBUF_EV_PKT_CALLBACK with state=%d", server->state);
		break;
	case SBUF_EV_TLS_READY:
		Assert(server->state == SV_LOGIN);

		tls_get_connection_info(server->sbuf.tls, infobuf, sizeof infobuf);
		if (cf_log_connections) {
			slog_info(server, "SSL established: %s", infobuf);
		} else {
			slog_noise(server, "SSL established: %s", infobuf);
		}

		server->request_time = get_cached_time();
		res = send_startup_packet(server);
		if (res)
			sbuf_continue(&server->sbuf);
		else
			disconnect_server(server, false, "TLS startup failed");
		break;
	}
	if (!res && pool->db->admin)
		takeover_login_failed();
	return res;
}