/*-------------------------------------------------------------------------
 *
 * xlogreader.c
 *		Generic XLog reading facility
 *
 * Portions Copyright (c) 2013-2018, PostgreSQL Global Development Group
 *
 * IDENTIFICATION
 *		src/backend/access/transam/xlogreader.c
 *
 * NOTES
 *		See xlogreader.h for more notes on this facility.
 *
 *		This file is compiled as both front-end and backend code, so it
 *		may not use ereport, server-defined static variables, etc.
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include "access/transam.h"
#include "access/xlogrecord.h"
#include "access/xlog_internal.h"
#include "access/xlogreader.h"
#include "catalog/pg_control.h"
#include "common/pg_lzcompress.h"
#include "replication/origin.h"

#ifndef FRONTEND
#include "utils/memutils.h"
#endif

static bool allocate_recordbuf(XLogReaderState *state, uint32 reclength);

static bool ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr,
					  XLogRecPtr PrevRecPtr, XLogRecord *record, bool randAccess);
static bool ValidXLogRecord(XLogReaderState *state, XLogRecord *record,
				XLogRecPtr recptr);
static int ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr,
				 int reqLen);
static void report_invalid_record(XLogReaderState *state, const char *fmt,...) pg_attribute_printf(2, 3);

static void ResetDecoder(XLogReaderState *state);

/* size of the buffer allocated for error message. */
#define MAX_ERRORMSG_LEN 1000

/*
 * Construct a string in state->errormsg_buf explaining what's wrong with
 * the current record being read.
 *
 * The buffer is MAX_ERRORMSG_LEN + 1 bytes (see XLogReaderAllocate), so
 * vsnprintf's bound of MAX_ERRORMSG_LEN always leaves room for the NUL.
 */
static void
report_invalid_record(XLogReaderState *state, const char *fmt,...)
{
	va_list		args;

	/* Run the format string through the translation macro first. */
	fmt = _(fmt);

	va_start(args, fmt);
	vsnprintf(state->errormsg_buf, MAX_ERRORMSG_LEN, fmt, args);
	va_end(args);
}

/*
 * Allocate and initialize a new XLogReader.
 *
 * Returns NULL if the xlogreader couldn't be allocated.
 *
 * All allocations use MCXT_ALLOC_NO_OOM so that out-of-memory is reported
 * as a NULL result rather than an error; on any partial failure, everything
 * allocated so far is freed before returning.
 */
XLogReaderState *
XLogReaderAllocate(int wal_segment_size, XLogPageReadCB pagereadfunc,
				   void *private_data)
{
	XLogReaderState *state;

	state = (XLogReaderState *)
		palloc_extended(sizeof(XLogReaderState),
						MCXT_ALLOC_NO_OOM | MCXT_ALLOC_ZERO);
	if (!state)
		return NULL;

	state->max_block_id = -1;

	/*
	 * Permanently allocate readBuf.  We do it this way, rather than just
	 * making a static array, for two reasons: (1) no need to waste the
	 * storage in most instantiations of the backend; (2) a static char array
	 * isn't guaranteed to have any particular alignment, whereas
	 * palloc_extended() will provide MAXALIGN'd storage.
	 */
	state->readBuf = (char *) palloc_extended(XLOG_BLCKSZ,
											  MCXT_ALLOC_NO_OOM);
	if (!state->readBuf)
	{
		pfree(state);
		return NULL;
	}

	state->wal_segment_size = wal_segment_size;
	state->read_page = pagereadfunc;
	/* system_identifier initialized to zeroes above */
	state->private_data = private_data;
	/* ReadRecPtr and EndRecPtr initialized to zeroes above */
	/* readSegNo, readOff, readLen, readPageTLI initialized to zeroes above */
	state->errormsg_buf = palloc_extended(MAX_ERRORMSG_LEN + 1,
										  MCXT_ALLOC_NO_OOM);
	if (!state->errormsg_buf)
	{
		pfree(state->readBuf);
		pfree(state);
		return NULL;
	}
	state->errormsg_buf[0] = '\0';

	/*
	 * Allocate an initial readRecordBuf of minimal size, which can later be
	 * enlarged if necessary.
	 */
	if (!allocate_recordbuf(state, 0))
	{
		pfree(state->errormsg_buf);
		pfree(state->readBuf);
		pfree(state);
		return NULL;
	}

	return state;
}

/*
 * Release an XLogReader and all buffers it owns (per-block data buffers,
 * main-data buffer, error buffer, record buffer, and page read buffer).
 */
void
XLogReaderFree(XLogReaderState *state)
{
	int			block_id;

	for (block_id = 0; block_id <= XLR_MAX_BLOCK_ID; block_id++)
	{
		if (state->blocks[block_id].data)
			pfree(state->blocks[block_id].data);
	}
	if (state->main_data)
		pfree(state->main_data);

	pfree(state->errormsg_buf);
	if (state->readRecordBuf)
		pfree(state->readRecordBuf);
	pfree(state->readBuf);
	pfree(state);
}

/*
 * Allocate readRecordBuf to fit a record of at least the given length.
 * Returns true if successful, false if out of memory.
 *
 * readRecordBufSize is set to the new buffer size.
 *
 * To avoid useless small increases, round its size to a multiple of
 * XLOG_BLCKSZ, and make sure it's at least 5*Max(BLCKSZ, XLOG_BLCKSZ) to start
 * with.  (That is enough for all "normal" records, but very large commit or
 * abort records might need more space.)
 */
