1 /*-------------------------------------------------------------------------
2  *
3  * freespace.c
4  *	  POSTGRES free space map for quickly finding free space in relations
5  *
6  *
7  * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
8  * Portions Copyright (c) 1994, Regents of the University of California
9  *
10  * IDENTIFICATION
11  *	  src/backend/storage/freespace/freespace.c
12  *
13  *
14  * NOTES:
15  *
16  *	Free Space Map keeps track of the amount of free space on pages, and
17  *	allows quickly searching for a page with enough free space. The FSM is
18  *	stored in a dedicated relation fork of all heap relations, and those
19  *	index access methods that need it (see also indexfsm.c). See README for
20  *	more information.
21  *
22  *-------------------------------------------------------------------------
23  */
24 #include "postgres.h"
25 
26 #include "access/htup_details.h"
27 #include "access/xlogutils.h"
emit_isk12(ARMIns ai,int32_t n)28 #include "miscadmin.h"
29 #include "storage/freespace.h"
30 #include "storage/fsm_internals.h"
31 #include "storage/lmgr.h"
32 #include "storage/smgr.h"
33 
34 
35 /*
36  * We use just one byte to store the amount of free space on a page, so we
37  * divide the amount of free space a page can have into 256 different
38  * categories. The highest category, 255, represents a page with at least
39  * MaxFSMRequestSize bytes of free space, and the second highest category
40  * represents the range from 254 * FSM_CAT_STEP, inclusive, to
41  * MaxFSMRequestSize, exclusive.
42  *
43  * MaxFSMRequestSize depends on the architecture and BLCKSZ, but assuming
44  * default 8k BLCKSZ, and that MaxFSMRequestSize is 8164 bytes, the
45  * categories look like this:
46  *
47  *
48  * Range	 Category
49  * 0	- 31   0
50  * 32	- 63   1
51  * ...    ...  ...
52  * 8096 - 8127 253
53  * 8128 - 8163 254
54  * 8164 - 8192 255
55  *
56  * The reason that MaxFSMRequestSize is special is that if MaxFSMRequestSize
57  * isn't equal to a range boundary, a page with exactly MaxFSMRequestSize
58  * bytes of free space wouldn't satisfy a request for MaxFSMRequestSize
59  * bytes. If there isn't more than MaxFSMRequestSize bytes of free space on a
60  * completely empty page, that would mean that we could never satisfy a
61  * request of exactly MaxFSMRequestSize bytes.
62  */
/* Number of distinct categories a single FSM byte can encode. */
#define FSM_CATEGORIES	256
/* Size in bytes of one category step (32 bytes with the default 8k BLCKSZ). */
#define FSM_CAT_STEP	(BLCKSZ / FSM_CATEGORIES)
/* Largest free-space request that can be made; see category table above. */
#define MaxFSMRequestSize	MaxHeapTupleSize

/*
 * Depth of the on-disk tree. We need to be able to address 2^32-1 blocks,
 * and 1626 is the smallest number that satisfies X^3 >= 2^32-1. Likewise,
 * 216 is the smallest number that satisfies X^4 >= 2^32-1. In practice,
 * this means that 4096 bytes is the smallest BLCKSZ that we can get away
 * with a 3-level tree, and 512 is the smallest we support.
 */
#define FSM_TREE_DEPTH	((SlotsPerFSMPage >= 1626) ? 3 : 4)

/* Logical level numbers: the root is the highest level, leaf pages are 0. */
#define FSM_ROOT_LEVEL	(FSM_TREE_DEPTH - 1)
#define FSM_BOTTOM_LEVEL 0
78 
/*
 * The internal FSM routines work on a logical addressing scheme. Each
 * level of the tree can be thought of as a separately addressable file.
 */
typedef struct
{
	int			level;			/* level: 0 = bottom (leaf), FSM_ROOT_LEVEL = root */
	int			logpageno;		/* page number within the level */
} FSMAddress;

/* Address of the root page. */
static const FSMAddress FSM_ROOT_ADDRESS = {FSM_ROOT_LEVEL, 0};

