1 /*-------------------------------------------------------------------------
2  *
3  * receivelog.c - receive WAL files using the streaming
4  *				  replication protocol.
5  *
6  * Author: Magnus Hagander <magnus@hagander.net>
7  *
8  * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group
9  *
10  * IDENTIFICATION
11  *		  src/bin/pg_basebackup/receivelog.c
12  *-------------------------------------------------------------------------
13  */
14 
15 #include "postgres_fe.h"
16 
17 #include <sys/stat.h>
18 #include <unistd.h>
19 #ifdef HAVE_SYS_SELECT_H
20 #include <sys/select.h>
21 #endif
22 
23 #include "access/xlog_internal.h"
24 #include "common/file_utils.h"
25 #include "common/logging.h"
26 #include "libpq-fe.h"
27 #include "receivelog.h"
28 #include "streamutil.h"
29 
/*
 * Handle and base file name of the WAL file currently open for writing.
 * walfile is NULL while no file is open.
 */
static Walfile *walfile = NULL;
static char current_walfile_name[MAXPGPATH] = "";
/* Should status updates carry lastFlushPosition? Decided in ReceiveXlogStream(). */
static bool reportFlushPosition = false;
/*
 * WAL position considered flushed so far; advanced after a successful sync
 * and when a WAL file is closed.
 */
static XLogRecPtr lastFlushPosition = InvalidXLogRecPtr;

/*
 * Reset to true at the start of HandleCopyStream(); periodic feedback and
 * keepalive replies are only sent while it is true.
 */
static bool still_sending = true;	/* feedback still needs to be sent? */

/* Forward declarations of file-local helpers. */
static PGresult *HandleCopyStream(PGconn *conn, StreamCtl *stream,
								  XLogRecPtr *stoppos);
static int	CopyStreamPoll(PGconn *conn, long timeout_ms, pgsocket stop_socket);
static int	CopyStreamReceive(PGconn *conn, long timeout, pgsocket stop_socket,
							  char **buffer);
static bool ProcessKeepaliveMsg(PGconn *conn, StreamCtl *stream, char *copybuf,
								int len, XLogRecPtr blockpos, TimestampTz *last_status);
static bool ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
							   XLogRecPtr *blockpos);
static PGresult *HandleEndOfCopyStream(PGconn *conn, StreamCtl *stream, char *copybuf,
									   XLogRecPtr blockpos, XLogRecPtr *stoppos);
static bool CheckCopyStreamStop(PGconn *conn, StreamCtl *stream, XLogRecPtr blockpos,
								XLogRecPtr *stoppos);
static long CalculateCopyStreamSleeptime(TimestampTz now, int standby_message_timeout,
										 TimestampTz last_status);

static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos,
									 uint32 *timeline);
