1 /*
2  * Copyright (c) 2017-2018	Tatsuo Ishii
3  * Copyright (c) 2018-2021	PgPool Global Development Group
4  *
5  * Permission to use, copy, modify, and distribute this software and
6  * its documentation for any purpose and without fee is hereby
7  * granted, provided that the above copyright notice appear in all
8  * copies and that both that copyright notice and this permission
9  * notice appear in supporting documentation, and that the name of the
10  * author not be used in advertising or publicity pertaining to
11  * distribution of the software without specific, written prior
12  * permission. The author makes no representations about the
13  * suitability of this software for any purpose.  It is provided "as
14  * is" without express or implied warranty.
15  *
16  * Functions to read packets from connection to PostgreSQL.
17  */
18 
19 #include "../../include/config.h"
20 #include "pgproto/pgproto.h"
21 #include <unistd.h>
22 #include <stdlib.h>
23 #include <stdio.h>
24 #include <string.h>
25 #include <errno.h>
26 #include <arpa/inet.h>
27 #include "pgproto/fe_memutils.h"
28 #include <libpq-fe.h>
29 #include "pgproto/read.h"
30 
31 static char read_char(PGconn *conn);
32 static int	read_int32(PGconn *conn);
33 static char *read_bytes(int len, PGconn *conn);
34 static void read_and_discard(PGconn *conn);
35 static void read_it(PGconn *conn, char *buf, int len);
36 static char *read_string(PGconn *conn);
37 
38 /*
39  * Read message from connection until ready for query message is received.  If
40  * a positive timeout is given, wait for timeout seconds then return if no
41  * data is availble from the connection.
42  */
43 void
read_until_ready_for_query(PGconn * conn,int timeout)44 read_until_ready_for_query(PGconn *conn, int timeout)
45 {
46 	int			kind;
47 	int			len;
48 	char	   *buf;
49 	char		c;
50 	char	   *p;
51 	int			fd;
52 	int			cont;
53 	struct timeval timeoutval;
54 	fd_set		readmask;
55 	int			fds;
56 
57 	cont = 1;
58 
59 	while (cont)
60 	{
61 		if (timeout > 0)
62 		{
63 			fd = PQsocket(conn);
64 
65 			for (;;)
66 			{
67 				FD_ZERO(&readmask);
68 				FD_SET(fd, &readmask);
69 				timeoutval.tv_sec = timeout;
70 				timeoutval.tv_usec = 0;
71 				fds = select(fd + 1, &readmask, NULL, NULL, &timeoutval);
72 				if (fds == -1)
73 				{
74 					if (errno == EAGAIN || errno == EINTR)
75 					{
76 						continue;
77 					}
78 					else
79 					{
80 						fprintf(stderr, "reading from Pgpool-II failed. reason: %s\n",
81 								strerror(errno));
82 						exit(1);
83 					}
84 				}
85 				else if (fds == 0)
86 				{
87 					/* socket is not ready for reading */
88 					return;
89 				}
90 				else
91 					/* socket is ready for reading */
92 					break;
93 			}
94 		}
95 
96 		kind = read_char(conn);
97 		switch (kind)
98 		{
99 			char	*channel, *payload;
100 			int		pid;
101 
102 			case '1':			/* Parse complete */
103 				fprintf(stderr, "<= BE ParseComplete\n");
104 				read_and_discard(conn);
105 				break;
106 
107 			case '2':			/* Bind complete */
108 				fprintf(stderr, "<= BE BindComplete\n");
109 				read_and_discard(conn);
110 				break;
111 
112 			case '3':			/* Close complete */
113 				fprintf(stderr, "<= BE CloseComplete\n");
114 				read_and_discard(conn);
115 				break;
116 
117 			case 'C':			/* Command complete */
118 				len = read_int32(conn);
119 				buf = read_bytes(len - sizeof(int), conn);
120 				fprintf(stderr, "<= BE CommandComplete(%s)\n", buf);
121 				pg_free(buf);
122 				break;
123 
124 			case 'D':			/* Data row */
125 				fprintf(stderr, "<= BE DataRow\n");
126 				read_and_discard(conn);
127 				break;
128 
129 			case 'E':			/* Error response */
130 			case 'N':			/* Notice response */
131 				if (kind == 'E')
132 					fprintf(stderr, "<= BE ErrorResponse(");
133 				else
134 					fprintf(stderr, "<= BE NoticeResponse(");
135 				len = read_int32(conn);
136 				p = buf = read_bytes(len - sizeof(int), conn);
137 				while (*p)
138 				{
139 					fprintf(stderr, "%c ", *p);
140 					p++;
141 					fprintf(stderr, "%s ", p);
142 					p += strlen(p) + 1;
143 				}
144 
145 				fprintf(stderr, ")\n");
146 
147 				pg_free(buf);
148 				break;
149 
150 			case 'G':			/* Copy in response */
151 				fprintf(stderr, "<= BE CopyInResponse\n");
152 				read_and_discard(conn);
153 				break;
154 
155 			case 'H':			/* Copy out response */
156 				fprintf(stderr, "<= BE CopyOutResponse\n");
157 				read_and_discard(conn);
158 				break;
159 
160 			case 'I':			/* Empty query response */
161 				fprintf(stderr, "<= BE EmptyQueryResponse\n");
162 				read_and_discard(conn);
163 				break;
164 
165 			case 'S':			/* Parameter status */
166 				fprintf(stderr, "<= BE ParameterStatus\n");
167 				read_and_discard(conn);
168 				break;
169 
170 			case 'T':			/* Row Description */
171 				fprintf(stderr, "<= BE RowDescription\n");
172 				read_and_discard(conn);
173 				break;
174 
175 			case 'V':			/* Function call response */
176 				fprintf(stderr, "<= BE FunctionCallResponse\n");
177 				read_and_discard(conn);
178 				break;
179 
180 			case 'W':			/* Copy both response */
181 				fprintf(stderr, "<= BE CopyBothResponse\n");
182 				read_and_discard(conn);
183 				break;
184 
185 			case 'Z':			/* Ready for Query */
186 				if (read_int32(conn) < 0)
187 				{
188 					fprintf(stderr, "read_int32() failed\n");
189 					exit(1);
190 				}
191 				c = read_char(conn);
192 				fprintf(stderr, "<= BE ReadyForQuery(%c)\n", c);
193 				cont = 0;
194 				break;
195 
196 			case 'c':			/* Copy Done */
197 				fprintf(stderr, "<= BE CopyDone\n");
198 				read_and_discard(conn);
199 				break;
200 
201 			case 'd':			/* Copy Data */
202 				fprintf(stderr, "<= BE CopyData\n");
203 				read_and_discard(conn);
204 				break;
205 
206 			case 'n':			/* No data */
207 				fprintf(stderr, "<= BE NoData\n");
208 				read_and_discard(conn);
209 				break;
210 
211 			case 's':			/* Portal suspended */
212 				fprintf(stderr, "<= BE PortalSuspended\n");
213 				read_and_discard(conn);
214 				break;
215 
216 			case 't':			/* Parameter description */
217 				fprintf(stderr, "<= BE ParameterDescription\n");
218 				read_and_discard(conn);
219 				break;
220 
221 			case 'A':			/* Notification response */
222 				len = read_int32(conn);
223 				(void)len;
224 				pid = read_int32(conn);
225 
226 				channel = read_string(conn);
227 				if (channel)
228 				{
229 					payload = read_string(conn);
230 					if (payload)
231 					{
232 						fprintf(stderr, "<= BE Notification response. pid: %d channel: %s payload: \"%s\"\n",
233 								pid, channel, payload);
234 						free(payload);
235 					}
236 					else
237 					{
238 						fprintf(stderr, "<= BE Notification response. pid: %d channel: %s\n",
239 								pid, channel);
240 					}
241 					free(channel);
242 				}
243 				else
244 				{
245 					fprintf(stderr, "<= BE Notification response. pid: %d\n",
246 						pid);
247 				}
248 				break;
249 
250 			default:
251 				fprintf(stderr, "<= BE (%c)\n", kind);
252 				read_and_discard(conn);
253 				break;
254 		}
255 
256 		/* If nap-bwteen-line is requested, nap for some time */
257 		if (read_nap > 0)
258 		{
259 			(void) usleep(read_nap);
260 		}
261 	}
262 }
263 
264 /*
265  * Read a character from connection.
266  */
267 static char
read_char(PGconn * conn)268 read_char(PGconn *conn)
269 {
270 	char		c;
271 
272 	read_it(conn, (char *) &c, sizeof(c));
273 
274 	return c;
275 }
276 
277 /*
278  * Read an integer from connection.
279  */
280 static int
read_int32(PGconn * conn)281 read_int32(PGconn *conn)
282 {
283 	int			len;
284 
285 	read_it(conn, (char *) &len, sizeof(len));
286 
287 	return ntohl(len);
288 }
289 
290 /*
291  * Read specified length of bytes from connection.
292  * pg_malloc'ed buffer is returned.
293  */
294 static char *
read_bytes(int len,PGconn * conn)295 read_bytes(int len, PGconn *conn)
296 {
297 	char	   *buf;
298 
299 	buf = pg_malloc(len);
300 
301 	read_it(conn, buf, len);
302 
303 	return buf;
304 }
305 
306 /*
307  * Read and discard a packet.
308  */
309 static void
read_and_discard(PGconn * conn)310 read_and_discard(PGconn *conn)
311 {
312 	int			len;
313 	char	   *buf;
314 
315 	len = read_int32(conn);
316 
317 	if (len > sizeof(int))
318 	{
319 		buf = read_bytes(len - sizeof(int), conn);
320 		pg_free(buf);
321 	}
322 }
323 
324 /*
325  * Read requested bytes from conn. exit in case of error or EOF.
326  */
327 static void
read_it(PGconn * conn,char * buf,int len)328 read_it(PGconn *conn, char *buf, int len)
329 {
330 	int			sts;
331 
332 	if (len <= 0)
333 		return;
334 
335 	for (;;)
336 	{
337 		sts = read(PQsocket(conn), buf, len);
338 
339 		if (sts == 0)
340 		{
341 			fprintf(stderr, "read_it: EOF detected");
342 			exit(1);
343 		}
344 		else if (sts < 0)
345 		{
346 			fprintf(stderr, "read_it: read(2) returns error %s\n", strerror(errno));
347 			exit(1);
348 		}
349 
350 		len -= sts;
351 		buf += sts;
352 
353 		if (len <= 0)
354 			break;
355 	}
356 }
357 
358 /*
359  * Read a string from conn and returns malloced buffer pointer.
360  */
361 static char *
read_string(PGconn * conn)362 read_string(PGconn *conn)
363 {
364 #define	PROTO_ALLOC_SIZE	512
365 
366 	int			sts;
367 	char		*buf;
368 	char		*p;
369 	int			alloc_factor = 1;
370 	int			len;
371 
372 	buf = pg_malloc(PROTO_ALLOC_SIZE);
373 	len = PROTO_ALLOC_SIZE;
374 	p = buf;
375 
376 	for (;;)
377 	{
378 		sts = read(PQsocket(conn), p, 1);
379 
380 		if (sts == 0)
381 		{
382 			fprintf(stderr, "read_string: EOF detected");
383 			exit(1);
384 		}
385 		else if (sts < 0)
386 		{
387 			fprintf(stderr, "read_string: read(2) returns error %s\n", strerror(errno));
388 			exit(1);
389 		}
390 
391 
392 		if (*p == '\0')
393 		{
394 			return buf;
395 		}
396 
397 		len--;
398 		p++;
399 
400 		if (len <= 0)
401 		{
402 			alloc_factor++;
403 			buf = pg_realloc(buf, PROTO_ALLOC_SIZE * alloc_factor);
404 			len = PROTO_ALLOC_SIZE;
405 		}
406 	}
407 
408 }
409