1 /*
2  * brin_pageops.c
3  *		Page-handling routines for BRIN indexes
4  *
5  * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
6  * Portions Copyright (c) 1994, Regents of the University of California
7  *
8  * IDENTIFICATION
9  *	  src/backend/access/brin/brin_pageops.c
10  */
11 #include "postgres.h"
12 
13 #include "access/brin_pageops.h"
14 #include "access/brin_page.h"
15 #include "access/brin_revmap.h"
16 #include "access/brin_xlog.h"
17 #include "access/xloginsert.h"
18 #include "miscadmin.h"
19 #include "storage/bufmgr.h"
20 #include "storage/freespace.h"
21 #include "storage/lmgr.h"
22 #include "storage/smgr.h"
23 #include "utils/rel.h"
24 
25 
26 /*
27  * Maximum size of an entry in a BRIN_PAGETYPE_REGULAR page.  We can tolerate
28  * a single item per page, unlike other index AMs.
29  */
30 #define BrinMaxItemSize \
31 	MAXALIGN_DOWN(BLCKSZ - \
32 				  (MAXALIGN(SizeOfPageHeaderData + \
33 							sizeof(ItemIdData)) + \
34 				   MAXALIGN(sizeof(BrinSpecialSpace))))
35 
36 static Buffer brin_getinsertbuffer(Relation irel, Buffer oldbuf, Size itemsz,
37 					 bool *extended);
38 static Size br_page_get_freespace(Page page);
39 static void brin_initialize_empty_new_buffer(Relation idxrel, Buffer buffer);
40 
41 
/*
 * Update tuple origtup (size origsz), located in offset oldoff of buffer
 * oldbuf, to newtup (size newsz) as summary tuple for the page range starting
 * at heapBlk.  oldbuf must not be locked on entry, and is not locked at exit.
 *
 * If samepage is true, attempt to put the new tuple in the same page, but if
 * there's no room, use some other one.
 *
 * If the update is successful, return true; the revmap is updated to point to
 * the new tuple.  If the update is not done for whatever reason, return false.
 * Caller may retry the update if this happens.
 */
