1 /*-------------------------------------------------------------------------
2  *
3  * xlogreader.c
4  *		Generic XLog reading facility
5  *
6  * Portions Copyright (c) 2013-2017, PostgreSQL Global Development Group
7  *
8  * IDENTIFICATION
9  *		src/backend/access/transam/xlogreader.c
10  *
11  * NOTES
12  *		See xlogreader.h for more notes on this facility.
13  *
14  *		This file is compiled as both front-end and backend code, so it
15  *		may not use ereport, server-defined static variables, etc.
16  *-------------------------------------------------------------------------
17  */
18 #include "postgres.h"
19 
20 #include "access/transam.h"
21 #include "access/xlogrecord.h"
22 #include "access/xlog_internal.h"
23 #include "access/xlogreader.h"
24 #include "catalog/pg_control.h"
25 #include "common/pg_lzcompress.h"
26 #include "replication/origin.h"
27 
28 #ifndef FRONTEND
29 #include "utils/memutils.h"
30 #endif
31 
/*
 * Forward declarations of the file-local helpers used below.
 */

/* (Re)allocates state->readRecordBuf for a record of at least reclength bytes */
static bool allocate_recordbuf(XLogReaderState *state, uint32 reclength);

/* Sanity checks on a record header (length, rmgr ID, prev-link) */
static bool ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr,
					  XLogRecPtr PrevRecPtr, XLogRecord *record, bool randAccess);
/* CRC check of a complete record */
static bool ValidXLogRecord(XLogReaderState *state, XLogRecord *record,
				XLogRecPtr recptr);
/* Loads one page into state->readBuf through the read_page callback */
static int ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr,
				 int reqLen);
/* printf-style formatter that fills state->errormsg_buf */
static void report_invalid_record(XLogReaderState *state, const char *fmt,...) pg_attribute_printf(2, 3);

/* Called by XLogReadRecord before each read; not defined in this part of the file */
static void ResetDecoder(XLogReaderState *state);

/*
 * Size limit passed to vsnprintf() when formatting into errormsg_buf.  The
 * buffer itself is allocated with MAX_ERRORMSG_LEN + 1 bytes.
 */
