1 /*
2  * $Header$
3  *
4  * Handles watchdog connection, and protocol communication with pgpool-II
5  *
6  * pgpool: a language independent connection pool server for PostgreSQL
7  * written by Tatsuo Ishii
8  *
9  * Copyright (c) 2003-2016	PgPool Global Development Group
10  *
11  * Permission to use, copy, modify, and distribute this software and
12  * its documentation for any purpose and without fee is hereby
13  * granted, provided that the above copyright notice appear in all
14  * copies and that both that copyright notice and this permission
15  * notice appear in supporting documentation, and that the name of the
16  * author not be used in advertising or publicity pertaining to
17  * distribution of the software without specific, written prior
18  * permission. The author makes no representations about the
19  * suitability of this software for any purpose.  It is provided "as
20  * is" without express or implied warranty.
21  *
22  */
23 #include <pthread.h>
24 #include <stdio.h>
25 #include <errno.h>
26 #include <ctype.h>
27 #include <time.h>
28 #include <string.h>
29 #include <stdlib.h>
30 #include <unistd.h>
31 #include <netdb.h>
32 #include <sys/wait.h>
33 
34 #include "pool.h"
35 #include "pool_config.h"
36 #include "utils/elog.h"
37 #include "utils/json.h"
38 #include "utils/json_writer.h"
39 #include "utils/elog.h"
40 #include "utils/palloc.h"
41 #include "utils/memutils.h"
42 
43 #include "watchdog/wd_utils.h"
44 #include "watchdog/wd_lifecheck.h"
45 #include "watchdog/wd_ipc_defines.h"
46 #include "watchdog/wd_ipc_commands.h"
47 #include "watchdog/wd_json_data.h"
48 
49 #include "libpq-fe.h"
50 
#define LIFECHECK_GETNODE_WAIT_SEC_COUNT 5 /* max number of seconds the lifecheck process
											* should wait before giving up
											* while fetching the configured watchdog node
											* information from the watchdog process through the IPC channel
											*/
56 
/*
 * Thread argument for the query based lifecheck of pgpool.
 * One instance is passed to each thread_ping_pgpool() thread.
 */
typedef struct {
	LifeCheckNode *lifeCheckNode;	/* node the thread has to ping */
	int retry;		/* retry times (not used?) -- never read or written in this file */
} WdPgpoolThreadArg;
64 
/*
 * State kept for each trusted (upstream) server that is pinged by
 * wd_ping_all_server().
 */
typedef struct WdUpstreamConnectionData
{
	char*		hostname;	/* host name of server */
	pid_t		pid;		/* pid of ping process, reset to 0 once the process is reaped */
	bool		reachable;	/* true if last ping was successful */
	int			outputfd;	/* pipe fd linked to output of ping process */
}WdUpstreamConnectionData;
72 
73 
/* List of WdUpstreamConnectionData, filled by wd_initialize_trusted_servers_list() */
List* g_trusted_server_list = NIL;

/* trusted server (upstream connection) checks */
static void wd_initialize_trusted_servers_list(void);
static bool wd_ping_all_server(void);
static WdUpstreamConnectionData* wd_get_server_from_pid(pid_t pid);

/* query based lifecheck helpers */
static void * thread_ping_pgpool(void * arg);
static PGconn * create_conn(char * hostname, int port);

/* lifecheck process main routine and status checks */
static pid_t lifecheck_main(void);
static void check_pgpool_status(void);
static void check_pgpool_status_by_query(void);
static void check_pgpool_status_by_hb(void);
static int ping_pgpool(PGconn * conn);
static int is_parent_alive(void);
static bool fetch_watchdog_nodes_data(void);
static int wd_check_heartbeat(LifeCheckNode* node);

static void load_watchdog_nodes_from_json(char* json_data, int len);
static void spawn_lifecheck_children(void);

/* signal handling and child process management */
static RETSIGTYPE lifecheck_exit_handler(int sig);
static RETSIGTYPE reap_handler(int sig);
static pid_t wd_reaper_lifecheck(pid_t pid, int status);

static bool lifecheck_kill_all_children(int sig);
static const char* lifecheck_child_name(pid_t pid);
static void reaper(void);
static int is_wd_lifecheck_ready(void);
static int wd_lifecheck(void);
static int wd_ping_pgpool(LifeCheckNode* node);
static pid_t fork_lifecheck_child(void);
106 
107 
/* Nodes to monitor; allocated by load_watchdog_nodes_from_json() */
LifeCheckCluster* gslifeCheckCluster = NULL; /* lives in shared memory */

/* Both arrays hold pool_config->num_hb_if entries, see spawn_lifecheck_children() */
pid_t *g_hb_receiver_pid = NULL; /* Array of heart beat receiver child pids */
pid_t *g_hb_sender_pid = NULL;   /* Array of heart beat sender child pids */
/* Set to 1 by reap_handler() on SIGCHLD, cleared by reaper() */
static volatile sig_atomic_t sigchld_request = 0;
113 
114 
/*
 * handle SIGCHLD
 *
 * The handler only raises the sigchld_request flag. The terminated
 * children are collected later by reaper(), which the main loop of
 * lifecheck_main() calls when it finds the flag set.
 */
