1 /* -*-pgsql-c-*- */
2 /*
3 * $Header$
4 *
5 * pgpool: a language independent connection pool server for PostgreSQL
6 * written by Tatsuo Ishii
7 *
8 * Copyright (c) 2003-2019 PgPool Global Development Group
9 *
10 * Permission to use, copy, modify, and distribute this software and
11 * its documentation for any purpose and without fee is hereby
12 * granted, provided that the above copyright notice appear in all
13 * copies and that both that copyright notice and this permission
14 * notice appear in supporting documentation, and that the name of the
15 * author not be used in advertising or publicity pertaining to
16 * distribution of the software without specific, written prior
17 * permission. The author makes no representations about the
18 * suitability of this software for any purpose. It is provided "as
19 * is" without express or implied warranty.
20 *
21 * pool_connection_pool.c: connection pool stuff
22 */
23 #include "config.h"
24
25 #include <sys/types.h>
26 #include <sys/time.h>
27 #include <sys/socket.h>
28 #include <netinet/in.h>
29 #include <sys/un.h>
30 #ifdef HAVE_SYS_SELECT_H
31 #include <sys/select.h>
32 #endif
33 #ifdef HAVE_NETINET_TCP_H
34 #include <netinet/tcp.h>
35 #endif
36 #include <netdb.h>
37
38 #include <stdio.h>
39 #include <errno.h>
40 #include <signal.h>
41 #include <string.h>
42 #include <unistd.h>
43 #include <stdlib.h>
44
45 #include "pool.h"
46 #include "utils/pool_stream.h"
47 #include "utils/palloc.h"
48 #include "pool_config.h"
49 #include "utils/elog.h"
50 #include "utils/memutils.h"
51 #include "context/pool_process_context.h"
52
53 static int pool_index; /* Active pool index */
54 POOL_CONNECTION_POOL *pool_connection_pool; /* connection pool */
55 volatile sig_atomic_t backend_timer_expired = 0; /* flag for connection closed timer is expired */
56 volatile sig_atomic_t health_check_timer_expired; /* non 0 if health check timer expired */
57 static POOL_CONNECTION_POOL_SLOT *create_cp(POOL_CONNECTION_POOL_SLOT *cp, int slot);
58 static POOL_CONNECTION_POOL *new_connection(POOL_CONNECTION_POOL *p);
59 static int check_socket_status(int fd);
60 static bool connect_with_timeout(int fd, struct addrinfo *walk, char *host, int port, bool retry);
61
62 /*
63 * initialize connection pools. this should be called once at the startup.
64 */
pool_init_cp(void)65 int pool_init_cp(void)
66 {
67 int i;
68 MemoryContext oldContext = MemoryContextSwitchTo(TopMemoryContext);
69
70 pool_connection_pool = (POOL_CONNECTION_POOL *)palloc(sizeof(POOL_CONNECTION_POOL)*pool_config->max_pool);
71 memset(pool_connection_pool, 0, sizeof(POOL_CONNECTION_POOL)*pool_config->max_pool);
72
73 for (i = 0; i < pool_config->max_pool; i++)
74 {
75 pool_connection_pool[i].info = pool_coninfo(pool_get_process_context()->proc_id, i, 0);
76 memset(pool_connection_pool[i].info, 0, sizeof(ConnectionInfo) * MAX_NUM_BACKENDS);
77 }
78 MemoryContextSwitchTo(oldContext);
79 return 0;
80 }
81
82 /*
83 * find connection by user and database
84 */
pool_get_cp(char * user,char * database,int protoMajor,int check_socket)85 POOL_CONNECTION_POOL *pool_get_cp(char *user, char *database, int protoMajor, int check_socket)
86 {
87 pool_sigset_t oldmask;
88
89 int i, freed = 0;
90 ConnectionInfo *info;
91
92 POOL_CONNECTION_POOL *connection_pool = pool_connection_pool;
93
94 if (connection_pool == NULL)
95 {
96 /* if no connection pool exists we have no reason to live */
97 ereport(ERROR,
98 (return_code(2),
99 errmsg("unable to get connection"),
100 errdetail("connection pool is not initialized")));
101 }
102
103 POOL_SETMASK2(&BlockSig, &oldmask);
104
105 for (i=0;i<pool_config->max_pool;i++)
106 {
107 if (MASTER_CONNECTION(connection_pool) &&
108 MASTER_CONNECTION(connection_pool)->sp &&
109 MASTER_CONNECTION(connection_pool)->sp->major == protoMajor &&
110 MASTER_CONNECTION(connection_pool)->sp->user != NULL &&
111 strcmp(MASTER_CONNECTION(connection_pool)->sp->user, user) == 0 &&
112 strcmp(MASTER_CONNECTION(connection_pool)->sp->database, database) == 0)
113 {
114 int sock_broken = 0;
115 int j;
116
117 /* mark this connection is under use */
118 MASTER_CONNECTION(connection_pool)->closetime = 0;
119 for (j=0;j<NUM_BACKENDS;j++)
120 {
121 connection_pool->info[j].counter++;
122 }
123 POOL_SETMASK(&oldmask);
124
125 if (check_socket)
126 {
127 for (j=0;j<NUM_BACKENDS;j++)
128 {
129 if (!VALID_BACKEND(j))
130 continue;
131
132 if (CONNECTION_SLOT(connection_pool, j))
133 {
134 sock_broken = check_socket_status(CONNECTION(connection_pool, j)->fd);
135 if (sock_broken < 0)
136 break;
137 }
138 else
139 {
140 sock_broken = -1;
141 break;
142 }
143 }
144
145 if (sock_broken < 0)
146 {
147 ereport(LOG,
148 (errmsg("connection closed."),
149 errdetail("retry to create new connection pool")));
150
151 for (j=0;j<NUM_BACKENDS;j++)
152 {
153 if (!VALID_BACKEND(j) || (CONNECTION_SLOT(connection_pool, j) == NULL))
154 continue;
155
156 if (!freed)
157 {
158 pool_free_startup_packet(CONNECTION_SLOT(connection_pool, j)->sp);
159 CONNECTION_SLOT(connection_pool, j)->sp = NULL;
160
161 freed = 1;
162 }
163
164 pool_close(CONNECTION(connection_pool, j));
165 pfree(CONNECTION_SLOT(connection_pool, j));
166 }
167 info = connection_pool->info;
168 memset(connection_pool, 0, sizeof(POOL_CONNECTION_POOL));
169 connection_pool->info = info;
170 info->swallow_termination = 0;
171 memset(connection_pool->info, 0, sizeof(ConnectionInfo) * MAX_NUM_BACKENDS);
172 POOL_SETMASK(&oldmask);
173 return NULL;
174 }
175 }
176 POOL_SETMASK(&oldmask);
177 pool_index = i;
178 return connection_pool;
179 }
180 connection_pool++;
181 }
182
183 POOL_SETMASK(&oldmask);
184 return NULL;
185 }
186
187 /*
188 * disconnect and release a connection to the database
189 */
pool_discard_cp(char * user,char * database,int protoMajor)190 void pool_discard_cp(char *user, char *database, int protoMajor)
191 {
192 POOL_CONNECTION_POOL *p = pool_get_cp(user, database, protoMajor, 0);
193 ConnectionInfo *info;
194 int i, freed = 0;
195
196 if (p == NULL)
197 {
198 ereport(LOG,
199 (errmsg("cannot get connection pool for user: \"%s\" database: \"%s\", while discarding connection pool", user, database)));
200 return;
201 }
202
203 for (i=0;i<NUM_BACKENDS;i++)
204 {
205 if (!VALID_BACKEND(i))
206 continue;
207
208 if (!freed)
209 {
210 pool_free_startup_packet(CONNECTION_SLOT(p, i)->sp);
211 freed = 1;
212 }
213 CONNECTION_SLOT(p, i)->sp = NULL;
214 pool_close(CONNECTION(p, i));
215 pfree(CONNECTION_SLOT(p, i));
216 }
217
218 info = p->info;
219 memset(p, 0, sizeof(POOL_CONNECTION_POOL));
220 p->info = info;
221 memset(p->info, 0, sizeof(ConnectionInfo) * MAX_NUM_BACKENDS);
222 }
223
224
225 /*
226 * create a connection pool by user and database
227 */
pool_create_cp(void)228 POOL_CONNECTION_POOL *pool_create_cp(void)
229 {
230 int i, freed = 0;
231 time_t closetime;
232 POOL_CONNECTION_POOL *oldestp;
233 POOL_CONNECTION_POOL *ret;
234 ConnectionInfo *info;
235
236 POOL_CONNECTION_POOL *p = pool_connection_pool;
237 /* if no connection pool exists we have no reason to live */
238 if (p == NULL)
239 ereport(ERROR,
240 (return_code(2),
241 errmsg("unable to create connection"),
242 errdetail("connection pool is not initialized")));
243
244 for (i=0;i<pool_config->max_pool;i++)
245 {
246 if (MASTER_CONNECTION(p) == NULL)
247 {
248 ret = new_connection(p);
249 if (ret)
250 pool_index = i;
251 return ret;
252 }
253 p++;
254 }
255 ereport(DEBUG1,
256 (errmsg("creating connection pool"),
257 errdetail("no empty connection slot was found")));
258
259 /*
260 * no empty connection slot was found. look for the oldest connection and discard it.
261 */
262 oldestp = p = pool_connection_pool;
263 closetime = MASTER_CONNECTION(p)->closetime;
264 pool_index = 0;
265
266 for (i=0;i<pool_config->max_pool;i++)
267 {
268 ereport(DEBUG1,
269 (errmsg("creating connection pool"),
270 errdetail("user: %s database: %s closetime: %ld",
271 MASTER_CONNECTION(p)->sp->user,
272 MASTER_CONNECTION(p)->sp->database,
273 MASTER_CONNECTION(p)->closetime)));
274
275 if (MASTER_CONNECTION(p)->closetime < closetime)
276 {
277 closetime = MASTER_CONNECTION(p)->closetime;
278 oldestp = p;
279 pool_index = i;
280 }
281 p++;
282 }
283
284 p = oldestp;
285 pool_send_frontend_exits(p);
286
287 ereport(DEBUG1,
288 (errmsg("creating connection pool"),
289 errdetail("discarding old %zd th connection. user: %s database: %s",
290 oldestp - pool_connection_pool,
291 MASTER_CONNECTION(p)->sp->user,
292 MASTER_CONNECTION(p)->sp->database)));
293
294 for (i=0;i<NUM_BACKENDS;i++)
295 {
296 if (!VALID_BACKEND(i))
297 continue;
298
299 if (!freed)
300 {
301 pool_free_startup_packet(CONNECTION_SLOT(p, i)->sp);
302 CONNECTION_SLOT(p, i)->sp = NULL;
303
304 freed = 1;
305 }
306
307 pool_close(CONNECTION(p, i));
308 pfree(CONNECTION_SLOT(p, i));
309 }
310
311 info = p->info;
312 memset(p, 0, sizeof(POOL_CONNECTION_POOL));
313 p->info = info;
314 memset(p->info, 0, sizeof(ConnectionInfo) * MAX_NUM_BACKENDS);
315
316 ret = new_connection(p);
317 return ret;
318 }
319
320 /*
321 * set backend connection close timer
322 */
pool_connection_pool_timer(POOL_CONNECTION_POOL * backend)323 void pool_connection_pool_timer(POOL_CONNECTION_POOL *backend)
324 {
325 POOL_CONNECTION_POOL *p = pool_connection_pool;
326 int i;
327
328 ereport(DEBUG1,
329 (errmsg("setting backend connection close timer"),
330 errdetail("close time %ld", time(NULL))));
331
332 /* Set connection close time */
333 for (i = 0; i < NUM_BACKENDS; i++)
334 {
335 if (CONNECTION_SLOT(backend, i))
336 CONNECTION_SLOT(backend, i)->closetime = time(NULL);
337 }
338
339 if (pool_config->connection_life_time == 0)
340 return;
341
342 /* look for any other timeout */
343 for (i=0;i<pool_config->max_pool;i++, p++)
344 {
345 if (!MASTER_CONNECTION(p))
346 continue;
347 if (!MASTER_CONNECTION(p)->sp)
348 continue;
349 if (MASTER_CONNECTION(p)->sp->user == NULL)
350 continue;
351
352 if (p != backend && MASTER_CONNECTION(p)->closetime)
353 return;
354 }
355
356 /* no other timer found. set my timer */
357 ereport(DEBUG1,
358 (errmsg("setting backend connection close timer"),
359 errdetail("setting alarm after %d seconds", pool_config->connection_life_time)));
360
361 pool_alarm(pool_backend_timer_handler, pool_config->connection_life_time);
362 }
363
364 /*
365 * backend connection close timer handler
366 */
pool_backend_timer_handler(int sig)367 RETSIGTYPE pool_backend_timer_handler(int sig)
368 {
369 backend_timer_expired = 1;
370 }
371
pool_backend_timer(void)372 void pool_backend_timer(void)
373 {
374 #define TMINTMAX 0x7fffffff
375
376 POOL_CONNECTION_POOL *p = pool_connection_pool;
377 int i, j;
378 time_t now;
379 time_t nearest = TMINTMAX;
380 ConnectionInfo *info;
381
382 POOL_SETMASK(&BlockSig);
383
384 now = time(NULL);
385
386 ereport(DEBUG1,
387 (errmsg("backend timer handler called at %ld", now)));
388
389 for (i=0;i<pool_config->max_pool;i++, p++)
390 {
391 if (!MASTER_CONNECTION(p))
392 continue;
393 if (!MASTER_CONNECTION(p)->sp)
394 continue;
395 if (MASTER_CONNECTION(p)->sp->user == NULL)
396 continue;
397
398 /* timer expire? */
399 if (MASTER_CONNECTION(p)->closetime)
400 {
401 int freed = 0;
402
403 ereport(DEBUG1,
404 (errmsg("backend timer handler called"),
405 errdetail("expire time: %ld",
406 MASTER_CONNECTION(p)->closetime+pool_config->connection_life_time)));
407
408 if (now >= (MASTER_CONNECTION(p)->closetime+pool_config->connection_life_time))
409 {
410 /* discard expired connection */
411 ereport(DEBUG1,
412 (errmsg("backend timer handler called"),
413 errdetail("expired user: \"%s\" database: \"%s\"",
414 MASTER_CONNECTION(p)->sp->user, MASTER_CONNECTION(p)->sp->database)));
415 pool_send_frontend_exits(p);
416
417 for (j=0;j<NUM_BACKENDS;j++)
418 {
419 if (!VALID_BACKEND(j))
420 continue;
421
422 if (!freed)
423 {
424 pool_free_startup_packet(CONNECTION_SLOT(p, j)->sp);
425 freed = 1;
426 }
427 CONNECTION_SLOT(p, j)->sp = NULL;
428 pool_close(CONNECTION(p, j));
429 pfree(CONNECTION_SLOT(p, j));
430 }
431 info = p->info;
432 memset(p, 0, sizeof(POOL_CONNECTION_POOL));
433 p->info = info;
434 memset(p->info, 0, sizeof(ConnectionInfo) * MAX_NUM_BACKENDS);
435 }
436 else
437 {
438 /* look for nearest timer */
439 if (MASTER_CONNECTION(p)->closetime < nearest)
440 nearest = MASTER_CONNECTION(p)->closetime;
441 }
442 }
443 }
444
445 /* any remaining timer */
446 if (nearest != TMINTMAX)
447 {
448 nearest = pool_config->connection_life_time - (now - nearest);
449 if (nearest <= 0)
450 nearest = 1;
451 pool_alarm(pool_backend_timer_handler, nearest);
452 }
453
454 POOL_SETMASK(&UnBlockSig);
455 }
456
457 /*
458 * connect to postmaster through INET domain socket
459 */
connect_inet_domain_socket(int slot,bool retry)460 int connect_inet_domain_socket(int slot, bool retry)
461 {
462 char *host;
463 int port;
464
465 host = pool_config->backend_desc->backend_info[slot].backend_hostname;
466 port = pool_config->backend_desc->backend_info[slot].backend_port;
467
468 return connect_inet_domain_socket_by_port(host, port, retry);
469 }
470
471 /*
472 * connect to postmaster through UNIX domain socket
473 */
connect_unix_domain_socket(int slot,bool retry)474 int connect_unix_domain_socket(int slot, bool retry)
475 {
476 int port;
477 char *socket_dir;
478
479 port = pool_config->backend_desc->backend_info[slot].backend_port;
480 socket_dir = pool_config->backend_desc->backend_info[slot].backend_hostname;
481
482 return connect_unix_domain_socket_by_port(port, socket_dir, retry);
483 }
484
485 /*
486 * Connect to PostgreSQL server by using UNIX domain socket.
487 * If retry is true, retry to call connect() upon receiving EINTR error.
488 */
connect_unix_domain_socket_by_port(int port,char * socket_dir,bool retry)489 int connect_unix_domain_socket_by_port(int port, char *socket_dir, bool retry)
490 {
491 struct sockaddr_un addr;
492 int fd;
493 int len;
494
495 fd = socket(AF_UNIX, SOCK_STREAM, 0);
496 if (fd == -1)
497 {
498 ereport(LOG,
499 (errmsg("failed to connect to PostgreSQL server by unix domain socket"),
500 errdetail("create socket failed with error \"%s\"", strerror(errno))));
501 return -1;
502 }
503
504 memset((char *) &addr, 0, sizeof(addr));
505 addr.sun_family = AF_UNIX;
506 snprintf(addr.sun_path, sizeof(addr.sun_path), "%s/.s.PGSQL.%d", socket_dir, port);
507 len = sizeof(struct sockaddr_un);
508
509 for (;;)
510 {
511 if (exit_request) /* exit request already sent */
512 {
513 ereport(LOG,
514 (errmsg("failed to connect to PostgreSQL server by unix domain socket"),
515 errdetail("exit request has been sent")));
516 close(fd);
517 return -1;
518 }
519
520 if (connect(fd, (struct sockaddr *)&addr, len) < 0)
521 {
522 if ((errno == EINTR && retry) || errno == EAGAIN)
523 continue;
524 close(fd);
525 ereport(LOG,
526 (errmsg("failed to connect to PostgreSQL server by unix domain socket"),
527 errdetail("connect to \"%s\" failed with error \"%s\"",addr.sun_path, strerror(errno))));
528
529 return -1;
530 }
531 break;
532 }
533
534 return fd;
535 }
536
537 /*
538 * Connet to backend using pool_config->connect_timeout.
539 *
540 * fd: the socket
541 * walk: backend address to connect
542 * host and port: backend hostname and port number. Only for error message
543 * purpose.
544 * retry: true if need to retry
545 */
connect_with_timeout(int fd,struct addrinfo * walk,char * host,int port,bool retry)546 static bool connect_with_timeout(int fd, struct addrinfo *walk, char *host, int port, bool retry)
547 {
548 struct timeval *tm;
549 struct timeval timeout;
550 fd_set rset, wset;
551 int sts;
552 int error;
553 socklen_t socklen;
554
555 pool_set_nonblock(fd);
556
557 for (;;)
558 {
559 if (exit_request) /* exit request already sent */
560 {
561 ereport(LOG,
562 (errmsg("failed to connect to PostgreSQL server on \"%s:%d\" using INET socket",host,port),
563 errdetail("exit request has been sent")));
564 close(fd);
565 return false;
566 }
567
568 if (health_check_timer_expired && getpid() == mypid) /* has health check timer expired */
569 {
570 ereport(LOG,
571 (errmsg("failed to connect to PostgreSQL server on \"%s:%d\" using INET socket",host,port),
572 errdetail("health check timer expired")));
573 close(fd);
574 return false;
575 }
576
577 if (connect(fd, walk->ai_addr, walk->ai_addrlen) < 0)
578 {
579 if (errno == EISCONN)
580 {
581 /* Socket is already connected */
582 break;
583 }
584
585 if ((errno == EINTR && retry) || errno == EAGAIN)
586 continue;
587
588 /*
589 * If error was "connect(2) is in progress", then wait for
590 * completion. Otherwise error out.
591 */
592 if (errno != EINPROGRESS && errno != EALREADY)
593 {
594 ereport(LOG,
595 (errmsg("failed to connect to PostgreSQL server on \"%s:%d\" with error \"%s\"",host,port,strerror(errno))));
596 return false;
597 }
598
599 if (pool_config->connect_timeout == 0)
600 tm = NULL;
601 else
602 {
603 tm = &timeout;
604 timeout.tv_sec = pool_config->connect_timeout/1000;
605 if (timeout.tv_sec == 0)
606 {
607 timeout.tv_usec = pool_config->connect_timeout*1000;
608 }
609 else
610 {
611 timeout.tv_usec = (pool_config->connect_timeout - timeout.tv_sec*1000)*1000;
612 }
613 }
614
615 FD_ZERO(&rset);
616 FD_SET(fd, &rset);
617 FD_ZERO(&wset);
618 FD_SET(fd, &wset);
619 sts = select(fd+1, &rset, &wset, NULL, tm);
620
621 if (sts == 0)
622 {
623 /* select timeout */
624 if (retry)
625 {
626 ereport(LOG,
627 (errmsg("trying connecting to PostgreSQL server on \"%s:%d\" by INET socket",host,port),
628 errdetail("timed out. retrying...")));
629 continue;
630 }
631 else
632 {
633 ereport(LOG,
634 (errmsg("failed to connect to PostgreSQL server on \"%s:%d\", timed out",host,port)));
635 return false;
636 }
637 }
638 else if (sts > 0)
639 {
640 /*
641 * If read data or write data was set, either connect
642 * succeeded or error. We need to figure it out. This
643 * is the hardest part in using non blocking
644 * connect(2). See W. Richar Stevens's "UNIX Network
645 * Programming: Volume 1, Second Edition" section
646 * 15.4.
647 */
648 if (FD_ISSET(fd, &rset) || FD_ISSET(fd, &wset))
649 {
650 error = 0;
651 socklen = sizeof(error);
652 if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &error, &socklen) < 0)
653 {
654 /* Solaris returns error in this case */
655 ereport(LOG,
656 (errmsg("failed to connect to PostgreSQL server on \"%s:%d\", getsockopt() failed with error \"%s\"",host,port,strerror(errno))));
657
658 return false;
659 }
660
661 /* Non Solaris case */
662 if (error != 0)
663 {
664 ereport(LOG,
665 (errmsg("failed to connect to PostgreSQL server on \"%s:%d\", getsockopt() detected error \"%s\"",host,port,strerror(error))));
666 return false;
667 }
668 }
669 else
670 {
671 ereport(LOG,
672 (errmsg("failed to connect to PostgreSQL server on \"%s:%d\", both read data and write data was not set",host,port)));
673
674 return false;
675 }
676 }
677 else /* select returns error */
678 {
679 if ((errno == EINTR && retry) || errno == EAGAIN)
680 {
681 ereport(LOG,
682 (errmsg("trying to connect to PostgreSQL server on \"%s:%d\" using INET socket",host,port),
683 errdetail("select() interrupted. retrying...")));
684 continue;
685 }
686
687 /*
688 * select(2) was interrupted by certain signal and we guess it
689 * was not SIGALRM because health_check_timer_expired was not
690 * set (if the variable was set, we can assume that SIGALRM
691 * handler was called). Surely this is not a health check time
692 * out. We can assume that this is a transient case. So we
693 * will retry again...
694 */
695 if (health_check_timer_expired == 0 && errno == EINTR)
696 {
697 ereport(LOG,
698 (errmsg("connect_inet_domain_socket: select() interrupted by certain signal. retrying...")));
699 continue;
700 }
701
702 ereport(LOG,
703 (errmsg("failed to connect to PostgreSQL server on \"%s:%d\" using INET socket",host,port),
704 errdetail("select() system call failed with an error \"%s\"",strerror(errno))));
705 close(fd);
706 return false;
707 }
708 }
709 break;
710 }
711
712 pool_unset_nonblock(fd);
713 return true;
714 }
715
/*
 * Connect to PostgreSQL server by using INET domain socket.
 * If retry is true, retry to call connect() upon receiving EINTR error.
 *
 * host/port are resolved with getaddrinfo() and every returned address is
 * tried in order until connect_with_timeout() succeeds.  TCP_NODELAY is set
 * on the socket before connecting; if that fails the whole attempt is given
 * up.
 *
 * Returns the connected socket descriptor, or -1 on failure.
 */
