1 /* -*-pgsql-c-*- */
2 /*
3 * $Header$
4 *
5 * pgpool: a language independent connection pool server for PostgreSQL
6 * written by Tatsuo Ishii
7 *
8 * Copyright (c) 2003-2018	PgPool Global Development Group
9 *
10 * Permission to use, copy, modify, and distribute this software and
11 * its documentation for any purpose and without fee is hereby
12 * granted, provided that the above copyright notice appear in all
13 * copies and that both that copyright notice and this permission
14 * notice appear in supporting documentation, and that the name of the
15 * author not be used in advertising or publicity pertaining to
16 * distribution of the software without specific, written prior
17 * permission. The author makes no representations about the
18 * suitability of this software for any purpose.  It is provided "as
19 * is" without express or implied warranty.
20 *
21 * pool_stream.c: stream I/O modules
22 *
23 */
24 
25 #include "config.h"
26 
27 #ifdef HAVE_SYS_SELECT_H
28 #include <sys/select.h>
29 #endif
30 
31 #include <stdio.h>
32 #include <stdlib.h>
33 #include <string.h>
34 #include <errno.h>
35 #include <unistd.h>
36 
37 #ifdef HAVE_FCNTL_H
38 #include <fcntl.h>
39 #endif
40 
41 #include "pool.h"
42 #include "utils/elog.h"
43 #include "utils/palloc.h"
44 #include "utils/memutils.h"
45 #include "utils/pool_stream.h"
46 #include "pool_config.h"
47 
48 static int mystrlen(char *str, int upper, int *flag);
49 static int mystrlinelen(char *str, int upper, int *flag);
50 static int save_pending_data(POOL_CONNECTION *cp, void *data, int len);
51 static int consume_pending_data(POOL_CONNECTION *cp, void *data, int len);
52 static MemoryContext SwitchToConnectionContext(bool backend_connection);
53 #ifdef DEBUG
54 static void dump_buffer(char *buf, int len);
55 #endif
56 static int pool_write_flush(POOL_CONNECTION *cp, void *buf, int len);
57 
58 static MemoryContext
SwitchToConnectionContext(bool backend_connection)59     SwitchToConnectionContext(bool backend_connection)
60 {
61     /*
62      * backend connection can live as long as process life,
63      * while the frontend connection is only for one session life.
64      * So we create backend connection in long living memory context
65      */
66     if(backend_connection)
67         return MemoryContextSwitchTo(TopMemoryContext);
68     else
69         return MemoryContextSwitchTo(ProcessLoopContext);
70 }
71 
72 /*
73 * open read/write file descriptors.
74 * returns POOL_CONNECTION on success otherwise NULL.
75 */
pool_open(int fd,bool backend_connection)76 POOL_CONNECTION *pool_open(int fd, bool backend_connection)
77 {
78 	POOL_CONNECTION *cp;
79 
80     MemoryContext oldContext = SwitchToConnectionContext(backend_connection);
81 
82 	cp = (POOL_CONNECTION *)palloc0(sizeof(POOL_CONNECTION));
83 
84 	/* initialize write buffer */
85 	cp->wbuf = palloc(WRITEBUFSZ);
86 	cp->wbufsz = WRITEBUFSZ;
87 	cp->wbufpo = 0;
88 
89 	/* initialize pending data buffer */
90 	cp->hp = palloc(READBUFSZ);
91 	cp->bufsz = READBUFSZ;
92 	cp->po = 0;
93 	cp->len = 0;
94 	cp->sbuf = NULL;
95 	cp->sbufsz = 0;
96 	cp->buf2 = NULL;
97 	cp->bufsz2 = 0;
98 	cp->buf3 = NULL;
99 	cp->bufsz3 = 0;
100 
101 	cp->fd = fd;
102 	cp->socket_state = POOL_SOCKET_VALID;
103 
104     MemoryContextSwitchTo(oldContext);
105 
106 	return cp;
107 }
108 
109 /*
110 * close read/write file descriptors.
111 */
pool_close(POOL_CONNECTION * cp)112 void pool_close(POOL_CONNECTION *cp)
113 {
114 	/*
115 	 * shutdown connection to the client so that pgpool is not blocked
116 	 */
117 	if (!cp->isbackend)
118 		shutdown(cp->fd, 1);
119 	close(cp->fd);
120 	cp->socket_state = POOL_SOCKET_CLOSED;
121 	pfree(cp->wbuf);
122 	pfree(cp->hp);
123 	if (cp->sbuf)
124 		pfree(cp->sbuf);
125 	if (cp->buf2)
126 		pfree(cp->buf2);
127 	if (cp->buf3)
128 		pfree(cp->buf3);
129 	pool_discard_params(&cp->params);
130 
131 	pool_ssl_close(cp);
132 
133 	pfree(cp);
134 }
135 
/*
 * Call pool_read() and raise an ERROR mentioning err_context (which may be
 * NULL) if it reports failure.
 */
