1 /*-------------------------------------------------------------------------
2  *
3  * xlogreader.c
4  *		Generic XLog reading facility
5  *
6  * Portions Copyright (c) 2013-2018, PostgreSQL Global Development Group
7  *
8  * IDENTIFICATION
9  *		src/backend/access/transam/xlogreader.c
10  *
11  * NOTES
12  *		See xlogreader.h for more notes on this facility.
13  *
14  *		This file is compiled as both front-end and backend code, so it
15  *		may not use ereport, server-defined static variables, etc.
16  *-------------------------------------------------------------------------
17  */
18 #include "postgres.h"
19 
20 #include "access/transam.h"
21 #include "access/xlogrecord.h"
22 #include "access/xlog_internal.h"
23 #include "access/xlogreader.h"
24 #include "catalog/pg_control.h"
25 #include "common/pg_lzcompress.h"
26 #include "replication/origin.h"
27 
28 #ifndef FRONTEND
29 #include "utils/memutils.h"
30 #endif
31 
32 static bool allocate_recordbuf(XLogReaderState *state, uint32 reclength);
33 
34 static bool ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr,
35 					  XLogRecPtr PrevRecPtr, XLogRecord *record, bool randAccess);
36 static bool ValidXLogRecord(XLogReaderState *state, XLogRecord *record,
37 				XLogRecPtr recptr);
38 static int ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr,
39 				 int reqLen);
40 static void report_invalid_record(XLogReaderState *state, const char *fmt,...) pg_attribute_printf(2, 3);
41 
42 static void ResetDecoder(XLogReaderState *state);
43 
44 /* size of the buffer allocated for error message. */
45 #define MAX_ERRORMSG_LEN 1000
46 
47 /*
48  * Construct a string in state->errormsg_buf explaining what's wrong with
49  * the current record being read.
50  */
51 static void
52 report_invalid_record(XLogReaderState *state, const char *fmt,...)
53 {
54 	va_list		args;
55 
56 	fmt = _(fmt);
57 
58 	va_start(args, fmt);
59 	vsnprintf(state->errormsg_buf, MAX_ERRORMSG_LEN, fmt, args);
60 	va_end(args);
61 }
62 
63 /*
64  * Allocate and initialize a new XLogReader.
65  *
66  * Returns NULL if the xlogreader couldn't be allocated.
67  */
68 XLogReaderState *
69 XLogReaderAllocate(int wal_segment_size, XLogPageReadCB pagereadfunc,
70 				   void *private_data)
71 {
72 	XLogReaderState *state;
73 
74 	state = (XLogReaderState *)
75 		palloc_extended(sizeof(XLogReaderState),
76 						MCXT_ALLOC_NO_OOM | MCXT_ALLOC_ZERO);
77 	if (!state)
78 		return NULL;
79 
80 	state->max_block_id = -1;
81 
82 	/*
83 	 * Permanently allocate readBuf.  We do it this way, rather than just
84 	 * making a static array, for two reasons: (1) no need to waste the
85 	 * storage in most instantiations of the backend; (2) a static char array
86 	 * isn't guaranteed to have any particular alignment, whereas
87 	 * palloc_extended() will provide MAXALIGN'd storage.
88 	 */
89 	state->readBuf = (char *) palloc_extended(XLOG_BLCKSZ,
90 											  MCXT_ALLOC_NO_OOM);
91 	if (!state->readBuf)
92 	{
93 		pfree(state);
94 		return NULL;
95 	}
96 
97 	state->wal_segment_size = wal_segment_size;
98 	state->read_page = pagereadfunc;
99 	/* system_identifier initialized to zeroes above */
100 	state->private_data = private_data;
101 	/* ReadRecPtr and EndRecPtr initialized to zeroes above */
102 	/* readSegNo, readOff, readLen, readPageTLI initialized to zeroes above */
103 	state->errormsg_buf = palloc_extended(MAX_ERRORMSG_LEN + 1,
104 										  MCXT_ALLOC_NO_OOM);
105 	if (!state->errormsg_buf)
106 	{
107 		pfree(state->readBuf);
108 		pfree(state);
109 		return NULL;
110 	}
111 	state->errormsg_buf[0] = '\0';
112 
113 	/*
114 	 * Allocate an initial readRecordBuf of minimal size, which can later be
115 	 * enlarged if necessary.
116 	 */
117 	if (!allocate_recordbuf(state, 0))
118 	{
119 		pfree(state->errormsg_buf);
120 		pfree(state->readBuf);
121 		pfree(state);
122 		return NULL;
123 	}
124 
125 	return state;
126 }
127 
128 void
129 XLogReaderFree(XLogReaderState *state)
130 {
131 	int			block_id;
132 
133 	for (block_id = 0; block_id <= XLR_MAX_BLOCK_ID; block_id++)
134 	{
135 		if (state->blocks[block_id].data)
136 			pfree(state->blocks[block_id].data);
137 	}
138 	if (state->main_data)
139 		pfree(state->main_data);
140 
141 	pfree(state->errormsg_buf);
142 	if (state->readRecordBuf)
143 		pfree(state->readRecordBuf);
144 	pfree(state->readBuf);
145 	pfree(state);
146 }
147 
148 /*
149  * Allocate readRecordBuf to fit a record of at least the given length.
150  * Returns true if successful, false if out of memory.
151  *
152  * readRecordBufSize is set to the new buffer size.
153  *
154  * To avoid useless small increases, round its size to a multiple of
155  * XLOG_BLCKSZ, and make sure it's at least 5*Max(BLCKSZ, XLOG_BLCKSZ) to start
156  * with.  (That is enough for all "normal" records, but very large commit or
157  * abort records might need more space.)
158  */
159 static bool
160 allocate_recordbuf(XLogReaderState *state, uint32 reclength)
161 {
162 	uint32		newSize = reclength;
163 
164 	newSize += XLOG_BLCKSZ - (newSize % XLOG_BLCKSZ);
165 	newSize = Max(newSize, 5 * Max(BLCKSZ, XLOG_BLCKSZ));
166 
167 #ifndef FRONTEND
168 
169 	/*
170 	 * Note that in much unlucky circumstances, the random data read from a
171 	 * recycled segment can cause this routine to be called with a size
172 	 * causing a hard failure at allocation.  For a standby, this would cause
173 	 * the instance to stop suddenly with a hard failure, preventing it to
174 	 * retry fetching WAL from one of its sources which could allow it to move
175 	 * on with replay without a manual restart. If the data comes from a past
176 	 * recycled segment and is still valid, then the allocation may succeed
177 	 * but record checks are going to fail so this would be short-lived.  If
178 	 * the allocation fails because of a memory shortage, then this is not a
179 	 * hard failure either per the guarantee given by MCXT_ALLOC_NO_OOM.
180 	 */
181 	if (!AllocSizeIsValid(newSize))
182 		return false;
183 
184 #endif
185 
186 	if (state->readRecordBuf)
187 		pfree(state->readRecordBuf);
188 	state->readRecordBuf =
189 		(char *) palloc_extended(newSize, MCXT_ALLOC_NO_OOM);
190 	if (state->readRecordBuf == NULL)
191 	{
192 		state->readRecordBufSize = 0;
193 		return false;
194 	}
195 	state->readRecordBufSize = newSize;
196 	return true;
197 }
198 
199 /*
200  * Attempt to read an XLOG record.
201  *
202  * If RecPtr is valid, try to read a record at that position.  Otherwise
203  * try to read a record just after the last one previously read.
204  *
205  * If the read_page callback fails to read the requested data, NULL is
206  * returned.  The callback is expected to have reported the error; errormsg
207  * is set to NULL.
208  *
209  * If the reading fails for some other reason, NULL is also returned, and
210  * *errormsg is set to a string with details of the failure.
211  *
212  * The returned pointer (or *errormsg) points to an internal buffer that's
213  * valid until the next call to XLogReadRecord.
214  */
215 XLogRecord *
216 XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
217 {
218 	XLogRecord *record;
219 	XLogRecPtr	targetPagePtr;
220 	bool		randAccess;
221 	uint32		len,
222 				total_len;
223 	uint32		targetRecOff;
224 	uint32		pageHeaderSize;
225 	bool		assembled;
226 	bool		gotheader;
227 	int			readOff;
228 
229 	/*
230 	 * randAccess indicates whether to verify the previous-record pointer of
231 	 * the record we're reading.  We only do this if we're reading
232 	 * sequentially, which is what we initially assume.
233 	 */
234 	randAccess = false;
235 
236 	/* reset error state */
237 	*errormsg = NULL;
238 	state->errormsg_buf[0] = '\0';
239 
240 	ResetDecoder(state);
241 	state->abortedRecPtr = InvalidXLogRecPtr;
242 	state->missingContrecPtr = InvalidXLogRecPtr;
243 
244 	if (RecPtr == InvalidXLogRecPtr)
245 	{
246 		/* No explicit start point; read the record after the one we just read */
247 		RecPtr = state->EndRecPtr;
248 
249 		if (state->ReadRecPtr == InvalidXLogRecPtr)
250 			randAccess = true;
251 
252 		/*
253 		 * RecPtr is pointing to end+1 of the previous WAL record.  If we're
254 		 * at a page boundary, no more records can fit on the current page. We
255 		 * must skip over the page header, but we can't do that until we've
256 		 * read in the page, since the header size is variable.
257 		 */
258 	}
259 	else
260 	{
261 		/*
262 		 * Caller supplied a position to start at.
263 		 *
264 		 * In this case, the passed-in record pointer should already be
265 		 * pointing to a valid record starting position.
266 		 */
267 		Assert(XRecOffIsValid(RecPtr));
268 		randAccess = true;
269 	}
270 
271 restart:
272 	state->currRecPtr = RecPtr;
273 	assembled = false;
274 
275 	targetPagePtr = RecPtr - (RecPtr % XLOG_BLCKSZ);
276 	targetRecOff = RecPtr % XLOG_BLCKSZ;
277 
278 	/*
279 	 * Read the page containing the record into state->readBuf. Request enough
280 	 * byte to cover the whole record header, or at least the part of it that
281 	 * fits on the same page.
282 	 */
283 	readOff = ReadPageInternal(state,
284 							   targetPagePtr,
285 							   Min(targetRecOff + SizeOfXLogRecord, XLOG_BLCKSZ));
286 	if (readOff < 0)
287 		goto err;
288 
289 	/*
290 	 * ReadPageInternal always returns at least the page header, so we can
291 	 * examine it now.
292 	 */
293 	pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) state->readBuf);
294 	if (targetRecOff == 0)
295 	{
296 		/*
297 		 * At page start, so skip over page header.
298 		 */
299 		RecPtr += pageHeaderSize;
300 		targetRecOff = pageHeaderSize;
301 	}
302 	else if (targetRecOff < pageHeaderSize)
303 	{
304 		report_invalid_record(state, "invalid record offset at %X/%X",
305 							  (uint32) (RecPtr >> 32), (uint32) RecPtr);
306 		goto err;
307 	}
308 
309 	if ((((XLogPageHeader) state->readBuf)->xlp_info & XLP_FIRST_IS_CONTRECORD) &&
310 		targetRecOff == pageHeaderSize)
311 	{
312 		report_invalid_record(state, "contrecord is requested by %X/%X",
313 							  (uint32) (RecPtr >> 32), (uint32) RecPtr);
314 		goto err;
315 	}
316 
317 	/* ReadPageInternal has verified the page header */
318 	Assert(pageHeaderSize <= readOff);
319 
320 	/*
321 	 * Read the record length.
322 	 *
323 	 * NB: Even though we use an XLogRecord pointer here, the whole record
324 	 * header might not fit on this page. xl_tot_len is the first field of the
325 	 * struct, so it must be on this page (the records are MAXALIGNed), but we
326 	 * cannot access any other fields until we've verified that we got the
327 	 * whole header.
328 	 */
329 	record = (XLogRecord *) (state->readBuf + RecPtr % XLOG_BLCKSZ);
330 	total_len = record->xl_tot_len;
331 
332 	/*
333 	 * If the whole record header is on this page, validate it immediately.
334 	 * Otherwise do just a basic sanity check on xl_tot_len, and validate the
335 	 * rest of the header after reading it from the next page.  The xl_tot_len
336 	 * check is necessary here to ensure that we enter the "Need to reassemble
337 	 * record" code path below; otherwise we might fail to apply
338 	 * ValidXLogRecordHeader at all.
339 	 */
340 	if (targetRecOff <= XLOG_BLCKSZ - SizeOfXLogRecord)
341 	{
342 		if (!ValidXLogRecordHeader(state, RecPtr, state->ReadRecPtr, record,
343 								   randAccess))
344 			goto err;
345 		gotheader = true;
346 	}
347 	else
348 	{
349 		/* XXX: more validation should be done here */
350 		if (total_len < SizeOfXLogRecord)
351 		{
352 			report_invalid_record(state,
353 								  "invalid record length at %X/%X: wanted %u, got %u",
354 								  (uint32) (RecPtr >> 32), (uint32) RecPtr,
355 								  (uint32) SizeOfXLogRecord, total_len);
356 			goto err;
357 		}
358 		gotheader = false;
359 	}
360 
361 	/*
362 	 * Enlarge readRecordBuf as needed.
363 	 */
364 	if (total_len > state->readRecordBufSize &&
365 		!allocate_recordbuf(state, total_len))
366 	{
367 		/* We treat this as a "bogus data" condition */
368 		report_invalid_record(state, "record length %u at %X/%X too long",
369 							  total_len,
370 							  (uint32) (RecPtr >> 32), (uint32) RecPtr);
371 		goto err;
372 	}
373 
374 	len = XLOG_BLCKSZ - RecPtr % XLOG_BLCKSZ;
375 	if (total_len > len)
376 	{
377 		/* Need to reassemble record */
378 		char	   *contdata;
379 		XLogPageHeader pageHeader;
380 		char	   *buffer;
381 		uint32		gotlen;
382 
383 		assembled = true;
384 		/* Copy the first fragment of the record from the first page. */
385 		memcpy(state->readRecordBuf,
386 			   state->readBuf + RecPtr % XLOG_BLCKSZ, len);
387 		buffer = state->readRecordBuf + len;
388 		gotlen = len;
389 
390 		do
391 		{
392 			/* Calculate pointer to beginning of next page */
393 			targetPagePtr += XLOG_BLCKSZ;
394 
395 			/* Wait for the next page to become available */
396 			readOff = ReadPageInternal(state, targetPagePtr,
397 									   Min(total_len - gotlen + SizeOfXLogShortPHD,
398 										   XLOG_BLCKSZ));
399 
400 			if (readOff < 0)
401 				goto err;
402 
403 			Assert(SizeOfXLogShortPHD <= readOff);
404 
405 			pageHeader = (XLogPageHeader) state->readBuf;
406 
407 			/*
408 			 * If we were expecting a continuation record and got an
409 			 * "overwrite contrecord" flag, that means the continuation record
410 			 * was overwritten with a different record.  Restart the read by
411 			 * assuming the address to read is the location where we found
412 			 * this flag; but keep track of the LSN of the record we were
413 			 * reading, for later verification.
414 			 */
415 			if (pageHeader->xlp_info & XLP_FIRST_IS_OVERWRITE_CONTRECORD)
416 			{
417 				state->overwrittenRecPtr = state->currRecPtr;
418 				ResetDecoder(state);
419 				RecPtr = targetPagePtr;
420 				goto restart;
421 			}
422 
423 			/* Check that the continuation on next page looks valid */
424 			if (!(pageHeader->xlp_info & XLP_FIRST_IS_CONTRECORD))
425 			{
426 				report_invalid_record(state,
427 									  "there is no contrecord flag at %X/%X",
428 									  (uint32) (RecPtr >> 32), (uint32) RecPtr);
429 				goto err;
430 			}
431 
432 			/*
433 			 * Cross-check that xlp_rem_len agrees with how much of the record
434 			 * we expect there to be left.
435 			 */
436 			if (pageHeader->xlp_rem_len == 0 ||
437 				total_len != (pageHeader->xlp_rem_len + gotlen))
438 			{
439 				report_invalid_record(state,
440 									  "invalid contrecord length %u at %X/%X",
441 									  pageHeader->xlp_rem_len,
442 									  (uint32) (RecPtr >> 32), (uint32) RecPtr);
443 				goto err;
444 			}
445 
446 			/* Append the continuation from this page to the buffer */
447 			pageHeaderSize = XLogPageHeaderSize(pageHeader);
448 
449 			if (readOff < pageHeaderSize)
450 				readOff = ReadPageInternal(state, targetPagePtr,
451 										   pageHeaderSize);
452 
453 			Assert(pageHeaderSize <= readOff);
454 
455 			contdata = (char *) state->readBuf + pageHeaderSize;
456 			len = XLOG_BLCKSZ - pageHeaderSize;
457 			if (pageHeader->xlp_rem_len < len)
458 				len = pageHeader->xlp_rem_len;
459 
460 			if (readOff < pageHeaderSize + len)
461 				readOff = ReadPageInternal(state, targetPagePtr,
462 										   pageHeaderSize + len);
463 
464 			memcpy(buffer, (char *) contdata, len);
465 			buffer += len;
466 			gotlen += len;
467 
468 			/* If we just reassembled the record header, validate it. */
469 			if (!gotheader)
470 			{
471 				record = (XLogRecord *) state->readRecordBuf;
472 				if (!ValidXLogRecordHeader(state, RecPtr, state->ReadRecPtr,
473 										   record, randAccess))
474 					goto err;
475 				gotheader = true;
476 			}
477 		} while (gotlen < total_len);
478 
479 		Assert(gotheader);
480 
481 		record = (XLogRecord *) state->readRecordBuf;
482 		if (!ValidXLogRecord(state, record, RecPtr))
483 			goto err;
484 
485 		pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) state->readBuf);
486 		state->ReadRecPtr = RecPtr;
487 		state->EndRecPtr = targetPagePtr + pageHeaderSize
488 			+ MAXALIGN(pageHeader->xlp_rem_len);
489 	}
490 	else
491 	{
492 		/* Wait for the record data to become available */
493 		readOff = ReadPageInternal(state, targetPagePtr,
494 								   Min(targetRecOff + total_len, XLOG_BLCKSZ));
495 		if (readOff < 0)
496 			goto err;
497 
498 		/* Record does not cross a page boundary */
499 		if (!ValidXLogRecord(state, record, RecPtr))
500 			goto err;
501 
502 		state->EndRecPtr = RecPtr + MAXALIGN(total_len);
503 
504 		state->ReadRecPtr = RecPtr;
505 		memcpy(state->readRecordBuf, record, total_len);
506 	}
507 
508 	/*
509 	 * Special processing if it's an XLOG SWITCH record
510 	 */
511 	if (record->xl_rmid == RM_XLOG_ID &&
512 		(record->xl_info & ~XLR_INFO_MASK) == XLOG_SWITCH)
513 	{
514 		/* Pretend it extends to end of segment */
515 		state->EndRecPtr += state->wal_segment_size - 1;
516 		state->EndRecPtr -= XLogSegmentOffset(state->EndRecPtr, state->wal_segment_size);
517 	}
518 
519 	if (DecodeXLogRecord(state, record, errormsg))
520 		return record;
521 	else
522 		return NULL;
523 
524 err:
525 	if (assembled)
526 	{
527 		/*
528 		 * We get here when a record that spans multiple pages needs to be
529 		 * assembled, but something went wrong -- perhaps a contrecord piece
530 		 * was lost.  If caller is WAL replay, it will know where the aborted
531 		 * record was and where to direct followup WAL to be written, marking
532 		 * the next piece with XLP_FIRST_IS_OVERWRITE_CONTRECORD, which will
533 		 * in turn signal downstream WAL consumers that the broken WAL record
534 		 * is to be ignored.
535 		 */
536 		state->abortedRecPtr = RecPtr;
537 		state->missingContrecPtr = targetPagePtr;
538 	}
539 
540 	/*
541 	 * Invalidate the read state. We might read from a different source after
542 	 * failure.
543 	 */
544 	XLogReaderInvalReadState(state);
545 
546 	if (state->errormsg_buf[0] != '\0')
547 		*errormsg = state->errormsg_buf;
548 
549 	return NULL;
550 }
551 
552 /*
553  * Read a single xlog page including at least [pageptr, reqLen] of valid data
554  * via the read_page() callback.
555  *
556  * Returns -1 if the required page cannot be read for some reason; errormsg_buf
557  * is set in that case (unless the error occurs in the read_page callback).
558  *
559  * We fetch the page from a reader-local cache if we know we have the required
560  * data and if there hasn't been any error since caching the data.
561  */
562 static int
563 ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
564 {
565 	int			readLen;
566 	uint32		targetPageOff;
567 	XLogSegNo	targetSegNo;
568 	XLogPageHeader hdr;
569 
570 	Assert((pageptr % XLOG_BLCKSZ) == 0);
571 
572 	XLByteToSeg(pageptr, targetSegNo, state->wal_segment_size);
573 	targetPageOff = XLogSegmentOffset(pageptr, state->wal_segment_size);
574 
575 	/* check whether we have all the requested data already */
576 	if (targetSegNo == state->readSegNo && targetPageOff == state->readOff &&
577 		reqLen < state->readLen)
578 		return state->readLen;
579 
580 	/*
581 	 * Data is not in our buffer.
582 	 *
583 	 * Every time we actually read the page, even if we looked at parts of it
584 	 * before, we need to do verification as the read_page callback might now
585 	 * be rereading data from a different source.
586 	 *
587 	 * Whenever switching to a new WAL segment, we read the first page of the
588 	 * file and validate its header, even if that's not where the target
589 	 * record is.  This is so that we can check the additional identification
590 	 * info that is present in the first page's "long" header.
591 	 */
592 	if (targetSegNo != state->readSegNo && targetPageOff != 0)
593 	{
594 		XLogRecPtr	targetSegmentPtr = pageptr - targetPageOff;
595 
596 		readLen = state->read_page(state, targetSegmentPtr, XLOG_BLCKSZ,
597 								   state->currRecPtr,
598 								   state->readBuf, &state->readPageTLI);
599 		if (readLen < 0)
600 			goto err;
601 
602 		/* we can be sure to have enough WAL available, we scrolled back */
603 		Assert(readLen == XLOG_BLCKSZ);
604 
605 		if (!XLogReaderValidatePageHeader(state, targetSegmentPtr,
606 										  state->readBuf))
607 			goto err;
608 	}
609 
610 	/*
611 	 * First, read the requested data length, but at least a short page header
612 	 * so that we can validate it.
613 	 */
614 	readLen = state->read_page(state, pageptr, Max(reqLen, SizeOfXLogShortPHD),
615 							   state->currRecPtr,
616 							   state->readBuf, &state->readPageTLI);
617 	if (readLen < 0)
618 		goto err;
619 
620 	Assert(readLen <= XLOG_BLCKSZ);
621 
622 	/* Do we have enough data to check the header length? */
623 	if (readLen <= SizeOfXLogShortPHD)
624 		goto err;
625 
626 	Assert(readLen >= reqLen);
627 
628 	hdr = (XLogPageHeader) state->readBuf;
629 
630 	/* still not enough */
631 	if (readLen < XLogPageHeaderSize(hdr))
632 	{
633 		readLen = state->read_page(state, pageptr, XLogPageHeaderSize(hdr),
634 								   state->currRecPtr,
635 								   state->readBuf, &state->readPageTLI);
636 		if (readLen < 0)
637 			goto err;
638 	}
639 
640 	/*
641 	 * Now that we know we have the full header, validate it.
642 	 */
643 	if (!XLogReaderValidatePageHeader(state, pageptr, (char *) hdr))
644 		goto err;
645 
646 	/* update read state information */
647 	state->readSegNo = targetSegNo;
648 	state->readOff = targetPageOff;
649 	state->readLen = readLen;
650 
651 	return readLen;
652 
653 err:
654 	XLogReaderInvalReadState(state);
655 	return -1;
656 }
657 
658 /*
659  * Invalidate the xlogreader's read state to force a re-read.
660  */
661 void
662 XLogReaderInvalReadState(XLogReaderState *state)
663 {
664 	state->readSegNo = 0;
665 	state->readOff = 0;
666 	state->readLen = 0;
667 }
668 
669 /*
670  * Validate an XLOG record header.
671  *
672  * This is just a convenience subroutine to avoid duplicated code in
673  * XLogReadRecord.  It's not intended for use from anywhere else.
674  */
675 static bool
676 ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr,
677 					  XLogRecPtr PrevRecPtr, XLogRecord *record,
678 					  bool randAccess)
679 {
680 	if (record->xl_tot_len < SizeOfXLogRecord)
681 	{
682 		report_invalid_record(state,
683 							  "invalid record length at %X/%X: wanted %u, got %u",
684 							  (uint32) (RecPtr >> 32), (uint32) RecPtr,
685 							  (uint32) SizeOfXLogRecord, record->xl_tot_len);
686 		return false;
687 	}
688 	if (record->xl_rmid > RM_MAX_ID)
689 	{
690 		report_invalid_record(state,
691 							  "invalid resource manager ID %u at %X/%X",
692 							  record->xl_rmid, (uint32) (RecPtr >> 32),
693 							  (uint32) RecPtr);
694 		return false;
695 	}
696 	if (randAccess)
697 	{
698 		/*
699 		 * We can't exactly verify the prev-link, but surely it should be less
700 		 * than the record's own address.
701 		 */
702 		if (!(record->xl_prev < RecPtr))
703 		{
704 			report_invalid_record(state,
705 								  "record with incorrect prev-link %X/%X at %X/%X",
706 								  (uint32) (record->xl_prev >> 32),
707 								  (uint32) record->xl_prev,
708 								  (uint32) (RecPtr >> 32), (uint32) RecPtr);
709 			return false;
710 		}
711 	}
712 	else
713 	{
714 		/*
715 		 * Record's prev-link should exactly match our previous location. This
716 		 * check guards against torn WAL pages where a stale but valid-looking
717 		 * WAL record starts on a sector boundary.
718 		 */
719 		if (record->xl_prev != PrevRecPtr)
720 		{
721 			report_invalid_record(state,
722 								  "record with incorrect prev-link %X/%X at %X/%X",
723 								  (uint32) (record->xl_prev >> 32),
724 								  (uint32) record->xl_prev,
725 								  (uint32) (RecPtr >> 32), (uint32) RecPtr);
726 			return false;
727 		}
728 	}
729 
730 	return true;
731 }
732 
733 
734 /*
735  * CRC-check an XLOG record.  We do not believe the contents of an XLOG
736  * record (other than to the minimal extent of computing the amount of
737  * data to read in) until we've checked the CRCs.
738  *
739  * We assume all of the record (that is, xl_tot_len bytes) has been read
740  * into memory at *record.  Also, ValidXLogRecordHeader() has accepted the
741  * record's header, which means in particular that xl_tot_len is at least
742  * SizeOfXlogRecord.
743  */
744 static bool
745 ValidXLogRecord(XLogReaderState *state, XLogRecord *record, XLogRecPtr recptr)
746 {
747 	pg_crc32c	crc;
748 
749 	/* Calculate the CRC */
750 	INIT_CRC32C(crc);
751 	COMP_CRC32C(crc, ((char *) record) + SizeOfXLogRecord, record->xl_tot_len - SizeOfXLogRecord);
752 	/* include the record header last */
753 	COMP_CRC32C(crc, (char *) record, offsetof(XLogRecord, xl_crc));
754 	FIN_CRC32C(crc);
755 
756 	if (!EQ_CRC32C(record->xl_crc, crc))
757 	{
758 		report_invalid_record(state,
759 							  "incorrect resource manager data checksum in record at %X/%X",
760 							  (uint32) (recptr >> 32), (uint32) recptr);
761 		return false;
762 	}
763 
764 	return true;
765 }
766 
767 /*
768  * Validate a page header.
769  *
770  * Check if 'phdr' is valid as the header of the XLog page at position
771  * 'recptr'.
772  */
773 bool
774 XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr,
775 							 char *phdr)
776 {
777 	XLogRecPtr	recaddr;
778 	XLogSegNo	segno;
779 	int32		offset;
780 	XLogPageHeader hdr = (XLogPageHeader) phdr;
781 
782 	Assert((recptr % XLOG_BLCKSZ) == 0);
783 
784 	XLByteToSeg(recptr, segno, state->wal_segment_size);
785 	offset = XLogSegmentOffset(recptr, state->wal_segment_size);
786 
787 	XLogSegNoOffsetToRecPtr(segno, offset, state->wal_segment_size, recaddr);
788 
789 	if (hdr->xlp_magic != XLOG_PAGE_MAGIC)
790 	{
791 		char		fname[MAXFNAMELEN];
792 
793 		XLogFileName(fname, state->readPageTLI, segno, state->wal_segment_size);
794 
795 		report_invalid_record(state,
796 							  "invalid magic number %04X in log segment %s, offset %u",
797 							  hdr->xlp_magic,
798 							  fname,
799 							  offset);
800 		return false;
801 	}
802 
803 	if ((hdr->xlp_info & ~XLP_ALL_FLAGS) != 0)
804 	{
805 		char		fname[MAXFNAMELEN];
806 
807 		XLogFileName(fname, state->readPageTLI, segno, state->wal_segment_size);
808 
809 		report_invalid_record(state,
810 							  "invalid info bits %04X in log segment %s, offset %u",
811 							  hdr->xlp_info,
812 							  fname,
813 							  offset);
814 		return false;
815 	}
816 
817 	if (hdr->xlp_info & XLP_LONG_HEADER)
818 	{
819 		XLogLongPageHeader longhdr = (XLogLongPageHeader) hdr;
820 
821 		if (state->system_identifier &&
822 			longhdr->xlp_sysid != state->system_identifier)
823 		{
824 			char		fhdrident_str[32];
825 			char		sysident_str[32];
826 
827 			/*
828 			 * Format sysids separately to keep platform-dependent format code
829 			 * out of the translatable message string.
830 			 */
831 			snprintf(fhdrident_str, sizeof(fhdrident_str), UINT64_FORMAT,
832 					 longhdr->xlp_sysid);
833 			snprintf(sysident_str, sizeof(sysident_str), UINT64_FORMAT,
834 					 state->system_identifier);
835 			report_invalid_record(state,
836 								  "WAL file is from different database system: WAL file database system identifier is %s, pg_control database system identifier is %s",
837 								  fhdrident_str, sysident_str);
838 			return false;
839 		}
840 		else if (longhdr->xlp_seg_size != state->wal_segment_size)
841 		{
842 			report_invalid_record(state,
843 								  "WAL file is from different database system: incorrect segment size in page header");
844 			return false;
845 		}
846 		else if (longhdr->xlp_xlog_blcksz != XLOG_BLCKSZ)
847 		{
848 			report_invalid_record(state,
849 								  "WAL file is from different database system: incorrect XLOG_BLCKSZ in page header");
850 			return false;
851 		}
852 	}
853 	else if (offset == 0)
854 	{
855 		char		fname[MAXFNAMELEN];
856 
857 		XLogFileName(fname, state->readPageTLI, segno, state->wal_segment_size);
858 
859 		/* hmm, first page of file doesn't have a long header? */
860 		report_invalid_record(state,
861 							  "invalid info bits %04X in log segment %s, offset %u",
862 							  hdr->xlp_info,
863 							  fname,
864 							  offset);
865 		return false;
866 	}
867 
868 	/*
869 	 * Check that the address on the page agrees with what we expected. This
870 	 * check typically fails when an old WAL segment is recycled, and hasn't
871 	 * yet been overwritten with new data yet.
872 	 */
873 	if (hdr->xlp_pageaddr != recaddr)
874 	{
875 		char		fname[MAXFNAMELEN];
876 
877 		XLogFileName(fname, state->readPageTLI, segno, state->wal_segment_size);
878 
879 		report_invalid_record(state,
880 							  "unexpected pageaddr %X/%X in log segment %s, offset %u",
881 							  (uint32) (hdr->xlp_pageaddr >> 32), (uint32) hdr->xlp_pageaddr,
882 							  fname,
883 							  offset);
884 		return false;
885 	}
886 
887 	/*
888 	 * Since child timelines are always assigned a TLI greater than their
889 	 * immediate parent's TLI, we should never see TLI go backwards across
890 	 * successive pages of a consistent WAL sequence.
891 	 *
892 	 * Sometimes we re-read a segment that's already been (partially) read. So
893 	 * we only verify TLIs for pages that are later than the last remembered
894 	 * LSN.
895 	 */
896 	if (recptr > state->latestPagePtr)
897 	{
898 		if (hdr->xlp_tli < state->latestPageTLI)
899 		{
900 			char		fname[MAXFNAMELEN];
901 
902 			XLogFileName(fname, state->readPageTLI, segno, state->wal_segment_size);
903 
904 			report_invalid_record(state,
905 								  "out-of-sequence timeline ID %u (after %u) in log segment %s, offset %u",
906 								  hdr->xlp_tli,
907 								  state->latestPageTLI,
908 								  fname,
909 								  offset);
910 			return false;
911 		}
912 	}
913 	state->latestPagePtr = recptr;
914 	state->latestPageTLI = hdr->xlp_tli;
915 
916 	return true;
917 }
918 
919 #ifdef FRONTEND
920 /*
921  * Functions that are currently not needed in the backend, but are better
922  * implemented inside xlogreader.c because of the internal facilities available
923  * here.
924  */
925 
926 /*
927  * Find the first record with an lsn >= RecPtr.
928  *
929  * Useful for checking whether RecPtr is a valid xlog address for reading, and
930  * to find the first valid address after some address when dumping records for
931  * debugging purposes.
932  */
933 XLogRecPtr
934 XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr)
935 {
936 	XLogReaderState saved_state = *state;
937 	XLogRecPtr	tmpRecPtr;
938 	XLogRecPtr	found = InvalidXLogRecPtr;
939 	XLogPageHeader header;
940 	char	   *errormsg;
941 
942 	Assert(!XLogRecPtrIsInvalid(RecPtr));
943 
944 	/*
945 	 * skip over potential continuation data, keeping in mind that it may span
946 	 * multiple pages
947 	 */
948 	tmpRecPtr = RecPtr;
949 	while (true)
950 	{
951 		XLogRecPtr	targetPagePtr;
952 		int			targetRecOff;
953 		uint32		pageHeaderSize;
954 		int			readLen;
955 
956 		/*
957 		 * Compute targetRecOff. It should typically be equal or greater than
958 		 * short page-header since a valid record can't start anywhere before
959 		 * that, except when caller has explicitly specified the offset that
960 		 * falls somewhere there or when we are skipping multi-page
961 		 * continuation record. It doesn't matter though because
962 		 * ReadPageInternal() is prepared to handle that and will read at
963 		 * least short page-header worth of data
964 		 */
965 		targetRecOff = tmpRecPtr % XLOG_BLCKSZ;
966 
967 		/* scroll back to page boundary */
968 		targetPagePtr = tmpRecPtr - targetRecOff;
969 
970 		/* Read the page containing the record */
971 		readLen = ReadPageInternal(state, targetPagePtr, targetRecOff);
972 		if (readLen < 0)
973 			goto err;
974 
975 		header = (XLogPageHeader) state->readBuf;
976 
977 		pageHeaderSize = XLogPageHeaderSize(header);
978 
979 		/* make sure we have enough data for the page header */
980 		readLen = ReadPageInternal(state, targetPagePtr, pageHeaderSize);
981 		if (readLen < 0)
982 			goto err;
983 
984 		/* skip over potential continuation data */
985 		if (header->xlp_info & XLP_FIRST_IS_CONTRECORD)
986 		{
987 			/*
988 			 * If the length of the remaining continuation data is more than
989 			 * what can fit in this page, the continuation record crosses over
990 			 * this page. Read the next page and try again. xlp_rem_len in the
991 			 * next page header will contain the remaining length of the
992 			 * continuation data
993 			 *
994 			 * Note that record headers are MAXALIGN'ed
995 			 */
996 			if (MAXALIGN(header->xlp_rem_len) >= (XLOG_BLCKSZ - pageHeaderSize))
997 				tmpRecPtr = targetPagePtr + XLOG_BLCKSZ;
998 			else
999 			{
1000 				/*
1001 				 * The previous continuation record ends in this page. Set
1002 				 * tmpRecPtr to point to the first valid record
1003 				 */
1004 				tmpRecPtr = targetPagePtr + pageHeaderSize
1005 					+ MAXALIGN(header->xlp_rem_len);
1006 				break;
1007 			}
1008 		}
1009 		else
1010 		{
1011 			tmpRecPtr = targetPagePtr + pageHeaderSize;
1012 			break;
1013 		}
1014 	}
1015 
1016 	/*
1017 	 * we know now that tmpRecPtr is an address pointing to a valid XLogRecord
1018 	 * because either we're at the first record after the beginning of a page
1019 	 * or we just jumped over the remaining data of a continuation.
1020 	 */
1021 	while (XLogReadRecord(state, tmpRecPtr, &errormsg) != NULL)
1022 	{
1023 		/* continue after the record */
1024 		tmpRecPtr = InvalidXLogRecPtr;
1025 
1026 		/* past the record we've found, break out */
1027 		if (RecPtr <= state->ReadRecPtr)
1028 		{
1029 			found = state->ReadRecPtr;
1030 			goto out;
1031 		}
1032 	}
1033 
1034 err:
1035 out:
1036 	/* Reset state to what we had before finding the record */
1037 	state->ReadRecPtr = saved_state.ReadRecPtr;
1038 	state->EndRecPtr = saved_state.EndRecPtr;
1039 	XLogReaderInvalReadState(state);
1040 
1041 	return found;
1042 }
1043 
1044 #endif							/* FRONTEND */
1045 
1046 
1047 /* ----------------------------------------
1048  * Functions for decoding the data and block references in a record.
1049  * ----------------------------------------
1050  */
1051 
1052 /* private function to reset the state between records */
1053 static void
1054 ResetDecoder(XLogReaderState *state)
1055 {
1056 	int			block_id;
1057 
1058 	state->decoded_record = NULL;
1059 
1060 	state->main_data_len = 0;
1061 
1062 	for (block_id = 0; block_id <= state->max_block_id; block_id++)
1063 	{
1064 		state->blocks[block_id].in_use = false;
1065 		state->blocks[block_id].has_image = false;
1066 		state->blocks[block_id].has_data = false;
1067 		state->blocks[block_id].apply_image = false;
1068 	}
1069 	state->max_block_id = -1;
1070 }
1071 
1072 /*
1073  * Decode the previously read record.
1074  *
1075  * On error, a human-readable error message is returned in *errormsg, and
1076  * the return value is false.
1077  */
1078 bool
1079 DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg)
1080 {
1081 	/*
1082 	 * read next _size bytes from record buffer, but check for overrun first.
1083 	 */
1084 #define COPY_HEADER_FIELD(_dst, _size)			\
1085 	do {										\
1086 		if (remaining < _size)					\
1087 			goto shortdata_err;					\
1088 		memcpy(_dst, ptr, _size);				\
1089 		ptr += _size;							\
1090 		remaining -= _size;						\
1091 	} while(0)
1092 
1093 	char	   *ptr;
1094 	uint32		remaining;
1095 	uint32		datatotal;
1096 	RelFileNode *rnode = NULL;
1097 	uint8		block_id;
1098 
1099 	ResetDecoder(state);
1100 
1101 	state->decoded_record = record;
1102 	state->record_origin = InvalidRepOriginId;
1103 
1104 	ptr = (char *) record;
1105 	ptr += SizeOfXLogRecord;
1106 	remaining = record->xl_tot_len - SizeOfXLogRecord;
1107 
1108 	/* Decode the headers */
1109 	datatotal = 0;
1110 	while (remaining > datatotal)
1111 	{
1112 		COPY_HEADER_FIELD(&block_id, sizeof(uint8));
1113 
1114 		if (block_id == XLR_BLOCK_ID_DATA_SHORT)
1115 		{
1116 			/* XLogRecordDataHeaderShort */
1117 			uint8		main_data_len;
1118 
1119 			COPY_HEADER_FIELD(&main_data_len, sizeof(uint8));
1120 
1121 			state->main_data_len = main_data_len;
1122 			datatotal += main_data_len;
1123 			break;				/* by convention, the main data fragment is
1124 								 * always last */
1125 		}
1126 		else if (block_id == XLR_BLOCK_ID_DATA_LONG)
1127 		{
1128 			/* XLogRecordDataHeaderLong */
1129 			uint32		main_data_len;
1130 
1131 			COPY_HEADER_FIELD(&main_data_len, sizeof(uint32));
1132 			state->main_data_len = main_data_len;
1133 			datatotal += main_data_len;
1134 			break;				/* by convention, the main data fragment is
1135 								 * always last */
1136 		}
1137 		else if (block_id == XLR_BLOCK_ID_ORIGIN)
1138 		{
1139 			COPY_HEADER_FIELD(&state->record_origin, sizeof(RepOriginId));
1140 		}
1141 		else if (block_id <= XLR_MAX_BLOCK_ID)
1142 		{
1143 			/* XLogRecordBlockHeader */
1144 			DecodedBkpBlock *blk;
1145 			uint8		fork_flags;
1146 
1147 			if (block_id <= state->max_block_id)
1148 			{
1149 				report_invalid_record(state,
1150 									  "out-of-order block_id %u at %X/%X",
1151 									  block_id,
1152 									  (uint32) (state->ReadRecPtr >> 32),
1153 									  (uint32) state->ReadRecPtr);
1154 				goto err;
1155 			}
1156 			state->max_block_id = block_id;
1157 
1158 			blk = &state->blocks[block_id];
1159 			blk->in_use = true;
1160 			blk->apply_image = false;
1161 
1162 			COPY_HEADER_FIELD(&fork_flags, sizeof(uint8));
1163 			blk->forknum = fork_flags & BKPBLOCK_FORK_MASK;
1164 			blk->flags = fork_flags;
1165 			blk->has_image = ((fork_flags & BKPBLOCK_HAS_IMAGE) != 0);
1166 			blk->has_data = ((fork_flags & BKPBLOCK_HAS_DATA) != 0);
1167 
1168 			COPY_HEADER_FIELD(&blk->data_len, sizeof(uint16));
1169 			/* cross-check that the HAS_DATA flag is set iff data_length > 0 */
1170 			if (blk->has_data && blk->data_len == 0)
1171 			{
1172 				report_invalid_record(state,
1173 									  "BKPBLOCK_HAS_DATA set, but no data included at %X/%X",
1174 									  (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
1175 				goto err;
1176 			}
1177 			if (!blk->has_data && blk->data_len != 0)
1178 			{
1179 				report_invalid_record(state,
1180 									  "BKPBLOCK_HAS_DATA not set, but data length is %u at %X/%X",
1181 									  (unsigned int) blk->data_len,
1182 									  (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
1183 				goto err;
1184 			}
1185 			datatotal += blk->data_len;
1186 
1187 			if (blk->has_image)
1188 			{
1189 				COPY_HEADER_FIELD(&blk->bimg_len, sizeof(uint16));
1190 				COPY_HEADER_FIELD(&blk->hole_offset, sizeof(uint16));
1191 				COPY_HEADER_FIELD(&blk->bimg_info, sizeof(uint8));
1192 
1193 				blk->apply_image = ((blk->bimg_info & BKPIMAGE_APPLY) != 0);
1194 
1195 				if (blk->bimg_info & BKPIMAGE_IS_COMPRESSED)
1196 				{
1197 					if (blk->bimg_info & BKPIMAGE_HAS_HOLE)
1198 						COPY_HEADER_FIELD(&blk->hole_length, sizeof(uint16));
1199 					else
1200 						blk->hole_length = 0;
1201 				}
1202 				else
1203 					blk->hole_length = BLCKSZ - blk->bimg_len;
1204 				datatotal += blk->bimg_len;
1205 
1206 				/*
1207 				 * cross-check that hole_offset > 0, hole_length > 0 and
1208 				 * bimg_len < BLCKSZ if the HAS_HOLE flag is set.
1209 				 */
1210 				if ((blk->bimg_info & BKPIMAGE_HAS_HOLE) &&
1211 					(blk->hole_offset == 0 ||
1212 					 blk->hole_length == 0 ||
1213 					 blk->bimg_len == BLCKSZ))
1214 				{
1215 					report_invalid_record(state,
1216 										  "BKPIMAGE_HAS_HOLE set, but hole offset %u length %u block image length %u at %X/%X",
1217 										  (unsigned int) blk->hole_offset,
1218 										  (unsigned int) blk->hole_length,
1219 										  (unsigned int) blk->bimg_len,
1220 										  (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
1221 					goto err;
1222 				}
1223 
1224 				/*
1225 				 * cross-check that hole_offset == 0 and hole_length == 0 if
1226 				 * the HAS_HOLE flag is not set.
1227 				 */
1228 				if (!(blk->bimg_info & BKPIMAGE_HAS_HOLE) &&
1229 					(blk->hole_offset != 0 || blk->hole_length != 0))
1230 				{
1231 					report_invalid_record(state,
1232 										  "BKPIMAGE_HAS_HOLE not set, but hole offset %u length %u at %X/%X",
1233 										  (unsigned int) blk->hole_offset,
1234 										  (unsigned int) blk->hole_length,
1235 										  (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
1236 					goto err;
1237 				}
1238 
1239 				/*
1240 				 * cross-check that bimg_len < BLCKSZ if the IS_COMPRESSED
1241 				 * flag is set.
1242 				 */
1243 				if ((blk->bimg_info & BKPIMAGE_IS_COMPRESSED) &&
1244 					blk->bimg_len == BLCKSZ)
1245 				{
1246 					report_invalid_record(state,
1247 										  "BKPIMAGE_IS_COMPRESSED set, but block image length %u at %X/%X",
1248 										  (unsigned int) blk->bimg_len,
1249 										  (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
1250 					goto err;
1251 				}
1252 
1253 				/*
1254 				 * cross-check that bimg_len = BLCKSZ if neither HAS_HOLE nor
1255 				 * IS_COMPRESSED flag is set.
1256 				 */
1257 				if (!(blk->bimg_info & BKPIMAGE_HAS_HOLE) &&
1258 					!(blk->bimg_info & BKPIMAGE_IS_COMPRESSED) &&
1259 					blk->bimg_len != BLCKSZ)
1260 				{
1261 					report_invalid_record(state,
1262 										  "neither BKPIMAGE_HAS_HOLE nor BKPIMAGE_IS_COMPRESSED set, but block image length is %u at %X/%X",
1263 										  (unsigned int) blk->data_len,
1264 										  (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
1265 					goto err;
1266 				}
1267 			}
1268 			if (!(fork_flags & BKPBLOCK_SAME_REL))
1269 			{
1270 				COPY_HEADER_FIELD(&blk->rnode, sizeof(RelFileNode));
1271 				rnode = &blk->rnode;
1272 			}
1273 			else
1274 			{
1275 				if (rnode == NULL)
1276 				{
1277 					report_invalid_record(state,
1278 										  "BKPBLOCK_SAME_REL set but no previous rel at %X/%X",
1279 										  (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
1280 					goto err;
1281 				}
1282 
1283 				blk->rnode = *rnode;
1284 			}
1285 			COPY_HEADER_FIELD(&blk->blkno, sizeof(BlockNumber));
1286 		}
1287 		else
1288 		{
1289 			report_invalid_record(state,
1290 								  "invalid block_id %u at %X/%X",
1291 								  block_id,
1292 								  (uint32) (state->ReadRecPtr >> 32),
1293 								  (uint32) state->ReadRecPtr);
1294 			goto err;
1295 		}
1296 	}
1297 
1298 	if (remaining != datatotal)
1299 		goto shortdata_err;
1300 
1301 	/*
1302 	 * Ok, we've parsed the fragment headers, and verified that the total
1303 	 * length of the payload in the fragments is equal to the amount of data
1304 	 * left. Copy the data of each fragment to a separate buffer.
1305 	 *
1306 	 * We could just set up pointers into readRecordBuf, but we want to align
1307 	 * the data for the convenience of the callers. Backup images are not
1308 	 * copied, however; they don't need alignment.
1309 	 */
1310 
1311 	/* block data first */
1312 	for (block_id = 0; block_id <= state->max_block_id; block_id++)
1313 	{
1314 		DecodedBkpBlock *blk = &state->blocks[block_id];
1315 
1316 		if (!blk->in_use)
1317 			continue;
1318 
1319 		Assert(blk->has_image || !blk->apply_image);
1320 
1321 		if (blk->has_image)
1322 		{
1323 			blk->bkp_image = ptr;
1324 			ptr += blk->bimg_len;
1325 		}
1326 		if (blk->has_data)
1327 		{
1328 			if (!blk->data || blk->data_len > blk->data_bufsz)
1329 			{
1330 				if (blk->data)
1331 					pfree(blk->data);
1332 
1333 				/*
1334 				 * Force the initial request to be BLCKSZ so that we don't
1335 				 * waste time with lots of trips through this stanza as a
1336 				 * result of WAL compression.
1337 				 */
1338 				blk->data_bufsz = MAXALIGN(Max(blk->data_len, BLCKSZ));
1339 				blk->data = palloc(blk->data_bufsz);
1340 			}
1341 			memcpy(blk->data, ptr, blk->data_len);
1342 			ptr += blk->data_len;
1343 		}
1344 	}
1345 
1346 	/* and finally, the main data */
1347 	if (state->main_data_len > 0)
1348 	{
1349 		if (!state->main_data || state->main_data_len > state->main_data_bufsz)
1350 		{
1351 			if (state->main_data)
1352 				pfree(state->main_data);
1353 
1354 			/*
1355 			 * main_data_bufsz must be MAXALIGN'ed.  In many xlog record
1356 			 * types, we omit trailing struct padding on-disk to save a few
1357 			 * bytes; but compilers may generate accesses to the xlog struct
1358 			 * that assume that padding bytes are present.  If the palloc
1359 			 * request is not large enough to include such padding bytes then
1360 			 * we'll get valgrind complaints due to otherwise-harmless fetches
1361 			 * of the padding bytes.
1362 			 *
1363 			 * In addition, force the initial request to be reasonably large
1364 			 * so that we don't waste time with lots of trips through this
1365 			 * stanza.  BLCKSZ / 2 seems like a good compromise choice.
1366 			 */
1367 			state->main_data_bufsz = MAXALIGN(Max(state->main_data_len,
1368 												  BLCKSZ / 2));
1369 			state->main_data = palloc(state->main_data_bufsz);
1370 		}
1371 		memcpy(state->main_data, ptr, state->main_data_len);
1372 		ptr += state->main_data_len;
1373 	}
1374 
1375 	return true;
1376 
1377 shortdata_err:
1378 	report_invalid_record(state,
1379 						  "record with invalid length at %X/%X",
1380 						  (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
1381 err:
1382 	*errormsg = state->errormsg_buf;
1383 
1384 	return false;
1385 }
1386 
1387 /*
1388  * Returns information about the block that a block reference refers to.
1389  *
1390  * If the WAL record contains a block reference with the given ID, *rnode,
1391  * *forknum, and *blknum are filled in (if not NULL), and returns true.
1392  * Otherwise returns false.
1393  */
1394 bool
1395 XLogRecGetBlockTag(XLogReaderState *record, uint8 block_id,
1396 				   RelFileNode *rnode, ForkNumber *forknum, BlockNumber *blknum)
1397 {
1398 	DecodedBkpBlock *bkpb;
1399 
1400 	if (!record->blocks[block_id].in_use)
1401 		return false;
1402 
1403 	bkpb = &record->blocks[block_id];
1404 	if (rnode)
1405 		*rnode = bkpb->rnode;
1406 	if (forknum)
1407 		*forknum = bkpb->forknum;
1408 	if (blknum)
1409 		*blknum = bkpb->blkno;
1410 	return true;
1411 }
1412 
1413 /*
1414  * Returns the data associated with a block reference, or NULL if there is
1415  * no data (e.g. because a full-page image was taken instead). The returned
1416  * pointer points to a MAXALIGNed buffer.
1417  */
1418 char *
1419 XLogRecGetBlockData(XLogReaderState *record, uint8 block_id, Size *len)
1420 {
1421 	DecodedBkpBlock *bkpb;
1422 
1423 	if (!record->blocks[block_id].in_use)
1424 		return NULL;
1425 
1426 	bkpb = &record->blocks[block_id];
1427 
1428 	if (!bkpb->has_data)
1429 	{
1430 		if (len)
1431 			*len = 0;
1432 		return NULL;
1433 	}
1434 	else
1435 	{
1436 		if (len)
1437 			*len = bkpb->data_len;
1438 		return bkpb->data;
1439 	}
1440 }
1441 
1442 /*
1443  * Restore a full-page image from a backup block attached to an XLOG record.
1444  *
1445  * Returns true if a full-page image is restored.
1446  */
1447 bool
1448 RestoreBlockImage(XLogReaderState *record, uint8 block_id, char *page)
1449 {
1450 	DecodedBkpBlock *bkpb;
1451 	char	   *ptr;
1452 	PGAlignedBlock tmp;
1453 
1454 	if (!record->blocks[block_id].in_use)
1455 		return false;
1456 	if (!record->blocks[block_id].has_image)
1457 		return false;
1458 
1459 	bkpb = &record->blocks[block_id];
1460 	ptr = bkpb->bkp_image;
1461 
1462 	if (bkpb->bimg_info & BKPIMAGE_IS_COMPRESSED)
1463 	{
1464 		/* If a backup block image is compressed, decompress it */
1465 		if (pglz_decompress(ptr, bkpb->bimg_len, tmp.data,
1466 							BLCKSZ - bkpb->hole_length) < 0)
1467 		{
1468 			report_invalid_record(record, "invalid compressed image at %X/%X, block %d",
1469 								  (uint32) (record->ReadRecPtr >> 32),
1470 								  (uint32) record->ReadRecPtr,
1471 								  block_id);
1472 			return false;
1473 		}
1474 		ptr = tmp.data;
1475 	}
1476 
1477 	/* generate page, taking into account hole if necessary */
1478 	if (bkpb->hole_length == 0)
1479 	{
1480 		memcpy(page, ptr, BLCKSZ);
1481 	}
1482 	else
1483 	{
1484 		memcpy(page, ptr, bkpb->hole_offset);
1485 		/* must zero-fill the hole */
1486 		MemSet(page + bkpb->hole_offset, 0, bkpb->hole_length);
1487 		memcpy(page + (bkpb->hole_offset + bkpb->hole_length),
1488 			   ptr + bkpb->hole_offset,
1489 			   BLCKSZ - (bkpb->hole_offset + bkpb->hole_length));
1490 	}
1491 
1492 	return true;
1493 }
1494