1 /*-------------------------------------------------------------------------
2  *
3  * pqcomm.c
4  *	  Communication functions between the Frontend and the Backend
5  *
6  * These routines handle the low-level details of communication between
7  * frontend and backend.  They just shove data across the communication
8  * channel, and are ignorant of the semantics of the data --- or would be,
9  * except for major brain damage in the design of the old COPY OUT protocol.
10  * Unfortunately, COPY OUT was designed to commandeer the communication
11  * channel (it just transfers data without wrapping it into messages).
12  * No other messages can be sent while COPY OUT is in progress; and if the
13  * copy is aborted by an ereport(ERROR), we need to close out the copy so that
14  * the frontend gets back into sync.  Therefore, these routines have to be
15  * aware of COPY OUT state.  (New COPY-OUT is message-based and does *not*
16  * set the DoingCopyOut flag.)
17  *
18  * NOTE: generally, it's a bad idea to emit outgoing messages directly with
19  * pq_putbytes(), especially if the message would require multiple calls
20  * to send.  Instead, use the routines in pqformat.c to construct the message
21  * in a buffer and then emit it in one call to pq_putmessage.  This ensures
22  * that the channel will not be clogged by an incomplete message if execution
23  * is aborted by ereport(ERROR) partway through the message.  The only
24  * non-libpq code that should call pq_putbytes directly is old-style COPY OUT.
25  *
26  * At one time, libpq was shared between frontend and backend, but now
27  * the backend's "backend/libpq" is quite separate from "interfaces/libpq".
28  * All that remains is similarities of names to trap the unwary...
29  *
30  * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
31  * Portions Copyright (c) 1994, Regents of the University of California
32  *
33  *	src/backend/libpq/pqcomm.c
34  *
35  *-------------------------------------------------------------------------
36  */
37 
38 /*------------------------
39  * INTERFACE ROUTINES
40  *
41  * setup/teardown:
42  *		StreamServerPort	- Open postmaster's server port
43  *		StreamConnection	- Create new connection with client
44  *		StreamClose			- Close a client/backend connection
 *		TouchSocketFiles	- Protect socket files against /tmp cleaners
 *		RemoveSocketFiles	- Remove socket files at postmaster shutdown
46  *		pq_init			- initialize libpq at backend startup
47  *		pq_comm_reset	- reset libpq during error recovery
48  *		pq_close		- shutdown libpq at backend exit
49  *
50  * low-level I/O:
51  *		pq_getbytes		- get a known number of bytes from connection
52  *		pq_getstring	- get a null terminated string from connection
53  *		pq_getmessage	- get a message with length word from connection
54  *		pq_getbyte		- get next byte from connection
55  *		pq_peekbyte		- peek at next byte from connection
56  *		pq_putbytes		- send bytes to connection (not flushed until pq_flush)
57  *		pq_flush		- flush pending output
58  *		pq_flush_if_writable - flush pending output if writable without blocking
59  *		pq_getbyte_if_available - get a byte if available without blocking
60  *
61  * message-level I/O (and old-style-COPY-OUT cruft):
62  *		pq_putmessage	- send a normal message (suppressed in COPY OUT mode)
63  *		pq_putmessage_noblock - buffer a normal message (suppressed in COPY OUT)
64  *		pq_startcopyout - inform libpq that a COPY OUT transfer is beginning
65  *		pq_endcopyout	- end a COPY OUT transfer
66  *
67  *------------------------
68  */
69 #include "postgres.h"
70 
71 #include <signal.h>
72 #include <fcntl.h>
73 #include <grp.h>
74 #include <unistd.h>
75 #include <sys/file.h>
76 #include <sys/socket.h>
77 #include <sys/stat.h>
78 #include <sys/time.h>
79 #include <netdb.h>
80 #include <netinet/in.h>
81 #ifdef HAVE_NETINET_TCP_H
82 #include <netinet/tcp.h>
83 #endif
84 #include <arpa/inet.h>
85 #ifdef HAVE_UTIME_H
86 #include <utime.h>
87 #endif
88 #ifdef WIN32_ONLY_COMPILER		/* mstcpip.h is missing on mingw */
89 #include <mstcpip.h>
90 #endif
91 
92 #include "libpq/ip.h"
93 #include "libpq/libpq.h"
94 #include "miscadmin.h"
95 #include "storage/ipc.h"
96 #include "utils/guc.h"
97 #include "utils/memutils.h"
98 
/*
 * Cope with the various platform-specific ways to spell TCP keepalive socket
 * options.  This doesn't cover Windows, which as usual does its own thing.
 *
 * At most one branch of the #if chain below applies; they are tried in order.
 * PG_TCP_KEEPALIVE_IDLE is the platform's option constant for the keepalive
 * idle time, and PG_TCP_KEEPALIVE_IDLE_STR is that option's name as a string
 * (presumably for log/error messages -- the users are outside this excerpt).
 * There is no #else branch: if none of the symbols exists, both macros stay
 * undefined, so any code that uses them must be guarded with #ifdef.
 */
#if defined(TCP_KEEPIDLE)
/* TCP_KEEPIDLE is the name of this option on Linux and *BSD */
#define PG_TCP_KEEPALIVE_IDLE TCP_KEEPIDLE
#define PG_TCP_KEEPALIVE_IDLE_STR "TCP_KEEPIDLE"
#elif defined(TCP_KEEPALIVE_THRESHOLD)
/* TCP_KEEPALIVE_THRESHOLD is the name of this option on Solaris >= 11 */
#define PG_TCP_KEEPALIVE_IDLE TCP_KEEPALIVE_THRESHOLD
#define PG_TCP_KEEPALIVE_IDLE_STR "TCP_KEEPALIVE_THRESHOLD"
#elif defined(TCP_KEEPALIVE) && defined(__darwin__)
/* TCP_KEEPALIVE is the name of this option on macOS */
/* Caution: Solaris has this symbol but it means something different */
#define PG_TCP_KEEPALIVE_IDLE TCP_KEEPALIVE
#define PG_TCP_KEEPALIVE_IDLE_STR "TCP_KEEPALIVE"
#endif
117 
118 /*
119  * Configuration options
120  */
121 int			Unix_socket_permissions;
122 char	   *Unix_socket_group;
123 
124 /* Where the Unix socket files are (list of palloc'd strings) */
125 static List *sock_paths = NIL;
126 
127 /*
128  * Buffers for low-level I/O.
129  *
130  * The receive buffer is fixed size. Send buffer is usually 8k, but can be
131  * enlarged by pq_putmessage_noblock() if the message doesn't fit otherwise.
132  */
133 
134 #define PQ_SEND_BUFFER_SIZE 8192
135 #define PQ_RECV_BUFFER_SIZE 8192
136 
137 static char *PqSendBuffer;
138 static int	PqSendBufferSize;	/* Size send buffer */
139 static int	PqSendPointer;		/* Next index to store a byte in PqSendBuffer */
140 static int	PqSendStart;		/* Next index to send a byte in PqSendBuffer */
141 
142 static char PqRecvBuffer[PQ_RECV_BUFFER_SIZE];
143 static int	PqRecvPointer;		/* Next index to read a byte from PqRecvBuffer */
144 static int	PqRecvLength;		/* End of data available in PqRecvBuffer */
145 
146 /*
147  * Message status
148  */
149 static bool PqCommBusy;			/* busy sending data to the client */
150 static bool PqCommReadingMsg;	/* in the middle of reading a message */
151 static bool DoingCopyOut;		/* in old-protocol COPY OUT processing */
152 
153 
154 /* Internal functions */
155 static void socket_comm_reset(void);
156 static void socket_close(int code, Datum arg);
157 static void socket_set_nonblocking(bool nonblocking);
158 static int	socket_flush(void);
159 static int	socket_flush_if_writable(void);
160 static bool socket_is_send_pending(void);
161 static int	socket_putmessage(char msgtype, const char *s, size_t len);
162 static void socket_putmessage_noblock(char msgtype, const char *s, size_t len);
163 static void socket_startcopyout(void);
164 static void socket_endcopyout(bool errorAbort);
165 static int	internal_putbytes(const char *s, size_t len);
166 static int	internal_flush(void);
167 static void socket_set_nonblocking(bool nonblocking);
168 
169 #ifdef HAVE_UNIX_SOCKETS
170 static int	Lock_AF_UNIX(char *unixSocketDir, char *unixSocketPath);
171 static int	Setup_AF_UNIX(char *sock_path);
172 #endif   /* HAVE_UNIX_SOCKETS */
173 
174 static PQcommMethods PqCommSocketMethods = {
175 	socket_comm_reset,
176 	socket_flush,
177 	socket_flush_if_writable,
178 	socket_is_send_pending,
179 	socket_putmessage,
180 	socket_putmessage_noblock,
181 	socket_startcopyout,
182 	socket_endcopyout
183 };
184 
185 PQcommMethods *PqCommMethods = &PqCommSocketMethods;
186 
187 WaitEventSet *FeBeWaitSet;
188 
189 
190 /* --------------------------------
191  *		pq_init - initialize libpq at backend startup
192  * --------------------------------
193  */
194 void
pq_init(void)195 pq_init(void)
196 {
197 	/* initialize state variables */
198 	PqSendBufferSize = PQ_SEND_BUFFER_SIZE;
199 	PqSendBuffer = MemoryContextAlloc(TopMemoryContext, PqSendBufferSize);
200 	PqSendPointer = PqSendStart = PqRecvPointer = PqRecvLength = 0;
201 	PqCommBusy = false;
202 	PqCommReadingMsg = false;
203 	DoingCopyOut = false;
204 
205 	/* set up process-exit hook to close the socket */
206 	on_proc_exit(socket_close, 0);
207 
208 	/*
209 	 * In backends (as soon as forked) we operate the underlying socket in
210 	 * nonblocking mode and use latches to implement blocking semantics if
211 	 * needed. That allows us to provide safely interruptible reads and
212 	 * writes.
213 	 *
214 	 * Use COMMERROR on failure, because ERROR would try to send the error to
215 	 * the client, which might require changing the mode again, leading to
216 	 * infinite recursion.
217 	 */
218 #ifndef WIN32
219 	if (!pg_set_noblock(MyProcPort->sock))
220 		ereport(COMMERROR,
221 				(errmsg("could not set socket to nonblocking mode: %m")));
222 #endif
223 
224 	FeBeWaitSet = CreateWaitEventSet(TopMemoryContext, 3);
225 	AddWaitEventToSet(FeBeWaitSet, WL_SOCKET_WRITEABLE, MyProcPort->sock,
226 					  NULL, NULL);
227 	AddWaitEventToSet(FeBeWaitSet, WL_LATCH_SET, -1, MyLatch, NULL);
228 	AddWaitEventToSet(FeBeWaitSet, WL_POSTMASTER_DEATH, -1, NULL, NULL);
229 }
230 
/* --------------------------------
 *		socket_comm_reset - reset libpq during error recovery
 *
 * This is called from error recovery at the outer idle loop.  It's
 * just to get us out of trouble if we somehow manage to elog() from
 * inside a pqcomm.c routine (which ideally will never happen, but...)
 *
 * Data already queued for sending is kept.  Only the busy flag is cleared,
 * and any old-protocol COPY OUT still in progress is ended as an error abort
 * via pq_endcopyout(true).
 * --------------------------------
 */
