1 /* -*-pcp_worker.c-*- */
2 /*
3 *
4 * pgpool: a language independent connection pool server for PostgreSQL
5 * written by Tatsuo Ishii
6 *
7 * Copyright (c) 2003-2019 PgPool Global Development Group
8 *
9 * Permission to use, copy, modify, and distribute this software and
10 * its documentation for any purpose and without fee is hereby
11 * granted, provided that the above copyright notice appear in all
12 * copies and that both that copyright notice and this permission
13 * notice appear in supporting documentation, and that the name of the
14 * author not be used in advertising or publicity pertaining to
15 * distribution of the software without specific, written prior
16 * permission. The author makes no representations about the
17 * suitability of this software for any purpose. It is provided "as
18 * is" without express or implied warranty.
19 *
20 * pcp_worker.c: PCP worker child process main
21 *
22 */
23
24 #include "config.h"
25 #include "pool.h"
26 #include "utils/palloc.h"
27 #include "utils/memutils.h"
28
29 #include <arpa/inet.h>
30 #include <signal.h>
31
32 #include <stdio.h>
33 #include <errno.h>
34 #include <string.h>
35 #include <unistd.h>
36 #include <stdlib.h>
37 #include <sys/time.h>
38
39 #ifdef HAVE_FCNTL_H
40 #include <fcntl.h>
41 #endif
42
43 #include "pcp/pcp_stream.h"
44 #include "pcp/pcp.h"
45 #include "auth/md5.h"
46 #include "pool_config.h"
47 #include "context/pool_process_context.h"
48 #include "utils/pool_process_reporting.h"
49 #include "watchdog/wd_json_data.h"
50 #include "watchdog/wd_ipc_commands.h"
51 #include "utils/elog.h"
52
/* Size handed to fgets() when user_authenticate() reads the password file */
#define MAX_FILE_LINE_LEN    512

extern char *pcp_conf_file;		/* global variable defined in main.c holds the
								 * path for pcp.conf */

/*
 * Set to 1 by wakeup_handler_child() (the SIGUSR2 handler).  Cleared and
 * polled by pool_detach_node() / pool_promote_node() while they wait.
 */
volatile sig_atomic_t pcp_worker_wakeup_request = 0;

/*
 * Connection to the PCP client served by this worker.  Assigned from
 * pcp_open() in pcp_worker_main() and reset to NULL after pcp_close() when
 * the client sends a disconnect ('X') packet.
 */
PCP_CONNECTION *volatile pcp_frontend = NULL;

/* signal handlers */
static RETSIGTYPE die(int sig);
static RETSIGTYPE wakeup_handler_child(int sig);

/* connection setup and authentication */
static void unset_nonblock(int fd);
static int	user_authenticate(char *buf, char *passwd_file, char *salt, int salt_len);
static void process_authentication(PCP_CONNECTION * frontend, char *buf, char *salt, int *random_salt);
static void send_md5salt(PCP_CONNECTION * frontend, char *salt);

/* dispatcher: maps a packet type byte to one of the handlers below */
static void pcp_process_command(char tos, char *buf, int buf_len);

/* per-command handlers */
static int	pool_detach_node(int node_id, bool gracefully);
static int	pool_promote_node(int node_id, bool gracefully);
static void inform_process_count(PCP_CONNECTION * frontend);
static void inform_process_info(PCP_CONNECTION * frontend, char *buf);
static void inform_watchdog_info(PCP_CONNECTION * frontend, char *buf);
static void inform_node_info(PCP_CONNECTION * frontend, char *buf);
static void inform_node_count(PCP_CONNECTION * frontend);
static void process_detach_node(PCP_CONNECTION * frontend, char *buf, char tos);
static void process_attach_node(PCP_CONNECTION * frontend, char *buf);
static void process_recovery_request(PCP_CONNECTION * frontend, char *buf);
static void process_status_request(PCP_CONNECTION * frontend);
static void process_promote_node(PCP_CONNECTION * frontend, char *buf, char tos);
static void process_shutown_request(PCP_CONNECTION * frontend, char mode);
static void process_set_configration_parameter(PCP_CONNECTION * frontend, char *buf, int len);

/* exit callback registered with on_system_exit() in pcp_worker_main() */
static void pcp_worker_will_go_down(int code, Datum arg);

