1 /*-------------------------------------------------------------------------
2 *
3 * xloginsert.c
4 * Functions for constructing WAL records
5 *
6 * Constructing a WAL record begins with a call to XLogBeginInsert,
7 * followed by a number of XLogRegister* calls. The registered data is
8 * collected in private working memory, and finally assembled into a chain
9 * of XLogRecData structs by a call to XLogRecordAssemble(). See
10 * access/transam/README for details.
11 *
12 * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
13 * Portions Copyright (c) 1994, Regents of the University of California
14 *
15 * src/backend/access/transam/xloginsert.c
16 *
17 *-------------------------------------------------------------------------
18 */
19
20 #include "postgres.h"
21
22 #include "access/xact.h"
23 #include "access/xlog.h"
24 #include "access/xlog_internal.h"
25 #include "access/xloginsert.h"
26 #include "catalog/pg_control.h"
27 #include "common/pg_lzcompress.h"
28 #include "miscadmin.h"
29 #include "replication/origin.h"
30 #include "storage/bufmgr.h"
31 #include "storage/proc.h"
32 #include "utils/memutils.h"
33 #include "pg_trace.h"
34
/* Buffer size required to store a compressed version of backup block image */
#define PGLZ_MAX_BLCKSZ PGLZ_MAX_OUTPUT(BLCKSZ)

/*
 * For each block reference registered with XLogRegisterBuffer, we fill in
 * a registered_buffer struct.  One slot in the registered_buffers array
 * corresponds to one block_id in the record being built.
 */
typedef struct
{
	bool		in_use;			/* is this slot in use? */
	uint8		flags;			/* REGBUF_* flags */
	RelFileNode rnode;			/* identifies the relation and block */
	ForkNumber	forkno;
	BlockNumber block;
	Page		page;			/* page content */
	uint32		rdata_len;		/* total length of data in rdata chain */
	XLogRecData *rdata_head;	/* head of the chain of data registered with
								 * this block */
	XLogRecData *rdata_tail;	/* last entry in the chain, or &rdata_head if
								 * empty */

	XLogRecData bkp_rdatas[2];	/* temporary rdatas used to hold references to
								 * backup block data in XLogRecordAssemble();
								 * two are needed when the page "hole" splits
								 * the image into two pieces */

	/* buffer to store a compressed version of backup block image */
	char		compressed_page[PGLZ_MAX_BLCKSZ];
} registered_buffer;
62
static registered_buffer *registered_buffers;
static int	max_registered_buffers;		/* allocated size */
static int	max_registered_block_id = 0;	/* highest block_id + 1 currently
											 * registered */

/*
 * A chain of XLogRecDatas to hold the "main data" of a WAL record, registered
 * with XLogRegisterData(...).
 */
static XLogRecData *mainrdata_head;
static XLogRecData *mainrdata_last = (XLogRecData *) &mainrdata_head;
static uint32 mainrdata_len;	/* total # of bytes in chain */

/* flags for the in-progress insertion */
static uint8 curinsert_flags = 0;

/*
 * These are used to hold the record header while constructing a record.
 * 'hdr_scratch' is not a plain variable, but is palloc'd at initialization,
 * because we want it to be MAXALIGNed and padding bytes zeroed.
 *
 * For simplicity, it's allocated large enough to hold the headers for any
 * WAL record.
 */
static XLogRecData hdr_rdt;
static char *hdr_scratch = NULL;

/* size of the replication-origin chunk appended to the header scratch area */
#define SizeOfXlogOrigin	(sizeof(RepOriginId) + sizeof(char))

/*
 * Worst-case size of the header area of a record: the fixed record header,
 * a maximal block header for every possible block reference, plus the long
 * form of the main-data header and the origin chunk.
 */
#define HEADER_SCRATCH_SIZE \
	(SizeOfXLogRecord + \
	 MaxSizeOfXLogRecordBlockHeader * (XLR_MAX_BLOCK_ID + 1) + \
	 SizeOfXLogRecordDataHeaderLong + SizeOfXlogOrigin)

/*
 * An array of XLogRecData structs, to hold registered data.
 */
static XLogRecData *rdatas;
static int	num_rdatas;			/* entries currently used */
static int	max_rdatas;			/* allocated size */

/* has XLogBeginInsert() been called for the record currently being built? */
static bool begininsert_called = false;

/* Memory context to hold the registered buffer and data references. */
static MemoryContext xloginsert_cxt;

static XLogRecData *XLogRecordAssemble(RmgrId rmid, uint8 info,
				   XLogRecPtr RedoRecPtr, bool doPageWrites,
				   XLogRecPtr *fpw_lsn);
static bool XLogCompressBackupBlock(char *page, uint16 hole_offset,
						uint16 hole_length, char *dest, uint16 *dlen);