static void
socket_comm_reset(void)
{
	/* Do not throw away pending data, but do reset the busy flag */
	PqCommBusy = false;

	/*
	 * We can abort any old-style COPY OUT, too.
	 *
	 * NOTE(review): this deliberately runs after PqCommBusy is cleared;
	 * presumably so that whatever the end-of-copy path sends is not suppressed
	 * as "busy".  Confirm against socket_endcopyout before reordering.
	 */
	pq_endcopyout(true);
}
247 
248 /* --------------------------------
249  *		socket_close - shutdown libpq at backend exit
250  *
251  * This is the one pg_on_exit_callback in place during BackendInitialize().
252  * That function's unusual signal handling constrains that this callback be
253  * safe to run at any instant.
254  * --------------------------------
255  */
256 static void
socket_close(int code,Datum arg)257 socket_close(int code, Datum arg)
258 {
259 	/* Nothing to do in a standalone backend, where MyProcPort is NULL. */
260 	if (MyProcPort != NULL)
261 	{
262 #if defined(ENABLE_GSS) || defined(ENABLE_SSPI)
263 #ifdef ENABLE_GSS
264 		OM_uint32	min_s;
265 
266 		/*
267 		 * Shutdown GSSAPI layer.  This section does nothing when interrupting
268 		 * BackendInitialize(), because pg_GSS_recvauth() makes first use of
269 		 * "ctx" and "cred".
270 		 */
271 		if (MyProcPort->gss->ctx != GSS_C_NO_CONTEXT)
272 			gss_delete_sec_context(&min_s, &MyProcPort->gss->ctx, NULL);
273 
274 		if (MyProcPort->gss->cred != GSS_C_NO_CREDENTIAL)
275 			gss_release_cred(&min_s, &MyProcPort->gss->cred);
276 #endif   /* ENABLE_GSS */
277 
278 		/*
279 		 * GSS and SSPI share the port->gss struct.  Since nowhere else does a
280 		 * postmaster child free this, doing so is safe when interrupting
281 		 * BackendInitialize().
282 		 */
283 		free(MyProcPort->gss);
284 #endif   /* ENABLE_GSS || ENABLE_SSPI */
285 
286 		/*
287 		 * Cleanly shut down SSL layer.  Nowhere else does a postmaster child
288 		 * call this, so this is safe when interrupting BackendInitialize().
289 		 */
290 		secure_close(MyProcPort);
291 
292 		/*
293 		 * Formerly we did an explicit close() here, but it seems better to
294 		 * leave the socket open until the process dies.  This allows clients
295 		 * to perform a "synchronous close" if they care --- wait till the
296 		 * transport layer reports connection closure, and you can be sure the
297 		 * backend has exited.
298 		 *
299 		 * We do set sock to PGINVALID_SOCKET to prevent any further I/O,
300 		 * though.
301 		 */
302 		MyProcPort->sock = PGINVALID_SOCKET;
303 	}
304 }
305 
306 
307 
/*
 * Streams -- wrappers around the socket system calls
 *
 *
 *		Stream functions set up and manage stream sockets, both TCP and
 *		Unix-domain.
 */
314 
315 
/*
 * StreamServerPort -- open a "listening" port to accept connections.
 *
 * family should be AF_UNIX or AF_UNSPEC; portNumber is the port number.
 * For AF_UNIX ports, hostName should be NULL and unixSocketDir must be
 * specified.  For TCP ports, hostName is either NULL for all interfaces or
 * the interface to listen on, and unixSocketDir is ignored (can be NULL).
 *
 * Successfully opened sockets are added to the ListenSocket[] array (of
 * length MaxListen), at the first position that isn't PGINVALID_SOCKET.
 *
 * Every address that pg_getaddrinfo_all() returns for the host/service is
 * tried in turn.  A failure on one address (socket, setsockopt, bind or
 * listen) is logged at LOG level and the loop continues with the next one,
 * so partial success still counts as success.  The loop stops early when
 * ListenSocket[] has no free slot left, or when Setup_AF_UNIX() fails to
 * apply the socket file's group/permissions.
 *
 * RETURNS: STATUS_OK if at least one socket was added, else STATUS_ERROR
 */

