1 /*-------------------------------------------------------------------------
2 *
3 * pg_recvlogical.c - receive data from a logical decoding slot in a streaming
4 * fashion and write it to a local file.
5 *
6 * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
7 *
8 * IDENTIFICATION
9 * src/bin/pg_basebackup/pg_recvlogical.c
10 *-------------------------------------------------------------------------
11 */
12
13 #include "postgres_fe.h"
14
15 #include <dirent.h>
16 #include <sys/stat.h>
17 #include <unistd.h>
18 #ifdef HAVE_SYS_SELECT_H
19 #include <sys/select.h>
20 #endif
21
22 /* local includes */
23 #include "streamutil.h"
24
25 #include "access/xlog_internal.h"
26 #include "common/fe_memutils.h"
27 #include "getopt_long.h"
28 #include "libpq-fe.h"
29 #include "libpq/pqsignal.h"
30 #include "pqexpbuffer.h"
31
32
33 /* Time to sleep between reconnection attempts */
34 #define RECONNECT_SLEEP_TIME 5
35
36 /* Global Options */
37 static char *outfile = NULL;
38 static int verbose = 0;
39 static int noloop = 0;
40 static int standby_message_timeout = 10 * 1000; /* 10 sec = default */
41 static int fsync_interval = 10 * 1000; /* 10 sec = default */
42 static XLogRecPtr startpos = InvalidXLogRecPtr;
43 static XLogRecPtr endpos = InvalidXLogRecPtr;
44 static bool do_create_slot = false;
45 static bool slot_exists_ok = false;
46 static bool do_start_slot = false;
47 static bool do_drop_slot = false;
48 static char *replication_slot = NULL;
49
50 /* filled pairwise with option, value. value may be NULL */
51 static char **options;
52 static size_t noptions = 0;
53 static const char *plugin = "test_decoding";
54
55 /* Global State */
56 static int outfd = -1;
57 static volatile sig_atomic_t time_to_abort = false;
58 static volatile sig_atomic_t output_reopen = false;
59 static bool output_isfile;
60 static TimestampTz output_last_fsync = -1;
61 static bool output_needs_fsync = false;
62 static XLogRecPtr output_written_lsn = InvalidXLogRecPtr;
63 static XLogRecPtr output_fsync_lsn = InvalidXLogRecPtr;
64
65 static void usage(void);
66 static void StreamLogicalLog(void);
67 static void disconnect_and_exit(int code);
68 static bool flushAndSendFeedback(PGconn *conn, TimestampTz *now);
69 static void prepareToTerminate(PGconn *conn, XLogRecPtr endpos,
70 bool keepalive, XLogRecPtr lsn);
71
72 static void
usage(void)73 usage(void)
74 {
75 printf(_("%s controls PostgreSQL logical decoding streams.\n\n"),
76 progname);
77 printf(_("Usage:\n"));
78 printf(_(" %s [OPTION]...\n"), progname);
79 printf(_("\nAction to be performed:\n"));
80 printf(_(" --create-slot create a new replication slot (for the slot's name see --slot)\n"));
81 printf(_(" --drop-slot drop the replication slot (for the slot's name see --slot)\n"));
82 printf(_(" --start start streaming in a replication slot (for the slot's name see --slot)\n"));
83 printf(_("\nOptions:\n"));
84 printf(_(" -E, --endpos=LSN exit after receiving the specified LSN\n"));
85 printf(_(" -f, --file=FILE receive log into this file, - for stdout\n"));
86 printf(_(" -F --fsync-interval=SECS\n"
87 " time between fsyncs to the output file (default: %d)\n"), (fsync_interval / 1000));
88 printf(_(" --if-not-exists do not error if slot already exists when creating a slot\n"));
89 printf(_(" -I, --startpos=LSN where in an existing slot should the streaming start\n"));
90 printf(_(" -n, --no-loop do not loop on connection lost\n"));
91 printf(_(" -o, --option=NAME[=VALUE]\n"
92 " pass option NAME with optional value VALUE to the\n"
93 " output plugin\n"));
94 printf(_(" -P, --plugin=PLUGIN use output plugin PLUGIN (default: %s)\n"), plugin);
95 printf(_(" -s, --status-interval=SECS\n"
96 " time between status packets sent to server (default: %d)\n"), (standby_message_timeout / 1000));
97 printf(_(" -S, --slot=SLOTNAME name of the logical replication slot\n"));
98 printf(_(" -v, --verbose output verbose messages\n"));
99 printf(_(" -V, --version output version information, then exit\n"));
100 printf(_(" -?, --help show this help, then exit\n"));
101 printf(_("\nConnection options:\n"));
102 printf(_(" -d, --dbname=DBNAME database to connect to\n"));
103 printf(_(" -h, --host=HOSTNAME database server host or socket directory\n"));
104 printf(_(" -p, --port=PORT database server port number\n"));
105 printf(_(" -U, --username=NAME connect as specified database user\n"));
106 printf(_(" -w, --no-password never prompt for password\n"));
107 printf(_(" -W, --password force password prompt (should happen automatically)\n"));
108 printf(_("\nReport bugs to <pgsql-bugs@postgresql.org>.\n"));
109 }
110
111 /*
112 * Send a Standby Status Update message to server.
113 */
114 static bool
sendFeedback(PGconn * conn,TimestampTz now,bool force,bool replyRequested)115 sendFeedback(PGconn *conn, TimestampTz now, bool force, bool replyRequested)
116 {
117 static XLogRecPtr last_written_lsn = InvalidXLogRecPtr;
118 static XLogRecPtr last_fsync_lsn = InvalidXLogRecPtr;
119
120 char replybuf[1 + 8 + 8 + 8 + 8 + 1];
121 int len = 0;
122
123 /*
124 * we normally don't want to send superfluous feedbacks, but if it's
125 * because of a timeout we need to, otherwise wal_sender_timeout will kill
126 * us.
127 */
128 if (!force &&
129 last_written_lsn == output_written_lsn &&
130 last_fsync_lsn == output_fsync_lsn)
131 return true;
132
133 if (verbose)
134 fprintf(stderr,
135 _("%s: confirming write up to %X/%X, flush to %X/%X (slot %s)\n"),
136 progname,
137 (uint32) (output_written_lsn >> 32), (uint32) output_written_lsn,
138 (uint32) (output_fsync_lsn >> 32), (uint32) output_fsync_lsn,
139 replication_slot);
140
141 replybuf[len] = 'r';
142 len += 1;
143 fe_sendint64(output_written_lsn, &replybuf[len]); /* write */
144 len += 8;
145 fe_sendint64(output_fsync_lsn, &replybuf[len]); /* flush */
146 len += 8;
147 fe_sendint64(InvalidXLogRecPtr, &replybuf[len]); /* apply */
148 len += 8;
149 fe_sendint64(now, &replybuf[len]); /* sendTime */
150 len += 8;
151 replybuf[len] = replyRequested ? 1 : 0; /* replyRequested */
152 len += 1;
153
154 startpos = output_written_lsn;
155 last_written_lsn = output_written_lsn;
156 last_fsync_lsn = output_fsync_lsn;
157
158 if (PQputCopyData(conn, replybuf, len) <= 0 || PQflush(conn))
159 {
160 fprintf(stderr, _("%s: could not send feedback packet: %s"),
161 progname, PQerrorMessage(conn));
162 return false;
163 }
164
165 return true;
166 }
167
168 static void
disconnect_and_exit(int code)169 disconnect_and_exit(int code)
170 {
171 if (conn != NULL)
172 PQfinish(conn);
173
174 exit(code);
175 }
176
177 static bool
OutputFsync(TimestampTz now)178 OutputFsync(TimestampTz now)
179 {
180 output_last_fsync = now;
181
182 output_fsync_lsn = output_written_lsn;
183
184 if (fsync_interval <= 0)
185 return true;
186
187 if (!output_needs_fsync)
188 return true;
189
190 output_needs_fsync = false;
191
192 /* can only fsync if it's a regular file */
193 if (!output_isfile)
194 return true;
195
196 if (fsync(outfd) != 0)
197 {
198 fprintf(stderr,
199 _("%s: could not fsync log file \"%s\": %s\n"),
200 progname, outfile, strerror(errno));
201 return false;
202 }
203
204 return true;
205 }
206
207 /*
208 * Start the log streaming
209 */
210 static void
StreamLogicalLog(void)211 StreamLogicalLog(void)
212 {
213 PGresult *res;
214 char *copybuf = NULL;
215 TimestampTz last_status = -1;
216 int i;
217 PQExpBuffer query;
218
219 output_written_lsn = InvalidXLogRecPtr;
220 output_fsync_lsn = InvalidXLogRecPtr;
221
222 query = createPQExpBuffer();
223
224 /*
225 * Connect in replication mode to the server
226 */
227 if (!conn)
228 conn = GetConnection();
229 if (!conn)
230 /* Error message already written in GetConnection() */
231 return;
232
233 /*
234 * Start the replication
235 */
236 if (verbose)
237 fprintf(stderr,
238 _("%s: starting log streaming at %X/%X (slot %s)\n"),
239 progname, (uint32) (startpos >> 32), (uint32) startpos,
240 replication_slot);
241
242 /* Initiate the replication stream at specified location */
243 appendPQExpBuffer(query, "START_REPLICATION SLOT \"%s\" LOGICAL %X/%X",
244 replication_slot, (uint32) (startpos >> 32), (uint32) startpos);
245
246 /* print options if there are any */
247 if (noptions)
248 appendPQExpBufferStr(query, " (");
249
250 for (i = 0; i < noptions; i++)
251 {
252 /* separator */
253 if (i > 0)
254 appendPQExpBufferStr(query, ", ");
255
256 /* write option name */
257 appendPQExpBuffer(query, "\"%s\"", options[(i * 2)]);
258
259 /* write option value if specified */
260 if (options[(i * 2) + 1] != NULL)
261 appendPQExpBuffer(query, " '%s'", options[(i * 2) + 1]);
262 }
263
264 if (noptions)
265 appendPQExpBufferChar(query, ')');
266
267 res = PQexec(conn, query->data);
268 if (PQresultStatus(res) != PGRES_COPY_BOTH)
269 {
270 fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
271 progname, query->data, PQresultErrorMessage(res));
272 PQclear(res);
273 goto error;
274 }
275 PQclear(res);
276 resetPQExpBuffer(query);
277
278 if (verbose)
279 fprintf(stderr,
280 _("%s: streaming initiated\n"),
281 progname);
282
283 while (!time_to_abort)
284 {
285 int r;
286 int bytes_left;
287 int bytes_written;
288 TimestampTz now;
289 int hdr_len;
290 XLogRecPtr cur_record_lsn = InvalidXLogRecPtr;
291
292 if (copybuf != NULL)
293 {
294 PQfreemem(copybuf);
295 copybuf = NULL;
296 }
297
298 /*
299 * Potentially send a status message to the master
300 */
301 now = feGetCurrentTimestamp();
302
303 if (outfd != -1 &&
304 feTimestampDifferenceExceeds(output_last_fsync, now,
305 fsync_interval))
306 {
307 if (!OutputFsync(now))
308 goto error;
309 }
310
311 if (standby_message_timeout > 0 &&
312 feTimestampDifferenceExceeds(last_status, now,
313 standby_message_timeout))
314 {
315 /* Time to send feedback! */
316 if (!sendFeedback(conn, now, true, false))
317 goto error;
318
319 last_status = now;
320 }
321
322 /* got SIGHUP, close output file */
323 if (outfd != -1 && output_reopen && strcmp(outfile, "-") != 0)
324 {
325 now = feGetCurrentTimestamp();
326 if (!OutputFsync(now))
327 goto error;
328 close(outfd);
329 outfd = -1;
330 }
331 output_reopen = false;
332
333 /* open the output file, if not open yet */
334 if (outfd == -1)
335 {
336 struct stat statbuf;
337
338 if (strcmp(outfile, "-") == 0)
339 outfd = fileno(stdout);
340 else
341 outfd = open(outfile, O_CREAT | O_APPEND | O_WRONLY | PG_BINARY,
342 S_IRUSR | S_IWUSR);
343 if (outfd == -1)
344 {
345 fprintf(stderr,
346 _("%s: could not open log file \"%s\": %s\n"),
347 progname, outfile, strerror(errno));
348 goto error;
349 }
350
351 if (fstat(outfd, &statbuf) != 0)
352 fprintf(stderr,
353 _("%s: could not stat file \"%s\": %s\n"),
354 progname, outfile, strerror(errno));
355
356 output_isfile = S_ISREG(statbuf.st_mode) && !isatty(outfd);
357 }
358
359 r = PQgetCopyData(conn, ©buf, 1);
360 if (r == 0)
361 {
362 /*
363 * In async mode, and no data available. We block on reading but
364 * not more than the specified timeout, so that we can send a
365 * response back to the client.
366 */
367 fd_set input_mask;
368 TimestampTz message_target = 0;
369 TimestampTz fsync_target = 0;
370 struct timeval timeout;
371 struct timeval *timeoutptr = NULL;
372
373 if (PQsocket(conn) < 0)
374 {
375 fprintf(stderr,
376 _("%s: invalid socket: %s"),
377 progname, PQerrorMessage(conn));
378 goto error;
379 }
380
381 FD_ZERO(&input_mask);
382 FD_SET(PQsocket(conn), &input_mask);
383
384 /* Compute when we need to wakeup to send a keepalive message. */
385 if (standby_message_timeout)
386 message_target = last_status + (standby_message_timeout - 1) *
387 ((int64) 1000);
388
389 /* Compute when we need to wakeup to fsync the output file. */
390 if (fsync_interval > 0 && output_needs_fsync)
391 fsync_target = output_last_fsync + (fsync_interval - 1) *
392 ((int64) 1000);
393
394 /* Now compute when to wakeup. */
395 if (message_target > 0 || fsync_target > 0)
396 {
397 TimestampTz targettime;
398 long secs;
399 int usecs;
400
401 targettime = message_target;
402
403 if (fsync_target > 0 && fsync_target < targettime)
404 targettime = fsync_target;
405
406 feTimestampDifference(now,
407 targettime,
408 &secs,
409 &usecs);
410 if (secs <= 0)
411 timeout.tv_sec = 1; /* Always sleep at least 1 sec */
412 else
413 timeout.tv_sec = secs;
414 timeout.tv_usec = usecs;
415 timeoutptr = &timeout;
416 }
417
418 r = select(PQsocket(conn) + 1, &input_mask, NULL, NULL, timeoutptr);
419 if (r == 0 || (r < 0 && errno == EINTR))
420 {
421 /*
422 * Got a timeout or signal. Continue the loop and either
423 * deliver a status packet to the server or just go back into
424 * blocking.
425 */
426 continue;
427 }
428 else if (r < 0)
429 {
430 fprintf(stderr, _("%s: select() failed: %s\n"),
431 progname, strerror(errno));
432 goto error;
433 }
434
435 /* Else there is actually data on the socket */
436 if (PQconsumeInput(conn) == 0)
437 {
438 fprintf(stderr,
439 _("%s: could not receive data from WAL stream: %s"),
440 progname, PQerrorMessage(conn));
441 goto error;
442 }
443 continue;
444 }
445
446 /* End of copy stream */
447 if (r == -1)
448 break;
449
450 /* Failure while reading the copy stream */
451 if (r == -2)
452 {
453 fprintf(stderr, _("%s: could not read COPY data: %s"),
454 progname, PQerrorMessage(conn));
455 goto error;
456 }
457
458 /* Check the message type. */
459 if (copybuf[0] == 'k')
460 {
461 int pos;
462 bool replyRequested;
463 XLogRecPtr walEnd;
464 bool endposReached = false;
465
466 /*
467 * Parse the keepalive message, enclosed in the CopyData message.
468 * We just check if the server requested a reply, and ignore the
469 * rest.
470 */
471 pos = 1; /* skip msgtype 'k' */
472 walEnd = fe_recvint64(©buf[pos]);
473 output_written_lsn = Max(walEnd, output_written_lsn);
474
475 pos += 8; /* read walEnd */
476
477 pos += 8; /* skip sendTime */
478
479 if (r < pos + 1)
480 {
481 fprintf(stderr, _("%s: streaming header too small: %d\n"),
482 progname, r);
483 goto error;
484 }
485 replyRequested = copybuf[pos];
486
487 if (endpos != InvalidXLogRecPtr && walEnd >= endpos)
488 {
489 /*
490 * If there's nothing to read on the socket until a keepalive
491 * we know that the server has nothing to send us; and if
492 * walEnd has passed endpos, we know nothing else can have
493 * committed before endpos. So we can bail out now.
494 */
495 endposReached = true;
496 }
497
498 /* Send a reply, if necessary */
499 if (replyRequested || endposReached)
500 {
501 if (!flushAndSendFeedback(conn, &now))
502 goto error;
503 last_status = now;
504 }
505
506 if (endposReached)
507 {
508 prepareToTerminate(conn, endpos, true, InvalidXLogRecPtr);
509 time_to_abort = true;
510 break;
511 }
512
513 continue;
514 }
515 else if (copybuf[0] != 'w')
516 {
517 fprintf(stderr, _("%s: unrecognized streaming header: \"%c\"\n"),
518 progname, copybuf[0]);
519 goto error;
520 }
521
522 /*
523 * Read the header of the XLogData message, enclosed in the CopyData
524 * message. We only need the WAL location field (dataStart), the rest
525 * of the header is ignored.
526 */
527 hdr_len = 1; /* msgtype 'w' */
528 hdr_len += 8; /* dataStart */
529 hdr_len += 8; /* walEnd */
530 hdr_len += 8; /* sendTime */
531 if (r < hdr_len + 1)
532 {
533 fprintf(stderr, _("%s: streaming header too small: %d\n"),
534 progname, r);
535 goto error;
536 }
537
538 /* Extract WAL location for this block */
539 cur_record_lsn = fe_recvint64(©buf[1]);
540
541 if (endpos != InvalidXLogRecPtr && cur_record_lsn > endpos)
542 {
543 /*
544 * We've read past our endpoint, so prepare to go away being
545 * cautious about what happens to our output data.
546 */
547 if (!flushAndSendFeedback(conn, &now))
548 goto error;
549 prepareToTerminate(conn, endpos, false, cur_record_lsn);
550 time_to_abort = true;
551 break;
552 }
553
554 output_written_lsn = Max(cur_record_lsn, output_written_lsn);
555
556 bytes_left = r - hdr_len;
557 bytes_written = 0;
558
559 /* signal that a fsync is needed */
560 output_needs_fsync = true;
561
562 while (bytes_left)
563 {
564 int ret;
565
566 ret = write(outfd,
567 copybuf + hdr_len + bytes_written,
568 bytes_left);
569
570 if (ret < 0)
571 {
572 fprintf(stderr,
573 _("%s: could not write %u bytes to log file \"%s\": %s\n"),
574 progname, bytes_left, outfile,
575 strerror(errno));
576 goto error;
577 }
578
579 /* Write was successful, advance our position */
580 bytes_written += ret;
581 bytes_left -= ret;
582 }
583
584 if (write(outfd, "\n", 1) != 1)
585 {
586 fprintf(stderr,
587 _("%s: could not write %u bytes to log file \"%s\": %s\n"),
588 progname, 1, outfile,
589 strerror(errno));
590 goto error;
591 }
592
593 if (endpos != InvalidXLogRecPtr && cur_record_lsn == endpos)
594 {
595 /* endpos was exactly the record we just processed, we're done */
596 if (!flushAndSendFeedback(conn, &now))
597 goto error;
598 prepareToTerminate(conn, endpos, false, cur_record_lsn);
599 time_to_abort = true;
600 break;
601 }
602 }
603
604 res = PQgetResult(conn);
605 if (PQresultStatus(res) == PGRES_COPY_OUT)
606 {
607 PQclear(res);
608
609 /*
610 * We're doing a client-initiated clean exit and have sent CopyDone to
611 * the server. Drain any messages, so we don't miss a last-minute
612 * ErrorResponse. The walsender stops generating XLogData records once
613 * it sees CopyDone, so expect this to finish quickly. After CopyDone,
614 * it's too late for sendFeedback(), even if this were to take a long
615 * time. Hence, use synchronous-mode PQgetCopyData().
616 */
617 while (1)
618 {
619 int r;
620
621 if (copybuf != NULL)
622 {
623 PQfreemem(copybuf);
624 copybuf = NULL;
625 }
626 r = PQgetCopyData(conn, ©buf, 0);
627 if (r == -1)
628 break;
629 if (r == -2)
630 {
631 fprintf(stderr, _("%s: could not read COPY data: %s"),
632 progname, PQerrorMessage(conn));
633 time_to_abort = false; /* unclean exit */
634 goto error;
635 }
636 }
637
638 res = PQgetResult(conn);
639 }
640 if (PQresultStatus(res) != PGRES_COMMAND_OK)
641 {
642 fprintf(stderr,
643 _("%s: unexpected termination of replication stream: %s"),
644 progname, PQresultErrorMessage(res));
645 goto error;
646 }
647 PQclear(res);
648
649 if (outfd != -1 && strcmp(outfile, "-") != 0)
650 {
651 TimestampTz t = feGetCurrentTimestamp();
652
653 /* no need to jump to error on failure here, we're finishing anyway */
654 OutputFsync(t);
655
656 if (close(outfd) != 0)
657 fprintf(stderr, _("%s: could not close file \"%s\": %s\n"),
658 progname, outfile, strerror(errno));
659 }
660 outfd = -1;
661 error:
662 if (copybuf != NULL)
663 {
664 PQfreemem(copybuf);
665 copybuf = NULL;
666 }
667 destroyPQExpBuffer(query);
668 PQfinish(conn);
669 conn = NULL;
670 }
671
672 /*
673 * Unfortunately we can't do sensible signal handling on windows...
674 */
675 #ifndef WIN32
676
677 /*
678 * When sigint is called, just tell the system to exit at the next possible
679 * moment.
680 */
681 static void
sigint_handler(int signum)682 sigint_handler(int signum)
683 {
684 time_to_abort = true;
685 }
686
687 /*
688 * Trigger the output file to be reopened.
689 */
690 static void
sighup_handler(int signum)691 sighup_handler(int signum)
692 {
693 output_reopen = true;
694 }
695 #endif
696
697
698 int
main(int argc,char ** argv)699 main(int argc, char **argv)
700 {
701 static struct option long_options[] = {
702 /* general options */
703 {"file", required_argument, NULL, 'f'},
704 {"fsync-interval", required_argument, NULL, 'F'},
705 {"no-loop", no_argument, NULL, 'n'},
706 {"verbose", no_argument, NULL, 'v'},
707 {"version", no_argument, NULL, 'V'},
708 {"help", no_argument, NULL, '?'},
709 /* connection options */
710 {"dbname", required_argument, NULL, 'd'},
711 {"host", required_argument, NULL, 'h'},
712 {"port", required_argument, NULL, 'p'},
713 {"username", required_argument, NULL, 'U'},
714 {"no-password", no_argument, NULL, 'w'},
715 {"password", no_argument, NULL, 'W'},
716 /* replication options */
717 {"startpos", required_argument, NULL, 'I'},
718 {"endpos", required_argument, NULL, 'E'},
719 {"option", required_argument, NULL, 'o'},
720 {"plugin", required_argument, NULL, 'P'},
721 {"status-interval", required_argument, NULL, 's'},
722 {"slot", required_argument, NULL, 'S'},
723 /* action */
724 {"create-slot", no_argument, NULL, 1},
725 {"start", no_argument, NULL, 2},
726 {"drop-slot", no_argument, NULL, 3},
727 {"if-not-exists", no_argument, NULL, 4},
728 {NULL, 0, NULL, 0}
729 };
730 int c;
731 int option_index;
732 uint32 hi,
733 lo;
734 char *db_name;
735
736 progname = get_progname(argv[0]);
737 set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_basebackup"));
738
739 if (argc > 1)
740 {
741 if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
742 {
743 usage();
744 exit(0);
745 }
746 else if (strcmp(argv[1], "-V") == 0 ||
747 strcmp(argv[1], "--version") == 0)
748 {
749 puts("pg_recvlogical (PostgreSQL) " PG_VERSION);
750 exit(0);
751 }
752 }
753
754 while ((c = getopt_long(argc, argv, "E:f:F:nvd:h:p:U:wWI:o:P:s:S:",
755 long_options, &option_index)) != -1)
756 {
757 switch (c)
758 {
759 /* general options */
760 case 'f':
761 outfile = pg_strdup(optarg);
762 break;
763 case 'F':
764 fsync_interval = atoi(optarg) * 1000;
765 if (fsync_interval < 0)
766 {
767 fprintf(stderr, _("%s: invalid fsync interval \"%s\"\n"),
768 progname, optarg);
769 exit(1);
770 }
771 break;
772 case 'n':
773 noloop = 1;
774 break;
775 case 'v':
776 verbose++;
777 break;
778 /* connection options */
779 case 'd':
780 dbname = pg_strdup(optarg);
781 break;
782 case 'h':
783 dbhost = pg_strdup(optarg);
784 break;
785 case 'p':
786 if (atoi(optarg) <= 0)
787 {
788 fprintf(stderr, _("%s: invalid port number \"%s\"\n"),
789 progname, optarg);
790 exit(1);
791 }
792 dbport = pg_strdup(optarg);
793 break;
794 case 'U':
795 dbuser = pg_strdup(optarg);
796 break;
797 case 'w':
798 dbgetpassword = -1;
799 break;
800 case 'W':
801 dbgetpassword = 1;
802 break;
803 /* replication options */
804 case 'I':
805 if (sscanf(optarg, "%X/%X", &hi, &lo) != 2)
806 {
807 fprintf(stderr,
808 _("%s: could not parse start position \"%s\"\n"),
809 progname, optarg);
810 exit(1);
811 }
812 startpos = ((uint64) hi) << 32 | lo;
813 break;
814 case 'E':
815 if (sscanf(optarg, "%X/%X", &hi, &lo) != 2)
816 {
817 fprintf(stderr,
818 _("%s: could not parse end position \"%s\"\n"),
819 progname, optarg);
820 exit(1);
821 }
822 endpos = ((uint64) hi) << 32 | lo;
823 break;
824 case 'o':
825 {
826 char *data = pg_strdup(optarg);
827 char *val = strchr(data, '=');
828
829 if (val != NULL)
830 {
831 /* remove =; separate data from val */
832 *val = '\0';
833 val++;
834 }
835
836 noptions += 1;
837 options = pg_realloc(options, sizeof(char *) * noptions * 2);
838
839 options[(noptions - 1) * 2] = data;
840 options[(noptions - 1) * 2 + 1] = val;
841 }
842
843 break;
844 case 'P':
845 plugin = pg_strdup(optarg);
846 break;
847 case 's':
848 standby_message_timeout = atoi(optarg) * 1000;
849 if (standby_message_timeout < 0)
850 {
851 fprintf(stderr, _("%s: invalid status interval \"%s\"\n"),
852 progname, optarg);
853 exit(1);
854 }
855 break;
856 case 'S':
857 replication_slot = pg_strdup(optarg);
858 break;
859 /* action */
860 case 1:
861 do_create_slot = true;
862 break;
863 case 2:
864 do_start_slot = true;
865 break;
866 case 3:
867 do_drop_slot = true;
868 break;
869 case 4:
870 slot_exists_ok = true;
871 break;
872
873 default:
874
875 /*
876 * getopt_long already emitted a complaint
877 */
878 fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
879 progname);
880 exit(1);
881 }
882 }
883
884 /*
885 * Any non-option arguments?
886 */
887 if (optind < argc)
888 {
889 fprintf(stderr,
890 _("%s: too many command-line arguments (first is \"%s\")\n"),
891 progname, argv[optind]);
892 fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
893 progname);
894 exit(1);
895 }
896
897 /*
898 * Required arguments
899 */
900 if (replication_slot == NULL)
901 {
902 fprintf(stderr, _("%s: no slot specified\n"), progname);
903 fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
904 progname);
905 exit(1);
906 }
907
908 if (do_start_slot && outfile == NULL)
909 {
910 fprintf(stderr, _("%s: no target file specified\n"), progname);
911 fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
912 progname);
913 exit(1);
914 }
915
916 if (!do_drop_slot && dbname == NULL)
917 {
918 fprintf(stderr, _("%s: no database specified\n"), progname);
919 fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
920 progname);
921 exit(1);
922 }
923
924 if (!do_drop_slot && !do_create_slot && !do_start_slot)
925 {
926 fprintf(stderr, _("%s: at least one action needs to be specified\n"), progname);
927 fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
928 progname);
929 exit(1);
930 }
931
932 if (do_drop_slot && (do_create_slot || do_start_slot))
933 {
934 fprintf(stderr, _("%s: cannot use --create-slot or --start together with --drop-slot\n"), progname);
935 fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
936 progname);
937 exit(1);
938 }
939
940 if (startpos != InvalidXLogRecPtr && (do_create_slot || do_drop_slot))
941 {
942 fprintf(stderr, _("%s: cannot use --create-slot or --drop-slot together with --startpos\n"), progname);
943 fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
944 progname);
945 exit(1);
946 }
947
948 if (endpos != InvalidXLogRecPtr && !do_start_slot)
949 {
950 fprintf(stderr,
951 _("%s: --endpos may only be specified with --start\n"),
952 progname);
953 fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
954 progname);
955 exit(1);
956 }
957
958 #ifndef WIN32
959 pqsignal(SIGINT, sigint_handler);
960 pqsignal(SIGHUP, sighup_handler);
961 #endif
962
963 /*
964 * Obtain a connection to server. This is not really necessary but it
965 * helps to get more precise error messages about authentication, required
966 * GUC parameters and such.
967 */
968 conn = GetConnection();
969 if (!conn)
970 /* Error message already written in GetConnection() */
971 exit(1);
972
973 /*
974 * Run IDENTIFY_SYSTEM to make sure we connected using a database specific
975 * replication connection.
976 */
977 if (!RunIdentifySystem(conn, NULL, NULL, NULL, &db_name))
978 disconnect_and_exit(1);
979
980 if (db_name == NULL)
981 {
982 fprintf(stderr,
983 _("%s: could not establish database-specific replication connection\n"),
984 progname);
985 disconnect_and_exit(1);
986 }
987
988 /* Drop a replication slot. */
989 if (do_drop_slot)
990 {
991 if (verbose)
992 fprintf(stderr,
993 _("%s: dropping replication slot \"%s\"\n"),
994 progname, replication_slot);
995
996 if (!DropReplicationSlot(conn, replication_slot))
997 disconnect_and_exit(1);
998 }
999
1000 /* Create a replication slot. */
1001 if (do_create_slot)
1002 {
1003 if (verbose)
1004 fprintf(stderr,
1005 _("%s: creating replication slot \"%s\"\n"),
1006 progname, replication_slot);
1007
1008 if (!CreateReplicationSlot(conn, replication_slot, plugin,
1009 false, slot_exists_ok))
1010 disconnect_and_exit(1);
1011 startpos = InvalidXLogRecPtr;
1012 }
1013
1014 if (!do_start_slot)
1015 disconnect_and_exit(0);
1016
1017 /* Stream loop */
1018 while (true)
1019 {
1020 StreamLogicalLog();
1021 if (time_to_abort)
1022 {
1023 /*
1024 * We've been Ctrl-C'ed or reached an exit limit condition. That's
1025 * not an error, so exit without an errorcode.
1026 */
1027 disconnect_and_exit(0);
1028 }
1029 else if (noloop)
1030 {
1031 fprintf(stderr, _("%s: disconnected\n"), progname);
1032 exit(1);
1033 }
1034 else
1035 {
1036 fprintf(stderr,
1037 /* translator: check source for value for %d */
1038 _("%s: disconnected; waiting %d seconds to try again\n"),
1039 progname, RECONNECT_SLEEP_TIME);
1040 pg_usleep(RECONNECT_SLEEP_TIME * 1000000);
1041 }
1042 }
1043 }
1044
1045 /*
1046 * Fsync our output data, and send a feedback message to the server. Returns
1047 * true if successful, false otherwise.
1048 *
1049 * If successful, *now is updated to the current timestamp just before sending
1050 * feedback.
1051 */
1052 static bool
flushAndSendFeedback(PGconn * conn,TimestampTz * now)1053 flushAndSendFeedback(PGconn *conn, TimestampTz *now)
1054 {
1055 /* flush data to disk, so that we send a recent flush pointer */
1056 if (!OutputFsync(*now))
1057 return false;
1058 *now = feGetCurrentTimestamp();
1059 if (!sendFeedback(conn, *now, true, false))
1060 return false;
1061
1062 return true;
1063 }
1064
1065 /*
1066 * Try to inform the server about of upcoming demise, but don't wait around or
1067 * retry on failure.
1068 */
1069 static void
prepareToTerminate(PGconn * conn,XLogRecPtr endpos,bool keepalive,XLogRecPtr lsn)1070 prepareToTerminate(PGconn *conn, XLogRecPtr endpos, bool keepalive, XLogRecPtr lsn)
1071 {
1072 (void) PQputCopyEnd(conn, NULL);
1073 (void) PQflush(conn);
1074
1075 if (verbose)
1076 {
1077 if (keepalive)
1078 fprintf(stderr, "%s: endpos %X/%X reached by keepalive\n",
1079 progname,
1080 (uint32) (endpos >> 32), (uint32) endpos);
1081 else
1082 fprintf(stderr, "%s: endpos %X/%X reached by record at %X/%X\n",
1083 progname, (uint32) (endpos >> 32), (uint32) (endpos),
1084 (uint32) (lsn >> 32), (uint32) lsn);
1085
1086 }
1087 }
1088