1 /* -*-pcp_worker.c-*- */
2 /*
3 *
4 * pgpool: a language independent connection pool server for PostgreSQL
5 * written by Tatsuo Ishii
6 *
7 * Copyright (c) 2003-2020 PgPool Global Development Group
8 *
9 * Permission to use, copy, modify, and distribute this software and
10 * its documentation for any purpose and without fee is hereby
11 * granted, provided that the above copyright notice appear in all
12 * copies and that both that copyright notice and this permission
13 * notice appear in supporting documentation, and that the name of the
14 * author not be used in advertising or publicity pertaining to
15 * distribution of the software without specific, written prior
16 * permission. The author makes no representations about the
17 * suitability of this software for any purpose. It is provided "as
18 * is" without express or implied warranty.
19 *
20 * pcp_worker.c: PCP worker child process main
21 *
22 */
23
24
25 #include "config.h"
26
27 #include <arpa/inet.h>
28 #include <signal.h>
29
30 #include <stdio.h>
31 #include <errno.h>
32 #include <string.h>
33 #include <unistd.h>
34 #include <stdlib.h>
35 #include <sys/time.h>
36
37 #ifdef HAVE_FCNTL_H
38 #include <fcntl.h>
39 #endif
40
41 #include "pool.h"
42 #include "pool_config.h"
43
44 #include "pcp/pcp_stream.h"
45 #include "pcp/pcp.h"
46 #include "pcp/pcp_worker.h"
47 #include "pcp/recovery.h"
48 #include "auth/md5.h"
49 #include "auth/pool_auth.h"
50 #include "context/pool_process_context.h"
51 #include "utils/pool_process_reporting.h"
52 #include "utils/palloc.h"
53 #include "utils/memutils.h"
54 #include "utils/ps_status.h"
55 #include "utils/elog.h"
56 #include "watchdog/wd_json_data.h"
57 #include "watchdog/wd_internal_commands.h"
58 #include "main/pool_internal_comms.h"
59
/* Size passed to fgets() when reading one line of the PCP password file */
#define MAX_FILE_LINE_LEN 512

extern char *pcp_conf_file; /* global variable defined in main.c holds the
 * path for pcp.conf */
/* Set to 1 by wakeup_handler_child() (the SIGUSR2 handler); polled and reset
 * by the graceful detach/promote wait loops in this file. */
volatile sig_atomic_t pcp_worker_wakeup_request = 0;
/* Connection to the PCP client; assigned from pcp_open() in pcp_worker_main()
 * and reset to NULL when the client disconnects. */
PCP_CONNECTION *volatile pcp_frontend = NULL;

/* Signal handlers installed by pcp_worker_main() */
static RETSIGTYPE die(int sig);
static RETSIGTYPE wakeup_handler_child(int sig);

/* Connection setup and authentication helpers */
static void unset_nonblock(int fd);
static int user_authenticate(char *buf, char *passwd_file, char *salt, int salt_len);
static void process_authentication(PCP_CONNECTION * frontend, char *buf, char *salt, int *random_salt);
static void send_md5salt(PCP_CONNECTION * frontend, char *salt);

/* Dispatcher for authenticated requests */
static void pcp_process_command(char tos, char *buf, int buf_len);

/* Node operations and per-request handlers called from pcp_process_command() */
static int pool_detach_node(int node_id, bool gracefully);
static int pool_promote_node(int node_id, bool gracefully);
static void inform_process_count(PCP_CONNECTION * frontend);
static void inform_process_info(PCP_CONNECTION * frontend, char *buf);
static void inform_watchdog_info(PCP_CONNECTION * frontend, char *buf);
static void inform_node_info(PCP_CONNECTION * frontend, char *buf);
static void inform_node_count(PCP_CONNECTION * frontend);
static void process_reload_config(PCP_CONNECTION * frontend,char scope);
static void inform_health_check_stats(PCP_CONNECTION *frontend, char *buf);
static void process_detach_node(PCP_CONNECTION * frontend, char *buf, char tos);
static void process_attach_node(PCP_CONNECTION * frontend, char *buf);
static void process_recovery_request(PCP_CONNECTION * frontend, char *buf);
static void process_status_request(PCP_CONNECTION * frontend);
static void process_promote_node(PCP_CONNECTION * frontend, char *buf, char tos);
static void process_shutown_request(PCP_CONNECTION * frontend, char mode, char tos);
static void process_set_configration_parameter(PCP_CONNECTION * frontend, char *buf, int len);

/* Exit callback registered with on_system_exit() in pcp_worker_main() */
static void pcp_worker_will_go_down(int code, Datum arg);