/* functions to navigate the tree */
static FSMAddress fsm_get_child(FSMAddress parent, uint16 slot);
static FSMAddress fsm_get_parent(FSMAddress child, uint16 *slot);
static FSMAddress fsm_get_location(BlockNumber heapblk, uint16 *slot);
static BlockNumber fsm_get_heap_blk(FSMAddress addr, uint16 slot);
static BlockNumber fsm_logical_to_physical(FSMAddress addr);

/* low-level page access: read an FSM page, optionally extending the fork */
static Buffer fsm_readbuf(Relation rel, FSMAddress addr, bool extend);
static void fsm_extend(Relation rel, BlockNumber fsm_nblocks);

/* functions to convert amount of free space to a FSM category */
static uint8 fsm_space_avail_to_cat(Size avail);
static uint8 fsm_space_needed_to_cat(Size needed);
static Size fsm_space_cat_to_avail(uint8 cat);

/* workhorse functions for various operations */
static int	fsm_set_and_search(Relation rel, FSMAddress addr, uint16 slot,
							   uint8 newValue, uint8 minValue);
static BlockNumber fsm_search(Relation rel, uint8 min_cat);
static uint8 fsm_vacuum_page(Relation rel, FSMAddress addr,
							 BlockNumber start, BlockNumber end,
							 bool *eof);
114 
115 
116 /******** Public API ********/
117 
118 /*
119  * GetPageWithFreeSpace - try to find a page in the given relation with
120  *		at least the specified amount of free space.
121  *
122  * If successful, return the block number; if not, return InvalidBlockNumber.
123  *
emit_kdelta1(ASMState * as,Reg d,int32_t i)124  * The caller must be prepared for the possibility that the returned page
125  * will turn out to have too little space available by the time the caller
126  * gets a lock on it.  In that case, the caller should report the actual
127  * amount of free space available on that page and then try again (see
128  * RecordAndGetPageWithFreeSpace).  If InvalidBlockNumber is returned,
129  * extend the relation.
130  */
131 BlockNumber
132 GetPageWithFreeSpace(Relation rel, Size spaceNeeded)
133 {
134 	uint8		min_cat = fsm_space_needed_to_cat(spaceNeeded);
135 
136 	return fsm_search(rel, min_cat);
137 }
138 
139 /*
140  * RecordAndGetPageWithFreeSpace - update info about a page and try again.
141  *
142  * We provide this combo form to save some locking overhead, compared to
143  * separate RecordPageWithFreeSpace + GetPageWithFreeSpace calls. There's
144  * also some effort to return a page close to the old page; if there's a
145  * page with enough free space on the same FSM page where the old one page
146  * is located, it is preferred.
147  */
emit_kdelta2(ASMState * as,Reg rd,int32_t i)148 BlockNumber
149 RecordAndGetPageWithFreeSpace(Relation rel, BlockNumber oldPage,
150 							  Size oldSpaceAvail, Size spaceNeeded)
151 {
152 	int			old_cat = fsm_space_avail_to_cat(oldSpaceAvail);
153 	int			search_cat = fsm_space_needed_to_cat(spaceNeeded);
154 	FSMAddress	addr;
155 	uint16		slot;
156 	int			search_slot;
157 
158 	/* Get the location of the FSM byte representing the heap block */
159 	addr = fsm_get_location(oldPage, &slot);
160 
161 	search_slot = fsm_set_and_search(rel, addr, slot, old_cat, search_cat);
162 
163 	/*
164 	 * If fsm_set_and_search found a suitable new block, return that.
165 	 * Otherwise, search as usual.
166 	 */
167 	if (search_slot != -1)
168 		return fsm_get_heap_blk(addr, search_slot);
169 	else
170 		return fsm_search(rel, search_cat);
171 }
172 
173 /*
174  * RecordPageWithFreeSpace - update info about a page.
175  *
176  * Note that if the new spaceAvail value is higher than the old value stored
emit_loadi(ASMState * as,Reg rd,int32_t i)177  * in the FSM, the space might not become visible to searchers until the next
178  * FreeSpaceMapVacuum call, which updates the upper level pages.
179  */
180 void
181 RecordPageWithFreeSpace(Relation rel, BlockNumber heapBlk, Size spaceAvail)
182 {
183 	int			new_cat = fsm_space_avail_to_cat(spaceAvail);
184 	FSMAddress	addr;
185 	uint16		slot;
186 
187 	/* Get the location of the FSM byte representing the heap block */
188 	addr = fsm_get_location(heapBlk, &slot);
189 
190 	fsm_set_and_search(rel, addr, slot, new_cat, 0);
191 }
192 
/*
 * XLogRecordPageWithFreeSpace - like RecordPageWithFreeSpace, for use in
 *		WAL replay
 */
