1 /* -*-pgsql-c-*- */
2 /*
3 * $Header$
4 *
5 * pgpool: a language independent connection pool server for PostgreSQL
6 * written by Tatsuo Ishii
7 *
8 * Copyright (c) 2003-2018 PgPool Global Development Group
9 *
10 * Permission to use, copy, modify, and distribute this software and
11 * its documentation for any purpose and without fee is hereby
12 * granted, provided that the above copyright notice appear in all
13 * copies and that both that copyright notice and this permission
14 * notice appear in supporting documentation, and that the name of the
15 * author not be used in advertising or publicity pertaining to
16 * distribution of the software without specific, written prior
17 * permission. The author makes no representations about the
18 * suitability of this software for any purpose. It is provided "as
19 * is" without express or implied warranty.
20 *
21 * pool_stream.c: stream I/O modules
22 *
23 */
24
25 #include "config.h"
26
27 #ifdef HAVE_SYS_SELECT_H
28 #include <sys/select.h>
29 #endif
30
31 #include <stdio.h>
32 #include <stdlib.h>
33 #include <string.h>
34 #include <errno.h>
35 #include <unistd.h>
36
37 #ifdef HAVE_FCNTL_H
38 #include <fcntl.h>
39 #endif
40
41 #include "pool.h"
42 #include "utils/elog.h"
43 #include "utils/palloc.h"
44 #include "utils/memutils.h"
45 #include "utils/pool_stream.h"
46 #include "pool_config.h"
47
48 static int mystrlen(char *str, int upper, int *flag);
49 static int mystrlinelen(char *str, int upper, int *flag);
50 static int save_pending_data(POOL_CONNECTION *cp, void *data, int len);
51 static int consume_pending_data(POOL_CONNECTION *cp, void *data, int len);
52 static MemoryContext SwitchToConnectionContext(bool backend_connection);
53 #ifdef DEBUG
54 static void dump_buffer(char *buf, int len);
55 #endif
56 static int pool_write_flush(POOL_CONNECTION *cp, void *buf, int len);
57
58 static MemoryContext
SwitchToConnectionContext(bool backend_connection)59 SwitchToConnectionContext(bool backend_connection)
60 {
61 /*
62 * backend connection can live as long as process life,
63 * while the frontend connection is only for one session life.
64 * So we create backend connection in long living memory context
65 */
66 if(backend_connection)
67 return MemoryContextSwitchTo(TopMemoryContext);
68 else
69 return MemoryContextSwitchTo(ProcessLoopContext);
70 }
71
72 /*
73 * open read/write file descriptors.
74 * returns POOL_CONNECTION on success otherwise NULL.
75 */
pool_open(int fd,bool backend_connection)76 POOL_CONNECTION *pool_open(int fd, bool backend_connection)
77 {
78 POOL_CONNECTION *cp;
79
80 MemoryContext oldContext = SwitchToConnectionContext(backend_connection);
81
82 cp = (POOL_CONNECTION *)palloc0(sizeof(POOL_CONNECTION));
83
84 /* initialize write buffer */
85 cp->wbuf = palloc(WRITEBUFSZ);
86 cp->wbufsz = WRITEBUFSZ;
87 cp->wbufpo = 0;
88
89 /* initialize pending data buffer */
90 cp->hp = palloc(READBUFSZ);
91 cp->bufsz = READBUFSZ;
92 cp->po = 0;
93 cp->len = 0;
94 cp->sbuf = NULL;
95 cp->sbufsz = 0;
96 cp->buf2 = NULL;
97 cp->bufsz2 = 0;
98 cp->buf3 = NULL;
99 cp->bufsz3 = 0;
100
101 cp->fd = fd;
102 cp->socket_state = POOL_SOCKET_VALID;
103
104 MemoryContextSwitchTo(oldContext);
105
106 return cp;
107 }
108
109 /*
110 * close read/write file descriptors.
111 */
pool_close(POOL_CONNECTION * cp)112 void pool_close(POOL_CONNECTION *cp)
113 {
114 /*
115 * shutdown connection to the client so that pgpool is not blocked
116 */
117 if (!cp->isbackend)
118 shutdown(cp->fd, 1);
119 close(cp->fd);
120 cp->socket_state = POOL_SOCKET_CLOSED;
121 pfree(cp->wbuf);
122 pfree(cp->hp);
123 if (cp->sbuf)
124 pfree(cp->sbuf);
125 if (cp->buf2)
126 pfree(cp->buf2);
127 if (cp->buf3)
128 pfree(cp->buf3);
129 pool_discard_params(&cp->params);
130
131 pool_ssl_close(cp);
132
133 pfree(cp);
134 }
135
/*
 * Read exactly len bytes from cp into buf via pool_read().  If pool_read()
 * reports failure, raise ERROR; err_context (may be NULL) describes what
 * was being read.
 */
