1 /*-------------------------------------------------------------------------
2 *
3 * receivelog.c - receive WAL files using the streaming
4 * replication protocol.
5 *
6 * Author: Magnus Hagander <magnus@hagander.net>
7 *
8 * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group
9 *
10 * IDENTIFICATION
11 * src/bin/pg_basebackup/receivelog.c
12 *-------------------------------------------------------------------------
13 */
14
15 #include "postgres_fe.h"
16
17 #include <sys/stat.h>
18 #include <unistd.h>
19 #ifdef HAVE_SYS_SELECT_H
20 #include <sys/select.h>
21 #endif
22
23 #include "access/xlog_internal.h"
24 #include "common/file_utils.h"
25 #include "common/logging.h"
26 #include "libpq-fe.h"
27 #include "receivelog.h"
28 #include "streamutil.h"
29
/* fd and filename for currently open WAL file */
static Walfile *walfile = NULL;
static char current_walfile_name[MAXPGPATH] = "";

/*
 * When true, sendFeedback() reports lastFlushPosition as the flush location;
 * when false it sends InvalidXLogRecPtr instead.
 */
static bool reportFlushPosition = false;

/*
 * WAL position up to which received data is considered flushed. It is
 * initialized to the stream's start position, advanced after a successful
 * sync of the open WAL file, and set when a WAL file is closed.
 */
static XLogRecPtr lastFlushPosition = InvalidXLogRecPtr;

/*
 * While true, periodic status updates and keepalive replies are sent and
 * incoming XLogData messages are processed; once false, XLogData messages
 * are ignored.
 */