bool
brin_doupdate(Relation idxrel, BlockNumber pagesPerRange,
			  BrinRevmap *revmap, BlockNumber heapBlk,
			  Buffer oldbuf, OffsetNumber oldoff,
			  const BrinTuple *origtup, Size origsz,
			  const BrinTuple *newtup, Size newsz,
			  bool samepage)
{
	Page		oldpage;
	ItemId		oldlp;
	BrinTuple  *oldtup;
	Size		oldsz;
	Buffer		newbuf;
	bool		extended;

	/* Callers are expected to pass a MAXALIGN'd tuple size. */
	Assert(newsz == MAXALIGN(newsz));

	/* If the item is oversized, don't bother. */
	if (newsz > BrinMaxItemSize)
	{
		ereport(ERROR,
				(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
			errmsg("index row size %zu exceeds maximum %zu for index \"%s\"",
				   newsz, BrinMaxItemSize, RelationGetRelationName(idxrel))));
		return false;			/* keep compiler quiet */
	}

	/* make sure the revmap is long enough to contain the entry we need */
	brinRevmapExtend(revmap, heapBlk);

	if (!samepage)
	{
		/*
		 * Need a page on which to put the item.  brin_getinsertbuffer
		 * returns with both oldbuf and newbuf exclusively locked (in
		 * deadlock-free order), or InvalidBuffer if oldbuf was concurrently
		 * turned into a revmap page, in which case we give up.
		 */
		newbuf = brin_getinsertbuffer(idxrel, oldbuf, newsz, &extended);
		if (!BufferIsValid(newbuf))
		{
			Assert(!extended);
			return false;
		}

		/*
		 * Note: it's possible (though unlikely) that the returned newbuf is
		 * the same as oldbuf, if brin_getinsertbuffer determined that the old
		 * buffer does in fact have enough space.
		 */
		if (newbuf == oldbuf)
		{
			Assert(!extended);
			newbuf = InvalidBuffer;
		}
	}
	else
	{
		LockBuffer(oldbuf, BUFFER_LOCK_EXCLUSIVE);
		newbuf = InvalidBuffer;
		extended = false;
	}
	oldpage = BufferGetPage(oldbuf);
	oldlp = PageGetItemId(oldpage, oldoff);

	/*
	 * Check that the old tuple wasn't updated concurrently: it might have
	 * moved someplace else entirely, and for that matter the whole page
	 * might've become a revmap page.  Note that in the first two cases
	 * checked here, the "oldlp" we just calculated is garbage; but
	 * PageGetItemId() is simple enough that it was safe to do that
	 * calculation anyway.
	 */
	if (!BRIN_IS_REGULAR_PAGE(oldpage) ||
		oldoff > PageGetMaxOffsetNumber(oldpage) ||
		!ItemIdIsNormal(oldlp))
	{
		LockBuffer(oldbuf, BUFFER_LOCK_UNLOCK);

		/*
		 * If this happens, and the new buffer was obtained by extending the
		 * relation, then we need to ensure we don't leave it uninitialized or
		 * forget about it.
		 */
		if (BufferIsValid(newbuf))
		{
			if (extended)
				brin_initialize_empty_new_buffer(idxrel, newbuf);
			UnlockReleaseBuffer(newbuf);
			if (extended)
				FreeSpaceMapVacuum(idxrel);
		}
		return false;
	}

	oldsz = ItemIdGetLength(oldlp);
	oldtup = (BrinTuple *) PageGetItem(oldpage, oldlp);

	/*
	 * ... or it might have been updated in place to different contents.
	 */
	if (!brin_tuples_equal(oldtup, oldsz, origtup, origsz))
	{
		LockBuffer(oldbuf, BUFFER_LOCK_UNLOCK);
		if (BufferIsValid(newbuf))
		{
			/* As above: avoid losing the newly-extended page. */
			if (extended)
				brin_initialize_empty_new_buffer(idxrel, newbuf);
			UnlockReleaseBuffer(newbuf);
			if (extended)
				FreeSpaceMapVacuum(idxrel);
		}
		return false;
	}

	/*
	 * Great, the old tuple is intact.  We can proceed with the update.
	 *
	 * If there's enough room in the old page for the new tuple, replace it.
	 *
	 * Note that there might now be enough space on the page even though the
	 * caller told us there isn't, if a concurrent update moved another tuple
	 * elsewhere or replaced a tuple with a smaller one.
	 */
	if (((BrinPageFlags(oldpage) & BRIN_EVACUATE_PAGE) == 0) &&
		brin_can_do_samepage_update(oldbuf, origsz, newsz))
	{
		if (BufferIsValid(newbuf))
		{
			/* as above */
			if (extended)
				brin_initialize_empty_new_buffer(idxrel, newbuf);
			UnlockReleaseBuffer(newbuf);
		}

		/*
		 * Delete the old tuple and insert the new one at the very same
		 * offset (PAI_OVERWRITE at oldoff), so the revmap entry pointing at
		 * (oldbuf, oldoff) remains valid with no revmap change needed.
		 */
		START_CRIT_SECTION();
		PageIndexDeleteNoCompact(oldpage, &oldoff, 1);
		if (PageAddItemExtended(oldpage, (Item) newtup, newsz, oldoff,
				PAI_OVERWRITE | PAI_ALLOW_FAR_OFFSET) == InvalidOffsetNumber)
			elog(ERROR, "failed to add BRIN tuple");
		MarkBufferDirty(oldbuf);

		/* XLOG stuff */
		if (RelationNeedsWAL(idxrel))
		{
			xl_brin_samepage_update xlrec;
			XLogRecPtr	recptr;
			uint8		info = XLOG_BRIN_SAMEPAGE_UPDATE;

			xlrec.offnum = oldoff;

			XLogBeginInsert();
			XLogRegisterData((char *) &xlrec, SizeOfBrinSamepageUpdate);

			XLogRegisterBuffer(0, oldbuf, REGBUF_STANDARD);
			XLogRegisterBufData(0, (char *) newtup, newsz);

			recptr = XLogInsert(RM_BRIN_ID, info);

			PageSetLSN(oldpage, recptr);
		}

		END_CRIT_SECTION();

		LockBuffer(oldbuf, BUFFER_LOCK_UNLOCK);

		if (extended)
			FreeSpaceMapVacuum(idxrel);

		return true;
	}
	else if (newbuf == InvalidBuffer)
	{
		/*
		 * Not enough space, but caller said that there was. Tell them to
		 * start over.
		 */
		LockBuffer(oldbuf, BUFFER_LOCK_UNLOCK);
		return false;
	}
	else
	{
		/*
		 * Not enough free space on the oldpage. Put the new tuple on the new
		 * page, and update the revmap.
		 */
		Page		newpage = BufferGetPage(newbuf);
		Buffer		revmapbuf;
		ItemPointerData newtid;
		OffsetNumber newoff;
		BlockNumber newblk = InvalidBlockNumber;
		Size		freespace = 0;

		/* Lock the revmap page last, with the data buffers already held. */
		revmapbuf = brinLockRevmapPageForUpdate(revmap, heapBlk);

		START_CRIT_SECTION();

		/*
		 * We need to initialize the page if it's newly obtained.  Note we
		 * will WAL-log the initialization as part of the update, so we don't
		 * need to do that here.
		 */
		if (extended)
			brin_page_init(BufferGetPage(newbuf), BRIN_PAGETYPE_REGULAR);

		PageIndexDeleteNoCompact(oldpage, &oldoff, 1);
		newoff = PageAddItem(newpage, (Item) newtup, newsz,
							 InvalidOffsetNumber, false, false);
		if (newoff == InvalidOffsetNumber)
			elog(ERROR, "failed to add BRIN tuple to new page");
		MarkBufferDirty(oldbuf);
		MarkBufferDirty(newbuf);

		/*
		 * Capture what's needed to update the FSM below, while we still hold
		 * the lock on the new page.
		 */
		if (extended)
		{
			newblk = BufferGetBlockNumber(newbuf);
			freespace = br_page_get_freespace(newpage);
		}

		ItemPointerSet(&newtid, BufferGetBlockNumber(newbuf), newoff);
		brinSetHeapBlockItemptr(revmapbuf, pagesPerRange, heapBlk, newtid);
		MarkBufferDirty(revmapbuf);

		/* XLOG stuff */
		if (RelationNeedsWAL(idxrel))
		{
			xl_brin_update xlrec;
			XLogRecPtr	recptr;
			uint8		info;

			info = XLOG_BRIN_UPDATE | (extended ? XLOG_BRIN_INIT_PAGE : 0);

			xlrec.insert.offnum = newoff;
			xlrec.insert.heapBlk = heapBlk;
			xlrec.insert.pagesPerRange = pagesPerRange;
			xlrec.oldOffnum = oldoff;

			XLogBeginInsert();

			/* new page */
			XLogRegisterData((char *) &xlrec, SizeOfBrinUpdate);

			XLogRegisterBuffer(0, newbuf, REGBUF_STANDARD | (extended ? REGBUF_WILL_INIT : 0));
			XLogRegisterBufData(0, (char *) newtup, newsz);

			/* revmap page */
			XLogRegisterBuffer(1, revmapbuf, 0);

			/* old page */
			XLogRegisterBuffer(2, oldbuf, REGBUF_STANDARD);

			recptr = XLogInsert(RM_BRIN_ID, info);

			PageSetLSN(oldpage, recptr);
			PageSetLSN(newpage, recptr);
			PageSetLSN(BufferGetPage(revmapbuf), recptr);
		}

		END_CRIT_SECTION();

		LockBuffer(revmapbuf, BUFFER_LOCK_UNLOCK);
		LockBuffer(oldbuf, BUFFER_LOCK_UNLOCK);
		UnlockReleaseBuffer(newbuf);

		if (extended)
		{
			Assert(BlockNumberIsValid(newblk));
			RecordPageWithFreeSpace(idxrel, newblk, freespace);
			FreeSpaceMapVacuum(idxrel);
		}

		return true;
	}
}
324 
325 /*
326  * Return whether brin_doupdate can do a samepage update.
327  */
328 bool
brin_can_do_samepage_update(Buffer buffer,Size origsz,Size newsz)329 brin_can_do_samepage_update(Buffer buffer, Size origsz, Size newsz)
330 {
331 	return
332 		((newsz <= origsz) ||
333 		 PageGetExactFreeSpace(BufferGetPage(buffer)) >= (newsz - origsz));
334 }
335 
/*
 * Insert an index tuple into the index relation.  The revmap is updated to
 * mark the range containing the given page as pointing to the inserted entry.
 * A WAL record is written.
 *
 * The buffer, if valid, is first checked for free space to insert the new
 * entry; if there isn't enough, a new buffer is obtained and pinned.  No
 * buffer lock must be held on entry, no buffer lock is held on exit.
 *
 * Return value is the offset number where the tuple was inserted.
 */
