/*-------------------------------------------------------------------------
 *
 * hash.c
 *    Implementation of Margo Seltzer's Hashing package for postgres.
 *
 * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *    src/backend/access/hash/hash.c
 *
 * NOTES
 *    This file contains only the public interface routines.
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include "access/hash.h"
#include "access/hash_xlog.h"
#include "access/relscan.h"
#include "catalog/index.h"
#include "commands/vacuum.h"
#include "miscadmin.h"
#include "optimizer/plancat.h"
#include "utils/builtins.h"
#include "utils/index_selfuncs.h"
#include "utils/rel.h"


/* Working state for hashbuild and its callback */
typedef struct
{
    HSpool     *spool;          /* NULL if not using spooling */
    double      indtuples;      /* # tuples accepted into index */
    Relation    heapRel;        /* heap relation descriptor */
} HashBuildState;

static void hashbuildCallback(Relation index,
                              HeapTuple htup,
                              Datum *values,
                              bool *isnull,
                              bool tupleIsAlive,
                              void *state);


/*
 * Hash handler function: return IndexAmRoutine with access method parameters
 * and callbacks.
 */
Datum
hashhandler(PG_FUNCTION_ARGS)
{
    IndexAmRoutine *amroutine = makeNode(IndexAmRoutine);

    amroutine->amstrategies = HTMaxStrategyNumber;
    amroutine->amsupport = HASHNProcs;
    amroutine->amcanorder = false;
    amroutine->amcanorderbyop = false;
    amroutine->amcanbackward = true;
    amroutine->amcanunique = false;
    amroutine->amcanmulticol = false;
    amroutine->amoptionalkey = false;
    amroutine->amsearcharray = false;
    amroutine->amsearchnulls = false;
    amroutine->amstorage = false;
    amroutine->amclusterable = false;
    amroutine->ampredlocks = true;
    amroutine->amcanparallel = false;
    amroutine->amcaninclude = false;
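    /*
     * Hash index tuples store only the uint32 hash code of the indexed
     * column, not the column value itself, which is why the key type is
     * reported as int4.
     */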
    amroutine->amkeytype = INT4OID;

    amroutine->ambuild = hashbuild;
    amroutine->ambuildempty = hashbuildempty;
    amroutine->aminsert = hashinsert;
    amroutine->ambulkdelete = hashbulkdelete;
    amroutine->amvacuumcleanup = hashvacuumcleanup;
    amroutine->amcanreturn = NULL;
    amroutine->amcostestimate = hashcostestimate;
    amroutine->amoptions = hashoptions;
    amroutine->amproperty = NULL;
    amroutine->amvalidate = hashvalidate;
    amroutine->ambeginscan = hashbeginscan;
    amroutine->amrescan = hashrescan;
    amroutine->amgettuple = hashgettuple;
    amroutine->amgetbitmap = hashgetbitmap;
    amroutine->amendscan = hashendscan;
    amroutine->ammarkpos = NULL;
    amroutine->amrestrpos = NULL;
    amroutine->amestimateparallelscan = NULL;
    amroutine->aminitparallelscan = NULL;
    amroutine->amparallelrescan = NULL;

    PG_RETURN_POINTER(amroutine);
}

/*
 * hashbuild() -- build a new hash index.
 */
IndexBuildResult *
hashbuild(Relation heap, Relation index, IndexInfo *indexInfo)
{
    IndexBuildResult *result;
    BlockNumber relpages;
    double      reltuples;
    double      allvisfrac;
    uint32      num_buckets;
    long        sort_threshold;
    HashBuildState buildstate;

    /*
     * We expect to be called exactly once for any index relation. If that's
     * not the case, big trouble's what we have.
     */
    if (RelationGetNumberOfBlocks(index) != 0)
        elog(ERROR, "index \"%s\" already contains data",
             RelationGetRelationName(index));

    /* Estimate the number of rows currently present in the table */
    estimate_rel_size(heap, NULL, &relpages, &reltuples, &allvisfrac);

    /* Initialize the hash index metadata page and initial buckets */
    num_buckets = _hash_init(index, reltuples, MAIN_FORKNUM);

    /*
     * If we just insert the tuples into the index in scan order, then
     * (assuming their hash codes are pretty random) there will be no locality
     * of access to the index, and if the index is bigger than available RAM
     * then we'll thrash horribly.  To prevent that scenario, we can sort the
     * tuples by (expected) bucket number.  However, such a sort is useless
     * overhead when the index does fit in RAM.  We choose to sort if the
     * initial index size exceeds maintenance_work_mem, or the number of
     * buffers usable for the index, whichever is less.  (Limiting by the
     * number of buffers should reduce thrashing between PG buffers and kernel
     * buffers, which seems useful even if no physical I/O results.  Limiting
     * by maintenance_work_mem is useful to allow easy testing of the sort
     * code path, and may be useful to DBAs as an additional control knob.)
     *
     * NOTE: this test will need adjustment if a bucket is ever different from
     * one page.  Also, "initial index size" accounting does not include the
     * metapage, nor the first bitmap page.
     */
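    /*
     * A rough illustration, assuming the default 8 kB BLCKSZ: with
     * maintenance_work_mem set to 64 MB the threshold is
     * 64 * 1024 kB / 8 kB = 8192 bucket pages, so a build that starts out
     * with 8192 or more buckets takes the sorted path, subject to the
     * NBuffers/NLocBuffer cap applied below.
     */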
    sort_threshold = (maintenance_work_mem * 1024L) / BLCKSZ;
    if (index->rd_rel->relpersistence != RELPERSISTENCE_TEMP)
        sort_threshold = Min(sort_threshold, NBuffers);
    else
        sort_threshold = Min(sort_threshold, NLocBuffer);

    if (num_buckets >= (uint32) sort_threshold)
        buildstate.spool = _h_spoolinit(heap, index, num_buckets);
    else
        buildstate.spool = NULL;

    /* prepare to build the index */
    buildstate.indtuples = 0;
    buildstate.heapRel = heap;

    /* do the heap scan */
    reltuples = IndexBuildHeapScan(heap, index, indexInfo, true,
                                   hashbuildCallback, (void *) &buildstate, NULL);

    if (buildstate.spool)
    {
        /* sort the tuples and insert them into the index */
        _h_indexbuild(buildstate.spool, buildstate.heapRel);
        _h_spooldestroy(buildstate.spool);
    }

    /*
     * Return statistics
     */
    result = (IndexBuildResult *) palloc(sizeof(IndexBuildResult));

    result->heap_tuples = reltuples;
    result->index_tuples = buildstate.indtuples;

    return result;
}

/*
 * hashbuildempty() -- build an empty hash index in the initialization fork
 */
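/*
 * (Used for unlogged indexes: the empty index built in the init fork is
 * what the main fork is reset to during crash recovery.)
 */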
void
hashbuildempty(Relation index)
{
    _hash_init(index, 0, INIT_FORKNUM);
}

/*
 * Per-tuple callback from IndexBuildHeapScan
 */
static void
hashbuildCallback(Relation index,
                  HeapTuple htup,
                  Datum *values,
                  bool *isnull,
                  bool tupleIsAlive,
                  void *state)
{
    HashBuildState *buildstate = (HashBuildState *) state;
    Datum       index_values[1];
    bool        index_isnull[1];
    IndexTuple  itup;

    /* convert data to a hash key; on failure, do not insert anything */
    if (!_hash_convert_tuple(index,
                             values, isnull,
                             index_values, index_isnull))
        return;

    /* Either spool the tuple for sorting, or just put it into the index */
    if (buildstate->spool)
        _h_spool(buildstate->spool, &htup->t_self,
                 index_values, index_isnull);
    else
    {
        /* form an index tuple and point it at the heap tuple */
        itup = index_form_tuple(RelationGetDescr(index),
                                index_values, index_isnull);
        itup->t_tid = htup->t_self;
        _hash_doinsert(index, itup, buildstate->heapRel);
        pfree(itup);
    }

    buildstate->indtuples += 1;
}

/*
 * hashinsert() -- insert an index tuple into a hash table.
 *
 * Hash on the heap tuple's key, form an index tuple with hash code.
 * Find the appropriate location for the new tuple, and put it there.
 */
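/*
 * Note: an aminsert routine's return value matters only for deferred
 * uniqueness checks, which hash indexes never use (amcanunique is false),
 * so this function always returns false.
 */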
bool
hashinsert(Relation rel, Datum *values, bool *isnull,
           ItemPointer ht_ctid, Relation heapRel,
           IndexUniqueCheck checkUnique,
           IndexInfo *indexInfo)
{
    Datum       index_values[1];
    bool        index_isnull[1];
    IndexTuple  itup;

    /* convert data to a hash key; on failure, do not insert anything */
    if (!_hash_convert_tuple(rel,
                             values, isnull,
                             index_values, index_isnull))
        return false;

    /* form an index tuple and point it at the heap tuple */
    itup = index_form_tuple(RelationGetDescr(rel), index_values, index_isnull);
    itup->t_tid = *ht_ctid;

    _hash_doinsert(rel, itup, heapRel);

    pfree(itup);

    return false;
}


/*
 * hashgettuple() -- Get the next tuple in the scan.
 */
bool
hashgettuple(IndexScanDesc scan, ScanDirection dir)
{
    HashScanOpaque so = (HashScanOpaque) scan->opaque;
    bool        res;

    /* Hash indexes are always lossy since we store only the hash code */
    scan->xs_recheck = true;

    /*
     * If we've already initialized this scan, we can just advance it in the
     * appropriate direction.  If we haven't done so yet, we call a routine to
     * get the first item in the scan.
     */
    if (!HashScanPosIsValid(so->currPos))
        res = _hash_first(scan, dir);
    else
    {
        /*
         * Check to see if we should kill the previously-fetched tuple.
         */
        if (scan->kill_prior_tuple)
        {
            /*
             * Yes, so remember it for later.  (We'll deal with all such
             * tuples at once right after leaving the index page or at end of
             * scan.)  If the caller reverses the indexscan direction, it is
             * quite possible that the same item might get entered multiple
             * times.  But we don't detect that; instead, we just forget any
             * excess entries.
             */
            if (so->killedItems == NULL)
                so->killedItems = (int *)
                    palloc(MaxIndexTuplesPerPage * sizeof(int));

            if (so->numKilled < MaxIndexTuplesPerPage)
                so->killedItems[so->numKilled++] = so->currPos.itemIndex;
        }

        /*
         * Now continue the scan.
         */
        res = _hash_next(scan, dir);
    }

    return res;
}


/*
 * hashgetbitmap() -- get all tuples at once
 */
int64
hashgetbitmap(IndexScanDesc scan, TIDBitmap *tbm)
{
    HashScanOpaque so = (HashScanOpaque) scan->opaque;
    bool        res;
    int64       ntids = 0;
    HashScanPosItem *currItem;

    res = _hash_first(scan, ForwardScanDirection);

    while (res)
    {
        currItem = &so->currPos.items[so->currPos.itemIndex];

        /*
         * _hash_first and _hash_next eliminate dead index entries whenever
         * scan->ignore_killed_tuples is true.  Therefore, there's nothing to
         * do here except add the results to the TIDBitmap.
         */
        tbm_add_tuples(tbm, &(currItem->heapTid), 1, true);
        ntids++;

        res = _hash_next(scan, ForwardScanDirection);
    }

    return ntids;
}


/*
 * hashbeginscan() -- start a scan on a hash index
 */
IndexScanDesc
hashbeginscan(Relation rel, int nkeys, int norderbys)
{
    IndexScanDesc scan;
    HashScanOpaque so;

    /* no order by operators allowed */
    Assert(norderbys == 0);

    scan = RelationGetIndexScan(rel, nkeys, norderbys);

    so = (HashScanOpaque) palloc(sizeof(HashScanOpaqueData));
    HashScanPosInvalidate(so->currPos);
    so->hashso_bucket_buf = InvalidBuffer;
    so->hashso_split_bucket_buf = InvalidBuffer;

    so->hashso_buc_populated = false;
    so->hashso_buc_split = false;

    so->killedItems = NULL;
    so->numKilled = 0;

    scan->opaque = so;

    return scan;
}

/*
 * hashrescan() -- rescan an index relation
 */
void
hashrescan(IndexScanDesc scan, ScanKey scankey, int nscankeys,
           ScanKey orderbys, int norderbys)
{
    HashScanOpaque so = (HashScanOpaque) scan->opaque;
    Relation    rel = scan->indexRelation;

    if (HashScanPosIsValid(so->currPos))
    {
        /* Before leaving current page, deal with any killed items */
        if (so->numKilled > 0)
            _hash_kill_items(scan);
    }

    _hash_dropscanbuf(rel, so);

    /* set position invalid (this will cause _hash_first call) */
    HashScanPosInvalidate(so->currPos);

    /* Update scan key, if a new one is given */
    if (scankey && scan->numberOfKeys > 0)
    {
        memmove(scan->keyData,
                scankey,
                scan->numberOfKeys * sizeof(ScanKeyData));
    }

    so->hashso_buc_populated = false;
    so->hashso_buc_split = false;
}

/*
 * hashendscan() -- close down a scan
 */
void
hashendscan(IndexScanDesc scan)
{
    HashScanOpaque so = (HashScanOpaque) scan->opaque;
    Relation    rel = scan->indexRelation;

    if (HashScanPosIsValid(so->currPos))
    {
        /* Before leaving current page, deal with any killed items */
        if (so->numKilled > 0)
            _hash_kill_items(scan);
    }

    _hash_dropscanbuf(rel, so);

    if (so->killedItems != NULL)
        pfree(so->killedItems);
    pfree(so);
    scan->opaque = NULL;
}

/*
 * Bulk deletion of all index entries pointing to a set of heap tuples.
 * The set of target tuples is specified via a callback routine that tells
 * whether any given heap tuple (identified by ItemPointer) is being deleted.
 *
 * This function also deletes tuples that were moved to another bucket by a
 * split.
 *
 * Result: a palloc'd struct containing statistical info for VACUUM displays.
 */
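/*
 * Strategy, in outline: using a cached copy of the metapage, scan every
 * bucket that existed when we started, taking a cleanup lock on each
 * primary bucket page before removing tuples.  Afterwards, re-read the
 * metapage; if a split added buckets while we were working, go back and
 * clean those too.  Finally, update the tuple count stored in the metapage.
 */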
IndexBulkDeleteResult *
hashbulkdelete(IndexVacuumInfo *info, IndexBulkDeleteResult *stats,
               IndexBulkDeleteCallback callback, void *callback_state)
{
    Relation    rel = info->index;
    double      tuples_removed;
    double      num_index_tuples;
    double      orig_ntuples;
    Bucket      orig_maxbucket;
    Bucket      cur_maxbucket;
    Bucket      cur_bucket;
    Buffer      metabuf = InvalidBuffer;
    HashMetaPage metap;
    HashMetaPage cachedmetap;

    tuples_removed = 0;
    num_index_tuples = 0;

    /*
     * We need a copy of the metapage so that we can use its hashm_spares[]
     * values to compute bucket page addresses, but a cached copy should be
     * good enough.  (If not, we'll detect that further down and refresh the
     * cache as necessary.)
     */
    cachedmetap = _hash_getcachedmetap(rel, &metabuf, false);
    Assert(cachedmetap != NULL);

    orig_maxbucket = cachedmetap->hashm_maxbucket;
    orig_ntuples = cachedmetap->hashm_ntuples;

    /* Scan the buckets that we know exist */
    cur_bucket = 0;
    cur_maxbucket = orig_maxbucket;

loop_top:
    while (cur_bucket <= cur_maxbucket)
    {
        BlockNumber bucket_blkno;
        BlockNumber blkno;
        Buffer      bucket_buf;
        Buffer      buf;
        HashPageOpaque bucket_opaque;
        Page        page;
        bool        split_cleanup = false;

        /* Get address of bucket's start page */
        bucket_blkno = BUCKET_TO_BLKNO(cachedmetap, cur_bucket);

        blkno = bucket_blkno;

        /*
         * We need to acquire a cleanup lock on the primary bucket page to
         * wait out concurrent scans before deleting the dead tuples.
         */
        buf = ReadBufferExtended(rel, MAIN_FORKNUM, blkno, RBM_NORMAL, info->strategy);
        LockBufferForCleanup(buf);
        _hash_checkpage(rel, buf, LH_BUCKET_PAGE);

        page = BufferGetPage(buf);
        bucket_opaque = (HashPageOpaque) PageGetSpecialPointer(page);

        /*
         * If the bucket contains tuples that were moved by a split, then we
         * need to delete such tuples.  We can't delete such tuples if the
         * split operation on the bucket is not finished, as those are needed
         * by scans.
         */
        if (!H_BUCKET_BEING_SPLIT(bucket_opaque) &&
            H_NEEDS_SPLIT_CLEANUP(bucket_opaque))
        {
            split_cleanup = true;

            /*
             * This bucket might have been split since we last held a lock on
             * the metapage.  If so, hashm_maxbucket, hashm_highmask and
             * hashm_lowmask might be old enough to cause us to fail to remove
             * tuples left behind by the most recent split.  To prevent that,
             * now that the primary page of the target bucket has been locked
             * (and thus can't be further split), check whether we need to
             * update our cached metapage data.
             */
            Assert(bucket_opaque->hasho_prevblkno != InvalidBlockNumber);
            if (bucket_opaque->hasho_prevblkno > cachedmetap->hashm_maxbucket)
            {
                cachedmetap = _hash_getcachedmetap(rel, &metabuf, true);
                Assert(cachedmetap != NULL);
            }
        }

        bucket_buf = buf;

        hashbucketcleanup(rel, cur_bucket, bucket_buf, blkno, info->strategy,
                          cachedmetap->hashm_maxbucket,
                          cachedmetap->hashm_highmask,
                          cachedmetap->hashm_lowmask, &tuples_removed,
                          &num_index_tuples, split_cleanup,
                          callback, callback_state);

        _hash_dropbuf(rel, bucket_buf);

        /* Advance to next bucket */
        cur_bucket++;
    }

    if (BufferIsInvalid(metabuf))
        metabuf = _hash_getbuf(rel, HASH_METAPAGE, HASH_NOLOCK, LH_META_PAGE);

    /* Write-lock metapage and check for split since we started */
    LockBuffer(metabuf, BUFFER_LOCK_EXCLUSIVE);
    metap = HashPageGetMeta(BufferGetPage(metabuf));

    if (cur_maxbucket != metap->hashm_maxbucket)
    {
        /* There's been a split, so process the additional bucket(s) */
        LockBuffer(metabuf, BUFFER_LOCK_UNLOCK);
        cachedmetap = _hash_getcachedmetap(rel, &metabuf, true);
        Assert(cachedmetap != NULL);
        cur_maxbucket = cachedmetap->hashm_maxbucket;
        goto loop_top;
    }

    /* Okay, we're really done.  Update tuple count in metapage. */
    START_CRIT_SECTION();

    if (orig_maxbucket == metap->hashm_maxbucket &&
        orig_ntuples == metap->hashm_ntuples)
    {
        /*
         * No one has split or inserted anything since start of scan, so
         * believe our count as gospel.
         */
        metap->hashm_ntuples = num_index_tuples;
    }
    else
    {
        /*
         * Otherwise, our count is untrustworthy since we may have
         * double-scanned tuples in split buckets.  Proceed by dead-reckoning.
         * (Note: we still return estimated_count = false, because using this
         * count is better than not updating reltuples at all.)
         */
        if (metap->hashm_ntuples > tuples_removed)
            metap->hashm_ntuples -= tuples_removed;
        else
            metap->hashm_ntuples = 0;
        num_index_tuples = metap->hashm_ntuples;
    }

    MarkBufferDirty(metabuf);

    /* XLOG stuff */
    if (RelationNeedsWAL(rel))
    {
        xl_hash_update_meta_page xlrec;
        XLogRecPtr  recptr;

        xlrec.ntuples = metap->hashm_ntuples;

        XLogBeginInsert();
        XLogRegisterData((char *) &xlrec, SizeOfHashUpdateMetaPage);

        XLogRegisterBuffer(0, metabuf, REGBUF_STANDARD);

        recptr = XLogInsert(RM_HASH_ID, XLOG_HASH_UPDATE_META_PAGE);
        PageSetLSN(BufferGetPage(metabuf), recptr);
    }

    END_CRIT_SECTION();

    _hash_relbuf(rel, metabuf);

    /* return statistics */
    if (stats == NULL)
        stats = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
    stats->estimated_count = false;
    stats->num_index_tuples = num_index_tuples;
    stats->tuples_removed += tuples_removed;
    /* hashvacuumcleanup will fill in num_pages */

    return stats;
}

/*
 * Post-VACUUM cleanup.
 *
 * Result: a palloc'd struct containing statistical info for VACUUM displays.
 */
IndexBulkDeleteResult *
hashvacuumcleanup(IndexVacuumInfo *info, IndexBulkDeleteResult *stats)
{
    Relation    rel = info->index;
    BlockNumber num_pages;

    /* If hashbulkdelete wasn't called, return NULL signifying no change */
    /* Note: this covers the analyze_only case too */
    if (stats == NULL)
        return NULL;

    /* update statistics */
    num_pages = RelationGetNumberOfBlocks(rel);
    stats->num_pages = num_pages;

    return stats;
}

/*
 * Helper function to perform deletion of index entries from a bucket.
 *
 * This function expects that the caller has acquired a cleanup lock on the
 * primary bucket page, and will return with a write lock again held on the
 * primary bucket page.  The lock won't necessarily be held continuously,
 * though, because we'll release it when visiting overflow pages.
 *
 * There can't be any concurrent scans in progress when we first enter this
 * function because of the cleanup lock we hold on the primary bucket page,
 * but as soon as we release that lock, there might be.  If those scans got
 * ahead of our cleanup scan, they might see a tuple before we kill it and
 * wake up only after VACUUM has completed and the TID has been recycled for
 * an unrelated tuple.  To avoid that calamity, we prevent scans from passing
 * our cleanup scan by locking the next page in the bucket chain before
 * releasing the lock on the previous page.  (This type of lock chaining is
 * not ideal, so we might want to look for a better solution at some point.)
 *
 * We need to retain a pin on the primary bucket to ensure that no concurrent
 * split can start.
 */
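/*
 * In outline: walk the bucket chain one page at a time, collect the offsets
 * of tuples to delete on each page, and apply them with
 * PageIndexMultiDelete() (WAL-logging the change); then re-lock the primary
 * bucket page to clear the split-cleanup flag and, if anything was deleted,
 * try to squeeze the bucket.
 */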
void
hashbucketcleanup(Relation rel, Bucket cur_bucket, Buffer bucket_buf,
                  BlockNumber bucket_blkno, BufferAccessStrategy bstrategy,
                  uint32 maxbucket, uint32 highmask, uint32 lowmask,
                  double *tuples_removed, double *num_index_tuples,
                  bool split_cleanup,
                  IndexBulkDeleteCallback callback, void *callback_state)
{
    BlockNumber blkno;
    Buffer      buf;
    Bucket      new_bucket PG_USED_FOR_ASSERTS_ONLY = InvalidBucket;
    bool        bucket_dirty = false;

    blkno = bucket_blkno;
    buf = bucket_buf;

    if (split_cleanup)
        new_bucket = _hash_get_newbucket_from_oldbucket(rel, cur_bucket,
                                                        lowmask, maxbucket);

    /* Scan each page in bucket */
    for (;;)
    {
        HashPageOpaque opaque;
        OffsetNumber offno;
        OffsetNumber maxoffno;
        Buffer      next_buf;
        Page        page;
        OffsetNumber deletable[MaxOffsetNumber];
        int         ndeletable = 0;
        bool        retain_pin = false;
        bool        clear_dead_marking = false;

        vacuum_delay_point();

        page = BufferGetPage(buf);
        opaque = (HashPageOpaque) PageGetSpecialPointer(page);

        /* Scan each tuple in page */
        maxoffno = PageGetMaxOffsetNumber(page);
        for (offno = FirstOffsetNumber;
             offno <= maxoffno;
             offno = OffsetNumberNext(offno))
        {
            ItemPointer htup;
            IndexTuple  itup;
            Bucket      bucket;
            bool        kill_tuple = false;

            itup = (IndexTuple) PageGetItem(page,
                                            PageGetItemId(page, offno));
            htup = &(itup->t_tid);

            /*
             * To remove the dead tuples, we strictly want to rely on the
             * results of the callback function.  See btvacuumpage for the
             * detailed reason.
             */
            if (callback && callback(htup, callback_state))
            {
                kill_tuple = true;
                if (tuples_removed)
                    *tuples_removed += 1;
            }
            else if (split_cleanup)
            {
                /* delete the tuples that were moved by a split */
                bucket = _hash_hashkey2bucket(_hash_get_indextuple_hashkey(itup),
                                              maxbucket,
                                              highmask,
                                              lowmask);
                /* mark the item for deletion */
                if (bucket != cur_bucket)
                {
                    /*
                     * We expect tuples to belong either to the current
                     * bucket or to new_bucket.  This is ensured because we
                     * don't allow further splits from a bucket that contains
                     * garbage.  See comments in _hash_expandtable.
                     */
                    Assert(bucket == new_bucket);
                    kill_tuple = true;
                }
            }

            if (kill_tuple)
            {
                /* mark the item for deletion */
                deletable[ndeletable++] = offno;
            }
            else
            {
                /* we're keeping it, so count it */
                if (num_index_tuples)
                    *num_index_tuples += 1;
            }
        }

        /* retain the pin on primary bucket page till end of bucket scan */
        if (blkno == bucket_blkno)
            retain_pin = true;
        else
            retain_pin = false;

        blkno = opaque->hasho_nextblkno;

        /*
         * Apply deletions, advance to next page and write page if needed.
         */
        if (ndeletable > 0)
        {
            /* No ereport(ERROR) until changes are logged */
            START_CRIT_SECTION();

            PageIndexMultiDelete(page, deletable, ndeletable);
            bucket_dirty = true;

            /*
             * Mark the page as clean if vacuum removed the DEAD tuples from
             * the index page, by clearing the LH_PAGE_HAS_DEAD_TUPLES flag.
             */
            if (tuples_removed && *tuples_removed > 0 &&
                H_HAS_DEAD_TUPLES(opaque))
            {
                opaque->hasho_flag &= ~LH_PAGE_HAS_DEAD_TUPLES;
                clear_dead_marking = true;
            }

            MarkBufferDirty(buf);

            /* XLOG stuff */
            if (RelationNeedsWAL(rel))
            {
                xl_hash_delete xlrec;
                XLogRecPtr  recptr;

                xlrec.clear_dead_marking = clear_dead_marking;
                xlrec.is_primary_bucket_page = (buf == bucket_buf);

                XLogBeginInsert();
                XLogRegisterData((char *) &xlrec, SizeOfHashDelete);

                /*
                 * bucket buffer needs to be registered to ensure that we can
                 * acquire a cleanup lock on it during replay.
                 */
                if (!xlrec.is_primary_bucket_page)
                    XLogRegisterBuffer(0, bucket_buf, REGBUF_STANDARD | REGBUF_NO_IMAGE);

                XLogRegisterBuffer(1, buf, REGBUF_STANDARD);
                XLogRegisterBufData(1, (char *) deletable,
                                    ndeletable * sizeof(OffsetNumber));

                recptr = XLogInsert(RM_HASH_ID, XLOG_HASH_DELETE);
                PageSetLSN(BufferGetPage(buf), recptr);
            }

            END_CRIT_SECTION();
        }

        /* bail out if there are no more pages to scan */
        if (!BlockNumberIsValid(blkno))
            break;

        next_buf = _hash_getbuf_with_strategy(rel, blkno, HASH_WRITE,
                                              LH_OVERFLOW_PAGE,
                                              bstrategy);

        /*
         * Release the lock on the previous page only after acquiring the
         * lock on the next page.
         */
        if (retain_pin)
            LockBuffer(buf, BUFFER_LOCK_UNLOCK);
        else
            _hash_relbuf(rel, buf);

        buf = next_buf;
    }

    /*
     * Lock the bucket page to clear the garbage flag and squeeze the bucket.
     * If the current buffer is the same as the bucket buffer, then we
     * already have a lock on the bucket page.
     */
    if (buf != bucket_buf)
    {
        _hash_relbuf(rel, buf);
        LockBuffer(bucket_buf, BUFFER_LOCK_EXCLUSIVE);
    }

    /*
     * Clear the garbage flag from the bucket after deleting the tuples that
     * were moved by a split.  We purposely clear the flag before squeezing
     * the bucket, so that after a restart vacuum won't try again to delete
     * the moved-by-split tuples.
     */
    if (split_cleanup)
    {
        HashPageOpaque bucket_opaque;
        Page        page;

        page = BufferGetPage(bucket_buf);
        bucket_opaque = (HashPageOpaque) PageGetSpecialPointer(page);

        /* No ereport(ERROR) until changes are logged */
        START_CRIT_SECTION();

        bucket_opaque->hasho_flag &= ~LH_BUCKET_NEEDS_SPLIT_CLEANUP;
        MarkBufferDirty(bucket_buf);

        /* XLOG stuff */
        if (RelationNeedsWAL(rel))
        {
            XLogRecPtr  recptr;

            XLogBeginInsert();
            XLogRegisterBuffer(0, bucket_buf, REGBUF_STANDARD);

            recptr = XLogInsert(RM_HASH_ID, XLOG_HASH_SPLIT_CLEANUP);
            PageSetLSN(page, recptr);
        }

        END_CRIT_SECTION();
    }

    /*
     * If we have deleted anything, try to compact free space.  For squeezing
     * the bucket, we must have a cleanup lock, else it can impact the
     * ordering of tuples for a scan that has started before it.
     */
    if (bucket_dirty && IsBufferCleanupOK(bucket_buf))
        _hash_squeezebucket(rel, cur_bucket, bucket_blkno, bucket_buf,
                            bstrategy);
    else
        LockBuffer(bucket_buf, BUFFER_LOCK_UNLOCK);
}
909