56 
57 static bool
mark_file_as_archived(StreamCtl * stream,const char * fname)58 mark_file_as_archived(StreamCtl *stream, const char *fname)
59 {
60 	Walfile    *f;
61 	static char tmppath[MAXPGPATH];
62 
63 	snprintf(tmppath, sizeof(tmppath), "archive_status/%s.done",
64 			 fname);
65 
66 	f = stream->walmethod->open_for_write(tmppath, NULL, 0);
67 	if (f == NULL)
68 	{
69 		pg_log_error("could not create archive status file \"%s\": %s",
70 					 tmppath, stream->walmethod->getlasterror());
71 		return false;
72 	}
73 
74 	stream->walmethod->close(f, CLOSE_NORMAL);
75 
76 	return true;
77 }
78 
79 /*
80  * Open a new WAL file in the specified directory.
81  *
82  * Returns true if OK; on failure, returns false after printing an error msg.
83  * On success, 'walfile' is set to the FD for the file, and the base filename
84  * (without partial_suffix) is stored in 'current_walfile_name'.
85  *
86  * The file will be padded to 16Mb with zeroes.
87  */
88 static bool
open_walfile(StreamCtl * stream,XLogRecPtr startpoint)89 open_walfile(StreamCtl *stream, XLogRecPtr startpoint)
90 {
91 	Walfile    *f;
92 	char	   *fn;
93 	ssize_t		size;
94 	XLogSegNo	segno;
95 
96 	XLByteToSeg(startpoint, segno, WalSegSz);
97 	XLogFileName(current_walfile_name, stream->timeline, segno, WalSegSz);
98 
99 	/* Note that this considers the compression used if necessary */
100 	fn = stream->walmethod->get_file_name(current_walfile_name,
101 										  stream->partial_suffix);
102 
103 	/*
104 	 * When streaming to files, if an existing file exists we verify that it's
105 	 * either empty (just created), or a complete WalSegSz segment (in which
106 	 * case it has been created and padded). Anything else indicates a corrupt
107 	 * file. Compressed files have no need for padding, so just ignore this
108 	 * case.
109 	 *
110 	 * When streaming to tar, no file with this name will exist before, so we
111 	 * never have to verify a size.
112 	 */
113 	if (stream->walmethod->compression() == 0 &&
114 		stream->walmethod->existsfile(fn))
115 	{
116 		size = stream->walmethod->get_file_size(fn);
117 		if (size < 0)
118 		{
119 			pg_log_error("could not get size of write-ahead log file \"%s\": %s",
120 						 fn, stream->walmethod->getlasterror());
121 			pg_free(fn);
122 			return false;
123 		}
124 		if (size == WalSegSz)
125 		{
126 			/* Already padded file. Open it for use */
127 			f = stream->walmethod->open_for_write(current_walfile_name, stream->partial_suffix, 0);
128 			if (f == NULL)
129 			{
130 				pg_log_error("could not open existing write-ahead log file \"%s\": %s",
131 							 fn, stream->walmethod->getlasterror());
132 				pg_free(fn);
133 				return false;
134 			}
135 
136 			/* fsync file in case of a previous crash */
137 			if (stream->walmethod->sync(f) != 0)
138 			{
139 				pg_log_fatal("could not fsync existing write-ahead log file \"%s\": %s",
140 							 fn, stream->walmethod->getlasterror());
141 				stream->walmethod->close(f, CLOSE_UNLINK);
142 				exit(1);
143 			}
144 
145 			walfile = f;
146 			pg_free(fn);
147 			return true;
148 		}
149 		if (size != 0)
150 		{
151 			/* if write didn't set errno, assume problem is no disk space */
152 			if (errno == 0)
153 				errno = ENOSPC;
154 			pg_log_error(ngettext("write-ahead log file \"%s\" has %d byte, should be 0 or %d",
155 								  "write-ahead log file \"%s\" has %d bytes, should be 0 or %d",
156 								  size),
157 						 fn, (int) size, WalSegSz);
158 			pg_free(fn);
159 			return false;
160 		}
161 		/* File existed and was empty, so fall through and open */
162 	}
163 
164 	/* No file existed, so create one */
165 
166 	f = stream->walmethod->open_for_write(current_walfile_name,
167 										  stream->partial_suffix, WalSegSz);
168 	if (f == NULL)
169 	{
170 		pg_log_error("could not open write-ahead log file \"%s\": %s",
171 					 fn, stream->walmethod->getlasterror());
172 		pg_free(fn);
173 		return false;
174 	}
175 
176 	pg_free(fn);
177 	walfile = f;
178 	return true;
179 }
180 
181 /*
182  * Close the current WAL file (if open), and rename it to the correct
183  * filename if it's complete. On failure, prints an error message to stderr
184  * and returns false, otherwise returns true.
185  */
186 static bool
close_walfile(StreamCtl * stream,XLogRecPtr pos)187 close_walfile(StreamCtl *stream, XLogRecPtr pos)
188 {
189 	off_t		currpos;
190 	int			r;
191 
192 	if (walfile == NULL)
193 		return true;
194 
195 	currpos = stream->walmethod->get_current_pos(walfile);
196 	if (currpos == -1)
197 	{
198 		pg_log_error("could not determine seek position in file \"%s\": %s",
199 					 current_walfile_name, stream->walmethod->getlasterror());
200 		stream->walmethod->close(walfile, CLOSE_UNLINK);
201 		walfile = NULL;
202 
203 		return false;
204 	}
205 
206 	if (stream->partial_suffix)
207 	{
208 		if (currpos == WalSegSz)
209 			r = stream->walmethod->close(walfile, CLOSE_NORMAL);
210 		else
211 		{
212 			pg_log_info("not renaming \"%s%s\", segment is not complete",
213 						current_walfile_name, stream->partial_suffix);
214 			r = stream->walmethod->close(walfile, CLOSE_NO_RENAME);
215 		}
216 	}
217 	else
218 		r = stream->walmethod->close(walfile, CLOSE_NORMAL);
219 
220 	walfile = NULL;
221 
222 	if (r != 0)
223 	{
224 		pg_log_error("could not close file \"%s\": %s",
225 					 current_walfile_name, stream->walmethod->getlasterror());
226 		return false;
227 	}
228 
229 	/*
230 	 * Mark file as archived if requested by the caller - pg_basebackup needs
231 	 * to do so as files can otherwise get archived again after promotion of a
232 	 * new node. This is in line with walreceiver.c always doing a
233 	 * XLogArchiveForceDone() after a complete segment.
234 	 */
235 	if (currpos == WalSegSz && stream->mark_done)
236 	{
237 		/* writes error message if failed */
238 		if (!mark_file_as_archived(stream, current_walfile_name))
239 			return false;
240 	}
241 
242 	lastFlushPosition = pos;
243 	return true;
244 }
245 
246 
247 /*
248  * Check if a timeline history file exists.
249  */
250 static bool
existsTimeLineHistoryFile(StreamCtl * stream)251 existsTimeLineHistoryFile(StreamCtl *stream)
252 {
253 	char		histfname[MAXFNAMELEN];
254 
255 	/*
256 	 * Timeline 1 never has a history file. We treat that as if it existed,
257 	 * since we never need to stream it.
258 	 */
259 	if (stream->timeline == 1)
260 		return true;
261 
262 	TLHistoryFileName(histfname, stream->timeline);
263 
264 	return stream->walmethod->existsfile(histfname);
265 }
266 
267 static bool
writeTimeLineHistoryFile(StreamCtl * stream,char * filename,char * content)268 writeTimeLineHistoryFile(StreamCtl *stream, char *filename, char *content)
269 {
270 	int			size = strlen(content);
271 	char		histfname[MAXFNAMELEN];
272 	Walfile    *f;
273 
274 	/*
275 	 * Check that the server's idea of how timeline history files should be
276 	 * named matches ours.
277 	 */
278 	TLHistoryFileName(histfname, stream->timeline);
279 	if (strcmp(histfname, filename) != 0)
280 	{
281 		pg_log_error("server reported unexpected history file name for timeline %u: %s",
282 					 stream->timeline, filename);
283 		return false;
284 	}
285 
286 	f = stream->walmethod->open_for_write(histfname, ".tmp", 0);
287 	if (f == NULL)
288 	{
289 		pg_log_error("could not create timeline history file \"%s\": %s",
290 					 histfname, stream->walmethod->getlasterror());
291 		return false;
292 	}
293 
294 	if ((int) stream->walmethod->write(f, content, size) != size)
295 	{
296 		pg_log_error("could not write timeline history file \"%s\": %s",
297 					 histfname, stream->walmethod->getlasterror());
298 
299 		/*
300 		 * If we fail to make the file, delete it to release disk space
301 		 */
302 		stream->walmethod->close(f, CLOSE_UNLINK);
303 
304 		return false;
305 	}
306 
307 	if (stream->walmethod->close(f, CLOSE_NORMAL) != 0)
308 	{
309 		pg_log_error("could not close file \"%s\": %s",
310 					 histfname, stream->walmethod->getlasterror());
311 		return false;
312 	}
313 
314 	/* Maintain archive_status, check close_walfile() for details. */
315 	if (stream->mark_done)
316 	{
317 		/* writes error message if failed */
318 		if (!mark_file_as_archived(stream, histfname))
319 			return false;
320 	}
321 
322 	return true;
323 }
324 
325 /*
326  * Send a Standby Status Update message to server.
327  */
328 static bool
sendFeedback(PGconn * conn,XLogRecPtr blockpos,TimestampTz now,bool replyRequested)329 sendFeedback(PGconn *conn, XLogRecPtr blockpos, TimestampTz now, bool replyRequested)
330 {
331 	char		replybuf[1 + 8 + 8 + 8 + 8 + 1];
332 	int			len = 0;
333 
334 	replybuf[len] = 'r';
335 	len += 1;
336 	fe_sendint64(blockpos, &replybuf[len]); /* write */
337 	len += 8;
338 	if (reportFlushPosition)
339 		fe_sendint64(lastFlushPosition, &replybuf[len]);	/* flush */
340 	else
341 		fe_sendint64(InvalidXLogRecPtr, &replybuf[len]);	/* flush */
342 	len += 8;
343 	fe_sendint64(InvalidXLogRecPtr, &replybuf[len]);	/* apply */
344 	len += 8;
345 	fe_sendint64(now, &replybuf[len]);	/* sendTime */
346 	len += 8;
347 	replybuf[len] = replyRequested ? 1 : 0; /* replyRequested */
348 	len += 1;
349 
350 	if (PQputCopyData(conn, replybuf, len) <= 0 || PQflush(conn))
351 	{
352 		pg_log_error("could not send feedback packet: %s",
353 					 PQerrorMessage(conn));
354 		return false;
355 	}
356 
357 	return true;
358 }
359 
360 /*
361  * Check that the server version we're connected to is supported by
362  * ReceiveXlogStream().
363  *
364  * If it's not, an error message is printed to stderr, and false is returned.
365  */
366 bool
CheckServerVersionForStreaming(PGconn * conn)367 CheckServerVersionForStreaming(PGconn *conn)
368 {
369 	int			minServerMajor,
370 				maxServerMajor;
371 	int			serverMajor;
372 
373 	/*
374 	 * The message format used in streaming replication changed in 9.3, so we
375 	 * cannot stream from older servers. And we don't support servers newer
376 	 * than the client; it might work, but we don't know, so err on the safe
377 	 * side.
378 	 */
379 	minServerMajor = 903;
380 	maxServerMajor = PG_VERSION_NUM / 100;
381 	serverMajor = PQserverVersion(conn) / 100;
382 	if (serverMajor < minServerMajor)
383 	{
384 		const char *serverver = PQparameterStatus(conn, "server_version");
385 
386 		pg_log_error("incompatible server version %s; client does not support streaming from server versions older than %s",
387 					 serverver ? serverver : "'unknown'",
388 					 "9.3");
389 		return false;
390 	}
391 	else if (serverMajor > maxServerMajor)
392 	{
393 		const char *serverver = PQparameterStatus(conn, "server_version");
394 
395 		pg_log_error("incompatible server version %s; client does not support streaming from server versions newer than %s",
396 					 serverver ? serverver : "'unknown'",
397 					 PG_VERSION);
398 		return false;
399 	}
400 	return true;
401 }
402 
403 /*
404  * Receive a log stream starting at the specified position.
405  *
406  * Individual parameters are passed through the StreamCtl structure.
407  *
408  * If sysidentifier is specified, validate that both the system
409  * identifier and the timeline matches the specified ones
410  * (by sending an extra IDENTIFY_SYSTEM command)
411  *
412  * All received segments will be written to the directory
413  * specified by basedir. This will also fetch any missing timeline history
414  * files.
415  *
416  * The stream_stop callback will be called every time data
417  * is received, and whenever a segment is completed. If it returns
418  * true, the streaming will stop and the function
419  * return. As long as it returns false, streaming will continue
420  * indefinitely.
421  *
422  * If stream_stop() checks for external input, stop_socket should be set to
423  * the FD it checks.  This will allow such input to be detected promptly
424  * rather than after standby_message_timeout (which might be indefinite).
425  * Note that signals will interrupt waits for input as well, but that is
426  * race-y since a signal received while busy won't interrupt the wait.
427  *
428  * standby_message_timeout controls how often we send a message
429  * back to the master letting it know our progress, in milliseconds.
430  * Zero means no messages are sent.
431  * This message will only contain the write location, and never
432  * flush or replay.
433  *
434  * If 'partial_suffix' is not NULL, files are initially created with the
435  * given suffix, and the suffix is removed once the file is finished. That
436  * allows you to tell the difference between partial and completed files,
437  * so that you can continue later where you left.
438  *
439  * If 'synchronous' is true, the received WAL is flushed as soon as written,
440  * otherwise only when the WAL file is closed.
441  *
442  * Note: The WAL location *must* be at a log segment start!
443  */
444 bool
ReceiveXlogStream(PGconn * conn,StreamCtl * stream)445 ReceiveXlogStream(PGconn *conn, StreamCtl *stream)
446 {
447 	char		query[128];
448 	char		slotcmd[128];
449 	PGresult   *res;
450 	XLogRecPtr	stoppos;
451 
452 	/*
453 	 * The caller should've checked the server version already, but doesn't do
454 	 * any harm to check it here too.
455 	 */
456 	if (!CheckServerVersionForStreaming(conn))
457 		return false;
458 
459 	/*
460 	 * Decide whether we want to report the flush position. If we report the
461 	 * flush position, the primary will know what WAL we'll possibly
462 	 * re-request, and it can then remove older WAL safely. We must always do
463 	 * that when we are using slots.
464 	 *
465 	 * Reporting the flush position makes one eligible as a synchronous
466 	 * replica. People shouldn't include generic names in
467 	 * synchronous_standby_names, but we've protected them against it so far,
468 	 * so let's continue to do so unless specifically requested.
469 	 */
470 	if (stream->replication_slot != NULL)
471 	{
472 		reportFlushPosition = true;
473 		sprintf(slotcmd, "SLOT \"%s\" ", stream->replication_slot);
474 	}
475 	else
476 	{
477 		if (stream->synchronous)
478 			reportFlushPosition = true;
479 		else
480 			reportFlushPosition = false;
481 		slotcmd[0] = 0;
482 	}
483 
484 	if (stream->sysidentifier != NULL)
485 	{
486 		/* Validate system identifier hasn't changed */
487 		res = PQexec(conn, "IDENTIFY_SYSTEM");
488 		if (PQresultStatus(res) != PGRES_TUPLES_OK)
489 		{
490 			pg_log_error("could not send replication command \"%s\": %s",
491 						 "IDENTIFY_SYSTEM", PQerrorMessage(conn));
492 			PQclear(res);
493 			return false;
494 		}
495 		if (PQntuples(res) != 1 || PQnfields(res) < 3)
496 		{
497 			pg_log_error("could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields",
498 						 PQntuples(res), PQnfields(res), 1, 3);
499 			PQclear(res);
500 			return false;
501 		}
502 		if (strcmp(stream->sysidentifier, PQgetvalue(res, 0, 0)) != 0)
503 		{
504 			pg_log_error("system identifier does not match between base backup and streaming connection");
505 			PQclear(res);
506 			return false;
507 		}
508 		if (stream->timeline > atoi(PQgetvalue(res, 0, 1)))
509 		{
510 			pg_log_error("starting timeline %u is not present in the server",
511 						 stream->timeline);
512 			PQclear(res);
513 			return false;
514 		}
515 		PQclear(res);
516 	}
517 
518 	/*
519 	 * initialize flush position to starting point, it's the caller's
520 	 * responsibility that that's sane.
521 	 */
522 	lastFlushPosition = stream->startpos;
523 
524 	while (1)
525 	{
526 		/*
527 		 * Fetch the timeline history file for this timeline, if we don't have
528 		 * it already. When streaming log to tar, this will always return
529 		 * false, as we are never streaming into an existing file and
530 		 * therefore there can be no pre-existing timeline history file.
531 		 */
532 		if (!existsTimeLineHistoryFile(stream))
533 		{
534 			snprintf(query, sizeof(query), "TIMELINE_HISTORY %u", stream->timeline);
535 			res = PQexec(conn, query);
536 			if (PQresultStatus(res) != PGRES_TUPLES_OK)
537 			{
538 				/* FIXME: we might send it ok, but get an error */
539 				pg_log_error("could not send replication command \"%s\": %s",
540 							 "TIMELINE_HISTORY", PQresultErrorMessage(res));
541 				PQclear(res);
542 				return false;
543 			}
544 
545 			/*
546 			 * The response to TIMELINE_HISTORY is a single row result set
547 			 * with two fields: filename and content
548 			 */
549 			if (PQnfields(res) != 2 || PQntuples(res) != 1)
550 			{
551 				pg_log_warning("unexpected response to TIMELINE_HISTORY command: got %d rows and %d fields, expected %d rows and %d fields",
552 							   PQntuples(res), PQnfields(res), 1, 2);
553 			}
554 
555 			/* Write the history file to disk */
556 			writeTimeLineHistoryFile(stream,
557 									 PQgetvalue(res, 0, 0),
558 									 PQgetvalue(res, 0, 1));
559 
560 			PQclear(res);
561 		}
562 
563 		/*
564 		 * Before we start streaming from the requested location, check if the
565 		 * callback tells us to stop here.
566 		 */
567 		if (stream->stream_stop(stream->startpos, stream->timeline, false))
568 			return true;
569 
570 		/* Initiate the replication stream at specified location */
571 		snprintf(query, sizeof(query), "START_REPLICATION %s%X/%X TIMELINE %u",
572 				 slotcmd,
573 				 (uint32) (stream->startpos >> 32), (uint32) stream->startpos,
574 				 stream->timeline);
575 		res = PQexec(conn, query);
576 		if (PQresultStatus(res) != PGRES_COPY_BOTH)
577 		{
578 			pg_log_error("could not send replication command \"%s\": %s",
579 						 "START_REPLICATION", PQresultErrorMessage(res));
580 			PQclear(res);
581 			return false;
582 		}
583 		PQclear(res);
584 
585 		/* Stream the WAL */
586 		res = HandleCopyStream(conn, stream, &stoppos);
587 		if (res == NULL)
588 			goto error;
589 
590 		/*
591 		 * Streaming finished.
592 		 *
593 		 * There are two possible reasons for that: a controlled shutdown, or
594 		 * we reached the end of the current timeline. In case of
595 		 * end-of-timeline, the server sends a result set after Copy has
596 		 * finished, containing information about the next timeline. Read
597 		 * that, and restart streaming from the next timeline. In case of
598 		 * controlled shutdown, stop here.
599 		 */
600 		if (PQresultStatus(res) == PGRES_TUPLES_OK)
601 		{
602 			/*
603 			 * End-of-timeline. Read the next timeline's ID and starting
604 			 * position. Usually, the starting position will match the end of
605 			 * the previous timeline, but there are corner cases like if the
606 			 * server had sent us half of a WAL record, when it was promoted.
607 			 * The new timeline will begin at the end of the last complete
608 			 * record in that case, overlapping the partial WAL record on the
609 			 * old timeline.
610 			 */
611 			uint32		newtimeline;
612 			bool		parsed;
613 
614 			parsed = ReadEndOfStreamingResult(res, &stream->startpos, &newtimeline);
615 			PQclear(res);
616 			if (!parsed)
617 				goto error;
618 
619 			/* Sanity check the values the server gave us */
620 			if (newtimeline <= stream->timeline)
621 			{
622 				pg_log_error("server reported unexpected next timeline %u, following timeline %u",
623 							 newtimeline, stream->timeline);
624 				goto error;
625 			}
626 			if (stream->startpos > stoppos)
627 			{
628 				pg_log_error("server stopped streaming timeline %u at %X/%X, but reported next timeline %u to begin at %X/%X",
629 							 stream->timeline, (uint32) (stoppos >> 32), (uint32) stoppos,
630 							 newtimeline, (uint32) (stream->startpos >> 32), (uint32) stream->startpos);
631 				goto error;
632 			}
633 
634 			/* Read the final result, which should be CommandComplete. */
635 			res = PQgetResult(conn);
636 			if (PQresultStatus(res) != PGRES_COMMAND_OK)
637 			{
638 				pg_log_error("unexpected termination of replication stream: %s",
639 							 PQresultErrorMessage(res));
640 				PQclear(res);
641 				goto error;
642 			}
643 			PQclear(res);
644 
645 			/*
646 			 * Loop back to start streaming from the new timeline. Always
647 			 * start streaming at the beginning of a segment.
648 			 */
649 			stream->timeline = newtimeline;
650 			stream->startpos = stream->startpos -
651 				XLogSegmentOffset(stream->startpos, WalSegSz);
652 			continue;
653 		}
654 		else if (PQresultStatus(res) == PGRES_COMMAND_OK)
655 		{
656 			PQclear(res);
657 
658 			/*
659 			 * End of replication (ie. controlled shut down of the server).
660 			 *
661 			 * Check if the callback thinks it's OK to stop here. If not,
662 			 * complain.
663 			 */
664 			if (stream->stream_stop(stoppos, stream->timeline, false))
665 				return true;
666 			else
667 			{
668 				pg_log_error("replication stream was terminated before stop point");
669 				goto error;
670 			}
671 		}
672 		else
673 		{
674 			/* Server returned an error. */
675 			pg_log_error("unexpected termination of replication stream: %s",
676 						 PQresultErrorMessage(res));
677 			PQclear(res);
678 			goto error;
679 		}
680 	}
681 
682 error:
683 	if (walfile != NULL && stream->walmethod->close(walfile, CLOSE_NO_RENAME) != 0)
684 		pg_log_error("could not close file \"%s\": %s",
685 					 current_walfile_name, stream->walmethod->getlasterror());
686 	walfile = NULL;
687 	return false;
688 }
689 
690 /*
691  * Helper function to parse the result set returned by server after streaming
692  * has finished. On failure, prints an error to stderr and returns false.
693  */
694 static bool
ReadEndOfStreamingResult(PGresult * res,XLogRecPtr * startpos,uint32 * timeline)695 ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos, uint32 *timeline)
696 {
697 	uint32		startpos_xlogid,
698 				startpos_xrecoff;
699 
700 	/*----------
701 	 * The result set consists of one row and two columns, e.g:
702 	 *
703 	 *	next_tli | next_tli_startpos
704 	 * ----------+-------------------
705 	 *		   4 | 0/9949AE0
706 	 *
707 	 * next_tli is the timeline ID of the next timeline after the one that
708 	 * just finished streaming. next_tli_startpos is the WAL location where
709 	 * the server switched to it.
710 	 *----------
711 	 */
712 	if (PQnfields(res) < 2 || PQntuples(res) != 1)
713 	{
714 		pg_log_error("unexpected result set after end-of-timeline: got %d rows and %d fields, expected %d rows and %d fields",
715 					 PQntuples(res), PQnfields(res), 1, 2);
716 		return false;
717 	}
718 
719 	*timeline = atoi(PQgetvalue(res, 0, 0));
720 	if (sscanf(PQgetvalue(res, 0, 1), "%X/%X", &startpos_xlogid,
721 			   &startpos_xrecoff) != 2)
722 	{
723 		pg_log_error("could not parse next timeline's starting point \"%s\"",
724 					 PQgetvalue(res, 0, 1));
725 		return false;
726 	}
727 	*startpos = ((uint64) startpos_xlogid << 32) | startpos_xrecoff;
728 
729 	return true;
730 }
731 
732 /*
733  * The main loop of ReceiveXlogStream. Handles the COPY stream after
734  * initiating streaming with the START_REPLICATION command.
735  *
736  * If the COPY ends (not necessarily successfully) due a message from the
737  * server, returns a PGresult and sets *stoppos to the last byte written.
738  * On any other sort of error, returns NULL.
739  */
740 static PGresult *
HandleCopyStream(PGconn * conn,StreamCtl * stream,XLogRecPtr * stoppos)741 HandleCopyStream(PGconn *conn, StreamCtl *stream,
742 				 XLogRecPtr *stoppos)
743 {
744 	char	   *copybuf = NULL;
745 	TimestampTz last_status = -1;
746 	XLogRecPtr	blockpos = stream->startpos;
747 
748 	still_sending = true;
749 
750 	while (1)
751 	{
752 		int			r;
753 		TimestampTz now;
754 		long		sleeptime;
755 
756 		/*
757 		 * Check if we should continue streaming, or abort at this point.
758 		 */
759 		if (!CheckCopyStreamStop(conn, stream, blockpos, stoppos))
760 			goto error;
761 
762 		now = feGetCurrentTimestamp();
763 
764 		/*
765 		 * If synchronous option is true, issue sync command as soon as there
766 		 * are WAL data which has not been flushed yet.
767 		 */
768 		if (stream->synchronous && lastFlushPosition < blockpos && walfile != NULL)
769 		{
770 			if (stream->walmethod->sync(walfile) != 0)
771 			{
772 				pg_log_fatal("could not fsync file \"%s\": %s",
773 							 current_walfile_name, stream->walmethod->getlasterror());
774 				exit(1);
775 			}
776 			lastFlushPosition = blockpos;
777 
778 			/*
779 			 * Send feedback so that the server sees the latest WAL locations
780 			 * immediately.
781 			 */
782 			if (!sendFeedback(conn, blockpos, now, false))
783 				goto error;
784 			last_status = now;
785 		}
786 
787 		/*
788 		 * Potentially send a status message to the master
789 		 */
790 		if (still_sending && stream->standby_message_timeout > 0 &&
791 			feTimestampDifferenceExceeds(last_status, now,
792 										 stream->standby_message_timeout))
793 		{
794 			/* Time to send feedback! */
795 			if (!sendFeedback(conn, blockpos, now, false))
796 				goto error;
797 			last_status = now;
798 		}
799 
800 		/*
801 		 * Calculate how long send/receive loops should sleep
802 		 */
803 		sleeptime = CalculateCopyStreamSleeptime(now, stream->standby_message_timeout,
804 												 last_status);
805 
806 		r = CopyStreamReceive(conn, sleeptime, stream->stop_socket, &copybuf);
807 		while (r != 0)
808 		{
809 			if (r == -1)
810 				goto error;
811 			if (r == -2)
812 			{
813 				PGresult   *res = HandleEndOfCopyStream(conn, stream, copybuf, blockpos, stoppos);
814 
815 				if (res == NULL)
816 					goto error;
817 				else
818 					return res;
819 			}
820 
821 			/* Check the message type. */
822 			if (copybuf[0] == 'k')
823 			{
824 				if (!ProcessKeepaliveMsg(conn, stream, copybuf, r, blockpos,
825 										 &last_status))
826 					goto error;
827 			}
828 			else if (copybuf[0] == 'w')
829 			{
830 				if (!ProcessXLogDataMsg(conn, stream, copybuf, r, &blockpos))
831 					goto error;
832 
833 				/*
834 				 * Check if we should continue streaming, or abort at this
835 				 * point.
836 				 */
837 				if (!CheckCopyStreamStop(conn, stream, blockpos, stoppos))
838 					goto error;
839 			}
840 			else
841 			{
842 				pg_log_error("unrecognized streaming header: \"%c\"",
843 							 copybuf[0]);
844 				goto error;
845 			}
846 
847 			/*
848 			 * Process the received data, and any subsequent data we can read
849 			 * without blocking.
850 			 */
851 			r = CopyStreamReceive(conn, 0, stream->stop_socket, &copybuf);
852 		}
853 	}
854 
855 error:
856 	if (copybuf != NULL)
857 		PQfreemem(copybuf);
858 	return NULL;
859 }
860 
861 /*
862  * Wait until we can read a CopyData message,
863  * or timeout, or occurrence of a signal or input on the stop_socket.
864  * (timeout_ms < 0 means wait indefinitely; 0 means don't wait.)
865  *
866  * Returns 1 if data has become available for reading, 0 if timed out
867  * or interrupted by signal or stop_socket input, and -1 on an error.
868  */
869 static int
CopyStreamPoll(PGconn * conn,long timeout_ms,pgsocket stop_socket)870 CopyStreamPoll(PGconn *conn, long timeout_ms, pgsocket stop_socket)
871 {
872 	int			ret;
873 	fd_set		input_mask;
874 	int			connsocket;
875 	int			maxfd;
876 	struct timeval timeout;
877 	struct timeval *timeoutptr;
878 
879 	connsocket = PQsocket(conn);
880 	if (connsocket < 0)
881 	{
882 		pg_log_error("invalid socket: %s", PQerrorMessage(conn));
883 		return -1;
884 	}
885 
886 	FD_ZERO(&input_mask);
887 	FD_SET(connsocket, &input_mask);
888 	maxfd = connsocket;
889 	if (stop_socket != PGINVALID_SOCKET)
890 	{
891 		FD_SET(stop_socket, &input_mask);
892 		maxfd = Max(maxfd, stop_socket);
893 	}
894 
895 	if (timeout_ms < 0)
896 		timeoutptr = NULL;
897 	else
898 	{
899 		timeout.tv_sec = timeout_ms / 1000L;
900 		timeout.tv_usec = (timeout_ms % 1000L) * 1000L;
901 		timeoutptr = &timeout;
902 	}
903 
904 	ret = select(maxfd + 1, &input_mask, NULL, NULL, timeoutptr);
905 
906 	if (ret < 0)
907 	{
908 		if (errno == EINTR)
909 			return 0;			/* Got a signal, so not an error */
910 		pg_log_error("select() failed: %m");
911 		return -1;
912 	}
913 	if (ret > 0 && FD_ISSET(connsocket, &input_mask))
914 		return 1;				/* Got input on connection socket */
915 
916 	return 0;					/* Got timeout or input on stop_socket */
917 }
918 
919 /*
920  * Receive CopyData message available from XLOG stream, blocking for
921  * maximum of 'timeout' ms.
922  *
923  * If data was received, returns the length of the data. *buffer is set to
924  * point to a buffer holding the received message. The buffer is only valid
925  * until the next CopyStreamReceive call.
926  *
927  * Returns 0 if no data was available within timeout, or if wait was
928  * interrupted by signal or stop_socket input.
929  * -1 on error. -2 if the server ended the COPY.
930  */
931 static int
CopyStreamReceive(PGconn * conn,long timeout,pgsocket stop_socket,char ** buffer)932 CopyStreamReceive(PGconn *conn, long timeout, pgsocket stop_socket,
933 				  char **buffer)
934 {
935 	char	   *copybuf = NULL;
936 	int			rawlen;
937 
938 	if (*buffer != NULL)
939 		PQfreemem(*buffer);
940 	*buffer = NULL;
941 
942 	/* Try to receive a CopyData message */
943 	rawlen = PQgetCopyData(conn, &copybuf, 1);
944 	if (rawlen == 0)
945 	{
946 		int			ret;
947 
948 		/*
949 		 * No data available.  Wait for some to appear, but not longer than
950 		 * the specified timeout, so that we can ping the server.  Also stop
951 		 * waiting if input appears on stop_socket.
952 		 */
953 		ret = CopyStreamPoll(conn, timeout, stop_socket);
954 		if (ret <= 0)
955 			return ret;
956 
957 		/* Now there is actually data on the socket */
958 		if (PQconsumeInput(conn) == 0)
959 		{
960 			pg_log_error("could not receive data from WAL stream: %s",
961 						 PQerrorMessage(conn));
962 			return -1;
963 		}
964 
965 		/* Now that we've consumed some input, try again */
966 		rawlen = PQgetCopyData(conn, &copybuf, 1);
967 		if (rawlen == 0)
968 			return 0;
969 	}
970 	if (rawlen == -1)			/* end-of-streaming or error */
971 		return -2;
972 	if (rawlen == -2)
973 	{
974 		pg_log_error("could not read COPY data: %s", PQerrorMessage(conn));
975 		return -1;
976 	}
977 
978 	/* Return received messages to caller */
979 	*buffer = copybuf;
980 	return rawlen;
981 }
982 
983 /*
984  * Process the keepalive message.
985  */
986 static bool
ProcessKeepaliveMsg(PGconn * conn,StreamCtl * stream,char * copybuf,int len,XLogRecPtr blockpos,TimestampTz * last_status)987 ProcessKeepaliveMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
988 					XLogRecPtr blockpos, TimestampTz *last_status)
989 {
990 	int			pos;
991 	bool		replyRequested;
992 	TimestampTz now;
993 
994 	/*
995 	 * Parse the keepalive message, enclosed in the CopyData message. We just
996 	 * check if the server requested a reply, and ignore the rest.
997 	 */
998 	pos = 1;					/* skip msgtype 'k' */
999 	pos += 8;					/* skip walEnd */
1000 	pos += 8;					/* skip sendTime */
1001 
1002 	if (len < pos + 1)
1003 	{
1004 		pg_log_error("streaming header too small: %d", len);
1005 		return false;
1006 	}
1007 	replyRequested = copybuf[pos];
1008 
1009 	/* If the server requested an immediate reply, send one. */
1010 	if (replyRequested && still_sending)
1011 	{
1012 		if (reportFlushPosition && lastFlushPosition < blockpos &&
1013 			walfile != NULL)
1014 		{
1015 			/*
1016 			 * If a valid flush location needs to be reported, flush the
1017 			 * current WAL file so that the latest flush location is sent back
1018 			 * to the server. This is necessary to see whether the last WAL
1019 			 * data has been successfully replicated or not, at the normal
1020 			 * shutdown of the server.
1021 			 */
1022 			if (stream->walmethod->sync(walfile) != 0)
1023 			{
1024 				pg_log_fatal("could not fsync file \"%s\": %s",
1025 							 current_walfile_name, stream->walmethod->getlasterror());
1026 				exit(1);
1027 			}
1028 			lastFlushPosition = blockpos;
1029 		}
1030 
1031 		now = feGetCurrentTimestamp();
1032 		if (!sendFeedback(conn, blockpos, now, false))
1033 			return false;
1034 		*last_status = now;
1035 	}
1036 
1037 	return true;
1038 }
1039 
1040 /*
1041  * Process XLogData message.
1042  */
1043 static bool
ProcessXLogDataMsg(PGconn * conn,StreamCtl * stream,char * copybuf,int len,XLogRecPtr * blockpos)1044 ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
1045 				   XLogRecPtr *blockpos)
1046 {
1047 	int			xlogoff;
1048 	int			bytes_left;
1049 	int			bytes_written;
1050 	int			hdr_len;
1051 
1052 	/*
1053 	 * Once we've decided we don't want to receive any more, just ignore any
1054 	 * subsequent XLogData messages.
1055 	 */
1056 	if (!(still_sending))
1057 		return true;
1058 
1059 	/*
1060 	 * Read the header of the XLogData message, enclosed in the CopyData
1061 	 * message. We only need the WAL location field (dataStart), the rest of
1062 	 * the header is ignored.
1063 	 */
1064 	hdr_len = 1;				/* msgtype 'w' */
1065 	hdr_len += 8;				/* dataStart */
1066 	hdr_len += 8;				/* walEnd */
1067 	hdr_len += 8;				/* sendTime */
1068 	if (len < hdr_len)
1069 	{
1070 		pg_log_error("streaming header too small: %d", len);
1071 		return false;
1072 	}
1073 	*blockpos = fe_recvint64(&copybuf[1]);
1074 
1075 	/* Extract WAL location for this block */
1076 	xlogoff = XLogSegmentOffset(*blockpos, WalSegSz);
1077 
1078 	/*
1079 	 * Verify that the initial location in the stream matches where we think
1080 	 * we are.
1081 	 */
1082 	if (walfile == NULL)
1083 	{
1084 		/* No file open yet */
1085 		if (xlogoff != 0)
1086 		{
1087 			pg_log_error("received write-ahead log record for offset %u with no file open",
1088 						 xlogoff);
1089 			return false;
1090 		}
1091 	}
1092 	else
1093 	{
1094 		/* More data in existing segment */
1095 		if (stream->walmethod->get_current_pos(walfile) != xlogoff)
1096 		{
1097 			pg_log_error("got WAL data offset %08x, expected %08x",
1098 						 xlogoff, (int) stream->walmethod->get_current_pos(walfile));
1099 			return false;
1100 		}
1101 	}
1102 
1103 	bytes_left = len - hdr_len;
1104 	bytes_written = 0;
1105 
1106 	while (bytes_left)
1107 	{
1108 		int			bytes_to_write;
1109 
1110 		/*
1111 		 * If crossing a WAL boundary, only write up until we reach wal
1112 		 * segment size.
1113 		 */
1114 		if (xlogoff + bytes_left > WalSegSz)
1115 			bytes_to_write = WalSegSz - xlogoff;
1116 		else
1117 			bytes_to_write = bytes_left;
1118 
1119 		if (walfile == NULL)
1120 		{
1121 			if (!open_walfile(stream, *blockpos))
1122 			{
1123 				/* Error logged by open_walfile */
1124 				return false;
1125 			}
1126 		}
1127 
1128 		if (stream->walmethod->write(walfile, copybuf + hdr_len + bytes_written,
1129 									 bytes_to_write) != bytes_to_write)
1130 		{
1131 			pg_log_error("could not write %u bytes to WAL file \"%s\": %s",
1132 						 bytes_to_write, current_walfile_name,
1133 						 stream->walmethod->getlasterror());
1134 			return false;
1135 		}
1136 
1137 		/* Write was successful, advance our position */
1138 		bytes_written += bytes_to_write;
1139 		bytes_left -= bytes_to_write;
1140 		*blockpos += bytes_to_write;
1141 		xlogoff += bytes_to_write;
1142 
1143 		/* Did we reach the end of a WAL segment? */
1144 		if (XLogSegmentOffset(*blockpos, WalSegSz) == 0)
1145 		{
1146 			if (!close_walfile(stream, *blockpos))
1147 				/* Error message written in close_walfile() */
1148 				return false;
1149 
1150 			xlogoff = 0;
1151 
1152 			if (still_sending && stream->stream_stop(*blockpos, stream->timeline, true))
1153 			{
1154 				if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
1155 				{
1156 					pg_log_error("could not send copy-end packet: %s",
1157 								 PQerrorMessage(conn));
1158 					return false;
1159 				}
1160 				still_sending = false;
1161 				return true;	/* ignore the rest of this XLogData packet */
1162 			}
1163 		}
1164 	}
1165 	/* No more data left to write, receive next copy packet */
1166 
1167 	return true;
1168 }
1169 
1170 /*
1171  * Handle end of the copy stream.
1172  */
1173 static PGresult *
HandleEndOfCopyStream(PGconn * conn,StreamCtl * stream,char * copybuf,XLogRecPtr blockpos,XLogRecPtr * stoppos)1174 HandleEndOfCopyStream(PGconn *conn, StreamCtl *stream, char *copybuf,
1175 					  XLogRecPtr blockpos, XLogRecPtr *stoppos)
1176 {
1177 	PGresult   *res = PQgetResult(conn);
1178 
1179 	/*
1180 	 * The server closed its end of the copy stream.  If we haven't closed
1181 	 * ours already, we need to do so now, unless the server threw an error,
1182 	 * in which case we don't.
1183 	 */
1184 	if (still_sending)
1185 	{
1186 		if (!close_walfile(stream, blockpos))
1187 		{
1188 			/* Error message written in close_walfile() */
1189 			PQclear(res);
1190 			return NULL;
1191 		}
1192 		if (PQresultStatus(res) == PGRES_COPY_IN)
1193 		{
1194 			if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
1195 			{
1196 				pg_log_error("could not send copy-end packet: %s",
1197 							 PQerrorMessage(conn));
1198 				PQclear(res);
1199 				return NULL;
1200 			}
1201 			res = PQgetResult(conn);
1202 		}
1203 		still_sending = false;
1204 	}
1205 	if (copybuf != NULL)
1206 		PQfreemem(copybuf);
1207 	*stoppos = blockpos;
1208 	return res;
1209 }
1210 
1211 /*
1212  * Check if we should continue streaming, or abort at this point.
1213  */
1214 static bool
CheckCopyStreamStop(PGconn * conn,StreamCtl * stream,XLogRecPtr blockpos,XLogRecPtr * stoppos)1215 CheckCopyStreamStop(PGconn *conn, StreamCtl *stream, XLogRecPtr blockpos,
1216 					XLogRecPtr *stoppos)
1217 {
1218 	if (still_sending && stream->stream_stop(blockpos, stream->timeline, false))
1219 	{
1220 		if (!close_walfile(stream, blockpos))
1221 		{
1222 			/* Potential error message is written by close_walfile */
1223 			return false;
1224 		}
1225 		if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
1226 		{
1227 			pg_log_error("could not send copy-end packet: %s",
1228 						 PQerrorMessage(conn));
1229 			return false;
1230 		}
1231 		still_sending = false;
1232 	}
1233 
1234 	return true;
1235 }
1236 
1237 /*
1238  * Calculate how long send/receive loops should sleep
1239  */
1240 static long
CalculateCopyStreamSleeptime(TimestampTz now,int standby_message_timeout,TimestampTz last_status)1241 CalculateCopyStreamSleeptime(TimestampTz now, int standby_message_timeout,
1242 							 TimestampTz last_status)
1243 {
1244 	TimestampTz status_targettime = 0;
1245 	long		sleeptime;
1246 
1247 	if (standby_message_timeout && still_sending)
1248 		status_targettime = last_status +
1249 			(standby_message_timeout - 1) * ((int64) 1000);
1250 
1251 	if (status_targettime > 0)
1252 	{
1253 		long		secs;
1254 		int			usecs;
1255 
1256 		feTimestampDifference(now,
1257 							  status_targettime,
1258 							  &secs,
1259 							  &usecs);
1260 		/* Always sleep at least 1 sec */
1261 		if (secs <= 0)
1262 		{
1263 			secs = 1;
1264 			usecs = 0;
1265 		}
1266 
1267 		sleeptime = secs * 1000 + usecs / 1000;
1268 	}
1269 	else
1270 		sleeptime = -1;
1271 
1272 	return sleeptime;
1273 }
1274