114
115 /*
116 * Begin constructing a WAL record. This must be called before the
117 * XLogRegister* functions and XLogInsert().
118 */
119 void
XLogBeginInsert(void)120 XLogBeginInsert(void)
121 {
122 Assert(max_registered_block_id == 0);
123 Assert(mainrdata_last == (XLogRecData *) &mainrdata_head);
124 Assert(mainrdata_len == 0);
125
126 /* cross-check on whether we should be here or not */
127 if (!XLogInsertAllowed())
128 elog(ERROR, "cannot make new WAL entries during recovery");
129
130 if (begininsert_called)
131 elog(ERROR, "XLogBeginInsert was already called");
132
133 begininsert_called = true;
134 }
135
136 /*
137 * Ensure that there are enough buffer and data slots in the working area,
138 * for subsequent XLogRegisterBuffer, XLogRegisterData and XLogRegisterBufData
139 * calls.
140 *
141 * There is always space for a small number of buffers and data chunks, enough
142 * for most record types. This function is for the exceptional cases that need
143 * more.
144 */
145 void
XLogEnsureRecordSpace(int max_block_id,int ndatas)146 XLogEnsureRecordSpace(int max_block_id, int ndatas)
147 {
148 int nbuffers;
149
150 /*
151 * This must be called before entering a critical section, because
152 * allocating memory inside a critical section can fail. repalloc() will
153 * check the same, but better to check it here too so that we fail
154 * consistently even if the arrays happen to be large enough already.
155 */
156 Assert(CritSectionCount == 0);
157
158 /* the minimum values can't be decreased */
159 if (max_block_id < XLR_NORMAL_MAX_BLOCK_ID)
160 max_block_id = XLR_NORMAL_MAX_BLOCK_ID;
161 if (ndatas < XLR_NORMAL_RDATAS)
162 ndatas = XLR_NORMAL_RDATAS;
163
164 if (max_block_id > XLR_MAX_BLOCK_ID)
165 elog(ERROR, "maximum number of WAL record block references exceeded");
166 nbuffers = max_block_id + 1;
167
168 if (nbuffers > max_registered_buffers)
169 {
170 registered_buffers = (registered_buffer *)
171 repalloc(registered_buffers, sizeof(registered_buffer) * nbuffers);
172
173 /*
174 * At least the padding bytes in the structs must be zeroed, because
175 * they are included in WAL data, but initialize it all for tidiness.
176 */
177 MemSet(®istered_buffers[max_registered_buffers], 0,
178 (nbuffers - max_registered_buffers) * sizeof(registered_buffer));
179 max_registered_buffers = nbuffers;
180 }
181
182 if (ndatas > max_rdatas)
183 {
184 rdatas = (XLogRecData *) repalloc(rdatas, sizeof(XLogRecData) * ndatas);
185 max_rdatas = ndatas;
186 }
187 }
188
189 /*
190 * Reset WAL record construction buffers.
191 */
192 void
XLogResetInsertion(void)193 XLogResetInsertion(void)
194 {
195 int i;
196
197 for (i = 0; i < max_registered_block_id; i++)
198 registered_buffers[i].in_use = false;
199
200 num_rdatas = 0;
201 max_registered_block_id = 0;
202 mainrdata_len = 0;
203 mainrdata_last = (XLogRecData *) &mainrdata_head;
204 curinsert_flags = 0;
205 begininsert_called = false;
206 }
207
208 /*
209 * Register a reference to a buffer with the WAL record being constructed.
210 * This must be called for every page that the WAL-logged operation modifies.
211 */
212 void
XLogRegisterBuffer(uint8 block_id,Buffer buffer,uint8 flags)213 XLogRegisterBuffer(uint8 block_id, Buffer buffer, uint8 flags)
214 {
215 registered_buffer *regbuf;
216
217 /* NO_IMAGE doesn't make sense with FORCE_IMAGE */
218 Assert(!((flags & REGBUF_FORCE_IMAGE) && (flags & (REGBUF_NO_IMAGE))));
219 Assert(begininsert_called);
220
221 if (block_id >= max_registered_block_id)
222 {
223 if (block_id >= max_registered_buffers)
224 elog(ERROR, "too many registered buffers");
225 max_registered_block_id = block_id + 1;
226 }
227
228 regbuf = ®istered_buffers[block_id];
229
230 BufferGetTag(buffer, ®buf->rnode, ®buf->forkno, ®buf->block);
231 regbuf->page = BufferGetPage(buffer);
232 regbuf->flags = flags;
233 regbuf->rdata_tail = (XLogRecData *) ®buf->rdata_head;
234 regbuf->rdata_len = 0;
235
236 /*
237 * Check that this page hasn't already been registered with some other
238 * block_id.
239 */
240 #ifdef USE_ASSERT_CHECKING
241 {
242 int i;
243
244 for (i = 0; i < max_registered_block_id; i++)
245 {
246 registered_buffer *regbuf_old = ®istered_buffers[i];
247
248 if (i == block_id || !regbuf_old->in_use)
249 continue;
250
251 Assert(!RelFileNodeEquals(regbuf_old->rnode, regbuf->rnode) ||
252 regbuf_old->forkno != regbuf->forkno ||
253 regbuf_old->block != regbuf->block);
254 }
255 }
256 #endif
257
258 regbuf->in_use = true;
259 }
260
261 /*
262 * Like XLogRegisterBuffer, but for registering a block that's not in the
263 * shared buffer pool (i.e. when you don't have a Buffer for it).
264 */
265 void
XLogRegisterBlock(uint8 block_id,RelFileNode * rnode,ForkNumber forknum,BlockNumber blknum,Page page,uint8 flags)266 XLogRegisterBlock(uint8 block_id, RelFileNode *rnode, ForkNumber forknum,
267 BlockNumber blknum, Page page, uint8 flags)
268 {
269 registered_buffer *regbuf;
270
271 /* This is currently only used to WAL-log a full-page image of a page */
272 Assert(flags & REGBUF_FORCE_IMAGE);
273 Assert(begininsert_called);
274
275 if (block_id >= max_registered_block_id)
276 max_registered_block_id = block_id + 1;
277
278 if (block_id >= max_registered_buffers)
279 elog(ERROR, "too many registered buffers");
280
281 regbuf = ®istered_buffers[block_id];
282
283 regbuf->rnode = *rnode;
284 regbuf->forkno = forknum;
285 regbuf->block = blknum;
286 regbuf->page = page;
287 regbuf->flags = flags;
288 regbuf->rdata_tail = (XLogRecData *) ®buf->rdata_head;
289 regbuf->rdata_len = 0;
290
291 /*
292 * Check that this page hasn't already been registered with some other
293 * block_id.
294 */
295 #ifdef USE_ASSERT_CHECKING
296 {
297 int i;
298
299 for (i = 0; i < max_registered_block_id; i++)
300 {
301 registered_buffer *regbuf_old = ®istered_buffers[i];
302
303 if (i == block_id || !regbuf_old->in_use)
304 continue;
305
306 Assert(!RelFileNodeEquals(regbuf_old->rnode, regbuf->rnode) ||
307 regbuf_old->forkno != regbuf->forkno ||
308 regbuf_old->block != regbuf->block);
309 }
310 }
311 #endif
312
313 regbuf->in_use = true;
314 }
315
316 /*
317 * Add data to the WAL record that's being constructed.
318 *
319 * The data is appended to the "main chunk", available at replay with
320 * XLogRecGetData().
321 */
322 void
XLogRegisterData(char * data,int len)323 XLogRegisterData(char *data, int len)
324 {
325 XLogRecData *rdata;
326
327 Assert(begininsert_called);
328
329 if (num_rdatas >= max_rdatas)
330 elog(ERROR, "too much WAL data");
331 rdata = &rdatas[num_rdatas++];
332
333 rdata->data = data;
334 rdata->len = len;
335
336 /*
337 * we use the mainrdata_last pointer to track the end of the chain, so no
338 * need to clear 'next' here.
339 */
340
341 mainrdata_last->next = rdata;
342 mainrdata_last = rdata;
343
344 mainrdata_len += len;
345 }
346
347 /*
348 * Add buffer-specific data to the WAL record that's being constructed.
349 *
350 * Block_id must reference a block previously registered with
351 * XLogRegisterBuffer(). If this is called more than once for the same
352 * block_id, the data is appended.
353 *
354 * The maximum amount of data that can be registered per block is 65535
355 * bytes. That should be plenty; if you need more than BLCKSZ bytes to
356 * reconstruct the changes to the page, you might as well just log a full
357 * copy of it. (the "main data" that's not associated with a block is not
358 * limited)
359 */
360 void
XLogRegisterBufData(uint8 block_id,char * data,int len)361 XLogRegisterBufData(uint8 block_id, char *data, int len)
362 {
363 registered_buffer *regbuf;
364 XLogRecData *rdata;
365
366 Assert(begininsert_called);
367
368 /* find the registered buffer struct */
369 regbuf = ®istered_buffers[block_id];
370 if (!regbuf->in_use)
371 elog(ERROR, "no block with id %d registered with WAL insertion",
372 block_id);
373
374 if (num_rdatas >= max_rdatas)
375 elog(ERROR, "too much WAL data");
376 rdata = &rdatas[num_rdatas++];
377
378 rdata->data = data;
379 rdata->len = len;
380
381 regbuf->rdata_tail->next = rdata;
382 regbuf->rdata_tail = rdata;
383 regbuf->rdata_len += len;
384 }
385
386 /*
387 * Set insert status flags for the upcoming WAL record.
388 *
389 * The flags that can be used here are:
390 * - XLOG_INCLUDE_ORIGIN, to determine if the replication origin should be
391 * included in the record.
392 * - XLOG_MARK_UNIMPORTANT, to signal that the record is not important for
393 * durability, which allows to avoid triggering WAL archiving and other
394 * background activity.
395 */
396 void
XLogSetRecordFlags(uint8 flags)397 XLogSetRecordFlags(uint8 flags)
398 {
399 Assert(begininsert_called);
400 curinsert_flags = flags;
401 }
402
403 /*
404 * Insert an XLOG record having the specified RMID and info bytes, with the
405 * body of the record being the data and buffer references registered earlier
406 * with XLogRegister* calls.
407 *
408 * Returns XLOG pointer to end of record (beginning of next record).
409 * This can be used as LSN for data pages affected by the logged action.
410 * (LSN is the XLOG point up to which the XLOG must be flushed to disk
411 * before the data page can be written out. This implements the basic
412 * WAL rule "write the log before the data".)
413 */
414 XLogRecPtr
XLogInsert(RmgrId rmid,uint8 info)415 XLogInsert(RmgrId rmid, uint8 info)
416 {
417 XLogRecPtr EndPos;
418
419 /* XLogBeginInsert() must have been called. */
420 if (!begininsert_called)
421 elog(ERROR, "XLogBeginInsert was not called");
422
423 /*
424 * The caller can set rmgr bits, XLR_SPECIAL_REL_UPDATE and
425 * XLR_CHECK_CONSISTENCY; the rest are reserved for use by me.
426 */
427 if ((info & ~(XLR_RMGR_INFO_MASK |
428 XLR_SPECIAL_REL_UPDATE |
429 XLR_CHECK_CONSISTENCY)) != 0)
430 elog(PANIC, "invalid xlog info mask %02X", info);
431
432 TRACE_POSTGRESQL_WAL_INSERT(rmid, info);
433
434 /*
435 * In bootstrap mode, we don't actually log anything but XLOG resources;
436 * return a phony record pointer.
437 */
438 if (IsBootstrapProcessingMode() && rmid != RM_XLOG_ID)
439 {
440 XLogResetInsertion();
441 EndPos = SizeOfXLogLongPHD; /* start of 1st chkpt record */
442 return EndPos;
443 }
444
445 do
446 {
447 XLogRecPtr RedoRecPtr;
448 bool doPageWrites;
449 XLogRecPtr fpw_lsn;
450 XLogRecData *rdt;
451
452 /*
453 * Get values needed to decide whether to do full-page writes. Since
454 * we don't yet have an insertion lock, these could change under us,
455 * but XLogInsertRecord will recheck them once it has a lock.
456 */
457 GetFullPageWriteInfo(&RedoRecPtr, &doPageWrites);
458
459 rdt = XLogRecordAssemble(rmid, info, RedoRecPtr, doPageWrites,
460 &fpw_lsn);
461
462 EndPos = XLogInsertRecord(rdt, fpw_lsn, curinsert_flags);
463 } while (EndPos == InvalidXLogRecPtr);
464
465 XLogResetInsertion();
466
467 return EndPos;
468 }
469
/*
 * Assemble a WAL record from the registered data and buffers into an
 * XLogRecData chain, ready for insertion with XLogInsertRecord().
 *
 * The record header fields are filled in, except for the xl_prev field. The
 * calculated CRC does not include the record header yet.
 *
 * If there are any registered buffers, and a full-page image was not taken
 * of all of them, *fpw_lsn is set to the lowest LSN among such pages. This
 * signals that the assembled record is only good for insertion on the
 * assumption that the RedoRecPtr and doPageWrites values were up-to-date.
 *
 * The returned chain starts at hdr_rdt (the header area in hdr_scratch),
 * followed by full-page images and per-block data, followed by the main
 * data chunk, matching the on-disk WAL record layout.
 */
