1 /*-------------------------------------------------------------------------
2 *
3 * xlogutils.c
4 *
5 * PostgreSQL write-ahead log manager utility routines
6 *
7 * This file contains support routines that are used by XLOG replay functions.
8 * None of this code is used during normal system operation.
9 *
10 *
11 * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
12 * Portions Copyright (c) 1994, Regents of the University of California
13 *
14 * src/backend/access/transam/xlogutils.c
15 *
16 *-------------------------------------------------------------------------
17 */
18 #include "postgres.h"
19
20 #include <unistd.h>
21
22 #include "access/timeline.h"
23 #include "access/xlog.h"
24 #include "access/xlog_internal.h"
25 #include "access/xlogutils.h"
26 #include "catalog/catalog.h"
27 #include "miscadmin.h"
28 #include "pgstat.h"
29 #include "storage/smgr.h"
30 #include "utils/guc.h"
31 #include "utils/hsearch.h"
32 #include "utils/rel.h"
33
34
35 /*
36 * During XLOG replay, we may see XLOG records for incremental updates of
37 * pages that no longer exist, because their relation was later dropped or
38 * truncated. (Note: this is only possible when full_page_writes = OFF,
39 * since when it's ON, the first reference we see to a page should always
40 * be a full-page rewrite not an incremental update.) Rather than simply
41 * ignoring such records, we make a note of the referenced page, and then
42 * complain if we don't actually see a drop or truncate covering the page
43 * later in replay.
44 */
45 typedef struct xl_invalid_page_key
46 {
47 RelFileNode node; /* the relation */
48 ForkNumber forkno; /* the fork number */
49 BlockNumber blkno; /* the page */
50 } xl_invalid_page_key;
51
52 typedef struct xl_invalid_page
53 {
54 xl_invalid_page_key key; /* hash key ... must be first */
55 bool present; /* page existed but contained zeroes */
56 } xl_invalid_page;
57
58 static HTAB *invalid_page_tab = NULL;
59
60
61 /* Report a reference to an invalid page */
62 static void
report_invalid_page(int elevel,RelFileNode node,ForkNumber forkno,BlockNumber blkno,bool present)63 report_invalid_page(int elevel, RelFileNode node, ForkNumber forkno,
64 BlockNumber blkno, bool present)
65 {
66 char *path = relpathperm(node, forkno);
67
68 if (present)
69 elog(elevel, "page %u of relation %s is uninitialized",
70 blkno, path);
71 else
72 elog(elevel, "page %u of relation %s does not exist",
73 blkno, path);
74 pfree(path);
75 }
76
77 /* Log a reference to an invalid page */
78 static void
log_invalid_page(RelFileNode node,ForkNumber forkno,BlockNumber blkno,bool present)79 log_invalid_page(RelFileNode node, ForkNumber forkno, BlockNumber blkno,
80 bool present)
81 {
82 xl_invalid_page_key key;
83 xl_invalid_page *hentry;
84 bool found;
85
86 /*
87 * Once recovery has reached a consistent state, the invalid-page table
88 * should be empty and remain so. If a reference to an invalid page is
89 * found after consistency is reached, PANIC immediately. This might seem
90 * aggressive, but it's better than letting the invalid reference linger
91 * in the hash table until the end of recovery and PANIC there, which
92 * might come only much later if this is a standby server.
93 */
94 if (reachedConsistency)
95 {
96 report_invalid_page(WARNING, node, forkno, blkno, present);
97 elog(PANIC, "WAL contains references to invalid pages");
98 }
99
100 /*
101 * Log references to invalid pages at DEBUG1 level. This allows some
102 * tracing of the cause (note the elog context mechanism will tell us
103 * something about the XLOG record that generated the reference).
104 */
105 if (log_min_messages <= DEBUG1 || client_min_messages <= DEBUG1)
106 report_invalid_page(DEBUG1, node, forkno, blkno, present);
107
108 if (invalid_page_tab == NULL)
109 {
110 /* create hash table when first needed */
111 HASHCTL ctl;
112
113 memset(&ctl, 0, sizeof(ctl));
114 ctl.keysize = sizeof(xl_invalid_page_key);
115 ctl.entrysize = sizeof(xl_invalid_page);
116
117 invalid_page_tab = hash_create("XLOG invalid-page table",
118 100,
119 &ctl,
120 HASH_ELEM | HASH_BLOBS);
121 }
122
123 /* we currently assume xl_invalid_page_key contains no padding */
124 key.node = node;
125 key.forkno = forkno;
126 key.blkno = blkno;
127 hentry = (xl_invalid_page *)
128 hash_search(invalid_page_tab, (void *) &key, HASH_ENTER, &found);
129
130 if (!found)
131 {
132 /* hash_search already filled in the key */
133 hentry->present = present;
134 }
135 else
136 {
137 /* repeat reference ... leave "present" as it was */
138 }
139 }
140
141 /* Forget any invalid pages >= minblkno, because they've been dropped */
142 static void
forget_invalid_pages(RelFileNode node,ForkNumber forkno,BlockNumber minblkno)143 forget_invalid_pages(RelFileNode node, ForkNumber forkno, BlockNumber minblkno)
144 {
145 HASH_SEQ_STATUS status;
146 xl_invalid_page *hentry;
147
148 if (invalid_page_tab == NULL)
149 return; /* nothing to do */
150
151 hash_seq_init(&status, invalid_page_tab);
152
153 while ((hentry = (xl_invalid_page *) hash_seq_search(&status)) != NULL)
154 {
155 if (RelFileNodeEquals(hentry->key.node, node) &&
156 hentry->key.forkno == forkno &&
157 hentry->key.blkno >= minblkno)
158 {
159 if (log_min_messages <= DEBUG2 || client_min_messages <= DEBUG2)
160 {
161 char *path = relpathperm(hentry->key.node, forkno);
162
163 elog(DEBUG2, "page %u of relation %s has been dropped",
164 hentry->key.blkno, path);
165 pfree(path);
166 }
167
168 if (hash_search(invalid_page_tab,
169 (void *) &hentry->key,
170 HASH_REMOVE, NULL) == NULL)
171 elog(ERROR, "hash table corrupted");
172 }
173 }
174 }
175
176 /* Forget any invalid pages in a whole database */
177 static void
forget_invalid_pages_db(Oid dbid)178 forget_invalid_pages_db(Oid dbid)
179 {
180 HASH_SEQ_STATUS status;
181 xl_invalid_page *hentry;
182
183 if (invalid_page_tab == NULL)
184 return; /* nothing to do */
185
186 hash_seq_init(&status, invalid_page_tab);
187
188 while ((hentry = (xl_invalid_page *) hash_seq_search(&status)) != NULL)
189 {
190 if (hentry->key.node.dbNode == dbid)
191 {
192 if (log_min_messages <= DEBUG2 || client_min_messages <= DEBUG2)
193 {
194 char *path = relpathperm(hentry->key.node, hentry->key.forkno);
195
196 elog(DEBUG2, "page %u of relation %s has been dropped",
197 hentry->key.blkno, path);
198 pfree(path);
199 }
200
201 if (hash_search(invalid_page_tab,
202 (void *) &hentry->key,
203 HASH_REMOVE, NULL) == NULL)
204 elog(ERROR, "hash table corrupted");
205 }
206 }
207 }
208
209 /* Are there any unresolved references to invalid pages? */
210 bool
XLogHaveInvalidPages(void)211 XLogHaveInvalidPages(void)
212 {
213 if (invalid_page_tab != NULL &&
214 hash_get_num_entries(invalid_page_tab) > 0)
215 return true;
216 return false;
217 }
218
219 /* Complain about any remaining invalid-page entries */
220 void
XLogCheckInvalidPages(void)221 XLogCheckInvalidPages(void)
222 {
223 HASH_SEQ_STATUS status;
224 xl_invalid_page *hentry;
225 bool foundone = false;
226
227 if (invalid_page_tab == NULL)
228 return; /* nothing to do */
229
230 hash_seq_init(&status, invalid_page_tab);
231
232 /*
233 * Our strategy is to emit WARNING messages for all remaining entries and
234 * only PANIC after we've dumped all the available info.
235 */
236 while ((hentry = (xl_invalid_page *) hash_seq_search(&status)) != NULL)
237 {
238 report_invalid_page(WARNING, hentry->key.node, hentry->key.forkno,
239 hentry->key.blkno, hentry->present);
240 foundone = true;
241 }
242
243 if (foundone)
244 elog(PANIC, "WAL contains references to invalid pages");
245
246 hash_destroy(invalid_page_tab);
247 invalid_page_tab = NULL;
248 }
249
250
251 /*
252 * XLogReadBufferForRedo
253 * Read a page during XLOG replay
254 *
255 * Reads a block referenced by a WAL record into shared buffer cache, and
256 * determines what needs to be done to redo the changes to it. If the WAL
257 * record includes a full-page image of the page, it is restored.
258 *
259 * 'lsn' is the LSN of the record being replayed. It is compared with the
260 * page's LSN to determine if the record has already been replayed.
261 * 'block_id' is the ID number the block was registered with, when the WAL
262 * record was created.
263 *
264 * Returns one of the following:
265 *
266 * BLK_NEEDS_REDO - changes from the WAL record need to be applied
267 * BLK_DONE - block doesn't need replaying
268 * BLK_RESTORED - block was restored from a full-page image included in
269 * the record
270 * BLK_NOTFOUND - block was not found (because it was truncated away by
271 * an operation later in the WAL stream)
272 *
273 * On return, the buffer is locked in exclusive-mode, and returned in *buf.
274 * Note that the buffer is locked and returned even if it doesn't need
275 * replaying. (Getting the buffer lock is not really necessary during
276 * single-process crash recovery, but some subroutines such as MarkBufferDirty
277 * will complain if we don't have the lock. In hot standby mode it's
278 * definitely necessary.)
279 *
280 * Note: when a backup block is available in XLOG with the BKPIMAGE_APPLY flag
281 * set, we restore it, even if the page in the database appears newer. This
282 * is to protect ourselves against database pages that were partially or
283 * incorrectly written during a crash. We assume that the XLOG data must be
284 * good because it has passed a CRC check, while the database page might not
285 * be. This will force us to replay all subsequent modifications of the page
286 * that appear in XLOG, rather than possibly ignoring them as already
287 * applied, but that's not a huge drawback.
288 */
289 XLogRedoAction
XLogReadBufferForRedo(XLogReaderState * record,uint8 block_id,Buffer * buf)290 XLogReadBufferForRedo(XLogReaderState *record, uint8 block_id,
291 Buffer *buf)
292 {
293 return XLogReadBufferForRedoExtended(record, block_id, RBM_NORMAL,
294 false, buf);
295 }
296
297 /*
298 * Pin and lock a buffer referenced by a WAL record, for the purpose of
299 * re-initializing it.
300 */
301 Buffer
XLogInitBufferForRedo(XLogReaderState * record,uint8 block_id)302 XLogInitBufferForRedo(XLogReaderState *record, uint8 block_id)
303 {
304 Buffer buf;
305
306 XLogReadBufferForRedoExtended(record, block_id, RBM_ZERO_AND_LOCK, false,
307 &buf);
308 return buf;
309 }
310
311 /*
312 * XLogReadBufferForRedoExtended
313 * Like XLogReadBufferForRedo, but with extra options.
314 *
315 * In RBM_ZERO_* modes, if the page doesn't exist, the relation is extended
316 * with all-zeroes pages up to the referenced block number. In
317 * RBM_ZERO_AND_LOCK and RBM_ZERO_AND_CLEANUP_LOCK modes, the return value
318 * is always BLK_NEEDS_REDO.
319 *
320 * (The RBM_ZERO_AND_CLEANUP_LOCK mode is redundant with the get_cleanup_lock
321 * parameter. Do not use an inconsistent combination!)
322 *
323 * If 'get_cleanup_lock' is true, a "cleanup lock" is acquired on the buffer
324 * using LockBufferForCleanup(), instead of a regular exclusive lock.
325 */
326 XLogRedoAction
XLogReadBufferForRedoExtended(XLogReaderState * record,uint8 block_id,ReadBufferMode mode,bool get_cleanup_lock,Buffer * buf)327 XLogReadBufferForRedoExtended(XLogReaderState *record,
328 uint8 block_id,
329 ReadBufferMode mode, bool get_cleanup_lock,
330 Buffer *buf)
331 {
332 XLogRecPtr lsn = record->EndRecPtr;
333 RelFileNode rnode;
334 ForkNumber forknum;
335 BlockNumber blkno;
336 Page page;
337 bool zeromode;
338 bool willinit;
339
340 if (!XLogRecGetBlockTag(record, block_id, &rnode, &forknum, &blkno))
341 {
342 /* Caller specified a bogus block_id */
343 elog(PANIC, "failed to locate backup block with ID %d", block_id);
344 }
345
346 /*
347 * Make sure that if the block is marked with WILL_INIT, the caller is
348 * going to initialize it. And vice versa.
349 */
350 zeromode = (mode == RBM_ZERO_AND_LOCK || mode == RBM_ZERO_AND_CLEANUP_LOCK);
351 willinit = (record->blocks[block_id].flags & BKPBLOCK_WILL_INIT) != 0;
352 if (willinit && !zeromode)
353 elog(PANIC, "block with WILL_INIT flag in WAL record must be zeroed by redo routine");
354 if (!willinit && zeromode)
355 elog(PANIC, "block to be initialized in redo routine must be marked with WILL_INIT flag in the WAL record");
356
357 /* If it has a full-page image and it should be restored, do it. */
358 if (XLogRecBlockImageApply(record, block_id))
359 {
360 Assert(XLogRecHasBlockImage(record, block_id));
361 *buf = XLogReadBufferExtended(rnode, forknum, blkno,
362 get_cleanup_lock ? RBM_ZERO_AND_CLEANUP_LOCK : RBM_ZERO_AND_LOCK);
363 page = BufferGetPage(*buf);
364 if (!RestoreBlockImage(record, block_id, page))
365 elog(ERROR, "failed to restore block image");
366
367 /*
368 * The page may be uninitialized. If so, we can't set the LSN because
369 * that would corrupt the page.
370 */
371 if (!PageIsNew(page))
372 {
373 PageSetLSN(page, lsn);
374 }
375
376 MarkBufferDirty(*buf);
377
378 /*
379 * At the end of crash recovery the init forks of unlogged relations
380 * are copied, without going through shared buffers. So we need to
381 * force the on-disk state of init forks to always be in sync with the
382 * state in shared buffers.
383 */
384 if (forknum == INIT_FORKNUM)
385 FlushOneBuffer(*buf);
386
387 return BLK_RESTORED;
388 }
389 else
390 {
391 *buf = XLogReadBufferExtended(rnode, forknum, blkno, mode);
392 if (BufferIsValid(*buf))
393 {
394 if (mode != RBM_ZERO_AND_LOCK && mode != RBM_ZERO_AND_CLEANUP_LOCK)
395 {
396 if (get_cleanup_lock)
397 LockBufferForCleanup(*buf);
398 else
399 LockBuffer(*buf, BUFFER_LOCK_EXCLUSIVE);
400 }
401 if (lsn <= PageGetLSN(BufferGetPage(*buf)))
402 return BLK_DONE;
403 else
404 return BLK_NEEDS_REDO;
405 }
406 else
407 return BLK_NOTFOUND;
408 }
409 }
410
411 /*
412 * XLogReadBufferExtended
413 * Read a page during XLOG replay
414 *
415 * This is functionally comparable to ReadBufferExtended. There's some
416 * differences in the behavior wrt. the "mode" argument:
417 *
418 * In RBM_NORMAL mode, if the page doesn't exist, or contains all-zeroes, we
419 * return InvalidBuffer. In this case the caller should silently skip the
420 * update on this page. (In this situation, we expect that the page was later
421 * dropped or truncated. If we don't see evidence of that later in the WAL
422 * sequence, we'll complain at the end of WAL replay.)
423 *
424 * In RBM_ZERO_* modes, if the page doesn't exist, the relation is extended
425 * with all-zeroes pages up to the given block number.
426 *
427 * In RBM_NORMAL_NO_LOG mode, we return InvalidBuffer if the page doesn't
428 * exist, and we don't check for all-zeroes. Thus, no log entry is made
429 * to imply that the page should be dropped or truncated later.
430 *
431 * NB: A redo function should normally not call this directly. To get a page
432 * to modify, use XLogReadBufferForRedoExtended instead. It is important that
433 * all pages modified by a WAL record are registered in the WAL records, or
434 * they will be invisible to tools that that need to know which pages are
435 * modified.
436 */
437 Buffer
XLogReadBufferExtended(RelFileNode rnode,ForkNumber forknum,BlockNumber blkno,ReadBufferMode mode)438 XLogReadBufferExtended(RelFileNode rnode, ForkNumber forknum,
439 BlockNumber blkno, ReadBufferMode mode)
440 {
441 BlockNumber lastblock;
442 Buffer buffer;
443 SMgrRelation smgr;
444
445 Assert(blkno != P_NEW);
446
447 /* Open the relation at smgr level */
448 smgr = smgropen(rnode, InvalidBackendId);
449
450 /*
451 * Create the target file if it doesn't already exist. This lets us cope
452 * if the replay sequence contains writes to a relation that is later
453 * deleted. (The original coding of this routine would instead suppress
454 * the writes, but that seems like it risks losing valuable data if the
455 * filesystem loses an inode during a crash. Better to write the data
456 * until we are actually told to delete the file.)
457 */
458 smgrcreate(smgr, forknum, true);
459
460 lastblock = smgrnblocks(smgr, forknum);
461
462 if (blkno < lastblock)
463 {
464 /* page exists in file */
465 buffer = ReadBufferWithoutRelcache(rnode, forknum, blkno,
466 mode, NULL);
467 }
468 else
469 {
470 /* hm, page doesn't exist in file */
471 if (mode == RBM_NORMAL)
472 {
473 log_invalid_page(rnode, forknum, blkno, false);
474 return InvalidBuffer;
475 }
476 if (mode == RBM_NORMAL_NO_LOG)
477 return InvalidBuffer;
478 /* OK to extend the file */
479 /* we do this in recovery only - no rel-extension lock needed */
480 Assert(InRecovery);
481 buffer = InvalidBuffer;
482 do
483 {
484 if (buffer != InvalidBuffer)
485 {
486 if (mode == RBM_ZERO_AND_LOCK || mode == RBM_ZERO_AND_CLEANUP_LOCK)
487 LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
488 ReleaseBuffer(buffer);
489 }
490 buffer = ReadBufferWithoutRelcache(rnode, forknum,
491 P_NEW, mode, NULL);
492 }
493 while (BufferGetBlockNumber(buffer) < blkno);
494 /* Handle the corner case that P_NEW returns non-consecutive pages */
495 if (BufferGetBlockNumber(buffer) != blkno)
496 {
497 if (mode == RBM_ZERO_AND_LOCK || mode == RBM_ZERO_AND_CLEANUP_LOCK)
498 LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
499 ReleaseBuffer(buffer);
500 buffer = ReadBufferWithoutRelcache(rnode, forknum, blkno,
501 mode, NULL);
502 }
503 }
504
505 if (mode == RBM_NORMAL)
506 {
507 /* check that page has been initialized */
508 Page page = (Page) BufferGetPage(buffer);
509
510 /*
511 * We assume that PageIsNew is safe without a lock. During recovery,
512 * there should be no other backends that could modify the buffer at
513 * the same time.
514 */
515 if (PageIsNew(page))
516 {
517 ReleaseBuffer(buffer);
518 log_invalid_page(rnode, forknum, blkno, true);
519 return InvalidBuffer;
520 }
521 }
522
523 return buffer;
524 }
525
526 /*
527 * Struct actually returned by XLogFakeRelcacheEntry, though the declared
528 * return type is Relation.
529 */
530 typedef struct
531 {
532 RelationData reldata; /* Note: this must be first */
533 FormData_pg_class pgc;
534 } FakeRelCacheEntryData;
535
536 typedef FakeRelCacheEntryData *FakeRelCacheEntry;
537
538 /*
539 * Create a fake relation cache entry for a physical relation
540 *
541 * It's often convenient to use the same functions in XLOG replay as in the
542 * main codepath, but those functions typically work with a relcache entry.
543 * We don't have a working relation cache during XLOG replay, but this
544 * function can be used to create a fake relcache entry instead. Only the
545 * fields related to physical storage, like rd_rel, are initialized, so the
546 * fake entry is only usable in low-level operations like ReadBuffer().
547 *
548 * Caller must free the returned entry with FreeFakeRelcacheEntry().
549 */
550 Relation
CreateFakeRelcacheEntry(RelFileNode rnode)551 CreateFakeRelcacheEntry(RelFileNode rnode)
552 {
553 FakeRelCacheEntry fakeentry;
554 Relation rel;
555
556 Assert(InRecovery);
557
558 /* Allocate the Relation struct and all related space in one block. */
559 fakeentry = palloc0(sizeof(FakeRelCacheEntryData));
560 rel = (Relation) fakeentry;
561
562 rel->rd_rel = &fakeentry->pgc;
563 rel->rd_node = rnode;
564 /* We will never be working with temp rels during recovery */
565 rel->rd_backend = InvalidBackendId;
566
567 /* It must be a permanent table if we're in recovery. */
568 rel->rd_rel->relpersistence = RELPERSISTENCE_PERMANENT;
569
570 /* We don't know the name of the relation; use relfilenode instead */
571 sprintf(RelationGetRelationName(rel), "%u", rnode.relNode);
572
573 /*
574 * We set up the lockRelId in case anything tries to lock the dummy
575 * relation. Note that this is fairly bogus since relNode may be
576 * different from the relation's OID. It shouldn't really matter though,
577 * since we are presumably running by ourselves and can't have any lock
578 * conflicts ...
579 */
580 rel->rd_lockInfo.lockRelId.dbId = rnode.dbNode;
581 rel->rd_lockInfo.lockRelId.relId = rnode.relNode;
582
583 rel->rd_smgr = NULL;
584
585 return rel;
586 }
587
588 /*
589 * Free a fake relation cache entry.
590 */
591 void
FreeFakeRelcacheEntry(Relation fakerel)592 FreeFakeRelcacheEntry(Relation fakerel)
593 {
594 /* make sure the fakerel is not referenced by the SmgrRelation anymore */
595 if (fakerel->rd_smgr != NULL)
596 smgrclearowner(&fakerel->rd_smgr, fakerel->rd_smgr);
597 pfree(fakerel);
598 }
599
600 /*
601 * Drop a relation during XLOG replay
602 *
603 * This is called when the relation is about to be deleted; we need to remove
604 * any open "invalid-page" records for the relation.
605 */
606 void
XLogDropRelation(RelFileNode rnode,ForkNumber forknum)607 XLogDropRelation(RelFileNode rnode, ForkNumber forknum)
608 {
609 forget_invalid_pages(rnode, forknum, 0);
610 }
611
612 /*
613 * Drop a whole database during XLOG replay
614 *
615 * As above, but for DROP DATABASE instead of dropping a single rel
616 */
617 void
XLogDropDatabase(Oid dbid)618 XLogDropDatabase(Oid dbid)
619 {
620 /*
621 * This is unnecessarily heavy-handed, as it will close SMgrRelation
622 * objects for other databases as well. DROP DATABASE occurs seldom enough
623 * that it's not worth introducing a variant of smgrclose for just this
624 * purpose. XXX: Or should we rather leave the smgr entries dangling?
625 */
626 smgrcloseall();
627
628 forget_invalid_pages_db(dbid);
629 }
630
631 /*
632 * Truncate a relation during XLOG replay
633 *
634 * We need to clean up any open "invalid-page" records for the dropped pages.
635 */
636 void
XLogTruncateRelation(RelFileNode rnode,ForkNumber forkNum,BlockNumber nblocks)637 XLogTruncateRelation(RelFileNode rnode, ForkNumber forkNum,
638 BlockNumber nblocks)
639 {
640 forget_invalid_pages(rnode, forkNum, nblocks);
641 }
642
643 /*
644 * Read 'count' bytes from WAL into 'buf', starting at location 'startptr'
645 * in timeline 'tli'.
646 *
647 * Will open, and keep open, one WAL segment stored in the static file
648 * descriptor 'sendFile'. This means if XLogRead is used once, there will
649 * always be one descriptor left open until the process ends, but never
650 * more than one.
651 *
652 * XXX This is very similar to pg_waldump's XLogDumpXLogRead and to XLogRead
653 * in walsender.c but for small differences (such as lack of elog() in
654 * frontend). Probably these should be merged at some point.
655 */
656 static void
XLogRead(char * buf,TimeLineID tli,XLogRecPtr startptr,Size count)657 XLogRead(char *buf, TimeLineID tli, XLogRecPtr startptr, Size count)
658 {
659 char *p;
660 XLogRecPtr recptr;
661 Size nbytes;
662
663 /* state maintained across calls */
664 static int sendFile = -1;
665 static XLogSegNo sendSegNo = 0;
666 static TimeLineID sendTLI = 0;
667 static uint32 sendOff = 0;
668
669 p = buf;
670 recptr = startptr;
671 nbytes = count;
672
673 while (nbytes > 0)
674 {
675 uint32 startoff;
676 int segbytes;
677 int readbytes;
678
679 startoff = recptr % XLogSegSize;
680
681 /* Do we need to switch to a different xlog segment? */
682 if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo) ||
683 sendTLI != tli)
684 {
685 char path[MAXPGPATH];
686
687 if (sendFile >= 0)
688 close(sendFile);
689
690 XLByteToSeg(recptr, sendSegNo);
691
692 XLogFilePath(path, tli, sendSegNo);
693
694 sendFile = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0);
695
696 if (sendFile < 0)
697 {
698 if (errno == ENOENT)
699 ereport(ERROR,
700 (errcode_for_file_access(),
701 errmsg("requested WAL segment %s has already been removed",
702 path)));
703 else
704 ereport(ERROR,
705 (errcode_for_file_access(),
706 errmsg("could not open file \"%s\": %m",
707 path)));
708 }
709 sendOff = 0;
710 sendTLI = tli;
711 }
712
713 /* Need to seek in the file? */
714 if (sendOff != startoff)
715 {
716 if (lseek(sendFile, (off_t) startoff, SEEK_SET) < 0)
717 {
718 char path[MAXPGPATH];
719 int save_errno = errno;
720
721 XLogFilePath(path, tli, sendSegNo);
722
723 errno = save_errno;
724 ereport(ERROR,
725 (errcode_for_file_access(),
726 errmsg("could not seek in log segment %s to offset %u: %m",
727 path, startoff)));
728 }
729 sendOff = startoff;
730 }
731
732 /* How many bytes are within this segment? */
733 if (nbytes > (XLogSegSize - startoff))
734 segbytes = XLogSegSize - startoff;
735 else
736 segbytes = nbytes;
737
738 pgstat_report_wait_start(WAIT_EVENT_WAL_READ);
739 readbytes = read(sendFile, p, segbytes);
740 pgstat_report_wait_end();
741 if (readbytes <= 0)
742 {
743 char path[MAXPGPATH];
744 int save_errno = errno;
745
746 XLogFilePath(path, tli, sendSegNo);
747
748 errno = save_errno;
749 ereport(ERROR,
750 (errcode_for_file_access(),
751 errmsg("could not read from log segment %s, offset %u, length %lu: %m",
752 path, sendOff, (unsigned long) segbytes)));
753 }
754
755 /* Update state for read */
756 recptr += readbytes;
757
758 sendOff += readbytes;
759 nbytes -= readbytes;
760 p += readbytes;
761 }
762 }
763
764 /*
765 * Determine which timeline to read an xlog page from and set the
766 * XLogReaderState's currTLI to that timeline ID.
767 *
768 * We care about timelines in xlogreader when we might be reading xlog
769 * generated prior to a promotion, either if we're currently a standby in
770 * recovery or if we're a promoted master reading xlogs generated by the old
771 * master before our promotion.
772 *
773 * wantPage must be set to the start address of the page to read and
774 * wantLength to the amount of the page that will be read, up to
775 * XLOG_BLCKSZ. If the amount to be read isn't known, pass XLOG_BLCKSZ.
776 *
777 * We switch to an xlog segment from the new timeline eagerly when on a
778 * historical timeline, as soon as we reach the start of the xlog segment
779 * containing the timeline switch. The server copied the segment to the new
780 * timeline so all the data up to the switch point is the same, but there's no
781 * guarantee the old segment will still exist. It may have been deleted or
782 * renamed with a .partial suffix so we can't necessarily keep reading from
783 * the old TLI even though tliSwitchPoint says it's OK.
784 *
785 * We can't just check the timeline when we read a page on a different segment
786 * to the last page. We could've received a timeline switch from a cascading
787 * upstream, so the current segment ends abruptly (possibly getting renamed to
788 * .partial) and we have to switch to a new one. Even in the middle of reading
789 * a page we could have to dump the cached page and switch to a new TLI.
790 *
791 * Because of this, callers MAY NOT assume that currTLI is the timeline that
792 * will be in a page's xlp_tli; the page may begin on an older timeline or we
793 * might be reading from historical timeline data on a segment that's been
794 * copied to a new timeline.
795 *
796 * The caller must also make sure it doesn't read past the current replay
797 * position (using GetWalRcvWriteRecPtr) if executing in recovery, so it
798 * doesn't fail to notice that the current timeline became historical. The
799 * caller must also update ThisTimeLineID with the result of
800 * GetWalRcvWriteRecPtr and must check RecoveryInProgress().
801 */
802 void
XLogReadDetermineTimeline(XLogReaderState * state,XLogRecPtr wantPage,uint32 wantLength)803 XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wantLength)
804 {
805 const XLogRecPtr lastReadPage = state->readSegNo * XLogSegSize + state->readOff;
806
807 Assert(wantPage != InvalidXLogRecPtr && wantPage % XLOG_BLCKSZ == 0);
808 Assert(wantLength <= XLOG_BLCKSZ);
809 Assert(state->readLen == 0 || state->readLen <= XLOG_BLCKSZ);
810
811 /*
812 * If the desired page is currently read in and valid, we have nothing to
813 * do.
814 *
815 * The caller should've ensured that it didn't previously advance readOff
816 * past the valid limit of this timeline, so it doesn't matter if the
817 * current TLI has since become historical.
818 */
819 if (lastReadPage == wantPage &&
820 state->readLen != 0 &&
821 lastReadPage + state->readLen >= wantPage + Min(wantLength, XLOG_BLCKSZ - 1))
822 return;
823
824 /*
825 * If we're reading from the current timeline, it hasn't become historical
826 * and the page we're reading is after the last page read, we can again
827 * just carry on. (Seeking backwards requires a check to make sure the
828 * older page isn't on a prior timeline).
829 *
830 * ThisTimeLineID might've become historical since we last looked, but the
831 * caller is required not to read past the flush limit it saw at the time
832 * it looked up the timeline. There's nothing we can do about it if
833 * StartupXLOG() renames it to .partial concurrently.
834 */
835 if (state->currTLI == ThisTimeLineID && wantPage >= lastReadPage)
836 {
837 Assert(state->currTLIValidUntil == InvalidXLogRecPtr);
838 return;
839 }
840
841 /*
842 * If we're just reading pages from a previously validated historical
843 * timeline and the timeline we're reading from is valid until the end of
844 * the current segment we can just keep reading.
845 */
846 if (state->currTLIValidUntil != InvalidXLogRecPtr &&
847 state->currTLI != ThisTimeLineID &&
848 state->currTLI != 0 &&
849 (wantPage + wantLength) / XLogSegSize < state->currTLIValidUntil / XLogSegSize)
850 return;
851
852 /*
853 * If we reach this point we're either looking up a page for random
854 * access, the current timeline just became historical, or we're reading
855 * from a new segment containing a timeline switch. In all cases we need
856 * to determine the newest timeline on the segment.
857 *
858 * If it's the current timeline we can just keep reading from here unless
859 * we detect a timeline switch that makes the current timeline historical.
860 * If it's a historical timeline we can read all the segment on the newest
861 * timeline because it contains all the old timelines' data too. So only
862 * one switch check is required.
863 */
864 {
865 /*
866 * We need to re-read the timeline history in case it's been changed
867 * by a promotion or replay from a cascaded replica.
868 */
869 List *timelineHistory = readTimeLineHistory(ThisTimeLineID);
870
871 XLogRecPtr endOfSegment = (((wantPage / XLogSegSize) + 1) * XLogSegSize) - 1;
872
873 Assert(wantPage / XLogSegSize == endOfSegment / XLogSegSize);
874
875 /*
876 * Find the timeline of the last LSN on the segment containing
877 * wantPage.
878 */
879 state->currTLI = tliOfPointInHistory(endOfSegment, timelineHistory);
880 state->currTLIValidUntil = tliSwitchPoint(state->currTLI, timelineHistory,
881 &state->nextTLI);
882
883 Assert(state->currTLIValidUntil == InvalidXLogRecPtr ||
884 wantPage + wantLength < state->currTLIValidUntil);
885
886 list_free_deep(timelineHistory);
887
888 elog(DEBUG3, "switched to timeline %u valid until %X/%X",
889 state->currTLI,
890 (uint32) (state->currTLIValidUntil >> 32),
891 (uint32) (state->currTLIValidUntil));
892 }
893 }
894
895 /*
896 * read_page callback for reading local xlog files
897 *
898 * Public because it would likely be very helpful for someone writing another
899 * output method outside walsender, e.g. in a bgworker.
900 *
901 * TODO: The walsender has its own version of this, but it relies on the
902 * walsender's latch being set whenever WAL is flushed. No such infrastructure
903 * exists for normal backends, so we have to do a check/sleep/repeat style of
904 * loop for now.
905 */
906 int
read_local_xlog_page(XLogReaderState * state,XLogRecPtr targetPagePtr,int reqLen,XLogRecPtr targetRecPtr,char * cur_page,TimeLineID * pageTLI)907 read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
908 int reqLen, XLogRecPtr targetRecPtr, char *cur_page,
909 TimeLineID *pageTLI)
910 {
911 XLogRecPtr read_upto,
912 loc;
913 int count;
914
915 loc = targetPagePtr + reqLen;
916
917 /* Loop waiting for xlog to be available if necessary */
918 while (1)
919 {
920 /*
921 * Determine the limit of xlog we can currently read to, and what the
922 * most recent timeline is.
923 *
924 * RecoveryInProgress() will update ThisTimeLineID when it first
925 * notices recovery finishes, so we only have to maintain it for the
926 * local process until recovery ends.
927 */
928 if (!RecoveryInProgress())
929 read_upto = GetFlushRecPtr();
930 else
931 read_upto = GetXLogReplayRecPtr(&ThisTimeLineID);
932
933 *pageTLI = ThisTimeLineID;
934
935 /*
936 * Check which timeline to get the record from.
937 *
938 * We have to do it each time through the loop because if we're in
939 * recovery as a cascading standby, the current timeline might've
940 * become historical. We can't rely on RecoveryInProgress() because in
941 * a standby configuration like
942 *
943 * A => B => C
944 *
945 * if we're a logical decoding session on C, and B gets promoted, our
946 * timeline will change while we remain in recovery.
947 *
948 * We can't just keep reading from the old timeline as the last WAL
949 * archive in the timeline will get renamed to .partial by
950 * StartupXLOG().
951 *
952 * If that happens after our caller updated ThisTimeLineID but before
953 * we actually read the xlog page, we might still try to read from the
954 * old (now renamed) segment and fail. There's not much we can do
955 * about this, but it can only happen when we're a leaf of a cascading
956 * standby whose master gets promoted while we're decoding, so a
957 * one-off ERROR isn't too bad.
958 */
959 XLogReadDetermineTimeline(state, targetPagePtr, reqLen);
960
961 if (state->currTLI == ThisTimeLineID)
962 {
963
964 if (loc <= read_upto)
965 break;
966
967 CHECK_FOR_INTERRUPTS();
968 pg_usleep(1000L);
969 }
970 else
971 {
972 /*
973 * We're on a historical timeline, so limit reading to the switch
974 * point where we moved to the next timeline.
975 *
976 * We don't need to GetFlushRecPtr or GetXLogReplayRecPtr. We know
977 * about the new timeline, so we must've received past the end of
978 * it.
979 */
980 read_upto = state->currTLIValidUntil;
981
982 /*
983 * Setting pageTLI to our wanted record's TLI is slightly wrong;
984 * the page might begin on an older timeline if it contains a
985 * timeline switch, since its xlog segment will have been copied
986 * from the prior timeline. This is pretty harmless though, as
987 * nothing cares so long as the timeline doesn't go backwards. We
988 * should read the page header instead; FIXME someday.
989 */
990 *pageTLI = state->currTLI;
991
992 /* No need to wait on a historical timeline */
993 break;
994 }
995 }
996
997 if (targetPagePtr + XLOG_BLCKSZ <= read_upto)
998 {
999 /*
1000 * more than one block available; read only that block, have caller
1001 * come back if they need more.
1002 */
1003 count = XLOG_BLCKSZ;
1004 }
1005 else if (targetPagePtr + reqLen > read_upto)
1006 {
1007 /* not enough data there */
1008 return -1;
1009 }
1010 else
1011 {
1012 /* enough bytes available to satisfy the request */
1013 count = read_upto - targetPagePtr;
1014 }
1015
1016 /*
1017 * Even though we just determined how much of the page can be validly read
1018 * as 'count', read the whole page anyway. It's guaranteed to be
1019 * zero-padded up to the page boundary if it's incomplete.
1020 */
1021 XLogRead(cur_page, *pageTLI, targetPagePtr, XLOG_BLCKSZ);
1022
1023 /* number of valid bytes in the buffer */
1024 return count;
1025 }
1026