1 /*-------------------------------------------------------------------------
2  *
3  * generic_xlog.c
4  *	 Implementation of generic xlog records.
5  *
6  *
7  * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
8  * Portions Copyright (c) 1994, Regents of the University of California
9  *
10  * src/backend/access/transam/generic_xlog.c
11  *
12  *-------------------------------------------------------------------------
13  */
14 #include "postgres.h"
15 
16 #include "access/bufmask.h"
17 #include "access/generic_xlog.h"
18 #include "access/xlogutils.h"
19 #include "miscadmin.h"
20 #include "utils/memutils.h"
21 
22 /*-------------------------------------------------------------------------
23  * Internally, a delta between pages consists of a set of fragments.  Each
24  * fragment represents changes made in a given region of a page.  A fragment
25  * is made up as follows:
26  *
27  * - offset of page region (OffsetNumber)
28  * - length of page region (OffsetNumber)
29  * - data - the data to place into the region ('length' number of bytes)
30  *
31  * Unchanged regions of a page are not represented in its delta.  As a result,
32  * a delta can be more compact than the full page image.  But having an
33  * unchanged region between two fragments that is smaller than the fragment
34  * header (offset+length) does not pay off in terms of the overall size of
35  * the delta.  For this reason, we merge adjacent fragments if the unchanged
36  * region between them is <= MATCH_THRESHOLD bytes.
37  *
38  * We do not bother to merge fragments across the "lower" and "upper" parts
39  * of a page; it's very seldom the case that pd_lower and pd_upper are within
40  * MATCH_THRESHOLD bytes of each other, and handling that infrequent case
41  * would complicate and slow down the delta-computation code unduly.
42  * Therefore, the worst-case delta size includes two fragment headers plus
43  * a full page's worth of data.
44  *-------------------------------------------------------------------------
45  */
/* Size of one fragment header: an OffsetNumber offset plus an OffsetNumber length */
#define FRAGMENT_HEADER_SIZE	(2 * sizeof(OffsetNumber))
/* Runs of unchanged bytes no longer than this are merged into the enclosing fragment */
#define MATCH_THRESHOLD			FRAGMENT_HEADER_SIZE
/* Worst-case delta size: two fragment headers plus a full page's worth of data */
#define MAX_DELTA_SIZE			(BLCKSZ + 2 * FRAGMENT_HEADER_SIZE)
49 
/*
 * Struct of generic xlog data for single page.
 *
 * One entry exists per slot of GenericXLogState.pages[]; a slot whose buffer
 * is InvalidBuffer is unused.
 */
typedef struct
{
	Buffer		buffer;			/* registered buffer */
	int			flags;			/* flags for this buffer, as passed to
								 * GenericXLogRegisterBuffer */
	int			deltaLen;		/* space consumed in delta field */
	char	   *image;			/* copy of page image for modification, do not
								 * do it in-place to have aligned memory chunk */
	char		delta[MAX_DELTA_SIZE];	/* delta between page images */
} PageData;
60 
/*
 * State of generic xlog record construction.
 *
 * Allocated by GenericXLogStart and released by GenericXLogFinish or
 * GenericXLogAbort.
 */
