/*-------------------------------------------------------------------------
 *
 * xlogreader.c
 *		Generic XLog reading facility
 *
 * Portions Copyright (c) 2013-2016, PostgreSQL Global Development Group
 *
 * IDENTIFICATION
 *		src/backend/access/transam/xlogreader.c
 *
 * NOTES
 *		See xlogreader.h for more notes on this facility.
 *
 *		This file is compiled as both front-end and backend code, so it
 *		may not use ereport, server-defined static variables, etc.
 *-------------------------------------------------------------------------
 */
18 #include "postgres.h"
19
20 #include "access/transam.h"
21 #include "access/xlogrecord.h"
22 #include "access/xlog_internal.h"
23 #include "access/xlogreader.h"
24 #include "catalog/pg_control.h"
25 #include "common/pg_lzcompress.h"
26 #include "replication/origin.h"
27
28 #ifndef FRONTEND
29 #include "utils/memutils.h"
30 #endif
31
/* Forward declarations for internal helpers; definitions appear below. */
static bool allocate_recordbuf(XLogReaderState *state, uint32 reclength);

static bool ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr,
					  XLogRecPtr PrevRecPtr, XLogRecord *record, bool randAccess);
static bool ValidXLogRecord(XLogReaderState *state, XLogRecord *record,
				XLogRecPtr recptr);
static int ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr,
				 int reqLen);
static void report_invalid_record(XLogReaderState *state, const char *fmt,...) pg_attribute_printf(2, 3);

static void ResetDecoder(XLogReaderState *state);

/* size of the buffer allocated for error message. */
#define MAX_ERRORMSG_LEN 1000
46
47 /*
48 * Construct a string in state->errormsg_buf explaining what's wrong with
49 * the current record being read.
50 */
51 static void
report_invalid_record(XLogReaderState * state,const char * fmt,...)52 report_invalid_record(XLogReaderState *state, const char *fmt,...)
53 {
54 va_list args;
55
56 fmt = _(fmt);
57
58 va_start(args, fmt);
59 vsnprintf(state->errormsg_buf, MAX_ERRORMSG_LEN, fmt, args);
60 va_end(args);
61 }
62
63 /*
64 * Allocate and initialize a new XLogReader.
65 *
66 * Returns NULL if the xlogreader couldn't be allocated.
67 */
68 XLogReaderState *
XLogReaderAllocate(XLogPageReadCB pagereadfunc,void * private_data)69 XLogReaderAllocate(XLogPageReadCB pagereadfunc, void *private_data)
70 {
71 XLogReaderState *state;
72
73 state = (XLogReaderState *)
74 palloc_extended(sizeof(XLogReaderState),
75 MCXT_ALLOC_NO_OOM | MCXT_ALLOC_ZERO);
76 if (!state)
77 return NULL;
78
79 state->max_block_id = -1;
80
81 /*
82 * Permanently allocate readBuf. We do it this way, rather than just
83 * making a static array, for two reasons: (1) no need to waste the
84 * storage in most instantiations of the backend; (2) a static char array
85 * isn't guaranteed to have any particular alignment, whereas
86 * palloc_extended() will provide MAXALIGN'd storage.
87 */
88 state->readBuf = (char *) palloc_extended(XLOG_BLCKSZ,
89 MCXT_ALLOC_NO_OOM);
90 if (!state->readBuf)
91 {
92 pfree(state);
93 return NULL;
94 }
95
96 state->read_page = pagereadfunc;
97 /* system_identifier initialized to zeroes above */
98 state->private_data = private_data;
99 /* ReadRecPtr and EndRecPtr initialized to zeroes above */
100 /* readSegNo, readOff, readLen, readPageTLI initialized to zeroes above */
101 state->errormsg_buf = palloc_extended(MAX_ERRORMSG_LEN + 1,
102 MCXT_ALLOC_NO_OOM);
103 if (!state->errormsg_buf)
104 {
105 pfree(state->readBuf);
106 pfree(state);
107 return NULL;
108 }
109 state->errormsg_buf[0] = '\0';
110
111 /*
112 * Allocate an initial readRecordBuf of minimal size, which can later be
113 * enlarged if necessary.
114 */
115 if (!allocate_recordbuf(state, 0))
116 {
117 pfree(state->errormsg_buf);
118 pfree(state->readBuf);
119 pfree(state);
120 return NULL;
121 }
122
123 return state;
124 }
125
126 void
XLogReaderFree(XLogReaderState * state)127 XLogReaderFree(XLogReaderState *state)
128 {
129 int block_id;
130
131 for (block_id = 0; block_id <= XLR_MAX_BLOCK_ID; block_id++)
132 {
133 if (state->blocks[block_id].data)
134 pfree(state->blocks[block_id].data);
135 }
136 if (state->main_data)
137 pfree(state->main_data);
138
139 pfree(state->errormsg_buf);
140 if (state->readRecordBuf)
141 pfree(state->readRecordBuf);
142 pfree(state->readBuf);
143 pfree(state);
144 }
145
146 /*
147 * Allocate readRecordBuf to fit a record of at least the given length.
148 * Returns true if successful, false if out of memory.
149 *
150 * readRecordBufSize is set to the new buffer size.
151 *
152 * To avoid useless small increases, round its size to a multiple of
153 * XLOG_BLCKSZ, and make sure it's at least 5*Max(BLCKSZ, XLOG_BLCKSZ) to start
154 * with. (That is enough for all "normal" records, but very large commit or
155 * abort records might need more space.)
156 */
157 static bool
allocate_recordbuf(XLogReaderState * state,uint32 reclength)158 allocate_recordbuf(XLogReaderState *state, uint32 reclength)
159 {
160 uint32 newSize = reclength;
161
162 newSize += XLOG_BLCKSZ - (newSize % XLOG_BLCKSZ);
163 newSize = Max(newSize, 5 * Max(BLCKSZ, XLOG_BLCKSZ));
164
165 #ifndef FRONTEND
166
167 /*
168 * Note that in much unlucky circumstances, the random data read from a
169 * recycled segment can cause this routine to be called with a size
170 * causing a hard failure at allocation. For a standby, this would cause
171 * the instance to stop suddenly with a hard failure, preventing it to
172 * retry fetching WAL from one of its sources which could allow it to move
173 * on with replay without a manual restart. If the data comes from a past
174 * recycled segment and is still valid, then the allocation may succeed
175 * but record checks are going to fail so this would be short-lived. If
176 * the allocation fails because of a memory shortage, then this is not a
177 * hard failure either per the guarantee given by MCXT_ALLOC_NO_OOM.
178 */
179 if (!AllocSizeIsValid(newSize))
180 return false;
181
182 #endif
183
184 if (state->readRecordBuf)
185 pfree(state->readRecordBuf);
186 state->readRecordBuf =
187 (char *) palloc_extended(newSize, MCXT_ALLOC_NO_OOM);
188 if (state->readRecordBuf == NULL)
189 {
190 state->readRecordBufSize = 0;
191 return false;
192 }
193 state->readRecordBufSize = newSize;
194 return true;
195 }
196
/*
 * Attempt to read an XLOG record.
 *
 * If RecPtr is valid, try to read a record at that position.  Otherwise
 * try to read a record just after the last one previously read.
 *
 * If the read_page callback fails to read the requested data, NULL is
 * returned.  The callback is expected to have reported the error; errormsg
 * is set to NULL.
 *
 * If the reading fails for some other reason, NULL is also returned, and
 * *errormsg is set to a string with details of the failure.
 *
 * The returned pointer (or *errormsg) points to an internal buffer that's
 * valid until the next call to XLogReadRecord.
 */
