1 /*-------------------------------------------------------------------------
2  *
3  * pqcomm.c
4  *	  Communication functions between the Frontend and the Backend
5  *
6  * These routines handle the low-level details of communication between
7  * frontend and backend.  They just shove data across the communication
8  * channel, and are ignorant of the semantics of the data --- or would be,
9  * except for major brain damage in the design of the old COPY OUT protocol.
10  * Unfortunately, COPY OUT was designed to commandeer the communication
11  * channel (it just transfers data without wrapping it into messages).
12  * No other messages can be sent while COPY OUT is in progress; and if the
13  * copy is aborted by an ereport(ERROR), we need to close out the copy so that
14  * the frontend gets back into sync.  Therefore, these routines have to be
15  * aware of COPY OUT state.  (New COPY-OUT is message-based and does *not*
16  * set the DoingCopyOut flag.)
17  *
18  * NOTE: generally, it's a bad idea to emit outgoing messages directly with
19  * pq_putbytes(), especially if the message would require multiple calls
20  * to send.  Instead, use the routines in pqformat.c to construct the message
21  * in a buffer and then emit it in one call to pq_putmessage.  This ensures
22  * that the channel will not be clogged by an incomplete message if execution
23  * is aborted by ereport(ERROR) partway through the message.  The only
24  * non-libpq code that should call pq_putbytes directly is old-style COPY OUT.
25  *
26  * At one time, libpq was shared between frontend and backend, but now
27  * the backend's "backend/libpq" is quite separate from "interfaces/libpq".
28  * All that remains is similarities of names to trap the unwary...
29  *
30  * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
31  * Portions Copyright (c) 1994, Regents of the University of California
32  *
33  *	src/backend/libpq/pqcomm.c
34  *
35  *-------------------------------------------------------------------------
36  */
37 
38 /*------------------------
39  * INTERFACE ROUTINES
40  *
41  * setup/teardown:
42  *		StreamServerPort	- Open postmaster's server port
43  *		StreamConnection	- Create new connection with client
44  *		StreamClose			- Close a client/backend connection
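 *		TouchSocketFiles	- Protect socket files against /tmp cleaners
 *		RemoveSocketFiles	- Unlink socket files at postmaster shutdown
 *		pq_init			- initialize libpq at backend startup
 *		pq_comm_reset	- reset libpq during error recovery
 *						  (see socket_comm_reset in this file)
 *		pq_close		- shutdown libpq at backend exit
 *						  (see socket_close in this file)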
49  *
50  * low-level I/O:
51  *		pq_getbytes		- get a known number of bytes from connection
52  *		pq_getstring	- get a null terminated string from connection
53  *		pq_getmessage	- get a message with length word from connection
54  *		pq_getbyte		- get next byte from connection
55  *		pq_peekbyte		- peek at next byte from connection
56  *		pq_putbytes		- send bytes to connection (not flushed until pq_flush)
57  *		pq_flush		- flush pending output
58  *		pq_flush_if_writable - flush pending output if writable without blocking
59  *		pq_getbyte_if_available - get a byte if available without blocking
60  *
61  * message-level I/O (and old-style-COPY-OUT cruft):
62  *		pq_putmessage	- send a normal message (suppressed in COPY OUT mode)
63  *		pq_putmessage_noblock - buffer a normal message (suppressed in COPY OUT)
64  *		pq_startcopyout - inform libpq that a COPY OUT transfer is beginning
65  *		pq_endcopyout	- end a COPY OUT transfer
66  *
67  *------------------------
68  */
69 #include "postgres.h"
70 
71 #include <signal.h>
72 #include <fcntl.h>
73 #include <grp.h>
74 #include <unistd.h>
75 #include <sys/file.h>
76 #include <sys/socket.h>
77 #include <sys/stat.h>
78 #include <sys/time.h>
79 #include <netdb.h>
80 #include <netinet/in.h>
81 #ifdef HAVE_NETINET_TCP_H
82 #include <netinet/tcp.h>
83 #endif
84 #include <arpa/inet.h>
85 #ifdef HAVE_UTIME_H
86 #include <utime.h>
87 #endif
88 #ifdef _MSC_VER					/* mstcpip.h is missing on mingw */
89 #include <mstcpip.h>
90 #endif
91 
92 #include "common/ip.h"
93 #include "libpq/libpq.h"
94 #include "miscadmin.h"
95 #include "storage/ipc.h"
96 #include "utils/guc.h"
97 #include "utils/memutils.h"
98 
/*
 * The TCP keepalive idle-time socket option goes by different names on
 * different platforms.  Pick whichever spelling is available and expose it
 * as PG_TCP_KEEPALIVE_IDLE, with the chosen name available as a string
 * literal in PG_TCP_KEEPALIVE_IDLE_STR.  If none of the spellings below is
 * usable, neither macro gets defined.  Windows is not covered here; as
 * usual it does its own thing.
 */
#ifdef TCP_KEEPIDLE
/* Linux and the *BSDs spell it TCP_KEEPIDLE */
#define PG_TCP_KEEPALIVE_IDLE TCP_KEEPIDLE
#define PG_TCP_KEEPALIVE_IDLE_STR "TCP_KEEPIDLE"
#elif defined(TCP_KEEPALIVE_THRESHOLD)
/* Solaris 11 and later spell it TCP_KEEPALIVE_THRESHOLD */
#define PG_TCP_KEEPALIVE_IDLE TCP_KEEPALIVE_THRESHOLD
#define PG_TCP_KEEPALIVE_IDLE_STR "TCP_KEEPALIVE_THRESHOLD"
#elif defined(__darwin__) && defined(TCP_KEEPALIVE)
/*
 * macOS spells it TCP_KEEPALIVE.  The platform test is essential: Solaris
 * has a TCP_KEEPALIVE symbol too, but there it means something different.
 */
#define PG_TCP_KEEPALIVE_IDLE TCP_KEEPALIVE
#define PG_TCP_KEEPALIVE_IDLE_STR "TCP_KEEPALIVE"
#endif
117 
118 /*
119  * Configuration options
120  */
121 int			Unix_socket_permissions;
122 char	   *Unix_socket_group;
123 
124 /* Where the Unix socket files are (list of palloc'd strings) */
125 static List *sock_paths = NIL;
126 
127 /*
128  * Buffers for low-level I/O.
129  *
130  * The receive buffer is fixed size. Send buffer is usually 8k, but can be
131  * enlarged by pq_putmessage_noblock() if the message doesn't fit otherwise.
132  */
133 
134 #define PQ_SEND_BUFFER_SIZE 8192
135 #define PQ_RECV_BUFFER_SIZE 8192
136 
137 static char *PqSendBuffer;
138 static int	PqSendBufferSize;	/* Size send buffer */
139 static int	PqSendPointer;		/* Next index to store a byte in PqSendBuffer */
140 static int	PqSendStart;		/* Next index to send a byte in PqSendBuffer */
141 
142 static char PqRecvBuffer[PQ_RECV_BUFFER_SIZE];
143 static int	PqRecvPointer;		/* Next index to read a byte from PqRecvBuffer */
144 static int	PqRecvLength;		/* End of data available in PqRecvBuffer */
145 
146 /*
147  * Message status
148  */
149 static bool PqCommBusy;			/* busy sending data to the client */
150 static bool PqCommReadingMsg;	/* in the middle of reading a message */
151 static bool DoingCopyOut;		/* in old-protocol COPY OUT processing */
152 
153 
154 /* Internal functions */
155 static void socket_comm_reset(void);
156 static void socket_close(int code, Datum arg);
157 static void socket_set_nonblocking(bool nonblocking);
158 static int	socket_flush(void);
159 static int	socket_flush_if_writable(void);
160 static bool socket_is_send_pending(void);
161 static int	socket_putmessage(char msgtype, const char *s, size_t len);
162 static void socket_putmessage_noblock(char msgtype, const char *s, size_t len);
163 static void socket_startcopyout(void);
164 static void socket_endcopyout(bool errorAbort);
165 static int	internal_putbytes(const char *s, size_t len);
166 static int	internal_flush(void);
167 
168 #ifdef HAVE_UNIX_SOCKETS
169 static int	Lock_AF_UNIX(char *unixSocketDir, char *unixSocketPath);
170 static int	Setup_AF_UNIX(char *sock_path);
171 #endif							/* HAVE_UNIX_SOCKETS */
172 
173 static PQcommMethods PqCommSocketMethods = {
174 	socket_comm_reset,
175 	socket_flush,
176 	socket_flush_if_writable,
177 	socket_is_send_pending,
178 	socket_putmessage,
179 	socket_putmessage_noblock,
180 	socket_startcopyout,
181 	socket_endcopyout
182 };
183 
184 PQcommMethods *PqCommMethods = &PqCommSocketMethods;
185 
186 WaitEventSet *FeBeWaitSet;
187 
188 
189 /* --------------------------------
190  *		pq_init - initialize libpq at backend startup
191  * --------------------------------
192  */
193 void
pq_init(void)194 pq_init(void)
195 {
196 	/* initialize state variables */
197 	PqSendBufferSize = PQ_SEND_BUFFER_SIZE;
198 	PqSendBuffer = MemoryContextAlloc(TopMemoryContext, PqSendBufferSize);
199 	PqSendPointer = PqSendStart = PqRecvPointer = PqRecvLength = 0;
200 	PqCommBusy = false;
201 	PqCommReadingMsg = false;
202 	DoingCopyOut = false;
203 
204 	/* set up process-exit hook to close the socket */
205 	on_proc_exit(socket_close, 0);
206 
207 	/*
208 	 * In backends (as soon as forked) we operate the underlying socket in
209 	 * nonblocking mode and use latches to implement blocking semantics if
210 	 * needed. That allows us to provide safely interruptible reads and
211 	 * writes.
212 	 *
213 	 * Use COMMERROR on failure, because ERROR would try to send the error to
214 	 * the client, which might require changing the mode again, leading to
215 	 * infinite recursion.
216 	 */
217 #ifndef WIN32
218 	if (!pg_set_noblock(MyProcPort->sock))
219 		ereport(COMMERROR,
220 				(errmsg("could not set socket to nonblocking mode: %m")));
221 #endif
222 
223 	FeBeWaitSet = CreateWaitEventSet(TopMemoryContext, 3);
224 	AddWaitEventToSet(FeBeWaitSet, WL_SOCKET_WRITEABLE, MyProcPort->sock,
225 					  NULL, NULL);
226 	AddWaitEventToSet(FeBeWaitSet, WL_LATCH_SET, -1, MyLatch, NULL);
227 	AddWaitEventToSet(FeBeWaitSet, WL_POSTMASTER_DEATH, -1, NULL, NULL);
228 }
229 
230 /* --------------------------------
231  *		socket_comm_reset - reset libpq during error recovery
232  *
233  * This is called from error recovery at the outer idle loop.  It's
234  * just to get us out of trouble if we somehow manage to elog() from
235  * inside a pqcomm.c routine (which ideally will never happen, but...)
236  * --------------------------------
237  */
238 static void
socket_comm_reset(void)239 socket_comm_reset(void)
240 {
241 	/* Do not throw away pending data, but do reset the busy flag */
242 	PqCommBusy = false;
243 	/* We can abort any old-style COPY OUT, too */
244 	pq_endcopyout(true);
245 }
246 
247 /* --------------------------------
248  *		socket_close - shutdown libpq at backend exit
249  *
250  * This is the one pg_on_exit_callback in place during BackendInitialize().
251  * That function's unusual signal handling constrains that this callback be
252  * safe to run at any instant.
253  * --------------------------------
254  */
255 static void
socket_close(int code,Datum arg)256 socket_close(int code, Datum arg)
257 {
258 	/* Nothing to do in a standalone backend, where MyProcPort is NULL. */
259 	if (MyProcPort != NULL)
260 	{
261 #if defined(ENABLE_GSS) || defined(ENABLE_SSPI)
262 #ifdef ENABLE_GSS
263 		OM_uint32	min_s;
264 
265 		/*
266 		 * Shutdown GSSAPI layer.  This section does nothing when interrupting
267 		 * BackendInitialize(), because pg_GSS_recvauth() makes first use of
268 		 * "ctx" and "cred".
269 		 */
270 		if (MyProcPort->gss->ctx != GSS_C_NO_CONTEXT)
271 			gss_delete_sec_context(&min_s, &MyProcPort->gss->ctx, NULL);
272 
273 		if (MyProcPort->gss->cred != GSS_C_NO_CREDENTIAL)
274 			gss_release_cred(&min_s, &MyProcPort->gss->cred);
275 #endif							/* ENABLE_GSS */
276 
277 		/*
278 		 * GSS and SSPI share the port->gss struct.  Since nowhere else does a
279 		 * postmaster child free this, doing so is safe when interrupting
280 		 * BackendInitialize().
281 		 */
282 		free(MyProcPort->gss);
283 #endif							/* ENABLE_GSS || ENABLE_SSPI */
284 
285 		/*
286 		 * Cleanly shut down SSL layer.  Nowhere else does a postmaster child
287 		 * call this, so this is safe when interrupting BackendInitialize().
288 		 */
289 		secure_close(MyProcPort);
290 
291 		/*
292 		 * Formerly we did an explicit close() here, but it seems better to
293 		 * leave the socket open until the process dies.  This allows clients
294 		 * to perform a "synchronous close" if they care --- wait till the
295 		 * transport layer reports connection closure, and you can be sure the
296 		 * backend has exited.
297 		 *
298 		 * We do set sock to PGINVALID_SOCKET to prevent any further I/O,
299 		 * though.
300 		 */
301 		MyProcPort->sock = PGINVALID_SOCKET;
302 	}
303 }
304 
305 
306 
307 /*
308  * Streams -- wrapper around Unix socket system calls
309  *
310  *
311  *		Stream functions are used for vanilla TCP connection protocol.
312  */
313 
314 
315 /*
316  * StreamServerPort -- open a "listening" port to accept connections.
317  *
318  * family should be AF_UNIX or AF_UNSPEC; portNumber is the port number.
319  * For AF_UNIX ports, hostName should be NULL and unixSocketDir must be
320  * specified.  For TCP ports, hostName is either NULL for all interfaces or
321  * the interface to listen on, and unixSocketDir is ignored (can be NULL).
322  *
323  * Successfully opened sockets are added to the ListenSocket[] array (of
324  * length MaxListen), at the first position that isn't PGINVALID_SOCKET.
325  *
326  * RETURNS: STATUS_OK or STATUS_ERROR
327  */
328 
329 int
StreamServerPort(int family,char * hostName,unsigned short portNumber,char * unixSocketDir,pgsocket ListenSocket[],int MaxListen)330 StreamServerPort(int family, char *hostName, unsigned short portNumber,
331 				 char *unixSocketDir,
332 				 pgsocket ListenSocket[], int MaxListen)
333 {
334 	pgsocket	fd;
335 	int			err;
336 	int			maxconn;
337 	int			ret;
338 	char		portNumberStr[32];
339 	const char *familyDesc;
340 	char		familyDescBuf[64];
341 	const char *addrDesc;
342 	char		addrBuf[NI_MAXHOST];
343 	char	   *service;
344 	struct addrinfo *addrs = NULL,
345 			   *addr;
346 	struct addrinfo hint;
347 	int			listen_index = 0;
348 	int			added = 0;
349 
350 #ifdef HAVE_UNIX_SOCKETS
351 	char		unixSocketPath[MAXPGPATH];
352 #endif
353 #if !defined(WIN32) || defined(IPV6_V6ONLY)
354 	int			one = 1;
355 #endif
356 
357 	/* Initialize hint structure */
358 	MemSet(&hint, 0, sizeof(hint));
359 	hint.ai_family = family;
360 	hint.ai_flags = AI_PASSIVE;
361 	hint.ai_socktype = SOCK_STREAM;
362 
363 #ifdef HAVE_UNIX_SOCKETS
364 	if (family == AF_UNIX)
365 	{
366 		/*
367 		 * Create unixSocketPath from portNumber and unixSocketDir and lock
368 		 * that file path
369 		 */
370 		UNIXSOCK_PATH(unixSocketPath, portNumber, unixSocketDir);
371 		if (strlen(unixSocketPath) >= UNIXSOCK_PATH_BUFLEN)
372 		{
373 			ereport(LOG,
374 					(errmsg("Unix-domain socket path \"%s\" is too long (maximum %d bytes)",
375 							unixSocketPath,
376 							(int) (UNIXSOCK_PATH_BUFLEN - 1))));
377 			return STATUS_ERROR;
378 		}
379 		if (Lock_AF_UNIX(unixSocketDir, unixSocketPath) != STATUS_OK)
380 			return STATUS_ERROR;
381 		service = unixSocketPath;
382 	}
383 	else
384 #endif							/* HAVE_UNIX_SOCKETS */
385 	{
386 		snprintf(portNumberStr, sizeof(portNumberStr), "%d", portNumber);
387 		service = portNumberStr;
388 	}
389 
390 	ret = pg_getaddrinfo_all(hostName, service, &hint, &addrs);
391 	if (ret || !addrs)
392 	{
393 		if (hostName)
394 			ereport(LOG,
395 					(errmsg("could not translate host name \"%s\", service \"%s\" to address: %s",
396 							hostName, service, gai_strerror(ret))));
397 		else
398 			ereport(LOG,
399 					(errmsg("could not translate service \"%s\" to address: %s",
400 							service, gai_strerror(ret))));
401 		if (addrs)
402 			pg_freeaddrinfo_all(hint.ai_family, addrs);
403 		return STATUS_ERROR;
404 	}
405 
406 	for (addr = addrs; addr; addr = addr->ai_next)
407 	{
408 		if (!IS_AF_UNIX(family) && IS_AF_UNIX(addr->ai_family))
409 		{
410 			/*
411 			 * Only set up a unix domain socket when they really asked for it.
412 			 * The service/port is different in that case.
413 			 */
414 			continue;
415 		}
416 
417 		/* See if there is still room to add 1 more socket. */
418 		for (; listen_index < MaxListen; listen_index++)
419 		{
420 			if (ListenSocket[listen_index] == PGINVALID_SOCKET)
421 				break;
422 		}
423 		if (listen_index >= MaxListen)
424 		{
425 			ereport(LOG,
426 					(errmsg("could not bind to all requested addresses: MAXLISTEN (%d) exceeded",
427 							MaxListen)));
428 			break;
429 		}
430 
431 		/* set up address family name for log messages */
432 		switch (addr->ai_family)
433 		{
434 			case AF_INET:
435 				familyDesc = _("IPv4");
436 				break;
437 #ifdef HAVE_IPV6
438 			case AF_INET6:
439 				familyDesc = _("IPv6");
440 				break;
441 #endif
442 #ifdef HAVE_UNIX_SOCKETS
443 			case AF_UNIX:
444 				familyDesc = _("Unix");
445 				break;
446 #endif
447 			default:
448 				snprintf(familyDescBuf, sizeof(familyDescBuf),
449 						 _("unrecognized address family %d"),
450 						 addr->ai_family);
451 				familyDesc = familyDescBuf;
452 				break;
453 		}
454 
455 		/* set up text form of address for log messages */
456 #ifdef HAVE_UNIX_SOCKETS
457 		if (addr->ai_family == AF_UNIX)
458 			addrDesc = unixSocketPath;
459 		else
460 #endif
461 		{
462 			pg_getnameinfo_all((const struct sockaddr_storage *) addr->ai_addr,
463 							   addr->ai_addrlen,
464 							   addrBuf, sizeof(addrBuf),
465 							   NULL, 0,
466 							   NI_NUMERICHOST);
467 			addrDesc = addrBuf;
468 		}
469 
470 		if ((fd = socket(addr->ai_family, SOCK_STREAM, 0)) == PGINVALID_SOCKET)
471 		{
472 			ereport(LOG,
473 					(errcode_for_socket_access(),
474 			/* translator: first %s is IPv4, IPv6, or Unix */
475 					 errmsg("could not create %s socket for address \"%s\": %m",
476 							familyDesc, addrDesc)));
477 			continue;
478 		}
479 
480 #ifndef WIN32
481 
482 		/*
483 		 * Without the SO_REUSEADDR flag, a new postmaster can't be started
484 		 * right away after a stop or crash, giving "address already in use"
485 		 * error on TCP ports.
486 		 *
487 		 * On win32, however, this behavior only happens if the
488 		 * SO_EXLUSIVEADDRUSE is set. With SO_REUSEADDR, win32 allows multiple
489 		 * servers to listen on the same address, resulting in unpredictable
490 		 * behavior. With no flags at all, win32 behaves as Unix with
491 		 * SO_REUSEADDR.
492 		 */
493 		if (!IS_AF_UNIX(addr->ai_family))
494 		{
495 			if ((setsockopt(fd, SOL_SOCKET, SO_REUSEADDR,
496 							(char *) &one, sizeof(one))) == -1)
497 			{
498 				ereport(LOG,
499 						(errcode_for_socket_access(),
500 				/* translator: first %s is IPv4, IPv6, or Unix */
501 						 errmsg("setsockopt(SO_REUSEADDR) failed for %s address \"%s\": %m",
502 								familyDesc, addrDesc)));
503 				closesocket(fd);
504 				continue;
505 			}
506 		}
507 #endif
508 
509 #ifdef IPV6_V6ONLY
510 		if (addr->ai_family == AF_INET6)
511 		{
512 			if (setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY,
513 						   (char *) &one, sizeof(one)) == -1)
514 			{
515 				ereport(LOG,
516 						(errcode_for_socket_access(),
517 				/* translator: first %s is IPv4, IPv6, or Unix */
518 						 errmsg("setsockopt(IPV6_V6ONLY) failed for %s address \"%s\": %m",
519 								familyDesc, addrDesc)));
520 				closesocket(fd);
521 				continue;
522 			}
523 		}
524 #endif
525 
526 		/*
527 		 * Note: This might fail on some OS's, like Linux older than
528 		 * 2.4.21-pre3, that don't have the IPV6_V6ONLY socket option, and map
529 		 * ipv4 addresses to ipv6.  It will show ::ffff:ipv4 for all ipv4
530 		 * connections.
531 		 */
532 		err = bind(fd, addr->ai_addr, addr->ai_addrlen);
533 		if (err < 0)
534 		{
535 			ereport(LOG,
536 					(errcode_for_socket_access(),
537 			/* translator: first %s is IPv4, IPv6, or Unix */
538 					 errmsg("could not bind %s address \"%s\": %m",
539 							familyDesc, addrDesc),
540 					 (IS_AF_UNIX(addr->ai_family)) ?
541 					 errhint("Is another postmaster already running on port %d?"
542 							 " If not, remove socket file \"%s\" and retry.",
543 							 (int) portNumber, service) :
544 					 errhint("Is another postmaster already running on port %d?"
545 							 " If not, wait a few seconds and retry.",
546 							 (int) portNumber)));
547 			closesocket(fd);
548 			continue;
549 		}
550 
551 #ifdef HAVE_UNIX_SOCKETS
552 		if (addr->ai_family == AF_UNIX)
553 		{
554 			if (Setup_AF_UNIX(service) != STATUS_OK)
555 			{
556 				closesocket(fd);
557 				break;
558 			}
559 		}
560 #endif
561 
562 		/*
563 		 * Select appropriate accept-queue length limit.  PG_SOMAXCONN is only
564 		 * intended to provide a clamp on the request on platforms where an
565 		 * overly large request provokes a kernel error (are there any?).
566 		 */
567 		maxconn = MaxBackends * 2;
568 		if (maxconn > PG_SOMAXCONN)
569 			maxconn = PG_SOMAXCONN;
570 
571 		err = listen(fd, maxconn);
572 		if (err < 0)
573 		{
574 			ereport(LOG,
575 					(errcode_for_socket_access(),
576 			/* translator: first %s is IPv4, IPv6, or Unix */
577 					 errmsg("could not listen on %s address \"%s\": %m",
578 							familyDesc, addrDesc)));
579 			closesocket(fd);
580 			continue;
581 		}
582 
583 #ifdef HAVE_UNIX_SOCKETS
584 		if (addr->ai_family == AF_UNIX)
585 			ereport(LOG,
586 					(errmsg("listening on Unix socket \"%s\"",
587 							addrDesc)));
588 		else
589 #endif
590 			ereport(LOG,
591 			/* translator: first %s is IPv4 or IPv6 */
592 					(errmsg("listening on %s address \"%s\", port %d",
593 							familyDesc, addrDesc, (int) portNumber)));
594 
595 		ListenSocket[listen_index] = fd;
596 		added++;
597 	}
598 
599 	pg_freeaddrinfo_all(hint.ai_family, addrs);
600 
601 	if (!added)
602 		return STATUS_ERROR;
603 
604 	return STATUS_OK;
605 }
606 
607 
608 #ifdef HAVE_UNIX_SOCKETS
609 
610 /*
611  * Lock_AF_UNIX -- configure unix socket file path
612  */
613 static int
Lock_AF_UNIX(char * unixSocketDir,char * unixSocketPath)614 Lock_AF_UNIX(char *unixSocketDir, char *unixSocketPath)
615 {
616 	/*
617 	 * Grab an interlock file associated with the socket file.
618 	 *
619 	 * Note: there are two reasons for using a socket lock file, rather than
620 	 * trying to interlock directly on the socket itself.  First, it's a lot
621 	 * more portable, and second, it lets us remove any pre-existing socket
622 	 * file without race conditions.
623 	 */
624 	CreateSocketLockFile(unixSocketPath, true, unixSocketDir);
625 
626 	/*
627 	 * Once we have the interlock, we can safely delete any pre-existing
628 	 * socket file to avoid failure at bind() time.
629 	 */
630 	(void) unlink(unixSocketPath);
631 
632 	/*
633 	 * Remember socket file pathnames for later maintenance.
634 	 */
635 	sock_paths = lappend(sock_paths, pstrdup(unixSocketPath));
636 
637 	return STATUS_OK;
638 }
639 
640 
641 /*
642  * Setup_AF_UNIX -- configure unix socket permissions
643  */
644 static int
Setup_AF_UNIX(char * sock_path)645 Setup_AF_UNIX(char *sock_path)
646 {
647 	/*
648 	 * Fix socket ownership/permission if requested.  Note we must do this
649 	 * before we listen() to avoid a window where unwanted connections could
650 	 * get accepted.
651 	 */
652 	Assert(Unix_socket_group);
653 	if (Unix_socket_group[0] != '\0')
654 	{
655 #ifdef WIN32
656 		elog(WARNING, "configuration item unix_socket_group is not supported on this platform");
657 #else
658 		char	   *endptr;
659 		unsigned long val;
660 		gid_t		gid;
661 
662 		val = strtoul(Unix_socket_group, &endptr, 10);
663 		if (*endptr == '\0')
664 		{						/* numeric group id */
665 			gid = val;
666 		}
667 		else
668 		{						/* convert group name to id */
669 			struct group *gr;
670 
671 			gr = getgrnam(Unix_socket_group);
672 			if (!gr)
673 			{
674 				ereport(LOG,
675 						(errmsg("group \"%s\" does not exist",
676 								Unix_socket_group)));
677 				return STATUS_ERROR;
678 			}
679 			gid = gr->gr_gid;
680 		}
681 		if (chown(sock_path, -1, gid) == -1)
682 		{
683 			ereport(LOG,
684 					(errcode_for_file_access(),
685 					 errmsg("could not set group of file \"%s\": %m",
686 							sock_path)));
687 			return STATUS_ERROR;
688 		}
689 #endif
690 	}
691 
692 	if (chmod(sock_path, Unix_socket_permissions) == -1)
693 	{
694 		ereport(LOG,
695 				(errcode_for_file_access(),
696 				 errmsg("could not set permissions of file \"%s\": %m",
697 						sock_path)));
698 		return STATUS_ERROR;
699 	}
700 	return STATUS_OK;
701 }
702 #endif							/* HAVE_UNIX_SOCKETS */
703 
704 
705 /*
706  * StreamConnection -- create a new connection with client using
707  *		server port.  Set port->sock to the FD of the new connection.
708  *
709  * ASSUME: that this doesn't need to be non-blocking because
710  *		the Postmaster uses select() to tell when the server master
711  *		socket is ready for accept().
712  *
713  * RETURNS: STATUS_OK or STATUS_ERROR
714  */
715 int
StreamConnection(pgsocket server_fd,Port * port)716 StreamConnection(pgsocket server_fd, Port *port)
717 {
718 	/* accept connection and fill in the client (remote) address */
719 	port->raddr.salen = sizeof(port->raddr.addr);
720 	if ((port->sock = accept(server_fd,
721 							 (struct sockaddr *) &port->raddr.addr,
722 							 &port->raddr.salen)) == PGINVALID_SOCKET)
723 	{
724 		ereport(LOG,
725 				(errcode_for_socket_access(),
726 				 errmsg("could not accept new connection: %m")));
727 
728 		/*
729 		 * If accept() fails then postmaster.c will still see the server
730 		 * socket as read-ready, and will immediately try again.  To avoid
731 		 * uselessly sucking lots of CPU, delay a bit before trying again.
732 		 * (The most likely reason for failure is being out of kernel file
733 		 * table slots; we can do little except hope some will get freed up.)
734 		 */
735 		pg_usleep(100000L);		/* wait 0.1 sec */
736 		return STATUS_ERROR;
737 	}
738 
739 	/* fill in the server (local) address */
740 	port->laddr.salen = sizeof(port->laddr.addr);
741 	if (getsockname(port->sock,
742 					(struct sockaddr *) &port->laddr.addr,
743 					&port->laddr.salen) < 0)
744 	{
745 		elog(LOG, "getsockname() failed: %m");
746 		return STATUS_ERROR;
747 	}
748 
749 	/* select NODELAY and KEEPALIVE options if it's a TCP connection */
750 	if (!IS_AF_UNIX(port->laddr.addr.ss_family))
751 	{
752 		int			on;
753 #ifdef WIN32
754 		int			oldopt;
755 		int			optlen;
756 		int			newopt;
757 #endif
758 
759 #ifdef	TCP_NODELAY
760 		on = 1;
761 		if (setsockopt(port->sock, IPPROTO_TCP, TCP_NODELAY,
762 					   (char *) &on, sizeof(on)) < 0)
763 		{
764 			elog(LOG, "setsockopt(%s) failed: %m", "TCP_NODELAY");
765 			return STATUS_ERROR;
766 		}
767 #endif
768 		on = 1;
769 		if (setsockopt(port->sock, SOL_SOCKET, SO_KEEPALIVE,
770 					   (char *) &on, sizeof(on)) < 0)
771 		{
772 			elog(LOG, "setsockopt(%s) failed: %m", "SO_KEEPALIVE");
773 			return STATUS_ERROR;
774 		}
775 
776 #ifdef WIN32
777 
778 		/*
779 		 * This is a Win32 socket optimization.  The OS send buffer should be
780 		 * large enough to send the whole Postgres send buffer in one go, or
781 		 * performance suffers.  The Postgres send buffer can be enlarged if a
782 		 * very large message needs to be sent, but we won't attempt to
783 		 * enlarge the OS buffer if that happens, so somewhat arbitrarily
784 		 * ensure that the OS buffer is at least PQ_SEND_BUFFER_SIZE * 4.
785 		 * (That's 32kB with the current default).
786 		 *
787 		 * The default OS buffer size used to be 8kB in earlier Windows
788 		 * versions, but was raised to 64kB in Windows 2012.  So it shouldn't
789 		 * be necessary to change it in later versions anymore.  Changing it
790 		 * unnecessarily can even reduce performance, because setting
791 		 * SO_SNDBUF in the application disables the "dynamic send buffering"
792 		 * feature that was introduced in Windows 7.  So before fiddling with
793 		 * SO_SNDBUF, check if the current buffer size is already large enough
794 		 * and only increase it if necessary.
795 		 *
796 		 * See https://support.microsoft.com/kb/823764/EN-US/ and
797 		 * https://msdn.microsoft.com/en-us/library/bb736549%28v=vs.85%29.aspx
798 		 */
799 		optlen = sizeof(oldopt);
800 		if (getsockopt(port->sock, SOL_SOCKET, SO_SNDBUF, (char *) &oldopt,
801 					   &optlen) < 0)
802 		{
803 			elog(LOG, "getsockopt(%s) failed: %m", "SO_SNDBUF");
804 			return STATUS_ERROR;
805 		}
806 		newopt = PQ_SEND_BUFFER_SIZE * 4;
807 		if (oldopt < newopt)
808 		{
809 			if (setsockopt(port->sock, SOL_SOCKET, SO_SNDBUF, (char *) &newopt,
810 						   sizeof(newopt)) < 0)
811 			{
812 				elog(LOG, "setsockopt(%s) failed: %m", "SO_SNDBUF");
813 				return STATUS_ERROR;
814 			}
815 		}
816 #endif
817 
818 		/*
819 		 * Also apply the current keepalive parameters.  If we fail to set a
820 		 * parameter, don't error out, because these aren't universally
821 		 * supported.  (Note: you might think we need to reset the GUC
822 		 * variables to 0 in such a case, but it's not necessary because the
823 		 * show hooks for these variables report the truth anyway.)
824 		 */
825 		(void) pq_setkeepalivesidle(tcp_keepalives_idle, port);
826 		(void) pq_setkeepalivesinterval(tcp_keepalives_interval, port);
827 		(void) pq_setkeepalivescount(tcp_keepalives_count, port);
828 	}
829 
830 	return STATUS_OK;
831 }
832 
833 /*
834  * StreamClose -- close a client/backend connection
835  *
836  * NOTE: this is NOT used to terminate a session; it is just used to release
837  * the file descriptor in a process that should no longer have the socket
838  * open.  (For example, the postmaster calls this after passing ownership
839  * of the connection to a child process.)  It is expected that someone else
840  * still has the socket open.  So, we only want to close the descriptor,
841  * we do NOT want to send anything to the far end.
842  */
843 void
StreamClose(pgsocket sock)844 StreamClose(pgsocket sock)
845 {
846 	closesocket(sock);
847 }
848 
849 /*
850  * TouchSocketFiles -- mark socket files as recently accessed
851  *
852  * This routine should be called every so often to ensure that the socket
853  * files have a recent mod date (ordinary operations on sockets usually won't
854  * change the mod date).  That saves them from being removed by
855  * overenthusiastic /tmp-directory-cleaner daemons.  (Another reason we should
856  * never have put the socket file in /tmp...)
857  */
858 void
TouchSocketFiles(void)859 TouchSocketFiles(void)
860 {
861 	ListCell   *l;
862 
863 	/* Loop through all created sockets... */
864 	foreach(l, sock_paths)
865 	{
866 		char	   *sock_path = (char *) lfirst(l);
867 
868 		/*
869 		 * utime() is POSIX standard, utimes() is a common alternative. If we
870 		 * have neither, there's no way to affect the mod or access time of
871 		 * the socket :-(
872 		 *
873 		 * In either path, we ignore errors; there's no point in complaining.
874 		 */
875 #ifdef HAVE_UTIME
876 		utime(sock_path, NULL);
877 #else							/* !HAVE_UTIME */
878 #ifdef HAVE_UTIMES
879 		utimes(sock_path, NULL);
880 #endif							/* HAVE_UTIMES */
881 #endif							/* HAVE_UTIME */
882 	}
883 }
884 
885 /*
886  * RemoveSocketFiles -- unlink socket files at postmaster shutdown
887  */
888 void
RemoveSocketFiles(void)889 RemoveSocketFiles(void)
890 {
891 	ListCell   *l;
892 
893 	/* Loop through all created sockets... */
894 	foreach(l, sock_paths)
895 	{
896 		char	   *sock_path = (char *) lfirst(l);
897 
898 		/* Ignore any error. */
899 		(void) unlink(sock_path);
900 	}
901 	/* Since we're about to exit, no need to reclaim storage */
902 	sock_paths = NIL;
903 }
904 
905 
906 /* --------------------------------
907  * Low-level I/O routines begin here.
908  *
909  * These routines communicate with a frontend client across a connection
910  * already established by the preceding routines.
911  * --------------------------------
912  */
913 
914 /* --------------------------------
915  *			  socket_set_nonblocking - set socket blocking/non-blocking
916  *
917  * Sets the socket non-blocking if nonblocking is TRUE, or sets it
918  * blocking otherwise.
919  * --------------------------------
920  */
921 static void
socket_set_nonblocking(bool nonblocking)922 socket_set_nonblocking(bool nonblocking)
923 {
924 	if (MyProcPort == NULL)
925 		ereport(ERROR,
926 				(errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST),
927 				 errmsg("there is no client connection")));
928 
929 	MyProcPort->noblock = nonblocking;
930 }
931 
932 /* --------------------------------
933  *		pq_recvbuf - load some bytes into the input buffer
934  *
935  *		returns 0 if OK, EOF if trouble
936  * --------------------------------
937  */
938 static int
pq_recvbuf(void)939 pq_recvbuf(void)
940 {
941 	if (PqRecvPointer > 0)
942 	{
943 		if (PqRecvLength > PqRecvPointer)
944 		{
945 			/* still some unread data, left-justify it in the buffer */
946 			memmove(PqRecvBuffer, PqRecvBuffer + PqRecvPointer,
947 					PqRecvLength - PqRecvPointer);
948 			PqRecvLength -= PqRecvPointer;
949 			PqRecvPointer = 0;
950 		}
951 		else
952 			PqRecvLength = PqRecvPointer = 0;
953 	}
954 
955 	/* Ensure that we're in blocking mode */
956 	socket_set_nonblocking(false);
957 
958 	/* Can fill buffer from PqRecvLength and upwards */
959 	for (;;)
960 	{
961 		int			r;
962 
963 		r = secure_read(MyProcPort, PqRecvBuffer + PqRecvLength,
964 						PQ_RECV_BUFFER_SIZE - PqRecvLength);
965 
966 		if (r < 0)
967 		{
968 			if (errno == EINTR)
969 				continue;		/* Ok if interrupted */
970 
971 			/*
972 			 * Careful: an ereport() that tries to write to the client would
973 			 * cause recursion to here, leading to stack overflow and core
974 			 * dump!  This message must go *only* to the postmaster log.
975 			 */
976 			ereport(COMMERROR,
977 					(errcode_for_socket_access(),
978 					 errmsg("could not receive data from client: %m")));
979 			return EOF;
980 		}
981 		if (r == 0)
982 		{
983 			/*
984 			 * EOF detected.  We used to write a log message here, but it's
985 			 * better to expect the ultimate caller to do that.
986 			 */
987 			return EOF;
988 		}
989 		/* r contains number of bytes read, so just incr length */
990 		PqRecvLength += r;
991 		return 0;
992 	}
993 }
994 
995 /* --------------------------------
996  *		pq_getbyte	- get a single byte from connection, or return EOF
997  * --------------------------------
998  */
999 int
pq_getbyte(void)1000 pq_getbyte(void)
1001 {
1002 	Assert(PqCommReadingMsg);
1003 
1004 	while (PqRecvPointer >= PqRecvLength)
1005 	{
1006 		if (pq_recvbuf())		/* If nothing in buffer, then recv some */
1007 			return EOF;			/* Failed to recv data */
1008 	}
1009 	return (unsigned char) PqRecvBuffer[PqRecvPointer++];
1010 }
1011 
1012 /* --------------------------------
1013  *		pq_peekbyte		- peek at next byte from connection
1014  *
1015  *	 Same as pq_getbyte() except we don't advance the pointer.
1016  * --------------------------------
1017  */
1018 int
pq_peekbyte(void)1019 pq_peekbyte(void)
1020 {
1021 	Assert(PqCommReadingMsg);
1022 
1023 	while (PqRecvPointer >= PqRecvLength)
1024 	{
1025 		if (pq_recvbuf())		/* If nothing in buffer, then recv some */
1026 			return EOF;			/* Failed to recv data */
1027 	}
1028 	return (unsigned char) PqRecvBuffer[PqRecvPointer];
1029 }
1030 
1031 /* --------------------------------
1032  *		pq_getbyte_if_available - get a single byte from connection,
1033  *			if available
1034  *
1035  * The received byte is stored in *c. Returns 1 if a byte was read,
1036  * 0 if no data was available, or EOF if trouble.
1037  * --------------------------------
1038  */
1039 int
pq_getbyte_if_available(unsigned char * c)1040 pq_getbyte_if_available(unsigned char *c)
1041 {
1042 	int			r;
1043 
1044 	Assert(PqCommReadingMsg);
1045 
1046 	if (PqRecvPointer < PqRecvLength)
1047 	{
1048 		*c = PqRecvBuffer[PqRecvPointer++];
1049 		return 1;
1050 	}
1051 
1052 	/* Put the socket into non-blocking mode */
1053 	socket_set_nonblocking(true);
1054 
1055 	r = secure_read(MyProcPort, c, 1);
1056 	if (r < 0)
1057 	{
1058 		/*
1059 		 * Ok if no data available without blocking or interrupted (though
1060 		 * EINTR really shouldn't happen with a non-blocking socket). Report
1061 		 * other errors.
1062 		 */
1063 		if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)
1064 			r = 0;
1065 		else
1066 		{
1067 			/*
1068 			 * Careful: an ereport() that tries to write to the client would
1069 			 * cause recursion to here, leading to stack overflow and core
1070 			 * dump!  This message must go *only* to the postmaster log.
1071 			 */
1072 			ereport(COMMERROR,
1073 					(errcode_for_socket_access(),
1074 					 errmsg("could not receive data from client: %m")));
1075 			r = EOF;
1076 		}
1077 	}
1078 	else if (r == 0)
1079 	{
1080 		/* EOF detected */
1081 		r = EOF;
1082 	}
1083 
1084 	return r;
1085 }
1086 
1087 /* --------------------------------
1088  *		pq_getbytes		- get a known number of bytes from connection
1089  *
1090  *		returns 0 if OK, EOF if trouble
1091  * --------------------------------
1092  */
1093 int
pq_getbytes(char * s,size_t len)1094 pq_getbytes(char *s, size_t len)
1095 {
1096 	size_t		amount;
1097 
1098 	Assert(PqCommReadingMsg);
1099 
1100 	while (len > 0)
1101 	{
1102 		while (PqRecvPointer >= PqRecvLength)
1103 		{
1104 			if (pq_recvbuf())	/* If nothing in buffer, then recv some */
1105 				return EOF;		/* Failed to recv data */
1106 		}
1107 		amount = PqRecvLength - PqRecvPointer;
1108 		if (amount > len)
1109 			amount = len;
1110 		memcpy(s, PqRecvBuffer + PqRecvPointer, amount);
1111 		PqRecvPointer += amount;
1112 		s += amount;
1113 		len -= amount;
1114 	}
1115 	return 0;
1116 }
1117 
1118 /* --------------------------------
1119  *		pq_discardbytes		- throw away a known number of bytes
1120  *
1121  *		same as pq_getbytes except we do not copy the data to anyplace.
1122  *		this is used for resynchronizing after read errors.
1123  *
1124  *		returns 0 if OK, EOF if trouble
1125  * --------------------------------
1126  */
1127 static int
pq_discardbytes(size_t len)1128 pq_discardbytes(size_t len)
1129 {
1130 	size_t		amount;
1131 
1132 	Assert(PqCommReadingMsg);
1133 
1134 	while (len > 0)
1135 	{
1136 		while (PqRecvPointer >= PqRecvLength)
1137 		{
1138 			if (pq_recvbuf())	/* If nothing in buffer, then recv some */
1139 				return EOF;		/* Failed to recv data */
1140 		}
1141 		amount = PqRecvLength - PqRecvPointer;
1142 		if (amount > len)
1143 			amount = len;
1144 		PqRecvPointer += amount;
1145 		len -= amount;
1146 	}
1147 	return 0;
1148 }
1149 
1150 /* --------------------------------
1151  *		pq_getstring	- get a null terminated string from connection
1152  *
1153  *		The return value is placed in an expansible StringInfo, which has
1154  *		already been initialized by the caller.
1155  *
1156  *		This is used only for dealing with old-protocol clients.  The idea
1157  *		is to produce a StringInfo that looks the same as we would get from
1158  *		pq_getmessage() with a newer client; we will then process it with
1159  *		pq_getmsgstring.  Therefore, no character set conversion is done here,
1160  *		even though this is presumably useful only for text.
1161  *
1162  *		returns 0 if OK, EOF if trouble
1163  * --------------------------------
1164  */
1165 int
pq_getstring(StringInfo s)1166 pq_getstring(StringInfo s)
1167 {
1168 	int			i;
1169 
1170 	Assert(PqCommReadingMsg);
1171 
1172 	resetStringInfo(s);
1173 
1174 	/* Read until we get the terminating '\0' */
1175 	for (;;)
1176 	{
1177 		while (PqRecvPointer >= PqRecvLength)
1178 		{
1179 			if (pq_recvbuf())	/* If nothing in buffer, then recv some */
1180 				return EOF;		/* Failed to recv data */
1181 		}
1182 
1183 		for (i = PqRecvPointer; i < PqRecvLength; i++)
1184 		{
1185 			if (PqRecvBuffer[i] == '\0')
1186 			{
1187 				/* include the '\0' in the copy */
1188 				appendBinaryStringInfo(s, PqRecvBuffer + PqRecvPointer,
1189 									   i - PqRecvPointer + 1);
1190 				PqRecvPointer = i + 1;	/* advance past \0 */
1191 				return 0;
1192 			}
1193 		}
1194 
1195 		/* If we're here we haven't got the \0 in the buffer yet. */
1196 		appendBinaryStringInfo(s, PqRecvBuffer + PqRecvPointer,
1197 							   PqRecvLength - PqRecvPointer);
1198 		PqRecvPointer = PqRecvLength;
1199 	}
1200 }
1201 
1202 /* --------------------------------
1203  *		pq_buffer_has_data		- is any buffered data available to read?
1204  *
1205  * This will *not* attempt to read more data.
1206  * --------------------------------
1207  */
1208 bool
pq_buffer_has_data(void)1209 pq_buffer_has_data(void)
1210 {
1211 	return (PqRecvPointer < PqRecvLength);
1212 }
1213 
1214 
1215 /* --------------------------------
1216  *		pq_startmsgread - begin reading a message from the client.
1217  *
1218  *		This must be called before any of the pq_get* functions.
1219  * --------------------------------
1220  */
1221 void
pq_startmsgread(void)1222 pq_startmsgread(void)
1223 {
1224 	/*
1225 	 * There shouldn't be a read active already, but let's check just to be
1226 	 * sure.
1227 	 */
1228 	if (PqCommReadingMsg)
1229 		ereport(FATAL,
1230 				(errcode(ERRCODE_PROTOCOL_VIOLATION),
1231 				 errmsg("terminating connection because protocol synchronization was lost")));
1232 
1233 	PqCommReadingMsg = true;
1234 }
1235 
1236 
1237 /* --------------------------------
1238  *		pq_endmsgread	- finish reading message.
1239  *
1240  *		This must be called after reading a V2 protocol message with
1241  *		pq_getstring() and friends, to indicate that we have read the whole
1242  *		message. In V3 protocol, pq_getmessage() does this implicitly.
1243  * --------------------------------
1244  */
1245 void
pq_endmsgread(void)1246 pq_endmsgread(void)
1247 {
1248 	Assert(PqCommReadingMsg);
1249 
1250 	PqCommReadingMsg = false;
1251 }
1252 
1253 /* --------------------------------
1254  *		pq_is_reading_msg - are we currently reading a message?
1255  *
1256  * This is used in error recovery at the outer idle loop to detect if we have
1257  * lost protocol sync, and need to terminate the connection. pq_startmsgread()
1258  * will check for that too, but it's nicer to detect it earlier.
1259  * --------------------------------
1260  */
1261 bool
pq_is_reading_msg(void)1262 pq_is_reading_msg(void)
1263 {
1264 	return PqCommReadingMsg;
1265 }
1266 
/* --------------------------------
 *		pq_getmessage	- get a message with length word from connection
 *
 *		The return value is placed in an expansible StringInfo, which has
 *		already been initialized by the caller.
 *		Only the message body is placed in the StringInfo; the length word
 *		is removed.  The StringInfo is reset before anything is read.
 *
 *		If maxlen is not zero, it is an upper limit on the length of the
 *		message we are willing to accept.  We abort the connection (by
 *		returning EOF) if client tries to send more than that.  The limit
 *		is compared against the length word as sent, i.e. including the
 *		four bytes of the length word itself.
 *
 *		On success the PqCommReadingMsg flag is cleared.  On the EOF
 *		return paths it is left set.
 *
 *		returns 0 if OK, EOF if trouble
 * --------------------------------
 */
int
pq_getmessage(StringInfo s, int maxlen)
{
	int32		len;

	Assert(PqCommReadingMsg);

	resetStringInfo(s);

	/* Read message length word (4 bytes, network byte order) */
	if (pq_getbytes((char *) &len, 4) == EOF)
	{
		ereport(COMMERROR,
				(errcode(ERRCODE_PROTOCOL_VIOLATION),
				 errmsg("unexpected EOF within message length word")));
		return EOF;
	}

	len = ntohl(len);

	/*
	 * The length word counts itself, so anything below 4 is invalid; a
	 * positive maxlen additionally caps what we accept.
	 */
	if (len < 4 ||
		(maxlen > 0 && len > maxlen))
	{
		ereport(COMMERROR,
				(errcode(ERRCODE_PROTOCOL_VIOLATION),
				 errmsg("invalid message length")));
		return EOF;
	}

	len -= 4;					/* discount length itself */

	if (len > 0)
	{
		/*
		 * Allocate space for message.  If we run out of room (ridiculously
		 * large message), we will elog(ERROR), but we want to discard the
		 * message body so as not to lose communication sync.
		 */
		PG_TRY();
		{
			enlargeStringInfo(s, len);
		}
		PG_CATCH();
		{
			/* Skip over the body we could not make room for */
			if (pq_discardbytes(len) == EOF)
				ereport(COMMERROR,
						(errcode(ERRCODE_PROTOCOL_VIOLATION),
						 errmsg("incomplete message from client")));

			/* we discarded the rest of the message so we're back in sync. */
			PqCommReadingMsg = false;
			PG_RE_THROW();
		}
		PG_END_TRY();

		/* And grab the message */
		if (pq_getbytes(s->data, len) == EOF)
		{
			ereport(COMMERROR,
					(errcode(ERRCODE_PROTOCOL_VIOLATION),
					 errmsg("incomplete message from client")));
			return EOF;
		}
		s->len = len;
		/* Place a trailing null per StringInfo convention */
		s->data[len] = '\0';
	}

	/* finished reading the message. */
	PqCommReadingMsg = false;

	return 0;
}
1356 
1357 
1358 /* --------------------------------
1359  *		pq_putbytes		- send bytes to connection (not flushed until pq_flush)
1360  *
1361  *		returns 0 if OK, EOF if trouble
1362  * --------------------------------
1363  */
1364 int
pq_putbytes(const char * s,size_t len)1365 pq_putbytes(const char *s, size_t len)
1366 {
1367 	int			res;
1368 
1369 	/* Should only be called by old-style COPY OUT */
1370 	Assert(DoingCopyOut);
1371 	/* No-op if reentrant call */
1372 	if (PqCommBusy)
1373 		return 0;
1374 	PqCommBusy = true;
1375 	res = internal_putbytes(s, len);
1376 	PqCommBusy = false;
1377 	return res;
1378 }
1379 
1380 static int
internal_putbytes(const char * s,size_t len)1381 internal_putbytes(const char *s, size_t len)
1382 {
1383 	size_t		amount;
1384 
1385 	while (len > 0)
1386 	{
1387 		/* If buffer is full, then flush it out */
1388 		if (PqSendPointer >= PqSendBufferSize)
1389 		{
1390 			socket_set_nonblocking(false);
1391 			if (internal_flush())
1392 				return EOF;
1393 		}
1394 		amount = PqSendBufferSize - PqSendPointer;
1395 		if (amount > len)
1396 			amount = len;
1397 		memcpy(PqSendBuffer + PqSendPointer, s, amount);
1398 		PqSendPointer += amount;
1399 		s += amount;
1400 		len -= amount;
1401 	}
1402 	return 0;
1403 }
1404 
1405 /* --------------------------------
1406  *		socket_flush		- flush pending output
1407  *
1408  *		returns 0 if OK, EOF if trouble
1409  * --------------------------------
1410  */
1411 static int
socket_flush(void)1412 socket_flush(void)
1413 {
1414 	int			res;
1415 
1416 	/* No-op if reentrant call */
1417 	if (PqCommBusy)
1418 		return 0;
1419 	PqCommBusy = true;
1420 	socket_set_nonblocking(false);
1421 	res = internal_flush();
1422 	PqCommBusy = false;
1423 	return res;
1424 }
1425 
1426 /* --------------------------------
1427  *		internal_flush - flush pending output
1428  *
1429  * Returns 0 if OK (meaning everything was sent, or operation would block
1430  * and the socket is in non-blocking mode), or EOF if trouble.
1431  * --------------------------------
1432  */
1433 static int
internal_flush(void)1434 internal_flush(void)
1435 {
1436 	static int	last_reported_send_errno = 0;
1437 
1438 	char	   *bufptr = PqSendBuffer + PqSendStart;
1439 	char	   *bufend = PqSendBuffer + PqSendPointer;
1440 
1441 	while (bufptr < bufend)
1442 	{
1443 		int			r;
1444 
1445 		r = secure_write(MyProcPort, bufptr, bufend - bufptr);
1446 
1447 		if (r <= 0)
1448 		{
1449 			if (errno == EINTR)
1450 				continue;		/* Ok if we were interrupted */
1451 
1452 			/*
1453 			 * Ok if no data writable without blocking, and the socket is in
1454 			 * non-blocking mode.
1455 			 */
1456 			if (errno == EAGAIN ||
1457 				errno == EWOULDBLOCK)
1458 			{
1459 				return 0;
1460 			}
1461 
1462 			/*
1463 			 * Careful: an ereport() that tries to write to the client would
1464 			 * cause recursion to here, leading to stack overflow and core
1465 			 * dump!  This message must go *only* to the postmaster log.
1466 			 *
1467 			 * If a client disconnects while we're in the midst of output, we
1468 			 * might write quite a bit of data before we get to a safe query
1469 			 * abort point.  So, suppress duplicate log messages.
1470 			 */
1471 			if (errno != last_reported_send_errno)
1472 			{
1473 				last_reported_send_errno = errno;
1474 				ereport(COMMERROR,
1475 						(errcode_for_socket_access(),
1476 						 errmsg("could not send data to client: %m")));
1477 			}
1478 
1479 			/*
1480 			 * We drop the buffered data anyway so that processing can
1481 			 * continue, even though we'll probably quit soon. We also set a
1482 			 * flag that'll cause the next CHECK_FOR_INTERRUPTS to terminate
1483 			 * the connection.
1484 			 */
1485 			PqSendStart = PqSendPointer = 0;
1486 			ClientConnectionLost = 1;
1487 			InterruptPending = 1;
1488 			return EOF;
1489 		}
1490 
1491 		last_reported_send_errno = 0;	/* reset after any successful send */
1492 		bufptr += r;
1493 		PqSendStart += r;
1494 	}
1495 
1496 	PqSendStart = PqSendPointer = 0;
1497 	return 0;
1498 }
1499 
1500 /* --------------------------------
1501  *		pq_flush_if_writable - flush pending output if writable without blocking
1502  *
1503  * Returns 0 if OK, or EOF if trouble.
1504  * --------------------------------
1505  */
1506 static int
socket_flush_if_writable(void)1507 socket_flush_if_writable(void)
1508 {
1509 	int			res;
1510 
1511 	/* Quick exit if nothing to do */
1512 	if (PqSendPointer == PqSendStart)
1513 		return 0;
1514 
1515 	/* No-op if reentrant call */
1516 	if (PqCommBusy)
1517 		return 0;
1518 
1519 	/* Temporarily put the socket into non-blocking mode */
1520 	socket_set_nonblocking(true);
1521 
1522 	PqCommBusy = true;
1523 	res = internal_flush();
1524 	PqCommBusy = false;
1525 	return res;
1526 }
1527 
1528 /* --------------------------------
1529  *	socket_is_send_pending	- is there any pending data in the output buffer?
1530  * --------------------------------
1531  */
1532 static bool
socket_is_send_pending(void)1533 socket_is_send_pending(void)
1534 {
1535 	return (PqSendStart < PqSendPointer);
1536 }
1537 
1538 /* --------------------------------
1539  * Message-level I/O routines begin here.
1540  *
1541  * These routines understand about the old-style COPY OUT protocol.
1542  * --------------------------------
1543  */
1544 
1545 
1546 /* --------------------------------
1547  *		socket_putmessage - send a normal message (suppressed in COPY OUT mode)
1548  *
1549  *		If msgtype is not '\0', it is a message type code to place before
1550  *		the message body.  If msgtype is '\0', then the message has no type
1551  *		code (this is only valid in pre-3.0 protocols).
1552  *
1553  *		len is the length of the message body data at *s.  In protocol 3.0
1554  *		and later, a message length word (equal to len+4 because it counts
1555  *		itself too) is inserted by this routine.
1556  *
1557  *		All normal messages are suppressed while old-style COPY OUT is in
1558  *		progress.  (In practice only a few notice messages might get emitted
1559  *		then; dropping them is annoying, but at least they will still appear
1560  *		in the postmaster log.)
1561  *
1562  *		We also suppress messages generated while pqcomm.c is busy.  This
1563  *		avoids any possibility of messages being inserted within other
1564  *		messages.  The only known trouble case arises if SIGQUIT occurs
1565  *		during a pqcomm.c routine --- quickdie() will try to send a warning
1566  *		message, and the most reasonable approach seems to be to drop it.
1567  *
1568  *		returns 0 if OK, EOF if trouble
1569  * --------------------------------
1570  */
1571 static int
socket_putmessage(char msgtype,const char * s,size_t len)1572 socket_putmessage(char msgtype, const char *s, size_t len)
1573 {
1574 	if (DoingCopyOut || PqCommBusy)
1575 		return 0;
1576 	PqCommBusy = true;
1577 	if (msgtype)
1578 		if (internal_putbytes(&msgtype, 1))
1579 			goto fail;
1580 	if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 3)
1581 	{
1582 		uint32		n32;
1583 
1584 		n32 = htonl((uint32) (len + 4));
1585 		if (internal_putbytes((char *) &n32, 4))
1586 			goto fail;
1587 	}
1588 	if (internal_putbytes(s, len))
1589 		goto fail;
1590 	PqCommBusy = false;
1591 	return 0;
1592 
1593 fail:
1594 	PqCommBusy = false;
1595 	return EOF;
1596 }
1597 
1598 /* --------------------------------
1599  *		pq_putmessage_noblock	- like pq_putmessage, but never blocks
1600  *
1601  *		If the output buffer is too small to hold the message, the buffer
1602  *		is enlarged.
1603  */
1604 static void
socket_putmessage_noblock(char msgtype,const char * s,size_t len)1605 socket_putmessage_noblock(char msgtype, const char *s, size_t len)
1606 {
1607 	int			res PG_USED_FOR_ASSERTS_ONLY;
1608 	int			required;
1609 
1610 	/*
1611 	 * Ensure we have enough space in the output buffer for the message header
1612 	 * as well as the message itself.
1613 	 */
1614 	required = PqSendPointer + 1 + 4 + len;
1615 	if (required > PqSendBufferSize)
1616 	{
1617 		PqSendBuffer = repalloc(PqSendBuffer, required);
1618 		PqSendBufferSize = required;
1619 	}
1620 	res = pq_putmessage(msgtype, s, len);
1621 	Assert(res == 0);			/* should not fail when the message fits in
1622 								 * buffer */
1623 }
1624 
1625 
1626 /* --------------------------------
1627  *		socket_startcopyout - inform libpq that an old-style COPY OUT transfer
1628  *			is beginning
1629  * --------------------------------
1630  */
1631 static void
socket_startcopyout(void)1632 socket_startcopyout(void)
1633 {
1634 	DoingCopyOut = true;
1635 }
1636 
1637 /* --------------------------------
1638  *		socket_endcopyout	- end an old-style COPY OUT transfer
1639  *
1640  *		If errorAbort is indicated, we are aborting a COPY OUT due to an error,
1641  *		and must send a terminator line.  Since a partial data line might have
1642  *		been emitted, send a couple of newlines first (the first one could
1643  *		get absorbed by a backslash...)  Note that old-style COPY OUT does
1644  *		not allow binary transfers, so a textual terminator is always correct.
1645  * --------------------------------
1646  */
1647 static void
socket_endcopyout(bool errorAbort)1648 socket_endcopyout(bool errorAbort)
1649 {
1650 	if (!DoingCopyOut)
1651 		return;
1652 	if (errorAbort)
1653 		pq_putbytes("\n\n\\.\n", 5);
1654 	/* in non-error case, copy.c will have emitted the terminator line */
1655 	DoingCopyOut = false;
1656 }
1657 
1658 /*
1659  * Support for TCP Keepalive parameters
1660  */
1661 
/*
 * On Windows, we need to set both idle and interval at the same time.
 * We also cannot reset them to the default (setting to zero will
 * actually set them to zero, not default), therefore we fall back to
 * the out-of-the-box default instead.
 *
 * pq_setkeepaliveswin32 applies both settings to port->sock with a single
 * WSAIoctl(SIO_KEEPALIVE_VALS) call.  idle and interval are in seconds
 * (they are multiplied by 1000 before being passed on); a value <= 0 is
 * replaced by 2 hours for idle and 1 second for interval.
 *
 * On success the values actually applied are stored in
 * port->keepalives_idle and port->keepalives_interval and STATUS_OK is
 * returned.  On failure the error code is logged and STATUS_ERROR is
 * returned, leaving the Port fields untouched.
 */
#if defined(WIN32) && defined(SIO_KEEPALIVE_VALS)
static int
pq_setkeepaliveswin32(Port *port, int idle, int interval)
{
	struct tcp_keepalive ka;
	DWORD		retsize;

	if (idle <= 0)
		idle = 2 * 60 * 60;		/* default = 2 hours */
	if (interval <= 0)
		interval = 1;			/* default = 1 second */

	ka.onoff = 1;
	ka.keepalivetime = idle * 1000;
	ka.keepaliveinterval = interval * 1000;

	if (WSAIoctl(port->sock,
				 SIO_KEEPALIVE_VALS,
				 (LPVOID) &ka,
				 sizeof(ka),
				 NULL,
				 0,
				 &retsize,
				 NULL,
				 NULL)
		!= 0)
	{
		/* WSAGetLastError() returns an int, so print it with %d */
		elog(LOG, "WSAIoctl(SIO_KEEPALIVE_VALS) failed: %d",
			 WSAGetLastError());
		return STATUS_ERROR;
	}

	/* Remember what is now in effect */
	port->keepalives_idle = idle;
	port->keepalives_interval = interval;
	return STATUS_OK;
}
#endif
1706 
1707 int
pq_getkeepalivesidle(Port * port)1708 pq_getkeepalivesidle(Port *port)
1709 {
1710 #if defined(PG_TCP_KEEPALIVE_IDLE) || defined(SIO_KEEPALIVE_VALS)
1711 	if (port == NULL || IS_AF_UNIX(port->laddr.addr.ss_family))
1712 		return 0;
1713 
1714 	if (port->keepalives_idle != 0)
1715 		return port->keepalives_idle;
1716 
1717 	if (port->default_keepalives_idle == 0)
1718 	{
1719 #ifndef WIN32
1720 		ACCEPT_TYPE_ARG3 size = sizeof(port->default_keepalives_idle);
1721 
1722 		if (getsockopt(port->sock, IPPROTO_TCP, PG_TCP_KEEPALIVE_IDLE,
1723 					   (char *) &port->default_keepalives_idle,
1724 					   &size) < 0)
1725 		{
1726 			elog(LOG, "getsockopt(%s) failed: %m", PG_TCP_KEEPALIVE_IDLE_STR);
1727 			port->default_keepalives_idle = -1; /* don't know */
1728 		}
1729 #else							/* WIN32 */
1730 		/* We can't get the defaults on Windows, so return "don't know" */
1731 		port->default_keepalives_idle = -1;
1732 #endif							/* WIN32 */
1733 	}
1734 
1735 	return port->default_keepalives_idle;
1736 #else
1737 	return 0;
1738 #endif
1739 }
1740 
1741 int
pq_setkeepalivesidle(int idle,Port * port)1742 pq_setkeepalivesidle(int idle, Port *port)
1743 {
1744 	if (port == NULL || IS_AF_UNIX(port->laddr.addr.ss_family))
1745 		return STATUS_OK;
1746 
1747 /* check SIO_KEEPALIVE_VALS here, not just WIN32, as some toolchains lack it */
1748 #if defined(PG_TCP_KEEPALIVE_IDLE) || defined(SIO_KEEPALIVE_VALS)
1749 	if (idle == port->keepalives_idle)
1750 		return STATUS_OK;
1751 
1752 #ifndef WIN32
1753 	if (port->default_keepalives_idle <= 0)
1754 	{
1755 		if (pq_getkeepalivesidle(port) < 0)
1756 		{
1757 			if (idle == 0)
1758 				return STATUS_OK;	/* default is set but unknown */
1759 			else
1760 				return STATUS_ERROR;
1761 		}
1762 	}
1763 
1764 	if (idle == 0)
1765 		idle = port->default_keepalives_idle;
1766 
1767 	if (setsockopt(port->sock, IPPROTO_TCP, PG_TCP_KEEPALIVE_IDLE,
1768 				   (char *) &idle, sizeof(idle)) < 0)
1769 	{
1770 		elog(LOG, "setsockopt(%s) failed: %m", PG_TCP_KEEPALIVE_IDLE_STR);
1771 		return STATUS_ERROR;
1772 	}
1773 
1774 	port->keepalives_idle = idle;
1775 #else							/* WIN32 */
1776 	return pq_setkeepaliveswin32(port, idle, port->keepalives_interval);
1777 #endif
1778 #else
1779 	if (idle != 0)
1780 	{
1781 		elog(LOG, "setting the keepalive idle time is not supported");
1782 		return STATUS_ERROR;
1783 	}
1784 #endif
1785 
1786 	return STATUS_OK;
1787 }
1788 
1789 int
pq_getkeepalivesinterval(Port * port)1790 pq_getkeepalivesinterval(Port *port)
1791 {
1792 #if defined(TCP_KEEPINTVL) || defined(SIO_KEEPALIVE_VALS)
1793 	if (port == NULL || IS_AF_UNIX(port->laddr.addr.ss_family))
1794 		return 0;
1795 
1796 	if (port->keepalives_interval != 0)
1797 		return port->keepalives_interval;
1798 
1799 	if (port->default_keepalives_interval == 0)
1800 	{
1801 #ifndef WIN32
1802 		ACCEPT_TYPE_ARG3 size = sizeof(port->default_keepalives_interval);
1803 
1804 		if (getsockopt(port->sock, IPPROTO_TCP, TCP_KEEPINTVL,
1805 					   (char *) &port->default_keepalives_interval,
1806 					   &size) < 0)
1807 		{
1808 			elog(LOG, "getsockopt(%s) failed: %m", "TCP_KEEPINTVL");
1809 			port->default_keepalives_interval = -1; /* don't know */
1810 		}
1811 #else
1812 		/* We can't get the defaults on Windows, so return "don't know" */
1813 		port->default_keepalives_interval = -1;
1814 #endif							/* WIN32 */
1815 	}
1816 
1817 	return port->default_keepalives_interval;
1818 #else
1819 	return 0;
1820 #endif
1821 }
1822 
1823 int
pq_setkeepalivesinterval(int interval,Port * port)1824 pq_setkeepalivesinterval(int interval, Port *port)
1825 {
1826 	if (port == NULL || IS_AF_UNIX(port->laddr.addr.ss_family))
1827 		return STATUS_OK;
1828 
1829 #if defined(TCP_KEEPINTVL) || defined(SIO_KEEPALIVE_VALS)
1830 	if (interval == port->keepalives_interval)
1831 		return STATUS_OK;
1832 
1833 #ifndef WIN32
1834 	if (port->default_keepalives_interval <= 0)
1835 	{
1836 		if (pq_getkeepalivesinterval(port) < 0)
1837 		{
1838 			if (interval == 0)
1839 				return STATUS_OK;	/* default is set but unknown */
1840 			else
1841 				return STATUS_ERROR;
1842 		}
1843 	}
1844 
1845 	if (interval == 0)
1846 		interval = port->default_keepalives_interval;
1847 
1848 	if (setsockopt(port->sock, IPPROTO_TCP, TCP_KEEPINTVL,
1849 				   (char *) &interval, sizeof(interval)) < 0)
1850 	{
1851 		elog(LOG, "setsockopt(%s) failed: %m", "TCP_KEEPINTVL");
1852 		return STATUS_ERROR;
1853 	}
1854 
1855 	port->keepalives_interval = interval;
1856 #else							/* WIN32 */
1857 	return pq_setkeepaliveswin32(port, port->keepalives_idle, interval);
1858 #endif
1859 #else
1860 	if (interval != 0)
1861 	{
1862 		elog(LOG, "setsockopt(%s) not supported", "TCP_KEEPINTVL");
1863 		return STATUS_ERROR;
1864 	}
1865 #endif
1866 
1867 	return STATUS_OK;
1868 }
1869 
1870 int
pq_getkeepalivescount(Port * port)1871 pq_getkeepalivescount(Port *port)
1872 {
1873 #ifdef TCP_KEEPCNT
1874 	if (port == NULL || IS_AF_UNIX(port->laddr.addr.ss_family))
1875 		return 0;
1876 
1877 	if (port->keepalives_count != 0)
1878 		return port->keepalives_count;
1879 
1880 	if (port->default_keepalives_count == 0)
1881 	{
1882 		ACCEPT_TYPE_ARG3 size = sizeof(port->default_keepalives_count);
1883 
1884 		if (getsockopt(port->sock, IPPROTO_TCP, TCP_KEEPCNT,
1885 					   (char *) &port->default_keepalives_count,
1886 					   &size) < 0)
1887 		{
1888 			elog(LOG, "getsockopt(%s) failed: %m", "TCP_KEEPCNT");
1889 			port->default_keepalives_count = -1;	/* don't know */
1890 		}
1891 	}
1892 
1893 	return port->default_keepalives_count;
1894 #else
1895 	return 0;
1896 #endif
1897 }
1898 
1899 int
pq_setkeepalivescount(int count,Port * port)1900 pq_setkeepalivescount(int count, Port *port)
1901 {
1902 	if (port == NULL || IS_AF_UNIX(port->laddr.addr.ss_family))
1903 		return STATUS_OK;
1904 
1905 #ifdef TCP_KEEPCNT
1906 	if (count == port->keepalives_count)
1907 		return STATUS_OK;
1908 
1909 	if (port->default_keepalives_count <= 0)
1910 	{
1911 		if (pq_getkeepalivescount(port) < 0)
1912 		{
1913 			if (count == 0)
1914 				return STATUS_OK;	/* default is set but unknown */
1915 			else
1916 				return STATUS_ERROR;
1917 		}
1918 	}
1919 
1920 	if (count == 0)
1921 		count = port->default_keepalives_count;
1922 
1923 	if (setsockopt(port->sock, IPPROTO_TCP, TCP_KEEPCNT,
1924 				   (char *) &count, sizeof(count)) < 0)
1925 	{
1926 		elog(LOG, "setsockopt(%s) failed: %m", "TCP_KEEPCNT");
1927 		return STATUS_ERROR;
1928 	}
1929 
1930 	port->keepalives_count = count;
1931 #else
1932 	if (count != 0)
1933 	{
1934 		elog(LOG, "setsockopt(%s) not supported", "TCP_KEEPCNT");
1935 		return STATUS_ERROR;
1936 	}
1937 #endif
1938 
1939 	return STATUS_OK;
1940 }
1941