int
StreamServerPort(int family, char *hostName, unsigned short portNumber,
				 char *unixSocketDir,
				 pgsocket ListenSocket[], int MaxListen)
{
	pgsocket	fd;
	int			err;
	int			maxconn;
	int			ret;
	char		portNumberStr[32];
	const char *familyDesc;
	char		familyDescBuf[64];
	char	   *service;
	struct addrinfo *addrs = NULL,
			   *addr;
	struct addrinfo hint;
	int			listen_index = 0;	/* next ListenSocket[] slot to examine */
	int			added = 0;		/* number of sockets successfully added */

#ifdef HAVE_UNIX_SOCKETS
	char		unixSocketPath[MAXPGPATH];
#endif
#if !defined(WIN32) || defined(IPV6_V6ONLY)
	int			one = 1;		/* "on" value for boolean setsockopt() calls */
#endif

	/* Initialize hint structure */
	MemSet(&hint, 0, sizeof(hint));
	hint.ai_family = family;
	hint.ai_flags = AI_PASSIVE;
	hint.ai_socktype = SOCK_STREAM;

#ifdef HAVE_UNIX_SOCKETS
	if (family == AF_UNIX)
	{
		/*
		 * Create unixSocketPath from portNumber and unixSocketDir and lock
		 * that file path
		 */
		UNIXSOCK_PATH(unixSocketPath, portNumber, unixSocketDir);
		if (strlen(unixSocketPath) >= UNIXSOCK_PATH_BUFLEN)
		{
			ereport(LOG,
					(errmsg("Unix-domain socket path \"%s\" is too long (maximum %d bytes)",
							unixSocketPath,
							(int) (UNIXSOCK_PATH_BUFLEN - 1))));
			return STATUS_ERROR;
		}
		if (Lock_AF_UNIX(unixSocketDir, unixSocketPath) != STATUS_OK)
			return STATUS_ERROR;
		/* For AF_UNIX the socket file path serves as the "service" */
		service = unixSocketPath;
	}
	else
#endif   /* HAVE_UNIX_SOCKETS */
	{
		snprintf(portNumberStr, sizeof(portNumberStr), "%d", portNumber);
		service = portNumberStr;
	}

	/* A resolver error and an empty result list are both treated as failure. */
	ret = pg_getaddrinfo_all(hostName, service, &hint, &addrs);
	if (ret || !addrs)
	{
		if (hostName)
			ereport(LOG,
					(errmsg("could not translate host name \"%s\", service \"%s\" to address: %s",
							hostName, service, gai_strerror(ret))));
		else
			ereport(LOG,
				 (errmsg("could not translate service \"%s\" to address: %s",
						 service, gai_strerror(ret))));
		if (addrs)
			pg_freeaddrinfo_all(hint.ai_family, addrs);
		return STATUS_ERROR;
	}

	for (addr = addrs; addr; addr = addr->ai_next)
	{
		if (!IS_AF_UNIX(family) && IS_AF_UNIX(addr->ai_family))
		{
			/*
			 * Only set up a unix domain socket when they really asked for it.
			 * The service/port is different in that case.
			 */
			continue;
		}

		/* See if there is still room to add 1 more socket. */
		for (; listen_index < MaxListen; listen_index++)
		{
			if (ListenSocket[listen_index] == PGINVALID_SOCKET)
				break;
		}
		if (listen_index >= MaxListen)
		{
			ereport(LOG,
					(errmsg("could not bind to all requested addresses: MAXLISTEN (%d) exceeded",
							MaxListen)));
			break;
		}

		/* set up family name for possible error messages */
		switch (addr->ai_family)
		{
			case AF_INET:
				familyDesc = _("IPv4");
				break;
#ifdef HAVE_IPV6
			case AF_INET6:
				familyDesc = _("IPv6");
				break;
#endif
#ifdef HAVE_UNIX_SOCKETS
			case AF_UNIX:
				familyDesc = _("Unix");
				break;
#endif
			default:
				snprintf(familyDescBuf, sizeof(familyDescBuf),
						 _("unrecognized address family %d"),
						 addr->ai_family);
				familyDesc = familyDescBuf;
				break;
		}

		if ((fd = socket(addr->ai_family, SOCK_STREAM, 0)) == PGINVALID_SOCKET)
		{
			ereport(LOG,
					(errcode_for_socket_access(),
			/* translator: %s is IPv4, IPv6, or Unix */
					 errmsg("could not create %s socket: %m",
							familyDesc)));
			continue;
		}

#ifndef WIN32

		/*
		 * Without the SO_REUSEADDR flag, a new postmaster can't be started
		 * right away after a stop or crash, giving "address already in use"
		 * error on TCP ports.
		 *
		 * On win32, however, this behavior only happens if the
		 * SO_EXLUSIVEADDRUSE is set. With SO_REUSEADDR, win32 allows multiple
		 * servers to listen on the same address, resulting in unpredictable
		 * behavior. With no flags at all, win32 behaves as Unix with
		 * SO_REUSEADDR.
		 */
		if (!IS_AF_UNIX(addr->ai_family))
		{
			if ((setsockopt(fd, SOL_SOCKET, SO_REUSEADDR,
							(char *) &one, sizeof(one))) == -1)
			{
				ereport(LOG,
						(errcode_for_socket_access(),
						 errmsg("setsockopt(SO_REUSEADDR) failed: %m")));
				closesocket(fd);
				continue;
			}
		}
#endif

#ifdef IPV6_V6ONLY
		/* Keep IPv6 sockets IPv6-only; IPv4 gets its own socket. */
		if (addr->ai_family == AF_INET6)
		{
			if (setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY,
						   (char *) &one, sizeof(one)) == -1)
			{
				ereport(LOG,
						(errcode_for_socket_access(),
						 errmsg("setsockopt(IPV6_V6ONLY) failed: %m")));
				closesocket(fd);
				continue;
			}
		}
#endif

		/*
		 * Note: This might fail on some OS's, like Linux older than
		 * 2.4.21-pre3, that don't have the IPV6_V6ONLY socket option, and map
		 * ipv4 addresses to ipv6.  It will show ::ffff:ipv4 for all ipv4
		 * connections.
		 */
		err = bind(fd, addr->ai_addr, addr->ai_addrlen);
		if (err < 0)
		{
			ereport(LOG,
					(errcode_for_socket_access(),
			/* translator: %s is IPv4, IPv6, or Unix */
					 errmsg("could not bind %s socket: %m",
							familyDesc),
					 (IS_AF_UNIX(addr->ai_family)) ?
				  errhint("Is another postmaster already running on port %d?"
						  " If not, remove socket file \"%s\" and retry.",
						  (int) portNumber, service) :
				  errhint("Is another postmaster already running on port %d?"
						  " If not, wait a few seconds and retry.",
						  (int) portNumber)));
			closesocket(fd);
			continue;
		}

#ifdef HAVE_UNIX_SOCKETS
		/*
		 * Apply socket-file group/permissions before listen() (see
		 * Setup_AF_UNIX).  Failure here abandons the remaining addresses.
		 */
		if (addr->ai_family == AF_UNIX)
		{
			if (Setup_AF_UNIX(service) != STATUS_OK)
			{
				closesocket(fd);
				break;
			}
		}
#endif

		/*
		 * Select appropriate accept-queue length limit.  PG_SOMAXCONN is only
		 * intended to provide a clamp on the request on platforms where an
		 * overly large request provokes a kernel error (are there any?).
		 */
		maxconn = MaxBackends * 2;
		if (maxconn > PG_SOMAXCONN)
			maxconn = PG_SOMAXCONN;

		err = listen(fd, maxconn);
		if (err < 0)
		{
			ereport(LOG,
					(errcode_for_socket_access(),
			/* translator: %s is IPv4, IPv6, or Unix */
					 errmsg("could not listen on %s socket: %m",
							familyDesc)));
			closesocket(fd);
			continue;
		}
		ListenSocket[listen_index] = fd;
		added++;
	}

	pg_freeaddrinfo_all(hint.ai_family, addrs);

	if (!added)
		return STATUS_ERROR;

	return STATUS_OK;
}
573 
574 
575 #ifdef HAVE_UNIX_SOCKETS
576 
577 /*
578  * Lock_AF_UNIX -- configure unix socket file path
579  */
580 static int
Lock_AF_UNIX(char * unixSocketDir,char * unixSocketPath)581 Lock_AF_UNIX(char *unixSocketDir, char *unixSocketPath)
582 {
583 	/*
584 	 * Grab an interlock file associated with the socket file.
585 	 *
586 	 * Note: there are two reasons for using a socket lock file, rather than
587 	 * trying to interlock directly on the socket itself.  First, it's a lot
588 	 * more portable, and second, it lets us remove any pre-existing socket
589 	 * file without race conditions.
590 	 */
591 	CreateSocketLockFile(unixSocketPath, true, unixSocketDir);
592 
593 	/*
594 	 * Once we have the interlock, we can safely delete any pre-existing
595 	 * socket file to avoid failure at bind() time.
596 	 */
597 	(void) unlink(unixSocketPath);
598 
599 	/*
600 	 * Remember socket file pathnames for later maintenance.
601 	 */
602 	sock_paths = lappend(sock_paths, pstrdup(unixSocketPath));
603 
604 	return STATUS_OK;
605 }
606 
607 
608 /*
609  * Setup_AF_UNIX -- configure unix socket permissions
610  */
611 static int
Setup_AF_UNIX(char * sock_path)612 Setup_AF_UNIX(char *sock_path)
613 {
614 	/*
615 	 * Fix socket ownership/permission if requested.  Note we must do this
616 	 * before we listen() to avoid a window where unwanted connections could
617 	 * get accepted.
618 	 */
619 	Assert(Unix_socket_group);
620 	if (Unix_socket_group[0] != '\0')
621 	{
622 #ifdef WIN32
623 		elog(WARNING, "configuration item unix_socket_group is not supported on this platform");
624 #else
625 		char	   *endptr;
626 		unsigned long val;
627 		gid_t		gid;
628 
629 		val = strtoul(Unix_socket_group, &endptr, 10);
630 		if (*endptr == '\0')
631 		{						/* numeric group id */
632 			gid = val;
633 		}
634 		else
635 		{						/* convert group name to id */
636 			struct group *gr;
637 
638 			gr = getgrnam(Unix_socket_group);
639 			if (!gr)
640 			{
641 				ereport(LOG,
642 						(errmsg("group \"%s\" does not exist",
643 								Unix_socket_group)));
644 				return STATUS_ERROR;
645 			}
646 			gid = gr->gr_gid;
647 		}
648 		if (chown(sock_path, -1, gid) == -1)
649 		{
650 			ereport(LOG,
651 					(errcode_for_file_access(),
652 					 errmsg("could not set group of file \"%s\": %m",
653 							sock_path)));
654 			return STATUS_ERROR;
655 		}
656 #endif
657 	}
658 
659 	if (chmod(sock_path, Unix_socket_permissions) == -1)
660 	{
661 		ereport(LOG,
662 				(errcode_for_file_access(),
663 				 errmsg("could not set permissions of file \"%s\": %m",
664 						sock_path)));
665 		return STATUS_ERROR;
666 	}
667 	return STATUS_OK;
668 }
669 #endif   /* HAVE_UNIX_SOCKETS */
670 
671 
672 /*
673  * StreamConnection -- create a new connection with client using
674  *		server port.  Set port->sock to the FD of the new connection.
675  *
676  * ASSUME: that this doesn't need to be non-blocking because
677  *		the Postmaster uses select() to tell when the server master
678  *		socket is ready for accept().
679  *
680  * RETURNS: STATUS_OK or STATUS_ERROR
681  */
682 int
StreamConnection(pgsocket server_fd,Port * port)683 StreamConnection(pgsocket server_fd, Port *port)
684 {
685 	/* accept connection and fill in the client (remote) address */
686 	port->raddr.salen = sizeof(port->raddr.addr);
687 	if ((port->sock = accept(server_fd,
688 							 (struct sockaddr *) & port->raddr.addr,
689 							 &port->raddr.salen)) == PGINVALID_SOCKET)
690 	{
691 		ereport(LOG,
692 				(errcode_for_socket_access(),
693 				 errmsg("could not accept new connection: %m")));
694 
695 		/*
696 		 * If accept() fails then postmaster.c will still see the server
697 		 * socket as read-ready, and will immediately try again.  To avoid
698 		 * uselessly sucking lots of CPU, delay a bit before trying again.
699 		 * (The most likely reason for failure is being out of kernel file
700 		 * table slots; we can do little except hope some will get freed up.)
701 		 */
702 		pg_usleep(100000L);		/* wait 0.1 sec */
703 		return STATUS_ERROR;
704 	}
705 
706 #ifdef SCO_ACCEPT_BUG
707 
708 	/*
709 	 * UnixWare 7+ and OpenServer 5.0.4 are known to have this bug, but it
710 	 * shouldn't hurt to catch it for all versions of those platforms.
711 	 */
712 	if (port->raddr.addr.ss_family == 0)
713 		port->raddr.addr.ss_family = AF_UNIX;
714 #endif
715 
716 	/* fill in the server (local) address */
717 	port->laddr.salen = sizeof(port->laddr.addr);
718 	if (getsockname(port->sock,
719 					(struct sockaddr *) & port->laddr.addr,
720 					&port->laddr.salen) < 0)
721 	{
722 		elog(LOG, "getsockname() failed: %m");
723 		return STATUS_ERROR;
724 	}
725 
726 	/* select NODELAY and KEEPALIVE options if it's a TCP connection */
727 	if (!IS_AF_UNIX(port->laddr.addr.ss_family))
728 	{
729 		int			on;
730 #ifdef WIN32
731 		int			oldopt;
732 		int			optlen;
733 		int			newopt;
734 #endif
735 
736 #ifdef	TCP_NODELAY
737 		on = 1;
738 		if (setsockopt(port->sock, IPPROTO_TCP, TCP_NODELAY,
739 					   (char *) &on, sizeof(on)) < 0)
740 		{
741 			elog(LOG, "setsockopt(%s) failed: %m", "TCP_NODELAY");
742 			return STATUS_ERROR;
743 		}
744 #endif
745 		on = 1;
746 		if (setsockopt(port->sock, SOL_SOCKET, SO_KEEPALIVE,
747 					   (char *) &on, sizeof(on)) < 0)
748 		{
749 			elog(LOG, "setsockopt(%s) failed: %m", "SO_KEEPALIVE");
750 			return STATUS_ERROR;
751 		}
752 
753 #ifdef WIN32
754 
755 		/*
756 		 * This is a Win32 socket optimization.  The OS send buffer should be
757 		 * large enough to send the whole Postgres send buffer in one go, or
758 		 * performance suffers.  The Postgres send buffer can be enlarged if a
759 		 * very large message needs to be sent, but we won't attempt to
760 		 * enlarge the OS buffer if that happens, so somewhat arbitrarily
761 		 * ensure that the OS buffer is at least PQ_SEND_BUFFER_SIZE * 4.
762 		 * (That's 32kB with the current default).
763 		 *
764 		 * The default OS buffer size used to be 8kB in earlier Windows
765 		 * versions, but was raised to 64kB in Windows 2012.  So it shouldn't
766 		 * be necessary to change it in later versions anymore.  Changing it
767 		 * unnecessarily can even reduce performance, because setting
768 		 * SO_SNDBUF in the application disables the "dynamic send buffering"
769 		 * feature that was introduced in Windows 7.  So before fiddling with
770 		 * SO_SNDBUF, check if the current buffer size is already large enough
771 		 * and only increase it if necessary.
772 		 *
773 		 * See https://support.microsoft.com/kb/823764/EN-US/ and
774 		 * https://msdn.microsoft.com/en-us/library/bb736549%28v=vs.85%29.aspx
775 		 */
776 		optlen = sizeof(oldopt);
777 		if (getsockopt(port->sock, SOL_SOCKET, SO_SNDBUF, (char *) &oldopt,
778 					   &optlen) < 0)
779 		{
780 			elog(LOG, "getsockopt(%s) failed: %m", "SO_SNDBUF");
781 			return STATUS_ERROR;
782 		}
783 		newopt = PQ_SEND_BUFFER_SIZE * 4;
784 		if (oldopt < newopt)
785 		{
786 			if (setsockopt(port->sock, SOL_SOCKET, SO_SNDBUF, (char *) &newopt,
787 						   sizeof(newopt)) < 0)
788 			{
789 				elog(LOG, "setsockopt(%s) failed: %m", "SO_SNDBUF");
790 				return STATUS_ERROR;
791 			}
792 		}
793 #endif
794 
795 		/*
796 		 * Also apply the current keepalive parameters.  If we fail to set a
797 		 * parameter, don't error out, because these aren't universally
798 		 * supported.  (Note: you might think we need to reset the GUC
799 		 * variables to 0 in such a case, but it's not necessary because the
800 		 * show hooks for these variables report the truth anyway.)
801 		 */
802 		(void) pq_setkeepalivesidle(tcp_keepalives_idle, port);
803 		(void) pq_setkeepalivesinterval(tcp_keepalives_interval, port);
804 		(void) pq_setkeepalivescount(tcp_keepalives_count, port);
805 	}
806 
807 	return STATUS_OK;
808 }
809 
/*
 * StreamClose -- close a client/backend connection
 *
 * NOTE: this is NOT used to terminate a session; it is just used to release
 * the file descriptor in a process that should no longer have the socket
 * open.  (For example, the postmaster calls this after passing ownership
 * of the connection to a child process.)  It is expected that someone else
 * still has the socket open.  So, we only want to close the descriptor,
 * we do NOT want to send anything to the far end.
 *
 * Implementation: a bare closesocket() on sock; the result is not checked.
 */