void pool_read_with_error(POOL_CONNECTION *cp, void *buf, int len,
						  const char *err_context)
{
	const char *context_text = (err_context != NULL) ? err_context : "";

	if (pool_read(cp, buf, len) >= 0)
		return;

	ereport(ERROR,
			(errmsg("failed to read data of length %d from DB node: %d", len, cp->db_node_id),
			 errdetail("error occurred when reading: %s", context_text)));
}
146 /*
147 * read len bytes from cp
148 * returns 0 on success otherwise throws an ereport.
149 */
pool_read(POOL_CONNECTION * cp,void * buf,int len)150 int pool_read(POOL_CONNECTION *cp, void *buf, int len)
151 {
152 static char readbuf[READBUFSZ];
153
154 int consume_size;
155 int readlen;
156
157 consume_size = consume_pending_data(cp, buf, len);
158 len -= consume_size;
159 buf += consume_size;
160
161 while (len > 0)
162 {
163 if (pool_check_fd(cp))
164 {
165 if (!IS_MASTER_NODE_ID(cp->db_node_id) && (getpid() != mypid))
166 {
167 ereport(FATAL,
168 (errmsg("unable to read data from DB node %d",cp->db_node_id),
169 errdetail("data is not ready in DB node")));
170
171 }
172 else
173 {
174 ereport(ERROR,
175 (errmsg("unable to read data from DB node %d",cp->db_node_id),
176 errdetail("pool_check_fd call failed with an error \"%s\"", strerror(errno))));
177 }
178 }
179
180 if (cp->ssl_active > 0)
181 {
182 readlen = pool_ssl_read(cp, readbuf, READBUFSZ);
183 }
184 else
185 {
186 readlen = read(cp->fd, readbuf, READBUFSZ);
187 if (cp->isbackend)
188 {
189 ereport(DEBUG1,
190 (errmsg("pool_read: read %d bytes from backend %d",
191 readlen, cp->db_node_id)));
192 #ifdef DEBUG
193 dump_buffer(readbuf, readlen);
194 #endif
195 }
196 }
197
198 if (readlen == -1)
199 {
200 if (processType == PT_HEALTH_CHECK && health_check_timer_expired && errno == EINTR)
201 {
202 ereport(ERROR,
203 (errmsg("health check timed out while waiting for reading data")));
204 return -1;
205 }
206
207 if (errno == EINTR || errno == EAGAIN)
208 {
209 ereport(DEBUG1,
210 (errmsg("read on socket failed with error :\"%s\"",strerror(errno)),
211 errdetail("retrying...")));
212 continue;
213 }
214
215 cp->socket_state = POOL_SOCKET_ERROR;
216 if (cp->isbackend)
217 {
218 if (cp->con_info && cp->con_info->swallow_termination == 1)
219 {
220 cp->con_info->swallow_termination = 0;
221 ereport(FATAL,
222 (errmsg("unable to read data from DB node %d",cp->db_node_id),
223 errdetail("pg_terminate_backend was called on the backend")));
224 }
225
226 /* if fail_over_on_backend_error is true, then trigger failover */
227 if (pool_config->fail_over_on_backend_error)
228 {
229 notice_backend_error(cp->db_node_id, REQ_DETAIL_SWITCHOVER);
230
231 /* If we are in the main process, we will not exit */
232 child_exit(POOL_EXIT_AND_RESTART);
233 ereport(ERROR,
234 (errmsg("unable to read data from DB node %d",cp->db_node_id),
235 errdetail("socket read failed with an error \"%s\"", strerror(errno))));
236 }
237 else
238 {
239 ereport(ERROR,
240 (errmsg("unable to read data from DB node %d",cp->db_node_id),
241 errdetail("socket read failed with an error \"%s\"", strerror(errno))));
242 }
243 }
244 else
245 {
246 ereport(FRONTEND_ERROR,
247 (errmsg("unable to read data from frontend"),
248 errdetail("socket read failed with an error \"%s\"", strerror(errno))));
249 }
250 }
251 else if (readlen == 0)
252 {
253 cp->socket_state = POOL_SOCKET_EOF;
254 if (cp->isbackend)
255 {
256 if (processType == PT_MAIN || processType == PT_HEALTH_CHECK)
257 ereport(ERROR,
258 (errmsg("unable to read data from DB node %d",cp->db_node_id),
259 errdetail("EOF encountered with backend")));
260
261 ereport(FATAL,
262 (errmsg("unable to read data from DB node %d",cp->db_node_id),
263 errdetail("EOF encountered with backend")));
264 }
265 else
266 {
267 /*
268 * if backend offers authentication method, frontend could close connection
269 */
270 ereport(FRONTEND_ERROR,
271 (errmsg("unable to read data from frontend"),
272 errdetail("EOF encountered with frontend")));
273 }
274 }
275
276 if (len < readlen)
277 {
278 /* overrun. we need to save remaining data to pending buffer */
279 save_pending_data(cp, readbuf+len, readlen-len);
280 memmove(buf, readbuf, len);
281 break;
282 }
283
284 memmove(buf, readbuf, readlen);
285 buf += readlen;
286 len -= readlen;
287 }
288
289 return 0;
290 }
291
292 /*
293 * read exactly len bytes from cp
294 * returns buffer address on success otherwise NULL.
295 */
pool_read2(POOL_CONNECTION * cp,int len)296 char *pool_read2(POOL_CONNECTION *cp, int len)
297 {
298 char *buf;
299 int req_size;
300 int alloc_size;
301 int consume_size;
302 int readlen;
303 MemoryContext oldContext = SwitchToConnectionContext(cp->isbackend);
304
305 req_size = cp->len + len;
306
307 if (req_size > cp->bufsz2)
308 {
309 alloc_size = ((req_size+1)/READBUFSZ+1)*READBUFSZ;
310 cp->buf2 = repalloc(cp->buf2, alloc_size);
311 cp->bufsz2 = alloc_size;
312 }
313
314 buf = cp->buf2;
315
316 consume_size = consume_pending_data(cp, buf, len);
317 len -= consume_size;
318 buf += consume_size;
319
320 while (len > 0)
321 {
322 if (pool_check_fd(cp))
323 {
324 if (!IS_MASTER_NODE_ID(cp->db_node_id))
325 {
326 ereport(FATAL,
327 (errmsg("unable to read data from DB node %d",cp->db_node_id),
328 errdetail("data is not ready in DB node")));
329
330 }
331 else
332 {
333 ereport(ERROR,
334 (errmsg("unable to read data from DB node %d",cp->db_node_id),
335 errdetail("pool_check_fd call failed with an error \"%s\"", strerror(errno))));
336 }
337 }
338
339 if (cp->ssl_active > 0)
340 {
341 readlen = pool_ssl_read(cp, buf, len);
342 }
343 else
344 {
345 readlen = read(cp->fd, buf, len);
346 if (cp->isbackend)
347 ereport(DEBUG1,
348 (errmsg("pool_read2: read %d bytes from backend %d",
349 readlen, cp->db_node_id)));
350 }
351
352 if (readlen == -1)
353 {
354 if (errno == EINTR || errno == EAGAIN)
355 {
356 ereport(DEBUG1,
357 (errmsg("read on socket failed with error :\"%s\"",strerror(errno)),
358 errdetail("retrying...")));
359 continue;
360 }
361
362 cp->socket_state = POOL_SOCKET_ERROR;
363 if (cp->isbackend)
364 {
365 if (cp->con_info && cp->con_info->swallow_termination == 1)
366 {
367 cp->con_info->swallow_termination = 0;
368 ereport(FATAL,
369 (errmsg("unable to read data from DB node %d",cp->db_node_id),
370 errdetail("pg_terminate_backend was called on the backend")));
371 }
372
373 /* if fail_over_on_backend_error is true, then trigger failover */
374 if (pool_config->fail_over_on_backend_error)
375 {
376 notice_backend_error(cp->db_node_id, REQ_DETAIL_SWITCHOVER);
377 child_exit(POOL_EXIT_AND_RESTART);
378 /* we are in main process */
379 ereport(ERROR,
380 (errmsg("unable to read data from DB node %d",cp->db_node_id),
381 errdetail("socket read failed with an error \"%s\"", strerror(errno))));
382 }
383 else
384 {
385 ereport(ERROR,
386 (errmsg("unable to read data from DB node %d",cp->db_node_id),
387 errdetail("do not failover because fail_over_on_backend_error is off")));
388 }
389 }
390 else
391 {
392 ereport(ERROR,
393 (errmsg("unable to read data from frontend"),
394 errdetail("socket read function returned -1")));
395 }
396 }
397 else if (readlen == 0)
398 {
399 cp->socket_state = POOL_SOCKET_EOF;
400 if (cp->isbackend)
401 {
402 ereport(ERROR,
403 (errmsg("unable to read data from backend"),
404 errdetail("EOF read on socket")));
405 }
406 else
407 {
408 /*
409 * if backend offers authentication method, frontend could close connection
410 */
411 ereport(ERROR,
412 (errmsg("unable to read data from frontend"),
413 errdetail("EOF read on socket")));
414
415 }
416 }
417
418 buf += readlen;
419 len -= readlen;
420 }
421 MemoryContextSwitchTo(oldContext);
422 return cp->buf2;
423 }
424
425 /*
426 * write len bytes to cp the write buffer.
427 * returns 0 on success otherwise -1.
428 */
pool_write_noerror(POOL_CONNECTION * cp,void * buf,int len)429 int pool_write_noerror(POOL_CONNECTION *cp, void *buf, int len)
430 {
431 if (len < 0)
432 return -1;
433
434 if (cp->no_forward)
435 return 0;
436
437 if (len == 1 && cp->isbackend)
438 {
439 char c;
440
441 c = ((char *)buf)[0];
442
443 ereport(DEBUG1,
444 (errmsg("pool_write: to backend: %d kind:%c", cp->db_node_id, c)));
445 }
446
447 if (!cp->isbackend)
448 {
449 char c;
450
451 c = ((char *)buf)[0];
452
453 if (len == 1)
454 ereport(DEBUG1,
455 (errmsg("pool_write: to frontend: kind:%c po:%d", c, cp->wbufpo)));
456 else
457 ereport(DEBUG1,
458 (errmsg("pool_write: to frontend: length:%d po:%d", len, cp->wbufpo)));
459 }
460
461 while (len > 0)
462 {
463 int remainder = WRITEBUFSZ - cp->wbufpo;
464
465 /*
466 * If requested data cannot be added to the write buffer, flush the
467 * buffer and directly write the requested data. This could avoid
468 * unwanted write in the middle of message boundary.
469 */
470 if (remainder < len)
471 {
472 if (pool_flush_it(cp) == -1)
473 return -1;
474
475 if (pool_write_flush(cp, buf, len) < 0)
476 return -1;
477 return 0;
478 }
479
480 if (cp->wbufpo >= WRITEBUFSZ)
481 {
482 /*
483 * Write buffer is full. so flush buffer.
484 * wbufpo is reset in pool_flush_it().
485 */
486 if (pool_flush_it(cp) == -1)
487 return -1;
488 remainder = WRITEBUFSZ;
489 }
490
491 /* check buffer size */
492 remainder = Min(remainder, len);
493
494 memcpy(cp->wbuf+cp->wbufpo, buf, remainder);
495 cp->wbufpo += remainder;
496 buf += remainder;
497 len -= remainder;
498 }
499 return 0;
500 }
501
502 /*
503 * write len bytes to cp the write buffer.
504 * returns 0 on success otherwise ereport.
505 */
pool_write(POOL_CONNECTION * cp,void * buf,int len)506 int pool_write(POOL_CONNECTION *cp, void *buf, int len)
507 {
508 if (len < 0)
509 ereport(ERROR,
510 (errmsg("unable to write data to %s",cp->isbackend?"backend":"frontend"),
511 errdetail("invalid data size: %d", len)));
512
513 if(pool_write_noerror(cp,buf,len))
514 ereport(ERROR,
515 (errmsg("unable to write data to %s",cp->isbackend?"backend":"frontend"),
516 errdetail("pool_flush failed")));
517
518 return 0;
519 }
520
521
522 /*
523 * Direct write.
524 * This function does not throws an ereport in case of an error
525 */
pool_write_flush(POOL_CONNECTION * cp,void * buf,int len)526 static int pool_write_flush(POOL_CONNECTION *cp, void *buf, int len)
527 {
528 int sts;
529 int wlen;
530 int offset;
531 wlen = len;
532
533 ereport(DEBUG1,
534 (errmsg("pool_write_flush: write size: %d", wlen)));
535
536 if (wlen == 0)
537 {
538 return 0;
539 }
540
541 offset = 0;
542
543 for (;;)
544 {
545 errno = 0;
546
547 if (cp->ssl_active > 0)
548 {
549 sts = pool_ssl_write(cp, buf+offset, wlen);
550 }
551 else
552 {
553 sts = write(cp->fd, buf+offset, wlen);
554 }
555
556 if (sts >= 0)
557 {
558 wlen -= sts;
559
560 if (wlen == 0)
561 {
562 /* write completed */
563 break;
564 }
565
566 else if (wlen < 0)
567 {
568 ereport(WARNING,
569 (errmsg("pool_write_flush: invalid write size %d", sts)));
570 return -1;
571 }
572
573 else
574 {
575 /* need to write remaining data */
576 ereport(DEBUG1,
577 (errmsg("pool_write_flush: write retry: %d", wlen)));
578
579 offset += sts;
580 continue;
581 }
582 }
583
584 else if (errno == EAGAIN || errno == EINTR)
585 {
586 continue;
587 }
588
589 else
590 {
591 /* If this is the backend stream, report error. Otherwise
592 * just report debug message.
593 */
594 if (cp->isbackend)
595 ereport(WARNING,
596 (errmsg("write on backend %d failed with error :\"%s\"",cp->db_node_id,strerror(errno)),
597 errdetail("while trying to write data from offset: %d wlen: %d",offset, wlen)));
598 else
599 ereport(DEBUG1,
600 (errmsg("write on frontend failed with error :\"%s\"",strerror(errno)),
601 errdetail("while trying to write data from offset: %d wlen: %d",offset, wlen)));
602 return -1;
603 }
604 }
605
606 return 0;
607 }
608
609 /*
610 * flush write buffer
611 * This function does not throws an ereport in case of an error
612 */
pool_flush_it(POOL_CONNECTION * cp)613 int pool_flush_it(POOL_CONNECTION *cp)
614 {
615 int sts;
616 int wlen;
617 int offset;
618 wlen = cp->wbufpo;
619
620 ereport(DEBUG1,
621 (errmsg("pool_flush_it: flush size: %d", wlen)));
622
623 if (wlen == 0)
624 {
625 return 0;
626 }
627
628 offset = 0;
629
630 for (;;)
631 {
632 errno = 0;
633
634 if (cp->ssl_active > 0)
635 {
636 sts = pool_ssl_write(cp, cp->wbuf + offset, wlen);
637 }
638 else
639 {
640 sts = write(cp->fd, cp->wbuf + offset, wlen);
641 }
642
643 if (sts >= 0)
644 {
645 wlen -= sts;
646
647 if (wlen == 0)
648 {
649 /* write completed */
650 break;
651 }
652
653 else if (wlen < 0)
654 {
655 ereport(WARNING,
656 (errmsg("pool_flush_it: invalid write size %d", sts)));
657 cp->wbufpo = 0;
658 return -1;
659 }
660
661 else
662 {
663 /* need to write remaining data */
664 ereport(DEBUG1,
665 (errmsg("pool_flush_it: write retry: %d", wlen)));
666
667 offset += sts;
668 continue;
669 }
670 }
671
672 else if (errno == EAGAIN || errno == EINTR)
673 {
674 continue;
675 }
676
677 else
678 {
679 /* If this is the backend stream, report error. Otherwise
680 * just report debug message.
681 */
682 if (cp->isbackend)
683 ereport(WARNING,
684 (errmsg("write on backend %d failed with error :\"%s\"",cp->db_node_id,strerror(errno)),
685 errdetail("while trying to write data from offset: %d wlen: %d",offset, wlen)));
686 else
687 ereport(DEBUG1,
688 (errmsg("write on frontend failed with error :\"%s\"",strerror(errno)),
689 errdetail("while trying to write data from offset: %d wlen: %d",offset, wlen)));
690 cp->wbufpo = 0;
691 return -1;
692 }
693 }
694
695 cp->wbufpo = 0;
696
697 return 0;
698 }
699
700 /*
701 * flush write buffer and degenerate/failover if error occurs
702 */
pool_flush(POOL_CONNECTION * cp)703 int pool_flush(POOL_CONNECTION *cp)
704 {
705 if (pool_flush_it(cp) == -1)
706 {
707 if (cp->isbackend)
708 {
709 if (cp->con_info && cp->con_info->swallow_termination == 1)
710 {
711 cp->con_info->swallow_termination = 0;
712 ereport(FATAL,
713 (errmsg("unable to read data from DB node %d",cp->db_node_id),
714 errdetail("pg_terminate_backend was called on the backend")));
715 }
716
717 /* if fail_over_on_backend_error is true, then trigger failover */
718 if (pool_config->fail_over_on_backend_error)
719 {
720 notice_backend_error(cp->db_node_id, REQ_DETAIL_SWITCHOVER);
721 ereport(LOG,
722 (errmsg("unable to flush data to backend"),
723 errdetail("do not failover because I am the main process")));
724
725 child_exit(POOL_EXIT_AND_RESTART);
726 return -1;
727 }
728 else
729 {
730 ereport(ERROR,
731 (errmsg("unable to flush data to backend"),
732 errdetail("do not failover because fail_over_on_backend_error is off")));
733 }
734 }
735 else
736 {
737 /*
738 * If we are in replication mode, we need to continue the
739 * processing with backends to keep consistency among
740 * backends, thus ignore error.
741 */
742 if (REPLICATION)
743 ereport(NOTICE,
744 (errmsg("unable to flush data to frontend"),
745 errdetail("pgpool is in replication mode, ignoring error to keep consistency among backends")));
746 else
747 ereport(FRONTEND_ERROR,
748 (errmsg("unable to flush data to frontend")));
749
750 }
751 }
752 return 0;
753 }
754
755 /*
756 * same as pool_flush() but returns -ve value instead of ereport in case of failure
757 */
pool_flush_noerror(POOL_CONNECTION * cp)758 int pool_flush_noerror(POOL_CONNECTION *cp)
759 {
760 if (pool_flush_it(cp) == -1)
761 {
762 if (cp->isbackend)
763 {
764 if (cp->con_info && cp->con_info->swallow_termination == 1)
765 {
766 cp->con_info->swallow_termination = 0;
767 ereport(FATAL,
768 (errmsg("unable to read data from DB node %d",cp->db_node_id),
769 errdetail("pg_terminate_backend was called on the backend")));
770 }
771
772 /* if fail_over_on_backend_erro is true, then trigger failover */
773 if (pool_config->fail_over_on_backend_error)
774 {
775 notice_backend_error(cp->db_node_id, REQ_DETAIL_SWITCHOVER);
776 child_exit(POOL_EXIT_AND_RESTART);
777 ereport(LOG,
778 (errmsg("unable to flush data to backend"),
779 errdetail("do not failover because I am the main process")));
780 return -1;
781 }
782 else
783 {
784 ereport(LOG,
785 (errmsg("unable to flush data to backend"),
786 errdetail("do not failover because fail_over_on_backend_error is off")));
787 return -1;
788 }
789 }
790 else
791 {
792 /*
793 * If we are in replication mode, we need to continue the
794 * processing with backends to keep consistency among
795 * backends, thus ignore error.
796 */
797 if (REPLICATION)
798 return 0;
799 else
800 return -1;
801 }
802 }
803 return 0;
804 }
805
806 /*
807 * combo of pool_write and pool_flush
808 */
pool_write_and_flush(POOL_CONNECTION * cp,void * buf,int len)809 void pool_write_and_flush(POOL_CONNECTION *cp, void *buf, int len)
810 {
811 pool_write(cp, buf, len);
812 pool_flush(cp);
813 }
814
815 /*
816 * same as pool_write_and_flush() but does not throws ereport when error occures
817 */
pool_write_and_flush_noerror(POOL_CONNECTION * cp,void * buf,int len)818 int pool_write_and_flush_noerror(POOL_CONNECTION *cp, void *buf, int len)
819 {
820 int ret;
821 ret = pool_write_noerror(cp,buf,len);
822 if(ret == 0)
823 return pool_flush_noerror(cp);
824 return ret;
825 }
826
827 /*
828 * read a string until EOF or NULL is encountered.
829 * if line is not 0, read until new line is encountered.
830 */
pool_read_string(POOL_CONNECTION * cp,int * len,int line)831 char *pool_read_string(POOL_CONNECTION *cp, int *len, int line)
832 {
833 int readp;
834 int readsize;
835 int readlen;
836 int strlength;
837 int flag;
838 int consume_size;
839
840 #ifdef DEBUG
841 static char pbuf[READBUFSZ];
842 #endif
843
844 *len = 0;
845 readp = 0;
846
847 /* initialize read buffer */
848 if (cp->sbufsz == 0)
849 {
850 MemoryContext oldContext = SwitchToConnectionContext(cp->isbackend);
851 cp->sbuf = palloc(READBUFSZ);
852 MemoryContextSwitchTo(oldContext);
853
854 cp->sbufsz = READBUFSZ;
855 *cp->sbuf = '\0';
856 }
857
858 /* any pending data? */
859 if (cp->len)
860 {
861 if (line)
862 strlength = mystrlinelen(cp->hp+cp->po, cp->len, &flag);
863 else
864 strlength = mystrlen(cp->hp+cp->po, cp->len, &flag);
865
866 /* buffer is too small? */
867 if ((strlength + 1) > cp->sbufsz)
868 {
869 MemoryContext oldContext = SwitchToConnectionContext(cp->isbackend);
870 cp->sbufsz = ((strlength+1)/READBUFSZ+1)*READBUFSZ;
871 cp->sbuf = repalloc(cp->sbuf, cp->sbufsz);
872 MemoryContextSwitchTo(oldContext);
873 }
874
875 /* consume pending and save to read string buffer */
876 consume_size = consume_pending_data(cp, cp->sbuf, strlength);
877
878 *len = strlength;
879
880 /* is the string null terminated? */
881 if (consume_size == strlength && !flag)
882 {
883 /* not null or line terminated.
884 * we need to read more since we have not encountered NULL or new line yet
885 */
886 readsize = cp->sbufsz - strlength;
887 readp = strlength;
888 }
889 else
890 {
891 ereport(DEBUG1,
892 (errmsg("reading string data"),
893 errdetail("read all from pending data. po:%d len:%d",
894 cp->po, cp->len)));
895 return cp->sbuf;
896 }
897 } else
898 {
899 readsize = cp->sbufsz;
900 }
901
902 for (;;)
903 {
904 if (pool_check_fd(cp))
905 {
906 if (!IS_MASTER_NODE_ID(cp->db_node_id))
907 {
908 ereport(FATAL,
909 (errmsg("unable to read data from DB node %d",cp->db_node_id),
910 errdetail("data is not ready in DB node")));
911
912 }
913 else
914 {
915 ereport(ERROR,
916 (errmsg("unable to read data from DB node %d",cp->db_node_id),
917 errdetail("pool_check_fd call failed with an error \"%s\"", strerror(errno))));
918 }
919 }
920
921 if (cp->ssl_active > 0)
922 {
923 readlen = pool_ssl_read(cp, cp->sbuf+readp, readsize);
924 }
925 else
926 {
927 readlen = read(cp->fd, cp->sbuf+readp, readsize);
928 }
929
930 if (readlen == -1)
931 {
932 cp->socket_state = POOL_SOCKET_ERROR;
933 if (cp->isbackend)
934 {
935 if (cp->con_info && cp->con_info->swallow_termination == 1)
936 {
937 cp->con_info->swallow_termination = 0;
938 ereport(FATAL,
939 (errmsg("unable to read data from DB node %d",cp->db_node_id),
940 errdetail("pg_terminate_backend was called on the backend")));
941 }
942
943 notice_backend_error(cp->db_node_id, REQ_DETAIL_SWITCHOVER);
944 child_exit(POOL_EXIT_AND_RESTART);
945 ereport(ERROR,
946 (errmsg("unable to read data from frontend"),
947 errdetail("socket read function returned -1")));
948 }
949 else
950 {
951 ereport(ERROR,
952 (errmsg("unable to read data from frontend"),
953 errdetail("socket read function returned -1")));
954
955 }
956 }
957 else if (readlen == 0) /* EOF detected */
958 {
959 /*
960 * just returns an error, not trigger failover or degeneration
961 */
962 cp->socket_state = POOL_SOCKET_EOF;
963 ereport(ERROR,
964 (errmsg("unable to read data from %s",cp->isbackend?"backend":"frontend"),
965 errdetail("EOF read on socket")));
966
967 }
968
969 /* check overrun */
970 if (line)
971 strlength = mystrlinelen(cp->sbuf+readp, readlen, &flag);
972 else
973 strlength = mystrlen(cp->sbuf+readp, readlen, &flag);
974
975 if (strlength < readlen)
976 {
977 save_pending_data(cp, cp->sbuf+readp+strlength, readlen-strlength);
978 *len += strlength;
979 ereport(DEBUG1,
980 (errmsg("reading string data"),
981 errdetail("total read %d with pending data po:%d len:%d", *len, cp->po, cp->len)));
982 return cp->sbuf;
983 }
984
985 *len += readlen;
986
987 /* encountered null or newline? */
988 if (flag)
989 {
990 /* ok we have read all data */
991 ereport(DEBUG1,
992 (errmsg("reading string data"),
993 errdetail("all data read: total read %d", *len)));
994 break;
995 }
996
997 readp += readlen;
998 readsize = READBUFSZ;
999
1000 if ((*len+readsize) > cp->sbufsz)
1001 {
1002 cp->sbufsz += READBUFSZ;
1003 MemoryContext oldContext = SwitchToConnectionContext(cp->isbackend);
1004 cp->sbuf = repalloc(cp->sbuf, cp->sbufsz);
1005 MemoryContextSwitchTo(oldContext);
1006 }
1007 }
1008 return cp->sbuf;
1009 }
1010
1011 /*
1012 * Set db node id to connection.
1013 */
pool_set_db_node_id(POOL_CONNECTION * con,int db_node_id)1014 void pool_set_db_node_id(POOL_CONNECTION *con, int db_node_id)
1015 {
1016 if (!con)
1017 return;
1018 con->db_node_id = db_node_id;
1019 }
1020
/*
 * Scan at most upper bytes of str for a NUL byte.
 *
 * Returns the length up to and including the NUL if one is found within
 * upper bytes (and sets *flag to 1); otherwise returns upper (or 0 when
 * upper <= 0) and sets *flag to 0.
 *
 * examples:
 *	 mystrlen("abc", 2) returns 2
 *	 mystrlen("abc", 3) returns 3
 *	 mystrlen("abc", 4) returns 4
 *	 mystrlen("abc", 5) returns 4
 */
