1 /* -*-pgsql-c-*- */
2 /*
3 * $Header$
4 *
5 * pgpool: a language independent connection pool server for PostgreSQL
6 * written by Tatsuo Ishii
7 *
8 * Copyright (c) 2003-2018 PgPool Global Development Group
9 *
10 * Permission to use, copy, modify, and distribute this software and
11 * its documentation for any purpose and without fee is hereby
12 * granted, provided that the above copyright notice appear in all
13 * copies and that both that copyright notice and this permission
14 * notice appear in supporting documentation, and that the name of the
15 * author not be used in advertising or publicity pertaining to
16 * distribution of the software without specific, written prior
17 * permission. The author makes no representations about the
18 * suitability of this software for any purpose. It is provided "as
19 * is" without express or implied warranty.
20 *
21 * pool_stream.c: stream I/O modules
22 *
23 */
24
25 #include "config.h"
26
27 #ifdef HAVE_SYS_SELECT_H
28 #include <sys/select.h>
29 #endif
30
31 #include <stdio.h>
32 #include <stdlib.h>
33 #include <string.h>
34 #include <errno.h>
35 #include <unistd.h>
36
37 #ifdef HAVE_FCNTL_H
38 #include <fcntl.h>
39 #endif
40
41 #include "pool.h"
42 #include "utils/elog.h"
43 #include "utils/palloc.h"
44 #include "utils/memutils.h"
45 #include "utils/pool_stream.h"
46 #include "pool_config.h"
47
48 static int mystrlen(char *str, int upper, int *flag);
49 static int mystrlinelen(char *str, int upper, int *flag);
50 static int save_pending_data(POOL_CONNECTION *cp, void *data, int len);
51 static int consume_pending_data(POOL_CONNECTION *cp, void *data, int len);
52 static MemoryContext SwitchToConnectionContext(bool backend_connection);
53 #ifdef DEBUG
54 static void dump_buffer(char *buf, int len);
55 #endif
56 static int pool_write_flush(POOL_CONNECTION *cp, void *buf, int len);
57
58 static MemoryContext
SwitchToConnectionContext(bool backend_connection)59 SwitchToConnectionContext(bool backend_connection)
60 {
61 /*
62 * backend connection can live as long as process life,
63 * while the frontend connection is only for one session life.
64 * So we create backend connection in long living memory context
65 */
66 if(backend_connection)
67 return MemoryContextSwitchTo(TopMemoryContext);
68 else
69 return MemoryContextSwitchTo(ProcessLoopContext);
70 }
71
72 /*
73 * open read/write file descriptors.
74 * returns POOL_CONNECTION on success otherwise NULL.
75 */
pool_open(int fd,bool backend_connection)76 POOL_CONNECTION *pool_open(int fd, bool backend_connection)
77 {
78 POOL_CONNECTION *cp;
79
80 MemoryContext oldContext = SwitchToConnectionContext(backend_connection);
81
82 cp = (POOL_CONNECTION *)palloc0(sizeof(POOL_CONNECTION));
83
84 /* initialize write buffer */
85 cp->wbuf = palloc(WRITEBUFSZ);
86 cp->wbufsz = WRITEBUFSZ;
87 cp->wbufpo = 0;
88
89 /* initialize pending data buffer */
90 cp->hp = palloc(READBUFSZ);
91 cp->bufsz = READBUFSZ;
92 cp->po = 0;
93 cp->len = 0;
94 cp->sbuf = NULL;
95 cp->sbufsz = 0;
96 cp->buf2 = NULL;
97 cp->bufsz2 = 0;
98 cp->buf3 = NULL;
99 cp->bufsz3 = 0;
100
101 cp->fd = fd;
102 cp->socket_state = POOL_SOCKET_VALID;
103
104 MemoryContextSwitchTo(oldContext);
105
106 return cp;
107 }
108
109 /*
110 * close read/write file descriptors.
111 */
pool_close(POOL_CONNECTION * cp)112 void pool_close(POOL_CONNECTION *cp)
113 {
114 /*
115 * shutdown connection to the client so that pgpool is not blocked
116 */
117 if (!cp->isbackend)
118 shutdown(cp->fd, 1);
119 close(cp->fd);
120 cp->socket_state = POOL_SOCKET_CLOSED;
121 pfree(cp->wbuf);
122 pfree(cp->hp);
123 if (cp->sbuf)
124 pfree(cp->sbuf);
125 if (cp->buf2)
126 pfree(cp->buf2);
127 if (cp->buf3)
128 pfree(cp->buf3);
129 pool_discard_params(&cp->params);
130
131 pool_ssl_close(cp);
132
133 pfree(cp);
134 }
135
/*
 * Read len bytes from cp into buf via pool_read().
 *
 * If pool_read() returns a negative value, raise an ERROR that mentions
 * err_context (an empty string is used when err_context is NULL).
 */
