1 /*
2 * PgBouncer - Lightweight connection pooler for PostgreSQL.
3 *
4 * Copyright (c) 2007-2009 Marko Kreen, Skype Technologies OÜ
5 *
6 * Permission to use, copy, modify, and/or distribute this software for any
7 * purpose with or without fee is hereby granted, provided that the above
8 * copyright notice and this permission notice appear in all copies.
9 *
10 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17 */
18
19 /*
20 * Handling of server connections
21 */
22
23 #include "bouncer.h"
24
/*
 * Absorb one ParameterStatus ('S') packet: remember the value in the
 * server's var cache, mirror it into the linked client (if any), and
 * during login also store it for the cached welcome message.
 *
 * Returns false (after disconnecting the server) on a broken packet
 * or a failed welcome-parameter store; returns false without
 * disconnect when the packet is not yet complete.
 */
static bool load_parameter(PgSocket *server, PktHdr *pkt, bool startup)
{
	const char *name, *value;
	PgSocket *client = server->link;

	/*
	 * Want to see complete packet.  That means SMALL_PKT
	 * in sbuf.c must be larger than max param pkt.
	 */
	if (incomplete_pkt(pkt))
		return false;

	/* packet payload is two NUL-terminated strings: name, value */
	if (!mbuf_get_string(&pkt->data, &name)
	    || !mbuf_get_string(&pkt->data, &value)) {
		disconnect_server(server, true, "broken ParameterStatus packet");
		return false;
	}

	slog_debug(server, "S: param: %s = %s", name, value);

	varcache_set(&server->vars, name, value);

	if (client != NULL) {
		slog_debug(client, "setting client var: %s='%s'", name, value);
		varcache_set(&client->vars, name, value);
	}

	if (startup && !add_welcome_parameter(server->pool, name, value)) {
		disconnect_server(server, true, "failed to store ParameterStatus");
		return false;
	}

	return true;
}
63
64 /* we cannot log in at all, notify clients */
kill_pool_logins(PgPool * pool,const char * msg)65 void kill_pool_logins(PgPool *pool, const char *msg)
66 {
67 struct List *item, *tmp;
68 PgSocket *client;
69
70 statlist_for_each_safe(item, &pool->waiting_client_list, tmp) {
71 client = container_of(item, PgSocket, head);
72 if (!client->wait_for_welcome)
73 continue;
74
75 disconnect_client(client, true, "%s", msg);
76 }
77 }
78
79 /* we cannot log in at all, notify clients with server error */
kill_pool_logins_server_error(PgPool * pool,PktHdr * errpkt)80 static void kill_pool_logins_server_error(PgPool *pool, PktHdr *errpkt)
81 {
82 const char *level, *msg;
83
84 parse_server_error(errpkt, &level, &msg);
85 log_warning("server login failed: %s %s", level, msg);
86 kill_pool_logins(pool, msg);
87 }
88
/* process packets on server auth phase */
/*
 * Drive the login-phase (SV_LOGIN) protocol with the server:
 * authentication requests, ParameterStatus collection for the welcome
 * message, BackendKeyData capture, and the final ReadyForQuery that
 * marks the connection usable.
 *
 * Returns true when the packet was handled (and queued to be skipped),
 * false when the server connection was dropped or must wait.
 */