OffsetNumber
brin_doinsert(Relation idxrel, BlockNumber pagesPerRange,
			  BrinRevmap *revmap, Buffer *buffer, BlockNumber heapBlk,
			  BrinTuple *tup, Size itemsz)
{
	Page		page;
	BlockNumber blk;
	OffsetNumber off;
	Buffer		revmapbuf;
	ItemPointerData tid;
	bool		extended;

	/* Callers are expected to pass a MAXALIGN'd tuple size. */
	Assert(itemsz == MAXALIGN(itemsz));

	/* If the item is oversized, don't even bother. */
	if (itemsz > BrinMaxItemSize)
	{
		ereport(ERROR,
				(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
			errmsg("index row size %zu exceeds maximum %zu for index \"%s\"",
				   itemsz, BrinMaxItemSize, RelationGetRelationName(idxrel))));
		return InvalidOffsetNumber;		/* keep compiler quiet */
	}

	/* Make sure the revmap is long enough to contain the entry we need */
	brinRevmapExtend(revmap, heapBlk);

	/*
	 * Acquire lock on buffer supplied by caller, if any.  If it doesn't have
	 * enough space, unpin it to obtain a new one below.
	 */
	if (BufferIsValid(*buffer))
	{
		/*
		 * It's possible that another backend (or ourselves!) extended the
		 * revmap over the page we held a pin on, so we cannot assume that
		 * it's still a regular page.  (br_page_get_freespace reports zero
		 * for non-regular pages, so that case falls through here too.)
		 */
		LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE);
		if (br_page_get_freespace(BufferGetPage(*buffer)) < itemsz)
		{
			UnlockReleaseBuffer(*buffer);
			*buffer = InvalidBuffer;
		}
	}

	/*
	 * If we still don't have a usable buffer, have brin_getinsertbuffer
	 * obtain one for us.
	 */
	if (!BufferIsValid(*buffer))
	{
		/*
		 * NOTE(review): with an invalid oldbuf, brin_getinsertbuffer
		 * presumably cannot return InvalidBuffer (that path requires a valid
		 * oldbuf); the loop is defensive — confirm before relying on it.
		 */
		do
			*buffer = brin_getinsertbuffer(idxrel, InvalidBuffer, itemsz, &extended);
		while (!BufferIsValid(*buffer));
	}
	else
		extended = false;

	/* Now obtain lock on revmap buffer; data buffer is locked first. */
	revmapbuf = brinLockRevmapPageForUpdate(revmap, heapBlk);

	page = BufferGetPage(*buffer);
	blk = BufferGetBlockNumber(*buffer);

	/* Execute the actual insertion */
	START_CRIT_SECTION();
	/* A freshly-extended page is initialized here and WAL-logged below. */
	if (extended)
		brin_page_init(BufferGetPage(*buffer), BRIN_PAGETYPE_REGULAR);
	off = PageAddItem(page, (Item) tup, itemsz, InvalidOffsetNumber,
					  false, false);
	if (off == InvalidOffsetNumber)
		elog(ERROR, "could not insert new index tuple to page");
	MarkBufferDirty(*buffer);

	BRIN_elog((DEBUG2, "inserted tuple (%u,%u) for range starting at %u",
			   blk, off, heapBlk));

	ItemPointerSet(&tid, blk, off);
	brinSetHeapBlockItemptr(revmapbuf, pagesPerRange, heapBlk, tid);
	MarkBufferDirty(revmapbuf);

	/* XLOG stuff */
	if (RelationNeedsWAL(idxrel))
	{
		xl_brin_insert xlrec;
		XLogRecPtr	recptr;
		uint8		info;

		info = XLOG_BRIN_INSERT | (extended ? XLOG_BRIN_INIT_PAGE : 0);
		xlrec.heapBlk = heapBlk;
		xlrec.pagesPerRange = pagesPerRange;
		xlrec.offnum = off;

		XLogBeginInsert();
		XLogRegisterData((char *) &xlrec, SizeOfBrinInsert);

		XLogRegisterBuffer(0, *buffer, REGBUF_STANDARD | (extended ? REGBUF_WILL_INIT : 0));
		XLogRegisterBufData(0, (char *) tup, itemsz);

		XLogRegisterBuffer(1, revmapbuf, 0);

		recptr = XLogInsert(RM_BRIN_ID, info);

		PageSetLSN(page, recptr);
		PageSetLSN(BufferGetPage(revmapbuf), recptr);
	}

	END_CRIT_SECTION();

	/* Tuple is firmly on buffer; we can release our locks */
	LockBuffer(*buffer, BUFFER_LOCK_UNLOCK);
	LockBuffer(revmapbuf, BUFFER_LOCK_UNLOCK);

	/* The caller keeps the pin on *buffer for reuse on the next insertion. */
	if (extended)
		FreeSpaceMapVacuum(idxrel);

	return off;
}
466 
467 /*
468  * Initialize a page with the given type.
469  *
470  * Caller is responsible for marking it dirty, as appropriate.
471  */
472 void
brin_page_init(Page page,uint16 type)473 brin_page_init(Page page, uint16 type)
474 {
475 	PageInit(page, BLCKSZ, sizeof(BrinSpecialSpace));
476 
477 	BrinPageType(page) = type;
478 }
479 
480 /*
481  * Initialize a new BRIN index' metapage.
482  */
483 void
brin_metapage_init(Page page,BlockNumber pagesPerRange,uint16 version)484 brin_metapage_init(Page page, BlockNumber pagesPerRange, uint16 version)
485 {
486 	BrinMetaPageData *metadata;
487 
488 	brin_page_init(page, BRIN_PAGETYPE_META);
489 
490 	metadata = (BrinMetaPageData *) PageGetContents(page);
491 
492 	metadata->brinMagic = BRIN_META_MAGIC;
493 	metadata->brinVersion = version;
494 	metadata->pagesPerRange = pagesPerRange;
495 
496 	/*
497 	 * Note we cheat here a little.  0 is not a valid revmap block number
498 	 * (because it's the metapage buffer), but doing this enables the first
499 	 * revmap page to be created when the index is.
500 	 */
501 	metadata->lastRevmapPage = 0;
502 }
503 
504 /*
505  * Initiate page evacuation protocol.
506  *
507  * The page must be locked in exclusive mode by the caller.
508  *
509  * If the page is not yet initialized or empty, return false without doing
510  * anything; it can be used for revmap without any further changes.  If it
511  * contains tuples, mark it for evacuation and return true.
512  */
513 bool
brin_start_evacuating_page(Relation idxRel,Buffer buf)514 brin_start_evacuating_page(Relation idxRel, Buffer buf)
515 {
516 	OffsetNumber off;
517 	OffsetNumber maxoff;
518 	Page		page;
519 
520 	page = BufferGetPage(buf);
521 
522 	if (PageIsNew(page))
523 		return false;
524 
525 	maxoff = PageGetMaxOffsetNumber(page);
526 	for (off = FirstOffsetNumber; off <= maxoff; off++)
527 	{
528 		ItemId		lp;
529 
530 		lp = PageGetItemId(page, off);
531 		if (ItemIdIsUsed(lp))
532 		{
533 			/* prevent other backends from adding more stuff to this page */
534 			BrinPageFlags(page) |= BRIN_EVACUATE_PAGE;
535 			MarkBufferDirtyHint(buf, true);
536 
537 			return true;
538 		}
539 	}
540 	return false;
541 }
542 
/*
 * Move all tuples out of a page.
 *
 * The caller must hold lock on the page. The lock and pin are released.
 */