void
XLogRecordPageWithFreeSpace(RelFileNode rnode, BlockNumber heapBlk,
							Size spaceAvail)
{
	int			new_cat = fsm_space_avail_to_cat(spaceAvail);
	FSMAddress	addr;
	uint16		slot;
	BlockNumber blkno;
	Buffer		buf;
	Page		page;

	/* Get the location of the FSM byte representing the heap block */
	addr = fsm_get_location(heapBlk, &slot);
	blkno = fsm_logical_to_physical(addr);

	/* If the page doesn't exist already, extend */
	buf = XLogReadBufferExtended(rnode, FSM_FORKNUM, blkno, RBM_ZERO_ON_ERROR);
	LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);

	page = BufferGetPage(buf);
	/* A zeroed (brand-new) page must be initialized before it can hold data */
	if (PageIsNew(page))
		PageInit(page, BLCKSZ, 0);

	/* Hint-style dirtying is enough: FSM changes themselves aren't WAL-logged */
	if (fsm_set_avail(page, slot, new_cat))
		MarkBufferDirtyHint(buf, false);
	UnlockReleaseBuffer(buf);
}
224 
/*
 * GetRecordedFreeSpace - return the amount of free space on a particular page,
 *		according to the FSM.
 */
Size
GetRecordedFreeSpace(Relation rel, BlockNumber heapBlk)
{
	FSMAddress	addr;
	uint16		slot;
	Buffer		buf;
	uint8		cat;

	/* Get the location of the FSM byte representing the heap block */
	addr = fsm_get_location(heapBlk, &slot);

	/* If the FSM page doesn't exist, report zero free space */
	buf = fsm_readbuf(rel, addr, false);
	if (!BufferIsValid(buf))
		return 0;
	cat = fsm_get_avail(BufferGetPage(buf), slot);
	ReleaseBuffer(buf);

	/* Convert the stored category back to (a lower bound on) bytes */
	return fsm_space_cat_to_avail(cat);
}
248 
/*
 * FreeSpaceMapTruncateRel - adjust for truncation of a relation.
 *
 * The caller must hold AccessExclusiveLock on the relation, to ensure that
 * other backends receive the smgr invalidation event that this function sends
 * before they access the FSM again.
 *
 * nblocks is the new size of the heap.
 */
