1 /*-------------------------------------------------------------------------
2  *
3  * pg_recvlogical.c - receive data from a logical decoding slot in a streaming
4  *					  fashion and write it to a local file.
5  *
6  * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
7  *
8  * IDENTIFICATION
9  *		  src/bin/pg_basebackup/pg_recvlogical.c
10  *-------------------------------------------------------------------------
11  */
12 
13 #include "postgres_fe.h"
14 
15 #include <dirent.h>
16 #include <sys/stat.h>
17 #include <unistd.h>
18 #ifdef HAVE_SYS_SELECT_H
19 #include <sys/select.h>
20 #endif
21 
22 /* local includes */
23 #include "streamutil.h"
24 
25 #include "access/xlog_internal.h"
26 #include "common/fe_memutils.h"
27 #include "getopt_long.h"
28 #include "libpq-fe.h"
29 #include "libpq/pqsignal.h"
30 #include "pqexpbuffer.h"
31 
32 
33 /* Time to sleep between reconnection attempts */
34 #define RECONNECT_SLEEP_TIME 5
35 
36 /* Global Options */
37 static char *outfile = NULL;
38 static int	verbose = 0;
39 static int	noloop = 0;
40 static int	standby_message_timeout = 10 * 1000;	/* 10 sec = default */
41 static int	fsync_interval = 10 * 1000; /* 10 sec = default */
42 static XLogRecPtr startpos = InvalidXLogRecPtr;
43 static XLogRecPtr endpos = InvalidXLogRecPtr;
44 static bool do_create_slot = false;
45 static bool slot_exists_ok = false;
46 static bool do_start_slot = false;
47 static bool do_drop_slot = false;
48 static char *replication_slot = NULL;
49 
50 /* filled pairwise with option, value. value may be NULL */
51 static char **options;
52 static size_t noptions = 0;
53 static const char *plugin = "test_decoding";
54 
55 /* Global State */
56 static int	outfd = -1;
57 static volatile sig_atomic_t time_to_abort = false;
58 static volatile sig_atomic_t output_reopen = false;
59 static bool output_isfile;
60 static TimestampTz output_last_fsync = -1;
61 static bool output_needs_fsync = false;
62 static XLogRecPtr output_written_lsn = InvalidXLogRecPtr;
63 static XLogRecPtr output_fsync_lsn = InvalidXLogRecPtr;
64 
65 static void usage(void);
66 static void StreamLogicalLog(void);
67 static void disconnect_and_exit(int code);
68 static bool flushAndSendFeedback(PGconn *conn, TimestampTz *now);
69 static void prepareToTerminate(PGconn *conn, XLogRecPtr endpos,
70 				   bool keepalive, XLogRecPtr lsn);
71 
72 static void
usage(void)73 usage(void)
74 {
75 	printf(_("%s controls PostgreSQL logical decoding streams.\n\n"),
76 		   progname);
77 	printf(_("Usage:\n"));
78 	printf(_("  %s [OPTION]...\n"), progname);
79 	printf(_("\nAction to be performed:\n"));
80 	printf(_("      --create-slot      create a new replication slot (for the slot's name see --slot)\n"));
81 	printf(_("      --drop-slot        drop the replication slot (for the slot's name see --slot)\n"));
82 	printf(_("      --start            start streaming in a replication slot (for the slot's name see --slot)\n"));
83 	printf(_("\nOptions:\n"));
84 	printf(_("  -E, --endpos=LSN       exit after receiving the specified LSN\n"));
85 	printf(_("  -f, --file=FILE        receive log into this file, - for stdout\n"));
86 	printf(_("  -F  --fsync-interval=SECS\n"
87 			 "                         time between fsyncs to the output file (default: %d)\n"), (fsync_interval / 1000));
88 	printf(_("      --if-not-exists    do not error if slot already exists when creating a slot\n"));
89 	printf(_("  -I, --startpos=LSN     where in an existing slot should the streaming start\n"));
90 	printf(_("  -n, --no-loop          do not loop on connection lost\n"));
91 	printf(_("  -o, --option=NAME[=VALUE]\n"
92 			 "                         pass option NAME with optional value VALUE to the\n"
93 			 "                         output plugin\n"));
94 	printf(_("  -P, --plugin=PLUGIN    use output plugin PLUGIN (default: %s)\n"), plugin);
95 	printf(_("  -s, --status-interval=SECS\n"
96 			 "                         time between status packets sent to server (default: %d)\n"), (standby_message_timeout / 1000));
97 	printf(_("  -S, --slot=SLOTNAME    name of the logical replication slot\n"));
98 	printf(_("  -v, --verbose          output verbose messages\n"));
99 	printf(_("  -V, --version          output version information, then exit\n"));
100 	printf(_("  -?, --help             show this help, then exit\n"));
101 	printf(_("\nConnection options:\n"));
102 	printf(_("  -d, --dbname=DBNAME    database to connect to\n"));
103 	printf(_("  -h, --host=HOSTNAME    database server host or socket directory\n"));
104 	printf(_("  -p, --port=PORT        database server port number\n"));
105 	printf(_("  -U, --username=NAME    connect as specified database user\n"));
106 	printf(_("  -w, --no-password      never prompt for password\n"));
107 	printf(_("  -W, --password         force password prompt (should happen automatically)\n"));
108 	printf(_("\nReport bugs to <pgsql-bugs@postgresql.org>.\n"));
109 }
110 
111 /*
112  * Send a Standby Status Update message to server.
113  */
114 static bool
sendFeedback(PGconn * conn,TimestampTz now,bool force,bool replyRequested)115 sendFeedback(PGconn *conn, TimestampTz now, bool force, bool replyRequested)
116 {
117 	static XLogRecPtr last_written_lsn = InvalidXLogRecPtr;
118 	static XLogRecPtr last_fsync_lsn = InvalidXLogRecPtr;
119 
120 	char		replybuf[1 + 8 + 8 + 8 + 8 + 1];
121 	int			len = 0;
122 
123 	/*
124 	 * we normally don't want to send superfluous feedbacks, but if it's
125 	 * because of a timeout we need to, otherwise wal_sender_timeout will kill
126 	 * us.
127 	 */
128 	if (!force &&
129 		last_written_lsn == output_written_lsn &&
130 		last_fsync_lsn == output_fsync_lsn)
131 		return true;
132 
133 	if (verbose)
134 		fprintf(stderr,
135 				_("%s: confirming write up to %X/%X, flush to %X/%X (slot %s)\n"),
136 				progname,
137 				(uint32) (output_written_lsn >> 32), (uint32) output_written_lsn,
138 				(uint32) (output_fsync_lsn >> 32), (uint32) output_fsync_lsn,
139 				replication_slot);
140 
141 	replybuf[len] = 'r';
142 	len += 1;
143 	fe_sendint64(output_written_lsn, &replybuf[len]);	/* write */
144 	len += 8;
145 	fe_sendint64(output_fsync_lsn, &replybuf[len]); /* flush */
146 	len += 8;
147 	fe_sendint64(InvalidXLogRecPtr, &replybuf[len]);	/* apply */
148 	len += 8;
149 	fe_sendint64(now, &replybuf[len]);	/* sendTime */
150 	len += 8;
151 	replybuf[len] = replyRequested ? 1 : 0; /* replyRequested */
152 	len += 1;
153 
154 	startpos = output_written_lsn;
155 	last_written_lsn = output_written_lsn;
156 	last_fsync_lsn = output_fsync_lsn;
157 
158 	if (PQputCopyData(conn, replybuf, len) <= 0 || PQflush(conn))
159 	{
160 		fprintf(stderr, _("%s: could not send feedback packet: %s"),
161 				progname, PQerrorMessage(conn));
162 		return false;
163 	}
164 
165 	return true;
166 }
167 
168 static void
disconnect_and_exit(int code)169 disconnect_and_exit(int code)
170 {
171 	if (conn != NULL)
172 		PQfinish(conn);
173 
174 	exit(code);
175 }
176 
177 static bool
OutputFsync(TimestampTz now)178 OutputFsync(TimestampTz now)
179 {
180 	output_last_fsync = now;
181 
182 	output_fsync_lsn = output_written_lsn;
183 
184 	if (fsync_interval <= 0)
185 		return true;
186 
187 	if (!output_needs_fsync)
188 		return true;
189 
190 	output_needs_fsync = false;
191 
192 	/* can only fsync if it's a regular file */
193 	if (!output_isfile)
194 		return true;
195 
196 	if (fsync(outfd) != 0)
197 	{
198 		fprintf(stderr,
199 				_("%s: could not fsync log file \"%s\": %s\n"),
200 				progname, outfile, strerror(errno));
201 		return false;
202 	}
203 
204 	return true;
205 }
206 
207 /*
208  * Start the log streaming
209  */
210 static void
StreamLogicalLog(void)211 StreamLogicalLog(void)
212 {
213 	PGresult   *res;
214 	char	   *copybuf = NULL;
215 	TimestampTz last_status = -1;
216 	int			i;
217 	PQExpBuffer query;
218 
219 	output_written_lsn = InvalidXLogRecPtr;
220 	output_fsync_lsn = InvalidXLogRecPtr;
221 
222 	query = createPQExpBuffer();
223 
224 	/*
225 	 * Connect in replication mode to the server
226 	 */
227 	if (!conn)
228 		conn = GetConnection();
229 	if (!conn)
230 		/* Error message already written in GetConnection() */
231 		return;
232 
233 	/*
234 	 * Start the replication
235 	 */
236 	if (verbose)
237 		fprintf(stderr,
238 				_("%s: starting log streaming at %X/%X (slot %s)\n"),
239 				progname, (uint32) (startpos >> 32), (uint32) startpos,
240 				replication_slot);
241 
242 	/* Initiate the replication stream at specified location */
243 	appendPQExpBuffer(query, "START_REPLICATION SLOT \"%s\" LOGICAL %X/%X",
244 					  replication_slot, (uint32) (startpos >> 32), (uint32) startpos);
245 
246 	/* print options if there are any */
247 	if (noptions)
248 		appendPQExpBufferStr(query, " (");
249 
250 	for (i = 0; i < noptions; i++)
251 	{
252 		/* separator */
253 		if (i > 0)
254 			appendPQExpBufferStr(query, ", ");
255 
256 		/* write option name */
257 		appendPQExpBuffer(query, "\"%s\"", options[(i * 2)]);
258 
259 		/* write option value if specified */
260 		if (options[(i * 2) + 1] != NULL)
261 			appendPQExpBuffer(query, " '%s'", options[(i * 2) + 1]);
262 	}
263 
264 	if (noptions)
265 		appendPQExpBufferChar(query, ')');
266 
267 	res = PQexec(conn, query->data);
268 	if (PQresultStatus(res) != PGRES_COPY_BOTH)
269 	{
270 		fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
271 				progname, query->data, PQresultErrorMessage(res));
272 		PQclear(res);
273 		goto error;
274 	}
275 	PQclear(res);
276 	resetPQExpBuffer(query);
277 
278 	if (verbose)
279 		fprintf(stderr,
280 				_("%s: streaming initiated\n"),
281 				progname);
282 
283 	while (!time_to_abort)
284 	{
285 		int			r;
286 		int			bytes_left;
287 		int			bytes_written;
288 		TimestampTz now;
289 		int			hdr_len;
290 		XLogRecPtr	cur_record_lsn = InvalidXLogRecPtr;
291 
292 		if (copybuf != NULL)
293 		{
294 			PQfreemem(copybuf);
295 			copybuf = NULL;
296 		}
297 
298 		/*
299 		 * Potentially send a status message to the master
300 		 */
301 		now = feGetCurrentTimestamp();
302 
303 		if (outfd != -1 &&
304 			feTimestampDifferenceExceeds(output_last_fsync, now,
305 										 fsync_interval))
306 		{
307 			if (!OutputFsync(now))
308 				goto error;
309 		}
310 
311 		if (standby_message_timeout > 0 &&
312 			feTimestampDifferenceExceeds(last_status, now,
313 										 standby_message_timeout))
314 		{
315 			/* Time to send feedback! */
316 			if (!sendFeedback(conn, now, true, false))
317 				goto error;
318 
319 			last_status = now;
320 		}
321 
322 		/* got SIGHUP, close output file */
323 		if (outfd != -1 && output_reopen && strcmp(outfile, "-") != 0)
324 		{
325 			now = feGetCurrentTimestamp();
326 			if (!OutputFsync(now))
327 				goto error;
328 			close(outfd);
329 			outfd = -1;
330 		}
331 		output_reopen = false;
332 
333 		/* open the output file, if not open yet */
334 		if (outfd == -1)
335 		{
336 			struct stat statbuf;
337 
338 			if (strcmp(outfile, "-") == 0)
339 				outfd = fileno(stdout);
340 			else
341 				outfd = open(outfile, O_CREAT | O_APPEND | O_WRONLY | PG_BINARY,
342 							 S_IRUSR | S_IWUSR);
343 			if (outfd == -1)
344 			{
345 				fprintf(stderr,
346 						_("%s: could not open log file \"%s\": %s\n"),
347 						progname, outfile, strerror(errno));
348 				goto error;
349 			}
350 
351 			if (fstat(outfd, &statbuf) != 0)
352 				fprintf(stderr,
353 						_("%s: could not stat file \"%s\": %s\n"),
354 						progname, outfile, strerror(errno));
355 
356 			output_isfile = S_ISREG(statbuf.st_mode) && !isatty(outfd);
357 		}
358 
359 		r = PQgetCopyData(conn, &copybuf, 1);
360 		if (r == 0)
361 		{
362 			/*
363 			 * In async mode, and no data available. We block on reading but
364 			 * not more than the specified timeout, so that we can send a
365 			 * response back to the client.
366 			 */
367 			fd_set		input_mask;
368 			TimestampTz message_target = 0;
369 			TimestampTz fsync_target = 0;
370 			struct timeval timeout;
371 			struct timeval *timeoutptr = NULL;
372 
373 			if (PQsocket(conn) < 0)
374 			{
375 				fprintf(stderr,
376 						_("%s: invalid socket: %s"),
377 						progname, PQerrorMessage(conn));
378 				goto error;
379 			}
380 
381 			FD_ZERO(&input_mask);
382 			FD_SET(PQsocket(conn), &input_mask);
383 
384 			/* Compute when we need to wakeup to send a keepalive message. */
385 			if (standby_message_timeout)
386 				message_target = last_status + (standby_message_timeout - 1) *
387 					((int64) 1000);
388 
389 			/* Compute when we need to wakeup to fsync the output file. */
390 			if (fsync_interval > 0 && output_needs_fsync)
391 				fsync_target = output_last_fsync + (fsync_interval - 1) *
392 					((int64) 1000);
393 
394 			/* Now compute when to wakeup. */
395 			if (message_target > 0 || fsync_target > 0)
396 			{
397 				TimestampTz targettime;
398 				long		secs;
399 				int			usecs;
400 
401 				targettime = message_target;
402 
403 				if (fsync_target > 0 && fsync_target < targettime)
404 					targettime = fsync_target;
405 
406 				feTimestampDifference(now,
407 									  targettime,
408 									  &secs,
409 									  &usecs);
410 				if (secs <= 0)
411 					timeout.tv_sec = 1; /* Always sleep at least 1 sec */
412 				else
413 					timeout.tv_sec = secs;
414 				timeout.tv_usec = usecs;
415 				timeoutptr = &timeout;
416 			}
417 
418 			r = select(PQsocket(conn) + 1, &input_mask, NULL, NULL, timeoutptr);
419 			if (r == 0 || (r < 0 && errno == EINTR))
420 			{
421 				/*
422 				 * Got a timeout or signal. Continue the loop and either
423 				 * deliver a status packet to the server or just go back into
424 				 * blocking.
425 				 */
426 				continue;
427 			}
428 			else if (r < 0)
429 			{
430 				fprintf(stderr, _("%s: select() failed: %s\n"),
431 						progname, strerror(errno));
432 				goto error;
433 			}
434 
435 			/* Else there is actually data on the socket */
436 			if (PQconsumeInput(conn) == 0)
437 			{
438 				fprintf(stderr,
439 						_("%s: could not receive data from WAL stream: %s"),
440 						progname, PQerrorMessage(conn));
441 				goto error;
442 			}
443 			continue;
444 		}
445 
446 		/* End of copy stream */
447 		if (r == -1)
448 			break;
449 
450 		/* Failure while reading the copy stream */
451 		if (r == -2)
452 		{
453 			fprintf(stderr, _("%s: could not read COPY data: %s"),
454 					progname, PQerrorMessage(conn));
455 			goto error;
456 		}
457 
458 		/* Check the message type. */
459 		if (copybuf[0] == 'k')
460 		{
461 			int			pos;
462 			bool		replyRequested;
463 			XLogRecPtr	walEnd;
464 			bool		endposReached = false;
465 
466 			/*
467 			 * Parse the keepalive message, enclosed in the CopyData message.
468 			 * We just check if the server requested a reply, and ignore the
469 			 * rest.
470 			 */
471 			pos = 1;			/* skip msgtype 'k' */
472 			walEnd = fe_recvint64(&copybuf[pos]);
473 			output_written_lsn = Max(walEnd, output_written_lsn);
474 
475 			pos += 8;			/* read walEnd */
476 
477 			pos += 8;			/* skip sendTime */
478 
479 			if (r < pos + 1)
480 			{
481 				fprintf(stderr, _("%s: streaming header too small: %d\n"),
482 						progname, r);
483 				goto error;
484 			}
485 			replyRequested = copybuf[pos];
486 
487 			if (endpos != InvalidXLogRecPtr && walEnd >= endpos)
488 			{
489 				/*
490 				 * If there's nothing to read on the socket until a keepalive
491 				 * we know that the server has nothing to send us; and if
492 				 * walEnd has passed endpos, we know nothing else can have
493 				 * committed before endpos.  So we can bail out now.
494 				 */
495 				endposReached = true;
496 			}
497 
498 			/* Send a reply, if necessary */
499 			if (replyRequested || endposReached)
500 			{
501 				if (!flushAndSendFeedback(conn, &now))
502 					goto error;
503 				last_status = now;
504 			}
505 
506 			if (endposReached)
507 			{
508 				prepareToTerminate(conn, endpos, true, InvalidXLogRecPtr);
509 				time_to_abort = true;
510 				break;
511 			}
512 
513 			continue;
514 		}
515 		else if (copybuf[0] != 'w')
516 		{
517 			fprintf(stderr, _("%s: unrecognized streaming header: \"%c\"\n"),
518 					progname, copybuf[0]);
519 			goto error;
520 		}
521 
522 		/*
523 		 * Read the header of the XLogData message, enclosed in the CopyData
524 		 * message. We only need the WAL location field (dataStart), the rest
525 		 * of the header is ignored.
526 		 */
527 		hdr_len = 1;			/* msgtype 'w' */
528 		hdr_len += 8;			/* dataStart */
529 		hdr_len += 8;			/* walEnd */
530 		hdr_len += 8;			/* sendTime */
531 		if (r < hdr_len + 1)
532 		{
533 			fprintf(stderr, _("%s: streaming header too small: %d\n"),
534 					progname, r);
535 			goto error;
536 		}
537 
538 		/* Extract WAL location for this block */
539 		cur_record_lsn = fe_recvint64(&copybuf[1]);
540 
541 		if (endpos != InvalidXLogRecPtr && cur_record_lsn > endpos)
542 		{
543 			/*
544 			 * We've read past our endpoint, so prepare to go away being
545 			 * cautious about what happens to our output data.
546 			 */
547 			if (!flushAndSendFeedback(conn, &now))
548 				goto error;
549 			prepareToTerminate(conn, endpos, false, cur_record_lsn);
550 			time_to_abort = true;
551 			break;
552 		}
553 
554 		output_written_lsn = Max(cur_record_lsn, output_written_lsn);
555 
556 		bytes_left = r - hdr_len;
557 		bytes_written = 0;
558 
559 		/* signal that a fsync is needed */
560 		output_needs_fsync = true;
561 
562 		while (bytes_left)
563 		{
564 			int			ret;
565 
566 			ret = write(outfd,
567 						copybuf + hdr_len + bytes_written,
568 						bytes_left);
569 
570 			if (ret < 0)
571 			{
572 				fprintf(stderr,
573 						_("%s: could not write %u bytes to log file \"%s\": %s\n"),
574 						progname, bytes_left, outfile,
575 						strerror(errno));
576 				goto error;
577 			}
578 
579 			/* Write was successful, advance our position */
580 			bytes_written += ret;
581 			bytes_left -= ret;
582 		}
583 
584 		if (write(outfd, "\n", 1) != 1)
585 		{
586 			fprintf(stderr,
587 					_("%s: could not write %u bytes to log file \"%s\": %s\n"),
588 					progname, 1, outfile,
589 					strerror(errno));
590 			goto error;
591 		}
592 
593 		if (endpos != InvalidXLogRecPtr && cur_record_lsn == endpos)
594 		{
595 			/* endpos was exactly the record we just processed, we're done */
596 			if (!flushAndSendFeedback(conn, &now))
597 				goto error;
598 			prepareToTerminate(conn, endpos, false, cur_record_lsn);
599 			time_to_abort = true;
600 			break;
601 		}
602 	}
603 
604 	res = PQgetResult(conn);
605 	if (PQresultStatus(res) == PGRES_COPY_OUT)
606 	{
607 		PQclear(res);
608 
609 		/*
610 		 * We're doing a client-initiated clean exit and have sent CopyDone to
611 		 * the server. Drain any messages, so we don't miss a last-minute
612 		 * ErrorResponse. The walsender stops generating XLogData records once
613 		 * it sees CopyDone, so expect this to finish quickly. After CopyDone,
614 		 * it's too late for sendFeedback(), even if this were to take a long
615 		 * time. Hence, use synchronous-mode PQgetCopyData().
616 		 */
617 		while (1)
618 		{
619 			int			r;
620 
621 			if (copybuf != NULL)
622 			{
623 				PQfreemem(copybuf);
624 				copybuf = NULL;
625 			}
626 			r = PQgetCopyData(conn, &copybuf, 0);
627 			if (r == -1)
628 				break;
629 			if (r == -2)
630 			{
631 				fprintf(stderr, _("%s: could not read COPY data: %s"),
632 						progname, PQerrorMessage(conn));
633 				time_to_abort = false;	/* unclean exit */
634 				goto error;
635 			}
636 		}
637 
638 		res = PQgetResult(conn);
639 	}
640 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
641 	{
642 		fprintf(stderr,
643 				_("%s: unexpected termination of replication stream: %s"),
644 				progname, PQresultErrorMessage(res));
645 		goto error;
646 	}
647 	PQclear(res);
648 
649 	if (outfd != -1 && strcmp(outfile, "-") != 0)
650 	{
651 		TimestampTz t = feGetCurrentTimestamp();
652 
653 		/* no need to jump to error on failure here, we're finishing anyway */
654 		OutputFsync(t);
655 
656 		if (close(outfd) != 0)
657 			fprintf(stderr, _("%s: could not close file \"%s\": %s\n"),
658 					progname, outfile, strerror(errno));
659 	}
660 	outfd = -1;
661 error:
662 	if (copybuf != NULL)
663 	{
664 		PQfreemem(copybuf);
665 		copybuf = NULL;
666 	}
667 	destroyPQExpBuffer(query);
668 	PQfinish(conn);
669 	conn = NULL;
670 }
671 
672 /*
673  * Unfortunately we can't do sensible signal handling on windows...
674  */
675 #ifndef WIN32
676 
677 /*
678  * When sigint is called, just tell the system to exit at the next possible
679  * moment.
680  */
681 static void
sigint_handler(int signum)682 sigint_handler(int signum)
683 {
684 	time_to_abort = true;
685 }
686 
687 /*
688  * Trigger the output file to be reopened.
689  */
690 static void
sighup_handler(int signum)691 sighup_handler(int signum)
692 {
693 	output_reopen = true;
694 }
695 #endif
696 
697 
698 int
main(int argc,char ** argv)699 main(int argc, char **argv)
700 {
701 	static struct option long_options[] = {
702 /* general options */
703 		{"file", required_argument, NULL, 'f'},
704 		{"fsync-interval", required_argument, NULL, 'F'},
705 		{"no-loop", no_argument, NULL, 'n'},
706 		{"verbose", no_argument, NULL, 'v'},
707 		{"version", no_argument, NULL, 'V'},
708 		{"help", no_argument, NULL, '?'},
709 /* connection options */
710 		{"dbname", required_argument, NULL, 'd'},
711 		{"host", required_argument, NULL, 'h'},
712 		{"port", required_argument, NULL, 'p'},
713 		{"username", required_argument, NULL, 'U'},
714 		{"no-password", no_argument, NULL, 'w'},
715 		{"password", no_argument, NULL, 'W'},
716 /* replication options */
717 		{"startpos", required_argument, NULL, 'I'},
718 		{"endpos", required_argument, NULL, 'E'},
719 		{"option", required_argument, NULL, 'o'},
720 		{"plugin", required_argument, NULL, 'P'},
721 		{"status-interval", required_argument, NULL, 's'},
722 		{"slot", required_argument, NULL, 'S'},
723 /* action */
724 		{"create-slot", no_argument, NULL, 1},
725 		{"start", no_argument, NULL, 2},
726 		{"drop-slot", no_argument, NULL, 3},
727 		{"if-not-exists", no_argument, NULL, 4},
728 		{NULL, 0, NULL, 0}
729 	};
730 	int			c;
731 	int			option_index;
732 	uint32		hi,
733 				lo;
734 	char	   *db_name;
735 
736 	progname = get_progname(argv[0]);
737 	set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_basebackup"));
738 
739 	if (argc > 1)
740 	{
741 		if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
742 		{
743 			usage();
744 			exit(0);
745 		}
746 		else if (strcmp(argv[1], "-V") == 0 ||
747 				 strcmp(argv[1], "--version") == 0)
748 		{
749 			puts("pg_recvlogical (PostgreSQL) " PG_VERSION);
750 			exit(0);
751 		}
752 	}
753 
754 	while ((c = getopt_long(argc, argv, "E:f:F:nvd:h:p:U:wWI:o:P:s:S:",
755 							long_options, &option_index)) != -1)
756 	{
757 		switch (c)
758 		{
759 /* general options */
760 			case 'f':
761 				outfile = pg_strdup(optarg);
762 				break;
763 			case 'F':
764 				fsync_interval = atoi(optarg) * 1000;
765 				if (fsync_interval < 0)
766 				{
767 					fprintf(stderr, _("%s: invalid fsync interval \"%s\"\n"),
768 							progname, optarg);
769 					exit(1);
770 				}
771 				break;
772 			case 'n':
773 				noloop = 1;
774 				break;
775 			case 'v':
776 				verbose++;
777 				break;
778 /* connection options */
779 			case 'd':
780 				dbname = pg_strdup(optarg);
781 				break;
782 			case 'h':
783 				dbhost = pg_strdup(optarg);
784 				break;
785 			case 'p':
786 				if (atoi(optarg) <= 0)
787 				{
788 					fprintf(stderr, _("%s: invalid port number \"%s\"\n"),
789 							progname, optarg);
790 					exit(1);
791 				}
792 				dbport = pg_strdup(optarg);
793 				break;
794 			case 'U':
795 				dbuser = pg_strdup(optarg);
796 				break;
797 			case 'w':
798 				dbgetpassword = -1;
799 				break;
800 			case 'W':
801 				dbgetpassword = 1;
802 				break;
803 /* replication options */
804 			case 'I':
805 				if (sscanf(optarg, "%X/%X", &hi, &lo) != 2)
806 				{
807 					fprintf(stderr,
808 							_("%s: could not parse start position \"%s\"\n"),
809 							progname, optarg);
810 					exit(1);
811 				}
812 				startpos = ((uint64) hi) << 32 | lo;
813 				break;
814 			case 'E':
815 				if (sscanf(optarg, "%X/%X", &hi, &lo) != 2)
816 				{
817 					fprintf(stderr,
818 							_("%s: could not parse end position \"%s\"\n"),
819 							progname, optarg);
820 					exit(1);
821 				}
822 				endpos = ((uint64) hi) << 32 | lo;
823 				break;
824 			case 'o':
825 				{
826 					char	   *data = pg_strdup(optarg);
827 					char	   *val = strchr(data, '=');
828 
829 					if (val != NULL)
830 					{
831 						/* remove =; separate data from val */
832 						*val = '\0';
833 						val++;
834 					}
835 
836 					noptions += 1;
837 					options = pg_realloc(options, sizeof(char *) * noptions * 2);
838 
839 					options[(noptions - 1) * 2] = data;
840 					options[(noptions - 1) * 2 + 1] = val;
841 				}
842 
843 				break;
844 			case 'P':
845 				plugin = pg_strdup(optarg);
846 				break;
847 			case 's':
848 				standby_message_timeout = atoi(optarg) * 1000;
849 				if (standby_message_timeout < 0)
850 				{
851 					fprintf(stderr, _("%s: invalid status interval \"%s\"\n"),
852 							progname, optarg);
853 					exit(1);
854 				}
855 				break;
856 			case 'S':
857 				replication_slot = pg_strdup(optarg);
858 				break;
859 /* action */
860 			case 1:
861 				do_create_slot = true;
862 				break;
863 			case 2:
864 				do_start_slot = true;
865 				break;
866 			case 3:
867 				do_drop_slot = true;
868 				break;
869 			case 4:
870 				slot_exists_ok = true;
871 				break;
872 
873 			default:
874 
875 				/*
876 				 * getopt_long already emitted a complaint
877 				 */
878 				fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
879 						progname);
880 				exit(1);
881 		}
882 	}
883 
884 	/*
885 	 * Any non-option arguments?
886 	 */
887 	if (optind < argc)
888 	{
889 		fprintf(stderr,
890 				_("%s: too many command-line arguments (first is \"%s\")\n"),
891 				progname, argv[optind]);
892 		fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
893 				progname);
894 		exit(1);
895 	}
896 
897 	/*
898 	 * Required arguments
899 	 */
900 	if (replication_slot == NULL)
901 	{
902 		fprintf(stderr, _("%s: no slot specified\n"), progname);
903 		fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
904 				progname);
905 		exit(1);
906 	}
907 
908 	if (do_start_slot && outfile == NULL)
909 	{
910 		fprintf(stderr, _("%s: no target file specified\n"), progname);
911 		fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
912 				progname);
913 		exit(1);
914 	}
915 
916 	if (!do_drop_slot && dbname == NULL)
917 	{
918 		fprintf(stderr, _("%s: no database specified\n"), progname);
919 		fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
920 				progname);
921 		exit(1);
922 	}
923 
924 	if (!do_drop_slot && !do_create_slot && !do_start_slot)
925 	{
926 		fprintf(stderr, _("%s: at least one action needs to be specified\n"), progname);
927 		fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
928 				progname);
929 		exit(1);
930 	}
931 
932 	if (do_drop_slot && (do_create_slot || do_start_slot))
933 	{
934 		fprintf(stderr, _("%s: cannot use --create-slot or --start together with --drop-slot\n"), progname);
935 		fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
936 				progname);
937 		exit(1);
938 	}
939 
940 	if (startpos != InvalidXLogRecPtr && (do_create_slot || do_drop_slot))
941 	{
942 		fprintf(stderr, _("%s: cannot use --create-slot or --drop-slot together with --startpos\n"), progname);
943 		fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
944 				progname);
945 		exit(1);
946 	}
947 
948 	if (endpos != InvalidXLogRecPtr && !do_start_slot)
949 	{
950 		fprintf(stderr,
951 				_("%s: --endpos may only be specified with --start\n"),
952 				progname);
953 		fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
954 				progname);
955 		exit(1);
956 	}
957 
958 #ifndef WIN32
959 	pqsignal(SIGINT, sigint_handler);
960 	pqsignal(SIGHUP, sighup_handler);
961 #endif
962 
963 	/*
964 	 * Obtain a connection to server. This is not really necessary but it
965 	 * helps to get more precise error messages about authentication, required
966 	 * GUC parameters and such.
967 	 */
968 	conn = GetConnection();
969 	if (!conn)
970 		/* Error message already written in GetConnection() */
971 		exit(1);
972 
973 	/*
974 	 * Run IDENTIFY_SYSTEM to make sure we connected using a database specific
975 	 * replication connection.
976 	 */
977 	if (!RunIdentifySystem(conn, NULL, NULL, NULL, &db_name))
978 		disconnect_and_exit(1);
979 
980 	if (db_name == NULL)
981 	{
982 		fprintf(stderr,
983 				_("%s: could not establish database-specific replication connection\n"),
984 				progname);
985 		disconnect_and_exit(1);
986 	}
987 
988 	/* Drop a replication slot. */
989 	if (do_drop_slot)
990 	{
991 		if (verbose)
992 			fprintf(stderr,
993 					_("%s: dropping replication slot \"%s\"\n"),
994 					progname, replication_slot);
995 
996 		if (!DropReplicationSlot(conn, replication_slot))
997 			disconnect_and_exit(1);
998 	}
999 
1000 	/* Create a replication slot. */
1001 	if (do_create_slot)
1002 	{
1003 		if (verbose)
1004 			fprintf(stderr,
1005 					_("%s: creating replication slot \"%s\"\n"),
1006 					progname, replication_slot);
1007 
1008 		if (!CreateReplicationSlot(conn, replication_slot, plugin,
1009 								   false, slot_exists_ok))
1010 			disconnect_and_exit(1);
1011 		startpos = InvalidXLogRecPtr;
1012 	}
1013 
1014 	if (!do_start_slot)
1015 		disconnect_and_exit(0);
1016 
1017 	/* Stream loop */
1018 	while (true)
1019 	{
1020 		StreamLogicalLog();
1021 		if (time_to_abort)
1022 		{
1023 			/*
1024 			 * We've been Ctrl-C'ed or reached an exit limit condition. That's
1025 			 * not an error, so exit without an errorcode.
1026 			 */
1027 			disconnect_and_exit(0);
1028 		}
1029 		else if (noloop)
1030 		{
1031 			fprintf(stderr, _("%s: disconnected\n"), progname);
1032 			exit(1);
1033 		}
1034 		else
1035 		{
1036 			fprintf(stderr,
1037 			/* translator: check source for value for %d */
1038 					_("%s: disconnected; waiting %d seconds to try again\n"),
1039 					progname, RECONNECT_SLEEP_TIME);
1040 			pg_usleep(RECONNECT_SLEEP_TIME * 1000000);
1041 		}
1042 	}
1043 }
1044 
1045 /*
1046  * Fsync our output data, and send a feedback message to the server.  Returns
1047  * true if successful, false otherwise.
1048  *
1049  * If successful, *now is updated to the current timestamp just before sending
1050  * feedback.
1051  */
1052 static bool
flushAndSendFeedback(PGconn * conn,TimestampTz * now)1053 flushAndSendFeedback(PGconn *conn, TimestampTz *now)
1054 {
1055 	/* flush data to disk, so that we send a recent flush pointer */
1056 	if (!OutputFsync(*now))
1057 		return false;
1058 	*now = feGetCurrentTimestamp();
1059 	if (!sendFeedback(conn, *now, true, false))
1060 		return false;
1061 
1062 	return true;
1063 }
1064 
1065 /*
1066  * Try to inform the server about of upcoming demise, but don't wait around or
1067  * retry on failure.
1068  */
1069 static void
prepareToTerminate(PGconn * conn,XLogRecPtr endpos,bool keepalive,XLogRecPtr lsn)1070 prepareToTerminate(PGconn *conn, XLogRecPtr endpos, bool keepalive, XLogRecPtr lsn)
1071 {
1072 	(void) PQputCopyEnd(conn, NULL);
1073 	(void) PQflush(conn);
1074 
1075 	if (verbose)
1076 	{
1077 		if (keepalive)
1078 			fprintf(stderr, "%s: endpos %X/%X reached by keepalive\n",
1079 					progname,
1080 					(uint32) (endpos >> 32), (uint32) endpos);
1081 		else
1082 			fprintf(stderr, "%s: endpos %X/%X reached by record at %X/%X\n",
1083 					progname, (uint32) (endpos >> 32), (uint32) (endpos),
1084 					(uint32) (lsn >> 32), (uint32) lsn);
1085 
1086 	}
1087 }
1088