void
StreamClose(pgsocket sock)
{
	closesocket(sock);
}
825 
826 /*
827  * TouchSocketFiles -- mark socket files as recently accessed
828  *
829  * This routine should be called every so often to ensure that the socket
830  * files have a recent mod date (ordinary operations on sockets usually won't
831  * change the mod date).  That saves them from being removed by
832  * overenthusiastic /tmp-directory-cleaner daemons.  (Another reason we should
833  * never have put the socket file in /tmp...)
834  */
835 void
TouchSocketFiles(void)836 TouchSocketFiles(void)
837 {
838 	ListCell   *l;
839 
840 	/* Loop through all created sockets... */
841 	foreach(l, sock_paths)
842 	{
843 		char	   *sock_path = (char *) lfirst(l);
844 
845 		/*
846 		 * utime() is POSIX standard, utimes() is a common alternative. If we
847 		 * have neither, there's no way to affect the mod or access time of
848 		 * the socket :-(
849 		 *
850 		 * In either path, we ignore errors; there's no point in complaining.
851 		 */
852 #ifdef HAVE_UTIME
853 		utime(sock_path, NULL);
854 #else							/* !HAVE_UTIME */
855 #ifdef HAVE_UTIMES
856 		utimes(sock_path, NULL);
857 #endif   /* HAVE_UTIMES */
858 #endif   /* HAVE_UTIME */
859 	}
860 }
861 
862 /*
863  * RemoveSocketFiles -- unlink socket files at postmaster shutdown
864  */
865 void
RemoveSocketFiles(void)866 RemoveSocketFiles(void)
867 {
868 	ListCell   *l;
869 
870 	/* Loop through all created sockets... */
871 	foreach(l, sock_paths)
872 	{
873 		char	   *sock_path = (char *) lfirst(l);
874 
875 		/* Ignore any error. */
876 		(void) unlink(sock_path);
877 	}
878 	/* Since we're about to exit, no need to reclaim storage */
879 	sock_paths = NIL;
880 }
881 
882 
883 /* --------------------------------
884  * Low-level I/O routines begin here.
885  *
886  * These routines communicate with a frontend client across a connection
887  * already established by the preceding routines.
888  * --------------------------------
889  */
890 
891 /* --------------------------------
892  *			  socket_set_nonblocking - set socket blocking/non-blocking
893  *
894  * Sets the socket non-blocking if nonblocking is TRUE, or sets it
895  * blocking otherwise.
896  * --------------------------------
897  */
898 static void
socket_set_nonblocking(bool nonblocking)899 socket_set_nonblocking(bool nonblocking)
900 {
901 	if (MyProcPort == NULL)
902 		ereport(ERROR,
903 				(errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST),
904 				 errmsg("there is no client connection")));
905 
906 	MyProcPort->noblock = nonblocking;
907 }
908 
909 /* --------------------------------
910  *		pq_recvbuf - load some bytes into the input buffer
911  *
912  *		returns 0 if OK, EOF if trouble
913  * --------------------------------
914  */
915 static int
pq_recvbuf(void)916 pq_recvbuf(void)
917 {
918 	if (PqRecvPointer > 0)
919 	{
920 		if (PqRecvLength > PqRecvPointer)
921 		{
922 			/* still some unread data, left-justify it in the buffer */
923 			memmove(PqRecvBuffer, PqRecvBuffer + PqRecvPointer,
924 					PqRecvLength - PqRecvPointer);
925 			PqRecvLength -= PqRecvPointer;
926 			PqRecvPointer = 0;
927 		}
928 		else
929 			PqRecvLength = PqRecvPointer = 0;
930 	}
931 
932 	/* Ensure that we're in blocking mode */
933 	socket_set_nonblocking(false);
934 
935 	/* Can fill buffer from PqRecvLength and upwards */
936 	for (;;)
937 	{
938 		int			r;
939 
940 		r = secure_read(MyProcPort, PqRecvBuffer + PqRecvLength,
941 						PQ_RECV_BUFFER_SIZE - PqRecvLength);
942 
943 		if (r < 0)
944 		{
945 			if (errno == EINTR)
946 				continue;		/* Ok if interrupted */
947 
948 			/*
949 			 * Careful: an ereport() that tries to write to the client would
950 			 * cause recursion to here, leading to stack overflow and core
951 			 * dump!  This message must go *only* to the postmaster log.
952 			 */
953 			ereport(COMMERROR,
954 					(errcode_for_socket_access(),
955 					 errmsg("could not receive data from client: %m")));
956 			return EOF;
957 		}
958 		if (r == 0)
959 		{
960 			/*
961 			 * EOF detected.  We used to write a log message here, but it's
962 			 * better to expect the ultimate caller to do that.
963 			 */
964 			return EOF;
965 		}
966 		/* r contains number of bytes read, so just incr length */
967 		PqRecvLength += r;
968 		return 0;
969 	}
970 }
971 
972 /* --------------------------------
973  *		pq_getbyte	- get a single byte from connection, or return EOF
974  * --------------------------------
975  */
976 int
pq_getbyte(void)977 pq_getbyte(void)
978 {
979 	Assert(PqCommReadingMsg);
980 
981 	while (PqRecvPointer >= PqRecvLength)
982 	{
983 		if (pq_recvbuf())		/* If nothing in buffer, then recv some */
984 			return EOF;			/* Failed to recv data */
985 	}
986 	return (unsigned char) PqRecvBuffer[PqRecvPointer++];
987 }
988 
989 /* --------------------------------
990  *		pq_peekbyte		- peek at next byte from connection
991  *
992  *	 Same as pq_getbyte() except we don't advance the pointer.
993  * --------------------------------
994  */
995 int
pq_peekbyte(void)996 pq_peekbyte(void)
997 {
998 	Assert(PqCommReadingMsg);
999 
1000 	while (PqRecvPointer >= PqRecvLength)
1001 	{
1002 		if (pq_recvbuf())		/* If nothing in buffer, then recv some */
1003 			return EOF;			/* Failed to recv data */
1004 	}
1005 	return (unsigned char) PqRecvBuffer[PqRecvPointer];
1006 }
1007 
1008 /* --------------------------------
1009  *		pq_getbyte_if_available - get a single byte from connection,
1010  *			if available
1011  *
1012  * The received byte is stored in *c. Returns 1 if a byte was read,
1013  * 0 if no data was available, or EOF if trouble.
1014  * --------------------------------
1015  */
1016 int
pq_getbyte_if_available(unsigned char * c)1017 pq_getbyte_if_available(unsigned char *c)
1018 {
1019 	int			r;
1020 
1021 	Assert(PqCommReadingMsg);
1022 
1023 	if (PqRecvPointer < PqRecvLength)
1024 	{
1025 		*c = PqRecvBuffer[PqRecvPointer++];
1026 		return 1;
1027 	}
1028 
1029 	/* Put the socket into non-blocking mode */
1030 	socket_set_nonblocking(true);
1031 
1032 	r = secure_read(MyProcPort, c, 1);
1033 	if (r < 0)
1034 	{
1035 		/*
1036 		 * Ok if no data available without blocking or interrupted (though
1037 		 * EINTR really shouldn't happen with a non-blocking socket). Report
1038 		 * other errors.
1039 		 */
1040 		if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)
1041 			r = 0;
1042 		else
1043 		{
1044 			/*
1045 			 * Careful: an ereport() that tries to write to the client would
1046 			 * cause recursion to here, leading to stack overflow and core
1047 			 * dump!  This message must go *only* to the postmaster log.
1048 			 */
1049 			ereport(COMMERROR,
1050 					(errcode_for_socket_access(),
1051 					 errmsg("could not receive data from client: %m")));
1052 			r = EOF;
1053 		}
1054 	}
1055 	else if (r == 0)
1056 	{
1057 		/* EOF detected */
1058 		r = EOF;
1059 	}
1060 
1061 	return r;
1062 }
1063 
1064 /* --------------------------------
1065  *		pq_getbytes		- get a known number of bytes from connection
1066  *
1067  *		returns 0 if OK, EOF if trouble
1068  * --------------------------------
1069  */
1070 int
pq_getbytes(char * s,size_t len)1071 pq_getbytes(char *s, size_t len)
1072 {
1073 	size_t		amount;
1074 
1075 	Assert(PqCommReadingMsg);
1076 
1077 	while (len > 0)
1078 	{
1079 		while (PqRecvPointer >= PqRecvLength)
1080 		{
1081 			if (pq_recvbuf())	/* If nothing in buffer, then recv some */
1082 				return EOF;		/* Failed to recv data */
1083 		}
1084 		amount = PqRecvLength - PqRecvPointer;
1085 		if (amount > len)
1086 			amount = len;
1087 		memcpy(s, PqRecvBuffer + PqRecvPointer, amount);
1088 		PqRecvPointer += amount;
1089 		s += amount;
1090 		len -= amount;
1091 	}
1092 	return 0;
1093 }
1094 
1095 /* --------------------------------
1096  *		pq_discardbytes		- throw away a known number of bytes
1097  *
1098  *		same as pq_getbytes except we do not copy the data to anyplace.
1099  *		this is used for resynchronizing after read errors.
1100  *
1101  *		returns 0 if OK, EOF if trouble
1102  * --------------------------------
1103  */
1104 static int
pq_discardbytes(size_t len)1105 pq_discardbytes(size_t len)
1106 {
1107 	size_t		amount;
1108 
1109 	Assert(PqCommReadingMsg);
1110 
1111 	while (len > 0)
1112 	{
1113 		while (PqRecvPointer >= PqRecvLength)
1114 		{
1115 			if (pq_recvbuf())	/* If nothing in buffer, then recv some */
1116 				return EOF;		/* Failed to recv data */
1117 		}
1118 		amount = PqRecvLength - PqRecvPointer;
1119 		if (amount > len)
1120 			amount = len;
1121 		PqRecvPointer += amount;
1122 		len -= amount;
1123 	}
1124 	return 0;
1125 }
1126 
1127 /* --------------------------------
1128  *		pq_getstring	- get a null terminated string from connection
1129  *
1130  *		The return value is placed in an expansible StringInfo, which has
1131  *		already been initialized by the caller.
1132  *
1133  *		This is used only for dealing with old-protocol clients.  The idea
1134  *		is to produce a StringInfo that looks the same as we would get from
1135  *		pq_getmessage() with a newer client; we will then process it with
1136  *		pq_getmsgstring.  Therefore, no character set conversion is done here,
1137  *		even though this is presumably useful only for text.
1138  *
1139  *		returns 0 if OK, EOF if trouble
1140  * --------------------------------
1141  */
1142 int
pq_getstring(StringInfo s)1143 pq_getstring(StringInfo s)
1144 {
1145 	int			i;
1146 
1147 	Assert(PqCommReadingMsg);
1148 
1149 	resetStringInfo(s);
1150 
1151 	/* Read until we get the terminating '\0' */
1152 	for (;;)
1153 	{
1154 		while (PqRecvPointer >= PqRecvLength)
1155 		{
1156 			if (pq_recvbuf())	/* If nothing in buffer, then recv some */
1157 				return EOF;		/* Failed to recv data */
1158 		}
1159 
1160 		for (i = PqRecvPointer; i < PqRecvLength; i++)
1161 		{
1162 			if (PqRecvBuffer[i] == '\0')
1163 			{
1164 				/* include the '\0' in the copy */
1165 				appendBinaryStringInfo(s, PqRecvBuffer + PqRecvPointer,
1166 									   i - PqRecvPointer + 1);
1167 				PqRecvPointer = i + 1;	/* advance past \0 */
1168 				return 0;
1169 			}
1170 		}
1171 
1172 		/* If we're here we haven't got the \0 in the buffer yet. */
1173 		appendBinaryStringInfo(s, PqRecvBuffer + PqRecvPointer,
1174 							   PqRecvLength - PqRecvPointer);
1175 		PqRecvPointer = PqRecvLength;
1176 	}
1177 }
1178 
1179 /* --------------------------------
1180  *		pq_buffer_has_data		- is any buffered data available to read?
1181  *
1182  * This will *not* attempt to read more data.
1183  * --------------------------------
1184  */
1185 bool
pq_buffer_has_data(void)1186 pq_buffer_has_data(void)
1187 {
1188 	return (PqRecvPointer < PqRecvLength);
1189 }
1190 
1191 
1192 /* --------------------------------
1193  *		pq_startmsgread - begin reading a message from the client.
1194  *
1195  *		This must be called before any of the pq_get* functions.
1196  * --------------------------------
1197  */
1198 void
pq_startmsgread(void)1199 pq_startmsgread(void)
1200 {
1201 	/*
1202 	 * There shouldn't be a read active already, but let's check just to be
1203 	 * sure.
1204 	 */
1205 	if (PqCommReadingMsg)
1206 		ereport(FATAL,
1207 				(errcode(ERRCODE_PROTOCOL_VIOLATION),
1208 				 errmsg("terminating connection because protocol synchronization was lost")));
1209 
1210 	PqCommReadingMsg = true;
1211 }
1212 
1213 
1214 /* --------------------------------
1215  *		pq_endmsgread	- finish reading message.
1216  *
1217  *		This must be called after reading a V2 protocol message with
1218  *		pq_getstring() and friends, to indicate that we have read the whole
1219  *		message. In V3 protocol, pq_getmessage() does this implicitly.
1220  * --------------------------------
1221  */
1222 void
pq_endmsgread(void)1223 pq_endmsgread(void)
1224 {
1225 	Assert(PqCommReadingMsg);
1226 
1227 	PqCommReadingMsg = false;
1228 }
1229 
1230 /* --------------------------------
1231  *		pq_is_reading_msg - are we currently reading a message?
1232  *
1233  * This is used in error recovery at the outer idle loop to detect if we have
1234  * lost protocol sync, and need to terminate the connection. pq_startmsgread()
1235  * will check for that too, but it's nicer to detect it earlier.
1236  * --------------------------------
1237  */
1238 bool
pq_is_reading_msg(void)1239 pq_is_reading_msg(void)
1240 {
1241 	return PqCommReadingMsg;
1242 }
1243 
/* --------------------------------
 *		pq_getmessage	- get a message with length word from connection
 *
 *		The return value is placed in an expansible StringInfo, which has
 *		already been initialized by the caller.
 *		Only the message body is placed in the StringInfo; the length word
 *		is removed.  Also, s->cursor is initialized to zero for convenience
 *		in scanning the message contents.
 *
 *		The length word is a 4-byte integer in network byte order that counts
 *		itself, so the body is (length - 4) bytes and a length below 4 is
 *		rejected.
 *
 *		If maxlen is not zero, it is an upper limit on the length of the
 *		message we are willing to accept.  The check is against the length
 *		word itself, i.e. body size plus 4.  We abort the connection (by
 *		returning EOF) if client tries to send more than that.
 *
 *		On the EOF return paths PqCommReadingMsg is left set.
 *		NOTE(review): presumably callers treat EOF as fatal for the session,
 *		so the stale flag never matters -- confirm.
 *
 *		returns 0 if OK, EOF if trouble
 * --------------------------------
 */