void
pool_read_with_error(POOL_CONNECTION *cp, void *buf, int len,
					 const char *err_context)
{
	if (pool_read(cp, buf, len) >= 0)
		return;

	ereport(ERROR,
			(errmsg("failed to read data of length %d from DB node: %d", len, cp->db_node_id),
			 errdetail("error occurred when reading: %s", err_context ? err_context : "")));
}
146 /*
147 * read len bytes from cp
148 * returns 0 on success otherwise throws an ereport.
149 */
pool_read(POOL_CONNECTION * cp,void * buf,int len)150 int pool_read(POOL_CONNECTION *cp, void *buf, int len)
151 {
152 static char readbuf[READBUFSZ];
153
154 int consume_size;
155 int readlen;
156
157 consume_size = consume_pending_data(cp, buf, len);
158 len -= consume_size;
159 buf += consume_size;
160
161 while (len > 0)
162 {
163 if (pool_check_fd(cp))
164 {
165 if (!IS_MASTER_NODE_ID(cp->db_node_id) && (getpid() != mypid))
166 {
167 ereport(FATAL,
168 (errmsg("unable to read data from DB node %d",cp->db_node_id),
169 errdetail("data is not ready in DB node")));
170
171 }
172 else
173 {
174 ereport(ERROR,
175 (errmsg("unable to read data from DB node %d",cp->db_node_id),
176 errdetail("pool_check_fd call failed with an error \"%s\"", strerror(errno))));
177 }
178 }
179
180 if (cp->ssl_active > 0)
181 {
182 readlen = pool_ssl_read(cp, readbuf, READBUFSZ);
183 }
184 else
185 {
186 readlen = read(cp->fd, readbuf, READBUFSZ);
187 if (cp->isbackend)
188 {
189 ereport(DEBUG1,
190 (errmsg("pool_read: read %d bytes from backend %d",
191 readlen, cp->db_node_id)));
192 #ifdef DEBUG
193 dump_buffer(readbuf, readlen);
194 #endif
195 }
196 }
197
198 if (readlen == -1)
199 {
200 if (errno == EINTR || errno == EAGAIN)
201 {
202 ereport(DEBUG1,
203 (errmsg("read on socket failed with error :\"%s\"",strerror(errno)),
204 errdetail("retrying...")));
205 continue;
206 }
207
208 cp->socket_state = POOL_SOCKET_ERROR;
209 if (cp->isbackend)
210 {
211 if (cp->con_info && cp->con_info->swallow_termination == 1)
212 {
213 cp->con_info->swallow_termination = 0;
214 ereport(FATAL,
215 (errmsg("unable to read data from DB node %d",cp->db_node_id),
216 errdetail("pg_terminate_backend was called on the backend")));
217 }
218
219 /* if fail_over_on_backend_error is true, then trigger failover */
220 if (pool_config->fail_over_on_backend_error)
221 {
222 notice_backend_error(cp->db_node_id, true);
223
224 /* If we are in the main process, we will not exit */
225 child_exit(POOL_EXIT_AND_RESTART);
226 ereport(ERROR,
227 (errmsg("unable to read data from DB node %d",cp->db_node_id),
228 errdetail("socket read failed with an error \"%s\"", strerror(errno))));
229 }
230 else
231 {
232 ereport(ERROR,
233 (errmsg("unable to read data from DB node %d",cp->db_node_id),
234 errdetail("socket read failed with an error \"%s\"", strerror(errno))));
235 }
236 }
237 else
238 {
239 ereport(FRONTEND_ERROR,
240 (errmsg("unable to read data from frontend"),
241 errdetail("socket read failed with an error \"%s\"", strerror(errno))));
242 }
243 }
244 else if (readlen == 0)
245 {
246 cp->socket_state = POOL_SOCKET_EOF;
247 if (cp->isbackend)
248 {
249 if(processType == PT_MAIN)
250 ereport(ERROR,
251 (errmsg("unable to read data from DB node %d",cp->db_node_id),
252 errdetail("EOF encountered with backend")));
253
254 ereport(FATAL,
255 (errmsg("unable to read data from DB node %d",cp->db_node_id),
256 errdetail("EOF encountered with backend")));
257 }
258 else
259 {
260 /*
261 * if backend offers authentication method, frontend could close connection
262 */
263 ereport(FRONTEND_ERROR,
264 (errmsg("unable to read data from frontend"),
265 errdetail("EOF encountered with frontend")));
266 }
267 }
268
269 if (len < readlen)
270 {
271 /* overrun. we need to save remaining data to pending buffer */
272 save_pending_data(cp, readbuf+len, readlen-len);
273 memmove(buf, readbuf, len);
274 break;
275 }
276
277 memmove(buf, readbuf, readlen);
278 buf += readlen;
279 len -= readlen;
280 }
281
282 return 0;
283 }
284
285 /*
286 * read exactly len bytes from cp
287 * returns buffer address on success otherwise NULL.
288 */
pool_read2(POOL_CONNECTION * cp,int len)289 char *pool_read2(POOL_CONNECTION *cp, int len)
290 {
291 char *buf;
292 int req_size;
293 int alloc_size;
294 int consume_size;
295 int readlen;
296 MemoryContext oldContext = SwitchToConnectionContext(cp->isbackend);
297
298 req_size = cp->len + len;
299
300 if (req_size > cp->bufsz2)
301 {
302 alloc_size = ((req_size+1)/READBUFSZ+1)*READBUFSZ;
303 cp->buf2 = repalloc(cp->buf2, alloc_size);
304 cp->bufsz2 = alloc_size;
305 }
306
307 buf = cp->buf2;
308
309 consume_size = consume_pending_data(cp, buf, len);
310 len -= consume_size;
311 buf += consume_size;
312
313 while (len > 0)
314 {
315 if (pool_check_fd(cp))
316 {
317 if (!IS_MASTER_NODE_ID(cp->db_node_id))
318 {
319 ereport(FATAL,
320 (errmsg("unable to read data from DB node %d",cp->db_node_id),
321 errdetail("data is not ready in DB node")));
322
323 }
324 else
325 {
326 ereport(ERROR,
327 (errmsg("unable to read data from DB node %d",cp->db_node_id),
328 errdetail("pool_check_fd call failed with an error \"%s\"", strerror(errno))));
329 }
330 }
331
332 if (cp->ssl_active > 0)
333 {
334 readlen = pool_ssl_read(cp, buf, len);
335 }
336 else
337 {
338 readlen = read(cp->fd, buf, len);
339 if (cp->isbackend)
340 ereport(DEBUG1,
341 (errmsg("pool_read2: read %d bytes from backend %d",
342 readlen, cp->db_node_id)));
343 }
344
345 if (readlen == -1)
346 {
347 if (errno == EINTR || errno == EAGAIN)
348 {
349 ereport(DEBUG1,
350 (errmsg("read on socket failed with error :\"%s\"",strerror(errno)),
351 errdetail("retrying...")));
352 continue;
353 }
354
355 cp->socket_state = POOL_SOCKET_ERROR;
356 if (cp->isbackend)
357 {
358 if (cp->con_info && cp->con_info->swallow_termination == 1)
359 {
360 cp->con_info->swallow_termination = 0;
361 ereport(FATAL,
362 (errmsg("unable to read data from DB node %d",cp->db_node_id),
363 errdetail("pg_terminate_backend was called on the backend")));
364 }
365
366 /* if fail_over_on_backend_error is true, then trigger failover */
367 if (pool_config->fail_over_on_backend_error)
368 {
369 notice_backend_error(cp->db_node_id, true);
370 child_exit(POOL_EXIT_AND_RESTART);
371 /* we are in main process */
372 ereport(ERROR,
373 (errmsg("unable to read data from DB node %d",cp->db_node_id),
374 errdetail("socket read failed with an error \"%s\"", strerror(errno))));
375 }
376 else
377 {
378 ereport(ERROR,
379 (errmsg("unable to read data from DB node %d",cp->db_node_id),
380 errdetail("do not failover because fail_over_on_backend_error is off")));
381 }
382 }
383 else
384 {
385 ereport(ERROR,
386 (errmsg("unable to read data from frontend"),
387 errdetail("socket read function returned -1")));
388 }
389 }
390 else if (readlen == 0)
391 {
392 cp->socket_state = POOL_SOCKET_EOF;
393 if (cp->isbackend)
394 {
395 ereport(ERROR,
396 (errmsg("unable to read data from backend"),
397 errdetail("EOF read on socket")));
398 }
399 else
400 {
401 /*
402 * if backend offers authentication method, frontend could close connection
403 */
404 ereport(ERROR,
405 (errmsg("unable to read data from frontend"),
406 errdetail("EOF read on socket")));
407
408 }
409 }
410
411 buf += readlen;
412 len -= readlen;
413 }
414 MemoryContextSwitchTo(oldContext);
415 return cp->buf2;
416 }
417
418 /*
419 * write len bytes to cp the write buffer.
420 * returns 0 on success otherwise -1.
421 */
pool_write_noerror(POOL_CONNECTION * cp,void * buf,int len)422 int pool_write_noerror(POOL_CONNECTION *cp, void *buf, int len)
423 {
424 if (len < 0)
425 return -1;
426
427 if (cp->no_forward)
428 return 0;
429
430 if (len == 1 && cp->isbackend)
431 {
432 char c;
433
434 c = ((char *)buf)[0];
435
436 ereport(DEBUG1,
437 (errmsg("pool_write: to backend: %d kind:%c", cp->db_node_id, c)));
438 }
439
440 if (!cp->isbackend)
441 {
442 char c;
443
444 c = ((char *)buf)[0];
445
446 if (len == 1)
447 ereport(DEBUG1,
448 (errmsg("pool_write: to frontend: kind:%c po:%d", c, cp->wbufpo)));
449 else
450 ereport(DEBUG1,
451 (errmsg("pool_write: to frontend: length:%d po:%d", len, cp->wbufpo)));
452 }
453
454 while (len > 0)
455 {
456 int remainder = WRITEBUFSZ - cp->wbufpo;
457
458 /*
459 * If requested data cannot be added to the write buffer, flush the
460 * buffer and directly write the requested data. This could avoid
461 * unwanted write in the middle of message boundary.
462 */
463 if (remainder < len)
464 {
465 if (pool_flush_it(cp) == -1)
466 return -1;
467
468 if (pool_write_flush(cp, buf, len) < 0)
469 return -1;
470 return 0;
471 }
472
473 if (cp->wbufpo >= WRITEBUFSZ)
474 {
475 /*
476 * Write buffer is full. so flush buffer.
477 * wbufpo is reset in pool_flush_it().
478 */
479 if (pool_flush_it(cp) == -1)
480 return -1;
481 remainder = WRITEBUFSZ;
482 }
483
484 /* check buffer size */
485 remainder = Min(remainder, len);
486
487 memcpy(cp->wbuf+cp->wbufpo, buf, remainder);
488 cp->wbufpo += remainder;
489 buf += remainder;
490 len -= remainder;
491 }
492 return 0;
493 }
494
495 /*
496 * write len bytes to cp the write buffer.
497 * returns 0 on success otherwise ereport.
498 */
pool_write(POOL_CONNECTION * cp,void * buf,int len)499 int pool_write(POOL_CONNECTION *cp, void *buf, int len)
500 {
501 if (len < 0)
502 ereport(ERROR,
503 (errmsg("unable to write data to %s",cp->isbackend?"backend":"frontend"),
504 errdetail("invalid data size: %d", len)));
505
506 if(pool_write_noerror(cp,buf,len))
507 ereport(ERROR,
508 (errmsg("unable to write data to %s",cp->isbackend?"backend":"frontend"),
509 errdetail("pool_flush failed")));
510
511 return 0;
512 }
513
514
515 /*
516 * Direct write.
517 * This function does not throws an ereport in case of an error
518 */
pool_write_flush(POOL_CONNECTION * cp,void * buf,int len)519 static int pool_write_flush(POOL_CONNECTION *cp, void *buf, int len)
520 {
521 int sts;
522 int wlen;
523 int offset;
524 wlen = len;
525
526 ereport(DEBUG1,
527 (errmsg("pool_write_flush: write size: %d", wlen)));
528
529 if (wlen == 0)
530 {
531 return 0;
532 }
533
534 offset = 0;
535
536 for (;;)
537 {
538 errno = 0;
539
540 if (cp->ssl_active > 0)
541 {
542 sts = pool_ssl_write(cp, buf+offset, wlen);
543 }
544 else
545 {
546 sts = write(cp->fd, buf+offset, wlen);
547 }
548
549 if (sts >= 0)
550 {
551 wlen -= sts;
552
553 if (wlen == 0)
554 {
555 /* write completed */
556 break;
557 }
558
559 else if (wlen < 0)
560 {
561 ereport(WARNING,
562 (errmsg("pool_write_flush: invalid write size %d", sts)));
563 return -1;
564 }
565
566 else
567 {
568 /* need to write remaining data */
569 ereport(DEBUG1,
570 (errmsg("pool_write_flush: write retry: %d", wlen)));
571
572 offset += sts;
573 continue;
574 }
575 }
576
577 else if (errno == EAGAIN || errno == EINTR)
578 {
579 continue;
580 }
581
582 else
583 {
584 /* If this is the backend stream, report error. Otherwise
585 * just report debug message.
586 */
587 if (cp->isbackend)
588 ereport(WARNING,
589 (errmsg("write on backend %d failed with error :\"%s\"",cp->db_node_id,strerror(errno)),
590 errdetail("while trying to write data from offset: %d wlen: %d",offset, wlen)));
591 else
592 ereport(DEBUG1,
593 (errmsg("write on frontend failed with error :\"%s\"",strerror(errno)),
594 errdetail("while trying to write data from offset: %d wlen: %d",offset, wlen)));
595 return -1;
596 }
597 }
598
599 return 0;
600 }
601
602 /*
603 * flush write buffer
604 * This function does not throws an ereport in case of an error
605 */
pool_flush_it(POOL_CONNECTION * cp)606 int pool_flush_it(POOL_CONNECTION *cp)
607 {
608 int sts;
609 int wlen;
610 int offset;
611 wlen = cp->wbufpo;
612
613 ereport(DEBUG1,
614 (errmsg("pool_flush_it: flush size: %d", wlen)));
615
616 if (wlen == 0)
617 {
618 return 0;
619 }
620
621 offset = 0;
622
623 for (;;)
624 {
625 errno = 0;
626
627 if (cp->ssl_active > 0)
628 {
629 sts = pool_ssl_write(cp, cp->wbuf + offset, wlen);
630 }
631 else
632 {
633 sts = write(cp->fd, cp->wbuf + offset, wlen);
634 }
635
636 if (sts >= 0)
637 {
638 wlen -= sts;
639
640 if (wlen == 0)
641 {
642 /* write completed */
643 break;
644 }
645
646 else if (wlen < 0)
647 {
648 ereport(WARNING,
649 (errmsg("pool_flush_it: invalid write size %d", sts)));
650 cp->wbufpo = 0;
651 return -1;
652 }
653
654 else
655 {
656 /* need to write remaining data */
657 ereport(DEBUG1,
658 (errmsg("pool_flush_it: write retry: %d", wlen)));
659
660 offset += sts;
661 continue;
662 }
663 }
664
665 else if (errno == EAGAIN || errno == EINTR)
666 {
667 continue;
668 }
669
670 else
671 {
672 /* If this is the backend stream, report error. Otherwise
673 * just report debug message.
674 */
675 if (cp->isbackend)
676 ereport(WARNING,
677 (errmsg("write on backend %d failed with error :\"%s\"",cp->db_node_id,strerror(errno)),
678 errdetail("while trying to write data from offset: %d wlen: %d",offset, wlen)));
679 else
680 ereport(DEBUG1,
681 (errmsg("write on frontend failed with error :\"%s\"",strerror(errno)),
682 errdetail("while trying to write data from offset: %d wlen: %d",offset, wlen)));
683 cp->wbufpo = 0;
684 return -1;
685 }
686 }
687
688 cp->wbufpo = 0;
689
690 return 0;
691 }
692
693 /*
694 * flush write buffer and degenerate/failover if error occurs
695 */
pool_flush(POOL_CONNECTION * cp)696 int pool_flush(POOL_CONNECTION *cp)
697 {
698 if (pool_flush_it(cp) == -1)
699 {
700 if (cp->isbackend)
701 {
702 if (cp->con_info && cp->con_info->swallow_termination == 1)
703 {
704 cp->con_info->swallow_termination = 0;
705 ereport(FATAL,
706 (errmsg("unable to read data from DB node %d",cp->db_node_id),
707 errdetail("pg_terminate_backend was called on the backend")));
708 }
709
710 /* if fail_over_on_backend_error is true, then trigger failover */
711 if (pool_config->fail_over_on_backend_error)
712 {
713 notice_backend_error(cp->db_node_id, true);
714 ereport(LOG,
715 (errmsg("unable to flush data to backend"),
716 errdetail("do not failover because I am the main process")));
717
718 child_exit(POOL_EXIT_AND_RESTART);
719 return -1;
720 }
721 else
722 {
723 ereport(ERROR,
724 (errmsg("unable to flush data to backend"),
725 errdetail("do not failover because fail_over_on_backend_error is off")));
726 }
727 }
728 else
729 {
730 /*
731 * If we are in replication mode, we need to continue the
732 * processing with backends to keep consistency among
733 * backends, thus ignore error.
734 */
735 if (REPLICATION)
736 ereport(NOTICE,
737 (errmsg("unable to flush data to frontend"),
738 errdetail("pgpool is in replication mode, ignoring error to keep consistency among backends")));
739 else
740 ereport(FRONTEND_ERROR,
741 (errmsg("unable to flush data to frontend")));
742
743 }
744 }
745 return 0;
746 }
747
748 /*
749 * same as pool_flush() but returns -ve value instead of ereport in case of failure
750 */
pool_flush_noerror(POOL_CONNECTION * cp)751 int pool_flush_noerror(POOL_CONNECTION *cp)
752 {
753 if (pool_flush_it(cp) == -1)
754 {
755 if (cp->isbackend)
756 {
757 if (cp->con_info && cp->con_info->swallow_termination == 1)
758 {
759 cp->con_info->swallow_termination = 0;
760 ereport(FATAL,
761 (errmsg("unable to read data from DB node %d",cp->db_node_id),
762 errdetail("pg_terminate_backend was called on the backend")));
763 }
764
765 /* if fail_over_on_backend_erro is true, then trigger failover */
766 if (pool_config->fail_over_on_backend_error)
767 {
768 notice_backend_error(cp->db_node_id, true);
769 child_exit(POOL_EXIT_AND_RESTART);
770 ereport(LOG,
771 (errmsg("unable to flush data to backend"),
772 errdetail("do not failover because I am the main process")));
773 return -1;
774 }
775 else
776 {
777 ereport(LOG,
778 (errmsg("unable to flush data to backend"),
779 errdetail("do not failover because fail_over_on_backend_error is off")));
780 return -1;
781 }
782 }
783 else
784 {
785 /*
786 * If we are in replication mode, we need to continue the
787 * processing with backends to keep consistency among
788 * backends, thus ignore error.
789 */
790 if (REPLICATION)
791 return 0;
792 else
793 return -1;
794 }
795 }
796 return 0;
797 }
798
799 /*
800 * combo of pool_write and pool_flush
801 */
pool_write_and_flush(POOL_CONNECTION * cp,void * buf,int len)802 void pool_write_and_flush(POOL_CONNECTION *cp, void *buf, int len)
803 {
804 pool_write(cp, buf, len);
805 pool_flush(cp);
806 }
807
808 /*
809 * same as pool_write_and_flush() but does not throws ereport when error occures
810 */
pool_write_and_flush_noerror(POOL_CONNECTION * cp,void * buf,int len)811 int pool_write_and_flush_noerror(POOL_CONNECTION *cp, void *buf, int len)
812 {
813 int ret;
814 ret = pool_write_noerror(cp,buf,len);
815 if(ret == 0)
816 return pool_flush_noerror(cp);
817 return ret;
818 }
819
820 /*
821 * read a string until EOF or NULL is encountered.
822 * if line is not 0, read until new line is encountered.
823 */
pool_read_string(POOL_CONNECTION * cp,int * len,int line)824 char *pool_read_string(POOL_CONNECTION *cp, int *len, int line)
825 {
826 int readp;
827 int readsize;
828 int readlen;
829 int strlength;
830 int flag;
831 int consume_size;
832
833 #ifdef DEBUG
834 static char pbuf[READBUFSZ];
835 #endif
836
837 *len = 0;
838 readp = 0;
839
840 /* initialize read buffer */
841 if (cp->sbufsz == 0)
842 {
843 MemoryContext oldContext = SwitchToConnectionContext(cp->isbackend);
844 cp->sbuf = palloc(READBUFSZ);
845 MemoryContextSwitchTo(oldContext);
846
847 cp->sbufsz = READBUFSZ;
848 *cp->sbuf = '\0';
849 }
850
851 /* any pending data? */
852 if (cp->len)
853 {
854 if (line)
855 strlength = mystrlinelen(cp->hp+cp->po, cp->len, &flag);
856 else
857 strlength = mystrlen(cp->hp+cp->po, cp->len, &flag);
858
859 /* buffer is too small? */
860 if ((strlength + 1) > cp->sbufsz)
861 {
862 MemoryContext oldContext = SwitchToConnectionContext(cp->isbackend);
863 cp->sbufsz = ((strlength+1)/READBUFSZ+1)*READBUFSZ;
864 cp->sbuf = repalloc(cp->sbuf, cp->sbufsz);
865 MemoryContextSwitchTo(oldContext);
866 }
867
868 /* consume pending and save to read string buffer */
869 consume_size = consume_pending_data(cp, cp->sbuf, strlength);
870
871 *len = strlength;
872
873 /* is the string null terminated? */
874 if (consume_size == strlength && !flag)
875 {
876 /* not null or line terminated.
877 * we need to read more since we have not encountered NULL or new line yet
878 */
879 readsize = cp->sbufsz - strlength;
880 readp = strlength;
881 }
882 else
883 {
884 ereport(DEBUG1,
885 (errmsg("reading string data"),
886 errdetail("read all from pending data. po:%d len:%d",
887 cp->po, cp->len)));
888 return cp->sbuf;
889 }
890 } else
891 {
892 readsize = cp->sbufsz;
893 }
894
895 for (;;)
896 {
897 if (pool_check_fd(cp))
898 {
899 if (!IS_MASTER_NODE_ID(cp->db_node_id))
900 {
901 ereport(FATAL,
902 (errmsg("unable to read data from DB node %d",cp->db_node_id),
903 errdetail("data is not ready in DB node")));
904
905 }
906 else
907 {
908 ereport(ERROR,
909 (errmsg("unable to read data from DB node %d",cp->db_node_id),
910 errdetail("pool_check_fd call failed with an error \"%s\"", strerror(errno))));
911 }
912 }
913
914 if (cp->ssl_active > 0)
915 {
916 readlen = pool_ssl_read(cp, cp->sbuf+readp, readsize);
917 }
918 else
919 {
920 readlen = read(cp->fd, cp->sbuf+readp, readsize);
921 }
922
923 if (readlen == -1)
924 {
925 cp->socket_state = POOL_SOCKET_ERROR;
926 if (cp->isbackend)
927 {
928 if (cp->con_info && cp->con_info->swallow_termination == 1)
929 {
930 cp->con_info->swallow_termination = 0;
931 ereport(FATAL,
932 (errmsg("unable to read data from DB node %d",cp->db_node_id),
933 errdetail("pg_terminate_backend was called on the backend")));
934 }
935
936 notice_backend_error(cp->db_node_id, true);
937 child_exit(POOL_EXIT_AND_RESTART);
938 ereport(ERROR,
939 (errmsg("unable to read data from frontend"),
940 errdetail("socket read function returned -1")));
941 }
942 else
943 {
944 ereport(ERROR,
945 (errmsg("unable to read data from frontend"),
946 errdetail("socket read function returned -1")));
947
948 }
949 }
950 else if (readlen == 0) /* EOF detected */
951 {
952 /*
953 * just returns an error, not trigger failover or degeneration
954 */
955 cp->socket_state = POOL_SOCKET_EOF;
956 ereport(ERROR,
957 (errmsg("unable to read data from %s",cp->isbackend?"backend":"frontend"),
958 errdetail("EOF read on socket")));
959
960 }
961
962 /* check overrun */
963 if (line)
964 strlength = mystrlinelen(cp->sbuf+readp, readlen, &flag);
965 else
966 strlength = mystrlen(cp->sbuf+readp, readlen, &flag);
967
968 if (strlength < readlen)
969 {
970 save_pending_data(cp, cp->sbuf+readp+strlength, readlen-strlength);
971 *len += strlength;
972 ereport(DEBUG1,
973 (errmsg("reading string data"),
974 errdetail("total read %d with pending data po:%d len:%d", *len, cp->po, cp->len)));
975 return cp->sbuf;
976 }
977
978 *len += readlen;
979
980 /* encountered null or newline? */
981 if (flag)
982 {
983 /* ok we have read all data */
984 ereport(DEBUG1,
985 (errmsg("reading string data"),
986 errdetail("all data read: total read %d", *len)));
987 break;
988 }
989
990 readp += readlen;
991 readsize = READBUFSZ;
992
993 if ((*len+readsize) > cp->sbufsz)
994 {
995 cp->sbufsz += READBUFSZ;
996 MemoryContext oldContext = SwitchToConnectionContext(cp->isbackend);
997 cp->sbuf = repalloc(cp->sbuf, cp->sbufsz);
998 MemoryContextSwitchTo(oldContext);
999 }
1000 }
1001 return cp->sbuf;
1002 }
1003
1004 /*
1005 * Set db node id to connection.
1006 */
pool_set_db_node_id(POOL_CONNECTION * con,int db_node_id)1007 void pool_set_db_node_id(POOL_CONNECTION *con, int db_node_id)
1008 {
1009 if (!con)
1010 return;
1011 con->db_node_id = db_node_id;
1012 }
1013
/*
 * Return the byte length of str including its terminating \0, but never
 * more than upper.  *flag is set to 1 if a \0 was found within the first
 * upper bytes and to 0 otherwise.
 *
 * Examples:
 *	mystrlen("abc", 2) returns 2 (flag 0)
 *	mystrlen("abc", 3) returns 3 (flag 0)
 *	mystrlen("abc", 4) returns 4 (flag 1)
 *	mystrlen("abc", 5) returns 4 (flag 1)
 */