static XLogRecData *
XLogRecordAssemble(RmgrId rmid, uint8 info,
				   XLogRecPtr RedoRecPtr, bool doPageWrites,
				   XLogRecPtr *fpw_lsn)
{
	XLogRecData *rdt;
	uint32		total_len = 0;
	int			block_id;
	pg_crc32c	rdata_crc;
	registered_buffer *prev_regbuf = NULL;
	XLogRecData *rdt_datas_last;
	XLogRecord *rechdr;
	char	   *scratch = hdr_scratch;

	/*
	 * Note: this function can be called multiple times for the same record.
	 * All the modifications we do to the rdata chains below must handle that.
	 */

	/* The record begins with the fixed-size header */
	rechdr = (XLogRecord *) scratch;
	scratch += SizeOfXLogRecord;

	hdr_rdt.next = NULL;
	rdt_datas_last = &hdr_rdt;
	hdr_rdt.data = hdr_scratch;

	/*
	 * Enforce consistency checks for this record if user is looking for it.
	 * Do this before at the beginning of this routine to give the possibility
	 * for callers of XLogInsert() to pass XLR_CHECK_CONSISTENCY directly for
	 * a record.
	 */
	if (wal_consistency_checking[rmid])
		info |= XLR_CHECK_CONSISTENCY;

	/*
	 * Make an rdata chain containing all the data portions of all block
	 * references. This includes the data for full-page images. Also append
	 * the headers for the block references in the scratch buffer.
	 */
	*fpw_lsn = InvalidXLogRecPtr;
	for (block_id = 0; block_id < max_registered_block_id; block_id++)
	{
		registered_buffer *regbuf = &registered_buffers[block_id];
		bool		needs_backup;
		bool		needs_data;
		XLogRecordBlockHeader bkpb;
		XLogRecordBlockImageHeader bimg;
		XLogRecordBlockCompressHeader cbimg = {0};
		bool		samerel;
		bool		is_compressed = false;
		bool		include_image;

		if (!regbuf->in_use)
			continue;

		/* Determine if this block needs to be backed up */
		if (regbuf->flags & REGBUF_FORCE_IMAGE)
			needs_backup = true;
		else if (regbuf->flags & REGBUF_NO_IMAGE)
			needs_backup = false;
		else if (!doPageWrites)
			needs_backup = false;
		else
		{
			/*
			 * We assume page LSN is first data on *every* page that can be
			 * passed to XLogInsert, whether it has the standard page layout
			 * or not.
			 */
			XLogRecPtr	page_lsn = PageGetLSN(regbuf->page);

			needs_backup = (page_lsn <= RedoRecPtr);
			if (!needs_backup)
			{
				/* track the lowest LSN among pages not backed up */
				if (*fpw_lsn == InvalidXLogRecPtr || page_lsn < *fpw_lsn)
					*fpw_lsn = page_lsn;
			}
		}

		/* Determine if the buffer data needs to included */
		if (regbuf->rdata_len == 0)
			needs_data = false;
		else if ((regbuf->flags & REGBUF_KEEP_DATA) != 0)
			needs_data = true;
		else
			needs_data = !needs_backup;

		bkpb.id = block_id;
		bkpb.fork_flags = regbuf->forkno;
		bkpb.data_length = 0;

		if ((regbuf->flags & REGBUF_WILL_INIT) == REGBUF_WILL_INIT)
			bkpb.fork_flags |= BKPBLOCK_WILL_INIT;

		/*
		 * If needs_backup is true or WAL checking is enabled for current
		 * resource manager, log a full-page write for the current block.
		 */
		include_image = needs_backup || (info & XLR_CHECK_CONSISTENCY) != 0;

		if (include_image)
		{
			Page		page = regbuf->page;
			uint16		compressed_len;	/* only valid when is_compressed */

			/*
			 * The page needs to be backed up, so calculate its hole length
			 * and offset.
			 */
			if (regbuf->flags & REGBUF_STANDARD)
			{
				/* Assume we can omit data between pd_lower and pd_upper */
				uint16		lower = ((PageHeader) page)->pd_lower;
				uint16		upper = ((PageHeader) page)->pd_upper;

				if (lower >= SizeOfPageHeaderData &&
					upper > lower &&
					upper <= BLCKSZ)
				{
					bimg.hole_offset = lower;
					cbimg.hole_length = upper - lower;
				}
				else
				{
					/* No "hole" to compress out */
					bimg.hole_offset = 0;
					cbimg.hole_length = 0;
				}
			}
			else
			{
				/* Not a standard page header, don't try to eliminate "hole" */
				bimg.hole_offset = 0;
				cbimg.hole_length = 0;
			}

			/*
			 * Try to compress a block image if wal_compression is enabled
			 */
			if (wal_compression)
			{
				is_compressed =
					XLogCompressBackupBlock(page, bimg.hole_offset,
											cbimg.hole_length,
											regbuf->compressed_page,
											&compressed_len);
			}

			/*
			 * Fill in the remaining fields in the XLogRecordBlockHeader
			 * struct
			 */
			bkpb.fork_flags |= BKPBLOCK_HAS_IMAGE;

			/*
			 * Construct XLogRecData entries for the page content.  The
			 * per-buffer bkp_rdatas slots are used so repeated assembly of
			 * the same record stays safe.
			 */
			rdt_datas_last->next = &regbuf->bkp_rdatas[0];
			rdt_datas_last = rdt_datas_last->next;

			bimg.bimg_info = (cbimg.hole_length == 0) ? 0 : BKPIMAGE_HAS_HOLE;

			/*
			 * If WAL consistency checking is enabled for the resource manager
			 * of this WAL record, a full-page image is included in the record
			 * for the block modified. During redo, the full-page is replayed
			 * only if BKPIMAGE_APPLY is set.
			 */
			if (needs_backup)
				bimg.bimg_info |= BKPIMAGE_APPLY;

			if (is_compressed)
			{
				bimg.length = compressed_len;
				bimg.bimg_info |= BKPIMAGE_IS_COMPRESSED;

				rdt_datas_last->data = regbuf->compressed_page;
				rdt_datas_last->len = compressed_len;
			}
			else
			{
				bimg.length = BLCKSZ - cbimg.hole_length;

				if (cbimg.hole_length == 0)
				{
					rdt_datas_last->data = page;
					rdt_datas_last->len = BLCKSZ;
				}
				else
				{
					/* must skip the hole: emit the page as two pieces */
					rdt_datas_last->data = page;
					rdt_datas_last->len = bimg.hole_offset;

					rdt_datas_last->next = &regbuf->bkp_rdatas[1];
					rdt_datas_last = rdt_datas_last->next;

					rdt_datas_last->data =
						page + (bimg.hole_offset + cbimg.hole_length);
					rdt_datas_last->len =
						BLCKSZ - (bimg.hole_offset + cbimg.hole_length);
				}
			}

			total_len += bimg.length;
		}

		if (needs_data)
		{
			/*
			 * Link the caller-supplied rdata chain for this buffer to the
			 * overall list.
			 */
			bkpb.fork_flags |= BKPBLOCK_HAS_DATA;
			bkpb.data_length = regbuf->rdata_len;
			total_len += regbuf->rdata_len;

			rdt_datas_last->next = regbuf->rdata_head;
			rdt_datas_last = regbuf->rdata_tail;
		}

		/* omit the RelFileNode when identical to the previous block's */
		if (prev_regbuf && RelFileNodeEquals(regbuf->rnode, prev_regbuf->rnode))
		{
			samerel = true;
			bkpb.fork_flags |= BKPBLOCK_SAME_REL;
		}
		else
			samerel = false;
		prev_regbuf = regbuf;

		/* Ok, copy the header to the scratch buffer */
		memcpy(scratch, &bkpb, SizeOfXLogRecordBlockHeader);
		scratch += SizeOfXLogRecordBlockHeader;
		if (include_image)
		{
			memcpy(scratch, &bimg, SizeOfXLogRecordBlockImageHeader);
			scratch += SizeOfXLogRecordBlockImageHeader;
			/* the compress header is present only for a compressed hole */
			if (cbimg.hole_length != 0 && is_compressed)
			{
				memcpy(scratch, &cbimg,
					   SizeOfXLogRecordBlockCompressHeader);
				scratch += SizeOfXLogRecordBlockCompressHeader;
			}
		}
		if (!samerel)
		{
			memcpy(scratch, &regbuf->rnode, sizeof(RelFileNode));
			scratch += sizeof(RelFileNode);
		}
		memcpy(scratch, &regbuf->block, sizeof(BlockNumber));
		scratch += sizeof(BlockNumber);
	}

	/* followed by the record's origin, if any */
	if ((curinsert_flags & XLOG_INCLUDE_ORIGIN) &&
		replorigin_session_origin != InvalidRepOriginId)
	{
		*(scratch++) = (char) XLR_BLOCK_ID_ORIGIN;
		memcpy(scratch, &replorigin_session_origin, sizeof(replorigin_session_origin));
		scratch += sizeof(replorigin_session_origin);
	}

	/* followed by main data, if any */
	if (mainrdata_len > 0)
	{
		/* the short header form can encode lengths up to 255 bytes */
		if (mainrdata_len > 255)
		{
			*(scratch++) = (char) XLR_BLOCK_ID_DATA_LONG;
			memcpy(scratch, &mainrdata_len, sizeof(uint32));
			scratch += sizeof(uint32);
		}
		else
		{
			*(scratch++) = (char) XLR_BLOCK_ID_DATA_SHORT;
			*(scratch++) = (uint8) mainrdata_len;
		}
		rdt_datas_last->next = mainrdata_head;
		rdt_datas_last = mainrdata_last;
		total_len += mainrdata_len;
	}
	rdt_datas_last->next = NULL;

	hdr_rdt.len = (scratch - hdr_scratch);
	total_len += hdr_rdt.len;

	/*
	 * Calculate CRC of the data
	 *
	 * Note that the record header isn't added into the CRC initially since we
	 * don't know the prev-link yet. Thus, the CRC will represent the CRC of
	 * the whole record in the order: rdata, then backup blocks, then record
	 * header.
	 */
	INIT_CRC32C(rdata_crc);
	COMP_CRC32C(rdata_crc, hdr_scratch + SizeOfXLogRecord, hdr_rdt.len - SizeOfXLogRecord);
	for (rdt = hdr_rdt.next; rdt != NULL; rdt = rdt->next)
		COMP_CRC32C(rdata_crc, rdt->data, rdt->len);

	/*
	 * Fill in the fields in the record header. Prev-link is filled in later,
	 * once we know where in the WAL the record will be inserted. The CRC does
	 * not include the record header yet.
	 */
	rechdr->xl_xid = GetCurrentTransactionIdIfAny();
	rechdr->xl_tot_len = total_len;
	rechdr->xl_info = info;
	rechdr->xl_rmid = rmid;
	rechdr->xl_prev = InvalidXLogRecPtr;
	rechdr->xl_crc = rdata_crc;

	return &hdr_rdt;
}
796
797 /*
798 * Create a compressed version of a backup block image.
799 *
800 * Returns FALSE if compression fails (i.e., compressed result is actually
801 * bigger than original). Otherwise, returns TRUE and sets 'dlen' to
802 * the length of compressed block image.
803 */
804 static bool
XLogCompressBackupBlock(char * page,uint16 hole_offset,uint16 hole_length,char * dest,uint16 * dlen)805 XLogCompressBackupBlock(char *page, uint16 hole_offset, uint16 hole_length,
806 char *dest, uint16 *dlen)
807 {
808 int32 orig_len = BLCKSZ - hole_length;
809 int32 len;
810 int32 extra_bytes = 0;
811 char *source;
812 PGAlignedBlock tmp;
813
814 if (hole_length != 0)
815 {
816 /* must skip the hole */
817 source = tmp.data;
818 memcpy(source, page, hole_offset);
819 memcpy(source + hole_offset,
820 page + (hole_offset + hole_length),
821 BLCKSZ - (hole_length + hole_offset));
822
823 /*
824 * Extra data needs to be stored in WAL record for the compressed
825 * version of block image if the hole exists.
826 */
827 extra_bytes = SizeOfXLogRecordBlockCompressHeader;
828 }
829 else
830 source = page;
831
832 /*
833 * We recheck the actual size even if pglz_compress() reports success and
834 * see if the number of bytes saved by compression is larger than the
835 * length of extra data needed for the compressed version of block image.
836 */
837 len = pglz_compress(source, orig_len, dest, PGLZ_strategy_default);
838 if (len >= 0 &&
839 len + extra_bytes < orig_len)
840 {
841 *dlen = (uint16) len; /* successful compression */
842 return true;
843 }
844 return false;
845 }
846
847 /*
848 * Determine whether the buffer referenced has to be backed up.
849 *
850 * Since we don't yet have the insert lock, fullPageWrites and forcePageWrites
851 * could change later, so the result should be used for optimization purposes
852 * only.
853 */
854 bool
XLogCheckBufferNeedsBackup(Buffer buffer)855 XLogCheckBufferNeedsBackup(Buffer buffer)
856 {
857 XLogRecPtr RedoRecPtr;
858 bool doPageWrites;
859 Page page;
860
861 GetFullPageWriteInfo(&RedoRecPtr, &doPageWrites);
862
863 page = BufferGetPage(buffer);
864
865 if (doPageWrites && PageGetLSN(page) <= RedoRecPtr)
866 return true; /* buffer requires backup */
867
868 return false; /* buffer does not need to be backed up */
869 }
870
871 /*
872 * Write a backup block if needed when we are setting a hint. Note that
873 * this may be called for a variety of page types, not just heaps.
874 *
875 * Callable while holding just share lock on the buffer content.
876 *
877 * We can't use the plain backup block mechanism since that relies on the
878 * Buffer being exclusively locked. Since some modifications (setting LSN, hint
879 * bits) are allowed in a sharelocked buffer that can lead to wal checksum
880 * failures. So instead we copy the page and insert the copied data as normal
881 * record data.
882 *
883 * We only need to do something if page has not yet been full page written in
884 * this checkpoint round. The LSN of the inserted wal record is returned if we
885 * had to write, InvalidXLogRecPtr otherwise.
886 *
887 * It is possible that multiple concurrent backends could attempt to write WAL
888 * records. In that case, multiple copies of the same block would be recorded
889 * in separate WAL records by different backends, though that is still OK from
890 * a correctness perspective.
891 */
892 XLogRecPtr
XLogSaveBufferForHint(Buffer buffer,bool buffer_std)893 XLogSaveBufferForHint(Buffer buffer, bool buffer_std)
894 {
895 XLogRecPtr recptr = InvalidXLogRecPtr;
896 XLogRecPtr lsn;
897 XLogRecPtr RedoRecPtr;
898
899 /*
900 * Ensure no checkpoint can change our view of RedoRecPtr.
901 */
902 Assert(MyPgXact->delayChkpt);
903
904 /*
905 * Update RedoRecPtr so that we can make the right decision
906 */
907 RedoRecPtr = GetRedoRecPtr();
908
909 /*
910 * We assume page LSN is first data on *every* page that can be passed to
911 * XLogInsert, whether it has the standard page layout or not. Since we're
912 * only holding a share-lock on the page, we must take the buffer header
913 * lock when we look at the LSN.
914 */
915 lsn = BufferGetLSNAtomic(buffer);
916
917 if (lsn <= RedoRecPtr)
918 {
919 int flags;
920 PGAlignedBlock copied_buffer;
921 char *origdata = (char *) BufferGetBlock(buffer);
922 RelFileNode rnode;
923 ForkNumber forkno;
924 BlockNumber blkno;
925
926 /*
927 * Copy buffer so we don't have to worry about concurrent hint bit or
928 * lsn updates. We assume pd_lower/upper cannot be changed without an
929 * exclusive lock, so the contents bkp are not racy.
930 */
931 if (buffer_std)
932 {
933 /* Assume we can omit data between pd_lower and pd_upper */
934 Page page = BufferGetPage(buffer);
935 uint16 lower = ((PageHeader) page)->pd_lower;
936 uint16 upper = ((PageHeader) page)->pd_upper;
937
938 memcpy(copied_buffer.data, origdata, lower);
939 memcpy(copied_buffer.data + upper, origdata + upper, BLCKSZ - upper);
940 }
941 else
942 memcpy(copied_buffer.data, origdata, BLCKSZ);
943
944 XLogBeginInsert();
945
946 flags = REGBUF_FORCE_IMAGE;
947 if (buffer_std)
948 flags |= REGBUF_STANDARD;
949
950 BufferGetTag(buffer, &rnode, &forkno, &blkno);
951 XLogRegisterBlock(0, &rnode, forkno, blkno, copied_buffer.data, flags);
952
953 recptr = XLogInsert(RM_XLOG_ID, XLOG_FPI_FOR_HINT);
954 }
955
956 return recptr;
957 }
958
959 /*
960 * Write a WAL record containing a full image of a page. Caller is responsible
961 * for writing the page to disk after calling this routine.
962 *
963 * Note: If you're using this function, you should be building pages in private
964 * memory and writing them directly to smgr. If you're using buffers, call
965 * log_newpage_buffer instead.
966 *
967 * If the page follows the standard page layout, with a PageHeader and unused
968 * space between pd_lower and pd_upper, set 'page_std' to TRUE. That allows
969 * the unused space to be left out from the WAL record, making it smaller.
970 */
971 XLogRecPtr
log_newpage(RelFileNode * rnode,ForkNumber forkNum,BlockNumber blkno,Page page,bool page_std)972 log_newpage(RelFileNode *rnode, ForkNumber forkNum, BlockNumber blkno,
973 Page page, bool page_std)
974 {
975 int flags;
976 XLogRecPtr recptr;
977
978 flags = REGBUF_FORCE_IMAGE;
979 if (page_std)
980 flags |= REGBUF_STANDARD;
981
982 XLogBeginInsert();
983 XLogRegisterBlock(0, rnode, forkNum, blkno, page, flags);
984 recptr = XLogInsert(RM_XLOG_ID, XLOG_FPI);
985
986 /*
987 * The page may be uninitialized. If so, we can't set the LSN because that
988 * would corrupt the page.
989 */
990 if (!PageIsNew(page))
991 {
992 PageSetLSN(page, recptr);
993 }
994
995 return recptr;
996 }
997
998 /*
999 * Write a WAL record containing a full image of a page.
1000 *
1001 * Caller should initialize the buffer and mark it dirty before calling this
1002 * function. This function will set the page LSN.
1003 *
1004 * If the page follows the standard page layout, with a PageHeader and unused
1005 * space between pd_lower and pd_upper, set 'page_std' to TRUE. That allows
1006 * the unused space to be left out from the WAL record, making it smaller.
1007 */
1008 XLogRecPtr
log_newpage_buffer(Buffer buffer,bool page_std)1009 log_newpage_buffer(Buffer buffer, bool page_std)
1010 {
1011 Page page = BufferGetPage(buffer);
1012 RelFileNode rnode;
1013 ForkNumber forkNum;
1014 BlockNumber blkno;
1015
1016 /* Shared buffers should be modified in a critical section. */
1017 Assert(CritSectionCount > 0);
1018
1019 BufferGetTag(buffer, &rnode, &forkNum, &blkno);
1020
1021 return log_newpage(&rnode, forkNum, blkno, page, page_std);
1022 }
1023
/*
 * WAL-log a range of blocks in a relation.
 *
 * An image of all pages with block numbers 'startblk' <= X < 'endblk' is
 * written to the WAL. If the range is large, this is done in multiple WAL
 * records.
 *
 * If all page follows the standard page layout, with a PageHeader and unused
 * space between pd_lower and pd_upper, set 'page_std' to true. That allows
 * the unused space to be left out from the WAL records, making them smaller.
 *
 * NOTE: This function acquires exclusive-locks on the pages. Typically, this
 * is used on a newly-built relation, and the caller is holding a
 * AccessExclusiveLock on it, so no other backend can be accessing it at the
 * same time. If that's not the case, you must ensure that this does not
 * cause a deadlock through some other means.
 */