static bool handle_server_startup(PgSocket *server, PktHdr *pkt)
{
	SBuf *sbuf = &server->sbuf;
	bool res = false;
	const uint8_t *ckey;

	/* login-phase packets are expected to arrive whole (see SMALL_PKT) */
	if (incomplete_pkt(pkt)) {
		disconnect_server(server, true, "partial pkt in login phase");
		return false;
	}

	/* ignore most that happens during connect_query */
	if (server->exec_on_connect) {
		switch (pkt->type) {
		case 'Z':
		case 'S':	/* handle them below */
			break;

		case 'E':	/* log & ignore errors */
			log_server_error("S: error while executing exec_on_query", pkt);
		/* fallthrough */
		default:	/* ignore rest */
			sbuf_prepare_skip(sbuf, pkt->len);
			return true;
		}
	}

	switch (pkt->type) {
	default:
		slog_error(server, "unknown pkt from server: '%c'", pkt_desc(pkt));
		disconnect_server(server, true, "unknown pkt from server");
		break;

	case 'E':	/* ErrorResponse */
		/*
		 * If no server login has ever succeeded for this pool,
		 * the error is fatal for all waiting clients too.
		 */
		if (!server->pool->welcome_msg_ready)
			kill_pool_logins_server_error(server->pool, pkt);
		else
			log_server_error("S: login failed", pkt);

		disconnect_server(server, true, "login failed");
		break;

	/* packets that need closer look */
	case 'R':	/* AuthenticationXXX */
		slog_debug(server, "calling login_answer");
		res = answer_authreq(server, pkt);
		if (!res)
			disconnect_server(server, false, "failed to answer authreq");
		break;

	case 'S':	/* ParameterStatus */
		res = load_parameter(server, pkt, true);
		break;

	case 'Z':	/* ReadyForQuery */
		if (server->exec_on_connect) {
			server->exec_on_connect = false;
			/* deliberately ignore transaction status */
		} else if (server->pool->db->connect_query) {
			/* run the configured connect_query before releasing */
			server->exec_on_connect = true;
			slog_debug(server, "server connect ok, send exec_on_connect");
			SEND_generic(res, server, 'Q', "s", server->pool->db->connect_query);
			if (!res)
				disconnect_server(server, false, "exec_on_connect query failed");
			break;
		}

		/* login ok */
		slog_debug(server, "server login ok, start accepting queries");
		server->ready = true;

		/* got all params */
		finish_welcome_msg(server);

		/* need to notify sbuf if server was closed */
		res = release_server(server);

		/* let the takeover process handle it */
		if (res && server->pool->db->admin)
			res = takeover_login(server);
		break;

	/* ignorable packets */
	case 'K':	/* BackendKeyData */
		if (!mbuf_get_bytes(&pkt->data, BACKENDKEY_LEN, &ckey)) {
			disconnect_server(server, true, "bad cancel key");
			return false;
		}
		/* stored so client CancelRequests can be forwarded later */
		memcpy(server->cancel_key, ckey, BACKENDKEY_LEN);
		res = true;
		break;

	case 'N':	/* NoticeResponse */
		slog_noise(server, "skipping pkt: %c", pkt_desc(pkt));
		res = true;
		break;
	}

	/* consumed packets are skipped, never forwarded during login */
	if (res)
		sbuf_prepare_skip(sbuf, pkt->len);

	return res;
}
193
/*
 * Effective pooling mode for a pool: the per-user setting wins,
 * then the per-database setting, then the global default.
 */
int pool_pool_mode(PgPool *pool)
{
	int mode = pool->user->pool_mode;

	if (mode == POOL_INHERIT)
		mode = pool->db->pool_mode;
	if (mode == POOL_INHERIT)
		mode = cf_pool_mode;
	return mode;
}
203
/* effective pool size: global default when the db has none (< 0) */
int pool_pool_size(PgPool *pool)
{
	return (pool->db->pool_size < 0)
		? cf_default_pool_size
		: pool->db->pool_size;
}
211
/* effective minimum pool size: global default when unset (< 0) */
int pool_min_pool_size(PgPool *pool)
{
	return (pool->db->min_pool_size < 0)
		? cf_min_pool_size
		: pool->db->min_pool_size;
}
219
/* effective reserve pool size: global default when unset (< 0) */
int pool_res_pool_size(PgPool *pool)
{
	return (pool->db->res_pool_size < 0)
		? cf_res_pool_size
		: pool->db->res_pool_size;
}
227
/* per-database connection limit; global default when unset (<= 0) */
int database_max_connections(PgDatabase *db)
{
	return (db->max_db_connections <= 0)
		? cf_max_db_connections
		: db->max_db_connections;
}
236
/* per-user connection limit; global default when unset (<= 0) */
int user_max_connections(PgUser *user)
{
	return (user->max_user_connections <= 0)
		? cf_max_user_connections
		: user->max_user_connections;
}
245
/* process packets on logged in connection */
/*
 * Handle one packet from a logged-in server connection: classify it,
 * update pooling state (->ready / ->idle_tx / ->copy_mode), account
 * statistics, and either forward it to the linked client or skip it.
 *
 * Returns false when the server connection was dropped or the packet
 * is incomplete; true when the packet was queued for forward/skip.
 */
