/*-------------------------------------------------------------------------
 *
 * xloginsert.c
 *		Functions for constructing WAL records
 *
 * Constructing a WAL record begins with a call to XLogBeginInsert,
 * followed by a number of XLogRegister* calls. The registered data is
 * collected in private working memory, and finally assembled into a chain
 * of XLogRecData structs by a call to XLogRecordAssemble(). See
 * access/transam/README for details.
 *
 * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * src/backend/access/transam/xloginsert.c
 *
 *-------------------------------------------------------------------------
 */
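
/*
 * A minimal caller sketch (hedged; modeled on the heap-insert example in
 * access/transam/README -- the xlrec struct and info bits here are
 * illustrative assumptions, not part of this file's API):
 *
 *		XLogBeginInsert();
 *		XLogRegisterBuffer(0, buffer, REGBUF_STANDARD);
 *		XLogRegisterData((char *) &xlrec, SizeOfHeapInsert);
 *		recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_INSERT);
 *		PageSetLSN(BufferGetPage(buffer), recptr);
 */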

#include "postgres.h"

#include "access/xact.h"
#include "access/xlog.h"
#include "access/xlog_internal.h"
#include "access/xloginsert.h"
#include "catalog/pg_control.h"
#include "common/pg_lzcompress.h"
#include "miscadmin.h"
#include "replication/origin.h"
#include "storage/bufmgr.h"
#include "storage/proc.h"
#include "utils/memutils.h"
#include "pg_trace.h"

/* Buffer size required to store a compressed version of backup block image */
#define PGLZ_MAX_BLCKSZ PGLZ_MAX_OUTPUT(BLCKSZ)

/*
 * For each block reference registered with XLogRegisterBuffer, we fill in
 * a registered_buffer struct.
 */
typedef struct
{
	bool		in_use;			/* is this slot in use? */
	uint8		flags;			/* REGBUF_* flags */
	RelFileNode rnode;			/* identifies the relation and block */
	ForkNumber	forkno;
	BlockNumber block;
	Page		page;			/* page content */
	uint32		rdata_len;		/* total length of data in rdata chain */
	XLogRecData *rdata_head;	/* head of the chain of data registered with
								 * this block */
	XLogRecData *rdata_tail;	/* last entry in the chain, or &rdata_head if
								 * empty */

	XLogRecData bkp_rdatas[2];	/* temporary rdatas used to hold references to
								 * backup block data in XLogRecordAssemble() */

	/* buffer to store a compressed version of backup block image */
	char		compressed_page[PGLZ_MAX_BLCKSZ];
} registered_buffer;

static registered_buffer *registered_buffers;
static int	max_registered_buffers; /* allocated size */
static int	max_registered_block_id = 0;	/* highest block_id + 1 currently
											 * registered */

/*
 * A chain of XLogRecDatas to hold the "main data" of a WAL record, registered
 * with XLogRegisterData(...).
 */
static XLogRecData *mainrdata_head;
static XLogRecData *mainrdata_last = (XLogRecData *) &mainrdata_head;
static uint32 mainrdata_len;	/* total # of bytes in chain */

/* flags for the in-progress insertion */
static uint8 curinsert_flags = 0;

/*
 * These are used to hold the record header while constructing a record.
 * 'hdr_scratch' is not a plain variable, but is palloc'd at initialization,
 * because we want it to be MAXALIGNed and padding bytes zeroed.
 *
 * For simplicity, it's allocated large enough to hold the headers for any
 * WAL record.
 */
static XLogRecData hdr_rdt;
static char *hdr_scratch = NULL;

#define SizeOfXlogOrigin	(sizeof(RepOriginId) + sizeof(char))

#define HEADER_SCRATCH_SIZE \
	(SizeOfXLogRecord + \
	 MaxSizeOfXLogRecordBlockHeader * (XLR_MAX_BLOCK_ID + 1) + \
	 SizeOfXLogRecordDataHeaderLong + SizeOfXlogOrigin)

/*
 * An array of XLogRecData structs, to hold registered data.
 */
static XLogRecData *rdatas;
static int	num_rdatas;			/* entries currently used */
static int	max_rdatas;			/* allocated size */

static bool begininsert_called = false;

/* Memory context to hold the registered buffer and data references. */
static MemoryContext xloginsert_cxt;

static XLogRecData *XLogRecordAssemble(RmgrId rmid, uint8 info,
				   XLogRecPtr RedoRecPtr, bool doPageWrites,
				   XLogRecPtr *fpw_lsn);
static bool XLogCompressBackupBlock(char *page, uint16 hole_offset,
						uint16 hole_length, char *dest, uint16 *dlen);

/*
 * Begin constructing a WAL record. This must be called before the
 * XLogRegister* functions and XLogInsert().
 */
void
XLogBeginInsert(void)
{
	Assert(max_registered_block_id == 0);
	Assert(mainrdata_last == (XLogRecData *) &mainrdata_head);
	Assert(mainrdata_len == 0);

	/* cross-check on whether we should be here or not */
	if (!XLogInsertAllowed())
		elog(ERROR, "cannot make new WAL entries during recovery");

	if (begininsert_called)
		elog(ERROR, "XLogBeginInsert was already called");

	begininsert_called = true;
}

/*
 * Ensure that there are enough buffer and data slots in the working area,
 * for subsequent XLogRegisterBuffer, XLogRegisterData and XLogRegisterBufData
 * calls.
 *
 * There is always space for a small number of buffers and data chunks, enough
 * for most record types. This function is for the exceptional cases that need
 * more.
 */
void
XLogEnsureRecordSpace(int max_block_id, int ndatas)
{
	int			nbuffers;

	/*
	 * This must be called before entering a critical section, because
	 * allocating memory inside a critical section can fail. repalloc() will
	 * check the same, but better to check it here too so that we fail
	 * consistently even if the arrays happen to be large enough already.
	 */
	Assert(CritSectionCount == 0);

	/* the minimum values can't be decreased */
	if (max_block_id < XLR_NORMAL_MAX_BLOCK_ID)
		max_block_id = XLR_NORMAL_MAX_BLOCK_ID;
	if (ndatas < XLR_NORMAL_RDATAS)
		ndatas = XLR_NORMAL_RDATAS;

	if (max_block_id > XLR_MAX_BLOCK_ID)
		elog(ERROR, "maximum number of WAL record block references exceeded");
	nbuffers = max_block_id + 1;

	if (nbuffers > max_registered_buffers)
	{
		registered_buffers = (registered_buffer *)
			repalloc(registered_buffers, sizeof(registered_buffer) * nbuffers);

		/*
		 * At least the padding bytes in the structs must be zeroed, because
		 * they are included in WAL data, but initialize it all for tidiness.
		 */
		MemSet(&registered_buffers[max_registered_buffers], 0,
			   (nbuffers - max_registered_buffers) * sizeof(registered_buffer));
		max_registered_buffers = nbuffers;
	}

	if (ndatas > max_rdatas)
	{
		rdatas = (XLogRecData *) repalloc(rdatas, sizeof(XLogRecData) * ndatas);
		max_rdatas = ndatas;
	}
}
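
/*
 * Hedged usage sketch: a caller that will register an unusually large
 * number of block references reserves the slots before entering its
 * critical section, as log_newpage_range() below does for batches of up
 * to XLR_MAX_BLOCK_ID pages ('nblocks' here is an assumed variable):
 *
 *		XLogEnsureRecordSpace(nblocks - 1, 0);
 *		XLogBeginInsert();
 *		START_CRIT_SECTION();
 *		... XLogRegisterBuffer() for each of the nblocks buffers ...
 */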

/*
 * Reset WAL record construction buffers.
 */
void
XLogResetInsertion(void)
{
	int			i;

	for (i = 0; i < max_registered_block_id; i++)
		registered_buffers[i].in_use = false;

	num_rdatas = 0;
	max_registered_block_id = 0;
	mainrdata_len = 0;
	mainrdata_last = (XLogRecData *) &mainrdata_head;
	curinsert_flags = 0;
	begininsert_called = false;
}

/*
 * Register a reference to a buffer with the WAL record being constructed.
 * This must be called for every page that the WAL-logged operation modifies.
 */
void
XLogRegisterBuffer(uint8 block_id, Buffer buffer, uint8 flags)
{
	registered_buffer *regbuf;

	/* NO_IMAGE doesn't make sense with FORCE_IMAGE */
	Assert(!((flags & REGBUF_FORCE_IMAGE) && (flags & (REGBUF_NO_IMAGE))));
	Assert(begininsert_called);

	if (block_id >= max_registered_block_id)
	{
		if (block_id >= max_registered_buffers)
			elog(ERROR, "too many registered buffers");
		max_registered_block_id = block_id + 1;
	}

	regbuf = &registered_buffers[block_id];

	BufferGetTag(buffer, &regbuf->rnode, &regbuf->forkno, &regbuf->block);
	regbuf->page = BufferGetPage(buffer);
	regbuf->flags = flags;
	regbuf->rdata_tail = (XLogRecData *) &regbuf->rdata_head;
	regbuf->rdata_len = 0;

	/*
	 * Check that this page hasn't already been registered with some other
	 * block_id.
	 */
#ifdef USE_ASSERT_CHECKING
	{
		int			i;

		for (i = 0; i < max_registered_block_id; i++)
		{
			registered_buffer *regbuf_old = &registered_buffers[i];

			if (i == block_id || !regbuf_old->in_use)
				continue;

			Assert(!RelFileNodeEquals(regbuf_old->rnode, regbuf->rnode) ||
				   regbuf_old->forkno != regbuf->forkno ||
				   regbuf_old->block != regbuf->block);
		}
	}
#endif

	regbuf->in_use = true;
}

/*
 * Like XLogRegisterBuffer, but for registering a block that's not in the
 * shared buffer pool (i.e. when you don't have a Buffer for it).
 */
void
XLogRegisterBlock(uint8 block_id, RelFileNode *rnode, ForkNumber forknum,
				  BlockNumber blknum, Page page, uint8 flags)
{
	registered_buffer *regbuf;

	/* This is currently only used to WAL-log a full-page image of a page */
	Assert(flags & REGBUF_FORCE_IMAGE);
	Assert(begininsert_called);

	if (block_id >= max_registered_block_id)
		max_registered_block_id = block_id + 1;

	if (block_id >= max_registered_buffers)
		elog(ERROR, "too many registered buffers");

	regbuf = &registered_buffers[block_id];

	regbuf->rnode = *rnode;
	regbuf->forkno = forknum;
	regbuf->block = blknum;
	regbuf->page = page;
	regbuf->flags = flags;
	regbuf->rdata_tail = (XLogRecData *) &regbuf->rdata_head;
	regbuf->rdata_len = 0;

	/*
	 * Check that this page hasn't already been registered with some other
	 * block_id.
	 */
#ifdef USE_ASSERT_CHECKING
	{
		int			i;

		for (i = 0; i < max_registered_block_id; i++)
		{
			registered_buffer *regbuf_old = &registered_buffers[i];

			if (i == block_id || !regbuf_old->in_use)
				continue;

			Assert(!RelFileNodeEquals(regbuf_old->rnode, regbuf->rnode) ||
				   regbuf_old->forkno != regbuf->forkno ||
				   regbuf_old->block != regbuf->block);
		}
	}
#endif

	regbuf->in_use = true;
}

/*
 * Add data to the WAL record that's being constructed.
 *
 * The data is appended to the "main chunk", available at replay with
 * XLogRecGetData().
 */
void
XLogRegisterData(char *data, int len)
{
	XLogRecData *rdata;

	Assert(begininsert_called);

	if (num_rdatas >= max_rdatas)
		elog(ERROR, "too much WAL data");
	rdata = &rdatas[num_rdatas++];

	rdata->data = data;
	rdata->len = len;

	/*
	 * we use the mainrdata_last pointer to track the end of the chain, so no
	 * need to clear 'next' here.
	 */

	mainrdata_last->next = rdata;
	mainrdata_last = rdata;

	mainrdata_len += len;
}

/*
 * Add buffer-specific data to the WAL record that's being constructed.
 *
 * Block_id must reference a block previously registered with
 * XLogRegisterBuffer(). If this is called more than once for the same
 * block_id, the data is appended.
 *
 * The maximum amount of data that can be registered per block is 65535
 * bytes. That should be plenty; if you need more than BLCKSZ bytes to
 * reconstruct the changes to the page, you might as well just log a full
 * copy of it. (the "main data" that's not associated with a block is not
 * limited)
 */
void
XLogRegisterBufData(uint8 block_id, char *data, int len)
{
	registered_buffer *regbuf;
	XLogRecData *rdata;

	Assert(begininsert_called);

	/* find the registered buffer struct */
	regbuf = &registered_buffers[block_id];
	if (!regbuf->in_use)
		elog(ERROR, "no block with id %d registered with WAL insertion",
			 block_id);

	if (num_rdatas >= max_rdatas)
		elog(ERROR, "too much WAL data");
	rdata = &rdatas[num_rdatas++];

	rdata->data = data;
	rdata->len = len;

	regbuf->rdata_tail->next = rdata;
	regbuf->rdata_tail = rdata;
	regbuf->rdata_len += len;
}
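
/*
 * Hedged sketch of combining a registered buffer with per-block data (the
 * offnum/tupledata variables are illustrative assumptions):
 *
 *		XLogRegisterBuffer(0, buffer, REGBUF_STANDARD);
 *		XLogRegisterBufData(0, (char *) &offnum, sizeof(OffsetNumber));
 *		XLogRegisterBufData(0, tupledata, tuplelen);
 *
 * At replay, this data is retrieved with XLogRecGetBlockData(). Note that
 * it is normally omitted from the record when a full-page image of the
 * block is taken instead, unless REGBUF_KEEP_DATA is set.
 */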

/*
 * Set insert status flags for the upcoming WAL record.
 *
 * The flags that can be used here are:
 * - XLOG_INCLUDE_ORIGIN, to determine if the replication origin should be
 *	 included in the record.
 * - XLOG_MARK_UNIMPORTANT, to signal that the record is not important for
 *	 durability, which allows us to avoid triggering WAL archiving and other
 *	 background activity.
 */
void
XLogSetRecordFlags(uint8 flags)
{
	Assert(begininsert_called);
	curinsert_flags = flags;
}
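
/*
 * Hedged example: a periodic record that should not, by itself, trigger
 * checkpoint or WAL-archive activity might be inserted like this (the rmgr
 * and record contents are illustrative assumptions):
 *
 *		XLogBeginInsert();
 *		XLogRegisterData((char *) &xlrec, sizeof(xlrec));
 *		XLogSetRecordFlags(XLOG_MARK_UNIMPORTANT);
 *		recptr = XLogInsert(rmid, info);
 */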

/*
 * Insert an XLOG record having the specified RMID and info bytes, with the
 * body of the record being the data and buffer references registered earlier
 * with XLogRegister* calls.
 *
 * Returns XLOG pointer to end of record (beginning of next record).
 * This can be used as LSN for data pages affected by the logged action.
 * (LSN is the XLOG point up to which the XLOG must be flushed to disk
 * before the data page can be written out.  This implements the basic
 * WAL rule "write the log before the data".)
 */
XLogRecPtr
XLogInsert(RmgrId rmid, uint8 info)
{
	XLogRecPtr	EndPos;

	/* XLogBeginInsert() must have been called. */
	if (!begininsert_called)
		elog(ERROR, "XLogBeginInsert was not called");

	/*
	 * The caller can set rmgr bits, XLR_SPECIAL_REL_UPDATE and
	 * XLR_CHECK_CONSISTENCY; the rest are reserved for use by me.
	 */
	if ((info & ~(XLR_RMGR_INFO_MASK |
				  XLR_SPECIAL_REL_UPDATE |
				  XLR_CHECK_CONSISTENCY)) != 0)
		elog(PANIC, "invalid xlog info mask %02X", info);

	TRACE_POSTGRESQL_WAL_INSERT(rmid, info);

	/*
	 * In bootstrap mode, we don't actually log anything but XLOG resources;
	 * return a phony record pointer.
	 */
	if (IsBootstrapProcessingMode() && rmid != RM_XLOG_ID)
	{
		XLogResetInsertion();
		EndPos = SizeOfXLogLongPHD; /* start of 1st chkpt record */
		return EndPos;
	}

	do
	{
		XLogRecPtr	RedoRecPtr;
		bool		doPageWrites;
		XLogRecPtr	fpw_lsn;
		XLogRecData *rdt;

		/*
		 * Get values needed to decide whether to do full-page writes. Since
		 * we don't yet have an insertion lock, these could change under us,
		 * but XLogInsertRecord will recheck them once it has a lock.
		 */
		GetFullPageWriteInfo(&RedoRecPtr, &doPageWrites);

		rdt = XLogRecordAssemble(rmid, info, RedoRecPtr, doPageWrites,
								 &fpw_lsn);

		EndPos = XLogInsertRecord(rdt, fpw_lsn, curinsert_flags);
	} while (EndPos == InvalidXLogRecPtr);

	XLogResetInsertion();

	return EndPos;
}

/*
 * Assemble a WAL record from the registered data and buffers into an
 * XLogRecData chain, ready for insertion with XLogInsertRecord().
 *
 * The record header fields are filled in, except for the xl_prev field. The
 * calculated CRC does not include the record header yet.
 *
 * If there are any registered buffers, and a full-page image was not taken
 * of all of them, *fpw_lsn is set to the lowest LSN among such pages. This
 * signals that the assembled record is only good for insertion on the
 * assumption that the RedoRecPtr and doPageWrites values were up-to-date.
 */
static XLogRecData *
XLogRecordAssemble(RmgrId rmid, uint8 info,
				   XLogRecPtr RedoRecPtr, bool doPageWrites,
				   XLogRecPtr *fpw_lsn)
{
	XLogRecData *rdt;
	uint32		total_len = 0;
	int			block_id;
	pg_crc32c	rdata_crc;
	registered_buffer *prev_regbuf = NULL;
	XLogRecData *rdt_datas_last;
	XLogRecord *rechdr;
	char	   *scratch = hdr_scratch;

	/*
	 * Note: this function can be called multiple times for the same record.
	 * All the modifications we do to the rdata chains below must handle that.
	 */

	/* The record begins with the fixed-size header */
	rechdr = (XLogRecord *) scratch;
	scratch += SizeOfXLogRecord;

	hdr_rdt.next = NULL;
	rdt_datas_last = &hdr_rdt;
	hdr_rdt.data = hdr_scratch;

	/*
	 * Enforce consistency checks for this record if user is looking for it.
	 * Do this at the beginning of this routine, so that callers of
	 * XLogInsert() can also pass XLR_CHECK_CONSISTENCY directly for a
	 * record.
	 */
	if (wal_consistency_checking[rmid])
		info |= XLR_CHECK_CONSISTENCY;

	/*
	 * Make an rdata chain containing all the data portions of all block
	 * references. This includes the data for full-page images. Also append
	 * the headers for the block references in the scratch buffer.
	 */
	*fpw_lsn = InvalidXLogRecPtr;
	for (block_id = 0; block_id < max_registered_block_id; block_id++)
	{
		registered_buffer *regbuf = &registered_buffers[block_id];
		bool		needs_backup;
		bool		needs_data;
		XLogRecordBlockHeader bkpb;
		XLogRecordBlockImageHeader bimg;
		XLogRecordBlockCompressHeader cbimg = {0};
		bool		samerel;
		bool		is_compressed = false;
		bool		include_image;

		if (!regbuf->in_use)
			continue;

		/* Determine if this block needs to be backed up */
		if (regbuf->flags & REGBUF_FORCE_IMAGE)
			needs_backup = true;
		else if (regbuf->flags & REGBUF_NO_IMAGE)
			needs_backup = false;
		else if (!doPageWrites)
			needs_backup = false;
		else
		{
			/*
			 * We assume page LSN is first data on *every* page that can be
			 * passed to XLogInsert, whether it has the standard page layout
			 * or not.
			 */
			XLogRecPtr	page_lsn = PageGetLSN(regbuf->page);

			needs_backup = (page_lsn <= RedoRecPtr);
			if (!needs_backup)
			{
				if (*fpw_lsn == InvalidXLogRecPtr || page_lsn < *fpw_lsn)
					*fpw_lsn = page_lsn;
			}
		}

		/* Determine if the buffer data needs to be included */
		if (regbuf->rdata_len == 0)
			needs_data = false;
		else if ((regbuf->flags & REGBUF_KEEP_DATA) != 0)
			needs_data = true;
		else
			needs_data = !needs_backup;

		bkpb.id = block_id;
		bkpb.fork_flags = regbuf->forkno;
		bkpb.data_length = 0;

		if ((regbuf->flags & REGBUF_WILL_INIT) == REGBUF_WILL_INIT)
			bkpb.fork_flags |= BKPBLOCK_WILL_INIT;

		/*
		 * If needs_backup is true or WAL checking is enabled for the current
		 * resource manager, log a full-page write for the current block.
		 */
		include_image = needs_backup || (info & XLR_CHECK_CONSISTENCY) != 0;

		if (include_image)
		{
			Page		page = regbuf->page;
			uint16		compressed_len;

			/*
			 * The page needs to be backed up, so calculate its hole length
			 * and offset.
			 */
			if (regbuf->flags & REGBUF_STANDARD)
			{
				/* Assume we can omit data between pd_lower and pd_upper */
				uint16		lower = ((PageHeader) page)->pd_lower;
				uint16		upper = ((PageHeader) page)->pd_upper;

				if (lower >= SizeOfPageHeaderData &&
					upper > lower &&
					upper <= BLCKSZ)
				{
					bimg.hole_offset = lower;
					cbimg.hole_length = upper - lower;
				}
				else
				{
					/* No "hole" to compress out */
					bimg.hole_offset = 0;
					cbimg.hole_length = 0;
				}
			}
			else
			{
				/* Not a standard page header, don't try to eliminate "hole" */
				bimg.hole_offset = 0;
				cbimg.hole_length = 0;
			}

			/*
			 * Try to compress a block image if wal_compression is enabled
			 */
			if (wal_compression)
			{
				is_compressed =
					XLogCompressBackupBlock(page, bimg.hole_offset,
											cbimg.hole_length,
											regbuf->compressed_page,
											&compressed_len);
			}

			/*
			 * Fill in the remaining fields in the XLogRecordBlockHeader
			 * struct
			 */
			bkpb.fork_flags |= BKPBLOCK_HAS_IMAGE;

			/*
			 * Construct XLogRecData entries for the page content.
			 */
			rdt_datas_last->next = &regbuf->bkp_rdatas[0];
			rdt_datas_last = rdt_datas_last->next;

			bimg.bimg_info = (cbimg.hole_length == 0) ? 0 : BKPIMAGE_HAS_HOLE;

			/*
			 * If WAL consistency checking is enabled for the resource manager
			 * of this WAL record, a full-page image is included in the record
			 * for the block modified. During redo, the full-page is replayed
			 * only if BKPIMAGE_APPLY is set.
			 */
			if (needs_backup)
				bimg.bimg_info |= BKPIMAGE_APPLY;

			if (is_compressed)
			{
				bimg.length = compressed_len;
				bimg.bimg_info |= BKPIMAGE_IS_COMPRESSED;

				rdt_datas_last->data = regbuf->compressed_page;
				rdt_datas_last->len = compressed_len;
			}
			else
			{
				bimg.length = BLCKSZ - cbimg.hole_length;

				if (cbimg.hole_length == 0)
				{
					rdt_datas_last->data = page;
					rdt_datas_last->len = BLCKSZ;
				}
				else
				{
					/* must skip the hole */
					rdt_datas_last->data = page;
					rdt_datas_last->len = bimg.hole_offset;

					rdt_datas_last->next = &regbuf->bkp_rdatas[1];
					rdt_datas_last = rdt_datas_last->next;

					rdt_datas_last->data =
						page + (bimg.hole_offset + cbimg.hole_length);
					rdt_datas_last->len =
						BLCKSZ - (bimg.hole_offset + cbimg.hole_length);
				}
			}

			total_len += bimg.length;
		}

		if (needs_data)
		{
			/*
			 * Link the caller-supplied rdata chain for this buffer to the
			 * overall list.
			 */
			bkpb.fork_flags |= BKPBLOCK_HAS_DATA;
			bkpb.data_length = regbuf->rdata_len;
			total_len += regbuf->rdata_len;

			rdt_datas_last->next = regbuf->rdata_head;
			rdt_datas_last = regbuf->rdata_tail;
		}

		if (prev_regbuf && RelFileNodeEquals(regbuf->rnode, prev_regbuf->rnode))
		{
			samerel = true;
			bkpb.fork_flags |= BKPBLOCK_SAME_REL;
		}
		else
			samerel = false;
		prev_regbuf = regbuf;

		/* Ok, copy the header to the scratch buffer */
		memcpy(scratch, &bkpb, SizeOfXLogRecordBlockHeader);
		scratch += SizeOfXLogRecordBlockHeader;
		if (include_image)
		{
			memcpy(scratch, &bimg, SizeOfXLogRecordBlockImageHeader);
			scratch += SizeOfXLogRecordBlockImageHeader;
			if (cbimg.hole_length != 0 && is_compressed)
			{
				memcpy(scratch, &cbimg,
					   SizeOfXLogRecordBlockCompressHeader);
				scratch += SizeOfXLogRecordBlockCompressHeader;
			}
		}
		if (!samerel)
		{
			memcpy(scratch, &regbuf->rnode, sizeof(RelFileNode));
			scratch += sizeof(RelFileNode);
		}
		memcpy(scratch, &regbuf->block, sizeof(BlockNumber));
		scratch += sizeof(BlockNumber);
	}

	/* followed by the record's origin, if any */
	if ((curinsert_flags & XLOG_INCLUDE_ORIGIN) &&
		replorigin_session_origin != InvalidRepOriginId)
	{
		*(scratch++) = (char) XLR_BLOCK_ID_ORIGIN;
		memcpy(scratch, &replorigin_session_origin, sizeof(replorigin_session_origin));
		scratch += sizeof(replorigin_session_origin);
	}

	/* followed by main data, if any */
	if (mainrdata_len > 0)
	{
		if (mainrdata_len > 255)
		{
			*(scratch++) = (char) XLR_BLOCK_ID_DATA_LONG;
			memcpy(scratch, &mainrdata_len, sizeof(uint32));
			scratch += sizeof(uint32);
		}
		else
		{
			*(scratch++) = (char) XLR_BLOCK_ID_DATA_SHORT;
			*(scratch++) = (uint8) mainrdata_len;
		}
		rdt_datas_last->next = mainrdata_head;
		rdt_datas_last = mainrdata_last;
		total_len += mainrdata_len;
	}
	rdt_datas_last->next = NULL;

	hdr_rdt.len = (scratch - hdr_scratch);
	total_len += hdr_rdt.len;

	/*
	 * Calculate CRC of the data
	 *
	 * Note that the record header isn't added into the CRC initially since we
	 * don't know the prev-link yet.  Thus, the CRC will represent the CRC of
	 * the whole record in the order: rdata, then backup blocks, then record
	 * header.
	 */
	INIT_CRC32C(rdata_crc);
	COMP_CRC32C(rdata_crc, hdr_scratch + SizeOfXLogRecord, hdr_rdt.len - SizeOfXLogRecord);
	for (rdt = hdr_rdt.next; rdt != NULL; rdt = rdt->next)
		COMP_CRC32C(rdata_crc, rdt->data, rdt->len);

	/*
	 * Fill in the fields in the record header. Prev-link is filled in later,
	 * once we know where in the WAL the record will be inserted. The CRC does
	 * not include the record header yet.
	 */
	rechdr->xl_xid = GetCurrentTransactionIdIfAny();
	rechdr->xl_tot_len = total_len;
	rechdr->xl_info = info;
	rechdr->xl_rmid = rmid;
	rechdr->xl_prev = InvalidXLogRecPtr;
	rechdr->xl_crc = rdata_crc;

	return &hdr_rdt;
}

/*
 * Create a compressed version of a backup block image.
 *
 * Returns FALSE if compression fails (i.e., compressed result is actually
 * bigger than original). Otherwise, returns TRUE and sets 'dlen' to
 * the length of compressed block image.
 */
static bool
XLogCompressBackupBlock(char *page, uint16 hole_offset, uint16 hole_length,
						char *dest, uint16 *dlen)
{
	int32		orig_len = BLCKSZ - hole_length;
	int32		len;
	int32		extra_bytes = 0;
	char	   *source;
	PGAlignedBlock tmp;

	if (hole_length != 0)
	{
		/* must skip the hole */
		source = tmp.data;
		memcpy(source, page, hole_offset);
		memcpy(source + hole_offset,
			   page + (hole_offset + hole_length),
			   BLCKSZ - (hole_length + hole_offset));

		/*
		 * Extra data needs to be stored in WAL record for the compressed
		 * version of block image if the hole exists.
		 */
		extra_bytes = SizeOfXLogRecordBlockCompressHeader;
	}
	else
		source = page;

	/*
	 * We recheck the actual size even if pglz_compress() reports success and
	 * see if the number of bytes saved by compression is larger than the
	 * length of extra data needed for the compressed version of block image.
	 */
	len = pglz_compress(source, orig_len, dest, PGLZ_strategy_default);
	if (len >= 0 &&
		len + extra_bytes < orig_len)
	{
		*dlen = (uint16) len;	/* successful compression */
		return true;
	}
	return false;
}

/*
 * Determine whether the buffer referenced has to be backed up.
 *
 * Since we don't yet have the insert lock, fullPageWrites and forcePageWrites
 * could change later, so the result should be used for optimization purposes
 * only.
 */
bool
XLogCheckBufferNeedsBackup(Buffer buffer)
{
	XLogRecPtr	RedoRecPtr;
	bool		doPageWrites;
	Page		page;

	GetFullPageWriteInfo(&RedoRecPtr, &doPageWrites);

	page = BufferGetPage(buffer);

	if (doPageWrites && PageGetLSN(page) <= RedoRecPtr)
		return true;			/* buffer requires backup */

	return false;				/* buffer does not need to be backed up */
}
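
/*
 * Hedged caller sketch: because the answer is only a hint, a caller may use
 * it to pick a cheaper WAL representation when a full-page image is likely
 * to be taken anyway, provided either choice replays correctly:
 *
 *		if (XLogCheckBufferNeedsBackup(buffer))
 *			... register less per-tuple data; the image covers the page ...
 *		else
 *			... register the full change data ...
 */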

/*
 * Write a backup block if needed when we are setting a hint. Note that
 * this may be called for a variety of page types, not just heaps.
 *
 * Callable while holding just share lock on the buffer content.
 *
 * We can't use the plain backup block mechanism since that relies on the
 * Buffer being exclusively locked. Since some modifications (setting LSN,
 * hint bits) are allowed in a share-locked buffer, that can lead to WAL
 * checksum failures. So instead we copy the page and insert the copied data
 * as normal record data.
 *
 * We only need to do something if the page has not yet been full-page
 * written in this checkpoint round. The LSN of the inserted WAL record is
 * returned if we had to write, InvalidXLogRecPtr otherwise.
 *
 * It is possible that multiple concurrent backends could attempt to write WAL
 * records. In that case, multiple copies of the same block would be recorded
 * in separate WAL records by different backends, though that is still OK from
 * a correctness perspective.
 */
XLogRecPtr
XLogSaveBufferForHint(Buffer buffer, bool buffer_std)
{
	XLogRecPtr	recptr = InvalidXLogRecPtr;
	XLogRecPtr	lsn;
	XLogRecPtr	RedoRecPtr;

	/*
	 * Ensure no checkpoint can change our view of RedoRecPtr.
	 */
	Assert(MyPgXact->delayChkpt);

	/*
	 * Update RedoRecPtr so that we can make the right decision
	 */
	RedoRecPtr = GetRedoRecPtr();

	/*
	 * We assume page LSN is first data on *every* page that can be passed to
	 * XLogInsert, whether it has the standard page layout or not. Since we're
	 * only holding a share-lock on the page, we must take the buffer header
	 * lock when we look at the LSN.
	 */
	lsn = BufferGetLSNAtomic(buffer);

	if (lsn <= RedoRecPtr)
	{
		int			flags;
		PGAlignedBlock copied_buffer;
		char	   *origdata = (char *) BufferGetBlock(buffer);
		RelFileNode rnode;
		ForkNumber	forkno;
		BlockNumber blkno;

		/*
		 * Copy buffer so we don't have to worry about concurrent hint bit or
		 * lsn updates. We assume pd_lower/upper cannot be changed without an
		 * exclusive lock, so the contents of the backup image are not racy.
		 */
		if (buffer_std)
		{
			/* Assume we can omit data between pd_lower and pd_upper */
			Page		page = BufferGetPage(buffer);
			uint16		lower = ((PageHeader) page)->pd_lower;
			uint16		upper = ((PageHeader) page)->pd_upper;

			memcpy(copied_buffer.data, origdata, lower);
			memcpy(copied_buffer.data + upper, origdata + upper, BLCKSZ - upper);
		}
		else
			memcpy(copied_buffer.data, origdata, BLCKSZ);

		XLogBeginInsert();

		flags = REGBUF_FORCE_IMAGE;
		if (buffer_std)
			flags |= REGBUF_STANDARD;

		BufferGetTag(buffer, &rnode, &forkno, &blkno);
		XLogRegisterBlock(0, &rnode, forkno, blkno, copied_buffer.data, flags);

		recptr = XLogInsert(RM_XLOG_ID, XLOG_FPI_FOR_HINT);
	}

	return recptr;
}
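
/*
 * Hedged sketch of the expected calling pattern (in practice this is driven
 * by the buffer manager when dirtying a page for a hint-bit change; the
 * caller must prevent a concurrent checkpoint, e.g. via delayChkpt, and
 * hold at least share lock on the buffer):
 *
 *		lsn = XLogSaveBufferForHint(buffer, buffer_std);
 *		if (!XLogRecPtrIsInvalid(lsn))
 *			PageSetLSN(page, lsn);
 */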

/*
 * Write a WAL record containing a full image of a page. Caller is responsible
 * for writing the page to disk after calling this routine.
 *
 * Note: If you're using this function, you should be building pages in private
 * memory and writing them directly to smgr.  If you're using buffers, call
 * log_newpage_buffer instead.
 *
 * If the page follows the standard page layout, with a PageHeader and unused
 * space between pd_lower and pd_upper, set 'page_std' to TRUE. That allows
 * the unused space to be left out from the WAL record, making it smaller.
 */
XLogRecPtr
log_newpage(RelFileNode *rnode, ForkNumber forkNum, BlockNumber blkno,
			Page page, bool page_std)
{
	int			flags;
	XLogRecPtr	recptr;

	flags = REGBUF_FORCE_IMAGE;
	if (page_std)
		flags |= REGBUF_STANDARD;

	XLogBeginInsert();
	XLogRegisterBlock(0, rnode, forkNum, blkno, page, flags);
	recptr = XLogInsert(RM_XLOG_ID, XLOG_FPI);

	/*
	 * The page may be uninitialized. If so, we can't set the LSN because that
	 * would corrupt the page.
	 */
	if (!PageIsNew(page))
	{
		PageSetLSN(page, recptr);
	}

	return recptr;
}
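
/*
 * Hedged usage sketch, modeled on index-build code paths that construct
 * pages in private memory and write them out through smgr (the rel and
 * blkno variables here are illustrative assumptions):
 *
 *		recptr = log_newpage(&rel->rd_node, MAIN_FORKNUM, blkno, page, true);
 *		smgrextend(rel->rd_smgr, MAIN_FORKNUM, blkno, (char *) page, false);
 */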

/*
 * Write a WAL record containing a full image of a page.
 *
 * Caller should initialize the buffer and mark it dirty before calling this
 * function.  This function will set the page LSN.
 *
 * If the page follows the standard page layout, with a PageHeader and unused
 * space between pd_lower and pd_upper, set 'page_std' to TRUE. That allows
 * the unused space to be left out from the WAL record, making it smaller.
 */
XLogRecPtr
log_newpage_buffer(Buffer buffer, bool page_std)
{
	Page		page = BufferGetPage(buffer);
	RelFileNode rnode;
	ForkNumber	forkNum;
	BlockNumber blkno;

	/* Shared buffers should be modified in a critical section. */
	Assert(CritSectionCount > 0);

	BufferGetTag(buffer, &rnode, &forkNum, &blkno);

	return log_newpage(&rnode, forkNum, blkno, page, page_std);
}

/*
 * WAL-log a range of blocks in a relation.
 *
 * An image of all pages with block numbers 'startblk' <= X < 'endblk' is
 * written to the WAL. If the range is large, this is done in multiple WAL
 * records.
 *
 * If all the pages follow the standard page layout, with a PageHeader and
 * unused space between pd_lower and pd_upper, set 'page_std' to true. That
 * allows the unused space to be left out from the WAL records, making them
 * smaller.
 *
 * NOTE: This function acquires exclusive-locks on the pages. Typically, this
 * is used on a newly-built relation, and the caller is holding an
 * AccessExclusiveLock on it, so no other backend can be accessing it at the
 * same time. If that's not the case, you must ensure that this does not
 * cause a deadlock through some other means.
 */
void
log_newpage_range(Relation rel, ForkNumber forkNum,
				  BlockNumber startblk, BlockNumber endblk,
				  bool page_std)
{
	int			flags;
	BlockNumber blkno;

	flags = REGBUF_FORCE_IMAGE;
	if (page_std)
		flags |= REGBUF_STANDARD;

	/*
	 * Iterate over all the pages in the range. They are collected into
	 * batches of XLR_MAX_BLOCK_ID pages, and a single WAL-record is written
	 * for each batch.
	 */
	XLogEnsureRecordSpace(XLR_MAX_BLOCK_ID - 1, 0);

	blkno = startblk;
	while (blkno < endblk)
	{
		Buffer		bufpack[XLR_MAX_BLOCK_ID];
		XLogRecPtr	recptr;
		int			nbufs;
		int			i;

		CHECK_FOR_INTERRUPTS();

		/* Collect a batch of blocks. */
		nbufs = 0;
		while (nbufs < XLR_MAX_BLOCK_ID && blkno < endblk)
		{
			Buffer		buf = ReadBufferExtended(rel, forkNum, blkno,
												 RBM_NORMAL, NULL);

			LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);

			/*
			 * Completely empty pages are not WAL-logged. Writing a WAL record
			 * would change the LSN, and we don't want that. We want the page
			 * to stay empty.
			 */
			if (!PageIsNew(BufferGetPage(buf)))
				bufpack[nbufs++] = buf;
			else
				UnlockReleaseBuffer(buf);
			blkno++;
		}

		/* Write WAL record for this batch. */
		XLogBeginInsert();

		START_CRIT_SECTION();
		for (i = 0; i < nbufs; i++)
		{
			XLogRegisterBuffer(i, bufpack[i], flags);
			MarkBufferDirty(bufpack[i]);
		}

		recptr = XLogInsert(RM_XLOG_ID, XLOG_FPI_MULTI);

		for (i = 0; i < nbufs; i++)
		{
			PageSetLSN(BufferGetPage(bufpack[i]), recptr);
			UnlockReleaseBuffer(bufpack[i]);
		}
		END_CRIT_SECTION();
	}
}

/*
 * Allocate working buffers needed for WAL record construction.
 */
void
InitXLogInsert(void)
{
	/* Initialize the working areas */
	if (xloginsert_cxt == NULL)
	{
		xloginsert_cxt = AllocSetContextCreate(TopMemoryContext,
											   "WAL record construction",
											   ALLOCSET_DEFAULT_SIZES);
	}

	if (registered_buffers == NULL)
	{
		registered_buffers = (registered_buffer *)
			MemoryContextAllocZero(xloginsert_cxt,
								   sizeof(registered_buffer) * (XLR_NORMAL_MAX_BLOCK_ID + 1));
		max_registered_buffers = XLR_NORMAL_MAX_BLOCK_ID + 1;
	}
	if (rdatas == NULL)
	{
		rdatas = MemoryContextAlloc(xloginsert_cxt,
									sizeof(XLogRecData) * XLR_NORMAL_RDATAS);
		max_rdatas = XLR_NORMAL_RDATAS;
	}

	/*
	 * Allocate a buffer to hold the header information for a WAL record.
	 */
	if (hdr_scratch == NULL)
		hdr_scratch = MemoryContextAllocZero(xloginsert_cxt,
											 HEADER_SCRATCH_SIZE);
}