1 /*-------------------------------------------------------------------------
2 *
3 * xlogreader.c
4 * Generic XLog reading facility
5 *
6 * Portions Copyright (c) 2013-2021, PostgreSQL Global Development Group
7 *
8 * IDENTIFICATION
9 * src/backend/access/transam/xlogreader.c
10 *
11 * NOTES
12 * See xlogreader.h for more notes on this facility.
13 *
14 * This file is compiled as both front-end and backend code, so it
15 * may not use ereport, server-defined static variables, etc.
16 *-------------------------------------------------------------------------
17 */
18 #include "postgres.h"
19
20 #include <unistd.h>
21
22 #include "access/transam.h"
23 #include "access/xlog_internal.h"
24 #include "access/xlogreader.h"
25 #include "access/xlogrecord.h"
26 #include "catalog/pg_control.h"
27 #include "common/pg_lzcompress.h"
28 #include "replication/origin.h"
29
30 #ifndef FRONTEND
31 #include "miscadmin.h"
32 #include "pgstat.h"
33 #include "utils/memutils.h"
34 #endif
35
/*
 * Prototypes for the file-local (static) helpers used in this file, so
 * that they can be referenced before their definitions appear.
 *
 * report_invalid_record() takes a printf-style format string; the
 * pg_attribute_printf annotation lets the compiler check the variadic
 * arguments against the format.
 */
static void report_invalid_record(XLogReaderState *state, const char *fmt,...)
			pg_attribute_printf(2, 3);
static bool allocate_recordbuf(XLogReaderState *state, uint32 reclength);
static int	ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr,
							 int reqLen);
static void XLogReaderInvalReadState(XLogReaderState *state);
static bool ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr,
								  XLogRecPtr PrevRecPtr, XLogRecord *record, bool randAccess);
static bool ValidXLogRecord(XLogReaderState *state, XLogRecord *record,
							XLogRecPtr recptr);
static void ResetDecoder(XLogReaderState *state);
static void WALOpenSegmentInit(WALOpenSegment *seg, WALSegmentContext *segcxt,
							   int segsize, const char *waldir);

/*
 * Size limit applied when formatting a message into state->errormsg_buf.
 * The buffer itself is allocated with one extra byte (MAX_ERRORMSG_LEN + 1)
 * in XLogReaderAllocate().
 */