static bool handle_server_work(PgSocket *server, PktHdr *pkt)
{
	bool ready = false;
	bool idle_tx = false;
	char state;
	SBuf *sbuf = &server->sbuf;
	PgSocket *client = server->link;
	bool async_response = false;

	Assert(!server->pool->db->admin);

	switch (pkt->type) {
	default:
		slog_error(server, "unknown pkt: '%c'", pkt_desc(pkt));
		disconnect_server(server, true, "unknown pkt");
		return false;

	/* pooling decisions will be based on this packet */
	case 'Z':		/* ReadyForQuery */

		/* if partial pkt, wait */
		if (!mbuf_get_char(&pkt->data, &state))
			return false;

		/* set ready only if no tx ('I' = idle, 'T' = in tx, 'E' = failed tx) */
		if (state == 'I')
			ready = true;
		else if (pool_pool_mode(server->pool) == POOL_STMT) {
			disconnect_server(server, true, "transaction blocks not allowed in statement pooling mode");
			return false;
		} else if (state == 'T' || state == 'E') {
			idle_tx = true;
		}

		/* balance RFQ replies against Syncs the client has sent */
		if (client && !server->setting_vars) {
			if (client->expect_rfq_count > 0) {
				client->expect_rfq_count--;
			} else if (server->state == SV_ACTIVE) {
				slog_debug(client, "unexpected ReadyForQuery - expect_rfq_count=%d", client->expect_rfq_count);
			}
		}
		break;

	case 'S':		/* ParameterStatus */
		if (!load_parameter(server, pkt, false))
			return false;
		break;

	/*
	 * 'E' and 'N' packets currently set ->ready to false.  Correct would
	 * be to leave ->ready as-is, because overall TX state stays same.
	 * It matters for connections in IDLE or USED state which get dirty
	 * suddenly but should not as they are still usable.
	 *
	 * But the 'E' or 'N' packet between transactions signifies probably
	 * dying backend.  It is better to tag server as dirty and drop
	 * it later.
	 */
	case 'E':		/* ErrorResponse */
		if (server->setting_vars) {
			/*
			 * the SET and user query will be different TX
			 * so we cannot report SET error to user.
			 */
			log_server_error("varcache_apply failed", pkt);

			/*
			 * client probably gave invalid values in startup pkt.
			 *
			 * no reason to keep such guys.
			 */
			disconnect_server(server, true, "invalid server parameter");
			return false;
		}
	/* fallthrough */
	case 'C':		/* CommandComplete */

		/* ErrorResponse and CommandComplete show end of copy mode */
		if (server->copy_mode) {
			server->copy_mode = false;

			/* it's impossible to track sync count over copy */
			if (client)
				client->expect_rfq_count = 0;
		}
		break;

	case 'N':		/* NoticeResponse */
		break;

	/* reply to LISTEN, don't change connection state */
	case 'A':		/* NotificationResponse */
		/* keep prior ready/idle_tx; notification is asynchronous */
		idle_tx = server->idle_tx;
		ready = server->ready;
		async_response = true;
		break;

	/* copy mode */
	case 'G':		/* CopyInResponse */
	case 'H':		/* CopyOutResponse */
		server->copy_mode = true;
		break;
	/* chat packets */
	case '2':		/* BindComplete */
	case '3':		/* CloseComplete */
	case 'c':		/* CopyDone(F/B) */
	case 'f':		/* CopyFail(F/B) */
	case 'I':		/* EmptyQueryResponse == CommandComplete */
	case 'V':		/* FunctionCallResponse */
	case 'n':		/* NoData */
	case '1':		/* ParseComplete */
	case 's':		/* PortalSuspended */

	/* data packets, there will be more coming */
	case 'd':		/* CopyData(F/B) */
	case 'D':		/* DataRow */
	case 't':		/* ParameterDescription */
	case 'T':		/* RowDescription */
		break;
	}
	/* publish the state derived above and account traffic */
	server->idle_tx = idle_tx;
	server->ready = ready;
	server->pool->stats.server_bytes += pkt->len;

	if (server->setting_vars) {
		/* reply to pgbouncer's own SET; never forwarded to client */
		Assert(client);
		sbuf_prepare_skip(sbuf, pkt->len);
	} else if (client) {
		if (client->state == CL_LOGIN) {
			/* reply to auth_query issued on behalf of a logging-in client */
			return handle_auth_query_response(client, pkt);
		} else {
			sbuf_prepare_send(sbuf, &client->sbuf, pkt->len);

			/*
			 * Compute query and transaction times
			 *
			 * For pipelined overlapping commands, we wait until
			 * the last command is done (expect_rfq_count==0).
			 * That means, we count the time that PgBouncer is
			 * occupied in a query or transaction, not the total
			 * time that all queries/transactions take
			 * individually.  For that, we would have to track the
			 * start time of each query separately in a queue or
			 * similar, not only per client.
			 */
			if (client->expect_rfq_count == 0) {
				/* every statement (independent or in a transaction) counts as a query */
				if (ready || idle_tx) {
					if (client->query_start) {
						usec_t total;
						total = get_cached_time() - client->query_start;
						client->query_start = 0;
						server->pool->stats.query_time += total;
						slog_debug(client, "query time: %d us", (int)total);
					} else if (!async_response) {
						slog_warning(client, "FIXME: query end, but query_start == 0");
					}
				}

				/* statement ending in "idle" ends a transaction */
				if (ready) {
					if (client->xact_start) {
						usec_t total;
						total = get_cached_time() - client->xact_start;
						client->xact_start = 0;
						server->pool->stats.xact_time += total;
						slog_debug(client, "transaction time: %d us", (int)total);
					} else if (!async_response) {
						/* XXX This happens during takeover if the new process
						 * continues a transaction. */
						slog_warning(client, "FIXME: transaction end, but xact_start == 0");
					}
				}
			}
		}
	} else {
		/* unlinked server talking on its own; expected only for SV_TESTED */
		if (server->state != SV_TESTED)
			slog_warning(server,
				     "got packet '%c' from server when not linked",
				     pkt_desc(pkt));
		sbuf_prepare_skip(sbuf, pkt->len);
	}

	return true;
}
432
433 /* got connection, decide what to do */
handle_connect(PgSocket * server)434 static bool handle_connect(PgSocket *server)
435 {
436 bool res = false;
437 PgPool *pool = server->pool;
438 char buf[PGADDR_BUF + 32];
439 bool is_unix = pga_is_unix(&server->remote_addr);
440
441 fill_local_addr(server, sbuf_socket(&server->sbuf), is_unix);
442
443 if (cf_log_connections) {
444 if (pga_is_unix(&server->remote_addr))
445 slog_info(server, "new connection to server");
446 else
447 slog_info(server, "new connection to server (from %s)",
448 pga_str(&server->local_addr, buf, sizeof(buf)));
449 }
450
451 if (!statlist_empty(&pool->cancel_req_list)) {
452 slog_debug(server, "use it for pending cancel req");
453 /* if pending cancel req, send it */
454 forward_cancel_request(server);
455 /* notify disconnect_server() that connect did not fail */
456 server->ready = true;
457 disconnect_server(server, false, "sent cancel req");
458 } else {
459 /* proceed with login */
460 if (server_connect_sslmode > SSLMODE_DISABLED && !is_unix) {
461 slog_noise(server, "P: SSL request");
462 res = send_sslreq_packet(server);
463 if (res)
464 server->wait_sslchar = true;
465 } else {
466 slog_noise(server, "P: startup");
467 res = send_startup_packet(server);
468 }
469 if (!res)
470 disconnect_server(server, false, "startup pkt failed");
471 }
472 return res;
473 }
474
/*
 * Process the single-byte answer to our SSLRequest:
 * 'S' - server accepts TLS, start the handshake;
 * 'N' - server refuses TLS, fall back to plain startup unless
 *       sslmode requires encryption.
 * Anything else (or trailing data) is a protocol violation.
 */
