1 /*-------------------------------------------------------------------------
2  *
3  * xlogreader.c
4  *		Generic XLog reading facility
5  *
6  * Portions Copyright (c) 2013-2020, PostgreSQL Global Development Group
7  *
8  * IDENTIFICATION
9  *		src/backend/access/transam/xlogreader.c
10  *
11  * NOTES
12  *		See xlogreader.h for more notes on this facility.
13  *
14  *		This file is compiled as both front-end and backend code, so it
15  *		may not use ereport, server-defined static variables, etc.
16  *-------------------------------------------------------------------------
17  */
18 #include "postgres.h"
19 
20 #include <unistd.h>
21 
22 #include "access/transam.h"
23 #include "access/xlog_internal.h"
24 #include "access/xlogreader.h"
25 #include "access/xlogrecord.h"
26 #include "catalog/pg_control.h"
27 #include "common/pg_lzcompress.h"
28 #include "replication/origin.h"
29 
30 #ifndef FRONTEND
31 #include "miscadmin.h"
32 #include "pgstat.h"
33 #include "utils/memutils.h"
34 #endif
35 
36 static void report_invalid_record(XLogReaderState *state, const char *fmt,...)
37 			pg_attribute_printf(2, 3);
38 static bool allocate_recordbuf(XLogReaderState *state, uint32 reclength);
39 static int	ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr,
40 							 int reqLen);
41 static void XLogReaderInvalReadState(XLogReaderState *state);
42 static bool ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr,
43 								  XLogRecPtr PrevRecPtr, XLogRecord *record, bool randAccess);
44 static bool ValidXLogRecord(XLogReaderState *state, XLogRecord *record,
45 							XLogRecPtr recptr);
46 static void ResetDecoder(XLogReaderState *state);
47 static void WALOpenSegmentInit(WALOpenSegment *seg, WALSegmentContext *segcxt,
48 							   int segsize, const char *waldir);
49 
50 /* size of the buffer allocated for error message. */
51 #define MAX_ERRORMSG_LEN 1000
52 
53 /*
54  * Construct a string in state->errormsg_buf explaining what's wrong with
55  * the current record being read.
56  */
57 static void
report_invalid_record(XLogReaderState * state,const char * fmt,...)58 report_invalid_record(XLogReaderState *state, const char *fmt,...)
59 {
60 	va_list		args;
61 
62 	fmt = _(fmt);
63 
64 	va_start(args, fmt);
65 	vsnprintf(state->errormsg_buf, MAX_ERRORMSG_LEN, fmt, args);
66 	va_end(args);
67 }
68 
69 /*
70  * Allocate and initialize a new XLogReader.
71  *
72  * Returns NULL if the xlogreader couldn't be allocated.
73  */
74 XLogReaderState *
XLogReaderAllocate(int wal_segment_size,const char * waldir,XLogReaderRoutine * routine,void * private_data)75 XLogReaderAllocate(int wal_segment_size, const char *waldir,
76 				   XLogReaderRoutine *routine, void *private_data)
77 {
78 	XLogReaderState *state;
79 
80 	state = (XLogReaderState *)
81 		palloc_extended(sizeof(XLogReaderState),
82 						MCXT_ALLOC_NO_OOM | MCXT_ALLOC_ZERO);
83 	if (!state)
84 		return NULL;
85 
86 	/* initialize caller-provided support functions */
87 	state->routine = *routine;
88 
89 	state->max_block_id = -1;
90 
91 	/*
92 	 * Permanently allocate readBuf.  We do it this way, rather than just
93 	 * making a static array, for two reasons: (1) no need to waste the
94 	 * storage in most instantiations of the backend; (2) a static char array
95 	 * isn't guaranteed to have any particular alignment, whereas
96 	 * palloc_extended() will provide MAXALIGN'd storage.
97 	 */
98 	state->readBuf = (char *) palloc_extended(XLOG_BLCKSZ,
99 											  MCXT_ALLOC_NO_OOM);
100 	if (!state->readBuf)
101 	{
102 		pfree(state);
103 		return NULL;
104 	}
105 
106 	/* Initialize segment info. */
107 	WALOpenSegmentInit(&state->seg, &state->segcxt, wal_segment_size,
108 					   waldir);
109 
110 	/* system_identifier initialized to zeroes above */
111 	state->private_data = private_data;
112 	/* ReadRecPtr, EndRecPtr and readLen initialized to zeroes above */
113 	state->errormsg_buf = palloc_extended(MAX_ERRORMSG_LEN + 1,
114 										  MCXT_ALLOC_NO_OOM);
115 	if (!state->errormsg_buf)
116 	{
117 		pfree(state->readBuf);
118 		pfree(state);
119 		return NULL;
120 	}
121 	state->errormsg_buf[0] = '\0';
122 
123 	/*
124 	 * Allocate an initial readRecordBuf of minimal size, which can later be
125 	 * enlarged if necessary.
126 	 */
127 	if (!allocate_recordbuf(state, 0))
128 	{
129 		pfree(state->errormsg_buf);
130 		pfree(state->readBuf);
131 		pfree(state);
132 		return NULL;
133 	}
134 
135 	return state;
136 }
137 
138 void
XLogReaderFree(XLogReaderState * state)139 XLogReaderFree(XLogReaderState *state)
140 {
141 	int			block_id;
142 
143 	if (state->seg.ws_file != -1)
144 		state->routine.segment_close(state);
145 
146 	for (block_id = 0; block_id <= XLR_MAX_BLOCK_ID; block_id++)
147 	{
148 		if (state->blocks[block_id].data)
149 			pfree(state->blocks[block_id].data);
150 	}
151 	if (state->main_data)
152 		pfree(state->main_data);
153 
154 	pfree(state->errormsg_buf);
155 	if (state->readRecordBuf)
156 		pfree(state->readRecordBuf);
157 	pfree(state->readBuf);
158 	pfree(state);
159 }
160 
161 /*
162  * Allocate readRecordBuf to fit a record of at least the given length.
163  * Returns true if successful, false if out of memory.
164  *
165  * readRecordBufSize is set to the new buffer size.
166  *
167  * To avoid useless small increases, round its size to a multiple of
168  * XLOG_BLCKSZ, and make sure it's at least 5*Max(BLCKSZ, XLOG_BLCKSZ) to start
169  * with.  (That is enough for all "normal" records, but very large commit or
170  * abort records might need more space.)
171  */
172 static bool
allocate_recordbuf(XLogReaderState * state,uint32 reclength)173 allocate_recordbuf(XLogReaderState *state, uint32 reclength)
174 {
175 	uint32		newSize = reclength;
176 
177 	newSize += XLOG_BLCKSZ - (newSize % XLOG_BLCKSZ);
178 	newSize = Max(newSize, 5 * Max(BLCKSZ, XLOG_BLCKSZ));
179 
180 #ifndef FRONTEND
181 
182 	/*
183 	 * Note that in much unlucky circumstances, the random data read from a
184 	 * recycled segment can cause this routine to be called with a size
185 	 * causing a hard failure at allocation.  For a standby, this would cause
186 	 * the instance to stop suddenly with a hard failure, preventing it to
187 	 * retry fetching WAL from one of its sources which could allow it to move
188 	 * on with replay without a manual restart. If the data comes from a past
189 	 * recycled segment and is still valid, then the allocation may succeed
190 	 * but record checks are going to fail so this would be short-lived.  If
191 	 * the allocation fails because of a memory shortage, then this is not a
192 	 * hard failure either per the guarantee given by MCXT_ALLOC_NO_OOM.
193 	 */
194 	if (!AllocSizeIsValid(newSize))
195 		return false;
196 
197 #endif
198 
199 	if (state->readRecordBuf)
200 		pfree(state->readRecordBuf);
201 	state->readRecordBuf =
202 		(char *) palloc_extended(newSize, MCXT_ALLOC_NO_OOM);
203 	if (state->readRecordBuf == NULL)
204 	{
205 		state->readRecordBufSize = 0;
206 		return false;
207 	}
208 	state->readRecordBufSize = newSize;
209 	return true;
210 }
211 
212 /*
213  * Initialize the passed segment structs.
214  */
215 static void
WALOpenSegmentInit(WALOpenSegment * seg,WALSegmentContext * segcxt,int segsize,const char * waldir)216 WALOpenSegmentInit(WALOpenSegment *seg, WALSegmentContext *segcxt,
217 				   int segsize, const char *waldir)
218 {
219 	seg->ws_file = -1;
220 	seg->ws_segno = 0;
221 	seg->ws_tli = 0;
222 
223 	segcxt->ws_segsize = segsize;
224 	if (waldir)
225 		snprintf(segcxt->ws_dir, MAXPGPATH, "%s", waldir);
226 }
227 
228 /*
229  * Begin reading WAL at 'RecPtr'.
230  *
231  * 'RecPtr' should point to the beginnning of a valid WAL record.  Pointing at
232  * the beginning of a page is also OK, if there is a new record right after
233  * the page header, i.e. not a continuation.
234  *
235  * This does not make any attempt to read the WAL yet, and hence cannot fail.
236  * If the starting address is not correct, the first call to XLogReadRecord()
237  * will error out.
238  */
239 void
XLogBeginRead(XLogReaderState * state,XLogRecPtr RecPtr)240 XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr)
241 {
242 	Assert(!XLogRecPtrIsInvalid(RecPtr));
243 
244 	ResetDecoder(state);
245 
246 	/* Begin at the passed-in record pointer. */
247 	state->EndRecPtr = RecPtr;
248 	state->ReadRecPtr = InvalidXLogRecPtr;
249 }
250 
251 /*
252  * Attempt to read an XLOG record.
253  *
254  * XLogBeginRead() or XLogFindNextRecord() must be called before the first call
255  * to XLogReadRecord().
256  *
257  * If the page_read callback fails to read the requested data, NULL is
258  * returned.  The callback is expected to have reported the error; errormsg
259  * is set to NULL.
260  *
261  * If the reading fails for some other reason, NULL is also returned, and
262  * *errormsg is set to a string with details of the failure.
263  *
264  * The returned pointer (or *errormsg) points to an internal buffer that's
265  * valid until the next call to XLogReadRecord.
266  */
267 XLogRecord *
XLogReadRecord(XLogReaderState * state,char ** errormsg)268 XLogReadRecord(XLogReaderState *state, char **errormsg)
269 {
270 	XLogRecPtr	RecPtr;
271 	XLogRecord *record;
272 	XLogRecPtr	targetPagePtr;
273 	bool		randAccess;
274 	uint32		len,
275 				total_len;
276 	uint32		targetRecOff;
277 	uint32		pageHeaderSize;
278 	bool		assembled;
279 	bool		gotheader;
280 	int			readOff;
281 
282 	/*
283 	 * randAccess indicates whether to verify the previous-record pointer of
284 	 * the record we're reading.  We only do this if we're reading
285 	 * sequentially, which is what we initially assume.
286 	 */
287 	randAccess = false;
288 
289 	/* reset error state */
290 	*errormsg = NULL;
291 	state->errormsg_buf[0] = '\0';
292 
293 	ResetDecoder(state);
294 	state->abortedRecPtr = InvalidXLogRecPtr;
295 	state->missingContrecPtr = InvalidXLogRecPtr;
296 
297 	RecPtr = state->EndRecPtr;
298 
299 	if (state->ReadRecPtr != InvalidXLogRecPtr)
300 	{
301 		/* read the record after the one we just read */
302 
303 		/*
304 		 * EndRecPtr is pointing to end+1 of the previous WAL record.  If
305 		 * we're at a page boundary, no more records can fit on the current
306 		 * page. We must skip over the page header, but we can't do that until
307 		 * we've read in the page, since the header size is variable.
308 		 */
309 	}
310 	else
311 	{
312 		/*
313 		 * Caller supplied a position to start at.
314 		 *
315 		 * In this case, EndRecPtr should already be pointing to a valid
316 		 * record starting position.
317 		 */
318 		Assert(XRecOffIsValid(RecPtr));
319 		randAccess = true;
320 	}
321 
322 restart:
323 	state->currRecPtr = RecPtr;
324 	assembled = false;
325 
326 	targetPagePtr = RecPtr - (RecPtr % XLOG_BLCKSZ);
327 	targetRecOff = RecPtr % XLOG_BLCKSZ;
328 
329 	/*
330 	 * Read the page containing the record into state->readBuf. Request enough
331 	 * byte to cover the whole record header, or at least the part of it that
332 	 * fits on the same page.
333 	 */
334 	readOff = ReadPageInternal(state, targetPagePtr,
335 							   Min(targetRecOff + SizeOfXLogRecord, XLOG_BLCKSZ));
336 	if (readOff < 0)
337 		goto err;
338 
339 	/*
340 	 * ReadPageInternal always returns at least the page header, so we can
341 	 * examine it now.
342 	 */
343 	pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) state->readBuf);
344 	if (targetRecOff == 0)
345 	{
346 		/*
347 		 * At page start, so skip over page header.
348 		 */
349 		RecPtr += pageHeaderSize;
350 		targetRecOff = pageHeaderSize;
351 	}
352 	else if (targetRecOff < pageHeaderSize)
353 	{
354 		report_invalid_record(state, "invalid record offset at %X/%X",
355 							  (uint32) (RecPtr >> 32), (uint32) RecPtr);
356 		goto err;
357 	}
358 
359 	if ((((XLogPageHeader) state->readBuf)->xlp_info & XLP_FIRST_IS_CONTRECORD) &&
360 		targetRecOff == pageHeaderSize)
361 	{
362 		report_invalid_record(state, "contrecord is requested by %X/%X",
363 							  (uint32) (RecPtr >> 32), (uint32) RecPtr);
364 		goto err;
365 	}
366 
367 	/* ReadPageInternal has verified the page header */
368 	Assert(pageHeaderSize <= readOff);
369 
370 	/*
371 	 * Read the record length.
372 	 *
373 	 * NB: Even though we use an XLogRecord pointer here, the whole record
374 	 * header might not fit on this page. xl_tot_len is the first field of the
375 	 * struct, so it must be on this page (the records are MAXALIGNed), but we
376 	 * cannot access any other fields until we've verified that we got the
377 	 * whole header.
378 	 */
379 	record = (XLogRecord *) (state->readBuf + RecPtr % XLOG_BLCKSZ);
380 	total_len = record->xl_tot_len;
381 
382 	/*
383 	 * If the whole record header is on this page, validate it immediately.
384 	 * Otherwise do just a basic sanity check on xl_tot_len, and validate the
385 	 * rest of the header after reading it from the next page.  The xl_tot_len
386 	 * check is necessary here to ensure that we enter the "Need to reassemble
387 	 * record" code path below; otherwise we might fail to apply
388 	 * ValidXLogRecordHeader at all.
389 	 */
390 	if (targetRecOff <= XLOG_BLCKSZ - SizeOfXLogRecord)
391 	{
392 		if (!ValidXLogRecordHeader(state, RecPtr, state->ReadRecPtr, record,
393 								   randAccess))
394 			goto err;
395 		gotheader = true;
396 	}
397 	else
398 	{
399 		/* XXX: more validation should be done here */
400 		if (total_len < SizeOfXLogRecord)
401 		{
402 			report_invalid_record(state,
403 								  "invalid record length at %X/%X: wanted %u, got %u",
404 								  (uint32) (RecPtr >> 32), (uint32) RecPtr,
405 								  (uint32) SizeOfXLogRecord, total_len);
406 			goto err;
407 		}
408 		gotheader = false;
409 	}
410 
411 	len = XLOG_BLCKSZ - RecPtr % XLOG_BLCKSZ;
412 	if (total_len > len)
413 	{
414 		/* Need to reassemble record */
415 		char	   *contdata;
416 		XLogPageHeader pageHeader;
417 		char	   *buffer;
418 		uint32		gotlen;
419 
420 		assembled = true;
421 
422 		/*
423 		 * Enlarge readRecordBuf as needed.
424 		 */
425 		if (total_len > state->readRecordBufSize &&
426 			!allocate_recordbuf(state, total_len))
427 		{
428 			/* We treat this as a "bogus data" condition */
429 			report_invalid_record(state, "record length %u at %X/%X too long",
430 								  total_len,
431 								  (uint32) (RecPtr >> 32), (uint32) RecPtr);
432 			goto err;
433 		}
434 
435 		/* Copy the first fragment of the record from the first page. */
436 		memcpy(state->readRecordBuf,
437 			   state->readBuf + RecPtr % XLOG_BLCKSZ, len);
438 		buffer = state->readRecordBuf + len;
439 		gotlen = len;
440 
441 		do
442 		{
443 			/* Calculate pointer to beginning of next page */
444 			targetPagePtr += XLOG_BLCKSZ;
445 
446 			/* Wait for the next page to become available */
447 			readOff = ReadPageInternal(state, targetPagePtr,
448 									   Min(total_len - gotlen + SizeOfXLogShortPHD,
449 										   XLOG_BLCKSZ));
450 
451 			if (readOff < 0)
452 				goto err;
453 
454 			Assert(SizeOfXLogShortPHD <= readOff);
455 
456 			pageHeader = (XLogPageHeader) state->readBuf;
457 
458 			/*
459 			 * If we were expecting a continuation record and got an
460 			 * "overwrite contrecord" flag, that means the continuation record
461 			 * was overwritten with a different record.  Restart the read by
462 			 * assuming the address to read is the location where we found
463 			 * this flag; but keep track of the LSN of the record we were
464 			 * reading, for later verification.
465 			 */
466 			if (pageHeader->xlp_info & XLP_FIRST_IS_OVERWRITE_CONTRECORD)
467 			{
468 				state->overwrittenRecPtr = state->currRecPtr;
469 				ResetDecoder(state);
470 				RecPtr = targetPagePtr;
471 				goto restart;
472 			}
473 
474 			/* Check that the continuation on next page looks valid */
475 			if (!(pageHeader->xlp_info & XLP_FIRST_IS_CONTRECORD))
476 			{
477 				report_invalid_record(state,
478 									  "there is no contrecord flag at %X/%X",
479 									  (uint32) (RecPtr >> 32), (uint32) RecPtr);
480 				goto err;
481 			}
482 
483 			/*
484 			 * Cross-check that xlp_rem_len agrees with how much of the record
485 			 * we expect there to be left.
486 			 */
487 			if (pageHeader->xlp_rem_len == 0 ||
488 				total_len != (pageHeader->xlp_rem_len + gotlen))
489 			{
490 				report_invalid_record(state,
491 									  "invalid contrecord length %u at %X/%X",
492 									  pageHeader->xlp_rem_len,
493 									  (uint32) (RecPtr >> 32), (uint32) RecPtr);
494 				goto err;
495 			}
496 
497 			/* Append the continuation from this page to the buffer */
498 			pageHeaderSize = XLogPageHeaderSize(pageHeader);
499 
500 			if (readOff < pageHeaderSize)
501 				readOff = ReadPageInternal(state, targetPagePtr,
502 										   pageHeaderSize);
503 
504 			Assert(pageHeaderSize <= readOff);
505 
506 			contdata = (char *) state->readBuf + pageHeaderSize;
507 			len = XLOG_BLCKSZ - pageHeaderSize;
508 			if (pageHeader->xlp_rem_len < len)
509 				len = pageHeader->xlp_rem_len;
510 
511 			if (readOff < pageHeaderSize + len)
512 				readOff = ReadPageInternal(state, targetPagePtr,
513 										   pageHeaderSize + len);
514 
515 			memcpy(buffer, (char *) contdata, len);
516 			buffer += len;
517 			gotlen += len;
518 
519 			/* If we just reassembled the record header, validate it. */
520 			if (!gotheader)
521 			{
522 				record = (XLogRecord *) state->readRecordBuf;
523 				if (!ValidXLogRecordHeader(state, RecPtr, state->ReadRecPtr,
524 										   record, randAccess))
525 					goto err;
526 				gotheader = true;
527 			}
528 		} while (gotlen < total_len);
529 
530 		Assert(gotheader);
531 
532 		record = (XLogRecord *) state->readRecordBuf;
533 		if (!ValidXLogRecord(state, record, RecPtr))
534 			goto err;
535 
536 		pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) state->readBuf);
537 		state->ReadRecPtr = RecPtr;
538 		state->EndRecPtr = targetPagePtr + pageHeaderSize
539 			+ MAXALIGN(pageHeader->xlp_rem_len);
540 	}
541 	else
542 	{
543 		/* Wait for the record data to become available */
544 		readOff = ReadPageInternal(state, targetPagePtr,
545 								   Min(targetRecOff + total_len, XLOG_BLCKSZ));
546 		if (readOff < 0)
547 			goto err;
548 
549 		/* Record does not cross a page boundary */
550 		if (!ValidXLogRecord(state, record, RecPtr))
551 			goto err;
552 
553 		state->EndRecPtr = RecPtr + MAXALIGN(total_len);
554 
555 		state->ReadRecPtr = RecPtr;
556 	}
557 
558 	/*
559 	 * Special processing if it's an XLOG SWITCH record
560 	 */
561 	if (record->xl_rmid == RM_XLOG_ID &&
562 		(record->xl_info & ~XLR_INFO_MASK) == XLOG_SWITCH)
563 	{
564 		/* Pretend it extends to end of segment */
565 		state->EndRecPtr += state->segcxt.ws_segsize - 1;
566 		state->EndRecPtr -= XLogSegmentOffset(state->EndRecPtr, state->segcxt.ws_segsize);
567 	}
568 
569 	if (DecodeXLogRecord(state, record, errormsg))
570 		return record;
571 	else
572 		return NULL;
573 
574 err:
575 	if (assembled)
576 	{
577 		/*
578 		 * We get here when a record that spans multiple pages needs to be
579 		 * assembled, but something went wrong -- perhaps a contrecord piece
580 		 * was lost.  If caller is WAL replay, it will know where the aborted
581 		 * record was and where to direct followup WAL to be written, marking
582 		 * the next piece with XLP_FIRST_IS_OVERWRITE_CONTRECORD, which will
583 		 * in turn signal downstream WAL consumers that the broken WAL record
584 		 * is to be ignored.
585 		 */
586 		state->abortedRecPtr = RecPtr;
587 		state->missingContrecPtr = targetPagePtr;
588 	}
589 
590 	/*
591 	 * Invalidate the read state. We might read from a different source after
592 	 * failure.
593 	 */
594 	XLogReaderInvalReadState(state);
595 
596 	if (state->errormsg_buf[0] != '\0')
597 		*errormsg = state->errormsg_buf;
598 
599 	return NULL;
600 }
601 
602 /*
603  * Read a single xlog page including at least [pageptr, reqLen] of valid data
604  * via the page_read() callback.
605  *
606  * Returns -1 if the required page cannot be read for some reason; errormsg_buf
607  * is set in that case (unless the error occurs in the page_read callback).
608  *
609  * We fetch the page from a reader-local cache if we know we have the required
610  * data and if there hasn't been any error since caching the data.
611  */
612 static int
ReadPageInternal(XLogReaderState * state,XLogRecPtr pageptr,int reqLen)613 ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
614 {
615 	int			readLen;
616 	uint32		targetPageOff;
617 	XLogSegNo	targetSegNo;
618 	XLogPageHeader hdr;
619 
620 	Assert((pageptr % XLOG_BLCKSZ) == 0);
621 
622 	XLByteToSeg(pageptr, targetSegNo, state->segcxt.ws_segsize);
623 	targetPageOff = XLogSegmentOffset(pageptr, state->segcxt.ws_segsize);
624 
625 	/* check whether we have all the requested data already */
626 	if (targetSegNo == state->seg.ws_segno &&
627 		targetPageOff == state->segoff && reqLen <= state->readLen)
628 		return state->readLen;
629 
630 	/*
631 	 * Data is not in our buffer.
632 	 *
633 	 * Every time we actually read the segment, even if we looked at parts of
634 	 * it before, we need to do verification as the page_read callback might
635 	 * now be rereading data from a different source.
636 	 *
637 	 * Whenever switching to a new WAL segment, we read the first page of the
638 	 * file and validate its header, even if that's not where the target
639 	 * record is.  This is so that we can check the additional identification
640 	 * info that is present in the first page's "long" header.
641 	 */
642 	if (targetSegNo != state->seg.ws_segno && targetPageOff != 0)
643 	{
644 		XLogRecPtr	targetSegmentPtr = pageptr - targetPageOff;
645 
646 		readLen = state->routine.page_read(state, targetSegmentPtr, XLOG_BLCKSZ,
647 										   state->currRecPtr,
648 										   state->readBuf);
649 		if (readLen < 0)
650 			goto err;
651 
652 		/* we can be sure to have enough WAL available, we scrolled back */
653 		Assert(readLen == XLOG_BLCKSZ);
654 
655 		if (!XLogReaderValidatePageHeader(state, targetSegmentPtr,
656 										  state->readBuf))
657 			goto err;
658 	}
659 
660 	/*
661 	 * First, read the requested data length, but at least a short page header
662 	 * so that we can validate it.
663 	 */
664 	readLen = state->routine.page_read(state, pageptr, Max(reqLen, SizeOfXLogShortPHD),
665 									   state->currRecPtr,
666 									   state->readBuf);
667 	if (readLen < 0)
668 		goto err;
669 
670 	Assert(readLen <= XLOG_BLCKSZ);
671 
672 	/* Do we have enough data to check the header length? */
673 	if (readLen <= SizeOfXLogShortPHD)
674 		goto err;
675 
676 	Assert(readLen >= reqLen);
677 
678 	hdr = (XLogPageHeader) state->readBuf;
679 
680 	/* still not enough */
681 	if (readLen < XLogPageHeaderSize(hdr))
682 	{
683 		readLen = state->routine.page_read(state, pageptr, XLogPageHeaderSize(hdr),
684 										   state->currRecPtr,
685 										   state->readBuf);
686 		if (readLen < 0)
687 			goto err;
688 	}
689 
690 	/*
691 	 * Now that we know we have the full header, validate it.
692 	 */
693 	if (!XLogReaderValidatePageHeader(state, pageptr, (char *) hdr))
694 		goto err;
695 
696 	/* update read state information */
697 	state->seg.ws_segno = targetSegNo;
698 	state->segoff = targetPageOff;
699 	state->readLen = readLen;
700 
701 	return readLen;
702 
703 err:
704 	XLogReaderInvalReadState(state);
705 	return -1;
706 }
707 
708 /*
709  * Invalidate the xlogreader's read state to force a re-read.
710  */
711 static void
XLogReaderInvalReadState(XLogReaderState * state)712 XLogReaderInvalReadState(XLogReaderState *state)
713 {
714 	state->seg.ws_segno = 0;
715 	state->segoff = 0;
716 	state->readLen = 0;
717 }
718 
719 /*
720  * Validate an XLOG record header.
721  *
722  * This is just a convenience subroutine to avoid duplicated code in
723  * XLogReadRecord.  It's not intended for use from anywhere else.
724  */
725 static bool
ValidXLogRecordHeader(XLogReaderState * state,XLogRecPtr RecPtr,XLogRecPtr PrevRecPtr,XLogRecord * record,bool randAccess)726 ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr,
727 					  XLogRecPtr PrevRecPtr, XLogRecord *record,
728 					  bool randAccess)
729 {
730 	if (record->xl_tot_len < SizeOfXLogRecord)
731 	{
732 		report_invalid_record(state,
733 							  "invalid record length at %X/%X: wanted %u, got %u",
734 							  (uint32) (RecPtr >> 32), (uint32) RecPtr,
735 							  (uint32) SizeOfXLogRecord, record->xl_tot_len);
736 		return false;
737 	}
738 	if (record->xl_rmid > RM_MAX_ID)
739 	{
740 		report_invalid_record(state,
741 							  "invalid resource manager ID %u at %X/%X",
742 							  record->xl_rmid, (uint32) (RecPtr >> 32),
743 							  (uint32) RecPtr);
744 		return false;
745 	}
746 	if (randAccess)
747 	{
748 		/*
749 		 * We can't exactly verify the prev-link, but surely it should be less
750 		 * than the record's own address.
751 		 */
752 		if (!(record->xl_prev < RecPtr))
753 		{
754 			report_invalid_record(state,
755 								  "record with incorrect prev-link %X/%X at %X/%X",
756 								  (uint32) (record->xl_prev >> 32),
757 								  (uint32) record->xl_prev,
758 								  (uint32) (RecPtr >> 32), (uint32) RecPtr);
759 			return false;
760 		}
761 	}
762 	else
763 	{
764 		/*
765 		 * Record's prev-link should exactly match our previous location. This
766 		 * check guards against torn WAL pages where a stale but valid-looking
767 		 * WAL record starts on a sector boundary.
768 		 */
769 		if (record->xl_prev != PrevRecPtr)
770 		{
771 			report_invalid_record(state,
772 								  "record with incorrect prev-link %X/%X at %X/%X",
773 								  (uint32) (record->xl_prev >> 32),
774 								  (uint32) record->xl_prev,
775 								  (uint32) (RecPtr >> 32), (uint32) RecPtr);
776 			return false;
777 		}
778 	}
779 
780 	return true;
781 }
782 
783 
784 /*
785  * CRC-check an XLOG record.  We do not believe the contents of an XLOG
786  * record (other than to the minimal extent of computing the amount of
787  * data to read in) until we've checked the CRCs.
788  *
789  * We assume all of the record (that is, xl_tot_len bytes) has been read
790  * into memory at *record.  Also, ValidXLogRecordHeader() has accepted the
791  * record's header, which means in particular that xl_tot_len is at least
792  * SizeOfXLogRecord.
793  */
794 static bool
ValidXLogRecord(XLogReaderState * state,XLogRecord * record,XLogRecPtr recptr)795 ValidXLogRecord(XLogReaderState *state, XLogRecord *record, XLogRecPtr recptr)
796 {
797 	pg_crc32c	crc;
798 
799 	/* Calculate the CRC */
800 	INIT_CRC32C(crc);
801 	COMP_CRC32C(crc, ((char *) record) + SizeOfXLogRecord, record->xl_tot_len - SizeOfXLogRecord);
802 	/* include the record header last */
803 	COMP_CRC32C(crc, (char *) record, offsetof(XLogRecord, xl_crc));
804 	FIN_CRC32C(crc);
805 
806 	if (!EQ_CRC32C(record->xl_crc, crc))
807 	{
808 		report_invalid_record(state,
809 							  "incorrect resource manager data checksum in record at %X/%X",
810 							  (uint32) (recptr >> 32), (uint32) recptr);
811 		return false;
812 	}
813 
814 	return true;
815 }
816 
817 /*
818  * Validate a page header.
819  *
820  * Check if 'phdr' is valid as the header of the XLog page at position
821  * 'recptr'.
822  */
823 bool
XLogReaderValidatePageHeader(XLogReaderState * state,XLogRecPtr recptr,char * phdr)824 XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr,
825 							 char *phdr)
826 {
827 	XLogRecPtr	recaddr;
828 	XLogSegNo	segno;
829 	int32		offset;
830 	XLogPageHeader hdr = (XLogPageHeader) phdr;
831 
832 	Assert((recptr % XLOG_BLCKSZ) == 0);
833 
834 	XLByteToSeg(recptr, segno, state->segcxt.ws_segsize);
835 	offset = XLogSegmentOffset(recptr, state->segcxt.ws_segsize);
836 
837 	XLogSegNoOffsetToRecPtr(segno, offset, state->segcxt.ws_segsize, recaddr);
838 
839 	if (hdr->xlp_magic != XLOG_PAGE_MAGIC)
840 	{
841 		char		fname[MAXFNAMELEN];
842 
843 		XLogFileName(fname, state->seg.ws_tli, segno, state->segcxt.ws_segsize);
844 
845 		report_invalid_record(state,
846 							  "invalid magic number %04X in log segment %s, offset %u",
847 							  hdr->xlp_magic,
848 							  fname,
849 							  offset);
850 		return false;
851 	}
852 
853 	if ((hdr->xlp_info & ~XLP_ALL_FLAGS) != 0)
854 	{
855 		char		fname[MAXFNAMELEN];
856 
857 		XLogFileName(fname, state->seg.ws_tli, segno, state->segcxt.ws_segsize);
858 
859 		report_invalid_record(state,
860 							  "invalid info bits %04X in log segment %s, offset %u",
861 							  hdr->xlp_info,
862 							  fname,
863 							  offset);
864 		return false;
865 	}
866 
867 	if (hdr->xlp_info & XLP_LONG_HEADER)
868 	{
869 		XLogLongPageHeader longhdr = (XLogLongPageHeader) hdr;
870 
871 		if (state->system_identifier &&
872 			longhdr->xlp_sysid != state->system_identifier)
873 		{
874 			report_invalid_record(state,
875 								  "WAL file is from different database system: WAL file database system identifier is %llu, pg_control database system identifier is %llu",
876 								  (unsigned long long) longhdr->xlp_sysid,
877 								  (unsigned long long) state->system_identifier);
878 			return false;
879 		}
880 		else if (longhdr->xlp_seg_size != state->segcxt.ws_segsize)
881 		{
882 			report_invalid_record(state,
883 								  "WAL file is from different database system: incorrect segment size in page header");
884 			return false;
885 		}
886 		else if (longhdr->xlp_xlog_blcksz != XLOG_BLCKSZ)
887 		{
888 			report_invalid_record(state,
889 								  "WAL file is from different database system: incorrect XLOG_BLCKSZ in page header");
890 			return false;
891 		}
892 	}
893 	else if (offset == 0)
894 	{
895 		char		fname[MAXFNAMELEN];
896 
897 		XLogFileName(fname, state->seg.ws_tli, segno, state->segcxt.ws_segsize);
898 
899 		/* hmm, first page of file doesn't have a long header? */
900 		report_invalid_record(state,
901 							  "invalid info bits %04X in log segment %s, offset %u",
902 							  hdr->xlp_info,
903 							  fname,
904 							  offset);
905 		return false;
906 	}
907 
908 	/*
909 	 * Check that the address on the page agrees with what we expected. This
910 	 * check typically fails when an old WAL segment is recycled, and hasn't
911 	 * yet been overwritten with new data yet.
912 	 */
913 	if (hdr->xlp_pageaddr != recaddr)
914 	{
915 		char		fname[MAXFNAMELEN];
916 
917 		XLogFileName(fname, state->seg.ws_tli, segno, state->segcxt.ws_segsize);
918 
919 		report_invalid_record(state,
920 							  "unexpected pageaddr %X/%X in log segment %s, offset %u",
921 							  (uint32) (hdr->xlp_pageaddr >> 32), (uint32) hdr->xlp_pageaddr,
922 							  fname,
923 							  offset);
924 		return false;
925 	}
926 
927 	/*
928 	 * Since child timelines are always assigned a TLI greater than their
929 	 * immediate parent's TLI, we should never see TLI go backwards across
930 	 * successive pages of a consistent WAL sequence.
931 	 *
932 	 * Sometimes we re-read a segment that's already been (partially) read. So
933 	 * we only verify TLIs for pages that are later than the last remembered
934 	 * LSN.
935 	 */
936 	if (recptr > state->latestPagePtr)
937 	{
938 		if (hdr->xlp_tli < state->latestPageTLI)
939 		{
940 			char		fname[MAXFNAMELEN];
941 
942 			XLogFileName(fname, state->seg.ws_tli, segno, state->segcxt.ws_segsize);
943 
944 			report_invalid_record(state,
945 								  "out-of-sequence timeline ID %u (after %u) in log segment %s, offset %u",
946 								  hdr->xlp_tli,
947 								  state->latestPageTLI,
948 								  fname,
949 								  offset);
950 			return false;
951 		}
952 	}
953 	state->latestPagePtr = recptr;
954 	state->latestPageTLI = hdr->xlp_tli;
955 
956 	return true;
957 }
958 
959 #ifdef FRONTEND
960 /*
961  * Functions that are currently not needed in the backend, but are better
962  * implemented inside xlogreader.c because of the internal facilities available
963  * here.
964  */
965 
966 /*
967  * Find the first record with an lsn >= RecPtr.
968  *
969  * This is different from XLogBeginRead() in that RecPtr doesn't need to point
970  * to a valid record boundary.  Useful for checking whether RecPtr is a valid
971  * xlog address for reading, and to find the first valid address after some
972  * address when dumping records for debugging purposes.
973  *
974  * This positions the reader, like XLogBeginRead(), so that the next call to
975  * XLogReadRecord() will read the next valid record.
976  */
977 XLogRecPtr
XLogFindNextRecord(XLogReaderState * state,XLogRecPtr RecPtr)978 XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr)
979 {
980 	XLogRecPtr	tmpRecPtr;
981 	XLogRecPtr	found = InvalidXLogRecPtr;
982 	XLogPageHeader header;
983 	char	   *errormsg;
984 
985 	Assert(!XLogRecPtrIsInvalid(RecPtr));
986 
987 	/*
988 	 * skip over potential continuation data, keeping in mind that it may span
989 	 * multiple pages
990 	 */
991 	tmpRecPtr = RecPtr;
992 	while (true)
993 	{
994 		XLogRecPtr	targetPagePtr;
995 		int			targetRecOff;
996 		uint32		pageHeaderSize;
997 		int			readLen;
998 
999 		/*
1000 		 * Compute targetRecOff. It should typically be equal or greater than
1001 		 * short page-header since a valid record can't start anywhere before
1002 		 * that, except when caller has explicitly specified the offset that
1003 		 * falls somewhere there or when we are skipping multi-page
1004 		 * continuation record. It doesn't matter though because
1005 		 * ReadPageInternal() is prepared to handle that and will read at
1006 		 * least short page-header worth of data
1007 		 */
1008 		targetRecOff = tmpRecPtr % XLOG_BLCKSZ;
1009 
1010 		/* scroll back to page boundary */
1011 		targetPagePtr = tmpRecPtr - targetRecOff;
1012 
1013 		/* Read the page containing the record */
1014 		readLen = ReadPageInternal(state, targetPagePtr, targetRecOff);
1015 		if (readLen < 0)
1016 			goto err;
1017 
1018 		header = (XLogPageHeader) state->readBuf;
1019 
1020 		pageHeaderSize = XLogPageHeaderSize(header);
1021 
1022 		/* make sure we have enough data for the page header */
1023 		readLen = ReadPageInternal(state, targetPagePtr, pageHeaderSize);
1024 		if (readLen < 0)
1025 			goto err;
1026 
1027 		/* skip over potential continuation data */
1028 		if (header->xlp_info & XLP_FIRST_IS_CONTRECORD)
1029 		{
1030 			/*
1031 			 * If the length of the remaining continuation data is more than
1032 			 * what can fit in this page, the continuation record crosses over
1033 			 * this page. Read the next page and try again. xlp_rem_len in the
1034 			 * next page header will contain the remaining length of the
1035 			 * continuation data
1036 			 *
1037 			 * Note that record headers are MAXALIGN'ed
1038 			 */
1039 			if (MAXALIGN(header->xlp_rem_len) >= (XLOG_BLCKSZ - pageHeaderSize))
1040 				tmpRecPtr = targetPagePtr + XLOG_BLCKSZ;
1041 			else
1042 			{
1043 				/*
1044 				 * The previous continuation record ends in this page. Set
1045 				 * tmpRecPtr to point to the first valid record
1046 				 */
1047 				tmpRecPtr = targetPagePtr + pageHeaderSize
1048 					+ MAXALIGN(header->xlp_rem_len);
1049 				break;
1050 			}
1051 		}
1052 		else
1053 		{
1054 			tmpRecPtr = targetPagePtr + pageHeaderSize;
1055 			break;
1056 		}
1057 	}
1058 
1059 	/*
1060 	 * we know now that tmpRecPtr is an address pointing to a valid XLogRecord
1061 	 * because either we're at the first record after the beginning of a page
1062 	 * or we just jumped over the remaining data of a continuation.
1063 	 */
1064 	XLogBeginRead(state, tmpRecPtr);
1065 	while (XLogReadRecord(state, &errormsg) != NULL)
1066 	{
1067 		/* past the record we've found, break out */
1068 		if (RecPtr <= state->ReadRecPtr)
1069 		{
1070 			/* Rewind the reader to the beginning of the last record. */
1071 			found = state->ReadRecPtr;
1072 			XLogBeginRead(state, found);
1073 			return found;
1074 		}
1075 	}
1076 
1077 err:
1078 	XLogReaderInvalReadState(state);
1079 
1080 	return InvalidXLogRecPtr;
1081 }
1082 
1083 #endif							/* FRONTEND */
1084 
1085 /*
1086  * Helper function to ease writing of XLogRoutine->page_read callbacks.
1087  * If this function is used, caller must supply a segment_open callback in
1088  * 'state', as that is used here.
1089  *
1090  * Read 'count' bytes into 'buf', starting at location 'startptr', from WAL
1091  * fetched from timeline 'tli'.
1092  *
1093  * Returns true if succeeded, false if an error occurs, in which case
1094  * 'errinfo' receives error details.
1095  *
1096  * XXX probably this should be improved to suck data directly from the
1097  * WAL buffers when possible.
1098  */
1099 bool
WALRead(XLogReaderState * state,char * buf,XLogRecPtr startptr,Size count,TimeLineID tli,WALReadError * errinfo)1100 WALRead(XLogReaderState *state,
1101 		char *buf, XLogRecPtr startptr, Size count, TimeLineID tli,
1102 		WALReadError *errinfo)
1103 {
1104 	char	   *p;
1105 	XLogRecPtr	recptr;
1106 	Size		nbytes;
1107 
1108 	p = buf;
1109 	recptr = startptr;
1110 	nbytes = count;
1111 
1112 	while (nbytes > 0)
1113 	{
1114 		uint32		startoff;
1115 		int			segbytes;
1116 		int			readbytes;
1117 
1118 		startoff = XLogSegmentOffset(recptr, state->segcxt.ws_segsize);
1119 
1120 		/*
1121 		 * If the data we want is not in a segment we have open, close what we
1122 		 * have (if anything) and open the next one, using the caller's
1123 		 * provided openSegment callback.
1124 		 */
1125 		if (state->seg.ws_file < 0 ||
1126 			!XLByteInSeg(recptr, state->seg.ws_segno, state->segcxt.ws_segsize) ||
1127 			tli != state->seg.ws_tli)
1128 		{
1129 			XLogSegNo	nextSegNo;
1130 
1131 			if (state->seg.ws_file >= 0)
1132 				state->routine.segment_close(state);
1133 
1134 			XLByteToSeg(recptr, nextSegNo, state->segcxt.ws_segsize);
1135 			state->routine.segment_open(state, nextSegNo, &tli);
1136 
1137 			/* This shouldn't happen -- indicates a bug in segment_open */
1138 			Assert(state->seg.ws_file >= 0);
1139 
1140 			/* Update the current segment info. */
1141 			state->seg.ws_tli = tli;
1142 			state->seg.ws_segno = nextSegNo;
1143 		}
1144 
1145 		/* How many bytes are within this segment? */
1146 		if (nbytes > (state->segcxt.ws_segsize - startoff))
1147 			segbytes = state->segcxt.ws_segsize - startoff;
1148 		else
1149 			segbytes = nbytes;
1150 
1151 #ifndef FRONTEND
1152 		pgstat_report_wait_start(WAIT_EVENT_WAL_READ);
1153 #endif
1154 
1155 		/* Reset errno first; eases reporting non-errno-affecting errors */
1156 		errno = 0;
1157 		readbytes = pg_pread(state->seg.ws_file, p, segbytes, (off_t) startoff);
1158 
1159 #ifndef FRONTEND
1160 		pgstat_report_wait_end();
1161 #endif
1162 
1163 		if (readbytes <= 0)
1164 		{
1165 			errinfo->wre_errno = errno;
1166 			errinfo->wre_req = segbytes;
1167 			errinfo->wre_read = readbytes;
1168 			errinfo->wre_off = startoff;
1169 			errinfo->wre_seg = state->seg;
1170 			return false;
1171 		}
1172 
1173 		/* Update state for read */
1174 		recptr += readbytes;
1175 		nbytes -= readbytes;
1176 		p += readbytes;
1177 	}
1178 
1179 	return true;
1180 }
1181 
1182 /* ----------------------------------------
1183  * Functions for decoding the data and block references in a record.
1184  * ----------------------------------------
1185  */
1186 
1187 /* private function to reset the state between records */
1188 static void
ResetDecoder(XLogReaderState * state)1189 ResetDecoder(XLogReaderState *state)
1190 {
1191 	int			block_id;
1192 
1193 	state->decoded_record = NULL;
1194 
1195 	state->main_data_len = 0;
1196 
1197 	for (block_id = 0; block_id <= state->max_block_id; block_id++)
1198 	{
1199 		state->blocks[block_id].in_use = false;
1200 		state->blocks[block_id].has_image = false;
1201 		state->blocks[block_id].has_data = false;
1202 		state->blocks[block_id].apply_image = false;
1203 	}
1204 	state->max_block_id = -1;
1205 }
1206 
1207 /*
1208  * Decode the previously read record.
1209  *
1210  * On error, a human-readable error message is returned in *errormsg, and
1211  * the return value is false.
1212  */
1213 bool
DecodeXLogRecord(XLogReaderState * state,XLogRecord * record,char ** errormsg)1214 DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg)
1215 {
1216 	/*
1217 	 * read next _size bytes from record buffer, but check for overrun first.
1218 	 */
1219 #define COPY_HEADER_FIELD(_dst, _size)			\
1220 	do {										\
1221 		if (remaining < _size)					\
1222 			goto shortdata_err;					\
1223 		memcpy(_dst, ptr, _size);				\
1224 		ptr += _size;							\
1225 		remaining -= _size;						\
1226 	} while(0)
1227 
1228 	char	   *ptr;
1229 	uint32		remaining;
1230 	uint32		datatotal;
1231 	RelFileNode *rnode = NULL;
1232 	uint8		block_id;
1233 
1234 	ResetDecoder(state);
1235 
1236 	state->decoded_record = record;
1237 	state->record_origin = InvalidRepOriginId;
1238 
1239 	ptr = (char *) record;
1240 	ptr += SizeOfXLogRecord;
1241 	remaining = record->xl_tot_len - SizeOfXLogRecord;
1242 
1243 	/* Decode the headers */
1244 	datatotal = 0;
1245 	while (remaining > datatotal)
1246 	{
1247 		COPY_HEADER_FIELD(&block_id, sizeof(uint8));
1248 
1249 		if (block_id == XLR_BLOCK_ID_DATA_SHORT)
1250 		{
1251 			/* XLogRecordDataHeaderShort */
1252 			uint8		main_data_len;
1253 
1254 			COPY_HEADER_FIELD(&main_data_len, sizeof(uint8));
1255 
1256 			state->main_data_len = main_data_len;
1257 			datatotal += main_data_len;
1258 			break;				/* by convention, the main data fragment is
1259 								 * always last */
1260 		}
1261 		else if (block_id == XLR_BLOCK_ID_DATA_LONG)
1262 		{
1263 			/* XLogRecordDataHeaderLong */
1264 			uint32		main_data_len;
1265 
1266 			COPY_HEADER_FIELD(&main_data_len, sizeof(uint32));
1267 			state->main_data_len = main_data_len;
1268 			datatotal += main_data_len;
1269 			break;				/* by convention, the main data fragment is
1270 								 * always last */
1271 		}
1272 		else if (block_id == XLR_BLOCK_ID_ORIGIN)
1273 		{
1274 			COPY_HEADER_FIELD(&state->record_origin, sizeof(RepOriginId));
1275 		}
1276 		else if (block_id <= XLR_MAX_BLOCK_ID)
1277 		{
1278 			/* XLogRecordBlockHeader */
1279 			DecodedBkpBlock *blk;
1280 			uint8		fork_flags;
1281 
1282 			if (block_id <= state->max_block_id)
1283 			{
1284 				report_invalid_record(state,
1285 									  "out-of-order block_id %u at %X/%X",
1286 									  block_id,
1287 									  (uint32) (state->ReadRecPtr >> 32),
1288 									  (uint32) state->ReadRecPtr);
1289 				goto err;
1290 			}
1291 			state->max_block_id = block_id;
1292 
1293 			blk = &state->blocks[block_id];
1294 			blk->in_use = true;
1295 			blk->apply_image = false;
1296 
1297 			COPY_HEADER_FIELD(&fork_flags, sizeof(uint8));
1298 			blk->forknum = fork_flags & BKPBLOCK_FORK_MASK;
1299 			blk->flags = fork_flags;
1300 			blk->has_image = ((fork_flags & BKPBLOCK_HAS_IMAGE) != 0);
1301 			blk->has_data = ((fork_flags & BKPBLOCK_HAS_DATA) != 0);
1302 
1303 			COPY_HEADER_FIELD(&blk->data_len, sizeof(uint16));
1304 			/* cross-check that the HAS_DATA flag is set iff data_length > 0 */
1305 			if (blk->has_data && blk->data_len == 0)
1306 			{
1307 				report_invalid_record(state,
1308 									  "BKPBLOCK_HAS_DATA set, but no data included at %X/%X",
1309 									  (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
1310 				goto err;
1311 			}
1312 			if (!blk->has_data && blk->data_len != 0)
1313 			{
1314 				report_invalid_record(state,
1315 									  "BKPBLOCK_HAS_DATA not set, but data length is %u at %X/%X",
1316 									  (unsigned int) blk->data_len,
1317 									  (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
1318 				goto err;
1319 			}
1320 			datatotal += blk->data_len;
1321 
1322 			if (blk->has_image)
1323 			{
1324 				COPY_HEADER_FIELD(&blk->bimg_len, sizeof(uint16));
1325 				COPY_HEADER_FIELD(&blk->hole_offset, sizeof(uint16));
1326 				COPY_HEADER_FIELD(&blk->bimg_info, sizeof(uint8));
1327 
1328 				blk->apply_image = ((blk->bimg_info & BKPIMAGE_APPLY) != 0);
1329 
1330 				if (blk->bimg_info & BKPIMAGE_IS_COMPRESSED)
1331 				{
1332 					if (blk->bimg_info & BKPIMAGE_HAS_HOLE)
1333 						COPY_HEADER_FIELD(&blk->hole_length, sizeof(uint16));
1334 					else
1335 						blk->hole_length = 0;
1336 				}
1337 				else
1338 					blk->hole_length = BLCKSZ - blk->bimg_len;
1339 				datatotal += blk->bimg_len;
1340 
1341 				/*
1342 				 * cross-check that hole_offset > 0, hole_length > 0 and
1343 				 * bimg_len < BLCKSZ if the HAS_HOLE flag is set.
1344 				 */
1345 				if ((blk->bimg_info & BKPIMAGE_HAS_HOLE) &&
1346 					(blk->hole_offset == 0 ||
1347 					 blk->hole_length == 0 ||
1348 					 blk->bimg_len == BLCKSZ))
1349 				{
1350 					report_invalid_record(state,
1351 										  "BKPIMAGE_HAS_HOLE set, but hole offset %u length %u block image length %u at %X/%X",
1352 										  (unsigned int) blk->hole_offset,
1353 										  (unsigned int) blk->hole_length,
1354 										  (unsigned int) blk->bimg_len,
1355 										  (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
1356 					goto err;
1357 				}
1358 
1359 				/*
1360 				 * cross-check that hole_offset == 0 and hole_length == 0 if
1361 				 * the HAS_HOLE flag is not set.
1362 				 */
1363 				if (!(blk->bimg_info & BKPIMAGE_HAS_HOLE) &&
1364 					(blk->hole_offset != 0 || blk->hole_length != 0))
1365 				{
1366 					report_invalid_record(state,
1367 										  "BKPIMAGE_HAS_HOLE not set, but hole offset %u length %u at %X/%X",
1368 										  (unsigned int) blk->hole_offset,
1369 										  (unsigned int) blk->hole_length,
1370 										  (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
1371 					goto err;
1372 				}
1373 
1374 				/*
1375 				 * cross-check that bimg_len < BLCKSZ if the IS_COMPRESSED
1376 				 * flag is set.
1377 				 */
1378 				if ((blk->bimg_info & BKPIMAGE_IS_COMPRESSED) &&
1379 					blk->bimg_len == BLCKSZ)
1380 				{
1381 					report_invalid_record(state,
1382 										  "BKPIMAGE_IS_COMPRESSED set, but block image length %u at %X/%X",
1383 										  (unsigned int) blk->bimg_len,
1384 										  (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
1385 					goto err;
1386 				}
1387 
1388 				/*
1389 				 * cross-check that bimg_len = BLCKSZ if neither HAS_HOLE nor
1390 				 * IS_COMPRESSED flag is set.
1391 				 */
1392 				if (!(blk->bimg_info & BKPIMAGE_HAS_HOLE) &&
1393 					!(blk->bimg_info & BKPIMAGE_IS_COMPRESSED) &&
1394 					blk->bimg_len != BLCKSZ)
1395 				{
1396 					report_invalid_record(state,
1397 										  "neither BKPIMAGE_HAS_HOLE nor BKPIMAGE_IS_COMPRESSED set, but block image length is %u at %X/%X",
1398 										  (unsigned int) blk->data_len,
1399 										  (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
1400 					goto err;
1401 				}
1402 			}
1403 			if (!(fork_flags & BKPBLOCK_SAME_REL))
1404 			{
1405 				COPY_HEADER_FIELD(&blk->rnode, sizeof(RelFileNode));
1406 				rnode = &blk->rnode;
1407 			}
1408 			else
1409 			{
1410 				if (rnode == NULL)
1411 				{
1412 					report_invalid_record(state,
1413 										  "BKPBLOCK_SAME_REL set but no previous rel at %X/%X",
1414 										  (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
1415 					goto err;
1416 				}
1417 
1418 				blk->rnode = *rnode;
1419 			}
1420 			COPY_HEADER_FIELD(&blk->blkno, sizeof(BlockNumber));
1421 		}
1422 		else
1423 		{
1424 			report_invalid_record(state,
1425 								  "invalid block_id %u at %X/%X",
1426 								  block_id,
1427 								  (uint32) (state->ReadRecPtr >> 32),
1428 								  (uint32) state->ReadRecPtr);
1429 			goto err;
1430 		}
1431 	}
1432 
1433 	if (remaining != datatotal)
1434 		goto shortdata_err;
1435 
1436 	/*
1437 	 * Ok, we've parsed the fragment headers, and verified that the total
1438 	 * length of the payload in the fragments is equal to the amount of data
1439 	 * left. Copy the data of each fragment to a separate buffer.
1440 	 *
1441 	 * We could just set up pointers into readRecordBuf, but we want to align
1442 	 * the data for the convenience of the callers. Backup images are not
1443 	 * copied, however; they don't need alignment.
1444 	 */
1445 
1446 	/* block data first */
1447 	for (block_id = 0; block_id <= state->max_block_id; block_id++)
1448 	{
1449 		DecodedBkpBlock *blk = &state->blocks[block_id];
1450 
1451 		if (!blk->in_use)
1452 			continue;
1453 
1454 		Assert(blk->has_image || !blk->apply_image);
1455 
1456 		if (blk->has_image)
1457 		{
1458 			blk->bkp_image = ptr;
1459 			ptr += blk->bimg_len;
1460 		}
1461 		if (blk->has_data)
1462 		{
1463 			if (!blk->data || blk->data_len > blk->data_bufsz)
1464 			{
1465 				if (blk->data)
1466 					pfree(blk->data);
1467 
1468 				/*
1469 				 * Force the initial request to be BLCKSZ so that we don't
1470 				 * waste time with lots of trips through this stanza as a
1471 				 * result of WAL compression.
1472 				 */
1473 				blk->data_bufsz = MAXALIGN(Max(blk->data_len, BLCKSZ));
1474 				blk->data = palloc(blk->data_bufsz);
1475 			}
1476 			memcpy(blk->data, ptr, blk->data_len);
1477 			ptr += blk->data_len;
1478 		}
1479 	}
1480 
1481 	/* and finally, the main data */
1482 	if (state->main_data_len > 0)
1483 	{
1484 		if (!state->main_data || state->main_data_len > state->main_data_bufsz)
1485 		{
1486 			if (state->main_data)
1487 				pfree(state->main_data);
1488 
1489 			/*
1490 			 * main_data_bufsz must be MAXALIGN'ed.  In many xlog record
1491 			 * types, we omit trailing struct padding on-disk to save a few
1492 			 * bytes; but compilers may generate accesses to the xlog struct
1493 			 * that assume that padding bytes are present.  If the palloc
1494 			 * request is not large enough to include such padding bytes then
1495 			 * we'll get valgrind complaints due to otherwise-harmless fetches
1496 			 * of the padding bytes.
1497 			 *
1498 			 * In addition, force the initial request to be reasonably large
1499 			 * so that we don't waste time with lots of trips through this
1500 			 * stanza.  BLCKSZ / 2 seems like a good compromise choice.
1501 			 */
1502 			state->main_data_bufsz = MAXALIGN(Max(state->main_data_len,
1503 												  BLCKSZ / 2));
1504 			state->main_data = palloc(state->main_data_bufsz);
1505 		}
1506 		memcpy(state->main_data, ptr, state->main_data_len);
1507 		ptr += state->main_data_len;
1508 	}
1509 
1510 	return true;
1511 
1512 shortdata_err:
1513 	report_invalid_record(state,
1514 						  "record with invalid length at %X/%X",
1515 						  (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
1516 err:
1517 	*errormsg = state->errormsg_buf;
1518 
1519 	return false;
1520 }
1521 
1522 /*
1523  * Returns information about the block that a block reference refers to.
1524  *
1525  * If the WAL record contains a block reference with the given ID, *rnode,
1526  * *forknum, and *blknum are filled in (if not NULL), and returns true.
1527  * Otherwise returns false.
1528  */
1529 bool
XLogRecGetBlockTag(XLogReaderState * record,uint8 block_id,RelFileNode * rnode,ForkNumber * forknum,BlockNumber * blknum)1530 XLogRecGetBlockTag(XLogReaderState *record, uint8 block_id,
1531 				   RelFileNode *rnode, ForkNumber *forknum, BlockNumber *blknum)
1532 {
1533 	DecodedBkpBlock *bkpb;
1534 
1535 	if (!record->blocks[block_id].in_use)
1536 		return false;
1537 
1538 	bkpb = &record->blocks[block_id];
1539 	if (rnode)
1540 		*rnode = bkpb->rnode;
1541 	if (forknum)
1542 		*forknum = bkpb->forknum;
1543 	if (blknum)
1544 		*blknum = bkpb->blkno;
1545 	return true;
1546 }
1547 
1548 /*
1549  * Returns the data associated with a block reference, or NULL if there is
1550  * no data (e.g. because a full-page image was taken instead). The returned
1551  * pointer points to a MAXALIGNed buffer.
1552  */
1553 char *
XLogRecGetBlockData(XLogReaderState * record,uint8 block_id,Size * len)1554 XLogRecGetBlockData(XLogReaderState *record, uint8 block_id, Size *len)
1555 {
1556 	DecodedBkpBlock *bkpb;
1557 
1558 	if (!record->blocks[block_id].in_use)
1559 		return NULL;
1560 
1561 	bkpb = &record->blocks[block_id];
1562 
1563 	if (!bkpb->has_data)
1564 	{
1565 		if (len)
1566 			*len = 0;
1567 		return NULL;
1568 	}
1569 	else
1570 	{
1571 		if (len)
1572 			*len = bkpb->data_len;
1573 		return bkpb->data;
1574 	}
1575 }
1576 
1577 /*
1578  * Restore a full-page image from a backup block attached to an XLOG record.
1579  *
1580  * Returns true if a full-page image is restored.
1581  */
1582 bool
RestoreBlockImage(XLogReaderState * record,uint8 block_id,char * page)1583 RestoreBlockImage(XLogReaderState *record, uint8 block_id, char *page)
1584 {
1585 	DecodedBkpBlock *bkpb;
1586 	char	   *ptr;
1587 	PGAlignedBlock tmp;
1588 
1589 	if (!record->blocks[block_id].in_use)
1590 		return false;
1591 	if (!record->blocks[block_id].has_image)
1592 		return false;
1593 
1594 	bkpb = &record->blocks[block_id];
1595 	ptr = bkpb->bkp_image;
1596 
1597 	if (bkpb->bimg_info & BKPIMAGE_IS_COMPRESSED)
1598 	{
1599 		/* If a backup block image is compressed, decompress it */
1600 		if (pglz_decompress(ptr, bkpb->bimg_len, tmp.data,
1601 							BLCKSZ - bkpb->hole_length, true) < 0)
1602 		{
1603 			report_invalid_record(record, "invalid compressed image at %X/%X, block %d",
1604 								  (uint32) (record->ReadRecPtr >> 32),
1605 								  (uint32) record->ReadRecPtr,
1606 								  block_id);
1607 			return false;
1608 		}
1609 		ptr = tmp.data;
1610 	}
1611 
1612 	/* generate page, taking into account hole if necessary */
1613 	if (bkpb->hole_length == 0)
1614 	{
1615 		memcpy(page, ptr, BLCKSZ);
1616 	}
1617 	else
1618 	{
1619 		memcpy(page, ptr, bkpb->hole_offset);
1620 		/* must zero-fill the hole */
1621 		MemSet(page + bkpb->hole_offset, 0, bkpb->hole_length);
1622 		memcpy(page + (bkpb->hole_offset + bkpb->hole_length),
1623 			   ptr + bkpb->hole_offset,
1624 			   BLCKSZ - (bkpb->hole_offset + bkpb->hole_length));
1625 	}
1626 
1627 	return true;
1628 }
1629 
1630 #ifndef FRONTEND
1631 
1632 /*
1633  * Extract the FullTransactionId from a WAL record.
1634  */
1635 FullTransactionId
XLogRecGetFullXid(XLogReaderState * record)1636 XLogRecGetFullXid(XLogReaderState *record)
1637 {
1638 	TransactionId xid,
1639 				next_xid;
1640 	uint32		epoch;
1641 
1642 	/*
1643 	 * This function is only safe during replay, because it depends on the
1644 	 * replay state.  See AdvanceNextFullTransactionIdPastXid() for more.
1645 	 */
1646 	Assert(AmStartupProcess() || !IsUnderPostmaster);
1647 
1648 	xid = XLogRecGetXid(record);
1649 	next_xid = XidFromFullTransactionId(ShmemVariableCache->nextFullXid);
1650 	epoch = EpochFromFullTransactionId(ShmemVariableCache->nextFullXid);
1651 
1652 	/*
1653 	 * If xid is numerically greater than next_xid, it has to be from the last
1654 	 * epoch.
1655 	 */
1656 	if (unlikely(xid > next_xid))
1657 		--epoch;
1658 
1659 	return FullTransactionIdFromEpochAndXid(epoch, xid);
1660 }
1661 
1662 #endif
1663