void
FreeSpaceMapTruncateRel(Relation rel, BlockNumber nblocks)
{
	BlockNumber new_nfsmblocks;
	FSMAddress	first_removed_address;
	uint16		first_removed_slot;
	Buffer		buf;

	RelationOpenSmgr(rel);

	/*
	 * If no FSM has been created yet for this relation, there's nothing to
	 * truncate.
	 */
	if (!smgrexists(rel->rd_smgr, FSM_FORKNUM))
		return;

	/* Get the location in the FSM of the first removed heap block */
	first_removed_address = fsm_get_location(nblocks, &first_removed_slot);

	/*
	 * Zero out the tail of the last remaining FSM page. If the slot
	 * representing the first removed heap block is at a page boundary, as the
	 * first slot on the FSM page that first_removed_address points to, we can
	 * just truncate that page altogether.
	 */
	if (first_removed_slot > 0)
	{
		buf = fsm_readbuf(rel, first_removed_address, false);
		if (!BufferIsValid(buf))
			return;				/* nothing to do; the FSM was already smaller */
		LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);

		/* NO EREPORT(ERROR) from here till changes are logged */
		START_CRIT_SECTION();

		/* Clear the slots for all truncated-away heap blocks on this page */
		fsm_truncate_avail(BufferGetPage(buf), first_removed_slot);

		/*
		 * Truncation of a relation is WAL-logged at a higher-level, and we
		 * will be called at WAL replay. But if checksums are enabled, we need
		 * to still write a WAL record to protect against a torn page, if the
		 * page is flushed to disk before the truncation WAL record. We cannot
		 * use MarkBufferDirtyHint here, because that will not dirty the page
		 * during recovery.
		 */
		MarkBufferDirty(buf);
		if (!InRecovery && RelationNeedsWAL(rel) && XLogHintBitIsNeeded())
			log_newpage_buffer(buf, false);

		END_CRIT_SECTION();

		UnlockReleaseBuffer(buf);

		/* Keep the partially-cleared page; truncate everything after it */
		new_nfsmblocks = fsm_logical_to_physical(first_removed_address) + 1;
	}
	else
	{
		new_nfsmblocks = fsm_logical_to_physical(first_removed_address);
		if (smgrnblocks(rel->rd_smgr, FSM_FORKNUM) <= new_nfsmblocks)
			return;				/* nothing to do; the FSM was already smaller */
	}

	/* Truncate the unused FSM pages, and send smgr inval message */
	smgrtruncate(rel->rd_smgr, FSM_FORKNUM, new_nfsmblocks);

	/*
	 * We might as well update the local smgr_fsm_nblocks setting.
	 * smgrtruncate sent an smgr cache inval message, which will cause other
	 * backends to invalidate their copy of smgr_fsm_nblocks, and this one too
	 * at the next command boundary.  But this ensures it isn't outright wrong
	 * until then.
	 */
	if (rel->rd_smgr)
		rel->rd_smgr->smgr_fsm_nblocks = new_nfsmblocks;

	/*
	 * Update upper-level FSM pages to account for the truncation.  This is
	 * important because the just-truncated pages were likely marked as
	 * all-free, and would be preferentially selected.
	 */
	FreeSpaceMapVacuumRange(rel, nblocks, InvalidBlockNumber);
}
341 
342 /*
emit_opk(ASMState * as,ARMIns ai,Reg dest,Reg src,int32_t i,RegSet allow)343  * FreeSpaceMapVacuum - update upper-level pages in the rel's FSM
344  *
345  * We assume that the bottom-level pages have already been updated with
346  * new free-space information.
347  */
348 void
349 FreeSpaceMapVacuum(Relation rel)
350 {
351 	bool		dummy;
352 
353 	/* Recursively scan the tree, starting at the root */
354 	(void) fsm_vacuum_page(rel, FSM_ROOT_ADDRESS,
355 						   (BlockNumber) 0, InvalidBlockNumber,
356 						   &dummy);
357 }
358 
359 /*
360  * FreeSpaceMapVacuumRange - update upper-level pages in the rel's FSM
361  *
362  * As above, but assume that only heap pages between start and end-1 inclusive
363  * have new free-space information, so update only the upper-level slots
364  * covering that block range.  end == InvalidBlockNumber is equivalent to
365  * "all the rest of the relation".
366  */
367 void
368 FreeSpaceMapVacuumRange(Relation rel, BlockNumber start, BlockNumber end)
369 {
370 	bool		dummy;
371 
372 	/* Recursively scan the tree, starting at the root */
373 	if (end > start)
374 		(void) fsm_vacuum_page(rel, FSM_ROOT_ADDRESS, start, end, &dummy);
375 }
376 
377 /******** Internal routines ********/
378 
379 /*
380  * Return category corresponding x bytes of free space
381  */
382 static uint8
383 fsm_space_avail_to_cat(Size avail)
384 {
385 	int			cat;
386 
387 	Assert(avail < BLCKSZ);
388 
389 	if (avail >= MaxFSMRequestSize)
390 		return 255;
391 
392 	cat = avail / FSM_CAT_STEP;
393 
394 	/*
395 	 * The highest category, 255, is reserved for MaxFSMRequestSize bytes or
396 	 * more.
397 	 */
398 	if (cat > 254)
399 		cat = 254;
400 
401 	return (uint8) cat;
402 }
403 
404 /*
405  * Return the lower bound of the range of free space represented by given
406  * category.
407  */
408 static Size
409 fsm_space_cat_to_avail(uint8 cat)
410 {
411 	/* The highest category represents exactly MaxFSMRequestSize bytes. */
412 	if (cat == 255)
413 		return MaxFSMRequestSize;
414 	else
415 		return cat * FSM_CAT_STEP;
416 }
417 
418 /*
419  * Which category does a page need to have, to accommodate x bytes of data?
420  * While fsm_size_to_avail_cat() rounds down, this needs to round up.
421  */
422 static uint8
423 fsm_space_needed_to_cat(Size needed)
424 {
425 	int			cat;
426 
427 	/* Can't ask for more space than the highest category represents */
428 	if (needed > MaxFSMRequestSize)
429 		elog(ERROR, "invalid FSM request size %zu", needed);
430 
431 	if (needed == 0)
432 		return 1;
433 
434 	cat = (needed + FSM_CAT_STEP - 1) / FSM_CAT_STEP;
435 
436 	if (cat > 255)
437 		cat = 255;
438 
439 	return (uint8) cat;
440 }
441 
442 /*
443  * Returns the physical block number of a FSM page
444  */
445 static BlockNumber
446 fsm_logical_to_physical(FSMAddress addr)
447 {
448 	BlockNumber pages;
449 	int			leafno;
450 	int			l;
451 
452 	/*
453 	 * Calculate the logical page number of the first leaf page below the
454 	 * given page.
455 	 */
456 	leafno = addr.logpageno;
457 	for (l = 0; l < addr.level; l++)
458 		leafno *= SlotsPerFSMPage;
459 
460 	/* Count upper level nodes required to address the leaf page */
461 	pages = 0;
462 	for (l = 0; l < FSM_TREE_DEPTH; l++)
463 	{
464 		pages += leafno + 1;
465 		leafno /= SlotsPerFSMPage;
466 	}
467 
468 	/*
469 	 * If the page we were asked for wasn't at the bottom level, subtract the
470 	 * additional lower level pages we counted above.
471 	 */
472 	pages -= addr.level;
473 
474 	/* Turn the page count into 0-based block number */
475 	return pages - 1;
476 }
477 
478 /*
479  * Return the FSM location corresponding to given heap block.
480  */
481 static FSMAddress
482 fsm_get_location(BlockNumber heapblk, uint16 *slot)
483 {
484 	FSMAddress	addr;
485 
486 	addr.level = FSM_BOTTOM_LEVEL;
487 	addr.logpageno = heapblk / SlotsPerFSMPage;
488 	*slot = heapblk % SlotsPerFSMPage;
489 
490 	return addr;
491 }
492 
493 /*
494  * Return the heap block number corresponding to given location in the FSM.
495  */
496 static BlockNumber
497 fsm_get_heap_blk(FSMAddress addr, uint16 slot)
498 {
499 	Assert(addr.level == FSM_BOTTOM_LEVEL);
500 	return ((unsigned int) addr.logpageno) * SlotsPerFSMPage + slot;
501 }
502 
503 /*
504  * Given a logical address of a child page, get the logical page number of
505  * the parent, and the slot within the parent corresponding to the child.
506  */
507 static FSMAddress
508 fsm_get_parent(FSMAddress child, uint16 *slot)
509 {
510 	FSMAddress	parent;
511 
512 	Assert(child.level < FSM_ROOT_LEVEL);
513 
514 	parent.level = child.level + 1;
515 	parent.logpageno = child.logpageno / SlotsPerFSMPage;
516 	*slot = child.logpageno % SlotsPerFSMPage;
517 
518 	return parent;
519 }
520 
521 /*
522  * Given a logical address of a parent page and a slot number, get the
523  * logical address of the corresponding child page.
524  */
525 static FSMAddress
526 fsm_get_child(FSMAddress parent, uint16 slot)
527 {
528 	FSMAddress	child;
529 
530 	Assert(parent.level > FSM_BOTTOM_LEVEL);
531 
532 	child.level = parent.level - 1;
533 	child.logpageno = parent.logpageno * SlotsPerFSMPage + slot;
534 
535 	return child;
536 }
537 
/*
 * Read a FSM page.
 *
 * If the page doesn't exist, InvalidBuffer is returned, or if 'extend' is
 * true, the FSM file is extended.
 *
 * The returned buffer is pinned but not locked; the caller locks it as
 * needed (see the caveat about concurrent initialization below).
 */