static bool handle_sslchar(PgSocket *server, struct MBuf *data)
{
	uint8_t answer = '?';
	bool ok;

	server->wait_sslchar = false;

	ok = mbuf_get_byte(data, &answer);
	if (!ok || (answer != 'S' && answer != 'N') || mbuf_avail_for_read(data) != 0) {
		disconnect_server(server, false, "bad sslreq answer");
		return false;
	}

	if (answer == 'S') {
		slog_noise(server, "launching tls");
		ok = sbuf_tls_connect(&server->sbuf, server->pool->db->host);
	} else if (server_connect_sslmode >= SSLMODE_REQUIRE) {
		disconnect_server(server, false, "server refused SSL");
		return false;
	} else {
		/* proceed with non-TLS connection */
		ok = send_startup_packet(server);
	}

	if (!ok) {
		disconnect_server(server, false, "sslreq processing failed");
		return false;
	}

	/* consume the answer byte */
	sbuf_prepare_skip(&server->sbuf, 1);
	return true;
}
506
/* callback from SBuf */
/*
 * Central event handler for a server-side SBuf: dispatches connect
 * results, TLS readiness, flush completion, and incoming packets to
 * the login-phase or work-phase handlers above.
 *
 * Returns false on failure; for admin-db (takeover) sockets a failure
 * additionally triggers takeover_login_failed().
 */
