1 /* -*-pgsql-c-*- */
2 /*
3  * $Header$
4  *
5  * pgpool: a language independent connection pool server for PostgreSQL
6  * written by Tatsuo Ishii
7  *
8  * Copyright (c) 2003-2019	PgPool Global Development Group
9  *
10  * Permission to use, copy, modify, and distribute this software and
11  * its documentation for any purpose and without fee is hereby
12  * granted, provided that the above copyright notice appear in all
13  * copies and that both that copyright notice and this permission
14  * notice appear in supporting documentation, and that the name of the
15  * author not be used in advertising or publicity pertaining to
16  * distribution of the software without specific, written prior
17  * permission. The author makes no representations about the
18  * suitability of this software for any purpose.  It is provided "as
19  * is" without express or implied warranty.
20  *
21  * pool_connection_pool.c: connection pool stuff
22  */
23 #include "config.h"
24 
25 #include <sys/types.h>
26 #include <sys/time.h>
27 #include <sys/socket.h>
28 #include <netinet/in.h>
29 #include <sys/un.h>
30 #ifdef HAVE_SYS_SELECT_H
31 #include <sys/select.h>
32 #endif
33 #ifdef HAVE_NETINET_TCP_H
34 #include <netinet/tcp.h>
35 #endif
36 #include <netdb.h>
37 
38 #include <stdio.h>
39 #include <errno.h>
40 #include <signal.h>
41 #include <string.h>
42 #include <unistd.h>
43 #include <stdlib.h>
44 
45 #include "pool.h"
46 #include "utils/pool_stream.h"
47 #include "utils/palloc.h"
48 #include "pool_config.h"
49 #include "utils/elog.h"
50 #include "utils/memutils.h"
51 #include "context/pool_process_context.h"
52 
/*
 * Index of the pool entry most recently selected by pool_get_cp() or
 * pool_create_cp(); exposed read-only through pool_pool_index().
 */
static int pool_index;	/* Active pool index */

/* Array of pool_config->max_pool entries, allocated by pool_init_cp(). */
POOL_CONNECTION_POOL *pool_connection_pool;	/* connection pool */

/* Set to 1 by pool_backend_timer_handler() when the close timer fires. */
volatile sig_atomic_t backend_timer_expired = 0; /* flag for connection closed timer is expired */

/*
 * Only read in this file (by connect_with_timeout()); it is not written
 * here.
 */
volatile sig_atomic_t health_check_timer_expired;		/* non 0 if health check timer expired */