static bool
allocate_recordbuf(XLogReaderState *state, uint32 reclength)
{
	uint32		newSize = reclength;

	newSize += XLOG_BLCKSZ - (newSize % XLOG_BLCKSZ);
	newSize = Max(newSize, 5 * Max(BLCKSZ, XLOG_BLCKSZ));

#ifndef FRONTEND

	/*
	 * Note that in much unlucky circumstances, the random data read from a
	 * recycled segment can cause this routine to be called with a size
	 * causing a hard failure at allocation.  For a standby, this would cause
	 * the instance to stop suddenly with a hard failure, preventing it to
	 * retry fetching WAL from one of its sources which could allow it to move
	 * on with replay without a manual restart.  If the data comes from a past
	 * recycled segment and is still valid, then the allocation may succeed
	 * but record checks are going to fail so this would be short-lived.
If 178 * the allocation fails because of a memory shortage, then this is not a 179 * hard failure either per the guarantee given by MCXT_ALLOC_NO_OOM. 180 */ 181 if (!AllocSizeIsValid(newSize)) 182 return false; 183 184 #endif 185 186 if (state->readRecordBuf) 187 pfree(state->readRecordBuf); 188 state->readRecordBuf = 189 (char *) palloc_extended(newSize, MCXT_ALLOC_NO_OOM); 190 if (state->readRecordBuf == NULL) 191 { 192 state->readRecordBufSize = 0; 193 return false; 194 } 195 state->readRecordBufSize = newSize; 196 return true; 197 } 198 199 /* 200 * Attempt to read an XLOG record. 201 * 202 * If RecPtr is valid, try to read a record at that position. Otherwise 203 * try to read a record just after the last one previously read. 204 * 205 * If the read_page callback fails to read the requested data, NULL is 206 * returned. The callback is expected to have reported the error; errormsg 207 * is set to NULL. 208 * 209 * If the reading fails for some other reason, NULL is also returned, and 210 * *errormsg is set to a string with details of the failure. 211 * 212 * The returned pointer (or *errormsg) points to an internal buffer that's 213 * valid until the next call to XLogReadRecord. 214 */ 215 XLogRecord * 216 XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg) 217 { 218 XLogRecord *record; 219 XLogRecPtr targetPagePtr; 220 bool randAccess; 221 uint32 len, 222 total_len; 223 uint32 targetRecOff; 224 uint32 pageHeaderSize; 225 bool assembled; 226 bool gotheader; 227 int readOff; 228 229 /* 230 * randAccess indicates whether to verify the previous-record pointer of 231 * the record we're reading. We only do this if we're reading 232 * sequentially, which is what we initially assume. 
233 */ 234 randAccess = false; 235 236 /* reset error state */ 237 *errormsg = NULL; 238 state->errormsg_buf[0] = '\0'; 239 240 ResetDecoder(state); 241 state->abortedRecPtr = InvalidXLogRecPtr; 242 state->missingContrecPtr = InvalidXLogRecPtr; 243 244 if (RecPtr == InvalidXLogRecPtr) 245 { 246 /* No explicit start point; read the record after the one we just read */ 247 RecPtr = state->EndRecPtr; 248 249 if (state->ReadRecPtr == InvalidXLogRecPtr) 250 randAccess = true; 251 252 /* 253 * RecPtr is pointing to end+1 of the previous WAL record. If we're 254 * at a page boundary, no more records can fit on the current page. We 255 * must skip over the page header, but we can't do that until we've 256 * read in the page, since the header size is variable. 257 */ 258 } 259 else 260 { 261 /* 262 * Caller supplied a position to start at. 263 * 264 * In this case, the passed-in record pointer should already be 265 * pointing to a valid record starting position. 266 */ 267 Assert(XRecOffIsValid(RecPtr)); 268 randAccess = true; 269 } 270 271 restart: 272 state->currRecPtr = RecPtr; 273 assembled = false; 274 275 targetPagePtr = RecPtr - (RecPtr % XLOG_BLCKSZ); 276 targetRecOff = RecPtr % XLOG_BLCKSZ; 277 278 /* 279 * Read the page containing the record into state->readBuf. Request enough 280 * byte to cover the whole record header, or at least the part of it that 281 * fits on the same page. 282 */ 283 readOff = ReadPageInternal(state, 284 targetPagePtr, 285 Min(targetRecOff + SizeOfXLogRecord, XLOG_BLCKSZ)); 286 if (readOff < 0) 287 goto err; 288 289 /* 290 * ReadPageInternal always returns at least the page header, so we can 291 * examine it now. 292 */ 293 pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) state->readBuf); 294 if (targetRecOff == 0) 295 { 296 /* 297 * At page start, so skip over page header. 
298 */ 299 RecPtr += pageHeaderSize; 300 targetRecOff = pageHeaderSize; 301 } 302 else if (targetRecOff < pageHeaderSize) 303 { 304 report_invalid_record(state, "invalid record offset at %X/%X", 305 (uint32) (RecPtr >> 32), (uint32) RecPtr); 306 goto err; 307 } 308 309 if ((((XLogPageHeader) state->readBuf)->xlp_info & XLP_FIRST_IS_CONTRECORD) && 310 targetRecOff == pageHeaderSize) 311 { 312 report_invalid_record(state, "contrecord is requested by %X/%X", 313 (uint32) (RecPtr >> 32), (uint32) RecPtr); 314 goto err; 315 } 316 317 /* ReadPageInternal has verified the page header */ 318 Assert(pageHeaderSize <= readOff); 319 320 /* 321 * Read the record length. 322 * 323 * NB: Even though we use an XLogRecord pointer here, the whole record 324 * header might not fit on this page. xl_tot_len is the first field of the 325 * struct, so it must be on this page (the records are MAXALIGNed), but we 326 * cannot access any other fields until we've verified that we got the 327 * whole header. 328 */ 329 record = (XLogRecord *) (state->readBuf + RecPtr % XLOG_BLCKSZ); 330 total_len = record->xl_tot_len; 331 332 /* 333 * If the whole record header is on this page, validate it immediately. 334 * Otherwise do just a basic sanity check on xl_tot_len, and validate the 335 * rest of the header after reading it from the next page. The xl_tot_len 336 * check is necessary here to ensure that we enter the "Need to reassemble 337 * record" code path below; otherwise we might fail to apply 338 * ValidXLogRecordHeader at all. 
339 */ 340 if (targetRecOff <= XLOG_BLCKSZ - SizeOfXLogRecord) 341 { 342 if (!ValidXLogRecordHeader(state, RecPtr, state->ReadRecPtr, record, 343 randAccess)) 344 goto err; 345 gotheader = true; 346 } 347 else 348 { 349 /* XXX: more validation should be done here */ 350 if (total_len < SizeOfXLogRecord) 351 { 352 report_invalid_record(state, 353 "invalid record length at %X/%X: wanted %u, got %u", 354 (uint32) (RecPtr >> 32), (uint32) RecPtr, 355 (uint32) SizeOfXLogRecord, total_len); 356 goto err; 357 } 358 gotheader = false; 359 } 360 361 /* 362 * Enlarge readRecordBuf as needed. 363 */ 364 if (total_len > state->readRecordBufSize && 365 !allocate_recordbuf(state, total_len)) 366 { 367 /* We treat this as a "bogus data" condition */ 368 report_invalid_record(state, "record length %u at %X/%X too long", 369 total_len, 370 (uint32) (RecPtr >> 32), (uint32) RecPtr); 371 goto err; 372 } 373 374 len = XLOG_BLCKSZ - RecPtr % XLOG_BLCKSZ; 375 if (total_len > len) 376 { 377 /* Need to reassemble record */ 378 char *contdata; 379 XLogPageHeader pageHeader; 380 char *buffer; 381 uint32 gotlen; 382 383 assembled = true; 384 /* Copy the first fragment of the record from the first page. */ 385 memcpy(state->readRecordBuf, 386 state->readBuf + RecPtr % XLOG_BLCKSZ, len); 387 buffer = state->readRecordBuf + len; 388 gotlen = len; 389 390 do 391 { 392 /* Calculate pointer to beginning of next page */ 393 targetPagePtr += XLOG_BLCKSZ; 394 395 /* Wait for the next page to become available */ 396 readOff = ReadPageInternal(state, targetPagePtr, 397 Min(total_len - gotlen + SizeOfXLogShortPHD, 398 XLOG_BLCKSZ)); 399 400 if (readOff < 0) 401 goto err; 402 403 Assert(SizeOfXLogShortPHD <= readOff); 404 405 pageHeader = (XLogPageHeader) state->readBuf; 406 407 /* 408 * If we were expecting a continuation record and got an 409 * "overwrite contrecord" flag, that means the continuation record 410 * was overwritten with a different record. 
Restart the read by 411 * assuming the address to read is the location where we found 412 * this flag; but keep track of the LSN of the record we were 413 * reading, for later verification. 414 */ 415 if (pageHeader->xlp_info & XLP_FIRST_IS_OVERWRITE_CONTRECORD) 416 { 417 state->overwrittenRecPtr = state->currRecPtr; 418 ResetDecoder(state); 419 RecPtr = targetPagePtr; 420 goto restart; 421 } 422 423 /* Check that the continuation on next page looks valid */ 424 if (!(pageHeader->xlp_info & XLP_FIRST_IS_CONTRECORD)) 425 { 426 report_invalid_record(state, 427 "there is no contrecord flag at %X/%X", 428 (uint32) (RecPtr >> 32), (uint32) RecPtr); 429 goto err; 430 } 431 432 /* 433 * Cross-check that xlp_rem_len agrees with how much of the record 434 * we expect there to be left. 435 */ 436 if (pageHeader->xlp_rem_len == 0 || 437 total_len != (pageHeader->xlp_rem_len + gotlen)) 438 { 439 report_invalid_record(state, 440 "invalid contrecord length %u at %X/%X", 441 pageHeader->xlp_rem_len, 442 (uint32) (RecPtr >> 32), (uint32) RecPtr); 443 goto err; 444 } 445 446 /* Append the continuation from this page to the buffer */ 447 pageHeaderSize = XLogPageHeaderSize(pageHeader); 448 449 if (readOff < pageHeaderSize) 450 readOff = ReadPageInternal(state, targetPagePtr, 451 pageHeaderSize); 452 453 Assert(pageHeaderSize <= readOff); 454 455 contdata = (char *) state->readBuf + pageHeaderSize; 456 len = XLOG_BLCKSZ - pageHeaderSize; 457 if (pageHeader->xlp_rem_len < len) 458 len = pageHeader->xlp_rem_len; 459 460 if (readOff < pageHeaderSize + len) 461 readOff = ReadPageInternal(state, targetPagePtr, 462 pageHeaderSize + len); 463 464 memcpy(buffer, (char *) contdata, len); 465 buffer += len; 466 gotlen += len; 467 468 /* If we just reassembled the record header, validate it. 
*/ 469 if (!gotheader) 470 { 471 record = (XLogRecord *) state->readRecordBuf; 472 if (!ValidXLogRecordHeader(state, RecPtr, state->ReadRecPtr, 473 record, randAccess)) 474 goto err; 475 gotheader = true; 476 } 477 } while (gotlen < total_len); 478 479 Assert(gotheader); 480 481 record = (XLogRecord *) state->readRecordBuf; 482 if (!ValidXLogRecord(state, record, RecPtr)) 483 goto err; 484 485 pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) state->readBuf); 486 state->ReadRecPtr = RecPtr; 487 state->EndRecPtr = targetPagePtr + pageHeaderSize 488 + MAXALIGN(pageHeader->xlp_rem_len); 489 } 490 else 491 { 492 /* Wait for the record data to become available */ 493 readOff = ReadPageInternal(state, targetPagePtr, 494 Min(targetRecOff + total_len, XLOG_BLCKSZ)); 495 if (readOff < 0) 496 goto err; 497 498 /* Record does not cross a page boundary */ 499 if (!ValidXLogRecord(state, record, RecPtr)) 500 goto err; 501 502 state->EndRecPtr = RecPtr + MAXALIGN(total_len); 503 504 state->ReadRecPtr = RecPtr; 505 memcpy(state->readRecordBuf, record, total_len); 506 } 507 508 /* 509 * Special processing if it's an XLOG SWITCH record 510 */ 511 if (record->xl_rmid == RM_XLOG_ID && 512 (record->xl_info & ~XLR_INFO_MASK) == XLOG_SWITCH) 513 { 514 /* Pretend it extends to end of segment */ 515 state->EndRecPtr += state->wal_segment_size - 1; 516 state->EndRecPtr -= XLogSegmentOffset(state->EndRecPtr, state->wal_segment_size); 517 } 518 519 if (DecodeXLogRecord(state, record, errormsg)) 520 return record; 521 else 522 return NULL; 523 524 err: 525 if (assembled) 526 { 527 /* 528 * We get here when a record that spans multiple pages needs to be 529 * assembled, but something went wrong -- perhaps a contrecord piece 530 * was lost. 
If caller is WAL replay, it will know where the aborted 531 * record was and where to direct followup WAL to be written, marking 532 * the next piece with XLP_FIRST_IS_OVERWRITE_CONTRECORD, which will 533 * in turn signal downstream WAL consumers that the broken WAL record 534 * is to be ignored. 535 */ 536 state->abortedRecPtr = RecPtr; 537 state->missingContrecPtr = targetPagePtr; 538 } 539 540 /* 541 * Invalidate the read state. We might read from a different source after 542 * failure. 543 */ 544 XLogReaderInvalReadState(state); 545 546 if (state->errormsg_buf[0] != '\0') 547 *errormsg = state->errormsg_buf; 548 549 return NULL; 550 } 551 552 /* 553 * Read a single xlog page including at least [pageptr, reqLen] of valid data 554 * via the read_page() callback. 555 * 556 * Returns -1 if the required page cannot be read for some reason; errormsg_buf 557 * is set in that case (unless the error occurs in the read_page callback). 558 * 559 * We fetch the page from a reader-local cache if we know we have the required 560 * data and if there hasn't been any error since caching the data. 561 */ 562 static int 563 ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen) 564 { 565 int readLen; 566 uint32 targetPageOff; 567 XLogSegNo targetSegNo; 568 XLogPageHeader hdr; 569 570 Assert((pageptr % XLOG_BLCKSZ) == 0); 571 572 XLByteToSeg(pageptr, targetSegNo, state->wal_segment_size); 573 targetPageOff = XLogSegmentOffset(pageptr, state->wal_segment_size); 574 575 /* check whether we have all the requested data already */ 576 if (targetSegNo == state->readSegNo && targetPageOff == state->readOff && 577 reqLen < state->readLen) 578 return state->readLen; 579 580 /* 581 * Data is not in our buffer. 582 * 583 * Every time we actually read the page, even if we looked at parts of it 584 * before, we need to do verification as the read_page callback might now 585 * be rereading data from a different source. 
 *
	 * Whenever switching to a new WAL segment, we read the first page of the
	 * file and validate its header, even if that's not where the target
	 * record is.  This is so that we can check the additional identification
	 * info that is present in the first page's "long" header.
	 */
	if (targetSegNo != state->readSegNo && targetPageOff != 0)
	{
		XLogRecPtr	targetSegmentPtr = pageptr - targetPageOff;

		readLen = state->read_page(state, targetSegmentPtr, XLOG_BLCKSZ,
								   state->currRecPtr,
								   state->readBuf, &state->readPageTLI);
		if (readLen < 0)
			goto err;

		/* we can be sure to have enough WAL available, we scrolled back */
		Assert(readLen == XLOG_BLCKSZ);

		if (!XLogReaderValidatePageHeader(state, targetSegmentPtr,
										  state->readBuf))
			goto err;
	}

	/*
	 * First, read the requested data length, but at least a short page header
	 * so that we can validate it.
	 */
	readLen = state->read_page(state, pageptr, Max(reqLen, SizeOfXLogShortPHD),
							   state->currRecPtr,
							   state->readBuf, &state->readPageTLI);
	if (readLen < 0)
		goto err;

	Assert(readLen <= XLOG_BLCKSZ);

	/* Do we have enough data to check the header length? */
	if (readLen <= SizeOfXLogShortPHD)
		goto err;

	Assert(readLen >= reqLen);

	hdr = (XLogPageHeader) state->readBuf;

	/* still not enough: re-read now that we know the actual header length */
	if (readLen < XLogPageHeaderSize(hdr))
	{
		readLen = state->read_page(state, pageptr, XLogPageHeaderSize(hdr),
								   state->currRecPtr,
								   state->readBuf, &state->readPageTLI);
		if (readLen < 0)
			goto err;
	}

	/*
	 * Now that we know we have the full header, validate it.
	 */
	if (!XLogReaderValidatePageHeader(state, pageptr, (char *) hdr))
		goto err;

	/* update read state information (marks the cache as valid) */
	state->readSegNo = targetSegNo;
	state->readOff = targetPageOff;
	state->readLen = readLen;

	return readLen;

err:
	/* drop the cached page so the next call re-reads from scratch */
	XLogReaderInvalReadState(state);
	return -1;
}

