1 /*
2 * PgBouncer - Lightweight connection pooler for PostgreSQL.
3 *
4 * Copyright (c) 2007-2009 Marko Kreen, Skype Technologies OÜ
5 *
6 * Permission to use, copy, modify, and/or distribute this software for any
7 * purpose with or without fee is hereby granted, provided that the above
8 * copyright notice and this permission notice appear in all copies.
9 *
10 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17 */
18
19 /*
20 * Packet writing and sending.
21 */
22
#include "bouncer.h"

#include <limits.h>
24
25
/*
 * PostgreSQL type OIDs for result sets
 *
 * These are written into the type-OID field of each column by
 * pktbuf_write_RowDescription().
 */
#define BYTEAOID 17	/* bytea */
#define INT8OID 20	/* int8 */
#define INT4OID 23	/* int4 */
#define TEXTOID 25	/* text */
#define NUMERICOID 1700	/* numeric */
34
35
/*
 * Release a dynamically allocated packet buffer.
 *
 * A NULL pointer and buffers flagged as fixed_buf are left untouched.
 * Otherwise the data area, the event structure and the PktBuf itself
 * are freed.
 */
void pktbuf_free(PktBuf *buf)
{
	if (buf == NULL)
		return;
	if (buf->fixed_buf)
		return;

	log_debug("pktbuf_free(%p)", buf);

	/* free(NULL) is a no-op, so the members need no individual guards */
	free(buf->buf);
	free(buf->ev);
	free(buf);
}
48
pktbuf_dynamic(int start_len)49 PktBuf *pktbuf_dynamic(int start_len)
50 {
51 PktBuf *buf = zmalloc(sizeof(PktBuf));
52 log_debug("pktbuf_dynamic(%d): %p", start_len, buf);
53 if (!buf)
54 return NULL;
55
56 buf->ev = zmalloc(sizeof(*buf->ev));
57 if (!buf->ev) {
58 pktbuf_free(buf);
59 return NULL;
60 }
61 buf->buf = malloc(start_len);
62 if (!buf->buf) {
63 pktbuf_free(buf);
64 return NULL;
65 }
66 buf->buf_len = start_len;
67 return buf;
68 }
69
pktbuf_reset(struct PktBuf * pkt)70 void pktbuf_reset(struct PktBuf *pkt)
71 {
72 pkt->failed = false;
73 pkt->write_pos = 0;
74 pkt->pktlen_pos = 0;
75 pkt->send_pos = 0;
76 pkt->sending = false;
77 }
78
/*
 * Initialize a caller-provided PktBuf on top of caller-provided storage.
 *
 * buf  - structure to initialize; all fields are zeroed first
 * data - storage area to write packets into
 * len  - size of the storage area in bytes
 *
 * The buffer is flagged fixed_buf, so it is never grown (make_room)
 * nor released (pktbuf_free).
 */
void pktbuf_static(PktBuf *buf, uint8_t *data, int len)
{
	memset(buf, 0, sizeof(*buf));

	buf->fixed_buf = true;
	buf->buf_len = len;
	buf->buf = data;
}
86
87 static PktBuf *temp_pktbuf;
88
pktbuf_temp(void)89 struct PktBuf *pktbuf_temp(void)
90 {
91 if (!temp_pktbuf)
92 temp_pktbuf = pktbuf_dynamic(512);
93 if (!temp_pktbuf)
94 die("out of memory");
95 pktbuf_reset(temp_pktbuf);
96 return temp_pktbuf;
97 }
98
pktbuf_cleanup(void)99 void pktbuf_cleanup(void)
100 {
101 pktbuf_free(temp_pktbuf);
102 temp_pktbuf = NULL;
103 }
104
/*
 * Try to send everything between send_pos and write_pos in one call.
 *
 * Returns true only if the whole pending range was accepted by
 * sbuf_op_send(); a failed buffer, a send error or a partial write all
 * yield false.  The buffer positions are not modified.
 */