/* low-level I/O wrappers around the PCP stream */
static void do_pcp_flush(PCP_CONNECTION * frontend);
static void do_pcp_read(PCP_CONNECTION * pc, void *buf, int len);
90 /*
91 * main entry pont of pcp worker child process
92 */
93 void
pcp_worker_main(int port)94 pcp_worker_main(int port)
95 {
96 sigjmp_buf local_sigjmp_buf;
97 MemoryContext PCPMemoryContext;
98 volatile int authenticated = 0;
99
100 char salt[4];
101 int random_salt = 0;
102 struct timeval uptime;
103 char tos;
104 int rsize;
105
106 ereport(DEBUG1,
107 (errmsg("I am PCP worker child with pid:%d", getpid())));
108
109 /* Identify myself via ps */
110 init_ps_display("", "", "", "");
111
112 gettimeofday(&uptime, NULL);
113 srandom((unsigned int) (getpid() ^ uptime.tv_usec));
114
115 /* set up signal handlers */
116 signal(SIGTERM, die);
117 signal(SIGINT, die);
118 signal(SIGQUIT, die);
119 signal(SIGCHLD, SIG_DFL);
120 signal(SIGUSR2, wakeup_handler_child);
121 signal(SIGUSR1, SIG_IGN);
122 signal(SIGHUP, SIG_IGN);
123 signal(SIGPIPE, SIG_IGN);
124 signal(SIGALRM, SIG_IGN);
125 /* Create per loop iteration memory context */
126 PCPMemoryContext = AllocSetContextCreate(TopMemoryContext,
127 "PCP_worker_main_loop",
128 ALLOCSET_DEFAULT_MINSIZE,
129 ALLOCSET_DEFAULT_INITSIZE,
130 ALLOCSET_DEFAULT_MAXSIZE);
131
132 MemoryContextSwitchTo(TopMemoryContext);
133
134 /*
135 * install the call back for preparation of pcp worker child exit
136 */
137 on_system_exit(pcp_worker_will_go_down, (Datum) NULL);
138
139 /* Initialize my backend status */
140 pool_initialize_private_backend_status();
141
142 /* Initialize process context */
143 pool_init_process_context();
144
145 pcp_frontend = pcp_open(port);
146 unset_nonblock(pcp_frontend->fd);
147
148 if (sigsetjmp(local_sigjmp_buf, 1) != 0)
149 {
150 error_context_stack = NULL;
151 EmitErrorReport();
152
153 MemoryContextSwitchTo(TopMemoryContext);
154 FlushErrorState();
155 }
156 /* We can now handle ereport(ERROR) */
157 PG_exception_stack = &local_sigjmp_buf;
158
159 for (;;)
160 {
161 char *buf = NULL;
162
163 MemoryContextSwitchTo(PCPMemoryContext);
164 MemoryContextResetAndDeleteChildren(PCPMemoryContext);
165
166 errno = 0;
167
168 /* read a PCP packet */
169 do_pcp_read(pcp_frontend, &tos, 1);
170 do_pcp_read(pcp_frontend, &rsize, sizeof(int));
171
172 rsize = ntohl(rsize);
173
174 if (rsize <= 0 || rsize >= MAX_PCP_PACKET_LENGTH)
175 ereport(FATAL,
176 (errmsg("invalid PCP packet"),
177 errdetail("incorrect packet length (%d)", rsize)));
178
179 if ((rsize - sizeof(int)) > 0)
180 {
181 buf = (char *) palloc(rsize - sizeof(int));
182 do_pcp_read(pcp_frontend, buf, rsize - sizeof(int));
183 }
184
185 ereport(DEBUG1,
186 (errmsg("received PCP packet"),
187 errdetail("PCP packet type of service '%c'", tos)));
188
189 if (tos == 'R') /* authentication */
190 {
191 set_ps_display("PCP: processing authentication", false);
192 process_authentication(pcp_frontend, buf, salt, &random_salt);
193 authenticated = 1;
194 continue;
195 }
196 if (tos == 'M') /* md5 salt */
197 {
198 set_ps_display("PCP: processing authentication", false);
199 send_md5salt(pcp_frontend, salt);
200 random_salt = 1;
201 continue;
202 }
203 /* is this connection authenticated? if not disconnect immediately */
204 if (!authenticated)
205 ereport(FATAL,
206 (errmsg("authentication failed for new PCP connection"),
207 errdetail("connection not authorized")));
208
209 /* process a request */
210 pcp_process_command(tos, buf, rsize);
211 }
212 exit(0);
213 }
214
215
216 /* pcp command processor */
217 static void
pcp_process_command(char tos,char * buf,int buf_len)218 pcp_process_command(char tos, char *buf, int buf_len)
219 {
220
221 if (tos == 'O' || tos == 'T')
222 {
223 if (Req_info->switching)
224 {
225 if (Req_info->request_queue_tail != Req_info->request_queue_head)
226 {
227 POOL_REQUEST_KIND reqkind;
228
229 reqkind = Req_info->request[(Req_info->request_queue_head + 1) % MAX_REQUEST_QUEUE_SIZE].kind;
230
231 if (reqkind == NODE_UP_REQUEST)
232 ereport(ERROR,
233 (errmsg("failed to process PCP request at the moment"),
234 errdetail("failback is in progress")));
235 else if (reqkind == NODE_DOWN_REQUEST)
236 ereport(ERROR,
237 (errmsg("failed to process PCP request at the moment"),
238 errdetail("failover is in progress")));
239 else if (reqkind == PROMOTE_NODE_REQUEST)
240 ereport(ERROR,
241 (errmsg("failed to process PCP request at the moment"),
242 errdetail("promote node operation is in progress")));
243
244 ereport(ERROR,
245 (errmsg("failed to process PCP request at the moment"),
246 errdetail("operation is in progress")));
247 }
248 }
249 }
250
251 switch (tos)
252 {
253 case 'A': /* set configuration parameter */
254 set_ps_display("PCP: processing set configration parameter request", false);
255 process_set_configration_parameter(pcp_frontend, buf, buf_len);
256 break;
257
258 case 'L': /* node count */
259 set_ps_display("PCP: processing node count request", false);
260 inform_node_count(pcp_frontend);
261 break;
262
263 case 'I': /* node info */
264 set_ps_display("PCP: processing node info request", false);
265 inform_node_info(pcp_frontend, buf);
266 break;
267
268 case 'N': /* process count */
269 set_ps_display("PCP: processing process count request", false);
270 inform_process_count(pcp_frontend);
271 break;
272
273 case 'P': /* process info */
274 set_ps_display("PCP: processing process info request", false);
275 inform_process_info(pcp_frontend, buf);
276 break;
277
278 case 'W': /* watchdog info */
279 set_ps_display("PCP: processing watchdog info request", false);
280 inform_watchdog_info(pcp_frontend, buf);
281 break;
282
283 case 'D': /* detach node */
284 case 'd': /* detach node gracefully */
285 set_ps_display("PCP: processing detach node request", false);
286 process_detach_node(pcp_frontend, buf, tos);
287 break;
288
289 case 'C': /* attach node */
290 set_ps_display("PCP: processing attach node request", false);
291 process_attach_node(pcp_frontend, buf);
292 break;
293
294 case 'T':
295 set_ps_display("PCP: processing shutdown request", false);
296 process_shutown_request(pcp_frontend, buf[0]);
297 break;
298
299 case 'O': /* recovery request */
300 set_ps_display("PCP: processing recovery request", false);
301 process_recovery_request(pcp_frontend, buf);
302 break;
303
304 case 'B': /* status request */
305 set_ps_display("PCP: processing status request request", false);
306 process_status_request(pcp_frontend);
307 break;
308
309 case 'J': /* promote node */
310 case 'j': /* promote node gracefully */
311 set_ps_display("PCP: processing promote node request", false);
312 process_promote_node(pcp_frontend, buf, tos);
313 break;
314
315 case 'F':
316 ereport(DEBUG1,
317 (errmsg("PCP processing request, stop online recovery")));
318 break;
319
320 case 'X': /* disconnect */
321 ereport(DEBUG1,
322 (errmsg("PCP processing request, client disconnecting"),
323 errdetail("closing PCP connection, and exiting child")));
324 pcp_close(pcp_frontend);
325 pcp_frontend = NULL;
326 /* This child has done its part. Rest in peace now */
327 exit(0);
328 break;
329
330 default:
331 ereport(FATAL,
332 (errmsg("PCP processing request"),
333 errdetail("unknown PCP packet type \"%c\"", tos)));
334 }
335 }
336
337 static RETSIGTYPE
die(int sig)338 die(int sig)
339 {
340 ereport(DEBUG1,
341 (errmsg("PCP worker child receives shutdown request signal %d", sig)));
342 if (sig == SIGTERM)
343 {
344 ereport(DEBUG1,
345 (errmsg("PCP worker child receives smart shutdown request."),
346 errdetail("waiting for the child to die its natural death")));
347 }
348 else if (sig == SIGINT)
349 {
350 ereport(DEBUG1,
351 (errmsg("PCP worker child receives fast shutdown request.")));
352 exit(0);
353 }
354 else if (sig == SIGQUIT)
355 {
356 ereport(DEBUG1,
357 (errmsg("PCP worker child receives immediate shutdown request.")));
358 exit(0);
359 }
360 else
361 exit(1);
362 }
363
364
365 static RETSIGTYPE
wakeup_handler_child(int sig)366 wakeup_handler_child(int sig)
367 {
368 pcp_worker_wakeup_request = 1;
369 }
370
371 /*
372 * unset non-block flag
373 */
374 static void
unset_nonblock(int fd)375 unset_nonblock(int fd)
376 {
377 int var;
378
379 /* set fd to non-blocking */
380 var = fcntl(fd, F_GETFL, 0);
381 if (var == -1)
382 {
383 ereport(FATAL,
384 (errmsg("unable to connect"),
385 errdetail("fcntl system call failed with error : \"%m\"")));
386
387 }
388 if (fcntl(fd, F_SETFL, var & ~O_NONBLOCK) == -1)
389 {
390 ereport(FATAL,
391 (errmsg("unable to connect"),
392 errdetail("fcntl system call failed with error : \"%m\"")));
393 }
394 }
395
396 /*
397 * see if received username and password matches with one in the file
398 */
399 static int
user_authenticate(char * buf,char * passwd_file,char * salt,int salt_len)400 user_authenticate(char *buf, char *passwd_file, char *salt, int salt_len)
401 {
402 FILE *fp = NULL;
403 char packet_username[MAX_USER_PASSWD_LEN + 1];
404 char packet_password[MAX_USER_PASSWD_LEN + 1];
405 char encrypt_buf[(MD5_PASSWD_LEN + 1) * 2];
406 char file_username[MAX_USER_PASSWD_LEN + 1];
407 char file_password[MAX_USER_PASSWD_LEN + 1];
408 char *index = NULL;
409 static char line[MAX_FILE_LINE_LEN + 1];
410 int i,
411 len;
412
413 /* strcpy() should be OK, but use strncpy() to be extra careful */
414 strncpy(packet_username, buf, MAX_USER_PASSWD_LEN);
415 index = (char *) memchr(buf, '\0', MAX_USER_PASSWD_LEN);
416 if (index == NULL)
417 {
418 ereport(FATAL,
419 (errmsg("failed to authenticate PCP user"),
420 errdetail("error while reading authentication packet")));
421 return 0;
422 }
423 strncpy(packet_password, ++index, MAX_USER_PASSWD_LEN);
424
425 fp = fopen(passwd_file, "r");
426 if (fp == NULL)
427 {
428 ereport(FATAL,
429 (errmsg("failed to authenticate PCP user"),
430 errdetail("could not open %s. reason: %m", passwd_file)));
431 return 0;
432 }
433
434 /* for now, I don't care if duplicate username exists in the config file */
435 while ((fgets(line, MAX_FILE_LINE_LEN, fp)) != NULL)
436 {
437 i = 0;
438 len = 0;
439
440 if (line[0] == '\n' || line[0] == '#')
441 continue;
442
443 while (line[i] != ':')
444 {
445 len++;
446 if (++i > MAX_USER_PASSWD_LEN)
447 {
448 fclose(fp);
449 ereport(FATAL,
450 (errmsg("failed to authenticate PCP user"),
451 errdetail("username read from file \"%s\" is larger than maximum allowed username length [%d]", passwd_file, MAX_USER_PASSWD_LEN)));
452 return 0;
453 }
454 }
455 memcpy(file_username, line, len);
456 file_username[len] = '\0';
457
458 if (strcmp(packet_username, file_username) != 0)
459 continue;
460
461 i++;
462 len = 0;
463 while (line[i] != '\n' && line[i] != '\0')
464 {
465 len++;
466 if (++i > MAX_USER_PASSWD_LEN)
467 {
468 fclose(fp);
469 ereport(FATAL,
470 (errmsg("failed to authenticate PCP user"),
471 errdetail("password read from file \"%s\" is larger than maximum allowed password length [%d]", passwd_file, MAX_USER_PASSWD_LEN)));
472 return 0;
473 }
474 }
475
476 memcpy(file_password, line + strlen(file_username) + 1, len);
477 file_password[len] = '\0';
478
479 pool_md5_encrypt(file_password, file_username, strlen(file_username),
480 encrypt_buf + MD5_PASSWD_LEN + 1);
481 encrypt_buf[(MD5_PASSWD_LEN + 1) * 2 - 1] = '\0';
482
483 pool_md5_encrypt(encrypt_buf + MD5_PASSWD_LEN + 1, salt, salt_len,
484 encrypt_buf);
485 encrypt_buf[MD5_PASSWD_LEN] = '\0';
486
487 if (strcmp(encrypt_buf, packet_password) == 0)
488 {
489 fclose(fp);
490 return 1;
491 }
492 }
493 fclose(fp);
494 ereport(FATAL,
495 (errmsg("authentication failed for user \"%s\"", packet_username),
496 errdetail("username and/or password does not match")));
497
498 return 0;
499 }
500
501
502 /* Detach a node */
503 static int
pool_detach_node(int node_id,bool gracefully)504 pool_detach_node(int node_id, bool gracefully)
505 {
506 if (!gracefully)
507 {
508 degenerate_backend_set_ex(&node_id, 1, REQ_DETAIL_SWITCHOVER | REQ_DETAIL_CONFIRMED, true, false);
509 return 0;
510 }
511
512 /*
513 * Check if the NODE DOWN can be executed on the given node id.
514 */
515 degenerate_backend_set_ex(&node_id, 1, REQ_DETAIL_SWITCHOVER | REQ_DETAIL_CONFIRMED, true, true);
516
517 /*
518 * Wait until all frontends exit
519 */
520 *InRecovery = RECOVERY_DETACH; /* This wiil ensure that new incoming
521 * connection requests are blocked */
522
523 if (wait_connection_closed())
524 {
525 /* wait timed out */
526 finish_recovery();
527 return -1;
528 }
529
530 pcp_worker_wakeup_request = 0;
531
532 /*
533 * Now all frontends have gone. Let's do failover.
534 */
535 degenerate_backend_set_ex(&node_id, 1, REQ_DETAIL_SWITCHOVER | REQ_DETAIL_CONFIRMED, false, false);
536
537 /*
538 * Wait for failover completed.
539 */
540
541 while (!pcp_worker_wakeup_request)
542 {
543 struct timeval t = {1, 0};
544
545 select(0, NULL, NULL, NULL, &t);
546 }
547 pcp_worker_wakeup_request = 0;
548
549 /*
550 * Start to accept incoming connections and send SIGUSR2 to pgpool parent
551 * to distribute SIGUSR2 all pgpool children.
552 */
553 finish_recovery();
554
555 return 0;
556 }
557
558 /* Promote a node */
559 static int
pool_promote_node(int node_id,bool gracefully)560 pool_promote_node(int node_id, bool gracefully)
561 {
562 if (!gracefully)
563 {
564 promote_backend(node_id, REQ_DETAIL_CONFIRMED); /* send promote request */
565 return 0;
566 }
567
568 /*
569 * Wait until all frontends exit
570 */
571 *InRecovery = RECOVERY_PROMOTE; /* This wiil ensure that new incoming
572 * connection requests are blocked */
573
574 if (wait_connection_closed())
575 {
576 /* wait timed out */
577 finish_recovery();
578 return -1;
579 }
580
581 /*
582 * Now all frontends have gone. Let's do failover.
583 */
584 promote_backend(node_id, REQ_DETAIL_CONFIRMED); /* send promote request */
585
586 /*
587 * Wait for failover completed.
588 */
589 pcp_worker_wakeup_request = 0;
590
591 while (!pcp_worker_wakeup_request)
592 {
593 struct timeval t = {1, 0};
594
595 select(0, NULL, NULL, NULL, &t);
596 }
597 pcp_worker_wakeup_request = 0;
598
599 /*
600 * Start to accept incoming connections and send SIGUSR2 to pgpool parent
601 * to distribute SIGUSR2 all pgpool children.
602 */
603 finish_recovery();
604 return 0;
605 }
606
607 static void
inform_process_count(PCP_CONNECTION * frontend)608 inform_process_count(PCP_CONNECTION * frontend)
609 {
610 int wsize;
611 int process_count;
612 char process_count_str[16];
613 int *process_list = NULL;
614 char code[] = "CommandComplete";
615 char *mesg = NULL;
616 int i;
617 int total_port_len = 0;
618
619 process_list = pool_get_process_list(&process_count);
620
621 mesg = (char *) palloc(7 * process_count); /* PID is at most 6 characters
622 * long */
623
624 snprintf(process_count_str, sizeof(process_count_str), "%d", process_count);
625
626 for (i = 0; i < process_count; i++)
627 {
628 char process_id[7];
629
630 snprintf(process_id, sizeof(process_id), "%d", process_list[i]);
631 snprintf(mesg + total_port_len, strlen(process_id) + 1, "%s", process_id);
632 total_port_len += strlen(process_id) + 1;
633 }
634
635 pcp_write(frontend, "n", 1);
636 wsize = htonl(sizeof(code) +
637 strlen(process_count_str) + 1 +
638 total_port_len +
639 sizeof(int));
640 pcp_write(frontend, &wsize, sizeof(int));
641 pcp_write(frontend, code, sizeof(code));
642 pcp_write(frontend, process_count_str, strlen(process_count_str) + 1);
643 pcp_write(frontend, mesg, total_port_len);
644 do_pcp_flush(frontend);
645
646 pfree(process_list);
647 pfree(mesg);
648
649 ereport(DEBUG1,
650 (errmsg("PCP: informing process count"),
651 errdetail("%d process(es) found", process_count)));
652 }
653
654 static void
inform_process_info(PCP_CONNECTION * frontend,char * buf)655 inform_process_info(PCP_CONNECTION * frontend, char *buf)
656 {
657 int proc_id;
658 int wsize;
659 int num_proc = pool_config->num_init_children;
660 int i;
661
662 proc_id = atoi(buf);
663
664 if ((proc_id != 0) && (pool_get_process_info(proc_id) == NULL))
665 {
666 ereport(ERROR,
667 (errmsg("informing process info failed"),
668 errdetail("invalid process ID : %s", buf)));
669 }
670 else
671 {
672 /* First, send array size of connection_info */
673 char arr_code[] = "ArraySize";
674 char con_info_size[16];
675
676 /* Finally, indicate that all data is sent */
677 char fin_code[] = "CommandComplete";
678
679 POOL_REPORT_POOLS *pools = get_pools(&num_proc);
680
681 if (proc_id == 0)
682 {
683 snprintf(con_info_size, sizeof(con_info_size), "%d", num_proc);
684 }
685 else
686 {
687 snprintf(con_info_size, sizeof(con_info_size), "%d", pool_config->max_pool * NUM_BACKENDS);
688 }
689
690 pcp_write(frontend, "p", 1);
691 wsize = htonl(sizeof(arr_code) +
692 strlen(con_info_size) + 1 +
693 sizeof(int));
694 pcp_write(frontend, &wsize, sizeof(int));
695 pcp_write(frontend, arr_code, sizeof(arr_code));
696 pcp_write(frontend, con_info_size, strlen(con_info_size) + 1);
697 do_pcp_flush(frontend);
698
699 /* Second, send process information for all connection_info */
700 for (i = 0; i < num_proc; i++)
701 {
702 char code[] = "ProcessInfo";
703 char proc_pid[16];
704 char proc_start_time[20];
705 char proc_create_time[20];
706 char majorversion[5];
707 char minorversion[5];
708 char pool_counter[16];
709 char backend_id[16];
710 char backend_pid[16];
711 char connected[2];
712
713 if (proc_id != 0 && proc_id != pools[i].pool_pid)
714 continue;
715
716 snprintf(proc_pid, sizeof(proc_pid), "%d", pools[i].pool_pid);
717 snprintf(proc_start_time, sizeof(proc_start_time), "%ld", pools[i].start_time);
718 snprintf(proc_create_time, sizeof(proc_create_time), "%ld", pools[i].create_time);
719 snprintf(majorversion, sizeof(majorversion), "%d", pools[i].pool_majorversion);
720 snprintf(minorversion, sizeof(minorversion), "%d", pools[i].pool_minorversion);
721 snprintf(pool_counter, sizeof(pool_counter), "%d", pools[i].pool_counter);
722 snprintf(backend_id, sizeof(backend_pid), "%d", pools[i].backend_id);
723 snprintf(backend_pid, sizeof(backend_pid), "%d", pools[i].pool_backendpid);
724 snprintf(connected, sizeof(connected), "%d", pools[i].pool_connected);
725
726 pcp_write(frontend, "p", 1);
727 wsize = htonl(sizeof(code) +
728 strlen(proc_pid) + 1 +
729 strlen(pools[i].database) + 1 +
730 strlen(pools[i].username) + 1 +
731 strlen(proc_start_time) + 1 +
732 strlen(proc_create_time) + 1 +
733 strlen(majorversion) + 1 +
734 strlen(minorversion) + 1 +
735 strlen(pool_counter) + 1 +
736 strlen(backend_id) + 1 +
737 strlen(backend_pid) + 1 +
738 strlen(connected) + 1 +
739 sizeof(int));
740 pcp_write(frontend, &wsize, sizeof(int));
741 pcp_write(frontend, code, sizeof(code));
742 pcp_write(frontend, proc_pid, strlen(proc_pid) + 1);
743 pcp_write(frontend, pools[i].database, strlen(pools[i].database) + 1);
744 pcp_write(frontend, pools[i].username, strlen(pools[i].username) + 1);
745 pcp_write(frontend, proc_start_time, strlen(proc_start_time) + 1);
746 pcp_write(frontend, proc_create_time, strlen(proc_create_time) + 1);
747 pcp_write(frontend, majorversion, strlen(majorversion) + 1);
748 pcp_write(frontend, minorversion, strlen(minorversion) + 1);
749 pcp_write(frontend, pool_counter, strlen(pool_counter) + 1);
750 pcp_write(frontend, backend_id, strlen(backend_id) + 1);
751 pcp_write(frontend, backend_pid, strlen(backend_pid) + 1);
752 pcp_write(frontend, connected, strlen(connected) + 1);
753 do_pcp_flush(frontend);
754 }
755
756 pcp_write(frontend, "p", 1);
757 wsize = htonl(sizeof(fin_code) +
758 sizeof(int));
759 pcp_write(frontend, &wsize, sizeof(int));
760 pcp_write(frontend, fin_code, sizeof(fin_code));
761 do_pcp_flush(frontend);
762 ereport(DEBUG1,
763 (errmsg("PCP informing process info"),
764 errdetail("retrieved process information from shared memory")));
765
766 pfree(pools);
767 }
768 }
769
770 static void
inform_watchdog_info(PCP_CONNECTION * frontend,char * buf)771 inform_watchdog_info(PCP_CONNECTION * frontend, char *buf)
772 {
773 int wd_index;
774 int json_data_len;
775 int wsize;
776 char code[] = "CommandComplete";
777 char *json_data;
778
779 if (!pool_config->use_watchdog)
780 ereport(ERROR,
781 (errmsg("PCP: informing watchdog info failed"),
782 errdetail("watcdhog is not enabled")));
783
784 wd_index = atoi(buf);
785
786 json_data = wd_get_watchdog_nodes(wd_index);
787 if (json_data == NULL)
788 ereport(ERROR,
789 (errmsg("PCP: informing watchdog info failed"),
790 errdetail("invalid watchdog index")));
791
792 ereport(DEBUG2,
793 (errmsg("PCP: informing watchdog info"),
794 errdetail("retrieved node information from IPC socket")));
795
796 /*
797 * This is the voilation of PCP protocol but I think in future we should
798 * shift to more adaptable protocol for data transmition.
799 */
800 json_data_len = strlen(json_data);
801 wsize = htonl(sizeof(code) +
802 json_data_len + 1 +
803 sizeof(int));
804 pcp_write(frontend, "w", 1);
805
806 pcp_write(frontend, &wsize, sizeof(int));
807 pcp_write(frontend, code, sizeof(code));
808
809 pcp_write(frontend, json_data, json_data_len + 1);
810 do_pcp_flush(frontend);
811
812 pfree(json_data);
813 }
814
815 static void
inform_node_info(PCP_CONNECTION * frontend,char * buf)816 inform_node_info(PCP_CONNECTION * frontend, char *buf)
817 {
818 int node_id;
819 int wsize;
820 char port_str[6];
821 char status[2];
822 char weight_str[20];
823 char role_str[10];
824 char standby_delay_str[20];
825 char status_changed_time_str[20];
826 char code[] = "CommandComplete";
827 BackendInfo *bi = NULL;
828 SERVER_ROLE role;
829
830 node_id = atoi(buf);
831
832 bi = pool_get_node_info(node_id);
833
834 if (bi == NULL)
835 ereport(ERROR,
836 (errmsg("informing node info failed"),
837 errdetail("invalid node ID")));
838
839 ereport(DEBUG2,
840 (errmsg("PCP: informing node info"),
841 errdetail("retrieved node information from shared memory")));
842
843 snprintf(port_str, sizeof(port_str), "%d", bi->backend_port);
844 snprintf(status, sizeof(status), "%d", bi->backend_status);
845 snprintf(weight_str, sizeof(weight_str), "%f", bi->backend_weight);
846
847 if (STREAM)
848 {
849 if (Req_info->primary_node_id == node_id)
850 role = ROLE_PRIMARY;
851 else
852 role = ROLE_STANDBY;
853 }
854 else
855 {
856 if (Req_info->master_node_id == node_id)
857 role = ROLE_MASTER;
858 else
859 role = ROLE_SLAVE;
860 }
861 snprintf(role_str, sizeof(role_str), "%d", role);
862
863 snprintf(standby_delay_str, sizeof(standby_delay_str), UINT64_FORMAT, bi->standby_delay);
864
865 snprintf(status_changed_time_str, sizeof(status_changed_time_str), UINT64_FORMAT, bi->status_changed_time);
866
867 pcp_write(frontend, "i", 1);
868 wsize = htonl(sizeof(code) +
869 strlen(bi->backend_hostname) + 1 +
870 strlen(port_str) + 1 +
871 strlen(status) + 1 +
872 strlen(weight_str) + 1 +
873 strlen(role_str) + 1 +
874 strlen(standby_delay_str) + 1 +
875 strlen(status_changed_time_str) + 1 +
876 sizeof(int));
877 pcp_write(frontend, &wsize, sizeof(int));
878 pcp_write(frontend, code, sizeof(code));
879 pcp_write(frontend, bi->backend_hostname, strlen(bi->backend_hostname) + 1);
880 pcp_write(frontend, port_str, strlen(port_str) + 1);
881 pcp_write(frontend, status, strlen(status) + 1);
882 pcp_write(frontend, weight_str, strlen(weight_str) + 1);
883 pcp_write(frontend, role_str, strlen(role_str) + 1);
884 pcp_write(frontend, standby_delay_str, strlen(standby_delay_str) + 1);
885 pcp_write(frontend, status_changed_time_str, strlen(status_changed_time_str) + 1);
886
887 do_pcp_flush(frontend);
888 }
889
890 static void
inform_node_count(PCP_CONNECTION * frontend)891 inform_node_count(PCP_CONNECTION * frontend)
892 {
893 int wsize;
894 char mesg[16];
895 char code[] = "CommandComplete";
896 int node_count = pool_get_node_count();
897
898 snprintf(mesg, sizeof(mesg), "%d", node_count);
899
900 pcp_write(frontend, "l", 1);
901 wsize = htonl(sizeof(code) +
902 strlen(mesg) + 1 +
903 sizeof(int));
904 pcp_write(frontend, &wsize, sizeof(int));
905 pcp_write(frontend, code, sizeof(code));
906 pcp_write(frontend, mesg, strlen(mesg) + 1);
907 do_pcp_flush(frontend);
908
909 ereport(DEBUG1,
910 (errmsg("PCP: informing node count"),
911 errdetail("%d node(s) found", node_count)));
912 }
913
914 static void
process_detach_node(PCP_CONNECTION * frontend,char * buf,char tos)915 process_detach_node(PCP_CONNECTION * frontend, char *buf, char tos)
916 {
917 int node_id;
918 int wsize;
919 char code[] = "CommandComplete";
920 bool gracefully;
921
922 if (tos == 'D')
923 gracefully = false;
924 else
925 gracefully = true;
926
927 node_id = atoi(buf);
928 ereport(DEBUG1,
929 (errmsg("PCP: processing detach node"),
930 errdetail("detaching Node ID %d", node_id)));
931
932 pool_detach_node(node_id, gracefully);
933
934 pcp_write(frontend, "d", 1);
935 wsize = htonl(sizeof(code) + sizeof(int));
936 pcp_write(frontend, &wsize, sizeof(int));
937 pcp_write(frontend, code, sizeof(code));
938 do_pcp_flush(frontend);
939 }
940
941 static void
process_attach_node(PCP_CONNECTION * frontend,char * buf)942 process_attach_node(PCP_CONNECTION * frontend, char *buf)
943 {
944 int node_id;
945 int wsize;
946 char code[] = "CommandComplete";
947
948 node_id = atoi(buf);
949 ereport(DEBUG1,
950 (errmsg("PCP: processing attach node"),
951 errdetail("attaching Node ID %d", node_id)));
952
953 send_failback_request(node_id, true, REQ_DETAIL_CONFIRMED);
954
955 pcp_write(frontend, "c", 1);
956 wsize = htonl(sizeof(code) + sizeof(int));
957 pcp_write(frontend, &wsize, sizeof(int));
958 pcp_write(frontend, code, sizeof(code));
959 do_pcp_flush(frontend);
960 }
961
962
963 static void
process_recovery_request(PCP_CONNECTION * frontend,char * buf)964 process_recovery_request(PCP_CONNECTION * frontend, char *buf)
965 {
966 int wsize;
967 char code[] = "CommandComplete";
968 int node_id = atoi(buf);
969
970 if ((node_id < 0) || (node_id >= pool_config->backend_desc->num_backends))
971 ereport(ERROR,
972 (errmsg("process recovery request failed"),
973 errdetail("node id %d is not valid", node_id)));
974
975 if ((!REPLICATION &&
976 !(MASTER_SLAVE &&
977 pool_config->master_slave_sub_mode == STREAM_MODE)) ||
978 (MASTER_SLAVE &&
979 pool_config->master_slave_sub_mode == STREAM_MODE &&
980 node_id == PRIMARY_NODE_ID))
981 {
982 if (MASTER_SLAVE && pool_config->master_slave_sub_mode == STREAM_MODE)
983 ereport(ERROR,
984 (errmsg("process recovery request failed"),
985 errdetail("primary server cannot be recovered by online recovery.")));
986 else
987 ereport(ERROR,
988 (errmsg("process recovery request failed"),
989 errdetail("recovery request is only allowed in replication and streaming replication modes.")));
990 }
991 else
992 {
993 if (pcp_mark_recovery_in_progress() == false)
994 ereport(FATAL,
995 (errmsg("process recovery request failed"),
996 errdetail("pgpool-II is already processing another recovery request.")));
997
998 ereport(DEBUG1,
999 (errmsg("PCP: processing recovery request"),
1000 errdetail("start online recovery")));
1001
1002 PG_TRY();
1003 {
1004 start_recovery(node_id);
1005 finish_recovery();
1006 pcp_write(frontend, "c", 1);
1007 wsize = htonl(sizeof(code) + sizeof(int));
1008 pcp_write(frontend, &wsize, sizeof(int));
1009 pcp_write(frontend, code, sizeof(code));
1010 do_pcp_flush(frontend);
1011 pcp_mark_recovery_finished();
1012 }
1013 PG_CATCH();
1014 {
1015 finish_recovery();
1016 pcp_mark_recovery_finished();
1017 PG_RE_THROW();
1018
1019 } PG_END_TRY();
1020 }
1021 do_pcp_flush(frontend);
1022 }
1023
1024 static void
process_status_request(PCP_CONNECTION * frontend)1025 process_status_request(PCP_CONNECTION * frontend)
1026 {
1027 int nrows = 0;
1028 int i;
1029 POOL_REPORT_CONFIG *status = get_config(&nrows);
1030 int len = 0;
1031
1032 /* First, send array size of connection_info */
1033 char arr_code[] = "ArraySize";
1034 char code[] = "ProcessConfig";
1035
1036 /* Finally, indicate that all data is sent */
1037 char fin_code[] = "CommandComplete";
1038
1039 pcp_write(frontend, "b", 1);
1040 len = htonl(sizeof(arr_code) + sizeof(int) + sizeof(int));
1041 pcp_write(frontend, &len, sizeof(int));
1042 pcp_write(frontend, arr_code, sizeof(arr_code));
1043 len = htonl(nrows);
1044 pcp_write(frontend, &len, sizeof(int));
1045
1046 do_pcp_flush(frontend);
1047
1048 for (i = 0; i < nrows; i++)
1049 {
1050 pcp_write(frontend, "b", 1);
1051 len = htonl(sizeof(int)
1052 + sizeof(code)
1053 + strlen(status[i].name) + 1
1054 + strlen(status[i].value) + 1
1055 + strlen(status[i].desc) + 1
1056 );
1057
1058 pcp_write(frontend, &len, sizeof(int));
1059 pcp_write(frontend, code, sizeof(code));
1060 pcp_write(frontend, status[i].name, strlen(status[i].name) + 1);
1061 pcp_write(frontend, status[i].value, strlen(status[i].value) + 1);
1062 pcp_write(frontend, status[i].desc, strlen(status[i].desc) + 1);
1063 }
1064
1065 pcp_write(frontend, "b", 1);
1066 len = htonl(sizeof(fin_code) + sizeof(int));
1067 pcp_write(frontend, &len, sizeof(int));
1068 pcp_write(frontend, fin_code, sizeof(fin_code));
1069 do_pcp_flush(frontend);
1070
1071 pfree(status);
1072 ereport(DEBUG1,
1073 (errmsg("PCP: processing status request"),
1074 errdetail("retrieved status information")));
1075 }
1076
1077 static void
process_promote_node(PCP_CONNECTION * frontend,char * buf,char tos)1078 process_promote_node(PCP_CONNECTION * frontend, char *buf, char tos)
1079 {
1080 int node_id;
1081 int wsize;
1082 char code[] = "CommandComplete";
1083 bool gracefully;
1084
1085 if (tos == 'J')
1086 gracefully = false;
1087 else
1088 gracefully = true;
1089
1090 node_id = atoi(buf);
1091 if ((node_id < 0) || (node_id >= pool_config->backend_desc->num_backends))
1092 ereport(ERROR,
1093 (errmsg("could not process recovery request"),
1094 errdetail("node id %d is not valid", node_id)));
1095 /* promoting node is reserved to Streaming Replication */
1096 if (!MASTER_SLAVE || pool_config->master_slave_sub_mode != STREAM_MODE)
1097 {
1098 ereport(FATAL,
1099 (errmsg("invalid pgpool mode for process recovery request"),
1100 errdetail("not in streaming replication mode, can't promote node id %d", node_id)));
1101
1102 }
1103
1104 if (node_id == REAL_PRIMARY_NODE_ID)
1105 {
1106 ereport(FATAL,
1107 (errmsg("invalid pgpool mode for process recovery request"),
1108 errdetail("specified node is already primary node, can't promote node id %d", node_id)));
1109
1110 }
1111 ereport(DEBUG1,
1112 (errmsg("PCP: processing promote node"),
1113 errdetail("promoting Node ID %d", node_id)));
1114 pool_promote_node(node_id, gracefully);
1115
1116 pcp_write(frontend, "d", 1);
1117 wsize = htonl(sizeof(code) + sizeof(int));
1118 pcp_write(frontend, &wsize, sizeof(int));
1119 pcp_write(frontend, code, sizeof(code));
1120 do_pcp_flush(frontend);
1121 }
1122
1123 static void
process_authentication(PCP_CONNECTION * frontend,char * buf,char * salt,int * random_salt)1124 process_authentication(PCP_CONNECTION * frontend, char *buf, char *salt, int *random_salt)
1125 {
1126 int wsize;
1127 int authenticated;
1128
1129 if (*random_salt)
1130 {
1131 authenticated = user_authenticate(buf, pcp_conf_file, salt, 4);
1132 }
1133 if (!*random_salt || !authenticated)
1134 {
1135 ereport(FATAL,
1136 (errmsg("authentication failed"),
1137 errdetail("username and/or password does not match")));
1138
1139 *random_salt = 0;
1140 }
1141 else
1142 {
1143 char code[] = "AuthenticationOK";
1144
1145 pcp_write(frontend, "r", 1);
1146 wsize = htonl(sizeof(code) + sizeof(int));
1147 pcp_write(frontend, &wsize, sizeof(int));
1148 pcp_write(frontend, code, sizeof(code));
1149 do_pcp_flush(frontend);
1150 *random_salt = 0;
1151
1152 ereport(DEBUG1,
1153 (errmsg("PCP: processing authentication request"),
1154 errdetail("authentication OK")));
1155 }
1156 }
1157
1158 static void
send_md5salt(PCP_CONNECTION * frontend,char * salt)1159 send_md5salt(PCP_CONNECTION * frontend, char *salt)
1160 {
1161 int wsize;
1162
1163 ereport(DEBUG1,
1164 (errmsg("PCP: sending md5 salt to client")));
1165
1166 pool_random_salt(salt);
1167
1168 pcp_write(frontend, "m", 1);
1169 wsize = htonl(sizeof(int) + 4);
1170 pcp_write(frontend, &wsize, sizeof(int));
1171 pcp_write(frontend, salt, 4);
1172 do_pcp_flush(frontend);
1173 }
1174
1175 static void
process_shutown_request(PCP_CONNECTION * frontend,char mode)1176 process_shutown_request(PCP_CONNECTION * frontend, char mode)
1177 {
1178 char code[] = "CommandComplete";
1179 pid_t ppid = getppid();
1180 int sig,
1181 len;
1182
1183 if (mode == 's')
1184 {
1185 ereport(DEBUG1,
1186 (errmsg("PCP: processing shutdown request"),
1187 errdetail("sending SIGTERM to the parent process with PID:%d", ppid)));
1188 sig = SIGTERM;
1189 }
1190 else if (mode == 'f')
1191 {
1192 ereport(DEBUG1,
1193 (errmsg("PCP: processing shutdown request"),
1194 errdetail("sending SIGINT to the parent process with PID:%d", ppid)));
1195 sig = SIGINT;
1196 }
1197 else if (mode == 'i')
1198 {
1199 ereport(DEBUG1,
1200 (errmsg("PCP: processing shutdown request"),
1201 errdetail("sending SIGQUIT to the parent process with PID:%d", ppid)));
1202 sig = SIGQUIT;
1203 }
1204 else
1205 {
1206 ereport(ERROR,
1207 (errmsg("PCP: error while processing shutdown request"),
1208 errdetail("invalid shutdown mode \"%c\"", mode)));
1209 }
1210
1211 pcp_write(frontend, "t", 1);
1212 len = htonl(sizeof(code) + sizeof(int));
1213 pcp_write(frontend, &len, sizeof(int));
1214 pcp_write(frontend, code, sizeof(code));
1215 do_pcp_flush(frontend);
1216
1217 pool_signal_parent(sig);
1218 }
1219
1220 static void
process_set_configration_parameter(PCP_CONNECTION * frontend,char * buf,int len)1221 process_set_configration_parameter(PCP_CONNECTION * frontend, char *buf, int len)
1222 {
1223 char *param_name;
1224 char *param_value;
1225 int wsize;
1226 char code[] = "CommandComplete";
1227
1228 param_name = buf;
1229 if (param_name == NULL)
1230 ereport(ERROR,
1231 (errmsg("PCP: set configuration parameter failed"),
1232 errdetail("invalid pcp packet received from client")));
1233
1234 param_value = (char *) memchr(buf, '\0', len);
1235 if (param_value == NULL)
1236 ereport(ERROR,
1237 (errmsg("set configuration parameter failed"),
1238 errdetail("invalid pcp packet received from client")));
1239
1240 param_value += 1;
1241 ereport(LOG,
1242 (errmsg("set configuration parameter, \"%s TO %s\"", param_name, param_value)));
1243
1244 if (strcasecmp(param_name, "client_min_messages") == 0)
1245 {
1246 const char *ordered_valid_values[] = {"debug5", "debug4", "debug3", "debug2", "debug1", "log", "commerror", "info", "notice", "warning", "error", NULL};
1247 bool found = false;
1248 int i;
1249
1250 for (i = 0;; i++)
1251 {
1252 char *valid_val = (char *) ordered_valid_values[i];
1253
1254 if (!valid_val)
1255 break;
1256
1257 if (!strcasecmp(param_value, valid_val))
1258 {
1259 found = true;
1260 pool_config->client_min_messages = i + 10;
1261 ereport(DEBUG1,
1262 (errmsg("PCP setting parameter \"%s\" to \"%s\"", param_name, param_value)));
1263 break;
1264 }
1265 }
1266 if (!found)
1267 ereport(ERROR,
1268 (errmsg("PCP: set configuration parameter failed"),
1269 errdetail("invalid value \"%s\" for parameter \"%s\"", param_value, param_name)));
1270 }
1271 else
1272 ereport(ERROR,
1273 (errmsg("PCP: set configuration parameter failed"),
1274 errdetail("invalid parameter \"%s\"", param_name)));
1275
1276 pcp_write(frontend, "a", 1);
1277 wsize = htonl(sizeof(code) + sizeof(int));
1278 pcp_write(frontend, &wsize, sizeof(int));
1279 pcp_write(frontend, code, sizeof(code));
1280 do_pcp_flush(frontend);
1281 }
1282
1283 /*
1284 * Wrapper around pcp_flush which throws FATAL error when pcp_flush fails
1285 */
1286 static void
do_pcp_flush(PCP_CONNECTION * frontend)1287 do_pcp_flush(PCP_CONNECTION * frontend)
1288 {
1289 if (pcp_flush(frontend) < 0)
1290 ereport(FATAL,
1291 (errmsg("failed to flush data to client"),
1292 errdetail("pcp_flush failed with error : \"%m\"")));
1293 }
1294
1295 /*
1296 * Wrapper around pcp_read which throws FATAL error when read fails
1297 */
1298 static void
do_pcp_read(PCP_CONNECTION * pc,void * buf,int len)1299 do_pcp_read(PCP_CONNECTION * pc, void *buf, int len)
1300 {
1301 if (pcp_read(pc, buf, len))
1302 ereport(FATAL,
1303 (errmsg("unable to read from client"),
1304 errdetail("pcp_read failed with error : \"%m\"")));
1305 }
1306
1307 int
send_to_pcp_frontend(char * data,int len,bool flush)1308 send_to_pcp_frontend(char *data, int len, bool flush)
1309 {
1310 int ret;
1311
1312 if (processType != PT_PCP_WORKER || pcp_frontend == NULL)
1313 return -1;
1314 ret = pcp_write(pcp_frontend, data, len);
1315 if (flush && !ret)
1316 ret = pcp_flush(pcp_frontend);
1317 return ret;
1318 }
1319
1320 int
pcp_frontend_exists(void)1321 pcp_frontend_exists(void)
1322 {
1323 if (processType != PT_PCP_WORKER || pcp_frontend == NULL)
1324 return -1;
1325 return 0;
1326 }
1327
1328 static void
pcp_worker_will_go_down(int code,Datum arg)1329 pcp_worker_will_go_down(int code, Datum arg)
1330 {
1331 if (processType != PT_PCP_WORKER)
1332 {
1333 /* should never happen */
1334 ereport(WARNING,
1335 (errmsg("pcp_worker_will_go_down called from invalid process")));
1336 return;
1337 }
1338 if (pcp_frontend)
1339 pcp_close(pcp_frontend);
1340 processState = EXITING;
1341 POOL_SETMASK(&UnBlockSig);
1342
1343 }
1344