void
brin_evacuate_page(Relation idxRel, BlockNumber pagesPerRange,
				   BrinRevmap *revmap, Buffer buf)
{
	OffsetNumber off;
	OffsetNumber maxoff;
	Page		page;

	page = BufferGetPage(buf);

	/* brin_start_evacuating_page must have flagged this page first. */
	Assert(BrinPageFlags(page) & BRIN_EVACUATE_PAGE);

	maxoff = PageGetMaxOffsetNumber(page);
	for (off = FirstOffsetNumber; off <= maxoff; off++)
	{
		BrinTuple  *tup;
		Size		sz;
		ItemId		lp;

		CHECK_FOR_INTERRUPTS();

		lp = PageGetItemId(page, off);
		if (ItemIdIsUsed(lp))
		{
			/*
			 * Copy the tuple before releasing our lock: once the page is
			 * unlocked, its contents may be changed by other backends.
			 */
			sz = ItemIdGetLength(lp);
			tup = (BrinTuple *) PageGetItem(page, lp);
			tup = brin_copy_tuple(tup, sz);

			LockBuffer(buf, BUFFER_LOCK_UNLOCK);

			/*
			 * Re-insert the tuple elsewhere (samepage=false); brin_doupdate
			 * revalidates the original tuple under lock, so a concurrent
			 * change makes it return false and we retry this offset.
			 */
			if (!brin_doupdate(idxRel, pagesPerRange, revmap, tup->bt_blkno,
							   buf, off, tup, sz, tup, sz, false))
				off--;			/* retry */

			LockBuffer(buf, BUFFER_LOCK_SHARE);

			/* It's possible that someone extended the revmap over this page */
			if (!BRIN_IS_REGULAR_PAGE(page))
				break;
		}
	}

	UnlockReleaseBuffer(buf);
}
592 
/*
 * Given a BRIN index page, initialize it if necessary, and record it into the
 * FSM if necessary.  Return value is true if the FSM itself needs "vacuuming".
 * The main use for this is when, during vacuuming, an uninitialized page is
 * found, which could be the result of relation extension followed by a crash
 * before the page can be used.
 */