/* Forward declarations of file-local helpers defined further down. */
static POOL_CONNECTION_POOL_SLOT *create_cp(POOL_CONNECTION_POOL_SLOT *cp, int slot);
static POOL_CONNECTION_POOL *new_connection(POOL_CONNECTION_POOL *p);
static int check_socket_status(int fd);
static bool connect_with_timeout(int fd, struct addrinfo *walk, char *host, int port, bool retry);
61 
62 /*
63 * initialize connection pools. this should be called once at the startup.
64 */
pool_init_cp(void)65 int pool_init_cp(void)
66 {
67 	int i;
68 	MemoryContext oldContext = MemoryContextSwitchTo(TopMemoryContext);
69 
70 	pool_connection_pool = (POOL_CONNECTION_POOL *)palloc(sizeof(POOL_CONNECTION_POOL)*pool_config->max_pool);
71 	memset(pool_connection_pool, 0, sizeof(POOL_CONNECTION_POOL)*pool_config->max_pool);
72 
73 	for (i = 0; i < pool_config->max_pool; i++)
74 	{
75 		pool_connection_pool[i].info = pool_coninfo(pool_get_process_context()->proc_id, i, 0);
76 		memset(pool_connection_pool[i].info, 0, sizeof(ConnectionInfo) * MAX_NUM_BACKENDS);
77 	}
78     MemoryContextSwitchTo(oldContext);
79 	return 0;
80 }
81 
82 /*
83 * find connection by user and database
84 */
pool_get_cp(char * user,char * database,int protoMajor,int check_socket)85 POOL_CONNECTION_POOL *pool_get_cp(char *user, char *database, int protoMajor, int check_socket)
86 {
87 	pool_sigset_t oldmask;
88 
89 	int i, freed = 0;
90 	ConnectionInfo *info;
91 
92 	POOL_CONNECTION_POOL *connection_pool = pool_connection_pool;
93 
94 	if (connection_pool == NULL)
95 	{
96         /* if no connection pool exists we have no reason to live */
97         ereport(ERROR,
98                 (return_code(2),
99                  errmsg("unable to get connection"),
100                   errdetail("connection pool is not initialized")));
101 	}
102 
103 	POOL_SETMASK2(&BlockSig, &oldmask);
104 
105 	for (i=0;i<pool_config->max_pool;i++)
106 	{
107 		if (MASTER_CONNECTION(connection_pool) &&
108 			MASTER_CONNECTION(connection_pool)->sp &&
109 			MASTER_CONNECTION(connection_pool)->sp->major == protoMajor &&
110 			MASTER_CONNECTION(connection_pool)->sp->user != NULL &&
111 			strcmp(MASTER_CONNECTION(connection_pool)->sp->user, user) == 0 &&
112 			strcmp(MASTER_CONNECTION(connection_pool)->sp->database, database) == 0)
113 		{
114 			int sock_broken = 0;
115 			int j;
116 
117 			/* mark this connection is under use */
118 			MASTER_CONNECTION(connection_pool)->closetime = 0;
119 			for (j=0;j<NUM_BACKENDS;j++)
120 			{
121 				connection_pool->info[j].counter++;
122 			}
123 			POOL_SETMASK(&oldmask);
124 
125 			if (check_socket)
126 			{
127 				for (j=0;j<NUM_BACKENDS;j++)
128 				{
129 					if (!VALID_BACKEND(j))
130 						continue;
131 
132 					if  (CONNECTION_SLOT(connection_pool, j))
133 					{
134 						sock_broken = check_socket_status(CONNECTION(connection_pool, j)->fd);
135 						if (sock_broken < 0)
136 							break;
137 					}
138 					else
139 					{
140 						sock_broken = -1;
141 						break;
142 					}
143 				}
144 
145 				if (sock_broken < 0)
146 				{
147 					ereport(LOG,
148 						(errmsg("connection closed."),
149 							 errdetail("retry to create new connection pool")));
150 
151 					for (j=0;j<NUM_BACKENDS;j++)
152 					{
153 						if (!VALID_BACKEND(j) || (CONNECTION_SLOT(connection_pool, j) == NULL))
154 							continue;
155 
156 						if (!freed)
157 						{
158 							pool_free_startup_packet(CONNECTION_SLOT(connection_pool, j)->sp);
159                             CONNECTION_SLOT(connection_pool, j)->sp = NULL;
160 
161 							freed = 1;
162 						}
163 
164 						pool_close(CONNECTION(connection_pool, j));
165 						pfree(CONNECTION_SLOT(connection_pool, j));
166 					}
167 					info = connection_pool->info;
168 					memset(connection_pool, 0, sizeof(POOL_CONNECTION_POOL));
169 					connection_pool->info = info;
170 					info->swallow_termination = 0;
171 					memset(connection_pool->info, 0, sizeof(ConnectionInfo) * MAX_NUM_BACKENDS);
172 					POOL_SETMASK(&oldmask);
173 					return NULL;
174 				}
175 			}
176 			POOL_SETMASK(&oldmask);
177 			pool_index = i;
178 			return connection_pool;
179 		}
180 		connection_pool++;
181 	}
182 
183 	POOL_SETMASK(&oldmask);
184 	return NULL;
185 }
186 
187 /*
188  * disconnect and release a connection to the database
189  */
pool_discard_cp(char * user,char * database,int protoMajor)190 void pool_discard_cp(char *user, char *database, int protoMajor)
191 {
192 	POOL_CONNECTION_POOL *p = pool_get_cp(user, database, protoMajor, 0);
193 	ConnectionInfo *info;
194 	int i, freed = 0;
195 
196 	if (p == NULL)
197 	{
198 		ereport(LOG,
199 			(errmsg("cannot get connection pool for user: \"%s\" database: \"%s\", while discarding connection pool", user, database)));
200 		return;
201 	}
202 
203 	for (i=0;i<NUM_BACKENDS;i++)
204 	{
205 		if (!VALID_BACKEND(i))
206 			continue;
207 
208 		if (!freed)
209 		{
210 			pool_free_startup_packet(CONNECTION_SLOT(p, i)->sp);
211 			freed = 1;
212 		}
213         CONNECTION_SLOT(p, i)->sp = NULL;
214 		pool_close(CONNECTION(p, i));
215 		pfree(CONNECTION_SLOT(p, i));
216 	}
217 
218 	info = p->info;
219 	memset(p, 0, sizeof(POOL_CONNECTION_POOL));
220 	p->info = info;
221 	memset(p->info, 0, sizeof(ConnectionInfo) * MAX_NUM_BACKENDS);
222 }
223 
224 
225 /*
226 * create a connection pool by user and database
227 */
pool_create_cp(void)228 POOL_CONNECTION_POOL *pool_create_cp(void)
229 {
230 	int i, freed = 0;
231 	time_t closetime;
232 	POOL_CONNECTION_POOL *oldestp;
233 	POOL_CONNECTION_POOL *ret;
234 	ConnectionInfo *info;
235 
236 	POOL_CONNECTION_POOL *p = pool_connection_pool;
237     /* if no connection pool exists we have no reason to live */
238 	if (p == NULL)
239         ereport(ERROR,
240 			(return_code(2),
241 				 errmsg("unable to create connection"),
242 					errdetail("connection pool is not initialized")));
243 
244 	for (i=0;i<pool_config->max_pool;i++)
245 	{
246 		if (MASTER_CONNECTION(p) == NULL)
247 		{
248 			ret = new_connection(p);
249 			if (ret)
250 				pool_index = i;
251 			return ret;
252 		}
253 		p++;
254 	}
255 	ereport(DEBUG1,
256 		(errmsg("creating connection pool"),
257 			 errdetail("no empty connection slot was found")));
258 
259 	/*
260 	 * no empty connection slot was found. look for the oldest connection and discard it.
261 	 */
262 	oldestp = p = pool_connection_pool;
263 	closetime = MASTER_CONNECTION(p)->closetime;
264 	pool_index = 0;
265 
266 	for (i=0;i<pool_config->max_pool;i++)
267 	{
268 		ereport(DEBUG1,
269 			(errmsg("creating connection pool"),
270 				errdetail("user: %s database: %s closetime: %ld",
271 					   MASTER_CONNECTION(p)->sp->user,
272 					   MASTER_CONNECTION(p)->sp->database,
273 					   MASTER_CONNECTION(p)->closetime)));
274 
275 		if (MASTER_CONNECTION(p)->closetime < closetime)
276 		{
277 			closetime = MASTER_CONNECTION(p)->closetime;
278 			oldestp = p;
279 			pool_index = i;
280 		}
281 		p++;
282 	}
283 
284 	p = oldestp;
285 	pool_send_frontend_exits(p);
286 
287 	ereport(DEBUG1,
288 		(errmsg("creating connection pool"),
289 			errdetail("discarding old %zd th connection. user: %s database: %s",
290 				   oldestp - pool_connection_pool,
291 				   MASTER_CONNECTION(p)->sp->user,
292 				   MASTER_CONNECTION(p)->sp->database)));
293 
294 	for (i=0;i<NUM_BACKENDS;i++)
295 	{
296 		if (!VALID_BACKEND(i))
297 			continue;
298 
299 		if (!freed)
300 		{
301 			pool_free_startup_packet(CONNECTION_SLOT(p, i)->sp);
302             CONNECTION_SLOT(p, i)->sp = NULL;
303 
304 			freed = 1;
305 		}
306 
307 		pool_close(CONNECTION(p, i));
308 		pfree(CONNECTION_SLOT(p, i));
309 	}
310 
311 	info = p->info;
312 	memset(p, 0, sizeof(POOL_CONNECTION_POOL));
313 	p->info = info;
314 	memset(p->info, 0, sizeof(ConnectionInfo) * MAX_NUM_BACKENDS);
315 
316 	ret = new_connection(p);
317 	return ret;
318 }
319 
320 /*
321  * set backend connection close timer
322  */
pool_connection_pool_timer(POOL_CONNECTION_POOL * backend)323 void pool_connection_pool_timer(POOL_CONNECTION_POOL *backend)
324 {
325 	POOL_CONNECTION_POOL *p = pool_connection_pool;
326 	int i;
327 
328 	ereport(DEBUG1,
329 		(errmsg("setting backend connection close timer"),
330 			 errdetail("close time %ld", time(NULL))));
331 
332 	/* Set connection close time */
333 	for (i = 0; i < NUM_BACKENDS; i++)
334 	{
335 		if (CONNECTION_SLOT(backend, i))
336 			CONNECTION_SLOT(backend, i)->closetime = time(NULL);
337 	}
338 
339 	if (pool_config->connection_life_time == 0)
340 		return;
341 
342 	/* look for any other timeout */
343 	for (i=0;i<pool_config->max_pool;i++, p++)
344 	{
345 		if (!MASTER_CONNECTION(p))
346 			continue;
347 		if (!MASTER_CONNECTION(p)->sp)
348 			continue;
349 		if (MASTER_CONNECTION(p)->sp->user == NULL)
350 			continue;
351 
352 		if (p != backend && MASTER_CONNECTION(p)->closetime)
353 			return;
354 	}
355 
356 	/* no other timer found. set my timer */
357 	ereport(DEBUG1,
358 		(errmsg("setting backend connection close timer"),
359 			 errdetail("setting alarm after %d seconds", pool_config->connection_life_time)));
360 
361 	pool_alarm(pool_backend_timer_handler, pool_config->connection_life_time);
362 }
363 
364 /*
365  * backend connection close timer handler
366  */
pool_backend_timer_handler(int sig)367 RETSIGTYPE pool_backend_timer_handler(int sig)
368 {
369 	backend_timer_expired = 1;
370 }
371 
pool_backend_timer(void)372 void pool_backend_timer(void)
373 {
374 #define TMINTMAX 0x7fffffff
375 
376 	POOL_CONNECTION_POOL *p = pool_connection_pool;
377 	int i, j;
378 	time_t now;
379 	time_t nearest = TMINTMAX;
380 	ConnectionInfo *info;
381 
382 	POOL_SETMASK(&BlockSig);
383 
384 	now = time(NULL);
385 
386 	ereport(DEBUG1,
387 			(errmsg("backend timer handler called at %ld", now)));
388 
389 	for (i=0;i<pool_config->max_pool;i++, p++)
390 	{
391 		if (!MASTER_CONNECTION(p))
392 			continue;
393 		if (!MASTER_CONNECTION(p)->sp)
394 			continue;
395 		if (MASTER_CONNECTION(p)->sp->user == NULL)
396 			continue;
397 
398 		/* timer expire? */
399 		if (MASTER_CONNECTION(p)->closetime)
400 		{
401 			int freed = 0;
402 
403 			ereport(DEBUG1,
404 				(errmsg("backend timer handler called"),
405 					errdetail("expire time: %ld",
406 						   MASTER_CONNECTION(p)->closetime+pool_config->connection_life_time)));
407 
408 			if (now >= (MASTER_CONNECTION(p)->closetime+pool_config->connection_life_time))
409 			{
410 				/* discard expired connection */
411 				ereport(DEBUG1,
412 					(errmsg("backend timer handler called"),
413 						errdetail("expired user: \"%s\" database: \"%s\"",
414 							   MASTER_CONNECTION(p)->sp->user, MASTER_CONNECTION(p)->sp->database)));
415 				pool_send_frontend_exits(p);
416 
417 				for (j=0;j<NUM_BACKENDS;j++)
418 				{
419 					if (!VALID_BACKEND(j))
420 						continue;
421 
422 					if (!freed)
423 					{
424 						pool_free_startup_packet(CONNECTION_SLOT(p, j)->sp);
425 						freed = 1;
426 					}
427                     CONNECTION_SLOT(p, j)->sp = NULL;
428 					pool_close(CONNECTION(p, j));
429 					pfree(CONNECTION_SLOT(p, j));
430 				}
431 				info = p->info;
432 				memset(p, 0, sizeof(POOL_CONNECTION_POOL));
433 				p->info = info;
434 				memset(p->info, 0, sizeof(ConnectionInfo) * MAX_NUM_BACKENDS);
435 			}
436 			else
437 			{
438 				/* look for nearest timer */
439 				if (MASTER_CONNECTION(p)->closetime < nearest)
440 					nearest = MASTER_CONNECTION(p)->closetime;
441 			}
442 		}
443 	}
444 
445 	/* any remaining timer */
446 	if (nearest != TMINTMAX)
447 	{
448 		nearest = pool_config->connection_life_time - (now - nearest);
449 		if (nearest <= 0)
450 		  nearest = 1;
451 		pool_alarm(pool_backend_timer_handler, nearest);
452 	}
453 
454 	POOL_SETMASK(&UnBlockSig);
455 }
456 
457 /*
458  * connect to postmaster through INET domain socket
459  */
connect_inet_domain_socket(int slot,bool retry)460 int connect_inet_domain_socket(int slot, bool retry)
461 {
462 	char *host;
463 	int port;
464 
465 	host = pool_config->backend_desc->backend_info[slot].backend_hostname;
466 	port = pool_config->backend_desc->backend_info[slot].backend_port;
467 
468 	return connect_inet_domain_socket_by_port(host, port, retry);
469 }
470 
471 /*
472  * connect to postmaster through UNIX domain socket
473  */
connect_unix_domain_socket(int slot,bool retry)474 int connect_unix_domain_socket(int slot, bool retry)
475 {
476 	int port;
477 	char *socket_dir;
478 
479 	port = pool_config->backend_desc->backend_info[slot].backend_port;
480 	socket_dir = pool_config->backend_desc->backend_info[slot].backend_hostname;
481 
482 	return connect_unix_domain_socket_by_port(port, socket_dir, retry);
483 }
484 
485 /*
486  * Connect to PostgreSQL server by using UNIX domain socket.
487  * If retry is true, retry to call connect() upon receiving EINTR error.
488  */
connect_unix_domain_socket_by_port(int port,char * socket_dir,bool retry)489 int connect_unix_domain_socket_by_port(int port, char *socket_dir, bool retry)
490 {
491 	struct sockaddr_un addr;
492 	int fd;
493 	int len;
494 
495 	fd = socket(AF_UNIX, SOCK_STREAM, 0);
496 	if (fd == -1)
497 	{
498 		ereport(LOG,
499 			(errmsg("failed to connect to PostgreSQL server by unix domain socket"),
500 				 errdetail("create socket failed with error \"%s\"", strerror(errno))));
501 		return -1;
502 	}
503 
504 	memset((char *) &addr, 0, sizeof(addr));
505 	addr.sun_family = AF_UNIX;
506 	snprintf(addr.sun_path, sizeof(addr.sun_path), "%s/.s.PGSQL.%d", socket_dir, port);
507 	len = sizeof(struct sockaddr_un);
508 
509 	for (;;)
510 	{
511 		if (exit_request)		/* exit request already sent */
512 		{
513 			ereport(LOG,
514 				(errmsg("failed to connect to PostgreSQL server by unix domain socket"),
515 					 errdetail("exit request has been sent")));
516 			close(fd);
517 			return -1;
518 		}
519 
520 		if (connect(fd, (struct sockaddr *)&addr, len) < 0)
521 		{
522 			if ((errno == EINTR && retry) || errno == EAGAIN)
523 				continue;
524 			close(fd);
525 			ereport(LOG,
526 				(errmsg("failed to connect to PostgreSQL server by unix domain socket"),
527 					 errdetail("connect to \"%s\" failed with error \"%s\"",addr.sun_path, strerror(errno))));
528 
529 			return -1;
530 		}
531 		break;
532 	}
533 
534 	return fd;
535 }
536 
537 /*
538  * Connet to backend using pool_config->connect_timeout.
539  *
540  * fd: the socket
541  * walk: backend address to connect
542  * host and port: backend hostname and port number. Only for error message
543  * purpose.
544  * retry: true if need to retry
545  */
connect_with_timeout(int fd,struct addrinfo * walk,char * host,int port,bool retry)546 static bool connect_with_timeout(int fd, struct addrinfo *walk, char *host, int port, bool retry)
547 {
548 	struct timeval *tm;
549 	struct timeval timeout;
550 	fd_set rset, wset;
551 	int sts;
552 	int error;
553 	socklen_t socklen;
554 
555 	pool_set_nonblock(fd);
556 
557 	for (;;)
558 	{
559 		if (exit_request)		/* exit request already sent */
560 		{
561 			ereport(LOG,
562 				(errmsg("failed to connect to PostgreSQL server on \"%s:%d\" using INET socket",host,port),
563 					 errdetail("exit request has been sent")));
564 			close(fd);
565 			return false;
566 		}
567 
568 		if (health_check_timer_expired && getpid() == mypid)		/* has health check timer expired */
569 		{
570 			ereport(LOG,
571 				(errmsg("failed to connect to PostgreSQL server on \"%s:%d\" using INET socket",host,port),
572 					 errdetail("health check timer expired")));
573 			close(fd);
574 			return false;
575 		}
576 
577 		if (connect(fd, walk->ai_addr, walk->ai_addrlen) < 0)
578 		{
579 			if (errno == EISCONN)
580 			{
581 				/* Socket is already connected */
582 				break;
583 			}
584 
585 			if ((errno == EINTR && retry) || errno == EAGAIN)
586 				continue;
587 
588 			/*
589 			 * If error was "connect(2) is in progress", then wait for
590 			 * completion.  Otherwise error out.
591 			 */
592 			if (errno != EINPROGRESS && errno != EALREADY)
593 			{
594 				ereport(LOG,
595 					(errmsg("failed to connect to PostgreSQL server on \"%s:%d\" with error \"%s\"",host,port,strerror(errno))));
596 				return false;
597 			}
598 
599 			if (pool_config->connect_timeout == 0)
600 				tm = NULL;
601 			else
602 			{
603 				tm = &timeout;
604 				timeout.tv_sec = pool_config->connect_timeout/1000;
605 				if (timeout.tv_sec == 0)
606 				{
607 					timeout.tv_usec = pool_config->connect_timeout*1000;
608 				}
609 				else
610 				{
611 					timeout.tv_usec = (pool_config->connect_timeout - timeout.tv_sec*1000)*1000;
612 				}
613 			}
614 
615 			FD_ZERO(&rset);
616 			FD_SET(fd, &rset);
617 			FD_ZERO(&wset);
618 			FD_SET(fd, &wset);
619 			sts = select(fd+1, &rset, &wset, NULL, tm);
620 
621 			if (sts == 0)
622 			{
623 				/* select timeout */
624 				if (retry)
625 				{
626 					ereport(LOG,
627 						(errmsg("trying connecting to PostgreSQL server on \"%s:%d\" by INET socket",host,port),
628 							 errdetail("timed out. retrying...")));
629 					continue;
630 				}
631 				else
632 				{
633 					ereport(LOG,
634 						(errmsg("failed to connect to PostgreSQL server on \"%s:%d\", timed out",host,port)));
635 					return false;
636 				}
637 			}
638 			else if (sts > 0)
639 			{
640 				/*
641 				 * If read data or write data was set, either connect
642 				 * succeeded or error.  We need to figure it out. This
643 				 * is the hardest part in using non blocking
644 				 * connect(2).  See W. Richar Stevens's "UNIX Network
645 				 * Programming: Volume 1, Second Edition" section
646 				 * 15.4.
647 				 */
648 				if (FD_ISSET(fd, &rset) || FD_ISSET(fd, &wset))
649 				{
650 					error = 0;
651 					socklen = sizeof(error);
652 					if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &error, &socklen) < 0)
653 					{
654 						/* Solaris returns error in this case */
655 						ereport(LOG,
656 							(errmsg("failed to connect to PostgreSQL server on \"%s:%d\", getsockopt() failed with error \"%s\"",host,port,strerror(errno))));
657 
658 						return false;
659 					}
660 
661 					/* Non Solaris case */
662 					if (error != 0)
663 					{
664 						ereport(LOG,
665 								(errmsg("failed to connect to PostgreSQL server on \"%s:%d\", getsockopt() detected error \"%s\"",host,port,strerror(error))));
666 						return false;
667 					}
668 				}
669 				else
670 				{
671 					ereport(LOG,
672 							(errmsg("failed to connect to PostgreSQL server on \"%s:%d\", both read data and write data was not set",host,port)));
673 
674 					return false;
675 				}
676 			}
677 			else		/* select returns error */
678 			{
679 				if ((errno == EINTR && retry) || errno == EAGAIN)
680 				{
681 					ereport(LOG,
682 						(errmsg("trying to connect to PostgreSQL server on \"%s:%d\" using INET socket",host,port),
683 							 errdetail("select() interrupted. retrying...")));
684 					continue;
685 				}
686 
687 				/*
688 				 * select(2) was interrupted by certain signal and we guess it
689 				 * was not SIGALRM because health_check_timer_expired was not
690 				 * set (if the variable was set, we can assume that SIGALRM
691 				 * handler was called). Surely this is not a health check time
692 				 * out. We can assume that this is a transient case. So we
693 				 * will retry again...
694 				 */
695 				if (health_check_timer_expired == 0 && errno == EINTR)
696 				{
697 					ereport(LOG,
698 							(errmsg("connect_inet_domain_socket: select() interrupted by certain signal. retrying...")));
699 					continue;
700 				}
701 
702 				ereport(LOG,
703 					(errmsg("failed to connect to PostgreSQL server on \"%s:%d\" using INET socket",host,port),
704 						 errdetail("select() system call failed with an error \"%s\"",strerror(errno))));
705 				close(fd);
706 				return false;
707 			}
708 		}
709 		break;
710 	}
711 
712 	pool_unset_nonblock(fd);
713 	return true;
714 }
715 
/*
 * Connect to PostgreSQL server by using INET domain socket.
 *
 * host and port are resolved with getaddrinfo() (any address family,
 * stream sockets) and each returned address is tried in order until
 * connect_with_timeout() succeeds. TCP_NODELAY is set on every socket
 * before connecting; if that fails the whole attempt is given up.
 * retry is passed through to connect_with_timeout().
 *
 * Returns the connected socket descriptor, or -1 if no address could be
 * connected or an error occurred.
 */
