/*-------------------------------------------------------------------------
 *
 * spgvacuum.c
 *    vacuum for SP-GiST
 *
 *
 * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *    src/backend/access/spgist/spgvacuum.c
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include "access/genam.h"
#include "access/spgist_private.h"
#include "access/transam.h"
#include "access/xloginsert.h"
#include "catalog/storage_xlog.h"
#include "commands/vacuum.h"
#include "miscadmin.h"
#include "storage/bufmgr.h"
#include "storage/indexfsm.h"
#include "storage/lmgr.h"
#include "utils/snapmgr.h"


/* Entry in pending-list of TIDs we need to revisit */
typedef struct spgVacPendingItem
{
    ItemPointerData tid;        /* redirection target to visit */
    bool        done;           /* have we dealt with this? */
    struct spgVacPendingItem *next;     /* list link */
} spgVacPendingItem;

/* Local state for vacuum operations */
typedef struct spgBulkDeleteState
{
    /* Parameters passed in to spgvacuumscan */
    IndexVacuumInfo *info;
    IndexBulkDeleteResult *stats;
    IndexBulkDeleteCallback callback;
    void       *callback_state;

    /* Additional working state */
    SpGistState spgstate;       /* for SPGiST operations that need one */
    spgVacPendingItem *pendingList;     /* TIDs we need to (re)visit */
    TransactionId myXmin;       /* for detecting newly-added redirects */
    BlockNumber lastFilledBlock;        /* last non-deletable block */
} spgBulkDeleteState;


/*
 * Add TID to pendingList, but only if not already present.
 *
 * Note that new items are always appended at the end of the list; this
 * ensures that scans of the list don't miss items added during the scan.
 */
static void
spgAddPendingTID(spgBulkDeleteState *bds, ItemPointer tid)
{
    spgVacPendingItem *pitem;
    spgVacPendingItem **listLink;

    /* search the list for pre-existing entry */
    listLink = &bds->pendingList;
    while (*listLink != NULL)
    {
        pitem = *listLink;
        if (ItemPointerEquals(tid, &pitem->tid))
            return;             /* already in list, do nothing */
        listLink = &pitem->next;
    }
    /* not there, so append new entry */
    pitem = (spgVacPendingItem *) palloc(sizeof(spgVacPendingItem));
    pitem->tid = *tid;
    pitem->done = false;
    pitem->next = NULL;
    *listLink = pitem;
}

/*
 * Clear pendingList
 */
static void
spgClearPendingList(spgBulkDeleteState *bds)
{
    spgVacPendingItem *pitem;
    spgVacPendingItem *nitem;

    for (pitem = bds->pendingList; pitem != NULL; pitem = nitem)
    {
        nitem = pitem->next;
        /* All items in list should have been dealt with */
        Assert(pitem->done);
        pfree(pitem);
    }
    bds->pendingList = NULL;
}

/*
 * Vacuum a regular (non-root) leaf page
 *
 * We must delete tuples that are targeted for deletion by the VACUUM,
 * but not move any tuples that are referenced by outside links; we assume
 * those are the ones that are heads of chains.
 *
 * If we find a REDIRECT that was made by a concurrently-running transaction,
 * we must add its target TID to pendingList.  (We don't try to visit the
 * target immediately, first because we don't want VACUUM locking more than
 * one buffer at a time, and second because the duplicate-filtering logic
 * in spgAddPendingTID is useful to ensure we can't get caught in an infinite
 * loop in the face of continuous concurrent insertions.)
 *
 * If forPending is true, we are examining the page as a consequence of
 * chasing a redirect link, not as part of the normal sequential scan.
 * We still vacuum the page normally, but we don't increment the stats
 * about live tuples; else we'd double-count those tuples, since the page
 * has been or will be visited in the sequential scan as well.
 */
static void
vacuumLeafPage(spgBulkDeleteState *bds, Relation index, Buffer buffer,
               bool forPending)
{
    Page        page = BufferGetPage(buffer);
    spgxlogVacuumLeaf xlrec;
    OffsetNumber toDead[MaxIndexTuplesPerPage];
    OffsetNumber toPlaceholder[MaxIndexTuplesPerPage];
    OffsetNumber moveSrc[MaxIndexTuplesPerPage];
    OffsetNumber moveDest[MaxIndexTuplesPerPage];
    OffsetNumber chainSrc[MaxIndexTuplesPerPage];
    OffsetNumber chainDest[MaxIndexTuplesPerPage];
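    /* The next two arrays are indexed by tuple offset number, hence the +1 */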
    OffsetNumber predecessor[MaxIndexTuplesPerPage + 1];
    bool        deletable[MaxIndexTuplesPerPage + 1];
    int         nDeletable;
    OffsetNumber i,
                max = PageGetMaxOffsetNumber(page);

    memset(predecessor, 0, sizeof(predecessor));
    memset(deletable, 0, sizeof(deletable));
    nDeletable = 0;

    /* Scan page, identify tuples to delete, accumulate stats */
    for (i = FirstOffsetNumber; i <= max; i++)
    {
        SpGistLeafTuple lt;

        lt = (SpGistLeafTuple) PageGetItem(page,
                                           PageGetItemId(page, i));
        if (lt->tupstate == SPGIST_LIVE)
        {
            Assert(ItemPointerIsValid(&lt->heapPtr));

            if (bds->callback(&lt->heapPtr, bds->callback_state))
            {
                bds->stats->tuples_removed += 1;
                deletable[i] = true;
                nDeletable++;
            }
            else
            {
                if (!forPending)
                    bds->stats->num_index_tuples += 1;
            }

            /* Form predecessor map, too */
            if (lt->nextOffset != InvalidOffsetNumber)
            {
                /* paranoia about corrupted chain links */
                if (lt->nextOffset < FirstOffsetNumber ||
                    lt->nextOffset > max ||
                    predecessor[lt->nextOffset] != InvalidOffsetNumber)
                    elog(ERROR, "inconsistent tuple chain links in page %u of index \"%s\"",
                         BufferGetBlockNumber(buffer),
                         RelationGetRelationName(index));
                predecessor[lt->nextOffset] = i;
            }
        }
        else if (lt->tupstate == SPGIST_REDIRECT)
        {
            SpGistDeadTuple dt = (SpGistDeadTuple) lt;

            Assert(dt->nextOffset == InvalidOffsetNumber);
            Assert(ItemPointerIsValid(&dt->pointer));

            /*
             * Add target TID to pending list if the redirection could have
             * happened since VACUUM started.
             *
             * Note: we could make a tighter test by seeing if the xid is
             * "running" according to the active snapshot; but tqual.c doesn't
             * currently export a suitable API, and it's not entirely clear
             * that a tighter test is worth the cycles anyway.
             */
            if (TransactionIdFollowsOrEquals(dt->xid, bds->myXmin))
                spgAddPendingTID(bds, &dt->pointer);
        }
        else
        {
            Assert(lt->nextOffset == InvalidOffsetNumber);
        }
    }

    if (nDeletable == 0)
        return;                 /* nothing more to do */

    /*----------
     * Figure out exactly what we have to do.  We do this separately from
     * actually modifying the page, mainly so that we have a representation
     * that can be dumped into WAL and then the replay code can do exactly
     * the same thing.  The output of this step consists of six arrays
     * describing four kinds of operations, to be performed in this order:
     *
     * toDead[]: tuple numbers to be replaced with DEAD tuples
     * toPlaceholder[]: tuple numbers to be replaced with PLACEHOLDER tuples
     * moveSrc[]: tuple numbers that need to be relocated to another offset
     * (replacing the tuple there) and then replaced with PLACEHOLDER tuples
     * moveDest[]: new locations for moveSrc tuples
     * chainSrc[]: tuple numbers whose chain links (nextOffset) need updates
     * chainDest[]: new values of nextOffset for chainSrc members
     *
     * It's easiest to figure out what we have to do by processing tuple
     * chains, so we iterate over all the tuples (not just the deletable
     * ones!) to identify chain heads, then chase down each chain and make
     * work item entries for deletable tuples within the chain.
     *----------
     */
    xlrec.nDead = xlrec.nPlaceholder = xlrec.nMove = xlrec.nChain = 0;

    for (i = FirstOffsetNumber; i <= max; i++)
    {
        SpGistLeafTuple head;
        bool        interveningDeletable;
        OffsetNumber prevLive;
        OffsetNumber j;

        head = (SpGistLeafTuple) PageGetItem(page,
                                             PageGetItemId(page, i));
        if (head->tupstate != SPGIST_LIVE)
            continue;           /* can't be a chain member */
        if (predecessor[i] != 0)
            continue;           /* not a chain head */

        /* initialize ... */
        interveningDeletable = false;
        prevLive = deletable[i] ? InvalidOffsetNumber : i;
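        /* prevLive is the last live tuple seen so far, or InvalidOffsetNumber if none */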

        /* scan down the chain ... */
        j = head->nextOffset;
        while (j != InvalidOffsetNumber)
        {
            SpGistLeafTuple lt;

            lt = (SpGistLeafTuple) PageGetItem(page,
                                               PageGetItemId(page, j));
            if (lt->tupstate != SPGIST_LIVE)
            {
                /* all tuples in chain should be live */
                elog(ERROR, "unexpected SPGiST tuple state: %d",
                     lt->tupstate);
            }

            if (deletable[j])
            {
                /* This tuple should be replaced by a placeholder */
                toPlaceholder[xlrec.nPlaceholder] = j;
                xlrec.nPlaceholder++;
                /* previous live tuple's chain link will need an update */
                interveningDeletable = true;
            }
            else if (prevLive == InvalidOffsetNumber)
            {
                /*
                 * This is the first live tuple in the chain.  It has to move
                 * to the head position.
                 */
                moveSrc[xlrec.nMove] = j;
                moveDest[xlrec.nMove] = i;
                xlrec.nMove++;
                /* Chain updates will be applied after the move */
                prevLive = i;
                interveningDeletable = false;
            }
            else
            {
                /*
                 * Second or later live tuple.  Arrange to re-chain it to the
                 * previous live one, if there was a gap.
                 */
                if (interveningDeletable)
                {
                    chainSrc[xlrec.nChain] = prevLive;
                    chainDest[xlrec.nChain] = j;
                    xlrec.nChain++;
                }
                prevLive = j;
                interveningDeletable = false;
            }

            j = lt->nextOffset;
        }

        if (prevLive == InvalidOffsetNumber)
        {
            /* The chain is entirely removable, so we need a DEAD tuple */
            toDead[xlrec.nDead] = i;
            xlrec.nDead++;
        }
        else if (interveningDeletable)
        {
            /* One or more deletions at end of chain, so close it off */
            chainSrc[xlrec.nChain] = prevLive;
            chainDest[xlrec.nChain] = InvalidOffsetNumber;
            xlrec.nChain++;
        }
    }

    /* sanity check ... */
    if (nDeletable != xlrec.nDead + xlrec.nPlaceholder + xlrec.nMove)
        elog(ERROR, "inconsistent counts of deletable tuples");

    /* Do the updates */
    START_CRIT_SECTION();

    spgPageIndexMultiDelete(&bds->spgstate, page,
                            toDead, xlrec.nDead,
                            SPGIST_DEAD, SPGIST_DEAD,
                            InvalidBlockNumber, InvalidOffsetNumber);

    spgPageIndexMultiDelete(&bds->spgstate, page,
                            toPlaceholder, xlrec.nPlaceholder,
                            SPGIST_PLACEHOLDER, SPGIST_PLACEHOLDER,
                            InvalidBlockNumber, InvalidOffsetNumber);

    /*
     * We implement the move step by swapping the item pointers of the source
     * and target tuples, then replacing the newly-source tuples with
     * placeholders.  This is perhaps unduly friendly with the page data
     * representation, but it's fast and doesn't risk page overflow when a
     * tuple to be relocated is large.
     */
    for (i = 0; i < xlrec.nMove; i++)
    {
        ItemId      idSrc = PageGetItemId(page, moveSrc[i]);
        ItemId      idDest = PageGetItemId(page, moveDest[i]);
        ItemIdData  tmp;

        tmp = *idSrc;
        *idSrc = *idDest;
        *idDest = tmp;
    }

    spgPageIndexMultiDelete(&bds->spgstate, page,
                            moveSrc, xlrec.nMove,
                            SPGIST_PLACEHOLDER, SPGIST_PLACEHOLDER,
                            InvalidBlockNumber, InvalidOffsetNumber);

    for (i = 0; i < xlrec.nChain; i++)
    {
        SpGistLeafTuple lt;

        lt = (SpGistLeafTuple) PageGetItem(page,
                                           PageGetItemId(page, chainSrc[i]));
        Assert(lt->tupstate == SPGIST_LIVE);
        lt->nextOffset = chainDest[i];
    }

    MarkBufferDirty(buffer);

    if (RelationNeedsWAL(index))
    {
        XLogRecPtr  recptr;

        XLogBeginInsert();

        STORE_STATE(&bds->spgstate, xlrec.stateSrc);

        XLogRegisterData((char *) &xlrec, SizeOfSpgxlogVacuumLeaf);
        /* sizeof(xlrec) should be a multiple of sizeof(OffsetNumber) */
        XLogRegisterData((char *) toDead, sizeof(OffsetNumber) * xlrec.nDead);
        XLogRegisterData((char *) toPlaceholder, sizeof(OffsetNumber) * xlrec.nPlaceholder);
        XLogRegisterData((char *) moveSrc, sizeof(OffsetNumber) * xlrec.nMove);
        XLogRegisterData((char *) moveDest, sizeof(OffsetNumber) * xlrec.nMove);
        XLogRegisterData((char *) chainSrc, sizeof(OffsetNumber) * xlrec.nChain);
        XLogRegisterData((char *) chainDest, sizeof(OffsetNumber) * xlrec.nChain);

        XLogRegisterBuffer(0, buffer, REGBUF_STANDARD);

        recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_VACUUM_LEAF);

        PageSetLSN(page, recptr);
    }

    END_CRIT_SECTION();
}

/*
 * Vacuum a root page when it is also a leaf
 *
 * On the root, we just delete any dead leaf tuples; no fancy business
 */
static void
vacuumLeafRoot(spgBulkDeleteState *bds, Relation index, Buffer buffer)
{
    Page        page = BufferGetPage(buffer);
    spgxlogVacuumRoot xlrec;
    OffsetNumber toDelete[MaxIndexTuplesPerPage];
    OffsetNumber i,
                max = PageGetMaxOffsetNumber(page);

    xlrec.nDelete = 0;

    /* Scan page, identify tuples to delete, accumulate stats */
    for (i = FirstOffsetNumber; i <= max; i++)
    {
        SpGistLeafTuple lt;

        lt = (SpGistLeafTuple) PageGetItem(page,
                                           PageGetItemId(page, i));
        if (lt->tupstate == SPGIST_LIVE)
        {
            Assert(ItemPointerIsValid(&lt->heapPtr));

            if (bds->callback(&lt->heapPtr, bds->callback_state))
            {
                bds->stats->tuples_removed += 1;
                toDelete[xlrec.nDelete] = i;
                xlrec.nDelete++;
            }
            else
            {
                bds->stats->num_index_tuples += 1;
            }
        }
        else
        {
            /* all tuples on root should be live */
            elog(ERROR, "unexpected SPGiST tuple state: %d",
                 lt->tupstate);
        }
    }

    if (xlrec.nDelete == 0)
        return;                 /* nothing more to do */

    /* Do the update */
    START_CRIT_SECTION();

    /* The tuple numbers are in order, so we can use PageIndexMultiDelete */
    PageIndexMultiDelete(page, toDelete, xlrec.nDelete);

    MarkBufferDirty(buffer);

    if (RelationNeedsWAL(index))
    {
        XLogRecPtr  recptr;

        XLogBeginInsert();

        /* Prepare WAL record */
        STORE_STATE(&bds->spgstate, xlrec.stateSrc);

        XLogRegisterData((char *) &xlrec, SizeOfSpgxlogVacuumRoot);
        /* sizeof(xlrec) should be a multiple of sizeof(OffsetNumber) */
        XLogRegisterData((char *) toDelete,
                         sizeof(OffsetNumber) * xlrec.nDelete);

        XLogRegisterBuffer(0, buffer, REGBUF_STANDARD);

        recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_VACUUM_ROOT);

        PageSetLSN(page, recptr);
    }

    END_CRIT_SECTION();
}

/*
 * Clean up redirect and placeholder tuples on the given page
 *
 * Redirect tuples can be marked placeholder once they're old enough.
 * Placeholder tuples can be removed if it won't change the offsets of
 * non-placeholder ones.
 *
 * Unlike the routines above, this works on both leaf and inner pages.
 */
static void
vacuumRedirectAndPlaceholder(Relation index, Buffer buffer)
{
    Page        page = BufferGetPage(buffer);
    SpGistPageOpaque opaque = SpGistPageGetOpaque(page);
    OffsetNumber i,
                max = PageGetMaxOffsetNumber(page),
                firstPlaceholder = InvalidOffsetNumber;
    bool        hasNonPlaceholder = false;
    bool        hasUpdate = false;
    OffsetNumber itemToPlaceholder[MaxIndexTuplesPerPage];
    OffsetNumber itemnos[MaxIndexTuplesPerPage];
    spgxlogVacuumRedirect xlrec;

    xlrec.nToPlaceholder = 0;
    xlrec.newestRedirectXid = InvalidTransactionId;

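    /*
     * The scan below converts tuples in place, so the critical section has
     * to cover it as well as the later deletions and WAL insertion.
     */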
    START_CRIT_SECTION();

    /*
     * Scan backwards to convert old redirection tuples to placeholder tuples,
     * and identify location of last non-placeholder tuple while at it.
     */
    for (i = max;
         i >= FirstOffsetNumber &&
         (opaque->nRedirection > 0 || !hasNonPlaceholder);
         i--)
    {
        SpGistDeadTuple dt;

        dt = (SpGistDeadTuple) PageGetItem(page, PageGetItemId(page, i));

        if (dt->tupstate == SPGIST_REDIRECT &&
            TransactionIdPrecedes(dt->xid, RecentGlobalXmin))
        {
            dt->tupstate = SPGIST_PLACEHOLDER;
            Assert(opaque->nRedirection > 0);
            opaque->nRedirection--;
            opaque->nPlaceholder++;

            /* remember newest XID among the removed redirects */
            if (!TransactionIdIsValid(xlrec.newestRedirectXid) ||
                TransactionIdPrecedes(xlrec.newestRedirectXid, dt->xid))
                xlrec.newestRedirectXid = dt->xid;

            ItemPointerSetInvalid(&dt->pointer);

            itemToPlaceholder[xlrec.nToPlaceholder] = i;
            xlrec.nToPlaceholder++;

            hasUpdate = true;
        }

        if (dt->tupstate == SPGIST_PLACEHOLDER)
        {
            if (!hasNonPlaceholder)
                firstPlaceholder = i;
        }
        else
        {
            hasNonPlaceholder = true;
        }
    }

    /*
     * Any placeholder tuples at the end of page can safely be removed.  We
     * can't remove ones before the last non-placeholder, though, because we
     * can't alter the offset numbers of non-placeholder tuples.
     */
    if (firstPlaceholder != InvalidOffsetNumber)
    {
        /*
         * We do not store this array to rdata because it's easy to recreate.
         */
        for (i = firstPlaceholder; i <= max; i++)
            itemnos[i - firstPlaceholder] = i;

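        /* repurpose i as the count of trailing placeholder tuples */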
        i = max - firstPlaceholder + 1;
        Assert(opaque->nPlaceholder >= i);
        opaque->nPlaceholder -= i;

        /* The array is surely sorted, so can use PageIndexMultiDelete */
        PageIndexMultiDelete(page, itemnos, i);

        hasUpdate = true;
    }

    xlrec.firstPlaceholder = firstPlaceholder;

    if (hasUpdate)
        MarkBufferDirty(buffer);

    if (hasUpdate && RelationNeedsWAL(index))
    {
        XLogRecPtr  recptr;

        XLogBeginInsert();

        XLogRegisterData((char *) &xlrec, SizeOfSpgxlogVacuumRedirect);
        XLogRegisterData((char *) itemToPlaceholder,
                         sizeof(OffsetNumber) * xlrec.nToPlaceholder);

        XLogRegisterBuffer(0, buffer, REGBUF_STANDARD);

        recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_VACUUM_REDIRECT);

        PageSetLSN(page, recptr);
    }

    END_CRIT_SECTION();
}

/*
 * Process one page during a bulkdelete scan
 */
static void
spgvacuumpage(spgBulkDeleteState *bds, BlockNumber blkno)
{
    Relation    index = bds->info->index;
    Buffer      buffer;
    Page        page;

    /* call vacuum_delay_point while not holding any buffer lock */
    vacuum_delay_point();

    buffer = ReadBufferExtended(index, MAIN_FORKNUM, blkno,
                                RBM_NORMAL, bds->info->strategy);
    LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
    page = (Page) BufferGetPage(buffer);

    if (PageIsNew(page))
    {
        /*
         * We found an all-zero page, which could happen if the database
         * crashed just after extending the file.  Recycle it.
         */
    }
    else if (PageIsEmpty(page))
    {
        /* nothing to do */
    }
    else if (SpGistPageIsLeaf(page))
    {
        if (SpGistBlockIsRoot(blkno))
        {
            vacuumLeafRoot(bds, index, buffer);
            /* no need for vacuumRedirectAndPlaceholder */
        }
        else
        {
            vacuumLeafPage(bds, index, buffer, false);
            vacuumRedirectAndPlaceholder(index, buffer);
        }
    }
    else
    {
        /* inner page */
        vacuumRedirectAndPlaceholder(index, buffer);
    }

    /*
     * The root pages must never be deleted, nor marked as available in FSM,
     * because we don't want them ever returned by a search for a place to put
     * a new tuple.  Otherwise, check for empty page, and make sure the FSM
     * knows about it.
     */
    if (!SpGistBlockIsRoot(blkno))
    {
        if (PageIsNew(page) || PageIsEmpty(page))
        {
            RecordFreeIndexPage(index, blkno);
            bds->stats->pages_deleted++;
        }
        else
        {
            SpGistSetLastUsedPage(index, buffer);
            bds->lastFilledBlock = blkno;
        }
    }

    UnlockReleaseBuffer(buffer);
}

/*
 * Process the pending-TID list between pages of the main scan
 */
static void
spgprocesspending(spgBulkDeleteState *bds)
{
    Relation    index = bds->info->index;
    spgVacPendingItem *pitem;
    spgVacPendingItem *nitem;
    BlockNumber blkno;
    Buffer      buffer;
    Page        page;

    for (pitem = bds->pendingList; pitem != NULL; pitem = pitem->next)
    {
        if (pitem->done)
            continue;           /* ignore already-done items */

        /* call vacuum_delay_point while not holding any buffer lock */
        vacuum_delay_point();

        /* examine the referenced page */
        blkno = ItemPointerGetBlockNumber(&pitem->tid);
        buffer = ReadBufferExtended(index, MAIN_FORKNUM, blkno,
                                    RBM_NORMAL, bds->info->strategy);
        LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
        page = (Page) BufferGetPage(buffer);

        if (PageIsNew(page) || SpGistPageIsDeleted(page))
        {
            /* Probably shouldn't happen, but ignore it */
        }
        else if (SpGistPageIsLeaf(page))
        {
            if (SpGistBlockIsRoot(blkno))
            {
                /* this should definitely not happen */
                elog(ERROR, "redirection leads to root page of index \"%s\"",
                     RelationGetRelationName(index));
            }

            /* deal with any deletable tuples */
            vacuumLeafPage(bds, index, buffer, true);
            /* might as well do this while we are here */
            vacuumRedirectAndPlaceholder(index, buffer);

            SpGistSetLastUsedPage(index, buffer);

            /*
             * We can mark as done not only this item, but any later ones
             * pointing at the same page, since we vacuumed the whole page.
             */
            pitem->done = true;
            for (nitem = pitem->next; nitem != NULL; nitem = nitem->next)
            {
                if (ItemPointerGetBlockNumber(&nitem->tid) == blkno)
                    nitem->done = true;
            }
        }
        else
        {
            /*
             * On an inner page, visit the referenced inner tuple and add all
             * its downlinks to the pending list.  We might have pending items
             * for more than one inner tuple on the same page (in fact this is
             * pretty likely given the way space allocation works), so get
             * them all while we are here.
             */
            for (nitem = pitem; nitem != NULL; nitem = nitem->next)
            {
                if (nitem->done)
                    continue;
                if (ItemPointerGetBlockNumber(&nitem->tid) == blkno)
                {
                    OffsetNumber offset;
                    SpGistInnerTuple innerTuple;

                    offset = ItemPointerGetOffsetNumber(&nitem->tid);
                    innerTuple = (SpGistInnerTuple) PageGetItem(page,
                                                    PageGetItemId(page, offset));
                    if (innerTuple->tupstate == SPGIST_LIVE)
                    {
                        SpGistNodeTuple node;
                        int         i;

                        SGITITERATE(innerTuple, i, node)
                        {
                            if (ItemPointerIsValid(&node->t_tid))
                                spgAddPendingTID(bds, &node->t_tid);
                        }
                    }
                    else if (innerTuple->tupstate == SPGIST_REDIRECT)
                    {
                        /* transfer attention to redirect point */
                        spgAddPendingTID(bds,
                                         &((SpGistDeadTuple) innerTuple)->pointer);
                    }
                    else
                        elog(ERROR, "unexpected SPGiST tuple state: %d",
                             innerTuple->tupstate);

                    nitem->done = true;
                }
            }
        }

        UnlockReleaseBuffer(buffer);
    }

    spgClearPendingList(bds);
}

/*
 * Perform a bulkdelete scan
 */
static void
spgvacuumscan(spgBulkDeleteState *bds)
{
    Relation    index = bds->info->index;
    bool        needLock;
    BlockNumber num_pages,
                blkno;

    /* Finish setting up spgBulkDeleteState */
    initSpGistState(&bds->spgstate, index);
    bds->pendingList = NULL;
    bds->myXmin = GetActiveSnapshot()->xmin;
    bds->lastFilledBlock = SPGIST_LAST_FIXED_BLKNO;

    /*
     * Reset counts that will be incremented during the scan; needed in case
     * of multiple scans during a single VACUUM command
     */
    bds->stats->estimated_count = false;
    bds->stats->num_index_tuples = 0;
    bds->stats->pages_deleted = 0;

    /* We can skip locking for new or temp relations */
    needLock = !RELATION_IS_LOCAL(index);

    /*
     * The outer loop iterates over all index pages except the metapage, in
     * physical order (we hope the kernel will cooperate in providing
     * read-ahead for speed).  It is critical that we visit all leaf pages,
     * including ones added after we start the scan, else we might fail to
     * delete some deletable tuples.  See more extensive comments about this
     * in btvacuumscan().
     */
    blkno = SPGIST_METAPAGE_BLKNO + 1;
    for (;;)
    {
        /* Get the current relation length */
        if (needLock)
            LockRelationForExtension(index, ExclusiveLock);
        num_pages = RelationGetNumberOfBlocks(index);
        if (needLock)
            UnlockRelationForExtension(index, ExclusiveLock);

        /* Quit if we've scanned the whole relation */
        if (blkno >= num_pages)
            break;
        /* Iterate over pages, then loop back to recheck length */
        for (; blkno < num_pages; blkno++)
        {
            spgvacuumpage(bds, blkno);
            /* empty the pending-list after each page */
            if (bds->pendingList != NULL)
                spgprocesspending(bds);
        }
    }

    /* Propagate local lastUsedPage cache to metablock */
    SpGistUpdateMetaPage(index);

    /*
     * Truncate index if possible
     *
     * XXX disabled because it's unsafe due to possible concurrent inserts.
     * We'd have to rescan the pages to make sure they're still empty, and it
     * doesn't seem worth it.  Note that btree doesn't do this either.
     *
     * Another reason not to truncate is that it could invalidate the cached
     * pages-with-freespace pointers in the metapage and other backends'
     * relation caches, that is leave them pointing to nonexistent pages.
     * Adding RelationGetNumberOfBlocks calls to protect the places that use
     * those pointers would be unduly expensive.
     */
#ifdef NOT_USED
    if (num_pages > bds->lastFilledBlock + 1)
    {
        BlockNumber lastBlock = num_pages - 1;

        num_pages = bds->lastFilledBlock + 1;
        RelationTruncate(index, num_pages);
        bds->stats->pages_removed += lastBlock - bds->lastFilledBlock;
        bds->stats->pages_deleted -= lastBlock - bds->lastFilledBlock;
    }
#endif

    /* Report final stats */
    bds->stats->num_pages = num_pages;
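    /* every page counted in pages_deleted was reported to the FSM, so all are reusable */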
    bds->stats->pages_free = bds->stats->pages_deleted;
}

/*
 * Bulk deletion of all index entries pointing to a set of heap tuples.
 * The set of target tuples is specified via a callback routine that tells
 * whether any given heap tuple (identified by ItemPointer) is being deleted.
 *
 * Result: a palloc'd struct containing statistical info for VACUUM displays.
 */
IndexBulkDeleteResult *
spgbulkdelete(IndexVacuumInfo *info, IndexBulkDeleteResult *stats,
              IndexBulkDeleteCallback callback, void *callback_state)
{
    spgBulkDeleteState bds;

    /* allocate stats if first time through, else re-use existing struct */
    if (stats == NULL)
        stats = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
    bds.info = info;
    bds.stats = stats;
    bds.callback = callback;
    bds.callback_state = callback_state;

    spgvacuumscan(&bds);

    return stats;
}

/* Dummy callback to delete no tuples during spgvacuumcleanup */
static bool
dummy_callback(ItemPointer itemptr, void *state)
{
    return false;
}

/*
 * Post-VACUUM cleanup.
 *
 * Result: a palloc'd struct containing statistical info for VACUUM displays.
 */
IndexBulkDeleteResult *
spgvacuumcleanup(IndexVacuumInfo *info, IndexBulkDeleteResult *stats)
{
    Relation    index = info->index;
    spgBulkDeleteState bds;

    /* No-op in ANALYZE ONLY mode */
    if (info->analyze_only)
        return stats;

    /*
     * We don't need to scan the index if there was a preceding bulkdelete
     * pass.  Otherwise, make a pass that won't delete any live tuples, but
     * might still accomplish useful stuff with redirect/placeholder cleanup,
     * and in any case will provide stats.
     */
    if (stats == NULL)
    {
        stats = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
        bds.info = info;
        bds.stats = stats;
        bds.callback = dummy_callback;
        bds.callback_state = NULL;

        spgvacuumscan(&bds);
    }

    /* Finally, vacuum the FSM */
    IndexFreeSpaceMapVacuum(index);

    /*
     * It's quite possible for us to be fooled by concurrent tuple moves into
     * double-counting some index tuples, so disbelieve any total that exceeds
     * the underlying heap's count ... if we know that accurately.  Otherwise
     * this might just make matters worse.
     */
    if (!info->estimated_count)
    {
        if (stats->num_index_tuples > info->num_heap_tuples)
            stats->num_index_tuples = info->num_heap_tuples;
    }

    return stats;
}