bool pktbuf_send_immediate(PktBuf *buf, PgSocket *sk)
{
	int pending;
	ssize_t sent;

	if (buf->failed)
		return false;

	pending = buf->write_pos - buf->send_pos;
	sent = sbuf_op_send(&sk->sbuf, buf->buf + buf->send_pos, pending);
	if (sent < 0)
		log_debug("pktbuf_send_immediate: %s", strerror(errno));

	return sent == pending;
}
119
/*
 * Event callback that pushes the unsent part of a queued buffer to its
 * destination socket.
 *
 * arg is the PktBuf being sent.  The buffer is freed here once it has
 * been sent completely, or when sending or re-arming the write event
 * fails.  If data remains, a write event on fd is registered so this
 * function runs again.
 */
static void pktbuf_send_func(evutil_socket_t fd, short flags, void *arg)
{
	PktBuf *pkt = arg;
	SBuf *dst = &pkt->queued_dst->sbuf;
	int remaining, rc;

	log_debug("pktbuf_send_func(%" PRId64 ", %d, %p)", (int64_t)fd, (int)flags, pkt);

	if (pkt->failed)
		return;

	remaining = pkt->write_pos - pkt->send_pos;
	rc = sbuf_op_send(dst, pkt->buf + pkt->send_pos, remaining);
	if (rc < 0) {
		if (errno != EAGAIN) {
			log_error("pktbuf_send_func: %s", strerror(errno));
			pktbuf_free(pkt);
			return;
		}
		/* EAGAIN: count it as zero bytes of progress and retry later */
		rc = 0;
	}
	pkt->send_pos += rc;

	/* everything is out - the buffer is no longer needed */
	if (pkt->send_pos >= pkt->write_pos) {
		pktbuf_free(pkt);
		return;
	}

	/* data left over: ask to be called again when fd is writable */
	event_assign(pkt->ev, pgb_event_base, fd, EV_WRITE, pktbuf_send_func, pkt);
	rc = event_add(pkt->ev, NULL);
	if (rc < 0) {
		log_error("pktbuf_send_func: %s", strerror(errno));
		pktbuf_free(pkt);
	}
}
155
/*
 * Hand a dynamic buffer over for queued sending to sk.
 *
 * The buffer must not be fixed and must not already be in the sending
 * state.  If the buffer is marked failed it is freed and a pooler error
 * is sent to sk instead; the result of send_pooler_error() is returned.
 * Otherwise the first send attempt is made right away via
 * pktbuf_send_func() and true is returned.
 */
bool pktbuf_send_queued(PktBuf *buf, PgSocket *sk)
{
	Assert(!buf->sending);
	Assert(!buf->fixed_buf);

	if (buf->failed) {
		pktbuf_free(buf);
		return send_pooler_error(sk, true, false, "result prepare failed");
	}

	buf->queued_dst = sk;
	buf->sending = true;
	pktbuf_send_func(sk->sbuf.sock, EV_WRITE, buf);
	return true;
}
171
/*
 * Make sure at least len more bytes can be written at write_pos.
 *
 * Dynamic buffers are grown by doubling until the request fits.  The
 * buffer is marked failed (and left unchanged otherwise) when:
 *   - len is negative or write_pos + len does not fit in an int,
 *   - the buffer is fixed and too small,
 *   - realloc() fails.
 * An already failed buffer is never touched.
 */
static void make_room(PktBuf *buf, int len)
{
	int newlen = buf->buf_len;
	int need;
	void *ptr;

	if (buf->failed)
		return;

	/* reject bogus sizes before doing signed arithmetic on them */
	if (len < 0 || len > INT_MAX - buf->write_pos) {
		buf->failed = true;
		return;
	}

	need = buf->write_pos + len;
	if (newlen >= need)
		return;

	if (buf->fixed_buf) {
		buf->failed = true;
		return;
	}

	/* doubling zero would never make progress */
	if (newlen < 1)
		newlen = 1;

	while (newlen < need) {
		if (newlen > INT_MAX / 2) {
			/* cannot double without overflow; take exactly what is needed */
			newlen = need;
			break;
		}
		newlen *= 2;
	}

	log_debug("make_room(%p, %d): realloc newlen=%d",
		  buf, len, newlen);

	/* keep the old pointer on failure so pktbuf_free() can still release it */
	ptr = realloc(buf->buf, newlen);
	if (!ptr) {
		buf->failed = true;
	} else {
		buf->buf = ptr;
		buf->buf_len = newlen;
	}
}
202
/*
 * Append a single byte.  Does nothing if the buffer is failed or no
 * room could be made.
 */