bool server_proto(SBuf *sbuf, SBufEvent evtype, struct MBuf *data)
{
	bool res = false;
	PgSocket *server = container_of(sbuf, PgSocket, sbuf);
	PgPool *pool = server->pool;
	PktHdr pkt;
	char infobuf[96];

	Assert(is_server_socket(server));
	Assert(server->state != SV_FREE);

	/* may happen if close failed */
	if (server->state == SV_JUSTFREE)
		return false;

	switch (evtype) {
	case SBUF_EV_RECV_FAILED:
		disconnect_server(server, false, "server conn crashed?");
		break;
	case SBUF_EV_SEND_FAILED:
		/* sending to the linked client failed, drop the client */
		disconnect_client(server->link, false, "unexpected eof");
		break;
	case SBUF_EV_READ:
		/* single-byte SSLRequest answer comes before normal packets */
		if (server->wait_sslchar) {
			res = handle_sslchar(server, data);
			break;
		}
		if (incomplete_header(data)) {
			slog_noise(server, "S: got partial header, trying to wait a bit");
			break;
		}

		/* parse pkt header */
		if (!get_header(data, &pkt)) {
			disconnect_server(server, true, "bad pkt header");
			break;
		}
		slog_noise(server, "read pkt='%c', len=%d", pkt_desc(&pkt), pkt.len);

		server->request_time = get_cached_time();
		switch (server->state) {
		case SV_LOGIN:
			res = handle_server_startup(server, &pkt);
			break;
		case SV_TESTED:
		case SV_USED:
		case SV_ACTIVE:
		case SV_IDLE:
			res = handle_server_work(server, &pkt);
			break;
		default:
			fatal("server_proto: server in bad state: %d", server->state);
		}
		break;
	case SBUF_EV_CONNECT_FAILED:
		Assert(server->state == SV_LOGIN);
		disconnect_server(server, false, "connect failed");
		break;
	case SBUF_EV_CONNECT_OK:
		slog_debug(server, "S: connect ok");
		Assert(server->state == SV_LOGIN);
		server->request_time = get_cached_time();
		res = handle_connect(server);
		break;
	case SBUF_EV_FLUSH:
		res = true;
		if (!server->ready)
			break;

		if (server->setting_vars) {
			/* our SET batch is fully sent, resume the paused client */
			PgSocket *client = server->link;
			Assert(client);

			server->setting_vars = false;
			sbuf_continue(&client->sbuf);
			break;
		}

		/* outside session pooling, a flushed server may be releasable */
		if (pool_pool_mode(pool) != POOL_SESSION || server->state == SV_TESTED || server->resetting) {
			server->resetting = false;
			switch (server->state) {
			case SV_ACTIVE:
			case SV_TESTED:
				/* keep link if client expects more Syncs */
				if (server->link) {
					if (server->link->expect_rfq_count > 0)
						break;
				}

				/* retval does not matter here */
				release_server(server);
				break;
			default:
				slog_warning(server, "EV_FLUSH with state=%d", server->state);
			case SV_IDLE:
				break;
			}
		}
		break;
	case SBUF_EV_PKT_CALLBACK:
		/* server side has no packet callbacks; should not happen */
		slog_warning(server, "SBUF_EV_PKT_CALLBACK with state=%d", server->state);
		break;
	case SBUF_EV_TLS_READY:
		Assert(server->state == SV_LOGIN);

		tls_get_connection_info(server->sbuf.tls, infobuf, sizeof infobuf);
		if (cf_log_connections) {
			slog_info(server, "SSL established: %s", infobuf);
		} else {
			slog_noise(server, "SSL established: %s", infobuf);
		}

		/* TLS is up, now send the real startup packet over it */
		server->request_time = get_cached_time();
		res = send_startup_packet(server);
		if (res)
			sbuf_continue(&server->sbuf);
		else
			disconnect_server(server, false, "TLS startup failed");
		break;
	}
	/* admin-db connections feed the takeover state machine */
	if (!res && pool->db->admin)
		takeover_login_failed();
	return res;
}
632