XLogRecord *
XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
{
	XLogRecord *record;
	XLogRecPtr	targetPagePtr;
	bool		randAccess;
	uint32		len,
				total_len;
	uint32		targetRecOff;
	uint32		pageHeaderSize;
	bool		assembled;		/* have we started reassembling a record that
								 * crosses a page boundary? */
	bool		gotheader;		/* has the full record header been validated? */
	int			readOff;

	/*
	 * randAccess indicates whether to verify the previous-record pointer of
	 * the record we're reading.  We only do this if we're reading
	 * sequentially, which is what we initially assume.
	 */
	randAccess = false;

	/* reset error state */
	*errormsg = NULL;
	state->errormsg_buf[0] = '\0';

	ResetDecoder(state);
	state->abortedRecPtr = InvalidXLogRecPtr;
	state->missingContrecPtr = InvalidXLogRecPtr;

	if (RecPtr == InvalidXLogRecPtr)
	{
		/* No explicit start point; read the record after the one we just read */
		RecPtr = state->EndRecPtr;

		if (state->ReadRecPtr == InvalidXLogRecPtr)
			randAccess = true;

		/*
		 * RecPtr is pointing to end+1 of the previous WAL record.  If we're
		 * at a page boundary, no more records can fit on the current page. We
		 * must skip over the page header, but we can't do that until we've
		 * read in the page, since the header size is variable.
		 */
	}
	else
	{
		/*
		 * Caller supplied a position to start at.
		 *
		 * In this case, the passed-in record pointer should already be
		 * pointing to a valid record starting position.
		 */
		Assert(XRecOffIsValid(RecPtr));
		randAccess = true;
	}

	/* We come back here if an overwritten contrecord forces a restart. */
restart:
	state->currRecPtr = RecPtr;
	assembled = false;

	targetPagePtr = RecPtr - (RecPtr % XLOG_BLCKSZ);
	targetRecOff = RecPtr % XLOG_BLCKSZ;

	/*
	 * Read the page containing the record into state->readBuf. Request enough
	 * byte to cover the whole record header, or at least the part of it that
	 * fits on the same page.
	 */
	readOff = ReadPageInternal(state,
							   targetPagePtr,
							   Min(targetRecOff + SizeOfXLogRecord, XLOG_BLCKSZ));
	if (readOff < 0)
		goto err;

	/*
	 * ReadPageInternal always returns at least the page header, so we can
	 * examine it now.
	 */
	pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) state->readBuf);
	if (targetRecOff == 0)
	{
		/*
		 * At page start, so skip over page header.
		 */
		RecPtr += pageHeaderSize;
		targetRecOff = pageHeaderSize;
	}
	else if (targetRecOff < pageHeaderSize)
	{
		report_invalid_record(state, "invalid record offset at %X/%X",
							  (uint32) (RecPtr >> 32), (uint32) RecPtr);
		goto err;
	}

	/*
	 * A record can't legitimately start right after the page header if the
	 * page begins with the continuation of a previous record.
	 */
	if ((((XLogPageHeader) state->readBuf)->xlp_info & XLP_FIRST_IS_CONTRECORD) &&
		targetRecOff == pageHeaderSize)
	{
		report_invalid_record(state, "contrecord is requested by %X/%X",
							  (uint32) (RecPtr >> 32), (uint32) RecPtr);
		goto err;
	}

	/* ReadPageInternal has verified the page header */
	Assert(pageHeaderSize <= readOff);

	/*
	 * Read the record length.
	 *
	 * NB: Even though we use an XLogRecord pointer here, the whole record
	 * header might not fit on this page. xl_tot_len is the first field of the
	 * struct, so it must be on this page (the records are MAXALIGNed), but we
	 * cannot access any other fields until we've verified that we got the
	 * whole header.
	 */
	record = (XLogRecord *) (state->readBuf + RecPtr % XLOG_BLCKSZ);
	total_len = record->xl_tot_len;

	/*
	 * If the whole record header is on this page, validate it immediately.
	 * Otherwise do just a basic sanity check on xl_tot_len, and validate the
	 * rest of the header after reading it from the next page.  The xl_tot_len
	 * check is necessary here to ensure that we enter the "Need to reassemble
	 * record" code path below; otherwise we might fail to apply
	 * ValidXLogRecordHeader at all.
	 */
	if (targetRecOff <= XLOG_BLCKSZ - SizeOfXLogRecord)
	{
		if (!ValidXLogRecordHeader(state, RecPtr, state->ReadRecPtr, record,
								   randAccess))
			goto err;
		gotheader = true;
	}
	else
	{
		/* XXX: more validation should be done here */
		if (total_len < SizeOfXLogRecord)
		{
			report_invalid_record(state,
								  "invalid record length at %X/%X: wanted %u, got %u",
								  (uint32) (RecPtr >> 32), (uint32) RecPtr,
								  (uint32) SizeOfXLogRecord, total_len);
			goto err;
		}
		gotheader = false;
	}

	/*
	 * Enlarge readRecordBuf as needed.
	 */
	if (total_len > state->readRecordBufSize &&
		!allocate_recordbuf(state, total_len))
	{
		/* We treat this as a "bogus data" condition */
		report_invalid_record(state, "record length %u at %X/%X too long",
							  total_len,
							  (uint32) (RecPtr >> 32), (uint32) RecPtr);
		goto err;
	}

	/* len = number of record bytes available on the first page */
	len = XLOG_BLCKSZ - RecPtr % XLOG_BLCKSZ;
	if (total_len > len)
	{
		/* Need to reassemble record */
		char	   *contdata;
		XLogPageHeader pageHeader;
		char	   *buffer;
		uint32		gotlen;

		assembled = true;
		/* Copy the first fragment of the record from the first page. */
		memcpy(state->readRecordBuf,
			   state->readBuf + RecPtr % XLOG_BLCKSZ, len);
		buffer = state->readRecordBuf + len;
		gotlen = len;

		do
		{
			/* Calculate pointer to beginning of next page */
			targetPagePtr += XLOG_BLCKSZ;

			/* Wait for the next page to become available */
			readOff = ReadPageInternal(state, targetPagePtr,
								 Min(total_len - gotlen + SizeOfXLogShortPHD,
									 XLOG_BLCKSZ));

			if (readOff < 0)
				goto err;

			Assert(SizeOfXLogShortPHD <= readOff);

			pageHeader = (XLogPageHeader) state->readBuf;

			/*
			 * If we were expecting a continuation record and got an
			 * "overwrite contrecord" flag, that means the continuation record
			 * was overwritten with a different record.  Restart the read by
			 * assuming the address to read is the location where we found
			 * this flag; but keep track of the LSN of the record we were
			 * reading, for later verification.
			 */
			if (pageHeader->xlp_info & XLP_FIRST_IS_OVERWRITE_CONTRECORD)
			{
				state->overwrittenRecPtr = state->currRecPtr;
				ResetDecoder(state);
				RecPtr = targetPagePtr;
				goto restart;
			}

			/* Check that the continuation on next page looks valid */
			if (!(pageHeader->xlp_info & XLP_FIRST_IS_CONTRECORD))
			{
				report_invalid_record(state,
									  "there is no contrecord flag at %X/%X",
								   (uint32) (RecPtr >> 32), (uint32) RecPtr);
				goto err;
			}

			/*
			 * Cross-check that xlp_rem_len agrees with how much of the record
			 * we expect there to be left.
			 */
			if (pageHeader->xlp_rem_len == 0 ||
				total_len != (pageHeader->xlp_rem_len + gotlen))
			{
				report_invalid_record(state,
									  "invalid contrecord length %u at %X/%X",
									  pageHeader->xlp_rem_len,
								   (uint32) (RecPtr >> 32), (uint32) RecPtr);
				goto err;
			}

			/* Append the continuation from this page to the buffer */
			pageHeaderSize = XLogPageHeaderSize(pageHeader);

			if (readOff < pageHeaderSize)
				readOff = ReadPageInternal(state, targetPagePtr,
										   pageHeaderSize);

			Assert(pageHeaderSize <= readOff);

			contdata = (char *) state->readBuf + pageHeaderSize;
			len = XLOG_BLCKSZ - pageHeaderSize;
			if (pageHeader->xlp_rem_len < len)
				len = pageHeader->xlp_rem_len;

			if (readOff < pageHeaderSize + len)
				readOff = ReadPageInternal(state, targetPagePtr,
										   pageHeaderSize + len);

			memcpy(buffer, (char *) contdata, len);
			buffer += len;
			gotlen += len;

			/* If we just reassembled the record header, validate it. */
			if (!gotheader)
			{
				record = (XLogRecord *) state->readRecordBuf;
				if (!ValidXLogRecordHeader(state, RecPtr, state->ReadRecPtr,
										   record, randAccess))
					goto err;
				gotheader = true;
			}
		} while (gotlen < total_len);

		Assert(gotheader);

		/* CRC-check the fully reassembled record */
		record = (XLogRecord *) state->readRecordBuf;
		if (!ValidXLogRecord(state, record, RecPtr))
			goto err;

		pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) state->readBuf);
		state->ReadRecPtr = RecPtr;
		state->EndRecPtr = targetPagePtr + pageHeaderSize
			+ MAXALIGN(pageHeader->xlp_rem_len);
	}
	else
	{
		/* Wait for the record data to become available */
		readOff = ReadPageInternal(state, targetPagePtr,
								 Min(targetRecOff + total_len, XLOG_BLCKSZ));
		if (readOff < 0)
			goto err;

		/* Record does not cross a page boundary */
		if (!ValidXLogRecord(state, record, RecPtr))
			goto err;

		state->EndRecPtr = RecPtr + MAXALIGN(total_len);

		state->ReadRecPtr = RecPtr;
		memcpy(state->readRecordBuf, record, total_len);
	}

	/*
	 * Special processing if it's an XLOG SWITCH record
	 */
	if (record->xl_rmid == RM_XLOG_ID && record->xl_info == XLOG_SWITCH)
	{
		/* Pretend it extends to end of segment */
		state->EndRecPtr += XLogSegSize - 1;
		state->EndRecPtr -= state->EndRecPtr % XLogSegSize;
	}

	if (DecodeXLogRecord(state, record, errormsg))
		return record;
	else
		return NULL;

