1 /*
2 * $Header$
3 *
4 * Handles PCP connection, and protocol communication with pgpool-II
5 * These are client APIs. Server program should use APIs in pcp_stream.c
6 *
7 *
8 * pgpool: a language independent connection pool server for PostgreSQL
9 * written by Tatsuo Ishii
10 *
11 * Copyright (c) 2003-2019 PgPool Global Development Group
12 *
13 * Permission to use, copy, modify, and distribute this software and
14 * its documentation for any purpose and without fee is hereby
15 * granted, provided that the above copyright notice appear in all
16 * copies and that both that copyright notice and this permission
17 * notice appear in supporting documentation, and that the name of the
18 * author not be used in advertising or publicity pertaining to
19 * distribution of the software without specific, written prior
20 * permission. The author makes no representations about the
21 * suitability of this software for any purpose. It is provided "as
22 * is" without express or implied warranty.
23 *
24 */
25
26 #include <stdio.h>
27 #include <errno.h>
28 #include <string.h>
29 #include <stdlib.h>
30 #include <sys/types.h>
31 #include <sys/stat.h>
32 #include <sys/socket.h>
33 #include <sys/time.h>
34 #include <sys/un.h>
35 #include <netinet/in.h>
36 #include <netinet/tcp.h>
37 #include <netdb.h>
38 #include <unistd.h>
39 #include <stdarg.h>
40
41 #include "pool.h"
42 #include "pcp/pcp.h"
43 #include "pcp/pcp_stream.h"
44 #include "utils/pool_path.h"
45 #include "utils/palloc.h"
46 #include "utils/pool_process_reporting.h"
47 #include "utils/json.h"
48 #include "auth/md5.h"
49
50 #define PCPPASSFILE ".pcppass"
51 #define DefaultHost "localhost"
52
53
54 static int pcp_authorize(PCPConnInfo * pcpConn, char *username, char *password);
55
56 static void pcp_internal_error(PCPConnInfo * pcpConn, const char *fmt,...);
57
58 static PCPResultInfo * _pcp_detach_node(PCPConnInfo * pcpConn, int nid, bool gracefully);
59 static PCPResultInfo * _pcp_promote_node(PCPConnInfo * pcpConn, int nid, bool gracefully);
60 static PCPResultInfo * process_pcp_response(PCPConnInfo * pcpConn, char sentMsg);
61 static void setCommandSuccessful(PCPConnInfo * pcpConn);
62 static void setResultStatus(PCPConnInfo * pcpConn, ResultStateType resultState);
63 static void setResultBinaryData(PCPResultInfo * res, unsigned int slotno, void *value, int datalen, void (*free_func) (struct PCPConnInfo *, void *));
64 static int setNextResultBinaryData(PCPResultInfo * res, void *value, int datalen, void (*free_func) (struct PCPConnInfo *, void *));
65 static void setResultIntData(PCPResultInfo * res, unsigned int slotno, int value);
66
67 static void process_node_info_response(PCPConnInfo * pcpConn, char *buf, int len);
68 static void process_command_complete_response(PCPConnInfo * pcpConn, char *buf, int len);
69 static void process_watchdog_info_response(PCPConnInfo * pcpConn, char *buf, int len);
70 static void process_process_info_response(PCPConnInfo * pcpConn, char *buf, int len);
71 static void process_pool_status_response(PCPConnInfo * pcpConn, char *buf, int len);
72 static void process_pcp_node_count_response(PCPConnInfo * pcpConn, char *buf, int len);
73 static void process_process_count_response(PCPConnInfo * pcpConn, char *buf, int len);
74 static void process_salt_info_response(PCPConnInfo * pcpConn, char *buf, int len);
75 static void process_error_response(PCPConnInfo * pcpConn, char toc, char *buff);
76
77
78 static void setResultSlotCount(PCPConnInfo * pcpConn, unsigned int slotCount);
79 static void free_processInfo(struct PCPConnInfo *pcpConn, void *ptr);
80 static int PCPFlush(PCPConnInfo * pcpConn);
81
82 static bool getPoolPassFilename(char *pgpassfile);
83 static char *PasswordFromFile(PCPConnInfo * pcpConn, char *hostname, char *port, char *username);
84 static char *pwdfMatchesString(char *buf, char *token);
85
86 /* --------------------------------
87 * pcp_connect - open connection to pgpool using given arguments
88 *
89 * return 0 on success, -1 otherwise
90 * --------------------------------
91 */
92
93 /* Check if PCP connection is connected and authenticated
94 * return 1 on successfull 0 otherwise
95 */
96
97 PCPConnInfo *
pcp_connect(char * hostname,int port,char * username,char * password,FILE * Pfdebug)98 pcp_connect(char *hostname, int port, char *username, char *password, FILE *Pfdebug)
99 {
100 struct sockaddr_in addr;
101 struct sockaddr_un unix_addr;
102 struct hostent *hp;
103 char *password_fron_file = NULL;
104 char os_user[256];
105 PCPConnInfo *pcpConn = palloc0(sizeof(PCPConnInfo));
106 int fd;
107 int on = 1;
108 int len;
109
110 pcpConn->connState = PCP_CONNECTION_NOT_CONNECTED;
111 pcpConn->Pfdebug = Pfdebug;
112
113 if (hostname == NULL || *hostname == '\0' || *hostname == '/')
114 {
115 char *path;
116
117 fd = socket(AF_UNIX, SOCK_STREAM, 0);
118
119 if (fd < 0)
120 {
121 pcp_internal_error(pcpConn,
122 "ERROR: failed to create UNIX domain socket. socket error \"%s\"", strerror(errno));
123 pcpConn->connState = PCP_CONNECTION_BAD;
124
125 return pcpConn;
126 }
127
128 memset(&unix_addr, 0, sizeof(unix_addr));
129 unix_addr.sun_family = AF_UNIX;
130
131 if (hostname == NULL || *hostname == '\0')
132 {
133 path = UNIX_DOMAIN_PATH;
134 hostname = path;
135 }
136 else
137 {
138 path = hostname;
139 }
140
141 snprintf(unix_addr.sun_path, sizeof(unix_addr.sun_path), "%s/.s.PGSQL.%d",
142 path, port);
143
144 if (connect(fd, (struct sockaddr *) &unix_addr, sizeof(unix_addr)) < 0)
145 {
146 close(fd);
147
148 pcp_internal_error(pcpConn,
149 "ERROR: connection to socket \"%s\" failed with error \"%s\"", unix_addr.sun_path, strerror(errno));
150 pcpConn->connState = PCP_CONNECTION_BAD;
151 return pcpConn;
152 }
153 }
154 else
155 {
156 fd = socket(AF_INET, SOCK_STREAM, 0);
157 if (fd < 0)
158 {
159 pcp_internal_error(pcpConn,
160 "ERROR: failed to create INET domain socket with error \"%s\"", strerror(errno));
161 pcpConn->connState = PCP_CONNECTION_BAD;
162 return pcpConn;
163 }
164
165 if (setsockopt(fd, IPPROTO_TCP, TCP_NODELAY,
166 (char *) &on, sizeof(on)) < 0)
167 {
168 close(fd);
169
170 pcp_internal_error(pcpConn,
171 "ERROR: set socket option failed with error \"%s\"", strerror(errno));
172 pcpConn->connState = PCP_CONNECTION_BAD;
173 return pcpConn;
174 }
175
176 memset((char *) &addr, 0, sizeof(addr));
177 addr.sin_family = AF_INET;
178 hp = gethostbyname(hostname);
179 if ((hp == NULL) || (hp->h_addrtype != AF_INET))
180 {
181 close(fd);
182 pcp_internal_error(pcpConn,
183 "ERROR: could not retrieve hostname. gethostbyname failed with error \"%s\"", strerror(errno));
184 pcpConn->connState = PCP_CONNECTION_BAD;
185 return pcpConn;
186
187 }
188 memmove((char *) &(addr.sin_addr),
189 (char *) hp->h_addr,
190 hp->h_length);
191 addr.sin_port = htons(port);
192
193 len = sizeof(struct sockaddr_in);
194 if (connect(fd, (struct sockaddr *) &addr, len) < 0)
195 {
196 close(fd);
197 pcp_internal_error(pcpConn,
198 "ERROR: connection to host \"%s\" failed with error \"%s\"", hostname, strerror(errno));
199 pcpConn->connState = PCP_CONNECTION_BAD;
200 return pcpConn;
201 }
202 }
203
204 pcpConn->pcpConn = pcp_open(fd);
205 if (pcpConn->pcpConn == NULL)
206 {
207 close(fd);
208 pcp_internal_error(pcpConn,
209 "ERROR: failed to allocate memory");
210 pcpConn->connState = PCP_CONNECTION_BAD;
211 return pcpConn;
212 }
213 pcpConn->connState = PCP_CONNECTION_CONNECTED;
214
215 /*
216 * If username is not provided. Use the os user name and do not complain
217 * if it (getting os user name) gets failed
218 */
219 if (username == NULL && get_os_username(os_user, sizeof(os_user)))
220 username = os_user;
221
222 /*
223 * If password is not provided. lookup in pcppass file
224 */
225 if (password == NULL || *password == '\0')
226 {
227 char port_str[100];
228
229 snprintf(port_str, sizeof(port_str), "%d", port);
230 password_fron_file = PasswordFromFile(pcpConn, hostname, port_str, username);
231 password = password_fron_file;
232 }
233
234 if (pcp_authorize(pcpConn, username, password) < 0)
235 {
236 pcp_close(pcpConn->pcpConn);
237 pcpConn->pcpConn = NULL;
238 pcpConn->connState = PCP_CONNECTION_AUTH_ERROR;
239 }
240 else
241 pcpConn->connState = PCP_CONNECTION_OK;
242
243 if (password_fron_file)
244 pfree(password_fron_file);
245
246 return pcpConn;
247 }
248
249 static void
process_salt_info_response(PCPConnInfo * pcpConn,char * buf,int len)250 process_salt_info_response(PCPConnInfo * pcpConn, char *buf, int len)
251 {
252 char *salt = palloc((sizeof(char) * 4));
253
254 memcpy(salt, buf, 4);
255 if (setNextResultBinaryData(pcpConn->pcpResInfo, (void *) salt, 4, NULL) < 0)
256 {
257 pcp_internal_error(pcpConn,
258
259 "command failed. invalid response");
260 setResultStatus(pcpConn, PCP_RES_BAD_RESPONSE);
261 }
262 else
263 {
264 setCommandSuccessful(pcpConn);
265 }
266 }
267
268 /* --------------------------------
269 * pcp_authorize - authenticate with pgpool using username and password
270 *
271 * return 0 on success, -1 otherwise
272 * --------------------------------
273 */
274 static int
pcp_authorize(PCPConnInfo * pcpConn,char * username,char * password)275 pcp_authorize(PCPConnInfo * pcpConn, char *username, char *password)
276 {
277 int wsize;
278 char salt[4];
279 char *salt_ptr;
280 char encrypt_buf[(MD5_PASSWD_LEN + 1) * 2];
281 char md5[MD5_PASSWD_LEN + 1];
282 PCPResultInfo *pcpRes;
283
284 if (password == NULL)
285 password = "";
286
287 if (username == NULL)
288 username = "";
289
290 if (PCPConnectionStatus(pcpConn) != PCP_CONNECTION_CONNECTED)
291 {
292 pcp_internal_error(pcpConn,
293 "ERROR: PCP authorization failed. invalid connection state.");
294 return -1;
295 }
296
297 if (strlen(username) >= MAX_USER_PASSWD_LEN)
298 {
299 pcp_internal_error(pcpConn,
300 "ERROR: PCP authorization failed. username too long.");
301 return -1;
302 }
303
304 /* request salt */
305 pcp_write(pcpConn->pcpConn, "M", 1);
306 wsize = htonl(sizeof(int));
307 pcp_write(pcpConn->pcpConn, &wsize, sizeof(int));
308 if (PCPFlush(pcpConn) < 0)
309 return -1;
310
311 pcpRes = process_pcp_response(pcpConn, 'M');
312 if (PCPResultStatus(pcpRes) != PCP_RES_COMMAND_OK)
313 return -1;
314
315 salt_ptr = pcp_get_binary_data(pcpRes, 0);
316 if (salt_ptr == NULL)
317 return -1;
318 memcpy(salt, salt_ptr, 4);
319
320 /* encrypt password */
321 pool_md5_hash(password, strlen(password), md5);
322 md5[MD5_PASSWD_LEN] = '\0';
323
324 pool_md5_encrypt(md5, username, strlen(username),
325 encrypt_buf + MD5_PASSWD_LEN + 1);
326 encrypt_buf[(MD5_PASSWD_LEN + 1) * 2 - 1] = '\0';
327
328 pool_md5_encrypt(encrypt_buf + MD5_PASSWD_LEN + 1, salt, 4,
329 encrypt_buf);
330 encrypt_buf[MD5_PASSWD_LEN] = '\0';
331
332 pcp_write(pcpConn->pcpConn, "R", 1);
333 wsize = htonl((strlen(username) + 1 + strlen(encrypt_buf) + 1) + sizeof(int));
334 pcp_write(pcpConn->pcpConn, &wsize, sizeof(int));
335 pcp_write(pcpConn->pcpConn, username, strlen(username) + 1);
336 pcp_write(pcpConn->pcpConn, encrypt_buf, strlen(encrypt_buf) + 1);
337 if (PCPFlush(pcpConn) < 0)
338 return -1;
339 pcpRes = process_pcp_response(pcpConn, 'R');
340 if (PCPResultStatus(pcpRes) != PCP_RES_COMMAND_OK)
341 return -1;
342 pcp_free_result(pcpConn);
343 return 0;
344 }
345
process_pcp_response(PCPConnInfo * pcpConn,char sentMsg)346 static PCPResultInfo * process_pcp_response(PCPConnInfo * pcpConn, char sentMsg)
347 {
348 char toc;
349 int rsize;
350 char *buf;
351
352 /* create empty result */
353 if (pcpConn->pcpResInfo == NULL)
354 {
355 pcpConn->pcpResInfo = palloc0(sizeof(PCPResultInfo));
356 pcpConn->pcpResInfo->resultSlots = 1;
357 }
358
359 while (1)
360 {
361 if (pcp_read(pcpConn->pcpConn, &toc, 1))
362 {
363 pcp_internal_error(pcpConn,
364 "ERROR: unable to read data from socket.");
365 setResultStatus(pcpConn, PCP_RES_ERROR);
366 return pcpConn->pcpResInfo;
367 }
368 if (pcp_read(pcpConn->pcpConn, &rsize, sizeof(int)))
369 {
370 pcp_internal_error(pcpConn,
371 "ERROR: unable to read data from socket.");
372 setResultStatus(pcpConn, PCP_RES_ERROR);
373 return pcpConn->pcpResInfo;
374 }
375 rsize = ntohl(rsize);
376 buf = (char *) palloc(rsize);
377
378 if (pcp_read(pcpConn->pcpConn, buf, rsize - sizeof(int)))
379 {
380 pfree(buf);
381 pcp_internal_error(pcpConn,
382 "ERROR: unable to read data from socket.");
383 setResultStatus(pcpConn, PCP_RES_ERROR);
384 return pcpConn->pcpResInfo;
385 }
386
387 if (pcpConn->Pfdebug)
388 fprintf(pcpConn->Pfdebug, "DEBUG: recv: tos=\"%c\", len=%d\n", toc, rsize);
389
390 switch (toc)
391 {
392 case 'r': /* Authentication Response */
393 {
394 if (sentMsg != 'R')
395 {
396 setResultStatus(pcpConn, PCP_RES_BAD_RESPONSE);
397 }
398 else if (strcmp(buf, "AuthenticationOK") == 0)
399 {
400 pcpConn->connState = PCP_CONNECTION_OK;
401 setResultStatus(pcpConn, PCP_RES_COMMAND_OK);
402 }
403 else
404 {
405 pcp_internal_error(pcpConn,
406 "ERROR: authentication failed. reason=\"%s\"", buf);
407 setResultStatus(pcpConn, PCP_RES_BACKEND_ERROR);
408 }
409 }
410 break;
411 case 'm':
412 if (sentMsg != 'M')
413 setResultStatus(pcpConn, PCP_RES_BAD_RESPONSE);
414 else
415 process_salt_info_response(pcpConn, buf, rsize);
416 break;
417
418 case 'E':
419 setResultStatus(pcpConn, PCP_RES_BACKEND_ERROR);
420 process_error_response(pcpConn, toc, buf);
421 break;
422
423 case 'N':
424 process_error_response(pcpConn, toc, buf);
425 pfree(buf);
426 continue;
427 break;
428
429 case 'i':
430 if (sentMsg != 'I')
431 setResultStatus(pcpConn, PCP_RES_BAD_RESPONSE);
432 else
433 process_node_info_response(pcpConn, buf, rsize);
434 break;
435
436 case 'l':
437 if (sentMsg != 'L')
438 setResultStatus(pcpConn, PCP_RES_BAD_RESPONSE);
439 else
440 process_pcp_node_count_response(pcpConn, buf, rsize);
441 break;
442
443 case 'c':
444 if (sentMsg != 'C' && sentMsg != 'O')
445 setResultStatus(pcpConn, PCP_RES_BAD_RESPONSE);
446 else
447 process_command_complete_response(pcpConn, buf, rsize);
448 break;
449
450 case 'd':
451 if (sentMsg != 'D' && sentMsg != 'J')
452 setResultStatus(pcpConn, PCP_RES_BAD_RESPONSE);
453 else
454 process_command_complete_response(pcpConn, buf, rsize);
455 break;
456
457 case 'a':
458 if (sentMsg != 'A')
459 setResultStatus(pcpConn, PCP_RES_BAD_RESPONSE);
460 else
461 process_command_complete_response(pcpConn, buf, rsize);
462 break;
463
464 case 'w':
465 if (sentMsg != 'W')
466 setResultStatus(pcpConn, PCP_RES_BAD_RESPONSE);
467 else
468 process_watchdog_info_response(pcpConn, buf, rsize);
469 break;
470
471 case 'p':
472 if (sentMsg != 'P')
473 setResultStatus(pcpConn, PCP_RES_BAD_RESPONSE);
474 else
475 process_process_info_response(pcpConn, buf, rsize);
476 break;
477
478 case 'n':
479 if (sentMsg != 'N')
480 setResultStatus(pcpConn, PCP_RES_BAD_RESPONSE);
481 else
482 process_process_count_response(pcpConn, buf, rsize);
483 break;
484
485 case 'b':
486 if (sentMsg != 'B')
487 setResultStatus(pcpConn, PCP_RES_BAD_RESPONSE);
488 else
489 process_pool_status_response(pcpConn, buf, rsize);
490 break;
491
492 case 't':
493 if (sentMsg != 'T')
494 setResultStatus(pcpConn, PCP_RES_BAD_RESPONSE);
495 else
496 setResultStatus(pcpConn, PCP_RES_COMMAND_OK);
497 break;
498
499 default:
500 setResultStatus(pcpConn, PCP_RES_BAD_RESPONSE);
501 pcp_internal_error(pcpConn,
502 "ERROR: invalid PCP packet type =\"%c\"", toc);
503 break;
504 }
505 pfree(buf);
506 if (PCPResultStatus(pcpConn->pcpResInfo) != PCP_RES_INCOMPLETE)
507 break;
508 }
509 return pcpConn->pcpResInfo;
510 }
511
512 static void
process_error_response(PCPConnInfo * pcpConn,char toc,char * buf)513 process_error_response(PCPConnInfo * pcpConn, char toc, char *buf)
514 {
515 /* For time we only support sev, error message and details */
516 char *errorSev = NULL;
517 char *errorMsg = NULL;
518 char *errorDet = NULL;
519 char *e = buf;
520
521 if (toc != 'E' && toc != 'N')
522 return;
523
524 while (*e)
525 {
526 char type = *e;
527
528 e++;
529 if (*e == 0)
530 break;
531
532 if (type == 'M')
533 errorMsg = e;
534 else if (type == 'S')
535 errorSev = e;
536 else if (type == 'D')
537 errorDet = e;
538 else
539 e += strlen(e) + 1;
540 if (errorDet && errorSev && errorMsg) /* we have all what we need */
541 break;
542 }
543 if (!errorSev && !errorMsg)
544 return;
545
546 if (toc != 'E') /* This is not an error report it as debug */
547 {
548 if (pcpConn->Pfdebug)
549 fprintf(pcpConn->Pfdebug,
550 "BACKEND %s: %s\n%s%s%s", errorSev, errorMsg,
551 errorDet ? "DETAIL: " : "",
552 errorDet ? errorDet : "",
553 errorDet ? "\n" : "");
554 }
555 else
556 {
557 pcp_internal_error(pcpConn,
558 "%s: %s\n%s%s%s", errorSev, errorMsg,
559 errorDet ? "DETAIL: " : "",
560 errorDet ? errorDet : "",
561 errorDet ? "\n" : "");
562 setResultStatus(pcpConn, PCP_RES_BACKEND_ERROR);
563
564 }
565 }
566
567 /* --------------------------------
568 * pcp_disconnect - close connection to pgpool
569 * --------------------------------
570 */
571 void
pcp_disconnect(PCPConnInfo * pcpConn)572 pcp_disconnect(PCPConnInfo * pcpConn)
573 {
574 int wsize;
575
576 if (PCPConnectionStatus(pcpConn) != PCP_CONNECTION_OK)
577 {
578 pcp_internal_error(pcpConn, "invalid PCP connection");
579 return;
580 }
581
582 pcp_write(pcpConn->pcpConn, "X", 1);
583 wsize = htonl(sizeof(int));
584 pcp_write(pcpConn->pcpConn, &wsize, sizeof(int));
585 if (PCPFlush(pcpConn) < 0)
586 return;
587 if (pcpConn->Pfdebug)
588 fprintf(pcpConn->Pfdebug, "DEBUG: send: tos=\"X\", len=%d\n", (int) sizeof(int));
589
590 pcp_close(pcpConn->pcpConn);
591 pcpConn->connState = PCP_CONNECTION_NOT_CONNECTED;
592 pcpConn->pcpConn = NULL;
593 }
594
595 /* --------------------------------
596 * pcp_terminate_pgpool - send terminate packet
597 *
598 * return 0 on success, -1 otherwise
599 * --------------------------------
600 */
601 PCPResultInfo *
pcp_terminate_pgpool(PCPConnInfo * pcpConn,char mode)602 pcp_terminate_pgpool(PCPConnInfo * pcpConn, char mode)
603 {
604 int wsize;
605
606 if (PCPConnectionStatus(pcpConn) != PCP_CONNECTION_OK)
607 {
608 pcp_internal_error(pcpConn, "invalid PCP connection");
609 return NULL;
610 }
611 pcp_write(pcpConn->pcpConn, "T", 1);
612 wsize = htonl(sizeof(int) + sizeof(char));
613 pcp_write(pcpConn->pcpConn, &wsize, sizeof(int));
614 pcp_write(pcpConn->pcpConn, &mode, sizeof(char));
615 if (PCPFlush(pcpConn) < 0)
616 return NULL;
617 if (pcpConn->Pfdebug)
618 fprintf(pcpConn->Pfdebug, "DEBUG: send: tos=\"T\", len=%d\n", ntohl(wsize));
619
620 return process_pcp_response(pcpConn, 'T');
621 }
622
623 static void
process_pcp_node_count_response(PCPConnInfo * pcpConn,char * buf,int len)624 process_pcp_node_count_response(PCPConnInfo * pcpConn, char *buf, int len)
625 {
626 if (strcmp(buf, "CommandComplete") == 0)
627 {
628 char *index = NULL;
629
630 index = (char *) memchr(buf, '\0', len);
631 if (index != NULL)
632 {
633 int ret;
634
635 index += 1;
636 ret = atoi(index);
637 setResultIntData(pcpConn->pcpResInfo, 0, ret);
638 setCommandSuccessful(pcpConn);
639 return;
640 }
641 else
642 pcp_internal_error(pcpConn,
643 "command failed. invalid response");
644 }
645 else
646 pcp_internal_error(pcpConn,
647 "command failed with reason: \"%s\"", buf);
648 setResultStatus(pcpConn, PCP_RES_BAD_RESPONSE);
649 }
650
651 /* --------------------------------
652 * pcp_node_count - get number of nodes currently connected to pgpool
653 *
654 * return array of node IDs on success, -1 otherwise
655 * --------------------------------
656 */
657 PCPResultInfo *
pcp_node_count(PCPConnInfo * pcpConn)658 pcp_node_count(PCPConnInfo * pcpConn)
659 {
660 int wsize;
661
662 if (PCPConnectionStatus(pcpConn) != PCP_CONNECTION_OK)
663 {
664 pcp_internal_error(pcpConn,
665 "invalid PCP connection");
666 return NULL;
667 }
668 pcp_write(pcpConn->pcpConn, "L", 1);
669 wsize = htonl(sizeof(int));
670 pcp_write(pcpConn->pcpConn, &wsize, sizeof(int));
671 if (PCPFlush(pcpConn) < 0)
672 return NULL;
673 if (pcpConn->Pfdebug)
674 fprintf(pcpConn->Pfdebug, "DEBUG: send: tos=\"L\", len=%d\n", ntohl(wsize));
675
676 return process_pcp_response(pcpConn, 'L');
677 }
678
679 static void
process_node_info_response(PCPConnInfo * pcpConn,char * buf,int len)680 process_node_info_response(PCPConnInfo * pcpConn, char *buf, int len)
681 {
682 BackendInfo *backend_info = NULL;
683
684 if (strcmp(buf, "CommandComplete") == 0)
685 {
686 char *index = NULL;
687
688 backend_info = (BackendInfo *) palloc(sizeof(BackendInfo));
689
690 index = (char *) memchr(buf, '\0', len);
691 if (index == NULL)
692 goto INVALID_RESPONSE;
693 index += 1;
694 strlcpy(backend_info->backend_hostname, index, sizeof(backend_info->backend_hostname));
695
696 index = (char *) memchr(index, '\0', len);
697 if (index == NULL)
698 goto INVALID_RESPONSE;
699 index += 1;
700 backend_info->backend_port = atoi(index);
701
702 index = (char *) memchr(index, '\0', len);
703 if (index == NULL)
704 goto INVALID_RESPONSE;
705 index += 1;
706 backend_info->backend_status = atoi(index);
707
708 index = (char *) memchr(index, '\0', len);
709 if (index == NULL)
710 goto INVALID_RESPONSE;
711 index += 1;
712 backend_info->backend_weight = atof(index);
713
714 index = (char *) memchr(index, '\0', len);
715 if (index == NULL)
716 goto INVALID_RESPONSE;
717
718 index++;
719 backend_info->role = atoi(index);
720
721 index = (char *) memchr(index, '\0', len);
722 if (index == NULL)
723 goto INVALID_RESPONSE;
724
725 index++;
726 backend_info->standby_delay = atol(index);
727
728 index = (char *) memchr(index, '\0', len);
729 if (index == NULL)
730 goto INVALID_RESPONSE;
731
732 index++;
733 backend_info->status_changed_time = atol(index);
734
735 index = (char *) memchr(index, '\0', len);
736 if (index == NULL)
737 goto INVALID_RESPONSE;
738
739 if (setNextResultBinaryData(pcpConn->pcpResInfo, (void *) backend_info, sizeof(BackendInfo), NULL) < 0)
740 goto INVALID_RESPONSE;
741
742 setCommandSuccessful(pcpConn);
743 }
744 else
745 {
746 pcp_internal_error(pcpConn,
747 "command failed with reason: \"%s\"", buf);
748 setResultStatus(pcpConn, PCP_RES_BAD_RESPONSE);
749 }
750
751 return;
752
753 INVALID_RESPONSE:
754
755 if (backend_info)
756 pfree(backend_info);
757 pcp_internal_error(pcpConn,
758 "command failed. invalid response");
759 setResultStatus(pcpConn, PCP_RES_BAD_RESPONSE);
760
761 }
762
763 /* --------------------------------
764 * pcp_node_info - get information of node pointed by given argument
765 *
766 * return structure of node information on success, -1 otherwise
767 * --------------------------------
768 */
769 PCPResultInfo *
pcp_node_info(PCPConnInfo * pcpConn,int nid)770 pcp_node_info(PCPConnInfo * pcpConn, int nid)
771 {
772 int wsize;
773 char node_id[16];
774
775 if (PCPConnectionStatus(pcpConn) != PCP_CONNECTION_OK)
776 {
777 pcp_internal_error(pcpConn,
778 "invalid PCP connection");
779 return NULL;
780 }
781
782 snprintf(node_id, sizeof(node_id), "%d", nid);
783
784 pcp_write(pcpConn->pcpConn, "I", 1);
785 wsize = htonl(strlen(node_id) + 1 + sizeof(int));
786 pcp_write(pcpConn->pcpConn, &wsize, sizeof(int));
787 pcp_write(pcpConn->pcpConn, node_id, strlen(node_id) + 1);
788 if (PCPFlush(pcpConn) < 0)
789 return NULL;
790 if (pcpConn->Pfdebug)
791 fprintf(pcpConn->Pfdebug, "DEBUG: send: tos=\"I\", len=%d\n", ntohl(wsize));
792
793 return process_pcp_response(pcpConn, 'I');
794 }
795
796 static void
process_process_count_response(PCPConnInfo * pcpConn,char * buf,int len)797 process_process_count_response(PCPConnInfo * pcpConn, char *buf, int len)
798 {
799 if (strcmp(buf, "CommandComplete") == 0)
800 {
801 int process_count;
802 int *process_list = NULL;
803 char *index = NULL;
804 int i;
805
806 index = (char *) memchr(buf, '\0', len);
807 if (index == NULL)
808 {
809 pcp_internal_error(pcpConn,
810 "command failed. invalid response");
811 setResultStatus(pcpConn, PCP_RES_BAD_RESPONSE);
812 return;
813 }
814 index += 1;
815 process_count = atoi(index);
816
817 process_list = (int *) palloc(sizeof(int) * process_count);
818
819 for (i = 0; i < process_count; i++)
820 {
821 index = (char *) memchr(index, '\0', len);
822 if (index == NULL)
823 {
824 pcp_internal_error(pcpConn,
825 "command failed. invalid response");
826 setResultStatus(pcpConn, PCP_RES_BAD_RESPONSE);
827 pfree(process_list);
828 return;
829 }
830 index += 1;
831 process_list[i] = atoi(index);
832 }
833 setResultSlotCount(pcpConn, 1);
834 if (setNextResultBinaryData(pcpConn->pcpResInfo, process_list, (sizeof(int) * process_count), NULL) < 0)
835 {
836 pcp_internal_error(pcpConn,
837 "command failed. invalid response");
838 setResultStatus(pcpConn, PCP_RES_BAD_RESPONSE);
839 }
840 else
841 {
842 setCommandSuccessful(pcpConn);
843 }
844 }
845 else
846 {
847 pcp_internal_error(pcpConn,
848 "command failed with reason: \"%s\"", buf);
849 setResultStatus(pcpConn, PCP_RES_BAD_RESPONSE);
850 }
851 }
852
853 /* --------------------------------
854 * pcp_node_count - get number of nodes currently connected to pgpool
855 *
856 * return array of pids on success, NULL otherwise
857 * --------------------------------
858 */
859
860 PCPResultInfo *
pcp_process_count(PCPConnInfo * pcpConn)861 pcp_process_count(PCPConnInfo * pcpConn)
862 {
863 int wsize;
864
865 if (PCPConnectionStatus(pcpConn) != PCP_CONNECTION_OK)
866 {
867 pcp_internal_error(pcpConn, "invalid PCP connection");
868 return NULL;
869 }
870
871 pcp_write(pcpConn->pcpConn, "N", 1);
872 wsize = htonl(sizeof(int));
873 pcp_write(pcpConn->pcpConn, &wsize, sizeof(int));
874 if (PCPFlush(pcpConn) < 0)
875 return NULL;
876 if (pcpConn->Pfdebug)
877 fprintf(pcpConn->Pfdebug, "DEBUG: send: tos=\"N\", len=%d\n", ntohl(wsize));
878
879 return process_pcp_response(pcpConn, 'N');
880 }
881
882 static void
free_processInfo(struct PCPConnInfo * pcpConn,void * ptr)883 free_processInfo(struct PCPConnInfo *pcpConn, void *ptr)
884 {
885 ProcessInfo *pi = (ProcessInfo *) ptr;
886
887 if (pcpConn->Pfdebug)
888 fprintf(pcpConn->Pfdebug, "free ProcessInfo structure \n");
889
890 if (pi == NULL)
891 {
892 if (pcpConn->Pfdebug)
893 fprintf(pcpConn->Pfdebug, "ProcessInfo structure is NULL nothing to free \n");
894 return;
895 }
896 if (pi->connection_info)
897 pfree(pi->connection_info);
898 pfree(pi);
899 }
900
901 static void
process_process_info_response(PCPConnInfo * pcpConn,char * buf,int len)902 process_process_info_response(PCPConnInfo * pcpConn, char *buf, int len)
903 {
904 char *index;
905 ProcessInfo *processInfo = NULL;
906
907 if (strcmp(buf, "ArraySize") == 0)
908 {
909 int ci_size;
910
911 index = (char *) memchr(buf, '\0', len);
912 if (index == NULL)
913 goto INVALID_RESPONSE;
914 index += 1;
915 ci_size = atoi(index);
916
917 setResultStatus(pcpConn, PCP_RES_INCOMPLETE);
918 setResultSlotCount(pcpConn, ci_size);
919 pcpConn->pcpResInfo->nextFillSlot = 0;
920 return;
921 }
922 else if (strcmp(buf, "ProcessInfo") == 0)
923 {
924 if (PCPResultStatus(pcpConn->pcpResInfo) != PCP_RES_INCOMPLETE)
925 goto INVALID_RESPONSE;
926
927 processInfo = palloc0(sizeof(ProcessInfo));
928 processInfo->connection_info = palloc0(sizeof(ConnectionInfo));
929
930 index = (char *) memchr(buf, '\0', len);
931 if (index == NULL)
932 goto INVALID_RESPONSE;
933 index += 1;
934 processInfo->pid = atoi(index);
935
936 index = (char *) memchr(index, '\0', len);
937 if (index == NULL)
938 goto INVALID_RESPONSE;
939 index += 1;
940 strlcpy(processInfo->connection_info->database, index, SM_DATABASE);
941
942 index = (char *) memchr(index, '\0', len);
943 if (index == NULL)
944 goto INVALID_RESPONSE;
945 index += 1;
946 strlcpy(processInfo->connection_info->user, index, SM_USER);
947
948 index = (char *) memchr(index, '\0', len);
949 if (index == NULL)
950 goto INVALID_RESPONSE;
951 index += 1;
952 processInfo->start_time = atol(index);
953
954 index = (char *) memchr(index, '\0', len);
955 if (index == NULL)
956 goto INVALID_RESPONSE;
957 index += 1;
958 processInfo->connection_info->create_time = atol(index);
959
960 index = (char *) memchr(index, '\0', len);
961 if (index == NULL)
962 goto INVALID_RESPONSE;
963 index += 1;
964 processInfo->connection_info->major = atoi(index);
965
966 index = (char *) memchr(index, '\0', len);
967 if (index == NULL)
968 goto INVALID_RESPONSE;
969 index += 1;
970 processInfo->connection_info->minor = atoi(index);
971
972 index = (char *) memchr(index, '\0', len);
973 if (index == NULL)
974 goto INVALID_RESPONSE;
975 index += 1;
976 processInfo->connection_info->counter = atoi(index);
977
978 index = (char *) memchr(index, '\0', len);
979 if (index == NULL)
980 goto INVALID_RESPONSE;
981 index += 1;
982 processInfo->connection_info->backend_id = atoi(index);
983
984 index = (char *) memchr(index, '\0', len);
985 if (index == NULL)
986 goto INVALID_RESPONSE;
987 index += 1;
988 processInfo->connection_info->pid = atoi(index);
989
990 index = (char *) memchr(index, '\0', len);
991 if (index == NULL)
992 goto INVALID_RESPONSE;
993 index += 1;
994 processInfo->connection_info->connected = atoi(index);
995
996 if (setNextResultBinaryData(pcpConn->pcpResInfo, (void *) processInfo, sizeof(ProcessInfo), free_processInfo) < 0)
997 goto INVALID_RESPONSE;
998
999 return;
1000 }
1001
1002 else if (strcmp(buf, "CommandComplete") == 0)
1003 {
1004 setResultStatus(pcpConn, PCP_RES_COMMAND_OK);
1005 return;
1006 }
1007
1008 INVALID_RESPONSE:
1009
1010 if (processInfo)
1011 {
1012 if (processInfo->connection_info)
1013 pfree(processInfo->connection_info);
1014 pfree(processInfo);
1015 }
1016 pcp_internal_error(pcpConn,
1017 "command failed. invalid response");
1018 setResultStatus(pcpConn, PCP_RES_BAD_RESPONSE);
1019 }
1020
1021 /* --------------------------------
1022 * pcp_process_info - get information of node pointed by given argument
1023 *
1024 * return structure of process information on success, -1 otherwise
1025 * --------------------------------
1026 */
1027 PCPResultInfo *
pcp_process_info(PCPConnInfo * pcpConn,int pid)1028 pcp_process_info(PCPConnInfo * pcpConn, int pid)
1029 {
1030 int wsize;
1031 char process_id[16];
1032
1033 if (PCPConnectionStatus(pcpConn) != PCP_CONNECTION_OK)
1034 {
1035 pcp_internal_error(pcpConn, "invalid PCP connection");
1036 return NULL;
1037 }
1038
1039 snprintf(process_id, sizeof(process_id), "%d", pid);
1040
1041 pcp_write(pcpConn->pcpConn, "P", 1);
1042 wsize = htonl(strlen(process_id) + 1 + sizeof(int));
1043 pcp_write(pcpConn->pcpConn, &wsize, sizeof(int));
1044 pcp_write(pcpConn->pcpConn, process_id, strlen(process_id) + 1);
1045 if (PCPFlush(pcpConn) < 0)
1046 return NULL;
1047 if (pcpConn->Pfdebug)
1048 fprintf(pcpConn->Pfdebug, "DEBUG: send: tos=\"P\", len=%d\n", ntohl(wsize));
1049
1050 return process_pcp_response(pcpConn, 'P');
1051 }
1052
1053 /* --------------------------------
1054 * pcp_detach_node - detach a node given by the argument from pgpool's control
1055 *
1056 * return 0 on success, -1 otherwise
1057 * --------------------------------
1058 */
1059 PCPResultInfo *
pcp_detach_node(PCPConnInfo * pcpConn,int nid)1060 pcp_detach_node(PCPConnInfo * pcpConn, int nid)
1061 {
1062 return _pcp_detach_node(pcpConn, nid, FALSE);
1063 }
1064
1065 /* --------------------------------
1066
1067 * and detach a node given by the argument from pgpool's control
1068 *
1069 * return 0 on success, -1 otherwise
1070 * --------------------------------
1071 */
1072 PCPResultInfo *
pcp_detach_node_gracefully(PCPConnInfo * pcpConn,int nid)1073 pcp_detach_node_gracefully(PCPConnInfo * pcpConn, int nid)
1074 {
1075 return _pcp_detach_node(pcpConn, nid, TRUE);
1076 }
1077
1078 static PCPResultInfo *
_pcp_detach_node(PCPConnInfo * pcpConn,int nid,bool gracefully)1079 _pcp_detach_node(PCPConnInfo * pcpConn, int nid, bool gracefully)
1080 {
1081 int wsize;
1082 char node_id[16];
1083 char *sendchar;
1084
1085 if (PCPConnectionStatus(pcpConn) != PCP_CONNECTION_OK)
1086 {
1087 pcp_internal_error(pcpConn, "invalid PCP connection");
1088 return NULL;
1089 }
1090
1091 snprintf(node_id, sizeof(node_id), "%d", nid);
1092
1093 if (gracefully)
1094 sendchar = "d";
1095 else
1096 sendchar = "D";
1097
1098 pcp_write(pcpConn->pcpConn, sendchar, 1);
1099 wsize = htonl(strlen(node_id) + 1 + sizeof(int));
1100 pcp_write(pcpConn->pcpConn, &wsize, sizeof(int));
1101 pcp_write(pcpConn->pcpConn, node_id, strlen(node_id) + 1);
1102 if (PCPFlush(pcpConn) < 0)
1103 return NULL;
1104 if (pcpConn->Pfdebug)
1105 fprintf(pcpConn->Pfdebug, "DEBUG: send: tos=\"D\", len=%d\n", ntohl(wsize));
1106
1107 return process_pcp_response(pcpConn, 'D');
1108 }
1109
1110 static void
process_command_complete_response(PCPConnInfo * pcpConn,char * buf,int len)1111 process_command_complete_response(PCPConnInfo * pcpConn, char *buf, int len)
1112 {
1113 if (strcmp(buf, "CommandComplete") == 0)
1114 {
1115 setCommandSuccessful(pcpConn);
1116 }
1117 else
1118 {
1119 pcp_internal_error(pcpConn,
1120 "command failed with reason: \"%s\"", buf);
1121 setResultStatus(pcpConn, PCP_RES_BAD_RESPONSE);
1122 }
1123 }
1124
1125 /* --------------------------------
1126 * pcp_attach_node - attach a node given by the argument from pgpool's control
1127 *
1128 * return 0 on success, -1 otherwise
1129 * --------------------------------
1130 */
1131 PCPResultInfo *
pcp_attach_node(PCPConnInfo * pcpConn,int nid)1132 pcp_attach_node(PCPConnInfo * pcpConn, int nid)
1133 {
1134 int wsize;
1135 char node_id[16];
1136
1137 if (PCPConnectionStatus(pcpConn) != PCP_CONNECTION_OK)
1138 {
1139 pcp_internal_error(pcpConn, "invalid PCP connection");
1140 return NULL;
1141 }
1142
1143 snprintf(node_id, sizeof(node_id), "%d", nid);
1144
1145 pcp_write(pcpConn->pcpConn, "C", 1);
1146 wsize = htonl(strlen(node_id) + 1 + sizeof(int));
1147 pcp_write(pcpConn->pcpConn, &wsize, sizeof(int));
1148 pcp_write(pcpConn->pcpConn, node_id, strlen(node_id) + 1);
1149 if (PCPFlush(pcpConn) < 0)
1150 return NULL;
1151 if (pcpConn->Pfdebug)
1152 fprintf(pcpConn->Pfdebug, "DEBUG: send: tos=\"C\", len=%d\n", ntohl(wsize));
1153
1154 return process_pcp_response(pcpConn, 'C');
1155 }
1156
1157
1158 /* --------------------------------
1159 * pcp_pool_status - return setup parameters and status
1160 *
1161 * returns and array of POOL_REPORT_CONFIG, NULL otherwise
1162 * --------------------------------
1163 */
1164 static void
process_pool_status_response(PCPConnInfo * pcpConn,char * buf,int len)1165 process_pool_status_response(PCPConnInfo * pcpConn, char *buf, int len)
1166 {
1167 char *index;
1168 POOL_REPORT_CONFIG *status = NULL;
1169
1170 if (strcmp(buf, "ArraySize") == 0)
1171 {
1172 int ci_size;
1173
1174 index = (char *) memchr(buf, '\0', len) + 1;
1175 ci_size = ntohl(*((int *) index));
1176
1177 setResultStatus(pcpConn, PCP_RES_INCOMPLETE);
1178 setResultSlotCount(pcpConn, ci_size);
1179 pcpConn->pcpResInfo->nextFillSlot = 0;
1180 return;
1181 }
1182 else if (strcmp(buf, "ProcessConfig") == 0)
1183 {
1184 if (PCPResultStatus(pcpConn->pcpResInfo) != PCP_RES_INCOMPLETE)
1185 goto INVALID_RESPONSE;
1186
1187 status = palloc(sizeof(POOL_REPORT_CONFIG));
1188
1189 index = (char *) memchr(buf, '\0', len);
1190 if (index == NULL)
1191 goto INVALID_RESPONSE;
1192 index += 1;
1193 strlcpy(status->name, index, POOLCONFIG_MAXNAMELEN + 1);
1194
1195 index = (char *) memchr(index, '\0', len);
1196 if (index == NULL)
1197 goto INVALID_RESPONSE;
1198 index += 1;
1199 strlcpy(status->value, index, POOLCONFIG_MAXVALLEN + 1);
1200
1201 index = (char *) memchr(index, '\0', len);
1202 if (index == NULL)
1203 goto INVALID_RESPONSE;
1204 index += 1;
1205 strlcpy(status->desc, index, POOLCONFIG_MAXDESCLEN + 1);
1206
1207 if (setNextResultBinaryData(pcpConn->pcpResInfo, (void *) status, sizeof(POOL_REPORT_CONFIG), NULL) < 0)
1208 goto INVALID_RESPONSE;
1209 return;
1210 }
1211 else if (strcmp(buf, "CommandComplete") == 0)
1212 {
1213 setResultStatus(pcpConn, PCP_RES_COMMAND_OK);
1214 return;
1215 }
1216
1217 INVALID_RESPONSE:
1218
1219 if (status)
1220 pfree(status);
1221 pcp_internal_error(pcpConn,
1222 "command failed. invalid response");
1223 setResultStatus(pcpConn, PCP_RES_BAD_RESPONSE);
1224 }
1225
1226 PCPResultInfo *
pcp_pool_status(PCPConnInfo * pcpConn)1227 pcp_pool_status(PCPConnInfo * pcpConn)
1228 {
1229 int wsize;
1230
1231 if (PCPConnectionStatus(pcpConn) != PCP_CONNECTION_OK)
1232 {
1233 pcp_internal_error(pcpConn, "invalid PCP connection");
1234 return NULL;
1235 }
1236
1237 pcp_write(pcpConn->pcpConn, "B", 1);
1238 wsize = htonl(sizeof(int));
1239 pcp_write(pcpConn->pcpConn, &wsize, sizeof(int));
1240 if (PCPFlush(pcpConn) < 0)
1241 return NULL;
1242 if (pcpConn->Pfdebug)
1243 fprintf(pcpConn->Pfdebug, "DEBUG pcp_pool_status: send: tos=\"B\", len=%d\n", ntohl(wsize));
1244 return process_pcp_response(pcpConn, 'B');
1245 }
1246
1247
1248 PCPResultInfo *
pcp_recovery_node(PCPConnInfo * pcpConn,int nid)1249 pcp_recovery_node(PCPConnInfo * pcpConn, int nid)
1250 {
1251 int wsize;
1252 char node_id[16];
1253
1254 if (PCPConnectionStatus(pcpConn) != PCP_CONNECTION_OK)
1255 {
1256 pcp_internal_error(pcpConn, "invalid PCP connection");
1257 return NULL;
1258 }
1259
1260 snprintf(node_id, sizeof(node_id), "%d", nid);
1261
1262 pcp_write(pcpConn->pcpConn, "O", 1);
1263 wsize = htonl(strlen(node_id) + 1 + sizeof(int));
1264 pcp_write(pcpConn->pcpConn, &wsize, sizeof(int));
1265 pcp_write(pcpConn->pcpConn, node_id, strlen(node_id) + 1);
1266 if (PCPFlush(pcpConn) < 0)
1267 return NULL;
1268 if (pcpConn->Pfdebug)
1269 fprintf(pcpConn->Pfdebug, "DEBUG: send: tos=\"D\", len=%d\n", ntohl(wsize));
1270
1271 return process_pcp_response(pcpConn, 'O');
1272 }
1273
1274 /* --------------------------------
1275 * pcp_promote_node - promote a node given by the argument as new pgpool's master
1276 *
1277 * return 0 on success, -1 otherwise
1278 * --------------------------------
1279 */
1280 PCPResultInfo *
pcp_promote_node(PCPConnInfo * pcpConn,int nid)1281 pcp_promote_node(PCPConnInfo * pcpConn, int nid)
1282 {
1283 return _pcp_promote_node(pcpConn, nid, FALSE);
1284 }
1285
1286 /* --------------------------------
1287
1288 * and promote a node given by the argument as new pgpool's master
1289 *
1290 * return 0 on success, -1 otherwise
1291 * --------------------------------
1292 */
1293 PCPResultInfo *
pcp_promote_node_gracefully(PCPConnInfo * pcpConn,int nid)1294 pcp_promote_node_gracefully(PCPConnInfo * pcpConn, int nid)
1295 {
1296 return _pcp_promote_node(pcpConn, nid, TRUE);
1297 }
1298
1299 static PCPResultInfo *
_pcp_promote_node(PCPConnInfo * pcpConn,int nid,bool gracefully)1300 _pcp_promote_node(PCPConnInfo * pcpConn, int nid, bool gracefully)
1301 {
1302 int wsize;
1303 char node_id[16];
1304 char *sendchar;
1305
1306 if (PCPConnectionStatus(pcpConn) != PCP_CONNECTION_OK)
1307 {
1308 pcp_internal_error(pcpConn, "invalid PCP connection");
1309 return NULL;
1310 }
1311
1312 snprintf(node_id, sizeof(node_id), "%d", nid);
1313
1314 if (gracefully)
1315 sendchar = "j";
1316 else
1317 sendchar = "J";
1318
1319 pcp_write(pcpConn->pcpConn, sendchar, 1);
1320 wsize = htonl(strlen(node_id) + 1 + sizeof(int));
1321 pcp_write(pcpConn->pcpConn, &wsize, sizeof(int));
1322 pcp_write(pcpConn->pcpConn, node_id, strlen(node_id) + 1);
1323 if (PCPFlush(pcpConn) < 0)
1324 return NULL;
1325 if (pcpConn->Pfdebug)
1326 fprintf(pcpConn->Pfdebug, "DEBUG: send: tos=\"E\", len=%d\n", ntohl(wsize));
1327
1328 return process_pcp_response(pcpConn, 'J');
1329 }
1330
1331 static void
process_watchdog_info_response(PCPConnInfo * pcpConn,char * buf,int len)1332 process_watchdog_info_response(PCPConnInfo * pcpConn, char *buf, int len)
1333 {
1334 char *json_data = NULL;
1335 PCPWDClusterInfo *wd_cluster_info = NULL;
1336 int clusterDataSize = 0;
1337
1338 if (strcmp(buf, "CommandComplete") == 0)
1339 {
1340 int tempVal;
1341 char *ptr;
1342
1343 json_data = (char *) memchr(buf, '\0', len);
1344 if (json_data == NULL)
1345 goto INVALID_RESPONSE;
1346 json_data += 1;
1347
1348 json_value *root;
1349 json_value *value;
1350 int i,
1351 nodeCount;
1352
1353 root = json_parse(json_data, len);
1354
1355 /* The root node must be object */
1356 if (root == NULL || root->type != json_object)
1357 {
1358 json_value_free(root);
1359 goto INVALID_RESPONSE;
1360 }
1361
1362 if (json_get_int_value_for_key(root, "NodeCount", &nodeCount))
1363 {
1364 json_value_free(root);
1365 goto INVALID_RESPONSE;
1366 }
1367
1368 /* find the WatchdogNodes array */
1369 value = json_get_value_for_key(root, "WatchdogNodes");
1370 if (value == NULL)
1371 {
1372 json_value_free(root);
1373 goto INVALID_RESPONSE;
1374 }
1375 if (value->type != json_array)
1376 {
1377 json_value_free(root);
1378 goto INVALID_RESPONSE;
1379 }
1380 if (nodeCount != value->u.array.length)
1381 {
1382 json_value_free(root);
1383 goto INVALID_RESPONSE;
1384 }
1385
1386 /* create the cluster object */
1387 clusterDataSize = sizeof(PCPWDClusterInfo) + (sizeof(PCPWDNodeInfo) * nodeCount);
1388 wd_cluster_info = malloc(clusterDataSize);
1389
1390 wd_cluster_info->nodeCount = nodeCount;
1391
1392 if (json_get_int_value_for_key(root, "RemoteNodeCount", &wd_cluster_info->remoteNodeCount))
1393 {
1394 json_value_free(root);
1395 goto INVALID_RESPONSE;
1396 }
1397 if (json_get_int_value_for_key(root, "QuorumStatus", &wd_cluster_info->quorumStatus))
1398 {
1399 json_value_free(root);
1400 goto INVALID_RESPONSE;
1401 }
1402 if (json_get_int_value_for_key(root, "AliveNodeCount", &wd_cluster_info->aliveNodeCount))
1403 {
1404 json_value_free(root);
1405 goto INVALID_RESPONSE;
1406 }
1407 if (json_get_int_value_for_key(root, "Escalated", &tempVal))
1408 {
1409 json_value_free(root);
1410 goto INVALID_RESPONSE;
1411 }
1412 wd_cluster_info->escalated = tempVal == 0 ? false : true;
1413
1414 ptr = json_get_string_value_for_key(root, "MasterNodeName");
1415 if (ptr == NULL)
1416 {
1417 json_value_free(root);
1418 goto INVALID_RESPONSE;
1419 }
1420 strncpy(wd_cluster_info->masterNodeName, ptr, sizeof(wd_cluster_info->masterNodeName) - 1);
1421
1422 ptr = json_get_string_value_for_key(root, "MasterHostName");
1423 if (ptr == NULL)
1424 {
1425 json_value_free(root);
1426 goto INVALID_RESPONSE;
1427 }
1428 strncpy(wd_cluster_info->masterHostName, ptr, sizeof(wd_cluster_info->masterHostName) - 1);
1429
1430 /* Get watchdog nodes data */
1431 for (i = 0; i < nodeCount; i++)
1432 {
1433 char *ptr;
1434 json_value *nodeInfoValue = value->u.array.values[i];
1435 PCPWDNodeInfo *wdNodeInfo = &wd_cluster_info->nodeList[i];
1436
1437 if (nodeInfoValue->type != json_object)
1438 {
1439 json_value_free(root);
1440 goto INVALID_RESPONSE;
1441 }
1442
1443 if (json_get_int_value_for_key(nodeInfoValue, "ID", &wdNodeInfo->id))
1444 {
1445 json_value_free(root);
1446 goto INVALID_RESPONSE;
1447 }
1448
1449 ptr = json_get_string_value_for_key(nodeInfoValue, "NodeName");
1450 if (ptr == NULL)
1451 {
1452 json_value_free(root);
1453 goto INVALID_RESPONSE;
1454 }
1455 strncpy(wdNodeInfo->nodeName, ptr, sizeof(wdNodeInfo->nodeName) - 1);
1456
1457 ptr = json_get_string_value_for_key(nodeInfoValue, "HostName");
1458 if (ptr == NULL)
1459 {
1460 json_value_free(root);
1461 goto INVALID_RESPONSE;
1462 }
1463 strncpy(wdNodeInfo->hostName, ptr, sizeof(wdNodeInfo->hostName) - 1);
1464
1465 ptr = json_get_string_value_for_key(nodeInfoValue, "DelegateIP");
1466 if (ptr == NULL)
1467 {
1468 json_value_free(root);
1469 goto INVALID_RESPONSE;
1470 }
1471 strncpy(wdNodeInfo->delegate_ip, ptr, sizeof(wdNodeInfo->delegate_ip) - 1);
1472
1473 if (json_get_int_value_for_key(nodeInfoValue, "WdPort", &wdNodeInfo->wd_port))
1474 {
1475 json_value_free(root);
1476 goto INVALID_RESPONSE;
1477 }
1478
1479 if (json_get_int_value_for_key(nodeInfoValue, "PgpoolPort", &wdNodeInfo->pgpool_port))
1480 {
1481 json_value_free(root);
1482 goto INVALID_RESPONSE;
1483 }
1484
1485 if (json_get_int_value_for_key(nodeInfoValue, "State", &wdNodeInfo->state))
1486 {
1487 json_value_free(root);
1488 goto INVALID_RESPONSE;
1489 }
1490
1491 ptr = json_get_string_value_for_key(nodeInfoValue, "StateName");
1492 if (ptr == NULL)
1493 {
1494 json_value_free(root);
1495 goto INVALID_RESPONSE;
1496 }
1497 strncpy(wdNodeInfo->stateName, ptr, sizeof(wdNodeInfo->stateName) - 1);
1498
1499 if (json_get_int_value_for_key(nodeInfoValue, "Priority", &wdNodeInfo->wd_priority))
1500 {
1501 json_value_free(root);
1502 goto INVALID_RESPONSE;
1503 }
1504
1505 }
1506 json_value_free(root);
1507
1508 if (setNextResultBinaryData(pcpConn->pcpResInfo, (void *) wd_cluster_info, clusterDataSize, NULL) < 0)
1509 goto INVALID_RESPONSE;
1510
1511 setCommandSuccessful(pcpConn);
1512 }
1513 else
1514 {
1515 pcp_internal_error(pcpConn,
1516 "command failed with reason: \"%s\"\n", buf);
1517 setResultStatus(pcpConn, PCP_RES_BAD_RESPONSE);
1518 }
1519 return;
1520
1521 INVALID_RESPONSE:
1522
1523 if (wd_cluster_info)
1524 pfree(wd_cluster_info);
1525 pcp_internal_error(pcpConn,
1526 "command failed. invalid response\n");
1527 setResultStatus(pcpConn, PCP_RES_BAD_RESPONSE);
1528 }
1529
1530 /* --------------------------------
1531 * pcp_watchdog_info - get information of watchdog
1532 *
1533 * return structure of watchdog information on success, -1 otherwise
1534 * --------------------------------
1535 */
1536 PCPResultInfo *
pcp_watchdog_info(PCPConnInfo * pcpConn,int nid)1537 pcp_watchdog_info(PCPConnInfo * pcpConn, int nid)
1538 {
1539 int wsize;
1540 char wd_index[16];
1541
1542 if (PCPConnectionStatus(pcpConn) != PCP_CONNECTION_OK)
1543 {
1544 pcp_internal_error(pcpConn, "invalid PCP connection");
1545 return NULL;
1546 }
1547
1548 snprintf(wd_index, sizeof(wd_index), "%d", nid);
1549
1550 pcp_write(pcpConn->pcpConn, "W", 1);
1551 wsize = htonl(strlen(wd_index) + 1 + sizeof(int));
1552 pcp_write(pcpConn->pcpConn, &wsize, sizeof(int));
1553 pcp_write(pcpConn->pcpConn, wd_index, strlen(wd_index) + 1);
1554 if (PCPFlush(pcpConn) < 0)
1555 return NULL;
1556 if (pcpConn->Pfdebug)
1557 fprintf(pcpConn->Pfdebug, "DEBUG: send: tos=\"W\", len=%d\n", ntohl(wsize));
1558
1559 return process_pcp_response(pcpConn, 'W');
1560 }
1561
1562 PCPResultInfo *
pcp_set_backend_parameter(PCPConnInfo * pcpConn,char * parameter_name,char * value)1563 pcp_set_backend_parameter(PCPConnInfo * pcpConn, char *parameter_name, char *value)
1564 {
1565 int wsize;
1566 char null_chr = 0;
1567
1568 if (PCPConnectionStatus(pcpConn) != PCP_CONNECTION_OK)
1569 {
1570 pcp_internal_error(pcpConn, "invalid PCP connection");
1571 return NULL;
1572 }
1573 if (pcpConn->Pfdebug)
1574 fprintf(pcpConn->Pfdebug, "DEBUG: seting: \"%s = %s\"\n", parameter_name, value);
1575
1576 pcp_write(pcpConn->pcpConn, "A", 1);
1577 wsize = htonl(strlen(parameter_name) + 1 +
1578 strlen(value) + 1 +
1579 sizeof(int));
1580 pcp_write(pcpConn->pcpConn, &wsize, sizeof(int));
1581 pcp_write(pcpConn->pcpConn, parameter_name, strlen(parameter_name));
1582 pcp_write(pcpConn->pcpConn, &null_chr, 1);
1583 pcp_write(pcpConn->pcpConn, value, strlen(value));
1584 pcp_write(pcpConn->pcpConn, &null_chr, 1);
1585 if (PCPFlush(pcpConn) < 0)
1586 return NULL;
1587 if (pcpConn->Pfdebug)
1588 fprintf(pcpConn->Pfdebug, "DEBUG: send: tos=\"A\", len=%d\n", ntohl(wsize));
1589
1590 return process_pcp_response(pcpConn, 'A');
1591 }
1592
1593 /*
1594 * pcpAddInternalNotice - produce an internally-generated notice message
1595 *
1596 * A format string and optional arguments can be passed.
1597 *
1598 * The supplied text is taken as primary message (ie., it should not include
1599 * a trailing newline, and should not be more than one line).
1600 */
1601 static void
pcp_internal_error(PCPConnInfo * pcpConn,const char * fmt,...)1602 pcp_internal_error(PCPConnInfo * pcpConn, const char *fmt,...)
1603 {
1604 char msgBuf[1024];
1605 va_list args;
1606
1607 if (pcpConn == NULL)
1608 return; /* nobody home to receive notice? */
1609
1610 /* Format the message */
1611 va_start(args, fmt);
1612 vsnprintf(msgBuf, sizeof(msgBuf), fmt, args);
1613 va_end(args);
1614 msgBuf[sizeof(msgBuf) - 1] = '\0'; /* make real sure it's terminated */
1615 if (pcpConn->errMsg)
1616 pfree(pcpConn->errMsg);
1617 pcpConn->errMsg = pstrdup(msgBuf);
1618 }
1619
1620 ConnStateType
PCPConnectionStatus(const PCPConnInfo * conn)1621 PCPConnectionStatus(const PCPConnInfo * conn)
1622 {
1623 if (!conn)
1624 return PCP_CONNECTION_BAD;
1625 return conn->connState;
1626 }
1627
1628 ResultStateType
PCPResultStatus(const PCPResultInfo * res)1629 PCPResultStatus(const PCPResultInfo * res)
1630 {
1631 if (!res)
1632 return PCP_RES_ERROR;
1633 return res->resultStatus;
1634 }
1635
1636 static void
setResultStatus(PCPConnInfo * pcpConn,ResultStateType resultState)1637 setResultStatus(PCPConnInfo * pcpConn, ResultStateType resultState)
1638 {
1639 if (pcpConn && pcpConn->pcpResInfo)
1640 pcpConn->pcpResInfo->resultStatus = resultState;
1641 }
1642
1643 static void
setCommandSuccessful(PCPConnInfo * pcpConn)1644 setCommandSuccessful(PCPConnInfo * pcpConn)
1645 {
1646 setResultStatus(pcpConn, PCP_RES_COMMAND_OK);
1647 }
1648
1649 static void
setResultSlotCount(PCPConnInfo * pcpConn,unsigned int slotCount)1650 setResultSlotCount(PCPConnInfo * pcpConn, unsigned int slotCount)
1651 {
1652 if (pcpConn && pcpConn->pcpResInfo && slotCount > 0)
1653 {
1654 if (pcpConn->pcpResInfo->resultSlots == 0)
1655 pcpConn->pcpResInfo->resultSlots = 1;
1656
1657 if (slotCount > pcpConn->pcpResInfo->resultSlots)
1658 {
1659 pcpConn->pcpResInfo = repalloc(pcpConn->pcpResInfo, sizeof(PCPResultInfo) + (sizeof(PCPResultSlot) * (slotCount - 1)));
1660 }
1661 pcpConn->pcpResInfo->resultSlots = slotCount;
1662 }
1663 }
1664
1665 static void
setResultIntData(PCPResultInfo * res,unsigned int slotno,int value)1666 setResultIntData(PCPResultInfo * res, unsigned int slotno, int value)
1667 {
1668 if (res)
1669 {
1670 res->resultSlot[slotno].datalen = 0;
1671 res->resultSlot[slotno].isint = 1;
1672 res->resultSlot[slotno].data.integer = value;
1673 res->resultSlot[slotno].free_func = NULL;
1674 }
1675 }
1676
1677 static void
setResultBinaryData(PCPResultInfo * res,unsigned int slotno,void * value,int datalen,void (* free_func)(struct PCPConnInfo *,void *))1678 setResultBinaryData(PCPResultInfo * res, unsigned int slotno, void *value, int datalen, void (*free_func) (struct PCPConnInfo *, void *))
1679 {
1680 if (res)
1681 {
1682 res->resultSlot[slotno].datalen = datalen;
1683 res->resultSlot[slotno].isint = 0;
1684 res->resultSlot[slotno].data.ptr = value;
1685 res->resultSlot[slotno].free_func = free_func;
1686 }
1687 }
1688
1689 static int
setNextResultBinaryData(PCPResultInfo * res,void * value,int datalen,void (* free_func)(struct PCPConnInfo *,void *))1690 setNextResultBinaryData(PCPResultInfo * res, void *value, int datalen, void (*free_func) (struct PCPConnInfo *, void *))
1691 {
1692 if (res && res->nextFillSlot < res->resultSlots)
1693 {
1694 setResultBinaryData(res, res->nextFillSlot, value, datalen, free_func);
1695 res->nextFillSlot++;
1696 return res->nextFillSlot;
1697 }
1698 return -1;
1699 }
1700
1701 static int
PCPFlush(PCPConnInfo * pcpConn)1702 PCPFlush(PCPConnInfo * pcpConn)
1703 {
1704 int ret = pcp_flush(pcpConn->pcpConn);
1705
1706 if (ret)
1707 pcp_internal_error(pcpConn,
1708 "ERROR: sending data to backend failed with error \"%s\"", strerror(errno));
1709 return ret;
1710 }
1711
1712 int
pcp_result_slot_count(PCPResultInfo * res)1713 pcp_result_slot_count(PCPResultInfo * res)
1714 {
1715 if (res)
1716 return res->resultSlots;
1717 return 0;
1718 }
1719
1720 /* Returns 1 if ResultInfo has no data. 0 otherwise */
1721 int
pcp_result_is_empty(PCPResultInfo * res)1722 pcp_result_is_empty(PCPResultInfo * res)
1723 {
1724 if (res)
1725 {
1726 if (res->resultSlots <= 1 && res->resultSlot[0].isint == 0 && res->resultSlot[0].datalen <= 0)
1727 return 1;
1728 return 0;
1729 }
1730 return 1;
1731 }
1732
1733 void *
pcp_get_binary_data(const PCPResultInfo * res,unsigned int slotno)1734 pcp_get_binary_data(const PCPResultInfo * res, unsigned int slotno)
1735 {
1736 if (res && slotno < res->resultSlots && !res->resultSlot[slotno].isint)
1737 {
1738 return res->resultSlot[slotno].data.ptr;
1739 }
1740 return NULL;
1741 }
1742
1743 int
pcp_get_int_data(const PCPResultInfo * res,unsigned int slotno)1744 pcp_get_int_data(const PCPResultInfo * res, unsigned int slotno)
1745 {
1746 if (res && slotno < res->resultSlots && res->resultSlot[slotno].isint)
1747 {
1748 return res->resultSlot[slotno].data.integer;
1749 }
1750 return 0;
1751 }
1752
1753 int
pcp_get_data_length(const PCPResultInfo * res,unsigned int slotno)1754 pcp_get_data_length(const PCPResultInfo * res, unsigned int slotno)
1755 {
1756 if (res && slotno < res->resultSlots)
1757 {
1758 return res->resultSlot[slotno].datalen;
1759 }
1760 return 0;
1761 }
1762
1763 void
pcp_free_result(PCPConnInfo * pcpConn)1764 pcp_free_result(PCPConnInfo * pcpConn)
1765 {
1766 if (pcpConn && pcpConn->pcpResInfo)
1767 {
1768 PCPResultInfo *pcpRes = pcpConn->pcpResInfo;
1769 int i;
1770
1771 for (i = 0; i < pcpRes->resultSlots; i++)
1772 {
1773 if (pcpRes->resultSlot[i].isint)
1774 continue;
1775 if (pcpRes->resultSlot[i].data.ptr == NULL)
1776 continue;
1777
1778 if (pcpRes->resultSlot[i].free_func)
1779 pcpRes->resultSlot[i].free_func(pcpConn, pcpRes->resultSlot[i].data.ptr);
1780 else
1781 pfree(pcpRes->resultSlot[i].data.ptr);
1782 pcpRes->resultSlot[i].data.ptr = NULL;
1783 }
1784 pfree(pcpConn->pcpResInfo);
1785 pcpConn->pcpResInfo = NULL;
1786 }
1787 }
1788
1789 void
pcp_free_connection(PCPConnInfo * pcpConn)1790 pcp_free_connection(PCPConnInfo * pcpConn)
1791 {
1792 if (pcpConn)
1793 {
1794 pcp_free_result(pcpConn);
1795 if (pcpConn->errMsg)
1796 pfree(pcpConn->errMsg);
1797 /* Should we also Disconnect it? */
1798 pfree(pcpConn);
1799 }
1800 }
1801
1802 char *
pcp_get_last_error(PCPConnInfo * pcpConn)1803 pcp_get_last_error(PCPConnInfo * pcpConn)
1804 {
1805 if (pcpConn)
1806 return pcpConn->errMsg;
1807 return NULL;
1808 }
1809
1810 /*
1811 * get the password file name which could be either pointed by PCPPASSFILE
1812 * environment variable or resides in user home directory.
1813 */
1814 static bool
getPoolPassFilename(char * pgpassfile)1815 getPoolPassFilename(char *pgpassfile)
1816 {
1817 char *passfile_env;
1818
1819 if ((passfile_env = getenv("PCPPASSFILE")) != NULL)
1820 {
1821 /* use the literal path from the environment, if set */
1822 strlcpy(pgpassfile, passfile_env, MAXPGPATH);
1823 }
1824 else
1825 {
1826 char homedir[MAXPGPATH];
1827
1828 if (!get_home_directory(homedir, sizeof(homedir)))
1829 return false;
1830 snprintf(pgpassfile, MAXPGPATH + sizeof(PCPPASSFILE) + 1, "%s/%s", homedir, PCPPASSFILE);
1831 }
1832 return true;
1833 }
1834
1835 /*
1836 * Get a password from the password file. Return value is malloc'd.
1837 * format = hostname:port:username:password
1838 */
1839 static char *
PasswordFromFile(PCPConnInfo * pcpConn,char * hostname,char * port,char * username)1840 PasswordFromFile(PCPConnInfo * pcpConn, char *hostname, char *port, char *username)
1841 {
1842 FILE *fp;
1843 char pgpassfile[MAXPGPATH + sizeof(PCPPASSFILE) + 1];
1844 struct stat stat_buf;
1845 #define LINELEN NAMEDATALEN*5
1846 char buf[LINELEN];
1847
1848 if (username == NULL || strlen(username) == 0)
1849 return NULL;
1850
1851 if (hostname == NULL)
1852 hostname = DefaultHost;
1853 else if (strcmp(hostname, UNIX_DOMAIN_PATH) == 0)
1854 hostname = DefaultHost;
1855
1856 if (!getPoolPassFilename(pgpassfile))
1857 return NULL;
1858
1859 /* If password file cannot be opened, ignore it. */
1860 if (stat(pgpassfile, &stat_buf) != 0)
1861 return NULL;
1862
1863 if (!S_ISREG(stat_buf.st_mode))
1864 {
1865 if (pcpConn->Pfdebug)
1866 fprintf(pcpConn->Pfdebug, "WARNING: password file \"%s\" is not a plain file\n", pgpassfile);
1867 return NULL;
1868 }
1869
1870 /* If password file is insecure, alert the user and ignore it. */
1871 if (stat_buf.st_mode & (S_IRWXG | S_IRWXO))
1872 {
1873 if (pcpConn->Pfdebug)
1874 fprintf(pcpConn->Pfdebug, "WARNING: password file \"%s\" has group or world access; permissions should be u=rw (0600) or less\n",
1875 pgpassfile);
1876 return NULL;
1877 }
1878
1879 fp = fopen(pgpassfile, "r");
1880 if (fp == NULL)
1881 return NULL;
1882
1883 while (!feof(fp) && !ferror(fp))
1884 {
1885 char *t = buf,
1886 *ret,
1887 *p1,
1888 *p2;
1889 int len;
1890
1891 if (fgets(buf, sizeof(buf), fp) == NULL)
1892 break;
1893
1894 len = strlen(buf);
1895 if (len == 0)
1896 continue;
1897
1898 /* Remove trailing newline */
1899 if (buf[len - 1] == '\n')
1900 buf[len - 1] = 0;
1901
1902 if ((t = pwdfMatchesString(t, hostname)) == NULL ||
1903 (t = pwdfMatchesString(t, port)) == NULL ||
1904 (t = pwdfMatchesString(t, username)) == NULL)
1905 continue;
1906 ret = pstrdup(t);
1907 fclose(fp);
1908
1909 /* De-escape password. */
1910 for (p1 = p2 = ret; *p1 != ':' && *p1 != '\0'; ++p1, ++p2)
1911 {
1912 if (*p1 == '\\' && p1[1] != '\0')
1913 ++p1;
1914 *p2 = *p1;
1915 }
1916 *p2 = '\0';
1917
1918 return ret;
1919 }
1920
1921 fclose(fp);
1922 return NULL;
1923
1924 #undef LINELEN
1925 }
1926
/*
 * Helper function for PasswordFromFile borrowed from PG
 *
 * Compares the leading field of buf (the text up to the first unescaped
 * ':') with token.  A backslash in buf makes the following character
 * literal.  A field consisting of just "*" matches any token.
 *
 * Returns a pointer to the character after the field's ':' on a match, or
 * NULL when the field does not match, when no ':' ends it, or when either
 * argument is NULL.
 */
static char *
pwdfMatchesString(char *buf, char *token)
{
	char	   *cursor;
	char	   *want;

	if (buf == NULL || token == NULL)
		return NULL;

	/* wildcard field */
	if (buf[0] == '*' && buf[1] == ':')
		return buf + 2;

	for (cursor = buf, want = token; *cursor != '\0'; cursor++, want++)
	{
		bool		escaped = false;

		if (*cursor == '\\')
		{
			/* look at the escaped character instead of the backslash */
			cursor++;
			escaped = true;
		}

		/* token used up: only an unescaped ':' completes the match */
		if (*want == '\0')
			return (*cursor == ':' && !escaped) ? cursor + 1 : NULL;

		if (*cursor != *want)
			return NULL;
	}

	return NULL;
}
1967