int
pq_getmessage(StringInfo s, int maxlen)
{
	int32		len;

	Assert(PqCommReadingMsg);

	resetStringInfo(s);

	/* Read message length word */
	if (pq_getbytes((char *) &len, 4) == EOF)
	{
		ereport(COMMERROR,
				(errcode(ERRCODE_PROTOCOL_VIOLATION),
				 errmsg("unexpected EOF within message length word")));
		return EOF;
	}

	/* Length word arrives in network byte order */
	len = ntohl(len);

	if (len < 4 ||
		(maxlen > 0 && len > maxlen))
	{
		ereport(COMMERROR,
				(errcode(ERRCODE_PROTOCOL_VIOLATION),
				 errmsg("invalid message length")));
		return EOF;
	}

	len -= 4;					/* discount length itself */

	if (len > 0)
	{
		/*
		 * Allocate space for message.  If we run out of room (ridiculously
		 * large message), we will elog(ERROR), but we want to discard the
		 * message body so as not to lose communication sync.
		 */
		PG_TRY();
		{
			enlargeStringInfo(s, len);
		}
		PG_CATCH();
		{
			/*
			 * A failed discard is only logged; the original error is
			 * re-thrown either way.
			 */
			if (pq_discardbytes(len) == EOF)
				ereport(COMMERROR,
						(errcode(ERRCODE_PROTOCOL_VIOLATION),
						 errmsg("incomplete message from client")));

			/* we discarded the rest of the message so we're back in sync. */
			/*
			 * NOTE(review): this also runs when the discard hit EOF, in which
			 * case the connection is presumably unusable anyway.
			 */
			PqCommReadingMsg = false;
			PG_RE_THROW();
		}
		PG_END_TRY();

		/* And grab the message */
		if (pq_getbytes(s->data, len) == EOF)
		{
			ereport(COMMERROR,
					(errcode(ERRCODE_PROTOCOL_VIOLATION),
					 errmsg("incomplete message from client")));
			return EOF;
		}
		s->len = len;
		/* Place a trailing null per StringInfo convention */
		s->data[len] = '\0';
	}

	/* finished reading the message. */
	PqCommReadingMsg = false;

	return 0;
}
1333 
1334 
1335 /* --------------------------------
1336  *		pq_putbytes		- send bytes to connection (not flushed until pq_flush)
1337  *
1338  *		returns 0 if OK, EOF if trouble
1339  * --------------------------------
1340  */
1341 int
pq_putbytes(const char * s,size_t len)1342 pq_putbytes(const char *s, size_t len)
1343 {
1344 	int			res;
1345 
1346 	/* Should only be called by old-style COPY OUT */
1347 	Assert(DoingCopyOut);
1348 	/* No-op if reentrant call */
1349 	if (PqCommBusy)
1350 		return 0;
1351 	PqCommBusy = true;
1352 	res = internal_putbytes(s, len);
1353 	PqCommBusy = false;
1354 	return res;
1355 }
1356 
1357 static int
internal_putbytes(const char * s,size_t len)1358 internal_putbytes(const char *s, size_t len)
1359 {
1360 	size_t		amount;
1361 
1362 	while (len > 0)
1363 	{
1364 		/* If buffer is full, then flush it out */
1365 		if (PqSendPointer >= PqSendBufferSize)
1366 		{
1367 			socket_set_nonblocking(false);
1368 			if (internal_flush())
1369 				return EOF;
1370 		}
1371 		amount = PqSendBufferSize - PqSendPointer;
1372 		if (amount > len)
1373 			amount = len;
1374 		memcpy(PqSendBuffer + PqSendPointer, s, amount);
1375 		PqSendPointer += amount;
1376 		s += amount;
1377 		len -= amount;
1378 	}
1379 	return 0;
1380 }
1381 
1382 /* --------------------------------
1383  *		socket_flush		- flush pending output
1384  *
1385  *		returns 0 if OK, EOF if trouble
1386  * --------------------------------
1387  */
1388 static int
socket_flush(void)1389 socket_flush(void)
1390 {
1391 	int			res;
1392 
1393 	/* No-op if reentrant call */
1394 	if (PqCommBusy)
1395 		return 0;
1396 	PqCommBusy = true;
1397 	socket_set_nonblocking(false);
1398 	res = internal_flush();
1399 	PqCommBusy = false;
1400 	return res;
1401 }
1402 
/* --------------------------------
 *		internal_flush - flush pending output
 *
 * Writes the bytes PqSendBuffer[PqSendStart .. PqSendPointer) with
 * secure_write().  PqSendStart is advanced after every partial write, so when
 * a non-blocking write stops early the unsent tail remains buffered and the
 * next call resumes from there.
 *
 * On a hard write error the buffered output is discarded, and
 * ClientConnectionLost and InterruptPending are both set.
 *
 * Returns 0 if OK (meaning everything was sent, or operation would block
 * and the socket is in non-blocking mode), or EOF if trouble.
 * --------------------------------
 */
