1 /* -*-pgsql-c-*- */
2 /*
3 * $Header$
4 *
5 * pgpool: a language independent connection pool server for PostgreSQL
6 * written by Tatsuo Ishii
7 *
8 * Copyright (c) 2003-2018	PgPool Global Development Group
9 *
10 * Permission to use, copy, modify, and distribute this software and
11 * its documentation for any purpose and without fee is hereby
12 * granted, provided that the above copyright notice appear in all
13 * copies and that both that copyright notice and this permission
14 * notice appear in supporting documentation, and that the name of the
15 * author not be used in advertising or publicity pertaining to
16 * distribution of the software without specific, written prior
17 * permission. The author makes no representations about the
18 * suitability of this software for any purpose.  It is provided "as
19 * is" without express or implied warranty.
20 *
21 * pool_stream.c: stream I/O modules
22 *
23 */
24 
25 #include "config.h"
26 
27 #ifdef HAVE_SYS_SELECT_H
28 #include <sys/select.h>
29 #endif
30 
31 #include <stdio.h>
32 #include <stdlib.h>
33 #include <string.h>
34 #include <errno.h>
35 #include <unistd.h>
36 
37 #ifdef HAVE_FCNTL_H
38 #include <fcntl.h>
39 #endif
40 
41 #include "pool.h"
42 #include "utils/elog.h"
43 #include "utils/palloc.h"
44 #include "utils/memutils.h"
45 #include "utils/pool_stream.h"
46 #include "pool_config.h"
47 
48 static int mystrlen(char *str, int upper, int *flag);
49 static int mystrlinelen(char *str, int upper, int *flag);
50 static int save_pending_data(POOL_CONNECTION *cp, void *data, int len);
51 static int consume_pending_data(POOL_CONNECTION *cp, void *data, int len);
52 static MemoryContext SwitchToConnectionContext(bool backend_connection);
53 #ifdef DEBUG
54 static void dump_buffer(char *buf, int len);
55 #endif
56 static int pool_write_flush(POOL_CONNECTION *cp, void *buf, int len);
57 
58 static MemoryContext
SwitchToConnectionContext(bool backend_connection)59     SwitchToConnectionContext(bool backend_connection)
60 {
61     /*
62      * backend connection can live as long as process life,
63      * while the frontend connection is only for one session life.
64      * So we create backend connection in long living memory context
65      */
66     if(backend_connection)
67         return MemoryContextSwitchTo(TopMemoryContext);
68     else
69         return MemoryContextSwitchTo(ProcessLoopContext);
70 }
71 
72 /*
73 * open read/write file descriptors.
74 * returns POOL_CONNECTION on success otherwise NULL.
75 */
pool_open(int fd,bool backend_connection)76 POOL_CONNECTION *pool_open(int fd, bool backend_connection)
77 {
78 	POOL_CONNECTION *cp;
79 
80     MemoryContext oldContext = SwitchToConnectionContext(backend_connection);
81 
82 	cp = (POOL_CONNECTION *)palloc0(sizeof(POOL_CONNECTION));
83 
84 	/* initialize write buffer */
85 	cp->wbuf = palloc(WRITEBUFSZ);
86 	cp->wbufsz = WRITEBUFSZ;
87 	cp->wbufpo = 0;
88 
89 	/* initialize pending data buffer */
90 	cp->hp = palloc(READBUFSZ);
91 	cp->bufsz = READBUFSZ;
92 	cp->po = 0;
93 	cp->len = 0;
94 	cp->sbuf = NULL;
95 	cp->sbufsz = 0;
96 	cp->buf2 = NULL;
97 	cp->bufsz2 = 0;
98 	cp->buf3 = NULL;
99 	cp->bufsz3 = 0;
100 
101 	cp->fd = fd;
102 	cp->socket_state = POOL_SOCKET_VALID;
103 
104     MemoryContextSwitchTo(oldContext);
105 
106 	return cp;
107 }
108 
109 /*
110 * close read/write file descriptors.
111 */
pool_close(POOL_CONNECTION * cp)112 void pool_close(POOL_CONNECTION *cp)
113 {
114 	/*
115 	 * shutdown connection to the client so that pgpool is not blocked
116 	 */
117 	if (!cp->isbackend)
118 		shutdown(cp->fd, 1);
119 	close(cp->fd);
120 	cp->socket_state = POOL_SOCKET_CLOSED;
121 	pfree(cp->wbuf);
122 	pfree(cp->hp);
123 	if (cp->sbuf)
124 		pfree(cp->sbuf);
125 	if (cp->buf2)
126 		pfree(cp->buf2);
127 	if (cp->buf3)
128 		pfree(cp->buf3);
129 	pool_discard_params(&cp->params);
130 
131 	pool_ssl_close(cp);
132 
133 	pfree(cp);
134 }
135 
/*
 * Read exactly len bytes from cp into buf via pool_read().
 *
 * If pool_read() returns a negative value, this raises ereport(ERROR). The
 * error mentions len and the DB node id, and err_context (NULL is allowed)
 * tells what was being read.
 */
