/*-------------------------------------------------------------------------
 *
 * hash.c
 *	  Implementation of Margo Seltzer's Hashing package for postgres.
 *
 * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *	  src/backend/access/hash/hash.c
 *
 * NOTES
 *	  This file contains only the public interface routines.
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include "access/hash.h"
#include "access/hash_xlog.h"
#include "access/relscan.h"
#include "catalog/index.h"
#include "commands/vacuum.h"
#include "miscadmin.h"
#include "optimizer/plancat.h"
#include "utils/builtins.h"
#include "utils/index_selfuncs.h"
#include "utils/rel.h"


/* Working state for hashbuild and its callback */
typedef struct
{
	HSpool	   *spool;			/* NULL if not using spooling */
	double		indtuples;		/* # tuples accepted into index */
	Relation	heapRel;		/* heap relation descriptor */
} HashBuildState;

static void hashbuildCallback(Relation index,
				  HeapTuple htup,
				  Datum *values,
				  bool *isnull,
				  bool tupleIsAlive,
				  void *state);


/*
 * Hash handler function: return IndexAmRoutine with access method parameters
 * and callbacks.
 */
Datum
hashhandler(PG_FUNCTION_ARGS)
{
	IndexAmRoutine *amroutine = makeNode(IndexAmRoutine);

	amroutine->amstrategies = HTMaxStrategyNumber;
	amroutine->amsupport = HASHNProcs;
	amroutine->amcanorder = false;
	amroutine->amcanorderbyop = false;
	amroutine->amcanbackward = true;
	amroutine->amcanunique = false;
	amroutine->amcanmulticol = false;
	amroutine->amoptionalkey = false;
	amroutine->amsearcharray = false;
	amroutine->amsearchnulls = false;
	amroutine->amstorage = false;
	amroutine->amclusterable = false;
	amroutine->ampredlocks = true;
	amroutine->amcanparallel = false;
	amroutine->amcaninclude = false;
	amroutine->amkeytype = INT4OID;

	amroutine->ambuild = hashbuild;
	amroutine->ambuildempty = hashbuildempty;
	amroutine->aminsert = hashinsert;
	amroutine->ambulkdelete = hashbulkdelete;
	amroutine->amvacuumcleanup = hashvacuumcleanup;
	amroutine->amcanreturn = NULL;
	amroutine->amcostestimate = hashcostestimate;
	amroutine->amoptions = hashoptions;
	amroutine->amproperty = NULL;
	amroutine->amvalidate = hashvalidate;
	amroutine->ambeginscan = hashbeginscan;
	amroutine->amrescan = hashrescan;
	amroutine->amgettuple = hashgettuple;
	amroutine->amgetbitmap = hashgetbitmap;
	amroutine->amendscan = hashendscan;
	amroutine->ammarkpos = NULL;
	amroutine->amrestrpos = NULL;
	amroutine->amestimateparallelscan = NULL;
	amroutine->aminitparallelscan = NULL;
	amroutine->amparallelrescan = NULL;

	PG_RETURN_POINTER(amroutine);
}

/*
 *	hashbuild() -- build a new hash index.
 */
IndexBuildResult *
hashbuild(Relation heap, Relation index, IndexInfo *indexInfo)
{
	IndexBuildResult *result;
	BlockNumber relpages;
	double		reltuples;
	double		allvisfrac;
	uint32		num_buckets;
	long		sort_threshold;
	HashBuildState buildstate;

	/*
	 * We expect to be called exactly once for any index relation. If that's
	 * not the case, big trouble's what we have.
	 */
	if (RelationGetNumberOfBlocks(index) != 0)
		elog(ERROR, "index \"%s\" already contains data",
			 RelationGetRelationName(index));

	/* Estimate the number of rows currently present in the table */
	estimate_rel_size(heap, NULL, &relpages, &reltuples, &allvisfrac);

	/* Initialize the hash index metadata page and initial buckets */
	num_buckets = _hash_init(index, reltuples, MAIN_FORKNUM);

	/*
	 * If we just insert the tuples into the index in scan order, then
	 * (assuming their hash codes are pretty random) there will be no locality
	 * of access to the index, and if the index is bigger than available RAM
	 * then we'll thrash horribly.  To prevent that scenario, we can sort the
	 * tuples by (expected) bucket number.  However, such a sort is useless
	 * overhead when the index does fit in RAM.  We choose to sort if the
	 * initial index size exceeds maintenance_work_mem, or the number of
	 * buffers usable for the index, whichever is less.  (Limiting by the
	 * number of buffers should reduce thrashing between PG buffers and kernel
	 * buffers, which seems useful even if no physical I/O results.  Limiting
	 * by maintenance_work_mem is useful to allow easy testing of the sort
	 * code path, and may be useful to DBAs as an additional control knob.)
	 *
	 * NOTE: this test will need adjustment if a bucket is ever different from
	 * one page.  Also, "initial index size" accounting does not include the
	 * metapage, nor the first bitmap page.
	 */
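	/*
	 * Cap by the relevant buffer pool: shared buffers (NBuffers) for ordinary
	 * relations, or backend-local buffers (NLocBuffer) for temp relations.
	 */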
	sort_threshold = (maintenance_work_mem * 1024L) / BLCKSZ;
	if (index->rd_rel->relpersistence != RELPERSISTENCE_TEMP)
		sort_threshold = Min(sort_threshold, NBuffers);
	else
		sort_threshold = Min(sort_threshold, NLocBuffer);

	if (num_buckets >= (uint32) sort_threshold)
		buildstate.spool = _h_spoolinit(heap, index, num_buckets);
	else
		buildstate.spool = NULL;

	/* prepare to build the index */
	buildstate.indtuples = 0;
	buildstate.heapRel = heap;

	/* do the heap scan */
	reltuples = IndexBuildHeapScan(heap, index, indexInfo, true,
								   hashbuildCallback, (void *) &buildstate, NULL);

	if (buildstate.spool)
	{
		/* sort the tuples and insert them into the index */
		_h_indexbuild(buildstate.spool, buildstate.heapRel);
		_h_spooldestroy(buildstate.spool);
	}

	/*
	 * Return statistics
	 */
	result = (IndexBuildResult *) palloc(sizeof(IndexBuildResult));

	result->heap_tuples = reltuples;
	result->index_tuples = buildstate.indtuples;

	return result;
}

/*
 *	hashbuildempty() -- build an empty hash index in the initialization fork
 */
void
hashbuildempty(Relation index)
{
	_hash_init(index, 0, INIT_FORKNUM);
}

/*
 * Per-tuple callback from IndexBuildHeapScan
 */
static void
hashbuildCallback(Relation index,
				  HeapTuple htup,
				  Datum *values,
				  bool *isnull,
				  bool tupleIsAlive,
				  void *state)
{
	HashBuildState *buildstate = (HashBuildState *) state;
	Datum		index_values[1];
	bool		index_isnull[1];
	IndexTuple	itup;

	/* convert data to a hash key; on failure, do not insert anything */
	if (!_hash_convert_tuple(index,
							 values, isnull,
							 index_values, index_isnull))
		return;

	/* Either spool the tuple for sorting, or just put it into the index */
	if (buildstate->spool)
		_h_spool(buildstate->spool, &htup->t_self,
				 index_values, index_isnull);
	else
	{
		/* form an index tuple and point it at the heap tuple */
		itup = index_form_tuple(RelationGetDescr(index),
								index_values, index_isnull);
		itup->t_tid = htup->t_self;
		_hash_doinsert(index, itup, buildstate->heapRel);
		pfree(itup);
	}

	buildstate->indtuples += 1;
}

/*
 *	hashinsert() -- insert an index tuple into a hash table.
 *
 *	Hash on the heap tuple's key, form an index tuple with hash code.
 *	Find the appropriate location for the new tuple, and put it there.
 */
bool
hashinsert(Relation rel, Datum *values, bool *isnull,
		   ItemPointer ht_ctid, Relation heapRel,
		   IndexUniqueCheck checkUnique,
		   IndexInfo *indexInfo)
{
	Datum		index_values[1];
	bool		index_isnull[1];
	IndexTuple	itup;

	/* convert data to a hash key; on failure, do not insert anything */
	if (!_hash_convert_tuple(rel,
							 values, isnull,
							 index_values, index_isnull))
		return false;

	/* form an index tuple and point it at the heap tuple */
	itup = index_form_tuple(RelationGetDescr(rel), index_values, index_isnull);
	itup->t_tid = *ht_ctid;

	_hash_doinsert(rel, itup, heapRel);

	pfree(itup);

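	/*
	 * Hash indexes never perform uniqueness checks (amcanunique is false),
	 * so there is no conflict to report; the result is always false.
	 */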
	return false;
}


/*
 *	hashgettuple() -- Get the next tuple in the scan.
 */
bool
hashgettuple(IndexScanDesc scan, ScanDirection dir)
{
	HashScanOpaque so = (HashScanOpaque) scan->opaque;
	bool		res;

	/* Hash indexes are always lossy since we store only the hash code */
	scan->xs_recheck = true;

	/*
	 * If we've already initialized this scan, we can just advance it in the
	 * appropriate direction.  If we haven't done so yet, we call a routine to
	 * get the first item in the scan.
	 */
	if (!HashScanPosIsValid(so->currPos))
		res = _hash_first(scan, dir);
	else
	{
		/*
		 * Check to see if we should kill the previously-fetched tuple.
		 */
		if (scan->kill_prior_tuple)
		{
			/*
			 * Yes, so remember it for later. (We'll deal with all such tuples
			 * at once right after leaving the index page or at end of scan.)
			 * If the caller reverses the indexscan direction, it is quite
			 * possible that the same item might get entered multiple times.
			 * But we don't detect that; instead, we just forget any excess
			 * entries.
			 */
			if (so->killedItems == NULL)
				so->killedItems = (int *)
					palloc(MaxIndexTuplesPerPage * sizeof(int));

			if (so->numKilled < MaxIndexTuplesPerPage)
				so->killedItems[so->numKilled++] = so->currPos.itemIndex;
		}

		/*
		 * Now continue the scan.
		 */
		res = _hash_next(scan, dir);
	}

	return res;
}


/*
 *	hashgetbitmap() -- get all tuples at once
 */
int64
hashgetbitmap(IndexScanDesc scan, TIDBitmap *tbm)
{
	HashScanOpaque so = (HashScanOpaque) scan->opaque;
	bool		res;
	int64		ntids = 0;
	HashScanPosItem *currItem;

	res = _hash_first(scan, ForwardScanDirection);

	while (res)
	{
		currItem = &so->currPos.items[so->currPos.itemIndex];

		/*
		 * _hash_first and _hash_next handle eliminating dead index entries
		 * whenever scan->ignore_killed_tuples is true.  Therefore, there's
		 * nothing to do here except add the results to the TIDBitmap.
		 */
		tbm_add_tuples(tbm, &(currItem->heapTid), 1, true);
		ntids++;

		res = _hash_next(scan, ForwardScanDirection);
	}

	return ntids;
}


/*
 *	hashbeginscan() -- start a scan on a hash index
 */
IndexScanDesc
hashbeginscan(Relation rel, int nkeys, int norderbys)
{
	IndexScanDesc scan;
	HashScanOpaque so;

	/* no order by operators allowed */
	Assert(norderbys == 0);

	scan = RelationGetIndexScan(rel, nkeys, norderbys);

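	/*
	 * Set up the hash-specific scan state: no valid scan position, no pinned
	 * bucket buffers, and no killed items recorded yet.
	 */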
	so = (HashScanOpaque) palloc(sizeof(HashScanOpaqueData));
	HashScanPosInvalidate(so->currPos);
	so->hashso_bucket_buf = InvalidBuffer;
	so->hashso_split_bucket_buf = InvalidBuffer;

	so->hashso_buc_populated = false;
	so->hashso_buc_split = false;

	so->killedItems = NULL;
	so->numKilled = 0;

	scan->opaque = so;

	return scan;
}

/*
 *	hashrescan() -- rescan an index relation
 */
void
hashrescan(IndexScanDesc scan, ScanKey scankey, int nscankeys,
		   ScanKey orderbys, int norderbys)
{
	HashScanOpaque so = (HashScanOpaque) scan->opaque;
	Relation	rel = scan->indexRelation;

	if (HashScanPosIsValid(so->currPos))
	{
		/* Before leaving current page, deal with any killed items */
		if (so->numKilled > 0)
			_hash_kill_items(scan);
	}

	_hash_dropscanbuf(rel, so);

	/* set position invalid (this will cause _hash_first call) */
	HashScanPosInvalidate(so->currPos);

	/* Update scan key, if a new one is given */
	if (scankey && scan->numberOfKeys > 0)
	{
		memmove(scan->keyData,
				scankey,
				scan->numberOfKeys * sizeof(ScanKeyData));
	}

	so->hashso_buc_populated = false;
	so->hashso_buc_split = false;
}

/*
 *	hashendscan() -- close down a scan
 */
void
hashendscan(IndexScanDesc scan)
{
	HashScanOpaque so = (HashScanOpaque) scan->opaque;
	Relation	rel = scan->indexRelation;

	if (HashScanPosIsValid(so->currPos))
	{
		/* Before leaving current page, deal with any killed items */
		if (so->numKilled > 0)
			_hash_kill_items(scan);
	}

	_hash_dropscanbuf(rel, so);

	if (so->killedItems != NULL)
		pfree(so->killedItems);
	pfree(so);
	scan->opaque = NULL;
}

/*
 * Bulk deletion of all index entries pointing to a set of heap tuples.
 * The set of target tuples is specified via a callback routine that tells
 * whether any given heap tuple (identified by ItemPointer) is being deleted.
 *
 * This function also deletes the tuples that were moved by a split to
 * another bucket.
 *
 * Result: a palloc'd struct containing statistical info for VACUUM displays.
 */
IndexBulkDeleteResult *
hashbulkdelete(IndexVacuumInfo *info, IndexBulkDeleteResult *stats,
			   IndexBulkDeleteCallback callback, void *callback_state)
{
	Relation	rel = info->index;
	double		tuples_removed;
	double		num_index_tuples;
	double		orig_ntuples;
	Bucket		orig_maxbucket;
	Bucket		cur_maxbucket;
	Bucket		cur_bucket;
	Buffer		metabuf = InvalidBuffer;
	HashMetaPage metap;
	HashMetaPage cachedmetap;

	tuples_removed = 0;
	num_index_tuples = 0;

	/*
	 * We need a copy of the metapage so that we can use its hashm_spares[]
	 * values to compute bucket page addresses, but a cached copy should be
	 * good enough.  (If not, we'll detect that further down and refresh the
	 * cache as necessary.)
	 */
	cachedmetap = _hash_getcachedmetap(rel, &metabuf, false);
	Assert(cachedmetap != NULL);

	orig_maxbucket = cachedmetap->hashm_maxbucket;
	orig_ntuples = cachedmetap->hashm_ntuples;

	/* Scan the buckets that we know exist */
	cur_bucket = 0;
	cur_maxbucket = orig_maxbucket;

loop_top:
	while (cur_bucket <= cur_maxbucket)
	{
		BlockNumber bucket_blkno;
		BlockNumber blkno;
		Buffer		bucket_buf;
		Buffer		buf;
		HashPageOpaque bucket_opaque;
		Page		page;
		bool		split_cleanup = false;

		/* Get address of bucket's start page */
		bucket_blkno = BUCKET_TO_BLKNO(cachedmetap, cur_bucket);

		blkno = bucket_blkno;

		/*
		 * We need to acquire a cleanup lock on the primary bucket page to
		 * wait out concurrent scans before deleting the dead tuples.
		 */
		buf = ReadBufferExtended(rel, MAIN_FORKNUM, blkno, RBM_NORMAL, info->strategy);
		LockBufferForCleanup(buf);
		_hash_checkpage(rel, buf, LH_BUCKET_PAGE);

		page = BufferGetPage(buf);
		bucket_opaque = (HashPageOpaque) PageGetSpecialPointer(page);

		/*
		 * If the bucket contains tuples that were moved by a split, then we
		 * need to delete them.  We can't do that while a split of this
		 * bucket is still in progress, because concurrent scans need those
		 * tuples.
		 */
		if (!H_BUCKET_BEING_SPLIT(bucket_opaque) &&
			H_NEEDS_SPLIT_CLEANUP(bucket_opaque))
		{
			split_cleanup = true;

			/*
			 * This bucket might have been split since we last held a lock on
			 * the metapage.  If so, hashm_maxbucket, hashm_highmask and
			 * hashm_lowmask might be old enough to cause us to fail to remove
			 * tuples left behind by the most recent split.  To prevent that,
			 * now that the primary page of the target bucket has been locked
			 * (and thus can't be further split), check whether we need to
			 * update our cached metapage data.
			 */
			Assert(bucket_opaque->hasho_prevblkno != InvalidBlockNumber);
			if (bucket_opaque->hasho_prevblkno > cachedmetap->hashm_maxbucket)
			{
				cachedmetap = _hash_getcachedmetap(rel, &metabuf, true);
				Assert(cachedmetap != NULL);
			}
		}

		bucket_buf = buf;

		hashbucketcleanup(rel, cur_bucket, bucket_buf, blkno, info->strategy,
						  cachedmetap->hashm_maxbucket,
						  cachedmetap->hashm_highmask,
						  cachedmetap->hashm_lowmask, &tuples_removed,
						  &num_index_tuples, split_cleanup,
						  callback, callback_state);

		_hash_dropbuf(rel, bucket_buf);

		/* Advance to next bucket */
		cur_bucket++;
	}

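	/*
	 * If the cached metapage copy sufficed above, we don't yet hold a pin on
	 * the metapage buffer; get one now (without locking the page), so we can
	 * lock and examine the live metapage below.
	 */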
	if (BufferIsInvalid(metabuf))
		metabuf = _hash_getbuf(rel, HASH_METAPAGE, HASH_NOLOCK, LH_META_PAGE);

	/* Write-lock metapage and check for split since we started */
	LockBuffer(metabuf, BUFFER_LOCK_EXCLUSIVE);
	metap = HashPageGetMeta(BufferGetPage(metabuf));

	if (cur_maxbucket != metap->hashm_maxbucket)
	{
		/* There's been a split, so process the additional bucket(s) */
		LockBuffer(metabuf, BUFFER_LOCK_UNLOCK);
		cachedmetap = _hash_getcachedmetap(rel, &metabuf, true);
		Assert(cachedmetap != NULL);
		cur_maxbucket = cachedmetap->hashm_maxbucket;
		goto loop_top;
	}

	/* Okay, we're really done.  Update tuple count in metapage. */
	START_CRIT_SECTION();

	if (orig_maxbucket == metap->hashm_maxbucket &&
		orig_ntuples == metap->hashm_ntuples)
	{
		/*
		 * No one has split or inserted anything since start of scan, so
		 * believe our count as gospel.
		 */
		metap->hashm_ntuples = num_index_tuples;
	}
	else
	{
		/*
		 * Otherwise, our count is untrustworthy since we may have
		 * double-scanned tuples in split buckets.  Proceed by dead-reckoning.
		 * (Note: we still return estimated_count = false, because using this
		 * count is better than not updating reltuples at all.)
		 */
		if (metap->hashm_ntuples > tuples_removed)
			metap->hashm_ntuples -= tuples_removed;
		else
			metap->hashm_ntuples = 0;
		num_index_tuples = metap->hashm_ntuples;
	}

	MarkBufferDirty(metabuf);

	/* XLOG stuff */
	if (RelationNeedsWAL(rel))
	{
		xl_hash_update_meta_page xlrec;
		XLogRecPtr	recptr;

		xlrec.ntuples = metap->hashm_ntuples;

		XLogBeginInsert();
		XLogRegisterData((char *) &xlrec, SizeOfHashUpdateMetaPage);

		XLogRegisterBuffer(0, metabuf, REGBUF_STANDARD);

		recptr = XLogInsert(RM_HASH_ID, XLOG_HASH_UPDATE_META_PAGE);
		PageSetLSN(BufferGetPage(metabuf), recptr);
	}

	END_CRIT_SECTION();

	_hash_relbuf(rel, metabuf);

	/* return statistics */
	if (stats == NULL)
		stats = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
	stats->estimated_count = false;
	stats->num_index_tuples = num_index_tuples;
	stats->tuples_removed += tuples_removed;
	/* hashvacuumcleanup will fill in num_pages */

	return stats;
}

/*
 * Post-VACUUM cleanup.
 *
 * Result: a palloc'd struct containing statistical info for VACUUM displays.
 */
IndexBulkDeleteResult *
hashvacuumcleanup(IndexVacuumInfo *info, IndexBulkDeleteResult *stats)
{
	Relation	rel = info->index;
	BlockNumber num_pages;

	/* If hashbulkdelete wasn't called, return NULL signifying no change */
	/* Note: this covers the analyze_only case too */
	if (stats == NULL)
		return NULL;

	/* update statistics */
	num_pages = RelationGetNumberOfBlocks(rel);
	stats->num_pages = num_pages;

	return stats;
}

/*
 * Helper function to perform deletion of index entries from a bucket.
 *
 * This function expects that the caller has acquired a cleanup lock on the
 * primary bucket page, and will return with a write lock again held on the
 * primary bucket page.  The lock won't necessarily be held continuously,
 * though, because we'll release it when visiting overflow pages.
 *
 * There can't be any concurrent scans in progress when we first enter this
 * function because of the cleanup lock we hold on the primary bucket page,
 * but as soon as we release that lock, there might be.  If those scans got
 * ahead of our cleanup scan, they might see a tuple before we kill it and
 * wake up only after VACUUM has completed and the TID has been recycled for
 * an unrelated tuple.  To avoid that calamity, we prevent scans from passing
 * our cleanup scan by locking the next page in the bucket chain before
 * releasing the lock on the previous page.  (This type of lock chaining is
 * not ideal, so we might want to look for a better solution at some point.)
 *
 * We need to retain a pin on the primary bucket to ensure that no concurrent
 * split can start.
 */
void
hashbucketcleanup(Relation rel, Bucket cur_bucket, Buffer bucket_buf,
				  BlockNumber bucket_blkno, BufferAccessStrategy bstrategy,
				  uint32 maxbucket, uint32 highmask, uint32 lowmask,
				  double *tuples_removed, double *num_index_tuples,
				  bool split_cleanup,
				  IndexBulkDeleteCallback callback, void *callback_state)
{
	BlockNumber blkno;
	Buffer		buf;
	Bucket		new_bucket PG_USED_FOR_ASSERTS_ONLY = InvalidBucket;
	bool		bucket_dirty = false;

	blkno = bucket_blkno;
	buf = bucket_buf;

	if (split_cleanup)
		new_bucket = _hash_get_newbucket_from_oldbucket(rel, cur_bucket,
														lowmask, maxbucket);

	/* Scan each page in bucket */
	for (;;)
	{
		HashPageOpaque opaque;
		OffsetNumber offno;
		OffsetNumber maxoffno;
		Buffer		next_buf;
		Page		page;
		OffsetNumber deletable[MaxOffsetNumber];
		int			ndeletable = 0;
		bool		retain_pin = false;
		bool		clear_dead_marking = false;

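		/* Check for interrupts and apply any cost-based vacuum delay */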
		vacuum_delay_point();

		page = BufferGetPage(buf);
		opaque = (HashPageOpaque) PageGetSpecialPointer(page);

		/* Scan each tuple in page */
		maxoffno = PageGetMaxOffsetNumber(page);
		for (offno = FirstOffsetNumber;
			 offno <= maxoffno;
			 offno = OffsetNumberNext(offno))
		{
			ItemPointer htup;
			IndexTuple	itup;
			Bucket		bucket;
			bool		kill_tuple = false;

			itup = (IndexTuple) PageGetItem(page,
											PageGetItemId(page, offno));
			htup = &(itup->t_tid);

			/*
			 * To remove dead tuples, we rely strictly on the results of the
			 * callback function.  See btvacuumpage for the detailed reason.
			 */
			if (callback && callback(htup, callback_state))
			{
				kill_tuple = true;
				if (tuples_removed)
					*tuples_removed += 1;
			}
			else if (split_cleanup)
			{
				/* delete the tuples that are moved by split. */
				bucket = _hash_hashkey2bucket(_hash_get_indextuple_hashkey(itup),
											  maxbucket,
											  highmask,
											  lowmask);
				/* mark the item for deletion */
				if (bucket != cur_bucket)
				{
					/*
					 * We expect tuples to either belong to the current bucket
					 * or to new_bucket.  This is ensured because we don't
					 * allow further splits from a bucket that contains
					 * garbage.  See comments in _hash_expandtable.
					 */
					Assert(bucket == new_bucket);
					kill_tuple = true;
				}
			}

			if (kill_tuple)
			{
				/* mark the item for deletion */
				deletable[ndeletable++] = offno;
			}
			else
			{
				/* we're keeping it, so count it */
				if (num_index_tuples)
					*num_index_tuples += 1;
			}
		}

		/* retain the pin on primary bucket page till end of bucket scan */
		if (blkno == bucket_blkno)
			retain_pin = true;
		else
			retain_pin = false;

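		/*
		 * Remember the next page in the chain; we'll advance to it after
		 * applying any deletions to the current page.
		 */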
		blkno = opaque->hasho_nextblkno;

		/*
		 * Apply deletions, advance to next page and write page if needed.
		 */
		if (ndeletable > 0)
		{
			/* No ereport(ERROR) until changes are logged */
			START_CRIT_SECTION();

			PageIndexMultiDelete(page, deletable, ndeletable);
			bucket_dirty = true;

			/*
			 * If vacuum removed DEAD tuples and this page is flagged as
			 * containing some, mark the page clean by clearing the
			 * LH_PAGE_HAS_DEAD_TUPLES flag.
			 */
			if (tuples_removed && *tuples_removed > 0 &&
				H_HAS_DEAD_TUPLES(opaque))
			{
				opaque->hasho_flag &= ~LH_PAGE_HAS_DEAD_TUPLES;
				clear_dead_marking = true;
			}

			MarkBufferDirty(buf);

			/* XLOG stuff */
			if (RelationNeedsWAL(rel))
			{
				xl_hash_delete xlrec;
				XLogRecPtr	recptr;

				xlrec.clear_dead_marking = clear_dead_marking;
				xlrec.is_primary_bucket_page = (buf == bucket_buf);

				XLogBeginInsert();
				XLogRegisterData((char *) &xlrec, SizeOfHashDelete);

				/*
				 * The bucket buffer must be registered to ensure that we can
				 * acquire a cleanup lock on it during replay.
				 */
				if (!xlrec.is_primary_bucket_page)
					XLogRegisterBuffer(0, bucket_buf, REGBUF_STANDARD | REGBUF_NO_IMAGE);

				XLogRegisterBuffer(1, buf, REGBUF_STANDARD);
				XLogRegisterBufData(1, (char *) deletable,
									ndeletable * sizeof(OffsetNumber));

				recptr = XLogInsert(RM_HASH_ID, XLOG_HASH_DELETE);
				PageSetLSN(BufferGetPage(buf), recptr);
			}

			END_CRIT_SECTION();
		}

		/* bail out if there are no more pages to scan. */
		if (!BlockNumberIsValid(blkno))
			break;

		next_buf = _hash_getbuf_with_strategy(rel, blkno, HASH_WRITE,
											  LH_OVERFLOW_PAGE,
											  bstrategy);

		/*
		 * release the lock on previous page after acquiring the lock on next
		 * page
		 */
		if (retain_pin)
			LockBuffer(buf, BUFFER_LOCK_UNLOCK);
		else
			_hash_relbuf(rel, buf);

		buf = next_buf;
	}

	/*
	 * Lock the bucket page to clear the garbage flag and squeeze the bucket.
	 * If the current buffer is the same as the bucket buffer, then we
	 * already hold a lock on the bucket page.
	 */
	if (buf != bucket_buf)
	{
		_hash_relbuf(rel, buf);
		LockBuffer(bucket_buf, BUFFER_LOCK_EXCLUSIVE);
	}

	/*
	 * Clear the garbage flag from the bucket after deleting the tuples that
	 * were moved by a split.  We purposely clear the flag before squeezing
	 * the bucket, so that after a restart, vacuum won't again try to delete
	 * the moved-by-split tuples.
	 */
	if (split_cleanup)
	{
		HashPageOpaque bucket_opaque;
		Page		page;

		page = BufferGetPage(bucket_buf);
		bucket_opaque = (HashPageOpaque) PageGetSpecialPointer(page);

		/* No ereport(ERROR) until changes are logged */
		START_CRIT_SECTION();

		bucket_opaque->hasho_flag &= ~LH_BUCKET_NEEDS_SPLIT_CLEANUP;
		MarkBufferDirty(bucket_buf);

		/* XLOG stuff */
		if (RelationNeedsWAL(rel))
		{
			XLogRecPtr	recptr;

			XLogBeginInsert();
			XLogRegisterBuffer(0, bucket_buf, REGBUF_STANDARD);

			recptr = XLogInsert(RM_HASH_ID, XLOG_HASH_SPLIT_CLEANUP);
			PageSetLSN(page, recptr);
		}

		END_CRIT_SECTION();
	}

	/*
	 * If we have deleted anything, try to compact free space.  To squeeze
	 * the bucket we must have a cleanup lock; otherwise the squeeze could
	 * disturb the ordering of tuples seen by a scan that started before it.
	 */
	if (bucket_dirty && IsBufferCleanupOK(bucket_buf))
		_hash_squeezebucket(rel, cur_bucket, bucket_blkno, bucket_buf,
							bstrategy);
	else
		LockBuffer(bucket_buf, BUFFER_LOCK_UNLOCK);
}