static int
internal_flush(void)
{
	/* errno of the last error we logged; used to suppress repeats */
	static int	last_reported_send_errno = 0;

	char	   *bufptr = PqSendBuffer + PqSendStart;
	char	   *bufend = PqSendBuffer + PqSendPointer;

	while (bufptr < bufend)
	{
		int			r;

		r = secure_write(MyProcPort, bufptr, bufend - bufptr);

		/*
		 * NOTE(review): when r == 0, errno may be stale; this assumes
		 * secure_write never returns 0 for a non-empty request -- confirm.
		 */
		if (r <= 0)
		{
			if (errno == EINTR)
				continue;		/* Ok if we were interrupted */

			/*
			 * Ok if no data writable without blocking, and the socket is in
			 * non-blocking mode.
			 */
			if (errno == EAGAIN ||
				errno == EWOULDBLOCK)
			{
				/* unsent data stays between PqSendStart and PqSendPointer */
				return 0;
			}

			/*
			 * Careful: an ereport() that tries to write to the client would
			 * cause recursion to here, leading to stack overflow and core
			 * dump!  This message must go *only* to the postmaster log.
			 *
			 * If a client disconnects while we're in the midst of output, we
			 * might write quite a bit of data before we get to a safe query
			 * abort point.  So, suppress duplicate log messages.
			 */
			if (errno != last_reported_send_errno)
			{
				last_reported_send_errno = errno;
				ereport(COMMERROR,
						(errcode_for_socket_access(),
						 errmsg("could not send data to client: %m")));
			}

			/*
			 * We drop the buffered data anyway so that processing can
			 * continue, even though we'll probably quit soon. We also set a
			 * flag that'll cause the next CHECK_FOR_INTERRUPTS to terminate
			 * the connection.
			 */
			PqSendStart = PqSendPointer = 0;
			ClientConnectionLost = 1;
			InterruptPending = 1;
			return EOF;
		}

		last_reported_send_errno = 0;	/* reset after any successful send */
		bufptr += r;
		PqSendStart += r;		/* record progress in case a later write stops early */
	}

	/* Everything went out: the buffer is empty again */
	PqSendStart = PqSendPointer = 0;
	return 0;
}
1476 
1477 /* --------------------------------
1478  *		pq_flush_if_writable - flush pending output if writable without blocking
1479  *
1480  * Returns 0 if OK, or EOF if trouble.
1481  * --------------------------------
1482  */
1483 static int
socket_flush_if_writable(void)1484 socket_flush_if_writable(void)
1485 {
1486 	int			res;
1487 
1488 	/* Quick exit if nothing to do */
1489 	if (PqSendPointer == PqSendStart)
1490 		return 0;
1491 
1492 	/* No-op if reentrant call */
1493 	if (PqCommBusy)
1494 		return 0;
1495 
1496 	/* Temporarily put the socket into non-blocking mode */
1497 	socket_set_nonblocking(true);
1498 
1499 	PqCommBusy = true;
1500 	res = internal_flush();
1501 	PqCommBusy = false;
1502 	return res;
1503 }
1504 
1505 /* --------------------------------
1506  *	socket_is_send_pending	- is there any pending data in the output buffer?
1507  * --------------------------------
1508  */
1509 static bool
socket_is_send_pending(void)1510 socket_is_send_pending(void)
1511 {
1512 	return (PqSendStart < PqSendPointer);
1513 }
1514 
1515 /* --------------------------------
1516  * Message-level I/O routines begin here.
1517  *
1518  * These routines understand about the old-style COPY OUT protocol.
1519  * --------------------------------
1520  */
1521 
1522 
1523 /* --------------------------------
1524  *		socket_putmessage - send a normal message (suppressed in COPY OUT mode)
1525  *
1526  *		If msgtype is not '\0', it is a message type code to place before
1527  *		the message body.  If msgtype is '\0', then the message has no type
1528  *		code (this is only valid in pre-3.0 protocols).
1529  *
1530  *		len is the length of the message body data at *s.  In protocol 3.0
1531  *		and later, a message length word (equal to len+4 because it counts
1532  *		itself too) is inserted by this routine.
1533  *
1534  *		All normal messages are suppressed while old-style COPY OUT is in
1535  *		progress.  (In practice only a few notice messages might get emitted
1536  *		then; dropping them is annoying, but at least they will still appear
1537  *		in the postmaster log.)
1538  *
1539  *		We also suppress messages generated while pqcomm.c is busy.  This
1540  *		avoids any possibility of messages being inserted within other
1541  *		messages.  The only known trouble case arises if SIGQUIT occurs
1542  *		during a pqcomm.c routine --- quickdie() will try to send a warning
1543  *		message, and the most reasonable approach seems to be to drop it.
1544  *
1545  *		returns 0 if OK, EOF if trouble
1546  * --------------------------------
1547  */
1548 static int
socket_putmessage(char msgtype,const char * s,size_t len)1549 socket_putmessage(char msgtype, const char *s, size_t len)
1550 {
1551 	if (DoingCopyOut || PqCommBusy)
1552 		return 0;
1553 	PqCommBusy = true;
1554 	if (msgtype)
1555 		if (internal_putbytes(&msgtype, 1))
1556 			goto fail;
1557 	if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 3)
1558 	{
1559 		uint32		n32;
1560 
1561 		n32 = htonl((uint32) (len + 4));
1562 		if (internal_putbytes((char *) &n32, 4))
1563 			goto fail;
1564 	}
1565 	if (internal_putbytes(s, len))
1566 		goto fail;
1567 	PqCommBusy = false;
1568 	return 0;
1569 
1570 fail:
1571 	PqCommBusy = false;
1572 	return EOF;
1573 }
1574 
1575 /* --------------------------------
1576  *		pq_putmessage_noblock	- like pq_putmessage, but never blocks
1577  *
1578  *		If the output buffer is too small to hold the message, the buffer
1579  *		is enlarged.
1580  */
1581 static void
socket_putmessage_noblock(char msgtype,const char * s,size_t len)1582 socket_putmessage_noblock(char msgtype, const char *s, size_t len)
1583 {
1584 	int res		PG_USED_FOR_ASSERTS_ONLY;
1585 	int			required;
1586 
1587 	/*
1588 	 * Ensure we have enough space in the output buffer for the message header
1589 	 * as well as the message itself.
1590 	 */
1591 	required = PqSendPointer + 1 + 4 + len;
1592 	if (required > PqSendBufferSize)
1593 	{
1594 		PqSendBuffer = repalloc(PqSendBuffer, required);
1595 		PqSendBufferSize = required;
1596 	}
1597 	res = pq_putmessage(msgtype, s, len);
1598 	Assert(res == 0);			/* should not fail when the message fits in
1599 								 * buffer */
1600 }
1601 
1602 
1603 /* --------------------------------
1604  *		socket_startcopyout - inform libpq that an old-style COPY OUT transfer
1605  *			is beginning
1606  * --------------------------------
1607  */
1608 static void
socket_startcopyout(void)1609 socket_startcopyout(void)
1610 {
1611 	DoingCopyOut = true;
1612 }
1613 
1614 /* --------------------------------
1615  *		socket_endcopyout	- end an old-style COPY OUT transfer
1616  *
1617  *		If errorAbort is indicated, we are aborting a COPY OUT due to an error,
1618  *		and must send a terminator line.  Since a partial data line might have
1619  *		been emitted, send a couple of newlines first (the first one could
1620  *		get absorbed by a backslash...)  Note that old-style COPY OUT does
1621  *		not allow binary transfers, so a textual terminator is always correct.
1622  * --------------------------------
1623  */
1624 static void
socket_endcopyout(bool errorAbort)1625 socket_endcopyout(bool errorAbort)
1626 {
1627 	if (!DoingCopyOut)
1628 		return;
1629 	if (errorAbort)
1630 		pq_putbytes("\n\n\\.\n", 5);
1631 	/* in non-error case, copy.c will have emitted the terminator line */
1632 	DoingCopyOut = false;
1633 }
1634 
1635 /*
1636  * Support for TCP Keepalive parameters
1637  */
1638 
/*
 * On Windows, we need to set both idle and interval at the same time.
 * We also cannot reset them to the default (setting to zero will
 * actually set them to zero, not default), therefore we fall back to
 * the out-of-the-box default instead.
 *
 * idle and interval are in seconds; a value <= 0 selects the built-in
 * default.  On success the effective values are stored in the Port.
 * Returns STATUS_OK or STATUS_ERROR.
 */