static RETSIGTYPE reap_handler(int sig)
{
	POOL_SETMASK(&BlockSig);
	sigchld_request = 1;
	POOL_SETMASK(&UnBlockSig);
}
124 
125 
lifecheck_child_name(pid_t pid)126 static const char* lifecheck_child_name(pid_t pid)
127 {
128 	int i;
129 	for (i = 0; i < pool_config->num_hb_if; i++)
130 	{
131 		if (g_hb_receiver_pid && pid == g_hb_receiver_pid[i])
132 			return "heartBeat receiver";
133 		else if (g_hb_sender_pid && pid == g_hb_sender_pid[i])
134 			return "heartBeat sender";
135 	}
136 	/* Check if it was a ping to trusted server process */
137 	WdUpstreamConnectionData* server = wd_get_server_from_pid(pid);
138 	if (server)
139 		return "trusted host ping process";
140 	return "unknown";
141 }
142 
reaper(void)143 static void reaper(void)
144 {
145 	pid_t pid;
146 	int status;
147 
148 	ereport(DEBUG1,
149 			(errmsg("Lifecheck child reaper handler")));
150 
151 	/* clear SIGCHLD request */
152 	sigchld_request = 0;
153 
154 	while ((pid = pool_waitpid(&status)) > 0)
155 	{
156 		/* First check if it is the trusted server ping process */
157 		WdUpstreamConnectionData* server = wd_get_server_from_pid(pid);
158 		if (server)
159 		{
160 			server->reachable = wd_get_ping_result(server->hostname, status, server->outputfd);
161 			server->pid = 0;
162 			close(server->outputfd);
163 		}
164 		else
165 			wd_reaper_lifecheck(pid,status);
166 	}
167 }
168 
169 static pid_t
wd_reaper_lifecheck(pid_t pid,int status)170 wd_reaper_lifecheck(pid_t pid, int status)
171 {
172 	int i;
173 	bool restart_child = true;
174 	const char* proc_name = lifecheck_child_name(pid);
175 
176 	if(WIFEXITED(status))
177 	{
178 		if(WEXITSTATUS(status) == POOL_EXIT_FATAL)
179 			ereport(LOG,
180 					(errmsg("lifecheck child process (%s) with pid: %d exit with FATAL ERROR.",proc_name, pid)));
181 		else if(WEXITSTATUS(status) == POOL_EXIT_NO_RESTART)
182 		{
183 			restart_child = false;
184 			ereport(LOG,
185 					(errmsg("lifecheck child process (%s) with pid: %d exit with SUCCESS.", lifecheck_child_name(pid), pid)));
186 		}
187 	}
188 	if (WIFSIGNALED(status))
189 	{
190 		/* Child terminated by segmentation fault. Report it */
191 		if(WTERMSIG(status) == SIGSEGV)
192 			ereport(WARNING,
193 					(errmsg("lifecheck child process (%s) with pid: %d was terminated by segmentation fault",proc_name,pid)));
194 		else
195 			ereport(LOG,
196 					(errmsg("lifecheck child process (%s) with pid: %d exits with status %d by signal %d", proc_name, pid, status, WTERMSIG(status))));
197 	}
198 	else
199 		ereport(LOG,
200 				(errmsg("lifecheck child process (%s) with pid: %d exits with status %d",proc_name, pid, status)));
201 
202 	if (g_hb_receiver_pid == NULL && g_hb_sender_pid == NULL)
203 		return -1;
204 
205 	for (i = 0; i < pool_config->num_hb_if; i++)
206 	{
207 		if (g_hb_receiver_pid && pid == g_hb_receiver_pid[i])
208 		{
209 			if(restart_child)
210 			{
211 				g_hb_receiver_pid[i] = wd_hb_receiver(1, &(pool_config->hb_if[i]));
212 				ereport(LOG,
213 						(errmsg("fork a new %s process with pid: %d",proc_name, g_hb_receiver_pid[i])));
214 			}
215 			else
216 				g_hb_receiver_pid[i] = 0;
217 
218 			return g_hb_receiver_pid[i];
219 		}
220 
221 		else if (g_hb_sender_pid && pid == g_hb_sender_pid[i])
222 		{
223 			if(restart_child)
224 			{
225 				g_hb_sender_pid[i] = wd_hb_sender(1, &(pool_config->hb_if[i]));
226 				ereport(LOG,
227 						(errmsg("fork a new %s process with pid: %d",proc_name, g_hb_sender_pid[i])));
228 			}
229 			else
230 				g_hb_sender_pid[i] = 0;
231 
232 			return g_hb_sender_pid[i];
233 		}
234 	}
235 	return -1;
236 }
237 
lifecheck_kill_all_children(int sig)238 static bool lifecheck_kill_all_children(int sig)
239 {
240 	bool ret = false;
241 	if (pool_config->wd_lifecheck_method == LIFECHECK_BY_HB
242 		&& g_hb_receiver_pid && g_hb_sender_pid)
243 	{
244 		int i;
245 		for (i = 0; i < pool_config->num_hb_if; i++)
246 		{
247 			pid_t pid_child = g_hb_receiver_pid[i];
248 
249 			if (pid_child > 0)
250 			{
251 				kill(pid_child,sig);
252 				ret = true;
253 			}
254 
255 			pid_child = g_hb_sender_pid[i];
256 
257 			if (pid_child > 0)
258 			{
259 				kill(pid_child,sig);
260 				ret = true;
261 			}
262 		}
263 	}
264 	return ret;
265 }
266 
267 static RETSIGTYPE
lifecheck_exit_handler(int sig)268 lifecheck_exit_handler(int sig)
269 {
270 	pid_t wpid;
271 	bool child_killed;
272 
273 	POOL_SETMASK(&AuthBlockSig);
274 	ereport(DEBUG1,
275 			(errmsg("lifecheck child receives shutdown request signal %d, forwarding to all children", sig)));
276 
277 	if (sig == SIGTERM) /* smart shutdown */
278 	{
279 		ereport(DEBUG1,
280 				(errmsg("lifecheck child receives smart shutdown request")));
281 	}
282 	else if (sig == SIGINT)
283 	{
284 		ereport(DEBUG1,
285 				(errmsg("lifecheck child receives fast shutdown request")));
286 	}
287 	else if (sig == SIGQUIT)
288 	{
289 		ereport(DEBUG1,
290 				(errmsg("lifecheck child receives immediate shutdown request")));
291 	}
292 
293 	child_killed = lifecheck_kill_all_children(sig);
294 
295 	POOL_SETMASK(&UnBlockSig);
296 
297 	if (child_killed)
298 	{
299 		do
300 		{
301 			wpid = wait(NULL);
302 		}while (wpid > 0 || (wpid == -1 && errno == EINTR));
303 
304 		if (wpid == -1 && errno != ECHILD)
305 			ereport(WARNING,
306 					(errmsg("wait() on lifecheck children failed. reason:%s", strerror(errno))));
307 
308 		if (g_hb_receiver_pid)
309 			pfree(g_hb_receiver_pid);
310 
311 		if (g_hb_sender_pid)
312 			pfree(g_hb_sender_pid);
313 
314 		g_hb_receiver_pid = NULL;
315 		g_hb_sender_pid = NULL;
316 	}
317 	exit(0);
318 }
319 
initialize_watchdog_lifecheck(void)320 pid_t initialize_watchdog_lifecheck(void)
321 {
322 	if (!pool_config->use_watchdog)
323 		return 0;
324 
325 	if (pool_config->wd_lifecheck_method == LIFECHECK_BY_EXTERNAL)
326 		return 0;
327 
328 	return fork_lifecheck_child();
329 }
330 
331 /*
332  * fork a child for lifecheck
333  */
fork_lifecheck_child(void)334 static pid_t fork_lifecheck_child(void)
335 {
336 	pid_t pid;
337 
338 	pid = fork();
339 
340 	if (pid == 0)
341 	{
342 		on_exit_reset();
343 
344 		/* Set the process type variable */
345 		processType = PT_LIFECHECK;
346 
347 		/* call lifecheck child main */
348 		POOL_SETMASK(&UnBlockSig);
349 		lifecheck_main();
350 	}
351 	else if (pid == -1)
352 	{
353 		ereport(FATAL,
354 				(errmsg("fork() failed. reason: %s", strerror(errno))));
355 	}
356 
357 	return pid;
358 }
359 
360 
/*
 * main entry point of watchdog lifecheck process
 *
 * Installs the signal handlers, fetches the list of nodes to monitor
 * from the watchdog process, starts the heartbeat children, builds the
 * trusted server list and then runs the lifecheck loop.
 *
 * The loop has no exit condition, so the trailing "return 0" is never
 * reached.
 */