/*
 * Invalidate the xlogreader's read state to force a re-read.
 */
void
XLogReaderInvalReadState(XLogReaderState *state)
{
	state->readSegNo = 0;
	state->readOff = 0;
	state->readLen = 0;
}

/*
 * Validate an XLOG record header.
 *
 * This is just a convenience subroutine to avoid duplicated code in
 * XLogReadRecord.  It's not intended for use from anywhere else.
 */
static bool
ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr,
					  XLogRecPtr PrevRecPtr, XLogRecord *record,
					  bool randAccess)
{
	if (record->xl_tot_len < SizeOfXLogRecord)
	{
		report_invalid_record(state,
							  "invalid record length at %X/%X: wanted %u, got %u",
							  (uint32) (RecPtr >> 32), (uint32) RecPtr,
							  (uint32) SizeOfXLogRecord, record->xl_tot_len);
		return false;
	}
	if (record->xl_rmid > RM_MAX_ID)
	{
		report_invalid_record(state,
							  "invalid resource manager ID %u at %X/%X",
							  record->xl_rmid, (uint32) (RecPtr >> 32),
							  (uint32) RecPtr);
		return false;
	}
	if (randAccess)
	{
		/*
		 * We can't exactly verify the prev-link, but surely it should be less
		 * than the record's own address.
		 */
		if (!(record->xl_prev < RecPtr))
		{
			report_invalid_record(state,
								  "record with incorrect prev-link %X/%X at %X/%X",
								  (uint32) (record->xl_prev >> 32),
								  (uint32) record->xl_prev,
								  (uint32) (RecPtr >> 32), (uint32) RecPtr);
			return false;
		}
	}
	else
	{
		/*
		 * Record's prev-link should exactly match our previous location.
This
		 * check guards against torn WAL pages where a stale but valid-looking
		 * WAL record starts on a sector boundary.
		 */
		if (record->xl_prev != PrevRecPtr)
		{
			report_invalid_record(state,
								  "record with incorrect prev-link %X/%X at %X/%X",
								  (uint32) (record->xl_prev >> 32),
								  (uint32) record->xl_prev,
								  (uint32) (RecPtr >> 32), (uint32) RecPtr);
			return false;
		}
	}

	return true;
}