void
pool_read_with_error(POOL_CONNECTION *cp, void *buf, int len,
					 const char *err_context)
{
	const char *context;

	if (pool_read(cp, buf, len) >= 0)
		return;

	context = (err_context != NULL) ? err_context : "";
	ereport(ERROR,
			(errmsg("failed to read data of length %d from DB node: %d", len, cp->db_node_id),
			 errdetail("error occurred when reading: %s", context)));
}
146 /*
147 * read len bytes from cp
148 * returns 0 on success otherwise throws an ereport.
149 */
pool_read(POOL_CONNECTION * cp,void * buf,int len)150 int pool_read(POOL_CONNECTION *cp, void *buf, int len)
151 {
152 	static char readbuf[READBUFSZ];
153 
154 	int consume_size;
155 	int readlen;
156 
157 	consume_size = consume_pending_data(cp, buf, len);
158 	len -= consume_size;
159 	buf += consume_size;
160 
161 	while (len > 0)
162 	{
163 		if (pool_check_fd(cp))
164 		{
165 			if (!IS_MASTER_NODE_ID(cp->db_node_id) && (getpid() != mypid))
166 			{
167                 ereport(FATAL,
168                     (errmsg("unable to read data from DB node %d",cp->db_node_id),
169                          errdetail("data is not ready in DB node")));
170 
171 			}
172 			else
173 			{
174                 ereport(ERROR,
175                     (errmsg("unable to read data from DB node %d",cp->db_node_id),
176                          errdetail("pool_check_fd call failed with an error \"%s\"", strerror(errno))));
177 			}
178 		}
179 
180 		if (cp->ssl_active > 0)
181 		{
182 		  readlen = pool_ssl_read(cp, readbuf, READBUFSZ);
183 		}
184 		else
185 		{
186 		  readlen = read(cp->fd, readbuf, READBUFSZ);
187 		  if (cp->isbackend)
188 		  {
189 			  ereport(DEBUG1,
190 				  (errmsg("pool_read: read %d bytes from backend %d",
191 						  readlen, cp->db_node_id)));
192 #ifdef DEBUG
193 			  dump_buffer(readbuf, readlen);
194 #endif
195 		  }
196 		}
197 
198 		if (readlen == -1)
199 		{
200 			if (errno == EINTR || errno == EAGAIN)
201 			{
202 				ereport(DEBUG1,
203 					(errmsg("read on socket failed with error :\"%s\"",strerror(errno)),
204 						 errdetail("retrying...")));
205 				continue;
206 			}
207 
208 			cp->socket_state = POOL_SOCKET_ERROR;
209 			if (cp->isbackend)
210 			{
211 				if (cp->con_info && cp->con_info->swallow_termination == 1)
212 				{
213 					cp->con_info->swallow_termination = 0;
214 					ereport(FATAL,
215 						(errmsg("unable to read data from DB node %d",cp->db_node_id),
216 							 errdetail("pg_terminate_backend was called on the backend")));
217 				}
218 
219 				/* if fail_over_on_backend_error is true, then trigger failover */
220 				if (pool_config->fail_over_on_backend_error)
221 				{
222 					notice_backend_error(cp->db_node_id, true);
223 
224                     /* If we are in the main process, we will not exit */
225 					child_exit(POOL_EXIT_AND_RESTART);
226 					ereport(ERROR,
227 							(errmsg("unable to read data from DB node %d",cp->db_node_id),
228 							 errdetail("socket read failed with an error \"%s\"", strerror(errno))));
229 				}
230 				else
231 				{
232 					ereport(ERROR,
233 							(errmsg("unable to read data from DB node %d",cp->db_node_id),
234 							 errdetail("socket read failed with an error \"%s\"", strerror(errno))));
235 				}
236 			}
237 			else
238 			{
239                 ereport(FRONTEND_ERROR,
240 					(errmsg("unable to read data from frontend"),
241                          errdetail("socket read failed with an error \"%s\"", strerror(errno))));
242 			}
243 		}
244 		else if (readlen == 0)
245 		{
246 			cp->socket_state = POOL_SOCKET_EOF;
247 			if (cp->isbackend)
248 			{
249 				if(processType == PT_MAIN)
250 					ereport(ERROR,
251 						(errmsg("unable to read data from DB node %d",cp->db_node_id),
252 							 errdetail("EOF encountered with backend")));
253 
254                 ereport(FATAL,
255 					(errmsg("unable to read data from DB node %d",cp->db_node_id),
256                          errdetail("EOF encountered with backend")));
257 			}
258 			else
259 			{
260 				/*
261 				 * if backend offers authentication method, frontend could close connection
262 				 */
263                 ereport(FRONTEND_ERROR,
264 					(errmsg("unable to read data from frontend"),
265                          errdetail("EOF encountered with frontend")));
266 			}
267 		}
268 
269 		if (len < readlen)
270 		{
271 			/* overrun. we need to save remaining data to pending buffer */
272 			save_pending_data(cp, readbuf+len, readlen-len);
273 			memmove(buf, readbuf, len);
274 			break;
275 		}
276 
277 		memmove(buf, readbuf, readlen);
278 		buf += readlen;
279 		len -= readlen;
280 	}
281 
282 	return 0;
283 }
284 
285 /*
286 * read exactly len bytes from cp
287 * returns buffer address on success otherwise NULL.
288 */
pool_read2(POOL_CONNECTION * cp,int len)289 char *pool_read2(POOL_CONNECTION *cp, int len)
290 {
291 	char *buf;
292 	int req_size;
293 	int alloc_size;
294 	int consume_size;
295 	int readlen;
296     MemoryContext oldContext = SwitchToConnectionContext(cp->isbackend);
297 
298 	req_size = cp->len + len;
299 
300 	if (req_size > cp->bufsz2)
301 	{
302 		alloc_size = ((req_size+1)/READBUFSZ+1)*READBUFSZ;
303 		cp->buf2 = repalloc(cp->buf2, alloc_size);
304 		cp->bufsz2 = alloc_size;
305 	}
306 
307 	buf = cp->buf2;
308 
309 	consume_size = consume_pending_data(cp, buf, len);
310 	len -= consume_size;
311 	buf += consume_size;
312 
313 	while (len > 0)
314 	{
315 		if (pool_check_fd(cp))
316 		{
317 			if (!IS_MASTER_NODE_ID(cp->db_node_id))
318 			{
319                 ereport(FATAL,
320                     (errmsg("unable to read data from DB node %d",cp->db_node_id),
321                          errdetail("data is not ready in DB node")));
322 
323 			}
324 			else
325 			{
326                 ereport(ERROR,
327                     (errmsg("unable to read data from DB node %d",cp->db_node_id),
328                          errdetail("pool_check_fd call failed with an error \"%s\"", strerror(errno))));
329             }
330 		}
331 
332 		if (cp->ssl_active > 0)
333 		{
334 		  readlen = pool_ssl_read(cp, buf, len);
335 		}
336 		else
337 		{
338 		  readlen = read(cp->fd, buf, len);
339 		  if (cp->isbackend)
340 			  ereport(DEBUG1,
341 				  (errmsg("pool_read2: read %d bytes from backend %d",
342 						  readlen, cp->db_node_id)));
343 		}
344 
345 		if (readlen == -1)
346 		{
347 			if (errno == EINTR || errno == EAGAIN)
348 			{
349 				ereport(DEBUG1,
350 					(errmsg("read on socket failed with error :\"%s\"",strerror(errno)),
351 						 errdetail("retrying...")));
352 				continue;
353 			}
354 
355 			cp->socket_state = POOL_SOCKET_ERROR;
356 			if (cp->isbackend)
357 			{
358 				if (cp->con_info && cp->con_info->swallow_termination == 1)
359 				{
360 					cp->con_info->swallow_termination = 0;
361 					ereport(FATAL,
362 						(errmsg("unable to read data from DB node %d",cp->db_node_id),
363 							 errdetail("pg_terminate_backend was called on the backend")));
364 				}
365 
366 				/* if fail_over_on_backend_error is true, then trigger failover */
367 				if (pool_config->fail_over_on_backend_error)
368 				{
369 					notice_backend_error(cp->db_node_id, true);
370 					child_exit(POOL_EXIT_AND_RESTART);
371                     /* we are in main process */
372                     ereport(ERROR,
373                             (errmsg("unable to read data from DB node %d",cp->db_node_id),
374                              errdetail("socket read failed with an error \"%s\"", strerror(errno))));
375 				}
376 				else
377 				{
378                     ereport(ERROR,
379                         (errmsg("unable to read data from DB node %d",cp->db_node_id),
380                              errdetail("do not failover because fail_over_on_backend_error is off")));
381 				}
382 			}
383 			else
384 			{
385                 ereport(ERROR,
386                     (errmsg("unable to read data from frontend"),
387                          errdetail("socket read function returned -1")));
388 			}
389 		}
390 		else if (readlen == 0)
391 		{
392 			cp->socket_state = POOL_SOCKET_EOF;
393 			if (cp->isbackend)
394 			{
395                 ereport(ERROR,
396                     (errmsg("unable to read data from backend"),
397                          errdetail("EOF read on socket")));
398 			}
399 			else
400 			{
401 				/*
402 				 * if backend offers authentication method, frontend could close connection
403 				 */
404                 ereport(ERROR,
405                     (errmsg("unable to read data from frontend"),
406                          errdetail("EOF read on socket")));
407 
408 			}
409 		}
410 
411 		buf += readlen;
412 		len -= readlen;
413 	}
414     MemoryContextSwitchTo(oldContext);
415 	return cp->buf2;
416 }
417 
418 /*
419  * write len bytes to cp the write buffer.
420  * returns 0 on success otherwise -1.
421  */
pool_write_noerror(POOL_CONNECTION * cp,void * buf,int len)422 int pool_write_noerror(POOL_CONNECTION *cp, void *buf, int len)
423 {
424 	if (len < 0)
425         return -1;
426 
427 	if (cp->no_forward)
428 		return 0;
429 
430 	if (len == 1 && cp->isbackend)
431 	{
432 		char c;
433 
434 		c = ((char *)buf)[0];
435 
436 		ereport(DEBUG1,
437 				(errmsg("pool_write: to backend: %d kind:%c", cp->db_node_id, c)));
438 	}
439 
440 	if (!cp->isbackend)
441 	{
442 		char c;
443 
444 		c = ((char *)buf)[0];
445 
446 		if (len == 1)
447 			ereport(DEBUG1,
448 					(errmsg("pool_write: to frontend: kind:%c po:%d", c, cp->wbufpo)));
449 		else
450 			ereport(DEBUG1,
451 					(errmsg("pool_write: to frontend: length:%d po:%d", len, cp->wbufpo)));
452 	}
453 
454 	while (len > 0)
455 	{
456 		int remainder = WRITEBUFSZ - cp->wbufpo;
457 
458 		/*
459 		 * If requested data cannot be added to the write buffer, flush the
460 		 * buffer and directly write the requested data.  This could avoid
461 		 * unwanted write in the middle of message boundary.
462 		 */
463 		if (remainder < len)
464 		{
465 			if (pool_flush_it(cp) == -1)
466 				return -1;
467 
468 			if (pool_write_flush(cp, buf, len) < 0)
469 				return -1;
470 			return 0;
471 		}
472 
473 		if (cp->wbufpo >= WRITEBUFSZ)
474 		{
475 			/*
476 			 * Write buffer is full. so flush buffer.
477 			 * wbufpo is reset in pool_flush_it().
478 			 */
479 			if (pool_flush_it(cp) == -1)
480 				return -1;
481             remainder = WRITEBUFSZ;
482 		}
483 
484 		/* check buffer size */
485 		remainder = Min(remainder, len);
486 
487 		memcpy(cp->wbuf+cp->wbufpo, buf, remainder);
488 		cp->wbufpo += remainder;
489 		buf += remainder;
490 		len -= remainder;
491 	}
492 	return 0;
493 }
494 
495 /*
496  * write len bytes to cp the write buffer.
497  * returns 0 on success otherwise ereport.
498  */
pool_write(POOL_CONNECTION * cp,void * buf,int len)499 int pool_write(POOL_CONNECTION *cp, void *buf, int len)
500 {
501 	if (len < 0)
502         ereport(ERROR,
503 			(errmsg("unable to write data to %s",cp->isbackend?"backend":"frontend"),
504                  errdetail("invalid data size: %d", len)));
505 
506     if(pool_write_noerror(cp,buf,len))
507         ereport(ERROR,
508             (errmsg("unable to write data to %s",cp->isbackend?"backend":"frontend"),
509                  errdetail("pool_flush failed")));
510 
511     return 0;
512 }
513 
514 
515 /*
516  * Direct write.
517  * This function does not throws an ereport in case of an error
518  */
pool_write_flush(POOL_CONNECTION * cp,void * buf,int len)519 static int pool_write_flush(POOL_CONNECTION *cp, void *buf, int len)
520 {
521 	int sts;
522 	int wlen;
523 	int offset;
524 	wlen = len;
525 
526 	ereport(DEBUG1,
527 			(errmsg("pool_write_flush: write size: %d", wlen)));
528 
529 	if (wlen == 0)
530 	{
531 		return 0;
532 	}
533 
534 	offset = 0;
535 
536 	for (;;)
537 	{
538 		errno = 0;
539 
540 		if (cp->ssl_active > 0)
541 		{
542 			sts = pool_ssl_write(cp, buf+offset, wlen);
543 		}
544 		else
545 		{
546 			sts = write(cp->fd, buf+offset, wlen);
547 		}
548 
549 		if (sts >= 0)
550 		{
551 			wlen -= sts;
552 
553 			if (wlen == 0)
554 			{
555 				/* write completed */
556 				break;
557 			}
558 
559 			else if (wlen < 0)
560 			{
561 				ereport(WARNING,
562 						(errmsg("pool_write_flush: invalid write size %d", sts)));
563 				return -1;
564 			}
565 
566 			else
567 			{
568 				/* need to write remaining data */
569 				ereport(DEBUG1,
570 						(errmsg("pool_write_flush: write retry: %d", wlen)));
571 
572 				offset += sts;
573 				continue;
574 			}
575 		}
576 
577 		else if (errno == EAGAIN || errno == EINTR)
578 		{
579 			continue;
580 		}
581 
582 		else
583 		{
584 			/* If this is the backend stream, report error. Otherwise
585 			 * just report debug message.
586 			 */
587 			if (cp->isbackend)
588 				ereport(WARNING,
589 					(errmsg("write on backend %d failed with error :\"%s\"",cp->db_node_id,strerror(errno)),
590 						 errdetail("while trying to write data from offset: %d wlen: %d",offset, wlen)));
591 			else
592 				ereport(DEBUG1,
593 					(errmsg("write on frontend failed with error :\"%s\"",strerror(errno)),
594 						 errdetail("while trying to write data from offset: %d wlen: %d",offset, wlen)));
595 			return -1;
596 		}
597 	}
598 
599 	return 0;
600 }
601 
602 /*
603  * flush write buffer
604  * This function does not throws an ereport in case of an error
605  */
pool_flush_it(POOL_CONNECTION * cp)606 int pool_flush_it(POOL_CONNECTION *cp)
607 {
608 	int sts;
609 	int wlen;
610 	int offset;
611 	wlen = cp->wbufpo;
612 
613 	ereport(DEBUG1,
614 			(errmsg("pool_flush_it: flush size: %d", wlen)));
615 
616 	if (wlen == 0)
617 	{
618 		return 0;
619 	}
620 
621 	offset = 0;
622 
623 	for (;;)
624 	{
625 		errno = 0;
626 
627 		if (cp->ssl_active > 0)
628 		{
629 		  sts = pool_ssl_write(cp, cp->wbuf + offset, wlen);
630 		}
631 		else
632 		{
633 		  sts = write(cp->fd, cp->wbuf + offset, wlen);
634 		}
635 
636 		if (sts >= 0)
637 		{
638 			wlen -= sts;
639 
640 			if (wlen == 0)
641 			{
642 				/* write completed */
643 				break;
644 			}
645 
646 			else if (wlen < 0)
647 			{
648 				ereport(WARNING,
649 						(errmsg("pool_flush_it: invalid write size %d", sts)));
650 				cp->wbufpo = 0;
651 				return -1;
652 			}
653 
654 			else
655 			{
656 				/* need to write remaining data */
657 				ereport(DEBUG1,
658 						(errmsg("pool_flush_it: write retry: %d", wlen)));
659 
660 				offset += sts;
661 				continue;
662 			}
663 		}
664 
665 		else if (errno == EAGAIN || errno == EINTR)
666 		{
667 			continue;
668 		}
669 
670 		else
671 		{
672 			/* If this is the backend stream, report error. Otherwise
673 			 * just report debug message.
674 			 */
675 			if (cp->isbackend)
676 				ereport(WARNING,
677 					(errmsg("write on backend %d failed with error :\"%s\"",cp->db_node_id,strerror(errno)),
678 						 errdetail("while trying to write data from offset: %d wlen: %d",offset, wlen)));
679 			else
680 				ereport(DEBUG1,
681 					(errmsg("write on frontend failed with error :\"%s\"",strerror(errno)),
682 						 errdetail("while trying to write data from offset: %d wlen: %d",offset, wlen)));
683 			cp->wbufpo = 0;
684 			return -1;
685 		}
686 	}
687 
688 	cp->wbufpo = 0;
689 
690 	return 0;
691 }
692 
693 /*
694  * flush write buffer and degenerate/failover if error occurs
695  */
pool_flush(POOL_CONNECTION * cp)696 int pool_flush(POOL_CONNECTION *cp)
697 {
698 	if (pool_flush_it(cp) == -1)
699 	{
700 		if (cp->isbackend)
701 		{
702 			if (cp->con_info && cp->con_info->swallow_termination == 1)
703 			{
704 				cp->con_info->swallow_termination = 0;
705 				ereport(FATAL,
706 						(errmsg("unable to read data from DB node %d",cp->db_node_id),
707 						 errdetail("pg_terminate_backend was called on the backend")));
708 			}
709 
710 			/* if fail_over_on_backend_error is true, then trigger failover */
711 			if (pool_config->fail_over_on_backend_error)
712 			{
713 				notice_backend_error(cp->db_node_id, true);
714 				ereport(LOG,
715 					(errmsg("unable to flush data to backend"),
716 						 errdetail("do not failover because I am the main process")));
717 
718 				child_exit(POOL_EXIT_AND_RESTART);
719 				return -1;
720 			}
721 			else
722 			{
723                 ereport(ERROR,
724                     (errmsg("unable to flush data to backend"),
725                          errdetail("do not failover because fail_over_on_backend_error is off")));
726 			}
727 		}
728 		else
729 		{
730 			/*
731 			 * If we are in replication mode, we need to continue the
732 			 * processing with backends to keep consistency among
733 			 * backends, thus ignore error.
734 			 */
735 			if (REPLICATION)
736                 ereport(NOTICE,
737                     (errmsg("unable to flush data to frontend"),
738                          errdetail("pgpool is in replication mode, ignoring error to keep consistency among backends")));
739 			else
740                 ereport(FRONTEND_ERROR,
741                     (errmsg("unable to flush data to frontend")));
742 
743 		}
744 	}
745 	return 0;
746 }
747 
748 /*
749  * same as pool_flush() but returns -ve value instead of ereport in case of failure
750  */
pool_flush_noerror(POOL_CONNECTION * cp)751 int pool_flush_noerror(POOL_CONNECTION *cp)
752 {
753     if (pool_flush_it(cp) == -1)
754     {
755         if (cp->isbackend)
756         {
757 			if (cp->con_info && cp->con_info->swallow_termination == 1)
758 			{
759 				cp->con_info->swallow_termination = 0;
760 				ereport(FATAL,
761 						(errmsg("unable to read data from DB node %d",cp->db_node_id),
762 						 errdetail("pg_terminate_backend was called on the backend")));
763 			}
764 
765             /* if fail_over_on_backend_erro is true, then trigger failover */
766             if (pool_config->fail_over_on_backend_error)
767             {
768                 notice_backend_error(cp->db_node_id, true);
769                 child_exit(POOL_EXIT_AND_RESTART);
770 				ereport(LOG,
771 					(errmsg("unable to flush data to backend"),
772 						 errdetail("do not failover because I am the main process")));
773                 return -1;
774             }
775             else
776             {
777 				ereport(LOG,
778 					(errmsg("unable to flush data to backend"),
779                          errdetail("do not failover because fail_over_on_backend_error is off")));
780                 return -1;
781             }
782         }
783         else
784         {
785             /*
786              * If we are in replication mode, we need to continue the
787              * processing with backends to keep consistency among
788              * backends, thus ignore error.
789              */
790             if (REPLICATION)
791                 return 0;
792             else
793                 return -1;
794         }
795     }
796     return 0;
797 }
798 
799 /*
800  * combo of pool_write and pool_flush
801  */
pool_write_and_flush(POOL_CONNECTION * cp,void * buf,int len)802 void pool_write_and_flush(POOL_CONNECTION *cp, void *buf, int len)
803 {
804 	pool_write(cp, buf, len);
805 	pool_flush(cp);
806 }
807 
808 /*
809  * same as pool_write_and_flush() but does not throws ereport when error occures
810  */
pool_write_and_flush_noerror(POOL_CONNECTION * cp,void * buf,int len)811 int pool_write_and_flush_noerror(POOL_CONNECTION *cp, void *buf, int len)
812 {
813 	int ret;
814 	ret = pool_write_noerror(cp,buf,len);
815 	if(ret == 0)
816 		return pool_flush_noerror(cp);
817 	return ret;
818 }
819 
820 /*
821  * read a string until EOF or NULL is encountered.
822  * if line is not 0, read until new line is encountered.
823 */
pool_read_string(POOL_CONNECTION * cp,int * len,int line)824 char *pool_read_string(POOL_CONNECTION *cp, int *len, int line)
825 {
826 	int readp;
827 	int readsize;
828 	int readlen;
829 	int strlength;
830 	int flag;
831 	int consume_size;
832 
833 #ifdef DEBUG
834 	static char pbuf[READBUFSZ];
835 #endif
836 
837 	*len = 0;
838 	readp = 0;
839 
840 	/* initialize read buffer */
841 	if (cp->sbufsz == 0)
842 	{
843         MemoryContext oldContext = SwitchToConnectionContext(cp->isbackend);
844 		cp->sbuf = palloc(READBUFSZ);
845         MemoryContextSwitchTo(oldContext);
846 
847 		cp->sbufsz = READBUFSZ;
848 		*cp->sbuf = '\0';
849 	}
850 
851 	/* any pending data? */
852 	if (cp->len)
853 	{
854 		if (line)
855 			strlength = mystrlinelen(cp->hp+cp->po, cp->len, &flag);
856 		else
857 			strlength = mystrlen(cp->hp+cp->po, cp->len, &flag);
858 
859 		/* buffer is too small? */
860 		if ((strlength + 1) > cp->sbufsz)
861 		{
862             MemoryContext oldContext = SwitchToConnectionContext(cp->isbackend);
863 			cp->sbufsz = ((strlength+1)/READBUFSZ+1)*READBUFSZ;
864 			cp->sbuf = repalloc(cp->sbuf, cp->sbufsz);
865             MemoryContextSwitchTo(oldContext);
866 		}
867 
868 		/* consume pending and save to read string buffer */
869 		consume_size = consume_pending_data(cp, cp->sbuf, strlength);
870 
871 		*len = strlength;
872 
873 		/* is the string null terminated? */
874 		if (consume_size == strlength && !flag)
875 		{
876 			/* not null or line terminated.
877 			 * we need to read more since we have not encountered NULL or new line yet
878 			 */
879 			readsize = cp->sbufsz - strlength;
880 			readp = strlength;
881 		}
882 		else
883 		{
884 			ereport(DEBUG1,
885 				(errmsg("reading string data"),
886 					 errdetail("read all from pending data. po:%d len:%d",
887 							   cp->po, cp->len)));
888 			return cp->sbuf;
889 		}
890 	} else
891 	{
892 		readsize = cp->sbufsz;
893 	}
894 
895 	for (;;)
896 	{
897 		if (pool_check_fd(cp))
898 		{
899 			if (!IS_MASTER_NODE_ID(cp->db_node_id))
900 			{
901                 ereport(FATAL,
902                     (errmsg("unable to read data from DB node %d",cp->db_node_id),
903                          errdetail("data is not ready in DB node")));
904 
905 			}
906 			else
907 			{
908                 ereport(ERROR,
909                     (errmsg("unable to read data from DB node %d",cp->db_node_id),
910                          errdetail("pool_check_fd call failed with an error \"%s\"", strerror(errno))));
911             }
912 		}
913 
914 		if (cp->ssl_active > 0)
915 		{
916 		  readlen = pool_ssl_read(cp, cp->sbuf+readp, readsize);
917 		}
918 		else
919 		{
920 		  readlen = read(cp->fd, cp->sbuf+readp, readsize);
921 		}
922 
923 		if (readlen == -1)
924 		{
925 			cp->socket_state = POOL_SOCKET_ERROR;
926 			if (cp->isbackend)
927 			{
928 				if (cp->con_info && cp->con_info->swallow_termination == 1)
929 				{
930 					cp->con_info->swallow_termination = 0;
931 					ereport(FATAL,
932 							(errmsg("unable to read data from DB node %d",cp->db_node_id),
933 							 errdetail("pg_terminate_backend was called on the backend")));
934 				}
935 
936 				notice_backend_error(cp->db_node_id, true);
937 				child_exit(POOL_EXIT_AND_RESTART);
938                 ereport(ERROR,
939                         (errmsg("unable to read data from frontend"),
940                          errdetail("socket read function returned -1")));
941 			}
942 			else
943 			{
944                 ereport(ERROR,
945                     (errmsg("unable to read data from frontend"),
946                          errdetail("socket read function returned -1")));
947 
948 			}
949 		}
950 		else if (readlen == 0)	/* EOF detected */
951 		{
952 			/*
953 			 * just returns an error, not trigger failover or degeneration
954 			 */
955 			cp->socket_state = POOL_SOCKET_EOF;
956             ereport(ERROR,
957                 (errmsg("unable to read data from %s",cp->isbackend?"backend":"frontend"),
958                      errdetail("EOF read on socket")));
959 
960 		}
961 
962 		/* check overrun */
963 		if (line)
964 			strlength = mystrlinelen(cp->sbuf+readp, readlen, &flag);
965 		else
966 			strlength = mystrlen(cp->sbuf+readp, readlen, &flag);
967 
968 		if (strlength < readlen)
969 		{
970 			save_pending_data(cp, cp->sbuf+readp+strlength, readlen-strlength);
971 			*len += strlength;
972 			ereport(DEBUG1,
973 				(errmsg("reading string data"),
974 					 errdetail("total read %d with pending data po:%d len:%d", *len, cp->po, cp->len)));
975 			return cp->sbuf;
976 		}
977 
978 		*len += readlen;
979 
980 		/* encountered null or newline? */
981 		if (flag)
982 		{
983 			/* ok we have read all data */
984 			ereport(DEBUG1,
985 				(errmsg("reading string data"),
986 					 errdetail("all data read: total read %d", *len)));
987 			break;
988 		}
989 
990 		readp += readlen;
991 		readsize = READBUFSZ;
992 
993 		if ((*len+readsize) > cp->sbufsz)
994 		{
995 			cp->sbufsz += READBUFSZ;
996             MemoryContext oldContext = SwitchToConnectionContext(cp->isbackend);
997 			cp->sbuf = repalloc(cp->sbuf, cp->sbufsz);
998             MemoryContextSwitchTo(oldContext);
999 		}
1000 	}
1001 	return cp->sbuf;
1002 }
1003 
1004 /*
1005  * Set db node id to connection.
1006  */
pool_set_db_node_id(POOL_CONNECTION * con,int db_node_id)1007 void pool_set_db_node_id(POOL_CONNECTION *con, int db_node_id)
1008 {
1009 	if (!con)
1010 		return;
1011 	con->db_node_id = db_node_id;
1012 }
1013 
/*
 * Return the byte length of str, including the terminating '\0', looking at
 * no more than upper bytes.  *flag is set to 1 if a '\0' was found within
 * those bytes, otherwise to 0.
 * examples (flag shown after the arrow):
 *	mystrlen("abc", 2, &flag) returns 2 -> 0
 *	mystrlen("abc", 3, &flag) returns 3 -> 0
 *	mystrlen("abc", 4, &flag) returns 4 -> 1
 *	mystrlen("abc", 5, &flag) returns 4 -> 1
 */