int
connect_inet_domain_socket_by_port(char *host, int port, bool retry)
{
	int			fd = -1;
	int			on = 1;
	char		portstr[16];	/* large enough for any int plus NUL */
	int			ret;
	struct addrinfo *res;
	struct addrinfo *walk;
	struct addrinfo hints;

	/*
	 * getaddrinfo() requires a string because it also accepts service names,
	 * such as "http".  A stack buffer is enough for a decimal int, so no
	 * heap allocation (and no allocation failure path) is needed.
	 */
	snprintf(portstr, sizeof(portstr), "%d", port);

	memset(&hints, 0, sizeof(struct addrinfo));
	hints.ai_family = PF_UNSPEC;
	hints.ai_socktype = SOCK_STREAM;

	if ((ret = getaddrinfo(host, portstr, &hints, &res)) != 0)
	{
		ereport(WARNING,
				(errmsg("failed to connect to PostgreSQL server, getaddrinfo() failed with error \"%s\"", gai_strerror(ret))));

		return -1;
	}

	for (walk = res; walk != NULL; walk = walk->ai_next)
	{
		fd = socket(walk->ai_family, walk->ai_socktype, walk->ai_protocol);
		if (fd < 0)
		{
			ereport(WARNING,
					(errmsg("failed to connect to PostgreSQL server, socket() failed with error \"%s\"", strerror(errno))));
			continue;
		}

		/* set nodelay */
		if (setsockopt(fd, IPPROTO_TCP, TCP_NODELAY,
					   (char *) &on,
					   sizeof(on)) < 0)
		{
			ereport(WARNING,
					(errmsg("failed to connect to PostgreSQL server, setsockopt() failed with error \"%s\"", strerror(errno))));

			close(fd);
			freeaddrinfo(res);
			return -1;
		}

		/* on failure release the socket here and try the next address */
		if (!connect_with_timeout(fd, walk, host, port, retry))
		{
			close(fd);
			continue;
		}

		freeaddrinfo(res);
		return fd;
	}

	freeaddrinfo(res);
	return -1;
}
790
791 /*
792 * create connection pool
793 */
create_cp(POOL_CONNECTION_POOL_SLOT * cp,int slot)794 static POOL_CONNECTION_POOL_SLOT *create_cp(POOL_CONNECTION_POOL_SLOT *cp, int slot)
795 {
796 BackendInfo *b = &pool_config->backend_desc->backend_info[slot];
797 int fd;
798
799 if (*b->backend_hostname == '/')
800 {
801 fd = connect_unix_domain_socket(slot, TRUE);
802 }
803 else
804 {
805 fd = connect_inet_domain_socket(slot, TRUE);
806 }
807
808 if (fd < 0)
809 return NULL;
810
811 cp->sp = NULL;
812 cp->con = pool_open(fd,true);
813 cp->closetime = 0;
814 return cp;
815 }
816
817 /*
818 * Create actual connections to backends.
819 * New connection resides in TopMemoryContext.
820 */
new_connection(POOL_CONNECTION_POOL * p)821 static POOL_CONNECTION_POOL *new_connection(POOL_CONNECTION_POOL *p)
822 {
823 POOL_CONNECTION_POOL_SLOT *s;
824 int active_backend_count = 0;
825 int i;
826 bool status_changed = false;
827
828 MemoryContext oldContext = MemoryContextSwitchTo(TopMemoryContext);
829
830 for (i=0;i<NUM_BACKENDS;i++)
831 {
832 ereport(DEBUG1,
833 (errmsg("creating new connection to backend"),
834 errdetail("connecting %d backend", i)));
835
836 if (!VALID_BACKEND(i))
837 {
838 ereport(DEBUG1,
839 (errmsg("creating new connection to backend"),
840 errdetail("skipping backend slot %d because backend_status = %d",
841 i, BACKEND_INFO(i).backend_status)));
842 continue;
843 }
844
845 /*
846 * Make sure that the global backend status in the shared memory
847 * agrees the local status checked by VALID_BACKEND. It is possible
848 * that the local status is up, while the global status has been
849 * changed to down by failover.
850 */
851 if (BACKEND_INFO(i).backend_status != CON_UP &&
852 BACKEND_INFO(i).backend_status != CON_CONNECT_WAIT)
853 {
854 ereport(DEBUG1,
855 (errmsg("creating new connection to backend"),
856 errdetail("skipping backend slot %d because global backend_status = %d",
857 i, BACKEND_INFO(i).backend_status)));
858
859 /* sync local status with global status */
860 *(my_backend_status[i]) = BACKEND_INFO(i).backend_status;
861 continue;
862 }
863
864 s = palloc(sizeof(POOL_CONNECTION_POOL_SLOT));
865
866 if (create_cp(s, i) == NULL)
867 {
868 /* If fail_over_on_backend_error is true, do failover.
869 * Otherwise, just exit this session or skip next health node.
870 */
871 if (pool_config->fail_over_on_backend_error)
872 {
873 notice_backend_error(i, true);
874 ereport(FATAL,
875 (errmsg("failed to create a backend connection"),
876 errdetail("executing failover on backend")));
877 }
878 else
879 {
880 /*
881 * If we are in streaming replication mode and the node is a
882 * standby node, then we skip this node to avoid fail over.
883 */
884 if (STREAM && !IS_PRIMARY_NODE_ID(i))
885 {
886 ereport(LOG,
887 (errmsg("failed to create a backend %d connection", i),
888 errdetail("skip this backend because because fail_over_on_backend_error is off and we are in streaming replication mode and node is standby node")));
889
890 /* set down status to local status area */
891 *(my_backend_status[i]) = CON_DOWN;
892
893 /* if master_node_id is not updated, then update it */
894 if (Req_info->master_node_id == i)
895 {
896 int old_master = Req_info->master_node_id;
897 Req_info->master_node_id = get_next_master_node();
898
899 ereport(LOG,
900 (errmsg("master node %d is down. Update master node to %d",
901 old_master, Req_info->master_node_id)));
902 }
903
904 /* make sure that we need to restart the process after
905 * finishing this session
906 */
907 pool_get_my_process_info()->need_to_restart = 1;
908 continue;
909 }
910 else
911 {
912 ereport(FATAL,
913 (errmsg("failed to create a backend %d connection", i),
914 errdetail("not executing failover because fail_over_on_backend_error is off")));
915 }
916 }
917 child_exit(POOL_EXIT_AND_RESTART);
918 }
919
920 p->info[i].create_time = time(NULL);
921 p->slots[i] = s;
922
923 pool_init_params(&s->con->params);
924
925 if (BACKEND_INFO(i).backend_status != CON_UP)
926 {
927 BACKEND_INFO(i).backend_status = CON_UP;
928 status_changed = true;
929 }
930 active_backend_count++;
931 }
932
933 if (status_changed)
934 (void)write_status_file();
935
936 MemoryContextSwitchTo(oldContext);
937
938 if (active_backend_count > 0)
939 {
940 return p;
941 }
942
943 return NULL;
944 }
945
/* check_socket_status()
 *
 * Poll fd for readability with a zero timeout.  A socket with nothing to
 * read is regarded as healthy; anything readable (pending data or end of
 * file) as well as a select(2) failure other than EINTR is regarded as
 * broken.  select(2) is retried when interrupted by a signal.
 *
 * A descriptor which is negative or does not fit in an fd_set cannot be
 * checked with select(2) (FD_SET on it is undefined behavior), so it is
 * reported as broken.
 *
 * RETURN: 0 => OK
 *        -1 => broken socket.
 */
static int
check_socket_status(int fd)
{
	fd_set		rfds;
	struct timeval t;
	int			result;

	if (fd < 0 || fd >= FD_SETSIZE)
		return -1;

	do
	{
		/* both arguments may be modified by select(), so set them again */
		FD_ZERO(&rfds);
		FD_SET(fd, &rfds);

		t.tv_sec = 0;
		t.tv_usec = 0;

		result = select(fd + 1, &rfds, NULL, NULL, &t);
	} while (result < 0 && errno == EINTR);

	return (result == 0) ? 0 : -1;
}
976
977 /*
978 * Return current used index (i.e. frontend connected)
979 */
pool_pool_index(void)980 int pool_pool_index(void)
981 {
982 return pool_index;
983 }
984