/*-------------------------------------------------------------------------
 *
 * hashovfl.c
 *	  Overflow page management code for the Postgres hash access method
 *
 * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *	  src/backend/access/hash/hashovfl.c
 *
 * NOTES
 *	  Overflow pages look like ordinary relation pages.
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include "access/hash.h"
#include "access/hash_xlog.h"
#include "miscadmin.h"
#include "utils/rel.h"


static uint32 _hash_firstfreebit(uint32 map);


/*
 * Convert overflow page bit number (its index in the free-page bitmaps)
 * to block number within the index.
 */
static BlockNumber
bitno_to_blkno(HashMetaPage metap, uint32 ovflbitnum)
{
	uint32		splitnum = metap->hashm_ovflpoint;
	uint32		i;

	/* Convert zero-based bitnumber to 1-based page number */
	ovflbitnum += 1;

	/* Determine the split number for this page (must be >= 1) */
	for (i = 1;
		 i < splitnum && ovflbitnum > metap->hashm_spares[i];
		 i++)
		 /* loop */ ;

	/*
	 * Convert to absolute page number by adding the number of bucket pages
	 * that exist before this split point.
	 */
	return (BlockNumber) (_hash_get_totalbuckets(i) + ovflbitnum);
}

/*
 * _hash_ovflblkno_to_bitno
 *
 * Convert overflow page block number to bit number for free-page bitmap.
 */
uint32
_hash_ovflblkno_to_bitno(HashMetaPage metap, BlockNumber ovflblkno)
{
	uint32		splitnum = metap->hashm_ovflpoint;
	uint32		i;
	uint32		bitnum;

	/* Determine the split number containing this page */
	for (i = 1; i <= splitnum; i++)
	{
		if (ovflblkno <= (BlockNumber) _hash_get_totalbuckets(i))
			break;				/* oops */
		bitnum = ovflblkno - _hash_get_totalbuckets(i);

		/*
		 * bitnum has to be greater than the number of overflow pages added
		 * at previous split points.  The overflow pages at this split point
		 * (i), if any, start from (_hash_get_totalbuckets(i) +
		 * metap->hashm_spares[i - 1] + 1).
		 */
		if (bitnum > metap->hashm_spares[i - 1] &&
			bitnum <= metap->hashm_spares[i])
			return bitnum - 1;	/* -1 to convert 1-based to 0-based */
	}

	ereport(ERROR,
			(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
			 errmsg("invalid overflow block number %u", ovflblkno)));
	return 0;					/* keep compiler quiet */
}

/*
 *	_hash_addovflpage
 *
 *	Add an overflow page to the bucket whose last page is pointed to by 'buf'.
 *
 *	On entry, the caller must hold a pin but no lock on 'buf'.  Unless asked
 *	to retain it, the pin is dropped before exiting (we assume the caller is
 *	not interested in 'buf' anymore); the pin is retained only for the
 *	primary bucket page.  The returned overflow page will be pinned and
 *	write-locked; it is guaranteed to be empty.
 *
 *	The caller must hold a pin, but no lock, on the metapage buffer.
 *	That buffer is returned in the same state.
 *
 * NB: since this could be executed concurrently by multiple processes,
 * one should not assume that the returned overflow page will be the
 * immediate successor of the originally passed 'buf'.  Additional overflow
 * pages might have been added to the bucket chain in between.
 */
Buffer
_hash_addovflpage(Relation rel, Buffer metabuf, Buffer buf, bool retain_pin)
{
	Buffer		ovflbuf;
	Page		page;
	Page		ovflpage;
	HashPageOpaque pageopaque;
	HashPageOpaque ovflopaque;
	HashMetaPage metap;
	Buffer		mapbuf = InvalidBuffer;
	Buffer		newmapbuf = InvalidBuffer;
	BlockNumber blkno;
	uint32		orig_firstfree;
	uint32		splitnum;
	uint32	   *freep = NULL;
	uint32		max_ovflpg;
	uint32		bit;
	uint32		bitmap_page_bit;
	uint32		first_page;
	uint32		last_bit;
	uint32		last_page;
	uint32		i,
				j;
	bool		page_found = false;
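	/*
	 * page_found becomes true if we recycle a free bit from an existing
	 * bitmap page rather than extending the index with a brand-new page.
	 */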

	/*
	 * Write-lock the tail page.  Here we need to maintain a locking order:
	 * first acquire the lock on the tail page of the bucket, then on the
	 * meta page in order to find and lock the bitmap page; once that is
	 * found, the lock on the meta page is released, and finally the lock on
	 * the new overflow buffer is acquired.  We need this locking order to
	 * avoid deadlocks with backends that are doing inserts.
	 *
	 * Note: We could have avoided locking many buffers here if we made two
	 * WAL records for acquiring an overflow page (one to allocate an
	 * overflow page and another to add it to the overflow bucket chain).
	 * However, doing so can leak an overflow page if the system crashes
	 * after allocation.  Needless to say, it is better to have a single
	 * record from a performance point of view as well.
	 */
	LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);

	/* probably redundant... */
	_hash_checkpage(rel, buf, LH_BUCKET_PAGE | LH_OVERFLOW_PAGE);

	/* loop to find current tail page, in case someone else inserted too */
	for (;;)
	{
		BlockNumber nextblkno;

		page = BufferGetPage(buf);
		pageopaque = (HashPageOpaque) PageGetSpecialPointer(page);
		nextblkno = pageopaque->hasho_nextblkno;

		if (!BlockNumberIsValid(nextblkno))
			break;

		/* we assume we do not need to write the unmodified page */
		if (retain_pin)
		{
			/* pin will be retained only for the primary bucket page */
			Assert((pageopaque->hasho_flag & LH_PAGE_TYPE) == LH_BUCKET_PAGE);
			LockBuffer(buf, BUFFER_LOCK_UNLOCK);
		}
		else
			_hash_relbuf(rel, buf);

		retain_pin = false;

		buf = _hash_getbuf(rel, nextblkno, HASH_WRITE, LH_OVERFLOW_PAGE);
	}

	/* Get exclusive lock on the meta page */
	LockBuffer(metabuf, BUFFER_LOCK_EXCLUSIVE);

	_hash_checkpage(rel, metabuf, LH_META_PAGE);
	metap = HashPageGetMeta(BufferGetPage(metabuf));

	/* start search at hashm_firstfree */
	orig_firstfree = metap->hashm_firstfree;
	first_page = orig_firstfree >> BMPG_SHIFT(metap);
	bit = orig_firstfree & BMPG_MASK(metap);
	i = first_page;
	j = bit / BITS_PER_MAP;
	bit &= ~(BITS_PER_MAP - 1);
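	/*
	 * i, j, and bit now identify where the free-bit search resumes: i is the
	 * bitmap-page index, j is the uint32 word within that page, and bit is
	 * the page-relative bit number of the first bit of that word (it has
	 * been rounded down to a word boundary).
	 */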

	/* outer loop iterates once per bitmap page */
	for (;;)
	{
		BlockNumber mapblkno;
		Page		mappage;
		uint32		last_inpage;

		/* want to end search with the last existing overflow page */
		splitnum = metap->hashm_ovflpoint;
		max_ovflpg = metap->hashm_spares[splitnum] - 1;
		last_page = max_ovflpg >> BMPG_SHIFT(metap);
		last_bit = max_ovflpg & BMPG_MASK(metap);
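		/*
		 * last_page and last_bit are the bitmap-page index and the
		 * bit-within-page corresponding to max_ovflpg, the last page counted
		 * in hashm_spares; the search must not run past it.
		 */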

		if (i > last_page)
			break;

		Assert(i < metap->hashm_nmaps);
		mapblkno = metap->hashm_mapp[i];

		if (i == last_page)
			last_inpage = last_bit;
		else
			last_inpage = BMPGSZ_BIT(metap) - 1;

		/* Release exclusive lock on metapage while reading bitmap page */
		LockBuffer(metabuf, BUFFER_LOCK_UNLOCK);

		mapbuf = _hash_getbuf(rel, mapblkno, HASH_WRITE, LH_BITMAP_PAGE);
		mappage = BufferGetPage(mapbuf);
		freep = HashPageGetBitmap(mappage);

		for (; bit <= last_inpage; j++, bit += BITS_PER_MAP)
		{
			if (freep[j] != ALL_SET)
			{
				page_found = true;

				/* Reacquire exclusive lock on the meta page */
				LockBuffer(metabuf, BUFFER_LOCK_EXCLUSIVE);

				/* convert bit to bit number within page */
				bit += _hash_firstfreebit(freep[j]);
				bitmap_page_bit = bit;

				/* convert bit to absolute bit number */
				bit += (i << BMPG_SHIFT(metap));
				/* Calculate address of the recycled overflow page */
				blkno = bitno_to_blkno(metap, bit);

				/* Fetch and init the recycled page */
				ovflbuf = _hash_getinitbuf(rel, blkno);

				goto found;
			}
		}

		/* No free space here, try to advance to next map page */
		_hash_relbuf(rel, mapbuf);
		mapbuf = InvalidBuffer;
		i++;
		j = 0;					/* scan from start of next map page */
		bit = 0;

		/* Reacquire exclusive lock on the meta page */
		LockBuffer(metabuf, BUFFER_LOCK_EXCLUSIVE);
	}

	/*
	 * No free pages --- have to extend the relation to add an overflow page.
	 * First, check to see if we have to add a new bitmap page too.
	 */
	if (last_bit == (uint32) (BMPGSZ_BIT(metap) - 1))
	{
		/*
		 * We create the new bitmap page with all pages marked "in use".
		 * Actually two pages in the new bitmap's range will exist
		 * immediately: the bitmap page itself, and the following page which
		 * is the one we return to the caller.  Both of these are correctly
		 * marked "in use".  Subsequent pages do not exist yet, but it is
		 * convenient to pre-mark them as "in use" too.
		 */
		bit = metap->hashm_spares[splitnum];
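		/*
		 * bit is the 0-based bit number that the new bitmap page itself will
		 * occupy; the overflow page returned to the caller follows it.
		 */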

		/* metapage already has a write lock */
		if (metap->hashm_nmaps >= HASH_MAX_BITMAPS)
			ereport(ERROR,
					(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
					 errmsg("out of overflow pages in hash index \"%s\"",
							RelationGetRelationName(rel))));

		newmapbuf = _hash_getnewbuf(rel, bitno_to_blkno(metap, bit), MAIN_FORKNUM);
	}
	else
	{
		/*
		 * Nothing to do here; since the page will be past the last used page,
		 * we know its bitmap bit was preinitialized to "in use".
		 */
	}

	/* Calculate address of the new overflow page */
	bit = BufferIsValid(newmapbuf) ?
		metap->hashm_spares[splitnum] + 1 : metap->hashm_spares[splitnum];
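	/*
	 * If a new bitmap page is being added, it consumes bit number
	 * hashm_spares[splitnum], so the new overflow page takes the bit after
	 * it; otherwise the overflow page takes hashm_spares[splitnum] itself.
	 */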
	blkno = bitno_to_blkno(metap, bit);

	/*
	 * Fetch the page with _hash_getnewbuf to ensure smgr's idea of the
	 * relation length stays in sync with ours.  XXX It's annoying to do this
	 * with metapage write lock held; would be better to use a lock that
	 * doesn't block incoming searches.
	 *
	 * It is okay to hold two buffer locks here (one on the tail page of the
	 * bucket and the other on the new overflow page) since there cannot be
	 * anyone else contending for access to ovflbuf.
	 */
	ovflbuf = _hash_getnewbuf(rel, blkno, MAIN_FORKNUM);

found:

	/*
	 * Do the update.  No ereport(ERROR) until changes are logged.  We want
	 * to log the changes for the bitmap page and the overflow page together,
	 * so that a partially completed addition cannot lose the new page.
	 */
	START_CRIT_SECTION();

	if (page_found)
	{
		Assert(BufferIsValid(mapbuf));

		/* mark page "in use" in the bitmap */
		SETBIT(freep, bitmap_page_bit);
		MarkBufferDirty(mapbuf);
	}
	else
	{
		/* update the count to indicate that a new overflow page is added */
		metap->hashm_spares[splitnum]++;

		if (BufferIsValid(newmapbuf))
		{
			_hash_initbitmapbuffer(newmapbuf, metap->hashm_bmsize, false);
			MarkBufferDirty(newmapbuf);

			/* add the new bitmap page to the metapage's list of bitmaps */
			metap->hashm_mapp[metap->hashm_nmaps] = BufferGetBlockNumber(newmapbuf);
			metap->hashm_nmaps++;
			metap->hashm_spares[splitnum]++;
		}

		MarkBufferDirty(metabuf);

		/*
		 * For a new overflow page, we don't need to explicitly set the bit
		 * in the bitmap page, as by default that will be set to "in use".
		 */
	}

	/*
	 * Adjust hashm_firstfree to avoid redundant searches.  But don't risk
	 * changing it if someone moved it while we were searching bitmap pages.
	 */
	if (metap->hashm_firstfree == orig_firstfree)
	{
		metap->hashm_firstfree = bit + 1;
		MarkBufferDirty(metabuf);
	}

	/* initialize new overflow page */
	ovflpage = BufferGetPage(ovflbuf);
	ovflopaque = (HashPageOpaque) PageGetSpecialPointer(ovflpage);
	ovflopaque->hasho_prevblkno = BufferGetBlockNumber(buf);
	ovflopaque->hasho_nextblkno = InvalidBlockNumber;
	ovflopaque->hasho_bucket = pageopaque->hasho_bucket;
	ovflopaque->hasho_flag = LH_OVERFLOW_PAGE;
	ovflopaque->hasho_page_id = HASHO_PAGE_ID;

	MarkBufferDirty(ovflbuf);

	/* logically chain overflow page to previous page */
	pageopaque->hasho_nextblkno = BufferGetBlockNumber(ovflbuf);

	MarkBufferDirty(buf);

	/* XLOG stuff */
	if (RelationNeedsWAL(rel))
	{
		XLogRecPtr	recptr;
		xl_hash_add_ovfl_page xlrec;

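		/*
		 * bmpage_found tells replay whether we recycled a bit in an existing
		 * bitmap page (registered below as block 2, with the bit number as
		 * its data) or instead allocated brand-new page(s).
		 */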
		xlrec.bmpage_found = page_found;
		xlrec.bmsize = metap->hashm_bmsize;

		XLogBeginInsert();
		XLogRegisterData((char *) &xlrec, SizeOfHashAddOvflPage);

		XLogRegisterBuffer(0, ovflbuf, REGBUF_WILL_INIT);
		XLogRegisterBufData(0, (char *) &pageopaque->hasho_bucket, sizeof(Bucket));

		XLogRegisterBuffer(1, buf, REGBUF_STANDARD);

		if (BufferIsValid(mapbuf))
		{
			XLogRegisterBuffer(2, mapbuf, REGBUF_STANDARD);
			XLogRegisterBufData(2, (char *) &bitmap_page_bit, sizeof(uint32));
		}

		if (BufferIsValid(newmapbuf))
			XLogRegisterBuffer(3, newmapbuf, REGBUF_WILL_INIT);

		XLogRegisterBuffer(4, metabuf, REGBUF_STANDARD);
		XLogRegisterBufData(4, (char *) &metap->hashm_firstfree, sizeof(uint32));

		recptr = XLogInsert(RM_HASH_ID, XLOG_HASH_ADD_OVFL_PAGE);

		PageSetLSN(BufferGetPage(ovflbuf), recptr);
		PageSetLSN(BufferGetPage(buf), recptr);

		if (BufferIsValid(mapbuf))
			PageSetLSN(BufferGetPage(mapbuf), recptr);

		if (BufferIsValid(newmapbuf))
			PageSetLSN(BufferGetPage(newmapbuf), recptr);

		PageSetLSN(BufferGetPage(metabuf), recptr);
	}

	END_CRIT_SECTION();

	if (retain_pin)
		LockBuffer(buf, BUFFER_LOCK_UNLOCK);
	else
		_hash_relbuf(rel, buf);

	if (BufferIsValid(mapbuf))
		_hash_relbuf(rel, mapbuf);

	LockBuffer(metabuf, BUFFER_LOCK_UNLOCK);

	if (BufferIsValid(newmapbuf))
		_hash_relbuf(rel, newmapbuf);

	return ovflbuf;
}

/*
 *	_hash_firstfreebit()
 *
 *	Return the number of the first bit that is not set in the word 'map'.
 */
static uint32
_hash_firstfreebit(uint32 map)
{
	uint32		i,
				mask;

	mask = 0x1;
	for (i = 0; i < BITS_PER_MAP; i++)
	{
		if (!(mask & map))
			return i;
		mask <<= 1;
	}

	elog(ERROR, "firstfreebit found no free bit");

	return 0;					/* keep compiler quiet */
}

/*
 *	_hash_freeovflpage() -
 *
 *	Remove this overflow page from its bucket's chain, and mark the page as
 *	free.  On entry, ovflbuf is write-locked; it is released before exiting.
 *
 *	Add the tuples (itups) to wbuf in this function.  We could do that in the
 *	caller as well, but the advantage of doing it here is that we can easily
 *	write the WAL record for the XLOG_HASH_SQUEEZE_PAGE operation.  The
 *	addition of tuples and the removal of the overflow page have to be done
 *	as an atomic operation; otherwise, during replay on a standby, users
 *	might find duplicate records.
 *
 *	Since this function is invoked in VACUUM, we provide an access strategy
 *	parameter that controls fetches of the bucket pages.
 *
 *	Returns the block number of the page that followed the given page
 *	in the bucket, or InvalidBlockNumber if no following page.
 *
 *	NB: the caller must not hold a lock on the metapage, nor on the page
 *	that follows ovflbuf in the bucket chain.  We don't acquire a lock on
 *	the page preceding ovflbuf in the chain if it is the same as wbuf,
 *	because the caller already holds a lock on it.
 */
BlockNumber
_hash_freeovflpage(Relation rel, Buffer bucketbuf, Buffer ovflbuf,
				   Buffer wbuf, IndexTuple *itups, OffsetNumber *itup_offsets,
				   Size *tups_size, uint16 nitups,
				   BufferAccessStrategy bstrategy)
{
	HashMetaPage metap;
	Buffer		metabuf;
	Buffer		mapbuf;
	BlockNumber ovflblkno;
	BlockNumber prevblkno;
	BlockNumber blkno;
	BlockNumber nextblkno;
	BlockNumber writeblkno;
	HashPageOpaque ovflopaque;
	Page		ovflpage;
	Page		mappage;
	uint32	   *freep;
	uint32		ovflbitno;
	int32		bitmappage,
				bitmapbit;
	Bucket		bucket PG_USED_FOR_ASSERTS_ONLY;
	Buffer		prevbuf = InvalidBuffer;
	Buffer		nextbuf = InvalidBuffer;
	bool		update_metap = false;
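	/*
	 * update_metap is set once we lower hashm_firstfree, so we know whether
	 * the metapage has to be registered in (and updated by) the WAL record.
	 */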

	/* Get information from the doomed page */
	_hash_checkpage(rel, ovflbuf, LH_OVERFLOW_PAGE);
	ovflblkno = BufferGetBlockNumber(ovflbuf);
	ovflpage = BufferGetPage(ovflbuf);
	ovflopaque = (HashPageOpaque) PageGetSpecialPointer(ovflpage);
	nextblkno = ovflopaque->hasho_nextblkno;
	prevblkno = ovflopaque->hasho_prevblkno;
	writeblkno = BufferGetBlockNumber(wbuf);
	bucket = ovflopaque->hasho_bucket;

	/*
	 * Fix up the bucket chain.  This is a doubly-linked list, so we must fix
	 * up the bucket chain members behind and ahead of the overflow page
	 * being deleted.  Concurrency issues are avoided by using lock chaining
	 * as described atop hashbucketcleanup.
	 */
	if (BlockNumberIsValid(prevblkno))
	{
		if (prevblkno == writeblkno)
			prevbuf = wbuf;
		else
			prevbuf = _hash_getbuf_with_strategy(rel,
												 prevblkno,
												 HASH_WRITE,
												 LH_BUCKET_PAGE | LH_OVERFLOW_PAGE,
												 bstrategy);
	}
	if (BlockNumberIsValid(nextblkno))
		nextbuf = _hash_getbuf_with_strategy(rel,
											 nextblkno,
											 HASH_WRITE,
											 LH_OVERFLOW_PAGE,
											 bstrategy);

	/* Note: bstrategy is intentionally not used for metapage and bitmap */

	/* Read the metapage so we can determine which bitmap page to use */
	metabuf = _hash_getbuf(rel, HASH_METAPAGE, HASH_READ, LH_META_PAGE);
	metap = HashPageGetMeta(BufferGetPage(metabuf));

	/* Identify which bit to clear */
	ovflbitno = _hash_ovflblkno_to_bitno(metap, ovflblkno);

	bitmappage = ovflbitno >> BMPG_SHIFT(metap);
	bitmapbit = ovflbitno & BMPG_MASK(metap);
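	/*
	 * bitmappage selects which bitmap page holds this overflow page's bit;
	 * bitmapbit is the bit's position within that page.
	 */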

	if (bitmappage >= metap->hashm_nmaps)
		elog(ERROR, "invalid overflow bit number %u", ovflbitno);
	blkno = metap->hashm_mapp[bitmappage];

	/* Release metapage lock while we access the bitmap page */
	LockBuffer(metabuf, BUFFER_LOCK_UNLOCK);

	/* read the bitmap page to clear the bitmap bit */
	mapbuf = _hash_getbuf(rel, blkno, HASH_WRITE, LH_BITMAP_PAGE);
	mappage = BufferGetPage(mapbuf);
	freep = HashPageGetBitmap(mappage);
	Assert(ISSET(freep, bitmapbit));

	/* Get write-lock on metapage to update firstfree */
	LockBuffer(metabuf, BUFFER_LOCK_EXCLUSIVE);

	/* This operation needs to log multiple tuples, prepare WAL for that */
	if (RelationNeedsWAL(rel))
		XLogEnsureRecordSpace(HASH_XLOG_FREE_OVFL_BUFS, 4 + nitups);

	START_CRIT_SECTION();

	/*
	 * we have to insert tuples on the "write" page, being careful to preserve
	 * hashkey ordering.  (If we insert many tuples into the same "write" page
	 * it would be worth qsort'ing them).
	 */
	if (nitups > 0)
	{
		_hash_pgaddmultitup(rel, wbuf, itups, itup_offsets, nitups);
		MarkBufferDirty(wbuf);
	}

	/*
	 * Reinitialize the freed overflow page.  Just zeroing the page won't
	 * work, because WAL replay routines expect pages to be initialized. See
	 * explanation of RBM_NORMAL mode atop XLogReadBufferExtended.  We are
	 * careful to make the special space valid here so that tools like
	 * pageinspect won't get confused.
	 */
	_hash_pageinit(ovflpage, BufferGetPageSize(ovflbuf));

	ovflopaque = (HashPageOpaque) PageGetSpecialPointer(ovflpage);

	ovflopaque->hasho_prevblkno = InvalidBlockNumber;
	ovflopaque->hasho_nextblkno = InvalidBlockNumber;
	ovflopaque->hasho_bucket = -1;
	ovflopaque->hasho_flag = LH_UNUSED_PAGE;
	ovflopaque->hasho_page_id = HASHO_PAGE_ID;

	MarkBufferDirty(ovflbuf);

	if (BufferIsValid(prevbuf))
	{
		Page		prevpage = BufferGetPage(prevbuf);
		HashPageOpaque prevopaque = (HashPageOpaque) PageGetSpecialPointer(prevpage);

		Assert(prevopaque->hasho_bucket == bucket);
		prevopaque->hasho_nextblkno = nextblkno;
		MarkBufferDirty(prevbuf);
	}
	if (BufferIsValid(nextbuf))
	{
		Page		nextpage = BufferGetPage(nextbuf);
		HashPageOpaque nextopaque = (HashPageOpaque) PageGetSpecialPointer(nextpage);

		Assert(nextopaque->hasho_bucket == bucket);
		nextopaque->hasho_prevblkno = prevblkno;
		MarkBufferDirty(nextbuf);
	}

	/* Clear the bitmap bit to indicate that this overflow page is free */
	CLRBIT(freep, bitmapbit);
	MarkBufferDirty(mapbuf);

	/* if this is now the first free page, update hashm_firstfree */
	if (ovflbitno < metap->hashm_firstfree)
	{
		metap->hashm_firstfree = ovflbitno;
		update_metap = true;
		MarkBufferDirty(metabuf);
	}

	/* XLOG stuff */
	if (RelationNeedsWAL(rel))
	{
		xl_hash_squeeze_page xlrec;
		XLogRecPtr	recptr;
		int			i;

		xlrec.prevblkno = prevblkno;
		xlrec.nextblkno = nextblkno;
		xlrec.ntups = nitups;
		xlrec.is_prim_bucket_same_wrt = (wbuf == bucketbuf);
		xlrec.is_prev_bucket_same_wrt = (wbuf == prevbuf);
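		/*
		 * These flags record whether wbuf doubles as the primary bucket page
		 * and/or as the page preceding ovflbuf, so that replay knows those
		 * pages are not registered as separate buffers below.
		 */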

		XLogBeginInsert();
		XLogRegisterData((char *) &xlrec, SizeOfHashSqueezePage);

		/*
		 * bucket buffer needs to be registered to ensure that we can acquire
		 * a cleanup lock on it during replay.
		 */
		if (!xlrec.is_prim_bucket_same_wrt)
			XLogRegisterBuffer(0, bucketbuf, REGBUF_STANDARD | REGBUF_NO_IMAGE);

		XLogRegisterBuffer(1, wbuf, REGBUF_STANDARD);
		if (xlrec.ntups > 0)
		{
			XLogRegisterBufData(1, (char *) itup_offsets,
								nitups * sizeof(OffsetNumber));
			for (i = 0; i < nitups; i++)
				XLogRegisterBufData(1, (char *) itups[i], tups_size[i]);
		}

		XLogRegisterBuffer(2, ovflbuf, REGBUF_STANDARD);

		/*
		 * If prevpage and the writepage (the block into which we are moving
		 * tuples from the overflow page) are the same, there is no need to
		 * register prevpage separately.  During replay, we can directly
		 * update the next-block link in the writepage.
		 */
		if (BufferIsValid(prevbuf) && !xlrec.is_prev_bucket_same_wrt)
			XLogRegisterBuffer(3, prevbuf, REGBUF_STANDARD);

		if (BufferIsValid(nextbuf))
			XLogRegisterBuffer(4, nextbuf, REGBUF_STANDARD);

		XLogRegisterBuffer(5, mapbuf, REGBUF_STANDARD);
		XLogRegisterBufData(5, (char *) &bitmapbit, sizeof(uint32));

		if (update_metap)
		{
			XLogRegisterBuffer(6, metabuf, REGBUF_STANDARD);
			XLogRegisterBufData(6, (char *) &metap->hashm_firstfree, sizeof(uint32));
		}

		recptr = XLogInsert(RM_HASH_ID, XLOG_HASH_SQUEEZE_PAGE);

		PageSetLSN(BufferGetPage(wbuf), recptr);
		PageSetLSN(BufferGetPage(ovflbuf), recptr);

		if (BufferIsValid(prevbuf) && !xlrec.is_prev_bucket_same_wrt)
			PageSetLSN(BufferGetPage(prevbuf), recptr);
		if (BufferIsValid(nextbuf))
			PageSetLSN(BufferGetPage(nextbuf), recptr);

		PageSetLSN(BufferGetPage(mapbuf), recptr);

		if (update_metap)
			PageSetLSN(BufferGetPage(metabuf), recptr);
	}

	END_CRIT_SECTION();

	/* release previous bucket if it is not the same as the write bucket */
	if (BufferIsValid(prevbuf) && prevblkno != writeblkno)
		_hash_relbuf(rel, prevbuf);

	if (BufferIsValid(ovflbuf))
		_hash_relbuf(rel, ovflbuf);

	if (BufferIsValid(nextbuf))
		_hash_relbuf(rel, nextbuf);

	_hash_relbuf(rel, mapbuf);
	_hash_relbuf(rel, metabuf);

	return nextblkno;
}


/*
 *	_hash_initbitmapbuffer()
 *
 *	 Initialize a new bitmap page.  All bits in the new bitmap page are set to
 *	 "1", indicating "in use".
 */
void
_hash_initbitmapbuffer(Buffer buf, uint16 bmsize, bool initpage)
{
	Page		pg;
	HashPageOpaque op;
	uint32	   *freep;

	pg = BufferGetPage(buf);

	/* initialize the page */
	if (initpage)
		_hash_pageinit(pg, BufferGetPageSize(buf));

	/* initialize the page's special space */
	op = (HashPageOpaque) PageGetSpecialPointer(pg);
	op->hasho_prevblkno = InvalidBlockNumber;
	op->hasho_nextblkno = InvalidBlockNumber;
	op->hasho_bucket = -1;
	op->hasho_flag = LH_BITMAP_PAGE;
	op->hasho_page_id = HASHO_PAGE_ID;

	/* set all of the bits to 1 */
	freep = HashPageGetBitmap(pg);
	MemSet(freep, 0xFF, bmsize);

	/*
	 * Set pd_lower just past the end of the bitmap page data.  We could even
	 * set pd_lower equal to pd_upper, but this is more precise and makes the
	 * page look compressible to xlog.c.
	 */
	((PageHeader) pg)->pd_lower = ((char *) freep + bmsize) - (char *) pg;
}


/*
 *	_hash_squeezebucket(rel, bucket)
 *
 *	Try to squeeze the tuples onto pages occurring earlier in the
 *	bucket chain in an attempt to free overflow pages. When we start
 *	the "squeezing", the page from which we start taking tuples (the
 *	"read" page) is the last bucket in the bucket chain and the page
 *	onto which we start squeezing tuples (the "write" page) is the
 *	first page in the bucket chain.  The read page works backward and
 *	the write page works forward; the procedure terminates when the
 *	read page and write page are the same page.
 *
 *	At completion of this procedure, it is guaranteed that all pages in
 *	the bucket are nonempty, unless the bucket is totally empty (in
 *	which case all overflow pages will be freed).  The original implementation
 *	required that to be true on entry as well, but it's a lot easier for
 *	callers to leave empty overflow pages and let this routine clean them up.
 *
 *	Caller must acquire cleanup lock on the primary page of the target
 *	bucket to exclude any scans that are in progress, which could easily
 *	be confused into returning the same tuple more than once or some tuples
 *	not at all by the rearrangement we are performing here.  To prevent
 *	any concurrent scan from crossing the squeeze scan we use lock chaining
 *	similar to hashbucketcleanup.  Refer to the comments atop
 *	hashbucketcleanup.
 *
 *	We need to retain a pin on the primary bucket to ensure that no concurrent
 *	split can start.
 *
 *	Since this function is invoked in VACUUM, we provide an access strategy
 *	parameter that controls fetches of the bucket pages.
 */
void
_hash_squeezebucket(Relation rel,
					Bucket bucket,
					BlockNumber bucket_blkno,
					Buffer bucket_buf,
					BufferAccessStrategy bstrategy)
{
	BlockNumber wblkno;
	BlockNumber rblkno;
	Buffer		wbuf;
	Buffer		rbuf;
	Page		wpage;
	Page		rpage;
	HashPageOpaque wopaque;
	HashPageOpaque ropaque;

	/*
	 * start squeezing into the primary bucket page.
	 */
	wblkno = bucket_blkno;
	wbuf = bucket_buf;
	wpage = BufferGetPage(wbuf);
	wopaque = (HashPageOpaque) PageGetSpecialPointer(wpage);

	/*
	 * if there aren't any overflow pages, there's nothing to squeeze. caller
	 * is responsible for releasing the pin on primary bucket page.
	 */
	if (!BlockNumberIsValid(wopaque->hasho_nextblkno))
	{
		LockBuffer(wbuf, BUFFER_LOCK_UNLOCK);
		return;
	}

	/*
	 * Find the last page in the bucket chain by starting at the base bucket
	 * page and working forward.  Note: we assume that a hash bucket chain is
	 * usually smaller than the buffer ring being used by VACUUM, else using
	 * the access strategy here would be counterproductive.
	 */
	rbuf = InvalidBuffer;
	ropaque = wopaque;
	do
	{
		rblkno = ropaque->hasho_nextblkno;
		if (rbuf != InvalidBuffer)
			_hash_relbuf(rel, rbuf);
		rbuf = _hash_getbuf_with_strategy(rel,
										  rblkno,
										  HASH_WRITE,
										  LH_OVERFLOW_PAGE,
										  bstrategy);
		rpage = BufferGetPage(rbuf);
		ropaque = (HashPageOpaque) PageGetSpecialPointer(rpage);
		Assert(ropaque->hasho_bucket == bucket);
	} while (BlockNumberIsValid(ropaque->hasho_nextblkno));

	/*
	 * squeeze the tuples.
	 */
	for (;;)
	{
		OffsetNumber roffnum;
		OffsetNumber maxroffnum;
		OffsetNumber deletable[MaxOffsetNumber];
		IndexTuple	itups[MaxIndexTuplesPerPage];
		Size		tups_size[MaxIndexTuplesPerPage];
		OffsetNumber itup_offsets[MaxIndexTuplesPerPage];
		uint16		ndeletable = 0;
		uint16		nitups = 0;
		Size		all_tups_size = 0;
		int			i;
		bool		retain_pin = false;

readpage:
		/* Scan each tuple in "read" page */
		maxroffnum = PageGetMaxOffsetNumber(rpage);
		for (roffnum = FirstOffsetNumber;
			 roffnum <= maxroffnum;
			 roffnum = OffsetNumberNext(roffnum))
		{
			IndexTuple	itup;
			Size		itemsz;

			/* skip dead tuples */
			if (ItemIdIsDead(PageGetItemId(rpage, roffnum)))
				continue;

			itup = (IndexTuple) PageGetItem(rpage,
											PageGetItemId(rpage, roffnum));
			itemsz = IndexTupleSize(itup);
			itemsz = MAXALIGN(itemsz);

			/*
			 * Walk up the bucket chain, looking for a page big enough for
			 * this item and all other accumulated items.  Exit if we reach
			 * the read page.
			 */
			while (PageGetFreeSpaceForMultipleTuples(wpage, nitups + 1) < (all_tups_size + itemsz))
			{
				Buffer		next_wbuf = InvalidBuffer;
				bool		tups_moved = false;
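				/*
				 * tups_moved is set once the accumulated tuples have been
				 * flushed to the write page; in that case the read page has
				 * been compacted and must be rescanned from the top.
				 */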

				Assert(!PageIsEmpty(wpage));

				if (wblkno == bucket_blkno)
					retain_pin = true;

				wblkno = wopaque->hasho_nextblkno;
				Assert(BlockNumberIsValid(wblkno));

				/* don't need to move to next page if we reached the read page */
				if (wblkno != rblkno)
					next_wbuf = _hash_getbuf_with_strategy(rel,
														   wblkno,
														   HASH_WRITE,
														   LH_OVERFLOW_PAGE,
														   bstrategy);

				if (nitups > 0)
				{
					Assert(nitups == ndeletable);

					/*
					 * This operation needs to log multiple tuples, prepare
					 * WAL for that.
					 */
					if (RelationNeedsWAL(rel))
						XLogEnsureRecordSpace(0, 3 + nitups);

					START_CRIT_SECTION();

					/*
					 * we have to insert tuples on the "write" page, being
					 * careful to preserve hashkey ordering.  (If we insert
					 * many tuples into the same "write" page it would be
					 * worth qsort'ing them).
					 */
					_hash_pgaddmultitup(rel, wbuf, itups, itup_offsets, nitups);
					MarkBufferDirty(wbuf);

					/* Delete tuples we already moved off read page */
					PageIndexMultiDelete(rpage, deletable, ndeletable);
					MarkBufferDirty(rbuf);

					/* XLOG stuff */
					if (RelationNeedsWAL(rel))
					{
						XLogRecPtr	recptr;
						xl_hash_move_page_contents xlrec;

						xlrec.ntups = nitups;
						xlrec.is_prim_bucket_same_wrt = (wbuf == bucket_buf) ? true : false;

						XLogBeginInsert();
						XLogRegisterData((char *) &xlrec, SizeOfHashMovePageContents);

						/*
						 * bucket buffer needs to be registered to ensure that
						 * we can acquire a cleanup lock on it during replay.
						 */
						if (!xlrec.is_prim_bucket_same_wrt)
							XLogRegisterBuffer(0, bucket_buf, REGBUF_STANDARD | REGBUF_NO_IMAGE);

						XLogRegisterBuffer(1, wbuf, REGBUF_STANDARD);
						XLogRegisterBufData(1, (char *) itup_offsets,
											nitups * sizeof(OffsetNumber));
						for (i = 0; i < nitups; i++)
							XLogRegisterBufData(1, (char *) itups[i], tups_size[i]);

						XLogRegisterBuffer(2, rbuf, REGBUF_STANDARD);
						XLogRegisterBufData(2, (char *) deletable,
											ndeletable * sizeof(OffsetNumber));

						recptr = XLogInsert(RM_HASH_ID, XLOG_HASH_MOVE_PAGE_CONTENTS);

						PageSetLSN(BufferGetPage(wbuf), recptr);
						PageSetLSN(BufferGetPage(rbuf), recptr);
					}

					END_CRIT_SECTION();

					tups_moved = true;
				}

				/*
				 * release the lock on previous page after acquiring the lock
				 * on next page
				 */
				if (retain_pin)
					LockBuffer(wbuf, BUFFER_LOCK_UNLOCK);
				else
					_hash_relbuf(rel, wbuf);

				/* nothing more to do if we reached the read page */
				if (rblkno == wblkno)
				{
					_hash_relbuf(rel, rbuf);
					return;
				}

				wbuf = next_wbuf;
				wpage = BufferGetPage(wbuf);
				wopaque = (HashPageOpaque) PageGetSpecialPointer(wpage);
				Assert(wopaque->hasho_bucket == bucket);
				retain_pin = false;

				/* be tidy */
				for (i = 0; i < nitups; i++)
					pfree(itups[i]);
				nitups = 0;
				all_tups_size = 0;
				ndeletable = 0;

				/*
				 * after moving the tuples, rpage would have been compacted,
				 * so we need to rescan it.
				 */
				if (tups_moved)
					goto readpage;
			}

			/* remember tuple for deletion from "read" page */
			deletable[ndeletable++] = roffnum;

			/*
			 * We need a copy of the index tuples, as they can be freed as
			 * part of the overflow page; however, we still need them to
			 * write the WAL record in _hash_freeovflpage.
			 */
			itups[nitups] = CopyIndexTuple(itup);
			tups_size[nitups++] = itemsz;
			all_tups_size += itemsz;
		}

		/*
		 * If we reach here, there are no live tuples on the "read" page ---
		 * it was empty when we got to it, or we moved them all.  So we can
		 * just free the page without bothering with deleting tuples
		 * individually.  Then advance to the previous "read" page.
		 *
		 * Tricky point here: if our read and write pages are adjacent in the
		 * bucket chain, our write lock on wbuf will conflict with
		 * _hash_freeovflpage's attempt to update the sibling links of the
		 * removed page.  In that case, we don't need to lock it again.
		 */
		rblkno = ropaque->hasho_prevblkno;
		Assert(BlockNumberIsValid(rblkno));

		/* free this overflow page (releases rbuf) */
		_hash_freeovflpage(rel, bucket_buf, rbuf, wbuf, itups, itup_offsets,
						   tups_size, nitups, bstrategy);

		/* be tidy */
		for (i = 0; i < nitups; i++)
			pfree(itups[i]);

		/* are we freeing the page adjacent to wbuf? */
		if (rblkno == wblkno)
		{
			/* retain the pin on primary bucket page till end of bucket scan */
			if (wblkno == bucket_blkno)
				LockBuffer(wbuf, BUFFER_LOCK_UNLOCK);
			else
				_hash_relbuf(rel, wbuf);
			return;
		}

		rbuf = _hash_getbuf_with_strategy(rel,
										  rblkno,
										  HASH_WRITE,
										  LH_OVERFLOW_PAGE,
										  bstrategy);
		rpage = BufferGetPage(rbuf);
		ropaque = (HashPageOpaque) PageGetSpecialPointer(rpage);
		Assert(ropaque->hasho_bucket == bucket);
	}

	/* NOTREACHED */
}