static int
mystrlen(char *str, int upper, int *flag)
{
	int			i = 0;

	*flag = 0;
	while (i < upper)
	{
		if (str[i] == '\0')
		{
			*flag = 1;
			return i + 1;
		}
		i++;
	}
	return i;
}
1040 
/*
 * Return the byte length of str up to and including the first '\n' or
 * '\0', looking at no more than upper bytes.  *flag is set to 1 if such a
 * terminator was found within those bytes, otherwise to 0.
 * examples (flag shown after the arrow):
 *	mystrlinelen("abc", 2, &flag) returns 2 -> 0
 *	mystrlinelen("abc", 3, &flag) returns 3 -> 0
 *	mystrlinelen("abc", 4, &flag) returns 4 -> 1
 *	mystrlinelen("abc", 5, &flag) returns 4 -> 1
 *	mystrlinelen("abcd\nefg", 4, &flag) returns 4 -> 0
 *	mystrlinelen("abcd\nefg", 5, &flag) returns 5 -> 1
 *	mystrlinelen("abcd\nefg", 6, &flag) returns 5 -> 1
 */
static int
mystrlinelen(char *str, int upper, int *flag)
{
	int			i;

	*flag = 0;
	for (i = 0; i < upper; i++)
	{
		char		c = str[i];

		if (c == '\0' || c == '\n')
		{
			*flag = 1;
			return i + 1;
		}
	}
	return i < 0 ? 0 : i;
}
1070 
1071 /*
1072  * save pending data
1073  */
save_pending_data(POOL_CONNECTION * cp,void * data,int len)1074 static int save_pending_data(POOL_CONNECTION *cp, void *data, int len)
1075 {
1076 	int reqlen;
1077 	size_t realloc_size;
1078 	char *p;
1079 
1080 	/* to be safe */
1081 	if (cp->len == 0)
1082 		cp->po = 0;
1083 
1084 	reqlen = cp->po + cp->len + len;
1085 
1086 	/* pending buffer is enough? */
1087 	if (reqlen > cp->bufsz)
1088 	{
1089 		/* too small, enlarge it */
1090 		realloc_size = (reqlen/READBUFSZ+1)*READBUFSZ;
1091 
1092         MemoryContext oldContext = SwitchToConnectionContext(cp->isbackend);
1093 		p = repalloc(cp->hp, realloc_size);
1094         MemoryContextSwitchTo(oldContext);
1095 
1096 		cp->bufsz = realloc_size;
1097 		cp->hp = p;
1098 	}
1099 
1100 	memmove(cp->hp + cp->po + cp->len, data, len);
1101 	cp->len += len;
1102 
1103 	return 0;
1104 }
1105 
1106 /*
1107  * consume pending data. returns actually consumed data length.
1108  */
consume_pending_data(POOL_CONNECTION * cp,void * data,int len)1109 static int consume_pending_data(POOL_CONNECTION *cp, void *data, int len)
1110 {
1111 	int consume_size;
1112 
1113 	if (cp->len <= 0)
1114 		return 0;
1115 
1116 	consume_size = Min(len, cp->len);
1117 	memmove(data, cp->hp + cp->po, consume_size);
1118 	cp->len -= consume_size;
1119 
1120 	if (cp->len <= 0)
1121 		cp->po = 0;
1122 	else
1123 		cp->po += consume_size;
1124 
1125 	return consume_size;
1126 }
1127 
1128 /*
1129  * pool_unread: Put back data to input buffer
1130  */
pool_unread(POOL_CONNECTION * cp,void * data,int len)1131 int pool_unread(POOL_CONNECTION *cp, void *data, int len)
1132 {
1133 	void *p = cp->hp;
1134 	int n = cp->len + len;
1135 	int realloc_size;
1136 
1137 	/*
1138 	 * Optimization to avoid mmove. If there's enough space in front of
1139 	 * existing data, we can use it.
1140 	 */
1141 	if (cp->po >= len)
1142 	{
1143 		memmove(cp->hp + cp->po - len, data, len);
1144 		cp->po -= len;
1145 		cp->len = n;
1146 		return 0;
1147 	}
1148 
1149 	if (cp->bufsz < n)
1150 	{
1151 		realloc_size = (n/READBUFSZ+1)*READBUFSZ;
1152 
1153         MemoryContext oldContext = SwitchToConnectionContext(cp->isbackend);
1154 		p = repalloc(cp->hp, realloc_size);
1155         MemoryContextSwitchTo(oldContext);
1156 
1157         cp->hp = p;
1158 		cp->bufsz = realloc_size;
1159 	}
1160 	if (cp->len != 0)
1161 		memmove(p + len, cp->hp + cp->po, cp->len);
1162 	memmove(p, data, len);
1163 	cp->len = n;
1164 	cp->po = 0;
1165 	return 0;
1166 }
1167 
1168 /*
1169  * pool_push: Push data into buffer stack.
1170  */
pool_push(POOL_CONNECTION * cp,void * data,int len)1171 int pool_push(POOL_CONNECTION *cp, void *data, int len)
1172 {
1173 	char *p;
1174 
1175 	ereport(DEBUG1,
1176 		(errmsg("pushing data of len: %d", len)));
1177 
1178 
1179     MemoryContext oldContext = SwitchToConnectionContext(cp->isbackend);
1180 
1181 	if (cp->bufsz3 == 0)
1182 	{
1183 		p = cp->buf3 = palloc(len);
1184 	}
1185 	else
1186 	{
1187 		cp->buf3 = repalloc(cp->buf3, cp->bufsz3 + len);
1188 		p = cp->buf3 + cp->bufsz3;
1189 	}
1190 
1191 	memcpy(p, data, len);
1192 	cp->bufsz3 += len;
1193 
1194     MemoryContextSwitchTo(oldContext);
1195 	return 0;
1196 }
1197 
1198 /*
1199  * pool_pop: Pop data from buffer stack and put back data using
1200  * pool_unread.
1201  */
pool_pop(POOL_CONNECTION * cp,int * len)1202 void pool_pop(POOL_CONNECTION *cp, int *len)
1203 {
1204 	if (cp->bufsz3 == 0)
1205 	{
1206 		*len = 0;
1207 		ereport(DEBUG1,
1208 				(errmsg("pop data of len: %d", *len)));
1209 		return;
1210 	}
1211 
1212 	pool_unread(cp, cp->buf3, cp->bufsz3);
1213 	*len = cp->bufsz3;
1214 	pfree(cp->buf3);
1215 	cp->bufsz3 = 0;
1216 	cp->buf3 = NULL;
1217 	ereport(DEBUG1,
1218 			(errmsg("pop data of len: %d", *len)));
1219 }
1220 
1221 /*
1222  * pool_stacklen: Returns buffer stack length
1223  * pool_unread.
1224  */
pool_stacklen(POOL_CONNECTION * cp)1225 int pool_stacklen(POOL_CONNECTION *cp)
1226 {
1227 	return cp->bufsz3;
1228 }
1229 
1230 /*
1231  * set non-block flag
1232  */
pool_set_nonblock(int fd)1233 void pool_set_nonblock(int fd)
1234 {
1235 	int var;
1236 
1237 	/* set fd to none blocking */
1238 	var = fcntl(fd, F_GETFL, 0);
1239 	if (var == -1)
1240 	{
1241         ereport(FATAL,
1242             (errmsg("unable to set options on socket"),
1243                  errdetail("fcntl system call failed with error \"%s\"", strerror(errno))));
1244 
1245 	}
1246 	if (fcntl(fd, F_SETFL, var | O_NONBLOCK) == -1)
1247 	{
1248         ereport(FATAL,
1249             (errmsg("unable to set options on socket"),
1250                  errdetail("fcntl system call failed with error \"%s\"", strerror(errno))));
1251 	}
1252 }
1253 
1254 /*
1255  * unset non-block flag
1256  */
pool_unset_nonblock(int fd)1257 void pool_unset_nonblock(int fd)
1258 {
1259 	int var;
1260 
1261 	/* set fd to none blocking */
1262 	var = fcntl(fd, F_GETFL, 0);
1263 	if (var == -1)
1264 	{
1265         ereport(FATAL,
1266             (errmsg("unable to set options on socket"),
1267                  errdetail("fcntl system call failed with error \"%s\"", strerror(errno))));
1268 	}
1269 	if (fcntl(fd, F_SETFL, var & ~O_NONBLOCK) == -1)
1270 	{
1271         ereport(FATAL,
1272             (errmsg("unable to set options on socket"),
1273                  errdetail("fcntl system call failed with error \"%s\"", strerror(errno))));
1274 	}
1275 }
1276 
#ifdef DEBUG
/*
 * Debug aid: log each of the len bytes of buf as a two-digit hex value at
 * DEBUG1.  Nothing is logged when len <= 0.
 */