struct GenericXLogState
{
	/* Info about each page, see above */
	PageData	pages[MAX_GENERIC_XLOG_PAGES];
	/* Result of RelationNeedsWAL() for the relation given at start */
	bool		isLogged;
	/* Page images (properly aligned); pages[i].image points at images[i] */
	PGAlignedBlock images[MAX_GENERIC_XLOG_PAGES];
};
70 
71 static void writeFragment(PageData *pageData, OffsetNumber offset,
72 			  OffsetNumber len, const char *data);
73 static void computeRegionDelta(PageData *pageData,
74 				   const char *curpage, const char *targetpage,
75 				   int targetStart, int targetEnd,
76 				   int validStart, int validEnd);
77 static void computeDelta(PageData *pageData, Page curpage, Page targetpage);
78 static void applyPageRedo(Page page, const char *delta, Size deltaSize);
79 
80 
81 /*
82  * Write next fragment into pageData's delta.
83  *
84  * The fragment has the given offset and length, and data points to the
85  * actual data (of length length).
86  */
87 static void
writeFragment(PageData * pageData,OffsetNumber offset,OffsetNumber length,const char * data)88 writeFragment(PageData *pageData, OffsetNumber offset, OffsetNumber length,
89 			  const char *data)
90 {
91 	char	   *ptr = pageData->delta + pageData->deltaLen;
92 
93 	/* Verify we have enough space */
94 	Assert(pageData->deltaLen + sizeof(offset) +
95 		   sizeof(length) + length <= sizeof(pageData->delta));
96 
97 	/* Write fragment data */
98 	memcpy(ptr, &offset, sizeof(offset));
99 	ptr += sizeof(offset);
100 	memcpy(ptr, &length, sizeof(length));
101 	ptr += sizeof(length);
102 	memcpy(ptr, data, length);
103 	ptr += length;
104 
105 	pageData->deltaLen = ptr - pageData->delta;
106 }
107 
/*
 * Compute the XLOG fragments needed to transform a region of curpage into the
 * corresponding region of targetpage, and append them to pageData's delta
 * field.  The region to transform runs from targetStart to targetEnd-1.
 * Bytes in curpage outside the range validStart to validEnd-1 should be
 * considered invalid, and always overwritten with target data.
 *
 * pageData: receives the fragments (appended via writeFragment)
 * curpage: current contents of the page
 * targetpage: desired contents of the page; fragment data is copied from here
 * targetStart, targetEnd: byte range of the page to be covered
 * validStart, validEnd: byte range of curpage that may be compared
 *
 * Fragment offsets are byte offsets from the start of the page, not from
 * targetStart.
 *
 * This function is a hot spot, so it's worth being as tight as possible
 * about the data-matching loops.
 */