err:
	if (assembled)
	{
		/*
		 * We get here when a record that spans multiple pages needs to be
		 * assembled, but something went wrong -- perhaps a contrecord piece
		 * was lost.  If caller is WAL replay, it will know where the aborted
		 * record was and where to direct followup WAL to be written, marking
		 * the next piece with XLP_FIRST_IS_OVERWRITE_CONTRECORD, which will
		 * in turn signal downstream WAL consumers that the broken WAL record
		 * is to be ignored.
		 */
		state->abortedRecPtr = RecPtr;
		state->missingContrecPtr = targetPagePtr;
	}

	/*
	 * Invalidate the read state. We might read from a different source after
	 * failure.
	 */
	XLogReaderInvalReadState(state);

	if (state->errormsg_buf[0] != '\0')
		*errormsg = state->errormsg_buf;

	return NULL;
}
548
/*
 * Read a single xlog page including at least [pageptr, reqLen] of valid data
 * via the read_page() callback.
 *
 * Returns -1 if the required page cannot be read for some reason; errormsg_buf
 * is set in that case (unless the error occurs in the read_page callback).
 *
 * We fetch the page from a reader-local cache if we know we have the required
 * data and if there hasn't been any error since caching the data.
 */
static int
ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
{
	int			readLen;
	uint32		targetPageOff;
	XLogSegNo	targetSegNo;
	XLogPageHeader hdr;

	Assert((pageptr % XLOG_BLCKSZ) == 0);

	XLByteToSeg(pageptr, targetSegNo);
	targetPageOff = (pageptr % XLogSegSize);

	/* check whether we have all the requested data already */
	if (targetSegNo == state->readSegNo && targetPageOff == state->readOff &&
		reqLen < state->readLen)
		return state->readLen;

	/*
	 * Data is not in our buffer.
	 *
	 * Every time we actually read the page, even if we looked at parts of it
	 * before, we need to do verification as the read_page callback might now
	 * be rereading data from a different source.
	 *
	 * Whenever switching to a new WAL segment, we read the first page of the
	 * file and validate its header, even if that's not where the target
	 * record is.  This is so that we can check the additional identification
	 * info that is present in the first page's "long" header.
	 */
	if (targetSegNo != state->readSegNo && targetPageOff != 0)
	{
		/* scroll back to the segment's first page */
		XLogRecPtr	targetSegmentPtr = pageptr - targetPageOff;

		readLen = state->read_page(state, targetSegmentPtr, XLOG_BLCKSZ,
								   state->currRecPtr,
								   state->readBuf, &state->readPageTLI);
		if (readLen < 0)
			goto err;

		/* we can be sure to have enough WAL available, we scrolled back */
		Assert(readLen == XLOG_BLCKSZ);

		if (!XLogReaderValidatePageHeader(state, targetSegmentPtr,
										  state->readBuf))
			goto err;
	}

	/*
	 * First, read the requested data length, but at least a short page header
	 * so that we can validate it.
	 */
	readLen = state->read_page(state, pageptr, Max(reqLen, SizeOfXLogShortPHD),
							   state->currRecPtr,
							   state->readBuf, &state->readPageTLI);
	if (readLen < 0)
		goto err;

	Assert(readLen <= XLOG_BLCKSZ);

	/* Do we have enough data to check the header length? */
	if (readLen <= SizeOfXLogShortPHD)
		goto err;

	Assert(readLen >= reqLen);

	hdr = (XLogPageHeader) state->readBuf;

	/* still not enough: a long header needs a second, larger read */
	if (readLen < XLogPageHeaderSize(hdr))
	{
		readLen = state->read_page(state, pageptr, XLogPageHeaderSize(hdr),
								   state->currRecPtr,
								   state->readBuf, &state->readPageTLI);
		if (readLen < 0)
			goto err;
	}

	/*
	 * Now that we know we have the full header, validate it.
	 */
	if (!XLogReaderValidatePageHeader(state, pageptr, (char *) hdr))
		goto err;

	/* update read state information */
	state->readSegNo = targetSegNo;
	state->readOff = targetPageOff;
	state->readLen = readLen;

	return readLen;

err:
	XLogReaderInvalReadState(state);
	return -1;
}
654
655 /*
656 * Invalidate the xlogreader's read state to force a re-read.
657 */
658 void
XLogReaderInvalReadState(XLogReaderState * state)659 XLogReaderInvalReadState(XLogReaderState *state)
660 {
661 state->readSegNo = 0;
662 state->readOff = 0;
663 state->readLen = 0;
664 }
665
666 /*
667 * Validate an XLOG record header.
668 *
669 * This is just a convenience subroutine to avoid duplicated code in
670 * XLogReadRecord. It's not intended for use from anywhere else.
671 */
672 static bool
ValidXLogRecordHeader(XLogReaderState * state,XLogRecPtr RecPtr,XLogRecPtr PrevRecPtr,XLogRecord * record,bool randAccess)673 ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr,
674 XLogRecPtr PrevRecPtr, XLogRecord *record,
675 bool randAccess)
676 {
677 if (record->xl_tot_len < SizeOfXLogRecord)
678 {
679 report_invalid_record(state,
680 "invalid record length at %X/%X: wanted %u, got %u",
681 (uint32) (RecPtr >> 32), (uint32) RecPtr,
682 (uint32) SizeOfXLogRecord, record->xl_tot_len);
683 return false;
684 }
685 if (record->xl_rmid > RM_MAX_ID)
686 {
687 report_invalid_record(state,
688 "invalid resource manager ID %u at %X/%X",
689 record->xl_rmid, (uint32) (RecPtr >> 32),
690 (uint32) RecPtr);
691 return false;
692 }
693 if (randAccess)
694 {
695 /*
696 * We can't exactly verify the prev-link, but surely it should be less
697 * than the record's own address.
698 */
699 if (!(record->xl_prev < RecPtr))
700 {
701 report_invalid_record(state,
702 "record with incorrect prev-link %X/%X at %X/%X",
703 (uint32) (record->xl_prev >> 32),
704 (uint32) record->xl_prev,
705 (uint32) (RecPtr >> 32), (uint32) RecPtr);
706 return false;
707 }
708 }
709 else
710 {
711 /*
712 * Record's prev-link should exactly match our previous location. This
713 * check guards against torn WAL pages where a stale but valid-looking
714 * WAL record starts on a sector boundary.
715 */
716 if (record->xl_prev != PrevRecPtr)
717 {
718 report_invalid_record(state,
719 "record with incorrect prev-link %X/%X at %X/%X",
720 (uint32) (record->xl_prev >> 32),
721 (uint32) record->xl_prev,
722 (uint32) (RecPtr >> 32), (uint32) RecPtr);
723 return false;
724 }
725 }
726
727 return true;
728 }
729
730
731 /*
732 * CRC-check an XLOG record. We do not believe the contents of an XLOG
733 * record (other than to the minimal extent of computing the amount of
734 * data to read in) until we've checked the CRCs.
735 *
736 * We assume all of the record (that is, xl_tot_len bytes) has been read
737 * into memory at *record. Also, ValidXLogRecordHeader() has accepted the
738 * record's header, which means in particular that xl_tot_len is at least
739 * SizeOfXlogRecord.
740 */
741 static bool
ValidXLogRecord(XLogReaderState * state,XLogRecord * record,XLogRecPtr recptr)742 ValidXLogRecord(XLogReaderState *state, XLogRecord *record, XLogRecPtr recptr)
743 {
744 pg_crc32c crc;
745
746 /* Calculate the CRC */
747 INIT_CRC32C(crc);
748 COMP_CRC32C(crc, ((char *) record) + SizeOfXLogRecord, record->xl_tot_len - SizeOfXLogRecord);
749 /* include the record header last */
750 COMP_CRC32C(crc, (char *) record, offsetof(XLogRecord, xl_crc));
751 FIN_CRC32C(crc);
752
753 if (!EQ_CRC32C(record->xl_crc, crc))
754 {
755 report_invalid_record(state,
756 "incorrect resource manager data checksum in record at %X/%X",
757 (uint32) (recptr >> 32), (uint32) recptr);
758 return false;
759 }
760
761 return true;
762 }
763
/*
 * Validate a page header.
 *
 * Check if 'phdr' is valid as the header of the XLog page at position
 * 'recptr'.  On failure, an explanatory message is left in errormsg_buf
 * and false is returned.
 */