void
pool_read_with_error(POOL_CONNECTION *cp, void *buf, int len,
					 const char *err_context)
{
	const char *context_text = err_context ? err_context : "";

	if (pool_read(cp, buf, len) >= 0)
		return;

	ereport(ERROR,
			(errmsg("failed to read data of length %d from DB node: %d", len, cp->db_node_id),
			 errdetail("error occurred when reading: %s", context_text)));
}
146 /*
147 * read len bytes from cp
148 * returns 0 on success otherwise throws an ereport.
149 */
pool_read(POOL_CONNECTION * cp,void * buf,int len)150 int pool_read(POOL_CONNECTION *cp, void *buf, int len)
151 {
152 	static char readbuf[READBUFSZ];
153 
154 	int consume_size;
155 	int readlen;
156 
157 	consume_size = consume_pending_data(cp, buf, len);
158 	len -= consume_size;
159 	buf += consume_size;
160 
161 	while (len > 0)
162 	{
163 		if (pool_check_fd(cp))
164 		{
165 			if (!IS_MASTER_NODE_ID(cp->db_node_id) && (getpid() != mypid))
166 			{
167                 ereport(FATAL,
168                     (errmsg("unable to read data from DB node %d",cp->db_node_id),
169                          errdetail("data is not ready in DB node")));
170 
171 			}
172 			else
173 			{
174                 ereport(ERROR,
175                     (errmsg("unable to read data from DB node %d",cp->db_node_id),
176                          errdetail("pool_check_fd call failed with an error \"%s\"", strerror(errno))));
177 			}
178 		}
179 
180 		if (cp->ssl_active > 0)
181 		{
182 		  readlen = pool_ssl_read(cp, readbuf, READBUFSZ);
183 		}
184 		else
185 		{
186 		  readlen = read(cp->fd, readbuf, READBUFSZ);
187 		  if (cp->isbackend)
188 		  {
189 			  ereport(DEBUG1,
190 				  (errmsg("pool_read: read %d bytes from backend %d",
191 						  readlen, cp->db_node_id)));
192 #ifdef DEBUG
193 			  dump_buffer(readbuf, readlen);
194 #endif
195 		  }
196 		}
197 
198 		if (readlen == -1)
199 		{
200 			if (processType == PT_HEALTH_CHECK && health_check_timer_expired && errno == EINTR)
201 			{
202 				ereport(ERROR,
203 						(errmsg("health check timed out while waiting for reading data")));
204 				return -1;
205 			}
206 
207 			if (errno == EINTR || errno == EAGAIN)
208 			{
209 				ereport(DEBUG1,
210 					(errmsg("read on socket failed with error :\"%s\"",strerror(errno)),
211 						 errdetail("retrying...")));
212 				continue;
213 			}
214 
215 			cp->socket_state = POOL_SOCKET_ERROR;
216 			if (cp->isbackend)
217 			{
218 				if (cp->con_info && cp->con_info->swallow_termination == 1)
219 				{
220 					cp->con_info->swallow_termination = 0;
221 					ereport(FATAL,
222 						(errmsg("unable to read data from DB node %d",cp->db_node_id),
223 							 errdetail("pg_terminate_backend was called on the backend")));
224 				}
225 
226 				/* if fail_over_on_backend_error is true, then trigger failover */
227 				if (pool_config->fail_over_on_backend_error)
228 				{
229 					notice_backend_error(cp->db_node_id, REQ_DETAIL_SWITCHOVER);
230 
231                     /* If we are in the main process, we will not exit */
232 					child_exit(POOL_EXIT_AND_RESTART);
233 					ereport(ERROR,
234 							(errmsg("unable to read data from DB node %d",cp->db_node_id),
235 							 errdetail("socket read failed with an error \"%s\"", strerror(errno))));
236 				}
237 				else
238 				{
239 					ereport(ERROR,
240 							(errmsg("unable to read data from DB node %d",cp->db_node_id),
241 							 errdetail("socket read failed with an error \"%s\"", strerror(errno))));
242 				}
243 			}
244 			else
245 			{
246                 ereport(FRONTEND_ERROR,
247 					(errmsg("unable to read data from frontend"),
248                          errdetail("socket read failed with an error \"%s\"", strerror(errno))));
249 			}
250 		}
251 		else if (readlen == 0)
252 		{
253 			cp->socket_state = POOL_SOCKET_EOF;
254 			if (cp->isbackend)
255 			{
256 				if (processType == PT_MAIN || processType == PT_HEALTH_CHECK)
257 					ereport(ERROR,
258 						(errmsg("unable to read data from DB node %d",cp->db_node_id),
259 							 errdetail("EOF encountered with backend")));
260 
261                 ereport(FATAL,
262 					(errmsg("unable to read data from DB node %d",cp->db_node_id),
263                          errdetail("EOF encountered with backend")));
264 			}
265 			else
266 			{
267 				/*
268 				 * if backend offers authentication method, frontend could close connection
269 				 */
270                 ereport(FRONTEND_ERROR,
271 					(errmsg("unable to read data from frontend"),
272                          errdetail("EOF encountered with frontend")));
273 			}
274 		}
275 
276 		if (len < readlen)
277 		{
278 			/* overrun. we need to save remaining data to pending buffer */
279 			save_pending_data(cp, readbuf+len, readlen-len);
280 			memmove(buf, readbuf, len);
281 			break;
282 		}
283 
284 		memmove(buf, readbuf, readlen);
285 		buf += readlen;
286 		len -= readlen;
287 	}
288 
289 	return 0;
290 }
291 
292 /*
293 * read exactly len bytes from cp
294 * returns buffer address on success otherwise NULL.
295 */
pool_read2(POOL_CONNECTION * cp,int len)296 char *pool_read2(POOL_CONNECTION *cp, int len)
297 {
298 	char *buf;
299 	int req_size;
300 	int alloc_size;
301 	int consume_size;
302 	int readlen;
303     MemoryContext oldContext = SwitchToConnectionContext(cp->isbackend);
304 
305 	req_size = cp->len + len;
306 
307 	if (req_size > cp->bufsz2)
308 	{
309 		alloc_size = ((req_size+1)/READBUFSZ+1)*READBUFSZ;
310 		cp->buf2 = repalloc(cp->buf2, alloc_size);
311 		cp->bufsz2 = alloc_size;
312 	}
313 
314 	buf = cp->buf2;
315 
316 	consume_size = consume_pending_data(cp, buf, len);
317 	len -= consume_size;
318 	buf += consume_size;
319 
320 	while (len > 0)
321 	{
322 		if (pool_check_fd(cp))
323 		{
324 			if (!IS_MASTER_NODE_ID(cp->db_node_id))
325 			{
326                 ereport(FATAL,
327                     (errmsg("unable to read data from DB node %d",cp->db_node_id),
328                          errdetail("data is not ready in DB node")));
329 
330 			}
331 			else
332 			{
333                 ereport(ERROR,
334                     (errmsg("unable to read data from DB node %d",cp->db_node_id),
335                          errdetail("pool_check_fd call failed with an error \"%s\"", strerror(errno))));
336             }
337 		}
338 
339 		if (cp->ssl_active > 0)
340 		{
341 		  readlen = pool_ssl_read(cp, buf, len);
342 		}
343 		else
344 		{
345 		  readlen = read(cp->fd, buf, len);
346 		  if (cp->isbackend)
347 			  ereport(DEBUG1,
348 				  (errmsg("pool_read2: read %d bytes from backend %d",
349 						  readlen, cp->db_node_id)));
350 		}
351 
352 		if (readlen == -1)
353 		{
354 			if (errno == EINTR || errno == EAGAIN)
355 			{
356 				ereport(DEBUG1,
357 					(errmsg("read on socket failed with error :\"%s\"",strerror(errno)),
358 						 errdetail("retrying...")));
359 				continue;
360 			}
361 
362 			cp->socket_state = POOL_SOCKET_ERROR;
363 			if (cp->isbackend)
364 			{
365 				if (cp->con_info && cp->con_info->swallow_termination == 1)
366 				{
367 					cp->con_info->swallow_termination = 0;
368 					ereport(FATAL,
369 						(errmsg("unable to read data from DB node %d",cp->db_node_id),
370 							 errdetail("pg_terminate_backend was called on the backend")));
371 				}
372 
373 				/* if fail_over_on_backend_error is true, then trigger failover */
374 				if (pool_config->fail_over_on_backend_error)
375 				{
376 					notice_backend_error(cp->db_node_id, REQ_DETAIL_SWITCHOVER);
377 					child_exit(POOL_EXIT_AND_RESTART);
378                     /* we are in main process */
379                     ereport(ERROR,
380                             (errmsg("unable to read data from DB node %d",cp->db_node_id),
381                              errdetail("socket read failed with an error \"%s\"", strerror(errno))));
382 				}
383 				else
384 				{
385                     ereport(ERROR,
386                         (errmsg("unable to read data from DB node %d",cp->db_node_id),
387                              errdetail("do not failover because fail_over_on_backend_error is off")));
388 				}
389 			}
390 			else
391 			{
392                 ereport(ERROR,
393                     (errmsg("unable to read data from frontend"),
394                          errdetail("socket read function returned -1")));
395 			}
396 		}
397 		else if (readlen == 0)
398 		{
399 			cp->socket_state = POOL_SOCKET_EOF;
400 			if (cp->isbackend)
401 			{
402                 ereport(ERROR,
403                     (errmsg("unable to read data from backend"),
404                          errdetail("EOF read on socket")));
405 			}
406 			else
407 			{
408 				/*
409 				 * if backend offers authentication method, frontend could close connection
410 				 */
411                 ereport(ERROR,
412                     (errmsg("unable to read data from frontend"),
413                          errdetail("EOF read on socket")));
414 
415 			}
416 		}
417 
418 		buf += readlen;
419 		len -= readlen;
420 	}
421     MemoryContextSwitchTo(oldContext);
422 	return cp->buf2;
423 }
424 
425 /*
426  * write len bytes to cp the write buffer.
427  * returns 0 on success otherwise -1.
428  */
pool_write_noerror(POOL_CONNECTION * cp,void * buf,int len)429 int pool_write_noerror(POOL_CONNECTION *cp, void *buf, int len)
430 {
431 	if (len < 0)
432         return -1;
433 
434 	if (cp->no_forward)
435 		return 0;
436 
437 	if (len == 1 && cp->isbackend)
438 	{
439 		char c;
440 
441 		c = ((char *)buf)[0];
442 
443 		ereport(DEBUG1,
444 				(errmsg("pool_write: to backend: %d kind:%c", cp->db_node_id, c)));
445 	}
446 
447 	if (!cp->isbackend)
448 	{
449 		char c;
450 
451 		c = ((char *)buf)[0];
452 
453 		if (len == 1)
454 			ereport(DEBUG1,
455 					(errmsg("pool_write: to frontend: kind:%c po:%d", c, cp->wbufpo)));
456 		else
457 			ereport(DEBUG1,
458 					(errmsg("pool_write: to frontend: length:%d po:%d", len, cp->wbufpo)));
459 	}
460 
461 	while (len > 0)
462 	{
463 		int remainder = WRITEBUFSZ - cp->wbufpo;
464 
465 		/*
466 		 * If requested data cannot be added to the write buffer, flush the
467 		 * buffer and directly write the requested data.  This could avoid
468 		 * unwanted write in the middle of message boundary.
469 		 */
470 		if (remainder < len)
471 		{
472 			if (pool_flush_it(cp) == -1)
473 				return -1;
474 
475 			if (pool_write_flush(cp, buf, len) < 0)
476 				return -1;
477 			return 0;
478 		}
479 
480 		if (cp->wbufpo >= WRITEBUFSZ)
481 		{
482 			/*
483 			 * Write buffer is full. so flush buffer.
484 			 * wbufpo is reset in pool_flush_it().
485 			 */
486 			if (pool_flush_it(cp) == -1)
487 				return -1;
488             remainder = WRITEBUFSZ;
489 		}
490 
491 		/* check buffer size */
492 		remainder = Min(remainder, len);
493 
494 		memcpy(cp->wbuf+cp->wbufpo, buf, remainder);
495 		cp->wbufpo += remainder;
496 		buf += remainder;
497 		len -= remainder;
498 	}
499 	return 0;
500 }
501 
502 /*
503  * write len bytes to cp the write buffer.
504  * returns 0 on success otherwise ereport.
505  */
pool_write(POOL_CONNECTION * cp,void * buf,int len)506 int pool_write(POOL_CONNECTION *cp, void *buf, int len)
507 {
508 	if (len < 0)
509         ereport(ERROR,
510 			(errmsg("unable to write data to %s",cp->isbackend?"backend":"frontend"),
511                  errdetail("invalid data size: %d", len)));
512 
513     if(pool_write_noerror(cp,buf,len))
514         ereport(ERROR,
515             (errmsg("unable to write data to %s",cp->isbackend?"backend":"frontend"),
516                  errdetail("pool_flush failed")));
517 
518     return 0;
519 }
520 
521 
522 /*
523  * Direct write.
524  * This function does not throws an ereport in case of an error
525  */
pool_write_flush(POOL_CONNECTION * cp,void * buf,int len)526 static int pool_write_flush(POOL_CONNECTION *cp, void *buf, int len)
527 {
528 	int sts;
529 	int wlen;
530 	int offset;
531 	wlen = len;
532 
533 	ereport(DEBUG1,
534 			(errmsg("pool_write_flush: write size: %d", wlen)));
535 
536 	if (wlen == 0)
537 	{
538 		return 0;
539 	}
540 
541 	offset = 0;
542 
543 	for (;;)
544 	{
545 		errno = 0;
546 
547 		if (cp->ssl_active > 0)
548 		{
549 			sts = pool_ssl_write(cp, buf+offset, wlen);
550 		}
551 		else
552 		{
553 			sts = write(cp->fd, buf+offset, wlen);
554 		}
555 
556 		if (sts >= 0)
557 		{
558 			wlen -= sts;
559 
560 			if (wlen == 0)
561 			{
562 				/* write completed */
563 				break;
564 			}
565 
566 			else if (wlen < 0)
567 			{
568 				ereport(WARNING,
569 						(errmsg("pool_write_flush: invalid write size %d", sts)));
570 				return -1;
571 			}
572 
573 			else
574 			{
575 				/* need to write remaining data */
576 				ereport(DEBUG1,
577 						(errmsg("pool_write_flush: write retry: %d", wlen)));
578 
579 				offset += sts;
580 				continue;
581 			}
582 		}
583 
584 		else if (errno == EAGAIN || errno == EINTR)
585 		{
586 			continue;
587 		}
588 
589 		else
590 		{
591 			/* If this is the backend stream, report error. Otherwise
592 			 * just report debug message.
593 			 */
594 			if (cp->isbackend)
595 				ereport(WARNING,
596 					(errmsg("write on backend %d failed with error :\"%s\"",cp->db_node_id,strerror(errno)),
597 						 errdetail("while trying to write data from offset: %d wlen: %d",offset, wlen)));
598 			else
599 				ereport(DEBUG1,
600 					(errmsg("write on frontend failed with error :\"%s\"",strerror(errno)),
601 						 errdetail("while trying to write data from offset: %d wlen: %d",offset, wlen)));
602 			return -1;
603 		}
604 	}
605 
606 	return 0;
607 }
608 
609 /*
610  * flush write buffer
611  * This function does not throws an ereport in case of an error
612  */
pool_flush_it(POOL_CONNECTION * cp)613 int pool_flush_it(POOL_CONNECTION *cp)
614 {
615 	int sts;
616 	int wlen;
617 	int offset;
618 	wlen = cp->wbufpo;
619 
620 	ereport(DEBUG1,
621 			(errmsg("pool_flush_it: flush size: %d", wlen)));
622 
623 	if (wlen == 0)
624 	{
625 		return 0;
626 	}
627 
628 	offset = 0;
629 
630 	for (;;)
631 	{
632 		errno = 0;
633 
634 		if (cp->ssl_active > 0)
635 		{
636 		  sts = pool_ssl_write(cp, cp->wbuf + offset, wlen);
637 		}
638 		else
639 		{
640 		  sts = write(cp->fd, cp->wbuf + offset, wlen);
641 		}
642 
643 		if (sts >= 0)
644 		{
645 			wlen -= sts;
646 
647 			if (wlen == 0)
648 			{
649 				/* write completed */
650 				break;
651 			}
652 
653 			else if (wlen < 0)
654 			{
655 				ereport(WARNING,
656 						(errmsg("pool_flush_it: invalid write size %d", sts)));
657 				cp->wbufpo = 0;
658 				return -1;
659 			}
660 
661 			else
662 			{
663 				/* need to write remaining data */
664 				ereport(DEBUG1,
665 						(errmsg("pool_flush_it: write retry: %d", wlen)));
666 
667 				offset += sts;
668 				continue;
669 			}
670 		}
671 
672 		else if (errno == EAGAIN || errno == EINTR)
673 		{
674 			continue;
675 		}
676 
677 		else
678 		{
679 			/* If this is the backend stream, report error. Otherwise
680 			 * just report debug message.
681 			 */
682 			if (cp->isbackend)
683 				ereport(WARNING,
684 					(errmsg("write on backend %d failed with error :\"%s\"",cp->db_node_id,strerror(errno)),
685 						 errdetail("while trying to write data from offset: %d wlen: %d",offset, wlen)));
686 			else
687 				ereport(DEBUG1,
688 					(errmsg("write on frontend failed with error :\"%s\"",strerror(errno)),
689 						 errdetail("while trying to write data from offset: %d wlen: %d",offset, wlen)));
690 			cp->wbufpo = 0;
691 			return -1;
692 		}
693 	}
694 
695 	cp->wbufpo = 0;
696 
697 	return 0;
698 }
699 
700 /*
701  * flush write buffer and degenerate/failover if error occurs
702  */
pool_flush(POOL_CONNECTION * cp)703 int pool_flush(POOL_CONNECTION *cp)
704 {
705 	if (pool_flush_it(cp) == -1)
706 	{
707 		if (cp->isbackend)
708 		{
709 			if (cp->con_info && cp->con_info->swallow_termination == 1)
710 			{
711 				cp->con_info->swallow_termination = 0;
712 				ereport(FATAL,
713 						(errmsg("unable to read data from DB node %d",cp->db_node_id),
714 						 errdetail("pg_terminate_backend was called on the backend")));
715 			}
716 
717 			/* if fail_over_on_backend_error is true, then trigger failover */
718 			if (pool_config->fail_over_on_backend_error)
719 			{
720 				notice_backend_error(cp->db_node_id, REQ_DETAIL_SWITCHOVER);
721 				ereport(LOG,
722 					(errmsg("unable to flush data to backend"),
723 						 errdetail("do not failover because I am the main process")));
724 
725 				child_exit(POOL_EXIT_AND_RESTART);
726 				return -1;
727 			}
728 			else
729 			{
730                 ereport(ERROR,
731                     (errmsg("unable to flush data to backend"),
732                          errdetail("do not failover because fail_over_on_backend_error is off")));
733 			}
734 		}
735 		else
736 		{
737 			/*
738 			 * If we are in replication mode, we need to continue the
739 			 * processing with backends to keep consistency among
740 			 * backends, thus ignore error.
741 			 */
742 			if (REPLICATION)
743                 ereport(NOTICE,
744                     (errmsg("unable to flush data to frontend"),
745                          errdetail("pgpool is in replication mode, ignoring error to keep consistency among backends")));
746 			else
747                 ereport(FRONTEND_ERROR,
748                     (errmsg("unable to flush data to frontend")));
749 
750 		}
751 	}
752 	return 0;
753 }
754 
755 /*
756  * same as pool_flush() but returns -ve value instead of ereport in case of failure
757  */
pool_flush_noerror(POOL_CONNECTION * cp)758 int pool_flush_noerror(POOL_CONNECTION *cp)
759 {
760     if (pool_flush_it(cp) == -1)
761     {
762         if (cp->isbackend)
763         {
764 			if (cp->con_info && cp->con_info->swallow_termination == 1)
765 			{
766 				cp->con_info->swallow_termination = 0;
767 				ereport(FATAL,
768 						(errmsg("unable to read data from DB node %d",cp->db_node_id),
769 						 errdetail("pg_terminate_backend was called on the backend")));
770 			}
771 
772             /* if fail_over_on_backend_erro is true, then trigger failover */
773             if (pool_config->fail_over_on_backend_error)
774             {
775                 notice_backend_error(cp->db_node_id, REQ_DETAIL_SWITCHOVER);
776                 child_exit(POOL_EXIT_AND_RESTART);
777 				ereport(LOG,
778 					(errmsg("unable to flush data to backend"),
779 						 errdetail("do not failover because I am the main process")));
780                 return -1;
781             }
782             else
783             {
784 				ereport(LOG,
785 					(errmsg("unable to flush data to backend"),
786                          errdetail("do not failover because fail_over_on_backend_error is off")));
787                 return -1;
788             }
789         }
790         else
791         {
792             /*
793              * If we are in replication mode, we need to continue the
794              * processing with backends to keep consistency among
795              * backends, thus ignore error.
796              */
797             if (REPLICATION)
798                 return 0;
799             else
800                 return -1;
801         }
802     }
803     return 0;
804 }
805 
806 /*
807  * combo of pool_write and pool_flush
808  */
pool_write_and_flush(POOL_CONNECTION * cp,void * buf,int len)809 void pool_write_and_flush(POOL_CONNECTION *cp, void *buf, int len)
810 {
811 	pool_write(cp, buf, len);
812 	pool_flush(cp);
813 }
814 
815 /*
816  * same as pool_write_and_flush() but does not throws ereport when error occures
817  */
pool_write_and_flush_noerror(POOL_CONNECTION * cp,void * buf,int len)818 int pool_write_and_flush_noerror(POOL_CONNECTION *cp, void *buf, int len)
819 {
820 	int ret;
821 	ret = pool_write_noerror(cp,buf,len);
822 	if(ret == 0)
823 		return pool_flush_noerror(cp);
824 	return ret;
825 }
826 
827 /*
828  * read a string until EOF or NULL is encountered.
829  * if line is not 0, read until new line is encountered.
830 */
pool_read_string(POOL_CONNECTION * cp,int * len,int line)831 char *pool_read_string(POOL_CONNECTION *cp, int *len, int line)
832 {
833 	int readp;
834 	int readsize;
835 	int readlen;
836 	int strlength;
837 	int flag;
838 	int consume_size;
839 
840 #ifdef DEBUG
841 	static char pbuf[READBUFSZ];
842 #endif
843 
844 	*len = 0;
845 	readp = 0;
846 
847 	/* initialize read buffer */
848 	if (cp->sbufsz == 0)
849 	{
850         MemoryContext oldContext = SwitchToConnectionContext(cp->isbackend);
851 		cp->sbuf = palloc(READBUFSZ);
852         MemoryContextSwitchTo(oldContext);
853 
854 		cp->sbufsz = READBUFSZ;
855 		*cp->sbuf = '\0';
856 	}
857 
858 	/* any pending data? */
859 	if (cp->len)
860 	{
861 		if (line)
862 			strlength = mystrlinelen(cp->hp+cp->po, cp->len, &flag);
863 		else
864 			strlength = mystrlen(cp->hp+cp->po, cp->len, &flag);
865 
866 		/* buffer is too small? */
867 		if ((strlength + 1) > cp->sbufsz)
868 		{
869             MemoryContext oldContext = SwitchToConnectionContext(cp->isbackend);
870 			cp->sbufsz = ((strlength+1)/READBUFSZ+1)*READBUFSZ;
871 			cp->sbuf = repalloc(cp->sbuf, cp->sbufsz);
872             MemoryContextSwitchTo(oldContext);
873 		}
874 
875 		/* consume pending and save to read string buffer */
876 		consume_size = consume_pending_data(cp, cp->sbuf, strlength);
877 
878 		*len = strlength;
879 
880 		/* is the string null terminated? */
881 		if (consume_size == strlength && !flag)
882 		{
883 			/* not null or line terminated.
884 			 * we need to read more since we have not encountered NULL or new line yet
885 			 */
886 			readsize = cp->sbufsz - strlength;
887 			readp = strlength;
888 		}
889 		else
890 		{
891 			ereport(DEBUG1,
892 				(errmsg("reading string data"),
893 					 errdetail("read all from pending data. po:%d len:%d",
894 							   cp->po, cp->len)));
895 			return cp->sbuf;
896 		}
897 	} else
898 	{
899 		readsize = cp->sbufsz;
900 	}
901 
902 	for (;;)
903 	{
904 		if (pool_check_fd(cp))
905 		{
906 			if (!IS_MASTER_NODE_ID(cp->db_node_id))
907 			{
908                 ereport(FATAL,
909                     (errmsg("unable to read data from DB node %d",cp->db_node_id),
910                          errdetail("data is not ready in DB node")));
911 
912 			}
913 			else
914 			{
915                 ereport(ERROR,
916                     (errmsg("unable to read data from DB node %d",cp->db_node_id),
917                          errdetail("pool_check_fd call failed with an error \"%s\"", strerror(errno))));
918             }
919 		}
920 
921 		if (cp->ssl_active > 0)
922 		{
923 		  readlen = pool_ssl_read(cp, cp->sbuf+readp, readsize);
924 		}
925 		else
926 		{
927 		  readlen = read(cp->fd, cp->sbuf+readp, readsize);
928 		}
929 
930 		if (readlen == -1)
931 		{
932 			cp->socket_state = POOL_SOCKET_ERROR;
933 			if (cp->isbackend)
934 			{
935 				if (cp->con_info && cp->con_info->swallow_termination == 1)
936 				{
937 					cp->con_info->swallow_termination = 0;
938 					ereport(FATAL,
939 							(errmsg("unable to read data from DB node %d",cp->db_node_id),
940 							 errdetail("pg_terminate_backend was called on the backend")));
941 				}
942 
943 				notice_backend_error(cp->db_node_id, REQ_DETAIL_SWITCHOVER);
944 				child_exit(POOL_EXIT_AND_RESTART);
945                 ereport(ERROR,
946                         (errmsg("unable to read data from frontend"),
947                          errdetail("socket read function returned -1")));
948 			}
949 			else
950 			{
951                 ereport(ERROR,
952                     (errmsg("unable to read data from frontend"),
953                          errdetail("socket read function returned -1")));
954 
955 			}
956 		}
957 		else if (readlen == 0)	/* EOF detected */
958 		{
959 			/*
960 			 * just returns an error, not trigger failover or degeneration
961 			 */
962 			cp->socket_state = POOL_SOCKET_EOF;
963             ereport(ERROR,
964                 (errmsg("unable to read data from %s",cp->isbackend?"backend":"frontend"),
965                      errdetail("EOF read on socket")));
966 
967 		}
968 
969 		/* check overrun */
970 		if (line)
971 			strlength = mystrlinelen(cp->sbuf+readp, readlen, &flag);
972 		else
973 			strlength = mystrlen(cp->sbuf+readp, readlen, &flag);
974 
975 		if (strlength < readlen)
976 		{
977 			save_pending_data(cp, cp->sbuf+readp+strlength, readlen-strlength);
978 			*len += strlength;
979 			ereport(DEBUG1,
980 				(errmsg("reading string data"),
981 					 errdetail("total read %d with pending data po:%d len:%d", *len, cp->po, cp->len)));
982 			return cp->sbuf;
983 		}
984 
985 		*len += readlen;
986 
987 		/* encountered null or newline? */
988 		if (flag)
989 		{
990 			/* ok we have read all data */
991 			ereport(DEBUG1,
992 				(errmsg("reading string data"),
993 					 errdetail("all data read: total read %d", *len)));
994 			break;
995 		}
996 
997 		readp += readlen;
998 		readsize = READBUFSZ;
999 
1000 		if ((*len+readsize) > cp->sbufsz)
1001 		{
1002 			cp->sbufsz += READBUFSZ;
1003             MemoryContext oldContext = SwitchToConnectionContext(cp->isbackend);
1004 			cp->sbuf = repalloc(cp->sbuf, cp->sbufsz);
1005             MemoryContextSwitchTo(oldContext);
1006 		}
1007 	}
1008 	return cp->sbuf;
1009 }
1010 
1011 /*
1012  * Set db node id to connection.
1013  */
pool_set_db_node_id(POOL_CONNECTION * con,int db_node_id)1014 void pool_set_db_node_id(POOL_CONNECTION *con, int db_node_id)
1015 {
1016 	if (!con)
1017 		return;
1018 	con->db_node_id = db_node_id;
1019 }
1020 
/*
 * Return the number of bytes of str up to and including the first '\0',
 * examining at most upper bytes (0 when upper <= 0).
 * *flag is set to 1 if a '\0' was found within that range, 0 otherwise.
 * example:
 *	mystrlen("abc", 2) returns 2
 *	mystrlen("abc", 3) returns 3
 *	mystrlen("abc", 4) returns 4
 *	mystrlen("abc", 5) returns 4
 */