static int mystrlen(char *str, int upper, int *flag)
{
	int			i = 0;

	*flag = 0;
	while (i < upper)
	{
		if (str[i] == '\0')
		{
			*flag = 1;
			return i + 1;
		}
		i++;
	}
	return i;
}
1047
/*
 * Scan at most upper bytes of str for a newline or NUL byte.
 *
 * Returns the length up to and including the first '\n' or '\0' found
 * within upper bytes (and sets *flag to 1); otherwise returns upper (or 0
 * when upper <= 0) and sets *flag to 0.
 *
 * examples:
 *	 mystrlinelen("abc", 2) returns 2
 *	 mystrlinelen("abc", 3) returns 3
 *	 mystrlinelen("abc", 4) returns 4
 *	 mystrlinelen("abc", 5) returns 4
 *	 mystrlinelen("abcd\nefg", 4) returns 4
 *	 mystrlinelen("abcd\nefg", 5) returns 5
 *	 mystrlinelen("abcd\nefg", 6) returns 5
 */
static int mystrlinelen(char *str, int upper, int *flag)
{
	int			scanned = 0;

	*flag = 0;
	while (scanned < upper)
	{
		char		ch = str[scanned++];

		if (ch == '\n' || ch == '\0')
		{
			*flag = 1;
			break;
		}
	}
	return scanned;
}
1077
1078 /*
1079 * save pending data
1080 */
save_pending_data(POOL_CONNECTION * cp,void * data,int len)1081 static int save_pending_data(POOL_CONNECTION *cp, void *data, int len)
1082 {
1083 int reqlen;
1084 size_t realloc_size;
1085 char *p;
1086
1087 /* to be safe */
1088 if (cp->len == 0)
1089 cp->po = 0;
1090
1091 reqlen = cp->po + cp->len + len;
1092
1093 /* pending buffer is enough? */
1094 if (reqlen > cp->bufsz)
1095 {
1096 /* too small, enlarge it */
1097 realloc_size = (reqlen/READBUFSZ+1)*READBUFSZ;
1098
1099 MemoryContext oldContext = SwitchToConnectionContext(cp->isbackend);
1100 p = repalloc(cp->hp, realloc_size);
1101 MemoryContextSwitchTo(oldContext);
1102
1103 cp->bufsz = realloc_size;
1104 cp->hp = p;
1105 }
1106
1107 memmove(cp->hp + cp->po + cp->len, data, len);
1108 cp->len += len;
1109
1110 return 0;
1111 }
1112
1113 /*
1114 * consume pending data. returns actually consumed data length.
1115 */
consume_pending_data(POOL_CONNECTION * cp,void * data,int len)1116 static int consume_pending_data(POOL_CONNECTION *cp, void *data, int len)
1117 {
1118 int consume_size;
1119
1120 if (cp->len <= 0)
1121 return 0;
1122
1123 consume_size = Min(len, cp->len);
1124 memmove(data, cp->hp + cp->po, consume_size);
1125 cp->len -= consume_size;
1126
1127 if (cp->len <= 0)
1128 cp->po = 0;
1129 else
1130 cp->po += consume_size;
1131
1132 return consume_size;
1133 }
1134
1135 /*
1136 * pool_unread: Put back data to input buffer
1137 */
pool_unread(POOL_CONNECTION * cp,void * data,int len)1138 int pool_unread(POOL_CONNECTION *cp, void *data, int len)
1139 {
1140 void *p = cp->hp;
1141 int n = cp->len + len;
1142 int realloc_size;
1143
1144 /*
1145 * Optimization to avoid mmove. If there's enough space in front of
1146 * existing data, we can use it.
1147 */
1148 if (cp->po >= len)
1149 {
1150 memmove(cp->hp + cp->po - len, data, len);
1151 cp->po -= len;
1152 cp->len = n;
1153 return 0;
1154 }
1155
1156 if (cp->bufsz < n)
1157 {
1158 realloc_size = (n/READBUFSZ+1)*READBUFSZ;
1159
1160 MemoryContext oldContext = SwitchToConnectionContext(cp->isbackend);
1161 p = repalloc(cp->hp, realloc_size);
1162 MemoryContextSwitchTo(oldContext);
1163
1164 cp->hp = p;
1165 cp->bufsz = realloc_size;
1166 }
1167 if (cp->len != 0)
1168 memmove(p + len, cp->hp + cp->po, cp->len);
1169 memmove(p, data, len);
1170 cp->len = n;
1171 cp->po = 0;
1172 return 0;
1173 }
1174
1175 /*
1176 * pool_push: Push data into buffer stack.
1177 */
pool_push(POOL_CONNECTION * cp,void * data,int len)1178 int pool_push(POOL_CONNECTION *cp, void *data, int len)
1179 {
1180 char *p;
1181
1182 ereport(DEBUG1,
1183 (errmsg("pushing data of len: %d", len)));
1184
1185
1186 MemoryContext oldContext = SwitchToConnectionContext(cp->isbackend);
1187
1188 if (cp->bufsz3 == 0)
1189 {
1190 p = cp->buf3 = palloc(len);
1191 }
1192 else
1193 {
1194 cp->buf3 = repalloc(cp->buf3, cp->bufsz3 + len);
1195 p = cp->buf3 + cp->bufsz3;
1196 }
1197
1198 memcpy(p, data, len);
1199 cp->bufsz3 += len;
1200
1201 MemoryContextSwitchTo(oldContext);
1202 return 0;
1203 }
1204
1205 /*
1206 * pool_pop: Pop data from buffer stack and put back data using
1207 * pool_unread.
1208 */
pool_pop(POOL_CONNECTION * cp,int * len)1209 void pool_pop(POOL_CONNECTION *cp, int *len)
1210 {
1211 if (cp->bufsz3 == 0)
1212 {
1213 *len = 0;
1214 ereport(DEBUG1,
1215 (errmsg("pop data of len: %d", *len)));
1216 return;
1217 }
1218
1219 pool_unread(cp, cp->buf3, cp->bufsz3);
1220 *len = cp->bufsz3;
1221 pfree(cp->buf3);
1222 cp->bufsz3 = 0;
1223 cp->buf3 = NULL;
1224 ereport(DEBUG1,
1225 (errmsg("pop data of len: %d", *len)));
1226 }
1227
1228 /*
1229 * pool_stacklen: Returns buffer stack length
1230 * pool_unread.
1231 */
pool_stacklen(POOL_CONNECTION * cp)1232 int pool_stacklen(POOL_CONNECTION *cp)
1233 {
1234 return cp->bufsz3;
1235 }
1236
1237 /*
1238 * set non-block flag
1239 */
pool_set_nonblock(int fd)1240 void pool_set_nonblock(int fd)
1241 {
1242 int var;
1243
1244 /* set fd to none blocking */
1245 var = fcntl(fd, F_GETFL, 0);
1246 if (var == -1)
1247 {
1248 ereport(FATAL,
1249 (errmsg("unable to set options on socket"),
1250 errdetail("fcntl system call failed with error \"%s\"", strerror(errno))));
1251
1252 }
1253 if (fcntl(fd, F_SETFL, var | O_NONBLOCK) == -1)
1254 {
1255 ereport(FATAL,
1256 (errmsg("unable to set options on socket"),
1257 errdetail("fcntl system call failed with error \"%s\"", strerror(errno))));
1258 }
1259 }
1260
1261 /*
1262 * unset non-block flag
1263 */
pool_unset_nonblock(int fd)1264 void pool_unset_nonblock(int fd)
1265 {
1266 int var;
1267
1268 /* set fd to none blocking */
1269 var = fcntl(fd, F_GETFL, 0);
1270 if (var == -1)
1271 {
1272 ereport(FATAL,
1273 (errmsg("unable to set options on socket"),
1274 errdetail("fcntl system call failed with error \"%s\"", strerror(errno))));
1275 }
1276 if (fcntl(fd, F_SETFL, var & ~O_NONBLOCK) == -1)
1277 {
1278 ereport(FATAL,
1279 (errmsg("unable to set options on socket"),
1280 errdetail("fcntl system call failed with error \"%s\"", strerror(errno))));
1281 }
1282 }
1283
#ifdef DEBUG
/*
 * Debug aid: log each of the len bytes of buf as a two-digit hex value at
 * DEBUG1.  Does nothing when len <= 0.
 *
 * The previous "while (--len)" loop skipped the last byte and, for len <= 0,
 * effectively never terminated.  Bytes are cast through unsigned char so
 * that values >= 0x80 print as two digits rather than sign-extended.
 */
