/*-------------------------------------------------------------------------
 *
 * xlogreader.c
 *		Generic XLog reading facility
 *
 * Portions Copyright (c) 2013-2017, PostgreSQL Global Development Group
 *
 * IDENTIFICATION
 *		src/backend/access/transam/xlogreader.c
 *
 * NOTES
 *		See xlogreader.h for more notes on this facility.
 *
 *		This file is compiled as both front-end and backend code, so it
 *		may not use ereport, server-defined static variables, etc.
 *-------------------------------------------------------------------------
 */
18 #include "postgres.h"
19
20 #include "access/transam.h"
21 #include "access/xlogrecord.h"
22 #include "access/xlog_internal.h"
23 #include "access/xlogreader.h"
24 #include "catalog/pg_control.h"
25 #include "common/pg_lzcompress.h"
26 #include "replication/origin.h"
27
28 #ifndef FRONTEND
29 #include "utils/memutils.h"
30 #endif
31
32 static bool allocate_recordbuf(XLogReaderState *state, uint32 reclength);
33
34 static bool ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr,
35 XLogRecPtr PrevRecPtr, XLogRecord *record, bool randAccess);
36 static bool ValidXLogRecord(XLogReaderState *state, XLogRecord *record,
37 XLogRecPtr recptr);
38 static int ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr,
39 int reqLen);
40 static void report_invalid_record(XLogReaderState *state, const char *fmt,...) pg_attribute_printf(2, 3);
41
42 static void ResetDecoder(XLogReaderState *state);
43
44 /* size of the buffer allocated for error message. */
45 #define MAX_ERRORMSG_LEN 1000
46
47 /*
48 * Construct a string in state->errormsg_buf explaining what's wrong with
49 * the current record being read.
50 */
51 static void
report_invalid_record(XLogReaderState * state,const char * fmt,...)52 report_invalid_record(XLogReaderState *state, const char *fmt,...)
53 {
54 va_list args;
55
56 fmt = _(fmt);
57
58 va_start(args, fmt);
59 vsnprintf(state->errormsg_buf, MAX_ERRORMSG_LEN, fmt, args);
60 va_end(args);
61 }
62
63 /*
64 * Allocate and initialize a new XLogReader.
65 *
66 * Returns NULL if the xlogreader couldn't be allocated.
67 */
68 XLogReaderState *
XLogReaderAllocate(XLogPageReadCB pagereadfunc,void * private_data)69 XLogReaderAllocate(XLogPageReadCB pagereadfunc, void *private_data)
70 {
71 XLogReaderState *state;
72
73 state = (XLogReaderState *)
74 palloc_extended(sizeof(XLogReaderState),
75 MCXT_ALLOC_NO_OOM | MCXT_ALLOC_ZERO);
76 if (!state)
77 return NULL;
78
79 state->max_block_id = -1;
80
81 /*
82 * Permanently allocate readBuf. We do it this way, rather than just
83 * making a static array, for two reasons: (1) no need to waste the
84 * storage in most instantiations of the backend; (2) a static char array
85 * isn't guaranteed to have any particular alignment, whereas
86 * palloc_extended() will provide MAXALIGN'd storage.
87 */
88 state->readBuf = (char *) palloc_extended(XLOG_BLCKSZ,
89 MCXT_ALLOC_NO_OOM);
90 if (!state->readBuf)
91 {
92 pfree(state);
93 return NULL;
94 }
95
96 state->read_page = pagereadfunc;
97 /* system_identifier initialized to zeroes above */
98 state->private_data = private_data;
99 /* ReadRecPtr and EndRecPtr initialized to zeroes above */
100 /* readSegNo, readOff, readLen, readPageTLI initialized to zeroes above */
101 state->errormsg_buf = palloc_extended(MAX_ERRORMSG_LEN + 1,
102 MCXT_ALLOC_NO_OOM);
103 if (!state->errormsg_buf)
104 {
105 pfree(state->readBuf);
106 pfree(state);
107 return NULL;
108 }
109 state->errormsg_buf[0] = '\0';
110
111 /*
112 * Allocate an initial readRecordBuf of minimal size, which can later be
113 * enlarged if necessary.
114 */
115 if (!allocate_recordbuf(state, 0))
116 {
117 pfree(state->errormsg_buf);
118 pfree(state->readBuf);
119 pfree(state);
120 return NULL;
121 }
122
123 return state;
124 }
125
126 void
XLogReaderFree(XLogReaderState * state)127 XLogReaderFree(XLogReaderState *state)
128 {
129 int block_id;
130
131 for (block_id = 0; block_id <= XLR_MAX_BLOCK_ID; block_id++)
132 {
133 if (state->blocks[block_id].data)
134 pfree(state->blocks[block_id].data);
135 }
136 if (state->main_data)
137 pfree(state->main_data);
138
139 pfree(state->errormsg_buf);
140 if (state->readRecordBuf)
141 pfree(state->readRecordBuf);
142 pfree(state->readBuf);
143 pfree(state);
144 }
145
146 /*
147 * Allocate readRecordBuf to fit a record of at least the given length.
148 * Returns true if successful, false if out of memory.
149 *
150 * readRecordBufSize is set to the new buffer size.
151 *
152 * To avoid useless small increases, round its size to a multiple of
153 * XLOG_BLCKSZ, and make sure it's at least 5*Max(BLCKSZ, XLOG_BLCKSZ) to start
154 * with. (That is enough for all "normal" records, but very large commit or
155 * abort records might need more space.)
156 */
157 static bool
allocate_recordbuf(XLogReaderState * state,uint32 reclength)158 allocate_recordbuf(XLogReaderState *state, uint32 reclength)
159 {
160 uint32 newSize = reclength;
161
162 newSize += XLOG_BLCKSZ - (newSize % XLOG_BLCKSZ);
163 newSize = Max(newSize, 5 * Max(BLCKSZ, XLOG_BLCKSZ));
164
165 #ifndef FRONTEND
166
167 /*
168 * Note that in much unlucky circumstances, the random data read from a
169 * recycled segment can cause this routine to be called with a size
170 * causing a hard failure at allocation. For a standby, this would cause
171 * the instance to stop suddenly with a hard failure, preventing it to
172 * retry fetching WAL from one of its sources which could allow it to move
173 * on with replay without a manual restart. If the data comes from a past
174 * recycled segment and is still valid, then the allocation may succeed
175 * but record checks are going to fail so this would be short-lived. If
176 * the allocation fails because of a memory shortage, then this is not a
177 * hard failure either per the guarantee given by MCXT_ALLOC_NO_OOM.
178 */
179 if (!AllocSizeIsValid(newSize))
180 return false;
181
182 #endif
183
184 if (state->readRecordBuf)
185 pfree(state->readRecordBuf);
186 state->readRecordBuf =
187 (char *) palloc_extended(newSize, MCXT_ALLOC_NO_OOM);
188 if (state->readRecordBuf == NULL)
189 {
190 state->readRecordBufSize = 0;
191 return false;
192 }
193 state->readRecordBufSize = newSize;
194 return true;
195 }
196
197 /*
198 * Attempt to read an XLOG record.
199 *
200 * If RecPtr is valid, try to read a record at that position. Otherwise
201 * try to read a record just after the last one previously read.
202 *
203 * If the read_page callback fails to read the requested data, NULL is
204 * returned. The callback is expected to have reported the error; errormsg
205 * is set to NULL.
206 *
207 * If the reading fails for some other reason, NULL is also returned, and
208 * *errormsg is set to a string with details of the failure.
209 *
210 * The returned pointer (or *errormsg) points to an internal buffer that's
211 * valid until the next call to XLogReadRecord.
212 */
213 XLogRecord *
XLogReadRecord(XLogReaderState * state,XLogRecPtr RecPtr,char ** errormsg)214 XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
215 {
216 XLogRecord *record;
217 XLogRecPtr targetPagePtr;
218 bool randAccess;
219 uint32 len,
220 total_len;
221 uint32 targetRecOff;
222 uint32 pageHeaderSize;
223 bool assembled;
224 bool gotheader;
225 int readOff;
226
227 /*
228 * randAccess indicates whether to verify the previous-record pointer of
229 * the record we're reading. We only do this if we're reading
230 * sequentially, which is what we initially assume.
231 */
232 randAccess = false;
233
234 /* reset error state */
235 *errormsg = NULL;
236 state->errormsg_buf[0] = '\0';
237
238 ResetDecoder(state);
239 state->abortedRecPtr = InvalidXLogRecPtr;
240 state->missingContrecPtr = InvalidXLogRecPtr;
241
242 if (RecPtr == InvalidXLogRecPtr)
243 {
244 /* No explicit start point; read the record after the one we just read */
245 RecPtr = state->EndRecPtr;
246
247 if (state->ReadRecPtr == InvalidXLogRecPtr)
248 randAccess = true;
249
250 /*
251 * RecPtr is pointing to end+1 of the previous WAL record. If we're
252 * at a page boundary, no more records can fit on the current page. We
253 * must skip over the page header, but we can't do that until we've
254 * read in the page, since the header size is variable.
255 */
256 }
257 else
258 {
259 /*
260 * Caller supplied a position to start at.
261 *
262 * In this case, the passed-in record pointer should already be
263 * pointing to a valid record starting position.
264 */
265 Assert(XRecOffIsValid(RecPtr));
266 randAccess = true;
267 }
268
269 restart:
270 state->currRecPtr = RecPtr;
271 assembled = false;
272
273 targetPagePtr = RecPtr - (RecPtr % XLOG_BLCKSZ);
274 targetRecOff = RecPtr % XLOG_BLCKSZ;
275
276 /*
277 * Read the page containing the record into state->readBuf. Request enough
278 * byte to cover the whole record header, or at least the part of it that
279 * fits on the same page.
280 */
281 readOff = ReadPageInternal(state,
282 targetPagePtr,
283 Min(targetRecOff + SizeOfXLogRecord, XLOG_BLCKSZ));
284 if (readOff < 0)
285 goto err;
286
287 /*
288 * ReadPageInternal always returns at least the page header, so we can
289 * examine it now.
290 */
291 pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) state->readBuf);
292 if (targetRecOff == 0)
293 {
294 /*
295 * At page start, so skip over page header.
296 */
297 RecPtr += pageHeaderSize;
298 targetRecOff = pageHeaderSize;
299 }
300 else if (targetRecOff < pageHeaderSize)
301 {
302 report_invalid_record(state, "invalid record offset at %X/%X",
303 (uint32) (RecPtr >> 32), (uint32) RecPtr);
304 goto err;
305 }
306
307 if ((((XLogPageHeader) state->readBuf)->xlp_info & XLP_FIRST_IS_CONTRECORD) &&
308 targetRecOff == pageHeaderSize)
309 {
310 report_invalid_record(state, "contrecord is requested by %X/%X",
311 (uint32) (RecPtr >> 32), (uint32) RecPtr);
312 goto err;
313 }
314
315 /* ReadPageInternal has verified the page header */
316 Assert(pageHeaderSize <= readOff);
317
318 /*
319 * Read the record length.
320 *
321 * NB: Even though we use an XLogRecord pointer here, the whole record
322 * header might not fit on this page. xl_tot_len is the first field of the
323 * struct, so it must be on this page (the records are MAXALIGNed), but we
324 * cannot access any other fields until we've verified that we got the
325 * whole header.
326 */
327 record = (XLogRecord *) (state->readBuf + RecPtr % XLOG_BLCKSZ);
328 total_len = record->xl_tot_len;
329
330 /*
331 * If the whole record header is on this page, validate it immediately.
332 * Otherwise do just a basic sanity check on xl_tot_len, and validate the
333 * rest of the header after reading it from the next page. The xl_tot_len
334 * check is necessary here to ensure that we enter the "Need to reassemble
335 * record" code path below; otherwise we might fail to apply
336 * ValidXLogRecordHeader at all.
337 */
338 if (targetRecOff <= XLOG_BLCKSZ - SizeOfXLogRecord)
339 {
340 if (!ValidXLogRecordHeader(state, RecPtr, state->ReadRecPtr, record,
341 randAccess))
342 goto err;
343 gotheader = true;
344 }
345 else
346 {
347 /* XXX: more validation should be done here */
348 if (total_len < SizeOfXLogRecord)
349 {
350 report_invalid_record(state,
351 "invalid record length at %X/%X: wanted %u, got %u",
352 (uint32) (RecPtr >> 32), (uint32) RecPtr,
353 (uint32) SizeOfXLogRecord, total_len);
354 goto err;
355 }
356 gotheader = false;
357 }
358
359 /*
360 * Enlarge readRecordBuf as needed.
361 */
362 if (total_len > state->readRecordBufSize &&
363 !allocate_recordbuf(state, total_len))
364 {
365 /* We treat this as a "bogus data" condition */
366 report_invalid_record(state, "record length %u at %X/%X too long",
367 total_len,
368 (uint32) (RecPtr >> 32), (uint32) RecPtr);
369 goto err;
370 }
371
372 len = XLOG_BLCKSZ - RecPtr % XLOG_BLCKSZ;
373 if (total_len > len)
374 {
375 /* Need to reassemble record */
376 char *contdata;
377 XLogPageHeader pageHeader;
378 char *buffer;
379 uint32 gotlen;
380
381 assembled = true;
382 /* Copy the first fragment of the record from the first page. */
383 memcpy(state->readRecordBuf,
384 state->readBuf + RecPtr % XLOG_BLCKSZ, len);
385 buffer = state->readRecordBuf + len;
386 gotlen = len;
387
388 do
389 {
390 /* Calculate pointer to beginning of next page */
391 targetPagePtr += XLOG_BLCKSZ;
392
393 /* Wait for the next page to become available */
394 readOff = ReadPageInternal(state, targetPagePtr,
395 Min(total_len - gotlen + SizeOfXLogShortPHD,
396 XLOG_BLCKSZ));
397
398 if (readOff < 0)
399 goto err;
400
401 Assert(SizeOfXLogShortPHD <= readOff);
402
403 pageHeader = (XLogPageHeader) state->readBuf;
404
405 /*
406 * If we were expecting a continuation record and got an
407 * "overwrite contrecord" flag, that means the continuation record
408 * was overwritten with a different record. Restart the read by
409 * assuming the address to read is the location where we found
410 * this flag; but keep track of the LSN of the record we were
411 * reading, for later verification.
412 */
413 if (pageHeader->xlp_info & XLP_FIRST_IS_OVERWRITE_CONTRECORD)
414 {
415 state->overwrittenRecPtr = state->currRecPtr;
416 ResetDecoder(state);
417 RecPtr = targetPagePtr;
418 goto restart;
419 }
420
421 /* Check that the continuation on next page looks valid */
422 if (!(pageHeader->xlp_info & XLP_FIRST_IS_CONTRECORD))
423 {
424 report_invalid_record(state,
425 "there is no contrecord flag at %X/%X",
426 (uint32) (RecPtr >> 32), (uint32) RecPtr);
427 goto err;
428 }
429
430 /*
431 * Cross-check that xlp_rem_len agrees with how much of the record
432 * we expect there to be left.
433 */
434 if (pageHeader->xlp_rem_len == 0 ||
435 total_len != (pageHeader->xlp_rem_len + gotlen))
436 {
437 report_invalid_record(state,
438 "invalid contrecord length %u at %X/%X",
439 pageHeader->xlp_rem_len,
440 (uint32) (RecPtr >> 32), (uint32) RecPtr);
441 goto err;
442 }
443
444 /* Append the continuation from this page to the buffer */
445 pageHeaderSize = XLogPageHeaderSize(pageHeader);
446
447 if (readOff < pageHeaderSize)
448 readOff = ReadPageInternal(state, targetPagePtr,
449 pageHeaderSize);
450
451 Assert(pageHeaderSize <= readOff);
452
453 contdata = (char *) state->readBuf + pageHeaderSize;
454 len = XLOG_BLCKSZ - pageHeaderSize;
455 if (pageHeader->xlp_rem_len < len)
456 len = pageHeader->xlp_rem_len;
457
458 if (readOff < pageHeaderSize + len)
459 readOff = ReadPageInternal(state, targetPagePtr,
460 pageHeaderSize + len);
461
462 memcpy(buffer, (char *) contdata, len);
463 buffer += len;
464 gotlen += len;
465
466 /* If we just reassembled the record header, validate it. */
467 if (!gotheader)
468 {
469 record = (XLogRecord *) state->readRecordBuf;
470 if (!ValidXLogRecordHeader(state, RecPtr, state->ReadRecPtr,
471 record, randAccess))
472 goto err;
473 gotheader = true;
474 }
475 } while (gotlen < total_len);
476
477 Assert(gotheader);
478
479 record = (XLogRecord *) state->readRecordBuf;
480 if (!ValidXLogRecord(state, record, RecPtr))
481 goto err;
482
483 pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) state->readBuf);
484 state->ReadRecPtr = RecPtr;
485 state->EndRecPtr = targetPagePtr + pageHeaderSize
486 + MAXALIGN(pageHeader->xlp_rem_len);
487 }
488 else
489 {
490 /* Wait for the record data to become available */
491 readOff = ReadPageInternal(state, targetPagePtr,
492 Min(targetRecOff + total_len, XLOG_BLCKSZ));
493 if (readOff < 0)
494 goto err;
495
496 /* Record does not cross a page boundary */
497 if (!ValidXLogRecord(state, record, RecPtr))
498 goto err;
499
500 state->EndRecPtr = RecPtr + MAXALIGN(total_len);
501
502 state->ReadRecPtr = RecPtr;
503 memcpy(state->readRecordBuf, record, total_len);
504 }
505
506 /*
507 * Special processing if it's an XLOG SWITCH record
508 */
509 if (record->xl_rmid == RM_XLOG_ID &&
510 (record->xl_info & ~XLR_INFO_MASK) == XLOG_SWITCH)
511 {
512 /* Pretend it extends to end of segment */
513 state->EndRecPtr += XLogSegSize - 1;
514 state->EndRecPtr -= state->EndRecPtr % XLogSegSize;
515 }
516
517 if (DecodeXLogRecord(state, record, errormsg))
518 return record;
519 else
520 return NULL;
521
522 err:
523 if (assembled)
524 {
525 /*
526 * We get here when a record that spans multiple pages needs to be
527 * assembled, but something went wrong -- perhaps a contrecord piece
528 * was lost. If caller is WAL replay, it will know where the aborted
529 * record was and where to direct followup WAL to be written, marking
530 * the next piece with XLP_FIRST_IS_OVERWRITE_CONTRECORD, which will
531 * in turn signal downstream WAL consumers that the broken WAL record
532 * is to be ignored.
533 */
534 state->abortedRecPtr = RecPtr;
535 state->missingContrecPtr = targetPagePtr;
536 }
537
538 /*
539 * Invalidate the read state. We might read from a different source after
540 * failure.
541 */
542 XLogReaderInvalReadState(state);
543
544 if (state->errormsg_buf[0] != '\0')
545 *errormsg = state->errormsg_buf;
546
547 return NULL;
548 }
549
550 /*
551 * Read a single xlog page including at least [pageptr, reqLen] of valid data
552 * via the read_page() callback.
553 *
554 * Returns -1 if the required page cannot be read for some reason; errormsg_buf
555 * is set in that case (unless the error occurs in the read_page callback).
556 *
557 * We fetch the page from a reader-local cache if we know we have the required
558 * data and if there hasn't been any error since caching the data.
559 */
560 static int
ReadPageInternal(XLogReaderState * state,XLogRecPtr pageptr,int reqLen)561 ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
562 {
563 int readLen;
564 uint32 targetPageOff;
565 XLogSegNo targetSegNo;
566 XLogPageHeader hdr;
567
568 Assert((pageptr % XLOG_BLCKSZ) == 0);
569
570 XLByteToSeg(pageptr, targetSegNo);
571 targetPageOff = (pageptr % XLogSegSize);
572
573 /* check whether we have all the requested data already */
574 if (targetSegNo == state->readSegNo && targetPageOff == state->readOff &&
575 reqLen < state->readLen)
576 return state->readLen;
577
578 /*
579 * Data is not in our buffer.
580 *
581 * Every time we actually read the page, even if we looked at parts of it
582 * before, we need to do verification as the read_page callback might now
583 * be rereading data from a different source.
584 *
585 * Whenever switching to a new WAL segment, we read the first page of the
586 * file and validate its header, even if that's not where the target
587 * record is. This is so that we can check the additional identification
588 * info that is present in the first page's "long" header.
589 */
590 if (targetSegNo != state->readSegNo && targetPageOff != 0)
591 {
592 XLogRecPtr targetSegmentPtr = pageptr - targetPageOff;
593
594 readLen = state->read_page(state, targetSegmentPtr, XLOG_BLCKSZ,
595 state->currRecPtr,
596 state->readBuf, &state->readPageTLI);
597 if (readLen < 0)
598 goto err;
599
600 /* we can be sure to have enough WAL available, we scrolled back */
601 Assert(readLen == XLOG_BLCKSZ);
602
603 if (!XLogReaderValidatePageHeader(state, targetSegmentPtr,
604 state->readBuf))
605 goto err;
606 }
607
608 /*
609 * First, read the requested data length, but at least a short page header
610 * so that we can validate it.
611 */
612 readLen = state->read_page(state, pageptr, Max(reqLen, SizeOfXLogShortPHD),
613 state->currRecPtr,
614 state->readBuf, &state->readPageTLI);
615 if (readLen < 0)
616 goto err;
617
618 Assert(readLen <= XLOG_BLCKSZ);
619
620 /* Do we have enough data to check the header length? */
621 if (readLen <= SizeOfXLogShortPHD)
622 goto err;
623
624 Assert(readLen >= reqLen);
625
626 hdr = (XLogPageHeader) state->readBuf;
627
628 /* still not enough */
629 if (readLen < XLogPageHeaderSize(hdr))
630 {
631 readLen = state->read_page(state, pageptr, XLogPageHeaderSize(hdr),
632 state->currRecPtr,
633 state->readBuf, &state->readPageTLI);
634 if (readLen < 0)
635 goto err;
636 }
637
638 /*
639 * Now that we know we have the full header, validate it.
640 */
641 if (!XLogReaderValidatePageHeader(state, pageptr, (char *) hdr))
642 goto err;
643
644 /* update read state information */
645 state->readSegNo = targetSegNo;
646 state->readOff = targetPageOff;
647 state->readLen = readLen;
648
649 return readLen;
650
651 err:
652 XLogReaderInvalReadState(state);
653 return -1;
654 }
655
656 /*
657 * Invalidate the xlogreader's read state to force a re-read.
658 */
659 void
XLogReaderInvalReadState(XLogReaderState * state)660 XLogReaderInvalReadState(XLogReaderState *state)
661 {
662 state->readSegNo = 0;
663 state->readOff = 0;
664 state->readLen = 0;
665 }
666
667 /*
668 * Validate an XLOG record header.
669 *
670 * This is just a convenience subroutine to avoid duplicated code in
671 * XLogReadRecord. It's not intended for use from anywhere else.
672 */
673 static bool
ValidXLogRecordHeader(XLogReaderState * state,XLogRecPtr RecPtr,XLogRecPtr PrevRecPtr,XLogRecord * record,bool randAccess)674 ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr,
675 XLogRecPtr PrevRecPtr, XLogRecord *record,
676 bool randAccess)
677 {
678 if (record->xl_tot_len < SizeOfXLogRecord)
679 {
680 report_invalid_record(state,
681 "invalid record length at %X/%X: wanted %u, got %u",
682 (uint32) (RecPtr >> 32), (uint32) RecPtr,
683 (uint32) SizeOfXLogRecord, record->xl_tot_len);
684 return false;
685 }
686 if (record->xl_rmid > RM_MAX_ID)
687 {
688 report_invalid_record(state,
689 "invalid resource manager ID %u at %X/%X",
690 record->xl_rmid, (uint32) (RecPtr >> 32),
691 (uint32) RecPtr);
692 return false;
693 }
694 if (randAccess)
695 {
696 /*
697 * We can't exactly verify the prev-link, but surely it should be less
698 * than the record's own address.
699 */
700 if (!(record->xl_prev < RecPtr))
701 {
702 report_invalid_record(state,
703 "record with incorrect prev-link %X/%X at %X/%X",
704 (uint32) (record->xl_prev >> 32),
705 (uint32) record->xl_prev,
706 (uint32) (RecPtr >> 32), (uint32) RecPtr);
707 return false;
708 }
709 }
710 else
711 {
712 /*
713 * Record's prev-link should exactly match our previous location. This
714 * check guards against torn WAL pages where a stale but valid-looking
715 * WAL record starts on a sector boundary.
716 */
717 if (record->xl_prev != PrevRecPtr)
718 {
719 report_invalid_record(state,
720 "record with incorrect prev-link %X/%X at %X/%X",
721 (uint32) (record->xl_prev >> 32),
722 (uint32) record->xl_prev,
723 (uint32) (RecPtr >> 32), (uint32) RecPtr);
724 return false;
725 }
726 }
727
728 return true;
729 }
730
731
732 /*
733 * CRC-check an XLOG record. We do not believe the contents of an XLOG
734 * record (other than to the minimal extent of computing the amount of
735 * data to read in) until we've checked the CRCs.
736 *
737 * We assume all of the record (that is, xl_tot_len bytes) has been read
738 * into memory at *record. Also, ValidXLogRecordHeader() has accepted the
739 * record's header, which means in particular that xl_tot_len is at least
740 * SizeOfXlogRecord.
741 */
742 static bool
ValidXLogRecord(XLogReaderState * state,XLogRecord * record,XLogRecPtr recptr)743 ValidXLogRecord(XLogReaderState *state, XLogRecord *record, XLogRecPtr recptr)
744 {
745 pg_crc32c crc;
746
747 /* Calculate the CRC */
748 INIT_CRC32C(crc);
749 COMP_CRC32C(crc, ((char *) record) + SizeOfXLogRecord, record->xl_tot_len - SizeOfXLogRecord);
750 /* include the record header last */
751 COMP_CRC32C(crc, (char *) record, offsetof(XLogRecord, xl_crc));
752 FIN_CRC32C(crc);
753
754 if (!EQ_CRC32C(record->xl_crc, crc))
755 {
756 report_invalid_record(state,
757 "incorrect resource manager data checksum in record at %X/%X",
758 (uint32) (recptr >> 32), (uint32) recptr);
759 return false;
760 }
761
762 return true;
763 }
764
765 /*
766 * Validate a page header.
767 *
768 * Check if 'phdr' is valid as the header of the XLog page at position
769 * 'recptr'.
770 */
771 bool
XLogReaderValidatePageHeader(XLogReaderState * state,XLogRecPtr recptr,char * phdr)772 XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr,
773 char *phdr)
774 {
775 XLogRecPtr recaddr;
776 XLogSegNo segno;
777 int32 offset;
778 XLogPageHeader hdr = (XLogPageHeader) phdr;
779
780 Assert((recptr % XLOG_BLCKSZ) == 0);
781
782 XLByteToSeg(recptr, segno);
783 offset = recptr % XLogSegSize;
784
785 XLogSegNoOffsetToRecPtr(segno, offset, recaddr);
786
787 if (hdr->xlp_magic != XLOG_PAGE_MAGIC)
788 {
789 char fname[MAXFNAMELEN];
790
791 XLogFileName(fname, state->readPageTLI, segno);
792
793 report_invalid_record(state,
794 "invalid magic number %04X in log segment %s, offset %u",
795 hdr->xlp_magic,
796 fname,
797 offset);
798 return false;
799 }
800
801 if ((hdr->xlp_info & ~XLP_ALL_FLAGS) != 0)
802 {
803 char fname[MAXFNAMELEN];
804
805 XLogFileName(fname, state->readPageTLI, segno);
806
807 report_invalid_record(state,
808 "invalid info bits %04X in log segment %s, offset %u",
809 hdr->xlp_info,
810 fname,
811 offset);
812 return false;
813 }
814
815 if (hdr->xlp_info & XLP_LONG_HEADER)
816 {
817 XLogLongPageHeader longhdr = (XLogLongPageHeader) hdr;
818
819 if (state->system_identifier &&
820 longhdr->xlp_sysid != state->system_identifier)
821 {
822 char fhdrident_str[32];
823 char sysident_str[32];
824
825 /*
826 * Format sysids separately to keep platform-dependent format code
827 * out of the translatable message string.
828 */
829 snprintf(fhdrident_str, sizeof(fhdrident_str), UINT64_FORMAT,
830 longhdr->xlp_sysid);
831 snprintf(sysident_str, sizeof(sysident_str), UINT64_FORMAT,
832 state->system_identifier);
833 report_invalid_record(state,
834 "WAL file is from different database system: WAL file database system identifier is %s, pg_control database system identifier is %s",
835 fhdrident_str, sysident_str);
836 return false;
837 }
838 else if (longhdr->xlp_seg_size != XLogSegSize)
839 {
840 report_invalid_record(state,
841 "WAL file is from different database system: incorrect XLOG_SEG_SIZE in page header");
842 return false;
843 }
844 else if (longhdr->xlp_xlog_blcksz != XLOG_BLCKSZ)
845 {
846 report_invalid_record(state,
847 "WAL file is from different database system: incorrect XLOG_BLCKSZ in page header");
848 return false;
849 }
850 }
851 else if (offset == 0)
852 {
853 char fname[MAXFNAMELEN];
854
855 XLogFileName(fname, state->readPageTLI, segno);
856
857 /* hmm, first page of file doesn't have a long header? */
858 report_invalid_record(state,
859 "invalid info bits %04X in log segment %s, offset %u",
860 hdr->xlp_info,
861 fname,
862 offset);
863 return false;
864 }
865
866 /*
867 * Check that the address on the page agrees with what we expected.
868 * This check typically fails when an old WAL segment is recycled,
869 * and hasn't yet been overwritten with new data yet.
870 */
871 if (hdr->xlp_pageaddr != recaddr)
872 {
873 char fname[MAXFNAMELEN];
874
875 XLogFileName(fname, state->readPageTLI, segno);
876
877 report_invalid_record(state,
878 "unexpected pageaddr %X/%X in log segment %s, offset %u",
879 (uint32) (hdr->xlp_pageaddr >> 32), (uint32) hdr->xlp_pageaddr,
880 fname,
881 offset);
882 return false;
883 }
884
885 /*
886 * Since child timelines are always assigned a TLI greater than their
887 * immediate parent's TLI, we should never see TLI go backwards across
888 * successive pages of a consistent WAL sequence.
889 *
890 * Sometimes we re-read a segment that's already been (partially) read. So
891 * we only verify TLIs for pages that are later than the last remembered
892 * LSN.
893 */
894 if (recptr > state->latestPagePtr)
895 {
896 if (hdr->xlp_tli < state->latestPageTLI)
897 {
898 char fname[MAXFNAMELEN];
899
900 XLogFileName(fname, state->readPageTLI, segno);
901
902 report_invalid_record(state,
903 "out-of-sequence timeline ID %u (after %u) in log segment %s, offset %u",
904 hdr->xlp_tli,
905 state->latestPageTLI,
906 fname,
907 offset);
908 return false;
909 }
910 }
911 state->latestPagePtr = recptr;
912 state->latestPageTLI = hdr->xlp_tli;
913
914 return true;
915 }
916
917 #ifdef FRONTEND
918 /*
919 * Functions that are currently not needed in the backend, but are better
920 * implemented inside xlogreader.c because of the internal facilities available
921 * here.
922 */
923
924 /*
925 * Find the first record with an lsn >= RecPtr.
926 *
927 * Useful for checking whether RecPtr is a valid xlog address for reading, and
928 * to find the first valid address after some address when dumping records for
929 * debugging purposes.
930 */
931 XLogRecPtr
XLogFindNextRecord(XLogReaderState * state,XLogRecPtr RecPtr)932 XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr)
933 {
934 XLogReaderState saved_state = *state;
935 XLogRecPtr tmpRecPtr;
936 XLogRecPtr found = InvalidXLogRecPtr;
937 XLogPageHeader header;
938 char *errormsg;
939
940 Assert(!XLogRecPtrIsInvalid(RecPtr));
941
942 /*
943 * skip over potential continuation data, keeping in mind that it may span
944 * multiple pages
945 */
946 tmpRecPtr = RecPtr;
947 while (true)
948 {
949 XLogRecPtr targetPagePtr;
950 int targetRecOff;
951 uint32 pageHeaderSize;
952 int readLen;
953
954 /*
955 * Compute targetRecOff. It should typically be equal or greater than
956 * short page-header since a valid record can't start anywhere before
957 * that, except when caller has explicitly specified the offset that
958 * falls somewhere there or when we are skipping multi-page
959 * continuation record. It doesn't matter though because
960 * ReadPageInternal() is prepared to handle that and will read at
961 * least short page-header worth of data
962 */
963 targetRecOff = tmpRecPtr % XLOG_BLCKSZ;
964
965 /* scroll back to page boundary */
966 targetPagePtr = tmpRecPtr - targetRecOff;
967
968 /* Read the page containing the record */
969 readLen = ReadPageInternal(state, targetPagePtr, targetRecOff);
970 if (readLen < 0)
971 goto err;
972
973 header = (XLogPageHeader) state->readBuf;
974
975 pageHeaderSize = XLogPageHeaderSize(header);
976
977 /* make sure we have enough data for the page header */
978 readLen = ReadPageInternal(state, targetPagePtr, pageHeaderSize);
979 if (readLen < 0)
980 goto err;
981
982 /* skip over potential continuation data */
983 if (header->xlp_info & XLP_FIRST_IS_CONTRECORD)
984 {
985 /*
986 * If the length of the remaining continuation data is more than
987 * what can fit in this page, the continuation record crosses over
988 * this page. Read the next page and try again. xlp_rem_len in the
989 * next page header will contain the remaining length of the
990 * continuation data
991 *
992 * Note that record headers are MAXALIGN'ed
993 */
994 if (MAXALIGN(header->xlp_rem_len) >= (XLOG_BLCKSZ - pageHeaderSize))
995 tmpRecPtr = targetPagePtr + XLOG_BLCKSZ;
996 else
997 {
998 /*
999 * The previous continuation record ends in this page. Set
1000 * tmpRecPtr to point to the first valid record
1001 */
1002 tmpRecPtr = targetPagePtr + pageHeaderSize
1003 + MAXALIGN(header->xlp_rem_len);
1004 break;
1005 }
1006 }
1007 else
1008 {
1009 tmpRecPtr = targetPagePtr + pageHeaderSize;
1010 break;
1011 }
1012 }
1013
1014 /*
1015 * we know now that tmpRecPtr is an address pointing to a valid XLogRecord
1016 * because either we're at the first record after the beginning of a page
1017 * or we just jumped over the remaining data of a continuation.
1018 */
1019 while (XLogReadRecord(state, tmpRecPtr, &errormsg) != NULL)
1020 {
1021 /* continue after the record */
1022 tmpRecPtr = InvalidXLogRecPtr;
1023
1024 /* past the record we've found, break out */
1025 if (RecPtr <= state->ReadRecPtr)
1026 {
1027 found = state->ReadRecPtr;
1028 goto out;
1029 }
1030 }
1031
1032 err:
1033 out:
1034 /* Reset state to what we had before finding the record */
1035 state->ReadRecPtr = saved_state.ReadRecPtr;
1036 state->EndRecPtr = saved_state.EndRecPtr;
1037 XLogReaderInvalReadState(state);
1038
1039 return found;
1040 }
1041
1042 #endif /* FRONTEND */
1043
1044
1045 /* ----------------------------------------
1046 * Functions for decoding the data and block references in a record.
1047 * ----------------------------------------
1048 */
1049
1050 /* private function to reset the state between records */
1051 static void
ResetDecoder(XLogReaderState * state)1052 ResetDecoder(XLogReaderState *state)
1053 {
1054 int block_id;
1055
1056 state->decoded_record = NULL;
1057
1058 state->main_data_len = 0;
1059
1060 for (block_id = 0; block_id <= state->max_block_id; block_id++)
1061 {
1062 state->blocks[block_id].in_use = false;
1063 state->blocks[block_id].has_image = false;
1064 state->blocks[block_id].has_data = false;
1065 state->blocks[block_id].apply_image = false;
1066 }
1067 state->max_block_id = -1;
1068 }
1069
1070 /*
1071 * Decode the previously read record.
1072 *
1073 * On error, a human-readable error message is returned in *errormsg, and
1074 * the return value is false.
1075 */
1076 bool
DecodeXLogRecord(XLogReaderState * state,XLogRecord * record,char ** errormsg)1077 DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg)
1078 {
1079 /*
1080 * read next _size bytes from record buffer, but check for overrun first.
1081 */
1082 #define COPY_HEADER_FIELD(_dst, _size) \
1083 do { \
1084 if (remaining < _size) \
1085 goto shortdata_err; \
1086 memcpy(_dst, ptr, _size); \
1087 ptr += _size; \
1088 remaining -= _size; \
1089 } while(0)
1090
1091 char *ptr;
1092 uint32 remaining;
1093 uint32 datatotal;
1094 RelFileNode *rnode = NULL;
1095 uint8 block_id;
1096
1097 ResetDecoder(state);
1098
1099 state->decoded_record = record;
1100 state->record_origin = InvalidRepOriginId;
1101
1102 ptr = (char *) record;
1103 ptr += SizeOfXLogRecord;
1104 remaining = record->xl_tot_len - SizeOfXLogRecord;
1105
1106 /* Decode the headers */
1107 datatotal = 0;
1108 while (remaining > datatotal)
1109 {
1110 COPY_HEADER_FIELD(&block_id, sizeof(uint8));
1111
1112 if (block_id == XLR_BLOCK_ID_DATA_SHORT)
1113 {
1114 /* XLogRecordDataHeaderShort */
1115 uint8 main_data_len;
1116
1117 COPY_HEADER_FIELD(&main_data_len, sizeof(uint8));
1118
1119 state->main_data_len = main_data_len;
1120 datatotal += main_data_len;
1121 break; /* by convention, the main data fragment is
1122 * always last */
1123 }
1124 else if (block_id == XLR_BLOCK_ID_DATA_LONG)
1125 {
1126 /* XLogRecordDataHeaderLong */
1127 uint32 main_data_len;
1128
1129 COPY_HEADER_FIELD(&main_data_len, sizeof(uint32));
1130 state->main_data_len = main_data_len;
1131 datatotal += main_data_len;
1132 break; /* by convention, the main data fragment is
1133 * always last */
1134 }
1135 else if (block_id == XLR_BLOCK_ID_ORIGIN)
1136 {
1137 COPY_HEADER_FIELD(&state->record_origin, sizeof(RepOriginId));
1138 }
1139 else if (block_id <= XLR_MAX_BLOCK_ID)
1140 {
1141 /* XLogRecordBlockHeader */
1142 DecodedBkpBlock *blk;
1143 uint8 fork_flags;
1144
1145 if (block_id <= state->max_block_id)
1146 {
1147 report_invalid_record(state,
1148 "out-of-order block_id %u at %X/%X",
1149 block_id,
1150 (uint32) (state->ReadRecPtr >> 32),
1151 (uint32) state->ReadRecPtr);
1152 goto err;
1153 }
1154 state->max_block_id = block_id;
1155
1156 blk = &state->blocks[block_id];
1157 blk->in_use = true;
1158 blk->apply_image = false;
1159
1160 COPY_HEADER_FIELD(&fork_flags, sizeof(uint8));
1161 blk->forknum = fork_flags & BKPBLOCK_FORK_MASK;
1162 blk->flags = fork_flags;
1163 blk->has_image = ((fork_flags & BKPBLOCK_HAS_IMAGE) != 0);
1164 blk->has_data = ((fork_flags & BKPBLOCK_HAS_DATA) != 0);
1165
1166 COPY_HEADER_FIELD(&blk->data_len, sizeof(uint16));
1167 /* cross-check that the HAS_DATA flag is set iff data_length > 0 */
1168 if (blk->has_data && blk->data_len == 0)
1169 {
1170 report_invalid_record(state,
1171 "BKPBLOCK_HAS_DATA set, but no data included at %X/%X",
1172 (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
1173 goto err;
1174 }
1175 if (!blk->has_data && blk->data_len != 0)
1176 {
1177 report_invalid_record(state,
1178 "BKPBLOCK_HAS_DATA not set, but data length is %u at %X/%X",
1179 (unsigned int) blk->data_len,
1180 (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
1181 goto err;
1182 }
1183 datatotal += blk->data_len;
1184
1185 if (blk->has_image)
1186 {
1187 COPY_HEADER_FIELD(&blk->bimg_len, sizeof(uint16));
1188 COPY_HEADER_FIELD(&blk->hole_offset, sizeof(uint16));
1189 COPY_HEADER_FIELD(&blk->bimg_info, sizeof(uint8));
1190
1191 blk->apply_image = ((blk->bimg_info & BKPIMAGE_APPLY) != 0);
1192
1193 if (blk->bimg_info & BKPIMAGE_IS_COMPRESSED)
1194 {
1195 if (blk->bimg_info & BKPIMAGE_HAS_HOLE)
1196 COPY_HEADER_FIELD(&blk->hole_length, sizeof(uint16));
1197 else
1198 blk->hole_length = 0;
1199 }
1200 else
1201 blk->hole_length = BLCKSZ - blk->bimg_len;
1202 datatotal += blk->bimg_len;
1203
1204 /*
1205 * cross-check that hole_offset > 0, hole_length > 0 and
1206 * bimg_len < BLCKSZ if the HAS_HOLE flag is set.
1207 */
1208 if ((blk->bimg_info & BKPIMAGE_HAS_HOLE) &&
1209 (blk->hole_offset == 0 ||
1210 blk->hole_length == 0 ||
1211 blk->bimg_len == BLCKSZ))
1212 {
1213 report_invalid_record(state,
1214 "BKPIMAGE_HAS_HOLE set, but hole offset %u length %u block image length %u at %X/%X",
1215 (unsigned int) blk->hole_offset,
1216 (unsigned int) blk->hole_length,
1217 (unsigned int) blk->bimg_len,
1218 (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
1219 goto err;
1220 }
1221
1222 /*
1223 * cross-check that hole_offset == 0 and hole_length == 0 if
1224 * the HAS_HOLE flag is not set.
1225 */
1226 if (!(blk->bimg_info & BKPIMAGE_HAS_HOLE) &&
1227 (blk->hole_offset != 0 || blk->hole_length != 0))
1228 {
1229 report_invalid_record(state,
1230 "BKPIMAGE_HAS_HOLE not set, but hole offset %u length %u at %X/%X",
1231 (unsigned int) blk->hole_offset,
1232 (unsigned int) blk->hole_length,
1233 (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
1234 goto err;
1235 }
1236
1237 /*
1238 * cross-check that bimg_len < BLCKSZ if the IS_COMPRESSED
1239 * flag is set.
1240 */
1241 if ((blk->bimg_info & BKPIMAGE_IS_COMPRESSED) &&
1242 blk->bimg_len == BLCKSZ)
1243 {
1244 report_invalid_record(state,
1245 "BKPIMAGE_IS_COMPRESSED set, but block image length %u at %X/%X",
1246 (unsigned int) blk->bimg_len,
1247 (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
1248 goto err;
1249 }
1250
1251 /*
1252 * cross-check that bimg_len = BLCKSZ if neither HAS_HOLE nor
1253 * IS_COMPRESSED flag is set.
1254 */
1255 if (!(blk->bimg_info & BKPIMAGE_HAS_HOLE) &&
1256 !(blk->bimg_info & BKPIMAGE_IS_COMPRESSED) &&
1257 blk->bimg_len != BLCKSZ)
1258 {
1259 report_invalid_record(state,
1260 "neither BKPIMAGE_HAS_HOLE nor BKPIMAGE_IS_COMPRESSED set, but block image length is %u at %X/%X",
1261 (unsigned int) blk->data_len,
1262 (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
1263 goto err;
1264 }
1265 }
1266 if (!(fork_flags & BKPBLOCK_SAME_REL))
1267 {
1268 COPY_HEADER_FIELD(&blk->rnode, sizeof(RelFileNode));
1269 rnode = &blk->rnode;
1270 }
1271 else
1272 {
1273 if (rnode == NULL)
1274 {
1275 report_invalid_record(state,
1276 "BKPBLOCK_SAME_REL set but no previous rel at %X/%X",
1277 (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
1278 goto err;
1279 }
1280
1281 blk->rnode = *rnode;
1282 }
1283 COPY_HEADER_FIELD(&blk->blkno, sizeof(BlockNumber));
1284 }
1285 else
1286 {
1287 report_invalid_record(state,
1288 "invalid block_id %u at %X/%X",
1289 block_id,
1290 (uint32) (state->ReadRecPtr >> 32),
1291 (uint32) state->ReadRecPtr);
1292 goto err;
1293 }
1294 }
1295
1296 if (remaining != datatotal)
1297 goto shortdata_err;
1298
1299 /*
1300 * Ok, we've parsed the fragment headers, and verified that the total
1301 * length of the payload in the fragments is equal to the amount of data
1302 * left. Copy the data of each fragment to a separate buffer.
1303 *
1304 * We could just set up pointers into readRecordBuf, but we want to align
1305 * the data for the convenience of the callers. Backup images are not
1306 * copied, however; they don't need alignment.
1307 */
1308
1309 /* block data first */
1310 for (block_id = 0; block_id <= state->max_block_id; block_id++)
1311 {
1312 DecodedBkpBlock *blk = &state->blocks[block_id];
1313
1314 if (!blk->in_use)
1315 continue;
1316
1317 Assert(blk->has_image || !blk->apply_image);
1318
1319 if (blk->has_image)
1320 {
1321 blk->bkp_image = ptr;
1322 ptr += blk->bimg_len;
1323 }
1324 if (blk->has_data)
1325 {
1326 if (!blk->data || blk->data_len > blk->data_bufsz)
1327 {
1328 if (blk->data)
1329 pfree(blk->data);
1330 blk->data_bufsz = blk->data_len;
1331 blk->data = palloc(blk->data_bufsz);
1332 }
1333 memcpy(blk->data, ptr, blk->data_len);
1334 ptr += blk->data_len;
1335 }
1336 }
1337
1338 /* and finally, the main data */
1339 if (state->main_data_len > 0)
1340 {
1341 if (!state->main_data || state->main_data_len > state->main_data_bufsz)
1342 {
1343 if (state->main_data)
1344 pfree(state->main_data);
1345
1346 /*
1347 * main_data_bufsz must be MAXALIGN'ed. In many xlog record
1348 * types, we omit trailing struct padding on-disk to save a few
1349 * bytes; but compilers may generate accesses to the xlog struct
1350 * that assume that padding bytes are present. If the palloc
1351 * request is not large enough to include such padding bytes then
1352 * we'll get valgrind complaints due to otherwise-harmless fetches
1353 * of the padding bytes.
1354 *
1355 * In addition, force the initial request to be reasonably large
1356 * so that we don't waste time with lots of trips through this
1357 * stanza. BLCKSZ / 2 seems like a good compromise choice.
1358 */
1359 state->main_data_bufsz = MAXALIGN(Max(state->main_data_len,
1360 BLCKSZ / 2));
1361 state->main_data = palloc(state->main_data_bufsz);
1362 }
1363 memcpy(state->main_data, ptr, state->main_data_len);
1364 ptr += state->main_data_len;
1365 }
1366
1367 return true;
1368
1369 shortdata_err:
1370 report_invalid_record(state,
1371 "record with invalid length at %X/%X",
1372 (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
1373 err:
1374 *errormsg = state->errormsg_buf;
1375
1376 return false;
1377 }
1378
1379 /*
1380 * Returns information about the block that a block reference refers to.
1381 *
1382 * If the WAL record contains a block reference with the given ID, *rnode,
1383 * *forknum, and *blknum are filled in (if not NULL), and returns TRUE.
1384 * Otherwise returns FALSE.
1385 */
1386 bool
XLogRecGetBlockTag(XLogReaderState * record,uint8 block_id,RelFileNode * rnode,ForkNumber * forknum,BlockNumber * blknum)1387 XLogRecGetBlockTag(XLogReaderState *record, uint8 block_id,
1388 RelFileNode *rnode, ForkNumber *forknum, BlockNumber *blknum)
1389 {
1390 DecodedBkpBlock *bkpb;
1391
1392 if (!record->blocks[block_id].in_use)
1393 return false;
1394
1395 bkpb = &record->blocks[block_id];
1396 if (rnode)
1397 *rnode = bkpb->rnode;
1398 if (forknum)
1399 *forknum = bkpb->forknum;
1400 if (blknum)
1401 *blknum = bkpb->blkno;
1402 return true;
1403 }
1404
1405 /*
1406 * Returns the data associated with a block reference, or NULL if there is
1407 * no data (e.g. because a full-page image was taken instead). The returned
1408 * pointer points to a MAXALIGNed buffer.
1409 */
1410 char *
XLogRecGetBlockData(XLogReaderState * record,uint8 block_id,Size * len)1411 XLogRecGetBlockData(XLogReaderState *record, uint8 block_id, Size *len)
1412 {
1413 DecodedBkpBlock *bkpb;
1414
1415 if (!record->blocks[block_id].in_use)
1416 return NULL;
1417
1418 bkpb = &record->blocks[block_id];
1419
1420 if (!bkpb->has_data)
1421 {
1422 if (len)
1423 *len = 0;
1424 return NULL;
1425 }
1426 else
1427 {
1428 if (len)
1429 *len = bkpb->data_len;
1430 return bkpb->data;
1431 }
1432 }
1433
1434 /*
1435 * Restore a full-page image from a backup block attached to an XLOG record.
1436 *
1437 * Returns true if a full-page image is restored.
1438 */
1439 bool
RestoreBlockImage(XLogReaderState * record,uint8 block_id,char * page)1440 RestoreBlockImage(XLogReaderState *record, uint8 block_id, char *page)
1441 {
1442 DecodedBkpBlock *bkpb;
1443 char *ptr;
1444 PGAlignedBlock tmp;
1445
1446 if (!record->blocks[block_id].in_use)
1447 return false;
1448 if (!record->blocks[block_id].has_image)
1449 return false;
1450
1451 bkpb = &record->blocks[block_id];
1452 ptr = bkpb->bkp_image;
1453
1454 if (bkpb->bimg_info & BKPIMAGE_IS_COMPRESSED)
1455 {
1456 /* If a backup block image is compressed, decompress it */
1457 if (pglz_decompress(ptr, bkpb->bimg_len, tmp.data,
1458 BLCKSZ - bkpb->hole_length) < 0)
1459 {
1460 report_invalid_record(record, "invalid compressed image at %X/%X, block %d",
1461 (uint32) (record->ReadRecPtr >> 32),
1462 (uint32) record->ReadRecPtr,
1463 block_id);
1464 return false;
1465 }
1466 ptr = tmp.data;
1467 }
1468
1469 /* generate page, taking into account hole if necessary */
1470 if (bkpb->hole_length == 0)
1471 {
1472 memcpy(page, ptr, BLCKSZ);
1473 }
1474 else
1475 {
1476 memcpy(page, ptr, bkpb->hole_offset);
1477 /* must zero-fill the hole */
1478 MemSet(page + bkpb->hole_offset, 0, bkpb->hole_length);
1479 memcpy(page + (bkpb->hole_offset + bkpb->hole_length),
1480 ptr + bkpb->hole_offset,
1481 BLCKSZ - (bkpb->hole_offset + bkpb->hole_length));
1482 }
1483
1484 return true;
1485 }
1486