static void dump_buffer(char *buf, int len)
{
	int			i;

	/* Iterate exactly len times so every byte, including the last, is shown. */
	for (i = 0; i < len; i++)
	{
		/* Cast so bytes >= 0x80 are not sign-extended to "ffffffxx". */
		ereport(DEBUG1,
				(errmsg("%02x", (unsigned char) buf[i])));
	}
}
#endif
socket_write(int fd,void * buf,size_t len)1290 int socket_write(int fd, void* buf, size_t len)
1291 {
1292 	int bytes_send = 0;
1293 	do
1294 	{
1295 		int ret;
1296 		ret = write(fd, buf + bytes_send, (len - bytes_send));
1297 		if (ret <=0)
1298 		{
1299 			if (errno == EINTR || errno == EAGAIN)
1300 			{
1301 				ereport(DEBUG1,
1302 						(errmsg("write on socket failed with error :\"%s\"",strerror(errno)),
1303 						 errdetail("retrying...")));
1304 				continue;
1305 			}
1306 			ereport(LOG,
1307 					(errmsg("write on socket failed with error :\"%s\"",strerror(errno))));
1308 			return -1;
1309 		}
1310 		bytes_send += ret;
1311 	}while (bytes_send < len);
1312 	return bytes_send;
1313 }
1314 
socket_read(int fd,void * buf,size_t len,int timeout)1315 int socket_read(int fd, void* buf, size_t len, int timeout)
1316 {
1317 	int ret, read_len;
1318 	read_len = 0;
1319 	struct timeval timeoutval;
1320 	fd_set readmask;
1321 	int fds;
1322 
1323 	while (read_len < len)
1324 	{
1325 		FD_ZERO(&readmask);
1326 		FD_SET(fd, &readmask);
1327 
1328 		timeoutval.tv_sec = timeout;
1329 		timeoutval.tv_usec = 0;
1330 
1331 		fds = select(fd+1, &readmask, NULL, NULL, timeout?&timeoutval:NULL);
1332 		if (fds == -1)
1333 		{
1334 			if (errno == EAGAIN || errno == EINTR)
1335 				continue;
1336 
1337 			ereport(WARNING,
1338 					(errmsg("select failed with error: \"%s\"", strerror(errno))));
1339 			return -1;
1340 		}
1341 		else if (fds == 0)
1342 		{
1343 			return -2;
1344 		}
1345 		ret = read(fd, buf + read_len, (len - read_len));
1346 		if(ret < 0)
1347 		{
1348 			if (errno == EINTR || errno == EAGAIN)
1349 			{
1350 				ereport(DEBUG1,
1351 						(errmsg("read from socket failed with error :\"%s\"",strerror(errno)),
1352 						 errdetail("retrying...")));
1353 				continue;
1354 			}
1355 			ereport(LOG,
1356 					(errmsg("read from socket failed with error :\"%s\"",strerror(errno))));
1357 			return -1;
1358 		}
1359 		if(ret == 0)
1360 		{
1361 			ereport(LOG,
1362 					(errmsg("read from socket failed, remote end closed the connection")));
1363 			return 0;
1364 		}
1365 		read_len +=ret;
1366 	}
1367 	return read_len;
1368 }
1369