/*-------------------------------------------------------------------------
 *
 * hash.c
 *	  Implementation of Margo Seltzer's Hashing package for postgres.
 *
 * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *	  src/backend/access/hash/hash.c
 *
 * NOTES
 *	  This file contains only the public interface routines.
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include "access/hash.h"
#include "access/hash_xlog.h"
#include "access/relscan.h"
#include "access/tableam.h"
#include "catalog/index.h"
#include "commands/progress.h"
#include "commands/vacuum.h"
#include "miscadmin.h"
#include "optimizer/plancat.h"
#include "pgstat.h"
#include "utils/builtins.h"
#include "utils/index_selfuncs.h"
#include "utils/rel.h"


/* Working state for hashbuild and its callback */
typedef struct
{
	HSpool	   *spool;			/* NULL if not using spooling */
	double		indtuples;		/* # tuples accepted into index */
	Relation	heapRel;		/* heap relation descriptor */
} HashBuildState;

static void hashbuildCallback(Relation index,
							  HeapTuple htup,
							  Datum *values,
							  bool *isnull,
							  bool tupleIsAlive,
							  void *state);


/*
 * Hash handler function: return IndexAmRoutine with access method parameters
 * and callbacks.
 */
Datum
hashhandler(PG_FUNCTION_ARGS)
{
	IndexAmRoutine *amroutine = makeNode(IndexAmRoutine);

	amroutine->amstrategies = HTMaxStrategyNumber;
	amroutine->amsupport = HASHNProcs;
	amroutine->amcanorder = false;
	amroutine->amcanorderbyop = false;
	amroutine->amcanbackward = true;
	amroutine->amcanunique = false;
	amroutine->amcanmulticol = false;
	amroutine->amoptionalkey = false;
	amroutine->amsearcharray = false;
	amroutine->amsearchnulls = false;
	amroutine->amstorage = false;
	amroutine->amclusterable = false;
	amroutine->ampredlocks = true;
	amroutine->amcanparallel = false;
	amroutine->amcaninclude = false;
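	/*
	 * Only the 32-bit hash code of the key is stored in the index, so index
	 * entries are of type int4 regardless of the indexed column's type.
	 */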
	amroutine->amkeytype = INT4OID;

	amroutine->ambuild = hashbuild;
	amroutine->ambuildempty = hashbuildempty;
	amroutine->aminsert = hashinsert;
	amroutine->ambulkdelete = hashbulkdelete;
	amroutine->amvacuumcleanup = hashvacuumcleanup;
	amroutine->amcanreturn = NULL;
	amroutine->amcostestimate = hashcostestimate;
	amroutine->amoptions = hashoptions;
	amroutine->amproperty = NULL;
	amroutine->ambuildphasename = NULL;
	amroutine->amvalidate = hashvalidate;
	amroutine->ambeginscan = hashbeginscan;
	amroutine->amrescan = hashrescan;
	amroutine->amgettuple = hashgettuple;
	amroutine->amgetbitmap = hashgetbitmap;
	amroutine->amendscan = hashendscan;
	amroutine->ammarkpos = NULL;
	amroutine->amrestrpos = NULL;
	amroutine->amestimateparallelscan = NULL;
	amroutine->aminitparallelscan = NULL;
	amroutine->amparallelrescan = NULL;

	PG_RETURN_POINTER(amroutine);
}
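
/*
 * For reference, this handler is what the "hash" access method entry in
 * pg_am points at; an index that uses it is created with, for example:
 *
 *		CREATE INDEX ON mytable USING hash (mycolumn);
 *
 * ("mytable" and "mycolumn" above are illustrative placeholders only.)
 */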

/*
 *	hashbuild() -- build a new hash index.
 */
IndexBuildResult *
hashbuild(Relation heap, Relation index, IndexInfo *indexInfo)
{
	IndexBuildResult *result;
	BlockNumber relpages;
	double		reltuples;
	double		allvisfrac;
	uint32		num_buckets;
	long		sort_threshold;
	HashBuildState buildstate;

	/*
	 * We expect to be called exactly once for any index relation. If that's
	 * not the case, big trouble's what we have.
	 */
	if (RelationGetNumberOfBlocks(index) != 0)
		elog(ERROR, "index \"%s\" already contains data",
			 RelationGetRelationName(index));

	/* Estimate the number of rows currently present in the table */
	estimate_rel_size(heap, NULL, &relpages, &reltuples, &allvisfrac);

	/* Initialize the hash index metadata page and initial buckets */
	num_buckets = _hash_init(index, reltuples, MAIN_FORKNUM);

	/*
	 * If we just insert the tuples into the index in scan order, then
	 * (assuming their hash codes are pretty random) there will be no locality
	 * of access to the index, and if the index is bigger than available RAM
	 * then we'll thrash horribly.  To prevent that scenario, we can sort the
	 * tuples by (expected) bucket number.  However, such a sort is useless
	 * overhead when the index does fit in RAM.  We choose to sort if the
	 * initial index size exceeds maintenance_work_mem, or the number of
	 * buffers usable for the index, whichever is less.  (Limiting by the
	 * number of buffers should reduce thrashing between PG buffers and kernel
	 * buffers, which seems useful even if no physical I/O results.  Limiting
	 * by maintenance_work_mem is useful to allow easy testing of the sort
	 * code path, and may be useful to DBAs as an additional control knob.)
	 *
	 * NOTE: this test will need adjustment if a bucket is ever different from
	 * one page.  Also, "initial index size" accounting does not include the
	 * metapage, nor the first bitmap page.
	 */
	sort_threshold = (maintenance_work_mem * 1024L) / BLCKSZ;
	if (index->rd_rel->relpersistence != RELPERSISTENCE_TEMP)
		sort_threshold = Min(sort_threshold, NBuffers);
	else
		sort_threshold = Min(sort_threshold, NLocBuffer);

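	/*
	 * For illustration, assuming default settings (maintenance_work_mem =
	 * 64MB, BLCKSZ = 8192, shared_buffers = 128MB): sort_threshold starts at
	 * 65536 * 1024 / 8192 = 8192 pages, and Min(8192, NBuffers = 16384)
	 * leaves it at 8192, so spooling is used only if the initial index needs
	 * at least 8192 buckets (one page per bucket).
	 */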
	if (num_buckets >= (uint32) sort_threshold)
		buildstate.spool = _h_spoolinit(heap, index, num_buckets);
	else
		buildstate.spool = NULL;

	/* prepare to build the index */
	buildstate.indtuples = 0;
	buildstate.heapRel = heap;

	/* do the heap scan */
	reltuples = table_index_build_scan(heap, index, indexInfo, true, true,
									   hashbuildCallback,
									   (void *) &buildstate, NULL);
	pgstat_progress_update_param(PROGRESS_CREATEIDX_TUPLES_TOTAL,
								 buildstate.indtuples);

	if (buildstate.spool)
	{
		/* sort the tuples and insert them into the index */
		_h_indexbuild(buildstate.spool, buildstate.heapRel);
		_h_spooldestroy(buildstate.spool);
	}

	/*
	 * Return statistics
	 */
	result = (IndexBuildResult *) palloc(sizeof(IndexBuildResult));

	result->heap_tuples = reltuples;
	result->index_tuples = buildstate.indtuples;

	return result;
}

/*
 *	hashbuildempty() -- build an empty hash index in the initialization fork
 */
void
hashbuildempty(Relation index)
{
	_hash_init(index, 0, INIT_FORKNUM);
}

/*
 * Per-tuple callback for table_index_build_scan
 */
static void
hashbuildCallback(Relation index,
				  HeapTuple htup,
				  Datum *values,
				  bool *isnull,
				  bool tupleIsAlive,
				  void *state)
{
	HashBuildState *buildstate = (HashBuildState *) state;
	Datum		index_values[1];
	bool		index_isnull[1];
	IndexTuple	itup;

	/* convert data to a hash key; on failure, do not insert anything */
	if (!_hash_convert_tuple(index,
							 values, isnull,
							 index_values, index_isnull))
		return;

	/* Either spool the tuple for sorting, or just put it into the index */
	if (buildstate->spool)
		_h_spool(buildstate->spool, &htup->t_self,
				 index_values, index_isnull);
	else
	{
		/* form an index tuple and point it at the heap tuple */
		itup = index_form_tuple(RelationGetDescr(index),
								index_values, index_isnull);
		itup->t_tid = htup->t_self;
		_hash_doinsert(index, itup, buildstate->heapRel);
		pfree(itup);
	}

	buildstate->indtuples += 1;
}

/*
 *	hashinsert() -- insert an index tuple into a hash table.
 *
 *	Hash on the heap tuple's key, form an index tuple with hash code.
 *	Find the appropriate location for the new tuple, and put it there.
 */
bool
hashinsert(Relation rel, Datum *values, bool *isnull,
		   ItemPointer ht_ctid, Relation heapRel,
		   IndexUniqueCheck checkUnique,
		   IndexInfo *indexInfo)
{
	Datum		index_values[1];
	bool		index_isnull[1];
	IndexTuple	itup;

	/* convert data to a hash key; on failure, do not insert anything */
	if (!_hash_convert_tuple(rel,
							 values, isnull,
							 index_values, index_isnull))
		return false;

	/* form an index tuple and point it at the heap tuple */
	itup = index_form_tuple(RelationGetDescr(rel), index_values, index_isnull);
	itup->t_tid = *ht_ctid;

	_hash_doinsert(rel, itup, heapRel);

	pfree(itup);

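	/*
	 * Hash indexes never enforce uniqueness (amcanunique is false), so there
	 * is no conflict to report; the aminsert result matters only for deferred
	 * uniqueness checks, which can't arise here.
	 */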
	return false;
}


/*
 *	hashgettuple() -- Get the next tuple in the scan.
 */
bool
hashgettuple(IndexScanDesc scan, ScanDirection dir)
{
	HashScanOpaque so = (HashScanOpaque) scan->opaque;
	bool		res;

	/* Hash indexes are always lossy since we store only the hash code */
	scan->xs_recheck = true;

	/*
	 * If we've already initialized this scan, we can just advance it in the
	 * appropriate direction.  If we haven't done so yet, we call a routine to
	 * get the first item in the scan.
	 */
	if (!HashScanPosIsValid(so->currPos))
		res = _hash_first(scan, dir);
	else
	{
		/*
		 * Check to see if we should kill the previously-fetched tuple.
		 */
		if (scan->kill_prior_tuple)
		{
			/*
			 * Yes, so remember it for later.  (We'll deal with all such
			 * tuples at once right after leaving the index page or at end of
			 * scan.)  If the caller reverses the indexscan direction, it is
			 * quite possible that the same item gets entered multiple times.
			 * We don't detect that; instead, we just forget any excess
			 * entries.
			 */
			if (so->killedItems == NULL)
				so->killedItems = (int *)
					palloc(MaxIndexTuplesPerPage * sizeof(int));

			if (so->numKilled < MaxIndexTuplesPerPage)
				so->killedItems[so->numKilled++] = so->currPos.itemIndex;
		}

		/*
		 * Now continue the scan.
		 */
		res = _hash_next(scan, dir);
	}

	return res;
}


/*
 *	hashgetbitmap() -- get all tuples at once
 */
int64
hashgetbitmap(IndexScanDesc scan, TIDBitmap *tbm)
{
	HashScanOpaque so = (HashScanOpaque) scan->opaque;
	bool		res;
	int64		ntids = 0;
	HashScanPosItem *currItem;

	res = _hash_first(scan, ForwardScanDirection);

	while (res)
	{
		currItem = &so->currPos.items[so->currPos.itemIndex];

		/*
		 * _hash_first and _hash_next take care of eliminating dead index
		 * entries whenever scan->ignore_killed_tuples is true.  Therefore,
		 * there's nothing to do here except add the results to the TIDBitmap.
		 */
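		/* pass recheck = true: only hash codes are stored, so the heap tuple
		 * must be rechecked against the scan quals */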
		tbm_add_tuples(tbm, &(currItem->heapTid), 1, true);
		ntids++;

		res = _hash_next(scan, ForwardScanDirection);
	}

	return ntids;
}


/*
 *	hashbeginscan() -- start a scan on a hash index
 */
IndexScanDesc
hashbeginscan(Relation rel, int nkeys, int norderbys)
{
	IndexScanDesc scan;
	HashScanOpaque so;

	/* no order by operators allowed */
	Assert(norderbys == 0);

	scan = RelationGetIndexScan(rel, nkeys, norderbys);

	so = (HashScanOpaque) palloc(sizeof(HashScanOpaqueData));
	HashScanPosInvalidate(so->currPos);
	so->hashso_bucket_buf = InvalidBuffer;
	so->hashso_split_bucket_buf = InvalidBuffer;

	so->hashso_buc_populated = false;
	so->hashso_buc_split = false;

	so->killedItems = NULL;
	so->numKilled = 0;

	scan->opaque = so;

	return scan;
}

/*
 *	hashrescan() -- rescan an index relation
 */
void
hashrescan(IndexScanDesc scan, ScanKey scankey, int nscankeys,
		   ScanKey orderbys, int norderbys)
{
	HashScanOpaque so = (HashScanOpaque) scan->opaque;
	Relation	rel = scan->indexRelation;

	if (HashScanPosIsValid(so->currPos))
	{
		/* Before leaving current page, deal with any killed items */
		if (so->numKilled > 0)
			_hash_kill_items(scan);
	}

	_hash_dropscanbuf(rel, so);

	/* set position invalid (this will cause _hash_first call) */
	HashScanPosInvalidate(so->currPos);

	/* Update scan key, if a new one is given */
	if (scankey && scan->numberOfKeys > 0)
	{
		memmove(scan->keyData,
				scankey,
				scan->numberOfKeys * sizeof(ScanKeyData));
	}

	so->hashso_buc_populated = false;
	so->hashso_buc_split = false;
}

/*
 *	hashendscan() -- close down a scan
 */
void
hashendscan(IndexScanDesc scan)
{
	HashScanOpaque so = (HashScanOpaque) scan->opaque;
	Relation	rel = scan->indexRelation;

	if (HashScanPosIsValid(so->currPos))
	{
		/* Before leaving current page, deal with any killed items */
		if (so->numKilled > 0)
			_hash_kill_items(scan);
	}

	_hash_dropscanbuf(rel, so);

	if (so->killedItems != NULL)
		pfree(so->killedItems);
	pfree(so);
	scan->opaque = NULL;
}

/*
 * Bulk deletion of all index entries pointing to a set of heap tuples.
 * The set of target tuples is specified via a callback routine that tells
 * whether any given heap tuple (identified by ItemPointer) is being deleted.
 *
 * This function also deletes the tuples that were moved by a split to another
 * bucket.
 *
 * Result: a palloc'd struct containing statistical info for VACUUM displays.
 */
IndexBulkDeleteResult *
hashbulkdelete(IndexVacuumInfo *info, IndexBulkDeleteResult *stats,
			   IndexBulkDeleteCallback callback, void *callback_state)
{
	Relation	rel = info->index;
	double		tuples_removed;
	double		num_index_tuples;
	double		orig_ntuples;
	Bucket		orig_maxbucket;
	Bucket		cur_maxbucket;
	Bucket		cur_bucket;
	Buffer		metabuf = InvalidBuffer;
	HashMetaPage metap;
	HashMetaPage cachedmetap;

	tuples_removed = 0;
	num_index_tuples = 0;

	/*
	 * We need a copy of the metapage so that we can use its hashm_spares[]
	 * values to compute bucket page addresses, but a cached copy should be
	 * good enough.  (If not, we'll detect that further down and refresh the
	 * cache as necessary.)
	 */
	cachedmetap = _hash_getcachedmetap(rel, &metabuf, false);
	Assert(cachedmetap != NULL);

	orig_maxbucket = cachedmetap->hashm_maxbucket;
	orig_ntuples = cachedmetap->hashm_ntuples;

	/* Scan the buckets that we know exist */
	cur_bucket = 0;
	cur_maxbucket = orig_maxbucket;

loop_top:
	while (cur_bucket <= cur_maxbucket)
	{
		BlockNumber bucket_blkno;
		BlockNumber blkno;
		Buffer		bucket_buf;
		Buffer		buf;
		HashPageOpaque bucket_opaque;
		Page		page;
		bool		split_cleanup = false;

		/* Get address of bucket's start page */
		bucket_blkno = BUCKET_TO_BLKNO(cachedmetap, cur_bucket);
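		/*
		 * (BUCKET_TO_BLKNO needs the hashm_spares[] data from the cached
		 * metapage, since primary bucket pages are interleaved with whatever
		 * overflow and bitmap pages had been allocated at each split point.)
		 */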

		blkno = bucket_blkno;

		/*
		 * We need to acquire a cleanup lock on the primary bucket page, so as
		 * to wait out any concurrent scans before deleting the dead tuples.
		 */
		buf = ReadBufferExtended(rel, MAIN_FORKNUM, blkno, RBM_NORMAL, info->strategy);
		LockBufferForCleanup(buf);
		_hash_checkpage(rel, buf, LH_BUCKET_PAGE);

		page = BufferGetPage(buf);
		bucket_opaque = (HashPageOpaque) PageGetSpecialPointer(page);

		/*
		 * If the bucket contains tuples that were moved by a split, then we
		 * need to delete those tuples.  We can't delete them while the split
		 * of the bucket is still in progress, as they are needed by scans.
		 */
		if (!H_BUCKET_BEING_SPLIT(bucket_opaque) &&
			H_NEEDS_SPLIT_CLEANUP(bucket_opaque))
		{
			split_cleanup = true;

			/*
			 * This bucket might have been split since we last held a lock on
			 * the metapage.  If so, hashm_maxbucket, hashm_highmask and
			 * hashm_lowmask might be old enough to cause us to fail to remove
			 * tuples left behind by the most recent split.  To prevent that,
			 * now that the primary page of the target bucket has been locked
			 * (and thus can't be further split), check whether we need to
			 * update our cached metapage data.
			 */
			Assert(bucket_opaque->hasho_prevblkno != InvalidBlockNumber);
			if (bucket_opaque->hasho_prevblkno > cachedmetap->hashm_maxbucket)
			{
				cachedmetap = _hash_getcachedmetap(rel, &metabuf, true);
				Assert(cachedmetap != NULL);
			}
		}

		bucket_buf = buf;

		hashbucketcleanup(rel, cur_bucket, bucket_buf, blkno, info->strategy,
						  cachedmetap->hashm_maxbucket,
						  cachedmetap->hashm_highmask,
						  cachedmetap->hashm_lowmask, &tuples_removed,
						  &num_index_tuples, split_cleanup,
						  callback, callback_state);

		_hash_dropbuf(rel, bucket_buf);

		/* Advance to next bucket */
		cur_bucket++;
	}

	if (BufferIsInvalid(metabuf))
		metabuf = _hash_getbuf(rel, HASH_METAPAGE, HASH_NOLOCK, LH_META_PAGE);

	/* Write-lock metapage and check for split since we started */
	LockBuffer(metabuf, BUFFER_LOCK_EXCLUSIVE);
	metap = HashPageGetMeta(BufferGetPage(metabuf));

	if (cur_maxbucket != metap->hashm_maxbucket)
	{
		/* There's been a split, so process the additional bucket(s) */
		LockBuffer(metabuf, BUFFER_LOCK_UNLOCK);
		cachedmetap = _hash_getcachedmetap(rel, &metabuf, true);
		Assert(cachedmetap != NULL);
		cur_maxbucket = cachedmetap->hashm_maxbucket;
		goto loop_top;
	}

	/* Okay, we're really done.  Update tuple count in metapage. */
	START_CRIT_SECTION();

	if (orig_maxbucket == metap->hashm_maxbucket &&
		orig_ntuples == metap->hashm_ntuples)
	{
		/*
		 * No one has split or inserted anything since start of scan, so
		 * believe our count as gospel.
		 */
		metap->hashm_ntuples = num_index_tuples;
	}
	else
	{
		/*
		 * Otherwise, our count is untrustworthy since we may have
		 * double-scanned tuples in split buckets.  Proceed by dead-reckoning.
		 * (Note: we still return estimated_count = false, because using this
		 * count is better than not updating reltuples at all.)
		 */
		if (metap->hashm_ntuples > tuples_removed)
			metap->hashm_ntuples -= tuples_removed;
		else
			metap->hashm_ntuples = 0;
		num_index_tuples = metap->hashm_ntuples;
	}

	MarkBufferDirty(metabuf);

	/* XLOG stuff */
	if (RelationNeedsWAL(rel))
	{
		xl_hash_update_meta_page xlrec;
		XLogRecPtr	recptr;

		xlrec.ntuples = metap->hashm_ntuples;

		XLogBeginInsert();
		XLogRegisterData((char *) &xlrec, SizeOfHashUpdateMetaPage);

		XLogRegisterBuffer(0, metabuf, REGBUF_STANDARD);

		recptr = XLogInsert(RM_HASH_ID, XLOG_HASH_UPDATE_META_PAGE);
		PageSetLSN(BufferGetPage(metabuf), recptr);
	}

	END_CRIT_SECTION();

	_hash_relbuf(rel, metabuf);

	/* return statistics */
	if (stats == NULL)
		stats = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
	stats->estimated_count = false;
	stats->num_index_tuples = num_index_tuples;
	stats->tuples_removed += tuples_removed;
	/* hashvacuumcleanup will fill in num_pages */

	return stats;
}

/*
 * Post-VACUUM cleanup.
 *
 * Result: a palloc'd struct containing statistical info for VACUUM displays.
 */
IndexBulkDeleteResult *
hashvacuumcleanup(IndexVacuumInfo *info, IndexBulkDeleteResult *stats)
{
	Relation	rel = info->index;
	BlockNumber num_pages;

	/* If hashbulkdelete wasn't called, return NULL signifying no change */
	/* Note: this covers the analyze_only case too */
	if (stats == NULL)
		return NULL;

	/* update statistics */
	num_pages = RelationGetNumberOfBlocks(rel);
	stats->num_pages = num_pages;

	return stats;
}

/*
 * Helper function to perform deletion of index entries from a bucket.
 *
 * This function expects that the caller has acquired a cleanup lock on the
 * primary bucket page, and will return with a write lock again held on the
 * primary bucket page.  The lock won't necessarily be held continuously,
 * though, because we'll release it when visiting overflow pages.
 *
 * There can't be any concurrent scans in progress when we first enter this
 * function because of the cleanup lock we hold on the primary bucket page,
 * but as soon as we release that lock, there might be.  If those scans got
 * ahead of our cleanup scan, they might see a tuple before we kill it and
 * wake up only after VACUUM has completed and the TID has been recycled for
 * an unrelated tuple.  To avoid that calamity, we prevent scans from passing
 * our cleanup scan by locking the next page in the bucket chain before
 * releasing the lock on the previous page.  (This type of lock chaining is not
 * ideal, so we might want to look for a better solution at some point.)
 *
 * We need to retain a pin on the primary bucket to ensure that no concurrent
 * split can start.
 */
void
hashbucketcleanup(Relation rel, Bucket cur_bucket, Buffer bucket_buf,
				  BlockNumber bucket_blkno, BufferAccessStrategy bstrategy,
				  uint32 maxbucket, uint32 highmask, uint32 lowmask,
				  double *tuples_removed, double *num_index_tuples,
				  bool split_cleanup,
				  IndexBulkDeleteCallback callback, void *callback_state)
{
	BlockNumber blkno;
	Buffer		buf;
	Bucket		new_bucket PG_USED_FOR_ASSERTS_ONLY = InvalidBucket;
	bool		bucket_dirty = false;

	blkno = bucket_blkno;
	buf = bucket_buf;

	if (split_cleanup)
		new_bucket = _hash_get_newbucket_from_oldbucket(rel, cur_bucket,
														lowmask, maxbucket);

	/* Scan each page in bucket */
	for (;;)
	{
		HashPageOpaque opaque;
		OffsetNumber offno;
		OffsetNumber maxoffno;
		Buffer		next_buf;
		Page		page;
		OffsetNumber deletable[MaxOffsetNumber];
		int			ndeletable = 0;
		bool		retain_pin = false;
		bool		clear_dead_marking = false;

		vacuum_delay_point();

		page = BufferGetPage(buf);
		opaque = (HashPageOpaque) PageGetSpecialPointer(page);

		/* Scan each tuple in page */
		maxoffno = PageGetMaxOffsetNumber(page);
		for (offno = FirstOffsetNumber;
			 offno <= maxoffno;
			 offno = OffsetNumberNext(offno))
		{
			ItemPointer htup;
			IndexTuple	itup;
			Bucket		bucket;
			bool		kill_tuple = false;

			itup = (IndexTuple) PageGetItem(page,
											PageGetItemId(page, offno));
			htup = &(itup->t_tid);

			/*
			 * To remove the dead tuples, we strictly want to rely on the
			 * results of the callback function; see btvacuumpage for the
			 * detailed reason.
			 */
			if (callback && callback(htup, callback_state))
			{
				kill_tuple = true;
				if (tuples_removed)
					*tuples_removed += 1;
			}
			else if (split_cleanup)
			{
				/* delete the tuples that were moved by a split */
				bucket = _hash_hashkey2bucket(_hash_get_indextuple_hashkey(itup),
											  maxbucket,
											  highmask,
											  lowmask);
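				/*
				 * (_hash_hashkey2bucket recomputes which bucket the hash key
				 * maps to under the current maxbucket/highmask/lowmask:
				 * mask with highmask, falling back to lowmask if the result
				 * exceeds maxbucket.)
				 */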
				/* anything that no longer maps to this bucket gets deleted */
				if (bucket != cur_bucket)
				{
					/*
					 * We expect tuples to belong to either the current bucket
					 * or new_bucket.  This is ensured because we don't allow
					 * further splits of a bucket that contains garbage.  See
					 * comments in _hash_expandtable.
					 */
					Assert(bucket == new_bucket);
					kill_tuple = true;
				}
			}

			if (kill_tuple)
			{
				/* mark the item for deletion */
				deletable[ndeletable++] = offno;
			}
			else
			{
				/* we're keeping it, so count it */
				if (num_index_tuples)
					*num_index_tuples += 1;
			}
		}

		/* retain the pin on primary bucket page till end of bucket scan */
		if (blkno == bucket_blkno)
			retain_pin = true;
		else
			retain_pin = false;

		blkno = opaque->hasho_nextblkno;

		/*
		 * Apply deletions, advance to next page and write page if needed.
		 */
		if (ndeletable > 0)
		{
			/* No ereport(ERROR) until changes are logged */
			START_CRIT_SECTION();

			PageIndexMultiDelete(page, deletable, ndeletable);
			bucket_dirty = true;

			/*
			 * If this vacuum removed tuples and the page was flagged as
			 * having dead tuples, mark it clean again by clearing the
			 * LH_PAGE_HAS_DEAD_TUPLES flag.
			 */
			if (tuples_removed && *tuples_removed > 0 &&
				H_HAS_DEAD_TUPLES(opaque))
			{
				opaque->hasho_flag &= ~LH_PAGE_HAS_DEAD_TUPLES;
				clear_dead_marking = true;
			}

			MarkBufferDirty(buf);

			/* XLOG stuff */
			if (RelationNeedsWAL(rel))
			{
				xl_hash_delete xlrec;
				XLogRecPtr	recptr;

				xlrec.clear_dead_marking = clear_dead_marking;
				xlrec.is_primary_bucket_page = (buf == bucket_buf);

				XLogBeginInsert();
				XLogRegisterData((char *) &xlrec, SizeOfHashDelete);

				/*
				 * The bucket buffer must be registered to ensure that we can
				 * acquire a cleanup lock on it during replay.
				 */
				if (!xlrec.is_primary_bucket_page)
					XLogRegisterBuffer(0, bucket_buf, REGBUF_STANDARD | REGBUF_NO_IMAGE);

				XLogRegisterBuffer(1, buf, REGBUF_STANDARD);
				XLogRegisterBufData(1, (char *) deletable,
									ndeletable * sizeof(OffsetNumber));

				recptr = XLogInsert(RM_HASH_ID, XLOG_HASH_DELETE);
				PageSetLSN(BufferGetPage(buf), recptr);
			}

			END_CRIT_SECTION();
		}

		/* bail out if there are no more pages to scan. */
		if (!BlockNumberIsValid(blkno))
			break;

		next_buf = _hash_getbuf_with_strategy(rel, blkno, HASH_WRITE,
											  LH_OVERFLOW_PAGE,
											  bstrategy);

		/*
		 * Release the lock on the previous page only after acquiring the
		 * lock on the next page.
		 */
		if (retain_pin)
			LockBuffer(buf, BUFFER_LOCK_UNLOCK);
		else
			_hash_relbuf(rel, buf);

		buf = next_buf;
	}

	/*
	 * Lock the bucket page to clear the garbage flag and squeeze the bucket.
	 * If the current buffer is the same as the bucket buffer, then we already
	 * hold a lock on the bucket page.
	 */
	if (buf != bucket_buf)
	{
		_hash_relbuf(rel, buf);
		LockBuffer(bucket_buf, BUFFER_LOCK_EXCLUSIVE);
	}

	/*
	 * Clear the garbage flag from the bucket after deleting the tuples that
	 * were moved by a split.  We purposely clear the flag before squeezing
	 * the bucket, so that after a restart, vacuum won't try to delete the
	 * moved-by-split tuples all over again.
	 */
	if (split_cleanup)
	{
		HashPageOpaque bucket_opaque;
		Page		page;

		page = BufferGetPage(bucket_buf);
		bucket_opaque = (HashPageOpaque) PageGetSpecialPointer(page);

		/* No ereport(ERROR) until changes are logged */
		START_CRIT_SECTION();

		bucket_opaque->hasho_flag &= ~LH_BUCKET_NEEDS_SPLIT_CLEANUP;
		MarkBufferDirty(bucket_buf);

		/* XLOG stuff */
		if (RelationNeedsWAL(rel))
		{
			XLogRecPtr	recptr;

			XLogBeginInsert();
			XLogRegisterBuffer(0, bucket_buf, REGBUF_STANDARD);

			recptr = XLogInsert(RM_HASH_ID, XLOG_HASH_SPLIT_CLEANUP);
			PageSetLSN(page, recptr);
		}

		END_CRIT_SECTION();
	}

	/*
	 * If we have deleted anything, try to compact free space.  To squeeze the
	 * bucket we must hold a cleanup lock; otherwise the squeeze could disturb
	 * the ordering of tuples seen by a scan that started before it.
	 */
	if (bucket_dirty && IsBufferCleanupOK(bucket_buf))
		_hash_squeezebucket(rel, cur_bucket, bucket_blkno, bucket_buf,
							bstrategy);
	else
		LockBuffer(bucket_buf, BUFFER_LOCK_UNLOCK);
}