#if defined(WIN32) && defined(SIO_KEEPALIVE_VALS)
static int
pq_setkeepaliveswin32(Port *port, int idle, int interval)
{
	struct tcp_keepalive ka;
	DWORD		retsize;

	if (idle <= 0)
		idle = 2 * 60 * 60;		/* default = 2 hours */
	if (interval <= 0)
		interval = 1;			/* default = 1 second */

	/* SIO_KEEPALIVE_VALS expects milliseconds */
	ka.onoff = 1;
	ka.keepalivetime = idle * 1000;
	ka.keepaliveinterval = interval * 1000;

	if (WSAIoctl(port->sock,
				 SIO_KEEPALIVE_VALS,
				 (LPVOID) &ka,
				 sizeof(ka),
				 NULL,
				 0,
				 &retsize,
				 NULL,
				 NULL)
		!= 0)
	{
		/* WSAGetLastError() returns int: "%ui" printed it wrongly */
		elog(LOG, "WSAIoctl(SIO_KEEPALIVE_VALS) failed: %d",
			 WSAGetLastError());
		return STATUS_ERROR;
	}

	/* Remember the effective settings (defaults substituted above) */
	port->keepalives_idle = idle;
	port->keepalives_interval = interval;
	return STATUS_OK;
}
#endif
1683 
1684 int
pq_getkeepalivesidle(Port * port)1685 pq_getkeepalivesidle(Port *port)
1686 {
1687 #if defined(PG_TCP_KEEPALIVE_IDLE) || defined(SIO_KEEPALIVE_VALS)
1688 	if (port == NULL || IS_AF_UNIX(port->laddr.addr.ss_family))
1689 		return 0;
1690 
1691 	if (port->keepalives_idle != 0)
1692 		return port->keepalives_idle;
1693 
1694 	if (port->default_keepalives_idle == 0)
1695 	{
1696 #ifndef WIN32
1697 		ACCEPT_TYPE_ARG3 size = sizeof(port->default_keepalives_idle);
1698 
1699 		if (getsockopt(port->sock, IPPROTO_TCP, PG_TCP_KEEPALIVE_IDLE,
1700 					   (char *) &port->default_keepalives_idle,
1701 					   &size) < 0)
1702 		{
1703 			elog(LOG, "getsockopt(%s) failed: %m", PG_TCP_KEEPALIVE_IDLE_STR);
1704 			port->default_keepalives_idle = -1; /* don't know */
1705 		}
1706 #else							/* WIN32 */
1707 		/* We can't get the defaults on Windows, so return "don't know" */
1708 		port->default_keepalives_idle = -1;
1709 #endif   /* WIN32 */
1710 	}
1711 
1712 	return port->default_keepalives_idle;
1713 #else
1714 	return 0;
1715 #endif
1716 }
1717 
/*
 * pq_setkeepalivesidle - set the TCP keepalive idle time for a connection
 *
 * idle is in seconds (the Windows path converts to milliseconds); 0 means
 * "revert to the system default".  Non-TCP and NULL ports are ignored.  On
 * success port->keepalives_idle records the value actually applied; after a
 * reset to the default that is the default itself, not 0.
 *
 * Returns STATUS_OK, or STATUS_ERROR if the option could not be set, or if a
 * nonzero value was requested on a platform that cannot set it.
 */