/*
 * CRC-check an XLOG record.  We do not believe the contents of an XLOG
 * record (other than to the minimal extent of computing the amount of
 * data to read in) until we've checked the CRCs.
 *
 * We assume all of the record (that is, xl_tot_len bytes) has been read
 * into memory at *record.  Also, ValidXLogRecordHeader() has accepted the
 * record's header, which means in particular that xl_tot_len is at least
 * SizeOfXlogRecord.
 */
static bool
ValidXLogRecord(XLogReaderState *state, XLogRecord *record, XLogRecPtr recptr)
{
	pg_crc32c	crc;

	/* Calculate the CRC: first the payload after the header ... */
	INIT_CRC32C(crc);
	COMP_CRC32C(crc, ((char *) record) + SizeOfXLogRecord, record->xl_tot_len - SizeOfXLogRecord);
	/* ... then include the record header last, up to (not including) xl_crc */
	COMP_CRC32C(crc, (char *) record, offsetof(XLogRecord, xl_crc));
	FIN_CRC32C(crc);

	if (!EQ_CRC32C(record->xl_crc, crc))
	{
		report_invalid_record(state,
							  "incorrect resource manager data checksum in record at %X/%X",
							  (uint32) (recptr >> 32), (uint32) recptr);
		return false;
	}

	return true;
}

