1 /*-------------------------------------------------------------------------
2  *
3  * xlogreader.c
4  *		Generic XLog reading facility
5  *
6  * Portions Copyright (c) 2013-2016, PostgreSQL Global Development Group
7  *
8  * IDENTIFICATION
9  *		src/backend/access/transam/xlogreader.c
10  *
11  * NOTES
12  *		See xlogreader.h for more notes on this facility.
13  *
14  *		This file is compiled as both front-end and backend code, so it
15  *		may not use ereport, server-defined static variables, etc.
16  *-------------------------------------------------------------------------
17  */
18 #include "postgres.h"
19 
20 #include "access/transam.h"
21 #include "access/xlogrecord.h"
22 #include "access/xlog_internal.h"
23 #include "access/xlogreader.h"
24 #include "catalog/pg_control.h"
25 #include "common/pg_lzcompress.h"
26 #include "replication/origin.h"
27 
28 #ifndef FRONTEND
29 #include "utils/memutils.h"
30 #endif
31 
/*
 * Forward declarations of the file-local (static) helpers used in this file.
 */

/* (Re)allocates state->readRecordBuf; returns false on failure. */
static bool allocate_recordbuf(XLogReaderState *state, uint32 reclength);

/* Record validation: header sanity checks first, CRC check second. */
static bool ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr,
				 XLogRecPtr PrevRecPtr, XLogRecord *record, bool randAccess);
static bool ValidXLogRecord(XLogReaderState *state, XLogRecord *record,
				XLogRecPtr recptr);

/* Reads (or serves from cache) one WAL page; returns -1 on failure. */
static int ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr,
				 int reqLen);

/*
 * Formats an error message into state->errormsg_buf.
 * NOTE(review): pg_attribute_printf(2, 3) presumably enables printf-style
 * format checking with 'fmt' as argument 2 -- confirm against c.h.
 */
static void report_invalid_record(XLogReaderState *state, const char *fmt,...) pg_attribute_printf(2, 3);

/* NOTE(review): the definition of ResetDecoder is not in the visible part of this file. */
static void ResetDecoder(XLogReaderState *state);

/*
 * Size limit applied when formatting into the error message buffer (see
 * report_invalid_record).  XLogReaderAllocate() allocates the buffer with
 * one additional byte.
 */
