/*-------------------------------------------------------------------------
 *
 * nbtree.c
 *	  Implementation of Lehman and Yao's btree management algorithm for
 *	  Postgres.
 *
 * NOTES
 *	  This file contains only the public interface routines.
 *
 *
 * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *	  src/backend/access/nbtree/nbtree.c
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include "access/nbtree.h"
#include "access/relscan.h"
#include "access/xlog.h"
#include "catalog/index.h"
#include "commands/vacuum.h"
#include "storage/indexfsm.h"
#include "storage/ipc.h"
#include "storage/lmgr.h"
#include "storage/smgr.h"
#include "tcop/tcopprot.h"		/* pgrminclude ignore */
#include "utils/index_selfuncs.h"
#include "utils/memutils.h"


/* Working state for btbuild and its callback */
typedef struct
{
	bool		isUnique;
	bool		haveDead;
	Relation	heapRel;
	BTSpool    *spool;

	/*
	 * spool2 is needed only when the index is a unique index.  Dead tuples
	 * are put into spool2 instead of spool to avoid the uniqueness check.
	 */
	BTSpool    *spool2;
	double		indtuples;
} BTBuildState;

/* Working state needed by btvacuumpage */
typedef struct
{
	IndexVacuumInfo *info;
	IndexBulkDeleteResult *stats;
	IndexBulkDeleteCallback callback;
	void	   *callback_state;
	BTCycleId	cycleid;
	BlockNumber lastBlockVacuumed;		/* highest blkno actually vacuumed */
	BlockNumber lastBlockLocked;	/* highest blkno we've cleanup-locked */
	BlockNumber totFreePages;	/* true total # of free pages */
	MemoryContext pagedelcontext;
} BTVacState;


static void btbuildCallback(Relation index,
				HeapTuple htup,
				Datum *values,
				bool *isnull,
				bool tupleIsAlive,
				void *state);
static void btvacuumscan(IndexVacuumInfo *info, IndexBulkDeleteResult *stats,
			 IndexBulkDeleteCallback callback, void *callback_state,
			 BTCycleId cycleid);
static void btvacuumpage(BTVacState *vstate, BlockNumber blkno,
			 BlockNumber orig_blkno);


/*
 * Btree handler function: return IndexAmRoutine with access method parameters
 * and callbacks.
 */
Datum
bthandler(PG_FUNCTION_ARGS)
{
	IndexAmRoutine *amroutine = makeNode(IndexAmRoutine);

	amroutine->amstrategies = BTMaxStrategyNumber;
	amroutine->amsupport = BTNProcs;
	amroutine->amcanorder = true;
	amroutine->amcanorderbyop = false;
	amroutine->amcanbackward = true;
	amroutine->amcanunique = true;
	amroutine->amcanmulticol = true;
	amroutine->amoptionalkey = true;
	amroutine->amsearcharray = true;
	amroutine->amsearchnulls = true;
	amroutine->amstorage = false;
	amroutine->amclusterable = true;
	amroutine->ampredlocks = true;
	amroutine->amkeytype = InvalidOid;

	amroutine->ambuild = btbuild;
	amroutine->ambuildempty = btbuildempty;
	amroutine->aminsert = btinsert;
	amroutine->ambulkdelete = btbulkdelete;
	amroutine->amvacuumcleanup = btvacuumcleanup;
	amroutine->amcanreturn = btcanreturn;
	amroutine->amcostestimate = btcostestimate;
	amroutine->amoptions = btoptions;
	amroutine->amproperty = btproperty;
	amroutine->amvalidate = btvalidate;
	amroutine->ambeginscan = btbeginscan;
	amroutine->amrescan = btrescan;
	amroutine->amgettuple = btgettuple;
	amroutine->amgetbitmap = btgetbitmap;
	amroutine->amendscan = btendscan;
	amroutine->ammarkpos = btmarkpos;
	amroutine->amrestrpos = btrestrpos;

	PG_RETURN_POINTER(amroutine);
}

/*
 *	btbuild() -- build a new btree index.
 */
IndexBuildResult *
btbuild(Relation heap, Relation index, IndexInfo *indexInfo)
{
	IndexBuildResult *result;
	double		reltuples;
	BTBuildState buildstate;

	buildstate.isUnique = indexInfo->ii_Unique;
	buildstate.haveDead = false;
	buildstate.heapRel = heap;
	buildstate.spool = NULL;
	buildstate.spool2 = NULL;
	buildstate.indtuples = 0;

#ifdef BTREE_BUILD_STATS
	if (log_btree_build_stats)
		ResetUsage();
#endif   /* BTREE_BUILD_STATS */

	/*
	 * We expect to be called exactly once for any index relation.  If
	 * that's not the case, we're in big trouble.
	 */
	if (RelationGetNumberOfBlocks(index) != 0)
		elog(ERROR, "index \"%s\" already contains data",
			 RelationGetRelationName(index));

	buildstate.spool = _bt_spoolinit(heap, index, indexInfo->ii_Unique, false);

	/*
	 * If building a unique index, put dead tuples in a second spool to keep
	 * them out of the uniqueness check.
	 */
	if (indexInfo->ii_Unique)
		buildstate.spool2 = _bt_spoolinit(heap, index, false, true);

	/* do the heap scan */
	reltuples = IndexBuildHeapScan(heap, index, indexInfo, true,
								   btbuildCallback, (void *) &buildstate);

	/* okay, all heap tuples are indexed */
	if (buildstate.spool2 && !buildstate.haveDead)
	{
		/* spool2 turns out to be unnecessary */
		_bt_spooldestroy(buildstate.spool2);
		buildstate.spool2 = NULL;
	}

	/*
	 * Finish the build by (1) completing the sort of the spool file, (2)
	 * inserting the sorted tuples into btree pages and (3) building the upper
	 * levels.
	 */
	_bt_leafbuild(buildstate.spool, buildstate.spool2);
	_bt_spooldestroy(buildstate.spool);
	if (buildstate.spool2)
		_bt_spooldestroy(buildstate.spool2);

#ifdef BTREE_BUILD_STATS
	if (log_btree_build_stats)
	{
		ShowUsage("BTREE BUILD STATS");
		ResetUsage();
	}
#endif   /* BTREE_BUILD_STATS */

	/*
	 * Return statistics
	 */
	result = (IndexBuildResult *) palloc(sizeof(IndexBuildResult));

	result->heap_tuples = reltuples;
	result->index_tuples = buildstate.indtuples;

	return result;
}

/*
 * Per-tuple callback from IndexBuildHeapScan
 */
static void
btbuildCallback(Relation index,
				HeapTuple htup,
				Datum *values,
				bool *isnull,
				bool tupleIsAlive,
				void *state)
{
	BTBuildState *buildstate = (BTBuildState *) state;

	/*
	 * insert the index tuple into the appropriate spool file for subsequent
	 * processing
	 */
	if (tupleIsAlive || buildstate->spool2 == NULL)
		_bt_spool(buildstate->spool, &htup->t_self, values, isnull);
	else
	{
		/* dead tuples are put into spool2 */
		buildstate->haveDead = true;
		_bt_spool(buildstate->spool2, &htup->t_self, values, isnull);
	}

	buildstate->indtuples += 1;
}

/*
 *	btbuildempty() -- build an empty btree index in the initialization fork
 */
void
btbuildempty(Relation index)
{
	Page		metapage;

	/* Construct metapage. */
	metapage = (Page) palloc(BLCKSZ);
	_bt_initmetapage(metapage, P_NONE, 0);

	/*
	 * Write the page and log it.  It might seem that an immediate sync
	 * would be sufficient to guarantee that the file exists on disk, but
	 * recovery itself might remove it while replaying, for example, an
	 * XLOG_DBASE_CREATE or XLOG_TBLSPC_CREATE record.  Therefore, we
	 * need this even when wal_level=minimal.
	 */
	PageSetChecksumInplace(metapage, BTREE_METAPAGE);
	smgrwrite(index->rd_smgr, INIT_FORKNUM, BTREE_METAPAGE,
			  (char *) metapage, true);
	log_newpage(&index->rd_smgr->smgr_rnode.node, INIT_FORKNUM,
				BTREE_METAPAGE, metapage, false);

	/*
	 * An immediate sync is required even if we xlog'd the page, because the
	 * write did not go through shared_buffers and therefore a concurrent
	 * checkpoint may have moved the redo pointer past our xlog record.
	 */
	smgrimmedsync(index->rd_smgr, INIT_FORKNUM);
}

/*
 *	btinsert() -- insert an index tuple into a btree.
 *
 *		Descend the tree recursively, find the appropriate location for our
 *		new tuple, and put it there.
 */
bool
btinsert(Relation rel, Datum *values, bool *isnull,
		 ItemPointer ht_ctid, Relation heapRel,
		 IndexUniqueCheck checkUnique)
{
	bool		result;
	IndexTuple	itup;

	/* generate an index tuple */
	itup = index_form_tuple(RelationGetDescr(rel), values, isnull);
	itup->t_tid = *ht_ctid;

	result = _bt_doinsert(rel, itup, checkUnique, heapRel);

	pfree(itup);

	return result;
}

/*
 *	btgettuple() -- Get the next tuple in the scan.
 */
bool
btgettuple(IndexScanDesc scan, ScanDirection dir)
{
	BTScanOpaque so = (BTScanOpaque) scan->opaque;
	bool		res;

	/* btree indexes are never lossy */
	scan->xs_recheck = false;

	/*
	 * If we have any array keys, initialize them during first call for a
	 * scan.  We can't do this in btrescan because we don't know the scan
	 * direction at that time.
	 */
	if (so->numArrayKeys && !BTScanPosIsValid(so->currPos))
	{
		/* punt if we have any unsatisfiable array keys */
		if (so->numArrayKeys < 0)
			return false;

		_bt_start_array_keys(scan, dir);
	}

	/* This loop handles advancing to the next array elements, if any */
	do
	{
		/*
		 * If we've already initialized this scan, we can just advance it in
		 * the appropriate direction.  If we haven't done so yet, we call
		 * _bt_first() to get the first item in the scan.
		 */
		if (!BTScanPosIsValid(so->currPos))
			res = _bt_first(scan, dir);
		else
		{
			/*
			 * Check to see if we should kill the previously-fetched tuple.
			 */
			if (scan->kill_prior_tuple)
			{
				/*
				 * Yes, remember it for later. (We'll deal with all such
				 * tuples at once right before leaving the index page.)  The
				 * test for numKilled overrun is not just paranoia: if the
				 * caller reverses direction in the indexscan then the same
				 * item might get entered multiple times. It's not worth
				 * trying to optimize that, so we don't detect it, but instead
				 * just forget any excess entries.
				 */
				if (so->killedItems == NULL)
					so->killedItems = (int *)
						palloc(MaxIndexTuplesPerPage * sizeof(int));
				if (so->numKilled < MaxIndexTuplesPerPage)
					so->killedItems[so->numKilled++] = so->currPos.itemIndex;
			}

			/*
			 * Now continue the scan.
			 */
			res = _bt_next(scan, dir);
		}

		/* If we have a tuple, return it ... */
		if (res)
			break;
		/* ... otherwise see if we have more array keys to deal with */
	} while (so->numArrayKeys && _bt_advance_array_keys(scan, dir));

	return res;
}

/*
 * btgetbitmap() -- gets all matching tuples, and adds them to a bitmap
 */
int64
btgetbitmap(IndexScanDesc scan, TIDBitmap *tbm)
{
	BTScanOpaque so = (BTScanOpaque) scan->opaque;
	int64		ntids = 0;
	ItemPointer heapTid;

	/*
	 * If we have any array keys, initialize them.
	 */
	if (so->numArrayKeys)
	{
		/* punt if we have any unsatisfiable array keys */
		if (so->numArrayKeys < 0)
			return ntids;

		_bt_start_array_keys(scan, ForwardScanDirection);
	}

	/* This loop handles advancing to the next array elements, if any */
	do
	{
		/* Fetch the first page & tuple */
		if (_bt_first(scan, ForwardScanDirection))
		{
			/* Save tuple ID, and continue scanning */
			heapTid = &scan->xs_ctup.t_self;
			tbm_add_tuples(tbm, heapTid, 1, false);
			ntids++;

			for (;;)
			{
				/*
				 * Advance to next tuple within page.  This is the same as the
				 * easy case in _bt_next().
				 */
				if (++so->currPos.itemIndex > so->currPos.lastItem)
				{
					/* let _bt_next do the heavy lifting */
					if (!_bt_next(scan, ForwardScanDirection))
						break;
				}

				/* Save tuple ID, and continue scanning */
				heapTid = &so->currPos.items[so->currPos.itemIndex].heapTid;
				tbm_add_tuples(tbm, heapTid, 1, false);
				ntids++;
			}
		}
		/* Now see if we have more array keys to deal with */
	} while (so->numArrayKeys && _bt_advance_array_keys(scan, ForwardScanDirection));

	return ntids;
}

/*
 *	btbeginscan() -- start a scan on a btree index
 */
IndexScanDesc
btbeginscan(Relation rel, int nkeys, int norderbys)
{
	IndexScanDesc scan;
	BTScanOpaque so;

	/* no order by operators allowed */
	Assert(norderbys == 0);

	/* get the scan */
	scan = RelationGetIndexScan(rel, nkeys, norderbys);

	/* allocate private workspace */
	so = (BTScanOpaque) palloc(sizeof(BTScanOpaqueData));
	BTScanPosInvalidate(so->currPos);
	BTScanPosInvalidate(so->markPos);
	if (scan->numberOfKeys > 0)
		so->keyData = (ScanKey) palloc(scan->numberOfKeys * sizeof(ScanKeyData));
	else
		so->keyData = NULL;

	so->arrayKeyData = NULL;	/* assume no array keys for now */
	so->numArrayKeys = 0;
	so->arrayKeys = NULL;
	so->arrayContext = NULL;

	so->killedItems = NULL;		/* until needed */
	so->numKilled = 0;

	/*
	 * We don't know yet whether the scan will be index-only, so we do not
	 * allocate the tuple workspace arrays until btrescan.  However, we set up
	 * scan->xs_itupdesc whether we'll need it or not, since that's so cheap.
	 */
	so->currTuples = so->markTuples = NULL;

	scan->xs_itupdesc = RelationGetDescr(rel);

	scan->opaque = so;

	return scan;
}

/*
 *	btrescan() -- rescan an index relation
 */
void
btrescan(IndexScanDesc scan, ScanKey scankey, int nscankeys,
		 ScanKey orderbys, int norderbys)
{
	BTScanOpaque so = (BTScanOpaque) scan->opaque;

	/* we aren't holding any read locks, but gotta drop the pins */
	if (BTScanPosIsValid(so->currPos))
	{
		/* Before leaving current page, deal with any killed items */
		if (so->numKilled > 0)
			_bt_killitems(scan);
		BTScanPosUnpinIfPinned(so->currPos);
		BTScanPosInvalidate(so->currPos);
	}

	so->markItemIndex = -1;
	BTScanPosUnpinIfPinned(so->markPos);
	BTScanPosInvalidate(so->markPos);

	/*
	 * Allocate tuple workspace arrays, if needed for an index-only scan and
	 * not already done in a previous rescan call.  To save on palloc
	 * overhead, both workspaces are allocated as one palloc block; only this
	 * function and btendscan know that.
	 *
	 * NOTE: this data structure also makes it safe to return data from a
	 * "name" column, even though btree name_ops uses an underlying storage
	 * datatype of cstring.  The risk there is that "name" is supposed to be
	 * padded to NAMEDATALEN, but the actual index tuple is probably shorter.
	 * However, since we only return data out of tuples sitting in the
	 * currTuples array, a fetch of NAMEDATALEN bytes can at worst pull some
	 * data out of the markTuples array --- running off the end of memory for
	 * a SIGSEGV is not possible.  Yeah, this is ugly as sin, but it beats
	 * adding special-case treatment for name_ops elsewhere.
	 */
	if (scan->xs_want_itup && so->currTuples == NULL)
	{
		so->currTuples = (char *) palloc(BLCKSZ * 2);
		so->markTuples = so->currTuples + BLCKSZ;
	}

	/*
	 * Reset the scan keys.  Note that the key-ordering logic moved to
	 * _bt_first.  - vadim 05/05/97
	 */
	if (scankey && scan->numberOfKeys > 0)
		memmove(scan->keyData,
				scankey,
				scan->numberOfKeys * sizeof(ScanKeyData));
	so->numberOfKeys = 0;		/* until _bt_preprocess_keys sets it */

	/* If any keys are SK_SEARCHARRAY type, set up array-key info */
	_bt_preprocess_array_keys(scan);
}

/*
 *	btendscan() -- close down a scan
 */
void
btendscan(IndexScanDesc scan)
{
	BTScanOpaque so = (BTScanOpaque) scan->opaque;

	/* we aren't holding any read locks, but gotta drop the pins */
	if (BTScanPosIsValid(so->currPos))
	{
		/* Before leaving current page, deal with any killed items */
		if (so->numKilled > 0)
			_bt_killitems(scan);
		BTScanPosUnpinIfPinned(so->currPos);
	}

	so->markItemIndex = -1;
	BTScanPosUnpinIfPinned(so->markPos);

	/* No need to invalidate positions, the RAM is about to be freed. */

	/* Release storage */
	if (so->keyData != NULL)
		pfree(so->keyData);
	/* so->arrayKeyData and so->arrayKeys are in arrayContext */
	if (so->arrayContext != NULL)
		MemoryContextDelete(so->arrayContext);
	if (so->killedItems != NULL)
		pfree(so->killedItems);
	if (so->currTuples != NULL)
		pfree(so->currTuples);
	/* so->markTuples should not be pfree'd, see btrescan */
	pfree(so);
}

/*
 *	btmarkpos() -- save current scan position
 */
void
btmarkpos(IndexScanDesc scan)
{
	BTScanOpaque so = (BTScanOpaque) scan->opaque;

	/* There may be an old mark with a pin (but no lock). */
	BTScanPosUnpinIfPinned(so->markPos);

	/*
	 * Just record the current itemIndex.  If we later step to next page
	 * before releasing the marked position, _bt_steppage makes a full copy of
	 * the currPos struct in markPos.  If (as often happens) the mark is moved
	 * before we leave the page, we don't have to do that work.
	 */
	if (BTScanPosIsValid(so->currPos))
		so->markItemIndex = so->currPos.itemIndex;
	else
	{
		BTScanPosInvalidate(so->markPos);
		so->markItemIndex = -1;
	}

	/* Also record the current positions of any array keys */
	if (so->numArrayKeys)
		_bt_mark_array_keys(scan);
}

/*
 *	btrestrpos() -- restore scan to last saved position
 */
void
btrestrpos(IndexScanDesc scan)
{
	BTScanOpaque so = (BTScanOpaque) scan->opaque;

	/* Restore the marked positions of any array keys */
	if (so->numArrayKeys)
		_bt_restore_array_keys(scan);

	if (so->markItemIndex >= 0)
	{
		/*
		 * The scan has never moved to a new page since the last mark.  Just
		 * restore the itemIndex.
		 *
		 * NB: In this case we can't count on anything in so->markPos to be
		 * accurate.
		 */
		so->currPos.itemIndex = so->markItemIndex;
	}
	else
	{
		/*
		 * The scan moved to a new page after last mark or restore, and we are
		 * now restoring to the marked page.  We aren't holding any read
		 * locks, but if we're still holding the pin for the current position,
		 * we must drop it.
		 */
		if (BTScanPosIsValid(so->currPos))
		{
			/* Before leaving current page, deal with any killed items */
			if (so->numKilled > 0)
				_bt_killitems(scan);
			BTScanPosUnpinIfPinned(so->currPos);
		}

		if (BTScanPosIsValid(so->markPos))
		{
			/* bump pin on mark buffer for assignment to current buffer */
			if (BTScanPosIsPinned(so->markPos))
				IncrBufferRefCount(so->markPos.buf);
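			/*
			 * Copy the fixed fields of markPos plus just the valid
			 * items[0 .. lastItem] entries, not the whole (rather large)
			 * BTScanPosData struct.
			 */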
			memcpy(&so->currPos, &so->markPos,
				   offsetof(BTScanPosData, items[1]) +
				   so->markPos.lastItem * sizeof(BTScanPosItem));
			if (so->currTuples)
				memcpy(so->currTuples, so->markTuples,
					   so->markPos.nextTupleOffset);
		}
		else
			BTScanPosInvalidate(so->currPos);
	}
}

/*
 * Bulk deletion of all index entries pointing to a set of heap tuples.
 * The set of target tuples is specified via a callback routine that tells
 * whether any given heap tuple (identified by ItemPointer) is being deleted.
 *
 * Result: a palloc'd struct containing statistical info for VACUUM displays.
 */
IndexBulkDeleteResult *
btbulkdelete(IndexVacuumInfo *info, IndexBulkDeleteResult *stats,
			 IndexBulkDeleteCallback callback, void *callback_state)
{
	Relation	rel = info->index;
	BTCycleId	cycleid;

	/* allocate stats if first time through, else re-use existing struct */
	if (stats == NULL)
		stats = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));

	/* Establish the vacuum cycle ID to use for this scan */
	/* The ENSURE stuff ensures we clean up shared memory on failure */
	PG_ENSURE_ERROR_CLEANUP(_bt_end_vacuum_callback, PointerGetDatum(rel));
	{
		cycleid = _bt_start_vacuum(rel);

		btvacuumscan(info, stats, callback, callback_state, cycleid);
	}
	PG_END_ENSURE_ERROR_CLEANUP(_bt_end_vacuum_callback, PointerGetDatum(rel));
	_bt_end_vacuum(rel);

	return stats;
}

/*
 * Post-VACUUM cleanup.
 *
 * Result: a palloc'd struct containing statistical info for VACUUM displays.
 */
IndexBulkDeleteResult *
btvacuumcleanup(IndexVacuumInfo *info, IndexBulkDeleteResult *stats)
{
	/* No-op in ANALYZE ONLY mode */
	if (info->analyze_only)
		return stats;

	/*
	 * If btbulkdelete was called, we need not do anything, just return the
	 * stats from the latest btbulkdelete call.  If it wasn't called, we must
	 * still do a pass over the index, to recycle any newly-recyclable pages
	 * and to obtain index statistics.
	 *
	 * Since we aren't going to actually delete any leaf items, there's no
	 * need to go through all the vacuum-cycle-ID pushups.
	 */
	if (stats == NULL)
	{
		stats = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
		btvacuumscan(info, stats, NULL, NULL, 0);
	}

	/* Finally, vacuum the FSM */
	IndexFreeSpaceMapVacuum(info->index);

	/*
	 * It's quite possible for us to be fooled by concurrent page splits into
	 * double-counting some index tuples, so disbelieve any total that exceeds
	 * the underlying heap's count ... if we know that accurately.  Otherwise
	 * this might just make matters worse.
	 */
	if (!info->estimated_count)
	{
		if (stats->num_index_tuples > info->num_heap_tuples)
			stats->num_index_tuples = info->num_heap_tuples;
	}

	return stats;
}

/*
 * btvacuumscan --- scan the index for VACUUMing purposes
 *
 * This combines the functions of looking for leaf tuples that are deletable
 * according to the vacuum callback, looking for empty pages that can be
 * deleted, and looking for old deleted pages that can be recycled.  Both
 * btbulkdelete and btvacuumcleanup invoke this (the latter only if no
 * btbulkdelete call occurred).
 *
 * The caller is responsible for initially allocating/zeroing a stats struct
 * and for obtaining a vacuum cycle ID if necessary.
 */
static void
btvacuumscan(IndexVacuumInfo *info, IndexBulkDeleteResult *stats,
			 IndexBulkDeleteCallback callback, void *callback_state,
			 BTCycleId cycleid)
{
	Relation	rel = info->index;
	BTVacState	vstate;
	BlockNumber num_pages;
	BlockNumber blkno;
	bool		needLock;

	/*
	 * Reset counts that will be incremented during the scan; needed in case
	 * of multiple scans during a single VACUUM command
	 */
	stats->estimated_count = false;
	stats->num_index_tuples = 0;
	stats->pages_deleted = 0;

	/* Set up info to pass down to btvacuumpage */
	vstate.info = info;
	vstate.stats = stats;
	vstate.callback = callback;
	vstate.callback_state = callback_state;
	vstate.cycleid = cycleid;
	vstate.lastBlockVacuumed = BTREE_METAPAGE;	/* Initialise at first block */
	vstate.lastBlockLocked = BTREE_METAPAGE;
	vstate.totFreePages = 0;

	/* Create a temporary memory context to run _bt_pagedel in */
	vstate.pagedelcontext = AllocSetContextCreate(CurrentMemoryContext,
												  "_bt_pagedel",
												  ALLOCSET_DEFAULT_SIZES);

	/*
	 * The outer loop iterates over all index pages except the metapage, in
	 * physical order (we hope the kernel will cooperate in providing
	 * read-ahead for speed).  It is critical that we visit all leaf pages,
	 * including ones added after we start the scan, else we might fail to
	 * delete some deletable tuples.  Hence, we must repeatedly check the
	 * relation length.  We must acquire the relation-extension lock while
	 * doing so to avoid a race condition: if someone else is extending the
	 * relation, there is a window where bufmgr/smgr have created a new
	 * all-zero page but it hasn't yet been write-locked by _bt_getbuf(). If
	 * we manage to scan such a page here, we'll improperly assume it can be
	 * recycled.  Taking the lock synchronizes things enough to prevent a
	 * problem: either num_pages won't include the new page, or _bt_getbuf
	 * already has write lock on the buffer and it will be fully initialized
	 * before we can examine it.  (See also vacuumlazy.c, which has the same
	 * issue.)	Also, we need not worry if a page is added immediately after
	 * we look; the page splitting code already has write-lock on the left
	 * page before it adds a right page, so we must already have processed any
	 * tuples due to be moved into such a page.
	 *
	 * We can skip locking for new or temp relations, however, since no one
	 * else could be accessing them.
	 */
	needLock = !RELATION_IS_LOCAL(rel);

	blkno = BTREE_METAPAGE + 1;
	for (;;)
	{
		/* Get the current relation length */
		if (needLock)
			LockRelationForExtension(rel, ExclusiveLock);
		num_pages = RelationGetNumberOfBlocks(rel);
		if (needLock)
			UnlockRelationForExtension(rel, ExclusiveLock);

		/* Quit if we've scanned the whole relation */
		if (blkno >= num_pages)
			break;
		/* Iterate over pages, then loop back to recheck length */
		for (; blkno < num_pages; blkno++)
		{
			btvacuumpage(&vstate, blkno, blkno);
		}
	}

	/*
	 * Check to see if we need to issue one final WAL record for this index,
	 * which may be needed for correctness on a hot standby node when non-MVCC
	 * index scans could take place.
	 *
	 * If the WAL is replayed in hot standby, the replay process needs to get
	 * cleanup locks on all index leaf pages, just as we've been doing here.
	 * However, we won't issue any WAL records about pages that have no items
	 * to be deleted.  For pages between pages we've vacuumed, the replay code
	 * will take locks under the direction of the lastBlockVacuumed fields in
	 * the XLOG_BTREE_VACUUM WAL records.  To cover pages after the last one
	 * we vacuum, we need to issue a dummy XLOG_BTREE_VACUUM WAL record
	 * against the last leaf page in the index, if that one wasn't vacuumed.
	 */
	if (XLogStandbyInfoActive() &&
		vstate.lastBlockVacuumed < vstate.lastBlockLocked)
	{
		Buffer		buf;

		/*
		 * The page should be valid, but we can't use _bt_getbuf() because we
		 * want to use a nondefault buffer access strategy.  Since we aren't
		 * going to delete any items, getting cleanup lock again is probably
		 * overkill, but for consistency do that anyway.
		 */
		buf = ReadBufferExtended(rel, MAIN_FORKNUM, vstate.lastBlockLocked,
								 RBM_NORMAL, info->strategy);
		LockBufferForCleanup(buf);
		_bt_checkpage(rel, buf);
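		/*
		 * Emit the dummy record: it deletes nothing, but it carries
		 * lastBlockVacuumed so that standby replay will cleanup-lock the
		 * remaining leaf pages.
		 */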
		_bt_delitems_vacuum(rel, buf, NULL, 0, vstate.lastBlockVacuumed);
		_bt_relbuf(rel, buf);
	}

	MemoryContextDelete(vstate.pagedelcontext);

	/* update statistics */
	stats->num_pages = num_pages;
	stats->pages_free = vstate.totFreePages;
}

/*
 * btvacuumpage --- VACUUM one page
 *
 * This processes a single page for btvacuumscan().  In some cases we
 * must go back and re-examine previously-scanned pages; this routine
 * recurses when necessary to handle that case.
 *
 * blkno is the page to process.  orig_blkno is the highest block number
 * reached by the outer btvacuumscan loop (the same as blkno, unless we
 * are recursing to re-examine a previous page).
 */
static void
btvacuumpage(BTVacState *vstate, BlockNumber blkno, BlockNumber orig_blkno)
{
	IndexVacuumInfo *info = vstate->info;
	IndexBulkDeleteResult *stats = vstate->stats;
	IndexBulkDeleteCallback callback = vstate->callback;
	void	   *callback_state = vstate->callback_state;
	Relation	rel = info->index;
	bool		delete_now;
	BlockNumber recurse_to;
	Buffer		buf;
	Page		page;
	BTPageOpaque opaque = NULL;

restart:
	delete_now = false;
	recurse_to = P_NONE;

	/* call vacuum_delay_point while not holding any buffer lock */
	vacuum_delay_point();

	/*
	 * We can't use _bt_getbuf() here because it always applies
	 * _bt_checkpage(), which will barf on an all-zero page. We want to
	 * recycle all-zero pages, not fail.  Also, we want to use a nondefault
	 * buffer access strategy.
	 */
	buf = ReadBufferExtended(rel, MAIN_FORKNUM, blkno, RBM_NORMAL,
							 info->strategy);
	LockBuffer(buf, BT_READ);
	page = BufferGetPage(buf);
	if (!PageIsNew(page))
	{
		_bt_checkpage(rel, buf);
		opaque = (BTPageOpaque) PageGetSpecialPointer(page);
	}

	/*
	 * If we are recursing, the only case we want to do anything with is a
	 * live leaf page having the current vacuum cycle ID.  Any other state
	 * implies we already saw the page (eg, deleted it as being empty).
	 */
	if (blkno != orig_blkno)
	{
		if (_bt_page_recyclable(page) ||
			P_IGNORE(opaque) ||
			!P_ISLEAF(opaque) ||
			opaque->btpo_cycleid != vstate->cycleid)
		{
			_bt_relbuf(rel, buf);
			return;
		}
	}

	/* Page is valid, see what to do with it */
	if (_bt_page_recyclable(page))
	{
		/* Okay to recycle this page */
		RecordFreeIndexPage(rel, blkno);
		vstate->totFreePages++;
		stats->pages_deleted++;
	}
	else if (P_ISDELETED(opaque))
	{
		/* Already deleted, but can't recycle yet */
		stats->pages_deleted++;
	}
	else if (P_ISHALFDEAD(opaque))
	{
		/* Half-dead, try to delete */
		delete_now = true;
	}
	else if (P_ISLEAF(opaque))
	{
		OffsetNumber deletable[MaxOffsetNumber];
		int			ndeletable;
		OffsetNumber offnum,
					minoff,
					maxoff;

		/*
		 * Trade in the initial read lock for a super-exclusive write lock on
		 * this page.  We must get such a lock on every leaf page over the
		 * course of the vacuum scan, whether or not it actually contains any
		 * deletable tuples --- see nbtree/README.
		 */
		LockBuffer(buf, BUFFER_LOCK_UNLOCK);
		LockBufferForCleanup(buf);

		/*
		 * Remember highest leaf page number we've taken cleanup lock on; see
		 * notes in btvacuumscan
		 */
		if (blkno > vstate->lastBlockLocked)
			vstate->lastBlockLocked = blkno;

		/*
		 * Check whether we need to recurse back to earlier pages.  What we
		 * are concerned about is a page split that happened since we started
		 * the vacuum scan.  If the split moved some tuples to a lower page
		 * then we might have missed 'em.  If so, set up for tail recursion.
		 * (Must do this before possibly clearing btpo_cycleid below!)
		 */
		if (vstate->cycleid != 0 &&
			opaque->btpo_cycleid == vstate->cycleid &&
			!(opaque->btpo_flags & BTP_SPLIT_END) &&
			!P_RIGHTMOST(opaque) &&
			opaque->btpo_next < orig_blkno)
			recurse_to = opaque->btpo_next;

		/*
		 * Scan over all items to see which ones need to be deleted according
		 * to the callback function.
		 */
		ndeletable = 0;
		minoff = P_FIRSTDATAKEY(opaque);
		maxoff = PageGetMaxOffsetNumber(page);
		if (callback)
		{
			for (offnum = minoff;
				 offnum <= maxoff;
				 offnum = OffsetNumberNext(offnum))
			{
				IndexTuple	itup;
				ItemPointer htup;

				itup = (IndexTuple) PageGetItem(page,
												PageGetItemId(page, offnum));
				htup = &(itup->t_tid);

				/*
				 * During Hot Standby we currently assume that
				 * XLOG_BTREE_VACUUM records do not produce conflicts. That is
				 * only true as long as the callback function depends only
				 * upon whether the index tuple refers to heap tuples removed
				 * in the initial heap scan. When vacuum starts it derives a
				 * value of OldestXmin. Backends taking later snapshots could
				 * have a RecentGlobalXmin with a later xid than the vacuum's
				 * OldestXmin, so it is possible that row versions deleted
				 * after OldestXmin could be marked as killed by other
				 * backends. The callback function *could* look at the index
				 * tuple state in isolation and decide to delete the index
				 * tuple, though currently it does not. If it ever did, we
				 * would need to reconsider whether XLOG_BTREE_VACUUM records
				 * should cause conflicts. If they did cause conflicts they
				 * would be fairly harsh conflicts, since we haven't yet
				 * worked out a way to pass a useful value for
				 * latestRemovedXid on the XLOG_BTREE_VACUUM records. This
				 * applies to *any* type of index that marks index tuples as
				 * killed.
				 */
				if (callback(htup, callback_state))
					deletable[ndeletable++] = offnum;
			}
		}

		/*
		 * Apply any needed deletes.  We issue just one _bt_delitems_vacuum()
		 * call per page, so as to minimize WAL traffic.
		 */
		if (ndeletable > 0)
		{
			/*
			 * The XLOG_BTREE_VACUUM WAL record we issue includes all the
			 * information the replay code needs to get a cleanup lock on
			 * every page between the previous lastBlockVacuumed and this
			 * page.  This ensures that WAL replay locks all leaf pages at
			 * some point, which is important should non-MVCC scans be
			 * requested.  This is currently unused on standby, but we record
			 * it anyway, so that the WAL contains the required information.
			 *
			 * Since we can visit leaf pages out-of-order when recursing,
			 * replay might end up locking such pages an extra time, but it
			 * doesn't seem worth the amount of bookkeeping it'd take to avoid
			 * that.
			 */
			_bt_delitems_vacuum(rel, buf, deletable, ndeletable,
								vstate->lastBlockVacuumed);

			/*
			 * Remember highest leaf page number we've issued a
			 * XLOG_BTREE_VACUUM WAL record for.
			 */
			if (blkno > vstate->lastBlockVacuumed)
				vstate->lastBlockVacuumed = blkno;

			stats->tuples_removed += ndeletable;
			/* must recompute maxoff */
			maxoff = PageGetMaxOffsetNumber(page);
		}
		else
		{
			/*
			 * If the page has been split during this vacuum cycle, it seems
			 * worth expending a write to clear btpo_cycleid even if we don't
			 * have any deletions to do.  (If we do, _bt_delitems_vacuum takes
			 * care of this.)  This ensures we won't process the page again.
			 *
			 * We treat this like a hint-bit update because there's no need to
			 * WAL-log it.
			 */
			if (vstate->cycleid != 0 &&
				opaque->btpo_cycleid == vstate->cycleid)
			{
				opaque->btpo_cycleid = 0;
				MarkBufferDirtyHint(buf, true);
			}
		}

		/*
		 * If it's now empty, try to delete; else count the live tuples. We
		 * don't delete when recursing, though, to avoid putting entries into
		 * freePages out-of-order (doesn't seem worth any extra code to handle
		 * the case).
		 */
		if (minoff > maxoff)
			delete_now = (blkno == orig_blkno);
		else
			stats->num_index_tuples += maxoff - minoff + 1;
	}

	if (delete_now)
	{
		MemoryContext oldcontext;
		int			ndel;

		/* Run pagedel in a temp context to avoid memory leakage */
		MemoryContextReset(vstate->pagedelcontext);
		oldcontext = MemoryContextSwitchTo(vstate->pagedelcontext);

		ndel = _bt_pagedel(rel, buf);

		/* count only this page, else may double-count parent */
		if (ndel)
			stats->pages_deleted++;

		MemoryContextSwitchTo(oldcontext);
		/* pagedel released buffer, so we shouldn't */
	}
	else
		_bt_relbuf(rel, buf);

	/*
	 * This is really tail recursion, but if the compiler is too stupid to
	 * optimize it as such, we'd eat an uncomfortably large amount of stack
	 * space per recursion level (due to the deletable[] array). A failure is
	 * improbable since the number of levels isn't likely to be large ... but
	 * just in case, let's hand-optimize into a loop.
	 */
	if (recurse_to != P_NONE)
	{
		blkno = recurse_to;
		goto restart;
	}
}

/*
 *	btcanreturn() -- Check whether btree indexes support index-only scans.
 *
 * btrees always do, so this is trivial.
 */
bool
btcanreturn(Relation index, int attno)
{
	return true;
}