static int
mystrlen(char *str, int upper, int *flag)
{
	char	   *nul;

	*flag = 0;
	if (upper <= 0)
		return 0;

	/* memchr stops at the first match, so no byte past the NUL is read */
	nul = memchr(str, '\0', (size_t) upper);
	if (nul == NULL)
		return upper;

	*flag = 1;
	return (int) (nul - str) + 1;
}
1047 
/*
 * Return the number of bytes of str up to and including the first '\n' or
 * '\0', examining at most upper bytes (0 when upper <= 0).
 * *flag is set to 1 if such a terminator was found, 0 otherwise.
 * example:
 *	mystrlinelen("abc", 2) returns 2
 *	mystrlinelen("abc", 3) returns 3
 *	mystrlinelen("abc", 4) returns 4
 *	mystrlinelen("abc", 5) returns 4
 *	mystrlinelen("abcd\nefg", 4) returns 4
 *	mystrlinelen("abcd\nefg", 5) returns 5
 *	mystrlinelen("abcd\nefg", 6) returns 5
 */
static int
mystrlinelen(char *str, int upper, int *flag)
{
	int			i = 0;

	*flag = 0;
	while (i < upper)
	{
		char		c = str[i++];	/* i now counts c as well */

		if (c == '\0' || c == '\n')
		{
			*flag = 1;
			break;
		}
	}
	return i;
}
1077 
1078 /*
1079  * save pending data
1080  */
save_pending_data(POOL_CONNECTION * cp,void * data,int len)1081 static int save_pending_data(POOL_CONNECTION *cp, void *data, int len)
1082 {
1083 	int reqlen;
1084 	size_t realloc_size;
1085 	char *p;
1086 
1087 	/* to be safe */
1088 	if (cp->len == 0)
1089 		cp->po = 0;
1090 
1091 	reqlen = cp->po + cp->len + len;
1092 
1093 	/* pending buffer is enough? */
1094 	if (reqlen > cp->bufsz)
1095 	{
1096 		/* too small, enlarge it */
1097 		realloc_size = (reqlen/READBUFSZ+1)*READBUFSZ;
1098 
1099         MemoryContext oldContext = SwitchToConnectionContext(cp->isbackend);
1100 		p = repalloc(cp->hp, realloc_size);
1101         MemoryContextSwitchTo(oldContext);
1102 
1103 		cp->bufsz = realloc_size;
1104 		cp->hp = p;
1105 	}
1106 
1107 	memmove(cp->hp + cp->po + cp->len, data, len);
1108 	cp->len += len;
1109 
1110 	return 0;
1111 }
1112 
1113 /*
1114  * consume pending data. returns actually consumed data length.
1115  */
consume_pending_data(POOL_CONNECTION * cp,void * data,int len)1116 static int consume_pending_data(POOL_CONNECTION *cp, void *data, int len)
1117 {
1118 	int consume_size;
1119 
1120 	if (cp->len <= 0)
1121 		return 0;
1122 
1123 	consume_size = Min(len, cp->len);
1124 	memmove(data, cp->hp + cp->po, consume_size);
1125 	cp->len -= consume_size;
1126 
1127 	if (cp->len <= 0)
1128 		cp->po = 0;
1129 	else
1130 		cp->po += consume_size;
1131 
1132 	return consume_size;
1133 }
1134 
1135 /*
1136  * pool_unread: Put back data to input buffer
1137  */
pool_unread(POOL_CONNECTION * cp,void * data,int len)1138 int pool_unread(POOL_CONNECTION *cp, void *data, int len)
1139 {
1140 	void *p = cp->hp;
1141 	int n = cp->len + len;
1142 	int realloc_size;
1143 
1144 	/*
1145 	 * Optimization to avoid mmove. If there's enough space in front of
1146 	 * existing data, we can use it.
1147 	 */
1148 	if (cp->po >= len)
1149 	{
1150 		memmove(cp->hp + cp->po - len, data, len);
1151 		cp->po -= len;
1152 		cp->len = n;
1153 		return 0;
1154 	}
1155 
1156 	if (cp->bufsz < n)
1157 	{
1158 		realloc_size = (n/READBUFSZ+1)*READBUFSZ;
1159 
1160         MemoryContext oldContext = SwitchToConnectionContext(cp->isbackend);
1161 		p = repalloc(cp->hp, realloc_size);
1162         MemoryContextSwitchTo(oldContext);
1163 
1164         cp->hp = p;
1165 		cp->bufsz = realloc_size;
1166 	}
1167 	if (cp->len != 0)
1168 		memmove(p + len, cp->hp + cp->po, cp->len);
1169 	memmove(p, data, len);
1170 	cp->len = n;
1171 	cp->po = 0;
1172 	return 0;
1173 }
1174 
1175 /*
1176  * pool_push: Push data into buffer stack.
1177  */
pool_push(POOL_CONNECTION * cp,void * data,int len)1178 int pool_push(POOL_CONNECTION *cp, void *data, int len)
1179 {
1180 	char *p;
1181 
1182 	ereport(DEBUG1,
1183 		(errmsg("pushing data of len: %d", len)));
1184 
1185 
1186     MemoryContext oldContext = SwitchToConnectionContext(cp->isbackend);
1187 
1188 	if (cp->bufsz3 == 0)
1189 	{
1190 		p = cp->buf3 = palloc(len);
1191 	}
1192 	else
1193 	{
1194 		cp->buf3 = repalloc(cp->buf3, cp->bufsz3 + len);
1195 		p = cp->buf3 + cp->bufsz3;
1196 	}
1197 
1198 	memcpy(p, data, len);
1199 	cp->bufsz3 += len;
1200 
1201     MemoryContextSwitchTo(oldContext);
1202 	return 0;
1203 }
1204 
1205 /*
1206  * pool_pop: Pop data from buffer stack and put back data using
1207  * pool_unread.
1208  */
pool_pop(POOL_CONNECTION * cp,int * len)1209 void pool_pop(POOL_CONNECTION *cp, int *len)
1210 {
1211 	if (cp->bufsz3 == 0)
1212 	{
1213 		*len = 0;
1214 		ereport(DEBUG1,
1215 				(errmsg("pop data of len: %d", *len)));
1216 		return;
1217 	}
1218 
1219 	pool_unread(cp, cp->buf3, cp->bufsz3);
1220 	*len = cp->bufsz3;
1221 	pfree(cp->buf3);
1222 	cp->bufsz3 = 0;
1223 	cp->buf3 = NULL;
1224 	ereport(DEBUG1,
1225 			(errmsg("pop data of len: %d", *len)));
1226 }
1227 
1228 /*
1229  * pool_stacklen: Returns buffer stack length
1230  * pool_unread.
1231  */
pool_stacklen(POOL_CONNECTION * cp)1232 int pool_stacklen(POOL_CONNECTION *cp)
1233 {
1234 	return cp->bufsz3;
1235 }
1236 
1237 /*
1238  * set non-block flag
1239  */
pool_set_nonblock(int fd)1240 void pool_set_nonblock(int fd)
1241 {
1242 	int var;
1243 
1244 	/* set fd to none blocking */
1245 	var = fcntl(fd, F_GETFL, 0);
1246 	if (var == -1)
1247 	{
1248         ereport(FATAL,
1249             (errmsg("unable to set options on socket"),
1250                  errdetail("fcntl system call failed with error \"%s\"", strerror(errno))));
1251 
1252 	}
1253 	if (fcntl(fd, F_SETFL, var | O_NONBLOCK) == -1)
1254 	{
1255         ereport(FATAL,
1256             (errmsg("unable to set options on socket"),
1257                  errdetail("fcntl system call failed with error \"%s\"", strerror(errno))));
1258 	}
1259 }
1260 
1261 /*
1262  * unset non-block flag
1263  */
pool_unset_nonblock(int fd)1264 void pool_unset_nonblock(int fd)
1265 {
1266 	int var;
1267 
1268 	/* set fd to none blocking */
1269 	var = fcntl(fd, F_GETFL, 0);
1270 	if (var == -1)
1271 	{
1272         ereport(FATAL,
1273             (errmsg("unable to set options on socket"),
1274                  errdetail("fcntl system call failed with error \"%s\"", strerror(errno))));
1275 	}
1276 	if (fcntl(fd, F_SETFL, var & ~O_NONBLOCK) == -1)
1277 	{
1278         ereport(FATAL,
1279             (errmsg("unable to set options on socket"),
1280                  errdetail("fcntl system call failed with error \"%s\"", strerror(errno))));
1281 	}
1282 }
1283 
#ifdef DEBUG
/*
 * Debug aid: log each of the first "len" bytes of "buf" as a two-digit
 * hexadecimal value at DEBUG1.  Nothing is logged when len <= 0.
 */
