/*-------------------------------------------------------------------------
 *
 * hash.c
 *	  Implementation of Margo Seltzer's Hashing package for postgres.
 *
 * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *	  src/backend/access/hash/hash.c
 *
 * NOTES
 *	  This file contains only the public interface routines.
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include "access/hash.h"
#include "access/hash_xlog.h"
#include "access/relscan.h"
#include "catalog/index.h"
#include "commands/vacuum.h"
#include "miscadmin.h"
#include "optimizer/plancat.h"
#include "utils/builtins.h"
#include "utils/index_selfuncs.h"
#include "utils/rel.h"


/* Working state for hashbuild and its callback */
typedef struct
{
	HSpool	   *spool;			/* NULL if not using spooling */
	double		indtuples;		/* # tuples accepted into index */
	Relation	heapRel;		/* heap relation descriptor */
} HashBuildState;

static void hashbuildCallback(Relation index,
				  HeapTuple htup,
				  Datum *values,
				  bool *isnull,
				  bool tupleIsAlive,
				  void *state);


/*
 * Hash handler function: return IndexAmRoutine with access method parameters
 * and callbacks.
 */
Datum
hashhandler(PG_FUNCTION_ARGS)
{
	IndexAmRoutine *amroutine = makeNode(IndexAmRoutine);

	amroutine->amstrategies = HTMaxStrategyNumber;
	amroutine->amsupport = HASHNProcs;
	amroutine->amcanorder = false;
	amroutine->amcanorderbyop = false;
	amroutine->amcanbackward = true;
	amroutine->amcanunique = false;
	amroutine->amcanmulticol = false;
	amroutine->amoptionalkey = false;
	amroutine->amsearcharray = false;
	amroutine->amsearchnulls = false;
	amroutine->amstorage = false;
	amroutine->amclusterable = false;
	amroutine->ampredlocks = false;
	amroutine->amcanparallel = false;
	amroutine->amkeytype = INT4OID;

	amroutine->ambuild = hashbuild;
	amroutine->ambuildempty = hashbuildempty;
	amroutine->aminsert = hashinsert;
	amroutine->ambulkdelete = hashbulkdelete;
	amroutine->amvacuumcleanup = hashvacuumcleanup;
	amroutine->amcanreturn = NULL;
	amroutine->amcostestimate = hashcostestimate;
	amroutine->amoptions = hashoptions;
	amroutine->amproperty = NULL;
	amroutine->amvalidate = hashvalidate;
	amroutine->ambeginscan = hashbeginscan;
	amroutine->amrescan = hashrescan;
	amroutine->amgettuple = hashgettuple;
	amroutine->amgetbitmap = hashgetbitmap;
	amroutine->amendscan = hashendscan;
	amroutine->ammarkpos = NULL;
	amroutine->amrestrpos = NULL;
	amroutine->amestimateparallelscan = NULL;
	amroutine->aminitparallelscan = NULL;
	amroutine->amparallelrescan = NULL;

	PG_RETURN_POINTER(amroutine);
}
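
/*
 * As an illustration of how such a handler is hooked up: pg_am's amhandler
 * column points at hashhandler, and a loadable index AM would be registered
 * with roughly
 *
 *		CREATE ACCESS METHOD myhash TYPE INDEX HANDLER myhashhandler;
 *
 * (the names here are only placeholders).  The built-in hash AM's pg_am row
 * is instead created from the catalog headers at initdb time.
 */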

/*
 *	hashbuild() -- build a new hash index.
 */
IndexBuildResult *
hashbuild(Relation heap, Relation index, IndexInfo *indexInfo)
{
	IndexBuildResult *result;
	BlockNumber relpages;
	double		reltuples;
	double		allvisfrac;
	uint32		num_buckets;
	long		sort_threshold;
	HashBuildState buildstate;

	/*
	 * We expect to be called exactly once for any index relation. If that's
	 * not the case, big trouble's what we have.
	 */
	if (RelationGetNumberOfBlocks(index) != 0)
		elog(ERROR, "index \"%s\" already contains data",
			 RelationGetRelationName(index));

	/* Estimate the number of rows currently present in the table */
	estimate_rel_size(heap, NULL, &relpages, &reltuples, &allvisfrac);

	/* Initialize the hash index metadata page and initial buckets */
	num_buckets = _hash_init(index, reltuples, MAIN_FORKNUM);

	/*
	 * If we just insert the tuples into the index in scan order, then
	 * (assuming their hash codes are pretty random) there will be no locality
	 * of access to the index, and if the index is bigger than available RAM
	 * then we'll thrash horribly.  To prevent that scenario, we can sort the
	 * tuples by (expected) bucket number.  However, such a sort is useless
	 * overhead when the index does fit in RAM.  We choose to sort if the
	 * initial index size exceeds maintenance_work_mem, or the number of
	 * buffers usable for the index, whichever is less.  (Limiting by the
	 * number of buffers should reduce thrashing between PG buffers and kernel
	 * buffers, which seems useful even if no physical I/O results.  Limiting
	 * by maintenance_work_mem is useful to allow easy testing of the sort
	 * code path, and may be useful to DBAs as an additional control knob.)
	 *
	 * NOTE: this test will need adjustment if a bucket is ever different from
	 * one page.  Also, "initial index size" accounting does not include the
	 * metapage, nor the first bitmap page.
	 */
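	/*
	 * For a rough sense of scale (assuming default settings): with
	 * maintenance_work_mem = 64MB and 8kB blocks the first limit is
	 * 65536 * 1024 / 8192 = 8192 bucket pages, and with 128MB of
	 * shared_buffers NBuffers is 16384, so we would spool and sort only when
	 * the initial index needs at least 8192 buckets.
	 */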
	sort_threshold = (maintenance_work_mem * 1024L) / BLCKSZ;
	if (index->rd_rel->relpersistence != RELPERSISTENCE_TEMP)
		sort_threshold = Min(sort_threshold, NBuffers);
	else
		sort_threshold = Min(sort_threshold, NLocBuffer);

	if (num_buckets >= (uint32) sort_threshold)
		buildstate.spool = _h_spoolinit(heap, index, num_buckets);
	else
		buildstate.spool = NULL;

	/* prepare to build the index */
	buildstate.indtuples = 0;
	buildstate.heapRel = heap;

	/* do the heap scan */
	reltuples = IndexBuildHeapScan(heap, index, indexInfo, true,
								   hashbuildCallback, (void *) &buildstate);

	if (buildstate.spool)
	{
		/* sort the tuples and insert them into the index */
		_h_indexbuild(buildstate.spool, buildstate.heapRel);
		_h_spooldestroy(buildstate.spool);
	}

	/*
	 * Return statistics
	 */
	result = (IndexBuildResult *) palloc(sizeof(IndexBuildResult));

	result->heap_tuples = reltuples;
	result->index_tuples = buildstate.indtuples;

	return result;
}

/*
 *	hashbuildempty() -- build an empty hash index in the initialization fork
 */
void
hashbuildempty(Relation index)
{
	_hash_init(index, 0, INIT_FORKNUM);
}

/*
 * Per-tuple callback from IndexBuildHeapScan
 */
static void
hashbuildCallback(Relation index,
				  HeapTuple htup,
				  Datum *values,
				  bool *isnull,
				  bool tupleIsAlive,
				  void *state)
{
	HashBuildState *buildstate = (HashBuildState *) state;
	Datum		index_values[1];
	bool		index_isnull[1];
	IndexTuple	itup;

	/* convert data to a hash key; on failure, do not insert anything */
	if (!_hash_convert_tuple(index,
							 values, isnull,
							 index_values, index_isnull))
		return;

	/* Either spool the tuple for sorting, or just put it into the index */
	if (buildstate->spool)
		_h_spool(buildstate->spool, &htup->t_self,
				 index_values, index_isnull);
	else
	{
		/* form an index tuple and point it at the heap tuple */
		itup = index_form_tuple(RelationGetDescr(index),
								index_values, index_isnull);
		itup->t_tid = htup->t_self;
		_hash_doinsert(index, itup, buildstate->heapRel);
		pfree(itup);
	}

	buildstate->indtuples += 1;
}

/*
 *	hashinsert() -- insert an index tuple into a hash table.
 *
 *	Hash on the heap tuple's key, form an index tuple with hash code.
 *	Find the appropriate location for the new tuple, and put it there.
 */
bool
hashinsert(Relation rel, Datum *values, bool *isnull,
		   ItemPointer ht_ctid, Relation heapRel,
		   IndexUniqueCheck checkUnique,
		   IndexInfo *indexInfo)
{
	Datum		index_values[1];
	bool		index_isnull[1];
	IndexTuple	itup;

	/* convert data to a hash key; on failure, do not insert anything */
	if (!_hash_convert_tuple(rel,
							 values, isnull,
							 index_values, index_isnull))
		return false;

	/* form an index tuple and point it at the heap tuple */
	itup = index_form_tuple(RelationGetDescr(rel), index_values, index_isnull);
	itup->t_tid = *ht_ctid;

	_hash_doinsert(rel, itup, heapRel);

	pfree(itup);

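	/*
	 * The aminsert result matters only for deferred uniqueness checks
	 * (UNIQUE_CHECK_PARTIAL), and hash indexes never support uniqueness, so
	 * simply report that no recheck is required.
	 */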
	return false;
}


/*
 *	hashgettuple() -- Get the next tuple in the scan.
 */
bool
hashgettuple(IndexScanDesc scan, ScanDirection dir)
{
	HashScanOpaque so = (HashScanOpaque) scan->opaque;
	Relation	rel = scan->indexRelation;
	Buffer		buf;
	Page		page;
	OffsetNumber offnum;
	ItemPointer current;
	bool		res;

	/* Hash indexes are always lossy since we store only the hash code */
	scan->xs_recheck = true;
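	/*
	 * For example, for a qual such as "keycol = 'foo'" the index can only
	 * show that hash(keycol) matches hash('foo'); since different keys can
	 * hash to the same code, the executor must re-evaluate the original
	 * condition against each heap tuple we return.
	 */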

	/*
	 * We hold pin but not lock on current buffer while outside the hash AM.
	 * Reacquire the read lock here.
	 */
	if (BufferIsValid(so->hashso_curbuf))
		LockBuffer(so->hashso_curbuf, BUFFER_LOCK_SHARE);

	/*
	 * If we've already initialized this scan, we can just advance it in the
	 * appropriate direction.  If we haven't done so yet, we call a routine to
	 * get the first item in the scan.
	 */
	current = &(so->hashso_curpos);
	if (ItemPointerIsValid(current))
	{
		/*
		 * An insertion into the current index page could have happened while
		 * we didn't have read lock on it.  Re-find our position by looking
		 * for the TID we previously returned.  (Because we hold a pin on the
		 * primary bucket page, no deletions or splits could have occurred;
		 * therefore we can expect that the TID still exists in the current
		 * index page, at an offset >= where we were.)
		 */
		OffsetNumber maxoffnum;

		buf = so->hashso_curbuf;
		Assert(BufferIsValid(buf));
		page = BufferGetPage(buf);

		/*
		 * We don't need to test for an old snapshot here, because the current
		 * buffer is pinned, so vacuum can't clean the page.
		 */
		maxoffnum = PageGetMaxOffsetNumber(page);
		for (offnum = ItemPointerGetOffsetNumber(current);
			 offnum <= maxoffnum;
			 offnum = OffsetNumberNext(offnum))
		{
			IndexTuple	itup;

			itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, offnum));
			if (ItemPointerEquals(&(so->hashso_heappos), &(itup->t_tid)))
				break;
		}
		if (offnum > maxoffnum)
			elog(ERROR, "failed to re-find scan position within index \"%s\"",
				 RelationGetRelationName(rel));
		ItemPointerSetOffsetNumber(current, offnum);

		/*
		 * Check to see if we should kill the previously-fetched tuple.
		 */
		if (scan->kill_prior_tuple)
		{
			/*
			 * Yes, so remember it for later. (We'll deal with all such tuples
			 * at once right after leaving the index page or at end of scan.)
			 * If the caller reverses the indexscan direction, the same item
			 * might get entered multiple times.  We don't detect that;
			 * instead, we just forget any excess entries.
			 */
			if (so->killedItems == NULL)
				so->killedItems = palloc(MaxIndexTuplesPerPage *
										 sizeof(HashScanPosItem));

			if (so->numKilled < MaxIndexTuplesPerPage)
			{
				so->killedItems[so->numKilled].heapTid = so->hashso_heappos;
				so->killedItems[so->numKilled].indexOffset =
					ItemPointerGetOffsetNumber(&(so->hashso_curpos));
				so->numKilled++;
			}
		}

		/*
		 * Now continue the scan.
		 */
		res = _hash_next(scan, dir);
	}
	else
		res = _hash_first(scan, dir);

	/*
	 * Skip killed tuples if asked to.
	 */
	if (scan->ignore_killed_tuples)
	{
		while (res)
		{
			offnum = ItemPointerGetOffsetNumber(current);
			page = BufferGetPage(so->hashso_curbuf);
			if (!ItemIdIsDead(PageGetItemId(page, offnum)))
				break;
			res = _hash_next(scan, dir);
		}
	}

	/* Release read lock on current buffer, but keep it pinned */
	if (BufferIsValid(so->hashso_curbuf))
		LockBuffer(so->hashso_curbuf, BUFFER_LOCK_UNLOCK);

	/* Return current heap TID on success */
	scan->xs_ctup.t_self = so->hashso_heappos;

	return res;
}


/*
 *	hashgetbitmap() -- get all tuples at once
 */
int64
hashgetbitmap(IndexScanDesc scan, TIDBitmap *tbm)
{
	HashScanOpaque so = (HashScanOpaque) scan->opaque;
	bool		res;
	int64		ntids = 0;

	res = _hash_first(scan, ForwardScanDirection);

	while (res)
	{
		bool		add_tuple;

		/*
		 * Skip killed tuples if asked to.
		 */
		if (scan->ignore_killed_tuples)
		{
			Page		page;
			OffsetNumber offnum;

			offnum = ItemPointerGetOffsetNumber(&(so->hashso_curpos));
			page = BufferGetPage(so->hashso_curbuf);
			add_tuple = !ItemIdIsDead(PageGetItemId(page, offnum));
		}
		else
			add_tuple = true;

		/* Save tuple ID, and continue scanning */
		if (add_tuple)
		{
			/* Note we mark the tuple ID as requiring recheck */
			tbm_add_tuples(tbm, &(so->hashso_heappos), 1, true);
			ntids++;
		}

		res = _hash_next(scan, ForwardScanDirection);
	}

	return ntids;
}


/*
 *	hashbeginscan() -- start a scan on a hash index
 */
IndexScanDesc
hashbeginscan(Relation rel, int nkeys, int norderbys)
{
	IndexScanDesc scan;
	HashScanOpaque so;

	/* no order by operators allowed */
	Assert(norderbys == 0);

	scan = RelationGetIndexScan(rel, nkeys, norderbys);

	so = (HashScanOpaque) palloc(sizeof(HashScanOpaqueData));
	so->hashso_curbuf = InvalidBuffer;
	so->hashso_bucket_buf = InvalidBuffer;
	so->hashso_split_bucket_buf = InvalidBuffer;
	/* set position invalid (this will cause _hash_first call) */
	ItemPointerSetInvalid(&(so->hashso_curpos));
	ItemPointerSetInvalid(&(so->hashso_heappos));

	so->hashso_buc_populated = false;
	so->hashso_buc_split = false;

	so->killedItems = NULL;
	so->numKilled = 0;

	scan->opaque = so;

	return scan;
}

/*
 *	hashrescan() -- rescan an index relation
 */
void
hashrescan(IndexScanDesc scan, ScanKey scankey, int nscankeys,
		   ScanKey orderbys, int norderbys)
{
	HashScanOpaque so = (HashScanOpaque) scan->opaque;
	Relation	rel = scan->indexRelation;

	/*
	 * Before leaving current page, deal with any killed items. Also, ensure
	 * that we acquire lock on current page before calling _hash_kill_items.
	 */
	if (so->numKilled > 0)
	{
		LockBuffer(so->hashso_curbuf, BUFFER_LOCK_SHARE);
		_hash_kill_items(scan);
		LockBuffer(so->hashso_curbuf, BUFFER_LOCK_UNLOCK);
	}

	_hash_dropscanbuf(rel, so);

	/* set position invalid (this will cause _hash_first call) */
	ItemPointerSetInvalid(&(so->hashso_curpos));
	ItemPointerSetInvalid(&(so->hashso_heappos));

	/* Update scan key, if a new one is given */
	if (scankey && scan->numberOfKeys > 0)
	{
		memmove(scan->keyData,
				scankey,
				scan->numberOfKeys * sizeof(ScanKeyData));
	}

	so->hashso_buc_populated = false;
	so->hashso_buc_split = false;
}

/*
 *	hashendscan() -- close down a scan
 */
void
hashendscan(IndexScanDesc scan)
{
	HashScanOpaque so = (HashScanOpaque) scan->opaque;
	Relation	rel = scan->indexRelation;

	/*
	 * Before leaving current page, deal with any killed items. Also, ensure
	 * that we acquire lock on current page before calling _hash_kill_items.
	 */
	if (so->numKilled > 0)
	{
		LockBuffer(so->hashso_curbuf, BUFFER_LOCK_SHARE);
		_hash_kill_items(scan);
		LockBuffer(so->hashso_curbuf, BUFFER_LOCK_UNLOCK);
	}

	_hash_dropscanbuf(rel, so);

	if (so->killedItems != NULL)
		pfree(so->killedItems);
	pfree(so);
	scan->opaque = NULL;
}

/*
 * Bulk deletion of all index entries pointing to a set of heap tuples.
 * The set of target tuples is specified via a callback routine that tells
 * whether any given heap tuple (identified by ItemPointer) is being deleted.
 *
 * This function also deletes the tuples that were moved to another bucket by
 * a split.
 *
 * Result: a palloc'd struct containing statistical info for VACUUM displays.
 */
IndexBulkDeleteResult *
hashbulkdelete(IndexVacuumInfo *info, IndexBulkDeleteResult *stats,
			   IndexBulkDeleteCallback callback, void *callback_state)
{
	Relation	rel = info->index;
	double		tuples_removed;
	double		num_index_tuples;
	double		orig_ntuples;
	Bucket		orig_maxbucket;
	Bucket		cur_maxbucket;
	Bucket		cur_bucket;
	Buffer		metabuf = InvalidBuffer;
	HashMetaPage metap;
	HashMetaPage cachedmetap;

	tuples_removed = 0;
	num_index_tuples = 0;

	/*
	 * We need a copy of the metapage so that we can use its hashm_spares[]
	 * values to compute bucket page addresses, but a cached copy should be
	 * good enough.  (If not, we'll detect that further down and refresh the
	 * cache as necessary.)
	 */
	cachedmetap = _hash_getcachedmetap(rel, &metabuf, false);
	Assert(cachedmetap != NULL);

	orig_maxbucket = cachedmetap->hashm_maxbucket;
	orig_ntuples = cachedmetap->hashm_ntuples;

	/* Scan the buckets that we know exist */
	cur_bucket = 0;
	cur_maxbucket = orig_maxbucket;

loop_top:
	while (cur_bucket <= cur_maxbucket)
	{
		BlockNumber bucket_blkno;
		BlockNumber blkno;
		Buffer		bucket_buf;
		Buffer		buf;
		HashPageOpaque bucket_opaque;
		Page		page;
		bool		split_cleanup = false;

		/* Get address of bucket's start page */
		bucket_blkno = BUCKET_TO_BLKNO(cachedmetap, cur_bucket);

		blkno = bucket_blkno;

		/*
		 * We need to acquire a cleanup lock on the primary bucket page, to
		 * wait out concurrent scans before deleting the dead tuples.
		 */
		buf = ReadBufferExtended(rel, MAIN_FORKNUM, blkno, RBM_NORMAL, info->strategy);
		LockBufferForCleanup(buf);
		_hash_checkpage(rel, buf, LH_BUCKET_PAGE);

		page = BufferGetPage(buf);
		bucket_opaque = (HashPageOpaque) PageGetSpecialPointer(page);

		/*
		 * If the bucket contains tuples that were moved by a split, we need
		 * to delete them.  We can't do so while the split of this bucket is
		 * still in progress, since concurrent scans still need those tuples.
		 */
		if (!H_BUCKET_BEING_SPLIT(bucket_opaque) &&
			H_NEEDS_SPLIT_CLEANUP(bucket_opaque))
		{
			split_cleanup = true;

			/*
			 * This bucket might have been split since we last held a lock on
			 * the metapage.  If so, hashm_maxbucket, hashm_highmask and
			 * hashm_lowmask might be old enough to cause us to fail to remove
			 * tuples left behind by the most recent split.  To prevent that,
			 * now that the primary page of the target bucket has been locked
			 * (and thus can't be further split), check whether we need to
			 * update our cached metapage data.
			 */
			Assert(bucket_opaque->hasho_prevblkno != InvalidBlockNumber);
			if (bucket_opaque->hasho_prevblkno > cachedmetap->hashm_maxbucket)
			{
				cachedmetap = _hash_getcachedmetap(rel, &metabuf, true);
				Assert(cachedmetap != NULL);
			}
		}

		bucket_buf = buf;

		hashbucketcleanup(rel, cur_bucket, bucket_buf, blkno, info->strategy,
						  cachedmetap->hashm_maxbucket,
						  cachedmetap->hashm_highmask,
						  cachedmetap->hashm_lowmask, &tuples_removed,
						  &num_index_tuples, split_cleanup,
						  callback, callback_state);

		_hash_dropbuf(rel, bucket_buf);

		/* Advance to next bucket */
		cur_bucket++;
	}

	if (BufferIsInvalid(metabuf))
		metabuf = _hash_getbuf(rel, HASH_METAPAGE, HASH_NOLOCK, LH_META_PAGE);

	/* Write-lock metapage and check for split since we started */
	LockBuffer(metabuf, BUFFER_LOCK_EXCLUSIVE);
	metap = HashPageGetMeta(BufferGetPage(metabuf));

	if (cur_maxbucket != metap->hashm_maxbucket)
	{
		/* There's been a split, so process the additional bucket(s) */
		LockBuffer(metabuf, BUFFER_LOCK_UNLOCK);
		cachedmetap = _hash_getcachedmetap(rel, &metabuf, true);
		Assert(cachedmetap != NULL);
		cur_maxbucket = cachedmetap->hashm_maxbucket;
		goto loop_top;
	}

	/* Okay, we're really done.  Update tuple count in metapage. */
	START_CRIT_SECTION();

	if (orig_maxbucket == metap->hashm_maxbucket &&
		orig_ntuples == metap->hashm_ntuples)
	{
		/*
		 * No one has split or inserted anything since start of scan, so
		 * believe our count as gospel.
		 */
		metap->hashm_ntuples = num_index_tuples;
	}
	else
	{
		/*
		 * Otherwise, our count is untrustworthy since we may have
		 * double-scanned tuples in split buckets.  Proceed by dead-reckoning.
		 * (Note: we still return estimated_count = false, because using this
		 * count is better than not updating reltuples at all.)
		 */
		if (metap->hashm_ntuples > tuples_removed)
			metap->hashm_ntuples -= tuples_removed;
		else
			metap->hashm_ntuples = 0;
		num_index_tuples = metap->hashm_ntuples;
	}

	MarkBufferDirty(metabuf);

	/* XLOG stuff */
	if (RelationNeedsWAL(rel))
	{
		xl_hash_update_meta_page xlrec;
		XLogRecPtr	recptr;

		xlrec.ntuples = metap->hashm_ntuples;

		XLogBeginInsert();
		XLogRegisterData((char *) &xlrec, SizeOfHashUpdateMetaPage);

		XLogRegisterBuffer(0, metabuf, REGBUF_STANDARD);

		recptr = XLogInsert(RM_HASH_ID, XLOG_HASH_UPDATE_META_PAGE);
		PageSetLSN(BufferGetPage(metabuf), recptr);
	}

	END_CRIT_SECTION();

	_hash_relbuf(rel, metabuf);

	/* return statistics */
	if (stats == NULL)
		stats = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
	stats->estimated_count = false;
	stats->num_index_tuples = num_index_tuples;
	stats->tuples_removed += tuples_removed;
	/* hashvacuumcleanup will fill in num_pages */

	return stats;
}

/*
 * Post-VACUUM cleanup.
 *
 * Result: a palloc'd struct containing statistical info for VACUUM displays.
 */
IndexBulkDeleteResult *
hashvacuumcleanup(IndexVacuumInfo *info, IndexBulkDeleteResult *stats)
{
	Relation	rel = info->index;
	BlockNumber num_pages;

	/* If hashbulkdelete wasn't called, return NULL signifying no change */
	/* Note: this covers the analyze_only case too */
	if (stats == NULL)
		return NULL;

	/* update statistics */
	num_pages = RelationGetNumberOfBlocks(rel);
	stats->num_pages = num_pages;

	return stats;
}

/*
 * Helper function to perform deletion of index entries from a bucket.
 *
 * This function expects that the caller has acquired a cleanup lock on the
 * primary bucket page, and will return with a write lock again held on the
 * primary bucket page.  The lock won't necessarily be held continuously,
 * though, because we'll release it when visiting overflow pages.
 *
 * It would be very bad if this function cleaned a page while some other
 * backend was in the midst of scanning it, because hashgettuple assumes
 * that the next valid TID will be greater than or equal to the current
 * valid TID.  There can't be any concurrent scans in progress when we first
 * enter this function because of the cleanup lock we hold on the primary
 * bucket page, but as soon as we release that lock, there might be.  We
 * handle that by conspiring to prevent those scans from passing our cleanup
 * scan.  To do that, we lock the next page in the bucket chain before
 * releasing the lock on the previous page.  (This type of lock chaining is
 * not ideal, so we might want to look for a better solution at some point.)
 *
 * We need to retain a pin on the primary bucket to ensure that no concurrent
 * split can start.
 */
void
hashbucketcleanup(Relation rel, Bucket cur_bucket, Buffer bucket_buf,
				  BlockNumber bucket_blkno, BufferAccessStrategy bstrategy,
				  uint32 maxbucket, uint32 highmask, uint32 lowmask,
				  double *tuples_removed, double *num_index_tuples,
				  bool split_cleanup,
				  IndexBulkDeleteCallback callback, void *callback_state)
{
	BlockNumber blkno;
	Buffer		buf;
	Bucket		new_bucket PG_USED_FOR_ASSERTS_ONLY = InvalidBucket;
	bool		bucket_dirty = false;

	blkno = bucket_blkno;
	buf = bucket_buf;

	if (split_cleanup)
		new_bucket = _hash_get_newbucket_from_oldbucket(rel, cur_bucket,
														lowmask, maxbucket);
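
	/*
	 * To illustrate the split-cleanup case with example numbers: if old
	 * bucket 1 was split while lowmask was 0x1 and highmask became 0x3, its
	 * tuples were redistributed between bucket 1 and new bucket 3 according
	 * to hashkey & 0x3.  Tuples whose masked hash value is 3 now belong to
	 * the new bucket, and those are the leftovers we remove from the old
	 * bucket below.
	 */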

	/* Scan each page in bucket */
	for (;;)
	{
		HashPageOpaque opaque;
		OffsetNumber offno;
		OffsetNumber maxoffno;
		Buffer		next_buf;
		Page		page;
		OffsetNumber deletable[MaxOffsetNumber];
		int			ndeletable = 0;
		bool		retain_pin = false;
		bool		clear_dead_marking = false;

		vacuum_delay_point();

		page = BufferGetPage(buf);
		opaque = (HashPageOpaque) PageGetSpecialPointer(page);

		/* Scan each tuple in page */
		maxoffno = PageGetMaxOffsetNumber(page);
		for (offno = FirstOffsetNumber;
			 offno <= maxoffno;
			 offno = OffsetNumberNext(offno))
		{
			ItemPointer htup;
			IndexTuple	itup;
			Bucket		bucket;
			bool		kill_tuple = false;

			itup = (IndexTuple) PageGetItem(page,
											PageGetItemId(page, offno));
			htup = &(itup->t_tid);

			/*
			 * To remove the dead tuples, we rely strictly on the results of
			 * the callback function.  See btvacuumpage for the detailed
			 * reason.
			 */
			if (callback && callback(htup, callback_state))
			{
				kill_tuple = true;
				if (tuples_removed)
					*tuples_removed += 1;
			}
			else if (split_cleanup)
			{
				/* delete the tuples that were moved by a split. */
				bucket = _hash_hashkey2bucket(_hash_get_indextuple_hashkey(itup),
											  maxbucket,
											  highmask,
											  lowmask);
				/* mark the item for deletion */
				if (bucket != cur_bucket)
				{
					/*
					 * We expect tuples to belong either to the current bucket
					 * or to new_bucket.  This is ensured because we don't
					 * allow further splits of a bucket that contains garbage.
					 * See comments in _hash_expandtable.
					 */
					Assert(bucket == new_bucket);
					kill_tuple = true;
				}
			}

			if (kill_tuple)
			{
				/* mark the item for deletion */
				deletable[ndeletable++] = offno;
			}
			else
			{
				/* we're keeping it, so count it */
				if (num_index_tuples)
					*num_index_tuples += 1;
			}
		}

		/* retain the pin on primary bucket page till end of bucket scan */
		if (blkno == bucket_blkno)
			retain_pin = true;
		else
			retain_pin = false;

		blkno = opaque->hasho_nextblkno;

		/*
		 * Apply deletions, advance to next page and write page if needed.
		 */
		if (ndeletable > 0)
		{
			/* No ereport(ERROR) until changes are logged */
			START_CRIT_SECTION();

			PageIndexMultiDelete(page, deletable, ndeletable);
			bucket_dirty = true;

			/*
			 * If vacuum removed any DEAD tuples from this index page, mark
			 * the page as clean by clearing the LH_PAGE_HAS_DEAD_TUPLES flag.
			 */
			if (tuples_removed && *tuples_removed > 0 &&
				H_HAS_DEAD_TUPLES(opaque))
			{
				opaque->hasho_flag &= ~LH_PAGE_HAS_DEAD_TUPLES;
				clear_dead_marking = true;
			}

			MarkBufferDirty(buf);

			/* XLOG stuff */
			if (RelationNeedsWAL(rel))
			{
				xl_hash_delete xlrec;
				XLogRecPtr	recptr;

				xlrec.clear_dead_marking = clear_dead_marking;
				xlrec.is_primary_bucket_page = (buf == bucket_buf) ? true : false;

				XLogBeginInsert();
				XLogRegisterData((char *) &xlrec, SizeOfHashDelete);

				/*
				 * bucket buffer needs to be registered to ensure that we can
				 * acquire a cleanup lock on it during replay.
				 */
				if (!xlrec.is_primary_bucket_page)
					XLogRegisterBuffer(0, bucket_buf, REGBUF_STANDARD | REGBUF_NO_IMAGE);

				XLogRegisterBuffer(1, buf, REGBUF_STANDARD);
				XLogRegisterBufData(1, (char *) deletable,
									ndeletable * sizeof(OffsetNumber));

				recptr = XLogInsert(RM_HASH_ID, XLOG_HASH_DELETE);
				PageSetLSN(BufferGetPage(buf), recptr);
			}

			END_CRIT_SECTION();
		}

		/* bail out if there are no more pages to scan. */
		if (!BlockNumberIsValid(blkno))
			break;

		next_buf = _hash_getbuf_with_strategy(rel, blkno, HASH_WRITE,
											  LH_OVERFLOW_PAGE,
											  bstrategy);

		/*
		 * release the lock on previous page after acquiring the lock on next
		 * page
		 */
		if (retain_pin)
			LockBuffer(buf, BUFFER_LOCK_UNLOCK);
		else
			_hash_relbuf(rel, buf);

		buf = next_buf;
	}

	/*
	 * Lock the bucket page to clear the garbage flag and squeeze the bucket.
	 * If the current buffer is the same as the bucket buffer, then we already
	 * have a lock on the bucket page.
	 */
	if (buf != bucket_buf)
	{
		_hash_relbuf(rel, buf);
		LockBuffer(bucket_buf, BUFFER_LOCK_EXCLUSIVE);
	}

	/*
	 * Clear the garbage flag from the bucket after deleting the tuples that
	 * were moved by a split.  We purposefully clear the flag before squeezing
	 * the bucket, so that after a restart vacuum won't again try to delete
	 * the moved-by-split tuples.
	 */
	if (split_cleanup)
	{
		HashPageOpaque bucket_opaque;
		Page		page;

		page = BufferGetPage(bucket_buf);
		bucket_opaque = (HashPageOpaque) PageGetSpecialPointer(page);

		/* No ereport(ERROR) until changes are logged */
		START_CRIT_SECTION();

		bucket_opaque->hasho_flag &= ~LH_BUCKET_NEEDS_SPLIT_CLEANUP;
		MarkBufferDirty(bucket_buf);

		/* XLOG stuff */
		if (RelationNeedsWAL(rel))
		{
			XLogRecPtr	recptr;

			XLogBeginInsert();
			XLogRegisterBuffer(0, bucket_buf, REGBUF_STANDARD);

			recptr = XLogInsert(RM_HASH_ID, XLOG_HASH_SPLIT_CLEANUP);
			PageSetLSN(page, recptr);
		}

		END_CRIT_SECTION();
	}

	/*
	 * If we have deleted anything, try to compact free space.  To squeeze the
	 * bucket we must hold a cleanup lock; otherwise the squeeze could disturb
	 * the tuple ordering seen by a scan that started before it.
	 */
	if (bucket_dirty && IsBufferCleanupOK(bucket_buf))
		_hash_squeezebucket(rel, cur_bucket, bucket_blkno, bucket_buf,
							bstrategy);
	else
		LockBuffer(bucket_buf, BUFFER_LOCK_UNLOCK);
}