/*-------------------------------------------------------------------------
 *
 * xloginsert.c
 *		Functions for constructing WAL records
 *
 * Constructing a WAL record begins with a call to XLogBeginInsert,
 * followed by a number of XLogRegister* calls. The registered data is
 * collected in private working memory, and finally assembled into a chain
 * of XLogRecData structs by a call to XLogRecordAssemble(). See
 * access/transam/README for details.
 *
 * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * src/backend/access/transam/xloginsert.c
 *
 *-------------------------------------------------------------------------
 */
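
/*
 * As orientation, a typical caller goes through this cycle (a sketch; the
 * rmgr id, info flag, and record struct below are hypothetical, not taken
 * from any particular caller):
 *
 *		XLogBeginInsert();
 *		XLogRegisterBuffer(0, buffer, REGBUF_STANDARD);
 *		XLogRegisterData((char *) &xlrec, sizeof(xlrec));
 *		recptr = XLogInsert(RM_FOO_ID, XLOG_FOO_OPERATION);
 *		PageSetLSN(BufferGetPage(buffer), recptr);
 *
 * XLogInsert() resets the construction workspace when it finishes, so each
 * record must be started with a fresh XLogBeginInsert() call.
 */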

#include "postgres.h"

#include "access/xact.h"
#include "access/xlog.h"
#include "access/xlog_internal.h"
#include "access/xloginsert.h"
#include "catalog/pg_control.h"
#include "common/pg_lzcompress.h"
#include "miscadmin.h"
#include "replication/origin.h"
#include "storage/bufmgr.h"
#include "storage/proc.h"
#include "utils/memutils.h"
#include "pg_trace.h"

/* Buffer size required to store a compressed version of backup block image */
#define PGLZ_MAX_BLCKSZ PGLZ_MAX_OUTPUT(BLCKSZ)

/*
 * For each block reference registered with XLogRegisterBuffer, we fill in
 * a registered_buffer struct.
 */
typedef struct
{
	bool		in_use;			/* is this slot in use? */
	uint8		flags;			/* REGBUF_* flags */
	RelFileNode rnode;			/* identifies the relation and block */
	ForkNumber	forkno;
	BlockNumber block;
	Page		page;			/* page content */
	uint32		rdata_len;		/* total length of data in rdata chain */
	XLogRecData *rdata_head;	/* head of the chain of data registered with
								 * this block */
	XLogRecData *rdata_tail;	/* last entry in the chain, or &rdata_head if
								 * empty */

	XLogRecData bkp_rdatas[2];	/* temporary rdatas used to hold references to
								 * backup block data in XLogRecordAssemble() */

	/* buffer to store a compressed version of backup block image */
	char		compressed_page[PGLZ_MAX_BLCKSZ];
} registered_buffer;

static registered_buffer *registered_buffers;
static int	max_registered_buffers;		/* allocated size */
static int	max_registered_block_id = 0;		/* highest block_id + 1
												 * currently registered */

/*
 * A chain of XLogRecDatas to hold the "main data" of a WAL record, registered
 * with XLogRegisterData(...).
 */
static XLogRecData *mainrdata_head;
static XLogRecData *mainrdata_last = (XLogRecData *) &mainrdata_head;
static uint32 mainrdata_len;	/* total # of bytes in chain */

/* Should the in-progress insertion log the origin? */
static bool include_origin = false;

/*
 * These are used to hold the record header while constructing a record.
 * 'hdr_scratch' is not a plain variable, but is palloc'd at initialization,
 * because we want it to be MAXALIGNed and padding bytes zeroed.
 *
 * For simplicity, it's allocated large enough to hold the headers for any
 * WAL record.
 */
static XLogRecData hdr_rdt;
static char *hdr_scratch = NULL;

#define SizeOfXlogOrigin	(sizeof(RepOriginId) + sizeof(char))

#define HEADER_SCRATCH_SIZE \
	(SizeOfXLogRecord + \
	 MaxSizeOfXLogRecordBlockHeader * (XLR_MAX_BLOCK_ID + 1) + \
	 SizeOfXLogRecordDataHeaderLong + SizeOfXlogOrigin)

/*
 * An array of XLogRecData structs, to hold registered data.
 */
static XLogRecData *rdatas;
static int	num_rdatas;			/* entries currently used */
static int	max_rdatas;			/* allocated size */

static bool begininsert_called = false;

/* Memory context to hold the registered buffer and data references. */
static MemoryContext xloginsert_cxt;

static XLogRecData *XLogRecordAssemble(RmgrId rmid, uint8 info,
				   XLogRecPtr RedoRecPtr, bool doPageWrites,
				   XLogRecPtr *fpw_lsn);
static bool XLogCompressBackupBlock(char *page, uint16 hole_offset,
						uint16 hole_length, char *dest, uint16 *dlen);

/*
 * Begin constructing a WAL record. This must be called before the
 * XLogRegister* functions and XLogInsert().
 */
void
XLogBeginInsert(void)
{
	Assert(max_registered_block_id == 0);
	Assert(mainrdata_last == (XLogRecData *) &mainrdata_head);
	Assert(mainrdata_len == 0);

	/* cross-check on whether we should be here or not */
	if (!XLogInsertAllowed())
		elog(ERROR, "cannot make new WAL entries during recovery");

	if (begininsert_called)
		elog(ERROR, "XLogBeginInsert was already called");

	begininsert_called = true;
}

/*
 * Ensure that there are enough buffer and data slots in the working area,
 * for subsequent XLogRegisterBuffer, XLogRegisterData and XLogRegisterBufData
 * calls.
 *
 * There is always space for a small number of buffers and data chunks, enough
 * for most record types. This function is for the exceptional cases that need
 * more.
 */
void
XLogEnsureRecordSpace(int max_block_id, int ndatas)
{
	int			nbuffers;

	/*
	 * This must be called before entering a critical section, because
	 * allocating memory inside a critical section can fail. repalloc() will
	 * check the same, but better to check it here too so that we fail
	 * consistently even if the arrays happen to be large enough already.
	 */
	Assert(CritSectionCount == 0);

	/* the minimum values can't be decreased */
	if (max_block_id < XLR_NORMAL_MAX_BLOCK_ID)
		max_block_id = XLR_NORMAL_MAX_BLOCK_ID;
	if (ndatas < XLR_NORMAL_RDATAS)
		ndatas = XLR_NORMAL_RDATAS;

	if (max_block_id > XLR_MAX_BLOCK_ID)
		elog(ERROR, "maximum number of WAL record block references exceeded");
	nbuffers = max_block_id + 1;

	if (nbuffers > max_registered_buffers)
	{
		registered_buffers = (registered_buffer *)
			repalloc(registered_buffers, sizeof(registered_buffer) * nbuffers);

		/*
		 * At least the padding bytes in the structs must be zeroed, because
		 * they are included in WAL data, but initialize it all for tidiness.
		 */
		MemSet(&registered_buffers[max_registered_buffers], 0,
			(nbuffers - max_registered_buffers) * sizeof(registered_buffer));
		max_registered_buffers = nbuffers;
	}

	if (ndatas > max_rdatas)
	{
		rdatas = (XLogRecData *) repalloc(rdatas, sizeof(XLogRecData) * ndatas);
		max_rdatas = ndatas;
	}
}
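
/*
 * For example, a caller registering an unusually large number of block
 * references must enlarge the working arrays up front, before entering its
 * critical section ('nblocks' is a hypothetical caller-side count; the
 * argument is the highest block_id, hence the - 1). log_newpage_range()
 * below uses this pattern.
 *
 *		XLogEnsureRecordSpace(nblocks - 1, 0);
 *		START_CRIT_SECTION();
 *		... register blocks and data, then XLogInsert() ...
 *		END_CRIT_SECTION();
 */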

/*
 * Reset WAL record construction buffers.
 */
void
XLogResetInsertion(void)
{
	int			i;

	for (i = 0; i < max_registered_block_id; i++)
		registered_buffers[i].in_use = false;

	num_rdatas = 0;
	max_registered_block_id = 0;
	mainrdata_len = 0;
	mainrdata_last = (XLogRecData *) &mainrdata_head;
	include_origin = false;
	begininsert_called = false;
}

/*
 * Register a reference to a buffer with the WAL record being constructed.
 * This must be called for every page that the WAL-logged operation modifies.
 */
void
XLogRegisterBuffer(uint8 block_id, Buffer buffer, uint8 flags)
{
	registered_buffer *regbuf;

	/* NO_IMAGE doesn't make sense with FORCE_IMAGE */
	Assert(!((flags & REGBUF_FORCE_IMAGE) && (flags & (REGBUF_NO_IMAGE))));
	Assert(begininsert_called);

	if (block_id >= max_registered_block_id)
	{
		if (block_id >= max_registered_buffers)
			elog(ERROR, "too many registered buffers");
		max_registered_block_id = block_id + 1;
	}

	regbuf = &registered_buffers[block_id];

	BufferGetTag(buffer, &regbuf->rnode, &regbuf->forkno, &regbuf->block);
	regbuf->page = BufferGetPage(buffer);
	regbuf->flags = flags;
	regbuf->rdata_tail = (XLogRecData *) &regbuf->rdata_head;
	regbuf->rdata_len = 0;

	/*
	 * Check that this page hasn't already been registered with some other
	 * block_id.
	 */
#ifdef USE_ASSERT_CHECKING
	{
		int			i;

		for (i = 0; i < max_registered_block_id; i++)
		{
			registered_buffer *regbuf_old = &registered_buffers[i];

			if (i == block_id || !regbuf_old->in_use)
				continue;

			Assert(!RelFileNodeEquals(regbuf_old->rnode, regbuf->rnode) ||
				   regbuf_old->forkno != regbuf->forkno ||
				   regbuf_old->block != regbuf->block);
		}
	}
#endif

	regbuf->in_use = true;
}

/*
 * Like XLogRegisterBuffer, but for registering a block that's not in the
 * shared buffer pool (i.e. when you don't have a Buffer for it).
 */
void
XLogRegisterBlock(uint8 block_id, RelFileNode *rnode, ForkNumber forknum,
				  BlockNumber blknum, Page page, uint8 flags)
{
	registered_buffer *regbuf;

	/* This is currently only used to WAL-log a full-page image of a page */
	Assert(flags & REGBUF_FORCE_IMAGE);
	Assert(begininsert_called);

	if (block_id >= max_registered_block_id)
		max_registered_block_id = block_id + 1;

	if (block_id >= max_registered_buffers)
		elog(ERROR, "too many registered buffers");

	regbuf = &registered_buffers[block_id];

	regbuf->rnode = *rnode;
	regbuf->forkno = forknum;
	regbuf->block = blknum;
	regbuf->page = page;
	regbuf->flags = flags;
	regbuf->rdata_tail = (XLogRecData *) &regbuf->rdata_head;
	regbuf->rdata_len = 0;

	/*
	 * Check that this page hasn't already been registered with some other
	 * block_id.
	 */
#ifdef USE_ASSERT_CHECKING
	{
		int			i;

		for (i = 0; i < max_registered_block_id; i++)
		{
			registered_buffer *regbuf_old = &registered_buffers[i];

			if (i == block_id || !regbuf_old->in_use)
				continue;

			Assert(!RelFileNodeEquals(regbuf_old->rnode, regbuf->rnode) ||
				   regbuf_old->forkno != regbuf->forkno ||
				   regbuf_old->block != regbuf->block);
		}
	}
#endif

	regbuf->in_use = true;
}

/*
 * Add data to the WAL record that's being constructed.
 *
 * The data is appended to the "main chunk", available at replay with
 * XLogRecGetData().
 */
void
XLogRegisterData(char *data, int len)
{
	XLogRecData *rdata;

	Assert(begininsert_called);

	if (num_rdatas >= max_rdatas)
		elog(ERROR, "too much WAL data");
	rdata = &rdatas[num_rdatas++];

	rdata->data = data;
	rdata->len = len;

	/*
	 * we use the mainrdata_last pointer to track the end of the chain, so no
	 * need to clear 'next' here.
	 */

	mainrdata_last->next = rdata;
	mainrdata_last = rdata;

	mainrdata_len += len;
}

/*
 * Add buffer-specific data to the WAL record that's being constructed.
 *
 * Block_id must reference a block previously registered with
 * XLogRegisterBuffer(). If this is called more than once for the same
 * block_id, the data is appended.
 *
 * The maximum amount of data that can be registered per block is 65535
 * bytes. That should be plenty; if you need more than BLCKSZ bytes to
 * reconstruct the changes to the page, you might as well just log a full
 * copy of it. (the "main data" that's not associated with a block is not
 * limited)
 */
void
XLogRegisterBufData(uint8 block_id, char *data, int len)
{
	registered_buffer *regbuf;
	XLogRecData *rdata;

	Assert(begininsert_called);

	/* find the registered buffer struct */
	regbuf = &registered_buffers[block_id];
	if (!regbuf->in_use)
		elog(ERROR, "no block with id %d registered with WAL insertion",
			 block_id);

	if (num_rdatas >= max_rdatas)
		elog(ERROR, "too much WAL data");
	rdata = &rdatas[num_rdatas++];

	rdata->data = data;
	rdata->len = len;

	regbuf->rdata_tail->next = rdata;
	regbuf->rdata_tail = rdata;
	regbuf->rdata_len += len;
}
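
/*
 * As a sketch, a caller logging a small per-page payload alongside the
 * buffer reference might do (block_id 0 and the payload struct are
 * illustrative):
 *
 *		XLogRegisterBuffer(0, buffer, REGBUF_STANDARD);
 *		XLogRegisterBufData(0, (char *) &payload, sizeof(payload));
 *
 * At redo, the payload is retrieved with XLogRecGetBlockData(record, 0,
 * &len). Note that the data is omitted from the record when a full-page
 * image makes it unnecessary, unless REGBUF_KEEP_DATA is given.
 */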

/*
 * Should this record include the replication origin if one is set up?
 */
void
XLogIncludeOrigin(void)
{
	Assert(begininsert_called);
	include_origin = true;
}

/*
 * Insert an XLOG record having the specified RMID and info bytes, with the
 * body of the record being the data and buffer references registered earlier
 * with XLogRegister* calls.
 *
 * Returns XLOG pointer to end of record (beginning of next record).
 * This can be used as LSN for data pages affected by the logged action.
 * (LSN is the XLOG point up to which the XLOG must be flushed to disk
 * before the data page can be written out.  This implements the basic
 * WAL rule "write the log before the data".)
 */
XLogRecPtr
XLogInsert(RmgrId rmid, uint8 info)
{
	XLogRecPtr	EndPos;

	/* XLogBeginInsert() must have been called. */
	if (!begininsert_called)
		elog(ERROR, "XLogBeginInsert was not called");

	/*
	 * The caller can set rmgr bits and XLR_SPECIAL_REL_UPDATE; the rest are
	 * reserved for use by me.
	 */
	if ((info & ~(XLR_RMGR_INFO_MASK | XLR_SPECIAL_REL_UPDATE)) != 0)
		elog(PANIC, "invalid xlog info mask %02X", info);

	TRACE_POSTGRESQL_XLOG_INSERT(rmid, info);

	/*
	 * In bootstrap mode, we don't actually log anything but XLOG resources;
	 * return a phony record pointer.
	 */
	if (IsBootstrapProcessingMode() && rmid != RM_XLOG_ID)
	{
		XLogResetInsertion();
		EndPos = SizeOfXLogLongPHD;		/* start of 1st chkpt record */
		return EndPos;
	}

	do
	{
		XLogRecPtr	RedoRecPtr;
		bool		doPageWrites;
		XLogRecPtr	fpw_lsn;
		XLogRecData *rdt;

		/*
		 * Get values needed to decide whether to do full-page writes. Since
		 * we don't yet have an insertion lock, these could change under us,
		 * but XLogInsertRecord will recheck them once it has a lock.
		 */
		GetFullPageWriteInfo(&RedoRecPtr, &doPageWrites);

		rdt = XLogRecordAssemble(rmid, info, RedoRecPtr, doPageWrites,
								 &fpw_lsn);

		EndPos = XLogInsertRecord(rdt, fpw_lsn);
	} while (EndPos == InvalidXLogRecPtr);

	XLogResetInsertion();

	return EndPos;
}

/*
 * Assemble a WAL record from the registered data and buffers into an
 * XLogRecData chain, ready for insertion with XLogInsertRecord().
 *
 * The record header fields are filled in, except for the xl_prev field. The
 * calculated CRC does not include the record header yet.
 *
 * If there are any registered buffers, and a full-page image was not taken
 * of all of them, *fpw_lsn is set to the lowest LSN among such pages. This
 * signals that the assembled record is only good for insertion on the
 * assumption that the RedoRecPtr and doPageWrites values were up-to-date.
 */
static XLogRecData *
XLogRecordAssemble(RmgrId rmid, uint8 info,
				   XLogRecPtr RedoRecPtr, bool doPageWrites,
				   XLogRecPtr *fpw_lsn)
{
	XLogRecData *rdt;
	uint32		total_len = 0;
	int			block_id;
	pg_crc32c	rdata_crc;
	registered_buffer *prev_regbuf = NULL;
	XLogRecData *rdt_datas_last;
	XLogRecord *rechdr;
	char	   *scratch = hdr_scratch;

	/*
	 * Note: this function can be called multiple times for the same record.
	 * All the modifications we do to the rdata chains below must handle that.
	 */

	/* The record begins with the fixed-size header */
	rechdr = (XLogRecord *) scratch;
	scratch += SizeOfXLogRecord;

	hdr_rdt.next = NULL;
	rdt_datas_last = &hdr_rdt;
	hdr_rdt.data = hdr_scratch;

	/*
	 * Make an rdata chain containing all the data portions of all block
	 * references. This includes the data for full-page images. Also append
	 * the headers for the block references in the scratch buffer.
	 */
	*fpw_lsn = InvalidXLogRecPtr;
	for (block_id = 0; block_id < max_registered_block_id; block_id++)
	{
		registered_buffer *regbuf = &registered_buffers[block_id];
		bool		needs_backup;
		bool		needs_data;
		XLogRecordBlockHeader bkpb;
		XLogRecordBlockImageHeader bimg;
		XLogRecordBlockCompressHeader cbimg = {0};
		bool		samerel;
		bool		is_compressed = false;

		if (!regbuf->in_use)
			continue;

		/* Determine if this block needs to be backed up */
		if (regbuf->flags & REGBUF_FORCE_IMAGE)
			needs_backup = true;
		else if (regbuf->flags & REGBUF_NO_IMAGE)
			needs_backup = false;
		else if (!doPageWrites)
			needs_backup = false;
		else
		{
			/*
			 * We assume page LSN is first data on *every* page that can be
			 * passed to XLogInsert, whether it has the standard page layout
			 * or not.
			 */
			XLogRecPtr	page_lsn = PageGetLSN(regbuf->page);

			needs_backup = (page_lsn <= RedoRecPtr);
			if (!needs_backup)
			{
				if (*fpw_lsn == InvalidXLogRecPtr || page_lsn < *fpw_lsn)
					*fpw_lsn = page_lsn;
			}
		}

		/* Determine if the buffer data needs to be included */
		if (regbuf->rdata_len == 0)
			needs_data = false;
		else if ((regbuf->flags & REGBUF_KEEP_DATA) != 0)
			needs_data = true;
		else
			needs_data = !needs_backup;

		bkpb.id = block_id;
		bkpb.fork_flags = regbuf->forkno;
		bkpb.data_length = 0;

		if ((regbuf->flags & REGBUF_WILL_INIT) == REGBUF_WILL_INIT)
			bkpb.fork_flags |= BKPBLOCK_WILL_INIT;

		if (needs_backup)
		{
			Page		page = regbuf->page;
			uint16		compressed_len;

			/*
			 * The page needs to be backed up, so calculate its hole length
			 * and offset.
			 */
			if (regbuf->flags & REGBUF_STANDARD)
			{
				/* Assume we can omit data between pd_lower and pd_upper */
				uint16		lower = ((PageHeader) page)->pd_lower;
				uint16		upper = ((PageHeader) page)->pd_upper;

				if (lower >= SizeOfPageHeaderData &&
					upper > lower &&
					upper <= BLCKSZ)
				{
					bimg.hole_offset = lower;
					cbimg.hole_length = upper - lower;
				}
				else
				{
					/* No "hole" to compress out */
					bimg.hole_offset = 0;
					cbimg.hole_length = 0;
				}
			}
			else
			{
				/* Not a standard page header, don't try to eliminate "hole" */
				bimg.hole_offset = 0;
				cbimg.hole_length = 0;
			}

			/*
			 * Try to compress a block image if wal_compression is enabled
			 */
			if (wal_compression)
			{
				is_compressed =
					XLogCompressBackupBlock(page, bimg.hole_offset,
											cbimg.hole_length,
											regbuf->compressed_page,
											&compressed_len);
			}

			/*
			 * Fill in the remaining fields in the XLogRecordBlockHeader
			 * struct
			 */
			bkpb.fork_flags |= BKPBLOCK_HAS_IMAGE;

			/*
			 * Construct XLogRecData entries for the page content.
			 */
			rdt_datas_last->next = &regbuf->bkp_rdatas[0];
			rdt_datas_last = rdt_datas_last->next;

			bimg.bimg_info = (cbimg.hole_length == 0) ? 0 : BKPIMAGE_HAS_HOLE;

			if (is_compressed)
			{
				bimg.length = compressed_len;
				bimg.bimg_info |= BKPIMAGE_IS_COMPRESSED;

				rdt_datas_last->data = regbuf->compressed_page;
				rdt_datas_last->len = compressed_len;
			}
			else
			{
				bimg.length = BLCKSZ - cbimg.hole_length;

				if (cbimg.hole_length == 0)
				{
					rdt_datas_last->data = page;
					rdt_datas_last->len = BLCKSZ;
				}
				else
				{
					/* must skip the hole */
					rdt_datas_last->data = page;
					rdt_datas_last->len = bimg.hole_offset;

					rdt_datas_last->next = &regbuf->bkp_rdatas[1];
					rdt_datas_last = rdt_datas_last->next;

					rdt_datas_last->data =
						page + (bimg.hole_offset + cbimg.hole_length);
					rdt_datas_last->len =
						BLCKSZ - (bimg.hole_offset + cbimg.hole_length);
				}
			}

			total_len += bimg.length;
		}

		if (needs_data)
		{
			/*
			 * Link the caller-supplied rdata chain for this buffer to the
			 * overall list.
			 */
			bkpb.fork_flags |= BKPBLOCK_HAS_DATA;
			bkpb.data_length = regbuf->rdata_len;
			total_len += regbuf->rdata_len;

			rdt_datas_last->next = regbuf->rdata_head;
			rdt_datas_last = regbuf->rdata_tail;
		}

		if (prev_regbuf && RelFileNodeEquals(regbuf->rnode, prev_regbuf->rnode))
		{
			samerel = true;
			bkpb.fork_flags |= BKPBLOCK_SAME_REL;
		}
		else
			samerel = false;
		prev_regbuf = regbuf;

		/* Ok, copy the header to the scratch buffer */
		memcpy(scratch, &bkpb, SizeOfXLogRecordBlockHeader);
		scratch += SizeOfXLogRecordBlockHeader;
		if (needs_backup)
		{
			memcpy(scratch, &bimg, SizeOfXLogRecordBlockImageHeader);
			scratch += SizeOfXLogRecordBlockImageHeader;
			if (cbimg.hole_length != 0 && is_compressed)
			{
				memcpy(scratch, &cbimg,
					   SizeOfXLogRecordBlockCompressHeader);
				scratch += SizeOfXLogRecordBlockCompressHeader;
			}
		}
		if (!samerel)
		{
			memcpy(scratch, &regbuf->rnode, sizeof(RelFileNode));
			scratch += sizeof(RelFileNode);
		}
		memcpy(scratch, &regbuf->block, sizeof(BlockNumber));
		scratch += sizeof(BlockNumber);
	}

	/* followed by the record's origin, if any */
	if (include_origin && replorigin_session_origin != InvalidRepOriginId)
	{
		*(scratch++) = (char) XLR_BLOCK_ID_ORIGIN;
		memcpy(scratch, &replorigin_session_origin, sizeof(replorigin_session_origin));
		scratch += sizeof(replorigin_session_origin);
	}

	/* followed by main data, if any */
	if (mainrdata_len > 0)
	{
		if (mainrdata_len > 255)
		{
			*(scratch++) = (char) XLR_BLOCK_ID_DATA_LONG;
			memcpy(scratch, &mainrdata_len, sizeof(uint32));
			scratch += sizeof(uint32);
		}
		else
		{
			*(scratch++) = (char) XLR_BLOCK_ID_DATA_SHORT;
			*(scratch++) = (uint8) mainrdata_len;
		}
		rdt_datas_last->next = mainrdata_head;
		rdt_datas_last = mainrdata_last;
		total_len += mainrdata_len;
	}
	rdt_datas_last->next = NULL;

	hdr_rdt.len = (scratch - hdr_scratch);
	total_len += hdr_rdt.len;

	/*
	 * Calculate CRC of the data
	 *
	 * Note that the record header isn't added into the CRC initially since we
	 * don't know the prev-link yet.  Thus, the CRC will represent the CRC of
	 * the whole record in the order: rdata, then backup blocks, then record
	 * header.
	 */
	INIT_CRC32C(rdata_crc);
	COMP_CRC32C(rdata_crc, hdr_scratch + SizeOfXLogRecord, hdr_rdt.len - SizeOfXLogRecord);
	for (rdt = hdr_rdt.next; rdt != NULL; rdt = rdt->next)
		COMP_CRC32C(rdata_crc, rdt->data, rdt->len);

	/*
	 * Fill in the fields in the record header. Prev-link is filled in later,
	 * once we know where in the WAL the record will be inserted. The CRC does
	 * not include the record header yet.
	 */
	rechdr->xl_xid = GetCurrentTransactionIdIfAny();
	rechdr->xl_tot_len = total_len;
	rechdr->xl_info = info;
	rechdr->xl_rmid = rmid;
	rechdr->xl_prev = InvalidXLogRecPtr;
	rechdr->xl_crc = rdata_crc;

	return &hdr_rdt;
}

/*
 * Create a compressed version of a backup block image.
 *
 * Returns FALSE if compression fails (i.e., compressed result is actually
 * bigger than original). Otherwise, returns TRUE and sets 'dlen' to
 * the length of compressed block image.
 */
static bool
XLogCompressBackupBlock(char *page, uint16 hole_offset, uint16 hole_length,
						char *dest, uint16 *dlen)
{
	int32		orig_len = BLCKSZ - hole_length;
	int32		len;
	int32		extra_bytes = 0;
	char	   *source;
	PGAlignedBlock tmp;

	if (hole_length != 0)
	{
		/* must skip the hole */
		source = tmp.data;
		memcpy(source, page, hole_offset);
		memcpy(source + hole_offset,
			   page + (hole_offset + hole_length),
			   BLCKSZ - (hole_length + hole_offset));

		/*
		 * Extra data needs to be stored in WAL record for the compressed
		 * version of block image if the hole exists.
		 */
		extra_bytes = SizeOfXLogRecordBlockCompressHeader;
	}
	else
		source = page;

	/*
	 * We recheck the actual size even if pglz_compress() reports success and
	 * see if the number of bytes saved by compression is larger than the
	 * length of extra data needed for the compressed version of block image.
	 */
	len = pglz_compress(source, orig_len, dest, PGLZ_strategy_default);
	if (len >= 0 &&
		len + extra_bytes < orig_len)
	{
		*dlen = (uint16) len;	/* successful compression */
		return true;
	}
	return false;
}

/*
 * Determine whether the buffer referenced has to be backed up.
 *
 * Since we don't yet have the insert lock, fullPageWrites and forcePageWrites
 * could change later, so the result should be used for optimization purposes
 * only.
 */
bool
XLogCheckBufferNeedsBackup(Buffer buffer)
{
	XLogRecPtr	RedoRecPtr;
	bool		doPageWrites;
	Page		page;

	GetFullPageWriteInfo(&RedoRecPtr, &doPageWrites);

	page = BufferGetPage(buffer);

	if (doPageWrites && PageGetLSN(page) <= RedoRecPtr)
		return true;			/* buffer requires backup */

	return false;				/* buffer does not need to be backed up */
}

/*
 * Write a backup block if needed when we are setting a hint. Note that
 * this may be called for a variety of page types, not just heaps.
 *
 * Callable while holding just share lock on the buffer content.
 *
 * We can't use the plain backup block mechanism since that relies on the
 * buffer being exclusively locked. Since some modifications (setting LSN,
 * hint bits) are allowed in a share-locked buffer, that could lead to WAL
 * checksum failures. So instead we copy the page and insert the copied data
 * as normal record data.
 *
 * We only need to do something if page has not yet been full page written in
 * this checkpoint round. The LSN of the inserted wal record is returned if we
 * had to write, InvalidXLogRecPtr otherwise.
 *
 * It is possible that multiple concurrent backends could attempt to write WAL
 * records. In that case, multiple copies of the same block would be recorded
 * in separate WAL records by different backends, though that is still OK from
 * a correctness perspective.
 */
XLogRecPtr
XLogSaveBufferForHint(Buffer buffer, bool buffer_std)
{
	XLogRecPtr	recptr = InvalidXLogRecPtr;
	XLogRecPtr	lsn;
	XLogRecPtr	RedoRecPtr;

	/*
	 * Ensure no checkpoint can change our view of RedoRecPtr.
	 */
	Assert(MyPgXact->delayChkpt);

	/*
	 * Update RedoRecPtr so that we can make the right decision
	 */
	RedoRecPtr = GetRedoRecPtr();

	/*
	 * We assume page LSN is first data on *every* page that can be passed to
	 * XLogInsert, whether it has the standard page layout or not. Since we're
	 * only holding a share-lock on the page, we must take the buffer header
	 * lock when we look at the LSN.
	 */
	lsn = BufferGetLSNAtomic(buffer);

	if (lsn <= RedoRecPtr)
	{
		int			flags;
		PGAlignedBlock copied_buffer;
		char	   *origdata = (char *) BufferGetBlock(buffer);
		RelFileNode rnode;
		ForkNumber	forkno;
		BlockNumber blkno;

		/*
		 * Copy buffer so we don't have to worry about concurrent hint bit or
		 * lsn updates. We assume pd_lower/upper cannot be changed without an
		 * exclusive lock, so the contents of the copy are not racy.
		 */
		if (buffer_std)
		{
			/* Assume we can omit data between pd_lower and pd_upper */
			Page		page = BufferGetPage(buffer);
			uint16		lower = ((PageHeader) page)->pd_lower;
			uint16		upper = ((PageHeader) page)->pd_upper;

			memcpy(copied_buffer.data, origdata, lower);
			memcpy(copied_buffer.data + upper, origdata + upper, BLCKSZ - upper);
		}
		else
			memcpy(copied_buffer.data, origdata, BLCKSZ);

		XLogBeginInsert();

		flags = REGBUF_FORCE_IMAGE;
		if (buffer_std)
			flags |= REGBUF_STANDARD;

		BufferGetTag(buffer, &rnode, &forkno, &blkno);
		XLogRegisterBlock(0, &rnode, forkno, blkno, copied_buffer.data, flags);

		recptr = XLogInsert(RM_XLOG_ID, XLOG_FPI_FOR_HINT);
	}

	return recptr;
}
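
/*
 * Sketch of the expected calling pattern (MarkBufferDirtyHint in bufmgr.c
 * is the in-tree caller; details are elided here and the surrounding
 * locking is only roughly indicated):
 *
 *		MyPgXact->delayChkpt = true;
 *		lsn = XLogSaveBufferForHint(buffer, buffer_std);
 *		... set the hint bits, PageSetLSN(page, lsn) if lsn is valid ...
 *		MyPgXact->delayChkpt = false;
 */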

/*
 * Write a WAL record containing a full image of a page. Caller is responsible
 * for writing the page to disk after calling this routine.
 *
 * Note: If you're using this function, you should be building pages in private
 * memory and writing them directly to smgr.  If you're using buffers, call
 * log_newpage_buffer instead.
 *
 * If the page follows the standard page layout, with a PageHeader and unused
 * space between pd_lower and pd_upper, set 'page_std' to TRUE. That allows
 * the unused space to be left out from the WAL record, making it smaller.
 */
XLogRecPtr
log_newpage(RelFileNode *rnode, ForkNumber forkNum, BlockNumber blkno,
			Page page, bool page_std)
{
	int			flags;
	XLogRecPtr	recptr;

	flags = REGBUF_FORCE_IMAGE;
	if (page_std)
		flags |= REGBUF_STANDARD;

	XLogBeginInsert();
	XLogRegisterBlock(0, rnode, forkNum, blkno, page, flags);
	recptr = XLogInsert(RM_XLOG_ID, XLOG_FPI);

	/*
	 * The page may be uninitialized. If so, we can't set the LSN because that
	 * would corrupt the page.
	 */
	if (!PageIsNew(page))
	{
		PageSetLSN(page, recptr);
	}

	return recptr;
}
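
/*
 * Sketch of the intended use, building a page in private memory and writing
 * it straight to smgr (the relation, fork, and block number below are
 * illustrative):
 *
 *		PGAlignedBlock buf;
 *		Page		page = (Page) buf.data;
 *
 *		PageInit(page, BLCKSZ, 0);
 *		... fill in the page ...
 *		log_newpage(&rel->rd_node, MAIN_FORKNUM, blkno, page, true);
 *		PageSetChecksumInplace(page, blkno);
 *		smgrwrite(rel->rd_smgr, MAIN_FORKNUM, blkno, buf.data, false);
 */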

/*
 * Write a WAL record containing a full image of a page.
 *
 * Caller should initialize the buffer and mark it dirty before calling this
 * function.  This function will set the page LSN.
 *
 * If the page follows the standard page layout, with a PageHeader and unused
 * space between pd_lower and pd_upper, set 'page_std' to TRUE. That allows
 * the unused space to be left out from the WAL record, making it smaller.
 */
XLogRecPtr
log_newpage_buffer(Buffer buffer, bool page_std)
{
	Page		page = BufferGetPage(buffer);
	RelFileNode rnode;
	ForkNumber	forkNum;
	BlockNumber blkno;

	/* Shared buffers should be modified in a critical section. */
	Assert(CritSectionCount > 0);

	BufferGetTag(buffer, &rnode, &forkNum, &blkno);

	return log_newpage(&rnode, forkNum, blkno, page, page_std);
}

/*
 * WAL-log a range of blocks in a relation.
 *
 * An image of all pages with block numbers 'startblk' <= X < 'endblk' is
 * written to the WAL. If the range is large, this is done in multiple WAL
 * records.
 *
 * If all the pages follow the standard page layout, with a PageHeader and unused
 * space between pd_lower and pd_upper, set 'page_std' to true. That allows
 * the unused space to be left out from the WAL records, making them smaller.
 *
 * NOTE: This function acquires exclusive-locks on the pages. Typically, this
 * is used on a newly-built relation, and the caller is holding an
 * AccessExclusiveLock on it, so no other backend can be accessing it at the
 * same time. If that's not the case, you must ensure that this does not
 * cause a deadlock through some other means.
 */
void
log_newpage_range(Relation rel, ForkNumber forkNum,
				  BlockNumber startblk, BlockNumber endblk,
				  bool page_std)
{
	int			flags;
	BlockNumber blkno;

	flags = REGBUF_FORCE_IMAGE;
	if (page_std)
		flags |= REGBUF_STANDARD;

	/*
	 * Iterate over all the pages in the range. They are collected into
	 * batches of XLR_MAX_BLOCK_ID pages, and a single WAL-record is written
	 * for each batch.
	 */
	XLogEnsureRecordSpace(XLR_MAX_BLOCK_ID - 1, 0);

	blkno = startblk;
	while (blkno < endblk)
	{
		Buffer		bufpack[XLR_MAX_BLOCK_ID];
		XLogRecPtr	recptr;
		int			nbufs;
		int			i;

		CHECK_FOR_INTERRUPTS();

		/* Collect a batch of blocks. */
		nbufs = 0;
		while (nbufs < XLR_MAX_BLOCK_ID && blkno < endblk)
		{
			Buffer		buf = ReadBufferExtended(rel, forkNum, blkno,
												 RBM_NORMAL, NULL);

			LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);

			/*
			 * Completely empty pages are not WAL-logged. Writing a WAL record
			 * would change the LSN, and we don't want that. We want the page
			 * to stay empty.
			 */
			if (!PageIsNew(BufferGetPage(buf)))
				bufpack[nbufs++] = buf;
			else
				UnlockReleaseBuffer(buf);
			blkno++;
		}

		/* Write WAL record for this batch. */
		XLogBeginInsert();

		START_CRIT_SECTION();
		for (i = 0; i < nbufs; i++)
		{
			XLogRegisterBuffer(i, bufpack[i], flags);
			MarkBufferDirty(bufpack[i]);
		}

		recptr = XLogInsert(RM_XLOG_ID, XLOG_FPI_MULTI);

		for (i = 0; i < nbufs; i++)
		{
			PageSetLSN(BufferGetPage(bufpack[i]), recptr);
			UnlockReleaseBuffer(bufpack[i]);
		}
		END_CRIT_SECTION();
	}
}
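
/*
 * A usage sketch (the relation handling is illustrative): after building a
 * new relation fork, WAL-log all of its pages in one pass:
 *
 *		log_newpage_range(rel, MAIN_FORKNUM, 0,
 *						  RelationGetNumberOfBlocks(rel), true);
 */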

/*
 * Allocate working buffers needed for WAL record construction.
 */
void
InitXLogInsert(void)
{
	/* Initialize the working areas */
	if (xloginsert_cxt == NULL)
	{
		xloginsert_cxt = AllocSetContextCreate(TopMemoryContext,
											   "WAL record construction",
											   ALLOCSET_DEFAULT_SIZES);
	}

	if (registered_buffers == NULL)
	{
		registered_buffers = (registered_buffer *)
			MemoryContextAllocZero(xloginsert_cxt,
				  sizeof(registered_buffer) * (XLR_NORMAL_MAX_BLOCK_ID + 1));
		max_registered_buffers = XLR_NORMAL_MAX_BLOCK_ID + 1;
	}
	if (rdatas == NULL)
	{
		rdatas = MemoryContextAlloc(xloginsert_cxt,
									sizeof(XLogRecData) * XLR_NORMAL_RDATAS);
		max_rdatas = XLR_NORMAL_RDATAS;
	}

	/*
	 * Allocate a buffer to hold the header information for a WAL record.
	 */
	if (hdr_scratch == NULL)
		hdr_scratch = MemoryContextAllocZero(xloginsert_cxt,
											 HEADER_SCRATCH_SIZE);
}