1 /*-------------------------------------------------------------------------
2  *
3  * fsmpage.c
4  *	  routines to search and manipulate one FSM page.
5  *
6  *
7  * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
8  * Portions Copyright (c) 1994, Regents of the University of California
9  *
10  * IDENTIFICATION
11  *	  src/backend/storage/freespace/fsmpage.c
12  *
13  * NOTES:
14  *
15  *	The public functions in this file form an API that hides the internal
16  *	structure of a FSM page. This allows freespace.c to treat each FSM page
17  *	as a black box with SlotsPerPage "slots". fsm_set_avail() and
18  *	fsm_get_avail() let you get/set the value of a slot, and
19  *	fsm_search_avail() lets you search for a slot with value >= X.
20  *
21  *-------------------------------------------------------------------------
22  */
23 #include "postgres.h"
24 
25 #include "storage/bufmgr.h"
26 #include "storage/fsm_internals.h"
27 
28 /* Macros to navigate the tree within a page. Root has index zero. */
29 #define leftchild(x)	(2 * (x) + 1)
30 #define rightchild(x)	(2 * (x) + 2)
31 #define parentof(x)		(((x) - 1) / 2)
32 
33 /*
34  * Find right neighbor of x, wrapping around within the level
35  */
36 static int
rightneighbor(int x)37 rightneighbor(int x)
38 {
39 	/*
40 	 * Move right. This might wrap around, stepping to the leftmost node at
41 	 * the next level.
42 	 */
43 	x++;
44 
45 	/*
46 	 * Check if we stepped to the leftmost node at next level, and correct if
47 	 * so. The leftmost nodes at each level are numbered x = 2^level - 1, so
48 	 * check if (x + 1) is a power of two, using a standard
49 	 * twos-complement-arithmetic trick.
50 	 */
51 	if (((x + 1) & x) == 0)
52 		x = parentof(x);
53 
54 	return x;
55 }
56 
57 /*
58  * Sets the value of a slot on page. Returns true if the page was modified.
59  *
60  * The caller must hold an exclusive lock on the page.
61  */
62 bool
fsm_set_avail(Page page,int slot,uint8 value)63 fsm_set_avail(Page page, int slot, uint8 value)
64 {
65 	int			nodeno = NonLeafNodesPerPage + slot;
66 	FSMPage		fsmpage = (FSMPage) PageGetContents(page);
67 	uint8		oldvalue;
68 
69 	Assert(slot < LeafNodesPerPage);
70 
71 	oldvalue = fsmpage->fp_nodes[nodeno];
72 
73 	/* If the value hasn't changed, we don't need to do anything */
74 	if (oldvalue == value && value <= fsmpage->fp_nodes[0])
75 		return false;
76 
77 	fsmpage->fp_nodes[nodeno] = value;
78 
79 	/*
80 	 * Propagate up, until we hit the root or a node that doesn't need to be
81 	 * updated.
82 	 */
83 	do
84 	{
85 		uint8		newvalue = 0;
86 		int			lchild;
87 		int			rchild;
88 
89 		nodeno = parentof(nodeno);
90 		lchild = leftchild(nodeno);
91 		rchild = lchild + 1;
92 
93 		newvalue = fsmpage->fp_nodes[lchild];
94 		if (rchild < NodesPerPage)
95 			newvalue = Max(newvalue,
96 						   fsmpage->fp_nodes[rchild]);
97 
98 		oldvalue = fsmpage->fp_nodes[nodeno];
99 		if (oldvalue == newvalue)
100 			break;
101 
102 		fsmpage->fp_nodes[nodeno] = newvalue;
103 	} while (nodeno > 0);
104 
105 	/*
106 	 * sanity check: if the new value is (still) higher than the value at the
107 	 * top, the tree is corrupt.  If so, rebuild.
108 	 */
109 	if (value > fsmpage->fp_nodes[0])
110 		fsm_rebuild_page(page);
111 
112 	return true;
113 }
114 
115 /*
116  * Returns the value of given slot on page.
117  *
118  * Since this is just a read-only access of a single byte, the page doesn't
119  * need to be locked.
120  */
121 uint8
fsm_get_avail(Page page,int slot)122 fsm_get_avail(Page page, int slot)
123 {
124 	FSMPage		fsmpage = (FSMPage) PageGetContents(page);
125 
126 	Assert(slot < LeafNodesPerPage);
127 
128 	return fsmpage->fp_nodes[NonLeafNodesPerPage + slot];
129 }
130 
131 /*
132  * Returns the value at the root of a page.
133  *
134  * Since this is just a read-only access of a single byte, the page doesn't
135  * need to be locked.
136  */
137 uint8
fsm_get_max_avail(Page page)138 fsm_get_max_avail(Page page)
139 {
140 	FSMPage		fsmpage = (FSMPage) PageGetContents(page);
141 
142 	return fsmpage->fp_nodes[0];
143 }
144 
145 /*
146  * Searches for a slot with category at least minvalue.
147  * Returns slot number, or -1 if none found.
148  *
149  * The caller must hold at least a shared lock on the page, and this
150  * function can unlock and lock the page again in exclusive mode if it
151  * needs to be updated. exclusive_lock_held should be set to true if the
152  * caller is already holding an exclusive lock, to avoid extra work.
153  *
154  * If advancenext is false, fp_next_slot is set to point to the returned
155  * slot, and if it's true, to the slot after the returned slot.
156  */
157 int
fsm_search_avail(Buffer buf,uint8 minvalue,bool advancenext,bool exclusive_lock_held)158 fsm_search_avail(Buffer buf, uint8 minvalue, bool advancenext,
159 				 bool exclusive_lock_held)
160 {
161 	Page		page = BufferGetPage(buf);
162 	FSMPage		fsmpage = (FSMPage) PageGetContents(page);
163 	int			nodeno;
164 	int			target;
165 	uint16		slot;
166 
167 restart:
168 
169 	/*
170 	 * Check the root first, and exit quickly if there's no leaf with enough
171 	 * free space
172 	 */
173 	if (fsmpage->fp_nodes[0] < minvalue)
174 		return -1;
175 
176 	/*
177 	 * Start search using fp_next_slot.  It's just a hint, so check that it's
178 	 * sane.  (This also handles wrapping around when the prior call returned
179 	 * the last slot on the page.)
180 	 */
181 	target = fsmpage->fp_next_slot;
182 	if (target < 0 || target >= LeafNodesPerPage)
183 		target = 0;
184 	target += NonLeafNodesPerPage;
185 
186 	/*----------
187 	 * Start the search from the target slot.  At every step, move one
188 	 * node to the right, then climb up to the parent.  Stop when we reach
189 	 * a node with enough free space (as we must, since the root has enough
190 	 * space).
191 	 *
192 	 * The idea is to gradually expand our "search triangle", that is, all
193 	 * nodes covered by the current node, and to be sure we search to the
194 	 * right from the start point.  At the first step, only the target slot
195 	 * is examined.  When we move up from a left child to its parent, we are
196 	 * adding the right-hand subtree of that parent to the search triangle.
197 	 * When we move right then up from a right child, we are dropping the
198 	 * current search triangle (which we know doesn't contain any suitable
199 	 * page) and instead looking at the next-larger-size triangle to its
200 	 * right.  So we never look left from our original start point, and at
201 	 * each step the size of the search triangle doubles, ensuring it takes
202 	 * only log2(N) work to search N pages.
203 	 *
204 	 * The "move right" operation will wrap around if it hits the right edge
205 	 * of the tree, so the behavior is still good if we start near the right.
206 	 * Note also that the move-and-climb behavior ensures that we can't end
207 	 * up on one of the missing nodes at the right of the leaf level.
208 	 *
209 	 * For example, consider this tree:
210 	 *
211 	 *		   7
212 	 *	   7	   6
213 	 *	 5	 7	 6	 5
214 	 *	4 5 5 7 2 6 5 2
215 	 *				T
216 	 *
217 	 * Assume that the target node is the node indicated by the letter T,
218 	 * and we're searching for a node with value of 6 or higher. The search
219 	 * begins at T. At the first iteration, we move to the right, then to the
220 	 * parent, arriving at the rightmost 5. At the second iteration, we move
221 	 * to the right, wrapping around, then climb up, arriving at the 7 on the
222 	 * third level.  7 satisfies our search, so we descend down to the bottom,
223 	 * following the path of sevens.  This is in fact the first suitable page
224 	 * to the right of (allowing for wraparound) our start point.
225 	 *----------
226 	 */
227 	nodeno = target;
228 	while (nodeno > 0)
229 	{
230 		if (fsmpage->fp_nodes[nodeno] >= minvalue)
231 			break;
232 
233 		/*
234 		 * Move to the right, wrapping around on same level if necessary, then
235 		 * climb up.
236 		 */
237 		nodeno = parentof(rightneighbor(nodeno));
238 	}
239 
240 	/*
241 	 * We're now at a node with enough free space, somewhere in the middle of
242 	 * the tree. Descend to the bottom, following a path with enough free
243 	 * space, preferring to move left if there's a choice.
244 	 */
245 	while (nodeno < NonLeafNodesPerPage)
246 	{
247 		int			childnodeno = leftchild(nodeno);
248 
249 		if (childnodeno < NodesPerPage &&
250 			fsmpage->fp_nodes[childnodeno] >= minvalue)
251 		{
252 			nodeno = childnodeno;
253 			continue;
254 		}
255 		childnodeno++;			/* point to right child */
256 		if (childnodeno < NodesPerPage &&
257 			fsmpage->fp_nodes[childnodeno] >= minvalue)
258 		{
259 			nodeno = childnodeno;
260 		}
261 		else
262 		{
263 			/*
264 			 * Oops. The parent node promised that either left or right child
265 			 * has enough space, but neither actually did. This can happen in
266 			 * case of a "torn page", IOW if we crashed earlier while writing
267 			 * the page to disk, and only part of the page made it to disk.
268 			 *
269 			 * Fix the corruption and restart.
270 			 */
271 			RelFileNode rnode;
272 			ForkNumber	forknum;
273 			BlockNumber blknum;
274 
275 			BufferGetTag(buf, &rnode, &forknum, &blknum);
276 			elog(DEBUG1, "fixing corrupt FSM block %u, relation %u/%u/%u",
277 				 blknum, rnode.spcNode, rnode.dbNode, rnode.relNode);
278 
279 			/* make sure we hold an exclusive lock */
280 			if (!exclusive_lock_held)
281 			{
282 				LockBuffer(buf, BUFFER_LOCK_UNLOCK);
283 				LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
284 				exclusive_lock_held = true;
285 			}
286 			fsm_rebuild_page(page);
287 			MarkBufferDirtyHint(buf, false);
288 			goto restart;
289 		}
290 	}
291 
292 	/* We're now at the bottom level, at a node with enough space. */
293 	slot = nodeno - NonLeafNodesPerPage;
294 
295 	/*
296 	 * Update the next-target pointer. Note that we do this even if we're only
297 	 * holding a shared lock, on the grounds that it's better to use a shared
298 	 * lock and get a garbled next pointer every now and then, than take the
299 	 * concurrency hit of an exclusive lock.
300 	 *
301 	 * Wrap-around is handled at the beginning of this function.
302 	 */
303 	fsmpage->fp_next_slot = slot + (advancenext ? 1 : 0);
304 
305 	return slot;
306 }
307 
308 /*
309  * Sets the available space to zero for all slots numbered >= nslots.
310  * Returns true if the page was modified.
311  */
312 bool
fsm_truncate_avail(Page page,int nslots)313 fsm_truncate_avail(Page page, int nslots)
314 {
315 	FSMPage		fsmpage = (FSMPage) PageGetContents(page);
316 	uint8	   *ptr;
317 	bool		changed = false;
318 
319 	Assert(nslots >= 0 && nslots < LeafNodesPerPage);
320 
321 	/* Clear all truncated leaf nodes */
322 	ptr = &fsmpage->fp_nodes[NonLeafNodesPerPage + nslots];
323 	for (; ptr < &fsmpage->fp_nodes[NodesPerPage]; ptr++)
324 	{
325 		if (*ptr != 0)
326 			changed = true;
327 		*ptr = 0;
328 	}
329 
330 	/* Fix upper nodes. */
331 	if (changed)
332 		fsm_rebuild_page(page);
333 
334 	return changed;
335 }
336 
337 /*
338  * Reconstructs the upper levels of a page. Returns true if the page
339  * was modified.
340  */
341 bool
fsm_rebuild_page(Page page)342 fsm_rebuild_page(Page page)
343 {
344 	FSMPage		fsmpage = (FSMPage) PageGetContents(page);
345 	bool		changed = false;
346 	int			nodeno;
347 
348 	/*
349 	 * Start from the lowest non-leaf level, at last node, working our way
350 	 * backwards, through all non-leaf nodes at all levels, up to the root.
351 	 */
352 	for (nodeno = NonLeafNodesPerPage - 1; nodeno >= 0; nodeno--)
353 	{
354 		int			lchild = leftchild(nodeno);
355 		int			rchild = lchild + 1;
356 		uint8		newvalue = 0;
357 
358 		/* The first few nodes we examine might have zero or one child. */
359 		if (lchild < NodesPerPage)
360 			newvalue = fsmpage->fp_nodes[lchild];
361 
362 		if (rchild < NodesPerPage)
363 			newvalue = Max(newvalue,
364 						   fsmpage->fp_nodes[rchild]);
365 
366 		if (fsmpage->fp_nodes[nodeno] != newvalue)
367 		{
368 			fsmpage->fp_nodes[nodeno] = newvalue;
369 			changed = true;
370 		}
371 	}
372 
373 	return changed;
374 }
375