1 /*
2  * $Header$
3  *
4  * pgpool: a language independent connection pool server for PostgreSQL
5  * written by Tatsuo Ishii
6  *
7  * Copyright (c) 2003-2015	PgPool Global Development Group
8  *
9  * Permission to use, copy, modify, and distribute this software and
10  * its documentation for any purpose and without fee is hereby
11  * granted, provided that the above copyright notice appear in all
12  * copies and that both that copyright notice and this permission
13  * notice appear in supporting documentation, and that the name of the
14  * author not be used in advertising or publicity pertaining to
15  * distribution of the software without specific, written prior
16  * permission. The author makes no representations about the
17  * suitability of this software for any purpose.  It is provided "as
18  * is" without express or implied warranty.
19  *
20  * PCP buffer management module.
21  */
22 
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <limits.h>
#include <unistd.h>
#include <time.h>
#include <sys/types.h>
#include <sys/time.h>
#include <sys/select.h>
#include <unistd.h>
31 
32 #include "pcp/pcp.h"
33 #include "pcp/pcp_stream.h"
34 
35 #ifndef POOL_PRIVATE
36 #include "utils/palloc.h"
37 #include "utils/memutils.h"
38 #else
39 #include "utils/fe_ports.h"
40 #endif
41 
/* Static helpers used by pcp_read(); their definitions appear later in this file. */
static int	consume_pending_data(PCP_CONNECTION * pc, void *data, int len);
static int	save_pending_data(PCP_CONNECTION * pc, void *data, int len);
static int	pcp_check_fd(PCP_CONNECTION * pc);
45 
46 /* --------------------------------
47  * pcp_open - allocate read & write buffers for PCP_CONNECTION
48  *
49  * return newly allocated PCP_CONNECTION on success, NULL if malloc() fails
50  * --------------------------------
51  */
52 PCP_CONNECTION *
pcp_open(int fd)53 pcp_open(int fd)
54 {
55 	PCP_CONNECTION *pc;
56 
57 #ifndef POOL_PRIVATE
58 	MemoryContext oldContext = MemoryContextSwitchTo(TopMemoryContext);
59 #endif
60 
61 	pc = (PCP_CONNECTION *) palloc0(sizeof(PCP_CONNECTION));
62 	/* initialize write buffer */
63 	pc->wbuf = palloc(WRITEBUFSZ);
64 	pc->wbufsz = WRITEBUFSZ;
65 	pc->wbufpo = 0;
66 
67 	/* initialize pending data buffer */
68 	pc->hp = palloc(READBUFSZ);
69 	pc->bufsz = READBUFSZ;
70 	pc->po = 0;
71 	pc->len = 0;
72 
73 #ifndef POOL_PRIVATE
74 	MemoryContextSwitchTo(oldContext);
75 #endif
76 
77 	pc->fd = fd;
78 	return pc;
79 }
80 
81 /* --------------------------------
82  * pcp_close - deallocate read & write buffers for PCP_CONNECTION
83  * --------------------------------
84  */
85 void
pcp_close(PCP_CONNECTION * pc)86 pcp_close(PCP_CONNECTION * pc)
87 {
88 	close(pc->fd);
89 	pfree(pc->wbuf);
90 	pfree(pc->hp);
91 	pfree(pc);
92 }
93 
94 /* --------------------------------
95  * pcp_read - read 'len' bytes from 'pc'
96  *
97  * return 0 on success, -1 otherwise
98  * --------------------------------
99  */
100 int
pcp_read(PCP_CONNECTION * pc,void * buf,int len)101 pcp_read(PCP_CONNECTION * pc, void *buf, int len)
102 {
103 	static char readbuf[READBUFSZ];
104 
105 	int			consume_size;
106 	int			readlen;
107 
108 	consume_size = consume_pending_data(pc, buf, len);
109 	len -= consume_size;
110 	buf += consume_size;
111 
112 	while (len > 0)
113 	{
114 		if (pcp_check_fd(pc))
115 			return -1;
116 
117 		readlen = read(pc->fd, readbuf, READBUFSZ);
118 		if (readlen == -1)
119 		{
120 			if (errno == EAGAIN || errno == EINTR)
121 				continue;
122 
123 			return -1;
124 		}
125 		else if (readlen == 0)
126 		{
127 			return -1;
128 		}
129 
130 		if (len < readlen)
131 		{
132 			/* overrun. we need to save remaining data to pending buffer */
133 			if (save_pending_data(pc, readbuf + len, readlen - len))
134 				return -1;
135 			memmove(buf, readbuf, len);
136 			break;
137 		}
138 
139 		memmove(buf, readbuf, readlen);
140 		buf += readlen;
141 		len -= readlen;
142 	}
143 
144 	return 0;
145 }
146 
147 /* --------------------------------
148  * pcp_write - write 'len' bytes to 'pc' buffer
149  *
150  * return 0 on success, -1 otherwise
151  * --------------------------------
152  */
153 int
pcp_write(PCP_CONNECTION * pc,void * buf,int len)154 pcp_write(PCP_CONNECTION * pc, void *buf, int len)
155 {
156 	int			reqlen;
157 
158 	if (len < 0)
159 	{
160 		return -1;
161 	}
162 
163 	/* check buffer size */
164 	reqlen = pc->wbufpo + len;
165 
166 	if (reqlen > pc->wbufsz)
167 	{
168 		char	   *p;
169 
170 		reqlen = (reqlen / WRITEBUFSZ + 1) * WRITEBUFSZ;
171 
172 #ifndef POOL_PRIVATE
173 		MemoryContext oldContext = MemoryContextSwitchTo(TopMemoryContext);
174 #endif
175 		p = repalloc(pc->wbuf, reqlen);
176 
177 #ifndef POOL_PRIVATE
178 		MemoryContextSwitchTo(oldContext);
179 #endif
180 
181 		pc->wbuf = p;
182 		pc->wbufsz = reqlen;
183 	}
184 
185 	memcpy(pc->wbuf + pc->wbufpo, buf, len);
186 	pc->wbufpo += len;
187 
188 	return 0;
189 }
190 
191 /* --------------------------------
192  * pcp_flush - send pending data in buffer to 'pc'
193  *
194  * return 0 on success, -1 otherwise
195  * --------------------------------
196  */
197 int
pcp_flush(PCP_CONNECTION * pc)198 pcp_flush(PCP_CONNECTION * pc)
199 {
200 	int			sts;
201 	int			wlen;
202 	int			offset;
203 
204 	wlen = pc->wbufpo;
205 
206 	if (wlen == 0)
207 	{
208 		return 0;
209 	}
210 
211 	offset = 0;
212 
213 	for (;;)
214 	{
215 		errno = 0;
216 
217 		sts = write(pc->fd, pc->wbuf + offset, wlen);
218 
219 		if (sts > 0)
220 		{
221 			wlen -= sts;
222 
223 			if (wlen == 0)
224 			{
225 				/* write completed */
226 				break;
227 			}
228 
229 			else if (wlen < 0)
230 			{
231 				return -1;
232 			}
233 
234 			else
235 			{
236 				/* need to write remaining data */
237 				offset += sts;
238 				continue;
239 			}
240 		}
241 		else if (errno == EAGAIN || errno == EINTR)
242 			continue;
243 		else
244 			return -1;
245 	}
246 
247 	pc->wbufpo = 0;
248 
249 	return 0;
250 }
251 
252 /* --------------------------------
253  * consume_pending_data - read pending data from 'pc' buffer
254  *
255  * return the size of data read in
256  * --------------------------------
257  */
258 static int
consume_pending_data(PCP_CONNECTION * pc,void * data,int len)259 consume_pending_data(PCP_CONNECTION * pc, void *data, int len)
260 {
261 	int			consume_size;
262 
263 	if (pc->len <= 0)
264 		return 0;
265 
266 	consume_size = Min(len, pc->len);
267 	memmove(data, pc->hp + pc->po, consume_size);
268 	pc->len -= consume_size;
269 
270 	if (pc->len <= 0)
271 		pc->po = 0;
272 	else
273 		pc->po += consume_size;
274 
275 	return consume_size;
276 }
277 
278 /* --------------------------------
279  * save_pending_data - save excessively read data into 'pc' buffer
280  *
281  * return 0 on success, -1 otherwise
282  * --------------------------------
283  */
284 static int
save_pending_data(PCP_CONNECTION * pc,void * data,int len)285 save_pending_data(PCP_CONNECTION * pc, void *data, int len)
286 {
287 	int			reqlen;
288 	size_t		realloc_size;
289 	char	   *p;
290 
291 	/* to be safe */
292 	if (pc->len == 0)
293 		pc->po = 0;
294 
295 	reqlen = pc->po + pc->len + len;
296 
297 	/* pending buffer is enough? */
298 	if (reqlen > pc->bufsz)
299 	{
300 		/* too small, enlarge it */
301 		realloc_size = (reqlen / READBUFSZ + 1) * READBUFSZ;
302 
303 #ifndef POOL_PRIVATE
304 		MemoryContext oldContext = MemoryContextSwitchTo(TopMemoryContext);
305 #endif
306 		p = repalloc(pc->hp, realloc_size);
307 
308 #ifndef POOL_PRIVATE
309 		MemoryContextSwitchTo(oldContext);
310 #endif
311 
312 		pc->bufsz = realloc_size;
313 		pc->hp = p;
314 	}
315 
316 	memmove(pc->hp + pc->po + pc->len, data, len);
317 	pc->len += len;
318 
319 	return 0;
320 }
321 
322 /* --------------------------------
323  * pcp_check_fd - watch for fd which is ready to be read
324  *
325  * return 0 on success, -1 otherwise
326  * --------------------------------
327  */
328 static int
pcp_check_fd(PCP_CONNECTION * pc)329 pcp_check_fd(PCP_CONNECTION * pc)
330 {
331 	fd_set		readmask;
332 	fd_set		exceptmask;
333 	int			fd;
334 	int			fds;
335 
336 	fd = pc->fd;
337 
338 	for (;;)
339 	{
340 		FD_ZERO(&readmask);
341 		FD_ZERO(&exceptmask);
342 		FD_SET(fd, &readmask);
343 		FD_SET(fd, &exceptmask);
344 
345 		fds = select(fd + 1, &readmask, NULL, &exceptmask, NULL);
346 
347 		if (fds == -1)
348 		{
349 			if (errno == EAGAIN || errno == EINTR)
350 				continue;
351 
352 			break;
353 		}
354 
355 		if (FD_ISSET(fd, &exceptmask))
356 			break;
357 
358 		if (fds == 0)
359 			break;
360 
361 		return 0;
362 	}
363 
364 	return -1;
365 }
366