void
log_newpage_range(Relation rel, ForkNumber forkNum,
				  BlockNumber startblk, BlockNumber endblk,
				  bool page_std)
{
	int			flags;
	BlockNumber blkno;

	flags = REGBUF_FORCE_IMAGE;
	if (page_std)
		flags |= REGBUF_STANDARD;

	/*
	 * Iterate over all the pages in the range. They are collected into
	 * batches of XLR_MAX_BLOCK_ID pages, and a single WAL-record is written
	 * for each batch.  Grow the working arrays up front, since that cannot
	 * be done inside the critical section below.
	 */
	XLogEnsureRecordSpace(XLR_MAX_BLOCK_ID - 1, 0);

	blkno = startblk;
	while (blkno < endblk)
	{
		Buffer		bufpack[XLR_MAX_BLOCK_ID];
		XLogRecPtr	recptr;
		int			nbufs;
		int			i;

		CHECK_FOR_INTERRUPTS();

		/* Collect a batch of blocks. */
		nbufs = 0;
		while (nbufs < XLR_MAX_BLOCK_ID && blkno < endblk)
		{
			Buffer		buf = ReadBufferExtended(rel, forkNum, blkno,
												 RBM_NORMAL, NULL);

			LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);

			/*
			 * Completely empty pages are not WAL-logged. Writing a WAL record
			 * would change the LSN, and we don't want that. We want the page
			 * to stay empty.
			 */
			if (!PageIsNew(BufferGetPage(buf)))
				bufpack[nbufs++] = buf;
			else
				UnlockReleaseBuffer(buf);
			blkno++;
		}

		/* Write WAL record for this batch. */
		XLogBeginInsert();

		START_CRIT_SECTION();
		for (i = 0; i < nbufs; i++)
		{
			XLogRegisterBuffer(i, bufpack[i], flags);
			MarkBufferDirty(bufpack[i]);
		}

		recptr = XLogInsert(RM_XLOG_ID, XLOG_FPI_MULTI);

		/* Stamp each page with the record's LSN, then release its lock. */
		for (i = 0; i < nbufs; i++)
		{
			PageSetLSN(BufferGetPage(bufpack[i]), recptr);
			UnlockReleaseBuffer(bufpack[i]);
		}
		END_CRIT_SECTION();
	}
}
1111
1112 /*
1113 * Allocate working buffers needed for WAL record construction.
1114 */
1115 void
InitXLogInsert(void)1116 InitXLogInsert(void)
1117 {
1118 /* Initialize the working areas */
1119 if (xloginsert_cxt == NULL)
1120 {
1121 xloginsert_cxt = AllocSetContextCreate(TopMemoryContext,
1122 "WAL record construction",
1123 ALLOCSET_DEFAULT_SIZES);
1124 }
1125
1126 if (registered_buffers == NULL)
1127 {
1128 registered_buffers = (registered_buffer *)
1129 MemoryContextAllocZero(xloginsert_cxt,
1130 sizeof(registered_buffer) * (XLR_NORMAL_MAX_BLOCK_ID + 1));
1131 max_registered_buffers = XLR_NORMAL_MAX_BLOCK_ID + 1;
1132 }
1133 if (rdatas == NULL)
1134 {
1135 rdatas = MemoryContextAlloc(xloginsert_cxt,
1136 sizeof(XLogRecData) * XLR_NORMAL_RDATAS);
1137 max_rdatas = XLR_NORMAL_RDATAS;
1138 }
1139
1140 /*
1141 * Allocate a buffer to hold the header information for a WAL record.
1142 */
1143 if (hdr_scratch == NULL)
1144 hdr_scratch = MemoryContextAllocZero(xloginsert_cxt,
1145 HEADER_SCRATCH_SIZE);
1146 }
1147