1 /*-------------------------------------------------------------------------
2  *
3  * xlogreader.c
4  *		Generic XLog reading facility
5  *
6  * Portions Copyright (c) 2013-2021, PostgreSQL Global Development Group
7  *
8  * IDENTIFICATION
9  *		src/backend/access/transam/xlogreader.c
10  *
11  * NOTES
12  *		See xlogreader.h for more notes on this facility.
13  *
14  *		This file is compiled as both front-end and backend code, so it
15  *		may not use ereport, server-defined static variables, etc.
16  *-------------------------------------------------------------------------
17  */
18 #include "postgres.h"
19 
20 #include <unistd.h>
21 
22 #include "access/transam.h"
23 #include "access/xlog_internal.h"
24 #include "access/xlogreader.h"
25 #include "access/xlogrecord.h"
26 #include "catalog/pg_control.h"
27 #include "common/pg_lzcompress.h"
28 #include "replication/origin.h"
29 
30 #ifndef FRONTEND
31 #include "miscadmin.h"
32 #include "pgstat.h"
33 #include "utils/memutils.h"
34 #endif
35 
36 static void report_invalid_record(XLogReaderState *state, const char *fmt,...)
37 			pg_attribute_printf(2, 3);
38 static bool allocate_recordbuf(XLogReaderState *state, uint32 reclength);
39 static int	ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr,
40 							 int reqLen);
41 static void XLogReaderInvalReadState(XLogReaderState *state);
42 static bool ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr,
43 								  XLogRecPtr PrevRecPtr, XLogRecord *record, bool randAccess);
44 static bool ValidXLogRecord(XLogReaderState *state, XLogRecord *record,
45 							XLogRecPtr recptr);
46 static void ResetDecoder(XLogReaderState *state);
47 static void WALOpenSegmentInit(WALOpenSegment *seg, WALSegmentContext *segcxt,
48 							   int segsize, const char *waldir);
49 
50 /* size of the buffer allocated for error message. */
51 #define MAX_ERRORMSG_LEN 1000
52 
53 /*
54  * Construct a string in state->errormsg_buf explaining what's wrong with
55  * the current record being read.
56  */
57 static void
report_invalid_record(XLogReaderState * state,const char * fmt,...)58 report_invalid_record(XLogReaderState *state, const char *fmt,...)
59 {
60 	va_list		args;
61 
62 	fmt = _(fmt);
63 
64 	va_start(args, fmt);
65 	vsnprintf(state->errormsg_buf, MAX_ERRORMSG_LEN, fmt, args);
66 	va_end(args);
67 }
68 
69 /*
70  * Allocate and initialize a new XLogReader.
71  *
72  * Returns NULL if the xlogreader couldn't be allocated.
73  */
74 XLogReaderState *
XLogReaderAllocate(int wal_segment_size,const char * waldir,XLogReaderRoutine * routine,void * private_data)75 XLogReaderAllocate(int wal_segment_size, const char *waldir,
76 				   XLogReaderRoutine *routine, void *private_data)
77 {
78 	XLogReaderState *state;
79 
80 	state = (XLogReaderState *)
81 		palloc_extended(sizeof(XLogReaderState),
82 						MCXT_ALLOC_NO_OOM | MCXT_ALLOC_ZERO);
83 	if (!state)
84 		return NULL;
85 
86 	/* initialize caller-provided support functions */
87 	state->routine = *routine;
88 
89 	state->max_block_id = -1;
90 
91 	/*
92 	 * Permanently allocate readBuf.  We do it this way, rather than just
93 	 * making a static array, for two reasons: (1) no need to waste the
94 	 * storage in most instantiations of the backend; (2) a static char array
95 	 * isn't guaranteed to have any particular alignment, whereas
96 	 * palloc_extended() will provide MAXALIGN'd storage.
97 	 */
98 	state->readBuf = (char *) palloc_extended(XLOG_BLCKSZ,
99 											  MCXT_ALLOC_NO_OOM);
100 	if (!state->readBuf)
101 	{
102 		pfree(state);
103 		return NULL;
104 	}
105 
106 	/* Initialize segment info. */
107 	WALOpenSegmentInit(&state->seg, &state->segcxt, wal_segment_size,
108 					   waldir);
109 
110 	/* system_identifier initialized to zeroes above */
111 	state->private_data = private_data;
112 	/* ReadRecPtr, EndRecPtr and readLen initialized to zeroes above */
113 	state->errormsg_buf = palloc_extended(MAX_ERRORMSG_LEN + 1,
114 										  MCXT_ALLOC_NO_OOM);
115 	if (!state->errormsg_buf)
116 	{
117 		pfree(state->readBuf);
118 		pfree(state);
119 		return NULL;
120 	}
121 	state->errormsg_buf[0] = '\0';
122 
123 	/*
124 	 * Allocate an initial readRecordBuf of minimal size, which can later be
125 	 * enlarged if necessary.
126 	 */
127 	if (!allocate_recordbuf(state, 0))
128 	{
129 		pfree(state->errormsg_buf);
130 		pfree(state->readBuf);
131 		pfree(state);
132 		return NULL;
133 	}
134 
135 	return state;
136 }
137 
138 void
XLogReaderFree(XLogReaderState * state)139 XLogReaderFree(XLogReaderState *state)
140 {
141 	int			block_id;
142 
143 	if (state->seg.ws_file != -1)
144 		state->routine.segment_close(state);
145 
146 	for (block_id = 0; block_id <= XLR_MAX_BLOCK_ID; block_id++)
147 	{
148 		if (state->blocks[block_id].data)
149 			pfree(state->blocks[block_id].data);
150 	}
151 	if (state->main_data)
152 		pfree(state->main_data);
153 
154 	pfree(state->errormsg_buf);
155 	if (state->readRecordBuf)
156 		pfree(state->readRecordBuf);
157 	pfree(state->readBuf);
158 	pfree(state);
159 }
160 
161 /*
162  * Allocate readRecordBuf to fit a record of at least the given length.
163  * Returns true if successful, false if out of memory.
164  *
165  * readRecordBufSize is set to the new buffer size.
166  *
167  * To avoid useless small increases, round its size to a multiple of
168  * XLOG_BLCKSZ, and make sure it's at least 5*Max(BLCKSZ, XLOG_BLCKSZ) to start
169  * with.  (That is enough for all "normal" records, but very large commit or
170  * abort records might need more space.)
171  */
172 static bool
allocate_recordbuf(XLogReaderState * state,uint32 reclength)173 allocate_recordbuf(XLogReaderState *state, uint32 reclength)
174 {
175 	uint32		newSize = reclength;
176 
177 	newSize += XLOG_BLCKSZ - (newSize % XLOG_BLCKSZ);
178 	newSize = Max(newSize, 5 * Max(BLCKSZ, XLOG_BLCKSZ));
179 
180 #ifndef FRONTEND
181 
182 	/*
183 	 * Note that in much unlucky circumstances, the random data read from a
184 	 * recycled segment can cause this routine to be called with a size
185 	 * causing a hard failure at allocation.  For a standby, this would cause
186 	 * the instance to stop suddenly with a hard failure, preventing it to
187 	 * retry fetching WAL from one of its sources which could allow it to move
188 	 * on with replay without a manual restart. If the data comes from a past
189 	 * recycled segment and is still valid, then the allocation may succeed
190 	 * but record checks are going to fail so this would be short-lived.  If
191 	 * the allocation fails because of a memory shortage, then this is not a
192 	 * hard failure either per the guarantee given by MCXT_ALLOC_NO_OOM.
193 	 */
194 	if (!AllocSizeIsValid(newSize))
195 		return false;
196 
197 #endif
198 
199 	if (state->readRecordBuf)
200 		pfree(state->readRecordBuf);
201 	state->readRecordBuf =
202 		(char *) palloc_extended(newSize, MCXT_ALLOC_NO_OOM);
203 	if (state->readRecordBuf == NULL)
204 	{
205 		state->readRecordBufSize = 0;
206 		return false;
207 	}
208 	state->readRecordBufSize = newSize;
209 	return true;
210 }
211 
212 /*
213  * Initialize the passed segment structs.
214  */
215 static void
WALOpenSegmentInit(WALOpenSegment * seg,WALSegmentContext * segcxt,int segsize,const char * waldir)216 WALOpenSegmentInit(WALOpenSegment *seg, WALSegmentContext *segcxt,
217 				   int segsize, const char *waldir)
218 {
219 	seg->ws_file = -1;
220 	seg->ws_segno = 0;
221 	seg->ws_tli = 0;
222 
223 	segcxt->ws_segsize = segsize;
224 	if (waldir)
225 		snprintf(segcxt->ws_dir, MAXPGPATH, "%s", waldir);
226 }
227 
228 /*
229  * Begin reading WAL at 'RecPtr'.
230  *
231  * 'RecPtr' should point to the beginnning of a valid WAL record.  Pointing at
232  * the beginning of a page is also OK, if there is a new record right after
233  * the page header, i.e. not a continuation.
234  *
235  * This does not make any attempt to read the WAL yet, and hence cannot fail.
236  * If the starting address is not correct, the first call to XLogReadRecord()
237  * will error out.
238  */
239 void
XLogBeginRead(XLogReaderState * state,XLogRecPtr RecPtr)240 XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr)
241 {
242 	Assert(!XLogRecPtrIsInvalid(RecPtr));
243 
244 	ResetDecoder(state);
245 
246 	/* Begin at the passed-in record pointer. */
247 	state->EndRecPtr = RecPtr;
248 	state->ReadRecPtr = InvalidXLogRecPtr;
249 }
250 
251 /*
252  * Attempt to read an XLOG record.
253  *
254  * XLogBeginRead() or XLogFindNextRecord() must be called before the first call
255  * to XLogReadRecord().
256  *
257  * If the page_read callback fails to read the requested data, NULL is
258  * returned.  The callback is expected to have reported the error; errormsg
259  * is set to NULL.
260  *
261  * If the reading fails for some other reason, NULL is also returned, and
262  * *errormsg is set to a string with details of the failure.
263  *
264  * The returned pointer (or *errormsg) points to an internal buffer that's
265  * valid until the next call to XLogReadRecord.
266  */
267 XLogRecord *
XLogReadRecord(XLogReaderState * state,char ** errormsg)268 XLogReadRecord(XLogReaderState *state, char **errormsg)
269 {
270 	XLogRecPtr	RecPtr;
271 	XLogRecord *record;
272 	XLogRecPtr	targetPagePtr;
273 	bool		randAccess;
274 	uint32		len,
275 				total_len;
276 	uint32		targetRecOff;
277 	uint32		pageHeaderSize;
278 	bool		assembled;
279 	bool		gotheader;
280 	int			readOff;
281 
282 	/*
283 	 * randAccess indicates whether to verify the previous-record pointer of
284 	 * the record we're reading.  We only do this if we're reading
285 	 * sequentially, which is what we initially assume.
286 	 */
287 	randAccess = false;
288 
289 	/* reset error state */
290 	*errormsg = NULL;
291 	state->errormsg_buf[0] = '\0';
292 
293 	ResetDecoder(state);
294 	state->abortedRecPtr = InvalidXLogRecPtr;
295 	state->missingContrecPtr = InvalidXLogRecPtr;
296 
297 	RecPtr = state->EndRecPtr;
298 
299 	if (state->ReadRecPtr != InvalidXLogRecPtr)
300 	{
301 		/* read the record after the one we just read */
302 
303 		/*
304 		 * EndRecPtr is pointing to end+1 of the previous WAL record.  If
305 		 * we're at a page boundary, no more records can fit on the current
306 		 * page. We must skip over the page header, but we can't do that until
307 		 * we've read in the page, since the header size is variable.
308 		 */
309 	}
310 	else
311 	{
312 		/*
313 		 * Caller supplied a position to start at.
314 		 *
315 		 * In this case, EndRecPtr should already be pointing to a valid
316 		 * record starting position.
317 		 */
318 		Assert(XRecOffIsValid(RecPtr));
319 		randAccess = true;
320 	}
321 
322 restart:
323 	state->currRecPtr = RecPtr;
324 	assembled = false;
325 
326 	targetPagePtr = RecPtr - (RecPtr % XLOG_BLCKSZ);
327 	targetRecOff = RecPtr % XLOG_BLCKSZ;
328 
329 	/*
330 	 * Read the page containing the record into state->readBuf. Request enough
331 	 * byte to cover the whole record header, or at least the part of it that
332 	 * fits on the same page.
333 	 */
334 	readOff = ReadPageInternal(state, targetPagePtr,
335 							   Min(targetRecOff + SizeOfXLogRecord, XLOG_BLCKSZ));
336 	if (readOff < 0)
337 		goto err;
338 
339 	/*
340 	 * ReadPageInternal always returns at least the page header, so we can
341 	 * examine it now.
342 	 */
343 	pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) state->readBuf);
344 	if (targetRecOff == 0)
345 	{
346 		/*
347 		 * At page start, so skip over page header.
348 		 */
349 		RecPtr += pageHeaderSize;
350 		targetRecOff = pageHeaderSize;
351 	}
352 	else if (targetRecOff < pageHeaderSize)
353 	{
354 		report_invalid_record(state, "invalid record offset at %X/%X",
355 							  LSN_FORMAT_ARGS(RecPtr));
356 		goto err;
357 	}
358 
359 	if ((((XLogPageHeader) state->readBuf)->xlp_info & XLP_FIRST_IS_CONTRECORD) &&
360 		targetRecOff == pageHeaderSize)
361 	{
362 		report_invalid_record(state, "contrecord is requested by %X/%X",
363 							  LSN_FORMAT_ARGS(RecPtr));
364 		goto err;
365 	}
366 
367 	/* ReadPageInternal has verified the page header */
368 	Assert(pageHeaderSize <= readOff);
369 
370 	/*
371 	 * Read the record length.
372 	 *
373 	 * NB: Even though we use an XLogRecord pointer here, the whole record
374 	 * header might not fit on this page. xl_tot_len is the first field of the
375 	 * struct, so it must be on this page (the records are MAXALIGNed), but we
376 	 * cannot access any other fields until we've verified that we got the
377 	 * whole header.
378 	 */
379 	record = (XLogRecord *) (state->readBuf + RecPtr % XLOG_BLCKSZ);
380 	total_len = record->xl_tot_len;
381 
382 	/*
383 	 * If the whole record header is on this page, validate it immediately.
384 	 * Otherwise do just a basic sanity check on xl_tot_len, and validate the
385 	 * rest of the header after reading it from the next page.  The xl_tot_len
386 	 * check is necessary here to ensure that we enter the "Need to reassemble
387 	 * record" code path below; otherwise we might fail to apply
388 	 * ValidXLogRecordHeader at all.
389 	 */
390 	if (targetRecOff <= XLOG_BLCKSZ - SizeOfXLogRecord)
391 	{
392 		if (!ValidXLogRecordHeader(state, RecPtr, state->ReadRecPtr, record,
393 								   randAccess))
394 			goto err;
395 		gotheader = true;
396 	}
397 	else
398 	{
399 		/* XXX: more validation should be done here */
400 		if (total_len < SizeOfXLogRecord)
401 		{
402 			report_invalid_record(state,
403 								  "invalid record length at %X/%X: wanted %u, got %u",
404 								  LSN_FORMAT_ARGS(RecPtr),
405 								  (uint32) SizeOfXLogRecord, total_len);
406 			goto err;
407 		}
408 		gotheader = false;
409 	}
410 
411 	len = XLOG_BLCKSZ - RecPtr % XLOG_BLCKSZ;
412 	if (total_len > len)
413 	{
414 		/* Need to reassemble record */
415 		char	   *contdata;
416 		XLogPageHeader pageHeader;
417 		char	   *buffer;
418 		uint32		gotlen;
419 
420 		assembled = true;
421 
422 		/*
423 		 * Enlarge readRecordBuf as needed.
424 		 */
425 		if (total_len > state->readRecordBufSize &&
426 			!allocate_recordbuf(state, total_len))
427 		{
428 			/* We treat this as a "bogus data" condition */
429 			report_invalid_record(state, "record length %u at %X/%X too long",
430 								  total_len, LSN_FORMAT_ARGS(RecPtr));
431 			goto err;
432 		}
433 
434 		/* Copy the first fragment of the record from the first page. */
435 		memcpy(state->readRecordBuf,
436 			   state->readBuf + RecPtr % XLOG_BLCKSZ, len);
437 		buffer = state->readRecordBuf + len;
438 		gotlen = len;
439 
440 		do
441 		{
442 			/* Calculate pointer to beginning of next page */
443 			targetPagePtr += XLOG_BLCKSZ;
444 
445 			/* Wait for the next page to become available */
446 			readOff = ReadPageInternal(state, targetPagePtr,
447 									   Min(total_len - gotlen + SizeOfXLogShortPHD,
448 										   XLOG_BLCKSZ));
449 
450 			if (readOff < 0)
451 				goto err;
452 
453 			Assert(SizeOfXLogShortPHD <= readOff);
454 
455 			pageHeader = (XLogPageHeader) state->readBuf;
456 
457 			/*
458 			 * If we were expecting a continuation record and got an
459 			 * "overwrite contrecord" flag, that means the continuation record
460 			 * was overwritten with a different record.  Restart the read by
461 			 * assuming the address to read is the location where we found
462 			 * this flag; but keep track of the LSN of the record we were
463 			 * reading, for later verification.
464 			 */
465 			if (pageHeader->xlp_info & XLP_FIRST_IS_OVERWRITE_CONTRECORD)
466 			{
467 				state->overwrittenRecPtr = state->currRecPtr;
468 				ResetDecoder(state);
469 				RecPtr = targetPagePtr;
470 				goto restart;
471 			}
472 
473 			/* Check that the continuation on next page looks valid */
474 			if (!(pageHeader->xlp_info & XLP_FIRST_IS_CONTRECORD))
475 			{
476 				report_invalid_record(state,
477 									  "there is no contrecord flag at %X/%X",
478 									  LSN_FORMAT_ARGS(RecPtr));
479 				goto err;
480 			}
481 
482 			/*
483 			 * Cross-check that xlp_rem_len agrees with how much of the record
484 			 * we expect there to be left.
485 			 */
486 			if (pageHeader->xlp_rem_len == 0 ||
487 				total_len != (pageHeader->xlp_rem_len + gotlen))
488 			{
489 				report_invalid_record(state,
490 									  "invalid contrecord length %u (expected %lld) at %X/%X",
491 									  pageHeader->xlp_rem_len,
492 									  ((long long) total_len) - gotlen,
493 									  LSN_FORMAT_ARGS(RecPtr));
494 				goto err;
495 			}
496 
497 			/* Append the continuation from this page to the buffer */
498 			pageHeaderSize = XLogPageHeaderSize(pageHeader);
499 
500 			if (readOff < pageHeaderSize)
501 				readOff = ReadPageInternal(state, targetPagePtr,
502 										   pageHeaderSize);
503 
504 			Assert(pageHeaderSize <= readOff);
505 
506 			contdata = (char *) state->readBuf + pageHeaderSize;
507 			len = XLOG_BLCKSZ - pageHeaderSize;
508 			if (pageHeader->xlp_rem_len < len)
509 				len = pageHeader->xlp_rem_len;
510 
511 			if (readOff < pageHeaderSize + len)
512 				readOff = ReadPageInternal(state, targetPagePtr,
513 										   pageHeaderSize + len);
514 
515 			memcpy(buffer, (char *) contdata, len);
516 			buffer += len;
517 			gotlen += len;
518 
519 			/* If we just reassembled the record header, validate it. */
520 			if (!gotheader)
521 			{
522 				record = (XLogRecord *) state->readRecordBuf;
523 				if (!ValidXLogRecordHeader(state, RecPtr, state->ReadRecPtr,
524 										   record, randAccess))
525 					goto err;
526 				gotheader = true;
527 			}
528 		} while (gotlen < total_len);
529 
530 		Assert(gotheader);
531 
532 		record = (XLogRecord *) state->readRecordBuf;
533 		if (!ValidXLogRecord(state, record, RecPtr))
534 			goto err;
535 
536 		pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) state->readBuf);
537 		state->ReadRecPtr = RecPtr;
538 		state->EndRecPtr = targetPagePtr + pageHeaderSize
539 			+ MAXALIGN(pageHeader->xlp_rem_len);
540 	}
541 	else
542 	{
543 		/* Wait for the record data to become available */
544 		readOff = ReadPageInternal(state, targetPagePtr,
545 								   Min(targetRecOff + total_len, XLOG_BLCKSZ));
546 		if (readOff < 0)
547 			goto err;
548 
549 		/* Record does not cross a page boundary */
550 		if (!ValidXLogRecord(state, record, RecPtr))
551 			goto err;
552 
553 		state->EndRecPtr = RecPtr + MAXALIGN(total_len);
554 
555 		state->ReadRecPtr = RecPtr;
556 	}
557 
558 	/*
559 	 * Special processing if it's an XLOG SWITCH record
560 	 */
561 	if (record->xl_rmid == RM_XLOG_ID &&
562 		(record->xl_info & ~XLR_INFO_MASK) == XLOG_SWITCH)
563 	{
564 		/* Pretend it extends to end of segment */
565 		state->EndRecPtr += state->segcxt.ws_segsize - 1;
566 		state->EndRecPtr -= XLogSegmentOffset(state->EndRecPtr, state->segcxt.ws_segsize);
567 	}
568 
569 	if (DecodeXLogRecord(state, record, errormsg))
570 		return record;
571 	else
572 		return NULL;
573 
574 err:
575 	if (assembled)
576 	{
577 		/*
578 		 * We get here when a record that spans multiple pages needs to be
579 		 * assembled, but something went wrong -- perhaps a contrecord piece
580 		 * was lost.  If caller is WAL replay, it will know where the aborted
581 		 * record was and where to direct followup WAL to be written, marking
582 		 * the next piece with XLP_FIRST_IS_OVERWRITE_CONTRECORD, which will
583 		 * in turn signal downstream WAL consumers that the broken WAL record
584 		 * is to be ignored.
585 		 */
586 		state->abortedRecPtr = RecPtr;
587 		state->missingContrecPtr = targetPagePtr;
588 	}
589 
590 	/*
591 	 * Invalidate the read state. We might read from a different source after
592 	 * failure.
593 	 */
594 	XLogReaderInvalReadState(state);
595 
596 	if (state->errormsg_buf[0] != '\0')
597 		*errormsg = state->errormsg_buf;
598 
599 	return NULL;
600 }
601 
602 /*
603  * Read a single xlog page including at least [pageptr, reqLen] of valid data
604  * via the page_read() callback.
605  *
606  * Returns -1 if the required page cannot be read for some reason; errormsg_buf
607  * is set in that case (unless the error occurs in the page_read callback).
608  *
609  * We fetch the page from a reader-local cache if we know we have the required
610  * data and if there hasn't been any error since caching the data.
611  */
612 static int
ReadPageInternal(XLogReaderState * state,XLogRecPtr pageptr,int reqLen)613 ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
614 {
615 	int			readLen;
616 	uint32		targetPageOff;
617 	XLogSegNo	targetSegNo;
618 	XLogPageHeader hdr;
619 
620 	Assert((pageptr % XLOG_BLCKSZ) == 0);
621 
622 	XLByteToSeg(pageptr, targetSegNo, state->segcxt.ws_segsize);
623 	targetPageOff = XLogSegmentOffset(pageptr, state->segcxt.ws_segsize);
624 
625 	/* check whether we have all the requested data already */
626 	if (targetSegNo == state->seg.ws_segno &&
627 		targetPageOff == state->segoff && reqLen <= state->readLen)
628 		return state->readLen;
629 
630 	/*
631 	 * Data is not in our buffer.
632 	 *
633 	 * Every time we actually read the segment, even if we looked at parts of
634 	 * it before, we need to do verification as the page_read callback might
635 	 * now be rereading data from a different source.
636 	 *
637 	 * Whenever switching to a new WAL segment, we read the first page of the
638 	 * file and validate its header, even if that's not where the target
639 	 * record is.  This is so that we can check the additional identification
640 	 * info that is present in the first page's "long" header.
641 	 */
642 	if (targetSegNo != state->seg.ws_segno && targetPageOff != 0)
643 	{
644 		XLogRecPtr	targetSegmentPtr = pageptr - targetPageOff;
645 
646 		readLen = state->routine.page_read(state, targetSegmentPtr, XLOG_BLCKSZ,
647 										   state->currRecPtr,
648 										   state->readBuf);
649 		if (readLen < 0)
650 			goto err;
651 
652 		/* we can be sure to have enough WAL available, we scrolled back */
653 		Assert(readLen == XLOG_BLCKSZ);
654 
655 		if (!XLogReaderValidatePageHeader(state, targetSegmentPtr,
656 										  state->readBuf))
657 			goto err;
658 	}
659 
660 	/*
661 	 * First, read the requested data length, but at least a short page header
662 	 * so that we can validate it.
663 	 */
664 	readLen = state->routine.page_read(state, pageptr, Max(reqLen, SizeOfXLogShortPHD),
665 									   state->currRecPtr,
666 									   state->readBuf);
667 	if (readLen < 0)
668 		goto err;
669 
670 	Assert(readLen <= XLOG_BLCKSZ);
671 
672 	/* Do we have enough data to check the header length? */
673 	if (readLen <= SizeOfXLogShortPHD)
674 		goto err;
675 
676 	Assert(readLen >= reqLen);
677 
678 	hdr = (XLogPageHeader) state->readBuf;
679 
680 	/* still not enough */
681 	if (readLen < XLogPageHeaderSize(hdr))
682 	{
683 		readLen = state->routine.page_read(state, pageptr, XLogPageHeaderSize(hdr),
684 										   state->currRecPtr,
685 										   state->readBuf);
686 		if (readLen < 0)
687 			goto err;
688 	}
689 
690 	/*
691 	 * Now that we know we have the full header, validate it.
692 	 */
693 	if (!XLogReaderValidatePageHeader(state, pageptr, (char *) hdr))
694 		goto err;
695 
696 	/* update read state information */
697 	state->seg.ws_segno = targetSegNo;
698 	state->segoff = targetPageOff;
699 	state->readLen = readLen;
700 
701 	return readLen;
702 
703 err:
704 	XLogReaderInvalReadState(state);
705 	return -1;
706 }
707 
708 /*
709  * Invalidate the xlogreader's read state to force a re-read.
710  */
711 static void
XLogReaderInvalReadState(XLogReaderState * state)712 XLogReaderInvalReadState(XLogReaderState *state)
713 {
714 	state->seg.ws_segno = 0;
715 	state->segoff = 0;
716 	state->readLen = 0;
717 }
718 
719 /*
720  * Validate an XLOG record header.
721  *
722  * This is just a convenience subroutine to avoid duplicated code in
723  * XLogReadRecord.  It's not intended for use from anywhere else.
724  */
725 static bool
ValidXLogRecordHeader(XLogReaderState * state,XLogRecPtr RecPtr,XLogRecPtr PrevRecPtr,XLogRecord * record,bool randAccess)726 ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr,
727 					  XLogRecPtr PrevRecPtr, XLogRecord *record,
728 					  bool randAccess)
729 {
730 	if (record->xl_tot_len < SizeOfXLogRecord)
731 	{
732 		report_invalid_record(state,
733 							  "invalid record length at %X/%X: wanted %u, got %u",
734 							  LSN_FORMAT_ARGS(RecPtr),
735 							  (uint32) SizeOfXLogRecord, record->xl_tot_len);
736 		return false;
737 	}
738 	if (record->xl_rmid > RM_MAX_ID)
739 	{
740 		report_invalid_record(state,
741 							  "invalid resource manager ID %u at %X/%X",
742 							  record->xl_rmid, LSN_FORMAT_ARGS(RecPtr));
743 		return false;
744 	}
745 	if (randAccess)
746 	{
747 		/*
748 		 * We can't exactly verify the prev-link, but surely it should be less
749 		 * than the record's own address.
750 		 */
751 		if (!(record->xl_prev < RecPtr))
752 		{
753 			report_invalid_record(state,
754 								  "record with incorrect prev-link %X/%X at %X/%X",
755 								  LSN_FORMAT_ARGS(record->xl_prev),
756 								  LSN_FORMAT_ARGS(RecPtr));
757 			return false;
758 		}
759 	}
760 	else
761 	{
762 		/*
763 		 * Record's prev-link should exactly match our previous location. This
764 		 * check guards against torn WAL pages where a stale but valid-looking
765 		 * WAL record starts on a sector boundary.
766 		 */
767 		if (record->xl_prev != PrevRecPtr)
768 		{
769 			report_invalid_record(state,
770 								  "record with incorrect prev-link %X/%X at %X/%X",
771 								  LSN_FORMAT_ARGS(record->xl_prev),
772 								  LSN_FORMAT_ARGS(RecPtr));
773 			return false;
774 		}
775 	}
776 
777 	return true;
778 }
779 
780 
781 /*
782  * CRC-check an XLOG record.  We do not believe the contents of an XLOG
783  * record (other than to the minimal extent of computing the amount of
784  * data to read in) until we've checked the CRCs.
785  *
786  * We assume all of the record (that is, xl_tot_len bytes) has been read
787  * into memory at *record.  Also, ValidXLogRecordHeader() has accepted the
788  * record's header, which means in particular that xl_tot_len is at least
789  * SizeOfXLogRecord.
790  */
791 static bool
ValidXLogRecord(XLogReaderState * state,XLogRecord * record,XLogRecPtr recptr)792 ValidXLogRecord(XLogReaderState *state, XLogRecord *record, XLogRecPtr recptr)
793 {
794 	pg_crc32c	crc;
795 
796 	/* Calculate the CRC */
797 	INIT_CRC32C(crc);
798 	COMP_CRC32C(crc, ((char *) record) + SizeOfXLogRecord, record->xl_tot_len - SizeOfXLogRecord);
799 	/* include the record header last */
800 	COMP_CRC32C(crc, (char *) record, offsetof(XLogRecord, xl_crc));
801 	FIN_CRC32C(crc);
802 
803 	if (!EQ_CRC32C(record->xl_crc, crc))
804 	{
805 		report_invalid_record(state,
806 							  "incorrect resource manager data checksum in record at %X/%X",
807 							  LSN_FORMAT_ARGS(recptr));
808 		return false;
809 	}
810 
811 	return true;
812 }
813 
814 /*
815  * Validate a page header.
816  *
817  * Check if 'phdr' is valid as the header of the XLog page at position
818  * 'recptr'.
819  */
820 bool
XLogReaderValidatePageHeader(XLogReaderState * state,XLogRecPtr recptr,char * phdr)821 XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr,
822 							 char *phdr)
823 {
824 	XLogRecPtr	recaddr;
825 	XLogSegNo	segno;
826 	int32		offset;
827 	XLogPageHeader hdr = (XLogPageHeader) phdr;
828 
829 	Assert((recptr % XLOG_BLCKSZ) == 0);
830 
831 	XLByteToSeg(recptr, segno, state->segcxt.ws_segsize);
832 	offset = XLogSegmentOffset(recptr, state->segcxt.ws_segsize);
833 
834 	XLogSegNoOffsetToRecPtr(segno, offset, state->segcxt.ws_segsize, recaddr);
835 
836 	if (hdr->xlp_magic != XLOG_PAGE_MAGIC)
837 	{
838 		char		fname[MAXFNAMELEN];
839 
840 		XLogFileName(fname, state->seg.ws_tli, segno, state->segcxt.ws_segsize);
841 
842 		report_invalid_record(state,
843 							  "invalid magic number %04X in log segment %s, offset %u",
844 							  hdr->xlp_magic,
845 							  fname,
846 							  offset);
847 		return false;
848 	}
849 
850 	if ((hdr->xlp_info & ~XLP_ALL_FLAGS) != 0)
851 	{
852 		char		fname[MAXFNAMELEN];
853 
854 		XLogFileName(fname, state->seg.ws_tli, segno, state->segcxt.ws_segsize);
855 
856 		report_invalid_record(state,
857 							  "invalid info bits %04X in log segment %s, offset %u",
858 							  hdr->xlp_info,
859 							  fname,
860 							  offset);
861 		return false;
862 	}
863 
864 	if (hdr->xlp_info & XLP_LONG_HEADER)
865 	{
866 		XLogLongPageHeader longhdr = (XLogLongPageHeader) hdr;
867 
868 		if (state->system_identifier &&
869 			longhdr->xlp_sysid != state->system_identifier)
870 		{
871 			report_invalid_record(state,
872 								  "WAL file is from different database system: WAL file database system identifier is %llu, pg_control database system identifier is %llu",
873 								  (unsigned long long) longhdr->xlp_sysid,
874 								  (unsigned long long) state->system_identifier);
875 			return false;
876 		}
877 		else if (longhdr->xlp_seg_size != state->segcxt.ws_segsize)
878 		{
879 			report_invalid_record(state,
880 								  "WAL file is from different database system: incorrect segment size in page header");
881 			return false;
882 		}
883 		else if (longhdr->xlp_xlog_blcksz != XLOG_BLCKSZ)
884 		{
885 			report_invalid_record(state,
886 								  "WAL file is from different database system: incorrect XLOG_BLCKSZ in page header");
887 			return false;
888 		}
889 	}
890 	else if (offset == 0)
891 	{
892 		char		fname[MAXFNAMELEN];
893 
894 		XLogFileName(fname, state->seg.ws_tli, segno, state->segcxt.ws_segsize);
895 
896 		/* hmm, first page of file doesn't have a long header? */
897 		report_invalid_record(state,
898 							  "invalid info bits %04X in log segment %s, offset %u",
899 							  hdr->xlp_info,
900 							  fname,
901 							  offset);
902 		return false;
903 	}
904 
905 	/*
906 	 * Check that the address on the page agrees with what we expected. This
907 	 * check typically fails when an old WAL segment is recycled, and hasn't
908 	 * yet been overwritten with new data yet.
909 	 */
910 	if (hdr->xlp_pageaddr != recaddr)
911 	{
912 		char		fname[MAXFNAMELEN];
913 
914 		XLogFileName(fname, state->seg.ws_tli, segno, state->segcxt.ws_segsize);
915 
916 		report_invalid_record(state,
917 							  "unexpected pageaddr %X/%X in log segment %s, offset %u",
918 							  LSN_FORMAT_ARGS(hdr->xlp_pageaddr),
919 							  fname,
920 							  offset);
921 		return false;
922 	}
923 
924 	/*
925 	 * Since child timelines are always assigned a TLI greater than their
926 	 * immediate parent's TLI, we should never see TLI go backwards across
927 	 * successive pages of a consistent WAL sequence.
928 	 *
929 	 * Sometimes we re-read a segment that's already been (partially) read. So
930 	 * we only verify TLIs for pages that are later than the last remembered
931 	 * LSN.
932 	 */
933 	if (recptr > state->latestPagePtr)
934 	{
935 		if (hdr->xlp_tli < state->latestPageTLI)
936 		{
937 			char		fname[MAXFNAMELEN];
938 
939 			XLogFileName(fname, state->seg.ws_tli, segno, state->segcxt.ws_segsize);
940 
941 			report_invalid_record(state,
942 								  "out-of-sequence timeline ID %u (after %u) in log segment %s, offset %u",
943 								  hdr->xlp_tli,
944 								  state->latestPageTLI,
945 								  fname,
946 								  offset);
947 			return false;
948 		}
949 	}
950 	state->latestPagePtr = recptr;
951 	state->latestPageTLI = hdr->xlp_tli;
952 
953 	return true;
954 }
955 
956 #ifdef FRONTEND
957 /*
958  * Functions that are currently not needed in the backend, but are better
959  * implemented inside xlogreader.c because of the internal facilities available
960  * here.
961  */
962 
963 /*
964  * Find the first record with an lsn >= RecPtr.
965  *
966  * This is different from XLogBeginRead() in that RecPtr doesn't need to point
967  * to a valid record boundary.  Useful for checking whether RecPtr is a valid
968  * xlog address for reading, and to find the first valid address after some
969  * address when dumping records for debugging purposes.
970  *
971  * This positions the reader, like XLogBeginRead(), so that the next call to
972  * XLogReadRecord() will read the next valid record.
973  */
974 XLogRecPtr
XLogFindNextRecord(XLogReaderState * state,XLogRecPtr RecPtr)975 XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr)
976 {
977 	XLogRecPtr	tmpRecPtr;
978 	XLogRecPtr	found = InvalidXLogRecPtr;
979 	XLogPageHeader header;
980 	char	   *errormsg;
981 
982 	Assert(!XLogRecPtrIsInvalid(RecPtr));
983 
984 	/*
985 	 * skip over potential continuation data, keeping in mind that it may span
986 	 * multiple pages
987 	 */
988 	tmpRecPtr = RecPtr;
989 	while (true)
990 	{
991 		XLogRecPtr	targetPagePtr;
992 		int			targetRecOff;
993 		uint32		pageHeaderSize;
994 		int			readLen;
995 
996 		/*
997 		 * Compute targetRecOff. It should typically be equal or greater than
998 		 * short page-header since a valid record can't start anywhere before
999 		 * that, except when caller has explicitly specified the offset that
1000 		 * falls somewhere there or when we are skipping multi-page
1001 		 * continuation record. It doesn't matter though because
1002 		 * ReadPageInternal() is prepared to handle that and will read at
1003 		 * least short page-header worth of data
1004 		 */
1005 		targetRecOff = tmpRecPtr % XLOG_BLCKSZ;
1006 
1007 		/* scroll back to page boundary */
1008 		targetPagePtr = tmpRecPtr - targetRecOff;
1009 
1010 		/* Read the page containing the record */
1011 		readLen = ReadPageInternal(state, targetPagePtr, targetRecOff);
1012 		if (readLen < 0)
1013 			goto err;
1014 
1015 		header = (XLogPageHeader) state->readBuf;
1016 
1017 		pageHeaderSize = XLogPageHeaderSize(header);
1018 
1019 		/* make sure we have enough data for the page header */
1020 		readLen = ReadPageInternal(state, targetPagePtr, pageHeaderSize);
1021 		if (readLen < 0)
1022 			goto err;
1023 
1024 		/* skip over potential continuation data */
1025 		if (header->xlp_info & XLP_FIRST_IS_CONTRECORD)
1026 		{
1027 			/*
1028 			 * If the length of the remaining continuation data is more than
1029 			 * what can fit in this page, the continuation record crosses over
1030 			 * this page. Read the next page and try again. xlp_rem_len in the
1031 			 * next page header will contain the remaining length of the
1032 			 * continuation data
1033 			 *
1034 			 * Note that record headers are MAXALIGN'ed
1035 			 */
1036 			if (MAXALIGN(header->xlp_rem_len) >= (XLOG_BLCKSZ - pageHeaderSize))
1037 				tmpRecPtr = targetPagePtr + XLOG_BLCKSZ;
1038 			else
1039 			{
1040 				/*
1041 				 * The previous continuation record ends in this page. Set
1042 				 * tmpRecPtr to point to the first valid record
1043 				 */
1044 				tmpRecPtr = targetPagePtr + pageHeaderSize
1045 					+ MAXALIGN(header->xlp_rem_len);
1046 				break;
1047 			}
1048 		}
1049 		else
1050 		{
1051 			tmpRecPtr = targetPagePtr + pageHeaderSize;
1052 			break;
1053 		}
1054 	}
1055 
1056 	/*
1057 	 * we know now that tmpRecPtr is an address pointing to a valid XLogRecord
1058 	 * because either we're at the first record after the beginning of a page
1059 	 * or we just jumped over the remaining data of a continuation.
1060 	 */
1061 	XLogBeginRead(state, tmpRecPtr);
1062 	while (XLogReadRecord(state, &errormsg) != NULL)
1063 	{
1064 		/* past the record we've found, break out */
1065 		if (RecPtr <= state->ReadRecPtr)
1066 		{
1067 			/* Rewind the reader to the beginning of the last record. */
1068 			found = state->ReadRecPtr;
1069 			XLogBeginRead(state, found);
1070 			return found;
1071 		}
1072 	}
1073 
1074 err:
1075 	XLogReaderInvalReadState(state);
1076 
1077 	return InvalidXLogRecPtr;
1078 }
1079 
1080 #endif							/* FRONTEND */
1081 
1082 /*
1083  * Helper function to ease writing of XLogRoutine->page_read callbacks.
1084  * If this function is used, caller must supply a segment_open callback in
1085  * 'state', as that is used here.
1086  *
1087  * Read 'count' bytes into 'buf', starting at location 'startptr', from WAL
1088  * fetched from timeline 'tli'.
1089  *
1090  * Returns true if succeeded, false if an error occurs, in which case
1091  * 'errinfo' receives error details.
1092  *
1093  * XXX probably this should be improved to suck data directly from the
1094  * WAL buffers when possible.
1095  */
1096 bool
WALRead(XLogReaderState * state,char * buf,XLogRecPtr startptr,Size count,TimeLineID tli,WALReadError * errinfo)1097 WALRead(XLogReaderState *state,
1098 		char *buf, XLogRecPtr startptr, Size count, TimeLineID tli,
1099 		WALReadError *errinfo)
1100 {
1101 	char	   *p;
1102 	XLogRecPtr	recptr;
1103 	Size		nbytes;
1104 
1105 	p = buf;
1106 	recptr = startptr;
1107 	nbytes = count;
1108 
1109 	while (nbytes > 0)
1110 	{
1111 		uint32		startoff;
1112 		int			segbytes;
1113 		int			readbytes;
1114 
1115 		startoff = XLogSegmentOffset(recptr, state->segcxt.ws_segsize);
1116 
1117 		/*
1118 		 * If the data we want is not in a segment we have open, close what we
1119 		 * have (if anything) and open the next one, using the caller's
1120 		 * provided openSegment callback.
1121 		 */
1122 		if (state->seg.ws_file < 0 ||
1123 			!XLByteInSeg(recptr, state->seg.ws_segno, state->segcxt.ws_segsize) ||
1124 			tli != state->seg.ws_tli)
1125 		{
1126 			XLogSegNo	nextSegNo;
1127 
1128 			if (state->seg.ws_file >= 0)
1129 				state->routine.segment_close(state);
1130 
1131 			XLByteToSeg(recptr, nextSegNo, state->segcxt.ws_segsize);
1132 			state->routine.segment_open(state, nextSegNo, &tli);
1133 
1134 			/* This shouldn't happen -- indicates a bug in segment_open */
1135 			Assert(state->seg.ws_file >= 0);
1136 
1137 			/* Update the current segment info. */
1138 			state->seg.ws_tli = tli;
1139 			state->seg.ws_segno = nextSegNo;
1140 		}
1141 
1142 		/* How many bytes are within this segment? */
1143 		if (nbytes > (state->segcxt.ws_segsize - startoff))
1144 			segbytes = state->segcxt.ws_segsize - startoff;
1145 		else
1146 			segbytes = nbytes;
1147 
1148 #ifndef FRONTEND
1149 		pgstat_report_wait_start(WAIT_EVENT_WAL_READ);
1150 #endif
1151 
1152 		/* Reset errno first; eases reporting non-errno-affecting errors */
1153 		errno = 0;
1154 		readbytes = pg_pread(state->seg.ws_file, p, segbytes, (off_t) startoff);
1155 
1156 #ifndef FRONTEND
1157 		pgstat_report_wait_end();
1158 #endif
1159 
1160 		if (readbytes <= 0)
1161 		{
1162 			errinfo->wre_errno = errno;
1163 			errinfo->wre_req = segbytes;
1164 			errinfo->wre_read = readbytes;
1165 			errinfo->wre_off = startoff;
1166 			errinfo->wre_seg = state->seg;
1167 			return false;
1168 		}
1169 
1170 		/* Update state for read */
1171 		recptr += readbytes;
1172 		nbytes -= readbytes;
1173 		p += readbytes;
1174 	}
1175 
1176 	return true;
1177 }
1178 
1179 /* ----------------------------------------
1180  * Functions for decoding the data and block references in a record.
1181  * ----------------------------------------
1182  */
1183 
1184 /* private function to reset the state between records */
1185 static void
ResetDecoder(XLogReaderState * state)1186 ResetDecoder(XLogReaderState *state)
1187 {
1188 	int			block_id;
1189 
1190 	state->decoded_record = NULL;
1191 
1192 	state->main_data_len = 0;
1193 
1194 	for (block_id = 0; block_id <= state->max_block_id; block_id++)
1195 	{
1196 		state->blocks[block_id].in_use = false;
1197 		state->blocks[block_id].has_image = false;
1198 		state->blocks[block_id].has_data = false;
1199 		state->blocks[block_id].apply_image = false;
1200 	}
1201 	state->max_block_id = -1;
1202 }
1203 
1204 /*
1205  * Decode the previously read record.
1206  *
1207  * On error, a human-readable error message is returned in *errormsg, and
1208  * the return value is false.
1209  */
1210 bool
DecodeXLogRecord(XLogReaderState * state,XLogRecord * record,char ** errormsg)1211 DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg)
1212 {
1213 	/*
1214 	 * read next _size bytes from record buffer, but check for overrun first.
1215 	 */
1216 #define COPY_HEADER_FIELD(_dst, _size)			\
1217 	do {										\
1218 		if (remaining < _size)					\
1219 			goto shortdata_err;					\
1220 		memcpy(_dst, ptr, _size);				\
1221 		ptr += _size;							\
1222 		remaining -= _size;						\
1223 	} while(0)
1224 
1225 	char	   *ptr;
1226 	uint32		remaining;
1227 	uint32		datatotal;
1228 	RelFileNode *rnode = NULL;
1229 	uint8		block_id;
1230 
1231 	ResetDecoder(state);
1232 
1233 	state->decoded_record = record;
1234 	state->record_origin = InvalidRepOriginId;
1235 	state->toplevel_xid = InvalidTransactionId;
1236 
1237 	ptr = (char *) record;
1238 	ptr += SizeOfXLogRecord;
1239 	remaining = record->xl_tot_len - SizeOfXLogRecord;
1240 
1241 	/* Decode the headers */
1242 	datatotal = 0;
1243 	while (remaining > datatotal)
1244 	{
1245 		COPY_HEADER_FIELD(&block_id, sizeof(uint8));
1246 
1247 		if (block_id == XLR_BLOCK_ID_DATA_SHORT)
1248 		{
1249 			/* XLogRecordDataHeaderShort */
1250 			uint8		main_data_len;
1251 
1252 			COPY_HEADER_FIELD(&main_data_len, sizeof(uint8));
1253 
1254 			state->main_data_len = main_data_len;
1255 			datatotal += main_data_len;
1256 			break;				/* by convention, the main data fragment is
1257 								 * always last */
1258 		}
1259 		else if (block_id == XLR_BLOCK_ID_DATA_LONG)
1260 		{
1261 			/* XLogRecordDataHeaderLong */
1262 			uint32		main_data_len;
1263 
1264 			COPY_HEADER_FIELD(&main_data_len, sizeof(uint32));
1265 			state->main_data_len = main_data_len;
1266 			datatotal += main_data_len;
1267 			break;				/* by convention, the main data fragment is
1268 								 * always last */
1269 		}
1270 		else if (block_id == XLR_BLOCK_ID_ORIGIN)
1271 		{
1272 			COPY_HEADER_FIELD(&state->record_origin, sizeof(RepOriginId));
1273 		}
1274 		else if (block_id == XLR_BLOCK_ID_TOPLEVEL_XID)
1275 		{
1276 			COPY_HEADER_FIELD(&state->toplevel_xid, sizeof(TransactionId));
1277 		}
1278 		else if (block_id <= XLR_MAX_BLOCK_ID)
1279 		{
1280 			/* XLogRecordBlockHeader */
1281 			DecodedBkpBlock *blk;
1282 			uint8		fork_flags;
1283 
1284 			if (block_id <= state->max_block_id)
1285 			{
1286 				report_invalid_record(state,
1287 									  "out-of-order block_id %u at %X/%X",
1288 									  block_id,
1289 									  LSN_FORMAT_ARGS(state->ReadRecPtr));
1290 				goto err;
1291 			}
1292 			state->max_block_id = block_id;
1293 
1294 			blk = &state->blocks[block_id];
1295 			blk->in_use = true;
1296 			blk->apply_image = false;
1297 
1298 			COPY_HEADER_FIELD(&fork_flags, sizeof(uint8));
1299 			blk->forknum = fork_flags & BKPBLOCK_FORK_MASK;
1300 			blk->flags = fork_flags;
1301 			blk->has_image = ((fork_flags & BKPBLOCK_HAS_IMAGE) != 0);
1302 			blk->has_data = ((fork_flags & BKPBLOCK_HAS_DATA) != 0);
1303 
1304 			COPY_HEADER_FIELD(&blk->data_len, sizeof(uint16));
1305 			/* cross-check that the HAS_DATA flag is set iff data_length > 0 */
1306 			if (blk->has_data && blk->data_len == 0)
1307 			{
1308 				report_invalid_record(state,
1309 									  "BKPBLOCK_HAS_DATA set, but no data included at %X/%X",
1310 									  LSN_FORMAT_ARGS(state->ReadRecPtr));
1311 				goto err;
1312 			}
1313 			if (!blk->has_data && blk->data_len != 0)
1314 			{
1315 				report_invalid_record(state,
1316 									  "BKPBLOCK_HAS_DATA not set, but data length is %u at %X/%X",
1317 									  (unsigned int) blk->data_len,
1318 									  LSN_FORMAT_ARGS(state->ReadRecPtr));
1319 				goto err;
1320 			}
1321 			datatotal += blk->data_len;
1322 
1323 			if (blk->has_image)
1324 			{
1325 				COPY_HEADER_FIELD(&blk->bimg_len, sizeof(uint16));
1326 				COPY_HEADER_FIELD(&blk->hole_offset, sizeof(uint16));
1327 				COPY_HEADER_FIELD(&blk->bimg_info, sizeof(uint8));
1328 
1329 				blk->apply_image = ((blk->bimg_info & BKPIMAGE_APPLY) != 0);
1330 
1331 				if (blk->bimg_info & BKPIMAGE_IS_COMPRESSED)
1332 				{
1333 					if (blk->bimg_info & BKPIMAGE_HAS_HOLE)
1334 						COPY_HEADER_FIELD(&blk->hole_length, sizeof(uint16));
1335 					else
1336 						blk->hole_length = 0;
1337 				}
1338 				else
1339 					blk->hole_length = BLCKSZ - blk->bimg_len;
1340 				datatotal += blk->bimg_len;
1341 
1342 				/*
1343 				 * cross-check that hole_offset > 0, hole_length > 0 and
1344 				 * bimg_len < BLCKSZ if the HAS_HOLE flag is set.
1345 				 */
1346 				if ((blk->bimg_info & BKPIMAGE_HAS_HOLE) &&
1347 					(blk->hole_offset == 0 ||
1348 					 blk->hole_length == 0 ||
1349 					 blk->bimg_len == BLCKSZ))
1350 				{
1351 					report_invalid_record(state,
1352 										  "BKPIMAGE_HAS_HOLE set, but hole offset %u length %u block image length %u at %X/%X",
1353 										  (unsigned int) blk->hole_offset,
1354 										  (unsigned int) blk->hole_length,
1355 										  (unsigned int) blk->bimg_len,
1356 										  LSN_FORMAT_ARGS(state->ReadRecPtr));
1357 					goto err;
1358 				}
1359 
1360 				/*
1361 				 * cross-check that hole_offset == 0 and hole_length == 0 if
1362 				 * the HAS_HOLE flag is not set.
1363 				 */
1364 				if (!(blk->bimg_info & BKPIMAGE_HAS_HOLE) &&
1365 					(blk->hole_offset != 0 || blk->hole_length != 0))
1366 				{
1367 					report_invalid_record(state,
1368 										  "BKPIMAGE_HAS_HOLE not set, but hole offset %u length %u at %X/%X",
1369 										  (unsigned int) blk->hole_offset,
1370 										  (unsigned int) blk->hole_length,
1371 										  LSN_FORMAT_ARGS(state->ReadRecPtr));
1372 					goto err;
1373 				}
1374 
1375 				/*
1376 				 * cross-check that bimg_len < BLCKSZ if the IS_COMPRESSED
1377 				 * flag is set.
1378 				 */
1379 				if ((blk->bimg_info & BKPIMAGE_IS_COMPRESSED) &&
1380 					blk->bimg_len == BLCKSZ)
1381 				{
1382 					report_invalid_record(state,
1383 										  "BKPIMAGE_IS_COMPRESSED set, but block image length %u at %X/%X",
1384 										  (unsigned int) blk->bimg_len,
1385 										  LSN_FORMAT_ARGS(state->ReadRecPtr));
1386 					goto err;
1387 				}
1388 
1389 				/*
1390 				 * cross-check that bimg_len = BLCKSZ if neither HAS_HOLE nor
1391 				 * IS_COMPRESSED flag is set.
1392 				 */
1393 				if (!(blk->bimg_info & BKPIMAGE_HAS_HOLE) &&
1394 					!(blk->bimg_info & BKPIMAGE_IS_COMPRESSED) &&
1395 					blk->bimg_len != BLCKSZ)
1396 				{
1397 					report_invalid_record(state,
1398 										  "neither BKPIMAGE_HAS_HOLE nor BKPIMAGE_IS_COMPRESSED set, but block image length is %u at %X/%X",
1399 										  (unsigned int) blk->data_len,
1400 										  LSN_FORMAT_ARGS(state->ReadRecPtr));
1401 					goto err;
1402 				}
1403 			}
1404 			if (!(fork_flags & BKPBLOCK_SAME_REL))
1405 			{
1406 				COPY_HEADER_FIELD(&blk->rnode, sizeof(RelFileNode));
1407 				rnode = &blk->rnode;
1408 			}
1409 			else
1410 			{
1411 				if (rnode == NULL)
1412 				{
1413 					report_invalid_record(state,
1414 										  "BKPBLOCK_SAME_REL set but no previous rel at %X/%X",
1415 										  LSN_FORMAT_ARGS(state->ReadRecPtr));
1416 					goto err;
1417 				}
1418 
1419 				blk->rnode = *rnode;
1420 			}
1421 			COPY_HEADER_FIELD(&blk->blkno, sizeof(BlockNumber));
1422 		}
1423 		else
1424 		{
1425 			report_invalid_record(state,
1426 								  "invalid block_id %u at %X/%X",
1427 								  block_id, LSN_FORMAT_ARGS(state->ReadRecPtr));
1428 			goto err;
1429 		}
1430 	}
1431 
1432 	if (remaining != datatotal)
1433 		goto shortdata_err;
1434 
1435 	/*
1436 	 * Ok, we've parsed the fragment headers, and verified that the total
1437 	 * length of the payload in the fragments is equal to the amount of data
1438 	 * left. Copy the data of each fragment to a separate buffer.
1439 	 *
1440 	 * We could just set up pointers into readRecordBuf, but we want to align
1441 	 * the data for the convenience of the callers. Backup images are not
1442 	 * copied, however; they don't need alignment.
1443 	 */
1444 
1445 	/* block data first */
1446 	for (block_id = 0; block_id <= state->max_block_id; block_id++)
1447 	{
1448 		DecodedBkpBlock *blk = &state->blocks[block_id];
1449 
1450 		if (!blk->in_use)
1451 			continue;
1452 
1453 		Assert(blk->has_image || !blk->apply_image);
1454 
1455 		if (blk->has_image)
1456 		{
1457 			blk->bkp_image = ptr;
1458 			ptr += blk->bimg_len;
1459 		}
1460 		if (blk->has_data)
1461 		{
1462 			if (!blk->data || blk->data_len > blk->data_bufsz)
1463 			{
1464 				if (blk->data)
1465 					pfree(blk->data);
1466 
1467 				/*
1468 				 * Force the initial request to be BLCKSZ so that we don't
1469 				 * waste time with lots of trips through this stanza as a
1470 				 * result of WAL compression.
1471 				 */
1472 				blk->data_bufsz = MAXALIGN(Max(blk->data_len, BLCKSZ));
1473 				blk->data = palloc(blk->data_bufsz);
1474 			}
1475 			memcpy(blk->data, ptr, blk->data_len);
1476 			ptr += blk->data_len;
1477 		}
1478 	}
1479 
1480 	/* and finally, the main data */
1481 	if (state->main_data_len > 0)
1482 	{
1483 		if (!state->main_data || state->main_data_len > state->main_data_bufsz)
1484 		{
1485 			if (state->main_data)
1486 				pfree(state->main_data);
1487 
1488 			/*
1489 			 * main_data_bufsz must be MAXALIGN'ed.  In many xlog record
1490 			 * types, we omit trailing struct padding on-disk to save a few
1491 			 * bytes; but compilers may generate accesses to the xlog struct
1492 			 * that assume that padding bytes are present.  If the palloc
1493 			 * request is not large enough to include such padding bytes then
1494 			 * we'll get valgrind complaints due to otherwise-harmless fetches
1495 			 * of the padding bytes.
1496 			 *
1497 			 * In addition, force the initial request to be reasonably large
1498 			 * so that we don't waste time with lots of trips through this
1499 			 * stanza.  BLCKSZ / 2 seems like a good compromise choice.
1500 			 */
1501 			state->main_data_bufsz = MAXALIGN(Max(state->main_data_len,
1502 												  BLCKSZ / 2));
1503 			state->main_data = palloc(state->main_data_bufsz);
1504 		}
1505 		memcpy(state->main_data, ptr, state->main_data_len);
1506 		ptr += state->main_data_len;
1507 	}
1508 
1509 	return true;
1510 
1511 shortdata_err:
1512 	report_invalid_record(state,
1513 						  "record with invalid length at %X/%X",
1514 						  LSN_FORMAT_ARGS(state->ReadRecPtr));
1515 err:
1516 	*errormsg = state->errormsg_buf;
1517 
1518 	return false;
1519 }
1520 
1521 /*
1522  * Returns information about the block that a block reference refers to.
1523  *
1524  * If the WAL record contains a block reference with the given ID, *rnode,
1525  * *forknum, and *blknum are filled in (if not NULL), and returns true.
1526  * Otherwise returns false.
1527  */
1528 bool
XLogRecGetBlockTag(XLogReaderState * record,uint8 block_id,RelFileNode * rnode,ForkNumber * forknum,BlockNumber * blknum)1529 XLogRecGetBlockTag(XLogReaderState *record, uint8 block_id,
1530 				   RelFileNode *rnode, ForkNumber *forknum, BlockNumber *blknum)
1531 {
1532 	DecodedBkpBlock *bkpb;
1533 
1534 	if (!record->blocks[block_id].in_use)
1535 		return false;
1536 
1537 	bkpb = &record->blocks[block_id];
1538 	if (rnode)
1539 		*rnode = bkpb->rnode;
1540 	if (forknum)
1541 		*forknum = bkpb->forknum;
1542 	if (blknum)
1543 		*blknum = bkpb->blkno;
1544 	return true;
1545 }
1546 
1547 /*
1548  * Returns the data associated with a block reference, or NULL if there is
1549  * no data (e.g. because a full-page image was taken instead). The returned
1550  * pointer points to a MAXALIGNed buffer.
1551  */
1552 char *
XLogRecGetBlockData(XLogReaderState * record,uint8 block_id,Size * len)1553 XLogRecGetBlockData(XLogReaderState *record, uint8 block_id, Size *len)
1554 {
1555 	DecodedBkpBlock *bkpb;
1556 
1557 	if (!record->blocks[block_id].in_use)
1558 		return NULL;
1559 
1560 	bkpb = &record->blocks[block_id];
1561 
1562 	if (!bkpb->has_data)
1563 	{
1564 		if (len)
1565 			*len = 0;
1566 		return NULL;
1567 	}
1568 	else
1569 	{
1570 		if (len)
1571 			*len = bkpb->data_len;
1572 		return bkpb->data;
1573 	}
1574 }
1575 
1576 /*
1577  * Restore a full-page image from a backup block attached to an XLOG record.
1578  *
1579  * Returns true if a full-page image is restored.
1580  */
1581 bool
RestoreBlockImage(XLogReaderState * record,uint8 block_id,char * page)1582 RestoreBlockImage(XLogReaderState *record, uint8 block_id, char *page)
1583 {
1584 	DecodedBkpBlock *bkpb;
1585 	char	   *ptr;
1586 	PGAlignedBlock tmp;
1587 
1588 	if (!record->blocks[block_id].in_use)
1589 		return false;
1590 	if (!record->blocks[block_id].has_image)
1591 		return false;
1592 
1593 	bkpb = &record->blocks[block_id];
1594 	ptr = bkpb->bkp_image;
1595 
1596 	if (bkpb->bimg_info & BKPIMAGE_IS_COMPRESSED)
1597 	{
1598 		/* If a backup block image is compressed, decompress it */
1599 		if (pglz_decompress(ptr, bkpb->bimg_len, tmp.data,
1600 							BLCKSZ - bkpb->hole_length, true) < 0)
1601 		{
1602 			report_invalid_record(record, "invalid compressed image at %X/%X, block %d",
1603 								  LSN_FORMAT_ARGS(record->ReadRecPtr),
1604 								  block_id);
1605 			return false;
1606 		}
1607 		ptr = tmp.data;
1608 	}
1609 
1610 	/* generate page, taking into account hole if necessary */
1611 	if (bkpb->hole_length == 0)
1612 	{
1613 		memcpy(page, ptr, BLCKSZ);
1614 	}
1615 	else
1616 	{
1617 		memcpy(page, ptr, bkpb->hole_offset);
1618 		/* must zero-fill the hole */
1619 		MemSet(page + bkpb->hole_offset, 0, bkpb->hole_length);
1620 		memcpy(page + (bkpb->hole_offset + bkpb->hole_length),
1621 			   ptr + bkpb->hole_offset,
1622 			   BLCKSZ - (bkpb->hole_offset + bkpb->hole_length));
1623 	}
1624 
1625 	return true;
1626 }
1627 
1628 #ifndef FRONTEND
1629 
1630 /*
1631  * Extract the FullTransactionId from a WAL record.
1632  */
1633 FullTransactionId
XLogRecGetFullXid(XLogReaderState * record)1634 XLogRecGetFullXid(XLogReaderState *record)
1635 {
1636 	TransactionId xid,
1637 				next_xid;
1638 	uint32		epoch;
1639 
1640 	/*
1641 	 * This function is only safe during replay, because it depends on the
1642 	 * replay state.  See AdvanceNextFullTransactionIdPastXid() for more.
1643 	 */
1644 	Assert(AmStartupProcess() || !IsUnderPostmaster);
1645 
1646 	xid = XLogRecGetXid(record);
1647 	next_xid = XidFromFullTransactionId(ShmemVariableCache->nextXid);
1648 	epoch = EpochFromFullTransactionId(ShmemVariableCache->nextXid);
1649 
1650 	/*
1651 	 * If xid is numerically greater than next_xid, it has to be from the last
1652 	 * epoch.
1653 	 */
1654 	if (unlikely(xid > next_xid))
1655 		--epoch;
1656 
1657 	return FullTransactionIdFromEpochAndXid(epoch, xid);
1658 }
1659 
1660 #endif
1661