void pktbuf_put_char(PktBuf *buf, char val)
{
	make_room(buf, 1);
	if (!buf->failed) {
		buf->buf[buf->write_pos] = val;
		buf->write_pos++;
	}
}
211
/*
 * Append a 16-bit value, most significant byte first.  Does nothing if
 * the buffer is failed or no room could be made.
 */
void pktbuf_put_uint16(PktBuf *buf, uint16_t val)
{
	/*
	 * Exactly two bytes are written, so reserve exactly two.  Asking
	 * for more would wrongly fail a fixed buffer that has just enough
	 * space left.
	 */
	make_room(buf, 2);
	if (buf->failed)
		return;

	buf->buf[buf->write_pos++] = (val >> 8) & 255;
	buf->buf[buf->write_pos++] = val & 255;
}
221
/*
 * Append a 32-bit value, most significant byte first.  Does nothing if
 * the buffer is failed or no room could be made.
 */
void pktbuf_put_uint32(PktBuf *buf, uint32_t val)
{
	make_room(buf, 4);
	if (buf->failed)
		return;

	buf->buf[buf->write_pos + 0] = (val >> 24) & 255;
	buf->buf[buf->write_pos + 1] = (val >> 16) & 255;
	buf->buf[buf->write_pos + 2] = (val >> 8) & 255;
	buf->buf[buf->write_pos + 3] = val & 255;
	buf->write_pos += 4;
}
237
/*
 * Append a 64-bit value as two 32-bit halves, upper half first.
 */
void pktbuf_put_uint64(PktBuf *buf, uint64_t val)
{
	uint32_t upper = (uint32_t)(val >> 32);
	uint32_t lower = (uint32_t)val;

	pktbuf_put_uint32(buf, upper);
	pktbuf_put_uint32(buf, lower);
}
243
/*
 * Append len raw bytes from data.  Does nothing if the buffer is failed
 * or no room could be made.
 */
void pktbuf_put_bytes(PktBuf *buf, const void *data, int len)
{
	make_room(buf, len);
	if (!buf->failed) {
		uint8_t *dst = buf->buf + buf->write_pos;

		memcpy(dst, data, len);
		buf->write_pos += len;
	}
}
252
/*
 * Append a C string including its terminating NUL byte.
 */
void pktbuf_put_string(PktBuf *buf, const char *str)
{
	int nbytes = (int)strlen(str) + 1;	/* +1 for the terminator */

	pktbuf_put_bytes(buf, str, nbytes);
}
258
259 /*
260 * write header, remember pos to write length later.
261 */
pktbuf_start_packet(PktBuf * buf,int type)262 void pktbuf_start_packet(PktBuf *buf, int type)
263 {
264 if (buf->failed)
265 return;
266
267 if (type < 256) {
268 /* new-style packet */
269 pktbuf_put_char(buf, type);
270 buf->pktlen_pos = buf->write_pos;
271 pktbuf_put_uint32(buf, 0);
272 } else {
273 /* old-style packet */
274 buf->pktlen_pos = buf->write_pos;
275 pktbuf_put_uint32(buf, 0);
276 pktbuf_put_uint32(buf, type);
277 }
278 }
279
/*
 * Finish the packet begun by pktbuf_start_packet(): store the number of
 * bytes from pktlen_pos up to write_pos (the length field included)
 * into the 4-byte placeholder, most significant byte first, and clear
 * pktlen_pos.
 *
 * Does nothing if the buffer is failed.
 */