bool
XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr,
							 char *phdr)
{
	XLogRecPtr	recaddr;
	XLogSegNo	segno;
	int32		offset;
	XLogPageHeader hdr = (XLogPageHeader) phdr;

	Assert((recptr % XLOG_BLCKSZ) == 0);

	XLByteToSeg(recptr, segno);
	offset = recptr % XLogSegSize;

	XLogSegNoOffsetToRecPtr(segno, offset, recaddr);

	if (hdr->xlp_magic != XLOG_PAGE_MAGIC)
	{
		char		fname[MAXFNAMELEN];

		XLogFileName(fname, state->readPageTLI, segno);

		report_invalid_record(state,
					"invalid magic number %04X in log segment %s, offset %u",
							  hdr->xlp_magic,
							  fname,
							  offset);
		return false;
	}

	if ((hdr->xlp_info & ~XLP_ALL_FLAGS) != 0)
	{
		char		fname[MAXFNAMELEN];

		XLogFileName(fname, state->readPageTLI, segno);

		report_invalid_record(state,
					   "invalid info bits %04X in log segment %s, offset %u",
							  hdr->xlp_info,
							  fname,
							  offset);
		return false;
	}

	if (hdr->xlp_info & XLP_LONG_HEADER)
	{
		XLogLongPageHeader longhdr = (XLogLongPageHeader) hdr;

		/*
		 * The long header carries the system identifier and the build-time
		 * segment/block sizes; cross-check them all against our own.
		 */
		if (state->system_identifier &&
			longhdr->xlp_sysid != state->system_identifier)
		{
			char		fhdrident_str[32];
			char		sysident_str[32];

			/*
			 * Format sysids separately to keep platform-dependent format code
			 * out of the translatable message string.
			 */
			snprintf(fhdrident_str, sizeof(fhdrident_str), UINT64_FORMAT,
					 longhdr->xlp_sysid);
			snprintf(sysident_str, sizeof(sysident_str), UINT64_FORMAT,
					 state->system_identifier);
			report_invalid_record(state,
								  "WAL file is from different database system: WAL file database system identifier is %s, pg_control database system identifier is %s",
								  fhdrident_str, sysident_str);
			return false;
		}
		else if (longhdr->xlp_seg_size != XLogSegSize)
		{
			report_invalid_record(state,
								  "WAL file is from different database system: incorrect XLOG_SEG_SIZE in page header");
			return false;
		}
		else if (longhdr->xlp_xlog_blcksz != XLOG_BLCKSZ)
		{
			report_invalid_record(state,
								  "WAL file is from different database system: incorrect XLOG_BLCKSZ in page header");
			return false;
		}
	}
	else if (offset == 0)
	{
		char		fname[MAXFNAMELEN];

		XLogFileName(fname, state->readPageTLI, segno);

		/* hmm, first page of file doesn't have a long header? */
		report_invalid_record(state,
					   "invalid info bits %04X in log segment %s, offset %u",
							  hdr->xlp_info,
							  fname,
							  offset);
		return false;
	}

	/*
	 * Check that the address on the page agrees with what we expected.
	 * This check typically fails when an old WAL segment is recycled,
	 * and hasn't yet been overwritten with new data.
	 */
	if (hdr->xlp_pageaddr != recaddr)
	{
		char		fname[MAXFNAMELEN];

		XLogFileName(fname, state->readPageTLI, segno);

		report_invalid_record(state,
					"unexpected pageaddr %X/%X in log segment %s, offset %u",
			 (uint32) (hdr->xlp_pageaddr >> 32), (uint32) hdr->xlp_pageaddr,
							  fname,
							  offset);
		return false;
	}

	/*
	 * Since child timelines are always assigned a TLI greater than their
	 * immediate parent's TLI, we should never see TLI go backwards across
	 * successive pages of a consistent WAL sequence.
	 *
	 * Sometimes we re-read a segment that's already been (partially) read. So
	 * we only verify TLIs for pages that are later than the last remembered
	 * LSN.
	 */
	if (recptr > state->latestPagePtr)
	{
		if (hdr->xlp_tli < state->latestPageTLI)
		{
			char		fname[MAXFNAMELEN];

			XLogFileName(fname, state->readPageTLI, segno);

			report_invalid_record(state,
								  "out-of-sequence timeline ID %u (after %u) in log segment %s, offset %u",
								  hdr->xlp_tli,
								  state->latestPageTLI,
								  fname,
								  offset);
			return false;
		}
	}
	/* remember the latest page/TLI seen, for the check above */
	state->latestPagePtr = recptr;
	state->latestPageTLI = hdr->xlp_tli;

	return true;
}
915
916 #ifdef FRONTEND
917 /*
918 * Functions that are currently not needed in the backend, but are better
919 * implemented inside xlogreader.c because of the internal facilities available
920 * here.
921 */
922
/*
 * Find the first record with an lsn >= RecPtr.
 *
 * Useful for checking whether RecPtr is a valid xlog address for reading, and
 * to find the first valid address after some address when dumping records for
 * debugging purposes.
 *
 * Returns InvalidXLogRecPtr if no such record can be found.  The reader's
 * position state is restored before returning.
 */
