1 /*-------------------------------------------------------------------------
2 *
3 * xlogreader.c
4 * Generic XLog reading facility
5 *
6 * Portions Copyright (c) 2013-2020, PostgreSQL Global Development Group
7 *
8 * IDENTIFICATION
9 * src/backend/access/transam/xlogreader.c
10 *
11 * NOTES
12 * See xlogreader.h for more notes on this facility.
13 *
14 * This file is compiled as both front-end and backend code, so it
15 * may not use ereport, server-defined static variables, etc.
16 *-------------------------------------------------------------------------
17 */
18 #include "postgres.h"
19
20 #include <unistd.h>
21
22 #include "access/transam.h"
23 #include "access/xlog_internal.h"
24 #include "access/xlogreader.h"
25 #include "access/xlogrecord.h"
26 #include "catalog/pg_control.h"
27 #include "common/pg_lzcompress.h"
28 #include "replication/origin.h"
29
30 #ifndef FRONTEND
31 #include "miscadmin.h"
32 #include "pgstat.h"
33 #include "utils/memutils.h"
34 #endif
35
36 static void report_invalid_record(XLogReaderState *state, const char *fmt,...)
37 pg_attribute_printf(2, 3);
38 static bool allocate_recordbuf(XLogReaderState *state, uint32 reclength);
39 static int ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr,
40 int reqLen);
41 static void XLogReaderInvalReadState(XLogReaderState *state);
42 static bool ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr,
43 XLogRecPtr PrevRecPtr, XLogRecord *record, bool randAccess);
44 static bool ValidXLogRecord(XLogReaderState *state, XLogRecord *record,
45 XLogRecPtr recptr);
46 static void ResetDecoder(XLogReaderState *state);
47 static void WALOpenSegmentInit(WALOpenSegment *seg, WALSegmentContext *segcxt,
48 int segsize, const char *waldir);
49
50 /* size of the buffer allocated for error message. */
51 #define MAX_ERRORMSG_LEN 1000
52
53 /*
54 * Construct a string in state->errormsg_buf explaining what's wrong with
55 * the current record being read.
56 */
57 static void
report_invalid_record(XLogReaderState * state,const char * fmt,...)58 report_invalid_record(XLogReaderState *state, const char *fmt,...)
59 {
60 va_list args;
61
62 fmt = _(fmt);
63
64 va_start(args, fmt);
65 vsnprintf(state->errormsg_buf, MAX_ERRORMSG_LEN, fmt, args);
66 va_end(args);
67 }
68
69 /*
70 * Allocate and initialize a new XLogReader.
71 *
72 * Returns NULL if the xlogreader couldn't be allocated.
73 */
74 XLogReaderState *
XLogReaderAllocate(int wal_segment_size,const char * waldir,XLogReaderRoutine * routine,void * private_data)75 XLogReaderAllocate(int wal_segment_size, const char *waldir,
76 XLogReaderRoutine *routine, void *private_data)
77 {
78 XLogReaderState *state;
79
80 state = (XLogReaderState *)
81 palloc_extended(sizeof(XLogReaderState),
82 MCXT_ALLOC_NO_OOM | MCXT_ALLOC_ZERO);
83 if (!state)
84 return NULL;
85
86 /* initialize caller-provided support functions */
87 state->routine = *routine;
88
89 state->max_block_id = -1;
90
91 /*
92 * Permanently allocate readBuf. We do it this way, rather than just
93 * making a static array, for two reasons: (1) no need to waste the
94 * storage in most instantiations of the backend; (2) a static char array
95 * isn't guaranteed to have any particular alignment, whereas
96 * palloc_extended() will provide MAXALIGN'd storage.
97 */
98 state->readBuf = (char *) palloc_extended(XLOG_BLCKSZ,
99 MCXT_ALLOC_NO_OOM);
100 if (!state->readBuf)
101 {
102 pfree(state);
103 return NULL;
104 }
105
106 /* Initialize segment info. */
107 WALOpenSegmentInit(&state->seg, &state->segcxt, wal_segment_size,
108 waldir);
109
110 /* system_identifier initialized to zeroes above */
111 state->private_data = private_data;
112 /* ReadRecPtr, EndRecPtr and readLen initialized to zeroes above */
113 state->errormsg_buf = palloc_extended(MAX_ERRORMSG_LEN + 1,
114 MCXT_ALLOC_NO_OOM);
115 if (!state->errormsg_buf)
116 {
117 pfree(state->readBuf);
118 pfree(state);
119 return NULL;
120 }
121 state->errormsg_buf[0] = '\0';
122
123 /*
124 * Allocate an initial readRecordBuf of minimal size, which can later be
125 * enlarged if necessary.
126 */
127 if (!allocate_recordbuf(state, 0))
128 {
129 pfree(state->errormsg_buf);
130 pfree(state->readBuf);
131 pfree(state);
132 return NULL;
133 }
134
135 return state;
136 }
137
138 void
XLogReaderFree(XLogReaderState * state)139 XLogReaderFree(XLogReaderState *state)
140 {
141 int block_id;
142
143 if (state->seg.ws_file != -1)
144 state->routine.segment_close(state);
145
146 for (block_id = 0; block_id <= XLR_MAX_BLOCK_ID; block_id++)
147 {
148 if (state->blocks[block_id].data)
149 pfree(state->blocks[block_id].data);
150 }
151 if (state->main_data)
152 pfree(state->main_data);
153
154 pfree(state->errormsg_buf);
155 if (state->readRecordBuf)
156 pfree(state->readRecordBuf);
157 pfree(state->readBuf);
158 pfree(state);
159 }
160
161 /*
162 * Allocate readRecordBuf to fit a record of at least the given length.
163 * Returns true if successful, false if out of memory.
164 *
165 * readRecordBufSize is set to the new buffer size.
166 *
167 * To avoid useless small increases, round its size to a multiple of
168 * XLOG_BLCKSZ, and make sure it's at least 5*Max(BLCKSZ, XLOG_BLCKSZ) to start
169 * with. (That is enough for all "normal" records, but very large commit or
170 * abort records might need more space.)
171 */
172 static bool
allocate_recordbuf(XLogReaderState * state,uint32 reclength)173 allocate_recordbuf(XLogReaderState *state, uint32 reclength)
174 {
175 uint32 newSize = reclength;
176
177 newSize += XLOG_BLCKSZ - (newSize % XLOG_BLCKSZ);
178 newSize = Max(newSize, 5 * Max(BLCKSZ, XLOG_BLCKSZ));
179
180 #ifndef FRONTEND
181
182 /*
183 * Note that in much unlucky circumstances, the random data read from a
184 * recycled segment can cause this routine to be called with a size
185 * causing a hard failure at allocation. For a standby, this would cause
186 * the instance to stop suddenly with a hard failure, preventing it to
187 * retry fetching WAL from one of its sources which could allow it to move
188 * on with replay without a manual restart. If the data comes from a past
189 * recycled segment and is still valid, then the allocation may succeed
190 * but record checks are going to fail so this would be short-lived. If
191 * the allocation fails because of a memory shortage, then this is not a
192 * hard failure either per the guarantee given by MCXT_ALLOC_NO_OOM.
193 */
194 if (!AllocSizeIsValid(newSize))
195 return false;
196
197 #endif
198
199 if (state->readRecordBuf)
200 pfree(state->readRecordBuf);
201 state->readRecordBuf =
202 (char *) palloc_extended(newSize, MCXT_ALLOC_NO_OOM);
203 if (state->readRecordBuf == NULL)
204 {
205 state->readRecordBufSize = 0;
206 return false;
207 }
208 state->readRecordBufSize = newSize;
209 return true;
210 }
211
212 /*
213 * Initialize the passed segment structs.
214 */
215 static void
WALOpenSegmentInit(WALOpenSegment * seg,WALSegmentContext * segcxt,int segsize,const char * waldir)216 WALOpenSegmentInit(WALOpenSegment *seg, WALSegmentContext *segcxt,
217 int segsize, const char *waldir)
218 {
219 seg->ws_file = -1;
220 seg->ws_segno = 0;
221 seg->ws_tli = 0;
222
223 segcxt->ws_segsize = segsize;
224 if (waldir)
225 snprintf(segcxt->ws_dir, MAXPGPATH, "%s", waldir);
226 }
227
228 /*
229 * Begin reading WAL at 'RecPtr'.
230 *
231 * 'RecPtr' should point to the beginnning of a valid WAL record. Pointing at
232 * the beginning of a page is also OK, if there is a new record right after
233 * the page header, i.e. not a continuation.
234 *
235 * This does not make any attempt to read the WAL yet, and hence cannot fail.
236 * If the starting address is not correct, the first call to XLogReadRecord()
237 * will error out.
238 */
239 void
XLogBeginRead(XLogReaderState * state,XLogRecPtr RecPtr)240 XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr)
241 {
242 Assert(!XLogRecPtrIsInvalid(RecPtr));
243
244 ResetDecoder(state);
245
246 /* Begin at the passed-in record pointer. */
247 state->EndRecPtr = RecPtr;
248 state->ReadRecPtr = InvalidXLogRecPtr;
249 }
250
251 /*
252 * Attempt to read an XLOG record.
253 *
254 * XLogBeginRead() or XLogFindNextRecord() must be called before the first call
255 * to XLogReadRecord().
256 *
257 * If the page_read callback fails to read the requested data, NULL is
258 * returned. The callback is expected to have reported the error; errormsg
259 * is set to NULL.
260 *
261 * If the reading fails for some other reason, NULL is also returned, and
262 * *errormsg is set to a string with details of the failure.
263 *
264 * The returned pointer (or *errormsg) points to an internal buffer that's
265 * valid until the next call to XLogReadRecord.
266 */
267 XLogRecord *
XLogReadRecord(XLogReaderState * state,char ** errormsg)268 XLogReadRecord(XLogReaderState *state, char **errormsg)
269 {
270 XLogRecPtr RecPtr;
271 XLogRecord *record;
272 XLogRecPtr targetPagePtr;
273 bool randAccess;
274 uint32 len,
275 total_len;
276 uint32 targetRecOff;
277 uint32 pageHeaderSize;
278 bool assembled;
279 bool gotheader;
280 int readOff;
281
282 /*
283 * randAccess indicates whether to verify the previous-record pointer of
284 * the record we're reading. We only do this if we're reading
285 * sequentially, which is what we initially assume.
286 */
287 randAccess = false;
288
289 /* reset error state */
290 *errormsg = NULL;
291 state->errormsg_buf[0] = '\0';
292
293 ResetDecoder(state);
294 state->abortedRecPtr = InvalidXLogRecPtr;
295 state->missingContrecPtr = InvalidXLogRecPtr;
296
297 RecPtr = state->EndRecPtr;
298
299 if (state->ReadRecPtr != InvalidXLogRecPtr)
300 {
301 /* read the record after the one we just read */
302
303 /*
304 * EndRecPtr is pointing to end+1 of the previous WAL record. If
305 * we're at a page boundary, no more records can fit on the current
306 * page. We must skip over the page header, but we can't do that until
307 * we've read in the page, since the header size is variable.
308 */
309 }
310 else
311 {
312 /*
313 * Caller supplied a position to start at.
314 *
315 * In this case, EndRecPtr should already be pointing to a valid
316 * record starting position.
317 */
318 Assert(XRecOffIsValid(RecPtr));
319 randAccess = true;
320 }
321
322 restart:
323 state->currRecPtr = RecPtr;
324 assembled = false;
325
326 targetPagePtr = RecPtr - (RecPtr % XLOG_BLCKSZ);
327 targetRecOff = RecPtr % XLOG_BLCKSZ;
328
329 /*
330 * Read the page containing the record into state->readBuf. Request enough
331 * byte to cover the whole record header, or at least the part of it that
332 * fits on the same page.
333 */
334 readOff = ReadPageInternal(state, targetPagePtr,
335 Min(targetRecOff + SizeOfXLogRecord, XLOG_BLCKSZ));
336 if (readOff < 0)
337 goto err;
338
339 /*
340 * ReadPageInternal always returns at least the page header, so we can
341 * examine it now.
342 */
343 pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) state->readBuf);
344 if (targetRecOff == 0)
345 {
346 /*
347 * At page start, so skip over page header.
348 */
349 RecPtr += pageHeaderSize;
350 targetRecOff = pageHeaderSize;
351 }
352 else if (targetRecOff < pageHeaderSize)
353 {
354 report_invalid_record(state, "invalid record offset at %X/%X",
355 (uint32) (RecPtr >> 32), (uint32) RecPtr);
356 goto err;
357 }
358
359 if ((((XLogPageHeader) state->readBuf)->xlp_info & XLP_FIRST_IS_CONTRECORD) &&
360 targetRecOff == pageHeaderSize)
361 {
362 report_invalid_record(state, "contrecord is requested by %X/%X",
363 (uint32) (RecPtr >> 32), (uint32) RecPtr);
364 goto err;
365 }
366
367 /* ReadPageInternal has verified the page header */
368 Assert(pageHeaderSize <= readOff);
369
370 /*
371 * Read the record length.
372 *
373 * NB: Even though we use an XLogRecord pointer here, the whole record
374 * header might not fit on this page. xl_tot_len is the first field of the
375 * struct, so it must be on this page (the records are MAXALIGNed), but we
376 * cannot access any other fields until we've verified that we got the
377 * whole header.
378 */
379 record = (XLogRecord *) (state->readBuf + RecPtr % XLOG_BLCKSZ);
380 total_len = record->xl_tot_len;
381
382 /*
383 * If the whole record header is on this page, validate it immediately.
384 * Otherwise do just a basic sanity check on xl_tot_len, and validate the
385 * rest of the header after reading it from the next page. The xl_tot_len
386 * check is necessary here to ensure that we enter the "Need to reassemble
387 * record" code path below; otherwise we might fail to apply
388 * ValidXLogRecordHeader at all.
389 */
390 if (targetRecOff <= XLOG_BLCKSZ - SizeOfXLogRecord)
391 {
392 if (!ValidXLogRecordHeader(state, RecPtr, state->ReadRecPtr, record,
393 randAccess))
394 goto err;
395 gotheader = true;
396 }
397 else
398 {
399 /* XXX: more validation should be done here */
400 if (total_len < SizeOfXLogRecord)
401 {
402 report_invalid_record(state,
403 "invalid record length at %X/%X: wanted %u, got %u",
404 (uint32) (RecPtr >> 32), (uint32) RecPtr,
405 (uint32) SizeOfXLogRecord, total_len);
406 goto err;
407 }
408 gotheader = false;
409 }
410
411 len = XLOG_BLCKSZ - RecPtr % XLOG_BLCKSZ;
412 if (total_len > len)
413 {
414 /* Need to reassemble record */
415 char *contdata;
416 XLogPageHeader pageHeader;
417 char *buffer;
418 uint32 gotlen;
419
420 assembled = true;
421
422 /*
423 * Enlarge readRecordBuf as needed.
424 */
425 if (total_len > state->readRecordBufSize &&
426 !allocate_recordbuf(state, total_len))
427 {
428 /* We treat this as a "bogus data" condition */
429 report_invalid_record(state, "record length %u at %X/%X too long",
430 total_len,
431 (uint32) (RecPtr >> 32), (uint32) RecPtr);
432 goto err;
433 }
434
435 /* Copy the first fragment of the record from the first page. */
436 memcpy(state->readRecordBuf,
437 state->readBuf + RecPtr % XLOG_BLCKSZ, len);
438 buffer = state->readRecordBuf + len;
439 gotlen = len;
440
441 do
442 {
443 /* Calculate pointer to beginning of next page */
444 targetPagePtr += XLOG_BLCKSZ;
445
446 /* Wait for the next page to become available */
447 readOff = ReadPageInternal(state, targetPagePtr,
448 Min(total_len - gotlen + SizeOfXLogShortPHD,
449 XLOG_BLCKSZ));
450
451 if (readOff < 0)
452 goto err;
453
454 Assert(SizeOfXLogShortPHD <= readOff);
455
456 pageHeader = (XLogPageHeader) state->readBuf;
457
458 /*
459 * If we were expecting a continuation record and got an
460 * "overwrite contrecord" flag, that means the continuation record
461 * was overwritten with a different record. Restart the read by
462 * assuming the address to read is the location where we found
463 * this flag; but keep track of the LSN of the record we were
464 * reading, for later verification.
465 */
466 if (pageHeader->xlp_info & XLP_FIRST_IS_OVERWRITE_CONTRECORD)
467 {
468 state->overwrittenRecPtr = state->currRecPtr;
469 ResetDecoder(state);
470 RecPtr = targetPagePtr;
471 goto restart;
472 }
473
474 /* Check that the continuation on next page looks valid */
475 if (!(pageHeader->xlp_info & XLP_FIRST_IS_CONTRECORD))
476 {
477 report_invalid_record(state,
478 "there is no contrecord flag at %X/%X",
479 (uint32) (RecPtr >> 32), (uint32) RecPtr);
480 goto err;
481 }
482
483 /*
484 * Cross-check that xlp_rem_len agrees with how much of the record
485 * we expect there to be left.
486 */
487 if (pageHeader->xlp_rem_len == 0 ||
488 total_len != (pageHeader->xlp_rem_len + gotlen))
489 {
490 report_invalid_record(state,
491 "invalid contrecord length %u at %X/%X",
492 pageHeader->xlp_rem_len,
493 (uint32) (RecPtr >> 32), (uint32) RecPtr);
494 goto err;
495 }
496
497 /* Append the continuation from this page to the buffer */
498 pageHeaderSize = XLogPageHeaderSize(pageHeader);
499
500 if (readOff < pageHeaderSize)
501 readOff = ReadPageInternal(state, targetPagePtr,
502 pageHeaderSize);
503
504 Assert(pageHeaderSize <= readOff);
505
506 contdata = (char *) state->readBuf + pageHeaderSize;
507 len = XLOG_BLCKSZ - pageHeaderSize;
508 if (pageHeader->xlp_rem_len < len)
509 len = pageHeader->xlp_rem_len;
510
511 if (readOff < pageHeaderSize + len)
512 readOff = ReadPageInternal(state, targetPagePtr,
513 pageHeaderSize + len);
514
515 memcpy(buffer, (char *) contdata, len);
516 buffer += len;
517 gotlen += len;
518
519 /* If we just reassembled the record header, validate it. */
520 if (!gotheader)
521 {
522 record = (XLogRecord *) state->readRecordBuf;
523 if (!ValidXLogRecordHeader(state, RecPtr, state->ReadRecPtr,
524 record, randAccess))
525 goto err;
526 gotheader = true;
527 }
528 } while (gotlen < total_len);
529
530 Assert(gotheader);
531
532 record = (XLogRecord *) state->readRecordBuf;
533 if (!ValidXLogRecord(state, record, RecPtr))
534 goto err;
535
536 pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) state->readBuf);
537 state->ReadRecPtr = RecPtr;
538 state->EndRecPtr = targetPagePtr + pageHeaderSize
539 + MAXALIGN(pageHeader->xlp_rem_len);
540 }
541 else
542 {
543 /* Wait for the record data to become available */
544 readOff = ReadPageInternal(state, targetPagePtr,
545 Min(targetRecOff + total_len, XLOG_BLCKSZ));
546 if (readOff < 0)
547 goto err;
548
549 /* Record does not cross a page boundary */
550 if (!ValidXLogRecord(state, record, RecPtr))
551 goto err;
552
553 state->EndRecPtr = RecPtr + MAXALIGN(total_len);
554
555 state->ReadRecPtr = RecPtr;
556 }
557
558 /*
559 * Special processing if it's an XLOG SWITCH record
560 */
561 if (record->xl_rmid == RM_XLOG_ID &&
562 (record->xl_info & ~XLR_INFO_MASK) == XLOG_SWITCH)
563 {
564 /* Pretend it extends to end of segment */
565 state->EndRecPtr += state->segcxt.ws_segsize - 1;
566 state->EndRecPtr -= XLogSegmentOffset(state->EndRecPtr, state->segcxt.ws_segsize);
567 }
568
569 if (DecodeXLogRecord(state, record, errormsg))
570 return record;
571 else
572 return NULL;
573
574 err:
575 if (assembled)
576 {
577 /*
578 * We get here when a record that spans multiple pages needs to be
579 * assembled, but something went wrong -- perhaps a contrecord piece
580 * was lost. If caller is WAL replay, it will know where the aborted
581 * record was and where to direct followup WAL to be written, marking
582 * the next piece with XLP_FIRST_IS_OVERWRITE_CONTRECORD, which will
583 * in turn signal downstream WAL consumers that the broken WAL record
584 * is to be ignored.
585 */
586 state->abortedRecPtr = RecPtr;
587 state->missingContrecPtr = targetPagePtr;
588 }
589
590 /*
591 * Invalidate the read state. We might read from a different source after
592 * failure.
593 */
594 XLogReaderInvalReadState(state);
595
596 if (state->errormsg_buf[0] != '\0')
597 *errormsg = state->errormsg_buf;
598
599 return NULL;
600 }
601
602 /*
603 * Read a single xlog page including at least [pageptr, reqLen] of valid data
604 * via the page_read() callback.
605 *
606 * Returns -1 if the required page cannot be read for some reason; errormsg_buf
607 * is set in that case (unless the error occurs in the page_read callback).
608 *
609 * We fetch the page from a reader-local cache if we know we have the required
610 * data and if there hasn't been any error since caching the data.
611 */
612 static int
ReadPageInternal(XLogReaderState * state,XLogRecPtr pageptr,int reqLen)613 ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
614 {
615 int readLen;
616 uint32 targetPageOff;
617 XLogSegNo targetSegNo;
618 XLogPageHeader hdr;
619
620 Assert((pageptr % XLOG_BLCKSZ) == 0);
621
622 XLByteToSeg(pageptr, targetSegNo, state->segcxt.ws_segsize);
623 targetPageOff = XLogSegmentOffset(pageptr, state->segcxt.ws_segsize);
624
625 /* check whether we have all the requested data already */
626 if (targetSegNo == state->seg.ws_segno &&
627 targetPageOff == state->segoff && reqLen <= state->readLen)
628 return state->readLen;
629
630 /*
631 * Data is not in our buffer.
632 *
633 * Every time we actually read the segment, even if we looked at parts of
634 * it before, we need to do verification as the page_read callback might
635 * now be rereading data from a different source.
636 *
637 * Whenever switching to a new WAL segment, we read the first page of the
638 * file and validate its header, even if that's not where the target
639 * record is. This is so that we can check the additional identification
640 * info that is present in the first page's "long" header.
641 */
642 if (targetSegNo != state->seg.ws_segno && targetPageOff != 0)
643 {
644 XLogRecPtr targetSegmentPtr = pageptr - targetPageOff;
645
646 readLen = state->routine.page_read(state, targetSegmentPtr, XLOG_BLCKSZ,
647 state->currRecPtr,
648 state->readBuf);
649 if (readLen < 0)
650 goto err;
651
652 /* we can be sure to have enough WAL available, we scrolled back */
653 Assert(readLen == XLOG_BLCKSZ);
654
655 if (!XLogReaderValidatePageHeader(state, targetSegmentPtr,
656 state->readBuf))
657 goto err;
658 }
659
660 /*
661 * First, read the requested data length, but at least a short page header
662 * so that we can validate it.
663 */
664 readLen = state->routine.page_read(state, pageptr, Max(reqLen, SizeOfXLogShortPHD),
665 state->currRecPtr,
666 state->readBuf);
667 if (readLen < 0)
668 goto err;
669
670 Assert(readLen <= XLOG_BLCKSZ);
671
672 /* Do we have enough data to check the header length? */
673 if (readLen <= SizeOfXLogShortPHD)
674 goto err;
675
676 Assert(readLen >= reqLen);
677
678 hdr = (XLogPageHeader) state->readBuf;
679
680 /* still not enough */
681 if (readLen < XLogPageHeaderSize(hdr))
682 {
683 readLen = state->routine.page_read(state, pageptr, XLogPageHeaderSize(hdr),
684 state->currRecPtr,
685 state->readBuf);
686 if (readLen < 0)
687 goto err;
688 }
689
690 /*
691 * Now that we know we have the full header, validate it.
692 */
693 if (!XLogReaderValidatePageHeader(state, pageptr, (char *) hdr))
694 goto err;
695
696 /* update read state information */
697 state->seg.ws_segno = targetSegNo;
698 state->segoff = targetPageOff;
699 state->readLen = readLen;
700
701 return readLen;
702
703 err:
704 XLogReaderInvalReadState(state);
705 return -1;
706 }
707
708 /*
709 * Invalidate the xlogreader's read state to force a re-read.
710 */
711 static void
XLogReaderInvalReadState(XLogReaderState * state)712 XLogReaderInvalReadState(XLogReaderState *state)
713 {
714 state->seg.ws_segno = 0;
715 state->segoff = 0;
716 state->readLen = 0;
717 }
718
719 /*
720 * Validate an XLOG record header.
721 *
722 * This is just a convenience subroutine to avoid duplicated code in
723 * XLogReadRecord. It's not intended for use from anywhere else.
724 */
725 static bool
ValidXLogRecordHeader(XLogReaderState * state,XLogRecPtr RecPtr,XLogRecPtr PrevRecPtr,XLogRecord * record,bool randAccess)726 ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr,
727 XLogRecPtr PrevRecPtr, XLogRecord *record,
728 bool randAccess)
729 {
730 if (record->xl_tot_len < SizeOfXLogRecord)
731 {
732 report_invalid_record(state,
733 "invalid record length at %X/%X: wanted %u, got %u",
734 (uint32) (RecPtr >> 32), (uint32) RecPtr,
735 (uint32) SizeOfXLogRecord, record->xl_tot_len);
736 return false;
737 }
738 if (record->xl_rmid > RM_MAX_ID)
739 {
740 report_invalid_record(state,
741 "invalid resource manager ID %u at %X/%X",
742 record->xl_rmid, (uint32) (RecPtr >> 32),
743 (uint32) RecPtr);
744 return false;
745 }
746 if (randAccess)
747 {
748 /*
749 * We can't exactly verify the prev-link, but surely it should be less
750 * than the record's own address.
751 */
752 if (!(record->xl_prev < RecPtr))
753 {
754 report_invalid_record(state,
755 "record with incorrect prev-link %X/%X at %X/%X",
756 (uint32) (record->xl_prev >> 32),
757 (uint32) record->xl_prev,
758 (uint32) (RecPtr >> 32), (uint32) RecPtr);
759 return false;
760 }
761 }
762 else
763 {
764 /*
765 * Record's prev-link should exactly match our previous location. This
766 * check guards against torn WAL pages where a stale but valid-looking
767 * WAL record starts on a sector boundary.
768 */
769 if (record->xl_prev != PrevRecPtr)
770 {
771 report_invalid_record(state,
772 "record with incorrect prev-link %X/%X at %X/%X",
773 (uint32) (record->xl_prev >> 32),
774 (uint32) record->xl_prev,
775 (uint32) (RecPtr >> 32), (uint32) RecPtr);
776 return false;
777 }
778 }
779
780 return true;
781 }
782
783
784 /*
785 * CRC-check an XLOG record. We do not believe the contents of an XLOG
786 * record (other than to the minimal extent of computing the amount of
787 * data to read in) until we've checked the CRCs.
788 *
789 * We assume all of the record (that is, xl_tot_len bytes) has been read
790 * into memory at *record. Also, ValidXLogRecordHeader() has accepted the
791 * record's header, which means in particular that xl_tot_len is at least
792 * SizeOfXLogRecord.
793 */
794 static bool
ValidXLogRecord(XLogReaderState * state,XLogRecord * record,XLogRecPtr recptr)795 ValidXLogRecord(XLogReaderState *state, XLogRecord *record, XLogRecPtr recptr)
796 {
797 pg_crc32c crc;
798
799 /* Calculate the CRC */
800 INIT_CRC32C(crc);
801 COMP_CRC32C(crc, ((char *) record) + SizeOfXLogRecord, record->xl_tot_len - SizeOfXLogRecord);
802 /* include the record header last */
803 COMP_CRC32C(crc, (char *) record, offsetof(XLogRecord, xl_crc));
804 FIN_CRC32C(crc);
805
806 if (!EQ_CRC32C(record->xl_crc, crc))
807 {
808 report_invalid_record(state,
809 "incorrect resource manager data checksum in record at %X/%X",
810 (uint32) (recptr >> 32), (uint32) recptr);
811 return false;
812 }
813
814 return true;
815 }
816
817 /*
818 * Validate a page header.
819 *
820 * Check if 'phdr' is valid as the header of the XLog page at position
821 * 'recptr'.
822 */
823 bool
XLogReaderValidatePageHeader(XLogReaderState * state,XLogRecPtr recptr,char * phdr)824 XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr,
825 char *phdr)
826 {
827 XLogRecPtr recaddr;
828 XLogSegNo segno;
829 int32 offset;
830 XLogPageHeader hdr = (XLogPageHeader) phdr;
831
832 Assert((recptr % XLOG_BLCKSZ) == 0);
833
834 XLByteToSeg(recptr, segno, state->segcxt.ws_segsize);
835 offset = XLogSegmentOffset(recptr, state->segcxt.ws_segsize);
836
837 XLogSegNoOffsetToRecPtr(segno, offset, state->segcxt.ws_segsize, recaddr);
838
839 if (hdr->xlp_magic != XLOG_PAGE_MAGIC)
840 {
841 char fname[MAXFNAMELEN];
842
843 XLogFileName(fname, state->seg.ws_tli, segno, state->segcxt.ws_segsize);
844
845 report_invalid_record(state,
846 "invalid magic number %04X in log segment %s, offset %u",
847 hdr->xlp_magic,
848 fname,
849 offset);
850 return false;
851 }
852
853 if ((hdr->xlp_info & ~XLP_ALL_FLAGS) != 0)
854 {
855 char fname[MAXFNAMELEN];
856
857 XLogFileName(fname, state->seg.ws_tli, segno, state->segcxt.ws_segsize);
858
859 report_invalid_record(state,
860 "invalid info bits %04X in log segment %s, offset %u",
861 hdr->xlp_info,
862 fname,
863 offset);
864 return false;
865 }
866
867 if (hdr->xlp_info & XLP_LONG_HEADER)
868 {
869 XLogLongPageHeader longhdr = (XLogLongPageHeader) hdr;
870
871 if (state->system_identifier &&
872 longhdr->xlp_sysid != state->system_identifier)
873 {
874 report_invalid_record(state,
875 "WAL file is from different database system: WAL file database system identifier is %llu, pg_control database system identifier is %llu",
876 (unsigned long long) longhdr->xlp_sysid,
877 (unsigned long long) state->system_identifier);
878 return false;
879 }
880 else if (longhdr->xlp_seg_size != state->segcxt.ws_segsize)
881 {
882 report_invalid_record(state,
883 "WAL file is from different database system: incorrect segment size in page header");
884 return false;
885 }
886 else if (longhdr->xlp_xlog_blcksz != XLOG_BLCKSZ)
887 {
888 report_invalid_record(state,
889 "WAL file is from different database system: incorrect XLOG_BLCKSZ in page header");
890 return false;
891 }
892 }
893 else if (offset == 0)
894 {
895 char fname[MAXFNAMELEN];
896
897 XLogFileName(fname, state->seg.ws_tli, segno, state->segcxt.ws_segsize);
898
899 /* hmm, first page of file doesn't have a long header? */
900 report_invalid_record(state,
901 "invalid info bits %04X in log segment %s, offset %u",
902 hdr->xlp_info,
903 fname,
904 offset);
905 return false;
906 }
907
908 /*
909 * Check that the address on the page agrees with what we expected. This
910 * check typically fails when an old WAL segment is recycled, and hasn't
911 * yet been overwritten with new data yet.
912 */
913 if (hdr->xlp_pageaddr != recaddr)
914 {
915 char fname[MAXFNAMELEN];
916
917 XLogFileName(fname, state->seg.ws_tli, segno, state->segcxt.ws_segsize);
918
919 report_invalid_record(state,
920 "unexpected pageaddr %X/%X in log segment %s, offset %u",
921 (uint32) (hdr->xlp_pageaddr >> 32), (uint32) hdr->xlp_pageaddr,
922 fname,
923 offset);
924 return false;
925 }
926
927 /*
928 * Since child timelines are always assigned a TLI greater than their
929 * immediate parent's TLI, we should never see TLI go backwards across
930 * successive pages of a consistent WAL sequence.
931 *
932 * Sometimes we re-read a segment that's already been (partially) read. So
933 * we only verify TLIs for pages that are later than the last remembered
934 * LSN.
935 */
936 if (recptr > state->latestPagePtr)
937 {
938 if (hdr->xlp_tli < state->latestPageTLI)
939 {
940 char fname[MAXFNAMELEN];
941
942 XLogFileName(fname, state->seg.ws_tli, segno, state->segcxt.ws_segsize);
943
944 report_invalid_record(state,
945 "out-of-sequence timeline ID %u (after %u) in log segment %s, offset %u",
946 hdr->xlp_tli,
947 state->latestPageTLI,
948 fname,
949 offset);
950 return false;
951 }
952 }
953 state->latestPagePtr = recptr;
954 state->latestPageTLI = hdr->xlp_tli;
955
956 return true;
957 }
958
959 #ifdef FRONTEND
960 /*
961 * Functions that are currently not needed in the backend, but are better
962 * implemented inside xlogreader.c because of the internal facilities available
963 * here.
964 */
965
966 /*
967 * Find the first record with an lsn >= RecPtr.
968 *
969 * This is different from XLogBeginRead() in that RecPtr doesn't need to point
970 * to a valid record boundary. Useful for checking whether RecPtr is a valid
971 * xlog address for reading, and to find the first valid address after some
972 * address when dumping records for debugging purposes.
973 *
974 * This positions the reader, like XLogBeginRead(), so that the next call to
975 * XLogReadRecord() will read the next valid record.
976 */
977 XLogRecPtr
XLogFindNextRecord(XLogReaderState * state,XLogRecPtr RecPtr)978 XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr)
979 {
980 XLogRecPtr tmpRecPtr;
981 XLogRecPtr found = InvalidXLogRecPtr;
982 XLogPageHeader header;
983 char *errormsg;
984
985 Assert(!XLogRecPtrIsInvalid(RecPtr));
986
987 /*
988 * skip over potential continuation data, keeping in mind that it may span
989 * multiple pages
990 */
991 tmpRecPtr = RecPtr;
992 while (true)
993 {
994 XLogRecPtr targetPagePtr;
995 int targetRecOff;
996 uint32 pageHeaderSize;
997 int readLen;
998
999 /*
1000 * Compute targetRecOff. It should typically be equal or greater than
1001 * short page-header since a valid record can't start anywhere before
1002 * that, except when caller has explicitly specified the offset that
1003 * falls somewhere there or when we are skipping multi-page
1004 * continuation record. It doesn't matter though because
1005 * ReadPageInternal() is prepared to handle that and will read at
1006 * least short page-header worth of data
1007 */
1008 targetRecOff = tmpRecPtr % XLOG_BLCKSZ;
1009
1010 /* scroll back to page boundary */
1011 targetPagePtr = tmpRecPtr - targetRecOff;
1012
1013 /* Read the page containing the record */
1014 readLen = ReadPageInternal(state, targetPagePtr, targetRecOff);
1015 if (readLen < 0)
1016 goto err;
1017
1018 header = (XLogPageHeader) state->readBuf;
1019
1020 pageHeaderSize = XLogPageHeaderSize(header);
1021
1022 /* make sure we have enough data for the page header */
1023 readLen = ReadPageInternal(state, targetPagePtr, pageHeaderSize);
1024 if (readLen < 0)
1025 goto err;
1026
1027 /* skip over potential continuation data */
1028 if (header->xlp_info & XLP_FIRST_IS_CONTRECORD)
1029 {
1030 /*
1031 * If the length of the remaining continuation data is more than
1032 * what can fit in this page, the continuation record crosses over
1033 * this page. Read the next page and try again. xlp_rem_len in the
1034 * next page header will contain the remaining length of the
1035 * continuation data
1036 *
1037 * Note that record headers are MAXALIGN'ed
1038 */
1039 if (MAXALIGN(header->xlp_rem_len) >= (XLOG_BLCKSZ - pageHeaderSize))
1040 tmpRecPtr = targetPagePtr + XLOG_BLCKSZ;
1041 else
1042 {
1043 /*
1044 * The previous continuation record ends in this page. Set
1045 * tmpRecPtr to point to the first valid record
1046 */
1047 tmpRecPtr = targetPagePtr + pageHeaderSize
1048 + MAXALIGN(header->xlp_rem_len);
1049 break;
1050 }
1051 }
1052 else
1053 {
1054 tmpRecPtr = targetPagePtr + pageHeaderSize;
1055 break;
1056 }
1057 }
1058
1059 /*
1060 * we know now that tmpRecPtr is an address pointing to a valid XLogRecord
1061 * because either we're at the first record after the beginning of a page
1062 * or we just jumped over the remaining data of a continuation.
1063 */
1064 XLogBeginRead(state, tmpRecPtr);
1065 while (XLogReadRecord(state, &errormsg) != NULL)
1066 {
1067 /* past the record we've found, break out */
1068 if (RecPtr <= state->ReadRecPtr)
1069 {
1070 /* Rewind the reader to the beginning of the last record. */
1071 found = state->ReadRecPtr;
1072 XLogBeginRead(state, found);
1073 return found;
1074 }
1075 }
1076
1077 err:
1078 XLogReaderInvalReadState(state);
1079
1080 return InvalidXLogRecPtr;
1081 }
1082
1083 #endif /* FRONTEND */
1084
1085 /*
1086 * Helper function to ease writing of XLogRoutine->page_read callbacks.
1087 * If this function is used, caller must supply a segment_open callback in
1088 * 'state', as that is used here.
1089 *
1090 * Read 'count' bytes into 'buf', starting at location 'startptr', from WAL
1091 * fetched from timeline 'tli'.
1092 *
1093 * Returns true if succeeded, false if an error occurs, in which case
1094 * 'errinfo' receives error details.
1095 *
1096 * XXX probably this should be improved to suck data directly from the
1097 * WAL buffers when possible.
1098 */
1099 bool
WALRead(XLogReaderState * state,char * buf,XLogRecPtr startptr,Size count,TimeLineID tli,WALReadError * errinfo)1100 WALRead(XLogReaderState *state,
1101 char *buf, XLogRecPtr startptr, Size count, TimeLineID tli,
1102 WALReadError *errinfo)
1103 {
1104 char *p;
1105 XLogRecPtr recptr;
1106 Size nbytes;
1107
1108 p = buf;
1109 recptr = startptr;
1110 nbytes = count;
1111
1112 while (nbytes > 0)
1113 {
1114 uint32 startoff;
1115 int segbytes;
1116 int readbytes;
1117
1118 startoff = XLogSegmentOffset(recptr, state->segcxt.ws_segsize);
1119
1120 /*
1121 * If the data we want is not in a segment we have open, close what we
1122 * have (if anything) and open the next one, using the caller's
1123 * provided openSegment callback.
1124 */
1125 if (state->seg.ws_file < 0 ||
1126 !XLByteInSeg(recptr, state->seg.ws_segno, state->segcxt.ws_segsize) ||
1127 tli != state->seg.ws_tli)
1128 {
1129 XLogSegNo nextSegNo;
1130
1131 if (state->seg.ws_file >= 0)
1132 state->routine.segment_close(state);
1133
1134 XLByteToSeg(recptr, nextSegNo, state->segcxt.ws_segsize);
1135 state->routine.segment_open(state, nextSegNo, &tli);
1136
1137 /* This shouldn't happen -- indicates a bug in segment_open */
1138 Assert(state->seg.ws_file >= 0);
1139
1140 /* Update the current segment info. */
1141 state->seg.ws_tli = tli;
1142 state->seg.ws_segno = nextSegNo;
1143 }
1144
1145 /* How many bytes are within this segment? */
1146 if (nbytes > (state->segcxt.ws_segsize - startoff))
1147 segbytes = state->segcxt.ws_segsize - startoff;
1148 else
1149 segbytes = nbytes;
1150
1151 #ifndef FRONTEND
1152 pgstat_report_wait_start(WAIT_EVENT_WAL_READ);
1153 #endif
1154
1155 /* Reset errno first; eases reporting non-errno-affecting errors */
1156 errno = 0;
1157 readbytes = pg_pread(state->seg.ws_file, p, segbytes, (off_t) startoff);
1158
1159 #ifndef FRONTEND
1160 pgstat_report_wait_end();
1161 #endif
1162
1163 if (readbytes <= 0)
1164 {
1165 errinfo->wre_errno = errno;
1166 errinfo->wre_req = segbytes;
1167 errinfo->wre_read = readbytes;
1168 errinfo->wre_off = startoff;
1169 errinfo->wre_seg = state->seg;
1170 return false;
1171 }
1172
1173 /* Update state for read */
1174 recptr += readbytes;
1175 nbytes -= readbytes;
1176 p += readbytes;
1177 }
1178
1179 return true;
1180 }
1181
1182 /* ----------------------------------------
1183 * Functions for decoding the data and block references in a record.
1184 * ----------------------------------------
1185 */
1186
1187 /* private function to reset the state between records */
1188 static void
ResetDecoder(XLogReaderState * state)1189 ResetDecoder(XLogReaderState *state)
1190 {
1191 int block_id;
1192
1193 state->decoded_record = NULL;
1194
1195 state->main_data_len = 0;
1196
1197 for (block_id = 0; block_id <= state->max_block_id; block_id++)
1198 {
1199 state->blocks[block_id].in_use = false;
1200 state->blocks[block_id].has_image = false;
1201 state->blocks[block_id].has_data = false;
1202 state->blocks[block_id].apply_image = false;
1203 }
1204 state->max_block_id = -1;
1205 }
1206
1207 /*
1208 * Decode the previously read record.
1209 *
1210 * On error, a human-readable error message is returned in *errormsg, and
1211 * the return value is false.
1212 */
1213 bool
DecodeXLogRecord(XLogReaderState * state,XLogRecord * record,char ** errormsg)1214 DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg)
1215 {
1216 /*
1217 * read next _size bytes from record buffer, but check for overrun first.
1218 */
1219 #define COPY_HEADER_FIELD(_dst, _size) \
1220 do { \
1221 if (remaining < _size) \
1222 goto shortdata_err; \
1223 memcpy(_dst, ptr, _size); \
1224 ptr += _size; \
1225 remaining -= _size; \
1226 } while(0)
1227
1228 char *ptr;
1229 uint32 remaining;
1230 uint32 datatotal;
1231 RelFileNode *rnode = NULL;
1232 uint8 block_id;
1233
1234 ResetDecoder(state);
1235
1236 state->decoded_record = record;
1237 state->record_origin = InvalidRepOriginId;
1238
1239 ptr = (char *) record;
1240 ptr += SizeOfXLogRecord;
1241 remaining = record->xl_tot_len - SizeOfXLogRecord;
1242
1243 /* Decode the headers */
1244 datatotal = 0;
1245 while (remaining > datatotal)
1246 {
1247 COPY_HEADER_FIELD(&block_id, sizeof(uint8));
1248
1249 if (block_id == XLR_BLOCK_ID_DATA_SHORT)
1250 {
1251 /* XLogRecordDataHeaderShort */
1252 uint8 main_data_len;
1253
1254 COPY_HEADER_FIELD(&main_data_len, sizeof(uint8));
1255
1256 state->main_data_len = main_data_len;
1257 datatotal += main_data_len;
1258 break; /* by convention, the main data fragment is
1259 * always last */
1260 }
1261 else if (block_id == XLR_BLOCK_ID_DATA_LONG)
1262 {
1263 /* XLogRecordDataHeaderLong */
1264 uint32 main_data_len;
1265
1266 COPY_HEADER_FIELD(&main_data_len, sizeof(uint32));
1267 state->main_data_len = main_data_len;
1268 datatotal += main_data_len;
1269 break; /* by convention, the main data fragment is
1270 * always last */
1271 }
1272 else if (block_id == XLR_BLOCK_ID_ORIGIN)
1273 {
1274 COPY_HEADER_FIELD(&state->record_origin, sizeof(RepOriginId));
1275 }
1276 else if (block_id <= XLR_MAX_BLOCK_ID)
1277 {
1278 /* XLogRecordBlockHeader */
1279 DecodedBkpBlock *blk;
1280 uint8 fork_flags;
1281
1282 if (block_id <= state->max_block_id)
1283 {
1284 report_invalid_record(state,
1285 "out-of-order block_id %u at %X/%X",
1286 block_id,
1287 (uint32) (state->ReadRecPtr >> 32),
1288 (uint32) state->ReadRecPtr);
1289 goto err;
1290 }
1291 state->max_block_id = block_id;
1292
1293 blk = &state->blocks[block_id];
1294 blk->in_use = true;
1295 blk->apply_image = false;
1296
1297 COPY_HEADER_FIELD(&fork_flags, sizeof(uint8));
1298 blk->forknum = fork_flags & BKPBLOCK_FORK_MASK;
1299 blk->flags = fork_flags;
1300 blk->has_image = ((fork_flags & BKPBLOCK_HAS_IMAGE) != 0);
1301 blk->has_data = ((fork_flags & BKPBLOCK_HAS_DATA) != 0);
1302
1303 COPY_HEADER_FIELD(&blk->data_len, sizeof(uint16));
1304 /* cross-check that the HAS_DATA flag is set iff data_length > 0 */
1305 if (blk->has_data && blk->data_len == 0)
1306 {
1307 report_invalid_record(state,
1308 "BKPBLOCK_HAS_DATA set, but no data included at %X/%X",
1309 (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
1310 goto err;
1311 }
1312 if (!blk->has_data && blk->data_len != 0)
1313 {
1314 report_invalid_record(state,
1315 "BKPBLOCK_HAS_DATA not set, but data length is %u at %X/%X",
1316 (unsigned int) blk->data_len,
1317 (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
1318 goto err;
1319 }
1320 datatotal += blk->data_len;
1321
1322 if (blk->has_image)
1323 {
1324 COPY_HEADER_FIELD(&blk->bimg_len, sizeof(uint16));
1325 COPY_HEADER_FIELD(&blk->hole_offset, sizeof(uint16));
1326 COPY_HEADER_FIELD(&blk->bimg_info, sizeof(uint8));
1327
1328 blk->apply_image = ((blk->bimg_info & BKPIMAGE_APPLY) != 0);
1329
1330 if (blk->bimg_info & BKPIMAGE_IS_COMPRESSED)
1331 {
1332 if (blk->bimg_info & BKPIMAGE_HAS_HOLE)
1333 COPY_HEADER_FIELD(&blk->hole_length, sizeof(uint16));
1334 else
1335 blk->hole_length = 0;
1336 }
1337 else
1338 blk->hole_length = BLCKSZ - blk->bimg_len;
1339 datatotal += blk->bimg_len;
1340
1341 /*
1342 * cross-check that hole_offset > 0, hole_length > 0 and
1343 * bimg_len < BLCKSZ if the HAS_HOLE flag is set.
1344 */
1345 if ((blk->bimg_info & BKPIMAGE_HAS_HOLE) &&
1346 (blk->hole_offset == 0 ||
1347 blk->hole_length == 0 ||
1348 blk->bimg_len == BLCKSZ))
1349 {
1350 report_invalid_record(state,
1351 "BKPIMAGE_HAS_HOLE set, but hole offset %u length %u block image length %u at %X/%X",
1352 (unsigned int) blk->hole_offset,
1353 (unsigned int) blk->hole_length,
1354 (unsigned int) blk->bimg_len,
1355 (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
1356 goto err;
1357 }
1358
1359 /*
1360 * cross-check that hole_offset == 0 and hole_length == 0 if
1361 * the HAS_HOLE flag is not set.
1362 */
1363 if (!(blk->bimg_info & BKPIMAGE_HAS_HOLE) &&
1364 (blk->hole_offset != 0 || blk->hole_length != 0))
1365 {
1366 report_invalid_record(state,
1367 "BKPIMAGE_HAS_HOLE not set, but hole offset %u length %u at %X/%X",
1368 (unsigned int) blk->hole_offset,
1369 (unsigned int) blk->hole_length,
1370 (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
1371 goto err;
1372 }
1373
1374 /*
1375 * cross-check that bimg_len < BLCKSZ if the IS_COMPRESSED
1376 * flag is set.
1377 */
1378 if ((blk->bimg_info & BKPIMAGE_IS_COMPRESSED) &&
1379 blk->bimg_len == BLCKSZ)
1380 {
1381 report_invalid_record(state,
1382 "BKPIMAGE_IS_COMPRESSED set, but block image length %u at %X/%X",
1383 (unsigned int) blk->bimg_len,
1384 (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
1385 goto err;
1386 }
1387
1388 /*
1389 * cross-check that bimg_len = BLCKSZ if neither HAS_HOLE nor
1390 * IS_COMPRESSED flag is set.
1391 */
1392 if (!(blk->bimg_info & BKPIMAGE_HAS_HOLE) &&
1393 !(blk->bimg_info & BKPIMAGE_IS_COMPRESSED) &&
1394 blk->bimg_len != BLCKSZ)
1395 {
1396 report_invalid_record(state,
1397 "neither BKPIMAGE_HAS_HOLE nor BKPIMAGE_IS_COMPRESSED set, but block image length is %u at %X/%X",
1398 (unsigned int) blk->data_len,
1399 (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
1400 goto err;
1401 }
1402 }
1403 if (!(fork_flags & BKPBLOCK_SAME_REL))
1404 {
1405 COPY_HEADER_FIELD(&blk->rnode, sizeof(RelFileNode));
1406 rnode = &blk->rnode;
1407 }
1408 else
1409 {
1410 if (rnode == NULL)
1411 {
1412 report_invalid_record(state,
1413 "BKPBLOCK_SAME_REL set but no previous rel at %X/%X",
1414 (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
1415 goto err;
1416 }
1417
1418 blk->rnode = *rnode;
1419 }
1420 COPY_HEADER_FIELD(&blk->blkno, sizeof(BlockNumber));
1421 }
1422 else
1423 {
1424 report_invalid_record(state,
1425 "invalid block_id %u at %X/%X",
1426 block_id,
1427 (uint32) (state->ReadRecPtr >> 32),
1428 (uint32) state->ReadRecPtr);
1429 goto err;
1430 }
1431 }
1432
1433 if (remaining != datatotal)
1434 goto shortdata_err;
1435
1436 /*
1437 * Ok, we've parsed the fragment headers, and verified that the total
1438 * length of the payload in the fragments is equal to the amount of data
1439 * left. Copy the data of each fragment to a separate buffer.
1440 *
1441 * We could just set up pointers into readRecordBuf, but we want to align
1442 * the data for the convenience of the callers. Backup images are not
1443 * copied, however; they don't need alignment.
1444 */
1445
1446 /* block data first */
1447 for (block_id = 0; block_id <= state->max_block_id; block_id++)
1448 {
1449 DecodedBkpBlock *blk = &state->blocks[block_id];
1450
1451 if (!blk->in_use)
1452 continue;
1453
1454 Assert(blk->has_image || !blk->apply_image);
1455
1456 if (blk->has_image)
1457 {
1458 blk->bkp_image = ptr;
1459 ptr += blk->bimg_len;
1460 }
1461 if (blk->has_data)
1462 {
1463 if (!blk->data || blk->data_len > blk->data_bufsz)
1464 {
1465 if (blk->data)
1466 pfree(blk->data);
1467
1468 /*
1469 * Force the initial request to be BLCKSZ so that we don't
1470 * waste time with lots of trips through this stanza as a
1471 * result of WAL compression.
1472 */
1473 blk->data_bufsz = MAXALIGN(Max(blk->data_len, BLCKSZ));
1474 blk->data = palloc(blk->data_bufsz);
1475 }
1476 memcpy(blk->data, ptr, blk->data_len);
1477 ptr += blk->data_len;
1478 }
1479 }
1480
1481 /* and finally, the main data */
1482 if (state->main_data_len > 0)
1483 {
1484 if (!state->main_data || state->main_data_len > state->main_data_bufsz)
1485 {
1486 if (state->main_data)
1487 pfree(state->main_data);
1488
1489 /*
1490 * main_data_bufsz must be MAXALIGN'ed. In many xlog record
1491 * types, we omit trailing struct padding on-disk to save a few
1492 * bytes; but compilers may generate accesses to the xlog struct
1493 * that assume that padding bytes are present. If the palloc
1494 * request is not large enough to include such padding bytes then
1495 * we'll get valgrind complaints due to otherwise-harmless fetches
1496 * of the padding bytes.
1497 *
1498 * In addition, force the initial request to be reasonably large
1499 * so that we don't waste time with lots of trips through this
1500 * stanza. BLCKSZ / 2 seems like a good compromise choice.
1501 */
1502 state->main_data_bufsz = MAXALIGN(Max(state->main_data_len,
1503 BLCKSZ / 2));
1504 state->main_data = palloc(state->main_data_bufsz);
1505 }
1506 memcpy(state->main_data, ptr, state->main_data_len);
1507 ptr += state->main_data_len;
1508 }
1509
1510 return true;
1511
1512 shortdata_err:
1513 report_invalid_record(state,
1514 "record with invalid length at %X/%X",
1515 (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
1516 err:
1517 *errormsg = state->errormsg_buf;
1518
1519 return false;
1520 }
1521
1522 /*
1523 * Returns information about the block that a block reference refers to.
1524 *
1525 * If the WAL record contains a block reference with the given ID, *rnode,
1526 * *forknum, and *blknum are filled in (if not NULL), and returns true.
1527 * Otherwise returns false.
1528 */
1529 bool
XLogRecGetBlockTag(XLogReaderState * record,uint8 block_id,RelFileNode * rnode,ForkNumber * forknum,BlockNumber * blknum)1530 XLogRecGetBlockTag(XLogReaderState *record, uint8 block_id,
1531 RelFileNode *rnode, ForkNumber *forknum, BlockNumber *blknum)
1532 {
1533 DecodedBkpBlock *bkpb;
1534
1535 if (!record->blocks[block_id].in_use)
1536 return false;
1537
1538 bkpb = &record->blocks[block_id];
1539 if (rnode)
1540 *rnode = bkpb->rnode;
1541 if (forknum)
1542 *forknum = bkpb->forknum;
1543 if (blknum)
1544 *blknum = bkpb->blkno;
1545 return true;
1546 }
1547
1548 /*
1549 * Returns the data associated with a block reference, or NULL if there is
1550 * no data (e.g. because a full-page image was taken instead). The returned
1551 * pointer points to a MAXALIGNed buffer.
1552 */
1553 char *
XLogRecGetBlockData(XLogReaderState * record,uint8 block_id,Size * len)1554 XLogRecGetBlockData(XLogReaderState *record, uint8 block_id, Size *len)
1555 {
1556 DecodedBkpBlock *bkpb;
1557
1558 if (!record->blocks[block_id].in_use)
1559 return NULL;
1560
1561 bkpb = &record->blocks[block_id];
1562
1563 if (!bkpb->has_data)
1564 {
1565 if (len)
1566 *len = 0;
1567 return NULL;
1568 }
1569 else
1570 {
1571 if (len)
1572 *len = bkpb->data_len;
1573 return bkpb->data;
1574 }
1575 }
1576
1577 /*
1578 * Restore a full-page image from a backup block attached to an XLOG record.
1579 *
1580 * Returns true if a full-page image is restored.
1581 */
1582 bool
RestoreBlockImage(XLogReaderState * record,uint8 block_id,char * page)1583 RestoreBlockImage(XLogReaderState *record, uint8 block_id, char *page)
1584 {
1585 DecodedBkpBlock *bkpb;
1586 char *ptr;
1587 PGAlignedBlock tmp;
1588
1589 if (!record->blocks[block_id].in_use)
1590 return false;
1591 if (!record->blocks[block_id].has_image)
1592 return false;
1593
1594 bkpb = &record->blocks[block_id];
1595 ptr = bkpb->bkp_image;
1596
1597 if (bkpb->bimg_info & BKPIMAGE_IS_COMPRESSED)
1598 {
1599 /* If a backup block image is compressed, decompress it */
1600 if (pglz_decompress(ptr, bkpb->bimg_len, tmp.data,
1601 BLCKSZ - bkpb->hole_length, true) < 0)
1602 {
1603 report_invalid_record(record, "invalid compressed image at %X/%X, block %d",
1604 (uint32) (record->ReadRecPtr >> 32),
1605 (uint32) record->ReadRecPtr,
1606 block_id);
1607 return false;
1608 }
1609 ptr = tmp.data;
1610 }
1611
1612 /* generate page, taking into account hole if necessary */
1613 if (bkpb->hole_length == 0)
1614 {
1615 memcpy(page, ptr, BLCKSZ);
1616 }
1617 else
1618 {
1619 memcpy(page, ptr, bkpb->hole_offset);
1620 /* must zero-fill the hole */
1621 MemSet(page + bkpb->hole_offset, 0, bkpb->hole_length);
1622 memcpy(page + (bkpb->hole_offset + bkpb->hole_length),
1623 ptr + bkpb->hole_offset,
1624 BLCKSZ - (bkpb->hole_offset + bkpb->hole_length));
1625 }
1626
1627 return true;
1628 }
1629
1630 #ifndef FRONTEND
1631
1632 /*
1633 * Extract the FullTransactionId from a WAL record.
1634 */
1635 FullTransactionId
XLogRecGetFullXid(XLogReaderState * record)1636 XLogRecGetFullXid(XLogReaderState *record)
1637 {
1638 TransactionId xid,
1639 next_xid;
1640 uint32 epoch;
1641
1642 /*
1643 * This function is only safe during replay, because it depends on the
1644 * replay state. See AdvanceNextFullTransactionIdPastXid() for more.
1645 */
1646 Assert(AmStartupProcess() || !IsUnderPostmaster);
1647
1648 xid = XLogRecGetXid(record);
1649 next_xid = XidFromFullTransactionId(ShmemVariableCache->nextFullXid);
1650 epoch = EpochFromFullTransactionId(ShmemVariableCache->nextFullXid);
1651
1652 /*
1653 * If xid is numerically greater than next_xid, it has to be from the last
1654 * epoch.
1655 */
1656 if (unlikely(xid > next_xid))
1657 --epoch;
1658
1659 return FullTransactionIdFromEpochAndXid(epoch, xid);
1660 }
1661
1662 #endif
1663