static void dump_buffer(char *buf, int len)
{
	int			i;

	/*
	 * Iterate over exactly [0, len).  A "while (--len)" loop would skip the
	 * last byte and, for len <= 0, run past the end of the buffer.
	 */
	for (i = 0; i < len; i++)
	{
		/* unsigned char so bytes >= 0x80 print as "80", not "ffffff80" */
		ereport(DEBUG1,
				(errmsg("%02x", (unsigned char) buf[i])));
	}
}
#endif
socket_write(int fd,void * buf,size_t len)1297 int socket_write(int fd, void* buf, size_t len)
1298 {
1299 	int bytes_send = 0;
1300 	do
1301 	{
1302 		int ret;
1303 		ret = write(fd, buf + bytes_send, (len - bytes_send));
1304 		if (ret <=0)
1305 		{
1306 			if (errno == EINTR || errno == EAGAIN)
1307 			{
1308 				ereport(DEBUG1,
1309 						(errmsg("write on socket failed with error :\"%s\"",strerror(errno)),
1310 						 errdetail("retrying...")));
1311 				continue;
1312 			}
1313 			ereport(LOG,
1314 					(errmsg("write on socket failed with error :\"%s\"",strerror(errno))));
1315 			return -1;
1316 		}
1317 		bytes_send += ret;
1318 	}while (bytes_send < len);
1319 	return bytes_send;
1320 }
1321 
socket_read(int fd,void * buf,size_t len,int timeout)1322 int socket_read(int fd, void* buf, size_t len, int timeout)
1323 {
1324 	int ret, read_len;
1325 	read_len = 0;
1326 	struct timeval timeoutval;
1327 	fd_set readmask;
1328 	int fds;
1329 
1330 	while (read_len < len)
1331 	{
1332 		FD_ZERO(&readmask);
1333 		FD_SET(fd, &readmask);
1334 
1335 		timeoutval.tv_sec = timeout;
1336 		timeoutval.tv_usec = 0;
1337 
1338 		fds = select(fd+1, &readmask, NULL, NULL, timeout?&timeoutval:NULL);
1339 		if (fds == -1)
1340 		{
1341 			if (errno == EAGAIN || errno == EINTR)
1342 				continue;
1343 
1344 			ereport(WARNING,
1345 					(errmsg("select failed with error: \"%s\"", strerror(errno))));
1346 			return -1;
1347 		}
1348 		else if (fds == 0)
1349 		{
1350 			return -2;
1351 		}
1352 		ret = read(fd, buf + read_len, (len - read_len));
1353 		if(ret < 0)
1354 		{
1355 			if (errno == EINTR || errno == EAGAIN)
1356 			{
1357 				ereport(DEBUG1,
1358 						(errmsg("read from socket failed with error :\"%s\"",strerror(errno)),
1359 						 errdetail("retrying...")));
1360 				continue;
1361 			}
1362 			ereport(LOG,
1363 					(errmsg("read from socket failed with error :\"%s\"",strerror(errno))));
1364 			return -1;
1365 		}
1366 		if(ret == 0)
1367 		{
1368 			ereport(LOG,
1369 					(errmsg("read from socket failed, remote end closed the connection")));
1370 			return 0;
1371 		}
1372 		read_len +=ret;
1373 	}
1374 	return read_len;
1375 }
1376