1 /*
2 * Copyright (c) 2017-2018 Tatsuo Ishii
3 * Copyright (c) 2018-2021 PgPool Global Development Group
4 *
5 * Permission to use, copy, modify, and distribute this software and
6 * its documentation for any purpose and without fee is hereby
7 * granted, provided that the above copyright notice appear in all
8 * copies and that both that copyright notice and this permission
9 * notice appear in supporting documentation, and that the name of the
10 * author not be used in advertising or publicity pertaining to
11 * distribution of the software without specific, written prior
12 * permission. The author makes no representations about the
13 * suitability of this software for any purpose. It is provided "as
14 * is" without express or implied warranty.
15 *
16 * Functions to read packets from connection to PostgreSQL.
17 */
18
19 #include "../../include/config.h"
20 #include "pgproto/pgproto.h"
21 #include <unistd.h>
22 #include <stdlib.h>
23 #include <stdio.h>
24 #include <string.h>
25 #include <errno.h>
26 #include <arpa/inet.h>
27 #include "pgproto/fe_memutils.h"
28 #include <libpq-fe.h>
29 #include "pgproto/read.h"
30
31 static char read_char(PGconn *conn);
32 static int read_int32(PGconn *conn);
33 static char *read_bytes(int len, PGconn *conn);
34 static void read_and_discard(PGconn *conn);
35 static void read_it(PGconn *conn, char *buf, int len);
36 static char *read_string(PGconn *conn);
37
38 /*
39 * Read message from connection until ready for query message is received. If
40 * a positive timeout is given, wait for timeout seconds then return if no
41 * data is availble from the connection.
42 */
43 void
read_until_ready_for_query(PGconn * conn,int timeout)44 read_until_ready_for_query(PGconn *conn, int timeout)
45 {
46 int kind;
47 int len;
48 char *buf;
49 char c;
50 char *p;
51 int fd;
52 int cont;
53 struct timeval timeoutval;
54 fd_set readmask;
55 int fds;
56
57 cont = 1;
58
59 while (cont)
60 {
61 if (timeout > 0)
62 {
63 fd = PQsocket(conn);
64
65 for (;;)
66 {
67 FD_ZERO(&readmask);
68 FD_SET(fd, &readmask);
69 timeoutval.tv_sec = timeout;
70 timeoutval.tv_usec = 0;
71 fds = select(fd + 1, &readmask, NULL, NULL, &timeoutval);
72 if (fds == -1)
73 {
74 if (errno == EAGAIN || errno == EINTR)
75 {
76 continue;
77 }
78 else
79 {
80 fprintf(stderr, "reading from Pgpool-II failed. reason: %s\n",
81 strerror(errno));
82 exit(1);
83 }
84 }
85 else if (fds == 0)
86 {
87 /* socket is not ready for reading */
88 return;
89 }
90 else
91 /* socket is ready for reading */
92 break;
93 }
94 }
95
96 kind = read_char(conn);
97 switch (kind)
98 {
99 char *channel, *payload;
100 int pid;
101
102 case '1': /* Parse complete */
103 fprintf(stderr, "<= BE ParseComplete\n");
104 read_and_discard(conn);
105 break;
106
107 case '2': /* Bind complete */
108 fprintf(stderr, "<= BE BindComplete\n");
109 read_and_discard(conn);
110 break;
111
112 case '3': /* Close complete */
113 fprintf(stderr, "<= BE CloseComplete\n");
114 read_and_discard(conn);
115 break;
116
117 case 'C': /* Command complete */
118 len = read_int32(conn);
119 buf = read_bytes(len - sizeof(int), conn);
120 fprintf(stderr, "<= BE CommandComplete(%s)\n", buf);
121 pg_free(buf);
122 break;
123
124 case 'D': /* Data row */
125 fprintf(stderr, "<= BE DataRow\n");
126 read_and_discard(conn);
127 break;
128
129 case 'E': /* Error response */
130 case 'N': /* Notice response */
131 if (kind == 'E')
132 fprintf(stderr, "<= BE ErrorResponse(");
133 else
134 fprintf(stderr, "<= BE NoticeResponse(");
135 len = read_int32(conn);
136 p = buf = read_bytes(len - sizeof(int), conn);
137 while (*p)
138 {
139 fprintf(stderr, "%c ", *p);
140 p++;
141 fprintf(stderr, "%s ", p);
142 p += strlen(p) + 1;
143 }
144
145 fprintf(stderr, ")\n");
146
147 pg_free(buf);
148 break;
149
150 case 'G': /* Copy in response */
151 fprintf(stderr, "<= BE CopyInResponse\n");
152 read_and_discard(conn);
153 break;
154
155 case 'H': /* Copy out response */
156 fprintf(stderr, "<= BE CopyOutResponse\n");
157 read_and_discard(conn);
158 break;
159
160 case 'I': /* Empty query response */
161 fprintf(stderr, "<= BE EmptyQueryResponse\n");
162 read_and_discard(conn);
163 break;
164
165 case 'S': /* Parameter status */
166 fprintf(stderr, "<= BE ParameterStatus\n");
167 read_and_discard(conn);
168 break;
169
170 case 'T': /* Row Description */
171 fprintf(stderr, "<= BE RowDescription\n");
172 read_and_discard(conn);
173 break;
174
175 case 'V': /* Function call response */
176 fprintf(stderr, "<= BE FunctionCallResponse\n");
177 read_and_discard(conn);
178 break;
179
180 case 'W': /* Copy both response */
181 fprintf(stderr, "<= BE CopyBothResponse\n");
182 read_and_discard(conn);
183 break;
184
185 case 'Z': /* Ready for Query */
186 if (read_int32(conn) < 0)
187 {
188 fprintf(stderr, "read_int32() failed\n");
189 exit(1);
190 }
191 c = read_char(conn);
192 fprintf(stderr, "<= BE ReadyForQuery(%c)\n", c);
193 cont = 0;
194 break;
195
196 case 'c': /* Copy Done */
197 fprintf(stderr, "<= BE CopyDone\n");
198 read_and_discard(conn);
199 break;
200
201 case 'd': /* Copy Data */
202 fprintf(stderr, "<= BE CopyData\n");
203 read_and_discard(conn);
204 break;
205
206 case 'n': /* No data */
207 fprintf(stderr, "<= BE NoData\n");
208 read_and_discard(conn);
209 break;
210
211 case 's': /* Portal suspended */
212 fprintf(stderr, "<= BE PortalSuspended\n");
213 read_and_discard(conn);
214 break;
215
216 case 't': /* Parameter description */
217 fprintf(stderr, "<= BE ParameterDescription\n");
218 read_and_discard(conn);
219 break;
220
221 case 'A': /* Notification response */
222 len = read_int32(conn);
223 (void)len;
224 pid = read_int32(conn);
225
226 channel = read_string(conn);
227 if (channel)
228 {
229 payload = read_string(conn);
230 if (payload)
231 {
232 fprintf(stderr, "<= BE Notification response. pid: %d channel: %s payload: \"%s\"\n",
233 pid, channel, payload);
234 free(payload);
235 }
236 else
237 {
238 fprintf(stderr, "<= BE Notification response. pid: %d channel: %s\n",
239 pid, channel);
240 }
241 free(channel);
242 }
243 else
244 {
245 fprintf(stderr, "<= BE Notification response. pid: %d\n",
246 pid);
247 }
248 break;
249
250 default:
251 fprintf(stderr, "<= BE (%c)\n", kind);
252 read_and_discard(conn);
253 break;
254 }
255
256 /* If nap-bwteen-line is requested, nap for some time */
257 if (read_nap > 0)
258 {
259 (void) usleep(read_nap);
260 }
261 }
262 }
263
264 /*
265 * Read a character from connection.
266 */
267 static char
read_char(PGconn * conn)268 read_char(PGconn *conn)
269 {
270 char c;
271
272 read_it(conn, (char *) &c, sizeof(c));
273
274 return c;
275 }
276
277 /*
278 * Read an integer from connection.
279 */
280 static int
read_int32(PGconn * conn)281 read_int32(PGconn *conn)
282 {
283 int len;
284
285 read_it(conn, (char *) &len, sizeof(len));
286
287 return ntohl(len);
288 }
289
290 /*
291 * Read specified length of bytes from connection.
292 * pg_malloc'ed buffer is returned.
293 */
294 static char *
read_bytes(int len,PGconn * conn)295 read_bytes(int len, PGconn *conn)
296 {
297 char *buf;
298
299 buf = pg_malloc(len);
300
301 read_it(conn, buf, len);
302
303 return buf;
304 }
305
306 /*
307 * Read and discard a packet.
308 */
309 static void
read_and_discard(PGconn * conn)310 read_and_discard(PGconn *conn)
311 {
312 int len;
313 char *buf;
314
315 len = read_int32(conn);
316
317 if (len > sizeof(int))
318 {
319 buf = read_bytes(len - sizeof(int), conn);
320 pg_free(buf);
321 }
322 }
323
324 /*
325 * Read requested bytes from conn. exit in case of error or EOF.
326 */
327 static void
read_it(PGconn * conn,char * buf,int len)328 read_it(PGconn *conn, char *buf, int len)
329 {
330 int sts;
331
332 if (len <= 0)
333 return;
334
335 for (;;)
336 {
337 sts = read(PQsocket(conn), buf, len);
338
339 if (sts == 0)
340 {
341 fprintf(stderr, "read_it: EOF detected");
342 exit(1);
343 }
344 else if (sts < 0)
345 {
346 fprintf(stderr, "read_it: read(2) returns error %s\n", strerror(errno));
347 exit(1);
348 }
349
350 len -= sts;
351 buf += sts;
352
353 if (len <= 0)
354 break;
355 }
356 }
357
358 /*
359 * Read a string from conn and returns malloced buffer pointer.
360 */
361 static char *
read_string(PGconn * conn)362 read_string(PGconn *conn)
363 {
364 #define PROTO_ALLOC_SIZE 512
365
366 int sts;
367 char *buf;
368 char *p;
369 int alloc_factor = 1;
370 int len;
371
372 buf = pg_malloc(PROTO_ALLOC_SIZE);
373 len = PROTO_ALLOC_SIZE;
374 p = buf;
375
376 for (;;)
377 {
378 sts = read(PQsocket(conn), p, 1);
379
380 if (sts == 0)
381 {
382 fprintf(stderr, "read_string: EOF detected");
383 exit(1);
384 }
385 else if (sts < 0)
386 {
387 fprintf(stderr, "read_string: read(2) returns error %s\n", strerror(errno));
388 exit(1);
389 }
390
391
392 if (*p == '\0')
393 {
394 return buf;
395 }
396
397 len--;
398 p++;
399
400 if (len <= 0)
401 {
402 alloc_factor++;
403 buf = pg_realloc(buf, PROTO_ALLOC_SIZE * alloc_factor);
404 len = PROTO_ALLOC_SIZE;
405 }
406 }
407
408 }
409