1 /*
2 * $Header$
3 *
4 * Handles watchdog connection, and protocol communication with pgpool-II
5 *
6 * pgpool: a language independent connection pool server for PostgreSQL
7 * written by Tatsuo Ishii
8 *
9 * Copyright (c) 2003-2016 PgPool Global Development Group
10 *
11 * Permission to use, copy, modify, and distribute this software and
12 * its documentation for any purpose and without fee is hereby
13 * granted, provided that the above copyright notice appear in all
14 * copies and that both that copyright notice and this permission
15 * notice appear in supporting documentation, and that the name of the
16 * author not be used in advertising or publicity pertaining to
17 * distribution of the software without specific, written prior
18 * permission. The author makes no representations about the
19 * suitability of this software for any purpose. It is provided "as
20 * is" without express or implied warranty.
21 *
22 */
23 #include <pthread.h>
24 #include <stdio.h>
25 #include <errno.h>
26 #include <ctype.h>
27 #include <time.h>
28 #include <string.h>
29 #include <stdlib.h>
30 #include <unistd.h>
31 #include <netdb.h>
32 #include <sys/wait.h>
33
34 #include "pool.h"
35 #include "pool_config.h"
36 #include "utils/elog.h"
37 #include "utils/json.h"
38 #include "utils/json_writer.h"
39 #include "utils/elog.h"
40 #include "utils/palloc.h"
41 #include "utils/memutils.h"
42
43 #include "watchdog/wd_utils.h"
44 #include "watchdog/wd_lifecheck.h"
45 #include "watchdog/wd_ipc_defines.h"
46 #include "watchdog/wd_ipc_commands.h"
47 #include "watchdog/wd_json_data.h"
48
49 #include "libpq-fe.h"
50
/*
 * Number of attempts, one second apart, that the lifecheck process makes to
 * fetch the configured watchdog node information from the watchdog process
 * through the IPC channel before giving up.
 */
#define LIFECHECK_GETNODE_WAIT_SEC_COUNT 5

/*
 * Thread argument for the query-mode lifecheck of a pgpool node
 * (see check_pgpool_status_by_query / thread_ping_pgpool).
 */
typedef struct {
	LifeCheckNode *lifeCheckNode;	/* node the thread checks */
	int retry;	/* retry times; NOTE(review): never read in this file */
} WdPgpoolThreadArg;

/*
 * One trusted (upstream) server and the state of the ping process that
 * checks it.
 */
typedef struct WdUpstreamConnectionData
{
	char* hostname;	/* host name of server */
	pid_t pid;	/* pid of running ping process, 0 when none */
	bool reachable;	/* true if last ping was successful */
	int outputfd;	/* pipe fd linked to output of ping process */
}WdUpstreamConnectionData;


/* List of WdUpstreamConnectionData*, built by wd_initialize_trusted_servers_list() */
List* g_trusted_server_list = NIL;

static void wd_initialize_trusted_servers_list(void);
static bool wd_ping_all_server(void);
static WdUpstreamConnectionData* wd_get_server_from_pid(pid_t pid);

static void * thread_ping_pgpool(void * arg);
static PGconn * create_conn(char * hostname, int port);

static pid_t lifecheck_main(void);
static void check_pgpool_status(void);
static void check_pgpool_status_by_query(void);
static void check_pgpool_status_by_hb(void);
static int ping_pgpool(PGconn * conn);
static int is_parent_alive(void);
static bool fetch_watchdog_nodes_data(void);
static int wd_check_heartbeat(LifeCheckNode* node);

static void load_watchdog_nodes_from_json(char* json_data, int len);
static void spawn_lifecheck_children(void);

static RETSIGTYPE lifecheck_exit_handler(int sig);
static RETSIGTYPE reap_handler(int sig);
static pid_t wd_reaper_lifecheck(pid_t pid, int status);

static bool lifecheck_kill_all_children(int sig);
static const char* lifecheck_child_name(pid_t pid);
static void reaper(void);
static int is_wd_lifecheck_ready(void);
static int wd_lifecheck(void);
static int wd_ping_pgpool(LifeCheckNode* node);
static pid_t fork_lifecheck_child(void);


/* Watchdog node list; created with pool_shared_memory_create() */
LifeCheckCluster* gslifeCheckCluster = NULL;