/*
 * Validate a page header.
 *
 * Check if 'phdr' is valid as the header of the XLog page at position
 * 'recptr'.
 */
bool
XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr,
							 char *phdr)
{
	XLogRecPtr	recaddr;
	XLogSegNo	segno;
	int32		offset;
	XLogPageHeader hdr = (XLogPageHeader) phdr;

	Assert((recptr % XLOG_BLCKSZ) == 0);

	XLByteToSeg(recptr, segno, state->wal_segment_size);
	offset = XLogSegmentOffset(recptr, state->wal_segment_size);

	XLogSegNoOffsetToRecPtr(segno, offset, state->wal_segment_size, recaddr);

	if (hdr->xlp_magic != XLOG_PAGE_MAGIC)
	{
		char		fname[MAXFNAMELEN];

		XLogFileName(fname, state->readPageTLI, segno, state->wal_segment_size);

		report_invalid_record(state,
							  "invalid magic number %04X in log segment %s, offset %u",
							  hdr->xlp_magic,
							  fname,
							  offset);
		return false;
	}

	if ((hdr->xlp_info & ~XLP_ALL_FLAGS) != 0)
	{
		char		fname[MAXFNAMELEN];

		XLogFileName(fname, state->readPageTLI, segno, state->wal_segment_size);

		report_invalid_record(state,
							  "invalid info bits %04X in log segment %s, offset %u",
							  hdr->xlp_info,
							  fname,
							  offset);
		return false;
	}

	if (hdr->xlp_info & XLP_LONG_HEADER)
	{
		XLogLongPageHeader longhdr = (XLogLongPageHeader) hdr;

		/* A zero system_identifier in our state means "don't check it". */
		if (state->system_identifier &&
			longhdr->xlp_sysid != state->system_identifier)
		{
			char		fhdrident_str[32];
			char		sysident_str[32];

			/*
			 * Format sysids separately to keep platform-dependent format code
			 * out of the translatable message string.
			 */
			snprintf(fhdrident_str, sizeof(fhdrident_str), UINT64_FORMAT,
					 longhdr->xlp_sysid);
			snprintf(sysident_str, sizeof(sysident_str), UINT64_FORMAT,
					 state->system_identifier);
			report_invalid_record(state,
								  "WAL file is from different database system: WAL file database system identifier is %s, pg_control database system identifier is %s",
								  fhdrident_str, sysident_str);
			return false;
		}
		else if (longhdr->xlp_seg_size != state->wal_segment_size)
		{
			report_invalid_record(state,
								  "WAL file is from different database system: incorrect segment size in page header");
			return false;
		}
		else if (longhdr->xlp_xlog_blcksz != XLOG_BLCKSZ)
		{
			report_invalid_record(state,
								  "WAL file is from different database system: incorrect XLOG_BLCKSZ in page header");
			return false;
		}
	}
	else if (offset == 0)
	{
		char		fname[MAXFNAMELEN];

		XLogFileName(fname, state->readPageTLI, segno, state->wal_segment_size);

		/* hmm, first page of file doesn't have a long header? */
		report_invalid_record(state,
							  "invalid info bits %04X in log segment %s, offset %u",
							  hdr->xlp_info,
							  fname,
							  offset);
		return false;
	}

	/*
	 * Check that the address on the page agrees with what we expected. This
	 * check typically fails when an old WAL segment is recycled, and hasn't
	 * yet been overwritten with new data yet.
	 */
	if (hdr->xlp_pageaddr != recaddr)
	{
		char		fname[MAXFNAMELEN];

		XLogFileName(fname, state->readPageTLI, segno, state->wal_segment_size);

		report_invalid_record(state,
							  "unexpected pageaddr %X/%X in log segment %s, offset %u",
							  (uint32) (hdr->xlp_pageaddr >> 32), (uint32) hdr->xlp_pageaddr,
							  fname,
							  offset);
		return false;
	}

	/*
	 * Since child timelines are always assigned a TLI greater than their
	 * immediate parent's TLI, we should never see TLI go backwards across
	 * successive pages of a consistent WAL sequence.
	 *
	 * Sometimes we re-read a segment that's already been (partially) read. So
	 * we only verify TLIs for pages that are later than the last remembered
	 * LSN.
	 */
	if (recptr > state->latestPagePtr)
	{
		if (hdr->xlp_tli < state->latestPageTLI)
		{
			char		fname[MAXFNAMELEN];

			XLogFileName(fname, state->readPageTLI, segno, state->wal_segment_size);

			report_invalid_record(state,
								  "out-of-sequence timeline ID %u (after %u) in log segment %s, offset %u",
								  hdr->xlp_tli,
								  state->latestPageTLI,
								  fname,
								  offset);
			return false;
		}
	}
	/* Page accepted: remember it for the TLI monotonicity check above. */
	state->latestPagePtr = recptr;
	state->latestPageTLI = hdr->xlp_tli;

	return true;
}

#ifdef FRONTEND
/*
 * Functions that are currently not needed in the backend, but are better
 * implemented inside xlogreader.c because of the internal facilities available
 * here.
 */

/*
 * Find the first record with an lsn >= RecPtr.
 *
 * Useful for checking whether RecPtr is a valid xlog address for reading, and
 * to find the first valid address after some address when dumping records for
 * debugging purposes.
 */