bool
brin_page_cleanup(Relation idxrel, Buffer buf)
{
	Page		page = BufferGetPage(buf);
	Size		freespace;

	/*
	 * If a page was left uninitialized, initialize it now; also record it in
	 * FSM.
	 *
	 * Somebody else might be extending the relation concurrently.  To avoid
	 * re-initializing the page before they can grab the buffer lock, we
	 * acquire the extension lock momentarily.  Since they hold the extension
	 * lock from before getting the page and after its been initialized, we're
	 * sure to see their initialization.
	 */
	if (PageIsNew(page))
	{
		/* Acquire and immediately release: only used as a synchronization point. */
		LockRelationForExtension(idxrel, ShareLock);
		UnlockRelationForExtension(idxrel, ShareLock);

		/* Re-check under buffer lock; the extender may have initialized it. */
		LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
		if (PageIsNew(page))
		{
			/* Initializes, WAL-logs, and records the page in the FSM. */
			brin_initialize_empty_new_buffer(idxrel, buf);
			LockBuffer(buf, BUFFER_LOCK_UNLOCK);
			return true;
		}
		LockBuffer(buf, BUFFER_LOCK_UNLOCK);
	}

	/* Nothing to be done for non-regular index pages */
	if (BRIN_IS_META_PAGE(BufferGetPage(buf)) ||
		BRIN_IS_REVMAP_PAGE(BufferGetPage(buf)))
		return false;

	/* Measure free space and record it */
	freespace = br_page_get_freespace(page);
	if (freespace > GetRecordedFreeSpace(idxrel, BufferGetBlockNumber(buf)))
	{
		RecordPageWithFreeSpace(idxrel, BufferGetBlockNumber(buf), freespace);
		return true;
	}

	return false;
}
646 
/*
 * Return a pinned and exclusively locked buffer which can be used to insert an
 * index item of size itemsz (caller must ensure not to request sizes
 * impossible to fulfill).  If oldbuf is a valid buffer, it is also locked (in
 * an order determined to avoid deadlocks.)
 *
 * If we find that the old page is no longer a regular index page (because
 * of a revmap extension), the old buffer is unlocked and we return
 * InvalidBuffer.
 *
 * If there's no existing page with enough free space to accommodate the new
 * item, the relation is extended.  If this happens, *extended is set to true,
 * and it is the caller's responsibility to initialize the page (and WAL-log
 * that fact) prior to use.
 *
 * Note that in some corner cases it is possible for this routine to extend the
 * relation and then not return the buffer.  It is this routine's
 * responsibility to WAL-log the page initialization and to record the page in
 * FSM if that happens.  Such a buffer may later be reused by this routine.
 */