static void
computeRegionDelta(PageData *pageData,
				   const char *curpage, const char *targetpage,
				   int targetStart, int targetEnd,
				   int validStart, int validEnd)
{
	/* fragmentBegin < 0 means "no unwritten fragment is currently open" */
	int			i,
				loopEnd,
				fragmentBegin = -1,
				fragmentEnd = -1;

	/* Deal with any invalid start region by including it in first fragment */
	if (validStart > targetStart)
	{
		fragmentBegin = targetStart;
		targetStart = validStart;
	}

	/* We'll deal with any invalid end region after the main loop */
	loopEnd = Min(targetEnd, validEnd);

	/* Examine all the potentially matchable bytes */
	i = targetStart;
	while (i < loopEnd)
	{
		if (curpage[i] != targetpage[i])
		{
			/* On unmatched byte, start new fragment if not already in one */
			if (fragmentBegin < 0)
				fragmentBegin = i;
			/* Mark unmatched-data endpoint as uncertain */
			fragmentEnd = -1;
			/* Extend the fragment as far as possible in a tight loop */
			i++;
			while (i < loopEnd && curpage[i] != targetpage[i])
				i++;
			/* Ran off the end while still unmatched: finish up below */
			if (i >= loopEnd)
				break;
		}

		/* Found a matched byte, so remember end of unmatched fragment */
		fragmentEnd = i;

		/*
		 * Extend the match as far as possible in a tight loop.  (On typical
		 * workloads, this inner loop is the bulk of this function's runtime.)
		 */
		i++;
		while (i < loopEnd && curpage[i] == targetpage[i])
			i++;

		/*
		 * There are several possible cases at this point:
		 *
		 * 1. We have no unwritten fragment (fragmentBegin < 0).  There's
		 * nothing to write; and it doesn't matter what fragmentEnd is.
		 *
		 * 2. We found more than MATCH_THRESHOLD consecutive matching bytes.
		 * Dump out the unwritten fragment, stopping at fragmentEnd.
		 *
		 * 3. The match extends to loopEnd.  We'll do nothing here, exit the
		 * loop, and then dump the unwritten fragment, after merging it with
		 * the invalid end region if any.  If we don't so merge, fragmentEnd
		 * establishes how much the final writeFragment call needs to write.
		 *
		 * 4. We found an unmatched byte before loopEnd.  The loop will repeat
		 * and will enter the unmatched-byte stanza above.  So in this case
		 * also, it doesn't matter what fragmentEnd is.  The matched bytes
		 * will get merged into the continuing unmatched fragment.
		 *
		 * Only in case 3 do we reach the bottom of the loop with a meaningful
		 * fragmentEnd value, which is why it's OK that we unconditionally
		 * assign "fragmentEnd = i" above.
		 */
		if (fragmentBegin >= 0 && i - fragmentEnd > MATCH_THRESHOLD)
		{
			writeFragment(pageData, fragmentBegin,
						  fragmentEnd - fragmentBegin,
						  targetpage + fragmentBegin);
			fragmentBegin = -1;
			fragmentEnd = -1;	/* not really necessary */
		}
	}

	/* Deal with any invalid end region by including it in final fragment */
	if (loopEnd < targetEnd)
	{
		if (fragmentBegin < 0)
			fragmentBegin = loopEnd;
		fragmentEnd = targetEnd;
	}

	/* Write final fragment if any */
	if (fragmentBegin >= 0)
	{
		/* An uncertain endpoint means the fragment runs to the region's end */
		if (fragmentEnd < 0)
			fragmentEnd = targetEnd;
		writeFragment(pageData, fragmentBegin,
					  fragmentEnd - fragmentBegin,
					  targetpage + fragmentBegin);
	}
}
220 
221 /*
222  * Compute the XLOG delta record needed to transform curpage into targetpage,
223  * and store it in pageData's delta field.
224  */
225 static void
computeDelta(PageData * pageData,Page curpage,Page targetpage)226 computeDelta(PageData *pageData, Page curpage, Page targetpage)
227 {
228 	int			targetLower = ((PageHeader) targetpage)->pd_lower,
229 				targetUpper = ((PageHeader) targetpage)->pd_upper,
230 				curLower = ((PageHeader) curpage)->pd_lower,
231 				curUpper = ((PageHeader) curpage)->pd_upper;
232 
233 	pageData->deltaLen = 0;
234 
235 	/* Compute delta records for lower part of page ... */
236 	computeRegionDelta(pageData, curpage, targetpage,
237 					   0, targetLower,
238 					   0, curLower);
239 	/* ... and for upper part, ignoring what's between */
240 	computeRegionDelta(pageData, curpage, targetpage,
241 					   targetUpper, BLCKSZ,
242 					   curUpper, BLCKSZ);
243 
244 	/*
245 	 * If xlog debug is enabled, then check produced delta.  Result of delta
246 	 * application to curpage should be equivalent to targetpage.
247 	 */
248 #ifdef WAL_DEBUG
249 	if (XLOG_DEBUG)
250 	{
251 		PGAlignedBlock tmp;
252 
253 		memcpy(tmp.data, curpage, BLCKSZ);
254 		applyPageRedo(tmp.data, pageData->delta, pageData->deltaLen);
255 		if (memcmp(tmp.data, targetpage, targetLower) != 0 ||
256 			memcmp(tmp.data + targetUpper, targetpage + targetUpper,
257 				   BLCKSZ - targetUpper) != 0)
258 			elog(ERROR, "result of generic xlog apply does not match");
259 	}
260 #endif
261 }
262 
263 /*
264  * Start new generic xlog record for modifications to specified relation.
265  */
266 GenericXLogState *
GenericXLogStart(Relation relation)267 GenericXLogStart(Relation relation)
268 {
269 	GenericXLogState *state;
270 	int			i;
271 
272 	state = (GenericXLogState *) palloc(sizeof(GenericXLogState));
273 	state->isLogged = RelationNeedsWAL(relation);
274 
275 	for (i = 0; i < MAX_GENERIC_XLOG_PAGES; i++)
276 	{
277 		state->pages[i].image = state->images[i].data;
278 		state->pages[i].buffer = InvalidBuffer;
279 	}
280 
281 	return state;
282 }
283 
284 /*
285  * Register new buffer for generic xlog record.
286  *
287  * Returns pointer to the page's image in the GenericXLogState, which
288  * is what the caller should modify.
289  *
290  * If the buffer is already registered, just return its existing entry.
291  * (It's not very clear what to do with the flags in such a case, but
292  * for now we stay with the original flags.)
293  */
294 Page
GenericXLogRegisterBuffer(GenericXLogState * state,Buffer buffer,int flags)295 GenericXLogRegisterBuffer(GenericXLogState *state, Buffer buffer, int flags)
296 {
297 	int			block_id;
298 
299 	/* Search array for existing entry or first unused slot */
300 	for (block_id = 0; block_id < MAX_GENERIC_XLOG_PAGES; block_id++)
301 	{
302 		PageData   *page = &state->pages[block_id];
303 
304 		if (BufferIsInvalid(page->buffer))
305 		{
306 			/* Empty slot, so use it (there cannot be a match later) */
307 			page->buffer = buffer;
308 			page->flags = flags;
309 			memcpy(page->image, BufferGetPage(buffer), BLCKSZ);
310 			return (Page) page->image;
311 		}
312 		else if (page->buffer == buffer)
313 		{
314 			/*
315 			 * Buffer is already registered.  Just return the image, which is
316 			 * already prepared.
317 			 */
318 			return (Page) page->image;
319 		}
320 	}
321 
322 	elog(ERROR, "maximum number %d of generic xlog buffers is exceeded",
323 		 MAX_GENERIC_XLOG_PAGES);
324 	/* keep compiler quiet */
325 	return NULL;
326 }
327 
328 /*
329  * Apply changes represented by GenericXLogState to the actual buffers,
330  * and emit a generic xlog record.
331  */
332 XLogRecPtr
GenericXLogFinish(GenericXLogState * state)333 GenericXLogFinish(GenericXLogState *state)
334 {
335 	XLogRecPtr	lsn;
336 	int			i;
337 
338 	if (state->isLogged)
339 	{
340 		/* Logged relation: make xlog record in critical section. */
341 		XLogBeginInsert();
342 
343 		START_CRIT_SECTION();
344 
345 		for (i = 0; i < MAX_GENERIC_XLOG_PAGES; i++)
346 		{
347 			PageData   *pageData = &state->pages[i];
348 			Page		page;
349 			PageHeader	pageHeader;
350 
351 			if (BufferIsInvalid(pageData->buffer))
352 				continue;
353 
354 			page = BufferGetPage(pageData->buffer);
355 			pageHeader = (PageHeader) pageData->image;
356 
357 			if (pageData->flags & GENERIC_XLOG_FULL_IMAGE)
358 			{
359 				/*
360 				 * A full-page image does not require us to supply any xlog
361 				 * data.  Just apply the image, being careful to zero the
362 				 * "hole" between pd_lower and pd_upper in order to avoid
363 				 * divergence between actual page state and what replay would
364 				 * produce.
365 				 */
366 				memcpy(page, pageData->image, pageHeader->pd_lower);
367 				memset(page + pageHeader->pd_lower, 0,
368 					   pageHeader->pd_upper - pageHeader->pd_lower);
369 				memcpy(page + pageHeader->pd_upper,
370 					   pageData->image + pageHeader->pd_upper,
371 					   BLCKSZ - pageHeader->pd_upper);
372 
373 				XLogRegisterBuffer(i, pageData->buffer,
374 								   REGBUF_FORCE_IMAGE | REGBUF_STANDARD);
375 			}
376 			else
377 			{
378 				/*
379 				 * In normal mode, calculate delta and write it as xlog data
380 				 * associated with this page.
381 				 */
382 				computeDelta(pageData, page, (Page) pageData->image);
383 
384 				/* Apply the image, with zeroed "hole" as above */
385 				memcpy(page, pageData->image, pageHeader->pd_lower);
386 				memset(page + pageHeader->pd_lower, 0,
387 					   pageHeader->pd_upper - pageHeader->pd_lower);
388 				memcpy(page + pageHeader->pd_upper,
389 					   pageData->image + pageHeader->pd_upper,
390 					   BLCKSZ - pageHeader->pd_upper);
391 
392 				XLogRegisterBuffer(i, pageData->buffer, REGBUF_STANDARD);
393 				XLogRegisterBufData(i, pageData->delta, pageData->deltaLen);
394 			}
395 		}
396 
397 		/* Insert xlog record */
398 		lsn = XLogInsert(RM_GENERIC_ID, 0);
399 
400 		/* Set LSN and mark buffers dirty */
401 		for (i = 0; i < MAX_GENERIC_XLOG_PAGES; i++)
402 		{
403 			PageData   *pageData = &state->pages[i];
404 
405 			if (BufferIsInvalid(pageData->buffer))
406 				continue;
407 			PageSetLSN(BufferGetPage(pageData->buffer), lsn);
408 			MarkBufferDirty(pageData->buffer);
409 		}
410 		END_CRIT_SECTION();
411 	}
412 	else
413 	{
414 		/* Unlogged relation: skip xlog-related stuff */
415 		START_CRIT_SECTION();
416 		for (i = 0; i < MAX_GENERIC_XLOG_PAGES; i++)
417 		{
418 			PageData   *pageData = &state->pages[i];
419 
420 			if (BufferIsInvalid(pageData->buffer))
421 				continue;
422 			memcpy(BufferGetPage(pageData->buffer),
423 				   pageData->image,
424 				   BLCKSZ);
425 			/* We don't worry about zeroing the "hole" in this case */
426 			MarkBufferDirty(pageData->buffer);
427 		}
428 		END_CRIT_SECTION();
429 		/* We don't have a LSN to return, in this case */
430 		lsn = InvalidXLogRecPtr;
431 	}
432 
433 	pfree(state);
434 
435 	return lsn;
436 }
437 
/*
 * Abort generic xlog record construction.  No changes are applied to buffers.
 *
 * All pending modifications live in the page images stored inside the state
 * itself, so freeing the state is all that is needed to discard them.  The
 * state must not be used after this call.
 *
 * Note: caller is responsible for releasing locks/pins on buffers, if needed.
 */
