1 /*-------------------------------------------------------------------------
2  *
3  * libpqwalreceiver.c
4  *
5  * This file contains the libpq-specific parts of walreceiver. It's
6  * loaded as a dynamic module to avoid linking the main server binary with
7  * libpq.
8  *
9  * Portions Copyright (c) 2010-2019, PostgreSQL Global Development Group
10  *
11  *
12  * IDENTIFICATION
13  *	  src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
14  *
15  *-------------------------------------------------------------------------
16  */
17 #include "postgres.h"
18 
19 #include <unistd.h>
20 #include <sys/time.h>
21 
22 #include "libpq-fe.h"
23 #include "pqexpbuffer.h"
24 #include "access/xlog.h"
25 #include "catalog/pg_type.h"
26 #include "common/connect.h"
27 #include "funcapi.h"
28 #include "mb/pg_wchar.h"
29 #include "miscadmin.h"
30 #include "pgstat.h"
31 #include "replication/walreceiver.h"
32 #include "utils/builtins.h"
33 #include "utils/memutils.h"
34 #include "utils/pg_lsn.h"
35 #include "utils/tuplestore.h"
36 
/* Magic block that identifies this library as a PostgreSQL loadable module */
PG_MODULE_MAGIC;

/* Load-time entry point of the module; defined further down in this file */
void _PG_init(void);
40 
41 struct WalReceiverConn
42 {
43 	/* Current connection to the primary, if any */
44 	PGconn	   *streamConn;
45 	/* Used to remember if the connection is logical or physical */
46 	bool		logical;
47 	/* Buffer for currently read records */
48 	char	   *recvBuf;
49 };
50 
51 /* Prototypes for interface functions */
52 static WalReceiverConn *libpqrcv_connect(const char *conninfo,
53 										 bool logical, const char *appname,
54 										 char **err);
55 static void libpqrcv_check_conninfo(const char *conninfo);
56 static char *libpqrcv_get_conninfo(WalReceiverConn *conn);
57 static void libpqrcv_get_senderinfo(WalReceiverConn *conn,
58 									char **sender_host, int *sender_port);
59 static char *libpqrcv_identify_system(WalReceiverConn *conn,
60 									  TimeLineID *primary_tli);
61 static int	libpqrcv_server_version(WalReceiverConn *conn);
62 static void libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn,
63 											 TimeLineID tli, char **filename,
64 											 char **content, int *len);
65 static bool libpqrcv_startstreaming(WalReceiverConn *conn,
66 									const WalRcvStreamOptions *options);
67 static void libpqrcv_endstreaming(WalReceiverConn *conn,
68 								  TimeLineID *next_tli);
69 static int	libpqrcv_receive(WalReceiverConn *conn, char **buffer,
70 							 pgsocket *wait_fd);
71 static void libpqrcv_send(WalReceiverConn *conn, const char *buffer,
72 						  int nbytes);
73 static char *libpqrcv_create_slot(WalReceiverConn *conn,
74 								  const char *slotname,
75 								  bool temporary,
76 								  CRSSnapshotAction snapshot_action,
77 								  XLogRecPtr *lsn);
78 static WalRcvExecResult *libpqrcv_exec(WalReceiverConn *conn,
79 									   const char *query,
80 									   const int nRetTypes,
81 									   const Oid *retTypes);
82 static void libpqrcv_disconnect(WalReceiverConn *conn);
83 
84 static WalReceiverFunctionsType PQWalReceiverFunctions = {
85 	libpqrcv_connect,
86 	libpqrcv_check_conninfo,
87 	libpqrcv_get_conninfo,
88 	libpqrcv_get_senderinfo,
89 	libpqrcv_identify_system,
90 	libpqrcv_server_version,
91 	libpqrcv_readtimelinehistoryfile,
92 	libpqrcv_startstreaming,
93 	libpqrcv_endstreaming,
94 	libpqrcv_receive,
95 	libpqrcv_send,
96 	libpqrcv_create_slot,
97 	libpqrcv_exec,
98 	libpqrcv_disconnect
99 };
100 
101 /* Prototypes for private functions */
102 static PGresult *libpqrcv_PQexec(PGconn *streamConn, const char *query);
103 static PGresult *libpqrcv_PQgetResult(PGconn *streamConn);
104 static char *stringlist_to_identifierstr(PGconn *conn, List *strings);
105 
106 /*
107  * Module initialization function
108  */
109 void
_PG_init(void)110 _PG_init(void)
111 {
112 	if (WalReceiverFunctions != NULL)
113 		elog(ERROR, "libpqwalreceiver already loaded");
114 	WalReceiverFunctions = &PQWalReceiverFunctions;
115 }
116 
117 /*
118  * Establish the connection to the primary server for XLOG streaming
119  *
120  * Returns NULL on error and fills the err with palloc'ed error message.
121  */
122 static WalReceiverConn *
libpqrcv_connect(const char * conninfo,bool logical,const char * appname,char ** err)123 libpqrcv_connect(const char *conninfo, bool logical, const char *appname,
124 				 char **err)
125 {
126 	WalReceiverConn *conn;
127 	PostgresPollingStatusType status;
128 	const char *keys[5];
129 	const char *vals[5];
130 	int			i = 0;
131 
132 	/*
133 	 * We use the expand_dbname parameter to process the connection string (or
134 	 * URI), and pass some extra options.
135 	 */
136 	keys[i] = "dbname";
137 	vals[i] = conninfo;
138 	keys[++i] = "replication";
139 	vals[i] = logical ? "database" : "true";
140 	if (!logical)
141 	{
142 		/*
143 		 * The database name is ignored by the server in replication mode, but
144 		 * specify "replication" for .pgpass lookup.
145 		 */
146 		keys[++i] = "dbname";
147 		vals[i] = "replication";
148 	}
149 	keys[++i] = "fallback_application_name";
150 	vals[i] = appname;
151 	if (logical)
152 	{
153 		keys[++i] = "client_encoding";
154 		vals[i] = GetDatabaseEncodingName();
155 	}
156 	keys[++i] = NULL;
157 	vals[i] = NULL;
158 
159 	Assert(i < sizeof(keys));
160 
161 	conn = palloc0(sizeof(WalReceiverConn));
162 	conn->streamConn = PQconnectStartParams(keys, vals,
163 											 /* expand_dbname = */ true);
164 	if (PQstatus(conn->streamConn) == CONNECTION_BAD)
165 	{
166 		*err = pchomp(PQerrorMessage(conn->streamConn));
167 		return NULL;
168 	}
169 
170 	/*
171 	 * Poll connection until we have OK or FAILED status.
172 	 *
173 	 * Per spec for PQconnectPoll, first wait till socket is write-ready.
174 	 */
175 	status = PGRES_POLLING_WRITING;
176 	do
177 	{
178 		int			io_flag;
179 		int			rc;
180 
181 		if (status == PGRES_POLLING_READING)
182 			io_flag = WL_SOCKET_READABLE;
183 #ifdef WIN32
184 		/* Windows needs a different test while waiting for connection-made */
185 		else if (PQstatus(conn->streamConn) == CONNECTION_STARTED)
186 			io_flag = WL_SOCKET_CONNECTED;
187 #endif
188 		else
189 			io_flag = WL_SOCKET_WRITEABLE;
190 
191 		rc = WaitLatchOrSocket(MyLatch,
192 							   WL_EXIT_ON_PM_DEATH | WL_LATCH_SET | io_flag,
193 							   PQsocket(conn->streamConn),
194 							   0,
195 							   WAIT_EVENT_LIBPQWALRECEIVER_CONNECT);
196 
197 		/* Interrupted? */
198 		if (rc & WL_LATCH_SET)
199 		{
200 			ResetLatch(MyLatch);
201 			ProcessWalRcvInterrupts();
202 		}
203 
204 		/* If socket is ready, advance the libpq state machine */
205 		if (rc & io_flag)
206 			status = PQconnectPoll(conn->streamConn);
207 	} while (status != PGRES_POLLING_OK && status != PGRES_POLLING_FAILED);
208 
209 	if (PQstatus(conn->streamConn) != CONNECTION_OK)
210 	{
211 		*err = pchomp(PQerrorMessage(conn->streamConn));
212 		return NULL;
213 	}
214 
215 	if (logical)
216 	{
217 		PGresult   *res;
218 
219 		res = libpqrcv_PQexec(conn->streamConn,
220 							  ALWAYS_SECURE_SEARCH_PATH_SQL);
221 		if (PQresultStatus(res) != PGRES_TUPLES_OK)
222 		{
223 			PQclear(res);
224 			ereport(ERROR,
225 					(errmsg("could not clear search path: %s",
226 							pchomp(PQerrorMessage(conn->streamConn)))));
227 		}
228 		PQclear(res);
229 	}
230 
231 	conn->logical = logical;
232 
233 	return conn;
234 }
235 
236 /*
237  * Validate connection info string (just try to parse it)
238  */
239 static void
libpqrcv_check_conninfo(const char * conninfo)240 libpqrcv_check_conninfo(const char *conninfo)
241 {
242 	PQconninfoOption *opts = NULL;
243 	char	   *err = NULL;
244 
245 	opts = PQconninfoParse(conninfo, &err);
246 	if (opts == NULL)
247 	{
248 		/* The error string is malloc'd, so we must free it explicitly */
249 		char	   *errcopy = err ? pstrdup(err) : "out of memory";
250 
251 		PQfreemem(err);
252 		ereport(ERROR,
253 				(errcode(ERRCODE_SYNTAX_ERROR),
254 				 errmsg("invalid connection string syntax: %s", errcopy)));
255 	}
256 
257 	PQconninfoFree(opts);
258 }
259 
260 /*
261  * Return a user-displayable conninfo string.  Any security-sensitive fields
262  * are obfuscated.
263  */
264 static char *
libpqrcv_get_conninfo(WalReceiverConn * conn)265 libpqrcv_get_conninfo(WalReceiverConn *conn)
266 {
267 	PQconninfoOption *conn_opts;
268 	PQconninfoOption *conn_opt;
269 	PQExpBufferData buf;
270 	char	   *retval;
271 
272 	Assert(conn->streamConn != NULL);
273 
274 	initPQExpBuffer(&buf);
275 	conn_opts = PQconninfo(conn->streamConn);
276 
277 	if (conn_opts == NULL)
278 		ereport(ERROR,
279 				(errmsg("could not parse connection string: %s",
280 						_("out of memory"))));
281 
282 	/* build a clean connection string from pieces */
283 	for (conn_opt = conn_opts; conn_opt->keyword != NULL; conn_opt++)
284 	{
285 		bool		obfuscate;
286 
287 		/* Skip debug and empty options */
288 		if (strchr(conn_opt->dispchar, 'D') ||
289 			conn_opt->val == NULL ||
290 			conn_opt->val[0] == '\0')
291 			continue;
292 
293 		/* Obfuscate security-sensitive options */
294 		obfuscate = strchr(conn_opt->dispchar, '*') != NULL;
295 
296 		appendPQExpBuffer(&buf, "%s%s=%s",
297 						  buf.len == 0 ? "" : " ",
298 						  conn_opt->keyword,
299 						  obfuscate ? "********" : conn_opt->val);
300 	}
301 
302 	PQconninfoFree(conn_opts);
303 
304 	retval = PQExpBufferDataBroken(buf) ? NULL : pstrdup(buf.data);
305 	termPQExpBuffer(&buf);
306 	return retval;
307 }
308 
309 /*
310  * Provides information of sender this WAL receiver is connected to.
311  */
312 static void
libpqrcv_get_senderinfo(WalReceiverConn * conn,char ** sender_host,int * sender_port)313 libpqrcv_get_senderinfo(WalReceiverConn *conn, char **sender_host,
314 						int *sender_port)
315 {
316 	char	   *ret = NULL;
317 
318 	*sender_host = NULL;
319 	*sender_port = 0;
320 
321 	Assert(conn->streamConn != NULL);
322 
323 	ret = PQhost(conn->streamConn);
324 	if (ret && strlen(ret) != 0)
325 		*sender_host = pstrdup(ret);
326 
327 	ret = PQport(conn->streamConn);
328 	if (ret && strlen(ret) != 0)
329 		*sender_port = atoi(ret);
330 }
331 
332 /*
333  * Check that primary's system identifier matches ours, and fetch the current
334  * timeline ID of the primary.
335  */
336 static char *
libpqrcv_identify_system(WalReceiverConn * conn,TimeLineID * primary_tli)337 libpqrcv_identify_system(WalReceiverConn *conn, TimeLineID *primary_tli)
338 {
339 	PGresult   *res;
340 	char	   *primary_sysid;
341 
342 	/*
343 	 * Get the system identifier and timeline ID as a DataRow message from the
344 	 * primary server.
345 	 */
346 	res = libpqrcv_PQexec(conn->streamConn, "IDENTIFY_SYSTEM");
347 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
348 	{
349 		PQclear(res);
350 		ereport(ERROR,
351 				(errmsg("could not receive database system identifier and timeline ID from "
352 						"the primary server: %s",
353 						pchomp(PQerrorMessage(conn->streamConn)))));
354 	}
355 	if (PQnfields(res) < 3 || PQntuples(res) != 1)
356 	{
357 		int			ntuples = PQntuples(res);
358 		int			nfields = PQnfields(res);
359 
360 		PQclear(res);
361 		ereport(ERROR,
362 				(errmsg("invalid response from primary server"),
363 				 errdetail("Could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields.",
364 						   ntuples, nfields, 3, 1)));
365 	}
366 	primary_sysid = pstrdup(PQgetvalue(res, 0, 0));
367 	*primary_tli = pg_strtoint32(PQgetvalue(res, 0, 1));
368 	PQclear(res);
369 
370 	return primary_sysid;
371 }
372 
373 /*
374  * Thin wrapper around libpq to obtain server version.
375  */
376 static int
libpqrcv_server_version(WalReceiverConn * conn)377 libpqrcv_server_version(WalReceiverConn *conn)
378 {
379 	return PQserverVersion(conn->streamConn);
380 }
381 
382 /*
383  * Start streaming WAL data from given streaming options.
384  *
385  * Returns true if we switched successfully to copy-both mode. False
386  * means the server received the command and executed it successfully, but
387  * didn't switch to copy-mode.  That means that there was no WAL on the
388  * requested timeline and starting point, because the server switched to
389  * another timeline at or before the requested starting point. On failure,
390  * throws an ERROR.
391  */
392 static bool
libpqrcv_startstreaming(WalReceiverConn * conn,const WalRcvStreamOptions * options)393 libpqrcv_startstreaming(WalReceiverConn *conn,
394 						const WalRcvStreamOptions *options)
395 {
396 	StringInfoData cmd;
397 	PGresult   *res;
398 
399 	Assert(options->logical == conn->logical);
400 	Assert(options->slotname || !options->logical);
401 
402 	initStringInfo(&cmd);
403 
404 	/* Build the command. */
405 	appendStringInfoString(&cmd, "START_REPLICATION");
406 	if (options->slotname != NULL)
407 		appendStringInfo(&cmd, " SLOT \"%s\"",
408 						 options->slotname);
409 
410 	if (options->logical)
411 		appendStringInfoString(&cmd, " LOGICAL");
412 
413 	appendStringInfo(&cmd, " %X/%X",
414 					 (uint32) (options->startpoint >> 32),
415 					 (uint32) options->startpoint);
416 
417 	/*
418 	 * Additional options are different depending on if we are doing logical
419 	 * or physical replication.
420 	 */
421 	if (options->logical)
422 	{
423 		char	   *pubnames_str;
424 		List	   *pubnames;
425 		char	   *pubnames_literal;
426 
427 		appendStringInfoString(&cmd, " (");
428 
429 		appendStringInfo(&cmd, "proto_version '%u'",
430 						 options->proto.logical.proto_version);
431 
432 		pubnames = options->proto.logical.publication_names;
433 		pubnames_str = stringlist_to_identifierstr(conn->streamConn, pubnames);
434 		if (!pubnames_str)
435 			ereport(ERROR,
436 					(errmsg("could not start WAL streaming: %s",
437 							pchomp(PQerrorMessage(conn->streamConn)))));
438 		pubnames_literal = PQescapeLiteral(conn->streamConn, pubnames_str,
439 										   strlen(pubnames_str));
440 		if (!pubnames_literal)
441 			ereport(ERROR,
442 					(errmsg("could not start WAL streaming: %s",
443 							pchomp(PQerrorMessage(conn->streamConn)))));
444 		appendStringInfo(&cmd, ", publication_names %s", pubnames_literal);
445 		PQfreemem(pubnames_literal);
446 		pfree(pubnames_str);
447 
448 		appendStringInfoChar(&cmd, ')');
449 	}
450 	else
451 		appendStringInfo(&cmd, " TIMELINE %u",
452 						 options->proto.physical.startpointTLI);
453 
454 	/* Start streaming. */
455 	res = libpqrcv_PQexec(conn->streamConn, cmd.data);
456 	pfree(cmd.data);
457 
458 	if (PQresultStatus(res) == PGRES_COMMAND_OK)
459 	{
460 		PQclear(res);
461 		return false;
462 	}
463 	else if (PQresultStatus(res) != PGRES_COPY_BOTH)
464 	{
465 		PQclear(res);
466 		ereport(ERROR,
467 				(errmsg("could not start WAL streaming: %s",
468 						pchomp(PQerrorMessage(conn->streamConn)))));
469 	}
470 	PQclear(res);
471 	return true;
472 }
473 
474 /*
475  * Stop streaming WAL data. Returns the next timeline's ID in *next_tli, as
476  * reported by the server, or 0 if it did not report it.
477  */
478 static void
libpqrcv_endstreaming(WalReceiverConn * conn,TimeLineID * next_tli)479 libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
480 {
481 	PGresult   *res;
482 
483 	/*
484 	 * Send copy-end message.  As in libpqrcv_PQexec, this could theoretically
485 	 * block, but the risk seems small.
486 	 */
487 	if (PQputCopyEnd(conn->streamConn, NULL) <= 0 ||
488 		PQflush(conn->streamConn))
489 		ereport(ERROR,
490 				(errmsg("could not send end-of-streaming message to primary: %s",
491 						pchomp(PQerrorMessage(conn->streamConn)))));
492 
493 	*next_tli = 0;
494 
495 	/*
496 	 * After COPY is finished, we should receive a result set indicating the
497 	 * next timeline's ID, or just CommandComplete if the server was shut
498 	 * down.
499 	 *
500 	 * If we had not yet received CopyDone from the backend, PGRES_COPY_OUT is
501 	 * also possible in case we aborted the copy in mid-stream.
502 	 */
503 	res = libpqrcv_PQgetResult(conn->streamConn);
504 	if (PQresultStatus(res) == PGRES_TUPLES_OK)
505 	{
506 		/*
507 		 * Read the next timeline's ID. The server also sends the timeline's
508 		 * starting point, but it is ignored.
509 		 */
510 		if (PQnfields(res) < 2 || PQntuples(res) != 1)
511 			ereport(ERROR,
512 					(errmsg("unexpected result set after end-of-streaming")));
513 		*next_tli = pg_strtoint32(PQgetvalue(res, 0, 0));
514 		PQclear(res);
515 
516 		/* the result set should be followed by CommandComplete */
517 		res = libpqrcv_PQgetResult(conn->streamConn);
518 	}
519 	else if (PQresultStatus(res) == PGRES_COPY_OUT)
520 	{
521 		PQclear(res);
522 
523 		/* End the copy */
524 		if (PQendcopy(conn->streamConn))
525 			ereport(ERROR,
526 					(errmsg("error while shutting down streaming COPY: %s",
527 							pchomp(PQerrorMessage(conn->streamConn)))));
528 
529 		/* CommandComplete should follow */
530 		res = libpqrcv_PQgetResult(conn->streamConn);
531 	}
532 
533 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
534 		ereport(ERROR,
535 				(errmsg("error reading result of streaming command: %s",
536 						pchomp(PQerrorMessage(conn->streamConn)))));
537 	PQclear(res);
538 
539 	/* Verify that there are no more results */
540 	res = libpqrcv_PQgetResult(conn->streamConn);
541 	if (res != NULL)
542 		ereport(ERROR,
543 				(errmsg("unexpected result after CommandComplete: %s",
544 						pchomp(PQerrorMessage(conn->streamConn)))));
545 }
546 
547 /*
548  * Fetch the timeline history file for 'tli' from primary.
549  */
550 static void
libpqrcv_readtimelinehistoryfile(WalReceiverConn * conn,TimeLineID tli,char ** filename,char ** content,int * len)551 libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn,
552 								 TimeLineID tli, char **filename,
553 								 char **content, int *len)
554 {
555 	PGresult   *res;
556 	char		cmd[64];
557 
558 	Assert(!conn->logical);
559 
560 	/*
561 	 * Request the primary to send over the history file for given timeline.
562 	 */
563 	snprintf(cmd, sizeof(cmd), "TIMELINE_HISTORY %u", tli);
564 	res = libpqrcv_PQexec(conn->streamConn, cmd);
565 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
566 	{
567 		PQclear(res);
568 		ereport(ERROR,
569 				(errmsg("could not receive timeline history file from "
570 						"the primary server: %s",
571 						pchomp(PQerrorMessage(conn->streamConn)))));
572 	}
573 	if (PQnfields(res) != 2 || PQntuples(res) != 1)
574 	{
575 		int			ntuples = PQntuples(res);
576 		int			nfields = PQnfields(res);
577 
578 		PQclear(res);
579 		ereport(ERROR,
580 				(errmsg("invalid response from primary server"),
581 				 errdetail("Expected 1 tuple with 2 fields, got %d tuples with %d fields.",
582 						   ntuples, nfields)));
583 	}
584 	*filename = pstrdup(PQgetvalue(res, 0, 0));
585 
586 	*len = PQgetlength(res, 0, 1);
587 	*content = palloc(*len);
588 	memcpy(*content, PQgetvalue(res, 0, 1), *len);
589 	PQclear(res);
590 }
591 
592 /*
593  * Send a query and wait for the results by using the asynchronous libpq
594  * functions and socket readiness events.
595  *
596  * We must not use the regular blocking libpq functions like PQexec()
597  * since they are uninterruptible by signals on some platforms, such as
598  * Windows.
599  *
600  * The function is modeled on PQexec() in libpq, but only implements
601  * those parts that are in use in the walreceiver api.
602  *
603  * May return NULL, rather than an error result, on failure.
604  */
605 static PGresult *
libpqrcv_PQexec(PGconn * streamConn,const char * query)606 libpqrcv_PQexec(PGconn *streamConn, const char *query)
607 {
608 	PGresult   *lastResult = NULL;
609 
610 	/*
611 	 * PQexec() silently discards any prior query results on the connection.
612 	 * This is not required for this function as it's expected that the caller
613 	 * (which is this library in all cases) will behave correctly and we don't
614 	 * have to be backwards compatible with old libpq.
615 	 */
616 
617 	/*
618 	 * Submit the query.  Since we don't use non-blocking mode, this could
619 	 * theoretically block.  In practice, since we don't send very long query
620 	 * strings, the risk seems negligible.
621 	 */
622 	if (!PQsendQuery(streamConn, query))
623 		return NULL;
624 
625 	for (;;)
626 	{
627 		/* Wait for, and collect, the next PGresult. */
628 		PGresult   *result;
629 
630 		result = libpqrcv_PQgetResult(streamConn);
631 		if (result == NULL)
632 			break;				/* query is complete, or failure */
633 
634 		/*
635 		 * Emulate PQexec()'s behavior of returning the last result when there
636 		 * are many.  We are fine with returning just last error message.
637 		 */
638 		PQclear(lastResult);
639 		lastResult = result;
640 
641 		if (PQresultStatus(lastResult) == PGRES_COPY_IN ||
642 			PQresultStatus(lastResult) == PGRES_COPY_OUT ||
643 			PQresultStatus(lastResult) == PGRES_COPY_BOTH ||
644 			PQstatus(streamConn) == CONNECTION_BAD)
645 			break;
646 	}
647 
648 	return lastResult;
649 }
650 
651 /*
652  * Perform the equivalent of PQgetResult(), but watch for interrupts.
653  */
654 static PGresult *
libpqrcv_PQgetResult(PGconn * streamConn)655 libpqrcv_PQgetResult(PGconn *streamConn)
656 {
657 	/*
658 	 * Collect data until PQgetResult is ready to get the result without
659 	 * blocking.
660 	 */
661 	while (PQisBusy(streamConn))
662 	{
663 		int			rc;
664 
665 		/*
666 		 * We don't need to break down the sleep into smaller increments,
667 		 * since we'll get interrupted by signals and can handle any
668 		 * interrupts here.
669 		 */
670 		rc = WaitLatchOrSocket(MyLatch,
671 							   WL_EXIT_ON_PM_DEATH | WL_SOCKET_READABLE |
672 							   WL_LATCH_SET,
673 							   PQsocket(streamConn),
674 							   0,
675 							   WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
676 
677 		/* Interrupted? */
678 		if (rc & WL_LATCH_SET)
679 		{
680 			ResetLatch(MyLatch);
681 			ProcessWalRcvInterrupts();
682 		}
683 
684 		/* Consume whatever data is available from the socket */
685 		if (PQconsumeInput(streamConn) == 0)
686 		{
687 			/* trouble; return NULL */
688 			return NULL;
689 		}
690 	}
691 
692 	/* Now we can collect and return the next PGresult */
693 	return PQgetResult(streamConn);
694 }
695 
696 /*
697  * Disconnect connection to primary, if any.
698  */
699 static void
libpqrcv_disconnect(WalReceiverConn * conn)700 libpqrcv_disconnect(WalReceiverConn *conn)
701 {
702 	PQfinish(conn->streamConn);
703 	if (conn->recvBuf != NULL)
704 		PQfreemem(conn->recvBuf);
705 	pfree(conn);
706 }
707 
708 /*
709  * Receive a message available from XLOG stream.
710  *
711  * Returns:
712  *
713  *	 If data was received, returns the length of the data. *buffer is set to
714  *	 point to a buffer holding the received message. The buffer is only valid
715  *	 until the next libpqrcv_* call.
716  *
717  *	 If no data was available immediately, returns 0, and *wait_fd is set to a
718  *	 socket descriptor which can be waited on before trying again.
719  *
720  *	 -1 if the server ended the COPY.
721  *
722  * ereports on error.
723  */
724 static int
libpqrcv_receive(WalReceiverConn * conn,char ** buffer,pgsocket * wait_fd)725 libpqrcv_receive(WalReceiverConn *conn, char **buffer,
726 				 pgsocket *wait_fd)
727 {
728 	int			rawlen;
729 
730 	if (conn->recvBuf != NULL)
731 		PQfreemem(conn->recvBuf);
732 	conn->recvBuf = NULL;
733 
734 	/* Try to receive a CopyData message */
735 	rawlen = PQgetCopyData(conn->streamConn, &conn->recvBuf, 1);
736 	if (rawlen == 0)
737 	{
738 		/* Try consuming some data. */
739 		if (PQconsumeInput(conn->streamConn) == 0)
740 			ereport(ERROR,
741 					(errmsg("could not receive data from WAL stream: %s",
742 							pchomp(PQerrorMessage(conn->streamConn)))));
743 
744 		/* Now that we've consumed some input, try again */
745 		rawlen = PQgetCopyData(conn->streamConn, &conn->recvBuf, 1);
746 		if (rawlen == 0)
747 		{
748 			/* Tell caller to try again when our socket is ready. */
749 			*wait_fd = PQsocket(conn->streamConn);
750 			return 0;
751 		}
752 	}
753 	if (rawlen == -1)			/* end-of-streaming or error */
754 	{
755 		PGresult   *res;
756 
757 		res = libpqrcv_PQgetResult(conn->streamConn);
758 		if (PQresultStatus(res) == PGRES_COMMAND_OK)
759 		{
760 			PQclear(res);
761 
762 			/* Verify that there are no more results. */
763 			res = libpqrcv_PQgetResult(conn->streamConn);
764 			if (res != NULL)
765 			{
766 				PQclear(res);
767 
768 				/*
769 				 * If the other side closed the connection orderly (otherwise
770 				 * we'd seen an error, or PGRES_COPY_IN) don't report an error
771 				 * here, but let callers deal with it.
772 				 */
773 				if (PQstatus(conn->streamConn) == CONNECTION_BAD)
774 					return -1;
775 
776 				ereport(ERROR,
777 						(errmsg("unexpected result after CommandComplete: %s",
778 								PQerrorMessage(conn->streamConn))));
779 			}
780 
781 			return -1;
782 		}
783 		else if (PQresultStatus(res) == PGRES_COPY_IN)
784 		{
785 			PQclear(res);
786 			return -1;
787 		}
788 		else
789 		{
790 			PQclear(res);
791 			ereport(ERROR,
792 					(errmsg("could not receive data from WAL stream: %s",
793 							pchomp(PQerrorMessage(conn->streamConn)))));
794 		}
795 	}
796 	if (rawlen < -1)
797 		ereport(ERROR,
798 				(errmsg("could not receive data from WAL stream: %s",
799 						pchomp(PQerrorMessage(conn->streamConn)))));
800 
801 	/* Return received messages to caller */
802 	*buffer = conn->recvBuf;
803 	return rawlen;
804 }
805 
806 /*
807  * Send a message to XLOG stream.
808  *
809  * ereports on error.
810  */
811 static void
libpqrcv_send(WalReceiverConn * conn,const char * buffer,int nbytes)812 libpqrcv_send(WalReceiverConn *conn, const char *buffer, int nbytes)
813 {
814 	if (PQputCopyData(conn->streamConn, buffer, nbytes) <= 0 ||
815 		PQflush(conn->streamConn))
816 		ereport(ERROR,
817 				(errmsg("could not send data to WAL stream: %s",
818 						pchomp(PQerrorMessage(conn->streamConn)))));
819 }
820 
821 /*
822  * Create new replication slot.
823  * Returns the name of the exported snapshot for logical slot or NULL for
824  * physical slot.
825  */
826 static char *
libpqrcv_create_slot(WalReceiverConn * conn,const char * slotname,bool temporary,CRSSnapshotAction snapshot_action,XLogRecPtr * lsn)827 libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname,
828 					 bool temporary, CRSSnapshotAction snapshot_action,
829 					 XLogRecPtr *lsn)
830 {
831 	PGresult   *res;
832 	StringInfoData cmd;
833 	char	   *snapshot;
834 
835 	initStringInfo(&cmd);
836 
837 	appendStringInfo(&cmd, "CREATE_REPLICATION_SLOT \"%s\"", slotname);
838 
839 	if (temporary)
840 		appendStringInfoString(&cmd, " TEMPORARY");
841 
842 	if (conn->logical)
843 	{
844 		appendStringInfoString(&cmd, " LOGICAL pgoutput");
845 		switch (snapshot_action)
846 		{
847 			case CRS_EXPORT_SNAPSHOT:
848 				appendStringInfoString(&cmd, " EXPORT_SNAPSHOT");
849 				break;
850 			case CRS_NOEXPORT_SNAPSHOT:
851 				appendStringInfoString(&cmd, " NOEXPORT_SNAPSHOT");
852 				break;
853 			case CRS_USE_SNAPSHOT:
854 				appendStringInfoString(&cmd, " USE_SNAPSHOT");
855 				break;
856 		}
857 	}
858 
859 	res = libpqrcv_PQexec(conn->streamConn, cmd.data);
860 	pfree(cmd.data);
861 
862 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
863 	{
864 		PQclear(res);
865 		ereport(ERROR,
866 				(errmsg("could not create replication slot \"%s\": %s",
867 						slotname, pchomp(PQerrorMessage(conn->streamConn)))));
868 	}
869 
870 	*lsn = DatumGetLSN(DirectFunctionCall1Coll(pg_lsn_in, InvalidOid,
871 											   CStringGetDatum(PQgetvalue(res, 0, 1))));
872 	if (!PQgetisnull(res, 0, 2))
873 		snapshot = pstrdup(PQgetvalue(res, 0, 2));
874 	else
875 		snapshot = NULL;
876 
877 	PQclear(res);
878 
879 	return snapshot;
880 }
881 
882 /*
883  * Convert tuple query result to tuplestore.
884  */
885 static void
libpqrcv_processTuples(PGresult * pgres,WalRcvExecResult * walres,const int nRetTypes,const Oid * retTypes)886 libpqrcv_processTuples(PGresult *pgres, WalRcvExecResult *walres,
887 					   const int nRetTypes, const Oid *retTypes)
888 {
889 	int			tupn;
890 	int			coln;
891 	int			nfields = PQnfields(pgres);
892 	HeapTuple	tuple;
893 	AttInMetadata *attinmeta;
894 	MemoryContext rowcontext;
895 	MemoryContext oldcontext;
896 
897 	/* Make sure we got expected number of fields. */
898 	if (nfields != nRetTypes)
899 		ereport(ERROR,
900 				(errmsg("invalid query response"),
901 				 errdetail("Expected %d fields, got %d fields.",
902 						   nRetTypes, nfields)));
903 
904 	walres->tuplestore = tuplestore_begin_heap(true, false, work_mem);
905 
906 	/* Create tuple descriptor corresponding to expected result. */
907 	walres->tupledesc = CreateTemplateTupleDesc(nRetTypes);
908 	for (coln = 0; coln < nRetTypes; coln++)
909 		TupleDescInitEntry(walres->tupledesc, (AttrNumber) coln + 1,
910 						   PQfname(pgres, coln), retTypes[coln], -1, 0);
911 	attinmeta = TupleDescGetAttInMetadata(walres->tupledesc);
912 
913 	/* No point in doing more here if there were no tuples returned. */
914 	if (PQntuples(pgres) == 0)
915 		return;
916 
917 	/* Create temporary context for local allocations. */
918 	rowcontext = AllocSetContextCreate(CurrentMemoryContext,
919 									   "libpqrcv query result context",
920 									   ALLOCSET_DEFAULT_SIZES);
921 
922 	/* Process returned rows. */
923 	for (tupn = 0; tupn < PQntuples(pgres); tupn++)
924 	{
925 		char	   *cstrs[MaxTupleAttributeNumber];
926 
927 		ProcessWalRcvInterrupts();
928 
929 		/* Do the allocations in temporary context. */
930 		oldcontext = MemoryContextSwitchTo(rowcontext);
931 
932 		/*
933 		 * Fill cstrs with null-terminated strings of column values.
934 		 */
935 		for (coln = 0; coln < nfields; coln++)
936 		{
937 			if (PQgetisnull(pgres, tupn, coln))
938 				cstrs[coln] = NULL;
939 			else
940 				cstrs[coln] = PQgetvalue(pgres, tupn, coln);
941 		}
942 
943 		/* Convert row to a tuple, and add it to the tuplestore */
944 		tuple = BuildTupleFromCStrings(attinmeta, cstrs);
945 		tuplestore_puttuple(walres->tuplestore, tuple);
946 
947 		/* Clean up */
948 		MemoryContextSwitchTo(oldcontext);
949 		MemoryContextReset(rowcontext);
950 	}
951 
952 	MemoryContextDelete(rowcontext);
953 }
954 
955 /*
956  * Public interface for sending generic queries (and commands).
957  *
958  * This can only be called from process connected to database.
959  */
960 static WalRcvExecResult *
libpqrcv_exec(WalReceiverConn * conn,const char * query,const int nRetTypes,const Oid * retTypes)961 libpqrcv_exec(WalReceiverConn *conn, const char *query,
962 			  const int nRetTypes, const Oid *retTypes)
963 {
964 	PGresult   *pgres = NULL;
965 	WalRcvExecResult *walres = palloc0(sizeof(WalRcvExecResult));
966 
967 	if (MyDatabaseId == InvalidOid)
968 		ereport(ERROR,
969 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
970 				 errmsg("the query interface requires a database connection")));
971 
972 	pgres = libpqrcv_PQexec(conn->streamConn, query);
973 
974 	switch (PQresultStatus(pgres))
975 	{
976 		case PGRES_SINGLE_TUPLE:
977 		case PGRES_TUPLES_OK:
978 			walres->status = WALRCV_OK_TUPLES;
979 			libpqrcv_processTuples(pgres, walres, nRetTypes, retTypes);
980 			break;
981 
982 		case PGRES_COPY_IN:
983 			walres->status = WALRCV_OK_COPY_IN;
984 			break;
985 
986 		case PGRES_COPY_OUT:
987 			walres->status = WALRCV_OK_COPY_OUT;
988 			break;
989 
990 		case PGRES_COPY_BOTH:
991 			walres->status = WALRCV_OK_COPY_BOTH;
992 			break;
993 
994 		case PGRES_COMMAND_OK:
995 			walres->status = WALRCV_OK_COMMAND;
996 			break;
997 
998 			/* Empty query is considered error. */
999 		case PGRES_EMPTY_QUERY:
1000 			walres->status = WALRCV_ERROR;
1001 			walres->err = _("empty query");
1002 			break;
1003 
1004 		case PGRES_NONFATAL_ERROR:
1005 		case PGRES_FATAL_ERROR:
1006 		case PGRES_BAD_RESPONSE:
1007 			walres->status = WALRCV_ERROR;
1008 			walres->err = pchomp(PQerrorMessage(conn->streamConn));
1009 			break;
1010 	}
1011 
1012 	PQclear(pgres);
1013 
1014 	return walres;
1015 }
1016 
1017 /*
1018  * Given a List of strings, return it as single comma separated
1019  * string, quoting identifiers as needed.
1020  *
1021  * This is essentially the reverse of SplitIdentifierString.
1022  *
1023  * The caller should free the result.
1024  */
1025 static char *
stringlist_to_identifierstr(PGconn * conn,List * strings)1026 stringlist_to_identifierstr(PGconn *conn, List *strings)
1027 {
1028 	ListCell   *lc;
1029 	StringInfoData res;
1030 	bool		first = true;
1031 
1032 	initStringInfo(&res);
1033 
1034 	foreach(lc, strings)
1035 	{
1036 		char	   *val = strVal(lfirst(lc));
1037 		char	   *val_escaped;
1038 
1039 		if (first)
1040 			first = false;
1041 		else
1042 			appendStringInfoChar(&res, ',');
1043 
1044 		val_escaped = PQescapeIdentifier(conn, val, strlen(val));
1045 		if (!val_escaped)
1046 		{
1047 			free(res.data);
1048 			return NULL;
1049 		}
1050 		appendStringInfoString(&res, val_escaped);
1051 		PQfreemem(val_escaped);
1052 	}
1053 
1054 	return res.data;
1055 }
1056