static int
mystrlen(char *str, int upper, int *flag)
{
	int			count = 0;

	*flag = 0;

	while (count < upper)
	{
		char		ch = str[count];

		/* the byte just examined is part of the result either way */
		count++;

		if (ch == '\0')
		{
			*flag = 1;
			break;
		}
	}

	return count;
}
1040
/*
 * Return the byte length of str up to and including the first \n or \0,
 * but never more than upper.  *flag is set to 1 if a \n or \0 was found
 * within the first upper bytes and to 0 otherwise.
 *
 * Examples:
 *	mystrlinelen("abc", 2) returns 2
 *	mystrlinelen("abc", 3) returns 3
 *	mystrlinelen("abc", 4) returns 4
 *	mystrlinelen("abc", 5) returns 4
 *	mystrlinelen("abcd\nefg", 4) returns 4
 *	mystrlinelen("abcd\nefg", 5) returns 5
 *	mystrlinelen("abcd\nefg", 6) returns 5
 */
static int
mystrlinelen(char *str, int upper, int *flag)
{
	int			count = 0;

	*flag = 0;

	while (count < upper)
	{
		char		ch = str[count];

		/* the byte just examined is part of the result either way */
		count++;

		if (ch == '\0' || ch == '\n')
		{
			*flag = 1;
			break;
		}
	}

	return count;
}
1070
1071 /*
1072 * save pending data
1073 */
save_pending_data(POOL_CONNECTION * cp,void * data,int len)1074 static int save_pending_data(POOL_CONNECTION *cp, void *data, int len)
1075 {
1076 int reqlen;
1077 size_t realloc_size;
1078 char *p;
1079
1080 /* to be safe */
1081 if (cp->len == 0)
1082 cp->po = 0;
1083
1084 reqlen = cp->po + cp->len + len;
1085
1086 /* pending buffer is enough? */
1087 if (reqlen > cp->bufsz)
1088 {
1089 /* too small, enlarge it */
1090 realloc_size = (reqlen/READBUFSZ+1)*READBUFSZ;
1091
1092 MemoryContext oldContext = SwitchToConnectionContext(cp->isbackend);
1093 p = repalloc(cp->hp, realloc_size);
1094 MemoryContextSwitchTo(oldContext);
1095
1096 cp->bufsz = realloc_size;
1097 cp->hp = p;
1098 }
1099
1100 memmove(cp->hp + cp->po + cp->len, data, len);
1101 cp->len += len;
1102
1103 return 0;
1104 }
1105
1106 /*
1107 * consume pending data. returns actually consumed data length.
1108 */
consume_pending_data(POOL_CONNECTION * cp,void * data,int len)1109 static int consume_pending_data(POOL_CONNECTION *cp, void *data, int len)
1110 {
1111 int consume_size;
1112
1113 if (cp->len <= 0)
1114 return 0;
1115
1116 consume_size = Min(len, cp->len);
1117 memmove(data, cp->hp + cp->po, consume_size);
1118 cp->len -= consume_size;
1119
1120 if (cp->len <= 0)
1121 cp->po = 0;
1122 else
1123 cp->po += consume_size;
1124
1125 return consume_size;
1126 }
1127
1128 /*
1129 * pool_unread: Put back data to input buffer
1130 */
pool_unread(POOL_CONNECTION * cp,void * data,int len)1131 int pool_unread(POOL_CONNECTION *cp, void *data, int len)
1132 {
1133 void *p = cp->hp;
1134 int n = cp->len + len;
1135 int realloc_size;
1136
1137 /*
1138 * Optimization to avoid mmove. If there's enough space in front of
1139 * existing data, we can use it.
1140 */
1141 if (cp->po >= len)
1142 {
1143 memmove(cp->hp + cp->po - len, data, len);
1144 cp->po -= len;
1145 cp->len = n;
1146 return 0;
1147 }
1148
1149 if (cp->bufsz < n)
1150 {
1151 realloc_size = (n/READBUFSZ+1)*READBUFSZ;
1152
1153 MemoryContext oldContext = SwitchToConnectionContext(cp->isbackend);
1154 p = repalloc(cp->hp, realloc_size);
1155 MemoryContextSwitchTo(oldContext);
1156
1157 cp->hp = p;
1158 cp->bufsz = realloc_size;
1159 }
1160 if (cp->len != 0)
1161 memmove(p + len, cp->hp + cp->po, cp->len);
1162 memmove(p, data, len);
1163 cp->len = n;
1164 cp->po = 0;
1165 return 0;
1166 }
1167
1168 /*
1169 * pool_push: Push data into buffer stack.
1170 */
pool_push(POOL_CONNECTION * cp,void * data,int len)1171 int pool_push(POOL_CONNECTION *cp, void *data, int len)
1172 {
1173 char *p;
1174
1175 ereport(DEBUG1,
1176 (errmsg("pushing data of len: %d", len)));
1177
1178
1179 MemoryContext oldContext = SwitchToConnectionContext(cp->isbackend);
1180
1181 if (cp->bufsz3 == 0)
1182 {
1183 p = cp->buf3 = palloc(len);
1184 }
1185 else
1186 {
1187 cp->buf3 = repalloc(cp->buf3, cp->bufsz3 + len);
1188 p = cp->buf3 + cp->bufsz3;
1189 }
1190
1191 memcpy(p, data, len);
1192 cp->bufsz3 += len;
1193
1194 MemoryContextSwitchTo(oldContext);
1195 return 0;
1196 }
1197
1198 /*
1199 * pool_pop: Pop data from buffer stack and put back data using
1200 * pool_unread.
1201 */
pool_pop(POOL_CONNECTION * cp,int * len)1202 void pool_pop(POOL_CONNECTION *cp, int *len)
1203 {
1204 if (cp->bufsz3 == 0)
1205 {
1206 *len = 0;
1207 ereport(DEBUG1,
1208 (errmsg("pop data of len: %d", *len)));
1209 return;
1210 }
1211
1212 pool_unread(cp, cp->buf3, cp->bufsz3);
1213 *len = cp->bufsz3;
1214 pfree(cp->buf3);
1215 cp->bufsz3 = 0;
1216 cp->buf3 = NULL;
1217 ereport(DEBUG1,
1218 (errmsg("pop data of len: %d", *len)));
1219 }
1220
1221 /*
1222 * pool_stacklen: Returns buffer stack length
1223 * pool_unread.
1224 */
pool_stacklen(POOL_CONNECTION * cp)1225 int pool_stacklen(POOL_CONNECTION *cp)
1226 {
1227 return cp->bufsz3;
1228 }
1229
1230 /*
1231 * set non-block flag
1232 */
pool_set_nonblock(int fd)1233 void pool_set_nonblock(int fd)
1234 {
1235 int var;
1236
1237 /* set fd to none blocking */
1238 var = fcntl(fd, F_GETFL, 0);
1239 if (var == -1)
1240 {
1241 ereport(FATAL,
1242 (errmsg("unable to set options on socket"),
1243 errdetail("fcntl system call failed with error \"%s\"", strerror(errno))));
1244
1245 }
1246 if (fcntl(fd, F_SETFL, var | O_NONBLOCK) == -1)
1247 {
1248 ereport(FATAL,
1249 (errmsg("unable to set options on socket"),
1250 errdetail("fcntl system call failed with error \"%s\"", strerror(errno))));
1251 }
1252 }
1253
1254 /*
1255 * unset non-block flag
1256 */
pool_unset_nonblock(int fd)1257 void pool_unset_nonblock(int fd)
1258 {
1259 int var;
1260
1261 /* set fd to none blocking */
1262 var = fcntl(fd, F_GETFL, 0);
1263 if (var == -1)
1264 {
1265 ereport(FATAL,
1266 (errmsg("unable to set options on socket"),
1267 errdetail("fcntl system call failed with error \"%s\"", strerror(errno))));
1268 }
1269 if (fcntl(fd, F_SETFL, var & ~O_NONBLOCK) == -1)
1270 {
1271 ereport(FATAL,
1272 (errmsg("unable to set options on socket"),
1273 errdetail("fcntl system call failed with error \"%s\"", strerror(errno))));
1274 }
1275 }
1276
#ifdef DEBUG
/*
 * Debug aid: log every one of the len bytes in buf as a two-digit hex
 * value at DEBUG1 level.  Nothing is logged when len is zero or negative.
 */
