1 /* -*-pcp_worker.c-*- */
2 /*
3 *
4 * pgpool: a language independent connection pool server for PostgreSQL
5 * written by Tatsuo Ishii
6 *
7 * Copyright (c) 2003-2019 PgPool Global Development Group
8 *
9 * Permission to use, copy, modify, and distribute this software and
10 * its documentation for any purpose and without fee is hereby
11 * granted, provided that the above copyright notice appear in all
12 * copies and that both that copyright notice and this permission
13 * notice appear in supporting documentation, and that the name of the
14 * author not be used in advertising or publicity pertaining to
15 * distribution of the software without specific, written prior
16 * permission. The author makes no representations about the
17 * suitability of this software for any purpose. It is provided "as
18 * is" without express or implied warranty.
19 *
20 * pcp_worker.c: PCP worker child process main
21 *
22 */
23
24 #include "config.h"
25 #include "pool.h"
26 #include "utils/palloc.h"
27 #include "utils/memutils.h"
28
29 #include <arpa/inet.h>
30 #include <signal.h>
31
32 #include <stdio.h>
33 #include <errno.h>
34 #include <string.h>
35 #include <unistd.h>
36 #include <stdlib.h>
37 #include <sys/time.h>
38
39 #ifdef HAVE_FCNTL_H
40 #include <fcntl.h>
41 #endif
42
43 #include "pcp/pcp_stream.h"
44 #include "pcp/pcp.h"
45 #include "auth/md5.h"
46 #include "pool_config.h"
47 #include "context/pool_process_context.h"
48 #include "utils/pool_process_reporting.h"
49 #include "watchdog/wd_json_data.h"
50 #include "watchdog/wd_ipc_commands.h"
51 #include "utils/elog.h"
52
53 #define MAX_FILE_LINE_LEN 512
54
55 extern char *pcp_conf_file; /* global variable defined in main.c holds the
56 * path for pcp.conf */
57 volatile sig_atomic_t pcp_worker_wakeup_request = 0;
58 PCP_CONNECTION *volatile pcp_frontend = NULL;
59
60 static RETSIGTYPE die(int sig);
61 static RETSIGTYPE wakeup_handler_child(int sig);
62
63 static void unset_nonblock(int fd);
64 static int user_authenticate(char *buf, char *passwd_file, char *salt, int salt_len);
65 static void process_authentication(PCP_CONNECTION * frontend, char *buf, char *salt, int *random_salt);
66 static void send_md5salt(PCP_CONNECTION * frontend, char *salt);
67
68 static void pcp_process_command(char tos, char *buf, int buf_len);
69
70 static int pool_detach_node(int node_id, bool gracefully);
71 static int pool_promote_node(int node_id, bool gracefully);
72 static void inform_process_count(PCP_CONNECTION * frontend);
73 static void inform_process_info(PCP_CONNECTION * frontend, char *buf);
74 static void inform_watchdog_info(PCP_CONNECTION * frontend, char *buf);
75 static void inform_node_info(PCP_CONNECTION * frontend, char *buf);
76 static void inform_node_count(PCP_CONNECTION * frontend);
77 static void process_detach_node(PCP_CONNECTION * frontend, char *buf, char tos);
78 static void process_attach_node(PCP_CONNECTION * frontend, char *buf);
79 static void process_recovery_request(PCP_CONNECTION * frontend, char *buf);
80 static void process_status_request(PCP_CONNECTION * frontend);
81 static void process_promote_node(PCP_CONNECTION * frontend, char *buf, char tos);
82 static void process_shutown_request(PCP_CONNECTION * frontend, char mode);
83 static void process_set_configration_parameter(PCP_CONNECTION * frontend, char *buf, int len);
84
85 static void pcp_worker_will_go_down(int code, Datum arg);
86
87 static void do_pcp_flush(PCP_CONNECTION * frontend);
88 static void do_pcp_read(PCP_CONNECTION * pc, void *buf, int len);
89
90 /*
91 * main entry pont of pcp worker child process
92 */
93 void
pcp_worker_main(int port)94 pcp_worker_main(int port)
95 {
96 sigjmp_buf local_sigjmp_buf;
97 MemoryContext PCPMemoryContext;
98 volatile int authenticated = 0;
99
100 char salt[4];
101 int random_salt = 0;
102 struct timeval uptime;
103 char tos;
104 int rsize;
105
106 ereport(DEBUG1,
107 (errmsg("I am PCP worker child with pid:%d", getpid())));
108
109 /* Identify myself via ps */
110 init_ps_display("", "", "", "");
111
112 gettimeofday(&uptime, NULL);
113 srandom((unsigned int) (getpid() ^ uptime.tv_usec));
114
115 /* set up signal handlers */
116 signal(SIGTERM, die);
117 signal(SIGINT, die);
118 signal(SIGQUIT, die);
119 signal(SIGCHLD, SIG_DFL);
120 signal(SIGUSR2, wakeup_handler_child);
121 signal(SIGUSR1, SIG_IGN);
122 signal(SIGHUP, SIG_IGN);
123 signal(SIGPIPE, SIG_IGN);
124 signal(SIGALRM, SIG_IGN);
125 /* Create per loop iteration memory context */
126 PCPMemoryContext = AllocSetContextCreate(TopMemoryContext,
127 "PCP_worker_main_loop",
128 ALLOCSET_DEFAULT_MINSIZE,
129 ALLOCSET_DEFAULT_INITSIZE,
130 ALLOCSET_DEFAULT_MAXSIZE);
131
132 MemoryContextSwitchTo(TopMemoryContext);
133
134 /*
135 * install the call back for preparation of pcp worker child exit
136 */
137 on_system_exit(pcp_worker_will_go_down, (Datum) NULL);
138
139 /* Initialize my backend status */
140 pool_initialize_private_backend_status();
141
142 /* Initialize process context */
143 pool_init_process_context();
144
145 pcp_frontend = pcp_open(port);
146 unset_nonblock(pcp_frontend->fd);
147
148 if (sigsetjmp(local_sigjmp_buf, 1) != 0)
149 {
150 error_context_stack = NULL;
151 EmitErrorReport();
152
153 MemoryContextSwitchTo(TopMemoryContext);
154 FlushErrorState();
155 }
156 /* We can now handle ereport(ERROR) */
157 PG_exception_stack = &local_sigjmp_buf;
158
159 for (;;)
160 {
161 char *buf = NULL;
162
163 MemoryContextSwitchTo(PCPMemoryContext);
164 MemoryContextResetAndDeleteChildren(PCPMemoryContext);
165
166 errno = 0;
167
168 /* read a PCP packet */
169 do_pcp_read(pcp_frontend, &tos, 1);
170 do_pcp_read(pcp_frontend, &rsize, sizeof(int));
171
172 rsize = ntohl(rsize);
173
174 if (rsize <= 0 || rsize >= MAX_PCP_PACKET_LENGTH)
175 ereport(FATAL,
176 (errmsg("invalid PCP packet"),
177 errdetail("incorrect packet length (%d)", rsize)));
178
179 if ((rsize - sizeof(int)) > 0)
180 {
181 buf = (char *) palloc(rsize - sizeof(int));
182 do_pcp_read(pcp_frontend, buf, rsize - sizeof(int));
183 }
184
185 ereport(DEBUG1,
186 (errmsg("received PCP packet"),
187 errdetail("PCP packet type of service '%c'", tos)));
188
189 if (tos == 'R') /* authentication */
190 {
191 set_ps_display("PCP: processing authentication", false);
192 process_authentication(pcp_frontend, buf, salt, &random_salt);
193 authenticated = 1;
194 continue;
195 }
196 if (tos == 'M') /* md5 salt */
197 {
198 set_ps_display("PCP: processing authentication", false);
199 send_md5salt(pcp_frontend, salt);
200 random_salt = 1;
201 continue;
202 }
203 /* is this connection authenticated? if not disconnect immediately */
204 if (!authenticated)
205 ereport(FATAL,
206 (errmsg("authentication failed for new PCP connection"),
207 errdetail("connection not authorized")));
208
209 /* process a request */
210 pcp_process_command(tos, buf, rsize);
211 }
212 exit(0);
213 }
214
215
216 /* pcp command processor */
217 static void
pcp_process_command(char tos,char * buf,int buf_len)218 pcp_process_command(char tos, char *buf, int buf_len)
219 {
220
221 if (tos == 'O' || tos == 'T')
222 {
223 if (Req_info->switching)
224 {
225 if (Req_info->request_queue_tail != Req_info->request_queue_head)
226 {
227 POOL_REQUEST_KIND reqkind;
228
229 reqkind = Req_info->request[(Req_info->request_queue_head + 1) % MAX_REQUEST_QUEUE_SIZE].kind;
230
231 if (reqkind == NODE_UP_REQUEST)
232 ereport(ERROR,
233 (errmsg("failed to process PCP request at the moment"),
234 errdetail("failback is in progress")));
235 else if (reqkind == NODE_DOWN_REQUEST)
236 ereport(ERROR,
237 (errmsg("failed to process PCP request at the moment"),
238 errdetail("failover is in progress")));
239 else if (reqkind == PROMOTE_NODE_REQUEST)
240 ereport(ERROR,
241 (errmsg("failed to process PCP request at the moment"),
242 errdetail("promote node operation is in progress")));
243
244 ereport(ERROR,
245 (errmsg("failed to process PCP request at the moment"),
246 errdetail("operation is in progress")));
247 }
248 }
249 }
250
251 switch (tos)
252 {
253 case 'A': /* set configuration parameter */
254 set_ps_display("PCP: processing set configration parameter request", false);
255 process_set_configration_parameter(pcp_frontend, buf, buf_len);
256 break;
257
258 case 'L': /* node count */
259 set_ps_display("PCP: processing node count request", false);
260 inform_node_count(pcp_frontend);
261 break;
262
263 case 'I': /* node info */
264 set_ps_display("PCP: processing node info request", false);
265 inform_node_info(pcp_frontend, buf);
266 break;
267
268 case 'N': /* process count */
269 set_ps_display("PCP: processing process count request", false);
270 inform_process_count(pcp_frontend);
271 break;
272
273 case 'P': /* process info */
274 set_ps_display("PCP: processing process info request", false);
275 inform_process_info(pcp_frontend, buf);
276 break;
277
278 case 'W': /* watchdog info */
279 set_ps_display("PCP: processing watchdog info request", false);
280 inform_watchdog_info(pcp_frontend, buf);
281 break;
282
283 case 'D': /* detach node */
284 case 'd': /* detach node gracefully */
285 set_ps_display("PCP: processing detach node request", false);
286 process_detach_node(pcp_frontend, buf, tos);
287 break;
288
289 case 'C': /* attach node */
290 set_ps_display("PCP: processing attach node request", false);
291 process_attach_node(pcp_frontend, buf);
292 break;
293
294 case 'T':
295 set_ps_display("PCP: processing shutdown request", false);
296 process_shutown_request(pcp_frontend, buf[0]);
297 break;
298
299 case 'O': /* recovery request */
300 set_ps_display("PCP: processing recovery request", false);
301 process_recovery_request(pcp_frontend, buf);
302 break;
303
304 case 'B': /* status request */
305 set_ps_display("PCP: processing status request request", false);
306 process_status_request(pcp_frontend);
307 break;
308
309 case 'J': /* promote node */
310 case 'j': /* promote node gracefully */
311 set_ps_display("PCP: processing promote node request", false);
312 process_promote_node(pcp_frontend, buf, tos);
313 break;
314
315 case 'F':
316 ereport(DEBUG1,
317 (errmsg("PCP processing request, stop online recovery")));
318 break;
319
320 case 'X': /* disconnect */
321 ereport(DEBUG1,
322 (errmsg("PCP processing request, client disconnecting"),
323 errdetail("closing PCP connection, and exiting child")));
324 pcp_close(pcp_frontend);
325 pcp_frontend = NULL;
326 /* This child has done its part. Rest in peace now */
327 exit(0);
328 break;
329
330 default:
331 ereport(FATAL,
332 (errmsg("PCP processing request"),
333 errdetail("unknown PCP packet type \"%c\"", tos)));
334 }
335 }
336
337 static RETSIGTYPE
die(int sig)338 die(int sig)
339 {
340 ereport(DEBUG1,
341 (errmsg("PCP worker child receives shutdown request signal %d", sig)));
342 if (sig == SIGTERM)
343 {
344 ereport(DEBUG1,
345 (errmsg("PCP worker child receives smart shutdown request."),
346 errdetail("waiting for the child to die its natural death")));
347 }
348 else if (sig == SIGINT)
349 {
350 ereport(DEBUG1,
351 (errmsg("PCP worker child receives fast shutdown request.")));
352 exit(0);
353 }
354 else if (sig == SIGQUIT)
355 {
356 ereport(DEBUG1,
357 (errmsg("PCP worker child receives immediate shutdown request.")));
358 exit(0);
359 }
360 else
361 exit(1);
362 }
363
364
365 static RETSIGTYPE
wakeup_handler_child(int sig)366 wakeup_handler_child(int sig)
367 {
368 pcp_worker_wakeup_request = 1;
369 }
370
371 /*
372 * unset non-block flag
373 */
374 static void
unset_nonblock(int fd)375 unset_nonblock(int fd)
376 {
377 int var;
378
379 /* set fd to non-blocking */
380 var = fcntl(fd, F_GETFL, 0);
381 if (var == -1)
382 {
383 ereport(FATAL,
384 (errmsg("unable to connect"),
385 errdetail("fcntl system call failed with error : \"%m\"")));
386
387 }
388 if (fcntl(fd, F_SETFL, var & ~O_NONBLOCK) == -1)
389 {
390 ereport(FATAL,
391 (errmsg("unable to connect"),
392 errdetail("fcntl system call failed with error : \"%m\"")));
393 }
394 }
395
396 /*
397 * see if received username and password matches with one in the file
398 */
399 static int
user_authenticate(char * buf,char * passwd_file,char * salt,int salt_len)400 user_authenticate(char *buf, char *passwd_file, char *salt, int salt_len)
401 {
402 FILE *fp = NULL;
403 char packet_username[MAX_USER_PASSWD_LEN + 1];
404 char packet_password[MAX_USER_PASSWD_LEN + 1];
405 char encrypt_buf[(MD5_PASSWD_LEN + 1) * 2];
406 char file_username[MAX_USER_PASSWD_LEN + 1];
407 char file_password[MAX_USER_PASSWD_LEN + 1];
408 char *index = NULL;
409 static char line[MAX_FILE_LINE_LEN + 1];
410 int i,
411 len;
412
413 /* strcpy() should be OK, but use strncpy() to be extra careful */
414 strncpy(packet_username, buf, MAX_USER_PASSWD_LEN);
415 index = (char *) memchr(buf, '\0', MAX_USER_PASSWD_LEN);
416 if (index == NULL)
417 {
418 ereport(FATAL,
419 (errmsg("failed to authenticate PCP user"),
420 errdetail("error while reading authentication packet")));
421 return 0;
422 }
423 strncpy(packet_password, ++index, MAX_USER_PASSWD_LEN);
424
425 fp = fopen(passwd_file, "r");
426 if (fp == NULL)
427 {
428 ereport(FATAL,
429 (errmsg("failed to authenticate PCP user"),
430 errdetail("could not open %s. reason: %m", passwd_file)));
431 return 0;
432 }
433
434 /* for now, I don't care if duplicate username exists in the config file */
435 while ((fgets(line, MAX_FILE_LINE_LEN, fp)) != NULL)
436 {
437 i = 0;
438 len = 0;
439
440 if (line[0] == '\n' || line[0] == '#')
441 continue;
442
443 while (line[i] != ':')
444 {
445 len++;
446 if (++i > MAX_USER_PASSWD_LEN)
447 {
448 fclose(fp);
449 ereport(FATAL,
450 (errmsg("failed to authenticate PCP user"),
451 errdetail("username read from file \"%s\" is larger than maximum allowed username length [%d]", passwd_file, MAX_USER_PASSWD_LEN)));
452 return 0;
453 }
454 }
455 memcpy(file_username, line, len);
456 file_username[len] = '\0';
457
458 if (strcmp(packet_username, file_username) != 0)
459 continue;
460
461 i++;
462 len = 0;
463 while (line[i] != '\n' && line[i] != '\0')
464 {
465 len++;
466 if (++i > MAX_USER_PASSWD_LEN)
467 {
468 fclose(fp);
469 ereport(FATAL,
470 (errmsg("failed to authenticate PCP user"),
471 errdetail("password read from file \"%s\" is larger than maximum allowed password length [%d]", passwd_file, MAX_USER_PASSWD_LEN)));
472 return 0;
473 }
474 }
475
476 memcpy(file_password, line + strlen(file_username) + 1, len);
477 file_password[len] = '\0';
478
479 pool_md5_encrypt(file_password, file_username, strlen(file_username),
480 encrypt_buf + MD5_PASSWD_LEN + 1);
481 encrypt_buf[(MD5_PASSWD_LEN + 1) * 2 - 1] = '\0';
482
483 pool_md5_encrypt(encrypt_buf + MD5_PASSWD_LEN + 1, salt, salt_len,
484 encrypt_buf);
485 encrypt_buf[MD5_PASSWD_LEN] = '\0';
486
487 if (strcmp(encrypt_buf, packet_password) == 0)
488 {
489 fclose(fp);
490 return 1;
491 }
492 }
493 fclose(fp);
494 ereport(FATAL,
495 (errmsg("authentication failed for user \"%s\"", packet_username),
496 errdetail("username and/or password does not match")));
497
498 return 0;
499 }
500
501
502 /* Detach a node */
503 static int
pool_detach_node(int node_id,bool gracefully)504 pool_detach_node(int node_id, bool gracefully)
505 {
506 if (!gracefully)
507 {
508 degenerate_backend_set_ex(&node_id, 1, REQ_DETAIL_SWITCHOVER | REQ_DETAIL_CONFIRMED, true, false);
509 return 0;
510 }
511
512 /*
513 * Check if the NODE DOWN can be executed on the given node id.
514 */
515 degenerate_backend_set_ex(&node_id, 1, REQ_DETAIL_SWITCHOVER | REQ_DETAIL_CONFIRMED, true, true);
516
517 /*
518 * Wait until all frontends exit
519 */
520 *InRecovery = RECOVERY_DETACH; /* This wiil ensure that new incoming
521 * connection requests are blocked */
522
523 if (wait_connection_closed())
524 {
525 /* wait timed out */
526 finish_recovery();
527 return -1;
528 }
529
530 pcp_worker_wakeup_request = 0;
531
532 /*
533 * Now all frontends have gone. Let's do failover.
534 */
535 degenerate_backend_set_ex(&node_id, 1, REQ_DETAIL_SWITCHOVER | REQ_DETAIL_CONFIRMED, false, false);
536
537 /*
538 * Wait for failover completed.
539 */
540
541 while (!pcp_worker_wakeup_request)
542 {
543 struct timeval t = {1, 0};
544
545 select(0, NULL, NULL, NULL, &t);
546 }
547 pcp_worker_wakeup_request = 0;
548
549 /*
550 * Start to accept incoming connections and send SIGUSR2 to pgpool parent
551 * to distribute SIGUSR2 all pgpool children.
552 */
553 finish_recovery();
554
555 return 0;
556 }
557
558 /* Promote a node */
559 static int
pool_promote_node(int node_id,bool gracefully)560 pool_promote_node(int node_id, bool gracefully)
561 {
562 if (!gracefully)
563 {
564 promote_backend(node_id, REQ_DETAIL_CONFIRMED); /* send promote request */
565 return 0;
566 }
567
568 /*
569 * Wait until all frontends exit
570 */
571 *InRecovery = RECOVERY_PROMOTE; /* This wiil ensure that new incoming
572 * connection requests are blocked */
573
574 if (wait_connection_closed())
575 {
576 /* wait timed out */
577 finish_recovery();
578 return -1;
579 }
580
581 /*
582 * Now all frontends have gone. Let's do failover.
583 */
584 promote_backend(node_id, REQ_DETAIL_CONFIRMED); /* send promote request */
585
586 /*
587 * Wait for failover completed.
588 */
589 pcp_worker_wakeup_request = 0;
590
591 while (!pcp_worker_wakeup_request)
592 {
593 struct timeval t = {1, 0};
594
595 select(0, NULL, NULL, NULL, &t);
596 }
597 pcp_worker_wakeup_request = 0;
598
599 /*
600 * Start to accept incoming connections and send SIGUSR2 to pgpool parent
601 * to distribute SIGUSR2 all pgpool children.
602 */
603 finish_recovery();
604 return 0;
605 }
606
607 static void
inform_process_count(PCP_CONNECTION * frontend)608 inform_process_count(PCP_CONNECTION * frontend)
609 {
610 int wsize;
611 int process_count;
612 char process_count_str[16];
613 int *process_list = NULL;
614 char code[] = "CommandComplete";
615 char *mesg = NULL;
616 int i;
617 int total_port_len = 0;
618
619 process_list = pool_get_process_list(&process_count);
620
621 mesg = (char *) palloc(7 * process_count); /* PID is at most 6 characters
622 * long */
623
624 snprintf(process_count_str, sizeof(process_count_str), "%d", process_count);
625
626 for (i = 0; i < process_count; i++)
627 {
628 char process_id[7];
629
630 snprintf(process_id, sizeof(process_id), "%d", process_list[i]);
631 snprintf(mesg + total_port_len, strlen(process_id) + 1, "%s", process_id);
632 total_port_len += strlen(process_id) + 1;
633 }
634
635 pcp_write(frontend, "n", 1);
636 wsize = htonl(sizeof(code) +
637 strlen(process_count_str) + 1 +
638 total_port_len +
639 sizeof(int));
640 pcp_write(frontend, &wsize, sizeof(int));
641 pcp_write(frontend, code, sizeof(code));
642 pcp_write(frontend, process_count_str, strlen(process_count_str) + 1);
643 pcp_write(frontend, mesg, total_port_len);
644 do_pcp_flush(frontend);
645
646 pfree(process_list);
647 pfree(mesg);
648
649 ereport(DEBUG1,
650 (errmsg("PCP: informing process count"),
651 errdetail("%d process(es) found", process_count)));
652 }
653
654 static void
inform_process_info(PCP_CONNECTION * frontend,char * buf)655 inform_process_info(PCP_CONNECTION * frontend, char *buf)
656 {
657 int proc_id;
658 int wsize;
659 int num_proc = pool_config->num_init_children;
660 int i;
661
662 proc_id = atoi(buf);
663
664 if ((proc_id != 0) && (pool_get_process_info(proc_id) == NULL))
665 {
666 ereport(ERROR,
667 (errmsg("informing process info failed"),
668 errdetail("invalid process ID : %s", buf)));
669 }
670 else
671 {
672 /* First, send array size of connection_info */
673 char arr_code[] = "ArraySize";
674 char con_info_size[16];
675
676 /* Finally, indicate that all data is sent */
677 char fin_code[] = "CommandComplete";
678
679 POOL_REPORT_POOLS *pools = get_pools(&num_proc);
680
681 if (proc_id == 0)
682 {
683 snprintf(con_info_size, sizeof(con_info_size), "%d", num_proc);
684 }
685 else
686 {
687 snprintf(con_info_size, sizeof(con_info_size), "%d", pool_config->max_pool * NUM_BACKENDS);
688 }
689
690 pcp_write(frontend, "p", 1);
691 wsize = htonl(sizeof(arr_code) +
692 strlen(con_info_size) + 1 +
693 sizeof(int));
694 pcp_write(frontend, &wsize, sizeof(int));
695 pcp_write(frontend, arr_code, sizeof(arr_code));
696 pcp_write(frontend, con_info_size, strlen(con_info_size) + 1);
697 do_pcp_flush(frontend);
698
699 /* Second, send process information for all connection_info */
700 for (i = 0; i < num_proc; i++)
701 {
702 char code[] = "ProcessInfo";
703 char proc_pid[16];
704 char proc_start_time[20];
705 char proc_create_time[20];
706 char majorversion[5];
707 char minorversion[5];
708 char pool_counter[16];
709 char backend_id[16];
710 char backend_pid[16];
711 char connected[2];
712
713 if (proc_id != 0 && proc_id != pools[i].pool_pid)
714 continue;
715
716 snprintf(proc_pid, sizeof(proc_pid), "%d", pools[i].pool_pid);
717 snprintf(proc_start_time, sizeof(proc_start_time), "%ld", pools[i].start_time);
718 snprintf(proc_create_time, sizeof(proc_create_time), "%ld", pools[i].create_time);
719 snprintf(majorversion, sizeof(majorversion), "%d", pools[i].pool_majorversion);
720 snprintf(minorversion, sizeof(minorversion), "%d", pools[i].pool_minorversion);
721 snprintf(pool_counter, sizeof(pool_counter), "%d", pools[i].pool_counter);
722 snprintf(backend_id, sizeof(backend_pid), "%d", pools[i].backend_id);
723 snprintf(backend_pid, sizeof(backend_pid), "%d", pools[i].pool_backendpid);
724 snprintf(connected, sizeof(connected), "%d", pools[i].pool_connected);
725
726 pcp_write(frontend, "p", 1);
727 wsize = htonl(sizeof(code) +
728 strlen(proc_pid) + 1 +
729 strlen(pools[i].database) + 1 +
730 strlen(pools[i].username) + 1 +
731 strlen(proc_start_time) + 1 +
732 strlen(proc_create_time) + 1 +
733 strlen(majorversion) + 1 +
734 strlen(minorversion) + 1 +
735 strlen(pool_counter) + 1 +
736 strlen(backend_id) + 1 +
737 strlen(backend_pid) + 1 +
738 strlen(connected) + 1 +
739 sizeof(int));
740 pcp_write(frontend, &wsize, sizeof(int));
741 pcp_write(frontend, code, sizeof(code));
742 pcp_write(frontend, proc_pid, strlen(proc_pid) + 1);
743 pcp_write(frontend, pools[i].database, strlen(pools[i].database) + 1);
744 pcp_write(frontend, pools[i].username, strlen(pools[i].username) + 1);
745 pcp_write(frontend, proc_start_time, strlen(proc_start_time) + 1);
746 pcp_write(frontend, proc_create_time, strlen(proc_create_time) + 1);
747 pcp_write(frontend, majorversion, strlen(majorversion) + 1);
748 pcp_write(frontend, minorversion, strlen(minorversion) + 1);
749 pcp_write(frontend, pool_counter, strlen(pool_counter) + 1);
750 pcp_write(frontend, backend_id, strlen(backend_id) + 1);
751 pcp_write(frontend, backend_pid, strlen(backend_pid) + 1);
752 pcp_write(frontend, connected, strlen(connected) + 1);
753 do_pcp_flush(frontend);
754 }
755
756 pcp_write(frontend, "p", 1);
757 wsize = htonl(sizeof(fin_code) +
758 sizeof(int));
759 pcp_write(frontend, &wsize, sizeof(int));
760 pcp_write(frontend, fin_code, sizeof(fin_code));
761 do_pcp_flush(frontend);
762 ereport(DEBUG1,
763 (errmsg("PCP informing process info"),
764 errdetail("retrieved process information from shared memory")));
765
766 pfree(pools);
767 }
768 }
769
770 static void
inform_watchdog_info(PCP_CONNECTION * frontend,char * buf)771 inform_watchdog_info(PCP_CONNECTION * frontend, char *buf)
772 {
773 int wd_index;
774 int json_data_len;
775 int wsize;
776 char code[] = "CommandComplete";
777 char *json_data;
778
779 if (!pool_config->use_watchdog)
780 ereport(ERROR,
781 (errmsg("PCP: informing watchdog info failed"),
782 errdetail("watcdhog is not enabled")));
783
784 wd_index = atoi(buf);
785
786 json_data = wd_get_watchdog_nodes(wd_index);
787 if (json_data == NULL)
788 ereport(ERROR,
789 (errmsg("PCP: informing watchdog info failed"),
790 errdetail("invalid watchdog index")));
791
792 ereport(DEBUG2,
793 (errmsg("PCP: informing watchdog info"),
794 errdetail("retrieved node information from IPC socket")));
795
796 /*
797 * This is the voilation of PCP protocol but I think in future we should
798 * shift to more adaptable protocol for data transmition.
799 */
800 json_data_len = strlen(json_data);
801 wsize = htonl(sizeof(code) +
802 json_data_len + 1 +
803 sizeof(int));
804 pcp_write(frontend, "w", 1);
805
806 pcp_write(frontend, &wsize, sizeof(int));
807 pcp_write(frontend, code, sizeof(code));
808
809 pcp_write(frontend, json_data, json_data_len + 1);
810 do_pcp_flush(frontend);
811
812 pfree(json_data);
813 }
814
815 static void
inform_node_info(PCP_CONNECTION * frontend,char * buf)816 inform_node_info(PCP_CONNECTION * frontend, char *buf)
817 {
818 int node_id;
819 int wsize;
820 char port_str[6];
821 char status[2];
822 char weight_str[20];
823 char role_str[10];
824 char standby_delay_str[20];
825 char status_changed_time_str[20];
826 char code[] = "CommandComplete";
827 BackendInfo *bi = NULL;
828 SERVER_ROLE role;
829
830 node_id = atoi(buf);
831
832 bi = pool_get_node_info(node_id);
833
834 if (bi == NULL)
835 ereport(ERROR,
836 (errmsg("informing node info failed"),
837 errdetail("invalid node ID")));
838
839 ereport(DEBUG2,
840 (errmsg("PCP: informing node info"),
841 errdetail("retrieved node information from shared memory")));
842
843 snprintf(port_str, sizeof(port_str), "%d", bi->backend_port);
844 snprintf(status, sizeof(status), "%d", bi->backend_status);
845 snprintf(weight_str, sizeof(weight_str), "%f", bi->backend_weight);
846
847 if (STREAM)
848 {
849 if (Req_info->primary_node_id == node_id)
850 role = ROLE_PRIMARY;
851 else
852 role = ROLE_STANDBY;
853 }
854 else
855 {
856 if (Req_info->master_node_id == node_id)
857 role = ROLE_MASTER;
858 else
859 role = ROLE_SLAVE;
860 }
861 snprintf(role_str, sizeof(role_str), "%d", role);
862
863 snprintf(standby_delay_str, sizeof(standby_delay_str), UINT64_FORMAT, bi->standby_delay);
864
865 snprintf(status_changed_time_str, sizeof(status_changed_time_str), UINT64_FORMAT, bi->status_changed_time);
866
867 pcp_write(frontend, "i", 1);
868 wsize = htonl(sizeof(code) +
869 strlen(bi->backend_hostname) + 1 +
870 strlen(port_str) + 1 +
871 strlen(status) + 1 +
872 strlen(weight_str) + 1 +
873 strlen(role_str) + 1 +
874 strlen(standby_delay_str) + 1 +
875 strlen(bi->replication_state) + 1 +
876 strlen(bi->replication_sync_state) + 1 +
877 strlen(status_changed_time_str) + 1 +
878 sizeof(int));
879 pcp_write(frontend, &wsize, sizeof(int));
880 pcp_write(frontend, code, sizeof(code));
881 pcp_write(frontend, bi->backend_hostname, strlen(bi->backend_hostname) + 1);
882 pcp_write(frontend, port_str, strlen(port_str) + 1);
883 pcp_write(frontend, status, strlen(status) + 1);
884 pcp_write(frontend, weight_str, strlen(weight_str) + 1);
885 pcp_write(frontend, role_str, strlen(role_str) + 1);
886 pcp_write(frontend, standby_delay_str, strlen(standby_delay_str) + 1);
887 pcp_write(frontend, bi->replication_state, strlen(bi->replication_state) + 1);
888 pcp_write(frontend, bi->replication_sync_state, strlen(bi->replication_sync_state) + 1);
889 pcp_write(frontend, status_changed_time_str, strlen(status_changed_time_str) + 1);
890
891 do_pcp_flush(frontend);
892 }
893
894 static void
inform_node_count(PCP_CONNECTION * frontend)895 inform_node_count(PCP_CONNECTION * frontend)
896 {
897 int wsize;
898 char mesg[16];
899 char code[] = "CommandComplete";
900 int node_count = pool_get_node_count();
901
902 snprintf(mesg, sizeof(mesg), "%d", node_count);
903
904 pcp_write(frontend, "l", 1);
905 wsize = htonl(sizeof(code) +
906 strlen(mesg) + 1 +
907 sizeof(int));
908 pcp_write(frontend, &wsize, sizeof(int));
909 pcp_write(frontend, code, sizeof(code));
910 pcp_write(frontend, mesg, strlen(mesg) + 1);
911 do_pcp_flush(frontend);
912
913 ereport(DEBUG1,
914 (errmsg("PCP: informing node count"),
915 errdetail("%d node(s) found", node_count)));
916 }
917
918 static void
process_detach_node(PCP_CONNECTION * frontend,char * buf,char tos)919 process_detach_node(PCP_CONNECTION * frontend, char *buf, char tos)
920 {
921 int node_id;
922 int wsize;
923 char code[] = "CommandComplete";
924 bool gracefully;
925
926 if (tos == 'D')
927 gracefully = false;
928 else
929 gracefully = true;
930
931 node_id = atoi(buf);
932 ereport(DEBUG1,
933 (errmsg("PCP: processing detach node"),
934 errdetail("detaching Node ID %d", node_id)));
935
936 pool_detach_node(node_id, gracefully);
937
938 pcp_write(frontend, "d", 1);
939 wsize = htonl(sizeof(code) + sizeof(int));
940 pcp_write(frontend, &wsize, sizeof(int));
941 pcp_write(frontend, code, sizeof(code));
942 do_pcp_flush(frontend);
943 }
944
945 static void
process_attach_node(PCP_CONNECTION * frontend,char * buf)946 process_attach_node(PCP_CONNECTION * frontend, char *buf)
947 {
948 int node_id;
949 int wsize;
950 char code[] = "CommandComplete";
951
952 node_id = atoi(buf);
953 ereport(DEBUG1,
954 (errmsg("PCP: processing attach node"),
955 errdetail("attaching Node ID %d", node_id)));
956
957 send_failback_request(node_id, true, REQ_DETAIL_CONFIRMED);
958
959 pcp_write(frontend, "c", 1);
960 wsize = htonl(sizeof(code) + sizeof(int));
961 pcp_write(frontend, &wsize, sizeof(int));
962 pcp_write(frontend, code, sizeof(code));
963 do_pcp_flush(frontend);
964 }
965
966
967 static void
process_recovery_request(PCP_CONNECTION * frontend,char * buf)968 process_recovery_request(PCP_CONNECTION * frontend, char *buf)
969 {
970 int wsize;
971 char code[] = "CommandComplete";
972 int node_id = atoi(buf);
973
974 if ((node_id < 0) || (node_id >= pool_config->backend_desc->num_backends))
975 ereport(ERROR,
976 (errmsg("process recovery request failed"),
977 errdetail("node id %d is not valid", node_id)));
978
979 if ((!REPLICATION &&
980 !(MASTER_SLAVE &&
981 pool_config->master_slave_sub_mode == STREAM_MODE)) ||
982 (MASTER_SLAVE &&
983 pool_config->master_slave_sub_mode == STREAM_MODE &&
984 node_id == PRIMARY_NODE_ID))
985 {
986 if (MASTER_SLAVE && pool_config->master_slave_sub_mode == STREAM_MODE)
987 ereport(ERROR,
988 (errmsg("process recovery request failed"),
989 errdetail("primary server cannot be recovered by online recovery.")));
990 else
991 ereport(ERROR,
992 (errmsg("process recovery request failed"),
993 errdetail("recovery request is only allowed in replication and streaming replication modes.")));
994 }
995 else
996 {
997 if (pcp_mark_recovery_in_progress() == false)
998 ereport(FATAL,
999 (errmsg("process recovery request failed"),
1000 errdetail("pgpool-II is already processing another recovery request.")));
1001
1002 ereport(DEBUG1,
1003 (errmsg("PCP: processing recovery request"),
1004 errdetail("start online recovery")));
1005
1006 PG_TRY();
1007 {
1008 start_recovery(node_id);
1009 finish_recovery();
1010 pcp_write(frontend, "c", 1);
1011 wsize = htonl(sizeof(code) + sizeof(int));
1012 pcp_write(frontend, &wsize, sizeof(int));
1013 pcp_write(frontend, code, sizeof(code));
1014 do_pcp_flush(frontend);
1015 pcp_mark_recovery_finished();
1016 }
1017 PG_CATCH();
1018 {
1019 finish_recovery();
1020 pcp_mark_recovery_finished();
1021 PG_RE_THROW();
1022
1023 } PG_END_TRY();
1024 }
1025 do_pcp_flush(frontend);
1026 }
1027
1028 static void
process_status_request(PCP_CONNECTION * frontend)1029 process_status_request(PCP_CONNECTION * frontend)
1030 {
1031 int nrows = 0;
1032 int i;
1033 POOL_REPORT_CONFIG *status = get_config(&nrows);
1034 int len = 0;
1035
1036 /* First, send array size of connection_info */
1037 char arr_code[] = "ArraySize";
1038 char code[] = "ProcessConfig";
1039
1040 /* Finally, indicate that all data is sent */
1041 char fin_code[] = "CommandComplete";
1042
1043 pcp_write(frontend, "b", 1);
1044 len = htonl(sizeof(arr_code) + sizeof(int) + sizeof(int));
1045 pcp_write(frontend, &len, sizeof(int));
1046 pcp_write(frontend, arr_code, sizeof(arr_code));
1047 len = htonl(nrows);
1048 pcp_write(frontend, &len, sizeof(int));
1049
1050 do_pcp_flush(frontend);
1051
1052 for (i = 0; i < nrows; i++)
1053 {
1054 pcp_write(frontend, "b", 1);
1055 len = htonl(sizeof(int)
1056 + sizeof(code)
1057 + strlen(status[i].name) + 1
1058 + strlen(status[i].value) + 1
1059 + strlen(status[i].desc) + 1
1060 );
1061
1062 pcp_write(frontend, &len, sizeof(int));
1063 pcp_write(frontend, code, sizeof(code));
1064 pcp_write(frontend, status[i].name, strlen(status[i].name) + 1);
1065 pcp_write(frontend, status[i].value, strlen(status[i].value) + 1);
1066 pcp_write(frontend, status[i].desc, strlen(status[i].desc) + 1);
1067 }
1068
1069 pcp_write(frontend, "b", 1);
1070 len = htonl(sizeof(fin_code) + sizeof(int));
1071 pcp_write(frontend, &len, sizeof(int));
1072 pcp_write(frontend, fin_code, sizeof(fin_code));
1073 do_pcp_flush(frontend);
1074
1075 pfree(status);
1076 ereport(DEBUG1,
1077 (errmsg("PCP: processing status request"),
1078 errdetail("retrieved status information")));
1079 }
1080
1081 static void
process_promote_node(PCP_CONNECTION * frontend,char * buf,char tos)1082 process_promote_node(PCP_CONNECTION * frontend, char *buf, char tos)
1083 {
1084 int node_id;
1085 int wsize;
1086 char code[] = "CommandComplete";
1087 bool gracefully;
1088
1089 if (tos == 'J')
1090 gracefully = false;
1091 else
1092 gracefully = true;
1093
1094 node_id = atoi(buf);
1095 if ((node_id < 0) || (node_id >= pool_config->backend_desc->num_backends))
1096 ereport(ERROR,
1097 (errmsg("could not process recovery request"),
1098 errdetail("node id %d is not valid", node_id)));
1099 /* promoting node is reserved to Streaming Replication */
1100 if (!MASTER_SLAVE || pool_config->master_slave_sub_mode != STREAM_MODE)
1101 {
1102 ereport(FATAL,
1103 (errmsg("invalid pgpool mode for process recovery request"),
1104 errdetail("not in streaming replication mode, can't promote node id %d", node_id)));
1105
1106 }
1107
1108 if (node_id == REAL_PRIMARY_NODE_ID)
1109 {
1110 ereport(FATAL,
1111 (errmsg("invalid pgpool mode for process recovery request"),
1112 errdetail("specified node is already primary node, can't promote node id %d", node_id)));
1113
1114 }
1115 ereport(DEBUG1,
1116 (errmsg("PCP: processing promote node"),
1117 errdetail("promoting Node ID %d", node_id)));
1118 pool_promote_node(node_id, gracefully);
1119
1120 pcp_write(frontend, "d", 1);
1121 wsize = htonl(sizeof(code) + sizeof(int));
1122 pcp_write(frontend, &wsize, sizeof(int));
1123 pcp_write(frontend, code, sizeof(code));
1124 do_pcp_flush(frontend);
1125 }
1126
1127 static void
process_authentication(PCP_CONNECTION * frontend,char * buf,char * salt,int * random_salt)1128 process_authentication(PCP_CONNECTION * frontend, char *buf, char *salt, int *random_salt)
1129 {
1130 int wsize;
1131 int authenticated;
1132
1133 if (*random_salt)
1134 {
1135 authenticated = user_authenticate(buf, pcp_conf_file, salt, 4);
1136 }
1137 if (!*random_salt || !authenticated)
1138 {
1139 ereport(FATAL,
1140 (errmsg("authentication failed"),
1141 errdetail("username and/or password does not match")));
1142
1143 *random_salt = 0;
1144 }
1145 else
1146 {
1147 char code[] = "AuthenticationOK";
1148
1149 pcp_write(frontend, "r", 1);
1150 wsize = htonl(sizeof(code) + sizeof(int));
1151 pcp_write(frontend, &wsize, sizeof(int));
1152 pcp_write(frontend, code, sizeof(code));
1153 do_pcp_flush(frontend);
1154 *random_salt = 0;
1155
1156 ereport(DEBUG1,
1157 (errmsg("PCP: processing authentication request"),
1158 errdetail("authentication OK")));
1159 }
1160 }
1161
1162 static void
send_md5salt(PCP_CONNECTION * frontend,char * salt)1163 send_md5salt(PCP_CONNECTION * frontend, char *salt)
1164 {
1165 int wsize;
1166
1167 ereport(DEBUG1,
1168 (errmsg("PCP: sending md5 salt to client")));
1169
1170 pool_random_salt(salt);
1171
1172 pcp_write(frontend, "m", 1);
1173 wsize = htonl(sizeof(int) + 4);
1174 pcp_write(frontend, &wsize, sizeof(int));
1175 pcp_write(frontend, salt, 4);
1176 do_pcp_flush(frontend);
1177 }
1178
1179 static void
process_shutown_request(PCP_CONNECTION * frontend,char mode)1180 process_shutown_request(PCP_CONNECTION * frontend, char mode)
1181 {
1182 char code[] = "CommandComplete";
1183 pid_t ppid = getppid();
1184 int sig,
1185 len;
1186
1187 if (mode == 's')
1188 {
1189 ereport(DEBUG1,
1190 (errmsg("PCP: processing shutdown request"),
1191 errdetail("sending SIGTERM to the parent process with PID:%d", ppid)));
1192 sig = SIGTERM;
1193 }
1194 else if (mode == 'f')
1195 {
1196 ereport(DEBUG1,
1197 (errmsg("PCP: processing shutdown request"),
1198 errdetail("sending SIGINT to the parent process with PID:%d", ppid)));
1199 sig = SIGINT;
1200 }
1201 else if (mode == 'i')
1202 {
1203 ereport(DEBUG1,
1204 (errmsg("PCP: processing shutdown request"),
1205 errdetail("sending SIGQUIT to the parent process with PID:%d", ppid)));
1206 sig = SIGQUIT;
1207 }
1208 else
1209 {
1210 ereport(ERROR,
1211 (errmsg("PCP: error while processing shutdown request"),
1212 errdetail("invalid shutdown mode \"%c\"", mode)));
1213 }
1214
1215 pcp_write(frontend, "t", 1);
1216 len = htonl(sizeof(code) + sizeof(int));
1217 pcp_write(frontend, &len, sizeof(int));
1218 pcp_write(frontend, code, sizeof(code));
1219 do_pcp_flush(frontend);
1220
1221 pool_signal_parent(sig);
1222 }
1223
1224 static void
process_set_configration_parameter(PCP_CONNECTION * frontend,char * buf,int len)1225 process_set_configration_parameter(PCP_CONNECTION * frontend, char *buf, int len)
1226 {
1227 char *param_name;
1228 char *param_value;
1229 int wsize;
1230 char code[] = "CommandComplete";
1231
1232 param_name = buf;
1233 if (param_name == NULL)
1234 ereport(ERROR,
1235 (errmsg("PCP: set configuration parameter failed"),
1236 errdetail("invalid pcp packet received from client")));
1237
1238 param_value = (char *) memchr(buf, '\0', len);
1239 if (param_value == NULL)
1240 ereport(ERROR,
1241 (errmsg("set configuration parameter failed"),
1242 errdetail("invalid pcp packet received from client")));
1243
1244 param_value += 1;
1245 ereport(LOG,
1246 (errmsg("set configuration parameter, \"%s TO %s\"", param_name, param_value)));
1247
1248 if (strcasecmp(param_name, "client_min_messages") == 0)
1249 {
1250 const char *ordered_valid_values[] = {"debug5", "debug4", "debug3", "debug2", "debug1", "log", "commerror", "info", "notice", "warning", "error", NULL};
1251 bool found = false;
1252 int i;
1253
1254 for (i = 0;; i++)
1255 {
1256 char *valid_val = (char *) ordered_valid_values[i];
1257
1258 if (!valid_val)
1259 break;
1260
1261 if (!strcasecmp(param_value, valid_val))
1262 {
1263 found = true;
1264 pool_config->client_min_messages = i + 10;
1265 ereport(DEBUG1,
1266 (errmsg("PCP setting parameter \"%s\" to \"%s\"", param_name, param_value)));
1267 break;
1268 }
1269 }
1270 if (!found)
1271 ereport(ERROR,
1272 (errmsg("PCP: set configuration parameter failed"),
1273 errdetail("invalid value \"%s\" for parameter \"%s\"", param_value, param_name)));
1274 }
1275 else
1276 ereport(ERROR,
1277 (errmsg("PCP: set configuration parameter failed"),
1278 errdetail("invalid parameter \"%s\"", param_name)));
1279
1280 pcp_write(frontend, "a", 1);
1281 wsize = htonl(sizeof(code) + sizeof(int));
1282 pcp_write(frontend, &wsize, sizeof(int));
1283 pcp_write(frontend, code, sizeof(code));
1284 do_pcp_flush(frontend);
1285 }
1286
1287 /*
1288 * Wrapper around pcp_flush which throws FATAL error when pcp_flush fails
1289 */
1290 static void
do_pcp_flush(PCP_CONNECTION * frontend)1291 do_pcp_flush(PCP_CONNECTION * frontend)
1292 {
1293 if (pcp_flush(frontend) < 0)
1294 ereport(FATAL,
1295 (errmsg("failed to flush data to client"),
1296 errdetail("pcp_flush failed with error : \"%m\"")));
1297 }
1298
1299 /*
1300 * Wrapper around pcp_read which throws FATAL error when read fails
1301 */
1302 static void
do_pcp_read(PCP_CONNECTION * pc,void * buf,int len)1303 do_pcp_read(PCP_CONNECTION * pc, void *buf, int len)
1304 {
1305 if (pcp_read(pc, buf, len))
1306 ereport(FATAL,
1307 (errmsg("unable to read from client"),
1308 errdetail("pcp_read failed with error : \"%m\"")));
1309 }
1310
1311 int
send_to_pcp_frontend(char * data,int len,bool flush)1312 send_to_pcp_frontend(char *data, int len, bool flush)
1313 {
1314 int ret;
1315
1316 if (processType != PT_PCP_WORKER || pcp_frontend == NULL)
1317 return -1;
1318 ret = pcp_write(pcp_frontend, data, len);
1319 if (flush && !ret)
1320 ret = pcp_flush(pcp_frontend);
1321 return ret;
1322 }
1323
1324 int
pcp_frontend_exists(void)1325 pcp_frontend_exists(void)
1326 {
1327 if (processType != PT_PCP_WORKER || pcp_frontend == NULL)
1328 return -1;
1329 return 0;
1330 }
1331
1332 static void
pcp_worker_will_go_down(int code,Datum arg)1333 pcp_worker_will_go_down(int code, Datum arg)
1334 {
1335 if (processType != PT_PCP_WORKER)
1336 {
1337 /* should never happen */
1338 ereport(WARNING,
1339 (errmsg("pcp_worker_will_go_down called from invalid process")));
1340 return;
1341 }
1342 if (pcp_frontend)
1343 pcp_close(pcp_frontend);
1344 processState = EXITING;
1345 POOL_SETMASK(&UnBlockSig);
1346
1347 }
1348