static pid_t
lifecheck_main(void)
{
	sigjmp_buf	local_sigjmp_buf;
	int i;

	ereport(DEBUG1,
			(errmsg("I am watchdog lifecheck child with pid:%d",getpid())));

	/* Identify myself via ps */
	init_ps_display("", "", "", "");

	/* shutdown requests are forwarded to the children by lifecheck_exit_handler */
	pool_signal(SIGTERM, lifecheck_exit_handler);
	pool_signal(SIGINT, lifecheck_exit_handler);
	pool_signal(SIGQUIT, lifecheck_exit_handler);
	pool_signal(SIGCHLD, reap_handler);
	pool_signal(SIGHUP, SIG_IGN);
	pool_signal(SIGPIPE, SIG_IGN);

	/* Create per loop iteration memory context */
	ProcessLoopContext = AllocSetContextCreate(TopMemoryContext,
											   "wd_lifecheck_main_loop",
											   ALLOCSET_DEFAULT_MINSIZE,
											   ALLOCSET_DEFAULT_INITSIZE,
											   ALLOCSET_DEFAULT_MAXSIZE);

	MemoryContextSwitchTo(TopMemoryContext);

	set_ps_display("lifecheck",false);

	/* Get the list of watchdog node to monitor
	 * from watchdog process.
	 * At most LIFECHECK_GETNODE_WAIT_SEC_COUNT attempts are made,
	 * one second apart.
	 */
	for (i =0; i < LIFECHECK_GETNODE_WAIT_SEC_COUNT; i++)
	{
		if (fetch_watchdog_nodes_data() == true)
			break;
		sleep(1);
	}

	/* gslifeCheckCluster is only set once the node list was loaded */
	if (!gslifeCheckCluster)
		ereport(ERROR,
				(errmsg("unable to initialize lifecheck, watchdog not responding")));

	spawn_lifecheck_children();

	wd_initialize_trusted_servers_list();

	/* wait until ready to go */
	while (WD_OK != is_wd_lifecheck_ready())
	{
		sleep(pool_config->wd_interval * 10);
	}

	ereport(LOG,
			(errmsg("watchdog: lifecheck started")));

	/*
	 * Recovery point for ereport(ERROR) raised inside the loop below:
	 * the error is reported and cleared, and after a pause the
	 * execution falls through into the loop again.
	 */
	if (sigsetjmp(local_sigjmp_buf, 1) != 0)
	{
		/* Since not using PG_TRY, must reset error stack by hand */
		error_context_stack = NULL;

		EmitErrorReport();
		MemoryContextSwitchTo(TopMemoryContext);
		FlushErrorState();
		sleep(pool_config->wd_heartbeat_keepalive);
	}

	/* We can now handle ereport(ERROR) */
	PG_exception_stack = &local_sigjmp_buf;

	/* watchdog loop */
	for (;;)
	{
		/* release everything allocated by the previous iteration */
		MemoryContextSwitchTo(ProcessLoopContext);
		MemoryContextResetAndDeleteChildren(ProcessLoopContext);

		/* collect the children reported by the SIGCHLD handler */
		if (sigchld_request)
			reaper();

		/* pgpool life check */
		wd_lifecheck();
		sleep(pool_config->wd_interval);
	}

	return 0;
}
449 
spawn_lifecheck_children(void)450 static void spawn_lifecheck_children(void)
451 {
452 	if (pool_config->wd_lifecheck_method == LIFECHECK_BY_HB)
453 	{
454 		int i;
455 		g_hb_receiver_pid = palloc0(sizeof(pid_t) * pool_config->num_hb_if);
456 		g_hb_sender_pid = palloc0(sizeof(pid_t) * pool_config->num_hb_if);
457 
458 		for (i = 0; i < pool_config->num_hb_if; i++)
459 		{
460 			/* heartbeat receiver process */
461 			g_hb_receiver_pid[i] = wd_hb_receiver(1, &(pool_config->hb_if[i]));
462 
463 			/* heartbeat sender process */
464 			g_hb_sender_pid[i] = wd_hb_sender(1, &(pool_config->hb_if[i]));
465 		}
466 	}
467 }
468 
print_lifecheck_cluster(void)469 static void print_lifecheck_cluster(void)
470 {
471 	int i;
472 	if (!gslifeCheckCluster)
473 		return;
474 	ereport(LOG,
475 			(errmsg("%d watchdog nodes are configured for lifecheck",gslifeCheckCluster->nodeCount)));
476 	for (i = 0; i< gslifeCheckCluster->nodeCount; i++)
477 	{
478 		ereport(LOG,
479 				(errmsg("watchdog nodes ID:%d Name:\"%s\"",gslifeCheckCluster->lifeCheckNodes[i].ID,gslifeCheckCluster->lifeCheckNodes[i].nodeName),
480 				 errdetail("Host:\"%s\" WD Port:%d pgpool-II port:%d",
481 						   gslifeCheckCluster->lifeCheckNodes[i].hostName,
482 						   gslifeCheckCluster->lifeCheckNodes[i].wdPort,
483 						   gslifeCheckCluster->lifeCheckNodes[i].pgpoolPort)));
484 	}
485 }
486 
/*
 * Tell the watchdog process that the state of node has changed.
 *
 * Only the NODE_DEAD and NODE_ALIVE states are reported; false is
 * returned for any other state. The command is issued up to
 * MAX_SEC_WAIT_FOR_CLUSTER_TRANSATION times, one second apart, until
 * the watchdog returns a result.
 *
 * Returns true if a result was received from the watchdog.
 */