static void dump_buffer(char *buf, int len)
{
	int			i;

	for (i = 0; i < len; i++)
	{
		ereport(DEBUG1,
				(errmsg("%02x", (unsigned int) (unsigned char) buf[i])));
	}
}
#endif
socket_write(int fd,void * buf,size_t len)1297 int socket_write(int fd, void* buf, size_t len)
1298 {
1299 int bytes_send = 0;
1300 do
1301 {
1302 int ret;
1303 ret = write(fd, buf + bytes_send, (len - bytes_send));
1304 if (ret <=0)
1305 {
1306 if (errno == EINTR || errno == EAGAIN)
1307 {
1308 ereport(DEBUG1,
1309 (errmsg("write on socket failed with error :\"%s\"",strerror(errno)),
1310 errdetail("retrying...")));
1311 continue;
1312 }
1313 ereport(LOG,
1314 (errmsg("write on socket failed with error :\"%s\"",strerror(errno))));
1315 return -1;
1316 }
1317 bytes_send += ret;
1318 }while (bytes_send < len);
1319 return bytes_send;
1320 }
1321
socket_read(int fd,void * buf,size_t len,int timeout)1322 int socket_read(int fd, void* buf, size_t len, int timeout)
1323 {
1324 int ret, read_len;
1325 read_len = 0;
1326 struct timeval timeoutval;
1327 fd_set readmask;
1328 int fds;
1329
1330 while (read_len < len)
1331 {
1332 FD_ZERO(&readmask);
1333 FD_SET(fd, &readmask);
1334
1335 timeoutval.tv_sec = timeout;
1336 timeoutval.tv_usec = 0;
1337
1338 fds = select(fd+1, &readmask, NULL, NULL, timeout?&timeoutval:NULL);
1339 if (fds == -1)
1340 {
1341 if (errno == EAGAIN || errno == EINTR)
1342 continue;
1343
1344 ereport(WARNING,
1345 (errmsg("select failed with error: \"%s\"", strerror(errno))));
1346 return -1;
1347 }
1348 else if (fds == 0)
1349 {
1350 return -2;
1351 }
1352 ret = read(fd, buf + read_len, (len - read_len));
1353 if(ret < 0)
1354 {
1355 if (errno == EINTR || errno == EAGAIN)
1356 {
1357 ereport(DEBUG1,
1358 (errmsg("read from socket failed with error :\"%s\"",strerror(errno)),
1359 errdetail("retrying...")));
1360 continue;
1361 }
1362 ereport(LOG,
1363 (errmsg("read from socket failed with error :\"%s\"",strerror(errno))));
1364 return -1;
1365 }
1366 if(ret == 0)
1367 {
1368 ereport(LOG,
1369 (errmsg("read from socket failed, remote end closed the connection")));
1370 return 0;
1371 }
1372 read_len +=ret;
1373 }
1374 return read_len;
1375 }
1376