1 /*
2  * $Header$
3  *
4  * Handles PCP connection, and protocol communication with pgpool-II
5  * These are client APIs. Server program should use APIs in pcp_stream.c
6  *
7  *
8  * pgpool: a language independent connection pool server for PostgreSQL
9  * written by Tatsuo Ishii
10  *
11  * Copyright (c) 2003-2020	PgPool Global Development Group
12  *
13  * Permission to use, copy, modify, and distribute this software and
14  * its documentation for any purpose and without fee is hereby
15  * granted, provided that the above copyright notice appear in all
16  * copies and that both that copyright notice and this permission
17  * notice appear in supporting documentation, and that the name of the
18  * author not be used in advertising or publicity pertaining to
19  * distribution of the software without specific, written prior
20  * permission. The author makes no representations about the
21  * suitability of this software for any purpose.  It is provided "as
22  * is" without express or implied warranty.
23  *
24  */
25 
26 #include <stdio.h>
27 #include <errno.h>
28 #include <string.h>
29 #include <stdlib.h>
30 #include <sys/types.h>
31 #include <sys/stat.h>
32 #include <sys/socket.h>
33 #include <sys/time.h>
34 #include <sys/un.h>
35 #include <netinet/in.h>
36 #include <netinet/tcp.h>
37 #include <netdb.h>
38 #include <unistd.h>
39 #include <stdarg.h>
40 
41 #include "pool.h"
42 #include "pcp/pcp.h"
43 #include "pcp/pcp_stream.h"
44 #include "utils/pool_path.h"
45 #include "utils/palloc.h"
46 #include "utils/pool_process_reporting.h"
47 #include "utils/json.h"
48 #include "auth/md5.h"
49 
50 #define PCPPASSFILE ".pcppass"
51 #define DefaultHost  "localhost"
52 
53 
54 static int	pcp_authorize(PCPConnInfo * pcpConn, char *username, char *password);
55 
56 static void pcp_internal_error(PCPConnInfo * pcpConn, const char *fmt,...);
57 
58 static PCPResultInfo * _pcp_detach_node(PCPConnInfo * pcpConn, int nid, bool gracefully);
59 static PCPResultInfo * _pcp_promote_node(PCPConnInfo * pcpConn, int nid, bool gracefully);
60 static PCPResultInfo * process_pcp_response(PCPConnInfo * pcpConn, char sentMsg);
61 static void setCommandSuccessful(PCPConnInfo * pcpConn);
62 static void setResultStatus(PCPConnInfo * pcpConn, ResultStateType resultState);
63 static void setResultBinaryData(PCPResultInfo * res, unsigned int slotno, void *value, int datalen, void (*free_func) (struct PCPConnInfo *, void *));
64 static int	setNextResultBinaryData(PCPResultInfo * res, void *value, int datalen, void (*free_func) (struct PCPConnInfo *, void *));
65 static void setResultIntData(PCPResultInfo * res, unsigned int slotno, int value);
66 
67 static void process_node_info_response(PCPConnInfo * pcpConn, char *buf, int len);
68 static void	process_health_check_stats_response(PCPConnInfo * pcpConn, char *buf, int len);
69 static void process_command_complete_response(PCPConnInfo * pcpConn, char *buf, int len);
70 static void process_watchdog_info_response(PCPConnInfo * pcpConn, char *buf, int len);
71 static void process_process_info_response(PCPConnInfo * pcpConn, char *buf, int len);
72 static void process_pool_status_response(PCPConnInfo * pcpConn, char *buf, int len);
73 static void process_pcp_node_count_response(PCPConnInfo * pcpConn, char *buf, int len);
74 static void process_process_count_response(PCPConnInfo * pcpConn, char *buf, int len);
75 static void process_salt_info_response(PCPConnInfo * pcpConn, char *buf, int len);
76 static void process_error_response(PCPConnInfo * pcpConn, char toc, char *buff);
77 
78 
79 static void setResultSlotCount(PCPConnInfo * pcpConn, unsigned int slotCount);
80 static void free_processInfo(struct PCPConnInfo *pcpConn, void *ptr);
81 static int	PCPFlush(PCPConnInfo * pcpConn);
82 
83 static bool getPoolPassFilename(char *pgpassfile);
84 static char *PasswordFromFile(PCPConnInfo * pcpConn, char *hostname, char *port, char *username);
85 static char *pwdfMatchesString(char *buf, char *token);
86 
87 /* --------------------------------
88  * pcp_connect - open connection to pgpool using given arguments
89  *
90  * return 0 on success, -1 otherwise
91  * --------------------------------
92  */
93 
94 /* Check if PCP connection is connected and authenticated
95  * return 1 on successfull 0 otherwise
96  */
97 
98 PCPConnInfo *
pcp_connect(char * hostname,int port,char * username,char * password,FILE * Pfdebug)99 pcp_connect(char *hostname, int port, char *username, char *password, FILE *Pfdebug)
100 {
101 	struct sockaddr_in addr;
102 	struct sockaddr_un unix_addr;
103 	struct hostent *hp;
104 	char	   *password_fron_file = NULL;
105 	char		os_user[256];
106 	PCPConnInfo *pcpConn = palloc0(sizeof(PCPConnInfo));
107 	int			fd;
108 	int			on = 1;
109 	int			len;
110 
111 	pcpConn->connState = PCP_CONNECTION_NOT_CONNECTED;
112 	pcpConn->Pfdebug = Pfdebug;
113 
114 	if (hostname == NULL || *hostname == '\0' || *hostname == '/')
115 	{
116 		char	   *path;
117 
118 		fd = socket(AF_UNIX, SOCK_STREAM, 0);
119 
120 		if (fd < 0)
121 		{
122 			pcp_internal_error(pcpConn,
123 							   "ERROR: failed to create UNIX domain socket. socket error \"%s\"", strerror(errno));
124 			pcpConn->connState = PCP_CONNECTION_BAD;
125 
126 			return pcpConn;
127 		}
128 
129 		memset(&unix_addr, 0, sizeof(unix_addr));
130 		unix_addr.sun_family = AF_UNIX;
131 
132 		if (hostname == NULL || *hostname == '\0')
133 		{
134 			path = UNIX_DOMAIN_PATH;
135 			hostname = path;
136 		}
137 		else
138 		{
139 			path = hostname;
140 		}
141 
142 		snprintf(unix_addr.sun_path, sizeof(unix_addr.sun_path), "%s/.s.PGSQL.%d",
143 				 path, port);
144 
145 		if (connect(fd, (struct sockaddr *) &unix_addr, sizeof(unix_addr)) < 0)
146 		{
147 			close(fd);
148 
149 			pcp_internal_error(pcpConn,
150 							   "ERROR: connection to socket \"%s\" failed with error \"%s\"", unix_addr.sun_path, strerror(errno));
151 			pcpConn->connState = PCP_CONNECTION_BAD;
152 			return pcpConn;
153 		}
154 	}
155 	else
156 	{
157 		fd = socket(AF_INET, SOCK_STREAM, 0);
158 		if (fd < 0)
159 		{
160 			pcp_internal_error(pcpConn,
161 							   "ERROR: failed to create INET domain socket with error \"%s\"", strerror(errno));
162 			pcpConn->connState = PCP_CONNECTION_BAD;
163 			return pcpConn;
164 		}
165 
166 		if (setsockopt(fd, IPPROTO_TCP, TCP_NODELAY,
167 					   (char *) &on, sizeof(on)) < 0)
168 		{
169 			close(fd);
170 
171 			pcp_internal_error(pcpConn,
172 							   "ERROR: set socket option failed with error \"%s\"", strerror(errno));
173 			pcpConn->connState = PCP_CONNECTION_BAD;
174 			return pcpConn;
175 		}
176 
177 		memset((char *) &addr, 0, sizeof(addr));
178 		addr.sin_family = AF_INET;
179 		hp = gethostbyname(hostname);
180 		if ((hp == NULL) || (hp->h_addrtype != AF_INET))
181 		{
182 			close(fd);
183 			pcp_internal_error(pcpConn,
184 							   "ERROR: could not retrieve hostname. gethostbyname failed with error \"%s\"", strerror(errno));
185 			pcpConn->connState = PCP_CONNECTION_BAD;
186 			return pcpConn;
187 
188 		}
189 		memmove((char *) &(addr.sin_addr),
190 				(char *) hp->h_addr,
191 				hp->h_length);
192 		addr.sin_port = htons(port);
193 
194 		len = sizeof(struct sockaddr_in);
195 		if (connect(fd, (struct sockaddr *) &addr, len) < 0)
196 		{
197 			close(fd);
198 			pcp_internal_error(pcpConn,
199 							   "ERROR: connection to host \"%s\" failed with error \"%s\"", hostname, strerror(errno));
200 			pcpConn->connState = PCP_CONNECTION_BAD;
201 			return pcpConn;
202 		}
203 	}
204 
205 	pcpConn->pcpConn = pcp_open(fd);
206 	if (pcpConn->pcpConn == NULL)
207 	{
208 		close(fd);
209 		pcp_internal_error(pcpConn,
210 						   "ERROR: failed to allocate memory");
211 		pcpConn->connState = PCP_CONNECTION_BAD;
212 		return pcpConn;
213 	}
214 	pcpConn->connState = PCP_CONNECTION_CONNECTED;
215 
216 	/*
217 	 * If username is not provided. Use the os user name and do not complain
218 	 * if it (getting os user name) gets failed
219 	 */
220 	if (username == NULL && get_os_username(os_user, sizeof(os_user)))
221 		username = os_user;
222 
223 	/*
224 	 * If password is not provided. lookup in pcppass file
225 	 */
226 	if (password == NULL || *password == '\0')
227 	{
228 		char		port_str[100];
229 
230 		snprintf(port_str, sizeof(port_str), "%d", port);
231 		password_fron_file = PasswordFromFile(pcpConn, hostname, port_str, username);
232 		password = password_fron_file;
233 	}
234 
235 	if (pcp_authorize(pcpConn, username, password) < 0)
236 	{
237 		pcp_close(pcpConn->pcpConn);
238 		pcpConn->pcpConn = NULL;
239 		pcpConn->connState = PCP_CONNECTION_AUTH_ERROR;
240 	}
241 	else
242 		pcpConn->connState = PCP_CONNECTION_OK;
243 
244 	if (password_fron_file)
245 		pfree(password_fron_file);
246 
247 	return pcpConn;
248 }
249 
250 static void
process_salt_info_response(PCPConnInfo * pcpConn,char * buf,int len)251 process_salt_info_response(PCPConnInfo * pcpConn, char *buf, int len)
252 {
253 	char	   *salt = palloc((sizeof(char) * 4));
254 
255 	memcpy(salt, buf, 4);
256 	if (setNextResultBinaryData(pcpConn->pcpResInfo, (void *) salt, 4, NULL) < 0)
257 	{
258 		pcp_internal_error(pcpConn,
259 
260 						   "command failed. invalid response");
261 		setResultStatus(pcpConn, PCP_RES_BAD_RESPONSE);
262 	}
263 	else
264 	{
265 		setCommandSuccessful(pcpConn);
266 	}
267 }
268 
269 /* --------------------------------
270  * pcp_authorize - authenticate with pgpool using username and password
271  *
272  * return 0 on success, -1 otherwise
273  * --------------------------------
274  */
275 static int
pcp_authorize(PCPConnInfo * pcpConn,char * username,char * password)276 pcp_authorize(PCPConnInfo * pcpConn, char *username, char *password)
277 {
278 	int			wsize;
279 	char		salt[4];
280 	char	   *salt_ptr;
281 	char		encrypt_buf[(MD5_PASSWD_LEN + 1) * 2];
282 	char		md5[MD5_PASSWD_LEN + 1];
283 	PCPResultInfo *pcpRes;
284 
285 	if (password == NULL)
286 		password = "";
287 
288 	if (username == NULL)
289 		username = "";
290 
291 	if (PCPConnectionStatus(pcpConn) != PCP_CONNECTION_CONNECTED)
292 	{
293 		pcp_internal_error(pcpConn,
294 						   "ERROR: PCP authorization failed. invalid connection state.");
295 		return -1;
296 	}
297 
298 	if (strlen(username) >= MAX_USER_PASSWD_LEN)
299 	{
300 		pcp_internal_error(pcpConn,
301 						   "ERROR: PCP authorization failed. username too long.");
302 		return -1;
303 	}
304 
305 	/* request salt */
306 	pcp_write(pcpConn->pcpConn, "M", 1);
307 	wsize = htonl(sizeof(int));
308 	pcp_write(pcpConn->pcpConn, &wsize, sizeof(int));
309 	if (PCPFlush(pcpConn) < 0)
310 		return -1;
311 
312 	pcpRes = process_pcp_response(pcpConn, 'M');
313 	if (PCPResultStatus(pcpRes) != PCP_RES_COMMAND_OK)
314 		return -1;
315 
316 	salt_ptr = pcp_get_binary_data(pcpRes, 0);
317 	if (salt_ptr == NULL)
318 		return -1;
319 	memcpy(salt, salt_ptr, 4);
320 
321 	/* encrypt password */
322 	pool_md5_hash(password, strlen(password), md5);
323 	md5[MD5_PASSWD_LEN] = '\0';
324 
325 	pool_md5_encrypt(md5, username, strlen(username),
326 					 encrypt_buf + MD5_PASSWD_LEN + 1);
327 	encrypt_buf[(MD5_PASSWD_LEN + 1) * 2 - 1] = '\0';
328 
329 	pool_md5_encrypt(encrypt_buf + MD5_PASSWD_LEN + 1, salt, 4,
330 					 encrypt_buf);
331 	encrypt_buf[MD5_PASSWD_LEN] = '\0';
332 
333 	pcp_write(pcpConn->pcpConn, "R", 1);
334 	wsize = htonl((strlen(username) + 1 + strlen(encrypt_buf) + 1) + sizeof(int));
335 	pcp_write(pcpConn->pcpConn, &wsize, sizeof(int));
336 	pcp_write(pcpConn->pcpConn, username, strlen(username) + 1);
337 	pcp_write(pcpConn->pcpConn, encrypt_buf, strlen(encrypt_buf) + 1);
338 	if (PCPFlush(pcpConn) < 0)
339 		return -1;
340 	pcpRes = process_pcp_response(pcpConn, 'R');
341 	if (PCPResultStatus(pcpRes) != PCP_RES_COMMAND_OK)
342 		return -1;
343 	pcp_free_result(pcpConn);
344 	return 0;
345 }
346 
process_pcp_response(PCPConnInfo * pcpConn,char sentMsg)347 static PCPResultInfo * process_pcp_response(PCPConnInfo * pcpConn, char sentMsg)
348 {
349 	char		toc;
350 	int			rsize;
351 	char	   *buf;
352 
353 	/* create empty result */
354 	if (pcpConn->pcpResInfo == NULL)
355 	{
356 		pcpConn->pcpResInfo = palloc0(sizeof(PCPResultInfo));
357 		pcpConn->pcpResInfo->resultSlots = 1;
358 	}
359 
360 	while (1)
361 	{
362 		if (pcp_read(pcpConn->pcpConn, &toc, 1))
363 		{
364 			pcp_internal_error(pcpConn,
365 							   "ERROR: unable to read data from socket.");
366 			setResultStatus(pcpConn, PCP_RES_ERROR);
367 			return pcpConn->pcpResInfo;
368 		}
369 
370 		if (pcp_read(pcpConn->pcpConn, &rsize, sizeof(int)))
371 		{
372 			pcp_internal_error(pcpConn,
373 							   "ERROR: unable to read data from socket.");
374 			setResultStatus(pcpConn, PCP_RES_ERROR);
375 			return pcpConn->pcpResInfo;
376 		}
377 		rsize = ntohl(rsize);
378 		buf = (char *) palloc(rsize);
379 
380 		if (pcp_read(pcpConn->pcpConn, buf, rsize - sizeof(int)))
381 		{
382 			pfree(buf);
383 			pcp_internal_error(pcpConn,
384 							   "ERROR: unable to read data from socket.");
385 			setResultStatus(pcpConn, PCP_RES_ERROR);
386 			return pcpConn->pcpResInfo;
387 		}
388 
389 		if (pcpConn->Pfdebug)
390 			fprintf(pcpConn->Pfdebug, "DEBUG: recv: tos=\"%c\", len=%d\n", toc, rsize);
391 
392 		switch (toc)
393 		{
394 			case 'r':			/* Authentication Response */
395 				{
396 					if (sentMsg != 'R')
397 					{
398 						setResultStatus(pcpConn, PCP_RES_BAD_RESPONSE);
399 					}
400 					else if (strcmp(buf, "AuthenticationOK") == 0)
401 					{
402 						pcpConn->connState = PCP_CONNECTION_OK;
403 						setResultStatus(pcpConn, PCP_RES_COMMAND_OK);
404 					}
405 					else
406 					{
407 						pcp_internal_error(pcpConn,
408 										   "ERROR: authentication failed. reason=\"%s\"", buf);
409 						setResultStatus(pcpConn, PCP_RES_BACKEND_ERROR);
410 					}
411 				}
412 				break;
413 			case 'm':
414 				if (sentMsg != 'M')
415 					setResultStatus(pcpConn, PCP_RES_BAD_RESPONSE);
416 				else
417 					process_salt_info_response(pcpConn, buf, rsize);
418 				break;
419 
420 			case 'E':
421 				setResultStatus(pcpConn, PCP_RES_BACKEND_ERROR);
422 				process_error_response(pcpConn, toc, buf);
423 				break;
424 
425 			case 'N':
426 				process_error_response(pcpConn, toc, buf);
427 				pfree(buf);
428 				continue;
429 				break;
430 
431 			case 'i':
432 				if (sentMsg != 'I')
433 					setResultStatus(pcpConn, PCP_RES_BAD_RESPONSE);
434 				else
435 					process_node_info_response(pcpConn, buf, rsize);
436 				break;
437 
438 			case 'h':
439 				if (sentMsg != 'H')
440 					setResultStatus(pcpConn, PCP_RES_BAD_RESPONSE);
441 				else
442 					process_health_check_stats_response(pcpConn, buf, rsize);
443 				break;
444 
445 			case 'l':
446 				if (sentMsg != 'L')
447 					setResultStatus(pcpConn, PCP_RES_BAD_RESPONSE);
448 				else
449 					process_pcp_node_count_response(pcpConn, buf, rsize);
450 				break;
451 
452 			case 'c':
453 				if (sentMsg != 'C' && sentMsg != 'O')
454 					setResultStatus(pcpConn, PCP_RES_BAD_RESPONSE);
455 				else
456 					process_command_complete_response(pcpConn, buf, rsize);
457 				break;
458 
459 			case 'd':
460 				if (sentMsg != 'D' && sentMsg != 'J')
461 					setResultStatus(pcpConn, PCP_RES_BAD_RESPONSE);
462 				else
463 					process_command_complete_response(pcpConn, buf, rsize);
464 				break;
465 
466 			case 'a':
467 				if (sentMsg != 'A')
468 					setResultStatus(pcpConn, PCP_RES_BAD_RESPONSE);
469 				else
470 					process_command_complete_response(pcpConn, buf, rsize);
471 				break;
472 
473 			case 'z':
474 				if (sentMsg != 'Z')
475 					setResultStatus(pcpConn, PCP_RES_BAD_RESPONSE);
476 				else
477 					process_command_complete_response(pcpConn, buf, rsize);
478 				break;
479 
480 			case 'w':
481 				if (sentMsg != 'W')
482 					setResultStatus(pcpConn, PCP_RES_BAD_RESPONSE);
483 				else
484 					process_watchdog_info_response(pcpConn, buf, rsize);
485 				break;
486 
487 			case 'p':
488 				if (sentMsg != 'P')
489 					setResultStatus(pcpConn, PCP_RES_BAD_RESPONSE);
490 				else
491 					process_process_info_response(pcpConn, buf, rsize);
492 				break;
493 
494 			case 'n':
495 				if (sentMsg != 'N')
496 					setResultStatus(pcpConn, PCP_RES_BAD_RESPONSE);
497 				else
498 					process_process_count_response(pcpConn, buf, rsize);
499 				break;
500 
501 			case 'b':
502 				if (sentMsg != 'B')
503 					setResultStatus(pcpConn, PCP_RES_BAD_RESPONSE);
504 				else
505 					process_pool_status_response(pcpConn, buf, rsize);
506 				break;
507 
508 			case 't':
509 				if (sentMsg != 'T')
510 					setResultStatus(pcpConn, PCP_RES_BAD_RESPONSE);
511 				else
512 					setResultStatus(pcpConn, PCP_RES_COMMAND_OK);
513 				break;
514 
515 			default:
516 				setResultStatus(pcpConn, PCP_RES_BAD_RESPONSE);
517 				pcp_internal_error(pcpConn,
518 								   "ERROR: invalid PCP packet type =\"%c\"", toc);
519 				break;
520 		}
521 		pfree(buf);
522 		if (PCPResultStatus(pcpConn->pcpResInfo) != PCP_RES_INCOMPLETE)
523 			break;
524 	}
525 	return pcpConn->pcpResInfo;
526 }
527 
528 static void
process_error_response(PCPConnInfo * pcpConn,char toc,char * buf)529 process_error_response(PCPConnInfo * pcpConn, char toc, char *buf)
530 {
531 	/* For time we only support sev, error message and details */
532 	char	   *errorSev = NULL;
533 	char	   *errorMsg = NULL;
534 	char	   *errorDet = NULL;
535 	char	   *e = buf;
536 
537 	if (toc != 'E' && toc != 'N')
538 		return;
539 
540 	while (*e)
541 	{
542 		char		type = *e;
543 
544 		e++;
545 		if (*e == 0)
546 			break;
547 
548 		if (type == 'M')
549 			errorMsg = e;
550 		else if (type == 'S')
551 			errorSev = e;
552 		else if (type == 'D')
553 			errorDet = e;
554 		else
555 			e += strlen(e) + 1;
556 		if (errorDet && errorSev && errorMsg)	/* we have all what we need */
557 			break;
558 	}
559 	if (!errorSev && !errorMsg)
560 		return;
561 
562 	if (toc != 'E')				/* This is not an error report it as debug */
563 	{
564 		if (pcpConn->Pfdebug)
565 			fprintf(pcpConn->Pfdebug,
566 					"BACKEND %s:  %s\n%s%s%s", errorSev, errorMsg,
567 					errorDet ? "DETAIL:  " : "",
568 					errorDet ? errorDet : "",
569 					errorDet ? "\n" : "");
570 	}
571 	else
572 	{
573 		pcp_internal_error(pcpConn,
574 						   "%s:  %s\n%s%s%s", errorSev, errorMsg,
575 						   errorDet ? "DETAIL:  " : "",
576 						   errorDet ? errorDet : "",
577 						   errorDet ? "\n" : "");
578 		setResultStatus(pcpConn, PCP_RES_BACKEND_ERROR);
579 
580 	}
581 }
582 
583 /* --------------------------------
584  * pcp_disconnect - close connection to pgpool
585  * --------------------------------
586  */
587 void
pcp_disconnect(PCPConnInfo * pcpConn)588 pcp_disconnect(PCPConnInfo * pcpConn)
589 {
590 	int			wsize;
591 
592 	if (PCPConnectionStatus(pcpConn) != PCP_CONNECTION_OK)
593 	{
594 		pcp_internal_error(pcpConn, "invalid PCP connection");
595 		return;
596 	}
597 
598 	pcp_write(pcpConn->pcpConn, "X", 1);
599 	wsize = htonl(sizeof(int));
600 	pcp_write(pcpConn->pcpConn, &wsize, sizeof(int));
601 	if (PCPFlush(pcpConn) < 0)
602 		return;
603 	if (pcpConn->Pfdebug)
604 		fprintf(pcpConn->Pfdebug, "DEBUG: send: tos=\"X\", len=%d\n", (int) sizeof(int));
605 
606 	pcp_close(pcpConn->pcpConn);
607 	pcpConn->connState = PCP_CONNECTION_NOT_CONNECTED;
608 	pcpConn->pcpConn = NULL;
609 }
610 
611 /* --------------------------------
612  * pcp_terminate_pgpool - send terminate packet
613  *
614  * return 0 on success, -1 otherwise
615  * --------------------------------
616  */
617 PCPResultInfo *
pcp_terminate_pgpool(PCPConnInfo * pcpConn,char mode,char command_scope)618 pcp_terminate_pgpool(PCPConnInfo * pcpConn, char mode, char command_scope)
619 {
620 	int			wsize;
621 
622 	if (PCPConnectionStatus(pcpConn) != PCP_CONNECTION_OK)
623 	{
624 		pcp_internal_error(pcpConn, "invalid PCP connection");
625 		return NULL;
626 	}
627 	if (command_scope == 'l')	/*local only*/
628 		pcp_write(pcpConn->pcpConn, "T", 1);
629 	else
630 		pcp_write(pcpConn->pcpConn, "t", 1);
631 	wsize = htonl(sizeof(int) + sizeof(char));
632 	pcp_write(pcpConn->pcpConn, &wsize, sizeof(int));
633 	pcp_write(pcpConn->pcpConn, &mode, sizeof(char));
634 	if (PCPFlush(pcpConn) < 0)
635 		return NULL;
636 	if (pcpConn->Pfdebug)
637 		fprintf(pcpConn->Pfdebug, "DEBUG: send: tos=\"T\", len=%d\n", ntohl(wsize));
638 
639 	return process_pcp_response(pcpConn, 'T');
640 }
641 
642 static void
process_pcp_node_count_response(PCPConnInfo * pcpConn,char * buf,int len)643 process_pcp_node_count_response(PCPConnInfo * pcpConn, char *buf, int len)
644 {
645 	if (strcmp(buf, "CommandComplete") == 0)
646 	{
647 		char	   *index = NULL;
648 
649 		index = (char *) memchr(buf, '\0', len);
650 		if (index != NULL)
651 		{
652 			int			ret;
653 
654 			index += 1;
655 			ret = atoi(index);
656 			setResultIntData(pcpConn->pcpResInfo, 0, ret);
657 			setCommandSuccessful(pcpConn);
658 			return;
659 		}
660 		else
661 			pcp_internal_error(pcpConn,
662 							   "command failed. invalid response");
663 	}
664 	else
665 		pcp_internal_error(pcpConn,
666 						   "command failed with reason: \"%s\"", buf);
667 	setResultStatus(pcpConn, PCP_RES_BAD_RESPONSE);
668 }
669 
670 /* --------------------------------
671  * pcp_node_count - get number of nodes currently connected to pgpool
672  *
673  * return array of node IDs on success, -1 otherwise
674  * --------------------------------
675  */
676 PCPResultInfo *
pcp_node_count(PCPConnInfo * pcpConn)677 pcp_node_count(PCPConnInfo * pcpConn)
678 {
679 	int			wsize;
680 
681 	if (PCPConnectionStatus(pcpConn) != PCP_CONNECTION_OK)
682 	{
683 		pcp_internal_error(pcpConn,
684 						   "invalid PCP connection");
685 		return NULL;
686 	}
687 	pcp_write(pcpConn->pcpConn, "L", 1);
688 	wsize = htonl(sizeof(int));
689 	pcp_write(pcpConn->pcpConn, &wsize, sizeof(int));
690 	if (PCPFlush(pcpConn) < 0)
691 		return NULL;
692 	if (pcpConn->Pfdebug)
693 		fprintf(pcpConn->Pfdebug, "DEBUG: send: tos=\"L\", len=%d\n", ntohl(wsize));
694 
695 	return process_pcp_response(pcpConn, 'L');
696 }
697 
698 static void
process_node_info_response(PCPConnInfo * pcpConn,char * buf,int len)699 process_node_info_response(PCPConnInfo * pcpConn, char *buf, int len)
700 {
701 	BackendInfo *backend_info = NULL;
702 
703 	if (strcmp(buf, "CommandComplete") == 0)
704 	{
705 		char	   *index = NULL;
706 
707 		backend_info = (BackendInfo *) palloc(sizeof(BackendInfo));
708 
709 		index = (char *) memchr(buf, '\0', len);
710 		if (index == NULL)
711 			goto INVALID_RESPONSE;
712 		index += 1;
713 		strlcpy(backend_info->backend_hostname, index, sizeof(backend_info->backend_hostname));
714 
715 		index = (char *) memchr(index, '\0', len);
716 		if (index == NULL)
717 			goto INVALID_RESPONSE;
718 		index += 1;
719 		backend_info->backend_port = atoi(index);
720 
721 		index = (char *) memchr(index, '\0', len);
722 		if (index == NULL)
723 			goto INVALID_RESPONSE;
724 		index += 1;
725 		backend_info->backend_status = atoi(index);
726 
727 		index = (char *) memchr(index, '\0', len);
728 		if (index == NULL)
729 			goto INVALID_RESPONSE;
730 		index += 1;
731 		backend_info->backend_weight = atof(index);
732 
733 		index = (char *) memchr(index, '\0', len);
734 		if (index == NULL)
735 			goto INVALID_RESPONSE;
736 
737 		index++;
738 		backend_info->role = atoi(index);
739 
740 		index = (char *) memchr(index, '\0', len);
741 		if (index == NULL)
742 			goto INVALID_RESPONSE;
743 
744 		index++;
745 		backend_info->standby_delay = atol(index);
746 
747 		index = (char *) memchr(index, '\0', len);
748 		if (index == NULL)
749 			goto INVALID_RESPONSE;
750 
751 		index++;
752 		strlcpy(backend_info->replication_state, index, sizeof(backend_info->replication_state));
753 
754 		index = (char *) memchr(index, '\0', len);
755 		if (index == NULL)
756 			goto INVALID_RESPONSE;
757 
758 		index++;
759 		strlcpy(backend_info->replication_sync_state, index, sizeof(backend_info->replication_sync_state));
760 
761 		index = (char *) memchr(index, '\0', len);
762 		if (index == NULL)
763 			goto INVALID_RESPONSE;
764 
765 		index++;
766 		backend_info->status_changed_time = atol(index);
767 
768 		index = (char *) memchr(index, '\0', len);
769 		if (index == NULL)
770 			goto INVALID_RESPONSE;
771 
772 		if (setNextResultBinaryData(pcpConn->pcpResInfo, (void *) backend_info, sizeof(BackendInfo), NULL) < 0)
773 			goto INVALID_RESPONSE;
774 
775 		setCommandSuccessful(pcpConn);
776 	}
777 	else
778 	{
779 		pcp_internal_error(pcpConn,
780 						   "command failed with reason: \"%s\"", buf);
781 		setResultStatus(pcpConn, PCP_RES_BAD_RESPONSE);
782 	}
783 
784 	return;
785 
786 INVALID_RESPONSE:
787 
788 	if (backend_info)
789 		pfree(backend_info);
790 	pcp_internal_error(pcpConn,
791 					   "command failed. invalid response");
792 	setResultStatus(pcpConn, PCP_RES_BAD_RESPONSE);
793 
794 }
795 
796 /* --------------------------------
797  * pcp_node_info - get information of node pointed by given argument
798  *
799  * return structure of node information on success, -1 otherwise
800  * --------------------------------
801  */
802 PCPResultInfo *
pcp_node_info(PCPConnInfo * pcpConn,int nid)803 pcp_node_info(PCPConnInfo * pcpConn, int nid)
804 {
805 	int			wsize;
806 	char		node_id[16];
807 
808 	if (PCPConnectionStatus(pcpConn) != PCP_CONNECTION_OK)
809 	{
810 		pcp_internal_error(pcpConn,
811 						   "invalid PCP connection");
812 		return NULL;
813 	}
814 
815 	snprintf(node_id, sizeof(node_id), "%d", nid);
816 
817 	pcp_write(pcpConn->pcpConn, "I", 1);
818 	wsize = htonl(strlen(node_id) + 1 + sizeof(int));
819 	pcp_write(pcpConn->pcpConn, &wsize, sizeof(int));
820 	pcp_write(pcpConn->pcpConn, node_id, strlen(node_id) + 1);
821 	if (PCPFlush(pcpConn) < 0)
822 		return NULL;
823 	if (pcpConn->Pfdebug)
824 		fprintf(pcpConn->Pfdebug, "DEBUG: send: tos=\"I\", len=%d\n", ntohl(wsize));
825 
826 	return process_pcp_response(pcpConn, 'I');
827 }
828 
829 
830 /* --------------------------------
831  * pcp_health_check_stats - get information of health check stats pointed by given argument
832  *
833  * return structure of node information on success, -1 otherwise
834  * --------------------------------
835  */
836 PCPResultInfo *
pcp_health_check_stats(PCPConnInfo * pcpConn,int nid)837 pcp_health_check_stats(PCPConnInfo * pcpConn, int nid)
838 {
839 	int			wsize;
840 	char		node_id[16];
841 
842 	if (PCPConnectionStatus(pcpConn) != PCP_CONNECTION_OK)
843 	{
844 		pcp_internal_error(pcpConn,
845 						   "invalid PCP connection");
846 		return NULL;
847 	}
848 
849 	snprintf(node_id, sizeof(node_id), "%d", nid);
850 
851 	pcp_write(pcpConn->pcpConn, "H", 1);
852 	wsize = htonl(strlen(node_id) + 1 + sizeof(int));
853 	pcp_write(pcpConn->pcpConn, &wsize, sizeof(int));
854 	pcp_write(pcpConn->pcpConn, node_id, strlen(node_id) + 1);
855 	if (PCPFlush(pcpConn) < 0)
856 		return NULL;
857 	if (pcpConn->Pfdebug)
858 		fprintf(pcpConn->Pfdebug, "DEBUG: send: tos=\"L\", len=%d\n", ntohl(wsize));
859 
860 	return process_pcp_response(pcpConn, 'H');
861 }
862 
863 PCPResultInfo *
pcp_reload_config(PCPConnInfo * pcpConn,char command_scope)864 pcp_reload_config(PCPConnInfo * pcpConn,char command_scope)
865 {
866 	int                     wsize;
867 /*
868  * pcp packet format for pcp_reload_config
869  * z[size][commmand_scope]
870  */
871 	if (PCPConnectionStatus(pcpConn) != PCP_CONNECTION_OK)
872 	{
873 	   pcp_internal_error(pcpConn, "invalid PCP connection");
874 	   return NULL;
875 	}
876 
877 	pcp_write(pcpConn->pcpConn, "Z", 1);
878 	wsize = htonl(sizeof(int) + sizeof(char));
879 	pcp_write(pcpConn->pcpConn, &wsize, sizeof(int));
880 	pcp_write(pcpConn->pcpConn, &command_scope, sizeof(char));
881 	if (PCPFlush(pcpConn) < 0)
882 	   return NULL;
883 	if (pcpConn->Pfdebug)
884 	   fprintf(pcpConn->Pfdebug, "DEBUG: send: tos=\"Z\", len=%d\n", ntohl(wsize));
885 
886 	return process_pcp_response(pcpConn, 'Z');
887 }
888 
889 
890 /*
891  * Process health check response from PCP server.
892  * pcpConn: connection to the server
893  * buf:		returned data from server
894  * len:		length of the data
895  */
896 static void
process_health_check_stats_response(PCPConnInfo * pcpConn,char * buf,int len)897 process_health_check_stats_response
898 (PCPConnInfo * pcpConn, char *buf, int len)
899 {
900 	POOL_HEALTH_CHECK_STATS *stats;
901 	int		*offsets;
902 	int		n;
903 	int		i;
904 	char	*p;
905 	int		maxstr;
906 	char	c[] = "CommandComplete";
907 
908 	if (strcmp(buf, c) != 0)
909 	{
910 		pcp_internal_error(pcpConn,
911 						   "command failed. invalid response");
912 		setResultStatus(pcpConn, PCP_RES_BAD_RESPONSE);
913 		return;
914 	}
915 	buf += sizeof(c);
916 
917 	/* Allocate health stats memory */
918 	stats = palloc0(sizeof(POOL_HEALTH_CHECK_STATS));
919 	p = (char *)stats;
920 
921 	/* Calculate total packet length */
922 	offsets = pool_health_check_stats_offsets(&n);
923 
924 	for (i = 0; i < n; i++)
925 	{
926 		if (i == n -1)
927 			maxstr = sizeof(POOL_HEALTH_CHECK_STATS) - offsets[i];
928 		else
929 			maxstr = offsets[i + 1] - offsets[i];
930 
931 		StrNCpy(p + offsets[i], buf, maxstr -1);
932 		buf += strlen(buf) + 1;
933 	}
934 
935 	if (setNextResultBinaryData(pcpConn->pcpResInfo, (void *) stats, sizeof(POOL_HEALTH_CHECK_STATS), NULL) < 0)
936 	{
937 		if (stats)
938 			pfree(stats);
939 		pcp_internal_error(pcpConn,
940 						   "command failed. invalid response");
941 		setResultStatus(pcpConn, PCP_RES_BAD_RESPONSE);
942 	}
943 	else
944 		setCommandSuccessful(pcpConn);
945 
946 }
947 
948 static void
process_process_count_response(PCPConnInfo * pcpConn,char * buf,int len)949 process_process_count_response(PCPConnInfo * pcpConn, char *buf, int len)
950 {
951 	if (strcmp(buf, "CommandComplete") == 0)
952 	{
953 		int			process_count;
954 		int		   *process_list = NULL;
955 		char	   *index = NULL;
956 		int			i;
957 
958 		index = (char *) memchr(buf, '\0', len);
959 		if (index == NULL)
960 		{
961 			pcp_internal_error(pcpConn,
962 							   "command failed. invalid response");
963 			setResultStatus(pcpConn, PCP_RES_BAD_RESPONSE);
964 			return;
965 		}
966 		index += 1;
967 		process_count = atoi(index);
968 
969 		process_list = (int *) palloc(sizeof(int) * process_count);
970 
971 		for (i = 0; i < process_count; i++)
972 		{
973 			index = (char *) memchr(index, '\0', len);
974 			if (index == NULL)
975 			{
976 				pcp_internal_error(pcpConn,
977 								   "command failed. invalid response");
978 				setResultStatus(pcpConn, PCP_RES_BAD_RESPONSE);
979 				pfree(process_list);
980 				return;
981 			}
982 			index += 1;
983 			process_list[i] = atoi(index);
984 		}
985 		setResultSlotCount(pcpConn, 1);
986 		if (setNextResultBinaryData(pcpConn->pcpResInfo, process_list, (sizeof(int) * process_count), NULL) < 0)
987 		{
988 			pcp_internal_error(pcpConn,
989 							   "command failed. invalid response");
990 			setResultStatus(pcpConn, PCP_RES_BAD_RESPONSE);
991 		}
992 		else
993 		{
994 			setCommandSuccessful(pcpConn);
995 		}
996 	}
997 	else
998 	{
999 		pcp_internal_error(pcpConn,
1000 						   "command failed with reason: \"%s\"", buf);
1001 		setResultStatus(pcpConn, PCP_RES_BAD_RESPONSE);
1002 	}
1003 }
1004 
1005 /* --------------------------------
1006  * pcp_node_count - get number of nodes currently connected to pgpool
1007  *
1008  * return array of pids on success, NULL otherwise
1009  * --------------------------------
1010  */
1011 
1012 PCPResultInfo *
pcp_process_count(PCPConnInfo * pcpConn)1013 pcp_process_count(PCPConnInfo * pcpConn)
1014 {
1015 	int			wsize;
1016 
1017 	if (PCPConnectionStatus(pcpConn) != PCP_CONNECTION_OK)
1018 	{
1019 		pcp_internal_error(pcpConn, "invalid PCP connection");
1020 		return NULL;
1021 	}
1022 
1023 	pcp_write(pcpConn->pcpConn, "N", 1);
1024 	wsize = htonl(sizeof(int));
1025 	pcp_write(pcpConn->pcpConn, &wsize, sizeof(int));
1026 	if (PCPFlush(pcpConn) < 0)
1027 		return NULL;
1028 	if (pcpConn->Pfdebug)
1029 		fprintf(pcpConn->Pfdebug, "DEBUG: send: tos=\"N\", len=%d\n", ntohl(wsize));
1030 
1031 	return process_pcp_response(pcpConn, 'N');
1032 }
1033 
1034 static void
free_processInfo(struct PCPConnInfo * pcpConn,void * ptr)1035 free_processInfo(struct PCPConnInfo *pcpConn, void *ptr)
1036 {
1037 	ProcessInfo *pi = (ProcessInfo *) ptr;
1038 
1039 	if (pcpConn->Pfdebug)
1040 		fprintf(pcpConn->Pfdebug, "free ProcessInfo structure \n");
1041 
1042 	if (pi == NULL)
1043 	{
1044 		if (pcpConn->Pfdebug)
1045 			fprintf(pcpConn->Pfdebug, "ProcessInfo structure is NULL nothing to free \n");
1046 		return;
1047 	}
1048 	if (pi->connection_info)
1049 		pfree(pi->connection_info);
1050 	pfree(pi);
1051 }
1052 
1053 static void
process_process_info_response(PCPConnInfo * pcpConn,char * buf,int len)1054 process_process_info_response(PCPConnInfo * pcpConn, char *buf, int len)
1055 {
1056 	char	   *index;
1057 	ProcessInfo *processInfo = NULL;
1058 
1059 	if (strcmp(buf, "ArraySize") == 0)
1060 	{
1061 		int			ci_size;
1062 
1063 		index = (char *) memchr(buf, '\0', len);
1064 		if (index == NULL)
1065 			goto INVALID_RESPONSE;
1066 		index += 1;
1067 		ci_size = atoi(index);
1068 
1069 		setResultStatus(pcpConn, PCP_RES_INCOMPLETE);
1070 		setResultSlotCount(pcpConn, ci_size);
1071 		pcpConn->pcpResInfo->nextFillSlot = 0;
1072 		return;
1073 	}
1074 	else if (strcmp(buf, "ProcessInfo") == 0)
1075 	{
1076 		if (PCPResultStatus(pcpConn->pcpResInfo) != PCP_RES_INCOMPLETE)
1077 			goto INVALID_RESPONSE;
1078 
1079 		processInfo = palloc0(sizeof(ProcessInfo));
1080 		processInfo->connection_info = palloc0(sizeof(ConnectionInfo));
1081 
1082 		index = (char *) memchr(buf, '\0', len);
1083 		if (index == NULL)
1084 			goto INVALID_RESPONSE;
1085 		index += 1;
1086 		processInfo->pid = atoi(index);
1087 
1088 		index = (char *) memchr(index, '\0', len);
1089 		if (index == NULL)
1090 			goto INVALID_RESPONSE;
1091 		index += 1;
1092 		strlcpy(processInfo->connection_info->database, index, SM_DATABASE);
1093 
1094 		index = (char *) memchr(index, '\0', len);
1095 		if (index == NULL)
1096 			goto INVALID_RESPONSE;
1097 		index += 1;
1098 		strlcpy(processInfo->connection_info->user, index, SM_USER);
1099 
1100 		index = (char *) memchr(index, '\0', len);
1101 		if (index == NULL)
1102 			goto INVALID_RESPONSE;
1103 		index += 1;
1104 		processInfo->start_time = atol(index);
1105 
1106 		index = (char *) memchr(index, '\0', len);
1107 		if (index == NULL)
1108 			goto INVALID_RESPONSE;
1109 		index += 1;
1110 		processInfo->connection_info->create_time = atol(index);
1111 
1112 		index = (char *) memchr(index, '\0', len);
1113 		if (index == NULL)
1114 			goto INVALID_RESPONSE;
1115 		index += 1;
1116 		processInfo->connection_info->major = atoi(index);
1117 
1118 		index = (char *) memchr(index, '\0', len);
1119 		if (index == NULL)
1120 			goto INVALID_RESPONSE;
1121 		index += 1;
1122 		processInfo->connection_info->minor = atoi(index);
1123 
1124 		index = (char *) memchr(index, '\0', len);
1125 		if (index == NULL)
1126 			goto INVALID_RESPONSE;
1127 		index += 1;
1128 		processInfo->connection_info->counter = atoi(index);
1129 
1130 		index = (char *) memchr(index, '\0', len);
1131 		if (index == NULL)
1132 			goto INVALID_RESPONSE;
1133 		index += 1;
1134 		processInfo->connection_info->backend_id = atoi(index);
1135 
1136 		index = (char *) memchr(index, '\0', len);
1137 		if (index == NULL)
1138 			goto INVALID_RESPONSE;
1139 		index += 1;
1140 		processInfo->connection_info->pid = atoi(index);
1141 
1142 		index = (char *) memchr(index, '\0', len);
1143 		if (index == NULL)
1144 			goto INVALID_RESPONSE;
1145 		index += 1;
1146 		processInfo->connection_info->connected = atoi(index);
1147 
1148 		if (setNextResultBinaryData(pcpConn->pcpResInfo, (void *) processInfo, sizeof(ProcessInfo), free_processInfo) < 0)
1149 			goto INVALID_RESPONSE;
1150 
1151 		return;
1152 	}
1153 
1154 	else if (strcmp(buf, "CommandComplete") == 0)
1155 	{
1156 		setResultStatus(pcpConn, PCP_RES_COMMAND_OK);
1157 		return;
1158 	}
1159 
1160 INVALID_RESPONSE:
1161 
1162 	if (processInfo)
1163 	{
1164 		if (processInfo->connection_info)
1165 			pfree(processInfo->connection_info);
1166 		pfree(processInfo);
1167 	}
1168 	pcp_internal_error(pcpConn,
1169 					   "command failed. invalid response");
1170 	setResultStatus(pcpConn, PCP_RES_BAD_RESPONSE);
1171 }
1172 
1173 /* --------------------------------
1174  * pcp_process_info - get information of node pointed by given argument
1175  *
1176  * return structure of process information on success, -1 otherwise
1177  * --------------------------------
1178  */
1179 PCPResultInfo *
pcp_process_info(PCPConnInfo * pcpConn,int pid)1180 pcp_process_info(PCPConnInfo * pcpConn, int pid)
1181 {
1182 	int			wsize;
1183 	char		process_id[16];
1184 
1185 	if (PCPConnectionStatus(pcpConn) != PCP_CONNECTION_OK)
1186 	{
1187 		pcp_internal_error(pcpConn, "invalid PCP connection");
1188 		return NULL;
1189 	}
1190 
1191 	snprintf(process_id, sizeof(process_id), "%d", pid);
1192 
1193 	pcp_write(pcpConn->pcpConn, "P", 1);
1194 	wsize = htonl(strlen(process_id) + 1 + sizeof(int));
1195 	pcp_write(pcpConn->pcpConn, &wsize, sizeof(int));
1196 	pcp_write(pcpConn->pcpConn, process_id, strlen(process_id) + 1);
1197 	if (PCPFlush(pcpConn) < 0)
1198 		return NULL;
1199 	if (pcpConn->Pfdebug)
1200 		fprintf(pcpConn->Pfdebug, "DEBUG: send: tos=\"P\", len=%d\n", ntohl(wsize));
1201 
1202 	return process_pcp_response(pcpConn, 'P');
1203 }
1204 
1205 /* --------------------------------
1206  * pcp_detach_node - detach a node given by the argument from pgpool's control
1207  *
1208  * return 0 on success, -1 otherwise
1209  * --------------------------------
1210  */
1211 PCPResultInfo *
pcp_detach_node(PCPConnInfo * pcpConn,int nid)1212 pcp_detach_node(PCPConnInfo * pcpConn, int nid)
1213 {
1214 	return _pcp_detach_node(pcpConn, nid, FALSE);
1215 }
1216 
1217 /* --------------------------------
1218 
1219  * and detach a node given by the argument from pgpool's control
1220  *
1221  * return 0 on success, -1 otherwise
1222  * --------------------------------
1223  */
1224 PCPResultInfo *
pcp_detach_node_gracefully(PCPConnInfo * pcpConn,int nid)1225 pcp_detach_node_gracefully(PCPConnInfo * pcpConn, int nid)
1226 {
1227 	return _pcp_detach_node(pcpConn, nid, TRUE);
1228 }
1229 
1230 static PCPResultInfo *
_pcp_detach_node(PCPConnInfo * pcpConn,int nid,bool gracefully)1231 _pcp_detach_node(PCPConnInfo * pcpConn, int nid, bool gracefully)
1232 {
1233 	int			wsize;
1234 	char		node_id[16];
1235 	char	   *sendchar;
1236 
1237 	if (PCPConnectionStatus(pcpConn) != PCP_CONNECTION_OK)
1238 	{
1239 		pcp_internal_error(pcpConn, "invalid PCP connection");
1240 		return NULL;
1241 	}
1242 
1243 	snprintf(node_id, sizeof(node_id), "%d", nid);
1244 
1245 	if (gracefully)
1246 		sendchar = "d";
1247 	else
1248 		sendchar = "D";
1249 
1250 	pcp_write(pcpConn->pcpConn, sendchar, 1);
1251 	wsize = htonl(strlen(node_id) + 1 + sizeof(int));
1252 	pcp_write(pcpConn->pcpConn, &wsize, sizeof(int));
1253 	pcp_write(pcpConn->pcpConn, node_id, strlen(node_id) + 1);
1254 	if (PCPFlush(pcpConn) < 0)
1255 		return NULL;
1256 	if (pcpConn->Pfdebug)
1257 		fprintf(pcpConn->Pfdebug, "DEBUG: send: tos=\"D\", len=%d\n", ntohl(wsize));
1258 
1259 	return process_pcp_response(pcpConn, 'D');
1260 }
1261 
1262 static void
process_command_complete_response(PCPConnInfo * pcpConn,char * buf,int len)1263 process_command_complete_response(PCPConnInfo * pcpConn, char *buf, int len)
1264 {
1265 	if (strcmp(buf, "CommandComplete") == 0)
1266 	{
1267 		setCommandSuccessful(pcpConn);
1268 	}
1269 	else
1270 	{
1271 		pcp_internal_error(pcpConn,
1272 						   "command failed with reason: \"%s\"", buf);
1273 		setResultStatus(pcpConn, PCP_RES_BAD_RESPONSE);
1274 	}
1275 }
1276 
1277 /* --------------------------------
1278  * pcp_attach_node - attach a node given by the argument from pgpool's control
1279  *
1280  * return 0 on success, -1 otherwise
1281  * --------------------------------
1282  */
1283 PCPResultInfo *
pcp_attach_node(PCPConnInfo * pcpConn,int nid)1284 pcp_attach_node(PCPConnInfo * pcpConn, int nid)
1285 {
1286 	int			wsize;
1287 	char		node_id[16];
1288 
1289 	if (PCPConnectionStatus(pcpConn) != PCP_CONNECTION_OK)
1290 	{
1291 		pcp_internal_error(pcpConn, "invalid PCP connection");
1292 		return NULL;
1293 	}
1294 
1295 	snprintf(node_id, sizeof(node_id), "%d", nid);
1296 
1297 	pcp_write(pcpConn->pcpConn, "C", 1);
1298 	wsize = htonl(strlen(node_id) + 1 + sizeof(int));
1299 	pcp_write(pcpConn->pcpConn, &wsize, sizeof(int));
1300 	pcp_write(pcpConn->pcpConn, node_id, strlen(node_id) + 1);
1301 	if (PCPFlush(pcpConn) < 0)
1302 		return NULL;
1303 	if (pcpConn->Pfdebug)
1304 		fprintf(pcpConn->Pfdebug, "DEBUG: send: tos=\"C\", len=%d\n", ntohl(wsize));
1305 
1306 	return process_pcp_response(pcpConn, 'C');
1307 }
1308 
1309 
1310 /* --------------------------------
1311  * pcp_pool_status - return setup parameters and status
1312  *
1313  * returns and array of POOL_REPORT_CONFIG, NULL otherwise
1314  * --------------------------------
1315  */
1316 static void
process_pool_status_response(PCPConnInfo * pcpConn,char * buf,int len)1317 process_pool_status_response(PCPConnInfo * pcpConn, char *buf, int len)
1318 {
1319 	char	   *index;
1320 	POOL_REPORT_CONFIG *status = NULL;
1321 
1322 	if (strcmp(buf, "ArraySize") == 0)
1323 	{
1324 		int			ci_size;
1325 
1326 		index = (char *) memchr(buf, '\0', len) + 1;
1327 		ci_size = ntohl(*((int *) index));
1328 
1329 		setResultStatus(pcpConn, PCP_RES_INCOMPLETE);
1330 		setResultSlotCount(pcpConn, ci_size);
1331 		pcpConn->pcpResInfo->nextFillSlot = 0;
1332 		return;
1333 	}
1334 	else if (strcmp(buf, "ProcessConfig") == 0)
1335 	{
1336 		if (PCPResultStatus(pcpConn->pcpResInfo) != PCP_RES_INCOMPLETE)
1337 			goto INVALID_RESPONSE;
1338 
1339 		status = palloc(sizeof(POOL_REPORT_CONFIG));
1340 
1341 		index = (char *) memchr(buf, '\0', len);
1342 		if (index == NULL)
1343 			goto INVALID_RESPONSE;
1344 		index += 1;
1345 		strlcpy(status->name, index, POOLCONFIG_MAXNAMELEN + 1);
1346 
1347 		index = (char *) memchr(index, '\0', len);
1348 		if (index == NULL)
1349 			goto INVALID_RESPONSE;
1350 		index += 1;
1351 		strlcpy(status->value, index, POOLCONFIG_MAXVALLEN + 1);
1352 
1353 		index = (char *) memchr(index, '\0', len);
1354 		if (index == NULL)
1355 			goto INVALID_RESPONSE;
1356 		index += 1;
1357 		strlcpy(status->desc, index, POOLCONFIG_MAXDESCLEN + 1);
1358 
1359 		if (setNextResultBinaryData(pcpConn->pcpResInfo, (void *) status, sizeof(POOL_REPORT_CONFIG), NULL) < 0)
1360 			goto INVALID_RESPONSE;
1361 		return;
1362 	}
1363 	else if (strcmp(buf, "CommandComplete") == 0)
1364 	{
1365 		setResultStatus(pcpConn, PCP_RES_COMMAND_OK);
1366 		return;
1367 	}
1368 
1369 INVALID_RESPONSE:
1370 
1371 	if (status)
1372 		pfree(status);
1373 	pcp_internal_error(pcpConn,
1374 					   "command failed. invalid response");
1375 	setResultStatus(pcpConn, PCP_RES_BAD_RESPONSE);
1376 }
1377 
1378 PCPResultInfo *
pcp_pool_status(PCPConnInfo * pcpConn)1379 pcp_pool_status(PCPConnInfo * pcpConn)
1380 {
1381 	int			wsize;
1382 
1383 	if (PCPConnectionStatus(pcpConn) != PCP_CONNECTION_OK)
1384 	{
1385 		pcp_internal_error(pcpConn, "invalid PCP connection");
1386 		return NULL;
1387 	}
1388 
1389 	pcp_write(pcpConn->pcpConn, "B", 1);
1390 	wsize = htonl(sizeof(int));
1391 	pcp_write(pcpConn->pcpConn, &wsize, sizeof(int));
1392 	if (PCPFlush(pcpConn) < 0)
1393 		return NULL;
1394 	if (pcpConn->Pfdebug)
1395 		fprintf(pcpConn->Pfdebug, "DEBUG pcp_pool_status: send: tos=\"B\", len=%d\n", ntohl(wsize));
1396 	return process_pcp_response(pcpConn, 'B');
1397 }
1398 
1399 
1400 PCPResultInfo *
pcp_recovery_node(PCPConnInfo * pcpConn,int nid)1401 pcp_recovery_node(PCPConnInfo * pcpConn, int nid)
1402 {
1403 	int			wsize;
1404 	char		node_id[16];
1405 
1406 	if (PCPConnectionStatus(pcpConn) != PCP_CONNECTION_OK)
1407 	{
1408 		pcp_internal_error(pcpConn, "invalid PCP connection");
1409 		return NULL;
1410 	}
1411 
1412 	snprintf(node_id, sizeof(node_id), "%d", nid);
1413 
1414 	pcp_write(pcpConn->pcpConn, "O", 1);
1415 	wsize = htonl(strlen(node_id) + 1 + sizeof(int));
1416 	pcp_write(pcpConn->pcpConn, &wsize, sizeof(int));
1417 	pcp_write(pcpConn->pcpConn, node_id, strlen(node_id) + 1);
1418 	if (PCPFlush(pcpConn) < 0)
1419 		return NULL;
1420 	if (pcpConn->Pfdebug)
1421 		fprintf(pcpConn->Pfdebug, "DEBUG: send: tos=\"D\", len=%d\n", ntohl(wsize));
1422 
1423 	return process_pcp_response(pcpConn, 'O');
1424 }
1425 
1426 /* --------------------------------
1427  * pcp_promote_node - promote a node given by the argument as new pgpool's main node
1428  *
1429  * return 0 on success, -1 otherwise
1430  * --------------------------------
1431  */
1432 PCPResultInfo *
pcp_promote_node(PCPConnInfo * pcpConn,int nid)1433 pcp_promote_node(PCPConnInfo * pcpConn, int nid)
1434 {
1435 	return _pcp_promote_node(pcpConn, nid, FALSE);
1436 }
1437 
1438 /* --------------------------------
1439 
1440  * and promote a node given by the argument as new pgpool's main node
1441  *
1442  * return 0 on success, -1 otherwise
1443  * --------------------------------
1444  */
1445 PCPResultInfo *
pcp_promote_node_gracefully(PCPConnInfo * pcpConn,int nid)1446 pcp_promote_node_gracefully(PCPConnInfo * pcpConn, int nid)
1447 {
1448 	return _pcp_promote_node(pcpConn, nid, TRUE);
1449 }
1450 
1451 static PCPResultInfo *
_pcp_promote_node(PCPConnInfo * pcpConn,int nid,bool gracefully)1452 _pcp_promote_node(PCPConnInfo * pcpConn, int nid, bool gracefully)
1453 {
1454 	int			wsize;
1455 	char		node_id[16];
1456 	char	   *sendchar;
1457 
1458 	if (PCPConnectionStatus(pcpConn) != PCP_CONNECTION_OK)
1459 	{
1460 		pcp_internal_error(pcpConn, "invalid PCP connection");
1461 		return NULL;
1462 	}
1463 
1464 	snprintf(node_id, sizeof(node_id), "%d", nid);
1465 
1466 	if (gracefully)
1467 		sendchar = "j";
1468 	else
1469 		sendchar = "J";
1470 
1471 	pcp_write(pcpConn->pcpConn, sendchar, 1);
1472 	wsize = htonl(strlen(node_id) + 1 + sizeof(int));
1473 	pcp_write(pcpConn->pcpConn, &wsize, sizeof(int));
1474 	pcp_write(pcpConn->pcpConn, node_id, strlen(node_id) + 1);
1475 	if (PCPFlush(pcpConn) < 0)
1476 		return NULL;
1477 	if (pcpConn->Pfdebug)
1478 		fprintf(pcpConn->Pfdebug, "DEBUG: send: tos=\"E\", len=%d\n", ntohl(wsize));
1479 
1480 	return process_pcp_response(pcpConn, 'J');
1481 }
1482 
1483 static void
process_watchdog_info_response(PCPConnInfo * pcpConn,char * buf,int len)1484 process_watchdog_info_response(PCPConnInfo * pcpConn, char *buf, int len)
1485 {
1486 	char	   *json_data = NULL;
1487 	PCPWDClusterInfo *wd_cluster_info = NULL;
1488 	int			clusterDataSize = 0;
1489 
1490 	if (strcmp(buf, "CommandComplete") == 0)
1491 	{
1492 		int			tempVal;
1493 		char	   *ptr;
1494 
1495 		json_data = (char *) memchr(buf, '\0', len);
1496 		if (json_data == NULL)
1497 			goto INVALID_RESPONSE;
1498 		json_data += 1;
1499 
1500 		json_value *root;
1501 		json_value *value;
1502 		int			i,
1503 					nodeCount;
1504 
1505 		root = json_parse(json_data, len);
1506 
1507 		/* The root node must be object */
1508 		if (root == NULL || root->type != json_object)
1509 		{
1510 			json_value_free(root);
1511 			goto INVALID_RESPONSE;
1512 		}
1513 
1514 		if (json_get_int_value_for_key(root, "NodeCount", &nodeCount))
1515 		{
1516 			json_value_free(root);
1517 			goto INVALID_RESPONSE;
1518 		}
1519 
1520 		/* find the WatchdogNodes array */
1521 		value = json_get_value_for_key(root, "WatchdogNodes");
1522 		if (value == NULL)
1523 		{
1524 			json_value_free(root);
1525 			goto INVALID_RESPONSE;
1526 		}
1527 		if (value->type != json_array)
1528 		{
1529 			json_value_free(root);
1530 			goto INVALID_RESPONSE;
1531 		}
1532 		if (nodeCount != value->u.array.length)
1533 		{
1534 			json_value_free(root);
1535 			goto INVALID_RESPONSE;
1536 		}
1537 
1538 		/* create the cluster object */
1539 		clusterDataSize = sizeof(PCPWDClusterInfo) + (sizeof(PCPWDNodeInfo) * nodeCount);
1540 		wd_cluster_info = malloc(clusterDataSize);
1541 
1542 		wd_cluster_info->nodeCount = nodeCount;
1543 
1544 		if (json_get_int_value_for_key(root, "RemoteNodeCount", &wd_cluster_info->remoteNodeCount))
1545 		{
1546 			json_value_free(root);
1547 			goto INVALID_RESPONSE;
1548 		}
1549 		if (json_get_int_value_for_key(root, "QuorumStatus", &wd_cluster_info->quorumStatus))
1550 		{
1551 			json_value_free(root);
1552 			goto INVALID_RESPONSE;
1553 		}
1554 		if (json_get_int_value_for_key(root, "AliveNodeCount", &wd_cluster_info->aliveNodeCount))
1555 		{
1556 			json_value_free(root);
1557 			goto INVALID_RESPONSE;
1558 		}
1559 		if (json_get_int_value_for_key(root, "Escalated", &tempVal))
1560 		{
1561 			json_value_free(root);
1562 			goto INVALID_RESPONSE;
1563 		}
1564 		wd_cluster_info->escalated = tempVal == 0 ? false : true;
1565 
1566 		ptr = json_get_string_value_for_key(root, "LeaderNodeName");
1567 		if (ptr == NULL)
1568 		{
1569 			json_value_free(root);
1570 			goto INVALID_RESPONSE;
1571 		}
1572 		strncpy(wd_cluster_info->leaderNodeName, ptr, sizeof(wd_cluster_info->leaderNodeName) - 1);
1573 
1574 		ptr = json_get_string_value_for_key(root, "LeaderHostName");
1575 		if (ptr == NULL)
1576 		{
1577 			json_value_free(root);
1578 			goto INVALID_RESPONSE;
1579 		}
1580 		strncpy(wd_cluster_info->leaderHostName, ptr, sizeof(wd_cluster_info->leaderHostName) - 1);
1581 
1582 		/* Get watchdog nodes data */
1583 		for (i = 0; i < nodeCount; i++)
1584 		{
1585 			char	   *ptr;
1586 			json_value *nodeInfoValue = value->u.array.values[i];
1587 			PCPWDNodeInfo *wdNodeInfo = &wd_cluster_info->nodeList[i];
1588 
1589 			if (nodeInfoValue->type != json_object)
1590 			{
1591 				json_value_free(root);
1592 				goto INVALID_RESPONSE;
1593 			}
1594 
1595 			if (json_get_int_value_for_key(nodeInfoValue, "ID", &wdNodeInfo->id))
1596 			{
1597 				json_value_free(root);
1598 				goto INVALID_RESPONSE;
1599 			}
1600 
1601 			ptr = json_get_string_value_for_key(nodeInfoValue, "NodeName");
1602 			if (ptr == NULL)
1603 			{
1604 				json_value_free(root);
1605 				goto INVALID_RESPONSE;
1606 			}
1607 			strncpy(wdNodeInfo->nodeName, ptr, sizeof(wdNodeInfo->nodeName) - 1);
1608 
1609 			ptr = json_get_string_value_for_key(nodeInfoValue, "HostName");
1610 			if (ptr == NULL)
1611 			{
1612 				json_value_free(root);
1613 				goto INVALID_RESPONSE;
1614 			}
1615 			strncpy(wdNodeInfo->hostName, ptr, sizeof(wdNodeInfo->hostName) - 1);
1616 
1617 			ptr = json_get_string_value_for_key(nodeInfoValue, "DelegateIP");
1618 			if (ptr == NULL)
1619 			{
1620 				json_value_free(root);
1621 				goto INVALID_RESPONSE;
1622 			}
1623 			strncpy(wdNodeInfo->delegate_ip, ptr, sizeof(wdNodeInfo->delegate_ip) - 1);
1624 
1625 			if (json_get_int_value_for_key(nodeInfoValue, "WdPort", &wdNodeInfo->wd_port))
1626 			{
1627 				json_value_free(root);
1628 				goto INVALID_RESPONSE;
1629 			}
1630 
1631 			if (json_get_int_value_for_key(nodeInfoValue, "PgpoolPort", &wdNodeInfo->pgpool_port))
1632 			{
1633 				json_value_free(root);
1634 				goto INVALID_RESPONSE;
1635 			}
1636 
1637 			if (json_get_int_value_for_key(nodeInfoValue, "State", &wdNodeInfo->state))
1638 			{
1639 				json_value_free(root);
1640 				goto INVALID_RESPONSE;
1641 			}
1642 
1643 			ptr = json_get_string_value_for_key(nodeInfoValue, "StateName");
1644 			if (ptr == NULL)
1645 			{
1646 				json_value_free(root);
1647 				goto INVALID_RESPONSE;
1648 			}
1649 			strncpy(wdNodeInfo->stateName, ptr, sizeof(wdNodeInfo->stateName) - 1);
1650 
1651 			if (json_get_int_value_for_key(nodeInfoValue, "Priority", &wdNodeInfo->wd_priority))
1652 			{
1653 				json_value_free(root);
1654 				goto INVALID_RESPONSE;
1655 			}
1656 
1657 		}
1658 		json_value_free(root);
1659 
1660 		if (setNextResultBinaryData(pcpConn->pcpResInfo, (void *) wd_cluster_info, clusterDataSize, NULL) < 0)
1661 			goto INVALID_RESPONSE;
1662 
1663 		setCommandSuccessful(pcpConn);
1664 	}
1665 	else
1666 	{
1667 		pcp_internal_error(pcpConn,
1668 						   "command failed with reason: \"%s\"\n", buf);
1669 		setResultStatus(pcpConn, PCP_RES_BAD_RESPONSE);
1670 	}
1671 	return;
1672 
1673 INVALID_RESPONSE:
1674 
1675 	if (wd_cluster_info)
1676 		pfree(wd_cluster_info);
1677 	pcp_internal_error(pcpConn,
1678 					   "command failed. invalid response\n");
1679 	setResultStatus(pcpConn, PCP_RES_BAD_RESPONSE);
1680 }
1681 
1682 /* --------------------------------
1683  * pcp_watchdog_info - get information of watchdog
1684  *
1685  * return structure of watchdog information on success, -1 otherwise
1686  * --------------------------------
1687  */
1688 PCPResultInfo *
pcp_watchdog_info(PCPConnInfo * pcpConn,int nid)1689 pcp_watchdog_info(PCPConnInfo * pcpConn, int nid)
1690 {
1691 	int			wsize;
1692 	char		wd_index[16];
1693 
1694 	if (PCPConnectionStatus(pcpConn) != PCP_CONNECTION_OK)
1695 	{
1696 		pcp_internal_error(pcpConn, "invalid PCP connection");
1697 		return NULL;
1698 	}
1699 
1700 	snprintf(wd_index, sizeof(wd_index), "%d", nid);
1701 
1702 	pcp_write(pcpConn->pcpConn, "W", 1);
1703 	wsize = htonl(strlen(wd_index) + 1 + sizeof(int));
1704 	pcp_write(pcpConn->pcpConn, &wsize, sizeof(int));
1705 	pcp_write(pcpConn->pcpConn, wd_index, strlen(wd_index) + 1);
1706 	if (PCPFlush(pcpConn) < 0)
1707 		return NULL;
1708 	if (pcpConn->Pfdebug)
1709 		fprintf(pcpConn->Pfdebug, "DEBUG: send: tos=\"W\", len=%d\n", ntohl(wsize));
1710 
1711 	return process_pcp_response(pcpConn, 'W');
1712 }
1713 
1714 PCPResultInfo *
pcp_set_backend_parameter(PCPConnInfo * pcpConn,char * parameter_name,char * value)1715 pcp_set_backend_parameter(PCPConnInfo * pcpConn, char *parameter_name, char *value)
1716 {
1717 	int			wsize;
1718 	char		null_chr = 0;
1719 
1720 	if (PCPConnectionStatus(pcpConn) != PCP_CONNECTION_OK)
1721 	{
1722 		pcp_internal_error(pcpConn, "invalid PCP connection");
1723 		return NULL;
1724 	}
1725 	if (pcpConn->Pfdebug)
1726 		fprintf(pcpConn->Pfdebug, "DEBUG: seting: \"%s = %s\"\n", parameter_name, value);
1727 
1728 	pcp_write(pcpConn->pcpConn, "A", 1);
1729 	wsize = htonl(strlen(parameter_name) + 1 +
1730 				  strlen(value) + 1 +
1731 				  sizeof(int));
1732 	pcp_write(pcpConn->pcpConn, &wsize, sizeof(int));
1733 	pcp_write(pcpConn->pcpConn, parameter_name, strlen(parameter_name));
1734 	pcp_write(pcpConn->pcpConn, &null_chr, 1);
1735 	pcp_write(pcpConn->pcpConn, value, strlen(value));
1736 	pcp_write(pcpConn->pcpConn, &null_chr, 1);
1737 	if (PCPFlush(pcpConn) < 0)
1738 		return NULL;
1739 	if (pcpConn->Pfdebug)
1740 		fprintf(pcpConn->Pfdebug, "DEBUG: send: tos=\"A\", len=%d\n", ntohl(wsize));
1741 
1742 	return process_pcp_response(pcpConn, 'A');
1743 }
1744 
1745 /*
1746  * pcpAddInternalNotice - produce an internally-generated notice message
1747  *
1748  * A format string and optional arguments can be passed.
1749  *
1750  * The supplied text is taken as primary message (ie., it should not include
1751  * a trailing newline, and should not be more than one line).
1752  */
1753 static void
pcp_internal_error(PCPConnInfo * pcpConn,const char * fmt,...)1754 pcp_internal_error(PCPConnInfo * pcpConn, const char *fmt,...)
1755 {
1756 	char		msgBuf[1024];
1757 	va_list		args;
1758 
1759 	if (pcpConn == NULL)
1760 		return;					/* nobody home to receive notice? */
1761 
1762 	/* Format the message */
1763 	va_start(args, fmt);
1764 	vsnprintf(msgBuf, sizeof(msgBuf), fmt, args);
1765 	va_end(args);
1766 	msgBuf[sizeof(msgBuf) - 1] = '\0';	/* make real sure it's terminated */
1767 	if (pcpConn->errMsg)
1768 		pfree(pcpConn->errMsg);
1769 	pcpConn->errMsg = pstrdup(msgBuf);
1770 }
1771 
1772 ConnStateType
PCPConnectionStatus(const PCPConnInfo * conn)1773 PCPConnectionStatus(const PCPConnInfo * conn)
1774 {
1775 	if (!conn)
1776 		return PCP_CONNECTION_BAD;
1777 	return conn->connState;
1778 }
1779 
1780 ResultStateType
PCPResultStatus(const PCPResultInfo * res)1781 PCPResultStatus(const PCPResultInfo * res)
1782 {
1783 	if (!res)
1784 		return PCP_RES_ERROR;
1785 	return res->resultStatus;
1786 }
1787 
1788 static void
setResultStatus(PCPConnInfo * pcpConn,ResultStateType resultState)1789 setResultStatus(PCPConnInfo * pcpConn, ResultStateType resultState)
1790 {
1791 	if (pcpConn && pcpConn->pcpResInfo)
1792 		pcpConn->pcpResInfo->resultStatus = resultState;
1793 }
1794 
1795 static void
setCommandSuccessful(PCPConnInfo * pcpConn)1796 setCommandSuccessful(PCPConnInfo * pcpConn)
1797 {
1798 	setResultStatus(pcpConn, PCP_RES_COMMAND_OK);
1799 }
1800 
1801 static void
setResultSlotCount(PCPConnInfo * pcpConn,unsigned int slotCount)1802 setResultSlotCount(PCPConnInfo * pcpConn, unsigned int slotCount)
1803 {
1804 	if (pcpConn && pcpConn->pcpResInfo && slotCount > 0)
1805 	{
1806 		if (pcpConn->pcpResInfo->resultSlots == 0)
1807 			pcpConn->pcpResInfo->resultSlots = 1;
1808 
1809 		if (slotCount > pcpConn->pcpResInfo->resultSlots)
1810 		{
1811 			pcpConn->pcpResInfo = repalloc(pcpConn->pcpResInfo, sizeof(PCPResultInfo) + (sizeof(PCPResultSlot) * (slotCount - 1)));
1812 		}
1813 		pcpConn->pcpResInfo->resultSlots = slotCount;
1814 	}
1815 }
1816 
1817 static void
setResultIntData(PCPResultInfo * res,unsigned int slotno,int value)1818 setResultIntData(PCPResultInfo * res, unsigned int slotno, int value)
1819 {
1820 	if (res)
1821 	{
1822 		res->resultSlot[slotno].datalen = 0;
1823 		res->resultSlot[slotno].isint = 1;
1824 		res->resultSlot[slotno].data.integer = value;
1825 		res->resultSlot[slotno].free_func = NULL;
1826 	}
1827 }
1828 
1829 static void
setResultBinaryData(PCPResultInfo * res,unsigned int slotno,void * value,int datalen,void (* free_func)(struct PCPConnInfo *,void *))1830 setResultBinaryData(PCPResultInfo * res, unsigned int slotno, void *value, int datalen, void (*free_func) (struct PCPConnInfo *, void *))
1831 {
1832 	if (res)
1833 	{
1834 		res->resultSlot[slotno].datalen = datalen;
1835 		res->resultSlot[slotno].isint = 0;
1836 		res->resultSlot[slotno].data.ptr = value;
1837 		res->resultSlot[slotno].free_func = free_func;
1838 	}
1839 }
1840 
1841 static int
setNextResultBinaryData(PCPResultInfo * res,void * value,int datalen,void (* free_func)(struct PCPConnInfo *,void *))1842 setNextResultBinaryData(PCPResultInfo * res, void *value, int datalen, void (*free_func) (struct PCPConnInfo *, void *))
1843 {
1844 	if (res && res->nextFillSlot < res->resultSlots)
1845 	{
1846 		setResultBinaryData(res, res->nextFillSlot, value, datalen, free_func);
1847 		res->nextFillSlot++;
1848 		return res->nextFillSlot;
1849 	}
1850 	return -1;
1851 }
1852 
1853 static int
PCPFlush(PCPConnInfo * pcpConn)1854 PCPFlush(PCPConnInfo * pcpConn)
1855 {
1856 	int			ret = pcp_flush(pcpConn->pcpConn);
1857 
1858 	if (ret)
1859 		pcp_internal_error(pcpConn,
1860 						   "ERROR: sending data to backend failed with error \"%s\"", strerror(errno));
1861 	return ret;
1862 }
1863 
1864 int
pcp_result_slot_count(PCPResultInfo * res)1865 pcp_result_slot_count(PCPResultInfo * res)
1866 {
1867 	if (res)
1868 		return res->resultSlots;
1869 	return 0;
1870 }
1871 
1872 /* Returns 1 if ResultInfo has no data. 0 otherwise */
1873 int
pcp_result_is_empty(PCPResultInfo * res)1874 pcp_result_is_empty(PCPResultInfo * res)
1875 {
1876 	if (res)
1877 	{
1878 		if (res->resultSlots <= 1 && res->resultSlot[0].isint == 0 && res->resultSlot[0].datalen <= 0)
1879 			return 1;
1880 		return 0;
1881 	}
1882 	return 1;
1883 }
1884 
1885 void *
pcp_get_binary_data(const PCPResultInfo * res,unsigned int slotno)1886 pcp_get_binary_data(const PCPResultInfo * res, unsigned int slotno)
1887 {
1888 	if (res && slotno < res->resultSlots && !res->resultSlot[slotno].isint)
1889 	{
1890 		return res->resultSlot[slotno].data.ptr;
1891 	}
1892 	return NULL;
1893 }
1894 
1895 int
pcp_get_int_data(const PCPResultInfo * res,unsigned int slotno)1896 pcp_get_int_data(const PCPResultInfo * res, unsigned int slotno)
1897 {
1898 	if (res && slotno < res->resultSlots && res->resultSlot[slotno].isint)
1899 	{
1900 		return res->resultSlot[slotno].data.integer;
1901 	}
1902 	return 0;
1903 }
1904 
1905 int
pcp_get_data_length(const PCPResultInfo * res,unsigned int slotno)1906 pcp_get_data_length(const PCPResultInfo * res, unsigned int slotno)
1907 {
1908 	if (res && slotno < res->resultSlots)
1909 	{
1910 		return res->resultSlot[slotno].datalen;
1911 	}
1912 	return 0;
1913 }
1914 
1915 void
pcp_free_result(PCPConnInfo * pcpConn)1916 pcp_free_result(PCPConnInfo * pcpConn)
1917 {
1918 	if (pcpConn && pcpConn->pcpResInfo)
1919 	{
1920 		PCPResultInfo *pcpRes = pcpConn->pcpResInfo;
1921 		int			i;
1922 
1923 		for (i = 0; i < pcpRes->resultSlots; i++)
1924 		{
1925 			if (pcpRes->resultSlot[i].isint)
1926 				continue;
1927 			if (pcpRes->resultSlot[i].data.ptr == NULL)
1928 				continue;
1929 
1930 			if (pcpRes->resultSlot[i].free_func)
1931 				pcpRes->resultSlot[i].free_func(pcpConn, pcpRes->resultSlot[i].data.ptr);
1932 			else
1933 				pfree(pcpRes->resultSlot[i].data.ptr);
1934 			pcpRes->resultSlot[i].data.ptr = NULL;
1935 		}
1936 		pfree(pcpConn->pcpResInfo);
1937 		pcpConn->pcpResInfo = NULL;
1938 	}
1939 }
1940 
1941 void
pcp_free_connection(PCPConnInfo * pcpConn)1942 pcp_free_connection(PCPConnInfo * pcpConn)
1943 {
1944 	if (pcpConn)
1945 	{
1946 		pcp_free_result(pcpConn);
1947 		if (pcpConn->errMsg)
1948 			pfree(pcpConn->errMsg);
1949 		/* Should we also Disconnect it? */
1950 		pfree(pcpConn);
1951 	}
1952 }
1953 
1954 char *
pcp_get_last_error(PCPConnInfo * pcpConn)1955 pcp_get_last_error(PCPConnInfo * pcpConn)
1956 {
1957 	if (pcpConn)
1958 		return pcpConn->errMsg;
1959 	return NULL;
1960 }
1961 
1962 /*
1963  * get the password file name which could be either pointed by PCPPASSFILE
1964  * environment variable or resides in user home directory.
1965  */
1966 static bool
getPoolPassFilename(char * pgpassfile)1967 getPoolPassFilename(char *pgpassfile)
1968 {
1969 	char	   *passfile_env;
1970 
1971 	if ((passfile_env = getenv("PCPPASSFILE")) != NULL)
1972 	{
1973 		/* use the literal path from the environment, if set */
1974 		strlcpy(pgpassfile, passfile_env, MAXPGPATH);
1975 	}
1976 	else
1977 	{
1978 		char		homedir[MAXPGPATH];
1979 
1980 		if (!get_home_directory(homedir, sizeof(homedir)))
1981 			return false;
1982 		snprintf(pgpassfile, MAXPGPATH + sizeof(PCPPASSFILE) + 1, "%s/%s", homedir, PCPPASSFILE);
1983 	}
1984 	return true;
1985 }
1986 
1987 /*
1988  * Get a password from the password file. Return value is malloc'd.
1989  * format = hostname:port:username:password
1990  */
1991 static char *
PasswordFromFile(PCPConnInfo * pcpConn,char * hostname,char * port,char * username)1992 PasswordFromFile(PCPConnInfo * pcpConn, char *hostname, char *port, char *username)
1993 {
1994 	FILE	   *fp;
1995 	char		pgpassfile[MAXPGPATH + sizeof(PCPPASSFILE) + 1];
1996 	struct stat stat_buf;
1997 #define LINELEN NAMEDATALEN*5
1998 	char		buf[LINELEN];
1999 
2000 	if (username == NULL || strlen(username) == 0)
2001 		return NULL;
2002 
2003 	if (hostname == NULL)
2004 		hostname = DefaultHost;
2005 	else if (strcmp(hostname, UNIX_DOMAIN_PATH) == 0)
2006 		hostname = DefaultHost;
2007 
2008 	if (!getPoolPassFilename(pgpassfile))
2009 		return NULL;
2010 
2011 	/* If password file cannot be opened, ignore it. */
2012 	if (stat(pgpassfile, &stat_buf) != 0)
2013 		return NULL;
2014 
2015 	if (!S_ISREG(stat_buf.st_mode))
2016 	{
2017 		if (pcpConn->Pfdebug)
2018 			fprintf(pcpConn->Pfdebug, "WARNING: password file \"%s\" is not a plain file\n", pgpassfile);
2019 		return NULL;
2020 	}
2021 
2022 	/* If password file is insecure, alert the user and ignore it. */
2023 	if (stat_buf.st_mode & (S_IRWXG | S_IRWXO))
2024 	{
2025 		if (pcpConn->Pfdebug)
2026 			fprintf(pcpConn->Pfdebug, "WARNING: password file \"%s\" has group or world access; permissions should be u=rw (0600) or less\n",
2027 					pgpassfile);
2028 		return NULL;
2029 	}
2030 
2031 	fp = fopen(pgpassfile, "r");
2032 	if (fp == NULL)
2033 		return NULL;
2034 
2035 	while (!feof(fp) && !ferror(fp))
2036 	{
2037 		char	   *t = buf,
2038 				   *ret,
2039 				   *p1,
2040 				   *p2;
2041 		int			len;
2042 
2043 		if (fgets(buf, sizeof(buf), fp) == NULL)
2044 			break;
2045 
2046 		len = strlen(buf);
2047 		if (len == 0)
2048 			continue;
2049 
2050 		/* Remove trailing newline */
2051 		if (buf[len - 1] == '\n')
2052 			buf[len - 1] = 0;
2053 
2054 		if ((t = pwdfMatchesString(t, hostname)) == NULL ||
2055 			(t = pwdfMatchesString(t, port)) == NULL ||
2056 			(t = pwdfMatchesString(t, username)) == NULL)
2057 			continue;
2058 		ret = pstrdup(t);
2059 		fclose(fp);
2060 
2061 		/* De-escape password. */
2062 		for (p1 = p2 = ret; *p1 != ':' && *p1 != '\0'; ++p1, ++p2)
2063 		{
2064 			if (*p1 == '\\' && p1[1] != '\0')
2065 				++p1;
2066 			*p2 = *p1;
2067 		}
2068 		*p2 = '\0';
2069 
2070 		return ret;
2071 	}
2072 
2073 	fclose(fp);
2074 	return NULL;
2075 
2076 #undef LINELEN
2077 }
2078 
/*
 * pwdfMatchesString - match one colon-terminated field of a password line
 *
 * Helper for PasswordFromFile, borrowed from PostgreSQL.  'buf' points at
 * the start of a field, 'token' is the value that field has to equal.
 *
 * A field consisting of "*" matches any token.  Inside the field a
 * backslash makes the following character literal, so "\:" stands for a
 * colon that does not end the field.
 *
 * Returns a pointer to the character after the field's closing ':' on a
 * match.  Returns NULL on a mismatch, when the field is not closed by ':',
 * or when either argument is NULL.
 */
static char *
pwdfMatchesString(char *buf, char *token)
{
	char	   *field = buf;
	char	   *want = token;

	if (field == NULL || want == NULL)
		return NULL;

	/* wildcard field */
	if (field[0] == '*' && field[1] == ':')
		return field + 2;

	for (; *field != '\0'; field++, want++)
	{
		bool		escaped = false;

		if (*field == '\\')
		{
			/* compare the escaped character, not the backslash */
			field++;
			escaped = true;
		}

		/* an unescaped ':' closes the field; token must be used up too */
		if (!escaped && *field == ':' && *want == '\0')
			return field + 1;

		if (*want == '\0' || *field != *want)
			return NULL;
	}

	return NULL;
}
2119