/* -*-pgpool_main-c-*- */
/*
 * $Header$
 *
 * pgpool: a language independent connection pool server for PostgreSQL
 * written by Tatsuo Ishii
 *
 * Copyright (c) 2003-2021 PgPool Global Development Group
 *
 * Permission to use, copy, modify, and distribute this software and
 * its documentation for any purpose and without fee is hereby
 * granted, provided that the above copyright notice appear in all
 * copies and that both that copyright notice and this permission
 * notice appear in supporting documentation, and that the name of the
 * author not be used in advertising or publicity pertaining to
 * distribution of the software without specific, written prior
 * permission. The author makes no representations about the
 * suitability of this software for any purpose. It is provided "as
 * is" without express or implied warranty.
 */

#include <ctype.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <sys/un.h>
#include <netdb.h>
#include <arpa/inet.h>
#include <sys/time.h>
#ifdef HAVE_SYS_SELECT_H
#include <sys/select.h>
#endif
#include <sys/stat.h>
#include <sys/types.h>
#include <fcntl.h>

#include <sys/wait.h>

#include <stdio.h>
#include <errno.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>

#include <signal.h>

#include <libgen.h>

#ifdef __FreeBSD__
#include <netinet/in.h>
#endif

#include "utils/elog.h"
#include "utils/palloc.h"

#include "pool.h"
#include "utils/palloc.h"
#include "utils/memutils.h"
#include "pool_config.h"
#include "context/pool_process_context.h"
#include "version.h"
#include "parser/pool_string.h"
#include "auth/pool_passwd.h"
#include "query_cache/pool_memqcache.h"
#include "watchdog/wd_ipc_commands.h"
#include "watchdog/wd_lifecheck.h"

#include "watchdog/watchdog.h"
/*
 * Reasons for signalling the pgpool-II main process
 */
typedef enum
{
    SIG_FAILOVER_INTERRUPT,     /* signal main to start failover */
    SIG_WATCHDOG_STATE_CHANGED, /* notify main that the local watchdog node state has changed */
    MAX_INTERRUPTS              /* Must be last! */
} User1SignalReason;


typedef struct User1SignalSlot
{
    sig_atomic_t signalFlags[MAX_INTERRUPTS];
} User1SignalSlot;
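
/*
 * A minimal sketch (not part of pgpool) of how this slot is meant to be
 * used: a child or the watchdog sets a flag in the shared slot and then
 * signals the parent with SIGUSR1; the parent's interrupt processor tests
 * and clears each flag.  The helper name "raise_user1_reason" below is
 * hypothetical and only illustrates the pattern:
 *
 *     static void raise_user1_reason(volatile User1SignalSlot *slot,
 *                                    User1SignalReason reason, pid_t parent)
 *     {
 *         slot->signalFlags[reason] = true;   // set flag before signalling
 *         kill(parent, SIGUSR1);              // wake the parent up
 *     }
 *
 * The real code paths are signal_user1_to_parent_with_reason() (producer)
 * and sigusr1_interrupt_processor() (consumer) further down in this file.
 */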
/*
 * Process pending signal actions.
 */
#define CHECK_REQUEST \
    do { \
        if (wakeup_request) \
        { \
            wakeup_children(); \
            wakeup_request = 0; \
        } \
        if (sigusr1_request) \
        { \
            do {\
                sigusr1_request = 0; \
                sigusr1_interrupt_processor(); \
            } while (sigusr1_request == 1); \
        } \
        if (sigchld_request) \
        { \
            reaper(); \
        } \
        if (reload_config_request) \
        { \
            reload_config(); \
            reload_config_request = 0; \
        } \
    } while (0)

#define CLEAR_ALARM \
    do { \
        elog(DEBUG1,"health check: clearing alarm"); \
    } while (alarm(0) > 0)
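
/*
 * Note on CLEAR_ALARM: alarm(0) cancels any pending alarm and returns the
 * number of seconds that were remaining, so looping "while (alarm(0) > 0)"
 * keeps cancelling until no timer is left.  A short illustration of the
 * same semantics in isolation (values are made up):
 *
 *     alarm(30);                 // arm a 30 second timer
 *     unsigned left = alarm(0);  // cancel it; left is roughly 30
 *     // a second alarm(0) now returns 0 -- the loop terminates
 */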


#define PGPOOLMAXLITSENQUEUELENGTH 10000

static int process_backend_health_check_failure(int health_check_node_id, int retrycnt);
static bool do_health_check(bool use_template_db, volatile int *health_check_node_id);
static void signal_user1_to_parent_with_reason(User1SignalReason reason);

static void FileUnlink(int code, Datum path);
static pid_t pcp_fork_a_child(int unix_fd, int inet_fd, char *pcp_conf_file);
static pid_t fork_a_child(int *fds, int id);
static pid_t worker_fork_a_child(void);
static int create_unix_domain_socket(struct sockaddr_un un_addr_tmp);
static int create_inet_domain_socket(const char *hostname, const int port);
static int *create_inet_domain_sockets(const char *hostname, const int port);
static void failover(void);
static bool check_all_backend_down(void);
static void reaper(void);
static void wakeup_children(void);
static void reload_config(void);
static int pool_pause(struct timeval *timeout);
static void kill_all_children(int sig);
static pid_t fork_follow_child(int old_master, int new_primary, int old_primary);
static int read_status_file(bool discard_status);
static RETSIGTYPE exit_handler(int sig);
static RETSIGTYPE reap_handler(int sig);
static RETSIGTYPE sigusr1_handler(int sig);
static void sigusr1_interrupt_processor(void);
static RETSIGTYPE reload_config_handler(int sig);
static RETSIGTYPE health_check_timer_handler(int sig);
static RETSIGTYPE wakeup_handler(int sig);

static void initialize_shared_mem_objects(bool clear_memcache_oidmaps);
static int trigger_failover_command(int node, const char *command_line,
                                    int old_master, int new_master, int old_primary);
static bool verify_backend_node_status(int backend_no, bool *is_standby);
static int find_primary_node(void);
static int find_primary_node_repeatedly(void);
static void terminate_all_childrens(void);
static void system_will_go_down(int code, Datum arg);
static char *process_name_from_pid(pid_t pid);
static void sync_backend_from_watchdog(void);

static struct sockaddr_un un_addr;      /* unix domain socket path */
static struct sockaddr_un pcp_un_addr;  /* unix domain socket path for PCP */
ProcessInfo *process_info = NULL;       /* Per child info table on shmem */
volatile User1SignalSlot *user1SignalSlot = NULL;   /* User 1 signal slot on shmem */
struct timeval random_start_time;

/*
 * Private copy of backend status
 */
BACKEND_STATUS private_backend_status[MAX_NUM_BACKENDS];

/*
 * shmem connection info table
 * this is a three dimensional array. i.e.:
 * con_info[pool_config->num_init_children][pool_config->max_pool][MAX_NUM_BACKENDS]
 */
ConnectionInfo *con_info;

static int *fds;            /* listening file descriptors (UNIX socket, inet domain sockets) */

static int pcp_unix_fd;     /* unix domain socket fd for PCP (not used) */
static int pcp_inet_fd;     /* inet domain socket fd for PCP */
extern char *pcp_conf_file; /* path for pcp.conf */
extern char *conf_file;
extern char *hba_file;

static int exiting = 0;     /* non 0 if I'm exiting */
static int switching = 0;   /* non 0 if I'm failing over or degenerating */

POOL_REQUEST_INFO *Req_info;        /* request info area in shared memory */
volatile sig_atomic_t *InRecovery;  /* non 0 if recovery is started */
volatile sig_atomic_t reload_config_request = 0;
static volatile sig_atomic_t sigusr1_request = 0;
static volatile sig_atomic_t sigchld_request = 0;
static volatile sig_atomic_t wakeup_request = 0;

static int pipe_fds[2];     /* for delivering signals */

int my_proc_id;

static BackendStatusRecord backend_rec;     /* Backend status record */

static pid_t worker_pid = 0;        /* pid of worker process */
static pid_t follow_pid = 0;        /* pid for child process handling follow command */
static pid_t pcp_pid = 0;           /* pid for child process handling PCP */
static pid_t watchdog_pid = 0;      /* pid for watchdog child process */
static pid_t wd_lifecheck_pid = 0;  /* pid for child process handling watchdog lifecheck */

BACKEND_STATUS *my_backend_status[MAX_NUM_BACKENDS];    /* Backend status buffer */
int my_master_node_id;      /* Master node id buffer */

/*
 * Dummy variable to suppress compiler warnings by discarding return values
 * from write(2) in signal handlers
 */
static int dummy_status;

/*
 * pgpool main program
 */

int
PgpoolMain(bool discard_status, bool clear_memcache_oidmaps)
{
    int i;
    volatile int health_check_node_id = 0;
    volatile bool use_template_db = false;
    volatile int retrycnt;

    MemoryContext MainLoopMemoryContext;
    sigjmp_buf local_sigjmp_buf;

    /*
     * to prevent the variable from being assigned to a register, so that
     * longjmp() does not discard its content
     */
    volatile bool first = true;

    /* For PostmasterRandom */
    gettimeofday(&random_start_time, NULL);

    /* Set the process type variable */
    processType = PT_MAIN;
    processState = INITIALIZING;

    /*
     * Restore previous backend status if possible
     */
    read_status_file(discard_status);

    /*
     * install the callback for preparation of system exit
     */
    on_system_exit(system_will_go_down, (Datum) NULL);

    /* set unix domain socket path for connections to pgpool */
    snprintf(un_addr.sun_path, sizeof(un_addr.sun_path), "%s/.s.PGSQL.%d",
             pool_config->socket_dir,
             pool_config->port);
    /* set unix domain socket path for pgpool PCP communication */
    snprintf(pcp_un_addr.sun_path, sizeof(pcp_un_addr.sun_path), "%s/.s.PGSQL.%d",
             pool_config->pcp_socket_dir,
             pool_config->pcp_port);

    /* set up signal handlers */
    pool_signal(SIGPIPE, SIG_IGN);

    /* create unix domain socket */
    fds = malloc(sizeof(int) * 2);
    if (fds == NULL)
        ereport(FATAL,
                (errmsg("failed to allocate memory in startup process")));

    initialize_shared_mem_objects(clear_memcache_oidmaps);
    if (pool_config->use_watchdog)
    {
        sigset_t mask;

        wakeup_request = 0;

        /*
         * The watchdog process fires SIGUSR2 once it reaches a stable state,
         * so install the SIGUSR2 handler first.  In addition, when the
         * watchdog fails to start with FATAL, the process exits and SIGCHLD
         * is fired, so a SIGCHLD handler is also needed.  Finally, we also
         * need to set the SIGUSR1 handler for failover requests from other
         * watchdog nodes, in case a request arrives just when the watchdog
         * has been initialized.
         */
        pool_signal(SIGUSR2, wakeup_handler);
        pool_signal(SIGCHLD, reap_handler);
        pool_signal(SIGUSR1, sigusr1_handler);

        /*
         * We need to wait until the watchdog is in a stable state, so wait
         * only for SIGUSR1, SIGCHLD, and the signals necessary to make sure
         * we respond to a user request for shutdown if it arrives while we
         * are waiting.
         *
         * Note that SIGUSR1 does not need to be in the wait signal list.
         * Although its signal handler is already installed, even if SIGUSR1
         * arrives while the watchdog is initializing we will continue with
         * our normal initialization and process the failover request once
         * our backend status has been synchronized across the cluster.
         */
        sigfillset(&mask);
        sigdelset(&mask, SIGUSR1);
        sigdelset(&mask, SIGCHLD);
        sigdelset(&mask, SIGTERM);
        sigdelset(&mask, SIGINT);
        sigdelset(&mask, SIGQUIT);
        watchdog_pid = initialize_watchdog();
        ereport(LOG,
                (errmsg("waiting for watchdog to initialize")));
        while (sigusr1_request == 0 && sigchld_request == 0)
        {
            sigsuspend(&mask);
        }
        wakeup_request = 0;

        /* watchdog process fails to start */
        if (sigchld_request)
        {
            reaper();
        }

        ereport(LOG,
                (errmsg("watchdog process is initialized"),
                 errdetail("watchdog messaging data version: %s", WD_MESSAGE_DATA_VERSION)));

        /*
         * initialize the lifecheck process
         */
        wd_lifecheck_pid = initialize_watchdog_lifecheck();

        if (sigusr1_request)
        {
            do {
                sigusr1_request = 0;
                sigusr1_interrupt_processor();
            } while (sigusr1_request == 1);
        }
    }
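
    /*
     * Aside on the wait loop above: checking the sig_atomic_t flag and then
     * calling sigsuspend() with the signal blocked is the classic race-free
     * way to wait for a signal.  A minimal standalone sketch of the pattern
     * (independent of pgpool; handler installation omitted):
     *
     *     sigset_t block, wait_mask;
     *     sigemptyset(&block);
     *     sigaddset(&block, SIGUSR1);
     *     sigprocmask(SIG_BLOCK, &block, NULL);  // SIGUSR1 now held, not delivered
     *     sigfillset(&wait_mask);
     *     sigdelset(&wait_mask, SIGUSR1);        // atomically unblocked inside sigsuspend
     *     while (!flag)                          // flag set by the handler
     *         sigsuspend(&wait_mask);
     *
     * Because the flag test and the suspension are not separated by a window
     * in which the signal is unblocked, a signal arriving "too early" cannot
     * be lost.
     */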

    fds[0] = create_unix_domain_socket(un_addr);
    fds[1] = -1;
    on_proc_exit(FileUnlink, (Datum) un_addr.sun_path);

    /* create inet domain socket if any */
    if (pool_config->listen_addresses[0])
    {
        int *inet_fds, *walk;
        int n = 1;

        inet_fds = create_inet_domain_sockets(pool_config->listen_addresses, pool_config->port);

        for (walk = inet_fds; *walk != -1; walk++)
            n++;

        fds = realloc(fds, sizeof(int) * (n + 1));
        if (fds == NULL)
            ereport(FATAL,
                    (errmsg("failed to allocate memory in startup process")));

        n = 1;
        for (walk = inet_fds; *walk != -1; walk++)
        {
            fds[n] = inet_fds[n - 1];
            n++;
        }
        fds[n] = -1;
        free(inet_fds);
    }


    /*
     * We need to block signals here.  Otherwise a child might send us a
     * signal, for example SIGUSR1 (failover), before we are ready.  Children
     * inherit the signal blocking but unblock signals at the very beginning
     * of their processing, so this is harmless.
     */
    POOL_SETMASK(&BlockSig);
    /* fork the children */
    for (i = 0; i < pool_config->num_init_children; i++)
    {
        process_info[i].pid = fork_a_child(fds, i);
        process_info[i].start_time = time(NULL);
    }

    /* set up signal handlers */

    pool_signal(SIGTERM, exit_handler);
    pool_signal(SIGINT, exit_handler);
    pool_signal(SIGQUIT, exit_handler);
    pool_signal(SIGCHLD, reap_handler);
    pool_signal(SIGUSR1, sigusr1_handler);
    pool_signal(SIGUSR2, wakeup_handler);
    pool_signal(SIGHUP, reload_config_handler);

    /* create pipe for delivering events */
    if (pipe(pipe_fds) < 0)
    {
        ereport(FATAL,
                (errmsg("failed to create pipe")));
    }
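
    /*
     * The pipe created above is the "self-pipe trick": signal handlers (see
     * sigusr1_handler() below) write one byte into pipe_fds[1], and the main
     * loop can sleep in select()/poll() on pipe_fds[0] so that a signal
     * reliably interrupts the wait.  A hedged sketch of the reader side --
     * pgpool's actual implementation lives in pool_pause(), which is not
     * shown in this file:
     *
     *     fd_set rfds;
     *     FD_ZERO(&rfds);
     *     FD_SET(pipe_fds[0], &rfds);
     *     if (select(pipe_fds[0] + 1, &rfds, NULL, NULL, timeout) > 0)
     *     {
     *         char buf[16];
     *         read(pipe_fds[0], buf, sizeof(buf));  // drain wakeup bytes
     *     }
     */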

    MemoryContextSwitchTo(TopMemoryContext);

    /* Create per loop iteration memory context */
    MainLoopMemoryContext = AllocSetContextCreate(TopMemoryContext,
                                                  "pgpool_main_loop",
                                                  ALLOCSET_DEFAULT_MINSIZE,
                                                  ALLOCSET_DEFAULT_INITSIZE,
                                                  ALLOCSET_DEFAULT_MAXSIZE);

    /*
     * if the primary node id is not loaded by the watchdog, search for it
     */
    if (Req_info->primary_node_id < 0)
    {
        /* Save primary node id */
        Req_info->primary_node_id = find_primary_node_repeatedly();
    }

    /* fork a child for PCP handling */
    pcp_unix_fd = create_unix_domain_socket(pcp_un_addr);
    /* Add on_proc_exit callback to clean up the unix domain socket at exit */
    on_proc_exit(FileUnlink, (Datum) pcp_un_addr.sun_path);

    if (pool_config->pcp_listen_addresses[0])
    {
        pcp_inet_fd = create_inet_domain_socket(pool_config->pcp_listen_addresses, pool_config->pcp_port);
    }
    pcp_pid = pcp_fork_a_child(pcp_unix_fd, pcp_inet_fd, pcp_conf_file);

    /* Fork worker process */
    worker_pid = worker_fork_a_child();

    retrycnt = 0;   /* reset health check retry counter */

    if (sigsetjmp(local_sigjmp_buf, 1) != 0)
    {
        /* Since we are not using PG_TRY, we must reset the error stack by hand */
        error_context_stack = NULL;
        EmitErrorReport();
        MemoryContextSwitchTo(TopMemoryContext);
        FlushErrorState();
        POOL_SETMASK(&BlockSig);

        /*
         * Check if we failed during a health check and perform the necessary
         * actions if so
         */
        if (processState == PERFORMING_HEALTH_CHECK)
        {
            if (errno != EINTR || health_check_timer_expired)
            {
                if (use_template_db == false)
                {
                    /*
                     * The health check was performed on the 'postgres'
                     * database; let's try to perform the health check on the
                     * template1 db before marking the health check as failed
                     */
                    use_template_db = true;
                }
                else
                {
                    int ret;

                    retrycnt++;
                    ret = process_backend_health_check_failure(health_check_node_id, retrycnt);
                    if (ret > 0)    /* Retries are exhausted, reset the counter */
                    {
                        retrycnt = 0;
                        if (ret == 2)   /* 2 = failover done on node */
                        {
                            health_check_node_id = 0;
                            use_template_db = false;
                        }
                    }
                }
            }
        }
    }
    /* We can now handle ereport(ERROR) */
    PG_exception_stack = &local_sigjmp_buf;
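
    /*
     * Error handling recap: ereport(ERROR) ends in siglongjmp() back to the
     * sigsetjmp() above, so the block above is effectively a process-wide
     * "catch".  A minimal sketch of the same control flow in isolation
     * (names here are illustrative, not pgpool APIs):
     *
     *     static sigjmp_buf env;
     *
     *     if (sigsetjmp(env, 1) != 0)
     *     {
     *         // landed here via siglongjmp(): clean up and fall through
     *     }
     *     exception_target = &env;   // from now on, errors jump back above
     *     for (;;)
     *         do_work();             // may "throw" via siglongjmp(env, 1)
     */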

    /* Create or write status file */
    (void) write_status_file();

    /* This is the main loop */
    for (;;)
    {
        bool all_nodes_healthy;

        CHECK_REQUEST;

        /*
         * check for child signals to ensure child startup before reporting a
         * successful start.
         */
        if (first)
            ereport(LOG,
                    (errmsg("%s successfully started. version %s (%s)", PACKAGE, VERSION, PGPOOLVERSION)));
        first = false;

        /* do we need health checking for PostgreSQL? */
        if (pool_config->health_check_period > 0)
        {
            /* reset the per-iteration memory context */
            MemoryContextSwitchTo(MainLoopMemoryContext);
            MemoryContextResetAndDeleteChildren(MainLoopMemoryContext);

            if (retrycnt == 0 && !use_template_db)
                ereport(DEBUG1,
                        (errmsg("starting health check")));
            else if (retrycnt)
                ereport(LOG,
                        (errmsg("health checking retry count %d", retrycnt)));

            if (pool_config->health_check_timeout > 0)
            {
                /*
                 * set the health checker timeout. we want to detect a
                 * communication path failure much earlier than the TCP/IP
                 * stack would detect it.
                 */
                CLEAR_ALARM;
                pool_signal(SIGALRM, health_check_timer_handler);
                alarm(pool_config->health_check_timeout);
            }

            /*
             * do the actual health check. try to connect to the backend
             */
            errno = 0;
            health_check_timer_expired = 0;

            POOL_SETMASK(&UnBlockSig);

            processState = PERFORMING_HEALTH_CHECK;
            all_nodes_healthy = do_health_check(use_template_db, &health_check_node_id);
            POOL_SETMASK(&BlockSig);

            if (all_nodes_healthy && retrycnt)
                ereport(LOG,
                        (errmsg("all backends are returned to a healthy state after %d retry(ies)", retrycnt)));

            health_check_node_id = 0;
            use_template_db = false;
            retrycnt = 0;
            processState = SLEEPING;

            if (pool_config->health_check_timeout > 0)
            {
                /* seems OK. cancel the health check timer */
                pool_signal(SIGALRM, SIG_IGN);
                CLEAR_ALARM;
            }
            pool_sleep(pool_config->health_check_period);
        }
        else    /* health check is not enabled, so there is not much to do */
        {
            processState = SLEEPING;
            for (;;)
            {
                int r;
                struct timeval t = {3, 0};

                POOL_SETMASK(&UnBlockSig);
                r = pool_pause(&t);
                POOL_SETMASK(&BlockSig);
                if (r > 0)
                    break;
            }
        }
    }
}

/*
 * Process a backend node failure detected by a health check.  Since this
 * function is called from the exception handler, ereport(ERROR) is not
 * allowed here.
 * Returns non-zero when the retries are exhausted
 * (1 if failover was not performed on the node, 2 in case failover was done).
 */
static int
process_backend_health_check_failure(int health_check_node_id, int retrycnt)
{
    /*
     * the health check has failed on the template1 database as well
     */
    int sleep_time = pool_config->health_check_retry_delay;
    int health_check_max_retries = pool_config->health_check_max_retries;

    if (health_check_max_retries > 0 && retrycnt <= health_check_max_retries)
    {
        /* Keep retrying and sleep a little in between */
        ereport(DEBUG1,
                (errmsg("Sleeping for %d seconds from process backend health check failure", sleep_time),
                 errdetail("health check failed retry no is %d while max retries are %d", retrycnt, health_check_max_retries)));

        pool_sleep(sleep_time);
    }
    else
    {
        /* No more retries left, proceed to failover if allowed */
        if (POOL_DISALLOW_TO_FAILOVER(BACKEND_INFO(health_check_node_id).flag))
        {
            ereport(LOG,
                    (errmsg("health check failed on node %d but failover is disallowed for the node", health_check_node_id)));
            return 1;
        }
        else
        {
            /*
             * If the health check timer expired, then it is possible that
             * there was a communication path problem.  In that case we need
             * to do a full restart of the child processes.
             */
            bool partial_failover = health_check_timer_expired ? false : true;

            ereport(LOG,
                    (errmsg("setting backend node %d status to NODE DOWN", health_check_node_id)));
            health_check_timer_expired = 0;
            degenerate_backend_set(&health_check_node_id, 1, partial_failover, 0);
            /* need to distribute this info to children?? */
            return 2;
        }
    }
    return 0;
}
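
/*
 * Worked example of the retry bookkeeping above (hypothetical settings, not
 * defaults): with health_check_max_retries = 2 and health_check_retry_delay
 * = 5, after the first failure has already switched the check over to
 * template1, each further failure calls this function with an incremented
 * retrycnt:
 *
 *     retrycnt = 1  ->  1 <= 2: sleep 5s, return 0 (keep retrying)
 *     retrycnt = 2  ->  2 <= 2: sleep 5s, return 0 (keep retrying)
 *     retrycnt = 3  ->  3 >  2: degenerate_backend_set(), return 2
 *
 * i.e. the node is only marked down after the retries are exhausted, with
 * retry_delay seconds between attempts.
 */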

/*
 * register_node_operation_request()
 *
 * This function enqueues failover/failback requests, and fires failover()
 * if that function is not already executing
 */
bool
register_node_operation_request(POOL_REQUEST_KIND kind, int *node_id_set, int count, bool switch_over, unsigned int wd_failover_id)
{
    bool failover_in_progress;
    pool_sigset_t oldmask;
    int index;
    unsigned char request_details = 0;

    /*
     * if the queue is already full, reject the request
     */
    if ((Req_info->request_queue_tail - MAX_REQUEST_QUEUE_SIZE) == Req_info->request_queue_head)
    {
        return false;
    }
    POOL_SETMASK2(&BlockSig, &oldmask);
    pool_semaphore_lock(REQUEST_INFO_SEM);

    /* re-check the full condition with the semaphore held */
    if ((Req_info->request_queue_tail - MAX_REQUEST_QUEUE_SIZE) == Req_info->request_queue_head)
    {
        pool_semaphore_unlock(REQUEST_INFO_SEM);
        POOL_SETMASK(&oldmask);
        return false;
    }
    Req_info->request_queue_tail++;
    index = Req_info->request_queue_tail % MAX_REQUEST_QUEUE_SIZE;
    Req_info->request[index].kind = kind;
    Req_info->request[index].wd_failover_id = wd_failover_id;

    /* Set switch over flag if requested */
    if (switch_over)
        request_details |= REQ_DETAIL_SWITCHOVER;
    Req_info->request[index].request_details = request_details;

    if (count > 0)
        memcpy(Req_info->request[index].node_id, node_id_set, (sizeof(int) * count));
    Req_info->request[index].count = count;
    failover_in_progress = Req_info->switching;
    pool_semaphore_unlock(REQUEST_INFO_SEM);

    if (getpid() == mypid)
    {
        /*
         * We were invoked from the main process;
         * call failover with blocked signals
         */
        failover();
        POOL_SETMASK(&oldmask);
    }
    else
    {
        POOL_SETMASK(&oldmask);
        if (failover_in_progress == false)
        {
            signal_user1_to_parent_with_reason(SIG_FAILOVER_INTERRUPT);
        }
    }

    return true;
}
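
/*
 * A note on the queue arithmetic above: request_queue_head and
 * request_queue_tail grow monotonically and are reduced modulo
 * MAX_REQUEST_QUEUE_SIZE only when indexing, so "tail - SIZE == head" means
 * the ring is full.  An illustrative trace, assuming for the sake of the
 * example that MAX_REQUEST_QUEUE_SIZE were 4 (the real value and the
 * counters' initial values are defined elsewhere):
 *
 *     head = tail = 41              queue empty (head == tail)
 *     enqueue x4  -> tail = 45      tail - 4 == 41 == head: full, reject next
 *     dequeue x1  -> head = 42      failover() reads slot 42 % 4
 */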

void
register_watchdog_state_change_interrupt(void)
{
    signal_user1_to_parent_with_reason(SIG_WATCHDOG_STATE_CHANGED);
}

static void
signal_user1_to_parent_with_reason(User1SignalReason reason)
{
    ereport(LOG,
            (errmsg("signal_user1_to_parent_with_reason(%d)", reason)));

    user1SignalSlot->signalFlags[reason] = true;
    pool_signal_parent(SIGUSR1);
}

/*
 * fork a child for PCP
 */
pid_t
pcp_fork_a_child(int unix_fd, int inet_fd, char *pcp_conf_file)
{
    pid_t pid;

    pid = fork();

    if (pid == 0)
    {
        on_exit_reset();

        close(pipe_fds[0]);
        close(pipe_fds[1]);

        /* Set the process type variable */
        processType = PT_PCP;

        /* call PCP child main */
        POOL_SETMASK(&UnBlockSig);
        health_check_timer_expired = 0;
        reload_config_request = 0;
        pcp_main(unix_fd, inet_fd);
    }
    else if (pid == -1)
    {
        ereport(FATAL,
                (errmsg("fork() failed. reason: %s", strerror(errno))));
    }

    return pid;
}

/*
 * fork a child
 */
pid_t
fork_a_child(int *fds, int id)
{
    pid_t pid;

    pid = fork();

    if (pid == 0)
    {
        on_exit_reset();

        /*
         * Before, we unconditionally closed pipe_fds[0] and pipe_fds[1]
         * here, which was wrong: during pgpool startup pipe(2) has not been
         * called yet, so that mistakenly closed fd 0.  Now we check for
         * fd > 0 before close(), expecting pipe(2) to return fds greater
         * than 0.  Note that we cannot unconditionally remove the close(2)
         * calls, since fork_a_child() may be called *after* pgpool has
         * started up.
         */
        if (pipe_fds[0] > 0)
        {
            close(pipe_fds[0]);
            close(pipe_fds[1]);
        }

        /* Set the process type variable */
        processType = PT_CHILD;

        /* call child main */
        POOL_SETMASK(&UnBlockSig);
        health_check_timer_expired = 0;
        reload_config_request = 0;
        my_proc_id = id;
        do_child(fds);
    }
    else if (pid == -1)
    {
        ereport(FATAL,
                (errmsg("failed to fork a child"),
                 errdetail("system call fork() failed with reason: %s", strerror(errno))));
    }

    return pid;
}

/*
 * fork worker child process
 */
pid_t
worker_fork_a_child(void)
{
    pid_t pid;

    pid = fork();

    if (pid == 0)
    {
        on_exit_reset();

        /*
         * Before, we unconditionally closed pipe_fds[0] and pipe_fds[1]
         * here, which was wrong: during pgpool startup pipe(2) has not been
         * called yet, so that mistakenly closed fd 0.  Now we check for
         * fd > 0 before close(), expecting pipe(2) to return fds greater
         * than 0.  Note that we cannot unconditionally remove the close(2)
         * calls, since this function may be called *after* pgpool has
         * started up.
         */
        if (pipe_fds[0] > 0)
        {
            close(pipe_fds[0]);
            close(pipe_fds[1]);
        }

        /* Set the process type variable */
        processType = PT_WORKER;

        /* call child main */
        POOL_SETMASK(&UnBlockSig);
        health_check_timer_expired = 0;
        reload_config_request = 0;
        do_worker_child();
    }
    else if (pid == -1)
    {
        ereport(FATAL,
                (errmsg("failed to fork a child"),
                 errdetail("system call fork() failed with reason: %s", strerror(errno))));
    }

    return pid;
}
static int *
create_inet_domain_sockets(const char *hostname, const int port)
{
    int ret;
    int fd;
    int one = 1;
    int status;
    int backlog;
    int n = 0;
    int *sockfds;
    char *portstr;
    struct addrinfo *walk;
    struct addrinfo *res;
    struct addrinfo hints;

    memset(&hints, 0, sizeof(struct addrinfo));
    hints.ai_family = PF_UNSPEC;
    hints.ai_socktype = SOCK_STREAM;
    hints.ai_flags = AI_PASSIVE;

    /* getaddrinfo() requires a string because it also accepts service names, such as "http". */
    if (asprintf(&portstr, "%d", port) == -1)
    {
        ereport(FATAL,
                (errmsg("failed to create INET domain socket"),
                 errdetail("asprintf() failed: %s", strerror(errno))));
    }

    if ((ret = getaddrinfo((!hostname || strcmp(hostname, "*") == 0) ? NULL : hostname, portstr, &hints, &res)) != 0)
    {
        ereport(FATAL,
                (errmsg("failed to create INET domain socket"),
                 errdetail("getaddrinfo() failed: %s", gai_strerror(ret))));
    }

    free(portstr);

    for (walk = res; walk != NULL; walk = walk->ai_next)
        n++;

    sockfds = malloc(sizeof(int) * (n + 1));
    n = 0;
    for (walk = res; walk != NULL; walk = walk->ai_next)
        sockfds[n++] = -1;
    /* We always terminate the list of sockets with a -1 entry */
    sockfds[n] = -1;

    n = 0;

    for (walk = res; walk != NULL; walk = walk->ai_next)
    {
        char buf[INET6_ADDRSTRLEN + 1];

        memset(buf, 0, sizeof(buf));
        if ((ret = getnameinfo((struct sockaddr *) walk->ai_addr, walk->ai_addrlen,
                               buf, sizeof(buf), NULL, 0, NI_NUMERICHOST)) != 0)
        {
            ereport(FATAL,
                    (errmsg("failed to create INET domain socket"),
                     errdetail("getnameinfo() failed: \"%s\"", gai_strerror(ret))));
        }

        ereport(LOG,
                (errmsg("Setting up socket for %s:%d", buf, port)));

        if ((fd = socket(walk->ai_family, walk->ai_socktype, walk->ai_protocol)) == -1)
        {
            /*
             * A single failure is not necessarily a problem (machines
             * without proper dual stack setups), but if we cannot create
             * any socket at all, we report a FATAL error below.
             */
            ereport(LOG,
                    (errmsg("perhaps failed to create INET domain socket"),
                     errdetail("socket(%s) failed: \"%s\"", buf, strerror(errno))));
            continue;
        }

        if ((setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (char *) &one,
                        sizeof(one))) == -1)
        {
            ereport(FATAL,
                    (errmsg("failed to create INET domain socket"),
                     errdetail("socket error \"%s\"", strerror(errno))));
        }

        if (walk->ai_family == AF_INET6)
        {
            /*
             * On some machines, depending on the default value in
             * /proc/sys/net/ipv6/bindv6only, sockets will listen on both
             * IPv6 and IPv4 at the same time.  Since we are creating one
             * socket per address family, disable that option specifically
             * to be sure it is off.
             */
            if ((setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &one, sizeof(one))) == -1)
            {
                ereport(LOG,
                        (errmsg("perhaps failed to create INET domain socket"),
                         errdetail("setsockopt(%s, IPV6_V6ONLY) failed: \"%s\"", buf, strerror(errno))));
            }
        }

        if (bind(fd, walk->ai_addr, walk->ai_addrlen) != 0)
        {
            ereport(FATAL,
                    (errmsg("failed to create INET domain socket"),
                     errdetail("bind on socket failed with error \"%s\"", strerror(errno))));
        }

        backlog = pool_config->num_init_children * pool_config->listen_backlog_multiplier;

        if (backlog > PGPOOLMAXLITSENQUEUELENGTH)
            backlog = PGPOOLMAXLITSENQUEUELENGTH;

        status = listen(fd, backlog);
        if (status < 0)
            ereport(FATAL,
                    (errmsg("failed to create INET domain socket"),
                     errdetail("listen on socket failed with error \"%s\"", strerror(errno))));

        sockfds[n++] = fd;
    }

    freeaddrinfo(res);

    if (n == 0)
    {
        ereport(FATAL,
                (errmsg("failed to create INET domain socket"),
                 errdetail("Failed to create any sockets. See the earlier LOG messages.")));
    }

    return sockfds;
}
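
/*
 * The returned array is -1 terminated, which is how callers (see the
 * listen_addresses block in PgpoolMain() above) walk it.  A hedged usage
 * sketch, with illustrative arguments:
 *
 *     int *socks = create_inet_domain_sockets("*", 9999);
 *     int *p;
 *
 *     for (p = socks; *p != -1; p++)
 *         ;                // *p is a listening fd, e.g. add it to an fd set
 *     free(socks);         // the array itself is malloc'ed by the callee
 */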

/*
 * create inet domain socket
 */
static int
create_inet_domain_socket(const char *hostname, const int port)
{
    struct sockaddr_in addr;
    int fd;
    int status;
    int one = 1;
    int len;
    int backlog;

    fd = socket(AF_INET, SOCK_STREAM, 0);
    if (fd == -1)
    {
        ereport(FATAL,
                (errmsg("failed to create INET domain socket"),
                 errdetail("socket error \"%s\"", strerror(errno))));
    }
    if ((setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (char *) &one,
                    sizeof(one))) == -1)
    {
        ereport(FATAL,
                (errmsg("failed to create INET domain socket"),
                 errdetail("socket error \"%s\"", strerror(errno))));
    }

    memset((char *) &addr, 0, sizeof(addr));
    addr.sin_family = AF_INET;

    if (strcmp(hostname, "*") == 0)
    {
        addr.sin_addr.s_addr = htonl(INADDR_ANY);
    }
    else
    {
        struct hostent *hostinfo;

        hostinfo = gethostbyname(hostname);
        if (!hostinfo)
        {
            ereport(FATAL,
                    (errmsg("failed to create INET domain socket"),
                     errdetail("could not resolve hostname \"%s\": error \"%s\"", hostname, hstrerror(h_errno))));
        }
        addr.sin_addr = *(struct in_addr *) hostinfo->h_addr;
    }

    addr.sin_port = htons(port);
    len = sizeof(struct sockaddr_in);

    status = bind(fd, (struct sockaddr *) &addr, len);
    if (status == -1)
    {
        int saved_errno = errno;
        char host[NI_MAXHOST], servname[NI_MAXSERV];

        /*
         * Note: the local buffer is named "host" so as not to shadow the
         * "hostname" parameter, which we fall back to below.
         */
        if ((status = getnameinfo((struct sockaddr *) &addr, len, host, sizeof(host), servname, sizeof(servname), 0)))
        {
            ereport(NOTICE,
                    (errmsg("getnameinfo failed while creating INET domain socket"),
                     errdetail("getnameinfo failed with reason: \"%s\"", gai_strerror(status))));

            snprintf(servname, sizeof(servname), "%d", port);
            snprintf(host, sizeof(host), "%s", hostname);
        }
        ereport(FATAL,
                (errmsg("failed to create INET domain socket"),
                 errdetail("bind on host:\"%s\" server:\"%s\" failed with error \"%s\"", host, servname, strerror(saved_errno))));
    }

    backlog = pool_config->num_init_children * pool_config->listen_backlog_multiplier;

    if (backlog > PGPOOLMAXLITSENQUEUELENGTH)
        backlog = PGPOOLMAXLITSENQUEUELENGTH;

    status = listen(fd, backlog);
    if (status < 0)
        ereport(FATAL,
                (errmsg("failed to create INET domain socket"),
                 errdetail("listen on socket failed with error \"%s\"", strerror(errno))));

    return fd;
}

/*
 * create UNIX domain socket
 */
static int
create_unix_domain_socket(struct sockaddr_un un_addr_tmp)
{
    struct sockaddr_un addr;
    int fd;
    int status;
    int len;

    /* Delete any pre-existing socket file to avoid failure at bind() time */
    unlink(un_addr_tmp.sun_path);

    fd = socket(AF_UNIX, SOCK_STREAM, 0);
    if (fd == -1)
    {
        ereport(FATAL,
                (errmsg("failed to create a socket"),
                 errdetail("Failed to create UNIX domain socket. error: \"%s\"", strerror(errno))));
    }
    memset((char *) &addr, 0, sizeof(addr));
    addr.sun_family = AF_UNIX;
    snprintf(addr.sun_path, sizeof(addr.sun_path), "%s", un_addr_tmp.sun_path);
    len = sizeof(struct sockaddr_un);
    status = bind(fd, (struct sockaddr *) &addr, len);
    if (status == -1)
    {
        ereport(FATAL,
                (errmsg("failed to bind a socket: \"%s\"", un_addr_tmp.sun_path),
                 errdetail("bind socket failed with error: \"%s\"", strerror(errno))));
    }

    if (chmod(un_addr_tmp.sun_path, 0777) == -1)
    {
        ereport(FATAL,
                (errmsg("failed to bind a socket: \"%s\"", un_addr_tmp.sun_path),
                 errdetail("system call chmod failed with error: \"%s\"", strerror(errno))));
    }

    status = listen(fd, PGPOOLMAXLITSENQUEUELENGTH);
    if (status < 0)
    {
        ereport(FATAL,
                (errmsg("failed to bind a socket: \"%s\"", un_addr_tmp.sun_path),
                 errdetail("system call listen() failed with error: \"%s\"", strerror(errno))));
    }
    return fd;
}
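
/*
 * For reference, a client connects to the path created above (for example
 * "/tmp/.s.PGSQL.9999", depending on socket_dir and port) with the usual
 * AF_UNIX sequence.  A hedged, standalone sketch of the client side:
 *
 *     struct sockaddr_un sa;
 *     int s = socket(AF_UNIX, SOCK_STREAM, 0);
 *
 *     memset(&sa, 0, sizeof(sa));
 *     sa.sun_family = AF_UNIX;
 *     snprintf(sa.sun_path, sizeof(sa.sun_path), "/tmp/.s.PGSQL.9999");
 *     connect(s, (struct sockaddr *) &sa, sizeof(sa));
 *
 * The chmod(..., 0777) above is what allows clients running as other OS
 * users to connect through this socket file.
 */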

/*
 * function called as a shared memory exit callback to kill all children
 */
static void
terminate_all_childrens(void)
{
    pid_t wpid;

    /*
     * This is supposed to be called from the main process
     */
    if (processType != PT_MAIN)
        return;
    POOL_SETMASK(&BlockSig);

    kill_all_children(SIGINT);
    if (pcp_pid > 0)
        kill(pcp_pid, SIGINT);
    pcp_pid = 0;
    if (worker_pid > 0)
        kill(worker_pid, SIGINT);
    worker_pid = 0;
    if (pool_config->use_watchdog)
    {
        if (watchdog_pid)
            kill(watchdog_pid, SIGINT);
        watchdog_pid = 0;

        if (wd_lifecheck_pid)
            kill(wd_lifecheck_pid, SIGINT);
        wd_lifecheck_pid = 0;
    }

    /* wait for all children to exit */
    do
    {
        int ret_pid;

        wpid = waitpid(-1, &ret_pid, 0);
    } while (wpid > 0 || (wpid == -1 && errno == EINTR));

    if (wpid == -1 && errno != ECHILD)
        ereport(LOG,
                (errmsg("wait() failed. reason:%s", strerror(errno))));

    POOL_SETMASK(&UnBlockSig);
}

/*
 * Request failover.  If "switch_over" is false, request that all existing
 * sessions be restarted.
 */
void
notice_backend_error(int node_id, bool switch_over)
{
    int n = node_id;

    if (getpid() == mypid)
    {
        ereport(LOG,
                (errmsg("notice_backend_error: called from pgpool main. ignored.")));
    }
    else
    {
        degenerate_backend_set(&n, 1, switch_over, 0);
    }
}

/*
 * degenerate_backend_set_ex:
 *
 * The function registers/verifies the node down operation request.
 * The request is then processed by the failover function.
 *
 * node_id_set: array of node ids to be registered for the NODE DOWN operation
 * count: number of elements in the node_id_set array
 * error: if set, an error is thrown as soon as any node id is found in
 *        node_id_set on which the operation could not be performed.
 * test_only: when set, the function only checks if the NODE DOWN operation
 *        can be executed on the provided node ids and never registers the
 *        operation request.
 *        In the test_only case the function returns false or throws an error
 *        as soon as the first non-compliant node in node_id_set is found.
 * switch_over: if set, the request originated from a switch over, not from errors.
 *
 * wd_failover_id: the watchdog internal ID for this failover
 */
bool
degenerate_backend_set_ex(int *node_id_set, int count, bool error, bool test_only,
                          bool switch_over, unsigned int wd_failover_id)
{
    int i;
    int node_id[MAX_NUM_BACKENDS];
    int node_count = 0;
    int elevel = LOG;

    if (error)
        elevel = ERROR;

    for (i = 0; i < count; i++)
    {
        if (node_id_set[i] < 0 || node_id_set[i] >= MAX_NUM_BACKENDS ||
            !VALID_BACKEND(node_id_set[i]))
        {
            if (node_id_set[i] < 0 || node_id_set[i] >= MAX_NUM_BACKENDS)
                ereport(elevel,
                        (errmsg("invalid degenerate backend request, node id: %d is out of range. node id must be between [0 and %d]"
                                , node_id_set[i], MAX_NUM_BACKENDS)));
            else
                ereport(elevel,
                        (errmsg("invalid degenerate backend request, node id : %d status: [%d] is not valid for failover"
                                , node_id_set[i], BACKEND_INFO(node_id_set[i]).backend_status)));
            if (test_only)
                return false;

            continue;
        }

        if (POOL_DISALLOW_TO_FAILOVER(BACKEND_INFO(node_id_set[i]).flag))
        {
            ereport(elevel,
                    (errmsg("degenerate backend request for node_id: %d from pid [%d] is canceled because failover is disallowed on the node",
                            node_id_set[i], getpid())));
            if (test_only)
                return false;
        }
        else
        {
            if (!test_only) /* do not produce this log if we are in testing mode */
                ereport(LOG,
                        (errmsg("received degenerate backend request for node_id: %d from pid [%d]",
                                node_id_set[i], getpid())));

            node_id[node_count++] = node_id_set[i];
        }
    }

    if (node_count)
    {
        WDFailoverCMDResults res = FAILOVER_RES_PROCEED;

        /* If this was only a test, inform the caller without doing anything */
        if (test_only)
            return true;

        if (pool_config->use_watchdog && wd_failover_id == 0)
        {
            int x;

            for (x = 0; x < MAX_SEC_WAIT_FOR_CLUSTER_TRANSATION; x++)
            {
                res = wd_degenerate_backend_set(node_id_set, count, &wd_failover_id);
                if (res != FAILOVER_RES_TRANSITION)
                    break;
                sleep(1);
            }
        }
        if (res == FAILOVER_RES_TRANSITION)
        {
            /*
             * What to do when the cluster is still not stable?
             * Is proceeding to failover the right choice???
             */
            ereport(NOTICE,
                    (errmsg("received degenerate backend request for %d node(s) from pid [%d], But cluster is not in stable state"
                            , node_count, getpid())));
        }
        if (res == FAILOVER_RES_PROCEED)
        {
            register_node_operation_request(NODE_DOWN_REQUEST, node_id, node_count, switch_over, wd_failover_id);
        }
        else if (res == FAILOVER_RES_WILL_BE_DONE)
        {
            ereport(LOG,
                    (errmsg("degenerate backend request for %d node(s) from pid [%d], will be handled by watchdog"
                            , node_count, getpid())));
        }
        else
        {
            ereport(elevel,
                    (errmsg("degenerate backend request for %d node(s) from pid [%d] is canceled by other pgpool"
                            , node_count, getpid())));
            return false;
        }
    }
    return true;
}
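
/*
 * Hedged usage sketch (illustrative values): callers such as the health
 * check path above typically go through the thin wrapper
 * degenerate_backend_set() below; the extended variant is useful when the
 * caller wants to validate first without enqueuing anything:
 *
 *     int bad_node = 1;
 *
 *     // test_only = true: validate only, false on a non-compliant node
 *     if (degenerate_backend_set_ex(&bad_node, 1, false, true, false, 0))
 *         degenerate_backend_set(&bad_node, 1, false, 0);  // enqueue NODE DOWN
 */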

/*
 * wrapper over the degenerate_backend_set_ex function to register a
 * NODE DOWN operation request
 */
bool
degenerate_backend_set(int *node_id_set, int count, bool switch_over, unsigned int wd_failover_id)
{
    return degenerate_backend_set_ex(node_id_set, count, false, false, switch_over, wd_failover_id);
}

/* send promote node request using SIGUSR1 */
bool
promote_backend(int node_id, unsigned int wd_failover_id)
{
    WDFailoverCMDResults res = FAILOVER_RES_PROCEED;
    bool ret = false;

    if (!MASTER_SLAVE || pool_config->master_slave_sub_mode != STREAM_MODE)
    {
        return false;
    }

    if (node_id < 0 || node_id >= MAX_NUM_BACKENDS || !VALID_BACKEND(node_id))
    {
        if (node_id < 0 || node_id >= MAX_NUM_BACKENDS)
            ereport(LOG,
                    (errmsg("invalid promote backend request, node id: %d is out of range. node id must be between [0 and %d]"
                            , node_id, MAX_NUM_BACKENDS)));
        else
            ereport(LOG,
                    (errmsg("invalid promote backend request, node id : %d status: [%d] not valid"
                            , node_id, BACKEND_INFO(node_id).backend_status)));
        return false;
    }
    ereport(LOG,
            (errmsg("received promote backend request for node_id: %d from pid [%d]",
                    node_id, getpid())));

    /* If watchdog is in use and this request did not come from the watchdog, consult it first */
    if (pool_config->use_watchdog && wd_failover_id == 0)
    {
        int x;

        for (x = 0; x < MAX_SEC_WAIT_FOR_CLUSTER_TRANSATION; x++)
        {
            res = wd_promote_backend(node_id, &wd_failover_id);
            if (res != FAILOVER_RES_TRANSITION)
                break;
            sleep(1);
        }
    }
    if (res == FAILOVER_RES_TRANSITION)
    {
        /*
         * What to do when the cluster is still not stable?
         * Is proceeding to failover the right choice???
         */
        ereport(NOTICE,
                (errmsg("promote backend request for node_id: %d from pid [%d], But cluster is not in stable state"
                        , node_id, getpid())));
    }

    if (res == FAILOVER_RES_PROCEED)
    {
        ret = register_node_operation_request(PROMOTE_NODE_REQUEST, &node_id, 1, false, wd_failover_id);
    }
    else if (res == FAILOVER_RES_WILL_BE_DONE)
    {
        ereport(LOG,
                (errmsg("promote backend request for node_id: %d from pid [%d], will be handled by watchdog"
                        , node_id, getpid())));
    }
    else
    {
        ereport(LOG,
                (errmsg("promote backend request for node_id: %d from pid [%d] is canceled by other pgpool"
                        , node_id, getpid())));
    }
    return ret;
}

/* send failback request using SIGUSR1 */
bool
send_failback_request(int node_id, bool throw_error, unsigned int wd_failover_id)
{
    WDFailoverCMDResults res = FAILOVER_RES_PROCEED;
    bool ret = false;

    if (node_id < 0 || node_id >= MAX_NUM_BACKENDS ||
        (RAW_MODE && BACKEND_INFO(node_id).backend_status != CON_DOWN && VALID_BACKEND(node_id)))
    {
        if (node_id < 0 || node_id >= MAX_NUM_BACKENDS)
            ereport(throw_error ? ERROR : LOG,
                    (errmsg("invalid failback request, node id: %d is out of range. node id must be between [0 and %d]"
                            , node_id, MAX_NUM_BACKENDS)));
        else
            ereport(throw_error ? ERROR : LOG,
                    (errmsg("invalid failback request, node id : %d status: [%d] not valid for failback"
                            , node_id, BACKEND_INFO(node_id).backend_status)));
        return false;
    }

    ereport(LOG,
            (errmsg("received failback request for node_id: %d from pid [%d] wd_failover_id [%d]",
                    node_id, getpid(), wd_failover_id)));

    /* If watchdog is in use and this request did not come from the watchdog, consult it first */
    if (pool_config->use_watchdog && wd_failover_id == 0)
    {
        int x;

        for (x = 0; x < MAX_SEC_WAIT_FOR_CLUSTER_TRANSATION; x++)
        {
            res = wd_send_failback_request(node_id, &wd_failover_id);
            if (res != FAILOVER_RES_TRANSITION)
                break;
            sleep(1);
        }
    }
    if (res == FAILOVER_RES_TRANSITION)
    {
        /*
         * What to do when the cluster is still not stable?
         * Is proceeding to failover the right choice???
         */
        ereport(NOTICE,
                (errmsg("failback request for node_id: %d from pid [%d], But cluster is not in stable state"
                        , node_id, getpid())));
    }

    if (res == FAILOVER_RES_PROCEED)
    {
        ret = register_node_operation_request(NODE_UP_REQUEST, &node_id, 1, false, wd_failover_id);
    }
    else if (res == FAILOVER_RES_WILL_BE_DONE)
    {
        ereport(LOG,
                (errmsg("failback request for node_id: %d from pid [%d], will be handled by watchdog"
                        , node_id, getpid())));
    }
    else
    {
        ereport(throw_error ? ERROR : LOG,
                (errmsg("failback request for node_id: %d from pid [%d] is canceled by other pgpool"
                        , node_id, getpid())));
    }
    return ret;
}

static RETSIGTYPE
exit_handler(int sig)
{
    int i;
    pid_t wpid;
    int *walk;

    int save_errno = errno;

    POOL_SETMASK(&AuthBlockSig);

    /*
     * this could happen in a child process if a signal has been sent before
     * resetting the signal handler
     */
    if (getpid() != mypid)
    {
        POOL_SETMASK(&UnBlockSig);
        proc_exit(0);
    }

    if (sig != SIGTERM && sig != SIGINT && sig != SIGQUIT)
    {
        POOL_SETMASK(&UnBlockSig);
        errno = save_errno;
        return;
    }
    exiting = 1;
    processState = EXITING;

    /* Close listen sockets */
    for (walk = fds; *walk != -1; walk++)
        close(*walk);

    for (i = 0; i < pool_config->num_init_children; i++)
    {
        pid_t pid = process_info[i].pid;

        if (pid)
        {
            kill(pid, sig);
            process_info[i].pid = 0;
        }
    }

    if (pcp_pid > 0)
        kill(pcp_pid, sig);
    pcp_pid = 0;

    if (worker_pid > 0)
        kill(worker_pid, sig);
    worker_pid = 0;

    if (pool_config->use_watchdog)
    {
        if (watchdog_pid)
            kill(watchdog_pid, sig);
        watchdog_pid = 0;

        if (wd_lifecheck_pid)
            kill(wd_lifecheck_pid, sig);
        wd_lifecheck_pid = 0;
    }

    POOL_SETMASK(&UnBlockSig);
    do
    {
        int ret_pid;

        wpid = waitpid(-1, &ret_pid, 0);
    } while (wpid > 0 || (wpid == -1 && errno == EINTR));

    process_info = NULL;
    exit(0);
}

/*
 * Calculate the next valid master node id.
 * If no valid node is found, returns -1.
 */
int
get_next_master_node(void)
{
    int i;

    for (i = 0; i < pool_config->backend_desc->num_backends; i++)
    {
        /*
         * Do not use the VALID_BACKEND macro in raw mode.  VALID_BACKEND
         * returns true only if the argument is the master node id.  In
         * other words, standby nodes are false.  So we need to check the
         * backend status with VALID_BACKEND_RAW.
         */
        if (RAW_MODE)
        {
            if (VALID_BACKEND_RAW(i))
                break;
        }
        else
        {
            if (VALID_BACKEND(i))
                break;
        }
    }

    if (i == pool_config->backend_desc->num_backends)
        i = -1;

    return i;
}

/*
 * handle SIGUSR1
 *
 */
static RETSIGTYPE
sigusr1_handler(int sig)
{
    int save_errno = errno;

    POOL_SETMASK(&BlockSig);
    sigusr1_request = 1;

    dummy_status = write(pipe_fds[1], "\0", 1);

#ifdef NOT_USED
    if (write(pipe_fds[1], "\0", 1) < 0)
        ereport(WARNING,
                (errmsg("SIGUSR1 handler: write to pipe failed with error \"%s\"", strerror(errno))));
#endif

    POOL_SETMASK(&UnBlockSig);

    errno = save_errno;
}

static void
sigusr1_interrupt_processor(void)
{
    ereport(LOG,
            (errmsg("Pgpool-II parent process received SIGUSR1")));

    if (user1SignalSlot->signalFlags[SIG_WATCHDOG_STATE_CHANGED])
    {
        ereport(LOG,
                (errmsg("Pgpool-II parent process received watchdog state change signal from watchdog")));

        user1SignalSlot->signalFlags[SIG_WATCHDOG_STATE_CHANGED] = false;
        if (get_watchdog_local_node_state() == WD_STANDBY)
        {
            ereport(LOG,
                    (errmsg("we have joined the watchdog cluster as STANDBY node"),
                     errdetail("syncing the backend states from the MASTER watchdog node")));
            sync_backend_from_watchdog();
        }
    }
    if (user1SignalSlot->signalFlags[SIG_FAILOVER_INTERRUPT])
    {
        ereport(LOG,
                (errmsg("Pgpool-II parent process has received failover request")));
        user1SignalSlot->signalFlags[SIG_FAILOVER_INTERRUPT] = false;
        if (processState == INITIALIZING)
        {
            ereport(LOG,
                    (errmsg("ignoring the failover request, since we are still starting up")));
        }
        else
        {
            failover();
        }
    }
}

/* returns true if all backends are down */
static bool
check_all_backend_down(void)
{
    int i;

    /* Check to see if all backends are down */
    for (i = 0; i < NUM_BACKENDS; i++)
    {
        if (BACKEND_INFO(i).backend_status != CON_DOWN &&
            BACKEND_INFO(i).backend_status != CON_UNUSED)
        {
            ereport(LOG,
                    (errmsg("Node %d is not down (status: %d)",
                            i, BACKEND_INFO(i).backend_status)));
            return false;
        }
    }
    return true;
}

/*
 * Process backend connection errors and failover/failback requests when
 * possible.  failover() must be called with signals blocked.
 */
static void
failover(void)
{
    int i, j, k;
    int node_id;
    int new_master;
    int new_primary;
    int nodes[MAX_NUM_BACKENDS];
    bool need_to_restart_children;
    bool partial_restart;
    int status;
    int sts;
    bool need_to_restart_pcp = false;
    bool all_backend_down = true;

    ereport(DEBUG1,
            (errmsg("failover handler called")));

    memset(nodes, 0, sizeof(int) * MAX_NUM_BACKENDS);

    /*
     * this could happen in a child process if a signal has been sent before
     * resetting the signal handler
     */
    if (getpid() != mypid)
    {
        ereport(DEBUG1,
                (errmsg("failover handler called"),
                 errdetail("I am not parent")));
        kill(pcp_pid, SIGUSR2);
        return;
    }

    /*
     * processing SIGTERM, SIGINT or SIGQUIT
     */
    if (exiting)
    {
        ereport(DEBUG1,
                (errmsg("failover handler called while exiting")));
        kill(pcp_pid, SIGUSR2);
        return;
    }

    /*
     * processing a fail over or switch over
     */
    if (switching)
    {
        ereport(DEBUG1,
                (errmsg("failover handler called while switching")));
        kill(pcp_pid, SIGUSR2);
        return;
    }

    Req_info->switching = true;
    switching = 1;

    /*
     * Perform failover with the health check alarm disabled
     */
    if (pool_config->health_check_timeout > 0)
    {
        pool_signal(SIGALRM, SIG_IGN);
        CLEAR_ALARM;
        health_check_timer_expired = 0;
    }

    for (;;)
    {
        POOL_REQUEST_KIND reqkind;
        int queue_index;
        int node_id_set[MAX_NUM_BACKENDS];
        int node_count;
        unsigned char request_details;
        unsigned int wd_failover_id;
        WDFailoverCMDResults wdInterlockingRes;

        pool_semaphore_lock(REQUEST_INFO_SEM);

        if (Req_info->request_queue_tail == Req_info->request_queue_head)  /* request queue is empty */
        {
            switching = 0;
            Req_info->switching = false;
            pool_semaphore_unlock(REQUEST_INFO_SEM);
            break;
        }

        /* make a local copy of the request */
        Req_info->request_queue_head++;
        queue_index = Req_info->request_queue_head % MAX_REQUEST_QUEUE_SIZE;
        memcpy(node_id_set, Req_info->request[queue_index].node_id, (sizeof(int) * Req_info->request[queue_index].count));
        reqkind = Req_info->request[queue_index].kind;
        request_details = Req_info->request[queue_index].request_details;
        node_count = Req_info->request[queue_index].count;
        wd_failover_id = Req_info->request[queue_index].wd_failover_id;
        pool_semaphore_unlock(REQUEST_INFO_SEM);

        ereport(DEBUG1,
                (errmsg("failover handler"),
                 errdetail("kind: %d flags: %x node_count: %d index:%d", reqkind, request_details, node_count, queue_index)));

        if (reqkind == CLOSE_IDLE_REQUEST)
        {
            kill_all_children(SIGUSR1);
            continue;
        }

        /* start watchdog interlocking */
        wdInterlockingRes = wd_start_failover_interlocking(wd_failover_id);

        /*
         * if we are not in replication mode/master slave mode, we treat this
         * as a restart request.  otherwise we need to check if we have
         * already failed over.
         */
        ereport(DEBUG1,
                (errmsg("failover handler"),
                 errdetail("starting to select new master node")));
        node_id = node_id_set[0];

        /* failback request? */
        if (reqkind == NODE_UP_REQUEST)
        {
            if (node_id < 0 || node_id >= MAX_NUM_BACKENDS ||
                (reqkind == NODE_UP_REQUEST && !(RAW_MODE &&
                                                 BACKEND_INFO(node_id).backend_status == CON_DOWN) && VALID_BACKEND(node_id)) ||
                (reqkind == NODE_DOWN_REQUEST && !VALID_BACKEND(node_id)))
            {
                if (node_id < 0 || node_id >= MAX_NUM_BACKENDS)
                    ereport(LOG,
                            (errmsg("invalid failback request, node id: %d is invalid. node id must be between [0 and %d]", node_id, MAX_NUM_BACKENDS)));
                else
                    ereport(LOG,
                            (errmsg("invalid failback request, status: [%d] of node id : %d is invalid for failback", BACKEND_INFO(node_id).backend_status, node_id)));

                if (wdInterlockingRes == FAILOVER_RES_I_AM_LOCK_HOLDER)
                    wd_end_failover_interlocking(wd_failover_id);

                continue;
            }

            ereport(LOG,
                    (errmsg("starting fail back. reconnect host %s(%d)",
                            BACKEND_INFO(node_id).backend_hostname,
                            BACKEND_INFO(node_id).backend_port)));

            /* Check to see if all backends are down */
            all_backend_down = check_all_backend_down();

            BACKEND_INFO(node_id).backend_status = CON_CONNECT_WAIT;   /* unset down status */
            (void) write_status_file();

            /* Acquire the failback start command lock */
            if (wdInterlockingRes == FAILOVER_RES_I_AM_LOCK_HOLDER)
            {
                trigger_failover_command(node_id, pool_config->failback_command,
                                         MASTER_NODE_ID, get_next_master_node(), PRIMARY_NODE_ID);
                wd_failover_lock_release(FAILBACK_LOCK, wd_failover_id);
            }
            else
            {
                /*
                 * Okay, we are not allowed to execute the failover command,
                 * so we need to wait until the one who is executing the
                 * command finishes with it.
                 */
                wd_wait_until_command_complete_or_timeout(FAILBACK_LOCK, wd_failover_id);
            }
        }
        else if (reqkind == PROMOTE_NODE_REQUEST)
        {
            if (node_id != -1 && VALID_BACKEND(node_id))
            {
                ereport(LOG,
                        (errmsg("starting promotion. promote host %s(%d)",
                                BACKEND_INFO(node_id).backend_hostname,
                                BACKEND_INFO(node_id).backend_port)));
            }
            else
            {
                ereport(LOG,
                        (errmsg("failover: no backends are promoted")));
                if (wdInterlockingRes == FAILOVER_RES_I_AM_LOCK_HOLDER)
                    wd_end_failover_interlocking(wd_failover_id);
                continue;
            }
        }
        else    /* NODE_DOWN_REQUEST */
        {
            int cnt = 0;

            for (i = 0; i < node_count; i++)
            {
                if (node_id_set[i] != -1 &&
                    ((RAW_MODE && VALID_BACKEND_RAW(node_id_set[i])) ||
                     VALID_BACKEND(node_id_set[i])))
                {
                    ereport(LOG,
                            (errmsg("starting degeneration. shutdown host %s(%d)",
                                    BACKEND_INFO(node_id_set[i]).backend_hostname,
                                    BACKEND_INFO(node_id_set[i]).backend_port)));

                    BACKEND_INFO(node_id_set[i]).backend_status = CON_DOWN; /* set down status */
                    (void) write_status_file();

                    /* save down node */
                    nodes[node_id_set[i]] = 1;
                    cnt++;
                }
            }

            if (cnt == 0)
            {
                ereport(LOG,
                        (errmsg("failover: no backends are degenerated")));

                if (wdInterlockingRes == FAILOVER_RES_I_AM_LOCK_HOLDER)
                    wd_end_failover_interlocking(wd_failover_id);

                continue;
            }
        }
1837
1838 new_master = get_next_master_node();
1839
1840 if (new_master < 0)
1841 {
1842 ereport(LOG,
1843 (errmsg("failover: no valid backends node found")));
1844 }
1845
1846 ereport(DEBUG1, (errmsg("failover/failback request details: STREAM: %d reqkind: %d detail: %x node_id: %d",
1847 STREAM, reqkind, request_details & REQ_DETAIL_SWITCHOVER,
1848 node_id)));
1849
1850 /* On 2011/5/2 Tatsuo Ishii says: if mode is streaming replication
1851 * and request is NODE_UP_REQUEST (failback case) we don't need to
1852 * restart all children. Existing session will not use newly
1853 * attached node, but load balanced node is not changed until this
1854 * session ends, so it's harmless anyway.
1855 */
1856 /*
1857 * On 2015/9/21 Tatsuo Ishii says: this judgment is not sufficient if
1858 * all backends were down. Child process has local status in which all
1859 * backends are down. In this case even if new connection arrives from
1860 * frontend, the child will not accept it because the local status
1861 * shows all backends are down. For this purpose we refer to
1862 * "all_backend_down" variable, which was set before updating backend status.
1863 *
1864 * See bug 248 for more details.
1865 */
1866
1867 /*
1868 		 * We also need to think about the case when the former primary node did
1869 		 * not exist. In that case we need to restart all children as
1870 		 * well. For example, suppose the previous primary node id is 0 and it
1871 		 * goes down, restarts, and is re-attached without promotion. An existing
1872 		 * child process loses its connection slot to node 0 and keeps on using it
1873 		 * when node 0 comes back. This could result in a segfault later on in
1874 		 * the child process because there's no connection to node id 0.
1875 		 *
1876 		 * We also need to think about the case where the ALWAYS_PRIMARY flag is set
1877 		 * *but* the DISALLOW_TO_FAILOVER flag is not set. In that case, after a
1878 		 * primary failover Req_info->primary_node_id is set, but the connection
1879 		 * to the primary node does not exist. So we should do a full restart if the
1880 		 * requested node id is the former primary node.
1881 *
1882 * See bug 672 for more details.
1883 */
1884 if (STREAM && reqkind == NODE_UP_REQUEST && all_backend_down == false &&
1885 Req_info->primary_node_id >= 0 && Req_info->primary_node_id != node_id)
1886 {
1887 ereport(LOG,
1888 (errmsg("Do not restart children because we are failing back node id %d host: %s port: %d and we are in streaming replication mode and not all backends were down", node_id,
1889 BACKEND_INFO(node_id).backend_hostname,
1890 BACKEND_INFO(node_id).backend_port)));
1891
1892 need_to_restart_children = false;
1893 partial_restart = false;
1894 }
1895
1896 /*
1897 * If the mode is streaming replication and the request is
1898 * NODE_DOWN_REQUEST and it's actually a switch over request, we don't
1899 		 * need to restart all children, unless the node is the primary.
1900 */
1901 else if (STREAM && reqkind == NODE_DOWN_REQUEST &&
1902 request_details & REQ_DETAIL_SWITCHOVER && node_id != PRIMARY_NODE_ID)
1903 {
1904 ereport(LOG,
1905 (errmsg("Do not restart children because we are switching over node id %d host: %s port: %d and we are in streaming replication mode", node_id,
1906 BACKEND_INFO(node_id).backend_hostname,
1907 BACKEND_INFO(node_id).backend_port)));
1908
1909 need_to_restart_children = true;
1910 partial_restart = true;
1911
1912 for (i = 0; i < pool_config->num_init_children; i++)
1913 {
1914 bool restart = false;
1915
1916 for (j=0;j<pool_config->max_pool;j++)
1917 {
1918 for (k=0;k<NUM_BACKENDS;k++)
1919 {
1920 ConnectionInfo *con = pool_coninfo(i, j, k);
1921
1922 if (con->connected && con->load_balancing_node == node_id)
1923 {
1924 ereport(LOG,
1925 (errmsg("child pid %d needs to restart because pool %d uses backend %d",
1926 process_info[i].pid, j, node_id)));
1927 restart = true;
1928 break;
1929 }
1930 }
1931 }
1932
1933 if (restart)
1934 {
1935 pid_t pid = process_info[i].pid;
1936 if (pid)
1937 {
1938 kill(pid, SIGQUIT);
1939 ereport(DEBUG1,
1940 (errmsg("failover handler"),
1941 errdetail("kill process with PID:%d", pid)));
1942 }
1943 }
1944 }
1945 }
1946 else
1947 {
1948 ereport(LOG,
1949 (errmsg("Restart all children")));
1950
1951 /* kill all children */
1952 for (i = 0; i < pool_config->num_init_children; i++)
1953 {
1954 pid_t pid = process_info[i].pid;
1955 if (pid)
1956 {
1957 kill(pid, SIGQUIT);
1958 ereport(DEBUG1,
1959 (errmsg("failover handler"),
1960 errdetail("kill process with PID:%d", pid)));
1961 }
1962 }
1963
1964 need_to_restart_children = true;
1965 partial_restart = false;
1966 }
1967 if (wdInterlockingRes == FAILOVER_RES_I_AM_LOCK_HOLDER)
1968 {
1969 /* Exec failover_command if needed */
1970 for (i = 0; i < pool_config->backend_desc->num_backends; i++)
1971 {
1972 if (nodes[i])
1973 trigger_failover_command(i, pool_config->failover_command,
1974 MASTER_NODE_ID, new_master, PRIMARY_NODE_ID);
1975 }
1976 wd_failover_lock_release(FAILOVER_LOCK, wd_failover_id);
1977 }
1978 else
1979 {
1980 wd_wait_until_command_complete_or_timeout(FAILOVER_LOCK, wd_failover_id);
1981 }
1982
1983 /* no need to wait since it will be done in reap_handler */
1984 #ifdef NOT_USED
1985 while (wait(NULL) > 0)
1986 ;
1987
1988 if (errno != ECHILD)
1989 ereport(LOG,
1990 (errmsg("failover_handler: wait() failed. reason:%s", strerror(errno))));
1991
1992 #endif
1993
1994 if (reqkind == PROMOTE_NODE_REQUEST && VALID_BACKEND(node_id))
1995 new_primary = node_id;
1996
1997 /*
1998 * If the down node was a standby node in streaming replication
1999 * mode, we can avoid calling find_primary_node_repeatedly() and
2000 * recognize the former primary as the new primary node, which
2001 * will reduce the time to process standby down.
2002 */
2003 else if (MASTER_SLAVE && pool_config->master_slave_sub_mode == STREAM_MODE &&
2004 reqkind == NODE_DOWN_REQUEST)
2005 {
2006 if (Req_info->primary_node_id != node_id)
2007 new_primary = Req_info->primary_node_id;
2008 else
2009 new_primary = find_primary_node_repeatedly();
2010 }
2011 else
2012 new_primary = find_primary_node_repeatedly();
2013
2014 /*
2015 		 * If follow_master_command is provided and we are in master/slave
2016 		 * streaming replication mode, we start degenerating all backends
2017 * as they are not replicated anymore.
2018 */
2019 int follow_cnt = 0;
2020 if (MASTER_SLAVE && pool_config->master_slave_sub_mode == STREAM_MODE)
2021 {
2022 if (*pool_config->follow_master_command != '\0' ||
2023 reqkind == PROMOTE_NODE_REQUEST)
2024 {
2025 /* only if the failover is against the current primary */
2026 if (((reqkind == NODE_DOWN_REQUEST) &&
2027 Req_info->primary_node_id >= 0 &&
2028 (nodes[Req_info->primary_node_id])) ||
2029 (node_id >= 0 && (reqkind == PROMOTE_NODE_REQUEST) &&
2030 (VALID_BACKEND(node_id))))
2031 {
2032
2033 for (i = 0; i < pool_config->backend_desc->num_backends; i++)
2034 {
2035 /* do not degenerate the new primary */
2036 if ((new_primary >= 0) && (i != new_primary)) {
2037 BackendInfo *bkinfo;
2038 bkinfo = pool_get_node_info(i);
2039 ereport(LOG,
2040 (errmsg("starting follow degeneration. shutdown host %s(%d)",
2041 bkinfo->backend_hostname,
2042 bkinfo->backend_port)));
2043 bkinfo->backend_status = CON_DOWN; /* set down status */
2044 (void)write_status_file();
2045
2046 follow_cnt++;
2047 }
2048 }
2049
2050 if (follow_cnt == 0)
2051 {
2052 ereport(LOG,
2053 (errmsg("failover: no follow backends are degenerated")));
2054 }
2055 else
2056 {
2057 /* update new master node */
2058 new_master = get_next_master_node();
2059 ereport(LOG,
2060 (errmsg("failover: %d follow backends have been degenerated", follow_cnt)));
2061 }
2062 }
2063 }
2064 }
2065
2066 /*
2067 		 * The follow master command also uses the same locks used by the triggering command.
2068 */
2069 if (wdInterlockingRes == FAILOVER_RES_I_AM_LOCK_HOLDER)
2070 {
2071 if ((follow_cnt > 0) && (*pool_config->follow_master_command != '\0'))
2072 {
2073 follow_pid = fork_follow_child(Req_info->master_node_id, new_primary,
2074 Req_info->primary_node_id);
2075 }
2076 wd_failover_lock_release(FOLLOW_MASTER_LOCK, wd_failover_id);
2077 }
2078 else
2079 {
2080 wd_wait_until_command_complete_or_timeout(FOLLOW_MASTER_LOCK, wd_failover_id);
2081
2082 }
2083
2084 /* Save primary node id */
2085 Req_info->primary_node_id = new_primary;
2086 ereport(LOG,
2087 (errmsg("failover: set new primary node: %d", Req_info->primary_node_id)));
2088
2089 if (new_master >= 0)
2090 {
2091 Req_info->master_node_id = new_master;
2092 ereport(LOG,
2093 (errmsg("failover: set new master node: %d", Req_info->master_node_id)));
2094 }
2095
2096
2097 /* Kill children and restart them if needed */
2098 if (need_to_restart_children)
2099 {
2100 for (i=0;i<pool_config->num_init_children;i++)
2101 {
2102 /*
2103 			 * Try to kill the pgpool child because a previous kill signal
2104 			 * may not have been received by it. This could happen
2105 			 * if multiple PostgreSQL servers are going down (even starting
2106 			 * pgpool without starting PostgreSQL can trigger this).
2107 			 * The child calls degenerate_backend() and tries to acquire the
2108 			 * semaphore to write a failover request. In this case the
2109 			 * signal mask is set as well, thus signals are never
2110 			 * received.
2111 */
2112
2113 bool restart = false;
2114
2115 if (partial_restart)
2116 {
2117 for (j=0;j<pool_config->max_pool;j++)
2118 {
2119 for (k=0;k<NUM_BACKENDS;k++)
2120 {
2121 ConnectionInfo *con = pool_coninfo(i, j, k);
2122
2123 if (con->connected && con->load_balancing_node == node_id)
2124 {
2125
2126 ereport(LOG,
2127 (errmsg("child pid %d needs to restart because pool %d uses backend %d",
2128 process_info[i].pid, j, node_id)));
2129 restart = true;
2130 break;
2131 }
2132 }
2133 }
2134 }
2135 else
2136 restart = true;
2137
2138 if (restart)
2139 {
2140 if (process_info[i].pid)
2141 {
2142 kill(process_info[i].pid, SIGQUIT);
2143
2144 process_info[i].pid = fork_a_child(fds, i);
2145 process_info[i].start_time = time(NULL);
2146 }
2147 }
2148 else
2149 process_info[i].need_to_restart = 1;
2150 }
2151 }
2152
2153 else
2154 {
2155 /* Set restart request to each child. Children will exit(1)
2156 			 * whenever it is convenient for them.
2157 */
2158 for (i=0;i<pool_config->num_init_children;i++)
2159 {
2160 process_info[i].need_to_restart = 1;
2161 }
2162 }
2163
2164 /*
2165 * Send restart request to worker child.
2166 */
2167 kill(worker_pid, SIGUSR1);
2168
2169 if (wdInterlockingRes == FAILOVER_RES_I_AM_LOCK_HOLDER)
2170 wd_end_failover_interlocking(wd_failover_id);
2171
2172 if (reqkind == NODE_UP_REQUEST)
2173 {
2174 ereport(LOG,
2175 (errmsg("failback done. reconnect host %s(%d)",
2176 BACKEND_INFO(node_id).backend_hostname,
2177 BACKEND_INFO(node_id).backend_port)));
2178
2179 }
2180 else if (reqkind == PROMOTE_NODE_REQUEST)
2181 {
2182 ereport(LOG,
2183 (errmsg("promotion done. promoted host %s(%d)",
2184 BACKEND_INFO(node_id).backend_hostname,
2185 BACKEND_INFO(node_id).backend_port)));
2186 }
2187 else
2188 {
2189 			/* Temporary black magic. Without this, regression test 055 does not finish */
2190 fprintf(stderr, "failover done. shutdown host %s(%d)",
2191 BACKEND_INFO(node_id).backend_hostname,
2192 BACKEND_INFO(node_id).backend_port);
2193
2194 ereport(LOG,
2195 (errmsg("failover done. shutdown host %s(%d)",
2196 BACKEND_INFO(node_id).backend_hostname,
2197 BACKEND_INFO(node_id).backend_port)));
2198 }
2199 need_to_restart_pcp = true;
2200 }
2201 switching = 0;
2202 Req_info->switching = false;
2203
2204 	/* kick wakeup_handler in pcp_child so that it notices that
2205 	 * failover/failback is done
2206 */
2207 kill(pcp_pid, SIGUSR2);
2208
2209 if(need_to_restart_pcp)
2210 {
2211 sleep(1);
2212
2213 /*
2214 * Send restart request to pcp child.
2215 */
2216 kill(pcp_pid, SIGUSR1);
2217 for (;;)
2218 {
2219 sts = waitpid(pcp_pid, &status, 0);
2220 if (sts != -1)
2221 break;
2222
2223 if (errno == EINTR)
2224 continue;
2225 else
2226 {
2227 ereport(WARNING,
2228 (errmsg("failover: waitpid failed. reason: %s", strerror(errno))));
2229 continue;
2230 }
2231 }
2232 if (WIFSIGNALED(status))
2233 ereport(LOG,
2234 (errmsg("PCP child %d exits with status %d by signal %d in failover()", pcp_pid, status, WTERMSIG(status))));
2235 else
2236 ereport(LOG,
2237 (errmsg("PCP child %d exits with status %d in failover()", pcp_pid, status)));
2238
2239 pcp_pid = pcp_fork_a_child(pcp_unix_fd, pcp_inet_fd, pcp_conf_file);
2240 ereport(LOG,
2241 (errmsg("fork a new PCP child pid %d in failover()", pcp_pid)));
2242 }
2243 }
2244
2245 /*
2246 * health check timer handler
2247 */
2248 static RETSIGTYPE health_check_timer_handler(int sig)
2249 {
2250 int save_errno = errno;
2251 POOL_SETMASK(&BlockSig);
2252 health_check_timer_expired = 1;
2253 POOL_SETMASK(&UnBlockSig);
2254 errno = save_errno;
2255 }
2256
2257
2258
2259 /*
2260 * do_health_check() performs the health check on all backend nodes.
2261  * The inout parameter health_check_node_id is the starting backend
2262  * node number for the health check; when the function returns or
2263  * exits with an error, health_check_node_id contains the number of
2264  * the last backend node on which the health check was performed.
2265  *
2266  * The function returns false if all backend nodes are down and true if
2267  * all backend nodes are in a healthy state.
2268 */
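/*
 * A minimal sketch of the intended calling pattern (the caller's retry
 * and error handling are simplified for illustration; since this
 * function can exit via ereport(ERROR), the real caller wraps the call
 * in a PG_TRY block):
 *
 *   volatile int node = 0;
 *   if (!do_health_check(false, &node))
 *       ereport(LOG, (errmsg("all backend nodes are down")));
 */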
2269 static bool
2270 do_health_check(bool use_template_db, volatile int *health_check_node_id)
2271 {
2272 POOL_CONNECTION_POOL_SLOT *slot;
2273 BackendInfo *bkinfo;
2274 static char *dbname;
2275 int i;
2276 bool all_nodes_healthy = false;
2277
2278 /* Do not execute health check during recovery */
2279 if (*InRecovery)
2280 return false;
2281
2282 if (!strcmp(pool_config->health_check_database, ""))
2283 dbname = use_template_db ? "template1" : "postgres";
2284 else
2285 dbname = pool_config->health_check_database;
2286
2287 ereport(DEBUG1,
2288 (errmsg("doing health check against database:%s user:%s",
2289 dbname, pool_config->health_check_user)));
2290
2291 /*
2292 	 * Start checking the backend nodes, starting from the
2293 	 * previously failed node
2294 */
2295 for (i=*health_check_node_id;i<pool_config->backend_desc->num_backends;i++)
2296 {
2297 *health_check_node_id = i;
2298 /*
2299 		 * Make sure that the health check timer has not expired.
2300 		 * Before health_check() is called, health_check_timer_expired is
2301 		 * set to 0. However, it is possible that the timer expired
2302 		 * while processing the DB nodes.
2303 */
2304 if (health_check_timer_expired)
2305 {
2306 ereport(ERROR,
2307 				 (errmsg("health check timer has already expired before attempting to connect to backend node %d", i)));
2308 }
2309
2310 bkinfo = pool_get_node_info(i);
2311
2312 ereport(DEBUG1,
2313 (errmsg("Backend DB node %d status is %d", i, bkinfo->backend_status)));
2314
2315
2316 if (bkinfo->backend_status == CON_UNUSED ||
2317 bkinfo->backend_status == CON_DOWN)
2318 continue;
2319
2320 all_nodes_healthy = true;
2321 ereport(DEBUG1,
2322 (errmsg("Trying to make persistent DB connection to backend node %d having status %d", i, bkinfo->backend_status)));
2323
2324 slot = make_persistent_db_connection(i, bkinfo->backend_hostname,
2325 bkinfo->backend_port,
2326 dbname,
2327 pool_config->health_check_user,
2328 pool_config->health_check_password, false);
2329
2330 ereport(DEBUG1,
2331 (errmsg("persistent DB connection to backend node %d having status %d is successful", i, bkinfo->backend_status)));
2332
2333 discard_persistent_db_connection(slot);
2334 }
2335 return all_nodes_healthy;
2336 }
2337
2338 /*
2339 * handle SIGCHLD
2340 */
2341 static RETSIGTYPE reap_handler(int sig)
2342 {
2343 int save_errno = errno;
2344
2345 POOL_SETMASK(&BlockSig);
2346 sigchld_request = 1;
2347
2348 if (pipe_fds[1])
2349 {
2350 dummy_status = write(pipe_fds[1], "\0", 1);
2351 }
2352
2353 #ifdef NOT_USED
2354 if(pipe_fds[1] && write(pipe_fds[1], "\0", 1) < 0)
2355 ereport(WARNING,
2356 (errmsg("reap_handler: write to pipe failed with error \"%s\"", strerror(errno))));
2357 #endif
2358
2359 POOL_SETMASK(&UnBlockSig);
2360
2361 errno = save_errno;
2362 }
2363
2364 /*
2365 * pool_waitpid:
2366  * nothing more than a wrapper over WNOHANG-mode waitpid() or wait3(),
2367  * depending on the existence of waitpid() on the system
2368 */
2369 pid_t pool_waitpid(int *status)
2370 {
2371 #ifdef HAVE_WAITPID
2372 return waitpid(-1, status, WNOHANG);
2373 #else
2374 return wait3(status, WNOHANG, NULL);
2375 #endif
2376 }
2377
2378 /*
2379 * process_name_from_pid()
2380 * helper function for reaper() to report the terminating child process type name
2381 */
2382 static char* process_name_from_pid(pid_t pid)
2383 {
2384 if (pid == pcp_pid)
2385 return "PCP child";
2386 if (pid == worker_pid)
2387 return "worker child";
2388 if (pool_config->use_watchdog)
2389 {
2390 if (pid == watchdog_pid)
2391 return "watchdog child";
2392 else if (pid == wd_lifecheck_pid)
2393 return "watchdog lifecheck";
2394 }
2395 return "child";
2396 }
2397 /*
2398  * Reap zombie processes and restart child processes.
2399  * reaper() must be called protected from signals.
2400  * Note:
2401  * In pgpool a child can exit in two ways: either by a signal or by
2402  * calling the exit() system function.
2403  * When a child terminates due to a signal, the reaper() function
2404  * always forks a new child process of the respective type. But when a
2405  * child is terminated by an exit() call, the function checks the exit
2406  * code, and if the child exited with POOL_EXIT_FATAL we do not restart
2407  * the terminating child but shut down pgpool-II. This allows
2408  * a child process to inform the parent process of fatal failures that
2409  * need to be rectified by the user (e.g. a startup failure) for smooth
2410  * running of the system. Also a child that exits with the success
2411  * status POOL_EXIT_NO_RESTART does not get restarted.
2412 */
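/*
 * For illustration (a sketch, not code from this file): a child that
 * hits an unrecoverable startup failure can ask the whole pgpool-II to
 * go down with
 *
 *   exit(POOL_EXIT_FATAL);
 *
 * while a child that finished its work and must not be respawned exits
 * with
 *
 *   exit(POOL_EXIT_NO_RESTART);
 */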
2413 static void reaper(void)
2414 {
2415 pid_t pid;
2416 int status;
2417 int i;
2418
2419 ereport(DEBUG1,
2420 (errmsg("reaper handler")));
2421
2422 if (exiting)
2423 {
2424 ereport(DEBUG1,
2425 (errmsg("reaper handler: exited because already in exiting mode")));
2426 return;
2427 }
2428
2429 if (switching)
2430 {
2431 ereport(DEBUG1,
2432 (errmsg("reaper handler: exited due to switching")));
2433 return;
2434 }
2435
2436 /* clear SIGCHLD request */
2437 sigchld_request = 0;
2438
2439 while ((pid = pool_waitpid(&status)) > 0)
2440 {
2441 pid_t new_pid = 0;
2442 bool shutdown_system = false;
2443 bool restart_child = true;
2444 bool found = false;
2445 char *exiting_process_name = process_name_from_pid(pid);
2446
2447 /*
2448 * Check if the terminating child wants pgpool main to go down with it
2449 */
2450 if(WIFEXITED(status))
2451 {
2452 if(WEXITSTATUS(status) == POOL_EXIT_FATAL)
2453 {
2454 ereport(DEBUG1,
2455 (errmsg("%s process with pid: %d exit with FATAL ERROR. pgpool-II will be shutdown",exiting_process_name, pid)));
2456 shutdown_system = true;
2457 }
2458 else if(WEXITSTATUS(status) == POOL_EXIT_NO_RESTART)
2459 {
2460 ereport(DEBUG1,
2461 (errmsg("%s process with pid: %d exit with SUCCESS. child will not be restarted",exiting_process_name, pid)));
2462
2463 restart_child = false;
2464 }
2465 }
2466 if (WIFSIGNALED(status))
2467 {
2468 /* Child terminated by segmentation fault or sigkill. Report it */
2469 if (WTERMSIG(status) == SIGSEGV)
2470 ereport(WARNING,
2471 (errmsg("%s process with pid: %d was terminated by segmentation fault", exiting_process_name, pid)));
2472 else if (WTERMSIG(status) == SIGKILL)
2473 ereport(WARNING,
2474 (errmsg("%s process with pid: %d was terminated by sigkill", exiting_process_name, pid)));
2475 else
2476 ereport(LOG,
2477 (errmsg("%s process with pid: %d exits with status %d by signal %d", exiting_process_name, pid, status, WTERMSIG(status))));
2478 			/* If the watchdog process was terminated abnormally,
2479 * we need to set the cleanup flag so that the new watchdog process
2480 * can start without problems
2481 */
2482 if (pool_config->use_watchdog && pid == watchdog_pid)
2483 {
2484 set_watchdog_process_needs_cleanup();
2485 }
2486 }
2487 else
2488 ereport(LOG,
2489 (errmsg("%s process with pid: %d exits with status %d", exiting_process_name,pid, status)));
2490
2491 /* if exiting child process was PCP handler */
2492 if (pid == pcp_pid)
2493 {
2494 found = true;
2495 if(restart_child)
2496 {
2497 pcp_pid = pcp_fork_a_child(pcp_unix_fd, pcp_inet_fd, pcp_conf_file);
2498 new_pid = pcp_pid;
2499 }
2500 else
2501 pcp_pid = 0;
2502 }
2503
2504 /* exiting process was worker process */
2505 else if (pid == worker_pid)
2506 {
2507 found = true;
2508 if(restart_child)
2509 {
2510 worker_pid = worker_fork_a_child();
2511 new_pid = worker_pid;
2512 }
2513 else
2514 worker_pid = 0;
2515 }
2516
2517 /* exiting process was watchdog process */
2518 else if (pool_config->use_watchdog)
2519 {
2520 if (watchdog_pid == pid)
2521 {
2522 found = true;
2523 if(restart_child)
2524 {
2525 watchdog_pid = initialize_watchdog();
2526 new_pid = watchdog_pid;
2527 }
2528 else
2529 watchdog_pid = 0;
2530 }
2531 else if (wd_lifecheck_pid == pid)
2532 {
2533 found = true;
2534 if(restart_child)
2535 {
2536 wd_lifecheck_pid = initialize_watchdog_lifecheck();
2537 new_pid = wd_lifecheck_pid;
2538 }
2539 else
2540 wd_lifecheck_pid = 0;
2541 }
2542 }
2543 		/* We are not able to identify the exiting process yet.
2544 		 * Check if the exiting process was a child process (handling PG clients).
2545 */
2546 if (found == false)
2547 {
2548 /* look for exiting child's pid */
2549 for (i=0;i<pool_config->num_init_children;i++)
2550 {
2551 if (pid == process_info[i].pid)
2552 {
2553 found = true;
2554 /* if found, fork a new child */
2555 if (!switching && !exiting && restart_child)
2556 {
2557 process_info[i].pid = fork_a_child(fds, i);
2558 process_info[i].start_time = time(NULL);
2559 new_pid = process_info[i].pid;
2560 }
2561 else
2562 process_info[i].pid = 0;
2563 break;
2564 }
2565 }
2566 }
2567 if(shutdown_system)
2568 ereport(FATAL,
2569 (errmsg("%s process exit with fatal error. exiting pgpool-II",exiting_process_name)));
2570
2571 else if(restart_child && new_pid)
2572 {
2573 /* Report if the child was restarted */
2574 ereport(LOG,
2575 (errmsg("fork a new %s process with pid: %d",exiting_process_name, new_pid)));
2576 }
2577 else
2578 {
2579 /* And the child was not restarted */
2580 ereport(LOG,
2581 (errmsg("%s process with pid: %d exited with success and will not be restarted", exiting_process_name, pid)));
2582 }
2583
2584 }
2585 ereport(DEBUG1,
2586 (errmsg("reaper handler: exiting normally")));
2587 }
2588
2589 /*
2590 * get node information specified by node_number
2591 */
2592 BackendInfo *
2593 pool_get_node_info(int node_number)
2594 {
2595 if (node_number < 0 || node_number >= NUM_BACKENDS)
2596 return NULL;
2597
2598 return &BACKEND_INFO(node_number);
2599 }
2600
2601 /*
2602 * get number of nodes
2603 */
2604 int
2605 pool_get_node_count(void)
2606 {
2607 return NUM_BACKENDS;
2608 }
2609
2610 /*
2611 * get process ids
2612 */
2613 int *
2614 pool_get_process_list(int *array_size)
2615 {
2616 int *array;
2617 int i;
2618
2619 *array_size = pool_config->num_init_children;
2620 array = palloc0(*array_size * sizeof(int));
2621 for (i = 0; i < *array_size; i++)
2622 array[i] = process_info[i].pid;
2623
2624 return array;
2625 }
2626
2627 /*
2628 * get process information specified by pid
2629 */
2630 ProcessInfo *
2631 pool_get_process_info(pid_t pid)
2632 {
2633 int i;
2634
2635 for (i = 0; i < pool_config->num_init_children; i++)
2636 if (process_info[i].pid == pid)
2637 return &process_info[i];
2638
2639 return NULL;
2640 }
2641
2642 /*
2643 * handle SIGUSR2
2644 * Wakeup all processes
2645 */
2646 static void wakeup_children(void)
2647 {
2648 kill_all_children(SIGUSR2);
2649 }
2650
2651
2652 static RETSIGTYPE wakeup_handler(int sig)
2653 {
2654 int save_errno = errno;
2655
2656 wakeup_request = 1;
2657 if (processState != INITIALIZING)
2658 {
2659 POOL_SETMASK(&BlockSig);
2660 dummy_status = write(pipe_fds[1], "\0", 1);
2661 #ifdef NOT_USED
2662 if(write(pipe_fds[1], "\0", 1) < 0)
2663 ereport(WARNING,
2664 (errmsg("wakeup_handler: write to pipe failed with error \"%s\"", strerror(errno))));
2665 #endif
2666 POOL_SETMASK(&UnBlockSig);
2667 }
2668 errno = save_errno;
2669 }
2670
2671 /*
2672 * handle SIGHUP
2673 *
2674 */
2675 static RETSIGTYPE reload_config_handler(int sig)
2676 {
2677 int save_errno = errno;
2678
2679 POOL_SETMASK(&BlockSig);
2680 reload_config_request = 1;
2681 dummy_status = write(pipe_fds[1], "\0", 1);
2682 #ifdef NOT_USED
2683 if(write(pipe_fds[1], "\0", 1) < 0)
2684 ereport(WARNING,
2685 (errmsg("reload_config_handler: write to pipe failed with error \"%s\"", strerror(errno))));
2686 #endif
2687
2688 POOL_SETMASK(&UnBlockSig);
2689
2690 errno = save_errno;
2691 }
2692
2693 static void kill_all_children(int sig)
2694 {
2695 int i;
2696 if(process_info)
2697 {
2698 /* kill all children */
2699 for (i = 0; i < pool_config->num_init_children; i++)
2700 {
2701 pid_t pid = process_info[i].pid;
2702 if (pid)
2703 {
2704 kill(pid, sig);
2705 }
2706 }
2707 }
2708 /* make PCP process reload as well */
2709 if (sig == SIGHUP && pcp_pid > 0)
2710 kill(pcp_pid, sig);
2711 }
2712
2713 /*
2714  * Pause for a period specified by timeout. If any data comes
2715  * through pipe_fds[0], that means one of: a failover request (SIGUSR1),
2716  * SIGCHLD received, a children wakeup request (SIGUSR2, used in online
2717  * recovery processing) or a config file reload request (SIGHUP) has
2718  * occurred. In this case this function returns 1,
2719  * otherwise 0 (no signal event occurred) or -1 (error).
2720 * XXX: is it OK that select(2) error is ignored here?
2721 */
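/*
 * Typical use, as in pool_sleep() below (a sketch): wait up to one
 * second for a signal event and process it if one arrived.
 *
 *   struct timeval tv = {1, 0};
 *   if (pool_pause(&tv) > 0)
 *       CHECK_REQUEST;
 */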
2722 static int pool_pause(struct timeval *timeout)
2723 {
2724 fd_set rfds;
2725 int n;
2726 char dummy;
2727
2728 FD_ZERO(&rfds);
2729 FD_SET(pipe_fds[0], &rfds);
2730 n = select(pipe_fds[0]+1, &rfds, NULL, NULL, timeout);
2731 if (n == 1)
2732 {
2733 if(read(pipe_fds[0], &dummy, 1) < 0)
2734 ereport(WARNING,
2735 (errmsg("pool_pause: read on pipe failed with error \"%s\"", strerror(errno))));
2736 }
2737 return n;
2738 }
2739
2740 /*
2741  * Sleep for the number of seconds specified by "second". Unlike
2742  * pool_pause(), this function guarantees that it will sleep for the
2743  * specified seconds. It uses pool_pause() internally. If pool_pause()
2744  * reports a pending signal event, it is processed using the
2745  * CHECK_REQUEST macro. Note that most of this processing is done
2746  * while all signals are blocked.
2747 */
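/*
 * For example (an assumption about the caller, not shown in this
 * section): the health check loop sleeps between rounds with
 *
 *   pool_sleep(pool_config->health_check_period);
 *
 * and any failover, reload or wakeup signal arriving meanwhile is
 * still handled via CHECK_REQUEST.
 */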
2748 void pool_sleep(unsigned int second)
2749 {
2750 struct timeval current_time, sleep_time;
2751
2752 	gettimeofday(&current_time, NULL);
2753 sleep_time.tv_sec = second + current_time.tv_sec;
2754 sleep_time.tv_usec = current_time.tv_usec;
2755
2756 POOL_SETMASK(&UnBlockSig);
2757 while (sleep_time.tv_sec > current_time.tv_sec)
2758 {
2759 struct timeval timeout;
2760 int r;
2761
2762 timeout.tv_sec = sleep_time.tv_sec - current_time.tv_sec;
2763 timeout.tv_usec = sleep_time.tv_usec - current_time.tv_usec;
2764 if (timeout.tv_usec < 0)
2765 {
2766 timeout.tv_sec--;
2767 timeout.tv_usec += 1000000;
2768 }
2769
2770 r = pool_pause(&timeout);
2771 POOL_SETMASK(&BlockSig);
2772 if (r > 0)
2773 CHECK_REQUEST;
2774 POOL_SETMASK(&UnBlockSig);
2775 		gettimeofday(&current_time, NULL);
2776 }
2777 POOL_SETMASK(&BlockSig);
2778 }
2779
2780 /*
2781 * trigger_failover_command: execute specified command at failover.
2782 * command_line is null-terminated string.
2783 */
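/*
 * For illustration, a failover_command setting in pgpool.conf using the
 * %-escapes handled below might look like this (the script path is a
 * hypothetical example, not part of this file):
 *
 *   failover_command = '/etc/pgpool-II/failover.sh %d %h %p %D %m %H %M %P'
 *
 * where %d/%h/%p/%D describe the failed node, %m/%H the new master node
 * id and host name, and %M/%P the old master and old primary node ids.
 */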
2784 static int trigger_failover_command(int node, const char *command_line,
2785 int old_master, int new_master, int old_primary)
2786 {
2787 int r = 0;
2788 String *exec_cmd;
2789 char port_buf[6];
2790 char buf[2];
2791 BackendInfo *info;
2792 BackendInfo *newmaster;
2793 if (command_line == NULL || (strlen(command_line) == 0))
2794 return 0;
2795
2796 /* check failed nodeID */
2797 if (node < 0 || node >= NUM_BACKENDS)
2798 return -1;
2799
2800 info = pool_get_node_info(node);
2801 if (!info)
2802 return -1;
2803
2804 buf[1] = '\0';
2805 exec_cmd = init_string("");
2806
2807 while (*command_line)
2808 {
2809 if (*command_line == '%')
2810 {
2811 if (*(command_line + 1))
2812 {
2813 char val = *(command_line + 1);
2814 switch (val)
2815 {
2816 case 'p': /* failed node port */
2817 snprintf(port_buf, sizeof(port_buf), "%d", info->backend_port);
2818 string_append_char(exec_cmd, port_buf);
2819 break;
2820
2821 case 'D': /* failed node database directory */
2822 string_append_char(exec_cmd, info->backend_data_directory);
2823 break;
2824
2825 case 'd': /* failed node id */
2826 snprintf(port_buf, sizeof(port_buf), "%d", node);
2827 string_append_char(exec_cmd, port_buf);
2828 break;
2829
2830 case 'h': /* failed host name */
2831 string_append_char(exec_cmd, info->backend_hostname);
2832 break;
2833
2834 case 'H': /* new master host name */
2835 newmaster = pool_get_node_info(new_master);
2836 if (newmaster)
2837 string_append_char(exec_cmd, newmaster->backend_hostname);
2838 else
2839 /* no valid new master */
2840 string_append_char(exec_cmd, "\"\"");
2841 break;
2842
2843 case 'm': /* new master node id */
2844 snprintf(port_buf, sizeof(port_buf), "%d", new_master);
2845 string_append_char(exec_cmd, port_buf);
2846 break;
2847
2848 case 'r': /* new master port */
2849 newmaster = pool_get_node_info(get_next_master_node());
2850 if (newmaster)
2851 {
2852 snprintf(port_buf, sizeof(port_buf), "%d", newmaster->backend_port);
2853 string_append_char(exec_cmd, port_buf);
2854 }
2855 else
2856 /* no valid new master */
2857 string_append_char(exec_cmd, "\"\"");
2858 break;
2859
2860 case 'R': /* new master database directory */
2861 newmaster = pool_get_node_info(get_next_master_node());
2862 if (newmaster)
2863 string_append_char(exec_cmd, newmaster->backend_data_directory);
2864 else
2865 /* no valid new master */
2866 string_append_char(exec_cmd, "\"\"");
2867 break;
2868
2869 case 'M': /* old master node id */
2870 snprintf(port_buf, sizeof(port_buf), "%d", old_master);
2871 string_append_char(exec_cmd, port_buf);
2872 break;
2873
2874 case 'P': /* old primary node id */
2875 snprintf(port_buf, sizeof(port_buf), "%d", old_primary);
2876 string_append_char(exec_cmd, port_buf);
2877 break;
2878
2879 case '%': /* escape */
2880 string_append_char(exec_cmd, "%");
2881 break;
2882
2883 default: /* ignore */
2884 break;
2885 }
2886 command_line++;
2887 }
2888 } else {
2889 buf[0] = *command_line;
2890 string_append_char(exec_cmd, buf);
2891 }
2892 command_line++;
2893 }
2894
2895 if (strlen(exec_cmd->data) != 0)
2896 {
2897 ereport(LOG,
2898 (errmsg("execute command: %s", exec_cmd->data)));
2899 r = system(exec_cmd->data);
2900 }
2901
2902 free_string(exec_cmd);
2903
2904 return r;
2905 }
2906 /*
2907  * This function is used by the find_primary_node() function and is just a
2908  * wrapper over make_persistent_db_connection(); it returns a boolean value
2909  * to report the connection status.
2910 * This function must not throw ereport.
2911 */
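/*
 * The check itself boils down to running
 *
 *   SELECT pg_is_in_recovery()
 *
 * on the backend: a result of "t" means the node is a standby,
 * anything else makes it a primary candidate.
 */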
2912 static bool
2913 verify_backend_node_status(int backend_no, bool* is_standby)
2914 {
2915 POOL_CONNECTION_POOL_SLOT *s = NULL;
2916 POOL_CONNECTION *con;
2917 POOL_SELECT_RESULT *res;
2918 BackendInfo *bkinfo = pool_get_node_info(backend_no);
2919
2920 *is_standby = false;
2921
2922 s = make_persistent_db_connection_noerror(backend_no, bkinfo->backend_hostname,
2923 bkinfo->backend_port,
2924 pool_config->sr_check_database,
2925 pool_config->sr_check_user,
2926 pool_config->sr_check_password, true);
2927 if (s)
2928 {
2929 MemoryContext oldContext = CurrentMemoryContext;
2930 con = s->con;
2931
2932 PG_TRY();
2933 {
2934 do_query(con, "SELECT pg_is_in_recovery()",
2935 &res, PROTO_MAJOR_V3);
2936 }
2937 PG_CATCH();
2938 {
2939 /* ignore the error message */
2940 res = NULL;
2941 MemoryContextSwitchTo(oldContext);
2942 FlushErrorState();
2943 ereport(LOG,
2944 (errmsg("verify_backend_node_status: do_query failed")));
2945 }
2946 PG_END_TRY();
2947 if(res)
2948 {
2949 if (res->numrows <= 0)
2950 {
2951 ereport(LOG,
2952 (errmsg("verify_backend_node_status: do_query returns no rows")));
2953 }
2954 if (res->data[0] == NULL)
2955 {
2956 ereport(LOG,
2957 (errmsg("verify_backend_node_status: do_query returns no data")));
2958 }
2959 if (res->nullflags[0] == -1)
2960 {
2961 ereport(LOG,
2962 (errmsg("verify_backend_node_status: do_query returns NULL")));
2963 }
2964 if (res->data[0] && !strcmp(res->data[0], "t"))
2965 {
2966 *is_standby = true;
2967 }
2968 free_select_result(res);
2969 }
2970 discard_persistent_db_connection(s);
2971 return true;
2972 }
2973 return false;
2974 }
2975
2976 /*
2977 * Find the primary node (i.e. not standby node) and returns its node
2978 * id. If no primary node is found, returns -1.
2979 */
2980 static int find_primary_node(void)
2981 {
2982 int i;
2983
2984 /* Streaming replication mode? */
2985 if (!STREAM)
2986 {
2987 		/* There is no point in looking for a primary node if we are not
2988 		 * in streaming replication mode.
2989 */
2990 ereport(DEBUG1,
2991 (errmsg("find_primary_node: not in streaming replication mode")));
2992 return -1;
2993 }
2994
2995 for(i=0;i<NUM_BACKENDS;i++)
2996 {
2997 bool node_status;
2998 bool is_standby;
2999
3000 ereport(LOG,
3001 (errmsg("find_primary_node: checking backend no %d",i)));
3002
3003 if (!VALID_BACKEND(i))
3004 continue;
3005 node_status = verify_backend_node_status(i,&is_standby);
3006 if (!node_status)
3007 {
3008 /*
3009 * It is possible that a node is down even if
3010 * VALID_BACKEND tells it's valid. This could happen
3011 * before health checking detects the failure.
3012 * Thus we should continue to look for primary node.
3013 */
3014 continue;
3015 }
3016 if (is_standby)
3017 ereport(DEBUG1,
3018 (errmsg("find_primary_node: %d node is standby", i)));
3019 else
3020 break;
3021 }
3022
3023 if (i == NUM_BACKENDS)
3024 {
3025 ereport(DEBUG1,
3026 (errmsg("find_primary_node: no primary node found")));
3027 return -1;
3028 }
3029
3030 ereport(LOG,
3031 (errmsg("find_primary_node: primary node id is %d", i)));
3032 return i;
3033 }
3034
3035 static int find_primary_node_repeatedly(void)
3036 {
3037 int sec;
3038 int node_id = -1;
3039 int i;
3040
3041 /* Streaming replication mode? */
3042 if (pool_config->master_slave_mode == false ||
3043 pool_config->master_slave_sub_mode != STREAM_MODE)
3044 {
3045 		/* There is no point in looking for a primary node if we are not
3046 		 * in streaming replication mode.
3047 */
3048 ereport(DEBUG1,
3049 (errmsg("find_primary_node: not in streaming replication mode")));
3050 return -1;
3051 }
3052
3053 /*
3054 	 * If all of the backends are down, there's no point in continuing to
3055 	 * search for the primary node.
3056 */
3057 for (i = 0; i < NUM_BACKENDS; i++)
3058 {
3059 if (VALID_BACKEND(i))
3060 break;
3061 }
3062 if (i == NUM_BACKENDS)
3063 {
3064 ereport(LOG,
3065 (errmsg("find_primary_node_repeatedly: all of the backends are down. Giving up finding primary node")));
3066 return -1;
3067 }
3068
3069 /*
3070 * Try to find the new primary node and keep trying for
3071 * search_primary_node_timeout seconds.
3072 	 * search_primary_node_timeout = 0 means never time out and keep
3073 	 * searching indefinitely.
3074 */
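	/*
	 * For example, with the pgpool.conf setting
	 *
	 *   search_primary_node_timeout = 300
	 *
	 * the loop below gives up after roughly 300 one-second probes.
	 */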
3075 ereport(LOG,
3076 (errmsg("find_primary_node_repeatedly: waiting for finding a primary node")));
3077 for (sec = 0; (pool_config->search_primary_node_timeout == 0 ||
3078 sec < pool_config->search_primary_node_timeout); sec++)
3079 {
3080 node_id = find_primary_node();
3081 if (node_id != -1)
3082 break;
3083 pool_sleep(1);
3084 }
3085 return node_id;
3086 }
3087
3088 /*
3089 * fork a follow child
3090 */
3091 pid_t fork_follow_child(int old_master, int new_primary, int old_primary)
3092 {
3093 pid_t pid;
3094 int i;
3095
3096 pid = fork();
3097
3098 if (pid == 0)
3099 {
3100 on_exit_reset();
3101 processType = PT_FOLLOWCHILD;
3102
3103 ereport(LOG,
3104 (errmsg("start triggering follow command.")));
3105 for (i = 0; i < pool_config->backend_desc->num_backends; i++)
3106 {
3107 BackendInfo *bkinfo;
3108 bkinfo = pool_get_node_info(i);
3109 if (bkinfo->backend_status == CON_DOWN)
3110 trigger_failover_command(i, pool_config->follow_master_command,
3111 old_master, new_primary, old_primary);
3112 }
3113 exit(0);
3114 }
3115 else if (pid == -1)
3116 {
3117 ereport(WARNING,
3118 (errmsg("follow fork() failed with reason: \"%s\"", strerror(errno))));
3119 exit(1);
3120 }
3121 return pid;
3122 }
3123
3124
3125 static void initialize_shared_mem_objects(bool clear_memcache_oidmaps)
3126 {
3127 size_t size;
3128 int i;
3129
3130 /*
3131 	 * con_info is a three-dimensional array: i corresponds to a pgpool child
3132 	 * process, j corresponds to a connection pool in each process and k
3133 	 * corresponds to a backend in each connection pool.
3134 	 *
3135 	 * XXX: Before 2010/4/12 this was a two-dimensional array: i
3136 	 * corresponded to a pgpool child process, j to a
3137 	 * connection pool in each process. Of course this was wrong.
3138 */
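	/*
	 * A sketch of the indexing: the ConnectionInfo for child i, pool j
	 * and backend k is fetched as
	 *
	 *   ConnectionInfo *con = pool_coninfo(i, j, k);
	 *
	 * which is how the failover code elsewhere in this file decides
	 * which children need a restart.
	 */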
3139 size = pool_coninfo_size();
3140 con_info = pool_shared_memory_create(size);
3141 memset(con_info, 0, size);
3142
3143 size = pool_config->num_init_children * (sizeof(ProcessInfo));
3144
3145 ereport(DEBUG1,
3146 			(errmsg("ProcessInfo: num_init_children (%d) * sizeof(ProcessInfo) (%zu) = %zu bytes requested for shared memory",
3147 pool_config->num_init_children,
3148 sizeof(ProcessInfo),
3149 size)));
3150
3151 process_info = pool_shared_memory_create(size);
3152 memset(process_info, 0, size);
3153
3154 for (i = 0; i < pool_config->num_init_children; i++)
3155 {
3156 process_info[i].connection_info = pool_coninfo(i,0,0);
3157 }
3158
3159 user1SignalSlot = pool_shared_memory_create(sizeof(User1SignalSlot));
3160 	/* create failover/switchover event area */
3161 Req_info = pool_shared_memory_create(sizeof(POOL_REQUEST_INFO));
3162
3163 ereport(DEBUG1,
3164 			(errmsg("Request info area: sizeof(POOL_REQUEST_INFO) %zu bytes requested for shared memory",
3165 sizeof(POOL_REQUEST_INFO))));
3166 /*
3167 * Initialize backend status area.
3168 * From now on, VALID_BACKEND macro can be used.
3169 * (get_next_master_node() uses VALID_BACKEND)
3170 */
3171
3172 for (i=0;i<MAX_NUM_BACKENDS;i++)
3173 {
3174 my_backend_status[i] = &(BACKEND_INFO(i).backend_status);
3175 }
3176
3177 /* initialize Req_info */
3178 Req_info->master_node_id = get_next_master_node();
3179 Req_info->conn_counter = 0;
3180 Req_info->switching = false;
3181 Req_info->request_queue_head = Req_info->request_queue_tail = -1;
3182 Req_info->primary_node_id = -2;
3183 InRecovery = pool_shared_memory_create(sizeof(int));
3184 *InRecovery = RECOVERY_INIT;
3185
3186 ereport(DEBUG1,
3187 (errmsg("Recovery management area: sizeof(int) %zu bytes requested for shared memory",
3188 sizeof(int))));
3189
3190 /*
3191 * Initialize shared memory cache
3192 */
3193 if (pool_config->memory_cache_enabled)
3194 {
3195 if (pool_is_shmem_cache())
3196 {
3197 size_t size;
3198
3199 size = pool_shared_memory_cache_size();
3200 pool_init_memory_cache(size);
3201
3202 size = pool_shared_memory_fsmm_size();
3203 if (size == 0)
3204 ereport(FATAL,
3205 (errmsg("invalid shared memory size"),
3206 errdetail("pool_shared_memory_fsmm_size error")));
3207
3208 pool_init_fsmm(size);
3209
3210 pool_allocate_fsmm_clock_hand();
3211
3212 pool_discard_oid_maps();
3213
3214 ereport(LOG,
3215 (errmsg("pool_discard_oid_maps: discarded memqcache oid maps")));
3216
3217 pool_hash_init(pool_config->memqcache_max_num_cache);
3218 }
3219
3220 #ifdef USE_MEMCACHED
3221 else
3222 {
3223 if (clear_memcache_oidmaps)
3224 {
3225 pool_discard_oid_maps();
3226 ereport(LOG,
3227 (errmsg("discarded memqcache oid maps")));
3228 }
3229 else
3230 {
3231 ereport(DEBUG1,
3232 (errmsg("skipped discarding memqcache oid maps")));
3233 }
3234 }
3235 #endif
3236
3237 pool_init_memqcache_stats();
3238 }
3239
3240 /* Initialize statistics area */
3241 stat_set_stat_area(pool_shared_memory_create(stat_shared_memory_size()));
3242 stat_init_stat_area();
3243 /* initialize watchdog IPC unix domain socket address */
3244 if (pool_config->use_watchdog)
3245 {
3246 wd_ipc_initialize_data();
3247 }
3248 }
3249
3250 /*
3251 * Read the status file
3252 */
3253 static int read_status_file(bool discard_status)
3254 {
3255 FILE *fd;
3256 char fnamebuf[POOLMAXPATHLEN];
3257 int i;
3258 bool someone_wakeup = false;
3259 bool is_old_format;
3260
3261 snprintf(fnamebuf, sizeof(fnamebuf), "%s/%s", pool_config->logdir, STATUS_FILE_NAME);
3262 fd = fopen(fnamebuf, "r");
3263 if (!fd)
3264 {
3265 ereport(LOG,
3266 (errmsg("Backend status file %s does not exist", fnamebuf)));
3267 return -1;
3268 }
3269
3270 /*
3271 * If discard_status is true, unlink pgpool_status and
3272 * do not restore previous status.
3273 */
3274 if (discard_status)
3275 {
3276 fclose(fd);
3277 if (unlink(fnamebuf) == 0)
3278 ereport(LOG,
3279 (errmsg("Backend status file %s discarded", fnamebuf)));
3280 else
3281 ereport(WARNING,
3282 (errmsg("failed to discard backend status file: \"%s\" with reason: \"%s\"", fnamebuf, strerror(errno))));
3283 return 0;
3284 }
3285
3286 /*
3287 	 * First, try it as an old format file.
3288 */
3289 is_old_format = true;
3290
3291 if (fread(&backend_rec, 1, sizeof(backend_rec), fd) == sizeof(backend_rec))
3292 {
3293 /* It's likely old binary format status file */
3294 for (i=0;i< pool_config->backend_desc->num_backends;i++)
3295 {
3296 if (backend_rec.status[i] == CON_DOWN)
3297 {
3298 BACKEND_INFO(i).backend_status = CON_DOWN;
3299 (void)write_status_file();
3300 ereport(LOG,
3301 (errmsg("read_status_file: %d th backend is set to down status", i)));
3302 }
3303 else if (backend_rec.status[i] == CON_CONNECT_WAIT ||
3304 backend_rec.status[i] == CON_UP)
3305 {
3306 BACKEND_INFO(i).backend_status = CON_CONNECT_WAIT;
3307 (void)write_status_file();
3308 someone_wakeup = true;
3309 }
3310 else
3311 {
3312 /* It seems it's not an old binary format status file */
3313 is_old_format = false;
3314 break;
3315 }
3316 }
3317 }
3318 else
3319 is_old_format = false;
3320
3321 fclose(fd);
3322
3323 if (!is_old_format)
3324 {
3325 /*
3326 		 * Fall back to the new ASCII format file.
3327 		 * The format looks like (case is ignored):
3328 *
3329 * up|down|unused
3330 * UP|down|unused
3331 * :
3332 * :
3333 */
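		/*
		 * For example, a three-node cluster whose node 1 is currently
		 * detached would be stored as:
		 *
		 *   up
		 *   down
		 *   up
		 */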
3334 #define MAXLINE 10
3335 char readbuf[MAXLINE];
3336
3337 fd = fopen(fnamebuf, "r");
3338 if (!fd)
3339 {
3340 ereport(LOG,
3341 (errmsg("Backend status file %s does not exist", fnamebuf)));
3342 return -1;
3343 }
3344
3345 for (i=0;i<MAX_NUM_BACKENDS;i++)
3346 {
3347 BACKEND_INFO(i).backend_status = CON_UNUSED;
3348 }
3349
3350 for (i=0;;i++)
3351 {
3352 readbuf[MAXLINE-1] = '\0';
3353 if (fgets(readbuf, MAXLINE-1, fd) == 0)
3354 break;
3355
3356 if (!strncasecmp("up", readbuf, 2))
3357 {
3358 BACKEND_INFO(i).backend_status = CON_UP;
3359 someone_wakeup = true;
3360 }
3361 else if (!strncasecmp("down", readbuf, 4))
3362 {
3363 BACKEND_INFO(i).backend_status = CON_DOWN;
3364 ereport(LOG,
3365 (errmsg("reading status file: %d th backend is set to down status", i)));
3366 }
3367 else if (!strncasecmp("unused", readbuf, 6))
3368 {
3369 BACKEND_INFO(i).backend_status = CON_UNUSED;
3370 }
3371 else
3372 {
3373 ereport(WARNING,
3374 (errmsg("invalid data in status file, ignoring..."),
3375 errdetail("backend:%d status is invalid: \"%s\"",i,readbuf)));
3376 }
3377 }
3378 fclose(fd);
3379 }
3380
3381 /*
3382 	 * If no node woke up, we regard the status file as bogus.
3383 */
3384 if (someone_wakeup == false)
3385 {
3386 for (i=0;i< pool_config->backend_desc->num_backends;i++)
3387 {
3388 BACKEND_INFO(i).backend_status = CON_CONNECT_WAIT;
3389 }
3390 (void)write_status_file();
3391 }
3392
3393 return 0;
3394 }
3395
3396 /*
3397 * Write the status file
3398 */
3399 int write_status_file(void)
3400 {
3401 FILE *fd;
3402 int fdnum;
3403 char fnamebuf[POOLMAXPATHLEN];
3404 char buf[10];
3405 int i;
3406
3407 if (!pool_config)
3408 {
3409 ereport(WARNING,
3410 (errmsg("pool_config is not set")));
3411 return 0;
3412 }
3413
3414 	/* Check to see if all nodes are in down status.
3415 	 * If so, skip writing the status file.
3416 	 * This way pgpool_status will
3417 * always reflect the last set of nodes to which any data was written.
3418 * Upon restart, if the up-to-date (previously "up") node is in fact down
3419 * (regardless of whether the stale ("down") node is back up), pgpool
3420 * will detect this in its health check and will fail; if the up-to-date
3421 * (previously "up") node is back up, then pgpool will commence using it.
3422 *
3423 * See [pgpool-general: 4721] for more discussion.
3424 */
3425 for (i=0;i< pool_config->backend_desc->num_backends;i++)
3426 {
3427 if (BACKEND_INFO(i).backend_status != CON_DOWN)
3428 break;
3429 }
3430
3431 if (i && i == pool_config->backend_desc->num_backends)
3432 {
3433 ereport(WARNING,
3434 				(errmsg("All the DB nodes are in down status; skipping writing the status file.")));
3435
3436 return 0;
3437 }
3438
3439 snprintf(fnamebuf, sizeof(fnamebuf), "%s/%s", pool_config->logdir, STATUS_FILE_NAME);
3440 fd = fopen(fnamebuf, "w");
3441 if (!fd)
3442 {
3443 ereport(WARNING,
3444 (errmsg("failed to open status file at: \"%s\"",fnamebuf),
3445 errdetail("\"%s\"",strerror(errno))));
3446 return -1;
3447 }
3448
3449 for (i=0;i< pool_config->backend_desc->num_backends;i++)
3450 {
3451 char* status;
3452
3453 if (BACKEND_INFO(i).backend_status == CON_UP ||
3454 BACKEND_INFO(i).backend_status == CON_CONNECT_WAIT)
3455 status = "up";
3456 else if (BACKEND_INFO(i).backend_status == CON_DOWN)
3457 status = "down";
3458 else
3459 status = "unused";
3460
3461 sprintf(buf, "%s\n", status);
3462 if (fwrite(buf, 1, strlen(buf), fd) != strlen(buf))
3463 {
3464 ereport(WARNING,
3465 (errmsg("failed to write status file at: \"%s\"",fnamebuf),
3466 errdetail("\"%s\"",strerror(errno))));
3467 fclose(fd);
3468 return -1;
3469 }
3470 }
3471
3472 if (fflush(fd) != 0)
3473 {
3474 ereport(WARNING,
3475 (errmsg("failed to write status file at: \"%s\"",fnamebuf),
3476 errdetail("\"%s\"",strerror(errno))));
3477 fclose(fd);
3478 return -1;
3479 }
3480 fdnum = fileno(fd);
3481 if (fdnum < 0)
3482 {
3483 ereport(WARNING,
3484 (errmsg("failed to get file number. fsync() will not be performed: \"%s\"",fnamebuf),
3485 errdetail("\"%s\"",strerror(errno))));
3486 fclose(fd);
3487 return -1;
3488 }
3489 if (fsync(fdnum) != 0)
3490 {
3491 ereport(WARNING,
3492 (errmsg("failed to fsync(): \"%s\"",fnamebuf),
3493 errdetail("\"%s\"",strerror(errno))));
3494 fclose(fd);
3495 return -1;
3496 }
3497
3498 fclose(fd);
3499
3500 return 0;
3501 }
3502
3503 static void reload_config(void)
3504 {
3505 ereport(LOG,
3506 (errmsg("reload config files.")));
3507 MemoryContext oldContext = MemoryContextSwitchTo(TopMemoryContext);
3508 pool_get_config(conf_file, CFGCXT_RELOAD);
3509
3510 	/* Reloading the config file could change the backend status */
3511 (void)write_status_file();
3512
3513 MemoryContextSwitchTo(oldContext);
3514 if (pool_config->enable_pool_hba)
3515 load_hba(hba_file);
3516 kill_all_children(SIGHUP);
3517
3518 if (worker_pid)
3519 kill(worker_pid, SIGHUP);
3520 }
3521
3522 /* Call back function to unlink the file */
3523 static void FileUnlink(int code, Datum path)
3524 {
3525 char* filePath = (char*)path;
3526 if (unlink(filePath) == 0) return;
3527 /*
3528 	 * We are already exiting the system; just produce a log entry to report the error.
3529 */
3530 ereport(LOG,
3531 (errmsg("unlink failed for file at path \"%s\"", filePath),
3532 errdetail("%s", strerror(errno))));
3533 }
3534
3535 static void system_will_go_down(int code, Datum arg)
3536 {
3537 if(mypid != getpid())
3538 {
3539 /* should never happen */
3540 ereport(LOG,(errmsg("system_will_go_down called from invalid process")));
3541 return;
3542 }
3543 POOL_SETMASK(&AuthBlockSig);
3544 /* Write status file */
3545 (void)write_status_file();
3546 /*
3547 	 * Terminate all children. But we may already have killed
3548 	 * all the children if we came to this function because of a shutdown
3549 	 * signal.
3550 */
3551 if(processState != EXITING)
3552 terminate_all_childrens();
3553 processState = EXITING;
3554 POOL_SETMASK(&UnBlockSig);
3555
3556 }
3557
3558 int pool_send_to_frontend(char* data, int len, bool flush)
3559 {
3560 if (processType == PT_PCP_WORKER)
3561 return send_to_pcp_frontend(data, len, flush);
3562 else if (processType == PT_CHILD)
3563 return send_to_pg_frontend(data, len, flush);
3564 return -1;
3565 }
3566
3567 int pool_frontend_exists(void)
3568 {
3569 if (processType == PT_PCP_WORKER)
3570 return pcp_frontend_exists();
3571 else if (processType == PT_CHILD)
3572 return pg_frontend_exists();
3573 return -1;
3574 }
3575
3576 /*
3577  * The function fetches the current status of all configured backend
3578  * nodes from the MASTER/COORDINATOR watchdog Pgpool-II node and synchronizes
3579  * the local backend states with the cluster-wide status of each node.
3580  *
3581  * Later in the function, after syncing the backend node status, the function
3582  * does a partial or full restart of Pgpool-II children depending upon the
3583  * Pgpool-II mode and the type of node status change.
3584 *
3585 */
3586 static void sync_backend_from_watchdog(void)
3587 {
3588 bool primary_changed = false;
3589 bool node_status_was_changed_to_down = false;
3590 bool node_status_was_changed_to_up = false;
3591 bool need_to_restart_children = false;
3592 bool partial_restart = false;
3593 	bool reload_master_node_id = false;
3594
3595 int down_node_ids[MAX_NUM_BACKENDS];
3596 int down_node_ids_index = 0;
3597 int i;
3598
3599 /*
3600 * Ask the watchdog to get all the backend states from the Master/Coordinator
3601 * Pgpool-II node
3602 */
3603 WDPGBackendStatus* backendStatus = get_pg_backend_status_from_master_wd_node();
3604 if (!backendStatus)
3605 {
3606 ereport(WARNING,
3607 (errmsg("failed to get the backend status from the master watchdog node"),
3608 errdetail("using the local backend node status")));
3609 return;
3610 }
3611 if (backendStatus->node_count <= 0)
3612 {
3613 /*
3614 		 * A negative node count is returned by the watchdog when the node itself
3615 		 * is a master, and in that case we need to use the local backend node status
3616 */
3617 ereport(LOG,
3618 (errmsg("I am the master watchdog node"),
3619 errdetail("using the local backend node status")));
3620 pfree(backendStatus);
3621 return;
3622 }
3623
3624 ereport(LOG,
3625 (errmsg("master watchdog node \"%s\" returned status for %d backend nodes",backendStatus->nodeName,backendStatus->node_count)));
3626
3627 ereport(DEBUG1,
3628 (errmsg("primary node on master watchdog node \"%s\" is %d",backendStatus->nodeName,backendStatus->primary_node_id)));
3629
3630 if (Req_info->primary_node_id != backendStatus->primary_node_id)
3631 {
3632 		/* Do not produce this log message if we are starting up Pgpool-II */
3633 if (processState != INITIALIZING)
3634 ereport(LOG,
3635 (errmsg("primary node:%d on master watchdog node \"%s\" is different from local primary node:%d",
3636 backendStatus->primary_node_id,backendStatus->nodeName,Req_info->primary_node_id)));
3637
3638 Req_info->primary_node_id = backendStatus->primary_node_id;
3639 primary_changed = true;
3640 }
3641 /* update the local backend status*/
3642 for (i = 0; i < backendStatus->node_count; i++)
3643 {
3644 if (backendStatus->backend_status[i] == CON_DOWN)
3645 {
3646 if (BACKEND_INFO(i).backend_status != CON_DOWN)
3647 {
3648 BACKEND_INFO(i).backend_status = CON_DOWN;
3649 my_backend_status[i] = &(BACKEND_INFO(i).backend_status);
3650 				reload_master_node_id = true;
3651 node_status_was_changed_to_down = true;
3652 ereport(LOG,
3653 (errmsg("backend:%d is set to down status", i),
3654 errdetail("backend:%d is DOWN on cluster master \"%s\"",i,backendStatus->nodeName)));
3655 down_node_ids[down_node_ids_index++] = i;
3656 }
3657 }
3658 else if (backendStatus->backend_status[i] == CON_CONNECT_WAIT ||
3659 backendStatus->backend_status[i] == CON_UP)
3660 {
3661 if (BACKEND_INFO(i).backend_status != CON_CONNECT_WAIT)
3662 {
3663 if (BACKEND_INFO(i).backend_status == CON_DOWN)
3664 node_status_was_changed_to_up = true;
3665
3666 BACKEND_INFO(i).backend_status = CON_CONNECT_WAIT;
3667 my_backend_status[i] = &(BACKEND_INFO(i).backend_status);
3668 				reload_master_node_id = true;
3669
3670 ereport(LOG,
3671 (errmsg("backend:%d is set to UP status", i),
3672 errdetail("backend:%d is UP on cluster master \"%s\"",i,backendStatus->nodeName)));
3673
3674 }
3675 }
3676 }
3678
3679 	if (reload_master_node_id)
3680 {
3681 Req_info->master_node_id = get_next_master_node();
3682 }
3683
3684 /* We don't need to do anything else if the Pgpool-II is starting up */
3685 	if (processState == INITIALIZING)
3686 	{
		pfree(backendStatus);
		return;
	}
3687
3688 /*
3689 	 * Decide if all or a subset of the Pgpool-II children need an immediate
3690 	 * restart, or whether we can do that after finishing the current session.
3691 *
3692 * Check if there was no change at all
3693 */
3694 if (node_status_was_changed_to_up == false &&
3695 node_status_was_changed_to_down == false &&
3696 primary_changed == false)
3697 {
3698 		ereport(LOG,
3699 				(errmsg("backend nodes status remains the same after the sync from \"%s\"", backendStatus->nodeName)));
		pfree(backendStatus);
3700 		return;
3701 }
3702 if (!STREAM)
3703 {
3704 /* If we are not in streaming replication mode
3705 * restart all child processes
3706 */
3707 ereport(LOG,
3708 (errmsg("node status was changed after the sync from \"%s\"", backendStatus->nodeName),
3709 				 errdetail("all children need to be restarted as we are not in streaming replication mode")));
3710 need_to_restart_children = true;
3711 partial_restart = false;
3712 }
3713 else if (primary_changed)
3714 {
3715 		/* If the primary node was changed, we should restart all
3716 		 * children.
3717 */
3718 need_to_restart_children = true;
3719 partial_restart = false;
3720 ereport(LOG,
3721 (errmsg("primary node was changed after the sync from \"%s\"", backendStatus->nodeName),
3722 				 errdetail("all children need to be restarted")));
3723 }
3724 else
3725 {
3726 if (node_status_was_changed_to_down == false)
3727 {
3728 			/* No node was detached, so there is no need to restart
3729 			 * any child process.
3730 */
3731 need_to_restart_children = false;
3732 partial_restart = false;
3733 ereport(LOG,
3734 (errmsg("No backend node was detached because of backend status sync from \"%s\"",backendStatus->nodeName),
3735 errdetail("no need to restart children")));
3736 }
3737 else
3738 {
3739 ereport(LOG,
3740 (errmsg("%d backend node(s) were detached because of backend status sync from \"%s\"",down_node_ids_index,backendStatus->nodeName),
3741 errdetail("restarting the children processes")));
3742
3743 need_to_restart_children = true;
3744 partial_restart = !check_all_backend_down();
3745 }
3746 	}

	/* done with the status data fetched from the master watchdog node */
	pfree(backendStatus);
3747
3748 /* Kill children and restart them if needed */
3749 if (need_to_restart_children)
3750 {
3751 for (i=0;i<pool_config->num_init_children;i++)
3752 {
3753 bool restart = false;
3754
3755 if (partial_restart)
3756 {
3757 int j, k;
3758 for (j=0;j<pool_config->max_pool;j++)
3759 {
3760 for (k=0;k<NUM_BACKENDS;k++)
3761 {
3762 int idx;
3763 ConnectionInfo *con = pool_coninfo(i, j, k);
3764 for (idx = 0; idx < down_node_ids_index; idx ++)
3765 {
3766 int node_id = down_node_ids[idx];
3767 if (con->connected && con->load_balancing_node == node_id)
3768 {
3769 ereport(LOG,
3770 (errmsg("child process with PID:%d needs restart, because pool %d uses backend %d",
3771 process_info[i].pid, j, node_id)));
3772 restart = true;
3773 break;
3774 }
3775 if (restart)
3776 break;
3777 }
3778 }
3779 }
3780 }
3781 else
3782 {
3783 restart = true;
3784 }
3785
3786 if (restart)
3787 {
3788 if (process_info[i].pid)
3789 {
3790 kill(process_info[i].pid, SIGQUIT);
3791
3792 process_info[i].pid = fork_a_child(fds, i);
3793 process_info[i].start_time = time(NULL);
3794 }
3795 }
3796 else
3797 process_info[i].need_to_restart = 1;
3798 }
3799 }
3800
3801 else
3802 {
3803 /* Set restart request to each child. Children will exit(1)
3804 		 * whenever it is convenient for them.
3805 */
3806 for (i=0;i<pool_config->num_init_children;i++)
3807 {
3808 process_info[i].need_to_restart = 1;
3809 }
3810 }
3811
3812 /*
3813 * Send restart request to worker child.
3814 */
3815 kill(worker_pid, SIGUSR1);
3816 }
3817