static Buffer
fsm_readbuf(Relation rel, FSMAddress addr, bool extend)
{
	BlockNumber blkno = fsm_logical_to_physical(addr);
	Buffer		buf;

	RelationOpenSmgr(rel);

	/*
	 * If we haven't cached the size of the FSM yet, check it first.  Also
	 * recheck if the requested block seems to be past end, since our cached
	 * value might be stale.  (We send smgr inval messages on truncation, but
	 * not on extension.)
	 */
	if (rel->rd_smgr->smgr_fsm_nblocks == InvalidBlockNumber ||
		blkno >= rel->rd_smgr->smgr_fsm_nblocks)
	{
		if (smgrexists(rel->rd_smgr, FSM_FORKNUM))
			rel->rd_smgr->smgr_fsm_nblocks = smgrnblocks(rel->rd_smgr,
														 FSM_FORKNUM);
		else
			rel->rd_smgr->smgr_fsm_nblocks = 0;
	}

	/* Handle requests beyond EOF */
	if (blkno >= rel->rd_smgr->smgr_fsm_nblocks)
	{
		if (extend)
			fsm_extend(rel, blkno + 1);
		else
			return InvalidBuffer;
	}

	/*
	 * Use ZERO_ON_ERROR mode, and initialize the page if necessary. The FSM
	 * information is not accurate anyway, so it's better to clear corrupt
	 * pages than error out. Since the FSM changes are not WAL-logged, the
	 * so-called torn page problem on crash can lead to pages with corrupt
	 * headers, for example.
	 *
	 * The initialize-the-page part is trickier than it looks, because of the
	 * possibility of multiple backends doing this concurrently, and our
	 * desire to not uselessly take the buffer lock in the normal path where
	 * the page is OK.  We must take the lock to initialize the page, so
	 * recheck page newness after we have the lock, in case someone else
	 * already did it.  Also, because we initially check PageIsNew with no
	 * lock, it's possible to fall through and return the buffer while someone
	 * else is still initializing the page (i.e., we might see pd_upper as set
	 * but other page header fields are still zeroes).  This is harmless for
	 * callers that will take a buffer lock themselves, but some callers
	 * inspect the page without any lock at all.  The latter is OK only so
	 * long as it doesn't depend on the page header having correct contents.
	 * Current usage is safe because PageGetContents() does not require that.
	 */
	buf = ReadBufferExtended(rel, FSM_FORKNUM, blkno, RBM_ZERO_ON_ERROR, NULL);
	if (PageIsNew(BufferGetPage(buf)))
	{
		/* Double-check under the lock; someone may have initialized it. */
		LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
		if (PageIsNew(BufferGetPage(buf)))
			PageInit(BufferGetPage(buf), BLCKSZ, 0);
		LockBuffer(buf, BUFFER_LOCK_UNLOCK);
	}
	return buf;
}
608 
/*
 * Ensure that the FSM fork is at least fsm_nblocks long, extending
 * it if necessary with empty pages. And by empty, I mean pages filled
 * with zeros, meaning there's no free space.
 */