static Buffer
brin_getinsertbuffer(Relation irel, Buffer oldbuf, Size itemsz,
					 bool *extended)
{
	BlockNumber oldblk;
	BlockNumber newblk;
	Page		page;
	Size		freespace;

	/* callers must have checked */
	Assert(itemsz <= BrinMaxItemSize);

	*extended = false;

	if (BufferIsValid(oldbuf))
		oldblk = BufferGetBlockNumber(oldbuf);
	else
		oldblk = InvalidBlockNumber;

	/*
	 * Loop until we find a page with sufficient free space.  By the time we
	 * return to caller out of this loop, both buffers are valid and locked;
	 * if we have to restart here, neither buffer is locked and buf is not a
	 * pinned buffer.
	 */
	newblk = RelationGetTargetBlock(irel);
	if (newblk == InvalidBlockNumber)
		newblk = GetPageWithFreeSpace(irel, itemsz);
	for (;;)
	{
		Buffer		buf;
		bool		extensionLockHeld = false;

		CHECK_FOR_INTERRUPTS();

		if (newblk == InvalidBlockNumber)
		{
			/*
			 * There's not enough free space in any existing index page,
			 * according to the FSM: extend the relation to obtain a shiny new
			 * page.
			 */
			if (!RELATION_IS_LOCAL(irel))
			{
				LockRelationForExtension(irel, ExclusiveLock);
				extensionLockHeld = true;
			}
			buf = ReadBuffer(irel, P_NEW);
			newblk = BufferGetBlockNumber(buf);
			*extended = true;

			BRIN_elog((DEBUG2, "brin_getinsertbuffer: extending to page %u",
					   BufferGetBlockNumber(buf)));
		}
		else if (newblk == oldblk)
		{
			/*
			 * There's an odd corner-case here where the FSM is out-of-date,
			 * and gave us the old page.
			 */
			buf = oldbuf;
		}
		else
		{
			buf = ReadBuffer(irel, newblk);
		}

		/*
		 * We lock the old buffer first, if it's earlier than the new one; but
		 * before we do, we need to check that it hasn't been turned into a
		 * revmap page concurrently; if we detect that it happened, give up
		 * and tell caller to start over.
		 */
		if (BufferIsValid(oldbuf) && oldblk < newblk)
		{
			LockBuffer(oldbuf, BUFFER_LOCK_EXCLUSIVE);
			if (!BRIN_IS_REGULAR_PAGE(BufferGetPage(oldbuf)))
			{
				LockBuffer(oldbuf, BUFFER_LOCK_UNLOCK);

				/*
				 * It is possible that the new page was obtained from
				 * extending the relation.  In that case, we must be sure to
				 * record it in the FSM before leaving, because otherwise the
				 * space would be lost forever.  However, we cannot let an
				 * uninitialized page get in the FSM, so we need to initialize
				 * it first.
				 */
				if (*extended)
				{
					brin_initialize_empty_new_buffer(irel, buf);
					/* shouldn't matter, but don't confuse caller */
					*extended = false;
				}

				if (extensionLockHeld)
					UnlockRelationForExtension(irel, ExclusiveLock);

				ReleaseBuffer(buf);
				return InvalidBuffer;
			}
		}

		LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);

		/* The extension lock need not be held once the new page is locked. */
		if (extensionLockHeld)
			UnlockRelationForExtension(irel, ExclusiveLock);

		page = BufferGetPage(buf);

		/*
		 * We have a new buffer to insert into.  Check that the new page has
		 * enough free space, and return it if it does; otherwise start over.
		 * Note that we allow for the FSM to be out of date here, and in that
		 * case we update it and move on.
		 *
		 * (br_page_get_freespace also checks that the FSM didn't hand us a
		 * page that has since been repurposed for the revmap.)
		 *
		 * A just-extended page is still all-zeroes, so we can't measure its
		 * free space; use the known capacity of an empty regular page.
		 */
		freespace = *extended ?
			BrinMaxItemSize : br_page_get_freespace(page);
		if (freespace >= itemsz)
		{
			RelationSetTargetBlock(irel, BufferGetBlockNumber(buf));

			/*
			 * Since the target block specification can get lost on cache
			 * invalidations, make sure we update the more permanent FSM with
			 * data about it before going away.
			 */
			if (*extended)
				RecordPageWithFreeSpace(irel, BufferGetBlockNumber(buf),
										freespace);

			/*
			 * Lock the old buffer if not locked already.  Note that in this
			 * case we know for sure it's a regular page: it's later than the
			 * new page we just got, which is not a revmap page, and revmap
			 * pages are always consecutive.
			 */
			if (BufferIsValid(oldbuf) && oldblk > newblk)
			{
				LockBuffer(oldbuf, BUFFER_LOCK_EXCLUSIVE);
				Assert(BRIN_IS_REGULAR_PAGE(BufferGetPage(oldbuf)));
			}

			return buf;
		}

		/* This page is no good. */

		/*
		 * If an entirely new page does not contain enough free space for the
		 * new item, then surely that item is oversized.  Complain loudly; but
		 * first make sure we initialize the page and record it as free, for
		 * next time.
		 */
		if (*extended)
		{
			brin_initialize_empty_new_buffer(irel, buf);

			ereport(ERROR,
					(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
			errmsg("index row size %zu exceeds maximum %zu for index \"%s\"",
				   itemsz, freespace, RelationGetRelationName(irel))));
			return InvalidBuffer;		/* keep compiler quiet */
		}

		/* Drop locks/pins before retrying with another candidate page. */
		if (newblk != oldblk)
			UnlockReleaseBuffer(buf);
		if (BufferIsValid(oldbuf) && oldblk <= newblk)
			LockBuffer(oldbuf, BUFFER_LOCK_UNLOCK);

		/* Record this page's true free space and get another candidate. */
		newblk = RecordAndGetPageWithFreeSpace(irel, newblk, freespace, itemsz);
	}
}
843 
/*
 * Initialize a page as an empty regular BRIN page, WAL-log this, and record
 * the page in FSM.
 *
 * There are several corner situations in which we extend the relation to
 * obtain a new page and later find that we cannot use it immediately.  When
 * that happens, we don't want to leave the page go unrecorded in FSM, because
 * there is no mechanism to get the space back and the index would bloat.
 * Also, because we would not WAL-log the action that would initialize the
 * page, the page would go uninitialized in a standby (or after recovery).
 *
 * Caller must hold the buffer exclusively locked; the lock is not released.
 */