void
GenericXLogAbort(GenericXLogState *state)
{
	pfree(state);
}
448 
449 /*
450  * Apply delta to given page image.
451  */
452 static void
applyPageRedo(Page page,const char * delta,Size deltaSize)453 applyPageRedo(Page page, const char *delta, Size deltaSize)
454 {
455 	const char *ptr = delta;
456 	const char *end = delta + deltaSize;
457 
458 	while (ptr < end)
459 	{
460 		OffsetNumber offset,
461 					length;
462 
463 		memcpy(&offset, ptr, sizeof(offset));
464 		ptr += sizeof(offset);
465 		memcpy(&length, ptr, sizeof(length));
466 		ptr += sizeof(length);
467 
468 		memcpy(page + offset, ptr, length);
469 
470 		ptr += length;
471 	}
472 }
473 
474 /*
475  * Redo function for generic xlog record.
476  */
477 void
generic_redo(XLogReaderState * record)478 generic_redo(XLogReaderState *record)
479 {
480 	XLogRecPtr	lsn = record->EndRecPtr;
481 	Buffer		buffers[MAX_GENERIC_XLOG_PAGES];
482 	uint8		block_id;
483 
484 	/* Protect limited size of buffers[] array */
485 	Assert(record->max_block_id < MAX_GENERIC_XLOG_PAGES);
486 
487 	/* Iterate over blocks */
488 	for (block_id = 0; block_id <= record->max_block_id; block_id++)
489 	{
490 		XLogRedoAction action;
491 
492 		if (!XLogRecHasBlockRef(record, block_id))
493 		{
494 			buffers[block_id] = InvalidBuffer;
495 			continue;
496 		}
497 
498 		action = XLogReadBufferForRedo(record, block_id, &buffers[block_id]);
499 
500 		/* Apply redo to given block if needed */
501 		if (action == BLK_NEEDS_REDO)
502 		{
503 			Page		page;
504 			PageHeader	pageHeader;
505 			char	   *blockDelta;
506 			Size		blockDeltaSize;
507 
508 			page = BufferGetPage(buffers[block_id]);
509 			blockDelta = XLogRecGetBlockData(record, block_id, &blockDeltaSize);
510 			applyPageRedo(page, blockDelta, blockDeltaSize);
511 
512 			/*
513 			 * Since the delta contains no information about what's in the
514 			 * "hole" between pd_lower and pd_upper, set that to zero to
515 			 * ensure we produce the same page state that application of the
516 			 * logged action by GenericXLogFinish did.
517 			 */
518 			pageHeader = (PageHeader) page;
519 			memset(page + pageHeader->pd_lower, 0,
520 				   pageHeader->pd_upper - pageHeader->pd_lower);
521 
522 			PageSetLSN(page, lsn);
523 			MarkBufferDirty(buffers[block_id]);
524 		}
525 	}
526 
527 	/* Changes are done: unlock and release all buffers */
528 	for (block_id = 0; block_id <= record->max_block_id; block_id++)
529 	{
530 		if (BufferIsValid(buffers[block_id]))
531 			UnlockReleaseBuffer(buffers[block_id]);
532 	}
533 }
534 
/*
 * Mask a generic page before performing consistency checks on it.
 *
 * The page is modified in place by calling mask_page_lsn_and_checksum() and
 * then mask_unused_space() on it.  blkno is not used here.
 */
void
generic_mask(char *page, BlockNumber blkno)
{
	mask_page_lsn_and_checksum(page);

	mask_unused_space(page);
}
545