1 /* -*-pcp_worker.c-*- */
2 /*
3  *
4  * pgpool: a language independent connection pool server for PostgreSQL
5  * written by Tatsuo Ishii
6  *
7  * Copyright (c) 2003-2019	PgPool Global Development Group
8  *
9  * Permission to use, copy, modify, and distribute this software and
10  * its documentation for any purpose and without fee is hereby
11  * granted, provided that the above copyright notice appear in all
12  * copies and that both that copyright notice and this permission
13  * notice appear in supporting documentation, and that the name of the
14  * author not be used in advertising or publicity pertaining to
15  * distribution of the software without specific, written prior
16  * permission. The author makes no representations about the
17  * suitability of this software for any purpose.  It is provided "as
18  * is" without express or implied warranty.
19  *
20  * pcp_worker.c: PCP worker child process main
21  *
22  */
23 
24 #include "config.h"
25 #include "pool.h"
26 #include "utils/palloc.h"
27 #include "utils/memutils.h"
28 
29 #include <arpa/inet.h>
30 #include <signal.h>
31 
32 #include <stdio.h>
33 #include <errno.h>
34 #include <string.h>
35 #include <unistd.h>
36 #include <stdlib.h>
37 #include <sys/time.h>
38 
39 #ifdef HAVE_FCNTL_H
40 #include <fcntl.h>
41 #endif
42 
43 #include "pcp/pcp_stream.h"
44 #include "pcp/pcp.h"
45 #include "auth/md5.h"
46 #include "pool_config.h"
47 #include "context/pool_process_context.h"
48 #include "utils/pool_process_reporting.h"
49 #include "watchdog/wd_json_data.h"
50 #include "watchdog/wd_ipc_commands.h"
51 #include "utils/elog.h"
52 
/*
 * Size handed to fgets() when reading one line of the PCP password file in
 * user_authenticate(); the line buffer there is one byte larger.
 */
#define MAX_FILE_LINE_LEN    512

extern char *pcp_conf_file;		/* global variable defined in main.c holds the
								 * path for pcp.conf */

/*
 * Set to 1 by wakeup_handler_child() (installed for SIGUSR2) and polled by
 * pool_detach_node() / pool_promote_node() while they wait for a failover to
 * finish.
 */
volatile sig_atomic_t pcp_worker_wakeup_request = 0;

/*
 * Connection to the PCP client served by this worker.  Assigned from
 * pcp_open() in pcp_worker_main() and reset to NULL on disconnect ('X').
 */
PCP_CONNECTION *volatile pcp_frontend = NULL;

/* Signal handlers installed by pcp_worker_main() */
static RETSIGTYPE die(int sig);
static RETSIGTYPE wakeup_handler_child(int sig);

/* Connection setup and authentication helpers */
static void unset_nonblock(int fd);
static int	user_authenticate(char *buf, char *passwd_file, char *salt, int salt_len);
static void process_authentication(PCP_CONNECTION * frontend, char *buf, char *salt, int *random_salt);
static void send_md5salt(PCP_CONNECTION * frontend, char *salt);

/* Dispatcher: maps a packet type byte to one of the handlers below */
static void pcp_process_command(char tos, char *buf, int buf_len);

/*
 * Per-command workers and handlers called (directly or indirectly) from
 * pcp_process_command().  Note that the spellings "shutown" and
 * "configration" are part of the identifiers.
 */
static int	pool_detach_node(int node_id, bool gracefully);
static int	pool_promote_node(int node_id, bool gracefully);
static void inform_process_count(PCP_CONNECTION * frontend);
static void inform_process_info(PCP_CONNECTION * frontend, char *buf);
static void inform_watchdog_info(PCP_CONNECTION * frontend, char *buf);
static void inform_node_info(PCP_CONNECTION * frontend, char *buf);
static void inform_node_count(PCP_CONNECTION * frontend);
static void process_detach_node(PCP_CONNECTION * frontend, char *buf, char tos);
static void process_attach_node(PCP_CONNECTION * frontend, char *buf);
static void process_recovery_request(PCP_CONNECTION * frontend, char *buf);
static void process_status_request(PCP_CONNECTION * frontend);
static void process_promote_node(PCP_CONNECTION * frontend, char *buf, char tos);
static void process_shutown_request(PCP_CONNECTION * frontend, char mode);
static void process_set_configration_parameter(PCP_CONNECTION * frontend, char *buf, int len);

/* Exit callback registered with on_system_exit() in pcp_worker_main() */
static void pcp_worker_will_go_down(int code, Datum arg);

