1 /*
2  * PgBouncer - Lightweight connection pooler for PostgreSQL.
3  *
4  * Copyright (c) 2007-2009  Marko Kreen, Skype Technologies OÜ
5  *
6  * Permission to use, copy, modify, and/or distribute this software for any
7  * purpose with or without fee is hereby granted, provided that the above
8  * copyright notice and this permission notice appear in all copies.
9  *
10  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15  * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16  * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17  */
18 
19 /*
20  * Packet writing and sending.
21  */
22 
23 #include "bouncer.h"
24 
25 
26 /*
27  * PostgreSQL type OIDs for result sets
28  */
29 #define BYTEAOID 17
30 #define INT8OID 20
31 #define INT4OID 23
32 #define TEXTOID 25
33 #define NUMERICOID 1700
34 
35 
pktbuf_free(PktBuf * buf)36 void pktbuf_free(PktBuf *buf)
37 {
38 	if (!buf || buf->fixed_buf)
39 		return;
40 
41 	log_debug("pktbuf_free(%p)", buf);
42 	if (buf->buf)
43 		free(buf->buf);
44 	if (buf->ev)
45 		free(buf->ev);
46 	free(buf);
47 }
48 
pktbuf_dynamic(int start_len)49 PktBuf *pktbuf_dynamic(int start_len)
50 {
51 	PktBuf *buf = zmalloc(sizeof(PktBuf));
52 	log_debug("pktbuf_dynamic(%d): %p", start_len, buf);
53 	if (!buf)
54 		return NULL;
55 
56 	buf->ev = zmalloc(sizeof(*buf->ev));
57 	if (!buf->ev) {
58 		pktbuf_free(buf);
59 		return NULL;
60 	}
61 	buf->buf = malloc(start_len);
62 	if (!buf->buf) {
63 		pktbuf_free(buf);
64 		return NULL;
65 	}
66 	buf->buf_len = start_len;
67 	return buf;
68 }
69 
pktbuf_reset(struct PktBuf * pkt)70 void pktbuf_reset(struct PktBuf *pkt)
71 {
72 	pkt->failed = false;
73 	pkt->write_pos = 0;
74 	pkt->pktlen_pos = 0;
75 	pkt->send_pos = 0;
76 	pkt->sending = false;
77 }
78 
pktbuf_static(PktBuf * buf,uint8_t * data,int len)79 void pktbuf_static(PktBuf *buf, uint8_t *data, int len)
80 {
81 	memset(buf, 0, sizeof(*buf));
82 	buf->buf = data;
83 	buf->buf_len = len;
84 	buf->fixed_buf = true;
85 }
86 
87 static PktBuf *temp_pktbuf;
88 
pktbuf_temp(void)89 struct PktBuf *pktbuf_temp(void)
90 {
91 	if (!temp_pktbuf)
92 		temp_pktbuf = pktbuf_dynamic(512);
93 	if (!temp_pktbuf)
94 		die("out of memory");
95 	pktbuf_reset(temp_pktbuf);
96 	return temp_pktbuf;
97 }
98 
pktbuf_cleanup(void)99 void pktbuf_cleanup(void)
100 {
101 	pktbuf_free(temp_pktbuf);
102 	temp_pktbuf = NULL;
103 }
104 
pktbuf_send_immediate(PktBuf * buf,PgSocket * sk)105 bool pktbuf_send_immediate(PktBuf *buf, PgSocket *sk)
106 {
107 	uint8_t *pos = buf->buf + buf->send_pos;
108 	int amount = buf->write_pos - buf->send_pos;
109 	ssize_t res;
110 
111 	if (buf->failed)
112 		return false;
113 	res = sbuf_op_send(&sk->sbuf, pos, amount);
114 	if (res < 0) {
115 		log_debug("pktbuf_send_immediate: %s", strerror(errno));
116 	}
117 	return res == amount;
118 }
119 
pktbuf_send_func(evutil_socket_t fd,short flags,void * arg)120 static void pktbuf_send_func(evutil_socket_t fd, short flags, void *arg)
121 {
122 	PktBuf *buf = arg;
123 	SBuf *sbuf = &buf->queued_dst->sbuf;
124 	int amount, res;
125 
126 	log_debug("pktbuf_send_func(%" PRId64 ", %d, %p)", (int64_t)fd, (int)flags, buf);
127 
128 	if (buf->failed)
129 		return;
130 
131 	amount = buf->write_pos - buf->send_pos;
132 	res = sbuf_op_send(sbuf, buf->buf + buf->send_pos, amount);
133 	if (res < 0) {
134 		if (errno == EAGAIN) {
135 			res = 0;
136 		} else {
137 			log_error("pktbuf_send_func: %s", strerror(errno));
138 			pktbuf_free(buf);
139 			return;
140 		}
141 	}
142 	buf->send_pos += res;
143 
144 	if (buf->send_pos < buf->write_pos) {
145 		event_assign(buf->ev, pgb_event_base, fd, EV_WRITE, pktbuf_send_func, buf);
146 		res = event_add(buf->ev, NULL);
147 		if (res < 0) {
148 			log_error("pktbuf_send_func: %s", strerror(errno));
149 			pktbuf_free(buf);
150 		}
151 	} else {
152 		pktbuf_free(buf);
153 	}
154 }
155 
pktbuf_send_queued(PktBuf * buf,PgSocket * sk)156 bool pktbuf_send_queued(PktBuf *buf, PgSocket *sk)
157 {
158 	Assert(!buf->sending);
159 	Assert(!buf->fixed_buf);
160 
161 	if (buf->failed) {
162 		pktbuf_free(buf);
163 		return send_pooler_error(sk, true, false, "result prepare failed");
164 	} else {
165 		buf->sending = true;
166 		buf->queued_dst = sk;
167 		pktbuf_send_func(sk->sbuf.sock, EV_WRITE, buf);
168 		return true;
169 	}
170 }
171 
make_room(PktBuf * buf,int len)172 static void make_room(PktBuf *buf, int len)
173 {
174 	int newlen = buf->buf_len;
175 	int need = buf->write_pos + len;
176 	void *ptr;
177 
178 	if (newlen >= need)
179 		return;
180 
181 	if (buf->failed)
182 		return;
183 
184 	if (buf->fixed_buf) {
185 		buf->failed = true;
186 		return;
187 	}
188 
189 	while (newlen < need)
190 		newlen = newlen * 2;
191 
192 	log_debug("make_room(%p, %d): realloc newlen=%d",
193 		  buf, len, newlen);
194 	ptr = realloc(buf->buf, newlen);
195 	if (!ptr) {
196 		buf->failed = true;
197 	} else {
198 		buf->buf = ptr;
199 		buf->buf_len = newlen;
200 	}
201 }
202 
pktbuf_put_char(PktBuf * buf,char val)203 void pktbuf_put_char(PktBuf *buf, char val)
204 {
205 	make_room(buf, 1);
206 	if (buf->failed)
207 		return;
208 
209 	buf->buf[buf->write_pos++] = val;
210 }
211 
pktbuf_put_uint16(PktBuf * buf,uint16_t val)212 void pktbuf_put_uint16(PktBuf *buf, uint16_t val)
213 {
214 	make_room(buf, 4);
215 	if (buf->failed)
216 		return;
217 
218 	buf->buf[buf->write_pos++] = (val >> 8) & 255;
219 	buf->buf[buf->write_pos++] = val & 255;
220 }
221 
pktbuf_put_uint32(PktBuf * buf,uint32_t val)222 void pktbuf_put_uint32(PktBuf *buf, uint32_t val)
223 {
224 	uint8_t *pos;
225 
226 	make_room(buf, 4);
227 	if (buf->failed)
228 		return;
229 
230 	pos = buf->buf + buf->write_pos;
231 	pos[0] = (val >> 24) & 255;
232 	pos[1] = (val >> 16) & 255;
233 	pos[2] = (val >> 8) & 255;
234 	pos[3] = val & 255;
235 	buf->write_pos += 4;
236 }
237 
pktbuf_put_uint64(PktBuf * buf,uint64_t val)238 void pktbuf_put_uint64(PktBuf *buf, uint64_t val)
239 {
240 	pktbuf_put_uint32(buf, val >> 32);
241 	pktbuf_put_uint32(buf, (uint32_t)val);
242 }
243 
pktbuf_put_bytes(PktBuf * buf,const void * data,int len)244 void pktbuf_put_bytes(PktBuf *buf, const void *data, int len)
245 {
246 	make_room(buf, len);
247 	if (buf->failed)
248 		return;
249 	memcpy(buf->buf + buf->write_pos, data, len);
250 	buf->write_pos += len;
251 }
252 
pktbuf_put_string(PktBuf * buf,const char * str)253 void pktbuf_put_string(PktBuf *buf, const char *str)
254 {
255 	int len = strlen(str);
256 	pktbuf_put_bytes(buf, str, len + 1);
257 }
258 
259 /*
260  * write header, remember pos to write length later.
261  */
pktbuf_start_packet(PktBuf * buf,int type)262 void pktbuf_start_packet(PktBuf *buf, int type)
263 {
264 	if (buf->failed)
265 		return;
266 
267 	if (type < 256) {
268 		/* new-style packet */
269 		pktbuf_put_char(buf, type);
270 		buf->pktlen_pos = buf->write_pos;
271 		pktbuf_put_uint32(buf, 0);
272 	} else {
273 		/* old-style packet */
274 		buf->pktlen_pos = buf->write_pos;
275 		pktbuf_put_uint32(buf, 0);
276 		pktbuf_put_uint32(buf, type);
277 	}
278 }
279 
pktbuf_finish_packet(PktBuf * buf)280 void pktbuf_finish_packet(PktBuf *buf)
281 {
282 	uint8_t *pos;
283 	unsigned len;
284 
285 	if (buf->failed)
286 		return;
287 
288 	len = buf->write_pos - buf->pktlen_pos;
289 	pos = buf->buf + buf->pktlen_pos;
290 	buf->pktlen_pos = 0;
291 
292 	*pos++ = (len >> 24) & 255;
293 	*pos++ = (len >> 16) & 255;
294 	*pos++ = (len >> 8) & 255;
295 	*pos++ = len & 255;
296 }
297 
298 /* types:
299  * c - char/byte
300  * h - uint16
301  * i - uint32
302  * q - uint64
303  * s - Cstring
304  * b - bytes
305  */
pktbuf_write_generic(PktBuf * buf,int type,const char * pktdesc,...)306 void pktbuf_write_generic(PktBuf *buf, int type, const char *pktdesc, ...)
307 {
308 	va_list ap;
309 	int len;
310 	const char *adesc = pktdesc;
311 	uint8_t *bin;
312 
313 	pktbuf_start_packet(buf, type);
314 
315 	va_start(ap, pktdesc);
316 	while (*adesc) {
317 		switch (*adesc) {
318 		case 'c':
319 			pktbuf_put_char(buf, va_arg(ap, int));
320 			break;
321 		case 'h':
322 			pktbuf_put_uint16(buf, va_arg(ap, int));
323 			break;
324 		case 'i':
325 			pktbuf_put_uint32(buf, va_arg(ap, int));
326 			break;
327 		case 'q':
328 			pktbuf_put_uint64(buf, va_arg(ap, uint64_t));
329 			break;
330 		case 's':
331 			pktbuf_put_string(buf, va_arg(ap, char *));
332 			break;
333 		case 'b':
334 			bin = va_arg(ap, uint8_t *);
335 			len = va_arg(ap, int);
336 			pktbuf_put_bytes(buf, bin, len);
337 			break;
338 		default:
339 			fatal("bad pktdesc: %s", pktdesc);
340 		}
341 		adesc++;
342 	}
343 	va_end(ap);
344 
345 	/* set correct length */
346 	pktbuf_finish_packet(buf);
347 }
348 
349 
350 /* send resultset column info
351  * tupdesc keys:
352  * 'i' - int4
353  * 'q' - int8
354  * 's' - string to text
355  * 'b' - bytes to bytea
356  * 'N' - uint64_t to numeric
357  * 'T' - usec_t to date
358  */
pktbuf_write_RowDescription(PktBuf * buf,const char * tupdesc,...)359 void pktbuf_write_RowDescription(PktBuf *buf, const char *tupdesc, ...)
360 {
361 	va_list ap;
362 	char *name;
363 	int i, ncol = strlen(tupdesc);
364 
365 	log_noise("write RowDescription");
366 
367 	pktbuf_start_packet(buf, 'T');
368 
369 	pktbuf_put_uint16(buf, ncol);
370 
371 	va_start(ap, tupdesc);
372 	for (i = 0; i < ncol; i++) {
373 		name = va_arg(ap, char *);
374 
375 		/* Fields: name, reloid, colnr, oid, typsize, typmod, fmt */
376 		pktbuf_put_string(buf, name);
377 		pktbuf_put_uint32(buf, 0);
378 		pktbuf_put_uint16(buf, 0);
379 		if (tupdesc[i] == 's') {
380 			pktbuf_put_uint32(buf, TEXTOID);
381 			pktbuf_put_uint16(buf, -1);
382 		} else if (tupdesc[i] == 'b') {
383 			pktbuf_put_uint32(buf, BYTEAOID);
384 			pktbuf_put_uint16(buf, -1);
385 		} else if (tupdesc[i] == 'i') {
386 			pktbuf_put_uint32(buf, INT4OID);
387 			pktbuf_put_uint16(buf, 4);
388 		} else if (tupdesc[i] == 'q') {
389 			pktbuf_put_uint32(buf, INT8OID);
390 			pktbuf_put_uint16(buf, 8);
391 		} else if (tupdesc[i] == 'N') {
392 			pktbuf_put_uint32(buf, NUMERICOID);
393 			pktbuf_put_uint16(buf, -1);
394 		} else if (tupdesc[i] == 'T') {
395 			pktbuf_put_uint32(buf, TEXTOID);
396 			pktbuf_put_uint16(buf, -1);
397 		} else {
398 			fatal("bad tupdesc");
399 		}
400 		pktbuf_put_uint32(buf, -1);
401 		pktbuf_put_uint16(buf, 0);
402 	}
403 	va_end(ap);
404 
405 	/* set correct length */
406 	pktbuf_finish_packet(buf);
407 }
408 
409 /*
410  * send DataRow.
411  *
412  * tupdesc keys:
413  * 'i' - int4
414  * 'q' - int8
415  * 's' - string to text
416  * 'b' - bytes to bytea
417  * 'N' - uint64_t to numeric
418  * 'T' - usec_t to date
419  */
pktbuf_write_DataRow(PktBuf * buf,const char * tupdesc,...)420 void pktbuf_write_DataRow(PktBuf *buf, const char *tupdesc, ...)
421 {
422 	int ncol = strlen(tupdesc);
423 	va_list ap;
424 
425 	pktbuf_start_packet(buf, 'D');
426 	pktbuf_put_uint16(buf, ncol);
427 
428 	va_start(ap, tupdesc);
429 	for (int i = 0; i < ncol; i++) {
430 		char tmp[100];	/* XXX good enough in practice */
431 		const char *val = NULL;
432 
433 		if (tupdesc[i] == 'i') {
434 			snprintf(tmp, sizeof(tmp), "%d", va_arg(ap, int));
435 			val = tmp;
436 		} else if (tupdesc[i] == 'q' || tupdesc[i] == 'N') {
437 			snprintf(tmp, sizeof(tmp), "%" PRIu64, va_arg(ap, uint64_t));
438 			val = tmp;
439 		} else if (tupdesc[i] == 's') {
440 			val = va_arg(ap, char *);
441 		} else if (tupdesc[i] == 'b') {
442 			int blen = va_arg(ap, int);
443 			if (blen >= 0) {
444 				uint8_t *bval = va_arg(ap, uint8_t *);
445 				size_t required = 2 + blen * 2 + 1;
446 
447 				if (required > sizeof(tmp))
448 					fatal("byte array too long (%zu > %zu)", required, sizeof(tmp));
449 				strcpy(tmp, "\\x");
450 				for (int j = 0; j < blen; j++)
451 					sprintf(tmp + (2 + j * 2), "%02x", bval[j]);
452 				val = tmp;
453 			}
454 			else {
455 				(void) va_arg(ap, uint8_t *);
456 				val = NULL;
457 			}
458 		} else if (tupdesc[i] == 'T') {
459 			usec_t time = va_arg(ap, usec_t);
460 			val = format_time_s(time, tmp, sizeof(tmp));
461 		} else {
462 			fatal("bad tupdesc: %s", tupdesc);
463 		}
464 
465 		if (val) {
466 			int len = strlen(val);
467 			pktbuf_put_uint32(buf, len);
468 			pktbuf_put_bytes(buf, val, len);
469 		} else {
470 			/* NULL */
471 			pktbuf_put_uint32(buf, -1);
472 		}
473 	}
474 	va_end(ap);
475 
476 	pktbuf_finish_packet(buf);
477 }
478 
479 /*
480  * Send Parse+Bind+Execute with string parameters.
481  */
pktbuf_write_ExtQuery(PktBuf * buf,const char * query,int nargs,...)482 void pktbuf_write_ExtQuery(PktBuf *buf, const char *query, int nargs, ...)
483 {
484 	va_list ap;
485 	const char *val;
486 	int len, i;
487 
488 	/* Parse */
489 	pktbuf_write_generic(buf, 'P', "csh", 0, query, 0);
490 
491 	/* Bind */
492 	pktbuf_start_packet(buf, 'B');
493 	pktbuf_put_char(buf, 0);	/* portal name */
494 	pktbuf_put_char(buf, 0);	/* query name */
495 	pktbuf_put_uint16(buf, 0);	/* number of parameter format codes */
496 	pktbuf_put_uint16(buf, nargs);	/* number of parameter values */
497 
498 	va_start(ap, nargs);
499 	for (i = 0; i < nargs; i++) {
500 		val = va_arg(ap, char *);
501 		len = strlen(val);
502 		pktbuf_put_uint32(buf, len);
503 		pktbuf_put_bytes(buf, val, len);
504 	}
505 	va_end(ap);
506 
507 	pktbuf_put_uint16(buf, 0);	/* number of result-column format codes */
508 	pktbuf_finish_packet(buf);
509 
510 	/* Describe */
511 	pktbuf_write_generic(buf, 'D', "cc", 'P', 0);
512 
513 	/* Execute */
514 	pktbuf_write_generic(buf, 'E', "ci", 0, 0);
515 
516 	/* Sync */
517 	pktbuf_write_generic(buf, 'S', "");
518 }
519