XLogRecPtr
XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr)
{
	/* Save the reader's position so we can restore it before returning. */
	XLogReaderState saved_state = *state;
	XLogRecPtr	tmpRecPtr;
	XLogRecPtr	found = InvalidXLogRecPtr;
	XLogPageHeader header;
	char	   *errormsg;

	Assert(!XLogRecPtrIsInvalid(RecPtr));

	/*
	 * skip over potential continuation data, keeping in mind that it may span
	 * multiple pages
	 */
	tmpRecPtr = RecPtr;
	while (true)
	{
		XLogRecPtr	targetPagePtr;
		int			targetRecOff;
		uint32		pageHeaderSize;
		int			readLen;

		/*
		 * Compute targetRecOff. It should typically be equal or greater than
		 * short page-header since a valid record can't start anywhere before
		 * that, except when caller has explicitly specified the offset that
		 * falls somewhere there or when we are skipping multi-page
		 * continuation record. It doesn't matter though because
		 * ReadPageInternal() is prepared to handle that and will read at
		 * least short page-header worth of data
		 */
		targetRecOff = tmpRecPtr % XLOG_BLCKSZ;

		/* scroll back to page boundary */
		targetPagePtr = tmpRecPtr - targetRecOff;

		/* Read the page containing the record */
		readLen = ReadPageInternal(state, targetPagePtr, targetRecOff);
		if (readLen < 0)
			goto err;

		header = (XLogPageHeader) state->readBuf;

		pageHeaderSize = XLogPageHeaderSize(header);

		/* make sure we have enough data for the page header */
		readLen = ReadPageInternal(state, targetPagePtr, pageHeaderSize);
		if (readLen < 0)
			goto err;

		/* skip over potential continuation data */
		if (header->xlp_info & XLP_FIRST_IS_CONTRECORD)
		{
			/*
			 * If the length of the remaining continuation data is more than
			 * what can fit in this page, the continuation record crosses over
			 * this page. Read the next page and try again.  xlp_rem_len in the
			 * next page header will contain the remaining length of the
			 * continuation data
			 *
			 * Note that record headers are MAXALIGN'ed
			 */
			if (MAXALIGN(header->xlp_rem_len) >= (XLOG_BLCKSZ - pageHeaderSize))
				tmpRecPtr = targetPagePtr + XLOG_BLCKSZ;
			else
			{
				/*
				 * The previous continuation record ends in this page. Set
				 * tmpRecPtr to point to the first valid record
				 */
				tmpRecPtr = targetPagePtr + pageHeaderSize
					+ MAXALIGN(header->xlp_rem_len);
				break;
			}
		}
		else
		{
			tmpRecPtr = targetPagePtr + pageHeaderSize;
			break;
		}
	}

	/*
	 * we know now that tmpRecPtr is an address pointing to a valid XLogRecord
	 * because either we're at the first record after the beginning of a page
	 * or we just jumped over the remaining data of a continuation.
	 */
	while (XLogReadRecord(state, tmpRecPtr, &errormsg) != NULL)
	{
		/* continue after the record (InvalidXLogRecPtr = "read next") */
		tmpRecPtr = InvalidXLogRecPtr;

		/* past the record we've found, break out */
		if (RecPtr <= state->ReadRecPtr)
		{
			found = state->ReadRecPtr;
			goto out;
		}
	}

err:
out:
	/* Reset state to what we had before finding the record */
	state->ReadRecPtr = saved_state.ReadRecPtr;
	state->EndRecPtr = saved_state.EndRecPtr;
	XLogReaderInvalReadState(state);

	return found;
}

#endif							/* FRONTEND */


/* ----------------------------------------
 * Functions for decoding the data and block references in a record.
 * ----------------------------------------
 */

/* private function to reset the state between records */
static void
ResetDecoder(XLogReaderState *state)
{
	int			block_id;

	state->decoded_record = NULL;

	state->main_data_len = 0;

	/* Only blocks up to max_block_id can have been marked in_use. */
	for (block_id = 0; block_id <= state->max_block_id; block_id++)
	{
		state->blocks[block_id].in_use = false;
		state->blocks[block_id].has_image = false;
		state->blocks[block_id].has_data = false;
		state->blocks[block_id].apply_image = false;
	}
	state->max_block_id = -1;
}

/*
 * Decode the previously read record.
 *
 * On error, a human-readable error message is returned in *errormsg, and
 * the return value is false.
 */