static bool still_sending = true; /* feedback still needs to be sent? */
37
38 static PGresult *HandleCopyStream(PGconn *conn, StreamCtl *stream,
39 XLogRecPtr *stoppos);
40 static int CopyStreamPoll(PGconn *conn, long timeout_ms, pgsocket stop_socket);
41 static int CopyStreamReceive(PGconn *conn, long timeout, pgsocket stop_socket,
42 char **buffer);
43 static bool ProcessKeepaliveMsg(PGconn *conn, StreamCtl *stream, char *copybuf,
44 int len, XLogRecPtr blockpos, TimestampTz *last_status);
45 static bool ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
46 XLogRecPtr *blockpos);
47 static PGresult *HandleEndOfCopyStream(PGconn *conn, StreamCtl *stream, char *copybuf,
48 XLogRecPtr blockpos, XLogRecPtr *stoppos);
49 static bool CheckCopyStreamStop(PGconn *conn, StreamCtl *stream, XLogRecPtr blockpos,
50 XLogRecPtr *stoppos);
51 static long CalculateCopyStreamSleeptime(TimestampTz now, int standby_message_timeout,
52 TimestampTz last_status);
53
54 static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos,
55 uint32 *timeline);
56
57 static bool
mark_file_as_archived(StreamCtl * stream,const char * fname)58 mark_file_as_archived(StreamCtl *stream, const char *fname)
59 {
60 Walfile *f;
61 static char tmppath[MAXPGPATH];
62
63 snprintf(tmppath, sizeof(tmppath), "archive_status/%s.done",
64 fname);
65
66 f = stream->walmethod->open_for_write(tmppath, NULL, 0);
67 if (f == NULL)
68 {
69 pg_log_error("could not create archive status file \"%s\": %s",
70 tmppath, stream->walmethod->getlasterror());
71 return false;
72 }
73
74 stream->walmethod->close(f, CLOSE_NORMAL);
75
76 return true;
77 }
78
79 /*
80 * Open a new WAL file in the specified directory.
81 *
82 * Returns true if OK; on failure, returns false after printing an error msg.
83 * On success, 'walfile' is set to the FD for the file, and the base filename
84 * (without partial_suffix) is stored in 'current_walfile_name'.
85 *
86 * The file will be padded to 16Mb with zeroes.
87 */
88 static bool
open_walfile(StreamCtl * stream,XLogRecPtr startpoint)89 open_walfile(StreamCtl *stream, XLogRecPtr startpoint)
90 {
91 Walfile *f;
92 char *fn;
93 ssize_t size;
94 XLogSegNo segno;
95
96 XLByteToSeg(startpoint, segno, WalSegSz);
97 XLogFileName(current_walfile_name, stream->timeline, segno, WalSegSz);
98
99 /* Note that this considers the compression used if necessary */
100 fn = stream->walmethod->get_file_name(current_walfile_name,
101 stream->partial_suffix);
102
103 /*
104 * When streaming to files, if an existing file exists we verify that it's
105 * either empty (just created), or a complete WalSegSz segment (in which
106 * case it has been created and padded). Anything else indicates a corrupt
107 * file. Compressed files have no need for padding, so just ignore this
108 * case.
109 *
110 * When streaming to tar, no file with this name will exist before, so we
111 * never have to verify a size.
112 */
113 if (stream->walmethod->compression() == 0 &&
114 stream->walmethod->existsfile(fn))
115 {
116 size = stream->walmethod->get_file_size(fn);
117 if (size < 0)
118 {
119 pg_log_error("could not get size of write-ahead log file \"%s\": %s",
120 fn, stream->walmethod->getlasterror());
121 pg_free(fn);
122 return false;
123 }
124 if (size == WalSegSz)
125 {
126 /* Already padded file. Open it for use */
127 f = stream->walmethod->open_for_write(current_walfile_name, stream->partial_suffix, 0);
128 if (f == NULL)
129 {
130 pg_log_error("could not open existing write-ahead log file \"%s\": %s",
131 fn, stream->walmethod->getlasterror());
132 pg_free(fn);
133 return false;
134 }
135
136 /* fsync file in case of a previous crash */
137 if (stream->walmethod->sync(f) != 0)
138 {
139 pg_log_fatal("could not fsync existing write-ahead log file \"%s\": %s",
140 fn, stream->walmethod->getlasterror());
141 stream->walmethod->close(f, CLOSE_UNLINK);
142 exit(1);
143 }
144
145 walfile = f;
146 pg_free(fn);
147 return true;
148 }
149 if (size != 0)
150 {
151 /* if write didn't set errno, assume problem is no disk space */
152 if (errno == 0)
153 errno = ENOSPC;
154 pg_log_error(ngettext("write-ahead log file \"%s\" has %d byte, should be 0 or %d",
155 "write-ahead log file \"%s\" has %d bytes, should be 0 or %d",
156 size),
157 fn, (int) size, WalSegSz);
158 pg_free(fn);
159 return false;
160 }
161 /* File existed and was empty, so fall through and open */
162 }
163
164 /* No file existed, so create one */
165
166 f = stream->walmethod->open_for_write(current_walfile_name,
167 stream->partial_suffix, WalSegSz);
168 if (f == NULL)
169 {
170 pg_log_error("could not open write-ahead log file \"%s\": %s",
171 fn, stream->walmethod->getlasterror());
172 pg_free(fn);
173 return false;
174 }
175
176 pg_free(fn);
177 walfile = f;
178 return true;
179 }
180
181 /*
182 * Close the current WAL file (if open), and rename it to the correct
183 * filename if it's complete. On failure, prints an error message to stderr
184 * and returns false, otherwise returns true.
185 */
186 static bool
close_walfile(StreamCtl * stream,XLogRecPtr pos)187 close_walfile(StreamCtl *stream, XLogRecPtr pos)
188 {
189 off_t currpos;
190 int r;
191
192 if (walfile == NULL)
193 return true;
194
195 currpos = stream->walmethod->get_current_pos(walfile);
196 if (currpos == -1)
197 {
198 pg_log_error("could not determine seek position in file \"%s\": %s",
199 current_walfile_name, stream->walmethod->getlasterror());
200 stream->walmethod->close(walfile, CLOSE_UNLINK);
201 walfile = NULL;
202
203 return false;
204 }
205
206 if (stream->partial_suffix)
207 {
208 if (currpos == WalSegSz)
209 r = stream->walmethod->close(walfile, CLOSE_NORMAL);
210 else
211 {
212 pg_log_info("not renaming \"%s%s\", segment is not complete",
213 current_walfile_name, stream->partial_suffix);
214 r = stream->walmethod->close(walfile, CLOSE_NO_RENAME);
215 }
216 }
217 else
218 r = stream->walmethod->close(walfile, CLOSE_NORMAL);
219
220 walfile = NULL;
221
222 if (r != 0)
223 {
224 pg_log_error("could not close file \"%s\": %s",
225 current_walfile_name, stream->walmethod->getlasterror());
226 return false;
227 }
228
229 /*
230 * Mark file as archived if requested by the caller - pg_basebackup needs
231 * to do so as files can otherwise get archived again after promotion of a
232 * new node. This is in line with walreceiver.c always doing a
233 * XLogArchiveForceDone() after a complete segment.
234 */
235 if (currpos == WalSegSz && stream->mark_done)
236 {
237 /* writes error message if failed */
238 if (!mark_file_as_archived(stream, current_walfile_name))
239 return false;
240 }
241
242 lastFlushPosition = pos;
243 return true;
244 }
245
246
247 /*
248 * Check if a timeline history file exists.
249 */
250 static bool
existsTimeLineHistoryFile(StreamCtl * stream)251 existsTimeLineHistoryFile(StreamCtl *stream)
252 {
253 char histfname[MAXFNAMELEN];
254
255 /*
256 * Timeline 1 never has a history file. We treat that as if it existed,
257 * since we never need to stream it.
258 */
259 if (stream->timeline == 1)
260 return true;
261
262 TLHistoryFileName(histfname, stream->timeline);
263
264 return stream->walmethod->existsfile(histfname);
265 }
266
267 static bool
writeTimeLineHistoryFile(StreamCtl * stream,char * filename,char * content)268 writeTimeLineHistoryFile(StreamCtl *stream, char *filename, char *content)
269 {
270 int size = strlen(content);
271 char histfname[MAXFNAMELEN];
272 Walfile *f;
273
274 /*
275 * Check that the server's idea of how timeline history files should be
276 * named matches ours.
277 */
278 TLHistoryFileName(histfname, stream->timeline);
279 if (strcmp(histfname, filename) != 0)
280 {
281 pg_log_error("server reported unexpected history file name for timeline %u: %s",
282 stream->timeline, filename);
283 return false;
284 }
285
286 f = stream->walmethod->open_for_write(histfname, ".tmp", 0);
287 if (f == NULL)
288 {
289 pg_log_error("could not create timeline history file \"%s\": %s",
290 histfname, stream->walmethod->getlasterror());
291 return false;
292 }
293
294 if ((int) stream->walmethod->write(f, content, size) != size)
295 {
296 pg_log_error("could not write timeline history file \"%s\": %s",
297 histfname, stream->walmethod->getlasterror());
298
299 /*
300 * If we fail to make the file, delete it to release disk space
301 */
302 stream->walmethod->close(f, CLOSE_UNLINK);
303
304 return false;
305 }
306
307 if (stream->walmethod->close(f, CLOSE_NORMAL) != 0)
308 {
309 pg_log_error("could not close file \"%s\": %s",
310 histfname, stream->walmethod->getlasterror());
311 return false;
312 }
313
314 /* Maintain archive_status, check close_walfile() for details. */
315 if (stream->mark_done)
316 {
317 /* writes error message if failed */
318 if (!mark_file_as_archived(stream, histfname))
319 return false;
320 }
321
322 return true;
323 }
324
325 /*
326 * Send a Standby Status Update message to server.
327 */
328 static bool
sendFeedback(PGconn * conn,XLogRecPtr blockpos,TimestampTz now,bool replyRequested)329 sendFeedback(PGconn *conn, XLogRecPtr blockpos, TimestampTz now, bool replyRequested)
330 {
331 char replybuf[1 + 8 + 8 + 8 + 8 + 1];
332 int len = 0;
333
334 replybuf[len] = 'r';
335 len += 1;
336 fe_sendint64(blockpos, &replybuf[len]); /* write */
337 len += 8;
338 if (reportFlushPosition)
339 fe_sendint64(lastFlushPosition, &replybuf[len]); /* flush */
340 else
341 fe_sendint64(InvalidXLogRecPtr, &replybuf[len]); /* flush */
342 len += 8;
343 fe_sendint64(InvalidXLogRecPtr, &replybuf[len]); /* apply */
344 len += 8;
345 fe_sendint64(now, &replybuf[len]); /* sendTime */
346 len += 8;
347 replybuf[len] = replyRequested ? 1 : 0; /* replyRequested */
348 len += 1;
349
350 if (PQputCopyData(conn, replybuf, len) <= 0 || PQflush(conn))
351 {
352 pg_log_error("could not send feedback packet: %s",
353 PQerrorMessage(conn));
354 return false;
355 }
356
357 return true;
358 }
359
360 /*
361 * Check that the server version we're connected to is supported by
362 * ReceiveXlogStream().
363 *
364 * If it's not, an error message is printed to stderr, and false is returned.
365 */
366 bool
CheckServerVersionForStreaming(PGconn * conn)367 CheckServerVersionForStreaming(PGconn *conn)
368 {
369 int minServerMajor,
370 maxServerMajor;
371 int serverMajor;
372
373 /*
374 * The message format used in streaming replication changed in 9.3, so we
375 * cannot stream from older servers. And we don't support servers newer
376 * than the client; it might work, but we don't know, so err on the safe
377 * side.
378 */
379 minServerMajor = 903;
380 maxServerMajor = PG_VERSION_NUM / 100;
381 serverMajor = PQserverVersion(conn) / 100;
382 if (serverMajor < minServerMajor)
383 {
384 const char *serverver = PQparameterStatus(conn, "server_version");
385
386 pg_log_error("incompatible server version %s; client does not support streaming from server versions older than %s",
387 serverver ? serverver : "'unknown'",
388 "9.3");
389 return false;
390 }
391 else if (serverMajor > maxServerMajor)
392 {
393 const char *serverver = PQparameterStatus(conn, "server_version");
394
395 pg_log_error("incompatible server version %s; client does not support streaming from server versions newer than %s",
396 serverver ? serverver : "'unknown'",
397 PG_VERSION);
398 return false;
399 }
400 return true;
401 }
402
403 /*
404 * Receive a log stream starting at the specified position.
405 *
406 * Individual parameters are passed through the StreamCtl structure.
407 *
408 * If sysidentifier is specified, validate that both the system
409 * identifier and the timeline matches the specified ones
410 * (by sending an extra IDENTIFY_SYSTEM command)
411 *
412 * All received segments will be written to the directory
413 * specified by basedir. This will also fetch any missing timeline history
414 * files.
415 *
416 * The stream_stop callback will be called every time data
417 * is received, and whenever a segment is completed. If it returns
418 * true, the streaming will stop and the function
419 * return. As long as it returns false, streaming will continue
420 * indefinitely.
421 *
422 * If stream_stop() checks for external input, stop_socket should be set to
423 * the FD it checks. This will allow such input to be detected promptly
424 * rather than after standby_message_timeout (which might be indefinite).
425 * Note that signals will interrupt waits for input as well, but that is
426 * race-y since a signal received while busy won't interrupt the wait.
427 *
428 * standby_message_timeout controls how often we send a message
429 * back to the master letting it know our progress, in milliseconds.
430 * Zero means no messages are sent.
431 * This message will only contain the write location, and never
432 * flush or replay.
433 *
434 * If 'partial_suffix' is not NULL, files are initially created with the
435 * given suffix, and the suffix is removed once the file is finished. That
436 * allows you to tell the difference between partial and completed files,
437 * so that you can continue later where you left.
438 *
439 * If 'synchronous' is true, the received WAL is flushed as soon as written,
440 * otherwise only when the WAL file is closed.
441 *
442 * Note: The WAL location *must* be at a log segment start!
443 */
444 bool
ReceiveXlogStream(PGconn * conn,StreamCtl * stream)445 ReceiveXlogStream(PGconn *conn, StreamCtl *stream)
446 {
447 char query[128];
448 char slotcmd[128];
449 PGresult *res;
450 XLogRecPtr stoppos;
451
452 /*
453 * The caller should've checked the server version already, but doesn't do
454 * any harm to check it here too.
455 */
456 if (!CheckServerVersionForStreaming(conn))
457 return false;
458
459 /*
460 * Decide whether we want to report the flush position. If we report the
461 * flush position, the primary will know what WAL we'll possibly
462 * re-request, and it can then remove older WAL safely. We must always do
463 * that when we are using slots.
464 *
465 * Reporting the flush position makes one eligible as a synchronous
466 * replica. People shouldn't include generic names in
467 * synchronous_standby_names, but we've protected them against it so far,
468 * so let's continue to do so unless specifically requested.
469 */
470 if (stream->replication_slot != NULL)
471 {
472 reportFlushPosition = true;
473 sprintf(slotcmd, "SLOT \"%s\" ", stream->replication_slot);
474 }
475 else
476 {
477 if (stream->synchronous)
478 reportFlushPosition = true;
479 else
480 reportFlushPosition = false;
481 slotcmd[0] = 0;
482 }
483
484 if (stream->sysidentifier != NULL)
485 {
486 /* Validate system identifier hasn't changed */
487 res = PQexec(conn, "IDENTIFY_SYSTEM");
488 if (PQresultStatus(res) != PGRES_TUPLES_OK)
489 {
490 pg_log_error("could not send replication command \"%s\": %s",
491 "IDENTIFY_SYSTEM", PQerrorMessage(conn));
492 PQclear(res);
493 return false;
494 }
495 if (PQntuples(res) != 1 || PQnfields(res) < 3)
496 {
497 pg_log_error("could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields",
498 PQntuples(res), PQnfields(res), 1, 3);
499 PQclear(res);
500 return false;
501 }
502 if (strcmp(stream->sysidentifier, PQgetvalue(res, 0, 0)) != 0)
503 {
504 pg_log_error("system identifier does not match between base backup and streaming connection");
505 PQclear(res);
506 return false;
507 }
508 if (stream->timeline > atoi(PQgetvalue(res, 0, 1)))
509 {
510 pg_log_error("starting timeline %u is not present in the server",
511 stream->timeline);
512 PQclear(res);
513 return false;
514 }
515 PQclear(res);
516 }
517
518 /*
519 * initialize flush position to starting point, it's the caller's
520 * responsibility that that's sane.
521 */
522 lastFlushPosition = stream->startpos;
523
524 while (1)
525 {
526 /*
527 * Fetch the timeline history file for this timeline, if we don't have
528 * it already. When streaming log to tar, this will always return
529 * false, as we are never streaming into an existing file and
530 * therefore there can be no pre-existing timeline history file.
531 */
532 if (!existsTimeLineHistoryFile(stream))
533 {
534 snprintf(query, sizeof(query), "TIMELINE_HISTORY %u", stream->timeline);
535 res = PQexec(conn, query);
536 if (PQresultStatus(res) != PGRES_TUPLES_OK)
537 {
538 /* FIXME: we might send it ok, but get an error */
539 pg_log_error("could not send replication command \"%s\": %s",
540 "TIMELINE_HISTORY", PQresultErrorMessage(res));
541 PQclear(res);
542 return false;
543 }
544
545 /*
546 * The response to TIMELINE_HISTORY is a single row result set
547 * with two fields: filename and content
548 */
549 if (PQnfields(res) != 2 || PQntuples(res) != 1)
550 {
551 pg_log_warning("unexpected response to TIMELINE_HISTORY command: got %d rows and %d fields, expected %d rows and %d fields",
552 PQntuples(res), PQnfields(res), 1, 2);
553 }
554
555 /* Write the history file to disk */
556 writeTimeLineHistoryFile(stream,
557 PQgetvalue(res, 0, 0),
558 PQgetvalue(res, 0, 1));
559
560 PQclear(res);
561 }
562
563 /*
564 * Before we start streaming from the requested location, check if the
565 * callback tells us to stop here.
566 */
567 if (stream->stream_stop(stream->startpos, stream->timeline, false))
568 return true;
569
570 /* Initiate the replication stream at specified location */
571 snprintf(query, sizeof(query), "START_REPLICATION %s%X/%X TIMELINE %u",
572 slotcmd,
573 (uint32) (stream->startpos >> 32), (uint32) stream->startpos,
574 stream->timeline);
575 res = PQexec(conn, query);
576 if (PQresultStatus(res) != PGRES_COPY_BOTH)
577 {
578 pg_log_error("could not send replication command \"%s\": %s",
579 "START_REPLICATION", PQresultErrorMessage(res));
580 PQclear(res);
581 return false;
582 }
583 PQclear(res);
584
585 /* Stream the WAL */
586 res = HandleCopyStream(conn, stream, &stoppos);
587 if (res == NULL)
588 goto error;
589
590 /*
591 * Streaming finished.
592 *
593 * There are two possible reasons for that: a controlled shutdown, or
594 * we reached the end of the current timeline. In case of
595 * end-of-timeline, the server sends a result set after Copy has
596 * finished, containing information about the next timeline. Read
597 * that, and restart streaming from the next timeline. In case of
598 * controlled shutdown, stop here.
599 */
600 if (PQresultStatus(res) == PGRES_TUPLES_OK)
601 {
602 /*
603 * End-of-timeline. Read the next timeline's ID and starting
604 * position. Usually, the starting position will match the end of
605 * the previous timeline, but there are corner cases like if the
606 * server had sent us half of a WAL record, when it was promoted.
607 * The new timeline will begin at the end of the last complete
608 * record in that case, overlapping the partial WAL record on the
609 * old timeline.
610 */
611 uint32 newtimeline;
612 bool parsed;
613
614 parsed = ReadEndOfStreamingResult(res, &stream->startpos, &newtimeline);
615 PQclear(res);
616 if (!parsed)
617 goto error;
618
619 /* Sanity check the values the server gave us */
620 if (newtimeline <= stream->timeline)
621 {
622 pg_log_error("server reported unexpected next timeline %u, following timeline %u",
623 newtimeline, stream->timeline);
624 goto error;
625 }
626 if (stream->startpos > stoppos)
627 {
628 pg_log_error("server stopped streaming timeline %u at %X/%X, but reported next timeline %u to begin at %X/%X",
629 stream->timeline, (uint32) (stoppos >> 32), (uint32) stoppos,
630 newtimeline, (uint32) (stream->startpos >> 32), (uint32) stream->startpos);
631 goto error;
632 }
633
634 /* Read the final result, which should be CommandComplete. */
635 res = PQgetResult(conn);
636 if (PQresultStatus(res) != PGRES_COMMAND_OK)
637 {
638 pg_log_error("unexpected termination of replication stream: %s",
639 PQresultErrorMessage(res));
640 PQclear(res);
641 goto error;
642 }
643 PQclear(res);
644
645 /*
646 * Loop back to start streaming from the new timeline. Always
647 * start streaming at the beginning of a segment.
648 */
649 stream->timeline = newtimeline;
650 stream->startpos = stream->startpos -
651 XLogSegmentOffset(stream->startpos, WalSegSz);
652 continue;
653 }
654 else if (PQresultStatus(res) == PGRES_COMMAND_OK)
655 {
656 PQclear(res);
657
658 /*
659 * End of replication (ie. controlled shut down of the server).
660 *
661 * Check if the callback thinks it's OK to stop here. If not,
662 * complain.
663 */
664 if (stream->stream_stop(stoppos, stream->timeline, false))
665 return true;
666 else
667 {
668 pg_log_error("replication stream was terminated before stop point");
669 goto error;
670 }
671 }
672 else
673 {
674 /* Server returned an error. */
675 pg_log_error("unexpected termination of replication stream: %s",
676 PQresultErrorMessage(res));
677 PQclear(res);
678 goto error;
679 }
680 }
681
682 error:
683 if (walfile != NULL && stream->walmethod->close(walfile, CLOSE_NO_RENAME) != 0)
684 pg_log_error("could not close file \"%s\": %s",
685 current_walfile_name, stream->walmethod->getlasterror());
686 walfile = NULL;
687 return false;
688 }
689
690 /*
691 * Helper function to parse the result set returned by server after streaming
692 * has finished. On failure, prints an error to stderr and returns false.
693 */
694 static bool
ReadEndOfStreamingResult(PGresult * res,XLogRecPtr * startpos,uint32 * timeline)695 ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos, uint32 *timeline)
696 {
697 uint32 startpos_xlogid,
698 startpos_xrecoff;
699
700 /*----------
701 * The result set consists of one row and two columns, e.g:
702 *
703 * next_tli | next_tli_startpos
704 * ----------+-------------------
705 * 4 | 0/9949AE0
706 *
707 * next_tli is the timeline ID of the next timeline after the one that
708 * just finished streaming. next_tli_startpos is the WAL location where
709 * the server switched to it.
710 *----------
711 */
712 if (PQnfields(res) < 2 || PQntuples(res) != 1)
713 {
714 pg_log_error("unexpected result set after end-of-timeline: got %d rows and %d fields, expected %d rows and %d fields",
715 PQntuples(res), PQnfields(res), 1, 2);
716 return false;
717 }
718
719 *timeline = atoi(PQgetvalue(res, 0, 0));
720 if (sscanf(PQgetvalue(res, 0, 1), "%X/%X", &startpos_xlogid,
721 &startpos_xrecoff) != 2)
722 {
723 pg_log_error("could not parse next timeline's starting point \"%s\"",
724 PQgetvalue(res, 0, 1));
725 return false;
726 }
727 *startpos = ((uint64) startpos_xlogid << 32) | startpos_xrecoff;
728
729 return true;
730 }
731
732 /*
733 * The main loop of ReceiveXlogStream. Handles the COPY stream after
734 * initiating streaming with the START_REPLICATION command.
735 *
736 * If the COPY ends (not necessarily successfully) due a message from the
737 * server, returns a PGresult and sets *stoppos to the last byte written.
738 * On any other sort of error, returns NULL.
739 */
740 static PGresult *
HandleCopyStream(PGconn * conn,StreamCtl * stream,XLogRecPtr * stoppos)741 HandleCopyStream(PGconn *conn, StreamCtl *stream,
742 XLogRecPtr *stoppos)
743 {
744 char *copybuf = NULL;
745 TimestampTz last_status = -1;
746 XLogRecPtr blockpos = stream->startpos;
747
748 still_sending = true;
749
750 while (1)
751 {
752 int r;
753 TimestampTz now;
754 long sleeptime;
755
756 /*
757 * Check if we should continue streaming, or abort at this point.
758 */
759 if (!CheckCopyStreamStop(conn, stream, blockpos, stoppos))
760 goto error;
761
762 now = feGetCurrentTimestamp();
763
764 /*
765 * If synchronous option is true, issue sync command as soon as there
766 * are WAL data which has not been flushed yet.
767 */
768 if (stream->synchronous && lastFlushPosition < blockpos && walfile != NULL)
769 {
770 if (stream->walmethod->sync(walfile) != 0)
771 {
772 pg_log_fatal("could not fsync file \"%s\": %s",
773 current_walfile_name, stream->walmethod->getlasterror());
774 exit(1);
775 }
776 lastFlushPosition = blockpos;
777
778 /*
779 * Send feedback so that the server sees the latest WAL locations
780 * immediately.
781 */
782 if (!sendFeedback(conn, blockpos, now, false))
783 goto error;
784 last_status = now;
785 }
786
787 /*
788 * Potentially send a status message to the master
789 */
790 if (still_sending && stream->standby_message_timeout > 0 &&
791 feTimestampDifferenceExceeds(last_status, now,
792 stream->standby_message_timeout))
793 {
794 /* Time to send feedback! */
795 if (!sendFeedback(conn, blockpos, now, false))
796 goto error;
797 last_status = now;
798 }
799
800 /*
801 * Calculate how long send/receive loops should sleep
802 */
803 sleeptime = CalculateCopyStreamSleeptime(now, stream->standby_message_timeout,
804 last_status);
805
806 r = CopyStreamReceive(conn, sleeptime, stream->stop_socket, ©buf);
807 while (r != 0)
808 {
809 if (r == -1)
810 goto error;
811 if (r == -2)
812 {
813 PGresult *res = HandleEndOfCopyStream(conn, stream, copybuf, blockpos, stoppos);
814
815 if (res == NULL)
816 goto error;
817 else
818 return res;
819 }
820
821 /* Check the message type. */
822 if (copybuf[0] == 'k')
823 {
824 if (!ProcessKeepaliveMsg(conn, stream, copybuf, r, blockpos,
825 &last_status))
826 goto error;
827 }
828 else if (copybuf[0] == 'w')
829 {
830 if (!ProcessXLogDataMsg(conn, stream, copybuf, r, &blockpos))
831 goto error;
832
833 /*
834 * Check if we should continue streaming, or abort at this
835 * point.
836 */
837 if (!CheckCopyStreamStop(conn, stream, blockpos, stoppos))
838 goto error;
839 }
840 else
841 {
842 pg_log_error("unrecognized streaming header: \"%c\"",
843 copybuf[0]);
844 goto error;
845 }
846
847 /*
848 * Process the received data, and any subsequent data we can read
849 * without blocking.
850 */
851 r = CopyStreamReceive(conn, 0, stream->stop_socket, ©buf);
852 }
853 }
854
855 error:
856 if (copybuf != NULL)
857 PQfreemem(copybuf);
858 return NULL;
859 }
860
861 /*
862 * Wait until we can read a CopyData message,
863 * or timeout, or occurrence of a signal or input on the stop_socket.
864 * (timeout_ms < 0 means wait indefinitely; 0 means don't wait.)
865 *
866 * Returns 1 if data has become available for reading, 0 if timed out
867 * or interrupted by signal or stop_socket input, and -1 on an error.
868 */
869 static int
CopyStreamPoll(PGconn * conn,long timeout_ms,pgsocket stop_socket)870 CopyStreamPoll(PGconn *conn, long timeout_ms, pgsocket stop_socket)
871 {
872 int ret;
873 fd_set input_mask;
874 int connsocket;
875 int maxfd;
876 struct timeval timeout;
877 struct timeval *timeoutptr;
878
879 connsocket = PQsocket(conn);
880 if (connsocket < 0)
881 {
882 pg_log_error("invalid socket: %s", PQerrorMessage(conn));
883 return -1;
884 }
885
886 FD_ZERO(&input_mask);
887 FD_SET(connsocket, &input_mask);
888 maxfd = connsocket;
889 if (stop_socket != PGINVALID_SOCKET)
890 {
891 FD_SET(stop_socket, &input_mask);
892 maxfd = Max(maxfd, stop_socket);
893 }
894
895 if (timeout_ms < 0)
896 timeoutptr = NULL;
897 else
898 {
899 timeout.tv_sec = timeout_ms / 1000L;
900 timeout.tv_usec = (timeout_ms % 1000L) * 1000L;
901 timeoutptr = &timeout;
902 }
903
904 ret = select(maxfd + 1, &input_mask, NULL, NULL, timeoutptr);
905
906 if (ret < 0)
907 {
908 if (errno == EINTR)
909 return 0; /* Got a signal, so not an error */
910 pg_log_error("select() failed: %m");
911 return -1;
912 }
913 if (ret > 0 && FD_ISSET(connsocket, &input_mask))
914 return 1; /* Got input on connection socket */
915
916 return 0; /* Got timeout or input on stop_socket */
917 }
918
919 /*
920 * Receive CopyData message available from XLOG stream, blocking for
921 * maximum of 'timeout' ms.
922 *
923 * If data was received, returns the length of the data. *buffer is set to
924 * point to a buffer holding the received message. The buffer is only valid
925 * until the next CopyStreamReceive call.
926 *
927 * Returns 0 if no data was available within timeout, or if wait was
928 * interrupted by signal or stop_socket input.
929 * -1 on error. -2 if the server ended the COPY.
930 */
931 static int
CopyStreamReceive(PGconn * conn,long timeout,pgsocket stop_socket,char ** buffer)932 CopyStreamReceive(PGconn *conn, long timeout, pgsocket stop_socket,
933 char **buffer)
934 {
935 char *copybuf = NULL;
936 int rawlen;
937
938 if (*buffer != NULL)
939 PQfreemem(*buffer);
940 *buffer = NULL;
941
942 /* Try to receive a CopyData message */
943 rawlen = PQgetCopyData(conn, ©buf, 1);
944 if (rawlen == 0)
945 {
946 int ret;
947
948 /*
949 * No data available. Wait for some to appear, but not longer than
950 * the specified timeout, so that we can ping the server. Also stop
951 * waiting if input appears on stop_socket.
952 */
953 ret = CopyStreamPoll(conn, timeout, stop_socket);
954 if (ret <= 0)
955 return ret;
956
957 /* Now there is actually data on the socket */
958 if (PQconsumeInput(conn) == 0)
959 {
960 pg_log_error("could not receive data from WAL stream: %s",
961 PQerrorMessage(conn));
962 return -1;
963 }
964
965 /* Now that we've consumed some input, try again */
966 rawlen = PQgetCopyData(conn, ©buf, 1);
967 if (rawlen == 0)
968 return 0;
969 }
970 if (rawlen == -1) /* end-of-streaming or error */
971 return -2;
972 if (rawlen == -2)
973 {
974 pg_log_error("could not read COPY data: %s", PQerrorMessage(conn));
975 return -1;
976 }
977
978 /* Return received messages to caller */
979 *buffer = copybuf;
980 return rawlen;
981 }
982
983 /*
984 * Process the keepalive message.
985 */
986 static bool
ProcessKeepaliveMsg(PGconn * conn,StreamCtl * stream,char * copybuf,int len,XLogRecPtr blockpos,TimestampTz * last_status)987 ProcessKeepaliveMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
988 XLogRecPtr blockpos, TimestampTz *last_status)
989 {
990 int pos;
991 bool replyRequested;
992 TimestampTz now;
993
994 /*
995 * Parse the keepalive message, enclosed in the CopyData message. We just
996 * check if the server requested a reply, and ignore the rest.
997 */
998 pos = 1; /* skip msgtype 'k' */
999 pos += 8; /* skip walEnd */
1000 pos += 8; /* skip sendTime */
1001
1002 if (len < pos + 1)
1003 {
1004 pg_log_error("streaming header too small: %d", len);
1005 return false;
1006 }
1007 replyRequested = copybuf[pos];
1008
1009 /* If the server requested an immediate reply, send one. */
1010 if (replyRequested && still_sending)
1011 {
1012 if (reportFlushPosition && lastFlushPosition < blockpos &&
1013 walfile != NULL)
1014 {
1015 /*
1016 * If a valid flush location needs to be reported, flush the
1017 * current WAL file so that the latest flush location is sent back
1018 * to the server. This is necessary to see whether the last WAL
1019 * data has been successfully replicated or not, at the normal
1020 * shutdown of the server.
1021 */
1022 if (stream->walmethod->sync(walfile) != 0)
1023 {
1024 pg_log_fatal("could not fsync file \"%s\": %s",
1025 current_walfile_name, stream->walmethod->getlasterror());
1026 exit(1);
1027 }
1028 lastFlushPosition = blockpos;
1029 }
1030
1031 now = feGetCurrentTimestamp();
1032 if (!sendFeedback(conn, blockpos, now, false))
1033 return false;
1034 *last_status = now;
1035 }
1036
1037 return true;
1038 }
1039
1040 /*
1041 * Process XLogData message.
1042 */
1043 static bool
ProcessXLogDataMsg(PGconn * conn,StreamCtl * stream,char * copybuf,int len,XLogRecPtr * blockpos)1044 ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
1045 XLogRecPtr *blockpos)
1046 {
1047 int xlogoff;
1048 int bytes_left;
1049 int bytes_written;
1050 int hdr_len;
1051
1052 /*
1053 * Once we've decided we don't want to receive any more, just ignore any
1054 * subsequent XLogData messages.
1055 */
1056 if (!(still_sending))
1057 return true;
1058
1059 /*
1060 * Read the header of the XLogData message, enclosed in the CopyData
1061 * message. We only need the WAL location field (dataStart), the rest of
1062 * the header is ignored.
1063 */
1064 hdr_len = 1; /* msgtype 'w' */
1065 hdr_len += 8; /* dataStart */
1066 hdr_len += 8; /* walEnd */
1067 hdr_len += 8; /* sendTime */
1068 if (len < hdr_len)
1069 {
1070 pg_log_error("streaming header too small: %d", len);
1071 return false;
1072 }
1073 *blockpos = fe_recvint64(©buf[1]);
1074
1075 /* Extract WAL location for this block */
1076 xlogoff = XLogSegmentOffset(*blockpos, WalSegSz);
1077
1078 /*
1079 * Verify that the initial location in the stream matches where we think
1080 * we are.
1081 */
1082 if (walfile == NULL)
1083 {
1084 /* No file open yet */
1085 if (xlogoff != 0)
1086 {
1087 pg_log_error("received write-ahead log record for offset %u with no file open",
1088 xlogoff);
1089 return false;
1090 }
1091 }
1092 else
1093 {
1094 /* More data in existing segment */
1095 if (stream->walmethod->get_current_pos(walfile) != xlogoff)
1096 {
1097 pg_log_error("got WAL data offset %08x, expected %08x",
1098 xlogoff, (int) stream->walmethod->get_current_pos(walfile));
1099 return false;
1100 }
1101 }
1102
1103 bytes_left = len - hdr_len;
1104 bytes_written = 0;
1105
1106 while (bytes_left)
1107 {
1108 int bytes_to_write;
1109
1110 /*
1111 * If crossing a WAL boundary, only write up until we reach wal
1112 * segment size.
1113 */
1114 if (xlogoff + bytes_left > WalSegSz)
1115 bytes_to_write = WalSegSz - xlogoff;
1116 else
1117 bytes_to_write = bytes_left;
1118
1119 if (walfile == NULL)
1120 {
1121 if (!open_walfile(stream, *blockpos))
1122 {
1123 /* Error logged by open_walfile */
1124 return false;
1125 }
1126 }
1127
1128 if (stream->walmethod->write(walfile, copybuf + hdr_len + bytes_written,
1129 bytes_to_write) != bytes_to_write)
1130 {
1131 pg_log_error("could not write %u bytes to WAL file \"%s\": %s",
1132 bytes_to_write, current_walfile_name,
1133 stream->walmethod->getlasterror());
1134 return false;
1135 }
1136
1137 /* Write was successful, advance our position */
1138 bytes_written += bytes_to_write;
1139 bytes_left -= bytes_to_write;
1140 *blockpos += bytes_to_write;
1141 xlogoff += bytes_to_write;
1142
1143 /* Did we reach the end of a WAL segment? */
1144 if (XLogSegmentOffset(*blockpos, WalSegSz) == 0)
1145 {
1146 if (!close_walfile(stream, *blockpos))
1147 /* Error message written in close_walfile() */
1148 return false;
1149
1150 xlogoff = 0;
1151
1152 if (still_sending && stream->stream_stop(*blockpos, stream->timeline, true))
1153 {
1154 if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
1155 {
1156 pg_log_error("could not send copy-end packet: %s",
1157 PQerrorMessage(conn));
1158 return false;
1159 }
1160 still_sending = false;
1161 return true; /* ignore the rest of this XLogData packet */
1162 }
1163 }
1164 }
1165 /* No more data left to write, receive next copy packet */
1166
1167 return true;
1168 }
1169
1170 /*
1171 * Handle end of the copy stream.
1172 */
1173 static PGresult *
HandleEndOfCopyStream(PGconn * conn,StreamCtl * stream,char * copybuf,XLogRecPtr blockpos,XLogRecPtr * stoppos)1174 HandleEndOfCopyStream(PGconn *conn, StreamCtl *stream, char *copybuf,
1175 XLogRecPtr blockpos, XLogRecPtr *stoppos)
1176 {
1177 PGresult *res = PQgetResult(conn);
1178
1179 /*
1180 * The server closed its end of the copy stream. If we haven't closed
1181 * ours already, we need to do so now, unless the server threw an error,
1182 * in which case we don't.
1183 */
1184 if (still_sending)
1185 {
1186 if (!close_walfile(stream, blockpos))
1187 {
1188 /* Error message written in close_walfile() */
1189 PQclear(res);
1190 return NULL;
1191 }
1192 if (PQresultStatus(res) == PGRES_COPY_IN)
1193 {
1194 if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
1195 {
1196 pg_log_error("could not send copy-end packet: %s",
1197 PQerrorMessage(conn));
1198 PQclear(res);
1199 return NULL;
1200 }
1201 res = PQgetResult(conn);
1202 }
1203 still_sending = false;
1204 }
1205 if (copybuf != NULL)
1206 PQfreemem(copybuf);
1207 *stoppos = blockpos;
1208 return res;
1209 }
1210
1211 /*
1212 * Check if we should continue streaming, or abort at this point.
1213 */
1214 static bool
CheckCopyStreamStop(PGconn * conn,StreamCtl * stream,XLogRecPtr blockpos,XLogRecPtr * stoppos)1215 CheckCopyStreamStop(PGconn *conn, StreamCtl *stream, XLogRecPtr blockpos,
1216 XLogRecPtr *stoppos)
1217 {
1218 if (still_sending && stream->stream_stop(blockpos, stream->timeline, false))
1219 {
1220 if (!close_walfile(stream, blockpos))
1221 {
1222 /* Potential error message is written by close_walfile */
1223 return false;
1224 }
1225 if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
1226 {
1227 pg_log_error("could not send copy-end packet: %s",
1228 PQerrorMessage(conn));
1229 return false;
1230 }
1231 still_sending = false;
1232 }
1233
1234 return true;
1235 }
1236
1237 /*
1238 * Calculate how long send/receive loops should sleep
1239 */
1240 static long
CalculateCopyStreamSleeptime(TimestampTz now,int standby_message_timeout,TimestampTz last_status)1241 CalculateCopyStreamSleeptime(TimestampTz now, int standby_message_timeout,
1242 TimestampTz last_status)
1243 {
1244 TimestampTz status_targettime = 0;
1245 long sleeptime;
1246
1247 if (standby_message_timeout && still_sending)
1248 status_targettime = last_status +
1249 (standby_message_timeout - 1) * ((int64) 1000);
1250
1251 if (status_targettime > 0)
1252 {
1253 long secs;
1254 int usecs;
1255
1256 feTimestampDifference(now,
1257 status_targettime,
1258 &secs,
1259 &usecs);
1260 /* Always sleep at least 1 sec */
1261 if (secs <= 0)
1262 {
1263 secs = 1;
1264 usecs = 0;
1265 }
1266
1267 sleeptime = secs * 1000 + usecs / 1000;
1268 }
1269 else
1270 sleeptime = -1;
1271
1272 return sleeptime;
1273 }
1274