/* Socket I/O wrappers used by the main loop and the handlers */
static void do_pcp_flush(PCP_CONNECTION * frontend);
static void do_pcp_read(PCP_CONNECTION * pc, void *buf, int len);
90 /*
91  * main entry pont of pcp worker child process
92  */
93 void
pcp_worker_main(int port)94 pcp_worker_main(int port)
95 {
96 	sigjmp_buf	local_sigjmp_buf;
97 	MemoryContext PCPMemoryContext;
98 	volatile int authenticated = 0;
99 
100 	char		salt[4];
101 	int			random_salt = 0;
102 	struct timeval uptime;
103 	char		tos;
104 	int			rsize;
105 
106 	ereport(DEBUG1,
107 			(errmsg("I am PCP worker child with pid:%d", getpid())));
108 
109 	/* Identify myself via ps */
110 	init_ps_display("", "", "", "");
111 
112 	gettimeofday(&uptime, NULL);
113 	srandom((unsigned int) (getpid() ^ uptime.tv_usec));
114 
115 	/* set up signal handlers */
116 	signal(SIGTERM, die);
117 	signal(SIGINT, die);
118 	signal(SIGQUIT, die);
119 	signal(SIGCHLD, SIG_DFL);
120 	signal(SIGUSR2, wakeup_handler_child);
121 	signal(SIGUSR1, SIG_IGN);
122 	signal(SIGHUP, SIG_IGN);
123 	signal(SIGPIPE, SIG_IGN);
124 	signal(SIGALRM, SIG_IGN);
125 	/* Create per loop iteration memory context */
126 	PCPMemoryContext = AllocSetContextCreate(TopMemoryContext,
127 											 "PCP_worker_main_loop",
128 											 ALLOCSET_DEFAULT_MINSIZE,
129 											 ALLOCSET_DEFAULT_INITSIZE,
130 											 ALLOCSET_DEFAULT_MAXSIZE);
131 
132 	MemoryContextSwitchTo(TopMemoryContext);
133 
134 	/*
135 	 * install the call back for preparation of pcp worker child exit
136 	 */
137 	on_system_exit(pcp_worker_will_go_down, (Datum) NULL);
138 
139 	/* Initialize my backend status */
140 	pool_initialize_private_backend_status();
141 
142 	/* Initialize process context */
143 	pool_init_process_context();
144 
145 	pcp_frontend = pcp_open(port);
146 	unset_nonblock(pcp_frontend->fd);
147 
148 	if (sigsetjmp(local_sigjmp_buf, 1) != 0)
149 	{
150 		error_context_stack = NULL;
151 		EmitErrorReport();
152 
153 		MemoryContextSwitchTo(TopMemoryContext);
154 		FlushErrorState();
155 	}
156 	/* We can now handle ereport(ERROR) */
157 	PG_exception_stack = &local_sigjmp_buf;
158 
159 	for (;;)
160 	{
161 		char	*buf = NULL;
162 
163 		MemoryContextSwitchTo(PCPMemoryContext);
164 		MemoryContextResetAndDeleteChildren(PCPMemoryContext);
165 
166 		errno = 0;
167 
168 		/* read a PCP packet */
169 		do_pcp_read(pcp_frontend, &tos, 1);
170 		do_pcp_read(pcp_frontend, &rsize, sizeof(int));
171 
172 		rsize = ntohl(rsize);
173 
174 		if (rsize <= 0 || rsize >= MAX_PCP_PACKET_LENGTH)
175 			ereport(FATAL,
176 					(errmsg("invalid PCP packet"),
177 					 errdetail("incorrect packet length (%d)", rsize)));
178 
179 		if ((rsize - sizeof(int)) > 0)
180 		{
181 			buf = (char *) palloc(rsize - sizeof(int));
182 			do_pcp_read(pcp_frontend, buf, rsize - sizeof(int));
183 		}
184 
185 		ereport(DEBUG1,
186 				(errmsg("received PCP packet"),
187 				 errdetail("PCP packet type of service '%c'", tos)));
188 
189 		if (tos == 'R')			/* authentication */
190 		{
191 			set_ps_display("PCP: processing authentication", false);
192 			process_authentication(pcp_frontend, buf, salt, &random_salt);
193 			authenticated = 1;
194 			continue;
195 		}
196 		if (tos == 'M')			/* md5 salt */
197 		{
198 			set_ps_display("PCP: processing authentication", false);
199 			send_md5salt(pcp_frontend, salt);
200 			random_salt = 1;
201 			continue;
202 		}
203 		/* is this connection authenticated? if not disconnect immediately */
204 		if (!authenticated)
205 			ereport(FATAL,
206 					(errmsg("authentication failed for new PCP connection"),
207 					 errdetail("connection not authorized")));
208 
209 		/* process a request */
210 		pcp_process_command(tos, buf, rsize);
211 	}
212 	exit(0);
213 }
214 
215 
216 /* pcp command processor */
217 static void
pcp_process_command(char tos,char * buf,int buf_len)218 pcp_process_command(char tos, char *buf, int buf_len)
219 {
220 
221 	if (tos == 'O' || tos == 'T')
222 	{
223 		if (Req_info->switching)
224 		{
225 			if (Req_info->request_queue_tail != Req_info->request_queue_head)
226 			{
227 				POOL_REQUEST_KIND reqkind;
228 
229 				reqkind = Req_info->request[(Req_info->request_queue_head + 1) % MAX_REQUEST_QUEUE_SIZE].kind;
230 
231 				if (reqkind == NODE_UP_REQUEST)
232 					ereport(ERROR,
233 							(errmsg("failed to process PCP request at the moment"),
234 							 errdetail("failback is in progress")));
235 				else if (reqkind == NODE_DOWN_REQUEST)
236 					ereport(ERROR,
237 							(errmsg("failed to process PCP request at the moment"),
238 							 errdetail("failover is in progress")));
239 				else if (reqkind == PROMOTE_NODE_REQUEST)
240 					ereport(ERROR,
241 							(errmsg("failed to process PCP request at the moment"),
242 							 errdetail("promote node operation is in progress")));
243 
244 				ereport(ERROR,
245 						(errmsg("failed to process PCP request at the moment"),
246 						 errdetail("operation is in progress")));
247 			}
248 		}
249 	}
250 
251 	switch (tos)
252 	{
253 		case 'A':				/* set configuration parameter */
254 			set_ps_display("PCP: processing set configration parameter request", false);
255 			process_set_configration_parameter(pcp_frontend, buf, buf_len);
256 			break;
257 
258 		case 'L':				/* node count */
259 			set_ps_display("PCP: processing node count request", false);
260 			inform_node_count(pcp_frontend);
261 			break;
262 
263 		case 'I':				/* node info */
264 			set_ps_display("PCP: processing node info request", false);
265 			inform_node_info(pcp_frontend, buf);
266 			break;
267 
268 		case 'N':				/* process count */
269 			set_ps_display("PCP: processing process count request", false);
270 			inform_process_count(pcp_frontend);
271 			break;
272 
273 		case 'P':				/* process info */
274 			set_ps_display("PCP: processing process info request", false);
275 			inform_process_info(pcp_frontend, buf);
276 			break;
277 
278 		case 'W':				/* watchdog info */
279 			set_ps_display("PCP: processing watchdog info request", false);
280 			inform_watchdog_info(pcp_frontend, buf);
281 			break;
282 
283 		case 'D':				/* detach node */
284 		case 'd':				/* detach node gracefully */
285 			set_ps_display("PCP: processing detach node request", false);
286 			process_detach_node(pcp_frontend, buf, tos);
287 			break;
288 
289 		case 'C':				/* attach node */
290 			set_ps_display("PCP: processing attach node request", false);
291 			process_attach_node(pcp_frontend, buf);
292 			break;
293 
294 		case 'T':
295 			set_ps_display("PCP: processing shutdown request", false);
296 			process_shutown_request(pcp_frontend, buf[0]);
297 			break;
298 
299 		case 'O':				/* recovery request */
300 			set_ps_display("PCP: processing recovery request", false);
301 			process_recovery_request(pcp_frontend, buf);
302 			break;
303 
304 		case 'B':				/* status request */
305 			set_ps_display("PCP: processing status request request", false);
306 			process_status_request(pcp_frontend);
307 			break;
308 
309 		case 'J':				/* promote node */
310 		case 'j':				/* promote node gracefully */
311 			set_ps_display("PCP: processing promote node request", false);
312 			process_promote_node(pcp_frontend, buf, tos);
313 			break;
314 
315 		case 'F':
316 			ereport(DEBUG1,
317 					(errmsg("PCP processing request, stop online recovery")));
318 			break;
319 
320 		case 'X':				/* disconnect */
321 			ereport(DEBUG1,
322 					(errmsg("PCP processing request, client disconnecting"),
323 					 errdetail("closing PCP connection, and exiting child")));
324 			pcp_close(pcp_frontend);
325 			pcp_frontend = NULL;
326 			/* This child has done its part. Rest in peace now */
327 			exit(0);
328 			break;
329 
330 		default:
331 			ereport(FATAL,
332 					(errmsg("PCP processing request"),
333 					 errdetail("unknown PCP packet type \"%c\"", tos)));
334 	}
335 }
336 
337 static RETSIGTYPE
die(int sig)338 die(int sig)
339 {
340 	ereport(DEBUG1,
341 			(errmsg("PCP worker child receives shutdown request signal %d", sig)));
342 	if (sig == SIGTERM)
343 	{
344 		ereport(DEBUG1,
345 				(errmsg("PCP worker child receives smart shutdown request."),
346 				 errdetail("waiting for the child to die its natural death")));
347 	}
348 	else if (sig == SIGINT)
349 	{
350 		ereport(DEBUG1,
351 				(errmsg("PCP worker child receives fast shutdown request.")));
352 		exit(0);
353 	}
354 	else if (sig == SIGQUIT)
355 	{
356 		ereport(DEBUG1,
357 				(errmsg("PCP worker child receives immediate shutdown request.")));
358 		exit(0);
359 	}
360 	else
361 		exit(1);
362 }
363 
364 
365 static RETSIGTYPE
wakeup_handler_child(int sig)366 wakeup_handler_child(int sig)
367 {
368 	pcp_worker_wakeup_request = 1;
369 }
370 
371 /*
372  * unset non-block flag
373  */
374 static void
unset_nonblock(int fd)375 unset_nonblock(int fd)
376 {
377 	int			var;
378 
379 	/* set fd to non-blocking */
380 	var = fcntl(fd, F_GETFL, 0);
381 	if (var == -1)
382 	{
383 		ereport(FATAL,
384 				(errmsg("unable to connect"),
385 				 errdetail("fcntl system call failed with error : \"%m\"")));
386 
387 	}
388 	if (fcntl(fd, F_SETFL, var & ~O_NONBLOCK) == -1)
389 	{
390 		ereport(FATAL,
391 				(errmsg("unable to connect"),
392 				 errdetail("fcntl system call failed with error : \"%m\"")));
393 	}
394 }
395 
396 /*
397  * see if received username and password matches with one in the file
398  */
399 static int
user_authenticate(char * buf,char * passwd_file,char * salt,int salt_len)400 user_authenticate(char *buf, char *passwd_file, char *salt, int salt_len)
401 {
402 	FILE	   *fp = NULL;
403 	char		packet_username[MAX_USER_PASSWD_LEN + 1];
404 	char		packet_password[MAX_USER_PASSWD_LEN + 1];
405 	char		encrypt_buf[(MD5_PASSWD_LEN + 1) * 2];
406 	char		file_username[MAX_USER_PASSWD_LEN + 1];
407 	char		file_password[MAX_USER_PASSWD_LEN + 1];
408 	char	   *index = NULL;
409 	static char line[MAX_FILE_LINE_LEN + 1];
410 	int			i,
411 				len;
412 
413 	/* strcpy() should be OK, but use strncpy() to be extra careful */
414 	strncpy(packet_username, buf, MAX_USER_PASSWD_LEN);
415 	index = (char *) memchr(buf, '\0', MAX_USER_PASSWD_LEN);
416 	if (index == NULL)
417 	{
418 		ereport(FATAL,
419 				(errmsg("failed to authenticate PCP user"),
420 				 errdetail("error while reading authentication packet")));
421 		return 0;
422 	}
423 	strncpy(packet_password, ++index, MAX_USER_PASSWD_LEN);
424 
425 	fp = fopen(passwd_file, "r");
426 	if (fp == NULL)
427 	{
428 		ereport(FATAL,
429 				(errmsg("failed to authenticate PCP user"),
430 				 errdetail("could not open %s. reason: %m", passwd_file)));
431 		return 0;
432 	}
433 
434 	/* for now, I don't care if duplicate username exists in the config file */
435 	while ((fgets(line, MAX_FILE_LINE_LEN, fp)) != NULL)
436 	{
437 		i = 0;
438 		len = 0;
439 
440 		if (line[0] == '\n' || line[0] == '#')
441 			continue;
442 
443 		while (line[i] != ':')
444 		{
445 			len++;
446 			if (++i > MAX_USER_PASSWD_LEN)
447 			{
448 				fclose(fp);
449 				ereport(FATAL,
450 						(errmsg("failed to authenticate PCP user"),
451 						 errdetail("username read from file \"%s\" is larger than maximum allowed username length [%d]", passwd_file, MAX_USER_PASSWD_LEN)));
452 				return 0;
453 			}
454 		}
455 		memcpy(file_username, line, len);
456 		file_username[len] = '\0';
457 
458 		if (strcmp(packet_username, file_username) != 0)
459 			continue;
460 
461 		i++;
462 		len = 0;
463 		while (line[i] != '\n' && line[i] != '\0')
464 		{
465 			len++;
466 			if (++i > MAX_USER_PASSWD_LEN)
467 			{
468 				fclose(fp);
469 				ereport(FATAL,
470 						(errmsg("failed to authenticate PCP user"),
471 						 errdetail("password read from file \"%s\" is larger than maximum allowed password length [%d]", passwd_file, MAX_USER_PASSWD_LEN)));
472 				return 0;
473 			}
474 		}
475 
476 		memcpy(file_password, line + strlen(file_username) + 1, len);
477 		file_password[len] = '\0';
478 
479 		pool_md5_encrypt(file_password, file_username, strlen(file_username),
480 						 encrypt_buf + MD5_PASSWD_LEN + 1);
481 		encrypt_buf[(MD5_PASSWD_LEN + 1) * 2 - 1] = '\0';
482 
483 		pool_md5_encrypt(encrypt_buf + MD5_PASSWD_LEN + 1, salt, salt_len,
484 						 encrypt_buf);
485 		encrypt_buf[MD5_PASSWD_LEN] = '\0';
486 
487 		if (strcmp(encrypt_buf, packet_password) == 0)
488 		{
489 			fclose(fp);
490 			return 1;
491 		}
492 	}
493 	fclose(fp);
494 	ereport(FATAL,
495 			(errmsg("authentication failed for user \"%s\"", packet_username),
496 			 errdetail("username and/or password does not match")));
497 
498 	return 0;
499 }
500 
501 
502 /* Detach a node */
503 static int
pool_detach_node(int node_id,bool gracefully)504 pool_detach_node(int node_id, bool gracefully)
505 {
506 	if (!gracefully)
507 	{
508 		degenerate_backend_set_ex(&node_id, 1, REQ_DETAIL_SWITCHOVER | REQ_DETAIL_CONFIRMED, true, false);
509 		return 0;
510 	}
511 
512 	/*
513 	 * Check if the NODE DOWN can be executed on the given node id.
514 	 */
515 	degenerate_backend_set_ex(&node_id, 1, REQ_DETAIL_SWITCHOVER | REQ_DETAIL_CONFIRMED, true, true);
516 
517 	/*
518 	 * Wait until all frontends exit
519 	 */
520 	*InRecovery = RECOVERY_DETACH;	/* This wiil ensure that new incoming
521 									 * connection requests are blocked */
522 
523 	if (wait_connection_closed())
524 	{
525 		/* wait timed out */
526 		finish_recovery();
527 		return -1;
528 	}
529 
530 	pcp_worker_wakeup_request = 0;
531 
532 	/*
533 	 * Now all frontends have gone. Let's do failover.
534 	 */
535 	degenerate_backend_set_ex(&node_id, 1, REQ_DETAIL_SWITCHOVER | REQ_DETAIL_CONFIRMED, false, false);
536 
537 	/*
538 	 * Wait for failover completed.
539 	 */
540 
541 	while (!pcp_worker_wakeup_request)
542 	{
543 		struct timeval t = {1, 0};
544 
545 		select(0, NULL, NULL, NULL, &t);
546 	}
547 	pcp_worker_wakeup_request = 0;
548 
549 	/*
550 	 * Start to accept incoming connections and send SIGUSR2 to pgpool parent
551 	 * to distribute SIGUSR2 all pgpool children.
552 	 */
553 	finish_recovery();
554 
555 	return 0;
556 }
557 
558 /* Promote a node */
559 static int
pool_promote_node(int node_id,bool gracefully)560 pool_promote_node(int node_id, bool gracefully)
561 {
562 	if (!gracefully)
563 	{
564 		promote_backend(node_id, REQ_DETAIL_CONFIRMED); /* send promote request */
565 		return 0;
566 	}
567 
568 	/*
569 	 * Wait until all frontends exit
570 	 */
571 	*InRecovery = RECOVERY_PROMOTE; /* This wiil ensure that new incoming
572 									 * connection requests are blocked */
573 
574 	if (wait_connection_closed())
575 	{
576 		/* wait timed out */
577 		finish_recovery();
578 		return -1;
579 	}
580 
581 	/*
582 	 * Now all frontends have gone. Let's do failover.
583 	 */
584 	promote_backend(node_id, REQ_DETAIL_CONFIRMED); /* send promote request */
585 
586 	/*
587 	 * Wait for failover completed.
588 	 */
589 	pcp_worker_wakeup_request = 0;
590 
591 	while (!pcp_worker_wakeup_request)
592 	{
593 		struct timeval t = {1, 0};
594 
595 		select(0, NULL, NULL, NULL, &t);
596 	}
597 	pcp_worker_wakeup_request = 0;
598 
599 	/*
600 	 * Start to accept incoming connections and send SIGUSR2 to pgpool parent
601 	 * to distribute SIGUSR2 all pgpool children.
602 	 */
603 	finish_recovery();
604 	return 0;
605 }
606 
607 static void
inform_process_count(PCP_CONNECTION * frontend)608 inform_process_count(PCP_CONNECTION * frontend)
609 {
610 	int			wsize;
611 	int			process_count;
612 	char		process_count_str[16];
613 	int		   *process_list = NULL;
614 	char		code[] = "CommandComplete";
615 	char	   *mesg = NULL;
616 	int			i;
617 	int			total_port_len = 0;
618 
619 	process_list = pool_get_process_list(&process_count);
620 
621 	mesg = (char *) palloc(7 * process_count);	/* PID is at most 6 characters
622 												 * long */
623 
624 	snprintf(process_count_str, sizeof(process_count_str), "%d", process_count);
625 
626 	for (i = 0; i < process_count; i++)
627 	{
628 		char		process_id[7];
629 
630 		snprintf(process_id, sizeof(process_id), "%d", process_list[i]);
631 		snprintf(mesg + total_port_len, strlen(process_id) + 1, "%s", process_id);
632 		total_port_len += strlen(process_id) + 1;
633 	}
634 
635 	pcp_write(frontend, "n", 1);
636 	wsize = htonl(sizeof(code) +
637 				  strlen(process_count_str) + 1 +
638 				  total_port_len +
639 				  sizeof(int));
640 	pcp_write(frontend, &wsize, sizeof(int));
641 	pcp_write(frontend, code, sizeof(code));
642 	pcp_write(frontend, process_count_str, strlen(process_count_str) + 1);
643 	pcp_write(frontend, mesg, total_port_len);
644 	do_pcp_flush(frontend);
645 
646 	pfree(process_list);
647 	pfree(mesg);
648 
649 	ereport(DEBUG1,
650 			(errmsg("PCP: informing process count"),
651 			 errdetail("%d process(es) found", process_count)));
652 }
653 
654 static void
inform_process_info(PCP_CONNECTION * frontend,char * buf)655 inform_process_info(PCP_CONNECTION * frontend, char *buf)
656 {
657 	int			proc_id;
658 	int			wsize;
659 	int			num_proc = pool_config->num_init_children;
660 	int			i;
661 
662 	proc_id = atoi(buf);
663 
664 	if ((proc_id != 0) && (pool_get_process_info(proc_id) == NULL))
665 	{
666 		ereport(ERROR,
667 				(errmsg("informing process info failed"),
668 				 errdetail("invalid process ID : %s", buf)));
669 	}
670 	else
671 	{
672 		/* First, send array size of connection_info */
673 		char		arr_code[] = "ArraySize";
674 		char		con_info_size[16];
675 
676 		/* Finally, indicate that all data is sent */
677 		char		fin_code[] = "CommandComplete";
678 
679 		POOL_REPORT_POOLS *pools = get_pools(&num_proc);
680 
681 		if (proc_id == 0)
682 		{
683 			snprintf(con_info_size, sizeof(con_info_size), "%d", num_proc);
684 		}
685 		else
686 		{
687 			snprintf(con_info_size, sizeof(con_info_size), "%d", pool_config->max_pool * NUM_BACKENDS);
688 		}
689 
690 		pcp_write(frontend, "p", 1);
691 		wsize = htonl(sizeof(arr_code) +
692 					  strlen(con_info_size) + 1 +
693 					  sizeof(int));
694 		pcp_write(frontend, &wsize, sizeof(int));
695 		pcp_write(frontend, arr_code, sizeof(arr_code));
696 		pcp_write(frontend, con_info_size, strlen(con_info_size) + 1);
697 		do_pcp_flush(frontend);
698 
699 		/* Second, send process information for all connection_info */
700 		for (i = 0; i < num_proc; i++)
701 		{
702 			char		code[] = "ProcessInfo";
703 			char		proc_pid[16];
704 			char		proc_start_time[20];
705 			char		proc_create_time[20];
706 			char		majorversion[5];
707 			char		minorversion[5];
708 			char		pool_counter[16];
709 			char		backend_id[16];
710 			char		backend_pid[16];
711 			char		connected[2];
712 
713 			if (proc_id != 0 && proc_id != pools[i].pool_pid)
714 				continue;
715 
716 			snprintf(proc_pid, sizeof(proc_pid), "%d", pools[i].pool_pid);
717 			snprintf(proc_start_time, sizeof(proc_start_time), "%ld", pools[i].start_time);
718 			snprintf(proc_create_time, sizeof(proc_create_time), "%ld", pools[i].create_time);
719 			snprintf(majorversion, sizeof(majorversion), "%d", pools[i].pool_majorversion);
720 			snprintf(minorversion, sizeof(minorversion), "%d", pools[i].pool_minorversion);
721 			snprintf(pool_counter, sizeof(pool_counter), "%d", pools[i].pool_counter);
722 			snprintf(backend_id, sizeof(backend_pid), "%d", pools[i].backend_id);
723 			snprintf(backend_pid, sizeof(backend_pid), "%d", pools[i].pool_backendpid);
724 			snprintf(connected, sizeof(connected), "%d", pools[i].pool_connected);
725 
726 			pcp_write(frontend, "p", 1);
727 			wsize = htonl(sizeof(code) +
728 						  strlen(proc_pid) + 1 +
729 						  strlen(pools[i].database) + 1 +
730 						  strlen(pools[i].username) + 1 +
731 						  strlen(proc_start_time) + 1 +
732 						  strlen(proc_create_time) + 1 +
733 						  strlen(majorversion) + 1 +
734 						  strlen(minorversion) + 1 +
735 						  strlen(pool_counter) + 1 +
736 						  strlen(backend_id) + 1 +
737 						  strlen(backend_pid) + 1 +
738 						  strlen(connected) + 1 +
739 						  sizeof(int));
740 			pcp_write(frontend, &wsize, sizeof(int));
741 			pcp_write(frontend, code, sizeof(code));
742 			pcp_write(frontend, proc_pid, strlen(proc_pid) + 1);
743 			pcp_write(frontend, pools[i].database, strlen(pools[i].database) + 1);
744 			pcp_write(frontend, pools[i].username, strlen(pools[i].username) + 1);
745 			pcp_write(frontend, proc_start_time, strlen(proc_start_time) + 1);
746 			pcp_write(frontend, proc_create_time, strlen(proc_create_time) + 1);
747 			pcp_write(frontend, majorversion, strlen(majorversion) + 1);
748 			pcp_write(frontend, minorversion, strlen(minorversion) + 1);
749 			pcp_write(frontend, pool_counter, strlen(pool_counter) + 1);
750 			pcp_write(frontend, backend_id, strlen(backend_id) + 1);
751 			pcp_write(frontend, backend_pid, strlen(backend_pid) + 1);
752 			pcp_write(frontend, connected, strlen(connected) + 1);
753 			do_pcp_flush(frontend);
754 		}
755 
756 		pcp_write(frontend, "p", 1);
757 		wsize = htonl(sizeof(fin_code) +
758 					  sizeof(int));
759 		pcp_write(frontend, &wsize, sizeof(int));
760 		pcp_write(frontend, fin_code, sizeof(fin_code));
761 		do_pcp_flush(frontend);
762 		ereport(DEBUG1,
763 				(errmsg("PCP informing process info"),
764 				 errdetail("retrieved process information from shared memory")));
765 
766 		pfree(pools);
767 	}
768 }
769 
770 static void
inform_watchdog_info(PCP_CONNECTION * frontend,char * buf)771 inform_watchdog_info(PCP_CONNECTION * frontend, char *buf)
772 {
773 	int			wd_index;
774 	int			json_data_len;
775 	int			wsize;
776 	char		code[] = "CommandComplete";
777 	char	   *json_data;
778 
779 	if (!pool_config->use_watchdog)
780 		ereport(ERROR,
781 				(errmsg("PCP: informing watchdog info failed"),
782 				 errdetail("watcdhog is not enabled")));
783 
784 	wd_index = atoi(buf);
785 
786 	json_data = wd_get_watchdog_nodes(wd_index);
787 	if (json_data == NULL)
788 		ereport(ERROR,
789 				(errmsg("PCP: informing watchdog info failed"),
790 				 errdetail("invalid watchdog index")));
791 
792 	ereport(DEBUG2,
793 			(errmsg("PCP: informing watchdog info"),
794 			 errdetail("retrieved node information from IPC socket")));
795 
796 	/*
797 	 * This is the voilation of PCP protocol but I think in future we should
798 	 * shift to more adaptable protocol for data transmition.
799 	 */
800 	json_data_len = strlen(json_data);
801 	wsize = htonl(sizeof(code) +
802 				  json_data_len + 1 +
803 				  sizeof(int));
804 	pcp_write(frontend, "w", 1);
805 
806 	pcp_write(frontend, &wsize, sizeof(int));
807 	pcp_write(frontend, code, sizeof(code));
808 
809 	pcp_write(frontend, json_data, json_data_len + 1);
810 	do_pcp_flush(frontend);
811 
812 	pfree(json_data);
813 }
814 
815 static void
inform_node_info(PCP_CONNECTION * frontend,char * buf)816 inform_node_info(PCP_CONNECTION * frontend, char *buf)
817 {
818 	int			node_id;
819 	int			wsize;
820 	char		port_str[6];
821 	char		status[2];
822 	char		weight_str[20];
823 	char		role_str[10];
824 	char		standby_delay_str[20];
825 	char		status_changed_time_str[20];
826 	char		code[] = "CommandComplete";
827 	BackendInfo *bi = NULL;
828 	SERVER_ROLE role;
829 
830 	node_id = atoi(buf);
831 
832 	bi = pool_get_node_info(node_id);
833 
834 	if (bi == NULL)
835 		ereport(ERROR,
836 				(errmsg("informing node info failed"),
837 				 errdetail("invalid node ID")));
838 
839 	ereport(DEBUG2,
840 			(errmsg("PCP: informing node info"),
841 			 errdetail("retrieved node information from shared memory")));
842 
843 	snprintf(port_str, sizeof(port_str), "%d", bi->backend_port);
844 	snprintf(status, sizeof(status), "%d", bi->backend_status);
845 	snprintf(weight_str, sizeof(weight_str), "%f", bi->backend_weight);
846 
847 	if (STREAM)
848 	{
849 		if (Req_info->primary_node_id == node_id)
850 			role = ROLE_PRIMARY;
851 		else
852 			role = ROLE_STANDBY;
853 	}
854 	else
855 	{
856 		if (Req_info->master_node_id == node_id)
857 			role = ROLE_MASTER;
858 		else
859 			role = ROLE_SLAVE;
860 	}
861 	snprintf(role_str, sizeof(role_str), "%d", role);
862 
863 	snprintf(standby_delay_str, sizeof(standby_delay_str), UINT64_FORMAT, bi->standby_delay);
864 
865 	snprintf(status_changed_time_str, sizeof(status_changed_time_str), UINT64_FORMAT, bi->status_changed_time);
866 
867 	pcp_write(frontend, "i", 1);
868 	wsize = htonl(sizeof(code) +
869 				  strlen(bi->backend_hostname) + 1 +
870 				  strlen(port_str) + 1 +
871 				  strlen(status) + 1 +
872 				  strlen(weight_str) + 1 +
873 				  strlen(role_str) + 1 +
874 				  strlen(standby_delay_str) + 1 +
875 				  strlen(bi->replication_state) + 1 +
876 				  strlen(bi->replication_sync_state) + 1 +
877 				  strlen(status_changed_time_str) + 1 +
878 				  sizeof(int));
879 	pcp_write(frontend, &wsize, sizeof(int));
880 	pcp_write(frontend, code, sizeof(code));
881 	pcp_write(frontend, bi->backend_hostname, strlen(bi->backend_hostname) + 1);
882 	pcp_write(frontend, port_str, strlen(port_str) + 1);
883 	pcp_write(frontend, status, strlen(status) + 1);
884 	pcp_write(frontend, weight_str, strlen(weight_str) + 1);
885 	pcp_write(frontend, role_str, strlen(role_str) + 1);
886 	pcp_write(frontend, standby_delay_str, strlen(standby_delay_str) + 1);
887 	pcp_write(frontend, bi->replication_state, strlen(bi->replication_state) + 1);
888 	pcp_write(frontend, bi->replication_sync_state, strlen(bi->replication_sync_state) + 1);
889 	pcp_write(frontend, status_changed_time_str, strlen(status_changed_time_str) + 1);
890 
891 	do_pcp_flush(frontend);
892 }
893 
894 static void
inform_node_count(PCP_CONNECTION * frontend)895 inform_node_count(PCP_CONNECTION * frontend)
896 {
897 	int			wsize;
898 	char		mesg[16];
899 	char		code[] = "CommandComplete";
900 	int			node_count = pool_get_node_count();
901 
902 	snprintf(mesg, sizeof(mesg), "%d", node_count);
903 
904 	pcp_write(frontend, "l", 1);
905 	wsize = htonl(sizeof(code) +
906 				  strlen(mesg) + 1 +
907 				  sizeof(int));
908 	pcp_write(frontend, &wsize, sizeof(int));
909 	pcp_write(frontend, code, sizeof(code));
910 	pcp_write(frontend, mesg, strlen(mesg) + 1);
911 	do_pcp_flush(frontend);
912 
913 	ereport(DEBUG1,
914 			(errmsg("PCP: informing node count"),
915 			 errdetail("%d node(s) found", node_count)));
916 }
917 
918 static void
process_detach_node(PCP_CONNECTION * frontend,char * buf,char tos)919 process_detach_node(PCP_CONNECTION * frontend, char *buf, char tos)
920 {
921 	int			node_id;
922 	int			wsize;
923 	char		code[] = "CommandComplete";
924 	bool		gracefully;
925 
926 	if (tos == 'D')
927 		gracefully = false;
928 	else
929 		gracefully = true;
930 
931 	node_id = atoi(buf);
932 	ereport(DEBUG1,
933 			(errmsg("PCP: processing detach node"),
934 			 errdetail("detaching Node ID %d", node_id)));
935 
936 	pool_detach_node(node_id, gracefully);
937 
938 	pcp_write(frontend, "d", 1);
939 	wsize = htonl(sizeof(code) + sizeof(int));
940 	pcp_write(frontend, &wsize, sizeof(int));
941 	pcp_write(frontend, code, sizeof(code));
942 	do_pcp_flush(frontend);
943 }
944 
945 static void
process_attach_node(PCP_CONNECTION * frontend,char * buf)946 process_attach_node(PCP_CONNECTION * frontend, char *buf)
947 {
948 	int			node_id;
949 	int			wsize;
950 	char		code[] = "CommandComplete";
951 
952 	node_id = atoi(buf);
953 	ereport(DEBUG1,
954 			(errmsg("PCP: processing attach node"),
955 			 errdetail("attaching Node ID %d", node_id)));
956 
957 	send_failback_request(node_id, true, REQ_DETAIL_CONFIRMED);
958 
959 	pcp_write(frontend, "c", 1);
960 	wsize = htonl(sizeof(code) + sizeof(int));
961 	pcp_write(frontend, &wsize, sizeof(int));
962 	pcp_write(frontend, code, sizeof(code));
963 	do_pcp_flush(frontend);
964 }
965 
966 
967 static void
process_recovery_request(PCP_CONNECTION * frontend,char * buf)968 process_recovery_request(PCP_CONNECTION * frontend, char *buf)
969 {
970 	int			wsize;
971 	char		code[] = "CommandComplete";
972 	int			node_id = atoi(buf);
973 
974 	if ((node_id < 0) || (node_id >= pool_config->backend_desc->num_backends))
975 		ereport(ERROR,
976 				(errmsg("process recovery request failed"),
977 				 errdetail("node id %d is not valid", node_id)));
978 
979 	if ((!REPLICATION &&
980 		 !(MASTER_SLAVE &&
981 		   pool_config->master_slave_sub_mode == STREAM_MODE)) ||
982 		(MASTER_SLAVE &&
983 		 pool_config->master_slave_sub_mode == STREAM_MODE &&
984 		 node_id == PRIMARY_NODE_ID))
985 	{
986 		if (MASTER_SLAVE && pool_config->master_slave_sub_mode == STREAM_MODE)
987 			ereport(ERROR,
988 					(errmsg("process recovery request failed"),
989 					 errdetail("primary server cannot be recovered by online recovery.")));
990 		else
991 			ereport(ERROR,
992 					(errmsg("process recovery request failed"),
993 					 errdetail("recovery request is only allowed in replication and streaming replication modes.")));
994 	}
995 	else
996 	{
997 		if (pcp_mark_recovery_in_progress() == false)
998 			ereport(FATAL,
999 					(errmsg("process recovery request failed"),
1000 					 errdetail("pgpool-II is already processing another recovery request.")));
1001 
1002 		ereport(DEBUG1,
1003 				(errmsg("PCP: processing recovery request"),
1004 				 errdetail("start online recovery")));
1005 
1006 		PG_TRY();
1007 		{
1008 			start_recovery(node_id);
1009 			finish_recovery();
1010 			pcp_write(frontend, "c", 1);
1011 			wsize = htonl(sizeof(code) + sizeof(int));
1012 			pcp_write(frontend, &wsize, sizeof(int));
1013 			pcp_write(frontend, code, sizeof(code));
1014 			do_pcp_flush(frontend);
1015 			pcp_mark_recovery_finished();
1016 		}
1017 		PG_CATCH();
1018 		{
1019 			finish_recovery();
1020 			pcp_mark_recovery_finished();
1021 			PG_RE_THROW();
1022 
1023 		} PG_END_TRY();
1024 	}
1025 	do_pcp_flush(frontend);
1026 }
1027 
1028 static void
process_status_request(PCP_CONNECTION * frontend)1029 process_status_request(PCP_CONNECTION * frontend)
1030 {
1031 	int			nrows = 0;
1032 	int			i;
1033 	POOL_REPORT_CONFIG *status = get_config(&nrows);
1034 	int			len = 0;
1035 
1036 	/* First, send array size of connection_info */
1037 	char		arr_code[] = "ArraySize";
1038 	char		code[] = "ProcessConfig";
1039 
1040 	/* Finally, indicate that all data is sent */
1041 	char		fin_code[] = "CommandComplete";
1042 
1043 	pcp_write(frontend, "b", 1);
1044 	len = htonl(sizeof(arr_code) + sizeof(int) + sizeof(int));
1045 	pcp_write(frontend, &len, sizeof(int));
1046 	pcp_write(frontend, arr_code, sizeof(arr_code));
1047 	len = htonl(nrows);
1048 	pcp_write(frontend, &len, sizeof(int));
1049 
1050 	do_pcp_flush(frontend);
1051 
1052 	for (i = 0; i < nrows; i++)
1053 	{
1054 		pcp_write(frontend, "b", 1);
1055 		len = htonl(sizeof(int)
1056 					+ sizeof(code)
1057 					+ strlen(status[i].name) + 1
1058 					+ strlen(status[i].value) + 1
1059 					+ strlen(status[i].desc) + 1
1060 			);
1061 
1062 		pcp_write(frontend, &len, sizeof(int));
1063 		pcp_write(frontend, code, sizeof(code));
1064 		pcp_write(frontend, status[i].name, strlen(status[i].name) + 1);
1065 		pcp_write(frontend, status[i].value, strlen(status[i].value) + 1);
1066 		pcp_write(frontend, status[i].desc, strlen(status[i].desc) + 1);
1067 	}
1068 
1069 	pcp_write(frontend, "b", 1);
1070 	len = htonl(sizeof(fin_code) + sizeof(int));
1071 	pcp_write(frontend, &len, sizeof(int));
1072 	pcp_write(frontend, fin_code, sizeof(fin_code));
1073 	do_pcp_flush(frontend);
1074 
1075 	pfree(status);
1076 	ereport(DEBUG1,
1077 			(errmsg("PCP: processing status request"),
1078 			 errdetail("retrieved status information")));
1079 }
1080 
1081 static void
process_promote_node(PCP_CONNECTION * frontend,char * buf,char tos)1082 process_promote_node(PCP_CONNECTION * frontend, char *buf, char tos)
1083 {
1084 	int			node_id;
1085 	int			wsize;
1086 	char		code[] = "CommandComplete";
1087 	bool		gracefully;
1088 
1089 	if (tos == 'J')
1090 		gracefully = false;
1091 	else
1092 		gracefully = true;
1093 
1094 	node_id = atoi(buf);
1095 	if ((node_id < 0) || (node_id >= pool_config->backend_desc->num_backends))
1096 		ereport(ERROR,
1097 				(errmsg("could not process recovery request"),
1098 				 errdetail("node id %d is not valid", node_id)));
1099 	/* promoting node is reserved to Streaming Replication */
1100 	if (!MASTER_SLAVE || pool_config->master_slave_sub_mode != STREAM_MODE)
1101 	{
1102 		ereport(FATAL,
1103 				(errmsg("invalid pgpool mode for process recovery request"),
1104 				 errdetail("not in streaming replication mode, can't promote node id %d", node_id)));
1105 
1106 	}
1107 
1108 	if (node_id == REAL_PRIMARY_NODE_ID)
1109 	{
1110 		ereport(FATAL,
1111 				(errmsg("invalid pgpool mode for process recovery request"),
1112 				 errdetail("specified node is already primary node, can't promote node id %d", node_id)));
1113 
1114 	}
1115 	ereport(DEBUG1,
1116 			(errmsg("PCP: processing promote node"),
1117 			 errdetail("promoting Node ID %d", node_id)));
1118 	pool_promote_node(node_id, gracefully);
1119 
1120 	pcp_write(frontend, "d", 1);
1121 	wsize = htonl(sizeof(code) + sizeof(int));
1122 	pcp_write(frontend, &wsize, sizeof(int));
1123 	pcp_write(frontend, code, sizeof(code));
1124 	do_pcp_flush(frontend);
1125 }
1126 
1127 static void
process_authentication(PCP_CONNECTION * frontend,char * buf,char * salt,int * random_salt)1128 process_authentication(PCP_CONNECTION * frontend, char *buf, char *salt, int *random_salt)
1129 {
1130 	int			wsize;
1131 	int			authenticated;
1132 
1133 	if (*random_salt)
1134 	{
1135 		authenticated = user_authenticate(buf, pcp_conf_file, salt, 4);
1136 	}
1137 	if (!*random_salt || !authenticated)
1138 	{
1139 		ereport(FATAL,
1140 				(errmsg("authentication failed"),
1141 				 errdetail("username and/or password does not match")));
1142 
1143 		*random_salt = 0;
1144 	}
1145 	else
1146 	{
1147 		char		code[] = "AuthenticationOK";
1148 
1149 		pcp_write(frontend, "r", 1);
1150 		wsize = htonl(sizeof(code) + sizeof(int));
1151 		pcp_write(frontend, &wsize, sizeof(int));
1152 		pcp_write(frontend, code, sizeof(code));
1153 		do_pcp_flush(frontend);
1154 		*random_salt = 0;
1155 
1156 		ereport(DEBUG1,
1157 				(errmsg("PCP: processing authentication request"),
1158 				 errdetail("authentication OK")));
1159 	}
1160 }
1161 
1162 static void
send_md5salt(PCP_CONNECTION * frontend,char * salt)1163 send_md5salt(PCP_CONNECTION * frontend, char *salt)
1164 {
1165 	int			wsize;
1166 
1167 	ereport(DEBUG1,
1168 			(errmsg("PCP: sending md5 salt to client")));
1169 
1170 	pool_random_salt(salt);
1171 
1172 	pcp_write(frontend, "m", 1);
1173 	wsize = htonl(sizeof(int) + 4);
1174 	pcp_write(frontend, &wsize, sizeof(int));
1175 	pcp_write(frontend, salt, 4);
1176 	do_pcp_flush(frontend);
1177 }
1178 
1179 static void
process_shutown_request(PCP_CONNECTION * frontend,char mode)1180 process_shutown_request(PCP_CONNECTION * frontend, char mode)
1181 {
1182 	char		code[] = "CommandComplete";
1183 	pid_t		ppid = getppid();
1184 	int			sig,
1185 				len;
1186 
1187 	if (mode == 's')
1188 	{
1189 		ereport(DEBUG1,
1190 				(errmsg("PCP: processing shutdown request"),
1191 				 errdetail("sending SIGTERM to the parent process with PID:%d", ppid)));
1192 		sig = SIGTERM;
1193 	}
1194 	else if (mode == 'f')
1195 	{
1196 		ereport(DEBUG1,
1197 				(errmsg("PCP: processing shutdown request"),
1198 				 errdetail("sending SIGINT to the parent process with PID:%d", ppid)));
1199 		sig = SIGINT;
1200 	}
1201 	else if (mode == 'i')
1202 	{
1203 		ereport(DEBUG1,
1204 				(errmsg("PCP: processing shutdown request"),
1205 				 errdetail("sending SIGQUIT to the parent process with PID:%d", ppid)));
1206 		sig = SIGQUIT;
1207 	}
1208 	else
1209 	{
1210 		ereport(ERROR,
1211 				(errmsg("PCP: error while processing shutdown request"),
1212 				 errdetail("invalid shutdown mode \"%c\"", mode)));
1213 	}
1214 
1215 	pcp_write(frontend, "t", 1);
1216 	len = htonl(sizeof(code) + sizeof(int));
1217 	pcp_write(frontend, &len, sizeof(int));
1218 	pcp_write(frontend, code, sizeof(code));
1219 	do_pcp_flush(frontend);
1220 
1221 	pool_signal_parent(sig);
1222 }
1223 
1224 static void
process_set_configration_parameter(PCP_CONNECTION * frontend,char * buf,int len)1225 process_set_configration_parameter(PCP_CONNECTION * frontend, char *buf, int len)
1226 {
1227 	char	   *param_name;
1228 	char	   *param_value;
1229 	int			wsize;
1230 	char		code[] = "CommandComplete";
1231 
1232 	param_name = buf;
1233 	if (param_name == NULL)
1234 		ereport(ERROR,
1235 				(errmsg("PCP: set configuration parameter failed"),
1236 				 errdetail("invalid pcp packet received from client")));
1237 
1238 	param_value = (char *) memchr(buf, '\0', len);
1239 	if (param_value == NULL)
1240 		ereport(ERROR,
1241 				(errmsg("set configuration parameter failed"),
1242 				 errdetail("invalid pcp packet received from client")));
1243 
1244 	param_value += 1;
1245 	ereport(LOG,
1246 			(errmsg("set configuration parameter, \"%s TO %s\"", param_name, param_value)));
1247 
1248 	if (strcasecmp(param_name, "client_min_messages") == 0)
1249 	{
1250 		const char *ordered_valid_values[] = {"debug5", "debug4", "debug3", "debug2", "debug1", "log", "commerror", "info", "notice", "warning", "error", NULL};
1251 		bool		found = false;
1252 		int			i;
1253 
1254 		for (i = 0;; i++)
1255 		{
1256 			char	   *valid_val = (char *) ordered_valid_values[i];
1257 
1258 			if (!valid_val)
1259 				break;
1260 
1261 			if (!strcasecmp(param_value, valid_val))
1262 			{
1263 				found = true;
1264 				pool_config->client_min_messages = i + 10;
1265 				ereport(DEBUG1,
1266 						(errmsg("PCP setting parameter \"%s\" to \"%s\"", param_name, param_value)));
1267 				break;
1268 			}
1269 		}
1270 		if (!found)
1271 			ereport(ERROR,
1272 					(errmsg("PCP: set configuration parameter failed"),
1273 					 errdetail("invalid value \"%s\" for parameter \"%s\"", param_value, param_name)));
1274 	}
1275 	else
1276 		ereport(ERROR,
1277 				(errmsg("PCP: set configuration parameter failed"),
1278 				 errdetail("invalid parameter \"%s\"", param_name)));
1279 
1280 	pcp_write(frontend, "a", 1);
1281 	wsize = htonl(sizeof(code) + sizeof(int));
1282 	pcp_write(frontend, &wsize, sizeof(int));
1283 	pcp_write(frontend, code, sizeof(code));
1284 	do_pcp_flush(frontend);
1285 }
1286 
1287 /*
1288  * Wrapper around pcp_flush which throws FATAL error when pcp_flush fails
1289  */
1290 static void
do_pcp_flush(PCP_CONNECTION * frontend)1291 do_pcp_flush(PCP_CONNECTION * frontend)
1292 {
1293 	if (pcp_flush(frontend) < 0)
1294 		ereport(FATAL,
1295 				(errmsg("failed to flush data to client"),
1296 				 errdetail("pcp_flush failed with error : \"%m\"")));
1297 }
1298 
1299 /*
1300  * Wrapper around pcp_read which throws FATAL error when read fails
1301  */
1302 static void
do_pcp_read(PCP_CONNECTION * pc,void * buf,int len)1303 do_pcp_read(PCP_CONNECTION * pc, void *buf, int len)
1304 {
1305 	if (pcp_read(pc, buf, len))
1306 		ereport(FATAL,
1307 				(errmsg("unable to read from client"),
1308 				 errdetail("pcp_read failed with error : \"%m\"")));
1309 }
1310 
1311 int
send_to_pcp_frontend(char * data,int len,bool flush)1312 send_to_pcp_frontend(char *data, int len, bool flush)
1313 {
1314 	int			ret;
1315 
1316 	if (processType != PT_PCP_WORKER || pcp_frontend == NULL)
1317 		return -1;
1318 	ret = pcp_write(pcp_frontend, data, len);
1319 	if (flush && !ret)
1320 		ret = pcp_flush(pcp_frontend);
1321 	return ret;
1322 }
1323 
1324 int
pcp_frontend_exists(void)1325 pcp_frontend_exists(void)
1326 {
1327 	if (processType != PT_PCP_WORKER || pcp_frontend == NULL)
1328 		return -1;
1329 	return 0;
1330 }
1331 
1332 static void
pcp_worker_will_go_down(int code,Datum arg)1333 pcp_worker_will_go_down(int code, Datum arg)
1334 {
1335 	if (processType != PT_PCP_WORKER)
1336 	{
1337 		/* should never happen */
1338 		ereport(WARNING,
1339 				(errmsg("pcp_worker_will_go_down called from invalid process")));
1340 		return;
1341 	}
1342 	if (pcp_frontend)
1343 		pcp_close(pcp_frontend);
1344 	processState = EXITING;
1345 	POOL_SETMASK(&UnBlockSig);
1346 
1347 }
1348