/*-------------------------------------------------------------------------
 *
 * hio.c
 *	  POSTGRES heap access method input/output code.
 *
 * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *	  src/backend/access/heap/hio.c
 *
 *-------------------------------------------------------------------------
 */
15 
16 #include "postgres.h"
17 
18 #include "access/heapam.h"
19 #include "access/hio.h"
20 #include "access/htup_details.h"
21 #include "access/visibilitymap.h"
22 #include "storage/bufmgr.h"
23 #include "storage/freespace.h"
24 #include "storage/lmgr.h"
25 #include "storage/smgr.h"
26 
27 
28 /*
29  * RelationPutHeapTuple - place tuple at specified page
30  *
31  * !!! EREPORT(ERROR) IS DISALLOWED HERE !!!  Must PANIC on failure!!!
32  *
33  * Note - caller must hold BUFFER_LOCK_EXCLUSIVE on the buffer.
34  */
35 void
36 RelationPutHeapTuple(Relation relation,
37 					 Buffer buffer,
38 					 HeapTuple tuple,
39 					 bool token)
40 {
41 	Page		pageHeader;
42 	OffsetNumber offnum;
43 
44 	/*
45 	 * A tuple that's being inserted speculatively should already have its
46 	 * token set.
47 	 */
48 	Assert(!token || HeapTupleHeaderIsSpeculative(tuple->t_data));
49 
50 	/* Add the tuple to the page */
51 	pageHeader = BufferGetPage(buffer);
52 
53 	offnum = PageAddItem(pageHeader, (Item) tuple->t_data,
54 						 tuple->t_len, InvalidOffsetNumber, false, true);
55 
56 	if (offnum == InvalidOffsetNumber)
57 		elog(PANIC, "failed to add tuple to page");
58 
59 	/* Update tuple->t_self to the actual position where it was stored */
60 	ItemPointerSet(&(tuple->t_self), BufferGetBlockNumber(buffer), offnum);
61 
62 	/*
63 	 * Insert the correct position into CTID of the stored tuple, too (unless
64 	 * this is a speculative insertion, in which case the token is held in
65 	 * CTID field instead)
66 	 */
67 	if (!token)
68 	{
69 		ItemId		itemId = PageGetItemId(pageHeader, offnum);
70 		HeapTupleHeader item = (HeapTupleHeader) PageGetItem(pageHeader, itemId);
71 
72 		item->t_ctid = tuple->t_self;
73 	}
74 }
75 
76 /*
77  * Read in a buffer in mode, using bulk-insert strategy if bistate isn't NULL.
78  */
get_audio_filter(&self) -> Option<Box<dyn AudioFilter + Send>>79 static Buffer
80 ReadBufferBI(Relation relation, BlockNumber targetBlock,
81 			 ReadBufferMode mode, BulkInsertState bistate)
82 {
83 	Buffer		buffer;
84 
85 	/* If not bulk-insert, exactly like ReadBuffer */
86 	if (!bistate)
87 		return ReadBufferExtended(relation, MAIN_FORKNUM, targetBlock,
88 								  mode, NULL);
89 
90 	/* If we have the desired block already pinned, re-pin and return it */
91 	if (bistate->current_buf != InvalidBuffer)
92 	{
93 		if (BufferGetBlockNumber(bistate->current_buf) == targetBlock)
94 		{
95 			/*
96 			 * Currently the LOCK variants are only used for extending
97 			 * relation, which should never reach this branch.
98 			 */
99 			Assert(mode != RBM_ZERO_AND_LOCK &&
100 				   mode != RBM_ZERO_AND_CLEANUP_LOCK);
101 
102 			IncrBufferRefCount(bistate->current_buf);
103 			return bistate->current_buf;
104 		}
105 		/* ... else drop the old buffer */
106 		ReleaseBuffer(bistate->current_buf);
107 		bistate->current_buf = InvalidBuffer;
108 	}
109 
110 	/* Perform a read using the buffer strategy */
111 	buffer = ReadBufferExtended(relation, MAIN_FORKNUM, targetBlock,
112 								mode, bistate->strategy);
113 
114 	/* Save the selected block as target for future inserts */
115 	IncrBufferRefCount(buffer);
116 	bistate->current_buf = buffer;
117 
118 	return buffer;
119 }
120 
121 /*
122  * For each heap page which is all-visible, acquire a pin on the appropriate
123  * visibility map page, if we haven't already got one.
124  *
125  * buffer2 may be InvalidBuffer, if only one buffer is involved.  buffer1
126  * must not be InvalidBuffer.  If both buffers are specified, block1 must
127  * be less than block2.
128  */
129 static void
130 GetVisibilityMapPins(Relation relation, Buffer buffer1, Buffer buffer2,
131 					 BlockNumber block1, BlockNumber block2,
132 					 Buffer *vmbuffer1, Buffer *vmbuffer2)
133 {
134 	bool		need_to_pin_buffer1;
135 	bool		need_to_pin_buffer2;
136 
137 	Assert(BufferIsValid(buffer1));
138 	Assert(buffer2 == InvalidBuffer || block1 <= block2);
139 
140 	while (1)
141 	{
142 		/* Figure out which pins we need but don't have. */
143 		need_to_pin_buffer1 = PageIsAllVisible(BufferGetPage(buffer1))
144 			&& !visibilitymap_pin_ok(block1, *vmbuffer1);
145 		need_to_pin_buffer2 = buffer2 != InvalidBuffer
146 			&& PageIsAllVisible(BufferGetPage(buffer2))
147 			&& !visibilitymap_pin_ok(block2, *vmbuffer2);
148 		if (!need_to_pin_buffer1 && !need_to_pin_buffer2)
149 			return;
150 
151 		/* We must unlock both buffers before doing any I/O. */
152 		LockBuffer(buffer1, BUFFER_LOCK_UNLOCK);
153 		if (buffer2 != InvalidBuffer && buffer2 != buffer1)
154 			LockBuffer(buffer2, BUFFER_LOCK_UNLOCK);
155 
156 		/* Get pins. */
157 		if (need_to_pin_buffer1)
158 			visibilitymap_pin(relation, block1, vmbuffer1);
159 		if (need_to_pin_buffer2)
160 			visibilitymap_pin(relation, block2, vmbuffer2);
161 
162 		/* Relock buffers. */
163 		LockBuffer(buffer1, BUFFER_LOCK_EXCLUSIVE);
164 		if (buffer2 != InvalidBuffer && buffer2 != buffer1)
165 			LockBuffer(buffer2, BUFFER_LOCK_EXCLUSIVE);
166 
167 		/*
168 		 * If there are two buffers involved and we pinned just one of them,
169 		 * it's possible that the second one became all-visible while we were
170 		 * busy pinning the first one.  If it looks like that's a possible
171 		 * scenario, we'll need to make a second pass through this loop.
172 		 */
173 		if (buffer2 == InvalidBuffer || buffer1 == buffer2
174 			|| (need_to_pin_buffer1 && need_to_pin_buffer2))
175 			break;
176 	}
177 }
178 
179 /*
180  * Extend a relation by multiple blocks to avoid future contention on the
181  * relation extension lock.  Our goal is to pre-extend the relation by an
182  * amount which ramps up as the degree of contention ramps up, but limiting
183  * the result to some sane overall value.
184  */
185 static void
186 RelationAddExtraBlocks(Relation relation, BulkInsertState bistate)
187 {
188 	BlockNumber blockNum,
189 				firstBlock = InvalidBlockNumber;
190 	int			extraBlocks;
191 	int			lockWaiters;
192 
193 	/* Use the length of the lock wait queue to judge how much to extend. */
194 	lockWaiters = RelationExtensionLockWaiterCount(relation);
195 	if (lockWaiters <= 0)
196 		return;
197 
198 	/*
199 	 * It might seem like multiplying the number of lock waiters by as much as
200 	 * 20 is too aggressive, but benchmarking revealed that smaller numbers
201 	 * were insufficient.  512 is just an arbitrary cap to prevent
202 	 * pathological results.
203 	 */
204 	extraBlocks = Min(512, lockWaiters * 20);
205 
206 	do
207 	{
208 		Buffer		buffer;
209 		Page		page;
210 		Size		freespace;
211 
212 		/*
213 		 * Extend by one page.  This should generally match the main-line
214 		 * extension code in RelationGetBufferForTuple, except that we hold
215 		 * the relation extension lock throughout, and we don't immediately
216 		 * initialize the page (see below).
217 		 */
218 		buffer = ReadBufferBI(relation, P_NEW, RBM_ZERO_AND_LOCK, bistate);
219 		page = BufferGetPage(buffer);
220 
221 		if (!PageIsNew(page))
222 			elog(ERROR, "page %u of relation \"%s\" should be empty but is not",
223 				 BufferGetBlockNumber(buffer),
224 				 RelationGetRelationName(relation));
225 
226 		/*
227 		 * Add the page to the FSM without initializing. If we were to
228 		 * initialize here, the page would potentially get flushed out to disk
229 		 * before we add any useful content. There's no guarantee that that'd
230 		 * happen before a potential crash, so we need to deal with
231 		 * uninitialized pages anyway, thus avoid the potential for
232 		 * unnecessary writes.
233 		 */
234 
235 		/* we'll need this info below */
236 		blockNum = BufferGetBlockNumber(buffer);
237 		freespace = BufferGetPageSize(buffer) - SizeOfPageHeaderData;
238 
239 		UnlockReleaseBuffer(buffer);
240 
241 		/* Remember first block number thus added. */
242 		if (firstBlock == InvalidBlockNumber)
243 			firstBlock = blockNum;
244 
245 		/*
246 		 * Immediately update the bottom level of the FSM.  This has a good
247 		 * chance of making this page visible to other concurrently inserting
248 		 * backends, and we want that to happen without delay.
249 		 */
250 		RecordPageWithFreeSpace(relation, blockNum, freespace);
251 	}
252 	while (--extraBlocks > 0);
253 
254 	/*
255 	 * Updating the upper levels of the free space map is too expensive to do
256 	 * for every block, but it's worth doing once at the end to make sure that
257 	 * subsequent insertion activity sees all of those nifty free pages we
258 	 * just inserted.
259 	 */
260 	FreeSpaceMapVacuumRange(relation, firstBlock, blockNum + 1);
261 }
262 
263 /*
264  * RelationGetBufferForTuple
265  *
266  *	Returns pinned and exclusive-locked buffer of a page in given relation
267  *	with free space >= given len.
268  *
269  *	If otherBuffer is not InvalidBuffer, then it references a previously
270  *	pinned buffer of another page in the same relation; on return, this
271  *	buffer will also be exclusive-locked.  (This case is used by heap_update;
272  *	the otherBuffer contains the tuple being updated.)
273  *
274  *	The reason for passing otherBuffer is that if two backends are doing
275  *	concurrent heap_update operations, a deadlock could occur if they try
276  *	to lock the same two buffers in opposite orders.  To ensure that this
277  *	can't happen, we impose the rule that buffers of a relation must be
278  *	locked in increasing page number order.  This is most conveniently done
279  *	by having RelationGetBufferForTuple lock them both, with suitable care
280  *	for ordering.
281  *
282  *	NOTE: it is unlikely, but not quite impossible, for otherBuffer to be the
283  *	same buffer we select for insertion of the new tuple (this could only
284  *	happen if space is freed in that page after heap_update finds there's not
285  *	enough there).  In that case, the page will be pinned and locked only once.
286  *
287  *	We also handle the possibility that the all-visible flag will need to be
288  *	cleared on one or both pages.  If so, pin on the associated visibility map
289  *	page must be acquired before acquiring buffer lock(s), to avoid possibly
290  *	doing I/O while holding buffer locks.  The pins are passed back to the
291  *	caller using the input-output arguments vmbuffer and vmbuffer_other.
292  *	Note that in some cases the caller might have already acquired such pins,
293  *	which is indicated by these arguments not being InvalidBuffer on entry.
294  *
295  *	We normally use FSM to help us find free space.  However,
296  *	if HEAP_INSERT_SKIP_FSM is specified, we just append a new empty page to
297  *	the end of the relation if the tuple won't fit on the current target page.
298  *	This can save some cycles when we know the relation is new and doesn't
299  *	contain useful amounts of free space.
300  *
301  *	HEAP_INSERT_SKIP_FSM is also useful for non-WAL-logged additions to a
302  *	relation, if the caller holds exclusive lock and is careful to invalidate
303  *	relation's smgr_targblock before the first insertion --- that ensures that
304  *	all insertions will occur into newly added pages and not be intermixed
305  *	with tuples from other transactions.  That way, a crash can't risk losing
306  *	any committed data of other transactions.  (See heap_insert's comments
307  *	for additional constraints needed for safe usage of this behavior.)
308  *
309  *	The caller can also provide a BulkInsertState object to optimize many
310  *	insertions into the same relation.  This keeps a pin on the current
311  *	insertion target page (to save pin/unpin cycles) and also passes a
312  *	BULKWRITE buffer selection strategy object to the buffer manager.
313  *	Passing NULL for bistate selects the default behavior.
314  *
315  *	We always try to avoid filling existing pages further than the fillfactor.
316  *	This is OK since this routine is not consulted when updating a tuple and
317  *	keeping it on the same page, which is the scenario fillfactor is meant
318  *	to reserve space for.
319  *
320  *	ereport(ERROR) is allowed here, so this routine *must* be called
321  *	before any (unlogged) changes are made in buffer pool.
322  */
323 Buffer
324 RelationGetBufferForTuple(Relation relation, Size len,
325 						  Buffer otherBuffer, int options,
326 						  BulkInsertState bistate,
327 						  Buffer *vmbuffer, Buffer *vmbuffer_other)
328 {
329 	bool		use_fsm = !(options & HEAP_INSERT_SKIP_FSM);
330 	Buffer		buffer = InvalidBuffer;
331 	Page		page;
332 	Size		pageFreeSpace = 0,
333 				saveFreeSpace = 0;
334 	BlockNumber targetBlock,
335 				otherBlock;
336 	bool		needLock;
337 
338 	len = MAXALIGN(len);		/* be conservative */
339 
340 	/* Bulk insert is not supported for updates, only inserts. */
341 	Assert(otherBuffer == InvalidBuffer || !bistate);
342 
343 	/*
344 	 * If we're gonna fail for oversize tuple, do it right away
345 	 */
346 	if (len > MaxHeapTupleSize)
347 		ereport(ERROR,
348 				(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
349 				 errmsg("row is too big: size %zu, maximum size %zu",
350 						len, MaxHeapTupleSize)));
351 
352 	/* Compute desired extra freespace due to fillfactor option */
353 	saveFreeSpace = RelationGetTargetPageFreeSpace(relation,
354 												   HEAP_DEFAULT_FILLFACTOR);
355 
356 	if (otherBuffer != InvalidBuffer)
357 		otherBlock = BufferGetBlockNumber(otherBuffer);
358 	else
359 		otherBlock = InvalidBlockNumber;	/* just to keep compiler quiet */
360 
361 	/*
362 	 * We first try to put the tuple on the same page we last inserted a tuple
363 	 * on, as cached in the BulkInsertState or relcache entry.  If that
364 	 * doesn't work, we ask the Free Space Map to locate a suitable page.
365 	 * Since the FSM's info might be out of date, we have to be prepared to
366 	 * loop around and retry multiple times. (To insure this isn't an infinite
367 	 * loop, we must update the FSM with the correct amount of free space on
368 	 * each page that proves not to be suitable.)  If the FSM has no record of
369 	 * a page with enough free space, we give up and extend the relation.
370 	 *
371 	 * When use_fsm is false, we either put the tuple onto the existing target
372 	 * page or extend the relation.
373 	 */
374 	if (len + saveFreeSpace > MaxHeapTupleSize)
375 	{
376 		/* can't fit, don't bother asking FSM */
377 		targetBlock = InvalidBlockNumber;
378 		use_fsm = false;
379 	}
380 	else if (bistate && bistate->current_buf != InvalidBuffer)
381 		targetBlock = BufferGetBlockNumber(bistate->current_buf);
382 	else
383 		targetBlock = RelationGetTargetBlock(relation);
384 
385 	if (targetBlock == InvalidBlockNumber && use_fsm)
386 	{
387 		/*
388 		 * We have no cached target page, so ask the FSM for an initial
389 		 * target.
390 		 */
391 		targetBlock = GetPageWithFreeSpace(relation, len + saveFreeSpace);
392 
393 		/*
394 		 * If the FSM knows nothing of the rel, try the last page before we
395 		 * give up and extend.  This avoids one-tuple-per-page syndrome during
396 		 * bootstrapping or in a recently-started system.
397 		 */
398 		if (targetBlock == InvalidBlockNumber)
399 		{
400 			BlockNumber nblocks = RelationGetNumberOfBlocks(relation);
401 
402 			if (nblocks > 0)
403 				targetBlock = nblocks - 1;
404 		}
405 	}
406 
407 loop:
408 	while (targetBlock != InvalidBlockNumber)
409 	{
410 		/*
411 		 * Read and exclusive-lock the target block, as well as the other
412 		 * block if one was given, taking suitable care with lock ordering and
413 		 * the possibility they are the same block.
414 		 *
415 		 * If the page-level all-visible flag is set, caller will need to
416 		 * clear both that and the corresponding visibility map bit.  However,
417 		 * by the time we return, we'll have x-locked the buffer, and we don't
418 		 * want to do any I/O while in that state.  So we check the bit here
419 		 * before taking the lock, and pin the page if it appears necessary.
420 		 * Checking without the lock creates a risk of getting the wrong
421 		 * answer, so we'll have to recheck after acquiring the lock.
422 		 */
423 		if (otherBuffer == InvalidBuffer)
424 		{
425 			/* easy case */
426 			buffer = ReadBufferBI(relation, targetBlock, RBM_NORMAL, bistate);
427 			if (PageIsAllVisible(BufferGetPage(buffer)))
428 				visibilitymap_pin(relation, targetBlock, vmbuffer);
429 			LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
430 		}
431 		else if (otherBlock == targetBlock)
432 		{
433 			/* also easy case */
434 			buffer = otherBuffer;
435 			if (PageIsAllVisible(BufferGetPage(buffer)))
436 				visibilitymap_pin(relation, targetBlock, vmbuffer);
437 			LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
438 		}
439 		else if (otherBlock < targetBlock)
440 		{
441 			/* lock other buffer first */
442 			buffer = ReadBuffer(relation, targetBlock);
443 			if (PageIsAllVisible(BufferGetPage(buffer)))
444 				visibilitymap_pin(relation, targetBlock, vmbuffer);
445 			LockBuffer(otherBuffer, BUFFER_LOCK_EXCLUSIVE);
446 			LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
447 		}
448 		else
449 		{
450 			/* lock target buffer first */
451 			buffer = ReadBuffer(relation, targetBlock);
452 			if (PageIsAllVisible(BufferGetPage(buffer)))
453 				visibilitymap_pin(relation, targetBlock, vmbuffer);
454 			LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
455 			LockBuffer(otherBuffer, BUFFER_LOCK_EXCLUSIVE);
456 		}
457 
458 		/*
459 		 * We now have the target page (and the other buffer, if any) pinned
460 		 * and locked.  However, since our initial PageIsAllVisible checks
461 		 * were performed before acquiring the lock, the results might now be
462 		 * out of date, either for the selected victim buffer, or for the
463 		 * other buffer passed by the caller.  In that case, we'll need to
464 		 * give up our locks, go get the pin(s) we failed to get earlier, and
465 		 * re-lock.  That's pretty painful, but hopefully shouldn't happen
466 		 * often.
467 		 *
468 		 * Note that there's a small possibility that we didn't pin the page
469 		 * above but still have the correct page pinned anyway, either because
470 		 * we've already made a previous pass through this loop, or because
471 		 * caller passed us the right page anyway.
472 		 *
473 		 * Note also that it's possible that by the time we get the pin and
474 		 * retake the buffer locks, the visibility map bit will have been
475 		 * cleared by some other backend anyway.  In that case, we'll have
476 		 * done a bit of extra work for no gain, but there's no real harm
477 		 * done.
478 		 */
479 		if (otherBuffer == InvalidBuffer || targetBlock <= otherBlock)
480 			GetVisibilityMapPins(relation, buffer, otherBuffer,
481 								 targetBlock, otherBlock, vmbuffer,
482 								 vmbuffer_other);
483 		else
484 			GetVisibilityMapPins(relation, otherBuffer, buffer,
485 								 otherBlock, targetBlock, vmbuffer_other,
486 								 vmbuffer);
487 
488 		/*
489 		 * Now we can check to see if there's enough free space here. If so,
490 		 * we're done.
491 		 */
492 		page = BufferGetPage(buffer);
493 
494 		/*
495 		 * If necessary initialize page, it'll be used soon.  We could avoid
496 		 * dirtying the buffer here, and rely on the caller to do so whenever
497 		 * it puts a tuple onto the page, but there seems not much benefit in
498 		 * doing so.
499 		 */
500 		if (PageIsNew(page))
501 		{
502 			PageInit(page, BufferGetPageSize(buffer), 0);
503 			MarkBufferDirty(buffer);
504 		}
505 
506 		pageFreeSpace = PageGetHeapFreeSpace(page);
507 		if (len + saveFreeSpace <= pageFreeSpace)
508 		{
509 			/* use this page as future insert target, too */
510 			RelationSetTargetBlock(relation, targetBlock);
511 			return buffer;
512 		}
513 
514 		/*
515 		 * Not enough space, so we must give up our page locks and pin (if
516 		 * any) and prepare to look elsewhere.  We don't care which order we
517 		 * unlock the two buffers in, so this can be slightly simpler than the
518 		 * code above.
519 		 */
520 		LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
521 		if (otherBuffer == InvalidBuffer)
522 			ReleaseBuffer(buffer);
523 		else if (otherBlock != targetBlock)
524 		{
525 			LockBuffer(otherBuffer, BUFFER_LOCK_UNLOCK);
526 			ReleaseBuffer(buffer);
527 		}
528 
529 		/* Without FSM, always fall out of the loop and extend */
530 		if (!use_fsm)
531 			break;
532 
533 		/*
534 		 * Update FSM as to condition of this page, and ask for another page
535 		 * to try.
536 		 */
537 		targetBlock = RecordAndGetPageWithFreeSpace(relation,
538 													targetBlock,
539 													pageFreeSpace,
540 													len + saveFreeSpace);
541 	}
542 
543 	/*
544 	 * Have to extend the relation.
545 	 *
546 	 * We have to use a lock to ensure no one else is extending the rel at the
547 	 * same time, else we will both try to initialize the same new page.  We
548 	 * can skip locking for new or temp relations, however, since no one else
549 	 * could be accessing them.
550 	 */
551 	needLock = !RELATION_IS_LOCAL(relation);
552 
553 	/*
554 	 * If we need the lock but are not able to acquire it immediately, we'll
555 	 * consider extending the relation by multiple blocks at a time to manage
556 	 * contention on the relation extension lock.  However, this only makes
557 	 * sense if we're using the FSM; otherwise, there's no point.
558 	 */
559 	if (needLock)
560 	{
561 		if (!use_fsm)
562 			LockRelationForExtension(relation, ExclusiveLock);
563 		else if (!ConditionalLockRelationForExtension(relation, ExclusiveLock))
564 		{
565 			/* Couldn't get the lock immediately; wait for it. */
566 			LockRelationForExtension(relation, ExclusiveLock);
567 
568 			/*
569 			 * Check if some other backend has extended a block for us while
570 			 * we were waiting on the lock.
571 			 */
572 			targetBlock = GetPageWithFreeSpace(relation, len + saveFreeSpace);
573 
574 			/*
575 			 * If some other waiter has already extended the relation, we
576 			 * don't need to do so; just use the existing freespace.
577 			 */
578 			if (targetBlock != InvalidBlockNumber)
579 			{
580 				UnlockRelationForExtension(relation, ExclusiveLock);
581 				goto loop;
582 			}
583 
584 			/* Time to bulk-extend. */
585 			RelationAddExtraBlocks(relation, bistate);
586 		}
587 	}
588 
589 	/*
590 	 * In addition to whatever extension we performed above, we always add at
591 	 * least one block to satisfy our own request.
592 	 *
593 	 * XXX This does an lseek - rather expensive - but at the moment it is the
594 	 * only way to accurately determine how many blocks are in a relation.  Is
595 	 * it worth keeping an accurate file length in shared memory someplace,
596 	 * rather than relying on the kernel to do it for us?
597 	 */
598 	buffer = ReadBufferBI(relation, P_NEW, RBM_ZERO_AND_LOCK, bistate);
599 
600 	/*
601 	 * We need to initialize the empty new page.  Double-check that it really
602 	 * is empty (this should never happen, but if it does we don't want to
603 	 * risk wiping out valid data).
604 	 */
605 	page = BufferGetPage(buffer);
606 
607 	if (!PageIsNew(page))
608 		elog(ERROR, "page %u of relation \"%s\" should be empty but is not",
609 			 BufferGetBlockNumber(buffer),
610 			 RelationGetRelationName(relation));
611 
612 	PageInit(page, BufferGetPageSize(buffer), 0);
613 	MarkBufferDirty(buffer);
614 
615 	/*
616 	 * Release the file-extension lock; it's now OK for someone else to extend
617 	 * the relation some more.
618 	 */
619 	if (needLock)
620 		UnlockRelationForExtension(relation, ExclusiveLock);
621 
622 	/*
623 	 * Lock the other buffer. It's guaranteed to be of a lower page number
624 	 * than the new page. To conform with the deadlock prevent rules, we ought
625 	 * to lock otherBuffer first, but that would give other backends a chance
626 	 * to put tuples on our page. To reduce the likelihood of that, attempt to
627 	 * lock the other buffer conditionally, that's very likely to work.
628 	 * Otherwise we need to lock buffers in the correct order, and retry if
629 	 * the space has been used in the mean time.
630 	 *
631 	 * Alternatively, we could acquire the lock on otherBuffer before
632 	 * extending the relation, but that'd require holding the lock while
633 	 * performing IO, which seems worse than an unlikely retry.
634 	 */
635 	if (otherBuffer != InvalidBuffer)
636 	{
637 		Assert(otherBuffer != buffer);
638 		targetBlock = BufferGetBlockNumber(buffer);
639 		Assert(targetBlock > otherBlock);
640 
641 		if (unlikely(!ConditionalLockBuffer(otherBuffer)))
642 		{
643 			LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
644 			LockBuffer(otherBuffer, BUFFER_LOCK_EXCLUSIVE);
645 			LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
646 
647 			/*
648 			 * Because the buffers were unlocked for a while, it's possible,
649 			 * although unlikely, that an all-visible flag became set or that
650 			 * somebody used up the available space in the new page.  We can
651 			 * use GetVisibilityMapPins to deal with the first case.  In the
652 			 * second case, just retry from start.
653 			 */
654 			GetVisibilityMapPins(relation, otherBuffer, buffer,
655 								 otherBlock, targetBlock, vmbuffer_other,
656 								 vmbuffer);
657 
658 			if (len > PageGetHeapFreeSpace(page))
659 			{
660 				LockBuffer(otherBuffer, BUFFER_LOCK_UNLOCK);
661 				UnlockReleaseBuffer(buffer);
662 
663 				goto loop;
664 			}
665 		}
666 	}
667 
668 	if (len > PageGetHeapFreeSpace(page))
669 	{
670 		/* We should not get here given the test at the top */
671 		elog(PANIC, "tuple is too big: size %zu", len);
672 	}
673 
674 	/*
675 	 * Remember the new page as our target for future insertions.
676 	 *
677 	 * XXX should we enter the new page into the free space map immediately,
678 	 * or just keep it for this backend's exclusive use in the short run
679 	 * (until VACUUM sees it)?	Seems to depend on whether you expect the
680 	 * current backend to make more insertions or not, which is probably a
681 	 * good bet most of the time.  So for now, don't add it to FSM yet.
682 	 */
683 	RelationSetTargetBlock(relation, BufferGetBlockNumber(buffer));
684 
685 	return buffer;
686 }
687