XLogRecPtr
XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr)
{
	XLogReaderState saved_state = *state;
	XLogRecPtr	tmpRecPtr;
	XLogRecPtr	found = InvalidXLogRecPtr;
	XLogPageHeader header;
	char	   *errormsg;

	Assert(!XLogRecPtrIsInvalid(RecPtr));

	/*
	 * skip over potential continuation data, keeping in mind that it may span
	 * multiple pages
	 */
	tmpRecPtr = RecPtr;
	while (true)
	{
		XLogRecPtr	targetPagePtr;
		int			targetRecOff;
		uint32		pageHeaderSize;
		int			readLen;

		/*
		 * Compute targetRecOff. It should typically be equal or greater than
		 * short page-header since a valid record can't start anywhere before
		 * that, except when caller has explicitly specified the offset that
		 * falls somewhere there or when we are skipping multi-page
		 * continuation record. It doesn't matter though because
		 * ReadPageInternal() is prepared to handle that and will read at least
		 * short page-header worth of data
		 */
		targetRecOff = tmpRecPtr % XLOG_BLCKSZ;

		/* scroll back to page boundary */
		targetPagePtr = tmpRecPtr - targetRecOff;

		/* Read the page containing the record */
		readLen = ReadPageInternal(state, targetPagePtr, targetRecOff);
		if (readLen < 0)
			goto err;

		header = (XLogPageHeader) state->readBuf;

		pageHeaderSize = XLogPageHeaderSize(header);

		/* make sure we have enough data for the page header */
		readLen = ReadPageInternal(state, targetPagePtr, pageHeaderSize);
		if (readLen < 0)
			goto err;

		/* skip over potential continuation data */
		if (header->xlp_info & XLP_FIRST_IS_CONTRECORD)
		{
			/*
			 * If the length of the remaining continuation data is more than
			 * what can fit in this page, the continuation record crosses over
			 * this page. Read the next page and try again. xlp_rem_len in the
			 * next page header will contain the remaining length of the
			 * continuation data
			 *
			 * Note that record headers are MAXALIGN'ed
			 */
			if (MAXALIGN(header->xlp_rem_len) >= (XLOG_BLCKSZ - pageHeaderSize))
				tmpRecPtr = targetPagePtr + XLOG_BLCKSZ;
			else
			{
				/*
				 * The previous continuation record ends in this page. Set
				 * tmpRecPtr to point to the first valid record
				 */
				tmpRecPtr = targetPagePtr + pageHeaderSize
					+ MAXALIGN(header->xlp_rem_len);
				break;
			}
		}
		else
		{
			tmpRecPtr = targetPagePtr + pageHeaderSize;
			break;
		}
	}

	/*
	 * we know now that tmpRecPtr is an address pointing to a valid XLogRecord
	 * because either we're at the first record after the beginning of a page
	 * or we just jumped over the remaining data of a continuation.
	 */
	while (XLogReadRecord(state, tmpRecPtr, &errormsg) != NULL)
	{
		/* continue after the record */
		tmpRecPtr = InvalidXLogRecPtr;

		/* past the record we've found, break out */
		if (RecPtr <= state->ReadRecPtr)
		{
			found = state->ReadRecPtr;
			goto out;
		}
	}

err:
out:
	/* Reset state to what we had before finding the record */
	state->ReadRecPtr = saved_state.ReadRecPtr;
	state->EndRecPtr = saved_state.EndRecPtr;
	XLogReaderInvalReadState(state);

	return found;
}
1040
1041 #endif /* FRONTEND */
1042
1043
1044 /* ----------------------------------------
1045 * Functions for decoding the data and block references in a record.
1046 * ----------------------------------------
1047 */
1048
1049 /* private function to reset the state between records */
1050 static void
ResetDecoder(XLogReaderState * state)1051 ResetDecoder(XLogReaderState *state)
1052 {
1053 int block_id;
1054
1055 state->decoded_record = NULL;
1056
1057 state->main_data_len = 0;
1058
1059 for (block_id = 0; block_id <= state->max_block_id; block_id++)
1060 {
1061 state->blocks[block_id].in_use = false;
1062 state->blocks[block_id].has_image = false;
1063 state->blocks[block_id].has_data = false;
1064 }
1065 state->max_block_id = -1;
1066 }
1067
1068 /*
1069 * Decode the previously read record.
1070 *
1071 * On error, a human-readable error message is returned in *errormsg, and
1072 * the return value is false.
1073 */
1074 bool
DecodeXLogRecord(XLogReaderState * state,XLogRecord * record,char ** errormsg)1075 DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg)
1076 {
1077 /*
1078 * read next _size bytes from record buffer, but check for overrun first.
1079 */
1080 #define COPY_HEADER_FIELD(_dst, _size) \
1081 do { \
1082 if (remaining < _size) \
1083 goto shortdata_err; \
1084 memcpy(_dst, ptr, _size); \
1085 ptr += _size; \
1086 remaining -= _size; \
1087 } while(0)
1088
1089 char *ptr;
1090 uint32 remaining;
1091 uint32 datatotal;
1092 RelFileNode *rnode = NULL;
1093 uint8 block_id;
1094
1095 ResetDecoder(state);
1096
1097 state->decoded_record = record;
1098 state->record_origin = InvalidRepOriginId;
1099
1100 ptr = (char *) record;
1101 ptr += SizeOfXLogRecord;
1102 remaining = record->xl_tot_len - SizeOfXLogRecord;
1103
1104 /* Decode the headers */
1105 datatotal = 0;
1106 while (remaining > datatotal)
1107 {
1108 COPY_HEADER_FIELD(&block_id, sizeof(uint8));
1109
1110 if (block_id == XLR_BLOCK_ID_DATA_SHORT)
1111 {
1112 /* XLogRecordDataHeaderShort */
1113 uint8 main_data_len;
1114
1115 COPY_HEADER_FIELD(&main_data_len, sizeof(uint8));
1116
1117 state->main_data_len = main_data_len;
1118 datatotal += main_data_len;
1119 break; /* by convention, the main data fragment is
1120 * always last */
1121 }
1122 else if (block_id == XLR_BLOCK_ID_DATA_LONG)
1123 {
1124 /* XLogRecordDataHeaderLong */
1125 uint32 main_data_len;
1126
1127 COPY_HEADER_FIELD(&main_data_len, sizeof(uint32));
1128 state->main_data_len = main_data_len;
1129 datatotal += main_data_len;
1130 break; /* by convention, the main data fragment is
1131 * always last */
1132 }
1133 else if (block_id == XLR_BLOCK_ID_ORIGIN)
1134 {
1135 COPY_HEADER_FIELD(&state->record_origin, sizeof(RepOriginId));
1136 }
1137 else if (block_id <= XLR_MAX_BLOCK_ID)
1138 {
1139 /* XLogRecordBlockHeader */
1140 DecodedBkpBlock *blk;
1141 uint8 fork_flags;
1142
1143 if (block_id <= state->max_block_id)
1144 {
1145 report_invalid_record(state,
1146 "out-of-order block_id %u at %X/%X",
1147 block_id,
1148 (uint32) (state->ReadRecPtr >> 32),
1149 (uint32) state->ReadRecPtr);
1150 goto err;
1151 }
1152 state->max_block_id = block_id;
1153
1154 blk = &state->blocks[block_id];
1155 blk->in_use = true;
1156
1157 COPY_HEADER_FIELD(&fork_flags, sizeof(uint8));
1158 blk->forknum = fork_flags & BKPBLOCK_FORK_MASK;
1159 blk->flags = fork_flags;
1160 blk->has_image = ((fork_flags & BKPBLOCK_HAS_IMAGE) != 0);
1161 blk->has_data = ((fork_flags & BKPBLOCK_HAS_DATA) != 0);
1162
1163 COPY_HEADER_FIELD(&blk->data_len, sizeof(uint16));
1164 /* cross-check that the HAS_DATA flag is set iff data_length > 0 */
1165 if (blk->has_data && blk->data_len == 0)
1166 {
1167 report_invalid_record(state,
1168 "BKPBLOCK_HAS_DATA set, but no data included at %X/%X",
1169 (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
1170 goto err;
1171 }
1172 if (!blk->has_data && blk->data_len != 0)
1173 {
1174 report_invalid_record(state,
1175 "BKPBLOCK_HAS_DATA not set, but data length is %u at %X/%X",
1176 (unsigned int) blk->data_len,
1177 (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
1178 goto err;
1179 }
1180 datatotal += blk->data_len;
1181
1182 if (blk->has_image)
1183 {
1184 COPY_HEADER_FIELD(&blk->bimg_len, sizeof(uint16));
1185 COPY_HEADER_FIELD(&blk->hole_offset, sizeof(uint16));
1186 COPY_HEADER_FIELD(&blk->bimg_info, sizeof(uint8));
1187 if (blk->bimg_info & BKPIMAGE_IS_COMPRESSED)
1188 {
1189 if (blk->bimg_info & BKPIMAGE_HAS_HOLE)
1190 COPY_HEADER_FIELD(&blk->hole_length, sizeof(uint16));
1191 else
1192 blk->hole_length = 0;
1193 }
1194 else
1195 blk->hole_length = BLCKSZ - blk->bimg_len;
1196 datatotal += blk->bimg_len;
1197
1198 /*
1199 * cross-check that hole_offset > 0, hole_length > 0 and
1200 * bimg_len < BLCKSZ if the HAS_HOLE flag is set.
1201 */
1202 if ((blk->bimg_info & BKPIMAGE_HAS_HOLE) &&
1203 (blk->hole_offset == 0 ||
1204 blk->hole_length == 0 ||
1205 blk->bimg_len == BLCKSZ))
1206 {
1207 report_invalid_record(state,
1208 "BKPIMAGE_HAS_HOLE set, but hole offset %u length %u block image length %u at %X/%X",
1209 (unsigned int) blk->hole_offset,
1210 (unsigned int) blk->hole_length,
1211 (unsigned int) blk->bimg_len,
1212 (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
1213 goto err;
1214 }
1215
1216 /*
1217 * cross-check that hole_offset == 0 and hole_length == 0 if
1218 * the HAS_HOLE flag is not set.
1219 */
1220 if (!(blk->bimg_info & BKPIMAGE_HAS_HOLE) &&
1221 (blk->hole_offset != 0 || blk->hole_length != 0))
1222 {
1223 report_invalid_record(state,
1224 "BKPIMAGE_HAS_HOLE not set, but hole offset %u length %u at %X/%X",
1225 (unsigned int) blk->hole_offset,
1226 (unsigned int) blk->hole_length,
1227 (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
1228 goto err;
1229 }
1230
1231 /*
1232 * cross-check that bimg_len < BLCKSZ if the IS_COMPRESSED
1233 * flag is set.
1234 */
1235 if ((blk->bimg_info & BKPIMAGE_IS_COMPRESSED) &&
1236 blk->bimg_len == BLCKSZ)
1237 {
1238 report_invalid_record(state,
1239 "BKPIMAGE_IS_COMPRESSED set, but block image length %u at %X/%X",
1240 (unsigned int) blk->bimg_len,
1241 (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
1242 goto err;
1243 }
1244
1245 /*
1246 * cross-check that bimg_len = BLCKSZ if neither HAS_HOLE nor
1247 * IS_COMPRESSED flag is set.
1248 */
1249 if (!(blk->bimg_info & BKPIMAGE_HAS_HOLE) &&
1250 !(blk->bimg_info & BKPIMAGE_IS_COMPRESSED) &&
1251 blk->bimg_len != BLCKSZ)
1252 {
1253 report_invalid_record(state,
1254 "neither BKPIMAGE_HAS_HOLE nor BKPIMAGE_IS_COMPRESSED set, but block image length is %u at %X/%X",
1255 (unsigned int) blk->data_len,
1256 (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
1257 goto err;
1258 }
1259 }
1260 if (!(fork_flags & BKPBLOCK_SAME_REL))
1261 {
1262 COPY_HEADER_FIELD(&blk->rnode, sizeof(RelFileNode));
1263 rnode = &blk->rnode;
1264 }
1265 else
1266 {
1267 if (rnode == NULL)
1268 {
1269 report_invalid_record(state,
1270 "BKPBLOCK_SAME_REL set but no previous rel at %X/%X",
1271 (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
1272 goto err;
1273 }
1274
1275 blk->rnode = *rnode;
1276 }
1277 COPY_HEADER_FIELD(&blk->blkno, sizeof(BlockNumber));
1278 }
1279 else
1280 {
1281 report_invalid_record(state,
1282 "invalid block_id %u at %X/%X",
1283 block_id,
1284 (uint32) (state->ReadRecPtr >> 32),
1285 (uint32) state->ReadRecPtr);
1286 goto err;
1287 }
1288 }
1289
1290 if (remaining != datatotal)
1291 goto shortdata_err;
1292
1293 /*
1294 * Ok, we've parsed the fragment headers, and verified that the total
1295 * length of the payload in the fragments is equal to the amount of data
1296 * left. Copy the data of each fragment to a separate buffer.
1297 *
1298 * We could just set up pointers into readRecordBuf, but we want to align
1299 * the data for the convenience of the callers. Backup images are not
1300 * copied, however; they don't need alignment.
1301 */
1302
1303 /* block data first */
1304 for (block_id = 0; block_id <= state->max_block_id; block_id++)
1305 {
1306 DecodedBkpBlock *blk = &state->blocks[block_id];
1307
1308 if (!blk->in_use)
1309 continue;
1310 if (blk->has_image)
1311 {
1312 blk->bkp_image = ptr;
1313 ptr += blk->bimg_len;
1314 }
1315 if (blk->has_data)
1316 {
1317 if (!blk->data || blk->data_len > blk->data_bufsz)
1318 {
1319 if (blk->data)
1320 pfree(blk->data);
1321 blk->data_bufsz = blk->data_len;
1322 blk->data = palloc(blk->data_bufsz);
1323 }
1324 memcpy(blk->data, ptr, blk->data_len);
1325 ptr += blk->data_len;
1326 }
1327 }
1328
1329 /* and finally, the main data */
1330 if (state->main_data_len > 0)
1331 {
1332 if (!state->main_data || state->main_data_len > state->main_data_bufsz)
1333 {
1334 if (state->main_data)
1335 pfree(state->main_data);
1336
1337 /*
1338 * main_data_bufsz must be MAXALIGN'ed. In many xlog record
1339 * types, we omit trailing struct padding on-disk to save a few
1340 * bytes; but compilers may generate accesses to the xlog struct
1341 * that assume that padding bytes are present. If the palloc
1342 * request is not large enough to include such padding bytes then
1343 * we'll get valgrind complaints due to otherwise-harmless fetches
1344 * of the padding bytes.
1345 *
1346 * In addition, force the initial request to be reasonably large
1347 * so that we don't waste time with lots of trips through this
1348 * stanza. BLCKSZ / 2 seems like a good compromise choice.
1349 */
1350 state->main_data_bufsz = MAXALIGN(Max(state->main_data_len,
1351 BLCKSZ / 2));
1352 state->main_data = palloc(state->main_data_bufsz);
1353 }
1354 memcpy(state->main_data, ptr, state->main_data_len);
1355 ptr += state->main_data_len;
1356 }
1357
1358 return true;
1359
1360 shortdata_err:
1361 report_invalid_record(state,
1362 "record with invalid length at %X/%X",
1363 (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
1364 err:
1365 *errormsg = state->errormsg_buf;
1366
1367 return false;
1368 }
1369
1370 /*
1371 * Returns information about the block that a block reference refers to.
1372 *
1373 * If the WAL record contains a block reference with the given ID, *rnode,
1374 * *forknum, and *blknum are filled in (if not NULL), and returns TRUE.
1375 * Otherwise returns FALSE.
1376 */
1377 bool
XLogRecGetBlockTag(XLogReaderState * record,uint8 block_id,RelFileNode * rnode,ForkNumber * forknum,BlockNumber * blknum)1378 XLogRecGetBlockTag(XLogReaderState *record, uint8 block_id,
1379 RelFileNode *rnode, ForkNumber *forknum, BlockNumber *blknum)
1380 {
1381 DecodedBkpBlock *bkpb;
1382
1383 if (!record->blocks[block_id].in_use)
1384 return false;
1385
1386 bkpb = &record->blocks[block_id];
1387 if (rnode)
1388 *rnode = bkpb->rnode;
1389 if (forknum)
1390 *forknum = bkpb->forknum;
1391 if (blknum)
1392 *blknum = bkpb->blkno;
1393 return true;
1394 }
1395
1396 /*
1397 * Returns the data associated with a block reference, or NULL if there is
1398 * no data (e.g. because a full-page image was taken instead). The returned
1399 * pointer points to a MAXALIGNed buffer.
1400 */
1401 char *
XLogRecGetBlockData(XLogReaderState * record,uint8 block_id,Size * len)1402 XLogRecGetBlockData(XLogReaderState *record, uint8 block_id, Size *len)
1403 {
1404 DecodedBkpBlock *bkpb;
1405
1406 if (!record->blocks[block_id].in_use)
1407 return NULL;
1408
1409 bkpb = &record->blocks[block_id];
1410
1411 if (!bkpb->has_data)
1412 {
1413 if (len)
1414 *len = 0;
1415 return NULL;
1416 }
1417 else
1418 {
1419 if (len)
1420 *len = bkpb->data_len;
1421 return bkpb->data;
1422 }
1423 }
1424
1425 /*
1426 * Restore a full-page image from a backup block attached to an XLOG record.
1427 *
1428 * Returns true if a full-page image is restored.
1429 */
1430 bool
RestoreBlockImage(XLogReaderState * record,uint8 block_id,char * page)1431 RestoreBlockImage(XLogReaderState *record, uint8 block_id, char *page)
1432 {
1433 DecodedBkpBlock *bkpb;
1434 char *ptr;
1435 PGAlignedBlock tmp;
1436
1437 if (!record->blocks[block_id].in_use)
1438 return false;
1439 if (!record->blocks[block_id].has_image)
1440 return false;
1441
1442 bkpb = &record->blocks[block_id];
1443 ptr = bkpb->bkp_image;
1444
1445 if (bkpb->bimg_info & BKPIMAGE_IS_COMPRESSED)
1446 {
1447 /* If a backup block image is compressed, decompress it */
1448 if (pglz_decompress(ptr, bkpb->bimg_len, tmp.data,
1449 BLCKSZ - bkpb->hole_length) < 0)
1450 {
1451 report_invalid_record(record, "invalid compressed image at %X/%X, block %d",
1452 (uint32) (record->ReadRecPtr >> 32),
1453 (uint32) record->ReadRecPtr,
1454 block_id);
1455 return false;
1456 }
1457 ptr = tmp.data;
1458 }
1459
1460 /* generate page, taking into account hole if necessary */
1461 if (bkpb->hole_length == 0)
1462 {
1463 memcpy(page, ptr, BLCKSZ);
1464 }
1465 else
1466 {
1467 memcpy(page, ptr, bkpb->hole_offset);
1468 /* must zero-fill the hole */
1469 MemSet(page + bkpb->hole_offset, 0, bkpb->hole_length);
1470 memcpy(page + (bkpb->hole_offset + bkpb->hole_length),
1471 ptr + bkpb->hole_offset,
1472 BLCKSZ - (bkpb->hole_offset + bkpb->hole_length));
1473 }
1474
1475 return true;
1476 }
1477