1 /* -*-pcp_worker.c-*- */
2 /*
3  *
4  * pgpool: a language independent connection pool server for PostgreSQL
5  * written by Tatsuo Ishii
6  *
7  * Copyright (c) 2003-2019	PgPool Global Development Group
8  *
9  * Permission to use, copy, modify, and distribute this software and
10  * its documentation for any purpose and without fee is hereby
11  * granted, provided that the above copyright notice appear in all
12  * copies and that both that copyright notice and this permission
13  * notice appear in supporting documentation, and that the name of the
14  * author not be used in advertising or publicity pertaining to
15  * distribution of the software without specific, written prior
16  * permission. The author makes no representations about the
17  * suitability of this software for any purpose.  It is provided "as
18  * is" without express or implied warranty.
19  *
20  * pcp_worker.c: PCP worker child process main
21  *
22  */
23 
24 #include "config.h"
25 #include "pool.h"
26 #include "utils/palloc.h"
27 #include "utils/memutils.h"
28 
29 #include <arpa/inet.h>
30 #include <signal.h>
31 
32 #include <stdio.h>
33 #include <errno.h>
34 #include <string.h>
35 #include <unistd.h>
36 #include <stdlib.h>
37 #include <sys/time.h>
38 
39 #ifdef HAVE_FCNTL_H
40 #include <fcntl.h>
41 #endif
42 
43 #include "pcp/pcp_stream.h"
44 #include "pcp/pcp.h"
45 #include "auth/md5.h"
46 #include "pool_config.h"
47 #include "context/pool_process_context.h"
48 #include "utils/pool_process_reporting.h"
49 #include "watchdog/wd_json_data.h"
50 #include "watchdog/wd_ipc_commands.h"
51 #include "utils/elog.h"
52 
53 #define MAX_FILE_LINE_LEN    512
54 
55 extern char *pcp_conf_file;		/* global variable defined in main.c holds the
56 								 * path for pcp.conf */
57 volatile sig_atomic_t pcp_worker_wakeup_request = 0;
58 PCP_CONNECTION *volatile pcp_frontend = NULL;
59 
60 static RETSIGTYPE die(int sig);
61 static RETSIGTYPE wakeup_handler_child(int sig);
62 
63 static void unset_nonblock(int fd);
64 static int	user_authenticate(char *buf, char *passwd_file, char *salt, int salt_len);
65 static void process_authentication(PCP_CONNECTION * frontend, char *buf, char *salt, int *random_salt);
66 static void send_md5salt(PCP_CONNECTION * frontend, char *salt);
67 
68 static void pcp_process_command(char tos, char *buf, int buf_len);
69 
70 static int	pool_detach_node(int node_id, bool gracefully);
71 static int	pool_promote_node(int node_id, bool gracefully);
72 static void inform_process_count(PCP_CONNECTION * frontend);
73 static void inform_process_info(PCP_CONNECTION * frontend, char *buf);
74 static void inform_watchdog_info(PCP_CONNECTION * frontend, char *buf);
75 static void inform_node_info(PCP_CONNECTION * frontend, char *buf);
76 static void inform_node_count(PCP_CONNECTION * frontend);
77 static void process_detach_node(PCP_CONNECTION * frontend, char *buf, char tos);
78 static void process_attach_node(PCP_CONNECTION * frontend, char *buf);
79 static void process_recovery_request(PCP_CONNECTION * frontend, char *buf);
80 static void process_status_request(PCP_CONNECTION * frontend);
81 static void process_promote_node(PCP_CONNECTION * frontend, char *buf, char tos);
82 static void process_shutown_request(PCP_CONNECTION * frontend, char mode);
83 static void process_set_configration_parameter(PCP_CONNECTION * frontend, char *buf, int len);
84 
85 static void pcp_worker_will_go_down(int code, Datum arg);
86 
87 static void do_pcp_flush(PCP_CONNECTION * frontend);
88 static void do_pcp_read(PCP_CONNECTION * pc, void *buf, int len);
89 
90 /*
91  * main entry pont of pcp worker child process
92  */
93 void
pcp_worker_main(int port)94 pcp_worker_main(int port)
95 {
96 	sigjmp_buf	local_sigjmp_buf;
97 	MemoryContext PCPMemoryContext;
98 	volatile int authenticated = 0;
99 
100 	char		salt[4];
101 	int			random_salt = 0;
102 	struct timeval uptime;
103 	char		tos;
104 	int			rsize;
105 
106 	ereport(DEBUG1,
107 			(errmsg("I am PCP worker child with pid:%d", getpid())));
108 
109 	/* Identify myself via ps */
110 	init_ps_display("", "", "", "");
111 
112 	gettimeofday(&uptime, NULL);
113 	srandom((unsigned int) (getpid() ^ uptime.tv_usec));
114 
115 	/* set up signal handlers */
116 	signal(SIGTERM, die);
117 	signal(SIGINT, die);
118 	signal(SIGQUIT, die);
119 	signal(SIGCHLD, SIG_DFL);
120 	signal(SIGUSR2, wakeup_handler_child);
121 	signal(SIGUSR1, SIG_IGN);
122 	signal(SIGHUP, SIG_IGN);
123 	signal(SIGPIPE, SIG_IGN);
124 	signal(SIGALRM, SIG_IGN);
125 	/* Create per loop iteration memory context */
126 	PCPMemoryContext = AllocSetContextCreate(TopMemoryContext,
127 											 "PCP_worker_main_loop",
128 											 ALLOCSET_DEFAULT_MINSIZE,
129 											 ALLOCSET_DEFAULT_INITSIZE,
130 											 ALLOCSET_DEFAULT_MAXSIZE);
131 
132 	MemoryContextSwitchTo(TopMemoryContext);
133 
134 	/*
135 	 * install the call back for preparation of pcp worker child exit
136 	 */
137 	on_system_exit(pcp_worker_will_go_down, (Datum) NULL);
138 
139 	/* Initialize my backend status */
140 	pool_initialize_private_backend_status();
141 
142 	/* Initialize process context */
143 	pool_init_process_context();
144 
145 	pcp_frontend = pcp_open(port);
146 	unset_nonblock(pcp_frontend->fd);
147 
148 	if (sigsetjmp(local_sigjmp_buf, 1) != 0)
149 	{
150 		error_context_stack = NULL;
151 		EmitErrorReport();
152 
153 		MemoryContextSwitchTo(TopMemoryContext);
154 		FlushErrorState();
155 	}
156 	/* We can now handle ereport(ERROR) */
157 	PG_exception_stack = &local_sigjmp_buf;
158 
159 	for (;;)
160 	{
161 		char	*buf = NULL;
162 
163 		MemoryContextSwitchTo(PCPMemoryContext);
164 		MemoryContextResetAndDeleteChildren(PCPMemoryContext);
165 
166 		errno = 0;
167 
168 		/* read a PCP packet */
169 		do_pcp_read(pcp_frontend, &tos, 1);
170 		do_pcp_read(pcp_frontend, &rsize, sizeof(int));
171 
172 		rsize = ntohl(rsize);
173 
174 		if (rsize <= 0 || rsize >= MAX_PCP_PACKET_LENGTH)
175 			ereport(FATAL,
176 					(errmsg("invalid PCP packet"),
177 					 errdetail("incorrect packet length (%d)", rsize)));
178 
179 		if ((rsize - sizeof(int)) > 0)
180 		{
181 			buf = (char *) palloc(rsize - sizeof(int));
182 			do_pcp_read(pcp_frontend, buf, rsize - sizeof(int));
183 		}
184 
185 		ereport(DEBUG1,
186 				(errmsg("received PCP packet"),
187 				 errdetail("PCP packet type of service '%c'", tos)));
188 
189 		if (tos == 'R')			/* authentication */
190 		{
191 			set_ps_display("PCP: processing authentication", false);
192 			process_authentication(pcp_frontend, buf, salt, &random_salt);
193 			authenticated = 1;
194 			continue;
195 		}
196 		if (tos == 'M')			/* md5 salt */
197 		{
198 			set_ps_display("PCP: processing authentication", false);
199 			send_md5salt(pcp_frontend, salt);
200 			random_salt = 1;
201 			continue;
202 		}
203 		/* is this connection authenticated? if not disconnect immediately */
204 		if (!authenticated)
205 			ereport(FATAL,
206 					(errmsg("authentication failed for new PCP connection"),
207 					 errdetail("connection not authorized")));
208 
209 		/* process a request */
210 		pcp_process_command(tos, buf, rsize);
211 	}
212 	exit(0);
213 }
214 
215 
216 /* pcp command processor */
217 static void
pcp_process_command(char tos,char * buf,int buf_len)218 pcp_process_command(char tos, char *buf, int buf_len)
219 {
220 
221 	if (tos == 'O' || tos == 'T')
222 	{
223 		if (Req_info->switching)
224 		{
225 			if (Req_info->request_queue_tail != Req_info->request_queue_head)
226 			{
227 				POOL_REQUEST_KIND reqkind;
228 
229 				reqkind = Req_info->request[(Req_info->request_queue_head + 1) % MAX_REQUEST_QUEUE_SIZE].kind;
230 
231 				if (reqkind == NODE_UP_REQUEST)
232 					ereport(ERROR,
233 							(errmsg("failed to process PCP request at the moment"),
234 							 errdetail("failback is in progress")));
235 				else if (reqkind == NODE_DOWN_REQUEST)
236 					ereport(ERROR,
237 							(errmsg("failed to process PCP request at the moment"),
238 							 errdetail("failover is in progress")));
239 				else if (reqkind == PROMOTE_NODE_REQUEST)
240 					ereport(ERROR,
241 							(errmsg("failed to process PCP request at the moment"),
242 							 errdetail("promote node operation is in progress")));
243 
244 				ereport(ERROR,
245 						(errmsg("failed to process PCP request at the moment"),
246 						 errdetail("operation is in progress")));
247 			}
248 		}
249 	}
250 
251 	switch (tos)
252 	{
253 		case 'A':				/* set configuration parameter */
254 			set_ps_display("PCP: processing set configration parameter request", false);
255 			process_set_configration_parameter(pcp_frontend, buf, buf_len);
256 			break;
257 
258 		case 'L':				/* node count */
259 			set_ps_display("PCP: processing node count request", false);
260 			inform_node_count(pcp_frontend);
261 			break;
262 
263 		case 'I':				/* node info */
264 			set_ps_display("PCP: processing node info request", false);
265 			inform_node_info(pcp_frontend, buf);
266 			break;
267 
268 		case 'N':				/* process count */
269 			set_ps_display("PCP: processing process count request", false);
270 			inform_process_count(pcp_frontend);
271 			break;
272 
273 		case 'P':				/* process info */
274 			set_ps_display("PCP: processing process info request", false);
275 			inform_process_info(pcp_frontend, buf);
276 			break;
277 
278 		case 'W':				/* watchdog info */
279 			set_ps_display("PCP: processing watchdog info request", false);
280 			inform_watchdog_info(pcp_frontend, buf);
281 			break;
282 
283 		case 'D':				/* detach node */
284 		case 'd':				/* detach node gracefully */
285 			set_ps_display("PCP: processing detach node request", false);
286 			process_detach_node(pcp_frontend, buf, tos);
287 			break;
288 
289 		case 'C':				/* attach node */
290 			set_ps_display("PCP: processing attach node request", false);
291 			process_attach_node(pcp_frontend, buf);
292 			break;
293 
294 		case 'T':
295 			set_ps_display("PCP: processing shutdown request", false);
296 			process_shutown_request(pcp_frontend, buf[0]);
297 			break;
298 
299 		case 'O':				/* recovery request */
300 			set_ps_display("PCP: processing recovery request", false);
301 			process_recovery_request(pcp_frontend, buf);
302 			break;
303 
304 		case 'B':				/* status request */
305 			set_ps_display("PCP: processing status request request", false);
306 			process_status_request(pcp_frontend);
307 			break;
308 
309 		case 'J':				/* promote node */
310 		case 'j':				/* promote node gracefully */
311 			set_ps_display("PCP: processing promote node request", false);
312 			process_promote_node(pcp_frontend, buf, tos);
313 			break;
314 
315 		case 'F':
316 			ereport(DEBUG1,
317 					(errmsg("PCP processing request, stop online recovery")));
318 			break;
319 
320 		case 'X':				/* disconnect */
321 			ereport(DEBUG1,
322 					(errmsg("PCP processing request, client disconnecting"),
323 					 errdetail("closing PCP connection, and exiting child")));
324 			pcp_close(pcp_frontend);
325 			pcp_frontend = NULL;
326 			/* This child has done its part. Rest in peace now */
327 			exit(0);
328 			break;
329 
330 		default:
331 			ereport(FATAL,
332 					(errmsg("PCP processing request"),
333 					 errdetail("unknown PCP packet type \"%c\"", tos)));
334 	}
335 }
336 
337 static RETSIGTYPE
die(int sig)338 die(int sig)
339 {
340 	ereport(DEBUG1,
341 			(errmsg("PCP worker child receives shutdown request signal %d", sig)));
342 	if (sig == SIGTERM)
343 	{
344 		ereport(DEBUG1,
345 				(errmsg("PCP worker child receives smart shutdown request."),
346 				 errdetail("waiting for the child to die its natural death")));
347 	}
348 	else if (sig == SIGINT)
349 	{
350 		ereport(DEBUG1,
351 				(errmsg("PCP worker child receives fast shutdown request.")));
352 		exit(0);
353 	}
354 	else if (sig == SIGQUIT)
355 	{
356 		ereport(DEBUG1,
357 				(errmsg("PCP worker child receives immediate shutdown request.")));
358 		exit(0);
359 	}
360 	else
361 		exit(1);
362 }
363 
364 
365 static RETSIGTYPE
wakeup_handler_child(int sig)366 wakeup_handler_child(int sig)
367 {
368 	pcp_worker_wakeup_request = 1;
369 }
370 
371 /*
372  * unset non-block flag
373  */
374 static void
unset_nonblock(int fd)375 unset_nonblock(int fd)
376 {
377 	int			var;
378 
379 	/* set fd to non-blocking */
380 	var = fcntl(fd, F_GETFL, 0);
381 	if (var == -1)
382 	{
383 		ereport(FATAL,
384 				(errmsg("unable to connect"),
385 				 errdetail("fcntl system call failed with error : \"%m\"")));
386 
387 	}
388 	if (fcntl(fd, F_SETFL, var & ~O_NONBLOCK) == -1)
389 	{
390 		ereport(FATAL,
391 				(errmsg("unable to connect"),
392 				 errdetail("fcntl system call failed with error : \"%m\"")));
393 	}
394 }
395 
396 /*
397  * see if received username and password matches with one in the file
398  */
399 static int
user_authenticate(char * buf,char * passwd_file,char * salt,int salt_len)400 user_authenticate(char *buf, char *passwd_file, char *salt, int salt_len)
401 {
402 	FILE	   *fp = NULL;
403 	char		packet_username[MAX_USER_PASSWD_LEN + 1];
404 	char		packet_password[MAX_USER_PASSWD_LEN + 1];
405 	char		encrypt_buf[(MD5_PASSWD_LEN + 1) * 2];
406 	char		file_username[MAX_USER_PASSWD_LEN + 1];
407 	char		file_password[MAX_USER_PASSWD_LEN + 1];
408 	char	   *index = NULL;
409 	static char line[MAX_FILE_LINE_LEN + 1];
410 	int			i,
411 				len;
412 
413 	/* strcpy() should be OK, but use strncpy() to be extra careful */
414 	strncpy(packet_username, buf, MAX_USER_PASSWD_LEN);
415 	index = (char *) memchr(buf, '\0', MAX_USER_PASSWD_LEN);
416 	if (index == NULL)
417 	{
418 		ereport(FATAL,
419 				(errmsg("failed to authenticate PCP user"),
420 				 errdetail("error while reading authentication packet")));
421 		return 0;
422 	}
423 	strncpy(packet_password, ++index, MAX_USER_PASSWD_LEN);
424 
425 	fp = fopen(passwd_file, "r");
426 	if (fp == NULL)
427 	{
428 		ereport(FATAL,
429 				(errmsg("failed to authenticate PCP user"),
430 				 errdetail("could not open %s. reason: %m", passwd_file)));
431 		return 0;
432 	}
433 
434 	/* for now, I don't care if duplicate username exists in the config file */
435 	while ((fgets(line, MAX_FILE_LINE_LEN, fp)) != NULL)
436 	{
437 		i = 0;
438 		len = 0;
439 
440 		if (line[0] == '\n' || line[0] == '#')
441 			continue;
442 
443 		while (line[i] != ':')
444 		{
445 			len++;
446 			if (++i > MAX_USER_PASSWD_LEN)
447 			{
448 				fclose(fp);
449 				ereport(FATAL,
450 						(errmsg("failed to authenticate PCP user"),
451 						 errdetail("username read from file \"%s\" is larger than maximum allowed username length [%d]", passwd_file, MAX_USER_PASSWD_LEN)));
452 				return 0;
453 			}
454 		}
455 		memcpy(file_username, line, len);
456 		file_username[len] = '\0';
457 
458 		if (strcmp(packet_username, file_username) != 0)
459 			continue;
460 
461 		i++;
462 		len = 0;
463 		while (line[i] != '\n' && line[i] != '\0')
464 		{
465 			len++;
466 			if (++i > MAX_USER_PASSWD_LEN)
467 			{
468 				fclose(fp);
469 				ereport(FATAL,
470 						(errmsg("failed to authenticate PCP user"),
471 						 errdetail("password read from file \"%s\" is larger than maximum allowed password length [%d]", passwd_file, MAX_USER_PASSWD_LEN)));
472 				return 0;
473 			}
474 		}
475 
476 		memcpy(file_password, line + strlen(file_username) + 1, len);
477 		file_password[len] = '\0';
478 
479 		pool_md5_encrypt(file_password, file_username, strlen(file_username),
480 						 encrypt_buf + MD5_PASSWD_LEN + 1);
481 		encrypt_buf[(MD5_PASSWD_LEN + 1) * 2 - 1] = '\0';
482 
483 		pool_md5_encrypt(encrypt_buf + MD5_PASSWD_LEN + 1, salt, salt_len,
484 						 encrypt_buf);
485 		encrypt_buf[MD5_PASSWD_LEN] = '\0';
486 
487 		if (strcmp(encrypt_buf, packet_password) == 0)
488 		{
489 			fclose(fp);
490 			return 1;
491 		}
492 	}
493 	fclose(fp);
494 	ereport(FATAL,
495 			(errmsg("authentication failed for user \"%s\"", packet_username),
496 			 errdetail("username and/or password does not match")));
497 
498 	return 0;
499 }
500 
501 
502 /* Detach a node */
503 static int
pool_detach_node(int node_id,bool gracefully)504 pool_detach_node(int node_id, bool gracefully)
505 {
506 	if (!gracefully)
507 	{
508 		degenerate_backend_set_ex(&node_id, 1, REQ_DETAIL_SWITCHOVER | REQ_DETAIL_CONFIRMED, true, false);
509 		return 0;
510 	}
511 
512 	/*
513 	 * Check if the NODE DOWN can be executed on the given node id.
514 	 */
515 	degenerate_backend_set_ex(&node_id, 1, REQ_DETAIL_SWITCHOVER | REQ_DETAIL_CONFIRMED, true, true);
516 
517 	/*
518 	 * Wait until all frontends exit
519 	 */
520 	*InRecovery = RECOVERY_DETACH;	/* This wiil ensure that new incoming
521 									 * connection requests are blocked */
522 
523 	if (wait_connection_closed())
524 	{
525 		/* wait timed out */
526 		finish_recovery();
527 		return -1;
528 	}
529 
530 	pcp_worker_wakeup_request = 0;
531 
532 	/*
533 	 * Now all frontends have gone. Let's do failover.
534 	 */
535 	degenerate_backend_set_ex(&node_id, 1, REQ_DETAIL_SWITCHOVER | REQ_DETAIL_CONFIRMED, false, false);
536 
537 	/*
538 	 * Wait for failover completed.
539 	 */
540 
541 	while (!pcp_worker_wakeup_request)
542 	{
543 		struct timeval t = {1, 0};
544 
545 		select(0, NULL, NULL, NULL, &t);
546 	}
547 	pcp_worker_wakeup_request = 0;
548 
549 	/*
550 	 * Start to accept incoming connections and send SIGUSR2 to pgpool parent
551 	 * to distribute SIGUSR2 all pgpool children.
552 	 */
553 	finish_recovery();
554 
555 	return 0;
556 }
557 
558 /* Promote a node */
559 static int
pool_promote_node(int node_id,bool gracefully)560 pool_promote_node(int node_id, bool gracefully)
561 {
562 	if (!gracefully)
563 	{
564 		promote_backend(node_id, REQ_DETAIL_CONFIRMED); /* send promote request */
565 		return 0;
566 	}
567 
568 	/*
569 	 * Wait until all frontends exit
570 	 */
571 	*InRecovery = RECOVERY_PROMOTE; /* This wiil ensure that new incoming
572 									 * connection requests are blocked */
573 
574 	if (wait_connection_closed())
575 	{
576 		/* wait timed out */
577 		finish_recovery();
578 		return -1;
579 	}
580 
581 	/*
582 	 * Now all frontends have gone. Let's do failover.
583 	 */
584 	promote_backend(node_id, REQ_DETAIL_CONFIRMED); /* send promote request */
585 
586 	/*
587 	 * Wait for failover completed.
588 	 */
589 	pcp_worker_wakeup_request = 0;
590 
591 	while (!pcp_worker_wakeup_request)
592 	{
593 		struct timeval t = {1, 0};
594 
595 		select(0, NULL, NULL, NULL, &t);
596 	}
597 	pcp_worker_wakeup_request = 0;
598 
599 	/*
600 	 * Start to accept incoming connections and send SIGUSR2 to pgpool parent
601 	 * to distribute SIGUSR2 all pgpool children.
602 	 */
603 	finish_recovery();
604 	return 0;
605 }
606 
607 static void
inform_process_count(PCP_CONNECTION * frontend)608 inform_process_count(PCP_CONNECTION * frontend)
609 {
610 	int			wsize;
611 	int			process_count;
612 	char		process_count_str[16];
613 	int		   *process_list = NULL;
614 	char		code[] = "CommandComplete";
615 	char	   *mesg = NULL;
616 	int			i;
617 	int			total_port_len = 0;
618 
619 	process_list = pool_get_process_list(&process_count);
620 
621 	mesg = (char *) palloc(7 * process_count);	/* PID is at most 6 characters
622 												 * long */
623 
624 	snprintf(process_count_str, sizeof(process_count_str), "%d", process_count);
625 
626 	for (i = 0; i < process_count; i++)
627 	{
628 		char		process_id[7];
629 
630 		snprintf(process_id, sizeof(process_id), "%d", process_list[i]);
631 		snprintf(mesg + total_port_len, strlen(process_id) + 1, "%s", process_id);
632 		total_port_len += strlen(process_id) + 1;
633 	}
634 
635 	pcp_write(frontend, "n", 1);
636 	wsize = htonl(sizeof(code) +
637 				  strlen(process_count_str) + 1 +
638 				  total_port_len +
639 				  sizeof(int));
640 	pcp_write(frontend, &wsize, sizeof(int));
641 	pcp_write(frontend, code, sizeof(code));
642 	pcp_write(frontend, process_count_str, strlen(process_count_str) + 1);
643 	pcp_write(frontend, mesg, total_port_len);
644 	do_pcp_flush(frontend);
645 
646 	pfree(process_list);
647 	pfree(mesg);
648 
649 	ereport(DEBUG1,
650 			(errmsg("PCP: informing process count"),
651 			 errdetail("%d process(es) found", process_count)));
652 }
653 
654 static void
inform_process_info(PCP_CONNECTION * frontend,char * buf)655 inform_process_info(PCP_CONNECTION * frontend, char *buf)
656 {
657 	int			proc_id;
658 	int			wsize;
659 	int			num_proc = pool_config->num_init_children;
660 	int			i;
661 
662 	proc_id = atoi(buf);
663 
664 	if ((proc_id != 0) && (pool_get_process_info(proc_id) == NULL))
665 	{
666 		ereport(ERROR,
667 				(errmsg("informing process info failed"),
668 				 errdetail("invalid process ID : %s", buf)));
669 	}
670 	else
671 	{
672 		/* First, send array size of connection_info */
673 		char		arr_code[] = "ArraySize";
674 		char		con_info_size[16];
675 
676 		/* Finally, indicate that all data is sent */
677 		char		fin_code[] = "CommandComplete";
678 
679 		POOL_REPORT_POOLS *pools = get_pools(&num_proc);
680 
681 		if (proc_id == 0)
682 		{
683 			snprintf(con_info_size, sizeof(con_info_size), "%d", num_proc);
684 		}
685 		else
686 		{
687 			snprintf(con_info_size, sizeof(con_info_size), "%d", pool_config->max_pool * NUM_BACKENDS);
688 		}
689 
690 		pcp_write(frontend, "p", 1);
691 		wsize = htonl(sizeof(arr_code) +
692 					  strlen(con_info_size) + 1 +
693 					  sizeof(int));
694 		pcp_write(frontend, &wsize, sizeof(int));
695 		pcp_write(frontend, arr_code, sizeof(arr_code));
696 		pcp_write(frontend, con_info_size, strlen(con_info_size) + 1);
697 		do_pcp_flush(frontend);
698 
699 		/* Second, send process information for all connection_info */
700 		for (i = 0; i < num_proc; i++)
701 		{
702 			char		code[] = "ProcessInfo";
703 			char		proc_pid[16];
704 			char		proc_start_time[20];
705 			char		proc_create_time[20];
706 			char		majorversion[5];
707 			char		minorversion[5];
708 			char		pool_counter[16];
709 			char		backend_id[16];
710 			char		backend_pid[16];
711 			char		connected[2];
712 
713 			if (proc_id != 0 && proc_id != pools[i].pool_pid)
714 				continue;
715 
716 			snprintf(proc_pid, sizeof(proc_pid), "%d", pools[i].pool_pid);
717 			snprintf(proc_start_time, sizeof(proc_start_time), "%ld", pools[i].start_time);
718 			snprintf(proc_create_time, sizeof(proc_create_time), "%ld", pools[i].create_time);
719 			snprintf(majorversion, sizeof(majorversion), "%d", pools[i].pool_majorversion);
720 			snprintf(minorversion, sizeof(minorversion), "%d", pools[i].pool_minorversion);
721 			snprintf(pool_counter, sizeof(pool_counter), "%d", pools[i].pool_counter);
722 			snprintf(backend_id, sizeof(backend_pid), "%d", pools[i].backend_id);
723 			snprintf(backend_pid, sizeof(backend_pid), "%d", pools[i].pool_backendpid);
724 			snprintf(connected, sizeof(connected), "%d", pools[i].pool_connected);
725 
726 			pcp_write(frontend, "p", 1);
727 			wsize = htonl(sizeof(code) +
728 						  strlen(proc_pid) + 1 +
729 						  strlen(pools[i].database) + 1 +
730 						  strlen(pools[i].username) + 1 +
731 						  strlen(proc_start_time) + 1 +
732 						  strlen(proc_create_time) + 1 +
733 						  strlen(majorversion) + 1 +
734 						  strlen(minorversion) + 1 +
735 						  strlen(pool_counter) + 1 +
736 						  strlen(backend_id) + 1 +
737 						  strlen(backend_pid) + 1 +
738 						  strlen(connected) + 1 +
739 						  sizeof(int));
740 			pcp_write(frontend, &wsize, sizeof(int));
741 			pcp_write(frontend, code, sizeof(code));
742 			pcp_write(frontend, proc_pid, strlen(proc_pid) + 1);
743 			pcp_write(frontend, pools[i].database, strlen(pools[i].database) + 1);
744 			pcp_write(frontend, pools[i].username, strlen(pools[i].username) + 1);
745 			pcp_write(frontend, proc_start_time, strlen(proc_start_time) + 1);
746 			pcp_write(frontend, proc_create_time, strlen(proc_create_time) + 1);
747 			pcp_write(frontend, majorversion, strlen(majorversion) + 1);
748 			pcp_write(frontend, minorversion, strlen(minorversion) + 1);
749 			pcp_write(frontend, pool_counter, strlen(pool_counter) + 1);
750 			pcp_write(frontend, backend_id, strlen(backend_id) + 1);
751 			pcp_write(frontend, backend_pid, strlen(backend_pid) + 1);
752 			pcp_write(frontend, connected, strlen(connected) + 1);
753 			do_pcp_flush(frontend);
754 		}
755 
756 		pcp_write(frontend, "p", 1);
757 		wsize = htonl(sizeof(fin_code) +
758 					  sizeof(int));
759 		pcp_write(frontend, &wsize, sizeof(int));
760 		pcp_write(frontend, fin_code, sizeof(fin_code));
761 		do_pcp_flush(frontend);
762 		ereport(DEBUG1,
763 				(errmsg("PCP informing process info"),
764 				 errdetail("retrieved process information from shared memory")));
765 
766 		pfree(pools);
767 	}
768 }
769 
770 static void
inform_watchdog_info(PCP_CONNECTION * frontend,char * buf)771 inform_watchdog_info(PCP_CONNECTION * frontend, char *buf)
772 {
773 	int			wd_index;
774 	int			json_data_len;
775 	int			wsize;
776 	char		code[] = "CommandComplete";
777 	char	   *json_data;
778 
779 	if (!pool_config->use_watchdog)
780 		ereport(ERROR,
781 				(errmsg("PCP: informing watchdog info failed"),
782 				 errdetail("watcdhog is not enabled")));
783 
784 	wd_index = atoi(buf);
785 
786 	json_data = wd_get_watchdog_nodes(wd_index);
787 	if (json_data == NULL)
788 		ereport(ERROR,
789 				(errmsg("PCP: informing watchdog info failed"),
790 				 errdetail("invalid watchdog index")));
791 
792 	ereport(DEBUG2,
793 			(errmsg("PCP: informing watchdog info"),
794 			 errdetail("retrieved node information from IPC socket")));
795 
796 	/*
797 	 * This is the voilation of PCP protocol but I think in future we should
798 	 * shift to more adaptable protocol for data transmition.
799 	 */
800 	json_data_len = strlen(json_data);
801 	wsize = htonl(sizeof(code) +
802 				  json_data_len + 1 +
803 				  sizeof(int));
804 	pcp_write(frontend, "w", 1);
805 
806 	pcp_write(frontend, &wsize, sizeof(int));
807 	pcp_write(frontend, code, sizeof(code));
808 
809 	pcp_write(frontend, json_data, json_data_len + 1);
810 	do_pcp_flush(frontend);
811 
812 	pfree(json_data);
813 }
814 
815 static void
inform_node_info(PCP_CONNECTION * frontend,char * buf)816 inform_node_info(PCP_CONNECTION * frontend, char *buf)
817 {
818 	int			node_id;
819 	int			wsize;
820 	char		port_str[6];
821 	char		status[2];
822 	char		weight_str[20];
823 	char		role_str[10];
824 	char		standby_delay_str[20];
825 	char		status_changed_time_str[20];
826 	char		code[] = "CommandComplete";
827 	BackendInfo *bi = NULL;
828 	SERVER_ROLE role;
829 
830 	node_id = atoi(buf);
831 
832 	bi = pool_get_node_info(node_id);
833 
834 	if (bi == NULL)
835 		ereport(ERROR,
836 				(errmsg("informing node info failed"),
837 				 errdetail("invalid node ID")));
838 
839 	ereport(DEBUG2,
840 			(errmsg("PCP: informing node info"),
841 			 errdetail("retrieved node information from shared memory")));
842 
843 	snprintf(port_str, sizeof(port_str), "%d", bi->backend_port);
844 	snprintf(status, sizeof(status), "%d", bi->backend_status);
845 	snprintf(weight_str, sizeof(weight_str), "%f", bi->backend_weight);
846 
847 	if (STREAM)
848 	{
849 		if (Req_info->primary_node_id == node_id)
850 			role = ROLE_PRIMARY;
851 		else
852 			role = ROLE_STANDBY;
853 	}
854 	else
855 	{
856 		if (Req_info->master_node_id == node_id)
857 			role = ROLE_MASTER;
858 		else
859 			role = ROLE_SLAVE;
860 	}
861 	snprintf(role_str, sizeof(role_str), "%d", role);
862 
863 	snprintf(standby_delay_str, sizeof(standby_delay_str), UINT64_FORMAT, bi->standby_delay);
864 
865 	snprintf(status_changed_time_str, sizeof(status_changed_time_str), UINT64_FORMAT, bi->status_changed_time);
866 
867 	pcp_write(frontend, "i", 1);
868 	wsize = htonl(sizeof(code) +
869 				  strlen(bi->backend_hostname) + 1 +
870 				  strlen(port_str) + 1 +
871 				  strlen(status) + 1 +
872 				  strlen(weight_str) + 1 +
873 				  strlen(role_str) + 1 +
874 				  strlen(standby_delay_str) + 1 +
875 				  strlen(status_changed_time_str) + 1 +
876 				  sizeof(int));
877 	pcp_write(frontend, &wsize, sizeof(int));
878 	pcp_write(frontend, code, sizeof(code));
879 	pcp_write(frontend, bi->backend_hostname, strlen(bi->backend_hostname) + 1);
880 	pcp_write(frontend, port_str, strlen(port_str) + 1);
881 	pcp_write(frontend, status, strlen(status) + 1);
882 	pcp_write(frontend, weight_str, strlen(weight_str) + 1);
883 	pcp_write(frontend, role_str, strlen(role_str) + 1);
884 	pcp_write(frontend, standby_delay_str, strlen(standby_delay_str) + 1);
885 	pcp_write(frontend, status_changed_time_str, strlen(status_changed_time_str) + 1);
886 
887 	do_pcp_flush(frontend);
888 }
889 
890 static void
inform_node_count(PCP_CONNECTION * frontend)891 inform_node_count(PCP_CONNECTION * frontend)
892 {
893 	int			wsize;
894 	char		mesg[16];
895 	char		code[] = "CommandComplete";
896 	int			node_count = pool_get_node_count();
897 
898 	snprintf(mesg, sizeof(mesg), "%d", node_count);
899 
900 	pcp_write(frontend, "l", 1);
901 	wsize = htonl(sizeof(code) +
902 				  strlen(mesg) + 1 +
903 				  sizeof(int));
904 	pcp_write(frontend, &wsize, sizeof(int));
905 	pcp_write(frontend, code, sizeof(code));
906 	pcp_write(frontend, mesg, strlen(mesg) + 1);
907 	do_pcp_flush(frontend);
908 
909 	ereport(DEBUG1,
910 			(errmsg("PCP: informing node count"),
911 			 errdetail("%d node(s) found", node_count)));
912 }
913 
914 static void
process_detach_node(PCP_CONNECTION * frontend,char * buf,char tos)915 process_detach_node(PCP_CONNECTION * frontend, char *buf, char tos)
916 {
917 	int			node_id;
918 	int			wsize;
919 	char		code[] = "CommandComplete";
920 	bool		gracefully;
921 
922 	if (tos == 'D')
923 		gracefully = false;
924 	else
925 		gracefully = true;
926 
927 	node_id = atoi(buf);
928 	ereport(DEBUG1,
929 			(errmsg("PCP: processing detach node"),
930 			 errdetail("detaching Node ID %d", node_id)));
931 
932 	pool_detach_node(node_id, gracefully);
933 
934 	pcp_write(frontend, "d", 1);
935 	wsize = htonl(sizeof(code) + sizeof(int));
936 	pcp_write(frontend, &wsize, sizeof(int));
937 	pcp_write(frontend, code, sizeof(code));
938 	do_pcp_flush(frontend);
939 }
940 
941 static void
process_attach_node(PCP_CONNECTION * frontend,char * buf)942 process_attach_node(PCP_CONNECTION * frontend, char *buf)
943 {
944 	int			node_id;
945 	int			wsize;
946 	char		code[] = "CommandComplete";
947 
948 	node_id = atoi(buf);
949 	ereport(DEBUG1,
950 			(errmsg("PCP: processing attach node"),
951 			 errdetail("attaching Node ID %d", node_id)));
952 
953 	send_failback_request(node_id, true, REQ_DETAIL_CONFIRMED);
954 
955 	pcp_write(frontend, "c", 1);
956 	wsize = htonl(sizeof(code) + sizeof(int));
957 	pcp_write(frontend, &wsize, sizeof(int));
958 	pcp_write(frontend, code, sizeof(code));
959 	do_pcp_flush(frontend);
960 }
961 
962 
963 static void
process_recovery_request(PCP_CONNECTION * frontend,char * buf)964 process_recovery_request(PCP_CONNECTION * frontend, char *buf)
965 {
966 	int			wsize;
967 	char		code[] = "CommandComplete";
968 	int			node_id = atoi(buf);
969 
970 	if ((node_id < 0) || (node_id >= pool_config->backend_desc->num_backends))
971 		ereport(ERROR,
972 				(errmsg("process recovery request failed"),
973 				 errdetail("node id %d is not valid", node_id)));
974 
975 	if ((!REPLICATION &&
976 		 !(MASTER_SLAVE &&
977 		   pool_config->master_slave_sub_mode == STREAM_MODE)) ||
978 		(MASTER_SLAVE &&
979 		 pool_config->master_slave_sub_mode == STREAM_MODE &&
980 		 node_id == PRIMARY_NODE_ID))
981 	{
982 		if (MASTER_SLAVE && pool_config->master_slave_sub_mode == STREAM_MODE)
983 			ereport(ERROR,
984 					(errmsg("process recovery request failed"),
985 					 errdetail("primary server cannot be recovered by online recovery.")));
986 		else
987 			ereport(ERROR,
988 					(errmsg("process recovery request failed"),
989 					 errdetail("recovery request is only allowed in replication and streaming replication modes.")));
990 	}
991 	else
992 	{
993 		if (pcp_mark_recovery_in_progress() == false)
994 			ereport(FATAL,
995 					(errmsg("process recovery request failed"),
996 					 errdetail("pgpool-II is already processing another recovery request.")));
997 
998 		ereport(DEBUG1,
999 				(errmsg("PCP: processing recovery request"),
1000 				 errdetail("start online recovery")));
1001 
1002 		PG_TRY();
1003 		{
1004 			start_recovery(node_id);
1005 			finish_recovery();
1006 			pcp_write(frontend, "c", 1);
1007 			wsize = htonl(sizeof(code) + sizeof(int));
1008 			pcp_write(frontend, &wsize, sizeof(int));
1009 			pcp_write(frontend, code, sizeof(code));
1010 			do_pcp_flush(frontend);
1011 			pcp_mark_recovery_finished();
1012 		}
1013 		PG_CATCH();
1014 		{
1015 			finish_recovery();
1016 			pcp_mark_recovery_finished();
1017 			PG_RE_THROW();
1018 
1019 		} PG_END_TRY();
1020 	}
1021 	do_pcp_flush(frontend);
1022 }
1023 
1024 static void
process_status_request(PCP_CONNECTION * frontend)1025 process_status_request(PCP_CONNECTION * frontend)
1026 {
1027 	int			nrows = 0;
1028 	int			i;
1029 	POOL_REPORT_CONFIG *status = get_config(&nrows);
1030 	int			len = 0;
1031 
1032 	/* First, send array size of connection_info */
1033 	char		arr_code[] = "ArraySize";
1034 	char		code[] = "ProcessConfig";
1035 
1036 	/* Finally, indicate that all data is sent */
1037 	char		fin_code[] = "CommandComplete";
1038 
1039 	pcp_write(frontend, "b", 1);
1040 	len = htonl(sizeof(arr_code) + sizeof(int) + sizeof(int));
1041 	pcp_write(frontend, &len, sizeof(int));
1042 	pcp_write(frontend, arr_code, sizeof(arr_code));
1043 	len = htonl(nrows);
1044 	pcp_write(frontend, &len, sizeof(int));
1045 
1046 	do_pcp_flush(frontend);
1047 
1048 	for (i = 0; i < nrows; i++)
1049 	{
1050 		pcp_write(frontend, "b", 1);
1051 		len = htonl(sizeof(int)
1052 					+ sizeof(code)
1053 					+ strlen(status[i].name) + 1
1054 					+ strlen(status[i].value) + 1
1055 					+ strlen(status[i].desc) + 1
1056 			);
1057 
1058 		pcp_write(frontend, &len, sizeof(int));
1059 		pcp_write(frontend, code, sizeof(code));
1060 		pcp_write(frontend, status[i].name, strlen(status[i].name) + 1);
1061 		pcp_write(frontend, status[i].value, strlen(status[i].value) + 1);
1062 		pcp_write(frontend, status[i].desc, strlen(status[i].desc) + 1);
1063 	}
1064 
1065 	pcp_write(frontend, "b", 1);
1066 	len = htonl(sizeof(fin_code) + sizeof(int));
1067 	pcp_write(frontend, &len, sizeof(int));
1068 	pcp_write(frontend, fin_code, sizeof(fin_code));
1069 	do_pcp_flush(frontend);
1070 
1071 	pfree(status);
1072 	ereport(DEBUG1,
1073 			(errmsg("PCP: processing status request"),
1074 			 errdetail("retrieved status information")));
1075 }
1076 
1077 static void
process_promote_node(PCP_CONNECTION * frontend,char * buf,char tos)1078 process_promote_node(PCP_CONNECTION * frontend, char *buf, char tos)
1079 {
1080 	int			node_id;
1081 	int			wsize;
1082 	char		code[] = "CommandComplete";
1083 	bool		gracefully;
1084 
1085 	if (tos == 'J')
1086 		gracefully = false;
1087 	else
1088 		gracefully = true;
1089 
1090 	node_id = atoi(buf);
1091 	if ((node_id < 0) || (node_id >= pool_config->backend_desc->num_backends))
1092 		ereport(ERROR,
1093 				(errmsg("could not process recovery request"),
1094 				 errdetail("node id %d is not valid", node_id)));
1095 	/* promoting node is reserved to Streaming Replication */
1096 	if (!MASTER_SLAVE || pool_config->master_slave_sub_mode != STREAM_MODE)
1097 	{
1098 		ereport(FATAL,
1099 				(errmsg("invalid pgpool mode for process recovery request"),
1100 				 errdetail("not in streaming replication mode, can't promote node id %d", node_id)));
1101 
1102 	}
1103 
1104 	if (node_id == REAL_PRIMARY_NODE_ID)
1105 	{
1106 		ereport(FATAL,
1107 				(errmsg("invalid pgpool mode for process recovery request"),
1108 				 errdetail("specified node is already primary node, can't promote node id %d", node_id)));
1109 
1110 	}
1111 	ereport(DEBUG1,
1112 			(errmsg("PCP: processing promote node"),
1113 			 errdetail("promoting Node ID %d", node_id)));
1114 	pool_promote_node(node_id, gracefully);
1115 
1116 	pcp_write(frontend, "d", 1);
1117 	wsize = htonl(sizeof(code) + sizeof(int));
1118 	pcp_write(frontend, &wsize, sizeof(int));
1119 	pcp_write(frontend, code, sizeof(code));
1120 	do_pcp_flush(frontend);
1121 }
1122 
1123 static void
process_authentication(PCP_CONNECTION * frontend,char * buf,char * salt,int * random_salt)1124 process_authentication(PCP_CONNECTION * frontend, char *buf, char *salt, int *random_salt)
1125 {
1126 	int			wsize;
1127 	int			authenticated;
1128 
1129 	if (*random_salt)
1130 	{
1131 		authenticated = user_authenticate(buf, pcp_conf_file, salt, 4);
1132 	}
1133 	if (!*random_salt || !authenticated)
1134 	{
1135 		ereport(FATAL,
1136 				(errmsg("authentication failed"),
1137 				 errdetail("username and/or password does not match")));
1138 
1139 		*random_salt = 0;
1140 	}
1141 	else
1142 	{
1143 		char		code[] = "AuthenticationOK";
1144 
1145 		pcp_write(frontend, "r", 1);
1146 		wsize = htonl(sizeof(code) + sizeof(int));
1147 		pcp_write(frontend, &wsize, sizeof(int));
1148 		pcp_write(frontend, code, sizeof(code));
1149 		do_pcp_flush(frontend);
1150 		*random_salt = 0;
1151 
1152 		ereport(DEBUG1,
1153 				(errmsg("PCP: processing authentication request"),
1154 				 errdetail("authentication OK")));
1155 	}
1156 }
1157 
1158 static void
send_md5salt(PCP_CONNECTION * frontend,char * salt)1159 send_md5salt(PCP_CONNECTION * frontend, char *salt)
1160 {
1161 	int			wsize;
1162 
1163 	ereport(DEBUG1,
1164 			(errmsg("PCP: sending md5 salt to client")));
1165 
1166 	pool_random_salt(salt);
1167 
1168 	pcp_write(frontend, "m", 1);
1169 	wsize = htonl(sizeof(int) + 4);
1170 	pcp_write(frontend, &wsize, sizeof(int));
1171 	pcp_write(frontend, salt, 4);
1172 	do_pcp_flush(frontend);
1173 }
1174 
1175 static void
process_shutown_request(PCP_CONNECTION * frontend,char mode)1176 process_shutown_request(PCP_CONNECTION * frontend, char mode)
1177 {
1178 	char		code[] = "CommandComplete";
1179 	pid_t		ppid = getppid();
1180 	int			sig,
1181 				len;
1182 
1183 	if (mode == 's')
1184 	{
1185 		ereport(DEBUG1,
1186 				(errmsg("PCP: processing shutdown request"),
1187 				 errdetail("sending SIGTERM to the parent process with PID:%d", ppid)));
1188 		sig = SIGTERM;
1189 	}
1190 	else if (mode == 'f')
1191 	{
1192 		ereport(DEBUG1,
1193 				(errmsg("PCP: processing shutdown request"),
1194 				 errdetail("sending SIGINT to the parent process with PID:%d", ppid)));
1195 		sig = SIGINT;
1196 	}
1197 	else if (mode == 'i')
1198 	{
1199 		ereport(DEBUG1,
1200 				(errmsg("PCP: processing shutdown request"),
1201 				 errdetail("sending SIGQUIT to the parent process with PID:%d", ppid)));
1202 		sig = SIGQUIT;
1203 	}
1204 	else
1205 	{
1206 		ereport(ERROR,
1207 				(errmsg("PCP: error while processing shutdown request"),
1208 				 errdetail("invalid shutdown mode \"%c\"", mode)));
1209 	}
1210 
1211 	pcp_write(frontend, "t", 1);
1212 	len = htonl(sizeof(code) + sizeof(int));
1213 	pcp_write(frontend, &len, sizeof(int));
1214 	pcp_write(frontend, code, sizeof(code));
1215 	do_pcp_flush(frontend);
1216 
1217 	pool_signal_parent(sig);
1218 }
1219 
1220 static void
process_set_configration_parameter(PCP_CONNECTION * frontend,char * buf,int len)1221 process_set_configration_parameter(PCP_CONNECTION * frontend, char *buf, int len)
1222 {
1223 	char	   *param_name;
1224 	char	   *param_value;
1225 	int			wsize;
1226 	char		code[] = "CommandComplete";
1227 
1228 	param_name = buf;
1229 	if (param_name == NULL)
1230 		ereport(ERROR,
1231 				(errmsg("PCP: set configuration parameter failed"),
1232 				 errdetail("invalid pcp packet received from client")));
1233 
1234 	param_value = (char *) memchr(buf, '\0', len);
1235 	if (param_value == NULL)
1236 		ereport(ERROR,
1237 				(errmsg("set configuration parameter failed"),
1238 				 errdetail("invalid pcp packet received from client")));
1239 
1240 	param_value += 1;
1241 	ereport(LOG,
1242 			(errmsg("set configuration parameter, \"%s TO %s\"", param_name, param_value)));
1243 
1244 	if (strcasecmp(param_name, "client_min_messages") == 0)
1245 	{
1246 		const char *ordered_valid_values[] = {"debug5", "debug4", "debug3", "debug2", "debug1", "log", "commerror", "info", "notice", "warning", "error", NULL};
1247 		bool		found = false;
1248 		int			i;
1249 
1250 		for (i = 0;; i++)
1251 		{
1252 			char	   *valid_val = (char *) ordered_valid_values[i];
1253 
1254 			if (!valid_val)
1255 				break;
1256 
1257 			if (!strcasecmp(param_value, valid_val))
1258 			{
1259 				found = true;
1260 				pool_config->client_min_messages = i + 10;
1261 				ereport(DEBUG1,
1262 						(errmsg("PCP setting parameter \"%s\" to \"%s\"", param_name, param_value)));
1263 				break;
1264 			}
1265 		}
1266 		if (!found)
1267 			ereport(ERROR,
1268 					(errmsg("PCP: set configuration parameter failed"),
1269 					 errdetail("invalid value \"%s\" for parameter \"%s\"", param_value, param_name)));
1270 	}
1271 	else
1272 		ereport(ERROR,
1273 				(errmsg("PCP: set configuration parameter failed"),
1274 				 errdetail("invalid parameter \"%s\"", param_name)));
1275 
1276 	pcp_write(frontend, "a", 1);
1277 	wsize = htonl(sizeof(code) + sizeof(int));
1278 	pcp_write(frontend, &wsize, sizeof(int));
1279 	pcp_write(frontend, code, sizeof(code));
1280 	do_pcp_flush(frontend);
1281 }
1282 
1283 /*
1284  * Wrapper around pcp_flush which throws FATAL error when pcp_flush fails
1285  */
1286 static void
do_pcp_flush(PCP_CONNECTION * frontend)1287 do_pcp_flush(PCP_CONNECTION * frontend)
1288 {
1289 	if (pcp_flush(frontend) < 0)
1290 		ereport(FATAL,
1291 				(errmsg("failed to flush data to client"),
1292 				 errdetail("pcp_flush failed with error : \"%m\"")));
1293 }
1294 
1295 /*
1296  * Wrapper around pcp_read which throws FATAL error when read fails
1297  */
1298 static void
do_pcp_read(PCP_CONNECTION * pc,void * buf,int len)1299 do_pcp_read(PCP_CONNECTION * pc, void *buf, int len)
1300 {
1301 	if (pcp_read(pc, buf, len))
1302 		ereport(FATAL,
1303 				(errmsg("unable to read from client"),
1304 				 errdetail("pcp_read failed with error : \"%m\"")));
1305 }
1306 
1307 int
send_to_pcp_frontend(char * data,int len,bool flush)1308 send_to_pcp_frontend(char *data, int len, bool flush)
1309 {
1310 	int			ret;
1311 
1312 	if (processType != PT_PCP_WORKER || pcp_frontend == NULL)
1313 		return -1;
1314 	ret = pcp_write(pcp_frontend, data, len);
1315 	if (flush && !ret)
1316 		ret = pcp_flush(pcp_frontend);
1317 	return ret;
1318 }
1319 
1320 int
pcp_frontend_exists(void)1321 pcp_frontend_exists(void)
1322 {
1323 	if (processType != PT_PCP_WORKER || pcp_frontend == NULL)
1324 		return -1;
1325 	return 0;
1326 }
1327 
1328 static void
pcp_worker_will_go_down(int code,Datum arg)1329 pcp_worker_will_go_down(int code, Datum arg)
1330 {
1331 	if (processType != PT_PCP_WORKER)
1332 	{
1333 		/* should never happen */
1334 		ereport(WARNING,
1335 				(errmsg("pcp_worker_will_go_down called from invalid process")));
1336 		return;
1337 	}
1338 	if (pcp_frontend)
1339 		pcp_close(pcp_frontend);
1340 	processState = EXITING;
1341 	POOL_SETMASK(&UnBlockSig);
1342 
1343 }
1344