1 /*-------------------------------------------------------------------------
2  *
3  * nbtutils.c
4  *	  Utility code for Postgres btree implementation.
5  *
6  * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *	  src/backend/access/nbtree/nbtutils.c
12  *
13  *-------------------------------------------------------------------------
14  */
15 
16 #include "postgres.h"
17 
18 #include <time.h>
19 
20 #include "access/nbtree.h"
21 #include "access/reloptions.h"
22 #include "access/relscan.h"
23 #include "miscadmin.h"
24 #include "utils/array.h"
25 #include "utils/lsyscache.h"
26 #include "utils/memutils.h"
27 #include "utils/rel.h"
28 
29 
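/* Working state for _bt_compare_array_elements, passed through qsort_arg */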
30 typedef struct BTSortArrayContext
31 {
32 	FmgrInfo	flinfo;
33 	Oid			collation;
34 	bool		reverse;
35 } BTSortArrayContext;
36 
37 static Datum _bt_find_extreme_element(IndexScanDesc scan, ScanKey skey,
38 						 StrategyNumber strat,
39 						 Datum *elems, int nelems);
40 static int _bt_sort_array_elements(IndexScanDesc scan, ScanKey skey,
41 						bool reverse,
42 						Datum *elems, int nelems);
43 static int	_bt_compare_array_elements(const void *a, const void *b, void *arg);
44 static bool _bt_compare_scankey_args(IndexScanDesc scan, ScanKey op,
45 						 ScanKey leftarg, ScanKey rightarg,
46 						 bool *result);
47 static bool _bt_fix_scankey_strategy(ScanKey skey, int16 *indoption);
48 static void _bt_mark_scankey_required(ScanKey skey);
49 static bool _bt_check_rowcompare(ScanKey skey,
50 					 IndexTuple tuple, TupleDesc tupdesc,
51 					 ScanDirection dir, bool *continuescan);
52 
53 
54 /*
55  * _bt_mkscankey
56  *		Build an insertion scan key that contains comparison data from itup
57  *		as well as comparator routines appropriate to the key datatypes.
58  *
59  *		The result is intended for use with _bt_compare().
60  */
61 ScanKey
62 _bt_mkscankey(Relation rel, IndexTuple itup)
63 {
64 	ScanKey		skey;
65 	TupleDesc	itupdesc;
66 	int			indnatts PG_USED_FOR_ASSERTS_ONLY;
67 	int			indnkeyatts;
68 	int16	   *indoption;
69 	int			i;
70 
71 	itupdesc = RelationGetDescr(rel);
72 	indnatts = IndexRelationGetNumberOfAttributes(rel);
73 	indnkeyatts = IndexRelationGetNumberOfKeyAttributes(rel);
74 	indoption = rel->rd_indoption;
75 
76 	Assert(indnkeyatts > 0);
77 	Assert(indnkeyatts <= indnatts);
78 	Assert(BTreeTupleGetNAtts(itup, rel) == indnatts ||
79 		   BTreeTupleGetNAtts(itup, rel) == indnkeyatts);
80 
	/*
	 * We'll execute the search using a scan key constructed on the key
	 * columns.  Non-key (INCLUDE index) columns are always omitted from
	 * scan keys.
	 */
85 	skey = (ScanKey) palloc(indnkeyatts * sizeof(ScanKeyData));
86 
87 	for (i = 0; i < indnkeyatts; i++)
88 	{
89 		FmgrInfo   *procinfo;
90 		Datum		arg;
91 		bool		null;
92 		int			flags;
93 
94 		/*
95 		 * We can use the cached (default) support procs since no cross-type
96 		 * comparison can be needed.
97 		 */
98 		procinfo = index_getprocinfo(rel, i + 1, BTORDER_PROC);
99 		arg = index_getattr(itup, i + 1, itupdesc, &null);
100 		flags = (null ? SK_ISNULL : 0) | (indoption[i] << SK_BT_INDOPTION_SHIFT);
101 		ScanKeyEntryInitializeWithInfo(&skey[i],
102 									   flags,
103 									   (AttrNumber) (i + 1),
104 									   InvalidStrategy,
105 									   InvalidOid,
106 									   rel->rd_indcollation[i],
107 									   procinfo,
108 									   arg);
109 	}
110 
111 	return skey;
112 }
113 
114 /*
115  * _bt_mkscankey_nodata
116  *		Build an insertion scan key that contains 3-way comparator routines
117  *		appropriate to the key datatypes, but no comparison data.  The
118  *		comparison data ultimately used must match the key datatypes.
119  *
120  *		The result cannot be used with _bt_compare(), unless comparison
121  *		data is first stored into the key entries.  Currently this
122  *		routine is only called by nbtsort.c and tuplesort.c, which have
123  *		their own comparison routines.
124  */
125 ScanKey
126 _bt_mkscankey_nodata(Relation rel)
127 {
128 	ScanKey		skey;
129 	int			indnkeyatts;
130 	int16	   *indoption;
131 	int			i;
132 
133 	indnkeyatts = IndexRelationGetNumberOfKeyAttributes(rel);
134 	indoption = rel->rd_indoption;
135 
136 	skey = (ScanKey) palloc(indnkeyatts * sizeof(ScanKeyData));
137 
138 	for (i = 0; i < indnkeyatts; i++)
139 	{
140 		FmgrInfo   *procinfo;
141 		int			flags;
142 
143 		/*
144 		 * We can use the cached (default) support procs since no cross-type
145 		 * comparison can be needed.
146 		 */
147 		procinfo = index_getprocinfo(rel, i + 1, BTORDER_PROC);
148 		flags = SK_ISNULL | (indoption[i] << SK_BT_INDOPTION_SHIFT);
149 		ScanKeyEntryInitializeWithInfo(&skey[i],
150 									   flags,
151 									   (AttrNumber) (i + 1),
152 									   InvalidStrategy,
153 									   InvalidOid,
154 									   rel->rd_indcollation[i],
155 									   procinfo,
156 									   (Datum) 0);
157 	}
158 
159 	return skey;
160 }
161 
162 /*
163  * free a scan key made by either _bt_mkscankey or _bt_mkscankey_nodata.
164  */
165 void
166 _bt_freeskey(ScanKey skey)
167 {
168 	pfree(skey);
169 }
170 
171 /*
172  * free a retracement stack made by _bt_search.
173  */
174 void
175 _bt_freestack(BTStack stack)
176 {
177 	BTStack		ostack;
178 
179 	while (stack != NULL)
180 	{
181 		ostack = stack;
182 		stack = stack->bts_parent;
183 		pfree(ostack);
184 	}
185 }
186 
187 
188 /*
189  *	_bt_preprocess_array_keys() -- Preprocess SK_SEARCHARRAY scan keys
190  *
191  * If there are any SK_SEARCHARRAY scan keys, deconstruct the array(s) and
192  * set up BTArrayKeyInfo info for each one that is an equality-type key.
193  * Prepare modified scan keys in so->arrayKeyData, which will hold the current
194  * array elements during each primitive indexscan operation.  For inequality
195  * array keys, it's sufficient to find the extreme element value and replace
196  * the whole array with that scalar value.
197  *
198  * Note: the reason we need so->arrayKeyData, rather than just scribbling
199  * on scan->keyData, is that callers are permitted to call btrescan without
200  * supplying a new set of scankey data.
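 *
 * For example, "x = ANY('{3,1,2}')" produces an arrayKeyData entry whose
 * elem_values are the de-duplicated elements sorted in index order
 * ({1,2,3} for an ASC column), to be scanned one at a time, while
 * "x < ANY('{5,9}')" simply becomes the scalar qual "x < 9".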
201  */
202 void
203 _bt_preprocess_array_keys(IndexScanDesc scan)
204 {
205 	BTScanOpaque so = (BTScanOpaque) scan->opaque;
206 	int			numberOfKeys = scan->numberOfKeys;
207 	int16	   *indoption = scan->indexRelation->rd_indoption;
208 	int			numArrayKeys;
209 	ScanKey		cur;
210 	int			i;
211 	MemoryContext oldContext;
212 
213 	/* Quick check to see if there are any array keys */
214 	numArrayKeys = 0;
215 	for (i = 0; i < numberOfKeys; i++)
216 	{
217 		cur = &scan->keyData[i];
218 		if (cur->sk_flags & SK_SEARCHARRAY)
219 		{
220 			numArrayKeys++;
221 			Assert(!(cur->sk_flags & (SK_ROW_HEADER | SK_SEARCHNULL | SK_SEARCHNOTNULL)));
222 			/* If any arrays are null as a whole, we can quit right now. */
223 			if (cur->sk_flags & SK_ISNULL)
224 			{
225 				so->numArrayKeys = -1;
226 				so->arrayKeyData = NULL;
227 				return;
228 			}
229 		}
230 	}
231 
232 	/* Quit if nothing to do. */
233 	if (numArrayKeys == 0)
234 	{
235 		so->numArrayKeys = 0;
236 		so->arrayKeyData = NULL;
237 		return;
238 	}
239 
240 	/*
241 	 * Make a scan-lifespan context to hold array-associated data, or reset it
242 	 * if we already have one from a previous rescan cycle.
243 	 */
244 	if (so->arrayContext == NULL)
245 		so->arrayContext = AllocSetContextCreate(CurrentMemoryContext,
246 												 "BTree array context",
247 												 ALLOCSET_SMALL_SIZES);
248 	else
249 		MemoryContextReset(so->arrayContext);
250 
251 	oldContext = MemoryContextSwitchTo(so->arrayContext);
252 
253 	/* Create modifiable copy of scan->keyData in the workspace context */
254 	so->arrayKeyData = (ScanKey) palloc(scan->numberOfKeys * sizeof(ScanKeyData));
255 	memcpy(so->arrayKeyData,
256 		   scan->keyData,
257 		   scan->numberOfKeys * sizeof(ScanKeyData));
258 
259 	/* Allocate space for per-array data in the workspace context */
260 	so->arrayKeys = (BTArrayKeyInfo *) palloc0(numArrayKeys * sizeof(BTArrayKeyInfo));
261 
262 	/* Now process each array key */
263 	numArrayKeys = 0;
264 	for (i = 0; i < numberOfKeys; i++)
265 	{
266 		ArrayType  *arrayval;
267 		int16		elmlen;
268 		bool		elmbyval;
269 		char		elmalign;
270 		int			num_elems;
271 		Datum	   *elem_values;
272 		bool	   *elem_nulls;
273 		int			num_nonnulls;
274 		int			j;
275 
276 		cur = &so->arrayKeyData[i];
277 		if (!(cur->sk_flags & SK_SEARCHARRAY))
278 			continue;
279 
280 		/*
281 		 * First, deconstruct the array into elements.  Anything allocated
282 		 * here (including a possibly detoasted array value) is in the
283 		 * workspace context.
284 		 */
285 		arrayval = DatumGetArrayTypeP(cur->sk_argument);
286 		/* We could cache this data, but not clear it's worth it */
287 		get_typlenbyvalalign(ARR_ELEMTYPE(arrayval),
288 							 &elmlen, &elmbyval, &elmalign);
289 		deconstruct_array(arrayval,
290 						  ARR_ELEMTYPE(arrayval),
291 						  elmlen, elmbyval, elmalign,
292 						  &elem_values, &elem_nulls, &num_elems);
293 
294 		/*
295 		 * Compress out any null elements.  We can ignore them since we assume
296 		 * all btree operators are strict.
297 		 */
298 		num_nonnulls = 0;
299 		for (j = 0; j < num_elems; j++)
300 		{
301 			if (!elem_nulls[j])
302 				elem_values[num_nonnulls++] = elem_values[j];
303 		}
304 
305 		/* We could pfree(elem_nulls) now, but not worth the cycles */
306 
		/* If there are no non-null elements, the scan qual is unsatisfiable */
308 		if (num_nonnulls == 0)
309 		{
310 			numArrayKeys = -1;
311 			break;
312 		}
313 
314 		/*
315 		 * If the comparison operator is not equality, then the array qual
316 		 * degenerates to a simple comparison against the smallest or largest
317 		 * non-null array element, as appropriate.
318 		 */
319 		switch (cur->sk_strategy)
320 		{
321 			case BTLessStrategyNumber:
322 			case BTLessEqualStrategyNumber:
323 				cur->sk_argument =
324 					_bt_find_extreme_element(scan, cur,
325 											 BTGreaterStrategyNumber,
326 											 elem_values, num_nonnulls);
327 				continue;
328 			case BTEqualStrategyNumber:
329 				/* proceed with rest of loop */
330 				break;
331 			case BTGreaterEqualStrategyNumber:
332 			case BTGreaterStrategyNumber:
333 				cur->sk_argument =
334 					_bt_find_extreme_element(scan, cur,
335 											 BTLessStrategyNumber,
336 											 elem_values, num_nonnulls);
337 				continue;
338 			default:
339 				elog(ERROR, "unrecognized StrategyNumber: %d",
340 					 (int) cur->sk_strategy);
341 				break;
342 		}
343 
344 		/*
345 		 * Sort the non-null elements and eliminate any duplicates.  We must
346 		 * sort in the same ordering used by the index column, so that the
347 		 * successive primitive indexscans produce data in index order.
348 		 */
349 		num_elems = _bt_sort_array_elements(scan, cur,
350 											(indoption[cur->sk_attno - 1] & INDOPTION_DESC) != 0,
351 											elem_values, num_nonnulls);
352 
353 		/*
354 		 * And set up the BTArrayKeyInfo data.
355 		 */
356 		so->arrayKeys[numArrayKeys].scan_key = i;
357 		so->arrayKeys[numArrayKeys].num_elems = num_elems;
358 		so->arrayKeys[numArrayKeys].elem_values = elem_values;
359 		numArrayKeys++;
360 	}
361 
362 	so->numArrayKeys = numArrayKeys;
363 
364 	MemoryContextSwitchTo(oldContext);
365 }
366 
367 /*
368  * _bt_find_extreme_element() -- get least or greatest array element
369  *
370  * scan and skey identify the index column, whose opfamily determines the
371  * comparison semantics.  strat should be BTLessStrategyNumber to get the
372  * least element, or BTGreaterStrategyNumber to get the greatest.
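 *
 * For example, _bt_preprocess_array_keys reduces "x <= ANY('{5,9,2}')" to a
 * comparison against 9 by calling this with strat = BTGreaterStrategyNumber.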
373  */
374 static Datum
375 _bt_find_extreme_element(IndexScanDesc scan, ScanKey skey,
376 						 StrategyNumber strat,
377 						 Datum *elems, int nelems)
378 {
379 	Relation	rel = scan->indexRelation;
380 	Oid			elemtype,
381 				cmp_op;
382 	RegProcedure cmp_proc;
383 	FmgrInfo	flinfo;
384 	Datum		result;
385 	int			i;
386 
387 	/*
388 	 * Determine the nominal datatype of the array elements.  We have to
389 	 * support the convention that sk_subtype == InvalidOid means the opclass
390 	 * input type; this is a hack to simplify life for ScanKeyInit().
391 	 */
392 	elemtype = skey->sk_subtype;
393 	if (elemtype == InvalidOid)
394 		elemtype = rel->rd_opcintype[skey->sk_attno - 1];
395 
396 	/*
397 	 * Look up the appropriate comparison operator in the opfamily.
398 	 *
399 	 * Note: it's possible that this would fail, if the opfamily is
400 	 * incomplete, but it seems quite unlikely that an opfamily would omit
401 	 * non-cross-type comparison operators for any datatype that it supports
402 	 * at all.
403 	 */
404 	cmp_op = get_opfamily_member(rel->rd_opfamily[skey->sk_attno - 1],
405 								 elemtype,
406 								 elemtype,
407 								 strat);
408 	if (!OidIsValid(cmp_op))
409 		elog(ERROR, "missing operator %d(%u,%u) in opfamily %u",
410 			 strat, elemtype, elemtype,
411 			 rel->rd_opfamily[skey->sk_attno - 1]);
412 	cmp_proc = get_opcode(cmp_op);
413 	if (!RegProcedureIsValid(cmp_proc))
414 		elog(ERROR, "missing oprcode for operator %u", cmp_op);
415 
416 	fmgr_info(cmp_proc, &flinfo);
417 
418 	Assert(nelems > 0);
419 	result = elems[0];
420 	for (i = 1; i < nelems; i++)
421 	{
422 		if (DatumGetBool(FunctionCall2Coll(&flinfo,
423 										   skey->sk_collation,
424 										   elems[i],
425 										   result)))
426 			result = elems[i];
427 	}
428 
429 	return result;
430 }
431 
432 /*
433  * _bt_sort_array_elements() -- sort and de-dup array elements
434  *
435  * The array elements are sorted in-place, and the new number of elements
436  * after duplicate removal is returned.
437  *
438  * scan and skey identify the index column, whose opfamily determines the
439  * comparison semantics.  If reverse is true, we sort in descending order.
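 *
 * For example, the elements of "x = ANY('{3,1,3,2}')" become {1,2,3} (or
 * {3,2,1} for a DESC column) and 3 is returned as the new element count.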
440  */
441 static int
442 _bt_sort_array_elements(IndexScanDesc scan, ScanKey skey,
443 						bool reverse,
444 						Datum *elems, int nelems)
445 {
446 	Relation	rel = scan->indexRelation;
447 	Oid			elemtype;
448 	RegProcedure cmp_proc;
449 	BTSortArrayContext cxt;
450 	int			last_non_dup;
451 	int			i;
452 
453 	if (nelems <= 1)
454 		return nelems;			/* no work to do */
455 
456 	/*
457 	 * Determine the nominal datatype of the array elements.  We have to
458 	 * support the convention that sk_subtype == InvalidOid means the opclass
459 	 * input type; this is a hack to simplify life for ScanKeyInit().
460 	 */
461 	elemtype = skey->sk_subtype;
462 	if (elemtype == InvalidOid)
463 		elemtype = rel->rd_opcintype[skey->sk_attno - 1];
464 
465 	/*
466 	 * Look up the appropriate comparison function in the opfamily.
467 	 *
468 	 * Note: it's possible that this would fail, if the opfamily is
469 	 * incomplete, but it seems quite unlikely that an opfamily would omit
470 	 * non-cross-type support functions for any datatype that it supports at
471 	 * all.
472 	 */
473 	cmp_proc = get_opfamily_proc(rel->rd_opfamily[skey->sk_attno - 1],
474 								 elemtype,
475 								 elemtype,
476 								 BTORDER_PROC);
477 	if (!RegProcedureIsValid(cmp_proc))
478 		elog(ERROR, "missing support function %d(%u,%u) in opfamily %u",
479 			 BTORDER_PROC, elemtype, elemtype,
480 			 rel->rd_opfamily[skey->sk_attno - 1]);
481 
482 	/* Sort the array elements */
483 	fmgr_info(cmp_proc, &cxt.flinfo);
484 	cxt.collation = skey->sk_collation;
485 	cxt.reverse = reverse;
486 	qsort_arg((void *) elems, nelems, sizeof(Datum),
487 			  _bt_compare_array_elements, (void *) &cxt);
488 
489 	/* Now scan the sorted elements and remove duplicates */
490 	last_non_dup = 0;
491 	for (i = 1; i < nelems; i++)
492 	{
493 		int32		compare;
494 
495 		compare = DatumGetInt32(FunctionCall2Coll(&cxt.flinfo,
496 												  cxt.collation,
497 												  elems[last_non_dup],
498 												  elems[i]));
499 		if (compare != 0)
500 			elems[++last_non_dup] = elems[i];
501 	}
502 
503 	return last_non_dup + 1;
504 }
505 
506 /*
507  * qsort_arg comparator for sorting array elements
508  */
509 static int
510 _bt_compare_array_elements(const void *a, const void *b, void *arg)
511 {
512 	Datum		da = *((const Datum *) a);
513 	Datum		db = *((const Datum *) b);
514 	BTSortArrayContext *cxt = (BTSortArrayContext *) arg;
515 	int32		compare;
516 
517 	compare = DatumGetInt32(FunctionCall2Coll(&cxt->flinfo,
518 											  cxt->collation,
519 											  da, db));
520 	if (cxt->reverse)
521 		INVERT_COMPARE_RESULT(compare);
522 	return compare;
523 }
524 
525 /*
526  * _bt_start_array_keys() -- Initialize array keys at start of a scan
527  *
528  * Set up the cur_elem counters and fill in the first sk_argument value for
529  * each array scankey.  We can't do this until we know the scan direction.
530  */
531 void
532 _bt_start_array_keys(IndexScanDesc scan, ScanDirection dir)
533 {
534 	BTScanOpaque so = (BTScanOpaque) scan->opaque;
535 	int			i;
536 
537 	for (i = 0; i < so->numArrayKeys; i++)
538 	{
539 		BTArrayKeyInfo *curArrayKey = &so->arrayKeys[i];
540 		ScanKey		skey = &so->arrayKeyData[curArrayKey->scan_key];
541 
542 		Assert(curArrayKey->num_elems > 0);
543 		if (ScanDirectionIsBackward(dir))
544 			curArrayKey->cur_elem = curArrayKey->num_elems - 1;
545 		else
546 			curArrayKey->cur_elem = 0;
547 		skey->sk_argument = curArrayKey->elem_values[curArrayKey->cur_elem];
548 	}
549 }
550 
551 /*
552  * _bt_advance_array_keys() -- Advance to next set of array elements
553  *
554  * Returns true if there is another set of values to consider, false if not.
555  * On true result, the scankeys are initialized with the next set of values.
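 *
 * For example, given array keys x = ANY('{1,2}') and y = ANY('{10,20}') in a
 * forward scan, _bt_start_array_keys sets (x,y) = (1,10); successive calls
 * here then yield (1,20), (2,10), (2,20), and finally return false.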
556  */
557 bool
558 _bt_advance_array_keys(IndexScanDesc scan, ScanDirection dir)
559 {
560 	BTScanOpaque so = (BTScanOpaque) scan->opaque;
561 	bool		found = false;
562 	int			i;
563 
564 	/*
565 	 * We must advance the last array key most quickly, since it will
566 	 * correspond to the lowest-order index column among the available
567 	 * qualifications. This is necessary to ensure correct ordering of output
568 	 * when there are multiple array keys.
569 	 */
570 	for (i = so->numArrayKeys - 1; i >= 0; i--)
571 	{
572 		BTArrayKeyInfo *curArrayKey = &so->arrayKeys[i];
573 		ScanKey		skey = &so->arrayKeyData[curArrayKey->scan_key];
574 		int			cur_elem = curArrayKey->cur_elem;
575 		int			num_elems = curArrayKey->num_elems;
576 
577 		if (ScanDirectionIsBackward(dir))
578 		{
579 			if (--cur_elem < 0)
580 			{
581 				cur_elem = num_elems - 1;
582 				found = false;	/* need to advance next array key */
583 			}
584 			else
585 				found = true;
586 		}
587 		else
588 		{
589 			if (++cur_elem >= num_elems)
590 			{
591 				cur_elem = 0;
592 				found = false;	/* need to advance next array key */
593 			}
594 			else
595 				found = true;
596 		}
597 
598 		curArrayKey->cur_elem = cur_elem;
599 		skey->sk_argument = curArrayKey->elem_values[cur_elem];
600 		if (found)
601 			break;
602 	}
603 
604 	/* advance parallel scan */
605 	if (scan->parallel_scan != NULL)
606 		_bt_parallel_advance_array_keys(scan);
607 
608 	return found;
609 }
610 
611 /*
612  * _bt_mark_array_keys() -- Handle array keys during btmarkpos
613  *
614  * Save the current state of the array keys as the "mark" position.
615  */
616 void
617 _bt_mark_array_keys(IndexScanDesc scan)
618 {
619 	BTScanOpaque so = (BTScanOpaque) scan->opaque;
620 	int			i;
621 
622 	for (i = 0; i < so->numArrayKeys; i++)
623 	{
624 		BTArrayKeyInfo *curArrayKey = &so->arrayKeys[i];
625 
626 		curArrayKey->mark_elem = curArrayKey->cur_elem;
627 	}
628 }
629 
630 /*
631  * _bt_restore_array_keys() -- Handle array keys during btrestrpos
632  *
633  * Restore the array keys to where they were when the mark was set.
634  */
635 void
636 _bt_restore_array_keys(IndexScanDesc scan)
637 {
638 	BTScanOpaque so = (BTScanOpaque) scan->opaque;
639 	bool		changed = false;
640 	int			i;
641 
642 	/* Restore each array key to its position when the mark was set */
643 	for (i = 0; i < so->numArrayKeys; i++)
644 	{
645 		BTArrayKeyInfo *curArrayKey = &so->arrayKeys[i];
646 		ScanKey		skey = &so->arrayKeyData[curArrayKey->scan_key];
647 		int			mark_elem = curArrayKey->mark_elem;
648 
649 		if (curArrayKey->cur_elem != mark_elem)
650 		{
651 			curArrayKey->cur_elem = mark_elem;
652 			skey->sk_argument = curArrayKey->elem_values[mark_elem];
653 			changed = true;
654 		}
655 	}
656 
657 	/*
658 	 * If we changed any keys, we must redo _bt_preprocess_keys.  That might
659 	 * sound like overkill, but in cases with multiple keys per index column
660 	 * it seems necessary to do the full set of pushups.
661 	 */
662 	if (changed)
663 	{
664 		_bt_preprocess_keys(scan);
665 		/* The mark should have been set on a consistent set of keys... */
666 		Assert(so->qual_ok);
667 	}
668 }
669 
670 
671 /*
672  *	_bt_preprocess_keys() -- Preprocess scan keys
673  *
674  * The given search-type keys (in scan->keyData[] or so->arrayKeyData[])
675  * are copied to so->keyData[] with possible transformation.
676  * scan->numberOfKeys is the number of input keys, so->numberOfKeys gets
677  * the number of output keys (possibly less, never greater).
678  *
679  * The output keys are marked with additional sk_flag bits beyond the
680  * system-standard bits supplied by the caller.  The DESC and NULLS_FIRST
681  * indoption bits for the relevant index attribute are copied into the flags.
682  * Also, for a DESC column, we commute (flip) all the sk_strategy numbers
683  * so that the index sorts in the desired direction.
684  *
685  * One key purpose of this routine is to discover which scan keys must be
686  * satisfied to continue the scan.  It also attempts to eliminate redundant
687  * keys and detect contradictory keys.  (If the index opfamily provides
688  * incomplete sets of cross-type operators, we may fail to detect redundant
689  * or contradictory keys, but we can survive that.)
690  *
691  * The output keys must be sorted by index attribute.  Presently we expect
692  * (but verify) that the input keys are already so sorted --- this is done
693  * by match_clauses_to_index() in indxpath.c.  Some reordering of the keys
694  * within each attribute may be done as a byproduct of the processing here,
695  * but no other code depends on that.
696  *
697  * The output keys are marked with flags SK_BT_REQFWD and/or SK_BT_REQBKWD
698  * if they must be satisfied in order to continue the scan forward or backward
699  * respectively.  _bt_checkkeys uses these flags.  For example, if the quals
700  * are "x = 1 AND y < 4 AND z < 5", then _bt_checkkeys will reject a tuple
701  * (1,2,7), but we must continue the scan in case there are tuples (1,3,z).
702  * But once we reach tuples like (1,4,z) we can stop scanning because no
703  * later tuples could match.  This is reflected by marking the x and y keys,
704  * but not the z key, with SK_BT_REQFWD.  In general, the keys for leading
705  * attributes with "=" keys are marked both SK_BT_REQFWD and SK_BT_REQBKWD.
706  * For the first attribute without an "=" key, any "<" and "<=" keys are
707  * marked SK_BT_REQFWD while any ">" and ">=" keys are marked SK_BT_REQBKWD.
708  * This can be seen to be correct by considering the above example.  Note
709  * in particular that if there are no keys for a given attribute, the keys for
710  * subsequent attributes can never be required; for instance "WHERE y = 4"
711  * requires a full-index scan.
712  *
713  * If possible, redundant keys are eliminated: we keep only the tightest
714  * >/>= bound and the tightest </<= bound, and if there's an = key then
715  * that's the only one returned.  (So, we return either a single = key,
716  * or one or two boundary-condition keys for each attr.)  However, if we
717  * cannot compare two keys for lack of a suitable cross-type operator,
718  * we cannot eliminate either.  If there are two such keys of the same
719  * operator strategy, the second one is just pushed into the output array
720  * without further processing here.  We may also emit both >/>= or both
721  * </<= keys if we can't compare them.  The logic about required keys still
722  * works if we don't eliminate redundant keys.
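 *
 * For example, under these rules "x > 2 AND x > 5" reduces to just "x > 5",
 * and "x = 7 AND x >= 4" reduces to "x = 7", assuming the opfamily lets us
 * compare the constants.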
723  *
724  * Note that one reason we need direction-sensitive required-key flags is
725  * precisely that we may not be able to eliminate redundant keys.  Suppose
726  * we have "x > 4::int AND x > 10::bigint", and we are unable to determine
727  * which key is more restrictive for lack of a suitable cross-type operator.
728  * _bt_first will arbitrarily pick one of the keys to do the initial
729  * positioning with.  If it picks x > 4, then the x > 10 condition will fail
730  * until we reach index entries > 10; but we can't stop the scan just because
731  * x > 10 is failing.  On the other hand, if we are scanning backwards, then
732  * failure of either key is indeed enough to stop the scan.  (In general, when
733  * inequality keys are present, the initial-positioning code only promises to
734  * position before the first possible match, not exactly at the first match,
735  * for a forward scan; or after the last match for a backward scan.)
736  *
737  * As a byproduct of this work, we can detect contradictory quals such
738  * as "x = 1 AND x > 2".  If we see that, we return so->qual_ok = false,
739  * indicating the scan need not be run at all since no tuples can match.
740  * (In this case we do not bother completing the output key array!)
741  * Again, missing cross-type operators might cause us to fail to prove the
742  * quals contradictory when they really are, but the scan will work correctly.
743  *
744  * Row comparison keys are currently also treated without any smarts:
745  * we just transfer them into the preprocessed array without any
746  * editorialization.  We can treat them the same as an ordinary inequality
747  * comparison on the row's first index column, for the purposes of the logic
748  * about required keys.
749  *
750  * Note: the reason we have to copy the preprocessed scan keys into private
751  * storage is that we are modifying the array based on comparisons of the
752  * key argument values, which could change on a rescan or after moving to
753  * new elements of array keys.  Therefore we can't overwrite the source data.
754  */
755 void
756 _bt_preprocess_keys(IndexScanDesc scan)
757 {
758 	BTScanOpaque so = (BTScanOpaque) scan->opaque;
759 	int			numberOfKeys = scan->numberOfKeys;
760 	int16	   *indoption = scan->indexRelation->rd_indoption;
761 	int			new_numberOfKeys;
762 	int			numberOfEqualCols;
763 	ScanKey		inkeys;
764 	ScanKey		outkeys;
765 	ScanKey		cur;
766 	ScanKey		xform[BTMaxStrategyNumber];
767 	bool		test_result;
768 	int			i,
769 				j;
770 	AttrNumber	attno;
771 
772 	/* initialize result variables */
773 	so->qual_ok = true;
774 	so->numberOfKeys = 0;
775 
776 	if (numberOfKeys < 1)
777 		return;					/* done if qual-less scan */
778 
779 	/*
780 	 * Read so->arrayKeyData if array keys are present, else scan->keyData
781 	 */
782 	if (so->arrayKeyData != NULL)
783 		inkeys = so->arrayKeyData;
784 	else
785 		inkeys = scan->keyData;
786 
787 	outkeys = so->keyData;
788 	cur = &inkeys[0];
789 	/* we check that input keys are correctly ordered */
790 	if (cur->sk_attno < 1)
791 		elog(ERROR, "btree index keys must be ordered by attribute");
792 
793 	/* We can short-circuit most of the work if there's just one key */
794 	if (numberOfKeys == 1)
795 	{
796 		/* Apply indoption to scankey (might change sk_strategy!) */
797 		if (!_bt_fix_scankey_strategy(cur, indoption))
798 			so->qual_ok = false;
799 		memcpy(outkeys, cur, sizeof(ScanKeyData));
800 		so->numberOfKeys = 1;
801 		/* We can mark the qual as required if it's for first index col */
802 		if (cur->sk_attno == 1)
803 			_bt_mark_scankey_required(outkeys);
804 		return;
805 	}
806 
807 	/*
808 	 * Otherwise, do the full set of pushups.
809 	 */
810 	new_numberOfKeys = 0;
811 	numberOfEqualCols = 0;
812 
813 	/*
814 	 * Initialize for processing of keys for attr 1.
815 	 *
816 	 * xform[i] points to the currently best scan key of strategy type i+1; it
817 	 * is NULL if we haven't yet found such a key for this attr.
818 	 */
819 	attno = 1;
820 	memset(xform, 0, sizeof(xform));
821 
822 	/*
823 	 * Loop iterates from 0 to numberOfKeys inclusive; we use the last pass to
824 	 * handle after-last-key processing.  Actual exit from the loop is at the
825 	 * "break" statement below.
826 	 */
827 	for (i = 0;; cur++, i++)
828 	{
829 		if (i < numberOfKeys)
830 		{
831 			/* Apply indoption to scankey (might change sk_strategy!) */
832 			if (!_bt_fix_scankey_strategy(cur, indoption))
833 			{
834 				/* NULL can't be matched, so give up */
835 				so->qual_ok = false;
836 				return;
837 			}
838 		}
839 
840 		/*
841 		 * If we are at the end of the keys for a particular attr, finish up
842 		 * processing and emit the cleaned-up keys.
843 		 */
844 		if (i == numberOfKeys || cur->sk_attno != attno)
845 		{
846 			int			priorNumberOfEqualCols = numberOfEqualCols;
847 
848 			/* check input keys are correctly ordered */
849 			if (i < numberOfKeys && cur->sk_attno < attno)
850 				elog(ERROR, "btree index keys must be ordered by attribute");
851 
852 			/*
853 			 * If = has been specified, all other keys can be eliminated as
854 			 * redundant.  If we have a case like key = 1 AND key > 2, we can
855 			 * set qual_ok to false and abandon further processing.
856 			 *
857 			 * We also have to deal with the case of "key IS NULL", which is
858 			 * unsatisfiable in combination with any other index condition. By
859 			 * the time we get here, that's been classified as an equality
860 			 * check, and we've rejected any combination of it with a regular
861 			 * equality condition; but not with other types of conditions.
862 			 */
863 			if (xform[BTEqualStrategyNumber - 1])
864 			{
865 				ScanKey		eq = xform[BTEqualStrategyNumber - 1];
866 
867 				for (j = BTMaxStrategyNumber; --j >= 0;)
868 				{
869 					ScanKey		chk = xform[j];
870 
871 					if (!chk || j == (BTEqualStrategyNumber - 1))
872 						continue;
873 
874 					if (eq->sk_flags & SK_SEARCHNULL)
875 					{
876 						/* IS NULL is contradictory to anything else */
877 						so->qual_ok = false;
878 						return;
879 					}
880 
881 					if (_bt_compare_scankey_args(scan, chk, eq, chk,
882 												 &test_result))
883 					{
884 						if (!test_result)
885 						{
886 							/* keys proven mutually contradictory */
887 							so->qual_ok = false;
888 							return;
889 						}
890 						/* else discard the redundant non-equality key */
891 						xform[j] = NULL;
892 					}
893 					/* else, cannot determine redundancy, keep both keys */
894 				}
895 				/* track number of attrs for which we have "=" keys */
896 				numberOfEqualCols++;
897 			}
898 
899 			/* try to keep only one of <, <= */
900 			if (xform[BTLessStrategyNumber - 1]
901 				&& xform[BTLessEqualStrategyNumber - 1])
902 			{
903 				ScanKey		lt = xform[BTLessStrategyNumber - 1];
904 				ScanKey		le = xform[BTLessEqualStrategyNumber - 1];
905 
906 				if (_bt_compare_scankey_args(scan, le, lt, le,
907 											 &test_result))
908 				{
909 					if (test_result)
910 						xform[BTLessEqualStrategyNumber - 1] = NULL;
911 					else
912 						xform[BTLessStrategyNumber - 1] = NULL;
913 				}
914 			}
915 
916 			/* try to keep only one of >, >= */
917 			if (xform[BTGreaterStrategyNumber - 1]
918 				&& xform[BTGreaterEqualStrategyNumber - 1])
919 			{
920 				ScanKey		gt = xform[BTGreaterStrategyNumber - 1];
921 				ScanKey		ge = xform[BTGreaterEqualStrategyNumber - 1];
922 
923 				if (_bt_compare_scankey_args(scan, ge, gt, ge,
924 											 &test_result))
925 				{
926 					if (test_result)
927 						xform[BTGreaterEqualStrategyNumber - 1] = NULL;
928 					else
929 						xform[BTGreaterStrategyNumber - 1] = NULL;
930 				}
931 			}
932 
933 			/*
934 			 * Emit the cleaned-up keys into the outkeys[] array, and then
935 			 * mark them if they are required.  They are required (possibly
936 			 * only in one direction) if all attrs before this one had "=".
937 			 */
938 			for (j = BTMaxStrategyNumber; --j >= 0;)
939 			{
940 				if (xform[j])
941 				{
942 					ScanKey		outkey = &outkeys[new_numberOfKeys++];
943 
944 					memcpy(outkey, xform[j], sizeof(ScanKeyData));
945 					if (priorNumberOfEqualCols == attno - 1)
946 						_bt_mark_scankey_required(outkey);
947 				}
948 			}
949 
950 			/*
951 			 * Exit loop here if done.
952 			 */
953 			if (i == numberOfKeys)
954 				break;
955 
956 			/* Re-initialize for new attno */
957 			attno = cur->sk_attno;
958 			memset(xform, 0, sizeof(xform));
959 		}
960 
961 		/* check strategy this key's operator corresponds to */
962 		j = cur->sk_strategy - 1;
963 
964 		/* if row comparison, push it directly to the output array */
965 		if (cur->sk_flags & SK_ROW_HEADER)
966 		{
967 			ScanKey		outkey = &outkeys[new_numberOfKeys++];
968 
969 			memcpy(outkey, cur, sizeof(ScanKeyData));
970 			if (numberOfEqualCols == attno - 1)
971 				_bt_mark_scankey_required(outkey);
972 
973 			/*
974 			 * We don't support RowCompare using equality; such a qual would
975 			 * mess up the numberOfEqualCols tracking.
976 			 */
977 			Assert(j != (BTEqualStrategyNumber - 1));
978 			continue;
979 		}
980 
981 		/* have we seen one of these before? */
982 		if (xform[j] == NULL)
983 		{
984 			/* nope, so remember this scankey */
985 			xform[j] = cur;
986 		}
987 		else
988 		{
989 			/* yup, keep only the more restrictive key */
990 			if (_bt_compare_scankey_args(scan, cur, cur, xform[j],
991 										 &test_result))
992 			{
993 				if (test_result)
994 					xform[j] = cur;
995 				else if (j == (BTEqualStrategyNumber - 1))
996 				{
997 					/* key == a && key == b, but a != b */
998 					so->qual_ok = false;
999 					return;
1000 				}
1001 				/* else old key is more restrictive, keep it */
1002 			}
1003 			else
1004 			{
1005 				/*
1006 				 * We can't determine which key is more restrictive.  Keep the
1007 				 * previous one in xform[j] and push this one directly to the
1008 				 * output array.
1009 				 */
1010 				ScanKey		outkey = &outkeys[new_numberOfKeys++];
1011 
1012 				memcpy(outkey, cur, sizeof(ScanKeyData));
1013 				if (numberOfEqualCols == attno - 1)
1014 					_bt_mark_scankey_required(outkey);
1015 			}
1016 		}
1017 	}
1018 
1019 	so->numberOfKeys = new_numberOfKeys;
1020 }
1021 
1022 /*
1023  * Compare two scankey values using a specified operator.
1024  *
1025  * The test we want to perform is logically "leftarg op rightarg", where
1026  * leftarg and rightarg are the sk_argument values in those ScanKeys, and
1027  * the comparison operator is the one in the op ScanKey.  However, in
1028  * cross-data-type situations we may need to look up the correct operator in
1029  * the index's opfamily: it is the one having amopstrategy = op->sk_strategy
1030  * and amoplefttype/amoprighttype equal to the two argument datatypes.
1031  *
1032  * If the opfamily doesn't supply a complete set of cross-type operators we
1033  * may not be able to make the comparison.  If we can make the comparison
1034  * we store the operator result in *result and return true.  We return false
1035  * if the comparison could not be made.
1036  *
1037  * Note: op always points at the same ScanKey as either leftarg or rightarg.
1038  * Since we don't scribble on the scankeys, this aliasing should cause no
1039  * trouble.
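 *
 * For example, _bt_preprocess_keys calls this as
 * _bt_compare_scankey_args(scan, le, lt, le, &test_result) to evaluate
 * "lt's argument <= le's argument"; if that holds, the < key is at least
 * as tight and the <= key is discarded as redundant.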
1040  *
1041  * Note: this routine needs to be insensitive to any DESC option applied
1042  * to the index column.  For example, "x < 4" is a tighter constraint than
1043  * "x < 5" regardless of which way the index is sorted.
1044  */
1045 static bool
1046 _bt_compare_scankey_args(IndexScanDesc scan, ScanKey op,
1047 						 ScanKey leftarg, ScanKey rightarg,
1048 						 bool *result)
1049 {
1050 	Relation	rel = scan->indexRelation;
1051 	Oid			lefttype,
1052 				righttype,
1053 				optype,
1054 				opcintype,
1055 				cmp_op;
1056 	StrategyNumber strat;
1057 
1058 	/*
1059 	 * First, deal with cases where one or both args are NULL.  This should
1060 	 * only happen when the scankeys represent IS NULL/NOT NULL conditions.
1061 	 */
1062 	if ((leftarg->sk_flags | rightarg->sk_flags) & SK_ISNULL)
1063 	{
1064 		bool		leftnull,
1065 					rightnull;
1066 
1067 		if (leftarg->sk_flags & SK_ISNULL)
1068 		{
1069 			Assert(leftarg->sk_flags & (SK_SEARCHNULL | SK_SEARCHNOTNULL));
1070 			leftnull = true;
1071 		}
1072 		else
1073 			leftnull = false;
1074 		if (rightarg->sk_flags & SK_ISNULL)
1075 		{
1076 			Assert(rightarg->sk_flags & (SK_SEARCHNULL | SK_SEARCHNOTNULL));
1077 			rightnull = true;
1078 		}
1079 		else
1080 			rightnull = false;
1081 
1082 		/*
1083 		 * We treat NULL as either greater than or less than all other values.
1084 		 * Since true > false, the tests below work correctly for NULLS LAST
1085 		 * logic.  If the index is NULLS FIRST, we need to flip the strategy.
1086 		 */
1087 		strat = op->sk_strategy;
1088 		if (op->sk_flags & SK_BT_NULLS_FIRST)
1089 			strat = BTCommuteStrategyNumber(strat);
1090 
1091 		switch (strat)
1092 		{
1093 			case BTLessStrategyNumber:
1094 				*result = (leftnull < rightnull);
1095 				break;
1096 			case BTLessEqualStrategyNumber:
1097 				*result = (leftnull <= rightnull);
1098 				break;
1099 			case BTEqualStrategyNumber:
1100 				*result = (leftnull == rightnull);
1101 				break;
1102 			case BTGreaterEqualStrategyNumber:
1103 				*result = (leftnull >= rightnull);
1104 				break;
1105 			case BTGreaterStrategyNumber:
1106 				*result = (leftnull > rightnull);
1107 				break;
1108 			default:
1109 				elog(ERROR, "unrecognized StrategyNumber: %d", (int) strat);
1110 				*result = false;	/* keep compiler quiet */
1111 				break;
1112 		}
1113 		return true;
1114 	}
1115 
1116 	/*
1117 	 * The opfamily we need to worry about is identified by the index column.
1118 	 */
1119 	Assert(leftarg->sk_attno == rightarg->sk_attno);
1120 
1121 	opcintype = rel->rd_opcintype[leftarg->sk_attno - 1];
1122 
1123 	/*
1124 	 * Determine the actual datatypes of the ScanKey arguments.  We have to
1125 	 * support the convention that sk_subtype == InvalidOid means the opclass
1126 	 * input type; this is a hack to simplify life for ScanKeyInit().
1127 	 */
1128 	lefttype = leftarg->sk_subtype;
1129 	if (lefttype == InvalidOid)
1130 		lefttype = opcintype;
1131 	righttype = rightarg->sk_subtype;
1132 	if (righttype == InvalidOid)
1133 		righttype = opcintype;
1134 	optype = op->sk_subtype;
1135 	if (optype == InvalidOid)
1136 		optype = opcintype;
1137 
1138 	/*
1139 	 * If leftarg and rightarg match the types expected for the "op" scankey,
1140 	 * we can use its already-looked-up comparison function.
1141 	 */
1142 	if (lefttype == opcintype && righttype == optype)
1143 	{
1144 		*result = DatumGetBool(FunctionCall2Coll(&op->sk_func,
1145 												 op->sk_collation,
1146 												 leftarg->sk_argument,
1147 												 rightarg->sk_argument));
1148 		return true;
1149 	}
1150 
1151 	/*
1152 	 * Otherwise, we need to go to the syscache to find the appropriate
1153 	 * operator.  (This cannot result in infinite recursion, since no
1154 	 * indexscan initiated by syscache lookup will use cross-data-type
1155 	 * operators.)
1156 	 *
1157 	 * If the sk_strategy was flipped by _bt_fix_scankey_strategy, we have to
1158 	 * un-flip it to get the correct opfamily member.
1159 	 */
1160 	strat = op->sk_strategy;
1161 	if (op->sk_flags & SK_BT_DESC)
1162 		strat = BTCommuteStrategyNumber(strat);
1163 
1164 	cmp_op = get_opfamily_member(rel->rd_opfamily[leftarg->sk_attno - 1],
1165 								 lefttype,
1166 								 righttype,
1167 								 strat);
1168 	if (OidIsValid(cmp_op))
1169 	{
1170 		RegProcedure cmp_proc = get_opcode(cmp_op);
1171 
1172 		if (RegProcedureIsValid(cmp_proc))
1173 		{
1174 			*result = DatumGetBool(OidFunctionCall2Coll(cmp_proc,
1175 														op->sk_collation,
1176 														leftarg->sk_argument,
1177 														rightarg->sk_argument));
1178 			return true;
1179 		}
1180 	}
1181 
1182 	/* Can't make the comparison */
1183 	*result = false;			/* suppress compiler warnings */
1184 	return false;
1185 }
1186 
1187 /*
1188  * Adjust a scankey's strategy and flags setting as needed for indoptions.
1189  *
1190  * We copy the appropriate indoption value into the scankey sk_flags
1191  * (shifting to avoid clobbering system-defined flag bits).  Also, if
1192  * the DESC option is set, commute (flip) the operator strategy number.
1193  *
1194  * A secondary purpose is to check for IS NULL/NOT NULL scankeys and set up
1195  * the strategy field correctly for them.
1196  *
1197  * Lastly, for ordinary scankeys (not IS NULL/NOT NULL), we check for a
1198  * NULL comparison value.  Since all btree operators are assumed strict,
1199  * a NULL means that the qual cannot be satisfied.  We return true if the
1200  * comparison value isn't NULL, or false if the scan should be abandoned.
1201  *
1202  * This function is applied to the *input* scankey structure; therefore
1203  * on a rescan we will be looking at already-processed scankeys.  Hence
1204  * we have to be careful not to re-commute the strategy if we already did it.
1205  * It's a bit ugly to modify the caller's copy of the scankey but in practice
1206  * there shouldn't be any problem, since the index's indoptions are certainly
1207  * not going to change while the scankey survives.
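 *
 * For example, on a DESC index column the qual "x < 5" has its strategy
 * commuted to BTGreaterStrategyNumber, since the column is stored in
 * descending order; the SK_BT_DESC flag records that this was done.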
1208  */
1209 static bool
1210 _bt_fix_scankey_strategy(ScanKey skey, int16 *indoption)
1211 {
1212 	int			addflags;
1213 
1214 	addflags = indoption[skey->sk_attno - 1] << SK_BT_INDOPTION_SHIFT;
1215 
1216 	/*
1217 	 * We treat all btree operators as strict (even if they're not so marked
1218 	 * in pg_proc). This means that it is impossible for an operator condition
1219 	 * with a NULL comparison constant to succeed, and we can reject it right
1220 	 * away.
1221 	 *
1222 	 * However, we now also support "x IS NULL" clauses as search conditions,
1223 	 * so in that case keep going. The planner has not filled in any
1224 	 * particular strategy in this case, so set it to BTEqualStrategyNumber
1225 	 * --- we can treat IS NULL as an equality operator for purposes of search
1226 	 * strategy.
1227 	 *
1228 	 * Likewise, "x IS NOT NULL" is supported.  We treat that as either "less
1229 	 * than NULL" in a NULLS LAST index, or "greater than NULL" in a NULLS
1230 	 * FIRST index.
1231 	 *
1232 	 * Note: someday we might have to fill in sk_collation from the index
1233 	 * column's collation.  At the moment this is a non-issue because we'll
1234 	 * never actually call the comparison operator on a NULL.
1235 	 */
1236 	if (skey->sk_flags & SK_ISNULL)
1237 	{
1238 		/* SK_ISNULL shouldn't be set in a row header scankey */
1239 		Assert(!(skey->sk_flags & SK_ROW_HEADER));
1240 
1241 		/* Set indoption flags in scankey (might be done already) */
1242 		skey->sk_flags |= addflags;
1243 
1244 		/* Set correct strategy for IS NULL or NOT NULL search */
1245 		if (skey->sk_flags & SK_SEARCHNULL)
1246 		{
1247 			skey->sk_strategy = BTEqualStrategyNumber;
1248 			skey->sk_subtype = InvalidOid;
1249 			skey->sk_collation = InvalidOid;
1250 		}
1251 		else if (skey->sk_flags & SK_SEARCHNOTNULL)
1252 		{
1253 			if (skey->sk_flags & SK_BT_NULLS_FIRST)
1254 				skey->sk_strategy = BTGreaterStrategyNumber;
1255 			else
1256 				skey->sk_strategy = BTLessStrategyNumber;
1257 			skey->sk_subtype = InvalidOid;
1258 			skey->sk_collation = InvalidOid;
1259 		}
1260 		else
1261 		{
1262 			/* regular qual, so it cannot be satisfied */
1263 			return false;
1264 		}
1265 
1266 		/* Needn't do the rest */
1267 		return true;
1268 	}
1269 
1270 	/* Adjust strategy for DESC, if we didn't already */
1271 	if ((addflags & SK_BT_DESC) && !(skey->sk_flags & SK_BT_DESC))
1272 		skey->sk_strategy = BTCommuteStrategyNumber(skey->sk_strategy);
1273 	skey->sk_flags |= addflags;
1274 
1275 	/* If it's a row header, fix row member flags and strategies similarly */
1276 	if (skey->sk_flags & SK_ROW_HEADER)
1277 	{
1278 		ScanKey		subkey = (ScanKey) DatumGetPointer(skey->sk_argument);
1279 
1280 		for (;;)
1281 		{
1282 			Assert(subkey->sk_flags & SK_ROW_MEMBER);
1283 			addflags = indoption[subkey->sk_attno - 1] << SK_BT_INDOPTION_SHIFT;
1284 			if ((addflags & SK_BT_DESC) && !(subkey->sk_flags & SK_BT_DESC))
1285 				subkey->sk_strategy = BTCommuteStrategyNumber(subkey->sk_strategy);
1286 			subkey->sk_flags |= addflags;
1287 			if (subkey->sk_flags & SK_ROW_END)
1288 				break;
1289 			subkey++;
1290 		}
1291 	}
1292 
1293 	return true;
1294 }
1295 
1296 /*
1297  * Mark a scankey as "required to continue the scan".
1298  *
1299  * Depending on the operator type, the key may be required for both scan
1300  * directions or just one.  Also, if the key is a row comparison header,
1301  * we have to mark its first subsidiary ScanKey as required.  (Subsequent
1302  * subsidiary ScanKeys are normally for lower-order columns, and thus
1303  * cannot be required, since they're after the first non-equality scankey.)
1304  *
1305  * Note: when we set required-key flag bits in a subsidiary scankey, we are
1306  * scribbling on a data structure belonging to the index AM's caller, not on
1307  * our private copy.  This should be OK because the marking will not change
1308  * from scan to scan within a query, and so we'd just re-mark the same way
1309  * anyway on a rescan.  Something to keep an eye on though.
1310  */
1311 static void
1312 _bt_mark_scankey_required(ScanKey skey)
1313 {
1314 	int			addflags;
1315 
1316 	switch (skey->sk_strategy)
1317 	{
1318 		case BTLessStrategyNumber:
1319 		case BTLessEqualStrategyNumber:
1320 			addflags = SK_BT_REQFWD;
1321 			break;
1322 		case BTEqualStrategyNumber:
1323 			addflags = SK_BT_REQFWD | SK_BT_REQBKWD;
1324 			break;
1325 		case BTGreaterEqualStrategyNumber:
1326 		case BTGreaterStrategyNumber:
1327 			addflags = SK_BT_REQBKWD;
1328 			break;
1329 		default:
1330 			elog(ERROR, "unrecognized StrategyNumber: %d",
1331 				 (int) skey->sk_strategy);
1332 			addflags = 0;		/* keep compiler quiet */
1333 			break;
1334 	}
1335 
1336 	skey->sk_flags |= addflags;
1337 
1338 	if (skey->sk_flags & SK_ROW_HEADER)
1339 	{
1340 		ScanKey		subkey = (ScanKey) DatumGetPointer(skey->sk_argument);
1341 
1342 		/* First subkey should be same column/operator as the header */
1343 		Assert(subkey->sk_flags & SK_ROW_MEMBER);
1344 		Assert(subkey->sk_attno == skey->sk_attno);
1345 		Assert(subkey->sk_strategy == skey->sk_strategy);
1346 		subkey->sk_flags |= addflags;
1347 	}
1348 }
1349 
1350 /*
1351  * Test whether an indextuple satisfies all the scankey conditions.
1352  *
1353  * If so, return the address of the index tuple on the index page.
1354  * If not, return NULL.
1355  *
1356  * If the tuple fails to pass the qual, we also determine whether there's
1357  * any need to continue the scan beyond this tuple, and set *continuescan
1358  * accordingly.  See comments for _bt_preprocess_keys(), above, about how
1359  * this is done.
1360  *
1361  * scan: index scan descriptor (containing a search-type scankey)
1362  * page: buffer page containing index tuple
1363  * offnum: offset number of index tuple (must be a valid item!)
1364  * dir: direction we are scanning in
1365  * continuescan: output parameter (will be set correctly in all cases)
1366  *
1367  * Caller must hold pin and lock on the index page.
1368  */
1369 IndexTuple
1370 _bt_checkkeys(IndexScanDesc scan,
1371 			  Page page, OffsetNumber offnum,
1372 			  ScanDirection dir, bool *continuescan)
1373 {
1374 	ItemId		iid = PageGetItemId(page, offnum);
1375 	bool		tuple_alive;
1376 	IndexTuple	tuple;
1377 	TupleDesc	tupdesc;
1378 	BTScanOpaque so;
1379 	int			keysz;
1380 	int			ikey;
1381 	ScanKey		key;
1382 
1383 	*continuescan = true;		/* default assumption */
1384 
1385 	/*
1386 	 * If the scan specifies not to return killed tuples, then we treat a
1387 	 * killed tuple as not passing the qual.  Most of the time, it's a win to
1388 	 * not bother examining the tuple's index keys, but just return
1389 	 * immediately with continuescan = true to proceed to the next tuple.
1390 	 * However, if this is the last tuple on the page, we should check the
1391 	 * index keys to prevent uselessly advancing to the next page.
1392 	 */
1393 	if (scan->ignore_killed_tuples && ItemIdIsDead(iid))
1394 	{
1395 		/* return immediately if there are more tuples on the page */
1396 		if (ScanDirectionIsForward(dir))
1397 		{
1398 			if (offnum < PageGetMaxOffsetNumber(page))
1399 				return NULL;
1400 		}
1401 		else
1402 		{
1403 			BTPageOpaque opaque = (BTPageOpaque) PageGetSpecialPointer(page);
1404 
1405 			if (offnum > P_FIRSTDATAKEY(opaque))
1406 				return NULL;
1407 		}
1408 
1409 		/*
1410 		 * OK, we want to check the keys so we can set continuescan correctly,
1411 		 * but we'll return NULL even if the tuple passes the key tests.
1412 		 */
1413 		tuple_alive = false;
1414 	}
1415 	else
1416 		tuple_alive = true;
1417 
1418 	tuple = (IndexTuple) PageGetItem(page, iid);
1419 
1420 	tupdesc = RelationGetDescr(scan->indexRelation);
1421 	so = (BTScanOpaque) scan->opaque;
1422 	keysz = so->numberOfKeys;
1423 
1424 	for (key = so->keyData, ikey = 0; ikey < keysz; key++, ikey++)
1425 	{
1426 		Datum		datum;
1427 		bool		isNull;
1428 		Datum		test;
1429 
1430 		Assert(key->sk_attno <= BTreeTupleGetNAtts(tuple, scan->indexRelation));
1431 		/* row-comparison keys need special processing */
1432 		if (key->sk_flags & SK_ROW_HEADER)
1433 		{
1434 			if (_bt_check_rowcompare(key, tuple, tupdesc, dir, continuescan))
1435 				continue;
1436 			return NULL;
1437 		}
1438 
1439 		datum = index_getattr(tuple,
1440 							  key->sk_attno,
1441 							  tupdesc,
1442 							  &isNull);
1443 
1444 		if (key->sk_flags & SK_ISNULL)
1445 		{
1446 			/* Handle IS NULL/NOT NULL tests */
1447 			if (key->sk_flags & SK_SEARCHNULL)
1448 			{
1449 				if (isNull)
1450 					continue;	/* tuple satisfies this qual */
1451 			}
1452 			else
1453 			{
1454 				Assert(key->sk_flags & SK_SEARCHNOTNULL);
1455 				if (!isNull)
1456 					continue;	/* tuple satisfies this qual */
1457 			}
1458 
1459 			/*
1460 			 * Tuple fails this qual.  If it's a required qual for the current
1461 			 * scan direction, then we can conclude no further tuples will
1462 			 * pass, either.
1463 			 */
1464 			if ((key->sk_flags & SK_BT_REQFWD) &&
1465 				ScanDirectionIsForward(dir))
1466 				*continuescan = false;
1467 			else if ((key->sk_flags & SK_BT_REQBKWD) &&
1468 					 ScanDirectionIsBackward(dir))
1469 				*continuescan = false;
1470 
1471 			/*
1472 			 * In any case, this indextuple doesn't match the qual.
1473 			 */
1474 			return NULL;
1475 		}
1476 
1477 		if (isNull)
1478 		{
1479 			if (key->sk_flags & SK_BT_NULLS_FIRST)
1480 			{
1481 				/*
1482 				 * Since NULLs are sorted before non-NULLs, we know we have
1483 				 * reached the lower limit of the range of values for this
1484 				 * index attr.  On a backward scan, we can stop if this qual
1485 				 * is one of the "must match" subset.  We can stop regardless
1486 				 * of whether the qual is > or <, so long as it's required,
1487 				 * because it's not possible for any future tuples to pass. On
1488 				 * a forward scan, however, we must keep going, because we may
1489 				 * have initially positioned to the start of the index.
1490 				 */
1491 				if ((key->sk_flags & (SK_BT_REQFWD | SK_BT_REQBKWD)) &&
1492 					ScanDirectionIsBackward(dir))
1493 					*continuescan = false;
1494 			}
1495 			else
1496 			{
1497 				/*
1498 				 * Since NULLs are sorted after non-NULLs, we know we have
1499 				 * reached the upper limit of the range of values for this
1500 				 * index attr.  On a forward scan, we can stop if this qual is
1501 				 * one of the "must match" subset.  We can stop regardless of
1502 				 * whether the qual is > or <, so long as it's required,
1503 				 * because it's not possible for any future tuples to pass. On
1504 				 * a backward scan, however, we must keep going, because we
1505 				 * may have initially positioned to the end of the index.
1506 				 */
1507 				if ((key->sk_flags & (SK_BT_REQFWD | SK_BT_REQBKWD)) &&
1508 					ScanDirectionIsForward(dir))
1509 					*continuescan = false;
1510 			}
1511 
1512 			/*
1513 			 * In any case, this indextuple doesn't match the qual.
1514 			 */
1515 			return NULL;
1516 		}
1517 
1518 		test = FunctionCall2Coll(&key->sk_func, key->sk_collation,
1519 								 datum, key->sk_argument);
1520 
1521 		if (!DatumGetBool(test))
1522 		{
1523 			/*
1524 			 * Tuple fails this qual.  If it's a required qual for the current
1525 			 * scan direction, then we can conclude no further tuples will
1526 			 * pass, either.
1527 			 *
1528 			 * Note: because we stop the scan as soon as any required equality
1529 			 * qual fails, it is critical that equality quals be used for the
1530 			 * initial positioning in _bt_first() when they are available. See
1531 			 * comments in _bt_first().
1532 			 */
1533 			if ((key->sk_flags & SK_BT_REQFWD) &&
1534 				ScanDirectionIsForward(dir))
1535 				*continuescan = false;
1536 			else if ((key->sk_flags & SK_BT_REQBKWD) &&
1537 					 ScanDirectionIsBackward(dir))
1538 				*continuescan = false;
1539 
1540 			/*
1541 			 * In any case, this indextuple doesn't match the qual.
1542 			 */
1543 			return NULL;
1544 		}
1545 	}
1546 
1547 	/* Check for failure due to it being a killed tuple. */
1548 	if (!tuple_alive)
1549 		return NULL;
1550 
1551 	/* If we get here, the tuple passes all index quals. */
1552 	return tuple;
1553 }
1554 
1555 /*
1556  * Test whether an indextuple satisfies a row-comparison scan condition.
1557  *
1558  * Return true if so, false if not.  If not, also clear *continuescan if
1559  * it's not possible for any future tuples in the current scan direction
1560  * to pass the qual.
1561  *
1562  * This is a subroutine for _bt_checkkeys, which see for more info.
1563  */
1564 static bool
1565 _bt_check_rowcompare(ScanKey skey, IndexTuple tuple, TupleDesc tupdesc,
1566 					 ScanDirection dir, bool *continuescan)
1567 {
1568 	ScanKey		subkey = (ScanKey) DatumGetPointer(skey->sk_argument);
1569 	int32		cmpresult = 0;
1570 	bool		result;
1571 
1572 	/* First subkey should be same as the header says */
1573 	Assert(subkey->sk_attno == skey->sk_attno);
1574 
1575 	/* Loop over columns of the row condition */
1576 	for (;;)
1577 	{
1578 		Datum		datum;
1579 		bool		isNull;
1580 
1581 		Assert(subkey->sk_flags & SK_ROW_MEMBER);
1582 
1583 		datum = index_getattr(tuple,
1584 							  subkey->sk_attno,
1585 							  tupdesc,
1586 							  &isNull);
1587 
1588 		if (isNull)
1589 		{
1590 			if (subkey->sk_flags & SK_BT_NULLS_FIRST)
1591 			{
1592 				/*
1593 				 * Since NULLs are sorted before non-NULLs, we know we have
1594 				 * reached the lower limit of the range of values for this
1595 				 * index attr.  On a backward scan, we can stop if this qual
1596 				 * is one of the "must match" subset.  We can stop regardless
1597 				 * of whether the qual is > or <, so long as it's required,
1598 				 * because it's not possible for any future tuples to pass. On
1599 				 * a forward scan, however, we must keep going, because we may
1600 				 * have initially positioned to the start of the index.
1601 				 */
1602 				if ((subkey->sk_flags & (SK_BT_REQFWD | SK_BT_REQBKWD)) &&
1603 					ScanDirectionIsBackward(dir))
1604 					*continuescan = false;
1605 			}
1606 			else
1607 			{
1608 				/*
1609 				 * Since NULLs are sorted after non-NULLs, we know we have
1610 				 * reached the upper limit of the range of values for this
1611 				 * index attr.  On a forward scan, we can stop if this qual is
1612 				 * one of the "must match" subset.  We can stop regardless of
1613 				 * whether the qual is > or <, so long as it's required,
1614 				 * because it's not possible for any future tuples to pass. On
1615 				 * a backward scan, however, we must keep going, because we
1616 				 * may have initially positioned to the end of the index.
1617 				 */
1618 				if ((subkey->sk_flags & (SK_BT_REQFWD | SK_BT_REQBKWD)) &&
1619 					ScanDirectionIsForward(dir))
1620 					*continuescan = false;
1621 			}
1622 
1623 			/*
1624 			 * In any case, this indextuple doesn't match the qual.
1625 			 */
1626 			return false;
1627 		}
1628 
1629 		if (subkey->sk_flags & SK_ISNULL)
1630 		{
1631 			/*
1632 			 * Unlike the simple-scankey case, this isn't a disallowed case.
1633 			 * But it can never match.  If all the earlier row comparison
1634 			 * columns are required for the scan direction, we can stop the
1635 			 * scan, because there can't be another tuple that will succeed.
1636 			 */
1637 			if (subkey != (ScanKey) DatumGetPointer(skey->sk_argument))
1638 				subkey--;
1639 			if ((subkey->sk_flags & SK_BT_REQFWD) &&
1640 				ScanDirectionIsForward(dir))
1641 				*continuescan = false;
1642 			else if ((subkey->sk_flags & SK_BT_REQBKWD) &&
1643 					 ScanDirectionIsBackward(dir))
1644 				*continuescan = false;
1645 			return false;
1646 		}
1647 
1648 		/* Perform the test --- three-way comparison not bool operator */
1649 		cmpresult = DatumGetInt32(FunctionCall2Coll(&subkey->sk_func,
1650 													subkey->sk_collation,
1651 													datum,
1652 													subkey->sk_argument));
1653 
1654 		if (subkey->sk_flags & SK_BT_DESC)
1655 			INVERT_COMPARE_RESULT(cmpresult);
1656 
1657 		/* Done comparing if unequal, else advance to next column */
1658 		if (cmpresult != 0)
1659 			break;
1660 
1661 		if (subkey->sk_flags & SK_ROW_END)
1662 			break;
1663 		subkey++;
1664 	}
1665 
1666 	/*
1667 	 * At this point cmpresult indicates the overall result of the row
1668 	 * comparison, and subkey points to the deciding column (or the last
1669 	 * column if the result is "=").
1670 	 */
1671 	switch (subkey->sk_strategy)
1672 	{
1673 			/* EQ and NE cases aren't allowed here */
1674 		case BTLessStrategyNumber:
1675 			result = (cmpresult < 0);
1676 			break;
1677 		case BTLessEqualStrategyNumber:
1678 			result = (cmpresult <= 0);
1679 			break;
1680 		case BTGreaterEqualStrategyNumber:
1681 			result = (cmpresult >= 0);
1682 			break;
1683 		case BTGreaterStrategyNumber:
1684 			result = (cmpresult > 0);
1685 			break;
1686 		default:
1687 			elog(ERROR, "unrecognized RowCompareType: %d",
1688 				 (int) subkey->sk_strategy);
			result = false;		/* keep compiler quiet */
1690 			break;
1691 	}
1692 
1693 	if (!result)
1694 	{
1695 		/*
1696 		 * Tuple fails this qual.  If it's a required qual for the current
1697 		 * scan direction, then we can conclude no further tuples will pass,
1698 		 * either.  Note we have to look at the deciding column, not
1699 		 * necessarily the first or last column of the row condition.
1700 		 */
1701 		if ((subkey->sk_flags & SK_BT_REQFWD) &&
1702 			ScanDirectionIsForward(dir))
1703 			*continuescan = false;
1704 		else if ((subkey->sk_flags & SK_BT_REQBKWD) &&
1705 				 ScanDirectionIsBackward(dir))
1706 			*continuescan = false;
1707 	}
1708 
1709 	return result;
1710 }
1711 
1712 /*
1713  * _bt_killitems - set LP_DEAD state for items an indexscan caller has
1714  * told us were killed
1715  *
1716  * scan->opaque, referenced locally through so, contains information about the
1717  * current page and killed tuples thereon (generally, this should only be
1718  * called if so->numKilled > 0).
1719  *
1720  * The caller does not have a lock on the page and may or may not have the
1721  * page pinned in a buffer.  Note that read-lock is sufficient for setting
1722  * LP_DEAD status (which is only a hint).
1723  *
1724  * We match items by heap TID before assuming they are the right ones to
1725  * delete.  We cope with cases where items have moved right due to insertions.
1726  * If an item has moved off the current page due to a split, we'll fail to
1727  * find it and do nothing (this is not an error case --- we assume the item
1728  * will eventually get marked in a future indexscan).
1729  *
1730  * Note that if we hold a pin on the target page continuously from initially
1731  * reading the items until applying this function, VACUUM cannot have deleted
1732  * any items from the page, and so there is no need to search left from the
1733  * recorded offset.  (This observation also guarantees that the item is still
1734  * the right one to delete, which might otherwise be questionable since heap
1735  * TIDs can get recycled.)	This holds true even if the page has been modified
1736  * by inserts and page splits, so there is no need to consult the LSN.
1737  *
1738  * If the pin was released after reading the page, then we re-read it.  If it
1739  * has been modified since we read it (as determined by the LSN), we dare not
1740  * flag any entries because it is possible that the old entry was vacuumed
1741  * away and the TID was re-used by a completely different heap tuple.
1742  */
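/*
 * As a rough sketch (not code copied from the caller), the scan code
 * accumulates the entries we process here along these lines:
 *
 *		if (scan->kill_prior_tuple && so->numKilled < MaxIndexTuplesPerPage)
 *			so->killedItems[so->numKilled++] = so->currPos.itemIndex;
 *
 * so each killedItems[] entry is an index into so->currPos.items[], not a
 * page offset; we match the recorded heap TIDs against the page below.
 */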
1743 void
1744 _bt_killitems(IndexScanDesc scan)
1745 {
1746 	BTScanOpaque so = (BTScanOpaque) scan->opaque;
1747 	Page		page;
1748 	BTPageOpaque opaque;
1749 	OffsetNumber minoff;
1750 	OffsetNumber maxoff;
1751 	int			i;
1752 	int			numKilled = so->numKilled;
1753 	bool		killedsomething = false;
1754 
1755 	Assert(BTScanPosIsValid(so->currPos));
1756 
1757 	/*
	 * Always reset the scan state, so we don't look for the same items on
	 * other pages.
1760 	 */
1761 	so->numKilled = 0;
1762 
1763 	if (BTScanPosIsPinned(so->currPos))
1764 	{
1765 		/*
1766 		 * We have held the pin on this page since we read the index tuples,
1767 		 * so all we need to do is lock it.  The pin will have prevented
1768 		 * re-use of any TID on the page, so there is no need to check the
1769 		 * LSN.
1770 		 */
1771 		LockBuffer(so->currPos.buf, BT_READ);
1772 
1773 		page = BufferGetPage(so->currPos.buf);
1774 	}
1775 	else
1776 	{
1777 		Buffer		buf;
1778 
1779 		/* Attempt to re-read the buffer, getting pin and lock. */
1780 		buf = _bt_getbuf(scan->indexRelation, so->currPos.currPage, BT_READ);
1781 
		/* It might not exist anymore, in which case we can't hint it. */
1783 		if (!BufferIsValid(buf))
1784 			return;
1785 
1786 		page = BufferGetPage(buf);
1787 		if (BufferGetLSNAtomic(buf) == so->currPos.lsn)
1788 			so->currPos.buf = buf;
1789 		else
1790 		{
1791 			/* Modified while not pinned means hinting is not safe. */
1792 			_bt_relbuf(scan->indexRelation, buf);
1793 			return;
1794 		}
1795 	}
1796 
1797 	opaque = (BTPageOpaque) PageGetSpecialPointer(page);
1798 	minoff = P_FIRSTDATAKEY(opaque);
1799 	maxoff = PageGetMaxOffsetNumber(page);
1800 
1801 	for (i = 0; i < numKilled; i++)
1802 	{
1803 		int			itemIndex = so->killedItems[i];
1804 		BTScanPosItem *kitem = &so->currPos.items[itemIndex];
1805 		OffsetNumber offnum = kitem->indexOffset;
1806 
1807 		Assert(itemIndex >= so->currPos.firstItem &&
1808 			   itemIndex <= so->currPos.lastItem);
1809 		if (offnum < minoff)
1810 			continue;			/* pure paranoia */
1811 		while (offnum <= maxoff)
1812 		{
1813 			ItemId		iid = PageGetItemId(page, offnum);
1814 			IndexTuple	ituple = (IndexTuple) PageGetItem(page, iid);
1815 
1816 			if (ItemPointerEquals(&ituple->t_tid, &kitem->heapTid))
1817 			{
1818 				/*
1819 				 * Found the item.  Mark it as dead, if it isn't already.
1820 				 * Since this happens while holding a buffer lock possibly in
1821 				 * shared mode, it's possible that multiple processes attempt
1822 				 * to do this simultaneously, leading to multiple full-page
1823 				 * images being sent to WAL (if wal_log_hints or data checksums
1824 				 * are enabled), which is undesirable.
1825 				 */
1826 				if (!ItemIdIsDead(iid))
1827 				{
1828 					ItemIdMarkDead(iid);
1829 					killedsomething = true;
1830 				}
1831 				break;			/* out of inner search loop */
1832 			}
1833 			offnum = OffsetNumberNext(offnum);
1834 		}
1835 	}
1836 
1837 	/*
	 * Since this can be redone later if needed, mark the buffer dirty only
	 * as a hint.
1839 	 *
1840 	 * Whenever we mark anything LP_DEAD, we also set the page's
1841 	 * BTP_HAS_GARBAGE flag, which is likewise just a hint.
1842 	 */
1843 	if (killedsomething)
1844 	{
1845 		opaque->btpo_flags |= BTP_HAS_GARBAGE;
1846 		MarkBufferDirtyHint(so->currPos.buf, true);
1847 	}
1848 
1849 	LockBuffer(so->currPos.buf, BUFFER_LOCK_UNLOCK);
1850 }
1851 
1852 
1853 /*
1854  * The following routines manage a shared-memory area in which we track
1855  * assignment of "vacuum cycle IDs" to currently-active btree vacuuming
1856  * operations.  There is a single counter which increments each time we
1857  * start a vacuum to assign it a cycle ID.  Since multiple vacuums could
1858  * be active concurrently, we have to track the cycle ID for each active
1859  * vacuum; this requires at most MaxBackends entries (usually far fewer).
1860  * We assume at most one vacuum can be active for a given index.
1861  *
1862  * Access to the shared memory area is controlled by BtreeVacuumLock.
1863  * In principle we could use a separate lmgr locktag for each index,
1864  * but a single LWLock is much cheaper, and given the short time that
1865  * the lock is ever held, the concurrency hit should be minimal.
1866  */
1867 
1868 typedef struct BTOneVacInfo
1869 {
1870 	LockRelId	relid;			/* global identifier of an index */
1871 	BTCycleId	cycleid;		/* cycle ID for its active VACUUM */
1872 } BTOneVacInfo;
1873 
1874 typedef struct BTVacInfo
1875 {
1876 	BTCycleId	cycle_ctr;		/* cycle ID most recently assigned */
1877 	int			num_vacuums;	/* number of currently active VACUUMs */
1878 	int			max_vacuums;	/* allocated length of vacuums[] array */
1879 	BTOneVacInfo vacuums[FLEXIBLE_ARRAY_MEMBER];
1880 } BTVacInfo;
1881 
1882 static BTVacInfo *btvacinfo;
1883 
1884 
1885 /*
1886  * _bt_vacuum_cycleid --- get the active vacuum cycle ID for an index,
1887  *		or zero if there is no active VACUUM
1888  *
1889  * Note: for correct interlocking, the caller must already hold pin and
1890  * exclusive lock on each buffer it will store the cycle ID into.  This
1891  * ensures that even if a VACUUM starts immediately afterwards, it cannot
1892  * process those pages until the page split is complete.
1893  */
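/*
 * Illustrative sketch of the expected caller (the page split code), with
 * lopaque/ropaque standing for the two halves' opaque special space:
 *
 *		lopaque->btpo_cycleid = _bt_vacuum_cycleid(rel);
 *		ropaque->btpo_cycleid = lopaque->btpo_cycleid;
 *
 * Because both halves are still exclusively locked at that point, an active
 * VACUUM can later recognize pages that were split while it was running.
 */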
1894 BTCycleId
1895 _bt_vacuum_cycleid(Relation rel)
1896 {
1897 	BTCycleId	result = 0;
1898 	int			i;
1899 
1900 	/* Share lock is enough since this is a read-only operation */
1901 	LWLockAcquire(BtreeVacuumLock, LW_SHARED);
1902 
1903 	for (i = 0; i < btvacinfo->num_vacuums; i++)
1904 	{
1905 		BTOneVacInfo *vac = &btvacinfo->vacuums[i];
1906 
1907 		if (vac->relid.relId == rel->rd_lockInfo.lockRelId.relId &&
1908 			vac->relid.dbId == rel->rd_lockInfo.lockRelId.dbId)
1909 		{
1910 			result = vac->cycleid;
1911 			break;
1912 		}
1913 	}
1914 
1915 	LWLockRelease(BtreeVacuumLock);
1916 	return result;
1917 }
1918 
1919 /*
1920  * _bt_start_vacuum --- assign a cycle ID to a just-starting VACUUM operation
1921  *
1922  * Note: the caller must guarantee that it will eventually call
1923  * _bt_end_vacuum, else we'll permanently leak an array slot.  To ensure
1924  * that this happens even in elog(FATAL) scenarios, the appropriate coding
1925  * is not just a PG_TRY, but
1926  *		PG_ENSURE_ERROR_CLEANUP(_bt_end_vacuum_callback, PointerGetDatum(rel))
1927  */
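/*
 * A sketch of one workable caller pattern (illustrative only):
 *
 *		PG_ENSURE_ERROR_CLEANUP(_bt_end_vacuum_callback, PointerGetDatum(rel));
 *		{
 *			cycleid = _bt_start_vacuum(rel);
 *			... perform the btree vacuum scan using cycleid ...
 *		}
 *		PG_END_ENSURE_ERROR_CLEANUP(_bt_end_vacuum_callback, PointerGetDatum(rel));
 *		_bt_end_vacuum(rel);
 *
 * Calling _bt_end_vacuum unconditionally at the end is safe because it does
 * not complain when no entry is found.
 */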
1928 BTCycleId
1929 _bt_start_vacuum(Relation rel)
1930 {
1931 	BTCycleId	result;
1932 	int			i;
1933 	BTOneVacInfo *vac;
1934 
1935 	LWLockAcquire(BtreeVacuumLock, LW_EXCLUSIVE);
1936 
1937 	/*
1938 	 * Assign the next cycle ID, being careful to avoid zero as well as the
1939 	 * reserved high values.
1940 	 */
1941 	result = ++(btvacinfo->cycle_ctr);
1942 	if (result == 0 || result > MAX_BT_CYCLE_ID)
1943 		result = btvacinfo->cycle_ctr = 1;
1944 
1945 	/* Let's just make sure there's no entry already for this index */
1946 	for (i = 0; i < btvacinfo->num_vacuums; i++)
1947 	{
1948 		vac = &btvacinfo->vacuums[i];
1949 		if (vac->relid.relId == rel->rd_lockInfo.lockRelId.relId &&
1950 			vac->relid.dbId == rel->rd_lockInfo.lockRelId.dbId)
1951 		{
1952 			/*
1953 			 * Unlike most places in the backend, we have to explicitly
1954 			 * release our LWLock before throwing an error.  This is because
1955 			 * we expect _bt_end_vacuum() to be called before transaction
1956 			 * abort cleanup can run to release LWLocks.
1957 			 */
1958 			LWLockRelease(BtreeVacuumLock);
1959 			elog(ERROR, "multiple active vacuums for index \"%s\"",
1960 				 RelationGetRelationName(rel));
1961 		}
1962 	}
1963 
1964 	/* OK, add an entry */
1965 	if (btvacinfo->num_vacuums >= btvacinfo->max_vacuums)
1966 	{
1967 		LWLockRelease(BtreeVacuumLock);
1968 		elog(ERROR, "out of btvacinfo slots");
1969 	}
1970 	vac = &btvacinfo->vacuums[btvacinfo->num_vacuums];
1971 	vac->relid = rel->rd_lockInfo.lockRelId;
1972 	vac->cycleid = result;
1973 	btvacinfo->num_vacuums++;
1974 
1975 	LWLockRelease(BtreeVacuumLock);
1976 	return result;
1977 }
1978 
1979 /*
1980  * _bt_end_vacuum --- mark a btree VACUUM operation as done
1981  *
1982  * Note: this is deliberately coded not to complain if no entry is found;
1983  * this allows the caller to put PG_TRY around the start_vacuum operation.
1984  */
1985 void
1986 _bt_end_vacuum(Relation rel)
1987 {
1988 	int			i;
1989 
1990 	LWLockAcquire(BtreeVacuumLock, LW_EXCLUSIVE);
1991 
1992 	/* Find the array entry */
1993 	for (i = 0; i < btvacinfo->num_vacuums; i++)
1994 	{
1995 		BTOneVacInfo *vac = &btvacinfo->vacuums[i];
1996 
1997 		if (vac->relid.relId == rel->rd_lockInfo.lockRelId.relId &&
1998 			vac->relid.dbId == rel->rd_lockInfo.lockRelId.dbId)
1999 		{
2000 			/* Remove it by shifting down the last entry */
2001 			*vac = btvacinfo->vacuums[btvacinfo->num_vacuums - 1];
2002 			btvacinfo->num_vacuums--;
2003 			break;
2004 		}
2005 	}
2006 
2007 	LWLockRelease(BtreeVacuumLock);
2008 }
2009 
2010 /*
2011  * _bt_end_vacuum wrapped as an on_shmem_exit callback function
2012  */
2013 void
2014 _bt_end_vacuum_callback(int code, Datum arg)
2015 {
2016 	_bt_end_vacuum((Relation) DatumGetPointer(arg));
2017 }
2018 
2019 /*
2020  * BTreeShmemSize --- report amount of shared memory space needed
2021  */
2022 Size
2023 BTreeShmemSize(void)
2024 {
2025 	Size		size;
2026 
2027 	size = offsetof(BTVacInfo, vacuums);
2028 	size = add_size(size, mul_size(MaxBackends, sizeof(BTOneVacInfo)));
2029 	return size;
2030 }
2031 
2032 /*
2033  * BTreeShmemInit --- initialize this module's shared memory
2034  */
2035 void
2036 BTreeShmemInit(void)
2037 {
2038 	bool		found;
2039 
2040 	btvacinfo = (BTVacInfo *) ShmemInitStruct("BTree Vacuum State",
2041 											  BTreeShmemSize(),
2042 											  &found);
2043 
2044 	if (!IsUnderPostmaster)
2045 	{
2046 		/* Initialize shared memory area */
2047 		Assert(!found);
2048 
2049 		/*
2050 		 * It doesn't really matter what the cycle counter starts at, but
2051 		 * having it always start the same doesn't seem good.  Seed with
2052 		 * low-order bits of time() instead.
2053 		 */
2054 		btvacinfo->cycle_ctr = (BTCycleId) time(NULL);
2055 
2056 		btvacinfo->num_vacuums = 0;
2057 		btvacinfo->max_vacuums = MaxBackends;
2058 	}
2059 	else
2060 		Assert(found);
2061 }
2062 
2063 bytea *
2064 btoptions(Datum reloptions, bool validate)
2065 {
2066 	return default_reloptions(reloptions, validate, RELOPT_KIND_BTREE);
2067 }
2068 
2069 /*
2070  *	btproperty() -- Check boolean properties of indexes.
2071  *
2072  * This is optional, but handling AMPROP_RETURNABLE here saves opening the rel
2073  * to call btcanreturn.
2074  */
2075 bool
2076 btproperty(Oid index_oid, int attno,
2077 		   IndexAMProperty prop, const char *propname,
2078 		   bool *res, bool *isnull)
2079 {
2080 	switch (prop)
2081 	{
2082 		case AMPROP_RETURNABLE:
2083 			/* answer only for columns, not AM or whole index */
2084 			if (attno == 0)
2085 				return false;
2086 			/* otherwise, btree can always return data */
2087 			*res = true;
2088 			return true;
2089 
2090 		default:
2091 			return false;		/* punt to generic code */
2092 	}
2093 }
2094 
2095 /*
2096  *	_bt_nonkey_truncate() -- create tuple without non-key suffix attributes.
2097  *
2098  * Returns truncated index tuple allocated in caller's memory context, with key
2099  * attributes copied from caller's itup argument.  Currently, suffix truncation
2100  * is only performed to create pivot tuples in INCLUDE indexes, but some day it
2101  * could be generalized to remove suffix attributes after the first
2102  * distinguishing key attribute.
2103  *
2104  * Truncated tuple is guaranteed to be no larger than the original, which is
2105  * important for staying under the 1/3 of a page restriction on tuple size.
2106  *
2107  * Note that returned tuple's t_tid offset will hold the number of attributes
2108  * present, so the original item pointer offset is not represented.  Caller
2109  * should only change truncated tuple's downlink.
2110  */
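/*
 * For example (hypothetical index definition): given
 *
 *		CREATE INDEX ... ON tab (a, b) INCLUDE (c, d)
 *
 * leaf tuples carry all four attributes, while the pivot tuples produced
 * here keep only the key attributes a and b, recording the attribute count
 * (2) in the truncated tuple's t_tid offset.
 */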
2111 IndexTuple
2112 _bt_nonkey_truncate(Relation rel, IndexTuple itup)
2113 {
2114 	int			nkeyattrs = IndexRelationGetNumberOfKeyAttributes(rel);
2115 	IndexTuple	truncated;
2116 
2117 	/*
2118 	 * We should only ever truncate leaf index tuples, which must have both
2119 	 * key and non-key attributes.  It's never okay to truncate a second time.
2120 	 */
2121 	Assert(BTreeTupleGetNAtts(itup, rel) ==
2122 		   IndexRelationGetNumberOfAttributes(rel));
2123 
2124 	truncated = index_truncate_tuple(RelationGetDescr(rel), itup, nkeyattrs);
2125 	BTreeTupleSetNAtts(truncated, nkeyattrs);
2126 
2127 	return truncated;
2128 }
2129 
2130 /*
2131  *  _bt_check_natts() -- Verify tuple has expected number of attributes.
2132  *
 * Returns a value indicating whether the expected number of attributes was
 * found for a particular offset on the page.  This can be used as a
 * general-purpose sanity check.
2136  *
2137  * Testing a tuple directly with BTreeTupleGetNAtts() should generally be
2138  * preferred to calling here.  That's usually more convenient, and is always
2139  * more explicit.  Call here instead when offnum's tuple may be a negative
2140  * infinity tuple that uses the pre-v11 on-disk representation, or when a low
2141  * context check is appropriate.
2142  */
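/*
 * Typical use is as a cheap assertion, e.g. (hypothetical call site):
 *
 *		Assert(_bt_check_natts(rel, page, offnum));
 */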
2143 bool
2144 _bt_check_natts(Relation rel, Page page, OffsetNumber offnum)
2145 {
2146 	int16		natts = IndexRelationGetNumberOfAttributes(rel);
2147 	int16		nkeyatts = IndexRelationGetNumberOfKeyAttributes(rel);
2148 	BTPageOpaque opaque = (BTPageOpaque) PageGetSpecialPointer(page);
2149 	IndexTuple	itup;
2150 
2151 	/*
	 * We cannot reliably test a deleted or half-deleted page, since such
	 * pages have dummy high keys
2154 	 */
2155 	if (P_IGNORE(opaque))
2156 		return true;
2157 
2158 	Assert(offnum >= FirstOffsetNumber &&
2159 		   offnum <= PageGetMaxOffsetNumber(page));
2160 
2161 	/*
2162 	 * Mask allocated for number of keys in index tuple must be able to fit
2163 	 * maximum possible number of index attributes
2164 	 */
2165 	StaticAssertStmt(BT_N_KEYS_OFFSET_MASK >= INDEX_MAX_KEYS,
2166 					 "BT_N_KEYS_OFFSET_MASK can't fit INDEX_MAX_KEYS");
2167 
2168 	itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, offnum));
2169 
2170 	if (P_ISLEAF(opaque))
2171 	{
2172 		if (offnum >= P_FIRSTDATAKEY(opaque))
2173 		{
2174 			/*
2175 			 * Leaf tuples that are not the page high key (non-pivot tuples)
2176 			 * should never be truncated
2177 			 */
2178 			return BTreeTupleGetNAtts(itup, rel) == natts;
2179 		}
2180 		else
2181 		{
2182 			/*
2183 			 * Rightmost page doesn't contain a page high key, so tuple was
2184 			 * checked above as ordinary leaf tuple
2185 			 */
2186 			Assert(!P_RIGHTMOST(opaque));
2187 
2188 			/* Page high key tuple contains only key attributes */
2189 			return BTreeTupleGetNAtts(itup, rel) == nkeyatts;
2190 		}
2191 	}
2192 	else						/* !P_ISLEAF(opaque) */
2193 	{
2194 		if (offnum == P_FIRSTDATAKEY(opaque))
2195 		{
2196 			/*
2197 			 * The first tuple on any internal page (possibly the first after
2198 			 * its high key) is its negative infinity tuple.  Negative
2199 			 * infinity tuples are always truncated to zero attributes.  They
2200 			 * are a particular kind of pivot tuple.
2201 			 *
2202 			 * The number of attributes won't be explicitly represented if the
2203 			 * negative infinity tuple was generated during a page split that
2204 			 * occurred with a version of Postgres before v11.  There must be
2205 			 * a problem when there is an explicit representation that is
2206 			 * non-zero, or when there is no explicit representation and the
2207 			 * tuple is evidently not a pre-pg_upgrade tuple.
2208 			 *
2209 			 * Prior to v11, downlinks always had P_HIKEY as their offset. Use
2210 			 * that to decide if the tuple is a pre-v11 tuple.
2211 			 */
2212 			return BTreeTupleGetNAtts(itup, rel) == 0 ||
2213 				((itup->t_info & INDEX_ALT_TID_MASK) == 0 &&
2214 				 ItemPointerGetOffsetNumber(&(itup->t_tid)) == P_HIKEY);
2215 		}
2216 		else
2217 		{
2218 			/*
			 * Tuple contains only key attributes, regardless of whether or
			 * not it is the page high key
2221 			 */
2222 			return BTreeTupleGetNAtts(itup, rel) == nkeyatts;
2223 		}
2224 
2225 	}
2226 }
2227