static void
fsm_extend(Relation rel, BlockNumber fsm_nblocks)
{
	BlockNumber fsm_nblocks_now;
	PGAlignedBlock pg;			/* stack-allocated, properly aligned page image */

	PageInit((Page) pg.data, BLCKSZ, 0);

	/*
	 * We use the relation extension lock to lock out other backends trying to
	 * extend the FSM at the same time. It also locks out extension of the
	 * main fork, unnecessarily, but extending the FSM happens seldom enough
	 * that it doesn't seem worthwhile to have a separate lock tag type for
	 * it.
	 *
	 * Note that another backend might have extended or created the relation
	 * by the time we get the lock.
	 */
	LockRelationForExtension(rel, ExclusiveLock);

	/* Might have to re-open if a cache flush happened */
	RelationOpenSmgr(rel);

	/*
	 * Create the FSM file first if it doesn't exist.  If smgr_fsm_nblocks is
	 * positive then it must exist, no need for an smgrexists call.
	 */
	if ((rel->rd_smgr->smgr_fsm_nblocks == 0 ||
		 rel->rd_smgr->smgr_fsm_nblocks == InvalidBlockNumber) &&
		!smgrexists(rel->rd_smgr, FSM_FORKNUM))
		smgrcreate(rel->rd_smgr, FSM_FORKNUM, false);

	/* Re-read the current size now that we hold the extension lock */
	fsm_nblocks_now = smgrnblocks(rel->rd_smgr, FSM_FORKNUM);

	/* Append initialized-but-empty pages until we reach the target size */
	while (fsm_nblocks_now < fsm_nblocks)
	{
		PageSetChecksumInplace((Page) pg.data, fsm_nblocks_now);

		smgrextend(rel->rd_smgr, FSM_FORKNUM, fsm_nblocks_now,
				   pg.data, false);
		fsm_nblocks_now++;
	}

	/* Update local cache with the up-to-date size */
	rel->rd_smgr->smgr_fsm_nblocks = fsm_nblocks_now;

	UnlockRelationForExtension(rel, ExclusiveLock);
}
662 
663 /*
664  * Set value in given FSM page and slot.
665  *
666  * If minValue > 0, the updated page is also searched for a page with at
667  * least minValue of free space. If one is found, its slot number is
668  * returned, -1 otherwise.
669  */
670 static int
671 fsm_set_and_search(Relation rel, FSMAddress addr, uint16 slot,
672 				   uint8 newValue, uint8 minValue)
673 {
674 	Buffer		buf;
675 	Page		page;
676 	int			newslot = -1;
677 
678 	buf = fsm_readbuf(rel, addr, true);
679 	LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
680 
681 	page = BufferGetPage(buf);
682 
683 	if (fsm_set_avail(page, slot, newValue))
684 		MarkBufferDirtyHint(buf, false);
685 
686 	if (minValue != 0)
687 	{
688 		/* Search while we still hold the lock */
689 		newslot = fsm_search_avail(buf, minValue,
690 								   addr.level == FSM_BOTTOM_LEVEL,
691 								   true);
692 	}
693 
694 	UnlockReleaseBuffer(buf);
695 
696 	return newslot;
697 }
698 
/*
 * Search the tree for a heap page with at least min_cat of free space
 *
 * Returns the heap block number, or InvalidBlockNumber if no page with
 * enough free space could be found (or the search gave up; see below).
 */