#define MAX_ERRORMSG_LEN 1000
46 
47 /*
48  * Construct a string in state->errormsg_buf explaining what's wrong with
49  * the current record being read.
50  */
51 static void
report_invalid_record(XLogReaderState * state,const char * fmt,...)52 report_invalid_record(XLogReaderState *state, const char *fmt,...)
53 {
54 	va_list		args;
55 
56 	fmt = _(fmt);
57 
58 	va_start(args, fmt);
59 	vsnprintf(state->errormsg_buf, MAX_ERRORMSG_LEN, fmt, args);
60 	va_end(args);
61 }
62 
63 /*
64  * Allocate and initialize a new XLogReader.
65  *
66  * Returns NULL if the xlogreader couldn't be allocated.
67  */
68 XLogReaderState *
XLogReaderAllocate(XLogPageReadCB pagereadfunc,void * private_data)69 XLogReaderAllocate(XLogPageReadCB pagereadfunc, void *private_data)
70 {
71 	XLogReaderState *state;
72 
73 	state = (XLogReaderState *)
74 		palloc_extended(sizeof(XLogReaderState),
75 						MCXT_ALLOC_NO_OOM | MCXT_ALLOC_ZERO);
76 	if (!state)
77 		return NULL;
78 
79 	state->max_block_id = -1;
80 
81 	/*
82 	 * Permanently allocate readBuf.  We do it this way, rather than just
83 	 * making a static array, for two reasons: (1) no need to waste the
84 	 * storage in most instantiations of the backend; (2) a static char array
85 	 * isn't guaranteed to have any particular alignment, whereas
86 	 * palloc_extended() will provide MAXALIGN'd storage.
87 	 */
88 	state->readBuf = (char *) palloc_extended(XLOG_BLCKSZ,
89 											  MCXT_ALLOC_NO_OOM);
90 	if (!state->readBuf)
91 	{
92 		pfree(state);
93 		return NULL;
94 	}
95 
96 	state->read_page = pagereadfunc;
97 	/* system_identifier initialized to zeroes above */
98 	state->private_data = private_data;
99 	/* ReadRecPtr and EndRecPtr initialized to zeroes above */
100 	/* readSegNo, readOff, readLen, readPageTLI initialized to zeroes above */
101 	state->errormsg_buf = palloc_extended(MAX_ERRORMSG_LEN + 1,
102 										  MCXT_ALLOC_NO_OOM);
103 	if (!state->errormsg_buf)
104 	{
105 		pfree(state->readBuf);
106 		pfree(state);
107 		return NULL;
108 	}
109 	state->errormsg_buf[0] = '\0';
110 
111 	/*
112 	 * Allocate an initial readRecordBuf of minimal size, which can later be
113 	 * enlarged if necessary.
114 	 */
115 	if (!allocate_recordbuf(state, 0))
116 	{
117 		pfree(state->errormsg_buf);
118 		pfree(state->readBuf);
119 		pfree(state);
120 		return NULL;
121 	}
122 
123 	return state;
124 }
125 
126 void
XLogReaderFree(XLogReaderState * state)127 XLogReaderFree(XLogReaderState *state)
128 {
129 	int			block_id;
130 
131 	for (block_id = 0; block_id <= XLR_MAX_BLOCK_ID; block_id++)
132 	{
133 		if (state->blocks[block_id].data)
134 			pfree(state->blocks[block_id].data);
135 	}
136 	if (state->main_data)
137 		pfree(state->main_data);
138 
139 	pfree(state->errormsg_buf);
140 	if (state->readRecordBuf)
141 		pfree(state->readRecordBuf);
142 	pfree(state->readBuf);
143 	pfree(state);
144 }
145 
146 /*
147  * Allocate readRecordBuf to fit a record of at least the given length.
148  * Returns true if successful, false if out of memory.
149  *
150  * readRecordBufSize is set to the new buffer size.
151  *
152  * To avoid useless small increases, round its size to a multiple of
153  * XLOG_BLCKSZ, and make sure it's at least 5*Max(BLCKSZ, XLOG_BLCKSZ) to start
154  * with.  (That is enough for all "normal" records, but very large commit or
155  * abort records might need more space.)
156  */
157 static bool
allocate_recordbuf(XLogReaderState * state,uint32 reclength)158 allocate_recordbuf(XLogReaderState *state, uint32 reclength)
159 {
160 	uint32		newSize = reclength;
161 
162 	newSize += XLOG_BLCKSZ - (newSize % XLOG_BLCKSZ);
163 	newSize = Max(newSize, 5 * Max(BLCKSZ, XLOG_BLCKSZ));
164 
165 #ifndef FRONTEND
166 
167 	/*
168 	 * Note that in much unlucky circumstances, the random data read from a
169 	 * recycled segment can cause this routine to be called with a size
170 	 * causing a hard failure at allocation.  For a standby, this would cause
171 	 * the instance to stop suddenly with a hard failure, preventing it to
172 	 * retry fetching WAL from one of its sources which could allow it to move
173 	 * on with replay without a manual restart. If the data comes from a past
174 	 * recycled segment and is still valid, then the allocation may succeed
175 	 * but record checks are going to fail so this would be short-lived.  If
176 	 * the allocation fails because of a memory shortage, then this is not a
177 	 * hard failure either per the guarantee given by MCXT_ALLOC_NO_OOM.
178 	 */
179 	if (!AllocSizeIsValid(newSize))
180 		return false;
181 
182 #endif
183 
184 	if (state->readRecordBuf)
185 		pfree(state->readRecordBuf);
186 	state->readRecordBuf =
187 		(char *) palloc_extended(newSize, MCXT_ALLOC_NO_OOM);
188 	if (state->readRecordBuf == NULL)
189 	{
190 		state->readRecordBufSize = 0;
191 		return false;
192 	}
193 	state->readRecordBufSize = newSize;
194 	return true;
195 }
196 
197 /*
198  * Attempt to read an XLOG record.
199  *
200  * If RecPtr is valid, try to read a record at that position.  Otherwise
201  * try to read a record just after the last one previously read.
202  *
203  * If the read_page callback fails to read the requested data, NULL is
204  * returned.  The callback is expected to have reported the error; errormsg
205  * is set to NULL.
206  *
207  * If the reading fails for some other reason, NULL is also returned, and
208  * *errormsg is set to a string with details of the failure.
209  *
210  * The returned pointer (or *errormsg) points to an internal buffer that's
211  * valid until the next call to XLogReadRecord.
212  */
213 XLogRecord *
XLogReadRecord(XLogReaderState * state,XLogRecPtr RecPtr,char ** errormsg)214 XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
215 {
216 	XLogRecord *record;
217 	XLogRecPtr	targetPagePtr;
218 	bool		randAccess;
219 	uint32		len,
220 				total_len;
221 	uint32		targetRecOff;
222 	uint32		pageHeaderSize;
223 	bool		assembled;
224 	bool		gotheader;
225 	int			readOff;
226 
227 	/*
228 	 * randAccess indicates whether to verify the previous-record pointer of
229 	 * the record we're reading.  We only do this if we're reading
230 	 * sequentially, which is what we initially assume.
231 	 */
232 	randAccess = false;
233 
234 	/* reset error state */
235 	*errormsg = NULL;
236 	state->errormsg_buf[0] = '\0';
237 
238 	ResetDecoder(state);
239 	state->abortedRecPtr = InvalidXLogRecPtr;
240 	state->missingContrecPtr = InvalidXLogRecPtr;
241 
242 	if (RecPtr == InvalidXLogRecPtr)
243 	{
244 		/* No explicit start point; read the record after the one we just read */
245 		RecPtr = state->EndRecPtr;
246 
247 		if (state->ReadRecPtr == InvalidXLogRecPtr)
248 			randAccess = true;
249 
250 		/*
251 		 * RecPtr is pointing to end+1 of the previous WAL record.  If we're
252 		 * at a page boundary, no more records can fit on the current page. We
253 		 * must skip over the page header, but we can't do that until we've
254 		 * read in the page, since the header size is variable.
255 		 */
256 	}
257 	else
258 	{
259 		/*
260 		 * Caller supplied a position to start at.
261 		 *
262 		 * In this case, the passed-in record pointer should already be
263 		 * pointing to a valid record starting position.
264 		 */
265 		Assert(XRecOffIsValid(RecPtr));
266 		randAccess = true;
267 	}
268 
269 restart:
270 	state->currRecPtr = RecPtr;
271 	assembled = false;
272 
273 	targetPagePtr = RecPtr - (RecPtr % XLOG_BLCKSZ);
274 	targetRecOff = RecPtr % XLOG_BLCKSZ;
275 
276 	/*
277 	 * Read the page containing the record into state->readBuf. Request enough
278 	 * byte to cover the whole record header, or at least the part of it that
279 	 * fits on the same page.
280 	 */
281 	readOff = ReadPageInternal(state,
282 							   targetPagePtr,
283 						  Min(targetRecOff + SizeOfXLogRecord, XLOG_BLCKSZ));
284 	if (readOff < 0)
285 		goto err;
286 
287 	/*
288 	 * ReadPageInternal always returns at least the page header, so we can
289 	 * examine it now.
290 	 */
291 	pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) state->readBuf);
292 	if (targetRecOff == 0)
293 	{
294 		/*
295 		 * At page start, so skip over page header.
296 		 */
297 		RecPtr += pageHeaderSize;
298 		targetRecOff = pageHeaderSize;
299 	}
300 	else if (targetRecOff < pageHeaderSize)
301 	{
302 		report_invalid_record(state, "invalid record offset at %X/%X",
303 							  (uint32) (RecPtr >> 32), (uint32) RecPtr);
304 		goto err;
305 	}
306 
307 	if ((((XLogPageHeader) state->readBuf)->xlp_info & XLP_FIRST_IS_CONTRECORD) &&
308 		targetRecOff == pageHeaderSize)
309 	{
310 		report_invalid_record(state, "contrecord is requested by %X/%X",
311 							  (uint32) (RecPtr >> 32), (uint32) RecPtr);
312 		goto err;
313 	}
314 
315 	/* ReadPageInternal has verified the page header */
316 	Assert(pageHeaderSize <= readOff);
317 
318 	/*
319 	 * Read the record length.
320 	 *
321 	 * NB: Even though we use an XLogRecord pointer here, the whole record
322 	 * header might not fit on this page. xl_tot_len is the first field of the
323 	 * struct, so it must be on this page (the records are MAXALIGNed), but we
324 	 * cannot access any other fields until we've verified that we got the
325 	 * whole header.
326 	 */
327 	record = (XLogRecord *) (state->readBuf + RecPtr % XLOG_BLCKSZ);
328 	total_len = record->xl_tot_len;
329 
330 	/*
331 	 * If the whole record header is on this page, validate it immediately.
332 	 * Otherwise do just a basic sanity check on xl_tot_len, and validate the
333 	 * rest of the header after reading it from the next page.  The xl_tot_len
334 	 * check is necessary here to ensure that we enter the "Need to reassemble
335 	 * record" code path below; otherwise we might fail to apply
336 	 * ValidXLogRecordHeader at all.
337 	 */
338 	if (targetRecOff <= XLOG_BLCKSZ - SizeOfXLogRecord)
339 	{
340 		if (!ValidXLogRecordHeader(state, RecPtr, state->ReadRecPtr, record,
341 								   randAccess))
342 			goto err;
343 		gotheader = true;
344 	}
345 	else
346 	{
347 		/* XXX: more validation should be done here */
348 		if (total_len < SizeOfXLogRecord)
349 		{
350 			report_invalid_record(state,
351 						 "invalid record length at %X/%X: wanted %u, got %u",
352 								  (uint32) (RecPtr >> 32), (uint32) RecPtr,
353 								  (uint32) SizeOfXLogRecord, total_len);
354 			goto err;
355 		}
356 		gotheader = false;
357 	}
358 
359 	/*
360 	 * Enlarge readRecordBuf as needed.
361 	 */
362 	if (total_len > state->readRecordBufSize &&
363 		!allocate_recordbuf(state, total_len))
364 	{
365 		/* We treat this as a "bogus data" condition */
366 		report_invalid_record(state, "record length %u at %X/%X too long",
367 							  total_len,
368 							  (uint32) (RecPtr >> 32), (uint32) RecPtr);
369 		goto err;
370 	}
371 
372 	len = XLOG_BLCKSZ - RecPtr % XLOG_BLCKSZ;
373 	if (total_len > len)
374 	{
375 		/* Need to reassemble record */
376 		char	   *contdata;
377 		XLogPageHeader pageHeader;
378 		char	   *buffer;
379 		uint32		gotlen;
380 
381 		assembled = true;
382 		/* Copy the first fragment of the record from the first page. */
383 		memcpy(state->readRecordBuf,
384 			   state->readBuf + RecPtr % XLOG_BLCKSZ, len);
385 		buffer = state->readRecordBuf + len;
386 		gotlen = len;
387 
388 		do
389 		{
390 			/* Calculate pointer to beginning of next page */
391 			targetPagePtr += XLOG_BLCKSZ;
392 
393 			/* Wait for the next page to become available */
394 			readOff = ReadPageInternal(state, targetPagePtr,
395 								 Min(total_len - gotlen + SizeOfXLogShortPHD,
396 									 XLOG_BLCKSZ));
397 
398 			if (readOff < 0)
399 				goto err;
400 
401 			Assert(SizeOfXLogShortPHD <= readOff);
402 
403 			pageHeader = (XLogPageHeader) state->readBuf;
404 
405 			/*
406 			 * If we were expecting a continuation record and got an
407 			 * "overwrite contrecord" flag, that means the continuation record
408 			 * was overwritten with a different record.  Restart the read by
409 			 * assuming the address to read is the location where we found
410 			 * this flag; but keep track of the LSN of the record we were
411 			 * reading, for later verification.
412 			 */
413 			if (pageHeader->xlp_info & XLP_FIRST_IS_OVERWRITE_CONTRECORD)
414 			{
415 				state->overwrittenRecPtr = state->currRecPtr;
416 				ResetDecoder(state);
417 				RecPtr = targetPagePtr;
418 				goto restart;
419 			}
420 
421 			/* Check that the continuation on next page looks valid */
422 			if (!(pageHeader->xlp_info & XLP_FIRST_IS_CONTRECORD))
423 			{
424 				report_invalid_record(state,
425 									  "there is no contrecord flag at %X/%X",
426 								   (uint32) (RecPtr >> 32), (uint32) RecPtr);
427 				goto err;
428 			}
429 
430 			/*
431 			 * Cross-check that xlp_rem_len agrees with how much of the record
432 			 * we expect there to be left.
433 			 */
434 			if (pageHeader->xlp_rem_len == 0 ||
435 				total_len != (pageHeader->xlp_rem_len + gotlen))
436 			{
437 				report_invalid_record(state,
438 									  "invalid contrecord length %u at %X/%X",
439 									  pageHeader->xlp_rem_len,
440 								   (uint32) (RecPtr >> 32), (uint32) RecPtr);
441 				goto err;
442 			}
443 
444 			/* Append the continuation from this page to the buffer */
445 			pageHeaderSize = XLogPageHeaderSize(pageHeader);
446 
447 			if (readOff < pageHeaderSize)
448 				readOff = ReadPageInternal(state, targetPagePtr,
449 										   pageHeaderSize);
450 
451 			Assert(pageHeaderSize <= readOff);
452 
453 			contdata = (char *) state->readBuf + pageHeaderSize;
454 			len = XLOG_BLCKSZ - pageHeaderSize;
455 			if (pageHeader->xlp_rem_len < len)
456 				len = pageHeader->xlp_rem_len;
457 
458 			if (readOff < pageHeaderSize + len)
459 				readOff = ReadPageInternal(state, targetPagePtr,
460 										   pageHeaderSize + len);
461 
462 			memcpy(buffer, (char *) contdata, len);
463 			buffer += len;
464 			gotlen += len;
465 
466 			/* If we just reassembled the record header, validate it. */
467 			if (!gotheader)
468 			{
469 				record = (XLogRecord *) state->readRecordBuf;
470 				if (!ValidXLogRecordHeader(state, RecPtr, state->ReadRecPtr,
471 										   record, randAccess))
472 					goto err;
473 				gotheader = true;
474 			}
475 		} while (gotlen < total_len);
476 
477 		Assert(gotheader);
478 
479 		record = (XLogRecord *) state->readRecordBuf;
480 		if (!ValidXLogRecord(state, record, RecPtr))
481 			goto err;
482 
483 		pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) state->readBuf);
484 		state->ReadRecPtr = RecPtr;
485 		state->EndRecPtr = targetPagePtr + pageHeaderSize
486 			+ MAXALIGN(pageHeader->xlp_rem_len);
487 	}
488 	else
489 	{
490 		/* Wait for the record data to become available */
491 		readOff = ReadPageInternal(state, targetPagePtr,
492 								 Min(targetRecOff + total_len, XLOG_BLCKSZ));
493 		if (readOff < 0)
494 			goto err;
495 
496 		/* Record does not cross a page boundary */
497 		if (!ValidXLogRecord(state, record, RecPtr))
498 			goto err;
499 
500 		state->EndRecPtr = RecPtr + MAXALIGN(total_len);
501 
502 		state->ReadRecPtr = RecPtr;
503 		memcpy(state->readRecordBuf, record, total_len);
504 	}
505 
506 	/*
507 	 * Special processing if it's an XLOG SWITCH record
508 	 */
509 	if (record->xl_rmid == RM_XLOG_ID && record->xl_info == XLOG_SWITCH)
510 	{
511 		/* Pretend it extends to end of segment */
512 		state->EndRecPtr += XLogSegSize - 1;
513 		state->EndRecPtr -= state->EndRecPtr % XLogSegSize;
514 	}
515 
516 	if (DecodeXLogRecord(state, record, errormsg))
517 		return record;
518 	else
519 		return NULL;
520 
521 err:
522 	if (assembled)
523 	{
524 		/*
525 		 * We get here when a record that spans multiple pages needs to be
526 		 * assembled, but something went wrong -- perhaps a contrecord piece
527 		 * was lost.  If caller is WAL replay, it will know where the aborted
528 		 * record was and where to direct followup WAL to be written, marking
529 		 * the next piece with XLP_FIRST_IS_OVERWRITE_CONTRECORD, which will
530 		 * in turn signal downstream WAL consumers that the broken WAL record
531 		 * is to be ignored.
532 		 */
533 		state->abortedRecPtr = RecPtr;
534 		state->missingContrecPtr = targetPagePtr;
535 	}
536 
537 	/*
538 	 * Invalidate the read state. We might read from a different source after
539 	 * failure.
540 	 */
541 	XLogReaderInvalReadState(state);
542 
543 	if (state->errormsg_buf[0] != '\0')
544 		*errormsg = state->errormsg_buf;
545 
546 	return NULL;
547 }
548 
549 /*
550  * Read a single xlog page including at least [pageptr, reqLen] of valid data
551  * via the read_page() callback.
552  *
553  * Returns -1 if the required page cannot be read for some reason; errormsg_buf
554  * is set in that case (unless the error occurs in the read_page callback).
555  *
556  * We fetch the page from a reader-local cache if we know we have the required
557  * data and if there hasn't been any error since caching the data.
558  */
559 static int
ReadPageInternal(XLogReaderState * state,XLogRecPtr pageptr,int reqLen)560 ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
561 {
562 	int			readLen;
563 	uint32		targetPageOff;
564 	XLogSegNo	targetSegNo;
565 	XLogPageHeader hdr;
566 
567 	Assert((pageptr % XLOG_BLCKSZ) == 0);
568 
569 	XLByteToSeg(pageptr, targetSegNo);
570 	targetPageOff = (pageptr % XLogSegSize);
571 
572 	/* check whether we have all the requested data already */
573 	if (targetSegNo == state->readSegNo && targetPageOff == state->readOff &&
574 		reqLen < state->readLen)
575 		return state->readLen;
576 
577 	/*
578 	 * Data is not in our buffer.
579 	 *
580 	 * Every time we actually read the page, even if we looked at parts of it
581 	 * before, we need to do verification as the read_page callback might now
582 	 * be rereading data from a different source.
583 	 *
584 	 * Whenever switching to a new WAL segment, we read the first page of the
585 	 * file and validate its header, even if that's not where the target
586 	 * record is.  This is so that we can check the additional identification
587 	 * info that is present in the first page's "long" header.
588 	 */
589 	if (targetSegNo != state->readSegNo && targetPageOff != 0)
590 	{
591 		XLogRecPtr	targetSegmentPtr = pageptr - targetPageOff;
592 
593 		readLen = state->read_page(state, targetSegmentPtr, XLOG_BLCKSZ,
594 								   state->currRecPtr,
595 								   state->readBuf, &state->readPageTLI);
596 		if (readLen < 0)
597 			goto err;
598 
599 		/* we can be sure to have enough WAL available, we scrolled back */
600 		Assert(readLen == XLOG_BLCKSZ);
601 
602 		if (!XLogReaderValidatePageHeader(state, targetSegmentPtr,
603 										  state->readBuf))
604 			goto err;
605 	}
606 
607 	/*
608 	 * First, read the requested data length, but at least a short page header
609 	 * so that we can validate it.
610 	 */
611 	readLen = state->read_page(state, pageptr, Max(reqLen, SizeOfXLogShortPHD),
612 							   state->currRecPtr,
613 							   state->readBuf, &state->readPageTLI);
614 	if (readLen < 0)
615 		goto err;
616 
617 	Assert(readLen <= XLOG_BLCKSZ);
618 
619 	/* Do we have enough data to check the header length? */
620 	if (readLen <= SizeOfXLogShortPHD)
621 		goto err;
622 
623 	Assert(readLen >= reqLen);
624 
625 	hdr = (XLogPageHeader) state->readBuf;
626 
627 	/* still not enough */
628 	if (readLen < XLogPageHeaderSize(hdr))
629 	{
630 		readLen = state->read_page(state, pageptr, XLogPageHeaderSize(hdr),
631 								   state->currRecPtr,
632 								   state->readBuf, &state->readPageTLI);
633 		if (readLen < 0)
634 			goto err;
635 	}
636 
637 	/*
638 	 * Now that we know we have the full header, validate it.
639 	 */
640 	if (!XLogReaderValidatePageHeader(state, pageptr, (char *) hdr))
641 		goto err;
642 
643 	/* update read state information */
644 	state->readSegNo = targetSegNo;
645 	state->readOff = targetPageOff;
646 	state->readLen = readLen;
647 
648 	return readLen;
649 
650 err:
651 	XLogReaderInvalReadState(state);
652 	return -1;
653 }
654 
655 /*
656  * Invalidate the xlogreader's read state to force a re-read.
657  */
658 void
XLogReaderInvalReadState(XLogReaderState * state)659 XLogReaderInvalReadState(XLogReaderState *state)
660 {
661 	state->readSegNo = 0;
662 	state->readOff = 0;
663 	state->readLen = 0;
664 }
665 
666 /*
667  * Validate an XLOG record header.
668  *
669  * This is just a convenience subroutine to avoid duplicated code in
670  * XLogReadRecord.  It's not intended for use from anywhere else.
671  */
672 static bool
ValidXLogRecordHeader(XLogReaderState * state,XLogRecPtr RecPtr,XLogRecPtr PrevRecPtr,XLogRecord * record,bool randAccess)673 ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr,
674 					  XLogRecPtr PrevRecPtr, XLogRecord *record,
675 					  bool randAccess)
676 {
677 	if (record->xl_tot_len < SizeOfXLogRecord)
678 	{
679 		report_invalid_record(state,
680 						 "invalid record length at %X/%X: wanted %u, got %u",
681 							  (uint32) (RecPtr >> 32), (uint32) RecPtr,
682 							  (uint32) SizeOfXLogRecord, record->xl_tot_len);
683 		return false;
684 	}
685 	if (record->xl_rmid > RM_MAX_ID)
686 	{
687 		report_invalid_record(state,
688 							  "invalid resource manager ID %u at %X/%X",
689 							  record->xl_rmid, (uint32) (RecPtr >> 32),
690 							  (uint32) RecPtr);
691 		return false;
692 	}
693 	if (randAccess)
694 	{
695 		/*
696 		 * We can't exactly verify the prev-link, but surely it should be less
697 		 * than the record's own address.
698 		 */
699 		if (!(record->xl_prev < RecPtr))
700 		{
701 			report_invalid_record(state,
702 							"record with incorrect prev-link %X/%X at %X/%X",
703 								  (uint32) (record->xl_prev >> 32),
704 								  (uint32) record->xl_prev,
705 								  (uint32) (RecPtr >> 32), (uint32) RecPtr);
706 			return false;
707 		}
708 	}
709 	else
710 	{
711 		/*
712 		 * Record's prev-link should exactly match our previous location. This
713 		 * check guards against torn WAL pages where a stale but valid-looking
714 		 * WAL record starts on a sector boundary.
715 		 */
716 		if (record->xl_prev != PrevRecPtr)
717 		{
718 			report_invalid_record(state,
719 							"record with incorrect prev-link %X/%X at %X/%X",
720 								  (uint32) (record->xl_prev >> 32),
721 								  (uint32) record->xl_prev,
722 								  (uint32) (RecPtr >> 32), (uint32) RecPtr);
723 			return false;
724 		}
725 	}
726 
727 	return true;
728 }
729 
730 
731 /*
732  * CRC-check an XLOG record.  We do not believe the contents of an XLOG
733  * record (other than to the minimal extent of computing the amount of
734  * data to read in) until we've checked the CRCs.
735  *
736  * We assume all of the record (that is, xl_tot_len bytes) has been read
737  * into memory at *record.  Also, ValidXLogRecordHeader() has accepted the
738  * record's header, which means in particular that xl_tot_len is at least
739  * SizeOfXlogRecord.
740  */
741 static bool
ValidXLogRecord(XLogReaderState * state,XLogRecord * record,XLogRecPtr recptr)742 ValidXLogRecord(XLogReaderState *state, XLogRecord *record, XLogRecPtr recptr)
743 {
744 	pg_crc32c	crc;
745 
746 	/* Calculate the CRC */
747 	INIT_CRC32C(crc);
748 	COMP_CRC32C(crc, ((char *) record) + SizeOfXLogRecord, record->xl_tot_len - SizeOfXLogRecord);
749 	/* include the record header last */
750 	COMP_CRC32C(crc, (char *) record, offsetof(XLogRecord, xl_crc));
751 	FIN_CRC32C(crc);
752 
753 	if (!EQ_CRC32C(record->xl_crc, crc))
754 	{
755 		report_invalid_record(state,
756 			   "incorrect resource manager data checksum in record at %X/%X",
757 							  (uint32) (recptr >> 32), (uint32) recptr);
758 		return false;
759 	}
760 
761 	return true;
762 }
763 
764 /*
765  * Validate a page header.
766  *
767  * Check if 'phdr' is valid as the header of the XLog page at position
768  * 'recptr'.
769  */
770 bool
XLogReaderValidatePageHeader(XLogReaderState * state,XLogRecPtr recptr,char * phdr)771 XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr,
772 							 char *phdr)
773 {
774 	XLogRecPtr	recaddr;
775 	XLogSegNo	segno;
776 	int32		offset;
777 	XLogPageHeader hdr = (XLogPageHeader) phdr;
778 
779 	Assert((recptr % XLOG_BLCKSZ) == 0);
780 
781 	XLByteToSeg(recptr, segno);
782 	offset = recptr % XLogSegSize;
783 
784 	XLogSegNoOffsetToRecPtr(segno, offset, recaddr);
785 
786 	if (hdr->xlp_magic != XLOG_PAGE_MAGIC)
787 	{
788 		char		fname[MAXFNAMELEN];
789 
790 		XLogFileName(fname, state->readPageTLI, segno);
791 
792 		report_invalid_record(state,
793 					"invalid magic number %04X in log segment %s, offset %u",
794 							  hdr->xlp_magic,
795 							  fname,
796 							  offset);
797 		return false;
798 	}
799 
800 	if ((hdr->xlp_info & ~XLP_ALL_FLAGS) != 0)
801 	{
802 		char		fname[MAXFNAMELEN];
803 
804 		XLogFileName(fname, state->readPageTLI, segno);
805 
806 		report_invalid_record(state,
807 					   "invalid info bits %04X in log segment %s, offset %u",
808 							  hdr->xlp_info,
809 							  fname,
810 							  offset);
811 		return false;
812 	}
813 
814 	if (hdr->xlp_info & XLP_LONG_HEADER)
815 	{
816 		XLogLongPageHeader longhdr = (XLogLongPageHeader) hdr;
817 
818 		if (state->system_identifier &&
819 			longhdr->xlp_sysid != state->system_identifier)
820 		{
821 			char		fhdrident_str[32];
822 			char		sysident_str[32];
823 
824 			/*
825 			 * Format sysids separately to keep platform-dependent format code
826 			 * out of the translatable message string.
827 			 */
828 			snprintf(fhdrident_str, sizeof(fhdrident_str), UINT64_FORMAT,
829 					 longhdr->xlp_sysid);
830 			snprintf(sysident_str, sizeof(sysident_str), UINT64_FORMAT,
831 					 state->system_identifier);
832 			report_invalid_record(state,
833 								  "WAL file is from different database system: WAL file database system identifier is %s, pg_control database system identifier is %s",
834 								  fhdrident_str, sysident_str);
835 			return false;
836 		}
837 		else if (longhdr->xlp_seg_size != XLogSegSize)
838 		{
839 			report_invalid_record(state,
840 								  "WAL file is from different database system: incorrect XLOG_SEG_SIZE in page header");
841 			return false;
842 		}
843 		else if (longhdr->xlp_xlog_blcksz != XLOG_BLCKSZ)
844 		{
845 			report_invalid_record(state,
846 								  "WAL file is from different database system: incorrect XLOG_BLCKSZ in page header");
847 			return false;
848 		}
849 	}
850 	else if (offset == 0)
851 	{
852 		char		fname[MAXFNAMELEN];
853 
854 		XLogFileName(fname, state->readPageTLI, segno);
855 
856 		/* hmm, first page of file doesn't have a long header? */
857 		report_invalid_record(state,
858 					   "invalid info bits %04X in log segment %s, offset %u",
859 							  hdr->xlp_info,
860 							  fname,
861 							  offset);
862 		return false;
863 	}
864 
865 	/*
866 	 * Check that the address on the page agrees with what we expected.
867 	 * This check typically fails when an old WAL segment is recycled,
868 	 * and hasn't yet been overwritten with new data yet.
869 	 */
870 	if (hdr->xlp_pageaddr != recaddr)
871 	{
872 		char		fname[MAXFNAMELEN];
873 
874 		XLogFileName(fname, state->readPageTLI, segno);
875 
876 		report_invalid_record(state,
877 					"unexpected pageaddr %X/%X in log segment %s, offset %u",
878 			  (uint32) (hdr->xlp_pageaddr >> 32), (uint32) hdr->xlp_pageaddr,
879 							  fname,
880 							  offset);
881 		return false;
882 	}
883 
884 	/*
885 	 * Since child timelines are always assigned a TLI greater than their
886 	 * immediate parent's TLI, we should never see TLI go backwards across
887 	 * successive pages of a consistent WAL sequence.
888 	 *
889 	 * Sometimes we re-read a segment that's already been (partially) read. So
890 	 * we only verify TLIs for pages that are later than the last remembered
891 	 * LSN.
892 	 */
893 	if (recptr > state->latestPagePtr)
894 	{
895 		if (hdr->xlp_tli < state->latestPageTLI)
896 		{
897 			char		fname[MAXFNAMELEN];
898 
899 			XLogFileName(fname, state->readPageTLI, segno);
900 
901 			report_invalid_record(state,
902 								  "out-of-sequence timeline ID %u (after %u) in log segment %s, offset %u",
903 								  hdr->xlp_tli,
904 								  state->latestPageTLI,
905 								  fname,
906 								  offset);
907 			return false;
908 		}
909 	}
910 	state->latestPagePtr = recptr;
911 	state->latestPageTLI = hdr->xlp_tli;
912 
913 	return true;
914 }
915 
916 #ifdef FRONTEND
917 /*
918  * Functions that are currently not needed in the backend, but are better
919  * implemented inside xlogreader.c because of the internal facilities available
920  * here.
921  */
922 
923 /*
924  * Find the first record with an lsn >= RecPtr.
925  *
926  * Useful for checking whether RecPtr is a valid xlog address for reading, and
927  * to find the first valid address after some address when dumping records for
928  * debugging purposes.
929  */
930 XLogRecPtr
XLogFindNextRecord(XLogReaderState * state,XLogRecPtr RecPtr)931 XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr)
932 {
933 	XLogReaderState saved_state = *state;
934 	XLogRecPtr	tmpRecPtr;
935 	XLogRecPtr	found = InvalidXLogRecPtr;
936 	XLogPageHeader header;
937 	char	   *errormsg;
938 
939 	Assert(!XLogRecPtrIsInvalid(RecPtr));
940 
941 	/*
942 	 * skip over potential continuation data, keeping in mind that it may span
943 	 * multiple pages
944 	 */
945 	tmpRecPtr = RecPtr;
946 	while (true)
947 	{
948 		XLogRecPtr	targetPagePtr;
949 		int			targetRecOff;
950 		uint32		pageHeaderSize;
951 		int			readLen;
952 
953 		/*
954 		 * Compute targetRecOff. It should typically be equal or greater than
955 		 * short page-header since a valid record can't start anywhere before
956 		 * that, except when caller has explicitly specified the offset that
957 		 * falls somewhere there or when we are skipping multi-page
958 		 * continuation record. It doesn't matter though because
959 		 * ReadPageInternal() is prepared to handle that and will read at least
960 		 * short page-header worth of data
961 		 */
962 		targetRecOff = tmpRecPtr % XLOG_BLCKSZ;
963 
964 		/* scroll back to page boundary */
965 		targetPagePtr = tmpRecPtr - targetRecOff;
966 
967 		/* Read the page containing the record */
968 		readLen = ReadPageInternal(state, targetPagePtr, targetRecOff);
969 		if (readLen < 0)
970 			goto err;
971 
972 		header = (XLogPageHeader) state->readBuf;
973 
974 		pageHeaderSize = XLogPageHeaderSize(header);
975 
976 		/* make sure we have enough data for the page header */
977 		readLen = ReadPageInternal(state, targetPagePtr, pageHeaderSize);
978 		if (readLen < 0)
979 			goto err;
980 
981 		/* skip over potential continuation data */
982 		if (header->xlp_info & XLP_FIRST_IS_CONTRECORD)
983 		{
984 			/*
985 			 * If the length of the remaining continuation data is more than
986 			 * what can fit in this page, the continuation record crosses over
987 			 * this page. Read the next page and try again. xlp_rem_len in the
988 			 * next page header will contain the remaining length of the
989 			 * continuation data
990 			 *
991 			 * Note that record headers are MAXALIGN'ed
992 			 */
993 			if (MAXALIGN(header->xlp_rem_len) >= (XLOG_BLCKSZ - pageHeaderSize))
994 				tmpRecPtr = targetPagePtr + XLOG_BLCKSZ;
995 			else
996 			{
997 				/*
998 				 * The previous continuation record ends in this page. Set
999 				 * tmpRecPtr to point to the first valid record
1000 				 */
1001 				tmpRecPtr = targetPagePtr + pageHeaderSize
1002 					+ MAXALIGN(header->xlp_rem_len);
1003 				break;
1004 			}
1005 		}
1006 		else
1007 		{
1008 			tmpRecPtr = targetPagePtr + pageHeaderSize;
1009 			break;
1010 		}
1011 	}
1012 
1013 	/*
1014 	 * we know now that tmpRecPtr is an address pointing to a valid XLogRecord
1015 	 * because either we're at the first record after the beginning of a page
1016 	 * or we just jumped over the remaining data of a continuation.
1017 	 */
1018 	while (XLogReadRecord(state, tmpRecPtr, &errormsg) != NULL)
1019 	{
1020 		/* continue after the record */
1021 		tmpRecPtr = InvalidXLogRecPtr;
1022 
1023 		/* past the record we've found, break out */
1024 		if (RecPtr <= state->ReadRecPtr)
1025 		{
1026 			found = state->ReadRecPtr;
1027 			goto out;
1028 		}
1029 	}
1030 
1031 err:
1032 out:
1033 	/* Reset state to what we had before finding the record */
1034 	state->ReadRecPtr = saved_state.ReadRecPtr;
1035 	state->EndRecPtr = saved_state.EndRecPtr;
1036 	XLogReaderInvalReadState(state);
1037 
1038 	return found;
1039 }
1040 
1041 #endif   /* FRONTEND */
1042 
1043 
1044 /* ----------------------------------------
1045  * Functions for decoding the data and block references in a record.
1046  * ----------------------------------------
1047  */
1048 
1049 /* private function to reset the state between records */
1050 static void
ResetDecoder(XLogReaderState * state)1051 ResetDecoder(XLogReaderState *state)
1052 {
1053 	int			block_id;
1054 
1055 	state->decoded_record = NULL;
1056 
1057 	state->main_data_len = 0;
1058 
1059 	for (block_id = 0; block_id <= state->max_block_id; block_id++)
1060 	{
1061 		state->blocks[block_id].in_use = false;
1062 		state->blocks[block_id].has_image = false;
1063 		state->blocks[block_id].has_data = false;
1064 	}
1065 	state->max_block_id = -1;
1066 }
1067 
1068 /*
1069  * Decode the previously read record.
1070  *
1071  * On error, a human-readable error message is returned in *errormsg, and
1072  * the return value is false.
1073  */
1074 bool
DecodeXLogRecord(XLogReaderState * state,XLogRecord * record,char ** errormsg)1075 DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg)
1076 {
1077 	/*
1078 	 * read next _size bytes from record buffer, but check for overrun first.
1079 	 */
1080 #define COPY_HEADER_FIELD(_dst, _size)			\
1081 	do {										\
1082 		if (remaining < _size)					\
1083 			goto shortdata_err;					\
1084 		memcpy(_dst, ptr, _size);				\
1085 		ptr += _size;							\
1086 		remaining -= _size;						\
1087 	} while(0)
1088 
1089 	char	   *ptr;
1090 	uint32		remaining;
1091 	uint32		datatotal;
1092 	RelFileNode *rnode = NULL;
1093 	uint8		block_id;
1094 
1095 	ResetDecoder(state);
1096 
1097 	state->decoded_record = record;
1098 	state->record_origin = InvalidRepOriginId;
1099 
1100 	ptr = (char *) record;
1101 	ptr += SizeOfXLogRecord;
1102 	remaining = record->xl_tot_len - SizeOfXLogRecord;
1103 
1104 	/* Decode the headers */
1105 	datatotal = 0;
1106 	while (remaining > datatotal)
1107 	{
1108 		COPY_HEADER_FIELD(&block_id, sizeof(uint8));
1109 
1110 		if (block_id == XLR_BLOCK_ID_DATA_SHORT)
1111 		{
1112 			/* XLogRecordDataHeaderShort */
1113 			uint8		main_data_len;
1114 
1115 			COPY_HEADER_FIELD(&main_data_len, sizeof(uint8));
1116 
1117 			state->main_data_len = main_data_len;
1118 			datatotal += main_data_len;
1119 			break;				/* by convention, the main data fragment is
1120 								 * always last */
1121 		}
1122 		else if (block_id == XLR_BLOCK_ID_DATA_LONG)
1123 		{
1124 			/* XLogRecordDataHeaderLong */
1125 			uint32		main_data_len;
1126 
1127 			COPY_HEADER_FIELD(&main_data_len, sizeof(uint32));
1128 			state->main_data_len = main_data_len;
1129 			datatotal += main_data_len;
1130 			break;				/* by convention, the main data fragment is
1131 								 * always last */
1132 		}
1133 		else if (block_id == XLR_BLOCK_ID_ORIGIN)
1134 		{
1135 			COPY_HEADER_FIELD(&state->record_origin, sizeof(RepOriginId));
1136 		}
1137 		else if (block_id <= XLR_MAX_BLOCK_ID)
1138 		{
1139 			/* XLogRecordBlockHeader */
1140 			DecodedBkpBlock *blk;
1141 			uint8		fork_flags;
1142 
1143 			if (block_id <= state->max_block_id)
1144 			{
1145 				report_invalid_record(state,
1146 									  "out-of-order block_id %u at %X/%X",
1147 									  block_id,
1148 									  (uint32) (state->ReadRecPtr >> 32),
1149 									  (uint32) state->ReadRecPtr);
1150 				goto err;
1151 			}
1152 			state->max_block_id = block_id;
1153 
1154 			blk = &state->blocks[block_id];
1155 			blk->in_use = true;
1156 
1157 			COPY_HEADER_FIELD(&fork_flags, sizeof(uint8));
1158 			blk->forknum = fork_flags & BKPBLOCK_FORK_MASK;
1159 			blk->flags = fork_flags;
1160 			blk->has_image = ((fork_flags & BKPBLOCK_HAS_IMAGE) != 0);
1161 			blk->has_data = ((fork_flags & BKPBLOCK_HAS_DATA) != 0);
1162 
1163 			COPY_HEADER_FIELD(&blk->data_len, sizeof(uint16));
1164 			/* cross-check that the HAS_DATA flag is set iff data_length > 0 */
1165 			if (blk->has_data && blk->data_len == 0)
1166 			{
1167 				report_invalid_record(state,
1168 					  "BKPBLOCK_HAS_DATA set, but no data included at %X/%X",
1169 									  (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
1170 				goto err;
1171 			}
1172 			if (!blk->has_data && blk->data_len != 0)
1173 			{
1174 				report_invalid_record(state,
1175 				 "BKPBLOCK_HAS_DATA not set, but data length is %u at %X/%X",
1176 									  (unsigned int) blk->data_len,
1177 									  (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
1178 				goto err;
1179 			}
1180 			datatotal += blk->data_len;
1181 
1182 			if (blk->has_image)
1183 			{
1184 				COPY_HEADER_FIELD(&blk->bimg_len, sizeof(uint16));
1185 				COPY_HEADER_FIELD(&blk->hole_offset, sizeof(uint16));
1186 				COPY_HEADER_FIELD(&blk->bimg_info, sizeof(uint8));
1187 				if (blk->bimg_info & BKPIMAGE_IS_COMPRESSED)
1188 				{
1189 					if (blk->bimg_info & BKPIMAGE_HAS_HOLE)
1190 						COPY_HEADER_FIELD(&blk->hole_length, sizeof(uint16));
1191 					else
1192 						blk->hole_length = 0;
1193 				}
1194 				else
1195 					blk->hole_length = BLCKSZ - blk->bimg_len;
1196 				datatotal += blk->bimg_len;
1197 
1198 				/*
1199 				 * cross-check that hole_offset > 0, hole_length > 0 and
1200 				 * bimg_len < BLCKSZ if the HAS_HOLE flag is set.
1201 				 */
1202 				if ((blk->bimg_info & BKPIMAGE_HAS_HOLE) &&
1203 					(blk->hole_offset == 0 ||
1204 					 blk->hole_length == 0 ||
1205 					 blk->bimg_len == BLCKSZ))
1206 				{
1207 					report_invalid_record(state,
1208 										  "BKPIMAGE_HAS_HOLE set, but hole offset %u length %u block image length %u at %X/%X",
1209 										  (unsigned int) blk->hole_offset,
1210 										  (unsigned int) blk->hole_length,
1211 										  (unsigned int) blk->bimg_len,
1212 										  (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
1213 					goto err;
1214 				}
1215 
1216 				/*
1217 				 * cross-check that hole_offset == 0 and hole_length == 0 if
1218 				 * the HAS_HOLE flag is not set.
1219 				 */
1220 				if (!(blk->bimg_info & BKPIMAGE_HAS_HOLE) &&
1221 					(blk->hole_offset != 0 || blk->hole_length != 0))
1222 				{
1223 					report_invalid_record(state,
1224 										  "BKPIMAGE_HAS_HOLE not set, but hole offset %u length %u at %X/%X",
1225 										  (unsigned int) blk->hole_offset,
1226 										  (unsigned int) blk->hole_length,
1227 										  (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
1228 					goto err;
1229 				}
1230 
1231 				/*
1232 				 * cross-check that bimg_len < BLCKSZ if the IS_COMPRESSED
1233 				 * flag is set.
1234 				 */
1235 				if ((blk->bimg_info & BKPIMAGE_IS_COMPRESSED) &&
1236 					blk->bimg_len == BLCKSZ)
1237 				{
1238 					report_invalid_record(state,
1239 										  "BKPIMAGE_IS_COMPRESSED set, but block image length %u at %X/%X",
1240 										  (unsigned int) blk->bimg_len,
1241 										  (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
1242 					goto err;
1243 				}
1244 
1245 				/*
1246 				 * cross-check that bimg_len = BLCKSZ if neither HAS_HOLE nor
1247 				 * IS_COMPRESSED flag is set.
1248 				 */
1249 				if (!(blk->bimg_info & BKPIMAGE_HAS_HOLE) &&
1250 					!(blk->bimg_info & BKPIMAGE_IS_COMPRESSED) &&
1251 					blk->bimg_len != BLCKSZ)
1252 				{
1253 					report_invalid_record(state,
1254 										  "neither BKPIMAGE_HAS_HOLE nor BKPIMAGE_IS_COMPRESSED set, but block image length is %u at %X/%X",
1255 										  (unsigned int) blk->data_len,
1256 										  (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
1257 					goto err;
1258 				}
1259 			}
1260 			if (!(fork_flags & BKPBLOCK_SAME_REL))
1261 			{
1262 				COPY_HEADER_FIELD(&blk->rnode, sizeof(RelFileNode));
1263 				rnode = &blk->rnode;
1264 			}
1265 			else
1266 			{
1267 				if (rnode == NULL)
1268 				{
1269 					report_invalid_record(state,
1270 						"BKPBLOCK_SAME_REL set but no previous rel at %X/%X",
1271 										  (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
1272 					goto err;
1273 				}
1274 
1275 				blk->rnode = *rnode;
1276 			}
1277 			COPY_HEADER_FIELD(&blk->blkno, sizeof(BlockNumber));
1278 		}
1279 		else
1280 		{
1281 			report_invalid_record(state,
1282 								  "invalid block_id %u at %X/%X",
1283 								  block_id,
1284 								  (uint32) (state->ReadRecPtr >> 32),
1285 								  (uint32) state->ReadRecPtr);
1286 			goto err;
1287 		}
1288 	}
1289 
1290 	if (remaining != datatotal)
1291 		goto shortdata_err;
1292 
1293 	/*
1294 	 * Ok, we've parsed the fragment headers, and verified that the total
1295 	 * length of the payload in the fragments is equal to the amount of data
1296 	 * left. Copy the data of each fragment to a separate buffer.
1297 	 *
1298 	 * We could just set up pointers into readRecordBuf, but we want to align
1299 	 * the data for the convenience of the callers. Backup images are not
1300 	 * copied, however; they don't need alignment.
1301 	 */
1302 
1303 	/* block data first */
1304 	for (block_id = 0; block_id <= state->max_block_id; block_id++)
1305 	{
1306 		DecodedBkpBlock *blk = &state->blocks[block_id];
1307 
1308 		if (!blk->in_use)
1309 			continue;
1310 		if (blk->has_image)
1311 		{
1312 			blk->bkp_image = ptr;
1313 			ptr += blk->bimg_len;
1314 		}
1315 		if (blk->has_data)
1316 		{
1317 			if (!blk->data || blk->data_len > blk->data_bufsz)
1318 			{
1319 				if (blk->data)
1320 					pfree(blk->data);
1321 				blk->data_bufsz = blk->data_len;
1322 				blk->data = palloc(blk->data_bufsz);
1323 			}
1324 			memcpy(blk->data, ptr, blk->data_len);
1325 			ptr += blk->data_len;
1326 		}
1327 	}
1328 
1329 	/* and finally, the main data */
1330 	if (state->main_data_len > 0)
1331 	{
1332 		if (!state->main_data || state->main_data_len > state->main_data_bufsz)
1333 		{
1334 			if (state->main_data)
1335 				pfree(state->main_data);
1336 
1337 			/*
1338 			 * main_data_bufsz must be MAXALIGN'ed.  In many xlog record
1339 			 * types, we omit trailing struct padding on-disk to save a few
1340 			 * bytes; but compilers may generate accesses to the xlog struct
1341 			 * that assume that padding bytes are present.  If the palloc
1342 			 * request is not large enough to include such padding bytes then
1343 			 * we'll get valgrind complaints due to otherwise-harmless fetches
1344 			 * of the padding bytes.
1345 			 *
1346 			 * In addition, force the initial request to be reasonably large
1347 			 * so that we don't waste time with lots of trips through this
1348 			 * stanza.  BLCKSZ / 2 seems like a good compromise choice.
1349 			 */
1350 			state->main_data_bufsz = MAXALIGN(Max(state->main_data_len,
1351 												  BLCKSZ / 2));
1352 			state->main_data = palloc(state->main_data_bufsz);
1353 		}
1354 		memcpy(state->main_data, ptr, state->main_data_len);
1355 		ptr += state->main_data_len;
1356 	}
1357 
1358 	return true;
1359 
1360 shortdata_err:
1361 	report_invalid_record(state,
1362 						  "record with invalid length at %X/%X",
1363 			 (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
1364 err:
1365 	*errormsg = state->errormsg_buf;
1366 
1367 	return false;
1368 }
1369 
1370 /*
1371  * Returns information about the block that a block reference refers to.
1372  *
1373  * If the WAL record contains a block reference with the given ID, *rnode,
1374  * *forknum, and *blknum are filled in (if not NULL), and returns TRUE.
1375  * Otherwise returns FALSE.
1376  */
1377 bool
XLogRecGetBlockTag(XLogReaderState * record,uint8 block_id,RelFileNode * rnode,ForkNumber * forknum,BlockNumber * blknum)1378 XLogRecGetBlockTag(XLogReaderState *record, uint8 block_id,
1379 				RelFileNode *rnode, ForkNumber *forknum, BlockNumber *blknum)
1380 {
1381 	DecodedBkpBlock *bkpb;
1382 
1383 	if (!record->blocks[block_id].in_use)
1384 		return false;
1385 
1386 	bkpb = &record->blocks[block_id];
1387 	if (rnode)
1388 		*rnode = bkpb->rnode;
1389 	if (forknum)
1390 		*forknum = bkpb->forknum;
1391 	if (blknum)
1392 		*blknum = bkpb->blkno;
1393 	return true;
1394 }
1395 
1396 /*
1397  * Returns the data associated with a block reference, or NULL if there is
1398  * no data (e.g. because a full-page image was taken instead). The returned
1399  * pointer points to a MAXALIGNed buffer.
1400  */
1401 char *
XLogRecGetBlockData(XLogReaderState * record,uint8 block_id,Size * len)1402 XLogRecGetBlockData(XLogReaderState *record, uint8 block_id, Size *len)
1403 {
1404 	DecodedBkpBlock *bkpb;
1405 
1406 	if (!record->blocks[block_id].in_use)
1407 		return NULL;
1408 
1409 	bkpb = &record->blocks[block_id];
1410 
1411 	if (!bkpb->has_data)
1412 	{
1413 		if (len)
1414 			*len = 0;
1415 		return NULL;
1416 	}
1417 	else
1418 	{
1419 		if (len)
1420 			*len = bkpb->data_len;
1421 		return bkpb->data;
1422 	}
1423 }
1424 
1425 /*
1426  * Restore a full-page image from a backup block attached to an XLOG record.
1427  *
1428  * Returns true if a full-page image is restored.
1429  */
1430 bool
RestoreBlockImage(XLogReaderState * record,uint8 block_id,char * page)1431 RestoreBlockImage(XLogReaderState *record, uint8 block_id, char *page)
1432 {
1433 	DecodedBkpBlock *bkpb;
1434 	char	   *ptr;
1435 	PGAlignedBlock tmp;
1436 
1437 	if (!record->blocks[block_id].in_use)
1438 		return false;
1439 	if (!record->blocks[block_id].has_image)
1440 		return false;
1441 
1442 	bkpb = &record->blocks[block_id];
1443 	ptr = bkpb->bkp_image;
1444 
1445 	if (bkpb->bimg_info & BKPIMAGE_IS_COMPRESSED)
1446 	{
1447 		/* If a backup block image is compressed, decompress it */
1448 		if (pglz_decompress(ptr, bkpb->bimg_len, tmp.data,
1449 							BLCKSZ - bkpb->hole_length) < 0)
1450 		{
1451 			report_invalid_record(record, "invalid compressed image at %X/%X, block %d",
1452 								  (uint32) (record->ReadRecPtr >> 32),
1453 								  (uint32) record->ReadRecPtr,
1454 								  block_id);
1455 			return false;
1456 		}
1457 		ptr = tmp.data;
1458 	}
1459 
1460 	/* generate page, taking into account hole if necessary */
1461 	if (bkpb->hole_length == 0)
1462 	{
1463 		memcpy(page, ptr, BLCKSZ);
1464 	}
1465 	else
1466 	{
1467 		memcpy(page, ptr, bkpb->hole_offset);
1468 		/* must zero-fill the hole */
1469 		MemSet(page + bkpb->hole_offset, 0, bkpb->hole_length);
1470 		memcpy(page + (bkpb->hole_offset + bkpb->hole_length),
1471 			   ptr + bkpb->hole_offset,
1472 			   BLCKSZ - (bkpb->hole_offset + bkpb->hole_length));
1473 	}
1474 
1475 	return true;
1476 }
1477