static bool inform_node_status(LifeCheckNode* node, char *message)
{
	WDIPCCmdResult* reply = NULL;
	char* status_text;
	char* payload;
	int status_code;
	int attempt;

	if (node->nodeState == NODE_ALIVE)
	{
		status_text = "NODE ALIVE";
		status_code = WD_LIFECHECK_NODE_STATUS_ALIVE;
	}
	else if (node->nodeState == NODE_DEAD)
	{
		status_text = "NODE DEAD";
		status_code = WD_LIFECHECK_NODE_STATUS_DEAD;
	}
	else
	{
		return false;
	}

	ereport(LOG,
			(errmsg("informing the node status change to watchdog"),
				 errdetail("node id :%d status = \"%s\" message:\"%s\"",node->ID,status_text,message)));

	payload = get_lifecheck_node_status_change_json(node->ID, status_code, message, pool_config->wd_authkey);
	if (payload == NULL)
		return false;

	attempt = 0;
	while (attempt < MAX_SEC_WAIT_FOR_CLUSTER_TRANSATION)
	{
		reply = issue_command_to_watchdog(WD_NODE_STATUS_CHANGE_COMMAND ,0, payload, strlen(payload),false);
		if (reply != NULL)
			break;
		sleep(1);
		attempt++;
	}
	pfree(payload);

	if (reply == NULL)
		return false;

	pfree(reply);
	return true;
}
530 
fetch_watchdog_nodes_data(void)531 static bool fetch_watchdog_nodes_data(void)
532 {
533 	char* json_data = wd_get_watchdog_nodes(-1);
534 	if (json_data == NULL)
535 	{
536 		ereport(ERROR,
537 				(errmsg("get node list command reply contains no data")));
538 		return false;
539 	}
540 	load_watchdog_nodes_from_json(json_data,strlen(json_data));
541 	pfree(json_data);
542 	return true;
543 }
544 
load_watchdog_nodes_from_json(char * json_data,int len)545 static void load_watchdog_nodes_from_json(char* json_data, int len)
546 {
547 	json_value* root;
548 	json_value* value;
549 	int i,nodeCount;
550 
551 	root = json_parse(json_data,len);
552 
553 	/* The root node must be object */
554 	if (root == NULL || root->type != json_object)
555 	{
556 		json_value_free(root);
557 		ereport(ERROR,
558 			(errmsg("unable to parse json data for node list")));
559 	}
560 
561 	if (json_get_int_value_for_key(root, "NodeCount", &nodeCount))
562 	{
563 		json_value_free(root);
564 		ereport(ERROR,
565 			(errmsg("invalid json data"),
566 				 errdetail("unable to find NodeCount node from data")));
567 	}
568 
569 	/* find the WatchdogNodes array */
570 	value = json_get_value_for_key(root,"WatchdogNodes");
571 	if (value == NULL)
572 	{
573 		json_value_free(root);
574 		ereport(ERROR,
575 				(errmsg("invalid json data"),
576 				 errdetail("unable to find WatchdogNodes node from data")));
577 	}
578 	if (value->type != json_array)
579 	{
580 		json_value_free(root);
581 		ereport(ERROR,
582 			(errmsg("invalid json data"),
583 				 errdetail("WatchdogNodes node does not contains Array")));
584 	}
585 	if (nodeCount != value->u.array.length)
586 	{
587 		json_value_free(root);
588 		ereport(ERROR,
589 			(errmsg("invalid json data"),
590 				 errdetail("WatchdogNodes array contains %d nodes while expecting %d",value->u.array.length, nodeCount)));
591 	}
592 
593 	/* okay we are done, put this in shared memory */
594 	gslifeCheckCluster = pool_shared_memory_create(sizeof(LifeCheckCluster));
595 	gslifeCheckCluster->nodeCount = nodeCount;
596 	gslifeCheckCluster->lifeCheckNodes = pool_shared_memory_create(sizeof(LifeCheckNode) * gslifeCheckCluster->nodeCount);
597 	for (i = 0; i < nodeCount; i++)
598 	{
599 		WDNodeInfo *nodeInfo = get_WDNodeInfo_from_wd_node_json(value->u.array.values[i]);
600 
601 		gslifeCheckCluster->lifeCheckNodes[i].nodeState = NODE_EMPTY;
602 		gslifeCheckCluster->lifeCheckNodes[i].ID = nodeInfo->id;
603 		strcpy(gslifeCheckCluster->lifeCheckNodes[i].hostName, nodeInfo->hostName);
604 		strcpy(gslifeCheckCluster->lifeCheckNodes[i].nodeName, nodeInfo->nodeName);
605 		gslifeCheckCluster->lifeCheckNodes[i].wdPort = nodeInfo->wd_port;
606 		gslifeCheckCluster->lifeCheckNodes[i].pgpoolPort = nodeInfo->pgpool_port;
607 		gslifeCheckCluster->lifeCheckNodes[i].retry_lives = pool_config->wd_life_point;
608 		pfree(nodeInfo);
609 	}
610 	print_lifecheck_cluster();
611 	json_value_free(root);
612 }
613 
614 
is_wd_lifecheck_ready(void)615 static int is_wd_lifecheck_ready(void)
616 {
617 	int rtn = WD_OK;
618 	int i;
619 
620 	for (i = 0; i< gslifeCheckCluster->nodeCount; i++)
621 	{
622 		LifeCheckNode* node = &gslifeCheckCluster->lifeCheckNodes[i];
623 		/* query mode */
624 		if (pool_config->wd_lifecheck_method == LIFECHECK_BY_QUERY)
625 		{
626 			if (wd_ping_pgpool(node) == WD_NG)
627 			{
628 				ereport(DEBUG1,
629 					(errmsg("watchdog checking life check is ready"),
630 						errdetail("pgpool:%d at \"%s:%d\" has not started yet",
631 							   i, node->hostName, node->pgpoolPort)));
632 				rtn = WD_NG;
633 			}
634 		}
635 		/* heartbeat mode */
636 		else if (pool_config->wd_lifecheck_method == LIFECHECK_BY_HB)
637 		{
638 			if (node->ID == 0) /* local node */
639 				continue;
640 
641 			if (!WD_TIME_ISSET(node->hb_last_recv_time) ||
642 			    !WD_TIME_ISSET(node->hb_send_time))
643 			{
644 				ereport(DEBUG1,
645 					(errmsg("watchdog checking life check is ready"),
646 						errdetail("pgpool:%d at \"%s:%d\" has not send the heartbeat signal yet",
647 							   i, node->hostName, node->pgpoolPort)));
648 				rtn = WD_NG;
649 			}
650 		}
651 		/* otherwise */
652 		else
653 		{
654 			ereport(ERROR,
655 				(errmsg("checking if watchdog is ready, unkown watchdog mode \"%d\"",
656 							pool_config->wd_lifecheck_method)));
657 		}
658 	}
659 
660 	return rtn;
661 }
662 
663 /*
664  * Check if pgpool is living
665  */
wd_lifecheck(void)666 static int wd_lifecheck(void)
667 {
668 	/* check upper connection */
669 	if (strlen(pool_config->trusted_servers))
670 	{
671 		if(wd_ping_all_server() == false)
672 		{
673 			LifeCheckNode* node = &gslifeCheckCluster->lifeCheckNodes[0];
674 
675 			if (node->nodeState != NODE_DEAD)
676 			{
677 				node->nodeState = NODE_DEAD;
678 				ereport(WARNING,
679 						(errmsg("watchdog lifecheck, failed to connect to any trusted servers")));
680 
681 				inform_node_status(node,"trusted server is unreachable");
682 			}
683 			return WD_NG;
684 		}
685 	}
686 
687 	/* skip lifecheck during recovery execution */
688 	if (*InRecovery != RECOVERY_INIT)
689 	{
690 		return WD_OK;
691 	}
692 
693 	/* check and update pgpool status */
694 	check_pgpool_status();
695 
696 	return WD_OK;
697 }
698 
699 /*
700  * check and update pgpool status
701  */
702 static void
check_pgpool_status()703 check_pgpool_status()
704 {
705 	/* query mode */
706 	if (pool_config->wd_lifecheck_method == LIFECHECK_BY_QUERY)
707 	{
708 		check_pgpool_status_by_query();
709 	}
710 	/* heartbeat mode */
711 	else if (pool_config->wd_lifecheck_method == LIFECHECK_BY_HB)
712 	{
713 		check_pgpool_status_by_hb();
714 	}
715 }
716 
717 static void
check_pgpool_status_by_hb(void)718 check_pgpool_status_by_hb(void)
719 {
720 	int i;
721 	struct timeval tv;
722 	LifeCheckNode* node = &gslifeCheckCluster->lifeCheckNodes[0];
723 	gettimeofday(&tv, NULL);
724 
725 	/* about myself */
726 	/* parent is dead so it's orphan.... */
727 	if (is_parent_alive() == WD_NG && node->nodeState != NODE_DEAD)
728 	{
729 		node->nodeState = NODE_DEAD;
730 		ereport(LOG,
731 				(errmsg("checking pgpool status by heartbeat"),
732 					errdetail("lifecheck failed. pgpool (%s:%d) seems not to be working",
733 						   node->hostName, node->pgpoolPort)));
734 
735 		inform_node_status(node,"parent process is dead");
736 
737 	}
738 
739 
740 	for (i = 1; i< gslifeCheckCluster->nodeCount; i++)
741 	{
742 		node = &gslifeCheckCluster->lifeCheckNodes[i];
743 		ereport(DEBUG1,
744 			(errmsg("watchdog life checking by heartbeat"),
745 				errdetail("checking pgpool %d (%s:%d)",
746 					   i, node->hostName, node->pgpoolPort)));
747 
748 		if (wd_check_heartbeat(node) == WD_NG)
749 		{
750 			ereport(DEBUG2,
751 				(errmsg("checking pgpool status by heartbeat"),
752 					 errdetail("lifecheck failed. pgpool: %d at \"%s:%d\" seems not to be working",
753 							   i, node->hostName, node->pgpoolPort)));
754 
755 			if (node->nodeState != NODE_DEAD)
756 			{
757 				node->nodeState = NODE_DEAD;
758 				inform_node_status(node,"No heartbeat signal from node");
759 			}
760 		}
761 		else
762 		{
763 			ereport(DEBUG1,
764 				(errmsg("checking pgpool status by heartbeat"),
765 					 errdetail("OK; status OK")));
766 		}
767 	}
768 }
769 
770 static void
check_pgpool_status_by_query(void)771 check_pgpool_status_by_query(void)
772 {
773 	pthread_attr_t attr;
774 	pthread_t thread[MAX_WATCHDOG_NUM];
775 	WdPgpoolThreadArg thread_arg[MAX_WATCHDOG_NUM];
776 	LifeCheckNode* node;
777 	int rc,i;
778 
779 	/* thread init */
780 	pthread_attr_init(&attr);
781 	pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
782 
783 	/* send queries to all pgpools using threads */
784 	for (i = 0; i< gslifeCheckCluster->nodeCount; i++)
785 	{
786 		node = &gslifeCheckCluster->lifeCheckNodes[i];
787 		thread_arg[i].lifeCheckNode = node;
788 		rc = watchdog_thread_create(&thread[i], &attr, thread_ping_pgpool, (void*)&thread_arg[i]);
789 		if (rc)
790 		{
791 			ereport(WARNING,
792 					(errmsg("failed to create thread for checking pgpool status by query for  %d (%s:%d)",
793 							i, node->hostName, node->pgpoolPort),
794 					 errdetail("pthread_create failed with error code %d: %s",rc, strerror(rc))));
795 		}
796 	}
797 
798 	pthread_attr_destroy(&attr);
799 
800 	/* check results of queries */
801 	for (i = 0; i< gslifeCheckCluster->nodeCount; i++)
802 	{
803 		int result;
804 		node = &gslifeCheckCluster->lifeCheckNodes[i];
805 
806 		ereport(DEBUG1,
807 				(errmsg("checking pgpool status by query"),
808 					errdetail("checking pgpool %d (%s:%d)",
809 						   i, node->hostName, node->pgpoolPort)));
810 
811 		rc = pthread_join(thread[i], (void **)&result);
812 		if ((rc != 0) && (errno == EINTR))
813 		{
814 			usleep(100);
815 			continue;
816 		}
817 
818 		if (result == WD_OK)
819 		{
820 			ereport(DEBUG1,
821 				(errmsg("checking pgpool status by query"),
822 					 errdetail("WD_OK: status: %d", node->nodeState)));
823 
824 			/* life point init */
825 			node->retry_lives = pool_config->wd_life_point;
826 		}
827 		else
828 		{
829 			ereport(DEBUG1,
830 				(errmsg("checking pgpool status by query"),
831 					 errdetail("NG; status: %d life:%d", node->nodeState, node->retry_lives)));
832 			if (node->retry_lives > 0)
833 			{
834 				node->retry_lives --;
835 			}
836 
837 			/* pgpool goes down */
838 			if (node->retry_lives <= 0)
839 			{
840 				ereport(LOG,
841 					(errmsg("checking pgpool status by query"),
842 						errdetail("lifecheck failed %d times. pgpool %d (%s:%d) seems not to be working",
843 								   pool_config->wd_life_point, i, node->hostName, node->pgpoolPort)));
844 
845 				if (node->nodeState == NODE_DEAD)
846 					continue;
847 				node->nodeState = NODE_DEAD;
848 				/* It's me! */
849 				if (i == 0)
850 					inform_node_status(node,"parent process is dead");
851 				else
852 					inform_node_status(node,"unable to connect to node");
853 			}
854 		}
855 	}
856 }
857 
858 /*
859  * Thread function to send lifecheck query to pgpool
860  * Used in wd_lifecheck.
861  */
862 static void *
thread_ping_pgpool(void * arg)863 thread_ping_pgpool(void * arg)
864 {
865 	uintptr_t rtn;
866 	WdPgpoolThreadArg * thread_arg = (WdPgpoolThreadArg *)arg;
867 	rtn = (uintptr_t)wd_ping_pgpool(thread_arg->lifeCheckNode);
868 
869 	pthread_exit((void *)rtn);
870 }
871 
872 /*
873  * Create connection to pgpool
874  */
875 static PGconn *
create_conn(char * hostname,int port)876 create_conn(char * hostname, int port)
877 {
878 	static char conninfo[1024];
879 	PGconn *conn;
880 
881 	if (strlen(pool_config->wd_lifecheck_dbname) == 0)
882 	{
883 		ereport(WARNING,
884 				(errmsg("watchdog life checking, wd_lifecheck_dbname is empty")));
885 		return NULL;
886 	}
887 
888 	if (strlen(pool_config->wd_lifecheck_user) == 0)
889 	{
890 		ereport(WARNING,
891 				(errmsg("watchdog life checking, wd_lifecheck_user is empty")));
892 		return NULL;
893 	}
894 
895 	snprintf(conninfo,sizeof(conninfo),
896 		"host='%s' port='%d' dbname='%s' user='%s' password='%s' connect_timeout='%d'",
897 		hostname,
898 		port,
899 		pool_config->wd_lifecheck_dbname,
900 		pool_config->wd_lifecheck_user,
901 		pool_config->wd_lifecheck_password,
902 		pool_config->wd_interval / 2 + 1);
903 	conn = PQconnectdb(conninfo);
904 
905 	if (PQstatus(conn) != CONNECTION_OK)
906 	{
907 		ereport(DEBUG1,
908 			(errmsg("watchdog life checking"),
909 				 errdetail("Connection to database failed: %s", PQerrorMessage(conn))));
910 		PQfinish(conn);
911 		return NULL;
912 	}
913 	return conn;
914 }
915 
916 
917 /*
918  * Check if pgpool is alive using heartbeat signal.
919  */
920 static int
wd_check_heartbeat(LifeCheckNode * node)921 wd_check_heartbeat(LifeCheckNode* node)
922 {
923 	int interval;
924 	struct timeval tv;
925 
926 	if (!WD_TIME_ISSET(node->hb_last_recv_time) ||
927 	    !WD_TIME_ISSET(node->hb_send_time))
928 	{
929 		ereport(DEBUG1,
930 			(errmsg("watchdog checking if pgpool is alive using heartbeat"),
931 				errdetail("pgpool (%s:%d) was restarted and has not send the heartbeat signal yet",
932 					   node->hostName, node->pgpoolPort)));
933 		return WD_OK;
934 	}
935 
936 	gettimeofday(&tv, NULL);
937 
938 	interval = WD_TIME_DIFF_SEC(tv, node->hb_last_recv_time);
939 	ereport(DEBUG1,
940 		(errmsg("watchdog checking if pgpool is alive using heartbeat"),
941 			errdetail("the last heartbeat from \"%s:%d\" received %d seconds ago",
942 				   node->hostName, node->pgpoolPort, interval)));
943 
944 	if (interval > pool_config->wd_heartbeat_deadtime)
945 	{
946 		return WD_NG;
947 	}
948 
949 	if (node->nodeState == NODE_DEAD)
950 	{
951 		node->nodeState = NODE_ALIVE;
952 		inform_node_status(node,"Heartbeat signal found");
953 	}
954 	return WD_OK;
955 }
956 
957 /*
958  * Check if pgpool can accept the lifecheck query.
959  */
960 static int
wd_ping_pgpool(LifeCheckNode * node)961 wd_ping_pgpool(LifeCheckNode* node)
962 {
963 	PGconn * conn;
964 
965 	conn = create_conn(node->hostName, node->pgpoolPort);
966 	if (conn == NULL)
967 		return WD_NG;
968 	return ping_pgpool(conn);
969 }
970 
971 /* inner function for issueing lifecheck query */
972 static int
ping_pgpool(PGconn * conn)973 ping_pgpool(PGconn * conn)
974 {
975 	int rtn = WD_NG;
976 	int status = PGRES_FATAL_ERROR;
977 	PGresult * res = (PGresult *)NULL;
978 
979 	if (!conn)
980 	{
981 		return WD_NG;
982 	}
983 
984 	res = PQexec(conn, pool_config->wd_lifecheck_query );
985 
986 	status = PQresultStatus(res);
987 	if (res != NULL)
988 	{
989 		PQclear(res);
990 	}
991 
992 	if ((status != PGRES_NONFATAL_ERROR ) &&
993 		(status != PGRES_FATAL_ERROR ))
994 	{
995 		rtn = WD_OK;
996 	}
997 	PQfinish(conn);
998 
999 	return rtn;
1000 }
1001 
1002 static int
is_parent_alive()1003 is_parent_alive()
1004 {
1005 	if (mypid == getppid())
1006 		return WD_OK;
1007 	else
1008 		return WD_NG;
1009 }
1010 
1011 
wd_initialize_trusted_servers_list(void)1012 static void wd_initialize_trusted_servers_list(void)
1013 {
1014 	char* token;
1015 	char* tmpString;
1016 	const char* delimi = ",";
1017 	if (g_trusted_server_list)
1018 		return;
1019 
1020 	if (strlen(pool_config->trusted_servers) <= 0)
1021 		return;
1022 
1023 	/* This has to be created in TopMemoryContext */
1024 	MemoryContext oldCxt = MemoryContextSwitchTo(TopMemoryContext);
1025 
1026 	tmpString = pstrdup(pool_config->trusted_servers);
1027 	for (token = strtok(tmpString, delimi); token != NULL; token = strtok(NULL, delimi))
1028 	{
1029 		WdUpstreamConnectionData* server = palloc(sizeof(WdUpstreamConnectionData));
1030 		server->pid = 0;
1031 		server->reachable = false;
1032 		server->hostname = pstrdup(token);
1033 		g_trusted_server_list = lappend(g_trusted_server_list,server);
1034 
1035 		ereport(LOG,
1036 			(errmsg("watchdog lifecheck trusted server \"%s\" added for the availability check",token)));
1037 	}
1038 	pfree(tmpString);
1039 	MemoryContextSwitchTo(oldCxt);
1040 }
1041 
wd_ping_all_server(void)1042 static bool wd_ping_all_server(void)
1043 {
1044 	ListCell *lc;
1045 	pid_t pid;
1046 	int status;
1047 	int ping_process = 0;
1048 
1049 	POOL_SETMASK(&BlockSig);
1050 
1051 	foreach(lc, g_trusted_server_list)
1052 	{
1053 		WdUpstreamConnectionData* server = (WdUpstreamConnectionData*)lfirst(lc);
1054 		if (server->pid <= 0)
1055 			server->pid = wd_issue_ping_command(server->hostname,&server->outputfd);
1056 
1057 		if (server->pid > 0)
1058 			ping_process++;
1059 	}
1060 
1061 	while(ping_process > 0)
1062 	{
1063 		pid = waitpid(0, &status, 0);
1064 		if (pid > 0)
1065 		{
1066 			/* find the server object associated with this pid */
1067 			WdUpstreamConnectionData* server = wd_get_server_from_pid(pid);
1068 			if (server)
1069 			{
1070 				ping_process--;
1071 				server->reachable = wd_get_ping_result(server->hostname, status, server->outputfd);
1072 				server->pid = 0;
1073 				close(server->outputfd);
1074 				if (server->reachable)
1075 				{
1076 					/* one reachable server is all we need */
1077 					POOL_SETMASK(&UnBlockSig);
1078 					return true;
1079 				}
1080 			}
1081 			else
1082 			{
1083 				/* It was not a ping host child process */
1084 				wd_reaper_lifecheck(pid,status);
1085 			}
1086 		}
1087 		if (pid == -1) /* wait pid error */
1088 		{
1089 			if (errno == EINTR)
1090 				continue;
1091 			ereport(WARNING,
1092 				(errmsg("failed to check the ping status of trusted servers"),
1093 					 errdetail("waitpid failed with reason: %s", strerror(errno))));
1094 			break;
1095 		}
1096 	}
1097 	POOL_SETMASK(&UnBlockSig);
1098 	return false;
1099 }
1100 
wd_get_server_from_pid(pid_t pid)1101 static WdUpstreamConnectionData* wd_get_server_from_pid(pid_t pid)
1102 {
1103 	ListCell *lc;
1104 
1105 	foreach(lc, g_trusted_server_list)
1106 	{
1107 		WdUpstreamConnectionData* server = (WdUpstreamConnectionData*)lfirst(lc);
1108 		if (server->pid == pid)
1109 		{
1110 			return server;
1111 		}
1112 	}
1113 	return NULL;
1114 }
1115 
1116 
1117