pid_t *g_hb_receiver_pid = NULL; /* Array of heartbeat receiver child pids, one per hb_if */
pid_t *g_hb_sender_pid = NULL; /* Array of heartbeat sender child pids, one per hb_if */
/* set by reap_handler(), consumed by reaper() from the main loop */
static volatile sig_atomic_t sigchld_request = 0;
114
115 /*
116 * handle SIGCHLD
117 */
reap_handler(int sig)118 static RETSIGTYPE reap_handler(int sig)
119 {
120 POOL_SETMASK(&BlockSig);
121 sigchld_request = 1;
122 POOL_SETMASK(&UnBlockSig);
123 }
124
125
lifecheck_child_name(pid_t pid)126 static const char* lifecheck_child_name(pid_t pid)
127 {
128 int i;
129 for (i = 0; i < pool_config->num_hb_if; i++)
130 {
131 if (g_hb_receiver_pid && pid == g_hb_receiver_pid[i])
132 return "heartBeat receiver";
133 else if (g_hb_sender_pid && pid == g_hb_sender_pid[i])
134 return "heartBeat sender";
135 }
136 /* Check if it was a ping to trusted server process */
137 WdUpstreamConnectionData* server = wd_get_server_from_pid(pid);
138 if (server)
139 return "trusted host ping process";
140 return "unknown";
141 }
142
reaper(void)143 static void reaper(void)
144 {
145 pid_t pid;
146 int status;
147
148 ereport(DEBUG1,
149 (errmsg("Lifecheck child reaper handler")));
150
151 /* clear SIGCHLD request */
152 sigchld_request = 0;
153
154 while ((pid = pool_waitpid(&status)) > 0)
155 {
156 /* First check if it is the trusted server ping process */
157 WdUpstreamConnectionData* server = wd_get_server_from_pid(pid);
158 if (server)
159 {
160 server->reachable = wd_get_ping_result(server->hostname, status, server->outputfd);
161 server->pid = 0;
162 close(server->outputfd);
163 }
164 else
165 wd_reaper_lifecheck(pid,status);
166 }
167 }
168
169 static pid_t
wd_reaper_lifecheck(pid_t pid,int status)170 wd_reaper_lifecheck(pid_t pid, int status)
171 {
172 int i;
173 bool restart_child = true;
174 const char* proc_name = lifecheck_child_name(pid);
175
176 if(WIFEXITED(status))
177 {
178 if(WEXITSTATUS(status) == POOL_EXIT_FATAL)
179 ereport(LOG,
180 (errmsg("lifecheck child process (%s) with pid: %d exit with FATAL ERROR.",proc_name, pid)));
181 else if(WEXITSTATUS(status) == POOL_EXIT_NO_RESTART)
182 {
183 restart_child = false;
184 ereport(LOG,
185 (errmsg("lifecheck child process (%s) with pid: %d exit with SUCCESS.", lifecheck_child_name(pid), pid)));
186 }
187 }
188 if (WIFSIGNALED(status))
189 {
190 /* Child terminated by segmentation fault. Report it */
191 if(WTERMSIG(status) == SIGSEGV)
192 ereport(WARNING,
193 (errmsg("lifecheck child process (%s) with pid: %d was terminated by segmentation fault",proc_name,pid)));
194 else
195 ereport(LOG,
196 (errmsg("lifecheck child process (%s) with pid: %d exits with status %d by signal %d", proc_name, pid, status, WTERMSIG(status))));
197 }
198 else
199 ereport(LOG,
200 (errmsg("lifecheck child process (%s) with pid: %d exits with status %d",proc_name, pid, status)));
201
202 if (g_hb_receiver_pid == NULL && g_hb_sender_pid == NULL)
203 return -1;
204
205 for (i = 0; i < pool_config->num_hb_if; i++)
206 {
207 if (g_hb_receiver_pid && pid == g_hb_receiver_pid[i])
208 {
209 if(restart_child)
210 {
211 g_hb_receiver_pid[i] = wd_hb_receiver(1, &(pool_config->hb_if[i]));
212 ereport(LOG,
213 (errmsg("fork a new %s process with pid: %d",proc_name, g_hb_receiver_pid[i])));
214 }
215 else
216 g_hb_receiver_pid[i] = 0;
217
218 return g_hb_receiver_pid[i];
219 }
220
221 else if (g_hb_sender_pid && pid == g_hb_sender_pid[i])
222 {
223 if(restart_child)
224 {
225 g_hb_sender_pid[i] = wd_hb_sender(1, &(pool_config->hb_if[i]));
226 ereport(LOG,
227 (errmsg("fork a new %s process with pid: %d",proc_name, g_hb_sender_pid[i])));
228 }
229 else
230 g_hb_sender_pid[i] = 0;
231
232 return g_hb_sender_pid[i];
233 }
234 }
235 return -1;
236 }
237
lifecheck_kill_all_children(int sig)238 static bool lifecheck_kill_all_children(int sig)
239 {
240 bool ret = false;
241 if (pool_config->wd_lifecheck_method == LIFECHECK_BY_HB
242 && g_hb_receiver_pid && g_hb_sender_pid)
243 {
244 int i;
245 for (i = 0; i < pool_config->num_hb_if; i++)
246 {
247 pid_t pid_child = g_hb_receiver_pid[i];
248
249 if (pid_child > 0)
250 {
251 kill(pid_child,sig);
252 ret = true;
253 }
254
255 pid_child = g_hb_sender_pid[i];
256
257 if (pid_child > 0)
258 {
259 kill(pid_child,sig);
260 ret = true;
261 }
262 }
263 }
264 return ret;
265 }
266
267 static RETSIGTYPE
lifecheck_exit_handler(int sig)268 lifecheck_exit_handler(int sig)
269 {
270 pid_t wpid;
271 bool child_killed;
272
273 POOL_SETMASK(&AuthBlockSig);
274 ereport(DEBUG1,
275 (errmsg("lifecheck child receives shutdown request signal %d, forwarding to all children", sig)));
276
277 if (sig == SIGTERM) /* smart shutdown */
278 {
279 ereport(DEBUG1,
280 (errmsg("lifecheck child receives smart shutdown request")));
281 }
282 else if (sig == SIGINT)
283 {
284 ereport(DEBUG1,
285 (errmsg("lifecheck child receives fast shutdown request")));
286 }
287 else if (sig == SIGQUIT)
288 {
289 ereport(DEBUG1,
290 (errmsg("lifecheck child receives immediate shutdown request")));
291 }
292
293 child_killed = lifecheck_kill_all_children(sig);
294
295 POOL_SETMASK(&UnBlockSig);
296
297 if (child_killed)
298 {
299 do
300 {
301 wpid = wait(NULL);
302 }while (wpid > 0 || (wpid == -1 && errno == EINTR));
303
304 if (wpid == -1 && errno != ECHILD)
305 ereport(WARNING,
306 (errmsg("wait() on lifecheck children failed. reason:%s", strerror(errno))));
307
308 if (g_hb_receiver_pid)
309 pfree(g_hb_receiver_pid);
310
311 if (g_hb_sender_pid)
312 pfree(g_hb_sender_pid);
313
314 g_hb_receiver_pid = NULL;
315 g_hb_sender_pid = NULL;
316 }
317 exit(0);
318 }
319
initialize_watchdog_lifecheck(void)320 pid_t initialize_watchdog_lifecheck(void)
321 {
322 if (!pool_config->use_watchdog)
323 return 0;
324
325 if (pool_config->wd_lifecheck_method == LIFECHECK_BY_EXTERNAL)
326 return 0;
327
328 return fork_lifecheck_child();
329 }
330
331 /*
332 * fork a child for lifecheck
333 */
fork_lifecheck_child(void)334 static pid_t fork_lifecheck_child(void)
335 {
336 pid_t pid;
337
338 pid = fork();
339
340 if (pid == 0)
341 {
342 on_exit_reset();
343
344 /* Set the process type variable */
345 processType = PT_LIFECHECK;
346
347 /* call lifecheck child main */
348 POOL_SETMASK(&UnBlockSig);
349 lifecheck_main();
350 }
351 else if (pid == -1)
352 {
353 ereport(FATAL,
354 (errmsg("fork() failed. reason: %s", strerror(errno))));
355 }
356
357 return pid;
358 }
359
360
/*
 * Main entry point of the watchdog lifecheck process.
 *
 * Runs in the child forked by fork_lifecheck_child().  Start-up proceeds in
 * this order:
 *   1. install the signal handlers;
 *   2. fetch the watchdog node list, retrying up to
 *      LIFECHECK_GETNODE_WAIT_SEC_COUNT times one second apart;
 *   3. start the heartbeat children and build the trusted-server list;
 *   4. wait until is_wd_lifecheck_ready() reports WD_OK.
 * It then loops forever, calling wd_lifecheck() every wd_interval seconds.
 * The loop has no exit, so the trailing "return 0" is never reached; the
 * process ends through lifecheck_exit_handler() or a FATAL error.
 */