/* Wrappers used for flushing to and reading from a PCP connection */
static void do_pcp_flush(PCP_CONNECTION * frontend);
static void do_pcp_read(PCP_CONNECTION * pc, void *buf, int len);
98
99 /*
100 * main entry pont of pcp worker child process
101 */
102 void
pcp_worker_main(int port)103 pcp_worker_main(int port)
104 {
105 sigjmp_buf local_sigjmp_buf;
106 MemoryContext PCPMemoryContext;
107 volatile int authenticated = 0;
108
109 char salt[4];
110 int random_salt = 0;
111 struct timeval uptime;
112 char tos;
113 int rsize;
114
115 ereport(DEBUG1,
116 (errmsg("I am PCP worker child with pid:%d", getpid())));
117
118 /* Identify myself via ps */
119 init_ps_display("", "", "", "");
120
121 gettimeofday(&uptime, NULL);
122 srandom((unsigned int) (getpid() ^ uptime.tv_usec));
123
124 /* set up signal handlers */
125 signal(SIGTERM, die);
126 signal(SIGINT, die);
127 signal(SIGQUIT, die);
128 signal(SIGCHLD, SIG_DFL);
129 signal(SIGUSR2, wakeup_handler_child);
130 signal(SIGUSR1, SIG_IGN);
131 signal(SIGHUP, SIG_IGN);
132 signal(SIGPIPE, SIG_IGN);
133 signal(SIGALRM, SIG_IGN);
134 /* Create per loop iteration memory context */
135 PCPMemoryContext = AllocSetContextCreate(TopMemoryContext,
136 "PCP_worker_main_loop",
137 ALLOCSET_DEFAULT_MINSIZE,
138 ALLOCSET_DEFAULT_INITSIZE,
139 ALLOCSET_DEFAULT_MAXSIZE);
140
141 MemoryContextSwitchTo(TopMemoryContext);
142
143 /*
144 * install the call back for preparation of pcp worker child exit
145 */
146 on_system_exit(pcp_worker_will_go_down, (Datum) NULL);
147
148 /* Initialize my backend status */
149 pool_initialize_private_backend_status();
150
151 /* Initialize process context */
152 pool_init_process_context();
153
154 pcp_frontend = pcp_open(port);
155 unset_nonblock(pcp_frontend->fd);
156
157 if (sigsetjmp(local_sigjmp_buf, 1) != 0)
158 {
159 error_context_stack = NULL;
160 EmitErrorReport();
161
162 MemoryContextSwitchTo(TopMemoryContext);
163 FlushErrorState();
164 }
165 /* We can now handle ereport(ERROR) */
166 PG_exception_stack = &local_sigjmp_buf;
167
168 for (;;)
169 {
170 char *buf = NULL;
171
172 MemoryContextSwitchTo(PCPMemoryContext);
173 MemoryContextResetAndDeleteChildren(PCPMemoryContext);
174
175 errno = 0;
176
177 /* read a PCP packet */
178 do_pcp_read(pcp_frontend, &tos, 1);
179 do_pcp_read(pcp_frontend, &rsize, sizeof(int));
180
181 rsize = ntohl(rsize);
182
183 if (rsize <= 0 || rsize >= MAX_PCP_PACKET_LENGTH)
184 ereport(FATAL,
185 (errmsg("invalid PCP packet"),
186 errdetail("incorrect packet length (%d)", rsize)));
187
188 if ((rsize - sizeof(int)) > 0)
189 {
190 buf = (char *) palloc(rsize - sizeof(int));
191 do_pcp_read(pcp_frontend, buf, rsize - sizeof(int));
192 }
193
194 ereport(DEBUG1,
195 (errmsg("received PCP packet"),
196 errdetail("PCP packet type of service '%c'", tos)));
197
198 if (tos == 'R') /* authentication */
199 {
200 set_ps_display("PCP: processing authentication", false);
201 process_authentication(pcp_frontend, buf, salt, &random_salt);
202 authenticated = 1;
203 continue;
204 }
205 if (tos == 'M') /* md5 salt */
206 {
207 set_ps_display("PCP: processing authentication", false);
208 send_md5salt(pcp_frontend, salt);
209 random_salt = 1;
210 continue;
211 }
212 /* is this connection authenticated? if not disconnect immediately */
213 if (!authenticated)
214 ereport(FATAL,
215 (errmsg("authentication failed for new PCP connection"),
216 errdetail("connection not authorized")));
217
218 /* process a request */
219 pcp_process_command(tos, buf, rsize);
220 }
221 exit(0);
222 }
223
224
225 /* pcp command processor */
226 static void
pcp_process_command(char tos,char * buf,int buf_len)227 pcp_process_command(char tos, char *buf, int buf_len)
228 {
229 if (tos == 'O' || tos == 'T')
230 {
231 if (Req_info->switching)
232 {
233 if (Req_info->request_queue_tail != Req_info->request_queue_head)
234 {
235 POOL_REQUEST_KIND reqkind;
236
237 reqkind = Req_info->request[(Req_info->request_queue_head + 1) % MAX_REQUEST_QUEUE_SIZE].kind;
238
239 if (reqkind == NODE_UP_REQUEST)
240 ereport(ERROR,
241 (errmsg("failed to process PCP request at the moment"),
242 errdetail("failback is in progress")));
243 else if (reqkind == NODE_DOWN_REQUEST)
244 ereport(ERROR,
245 (errmsg("failed to process PCP request at the moment"),
246 errdetail("failover is in progress")));
247 else if (reqkind == PROMOTE_NODE_REQUEST)
248 ereport(ERROR,
249 (errmsg("failed to process PCP request at the moment"),
250 errdetail("promote node operation is in progress")));
251
252 ereport(ERROR,
253 (errmsg("failed to process PCP request at the moment"),
254 errdetail("operation is in progress")));
255 }
256 }
257 }
258
259 switch (tos)
260 {
261 case 'A': /* set configuration parameter */
262 set_ps_display("PCP: processing set configration parameter request", false);
263 process_set_configration_parameter(pcp_frontend, buf, buf_len);
264 break;
265
266 case 'L': /* node count */
267 set_ps_display("PCP: processing node count request", false);
268 inform_node_count(pcp_frontend);
269 break;
270
271 case 'H': /* health check stats */
272 set_ps_display("PCP: processing health check stats request", false);
273 inform_health_check_stats(pcp_frontend, buf);
274 break;
275
276 case 'I': /* node info */
277 set_ps_display("PCP: processing node info request", false);
278 inform_node_info(pcp_frontend, buf);
279 break;
280
281 case 'N': /* process count */
282 set_ps_display("PCP: processing process count request", false);
283 inform_process_count(pcp_frontend);
284 break;
285
286 case 'P': /* process info */
287 set_ps_display("PCP: processing process info request", false);
288 inform_process_info(pcp_frontend, buf);
289 break;
290
291 case 'W': /* watchdog info */
292 set_ps_display("PCP: processing watchdog info request", false);
293 inform_watchdog_info(pcp_frontend, buf);
294 break;
295
296 case 'D': /* detach node */
297 case 'd': /* detach node gracefully */
298 set_ps_display("PCP: processing detach node request", false);
299 process_detach_node(pcp_frontend, buf, tos);
300 break;
301
302 case 'C': /* attach node */
303 set_ps_display("PCP: processing attach node request", false);
304 process_attach_node(pcp_frontend, buf);
305 break;
306
307 case 'T':
308 case 't':
309 set_ps_display("PCP: processing shutdown request", false);
310 process_shutown_request(pcp_frontend, buf[0], tos);
311 break;
312
313 case 'O': /* recovery request */
314 set_ps_display("PCP: processing recovery request", false);
315 process_recovery_request(pcp_frontend, buf);
316 break;
317
318 case 'B': /* status request */
319 set_ps_display("PCP: processing status request request", false);
320 process_status_request(pcp_frontend);
321 break;
322
323 case 'Z': /*reload config file */
324 set_ps_display("PCP: processing reload config request", false);
325 process_reload_config(pcp_frontend, buf[0]);
326 break;
327
328 case 'J': /* promote node */
329 case 'j': /* promote node gracefully */
330 set_ps_display("PCP: processing promote node request", false);
331 process_promote_node(pcp_frontend, buf, tos);
332 break;
333
334 case 'F':
335 ereport(DEBUG1,
336 (errmsg("PCP processing request, stop online recovery")));
337 break;
338
339 case 'X': /* disconnect */
340 ereport(DEBUG1,
341 (errmsg("PCP processing request, client disconnecting"),
342 errdetail("closing PCP connection, and exiting child")));
343 pcp_close(pcp_frontend);
344 pcp_frontend = NULL;
345 /* This child has done its part. Rest in peace now */
346 exit(0);
347 break;
348
349 default:
350 ereport(FATAL,
351 (errmsg("PCP processing request"),
352 errdetail("unknown PCP packet type \"%c\"", tos)));
353 }
354 }
355
356 static RETSIGTYPE
die(int sig)357 die(int sig)
358 {
359 ereport(DEBUG1,
360 (errmsg("PCP worker child receives shutdown request signal %d", sig)));
361 if (sig == SIGTERM)
362 {
363 ereport(DEBUG1,
364 (errmsg("PCP worker child receives smart shutdown request."),
365 errdetail("waiting for the child to die its natural death")));
366 }
367 else if (sig == SIGINT)
368 {
369 ereport(DEBUG1,
370 (errmsg("PCP worker child receives fast shutdown request.")));
371 exit(0);
372 }
373 else if (sig == SIGQUIT)
374 {
375 ereport(DEBUG1,
376 (errmsg("PCP worker child receives immediate shutdown request.")));
377 exit(0);
378 }
379 else
380 exit(1);
381 }
382
383
384 static RETSIGTYPE
wakeup_handler_child(int sig)385 wakeup_handler_child(int sig)
386 {
387 pcp_worker_wakeup_request = 1;
388 }
389
390 /*
391 * unset non-block flag
392 */
393 static void
unset_nonblock(int fd)394 unset_nonblock(int fd)
395 {
396 int var;
397
398 /* set fd to non-blocking */
399 var = fcntl(fd, F_GETFL, 0);
400 if (var == -1)
401 {
402 ereport(FATAL,
403 (errmsg("unable to connect"),
404 errdetail("fcntl system call failed with error : \"%m\"")));
405
406 }
407 if (fcntl(fd, F_SETFL, var & ~O_NONBLOCK) == -1)
408 {
409 ereport(FATAL,
410 (errmsg("unable to connect"),
411 errdetail("fcntl system call failed with error : \"%m\"")));
412 }
413 }
414
415 /*
416 * see if received username and password matches with one in the file
417 */
418 static int
user_authenticate(char * buf,char * passwd_file,char * salt,int salt_len)419 user_authenticate(char *buf, char *passwd_file, char *salt, int salt_len)
420 {
421 FILE *fp = NULL;
422 char packet_username[MAX_USER_PASSWD_LEN + 1];
423 char packet_password[MAX_USER_PASSWD_LEN + 1];
424 char encrypt_buf[(MD5_PASSWD_LEN + 1) * 2];
425 char file_username[MAX_USER_PASSWD_LEN + 1];
426 char file_password[MAX_USER_PASSWD_LEN + 1];
427 char *index = NULL;
428 static char line[MAX_FILE_LINE_LEN + 1];
429 int i,
430 len;
431
432 /* strcpy() should be OK, but use strncpy() to be extra careful */
433 strncpy(packet_username, buf, MAX_USER_PASSWD_LEN);
434 index = (char *) memchr(buf, '\0', MAX_USER_PASSWD_LEN);
435 if (index == NULL)
436 {
437 ereport(FATAL,
438 (errmsg("failed to authenticate PCP user"),
439 errdetail("error while reading authentication packet")));
440 return 0;
441 }
442 strncpy(packet_password, ++index, MAX_USER_PASSWD_LEN);
443
444 fp = fopen(passwd_file, "r");
445 if (fp == NULL)
446 {
447 ereport(FATAL,
448 (errmsg("failed to authenticate PCP user"),
449 errdetail("could not open %s. reason: %m", passwd_file)));
450 return 0;
451 }
452
453 /* for now, I don't care if duplicate username exists in the config file */
454 while ((fgets(line, MAX_FILE_LINE_LEN, fp)) != NULL)
455 {
456 i = 0;
457 len = 0;
458
459 if (line[0] == '\n' || line[0] == '#')
460 continue;
461
462 while (line[i] != ':')
463 {
464 len++;
465 if (++i > MAX_USER_PASSWD_LEN)
466 {
467 fclose(fp);
468 ereport(FATAL,
469 (errmsg("failed to authenticate PCP user"),
470 errdetail("username read from file \"%s\" is larger than maximum allowed username length [%d]", passwd_file, MAX_USER_PASSWD_LEN)));
471 return 0;
472 }
473 }
474 memcpy(file_username, line, len);
475 file_username[len] = '\0';
476
477 if (strcmp(packet_username, file_username) != 0)
478 continue;
479
480 i++;
481 len = 0;
482 while (line[i] != '\n' && line[i] != '\0')
483 {
484 len++;
485 if (++i > MAX_USER_PASSWD_LEN)
486 {
487 fclose(fp);
488 ereport(FATAL,
489 (errmsg("failed to authenticate PCP user"),
490 errdetail("password read from file \"%s\" is larger than maximum allowed password length [%d]", passwd_file, MAX_USER_PASSWD_LEN)));
491 return 0;
492 }
493 }
494
495 memcpy(file_password, line + strlen(file_username) + 1, len);
496 file_password[len] = '\0';
497
498 pool_md5_encrypt(file_password, file_username, strlen(file_username),
499 encrypt_buf + MD5_PASSWD_LEN + 1);
500 encrypt_buf[(MD5_PASSWD_LEN + 1) * 2 - 1] = '\0';
501
502 pool_md5_encrypt(encrypt_buf + MD5_PASSWD_LEN + 1, salt, salt_len,
503 encrypt_buf);
504 encrypt_buf[MD5_PASSWD_LEN] = '\0';
505
506 if (strcmp(encrypt_buf, packet_password) == 0)
507 {
508 fclose(fp);
509 return 1;
510 }
511 }
512 fclose(fp);
513 ereport(FATAL,
514 (errmsg("authentication failed for user \"%s\"", packet_username),
515 errdetail("username and/or password does not match")));
516
517 return 0;
518 }
519
520
521 /* Detach a node */
522 static int
pool_detach_node(int node_id,bool gracefully)523 pool_detach_node(int node_id, bool gracefully)
524 {
525 if (!gracefully)
526 {
527 degenerate_backend_set_ex(&node_id, 1, REQ_DETAIL_SWITCHOVER | REQ_DETAIL_CONFIRMED, true, false);
528 return 0;
529 }
530
531 /*
532 * Check if the NODE DOWN can be executed on the given node id.
533 */
534 degenerate_backend_set_ex(&node_id, 1, REQ_DETAIL_SWITCHOVER | REQ_DETAIL_CONFIRMED, true, true);
535
536 /*
537 * Wait until all frontends exit
538 */
539 *InRecovery = RECOVERY_DETACH; /* This wiil ensure that new incoming
540 * connection requests are blocked */
541
542 if (wait_connection_closed())
543 {
544 /* wait timed out */
545 finish_recovery();
546 return -1;
547 }
548
549 pcp_worker_wakeup_request = 0;
550
551 /*
552 * Now all frontends have gone. Let's do failover.
553 */
554 degenerate_backend_set_ex(&node_id, 1, REQ_DETAIL_SWITCHOVER | REQ_DETAIL_CONFIRMED, false, false);
555
556 /*
557 * Wait for failover completed.
558 */
559
560 while (!pcp_worker_wakeup_request)
561 {
562 struct timeval t = {1, 0};
563
564 select(0, NULL, NULL, NULL, &t);
565 }
566 pcp_worker_wakeup_request = 0;
567
568 /*
569 * Start to accept incoming connections and send SIGUSR2 to pgpool parent
570 * to distribute SIGUSR2 all pgpool children.
571 */
572 finish_recovery();
573
574 return 0;
575 }
576
577 /* Promote a node */
578 static int
pool_promote_node(int node_id,bool gracefully)579 pool_promote_node(int node_id, bool gracefully)
580 {
581 if (!gracefully)
582 {
583 promote_backend(node_id, REQ_DETAIL_CONFIRMED); /* send promote request */
584 return 0;
585 }
586
587 /*
588 * Wait until all frontends exit
589 */
590 *InRecovery = RECOVERY_PROMOTE; /* This wiil ensure that new incoming
591 * connection requests are blocked */
592
593 if (wait_connection_closed())
594 {
595 /* wait timed out */
596 finish_recovery();
597 return -1;
598 }
599
600 /*
601 * Now all frontends have gone. Let's do failover.
602 */
603 promote_backend(node_id, REQ_DETAIL_CONFIRMED); /* send promote request */
604
605 /*
606 * Wait for failover completed.
607 */
608 pcp_worker_wakeup_request = 0;
609
610 while (!pcp_worker_wakeup_request)
611 {
612 struct timeval t = {1, 0};
613
614 select(0, NULL, NULL, NULL, &t);
615 }
616 pcp_worker_wakeup_request = 0;
617
618 /*
619 * Start to accept incoming connections and send SIGUSR2 to pgpool parent
620 * to distribute SIGUSR2 all pgpool children.
621 */
622 finish_recovery();
623 return 0;
624 }
625
626 static void
inform_process_count(PCP_CONNECTION * frontend)627 inform_process_count(PCP_CONNECTION * frontend)
628 {
629 int wsize;
630 int process_count;
631 char process_count_str[16];
632 int *process_list = NULL;
633 char code[] = "CommandComplete";
634 char *mesg = NULL;
635 int i;
636 int total_port_len = 0;
637
638 process_list = pool_get_process_list(&process_count);
639
640 mesg = (char *) palloc(7 * process_count); /* PID is at most 6 characters
641 * long */
642
643 snprintf(process_count_str, sizeof(process_count_str), "%d", process_count);
644
645 for (i = 0; i < process_count; i++)
646 {
647 char process_id[7];
648
649 snprintf(process_id, sizeof(process_id), "%d", process_list[i]);
650 snprintf(mesg + total_port_len, strlen(process_id) + 1, "%s", process_id);
651 total_port_len += strlen(process_id) + 1;
652 }
653
654 pcp_write(frontend, "n", 1);
655 wsize = htonl(sizeof(code) +
656 strlen(process_count_str) + 1 +
657 total_port_len +
658 sizeof(int));
659 pcp_write(frontend, &wsize, sizeof(int));
660 pcp_write(frontend, code, sizeof(code));
661 pcp_write(frontend, process_count_str, strlen(process_count_str) + 1);
662 pcp_write(frontend, mesg, total_port_len);
663 do_pcp_flush(frontend);
664
665 pfree(process_list);
666 pfree(mesg);
667
668 ereport(DEBUG1,
669 (errmsg("PCP: informing process count"),
670 errdetail("%d process(es) found", process_count)));
671 }
672
673 static void
inform_process_info(PCP_CONNECTION * frontend,char * buf)674 inform_process_info(PCP_CONNECTION * frontend, char *buf)
675 {
676 int proc_id;
677 int wsize;
678 int num_proc = pool_config->num_init_children;
679 int i;
680
681 proc_id = atoi(buf);
682
683 if ((proc_id != 0) && (pool_get_process_info(proc_id) == NULL))
684 {
685 ereport(ERROR,
686 (errmsg("informing process info failed"),
687 errdetail("invalid process ID : %s", buf)));
688 }
689 else
690 {
691 /* First, send array size of connection_info */
692 char arr_code[] = "ArraySize";
693 char con_info_size[16];
694
695 /* Finally, indicate that all data is sent */
696 char fin_code[] = "CommandComplete";
697
698 POOL_REPORT_POOLS *pools = get_pools(&num_proc);
699
700 if (proc_id == 0)
701 {
702 snprintf(con_info_size, sizeof(con_info_size), "%d", num_proc);
703 }
704 else
705 {
706 snprintf(con_info_size, sizeof(con_info_size), "%d", pool_config->max_pool * NUM_BACKENDS);
707 }
708
709 pcp_write(frontend, "p", 1);
710 wsize = htonl(sizeof(arr_code) +
711 strlen(con_info_size) + 1 +
712 sizeof(int));
713 pcp_write(frontend, &wsize, sizeof(int));
714 pcp_write(frontend, arr_code, sizeof(arr_code));
715 pcp_write(frontend, con_info_size, strlen(con_info_size) + 1);
716 do_pcp_flush(frontend);
717
718 /* Second, send process information for all connection_info */
719 for (i = 0; i < num_proc; i++)
720 {
721 char code[] = "ProcessInfo";
722 char proc_pid[16];
723 char proc_start_time[20];
724 char proc_create_time[20];
725 char majorversion[5];
726 char minorversion[5];
727 char pool_counter[16];
728 char backend_id[16];
729 char backend_pid[16];
730 char connected[2];
731
732 if (proc_id != 0 && proc_id != pools[i].pool_pid)
733 continue;
734
735 snprintf(proc_pid, sizeof(proc_pid), "%d", pools[i].pool_pid);
736 snprintf(proc_start_time, sizeof(proc_start_time), "%ld", pools[i].start_time);
737 snprintf(proc_create_time, sizeof(proc_create_time), "%ld", pools[i].create_time);
738 snprintf(majorversion, sizeof(majorversion), "%d", pools[i].pool_majorversion);
739 snprintf(minorversion, sizeof(minorversion), "%d", pools[i].pool_minorversion);
740 snprintf(pool_counter, sizeof(pool_counter), "%d", pools[i].pool_counter);
741 snprintf(backend_id, sizeof(backend_pid), "%d", pools[i].backend_id);
742 snprintf(backend_pid, sizeof(backend_pid), "%d", pools[i].pool_backendpid);
743 snprintf(connected, sizeof(connected), "%d", pools[i].pool_connected);
744
745 pcp_write(frontend, "p", 1);
746 wsize = htonl(sizeof(code) +
747 strlen(proc_pid) + 1 +
748 strlen(pools[i].database) + 1 +
749 strlen(pools[i].username) + 1 +
750 strlen(proc_start_time) + 1 +
751 strlen(proc_create_time) + 1 +
752 strlen(majorversion) + 1 +
753 strlen(minorversion) + 1 +
754 strlen(pool_counter) + 1 +
755 strlen(backend_id) + 1 +
756 strlen(backend_pid) + 1 +
757 strlen(connected) + 1 +
758 sizeof(int));
759 pcp_write(frontend, &wsize, sizeof(int));
760 pcp_write(frontend, code, sizeof(code));
761 pcp_write(frontend, proc_pid, strlen(proc_pid) + 1);
762 pcp_write(frontend, pools[i].database, strlen(pools[i].database) + 1);
763 pcp_write(frontend, pools[i].username, strlen(pools[i].username) + 1);
764 pcp_write(frontend, proc_start_time, strlen(proc_start_time) + 1);
765 pcp_write(frontend, proc_create_time, strlen(proc_create_time) + 1);
766 pcp_write(frontend, majorversion, strlen(majorversion) + 1);
767 pcp_write(frontend, minorversion, strlen(minorversion) + 1);
768 pcp_write(frontend, pool_counter, strlen(pool_counter) + 1);
769 pcp_write(frontend, backend_id, strlen(backend_id) + 1);
770 pcp_write(frontend, backend_pid, strlen(backend_pid) + 1);
771 pcp_write(frontend, connected, strlen(connected) + 1);
772 do_pcp_flush(frontend);
773 }
774
775 pcp_write(frontend, "p", 1);
776 wsize = htonl(sizeof(fin_code) +
777 sizeof(int));
778 pcp_write(frontend, &wsize, sizeof(int));
779 pcp_write(frontend, fin_code, sizeof(fin_code));
780 do_pcp_flush(frontend);
781 ereport(DEBUG1,
782 (errmsg("PCP informing process info"),
783 errdetail("retrieved process information from shared memory")));
784
785 pfree(pools);
786 }
787 }
788
789 static void
inform_watchdog_info(PCP_CONNECTION * frontend,char * buf)790 inform_watchdog_info(PCP_CONNECTION * frontend, char *buf)
791 {
792 int wd_index;
793 int json_data_len;
794 int wsize;
795 char code[] = "CommandComplete";
796 char *json_data;
797
798 if (!pool_config->use_watchdog)
799 ereport(ERROR,
800 (errmsg("PCP: informing watchdog info failed"),
801 errdetail("watcdhog is not enabled")));
802
803 wd_index = atoi(buf);
804
805 json_data = wd_internal_get_watchdog_nodes_json(wd_index);
806 if (json_data == NULL)
807 ereport(ERROR,
808 (errmsg("PCP: informing watchdog info failed"),
809 errdetail("invalid watchdog index")));
810
811 ereport(DEBUG2,
812 (errmsg("PCP: informing watchdog info"),
813 errdetail("retrieved node information from IPC socket")));
814
815 /*
816 * This is the voilation of PCP protocol but I think in future we should
817 * shift to more adaptable protocol for data transmition.
818 */
819 json_data_len = strlen(json_data);
820 wsize = htonl(sizeof(code) +
821 json_data_len + 1 +
822 sizeof(int));
823 pcp_write(frontend, "w", 1);
824
825 pcp_write(frontend, &wsize, sizeof(int));
826 pcp_write(frontend, code, sizeof(code));
827
828 pcp_write(frontend, json_data, json_data_len + 1);
829 do_pcp_flush(frontend);
830
831 pfree(json_data);
832 }
833
834 static void
inform_node_info(PCP_CONNECTION * frontend,char * buf)835 inform_node_info(PCP_CONNECTION * frontend, char *buf)
836 {
837 int node_id;
838 int wsize;
839 char port_str[6];
840 char status[2];
841 char weight_str[20];
842 char role_str[10];
843 char standby_delay_str[20];
844 char status_changed_time_str[20];
845 char code[] = "CommandComplete";
846 BackendInfo *bi = NULL;
847 SERVER_ROLE role;
848
849 node_id = atoi(buf);
850
851 bi = pool_get_node_info(node_id);
852
853 if (bi == NULL)
854 ereport(ERROR,
855 (errmsg("informing node info failed"),
856 errdetail("invalid node ID")));
857
858 ereport(DEBUG2,
859 (errmsg("PCP: informing node info"),
860 errdetail("retrieved node information from shared memory")));
861
862 snprintf(port_str, sizeof(port_str), "%d", bi->backend_port);
863 snprintf(status, sizeof(status), "%d", bi->backend_status);
864 snprintf(weight_str, sizeof(weight_str), "%f", bi->backend_weight);
865
866 if (STREAM)
867 {
868 if (Req_info->primary_node_id == node_id)
869 role = ROLE_PRIMARY;
870 else
871 role = ROLE_STANDBY;
872 }
873 else
874 {
875 if (Req_info->main_node_id == node_id)
876 role = ROLE_MAIN;
877 else
878 role = ROLE_REPLICA;
879 }
880 snprintf(role_str, sizeof(role_str), "%d", role);
881
882 snprintf(standby_delay_str, sizeof(standby_delay_str), UINT64_FORMAT, bi->standby_delay);
883
884 snprintf(status_changed_time_str, sizeof(status_changed_time_str), UINT64_FORMAT, bi->status_changed_time);
885
886 pcp_write(frontend, "i", 1);
887 wsize = htonl(sizeof(code) +
888 strlen(bi->backend_hostname) + 1 +
889 strlen(port_str) + 1 +
890 strlen(status) + 1 +
891 strlen(weight_str) + 1 +
892 strlen(role_str) + 1 +
893 strlen(standby_delay_str) + 1 +
894 strlen(bi->replication_state) + 1 +
895 strlen(bi->replication_sync_state) + 1 +
896 strlen(status_changed_time_str) + 1 +
897 sizeof(int));
898 pcp_write(frontend, &wsize, sizeof(int));
899 pcp_write(frontend, code, sizeof(code));
900 pcp_write(frontend, bi->backend_hostname, strlen(bi->backend_hostname) + 1);
901 pcp_write(frontend, port_str, strlen(port_str) + 1);
902 pcp_write(frontend, status, strlen(status) + 1);
903 pcp_write(frontend, weight_str, strlen(weight_str) + 1);
904 pcp_write(frontend, role_str, strlen(role_str) + 1);
905 pcp_write(frontend, standby_delay_str, strlen(standby_delay_str) + 1);
906 pcp_write(frontend, bi->replication_state, strlen(bi->replication_state) + 1);
907 pcp_write(frontend, bi->replication_sync_state, strlen(bi->replication_sync_state) + 1);
908 pcp_write(frontend, status_changed_time_str, strlen(status_changed_time_str) + 1);
909
910 do_pcp_flush(frontend);
911 }
912
913 /*
914 * Send out health check stats data to pcp client. node id is provided as a
915 * string in buf parameter.
916 *
917 * The protocol starts with 'h', followed by 4-byte packet length integer in
918 * network byte order including self. Each data is represented as a null
919 * terminted string. The order of each data is defined in
920 * POOL_HEALTH_CHECK_STATS struct.
921 */
922 static void
inform_health_check_stats(PCP_CONNECTION * frontend,char * buf)923 inform_health_check_stats(PCP_CONNECTION *frontend, char *buf)
924 {
925 POOL_HEALTH_CHECK_STATS *stats;
926 POOL_HEALTH_CHECK_STATS *s;
927 int *offsets;
928 int n;
929 int nrows;
930 int i;
931 int node_id;
932 bool node_id_ok = false;
933 int wsize;
934 char code[] = "CommandComplete";
935
936 node_id = atoi(buf);
937
938 if (node_id < 0 || node_id > NUM_BACKENDS)
939 {
940 ereport(ERROR,
941 (errmsg("informing health check stats info failed"),
942 errdetail("invalid node ID %d", node_id)));
943 }
944
945 stats = get_health_check_stats(&nrows);
946
947 for (i = 0; i < nrows; i++)
948 {
949 if (atoi(stats[i].node_id) == node_id)
950 {
951 node_id_ok = true;
952 s = &stats[i];
953 break;
954 }
955 }
956
957 if (!node_id_ok)
958 {
959 ereport(ERROR,
960 (errmsg("informing health check stats info failed"),
961 errdetail("stats data for node ID %d does not exist", node_id)));
962 }
963
964 pcp_write(frontend, "h", 1); /* indicate that this is a reply to health check stats request */
965
966 wsize = sizeof(code) + sizeof(int);
967
968 /* Calculate total packet length */
969 offsets = pool_health_check_stats_offsets(&n);
970
971 for (i = 0; i < n; i++)
972 {
973 wsize += strlen((char *)s + offsets[i]) + 1;
974 }
975 wsize = htonl(wsize); /* convert to network byte order */
976
977 /* send packet length to frontend */
978 pcp_write(frontend, &wsize, sizeof(int));
979 /* send "Command Complete" to frontend */
980 pcp_write(frontend, code, sizeof(code));
981
982 /* send each health check stats data to frontend */
983 for (i = 0; i < n; i++)
984 {
985 pcp_write(frontend, (char *)s + offsets[i], strlen((char *)s + offsets[i]) + 1);
986 }
987 pfree(stats);
988 do_pcp_flush(frontend);
989 }
990
991 static void
inform_node_count(PCP_CONNECTION * frontend)992 inform_node_count(PCP_CONNECTION * frontend)
993 {
994 int wsize;
995 char mesg[16];
996 char code[] = "CommandComplete";
997 int node_count = pool_get_node_count();
998
999 snprintf(mesg, sizeof(mesg), "%d", node_count);
1000
1001 pcp_write(frontend, "l", 1);
1002 wsize = htonl(sizeof(code) +
1003 strlen(mesg) + 1 +
1004 sizeof(int));
1005 pcp_write(frontend, &wsize, sizeof(int));
1006 pcp_write(frontend, code, sizeof(code));
1007 pcp_write(frontend, mesg, strlen(mesg) + 1);
1008 do_pcp_flush(frontend);
1009
1010 ereport(DEBUG1,
1011 (errmsg("PCP: informing node count"),
1012 errdetail("%d node(s) found", node_count)));
1013 }
1014 static void
process_reload_config(PCP_CONNECTION * frontend,char scope)1015 process_reload_config(PCP_CONNECTION * frontend, char scope)
1016 {
1017 char code[] = "CommandComplete";
1018 int wsize;
1019
1020 if (scope == 'c' && pool_config->use_watchdog)
1021 {
1022 ereport(LOG,
1023 (errmsg("PCP: sending command to watchdog to reload config cluster")));
1024
1025 if (wd_execute_cluster_command(WD_COMMAND_RELOAD_CONFIG_CLUSTER,0, NULL) != COMMAND_OK)
1026 ereport(ERROR,
1027 (errmsg("PCP: error while processing reload config request for cluster"),
1028 errdetail("failed to propogate reload config command through watchdog")));
1029 }
1030
1031 if(pool_signal_parent(SIGHUP) == -1)
1032 {
1033 ereport(ERROR,
1034 (errmsg("process reload config request failed"),
1035 errdetail("failed to signal pgpool parent process")));
1036 }
1037
1038 pcp_write(frontend, "z", 1);
1039 wsize = htonl(sizeof(code) + sizeof(int));
1040 pcp_write(frontend, &wsize, sizeof(int));
1041 pcp_write(frontend, code, sizeof(code));
1042 do_pcp_flush(frontend);
1043 }
1044
1045 static void
process_detach_node(PCP_CONNECTION * frontend,char * buf,char tos)1046 process_detach_node(PCP_CONNECTION * frontend, char *buf, char tos)
1047 {
1048 int node_id;
1049 int wsize;
1050 char code[] = "CommandComplete";
1051 bool gracefully;
1052
1053 if (tos == 'D')
1054 gracefully = false;
1055 else
1056 gracefully = true;
1057
1058 node_id = atoi(buf);
1059 ereport(DEBUG1,
1060 (errmsg("PCP: processing detach node"),
1061 errdetail("detaching Node ID %d", node_id)));
1062
1063 pool_detach_node(node_id, gracefully);
1064
1065 pcp_write(frontend, "d", 1);
1066 wsize = htonl(sizeof(code) + sizeof(int));
1067 pcp_write(frontend, &wsize, sizeof(int));
1068 pcp_write(frontend, code, sizeof(code));
1069 do_pcp_flush(frontend);
1070 }
1071
1072 static void
process_attach_node(PCP_CONNECTION * frontend,char * buf)1073 process_attach_node(PCP_CONNECTION * frontend, char *buf)
1074 {
1075 int node_id;
1076 int wsize;
1077 char code[] = "CommandComplete";
1078
1079 node_id = atoi(buf);
1080 ereport(DEBUG1,
1081 (errmsg("PCP: processing attach node"),
1082 errdetail("attaching Node ID %d", node_id)));
1083
1084 send_failback_request(node_id, true, REQ_DETAIL_CONFIRMED);
1085
1086 pcp_write(frontend, "c", 1);
1087 wsize = htonl(sizeof(code) + sizeof(int));
1088 pcp_write(frontend, &wsize, sizeof(int));
1089 pcp_write(frontend, code, sizeof(code));
1090 do_pcp_flush(frontend);
1091 }
1092
1093
1094 static void
process_recovery_request(PCP_CONNECTION * frontend,char * buf)1095 process_recovery_request(PCP_CONNECTION * frontend, char *buf)
1096 {
1097 int wsize;
1098 char code[] = "CommandComplete";
1099 int node_id = atoi(buf);
1100
1101 if ((node_id < 0) || (node_id >= pool_config->backend_desc->num_backends))
1102 ereport(ERROR,
1103 (errmsg("process recovery request failed"),
1104 errdetail("node id %d is not valid", node_id)));
1105
1106 if ((!REPLICATION &&
1107 !(STREAM)) ||
1108 (STREAM &&
1109 node_id == PRIMARY_NODE_ID))
1110 {
1111 if (STREAM)
1112 ereport(ERROR,
1113 (errmsg("process recovery request failed"),
1114 errdetail("primary server cannot be recovered by online recovery.")));
1115 else
1116 ereport(ERROR,
1117 (errmsg("process recovery request failed"),
1118 errdetail("recovery request is only allowed in replication and streaming replication modes.")));
1119 }
1120 else
1121 {
1122 if (pcp_mark_recovery_in_progress() == false)
1123 ereport(FATAL,
1124 (errmsg("process recovery request failed"),
1125 errdetail("pgpool-II is already processing another recovery request.")));
1126
1127 ereport(DEBUG1,
1128 (errmsg("PCP: processing recovery request"),
1129 errdetail("start online recovery")));
1130
1131 PG_TRY();
1132 {
1133 start_recovery(node_id);
1134 finish_recovery();
1135 pcp_write(frontend, "c", 1);
1136 wsize = htonl(sizeof(code) + sizeof(int));
1137 pcp_write(frontend, &wsize, sizeof(int));
1138 pcp_write(frontend, code, sizeof(code));
1139 do_pcp_flush(frontend);
1140 pcp_mark_recovery_finished();
1141 }
1142 PG_CATCH();
1143 {
1144 finish_recovery();
1145 pcp_mark_recovery_finished();
1146 PG_RE_THROW();
1147
1148 } PG_END_TRY();
1149 }
1150 do_pcp_flush(frontend);
1151 }
1152
1153 static void
process_status_request(PCP_CONNECTION * frontend)1154 process_status_request(PCP_CONNECTION * frontend)
1155 {
1156 int nrows = 0;
1157 int i;
1158 POOL_REPORT_CONFIG *status = get_config(&nrows);
1159 int len = 0;
1160
1161 /* First, send array size of connection_info */
1162 char arr_code[] = "ArraySize";
1163 char code[] = "ProcessConfig";
1164
1165 /* Finally, indicate that all data is sent */
1166 char fin_code[] = "CommandComplete";
1167
1168 pcp_write(frontend, "b", 1);
1169 len = htonl(sizeof(arr_code) + sizeof(int) + sizeof(int));
1170 pcp_write(frontend, &len, sizeof(int));
1171 pcp_write(frontend, arr_code, sizeof(arr_code));
1172 len = htonl(nrows);
1173 pcp_write(frontend, &len, sizeof(int));
1174
1175 do_pcp_flush(frontend);
1176
1177 for (i = 0; i < nrows; i++)
1178 {
1179 pcp_write(frontend, "b", 1);
1180 len = htonl(sizeof(int)
1181 + sizeof(code)
1182 + strlen(status[i].name) + 1
1183 + strlen(status[i].value) + 1
1184 + strlen(status[i].desc) + 1
1185 );
1186
1187 pcp_write(frontend, &len, sizeof(int));
1188 pcp_write(frontend, code, sizeof(code));
1189 pcp_write(frontend, status[i].name, strlen(status[i].name) + 1);
1190 pcp_write(frontend, status[i].value, strlen(status[i].value) + 1);
1191 pcp_write(frontend, status[i].desc, strlen(status[i].desc) + 1);
1192 }
1193
1194 pcp_write(frontend, "b", 1);
1195 len = htonl(sizeof(fin_code) + sizeof(int));
1196 pcp_write(frontend, &len, sizeof(int));
1197 pcp_write(frontend, fin_code, sizeof(fin_code));
1198 do_pcp_flush(frontend);
1199
1200 pfree(status);
1201 ereport(DEBUG1,
1202 (errmsg("PCP: processing status request"),
1203 errdetail("retrieved status information")));
1204 }
1205
1206 static void
process_promote_node(PCP_CONNECTION * frontend,char * buf,char tos)1207 process_promote_node(PCP_CONNECTION * frontend, char *buf, char tos)
1208 {
1209 int node_id;
1210 int wsize;
1211 char code[] = "CommandComplete";
1212 bool gracefully;
1213
1214 if (tos == 'J')
1215 gracefully = false;
1216 else
1217 gracefully = true;
1218
1219 node_id = atoi(buf);
1220 if ((node_id < 0) || (node_id >= pool_config->backend_desc->num_backends))
1221 ereport(ERROR,
1222 (errmsg("could not process recovery request"),
1223 errdetail("node id %d is not valid", node_id)));
1224 /* promoting node is reserved to Streaming Replication */
1225 if (!STREAM)
1226 {
1227 ereport(FATAL,
1228 (errmsg("invalid pgpool mode for process recovery request"),
1229 errdetail("not in streaming replication mode, can't promote node id %d", node_id)));
1230
1231 }
1232
1233 if (node_id == REAL_PRIMARY_NODE_ID)
1234 {
1235 ereport(FATAL,
1236 (errmsg("invalid pgpool mode for process recovery request"),
1237 errdetail("specified node is already primary node, can't promote node id %d", node_id)));
1238
1239 }
1240 ereport(DEBUG1,
1241 (errmsg("PCP: processing promote node"),
1242 errdetail("promoting Node ID %d", node_id)));
1243 pool_promote_node(node_id, gracefully);
1244
1245 pcp_write(frontend, "d", 1);
1246 wsize = htonl(sizeof(code) + sizeof(int));
1247 pcp_write(frontend, &wsize, sizeof(int));
1248 pcp_write(frontend, code, sizeof(code));
1249 do_pcp_flush(frontend);
1250 }
1251
1252 static void
process_authentication(PCP_CONNECTION * frontend,char * buf,char * salt,int * random_salt)1253 process_authentication(PCP_CONNECTION * frontend, char *buf, char *salt, int *random_salt)
1254 {
1255 int wsize;
1256 int authenticated;
1257
1258 if (*random_salt)
1259 {
1260 authenticated = user_authenticate(buf, pcp_conf_file, salt, 4);
1261 }
1262 if (!*random_salt || !authenticated)
1263 {
1264 ereport(FATAL,
1265 (errmsg("authentication failed"),
1266 errdetail("username and/or password does not match")));
1267
1268 *random_salt = 0;
1269 }
1270 else
1271 {
1272 char code[] = "AuthenticationOK";
1273
1274 pcp_write(frontend, "r", 1);
1275 wsize = htonl(sizeof(code) + sizeof(int));
1276 pcp_write(frontend, &wsize, sizeof(int));
1277 pcp_write(frontend, code, sizeof(code));
1278 do_pcp_flush(frontend);
1279 *random_salt = 0;
1280
1281 ereport(DEBUG1,
1282 (errmsg("PCP: processing authentication request"),
1283 errdetail("authentication OK")));
1284 }
1285 }
1286
1287 static void
send_md5salt(PCP_CONNECTION * frontend,char * salt)1288 send_md5salt(PCP_CONNECTION * frontend, char *salt)
1289 {
1290 int wsize;
1291
1292 ereport(DEBUG1,
1293 (errmsg("PCP: sending md5 salt to client")));
1294
1295 pool_random_salt(salt);
1296
1297 pcp_write(frontend, "m", 1);
1298 wsize = htonl(sizeof(int) + 4);
1299 pcp_write(frontend, &wsize, sizeof(int));
1300 pcp_write(frontend, salt, 4);
1301 do_pcp_flush(frontend);
1302 }
1303
1304 static void
process_shutown_request(PCP_CONNECTION * frontend,char mode,char tos)1305 process_shutown_request(PCP_CONNECTION * frontend, char mode, char tos)
1306 {
1307 char code[] = "CommandComplete";
1308 int len;
1309
1310 ereport(DEBUG1,
1311 (errmsg("PCP: processing shutdown request"),
1312 errdetail("shutdown mode \"%c\"", mode)));
1313
1314 /* quickly bail out if invalid mode is specified
1315 * because we do not want to propogate the command
1316 * with invalid mode over the watchdog network */
1317 if (mode != 's' && mode != 'i' && mode != 'f' )
1318 {
1319 ereport(ERROR,
1320 (errmsg("PCP: error while processing shutdown request"),
1321 errdetail("invalid shutdown mode \"%c\"", mode)));
1322 }
1323
1324 if (tos == 't' && pool_config->use_watchdog)
1325 {
1326 WDExecCommandArg wdExecCommandArg;
1327
1328 strncpy(wdExecCommandArg.arg_name, "mode", sizeof(wdExecCommandArg.arg_name) - 1);
1329 snprintf(wdExecCommandArg.arg_value, sizeof(wdExecCommandArg.arg_name) - 1, "%c",mode);
1330
1331 ereport(LOG,
1332 (errmsg("PCP: sending command to watchdog to shutdown cluster")));
1333
1334 if (wd_execute_cluster_command(WD_COMMAND_SHUTDOWN_CLUSTER,1, &wdExecCommandArg) != COMMAND_OK)
1335 ereport(ERROR,
1336 (errmsg("PCP: error while processing shutdown cluster request"),
1337 errdetail("failed to propogate shutdown command through watchdog")));
1338 }
1339
1340 pcp_write(frontend, "t", 1);
1341 len = htonl(sizeof(code) + sizeof(int));
1342 pcp_write(frontend, &len, sizeof(int));
1343 pcp_write(frontend, code, sizeof(code));
1344 do_pcp_flush(frontend);
1345
1346 terminate_pgpool(mode, true);
1347 }
1348
1349 static void
process_set_configration_parameter(PCP_CONNECTION * frontend,char * buf,int len)1350 process_set_configration_parameter(PCP_CONNECTION * frontend, char *buf, int len)
1351 {
1352 char *param_name;
1353 char *param_value;
1354 int wsize;
1355 char code[] = "CommandComplete";
1356
1357 param_name = buf;
1358 if (param_name == NULL)
1359 ereport(ERROR,
1360 (errmsg("PCP: set configuration parameter failed"),
1361 errdetail("invalid pcp packet received from client")));
1362
1363 param_value = (char *) memchr(buf, '\0', len);
1364 if (param_value == NULL)
1365 ereport(ERROR,
1366 (errmsg("set configuration parameter failed"),
1367 errdetail("invalid pcp packet received from client")));
1368
1369 param_value += 1;
1370 ereport(LOG,
1371 (errmsg("set configuration parameter, \"%s TO %s\"", param_name, param_value)));
1372
1373 if (strcasecmp(param_name, "client_min_messages") == 0)
1374 {
1375 const char *ordered_valid_values[] = {"debug5", "debug4", "debug3", "debug2", "debug1", "log", "commerror", "info", "notice", "warning", "error", NULL};
1376 bool found = false;
1377 int i;
1378
1379 for (i = 0;; i++)
1380 {
1381 char *valid_val = (char *) ordered_valid_values[i];
1382
1383 if (!valid_val)
1384 break;
1385
1386 if (!strcasecmp(param_value, valid_val))
1387 {
1388 found = true;
1389 pool_config->client_min_messages = i + 10;
1390 ereport(DEBUG1,
1391 (errmsg("PCP setting parameter \"%s\" to \"%s\"", param_name, param_value)));
1392 break;
1393 }
1394 }
1395 if (!found)
1396 ereport(ERROR,
1397 (errmsg("PCP: set configuration parameter failed"),
1398 errdetail("invalid value \"%s\" for parameter \"%s\"", param_value, param_name)));
1399 }
1400 else
1401 ereport(ERROR,
1402 (errmsg("PCP: set configuration parameter failed"),
1403 errdetail("invalid parameter \"%s\"", param_name)));
1404
1405 pcp_write(frontend, "a", 1);
1406 wsize = htonl(sizeof(code) + sizeof(int));
1407 pcp_write(frontend, &wsize, sizeof(int));
1408 pcp_write(frontend, code, sizeof(code));
1409 do_pcp_flush(frontend);
1410 }
1411
1412 /*
1413 * Wrapper around pcp_flush which throws FATAL error when pcp_flush fails
1414 */
1415 static void
do_pcp_flush(PCP_CONNECTION * frontend)1416 do_pcp_flush(PCP_CONNECTION * frontend)
1417 {
1418 if (pcp_flush(frontend) < 0)
1419 ereport(FATAL,
1420 (errmsg("failed to flush data to client"),
1421 errdetail("pcp_flush failed with error : \"%m\"")));
1422 }
1423
1424 /*
1425 * Wrapper around pcp_read which throws FATAL error when read fails
1426 */
1427 static void
do_pcp_read(PCP_CONNECTION * pc,void * buf,int len)1428 do_pcp_read(PCP_CONNECTION * pc, void *buf, int len)
1429 {
1430 if (pcp_read(pc, buf, len))
1431 ereport(FATAL,
1432 (errmsg("unable to read from client"),
1433 errdetail("pcp_read failed with error : \"%m\"")));
1434 }
1435
1436 int
send_to_pcp_frontend(char * data,int len,bool flush)1437 send_to_pcp_frontend(char *data, int len, bool flush)
1438 {
1439 int ret;
1440
1441 if (processType != PT_PCP_WORKER || pcp_frontend == NULL)
1442 return -1;
1443 ret = pcp_write(pcp_frontend, data, len);
1444 if (flush && !ret)
1445 ret = pcp_flush(pcp_frontend);
1446 return ret;
1447 }
1448
1449 int
pcp_frontend_exists(void)1450 pcp_frontend_exists(void)
1451 {
1452 if (processType != PT_PCP_WORKER || pcp_frontend == NULL)
1453 return -1;
1454 return 0;
1455 }
1456
1457 static void
pcp_worker_will_go_down(int code,Datum arg)1458 pcp_worker_will_go_down(int code, Datum arg)
1459 {
1460 if (processType != PT_PCP_WORKER)
1461 {
1462 /* should never happen */
1463 ereport(WARNING,
1464 (errmsg("pcp_worker_will_go_down called from invalid process")));
1465 return;
1466 }
1467 if (pcp_frontend)
1468 pcp_close(pcp_frontend);
1469 processState = EXITING;
1470 POOL_SETMASK(&UnBlockSig);
1471
1472 }
1473