#define MAX_ERRORMSG_LEN 1000
46 
47 /*
48  * Construct a string in state->errormsg_buf explaining what's wrong with
49  * the current record being read.
50  */
51 static void
report_invalid_record(XLogReaderState * state,const char * fmt,...)52 report_invalid_record(XLogReaderState *state, const char *fmt,...)
53 {
54 	va_list		args;
55 
56 	fmt = _(fmt);
57 
58 	va_start(args, fmt);
59 	vsnprintf(state->errormsg_buf, MAX_ERRORMSG_LEN, fmt, args);
60 	va_end(args);
61 }
62 
63 /*
64  * Allocate and initialize a new XLogReader.
65  *
66  * Returns NULL if the xlogreader couldn't be allocated.
67  */
68 XLogReaderState *
XLogReaderAllocate(XLogPageReadCB pagereadfunc,void * private_data)69 XLogReaderAllocate(XLogPageReadCB pagereadfunc, void *private_data)
70 {
71 	XLogReaderState *state;
72 
73 	state = (XLogReaderState *)
74 		palloc_extended(sizeof(XLogReaderState),
75 						MCXT_ALLOC_NO_OOM | MCXT_ALLOC_ZERO);
76 	if (!state)
77 		return NULL;
78 
79 	state->max_block_id = -1;
80 
81 	/*
82 	 * Permanently allocate readBuf.  We do it this way, rather than just
83 	 * making a static array, for two reasons: (1) no need to waste the
84 	 * storage in most instantiations of the backend; (2) a static char array
85 	 * isn't guaranteed to have any particular alignment, whereas
86 	 * palloc_extended() will provide MAXALIGN'd storage.
87 	 */
88 	state->readBuf = (char *) palloc_extended(XLOG_BLCKSZ,
89 											  MCXT_ALLOC_NO_OOM);
90 	if (!state->readBuf)
91 	{
92 		pfree(state);
93 		return NULL;
94 	}
95 
96 	state->read_page = pagereadfunc;
97 	/* system_identifier initialized to zeroes above */
98 	state->private_data = private_data;
99 	/* ReadRecPtr and EndRecPtr initialized to zeroes above */
100 	/* readSegNo, readOff, readLen, readPageTLI initialized to zeroes above */
101 	state->errormsg_buf = palloc_extended(MAX_ERRORMSG_LEN + 1,
102 										  MCXT_ALLOC_NO_OOM);
103 	if (!state->errormsg_buf)
104 	{
105 		pfree(state->readBuf);
106 		pfree(state);
107 		return NULL;
108 	}
109 	state->errormsg_buf[0] = '\0';
110 
111 	/*
112 	 * Allocate an initial readRecordBuf of minimal size, which can later be
113 	 * enlarged if necessary.
114 	 */
115 	if (!allocate_recordbuf(state, 0))
116 	{
117 		pfree(state->errormsg_buf);
118 		pfree(state->readBuf);
119 		pfree(state);
120 		return NULL;
121 	}
122 
123 	return state;
124 }
125 
126 void
XLogReaderFree(XLogReaderState * state)127 XLogReaderFree(XLogReaderState *state)
128 {
129 	int			block_id;
130 
131 	for (block_id = 0; block_id <= XLR_MAX_BLOCK_ID; block_id++)
132 	{
133 		if (state->blocks[block_id].data)
134 			pfree(state->blocks[block_id].data);
135 	}
136 	if (state->main_data)
137 		pfree(state->main_data);
138 
139 	pfree(state->errormsg_buf);
140 	if (state->readRecordBuf)
141 		pfree(state->readRecordBuf);
142 	pfree(state->readBuf);
143 	pfree(state);
144 }
145 
146 /*
147  * Allocate readRecordBuf to fit a record of at least the given length.
148  * Returns true if successful, false if out of memory.
149  *
150  * readRecordBufSize is set to the new buffer size.
151  *
152  * To avoid useless small increases, round its size to a multiple of
153  * XLOG_BLCKSZ, and make sure it's at least 5*Max(BLCKSZ, XLOG_BLCKSZ) to start
154  * with.  (That is enough for all "normal" records, but very large commit or
155  * abort records might need more space.)
156  */
157 static bool
allocate_recordbuf(XLogReaderState * state,uint32 reclength)158 allocate_recordbuf(XLogReaderState *state, uint32 reclength)
159 {
160 	uint32		newSize = reclength;
161 
162 	newSize += XLOG_BLCKSZ - (newSize % XLOG_BLCKSZ);
163 	newSize = Max(newSize, 5 * Max(BLCKSZ, XLOG_BLCKSZ));
164 
165 #ifndef FRONTEND
166 
167 	/*
168 	 * Note that in much unlucky circumstances, the random data read from a
169 	 * recycled segment can cause this routine to be called with a size
170 	 * causing a hard failure at allocation.  For a standby, this would cause
171 	 * the instance to stop suddenly with a hard failure, preventing it to
172 	 * retry fetching WAL from one of its sources which could allow it to move
173 	 * on with replay without a manual restart. If the data comes from a past
174 	 * recycled segment and is still valid, then the allocation may succeed
175 	 * but record checks are going to fail so this would be short-lived.  If
176 	 * the allocation fails because of a memory shortage, then this is not a
177 	 * hard failure either per the guarantee given by MCXT_ALLOC_NO_OOM.
178 	 */
179 	if (!AllocSizeIsValid(newSize))
180 		return false;
181 
182 #endif
183 
184 	if (state->readRecordBuf)
185 		pfree(state->readRecordBuf);
186 	state->readRecordBuf =
187 		(char *) palloc_extended(newSize, MCXT_ALLOC_NO_OOM);
188 	if (state->readRecordBuf == NULL)
189 	{
190 		state->readRecordBufSize = 0;
191 		return false;
192 	}
193 	state->readRecordBufSize = newSize;
194 	return true;
195 }
196 
197 /*
198  * Attempt to read an XLOG record.
199  *
200  * If RecPtr is valid, try to read a record at that position.  Otherwise
201  * try to read a record just after the last one previously read.
202  *
203  * If the read_page callback fails to read the requested data, NULL is
204  * returned.  The callback is expected to have reported the error; errormsg
205  * is set to NULL.
206  *
207  * If the reading fails for some other reason, NULL is also returned, and
208  * *errormsg is set to a string with details of the failure.
209  *
210  * The returned pointer (or *errormsg) points to an internal buffer that's
211  * valid until the next call to XLogReadRecord.
212  */
213 XLogRecord *
XLogReadRecord(XLogReaderState * state,XLogRecPtr RecPtr,char ** errormsg)214 XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
215 {
216 	XLogRecord *record;
217 	XLogRecPtr	targetPagePtr;
218 	bool		randAccess;
219 	uint32		len,
220 				total_len;
221 	uint32		targetRecOff;
222 	uint32		pageHeaderSize;
223 	bool		assembled;
224 	bool		gotheader;
225 	int			readOff;
226 
227 	/*
228 	 * randAccess indicates whether to verify the previous-record pointer of
229 	 * the record we're reading.  We only do this if we're reading
230 	 * sequentially, which is what we initially assume.
231 	 */
232 	randAccess = false;
233 
234 	/* reset error state */
235 	*errormsg = NULL;
236 	state->errormsg_buf[0] = '\0';
237 
238 	ResetDecoder(state);
239 	state->abortedRecPtr = InvalidXLogRecPtr;
240 	state->missingContrecPtr = InvalidXLogRecPtr;
241 
242 	if (RecPtr == InvalidXLogRecPtr)
243 	{
244 		/* No explicit start point; read the record after the one we just read */
245 		RecPtr = state->EndRecPtr;
246 
247 		if (state->ReadRecPtr == InvalidXLogRecPtr)
248 			randAccess = true;
249 
250 		/*
251 		 * RecPtr is pointing to end+1 of the previous WAL record.  If we're
252 		 * at a page boundary, no more records can fit on the current page. We
253 		 * must skip over the page header, but we can't do that until we've
254 		 * read in the page, since the header size is variable.
255 		 */
256 	}
257 	else
258 	{
259 		/*
260 		 * Caller supplied a position to start at.
261 		 *
262 		 * In this case, the passed-in record pointer should already be
263 		 * pointing to a valid record starting position.
264 		 */
265 		Assert(XRecOffIsValid(RecPtr));
266 		randAccess = true;
267 	}
268 
269 restart:
270 	state->currRecPtr = RecPtr;
271 	assembled = false;
272 
273 	targetPagePtr = RecPtr - (RecPtr % XLOG_BLCKSZ);
274 	targetRecOff = RecPtr % XLOG_BLCKSZ;
275 
276 	/*
277 	 * Read the page containing the record into state->readBuf. Request enough
278 	 * byte to cover the whole record header, or at least the part of it that
279 	 * fits on the same page.
280 	 */
281 	readOff = ReadPageInternal(state,
282 							   targetPagePtr,
283 							   Min(targetRecOff + SizeOfXLogRecord, XLOG_BLCKSZ));
284 	if (readOff < 0)
285 		goto err;
286 
287 	/*
288 	 * ReadPageInternal always returns at least the page header, so we can
289 	 * examine it now.
290 	 */
291 	pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) state->readBuf);
292 	if (targetRecOff == 0)
293 	{
294 		/*
295 		 * At page start, so skip over page header.
296 		 */
297 		RecPtr += pageHeaderSize;
298 		targetRecOff = pageHeaderSize;
299 	}
300 	else if (targetRecOff < pageHeaderSize)
301 	{
302 		report_invalid_record(state, "invalid record offset at %X/%X",
303 							  (uint32) (RecPtr >> 32), (uint32) RecPtr);
304 		goto err;
305 	}
306 
307 	if ((((XLogPageHeader) state->readBuf)->xlp_info & XLP_FIRST_IS_CONTRECORD) &&
308 		targetRecOff == pageHeaderSize)
309 	{
310 		report_invalid_record(state, "contrecord is requested by %X/%X",
311 							  (uint32) (RecPtr >> 32), (uint32) RecPtr);
312 		goto err;
313 	}
314 
315 	/* ReadPageInternal has verified the page header */
316 	Assert(pageHeaderSize <= readOff);
317 
318 	/*
319 	 * Read the record length.
320 	 *
321 	 * NB: Even though we use an XLogRecord pointer here, the whole record
322 	 * header might not fit on this page. xl_tot_len is the first field of the
323 	 * struct, so it must be on this page (the records are MAXALIGNed), but we
324 	 * cannot access any other fields until we've verified that we got the
325 	 * whole header.
326 	 */
327 	record = (XLogRecord *) (state->readBuf + RecPtr % XLOG_BLCKSZ);
328 	total_len = record->xl_tot_len;
329 
330 	/*
331 	 * If the whole record header is on this page, validate it immediately.
332 	 * Otherwise do just a basic sanity check on xl_tot_len, and validate the
333 	 * rest of the header after reading it from the next page.  The xl_tot_len
334 	 * check is necessary here to ensure that we enter the "Need to reassemble
335 	 * record" code path below; otherwise we might fail to apply
336 	 * ValidXLogRecordHeader at all.
337 	 */
338 	if (targetRecOff <= XLOG_BLCKSZ - SizeOfXLogRecord)
339 	{
340 		if (!ValidXLogRecordHeader(state, RecPtr, state->ReadRecPtr, record,
341 								   randAccess))
342 			goto err;
343 		gotheader = true;
344 	}
345 	else
346 	{
347 		/* XXX: more validation should be done here */
348 		if (total_len < SizeOfXLogRecord)
349 		{
350 			report_invalid_record(state,
351 								  "invalid record length at %X/%X: wanted %u, got %u",
352 								  (uint32) (RecPtr >> 32), (uint32) RecPtr,
353 								  (uint32) SizeOfXLogRecord, total_len);
354 			goto err;
355 		}
356 		gotheader = false;
357 	}
358 
359 	/*
360 	 * Enlarge readRecordBuf as needed.
361 	 */
362 	if (total_len > state->readRecordBufSize &&
363 		!allocate_recordbuf(state, total_len))
364 	{
365 		/* We treat this as a "bogus data" condition */
366 		report_invalid_record(state, "record length %u at %X/%X too long",
367 							  total_len,
368 							  (uint32) (RecPtr >> 32), (uint32) RecPtr);
369 		goto err;
370 	}
371 
372 	len = XLOG_BLCKSZ - RecPtr % XLOG_BLCKSZ;
373 	if (total_len > len)
374 	{
375 		/* Need to reassemble record */
376 		char	   *contdata;
377 		XLogPageHeader pageHeader;
378 		char	   *buffer;
379 		uint32		gotlen;
380 
381 		assembled = true;
382 		/* Copy the first fragment of the record from the first page. */
383 		memcpy(state->readRecordBuf,
384 			   state->readBuf + RecPtr % XLOG_BLCKSZ, len);
385 		buffer = state->readRecordBuf + len;
386 		gotlen = len;
387 
388 		do
389 		{
390 			/* Calculate pointer to beginning of next page */
391 			targetPagePtr += XLOG_BLCKSZ;
392 
393 			/* Wait for the next page to become available */
394 			readOff = ReadPageInternal(state, targetPagePtr,
395 									   Min(total_len - gotlen + SizeOfXLogShortPHD,
396 										   XLOG_BLCKSZ));
397 
398 			if (readOff < 0)
399 				goto err;
400 
401 			Assert(SizeOfXLogShortPHD <= readOff);
402 
403 			pageHeader = (XLogPageHeader) state->readBuf;
404 
405 			/*
406 			 * If we were expecting a continuation record and got an
407 			 * "overwrite contrecord" flag, that means the continuation record
408 			 * was overwritten with a different record.  Restart the read by
409 			 * assuming the address to read is the location where we found
410 			 * this flag; but keep track of the LSN of the record we were
411 			 * reading, for later verification.
412 			 */
413 			if (pageHeader->xlp_info & XLP_FIRST_IS_OVERWRITE_CONTRECORD)
414 			{
415 				state->overwrittenRecPtr = state->currRecPtr;
416 				ResetDecoder(state);
417 				RecPtr = targetPagePtr;
418 				goto restart;
419 			}
420 
421 			/* Check that the continuation on next page looks valid */
422 			if (!(pageHeader->xlp_info & XLP_FIRST_IS_CONTRECORD))
423 			{
424 				report_invalid_record(state,
425 									  "there is no contrecord flag at %X/%X",
426 									  (uint32) (RecPtr >> 32), (uint32) RecPtr);
427 				goto err;
428 			}
429 
430 			/*
431 			 * Cross-check that xlp_rem_len agrees with how much of the record
432 			 * we expect there to be left.
433 			 */
434 			if (pageHeader->xlp_rem_len == 0 ||
435 				total_len != (pageHeader->xlp_rem_len + gotlen))
436 			{
437 				report_invalid_record(state,
438 									  "invalid contrecord length %u at %X/%X",
439 									  pageHeader->xlp_rem_len,
440 									  (uint32) (RecPtr >> 32), (uint32) RecPtr);
441 				goto err;
442 			}
443 
444 			/* Append the continuation from this page to the buffer */
445 			pageHeaderSize = XLogPageHeaderSize(pageHeader);
446 
447 			if (readOff < pageHeaderSize)
448 				readOff = ReadPageInternal(state, targetPagePtr,
449 										   pageHeaderSize);
450 
451 			Assert(pageHeaderSize <= readOff);
452 
453 			contdata = (char *) state->readBuf + pageHeaderSize;
454 			len = XLOG_BLCKSZ - pageHeaderSize;
455 			if (pageHeader->xlp_rem_len < len)
456 				len = pageHeader->xlp_rem_len;
457 
458 			if (readOff < pageHeaderSize + len)
459 				readOff = ReadPageInternal(state, targetPagePtr,
460 										   pageHeaderSize + len);
461 
462 			memcpy(buffer, (char *) contdata, len);
463 			buffer += len;
464 			gotlen += len;
465 
466 			/* If we just reassembled the record header, validate it. */
467 			if (!gotheader)
468 			{
469 				record = (XLogRecord *) state->readRecordBuf;
470 				if (!ValidXLogRecordHeader(state, RecPtr, state->ReadRecPtr,
471 										   record, randAccess))
472 					goto err;
473 				gotheader = true;
474 			}
475 		} while (gotlen < total_len);
476 
477 		Assert(gotheader);
478 
479 		record = (XLogRecord *) state->readRecordBuf;
480 		if (!ValidXLogRecord(state, record, RecPtr))
481 			goto err;
482 
483 		pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) state->readBuf);
484 		state->ReadRecPtr = RecPtr;
485 		state->EndRecPtr = targetPagePtr + pageHeaderSize
486 			+ MAXALIGN(pageHeader->xlp_rem_len);
487 	}
488 	else
489 	{
490 		/* Wait for the record data to become available */
491 		readOff = ReadPageInternal(state, targetPagePtr,
492 								   Min(targetRecOff + total_len, XLOG_BLCKSZ));
493 		if (readOff < 0)
494 			goto err;
495 
496 		/* Record does not cross a page boundary */
497 		if (!ValidXLogRecord(state, record, RecPtr))
498 			goto err;
499 
500 		state->EndRecPtr = RecPtr + MAXALIGN(total_len);
501 
502 		state->ReadRecPtr = RecPtr;
503 		memcpy(state->readRecordBuf, record, total_len);
504 	}
505 
506 	/*
507 	 * Special processing if it's an XLOG SWITCH record
508 	 */
509 	if (record->xl_rmid == RM_XLOG_ID &&
510 		(record->xl_info & ~XLR_INFO_MASK) == XLOG_SWITCH)
511 	{
512 		/* Pretend it extends to end of segment */
513 		state->EndRecPtr += XLogSegSize - 1;
514 		state->EndRecPtr -= state->EndRecPtr % XLogSegSize;
515 	}
516 
517 	if (DecodeXLogRecord(state, record, errormsg))
518 		return record;
519 	else
520 		return NULL;
521 
522 err:
523 	if (assembled)
524 	{
525 		/*
526 		 * We get here when a record that spans multiple pages needs to be
527 		 * assembled, but something went wrong -- perhaps a contrecord piece
528 		 * was lost.  If caller is WAL replay, it will know where the aborted
529 		 * record was and where to direct followup WAL to be written, marking
530 		 * the next piece with XLP_FIRST_IS_OVERWRITE_CONTRECORD, which will
531 		 * in turn signal downstream WAL consumers that the broken WAL record
532 		 * is to be ignored.
533 		 */
534 		state->abortedRecPtr = RecPtr;
535 		state->missingContrecPtr = targetPagePtr;
536 	}
537 
538 	/*
539 	 * Invalidate the read state. We might read from a different source after
540 	 * failure.
541 	 */
542 	XLogReaderInvalReadState(state);
543 
544 	if (state->errormsg_buf[0] != '\0')
545 		*errormsg = state->errormsg_buf;
546 
547 	return NULL;
548 }
549 
550 /*
551  * Read a single xlog page including at least [pageptr, reqLen] of valid data
552  * via the read_page() callback.
553  *
554  * Returns -1 if the required page cannot be read for some reason; errormsg_buf
555  * is set in that case (unless the error occurs in the read_page callback).
556  *
557  * We fetch the page from a reader-local cache if we know we have the required
558  * data and if there hasn't been any error since caching the data.
559  */
560 static int
ReadPageInternal(XLogReaderState * state,XLogRecPtr pageptr,int reqLen)561 ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
562 {
563 	int			readLen;
564 	uint32		targetPageOff;
565 	XLogSegNo	targetSegNo;
566 	XLogPageHeader hdr;
567 
568 	Assert((pageptr % XLOG_BLCKSZ) == 0);
569 
570 	XLByteToSeg(pageptr, targetSegNo);
571 	targetPageOff = (pageptr % XLogSegSize);
572 
573 	/* check whether we have all the requested data already */
574 	if (targetSegNo == state->readSegNo && targetPageOff == state->readOff &&
575 		reqLen < state->readLen)
576 		return state->readLen;
577 
578 	/*
579 	 * Data is not in our buffer.
580 	 *
581 	 * Every time we actually read the page, even if we looked at parts of it
582 	 * before, we need to do verification as the read_page callback might now
583 	 * be rereading data from a different source.
584 	 *
585 	 * Whenever switching to a new WAL segment, we read the first page of the
586 	 * file and validate its header, even if that's not where the target
587 	 * record is.  This is so that we can check the additional identification
588 	 * info that is present in the first page's "long" header.
589 	 */
590 	if (targetSegNo != state->readSegNo && targetPageOff != 0)
591 	{
592 		XLogRecPtr	targetSegmentPtr = pageptr - targetPageOff;
593 
594 		readLen = state->read_page(state, targetSegmentPtr, XLOG_BLCKSZ,
595 								   state->currRecPtr,
596 								   state->readBuf, &state->readPageTLI);
597 		if (readLen < 0)
598 			goto err;
599 
600 		/* we can be sure to have enough WAL available, we scrolled back */
601 		Assert(readLen == XLOG_BLCKSZ);
602 
603 		if (!XLogReaderValidatePageHeader(state, targetSegmentPtr,
604 										  state->readBuf))
605 			goto err;
606 	}
607 
608 	/*
609 	 * First, read the requested data length, but at least a short page header
610 	 * so that we can validate it.
611 	 */
612 	readLen = state->read_page(state, pageptr, Max(reqLen, SizeOfXLogShortPHD),
613 							   state->currRecPtr,
614 							   state->readBuf, &state->readPageTLI);
615 	if (readLen < 0)
616 		goto err;
617 
618 	Assert(readLen <= XLOG_BLCKSZ);
619 
620 	/* Do we have enough data to check the header length? */
621 	if (readLen <= SizeOfXLogShortPHD)
622 		goto err;
623 
624 	Assert(readLen >= reqLen);
625 
626 	hdr = (XLogPageHeader) state->readBuf;
627 
628 	/* still not enough */
629 	if (readLen < XLogPageHeaderSize(hdr))
630 	{
631 		readLen = state->read_page(state, pageptr, XLogPageHeaderSize(hdr),
632 								   state->currRecPtr,
633 								   state->readBuf, &state->readPageTLI);
634 		if (readLen < 0)
635 			goto err;
636 	}
637 
638 	/*
639 	 * Now that we know we have the full header, validate it.
640 	 */
641 	if (!XLogReaderValidatePageHeader(state, pageptr, (char *) hdr))
642 		goto err;
643 
644 	/* update read state information */
645 	state->readSegNo = targetSegNo;
646 	state->readOff = targetPageOff;
647 	state->readLen = readLen;
648 
649 	return readLen;
650 
651 err:
652 	XLogReaderInvalReadState(state);
653 	return -1;
654 }
655 
656 /*
657  * Invalidate the xlogreader's read state to force a re-read.
658  */
659 void
XLogReaderInvalReadState(XLogReaderState * state)660 XLogReaderInvalReadState(XLogReaderState *state)
661 {
662 	state->readSegNo = 0;
663 	state->readOff = 0;
664 	state->readLen = 0;
665 }
666 
667 /*
668  * Validate an XLOG record header.
669  *
670  * This is just a convenience subroutine to avoid duplicated code in
671  * XLogReadRecord.  It's not intended for use from anywhere else.
672  */
673 static bool
ValidXLogRecordHeader(XLogReaderState * state,XLogRecPtr RecPtr,XLogRecPtr PrevRecPtr,XLogRecord * record,bool randAccess)674 ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr,
675 					  XLogRecPtr PrevRecPtr, XLogRecord *record,
676 					  bool randAccess)
677 {
678 	if (record->xl_tot_len < SizeOfXLogRecord)
679 	{
680 		report_invalid_record(state,
681 							  "invalid record length at %X/%X: wanted %u, got %u",
682 							  (uint32) (RecPtr >> 32), (uint32) RecPtr,
683 							  (uint32) SizeOfXLogRecord, record->xl_tot_len);
684 		return false;
685 	}
686 	if (record->xl_rmid > RM_MAX_ID)
687 	{
688 		report_invalid_record(state,
689 							  "invalid resource manager ID %u at %X/%X",
690 							  record->xl_rmid, (uint32) (RecPtr >> 32),
691 							  (uint32) RecPtr);
692 		return false;
693 	}
694 	if (randAccess)
695 	{
696 		/*
697 		 * We can't exactly verify the prev-link, but surely it should be less
698 		 * than the record's own address.
699 		 */
700 		if (!(record->xl_prev < RecPtr))
701 		{
702 			report_invalid_record(state,
703 								  "record with incorrect prev-link %X/%X at %X/%X",
704 								  (uint32) (record->xl_prev >> 32),
705 								  (uint32) record->xl_prev,
706 								  (uint32) (RecPtr >> 32), (uint32) RecPtr);
707 			return false;
708 		}
709 	}
710 	else
711 	{
712 		/*
713 		 * Record's prev-link should exactly match our previous location. This
714 		 * check guards against torn WAL pages where a stale but valid-looking
715 		 * WAL record starts on a sector boundary.
716 		 */
717 		if (record->xl_prev != PrevRecPtr)
718 		{
719 			report_invalid_record(state,
720 								  "record with incorrect prev-link %X/%X at %X/%X",
721 								  (uint32) (record->xl_prev >> 32),
722 								  (uint32) record->xl_prev,
723 								  (uint32) (RecPtr >> 32), (uint32) RecPtr);
724 			return false;
725 		}
726 	}
727 
728 	return true;
729 }
730 
731 
732 /*
733  * CRC-check an XLOG record.  We do not believe the contents of an XLOG
734  * record (other than to the minimal extent of computing the amount of
735  * data to read in) until we've checked the CRCs.
736  *
737  * We assume all of the record (that is, xl_tot_len bytes) has been read
738  * into memory at *record.  Also, ValidXLogRecordHeader() has accepted the
739  * record's header, which means in particular that xl_tot_len is at least
740  * SizeOfXlogRecord.
741  */
742 static bool
ValidXLogRecord(XLogReaderState * state,XLogRecord * record,XLogRecPtr recptr)743 ValidXLogRecord(XLogReaderState *state, XLogRecord *record, XLogRecPtr recptr)
744 {
745 	pg_crc32c	crc;
746 
747 	/* Calculate the CRC */
748 	INIT_CRC32C(crc);
749 	COMP_CRC32C(crc, ((char *) record) + SizeOfXLogRecord, record->xl_tot_len - SizeOfXLogRecord);
750 	/* include the record header last */
751 	COMP_CRC32C(crc, (char *) record, offsetof(XLogRecord, xl_crc));
752 	FIN_CRC32C(crc);
753 
754 	if (!EQ_CRC32C(record->xl_crc, crc))
755 	{
756 		report_invalid_record(state,
757 							  "incorrect resource manager data checksum in record at %X/%X",
758 							  (uint32) (recptr >> 32), (uint32) recptr);
759 		return false;
760 	}
761 
762 	return true;
763 }
764 
765 /*
766  * Validate a page header.
767  *
768  * Check if 'phdr' is valid as the header of the XLog page at position
769  * 'recptr'.
770  */
771 bool
XLogReaderValidatePageHeader(XLogReaderState * state,XLogRecPtr recptr,char * phdr)772 XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr,
773 							 char *phdr)
774 {
775 	XLogRecPtr	recaddr;
776 	XLogSegNo	segno;
777 	int32		offset;
778 	XLogPageHeader hdr = (XLogPageHeader) phdr;
779 
780 	Assert((recptr % XLOG_BLCKSZ) == 0);
781 
782 	XLByteToSeg(recptr, segno);
783 	offset = recptr % XLogSegSize;
784 
785 	XLogSegNoOffsetToRecPtr(segno, offset, recaddr);
786 
787 	if (hdr->xlp_magic != XLOG_PAGE_MAGIC)
788 	{
789 		char		fname[MAXFNAMELEN];
790 
791 		XLogFileName(fname, state->readPageTLI, segno);
792 
793 		report_invalid_record(state,
794 							  "invalid magic number %04X in log segment %s, offset %u",
795 							  hdr->xlp_magic,
796 							  fname,
797 							  offset);
798 		return false;
799 	}
800 
801 	if ((hdr->xlp_info & ~XLP_ALL_FLAGS) != 0)
802 	{
803 		char		fname[MAXFNAMELEN];
804 
805 		XLogFileName(fname, state->readPageTLI, segno);
806 
807 		report_invalid_record(state,
808 							  "invalid info bits %04X in log segment %s, offset %u",
809 							  hdr->xlp_info,
810 							  fname,
811 							  offset);
812 		return false;
813 	}
814 
815 	if (hdr->xlp_info & XLP_LONG_HEADER)
816 	{
817 		XLogLongPageHeader longhdr = (XLogLongPageHeader) hdr;
818 
819 		if (state->system_identifier &&
820 			longhdr->xlp_sysid != state->system_identifier)
821 		{
822 			char		fhdrident_str[32];
823 			char		sysident_str[32];
824 
825 			/*
826 			 * Format sysids separately to keep platform-dependent format code
827 			 * out of the translatable message string.
828 			 */
829 			snprintf(fhdrident_str, sizeof(fhdrident_str), UINT64_FORMAT,
830 					 longhdr->xlp_sysid);
831 			snprintf(sysident_str, sizeof(sysident_str), UINT64_FORMAT,
832 					 state->system_identifier);
833 			report_invalid_record(state,
834 								  "WAL file is from different database system: WAL file database system identifier is %s, pg_control database system identifier is %s",
835 								  fhdrident_str, sysident_str);
836 			return false;
837 		}
838 		else if (longhdr->xlp_seg_size != XLogSegSize)
839 		{
840 			report_invalid_record(state,
841 								  "WAL file is from different database system: incorrect XLOG_SEG_SIZE in page header");
842 			return false;
843 		}
844 		else if (longhdr->xlp_xlog_blcksz != XLOG_BLCKSZ)
845 		{
846 			report_invalid_record(state,
847 								  "WAL file is from different database system: incorrect XLOG_BLCKSZ in page header");
848 			return false;
849 		}
850 	}
851 	else if (offset == 0)
852 	{
853 		char		fname[MAXFNAMELEN];
854 
855 		XLogFileName(fname, state->readPageTLI, segno);
856 
857 		/* hmm, first page of file doesn't have a long header? */
858 		report_invalid_record(state,
859 							  "invalid info bits %04X in log segment %s, offset %u",
860 							  hdr->xlp_info,
861 							  fname,
862 							  offset);
863 		return false;
864 	}
865 
866 	/*
867 	 * Check that the address on the page agrees with what we expected.
868 	 * This check typically fails when an old WAL segment is recycled,
869 	 * and hasn't yet been overwritten with new data yet.
870 	 */
871 	if (hdr->xlp_pageaddr != recaddr)
872 	{
873 		char		fname[MAXFNAMELEN];
874 
875 		XLogFileName(fname, state->readPageTLI, segno);
876 
877 		report_invalid_record(state,
878 							  "unexpected pageaddr %X/%X in log segment %s, offset %u",
879 							  (uint32) (hdr->xlp_pageaddr >> 32), (uint32) hdr->xlp_pageaddr,
880 							  fname,
881 							  offset);
882 		return false;
883 	}
884 
885 	/*
886 	 * Since child timelines are always assigned a TLI greater than their
887 	 * immediate parent's TLI, we should never see TLI go backwards across
888 	 * successive pages of a consistent WAL sequence.
889 	 *
890 	 * Sometimes we re-read a segment that's already been (partially) read. So
891 	 * we only verify TLIs for pages that are later than the last remembered
892 	 * LSN.
893 	 */
894 	if (recptr > state->latestPagePtr)
895 	{
896 		if (hdr->xlp_tli < state->latestPageTLI)
897 		{
898 			char		fname[MAXFNAMELEN];
899 
900 			XLogFileName(fname, state->readPageTLI, segno);
901 
902 			report_invalid_record(state,
903 								  "out-of-sequence timeline ID %u (after %u) in log segment %s, offset %u",
904 								  hdr->xlp_tli,
905 								  state->latestPageTLI,
906 								  fname,
907 								  offset);
908 			return false;
909 		}
910 	}
911 	state->latestPagePtr = recptr;
912 	state->latestPageTLI = hdr->xlp_tli;
913 
914 	return true;
915 }
916 
917 #ifdef FRONTEND
918 /*
919  * Functions that are currently not needed in the backend, but are better
920  * implemented inside xlogreader.c because of the internal facilities available
921  * here.
922  */
923 
924 /*
925  * Find the first record with an lsn >= RecPtr.
926  *
927  * Useful for checking whether RecPtr is a valid xlog address for reading, and
928  * to find the first valid address after some address when dumping records for
929  * debugging purposes.
930  */
931 XLogRecPtr
XLogFindNextRecord(XLogReaderState * state,XLogRecPtr RecPtr)932 XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr)
933 {
934 	XLogReaderState saved_state = *state;
935 	XLogRecPtr	tmpRecPtr;
936 	XLogRecPtr	found = InvalidXLogRecPtr;
937 	XLogPageHeader header;
938 	char	   *errormsg;
939 
940 	Assert(!XLogRecPtrIsInvalid(RecPtr));
941 
942 	/*
943 	 * skip over potential continuation data, keeping in mind that it may span
944 	 * multiple pages
945 	 */
946 	tmpRecPtr = RecPtr;
947 	while (true)
948 	{
949 		XLogRecPtr	targetPagePtr;
950 		int			targetRecOff;
951 		uint32		pageHeaderSize;
952 		int			readLen;
953 
954 		/*
955 		 * Compute targetRecOff. It should typically be equal or greater than
956 		 * short page-header since a valid record can't start anywhere before
957 		 * that, except when caller has explicitly specified the offset that
958 		 * falls somewhere there or when we are skipping multi-page
959 		 * continuation record. It doesn't matter though because
960 		 * ReadPageInternal() is prepared to handle that and will read at
961 		 * least short page-header worth of data
962 		 */
963 		targetRecOff = tmpRecPtr % XLOG_BLCKSZ;
964 
965 		/* scroll back to page boundary */
966 		targetPagePtr = tmpRecPtr - targetRecOff;
967 
968 		/* Read the page containing the record */
969 		readLen = ReadPageInternal(state, targetPagePtr, targetRecOff);
970 		if (readLen < 0)
971 			goto err;
972 
973 		header = (XLogPageHeader) state->readBuf;
974 
975 		pageHeaderSize = XLogPageHeaderSize(header);
976 
977 		/* make sure we have enough data for the page header */
978 		readLen = ReadPageInternal(state, targetPagePtr, pageHeaderSize);
979 		if (readLen < 0)
980 			goto err;
981 
982 		/* skip over potential continuation data */
983 		if (header->xlp_info & XLP_FIRST_IS_CONTRECORD)
984 		{
985 			/*
986 			 * If the length of the remaining continuation data is more than
987 			 * what can fit in this page, the continuation record crosses over
988 			 * this page. Read the next page and try again. xlp_rem_len in the
989 			 * next page header will contain the remaining length of the
990 			 * continuation data
991 			 *
992 			 * Note that record headers are MAXALIGN'ed
993 			 */
994 			if (MAXALIGN(header->xlp_rem_len) >= (XLOG_BLCKSZ - pageHeaderSize))
995 				tmpRecPtr = targetPagePtr + XLOG_BLCKSZ;
996 			else
997 			{
998 				/*
999 				 * The previous continuation record ends in this page. Set
1000 				 * tmpRecPtr to point to the first valid record
1001 				 */
1002 				tmpRecPtr = targetPagePtr + pageHeaderSize
1003 					+ MAXALIGN(header->xlp_rem_len);
1004 				break;
1005 			}
1006 		}
1007 		else
1008 		{
1009 			tmpRecPtr = targetPagePtr + pageHeaderSize;
1010 			break;
1011 		}
1012 	}
1013 
1014 	/*
1015 	 * we know now that tmpRecPtr is an address pointing to a valid XLogRecord
1016 	 * because either we're at the first record after the beginning of a page
1017 	 * or we just jumped over the remaining data of a continuation.
1018 	 */
1019 	while (XLogReadRecord(state, tmpRecPtr, &errormsg) != NULL)
1020 	{
1021 		/* continue after the record */
1022 		tmpRecPtr = InvalidXLogRecPtr;
1023 
1024 		/* past the record we've found, break out */
1025 		if (RecPtr <= state->ReadRecPtr)
1026 		{
1027 			found = state->ReadRecPtr;
1028 			goto out;
1029 		}
1030 	}
1031 
1032 err:
1033 out:
1034 	/* Reset state to what we had before finding the record */
1035 	state->ReadRecPtr = saved_state.ReadRecPtr;
1036 	state->EndRecPtr = saved_state.EndRecPtr;
1037 	XLogReaderInvalReadState(state);
1038 
1039 	return found;
1040 }
1041 
1042 #endif							/* FRONTEND */
1043 
1044 
1045 /* ----------------------------------------
1046  * Functions for decoding the data and block references in a record.
1047  * ----------------------------------------
1048  */
1049 
1050 /* private function to reset the state between records */
1051 static void
ResetDecoder(XLogReaderState * state)1052 ResetDecoder(XLogReaderState *state)
1053 {
1054 	int			block_id;
1055 
1056 	state->decoded_record = NULL;
1057 
1058 	state->main_data_len = 0;
1059 
1060 	for (block_id = 0; block_id <= state->max_block_id; block_id++)
1061 	{
1062 		state->blocks[block_id].in_use = false;
1063 		state->blocks[block_id].has_image = false;
1064 		state->blocks[block_id].has_data = false;
1065 		state->blocks[block_id].apply_image = false;
1066 	}
1067 	state->max_block_id = -1;
1068 }
1069 
1070 /*
1071  * Decode the previously read record.
1072  *
1073  * On error, a human-readable error message is returned in *errormsg, and
1074  * the return value is false.
1075  */
1076 bool
DecodeXLogRecord(XLogReaderState * state,XLogRecord * record,char ** errormsg)1077 DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg)
1078 {
1079 	/*
1080 	 * read next _size bytes from record buffer, but check for overrun first.
1081 	 */
1082 #define COPY_HEADER_FIELD(_dst, _size)			\
1083 	do {										\
1084 		if (remaining < _size)					\
1085 			goto shortdata_err;					\
1086 		memcpy(_dst, ptr, _size);				\
1087 		ptr += _size;							\
1088 		remaining -= _size;						\
1089 	} while(0)
1090 
1091 	char	   *ptr;
1092 	uint32		remaining;
1093 	uint32		datatotal;
1094 	RelFileNode *rnode = NULL;
1095 	uint8		block_id;
1096 
1097 	ResetDecoder(state);
1098 
1099 	state->decoded_record = record;
1100 	state->record_origin = InvalidRepOriginId;
1101 
1102 	ptr = (char *) record;
1103 	ptr += SizeOfXLogRecord;
1104 	remaining = record->xl_tot_len - SizeOfXLogRecord;
1105 
1106 	/* Decode the headers */
1107 	datatotal = 0;
1108 	while (remaining > datatotal)
1109 	{
1110 		COPY_HEADER_FIELD(&block_id, sizeof(uint8));
1111 
1112 		if (block_id == XLR_BLOCK_ID_DATA_SHORT)
1113 		{
1114 			/* XLogRecordDataHeaderShort */
1115 			uint8		main_data_len;
1116 
1117 			COPY_HEADER_FIELD(&main_data_len, sizeof(uint8));
1118 
1119 			state->main_data_len = main_data_len;
1120 			datatotal += main_data_len;
1121 			break;				/* by convention, the main data fragment is
1122 								 * always last */
1123 		}
1124 		else if (block_id == XLR_BLOCK_ID_DATA_LONG)
1125 		{
1126 			/* XLogRecordDataHeaderLong */
1127 			uint32		main_data_len;
1128 
1129 			COPY_HEADER_FIELD(&main_data_len, sizeof(uint32));
1130 			state->main_data_len = main_data_len;
1131 			datatotal += main_data_len;
1132 			break;				/* by convention, the main data fragment is
1133 								 * always last */
1134 		}
1135 		else if (block_id == XLR_BLOCK_ID_ORIGIN)
1136 		{
1137 			COPY_HEADER_FIELD(&state->record_origin, sizeof(RepOriginId));
1138 		}
1139 		else if (block_id <= XLR_MAX_BLOCK_ID)
1140 		{
1141 			/* XLogRecordBlockHeader */
1142 			DecodedBkpBlock *blk;
1143 			uint8		fork_flags;
1144 
1145 			if (block_id <= state->max_block_id)
1146 			{
1147 				report_invalid_record(state,
1148 									  "out-of-order block_id %u at %X/%X",
1149 									  block_id,
1150 									  (uint32) (state->ReadRecPtr >> 32),
1151 									  (uint32) state->ReadRecPtr);
1152 				goto err;
1153 			}
1154 			state->max_block_id = block_id;
1155 
1156 			blk = &state->blocks[block_id];
1157 			blk->in_use = true;
1158 			blk->apply_image = false;
1159 
1160 			COPY_HEADER_FIELD(&fork_flags, sizeof(uint8));
1161 			blk->forknum = fork_flags & BKPBLOCK_FORK_MASK;
1162 			blk->flags = fork_flags;
1163 			blk->has_image = ((fork_flags & BKPBLOCK_HAS_IMAGE) != 0);
1164 			blk->has_data = ((fork_flags & BKPBLOCK_HAS_DATA) != 0);
1165 
1166 			COPY_HEADER_FIELD(&blk->data_len, sizeof(uint16));
1167 			/* cross-check that the HAS_DATA flag is set iff data_length > 0 */
1168 			if (blk->has_data && blk->data_len == 0)
1169 			{
1170 				report_invalid_record(state,
1171 									  "BKPBLOCK_HAS_DATA set, but no data included at %X/%X",
1172 									  (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
1173 				goto err;
1174 			}
1175 			if (!blk->has_data && blk->data_len != 0)
1176 			{
1177 				report_invalid_record(state,
1178 									  "BKPBLOCK_HAS_DATA not set, but data length is %u at %X/%X",
1179 									  (unsigned int) blk->data_len,
1180 									  (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
1181 				goto err;
1182 			}
1183 			datatotal += blk->data_len;
1184 
1185 			if (blk->has_image)
1186 			{
1187 				COPY_HEADER_FIELD(&blk->bimg_len, sizeof(uint16));
1188 				COPY_HEADER_FIELD(&blk->hole_offset, sizeof(uint16));
1189 				COPY_HEADER_FIELD(&blk->bimg_info, sizeof(uint8));
1190 
1191 				blk->apply_image = ((blk->bimg_info & BKPIMAGE_APPLY) != 0);
1192 
1193 				if (blk->bimg_info & BKPIMAGE_IS_COMPRESSED)
1194 				{
1195 					if (blk->bimg_info & BKPIMAGE_HAS_HOLE)
1196 						COPY_HEADER_FIELD(&blk->hole_length, sizeof(uint16));
1197 					else
1198 						blk->hole_length = 0;
1199 				}
1200 				else
1201 					blk->hole_length = BLCKSZ - blk->bimg_len;
1202 				datatotal += blk->bimg_len;
1203 
1204 				/*
1205 				 * cross-check that hole_offset > 0, hole_length > 0 and
1206 				 * bimg_len < BLCKSZ if the HAS_HOLE flag is set.
1207 				 */
1208 				if ((blk->bimg_info & BKPIMAGE_HAS_HOLE) &&
1209 					(blk->hole_offset == 0 ||
1210 					 blk->hole_length == 0 ||
1211 					 blk->bimg_len == BLCKSZ))
1212 				{
1213 					report_invalid_record(state,
1214 										  "BKPIMAGE_HAS_HOLE set, but hole offset %u length %u block image length %u at %X/%X",
1215 										  (unsigned int) blk->hole_offset,
1216 										  (unsigned int) blk->hole_length,
1217 										  (unsigned int) blk->bimg_len,
1218 										  (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
1219 					goto err;
1220 				}
1221 
1222 				/*
1223 				 * cross-check that hole_offset == 0 and hole_length == 0 if
1224 				 * the HAS_HOLE flag is not set.
1225 				 */
1226 				if (!(blk->bimg_info & BKPIMAGE_HAS_HOLE) &&
1227 					(blk->hole_offset != 0 || blk->hole_length != 0))
1228 				{
1229 					report_invalid_record(state,
1230 										  "BKPIMAGE_HAS_HOLE not set, but hole offset %u length %u at %X/%X",
1231 										  (unsigned int) blk->hole_offset,
1232 										  (unsigned int) blk->hole_length,
1233 										  (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
1234 					goto err;
1235 				}
1236 
1237 				/*
1238 				 * cross-check that bimg_len < BLCKSZ if the IS_COMPRESSED
1239 				 * flag is set.
1240 				 */
1241 				if ((blk->bimg_info & BKPIMAGE_IS_COMPRESSED) &&
1242 					blk->bimg_len == BLCKSZ)
1243 				{
1244 					report_invalid_record(state,
1245 										  "BKPIMAGE_IS_COMPRESSED set, but block image length %u at %X/%X",
1246 										  (unsigned int) blk->bimg_len,
1247 										  (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
1248 					goto err;
1249 				}
1250 
1251 				/*
1252 				 * cross-check that bimg_len = BLCKSZ if neither HAS_HOLE nor
1253 				 * IS_COMPRESSED flag is set.
1254 				 */
1255 				if (!(blk->bimg_info & BKPIMAGE_HAS_HOLE) &&
1256 					!(blk->bimg_info & BKPIMAGE_IS_COMPRESSED) &&
1257 					blk->bimg_len != BLCKSZ)
1258 				{
1259 					report_invalid_record(state,
1260 										  "neither BKPIMAGE_HAS_HOLE nor BKPIMAGE_IS_COMPRESSED set, but block image length is %u at %X/%X",
1261 										  (unsigned int) blk->data_len,
1262 										  (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
1263 					goto err;
1264 				}
1265 			}
1266 			if (!(fork_flags & BKPBLOCK_SAME_REL))
1267 			{
1268 				COPY_HEADER_FIELD(&blk->rnode, sizeof(RelFileNode));
1269 				rnode = &blk->rnode;
1270 			}
1271 			else
1272 			{
1273 				if (rnode == NULL)
1274 				{
1275 					report_invalid_record(state,
1276 										  "BKPBLOCK_SAME_REL set but no previous rel at %X/%X",
1277 										  (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
1278 					goto err;
1279 				}
1280 
1281 				blk->rnode = *rnode;
1282 			}
1283 			COPY_HEADER_FIELD(&blk->blkno, sizeof(BlockNumber));
1284 		}
1285 		else
1286 		{
1287 			report_invalid_record(state,
1288 								  "invalid block_id %u at %X/%X",
1289 								  block_id,
1290 								  (uint32) (state->ReadRecPtr >> 32),
1291 								  (uint32) state->ReadRecPtr);
1292 			goto err;
1293 		}
1294 	}
1295 
1296 	if (remaining != datatotal)
1297 		goto shortdata_err;
1298 
1299 	/*
1300 	 * Ok, we've parsed the fragment headers, and verified that the total
1301 	 * length of the payload in the fragments is equal to the amount of data
1302 	 * left. Copy the data of each fragment to a separate buffer.
1303 	 *
1304 	 * We could just set up pointers into readRecordBuf, but we want to align
1305 	 * the data for the convenience of the callers. Backup images are not
1306 	 * copied, however; they don't need alignment.
1307 	 */
1308 
1309 	/* block data first */
1310 	for (block_id = 0; block_id <= state->max_block_id; block_id++)
1311 	{
1312 		DecodedBkpBlock *blk = &state->blocks[block_id];
1313 
1314 		if (!blk->in_use)
1315 			continue;
1316 
1317 		Assert(blk->has_image || !blk->apply_image);
1318 
1319 		if (blk->has_image)
1320 		{
1321 			blk->bkp_image = ptr;
1322 			ptr += blk->bimg_len;
1323 		}
1324 		if (blk->has_data)
1325 		{
1326 			if (!blk->data || blk->data_len > blk->data_bufsz)
1327 			{
1328 				if (blk->data)
1329 					pfree(blk->data);
1330 				blk->data_bufsz = blk->data_len;
1331 				blk->data = palloc(blk->data_bufsz);
1332 			}
1333 			memcpy(blk->data, ptr, blk->data_len);
1334 			ptr += blk->data_len;
1335 		}
1336 	}
1337 
1338 	/* and finally, the main data */
1339 	if (state->main_data_len > 0)
1340 	{
1341 		if (!state->main_data || state->main_data_len > state->main_data_bufsz)
1342 		{
1343 			if (state->main_data)
1344 				pfree(state->main_data);
1345 
1346 			/*
1347 			 * main_data_bufsz must be MAXALIGN'ed.  In many xlog record
1348 			 * types, we omit trailing struct padding on-disk to save a few
1349 			 * bytes; but compilers may generate accesses to the xlog struct
1350 			 * that assume that padding bytes are present.  If the palloc
1351 			 * request is not large enough to include such padding bytes then
1352 			 * we'll get valgrind complaints due to otherwise-harmless fetches
1353 			 * of the padding bytes.
1354 			 *
1355 			 * In addition, force the initial request to be reasonably large
1356 			 * so that we don't waste time with lots of trips through this
1357 			 * stanza.  BLCKSZ / 2 seems like a good compromise choice.
1358 			 */
1359 			state->main_data_bufsz = MAXALIGN(Max(state->main_data_len,
1360 												  BLCKSZ / 2));
1361 			state->main_data = palloc(state->main_data_bufsz);
1362 		}
1363 		memcpy(state->main_data, ptr, state->main_data_len);
1364 		ptr += state->main_data_len;
1365 	}
1366 
1367 	return true;
1368 
1369 shortdata_err:
1370 	report_invalid_record(state,
1371 						  "record with invalid length at %X/%X",
1372 						  (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
1373 err:
1374 	*errormsg = state->errormsg_buf;
1375 
1376 	return false;
1377 }
1378 
1379 /*
1380  * Returns information about the block that a block reference refers to.
1381  *
1382  * If the WAL record contains a block reference with the given ID, *rnode,
1383  * *forknum, and *blknum are filled in (if not NULL), and returns TRUE.
1384  * Otherwise returns FALSE.
1385  */
1386 bool
XLogRecGetBlockTag(XLogReaderState * record,uint8 block_id,RelFileNode * rnode,ForkNumber * forknum,BlockNumber * blknum)1387 XLogRecGetBlockTag(XLogReaderState *record, uint8 block_id,
1388 				   RelFileNode *rnode, ForkNumber *forknum, BlockNumber *blknum)
1389 {
1390 	DecodedBkpBlock *bkpb;
1391 
1392 	if (!record->blocks[block_id].in_use)
1393 		return false;
1394 
1395 	bkpb = &record->blocks[block_id];
1396 	if (rnode)
1397 		*rnode = bkpb->rnode;
1398 	if (forknum)
1399 		*forknum = bkpb->forknum;
1400 	if (blknum)
1401 		*blknum = bkpb->blkno;
1402 	return true;
1403 }
1404 
1405 /*
1406  * Returns the data associated with a block reference, or NULL if there is
1407  * no data (e.g. because a full-page image was taken instead). The returned
1408  * pointer points to a MAXALIGNed buffer.
1409  */
1410 char *
XLogRecGetBlockData(XLogReaderState * record,uint8 block_id,Size * len)1411 XLogRecGetBlockData(XLogReaderState *record, uint8 block_id, Size *len)
1412 {
1413 	DecodedBkpBlock *bkpb;
1414 
1415 	if (!record->blocks[block_id].in_use)
1416 		return NULL;
1417 
1418 	bkpb = &record->blocks[block_id];
1419 
1420 	if (!bkpb->has_data)
1421 	{
1422 		if (len)
1423 			*len = 0;
1424 		return NULL;
1425 	}
1426 	else
1427 	{
1428 		if (len)
1429 			*len = bkpb->data_len;
1430 		return bkpb->data;
1431 	}
1432 }
1433 
1434 /*
1435  * Restore a full-page image from a backup block attached to an XLOG record.
1436  *
1437  * Returns true if a full-page image is restored.
1438  */
1439 bool
RestoreBlockImage(XLogReaderState * record,uint8 block_id,char * page)1440 RestoreBlockImage(XLogReaderState *record, uint8 block_id, char *page)
1441 {
1442 	DecodedBkpBlock *bkpb;
1443 	char	   *ptr;
1444 	PGAlignedBlock tmp;
1445 
1446 	if (!record->blocks[block_id].in_use)
1447 		return false;
1448 	if (!record->blocks[block_id].has_image)
1449 		return false;
1450 
1451 	bkpb = &record->blocks[block_id];
1452 	ptr = bkpb->bkp_image;
1453 
1454 	if (bkpb->bimg_info & BKPIMAGE_IS_COMPRESSED)
1455 	{
1456 		/* If a backup block image is compressed, decompress it */
1457 		if (pglz_decompress(ptr, bkpb->bimg_len, tmp.data,
1458 							BLCKSZ - bkpb->hole_length) < 0)
1459 		{
1460 			report_invalid_record(record, "invalid compressed image at %X/%X, block %d",
1461 								  (uint32) (record->ReadRecPtr >> 32),
1462 								  (uint32) record->ReadRecPtr,
1463 								  block_id);
1464 			return false;
1465 		}
1466 		ptr = tmp.data;
1467 	}
1468 
1469 	/* generate page, taking into account hole if necessary */
1470 	if (bkpb->hole_length == 0)
1471 	{
1472 		memcpy(page, ptr, BLCKSZ);
1473 	}
1474 	else
1475 	{
1476 		memcpy(page, ptr, bkpb->hole_offset);
1477 		/* must zero-fill the hole */
1478 		MemSet(page + bkpb->hole_offset, 0, bkpb->hole_length);
1479 		memcpy(page + (bkpb->hole_offset + bkpb->hole_length),
1480 			   ptr + bkpb->hole_offset,
1481 			   BLCKSZ - (bkpb->hole_offset + bkpb->hole_length));
1482 	}
1483 
1484 	return true;
1485 }
1486