static pid_t
lifecheck_main(void)
{
	sigjmp_buf local_sigjmp_buf;
	int i;

	ereport(DEBUG1,
			(errmsg("I am watchdog lifecheck child with pid:%d",getpid())));

	/* Identify myself via ps */
	init_ps_display("", "", "", "");

	pool_signal(SIGTERM, lifecheck_exit_handler);
	pool_signal(SIGINT, lifecheck_exit_handler);
	pool_signal(SIGQUIT, lifecheck_exit_handler);
	pool_signal(SIGCHLD, reap_handler);
	pool_signal(SIGHUP, SIG_IGN);
	pool_signal(SIGPIPE, SIG_IGN);

	/* Create per loop iteration memory context (reset at every iteration below) */
	ProcessLoopContext = AllocSetContextCreate(TopMemoryContext,
											   "wd_lifecheck_main_loop",
											   ALLOCSET_DEFAULT_MINSIZE,
											   ALLOCSET_DEFAULT_INITSIZE,
											   ALLOCSET_DEFAULT_MAXSIZE);

	/*
	 * Start-up allocations (e.g. the heartbeat pid arrays made by
	 * spawn_lifecheck_children()) go to TopMemoryContext so that they
	 * survive the per-iteration resets of ProcessLoopContext.
	 */
	MemoryContextSwitchTo(TopMemoryContext);

	set_ps_display("lifecheck",false);

	/* Get the list of watchdog node to monitor
	 * from watchdog process
	 */
	for (i =0; i < LIFECHECK_GETNODE_WAIT_SEC_COUNT; i++)
	{
		if (fetch_watchdog_nodes_data() == true)
			break;
		sleep(1);
	}

	/*
	 * Note: PG_exception_stack is not set up yet, so ERRORs raised up to this
	 * point are not handled by the sigsetjmp block below.
	 */
	if (!gslifeCheckCluster)
		ereport(ERROR,
				(errmsg("unable to initialize lifecheck, watchdog not responding")));

	spawn_lifecheck_children();

	wd_initialize_trusted_servers_list();

	/* wait until ready to go */
	while (WD_OK != is_wd_lifecheck_ready())
	{
		sleep(pool_config->wd_interval * 10);
	}

	ereport(LOG,
			(errmsg("watchdog: lifecheck started")));

	/*
	 * ERRORs raised inside the main loop longjmp back here: report the error,
	 * reset the error state, wait wd_heartbeat_keepalive seconds and fall
	 * into the loop again.
	 */
	if (sigsetjmp(local_sigjmp_buf, 1) != 0)
	{
		/* Since not using PG_TRY, must reset error stack by hand */
		error_context_stack = NULL;

		EmitErrorReport();
		MemoryContextSwitchTo(TopMemoryContext);
		FlushErrorState();
		sleep(pool_config->wd_heartbeat_keepalive);
	}

	/* We can now handle ereport(ERROR) */
	PG_exception_stack = &local_sigjmp_buf;

	/* watchdog loop */
	for (;;)
	{
		MemoryContextSwitchTo(ProcessLoopContext);
		MemoryContextResetAndDeleteChildren(ProcessLoopContext);

		/* collect children flagged by reap_handler() */
		if (sigchld_request)
			reaper();

		/* pgpool life check */
		wd_lifecheck();
		sleep(pool_config->wd_interval);
	}

	return 0;
}
449
spawn_lifecheck_children(void)450 static void spawn_lifecheck_children(void)
451 {
452 if (pool_config->wd_lifecheck_method == LIFECHECK_BY_HB)
453 {
454 int i;
455 g_hb_receiver_pid = palloc0(sizeof(pid_t) * pool_config->num_hb_if);
456 g_hb_sender_pid = palloc0(sizeof(pid_t) * pool_config->num_hb_if);
457
458 for (i = 0; i < pool_config->num_hb_if; i++)
459 {
460 /* heartbeat receiver process */
461 g_hb_receiver_pid[i] = wd_hb_receiver(1, &(pool_config->hb_if[i]));
462
463 /* heartbeat sender process */
464 g_hb_sender_pid[i] = wd_hb_sender(1, &(pool_config->hb_if[i]));
465 }
466 }
467 }
468
print_lifecheck_cluster(void)469 static void print_lifecheck_cluster(void)
470 {
471 int i;
472 if (!gslifeCheckCluster)
473 return;
474 ereport(LOG,
475 (errmsg("%d watchdog nodes are configured for lifecheck",gslifeCheckCluster->nodeCount)));
476 for (i = 0; i< gslifeCheckCluster->nodeCount; i++)
477 {
478 ereport(LOG,
479 (errmsg("watchdog nodes ID:%d Name:\"%s\"",gslifeCheckCluster->lifeCheckNodes[i].ID,gslifeCheckCluster->lifeCheckNodes[i].nodeName),
480 errdetail("Host:\"%s\" WD Port:%d pgpool-II port:%d",
481 gslifeCheckCluster->lifeCheckNodes[i].hostName,
482 gslifeCheckCluster->lifeCheckNodes[i].wdPort,
483 gslifeCheckCluster->lifeCheckNodes[i].pgpoolPort)));
484 }
485 }
486
/*
 * Tell the watchdog process that node changed to NODE_DEAD or NODE_ALIVE.
 *
 * The WD_NODE_STATUS_CHANGE_COMMAND is retried once per second, up to
 * MAX_SEC_WAIT_FOR_CLUSTER_TRANSATION times.  Returns true when the watchdog
 * answered.  Returns false for any other node state, when the JSON could not
 * be built, or when no answer came back.
 */