static void
dump_buffer(char *buf, int len)
{
	int			i;

	for (i = 0; i < len; i++)
	{
		/* go through unsigned char so bytes >= 0x80 are not sign-extended */
		ereport(DEBUG1,
				(errmsg("%02x", (unsigned char) buf[i])));
	}
}
#endif
socket_write(int fd,void * buf,size_t len)1290 int socket_write(int fd, void* buf, size_t len)
1291 {
1292 int bytes_send = 0;
1293 do
1294 {
1295 int ret;
1296 ret = write(fd, buf + bytes_send, (len - bytes_send));
1297 if (ret <=0)
1298 {
1299 if (errno == EINTR || errno == EAGAIN)
1300 {
1301 ereport(DEBUG1,
1302 (errmsg("write on socket failed with error :\"%s\"",strerror(errno)),
1303 errdetail("retrying...")));
1304 continue;
1305 }
1306 ereport(LOG,
1307 (errmsg("write on socket failed with error :\"%s\"",strerror(errno))));
1308 return -1;
1309 }
1310 bytes_send += ret;
1311 }while (bytes_send < len);
1312 return bytes_send;
1313 }
1314
socket_read(int fd,void * buf,size_t len,int timeout)1315 int socket_read(int fd, void* buf, size_t len, int timeout)
1316 {
1317 int ret, read_len;
1318 read_len = 0;
1319 struct timeval timeoutval;
1320 fd_set readmask;
1321 int fds;
1322
1323 while (read_len < len)
1324 {
1325 FD_ZERO(&readmask);
1326 FD_SET(fd, &readmask);
1327
1328 timeoutval.tv_sec = timeout;
1329 timeoutval.tv_usec = 0;
1330
1331 fds = select(fd+1, &readmask, NULL, NULL, timeout?&timeoutval:NULL);
1332 if (fds == -1)
1333 {
1334 if (errno == EAGAIN || errno == EINTR)
1335 continue;
1336
1337 ereport(WARNING,
1338 (errmsg("select failed with error: \"%s\"", strerror(errno))));
1339 return -1;
1340 }
1341 else if (fds == 0)
1342 {
1343 return -2;
1344 }
1345 ret = read(fd, buf + read_len, (len - read_len));
1346 if(ret < 0)
1347 {
1348 if (errno == EINTR || errno == EAGAIN)
1349 {
1350 ereport(DEBUG1,
1351 (errmsg("read from socket failed with error :\"%s\"",strerror(errno)),
1352 errdetail("retrying...")));
1353 continue;
1354 }
1355 ereport(LOG,
1356 (errmsg("read from socket failed with error :\"%s\"",strerror(errno))));
1357 return -1;
1358 }
1359 if(ret == 0)
1360 {
1361 ereport(LOG,
1362 (errmsg("read from socket failed, remote end closed the connection")));
1363 return 0;
1364 }
1365 read_len +=ret;
1366 }
1367 return read_len;
1368 }
1369