int connect_inet_domain_socket_by_port(char *host, int port, bool retry)
{
	struct addrinfo hints;
	struct addrinfo *addr_list;
	struct addrinfo *ai;
	char	   *service;
	int			nodelay = 1;
	int			rc;
	int			sock = -1;

	/* getaddrinfo() requires a string because it also accepts service names, such as "http". */
	if (asprintf(&service, "%d", port) == -1)
	{
		ereport(WARNING,
			(errmsg("failed to connect to PostgreSQL server, asprintf() failed with error \"%s\"",strerror(errno))));

		return -1;
	}

	memset(&hints, 0, sizeof(struct addrinfo));
	hints.ai_family = PF_UNSPEC;
	hints.ai_socktype = SOCK_STREAM;

	rc = getaddrinfo(host, service, &hints, &addr_list);
	if (rc != 0)
	{
		ereport(WARNING,
			(errmsg("failed to connect to PostgreSQL server, getaddrinfo() failed with error \"%s\"",gai_strerror(rc))));
	}

	/* the port string is no longer needed whether or not resolution worked */
	free(service);

	if (rc != 0)
		return -1;

	for (ai = addr_list; ai != NULL; ai = ai->ai_next)
	{
		sock = socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol);
		if (sock < 0)
		{
			ereport(WARNING,
					(errmsg("failed to connect to PostgreSQL server, socket() failed with error \"%s\"",strerror(errno))));
			continue;
		}

		/* set nodelay */
		if (setsockopt(sock, IPPROTO_TCP, TCP_NODELAY,
					   (char *) &nodelay,
					   sizeof(nodelay)) < 0)
		{
			ereport(WARNING,
					(errmsg("failed to connect to PostgreSQL server, setsockopt() failed with error \"%s\"",strerror(errno))));

			close(sock);
			freeaddrinfo(addr_list);
			return -1;
		}

		if (connect_with_timeout(sock, ai, host, port, retry))
		{
			freeaddrinfo(addr_list);
			return sock;
		}

		/* this address did not work; try the next one */
		close(sock);
	}

	freeaddrinfo(addr_list);
	return -1;
}
790 
791 /*
792  * create connection pool
793  */
create_cp(POOL_CONNECTION_POOL_SLOT * cp,int slot)794 static POOL_CONNECTION_POOL_SLOT *create_cp(POOL_CONNECTION_POOL_SLOT *cp, int slot)
795 {
796 	BackendInfo *b = &pool_config->backend_desc->backend_info[slot];
797 	int fd;
798 
799 	if (*b->backend_hostname == '/')
800 	{
801 		fd = connect_unix_domain_socket(slot, TRUE);
802 	}
803 	else
804 	{
805 		fd = connect_inet_domain_socket(slot, TRUE);
806 	}
807 
808 	if (fd < 0)
809 		return NULL;
810 
811 	cp->sp = NULL;
812 	cp->con = pool_open(fd,true);
813 	cp->closetime = 0;
814 	return cp;
815 }
816 
817 /*
818  * Create actual connections to backends.
819  * New connection resides in TopMemoryContext.
820  */
new_connection(POOL_CONNECTION_POOL * p)821 static POOL_CONNECTION_POOL *new_connection(POOL_CONNECTION_POOL *p)
822 {
823 	POOL_CONNECTION_POOL_SLOT *s;
824 	int active_backend_count = 0;
825 	int i;
826 	bool		status_changed = false;
827 
828     MemoryContext oldContext = MemoryContextSwitchTo(TopMemoryContext);
829 
830 	for (i=0;i<NUM_BACKENDS;i++)
831 	{
832 		ereport(DEBUG1,
833 			(errmsg("creating new connection to backend"),
834 				 errdetail("connecting %d backend", i)));
835 
836 		if (!VALID_BACKEND(i))
837 		{
838 			ereport(DEBUG1,
839 				(errmsg("creating new connection to backend"),
840 					errdetail("skipping backend slot %d because backend_status = %d",
841 						   i, BACKEND_INFO(i).backend_status)));
842 			continue;
843 		}
844 
845 		/*
846 		 * Make sure that the global backend status in the shared memory
847 		 * agrees the local status checked by VALID_BACKEND. It is possible
848 		 * that the local status is up, while the global status has been
849 		 * changed to down by failover.
850 		 */
851 		if (BACKEND_INFO(i).backend_status != CON_UP &&
852 			BACKEND_INFO(i).backend_status != CON_CONNECT_WAIT)
853 		{
854 			ereport(DEBUG1,
855 					(errmsg("creating new connection to backend"),
856 					errdetail("skipping backend slot %d because global backend_status = %d",
857 						   i, BACKEND_INFO(i).backend_status)));
858 
859 			/* sync local status with global status */
860 			*(my_backend_status[i]) = BACKEND_INFO(i).backend_status;
861 			continue;
862 		}
863 
864 		s = palloc(sizeof(POOL_CONNECTION_POOL_SLOT));
865 
866 		if (create_cp(s, i) == NULL)
867 		{
868 			/* If fail_over_on_backend_error is true, do failover.
869 			 * Otherwise, just exit this session or skip next health node.
870 			 */
871 			if (pool_config->fail_over_on_backend_error)
872 			{
873 				notice_backend_error(i, true);
874 				ereport(FATAL,
875 					(errmsg("failed to create a backend connection"),
876 						 errdetail("executing failover on backend")));
877 			}
878 			else
879 			{
880 				/*
881 				 * If we are in streaming replication mode and the node is a
882 				 * standby node, then we skip this node to avoid fail over.
883 				 */
884 				if (STREAM && !IS_PRIMARY_NODE_ID(i))
885 				{
886 					ereport(LOG,
887 							(errmsg("failed to create a backend %d connection", i),
888 							 errdetail("skip this backend because because fail_over_on_backend_error is off and we are in streaming replication mode and node is standby node")));
889 
890 					/* set down status to local status area */
891 					*(my_backend_status[i]) = CON_DOWN;
892 
893 					/* if master_node_id is not updated, then update it */
894 					if (Req_info->master_node_id == i)
895 					{
896 						int old_master = Req_info->master_node_id;
897 						Req_info->master_node_id = get_next_master_node();
898 
899 						ereport(LOG,
900 								(errmsg("master node %d is down. Update master node to %d",
901 										old_master, Req_info->master_node_id)));
902 					}
903 
904 					/* make sure that we need to restart the process after
905 					 * finishing this session
906 					 */
907 					pool_get_my_process_info()->need_to_restart = 1;
908 					continue;
909 				}
910 				else
911 				{
912 					ereport(FATAL,
913 							(errmsg("failed to create a backend %d connection", i),
914 							 errdetail("not executing failover because fail_over_on_backend_error is off")));
915 				}
916 			}
917 			child_exit(POOL_EXIT_AND_RESTART);
918 		}
919 
920 		p->info[i].create_time = time(NULL);
921 		p->slots[i] = s;
922 
923 		pool_init_params(&s->con->params);
924 
925 		if (BACKEND_INFO(i).backend_status != CON_UP)
926 		{
927 			BACKEND_INFO(i).backend_status = CON_UP;
928 			status_changed = true;
929 		}
930 		active_backend_count++;
931 	}
932 
933 	if (status_changed)
934 		(void)write_status_file();
935 
936     MemoryContextSwitchTo(oldContext);
937 
938 	if (active_backend_count > 0)
939 	{
940 		return p;
941 	}
942 
943 	return NULL;
944 }
945 
/*
 * check_socket_status()
 *
 * Poll fd for readability with select() and a zero timeout, repeating the
 * call while it is interrupted (EINTR).
 *
 * RETURN: 0 => OK (select() reported nothing to read)
 *        -1 => broken socket (fd is readable, or select() failed)
 */
static int check_socket_status(int fd)
{
	int			nready;

	do
	{
		fd_set		readable;
		struct timeval no_wait;

		FD_ZERO(&readable);
		FD_SET(fd, &readable);

		no_wait.tv_sec = 0;
		no_wait.tv_usec = 0;

		nready = select(fd + 1, &readable, NULL, NULL, &no_wait);
	} while (nready < 0 && errno == EINTR);

	return (nready == 0) ? 0 : -1;
}
976 
977 /*
978  * Return current used index (i.e. frontend connected)
979  */
pool_pool_index(void)980 int pool_pool_index(void)
981 {
982 	return pool_index;
983 }
984