static bool
inform_node_status(LifeCheckNode *node, char *message)
{
	const char *status_text;
	int			status_code;
	int			attempt;
	char	   *json_data;
	WDIPCCmdResult *res = NULL;

	switch (node->nodeState)
	{
		case NODE_DEAD:
			status_text = "NODE DEAD";
			status_code = WD_LIFECHECK_NODE_STATUS_DEAD;
			break;
		case NODE_ALIVE:
			status_text = "NODE ALIVE";
			status_code = WD_LIFECHECK_NODE_STATUS_ALIVE;
			break;
		default:
			return false;
	}

	ereport(LOG,
			(errmsg("informing the node status change to watchdog"),
			 errdetail("node id :%d status = \"%s\" message:\"%s\"", node->ID, status_text, message)));

	json_data = get_lifecheck_node_status_change_json(node->ID, status_code, message, pool_config->wd_authkey);
	if (json_data == NULL)
		return false;

	for (attempt = 0; attempt < MAX_SEC_WAIT_FOR_CLUSTER_TRANSATION; attempt++)
	{
		res = issue_command_to_watchdog(WD_NODE_STATUS_CHANGE_COMMAND, 0, json_data, strlen(json_data), false);
		if (res != NULL)
			break;
		sleep(1);
	}
	pfree(json_data);

	if (res == NULL)
		return false;

	/*
	 * NOTE(review): only the result struct itself is freed here; if it owns
	 * further buffers they are left to the current memory context.
	 */
	pfree(res);
	return true;
}
530
fetch_watchdog_nodes_data(void)531 static bool fetch_watchdog_nodes_data(void)
532 {
533 char* json_data = wd_get_watchdog_nodes(-1);
534 if (json_data == NULL)
535 {
536 ereport(ERROR,
537 (errmsg("get node list command reply contains no data")));
538 return false;
539 }
540 load_watchdog_nodes_from_json(json_data,strlen(json_data));
541 pfree(json_data);
542 return true;
543 }
544
load_watchdog_nodes_from_json(char * json_data,int len)545 static void load_watchdog_nodes_from_json(char* json_data, int len)
546 {
547 json_value* root;
548 json_value* value;
549 int i,nodeCount;
550
551 root = json_parse(json_data,len);
552
553 /* The root node must be object */
554 if (root == NULL || root->type != json_object)
555 {
556 json_value_free(root);
557 ereport(ERROR,
558 (errmsg("unable to parse json data for node list")));
559 }
560
561 if (json_get_int_value_for_key(root, "NodeCount", &nodeCount))
562 {
563 json_value_free(root);
564 ereport(ERROR,
565 (errmsg("invalid json data"),
566 errdetail("unable to find NodeCount node from data")));
567 }
568
569 /* find the WatchdogNodes array */
570 value = json_get_value_for_key(root,"WatchdogNodes");
571 if (value == NULL)
572 {
573 json_value_free(root);
574 ereport(ERROR,
575 (errmsg("invalid json data"),
576 errdetail("unable to find WatchdogNodes node from data")));
577 }
578 if (value->type != json_array)
579 {
580 json_value_free(root);
581 ereport(ERROR,
582 (errmsg("invalid json data"),
583 errdetail("WatchdogNodes node does not contains Array")));
584 }
585 if (nodeCount != value->u.array.length)
586 {
587 json_value_free(root);
588 ereport(ERROR,
589 (errmsg("invalid json data"),
590 errdetail("WatchdogNodes array contains %d nodes while expecting %d",value->u.array.length, nodeCount)));
591 }
592
593 /* okay we are done, put this in shared memory */
594 gslifeCheckCluster = pool_shared_memory_create(sizeof(LifeCheckCluster));
595 gslifeCheckCluster->nodeCount = nodeCount;
596 gslifeCheckCluster->lifeCheckNodes = pool_shared_memory_create(sizeof(LifeCheckNode) * gslifeCheckCluster->nodeCount);
597 for (i = 0; i < nodeCount; i++)
598 {
599 WDNodeInfo *nodeInfo = get_WDNodeInfo_from_wd_node_json(value->u.array.values[i]);
600
601 gslifeCheckCluster->lifeCheckNodes[i].nodeState = NODE_EMPTY;
602 gslifeCheckCluster->lifeCheckNodes[i].ID = nodeInfo->id;
603 strcpy(gslifeCheckCluster->lifeCheckNodes[i].hostName, nodeInfo->hostName);
604 strcpy(gslifeCheckCluster->lifeCheckNodes[i].nodeName, nodeInfo->nodeName);
605 gslifeCheckCluster->lifeCheckNodes[i].wdPort = nodeInfo->wd_port;
606 gslifeCheckCluster->lifeCheckNodes[i].pgpoolPort = nodeInfo->pgpool_port;
607 gslifeCheckCluster->lifeCheckNodes[i].retry_lives = pool_config->wd_life_point;
608 pfree(nodeInfo);
609 }
610 print_lifecheck_cluster();
611 json_value_free(root);
612 }
613
614
is_wd_lifecheck_ready(void)615 static int is_wd_lifecheck_ready(void)
616 {
617 int rtn = WD_OK;
618 int i;
619
620 for (i = 0; i< gslifeCheckCluster->nodeCount; i++)
621 {
622 LifeCheckNode* node = &gslifeCheckCluster->lifeCheckNodes[i];
623 /* query mode */
624 if (pool_config->wd_lifecheck_method == LIFECHECK_BY_QUERY)
625 {
626 if (wd_ping_pgpool(node) == WD_NG)
627 {
628 ereport(DEBUG1,
629 (errmsg("watchdog checking life check is ready"),
630 errdetail("pgpool:%d at \"%s:%d\" has not started yet",
631 i, node->hostName, node->pgpoolPort)));
632 rtn = WD_NG;
633 }
634 }
635 /* heartbeat mode */
636 else if (pool_config->wd_lifecheck_method == LIFECHECK_BY_HB)
637 {
638 if (node->ID == 0) /* local node */
639 continue;
640
641 if (!WD_TIME_ISSET(node->hb_last_recv_time) ||
642 !WD_TIME_ISSET(node->hb_send_time))
643 {
644 ereport(DEBUG1,
645 (errmsg("watchdog checking life check is ready"),
646 errdetail("pgpool:%d at \"%s:%d\" has not send the heartbeat signal yet",
647 i, node->hostName, node->pgpoolPort)));
648 rtn = WD_NG;
649 }
650 }
651 /* otherwise */
652 else
653 {
654 ereport(ERROR,
655 (errmsg("checking if watchdog is ready, unkown watchdog mode \"%d\"",
656 pool_config->wd_lifecheck_method)));
657 }
658 }
659
660 return rtn;
661 }
662
663 /*
664 * Check if pgpool is living
665 */
wd_lifecheck(void)666 static int wd_lifecheck(void)
667 {
668 /* check upper connection */
669 if (strlen(pool_config->trusted_servers))
670 {
671 if(wd_ping_all_server() == false)
672 {
673 LifeCheckNode* node = &gslifeCheckCluster->lifeCheckNodes[0];
674
675 if (node->nodeState != NODE_DEAD)
676 {
677 node->nodeState = NODE_DEAD;
678 ereport(WARNING,
679 (errmsg("watchdog lifecheck, failed to connect to any trusted servers")));
680
681 inform_node_status(node,"trusted server is unreachable");
682 }
683 return WD_NG;
684 }
685 }
686
687 /* skip lifecheck during recovery execution */
688 if (*InRecovery != RECOVERY_INIT)
689 {
690 return WD_OK;
691 }
692
693 /* check and update pgpool status */
694 check_pgpool_status();
695
696 return WD_OK;
697 }
698
699 /*
700 * check and update pgpool status
701 */
702 static void
check_pgpool_status()703 check_pgpool_status()
704 {
705 /* query mode */
706 if (pool_config->wd_lifecheck_method == LIFECHECK_BY_QUERY)
707 {
708 check_pgpool_status_by_query();
709 }
710 /* heartbeat mode */
711 else if (pool_config->wd_lifecheck_method == LIFECHECK_BY_HB)
712 {
713 check_pgpool_status_by_hb();
714 }
715 }
716
717 static void
check_pgpool_status_by_hb(void)718 check_pgpool_status_by_hb(void)
719 {
720 int i;
721 struct timeval tv;
722 LifeCheckNode* node = &gslifeCheckCluster->lifeCheckNodes[0];
723 gettimeofday(&tv, NULL);
724
725 /* about myself */
726 /* parent is dead so it's orphan.... */
727 if (is_parent_alive() == WD_NG && node->nodeState != NODE_DEAD)
728 {
729 node->nodeState = NODE_DEAD;
730 ereport(LOG,
731 (errmsg("checking pgpool status by heartbeat"),
732 errdetail("lifecheck failed. pgpool (%s:%d) seems not to be working",
733 node->hostName, node->pgpoolPort)));
734
735 inform_node_status(node,"parent process is dead");
736
737 }
738
739
740 for (i = 1; i< gslifeCheckCluster->nodeCount; i++)
741 {
742 node = &gslifeCheckCluster->lifeCheckNodes[i];
743 ereport(DEBUG1,
744 (errmsg("watchdog life checking by heartbeat"),
745 errdetail("checking pgpool %d (%s:%d)",
746 i, node->hostName, node->pgpoolPort)));
747
748 if (wd_check_heartbeat(node) == WD_NG)
749 {
750 ereport(DEBUG2,
751 (errmsg("checking pgpool status by heartbeat"),
752 errdetail("lifecheck failed. pgpool: %d at \"%s:%d\" seems not to be working",
753 i, node->hostName, node->pgpoolPort)));
754
755 if (node->nodeState != NODE_DEAD)
756 {
757 node->nodeState = NODE_DEAD;
758 inform_node_status(node,"No heartbeat signal from node");
759 }
760 }
761 else
762 {
763 ereport(DEBUG1,
764 (errmsg("checking pgpool status by heartbeat"),
765 errdetail("OK; status OK")));
766 }
767 }
768 }
769
770 static void
check_pgpool_status_by_query(void)771 check_pgpool_status_by_query(void)
772 {
773 pthread_attr_t attr;
774 pthread_t thread[MAX_WATCHDOG_NUM];
775 WdPgpoolThreadArg thread_arg[MAX_WATCHDOG_NUM];
776 LifeCheckNode* node;
777 int rc,i;
778
779 /* thread init */
780 pthread_attr_init(&attr);
781 pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
782
783 /* send queries to all pgpools using threads */
784 for (i = 0; i< gslifeCheckCluster->nodeCount; i++)
785 {
786 node = &gslifeCheckCluster->lifeCheckNodes[i];
787 thread_arg[i].lifeCheckNode = node;
788 rc = watchdog_thread_create(&thread[i], &attr, thread_ping_pgpool, (void*)&thread_arg[i]);
789 if (rc)
790 {
791 ereport(WARNING,
792 (errmsg("failed to create thread for checking pgpool status by query for %d (%s:%d)",
793 i, node->hostName, node->pgpoolPort),
794 errdetail("pthread_create failed with error code %d: %s",rc, strerror(rc))));
795 }
796 }
797
798 pthread_attr_destroy(&attr);
799
800 /* check results of queries */
801 for (i = 0; i< gslifeCheckCluster->nodeCount; i++)
802 {
803 int result;
804 node = &gslifeCheckCluster->lifeCheckNodes[i];
805
806 ereport(DEBUG1,
807 (errmsg("checking pgpool status by query"),
808 errdetail("checking pgpool %d (%s:%d)",
809 i, node->hostName, node->pgpoolPort)));
810
811 rc = pthread_join(thread[i], (void **)&result);
812 if ((rc != 0) && (errno == EINTR))
813 {
814 usleep(100);
815 continue;
816 }
817
818 if (result == WD_OK)
819 {
820 ereport(DEBUG1,
821 (errmsg("checking pgpool status by query"),
822 errdetail("WD_OK: status: %d", node->nodeState)));
823
824 /* life point init */
825 node->retry_lives = pool_config->wd_life_point;
826 }
827 else
828 {
829 ereport(DEBUG1,
830 (errmsg("checking pgpool status by query"),
831 errdetail("NG; status: %d life:%d", node->nodeState, node->retry_lives)));
832 if (node->retry_lives > 0)
833 {
834 node->retry_lives --;
835 }
836
837 /* pgpool goes down */
838 if (node->retry_lives <= 0)
839 {
840 ereport(LOG,
841 (errmsg("checking pgpool status by query"),
842 errdetail("lifecheck failed %d times. pgpool %d (%s:%d) seems not to be working",
843 pool_config->wd_life_point, i, node->hostName, node->pgpoolPort)));
844
845 if (node->nodeState == NODE_DEAD)
846 continue;
847 node->nodeState = NODE_DEAD;
848 /* It's me! */
849 if (i == 0)
850 inform_node_status(node,"parent process is dead");
851 else
852 inform_node_status(node,"unable to connect to node");
853 }
854 }
855 }
856 }
857
858 /*
859 * Thread function to send lifecheck query to pgpool
860 * Used in wd_lifecheck.
861 */
862 static void *
thread_ping_pgpool(void * arg)863 thread_ping_pgpool(void * arg)
864 {
865 uintptr_t rtn;
866 WdPgpoolThreadArg * thread_arg = (WdPgpoolThreadArg *)arg;
867 rtn = (uintptr_t)wd_ping_pgpool(thread_arg->lifeCheckNode);
868
869 pthread_exit((void *)rtn);
870 }
871
872 /*
873 * Create connection to pgpool
874 */
875 static PGconn *
create_conn(char * hostname,int port)876 create_conn(char * hostname, int port)
877 {
878 static char conninfo[1024];
879 PGconn *conn;
880
881 if (strlen(pool_config->wd_lifecheck_dbname) == 0)
882 {
883 ereport(WARNING,
884 (errmsg("watchdog life checking, wd_lifecheck_dbname is empty")));
885 return NULL;
886 }
887
888 if (strlen(pool_config->wd_lifecheck_user) == 0)
889 {
890 ereport(WARNING,
891 (errmsg("watchdog life checking, wd_lifecheck_user is empty")));
892 return NULL;
893 }
894
895 snprintf(conninfo,sizeof(conninfo),
896 "host='%s' port='%d' dbname='%s' user='%s' password='%s' connect_timeout='%d'",
897 hostname,
898 port,
899 pool_config->wd_lifecheck_dbname,
900 pool_config->wd_lifecheck_user,
901 pool_config->wd_lifecheck_password,
902 pool_config->wd_interval / 2 + 1);
903 conn = PQconnectdb(conninfo);
904
905 if (PQstatus(conn) != CONNECTION_OK)
906 {
907 ereport(DEBUG1,
908 (errmsg("watchdog life checking"),
909 errdetail("Connection to database failed: %s", PQerrorMessage(conn))));
910 PQfinish(conn);
911 return NULL;
912 }
913 return conn;
914 }
915
916
917 /*
918 * Check if pgpool is alive using heartbeat signal.
919 */
920 static int
wd_check_heartbeat(LifeCheckNode * node)921 wd_check_heartbeat(LifeCheckNode* node)
922 {
923 int interval;
924 struct timeval tv;
925
926 if (!WD_TIME_ISSET(node->hb_last_recv_time) ||
927 !WD_TIME_ISSET(node->hb_send_time))
928 {
929 ereport(DEBUG1,
930 (errmsg("watchdog checking if pgpool is alive using heartbeat"),
931 errdetail("pgpool (%s:%d) was restarted and has not send the heartbeat signal yet",
932 node->hostName, node->pgpoolPort)));
933 return WD_OK;
934 }
935
936 gettimeofday(&tv, NULL);
937
938 interval = WD_TIME_DIFF_SEC(tv, node->hb_last_recv_time);
939 ereport(DEBUG1,
940 (errmsg("watchdog checking if pgpool is alive using heartbeat"),
941 errdetail("the last heartbeat from \"%s:%d\" received %d seconds ago",
942 node->hostName, node->pgpoolPort, interval)));
943
944 if (interval > pool_config->wd_heartbeat_deadtime)
945 {
946 return WD_NG;
947 }
948
949 if (node->nodeState == NODE_DEAD)
950 {
951 node->nodeState = NODE_ALIVE;
952 inform_node_status(node,"Heartbeat signal found");
953 }
954 return WD_OK;
955 }
956
957 /*
958 * Check if pgpool can accept the lifecheck query.
959 */
960 static int
wd_ping_pgpool(LifeCheckNode * node)961 wd_ping_pgpool(LifeCheckNode* node)
962 {
963 PGconn * conn;
964
965 conn = create_conn(node->hostName, node->pgpoolPort);
966 if (conn == NULL)
967 return WD_NG;
968 return ping_pgpool(conn);
969 }
970
971 /* inner function for issueing lifecheck query */
972 static int
ping_pgpool(PGconn * conn)973 ping_pgpool(PGconn * conn)
974 {
975 int rtn = WD_NG;
976 int status = PGRES_FATAL_ERROR;
977 PGresult * res = (PGresult *)NULL;
978
979 if (!conn)
980 {
981 return WD_NG;
982 }
983
984 res = PQexec(conn, pool_config->wd_lifecheck_query );
985
986 status = PQresultStatus(res);
987 if (res != NULL)
988 {
989 PQclear(res);
990 }
991
992 if ((status != PGRES_NONFATAL_ERROR ) &&
993 (status != PGRES_FATAL_ERROR ))
994 {
995 rtn = WD_OK;
996 }
997 PQfinish(conn);
998
999 return rtn;
1000 }
1001
1002 static int
is_parent_alive()1003 is_parent_alive()
1004 {
1005 if (mypid == getppid())
1006 return WD_OK;
1007 else
1008 return WD_NG;
1009 }
1010
1011
wd_initialize_trusted_servers_list(void)1012 static void wd_initialize_trusted_servers_list(void)
1013 {
1014 char* token;
1015 char* tmpString;
1016 const char* delimi = ",";
1017 if (g_trusted_server_list)
1018 return;
1019
1020 if (strlen(pool_config->trusted_servers) <= 0)
1021 return;
1022
1023 /* This has to be created in TopMemoryContext */
1024 MemoryContext oldCxt = MemoryContextSwitchTo(TopMemoryContext);
1025
1026 tmpString = pstrdup(pool_config->trusted_servers);
1027 for (token = strtok(tmpString, delimi); token != NULL; token = strtok(NULL, delimi))
1028 {
1029 WdUpstreamConnectionData* server = palloc(sizeof(WdUpstreamConnectionData));
1030 server->pid = 0;
1031 server->reachable = false;
1032 server->hostname = pstrdup(token);
1033 g_trusted_server_list = lappend(g_trusted_server_list,server);
1034
1035 ereport(LOG,
1036 (errmsg("watchdog lifecheck trusted server \"%s\" added for the availability check",token)));
1037 }
1038 pfree(tmpString);
1039 MemoryContextSwitchTo(oldCxt);
1040 }
1041
wd_ping_all_server(void)1042 static bool wd_ping_all_server(void)
1043 {
1044 ListCell *lc;
1045 pid_t pid;
1046 int status;
1047 int ping_process = 0;
1048
1049 POOL_SETMASK(&BlockSig);
1050
1051 foreach(lc, g_trusted_server_list)
1052 {
1053 WdUpstreamConnectionData* server = (WdUpstreamConnectionData*)lfirst(lc);
1054 if (server->pid <= 0)
1055 server->pid = wd_issue_ping_command(server->hostname,&server->outputfd);
1056
1057 if (server->pid > 0)
1058 ping_process++;
1059 }
1060
1061 while(ping_process > 0)
1062 {
1063 pid = waitpid(0, &status, 0);
1064 if (pid > 0)
1065 {
1066 /* find the server object associated with this pid */
1067 WdUpstreamConnectionData* server = wd_get_server_from_pid(pid);
1068 if (server)
1069 {
1070 ping_process--;
1071 server->reachable = wd_get_ping_result(server->hostname, status, server->outputfd);
1072 server->pid = 0;
1073 close(server->outputfd);
1074 if (server->reachable)
1075 {
1076 /* one reachable server is all we need */
1077 POOL_SETMASK(&UnBlockSig);
1078 return true;
1079 }
1080 }
1081 else
1082 {
1083 /* It was not a ping host child process */
1084 wd_reaper_lifecheck(pid,status);
1085 }
1086 }
1087 if (pid == -1) /* wait pid error */
1088 {
1089 if (errno == EINTR)
1090 continue;
1091 ereport(WARNING,
1092 (errmsg("failed to check the ping status of trusted servers"),
1093 errdetail("waitpid failed with reason: %s", strerror(errno))));
1094 break;
1095 }
1096 }
1097 POOL_SETMASK(&UnBlockSig);
1098 return false;
1099 }
1100
wd_get_server_from_pid(pid_t pid)1101 static WdUpstreamConnectionData* wd_get_server_from_pid(pid_t pid)
1102 {
1103 ListCell *lc;
1104
1105 foreach(lc, g_trusted_server_list)
1106 {
1107 WdUpstreamConnectionData* server = (WdUpstreamConnectionData*)lfirst(lc);
1108 if (server->pid == pid)
1109 {
1110 return server;
1111 }
1112 }
1113 return NULL;
1114 }
1115
1116
1117