/*
 * brin_revmap.c
 *		Range map for BRIN indexes
 *
 * The range map (revmap) is a translation structure for BRIN indexes: for each
 * page range there is one summary tuple, and its location is tracked by the
 * revmap.  Whenever a new tuple that violates the previously recorded summary
 * values is inserted into the table, a new summary tuple is inserted into the
 * index and the revmap is updated to point to it.
 *
 * The revmap is stored in the first pages of the index, immediately following
 * the metapage.  When the revmap needs to be expanded, all tuples on the
 * regular BRIN page at that block (if any) are moved out of the way.
 *
 * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *	  src/backend/access/brin/brin_revmap.c
 */
#include "postgres.h"

#include "access/brin_page.h"
#include "access/brin_pageops.h"
#include "access/brin_revmap.h"
#include "access/brin_tuple.h"
#include "access/brin_xlog.h"
#include "access/rmgr.h"
#include "access/xloginsert.h"
#include "miscadmin.h"
#include "storage/bufmgr.h"
#include "storage/lmgr.h"
#include "utils/rel.h"


/*
 * In revmap pages, each item stores an ItemPointerData.  These defines let one
 * find the logical revmap page number and index number of the revmap item for
 * the given heap block number.
 */
#define HEAPBLK_TO_REVMAP_BLK(pagesPerRange, heapBlk) \
	((heapBlk / pagesPerRange) / REVMAP_PAGE_MAXITEMS)
#define HEAPBLK_TO_REVMAP_INDEX(pagesPerRange, heapBlk) \
	((heapBlk / pagesPerRange) % REVMAP_PAGE_MAXITEMS)
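
/*
 * A worked example (a sketch only; the concrete value of
 * REVMAP_PAGE_MAXITEMS depends on the block size): with pagesPerRange = 128
 * and heapBlk = 1000000, the range number is 1000000 / 128 = 7812, so the
 * revmap item lives on logical revmap page 7812 / REVMAP_PAGE_MAXITEMS, at
 * index 7812 % REVMAP_PAGE_MAXITEMS within that page.
 */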


struct BrinRevmap
{
	Relation	rm_irel;
	BlockNumber rm_pagesPerRange;
	BlockNumber rm_lastRevmapPage;		/* cached from the metapage */
	Buffer		rm_metaBuf;
	Buffer		rm_currBuf;
};
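
/*
 * Note: rm_metaBuf stays pinned (though not locked) for the whole lifetime
 * of the access object, and rm_currBuf caches the most recently read revmap
 * buffer so that repeated lookups on the same revmap page avoid extra
 * ReadBuffer calls.
 */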

/* typedef appears in brin_revmap.h */


static BlockNumber revmap_get_blkno(BrinRevmap *revmap,
				 BlockNumber heapBlk);
static Buffer revmap_get_buffer(BrinRevmap *revmap, BlockNumber heapBlk);
static BlockNumber revmap_extend_and_get_blkno(BrinRevmap *revmap,
							BlockNumber heapBlk);
static void revmap_physical_extend(BrinRevmap *revmap);

/*
 * Initialize an access object for a range map.  This must be freed by
 * brinRevmapTerminate when the caller is done with it.
 */
BrinRevmap *
brinRevmapInitialize(Relation idxrel, BlockNumber *pagesPerRange,
					 Snapshot snapshot)
{
	BrinRevmap *revmap;
	Buffer		meta;
	BrinMetaPageData *metadata;
	Page		page;

	meta = ReadBuffer(idxrel, BRIN_METAPAGE_BLKNO);
	LockBuffer(meta, BUFFER_LOCK_SHARE);
	page = BufferGetPage(meta);
	TestForOldSnapshot(snapshot, idxrel, page);
	metadata = (BrinMetaPageData *) PageGetContents(page);

	revmap = palloc(sizeof(BrinRevmap));
	revmap->rm_irel = idxrel;
	revmap->rm_pagesPerRange = metadata->pagesPerRange;
	revmap->rm_lastRevmapPage = metadata->lastRevmapPage;
	revmap->rm_metaBuf = meta;
	revmap->rm_currBuf = InvalidBuffer;

	*pagesPerRange = metadata->pagesPerRange;

	LockBuffer(meta, BUFFER_LOCK_UNLOCK);

	return revmap;
}
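
/*
 * A sketch of typical usage (hypothetical caller; the real consumers live
 * in brin.c):
 *
 *		BlockNumber pagesPerRange;
 *		BrinRevmap *revmap;
 *
 *		revmap = brinRevmapInitialize(idxrel, &pagesPerRange, snapshot);
 *		... look up or update summary tuples ...
 *		brinRevmapTerminate(revmap);
 */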

/*
 * Release resources associated with a revmap access object.
 */
void
brinRevmapTerminate(BrinRevmap *revmap)
{
	ReleaseBuffer(revmap->rm_metaBuf);
	if (revmap->rm_currBuf != InvalidBuffer)
		ReleaseBuffer(revmap->rm_currBuf);
	pfree(revmap);
}

/*
 * Extend the revmap to cover the given heap block number.
 */
void
brinRevmapExtend(BrinRevmap *revmap, BlockNumber heapBlk)
{
	BlockNumber mapBlk PG_USED_FOR_ASSERTS_ONLY;

	mapBlk = revmap_extend_and_get_blkno(revmap, heapBlk);

	/* Ensure the buffer we got is in the expected range */
	Assert(mapBlk != InvalidBlockNumber &&
		   mapBlk != BRIN_METAPAGE_BLKNO &&
		   mapBlk <= revmap->rm_lastRevmapPage);
}

/*
 * Prepare to insert an entry into the revmap; the revmap buffer in which the
 * entry is to reside is locked and returned.  Most callers should call
 * brinRevmapExtend beforehand, as this routine does not extend the revmap if
 * it's not long enough.
 *
 * The returned buffer is also recorded in the revmap struct; terminating the
 * revmap with brinRevmapTerminate releases the buffer, so the caller needn't
 * do it explicitly.
 */
Buffer
brinLockRevmapPageForUpdate(BrinRevmap *revmap, BlockNumber heapBlk)
{
	Buffer		rmBuf;

	rmBuf = revmap_get_buffer(revmap, heapBlk);
	LockBuffer(rmBuf, BUFFER_LOCK_EXCLUSIVE);

	return rmBuf;
}

/*
 * In the given revmap buffer (locked appropriately by caller), which is used
 * in a BRIN index of pagesPerRange pages per range, set the element
 * corresponding to heap block number heapBlk to the given TID.
 *
 * Once the operation is complete, the caller must update the LSN on the
 * given buffer.
 *
 * This is used both in regular operation and during WAL replay.
 */
void
brinSetHeapBlockItemptr(Buffer buf, BlockNumber pagesPerRange,
						BlockNumber heapBlk, ItemPointerData tid)
{
	RevmapContents *contents;
	ItemPointerData *iptr;
	Page		page;

	/* The correct page should already be pinned and locked */
	page = BufferGetPage(buf);
	contents = (RevmapContents *) PageGetContents(page);
	iptr = (ItemPointerData *) contents->rm_tids;
	iptr += HEAPBLK_TO_REVMAP_INDEX(pagesPerRange, heapBlk);

	ItemPointerSet(iptr,
				   ItemPointerGetBlockNumber(&tid),
				   ItemPointerGetOffsetNumber(&tid));
}
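
/*
 * A sketch of how a caller might use the above during a WAL-logged update
 * (hypothetical fragment; the real callers are in brin_pageops.c and the
 * redo code in brin_xlog.c):
 *
 *		START_CRIT_SECTION();
 *		brinSetHeapBlockItemptr(revmapbuf, pagesPerRange, heapBlk, tid);
 *		MarkBufferDirty(revmapbuf);
 *		... emit the WAL record, obtaining recptr ...
 *		PageSetLSN(BufferGetPage(revmapbuf), recptr);
 *		END_CRIT_SECTION();
 */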

/*
 * Fetch the BrinTuple for a given heap block.
 *
 * The buffer containing the tuple is locked, and returned in *buf. As an
 * optimization, the caller can pass a pinned buffer *buf on entry, which will
 * avoid a pin-unpin cycle when the next tuple is on the same page as a
 * previous one.
 *
 * If no tuple is found for the given heap range, returns NULL. In that case,
 * *buf might still be updated, but it's not locked.
 *
 * The output tuple offset within the buffer is returned in *off, and its size
 * is returned in *size.
 */
BrinTuple *
brinGetTupleForHeapBlock(BrinRevmap *revmap, BlockNumber heapBlk,
						 Buffer *buf, OffsetNumber *off, Size *size, int mode,
						 Snapshot snapshot)
{
	Relation	idxRel = revmap->rm_irel;
	BlockNumber mapBlk;
	RevmapContents *contents;
	ItemPointerData *iptr;
	BlockNumber blk;
	Page		page;
	ItemId		lp;
	BrinTuple  *tup;
	ItemPointerData previptr;

	/* normalize the heap block number to be the first page in the range */
	heapBlk = (heapBlk / revmap->rm_pagesPerRange) * revmap->rm_pagesPerRange;

	/* Compute the revmap page number we need */
	mapBlk = revmap_get_blkno(revmap, heapBlk);
	if (mapBlk == InvalidBlockNumber)
	{
		*off = InvalidOffsetNumber;
		return NULL;
	}

	ItemPointerSetInvalid(&previptr);
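
	/*
	 * Loop until we find the matching summary tuple or conclude there is
	 * none.  Normally a single iteration suffices; we loop again only when
	 * the tuple is moved or replaced concurrently between our reading of the
	 * revmap entry and our locking of the data page it points to, as the
	 * retry comments below describe.
	 */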
	for (;;)
	{
		CHECK_FOR_INTERRUPTS();

		if (revmap->rm_currBuf == InvalidBuffer ||
			BufferGetBlockNumber(revmap->rm_currBuf) != mapBlk)
		{
			if (revmap->rm_currBuf != InvalidBuffer)
				ReleaseBuffer(revmap->rm_currBuf);

			Assert(mapBlk != InvalidBlockNumber);
			revmap->rm_currBuf = ReadBuffer(revmap->rm_irel, mapBlk);
		}

		LockBuffer(revmap->rm_currBuf, BUFFER_LOCK_SHARE);

		contents = (RevmapContents *)
			PageGetContents(BufferGetPage(revmap->rm_currBuf));
		iptr = contents->rm_tids;
		iptr += HEAPBLK_TO_REVMAP_INDEX(revmap->rm_pagesPerRange, heapBlk);

		if (!ItemPointerIsValid(iptr))
		{
			LockBuffer(revmap->rm_currBuf, BUFFER_LOCK_UNLOCK);
			return NULL;
		}

		/*
		 * Check the TID we got in a previous iteration, if any, and save the
		 * current TID we got from the revmap; if we loop, we can sanity-check
		 * that the next one we get is different.  Otherwise we might be stuck
		 * looping forever if the revmap is somehow badly broken.
		 */
		if (ItemPointerIsValid(&previptr) && ItemPointerEquals(&previptr, iptr))
			ereport(ERROR,
					(errcode(ERRCODE_INDEX_CORRUPTED),
			errmsg_internal("corrupted BRIN index: inconsistent range map")));
		previptr = *iptr;

		blk = ItemPointerGetBlockNumber(iptr);
		*off = ItemPointerGetOffsetNumber(iptr);

		LockBuffer(revmap->rm_currBuf, BUFFER_LOCK_UNLOCK);

		/* Ok, got a pointer to where the BrinTuple should be. Fetch it. */
		if (!BufferIsValid(*buf) || BufferGetBlockNumber(*buf) != blk)
		{
			if (BufferIsValid(*buf))
				ReleaseBuffer(*buf);
			*buf = ReadBuffer(idxRel, blk);
		}
		LockBuffer(*buf, mode);
		page = BufferGetPage(*buf);
		TestForOldSnapshot(snapshot, idxRel, page);

		/*
		 * We landed on a regular page; check whether the tuple we want is
		 * there.  (If we landed on a revmap page instead, which can happen
		 * after a concurrent revmap extension, fall through and start over.)
		 */
		if (BRIN_IS_REGULAR_PAGE(page))
		{
			/*
			 * If the offset number is greater than what's in the page, it's
			 * possible that the range was desummarized concurrently. Just
			 * return NULL to handle that case.
			 */
			if (*off > PageGetMaxOffsetNumber(page))
			{
				LockBuffer(*buf, BUFFER_LOCK_UNLOCK);
				return NULL;
			}

			lp = PageGetItemId(page, *off);
			if (ItemIdIsUsed(lp))
			{
				tup = (BrinTuple *) PageGetItem(page, lp);

				if (tup->bt_blkno == heapBlk)
				{
					if (size)
						*size = ItemIdGetLength(lp);
					/* found it! */
					return tup;
				}
			}
		}

		/*
		 * No luck. Assume that the revmap was updated concurrently.
		 */
		LockBuffer(*buf, BUFFER_LOCK_UNLOCK);
	}
	/* not reached, but keep compiler quiet */
	return NULL;
}
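
/*
 * A sketch of the intended calling pattern (hypothetical; the real consumers
 * are in brin.c, such as the bitmap scan code), exploiting the pinned-buffer
 * optimization described above by reusing *buf across calls:
 *
 *		Buffer		buf = InvalidBuffer;
 *		BlockNumber heapBlk;
 *
 *		for (heapBlk = 0; heapBlk < nblocks; heapBlk += pagesPerRange)
 *		{
 *			tup = brinGetTupleForHeapBlock(revmap, heapBlk, &buf, &off,
 *										   &size, BUFFER_LOCK_SHARE, snapshot);
 *			if (tup != NULL)
 *			{
 *				... copy or examine the tuple, then ...
 *				LockBuffer(buf, BUFFER_LOCK_UNLOCK);
 *			}
 *		}
 *		if (BufferIsValid(buf))
 *			ReleaseBuffer(buf);
 */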

/*
 * Given a heap block number, find the corresponding physical revmap block
 * number and return it.  If the revmap page hasn't been allocated yet, return
 * InvalidBlockNumber.
 */
static BlockNumber
revmap_get_blkno(BrinRevmap *revmap, BlockNumber heapBlk)
{
	BlockNumber targetblk;

	/* obtain revmap block number, skip 1 for metapage block */
	targetblk = HEAPBLK_TO_REVMAP_BLK(revmap->rm_pagesPerRange, heapBlk) + 1;

	/* Normal case: the revmap page is already allocated */
	if (targetblk <= revmap->rm_lastRevmapPage)
		return targetblk;

	return InvalidBlockNumber;
}

/*
 * Obtain and return a buffer containing the revmap page for the given heap
 * page.  The revmap must have been previously extended to cover that page.
 * The returned buffer is also recorded in the revmap struct; terminating the
 * revmap with brinRevmapTerminate releases the buffer, so the caller needn't
 * do it explicitly.
 */
static Buffer
revmap_get_buffer(BrinRevmap *revmap, BlockNumber heapBlk)
{
	BlockNumber mapBlk;

	/* Translate the heap block number to physical index location. */
	mapBlk = revmap_get_blkno(revmap, heapBlk);

	if (mapBlk == InvalidBlockNumber)
		elog(ERROR, "revmap does not cover heap block %u", heapBlk);

	/* Ensure the buffer we got is in the expected range */
	Assert(mapBlk != BRIN_METAPAGE_BLKNO &&
		   mapBlk <= revmap->rm_lastRevmapPage);

	/*
	 * Obtain the buffer from which we need to read.  If we already have the
	 * correct buffer in our access struct, use that; otherwise, release that
	 * buffer (if valid) and read the one we need.
	 */
	if (revmap->rm_currBuf == InvalidBuffer ||
		mapBlk != BufferGetBlockNumber(revmap->rm_currBuf))
	{
		if (revmap->rm_currBuf != InvalidBuffer)
			ReleaseBuffer(revmap->rm_currBuf);

		revmap->rm_currBuf = ReadBuffer(revmap->rm_irel, mapBlk);
	}

	return revmap->rm_currBuf;
}

/*
 * Given a heap block number, find the corresponding physical revmap block
 * number and return it. If the revmap page hasn't been allocated yet, extend
 * the revmap until it is.
 */
static BlockNumber
revmap_extend_and_get_blkno(BrinRevmap *revmap, BlockNumber heapBlk)
{
	BlockNumber targetblk;

	/* obtain revmap block number, skip 1 for metapage block */
	targetblk = HEAPBLK_TO_REVMAP_BLK(revmap->rm_pagesPerRange, heapBlk) + 1;

	/* Extend the revmap, if necessary */
	while (targetblk > revmap->rm_lastRevmapPage)
	{
		CHECK_FOR_INTERRUPTS();
		revmap_physical_extend(revmap);
	}

	return targetblk;
}

/*
 * Try to extend the revmap by one page.  This might not happen for a number of
 * reasons; caller is expected to retry until the expected outcome is obtained.
 */
static void
revmap_physical_extend(BrinRevmap *revmap)
{
	Buffer		buf;
	Page		page;
	Page		metapage;
	BrinMetaPageData *metadata;
	BlockNumber mapBlk;
	BlockNumber nblocks;
	Relation	irel = revmap->rm_irel;
	bool		needLock = !RELATION_IS_LOCAL(irel);

	/*
	 * Lock the metapage. This locks out concurrent extensions of the revmap,
	 * but note that we still need to grab the relation extension lock because
	 * another backend can extend the index with regular BRIN pages.
	 */
	LockBuffer(revmap->rm_metaBuf, BUFFER_LOCK_EXCLUSIVE);
	metapage = BufferGetPage(revmap->rm_metaBuf);
	metadata = (BrinMetaPageData *) PageGetContents(metapage);

	/*
	 * Check that our cached lastRevmapPage value was up-to-date; if it
	 * wasn't, update the cached copy and have caller start over.
	 */
	if (metadata->lastRevmapPage != revmap->rm_lastRevmapPage)
	{
		revmap->rm_lastRevmapPage = metadata->lastRevmapPage;
		LockBuffer(revmap->rm_metaBuf, BUFFER_LOCK_UNLOCK);
		return;
	}
	mapBlk = metadata->lastRevmapPage + 1;

	nblocks = RelationGetNumberOfBlocks(irel);
	if (mapBlk < nblocks)
	{
		buf = ReadBuffer(irel, mapBlk);
		LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
		page = BufferGetPage(buf);
	}
	else
	{
		if (needLock)
			LockRelationForExtension(irel, ExclusiveLock);

		buf = ReadBuffer(irel, P_NEW);
		if (BufferGetBlockNumber(buf) != mapBlk)
		{
			/*
			 * Very rare corner case: somebody extended the relation
			 * concurrently after we read its length.  If this happens, give
			 * up and have caller start over.  We will have to evacuate that
			 * page from under whoever is using it.
			 */
			if (needLock)
				UnlockRelationForExtension(irel, ExclusiveLock);
			LockBuffer(revmap->rm_metaBuf, BUFFER_LOCK_UNLOCK);
			ReleaseBuffer(buf);
			return;
		}
		LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
		page = BufferGetPage(buf);

		if (needLock)
			UnlockRelationForExtension(irel, ExclusiveLock);
	}

	/* Check that it's a regular block (or an empty page) */
	if (!PageIsNew(page) && !BRIN_IS_REGULAR_PAGE(page))
		ereport(ERROR,
				(errcode(ERRCODE_INDEX_CORRUPTED),
		  errmsg("unexpected page type 0x%04X in BRIN index \"%s\" block %u",
				 BrinPageType(page),
				 RelationGetRelationName(irel),
				 BufferGetBlockNumber(buf))));

	/* If the page is in use, evacuate it and restart */
	if (brin_start_evacuating_page(irel, buf))
	{
		LockBuffer(revmap->rm_metaBuf, BUFFER_LOCK_UNLOCK);
		brin_evacuate_page(irel, revmap->rm_pagesPerRange, revmap, buf);

		/* have caller start over */
		return;
	}

	/*
	 * Ok, we have now locked the metapage and the target block.
	 * Re-initialize the target block as a revmap page.
	 */
	START_CRIT_SECTION();

	/* the rm_tids array is initialized to all invalid by PageInit */
	brin_page_init(page, BRIN_PAGETYPE_REVMAP);
	MarkBufferDirty(buf);

	metadata->lastRevmapPage = mapBlk;
	MarkBufferDirty(revmap->rm_metaBuf);

	if (RelationNeedsWAL(revmap->rm_irel))
	{
		xl_brin_revmap_extend xlrec;
		XLogRecPtr	recptr;

		xlrec.targetBlk = mapBlk;

		XLogBeginInsert();
		XLogRegisterData((char *) &xlrec, SizeOfBrinRevmapExtend);
		XLogRegisterBuffer(0, revmap->rm_metaBuf, 0);

		XLogRegisterBuffer(1, buf, REGBUF_WILL_INIT);

		recptr = XLogInsert(RM_BRIN_ID, XLOG_BRIN_REVMAP_EXTEND);
		PageSetLSN(metapage, recptr);
		PageSetLSN(page, recptr);
	}

	END_CRIT_SECTION();

	LockBuffer(revmap->rm_metaBuf, BUFFER_LOCK_UNLOCK);

	UnlockReleaseBuffer(buf);
}