1 /*
2  * $Header$
3  *
4  * Handles PCP connection, and protocol communication with pgpool-II
5  * These are client APIs. Server program should use APIs in pcp_stream.c
6  *
7  *
8  * pgpool: a language independent connection pool server for PostgreSQL
9  * written by Tatsuo Ishii
10  *
11  * Copyright (c) 2003-2021	PgPool Global Development Group
12  *
13  * Permission to use, copy, modify, and distribute this software and
14  * its documentation for any purpose and without fee is hereby
15  * granted, provided that the above copyright notice appear in all
16  * copies and that both that copyright notice and this permission
17  * notice appear in supporting documentation, and that the name of the
18  * author not be used in advertising or publicity pertaining to
19  * distribution of the software without specific, written prior
20  * permission. The author makes no representations about the
21  * suitability of this software for any purpose.  It is provided "as
22  * is" without express or implied warranty.
23  *
24  */
25 
26 #include <stdio.h>
27 #include <errno.h>
28 #include <string.h>
29 #include <stdlib.h>
30 #include <sys/types.h>
31 #include <sys/stat.h>
32 #include <sys/socket.h>
33 #include <sys/time.h>
34 #include <sys/un.h>
35 #include <netinet/in.h>
36 #include <netinet/tcp.h>
37 #include <netdb.h>
38 #include <unistd.h>
39 #include <stdarg.h>
40 
41 #include "pool.h"
42 #include "pcp/pcp.h"
43 #include "pcp/pcp_stream.h"
44 #include "utils/pool_path.h"
45 #include "utils/palloc.h"
46 #include "utils/pool_process_reporting.h"
47 #include "utils/json.h"
48 #include "auth/md5.h"
49 
50 #define PCPPASSFILE ".pcppass"
51 #define DefaultHost  "localhost"
52 
53 
54 static int	pcp_authorize(PCPConnInfo * pcpConn, char *username, char *password);
55 
56 static void pcp_internal_error(PCPConnInfo * pcpConn, const char *fmt,...);
57 
58 static PCPResultInfo * _pcp_detach_node(PCPConnInfo * pcpConn, int nid, bool gracefully);
59 static PCPResultInfo * _pcp_promote_node(PCPConnInfo * pcpConn, int nid, bool gracefully, bool promote);
60 static PCPResultInfo * process_pcp_response(PCPConnInfo * pcpConn, char sentMsg);
61 static void setCommandSuccessful(PCPConnInfo * pcpConn);
62 static void setResultStatus(PCPConnInfo * pcpConn, ResultStateType resultState);
63 static void setResultBinaryData(PCPResultInfo * res, unsigned int slotno, void *value, int datalen, void (*free_func) (struct PCPConnInfo *, void *));
64 static int	setNextResultBinaryData(PCPResultInfo * res, void *value, int datalen, void (*free_func) (struct PCPConnInfo *, void *));
65 static void setResultIntData(PCPResultInfo * res, unsigned int slotno, int value);
66 
67 static void process_node_info_response(PCPConnInfo * pcpConn, char *buf, int len);
68 static void	process_health_check_stats_response(PCPConnInfo * pcpConn, char *buf, int len);
69 static void process_command_complete_response(PCPConnInfo * pcpConn, char *buf, int len);
70 static void process_watchdog_info_response(PCPConnInfo * pcpConn, char *buf, int len);
71 static void process_process_info_response(PCPConnInfo * pcpConn, char *buf, int len);
72 static void process_pool_status_response(PCPConnInfo * pcpConn, char *buf, int len);
73 static void process_pcp_node_count_response(PCPConnInfo * pcpConn, char *buf, int len);
74 static void process_process_count_response(PCPConnInfo * pcpConn, char *buf, int len);
75 static void process_salt_info_response(PCPConnInfo * pcpConn, char *buf, int len);
76 static void process_error_response(PCPConnInfo * pcpConn, char toc, char *buff);
77 
78 
79 static void setResultSlotCount(PCPConnInfo * pcpConn, unsigned int slotCount);
80 static int	PCPFlush(PCPConnInfo * pcpConn);
81 
82 static bool getPoolPassFilename(char *pgpassfile);
83 static char *PasswordFromFile(PCPConnInfo * pcpConn, char *hostname, char *port, char *username);
84 static char *pwdfMatchesString(char *buf, char *token);
85 
86 /* --------------------------------
87  * pcp_connect - open connection to pgpool using given arguments
88  *
89  * return 0 on success, -1 otherwise
90  * --------------------------------
91  */
92 
93 /* Check if PCP connection is connected and authenticated
94  * return 1 on successful 0 otherwise
95  */
96 
97 PCPConnInfo *
pcp_connect(char * hostname,int port,char * username,char * password,FILE * Pfdebug)98 pcp_connect(char *hostname, int port, char *username, char *password, FILE *Pfdebug)
99 {
100 	struct sockaddr_in addr;
101 	struct sockaddr_un unix_addr;
102 	struct hostent *hp;
103 	char	   *password_from_file = NULL;
104 	char		os_user[256];
105 	PCPConnInfo *pcpConn = palloc0(sizeof(PCPConnInfo));
106 	int			fd;
107 	int			on = 1;
108 	int			len;
109 
110 	pcpConn->connState = PCP_CONNECTION_NOT_CONNECTED;
111 	pcpConn->Pfdebug = Pfdebug;
112 
113 	if (hostname == NULL || *hostname == '\0' || *hostname == '/')
114 	{
115 		char	   *path;
116 
117 		fd = socket(AF_UNIX, SOCK_STREAM, 0);
118 
119 		if (fd < 0)
120 		{
121 			pcp_internal_error(pcpConn,
122 							   "ERROR: failed to create UNIX domain socket. socket error \"%s\"", strerror(errno));
123 			pcpConn->connState = PCP_CONNECTION_BAD;
124 
125 			return pcpConn;
126 		}
127 
128 		memset(&unix_addr, 0, sizeof(unix_addr));
129 		unix_addr.sun_family = AF_UNIX;
130 
131 		if (hostname == NULL || *hostname == '\0')
132 		{
133 			path = UNIX_DOMAIN_PATH;
134 			hostname = path;
135 		}
136 		else
137 		{
138 			path = hostname;
139 		}
140 
141 		snprintf(unix_addr.sun_path, sizeof(unix_addr.sun_path), "%s/.s.PGSQL.%d",
142 				 path, port);
143 
144 		if (connect(fd, (struct sockaddr *) &unix_addr, sizeof(unix_addr)) < 0)
145 		{
146 			close(fd);
147 
148 			pcp_internal_error(pcpConn,
149 							   "ERROR: connection to socket \"%s\" failed with error \"%s\"", unix_addr.sun_path, strerror(errno));
150 			pcpConn->connState = PCP_CONNECTION_BAD;
151 			return pcpConn;
152 		}
153 	}
154 	else
155 	{
156 		fd = socket(AF_INET, SOCK_STREAM, 0);
157 		if (fd < 0)
158 		{
159 			pcp_internal_error(pcpConn,
160 							   "ERROR: failed to create INET domain socket with error \"%s\"", strerror(errno));
161 			pcpConn->connState = PCP_CONNECTION_BAD;
162 			return pcpConn;
163 		}
164 
165 		if (setsockopt(fd, IPPROTO_TCP, TCP_NODELAY,
166 					   (char *) &on, sizeof(on)) < 0)
167 		{
168 			close(fd);
169 
170 			pcp_internal_error(pcpConn,
171 							   "ERROR: set socket option failed with error \"%s\"", strerror(errno));
172 			pcpConn->connState = PCP_CONNECTION_BAD;
173 			return pcpConn;
174 		}
175 
176 		memset((char *) &addr, 0, sizeof(addr));
177 		addr.sin_family = AF_INET;
178 		hp = gethostbyname(hostname);
179 		if ((hp == NULL) || (hp->h_addrtype != AF_INET))
180 		{
181 			close(fd);
182 			pcp_internal_error(pcpConn,
183 							   "ERROR: could not retrieve hostname. gethostbyname failed with error \"%s\"", strerror(errno));
184 			pcpConn->connState = PCP_CONNECTION_BAD;
185 			return pcpConn;
186 
187 		}
188 		memmove((char *) &(addr.sin_addr),
189 				(char *) hp->h_addr,
190 				hp->h_length);
191 		addr.sin_port = htons(port);
192 
193 		len = sizeof(struct sockaddr_in);
194 		if (connect(fd, (struct sockaddr *) &addr, len) < 0)
195 		{
196 			close(fd);
197 			pcp_internal_error(pcpConn,
198 							   "ERROR: connection to host \"%s\" failed with error \"%s\"", hostname, strerror(errno));
199 			pcpConn->connState = PCP_CONNECTION_BAD;
200 			return pcpConn;
201 		}
202 	}
203 
204 	pcpConn->pcpConn = pcp_open(fd);
205 	if (pcpConn->pcpConn == NULL)
206 	{
207 		close(fd);
208 		pcp_internal_error(pcpConn,
209 						   "ERROR: failed to allocate memory");
210 		pcpConn->connState = PCP_CONNECTION_BAD;
211 		return pcpConn;
212 	}
213 	pcpConn->connState = PCP_CONNECTION_CONNECTED;
214 
215 	/*
216 	 * If username is not provided. Use the os user name and do not complain
217 	 * if it (getting os user name) gets failed
218 	 */
219 	if (username == NULL && get_os_username(os_user, sizeof(os_user)))
220 		username = os_user;
221 
222 	/*
223 	 * If password is not provided. lookup in pcppass file
224 	 */
225 	if (password == NULL || *password == '\0')
226 	{
227 		char		port_str[100];
228 
229 		snprintf(port_str, sizeof(port_str), "%d", port);
230 		password_from_file = PasswordFromFile(pcpConn, hostname, port_str, username);
231 		password = password_from_file;
232 	}
233 
234 	if (pcp_authorize(pcpConn, username, password) < 0)
235 	{
236 		pcp_close(pcpConn->pcpConn);
237 		pcpConn->pcpConn = NULL;
238 		pcpConn->connState = PCP_CONNECTION_AUTH_ERROR;
239 	}
240 	else
241 		pcpConn->connState = PCP_CONNECTION_OK;
242 
243 	if (password_from_file)
244 		pfree(password_from_file);
245 
246 	return pcpConn;
247 }
248 
249 static void
process_salt_info_response(PCPConnInfo * pcpConn,char * buf,int len)250 process_salt_info_response(PCPConnInfo * pcpConn, char *buf, int len)
251 {
252 	char	   *salt = palloc((sizeof(char) * 4));
253 
254 	memcpy(salt, buf, 4);
255 	if (setNextResultBinaryData(pcpConn->pcpResInfo, (void *) salt, 4, NULL) < 0)
256 	{
257 		pcp_internal_error(pcpConn,
258 
259 						   "command failed. invalid response");
260 		setResultStatus(pcpConn, PCP_RES_BAD_RESPONSE);
261 	}
262 	else
263 	{
264 		setCommandSuccessful(pcpConn);
265 	}
266 }
267 
268 /* --------------------------------
269  * pcp_authorize - authenticate with pgpool using username and password
270  *
271  * return 0 on success, -1 otherwise
272  * --------------------------------
273  */
274 static int
pcp_authorize(PCPConnInfo * pcpConn,char * username,char * password)275 pcp_authorize(PCPConnInfo * pcpConn, char *username, char *password)
276 {
277 	int			wsize;
278 	char		salt[4];
279 	char	   *salt_ptr;
280 	char		encrypt_buf[(MD5_PASSWD_LEN + 1) * 2];
281 	char		md5[MD5_PASSWD_LEN + 1];
282 	PCPResultInfo *pcpRes;
283 
284 	if (password == NULL)
285 		password = "";
286 
287 	if (username == NULL)
288 		username = "";
289 
290 	if (PCPConnectionStatus(pcpConn) != PCP_CONNECTION_CONNECTED)
291 	{
292 		pcp_internal_error(pcpConn,
293 						   "ERROR: PCP authorization failed. invalid connection state.");
294 		return -1;
295 	}
296 
297 	if (strlen(username) >= MAX_USER_PASSWD_LEN)
298 	{
299 		pcp_internal_error(pcpConn,
300 						   "ERROR: PCP authorization failed. username too long.");
301 		return -1;
302 	}
303 
304 	/* request salt */
305 	pcp_write(pcpConn->pcpConn, "M", 1);
306 	wsize = htonl(sizeof(int));
307 	pcp_write(pcpConn->pcpConn, &wsize, sizeof(int));
308 	if (PCPFlush(pcpConn) < 0)
309 		return -1;
310 
311 	pcpRes = process_pcp_response(pcpConn, 'M');
312 	if (PCPResultStatus(pcpRes) != PCP_RES_COMMAND_OK)
313 		return -1;
314 
315 	salt_ptr = pcp_get_binary_data(pcpRes, 0);
316 	if (salt_ptr == NULL)
317 		return -1;
318 	memcpy(salt, salt_ptr, 4);
319 
320 	/* encrypt password */
321 	pool_md5_hash(password, strlen(password), md5);
322 	md5[MD5_PASSWD_LEN] = '\0';
323 
324 	pool_md5_encrypt(md5, username, strlen(username),
325 					 encrypt_buf + MD5_PASSWD_LEN + 1);
326 	encrypt_buf[(MD5_PASSWD_LEN + 1) * 2 - 1] = '\0';
327 
328 	pool_md5_encrypt(encrypt_buf + MD5_PASSWD_LEN + 1, salt, 4,
329 					 encrypt_buf);
330 	encrypt_buf[MD5_PASSWD_LEN] = '\0';
331 
332 	pcp_write(pcpConn->pcpConn, "R", 1);
333 	wsize = htonl((strlen(username) + 1 + strlen(encrypt_buf) + 1) + sizeof(int));
334 	pcp_write(pcpConn->pcpConn, &wsize, sizeof(int));
335 	pcp_write(pcpConn->pcpConn, username, strlen(username) + 1);
336 	pcp_write(pcpConn->pcpConn, encrypt_buf, strlen(encrypt_buf) + 1);
337 	if (PCPFlush(pcpConn) < 0)
338 		return -1;
339 	pcpRes = process_pcp_response(pcpConn, 'R');
340 	if (PCPResultStatus(pcpRes) != PCP_RES_COMMAND_OK)
341 		return -1;
342 	pcp_free_result(pcpConn);
343 	return 0;
344 }
345 
process_pcp_response(PCPConnInfo * pcpConn,char sentMsg)346 static PCPResultInfo * process_pcp_response(PCPConnInfo * pcpConn, char sentMsg)
347 {
348 	char		toc;
349 	int			rsize;
350 	char	   *buf;
351 
352 	/* create empty result */
353 	if (pcpConn->pcpResInfo == NULL)
354 	{
355 		pcpConn->pcpResInfo = palloc0(sizeof(PCPResultInfo));
356 		pcpConn->pcpResInfo->resultSlots = 1;
357 	}
358 
359 	while (1)
360 	{
361 		if (pcp_read(pcpConn->pcpConn, &toc, 1))
362 		{
363 			pcp_internal_error(pcpConn,
364 							   "ERROR: unable to read data from socket.");
365 			setResultStatus(pcpConn, PCP_RES_ERROR);
366 			return pcpConn->pcpResInfo;
367 		}
368 
369 		if (pcp_read(pcpConn->pcpConn, &rsize, sizeof(int)))
370 		{
371 			pcp_internal_error(pcpConn,
372 							   "ERROR: unable to read data from socket.");
373 			setResultStatus(pcpConn, PCP_RES_ERROR);
374 			return pcpConn->pcpResInfo;
375 		}
376 		rsize = ntohl(rsize);
377 		buf = (char *) palloc(rsize);
378 
379 		if (pcp_read(pcpConn->pcpConn, buf, rsize - sizeof(int)))
380 		{
381 			pfree(buf);
382 			pcp_internal_error(pcpConn,
383 							   "ERROR: unable to read data from socket.");
384 			setResultStatus(pcpConn, PCP_RES_ERROR);
385 			return pcpConn->pcpResInfo;
386 		}
387 
388 		if (pcpConn->Pfdebug)
389 			fprintf(pcpConn->Pfdebug, "DEBUG: recv: tos=\"%c\", len=%d\n", toc, rsize);
390 
391 		switch (toc)
392 		{
393 			case 'r':			/* Authentication Response */
394 				{
395 					if (sentMsg != 'R')
396 					{
397 						setResultStatus(pcpConn, PCP_RES_BAD_RESPONSE);
398 					}
399 					else if (strcmp(buf, "AuthenticationOK") == 0)
400 					{
401 						pcpConn->connState = PCP_CONNECTION_OK;
402 						setResultStatus(pcpConn, PCP_RES_COMMAND_OK);
403 					}
404 					else
405 					{
406 						pcp_internal_error(pcpConn,
407 										   "ERROR: authentication failed. reason=\"%s\"", buf);
408 						setResultStatus(pcpConn, PCP_RES_BACKEND_ERROR);
409 					}
410 				}
411 				break;
412 			case 'm':
413 				if (sentMsg != 'M')
414 					setResultStatus(pcpConn, PCP_RES_BAD_RESPONSE);
415 				else
416 					process_salt_info_response(pcpConn, buf, rsize);
417 				break;
418 
419 			case 'E':
420 				setResultStatus(pcpConn, PCP_RES_BACKEND_ERROR);
421 				process_error_response(pcpConn, toc, buf);
422 				break;
423 
424 			case 'N':
425 				process_error_response(pcpConn, toc, buf);
426 				pfree(buf);
427 				continue;
428 				break;
429 
430 			case 'i':
431 				if (sentMsg != 'I')
432 					setResultStatus(pcpConn, PCP_RES_BAD_RESPONSE);
433 				else
434 					process_node_info_response(pcpConn, buf, rsize);
435 				break;
436 
437 			case 'h':
438 				if (sentMsg != 'H')
439 					setResultStatus(pcpConn, PCP_RES_BAD_RESPONSE);
440 				else
441 					process_health_check_stats_response(pcpConn, buf, rsize);
442 				break;
443 
444 			case 'l':
445 				if (sentMsg != 'L')
446 					setResultStatus(pcpConn, PCP_RES_BAD_RESPONSE);
447 				else
448 					process_pcp_node_count_response(pcpConn, buf, rsize);
449 				break;
450 
451 			case 'c':
452 				if (sentMsg != 'C' && sentMsg != 'O')
453 					setResultStatus(pcpConn, PCP_RES_BAD_RESPONSE);
454 				else
455 					process_command_complete_response(pcpConn, buf, rsize);
456 				break;
457 
458 			case 'd':
459 				if (sentMsg != 'D' && sentMsg != 'J')
460 					setResultStatus(pcpConn, PCP_RES_BAD_RESPONSE);
461 				else
462 					process_command_complete_response(pcpConn, buf, rsize);
463 				break;
464 
465 			case 'a':
466 				if (sentMsg != 'A')
467 					setResultStatus(pcpConn, PCP_RES_BAD_RESPONSE);
468 				else
469 					process_command_complete_response(pcpConn, buf, rsize);
470 				break;
471 
472 			case 'z':
473 				if (sentMsg != 'Z')
474 					setResultStatus(pcpConn, PCP_RES_BAD_RESPONSE);
475 				else
476 					process_command_complete_response(pcpConn, buf, rsize);
477 				break;
478 
479 			case 'w':
480 				if (sentMsg != 'W')
481 					setResultStatus(pcpConn, PCP_RES_BAD_RESPONSE);
482 				else
483 					process_watchdog_info_response(pcpConn, buf, rsize);
484 				break;
485 
486 			case 'p':
487 				if (sentMsg != 'P')
488 					setResultStatus(pcpConn, PCP_RES_BAD_RESPONSE);
489 				else
490 					process_process_info_response(pcpConn, buf, rsize);
491 				break;
492 
493 			case 'n':
494 				if (sentMsg != 'N')
495 					setResultStatus(pcpConn, PCP_RES_BAD_RESPONSE);
496 				else
497 					process_process_count_response(pcpConn, buf, rsize);
498 				break;
499 
500 			case 'b':
501 				if (sentMsg != 'B')
502 					setResultStatus(pcpConn, PCP_RES_BAD_RESPONSE);
503 				else
504 					process_pool_status_response(pcpConn, buf, rsize);
505 				break;
506 
507 			case 't':
508 				if (sentMsg != 'T')
509 					setResultStatus(pcpConn, PCP_RES_BAD_RESPONSE);
510 				else
511 					setResultStatus(pcpConn, PCP_RES_COMMAND_OK);
512 				break;
513 
514 			default:
515 				setResultStatus(pcpConn, PCP_RES_BAD_RESPONSE);
516 				pcp_internal_error(pcpConn,
517 								   "ERROR: invalid PCP packet type =\"%c\"", toc);
518 				break;
519 		}
520 		pfree(buf);
521 		if (PCPResultStatus(pcpConn->pcpResInfo) != PCP_RES_INCOMPLETE)
522 			break;
523 	}
524 	return pcpConn->pcpResInfo;
525 }
526 
527 static void
process_error_response(PCPConnInfo * pcpConn,char toc,char * buf)528 process_error_response(PCPConnInfo * pcpConn, char toc, char *buf)
529 {
530 	/* For time we only support sev, error message and details */
531 	char	   *errorSev = NULL;
532 	char	   *errorMsg = NULL;
533 	char	   *errorDet = NULL;
534 	char	   *e = buf;
535 
536 	if (toc != 'E' && toc != 'N')
537 		return;
538 
539 	while (*e)
540 	{
541 		char		type = *e;
542 
543 		e++;
544 		if (*e == 0)
545 			break;
546 
547 		if (type == 'M')
548 			errorMsg = e;
549 		else if (type == 'S')
550 			errorSev = e;
551 		else if (type == 'D')
552 			errorDet = e;
553 		else
554 			e += strlen(e) + 1;
555 		if (errorDet && errorSev && errorMsg)	/* we have all what we need */
556 			break;
557 	}
558 	if (!errorSev && !errorMsg)
559 		return;
560 
561 	if (toc != 'E')				/* This is not an error report it as debug */
562 	{
563 		if (pcpConn->Pfdebug)
564 			fprintf(pcpConn->Pfdebug,
565 					"BACKEND %s:  %s\n%s%s%s", errorSev, errorMsg,
566 					errorDet ? "DETAIL:  " : "",
567 					errorDet ? errorDet : "",
568 					errorDet ? "\n" : "");
569 	}
570 	else
571 	{
572 		pcp_internal_error(pcpConn,
573 						   "%s:  %s\n%s%s%s", errorSev, errorMsg,
574 						   errorDet ? "DETAIL:  " : "",
575 						   errorDet ? errorDet : "",
576 						   errorDet ? "\n" : "");
577 		setResultStatus(pcpConn, PCP_RES_BACKEND_ERROR);
578 
579 	}
580 }
581 
582 /* --------------------------------
583  * pcp_disconnect - close connection to pgpool
584  * --------------------------------
585  */
586 void
pcp_disconnect(PCPConnInfo * pcpConn)587 pcp_disconnect(PCPConnInfo * pcpConn)
588 {
589 	int			wsize;
590 
591 	if (PCPConnectionStatus(pcpConn) != PCP_CONNECTION_OK)
592 	{
593 		pcp_internal_error(pcpConn, "invalid PCP connection");
594 		return;
595 	}
596 
597 	pcp_write(pcpConn->pcpConn, "X", 1);
598 	wsize = htonl(sizeof(int));
599 	pcp_write(pcpConn->pcpConn, &wsize, sizeof(int));
600 	if (PCPFlush(pcpConn) < 0)
601 		return;
602 	if (pcpConn->Pfdebug)
603 		fprintf(pcpConn->Pfdebug, "DEBUG: send: tos=\"X\", len=%d\n", (int) sizeof(int));
604 
605 	pcp_close(pcpConn->pcpConn);
606 	pcpConn->connState = PCP_CONNECTION_NOT_CONNECTED;
607 	pcpConn->pcpConn = NULL;
608 }
609 
610 /* --------------------------------
611  * pcp_terminate_pgpool - send terminate packet
612  *
613  * return 0 on success, -1 otherwise
614  * --------------------------------
615  */
616 PCPResultInfo *
pcp_terminate_pgpool(PCPConnInfo * pcpConn,char mode,char command_scope)617 pcp_terminate_pgpool(PCPConnInfo * pcpConn, char mode, char command_scope)
618 {
619 	int			wsize;
620 
621 	if (PCPConnectionStatus(pcpConn) != PCP_CONNECTION_OK)
622 	{
623 		pcp_internal_error(pcpConn, "invalid PCP connection");
624 		return NULL;
625 	}
626 	if (command_scope == 'l')	/*local only*/
627 		pcp_write(pcpConn->pcpConn, "T", 1);
628 	else
629 		pcp_write(pcpConn->pcpConn, "t", 1);
630 	wsize = htonl(sizeof(int) + sizeof(char));
631 	pcp_write(pcpConn->pcpConn, &wsize, sizeof(int));
632 	pcp_write(pcpConn->pcpConn, &mode, sizeof(char));
633 	if (PCPFlush(pcpConn) < 0)
634 		return NULL;
635 	if (pcpConn->Pfdebug)
636 		fprintf(pcpConn->Pfdebug, "DEBUG: send: tos=\"T\", len=%d\n", ntohl(wsize));
637 
638 	return process_pcp_response(pcpConn, 'T');
639 }
640 
641 static void
process_pcp_node_count_response(PCPConnInfo * pcpConn,char * buf,int len)642 process_pcp_node_count_response(PCPConnInfo * pcpConn, char *buf, int len)
643 {
644 	if (strcmp(buf, "CommandComplete") == 0)
645 	{
646 		char	   *index = NULL;
647 
648 		index = (char *) memchr(buf, '\0', len);
649 		if (index != NULL)
650 		{
651 			int			ret;
652 
653 			index += 1;
654 			ret = atoi(index);
655 			setResultIntData(pcpConn->pcpResInfo, 0, ret);
656 			setCommandSuccessful(pcpConn);
657 			return;
658 		}
659 		else
660 			pcp_internal_error(pcpConn,
661 							   "command failed. invalid response");
662 	}
663 	else
664 		pcp_internal_error(pcpConn,
665 						   "command failed with reason: \"%s\"", buf);
666 	setResultStatus(pcpConn, PCP_RES_BAD_RESPONSE);
667 }
668 
669 /* --------------------------------
670  * pcp_node_count - get number of nodes currently connected to pgpool
671  *
672  * return array of node IDs on success, -1 otherwise
673  * --------------------------------
674  */
675 PCPResultInfo *
pcp_node_count(PCPConnInfo * pcpConn)676 pcp_node_count(PCPConnInfo * pcpConn)
677 {
678 	int			wsize;
679 
680 	if (PCPConnectionStatus(pcpConn) != PCP_CONNECTION_OK)
681 	{
682 		pcp_internal_error(pcpConn,
683 						   "invalid PCP connection");
684 		return NULL;
685 	}
686 	pcp_write(pcpConn->pcpConn, "L", 1);
687 	wsize = htonl(sizeof(int));
688 	pcp_write(pcpConn->pcpConn, &wsize, sizeof(int));
689 	if (PCPFlush(pcpConn) < 0)
690 		return NULL;
691 	if (pcpConn->Pfdebug)
692 		fprintf(pcpConn->Pfdebug, "DEBUG: send: tos=\"L\", len=%d\n", ntohl(wsize));
693 
694 	return process_pcp_response(pcpConn, 'L');
695 }
696 
697 static void
process_node_info_response(PCPConnInfo * pcpConn,char * buf,int len)698 process_node_info_response(PCPConnInfo * pcpConn, char *buf, int len)
699 {
700 	char       *index;
701 	BackendInfo *backend_info = NULL;
702 
703 	if (strcmp(buf, "ArraySize") == 0)
704 	{
705 		int			ci_size;
706 
707 		index = (char *) memchr(buf, '\0', len);
708 		if (index == NULL)
709 			goto INVALID_RESPONSE;
710 		index += 1;
711 		ci_size = atoi(index);
712 
713 		setResultStatus(pcpConn, PCP_RES_INCOMPLETE);
714 		setResultSlotCount(pcpConn, ci_size);
715 		pcpConn->pcpResInfo->nextFillSlot = 0;
716 		return;
717 	}
718 	else if (strcmp(buf, "NodeInfo") == 0)
719 	{
720 		char	   *index = NULL;
721 
722 		if (PCPResultStatus(pcpConn->pcpResInfo) != PCP_RES_INCOMPLETE)
723 			goto INVALID_RESPONSE;
724 
725 		backend_info = (BackendInfo *) palloc(sizeof(BackendInfo));
726 
727 		index = (char *) memchr(buf, '\0', len);
728 		if (index == NULL)
729 			goto INVALID_RESPONSE;
730 		index += 1;
731 		strlcpy(backend_info->backend_hostname, index, sizeof(backend_info->backend_hostname));
732 
733 		index = (char *) memchr(index, '\0', len);
734 		if (index == NULL)
735 			goto INVALID_RESPONSE;
736 		index += 1;
737 		backend_info->backend_port = atoi(index);
738 
739 		index = (char *) memchr(index, '\0', len);
740 		if (index == NULL)
741 			goto INVALID_RESPONSE;
742 		index += 1;
743 		backend_info->backend_status = atoi(index);
744 
745 		index = (char *) memchr(index, '\0', len);
746 		if (index == NULL)
747 			goto INVALID_RESPONSE;
748 		index += 1;
749 		strlcpy(backend_info->pg_backend_status, index, sizeof(backend_info->pg_backend_status));
750 
751 		index = (char *) memchr(index, '\0', len);
752 		if (index == NULL)
753 			goto INVALID_RESPONSE;
754 		index += 1;
755 		backend_info->backend_weight = atof(index);
756 
757 		index = (char *) memchr(index, '\0', len);
758 		if (index == NULL)
759 			goto INVALID_RESPONSE;
760 		index++;
761 		backend_info->role = atoi(index);
762 
763 		index = (char *) memchr(index, '\0', len);
764 		if (index == NULL)
765 			goto INVALID_RESPONSE;
766 		index++;
767 		strlcpy(backend_info->pg_role, index, sizeof(backend_info->pg_role));
768 
769 		index = (char *) memchr(index, '\0', len);
770 		if (index == NULL)
771 			goto INVALID_RESPONSE;
772 
773 		index++;
774 		backend_info->standby_delay = atol(index);
775 
776 		index = (char *) memchr(index, '\0', len);
777 		if (index == NULL)
778 			goto INVALID_RESPONSE;
779 
780 		index++;
781 		strlcpy(backend_info->replication_state, index, sizeof(backend_info->replication_state));
782 
783 		index = (char *) memchr(index, '\0', len);
784 		if (index == NULL)
785 			goto INVALID_RESPONSE;
786 
787 		index++;
788 		strlcpy(backend_info->replication_sync_state, index, sizeof(backend_info->replication_sync_state));
789 
790 		index = (char *) memchr(index, '\0', len);
791 		if (index == NULL)
792 			goto INVALID_RESPONSE;
793 
794 		index++;
795 		backend_info->status_changed_time = atol(index);
796 
797 		index = (char *) memchr(index, '\0', len);
798 		if (index == NULL)
799 			goto INVALID_RESPONSE;
800 
801 		if (setNextResultBinaryData(pcpConn->pcpResInfo, (void *) backend_info, sizeof(BackendInfo), NULL) < 0)
802 			goto INVALID_RESPONSE;
803 
804 		return;
805 	}
806 	else if (strcmp(buf, "CommandComplete") == 0)
807 	{
808 		setResultStatus(pcpConn, PCP_RES_COMMAND_OK);
809 		return;
810 	}
811 
812 INVALID_RESPONSE:
813 
814 	if (backend_info)
815 		pfree(backend_info);
816 	pcp_internal_error(pcpConn,
817 					   "command failed. invalid response");
818 	setResultStatus(pcpConn, PCP_RES_BAD_RESPONSE);
819 
820 }
821 
822 /* --------------------------------
823  * pcp_node_info - get information of node pointed by given argument
824  *
825  * return structure of node information on success, -1 otherwise
826  * --------------------------------
827  */
828 PCPResultInfo *
pcp_node_info(PCPConnInfo * pcpConn,int nid)829 pcp_node_info(PCPConnInfo * pcpConn, int nid)
830 {
831 	int			wsize;
832 	char		node_id[16];
833 
834 	if (PCPConnectionStatus(pcpConn) != PCP_CONNECTION_OK)
835 	{
836 		pcp_internal_error(pcpConn,
837 						   "invalid PCP connection");
838 		return NULL;
839 	}
840 
841 	snprintf(node_id, sizeof(node_id), "%d", nid);
842 
843 	pcp_write(pcpConn->pcpConn, "I", 1);
844 	wsize = htonl(strlen(node_id) + 1 + sizeof(int));
845 	pcp_write(pcpConn->pcpConn, &wsize, sizeof(int));
846 	pcp_write(pcpConn->pcpConn, node_id, strlen(node_id) + 1);
847 	if (PCPFlush(pcpConn) < 0)
848 		return NULL;
849 	if (pcpConn->Pfdebug)
850 		fprintf(pcpConn->Pfdebug, "DEBUG: send: tos=\"I\", len=%d\n", ntohl(wsize));
851 
852 	return process_pcp_response(pcpConn, 'I');
853 }
854 
855 
856 /* --------------------------------
857  * pcp_health_check_stats - get information of health check stats pointed by given argument
858  *
859  * return structure of node information on success, -1 otherwise
860  * --------------------------------
861  */
862 PCPResultInfo *
pcp_health_check_stats(PCPConnInfo * pcpConn,int nid)863 pcp_health_check_stats(PCPConnInfo * pcpConn, int nid)
864 {
865 	int			wsize;
866 	char		node_id[16];
867 
868 	if (PCPConnectionStatus(pcpConn) != PCP_CONNECTION_OK)
869 	{
870 		pcp_internal_error(pcpConn,
871 						   "invalid PCP connection");
872 		return NULL;
873 	}
874 
875 	snprintf(node_id, sizeof(node_id), "%d", nid);
876 
877 	pcp_write(pcpConn->pcpConn, "H", 1);
878 	wsize = htonl(strlen(node_id) + 1 + sizeof(int));
879 	pcp_write(pcpConn->pcpConn, &wsize, sizeof(int));
880 	pcp_write(pcpConn->pcpConn, node_id, strlen(node_id) + 1);
881 	if (PCPFlush(pcpConn) < 0)
882 		return NULL;
883 	if (pcpConn->Pfdebug)
884 		fprintf(pcpConn->Pfdebug, "DEBUG: send: tos=\"L\", len=%d\n", ntohl(wsize));
885 
886 	return process_pcp_response(pcpConn, 'H');
887 }
888 
889 PCPResultInfo *
pcp_reload_config(PCPConnInfo * pcpConn,char command_scope)890 pcp_reload_config(PCPConnInfo * pcpConn,char command_scope)
891 {
892 	int                     wsize;
893 /*
894  * pcp packet format for pcp_reload_config
895  * z[size][command_scope]
896  */
897 	if (PCPConnectionStatus(pcpConn) != PCP_CONNECTION_OK)
898 	{
899 	   pcp_internal_error(pcpConn, "invalid PCP connection");
900 	   return NULL;
901 	}
902 
903 	pcp_write(pcpConn->pcpConn, "Z", 1);
904 	wsize = htonl(sizeof(int) + sizeof(char));
905 	pcp_write(pcpConn->pcpConn, &wsize, sizeof(int));
906 	pcp_write(pcpConn->pcpConn, &command_scope, sizeof(char));
907 	if (PCPFlush(pcpConn) < 0)
908 	   return NULL;
909 	if (pcpConn->Pfdebug)
910 	   fprintf(pcpConn->Pfdebug, "DEBUG: send: tos=\"Z\", len=%d\n", ntohl(wsize));
911 
912 	return process_pcp_response(pcpConn, 'Z');
913 }
914 
915 
916 /*
917  * Process health check response from PCP server.
918  * pcpConn: connection to the server
919  * buf:		returned data from server
920  * len:		length of the data
921  */
922 static void
process_health_check_stats_response(PCPConnInfo * pcpConn,char * buf,int len)923 process_health_check_stats_response
924 (PCPConnInfo * pcpConn, char *buf, int len)
925 {
926 	POOL_HEALTH_CHECK_STATS *stats;
927 	int		*offsets;
928 	int		n;
929 	int		i;
930 	char	*p;
931 	int		maxstr;
932 	char	c[] = "CommandComplete";
933 
934 	if (strcmp(buf, c) != 0)
935 	{
936 		pcp_internal_error(pcpConn,
937 						   "command failed. invalid response");
938 		setResultStatus(pcpConn, PCP_RES_BAD_RESPONSE);
939 		return;
940 	}
941 	buf += sizeof(c);
942 
943 	/* Allocate health stats memory */
944 	stats = palloc0(sizeof(POOL_HEALTH_CHECK_STATS));
945 	p = (char *)stats;
946 
947 	/* Calculate total packet length */
948 	offsets = pool_health_check_stats_offsets(&n);
949 
950 	for (i = 0; i < n; i++)
951 	{
952 		if (i == n -1)
953 			maxstr = sizeof(POOL_HEALTH_CHECK_STATS) - offsets[i];
954 		else
955 			maxstr = offsets[i + 1] - offsets[i];
956 
957 		StrNCpy(p + offsets[i], buf, maxstr -1);
958 		buf += strlen(buf) + 1;
959 	}
960 
961 	if (setNextResultBinaryData(pcpConn->pcpResInfo, (void *) stats, sizeof(POOL_HEALTH_CHECK_STATS), NULL) < 0)
962 	{
963 		if (stats)
964 			pfree(stats);
965 		pcp_internal_error(pcpConn,
966 						   "command failed. invalid response");
967 		setResultStatus(pcpConn, PCP_RES_BAD_RESPONSE);
968 	}
969 	else
970 		setCommandSuccessful(pcpConn);
971 
972 }
973 
974 static void
process_process_count_response(PCPConnInfo * pcpConn,char * buf,int len)975 process_process_count_response(PCPConnInfo * pcpConn, char *buf, int len)
976 {
977 	if (strcmp(buf, "CommandComplete") == 0)
978 	{
979 		int			process_count;
980 		int		   *process_list = NULL;
981 		char	   *index = NULL;
982 		int			i;
983 
984 		index = (char *) memchr(buf, '\0', len);
985 		if (index == NULL)
986 		{
987 			pcp_internal_error(pcpConn,
988 							   "command failed. invalid response");
989 			setResultStatus(pcpConn, PCP_RES_BAD_RESPONSE);
990 			return;
991 		}
992 		index += 1;
993 		process_count = atoi(index);
994 
995 		process_list = (int *) palloc(sizeof(int) * process_count);
996 
997 		for (i = 0; i < process_count; i++)
998 		{
999 			index = (char *) memchr(index, '\0', len);
1000 			if (index == NULL)
1001 			{
1002 				pcp_internal_error(pcpConn,
1003 								   "command failed. invalid response");
1004 				setResultStatus(pcpConn, PCP_RES_BAD_RESPONSE);
1005 				pfree(process_list);
1006 				return;
1007 			}
1008 			index += 1;
1009 			process_list[i] = atoi(index);
1010 		}
1011 		setResultSlotCount(pcpConn, 1);
1012 		if (setNextResultBinaryData(pcpConn->pcpResInfo, process_list, (sizeof(int) * process_count), NULL) < 0)
1013 		{
1014 			pcp_internal_error(pcpConn,
1015 							   "command failed. invalid response");
1016 			setResultStatus(pcpConn, PCP_RES_BAD_RESPONSE);
1017 		}
1018 		else
1019 		{
1020 			setCommandSuccessful(pcpConn);
1021 		}
1022 	}
1023 	else
1024 	{
1025 		pcp_internal_error(pcpConn,
1026 						   "command failed with reason: \"%s\"", buf);
1027 		setResultStatus(pcpConn, PCP_RES_BAD_RESPONSE);
1028 	}
1029 }
1030 
1031 /* --------------------------------
1032  * pcp_node_count - get number of nodes currently connected to pgpool
1033  *
1034  * return array of pids on success, NULL otherwise
1035  * --------------------------------
1036  */
1037 
1038 PCPResultInfo *
pcp_process_count(PCPConnInfo * pcpConn)1039 pcp_process_count(PCPConnInfo * pcpConn)
1040 {
1041 	int			wsize;
1042 
1043 	if (PCPConnectionStatus(pcpConn) != PCP_CONNECTION_OK)
1044 	{
1045 		pcp_internal_error(pcpConn, "invalid PCP connection");
1046 		return NULL;
1047 	}
1048 
1049 	pcp_write(pcpConn->pcpConn, "N", 1);
1050 	wsize = htonl(sizeof(int));
1051 	pcp_write(pcpConn->pcpConn, &wsize, sizeof(int));
1052 	if (PCPFlush(pcpConn) < 0)
1053 		return NULL;
1054 	if (pcpConn->Pfdebug)
1055 		fprintf(pcpConn->Pfdebug, "DEBUG: send: tos=\"N\", len=%d\n", ntohl(wsize));
1056 
1057 	return process_pcp_response(pcpConn, 'N');
1058 }
1059 
1060 static void
process_process_info_response(PCPConnInfo * pcpConn,char * buf,int len)1061 process_process_info_response(PCPConnInfo * pcpConn, char *buf, int len)
1062 {
1063 	char	   *index;
1064 	int			*offsets;
1065 	int			i, n;
1066 	int			maxstr;
1067 	char		*p;
1068 	POOL_REPORT_POOLS	*pools = NULL;
1069 
1070 	offsets = pool_report_pools_offsets(&n);
1071 
1072 	if (strcmp(buf, "ArraySize") == 0)
1073 	{
1074 		int			ci_size;
1075 
1076 		index = (char *) memchr(buf, '\0', len);
1077 		if (index == NULL)
1078 			goto INVALID_RESPONSE;
1079 		index += 1;
1080 		ci_size = atoi(index);
1081 
1082 		setResultStatus(pcpConn, PCP_RES_INCOMPLETE);
1083 		setResultSlotCount(pcpConn, ci_size);
1084 		pcpConn->pcpResInfo->nextFillSlot = 0;
1085 		return;
1086 	}
1087 	else if (strcmp(buf, "ProcessInfo") == 0)
1088 	{
1089 		if (PCPResultStatus(pcpConn->pcpResInfo) != PCP_RES_INCOMPLETE)
1090 			goto INVALID_RESPONSE;
1091 
1092 		pools = palloc0(sizeof(POOL_REPORT_POOLS));
1093 		p = (char *)pools;
1094 		buf += strlen(buf) + 1;
1095 
1096 		for (i = 0; i < n; i++)
1097 		{
1098 			if (i == n -1)
1099 				maxstr = sizeof(POOL_REPORT_POOLS) - offsets[i];
1100 			else
1101 				maxstr = offsets[i + 1] - offsets[i];
1102 
1103 			StrNCpy(p + offsets[i], buf, maxstr -1);
1104 			buf += strlen(buf) + 1;
1105 		}
1106 
1107 		if (setNextResultBinaryData(pcpConn->pcpResInfo, (void *) pools, sizeof(POOL_REPORT_POOLS), NULL) < 0)
1108 			goto INVALID_RESPONSE;
1109 
1110 		return;
1111 	}
1112 
1113 	else if (strcmp(buf, "CommandComplete") == 0)
1114 	{
1115 		setResultStatus(pcpConn, PCP_RES_COMMAND_OK);
1116 		return;
1117 	}
1118 
1119 INVALID_RESPONSE:
1120 
1121 	if (pools)
1122 	{
1123 		pfree(pools);
1124 	}
1125 	pcp_internal_error(pcpConn,
1126 					   "command failed. invalid response");
1127 	setResultStatus(pcpConn, PCP_RES_BAD_RESPONSE);
1128 }
1129 
1130 /* --------------------------------
1131  * pcp_process_info - get information of node pointed by given argument
1132  *
1133  * return structure of process information on success, -1 otherwise
1134  * --------------------------------
1135  */
1136 PCPResultInfo *
pcp_process_info(PCPConnInfo * pcpConn,int pid)1137 pcp_process_info(PCPConnInfo * pcpConn, int pid)
1138 {
1139 	int			wsize;
1140 	char		process_id[16];
1141 
1142 	if (PCPConnectionStatus(pcpConn) != PCP_CONNECTION_OK)
1143 	{
1144 		pcp_internal_error(pcpConn, "invalid PCP connection");
1145 		return NULL;
1146 	}
1147 
1148 	snprintf(process_id, sizeof(process_id), "%d", pid);
1149 
1150 	pcp_write(pcpConn->pcpConn, "P", 1);
1151 	wsize = htonl(strlen(process_id) + 1 + sizeof(int));
1152 	pcp_write(pcpConn->pcpConn, &wsize, sizeof(int));
1153 	pcp_write(pcpConn->pcpConn, process_id, strlen(process_id) + 1);
1154 	if (PCPFlush(pcpConn) < 0)
1155 		return NULL;
1156 	if (pcpConn->Pfdebug)
1157 		fprintf(pcpConn->Pfdebug, "DEBUG: send: tos=\"P\", len=%d\n", ntohl(wsize));
1158 
1159 	return process_pcp_response(pcpConn, 'P');
1160 }
1161 
1162 /* --------------------------------
1163  * pcp_detach_node - detach a node given by the argument from pgpool's control
1164  *
1165  * return 0 on success, -1 otherwise
1166  * --------------------------------
1167  */
1168 PCPResultInfo *
pcp_detach_node(PCPConnInfo * pcpConn,int nid)1169 pcp_detach_node(PCPConnInfo * pcpConn, int nid)
1170 {
1171 	return _pcp_detach_node(pcpConn, nid, FALSE);
1172 }
1173 
1174 /* --------------------------------
1175 
1176  * and detach a node given by the argument from pgpool's control
1177  *
1178  * return 0 on success, -1 otherwise
1179  * --------------------------------
1180  */
1181 PCPResultInfo *
pcp_detach_node_gracefully(PCPConnInfo * pcpConn,int nid)1182 pcp_detach_node_gracefully(PCPConnInfo * pcpConn, int nid)
1183 {
1184 	return _pcp_detach_node(pcpConn, nid, TRUE);
1185 }
1186 
1187 static PCPResultInfo *
_pcp_detach_node(PCPConnInfo * pcpConn,int nid,bool gracefully)1188 _pcp_detach_node(PCPConnInfo * pcpConn, int nid, bool gracefully)
1189 {
1190 	int			wsize;
1191 	char		node_id[16];
1192 	char	   *sendchar;
1193 
1194 	if (PCPConnectionStatus(pcpConn) != PCP_CONNECTION_OK)
1195 	{
1196 		pcp_internal_error(pcpConn, "invalid PCP connection");
1197 		return NULL;
1198 	}
1199 
1200 	snprintf(node_id, sizeof(node_id), "%d", nid);
1201 
1202 	if (gracefully)
1203 		sendchar = "d";
1204 	else
1205 		sendchar = "D";
1206 
1207 	pcp_write(pcpConn->pcpConn, sendchar, 1);
1208 	wsize = htonl(strlen(node_id) + 1 + sizeof(int));
1209 	pcp_write(pcpConn->pcpConn, &wsize, sizeof(int));
1210 	pcp_write(pcpConn->pcpConn, node_id, strlen(node_id) + 1);
1211 	if (PCPFlush(pcpConn) < 0)
1212 		return NULL;
1213 	if (pcpConn->Pfdebug)
1214 		fprintf(pcpConn->Pfdebug, "DEBUG: send: tos=\"D\", len=%d\n", ntohl(wsize));
1215 
1216 	return process_pcp_response(pcpConn, 'D');
1217 }
1218 
1219 static void
process_command_complete_response(PCPConnInfo * pcpConn,char * buf,int len)1220 process_command_complete_response(PCPConnInfo * pcpConn, char *buf, int len)
1221 {
1222 	if (strcmp(buf, "CommandComplete") == 0)
1223 	{
1224 		setCommandSuccessful(pcpConn);
1225 	}
1226 	else
1227 	{
1228 		pcp_internal_error(pcpConn,
1229 						   "command failed with reason: \"%s\"", buf);
1230 		setResultStatus(pcpConn, PCP_RES_BAD_RESPONSE);
1231 	}
1232 }
1233 
1234 /* --------------------------------
1235  * pcp_attach_node - attach a node given by the argument from pgpool's control
1236  *
1237  * return 0 on success, -1 otherwise
1238  * --------------------------------
1239  */
1240 PCPResultInfo *
pcp_attach_node(PCPConnInfo * pcpConn,int nid)1241 pcp_attach_node(PCPConnInfo * pcpConn, int nid)
1242 {
1243 	int			wsize;
1244 	char		node_id[16];
1245 
1246 	if (PCPConnectionStatus(pcpConn) != PCP_CONNECTION_OK)
1247 	{
1248 		pcp_internal_error(pcpConn, "invalid PCP connection");
1249 		return NULL;
1250 	}
1251 
1252 	snprintf(node_id, sizeof(node_id), "%d", nid);
1253 
1254 	pcp_write(pcpConn->pcpConn, "C", 1);
1255 	wsize = htonl(strlen(node_id) + 1 + sizeof(int));
1256 	pcp_write(pcpConn->pcpConn, &wsize, sizeof(int));
1257 	pcp_write(pcpConn->pcpConn, node_id, strlen(node_id) + 1);
1258 	if (PCPFlush(pcpConn) < 0)
1259 		return NULL;
1260 	if (pcpConn->Pfdebug)
1261 		fprintf(pcpConn->Pfdebug, "DEBUG: send: tos=\"C\", len=%d\n", ntohl(wsize));
1262 
1263 	return process_pcp_response(pcpConn, 'C');
1264 }
1265 
1266 
1267 /* --------------------------------
1268  * pcp_pool_status - return setup parameters and status
1269  *
1270  * returns and array of POOL_REPORT_CONFIG, NULL otherwise
1271  * --------------------------------
1272  */
1273 static void
process_pool_status_response(PCPConnInfo * pcpConn,char * buf,int len)1274 process_pool_status_response(PCPConnInfo * pcpConn, char *buf, int len)
1275 {
1276 	char	   *index;
1277 	POOL_REPORT_CONFIG *status = NULL;
1278 
1279 	if (strcmp(buf, "ArraySize") == 0)
1280 	{
1281 		int			ci_size;
1282 
1283 		index = (char *) memchr(buf, '\0', len) + 1;
1284 		ci_size = ntohl(*((int *) index));
1285 
1286 		setResultStatus(pcpConn, PCP_RES_INCOMPLETE);
1287 		setResultSlotCount(pcpConn, ci_size);
1288 		pcpConn->pcpResInfo->nextFillSlot = 0;
1289 		return;
1290 	}
1291 	else if (strcmp(buf, "ProcessConfig") == 0)
1292 	{
1293 		if (PCPResultStatus(pcpConn->pcpResInfo) != PCP_RES_INCOMPLETE)
1294 			goto INVALID_RESPONSE;
1295 
1296 		status = palloc(sizeof(POOL_REPORT_CONFIG));
1297 
1298 		index = (char *) memchr(buf, '\0', len);
1299 		if (index == NULL)
1300 			goto INVALID_RESPONSE;
1301 		index += 1;
1302 		strlcpy(status->name, index, POOLCONFIG_MAXNAMELEN + 1);
1303 
1304 		index = (char *) memchr(index, '\0', len);
1305 		if (index == NULL)
1306 			goto INVALID_RESPONSE;
1307 		index += 1;
1308 		strlcpy(status->value, index, POOLCONFIG_MAXVALLEN + 1);
1309 
1310 		index = (char *) memchr(index, '\0', len);
1311 		if (index == NULL)
1312 			goto INVALID_RESPONSE;
1313 		index += 1;
1314 		strlcpy(status->desc, index, POOLCONFIG_MAXDESCLEN + 1);
1315 
1316 		if (setNextResultBinaryData(pcpConn->pcpResInfo, (void *) status, sizeof(POOL_REPORT_CONFIG), NULL) < 0)
1317 			goto INVALID_RESPONSE;
1318 		return;
1319 	}
1320 	else if (strcmp(buf, "CommandComplete") == 0)
1321 	{
1322 		setResultStatus(pcpConn, PCP_RES_COMMAND_OK);
1323 		return;
1324 	}
1325 
1326 INVALID_RESPONSE:
1327 
1328 	if (status)
1329 		pfree(status);
1330 	pcp_internal_error(pcpConn,
1331 					   "command failed. invalid response");
1332 	setResultStatus(pcpConn, PCP_RES_BAD_RESPONSE);
1333 }
1334 
1335 PCPResultInfo *
pcp_pool_status(PCPConnInfo * pcpConn)1336 pcp_pool_status(PCPConnInfo * pcpConn)
1337 {
1338 	int			wsize;
1339 
1340 	if (PCPConnectionStatus(pcpConn) != PCP_CONNECTION_OK)
1341 	{
1342 		pcp_internal_error(pcpConn, "invalid PCP connection");
1343 		return NULL;
1344 	}
1345 
1346 	pcp_write(pcpConn->pcpConn, "B", 1);
1347 	wsize = htonl(sizeof(int));
1348 	pcp_write(pcpConn->pcpConn, &wsize, sizeof(int));
1349 	if (PCPFlush(pcpConn) < 0)
1350 		return NULL;
1351 	if (pcpConn->Pfdebug)
1352 		fprintf(pcpConn->Pfdebug, "DEBUG pcp_pool_status: send: tos=\"B\", len=%d\n", ntohl(wsize));
1353 	return process_pcp_response(pcpConn, 'B');
1354 }
1355 
1356 
1357 PCPResultInfo *
pcp_recovery_node(PCPConnInfo * pcpConn,int nid)1358 pcp_recovery_node(PCPConnInfo * pcpConn, int nid)
1359 {
1360 	int			wsize;
1361 	char		node_id[16];
1362 
1363 	if (PCPConnectionStatus(pcpConn) != PCP_CONNECTION_OK)
1364 	{
1365 		pcp_internal_error(pcpConn, "invalid PCP connection");
1366 		return NULL;
1367 	}
1368 
1369 	snprintf(node_id, sizeof(node_id), "%d", nid);
1370 
1371 	pcp_write(pcpConn->pcpConn, "O", 1);
1372 	wsize = htonl(strlen(node_id) + 1 + sizeof(int));
1373 	pcp_write(pcpConn->pcpConn, &wsize, sizeof(int));
1374 	pcp_write(pcpConn->pcpConn, node_id, strlen(node_id) + 1);
1375 	if (PCPFlush(pcpConn) < 0)
1376 		return NULL;
1377 	if (pcpConn->Pfdebug)
1378 		fprintf(pcpConn->Pfdebug, "DEBUG: send: tos=\"D\", len=%d\n", ntohl(wsize));
1379 
1380 	return process_pcp_response(pcpConn, 'O');
1381 }
1382 
1383 /* --------------------------------
1384  * pcp_promote_node - promote a node given by the argument as new pgpool's main node
1385  *
1386  * return 0 on success, -1 otherwise
1387  * --------------------------------
1388  */
1389 PCPResultInfo *
pcp_promote_node(PCPConnInfo * pcpConn,int nid,bool promote)1390 pcp_promote_node(PCPConnInfo * pcpConn, int nid, bool promote)
1391 {
1392 	return _pcp_promote_node(pcpConn, nid, FALSE, promote);
1393 }
1394 
1395 /* --------------------------------
1396 
1397  * and promote a node given by the argument as new pgpool's main node
1398  *
1399  * return 0 on success, -1 otherwise
1400  * --------------------------------
1401  */
1402 PCPResultInfo *
pcp_promote_node_gracefully(PCPConnInfo * pcpConn,int nid,bool switchover)1403 pcp_promote_node_gracefully(PCPConnInfo * pcpConn, int nid, bool switchover)
1404 {
1405 	return _pcp_promote_node(pcpConn, nid, TRUE, switchover);
1406 }
1407 
1408 static PCPResultInfo *
_pcp_promote_node(PCPConnInfo * pcpConn,int nid,bool gracefully,bool switchover)1409 _pcp_promote_node(PCPConnInfo * pcpConn, int nid, bool gracefully, bool switchover)
1410 {
1411 	int			wsize;
1412 	char		node_id[16];
1413 	char	   *sendchar;
1414 	char		*switchover_option;	/* n: just change node status, s: switchover primary */
1415 
1416 	if (PCPConnectionStatus(pcpConn) != PCP_CONNECTION_OK)
1417 	{
1418 		pcp_internal_error(pcpConn, "invalid PCP connection");
1419 		return NULL;
1420 	}
1421 
1422 	snprintf(node_id, sizeof(node_id), "%d ", nid);
1423 
1424 	if (gracefully)
1425 		sendchar = "j";
1426 	else
1427 		sendchar = "J";
1428 
1429 	if (switchover)
1430 		switchover_option = "s";
1431 	else
1432 		switchover_option = "n";
1433 
1434 	pcp_write(pcpConn->pcpConn, sendchar, 1);
1435 
1436 	/* caluculate send buffer size */
1437 	wsize = sizeof(char);	/* protocol. 'j' or 'J' */
1438 	wsize += strlen(node_id);	/* node id + space */
1439 	wsize += sizeof(char);	/* promote option */
1440 	wsize += sizeof(int);	/* buffer length */
1441 	wsize = htonl(wsize);
1442 
1443 	pcp_write(pcpConn->pcpConn, &wsize, sizeof(int));
1444 	pcp_write(pcpConn->pcpConn, node_id, strlen(node_id) + 1);
1445 	pcp_write(pcpConn->pcpConn, switchover_option, 1);
1446 
1447 	if (PCPFlush(pcpConn) < 0)
1448 		return NULL;
1449 	if (pcpConn->Pfdebug)
1450 		fprintf(pcpConn->Pfdebug, "DEBUG: send: tos=\"E\", len=%d\n", ntohl(wsize));
1451 
1452 	return process_pcp_response(pcpConn, 'J');
1453 }
1454 
1455 static void
process_watchdog_info_response(PCPConnInfo * pcpConn,char * buf,int len)1456 process_watchdog_info_response(PCPConnInfo * pcpConn, char *buf, int len)
1457 {
1458 	char	   *json_data = NULL;
1459 	PCPWDClusterInfo *wd_cluster_info = NULL;
1460 	int			clusterDataSize = 0;
1461 
1462 	if (strcmp(buf, "CommandComplete") == 0)
1463 	{
1464 		int			tempVal;
1465 		char	   *ptr;
1466 
1467 		json_data = (char *) memchr(buf, '\0', len);
1468 		if (json_data == NULL)
1469 			goto INVALID_RESPONSE;
1470 		json_data += 1;
1471 
1472 		json_value *root;
1473 		json_value *value;
1474 		int			i,
1475 					nodeCount;
1476 
1477 		root = json_parse(json_data, len);
1478 
1479 		/* The root node must be object */
1480 		if (root == NULL || root->type != json_object)
1481 		{
1482 			json_value_free(root);
1483 			goto INVALID_RESPONSE;
1484 		}
1485 
1486 		if (json_get_int_value_for_key(root, "NodeCount", &nodeCount))
1487 		{
1488 			json_value_free(root);
1489 			goto INVALID_RESPONSE;
1490 		}
1491 
1492 		/* find the WatchdogNodes array */
1493 		value = json_get_value_for_key(root, "WatchdogNodes");
1494 		if (value == NULL)
1495 		{
1496 			json_value_free(root);
1497 			goto INVALID_RESPONSE;
1498 		}
1499 		if (value->type != json_array)
1500 		{
1501 			json_value_free(root);
1502 			goto INVALID_RESPONSE;
1503 		}
1504 		if (nodeCount != value->u.array.length)
1505 		{
1506 			json_value_free(root);
1507 			goto INVALID_RESPONSE;
1508 		}
1509 
1510 		/* create the cluster object */
1511 		clusterDataSize = sizeof(PCPWDClusterInfo) + (sizeof(PCPWDNodeInfo) * nodeCount);
1512 		wd_cluster_info = malloc(clusterDataSize);
1513 
1514 		wd_cluster_info->nodeCount = nodeCount;
1515 
1516 		if (json_get_int_value_for_key(root, "RemoteNodeCount", &wd_cluster_info->remoteNodeCount))
1517 		{
1518 			json_value_free(root);
1519 			goto INVALID_RESPONSE;
1520 		}
1521 		if (json_get_int_value_for_key(root, "MemberRemoteNodeCount", &wd_cluster_info->memberRemoteNodeCount))
1522 		{
1523 			wd_cluster_info->memberRemoteNodeCount = -1;
1524 		}
1525 		if (json_get_int_value_for_key(root, "NodesRequireForQuorum", &wd_cluster_info->nodesRequiredForQuorum))
1526 		{
1527 			wd_cluster_info->nodesRequiredForQuorum = -1;
1528 		}
1529 
1530 		if (json_get_int_value_for_key(root, "QuorumStatus", &wd_cluster_info->quorumStatus))
1531 		{
1532 			json_value_free(root);
1533 			goto INVALID_RESPONSE;
1534 		}
1535 		if (json_get_int_value_for_key(root, "AliveNodeCount", &wd_cluster_info->aliveNodeCount))
1536 		{
1537 			json_value_free(root);
1538 			goto INVALID_RESPONSE;
1539 		}
1540 		if (json_get_int_value_for_key(root, "Escalated", &tempVal))
1541 		{
1542 			json_value_free(root);
1543 			goto INVALID_RESPONSE;
1544 		}
1545 		wd_cluster_info->escalated = tempVal == 0 ? false : true;
1546 
1547 		ptr = json_get_string_value_for_key(root, "LeaderNodeName");
1548 		if (ptr == NULL)
1549 		{
1550 			json_value_free(root);
1551 			goto INVALID_RESPONSE;
1552 		}
1553 		strncpy(wd_cluster_info->leaderNodeName, ptr, sizeof(wd_cluster_info->leaderNodeName) - 1);
1554 
1555 		ptr = json_get_string_value_for_key(root, "LeaderHostName");
1556 		if (ptr == NULL)
1557 		{
1558 			json_value_free(root);
1559 			goto INVALID_RESPONSE;
1560 		}
1561 		strncpy(wd_cluster_info->leaderHostName, ptr, sizeof(wd_cluster_info->leaderHostName) - 1);
1562 
1563 		/* Get watchdog nodes data */
1564 		for (i = 0; i < nodeCount; i++)
1565 		{
1566 			char	   *ptr;
1567 			json_value *nodeInfoValue = value->u.array.values[i];
1568 			PCPWDNodeInfo *wdNodeInfo = &wd_cluster_info->nodeList[i];
1569 
1570 			if (nodeInfoValue->type != json_object)
1571 			{
1572 				json_value_free(root);
1573 				goto INVALID_RESPONSE;
1574 			}
1575 
1576 			if (json_get_int_value_for_key(nodeInfoValue, "ID", &wdNodeInfo->id))
1577 			{
1578 				json_value_free(root);
1579 				goto INVALID_RESPONSE;
1580 			}
1581 
1582 			ptr = json_get_string_value_for_key(nodeInfoValue, "NodeName");
1583 			if (ptr == NULL)
1584 			{
1585 				json_value_free(root);
1586 				goto INVALID_RESPONSE;
1587 			}
1588 			strncpy(wdNodeInfo->nodeName, ptr, sizeof(wdNodeInfo->nodeName) - 1);
1589 
1590 			ptr = json_get_string_value_for_key(nodeInfoValue, "HostName");
1591 			if (ptr == NULL)
1592 			{
1593 				json_value_free(root);
1594 				goto INVALID_RESPONSE;
1595 			}
1596 			strncpy(wdNodeInfo->hostName, ptr, sizeof(wdNodeInfo->hostName) - 1);
1597 
1598 			ptr = json_get_string_value_for_key(nodeInfoValue, "DelegateIP");
1599 			if (ptr == NULL)
1600 			{
1601 				json_value_free(root);
1602 				goto INVALID_RESPONSE;
1603 			}
1604 			strncpy(wdNodeInfo->delegate_ip, ptr, sizeof(wdNodeInfo->delegate_ip) - 1);
1605 
1606 			if (json_get_int_value_for_key(nodeInfoValue, "Membership", &wdNodeInfo->membership_status))
1607 			{
1608 				/* would be from the older version. No need to panic */
1609 				wdNodeInfo->membership_status = 0;
1610 			}
1611 
1612 			ptr = json_get_string_value_for_key(nodeInfoValue, "MembershipString");
1613 			if (ptr == NULL)
1614 			{
1615 				strncpy(wdNodeInfo->membership_status_string, "NOT-Available",
1616 						sizeof(wdNodeInfo->membership_status_string) - 1);
1617 			}
1618 			else
1619 				strncpy(wdNodeInfo->membership_status_string, ptr,
1620 						sizeof(wdNodeInfo->membership_status_string) - 1);
1621 
1622 			if (json_get_int_value_for_key(nodeInfoValue, "WdPort", &wdNodeInfo->wd_port))
1623 			{
1624 				json_value_free(root);
1625 				goto INVALID_RESPONSE;
1626 			}
1627 
1628 			if (json_get_int_value_for_key(nodeInfoValue, "PgpoolPort", &wdNodeInfo->pgpool_port))
1629 			{
1630 				json_value_free(root);
1631 				goto INVALID_RESPONSE;
1632 			}
1633 
1634 			if (json_get_int_value_for_key(nodeInfoValue, "State", &wdNodeInfo->state))
1635 			{
1636 				json_value_free(root);
1637 				goto INVALID_RESPONSE;
1638 			}
1639 
1640 			ptr = json_get_string_value_for_key(nodeInfoValue, "StateName");
1641 			if (ptr == NULL)
1642 			{
1643 				json_value_free(root);
1644 				goto INVALID_RESPONSE;
1645 			}
1646 			strncpy(wdNodeInfo->stateName, ptr, sizeof(wdNodeInfo->stateName) - 1);
1647 
1648 			if (json_get_int_value_for_key(nodeInfoValue, "Priority", &wdNodeInfo->wd_priority))
1649 			{
1650 				json_value_free(root);
1651 				goto INVALID_RESPONSE;
1652 			}
1653 
1654 		}
1655 		json_value_free(root);
1656 
1657 		if (setNextResultBinaryData(pcpConn->pcpResInfo, (void *) wd_cluster_info, clusterDataSize, NULL) < 0)
1658 			goto INVALID_RESPONSE;
1659 
1660 		setCommandSuccessful(pcpConn);
1661 	}
1662 	else
1663 	{
1664 		pcp_internal_error(pcpConn,
1665 						   "command failed with reason: \"%s\"\n", buf);
1666 		setResultStatus(pcpConn, PCP_RES_BAD_RESPONSE);
1667 	}
1668 	return;
1669 
1670 INVALID_RESPONSE:
1671 
1672 	if (wd_cluster_info)
1673 		pfree(wd_cluster_info);
1674 	pcp_internal_error(pcpConn,
1675 					   "command failed. invalid response\n");
1676 	setResultStatus(pcpConn, PCP_RES_BAD_RESPONSE);
1677 }
1678 
1679 /* --------------------------------
1680  * pcp_watchdog_info - get information of watchdog
1681  *
1682  * return structure of watchdog information on success, -1 otherwise
1683  * --------------------------------
1684  */
1685 PCPResultInfo *
pcp_watchdog_info(PCPConnInfo * pcpConn,int nid)1686 pcp_watchdog_info(PCPConnInfo * pcpConn, int nid)
1687 {
1688 	int			wsize;
1689 	char		wd_index[16];
1690 
1691 	if (PCPConnectionStatus(pcpConn) != PCP_CONNECTION_OK)
1692 	{
1693 		pcp_internal_error(pcpConn, "invalid PCP connection");
1694 		return NULL;
1695 	}
1696 
1697 	snprintf(wd_index, sizeof(wd_index), "%d", nid);
1698 
1699 	pcp_write(pcpConn->pcpConn, "W", 1);
1700 	wsize = htonl(strlen(wd_index) + 1 + sizeof(int));
1701 	pcp_write(pcpConn->pcpConn, &wsize, sizeof(int));
1702 	pcp_write(pcpConn->pcpConn, wd_index, strlen(wd_index) + 1);
1703 	if (PCPFlush(pcpConn) < 0)
1704 		return NULL;
1705 	if (pcpConn->Pfdebug)
1706 		fprintf(pcpConn->Pfdebug, "DEBUG: send: tos=\"W\", len=%d\n", ntohl(wsize));
1707 
1708 	return process_pcp_response(pcpConn, 'W');
1709 }
1710 
1711 PCPResultInfo *
pcp_set_backend_parameter(PCPConnInfo * pcpConn,char * parameter_name,char * value)1712 pcp_set_backend_parameter(PCPConnInfo * pcpConn, char *parameter_name, char *value)
1713 {
1714 	int			wsize;
1715 	char		null_chr = 0;
1716 
1717 	if (PCPConnectionStatus(pcpConn) != PCP_CONNECTION_OK)
1718 	{
1719 		pcp_internal_error(pcpConn, "invalid PCP connection");
1720 		return NULL;
1721 	}
1722 	if (pcpConn->Pfdebug)
1723 		fprintf(pcpConn->Pfdebug, "DEBUG: seting: \"%s = %s\"\n", parameter_name, value);
1724 
1725 	pcp_write(pcpConn->pcpConn, "A", 1);
1726 	wsize = htonl(strlen(parameter_name) + 1 +
1727 				  strlen(value) + 1 +
1728 				  sizeof(int));
1729 	pcp_write(pcpConn->pcpConn, &wsize, sizeof(int));
1730 	pcp_write(pcpConn->pcpConn, parameter_name, strlen(parameter_name));
1731 	pcp_write(pcpConn->pcpConn, &null_chr, 1);
1732 	pcp_write(pcpConn->pcpConn, value, strlen(value));
1733 	pcp_write(pcpConn->pcpConn, &null_chr, 1);
1734 	if (PCPFlush(pcpConn) < 0)
1735 		return NULL;
1736 	if (pcpConn->Pfdebug)
1737 		fprintf(pcpConn->Pfdebug, "DEBUG: send: tos=\"A\", len=%d\n", ntohl(wsize));
1738 
1739 	return process_pcp_response(pcpConn, 'A');
1740 }
1741 
1742 /*
1743  * pcpAddInternalNotice - produce an internally-generated notice message
1744  *
1745  * A format string and optional arguments can be passed.
1746  *
1747  * The supplied text is taken as primary message (ie., it should not include
1748  * a trailing newline, and should not be more than one line).
1749  */
1750 static void
pcp_internal_error(PCPConnInfo * pcpConn,const char * fmt,...)1751 pcp_internal_error(PCPConnInfo * pcpConn, const char *fmt,...)
1752 {
1753 	char		msgBuf[1024];
1754 	va_list		args;
1755 
1756 	if (pcpConn == NULL)
1757 		return;					/* nobody home to receive notice? */
1758 
1759 	/* Format the message */
1760 	va_start(args, fmt);
1761 	vsnprintf(msgBuf, sizeof(msgBuf), fmt, args);
1762 	va_end(args);
1763 	msgBuf[sizeof(msgBuf) - 1] = '\0';	/* make real sure it's terminated */
1764 	if (pcpConn->errMsg)
1765 		pfree(pcpConn->errMsg);
1766 	pcpConn->errMsg = pstrdup(msgBuf);
1767 }
1768 
1769 ConnStateType
PCPConnectionStatus(const PCPConnInfo * conn)1770 PCPConnectionStatus(const PCPConnInfo * conn)
1771 {
1772 	if (!conn)
1773 		return PCP_CONNECTION_BAD;
1774 	return conn->connState;
1775 }
1776 
1777 ResultStateType
PCPResultStatus(const PCPResultInfo * res)1778 PCPResultStatus(const PCPResultInfo * res)
1779 {
1780 	if (!res)
1781 		return PCP_RES_ERROR;
1782 	return res->resultStatus;
1783 }
1784 
1785 static void
setResultStatus(PCPConnInfo * pcpConn,ResultStateType resultState)1786 setResultStatus(PCPConnInfo * pcpConn, ResultStateType resultState)
1787 {
1788 	if (pcpConn && pcpConn->pcpResInfo)
1789 		pcpConn->pcpResInfo->resultStatus = resultState;
1790 }
1791 
1792 static void
setCommandSuccessful(PCPConnInfo * pcpConn)1793 setCommandSuccessful(PCPConnInfo * pcpConn)
1794 {
1795 	setResultStatus(pcpConn, PCP_RES_COMMAND_OK);
1796 }
1797 
1798 static void
setResultSlotCount(PCPConnInfo * pcpConn,unsigned int slotCount)1799 setResultSlotCount(PCPConnInfo * pcpConn, unsigned int slotCount)
1800 {
1801 	if (pcpConn && pcpConn->pcpResInfo && slotCount > 0)
1802 	{
1803 		if (pcpConn->pcpResInfo->resultSlots == 0)
1804 			pcpConn->pcpResInfo->resultSlots = 1;
1805 
1806 		if (slotCount > pcpConn->pcpResInfo->resultSlots)
1807 		{
1808 			pcpConn->pcpResInfo = repalloc(pcpConn->pcpResInfo, sizeof(PCPResultInfo) + (sizeof(PCPResultSlot) * (slotCount - 1)));
1809 		}
1810 		pcpConn->pcpResInfo->resultSlots = slotCount;
1811 	}
1812 }
1813 
1814 static void
setResultIntData(PCPResultInfo * res,unsigned int slotno,int value)1815 setResultIntData(PCPResultInfo * res, unsigned int slotno, int value)
1816 {
1817 	if (res)
1818 	{
1819 		res->resultSlot[slotno].datalen = 0;
1820 		res->resultSlot[slotno].isint = 1;
1821 		res->resultSlot[slotno].data.integer = value;
1822 		res->resultSlot[slotno].free_func = NULL;
1823 	}
1824 }
1825 
1826 static void
setResultBinaryData(PCPResultInfo * res,unsigned int slotno,void * value,int datalen,void (* free_func)(struct PCPConnInfo *,void *))1827 setResultBinaryData(PCPResultInfo * res, unsigned int slotno, void *value, int datalen, void (*free_func) (struct PCPConnInfo *, void *))
1828 {
1829 	if (res)
1830 	{
1831 		res->resultSlot[slotno].datalen = datalen;
1832 		res->resultSlot[slotno].isint = 0;
1833 		res->resultSlot[slotno].data.ptr = value;
1834 		res->resultSlot[slotno].free_func = free_func;
1835 	}
1836 }
1837 
1838 static int
setNextResultBinaryData(PCPResultInfo * res,void * value,int datalen,void (* free_func)(struct PCPConnInfo *,void *))1839 setNextResultBinaryData(PCPResultInfo * res, void *value, int datalen, void (*free_func) (struct PCPConnInfo *, void *))
1840 {
1841 	if (res && res->nextFillSlot < res->resultSlots)
1842 	{
1843 		setResultBinaryData(res, res->nextFillSlot, value, datalen, free_func);
1844 		res->nextFillSlot++;
1845 		return res->nextFillSlot;
1846 	}
1847 	return -1;
1848 }
1849 
1850 static int
PCPFlush(PCPConnInfo * pcpConn)1851 PCPFlush(PCPConnInfo * pcpConn)
1852 {
1853 	int			ret = pcp_flush(pcpConn->pcpConn);
1854 
1855 	if (ret)
1856 		pcp_internal_error(pcpConn,
1857 						   "ERROR: sending data to backend failed with error \"%s\"", strerror(errno));
1858 	return ret;
1859 }
1860 
1861 int
pcp_result_slot_count(PCPResultInfo * res)1862 pcp_result_slot_count(PCPResultInfo * res)
1863 {
1864 	if (res)
1865 		return res->resultSlots;
1866 	return 0;
1867 }
1868 
1869 /* Returns 1 if ResultInfo has no data. 0 otherwise */
1870 int
pcp_result_is_empty(PCPResultInfo * res)1871 pcp_result_is_empty(PCPResultInfo * res)
1872 {
1873 	if (res)
1874 	{
1875 		if (res->resultSlots <= 1 && res->resultSlot[0].isint == 0 && res->resultSlot[0].datalen <= 0)
1876 			return 1;
1877 		return 0;
1878 	}
1879 	return 1;
1880 }
1881 
1882 void *
pcp_get_binary_data(const PCPResultInfo * res,unsigned int slotno)1883 pcp_get_binary_data(const PCPResultInfo * res, unsigned int slotno)
1884 {
1885 	if (res && slotno < res->resultSlots && !res->resultSlot[slotno].isint)
1886 	{
1887 		return res->resultSlot[slotno].data.ptr;
1888 	}
1889 	return NULL;
1890 }
1891 
1892 int
pcp_get_int_data(const PCPResultInfo * res,unsigned int slotno)1893 pcp_get_int_data(const PCPResultInfo * res, unsigned int slotno)
1894 {
1895 	if (res && slotno < res->resultSlots && res->resultSlot[slotno].isint)
1896 	{
1897 		return res->resultSlot[slotno].data.integer;
1898 	}
1899 	return 0;
1900 }
1901 
1902 int
pcp_get_data_length(const PCPResultInfo * res,unsigned int slotno)1903 pcp_get_data_length(const PCPResultInfo * res, unsigned int slotno)
1904 {
1905 	if (res && slotno < res->resultSlots)
1906 	{
1907 		return res->resultSlot[slotno].datalen;
1908 	}
1909 	return 0;
1910 }
1911 
1912 void
pcp_free_result(PCPConnInfo * pcpConn)1913 pcp_free_result(PCPConnInfo * pcpConn)
1914 {
1915 	if (pcpConn && pcpConn->pcpResInfo)
1916 	{
1917 		PCPResultInfo *pcpRes = pcpConn->pcpResInfo;
1918 		int			i;
1919 
1920 		for (i = 0; i < pcpRes->resultSlots; i++)
1921 		{
1922 			if (pcpRes->resultSlot[i].isint)
1923 				continue;
1924 			if (pcpRes->resultSlot[i].data.ptr == NULL)
1925 				continue;
1926 
1927 			if (pcpRes->resultSlot[i].free_func)
1928 				pcpRes->resultSlot[i].free_func(pcpConn, pcpRes->resultSlot[i].data.ptr);
1929 			else
1930 				pfree(pcpRes->resultSlot[i].data.ptr);
1931 			pcpRes->resultSlot[i].data.ptr = NULL;
1932 		}
1933 		pfree(pcpConn->pcpResInfo);
1934 		pcpConn->pcpResInfo = NULL;
1935 	}
1936 }
1937 
1938 void
pcp_free_connection(PCPConnInfo * pcpConn)1939 pcp_free_connection(PCPConnInfo * pcpConn)
1940 {
1941 	if (pcpConn)
1942 	{
1943 		pcp_free_result(pcpConn);
1944 		if (pcpConn->errMsg)
1945 			pfree(pcpConn->errMsg);
1946 		/* Should we also Disconnect it? */
1947 		pfree(pcpConn);
1948 	}
1949 }
1950 
1951 char *
pcp_get_last_error(PCPConnInfo * pcpConn)1952 pcp_get_last_error(PCPConnInfo * pcpConn)
1953 {
1954 	if (pcpConn)
1955 		return pcpConn->errMsg;
1956 	return NULL;
1957 }
1958 
1959 /*
1960  * get the password file name which could be either pointed by PCPPASSFILE
1961  * environment variable or resides in user home directory.
1962  */
1963 static bool
getPoolPassFilename(char * pgpassfile)1964 getPoolPassFilename(char *pgpassfile)
1965 {
1966 	char	   *passfile_env;
1967 
1968 	if ((passfile_env = getenv("PCPPASSFILE")) != NULL)
1969 	{
1970 		/* use the literal path from the environment, if set */
1971 		strlcpy(pgpassfile, passfile_env, MAXPGPATH);
1972 	}
1973 	else
1974 	{
1975 		char		homedir[MAXPGPATH];
1976 
1977 		if (!get_home_directory(homedir, sizeof(homedir)))
1978 			return false;
1979 		snprintf(pgpassfile, MAXPGPATH + sizeof(PCPPASSFILE) + 1, "%s/%s", homedir, PCPPASSFILE);
1980 	}
1981 	return true;
1982 }
1983 
1984 /*
1985  * Get a password from the password file. Return value is malloc'd.
1986  * format = hostname:port:username:password
1987  */
1988 static char *
PasswordFromFile(PCPConnInfo * pcpConn,char * hostname,char * port,char * username)1989 PasswordFromFile(PCPConnInfo * pcpConn, char *hostname, char *port, char *username)
1990 {
1991 	FILE	   *fp;
1992 	char		pgpassfile[MAXPGPATH + sizeof(PCPPASSFILE) + 1];
1993 	struct stat stat_buf;
1994 #define LINELEN NAMEDATALEN*5
1995 	char		buf[LINELEN];
1996 
1997 	if (username == NULL || strlen(username) == 0)
1998 		return NULL;
1999 
2000 	if (hostname == NULL)
2001 		hostname = DefaultHost;
2002 	else if (strcmp(hostname, UNIX_DOMAIN_PATH) == 0)
2003 		hostname = DefaultHost;
2004 
2005 	if (!getPoolPassFilename(pgpassfile))
2006 		return NULL;
2007 
2008 	/* If password file cannot be opened, ignore it. */
2009 	if (stat(pgpassfile, &stat_buf) != 0)
2010 		return NULL;
2011 
2012 	if (!S_ISREG(stat_buf.st_mode))
2013 	{
2014 		if (pcpConn->Pfdebug)
2015 			fprintf(pcpConn->Pfdebug, "WARNING: password file \"%s\" is not a plain file\n", pgpassfile);
2016 		return NULL;
2017 	}
2018 
2019 	/* If password file is insecure, alert the user and ignore it. */
2020 	if (stat_buf.st_mode & (S_IRWXG | S_IRWXO))
2021 	{
2022 		if (pcpConn->Pfdebug)
2023 			fprintf(pcpConn->Pfdebug, "WARNING: password file \"%s\" has group or world access; permissions should be u=rw (0600) or less\n",
2024 					pgpassfile);
2025 		return NULL;
2026 	}
2027 
2028 	fp = fopen(pgpassfile, "r");
2029 	if (fp == NULL)
2030 		return NULL;
2031 
2032 	while (!feof(fp) && !ferror(fp))
2033 	{
2034 		char	   *t = buf,
2035 				   *ret,
2036 				   *p1,
2037 				   *p2;
2038 		int			len;
2039 
2040 		if (fgets(buf, sizeof(buf), fp) == NULL)
2041 			break;
2042 
2043 		len = strlen(buf);
2044 		if (len == 0)
2045 			continue;
2046 
2047 		/* Remove trailing newline */
2048 		if (buf[len - 1] == '\n')
2049 			buf[len - 1] = 0;
2050 
2051 		if ((t = pwdfMatchesString(t, hostname)) == NULL ||
2052 			(t = pwdfMatchesString(t, port)) == NULL ||
2053 			(t = pwdfMatchesString(t, username)) == NULL)
2054 			continue;
2055 		ret = pstrdup(t);
2056 		fclose(fp);
2057 
2058 		/* De-escape password. */
2059 		for (p1 = p2 = ret; *p1 != ':' && *p1 != '\0'; ++p1, ++p2)
2060 		{
2061 			if (*p1 == '\\' && p1[1] != '\0')
2062 				++p1;
2063 			*p2 = *p1;
2064 		}
2065 		*p2 = '\0';
2066 
2067 		return ret;
2068 	}
2069 
2070 	fclose(fp);
2071 	return NULL;
2072 
2073 #undef LINELEN
2074 }
2075 
/*
 * Field matcher used by PasswordFromFile (logic borrowed from PostgreSQL).
 *
 * Compares the leading ':'-terminated field of buf with token.  A backslash
 * in buf escapes the character that follows it, so an escaped ':' is
 * treated as field data rather than as the field terminator.  A field that
 * is exactly "*" matches any token.
 *
 * Returns a pointer to the first character after the terminating ':' when
 * the field matches, or NULL when it does not match, when buf ends before
 * a terminating ':' is found, or when either argument is NULL.
 */
static char *
pwdfMatchesString(char *buf, char *token)
{
	char	   *cur;
	char	   *want;

	if (buf == NULL || token == NULL)
		return NULL;

	/* Wildcard field: accept without looking at the token. */
	if (buf[0] == '*' && buf[1] == ':')
		return buf + 2;

	cur = buf;
	want = token;

	while (*cur != '\0')
	{
		/* Step over an escaping backslash and remember that we did. */
		bool		escaped = (*cur == '\\');

		if (escaped)
			cur++;

		if (*want == '\0')
		{
			/*
			 * Token is used up: the field matches only if it ends right
			 * here with an unescaped ':'.
			 */
			if (!escaped && *cur == ':')
				return cur + 1;
			return NULL;
		}

		if (*cur != *want)
			return NULL;

		cur++;
		want++;
	}

	/* Ran out of input before reaching the field terminator. */
	return NULL;
}
2116