1 /* -*-pcp_worker.c-*- */
2 /*
3 *
4 * pgpool: a language independent connection pool server for PostgreSQL
5 * written by Tatsuo Ishii
6 *
7 * Copyright (c) 2003-2019 PgPool Global Development Group
8 *
9 * Permission to use, copy, modify, and distribute this software and
10 * its documentation for any purpose and without fee is hereby
11 * granted, provided that the above copyright notice appear in all
12 * copies and that both that copyright notice and this permission
13 * notice appear in supporting documentation, and that the name of the
14 * author not be used in advertising or publicity pertaining to
15 * distribution of the software without specific, written prior
16 * permission. The author makes no representations about the
17 * suitability of this software for any purpose. It is provided "as
18 * is" without express or implied warranty.
19 *
20 * pcp_worker.c: PCP worker child process main
21 *
22 */
23
24 #include "config.h"
25 #include "pool.h"
26 #include "utils/palloc.h"
27 #include "utils/memutils.h"
28
29 #include <arpa/inet.h>
30 #include <signal.h>
31
32 #include <stdio.h>
33 #include <errno.h>
34 #include <string.h>
35 #include <unistd.h>
36 #include <stdlib.h>
37 #include <sys/time.h>
38
39 #ifdef HAVE_FCNTL_H
40 #include <fcntl.h>
41 #endif
42
43 #include "pcp/pcp_stream.h"
44 #include "pcp/pcp.h"
45 #include "auth/md5.h"
46 #include "pool_config.h"
47 #include "context/pool_process_context.h"
48 #include "utils/pool_process_reporting.h"
49 #include "watchdog/wd_json_data.h"
50 #include "watchdog/wd_ipc_commands.h"
51 #include "utils/elog.h"
52
53 #define MAX_FILE_LINE_LEN 512
54
55 extern char *pcp_conf_file; /* global variable defined in main.c holds the path for pcp.conf */
56 volatile sig_atomic_t pcp_worker_wakeup_request = 0;
57 PCP_CONNECTION* volatile pcp_frontend = NULL;
58
59 static RETSIGTYPE die(int sig);
60 static RETSIGTYPE wakeup_handler_child(int sig);
61
62 static void unset_nonblock(int fd);
63 static int user_authenticate(char *buf, char *passwd_file, char *salt, int salt_len);
64 static void process_authentication(PCP_CONNECTION *frontend, char *buf, char* salt, int *random_salt);
65 static void send_md5salt(PCP_CONNECTION *frontend, char* salt);
66
67 static void pcp_process_command(char tos, char *buf, int buf_len);
68
69 static int pool_detach_node(int node_id, bool gracefully);
70 static int pool_promote_node(int node_id, bool gracefully);
71 static void inform_process_count(PCP_CONNECTION *frontend);
72 static void inform_process_info(PCP_CONNECTION *frontend, char *buf);
73 static void inform_watchdog_info(PCP_CONNECTION *frontend, char *buf);
74 static void inform_node_info(PCP_CONNECTION *frontend, char *buf);
75 static void inform_node_count(PCP_CONNECTION *frontend);
76 static void process_detach_node(PCP_CONNECTION *frontend,char *buf,char tos);
77 static void process_attach_node(PCP_CONNECTION *frontend,char *buf);
78 static void process_recovery_request(PCP_CONNECTION *frontend,char *buf);
79 static void process_status_request(PCP_CONNECTION *frontend);
80 static void process_promote_node(PCP_CONNECTION *frontend,char *buf, char tos);
81 static void process_shutown_request(PCP_CONNECTION *frontend,char mode);
82 static void process_set_configration_parameter(PCP_CONNECTION *frontend,char *buf, int len);
83
84 static void pcp_worker_will_go_down(int code, Datum arg);
85
86 static void do_pcp_flush(PCP_CONNECTION *frontend);
87 static void do_pcp_read(PCP_CONNECTION *pc, void *buf, int len);
88
89 /*
90 * main entry pont of pcp worker child process
91 */
92 void
pcp_worker_main(int port)93 pcp_worker_main(int port)
94 {
95 sigjmp_buf local_sigjmp_buf;
96 MemoryContext PCPMemoryContext;
97 volatile int authenticated = 0;
98
99 char salt[4];
100 int random_salt = 0;
101 struct timeval uptime;
102 char tos;
103 int rsize;
104
105 ereport(DEBUG1,
106 (errmsg("I am PCP worker child with pid:%d",getpid())));
107
108 /* Identify myself via ps */
109 init_ps_display("", "", "", "");
110
111 gettimeofday(&uptime, NULL);
112 srandom((unsigned int) (getpid() ^ uptime.tv_usec));
113
114 /* set up signal handlers */
115 signal(SIGTERM, die);
116 signal(SIGINT, die);
117 signal(SIGQUIT, die);
118 signal(SIGCHLD, SIG_DFL);
119 signal(SIGUSR2, wakeup_handler_child);
120 signal(SIGUSR1, SIG_IGN);
121 signal(SIGHUP, SIG_IGN);
122 signal(SIGPIPE, SIG_IGN);
123 signal(SIGALRM, SIG_IGN);
124 /* Create per loop iteration memory context */
125 PCPMemoryContext = AllocSetContextCreate(TopMemoryContext,
126 "PCP_worker_main_loop",
127 ALLOCSET_DEFAULT_MINSIZE,
128 ALLOCSET_DEFAULT_INITSIZE,
129 ALLOCSET_DEFAULT_MAXSIZE);
130
131 MemoryContextSwitchTo(TopMemoryContext);
132
133 /*
134 * install the call back for preparation of pcp worker child exit
135 */
136 on_system_exit(pcp_worker_will_go_down, (Datum)NULL);
137
138 /* Initialize my backend status */
139 pool_initialize_private_backend_status();
140
141 /* Initialize process context */
142 pool_init_process_context();
143
144 pcp_frontend = pcp_open(port);
145 unset_nonblock(pcp_frontend->fd);
146
147 if (sigsetjmp(local_sigjmp_buf, 1) != 0)
148 {
149 error_context_stack = NULL;
150 EmitErrorReport();
151
152 MemoryContextSwitchTo(TopMemoryContext);
153 FlushErrorState();
154 }
155 /* We can now handle ereport(ERROR) */
156 PG_exception_stack = &local_sigjmp_buf;
157
158 for(;;)
159 {
160 char *buf = NULL;
161
162 MemoryContextSwitchTo(PCPMemoryContext);
163 MemoryContextResetAndDeleteChildren(PCPMemoryContext);
164
165 errno = 0;
166
167 /* read a PCP packet */
168 do_pcp_read(pcp_frontend, &tos, 1);
169 do_pcp_read(pcp_frontend, &rsize, sizeof(int));
170
171 rsize = ntohl(rsize);
172
173 if (rsize <= 0 || rsize >= MAX_PCP_PACKET_LENGTH)
174 ereport(FATAL,
175 (errmsg("invalid PCP packet"),
176 errdetail("incorrect packet length (%d)", rsize)));
177
178 if ((rsize - sizeof(int)) > 0)
179 {
180 buf = (char *)palloc(rsize - sizeof(int));
181 do_pcp_read(pcp_frontend, buf, rsize - sizeof(int));
182 }
183
184 ereport(DEBUG1,
185 (errmsg("received PCP packet"),
186 errdetail("PCP packet type of service '%c'", tos)));
187
188 if (tos == 'R') /* authentication */
189 {
190 set_ps_display("PCP: processing authentication", false);
191 process_authentication(pcp_frontend, buf,salt, &random_salt);
192 authenticated = 1;
193 continue;
194 }
195 if (tos == 'M') /* md5 salt */
196 {
197 set_ps_display("PCP: processing authentication", false);
198 send_md5salt(pcp_frontend, salt);
199 random_salt = 1;
200 continue;
201 }
202 /* is this connection authenticated? if not disconnect immediately*/
203 if (!authenticated)
204 ereport(FATAL,
205 (errmsg("authentication failed for new PCP connection"),
206 errdetail("connection not authorized")));
207
208 /* process a request */
209 pcp_process_command(tos, buf, rsize);
210 }
211 exit(0);
212 }
213
214
215 /* pcp command processor */
216 static void
pcp_process_command(char tos,char * buf,int buf_len)217 pcp_process_command(char tos, char *buf, int buf_len)
218 {
219
220 if (tos == 'O' || tos == 'T')
221 {
222 if (Req_info->switching)
223 {
224 if(Req_info->request_queue_tail != Req_info->request_queue_head)
225 {
226 POOL_REQUEST_KIND reqkind;
227 reqkind = Req_info->request[(Req_info->request_queue_head +1) % MAX_REQUEST_QUEUE_SIZE].kind;
228
229 if (reqkind == NODE_UP_REQUEST)
230 ereport(ERROR,
231 (errmsg("failed to process PCP request at the moment"),
232 errdetail("failback is in progress")));
233 else if (reqkind == NODE_DOWN_REQUEST)
234 ereport(ERROR,
235 (errmsg("failed to process PCP request at the moment"),
236 errdetail("failover is in progress")));
237 else if (reqkind == PROMOTE_NODE_REQUEST)
238 ereport(ERROR,
239 (errmsg("failed to process PCP request at the moment"),
240 errdetail("promote node operation is in progress")));
241
242 ereport(ERROR,
243 (errmsg("failed to process PCP request at the moment"),
244 errdetail("operation is in progress")));
245 }
246 }
247 }
248
249 switch (tos)
250 {
251 case 'A': /* set configuration parameter */
252 set_ps_display("PCP: processing set configration parameter request", false);
253 process_set_configration_parameter(pcp_frontend,buf,buf_len);
254 break;
255
256 case 'L': /* node count */
257 set_ps_display("PCP: processing node count request", false);
258 inform_node_count(pcp_frontend);
259 break;
260
261 case 'I': /* node info */
262 set_ps_display("PCP: processing node info request", false);
263 inform_node_info(pcp_frontend, buf);
264 break;
265
266 case 'N': /* process count */
267 set_ps_display("PCP: processing process count request", false);
268 inform_process_count(pcp_frontend);
269 break;
270
271 case 'P': /* process info */
272 set_ps_display("PCP: processing process info request", false);
273 inform_process_info(pcp_frontend, buf);
274 break;
275
276 case 'W': /* watchdog info */
277 set_ps_display("PCP: processing watchdog info request", false);
278 inform_watchdog_info(pcp_frontend, buf);
279 break;
280
281 case 'D': /* detach node */
282 case 'd': /* detach node gracefully */
283 set_ps_display("PCP: processing detach node request", false);
284 process_detach_node(pcp_frontend, buf, tos);
285 break;
286
287 case 'C': /* attach node */
288 set_ps_display("PCP: processing attach node request", false);
289 process_attach_node(pcp_frontend, buf);
290 break;
291
292 case 'T':
293 set_ps_display("PCP: processing shutdown request", false);
294 process_shutown_request(pcp_frontend, buf[0]);
295 break;
296
297 case 'O': /* recovery request */
298 set_ps_display("PCP: processing recovery request", false);
299 process_recovery_request(pcp_frontend, buf);
300 break;
301
302 case 'B': /* status request*/
303 set_ps_display("PCP: processing status request request", false);
304 process_status_request(pcp_frontend);
305 break;
306
307 case 'J': /* promote node */
308 case 'j': /* promote node gracefully */
309 set_ps_display("PCP: processing promote node request", false);
310 process_promote_node(pcp_frontend,buf,tos);
311 break;
312
313 case 'F':
314 ereport(DEBUG1,
315 (errmsg("PCP processing request, stop online recovery")));
316 break;
317
318 case 'X': /* disconnect */
319 ereport(DEBUG1,
320 (errmsg("PCP processing request, client disconnecting"),
321 errdetail("closing PCP connection, and exiting child")));
322 pcp_close(pcp_frontend);
323 pcp_frontend = NULL;
324 /* This child has done its part. Rest in peace now */
325 exit(0);
326 break;
327
328 default:
329 ereport(FATAL,
330 (errmsg("PCP processing request"),
331 errdetail("unknown PCP packet type \"%c\"",tos)));
332 }
333 }
334
335 static RETSIGTYPE
die(int sig)336 die(int sig)
337 {
338 ereport(DEBUG1,
339 (errmsg("PCP worker child receives shutdown request signal %d", sig)));
340 if(sig == SIGTERM)
341 {
342 ereport(DEBUG1,
343 (errmsg("PCP worker child receives smart shutdown request."),
344 errdetail("waiting for the child to die its natural death")));
345 }
346 else if (sig == SIGINT)
347 {
348 ereport(DEBUG1,
349 (errmsg("PCP worker child receives fast shutdown request.")));
350 exit(0);
351 }
352 else if (sig == SIGQUIT)
353 {
354 ereport(DEBUG1,
355 (errmsg("PCP worker child receives immediate shutdown request.")));
356 exit(0);
357 }
358 else
359 exit(1);
360 }
361
362
363 static RETSIGTYPE
wakeup_handler_child(int sig)364 wakeup_handler_child(int sig)
365 {
366 pcp_worker_wakeup_request = 1;
367 }
368
369 /*
370 * unset non-block flag
371 */
372 static void
unset_nonblock(int fd)373 unset_nonblock(int fd)
374 {
375 int var;
376
377 /* set fd to non-blocking */
378 var = fcntl(fd, F_GETFL, 0);
379 if (var == -1)
380 {
381 ereport(FATAL,
382 (errmsg("unable to connect"),
383 errdetail("fcntl system call failed with error : \"%s\"",strerror(errno))));
384
385 }
386 if (fcntl(fd, F_SETFL, var & ~O_NONBLOCK) == -1)
387 {
388 ereport(FATAL,
389 (errmsg("unable to connect"),
390 errdetail("fcntl system call failed with error : \"%s\"",strerror(errno))));
391 }
392 }
393
394 /*
395 * see if received username and password matches with one in the file
396 */
397 static int
user_authenticate(char * buf,char * passwd_file,char * salt,int salt_len)398 user_authenticate(char *buf, char *passwd_file, char *salt, int salt_len)
399 {
400 FILE *fp = NULL;
401 char packet_username[MAX_USER_PASSWD_LEN+1];
402 char packet_password[MAX_USER_PASSWD_LEN+1];
403 char encrypt_buf[(MD5_PASSWD_LEN+1)*2];
404 char file_username[MAX_USER_PASSWD_LEN+1];
405 char file_password[MAX_USER_PASSWD_LEN+1];
406 char *index = NULL;
407 static char line[MAX_FILE_LINE_LEN+1];
408 int i, len;
409
410 /* strcpy() should be OK, but use strncpy() to be extra careful */
411 strncpy(packet_username, buf, MAX_USER_PASSWD_LEN);
412 index = (char *) memchr(buf, '\0', MAX_USER_PASSWD_LEN);
413 if (index == NULL)
414 {
415 ereport(FATAL,
416 (errmsg("failed to authenticate PCP user"),
417 errdetail("error while reading authentication packet")));
418 return 0;
419 }
420 strncpy(packet_password, ++index, MAX_USER_PASSWD_LEN);
421
422 fp = fopen(passwd_file, "r");
423 if (fp == NULL)
424 {
425 ereport(FATAL,
426 (errmsg("failed to authenticate PCP user"),
427 errdetail("could not open %s. reason: %s", passwd_file, strerror(errno))));
428 return 0;
429 }
430
431 /* for now, I don't care if duplicate username exists in the config file */
432 while ((fgets(line, MAX_FILE_LINE_LEN, fp)) != NULL)
433 {
434 i = 0;
435 len = 0;
436
437 if (line[0] == '\n' || line[0] == '#')
438 continue;
439
440 while (line[i] != ':')
441 {
442 len++;
443 if (++i > MAX_USER_PASSWD_LEN)
444 {
445 fclose(fp);
446 ereport(FATAL,
447 (errmsg("failed to authenticate PCP user"),
448 errdetail("username read from file \"%s\" is larger than maximum allowed username length [%d]", passwd_file, MAX_USER_PASSWD_LEN)));
449 return 0;
450 }
451 }
452 memcpy(file_username, line, len);
453 file_username[len] = '\0';
454
455 if (strcmp(packet_username, file_username) != 0)
456 continue;
457
458 i++;
459 len = 0;
460 while (line[i] != '\n' && line[i] != '\0')
461 {
462 len++;
463 if (++i > MAX_USER_PASSWD_LEN)
464 {
465 fclose(fp);
466 ereport(FATAL,
467 (errmsg("failed to authenticate PCP user"),
468 errdetail("password read from file \"%s\" is larger than maximum allowed password length [%d]", passwd_file, MAX_USER_PASSWD_LEN)));
469 return 0;
470 }
471 }
472
473 memcpy(file_password, line+strlen(file_username)+1, len);
474 file_password[len] = '\0';
475
476 pool_md5_encrypt(file_password, file_username, strlen(file_username),
477 encrypt_buf + MD5_PASSWD_LEN + 1);
478 encrypt_buf[(MD5_PASSWD_LEN+1)*2-1] = '\0';
479
480 pool_md5_encrypt(encrypt_buf+MD5_PASSWD_LEN+1, salt, salt_len,
481 encrypt_buf);
482 encrypt_buf[MD5_PASSWD_LEN] = '\0';
483
484 if (strcmp(encrypt_buf, packet_password) == 0)
485 {
486 fclose(fp);
487 return 1;
488 }
489 }
490 fclose(fp);
491 ereport(FATAL,
492 (errmsg("authentication failed for user \"%s\"",packet_username),
493 errdetail("username and/or password does not match")));
494
495 return 0;
496 }
497
498
499 /* Detach a node */
pool_detach_node(int node_id,bool gracefully)500 static int pool_detach_node(int node_id, bool gracefully)
501 {
502 if (!gracefully)
503 {
504 degenerate_backend_set_ex(&node_id, 1, REQ_DETAIL_SWITCHOVER | REQ_DETAIL_CONFIRMED, true, false);
505 return 0;
506 }
507
508 /* Check if the NODE DOWN can be executed on
509 * the given node id.
510 */
511 degenerate_backend_set_ex(&node_id, 1, REQ_DETAIL_SWITCHOVER | REQ_DETAIL_CONFIRMED, true, true);
512
513 /*
514 * Wait until all frontends exit
515 */
516 *InRecovery = RECOVERY_DETACH; /* This wiil ensure that new incoming
517 * connection requests are blocked */
518
519 if (wait_connection_closed())
520 {
521 /* wait timed out */
522 finish_recovery();
523 return -1;
524 }
525
526 pcp_worker_wakeup_request = 0;
527
528 /*
529 * Now all frontends have gone. Let's do failover.
530 */
531 degenerate_backend_set_ex(&node_id, 1, REQ_DETAIL_SWITCHOVER | REQ_DETAIL_CONFIRMED, false, false);
532
533 /*
534 * Wait for failover completed.
535 */
536
537 while (!pcp_worker_wakeup_request)
538 {
539 struct timeval t = {1, 0};
540 select(0, NULL, NULL, NULL, &t);
541 }
542 pcp_worker_wakeup_request = 0;
543
544 /*
545 * Start to accept incoming connections and send SIGUSR2 to pgpool
546 * parent to distribute SIGUSR2 all pgpool children.
547 */
548 finish_recovery();
549
550 return 0;
551 }
552
553 /* Promote a node */
pool_promote_node(int node_id,bool gracefully)554 static int pool_promote_node(int node_id, bool gracefully)
555 {
556 if (!gracefully)
557 {
558 promote_backend(node_id, REQ_DETAIL_CONFIRMED); /* send promote request */
559 return 0;
560 }
561
562 /*
563 * Wait until all frontends exit
564 */
565 *InRecovery = RECOVERY_PROMOTE; /* This wiil ensure that new incoming
566 * connection requests are blocked */
567
568 if (wait_connection_closed())
569 {
570 /* wait timed out */
571 finish_recovery();
572 return -1;
573 }
574
575 /*
576 * Now all frontends have gone. Let's do failover.
577 */
578 promote_backend(node_id, REQ_DETAIL_CONFIRMED); /* send promote request */
579
580 /*
581 * Wait for failover completed.
582 */
583 pcp_worker_wakeup_request = 0;
584
585 while (!pcp_worker_wakeup_request)
586 {
587 struct timeval t = {1, 0};
588 select(0, NULL, NULL, NULL, &t);
589 }
590 pcp_worker_wakeup_request = 0;
591
592 /*
593 * Start to accept incoming connections and send SIGUSR2 to pgpool
594 * parent to distribute SIGUSR2 all pgpool children.
595 */
596 finish_recovery();
597 return 0;
598 }
599
600 static void
inform_process_count(PCP_CONNECTION * frontend)601 inform_process_count(PCP_CONNECTION *frontend)
602 {
603 int wsize;
604 int process_count;
605 char process_count_str[16];
606 int *process_list = NULL;
607 char code[] = "CommandComplete";
608 char *mesg = NULL;
609 int i;
610 int total_port_len = 0;
611
612 process_list = pool_get_process_list(&process_count);
613
614 mesg = (char *)palloc(7*process_count); /* PID is at most 6 characters long */
615
616 snprintf(process_count_str, sizeof(process_count_str), "%d", process_count);
617
618 for (i = 0; i < process_count; i++)
619 {
620 char process_id[7];
621 snprintf(process_id, sizeof(process_id), "%d", process_list[i]);
622 snprintf(mesg+total_port_len, strlen(process_id)+1, "%s", process_id);
623 total_port_len += strlen(process_id)+1;
624 }
625
626 pcp_write(frontend, "n", 1);
627 wsize = htonl(sizeof(code) +
628 strlen(process_count_str)+1 +
629 total_port_len +
630 sizeof(int));
631 pcp_write(frontend, &wsize, sizeof(int));
632 pcp_write(frontend, code, sizeof(code));
633 pcp_write(frontend, process_count_str, strlen(process_count_str)+1);
634 pcp_write(frontend, mesg, total_port_len);
635 do_pcp_flush(frontend);
636
637 pfree(process_list);
638 pfree(mesg);
639
640 ereport(DEBUG1,
641 (errmsg("PCP: informing process count"),
642 errdetail("%d process(es) found", process_count)));
643 }
644
645 static void
inform_process_info(PCP_CONNECTION * frontend,char * buf)646 inform_process_info(PCP_CONNECTION *frontend,char *buf)
647 {
648 int proc_id;
649 int wsize;
650 int num_proc = pool_config->num_init_children;
651 int i;
652
653 proc_id = atoi(buf);
654
655 if ((proc_id != 0) && (pool_get_process_info(proc_id) == NULL))
656 {
657 ereport(ERROR,
658 (errmsg("informing process info failed"),
659 errdetail("invalid process ID : %s",buf)));
660 }
661 else
662 {
663 /* First, send array size of connection_info */
664 char arr_code[] = "ArraySize";
665 char con_info_size[16];
666 /* Finally, indicate that all data is sent */
667 char fin_code[] = "CommandComplete";
668
669 POOL_REPORT_POOLS *pools = get_pools(&num_proc);
670
671 if (proc_id == 0)
672 {
673 snprintf(con_info_size, sizeof(con_info_size), "%d", num_proc);
674 }
675 else
676 {
677 snprintf(con_info_size, sizeof(con_info_size), "%d", pool_config->max_pool * NUM_BACKENDS);
678 }
679
680 pcp_write(frontend, "p", 1);
681 wsize = htonl(sizeof(arr_code) +
682 strlen(con_info_size)+1 +
683 sizeof(int));
684 pcp_write(frontend, &wsize, sizeof(int));
685 pcp_write(frontend, arr_code, sizeof(arr_code));
686 pcp_write(frontend, con_info_size, strlen(con_info_size)+1);
687 do_pcp_flush(frontend);
688
689 /* Second, send process information for all connection_info */
690 for (i=0; i<num_proc; i++)
691 {
692 char code[] = "ProcessInfo";
693 char proc_pid[16];
694 char proc_start_time[20];
695 char proc_create_time[20];
696 char majorversion[5];
697 char minorversion[5];
698 char pool_counter[16];
699 char backend_id[16];
700 char backend_pid[16];
701 char connected[2];
702
703 if (proc_id != 0 && proc_id != pools[i].pool_pid) continue;
704
705 snprintf(proc_pid, sizeof(proc_pid), "%d", pools[i].pool_pid);
706 snprintf(proc_start_time, sizeof(proc_start_time), "%ld", pools[i].start_time);
707 snprintf(proc_create_time, sizeof(proc_create_time), "%ld", pools[i].create_time);
708 snprintf(majorversion, sizeof(majorversion), "%d", pools[i].pool_majorversion);
709 snprintf(minorversion, sizeof(minorversion), "%d", pools[i].pool_minorversion);
710 snprintf(pool_counter, sizeof(pool_counter), "%d", pools[i].pool_counter);
711 snprintf(backend_id, sizeof(backend_pid), "%d", pools[i].backend_id);
712 snprintf(backend_pid, sizeof(backend_pid), "%d", pools[i].pool_backendpid);
713 snprintf(connected, sizeof(connected), "%d", pools[i].pool_connected);
714
715 pcp_write(frontend, "p", 1);
716 wsize = htonl( sizeof(code) +
717 strlen(proc_pid)+1 +
718 strlen(pools[i].database)+1 +
719 strlen(pools[i].username)+1 +
720 strlen(proc_start_time)+1 +
721 strlen(proc_create_time)+1 +
722 strlen(majorversion)+1 +
723 strlen(minorversion)+1 +
724 strlen(pool_counter)+1 +
725 strlen(backend_id)+1 +
726 strlen(backend_pid)+1 +
727 strlen(connected)+1 +
728 sizeof(int));
729 pcp_write(frontend, &wsize, sizeof(int));
730 pcp_write(frontend, code, sizeof(code));
731 pcp_write(frontend, proc_pid, strlen(proc_pid)+1);
732 pcp_write(frontend, pools[i].database, strlen(pools[i].database)+1);
733 pcp_write(frontend, pools[i].username, strlen(pools[i].username)+1);
734 pcp_write(frontend, proc_start_time, strlen(proc_start_time)+1);
735 pcp_write(frontend, proc_create_time, strlen(proc_create_time)+1);
736 pcp_write(frontend, majorversion, strlen(majorversion)+1);
737 pcp_write(frontend, minorversion, strlen(minorversion)+1);
738 pcp_write(frontend, pool_counter, strlen(pool_counter)+1);
739 pcp_write(frontend, backend_id, strlen(backend_id)+1);
740 pcp_write(frontend, backend_pid, strlen(backend_pid)+1);
741 pcp_write(frontend, connected, strlen(connected)+1);
742 do_pcp_flush(frontend);
743 }
744
745 pcp_write(frontend, "p", 1);
746 wsize = htonl(sizeof(fin_code) +
747 sizeof(int));
748 pcp_write(frontend, &wsize, sizeof(int));
749 pcp_write(frontend, fin_code, sizeof(fin_code));
750 do_pcp_flush(frontend);
751 ereport(DEBUG1,
752 (errmsg("PCP informing process info"),
753 errdetail("retrieved process information from shared memory")));
754
755 pfree(pools);
756 }
757 }
758
759 static void
inform_watchdog_info(PCP_CONNECTION * frontend,char * buf)760 inform_watchdog_info(PCP_CONNECTION *frontend,char *buf)
761 {
762 int wd_index;
763 int json_data_len;
764 int wsize;
765 char code[] = "CommandComplete";
766 char* json_data;
767
768 if (!pool_config->use_watchdog)
769 ereport(ERROR,
770 (errmsg("PCP: informing watchdog info failed"),
771 errdetail("watcdhog is not enabled")));
772
773 wd_index = atoi(buf);
774
775 json_data = wd_get_watchdog_nodes(wd_index);
776 if (json_data == NULL)
777 ereport(ERROR,
778 (errmsg("PCP: informing watchdog info failed"),
779 errdetail("invalid watchdog index")));
780
781 ereport(DEBUG2,
782 (errmsg("PCP: informing watchdog info"),
783 errdetail("retrieved node information from IPC socket")));
784
785 /*
786 * This is the voilation of PCP protocol but I think
787 * in future we should shift to more adaptable protocol for
788 * data transmition.
789 */
790 json_data_len = strlen(json_data);
791 wsize = htonl(sizeof(code) +
792 json_data_len+ 1 +
793 sizeof(int));
794 pcp_write(frontend, "w", 1);
795
796 pcp_write(frontend, &wsize, sizeof(int));
797 pcp_write(frontend, code, sizeof(code));
798
799 pcp_write(frontend, json_data, json_data_len +1);
800 do_pcp_flush(frontend);
801
802 pfree(json_data);
803 }
804
805 static void
inform_node_info(PCP_CONNECTION * frontend,char * buf)806 inform_node_info(PCP_CONNECTION *frontend,char *buf)
807 {
808 int node_id;
809 int wsize;
810 char port_str[6];
811 char status[2];
812 char weight_str[20];
813 char role_str[10];
814 char code[] = "CommandComplete";
815 BackendInfo *bi = NULL;
816 SERVER_ROLE role;
817
818 node_id = atoi(buf);
819
820 bi = pool_get_node_info(node_id);
821
822 if (bi == NULL)
823 ereport(ERROR,
824 (errmsg("informing node info failed"),
825 errdetail("invalid node ID")));
826
827 ereport(DEBUG2,
828 (errmsg("PCP: informing node info"),
829 errdetail("retrieved node information from shared memory")));
830
831 snprintf(port_str, sizeof(port_str), "%d", bi->backend_port);
832 snprintf(status, sizeof(status), "%d", bi->backend_status);
833 snprintf(weight_str, sizeof(weight_str), "%f", bi->backend_weight);
834
835 if (STREAM)
836 {
837 if (Req_info->primary_node_id == node_id)
838 role = ROLE_PRIMARY;
839 else
840 role = ROLE_STANDBY;
841 }
842 else
843 {
844 if (Req_info->master_node_id == node_id)
845 role = ROLE_MASTER;
846 else
847 role = ROLE_SLAVE;
848 }
849 snprintf(role_str, sizeof(role_str), "%d", role);
850
851 pcp_write(frontend, "i", 1);
852 wsize = htonl(sizeof(code) +
853 strlen(bi->backend_hostname)+1 +
854 strlen(port_str)+1 +
855 strlen(status)+1 +
856 strlen(weight_str)+1 +
857 strlen(role_str)+1 +
858 sizeof(int));
859 pcp_write(frontend, &wsize, sizeof(int));
860 pcp_write(frontend, code, sizeof(code));
861 pcp_write(frontend, bi->backend_hostname, strlen(bi->backend_hostname)+1);
862 pcp_write(frontend, port_str, strlen(port_str)+1);
863 pcp_write(frontend, status, strlen(status)+1);
864 pcp_write(frontend, weight_str, strlen(weight_str)+1);
865
866 pcp_write(frontend, role_str, strlen(role_str)+1);
867 do_pcp_flush(frontend);
868 }
869
870 static void
inform_node_count(PCP_CONNECTION * frontend)871 inform_node_count(PCP_CONNECTION *frontend)
872 {
873 int wsize;
874 char mesg[16];
875 char code[] = "CommandComplete";
876 int node_count = pool_get_node_count();
877
878 snprintf(mesg, sizeof(mesg), "%d", node_count);
879
880 pcp_write(frontend, "l", 1);
881 wsize = htonl(sizeof(code) +
882 strlen(mesg)+1 +
883 sizeof(int));
884 pcp_write(frontend, &wsize, sizeof(int));
885 pcp_write(frontend, code, sizeof(code));
886 pcp_write(frontend, mesg, strlen(mesg)+1);
887 do_pcp_flush(frontend);
888
889 ereport(DEBUG1,
890 (errmsg("PCP: informing node count"),
891 errdetail("%d node(s) found", node_count)));
892 }
893
894 static void
process_detach_node(PCP_CONNECTION * frontend,char * buf,char tos)895 process_detach_node(PCP_CONNECTION *frontend,char *buf, char tos)
896 {
897 int node_id;
898 int wsize;
899 char code[] = "CommandComplete";
900 bool gracefully;
901
902 if (tos == 'D')
903 gracefully = false;
904 else
905 gracefully = true;
906
907 node_id = atoi(buf);
908 ereport(DEBUG1,
909 (errmsg("PCP: processing detach node"),
910 errdetail("detaching Node ID %d", node_id)));
911
912 pool_detach_node(node_id, gracefully);
913
914 pcp_write(frontend, "d", 1);
915 wsize = htonl(sizeof(code) + sizeof(int));
916 pcp_write(frontend, &wsize, sizeof(int));
917 pcp_write(frontend, code, sizeof(code));
918 do_pcp_flush(frontend);
919 }
920
921 static void
process_attach_node(PCP_CONNECTION * frontend,char * buf)922 process_attach_node(PCP_CONNECTION *frontend,char *buf)
923 {
924 int node_id;
925 int wsize;
926 char code[] = "CommandComplete";
927
928 node_id = atoi(buf);
929 ereport(DEBUG1,
930 (errmsg("PCP: processing attach node"),
931 errdetail("attaching Node ID %d", node_id)));
932
933 send_failback_request(node_id,true, REQ_DETAIL_CONFIRMED);
934
935 pcp_write(frontend, "c", 1);
936 wsize = htonl(sizeof(code) + sizeof(int));
937 pcp_write(frontend, &wsize, sizeof(int));
938 pcp_write(frontend, code, sizeof(code));
939 do_pcp_flush(frontend);
940 }
941
942
943 static void
process_recovery_request(PCP_CONNECTION * frontend,char * buf)944 process_recovery_request(PCP_CONNECTION *frontend,char *buf)
945 {
946 int wsize;
947 char code[] = "CommandComplete";
948 int node_id = atoi(buf);
949
950 if ( (node_id < 0) || (node_id >= pool_config->backend_desc->num_backends) )
951 ereport(ERROR,
952 (errmsg("process recovery request failed"),
953 errdetail("node id %d is not valid", node_id)));
954
955 if ((!REPLICATION &&
956 !(MASTER_SLAVE &&
957 pool_config->master_slave_sub_mode == STREAM_MODE)) ||
958 (MASTER_SLAVE &&
959 pool_config->master_slave_sub_mode == STREAM_MODE &&
960 node_id == PRIMARY_NODE_ID))
961 {
962 if (MASTER_SLAVE && pool_config->master_slave_sub_mode == STREAM_MODE)
963 ereport(ERROR,
964 (errmsg("process recovery request failed"),
965 errdetail("primary server cannot be recovered by online recovery.")));
966 else
967 ereport(ERROR,
968 (errmsg("process recovery request failed"),
969 errdetail("recovery request is only allowed in replication and streaming replication modes.")));
970 }
971 else
972 {
973 if (pcp_mark_recovery_in_progress() == false)
974 ereport(FATAL,
975 (errmsg("process recovery request failed"),
976 errdetail("pgpool-II is already processing another recovery request.")));
977
978 ereport(DEBUG1,
979 (errmsg("PCP: processing recovery request"),
980 errdetail("start online recovery")));
981
982 PG_TRY();
983 {
984 start_recovery(node_id);
985 finish_recovery();
986 pcp_write(frontend, "c", 1);
987 wsize = htonl(sizeof(code) + sizeof(int));
988 pcp_write(frontend, &wsize, sizeof(int));
989 pcp_write(frontend, code, sizeof(code));
990 do_pcp_flush(frontend);
991 pcp_mark_recovery_finished();
992 }
993 PG_CATCH();
994 {
995 finish_recovery();
996 pcp_mark_recovery_finished();
997 PG_RE_THROW();
998
999 }PG_END_TRY();
1000 }
1001 do_pcp_flush(frontend);
1002 }
1003
1004 static void
process_status_request(PCP_CONNECTION * frontend)1005 process_status_request(PCP_CONNECTION *frontend)
1006 {
1007 int nrows = 0;
1008 int i;
1009 POOL_REPORT_CONFIG *status = get_config(&nrows);
1010 int len = 0;
1011 /* First, send array size of connection_info */
1012 char arr_code[] = "ArraySize";
1013 char code[] = "ProcessConfig";
1014 /* Finally, indicate that all data is sent */
1015 char fin_code[] = "CommandComplete";
1016
1017 pcp_write(frontend, "b", 1);
1018 len = htonl(sizeof(arr_code) + sizeof(int) + sizeof(int));
1019 pcp_write(frontend, &len, sizeof(int));
1020 pcp_write(frontend, arr_code, sizeof(arr_code));
1021 len = htonl(nrows);
1022 pcp_write(frontend, &len, sizeof(int));
1023
1024 do_pcp_flush(frontend);
1025
1026 for (i = 0; i < nrows; i++)
1027 {
1028 pcp_write(frontend, "b", 1);
1029 len = htonl(sizeof(int)
1030 + sizeof(code)
1031 + strlen(status[i].name) + 1
1032 + strlen(status[i].value) + 1
1033 + strlen(status[i].desc) + 1
1034 );
1035
1036 pcp_write(frontend, &len, sizeof(int));
1037 pcp_write(frontend, code, sizeof(code));
1038 pcp_write(frontend, status[i].name, strlen(status[i].name)+1);
1039 pcp_write(frontend, status[i].value, strlen(status[i].value)+1);
1040 pcp_write(frontend, status[i].desc, strlen(status[i].desc)+1);
1041 }
1042
1043 pcp_write(frontend, "b", 1);
1044 len = htonl(sizeof(fin_code) + sizeof(int));
1045 pcp_write(frontend, &len, sizeof(int));
1046 pcp_write(frontend, fin_code, sizeof(fin_code));
1047 do_pcp_flush(frontend);
1048
1049 pfree(status);
1050 ereport(DEBUG1,
1051 (errmsg("PCP: processing status request"),
1052 errdetail("retrieved status information")));
1053 }
1054
1055 static void
process_promote_node(PCP_CONNECTION * frontend,char * buf,char tos)1056 process_promote_node(PCP_CONNECTION *frontend, char *buf, char tos)
1057 {
1058 int node_id;
1059 int wsize;
1060 char code[] = "CommandComplete";
1061 bool gracefully;
1062
1063 if (tos == 'J')
1064 gracefully = false;
1065 else
1066 gracefully = true;
1067
1068 node_id = atoi(buf);
1069 if ( (node_id < 0) || (node_id >= pool_config->backend_desc->num_backends) )
1070 ereport(ERROR,
1071 (errmsg("could not process recovery request"),
1072 errdetail("node id %d is not valid", node_id)));
1073 /* promoting node is reserved to Streaming Replication */
1074 if (!MASTER_SLAVE || pool_config->master_slave_sub_mode != STREAM_MODE)
1075 {
1076 ereport(FATAL,
1077 (errmsg("invalid pgpool mode for process recovery request"),
1078 errdetail("not in streaming replication mode, can't promote node id %d", node_id)));
1079
1080 }
1081
1082 if (node_id == REAL_PRIMARY_NODE_ID)
1083 {
1084 ereport(FATAL,
1085 (errmsg("invalid pgpool mode for process recovery request"),
1086 errdetail("specified node is already primary node, can't promote node id %d", node_id)));
1087
1088 }
1089 ereport(DEBUG1,
1090 (errmsg("PCP: processing promote node"),
1091 errdetail("promoting Node ID %d", node_id)));
1092 pool_promote_node(node_id, gracefully);
1093
1094 pcp_write(frontend, "d", 1);
1095 wsize = htonl(sizeof(code) + sizeof(int));
1096 pcp_write(frontend, &wsize, sizeof(int));
1097 pcp_write(frontend, code, sizeof(code));
1098 do_pcp_flush(frontend);
1099 }
1100
1101 static void
process_authentication(PCP_CONNECTION * frontend,char * buf,char * salt,int * random_salt)1102 process_authentication(PCP_CONNECTION *frontend, char *buf, char* salt, int *random_salt)
1103 {
1104 int wsize;
1105 int authenticated;
1106
1107 if (*random_salt)
1108 {
1109 authenticated = user_authenticate(buf, pcp_conf_file, salt, 4);
1110 }
1111 if (!*random_salt || !authenticated)
1112 {
1113 ereport(FATAL,
1114 (errmsg("authentication failed"),
1115 errdetail("username and/or password does not match")));
1116
1117 *random_salt = 0;
1118 }
1119 else
1120 {
1121 char code[] = "AuthenticationOK";
1122 pcp_write(frontend, "r", 1);
1123 wsize = htonl(sizeof(code) + sizeof(int));
1124 pcp_write(frontend, &wsize, sizeof(int));
1125 pcp_write(frontend, code, sizeof(code));
1126 do_pcp_flush(frontend);
1127 *random_salt = 0;
1128
1129 ereport(DEBUG1,
1130 (errmsg("PCP: processing authentication request"),
1131 errdetail("authentication OK")));
1132 }
1133 }
1134
1135 static void
send_md5salt(PCP_CONNECTION * frontend,char * salt)1136 send_md5salt(PCP_CONNECTION *frontend, char* salt)
1137 {
1138 int wsize;
1139 ereport(DEBUG1,
1140 (errmsg("PCP: sending md5 salt to client")));
1141
1142 pool_random_salt(salt);
1143
1144 pcp_write(frontend, "m", 1);
1145 wsize = htonl(sizeof(int) + 4);
1146 pcp_write(frontend, &wsize, sizeof(int));
1147 pcp_write(frontend, salt, 4);
1148 do_pcp_flush(frontend);
1149 }
1150
1151 static void
process_shutown_request(PCP_CONNECTION * frontend,char mode)1152 process_shutown_request(PCP_CONNECTION *frontend, char mode)
1153 {
1154 char code[] = "CommandComplete";
1155 pid_t ppid = getppid();
1156 int sig,len;
1157
1158 if (mode == 's')
1159 {
1160 ereport(DEBUG1,
1161 (errmsg("PCP: processing shutdown request"),
1162 errdetail("sending SIGTERM to the parent process with PID:%d", ppid)));
1163 sig = SIGTERM;
1164 }
1165 else if (mode == 'f')
1166 {
1167 ereport(DEBUG1,
1168 (errmsg("PCP: processing shutdown request"),
1169 errdetail("sending SIGINT to the parent process with PID:%d", ppid)));
1170 sig = SIGINT;
1171 }
1172 else if (mode == 'i')
1173 {
1174 ereport(DEBUG1,
1175 (errmsg("PCP: processing shutdown request"),
1176 errdetail("sending SIGQUIT to the parent process with PID:%d", ppid)));
1177 sig = SIGQUIT;
1178 }
1179 else
1180 {
1181 ereport(ERROR,
1182 (errmsg("PCP: error while processing shutdown request"),
1183 errdetail("invalid shutdown mode \"%c\"", mode)));
1184 }
1185
1186 pcp_write(frontend, "t", 1);
1187 len = htonl(sizeof(code) + sizeof(int));
1188 pcp_write(frontend, &len, sizeof(int));
1189 pcp_write(frontend, code, sizeof(code));
1190 do_pcp_flush(frontend);
1191
1192 pool_signal_parent(sig);
1193 }
1194
1195 static void
process_set_configration_parameter(PCP_CONNECTION * frontend,char * buf,int len)1196 process_set_configration_parameter(PCP_CONNECTION *frontend,char *buf, int len)
1197 {
1198 char* param_name;
1199 char* param_value;
1200 int wsize;
1201 char code[] = "CommandComplete";
1202
1203 param_name = buf;
1204 if(param_name == NULL)
1205 ereport(ERROR,
1206 (errmsg("PCP: set configuration parameter failed"),
1207 errdetail("invalid pcp packet received from client")));
1208
1209 param_value = (char *) memchr(buf, '\0', len);
1210 if(param_value == NULL)
1211 ereport(ERROR,
1212 (errmsg("set configuration parameter failed"),
1213 errdetail("invalid pcp packet received from client")));
1214
1215 param_value +=1;
1216 ereport(LOG,
1217 (errmsg("set configuration parameter, \"%s TO %s\"",param_name,param_value)));
1218
1219 if(strcasecmp(param_name, "client_min_messages") == 0)
1220 {
1221 const char *ordered_valid_values[] = {"debug5","debug4","debug3","debug2","debug1","log","commerror","info","notice","warning","error",NULL};
1222 bool found = false;
1223 int i;
1224 for(i=0; ; i++)
1225 {
1226 char* valid_val = (char*)ordered_valid_values[i];
1227 if(!valid_val)
1228 break;
1229
1230 if (!strcasecmp(param_value, valid_val))
1231 {
1232 found = true;
1233 pool_config->client_min_messages = i + 10;
1234 ereport(DEBUG1,
1235 (errmsg("PCP setting parameter \"%s\" to \"%s\"",param_name,param_value)));
1236 break;
1237 }
1238 }
1239 if (!found)
1240 ereport(ERROR,
1241 (errmsg("PCP: set configuration parameter failed"),
1242 errdetail("invalid value \"%s\" for parameter \"%s\"",param_value,param_name)));
1243 }
1244 else
1245 ereport(ERROR,
1246 (errmsg("PCP: set configuration parameter failed"),
1247 errdetail("invalid parameter \"%s\"",param_name)));
1248
1249 pcp_write(frontend, "a", 1);
1250 wsize = htonl(sizeof(code) + sizeof(int));
1251 pcp_write(frontend, &wsize, sizeof(int));
1252 pcp_write(frontend, code, sizeof(code));
1253 do_pcp_flush(frontend);
1254 }
1255 /*
1256 * Wrapper around pcp_flush which throws FATAL error when pcp_flush fails
1257 */
1258 static void
do_pcp_flush(PCP_CONNECTION * frontend)1259 do_pcp_flush(PCP_CONNECTION *frontend)
1260 {
1261 if (pcp_flush(frontend) < 0)
1262 ereport(FATAL,
1263 (errmsg("failed to flush data to client"),
1264 errdetail("pcp_flush failed with error : \"%s\"",strerror(errno))));
1265 }
1266
1267 /*
1268 * Wrapper around pcp_read which throws FATAL error when read fails
1269 */
1270 static void
do_pcp_read(PCP_CONNECTION * pc,void * buf,int len)1271 do_pcp_read(PCP_CONNECTION *pc, void *buf, int len)
1272 {
1273 if (pcp_read(pc,buf,len))
1274 ereport(FATAL,
1275 (errmsg("unable to read from client"),
1276 errdetail("pcp_read failed with error : \"%s\"",strerror(errno))));
1277 }
1278
send_to_pcp_frontend(char * data,int len,bool flush)1279 int send_to_pcp_frontend(char* data, int len, bool flush)
1280 {
1281 int ret;
1282 if (processType != PT_PCP_WORKER || pcp_frontend == NULL)
1283 return -1;
1284 ret = pcp_write(pcp_frontend, data, len);
1285 if (flush && !ret)
1286 ret = pcp_flush(pcp_frontend);
1287 return ret;
1288 }
1289
pcp_frontend_exists(void)1290 int pcp_frontend_exists(void)
1291 {
1292 if (processType != PT_PCP_WORKER || pcp_frontend == NULL)
1293 return -1;
1294 return 0;
1295 }
1296
static void pcp_worker_will_go_down(int code, Datum arg)
{
	/*
	 * Exit callback for a PCP worker.  It closes the client connection (if
	 * any), marks the process as EXITING and unblocks signals.  If called
	 * from any other process type, it only logs a WARNING.
	 */
	PCP_CONNECTION *frontend;

	if (processType != PT_PCP_WORKER)
	{
		/* should never happen */
		ereport(WARNING,
				(errmsg("pcp_worker_will_go_down called from invalid process")));
		return;
	}

	/*
	 * Detach the global before closing.  send_to_pcp_frontend() and
	 * pcp_frontend_exists() treat a non-NULL pcp_frontend as a usable
	 * connection.  Leaving it pointing at the closed (presumably freed)
	 * connection would let later exit-time code use it.
	 */
	frontend = pcp_frontend;
	pcp_frontend = NULL;
	if (frontend)
		pcp_close(frontend);

	processState = EXITING;
	POOL_SETMASK(&UnBlockSig);
}
1312