int
pq_setkeepalivesidle(int idle, Port *port)
{
	if (port == NULL || IS_AF_UNIX(port->laddr.addr.ss_family))
		return STATUS_OK;

/* check SIO_KEEPALIVE_VALS here, not just WIN32, as some toolchains lack it */
#if defined(PG_TCP_KEEPALIVE_IDLE) || defined(SIO_KEEPALIVE_VALS)
	/* Nothing to do if the requested value is already in effect */
	if (idle == port->keepalives_idle)
		return STATUS_OK;

#ifndef WIN32
	/* Look up (and cache) the system default if we haven't yet */
	if (port->default_keepalives_idle <= 0)
	{
		if (pq_getkeepalivesidle(port) < 0)
		{
			if (idle == 0)
				return STATUS_OK;		/* default is set but unknown */
			else
				return STATUS_ERROR;
		}
	}

	/* 0 means "use the default", which we now know */
	if (idle == 0)
		idle = port->default_keepalives_idle;

	if (setsockopt(port->sock, IPPROTO_TCP, PG_TCP_KEEPALIVE_IDLE,
				   (char *) &idle, sizeof(idle)) < 0)
	{
		elog(LOG, "setsockopt(%s) failed: %m", PG_TCP_KEEPALIVE_IDLE_STR);
		return STATUS_ERROR;
	}

	port->keepalives_idle = idle;
#else							/* WIN32 */
	/* Windows sets idle and interval together */
	return pq_setkeepaliveswin32(port, idle, port->keepalives_interval);
#endif
#else
	/* Unsupported platform: only "leave at default" is acceptable */
	if (idle != 0)
	{
		elog(LOG, "setting the keepalive idle time is not supported");
		return STATUS_ERROR;
	}
#endif

	return STATUS_OK;
}
1765 
1766 int
pq_getkeepalivesinterval(Port * port)1767 pq_getkeepalivesinterval(Port *port)
1768 {
1769 #if defined(TCP_KEEPINTVL) || defined(SIO_KEEPALIVE_VALS)
1770 	if (port == NULL || IS_AF_UNIX(port->laddr.addr.ss_family))
1771 		return 0;
1772 
1773 	if (port->keepalives_interval != 0)
1774 		return port->keepalives_interval;
1775 
1776 	if (port->default_keepalives_interval == 0)
1777 	{
1778 #ifndef WIN32
1779 		ACCEPT_TYPE_ARG3 size = sizeof(port->default_keepalives_interval);
1780 
1781 		if (getsockopt(port->sock, IPPROTO_TCP, TCP_KEEPINTVL,
1782 					   (char *) &port->default_keepalives_interval,
1783 					   &size) < 0)
1784 		{
1785 			elog(LOG, "getsockopt(%s) failed: %m", "TCP_KEEPINTVL");
1786 			port->default_keepalives_interval = -1;		/* don't know */
1787 		}
1788 #else
1789 		/* We can't get the defaults on Windows, so return "don't know" */
1790 		port->default_keepalives_interval = -1;
1791 #endif   /* WIN32 */
1792 	}
1793 
1794 	return port->default_keepalives_interval;
1795 #else
1796 	return 0;
1797 #endif
1798 }
1799 
/*
 * pq_setkeepalivesinterval - set the TCP keepalive probe interval
 *
 * interval is in seconds (the Windows path converts to milliseconds); 0
 * means "revert to the system default".  Non-TCP and NULL ports are ignored.
 * On success port->keepalives_interval records the value actually applied;
 * after a reset to the default that is the default itself, not 0.
 *
 * Returns STATUS_OK, or STATUS_ERROR if the option could not be set, or if a
 * nonzero value was requested on a platform that cannot set it.
 */
int
pq_setkeepalivesinterval(int interval, Port *port)
{
	if (port == NULL || IS_AF_UNIX(port->laddr.addr.ss_family))
		return STATUS_OK;

#if defined(TCP_KEEPINTVL) || defined(SIO_KEEPALIVE_VALS)
	/* Nothing to do if the requested value is already in effect */
	if (interval == port->keepalives_interval)
		return STATUS_OK;

#ifndef WIN32
	/* Look up (and cache) the system default if we haven't yet */
	if (port->default_keepalives_interval <= 0)
	{
		if (pq_getkeepalivesinterval(port) < 0)
		{
			if (interval == 0)
				return STATUS_OK;		/* default is set but unknown */
			else
				return STATUS_ERROR;
		}
	}

	/* 0 means "use the default", which we now know */
	if (interval == 0)
		interval = port->default_keepalives_interval;

	if (setsockopt(port->sock, IPPROTO_TCP, TCP_KEEPINTVL,
				   (char *) &interval, sizeof(interval)) < 0)
	{
		elog(LOG, "setsockopt(%s) failed: %m", "TCP_KEEPINTVL");
		return STATUS_ERROR;
	}

	port->keepalives_interval = interval;
#else							/* WIN32 */
	/* Windows sets idle and interval together */
	return pq_setkeepaliveswin32(port, port->keepalives_idle, interval);
#endif
#else
	/* Unsupported platform: only "leave at default" is acceptable */
	if (interval != 0)
	{
		elog(LOG, "setsockopt(%s) not supported", "TCP_KEEPINTVL");
		return STATUS_ERROR;
	}
#endif

	return STATUS_OK;
}
1846 
1847 int
pq_getkeepalivescount(Port * port)1848 pq_getkeepalivescount(Port *port)
1849 {
1850 #ifdef TCP_KEEPCNT
1851 	if (port == NULL || IS_AF_UNIX(port->laddr.addr.ss_family))
1852 		return 0;
1853 
1854 	if (port->keepalives_count != 0)
1855 		return port->keepalives_count;
1856 
1857 	if (port->default_keepalives_count == 0)
1858 	{
1859 		ACCEPT_TYPE_ARG3 size = sizeof(port->default_keepalives_count);
1860 
1861 		if (getsockopt(port->sock, IPPROTO_TCP, TCP_KEEPCNT,
1862 					   (char *) &port->default_keepalives_count,
1863 					   &size) < 0)
1864 		{
1865 			elog(LOG, "getsockopt(%s) failed: %m", "TCP_KEEPCNT");
1866 			port->default_keepalives_count = -1;		/* don't know */
1867 		}
1868 	}
1869 
1870 	return port->default_keepalives_count;
1871 #else
1872 	return 0;
1873 #endif
1874 }
1875 
1876 int
pq_setkeepalivescount(int count,Port * port)1877 pq_setkeepalivescount(int count, Port *port)
1878 {
1879 	if (port == NULL || IS_AF_UNIX(port->laddr.addr.ss_family))
1880 		return STATUS_OK;
1881 
1882 #ifdef TCP_KEEPCNT
1883 	if (count == port->keepalives_count)
1884 		return STATUS_OK;
1885 
1886 	if (port->default_keepalives_count <= 0)
1887 	{
1888 		if (pq_getkeepalivescount(port) < 0)
1889 		{
1890 			if (count == 0)
1891 				return STATUS_OK;		/* default is set but unknown */
1892 			else
1893 				return STATUS_ERROR;
1894 		}
1895 	}
1896 
1897 	if (count == 0)
1898 		count = port->default_keepalives_count;
1899 
1900 	if (setsockopt(port->sock, IPPROTO_TCP, TCP_KEEPCNT,
1901 				   (char *) &count, sizeof(count)) < 0)
1902 	{
1903 		elog(LOG, "setsockopt(%s) failed: %m", "TCP_KEEPCNT");
1904 		return STATUS_ERROR;
1905 	}
1906 
1907 	port->keepalives_count = count;
1908 #else
1909 	if (count != 0)
1910 	{
1911 		elog(LOG, "setsockopt(%s) not supported", "TCP_KEEPCNT");
1912 		return STATUS_ERROR;
1913 	}
1914 #endif
1915 
1916 	return STATUS_OK;
1917 }
1918