1 /* -*-pcp_worker.c-*- */
2 /*
3  *
4  * pgpool: a language independent connection pool server for PostgreSQL
5  * written by Tatsuo Ishii
6  *
7  * Copyright (c) 2003-2020	PgPool Global Development Group
8  *
9  * Permission to use, copy, modify, and distribute this software and
10  * its documentation for any purpose and without fee is hereby
11  * granted, provided that the above copyright notice appear in all
12  * copies and that both that copyright notice and this permission
13  * notice appear in supporting documentation, and that the name of the
14  * author not be used in advertising or publicity pertaining to
15  * distribution of the software without specific, written prior
16  * permission. The author makes no representations about the
17  * suitability of this software for any purpose.  It is provided "as
18  * is" without express or implied warranty.
19  *
20  * pcp_worker.c: PCP worker child process main
21  *
22  */
23 
24 
25 #include "config.h"
26 
27 #include <arpa/inet.h>
28 #include <signal.h>
29 
30 #include <stdio.h>
31 #include <errno.h>
32 #include <string.h>
33 #include <unistd.h>
34 #include <stdlib.h>
35 #include <sys/time.h>
36 
37 #ifdef HAVE_FCNTL_H
38 #include <fcntl.h>
39 #endif
40 
41 #include "pool.h"
42 #include "pool_config.h"
43 
44 #include "pcp/pcp_stream.h"
45 #include "pcp/pcp.h"
46 #include "pcp/pcp_worker.h"
47 #include "pcp/recovery.h"
48 #include "auth/md5.h"
49 #include "auth/pool_auth.h"
50 #include "context/pool_process_context.h"
51 #include "utils/pool_process_reporting.h"
52 #include "utils/palloc.h"
53 #include "utils/memutils.h"
54 #include "utils/ps_status.h"
55 #include "utils/elog.h"
56 #include "watchdog/wd_json_data.h"
57 #include "watchdog/wd_internal_commands.h"
58 #include "main/pool_internal_comms.h"
59 
60 #define MAX_FILE_LINE_LEN    512
61 
62 extern char *pcp_conf_file;		/* global variable defined in main.c holds the
63 								 * path for pcp.conf */
64 volatile sig_atomic_t pcp_worker_wakeup_request = 0;
65 PCP_CONNECTION *volatile pcp_frontend = NULL;
66 
67 static RETSIGTYPE die(int sig);
68 static RETSIGTYPE wakeup_handler_child(int sig);
69 
70 static void unset_nonblock(int fd);
71 static int	user_authenticate(char *buf, char *passwd_file, char *salt, int salt_len);
72 static void process_authentication(PCP_CONNECTION * frontend, char *buf, char *salt, int *random_salt);
73 static void send_md5salt(PCP_CONNECTION * frontend, char *salt);
74 
75 static void pcp_process_command(char tos, char *buf, int buf_len);
76 
77 static int	pool_detach_node(int node_id, bool gracefully);
78 static int	pool_promote_node(int node_id, bool gracefully);
79 static void inform_process_count(PCP_CONNECTION * frontend);
80 static void inform_process_info(PCP_CONNECTION * frontend, char *buf);
81 static void inform_watchdog_info(PCP_CONNECTION * frontend, char *buf);
82 static void inform_node_info(PCP_CONNECTION * frontend, char *buf);
83 static void inform_node_count(PCP_CONNECTION * frontend);
84 static void process_reload_config(PCP_CONNECTION * frontend,char scope);
85 static void inform_health_check_stats(PCP_CONNECTION *frontend, char *buf);
86 static void process_detach_node(PCP_CONNECTION * frontend, char *buf, char tos);
87 static void process_attach_node(PCP_CONNECTION * frontend, char *buf);
88 static void process_recovery_request(PCP_CONNECTION * frontend, char *buf);
89 static void process_status_request(PCP_CONNECTION * frontend);
90 static void process_promote_node(PCP_CONNECTION * frontend, char *buf, char tos);
91 static void process_shutown_request(PCP_CONNECTION * frontend, char mode, char tos);
92 static void process_set_configration_parameter(PCP_CONNECTION * frontend, char *buf, int len);
93 
94 static void pcp_worker_will_go_down(int code, Datum arg);
95 
96 static void do_pcp_flush(PCP_CONNECTION * frontend);
97 static void do_pcp_read(PCP_CONNECTION * pc, void *buf, int len);
98 
99 /*
100  * main entry pont of pcp worker child process
101  */
102 void
pcp_worker_main(int port)103 pcp_worker_main(int port)
104 {
105 	sigjmp_buf	local_sigjmp_buf;
106 	MemoryContext PCPMemoryContext;
107 	volatile int authenticated = 0;
108 
109 	char		salt[4];
110 	int			random_salt = 0;
111 	struct timeval uptime;
112 	char		tos;
113 	int			rsize;
114 
115 	ereport(DEBUG1,
116 			(errmsg("I am PCP worker child with pid:%d", getpid())));
117 
118 	/* Identify myself via ps */
119 	init_ps_display("", "", "", "");
120 
121 	gettimeofday(&uptime, NULL);
122 	srandom((unsigned int) (getpid() ^ uptime.tv_usec));
123 
124 	/* set up signal handlers */
125 	signal(SIGTERM, die);
126 	signal(SIGINT, die);
127 	signal(SIGQUIT, die);
128 	signal(SIGCHLD, SIG_DFL);
129 	signal(SIGUSR2, wakeup_handler_child);
130 	signal(SIGUSR1, SIG_IGN);
131 	signal(SIGHUP, SIG_IGN);
132 	signal(SIGPIPE, SIG_IGN);
133 	signal(SIGALRM, SIG_IGN);
134 	/* Create per loop iteration memory context */
135 	PCPMemoryContext = AllocSetContextCreate(TopMemoryContext,
136 											 "PCP_worker_main_loop",
137 											 ALLOCSET_DEFAULT_MINSIZE,
138 											 ALLOCSET_DEFAULT_INITSIZE,
139 											 ALLOCSET_DEFAULT_MAXSIZE);
140 
141 	MemoryContextSwitchTo(TopMemoryContext);
142 
143 	/*
144 	 * install the call back for preparation of pcp worker child exit
145 	 */
146 	on_system_exit(pcp_worker_will_go_down, (Datum) NULL);
147 
148 	/* Initialize my backend status */
149 	pool_initialize_private_backend_status();
150 
151 	/* Initialize process context */
152 	pool_init_process_context();
153 
154 	pcp_frontend = pcp_open(port);
155 	unset_nonblock(pcp_frontend->fd);
156 
157 	if (sigsetjmp(local_sigjmp_buf, 1) != 0)
158 	{
159 		error_context_stack = NULL;
160 		EmitErrorReport();
161 
162 		MemoryContextSwitchTo(TopMemoryContext);
163 		FlushErrorState();
164 	}
165 	/* We can now handle ereport(ERROR) */
166 	PG_exception_stack = &local_sigjmp_buf;
167 
168 	for (;;)
169 	{
170 		char	*buf = NULL;
171 
172 		MemoryContextSwitchTo(PCPMemoryContext);
173 		MemoryContextResetAndDeleteChildren(PCPMemoryContext);
174 
175 		errno = 0;
176 
177 		/* read a PCP packet */
178 		do_pcp_read(pcp_frontend, &tos, 1);
179 		do_pcp_read(pcp_frontend, &rsize, sizeof(int));
180 
181 		rsize = ntohl(rsize);
182 
183 		if (rsize <= 0 || rsize >= MAX_PCP_PACKET_LENGTH)
184 			ereport(FATAL,
185 					(errmsg("invalid PCP packet"),
186 					 errdetail("incorrect packet length (%d)", rsize)));
187 
188 		if ((rsize - sizeof(int)) > 0)
189 		{
190 			buf = (char *) palloc(rsize - sizeof(int));
191 			do_pcp_read(pcp_frontend, buf, rsize - sizeof(int));
192 		}
193 
194 		ereport(DEBUG1,
195 				(errmsg("received PCP packet"),
196 				 errdetail("PCP packet type of service '%c'", tos)));
197 
198 		if (tos == 'R')			/* authentication */
199 		{
200 			set_ps_display("PCP: processing authentication", false);
201 			process_authentication(pcp_frontend, buf, salt, &random_salt);
202 			authenticated = 1;
203 			continue;
204 		}
205 		if (tos == 'M')			/* md5 salt */
206 		{
207 			set_ps_display("PCP: processing authentication", false);
208 			send_md5salt(pcp_frontend, salt);
209 			random_salt = 1;
210 			continue;
211 		}
212 		/* is this connection authenticated? if not disconnect immediately */
213 		if (!authenticated)
214 			ereport(FATAL,
215 					(errmsg("authentication failed for new PCP connection"),
216 					 errdetail("connection not authorized")));
217 
218 		/* process a request */
219 		pcp_process_command(tos, buf, rsize);
220 	}
221 	exit(0);
222 }
223 
224 
225 /* pcp command processor */
226 static void
pcp_process_command(char tos,char * buf,int buf_len)227 pcp_process_command(char tos, char *buf, int buf_len)
228 {
229 	if (tos == 'O' || tos == 'T')
230 	{
231 		if (Req_info->switching)
232 		{
233 			if (Req_info->request_queue_tail != Req_info->request_queue_head)
234 			{
235 				POOL_REQUEST_KIND reqkind;
236 
237 				reqkind = Req_info->request[(Req_info->request_queue_head + 1) % MAX_REQUEST_QUEUE_SIZE].kind;
238 
239 				if (reqkind == NODE_UP_REQUEST)
240 					ereport(ERROR,
241 							(errmsg("failed to process PCP request at the moment"),
242 							 errdetail("failback is in progress")));
243 				else if (reqkind == NODE_DOWN_REQUEST)
244 					ereport(ERROR,
245 							(errmsg("failed to process PCP request at the moment"),
246 							 errdetail("failover is in progress")));
247 				else if (reqkind == PROMOTE_NODE_REQUEST)
248 					ereport(ERROR,
249 							(errmsg("failed to process PCP request at the moment"),
250 							 errdetail("promote node operation is in progress")));
251 
252 				ereport(ERROR,
253 						(errmsg("failed to process PCP request at the moment"),
254 						 errdetail("operation is in progress")));
255 			}
256 		}
257 	}
258 
259 	switch (tos)
260 	{
261 		case 'A':				/* set configuration parameter */
262 			set_ps_display("PCP: processing set configration parameter request", false);
263 			process_set_configration_parameter(pcp_frontend, buf, buf_len);
264 			break;
265 
266 		case 'L':				/* node count */
267 			set_ps_display("PCP: processing node count request", false);
268 			inform_node_count(pcp_frontend);
269 			break;
270 
271 		case 'H':				/* health check stats */
272 			set_ps_display("PCP: processing health check stats request", false);
273 			inform_health_check_stats(pcp_frontend, buf);
274 			break;
275 
276 		case 'I':				/* node info */
277 			set_ps_display("PCP: processing node info request", false);
278 			inform_node_info(pcp_frontend, buf);
279 			break;
280 
281 		case 'N':				/* process count */
282 			set_ps_display("PCP: processing process count request", false);
283 			inform_process_count(pcp_frontend);
284 			break;
285 
286 		case 'P':				/* process info */
287 			set_ps_display("PCP: processing process info request", false);
288 			inform_process_info(pcp_frontend, buf);
289 			break;
290 
291 		case 'W':				/* watchdog info */
292 			set_ps_display("PCP: processing watchdog info request", false);
293 			inform_watchdog_info(pcp_frontend, buf);
294 			break;
295 
296 		case 'D':				/* detach node */
297 		case 'd':				/* detach node gracefully */
298 			set_ps_display("PCP: processing detach node request", false);
299 			process_detach_node(pcp_frontend, buf, tos);
300 			break;
301 
302 		case 'C':				/* attach node */
303 			set_ps_display("PCP: processing attach node request", false);
304 			process_attach_node(pcp_frontend, buf);
305 			break;
306 
307 		case 'T':
308 		case 't':
309 			set_ps_display("PCP: processing shutdown request", false);
310 			process_shutown_request(pcp_frontend, buf[0], tos);
311 			break;
312 
313 		case 'O':				/* recovery request */
314 			set_ps_display("PCP: processing recovery request", false);
315 			process_recovery_request(pcp_frontend, buf);
316 			break;
317 
318 		case 'B':				/* status request */
319 			set_ps_display("PCP: processing status request request", false);
320 			process_status_request(pcp_frontend);
321 			break;
322 
323 		case 'Z':				/*reload config file */
324 			set_ps_display("PCP: processing reload config request", false);
325 			process_reload_config(pcp_frontend, buf[0]);
326 			break;
327 
328 		case 'J':				/* promote node */
329 		case 'j':				/* promote node gracefully */
330 			set_ps_display("PCP: processing promote node request", false);
331 			process_promote_node(pcp_frontend, buf, tos);
332 			break;
333 
334 		case 'F':
335 			ereport(DEBUG1,
336 					(errmsg("PCP processing request, stop online recovery")));
337 			break;
338 
339 		case 'X':				/* disconnect */
340 			ereport(DEBUG1,
341 					(errmsg("PCP processing request, client disconnecting"),
342 					 errdetail("closing PCP connection, and exiting child")));
343 			pcp_close(pcp_frontend);
344 			pcp_frontend = NULL;
345 			/* This child has done its part. Rest in peace now */
346 			exit(0);
347 			break;
348 
349 		default:
350 			ereport(FATAL,
351 					(errmsg("PCP processing request"),
352 					 errdetail("unknown PCP packet type \"%c\"", tos)));
353 	}
354 }
355 
356 static RETSIGTYPE
die(int sig)357 die(int sig)
358 {
359 	ereport(DEBUG1,
360 			(errmsg("PCP worker child receives shutdown request signal %d", sig)));
361 	if (sig == SIGTERM)
362 	{
363 		ereport(DEBUG1,
364 				(errmsg("PCP worker child receives smart shutdown request."),
365 				 errdetail("waiting for the child to die its natural death")));
366 	}
367 	else if (sig == SIGINT)
368 	{
369 		ereport(DEBUG1,
370 				(errmsg("PCP worker child receives fast shutdown request.")));
371 		exit(0);
372 	}
373 	else if (sig == SIGQUIT)
374 	{
375 		ereport(DEBUG1,
376 				(errmsg("PCP worker child receives immediate shutdown request.")));
377 		exit(0);
378 	}
379 	else
380 		exit(1);
381 }
382 
383 
384 static RETSIGTYPE
wakeup_handler_child(int sig)385 wakeup_handler_child(int sig)
386 {
387 	pcp_worker_wakeup_request = 1;
388 }
389 
390 /*
391  * unset non-block flag
392  */
393 static void
unset_nonblock(int fd)394 unset_nonblock(int fd)
395 {
396 	int			var;
397 
398 	/* set fd to non-blocking */
399 	var = fcntl(fd, F_GETFL, 0);
400 	if (var == -1)
401 	{
402 		ereport(FATAL,
403 				(errmsg("unable to connect"),
404 				 errdetail("fcntl system call failed with error : \"%m\"")));
405 
406 	}
407 	if (fcntl(fd, F_SETFL, var & ~O_NONBLOCK) == -1)
408 	{
409 		ereport(FATAL,
410 				(errmsg("unable to connect"),
411 				 errdetail("fcntl system call failed with error : \"%m\"")));
412 	}
413 }
414 
415 /*
416  * see if received username and password matches with one in the file
417  */
418 static int
user_authenticate(char * buf,char * passwd_file,char * salt,int salt_len)419 user_authenticate(char *buf, char *passwd_file, char *salt, int salt_len)
420 {
421 	FILE	   *fp = NULL;
422 	char		packet_username[MAX_USER_PASSWD_LEN + 1];
423 	char		packet_password[MAX_USER_PASSWD_LEN + 1];
424 	char		encrypt_buf[(MD5_PASSWD_LEN + 1) * 2];
425 	char		file_username[MAX_USER_PASSWD_LEN + 1];
426 	char		file_password[MAX_USER_PASSWD_LEN + 1];
427 	char	   *index = NULL;
428 	static char line[MAX_FILE_LINE_LEN + 1];
429 	int			i,
430 				len;
431 
432 	/* strcpy() should be OK, but use strncpy() to be extra careful */
433 	strncpy(packet_username, buf, MAX_USER_PASSWD_LEN);
434 	index = (char *) memchr(buf, '\0', MAX_USER_PASSWD_LEN);
435 	if (index == NULL)
436 	{
437 		ereport(FATAL,
438 				(errmsg("failed to authenticate PCP user"),
439 				 errdetail("error while reading authentication packet")));
440 		return 0;
441 	}
442 	strncpy(packet_password, ++index, MAX_USER_PASSWD_LEN);
443 
444 	fp = fopen(passwd_file, "r");
445 	if (fp == NULL)
446 	{
447 		ereport(FATAL,
448 				(errmsg("failed to authenticate PCP user"),
449 				 errdetail("could not open %s. reason: %m", passwd_file)));
450 		return 0;
451 	}
452 
453 	/* for now, I don't care if duplicate username exists in the config file */
454 	while ((fgets(line, MAX_FILE_LINE_LEN, fp)) != NULL)
455 	{
456 		i = 0;
457 		len = 0;
458 
459 		if (line[0] == '\n' || line[0] == '#')
460 			continue;
461 
462 		while (line[i] != ':')
463 		{
464 			len++;
465 			if (++i > MAX_USER_PASSWD_LEN)
466 			{
467 				fclose(fp);
468 				ereport(FATAL,
469 						(errmsg("failed to authenticate PCP user"),
470 						 errdetail("username read from file \"%s\" is larger than maximum allowed username length [%d]", passwd_file, MAX_USER_PASSWD_LEN)));
471 				return 0;
472 			}
473 		}
474 		memcpy(file_username, line, len);
475 		file_username[len] = '\0';
476 
477 		if (strcmp(packet_username, file_username) != 0)
478 			continue;
479 
480 		i++;
481 		len = 0;
482 		while (line[i] != '\n' && line[i] != '\0')
483 		{
484 			len++;
485 			if (++i > MAX_USER_PASSWD_LEN)
486 			{
487 				fclose(fp);
488 				ereport(FATAL,
489 						(errmsg("failed to authenticate PCP user"),
490 						 errdetail("password read from file \"%s\" is larger than maximum allowed password length [%d]", passwd_file, MAX_USER_PASSWD_LEN)));
491 				return 0;
492 			}
493 		}
494 
495 		memcpy(file_password, line + strlen(file_username) + 1, len);
496 		file_password[len] = '\0';
497 
498 		pool_md5_encrypt(file_password, file_username, strlen(file_username),
499 						 encrypt_buf + MD5_PASSWD_LEN + 1);
500 		encrypt_buf[(MD5_PASSWD_LEN + 1) * 2 - 1] = '\0';
501 
502 		pool_md5_encrypt(encrypt_buf + MD5_PASSWD_LEN + 1, salt, salt_len,
503 						 encrypt_buf);
504 		encrypt_buf[MD5_PASSWD_LEN] = '\0';
505 
506 		if (strcmp(encrypt_buf, packet_password) == 0)
507 		{
508 			fclose(fp);
509 			return 1;
510 		}
511 	}
512 	fclose(fp);
513 	ereport(FATAL,
514 			(errmsg("authentication failed for user \"%s\"", packet_username),
515 			 errdetail("username and/or password does not match")));
516 
517 	return 0;
518 }
519 
520 
521 /* Detach a node */
522 static int
pool_detach_node(int node_id,bool gracefully)523 pool_detach_node(int node_id, bool gracefully)
524 {
525 	if (!gracefully)
526 	{
527 		degenerate_backend_set_ex(&node_id, 1, REQ_DETAIL_SWITCHOVER | REQ_DETAIL_CONFIRMED, true, false);
528 		return 0;
529 	}
530 
531 	/*
532 	 * Check if the NODE DOWN can be executed on the given node id.
533 	 */
534 	degenerate_backend_set_ex(&node_id, 1, REQ_DETAIL_SWITCHOVER | REQ_DETAIL_CONFIRMED, true, true);
535 
536 	/*
537 	 * Wait until all frontends exit
538 	 */
539 	*InRecovery = RECOVERY_DETACH;	/* This wiil ensure that new incoming
540 									 * connection requests are blocked */
541 
542 	if (wait_connection_closed())
543 	{
544 		/* wait timed out */
545 		finish_recovery();
546 		return -1;
547 	}
548 
549 	pcp_worker_wakeup_request = 0;
550 
551 	/*
552 	 * Now all frontends have gone. Let's do failover.
553 	 */
554 	degenerate_backend_set_ex(&node_id, 1, REQ_DETAIL_SWITCHOVER | REQ_DETAIL_CONFIRMED, false, false);
555 
556 	/*
557 	 * Wait for failover completed.
558 	 */
559 
560 	while (!pcp_worker_wakeup_request)
561 	{
562 		struct timeval t = {1, 0};
563 
564 		select(0, NULL, NULL, NULL, &t);
565 	}
566 	pcp_worker_wakeup_request = 0;
567 
568 	/*
569 	 * Start to accept incoming connections and send SIGUSR2 to pgpool parent
570 	 * to distribute SIGUSR2 all pgpool children.
571 	 */
572 	finish_recovery();
573 
574 	return 0;
575 }
576 
577 /* Promote a node */
578 static int
pool_promote_node(int node_id,bool gracefully)579 pool_promote_node(int node_id, bool gracefully)
580 {
581 	if (!gracefully)
582 	{
583 		promote_backend(node_id, REQ_DETAIL_CONFIRMED); /* send promote request */
584 		return 0;
585 	}
586 
587 	/*
588 	 * Wait until all frontends exit
589 	 */
590 	*InRecovery = RECOVERY_PROMOTE; /* This wiil ensure that new incoming
591 									 * connection requests are blocked */
592 
593 	if (wait_connection_closed())
594 	{
595 		/* wait timed out */
596 		finish_recovery();
597 		return -1;
598 	}
599 
600 	/*
601 	 * Now all frontends have gone. Let's do failover.
602 	 */
603 	promote_backend(node_id, REQ_DETAIL_CONFIRMED); /* send promote request */
604 
605 	/*
606 	 * Wait for failover completed.
607 	 */
608 	pcp_worker_wakeup_request = 0;
609 
610 	while (!pcp_worker_wakeup_request)
611 	{
612 		struct timeval t = {1, 0};
613 
614 		select(0, NULL, NULL, NULL, &t);
615 	}
616 	pcp_worker_wakeup_request = 0;
617 
618 	/*
619 	 * Start to accept incoming connections and send SIGUSR2 to pgpool parent
620 	 * to distribute SIGUSR2 all pgpool children.
621 	 */
622 	finish_recovery();
623 	return 0;
624 }
625 
626 static void
inform_process_count(PCP_CONNECTION * frontend)627 inform_process_count(PCP_CONNECTION * frontend)
628 {
629 	int			wsize;
630 	int			process_count;
631 	char		process_count_str[16];
632 	int		   *process_list = NULL;
633 	char		code[] = "CommandComplete";
634 	char	   *mesg = NULL;
635 	int			i;
636 	int			total_port_len = 0;
637 
638 	process_list = pool_get_process_list(&process_count);
639 
640 	mesg = (char *) palloc(7 * process_count);	/* PID is at most 6 characters
641 												 * long */
642 
643 	snprintf(process_count_str, sizeof(process_count_str), "%d", process_count);
644 
645 	for (i = 0; i < process_count; i++)
646 	{
647 		char		process_id[7];
648 
649 		snprintf(process_id, sizeof(process_id), "%d", process_list[i]);
650 		snprintf(mesg + total_port_len, strlen(process_id) + 1, "%s", process_id);
651 		total_port_len += strlen(process_id) + 1;
652 	}
653 
654 	pcp_write(frontend, "n", 1);
655 	wsize = htonl(sizeof(code) +
656 				  strlen(process_count_str) + 1 +
657 				  total_port_len +
658 				  sizeof(int));
659 	pcp_write(frontend, &wsize, sizeof(int));
660 	pcp_write(frontend, code, sizeof(code));
661 	pcp_write(frontend, process_count_str, strlen(process_count_str) + 1);
662 	pcp_write(frontend, mesg, total_port_len);
663 	do_pcp_flush(frontend);
664 
665 	pfree(process_list);
666 	pfree(mesg);
667 
668 	ereport(DEBUG1,
669 			(errmsg("PCP: informing process count"),
670 			 errdetail("%d process(es) found", process_count)));
671 }
672 
673 static void
inform_process_info(PCP_CONNECTION * frontend,char * buf)674 inform_process_info(PCP_CONNECTION * frontend, char *buf)
675 {
676 	int			proc_id;
677 	int			wsize;
678 	int			num_proc = pool_config->num_init_children;
679 	int			i;
680 
681 	proc_id = atoi(buf);
682 
683 	if ((proc_id != 0) && (pool_get_process_info(proc_id) == NULL))
684 	{
685 		ereport(ERROR,
686 				(errmsg("informing process info failed"),
687 				 errdetail("invalid process ID : %s", buf)));
688 	}
689 	else
690 	{
691 		/* First, send array size of connection_info */
692 		char		arr_code[] = "ArraySize";
693 		char		con_info_size[16];
694 
695 		/* Finally, indicate that all data is sent */
696 		char		fin_code[] = "CommandComplete";
697 
698 		POOL_REPORT_POOLS *pools = get_pools(&num_proc);
699 
700 		if (proc_id == 0)
701 		{
702 			snprintf(con_info_size, sizeof(con_info_size), "%d", num_proc);
703 		}
704 		else
705 		{
706 			snprintf(con_info_size, sizeof(con_info_size), "%d", pool_config->max_pool * NUM_BACKENDS);
707 		}
708 
709 		pcp_write(frontend, "p", 1);
710 		wsize = htonl(sizeof(arr_code) +
711 					  strlen(con_info_size) + 1 +
712 					  sizeof(int));
713 		pcp_write(frontend, &wsize, sizeof(int));
714 		pcp_write(frontend, arr_code, sizeof(arr_code));
715 		pcp_write(frontend, con_info_size, strlen(con_info_size) + 1);
716 		do_pcp_flush(frontend);
717 
718 		/* Second, send process information for all connection_info */
719 		for (i = 0; i < num_proc; i++)
720 		{
721 			char		code[] = "ProcessInfo";
722 			char		proc_pid[16];
723 			char		proc_start_time[20];
724 			char		proc_create_time[20];
725 			char		majorversion[5];
726 			char		minorversion[5];
727 			char		pool_counter[16];
728 			char		backend_id[16];
729 			char		backend_pid[16];
730 			char		connected[2];
731 
732 			if (proc_id != 0 && proc_id != pools[i].pool_pid)
733 				continue;
734 
735 			snprintf(proc_pid, sizeof(proc_pid), "%d", pools[i].pool_pid);
736 			snprintf(proc_start_time, sizeof(proc_start_time), "%ld", pools[i].start_time);
737 			snprintf(proc_create_time, sizeof(proc_create_time), "%ld", pools[i].create_time);
738 			snprintf(majorversion, sizeof(majorversion), "%d", pools[i].pool_majorversion);
739 			snprintf(minorversion, sizeof(minorversion), "%d", pools[i].pool_minorversion);
740 			snprintf(pool_counter, sizeof(pool_counter), "%d", pools[i].pool_counter);
741 			snprintf(backend_id, sizeof(backend_pid), "%d", pools[i].backend_id);
742 			snprintf(backend_pid, sizeof(backend_pid), "%d", pools[i].pool_backendpid);
743 			snprintf(connected, sizeof(connected), "%d", pools[i].pool_connected);
744 
745 			pcp_write(frontend, "p", 1);
746 			wsize = htonl(sizeof(code) +
747 						  strlen(proc_pid) + 1 +
748 						  strlen(pools[i].database) + 1 +
749 						  strlen(pools[i].username) + 1 +
750 						  strlen(proc_start_time) + 1 +
751 						  strlen(proc_create_time) + 1 +
752 						  strlen(majorversion) + 1 +
753 						  strlen(minorversion) + 1 +
754 						  strlen(pool_counter) + 1 +
755 						  strlen(backend_id) + 1 +
756 						  strlen(backend_pid) + 1 +
757 						  strlen(connected) + 1 +
758 						  sizeof(int));
759 			pcp_write(frontend, &wsize, sizeof(int));
760 			pcp_write(frontend, code, sizeof(code));
761 			pcp_write(frontend, proc_pid, strlen(proc_pid) + 1);
762 			pcp_write(frontend, pools[i].database, strlen(pools[i].database) + 1);
763 			pcp_write(frontend, pools[i].username, strlen(pools[i].username) + 1);
764 			pcp_write(frontend, proc_start_time, strlen(proc_start_time) + 1);
765 			pcp_write(frontend, proc_create_time, strlen(proc_create_time) + 1);
766 			pcp_write(frontend, majorversion, strlen(majorversion) + 1);
767 			pcp_write(frontend, minorversion, strlen(minorversion) + 1);
768 			pcp_write(frontend, pool_counter, strlen(pool_counter) + 1);
769 			pcp_write(frontend, backend_id, strlen(backend_id) + 1);
770 			pcp_write(frontend, backend_pid, strlen(backend_pid) + 1);
771 			pcp_write(frontend, connected, strlen(connected) + 1);
772 			do_pcp_flush(frontend);
773 		}
774 
775 		pcp_write(frontend, "p", 1);
776 		wsize = htonl(sizeof(fin_code) +
777 					  sizeof(int));
778 		pcp_write(frontend, &wsize, sizeof(int));
779 		pcp_write(frontend, fin_code, sizeof(fin_code));
780 		do_pcp_flush(frontend);
781 		ereport(DEBUG1,
782 				(errmsg("PCP informing process info"),
783 				 errdetail("retrieved process information from shared memory")));
784 
785 		pfree(pools);
786 	}
787 }
788 
789 static void
inform_watchdog_info(PCP_CONNECTION * frontend,char * buf)790 inform_watchdog_info(PCP_CONNECTION * frontend, char *buf)
791 {
792 	int			wd_index;
793 	int			json_data_len;
794 	int			wsize;
795 	char		code[] = "CommandComplete";
796 	char	   *json_data;
797 
798 	if (!pool_config->use_watchdog)
799 		ereport(ERROR,
800 				(errmsg("PCP: informing watchdog info failed"),
801 				 errdetail("watcdhog is not enabled")));
802 
803 	wd_index = atoi(buf);
804 
805 	json_data = wd_internal_get_watchdog_nodes_json(wd_index);
806 	if (json_data == NULL)
807 		ereport(ERROR,
808 				(errmsg("PCP: informing watchdog info failed"),
809 				 errdetail("invalid watchdog index")));
810 
811 	ereport(DEBUG2,
812 			(errmsg("PCP: informing watchdog info"),
813 			 errdetail("retrieved node information from IPC socket")));
814 
815 	/*
816 	 * This is the voilation of PCP protocol but I think in future we should
817 	 * shift to more adaptable protocol for data transmition.
818 	 */
819 	json_data_len = strlen(json_data);
820 	wsize = htonl(sizeof(code) +
821 				  json_data_len + 1 +
822 				  sizeof(int));
823 	pcp_write(frontend, "w", 1);
824 
825 	pcp_write(frontend, &wsize, sizeof(int));
826 	pcp_write(frontend, code, sizeof(code));
827 
828 	pcp_write(frontend, json_data, json_data_len + 1);
829 	do_pcp_flush(frontend);
830 
831 	pfree(json_data);
832 }
833 
834 static void
inform_node_info(PCP_CONNECTION * frontend,char * buf)835 inform_node_info(PCP_CONNECTION * frontend, char *buf)
836 {
837 	int			node_id;
838 	int			wsize;
839 	char		port_str[6];
840 	char		status[2];
841 	char		weight_str[20];
842 	char		role_str[10];
843 	char		standby_delay_str[20];
844 	char		status_changed_time_str[20];
845 	char		code[] = "CommandComplete";
846 	BackendInfo *bi = NULL;
847 	SERVER_ROLE role;
848 
849 	node_id = atoi(buf);
850 
851 	bi = pool_get_node_info(node_id);
852 
853 	if (bi == NULL)
854 		ereport(ERROR,
855 				(errmsg("informing node info failed"),
856 				 errdetail("invalid node ID")));
857 
858 	ereport(DEBUG2,
859 			(errmsg("PCP: informing node info"),
860 			 errdetail("retrieved node information from shared memory")));
861 
862 	snprintf(port_str, sizeof(port_str), "%d", bi->backend_port);
863 	snprintf(status, sizeof(status), "%d", bi->backend_status);
864 	snprintf(weight_str, sizeof(weight_str), "%f", bi->backend_weight);
865 
866 	if (STREAM)
867 	{
868 		if (Req_info->primary_node_id == node_id)
869 			role = ROLE_PRIMARY;
870 		else
871 			role = ROLE_STANDBY;
872 	}
873 	else
874 	{
875 		if (Req_info->main_node_id == node_id)
876 			role = ROLE_MAIN;
877 		else
878 			role = ROLE_REPLICA;
879 	}
880 	snprintf(role_str, sizeof(role_str), "%d", role);
881 
882 	snprintf(standby_delay_str, sizeof(standby_delay_str), UINT64_FORMAT, bi->standby_delay);
883 
884 	snprintf(status_changed_time_str, sizeof(status_changed_time_str), UINT64_FORMAT, bi->status_changed_time);
885 
886 	pcp_write(frontend, "i", 1);
887 	wsize = htonl(sizeof(code) +
888 				  strlen(bi->backend_hostname) + 1 +
889 				  strlen(port_str) + 1 +
890 				  strlen(status) + 1 +
891 				  strlen(weight_str) + 1 +
892 				  strlen(role_str) + 1 +
893 				  strlen(standby_delay_str) + 1 +
894 				  strlen(bi->replication_state) + 1 +
895 				  strlen(bi->replication_sync_state) + 1 +
896 				  strlen(status_changed_time_str) + 1 +
897 				  sizeof(int));
898 	pcp_write(frontend, &wsize, sizeof(int));
899 	pcp_write(frontend, code, sizeof(code));
900 	pcp_write(frontend, bi->backend_hostname, strlen(bi->backend_hostname) + 1);
901 	pcp_write(frontend, port_str, strlen(port_str) + 1);
902 	pcp_write(frontend, status, strlen(status) + 1);
903 	pcp_write(frontend, weight_str, strlen(weight_str) + 1);
904 	pcp_write(frontend, role_str, strlen(role_str) + 1);
905 	pcp_write(frontend, standby_delay_str, strlen(standby_delay_str) + 1);
906 	pcp_write(frontend, bi->replication_state, strlen(bi->replication_state) + 1);
907 	pcp_write(frontend, bi->replication_sync_state, strlen(bi->replication_sync_state) + 1);
908 	pcp_write(frontend, status_changed_time_str, strlen(status_changed_time_str) + 1);
909 
910 	do_pcp_flush(frontend);
911 }
912 
913 /*
914  * Send out health check stats data to pcp client.  node id is provided as a
915  * string in buf parameter.
916  *
917  * The protocol starts with 'h', followed by 4-byte packet length integer in
918  * network byte order including self.  Each data is represented as a null
919  * terminted string. The order of each data is defined in
920  * POOL_HEALTH_CHECK_STATS struct.
921  */
922 static void
inform_health_check_stats(PCP_CONNECTION * frontend,char * buf)923 inform_health_check_stats(PCP_CONNECTION *frontend, char *buf)
924 {
925 	POOL_HEALTH_CHECK_STATS *stats;
926 	POOL_HEALTH_CHECK_STATS *s;
927 	int		*offsets;
928 	int		n;
929 	int		nrows;
930 	int		i;
931 	int		node_id;
932 	bool	node_id_ok = false;
933 	int		wsize;
934 	char	code[] = "CommandComplete";
935 
936 	node_id = atoi(buf);
937 
938 	if (node_id < 0 || node_id > NUM_BACKENDS)
939 	{
940 		ereport(ERROR,
941 				(errmsg("informing health check stats info failed"),
942 				 errdetail("invalid node ID %d", node_id)));
943 	}
944 
945 	stats = get_health_check_stats(&nrows);
946 
947 	for (i = 0; i < nrows; i++)
948 	{
949 		if (atoi(stats[i].node_id) == node_id)
950 		{
951 			node_id_ok = true;
952 			s = &stats[i];
953 			break;
954 		}
955 	}
956 
957 	if (!node_id_ok)
958 	{
959 		ereport(ERROR,
960 				(errmsg("informing health check stats info failed"),
961 				 errdetail("stats data for node ID %d does not exist", node_id)));
962 	}
963 
964 	pcp_write(frontend, "h", 1);	/* indicate that this is a reply to health check stats request */
965 
966 	wsize = sizeof(code) + sizeof(int);
967 
968 	/* Calculate total packet length */
969 	offsets = pool_health_check_stats_offsets(&n);
970 
971 	for (i = 0; i < n; i++)
972 	{
973 		wsize += strlen((char *)s + offsets[i]) + 1;
974 	}
975 	wsize = htonl(wsize);	/* convert to network byte order */
976 
977 	/* send packet length to frontend */
978 	pcp_write(frontend, &wsize, sizeof(int));
979 	/* send "Command Complete" to frontend */
980 	pcp_write(frontend, code, sizeof(code));
981 
982 	/* send each health check stats data to frontend */
983 	for (i = 0; i < n; i++)
984 	{
985 		pcp_write(frontend, (char *)s + offsets[i], strlen((char *)s + offsets[i]) + 1);
986 	}
987 	pfree(stats);
988 	do_pcp_flush(frontend);
989 }
990 
991 static void
inform_node_count(PCP_CONNECTION * frontend)992 inform_node_count(PCP_CONNECTION * frontend)
993 {
994 	int			wsize;
995 	char		mesg[16];
996 	char		code[] = "CommandComplete";
997 	int			node_count = pool_get_node_count();
998 
999 	snprintf(mesg, sizeof(mesg), "%d", node_count);
1000 
1001 	pcp_write(frontend, "l", 1);
1002 	wsize = htonl(sizeof(code) +
1003 				  strlen(mesg) + 1 +
1004 				  sizeof(int));
1005 	pcp_write(frontend, &wsize, sizeof(int));
1006 	pcp_write(frontend, code, sizeof(code));
1007 	pcp_write(frontend, mesg, strlen(mesg) + 1);
1008 	do_pcp_flush(frontend);
1009 
1010 	ereport(DEBUG1,
1011 			(errmsg("PCP: informing node count"),
1012 			 errdetail("%d node(s) found", node_count)));
1013 }
1014 static void
process_reload_config(PCP_CONNECTION * frontend,char scope)1015 process_reload_config(PCP_CONNECTION * frontend, char scope)
1016 {
1017 	char            code[] = "CommandComplete";
1018 	int wsize;
1019 
1020 	if (scope == 'c' && pool_config->use_watchdog)
1021 	{
1022 		ereport(LOG,
1023 				(errmsg("PCP: sending command to watchdog to reload config cluster")));
1024 
1025 		if (wd_execute_cluster_command(WD_COMMAND_RELOAD_CONFIG_CLUSTER,0, NULL) != COMMAND_OK)
1026 			ereport(ERROR,
1027 					(errmsg("PCP: error while processing reload config request for cluster"),
1028 					 errdetail("failed to propogate reload config command through watchdog")));
1029 	}
1030 
1031 	if(pool_signal_parent(SIGHUP) == -1)
1032 	{
1033 	   ereport(ERROR,
1034 			   (errmsg("process reload config request failed"),
1035 				errdetail("failed to signal pgpool parent process")));
1036 	}
1037 
1038 	pcp_write(frontend, "z", 1);
1039 	wsize = htonl(sizeof(code) + sizeof(int));
1040 	pcp_write(frontend, &wsize, sizeof(int));
1041 	pcp_write(frontend, code, sizeof(code));
1042 	do_pcp_flush(frontend);
1043 }
1044 
1045 static void
process_detach_node(PCP_CONNECTION * frontend,char * buf,char tos)1046 process_detach_node(PCP_CONNECTION * frontend, char *buf, char tos)
1047 {
1048 	int			node_id;
1049 	int			wsize;
1050 	char		code[] = "CommandComplete";
1051 	bool		gracefully;
1052 
1053 	if (tos == 'D')
1054 		gracefully = false;
1055 	else
1056 		gracefully = true;
1057 
1058 	node_id = atoi(buf);
1059 	ereport(DEBUG1,
1060 			(errmsg("PCP: processing detach node"),
1061 			 errdetail("detaching Node ID %d", node_id)));
1062 
1063 	pool_detach_node(node_id, gracefully);
1064 
1065 	pcp_write(frontend, "d", 1);
1066 	wsize = htonl(sizeof(code) + sizeof(int));
1067 	pcp_write(frontend, &wsize, sizeof(int));
1068 	pcp_write(frontend, code, sizeof(code));
1069 	do_pcp_flush(frontend);
1070 }
1071 
1072 static void
process_attach_node(PCP_CONNECTION * frontend,char * buf)1073 process_attach_node(PCP_CONNECTION * frontend, char *buf)
1074 {
1075 	int			node_id;
1076 	int			wsize;
1077 	char		code[] = "CommandComplete";
1078 
1079 	node_id = atoi(buf);
1080 	ereport(DEBUG1,
1081 			(errmsg("PCP: processing attach node"),
1082 			 errdetail("attaching Node ID %d", node_id)));
1083 
1084 	send_failback_request(node_id, true, REQ_DETAIL_CONFIRMED);
1085 
1086 	pcp_write(frontend, "c", 1);
1087 	wsize = htonl(sizeof(code) + sizeof(int));
1088 	pcp_write(frontend, &wsize, sizeof(int));
1089 	pcp_write(frontend, code, sizeof(code));
1090 	do_pcp_flush(frontend);
1091 }
1092 
1093 
1094 static void
process_recovery_request(PCP_CONNECTION * frontend,char * buf)1095 process_recovery_request(PCP_CONNECTION * frontend, char *buf)
1096 {
1097 	int			wsize;
1098 	char		code[] = "CommandComplete";
1099 	int			node_id = atoi(buf);
1100 
1101 	if ((node_id < 0) || (node_id >= pool_config->backend_desc->num_backends))
1102 		ereport(ERROR,
1103 				(errmsg("process recovery request failed"),
1104 				 errdetail("node id %d is not valid", node_id)));
1105 
1106 	if ((!REPLICATION &&
1107 		 !(STREAM)) ||
1108 		(STREAM &&
1109 		 node_id == PRIMARY_NODE_ID))
1110 	{
1111 		if (STREAM)
1112 			ereport(ERROR,
1113 					(errmsg("process recovery request failed"),
1114 					 errdetail("primary server cannot be recovered by online recovery.")));
1115 		else
1116 			ereport(ERROR,
1117 					(errmsg("process recovery request failed"),
1118 					 errdetail("recovery request is only allowed in replication and streaming replication modes.")));
1119 	}
1120 	else
1121 	{
1122 		if (pcp_mark_recovery_in_progress() == false)
1123 			ereport(FATAL,
1124 					(errmsg("process recovery request failed"),
1125 					 errdetail("pgpool-II is already processing another recovery request.")));
1126 
1127 		ereport(DEBUG1,
1128 				(errmsg("PCP: processing recovery request"),
1129 				 errdetail("start online recovery")));
1130 
1131 		PG_TRY();
1132 		{
1133 			start_recovery(node_id);
1134 			finish_recovery();
1135 			pcp_write(frontend, "c", 1);
1136 			wsize = htonl(sizeof(code) + sizeof(int));
1137 			pcp_write(frontend, &wsize, sizeof(int));
1138 			pcp_write(frontend, code, sizeof(code));
1139 			do_pcp_flush(frontend);
1140 			pcp_mark_recovery_finished();
1141 		}
1142 		PG_CATCH();
1143 		{
1144 			finish_recovery();
1145 			pcp_mark_recovery_finished();
1146 			PG_RE_THROW();
1147 
1148 		} PG_END_TRY();
1149 	}
1150 	do_pcp_flush(frontend);
1151 }
1152 
1153 static void
process_status_request(PCP_CONNECTION * frontend)1154 process_status_request(PCP_CONNECTION * frontend)
1155 {
1156 	int			nrows = 0;
1157 	int			i;
1158 	POOL_REPORT_CONFIG *status = get_config(&nrows);
1159 	int			len = 0;
1160 
1161 	/* First, send array size of connection_info */
1162 	char		arr_code[] = "ArraySize";
1163 	char		code[] = "ProcessConfig";
1164 
1165 	/* Finally, indicate that all data is sent */
1166 	char		fin_code[] = "CommandComplete";
1167 
1168 	pcp_write(frontend, "b", 1);
1169 	len = htonl(sizeof(arr_code) + sizeof(int) + sizeof(int));
1170 	pcp_write(frontend, &len, sizeof(int));
1171 	pcp_write(frontend, arr_code, sizeof(arr_code));
1172 	len = htonl(nrows);
1173 	pcp_write(frontend, &len, sizeof(int));
1174 
1175 	do_pcp_flush(frontend);
1176 
1177 	for (i = 0; i < nrows; i++)
1178 	{
1179 		pcp_write(frontend, "b", 1);
1180 		len = htonl(sizeof(int)
1181 					+ sizeof(code)
1182 					+ strlen(status[i].name) + 1
1183 					+ strlen(status[i].value) + 1
1184 					+ strlen(status[i].desc) + 1
1185 			);
1186 
1187 		pcp_write(frontend, &len, sizeof(int));
1188 		pcp_write(frontend, code, sizeof(code));
1189 		pcp_write(frontend, status[i].name, strlen(status[i].name) + 1);
1190 		pcp_write(frontend, status[i].value, strlen(status[i].value) + 1);
1191 		pcp_write(frontend, status[i].desc, strlen(status[i].desc) + 1);
1192 	}
1193 
1194 	pcp_write(frontend, "b", 1);
1195 	len = htonl(sizeof(fin_code) + sizeof(int));
1196 	pcp_write(frontend, &len, sizeof(int));
1197 	pcp_write(frontend, fin_code, sizeof(fin_code));
1198 	do_pcp_flush(frontend);
1199 
1200 	pfree(status);
1201 	ereport(DEBUG1,
1202 			(errmsg("PCP: processing status request"),
1203 			 errdetail("retrieved status information")));
1204 }
1205 
1206 static void
process_promote_node(PCP_CONNECTION * frontend,char * buf,char tos)1207 process_promote_node(PCP_CONNECTION * frontend, char *buf, char tos)
1208 {
1209 	int			node_id;
1210 	int			wsize;
1211 	char		code[] = "CommandComplete";
1212 	bool		gracefully;
1213 
1214 	if (tos == 'J')
1215 		gracefully = false;
1216 	else
1217 		gracefully = true;
1218 
1219 	node_id = atoi(buf);
1220 	if ((node_id < 0) || (node_id >= pool_config->backend_desc->num_backends))
1221 		ereport(ERROR,
1222 				(errmsg("could not process recovery request"),
1223 				 errdetail("node id %d is not valid", node_id)));
1224 	/* promoting node is reserved to Streaming Replication */
1225 	if (!STREAM)
1226 	{
1227 		ereport(FATAL,
1228 				(errmsg("invalid pgpool mode for process recovery request"),
1229 				 errdetail("not in streaming replication mode, can't promote node id %d", node_id)));
1230 
1231 	}
1232 
1233 	if (node_id == REAL_PRIMARY_NODE_ID)
1234 	{
1235 		ereport(FATAL,
1236 				(errmsg("invalid pgpool mode for process recovery request"),
1237 				 errdetail("specified node is already primary node, can't promote node id %d", node_id)));
1238 
1239 	}
1240 	ereport(DEBUG1,
1241 			(errmsg("PCP: processing promote node"),
1242 			 errdetail("promoting Node ID %d", node_id)));
1243 	pool_promote_node(node_id, gracefully);
1244 
1245 	pcp_write(frontend, "d", 1);
1246 	wsize = htonl(sizeof(code) + sizeof(int));
1247 	pcp_write(frontend, &wsize, sizeof(int));
1248 	pcp_write(frontend, code, sizeof(code));
1249 	do_pcp_flush(frontend);
1250 }
1251 
1252 static void
process_authentication(PCP_CONNECTION * frontend,char * buf,char * salt,int * random_salt)1253 process_authentication(PCP_CONNECTION * frontend, char *buf, char *salt, int *random_salt)
1254 {
1255 	int			wsize;
1256 	int			authenticated;
1257 
1258 	if (*random_salt)
1259 	{
1260 		authenticated = user_authenticate(buf, pcp_conf_file, salt, 4);
1261 	}
1262 	if (!*random_salt || !authenticated)
1263 	{
1264 		ereport(FATAL,
1265 				(errmsg("authentication failed"),
1266 				 errdetail("username and/or password does not match")));
1267 
1268 		*random_salt = 0;
1269 	}
1270 	else
1271 	{
1272 		char		code[] = "AuthenticationOK";
1273 
1274 		pcp_write(frontend, "r", 1);
1275 		wsize = htonl(sizeof(code) + sizeof(int));
1276 		pcp_write(frontend, &wsize, sizeof(int));
1277 		pcp_write(frontend, code, sizeof(code));
1278 		do_pcp_flush(frontend);
1279 		*random_salt = 0;
1280 
1281 		ereport(DEBUG1,
1282 				(errmsg("PCP: processing authentication request"),
1283 				 errdetail("authentication OK")));
1284 	}
1285 }
1286 
1287 static void
send_md5salt(PCP_CONNECTION * frontend,char * salt)1288 send_md5salt(PCP_CONNECTION * frontend, char *salt)
1289 {
1290 	int			wsize;
1291 
1292 	ereport(DEBUG1,
1293 			(errmsg("PCP: sending md5 salt to client")));
1294 
1295 	pool_random_salt(salt);
1296 
1297 	pcp_write(frontend, "m", 1);
1298 	wsize = htonl(sizeof(int) + 4);
1299 	pcp_write(frontend, &wsize, sizeof(int));
1300 	pcp_write(frontend, salt, 4);
1301 	do_pcp_flush(frontend);
1302 }
1303 
1304 static void
process_shutown_request(PCP_CONNECTION * frontend,char mode,char tos)1305 process_shutown_request(PCP_CONNECTION * frontend, char mode, char tos)
1306 {
1307 	char		code[] = "CommandComplete";
1308 	int			len;
1309 
1310 	ereport(DEBUG1,
1311 			(errmsg("PCP: processing shutdown request"),
1312 			 errdetail("shutdown mode \"%c\"", mode)));
1313 
1314 	/* quickly bail out if invalid mode is specified
1315 	 * because we do not want to propogate the command
1316 	 * with invalid mode over the watchdog network */
1317 	if (mode != 's' && mode != 'i' && mode != 'f' )
1318 	{
1319 		ereport(ERROR,
1320 				(errmsg("PCP: error while processing shutdown request"),
1321 				 errdetail("invalid shutdown mode \"%c\"", mode)));
1322 	}
1323 
1324 	if (tos == 't' && pool_config->use_watchdog)
1325 	{
1326 		WDExecCommandArg wdExecCommandArg;
1327 
1328 		strncpy(wdExecCommandArg.arg_name, "mode", sizeof(wdExecCommandArg.arg_name) - 1);
1329 		snprintf(wdExecCommandArg.arg_value, sizeof(wdExecCommandArg.arg_name) - 1, "%c",mode);
1330 
1331 		ereport(LOG,
1332 				(errmsg("PCP: sending command to watchdog to shutdown cluster")));
1333 
1334 		if (wd_execute_cluster_command(WD_COMMAND_SHUTDOWN_CLUSTER,1, &wdExecCommandArg) != COMMAND_OK)
1335 			ereport(ERROR,
1336 					(errmsg("PCP: error while processing shutdown cluster request"),
1337 					 errdetail("failed to propogate shutdown command through watchdog")));
1338 	}
1339 
1340 	pcp_write(frontend, "t", 1);
1341 	len = htonl(sizeof(code) + sizeof(int));
1342 	pcp_write(frontend, &len, sizeof(int));
1343 	pcp_write(frontend, code, sizeof(code));
1344 	do_pcp_flush(frontend);
1345 
1346 	terminate_pgpool(mode, true);
1347 }
1348 
1349 static void
process_set_configration_parameter(PCP_CONNECTION * frontend,char * buf,int len)1350 process_set_configration_parameter(PCP_CONNECTION * frontend, char *buf, int len)
1351 {
1352 	char	   *param_name;
1353 	char	   *param_value;
1354 	int			wsize;
1355 	char		code[] = "CommandComplete";
1356 
1357 	param_name = buf;
1358 	if (param_name == NULL)
1359 		ereport(ERROR,
1360 				(errmsg("PCP: set configuration parameter failed"),
1361 				 errdetail("invalid pcp packet received from client")));
1362 
1363 	param_value = (char *) memchr(buf, '\0', len);
1364 	if (param_value == NULL)
1365 		ereport(ERROR,
1366 				(errmsg("set configuration parameter failed"),
1367 				 errdetail("invalid pcp packet received from client")));
1368 
1369 	param_value += 1;
1370 	ereport(LOG,
1371 			(errmsg("set configuration parameter, \"%s TO %s\"", param_name, param_value)));
1372 
1373 	if (strcasecmp(param_name, "client_min_messages") == 0)
1374 	{
1375 		const char *ordered_valid_values[] = {"debug5", "debug4", "debug3", "debug2", "debug1", "log", "commerror", "info", "notice", "warning", "error", NULL};
1376 		bool		found = false;
1377 		int			i;
1378 
1379 		for (i = 0;; i++)
1380 		{
1381 			char	   *valid_val = (char *) ordered_valid_values[i];
1382 
1383 			if (!valid_val)
1384 				break;
1385 
1386 			if (!strcasecmp(param_value, valid_val))
1387 			{
1388 				found = true;
1389 				pool_config->client_min_messages = i + 10;
1390 				ereport(DEBUG1,
1391 						(errmsg("PCP setting parameter \"%s\" to \"%s\"", param_name, param_value)));
1392 				break;
1393 			}
1394 		}
1395 		if (!found)
1396 			ereport(ERROR,
1397 					(errmsg("PCP: set configuration parameter failed"),
1398 					 errdetail("invalid value \"%s\" for parameter \"%s\"", param_value, param_name)));
1399 	}
1400 	else
1401 		ereport(ERROR,
1402 				(errmsg("PCP: set configuration parameter failed"),
1403 				 errdetail("invalid parameter \"%s\"", param_name)));
1404 
1405 	pcp_write(frontend, "a", 1);
1406 	wsize = htonl(sizeof(code) + sizeof(int));
1407 	pcp_write(frontend, &wsize, sizeof(int));
1408 	pcp_write(frontend, code, sizeof(code));
1409 	do_pcp_flush(frontend);
1410 }
1411 
1412 /*
1413  * Wrapper around pcp_flush which throws FATAL error when pcp_flush fails
1414  */
1415 static void
do_pcp_flush(PCP_CONNECTION * frontend)1416 do_pcp_flush(PCP_CONNECTION * frontend)
1417 {
1418 	if (pcp_flush(frontend) < 0)
1419 		ereport(FATAL,
1420 				(errmsg("failed to flush data to client"),
1421 				 errdetail("pcp_flush failed with error : \"%m\"")));
1422 }
1423 
1424 /*
1425  * Wrapper around pcp_read which throws FATAL error when read fails
1426  */
1427 static void
do_pcp_read(PCP_CONNECTION * pc,void * buf,int len)1428 do_pcp_read(PCP_CONNECTION * pc, void *buf, int len)
1429 {
1430 	if (pcp_read(pc, buf, len))
1431 		ereport(FATAL,
1432 				(errmsg("unable to read from client"),
1433 				 errdetail("pcp_read failed with error : \"%m\"")));
1434 }
1435 
1436 int
send_to_pcp_frontend(char * data,int len,bool flush)1437 send_to_pcp_frontend(char *data, int len, bool flush)
1438 {
1439 	int			ret;
1440 
1441 	if (processType != PT_PCP_WORKER || pcp_frontend == NULL)
1442 		return -1;
1443 	ret = pcp_write(pcp_frontend, data, len);
1444 	if (flush && !ret)
1445 		ret = pcp_flush(pcp_frontend);
1446 	return ret;
1447 }
1448 
1449 int
pcp_frontend_exists(void)1450 pcp_frontend_exists(void)
1451 {
1452 	if (processType != PT_PCP_WORKER || pcp_frontend == NULL)
1453 		return -1;
1454 	return 0;
1455 }
1456 
1457 static void
pcp_worker_will_go_down(int code,Datum arg)1458 pcp_worker_will_go_down(int code, Datum arg)
1459 {
1460 	if (processType != PT_PCP_WORKER)
1461 	{
1462 		/* should never happen */
1463 		ereport(WARNING,
1464 				(errmsg("pcp_worker_will_go_down called from invalid process")));
1465 		return;
1466 	}
1467 	if (pcp_frontend)
1468 		pcp_close(pcp_frontend);
1469 	processState = EXITING;
1470 	POOL_SETMASK(&UnBlockSig);
1471 
1472 }
1473