bool
DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg)
{
	/*
	 * read next _size bytes from record buffer, but check for overrun first.
	 * (ptr/remaining are locals of this function; goto target handles the
	 * truncated-record case.)
	 */
#define COPY_HEADER_FIELD(_dst, _size)			\
	do {										\
		if (remaining < _size)					\
			goto shortdata_err;					\
		memcpy(_dst, ptr, _size);				\
		ptr += _size;							\
		remaining -= _size;						\
	} while(0)

	char	   *ptr;
	uint32		remaining;
	uint32		datatotal;
	RelFileNode *rnode = NULL;
	uint8		block_id;

	ResetDecoder(state);

	state->decoded_record = record;
	state->record_origin = InvalidRepOriginId;

	ptr = (char *) record;
	ptr += SizeOfXLogRecord;
	remaining = record->xl_tot_len - SizeOfXLogRecord;

	/* Decode the headers */
	datatotal = 0;
	while (remaining > datatotal)
	{
		COPY_HEADER_FIELD(&block_id, sizeof(uint8));

		if (block_id == XLR_BLOCK_ID_DATA_SHORT)
		{
			/* XLogRecordDataHeaderShort */
			uint8		main_data_len;

			COPY_HEADER_FIELD(&main_data_len, sizeof(uint8));

			state->main_data_len = main_data_len;
			datatotal += main_data_len;
			break;				/* by convention, the main
data fragment is 1124 * always last */ 1125 } 1126 else if (block_id == XLR_BLOCK_ID_DATA_LONG) 1127 { 1128 /* XLogRecordDataHeaderLong */ 1129 uint32 main_data_len; 1130 1131 COPY_HEADER_FIELD(&main_data_len, sizeof(uint32)); 1132 state->main_data_len = main_data_len; 1133 datatotal += main_data_len; 1134 break; /* by convention, the main data fragment is 1135 * always last */ 1136 } 1137 else if (block_id == XLR_BLOCK_ID_ORIGIN) 1138 { 1139 COPY_HEADER_FIELD(&state->record_origin, sizeof(RepOriginId)); 1140 } 1141 else if (block_id <= XLR_MAX_BLOCK_ID) 1142 { 1143 /* XLogRecordBlockHeader */ 1144 DecodedBkpBlock *blk; 1145 uint8 fork_flags; 1146 1147 if (block_id <= state->max_block_id) 1148 { 1149 report_invalid_record(state, 1150 "out-of-order block_id %u at %X/%X", 1151 block_id, 1152 (uint32) (state->ReadRecPtr >> 32), 1153 (uint32) state->ReadRecPtr); 1154 goto err; 1155 } 1156 state->max_block_id = block_id; 1157 1158 blk = &state->blocks[block_id]; 1159 blk->in_use = true; 1160 blk->apply_image = false; 1161 1162 COPY_HEADER_FIELD(&fork_flags, sizeof(uint8)); 1163 blk->forknum = fork_flags & BKPBLOCK_FORK_MASK; 1164 blk->flags = fork_flags; 1165 blk->has_image = ((fork_flags & BKPBLOCK_HAS_IMAGE) != 0); 1166 blk->has_data = ((fork_flags & BKPBLOCK_HAS_DATA) != 0); 1167 1168 COPY_HEADER_FIELD(&blk->data_len, sizeof(uint16)); 1169 /* cross-check that the HAS_DATA flag is set iff data_length > 0 */ 1170 if (blk->has_data && blk->data_len == 0) 1171 { 1172 report_invalid_record(state, 1173 "BKPBLOCK_HAS_DATA set, but no data included at %X/%X", 1174 (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr); 1175 goto err; 1176 } 1177 if (!blk->has_data && blk->data_len != 0) 1178 { 1179 report_invalid_record(state, 1180 "BKPBLOCK_HAS_DATA not set, but data length is %u at %X/%X", 1181 (unsigned int) blk->data_len, 1182 (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr); 1183 goto err; 1184 } 1185 datatotal += blk->data_len; 1186 1187 if 
(blk->has_image) 1188 { 1189 COPY_HEADER_FIELD(&blk->bimg_len, sizeof(uint16)); 1190 COPY_HEADER_FIELD(&blk->hole_offset, sizeof(uint16)); 1191 COPY_HEADER_FIELD(&blk->bimg_info, sizeof(uint8)); 1192 1193 blk->apply_image = ((blk->bimg_info & BKPIMAGE_APPLY) != 0); 1194 1195 if (blk->bimg_info & BKPIMAGE_IS_COMPRESSED) 1196 { 1197 if (blk->bimg_info & BKPIMAGE_HAS_HOLE) 1198 COPY_HEADER_FIELD(&blk->hole_length, sizeof(uint16)); 1199 else 1200 blk->hole_length = 0; 1201 } 1202 else 1203 blk->hole_length = BLCKSZ - blk->bimg_len; 1204 datatotal += blk->bimg_len; 1205 1206 /* 1207 * cross-check that hole_offset > 0, hole_length > 0 and 1208 * bimg_len < BLCKSZ if the HAS_HOLE flag is set. 1209 */ 1210 if ((blk->bimg_info & BKPIMAGE_HAS_HOLE) && 1211 (blk->hole_offset == 0 || 1212 blk->hole_length == 0 || 1213 blk->bimg_len == BLCKSZ)) 1214 { 1215 report_invalid_record(state, 1216 "BKPIMAGE_HAS_HOLE set, but hole offset %u length %u block image length %u at %X/%X", 1217 (unsigned int) blk->hole_offset, 1218 (unsigned int) blk->hole_length, 1219 (unsigned int) blk->bimg_len, 1220 (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr); 1221 goto err; 1222 } 1223 1224 /* 1225 * cross-check that hole_offset == 0 and hole_length == 0 if 1226 * the HAS_HOLE flag is not set. 1227 */ 1228 if (!(blk->bimg_info & BKPIMAGE_HAS_HOLE) && 1229 (blk->hole_offset != 0 || blk->hole_length != 0)) 1230 { 1231 report_invalid_record(state, 1232 "BKPIMAGE_HAS_HOLE not set, but hole offset %u length %u at %X/%X", 1233 (unsigned int) blk->hole_offset, 1234 (unsigned int) blk->hole_length, 1235 (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr); 1236 goto err; 1237 } 1238 1239 /* 1240 * cross-check that bimg_len < BLCKSZ if the IS_COMPRESSED 1241 * flag is set. 
1242 */ 1243 if ((blk->bimg_info & BKPIMAGE_IS_COMPRESSED) && 1244 blk->bimg_len == BLCKSZ) 1245 { 1246 report_invalid_record(state, 1247 "BKPIMAGE_IS_COMPRESSED set, but block image length %u at %X/%X", 1248 (unsigned int) blk->bimg_len, 1249 (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr); 1250 goto err; 1251 } 1252 1253 /* 1254 * cross-check that bimg_len = BLCKSZ if neither HAS_HOLE nor 1255 * IS_COMPRESSED flag is set. 1256 */ 1257 if (!(blk->bimg_info & BKPIMAGE_HAS_HOLE) && 1258 !(blk->bimg_info & BKPIMAGE_IS_COMPRESSED) && 1259 blk->bimg_len != BLCKSZ) 1260 { 1261 report_invalid_record(state, 1262 "neither BKPIMAGE_HAS_HOLE nor BKPIMAGE_IS_COMPRESSED set, but block image length is %u at %X/%X", 1263 (unsigned int) blk->data_len, 1264 (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr); 1265 goto err; 1266 } 1267 } 1268 if (!(fork_flags & BKPBLOCK_SAME_REL)) 1269 { 1270 COPY_HEADER_FIELD(&blk->rnode, sizeof(RelFileNode)); 1271 rnode = &blk->rnode; 1272 } 1273 else 1274 { 1275 if (rnode == NULL) 1276 { 1277 report_invalid_record(state, 1278 "BKPBLOCK_SAME_REL set but no previous rel at %X/%X", 1279 (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr); 1280 goto err; 1281 } 1282 1283 blk->rnode = *rnode; 1284 } 1285 COPY_HEADER_FIELD(&blk->blkno, sizeof(BlockNumber)); 1286 } 1287 else 1288 { 1289 report_invalid_record(state, 1290 "invalid block_id %u at %X/%X", 1291 block_id, 1292 (uint32) (state->ReadRecPtr >> 32), 1293 (uint32) state->ReadRecPtr); 1294 goto err; 1295 } 1296 } 1297 1298 if (remaining != datatotal) 1299 goto shortdata_err; 1300 1301 /* 1302 * Ok, we've parsed the fragment headers, and verified that the total 1303 * length of the payload in the fragments is equal to the amount of data 1304 * left. Copy the data of each fragment to a separate buffer. 1305 * 1306 * We could just set up pointers into readRecordBuf, but we want to align 1307 * the data for the convenience of the callers. 
Backup images are not 1308 * copied, however; they don't need alignment. 1309 */ 1310 1311 /* block data first */ 1312 for (block_id = 0; block_id <= state->max_block_id; block_id++) 1313 { 1314 DecodedBkpBlock *blk = &state->blocks[block_id]; 1315 1316 if (!blk->in_use) 1317 continue; 1318 1319 Assert(blk->has_image || !blk->apply_image); 1320 1321 if (blk->has_image) 1322 { 1323 blk->bkp_image = ptr; 1324 ptr += blk->bimg_len; 1325 } 1326 if (blk->has_data) 1327 { 1328 if (!blk->data || blk->data_len > blk->data_bufsz) 1329 { 1330 if (blk->data) 1331 pfree(blk->data); 1332 1333 /* 1334 * Force the initial request to be BLCKSZ so that we don't 1335 * waste time with lots of trips through this stanza as a 1336 * result of WAL compression. 1337 */ 1338 blk->data_bufsz = MAXALIGN(Max(blk->data_len, BLCKSZ)); 1339 blk->data = palloc(blk->data_bufsz); 1340 } 1341 memcpy(blk->data, ptr, blk->data_len); 1342 ptr += blk->data_len; 1343 } 1344 } 1345 1346 /* and finally, the main data */ 1347 if (state->main_data_len > 0) 1348 { 1349 if (!state->main_data || state->main_data_len > state->main_data_bufsz) 1350 { 1351 if (state->main_data) 1352 pfree(state->main_data); 1353 1354 /* 1355 * main_data_bufsz must be MAXALIGN'ed. In many xlog record 1356 * types, we omit trailing struct padding on-disk to save a few 1357 * bytes; but compilers may generate accesses to the xlog struct 1358 * that assume that padding bytes are present. If the palloc 1359 * request is not large enough to include such padding bytes then 1360 * we'll get valgrind complaints due to otherwise-harmless fetches 1361 * of the padding bytes. 1362 * 1363 * In addition, force the initial request to be reasonably large 1364 * so that we don't waste time with lots of trips through this 1365 * stanza. BLCKSZ / 2 seems like a good compromise choice. 
1366 */ 1367 state->main_data_bufsz = MAXALIGN(Max(state->main_data_len, 1368 BLCKSZ / 2)); 1369 state->main_data = palloc(state->main_data_bufsz); 1370 } 1371 memcpy(state->main_data, ptr, state->main_data_len); 1372 ptr += state->main_data_len; 1373 } 1374 1375 return true; 1376 1377 shortdata_err: 1378 report_invalid_record(state, 1379 "record with invalid length at %X/%X", 1380 (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr); 1381 err: 1382 *errormsg = state->errormsg_buf; 1383 1384 return false; 1385 } 1386 1387 /* 1388 * Returns information about the block that a block reference refers to. 1389 * 1390 * If the WAL record contains a block reference with the given ID, *rnode, 1391 * *forknum, and *blknum are filled in (if not NULL), and returns true. 1392 * Otherwise returns false. 1393 */ 1394 bool 1395 XLogRecGetBlockTag(XLogReaderState *record, uint8 block_id, 1396 RelFileNode *rnode, ForkNumber *forknum, BlockNumber *blknum) 1397 { 1398 DecodedBkpBlock *bkpb; 1399 1400 if (!record->blocks[block_id].in_use) 1401 return false; 1402 1403 bkpb = &record->blocks[block_id]; 1404 if (rnode) 1405 *rnode = bkpb->rnode; 1406 if (forknum) 1407 *forknum = bkpb->forknum; 1408 if (blknum) 1409 *blknum = bkpb->blkno; 1410 return true; 1411 } 1412 1413 /* 1414 * Returns the data associated with a block reference, or NULL if there is 1415 * no data (e.g. because a full-page image was taken instead). The returned 1416 * pointer points to a MAXALIGNed buffer. 
1417 */ 1418 char * 1419 XLogRecGetBlockData(XLogReaderState *record, uint8 block_id, Size *len) 1420 { 1421 DecodedBkpBlock *bkpb; 1422 1423 if (!record->blocks[block_id].in_use) 1424 return NULL; 1425 1426 bkpb = &record->blocks[block_id]; 1427 1428 if (!bkpb->has_data) 1429 { 1430 if (len) 1431 *len = 0; 1432 return NULL; 1433 } 1434 else 1435 { 1436 if (len) 1437 *len = bkpb->data_len; 1438 return bkpb->data; 1439 } 1440 } 1441 1442 /* 1443 * Restore a full-page image from a backup block attached to an XLOG record. 1444 * 1445 * Returns true if a full-page image is restored. 1446 */ 1447 bool 1448 RestoreBlockImage(XLogReaderState *record, uint8 block_id, char *page) 1449 { 1450 DecodedBkpBlock *bkpb; 1451 char *ptr; 1452 PGAlignedBlock tmp; 1453 1454 if (!record->blocks[block_id].in_use) 1455 return false; 1456 if (!record->blocks[block_id].has_image) 1457 return false; 1458 1459 bkpb = &record->blocks[block_id]; 1460 ptr = bkpb->bkp_image; 1461 1462 if (bkpb->bimg_info & BKPIMAGE_IS_COMPRESSED) 1463 { 1464 /* If a backup block image is compressed, decompress it */ 1465 if (pglz_decompress(ptr, bkpb->bimg_len, tmp.data, 1466 BLCKSZ - bkpb->hole_length) < 0) 1467 { 1468 report_invalid_record(record, "invalid compressed image at %X/%X, block %d", 1469 (uint32) (record->ReadRecPtr >> 32), 1470 (uint32) record->ReadRecPtr, 1471 block_id); 1472 return false; 1473 } 1474 ptr = tmp.data; 1475 } 1476 1477 /* generate page, taking into account hole if necessary */ 1478 if (bkpb->hole_length == 0) 1479 { 1480 memcpy(page, ptr, BLCKSZ); 1481 } 1482 else 1483 { 1484 memcpy(page, ptr, bkpb->hole_offset); 1485 /* must zero-fill the hole */ 1486 MemSet(page + bkpb->hole_offset, 0, bkpb->hole_length); 1487 memcpy(page + (bkpb->hole_offset + bkpb->hole_length), 1488 ptr + bkpb->hole_offset, 1489 BLCKSZ - (bkpb->hole_offset + bkpb->hole_length)); 1490 } 1491 1492 return true; 1493 } 1494