1 /*-------------------------------------------------------------------------
2 *
3 * libpqwalreceiver.c
4 *
5 * This file contains the libpq-specific parts of walreceiver. It's
6 * loaded as a dynamic module to avoid linking the main server binary with
7 * libpq.
8 *
9 * Portions Copyright (c) 2010-2019, PostgreSQL Global Development Group
10 *
11 *
12 * IDENTIFICATION
13 * src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
14 *
15 *-------------------------------------------------------------------------
16 */
17 #include "postgres.h"
18
19 #include <unistd.h>
20 #include <sys/time.h>
21
22 #include "libpq-fe.h"
23 #include "pqexpbuffer.h"
24 #include "access/xlog.h"
25 #include "catalog/pg_type.h"
26 #include "common/connect.h"
27 #include "funcapi.h"
28 #include "mb/pg_wchar.h"
29 #include "miscadmin.h"
30 #include "pgstat.h"
31 #include "replication/walreceiver.h"
32 #include "utils/builtins.h"
33 #include "utils/memutils.h"
34 #include "utils/pg_lsn.h"
35 #include "utils/tuplestore.h"
36
/*
 * Module magic block.  This file is built as a dynamically loaded module
 * (see the file header); the macro itself comes from an included header.
 */
PG_MODULE_MAGIC;

/* Module initialization function, defined further down in this file. */
void		_PG_init(void);
40
/*
 * State of one walreceiver connection.  Allocated (zeroed) by
 * libpqrcv_connect() and released by libpqrcv_disconnect(); every other
 * libpqrcv_* interface function takes a pointer to it.
 */
struct WalReceiverConn
{
	/* Current connection to the primary, if any */
	PGconn	   *streamConn;
	/* Used to remember if the connection is logical or physical */
	bool		logical;

	/*
	 * Buffer for currently read records.  Filled by PQgetCopyData() in
	 * libpqrcv_receive() and released with PQfreemem().
	 */
	char	   *recvBuf;
};
50
/*
 * Prototypes for interface functions
 *
 * These are the functions referenced from the PQWalReceiverFunctions table
 * below; all of them are file-local (static) and reached only through that
 * table.
 */
static WalReceiverConn *libpqrcv_connect(const char *conninfo,
										 bool logical, const char *appname,
										 char **err);
static void libpqrcv_check_conninfo(const char *conninfo);
static char *libpqrcv_get_conninfo(WalReceiverConn *conn);
static void libpqrcv_get_senderinfo(WalReceiverConn *conn,
									char **sender_host, int *sender_port);
static char *libpqrcv_identify_system(WalReceiverConn *conn,
									  TimeLineID *primary_tli);
static int	libpqrcv_server_version(WalReceiverConn *conn);
static void libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn,
											 TimeLineID tli, char **filename,
											 char **content, int *len);
static bool libpqrcv_startstreaming(WalReceiverConn *conn,
									const WalRcvStreamOptions *options);
static void libpqrcv_endstreaming(WalReceiverConn *conn,
								  TimeLineID *next_tli);
static int	libpqrcv_receive(WalReceiverConn *conn, char **buffer,
							 pgsocket *wait_fd);
static void libpqrcv_send(WalReceiverConn *conn, const char *buffer,
						  int nbytes);
static char *libpqrcv_create_slot(WalReceiverConn *conn,
								  const char *slotname,
								  bool temporary,
								  CRSSnapshotAction snapshot_action,
								  XLogRecPtr *lsn);
static WalRcvExecResult *libpqrcv_exec(WalReceiverConn *conn,
									   const char *query,
									   const int nRetTypes,
									   const Oid *retTypes);
static void libpqrcv_disconnect(WalReceiverConn *conn);
83
/*
 * Callback table installed into WalReceiverFunctions by _PG_init().
 *
 * The initializer is positional, so the entries must stay in the same order
 * as the members of WalReceiverFunctionsType (declared outside this file).
 */
static WalReceiverFunctionsType PQWalReceiverFunctions = {
	libpqrcv_connect,
	libpqrcv_check_conninfo,
	libpqrcv_get_conninfo,
	libpqrcv_get_senderinfo,
	libpqrcv_identify_system,
	libpqrcv_server_version,
	libpqrcv_readtimelinehistoryfile,
	libpqrcv_startstreaming,
	libpqrcv_endstreaming,
	libpqrcv_receive,
	libpqrcv_send,
	libpqrcv_create_slot,
	libpqrcv_exec,
	libpqrcv_disconnect
};
100
/*
 * Prototypes for private functions
 *
 * Helpers used only inside this file.  libpqrcv_processTuples() has no
 * prototype here; it is defined ahead of its only caller, libpqrcv_exec().
 */