static void
brin_initialize_empty_new_buffer(Relation idxrel, Buffer buffer)
{
	Page		page;

	BRIN_elog((DEBUG2,
			   "brin_initialize_empty_new_buffer: initializing blank page %u",
			   BufferGetBlockNumber(buffer)));

	/*
	 * The critical section makes the initialization and its WAL record (a
	 * full-page image via log_newpage_buffer) an all-or-nothing action.
	 */
	START_CRIT_SECTION();
	page = BufferGetPage(buffer);
	brin_page_init(page, BRIN_PAGETYPE_REGULAR);
	MarkBufferDirty(buffer);
	log_newpage_buffer(buffer, true);
	END_CRIT_SECTION();

	/*
	 * We update the FSM for this page, but this is not WAL-logged.  This is
	 * acceptable because VACUUM will scan the index and update the FSM with
	 * pages whose FSM records were forgotten in a crash.
	 */
	RecordPageWithFreeSpace(idxrel, BufferGetBlockNumber(buffer),
							br_page_get_freespace(page));
}
879 
880 
881 /*
882  * Return the amount of free space on a regular BRIN index page.
883  *
884  * If the page is not a regular page, or has been marked with the
885  * BRIN_EVACUATE_PAGE flag, returns 0.
886  */
887 static Size
br_page_get_freespace(Page page)888 br_page_get_freespace(Page page)
889 {
890 	if (!BRIN_IS_REGULAR_PAGE(page) ||
891 		(BrinPageFlags(page) & BRIN_EVACUATE_PAGE) != 0)
892 		return 0;
893 	else
894 		return PageGetFreeSpace(page);
895 }
896