static BlockNumber
fsm_search(Relation rel, uint8 min_cat)
{
	int			restarts = 0;	/* emergency brake for the retry loop below */
	FSMAddress	addr = FSM_ROOT_ADDRESS;

	for (;;)
	{
		int			slot;
		Buffer		buf;
		uint8		max_avail = 0;

		/* Read the FSM page. */
		buf = fsm_readbuf(rel, addr, false);

		/* Search within the page */
		if (BufferIsValid(buf))
		{
			LockBuffer(buf, BUFFER_LOCK_SHARE);
			slot = fsm_search_avail(buf, min_cat,
									(addr.level == FSM_BOTTOM_LEVEL),
									false);
			/* Remember the page's best value, to fix the parent on failure */
			if (slot == -1)
				max_avail = fsm_get_max_avail(BufferGetPage(buf));
			UnlockReleaseBuffer(buf);
		}
		else
			slot = -1;			/* page past EOF: treat as no match */

		if (slot != -1)
		{
			/*
			 * Descend the tree, or return the found block if we're at the
			 * bottom.
			 */
			if (addr.level == FSM_BOTTOM_LEVEL)
				return fsm_get_heap_blk(addr, slot);

			addr = fsm_get_child(addr, slot);
		}
		else if (addr.level == FSM_ROOT_LEVEL)
		{
			/*
			 * At the root, failure means there's no page with enough free
			 * space in the FSM. Give up.
			 */
			return InvalidBlockNumber;
		}
		else
		{
			uint16		parentslot;
			FSMAddress	parent;

			/*
			 * At lower level, failure can happen if the value in the upper-
			 * level node didn't reflect the value on the lower page. Update
			 * the upper node, to avoid falling into the same trap again, and
			 * start over.
			 *
			 * There's a race condition here, if another backend updates this
			 * page right after we release it, and gets the lock on the parent
			 * page before us. We'll then update the parent page with the now
			 * stale information we had. It's OK, because it should happen
			 * rarely, and will be fixed by the next vacuum.
			 */
			parent = fsm_get_parent(addr, &parentslot);
			fsm_set_and_search(rel, parent, parentslot, max_avail, 0);

			/*
			 * If the upper pages are badly out of date, we might need to loop
			 * quite a few times, updating them as we go. Any inconsistencies
			 * should eventually be corrected and the loop should end. Looping
			 * indefinitely is nevertheless scary, so provide an emergency
			 * valve.
			 */
			if (restarts++ > 10000)
				return InvalidBlockNumber;

			/* Start search all over from the root */
			addr = FSM_ROOT_ADDRESS;
		}
	}
}
785 
786 
/*
 * Recursive guts of FreeSpaceMapVacuum
 *
 * Examine the FSM page indicated by addr, as well as its children, updating
 * upper-level nodes that cover the heap block range from start to end-1.
 * (It's okay if end is beyond the actual end of the map.)
 * Return the maximum freespace value on this page.
 *
 * If addr is past the end of the FSM, set *eof_p to true and return 0.
 *
 * This traverses the tree in depth-first order.  The tree is stored
 * physically in depth-first order, so this should be pretty I/O efficient.
 */
