/*-------------------------------------------------------------------------
 *
 * hash.c
 *	  Implementation of Margo Seltzer's Hashing package for postgres.
 *
 * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *	  src/backend/access/hash/hash.c
 *
 * NOTES
 *	  This file contains only the public interface routines.
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include "access/hash.h"
#include "access/relscan.h"
#include "catalog/index.h"
#include "commands/vacuum.h"
#include "miscadmin.h"
#include "optimizer/plancat.h"
#include "utils/index_selfuncs.h"
#include "utils/rel.h"


/* Working state for hashbuild and its callback */
typedef struct
{
	HSpool	   *spool;			/* NULL if not using spooling */
	double		indtuples;		/* # tuples accepted into index */
} HashBuildState;

static void hashbuildCallback(Relation index,
				  HeapTuple htup,
				  Datum *values,
				  bool *isnull,
				  bool tupleIsAlive,
				  void *state);


/*
 * Hash handler function: return IndexAmRoutine with access method parameters
 * and callbacks.
 */
Datum
hashhandler(PG_FUNCTION_ARGS)
{
	IndexAmRoutine *amroutine = makeNode(IndexAmRoutine);

	amroutine->amstrategies = HTMaxStrategyNumber;
	amroutine->amsupport = HASHNProcs;
	amroutine->amcanorder = false;
	amroutine->amcanorderbyop = false;
	amroutine->amcanbackward = true;
	amroutine->amcanunique = false;
	amroutine->amcanmulticol = false;
	amroutine->amoptionalkey = false;
	amroutine->amsearcharray = false;
	amroutine->amsearchnulls = false;
	amroutine->amstorage = false;
	amroutine->amclusterable = false;
	amroutine->ampredlocks = false;
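	/* the index stores 32-bit hash codes, not the original key values */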
	amroutine->amkeytype = INT4OID;

	amroutine->ambuild = hashbuild;
	amroutine->ambuildempty = hashbuildempty;
	amroutine->aminsert = hashinsert;
	amroutine->ambulkdelete = hashbulkdelete;
	amroutine->amvacuumcleanup = hashvacuumcleanup;
	amroutine->amcanreturn = NULL;
	amroutine->amcostestimate = hashcostestimate;
	amroutine->amoptions = hashoptions;
	amroutine->amproperty = NULL;
	amroutine->amvalidate = hashvalidate;
	amroutine->ambeginscan = hashbeginscan;
	amroutine->amrescan = hashrescan;
	amroutine->amgettuple = hashgettuple;
	amroutine->amgetbitmap = hashgetbitmap;
	amroutine->amendscan = hashendscan;
	amroutine->ammarkpos = NULL;
	amroutine->amrestrpos = NULL;

	PG_RETURN_POINTER(amroutine);
}

/*
 *	hashbuild() -- build a new hash index.
 */
IndexBuildResult *
hashbuild(Relation heap, Relation index, IndexInfo *indexInfo)
{
	IndexBuildResult *result;
	BlockNumber relpages;
	double		reltuples;
	double		allvisfrac;
	uint32		num_buckets;
	long		sort_threshold;
	HashBuildState buildstate;

	/*
	 * We expect to be called exactly once for any index relation. If that's
	 * not the case, big trouble's what we have.
	 */
	if (RelationGetNumberOfBlocks(index) != 0)
		elog(ERROR, "index \"%s\" already contains data",
			 RelationGetRelationName(index));

	/* Estimate the number of rows currently present in the table */
	estimate_rel_size(heap, NULL, &relpages, &reltuples, &allvisfrac);

	/* Initialize the hash index metadata page and initial buckets */
	num_buckets = _hash_metapinit(index, reltuples, MAIN_FORKNUM);

	/*
	 * If we just insert the tuples into the index in scan order, then
	 * (assuming their hash codes are pretty random) there will be no locality
	 * of access to the index, and if the index is bigger than available RAM
	 * then we'll thrash horribly.  To prevent that scenario, we can sort the
	 * tuples by (expected) bucket number.  However, such a sort is useless
	 * overhead when the index does fit in RAM.  We choose to sort if the
	 * initial index size exceeds maintenance_work_mem, or the number of
	 * buffers usable for the index, whichever is less.  (Limiting by the
	 * number of buffers should reduce thrashing between PG buffers and kernel
	 * buffers, which seems useful even if no physical I/O results.  Limiting
	 * by maintenance_work_mem is useful to allow easy testing of the sort
	 * code path, and may be useful to DBAs as an additional control knob.)
	 *
	 * NOTE: this test will need adjustment if a bucket is ever different from
	 * one page.  Also, "initial index size" accounting does not include the
	 * metapage, nor the first bitmap page.
	 */
	sort_threshold = (maintenance_work_mem * 1024L) / BLCKSZ;
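	/* temporary indexes live in backend-local buffers, so cap by that pool */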
	if (index->rd_rel->relpersistence != RELPERSISTENCE_TEMP)
		sort_threshold = Min(sort_threshold, NBuffers);
	else
		sort_threshold = Min(sort_threshold, NLocBuffer);

	if (num_buckets >= (uint32) sort_threshold)
		buildstate.spool = _h_spoolinit(heap, index, num_buckets);
	else
		buildstate.spool = NULL;

	/* prepare to build the index */
	buildstate.indtuples = 0;

	/* do the heap scan */
	reltuples = IndexBuildHeapScan(heap, index, indexInfo, true,
								   hashbuildCallback, (void *) &buildstate);

	if (buildstate.spool)
	{
		/* sort the tuples and insert them into the index */
		_h_indexbuild(buildstate.spool);
		_h_spooldestroy(buildstate.spool);
	}

	/*
	 * Return statistics
	 */
	result = (IndexBuildResult *) palloc(sizeof(IndexBuildResult));

	result->heap_tuples = reltuples;
	result->index_tuples = buildstate.indtuples;

	return result;
}

/*
 *	hashbuildempty() -- build an empty hash index in the initialization fork
 */
void
hashbuildempty(Relation index)
{
	_hash_metapinit(index, 0, INIT_FORKNUM);
}

/*
 * Per-tuple callback from IndexBuildHeapScan
 */
static void
hashbuildCallback(Relation index,
				  HeapTuple htup,
				  Datum *values,
				  bool *isnull,
				  bool tupleIsAlive,
				  void *state)
{
	HashBuildState *buildstate = (HashBuildState *) state;
	Datum		index_values[1];
	bool		index_isnull[1];
	IndexTuple	itup;

	/* convert data to a hash key; on failure, do not insert anything */
	if (!_hash_convert_tuple(index,
							 values, isnull,
							 index_values, index_isnull))
		return;

	/* Either spool the tuple for sorting, or just put it into the index */
	if (buildstate->spool)
		_h_spool(buildstate->spool, &htup->t_self,
				 index_values, index_isnull);
	else
	{
		/* form an index tuple and point it at the heap tuple */
		itup = index_form_tuple(RelationGetDescr(index),
								index_values, index_isnull);
		itup->t_tid = htup->t_self;
		_hash_doinsert(index, itup);
		pfree(itup);
	}

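	/* count the tuple whether it was spooled or inserted directly */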
	buildstate->indtuples += 1;
}

/*
 *	hashinsert() -- insert an index tuple into a hash table.
 *
 *	Hash on the heap tuple's key, form an index tuple with hash code.
 *	Find the appropriate location for the new tuple, and put it there.
 */
bool
hashinsert(Relation rel, Datum *values, bool *isnull,
		   ItemPointer ht_ctid, Relation heapRel,
		   IndexUniqueCheck checkUnique)
{
	Datum		index_values[1];
	bool		index_isnull[1];
	IndexTuple	itup;

	/* convert data to a hash key; on failure, do not insert anything */
	if (!_hash_convert_tuple(rel,
							 values, isnull,
							 index_values, index_isnull))
		return false;

	/* form an index tuple and point it at the heap tuple */
	itup = index_form_tuple(RelationGetDescr(rel), index_values, index_isnull);
	itup->t_tid = *ht_ctid;

	_hash_doinsert(rel, itup);

	pfree(itup);

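	/*
	 * The aminsert result is consulted only for UNIQUE_CHECK_PARTIAL, which
	 * hash indexes never use (they don't support uniqueness), so always
	 * report false.
	 */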
	return false;
}


/*
 *	hashgettuple() -- Get the next tuple in the scan.
 */
bool
hashgettuple(IndexScanDesc scan, ScanDirection dir)
{
	HashScanOpaque so = (HashScanOpaque) scan->opaque;
	Relation	rel = scan->indexRelation;
	Buffer		buf;
	Page		page;
	OffsetNumber offnum;
	ItemPointer current;
	bool		res;

	/* Hash indexes are always lossy since we store only the hash code */
	scan->xs_recheck = true;

	/*
	 * We hold pin but not lock on current buffer while outside the hash AM.
	 * Reacquire the read lock here.
	 */
	if (BufferIsValid(so->hashso_curbuf))
		_hash_chgbufaccess(rel, so->hashso_curbuf, HASH_NOLOCK, HASH_READ);

	/*
	 * If we've already initialized this scan, we can just advance it in the
	 * appropriate direction.  If we haven't done so yet, we call a routine to
	 * get the first item in the scan.
	 */
	current = &(so->hashso_curpos);
	if (ItemPointerIsValid(current))
	{
		/*
		 * An insertion into the current index page could have happened while
		 * we didn't have read lock on it.  Re-find our position by looking
		 * for the TID we previously returned.  (Because we hold share lock on
		 * the bucket, no deletions or splits could have occurred; therefore
		 * we can expect that the TID still exists in the current index page,
		 * at an offset >= where we were.)
		 */
		OffsetNumber maxoffnum;

		buf = so->hashso_curbuf;
		Assert(BufferIsValid(buf));
		page = BufferGetPage(buf);
		maxoffnum = PageGetMaxOffsetNumber(page);
		for (offnum = ItemPointerGetOffsetNumber(current);
			 offnum <= maxoffnum;
			 offnum = OffsetNumberNext(offnum))
		{
			IndexTuple	itup;

			itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, offnum));
			if (ItemPointerEquals(&(so->hashso_heappos), &(itup->t_tid)))
				break;
		}
		if (offnum > maxoffnum)
			elog(ERROR, "failed to re-find scan position within index \"%s\"",
				 RelationGetRelationName(rel));
		ItemPointerSetOffsetNumber(current, offnum);

		/*
		 * Check to see if we should kill the previously-fetched tuple.
		 */
		if (scan->kill_prior_tuple)
		{
			/*
			 * Yes, so mark it by setting the LP_DEAD state in the item flags.
			 */
			ItemIdMarkDead(PageGetItemId(page, offnum));

			/*
			 * Since this can be redone later if needed, mark as a hint.
			 */
			MarkBufferDirtyHint(buf, true);
		}

		/*
		 * Now continue the scan.
		 */
		res = _hash_next(scan, dir);
	}
	else
		res = _hash_first(scan, dir);

	/*
	 * Skip killed tuples if asked to.
	 */
	if (scan->ignore_killed_tuples)
	{
		while (res)
		{
			offnum = ItemPointerGetOffsetNumber(current);
			page = BufferGetPage(so->hashso_curbuf);
			if (!ItemIdIsDead(PageGetItemId(page, offnum)))
				break;
			res = _hash_next(scan, dir);
		}
	}

	/* Release read lock on current buffer, but keep it pinned */
	if (BufferIsValid(so->hashso_curbuf))
		_hash_chgbufaccess(rel, so->hashso_curbuf, HASH_READ, HASH_NOLOCK);

	/* Return current heap TID on success */
	scan->xs_ctup.t_self = so->hashso_heappos;

	return res;
}


/*
 *	hashgetbitmap() -- get all tuples at once
 */
int64
hashgetbitmap(IndexScanDesc scan, TIDBitmap *tbm)
{
	HashScanOpaque so = (HashScanOpaque) scan->opaque;
	bool		res;
	int64		ntids = 0;

	res = _hash_first(scan, ForwardScanDirection);

	while (res)
	{
		bool		add_tuple;

		/*
		 * Skip killed tuples if asked to.
		 */
		if (scan->ignore_killed_tuples)
		{
			Page		page;
			OffsetNumber offnum;

			offnum = ItemPointerGetOffsetNumber(&(so->hashso_curpos));
			page = BufferGetPage(so->hashso_curbuf);
			add_tuple = !ItemIdIsDead(PageGetItemId(page, offnum));
		}
		else
			add_tuple = true;

		/* Save tuple ID, and continue scanning */
		if (add_tuple)
		{
			/* Note we mark the tuple ID as requiring recheck */
			tbm_add_tuples(tbm, &(so->hashso_heappos), 1, true);
			ntids++;
		}

		res = _hash_next(scan, ForwardScanDirection);
	}

	return ntids;
}


/*
 *	hashbeginscan() -- start a scan on a hash index
 */
IndexScanDesc
hashbeginscan(Relation rel, int nkeys, int norderbys)
{
	IndexScanDesc scan;
	HashScanOpaque so;

	/* no order by operators allowed */
	Assert(norderbys == 0);

	scan = RelationGetIndexScan(rel, nkeys, norderbys);

	so = (HashScanOpaque) palloc(sizeof(HashScanOpaqueData));
	so->hashso_bucket_valid = false;
	so->hashso_bucket_blkno = 0;
	so->hashso_curbuf = InvalidBuffer;
	/* set position invalid (this will cause _hash_first call) */
	ItemPointerSetInvalid(&(so->hashso_curpos));
	ItemPointerSetInvalid(&(so->hashso_heappos));

	scan->opaque = so;

	/* register scan in case we change pages it's using */
	_hash_regscan(scan);

	return scan;
}

/*
 *	hashrescan() -- rescan an index relation
 */
void
hashrescan(IndexScanDesc scan, ScanKey scankey, int nscankeys,
		   ScanKey orderbys, int norderbys)
{
	HashScanOpaque so = (HashScanOpaque) scan->opaque;
	Relation	rel = scan->indexRelation;

	/* release any pin we still hold */
	if (BufferIsValid(so->hashso_curbuf))
		_hash_dropbuf(rel, so->hashso_curbuf);
	so->hashso_curbuf = InvalidBuffer;

	/* release lock on bucket, too */
	if (so->hashso_bucket_blkno)
		_hash_droplock(rel, so->hashso_bucket_blkno, HASH_SHARE);
	so->hashso_bucket_blkno = 0;

	/* set position invalid (this will cause _hash_first call) */
	ItemPointerSetInvalid(&(so->hashso_curpos));
	ItemPointerSetInvalid(&(so->hashso_heappos));

	/* Update scan key, if a new one is given */
	if (scankey && scan->numberOfKeys > 0)
	{
		memmove(scan->keyData,
				scankey,
				scan->numberOfKeys * sizeof(ScanKeyData));
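		/* the target bucket depends on the scan key, so force recomputation */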
		so->hashso_bucket_valid = false;
	}
}

/*
 *	hashendscan() -- close down a scan
 */
void
hashendscan(IndexScanDesc scan)
{
	HashScanOpaque so = (HashScanOpaque) scan->opaque;
	Relation	rel = scan->indexRelation;

	/* don't need scan registered anymore */
	_hash_dropscan(scan);

	/* release any pin we still hold */
	if (BufferIsValid(so->hashso_curbuf))
		_hash_dropbuf(rel, so->hashso_curbuf);
	so->hashso_curbuf = InvalidBuffer;

	/* release lock on bucket, too */
	if (so->hashso_bucket_blkno)
		_hash_droplock(rel, so->hashso_bucket_blkno, HASH_SHARE);
	so->hashso_bucket_blkno = 0;

	pfree(so);
	scan->opaque = NULL;
}

/*
 * Bulk deletion of all index entries pointing to a set of heap tuples.
 * The set of target tuples is specified via a callback routine that tells
 * whether any given heap tuple (identified by ItemPointer) is being deleted.
 *
 * Result: a palloc'd struct containing statistical info for VACUUM displays.
 */
IndexBulkDeleteResult *
hashbulkdelete(IndexVacuumInfo *info, IndexBulkDeleteResult *stats,
			   IndexBulkDeleteCallback callback, void *callback_state)
{
	Relation	rel = info->index;
	double		tuples_removed;
	double		num_index_tuples;
	double		orig_ntuples;
	Bucket		orig_maxbucket;
	Bucket		cur_maxbucket;
	Bucket		cur_bucket;
	Buffer		metabuf;
	HashMetaPage metap;
	HashMetaPageData local_metapage;

	tuples_removed = 0;
	num_index_tuples = 0;

	/*
	 * Read the metapage to fetch original bucket and tuple counts.  Also, we
	 * keep a copy of the last-seen metapage so that we can use its
	 * hashm_spares[] values to compute bucket page addresses.  This is a bit
	 * hokey but perfectly safe, since the interesting entries in the spares
	 * array cannot change under us; and it beats rereading the metapage for
	 * each bucket.
	 */
	metabuf = _hash_getbuf(rel, HASH_METAPAGE, HASH_READ, LH_META_PAGE);
	metap = HashPageGetMeta(BufferGetPage(metabuf));
	orig_maxbucket = metap->hashm_maxbucket;
	orig_ntuples = metap->hashm_ntuples;
	memcpy(&local_metapage, metap, sizeof(local_metapage));
	_hash_relbuf(rel, metabuf);

	/* Scan the buckets that we know exist */
	cur_bucket = 0;
	cur_maxbucket = orig_maxbucket;

loop_top:
	while (cur_bucket <= cur_maxbucket)
	{
		BlockNumber bucket_blkno;
		BlockNumber blkno;
		bool		bucket_dirty = false;

		/* Get address of bucket's start page */
		bucket_blkno = BUCKET_TO_BLKNO(&local_metapage, cur_bucket);

		/* Exclusive-lock the bucket so we can shrink it */
		_hash_getlock(rel, bucket_blkno, HASH_EXCLUSIVE);

		/* Shouldn't have any active scans locally, either */
		if (_hash_has_active_scan(rel, cur_bucket))
			elog(ERROR, "hash index has active scan during VACUUM");

		/* Scan each page in bucket */
		blkno = bucket_blkno;
		while (BlockNumberIsValid(blkno))
		{
			Buffer		buf;
			Page		page;
			HashPageOpaque opaque;
			OffsetNumber offno;
			OffsetNumber maxoffno;
			OffsetNumber deletable[MaxOffsetNumber];
			int			ndeletable = 0;

			vacuum_delay_point();

			buf = _hash_getbuf_with_strategy(rel, blkno, HASH_WRITE,
										   LH_BUCKET_PAGE | LH_OVERFLOW_PAGE,
											 info->strategy);
			page = BufferGetPage(buf);
			opaque = (HashPageOpaque) PageGetSpecialPointer(page);
			Assert(opaque->hasho_bucket == cur_bucket);

			/* Scan each tuple in page */
			maxoffno = PageGetMaxOffsetNumber(page);
			for (offno = FirstOffsetNumber;
				 offno <= maxoffno;
				 offno = OffsetNumberNext(offno))
			{
				IndexTuple	itup;
				ItemPointer htup;

				itup = (IndexTuple) PageGetItem(page,
												PageGetItemId(page, offno));
				htup = &(itup->t_tid);
				if (callback(htup, callback_state))
				{
					/* mark the item for deletion */
					deletable[ndeletable++] = offno;
					tuples_removed += 1;
				}
				else
					num_index_tuples += 1;
			}

			/*
			 * Apply deletions and write page if needed, advance to next page.
			 */
			blkno = opaque->hasho_nextblkno;

			if (ndeletable > 0)
			{
				PageIndexMultiDelete(page, deletable, ndeletable);
				_hash_wrtbuf(rel, buf);
				bucket_dirty = true;
			}
			else
				_hash_relbuf(rel, buf);
		}

		/* If we deleted anything, try to compact free space */
		if (bucket_dirty)
			_hash_squeezebucket(rel, cur_bucket, bucket_blkno,
								info->strategy);

		/* Release bucket lock */
		_hash_droplock(rel, bucket_blkno, HASH_EXCLUSIVE);

		/* Advance to next bucket */
		cur_bucket++;
	}

	/* Write-lock metapage and check for split since we started */
	metabuf = _hash_getbuf(rel, HASH_METAPAGE, HASH_WRITE, LH_META_PAGE);
	metap = HashPageGetMeta(BufferGetPage(metabuf));

	if (cur_maxbucket != metap->hashm_maxbucket)
	{
		/* There's been a split, so process the additional bucket(s) */
		cur_maxbucket = metap->hashm_maxbucket;
		memcpy(&local_metapage, metap, sizeof(local_metapage));
		_hash_relbuf(rel, metabuf);
		goto loop_top;
	}

	/* Okay, we're really done.  Update tuple count in metapage. */

	if (orig_maxbucket == metap->hashm_maxbucket &&
		orig_ntuples == metap->hashm_ntuples)
	{
		/*
		 * No one has split or inserted anything since start of scan, so
		 * believe our count as gospel.
		 */
		metap->hashm_ntuples = num_index_tuples;
	}
	else
	{
		/*
		 * Otherwise, our count is untrustworthy since we may have
		 * double-scanned tuples in split buckets.  Proceed by dead-reckoning.
		 * (Note: we still return estimated_count = false, because using this
		 * count is better than not updating reltuples at all.)
		 */
		if (metap->hashm_ntuples > tuples_removed)
			metap->hashm_ntuples -= tuples_removed;
		else
			metap->hashm_ntuples = 0;
		num_index_tuples = metap->hashm_ntuples;
	}

	_hash_wrtbuf(rel, metabuf);

	/* return statistics */
	if (stats == NULL)
		stats = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
	stats->estimated_count = false;
	stats->num_index_tuples = num_index_tuples;
	stats->tuples_removed += tuples_removed;
	/* hashvacuumcleanup will fill in num_pages */

	return stats;
}

/*
 * Post-VACUUM cleanup.
 *
 * Result: a palloc'd struct containing statistical info for VACUUM displays.
 */
IndexBulkDeleteResult *
hashvacuumcleanup(IndexVacuumInfo *info, IndexBulkDeleteResult *stats)
{
	Relation	rel = info->index;
	BlockNumber num_pages;

	/* If hashbulkdelete wasn't called, return NULL signifying no change */
	/* Note: this covers the analyze_only case too */
	if (stats == NULL)
		return NULL;

	/* update statistics */
	num_pages = RelationGetNumberOfBlocks(rel);
	stats->num_pages = num_pages;

	return stats;
}


void
hash_redo(XLogReaderState *record)
{
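	/*
	 * Hash indexes are not WAL-logged in this release, so no hash WAL
	 * records should ever exist; this function should never be reached.
	 */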
	elog(PANIC, "hash_redo: unimplemented");
}