1 /*-------------------------------------------------------------------------
2  *
3  * libpqwalreceiver.c
4  *
5  * This file contains the libpq-specific parts of walreceiver. It's
6  * loaded as a dynamic module to avoid linking the main server binary with
7  * libpq.
8  *
9  * Portions Copyright (c) 2010-2020, PostgreSQL Global Development Group
10  *
11  *
12  * IDENTIFICATION
13  *	  src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
14  *
15  *-------------------------------------------------------------------------
16  */
17 #include "postgres.h"
18 
19 #include <unistd.h>
20 #include <sys/time.h>
21 
22 #include "access/xlog.h"
23 #include "catalog/pg_type.h"
24 #include "common/connect.h"
25 #include "funcapi.h"
26 #include "libpq-fe.h"
27 #include "mb/pg_wchar.h"
28 #include "miscadmin.h"
29 #include "pgstat.h"
30 #include "pqexpbuffer.h"
31 #include "replication/walreceiver.h"
32 #include "utils/builtins.h"
33 #include "utils/memutils.h"
34 #include "utils/pg_lsn.h"
35 #include "utils/tuplestore.h"
36 
37 PG_MODULE_MAGIC;
38 
39 void		_PG_init(void);
40 
41 struct WalReceiverConn
42 {
43 	/* Current connection to the primary, if any */
44 	PGconn	   *streamConn;
45 	/* Used to remember if the connection is logical or physical */
46 	bool		logical;
47 	/* Buffer for currently read records */
48 	char	   *recvBuf;
49 };
50 
51 /* Prototypes for interface functions */
52 static WalReceiverConn *libpqrcv_connect(const char *conninfo,
53 										 bool logical, const char *appname,
54 										 char **err);
55 static void libpqrcv_check_conninfo(const char *conninfo);
56 static char *libpqrcv_get_conninfo(WalReceiverConn *conn);
57 static void libpqrcv_get_senderinfo(WalReceiverConn *conn,
58 									char **sender_host, int *sender_port);
59 static char *libpqrcv_identify_system(WalReceiverConn *conn,
60 									  TimeLineID *primary_tli);
61 static int	libpqrcv_server_version(WalReceiverConn *conn);
62 static void libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn,
63 											 TimeLineID tli, char **filename,
64 											 char **content, int *len);
65 static bool libpqrcv_startstreaming(WalReceiverConn *conn,
66 									const WalRcvStreamOptions *options);
67 static void libpqrcv_endstreaming(WalReceiverConn *conn,
68 								  TimeLineID *next_tli);
69 static int	libpqrcv_receive(WalReceiverConn *conn, char **buffer,
70 							 pgsocket *wait_fd);
71 static void libpqrcv_send(WalReceiverConn *conn, const char *buffer,
72 						  int nbytes);
73 static char *libpqrcv_create_slot(WalReceiverConn *conn,
74 								  const char *slotname,
75 								  bool temporary,
76 								  CRSSnapshotAction snapshot_action,
77 								  XLogRecPtr *lsn);
78 static pid_t libpqrcv_get_backend_pid(WalReceiverConn *conn);
79 static WalRcvExecResult *libpqrcv_exec(WalReceiverConn *conn,
80 									   const char *query,
81 									   const int nRetTypes,
82 									   const Oid *retTypes);
83 static void libpqrcv_disconnect(WalReceiverConn *conn);
84 
85 static WalReceiverFunctionsType PQWalReceiverFunctions = {
86 	libpqrcv_connect,
87 	libpqrcv_check_conninfo,
88 	libpqrcv_get_conninfo,
89 	libpqrcv_get_senderinfo,
90 	libpqrcv_identify_system,
91 	libpqrcv_server_version,
92 	libpqrcv_readtimelinehistoryfile,
93 	libpqrcv_startstreaming,
94 	libpqrcv_endstreaming,
95 	libpqrcv_receive,
96 	libpqrcv_send,
97 	libpqrcv_create_slot,
98 	libpqrcv_get_backend_pid,
99 	libpqrcv_exec,
100 	libpqrcv_disconnect
101 };
102 
103 /* Prototypes for private functions */
104 static PGresult *libpqrcv_PQexec(PGconn *streamConn, const char *query);
105 static PGresult *libpqrcv_PQgetResult(PGconn *streamConn);
106 static char *stringlist_to_identifierstr(PGconn *conn, List *strings);
107 
108 /*
109  * Module initialization function
110  */
111 void
_PG_init(void)112 _PG_init(void)
113 {
114 	if (WalReceiverFunctions != NULL)
115 		elog(ERROR, "libpqwalreceiver already loaded");
116 	WalReceiverFunctions = &PQWalReceiverFunctions;
117 }
118 
119 /*
120  * Establish the connection to the primary server for XLOG streaming
121  *
122  * Returns NULL on error and fills the err with palloc'ed error message.
123  */
124 static WalReceiverConn *
libpqrcv_connect(const char * conninfo,bool logical,const char * appname,char ** err)125 libpqrcv_connect(const char *conninfo, bool logical, const char *appname,
126 				 char **err)
127 {
128 	WalReceiverConn *conn;
129 	PostgresPollingStatusType status;
130 	const char *keys[5];
131 	const char *vals[5];
132 	int			i = 0;
133 
134 	/*
135 	 * We use the expand_dbname parameter to process the connection string (or
136 	 * URI), and pass some extra options.
137 	 */
138 	keys[i] = "dbname";
139 	vals[i] = conninfo;
140 	keys[++i] = "replication";
141 	vals[i] = logical ? "database" : "true";
142 	if (!logical)
143 	{
144 		/*
145 		 * The database name is ignored by the server in replication mode, but
146 		 * specify "replication" for .pgpass lookup.
147 		 */
148 		keys[++i] = "dbname";
149 		vals[i] = "replication";
150 	}
151 	keys[++i] = "fallback_application_name";
152 	vals[i] = appname;
153 	if (logical)
154 	{
155 		keys[++i] = "client_encoding";
156 		vals[i] = GetDatabaseEncodingName();
157 	}
158 	keys[++i] = NULL;
159 	vals[i] = NULL;
160 
161 	Assert(i < sizeof(keys));
162 
163 	conn = palloc0(sizeof(WalReceiverConn));
164 	conn->streamConn = PQconnectStartParams(keys, vals,
165 											 /* expand_dbname = */ true);
166 	if (PQstatus(conn->streamConn) == CONNECTION_BAD)
167 	{
168 		*err = pchomp(PQerrorMessage(conn->streamConn));
169 		return NULL;
170 	}
171 
172 	/*
173 	 * Poll connection until we have OK or FAILED status.
174 	 *
175 	 * Per spec for PQconnectPoll, first wait till socket is write-ready.
176 	 */
177 	status = PGRES_POLLING_WRITING;
178 	do
179 	{
180 		int			io_flag;
181 		int			rc;
182 
183 		if (status == PGRES_POLLING_READING)
184 			io_flag = WL_SOCKET_READABLE;
185 #ifdef WIN32
186 		/* Windows needs a different test while waiting for connection-made */
187 		else if (PQstatus(conn->streamConn) == CONNECTION_STARTED)
188 			io_flag = WL_SOCKET_CONNECTED;
189 #endif
190 		else
191 			io_flag = WL_SOCKET_WRITEABLE;
192 
193 		rc = WaitLatchOrSocket(MyLatch,
194 							   WL_EXIT_ON_PM_DEATH | WL_LATCH_SET | io_flag,
195 							   PQsocket(conn->streamConn),
196 							   0,
197 							   WAIT_EVENT_LIBPQWALRECEIVER_CONNECT);
198 
199 		/* Interrupted? */
200 		if (rc & WL_LATCH_SET)
201 		{
202 			ResetLatch(MyLatch);
203 			ProcessWalRcvInterrupts();
204 		}
205 
206 		/* If socket is ready, advance the libpq state machine */
207 		if (rc & io_flag)
208 			status = PQconnectPoll(conn->streamConn);
209 	} while (status != PGRES_POLLING_OK && status != PGRES_POLLING_FAILED);
210 
211 	if (PQstatus(conn->streamConn) != CONNECTION_OK)
212 	{
213 		*err = pchomp(PQerrorMessage(conn->streamConn));
214 		return NULL;
215 	}
216 
217 	if (logical)
218 	{
219 		PGresult   *res;
220 
221 		res = libpqrcv_PQexec(conn->streamConn,
222 							  ALWAYS_SECURE_SEARCH_PATH_SQL);
223 		if (PQresultStatus(res) != PGRES_TUPLES_OK)
224 		{
225 			PQclear(res);
226 			ereport(ERROR,
227 					(errmsg("could not clear search path: %s",
228 							pchomp(PQerrorMessage(conn->streamConn)))));
229 		}
230 		PQclear(res);
231 	}
232 
233 	conn->logical = logical;
234 
235 	return conn;
236 }
237 
238 /*
239  * Validate connection info string (just try to parse it)
240  */
241 static void
libpqrcv_check_conninfo(const char * conninfo)242 libpqrcv_check_conninfo(const char *conninfo)
243 {
244 	PQconninfoOption *opts = NULL;
245 	char	   *err = NULL;
246 
247 	opts = PQconninfoParse(conninfo, &err);
248 	if (opts == NULL)
249 	{
250 		/* The error string is malloc'd, so we must free it explicitly */
251 		char	   *errcopy = err ? pstrdup(err) : "out of memory";
252 
253 		PQfreemem(err);
254 		ereport(ERROR,
255 				(errcode(ERRCODE_SYNTAX_ERROR),
256 				 errmsg("invalid connection string syntax: %s", errcopy)));
257 	}
258 
259 	PQconninfoFree(opts);
260 }
261 
262 /*
263  * Return a user-displayable conninfo string.  Any security-sensitive fields
264  * are obfuscated.
265  */
266 static char *
libpqrcv_get_conninfo(WalReceiverConn * conn)267 libpqrcv_get_conninfo(WalReceiverConn *conn)
268 {
269 	PQconninfoOption *conn_opts;
270 	PQconninfoOption *conn_opt;
271 	PQExpBufferData buf;
272 	char	   *retval;
273 
274 	Assert(conn->streamConn != NULL);
275 
276 	initPQExpBuffer(&buf);
277 	conn_opts = PQconninfo(conn->streamConn);
278 
279 	if (conn_opts == NULL)
280 		ereport(ERROR,
281 				(errmsg("could not parse connection string: %s",
282 						_("out of memory"))));
283 
284 	/* build a clean connection string from pieces */
285 	for (conn_opt = conn_opts; conn_opt->keyword != NULL; conn_opt++)
286 	{
287 		bool		obfuscate;
288 
289 		/* Skip debug and empty options */
290 		if (strchr(conn_opt->dispchar, 'D') ||
291 			conn_opt->val == NULL ||
292 			conn_opt->val[0] == '\0')
293 			continue;
294 
295 		/* Obfuscate security-sensitive options */
296 		obfuscate = strchr(conn_opt->dispchar, '*') != NULL;
297 
298 		appendPQExpBuffer(&buf, "%s%s=%s",
299 						  buf.len == 0 ? "" : " ",
300 						  conn_opt->keyword,
301 						  obfuscate ? "********" : conn_opt->val);
302 	}
303 
304 	PQconninfoFree(conn_opts);
305 
306 	retval = PQExpBufferDataBroken(buf) ? NULL : pstrdup(buf.data);
307 	termPQExpBuffer(&buf);
308 	return retval;
309 }
310 
311 /*
312  * Provides information of sender this WAL receiver is connected to.
313  */
314 static void
libpqrcv_get_senderinfo(WalReceiverConn * conn,char ** sender_host,int * sender_port)315 libpqrcv_get_senderinfo(WalReceiverConn *conn, char **sender_host,
316 						int *sender_port)
317 {
318 	char	   *ret = NULL;
319 
320 	*sender_host = NULL;
321 	*sender_port = 0;
322 
323 	Assert(conn->streamConn != NULL);
324 
325 	ret = PQhost(conn->streamConn);
326 	if (ret && strlen(ret) != 0)
327 		*sender_host = pstrdup(ret);
328 
329 	ret = PQport(conn->streamConn);
330 	if (ret && strlen(ret) != 0)
331 		*sender_port = atoi(ret);
332 }
333 
334 /*
335  * Check that primary's system identifier matches ours, and fetch the current
336  * timeline ID of the primary.
337  */
338 static char *
libpqrcv_identify_system(WalReceiverConn * conn,TimeLineID * primary_tli)339 libpqrcv_identify_system(WalReceiverConn *conn, TimeLineID *primary_tli)
340 {
341 	PGresult   *res;
342 	char	   *primary_sysid;
343 
344 	/*
345 	 * Get the system identifier and timeline ID as a DataRow message from the
346 	 * primary server.
347 	 */
348 	res = libpqrcv_PQexec(conn->streamConn, "IDENTIFY_SYSTEM");
349 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
350 	{
351 		PQclear(res);
352 		ereport(ERROR,
353 				(errmsg("could not receive database system identifier and timeline ID from "
354 						"the primary server: %s",
355 						pchomp(PQerrorMessage(conn->streamConn)))));
356 	}
357 	if (PQnfields(res) < 3 || PQntuples(res) != 1)
358 	{
359 		int			ntuples = PQntuples(res);
360 		int			nfields = PQnfields(res);
361 
362 		PQclear(res);
363 		ereport(ERROR,
364 				(errmsg("invalid response from primary server"),
365 				 errdetail("Could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields.",
366 						   ntuples, nfields, 3, 1)));
367 	}
368 	primary_sysid = pstrdup(PQgetvalue(res, 0, 0));
369 	*primary_tli = pg_strtoint32(PQgetvalue(res, 0, 1));
370 	PQclear(res);
371 
372 	return primary_sysid;
373 }
374 
375 /*
376  * Thin wrapper around libpq to obtain server version.
377  */
378 static int
libpqrcv_server_version(WalReceiverConn * conn)379 libpqrcv_server_version(WalReceiverConn *conn)
380 {
381 	return PQserverVersion(conn->streamConn);
382 }
383 
384 /*
385  * Start streaming WAL data from given streaming options.
386  *
387  * Returns true if we switched successfully to copy-both mode. False
388  * means the server received the command and executed it successfully, but
389  * didn't switch to copy-mode.  That means that there was no WAL on the
390  * requested timeline and starting point, because the server switched to
391  * another timeline at or before the requested starting point. On failure,
392  * throws an ERROR.
393  */
394 static bool
libpqrcv_startstreaming(WalReceiverConn * conn,const WalRcvStreamOptions * options)395 libpqrcv_startstreaming(WalReceiverConn *conn,
396 						const WalRcvStreamOptions *options)
397 {
398 	StringInfoData cmd;
399 	PGresult   *res;
400 
401 	Assert(options->logical == conn->logical);
402 	Assert(options->slotname || !options->logical);
403 
404 	initStringInfo(&cmd);
405 
406 	/* Build the command. */
407 	appendStringInfoString(&cmd, "START_REPLICATION");
408 	if (options->slotname != NULL)
409 		appendStringInfo(&cmd, " SLOT \"%s\"",
410 						 options->slotname);
411 
412 	if (options->logical)
413 		appendStringInfoString(&cmd, " LOGICAL");
414 
415 	appendStringInfo(&cmd, " %X/%X",
416 					 (uint32) (options->startpoint >> 32),
417 					 (uint32) options->startpoint);
418 
419 	/*
420 	 * Additional options are different depending on if we are doing logical
421 	 * or physical replication.
422 	 */
423 	if (options->logical)
424 	{
425 		char	   *pubnames_str;
426 		List	   *pubnames;
427 		char	   *pubnames_literal;
428 
429 		appendStringInfoString(&cmd, " (");
430 
431 		appendStringInfo(&cmd, "proto_version '%u'",
432 						 options->proto.logical.proto_version);
433 
434 		pubnames = options->proto.logical.publication_names;
435 		pubnames_str = stringlist_to_identifierstr(conn->streamConn, pubnames);
436 		if (!pubnames_str)
437 			ereport(ERROR,
438 					(errmsg("could not start WAL streaming: %s",
439 							pchomp(PQerrorMessage(conn->streamConn)))));
440 		pubnames_literal = PQescapeLiteral(conn->streamConn, pubnames_str,
441 										   strlen(pubnames_str));
442 		if (!pubnames_literal)
443 			ereport(ERROR,
444 					(errmsg("could not start WAL streaming: %s",
445 							pchomp(PQerrorMessage(conn->streamConn)))));
446 		appendStringInfo(&cmd, ", publication_names %s", pubnames_literal);
447 		PQfreemem(pubnames_literal);
448 		pfree(pubnames_str);
449 
450 		appendStringInfoChar(&cmd, ')');
451 	}
452 	else
453 		appendStringInfo(&cmd, " TIMELINE %u",
454 						 options->proto.physical.startpointTLI);
455 
456 	/* Start streaming. */
457 	res = libpqrcv_PQexec(conn->streamConn, cmd.data);
458 	pfree(cmd.data);
459 
460 	if (PQresultStatus(res) == PGRES_COMMAND_OK)
461 	{
462 		PQclear(res);
463 		return false;
464 	}
465 	else if (PQresultStatus(res) != PGRES_COPY_BOTH)
466 	{
467 		PQclear(res);
468 		ereport(ERROR,
469 				(errmsg("could not start WAL streaming: %s",
470 						pchomp(PQerrorMessage(conn->streamConn)))));
471 	}
472 	PQclear(res);
473 	return true;
474 }
475 
476 /*
477  * Stop streaming WAL data. Returns the next timeline's ID in *next_tli, as
478  * reported by the server, or 0 if it did not report it.
479  */
480 static void
libpqrcv_endstreaming(WalReceiverConn * conn,TimeLineID * next_tli)481 libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
482 {
483 	PGresult   *res;
484 
485 	/*
486 	 * Send copy-end message.  As in libpqrcv_PQexec, this could theoretically
487 	 * block, but the risk seems small.
488 	 */
489 	if (PQputCopyEnd(conn->streamConn, NULL) <= 0 ||
490 		PQflush(conn->streamConn))
491 		ereport(ERROR,
492 				(errmsg("could not send end-of-streaming message to primary: %s",
493 						pchomp(PQerrorMessage(conn->streamConn)))));
494 
495 	*next_tli = 0;
496 
497 	/*
498 	 * After COPY is finished, we should receive a result set indicating the
499 	 * next timeline's ID, or just CommandComplete if the server was shut
500 	 * down.
501 	 *
502 	 * If we had not yet received CopyDone from the backend, PGRES_COPY_OUT is
503 	 * also possible in case we aborted the copy in mid-stream.
504 	 */
505 	res = libpqrcv_PQgetResult(conn->streamConn);
506 	if (PQresultStatus(res) == PGRES_TUPLES_OK)
507 	{
508 		/*
509 		 * Read the next timeline's ID. The server also sends the timeline's
510 		 * starting point, but it is ignored.
511 		 */
512 		if (PQnfields(res) < 2 || PQntuples(res) != 1)
513 			ereport(ERROR,
514 					(errmsg("unexpected result set after end-of-streaming")));
515 		*next_tli = pg_strtoint32(PQgetvalue(res, 0, 0));
516 		PQclear(res);
517 
518 		/* the result set should be followed by CommandComplete */
519 		res = libpqrcv_PQgetResult(conn->streamConn);
520 	}
521 	else if (PQresultStatus(res) == PGRES_COPY_OUT)
522 	{
523 		PQclear(res);
524 
525 		/* End the copy */
526 		if (PQendcopy(conn->streamConn))
527 			ereport(ERROR,
528 					(errmsg("error while shutting down streaming COPY: %s",
529 							pchomp(PQerrorMessage(conn->streamConn)))));
530 
531 		/* CommandComplete should follow */
532 		res = libpqrcv_PQgetResult(conn->streamConn);
533 	}
534 
535 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
536 		ereport(ERROR,
537 				(errmsg("error reading result of streaming command: %s",
538 						pchomp(PQerrorMessage(conn->streamConn)))));
539 	PQclear(res);
540 
541 	/* Verify that there are no more results */
542 	res = libpqrcv_PQgetResult(conn->streamConn);
543 	if (res != NULL)
544 		ereport(ERROR,
545 				(errmsg("unexpected result after CommandComplete: %s",
546 						pchomp(PQerrorMessage(conn->streamConn)))));
547 }
548 
549 /*
550  * Fetch the timeline history file for 'tli' from primary.
551  */
552 static void
libpqrcv_readtimelinehistoryfile(WalReceiverConn * conn,TimeLineID tli,char ** filename,char ** content,int * len)553 libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn,
554 								 TimeLineID tli, char **filename,
555 								 char **content, int *len)
556 {
557 	PGresult   *res;
558 	char		cmd[64];
559 
560 	Assert(!conn->logical);
561 
562 	/*
563 	 * Request the primary to send over the history file for given timeline.
564 	 */
565 	snprintf(cmd, sizeof(cmd), "TIMELINE_HISTORY %u", tli);
566 	res = libpqrcv_PQexec(conn->streamConn, cmd);
567 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
568 	{
569 		PQclear(res);
570 		ereport(ERROR,
571 				(errmsg("could not receive timeline history file from "
572 						"the primary server: %s",
573 						pchomp(PQerrorMessage(conn->streamConn)))));
574 	}
575 	if (PQnfields(res) != 2 || PQntuples(res) != 1)
576 	{
577 		int			ntuples = PQntuples(res);
578 		int			nfields = PQnfields(res);
579 
580 		PQclear(res);
581 		ereport(ERROR,
582 				(errmsg("invalid response from primary server"),
583 				 errdetail("Expected 1 tuple with 2 fields, got %d tuples with %d fields.",
584 						   ntuples, nfields)));
585 	}
586 	*filename = pstrdup(PQgetvalue(res, 0, 0));
587 
588 	*len = PQgetlength(res, 0, 1);
589 	*content = palloc(*len);
590 	memcpy(*content, PQgetvalue(res, 0, 1), *len);
591 	PQclear(res);
592 }
593 
594 /*
595  * Send a query and wait for the results by using the asynchronous libpq
596  * functions and socket readiness events.
597  *
598  * We must not use the regular blocking libpq functions like PQexec()
599  * since they are uninterruptible by signals on some platforms, such as
600  * Windows.
601  *
602  * The function is modeled on PQexec() in libpq, but only implements
603  * those parts that are in use in the walreceiver api.
604  *
605  * May return NULL, rather than an error result, on failure.
606  */
607 static PGresult *
libpqrcv_PQexec(PGconn * streamConn,const char * query)608 libpqrcv_PQexec(PGconn *streamConn, const char *query)
609 {
610 	PGresult   *lastResult = NULL;
611 
612 	/*
613 	 * PQexec() silently discards any prior query results on the connection.
614 	 * This is not required for this function as it's expected that the caller
615 	 * (which is this library in all cases) will behave correctly and we don't
616 	 * have to be backwards compatible with old libpq.
617 	 */
618 
619 	/*
620 	 * Submit the query.  Since we don't use non-blocking mode, this could
621 	 * theoretically block.  In practice, since we don't send very long query
622 	 * strings, the risk seems negligible.
623 	 */
624 	if (!PQsendQuery(streamConn, query))
625 		return NULL;
626 
627 	for (;;)
628 	{
629 		/* Wait for, and collect, the next PGresult. */
630 		PGresult   *result;
631 
632 		result = libpqrcv_PQgetResult(streamConn);
633 		if (result == NULL)
634 			break;				/* query is complete, or failure */
635 
636 		/*
637 		 * Emulate PQexec()'s behavior of returning the last result when there
638 		 * are many.  We are fine with returning just last error message.
639 		 */
640 		PQclear(lastResult);
641 		lastResult = result;
642 
643 		if (PQresultStatus(lastResult) == PGRES_COPY_IN ||
644 			PQresultStatus(lastResult) == PGRES_COPY_OUT ||
645 			PQresultStatus(lastResult) == PGRES_COPY_BOTH ||
646 			PQstatus(streamConn) == CONNECTION_BAD)
647 			break;
648 	}
649 
650 	return lastResult;
651 }
652 
653 /*
654  * Perform the equivalent of PQgetResult(), but watch for interrupts.
655  */
656 static PGresult *
libpqrcv_PQgetResult(PGconn * streamConn)657 libpqrcv_PQgetResult(PGconn *streamConn)
658 {
659 	/*
660 	 * Collect data until PQgetResult is ready to get the result without
661 	 * blocking.
662 	 */
663 	while (PQisBusy(streamConn))
664 	{
665 		int			rc;
666 
667 		/*
668 		 * We don't need to break down the sleep into smaller increments,
669 		 * since we'll get interrupted by signals and can handle any
670 		 * interrupts here.
671 		 */
672 		rc = WaitLatchOrSocket(MyLatch,
673 							   WL_EXIT_ON_PM_DEATH | WL_SOCKET_READABLE |
674 							   WL_LATCH_SET,
675 							   PQsocket(streamConn),
676 							   0,
677 							   WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
678 
679 		/* Interrupted? */
680 		if (rc & WL_LATCH_SET)
681 		{
682 			ResetLatch(MyLatch);
683 			ProcessWalRcvInterrupts();
684 		}
685 
686 		/* Consume whatever data is available from the socket */
687 		if (PQconsumeInput(streamConn) == 0)
688 		{
689 			/* trouble; return NULL */
690 			return NULL;
691 		}
692 	}
693 
694 	/* Now we can collect and return the next PGresult */
695 	return PQgetResult(streamConn);
696 }
697 
698 /*
699  * Disconnect connection to primary, if any.
700  */
701 static void
libpqrcv_disconnect(WalReceiverConn * conn)702 libpqrcv_disconnect(WalReceiverConn *conn)
703 {
704 	PQfinish(conn->streamConn);
705 	if (conn->recvBuf != NULL)
706 		PQfreemem(conn->recvBuf);
707 	pfree(conn);
708 }
709 
710 /*
711  * Receive a message available from XLOG stream.
712  *
713  * Returns:
714  *
715  *	 If data was received, returns the length of the data. *buffer is set to
716  *	 point to a buffer holding the received message. The buffer is only valid
717  *	 until the next libpqrcv_* call.
718  *
719  *	 If no data was available immediately, returns 0, and *wait_fd is set to a
720  *	 socket descriptor which can be waited on before trying again.
721  *
722  *	 -1 if the server ended the COPY.
723  *
724  * ereports on error.
725  */
726 static int
libpqrcv_receive(WalReceiverConn * conn,char ** buffer,pgsocket * wait_fd)727 libpqrcv_receive(WalReceiverConn *conn, char **buffer,
728 				 pgsocket *wait_fd)
729 {
730 	int			rawlen;
731 
732 	if (conn->recvBuf != NULL)
733 		PQfreemem(conn->recvBuf);
734 	conn->recvBuf = NULL;
735 
736 	/* Try to receive a CopyData message */
737 	rawlen = PQgetCopyData(conn->streamConn, &conn->recvBuf, 1);
738 	if (rawlen == 0)
739 	{
740 		/* Try consuming some data. */
741 		if (PQconsumeInput(conn->streamConn) == 0)
742 			ereport(ERROR,
743 					(errmsg("could not receive data from WAL stream: %s",
744 							pchomp(PQerrorMessage(conn->streamConn)))));
745 
746 		/* Now that we've consumed some input, try again */
747 		rawlen = PQgetCopyData(conn->streamConn, &conn->recvBuf, 1);
748 		if (rawlen == 0)
749 		{
750 			/* Tell caller to try again when our socket is ready. */
751 			*wait_fd = PQsocket(conn->streamConn);
752 			return 0;
753 		}
754 	}
755 	if (rawlen == -1)			/* end-of-streaming or error */
756 	{
757 		PGresult   *res;
758 
759 		res = libpqrcv_PQgetResult(conn->streamConn);
760 		if (PQresultStatus(res) == PGRES_COMMAND_OK)
761 		{
762 			PQclear(res);
763 
764 			/* Verify that there are no more results. */
765 			res = libpqrcv_PQgetResult(conn->streamConn);
766 			if (res != NULL)
767 			{
768 				PQclear(res);
769 
770 				/*
771 				 * If the other side closed the connection orderly (otherwise
772 				 * we'd seen an error, or PGRES_COPY_IN) don't report an error
773 				 * here, but let callers deal with it.
774 				 */
775 				if (PQstatus(conn->streamConn) == CONNECTION_BAD)
776 					return -1;
777 
778 				ereport(ERROR,
779 						(errmsg("unexpected result after CommandComplete: %s",
780 								PQerrorMessage(conn->streamConn))));
781 			}
782 
783 			return -1;
784 		}
785 		else if (PQresultStatus(res) == PGRES_COPY_IN)
786 		{
787 			PQclear(res);
788 			return -1;
789 		}
790 		else
791 		{
792 			PQclear(res);
793 			ereport(ERROR,
794 					(errmsg("could not receive data from WAL stream: %s",
795 							pchomp(PQerrorMessage(conn->streamConn)))));
796 		}
797 	}
798 	if (rawlen < -1)
799 		ereport(ERROR,
800 				(errmsg("could not receive data from WAL stream: %s",
801 						pchomp(PQerrorMessage(conn->streamConn)))));
802 
803 	/* Return received messages to caller */
804 	*buffer = conn->recvBuf;
805 	return rawlen;
806 }
807 
808 /*
809  * Send a message to XLOG stream.
810  *
811  * ereports on error.
812  */
813 static void
libpqrcv_send(WalReceiverConn * conn,const char * buffer,int nbytes)814 libpqrcv_send(WalReceiverConn *conn, const char *buffer, int nbytes)
815 {
816 	if (PQputCopyData(conn->streamConn, buffer, nbytes) <= 0 ||
817 		PQflush(conn->streamConn))
818 		ereport(ERROR,
819 				(errmsg("could not send data to WAL stream: %s",
820 						pchomp(PQerrorMessage(conn->streamConn)))));
821 }
822 
823 /*
824  * Create new replication slot.
825  * Returns the name of the exported snapshot for logical slot or NULL for
826  * physical slot.
827  */
828 static char *
libpqrcv_create_slot(WalReceiverConn * conn,const char * slotname,bool temporary,CRSSnapshotAction snapshot_action,XLogRecPtr * lsn)829 libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname,
830 					 bool temporary, CRSSnapshotAction snapshot_action,
831 					 XLogRecPtr *lsn)
832 {
833 	PGresult   *res;
834 	StringInfoData cmd;
835 	char	   *snapshot;
836 
837 	initStringInfo(&cmd);
838 
839 	appendStringInfo(&cmd, "CREATE_REPLICATION_SLOT \"%s\"", slotname);
840 
841 	if (temporary)
842 		appendStringInfoString(&cmd, " TEMPORARY");
843 
844 	if (conn->logical)
845 	{
846 		appendStringInfoString(&cmd, " LOGICAL pgoutput");
847 		switch (snapshot_action)
848 		{
849 			case CRS_EXPORT_SNAPSHOT:
850 				appendStringInfoString(&cmd, " EXPORT_SNAPSHOT");
851 				break;
852 			case CRS_NOEXPORT_SNAPSHOT:
853 				appendStringInfoString(&cmd, " NOEXPORT_SNAPSHOT");
854 				break;
855 			case CRS_USE_SNAPSHOT:
856 				appendStringInfoString(&cmd, " USE_SNAPSHOT");
857 				break;
858 		}
859 	}
860 	else
861 	{
862 		appendStringInfoString(&cmd, " PHYSICAL RESERVE_WAL");
863 	}
864 
865 	res = libpqrcv_PQexec(conn->streamConn, cmd.data);
866 	pfree(cmd.data);
867 
868 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
869 	{
870 		PQclear(res);
871 		ereport(ERROR,
872 				(errmsg("could not create replication slot \"%s\": %s",
873 						slotname, pchomp(PQerrorMessage(conn->streamConn)))));
874 	}
875 
876 	if (lsn)
877 		*lsn = DatumGetLSN(DirectFunctionCall1Coll(pg_lsn_in, InvalidOid,
878 												   CStringGetDatum(PQgetvalue(res, 0, 1))));
879 
880 	if (!PQgetisnull(res, 0, 2))
881 		snapshot = pstrdup(PQgetvalue(res, 0, 2));
882 	else
883 		snapshot = NULL;
884 
885 	PQclear(res);
886 
887 	return snapshot;
888 }
889 
890 /*
891  * Return PID of remote backend process.
892  */
893 static pid_t
libpqrcv_get_backend_pid(WalReceiverConn * conn)894 libpqrcv_get_backend_pid(WalReceiverConn *conn)
895 {
896 	return PQbackendPID(conn->streamConn);
897 }
898 
899 /*
900  * Convert tuple query result to tuplestore.
901  */
902 static void
libpqrcv_processTuples(PGresult * pgres,WalRcvExecResult * walres,const int nRetTypes,const Oid * retTypes)903 libpqrcv_processTuples(PGresult *pgres, WalRcvExecResult *walres,
904 					   const int nRetTypes, const Oid *retTypes)
905 {
906 	int			tupn;
907 	int			coln;
908 	int			nfields = PQnfields(pgres);
909 	HeapTuple	tuple;
910 	AttInMetadata *attinmeta;
911 	MemoryContext rowcontext;
912 	MemoryContext oldcontext;
913 
914 	/* Make sure we got expected number of fields. */
915 	if (nfields != nRetTypes)
916 		ereport(ERROR,
917 				(errmsg("invalid query response"),
918 				 errdetail("Expected %d fields, got %d fields.",
919 						   nRetTypes, nfields)));
920 
921 	walres->tuplestore = tuplestore_begin_heap(true, false, work_mem);
922 
923 	/* Create tuple descriptor corresponding to expected result. */
924 	walres->tupledesc = CreateTemplateTupleDesc(nRetTypes);
925 	for (coln = 0; coln < nRetTypes; coln++)
926 		TupleDescInitEntry(walres->tupledesc, (AttrNumber) coln + 1,
927 						   PQfname(pgres, coln), retTypes[coln], -1, 0);
928 	attinmeta = TupleDescGetAttInMetadata(walres->tupledesc);
929 
930 	/* No point in doing more here if there were no tuples returned. */
931 	if (PQntuples(pgres) == 0)
932 		return;
933 
934 	/* Create temporary context for local allocations. */
935 	rowcontext = AllocSetContextCreate(CurrentMemoryContext,
936 									   "libpqrcv query result context",
937 									   ALLOCSET_DEFAULT_SIZES);
938 
939 	/* Process returned rows. */
940 	for (tupn = 0; tupn < PQntuples(pgres); tupn++)
941 	{
942 		char	   *cstrs[MaxTupleAttributeNumber];
943 
944 		ProcessWalRcvInterrupts();
945 
946 		/* Do the allocations in temporary context. */
947 		oldcontext = MemoryContextSwitchTo(rowcontext);
948 
949 		/*
950 		 * Fill cstrs with null-terminated strings of column values.
951 		 */
952 		for (coln = 0; coln < nfields; coln++)
953 		{
954 			if (PQgetisnull(pgres, tupn, coln))
955 				cstrs[coln] = NULL;
956 			else
957 				cstrs[coln] = PQgetvalue(pgres, tupn, coln);
958 		}
959 
960 		/* Convert row to a tuple, and add it to the tuplestore */
961 		tuple = BuildTupleFromCStrings(attinmeta, cstrs);
962 		tuplestore_puttuple(walres->tuplestore, tuple);
963 
964 		/* Clean up */
965 		MemoryContextSwitchTo(oldcontext);
966 		MemoryContextReset(rowcontext);
967 	}
968 
969 	MemoryContextDelete(rowcontext);
970 }
971 
972 /*
973  * Public interface for sending generic queries (and commands).
974  *
975  * This can only be called from process connected to database.
976  */
977 static WalRcvExecResult *
libpqrcv_exec(WalReceiverConn * conn,const char * query,const int nRetTypes,const Oid * retTypes)978 libpqrcv_exec(WalReceiverConn *conn, const char *query,
979 			  const int nRetTypes, const Oid *retTypes)
980 {
981 	PGresult   *pgres = NULL;
982 	WalRcvExecResult *walres = palloc0(sizeof(WalRcvExecResult));
983 
984 	if (MyDatabaseId == InvalidOid)
985 		ereport(ERROR,
986 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
987 				 errmsg("the query interface requires a database connection")));
988 
989 	pgres = libpqrcv_PQexec(conn->streamConn, query);
990 
991 	switch (PQresultStatus(pgres))
992 	{
993 		case PGRES_SINGLE_TUPLE:
994 		case PGRES_TUPLES_OK:
995 			walres->status = WALRCV_OK_TUPLES;
996 			libpqrcv_processTuples(pgres, walres, nRetTypes, retTypes);
997 			break;
998 
999 		case PGRES_COPY_IN:
1000 			walres->status = WALRCV_OK_COPY_IN;
1001 			break;
1002 
1003 		case PGRES_COPY_OUT:
1004 			walres->status = WALRCV_OK_COPY_OUT;
1005 			break;
1006 
1007 		case PGRES_COPY_BOTH:
1008 			walres->status = WALRCV_OK_COPY_BOTH;
1009 			break;
1010 
1011 		case PGRES_COMMAND_OK:
1012 			walres->status = WALRCV_OK_COMMAND;
1013 			break;
1014 
1015 			/* Empty query is considered error. */
1016 		case PGRES_EMPTY_QUERY:
1017 			walres->status = WALRCV_ERROR;
1018 			walres->err = _("empty query");
1019 			break;
1020 
1021 		case PGRES_NONFATAL_ERROR:
1022 		case PGRES_FATAL_ERROR:
1023 		case PGRES_BAD_RESPONSE:
1024 			walres->status = WALRCV_ERROR;
1025 			walres->err = pchomp(PQerrorMessage(conn->streamConn));
1026 			break;
1027 	}
1028 
1029 	PQclear(pgres);
1030 
1031 	return walres;
1032 }
1033 
1034 /*
1035  * Given a List of strings, return it as single comma separated
1036  * string, quoting identifiers as needed.
1037  *
1038  * This is essentially the reverse of SplitIdentifierString.
1039  *
1040  * The caller should free the result.
1041  */
1042 static char *
stringlist_to_identifierstr(PGconn * conn,List * strings)1043 stringlist_to_identifierstr(PGconn *conn, List *strings)
1044 {
1045 	ListCell   *lc;
1046 	StringInfoData res;
1047 	bool		first = true;
1048 
1049 	initStringInfo(&res);
1050 
1051 	foreach(lc, strings)
1052 	{
1053 		char	   *val = strVal(lfirst(lc));
1054 		char	   *val_escaped;
1055 
1056 		if (first)
1057 			first = false;
1058 		else
1059 			appendStringInfoChar(&res, ',');
1060 
1061 		val_escaped = PQescapeIdentifier(conn, val, strlen(val));
1062 		if (!val_escaped)
1063 		{
1064 			free(res.data);
1065 			return NULL;
1066 		}
1067 		appendStringInfoString(&res, val_escaped);
1068 		PQfreemem(val_escaped);
1069 	}
1070 
1071 	return res.data;
1072 }
1073