1 /* -*-pcp_worker.c-*- */
2 /*
3  *
4  * pgpool: a language independent connection pool server for PostgreSQL
5  * written by Tatsuo Ishii
6  *
7  * Copyright (c) 2003-2019	PgPool Global Development Group
8  *
9  * Permission to use, copy, modify, and distribute this software and
10  * its documentation for any purpose and without fee is hereby
11  * granted, provided that the above copyright notice appear in all
12  * copies and that both that copyright notice and this permission
13  * notice appear in supporting documentation, and that the name of the
14  * author not be used in advertising or publicity pertaining to
15  * distribution of the software without specific, written prior
16  * permission. The author makes no representations about the
17  * suitability of this software for any purpose.  It is provided "as
18  * is" without express or implied warranty.
19  *
20  * pcp_worker.c: PCP worker child process main
21  *
22  */
23 
24 #include "config.h"
25 #include "pool.h"
26 #include "utils/palloc.h"
27 #include "utils/memutils.h"
28 
29 #include <arpa/inet.h>
30 #include <signal.h>
31 
32 #include <stdio.h>
33 #include <errno.h>
34 #include <string.h>
35 #include <unistd.h>
36 #include <stdlib.h>
37 #include <sys/time.h>
38 
39 #ifdef HAVE_FCNTL_H
40 #include <fcntl.h>
41 #endif
42 
43 #include "pcp/pcp_stream.h"
44 #include "pcp/pcp.h"
45 #include "auth/md5.h"
46 #include "pool_config.h"
47 #include "context/pool_process_context.h"
48 #include "utils/pool_process_reporting.h"
49 #include "watchdog/wd_json_data.h"
50 #include "watchdog/wd_ipc_commands.h"
51 #include "utils/elog.h"
52 
53 #define MAX_FILE_LINE_LEN    512
54 
55 extern char *pcp_conf_file; /* global variable defined in main.c holds the path for pcp.conf */
56 volatile sig_atomic_t pcp_worker_wakeup_request = 0;
57 PCP_CONNECTION* volatile pcp_frontend = NULL;
58 
59 static RETSIGTYPE die(int sig);
60 static RETSIGTYPE wakeup_handler_child(int sig);
61 
62 static void unset_nonblock(int fd);
63 static int user_authenticate(char *buf, char *passwd_file, char *salt, int salt_len);
64 static void process_authentication(PCP_CONNECTION *frontend, char *buf, char* salt, int *random_salt);
65 static void send_md5salt(PCP_CONNECTION *frontend, char* salt);
66 
67 static void pcp_process_command(char tos, char *buf, int buf_len);
68 
69 static int pool_detach_node(int node_id, bool gracefully);
70 static int pool_promote_node(int node_id, bool gracefully);
71 static void inform_process_count(PCP_CONNECTION *frontend);
72 static void inform_process_info(PCP_CONNECTION *frontend, char *buf);
73 static void inform_watchdog_info(PCP_CONNECTION *frontend, char *buf);
74 static void inform_node_info(PCP_CONNECTION *frontend, char *buf);
75 static void inform_node_count(PCP_CONNECTION *frontend);
76 static void process_detach_node(PCP_CONNECTION *frontend,char *buf,char tos);
77 static void process_attach_node(PCP_CONNECTION *frontend,char *buf);
78 static void process_recovery_request(PCP_CONNECTION *frontend,char *buf);
79 static void process_status_request(PCP_CONNECTION *frontend);
80 static void process_promote_node(PCP_CONNECTION *frontend,char *buf, char tos);
81 static void process_shutown_request(PCP_CONNECTION *frontend,char mode);
82 static void process_set_configration_parameter(PCP_CONNECTION *frontend,char *buf, int len);
83 
84 static void pcp_worker_will_go_down(int code, Datum arg);
85 
86 static void  do_pcp_flush(PCP_CONNECTION *frontend);
87 static void  do_pcp_read(PCP_CONNECTION *pc, void *buf, int len);
88 
89 /*
90  * main entry pont of pcp worker child process
91  */
92 void
pcp_worker_main(int port)93 pcp_worker_main(int port)
94 {
95 	sigjmp_buf	local_sigjmp_buf;
96 	MemoryContext PCPMemoryContext;
97 	volatile int authenticated = 0;
98 
99 	char salt[4];
100 	int random_salt = 0;
101 	struct timeval uptime;
102 	char tos;
103 	int rsize;
104 
105 	ereport(DEBUG1,
106 			(errmsg("I am PCP worker child with pid:%d",getpid())));
107 
108 	/* Identify myself via ps */
109 	init_ps_display("", "", "", "");
110 
111 	gettimeofday(&uptime, NULL);
112 	srandom((unsigned int) (getpid() ^ uptime.tv_usec));
113 
114 	/* set up signal handlers */
115 	signal(SIGTERM, die);
116 	signal(SIGINT, die);
117 	signal(SIGQUIT, die);
118 	signal(SIGCHLD, SIG_DFL);
119 	signal(SIGUSR2, wakeup_handler_child);
120 	signal(SIGUSR1, SIG_IGN);
121 	signal(SIGHUP, SIG_IGN);
122 	signal(SIGPIPE, SIG_IGN);
123 	signal(SIGALRM, SIG_IGN);
124 	/* Create per loop iteration memory context */
125 	PCPMemoryContext = AllocSetContextCreate(TopMemoryContext,
126 											 "PCP_worker_main_loop",
127 											 ALLOCSET_DEFAULT_MINSIZE,
128 											 ALLOCSET_DEFAULT_INITSIZE,
129 											 ALLOCSET_DEFAULT_MAXSIZE);
130 
131 	MemoryContextSwitchTo(TopMemoryContext);
132 
133 	/*
134 	 * install the call back for preparation of pcp worker child exit
135 	 */
136 	on_system_exit(pcp_worker_will_go_down, (Datum)NULL);
137 
138 	/* Initialize my backend status */
139 	pool_initialize_private_backend_status();
140 
141 	/* Initialize process context */
142 	pool_init_process_context();
143 
144 	pcp_frontend = pcp_open(port);
145 	unset_nonblock(pcp_frontend->fd);
146 
147 	if (sigsetjmp(local_sigjmp_buf, 1) != 0)
148 	{
149 		error_context_stack = NULL;
150 		EmitErrorReport();
151 
152 		MemoryContextSwitchTo(TopMemoryContext);
153 		FlushErrorState();
154 	}
155 	/* We can now handle ereport(ERROR) */
156 	PG_exception_stack = &local_sigjmp_buf;
157 
158 	for(;;)
159 	{
160 		char *buf = NULL;
161 
162 		MemoryContextSwitchTo(PCPMemoryContext);
163 		MemoryContextResetAndDeleteChildren(PCPMemoryContext);
164 
165 		errno = 0;
166 
167 		/* read a PCP packet */
168 		do_pcp_read(pcp_frontend, &tos, 1);
169 		do_pcp_read(pcp_frontend, &rsize, sizeof(int));
170 
171 		rsize = ntohl(rsize);
172 
173 		if (rsize <= 0 || rsize >= MAX_PCP_PACKET_LENGTH)
174 			ereport(FATAL,
175 				(errmsg("invalid PCP packet"),
176 					 errdetail("incorrect packet length (%d)", rsize)));
177 
178 		if ((rsize - sizeof(int)) > 0)
179 		{
180 			buf = (char *)palloc(rsize - sizeof(int));
181 			do_pcp_read(pcp_frontend, buf, rsize - sizeof(int));
182 		}
183 
184 		ereport(DEBUG1,
185 			(errmsg("received PCP packet"),
186 				 errdetail("PCP packet type of service '%c'", tos)));
187 
188 		if (tos == 'R') /* authentication */
189 		{
190 			set_ps_display("PCP: processing authentication", false);
191 			process_authentication(pcp_frontend, buf,salt, &random_salt);
192 			authenticated = 1;
193 			continue;
194 		}
195 		if (tos == 'M') /* md5 salt */
196 		{
197 			set_ps_display("PCP: processing authentication", false);
198 			send_md5salt(pcp_frontend, salt);
199 			random_salt = 1;
200 			continue;
201 		}
202 		/* is this connection authenticated? if not disconnect immediately*/
203 		if (!authenticated)
204 			ereport(FATAL,
205 				(errmsg("authentication failed for new PCP connection"),
206 					 errdetail("connection not authorized")));
207 
208 		/* process a request */
209 		pcp_process_command(tos, buf, rsize);
210 	}
211 	exit(0);
212 }
213 
214 
215 /* pcp command processor */
216 static void
pcp_process_command(char tos,char * buf,int buf_len)217 pcp_process_command(char tos, char *buf, int buf_len)
218 {
219 
220 	if (tos == 'O' || tos == 'T')
221 	{
222 		if (Req_info->switching)
223 		{
224 			if(Req_info->request_queue_tail != Req_info->request_queue_head)
225 			{
226 				POOL_REQUEST_KIND reqkind;
227 				reqkind = Req_info->request[(Req_info->request_queue_head +1) % MAX_REQUEST_QUEUE_SIZE].kind;
228 
229 				if (reqkind == NODE_UP_REQUEST)
230 					ereport(ERROR,
231 							(errmsg("failed to process PCP request at the moment"),
232 							 errdetail("failback is in progress")));
233 				else if (reqkind == NODE_DOWN_REQUEST)
234 					ereport(ERROR,
235 							(errmsg("failed to process PCP request at the moment"),
236 							 errdetail("failover is in progress")));
237 				else if (reqkind == PROMOTE_NODE_REQUEST)
238 					ereport(ERROR,
239 							(errmsg("failed to process PCP request at the moment"),
240 							 errdetail("promote node operation is in progress")));
241 
242 				ereport(ERROR,
243 						(errmsg("failed to process PCP request at the moment"),
244 						 errdetail("operation is in progress")));
245 			}
246 		}
247 	}
248 
249 	switch (tos)
250 	{
251 		case 'A':			/* set configuration parameter */
252 			set_ps_display("PCP: processing set configration parameter request", false);
253 			process_set_configration_parameter(pcp_frontend,buf,buf_len);
254 			break;
255 
256 		case 'L':			/* node count */
257 			set_ps_display("PCP: processing node count request", false);
258 			inform_node_count(pcp_frontend);
259 			break;
260 
261 		case 'I':			/* node info */
262 			set_ps_display("PCP: processing node info request", false);
263 			inform_node_info(pcp_frontend, buf);
264 			break;
265 
266 		case 'N':			/* process count */
267 			set_ps_display("PCP: processing process count request", false);
268 			inform_process_count(pcp_frontend);
269 			break;
270 
271 		case 'P':			/* process info */
272 			set_ps_display("PCP: processing process info request", false);
273 			inform_process_info(pcp_frontend, buf);
274 			break;
275 
276 		case 'W':			/* watchdog info */
277 			set_ps_display("PCP: processing watchdog info request", false);
278 			inform_watchdog_info(pcp_frontend, buf);
279 			break;
280 
281 		case 'D':			/* detach node */
282 		case 'd':			/* detach node gracefully */
283 			set_ps_display("PCP: processing detach node request", false);
284 			process_detach_node(pcp_frontend, buf, tos);
285 			break;
286 
287 		case 'C':			/* attach node */
288 			set_ps_display("PCP: processing attach node request", false);
289 			process_attach_node(pcp_frontend, buf);
290 			break;
291 
292 		case 'T':
293 			set_ps_display("PCP: processing shutdown request", false);
294 			process_shutown_request(pcp_frontend, buf[0]);
295 			break;
296 
297 		case 'O': /* recovery request */
298 			set_ps_display("PCP: processing recovery request", false);
299 			process_recovery_request(pcp_frontend, buf);
300 			break;
301 
302 		case 'B': /* status request*/
303 			set_ps_display("PCP: processing status request request", false);
304 			process_status_request(pcp_frontend);
305 			break;
306 
307 		case 'J':			/* promote node */
308 		case 'j':			/* promote node gracefully */
309 			set_ps_display("PCP: processing promote node request", false);
310 			process_promote_node(pcp_frontend,buf,tos);
311 			break;
312 
313 		case 'F':
314 			ereport(DEBUG1,
315 					(errmsg("PCP processing request, stop online recovery")));
316 			break;
317 
318 		case 'X':			/* disconnect */
319 			ereport(DEBUG1,
320 				(errmsg("PCP processing request, client disconnecting"),
321 					 errdetail("closing PCP connection, and exiting child")));
322 			pcp_close(pcp_frontend);
323 			pcp_frontend = NULL;
324 			/* This child has done its part. Rest in peace now */
325 			exit(0);
326 			break;
327 
328 		default:
329 			ereport(FATAL,
330 				(errmsg("PCP processing request"),
331 					 errdetail("unknown PCP packet type \"%c\"",tos)));
332 	}
333 }
334 
335 static RETSIGTYPE
die(int sig)336 die(int sig)
337 {
338 	ereport(DEBUG1,
339 			(errmsg("PCP worker child receives shutdown request signal %d", sig)));
340 	if(sig == SIGTERM)
341 	{
342 		ereport(DEBUG1,
343 			(errmsg("PCP worker child receives smart shutdown request."),
344 				errdetail("waiting for the child to die its natural death")));
345 	}
346 	else if (sig == SIGINT)
347 	{
348 		ereport(DEBUG1,
349 				(errmsg("PCP worker child receives fast shutdown request.")));
350 		exit(0);
351 	}
352 	else if (sig == SIGQUIT)
353 	{
354 		ereport(DEBUG1,
355 				(errmsg("PCP worker child receives immediate shutdown request.")));
356 		exit(0);
357 	}
358 	else
359 		exit(1);
360 }
361 
362 
363 static RETSIGTYPE
wakeup_handler_child(int sig)364 wakeup_handler_child(int sig)
365 {
366 	pcp_worker_wakeup_request = 1;
367 }
368 
369 /*
370  * unset non-block flag
371  */
372 static void
unset_nonblock(int fd)373 unset_nonblock(int fd)
374 {
375 	int var;
376 
377 	/* set fd to non-blocking */
378 	var = fcntl(fd, F_GETFL, 0);
379 	if (var == -1)
380 	{
381 		ereport(FATAL,
382 				(errmsg("unable to connect"),
383 				 errdetail("fcntl system call failed with error : \"%s\"",strerror(errno))));
384 
385 	}
386 	if (fcntl(fd, F_SETFL, var & ~O_NONBLOCK) == -1)
387 	{
388 		ereport(FATAL,
389 				(errmsg("unable to connect"),
390 				 errdetail("fcntl system call failed with error : \"%s\"",strerror(errno))));
391 	}
392 }
393 
394 /*
395  * see if received username and password matches with one in the file
396  */
397 static int
user_authenticate(char * buf,char * passwd_file,char * salt,int salt_len)398 user_authenticate(char *buf, char *passwd_file, char *salt, int salt_len)
399 {
400 	FILE *fp = NULL;
401 	char packet_username[MAX_USER_PASSWD_LEN+1];
402 	char packet_password[MAX_USER_PASSWD_LEN+1];
403 	char encrypt_buf[(MD5_PASSWD_LEN+1)*2];
404 	char file_username[MAX_USER_PASSWD_LEN+1];
405 	char file_password[MAX_USER_PASSWD_LEN+1];
406 	char *index = NULL;
407 	static char line[MAX_FILE_LINE_LEN+1];
408 	int i, len;
409 
410 	/* strcpy() should be OK, but use strncpy() to be extra careful */
411 	strncpy(packet_username, buf, MAX_USER_PASSWD_LEN);
412 	index = (char *) memchr(buf, '\0', MAX_USER_PASSWD_LEN);
413 	if (index == NULL)
414 	{
415 		ereport(FATAL,
416 			(errmsg("failed to authenticate PCP user"),
417 				 errdetail("error while reading authentication packet")));
418 		return 0;
419 	}
420 	strncpy(packet_password, ++index, MAX_USER_PASSWD_LEN);
421 
422 	fp = fopen(passwd_file, "r");
423 	if (fp == NULL)
424 	{
425 		ereport(FATAL,
426 				(errmsg("failed to authenticate PCP user"),
427 				 errdetail("could not open %s. reason: %s", passwd_file, strerror(errno))));
428 		return 0;
429 	}
430 
431 	/* for now, I don't care if duplicate username exists in the config file */
432 	while ((fgets(line, MAX_FILE_LINE_LEN, fp)) != NULL)
433 	{
434 		i = 0;
435 		len = 0;
436 
437 		if (line[0] == '\n' || line[0] == '#')
438 			continue;
439 
440 		while (line[i] != ':')
441 		{
442 			len++;
443 			if (++i > MAX_USER_PASSWD_LEN)
444 			{
445 				fclose(fp);
446 				ereport(FATAL,
447 					(errmsg("failed to authenticate PCP user"),
448 						 errdetail("username read from file \"%s\" is larger than maximum allowed username length [%d]", passwd_file, MAX_USER_PASSWD_LEN)));
449 				return 0;
450 			}
451 		}
452 		memcpy(file_username, line, len);
453 		file_username[len] = '\0';
454 
455 		if (strcmp(packet_username, file_username) != 0)
456 			continue;
457 
458 		i++;
459 		len = 0;
460 		while (line[i] != '\n' && line[i] != '\0')
461 		{
462 			len++;
463 			if (++i > MAX_USER_PASSWD_LEN)
464 			{
465 				fclose(fp);
466 				ereport(FATAL,
467 					(errmsg("failed to authenticate PCP user"),
468 						 errdetail("password read from file \"%s\" is larger than maximum allowed password length [%d]", passwd_file, MAX_USER_PASSWD_LEN)));
469 				return 0;
470 			}
471 		}
472 
473 		memcpy(file_password, line+strlen(file_username)+1, len);
474 		file_password[len] = '\0';
475 
476 		pool_md5_encrypt(file_password, file_username, strlen(file_username),
477 					  encrypt_buf + MD5_PASSWD_LEN + 1);
478 		encrypt_buf[(MD5_PASSWD_LEN+1)*2-1] = '\0';
479 
480 		pool_md5_encrypt(encrypt_buf+MD5_PASSWD_LEN+1, salt, salt_len,
481 					  encrypt_buf);
482 		encrypt_buf[MD5_PASSWD_LEN] = '\0';
483 
484 		if (strcmp(encrypt_buf, packet_password) == 0)
485 		{
486 			fclose(fp);
487 			return 1;
488 		}
489 	}
490 	fclose(fp);
491 	ereport(FATAL,
492 		(errmsg("authentication failed for user \"%s\"",packet_username),
493 			 errdetail("username and/or password does not match")));
494 
495 	return 0;
496 }
497 
498 
499 /* Detach a node */
pool_detach_node(int node_id,bool gracefully)500 static int pool_detach_node(int node_id, bool gracefully)
501 {
502 	if (!gracefully)
503 	{
504 		degenerate_backend_set_ex(&node_id, 1, REQ_DETAIL_SWITCHOVER | REQ_DETAIL_CONFIRMED, true, false);
505 		return 0;
506 	}
507 
508 	/* Check if the NODE DOWN can be executed on
509 	 * the given node id.
510 	 */
511 	degenerate_backend_set_ex(&node_id, 1, REQ_DETAIL_SWITCHOVER | REQ_DETAIL_CONFIRMED, true, true);
512 
513 	/*
514 	 * Wait until all frontends exit
515 	 */
516 	*InRecovery = RECOVERY_DETACH;	/* This wiil ensure that new incoming
517 									 * connection requests are blocked */
518 
519 	if (wait_connection_closed())
520 	{
521 		/* wait timed out */
522 		finish_recovery();
523 		return -1;
524 	}
525 
526 	pcp_worker_wakeup_request = 0;
527 
528 	/*
529 	 * Now all frontends have gone. Let's do failover.
530 	 */
531 	degenerate_backend_set_ex(&node_id, 1, REQ_DETAIL_SWITCHOVER | REQ_DETAIL_CONFIRMED, false, false);
532 
533 	/*
534 	 * Wait for failover completed.
535 	 */
536 
537 	while (!pcp_worker_wakeup_request)
538 	{
539 		struct timeval t = {1, 0};
540 		select(0, NULL, NULL, NULL, &t);
541 	}
542 	pcp_worker_wakeup_request = 0;
543 
544 	/*
545 	 * Start to accept incoming connections and send SIGUSR2 to pgpool
546 	 * parent to distribute SIGUSR2 all pgpool children.
547 	 */
548 	finish_recovery();
549 
550 	return 0;
551 }
552 
553 /* Promote a node */
pool_promote_node(int node_id,bool gracefully)554 static int pool_promote_node(int node_id, bool gracefully)
555 {
556 	if (!gracefully)
557 	{
558 		promote_backend(node_id, REQ_DETAIL_CONFIRMED);	/* send promote request */
559 		return 0;
560 	}
561 
562 	/*
563 	 * Wait until all frontends exit
564 	 */
565 	*InRecovery = RECOVERY_PROMOTE;	/* This wiil ensure that new incoming
566 									 * connection requests are blocked */
567 
568 	if (wait_connection_closed())
569 	{
570 		/* wait timed out */
571 		finish_recovery();
572 		return -1;
573 	}
574 
575 	/*
576 	 * Now all frontends have gone. Let's do failover.
577 	 */
578 	promote_backend(node_id, REQ_DETAIL_CONFIRMED);		/* send promote request */
579 
580 	/*
581 	 * Wait for failover completed.
582 	 */
583 	pcp_worker_wakeup_request = 0;
584 
585 	while (!pcp_worker_wakeup_request)
586 	{
587 		struct timeval t = {1, 0};
588 		select(0, NULL, NULL, NULL, &t);
589 	}
590 	pcp_worker_wakeup_request = 0;
591 
592 	/*
593 	 * Start to accept incoming connections and send SIGUSR2 to pgpool
594 	 * parent to distribute SIGUSR2 all pgpool children.
595 	 */
596 	finish_recovery();
597 	return 0;
598 }
599 
600 static void
inform_process_count(PCP_CONNECTION * frontend)601 inform_process_count(PCP_CONNECTION *frontend)
602 {
603 	int wsize;
604 	int process_count;
605 	char process_count_str[16];
606 	int *process_list = NULL;
607 	char code[] = "CommandComplete";
608 	char *mesg = NULL;
609 	int i;
610 	int total_port_len = 0;
611 
612 	process_list = pool_get_process_list(&process_count);
613 
614 	mesg = (char *)palloc(7*process_count); /* PID is at most 6 characters long */
615 
616 	snprintf(process_count_str, sizeof(process_count_str), "%d", process_count);
617 
618 	for (i = 0; i < process_count; i++)
619 	{
620 		char process_id[7];
621 		snprintf(process_id, sizeof(process_id), "%d", process_list[i]);
622 		snprintf(mesg+total_port_len, strlen(process_id)+1, "%s", process_id);
623 		total_port_len += strlen(process_id)+1;
624 	}
625 
626 	pcp_write(frontend, "n", 1);
627 	wsize = htonl(sizeof(code) +
628 				  strlen(process_count_str)+1 +
629 				  total_port_len +
630 				  sizeof(int));
631 	pcp_write(frontend, &wsize, sizeof(int));
632 	pcp_write(frontend, code, sizeof(code));
633 	pcp_write(frontend, process_count_str, strlen(process_count_str)+1);
634 	pcp_write(frontend, mesg, total_port_len);
635 	do_pcp_flush(frontend);
636 
637 	pfree(process_list);
638 	pfree(mesg);
639 
640 	ereport(DEBUG1,
641 		(errmsg("PCP: informing process count"),
642 			 errdetail("%d process(es) found", process_count)));
643 }
644 
645 static void
inform_process_info(PCP_CONNECTION * frontend,char * buf)646 inform_process_info(PCP_CONNECTION *frontend,char *buf)
647 {
648 	int proc_id;
649 	int wsize;
650 	int num_proc = pool_config->num_init_children;
651 	int i;
652 
653 	proc_id = atoi(buf);
654 
655 	if ((proc_id != 0) && (pool_get_process_info(proc_id) == NULL))
656 	{
657 		ereport(ERROR,
658 			(errmsg("informing process info failed"),
659 				 errdetail("invalid process ID : %s",buf)));
660 	}
661 	else
662 	{
663 		/* First, send array size of connection_info */
664 		char arr_code[] = "ArraySize";
665 		char con_info_size[16];
666 		/* Finally, indicate that all data is sent */
667 		char fin_code[] = "CommandComplete";
668 
669 		POOL_REPORT_POOLS *pools = get_pools(&num_proc);
670 
671 		if (proc_id == 0)
672 		{
673 			snprintf(con_info_size, sizeof(con_info_size), "%d", num_proc);
674 		}
675 		else
676 		{
677 			snprintf(con_info_size, sizeof(con_info_size), "%d", pool_config->max_pool * NUM_BACKENDS);
678 		}
679 
680 		pcp_write(frontend, "p", 1);
681 		wsize = htonl(sizeof(arr_code) +
682 					  strlen(con_info_size)+1 +
683 					  sizeof(int));
684 		pcp_write(frontend, &wsize, sizeof(int));
685 		pcp_write(frontend, arr_code, sizeof(arr_code));
686 		pcp_write(frontend, con_info_size, strlen(con_info_size)+1);
687 		do_pcp_flush(frontend);
688 
689 		/* Second, send process information for all connection_info */
690 		for (i=0; i<num_proc; i++)
691 		{
692 			char code[] = "ProcessInfo";
693 			char proc_pid[16];
694 			char proc_start_time[20];
695 			char proc_create_time[20];
696 			char majorversion[5];
697 			char minorversion[5];
698 			char pool_counter[16];
699 			char backend_id[16];
700 			char backend_pid[16];
701 			char connected[2];
702 
703 			if (proc_id != 0 && proc_id != pools[i].pool_pid) continue;
704 
705 			snprintf(proc_pid, sizeof(proc_pid), "%d", pools[i].pool_pid);
706 			snprintf(proc_start_time, sizeof(proc_start_time), "%ld", pools[i].start_time);
707 			snprintf(proc_create_time, sizeof(proc_create_time), "%ld", pools[i].create_time);
708 			snprintf(majorversion, sizeof(majorversion), "%d", pools[i].pool_majorversion);
709 			snprintf(minorversion, sizeof(minorversion), "%d", pools[i].pool_minorversion);
710 			snprintf(pool_counter, sizeof(pool_counter), "%d", pools[i].pool_counter);
711 			snprintf(backend_id, sizeof(backend_pid), "%d", pools[i].backend_id);
712 			snprintf(backend_pid, sizeof(backend_pid), "%d", pools[i].pool_backendpid);
713 			snprintf(connected, sizeof(connected), "%d", pools[i].pool_connected);
714 
715 			pcp_write(frontend, "p", 1);
716 			wsize = htonl(	sizeof(code) +
717 						  strlen(proc_pid)+1 +
718 						  strlen(pools[i].database)+1 +
719 						  strlen(pools[i].username)+1 +
720 						  strlen(proc_start_time)+1 +
721 						  strlen(proc_create_time)+1 +
722 						  strlen(majorversion)+1 +
723 						  strlen(minorversion)+1 +
724 						  strlen(pool_counter)+1 +
725 						  strlen(backend_id)+1 +
726 						  strlen(backend_pid)+1 +
727 						  strlen(connected)+1 +
728 						  sizeof(int));
729 			pcp_write(frontend, &wsize, sizeof(int));
730 			pcp_write(frontend, code, sizeof(code));
731 			pcp_write(frontend, proc_pid, strlen(proc_pid)+1);
732 			pcp_write(frontend, pools[i].database, strlen(pools[i].database)+1);
733 			pcp_write(frontend, pools[i].username, strlen(pools[i].username)+1);
734 			pcp_write(frontend, proc_start_time, strlen(proc_start_time)+1);
735 			pcp_write(frontend, proc_create_time, strlen(proc_create_time)+1);
736 			pcp_write(frontend, majorversion, strlen(majorversion)+1);
737 			pcp_write(frontend, minorversion, strlen(minorversion)+1);
738 			pcp_write(frontend, pool_counter, strlen(pool_counter)+1);
739 			pcp_write(frontend, backend_id, strlen(backend_id)+1);
740 			pcp_write(frontend, backend_pid, strlen(backend_pid)+1);
741 			pcp_write(frontend, connected, strlen(connected)+1);
742 			do_pcp_flush(frontend);
743 		}
744 
745 		pcp_write(frontend, "p", 1);
746 		wsize = htonl(sizeof(fin_code) +
747 					  sizeof(int));
748 		pcp_write(frontend, &wsize, sizeof(int));
749 		pcp_write(frontend, fin_code, sizeof(fin_code));
750 		do_pcp_flush(frontend);
751 		ereport(DEBUG1,
752 				(errmsg("PCP informing process info"),
753 				 errdetail("retrieved process information from shared memory")));
754 
755 		pfree(pools);
756 	}
757 }
758 
759 static void
inform_watchdog_info(PCP_CONNECTION * frontend,char * buf)760 inform_watchdog_info(PCP_CONNECTION *frontend,char *buf)
761 {
762 	int wd_index;
763 	int json_data_len;
764 	int wsize;
765 	char code[] = "CommandComplete";
766 	char* json_data;
767 
768 	if (!pool_config->use_watchdog)
769 		ereport(ERROR,
770 			(errmsg("PCP: informing watchdog info failed"),
771 				 errdetail("watcdhog is not enabled")));
772 
773 	wd_index = atoi(buf);
774 
775 	json_data = wd_get_watchdog_nodes(wd_index);
776 	if (json_data == NULL)
777 		ereport(ERROR,
778 			(errmsg("PCP: informing watchdog info failed"),
779 				 errdetail("invalid watchdog index")));
780 
781 	ereport(DEBUG2,
782 		(errmsg("PCP: informing watchdog info"),
783 			 errdetail("retrieved node information from IPC socket")));
784 
785 	/*
786 	 * This is the voilation of PCP protocol but I think
787 	 * in future we should shift to more adaptable protocol for
788 	 * data transmition.
789 	 */
790 	json_data_len = strlen(json_data);
791 	wsize = htonl(sizeof(code) +
792 				  json_data_len+ 1 +
793 				  sizeof(int));
794 	pcp_write(frontend, "w", 1);
795 
796 	pcp_write(frontend, &wsize, sizeof(int));
797 	pcp_write(frontend, code, sizeof(code));
798 
799 	pcp_write(frontend, json_data, json_data_len +1);
800 	do_pcp_flush(frontend);
801 
802 	pfree(json_data);
803 }
804 
805 static void
inform_node_info(PCP_CONNECTION * frontend,char * buf)806 inform_node_info(PCP_CONNECTION *frontend,char *buf)
807 {
808 	int node_id;
809 	int wsize;
810 	char port_str[6];
811 	char status[2];
812 	char weight_str[20];
813 	char role_str[10];
814 	char code[] = "CommandComplete";
815 	BackendInfo *bi = NULL;
816 	SERVER_ROLE role;
817 
818 	node_id = atoi(buf);
819 
820 	bi = pool_get_node_info(node_id);
821 
822 	if (bi == NULL)
823 		ereport(ERROR,
824 				(errmsg("informing node info failed"),
825 				 errdetail("invalid node ID")));
826 
827 	ereport(DEBUG2,
828 			(errmsg("PCP: informing node info"),
829 			 errdetail("retrieved node information from shared memory")));
830 
831 	snprintf(port_str, sizeof(port_str), "%d", bi->backend_port);
832 	snprintf(status, sizeof(status), "%d", bi->backend_status);
833 	snprintf(weight_str, sizeof(weight_str), "%f", bi->backend_weight);
834 
835 	if (STREAM)
836 	{
837 		if (Req_info->primary_node_id == node_id)
838 			role = ROLE_PRIMARY;
839 		else
840 			role = ROLE_STANDBY;
841 	}
842 	else
843 	{
844 		if (Req_info->master_node_id == node_id)
845 			role = ROLE_MASTER;
846 		else
847 			role = ROLE_SLAVE;
848 	}
849 	snprintf(role_str, sizeof(role_str), "%d", role);
850 
851 	pcp_write(frontend, "i", 1);
852 	wsize = htonl(sizeof(code) +
853 				  strlen(bi->backend_hostname)+1 +
854 				  strlen(port_str)+1 +
855 				  strlen(status)+1 +
856 				  strlen(weight_str)+1 +
857 				  strlen(role_str)+1 +
858 				  sizeof(int));
859 	pcp_write(frontend, &wsize, sizeof(int));
860 	pcp_write(frontend, code, sizeof(code));
861 	pcp_write(frontend, bi->backend_hostname, strlen(bi->backend_hostname)+1);
862 	pcp_write(frontend, port_str, strlen(port_str)+1);
863 	pcp_write(frontend, status, strlen(status)+1);
864 	pcp_write(frontend, weight_str, strlen(weight_str)+1);
865 
866 	pcp_write(frontend, role_str, strlen(role_str)+1);
867 	do_pcp_flush(frontend);
868 }
869 
870 static void
inform_node_count(PCP_CONNECTION * frontend)871 inform_node_count(PCP_CONNECTION *frontend)
872 {
873 	int wsize;
874 	char mesg[16];
875 	char code[] = "CommandComplete";
876 	int node_count = pool_get_node_count();
877 
878 	snprintf(mesg, sizeof(mesg), "%d", node_count);
879 
880 	pcp_write(frontend, "l", 1);
881 	wsize = htonl(sizeof(code) +
882 				  strlen(mesg)+1 +
883 				  sizeof(int));
884 	pcp_write(frontend, &wsize, sizeof(int));
885 	pcp_write(frontend, code, sizeof(code));
886 	pcp_write(frontend, mesg, strlen(mesg)+1);
887 	do_pcp_flush(frontend);
888 
889 	ereport(DEBUG1,
890 			(errmsg("PCP: informing node count"),
891 			 errdetail("%d node(s) found", node_count)));
892 }
893 
894 static void
process_detach_node(PCP_CONNECTION * frontend,char * buf,char tos)895 process_detach_node(PCP_CONNECTION *frontend,char *buf, char tos)
896 {
897 	int node_id;
898 	int wsize;
899 	char code[] = "CommandComplete";
900 	bool gracefully;
901 
902 	if (tos == 'D')
903 		gracefully = false;
904 	else
905 		gracefully = true;
906 
907 	node_id = atoi(buf);
908 	ereport(DEBUG1,
909 			(errmsg("PCP: processing detach node"),
910 			 errdetail("detaching Node ID %d", node_id)));
911 
912 	pool_detach_node(node_id, gracefully);
913 
914 	pcp_write(frontend, "d", 1);
915 	wsize = htonl(sizeof(code) + sizeof(int));
916 	pcp_write(frontend, &wsize, sizeof(int));
917 	pcp_write(frontend, code, sizeof(code));
918 	do_pcp_flush(frontend);
919 }
920 
921 static void
process_attach_node(PCP_CONNECTION * frontend,char * buf)922 process_attach_node(PCP_CONNECTION *frontend,char *buf)
923 {
924 	int node_id;
925 	int wsize;
926 	char code[] = "CommandComplete";
927 
928 	node_id = atoi(buf);
929 	ereport(DEBUG1,
930 			(errmsg("PCP: processing attach node"),
931 			 errdetail("attaching Node ID %d", node_id)));
932 
933 	send_failback_request(node_id,true, REQ_DETAIL_CONFIRMED);
934 
935 	pcp_write(frontend, "c", 1);
936 	wsize = htonl(sizeof(code) + sizeof(int));
937 	pcp_write(frontend, &wsize, sizeof(int));
938 	pcp_write(frontend, code, sizeof(code));
939 	do_pcp_flush(frontend);
940 }
941 
942 
943 static void
process_recovery_request(PCP_CONNECTION * frontend,char * buf)944 process_recovery_request(PCP_CONNECTION *frontend,char *buf)
945 {
946 	int wsize;
947 	char code[] = "CommandComplete";
948 	int node_id = atoi(buf);
949 
950 	if ( (node_id < 0) || (node_id >= pool_config->backend_desc->num_backends) )
951 		ereport(ERROR,
952 			(errmsg("process recovery request failed"),
953 				 errdetail("node id %d is not valid", node_id)));
954 
955 	if ((!REPLICATION &&
956 		 !(MASTER_SLAVE &&
957 		   pool_config->master_slave_sub_mode == STREAM_MODE)) ||
958 		(MASTER_SLAVE &&
959 		 pool_config->master_slave_sub_mode == STREAM_MODE &&
960 		 node_id == PRIMARY_NODE_ID))
961 	{
962 		if (MASTER_SLAVE && pool_config->master_slave_sub_mode == STREAM_MODE)
963 			ereport(ERROR,
964 				(errmsg("process recovery request failed"),
965 					 errdetail("primary server cannot be recovered by online recovery.")));
966 		else
967 			ereport(ERROR,
968 				(errmsg("process recovery request failed"),
969 					 errdetail("recovery request is only allowed in replication and streaming replication modes.")));
970 	}
971 	else
972 	{
973 		if (pcp_mark_recovery_in_progress() == false)
974 			ereport(FATAL,
975 				(errmsg("process recovery request failed"),
976 					 errdetail("pgpool-II is already processing another recovery request.")));
977 
978 		ereport(DEBUG1,
979 			(errmsg("PCP: processing recovery request"),
980 				 errdetail("start online recovery")));
981 
982 		PG_TRY();
983 		{
984 			start_recovery(node_id);
985 			finish_recovery();
986 			pcp_write(frontend, "c", 1);
987 			wsize = htonl(sizeof(code) + sizeof(int));
988 			pcp_write(frontend, &wsize, sizeof(int));
989 			pcp_write(frontend, code, sizeof(code));
990 			do_pcp_flush(frontend);
991 			pcp_mark_recovery_finished();
992 		}
993 		PG_CATCH();
994 		{
995 			finish_recovery();
996 			pcp_mark_recovery_finished();
997 			PG_RE_THROW();
998 
999 		}PG_END_TRY();
1000 	}
1001 	do_pcp_flush(frontend);
1002 }
1003 
1004 static void
process_status_request(PCP_CONNECTION * frontend)1005 process_status_request(PCP_CONNECTION *frontend)
1006 {
1007 	int nrows = 0;
1008 	int i;
1009 	POOL_REPORT_CONFIG *status = get_config(&nrows);
1010 	int len = 0;
1011 	/* First, send array size of connection_info */
1012 	char arr_code[] = "ArraySize";
1013 	char code[] = "ProcessConfig";
1014 	/* Finally, indicate that all data is sent */
1015 	char fin_code[] = "CommandComplete";
1016 
1017 	pcp_write(frontend, "b", 1);
1018 	len = htonl(sizeof(arr_code) + sizeof(int) + sizeof(int));
1019 	pcp_write(frontend, &len, sizeof(int));
1020 	pcp_write(frontend, arr_code, sizeof(arr_code));
1021 	len = htonl(nrows);
1022 	pcp_write(frontend, &len, sizeof(int));
1023 
1024 	do_pcp_flush(frontend);
1025 
1026 	for (i = 0; i < nrows; i++)
1027 	{
1028 		pcp_write(frontend, "b", 1);
1029 		len = htonl(sizeof(int)
1030 					+ sizeof(code)
1031 					+ strlen(status[i].name) + 1
1032 					+ strlen(status[i].value) + 1
1033 					+ strlen(status[i].desc) + 1
1034 					);
1035 
1036 		pcp_write(frontend, &len, sizeof(int));
1037 		pcp_write(frontend, code, sizeof(code));
1038 		pcp_write(frontend, status[i].name, strlen(status[i].name)+1);
1039 		pcp_write(frontend, status[i].value, strlen(status[i].value)+1);
1040 		pcp_write(frontend, status[i].desc, strlen(status[i].desc)+1);
1041 	}
1042 
1043 	pcp_write(frontend, "b", 1);
1044 	len = htonl(sizeof(fin_code) + sizeof(int));
1045 	pcp_write(frontend, &len, sizeof(int));
1046 	pcp_write(frontend, fin_code, sizeof(fin_code));
1047 	do_pcp_flush(frontend);
1048 
1049 	pfree(status);
1050 	ereport(DEBUG1,
1051 			(errmsg("PCP: processing status request"),
1052 			 errdetail("retrieved status information")));
1053 }
1054 
1055 static void
process_promote_node(PCP_CONNECTION * frontend,char * buf,char tos)1056 process_promote_node(PCP_CONNECTION *frontend, char *buf, char tos)
1057 {
1058 	int node_id;
1059 	int wsize;
1060 	char code[] = "CommandComplete";
1061 	bool gracefully;
1062 
1063 	if (tos == 'J')
1064 		gracefully = false;
1065 	else
1066 		gracefully = true;
1067 
1068 	node_id = atoi(buf);
1069 	if ( (node_id < 0) || (node_id >= pool_config->backend_desc->num_backends) )
1070 		ereport(ERROR,
1071 				(errmsg("could not process recovery request"),
1072 				 errdetail("node id %d is not valid", node_id)));
1073 	/* promoting node is reserved to Streaming Replication */
1074 	if (!MASTER_SLAVE || pool_config->master_slave_sub_mode != STREAM_MODE)
1075 	{
1076 		ereport(FATAL,
1077 			(errmsg("invalid pgpool mode for process recovery request"),
1078 				 errdetail("not in streaming replication mode, can't promote node id %d", node_id)));
1079 
1080 	}
1081 
1082 	if (node_id == REAL_PRIMARY_NODE_ID)
1083 	{
1084 		ereport(FATAL,
1085 				(errmsg("invalid pgpool mode for process recovery request"),
1086 				 errdetail("specified node is already primary node, can't promote node id %d", node_id)));
1087 
1088 	}
1089 	ereport(DEBUG1,
1090 			(errmsg("PCP: processing promote node"),
1091 			 errdetail("promoting Node ID %d", node_id)));
1092 	pool_promote_node(node_id, gracefully);
1093 
1094 	pcp_write(frontend, "d", 1);
1095 	wsize = htonl(sizeof(code) + sizeof(int));
1096 	pcp_write(frontend, &wsize, sizeof(int));
1097 	pcp_write(frontend, code, sizeof(code));
1098 	do_pcp_flush(frontend);
1099 }
1100 
1101 static void
process_authentication(PCP_CONNECTION * frontend,char * buf,char * salt,int * random_salt)1102 process_authentication(PCP_CONNECTION *frontend, char *buf, char* salt, int *random_salt)
1103 {
1104 	int wsize;
1105 	int authenticated;
1106 
1107 	if (*random_salt)
1108 	{
1109 		authenticated = user_authenticate(buf, pcp_conf_file, salt, 4);
1110 	}
1111 	if (!*random_salt || !authenticated)
1112 	{
1113 		ereport(FATAL,
1114 			(errmsg("authentication failed"),
1115 				 errdetail("username and/or password does not match")));
1116 
1117 		*random_salt = 0;
1118 	}
1119 	else
1120 	{
1121 		char code[] = "AuthenticationOK";
1122 		pcp_write(frontend, "r", 1);
1123 		wsize = htonl(sizeof(code) + sizeof(int));
1124 		pcp_write(frontend, &wsize, sizeof(int));
1125 		pcp_write(frontend, code, sizeof(code));
1126 		do_pcp_flush(frontend);
1127 		*random_salt = 0;
1128 
1129 		ereport(DEBUG1,
1130 			(errmsg("PCP: processing authentication request"),
1131 				 errdetail("authentication OK")));
1132 	}
1133 }
1134 
1135 static void
send_md5salt(PCP_CONNECTION * frontend,char * salt)1136 send_md5salt(PCP_CONNECTION *frontend, char* salt)
1137 {
1138 	int wsize;
1139 	ereport(DEBUG1,
1140 			(errmsg("PCP: sending md5 salt to client")));
1141 
1142 	pool_random_salt(salt);
1143 
1144 	pcp_write(frontend, "m", 1);
1145 	wsize = htonl(sizeof(int) + 4);
1146 	pcp_write(frontend, &wsize, sizeof(int));
1147 	pcp_write(frontend, salt, 4);
1148 	do_pcp_flush(frontend);
1149 }
1150 
1151 static void
process_shutown_request(PCP_CONNECTION * frontend,char mode)1152 process_shutown_request(PCP_CONNECTION *frontend, char mode)
1153 {
1154 	char code[] = "CommandComplete";
1155 	pid_t ppid = getppid();
1156 	int sig,len;
1157 
1158 	if (mode == 's')
1159 	{
1160 		ereport(DEBUG1,
1161 				(errmsg("PCP: processing shutdown request"),
1162 				 errdetail("sending SIGTERM to the parent process with PID:%d", ppid)));
1163 		sig = SIGTERM;
1164 	}
1165 	else if (mode == 'f')
1166 	{
1167 		ereport(DEBUG1,
1168 				(errmsg("PCP: processing shutdown request"),
1169 				 errdetail("sending SIGINT to the parent process with PID:%d", ppid)));
1170 		sig = SIGINT;
1171 	}
1172 	else if (mode == 'i')
1173 	{
1174 		ereport(DEBUG1,
1175 				(errmsg("PCP: processing shutdown request"),
1176 				 errdetail("sending SIGQUIT to the parent process with PID:%d", ppid)));
1177 		sig = SIGQUIT;
1178 	}
1179 	else
1180 	{
1181 		ereport(ERROR,
1182 				(errmsg("PCP: error while processing shutdown request"),
1183 				 errdetail("invalid shutdown mode \"%c\"", mode)));
1184 	}
1185 
1186 	pcp_write(frontend, "t", 1);
1187 	len = htonl(sizeof(code) + sizeof(int));
1188 	pcp_write(frontend, &len, sizeof(int));
1189 	pcp_write(frontend, code, sizeof(code));
1190 	do_pcp_flush(frontend);
1191 
1192 	pool_signal_parent(sig);
1193 }
1194 
1195 static void
process_set_configration_parameter(PCP_CONNECTION * frontend,char * buf,int len)1196 process_set_configration_parameter(PCP_CONNECTION *frontend,char *buf, int len)
1197 {
1198 	char* param_name;
1199 	char* param_value;
1200 	int wsize;
1201 	char code[] = "CommandComplete";
1202 
1203 	param_name = buf;
1204 	if(param_name == NULL)
1205 		ereport(ERROR,
1206 				(errmsg("PCP: set configuration parameter failed"),
1207 				 errdetail("invalid pcp packet received from client")));
1208 
1209 	param_value = (char *) memchr(buf, '\0', len);
1210 	if(param_value == NULL)
1211 		ereport(ERROR,
1212 			(errmsg("set configuration parameter failed"),
1213 				 errdetail("invalid pcp packet received from client")));
1214 
1215 	param_value +=1;
1216 	ereport(LOG,
1217 			(errmsg("set configuration parameter, \"%s TO %s\"",param_name,param_value)));
1218 
1219 	if(strcasecmp(param_name, "client_min_messages") == 0)
1220 	{
1221 		const char *ordered_valid_values[] = {"debug5","debug4","debug3","debug2","debug1","log","commerror","info","notice","warning","error",NULL};
1222 		bool found = false;
1223 		int i;
1224 		for(i=0; ; i++)
1225 		{
1226 			char* valid_val = (char*)ordered_valid_values[i];
1227 			if(!valid_val)
1228 				break;
1229 
1230 			if (!strcasecmp(param_value, valid_val))
1231 			{
1232 				found = true;
1233 				pool_config->client_min_messages = i + 10;
1234 				ereport(DEBUG1,
1235 					(errmsg("PCP setting parameter \"%s\" to \"%s\"",param_name,param_value)));
1236 				break;
1237 			}
1238 		}
1239 		if (!found)
1240 			ereport(ERROR,
1241 				(errmsg("PCP: set configuration parameter failed"),
1242 					 errdetail("invalid value \"%s\" for parameter \"%s\"",param_value,param_name)));
1243 	}
1244 	else
1245 		ereport(ERROR,
1246 			(errmsg("PCP: set configuration parameter failed"),
1247 				 errdetail("invalid parameter \"%s\"",param_name)));
1248 
1249 	pcp_write(frontend, "a", 1);
1250 	wsize = htonl(sizeof(code) + sizeof(int));
1251 	pcp_write(frontend, &wsize, sizeof(int));
1252 	pcp_write(frontend, code, sizeof(code));
1253 	do_pcp_flush(frontend);
1254 }
1255 /*
1256  * Wrapper around pcp_flush which throws FATAL error when pcp_flush fails
1257  */
1258 static void
do_pcp_flush(PCP_CONNECTION * frontend)1259 do_pcp_flush(PCP_CONNECTION *frontend)
1260 {
1261 	if (pcp_flush(frontend) < 0)
1262 		ereport(FATAL,
1263 			(errmsg("failed to flush data to client"),
1264 				 errdetail("pcp_flush failed with error : \"%s\"",strerror(errno))));
1265 }
1266 
1267 /*
1268  * Wrapper around pcp_read which throws FATAL error when read fails
1269  */
1270 static void
do_pcp_read(PCP_CONNECTION * pc,void * buf,int len)1271 do_pcp_read(PCP_CONNECTION *pc, void *buf, int len)
1272 {
1273 	if (pcp_read(pc,buf,len))
1274 		ereport(FATAL,
1275 				(errmsg("unable to read from client"),
1276 					errdetail("pcp_read failed with error : \"%s\"",strerror(errno))));
1277 }
1278 
send_to_pcp_frontend(char * data,int len,bool flush)1279 int send_to_pcp_frontend(char* data, int len, bool flush)
1280 {
1281 	int ret;
1282 	if (processType != PT_PCP_WORKER || pcp_frontend == NULL)
1283 		return -1;
1284 	ret = pcp_write(pcp_frontend, data, len);
1285 	if (flush && !ret)
1286 		ret = pcp_flush(pcp_frontend);
1287 	return ret;
1288 }
1289 
pcp_frontend_exists(void)1290 int pcp_frontend_exists(void)
1291 {
1292 	if (processType != PT_PCP_WORKER || pcp_frontend == NULL)
1293 		return -1;
1294 	return 0;
1295 }
1296 
/*
 * Cleanup performed when the PCP worker is going down: close the frontend
 * connection if one is set, mark the process state as EXITING and apply the
 * UnBlockSig signal mask.
 *
 * Neither code nor arg is used.  When invoked from a process that is not a
 * PCP worker, only a WARNING is logged.
 * NOTE(review): the (int, Datum) signature suggests this is registered as an
 * exit callback - confirm at the registration site.
 */
static void
pcp_worker_will_go_down(int code, Datum arg)
{
	if (processType == PT_PCP_WORKER)
	{
		if (pcp_frontend != NULL)
			pcp_close(pcp_frontend);

		processState = EXITING;
		POOL_SETMASK(&UnBlockSig);
	}
	else
	{
		/* should never happen */
		ereport(WARNING,
				(errmsg("pcp_worker_will_go_down called from invalid process")));
	}
}
1312