1 /*-------------------------------------------------------------------------
2  *
3  * nbtutils.c
4  *	  Utility code for Postgres btree implementation.
5  *
6  * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *	  src/backend/access/nbtree/nbtutils.c
12  *
13  *-------------------------------------------------------------------------
14  */
15 
16 #include "postgres.h"
17 
18 #include <time.h>
19 
20 #include "access/nbtree.h"
21 #include "access/reloptions.h"
22 #include "access/relscan.h"
23 #include "miscadmin.h"
24 #include "utils/array.h"
25 #include "utils/lsyscache.h"
26 #include "utils/memutils.h"
27 #include "utils/rel.h"
28 
29 
30 typedef struct BTSortArrayContext
31 {
32 	FmgrInfo	flinfo;
33 	Oid			collation;
34 	bool		reverse;
35 } BTSortArrayContext;
36 
37 static Datum _bt_find_extreme_element(IndexScanDesc scan, ScanKey skey,
38 						 StrategyNumber strat,
39 						 Datum *elems, int nelems);
40 static int _bt_sort_array_elements(IndexScanDesc scan, ScanKey skey,
41 						bool reverse,
42 						Datum *elems, int nelems);
43 static int	_bt_compare_array_elements(const void *a, const void *b, void *arg);
44 static bool _bt_compare_scankey_args(IndexScanDesc scan, ScanKey op,
45 						 ScanKey leftarg, ScanKey rightarg,
46 						 bool *result);
47 static bool _bt_fix_scankey_strategy(ScanKey skey, int16 *indoption);
48 static void _bt_mark_scankey_required(ScanKey skey);
49 static bool _bt_check_rowcompare(ScanKey skey,
50 					 IndexTuple tuple, TupleDesc tupdesc,
51 					 ScanDirection dir, bool *continuescan);
52 
53 
54 /*
55  * _bt_mkscankey
56  *		Build an insertion scan key that contains comparison data from itup
57  *		as well as comparator routines appropriate to the key datatypes.
58  *
59  *		The result is intended for use with _bt_compare().
60  */
61 ScanKey
_bt_mkscankey(Relation rel,IndexTuple itup)62 _bt_mkscankey(Relation rel, IndexTuple itup)
63 {
64 	ScanKey		skey;
65 	TupleDesc	itupdesc;
66 	int			natts;
67 	int16	   *indoption;
68 	int			i;
69 
70 	itupdesc = RelationGetDescr(rel);
71 	natts = RelationGetNumberOfAttributes(rel);
72 	indoption = rel->rd_indoption;
73 
74 	skey = (ScanKey) palloc(natts * sizeof(ScanKeyData));
75 
76 	for (i = 0; i < natts; i++)
77 	{
78 		FmgrInfo   *procinfo;
79 		Datum		arg;
80 		bool		null;
81 		int			flags;
82 
83 		/*
84 		 * We can use the cached (default) support procs since no cross-type
85 		 * comparison can be needed.
86 		 */
87 		procinfo = index_getprocinfo(rel, i + 1, BTORDER_PROC);
88 		arg = index_getattr(itup, i + 1, itupdesc, &null);
89 		flags = (null ? SK_ISNULL : 0) | (indoption[i] << SK_BT_INDOPTION_SHIFT);
90 		ScanKeyEntryInitializeWithInfo(&skey[i],
91 									   flags,
92 									   (AttrNumber) (i + 1),
93 									   InvalidStrategy,
94 									   InvalidOid,
95 									   rel->rd_indcollation[i],
96 									   procinfo,
97 									   arg);
98 	}
99 
100 	return skey;
101 }
102 
103 /*
104  * _bt_mkscankey_nodata
105  *		Build an insertion scan key that contains 3-way comparator routines
106  *		appropriate to the key datatypes, but no comparison data.  The
107  *		comparison data ultimately used must match the key datatypes.
108  *
109  *		The result cannot be used with _bt_compare(), unless comparison
110  *		data is first stored into the key entries.  Currently this
111  *		routine is only called by nbtsort.c and tuplesort.c, which have
112  *		their own comparison routines.
113  */
114 ScanKey
_bt_mkscankey_nodata(Relation rel)115 _bt_mkscankey_nodata(Relation rel)
116 {
117 	ScanKey		skey;
118 	int			natts;
119 	int16	   *indoption;
120 	int			i;
121 
122 	natts = RelationGetNumberOfAttributes(rel);
123 	indoption = rel->rd_indoption;
124 
125 	skey = (ScanKey) palloc(natts * sizeof(ScanKeyData));
126 
127 	for (i = 0; i < natts; i++)
128 	{
129 		FmgrInfo   *procinfo;
130 		int			flags;
131 
132 		/*
133 		 * We can use the cached (default) support procs since no cross-type
134 		 * comparison can be needed.
135 		 */
136 		procinfo = index_getprocinfo(rel, i + 1, BTORDER_PROC);
137 		flags = SK_ISNULL | (indoption[i] << SK_BT_INDOPTION_SHIFT);
138 		ScanKeyEntryInitializeWithInfo(&skey[i],
139 									   flags,
140 									   (AttrNumber) (i + 1),
141 									   InvalidStrategy,
142 									   InvalidOid,
143 									   rel->rd_indcollation[i],
144 									   procinfo,
145 									   (Datum) 0);
146 	}
147 
148 	return skey;
149 }
150 
151 /*
152  * free a scan key made by either _bt_mkscankey or _bt_mkscankey_nodata.
153  */
154 void
_bt_freeskey(ScanKey skey)155 _bt_freeskey(ScanKey skey)
156 {
157 	pfree(skey);
158 }
159 
160 /*
161  * free a retracement stack made by _bt_search.
162  */
163 void
_bt_freestack(BTStack stack)164 _bt_freestack(BTStack stack)
165 {
166 	BTStack		ostack;
167 
168 	while (stack != NULL)
169 	{
170 		ostack = stack;
171 		stack = stack->bts_parent;
172 		pfree(ostack);
173 	}
174 }
175 
176 
177 /*
178  *	_bt_preprocess_array_keys() -- Preprocess SK_SEARCHARRAY scan keys
179  *
180  * If there are any SK_SEARCHARRAY scan keys, deconstruct the array(s) and
181  * set up BTArrayKeyInfo info for each one that is an equality-type key.
182  * Prepare modified scan keys in so->arrayKeyData, which will hold the current
183  * array elements during each primitive indexscan operation.  For inequality
184  * array keys, it's sufficient to find the extreme element value and replace
185  * the whole array with that scalar value.
186  *
187  * Note: the reason we need so->arrayKeyData, rather than just scribbling
188  * on scan->keyData, is that callers are permitted to call btrescan without
189  * supplying a new set of scankey data.
190  */
191 void
_bt_preprocess_array_keys(IndexScanDesc scan)192 _bt_preprocess_array_keys(IndexScanDesc scan)
193 {
194 	BTScanOpaque so = (BTScanOpaque) scan->opaque;
195 	int			numberOfKeys = scan->numberOfKeys;
196 	int16	   *indoption = scan->indexRelation->rd_indoption;
197 	int			numArrayKeys;
198 	ScanKey		cur;
199 	int			i;
200 	MemoryContext oldContext;
201 
202 	/* Quick check to see if there are any array keys */
203 	numArrayKeys = 0;
204 	for (i = 0; i < numberOfKeys; i++)
205 	{
206 		cur = &scan->keyData[i];
207 		if (cur->sk_flags & SK_SEARCHARRAY)
208 		{
209 			numArrayKeys++;
210 			Assert(!(cur->sk_flags & (SK_ROW_HEADER | SK_SEARCHNULL | SK_SEARCHNOTNULL)));
211 			/* If any arrays are null as a whole, we can quit right now. */
212 			if (cur->sk_flags & SK_ISNULL)
213 			{
214 				so->numArrayKeys = -1;
215 				so->arrayKeyData = NULL;
216 				return;
217 			}
218 		}
219 	}
220 
221 	/* Quit if nothing to do. */
222 	if (numArrayKeys == 0)
223 	{
224 		so->numArrayKeys = 0;
225 		so->arrayKeyData = NULL;
226 		return;
227 	}
228 
229 	/*
230 	 * Make a scan-lifespan context to hold array-associated data, or reset it
231 	 * if we already have one from a previous rescan cycle.
232 	 */
233 	if (so->arrayContext == NULL)
234 		so->arrayContext = AllocSetContextCreate(CurrentMemoryContext,
235 												 "BTree array context",
236 												 ALLOCSET_SMALL_SIZES);
237 	else
238 		MemoryContextReset(so->arrayContext);
239 
240 	oldContext = MemoryContextSwitchTo(so->arrayContext);
241 
242 	/* Create modifiable copy of scan->keyData in the workspace context */
243 	so->arrayKeyData = (ScanKey) palloc(scan->numberOfKeys * sizeof(ScanKeyData));
244 	memcpy(so->arrayKeyData,
245 		   scan->keyData,
246 		   scan->numberOfKeys * sizeof(ScanKeyData));
247 
248 	/* Allocate space for per-array data in the workspace context */
249 	so->arrayKeys = (BTArrayKeyInfo *) palloc0(numArrayKeys * sizeof(BTArrayKeyInfo));
250 
251 	/* Now process each array key */
252 	numArrayKeys = 0;
253 	for (i = 0; i < numberOfKeys; i++)
254 	{
255 		ArrayType  *arrayval;
256 		int16		elmlen;
257 		bool		elmbyval;
258 		char		elmalign;
259 		int			num_elems;
260 		Datum	   *elem_values;
261 		bool	   *elem_nulls;
262 		int			num_nonnulls;
263 		int			j;
264 
265 		cur = &so->arrayKeyData[i];
266 		if (!(cur->sk_flags & SK_SEARCHARRAY))
267 			continue;
268 
269 		/*
270 		 * First, deconstruct the array into elements.  Anything allocated
271 		 * here (including a possibly detoasted array value) is in the
272 		 * workspace context.
273 		 */
274 		arrayval = DatumGetArrayTypeP(cur->sk_argument);
275 		/* We could cache this data, but not clear it's worth it */
276 		get_typlenbyvalalign(ARR_ELEMTYPE(arrayval),
277 							 &elmlen, &elmbyval, &elmalign);
278 		deconstruct_array(arrayval,
279 						  ARR_ELEMTYPE(arrayval),
280 						  elmlen, elmbyval, elmalign,
281 						  &elem_values, &elem_nulls, &num_elems);
282 
283 		/*
284 		 * Compress out any null elements.  We can ignore them since we assume
285 		 * all btree operators are strict.
286 		 */
287 		num_nonnulls = 0;
288 		for (j = 0; j < num_elems; j++)
289 		{
290 			if (!elem_nulls[j])
291 				elem_values[num_nonnulls++] = elem_values[j];
292 		}
293 
294 		/* We could pfree(elem_nulls) now, but not worth the cycles */
295 
296 		/* If there's no non-nulls, the scan qual is unsatisfiable */
297 		if (num_nonnulls == 0)
298 		{
299 			numArrayKeys = -1;
300 			break;
301 		}
302 
303 		/*
304 		 * If the comparison operator is not equality, then the array qual
305 		 * degenerates to a simple comparison against the smallest or largest
306 		 * non-null array element, as appropriate.
307 		 */
308 		switch (cur->sk_strategy)
309 		{
310 			case BTLessStrategyNumber:
311 			case BTLessEqualStrategyNumber:
312 				cur->sk_argument =
313 					_bt_find_extreme_element(scan, cur,
314 											 BTGreaterStrategyNumber,
315 											 elem_values, num_nonnulls);
316 				continue;
317 			case BTEqualStrategyNumber:
318 				/* proceed with rest of loop */
319 				break;
320 			case BTGreaterEqualStrategyNumber:
321 			case BTGreaterStrategyNumber:
322 				cur->sk_argument =
323 					_bt_find_extreme_element(scan, cur,
324 											 BTLessStrategyNumber,
325 											 elem_values, num_nonnulls);
326 				continue;
327 			default:
328 				elog(ERROR, "unrecognized StrategyNumber: %d",
329 					 (int) cur->sk_strategy);
330 				break;
331 		}
332 
333 		/*
334 		 * Sort the non-null elements and eliminate any duplicates.  We must
335 		 * sort in the same ordering used by the index column, so that the
336 		 * successive primitive indexscans produce data in index order.
337 		 */
338 		num_elems = _bt_sort_array_elements(scan, cur,
339 						(indoption[cur->sk_attno - 1] & INDOPTION_DESC) != 0,
340 											elem_values, num_nonnulls);
341 
342 		/*
343 		 * And set up the BTArrayKeyInfo data.
344 		 */
345 		so->arrayKeys[numArrayKeys].scan_key = i;
346 		so->arrayKeys[numArrayKeys].num_elems = num_elems;
347 		so->arrayKeys[numArrayKeys].elem_values = elem_values;
348 		numArrayKeys++;
349 	}
350 
351 	so->numArrayKeys = numArrayKeys;
352 
353 	MemoryContextSwitchTo(oldContext);
354 }
355 
356 /*
357  * _bt_find_extreme_element() -- get least or greatest array element
358  *
359  * scan and skey identify the index column, whose opfamily determines the
360  * comparison semantics.  strat should be BTLessStrategyNumber to get the
361  * least element, or BTGreaterStrategyNumber to get the greatest.
362  */
363 static Datum
_bt_find_extreme_element(IndexScanDesc scan,ScanKey skey,StrategyNumber strat,Datum * elems,int nelems)364 _bt_find_extreme_element(IndexScanDesc scan, ScanKey skey,
365 						 StrategyNumber strat,
366 						 Datum *elems, int nelems)
367 {
368 	Relation	rel = scan->indexRelation;
369 	Oid			elemtype,
370 				cmp_op;
371 	RegProcedure cmp_proc;
372 	FmgrInfo	flinfo;
373 	Datum		result;
374 	int			i;
375 
376 	/*
377 	 * Determine the nominal datatype of the array elements.  We have to
378 	 * support the convention that sk_subtype == InvalidOid means the opclass
379 	 * input type; this is a hack to simplify life for ScanKeyInit().
380 	 */
381 	elemtype = skey->sk_subtype;
382 	if (elemtype == InvalidOid)
383 		elemtype = rel->rd_opcintype[skey->sk_attno - 1];
384 
385 	/*
386 	 * Look up the appropriate comparison operator in the opfamily.
387 	 *
388 	 * Note: it's possible that this would fail, if the opfamily is
389 	 * incomplete, but it seems quite unlikely that an opfamily would omit
390 	 * non-cross-type comparison operators for any datatype that it supports
391 	 * at all.
392 	 */
393 	cmp_op = get_opfamily_member(rel->rd_opfamily[skey->sk_attno - 1],
394 								 elemtype,
395 								 elemtype,
396 								 strat);
397 	if (!OidIsValid(cmp_op))
398 		elog(ERROR, "missing operator %d(%u,%u) in opfamily %u",
399 			 strat, elemtype, elemtype,
400 			 rel->rd_opfamily[skey->sk_attno - 1]);
401 	cmp_proc = get_opcode(cmp_op);
402 	if (!RegProcedureIsValid(cmp_proc))
403 		elog(ERROR, "missing oprcode for operator %u", cmp_op);
404 
405 	fmgr_info(cmp_proc, &flinfo);
406 
407 	Assert(nelems > 0);
408 	result = elems[0];
409 	for (i = 1; i < nelems; i++)
410 	{
411 		if (DatumGetBool(FunctionCall2Coll(&flinfo,
412 										   skey->sk_collation,
413 										   elems[i],
414 										   result)))
415 			result = elems[i];
416 	}
417 
418 	return result;
419 }
420 
421 /*
422  * _bt_sort_array_elements() -- sort and de-dup array elements
423  *
424  * The array elements are sorted in-place, and the new number of elements
425  * after duplicate removal is returned.
426  *
427  * scan and skey identify the index column, whose opfamily determines the
428  * comparison semantics.  If reverse is true, we sort in descending order.
429  */
430 static int
_bt_sort_array_elements(IndexScanDesc scan,ScanKey skey,bool reverse,Datum * elems,int nelems)431 _bt_sort_array_elements(IndexScanDesc scan, ScanKey skey,
432 						bool reverse,
433 						Datum *elems, int nelems)
434 {
435 	Relation	rel = scan->indexRelation;
436 	Oid			elemtype;
437 	RegProcedure cmp_proc;
438 	BTSortArrayContext cxt;
439 	int			last_non_dup;
440 	int			i;
441 
442 	if (nelems <= 1)
443 		return nelems;			/* no work to do */
444 
445 	/*
446 	 * Determine the nominal datatype of the array elements.  We have to
447 	 * support the convention that sk_subtype == InvalidOid means the opclass
448 	 * input type; this is a hack to simplify life for ScanKeyInit().
449 	 */
450 	elemtype = skey->sk_subtype;
451 	if (elemtype == InvalidOid)
452 		elemtype = rel->rd_opcintype[skey->sk_attno - 1];
453 
454 	/*
455 	 * Look up the appropriate comparison function in the opfamily.
456 	 *
457 	 * Note: it's possible that this would fail, if the opfamily is
458 	 * incomplete, but it seems quite unlikely that an opfamily would omit
459 	 * non-cross-type support functions for any datatype that it supports at
460 	 * all.
461 	 */
462 	cmp_proc = get_opfamily_proc(rel->rd_opfamily[skey->sk_attno - 1],
463 								 elemtype,
464 								 elemtype,
465 								 BTORDER_PROC);
466 	if (!RegProcedureIsValid(cmp_proc))
467 		elog(ERROR, "missing support function %d(%u,%u) in opfamily %u",
468 			 BTORDER_PROC, elemtype, elemtype,
469 			 rel->rd_opfamily[skey->sk_attno - 1]);
470 
471 	/* Sort the array elements */
472 	fmgr_info(cmp_proc, &cxt.flinfo);
473 	cxt.collation = skey->sk_collation;
474 	cxt.reverse = reverse;
475 	qsort_arg((void *) elems, nelems, sizeof(Datum),
476 			  _bt_compare_array_elements, (void *) &cxt);
477 
478 	/* Now scan the sorted elements and remove duplicates */
479 	last_non_dup = 0;
480 	for (i = 1; i < nelems; i++)
481 	{
482 		int32		compare;
483 
484 		compare = DatumGetInt32(FunctionCall2Coll(&cxt.flinfo,
485 												  cxt.collation,
486 												  elems[last_non_dup],
487 												  elems[i]));
488 		if (compare != 0)
489 			elems[++last_non_dup] = elems[i];
490 	}
491 
492 	return last_non_dup + 1;
493 }
494 
495 /*
496  * qsort_arg comparator for sorting array elements
497  */
498 static int
_bt_compare_array_elements(const void * a,const void * b,void * arg)499 _bt_compare_array_elements(const void *a, const void *b, void *arg)
500 {
501 	Datum		da = *((const Datum *) a);
502 	Datum		db = *((const Datum *) b);
503 	BTSortArrayContext *cxt = (BTSortArrayContext *) arg;
504 	int32		compare;
505 
506 	compare = DatumGetInt32(FunctionCall2Coll(&cxt->flinfo,
507 											  cxt->collation,
508 											  da, db));
509 	if (cxt->reverse)
510 		INVERT_COMPARE_RESULT(compare);
511 	return compare;
512 }
513 
514 /*
515  * _bt_start_array_keys() -- Initialize array keys at start of a scan
516  *
517  * Set up the cur_elem counters and fill in the first sk_argument value for
518  * each array scankey.  We can't do this until we know the scan direction.
519  */
520 void
_bt_start_array_keys(IndexScanDesc scan,ScanDirection dir)521 _bt_start_array_keys(IndexScanDesc scan, ScanDirection dir)
522 {
523 	BTScanOpaque so = (BTScanOpaque) scan->opaque;
524 	int			i;
525 
526 	for (i = 0; i < so->numArrayKeys; i++)
527 	{
528 		BTArrayKeyInfo *curArrayKey = &so->arrayKeys[i];
529 		ScanKey		skey = &so->arrayKeyData[curArrayKey->scan_key];
530 
531 		Assert(curArrayKey->num_elems > 0);
532 		if (ScanDirectionIsBackward(dir))
533 			curArrayKey->cur_elem = curArrayKey->num_elems - 1;
534 		else
535 			curArrayKey->cur_elem = 0;
536 		skey->sk_argument = curArrayKey->elem_values[curArrayKey->cur_elem];
537 	}
538 }
539 
540 /*
541  * _bt_advance_array_keys() -- Advance to next set of array elements
542  *
543  * Returns TRUE if there is another set of values to consider, FALSE if not.
544  * On TRUE result, the scankeys are initialized with the next set of values.
545  */
546 bool
_bt_advance_array_keys(IndexScanDesc scan,ScanDirection dir)547 _bt_advance_array_keys(IndexScanDesc scan, ScanDirection dir)
548 {
549 	BTScanOpaque so = (BTScanOpaque) scan->opaque;
550 	bool		found = false;
551 	int			i;
552 
553 	/*
554 	 * We must advance the last array key most quickly, since it will
555 	 * correspond to the lowest-order index column among the available
556 	 * qualifications. This is necessary to ensure correct ordering of output
557 	 * when there are multiple array keys.
558 	 */
559 	for (i = so->numArrayKeys - 1; i >= 0; i--)
560 	{
561 		BTArrayKeyInfo *curArrayKey = &so->arrayKeys[i];
562 		ScanKey		skey = &so->arrayKeyData[curArrayKey->scan_key];
563 		int			cur_elem = curArrayKey->cur_elem;
564 		int			num_elems = curArrayKey->num_elems;
565 
566 		if (ScanDirectionIsBackward(dir))
567 		{
568 			if (--cur_elem < 0)
569 			{
570 				cur_elem = num_elems - 1;
571 				found = false;	/* need to advance next array key */
572 			}
573 			else
574 				found = true;
575 		}
576 		else
577 		{
578 			if (++cur_elem >= num_elems)
579 			{
580 				cur_elem = 0;
581 				found = false;	/* need to advance next array key */
582 			}
583 			else
584 				found = true;
585 		}
586 
587 		curArrayKey->cur_elem = cur_elem;
588 		skey->sk_argument = curArrayKey->elem_values[cur_elem];
589 		if (found)
590 			break;
591 	}
592 
593 	return found;
594 }
595 
596 /*
597  * _bt_mark_array_keys() -- Handle array keys during btmarkpos
598  *
599  * Save the current state of the array keys as the "mark" position.
600  */
601 void
_bt_mark_array_keys(IndexScanDesc scan)602 _bt_mark_array_keys(IndexScanDesc scan)
603 {
604 	BTScanOpaque so = (BTScanOpaque) scan->opaque;
605 	int			i;
606 
607 	for (i = 0; i < so->numArrayKeys; i++)
608 	{
609 		BTArrayKeyInfo *curArrayKey = &so->arrayKeys[i];
610 
611 		curArrayKey->mark_elem = curArrayKey->cur_elem;
612 	}
613 }
614 
615 /*
616  * _bt_restore_array_keys() -- Handle array keys during btrestrpos
617  *
618  * Restore the array keys to where they were when the mark was set.
619  */
620 void
_bt_restore_array_keys(IndexScanDesc scan)621 _bt_restore_array_keys(IndexScanDesc scan)
622 {
623 	BTScanOpaque so = (BTScanOpaque) scan->opaque;
624 	bool		changed = false;
625 	int			i;
626 
627 	/* Restore each array key to its position when the mark was set */
628 	for (i = 0; i < so->numArrayKeys; i++)
629 	{
630 		BTArrayKeyInfo *curArrayKey = &so->arrayKeys[i];
631 		ScanKey		skey = &so->arrayKeyData[curArrayKey->scan_key];
632 		int			mark_elem = curArrayKey->mark_elem;
633 
634 		if (curArrayKey->cur_elem != mark_elem)
635 		{
636 			curArrayKey->cur_elem = mark_elem;
637 			skey->sk_argument = curArrayKey->elem_values[mark_elem];
638 			changed = true;
639 		}
640 	}
641 
642 	/*
643 	 * If we changed any keys, we must redo _bt_preprocess_keys.  That might
644 	 * sound like overkill, but in cases with multiple keys per index column
645 	 * it seems necessary to do the full set of pushups.
646 	 */
647 	if (changed)
648 	{
649 		_bt_preprocess_keys(scan);
650 		/* The mark should have been set on a consistent set of keys... */
651 		Assert(so->qual_ok);
652 	}
653 }
654 
655 
656 /*
657  *	_bt_preprocess_keys() -- Preprocess scan keys
658  *
659  * The given search-type keys (in scan->keyData[] or so->arrayKeyData[])
660  * are copied to so->keyData[] with possible transformation.
661  * scan->numberOfKeys is the number of input keys, so->numberOfKeys gets
662  * the number of output keys (possibly less, never greater).
663  *
664  * The output keys are marked with additional sk_flag bits beyond the
665  * system-standard bits supplied by the caller.  The DESC and NULLS_FIRST
666  * indoption bits for the relevant index attribute are copied into the flags.
667  * Also, for a DESC column, we commute (flip) all the sk_strategy numbers
668  * so that the index sorts in the desired direction.
669  *
670  * One key purpose of this routine is to discover which scan keys must be
671  * satisfied to continue the scan.  It also attempts to eliminate redundant
672  * keys and detect contradictory keys.  (If the index opfamily provides
673  * incomplete sets of cross-type operators, we may fail to detect redundant
674  * or contradictory keys, but we can survive that.)
675  *
676  * The output keys must be sorted by index attribute.  Presently we expect
677  * (but verify) that the input keys are already so sorted --- this is done
678  * by match_clauses_to_index() in indxpath.c.  Some reordering of the keys
679  * within each attribute may be done as a byproduct of the processing here,
680  * but no other code depends on that.
681  *
682  * The output keys are marked with flags SK_BT_REQFWD and/or SK_BT_REQBKWD
683  * if they must be satisfied in order to continue the scan forward or backward
684  * respectively.  _bt_checkkeys uses these flags.  For example, if the quals
685  * are "x = 1 AND y < 4 AND z < 5", then _bt_checkkeys will reject a tuple
686  * (1,2,7), but we must continue the scan in case there are tuples (1,3,z).
687  * But once we reach tuples like (1,4,z) we can stop scanning because no
688  * later tuples could match.  This is reflected by marking the x and y keys,
689  * but not the z key, with SK_BT_REQFWD.  In general, the keys for leading
690  * attributes with "=" keys are marked both SK_BT_REQFWD and SK_BT_REQBKWD.
691  * For the first attribute without an "=" key, any "<" and "<=" keys are
692  * marked SK_BT_REQFWD while any ">" and ">=" keys are marked SK_BT_REQBKWD.
693  * This can be seen to be correct by considering the above example.  Note
694  * in particular that if there are no keys for a given attribute, the keys for
695  * subsequent attributes can never be required; for instance "WHERE y = 4"
696  * requires a full-index scan.
697  *
698  * If possible, redundant keys are eliminated: we keep only the tightest
699  * >/>= bound and the tightest </<= bound, and if there's an = key then
700  * that's the only one returned.  (So, we return either a single = key,
701  * or one or two boundary-condition keys for each attr.)  However, if we
702  * cannot compare two keys for lack of a suitable cross-type operator,
703  * we cannot eliminate either.  If there are two such keys of the same
704  * operator strategy, the second one is just pushed into the output array
705  * without further processing here.  We may also emit both >/>= or both
706  * </<= keys if we can't compare them.  The logic about required keys still
707  * works if we don't eliminate redundant keys.
708  *
709  * Note that one reason we need direction-sensitive required-key flags is
710  * precisely that we may not be able to eliminate redundant keys.  Suppose
711  * we have "x > 4::int AND x > 10::bigint", and we are unable to determine
712  * which key is more restrictive for lack of a suitable cross-type operator.
713  * _bt_first will arbitrarily pick one of the keys to do the initial
714  * positioning with.  If it picks x > 4, then the x > 10 condition will fail
715  * until we reach index entries > 10; but we can't stop the scan just because
716  * x > 10 is failing.  On the other hand, if we are scanning backwards, then
717  * failure of either key is indeed enough to stop the scan.  (In general, when
718  * inequality keys are present, the initial-positioning code only promises to
719  * position before the first possible match, not exactly at the first match,
720  * for a forward scan; or after the last match for a backward scan.)
721  *
722  * As a byproduct of this work, we can detect contradictory quals such
723  * as "x = 1 AND x > 2".  If we see that, we return so->qual_ok = FALSE,
724  * indicating the scan need not be run at all since no tuples can match.
725  * (In this case we do not bother completing the output key array!)
726  * Again, missing cross-type operators might cause us to fail to prove the
727  * quals contradictory when they really are, but the scan will work correctly.
728  *
729  * Row comparison keys are currently also treated without any smarts:
730  * we just transfer them into the preprocessed array without any
731  * editorialization.  We can treat them the same as an ordinary inequality
732  * comparison on the row's first index column, for the purposes of the logic
733  * about required keys.
734  *
735  * Note: the reason we have to copy the preprocessed scan keys into private
736  * storage is that we are modifying the array based on comparisons of the
737  * key argument values, which could change on a rescan or after moving to
738  * new elements of array keys.  Therefore we can't overwrite the source data.
739  */
740 void
_bt_preprocess_keys(IndexScanDesc scan)741 _bt_preprocess_keys(IndexScanDesc scan)
742 {
743 	BTScanOpaque so = (BTScanOpaque) scan->opaque;
744 	int			numberOfKeys = scan->numberOfKeys;
745 	int16	   *indoption = scan->indexRelation->rd_indoption;
746 	int			new_numberOfKeys;
747 	int			numberOfEqualCols;
748 	ScanKey		inkeys;
749 	ScanKey		outkeys;
750 	ScanKey		cur;
751 	ScanKey		xform[BTMaxStrategyNumber];
752 	bool		test_result;
753 	int			i,
754 				j;
755 	AttrNumber	attno;
756 
757 	/* initialize result variables */
758 	so->qual_ok = true;
759 	so->numberOfKeys = 0;
760 
761 	if (numberOfKeys < 1)
762 		return;					/* done if qual-less scan */
763 
764 	/*
765 	 * Read so->arrayKeyData if array keys are present, else scan->keyData
766 	 */
767 	if (so->arrayKeyData != NULL)
768 		inkeys = so->arrayKeyData;
769 	else
770 		inkeys = scan->keyData;
771 
772 	outkeys = so->keyData;
773 	cur = &inkeys[0];
774 	/* we check that input keys are correctly ordered */
775 	if (cur->sk_attno < 1)
776 		elog(ERROR, "btree index keys must be ordered by attribute");
777 
778 	/* We can short-circuit most of the work if there's just one key */
779 	if (numberOfKeys == 1)
780 	{
781 		/* Apply indoption to scankey (might change sk_strategy!) */
782 		if (!_bt_fix_scankey_strategy(cur, indoption))
783 			so->qual_ok = false;
784 		memcpy(outkeys, cur, sizeof(ScanKeyData));
785 		so->numberOfKeys = 1;
786 		/* We can mark the qual as required if it's for first index col */
787 		if (cur->sk_attno == 1)
788 			_bt_mark_scankey_required(outkeys);
789 		return;
790 	}
791 
792 	/*
793 	 * Otherwise, do the full set of pushups.
794 	 */
795 	new_numberOfKeys = 0;
796 	numberOfEqualCols = 0;
797 
798 	/*
799 	 * Initialize for processing of keys for attr 1.
800 	 *
801 	 * xform[i] points to the currently best scan key of strategy type i+1; it
802 	 * is NULL if we haven't yet found such a key for this attr.
803 	 */
804 	attno = 1;
805 	memset(xform, 0, sizeof(xform));
806 
807 	/*
808 	 * Loop iterates from 0 to numberOfKeys inclusive; we use the last pass to
809 	 * handle after-last-key processing.  Actual exit from the loop is at the
810 	 * "break" statement below.
811 	 */
812 	for (i = 0;; cur++, i++)
813 	{
814 		if (i < numberOfKeys)
815 		{
816 			/* Apply indoption to scankey (might change sk_strategy!) */
817 			if (!_bt_fix_scankey_strategy(cur, indoption))
818 			{
819 				/* NULL can't be matched, so give up */
820 				so->qual_ok = false;
821 				return;
822 			}
823 		}
824 
825 		/*
826 		 * If we are at the end of the keys for a particular attr, finish up
827 		 * processing and emit the cleaned-up keys.
828 		 */
829 		if (i == numberOfKeys || cur->sk_attno != attno)
830 		{
831 			int			priorNumberOfEqualCols = numberOfEqualCols;
832 
833 			/* check input keys are correctly ordered */
834 			if (i < numberOfKeys && cur->sk_attno < attno)
835 				elog(ERROR, "btree index keys must be ordered by attribute");
836 
837 			/*
838 			 * If = has been specified, all other keys can be eliminated as
839 			 * redundant.  If we have a case like key = 1 AND key > 2, we can
840 			 * set qual_ok to false and abandon further processing.
841 			 *
842 			 * We also have to deal with the case of "key IS NULL", which is
843 			 * unsatisfiable in combination with any other index condition. By
844 			 * the time we get here, that's been classified as an equality
845 			 * check, and we've rejected any combination of it with a regular
846 			 * equality condition; but not with other types of conditions.
847 			 */
848 			if (xform[BTEqualStrategyNumber - 1])
849 			{
850 				ScanKey		eq = xform[BTEqualStrategyNumber - 1];
851 
852 				for (j = BTMaxStrategyNumber; --j >= 0;)
853 				{
854 					ScanKey		chk = xform[j];
855 
856 					if (!chk || j == (BTEqualStrategyNumber - 1))
857 						continue;
858 
859 					if (eq->sk_flags & SK_SEARCHNULL)
860 					{
861 						/* IS NULL is contradictory to anything else */
862 						so->qual_ok = false;
863 						return;
864 					}
865 
866 					if (_bt_compare_scankey_args(scan, chk, eq, chk,
867 												 &test_result))
868 					{
869 						if (!test_result)
870 						{
871 							/* keys proven mutually contradictory */
872 							so->qual_ok = false;
873 							return;
874 						}
875 						/* else discard the redundant non-equality key */
876 						xform[j] = NULL;
877 					}
878 					/* else, cannot determine redundancy, keep both keys */
879 				}
880 				/* track number of attrs for which we have "=" keys */
881 				numberOfEqualCols++;
882 			}
883 
884 			/* try to keep only one of <, <= */
885 			if (xform[BTLessStrategyNumber - 1]
886 				&& xform[BTLessEqualStrategyNumber - 1])
887 			{
888 				ScanKey		lt = xform[BTLessStrategyNumber - 1];
889 				ScanKey		le = xform[BTLessEqualStrategyNumber - 1];
890 
891 				if (_bt_compare_scankey_args(scan, le, lt, le,
892 											 &test_result))
893 				{
894 					if (test_result)
895 						xform[BTLessEqualStrategyNumber - 1] = NULL;
896 					else
897 						xform[BTLessStrategyNumber - 1] = NULL;
898 				}
899 			}
900 
901 			/* try to keep only one of >, >= */
902 			if (xform[BTGreaterStrategyNumber - 1]
903 				&& xform[BTGreaterEqualStrategyNumber - 1])
904 			{
905 				ScanKey		gt = xform[BTGreaterStrategyNumber - 1];
906 				ScanKey		ge = xform[BTGreaterEqualStrategyNumber - 1];
907 
908 				if (_bt_compare_scankey_args(scan, ge, gt, ge,
909 											 &test_result))
910 				{
911 					if (test_result)
912 						xform[BTGreaterEqualStrategyNumber - 1] = NULL;
913 					else
914 						xform[BTGreaterStrategyNumber - 1] = NULL;
915 				}
916 			}
917 
918 			/*
919 			 * Emit the cleaned-up keys into the outkeys[] array, and then
920 			 * mark them if they are required.  They are required (possibly
921 			 * only in one direction) if all attrs before this one had "=".
922 			 */
923 			for (j = BTMaxStrategyNumber; --j >= 0;)
924 			{
925 				if (xform[j])
926 				{
927 					ScanKey		outkey = &outkeys[new_numberOfKeys++];
928 
929 					memcpy(outkey, xform[j], sizeof(ScanKeyData));
930 					if (priorNumberOfEqualCols == attno - 1)
931 						_bt_mark_scankey_required(outkey);
932 				}
933 			}
934 
935 			/*
936 			 * Exit loop here if done.
937 			 */
938 			if (i == numberOfKeys)
939 				break;
940 
941 			/* Re-initialize for new attno */
942 			attno = cur->sk_attno;
943 			memset(xform, 0, sizeof(xform));
944 		}
945 
946 		/* check strategy this key's operator corresponds to */
947 		j = cur->sk_strategy - 1;
948 
949 		/* if row comparison, push it directly to the output array */
950 		if (cur->sk_flags & SK_ROW_HEADER)
951 		{
952 			ScanKey		outkey = &outkeys[new_numberOfKeys++];
953 
954 			memcpy(outkey, cur, sizeof(ScanKeyData));
955 			if (numberOfEqualCols == attno - 1)
956 				_bt_mark_scankey_required(outkey);
957 
958 			/*
959 			 * We don't support RowCompare using equality; such a qual would
960 			 * mess up the numberOfEqualCols tracking.
961 			 */
962 			Assert(j != (BTEqualStrategyNumber - 1));
963 			continue;
964 		}
965 
966 		/* have we seen one of these before? */
967 		if (xform[j] == NULL)
968 		{
969 			/* nope, so remember this scankey */
970 			xform[j] = cur;
971 		}
972 		else
973 		{
974 			/* yup, keep only the more restrictive key */
975 			if (_bt_compare_scankey_args(scan, cur, cur, xform[j],
976 										 &test_result))
977 			{
978 				if (test_result)
979 					xform[j] = cur;
980 				else if (j == (BTEqualStrategyNumber - 1))
981 				{
982 					/* key == a && key == b, but a != b */
983 					so->qual_ok = false;
984 					return;
985 				}
986 				/* else old key is more restrictive, keep it */
987 			}
988 			else
989 			{
990 				/*
991 				 * We can't determine which key is more restrictive.  Keep the
992 				 * previous one in xform[j] and push this one directly to the
993 				 * output array.
994 				 */
995 				ScanKey		outkey = &outkeys[new_numberOfKeys++];
996 
997 				memcpy(outkey, cur, sizeof(ScanKeyData));
998 				if (numberOfEqualCols == attno - 1)
999 					_bt_mark_scankey_required(outkey);
1000 			}
1001 		}
1002 	}
1003 
1004 	so->numberOfKeys = new_numberOfKeys;
1005 }
1006 
1007 /*
1008  * Compare two scankey values using a specified operator.
1009  *
1010  * The test we want to perform is logically "leftarg op rightarg", where
1011  * leftarg and rightarg are the sk_argument values in those ScanKeys, and
1012  * the comparison operator is the one in the op ScanKey.  However, in
1013  * cross-data-type situations we may need to look up the correct operator in
1014  * the index's opfamily: it is the one having amopstrategy = op->sk_strategy
1015  * and amoplefttype/amoprighttype equal to the two argument datatypes.
1016  *
1017  * If the opfamily doesn't supply a complete set of cross-type operators we
1018  * may not be able to make the comparison.  If we can make the comparison
1019  * we store the operator result in *result and return TRUE.  We return FALSE
1020  * if the comparison could not be made.
1021  *
1022  * Note: op always points at the same ScanKey as either leftarg or rightarg.
1023  * Since we don't scribble on the scankeys, this aliasing should cause no
1024  * trouble.
1025  *
1026  * Note: this routine needs to be insensitive to any DESC option applied
1027  * to the index column.  For example, "x < 4" is a tighter constraint than
1028  * "x < 5" regardless of which way the index is sorted.
1029  */
1030 static bool
_bt_compare_scankey_args(IndexScanDesc scan,ScanKey op,ScanKey leftarg,ScanKey rightarg,bool * result)1031 _bt_compare_scankey_args(IndexScanDesc scan, ScanKey op,
1032 						 ScanKey leftarg, ScanKey rightarg,
1033 						 bool *result)
1034 {
1035 	Relation	rel = scan->indexRelation;
1036 	Oid			lefttype,
1037 				righttype,
1038 				optype,
1039 				opcintype,
1040 				cmp_op;
1041 	StrategyNumber strat;
1042 
1043 	/*
1044 	 * First, deal with cases where one or both args are NULL.  This should
1045 	 * only happen when the scankeys represent IS NULL/NOT NULL conditions.
1046 	 */
1047 	if ((leftarg->sk_flags | rightarg->sk_flags) & SK_ISNULL)
1048 	{
1049 		bool		leftnull,
1050 					rightnull;
1051 
1052 		if (leftarg->sk_flags & SK_ISNULL)
1053 		{
1054 			Assert(leftarg->sk_flags & (SK_SEARCHNULL | SK_SEARCHNOTNULL));
1055 			leftnull = true;
1056 		}
1057 		else
1058 			leftnull = false;
1059 		if (rightarg->sk_flags & SK_ISNULL)
1060 		{
1061 			Assert(rightarg->sk_flags & (SK_SEARCHNULL | SK_SEARCHNOTNULL));
1062 			rightnull = true;
1063 		}
1064 		else
1065 			rightnull = false;
1066 
1067 		/*
1068 		 * We treat NULL as either greater than or less than all other values.
1069 		 * Since true > false, the tests below work correctly for NULLS LAST
1070 		 * logic.  If the index is NULLS FIRST, we need to flip the strategy.
1071 		 */
1072 		strat = op->sk_strategy;
1073 		if (op->sk_flags & SK_BT_NULLS_FIRST)
1074 			strat = BTCommuteStrategyNumber(strat);
1075 
1076 		switch (strat)
1077 		{
1078 			case BTLessStrategyNumber:
1079 				*result = (leftnull < rightnull);
1080 				break;
1081 			case BTLessEqualStrategyNumber:
1082 				*result = (leftnull <= rightnull);
1083 				break;
1084 			case BTEqualStrategyNumber:
1085 				*result = (leftnull == rightnull);
1086 				break;
1087 			case BTGreaterEqualStrategyNumber:
1088 				*result = (leftnull >= rightnull);
1089 				break;
1090 			case BTGreaterStrategyNumber:
1091 				*result = (leftnull > rightnull);
1092 				break;
1093 			default:
1094 				elog(ERROR, "unrecognized StrategyNumber: %d", (int) strat);
1095 				*result = false;	/* keep compiler quiet */
1096 				break;
1097 		}
1098 		return true;
1099 	}
1100 
1101 	/*
1102 	 * The opfamily we need to worry about is identified by the index column.
1103 	 */
1104 	Assert(leftarg->sk_attno == rightarg->sk_attno);
1105 
1106 	opcintype = rel->rd_opcintype[leftarg->sk_attno - 1];
1107 
1108 	/*
1109 	 * Determine the actual datatypes of the ScanKey arguments.  We have to
1110 	 * support the convention that sk_subtype == InvalidOid means the opclass
1111 	 * input type; this is a hack to simplify life for ScanKeyInit().
1112 	 */
1113 	lefttype = leftarg->sk_subtype;
1114 	if (lefttype == InvalidOid)
1115 		lefttype = opcintype;
1116 	righttype = rightarg->sk_subtype;
1117 	if (righttype == InvalidOid)
1118 		righttype = opcintype;
1119 	optype = op->sk_subtype;
1120 	if (optype == InvalidOid)
1121 		optype = opcintype;
1122 
1123 	/*
1124 	 * If leftarg and rightarg match the types expected for the "op" scankey,
1125 	 * we can use its already-looked-up comparison function.
1126 	 */
1127 	if (lefttype == opcintype && righttype == optype)
1128 	{
1129 		*result = DatumGetBool(FunctionCall2Coll(&op->sk_func,
1130 												 op->sk_collation,
1131 												 leftarg->sk_argument,
1132 												 rightarg->sk_argument));
1133 		return true;
1134 	}
1135 
1136 	/*
1137 	 * Otherwise, we need to go to the syscache to find the appropriate
1138 	 * operator.  (This cannot result in infinite recursion, since no
1139 	 * indexscan initiated by syscache lookup will use cross-data-type
1140 	 * operators.)
1141 	 *
1142 	 * If the sk_strategy was flipped by _bt_fix_scankey_strategy, we have to
1143 	 * un-flip it to get the correct opfamily member.
1144 	 */
1145 	strat = op->sk_strategy;
1146 	if (op->sk_flags & SK_BT_DESC)
1147 		strat = BTCommuteStrategyNumber(strat);
1148 
1149 	cmp_op = get_opfamily_member(rel->rd_opfamily[leftarg->sk_attno - 1],
1150 								 lefttype,
1151 								 righttype,
1152 								 strat);
1153 	if (OidIsValid(cmp_op))
1154 	{
1155 		RegProcedure cmp_proc = get_opcode(cmp_op);
1156 
1157 		if (RegProcedureIsValid(cmp_proc))
1158 		{
1159 			*result = DatumGetBool(OidFunctionCall2Coll(cmp_proc,
1160 														op->sk_collation,
1161 														leftarg->sk_argument,
1162 													 rightarg->sk_argument));
1163 			return true;
1164 		}
1165 	}
1166 
1167 	/* Can't make the comparison */
1168 	*result = false;			/* suppress compiler warnings */
1169 	return false;
1170 }
1171 
1172 /*
1173  * Adjust a scankey's strategy and flags setting as needed for indoptions.
1174  *
1175  * We copy the appropriate indoption value into the scankey sk_flags
1176  * (shifting to avoid clobbering system-defined flag bits).  Also, if
1177  * the DESC option is set, commute (flip) the operator strategy number.
1178  *
1179  * A secondary purpose is to check for IS NULL/NOT NULL scankeys and set up
1180  * the strategy field correctly for them.
1181  *
1182  * Lastly, for ordinary scankeys (not IS NULL/NOT NULL), we check for a
1183  * NULL comparison value.  Since all btree operators are assumed strict,
1184  * a NULL means that the qual cannot be satisfied.  We return TRUE if the
1185  * comparison value isn't NULL, or FALSE if the scan should be abandoned.
1186  *
1187  * This function is applied to the *input* scankey structure; therefore
1188  * on a rescan we will be looking at already-processed scankeys.  Hence
1189  * we have to be careful not to re-commute the strategy if we already did it.
1190  * It's a bit ugly to modify the caller's copy of the scankey but in practice
1191  * there shouldn't be any problem, since the index's indoptions are certainly
1192  * not going to change while the scankey survives.
1193  */
1194 static bool
_bt_fix_scankey_strategy(ScanKey skey,int16 * indoption)1195 _bt_fix_scankey_strategy(ScanKey skey, int16 *indoption)
1196 {
1197 	int			addflags;
1198 
1199 	addflags = indoption[skey->sk_attno - 1] << SK_BT_INDOPTION_SHIFT;
1200 
1201 	/*
1202 	 * We treat all btree operators as strict (even if they're not so marked
1203 	 * in pg_proc). This means that it is impossible for an operator condition
1204 	 * with a NULL comparison constant to succeed, and we can reject it right
1205 	 * away.
1206 	 *
1207 	 * However, we now also support "x IS NULL" clauses as search conditions,
1208 	 * so in that case keep going. The planner has not filled in any
1209 	 * particular strategy in this case, so set it to BTEqualStrategyNumber
1210 	 * --- we can treat IS NULL as an equality operator for purposes of search
1211 	 * strategy.
1212 	 *
1213 	 * Likewise, "x IS NOT NULL" is supported.  We treat that as either "less
1214 	 * than NULL" in a NULLS LAST index, or "greater than NULL" in a NULLS
1215 	 * FIRST index.
1216 	 *
1217 	 * Note: someday we might have to fill in sk_collation from the index
1218 	 * column's collation.  At the moment this is a non-issue because we'll
1219 	 * never actually call the comparison operator on a NULL.
1220 	 */
1221 	if (skey->sk_flags & SK_ISNULL)
1222 	{
1223 		/* SK_ISNULL shouldn't be set in a row header scankey */
1224 		Assert(!(skey->sk_flags & SK_ROW_HEADER));
1225 
1226 		/* Set indoption flags in scankey (might be done already) */
1227 		skey->sk_flags |= addflags;
1228 
1229 		/* Set correct strategy for IS NULL or NOT NULL search */
1230 		if (skey->sk_flags & SK_SEARCHNULL)
1231 		{
1232 			skey->sk_strategy = BTEqualStrategyNumber;
1233 			skey->sk_subtype = InvalidOid;
1234 			skey->sk_collation = InvalidOid;
1235 		}
1236 		else if (skey->sk_flags & SK_SEARCHNOTNULL)
1237 		{
1238 			if (skey->sk_flags & SK_BT_NULLS_FIRST)
1239 				skey->sk_strategy = BTGreaterStrategyNumber;
1240 			else
1241 				skey->sk_strategy = BTLessStrategyNumber;
1242 			skey->sk_subtype = InvalidOid;
1243 			skey->sk_collation = InvalidOid;
1244 		}
1245 		else
1246 		{
1247 			/* regular qual, so it cannot be satisfied */
1248 			return false;
1249 		}
1250 
1251 		/* Needn't do the rest */
1252 		return true;
1253 	}
1254 
1255 	/* Adjust strategy for DESC, if we didn't already */
1256 	if ((addflags & SK_BT_DESC) && !(skey->sk_flags & SK_BT_DESC))
1257 		skey->sk_strategy = BTCommuteStrategyNumber(skey->sk_strategy);
1258 	skey->sk_flags |= addflags;
1259 
1260 	/* If it's a row header, fix row member flags and strategies similarly */
1261 	if (skey->sk_flags & SK_ROW_HEADER)
1262 	{
1263 		ScanKey		subkey = (ScanKey) DatumGetPointer(skey->sk_argument);
1264 
1265 		for (;;)
1266 		{
1267 			Assert(subkey->sk_flags & SK_ROW_MEMBER);
1268 			addflags = indoption[subkey->sk_attno - 1] << SK_BT_INDOPTION_SHIFT;
1269 			if ((addflags & SK_BT_DESC) && !(subkey->sk_flags & SK_BT_DESC))
1270 				subkey->sk_strategy = BTCommuteStrategyNumber(subkey->sk_strategy);
1271 			subkey->sk_flags |= addflags;
1272 			if (subkey->sk_flags & SK_ROW_END)
1273 				break;
1274 			subkey++;
1275 		}
1276 	}
1277 
1278 	return true;
1279 }
1280 
1281 /*
1282  * Mark a scankey as "required to continue the scan".
1283  *
1284  * Depending on the operator type, the key may be required for both scan
1285  * directions or just one.  Also, if the key is a row comparison header,
1286  * we have to mark its first subsidiary ScanKey as required.  (Subsequent
1287  * subsidiary ScanKeys are normally for lower-order columns, and thus
1288  * cannot be required, since they're after the first non-equality scankey.)
1289  *
1290  * Note: when we set required-key flag bits in a subsidiary scankey, we are
1291  * scribbling on a data structure belonging to the index AM's caller, not on
1292  * our private copy.  This should be OK because the marking will not change
1293  * from scan to scan within a query, and so we'd just re-mark the same way
1294  * anyway on a rescan.  Something to keep an eye on though.
1295  */
1296 static void
_bt_mark_scankey_required(ScanKey skey)1297 _bt_mark_scankey_required(ScanKey skey)
1298 {
1299 	int			addflags;
1300 
1301 	switch (skey->sk_strategy)
1302 	{
1303 		case BTLessStrategyNumber:
1304 		case BTLessEqualStrategyNumber:
1305 			addflags = SK_BT_REQFWD;
1306 			break;
1307 		case BTEqualStrategyNumber:
1308 			addflags = SK_BT_REQFWD | SK_BT_REQBKWD;
1309 			break;
1310 		case BTGreaterEqualStrategyNumber:
1311 		case BTGreaterStrategyNumber:
1312 			addflags = SK_BT_REQBKWD;
1313 			break;
1314 		default:
1315 			elog(ERROR, "unrecognized StrategyNumber: %d",
1316 				 (int) skey->sk_strategy);
1317 			addflags = 0;		/* keep compiler quiet */
1318 			break;
1319 	}
1320 
1321 	skey->sk_flags |= addflags;
1322 
1323 	if (skey->sk_flags & SK_ROW_HEADER)
1324 	{
1325 		ScanKey		subkey = (ScanKey) DatumGetPointer(skey->sk_argument);
1326 
1327 		/* First subkey should be same column/operator as the header */
1328 		Assert(subkey->sk_flags & SK_ROW_MEMBER);
1329 		Assert(subkey->sk_attno == skey->sk_attno);
1330 		Assert(subkey->sk_strategy == skey->sk_strategy);
1331 		subkey->sk_flags |= addflags;
1332 	}
1333 }
1334 
1335 /*
1336  * Test whether an indextuple satisfies all the scankey conditions.
1337  *
1338  * If so, return the address of the index tuple on the index page.
1339  * If not, return NULL.
1340  *
1341  * If the tuple fails to pass the qual, we also determine whether there's
1342  * any need to continue the scan beyond this tuple, and set *continuescan
1343  * accordingly.  See comments for _bt_preprocess_keys(), above, about how
1344  * this is done.
1345  *
1346  * scan: index scan descriptor (containing a search-type scankey)
1347  * page: buffer page containing index tuple
1348  * offnum: offset number of index tuple (must be a valid item!)
1349  * dir: direction we are scanning in
1350  * continuescan: output parameter (will be set correctly in all cases)
1351  *
1352  * Caller must hold pin and lock on the index page.
1353  */
1354 IndexTuple
_bt_checkkeys(IndexScanDesc scan,Page page,OffsetNumber offnum,ScanDirection dir,bool * continuescan)1355 _bt_checkkeys(IndexScanDesc scan,
1356 			  Page page, OffsetNumber offnum,
1357 			  ScanDirection dir, bool *continuescan)
1358 {
1359 	ItemId		iid = PageGetItemId(page, offnum);
1360 	bool		tuple_alive;
1361 	IndexTuple	tuple;
1362 	TupleDesc	tupdesc;
1363 	BTScanOpaque so;
1364 	int			keysz;
1365 	int			ikey;
1366 	ScanKey		key;
1367 
1368 	*continuescan = true;		/* default assumption */
1369 
1370 	/*
1371 	 * If the scan specifies not to return killed tuples, then we treat a
1372 	 * killed tuple as not passing the qual.  Most of the time, it's a win to
1373 	 * not bother examining the tuple's index keys, but just return
1374 	 * immediately with continuescan = true to proceed to the next tuple.
1375 	 * However, if this is the last tuple on the page, we should check the
1376 	 * index keys to prevent uselessly advancing to the next page.
1377 	 */
1378 	if (scan->ignore_killed_tuples && ItemIdIsDead(iid))
1379 	{
1380 		/* return immediately if there are more tuples on the page */
1381 		if (ScanDirectionIsForward(dir))
1382 		{
1383 			if (offnum < PageGetMaxOffsetNumber(page))
1384 				return NULL;
1385 		}
1386 		else
1387 		{
1388 			BTPageOpaque opaque = (BTPageOpaque) PageGetSpecialPointer(page);
1389 
1390 			if (offnum > P_FIRSTDATAKEY(opaque))
1391 				return NULL;
1392 		}
1393 
1394 		/*
1395 		 * OK, we want to check the keys so we can set continuescan correctly,
1396 		 * but we'll return NULL even if the tuple passes the key tests.
1397 		 */
1398 		tuple_alive = false;
1399 	}
1400 	else
1401 		tuple_alive = true;
1402 
1403 	tuple = (IndexTuple) PageGetItem(page, iid);
1404 
1405 	tupdesc = RelationGetDescr(scan->indexRelation);
1406 	so = (BTScanOpaque) scan->opaque;
1407 	keysz = so->numberOfKeys;
1408 
1409 	for (key = so->keyData, ikey = 0; ikey < keysz; key++, ikey++)
1410 	{
1411 		Datum		datum;
1412 		bool		isNull;
1413 		Datum		test;
1414 
1415 		/* row-comparison keys need special processing */
1416 		if (key->sk_flags & SK_ROW_HEADER)
1417 		{
1418 			if (_bt_check_rowcompare(key, tuple, tupdesc, dir, continuescan))
1419 				continue;
1420 			return NULL;
1421 		}
1422 
1423 		datum = index_getattr(tuple,
1424 							  key->sk_attno,
1425 							  tupdesc,
1426 							  &isNull);
1427 
1428 		if (key->sk_flags & SK_ISNULL)
1429 		{
1430 			/* Handle IS NULL/NOT NULL tests */
1431 			if (key->sk_flags & SK_SEARCHNULL)
1432 			{
1433 				if (isNull)
1434 					continue;	/* tuple satisfies this qual */
1435 			}
1436 			else
1437 			{
1438 				Assert(key->sk_flags & SK_SEARCHNOTNULL);
1439 				if (!isNull)
1440 					continue;	/* tuple satisfies this qual */
1441 			}
1442 
1443 			/*
1444 			 * Tuple fails this qual.  If it's a required qual for the current
1445 			 * scan direction, then we can conclude no further tuples will
1446 			 * pass, either.
1447 			 */
1448 			if ((key->sk_flags & SK_BT_REQFWD) &&
1449 				ScanDirectionIsForward(dir))
1450 				*continuescan = false;
1451 			else if ((key->sk_flags & SK_BT_REQBKWD) &&
1452 					 ScanDirectionIsBackward(dir))
1453 				*continuescan = false;
1454 
1455 			/*
1456 			 * In any case, this indextuple doesn't match the qual.
1457 			 */
1458 			return NULL;
1459 		}
1460 
1461 		if (isNull)
1462 		{
1463 			if (key->sk_flags & SK_BT_NULLS_FIRST)
1464 			{
1465 				/*
1466 				 * Since NULLs are sorted before non-NULLs, we know we have
1467 				 * reached the lower limit of the range of values for this
1468 				 * index attr.  On a backward scan, we can stop if this qual
1469 				 * is one of the "must match" subset.  We can stop regardless
1470 				 * of whether the qual is > or <, so long as it's required,
1471 				 * because it's not possible for any future tuples to pass. On
1472 				 * a forward scan, however, we must keep going, because we may
1473 				 * have initially positioned to the start of the index.
1474 				 */
1475 				if ((key->sk_flags & (SK_BT_REQFWD | SK_BT_REQBKWD)) &&
1476 					ScanDirectionIsBackward(dir))
1477 					*continuescan = false;
1478 			}
1479 			else
1480 			{
1481 				/*
1482 				 * Since NULLs are sorted after non-NULLs, we know we have
1483 				 * reached the upper limit of the range of values for this
1484 				 * index attr.  On a forward scan, we can stop if this qual is
1485 				 * one of the "must match" subset.  We can stop regardless of
1486 				 * whether the qual is > or <, so long as it's required,
1487 				 * because it's not possible for any future tuples to pass. On
1488 				 * a backward scan, however, we must keep going, because we
1489 				 * may have initially positioned to the end of the index.
1490 				 */
1491 				if ((key->sk_flags & (SK_BT_REQFWD | SK_BT_REQBKWD)) &&
1492 					ScanDirectionIsForward(dir))
1493 					*continuescan = false;
1494 			}
1495 
1496 			/*
1497 			 * In any case, this indextuple doesn't match the qual.
1498 			 */
1499 			return NULL;
1500 		}
1501 
1502 		test = FunctionCall2Coll(&key->sk_func, key->sk_collation,
1503 								 datum, key->sk_argument);
1504 
1505 		if (!DatumGetBool(test))
1506 		{
1507 			/*
1508 			 * Tuple fails this qual.  If it's a required qual for the current
1509 			 * scan direction, then we can conclude no further tuples will
1510 			 * pass, either.
1511 			 *
1512 			 * Note: because we stop the scan as soon as any required equality
1513 			 * qual fails, it is critical that equality quals be used for the
1514 			 * initial positioning in _bt_first() when they are available. See
1515 			 * comments in _bt_first().
1516 			 */
1517 			if ((key->sk_flags & SK_BT_REQFWD) &&
1518 				ScanDirectionIsForward(dir))
1519 				*continuescan = false;
1520 			else if ((key->sk_flags & SK_BT_REQBKWD) &&
1521 					 ScanDirectionIsBackward(dir))
1522 				*continuescan = false;
1523 
1524 			/*
1525 			 * In any case, this indextuple doesn't match the qual.
1526 			 */
1527 			return NULL;
1528 		}
1529 	}
1530 
1531 	/* Check for failure due to it being a killed tuple. */
1532 	if (!tuple_alive)
1533 		return NULL;
1534 
1535 	/* If we get here, the tuple passes all index quals. */
1536 	return tuple;
1537 }
1538 
1539 /*
1540  * Test whether an indextuple satisfies a row-comparison scan condition.
1541  *
1542  * Return true if so, false if not.  If not, also clear *continuescan if
1543  * it's not possible for any future tuples in the current scan direction
1544  * to pass the qual.
1545  *
1546  * This is a subroutine for _bt_checkkeys, which see for more info.
1547  */
1548 static bool
_bt_check_rowcompare(ScanKey skey,IndexTuple tuple,TupleDesc tupdesc,ScanDirection dir,bool * continuescan)1549 _bt_check_rowcompare(ScanKey skey, IndexTuple tuple, TupleDesc tupdesc,
1550 					 ScanDirection dir, bool *continuescan)
1551 {
1552 	ScanKey		subkey = (ScanKey) DatumGetPointer(skey->sk_argument);
1553 	int32		cmpresult = 0;
1554 	bool		result;
1555 
1556 	/* First subkey should be same as the header says */
1557 	Assert(subkey->sk_attno == skey->sk_attno);
1558 
1559 	/* Loop over columns of the row condition */
1560 	for (;;)
1561 	{
1562 		Datum		datum;
1563 		bool		isNull;
1564 
1565 		Assert(subkey->sk_flags & SK_ROW_MEMBER);
1566 
1567 		datum = index_getattr(tuple,
1568 							  subkey->sk_attno,
1569 							  tupdesc,
1570 							  &isNull);
1571 
1572 		if (isNull)
1573 		{
1574 			if (subkey->sk_flags & SK_BT_NULLS_FIRST)
1575 			{
1576 				/*
1577 				 * Since NULLs are sorted before non-NULLs, we know we have
1578 				 * reached the lower limit of the range of values for this
1579 				 * index attr.  On a backward scan, we can stop if this qual
1580 				 * is one of the "must match" subset.  We can stop regardless
1581 				 * of whether the qual is > or <, so long as it's required,
1582 				 * because it's not possible for any future tuples to pass. On
1583 				 * a forward scan, however, we must keep going, because we may
1584 				 * have initially positioned to the start of the index.
1585 				 */
1586 				if ((subkey->sk_flags & (SK_BT_REQFWD | SK_BT_REQBKWD)) &&
1587 					ScanDirectionIsBackward(dir))
1588 					*continuescan = false;
1589 			}
1590 			else
1591 			{
1592 				/*
1593 				 * Since NULLs are sorted after non-NULLs, we know we have
1594 				 * reached the upper limit of the range of values for this
1595 				 * index attr.  On a forward scan, we can stop if this qual is
1596 				 * one of the "must match" subset.  We can stop regardless of
1597 				 * whether the qual is > or <, so long as it's required,
1598 				 * because it's not possible for any future tuples to pass. On
1599 				 * a backward scan, however, we must keep going, because we
1600 				 * may have initially positioned to the end of the index.
1601 				 */
1602 				if ((subkey->sk_flags & (SK_BT_REQFWD | SK_BT_REQBKWD)) &&
1603 					ScanDirectionIsForward(dir))
1604 					*continuescan = false;
1605 			}
1606 
1607 			/*
1608 			 * In any case, this indextuple doesn't match the qual.
1609 			 */
1610 			return false;
1611 		}
1612 
1613 		if (subkey->sk_flags & SK_ISNULL)
1614 		{
1615 			/*
1616 			 * Unlike the simple-scankey case, this isn't a disallowed case.
1617 			 * But it can never match.  If all the earlier row comparison
1618 			 * columns are required for the scan direction, we can stop the
1619 			 * scan, because there can't be another tuple that will succeed.
1620 			 */
1621 			if (subkey != (ScanKey) DatumGetPointer(skey->sk_argument))
1622 				subkey--;
1623 			if ((subkey->sk_flags & SK_BT_REQFWD) &&
1624 				ScanDirectionIsForward(dir))
1625 				*continuescan = false;
1626 			else if ((subkey->sk_flags & SK_BT_REQBKWD) &&
1627 					 ScanDirectionIsBackward(dir))
1628 				*continuescan = false;
1629 			return false;
1630 		}
1631 
1632 		/* Perform the test --- three-way comparison not bool operator */
1633 		cmpresult = DatumGetInt32(FunctionCall2Coll(&subkey->sk_func,
1634 													subkey->sk_collation,
1635 													datum,
1636 													subkey->sk_argument));
1637 
1638 		if (subkey->sk_flags & SK_BT_DESC)
1639 			INVERT_COMPARE_RESULT(cmpresult);
1640 
1641 		/* Done comparing if unequal, else advance to next column */
1642 		if (cmpresult != 0)
1643 			break;
1644 
1645 		if (subkey->sk_flags & SK_ROW_END)
1646 			break;
1647 		subkey++;
1648 	}
1649 
1650 	/*
1651 	 * At this point cmpresult indicates the overall result of the row
1652 	 * comparison, and subkey points to the deciding column (or the last
1653 	 * column if the result is "=").
1654 	 */
1655 	switch (subkey->sk_strategy)
1656 	{
1657 			/* EQ and NE cases aren't allowed here */
1658 		case BTLessStrategyNumber:
1659 			result = (cmpresult < 0);
1660 			break;
1661 		case BTLessEqualStrategyNumber:
1662 			result = (cmpresult <= 0);
1663 			break;
1664 		case BTGreaterEqualStrategyNumber:
1665 			result = (cmpresult >= 0);
1666 			break;
1667 		case BTGreaterStrategyNumber:
1668 			result = (cmpresult > 0);
1669 			break;
1670 		default:
1671 			elog(ERROR, "unrecognized RowCompareType: %d",
1672 				 (int) subkey->sk_strategy);
1673 			result = 0;			/* keep compiler quiet */
1674 			break;
1675 	}
1676 
1677 	if (!result)
1678 	{
1679 		/*
1680 		 * Tuple fails this qual.  If it's a required qual for the current
1681 		 * scan direction, then we can conclude no further tuples will pass,
1682 		 * either.  Note we have to look at the deciding column, not
1683 		 * necessarily the first or last column of the row condition.
1684 		 */
1685 		if ((subkey->sk_flags & SK_BT_REQFWD) &&
1686 			ScanDirectionIsForward(dir))
1687 			*continuescan = false;
1688 		else if ((subkey->sk_flags & SK_BT_REQBKWD) &&
1689 				 ScanDirectionIsBackward(dir))
1690 			*continuescan = false;
1691 	}
1692 
1693 	return result;
1694 }
1695 
1696 /*
1697  * _bt_killitems - set LP_DEAD state for items an indexscan caller has
1698  * told us were killed
1699  *
1700  * scan->opaque, referenced locally through so, contains information about the
1701  * current page and killed tuples thereon (generally, this should only be
1702  * called if so->numKilled > 0).
1703  *
1704  * The caller does not have a lock on the page and may or may not have the
1705  * page pinned in a buffer.  Note that read-lock is sufficient for setting
1706  * LP_DEAD status (which is only a hint).
1707  *
1708  * We match items by heap TID before assuming they are the right ones to
1709  * delete.  We cope with cases where items have moved right due to insertions.
1710  * If an item has moved off the current page due to a split, we'll fail to
1711  * find it and do nothing (this is not an error case --- we assume the item
1712  * will eventually get marked in a future indexscan).
1713  *
1714  * Note that if we hold a pin on the target page continuously from initially
1715  * reading the items until applying this function, VACUUM cannot have deleted
1716  * any items from the page, and so there is no need to search left from the
1717  * recorded offset.  (This observation also guarantees that the item is still
1718  * the right one to delete, which might otherwise be questionable since heap
1719  * TIDs can get recycled.)	This holds true even if the page has been modified
1720  * by inserts and page splits, so there is no need to consult the LSN.
1721  *
1722  * If the pin was released after reading the page, then we re-read it.  If it
1723  * has been modified since we read it (as determined by the LSN), we dare not
1724  * flag any entries because it is possible that the old entry was vacuumed
1725  * away and the TID was re-used by a completely different heap tuple.
1726  */
1727 void
_bt_killitems(IndexScanDesc scan)1728 _bt_killitems(IndexScanDesc scan)
1729 {
1730 	BTScanOpaque so = (BTScanOpaque) scan->opaque;
1731 	Page		page;
1732 	BTPageOpaque opaque;
1733 	OffsetNumber minoff;
1734 	OffsetNumber maxoff;
1735 	int			i;
1736 	int			numKilled = so->numKilled;
1737 	bool		killedsomething = false;
1738 
1739 	Assert(BTScanPosIsValid(so->currPos));
1740 
1741 	/*
1742 	 * Always reset the scan state, so we don't look for same items on other
1743 	 * pages.
1744 	 */
1745 	so->numKilled = 0;
1746 
1747 	if (BTScanPosIsPinned(so->currPos))
1748 	{
1749 		/*
1750 		 * We have held the pin on this page since we read the index tuples,
1751 		 * so all we need to do is lock it.  The pin will have prevented
1752 		 * re-use of any TID on the page, so there is no need to check the
1753 		 * LSN.
1754 		 */
1755 		LockBuffer(so->currPos.buf, BT_READ);
1756 
1757 		page = BufferGetPage(so->currPos.buf);
1758 	}
1759 	else
1760 	{
1761 		Buffer		buf;
1762 
1763 		/* Attempt to re-read the buffer, getting pin and lock. */
1764 		buf = _bt_getbuf(scan->indexRelation, so->currPos.currPage, BT_READ);
1765 
1766 		/* It might not exist anymore; in which case we can't hint it. */
1767 		if (!BufferIsValid(buf))
1768 			return;
1769 
1770 		page = BufferGetPage(buf);
1771 		if (BufferGetLSNAtomic(buf) == so->currPos.lsn)
1772 			so->currPos.buf = buf;
1773 		else
1774 		{
1775 			/* Modified while not pinned means hinting is not safe. */
1776 			_bt_relbuf(scan->indexRelation, buf);
1777 			return;
1778 		}
1779 	}
1780 
1781 	opaque = (BTPageOpaque) PageGetSpecialPointer(page);
1782 	minoff = P_FIRSTDATAKEY(opaque);
1783 	maxoff = PageGetMaxOffsetNumber(page);
1784 
1785 	for (i = 0; i < numKilled; i++)
1786 	{
1787 		int			itemIndex = so->killedItems[i];
1788 		BTScanPosItem *kitem = &so->currPos.items[itemIndex];
1789 		OffsetNumber offnum = kitem->indexOffset;
1790 
1791 		Assert(itemIndex >= so->currPos.firstItem &&
1792 			   itemIndex <= so->currPos.lastItem);
1793 		if (offnum < minoff)
1794 			continue;			/* pure paranoia */
1795 		while (offnum <= maxoff)
1796 		{
1797 			ItemId		iid = PageGetItemId(page, offnum);
1798 			IndexTuple	ituple = (IndexTuple) PageGetItem(page, iid);
1799 
1800 			if (ItemPointerEquals(&ituple->t_tid, &kitem->heapTid))
1801 			{
1802 				/*
1803 				 * Found the item.  Mark it as dead, if it isn't already.
1804 				 * Since this happens while holding a buffer lock possibly in
1805 				 * shared mode, it's possible that multiple processes attempt
1806 				 * to do this simultaneously, leading to multiple full-page
1807 				 * images being sent to WAL (if wal_log_hints or data checksums
1808 				 * are enabled), which is undesirable.
1809 				 */
1810 				if (!ItemIdIsDead(iid))
1811 				{
1812 					ItemIdMarkDead(iid);
1813 					killedsomething = true;
1814 				}
1815 				break;			/* out of inner search loop */
1816 			}
1817 			offnum = OffsetNumberNext(offnum);
1818 		}
1819 	}
1820 
1821 	/*
1822 	 * Since this can be redone later if needed, mark as dirty hint.
1823 	 *
1824 	 * Whenever we mark anything LP_DEAD, we also set the page's
1825 	 * BTP_HAS_GARBAGE flag, which is likewise just a hint.
1826 	 */
1827 	if (killedsomething)
1828 	{
1829 		opaque->btpo_flags |= BTP_HAS_GARBAGE;
1830 		MarkBufferDirtyHint(so->currPos.buf, true);
1831 	}
1832 
1833 	LockBuffer(so->currPos.buf, BUFFER_LOCK_UNLOCK);
1834 }
1835 
1836 
1837 /*
1838  * The following routines manage a shared-memory area in which we track
1839  * assignment of "vacuum cycle IDs" to currently-active btree vacuuming
1840  * operations.  There is a single counter which increments each time we
1841  * start a vacuum to assign it a cycle ID.  Since multiple vacuums could
1842  * be active concurrently, we have to track the cycle ID for each active
1843  * vacuum; this requires at most MaxBackends entries (usually far fewer).
1844  * We assume at most one vacuum can be active for a given index.
1845  *
1846  * Access to the shared memory area is controlled by BtreeVacuumLock.
1847  * In principle we could use a separate lmgr locktag for each index,
1848  * but a single LWLock is much cheaper, and given the short time that
1849  * the lock is ever held, the concurrency hit should be minimal.
1850  */
1851 
1852 typedef struct BTOneVacInfo
1853 {
1854 	LockRelId	relid;			/* global identifier of an index */
1855 	BTCycleId	cycleid;		/* cycle ID for its active VACUUM */
1856 } BTOneVacInfo;
1857 
1858 typedef struct BTVacInfo
1859 {
1860 	BTCycleId	cycle_ctr;		/* cycle ID most recently assigned */
1861 	int			num_vacuums;	/* number of currently active VACUUMs */
1862 	int			max_vacuums;	/* allocated length of vacuums[] array */
1863 	BTOneVacInfo vacuums[FLEXIBLE_ARRAY_MEMBER];
1864 } BTVacInfo;
1865 
1866 static BTVacInfo *btvacinfo;
1867 
1868 
1869 /*
1870  * _bt_vacuum_cycleid --- get the active vacuum cycle ID for an index,
1871  *		or zero if there is no active VACUUM
1872  *
1873  * Note: for correct interlocking, the caller must already hold pin and
1874  * exclusive lock on each buffer it will store the cycle ID into.  This
1875  * ensures that even if a VACUUM starts immediately afterwards, it cannot
1876  * process those pages until the page split is complete.
1877  */
1878 BTCycleId
_bt_vacuum_cycleid(Relation rel)1879 _bt_vacuum_cycleid(Relation rel)
1880 {
1881 	BTCycleId	result = 0;
1882 	int			i;
1883 
1884 	/* Share lock is enough since this is a read-only operation */
1885 	LWLockAcquire(BtreeVacuumLock, LW_SHARED);
1886 
1887 	for (i = 0; i < btvacinfo->num_vacuums; i++)
1888 	{
1889 		BTOneVacInfo *vac = &btvacinfo->vacuums[i];
1890 
1891 		if (vac->relid.relId == rel->rd_lockInfo.lockRelId.relId &&
1892 			vac->relid.dbId == rel->rd_lockInfo.lockRelId.dbId)
1893 		{
1894 			result = vac->cycleid;
1895 			break;
1896 		}
1897 	}
1898 
1899 	LWLockRelease(BtreeVacuumLock);
1900 	return result;
1901 }
1902 
1903 /*
1904  * _bt_start_vacuum --- assign a cycle ID to a just-starting VACUUM operation
1905  *
1906  * Note: the caller must guarantee that it will eventually call
1907  * _bt_end_vacuum, else we'll permanently leak an array slot.  To ensure
1908  * that this happens even in elog(FATAL) scenarios, the appropriate coding
1909  * is not just a PG_TRY, but
1910  *		PG_ENSURE_ERROR_CLEANUP(_bt_end_vacuum_callback, PointerGetDatum(rel))
1911  */
1912 BTCycleId
_bt_start_vacuum(Relation rel)1913 _bt_start_vacuum(Relation rel)
1914 {
1915 	BTCycleId	result;
1916 	int			i;
1917 	BTOneVacInfo *vac;
1918 
1919 	LWLockAcquire(BtreeVacuumLock, LW_EXCLUSIVE);
1920 
1921 	/*
1922 	 * Assign the next cycle ID, being careful to avoid zero as well as the
1923 	 * reserved high values.
1924 	 */
1925 	result = ++(btvacinfo->cycle_ctr);
1926 	if (result == 0 || result > MAX_BT_CYCLE_ID)
1927 		result = btvacinfo->cycle_ctr = 1;
1928 
1929 	/* Let's just make sure there's no entry already for this index */
1930 	for (i = 0; i < btvacinfo->num_vacuums; i++)
1931 	{
1932 		vac = &btvacinfo->vacuums[i];
1933 		if (vac->relid.relId == rel->rd_lockInfo.lockRelId.relId &&
1934 			vac->relid.dbId == rel->rd_lockInfo.lockRelId.dbId)
1935 		{
1936 			/*
1937 			 * Unlike most places in the backend, we have to explicitly
1938 			 * release our LWLock before throwing an error.  This is because
1939 			 * we expect _bt_end_vacuum() to be called before transaction
1940 			 * abort cleanup can run to release LWLocks.
1941 			 */
1942 			LWLockRelease(BtreeVacuumLock);
1943 			elog(ERROR, "multiple active vacuums for index \"%s\"",
1944 				 RelationGetRelationName(rel));
1945 		}
1946 	}
1947 
1948 	/* OK, add an entry */
1949 	if (btvacinfo->num_vacuums >= btvacinfo->max_vacuums)
1950 	{
1951 		LWLockRelease(BtreeVacuumLock);
1952 		elog(ERROR, "out of btvacinfo slots");
1953 	}
1954 	vac = &btvacinfo->vacuums[btvacinfo->num_vacuums];
1955 	vac->relid = rel->rd_lockInfo.lockRelId;
1956 	vac->cycleid = result;
1957 	btvacinfo->num_vacuums++;
1958 
1959 	LWLockRelease(BtreeVacuumLock);
1960 	return result;
1961 }
1962 
1963 /*
1964  * _bt_end_vacuum --- mark a btree VACUUM operation as done
1965  *
1966  * Note: this is deliberately coded not to complain if no entry is found;
1967  * this allows the caller to put PG_TRY around the start_vacuum operation.
1968  */
1969 void
_bt_end_vacuum(Relation rel)1970 _bt_end_vacuum(Relation rel)
1971 {
1972 	int			i;
1973 
1974 	LWLockAcquire(BtreeVacuumLock, LW_EXCLUSIVE);
1975 
1976 	/* Find the array entry */
1977 	for (i = 0; i < btvacinfo->num_vacuums; i++)
1978 	{
1979 		BTOneVacInfo *vac = &btvacinfo->vacuums[i];
1980 
1981 		if (vac->relid.relId == rel->rd_lockInfo.lockRelId.relId &&
1982 			vac->relid.dbId == rel->rd_lockInfo.lockRelId.dbId)
1983 		{
1984 			/* Remove it by shifting down the last entry */
1985 			*vac = btvacinfo->vacuums[btvacinfo->num_vacuums - 1];
1986 			btvacinfo->num_vacuums--;
1987 			break;
1988 		}
1989 	}
1990 
1991 	LWLockRelease(BtreeVacuumLock);
1992 }
1993 
1994 /*
1995  * _bt_end_vacuum wrapped as an on_shmem_exit callback function
1996  */
1997 void
_bt_end_vacuum_callback(int code,Datum arg)1998 _bt_end_vacuum_callback(int code, Datum arg)
1999 {
2000 	_bt_end_vacuum((Relation) DatumGetPointer(arg));
2001 }
2002 
2003 /*
2004  * BTreeShmemSize --- report amount of shared memory space needed
2005  */
2006 Size
BTreeShmemSize(void)2007 BTreeShmemSize(void)
2008 {
2009 	Size		size;
2010 
2011 	size = offsetof(BTVacInfo, vacuums);
2012 	size = add_size(size, mul_size(MaxBackends, sizeof(BTOneVacInfo)));
2013 	return size;
2014 }
2015 
2016 /*
2017  * BTreeShmemInit --- initialize this module's shared memory
2018  */
2019 void
BTreeShmemInit(void)2020 BTreeShmemInit(void)
2021 {
2022 	bool		found;
2023 
2024 	btvacinfo = (BTVacInfo *) ShmemInitStruct("BTree Vacuum State",
2025 											  BTreeShmemSize(),
2026 											  &found);
2027 
2028 	if (!IsUnderPostmaster)
2029 	{
2030 		/* Initialize shared memory area */
2031 		Assert(!found);
2032 
2033 		/*
2034 		 * It doesn't really matter what the cycle counter starts at, but
2035 		 * having it always start the same doesn't seem good.  Seed with
2036 		 * low-order bits of time() instead.
2037 		 */
2038 		btvacinfo->cycle_ctr = (BTCycleId) time(NULL);
2039 
2040 		btvacinfo->num_vacuums = 0;
2041 		btvacinfo->max_vacuums = MaxBackends;
2042 	}
2043 	else
2044 		Assert(found);
2045 }
2046 
2047 bytea *
btoptions(Datum reloptions,bool validate)2048 btoptions(Datum reloptions, bool validate)
2049 {
2050 	return default_reloptions(reloptions, validate, RELOPT_KIND_BTREE);
2051 }
2052 
2053 /*
2054  *	btproperty() -- Check boolean properties of indexes.
2055  *
2056  * This is optional, but handling AMPROP_RETURNABLE here saves opening the rel
2057  * to call btcanreturn.
2058  */
2059 bool
btproperty(Oid index_oid,int attno,IndexAMProperty prop,const char * propname,bool * res,bool * isnull)2060 btproperty(Oid index_oid, int attno,
2061 		   IndexAMProperty prop, const char *propname,
2062 		   bool *res, bool *isnull)
2063 {
2064 	switch (prop)
2065 	{
2066 		case AMPROP_RETURNABLE:
2067 			/* answer only for columns, not AM or whole index */
2068 			if (attno == 0)
2069 				return false;
2070 			/* otherwise, btree can always return data */
2071 			*res = true;
2072 			return true;
2073 
2074 		default:
2075 			return false;		/* punt to generic code */
2076 	}
2077 }
2078