1 /*
2 * $Header$
3 *
4 * pgpool: a language independent connection pool server for PostgreSQL
5 * written by Tatsuo Ishii
6 *
7 * Copyright (c) 2003-2015 PgPool Global Development Group
8 *
9 * Permission to use, copy, modify, and distribute this software and
10 * its documentation for any purpose and without fee is hereby
11 * granted, provided that the above copyright notice appear in all
12 * copies and that both that copyright notice and this permission
13 * notice appear in supporting documentation, and that the name of the
14 * author not be used in advertising or publicity pertaining to
15 * distribution of the software without specific, written prior
16 * permission. The author makes no representations about the
17 * suitability of this software for any purpose. It is provided "as
18 * is" without express or implied warranty.
19 *
20 * PCP buffer management module.
21 */
22
23 #include <stdlib.h>
24 #include <string.h>
25 #include <errno.h>
26 #include <unistd.h>
27 #include <time.h>
28 #include <sys/types.h>
29 #include <sys/time.h>
30 #include <unistd.h>
31
32 #include "pcp/pcp.h"
33 #include "pcp/pcp_stream.h"
34
35 #ifndef POOL_PRIVATE
36 #include "utils/palloc.h"
37 #include "utils/memutils.h"
38 #else
39 #include "utils/fe_ports.h"
40 #endif
41
42 static int consume_pending_data(PCP_CONNECTION * pc, void *data, int len);
43 static int save_pending_data(PCP_CONNECTION * pc, void *data, int len);
44 static int pcp_check_fd(PCP_CONNECTION * pc);
45
46 /* --------------------------------
47 * pcp_open - allocate read & write buffers for PCP_CONNECTION
48 *
49 * return newly allocated PCP_CONNECTION on success, NULL if malloc() fails
50 * --------------------------------
51 */
52 PCP_CONNECTION *
pcp_open(int fd)53 pcp_open(int fd)
54 {
55 PCP_CONNECTION *pc;
56
57 #ifndef POOL_PRIVATE
58 MemoryContext oldContext = MemoryContextSwitchTo(TopMemoryContext);
59 #endif
60
61 pc = (PCP_CONNECTION *) palloc0(sizeof(PCP_CONNECTION));
62 /* initialize write buffer */
63 pc->wbuf = palloc(WRITEBUFSZ);
64 pc->wbufsz = WRITEBUFSZ;
65 pc->wbufpo = 0;
66
67 /* initialize pending data buffer */
68 pc->hp = palloc(READBUFSZ);
69 pc->bufsz = READBUFSZ;
70 pc->po = 0;
71 pc->len = 0;
72
73 #ifndef POOL_PRIVATE
74 MemoryContextSwitchTo(oldContext);
75 #endif
76
77 pc->fd = fd;
78 return pc;
79 }
80
81 /* --------------------------------
82 * pcp_close - deallocate read & write buffers for PCP_CONNECTION
83 * --------------------------------
84 */
85 void
pcp_close(PCP_CONNECTION * pc)86 pcp_close(PCP_CONNECTION * pc)
87 {
88 close(pc->fd);
89 pfree(pc->wbuf);
90 pfree(pc->hp);
91 pfree(pc);
92 }
93
94 /* --------------------------------
95 * pcp_read - read 'len' bytes from 'pc'
96 *
97 * return 0 on success, -1 otherwise
98 * --------------------------------
99 */
100 int
pcp_read(PCP_CONNECTION * pc,void * buf,int len)101 pcp_read(PCP_CONNECTION * pc, void *buf, int len)
102 {
103 static char readbuf[READBUFSZ];
104
105 int consume_size;
106 int readlen;
107
108 consume_size = consume_pending_data(pc, buf, len);
109 len -= consume_size;
110 buf += consume_size;
111
112 while (len > 0)
113 {
114 if (pcp_check_fd(pc))
115 return -1;
116
117 readlen = read(pc->fd, readbuf, READBUFSZ);
118 if (readlen == -1)
119 {
120 if (errno == EAGAIN || errno == EINTR)
121 continue;
122
123 return -1;
124 }
125 else if (readlen == 0)
126 {
127 return -1;
128 }
129
130 if (len < readlen)
131 {
132 /* overrun. we need to save remaining data to pending buffer */
133 if (save_pending_data(pc, readbuf + len, readlen - len))
134 return -1;
135 memmove(buf, readbuf, len);
136 break;
137 }
138
139 memmove(buf, readbuf, readlen);
140 buf += readlen;
141 len -= readlen;
142 }
143
144 return 0;
145 }
146
147 /* --------------------------------
148 * pcp_write - write 'len' bytes to 'pc' buffer
149 *
150 * return 0 on success, -1 otherwise
151 * --------------------------------
152 */
153 int
pcp_write(PCP_CONNECTION * pc,void * buf,int len)154 pcp_write(PCP_CONNECTION * pc, void *buf, int len)
155 {
156 int reqlen;
157
158 if (len < 0)
159 {
160 return -1;
161 }
162
163 /* check buffer size */
164 reqlen = pc->wbufpo + len;
165
166 if (reqlen > pc->wbufsz)
167 {
168 char *p;
169
170 reqlen = (reqlen / WRITEBUFSZ + 1) * WRITEBUFSZ;
171
172 #ifndef POOL_PRIVATE
173 MemoryContext oldContext = MemoryContextSwitchTo(TopMemoryContext);
174 #endif
175 p = repalloc(pc->wbuf, reqlen);
176
177 #ifndef POOL_PRIVATE
178 MemoryContextSwitchTo(oldContext);
179 #endif
180
181 pc->wbuf = p;
182 pc->wbufsz = reqlen;
183 }
184
185 memcpy(pc->wbuf + pc->wbufpo, buf, len);
186 pc->wbufpo += len;
187
188 return 0;
189 }
190
191 /* --------------------------------
192 * pcp_flush - send pending data in buffer to 'pc'
193 *
194 * return 0 on success, -1 otherwise
195 * --------------------------------
196 */
197 int
pcp_flush(PCP_CONNECTION * pc)198 pcp_flush(PCP_CONNECTION * pc)
199 {
200 int sts;
201 int wlen;
202 int offset;
203
204 wlen = pc->wbufpo;
205
206 if (wlen == 0)
207 {
208 return 0;
209 }
210
211 offset = 0;
212
213 for (;;)
214 {
215 errno = 0;
216
217 sts = write(pc->fd, pc->wbuf + offset, wlen);
218
219 if (sts > 0)
220 {
221 wlen -= sts;
222
223 if (wlen == 0)
224 {
225 /* write completed */
226 break;
227 }
228
229 else if (wlen < 0)
230 {
231 return -1;
232 }
233
234 else
235 {
236 /* need to write remaining data */
237 offset += sts;
238 continue;
239 }
240 }
241 else if (errno == EAGAIN || errno == EINTR)
242 continue;
243 else
244 return -1;
245 }
246
247 pc->wbufpo = 0;
248
249 return 0;
250 }
251
252 /* --------------------------------
253 * consume_pending_data - read pending data from 'pc' buffer
254 *
255 * return the size of data read in
256 * --------------------------------
257 */
258 static int
consume_pending_data(PCP_CONNECTION * pc,void * data,int len)259 consume_pending_data(PCP_CONNECTION * pc, void *data, int len)
260 {
261 int consume_size;
262
263 if (pc->len <= 0)
264 return 0;
265
266 consume_size = Min(len, pc->len);
267 memmove(data, pc->hp + pc->po, consume_size);
268 pc->len -= consume_size;
269
270 if (pc->len <= 0)
271 pc->po = 0;
272 else
273 pc->po += consume_size;
274
275 return consume_size;
276 }
277
278 /* --------------------------------
279 * save_pending_data - save excessively read data into 'pc' buffer
280 *
281 * return 0 on success, -1 otherwise
282 * --------------------------------
283 */
284 static int
save_pending_data(PCP_CONNECTION * pc,void * data,int len)285 save_pending_data(PCP_CONNECTION * pc, void *data, int len)
286 {
287 int reqlen;
288 size_t realloc_size;
289 char *p;
290
291 /* to be safe */
292 if (pc->len == 0)
293 pc->po = 0;
294
295 reqlen = pc->po + pc->len + len;
296
297 /* pending buffer is enough? */
298 if (reqlen > pc->bufsz)
299 {
300 /* too small, enlarge it */
301 realloc_size = (reqlen / READBUFSZ + 1) * READBUFSZ;
302
303 #ifndef POOL_PRIVATE
304 MemoryContext oldContext = MemoryContextSwitchTo(TopMemoryContext);
305 #endif
306 p = repalloc(pc->hp, realloc_size);
307
308 #ifndef POOL_PRIVATE
309 MemoryContextSwitchTo(oldContext);
310 #endif
311
312 pc->bufsz = realloc_size;
313 pc->hp = p;
314 }
315
316 memmove(pc->hp + pc->po + pc->len, data, len);
317 pc->len += len;
318
319 return 0;
320 }
321
322 /* --------------------------------
323 * pcp_check_fd - watch for fd which is ready to be read
324 *
325 * return 0 on success, -1 otherwise
326 * --------------------------------
327 */
328 static int
pcp_check_fd(PCP_CONNECTION * pc)329 pcp_check_fd(PCP_CONNECTION * pc)
330 {
331 fd_set readmask;
332 fd_set exceptmask;
333 int fd;
334 int fds;
335
336 fd = pc->fd;
337
338 for (;;)
339 {
340 FD_ZERO(&readmask);
341 FD_ZERO(&exceptmask);
342 FD_SET(fd, &readmask);
343 FD_SET(fd, &exceptmask);
344
345 fds = select(fd + 1, &readmask, NULL, &exceptmask, NULL);
346
347 if (fds == -1)
348 {
349 if (errno == EAGAIN || errno == EINTR)
350 continue;
351
352 break;
353 }
354
355 if (FD_ISSET(fd, &exceptmask))
356 break;
357
358 if (fds == 0)
359 break;
360
361 return 0;
362 }
363
364 return -1;
365 }
366