#define MAX_ERRORMSG_LEN 1000
52
53 /*
54 * Construct a string in state->errormsg_buf explaining what's wrong with
55 * the current record being read.
56 */
57 static void
report_invalid_record(XLogReaderState * state,const char * fmt,...)58 report_invalid_record(XLogReaderState *state, const char *fmt,...)
59 {
60 va_list args;
61
62 fmt = _(fmt);
63
64 va_start(args, fmt);
65 vsnprintf(state->errormsg_buf, MAX_ERRORMSG_LEN, fmt, args);
66 va_end(args);
67 }
68
69 /*
70 * Allocate and initialize a new XLogReader.
71 *
72 * Returns NULL if the xlogreader couldn't be allocated.
73 */
74 XLogReaderState *
XLogReaderAllocate(int wal_segment_size,const char * waldir,XLogReaderRoutine * routine,void * private_data)75 XLogReaderAllocate(int wal_segment_size, const char *waldir,
76 XLogReaderRoutine *routine, void *private_data)
77 {
78 XLogReaderState *state;
79
80 state = (XLogReaderState *)
81 palloc_extended(sizeof(XLogReaderState),
82 MCXT_ALLOC_NO_OOM | MCXT_ALLOC_ZERO);
83 if (!state)
84 return NULL;
85
86 /* initialize caller-provided support functions */
87 state->routine = *routine;
88
89 state->max_block_id = -1;
90
91 /*
92 * Permanently allocate readBuf. We do it this way, rather than just
93 * making a static array, for two reasons: (1) no need to waste the
94 * storage in most instantiations of the backend; (2) a static char array
95 * isn't guaranteed to have any particular alignment, whereas
96 * palloc_extended() will provide MAXALIGN'd storage.
97 */
98 state->readBuf = (char *) palloc_extended(XLOG_BLCKSZ,
99 MCXT_ALLOC_NO_OOM);
100 if (!state->readBuf)
101 {
102 pfree(state);
103 return NULL;
104 }
105
106 /* Initialize segment info. */
107 WALOpenSegmentInit(&state->seg, &state->segcxt, wal_segment_size,
108 waldir);
109
110 /* system_identifier initialized to zeroes above */
111 state->private_data = private_data;
112 /* ReadRecPtr, EndRecPtr and readLen initialized to zeroes above */
113 state->errormsg_buf = palloc_extended(MAX_ERRORMSG_LEN + 1,
114 MCXT_ALLOC_NO_OOM);
115 if (!state->errormsg_buf)
116 {
117 pfree(state->readBuf);
118 pfree(state);
119 return NULL;
120 }
121 state->errormsg_buf[0] = '\0';
122
123 /*
124 * Allocate an initial readRecordBuf of minimal size, which can later be
125 * enlarged if necessary.
126 */
127 if (!allocate_recordbuf(state, 0))
128 {
129 pfree(state->errormsg_buf);
130 pfree(state->readBuf);
131 pfree(state);
132 return NULL;
133 }
134
135 return state;
136 }
137
138 void
XLogReaderFree(XLogReaderState * state)139 XLogReaderFree(XLogReaderState *state)
140 {
141 int block_id;
142
143 if (state->seg.ws_file != -1)
144 state->routine.segment_close(state);
145
146 for (block_id = 0; block_id <= XLR_MAX_BLOCK_ID; block_id++)
147 {
148 if (state->blocks[block_id].data)
149 pfree(state->blocks[block_id].data);
150 }
151 if (state->main_data)
152 pfree(state->main_data);
153
154 pfree(state->errormsg_buf);
155 if (state->readRecordBuf)
156 pfree(state->readRecordBuf);
157 pfree(state->readBuf);
158 pfree(state);
159 }
160
161 /*
162 * Allocate readRecordBuf to fit a record of at least the given length.
163 * Returns true if successful, false if out of memory.
164 *
165 * readRecordBufSize is set to the new buffer size.
166 *
167 * To avoid useless small increases, round its size to a multiple of
168 * XLOG_BLCKSZ, and make sure it's at least 5*Max(BLCKSZ, XLOG_BLCKSZ) to start
169 * with. (That is enough for all "normal" records, but very large commit or
170 * abort records might need more space.)
171 */
172 static bool
allocate_recordbuf(XLogReaderState * state,uint32 reclength)173 allocate_recordbuf(XLogReaderState *state, uint32 reclength)
174 {
175 uint32 newSize = reclength;
176
177 newSize += XLOG_BLCKSZ - (newSize % XLOG_BLCKSZ);
178 newSize = Max(newSize, 5 * Max(BLCKSZ, XLOG_BLCKSZ));
179
180 #ifndef FRONTEND
181
182 /*
183 * Note that in much unlucky circumstances, the random data read from a
184 * recycled segment can cause this routine to be called with a size
185 * causing a hard failure at allocation. For a standby, this would cause
186 * the instance to stop suddenly with a hard failure, preventing it to
187 * retry fetching WAL from one of its sources which could allow it to move
188 * on with replay without a manual restart. If the data comes from a past
189 * recycled segment and is still valid, then the allocation may succeed
190 * but record checks are going to fail so this would be short-lived. If
191 * the allocation fails because of a memory shortage, then this is not a
192 * hard failure either per the guarantee given by MCXT_ALLOC_NO_OOM.
193 */
194 if (!AllocSizeIsValid(newSize))
195 return false;
196
197 #endif
198
199 if (state->readRecordBuf)
200 pfree(state->readRecordBuf);
201 state->readRecordBuf =
202 (char *) palloc_extended(newSize, MCXT_ALLOC_NO_OOM);
203 if (state->readRecordBuf == NULL)
204 {
205 state->readRecordBufSize = 0;
206 return false;
207 }
208 state->readRecordBufSize = newSize;
209 return true;
210 }
211
212 /*
213 * Initialize the passed segment structs.
214 */
215 static void
WALOpenSegmentInit(WALOpenSegment * seg,WALSegmentContext * segcxt,int segsize,const char * waldir)216 WALOpenSegmentInit(WALOpenSegment *seg, WALSegmentContext *segcxt,
217 int segsize, const char *waldir)
218 {
219 seg->ws_file = -1;
220 seg->ws_segno = 0;
221 seg->ws_tli = 0;
222
223 segcxt->ws_segsize = segsize;
224 if (waldir)
225 snprintf(segcxt->ws_dir, MAXPGPATH, "%s", waldir);
226 }
227
228 /*
229 * Begin reading WAL at 'RecPtr'.
230 *
231 * 'RecPtr' should point to the beginnning of a valid WAL record. Pointing at
232 * the beginning of a page is also OK, if there is a new record right after
233 * the page header, i.e. not a continuation.
234 *
235 * This does not make any attempt to read the WAL yet, and hence cannot fail.
236 * If the starting address is not correct, the first call to XLogReadRecord()
237 * will error out.
238 */
239 void
XLogBeginRead(XLogReaderState * state,XLogRecPtr RecPtr)240 XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr)
241 {
242 Assert(!XLogRecPtrIsInvalid(RecPtr));
243
244 ResetDecoder(state);
245
246 /* Begin at the passed-in record pointer. */
247 state->EndRecPtr = RecPtr;
248 state->ReadRecPtr = InvalidXLogRecPtr;
249 }
250
251 /*
252 * Attempt to read an XLOG record.
253 *
254 * XLogBeginRead() or XLogFindNextRecord() must be called before the first call
255 * to XLogReadRecord().
256 *
257 * If the page_read callback fails to read the requested data, NULL is
258 * returned. The callback is expected to have reported the error; errormsg
259 * is set to NULL.
260 *
261 * If the reading fails for some other reason, NULL is also returned, and
262 * *errormsg is set to a string with details of the failure.
263 *
264 * The returned pointer (or *errormsg) points to an internal buffer that's
265 * valid until the next call to XLogReadRecord.
266 */
267 XLogRecord *
XLogReadRecord(XLogReaderState * state,char ** errormsg)268 XLogReadRecord(XLogReaderState *state, char **errormsg)
269 {
270 XLogRecPtr RecPtr;
271 XLogRecord *record;
272 XLogRecPtr targetPagePtr;
273 bool randAccess;
274 uint32 len,
275 total_len;
276 uint32 targetRecOff;
277 uint32 pageHeaderSize;
278 bool assembled;
279 bool gotheader;
280 int readOff;
281
282 /*
283 * randAccess indicates whether to verify the previous-record pointer of
284 * the record we're reading. We only do this if we're reading
285 * sequentially, which is what we initially assume.
286 */
287 randAccess = false;
288
289 /* reset error state */
290 *errormsg = NULL;
291 state->errormsg_buf[0] = '\0';
292
293 ResetDecoder(state);
294 state->abortedRecPtr = InvalidXLogRecPtr;
295 state->missingContrecPtr = InvalidXLogRecPtr;
296
297 RecPtr = state->EndRecPtr;
298
299 if (state->ReadRecPtr != InvalidXLogRecPtr)
300 {
301 /* read the record after the one we just read */
302
303 /*
304 * EndRecPtr is pointing to end+1 of the previous WAL record. If
305 * we're at a page boundary, no more records can fit on the current
306 * page. We must skip over the page header, but we can't do that until
307 * we've read in the page, since the header size is variable.
308 */
309 }
310 else
311 {
312 /*
313 * Caller supplied a position to start at.
314 *
315 * In this case, EndRecPtr should already be pointing to a valid
316 * record starting position.
317 */
318 Assert(XRecOffIsValid(RecPtr));
319 randAccess = true;
320 }
321
322 restart:
323 state->currRecPtr = RecPtr;
324 assembled = false;
325
326 targetPagePtr = RecPtr - (RecPtr % XLOG_BLCKSZ);
327 targetRecOff = RecPtr % XLOG_BLCKSZ;
328
329 /*
330 * Read the page containing the record into state->readBuf. Request enough
331 * byte to cover the whole record header, or at least the part of it that
332 * fits on the same page.
333 */
334 readOff = ReadPageInternal(state, targetPagePtr,
335 Min(targetRecOff + SizeOfXLogRecord, XLOG_BLCKSZ));
336 if (readOff < 0)
337 goto err;
338
339 /*
340 * ReadPageInternal always returns at least the page header, so we can
341 * examine it now.
342 */
343 pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) state->readBuf);
344 if (targetRecOff == 0)
345 {
346 /*
347 * At page start, so skip over page header.
348 */
349 RecPtr += pageHeaderSize;
350 targetRecOff = pageHeaderSize;
351 }
352 else if (targetRecOff < pageHeaderSize)
353 {
354 report_invalid_record(state, "invalid record offset at %X/%X",
355 LSN_FORMAT_ARGS(RecPtr));
356 goto err;
357 }
358
359 if ((((XLogPageHeader) state->readBuf)->xlp_info & XLP_FIRST_IS_CONTRECORD) &&
360 targetRecOff == pageHeaderSize)
361 {
362 report_invalid_record(state, "contrecord is requested by %X/%X",
363 LSN_FORMAT_ARGS(RecPtr));
364 goto err;
365 }
366
367 /* ReadPageInternal has verified the page header */
368 Assert(pageHeaderSize <= readOff);
369
370 /*
371 * Read the record length.
372 *
373 * NB: Even though we use an XLogRecord pointer here, the whole record
374 * header might not fit on this page. xl_tot_len is the first field of the
375 * struct, so it must be on this page (the records are MAXALIGNed), but we
376 * cannot access any other fields until we've verified that we got the
377 * whole header.
378 */
379 record = (XLogRecord *) (state->readBuf + RecPtr % XLOG_BLCKSZ);
380 total_len = record->xl_tot_len;
381
382 /*
383 * If the whole record header is on this page, validate it immediately.
384 * Otherwise do just a basic sanity check on xl_tot_len, and validate the
385 * rest of the header after reading it from the next page. The xl_tot_len
386 * check is necessary here to ensure that we enter the "Need to reassemble
387 * record" code path below; otherwise we might fail to apply
388 * ValidXLogRecordHeader at all.
389 */
390 if (targetRecOff <= XLOG_BLCKSZ - SizeOfXLogRecord)
391 {
392 if (!ValidXLogRecordHeader(state, RecPtr, state->ReadRecPtr, record,
393 randAccess))
394 goto err;
395 gotheader = true;
396 }
397 else
398 {
399 /* XXX: more validation should be done here */
400 if (total_len < SizeOfXLogRecord)
401 {
402 report_invalid_record(state,
403 "invalid record length at %X/%X: wanted %u, got %u",
404 LSN_FORMAT_ARGS(RecPtr),
405 (uint32) SizeOfXLogRecord, total_len);
406 goto err;
407 }
408 gotheader = false;
409 }
410
411 len = XLOG_BLCKSZ - RecPtr % XLOG_BLCKSZ;
412 if (total_len > len)
413 {
414 /* Need to reassemble record */
415 char *contdata;
416 XLogPageHeader pageHeader;
417 char *buffer;
418 uint32 gotlen;
419
420 assembled = true;
421
422 /*
423 * Enlarge readRecordBuf as needed.
424 */
425 if (total_len > state->readRecordBufSize &&
426 !allocate_recordbuf(state, total_len))
427 {
428 /* We treat this as a "bogus data" condition */
429 report_invalid_record(state, "record length %u at %X/%X too long",
430 total_len, LSN_FORMAT_ARGS(RecPtr));
431 goto err;
432 }
433
434 /* Copy the first fragment of the record from the first page. */
435 memcpy(state->readRecordBuf,
436 state->readBuf + RecPtr % XLOG_BLCKSZ, len);
437 buffer = state->readRecordBuf + len;
438 gotlen = len;
439
440 do
441 {
442 /* Calculate pointer to beginning of next page */
443 targetPagePtr += XLOG_BLCKSZ;
444
445 /* Wait for the next page to become available */
446 readOff = ReadPageInternal(state, targetPagePtr,
447 Min(total_len - gotlen + SizeOfXLogShortPHD,
448 XLOG_BLCKSZ));
449
450 if (readOff < 0)
451 goto err;
452
453 Assert(SizeOfXLogShortPHD <= readOff);
454
455 pageHeader = (XLogPageHeader) state->readBuf;
456
457 /*
458 * If we were expecting a continuation record and got an
459 * "overwrite contrecord" flag, that means the continuation record
460 * was overwritten with a different record. Restart the read by
461 * assuming the address to read is the location where we found
462 * this flag; but keep track of the LSN of the record we were
463 * reading, for later verification.
464 */
465 if (pageHeader->xlp_info & XLP_FIRST_IS_OVERWRITE_CONTRECORD)
466 {
467 state->overwrittenRecPtr = state->currRecPtr;
468 ResetDecoder(state);
469 RecPtr = targetPagePtr;
470 goto restart;
471 }
472
473 /* Check that the continuation on next page looks valid */
474 if (!(pageHeader->xlp_info & XLP_FIRST_IS_CONTRECORD))
475 {
476 report_invalid_record(state,
477 "there is no contrecord flag at %X/%X",
478 LSN_FORMAT_ARGS(RecPtr));
479 goto err;
480 }
481
482 /*
483 * Cross-check that xlp_rem_len agrees with how much of the record
484 * we expect there to be left.
485 */
486 if (pageHeader->xlp_rem_len == 0 ||
487 total_len != (pageHeader->xlp_rem_len + gotlen))
488 {
489 report_invalid_record(state,
490 "invalid contrecord length %u (expected %lld) at %X/%X",
491 pageHeader->xlp_rem_len,
492 ((long long) total_len) - gotlen,
493 LSN_FORMAT_ARGS(RecPtr));
494 goto err;
495 }
496
497 /* Append the continuation from this page to the buffer */
498 pageHeaderSize = XLogPageHeaderSize(pageHeader);
499
500 if (readOff < pageHeaderSize)
501 readOff = ReadPageInternal(state, targetPagePtr,
502 pageHeaderSize);
503
504 Assert(pageHeaderSize <= readOff);
505
506 contdata = (char *) state->readBuf + pageHeaderSize;
507 len = XLOG_BLCKSZ - pageHeaderSize;
508 if (pageHeader->xlp_rem_len < len)
509 len = pageHeader->xlp_rem_len;
510
511 if (readOff < pageHeaderSize + len)
512 readOff = ReadPageInternal(state, targetPagePtr,
513 pageHeaderSize + len);
514
515 memcpy(buffer, (char *) contdata, len);
516 buffer += len;
517 gotlen += len;
518
519 /* If we just reassembled the record header, validate it. */
520 if (!gotheader)
521 {
522 record = (XLogRecord *) state->readRecordBuf;
523 if (!ValidXLogRecordHeader(state, RecPtr, state->ReadRecPtr,
524 record, randAccess))
525 goto err;
526 gotheader = true;
527 }
528 } while (gotlen < total_len);
529
530 Assert(gotheader);
531
532 record = (XLogRecord *) state->readRecordBuf;
533 if (!ValidXLogRecord(state, record, RecPtr))
534 goto err;
535
536 pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) state->readBuf);
537 state->ReadRecPtr = RecPtr;
538 state->EndRecPtr = targetPagePtr + pageHeaderSize
539 + MAXALIGN(pageHeader->xlp_rem_len);
540 }
541 else
542 {
543 /* Wait for the record data to become available */
544 readOff = ReadPageInternal(state, targetPagePtr,
545 Min(targetRecOff + total_len, XLOG_BLCKSZ));
546 if (readOff < 0)
547 goto err;
548
549 /* Record does not cross a page boundary */
550 if (!ValidXLogRecord(state, record, RecPtr))
551 goto err;
552
553 state->EndRecPtr = RecPtr + MAXALIGN(total_len);
554
555 state->ReadRecPtr = RecPtr;
556 }
557
558 /*
559 * Special processing if it's an XLOG SWITCH record
560 */
561 if (record->xl_rmid == RM_XLOG_ID &&
562 (record->xl_info & ~XLR_INFO_MASK) == XLOG_SWITCH)
563 {
564 /* Pretend it extends to end of segment */
565 state->EndRecPtr += state->segcxt.ws_segsize - 1;
566 state->EndRecPtr -= XLogSegmentOffset(state->EndRecPtr, state->segcxt.ws_segsize);
567 }
568
569 if (DecodeXLogRecord(state, record, errormsg))
570 return record;
571 else
572 return NULL;
573
574 err:
575 if (assembled)
576 {
577 /*
578 * We get here when a record that spans multiple pages needs to be
579 * assembled, but something went wrong -- perhaps a contrecord piece
580 * was lost. If caller is WAL replay, it will know where the aborted
581 * record was and where to direct followup WAL to be written, marking
582 * the next piece with XLP_FIRST_IS_OVERWRITE_CONTRECORD, which will
583 * in turn signal downstream WAL consumers that the broken WAL record
584 * is to be ignored.
585 */
586 state->abortedRecPtr = RecPtr;
587 state->missingContrecPtr = targetPagePtr;
588 }
589
590 /*
591 * Invalidate the read state. We might read from a different source after
592 * failure.
593 */
594 XLogReaderInvalReadState(state);
595
596 if (state->errormsg_buf[0] != '\0')
597 *errormsg = state->errormsg_buf;
598
599 return NULL;
600 }
601
602 /*
603 * Read a single xlog page including at least [pageptr, reqLen] of valid data
604 * via the page_read() callback.
605 *
606 * Returns -1 if the required page cannot be read for some reason; errormsg_buf
607 * is set in that case (unless the error occurs in the page_read callback).
608 *
609 * We fetch the page from a reader-local cache if we know we have the required
610 * data and if there hasn't been any error since caching the data.
611 */
612 static int
ReadPageInternal(XLogReaderState * state,XLogRecPtr pageptr,int reqLen)613 ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
614 {
615 int readLen;
616 uint32 targetPageOff;
617 XLogSegNo targetSegNo;
618 XLogPageHeader hdr;
619
620 Assert((pageptr % XLOG_BLCKSZ) == 0);
621
622 XLByteToSeg(pageptr, targetSegNo, state->segcxt.ws_segsize);
623 targetPageOff = XLogSegmentOffset(pageptr, state->segcxt.ws_segsize);
624
625 /* check whether we have all the requested data already */
626 if (targetSegNo == state->seg.ws_segno &&
627 targetPageOff == state->segoff && reqLen <= state->readLen)
628 return state->readLen;
629
630 /*
631 * Data is not in our buffer.
632 *
633 * Every time we actually read the segment, even if we looked at parts of
634 * it before, we need to do verification as the page_read callback might
635 * now be rereading data from a different source.
636 *
637 * Whenever switching to a new WAL segment, we read the first page of the
638 * file and validate its header, even if that's not where the target
639 * record is. This is so that we can check the additional identification
640 * info that is present in the first page's "long" header.
641 */
642 if (targetSegNo != state->seg.ws_segno && targetPageOff != 0)
643 {
644 XLogRecPtr targetSegmentPtr = pageptr - targetPageOff;
645
646 readLen = state->routine.page_read(state, targetSegmentPtr, XLOG_BLCKSZ,
647 state->currRecPtr,
648 state->readBuf);
649 if (readLen < 0)
650 goto err;
651
652 /* we can be sure to have enough WAL available, we scrolled back */
653 Assert(readLen == XLOG_BLCKSZ);
654
655 if (!XLogReaderValidatePageHeader(state, targetSegmentPtr,
656 state->readBuf))
657 goto err;
658 }
659
660 /*
661 * First, read the requested data length, but at least a short page header
662 * so that we can validate it.
663 */
664 readLen = state->routine.page_read(state, pageptr, Max(reqLen, SizeOfXLogShortPHD),
665 state->currRecPtr,
666 state->readBuf);
667 if (readLen < 0)
668 goto err;
669
670 Assert(readLen <= XLOG_BLCKSZ);
671
672 /* Do we have enough data to check the header length? */
673 if (readLen <= SizeOfXLogShortPHD)
674 goto err;
675
676 Assert(readLen >= reqLen);
677
678 hdr = (XLogPageHeader) state->readBuf;
679
680 /* still not enough */
681 if (readLen < XLogPageHeaderSize(hdr))
682 {
683 readLen = state->routine.page_read(state, pageptr, XLogPageHeaderSize(hdr),
684 state->currRecPtr,
685 state->readBuf);
686 if (readLen < 0)
687 goto err;
688 }
689
690 /*
691 * Now that we know we have the full header, validate it.
692 */
693 if (!XLogReaderValidatePageHeader(state, pageptr, (char *) hdr))
694 goto err;
695
696 /* update read state information */
697 state->seg.ws_segno = targetSegNo;
698 state->segoff = targetPageOff;
699 state->readLen = readLen;
700
701 return readLen;
702
703 err:
704 XLogReaderInvalReadState(state);
705 return -1;
706 }
707
708 /*
709 * Invalidate the xlogreader's read state to force a re-read.
710 */
711 static void
XLogReaderInvalReadState(XLogReaderState * state)712 XLogReaderInvalReadState(XLogReaderState *state)
713 {
714 state->seg.ws_segno = 0;
715 state->segoff = 0;
716 state->readLen = 0;
717 }
718
719 /*
720 * Validate an XLOG record header.
721 *
722 * This is just a convenience subroutine to avoid duplicated code in
723 * XLogReadRecord. It's not intended for use from anywhere else.
724 */
725 static bool
ValidXLogRecordHeader(XLogReaderState * state,XLogRecPtr RecPtr,XLogRecPtr PrevRecPtr,XLogRecord * record,bool randAccess)726 ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr,
727 XLogRecPtr PrevRecPtr, XLogRecord *record,
728 bool randAccess)
729 {
730 if (record->xl_tot_len < SizeOfXLogRecord)
731 {
732 report_invalid_record(state,
733 "invalid record length at %X/%X: wanted %u, got %u",
734 LSN_FORMAT_ARGS(RecPtr),
735 (uint32) SizeOfXLogRecord, record->xl_tot_len);
736 return false;
737 }
738 if (record->xl_rmid > RM_MAX_ID)
739 {
740 report_invalid_record(state,
741 "invalid resource manager ID %u at %X/%X",
742 record->xl_rmid, LSN_FORMAT_ARGS(RecPtr));
743 return false;
744 }
745 if (randAccess)
746 {
747 /*
748 * We can't exactly verify the prev-link, but surely it should be less
749 * than the record's own address.
750 */
751 if (!(record->xl_prev < RecPtr))
752 {
753 report_invalid_record(state,
754 "record with incorrect prev-link %X/%X at %X/%X",
755 LSN_FORMAT_ARGS(record->xl_prev),
756 LSN_FORMAT_ARGS(RecPtr));
757 return false;
758 }
759 }
760 else
761 {
762 /*
763 * Record's prev-link should exactly match our previous location. This
764 * check guards against torn WAL pages where a stale but valid-looking
765 * WAL record starts on a sector boundary.
766 */
767 if (record->xl_prev != PrevRecPtr)
768 {
769 report_invalid_record(state,
770 "record with incorrect prev-link %X/%X at %X/%X",
771 LSN_FORMAT_ARGS(record->xl_prev),
772 LSN_FORMAT_ARGS(RecPtr));
773 return false;
774 }
775 }
776
777 return true;
778 }
779
780
781 /*
782 * CRC-check an XLOG record. We do not believe the contents of an XLOG
783 * record (other than to the minimal extent of computing the amount of
784 * data to read in) until we've checked the CRCs.
785 *
786 * We assume all of the record (that is, xl_tot_len bytes) has been read
787 * into memory at *record. Also, ValidXLogRecordHeader() has accepted the
788 * record's header, which means in particular that xl_tot_len is at least
789 * SizeOfXLogRecord.
790 */
791 static bool
ValidXLogRecord(XLogReaderState * state,XLogRecord * record,XLogRecPtr recptr)792 ValidXLogRecord(XLogReaderState *state, XLogRecord *record, XLogRecPtr recptr)
793 {
794 pg_crc32c crc;
795
796 /* Calculate the CRC */
797 INIT_CRC32C(crc);
798 COMP_CRC32C(crc, ((char *) record) + SizeOfXLogRecord, record->xl_tot_len - SizeOfXLogRecord);
799 /* include the record header last */
800 COMP_CRC32C(crc, (char *) record, offsetof(XLogRecord, xl_crc));
801 FIN_CRC32C(crc);
802
803 if (!EQ_CRC32C(record->xl_crc, crc))
804 {
805 report_invalid_record(state,
806 "incorrect resource manager data checksum in record at %X/%X",
807 LSN_FORMAT_ARGS(recptr));
808 return false;
809 }
810
811 return true;
812 }
813
814 /*
815 * Validate a page header.
816 *
817 * Check if 'phdr' is valid as the header of the XLog page at position
818 * 'recptr'.
819 */
820 bool
XLogReaderValidatePageHeader(XLogReaderState * state,XLogRecPtr recptr,char * phdr)821 XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr,
822 char *phdr)
823 {
824 XLogRecPtr recaddr;
825 XLogSegNo segno;
826 int32 offset;
827 XLogPageHeader hdr = (XLogPageHeader) phdr;
828
829 Assert((recptr % XLOG_BLCKSZ) == 0);
830
831 XLByteToSeg(recptr, segno, state->segcxt.ws_segsize);
832 offset = XLogSegmentOffset(recptr, state->segcxt.ws_segsize);
833
834 XLogSegNoOffsetToRecPtr(segno, offset, state->segcxt.ws_segsize, recaddr);
835
836 if (hdr->xlp_magic != XLOG_PAGE_MAGIC)
837 {
838 char fname[MAXFNAMELEN];
839
840 XLogFileName(fname, state->seg.ws_tli, segno, state->segcxt.ws_segsize);
841
842 report_invalid_record(state,
843 "invalid magic number %04X in log segment %s, offset %u",
844 hdr->xlp_magic,
845 fname,
846 offset);
847 return false;
848 }
849
850 if ((hdr->xlp_info & ~XLP_ALL_FLAGS) != 0)
851 {
852 char fname[MAXFNAMELEN];
853
854 XLogFileName(fname, state->seg.ws_tli, segno, state->segcxt.ws_segsize);
855
856 report_invalid_record(state,
857 "invalid info bits %04X in log segment %s, offset %u",
858 hdr->xlp_info,
859 fname,
860 offset);
861 return false;
862 }
863
864 if (hdr->xlp_info & XLP_LONG_HEADER)
865 {
866 XLogLongPageHeader longhdr = (XLogLongPageHeader) hdr;
867
868 if (state->system_identifier &&
869 longhdr->xlp_sysid != state->system_identifier)
870 {
871 report_invalid_record(state,
872 "WAL file is from different database system: WAL file database system identifier is %llu, pg_control database system identifier is %llu",
873 (unsigned long long) longhdr->xlp_sysid,
874 (unsigned long long) state->system_identifier);
875 return false;
876 }
877 else if (longhdr->xlp_seg_size != state->segcxt.ws_segsize)
878 {
879 report_invalid_record(state,
880 "WAL file is from different database system: incorrect segment size in page header");
881 return false;
882 }
883 else if (longhdr->xlp_xlog_blcksz != XLOG_BLCKSZ)
884 {
885 report_invalid_record(state,
886 "WAL file is from different database system: incorrect XLOG_BLCKSZ in page header");
887 return false;
888 }
889 }
890 else if (offset == 0)
891 {
892 char fname[MAXFNAMELEN];
893
894 XLogFileName(fname, state->seg.ws_tli, segno, state->segcxt.ws_segsize);
895
896 /* hmm, first page of file doesn't have a long header? */
897 report_invalid_record(state,
898 "invalid info bits %04X in log segment %s, offset %u",
899 hdr->xlp_info,
900 fname,
901 offset);
902 return false;
903 }
904
905 /*
906 * Check that the address on the page agrees with what we expected. This
907 * check typically fails when an old WAL segment is recycled, and hasn't
908 * yet been overwritten with new data yet.
909 */
910 if (hdr->xlp_pageaddr != recaddr)
911 {
912 char fname[MAXFNAMELEN];
913
914 XLogFileName(fname, state->seg.ws_tli, segno, state->segcxt.ws_segsize);
915
916 report_invalid_record(state,
917 "unexpected pageaddr %X/%X in log segment %s, offset %u",
918 LSN_FORMAT_ARGS(hdr->xlp_pageaddr),
919 fname,
920 offset);
921 return false;
922 }
923
924 /*
925 * Since child timelines are always assigned a TLI greater than their
926 * immediate parent's TLI, we should never see TLI go backwards across
927 * successive pages of a consistent WAL sequence.
928 *
929 * Sometimes we re-read a segment that's already been (partially) read. So
930 * we only verify TLIs for pages that are later than the last remembered
931 * LSN.
932 */
933 if (recptr > state->latestPagePtr)
934 {
935 if (hdr->xlp_tli < state->latestPageTLI)
936 {
937 char fname[MAXFNAMELEN];
938
939 XLogFileName(fname, state->seg.ws_tli, segno, state->segcxt.ws_segsize);
940
941 report_invalid_record(state,
942 "out-of-sequence timeline ID %u (after %u) in log segment %s, offset %u",
943 hdr->xlp_tli,
944 state->latestPageTLI,
945 fname,
946 offset);
947 return false;
948 }
949 }
950 state->latestPagePtr = recptr;
951 state->latestPageTLI = hdr->xlp_tli;
952
953 return true;
954 }
955
956 #ifdef FRONTEND
957 /*
958 * Functions that are currently not needed in the backend, but are better
959 * implemented inside xlogreader.c because of the internal facilities available
960 * here.
961 */
962
963 /*
964 * Find the first record with an lsn >= RecPtr.
965 *
966 * This is different from XLogBeginRead() in that RecPtr doesn't need to point
967 * to a valid record boundary. Useful for checking whether RecPtr is a valid
968 * xlog address for reading, and to find the first valid address after some
969 * address when dumping records for debugging purposes.
970 *
971 * This positions the reader, like XLogBeginRead(), so that the next call to
972 * XLogReadRecord() will read the next valid record.
973 */
974 XLogRecPtr
XLogFindNextRecord(XLogReaderState * state,XLogRecPtr RecPtr)975 XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr)
976 {
977 XLogRecPtr tmpRecPtr;
978 XLogRecPtr found = InvalidXLogRecPtr;
979 XLogPageHeader header;
980 char *errormsg;
981
982 Assert(!XLogRecPtrIsInvalid(RecPtr));
983
984 /*
985 * skip over potential continuation data, keeping in mind that it may span
986 * multiple pages
987 */
988 tmpRecPtr = RecPtr;
989 while (true)
990 {
991 XLogRecPtr targetPagePtr;
992 int targetRecOff;
993 uint32 pageHeaderSize;
994 int readLen;
995
996 /*
997 * Compute targetRecOff. It should typically be equal or greater than
998 * short page-header since a valid record can't start anywhere before
999 * that, except when caller has explicitly specified the offset that
1000 * falls somewhere there or when we are skipping multi-page
1001 * continuation record. It doesn't matter though because
1002 * ReadPageInternal() is prepared to handle that and will read at
1003 * least short page-header worth of data
1004 */
1005 targetRecOff = tmpRecPtr % XLOG_BLCKSZ;
1006
1007 /* scroll back to page boundary */
1008 targetPagePtr = tmpRecPtr - targetRecOff;
1009
1010 /* Read the page containing the record */
1011 readLen = ReadPageInternal(state, targetPagePtr, targetRecOff);
1012 if (readLen < 0)
1013 goto err;
1014
1015 header = (XLogPageHeader) state->readBuf;
1016
1017 pageHeaderSize = XLogPageHeaderSize(header);
1018
1019 /* make sure we have enough data for the page header */
1020 readLen = ReadPageInternal(state, targetPagePtr, pageHeaderSize);
1021 if (readLen < 0)
1022 goto err;
1023
1024 /* skip over potential continuation data */
1025 if (header->xlp_info & XLP_FIRST_IS_CONTRECORD)
1026 {
1027 /*
1028 * If the length of the remaining continuation data is more than
1029 * what can fit in this page, the continuation record crosses over
1030 * this page. Read the next page and try again. xlp_rem_len in the
1031 * next page header will contain the remaining length of the
1032 * continuation data
1033 *
1034 * Note that record headers are MAXALIGN'ed
1035 */
1036 if (MAXALIGN(header->xlp_rem_len) >= (XLOG_BLCKSZ - pageHeaderSize))
1037 tmpRecPtr = targetPagePtr + XLOG_BLCKSZ;
1038 else
1039 {
1040 /*
1041 * The previous continuation record ends in this page. Set
1042 * tmpRecPtr to point to the first valid record
1043 */
1044 tmpRecPtr = targetPagePtr + pageHeaderSize
1045 + MAXALIGN(header->xlp_rem_len);
1046 break;
1047 }
1048 }
1049 else
1050 {
1051 tmpRecPtr = targetPagePtr + pageHeaderSize;
1052 break;
1053 }
1054 }
1055
1056 /*
1057 * we know now that tmpRecPtr is an address pointing to a valid XLogRecord
1058 * because either we're at the first record after the beginning of a page
1059 * or we just jumped over the remaining data of a continuation.
1060 */
1061 XLogBeginRead(state, tmpRecPtr);
1062 while (XLogReadRecord(state, &errormsg) != NULL)
1063 {
1064 /* past the record we've found, break out */
1065 if (RecPtr <= state->ReadRecPtr)
1066 {
1067 /* Rewind the reader to the beginning of the last record. */
1068 found = state->ReadRecPtr;
1069 XLogBeginRead(state, found);
1070 return found;
1071 }
1072 }
1073
1074 err:
1075 XLogReaderInvalReadState(state);
1076
1077 return InvalidXLogRecPtr;
1078 }
1079
1080 #endif /* FRONTEND */
1081
1082 /*
1083 * Helper function to ease writing of XLogRoutine->page_read callbacks.
1084 * If this function is used, caller must supply a segment_open callback in
1085 * 'state', as that is used here.
1086 *
1087 * Read 'count' bytes into 'buf', starting at location 'startptr', from WAL
1088 * fetched from timeline 'tli'.
1089 *
1090 * Returns true if succeeded, false if an error occurs, in which case
1091 * 'errinfo' receives error details.
1092 *
1093 * XXX probably this should be improved to suck data directly from the
1094 * WAL buffers when possible.
1095 */
1096 bool
WALRead(XLogReaderState * state,char * buf,XLogRecPtr startptr,Size count,TimeLineID tli,WALReadError * errinfo)1097 WALRead(XLogReaderState *state,
1098 char *buf, XLogRecPtr startptr, Size count, TimeLineID tli,
1099 WALReadError *errinfo)
1100 {
1101 char *p;
1102 XLogRecPtr recptr;
1103 Size nbytes;
1104
1105 p = buf;
1106 recptr = startptr;
1107 nbytes = count;
1108
1109 while (nbytes > 0)
1110 {
1111 uint32 startoff;
1112 int segbytes;
1113 int readbytes;
1114
1115 startoff = XLogSegmentOffset(recptr, state->segcxt.ws_segsize);
1116
1117 /*
1118 * If the data we want is not in a segment we have open, close what we
1119 * have (if anything) and open the next one, using the caller's
1120 * provided openSegment callback.
1121 */
1122 if (state->seg.ws_file < 0 ||
1123 !XLByteInSeg(recptr, state->seg.ws_segno, state->segcxt.ws_segsize) ||
1124 tli != state->seg.ws_tli)
1125 {
1126 XLogSegNo nextSegNo;
1127
1128 if (state->seg.ws_file >= 0)
1129 state->routine.segment_close(state);
1130
1131 XLByteToSeg(recptr, nextSegNo, state->segcxt.ws_segsize);
1132 state->routine.segment_open(state, nextSegNo, &tli);
1133
1134 /* This shouldn't happen -- indicates a bug in segment_open */
1135 Assert(state->seg.ws_file >= 0);
1136
1137 /* Update the current segment info. */
1138 state->seg.ws_tli = tli;
1139 state->seg.ws_segno = nextSegNo;
1140 }
1141
1142 /* How many bytes are within this segment? */
1143 if (nbytes > (state->segcxt.ws_segsize - startoff))
1144 segbytes = state->segcxt.ws_segsize - startoff;
1145 else
1146 segbytes = nbytes;
1147
1148 #ifndef FRONTEND
1149 pgstat_report_wait_start(WAIT_EVENT_WAL_READ);
1150 #endif
1151
1152 /* Reset errno first; eases reporting non-errno-affecting errors */
1153 errno = 0;
1154 readbytes = pg_pread(state->seg.ws_file, p, segbytes, (off_t) startoff);
1155
1156 #ifndef FRONTEND
1157 pgstat_report_wait_end();
1158 #endif
1159
1160 if (readbytes <= 0)
1161 {
1162 errinfo->wre_errno = errno;
1163 errinfo->wre_req = segbytes;
1164 errinfo->wre_read = readbytes;
1165 errinfo->wre_off = startoff;
1166 errinfo->wre_seg = state->seg;
1167 return false;
1168 }
1169
1170 /* Update state for read */
1171 recptr += readbytes;
1172 nbytes -= readbytes;
1173 p += readbytes;
1174 }
1175
1176 return true;
1177 }
1178
1179 /* ----------------------------------------
1180 * Functions for decoding the data and block references in a record.
1181 * ----------------------------------------
1182 */
1183
1184 /* private function to reset the state between records */
1185 static void
ResetDecoder(XLogReaderState * state)1186 ResetDecoder(XLogReaderState *state)
1187 {
1188 int block_id;
1189
1190 state->decoded_record = NULL;
1191
1192 state->main_data_len = 0;
1193
1194 for (block_id = 0; block_id <= state->max_block_id; block_id++)
1195 {
1196 state->blocks[block_id].in_use = false;
1197 state->blocks[block_id].has_image = false;
1198 state->blocks[block_id].has_data = false;
1199 state->blocks[block_id].apply_image = false;
1200 }
1201 state->max_block_id = -1;
1202 }
1203
1204 /*
1205 * Decode the previously read record.
1206 *
1207 * On error, a human-readable error message is returned in *errormsg, and
1208 * the return value is false.
1209 */
1210 bool
DecodeXLogRecord(XLogReaderState * state,XLogRecord * record,char ** errormsg)1211 DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg)
1212 {
1213 /*
1214 * read next _size bytes from record buffer, but check for overrun first.
1215 */
1216 #define COPY_HEADER_FIELD(_dst, _size) \
1217 do { \
1218 if (remaining < _size) \
1219 goto shortdata_err; \
1220 memcpy(_dst, ptr, _size); \
1221 ptr += _size; \
1222 remaining -= _size; \
1223 } while(0)
1224
1225 char *ptr;
1226 uint32 remaining;
1227 uint32 datatotal;
1228 RelFileNode *rnode = NULL;
1229 uint8 block_id;
1230
1231 ResetDecoder(state);
1232
1233 state->decoded_record = record;
1234 state->record_origin = InvalidRepOriginId;
1235 state->toplevel_xid = InvalidTransactionId;
1236
1237 ptr = (char *) record;
1238 ptr += SizeOfXLogRecord;
1239 remaining = record->xl_tot_len - SizeOfXLogRecord;
1240
1241 /* Decode the headers */
1242 datatotal = 0;
1243 while (remaining > datatotal)
1244 {
1245 COPY_HEADER_FIELD(&block_id, sizeof(uint8));
1246
1247 if (block_id == XLR_BLOCK_ID_DATA_SHORT)
1248 {
1249 /* XLogRecordDataHeaderShort */
1250 uint8 main_data_len;
1251
1252 COPY_HEADER_FIELD(&main_data_len, sizeof(uint8));
1253
1254 state->main_data_len = main_data_len;
1255 datatotal += main_data_len;
1256 break; /* by convention, the main data fragment is
1257 * always last */
1258 }
1259 else if (block_id == XLR_BLOCK_ID_DATA_LONG)
1260 {
1261 /* XLogRecordDataHeaderLong */
1262 uint32 main_data_len;
1263
1264 COPY_HEADER_FIELD(&main_data_len, sizeof(uint32));
1265 state->main_data_len = main_data_len;
1266 datatotal += main_data_len;
1267 break; /* by convention, the main data fragment is
1268 * always last */
1269 }
1270 else if (block_id == XLR_BLOCK_ID_ORIGIN)
1271 {
1272 COPY_HEADER_FIELD(&state->record_origin, sizeof(RepOriginId));
1273 }
1274 else if (block_id == XLR_BLOCK_ID_TOPLEVEL_XID)
1275 {
1276 COPY_HEADER_FIELD(&state->toplevel_xid, sizeof(TransactionId));
1277 }
1278 else if (block_id <= XLR_MAX_BLOCK_ID)
1279 {
1280 /* XLogRecordBlockHeader */
1281 DecodedBkpBlock *blk;
1282 uint8 fork_flags;
1283
1284 if (block_id <= state->max_block_id)
1285 {
1286 report_invalid_record(state,
1287 "out-of-order block_id %u at %X/%X",
1288 block_id,
1289 LSN_FORMAT_ARGS(state->ReadRecPtr));
1290 goto err;
1291 }
1292 state->max_block_id = block_id;
1293
1294 blk = &state->blocks[block_id];
1295 blk->in_use = true;
1296 blk->apply_image = false;
1297
1298 COPY_HEADER_FIELD(&fork_flags, sizeof(uint8));
1299 blk->forknum = fork_flags & BKPBLOCK_FORK_MASK;
1300 blk->flags = fork_flags;
1301 blk->has_image = ((fork_flags & BKPBLOCK_HAS_IMAGE) != 0);
1302 blk->has_data = ((fork_flags & BKPBLOCK_HAS_DATA) != 0);
1303
1304 COPY_HEADER_FIELD(&blk->data_len, sizeof(uint16));
1305 /* cross-check that the HAS_DATA flag is set iff data_length > 0 */
1306 if (blk->has_data && blk->data_len == 0)
1307 {
1308 report_invalid_record(state,
1309 "BKPBLOCK_HAS_DATA set, but no data included at %X/%X",
1310 LSN_FORMAT_ARGS(state->ReadRecPtr));
1311 goto err;
1312 }
1313 if (!blk->has_data && blk->data_len != 0)
1314 {
1315 report_invalid_record(state,
1316 "BKPBLOCK_HAS_DATA not set, but data length is %u at %X/%X",
1317 (unsigned int) blk->data_len,
1318 LSN_FORMAT_ARGS(state->ReadRecPtr));
1319 goto err;
1320 }
1321 datatotal += blk->data_len;
1322
1323 if (blk->has_image)
1324 {
1325 COPY_HEADER_FIELD(&blk->bimg_len, sizeof(uint16));
1326 COPY_HEADER_FIELD(&blk->hole_offset, sizeof(uint16));
1327 COPY_HEADER_FIELD(&blk->bimg_info, sizeof(uint8));
1328
1329 blk->apply_image = ((blk->bimg_info & BKPIMAGE_APPLY) != 0);
1330
1331 if (blk->bimg_info & BKPIMAGE_IS_COMPRESSED)
1332 {
1333 if (blk->bimg_info & BKPIMAGE_HAS_HOLE)
1334 COPY_HEADER_FIELD(&blk->hole_length, sizeof(uint16));
1335 else
1336 blk->hole_length = 0;
1337 }
1338 else
1339 blk->hole_length = BLCKSZ - blk->bimg_len;
1340 datatotal += blk->bimg_len;
1341
1342 /*
1343 * cross-check that hole_offset > 0, hole_length > 0 and
1344 * bimg_len < BLCKSZ if the HAS_HOLE flag is set.
1345 */
1346 if ((blk->bimg_info & BKPIMAGE_HAS_HOLE) &&
1347 (blk->hole_offset == 0 ||
1348 blk->hole_length == 0 ||
1349 blk->bimg_len == BLCKSZ))
1350 {
1351 report_invalid_record(state,
1352 "BKPIMAGE_HAS_HOLE set, but hole offset %u length %u block image length %u at %X/%X",
1353 (unsigned int) blk->hole_offset,
1354 (unsigned int) blk->hole_length,
1355 (unsigned int) blk->bimg_len,
1356 LSN_FORMAT_ARGS(state->ReadRecPtr));
1357 goto err;
1358 }
1359
1360 /*
1361 * cross-check that hole_offset == 0 and hole_length == 0 if
1362 * the HAS_HOLE flag is not set.
1363 */
1364 if (!(blk->bimg_info & BKPIMAGE_HAS_HOLE) &&
1365 (blk->hole_offset != 0 || blk->hole_length != 0))
1366 {
1367 report_invalid_record(state,
1368 "BKPIMAGE_HAS_HOLE not set, but hole offset %u length %u at %X/%X",
1369 (unsigned int) blk->hole_offset,
1370 (unsigned int) blk->hole_length,
1371 LSN_FORMAT_ARGS(state->ReadRecPtr));
1372 goto err;
1373 }
1374
1375 /*
1376 * cross-check that bimg_len < BLCKSZ if the IS_COMPRESSED
1377 * flag is set.
1378 */
1379 if ((blk->bimg_info & BKPIMAGE_IS_COMPRESSED) &&
1380 blk->bimg_len == BLCKSZ)
1381 {
1382 report_invalid_record(state,
1383 "BKPIMAGE_IS_COMPRESSED set, but block image length %u at %X/%X",
1384 (unsigned int) blk->bimg_len,
1385 LSN_FORMAT_ARGS(state->ReadRecPtr));
1386 goto err;
1387 }
1388
1389 /*
1390 * cross-check that bimg_len = BLCKSZ if neither HAS_HOLE nor
1391 * IS_COMPRESSED flag is set.
1392 */
1393 if (!(blk->bimg_info & BKPIMAGE_HAS_HOLE) &&
1394 !(blk->bimg_info & BKPIMAGE_IS_COMPRESSED) &&
1395 blk->bimg_len != BLCKSZ)
1396 {
1397 report_invalid_record(state,
1398 "neither BKPIMAGE_HAS_HOLE nor BKPIMAGE_IS_COMPRESSED set, but block image length is %u at %X/%X",
1399 (unsigned int) blk->data_len,
1400 LSN_FORMAT_ARGS(state->ReadRecPtr));
1401 goto err;
1402 }
1403 }
1404 if (!(fork_flags & BKPBLOCK_SAME_REL))
1405 {
1406 COPY_HEADER_FIELD(&blk->rnode, sizeof(RelFileNode));
1407 rnode = &blk->rnode;
1408 }
1409 else
1410 {
1411 if (rnode == NULL)
1412 {
1413 report_invalid_record(state,
1414 "BKPBLOCK_SAME_REL set but no previous rel at %X/%X",
1415 LSN_FORMAT_ARGS(state->ReadRecPtr));
1416 goto err;
1417 }
1418
1419 blk->rnode = *rnode;
1420 }
1421 COPY_HEADER_FIELD(&blk->blkno, sizeof(BlockNumber));
1422 }
1423 else
1424 {
1425 report_invalid_record(state,
1426 "invalid block_id %u at %X/%X",
1427 block_id, LSN_FORMAT_ARGS(state->ReadRecPtr));
1428 goto err;
1429 }
1430 }
1431
1432 if (remaining != datatotal)
1433 goto shortdata_err;
1434
1435 /*
1436 * Ok, we've parsed the fragment headers, and verified that the total
1437 * length of the payload in the fragments is equal to the amount of data
1438 * left. Copy the data of each fragment to a separate buffer.
1439 *
1440 * We could just set up pointers into readRecordBuf, but we want to align
1441 * the data for the convenience of the callers. Backup images are not
1442 * copied, however; they don't need alignment.
1443 */
1444
1445 /* block data first */
1446 for (block_id = 0; block_id <= state->max_block_id; block_id++)
1447 {
1448 DecodedBkpBlock *blk = &state->blocks[block_id];
1449
1450 if (!blk->in_use)
1451 continue;
1452
1453 Assert(blk->has_image || !blk->apply_image);
1454
1455 if (blk->has_image)
1456 {
1457 blk->bkp_image = ptr;
1458 ptr += blk->bimg_len;
1459 }
1460 if (blk->has_data)
1461 {
1462 if (!blk->data || blk->data_len > blk->data_bufsz)
1463 {
1464 if (blk->data)
1465 pfree(blk->data);
1466
1467 /*
1468 * Force the initial request to be BLCKSZ so that we don't
1469 * waste time with lots of trips through this stanza as a
1470 * result of WAL compression.
1471 */
1472 blk->data_bufsz = MAXALIGN(Max(blk->data_len, BLCKSZ));
1473 blk->data = palloc(blk->data_bufsz);
1474 }
1475 memcpy(blk->data, ptr, blk->data_len);
1476 ptr += blk->data_len;
1477 }
1478 }
1479
1480 /* and finally, the main data */
1481 if (state->main_data_len > 0)
1482 {
1483 if (!state->main_data || state->main_data_len > state->main_data_bufsz)
1484 {
1485 if (state->main_data)
1486 pfree(state->main_data);
1487
1488 /*
1489 * main_data_bufsz must be MAXALIGN'ed. In many xlog record
1490 * types, we omit trailing struct padding on-disk to save a few
1491 * bytes; but compilers may generate accesses to the xlog struct
1492 * that assume that padding bytes are present. If the palloc
1493 * request is not large enough to include such padding bytes then
1494 * we'll get valgrind complaints due to otherwise-harmless fetches
1495 * of the padding bytes.
1496 *
1497 * In addition, force the initial request to be reasonably large
1498 * so that we don't waste time with lots of trips through this
1499 * stanza. BLCKSZ / 2 seems like a good compromise choice.
1500 */
1501 state->main_data_bufsz = MAXALIGN(Max(state->main_data_len,
1502 BLCKSZ / 2));
1503 state->main_data = palloc(state->main_data_bufsz);
1504 }
1505 memcpy(state->main_data, ptr, state->main_data_len);
1506 ptr += state->main_data_len;
1507 }
1508
1509 return true;
1510
1511 shortdata_err:
1512 report_invalid_record(state,
1513 "record with invalid length at %X/%X",
1514 LSN_FORMAT_ARGS(state->ReadRecPtr));
1515 err:
1516 *errormsg = state->errormsg_buf;
1517
1518 return false;
1519 }
1520
1521 /*
1522 * Returns information about the block that a block reference refers to.
1523 *
1524 * If the WAL record contains a block reference with the given ID, *rnode,
1525 * *forknum, and *blknum are filled in (if not NULL), and returns true.
1526 * Otherwise returns false.
1527 */
1528 bool
XLogRecGetBlockTag(XLogReaderState * record,uint8 block_id,RelFileNode * rnode,ForkNumber * forknum,BlockNumber * blknum)1529 XLogRecGetBlockTag(XLogReaderState *record, uint8 block_id,
1530 RelFileNode *rnode, ForkNumber *forknum, BlockNumber *blknum)
1531 {
1532 DecodedBkpBlock *bkpb;
1533
1534 if (!record->blocks[block_id].in_use)
1535 return false;
1536
1537 bkpb = &record->blocks[block_id];
1538 if (rnode)
1539 *rnode = bkpb->rnode;
1540 if (forknum)
1541 *forknum = bkpb->forknum;
1542 if (blknum)
1543 *blknum = bkpb->blkno;
1544 return true;
1545 }
1546
1547 /*
1548 * Returns the data associated with a block reference, or NULL if there is
1549 * no data (e.g. because a full-page image was taken instead). The returned
1550 * pointer points to a MAXALIGNed buffer.
1551 */
1552 char *
XLogRecGetBlockData(XLogReaderState * record,uint8 block_id,Size * len)1553 XLogRecGetBlockData(XLogReaderState *record, uint8 block_id, Size *len)
1554 {
1555 DecodedBkpBlock *bkpb;
1556
1557 if (!record->blocks[block_id].in_use)
1558 return NULL;
1559
1560 bkpb = &record->blocks[block_id];
1561
1562 if (!bkpb->has_data)
1563 {
1564 if (len)
1565 *len = 0;
1566 return NULL;
1567 }
1568 else
1569 {
1570 if (len)
1571 *len = bkpb->data_len;
1572 return bkpb->data;
1573 }
1574 }
1575
1576 /*
1577 * Restore a full-page image from a backup block attached to an XLOG record.
1578 *
1579 * Returns true if a full-page image is restored.
1580 */
1581 bool
RestoreBlockImage(XLogReaderState * record,uint8 block_id,char * page)1582 RestoreBlockImage(XLogReaderState *record, uint8 block_id, char *page)
1583 {
1584 DecodedBkpBlock *bkpb;
1585 char *ptr;
1586 PGAlignedBlock tmp;
1587
1588 if (!record->blocks[block_id].in_use)
1589 return false;
1590 if (!record->blocks[block_id].has_image)
1591 return false;
1592
1593 bkpb = &record->blocks[block_id];
1594 ptr = bkpb->bkp_image;
1595
1596 if (bkpb->bimg_info & BKPIMAGE_IS_COMPRESSED)
1597 {
1598 /* If a backup block image is compressed, decompress it */
1599 if (pglz_decompress(ptr, bkpb->bimg_len, tmp.data,
1600 BLCKSZ - bkpb->hole_length, true) < 0)
1601 {
1602 report_invalid_record(record, "invalid compressed image at %X/%X, block %d",
1603 LSN_FORMAT_ARGS(record->ReadRecPtr),
1604 block_id);
1605 return false;
1606 }
1607 ptr = tmp.data;
1608 }
1609
1610 /* generate page, taking into account hole if necessary */
1611 if (bkpb->hole_length == 0)
1612 {
1613 memcpy(page, ptr, BLCKSZ);
1614 }
1615 else
1616 {
1617 memcpy(page, ptr, bkpb->hole_offset);
1618 /* must zero-fill the hole */
1619 MemSet(page + bkpb->hole_offset, 0, bkpb->hole_length);
1620 memcpy(page + (bkpb->hole_offset + bkpb->hole_length),
1621 ptr + bkpb->hole_offset,
1622 BLCKSZ - (bkpb->hole_offset + bkpb->hole_length));
1623 }
1624
1625 return true;
1626 }
1627
1628 #ifndef FRONTEND
1629
1630 /*
1631 * Extract the FullTransactionId from a WAL record.
1632 */
1633 FullTransactionId
XLogRecGetFullXid(XLogReaderState * record)1634 XLogRecGetFullXid(XLogReaderState *record)
1635 {
1636 TransactionId xid,
1637 next_xid;
1638 uint32 epoch;
1639
1640 /*
1641 * This function is only safe during replay, because it depends on the
1642 * replay state. See AdvanceNextFullTransactionIdPastXid() for more.
1643 */
1644 Assert(AmStartupProcess() || !IsUnderPostmaster);
1645
1646 xid = XLogRecGetXid(record);
1647 next_xid = XidFromFullTransactionId(ShmemVariableCache->nextXid);
1648 epoch = EpochFromFullTransactionId(ShmemVariableCache->nextXid);
1649
1650 /*
1651 * If xid is numerically greater than next_xid, it has to be from the last
1652 * epoch.
1653 */
1654 if (unlikely(xid > next_xid))
1655 --epoch;
1656
1657 return FullTransactionIdFromEpochAndXid(epoch, xid);
1658 }
1659
1660 #endif
1661