void pktbuf_finish_packet(PktBuf *buf)
{
	unsigned pktlen;
	uint8_t *field;

	if (buf->failed)
		return;

	pktlen = buf->write_pos - buf->pktlen_pos;
	field = buf->buf + buf->pktlen_pos;

	field[0] = (pktlen >> 24) & 255;
	field[1] = (pktlen >> 16) & 255;
	field[2] = (pktlen >> 8) & 255;
	field[3] = pktlen & 255;

	buf->pktlen_pos = 0;
}
297
298 /* types:
299 * c - char/byte
300 * h - uint16
301 * i - uint32
302 * q - uint64
303 * s - Cstring
304 * b - bytes
305 */
pktbuf_write_generic(PktBuf * buf,int type,const char * pktdesc,...)306 void pktbuf_write_generic(PktBuf *buf, int type, const char *pktdesc, ...)
307 {
308 va_list ap;
309 int len;
310 const char *adesc = pktdesc;
311 uint8_t *bin;
312
313 pktbuf_start_packet(buf, type);
314
315 va_start(ap, pktdesc);
316 while (*adesc) {
317 switch (*adesc) {
318 case 'c':
319 pktbuf_put_char(buf, va_arg(ap, int));
320 break;
321 case 'h':
322 pktbuf_put_uint16(buf, va_arg(ap, int));
323 break;
324 case 'i':
325 pktbuf_put_uint32(buf, va_arg(ap, int));
326 break;
327 case 'q':
328 pktbuf_put_uint64(buf, va_arg(ap, uint64_t));
329 break;
330 case 's':
331 pktbuf_put_string(buf, va_arg(ap, char *));
332 break;
333 case 'b':
334 bin = va_arg(ap, uint8_t *);
335 len = va_arg(ap, int);
336 pktbuf_put_bytes(buf, bin, len);
337 break;
338 default:
339 fatal("bad pktdesc: %s", pktdesc);
340 }
341 adesc++;
342 }
343 va_end(ap);
344
345 /* set correct length */
346 pktbuf_finish_packet(buf);
347 }
348
349
350 /* send resultset column info
351 * tupdesc keys:
352 * 'i' - int4
353 * 'q' - int8
354 * 's' - string to text
355 * 'b' - bytes to bytea
356 * 'N' - uint64_t to numeric
357 * 'T' - usec_t to date
358 */
pktbuf_write_RowDescription(PktBuf * buf,const char * tupdesc,...)359 void pktbuf_write_RowDescription(PktBuf *buf, const char *tupdesc, ...)
360 {
361 va_list ap;
362 char *name;
363 int i, ncol = strlen(tupdesc);
364
365 log_noise("write RowDescription");
366
367 pktbuf_start_packet(buf, 'T');
368
369 pktbuf_put_uint16(buf, ncol);
370
371 va_start(ap, tupdesc);
372 for (i = 0; i < ncol; i++) {
373 name = va_arg(ap, char *);
374
375 /* Fields: name, reloid, colnr, oid, typsize, typmod, fmt */
376 pktbuf_put_string(buf, name);
377 pktbuf_put_uint32(buf, 0);
378 pktbuf_put_uint16(buf, 0);
379 if (tupdesc[i] == 's') {
380 pktbuf_put_uint32(buf, TEXTOID);
381 pktbuf_put_uint16(buf, -1);
382 } else if (tupdesc[i] == 'b') {
383 pktbuf_put_uint32(buf, BYTEAOID);
384 pktbuf_put_uint16(buf, -1);
385 } else if (tupdesc[i] == 'i') {
386 pktbuf_put_uint32(buf, INT4OID);
387 pktbuf_put_uint16(buf, 4);
388 } else if (tupdesc[i] == 'q') {
389 pktbuf_put_uint32(buf, INT8OID);
390 pktbuf_put_uint16(buf, 8);
391 } else if (tupdesc[i] == 'N') {
392 pktbuf_put_uint32(buf, NUMERICOID);
393 pktbuf_put_uint16(buf, -1);
394 } else if (tupdesc[i] == 'T') {
395 pktbuf_put_uint32(buf, TEXTOID);
396 pktbuf_put_uint16(buf, -1);
397 } else {
398 fatal("bad tupdesc");
399 }
400 pktbuf_put_uint32(buf, -1);
401 pktbuf_put_uint16(buf, 0);
402 }
403 va_end(ap);
404
405 /* set correct length */
406 pktbuf_finish_packet(buf);
407 }
408
409 /*
410 * send DataRow.
411 *
412 * tupdesc keys:
413 * 'i' - int4
414 * 'q' - int8
415 * 's' - string to text
416 * 'b' - bytes to bytea
417 * 'N' - uint64_t to numeric
418 * 'T' - usec_t to date
419 */
pktbuf_write_DataRow(PktBuf * buf,const char * tupdesc,...)420 void pktbuf_write_DataRow(PktBuf *buf, const char *tupdesc, ...)
421 {
422 int ncol = strlen(tupdesc);
423 va_list ap;
424
425 pktbuf_start_packet(buf, 'D');
426 pktbuf_put_uint16(buf, ncol);
427
428 va_start(ap, tupdesc);
429 for (int i = 0; i < ncol; i++) {
430 char tmp[100]; /* XXX good enough in practice */
431 const char *val = NULL;
432
433 if (tupdesc[i] == 'i') {
434 snprintf(tmp, sizeof(tmp), "%d", va_arg(ap, int));
435 val = tmp;
436 } else if (tupdesc[i] == 'q' || tupdesc[i] == 'N') {
437 snprintf(tmp, sizeof(tmp), "%" PRIu64, va_arg(ap, uint64_t));
438 val = tmp;
439 } else if (tupdesc[i] == 's') {
440 val = va_arg(ap, char *);
441 } else if (tupdesc[i] == 'b') {
442 int blen = va_arg(ap, int);
443 if (blen >= 0) {
444 uint8_t *bval = va_arg(ap, uint8_t *);
445 size_t required = 2 + blen * 2 + 1;
446
447 if (required > sizeof(tmp))
448 fatal("byte array too long (%zu > %zu)", required, sizeof(tmp));
449 strcpy(tmp, "\\x");
450 for (int j = 0; j < blen; j++)
451 sprintf(tmp + (2 + j * 2), "%02x", bval[j]);
452 val = tmp;
453 }
454 else {
455 (void) va_arg(ap, uint8_t *);
456 val = NULL;
457 }
458 } else if (tupdesc[i] == 'T') {
459 usec_t time = va_arg(ap, usec_t);
460 val = format_time_s(time, tmp, sizeof(tmp));
461 } else {
462 fatal("bad tupdesc: %s", tupdesc);
463 }
464
465 if (val) {
466 int len = strlen(val);
467 pktbuf_put_uint32(buf, len);
468 pktbuf_put_bytes(buf, val, len);
469 } else {
470 /* NULL */
471 pktbuf_put_uint32(buf, -1);
472 }
473 }
474 va_end(ap);
475
476 pktbuf_finish_packet(buf);
477 }
478
479 /*
480 * Send Parse+Bind+Execute with string parameters.
481 */
pktbuf_write_ExtQuery(PktBuf * buf,const char * query,int nargs,...)482 void pktbuf_write_ExtQuery(PktBuf *buf, const char *query, int nargs, ...)
483 {
484 va_list ap;
485 const char *val;
486 int len, i;
487
488 /* Parse */
489 pktbuf_write_generic(buf, 'P', "csh", 0, query, 0);
490
491 /* Bind */
492 pktbuf_start_packet(buf, 'B');
493 pktbuf_put_char(buf, 0); /* portal name */
494 pktbuf_put_char(buf, 0); /* query name */
495 pktbuf_put_uint16(buf, 0); /* number of parameter format codes */
496 pktbuf_put_uint16(buf, nargs); /* number of parameter values */
497
498 va_start(ap, nargs);
499 for (i = 0; i < nargs; i++) {
500 val = va_arg(ap, char *);
501 len = strlen(val);
502 pktbuf_put_uint32(buf, len);
503 pktbuf_put_bytes(buf, val, len);
504 }
505 va_end(ap);
506
507 pktbuf_put_uint16(buf, 0); /* number of result-column format codes */
508 pktbuf_finish_packet(buf);
509
510 /* Describe */
511 pktbuf_write_generic(buf, 'D', "cc", 'P', 0);
512
513 /* Execute */
514 pktbuf_write_generic(buf, 'E', "ci", 0, 0);
515
516 /* Sync */
517 pktbuf_write_generic(buf, 'S', "");
518 }
519