static uint8
fsm_vacuum_page(Relation rel, FSMAddress addr,
				BlockNumber start, BlockNumber end,
				bool *eof_p)
{
	Buffer		buf;
	Page		page;
	uint8		max_avail;

	/* Read the page if it exists, or return EOF */
	buf = fsm_readbuf(rel, addr, false);
	if (!BufferIsValid(buf))
	{
		*eof_p = true;
		return 0;
	}
	else
		*eof_p = false;

	page = BufferGetPage(buf);

	/*
	 * If we're above the bottom level, recurse into children, and fix the
	 * information stored about them at this level.
	 */
	if (addr.level > FSM_BOTTOM_LEVEL)
	{
		FSMAddress	fsm_start,
					fsm_end;
		uint16		fsm_start_slot,
					fsm_end_slot;
		int			slot,
					start_slot,
					end_slot;
		bool		eof = false;

		/*
		 * Compute the range of slots we need to update on this page, given
		 * the requested range of heap blocks to consider.  The first slot to
		 * update is the one covering the "start" block, and the last slot is
		 * the one covering "end - 1".  (Some of this work will be duplicated
		 * in each recursive call, but it's cheap enough to not worry about.)
		 */
		fsm_start = fsm_get_location(start, &fsm_start_slot);
		fsm_end = fsm_get_location(end - 1, &fsm_end_slot);

		/* Walk both endpoints up from the bottom level to this page's level */
		while (fsm_start.level < addr.level)
		{
			fsm_start = fsm_get_parent(fsm_start, &fsm_start_slot);
			fsm_end = fsm_get_parent(fsm_end, &fsm_end_slot);
		}
		Assert(fsm_start.level == addr.level);

		/* Clamp the slot range to the part covered by this particular page */
		if (fsm_start.logpageno == addr.logpageno)
			start_slot = fsm_start_slot;
		else if (fsm_start.logpageno > addr.logpageno)
			start_slot = SlotsPerFSMPage;	/* shouldn't get here... */
		else
			start_slot = 0;

		if (fsm_end.logpageno == addr.logpageno)
			end_slot = fsm_end_slot;
		else if (fsm_end.logpageno > addr.logpageno)
			end_slot = SlotsPerFSMPage - 1;
		else
			end_slot = -1;		/* shouldn't get here... */

		for (slot = start_slot; slot <= end_slot; slot++)
		{
			int			child_avail;

			CHECK_FOR_INTERRUPTS();

			/* After we hit end-of-file, just clear the rest of the slots */
			if (!eof)
				child_avail = fsm_vacuum_page(rel, fsm_get_child(addr, slot),
											  start, end,
											  &eof);
			else
				child_avail = 0;

			/* Update information about the child */
			if (fsm_get_avail(page, slot) != child_avail)
			{
				LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
				fsm_set_avail(page, slot, child_avail);
				MarkBufferDirtyHint(buf, false);
				LockBuffer(buf, BUFFER_LOCK_UNLOCK);
			}
		}
	}

	/* Now get the maximum value on the page, to return to caller */
	max_avail = fsm_get_max_avail(page);

	/*
	 * Reset the next slot pointer. This encourages the use of low-numbered
	 * pages, increasing the chances that a later vacuum can truncate the
	 * relation.  We don't bother with a lock here, nor with marking the page
	 * dirty if it wasn't already, since this is just a hint.
	 */
	((FSMPage) PageGetContents(page))->fp_next_slot = 0;

	ReleaseBuffer(buf);

	return max_avail;
}
907