static PGresult *libpqrcv_PQexec(PGconn *streamConn, const char *query);
static PGresult *libpqrcv_PQgetResult(PGconn *streamConn);
static char *stringlist_to_identifierstr(PGconn *conn, List *strings);
105
106 /*
107 * Module initialization function
108 */
109 void
_PG_init(void)110 _PG_init(void)
111 {
112 if (WalReceiverFunctions != NULL)
113 elog(ERROR, "libpqwalreceiver already loaded");
114 WalReceiverFunctions = &PQWalReceiverFunctions;
115 }
116
117 /*
118 * Establish the connection to the primary server for XLOG streaming
119 *
120 * Returns NULL on error and fills the err with palloc'ed error message.
121 */
122 static WalReceiverConn *
libpqrcv_connect(const char * conninfo,bool logical,const char * appname,char ** err)123 libpqrcv_connect(const char *conninfo, bool logical, const char *appname,
124 char **err)
125 {
126 WalReceiverConn *conn;
127 PostgresPollingStatusType status;
128 const char *keys[5];
129 const char *vals[5];
130 int i = 0;
131
132 /*
133 * We use the expand_dbname parameter to process the connection string (or
134 * URI), and pass some extra options.
135 */
136 keys[i] = "dbname";
137 vals[i] = conninfo;
138 keys[++i] = "replication";
139 vals[i] = logical ? "database" : "true";
140 if (!logical)
141 {
142 /*
143 * The database name is ignored by the server in replication mode, but
144 * specify "replication" for .pgpass lookup.
145 */
146 keys[++i] = "dbname";
147 vals[i] = "replication";
148 }
149 keys[++i] = "fallback_application_name";
150 vals[i] = appname;
151 if (logical)
152 {
153 keys[++i] = "client_encoding";
154 vals[i] = GetDatabaseEncodingName();
155 }
156 keys[++i] = NULL;
157 vals[i] = NULL;
158
159 Assert(i < sizeof(keys));
160
161 conn = palloc0(sizeof(WalReceiverConn));
162 conn->streamConn = PQconnectStartParams(keys, vals,
163 /* expand_dbname = */ true);
164 if (PQstatus(conn->streamConn) == CONNECTION_BAD)
165 {
166 *err = pchomp(PQerrorMessage(conn->streamConn));
167 return NULL;
168 }
169
170 /*
171 * Poll connection until we have OK or FAILED status.
172 *
173 * Per spec for PQconnectPoll, first wait till socket is write-ready.
174 */
175 status = PGRES_POLLING_WRITING;
176 do
177 {
178 int io_flag;
179 int rc;
180
181 if (status == PGRES_POLLING_READING)
182 io_flag = WL_SOCKET_READABLE;
183 #ifdef WIN32
184 /* Windows needs a different test while waiting for connection-made */
185 else if (PQstatus(conn->streamConn) == CONNECTION_STARTED)
186 io_flag = WL_SOCKET_CONNECTED;
187 #endif
188 else
189 io_flag = WL_SOCKET_WRITEABLE;
190
191 rc = WaitLatchOrSocket(MyLatch,
192 WL_EXIT_ON_PM_DEATH | WL_LATCH_SET | io_flag,
193 PQsocket(conn->streamConn),
194 0,
195 WAIT_EVENT_LIBPQWALRECEIVER_CONNECT);
196
197 /* Interrupted? */
198 if (rc & WL_LATCH_SET)
199 {
200 ResetLatch(MyLatch);
201 ProcessWalRcvInterrupts();
202 }
203
204 /* If socket is ready, advance the libpq state machine */
205 if (rc & io_flag)
206 status = PQconnectPoll(conn->streamConn);
207 } while (status != PGRES_POLLING_OK && status != PGRES_POLLING_FAILED);
208
209 if (PQstatus(conn->streamConn) != CONNECTION_OK)
210 {
211 *err = pchomp(PQerrorMessage(conn->streamConn));
212 return NULL;
213 }
214
215 if (logical)
216 {
217 PGresult *res;
218
219 res = libpqrcv_PQexec(conn->streamConn,
220 ALWAYS_SECURE_SEARCH_PATH_SQL);
221 if (PQresultStatus(res) != PGRES_TUPLES_OK)
222 {
223 PQclear(res);
224 ereport(ERROR,
225 (errmsg("could not clear search path: %s",
226 pchomp(PQerrorMessage(conn->streamConn)))));
227 }
228 PQclear(res);
229 }
230
231 conn->logical = logical;
232
233 return conn;
234 }
235
236 /*
237 * Validate connection info string (just try to parse it)
238 */
239 static void
libpqrcv_check_conninfo(const char * conninfo)240 libpqrcv_check_conninfo(const char *conninfo)
241 {
242 PQconninfoOption *opts = NULL;
243 char *err = NULL;
244
245 opts = PQconninfoParse(conninfo, &err);
246 if (opts == NULL)
247 {
248 /* The error string is malloc'd, so we must free it explicitly */
249 char *errcopy = err ? pstrdup(err) : "out of memory";
250
251 PQfreemem(err);
252 ereport(ERROR,
253 (errcode(ERRCODE_SYNTAX_ERROR),
254 errmsg("invalid connection string syntax: %s", errcopy)));
255 }
256
257 PQconninfoFree(opts);
258 }
259
260 /*
261 * Return a user-displayable conninfo string. Any security-sensitive fields
262 * are obfuscated.
263 */
264 static char *
libpqrcv_get_conninfo(WalReceiverConn * conn)265 libpqrcv_get_conninfo(WalReceiverConn *conn)
266 {
267 PQconninfoOption *conn_opts;
268 PQconninfoOption *conn_opt;
269 PQExpBufferData buf;
270 char *retval;
271
272 Assert(conn->streamConn != NULL);
273
274 initPQExpBuffer(&buf);
275 conn_opts = PQconninfo(conn->streamConn);
276
277 if (conn_opts == NULL)
278 ereport(ERROR,
279 (errmsg("could not parse connection string: %s",
280 _("out of memory"))));
281
282 /* build a clean connection string from pieces */
283 for (conn_opt = conn_opts; conn_opt->keyword != NULL; conn_opt++)
284 {
285 bool obfuscate;
286
287 /* Skip debug and empty options */
288 if (strchr(conn_opt->dispchar, 'D') ||
289 conn_opt->val == NULL ||
290 conn_opt->val[0] == '\0')
291 continue;
292
293 /* Obfuscate security-sensitive options */
294 obfuscate = strchr(conn_opt->dispchar, '*') != NULL;
295
296 appendPQExpBuffer(&buf, "%s%s=%s",
297 buf.len == 0 ? "" : " ",
298 conn_opt->keyword,
299 obfuscate ? "********" : conn_opt->val);
300 }
301
302 PQconninfoFree(conn_opts);
303
304 retval = PQExpBufferDataBroken(buf) ? NULL : pstrdup(buf.data);
305 termPQExpBuffer(&buf);
306 return retval;
307 }
308
309 /*
310 * Provides information of sender this WAL receiver is connected to.
311 */
312 static void
libpqrcv_get_senderinfo(WalReceiverConn * conn,char ** sender_host,int * sender_port)313 libpqrcv_get_senderinfo(WalReceiverConn *conn, char **sender_host,
314 int *sender_port)
315 {
316 char *ret = NULL;
317
318 *sender_host = NULL;
319 *sender_port = 0;
320
321 Assert(conn->streamConn != NULL);
322
323 ret = PQhost(conn->streamConn);
324 if (ret && strlen(ret) != 0)
325 *sender_host = pstrdup(ret);
326
327 ret = PQport(conn->streamConn);
328 if (ret && strlen(ret) != 0)
329 *sender_port = atoi(ret);
330 }
331
332 /*
333 * Check that primary's system identifier matches ours, and fetch the current
334 * timeline ID of the primary.
335 */
336 static char *
libpqrcv_identify_system(WalReceiverConn * conn,TimeLineID * primary_tli)337 libpqrcv_identify_system(WalReceiverConn *conn, TimeLineID *primary_tli)
338 {
339 PGresult *res;
340 char *primary_sysid;
341
342 /*
343 * Get the system identifier and timeline ID as a DataRow message from the
344 * primary server.
345 */
346 res = libpqrcv_PQexec(conn->streamConn, "IDENTIFY_SYSTEM");
347 if (PQresultStatus(res) != PGRES_TUPLES_OK)
348 {
349 PQclear(res);
350 ereport(ERROR,
351 (errmsg("could not receive database system identifier and timeline ID from "
352 "the primary server: %s",
353 pchomp(PQerrorMessage(conn->streamConn)))));
354 }
355 if (PQnfields(res) < 3 || PQntuples(res) != 1)
356 {
357 int ntuples = PQntuples(res);
358 int nfields = PQnfields(res);
359
360 PQclear(res);
361 ereport(ERROR,
362 (errmsg("invalid response from primary server"),
363 errdetail("Could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields.",
364 ntuples, nfields, 3, 1)));
365 }
366 primary_sysid = pstrdup(PQgetvalue(res, 0, 0));
367 *primary_tli = pg_strtoint32(PQgetvalue(res, 0, 1));
368 PQclear(res);
369
370 return primary_sysid;
371 }
372
373 /*
374 * Thin wrapper around libpq to obtain server version.
375 */
376 static int
libpqrcv_server_version(WalReceiverConn * conn)377 libpqrcv_server_version(WalReceiverConn *conn)
378 {
379 return PQserverVersion(conn->streamConn);
380 }
381
382 /*
383 * Start streaming WAL data from given streaming options.
384 *
385 * Returns true if we switched successfully to copy-both mode. False
386 * means the server received the command and executed it successfully, but
387 * didn't switch to copy-mode. That means that there was no WAL on the
388 * requested timeline and starting point, because the server switched to
389 * another timeline at or before the requested starting point. On failure,
390 * throws an ERROR.
391 */
392 static bool
libpqrcv_startstreaming(WalReceiverConn * conn,const WalRcvStreamOptions * options)393 libpqrcv_startstreaming(WalReceiverConn *conn,
394 const WalRcvStreamOptions *options)
395 {
396 StringInfoData cmd;
397 PGresult *res;
398
399 Assert(options->logical == conn->logical);
400 Assert(options->slotname || !options->logical);
401
402 initStringInfo(&cmd);
403
404 /* Build the command. */
405 appendStringInfoString(&cmd, "START_REPLICATION");
406 if (options->slotname != NULL)
407 appendStringInfo(&cmd, " SLOT \"%s\"",
408 options->slotname);
409
410 if (options->logical)
411 appendStringInfoString(&cmd, " LOGICAL");
412
413 appendStringInfo(&cmd, " %X/%X",
414 (uint32) (options->startpoint >> 32),
415 (uint32) options->startpoint);
416
417 /*
418 * Additional options are different depending on if we are doing logical
419 * or physical replication.
420 */
421 if (options->logical)
422 {
423 char *pubnames_str;
424 List *pubnames;
425 char *pubnames_literal;
426
427 appendStringInfoString(&cmd, " (");
428
429 appendStringInfo(&cmd, "proto_version '%u'",
430 options->proto.logical.proto_version);
431
432 pubnames = options->proto.logical.publication_names;
433 pubnames_str = stringlist_to_identifierstr(conn->streamConn, pubnames);
434 if (!pubnames_str)
435 ereport(ERROR,
436 (errmsg("could not start WAL streaming: %s",
437 pchomp(PQerrorMessage(conn->streamConn)))));
438 pubnames_literal = PQescapeLiteral(conn->streamConn, pubnames_str,
439 strlen(pubnames_str));
440 if (!pubnames_literal)
441 ereport(ERROR,
442 (errmsg("could not start WAL streaming: %s",
443 pchomp(PQerrorMessage(conn->streamConn)))));
444 appendStringInfo(&cmd, ", publication_names %s", pubnames_literal);
445 PQfreemem(pubnames_literal);
446 pfree(pubnames_str);
447
448 appendStringInfoChar(&cmd, ')');
449 }
450 else
451 appendStringInfo(&cmd, " TIMELINE %u",
452 options->proto.physical.startpointTLI);
453
454 /* Start streaming. */
455 res = libpqrcv_PQexec(conn->streamConn, cmd.data);
456 pfree(cmd.data);
457
458 if (PQresultStatus(res) == PGRES_COMMAND_OK)
459 {
460 PQclear(res);
461 return false;
462 }
463 else if (PQresultStatus(res) != PGRES_COPY_BOTH)
464 {
465 PQclear(res);
466 ereport(ERROR,
467 (errmsg("could not start WAL streaming: %s",
468 pchomp(PQerrorMessage(conn->streamConn)))));
469 }
470 PQclear(res);
471 return true;
472 }
473
474 /*
475 * Stop streaming WAL data. Returns the next timeline's ID in *next_tli, as
476 * reported by the server, or 0 if it did not report it.
477 */
478 static void
libpqrcv_endstreaming(WalReceiverConn * conn,TimeLineID * next_tli)479 libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
480 {
481 PGresult *res;
482
483 /*
484 * Send copy-end message. As in libpqrcv_PQexec, this could theoretically
485 * block, but the risk seems small.
486 */
487 if (PQputCopyEnd(conn->streamConn, NULL) <= 0 ||
488 PQflush(conn->streamConn))
489 ereport(ERROR,
490 (errmsg("could not send end-of-streaming message to primary: %s",
491 pchomp(PQerrorMessage(conn->streamConn)))));
492
493 *next_tli = 0;
494
495 /*
496 * After COPY is finished, we should receive a result set indicating the
497 * next timeline's ID, or just CommandComplete if the server was shut
498 * down.
499 *
500 * If we had not yet received CopyDone from the backend, PGRES_COPY_OUT is
501 * also possible in case we aborted the copy in mid-stream.
502 */
503 res = libpqrcv_PQgetResult(conn->streamConn);
504 if (PQresultStatus(res) == PGRES_TUPLES_OK)
505 {
506 /*
507 * Read the next timeline's ID. The server also sends the timeline's
508 * starting point, but it is ignored.
509 */
510 if (PQnfields(res) < 2 || PQntuples(res) != 1)
511 ereport(ERROR,
512 (errmsg("unexpected result set after end-of-streaming")));
513 *next_tli = pg_strtoint32(PQgetvalue(res, 0, 0));
514 PQclear(res);
515
516 /* the result set should be followed by CommandComplete */
517 res = libpqrcv_PQgetResult(conn->streamConn);
518 }
519 else if (PQresultStatus(res) == PGRES_COPY_OUT)
520 {
521 PQclear(res);
522
523 /* End the copy */
524 if (PQendcopy(conn->streamConn))
525 ereport(ERROR,
526 (errmsg("error while shutting down streaming COPY: %s",
527 pchomp(PQerrorMessage(conn->streamConn)))));
528
529 /* CommandComplete should follow */
530 res = libpqrcv_PQgetResult(conn->streamConn);
531 }
532
533 if (PQresultStatus(res) != PGRES_COMMAND_OK)
534 ereport(ERROR,
535 (errmsg("error reading result of streaming command: %s",
536 pchomp(PQerrorMessage(conn->streamConn)))));
537 PQclear(res);
538
539 /* Verify that there are no more results */
540 res = libpqrcv_PQgetResult(conn->streamConn);
541 if (res != NULL)
542 ereport(ERROR,
543 (errmsg("unexpected result after CommandComplete: %s",
544 pchomp(PQerrorMessage(conn->streamConn)))));
545 }
546
547 /*
548 * Fetch the timeline history file for 'tli' from primary.
549 */
550 static void
libpqrcv_readtimelinehistoryfile(WalReceiverConn * conn,TimeLineID tli,char ** filename,char ** content,int * len)551 libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn,
552 TimeLineID tli, char **filename,
553 char **content, int *len)
554 {
555 PGresult *res;
556 char cmd[64];
557
558 Assert(!conn->logical);
559
560 /*
561 * Request the primary to send over the history file for given timeline.
562 */
563 snprintf(cmd, sizeof(cmd), "TIMELINE_HISTORY %u", tli);
564 res = libpqrcv_PQexec(conn->streamConn, cmd);
565 if (PQresultStatus(res) != PGRES_TUPLES_OK)
566 {
567 PQclear(res);
568 ereport(ERROR,
569 (errmsg("could not receive timeline history file from "
570 "the primary server: %s",
571 pchomp(PQerrorMessage(conn->streamConn)))));
572 }
573 if (PQnfields(res) != 2 || PQntuples(res) != 1)
574 {
575 int ntuples = PQntuples(res);
576 int nfields = PQnfields(res);
577
578 PQclear(res);
579 ereport(ERROR,
580 (errmsg("invalid response from primary server"),
581 errdetail("Expected 1 tuple with 2 fields, got %d tuples with %d fields.",
582 ntuples, nfields)));
583 }
584 *filename = pstrdup(PQgetvalue(res, 0, 0));
585
586 *len = PQgetlength(res, 0, 1);
587 *content = palloc(*len);
588 memcpy(*content, PQgetvalue(res, 0, 1), *len);
589 PQclear(res);
590 }
591
592 /*
593 * Send a query and wait for the results by using the asynchronous libpq
594 * functions and socket readiness events.
595 *
596 * We must not use the regular blocking libpq functions like PQexec()
597 * since they are uninterruptible by signals on some platforms, such as
598 * Windows.
599 *
600 * The function is modeled on PQexec() in libpq, but only implements
601 * those parts that are in use in the walreceiver api.
602 *
603 * May return NULL, rather than an error result, on failure.
604 */
605 static PGresult *
libpqrcv_PQexec(PGconn * streamConn,const char * query)606 libpqrcv_PQexec(PGconn *streamConn, const char *query)
607 {
608 PGresult *lastResult = NULL;
609
610 /*
611 * PQexec() silently discards any prior query results on the connection.
612 * This is not required for this function as it's expected that the caller
613 * (which is this library in all cases) will behave correctly and we don't
614 * have to be backwards compatible with old libpq.
615 */
616
617 /*
618 * Submit the query. Since we don't use non-blocking mode, this could
619 * theoretically block. In practice, since we don't send very long query
620 * strings, the risk seems negligible.
621 */
622 if (!PQsendQuery(streamConn, query))
623 return NULL;
624
625 for (;;)
626 {
627 /* Wait for, and collect, the next PGresult. */
628 PGresult *result;
629
630 result = libpqrcv_PQgetResult(streamConn);
631 if (result == NULL)
632 break; /* query is complete, or failure */
633
634 /*
635 * Emulate PQexec()'s behavior of returning the last result when there
636 * are many. We are fine with returning just last error message.
637 */
638 PQclear(lastResult);
639 lastResult = result;
640
641 if (PQresultStatus(lastResult) == PGRES_COPY_IN ||
642 PQresultStatus(lastResult) == PGRES_COPY_OUT ||
643 PQresultStatus(lastResult) == PGRES_COPY_BOTH ||
644 PQstatus(streamConn) == CONNECTION_BAD)
645 break;
646 }
647
648 return lastResult;
649 }
650
651 /*
652 * Perform the equivalent of PQgetResult(), but watch for interrupts.
653 */
654 static PGresult *
libpqrcv_PQgetResult(PGconn * streamConn)655 libpqrcv_PQgetResult(PGconn *streamConn)
656 {
657 /*
658 * Collect data until PQgetResult is ready to get the result without
659 * blocking.
660 */
661 while (PQisBusy(streamConn))
662 {
663 int rc;
664
665 /*
666 * We don't need to break down the sleep into smaller increments,
667 * since we'll get interrupted by signals and can handle any
668 * interrupts here.
669 */
670 rc = WaitLatchOrSocket(MyLatch,
671 WL_EXIT_ON_PM_DEATH | WL_SOCKET_READABLE |
672 WL_LATCH_SET,
673 PQsocket(streamConn),
674 0,
675 WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
676
677 /* Interrupted? */
678 if (rc & WL_LATCH_SET)
679 {
680 ResetLatch(MyLatch);
681 ProcessWalRcvInterrupts();
682 }
683
684 /* Consume whatever data is available from the socket */
685 if (PQconsumeInput(streamConn) == 0)
686 {
687 /* trouble; return NULL */
688 return NULL;
689 }
690 }
691
692 /* Now we can collect and return the next PGresult */
693 return PQgetResult(streamConn);
694 }
695
696 /*
697 * Disconnect connection to primary, if any.
698 */
699 static void
libpqrcv_disconnect(WalReceiverConn * conn)700 libpqrcv_disconnect(WalReceiverConn *conn)
701 {
702 PQfinish(conn->streamConn);
703 if (conn->recvBuf != NULL)
704 PQfreemem(conn->recvBuf);
705 pfree(conn);
706 }
707
708 /*
709 * Receive a message available from XLOG stream.
710 *
711 * Returns:
712 *
713 * If data was received, returns the length of the data. *buffer is set to
714 * point to a buffer holding the received message. The buffer is only valid
715 * until the next libpqrcv_* call.
716 *
717 * If no data was available immediately, returns 0, and *wait_fd is set to a
718 * socket descriptor which can be waited on before trying again.
719 *
720 * -1 if the server ended the COPY.
721 *
722 * ereports on error.
723 */
724 static int
libpqrcv_receive(WalReceiverConn * conn,char ** buffer,pgsocket * wait_fd)725 libpqrcv_receive(WalReceiverConn *conn, char **buffer,
726 pgsocket *wait_fd)
727 {
728 int rawlen;
729
730 if (conn->recvBuf != NULL)
731 PQfreemem(conn->recvBuf);
732 conn->recvBuf = NULL;
733
734 /* Try to receive a CopyData message */
735 rawlen = PQgetCopyData(conn->streamConn, &conn->recvBuf, 1);
736 if (rawlen == 0)
737 {
738 /* Try consuming some data. */
739 if (PQconsumeInput(conn->streamConn) == 0)
740 ereport(ERROR,
741 (errmsg("could not receive data from WAL stream: %s",
742 pchomp(PQerrorMessage(conn->streamConn)))));
743
744 /* Now that we've consumed some input, try again */
745 rawlen = PQgetCopyData(conn->streamConn, &conn->recvBuf, 1);
746 if (rawlen == 0)
747 {
748 /* Tell caller to try again when our socket is ready. */
749 *wait_fd = PQsocket(conn->streamConn);
750 return 0;
751 }
752 }
753 if (rawlen == -1) /* end-of-streaming or error */
754 {
755 PGresult *res;
756
757 res = libpqrcv_PQgetResult(conn->streamConn);
758 if (PQresultStatus(res) == PGRES_COMMAND_OK)
759 {
760 PQclear(res);
761
762 /* Verify that there are no more results. */
763 res = libpqrcv_PQgetResult(conn->streamConn);
764 if (res != NULL)
765 {
766 PQclear(res);
767
768 /*
769 * If the other side closed the connection orderly (otherwise
770 * we'd seen an error, or PGRES_COPY_IN) don't report an error
771 * here, but let callers deal with it.
772 */
773 if (PQstatus(conn->streamConn) == CONNECTION_BAD)
774 return -1;
775
776 ereport(ERROR,
777 (errmsg("unexpected result after CommandComplete: %s",
778 PQerrorMessage(conn->streamConn))));
779 }
780
781 return -1;
782 }
783 else if (PQresultStatus(res) == PGRES_COPY_IN)
784 {
785 PQclear(res);
786 return -1;
787 }
788 else
789 {
790 PQclear(res);
791 ereport(ERROR,
792 (errmsg("could not receive data from WAL stream: %s",
793 pchomp(PQerrorMessage(conn->streamConn)))));
794 }
795 }
796 if (rawlen < -1)
797 ereport(ERROR,
798 (errmsg("could not receive data from WAL stream: %s",
799 pchomp(PQerrorMessage(conn->streamConn)))));
800
801 /* Return received messages to caller */
802 *buffer = conn->recvBuf;
803 return rawlen;
804 }
805
806 /*
807 * Send a message to XLOG stream.
808 *
809 * ereports on error.
810 */
811 static void
libpqrcv_send(WalReceiverConn * conn,const char * buffer,int nbytes)812 libpqrcv_send(WalReceiverConn *conn, const char *buffer, int nbytes)
813 {
814 if (PQputCopyData(conn->streamConn, buffer, nbytes) <= 0 ||
815 PQflush(conn->streamConn))
816 ereport(ERROR,
817 (errmsg("could not send data to WAL stream: %s",
818 pchomp(PQerrorMessage(conn->streamConn)))));
819 }
820
821 /*
822 * Create new replication slot.
823 * Returns the name of the exported snapshot for logical slot or NULL for
824 * physical slot.
825 */
826 static char *
libpqrcv_create_slot(WalReceiverConn * conn,const char * slotname,bool temporary,CRSSnapshotAction snapshot_action,XLogRecPtr * lsn)827 libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname,
828 bool temporary, CRSSnapshotAction snapshot_action,
829 XLogRecPtr *lsn)
830 {
831 PGresult *res;
832 StringInfoData cmd;
833 char *snapshot;
834
835 initStringInfo(&cmd);
836
837 appendStringInfo(&cmd, "CREATE_REPLICATION_SLOT \"%s\"", slotname);
838
839 if (temporary)
840 appendStringInfoString(&cmd, " TEMPORARY");
841
842 if (conn->logical)
843 {
844 appendStringInfoString(&cmd, " LOGICAL pgoutput");
845 switch (snapshot_action)
846 {
847 case CRS_EXPORT_SNAPSHOT:
848 appendStringInfoString(&cmd, " EXPORT_SNAPSHOT");
849 break;
850 case CRS_NOEXPORT_SNAPSHOT:
851 appendStringInfoString(&cmd, " NOEXPORT_SNAPSHOT");
852 break;
853 case CRS_USE_SNAPSHOT:
854 appendStringInfoString(&cmd, " USE_SNAPSHOT");
855 break;
856 }
857 }
858
859 res = libpqrcv_PQexec(conn->streamConn, cmd.data);
860 pfree(cmd.data);
861
862 if (PQresultStatus(res) != PGRES_TUPLES_OK)
863 {
864 PQclear(res);
865 ereport(ERROR,
866 (errmsg("could not create replication slot \"%s\": %s",
867 slotname, pchomp(PQerrorMessage(conn->streamConn)))));
868 }
869
870 *lsn = DatumGetLSN(DirectFunctionCall1Coll(pg_lsn_in, InvalidOid,
871 CStringGetDatum(PQgetvalue(res, 0, 1))));
872 if (!PQgetisnull(res, 0, 2))
873 snapshot = pstrdup(PQgetvalue(res, 0, 2));
874 else
875 snapshot = NULL;
876
877 PQclear(res);
878
879 return snapshot;
880 }
881
882 /*
883 * Convert tuple query result to tuplestore.
884 */
885 static void
libpqrcv_processTuples(PGresult * pgres,WalRcvExecResult * walres,const int nRetTypes,const Oid * retTypes)886 libpqrcv_processTuples(PGresult *pgres, WalRcvExecResult *walres,
887 const int nRetTypes, const Oid *retTypes)
888 {
889 int tupn;
890 int coln;
891 int nfields = PQnfields(pgres);
892 HeapTuple tuple;
893 AttInMetadata *attinmeta;
894 MemoryContext rowcontext;
895 MemoryContext oldcontext;
896
897 /* Make sure we got expected number of fields. */
898 if (nfields != nRetTypes)
899 ereport(ERROR,
900 (errmsg("invalid query response"),
901 errdetail("Expected %d fields, got %d fields.",
902 nRetTypes, nfields)));
903
904 walres->tuplestore = tuplestore_begin_heap(true, false, work_mem);
905
906 /* Create tuple descriptor corresponding to expected result. */
907 walres->tupledesc = CreateTemplateTupleDesc(nRetTypes);
908 for (coln = 0; coln < nRetTypes; coln++)
909 TupleDescInitEntry(walres->tupledesc, (AttrNumber) coln + 1,
910 PQfname(pgres, coln), retTypes[coln], -1, 0);
911 attinmeta = TupleDescGetAttInMetadata(walres->tupledesc);
912
913 /* No point in doing more here if there were no tuples returned. */
914 if (PQntuples(pgres) == 0)
915 return;
916
917 /* Create temporary context for local allocations. */
918 rowcontext = AllocSetContextCreate(CurrentMemoryContext,
919 "libpqrcv query result context",
920 ALLOCSET_DEFAULT_SIZES);
921
922 /* Process returned rows. */
923 for (tupn = 0; tupn < PQntuples(pgres); tupn++)
924 {
925 char *cstrs[MaxTupleAttributeNumber];
926
927 ProcessWalRcvInterrupts();
928
929 /* Do the allocations in temporary context. */
930 oldcontext = MemoryContextSwitchTo(rowcontext);
931
932 /*
933 * Fill cstrs with null-terminated strings of column values.
934 */
935 for (coln = 0; coln < nfields; coln++)
936 {
937 if (PQgetisnull(pgres, tupn, coln))
938 cstrs[coln] = NULL;
939 else
940 cstrs[coln] = PQgetvalue(pgres, tupn, coln);
941 }
942
943 /* Convert row to a tuple, and add it to the tuplestore */
944 tuple = BuildTupleFromCStrings(attinmeta, cstrs);
945 tuplestore_puttuple(walres->tuplestore, tuple);
946
947 /* Clean up */
948 MemoryContextSwitchTo(oldcontext);
949 MemoryContextReset(rowcontext);
950 }
951
952 MemoryContextDelete(rowcontext);
953 }
954
955 /*
956 * Public interface for sending generic queries (and commands).
957 *
958 * This can only be called from process connected to database.
959 */
960 static WalRcvExecResult *
libpqrcv_exec(WalReceiverConn * conn,const char * query,const int nRetTypes,const Oid * retTypes)961 libpqrcv_exec(WalReceiverConn *conn, const char *query,
962 const int nRetTypes, const Oid *retTypes)
963 {
964 PGresult *pgres = NULL;
965 WalRcvExecResult *walres = palloc0(sizeof(WalRcvExecResult));
966
967 if (MyDatabaseId == InvalidOid)
968 ereport(ERROR,
969 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
970 errmsg("the query interface requires a database connection")));
971
972 pgres = libpqrcv_PQexec(conn->streamConn, query);
973
974 switch (PQresultStatus(pgres))
975 {
976 case PGRES_SINGLE_TUPLE:
977 case PGRES_TUPLES_OK:
978 walres->status = WALRCV_OK_TUPLES;
979 libpqrcv_processTuples(pgres, walres, nRetTypes, retTypes);
980 break;
981
982 case PGRES_COPY_IN:
983 walres->status = WALRCV_OK_COPY_IN;
984 break;
985
986 case PGRES_COPY_OUT:
987 walres->status = WALRCV_OK_COPY_OUT;
988 break;
989
990 case PGRES_COPY_BOTH:
991 walres->status = WALRCV_OK_COPY_BOTH;
992 break;
993
994 case PGRES_COMMAND_OK:
995 walres->status = WALRCV_OK_COMMAND;
996 break;
997
998 /* Empty query is considered error. */
999 case PGRES_EMPTY_QUERY:
1000 walres->status = WALRCV_ERROR;
1001 walres->err = _("empty query");
1002 break;
1003
1004 case PGRES_NONFATAL_ERROR:
1005 case PGRES_FATAL_ERROR:
1006 case PGRES_BAD_RESPONSE:
1007 walres->status = WALRCV_ERROR;
1008 walres->err = pchomp(PQerrorMessage(conn->streamConn));
1009 break;
1010 }
1011
1012 PQclear(pgres);
1013
1014 return walres;
1015 }
1016
1017 /*
1018 * Given a List of strings, return it as single comma separated
1019 * string, quoting identifiers as needed.
1020 *
1021 * This is essentially the reverse of SplitIdentifierString.
1022 *
1023 * The caller should free the result.
1024 */
1025 static char *
stringlist_to_identifierstr(PGconn * conn,List * strings)1026 stringlist_to_identifierstr(PGconn *conn, List *strings)
1027 {
1028 ListCell *lc;
1029 StringInfoData res;
1030 bool first = true;
1031
1032 initStringInfo(&res);
1033
1034 foreach(lc, strings)
1035 {
1036 char *val = strVal(lfirst(lc));
1037 char *val_escaped;
1038
1039 if (first)
1040 first = false;
1041 else
1042 appendStringInfoChar(&res, ',');
1043
1044 val_escaped = PQescapeIdentifier(conn, val, strlen(val));
1045 if (!val_escaped)
1046 {
1047 free(res.data);
1048 return NULL;
1049 }
1050 appendStringInfoString(&res, val_escaped);
1051 PQfreemem(val_escaped);
1052 }
1053
1054 return res.data;
1055 }
1056