1 /*-------------------------------------------------------------------------
2  *
3  * nbtutils.c
4  *	  Utility code for Postgres btree implementation.
5  *
6  * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *	  src/backend/access/nbtree/nbtutils.c
12  *
13  *-------------------------------------------------------------------------
14  */
15 
16 #include "postgres.h"
17 
18 #include <time.h>
19 
20 #include "access/nbtree.h"
21 #include "access/reloptions.h"
22 #include "access/relscan.h"
23 #include "commands/progress.h"
24 #include "miscadmin.h"
25 #include "utils/array.h"
26 #include "utils/datum.h"
27 #include "utils/lsyscache.h"
28 #include "utils/memutils.h"
29 #include "utils/rel.h"
30 
31 
32 typedef struct BTSortArrayContext
33 {
34 	FmgrInfo	flinfo;
35 	Oid			collation;
36 	bool		reverse;
37 } BTSortArrayContext;
38 
39 static Datum _bt_find_extreme_element(IndexScanDesc scan, ScanKey skey,
40 									  StrategyNumber strat,
41 									  Datum *elems, int nelems);
42 static int	_bt_sort_array_elements(IndexScanDesc scan, ScanKey skey,
43 									bool reverse,
44 									Datum *elems, int nelems);
45 static int	_bt_compare_array_elements(const void *a, const void *b, void *arg);
46 static bool _bt_compare_scankey_args(IndexScanDesc scan, ScanKey op,
47 									 ScanKey leftarg, ScanKey rightarg,
48 									 bool *result);
49 static bool _bt_fix_scankey_strategy(ScanKey skey, int16 *indoption);
50 static void _bt_mark_scankey_required(ScanKey skey);
51 static bool _bt_check_rowcompare(ScanKey skey,
52 								 IndexTuple tuple, int tupnatts, TupleDesc tupdesc,
53 								 ScanDirection dir, bool *continuescan);
54 static int	_bt_keep_natts(Relation rel, IndexTuple lastleft,
55 						   IndexTuple firstright, BTScanInsert itup_key);
56 
57 
58 /*
59  * _bt_mkscankey
60  *		Build an insertion scan key that contains comparison data from itup
61  *		as well as comparator routines appropriate to the key datatypes.
62  *
63  *		When itup is a non-pivot tuple, the returned insertion scan key is
64  *		suitable for finding a place for it to go on the leaf level.  Pivot
65  *		tuples can be used to re-find leaf page with matching high key, but
66  *		then caller needs to set scan key's pivotsearch field to true.  This
67  *		allows caller to search for a leaf page with a matching high key,
68  *		which is usually to the left of the first leaf page a non-pivot match
69  *		might appear on.
70  *
71  *		The result is intended for use with _bt_compare() and _bt_truncate().
72  *		Callers that don't need to fill out the insertion scankey arguments
73  *		(e.g. they use an ad-hoc comparison routine, or only need a scankey
74  *		for _bt_truncate()) can pass a NULL index tuple.  The scankey will
75  *		be initialized as if an "all truncated" pivot tuple was passed
76  *		instead.
77  *
78  *		Note that we may occasionally have to share lock the metapage to
79  *		determine whether or not the keys in the index are expected to be
80  *		unique (i.e. if this is a "heapkeyspace" index).  We assume a
81  *		heapkeyspace index when caller passes a NULL tuple, allowing index
82  *		build callers to avoid accessing the non-existent metapage.
83  */
84 BTScanInsert
_bt_mkscankey(Relation rel,IndexTuple itup)85 _bt_mkscankey(Relation rel, IndexTuple itup)
86 {
87 	BTScanInsert key;
88 	ScanKey		skey;
89 	TupleDesc	itupdesc;
90 	int			indnkeyatts;
91 	int16	   *indoption;
92 	int			tupnatts;
93 	int			i;
94 
95 	itupdesc = RelationGetDescr(rel);
96 	indnkeyatts = IndexRelationGetNumberOfKeyAttributes(rel);
97 	indoption = rel->rd_indoption;
98 	tupnatts = itup ? BTreeTupleGetNAtts(itup, rel) : 0;
99 
100 	Assert(tupnatts <= IndexRelationGetNumberOfAttributes(rel));
101 
102 	/*
103 	 * We'll execute search using scan key constructed on key columns.
104 	 * Truncated attributes and non-key attributes are omitted from the final
105 	 * scan key.
106 	 */
107 	key = palloc(offsetof(BTScanInsertData, scankeys) +
108 				 sizeof(ScanKeyData) * indnkeyatts);
109 	key->heapkeyspace = itup == NULL || _bt_heapkeyspace(rel);
110 	key->anynullkeys = false;	/* initial assumption */
111 	key->nextkey = false;
112 	key->pivotsearch = false;
113 	key->keysz = Min(indnkeyatts, tupnatts);
114 	key->scantid = key->heapkeyspace && itup ?
115 		BTreeTupleGetHeapTID(itup) : NULL;
116 	skey = key->scankeys;
117 	for (i = 0; i < indnkeyatts; i++)
118 	{
119 		FmgrInfo   *procinfo;
120 		Datum		arg;
121 		bool		null;
122 		int			flags;
123 
124 		/*
125 		 * We can use the cached (default) support procs since no cross-type
126 		 * comparison can be needed.
127 		 */
128 		procinfo = index_getprocinfo(rel, i + 1, BTORDER_PROC);
129 
130 		/*
131 		 * Key arguments built from truncated attributes (or when caller
132 		 * provides no tuple) are defensively represented as NULL values. They
133 		 * should never be used.
134 		 */
135 		if (i < tupnatts)
136 			arg = index_getattr(itup, i + 1, itupdesc, &null);
137 		else
138 		{
139 			arg = (Datum) 0;
140 			null = true;
141 		}
142 		flags = (null ? SK_ISNULL : 0) | (indoption[i] << SK_BT_INDOPTION_SHIFT);
143 		ScanKeyEntryInitializeWithInfo(&skey[i],
144 									   flags,
145 									   (AttrNumber) (i + 1),
146 									   InvalidStrategy,
147 									   InvalidOid,
148 									   rel->rd_indcollation[i],
149 									   procinfo,
150 									   arg);
151 		/* Record if any key attribute is NULL (or truncated) */
152 		if (null)
153 			key->anynullkeys = true;
154 	}
155 
156 	return key;
157 }
158 
159 /*
160  * free a retracement stack made by _bt_search.
161  */
162 void
_bt_freestack(BTStack stack)163 _bt_freestack(BTStack stack)
164 {
165 	BTStack		ostack;
166 
167 	while (stack != NULL)
168 	{
169 		ostack = stack;
170 		stack = stack->bts_parent;
171 		pfree(ostack);
172 	}
173 }
174 
175 
176 /*
177  *	_bt_preprocess_array_keys() -- Preprocess SK_SEARCHARRAY scan keys
178  *
179  * If there are any SK_SEARCHARRAY scan keys, deconstruct the array(s) and
180  * set up BTArrayKeyInfo info for each one that is an equality-type key.
181  * Prepare modified scan keys in so->arrayKeyData, which will hold the current
182  * array elements during each primitive indexscan operation.  For inequality
183  * array keys, it's sufficient to find the extreme element value and replace
184  * the whole array with that scalar value.
185  *
186  * Note: the reason we need so->arrayKeyData, rather than just scribbling
187  * on scan->keyData, is that callers are permitted to call btrescan without
188  * supplying a new set of scankey data.
189  */
190 void
_bt_preprocess_array_keys(IndexScanDesc scan)191 _bt_preprocess_array_keys(IndexScanDesc scan)
192 {
193 	BTScanOpaque so = (BTScanOpaque) scan->opaque;
194 	int			numberOfKeys = scan->numberOfKeys;
195 	int16	   *indoption = scan->indexRelation->rd_indoption;
196 	int			numArrayKeys;
197 	ScanKey		cur;
198 	int			i;
199 	MemoryContext oldContext;
200 
201 	/* Quick check to see if there are any array keys */
202 	numArrayKeys = 0;
203 	for (i = 0; i < numberOfKeys; i++)
204 	{
205 		cur = &scan->keyData[i];
206 		if (cur->sk_flags & SK_SEARCHARRAY)
207 		{
208 			numArrayKeys++;
209 			Assert(!(cur->sk_flags & (SK_ROW_HEADER | SK_SEARCHNULL | SK_SEARCHNOTNULL)));
210 			/* If any arrays are null as a whole, we can quit right now. */
211 			if (cur->sk_flags & SK_ISNULL)
212 			{
213 				so->numArrayKeys = -1;
214 				so->arrayKeyData = NULL;
215 				return;
216 			}
217 		}
218 	}
219 
220 	/* Quit if nothing to do. */
221 	if (numArrayKeys == 0)
222 	{
223 		so->numArrayKeys = 0;
224 		so->arrayKeyData = NULL;
225 		return;
226 	}
227 
228 	/*
229 	 * Make a scan-lifespan context to hold array-associated data, or reset it
230 	 * if we already have one from a previous rescan cycle.
231 	 */
232 	if (so->arrayContext == NULL)
233 		so->arrayContext = AllocSetContextCreate(CurrentMemoryContext,
234 												 "BTree array context",
235 												 ALLOCSET_SMALL_SIZES);
236 	else
237 		MemoryContextReset(so->arrayContext);
238 
239 	oldContext = MemoryContextSwitchTo(so->arrayContext);
240 
241 	/* Create modifiable copy of scan->keyData in the workspace context */
242 	so->arrayKeyData = (ScanKey) palloc(scan->numberOfKeys * sizeof(ScanKeyData));
243 	memcpy(so->arrayKeyData,
244 		   scan->keyData,
245 		   scan->numberOfKeys * sizeof(ScanKeyData));
246 
247 	/* Allocate space for per-array data in the workspace context */
248 	so->arrayKeys = (BTArrayKeyInfo *) palloc0(numArrayKeys * sizeof(BTArrayKeyInfo));
249 
250 	/* Now process each array key */
251 	numArrayKeys = 0;
252 	for (i = 0; i < numberOfKeys; i++)
253 	{
254 		ArrayType  *arrayval;
255 		int16		elmlen;
256 		bool		elmbyval;
257 		char		elmalign;
258 		int			num_elems;
259 		Datum	   *elem_values;
260 		bool	   *elem_nulls;
261 		int			num_nonnulls;
262 		int			j;
263 
264 		cur = &so->arrayKeyData[i];
265 		if (!(cur->sk_flags & SK_SEARCHARRAY))
266 			continue;
267 
268 		/*
269 		 * First, deconstruct the array into elements.  Anything allocated
270 		 * here (including a possibly detoasted array value) is in the
271 		 * workspace context.
272 		 */
273 		arrayval = DatumGetArrayTypeP(cur->sk_argument);
274 		/* We could cache this data, but not clear it's worth it */
275 		get_typlenbyvalalign(ARR_ELEMTYPE(arrayval),
276 							 &elmlen, &elmbyval, &elmalign);
277 		deconstruct_array(arrayval,
278 						  ARR_ELEMTYPE(arrayval),
279 						  elmlen, elmbyval, elmalign,
280 						  &elem_values, &elem_nulls, &num_elems);
281 
282 		/*
283 		 * Compress out any null elements.  We can ignore them since we assume
284 		 * all btree operators are strict.
285 		 */
286 		num_nonnulls = 0;
287 		for (j = 0; j < num_elems; j++)
288 		{
289 			if (!elem_nulls[j])
290 				elem_values[num_nonnulls++] = elem_values[j];
291 		}
292 
293 		/* We could pfree(elem_nulls) now, but not worth the cycles */
294 
295 		/* If there's no non-nulls, the scan qual is unsatisfiable */
296 		if (num_nonnulls == 0)
297 		{
298 			numArrayKeys = -1;
299 			break;
300 		}
301 
302 		/*
303 		 * If the comparison operator is not equality, then the array qual
304 		 * degenerates to a simple comparison against the smallest or largest
305 		 * non-null array element, as appropriate.
306 		 */
307 		switch (cur->sk_strategy)
308 		{
309 			case BTLessStrategyNumber:
310 			case BTLessEqualStrategyNumber:
311 				cur->sk_argument =
312 					_bt_find_extreme_element(scan, cur,
313 											 BTGreaterStrategyNumber,
314 											 elem_values, num_nonnulls);
315 				continue;
316 			case BTEqualStrategyNumber:
317 				/* proceed with rest of loop */
318 				break;
319 			case BTGreaterEqualStrategyNumber:
320 			case BTGreaterStrategyNumber:
321 				cur->sk_argument =
322 					_bt_find_extreme_element(scan, cur,
323 											 BTLessStrategyNumber,
324 											 elem_values, num_nonnulls);
325 				continue;
326 			default:
327 				elog(ERROR, "unrecognized StrategyNumber: %d",
328 					 (int) cur->sk_strategy);
329 				break;
330 		}
331 
332 		/*
333 		 * Sort the non-null elements and eliminate any duplicates.  We must
334 		 * sort in the same ordering used by the index column, so that the
335 		 * successive primitive indexscans produce data in index order.
336 		 */
337 		num_elems = _bt_sort_array_elements(scan, cur,
338 											(indoption[cur->sk_attno - 1] & INDOPTION_DESC) != 0,
339 											elem_values, num_nonnulls);
340 
341 		/*
342 		 * And set up the BTArrayKeyInfo data.
343 		 */
344 		so->arrayKeys[numArrayKeys].scan_key = i;
345 		so->arrayKeys[numArrayKeys].num_elems = num_elems;
346 		so->arrayKeys[numArrayKeys].elem_values = elem_values;
347 		numArrayKeys++;
348 	}
349 
350 	so->numArrayKeys = numArrayKeys;
351 
352 	MemoryContextSwitchTo(oldContext);
353 }
354 
355 /*
356  * _bt_find_extreme_element() -- get least or greatest array element
357  *
358  * scan and skey identify the index column, whose opfamily determines the
359  * comparison semantics.  strat should be BTLessStrategyNumber to get the
360  * least element, or BTGreaterStrategyNumber to get the greatest.
361  */
362 static Datum
_bt_find_extreme_element(IndexScanDesc scan,ScanKey skey,StrategyNumber strat,Datum * elems,int nelems)363 _bt_find_extreme_element(IndexScanDesc scan, ScanKey skey,
364 						 StrategyNumber strat,
365 						 Datum *elems, int nelems)
366 {
367 	Relation	rel = scan->indexRelation;
368 	Oid			elemtype,
369 				cmp_op;
370 	RegProcedure cmp_proc;
371 	FmgrInfo	flinfo;
372 	Datum		result;
373 	int			i;
374 
375 	/*
376 	 * Determine the nominal datatype of the array elements.  We have to
377 	 * support the convention that sk_subtype == InvalidOid means the opclass
378 	 * input type; this is a hack to simplify life for ScanKeyInit().
379 	 */
380 	elemtype = skey->sk_subtype;
381 	if (elemtype == InvalidOid)
382 		elemtype = rel->rd_opcintype[skey->sk_attno - 1];
383 
384 	/*
385 	 * Look up the appropriate comparison operator in the opfamily.
386 	 *
387 	 * Note: it's possible that this would fail, if the opfamily is
388 	 * incomplete, but it seems quite unlikely that an opfamily would omit
389 	 * non-cross-type comparison operators for any datatype that it supports
390 	 * at all.
391 	 */
392 	cmp_op = get_opfamily_member(rel->rd_opfamily[skey->sk_attno - 1],
393 								 elemtype,
394 								 elemtype,
395 								 strat);
396 	if (!OidIsValid(cmp_op))
397 		elog(ERROR, "missing operator %d(%u,%u) in opfamily %u",
398 			 strat, elemtype, elemtype,
399 			 rel->rd_opfamily[skey->sk_attno - 1]);
400 	cmp_proc = get_opcode(cmp_op);
401 	if (!RegProcedureIsValid(cmp_proc))
402 		elog(ERROR, "missing oprcode for operator %u", cmp_op);
403 
404 	fmgr_info(cmp_proc, &flinfo);
405 
406 	Assert(nelems > 0);
407 	result = elems[0];
408 	for (i = 1; i < nelems; i++)
409 	{
410 		if (DatumGetBool(FunctionCall2Coll(&flinfo,
411 										   skey->sk_collation,
412 										   elems[i],
413 										   result)))
414 			result = elems[i];
415 	}
416 
417 	return result;
418 }
419 
420 /*
421  * _bt_sort_array_elements() -- sort and de-dup array elements
422  *
423  * The array elements are sorted in-place, and the new number of elements
424  * after duplicate removal is returned.
425  *
426  * scan and skey identify the index column, whose opfamily determines the
427  * comparison semantics.  If reverse is true, we sort in descending order.
428  */
429 static int
_bt_sort_array_elements(IndexScanDesc scan,ScanKey skey,bool reverse,Datum * elems,int nelems)430 _bt_sort_array_elements(IndexScanDesc scan, ScanKey skey,
431 						bool reverse,
432 						Datum *elems, int nelems)
433 {
434 	Relation	rel = scan->indexRelation;
435 	Oid			elemtype;
436 	RegProcedure cmp_proc;
437 	BTSortArrayContext cxt;
438 	int			last_non_dup;
439 	int			i;
440 
441 	if (nelems <= 1)
442 		return nelems;			/* no work to do */
443 
444 	/*
445 	 * Determine the nominal datatype of the array elements.  We have to
446 	 * support the convention that sk_subtype == InvalidOid means the opclass
447 	 * input type; this is a hack to simplify life for ScanKeyInit().
448 	 */
449 	elemtype = skey->sk_subtype;
450 	if (elemtype == InvalidOid)
451 		elemtype = rel->rd_opcintype[skey->sk_attno - 1];
452 
453 	/*
454 	 * Look up the appropriate comparison function in the opfamily.
455 	 *
456 	 * Note: it's possible that this would fail, if the opfamily is
457 	 * incomplete, but it seems quite unlikely that an opfamily would omit
458 	 * non-cross-type support functions for any datatype that it supports at
459 	 * all.
460 	 */
461 	cmp_proc = get_opfamily_proc(rel->rd_opfamily[skey->sk_attno - 1],
462 								 elemtype,
463 								 elemtype,
464 								 BTORDER_PROC);
465 	if (!RegProcedureIsValid(cmp_proc))
466 		elog(ERROR, "missing support function %d(%u,%u) in opfamily %u",
467 			 BTORDER_PROC, elemtype, elemtype,
468 			 rel->rd_opfamily[skey->sk_attno - 1]);
469 
470 	/* Sort the array elements */
471 	fmgr_info(cmp_proc, &cxt.flinfo);
472 	cxt.collation = skey->sk_collation;
473 	cxt.reverse = reverse;
474 	qsort_arg((void *) elems, nelems, sizeof(Datum),
475 			  _bt_compare_array_elements, (void *) &cxt);
476 
477 	/* Now scan the sorted elements and remove duplicates */
478 	last_non_dup = 0;
479 	for (i = 1; i < nelems; i++)
480 	{
481 		int32		compare;
482 
483 		compare = DatumGetInt32(FunctionCall2Coll(&cxt.flinfo,
484 												  cxt.collation,
485 												  elems[last_non_dup],
486 												  elems[i]));
487 		if (compare != 0)
488 			elems[++last_non_dup] = elems[i];
489 	}
490 
491 	return last_non_dup + 1;
492 }
493 
494 /*
495  * qsort_arg comparator for sorting array elements
496  */
497 static int
_bt_compare_array_elements(const void * a,const void * b,void * arg)498 _bt_compare_array_elements(const void *a, const void *b, void *arg)
499 {
500 	Datum		da = *((const Datum *) a);
501 	Datum		db = *((const Datum *) b);
502 	BTSortArrayContext *cxt = (BTSortArrayContext *) arg;
503 	int32		compare;
504 
505 	compare = DatumGetInt32(FunctionCall2Coll(&cxt->flinfo,
506 											  cxt->collation,
507 											  da, db));
508 	if (cxt->reverse)
509 		INVERT_COMPARE_RESULT(compare);
510 	return compare;
511 }
512 
513 /*
514  * _bt_start_array_keys() -- Initialize array keys at start of a scan
515  *
516  * Set up the cur_elem counters and fill in the first sk_argument value for
517  * each array scankey.  We can't do this until we know the scan direction.
518  */
519 void
_bt_start_array_keys(IndexScanDesc scan,ScanDirection dir)520 _bt_start_array_keys(IndexScanDesc scan, ScanDirection dir)
521 {
522 	BTScanOpaque so = (BTScanOpaque) scan->opaque;
523 	int			i;
524 
525 	for (i = 0; i < so->numArrayKeys; i++)
526 	{
527 		BTArrayKeyInfo *curArrayKey = &so->arrayKeys[i];
528 		ScanKey		skey = &so->arrayKeyData[curArrayKey->scan_key];
529 
530 		Assert(curArrayKey->num_elems > 0);
531 		if (ScanDirectionIsBackward(dir))
532 			curArrayKey->cur_elem = curArrayKey->num_elems - 1;
533 		else
534 			curArrayKey->cur_elem = 0;
535 		skey->sk_argument = curArrayKey->elem_values[curArrayKey->cur_elem];
536 	}
537 }
538 
539 /*
540  * _bt_advance_array_keys() -- Advance to next set of array elements
541  *
542  * Returns true if there is another set of values to consider, false if not.
543  * On true result, the scankeys are initialized with the next set of values.
544  */
545 bool
_bt_advance_array_keys(IndexScanDesc scan,ScanDirection dir)546 _bt_advance_array_keys(IndexScanDesc scan, ScanDirection dir)
547 {
548 	BTScanOpaque so = (BTScanOpaque) scan->opaque;
549 	bool		found = false;
550 	int			i;
551 
552 	/*
553 	 * We must advance the last array key most quickly, since it will
554 	 * correspond to the lowest-order index column among the available
555 	 * qualifications. This is necessary to ensure correct ordering of output
556 	 * when there are multiple array keys.
557 	 */
558 	for (i = so->numArrayKeys - 1; i >= 0; i--)
559 	{
560 		BTArrayKeyInfo *curArrayKey = &so->arrayKeys[i];
561 		ScanKey		skey = &so->arrayKeyData[curArrayKey->scan_key];
562 		int			cur_elem = curArrayKey->cur_elem;
563 		int			num_elems = curArrayKey->num_elems;
564 
565 		if (ScanDirectionIsBackward(dir))
566 		{
567 			if (--cur_elem < 0)
568 			{
569 				cur_elem = num_elems - 1;
570 				found = false;	/* need to advance next array key */
571 			}
572 			else
573 				found = true;
574 		}
575 		else
576 		{
577 			if (++cur_elem >= num_elems)
578 			{
579 				cur_elem = 0;
580 				found = false;	/* need to advance next array key */
581 			}
582 			else
583 				found = true;
584 		}
585 
586 		curArrayKey->cur_elem = cur_elem;
587 		skey->sk_argument = curArrayKey->elem_values[cur_elem];
588 		if (found)
589 			break;
590 	}
591 
592 	/* advance parallel scan */
593 	if (scan->parallel_scan != NULL)
594 		_bt_parallel_advance_array_keys(scan);
595 
596 	return found;
597 }
598 
599 /*
600  * _bt_mark_array_keys() -- Handle array keys during btmarkpos
601  *
602  * Save the current state of the array keys as the "mark" position.
603  */
604 void
_bt_mark_array_keys(IndexScanDesc scan)605 _bt_mark_array_keys(IndexScanDesc scan)
606 {
607 	BTScanOpaque so = (BTScanOpaque) scan->opaque;
608 	int			i;
609 
610 	for (i = 0; i < so->numArrayKeys; i++)
611 	{
612 		BTArrayKeyInfo *curArrayKey = &so->arrayKeys[i];
613 
614 		curArrayKey->mark_elem = curArrayKey->cur_elem;
615 	}
616 }
617 
618 /*
619  * _bt_restore_array_keys() -- Handle array keys during btrestrpos
620  *
621  * Restore the array keys to where they were when the mark was set.
622  */
623 void
_bt_restore_array_keys(IndexScanDesc scan)624 _bt_restore_array_keys(IndexScanDesc scan)
625 {
626 	BTScanOpaque so = (BTScanOpaque) scan->opaque;
627 	bool		changed = false;
628 	int			i;
629 
630 	/* Restore each array key to its position when the mark was set */
631 	for (i = 0; i < so->numArrayKeys; i++)
632 	{
633 		BTArrayKeyInfo *curArrayKey = &so->arrayKeys[i];
634 		ScanKey		skey = &so->arrayKeyData[curArrayKey->scan_key];
635 		int			mark_elem = curArrayKey->mark_elem;
636 
637 		if (curArrayKey->cur_elem != mark_elem)
638 		{
639 			curArrayKey->cur_elem = mark_elem;
640 			skey->sk_argument = curArrayKey->elem_values[mark_elem];
641 			changed = true;
642 		}
643 	}
644 
645 	/*
646 	 * If we changed any keys, we must redo _bt_preprocess_keys.  That might
647 	 * sound like overkill, but in cases with multiple keys per index column
648 	 * it seems necessary to do the full set of pushups.
649 	 */
650 	if (changed)
651 	{
652 		_bt_preprocess_keys(scan);
653 		/* The mark should have been set on a consistent set of keys... */
654 		Assert(so->qual_ok);
655 	}
656 }
657 
658 
659 /*
660  *	_bt_preprocess_keys() -- Preprocess scan keys
661  *
662  * The given search-type keys (in scan->keyData[] or so->arrayKeyData[])
663  * are copied to so->keyData[] with possible transformation.
664  * scan->numberOfKeys is the number of input keys, so->numberOfKeys gets
665  * the number of output keys (possibly less, never greater).
666  *
667  * The output keys are marked with additional sk_flag bits beyond the
668  * system-standard bits supplied by the caller.  The DESC and NULLS_FIRST
669  * indoption bits for the relevant index attribute are copied into the flags.
670  * Also, for a DESC column, we commute (flip) all the sk_strategy numbers
671  * so that the index sorts in the desired direction.
672  *
673  * One key purpose of this routine is to discover which scan keys must be
674  * satisfied to continue the scan.  It also attempts to eliminate redundant
675  * keys and detect contradictory keys.  (If the index opfamily provides
676  * incomplete sets of cross-type operators, we may fail to detect redundant
677  * or contradictory keys, but we can survive that.)
678  *
679  * The output keys must be sorted by index attribute.  Presently we expect
680  * (but verify) that the input keys are already so sorted --- this is done
681  * by match_clauses_to_index() in indxpath.c.  Some reordering of the keys
682  * within each attribute may be done as a byproduct of the processing here,
683  * but no other code depends on that.
684  *
685  * The output keys are marked with flags SK_BT_REQFWD and/or SK_BT_REQBKWD
686  * if they must be satisfied in order to continue the scan forward or backward
687  * respectively.  _bt_checkkeys uses these flags.  For example, if the quals
688  * are "x = 1 AND y < 4 AND z < 5", then _bt_checkkeys will reject a tuple
689  * (1,2,7), but we must continue the scan in case there are tuples (1,3,z).
690  * But once we reach tuples like (1,4,z) we can stop scanning because no
691  * later tuples could match.  This is reflected by marking the x and y keys,
692  * but not the z key, with SK_BT_REQFWD.  In general, the keys for leading
693  * attributes with "=" keys are marked both SK_BT_REQFWD and SK_BT_REQBKWD.
694  * For the first attribute without an "=" key, any "<" and "<=" keys are
695  * marked SK_BT_REQFWD while any ">" and ">=" keys are marked SK_BT_REQBKWD.
696  * This can be seen to be correct by considering the above example.  Note
697  * in particular that if there are no keys for a given attribute, the keys for
698  * subsequent attributes can never be required; for instance "WHERE y = 4"
699  * requires a full-index scan.
700  *
701  * If possible, redundant keys are eliminated: we keep only the tightest
702  * >/>= bound and the tightest </<= bound, and if there's an = key then
703  * that's the only one returned.  (So, we return either a single = key,
704  * or one or two boundary-condition keys for each attr.)  However, if we
705  * cannot compare two keys for lack of a suitable cross-type operator,
706  * we cannot eliminate either.  If there are two such keys of the same
707  * operator strategy, the second one is just pushed into the output array
708  * without further processing here.  We may also emit both >/>= or both
709  * </<= keys if we can't compare them.  The logic about required keys still
710  * works if we don't eliminate redundant keys.
711  *
712  * Note that one reason we need direction-sensitive required-key flags is
713  * precisely that we may not be able to eliminate redundant keys.  Suppose
714  * we have "x > 4::int AND x > 10::bigint", and we are unable to determine
715  * which key is more restrictive for lack of a suitable cross-type operator.
716  * _bt_first will arbitrarily pick one of the keys to do the initial
717  * positioning with.  If it picks x > 4, then the x > 10 condition will fail
718  * until we reach index entries > 10; but we can't stop the scan just because
719  * x > 10 is failing.  On the other hand, if we are scanning backwards, then
720  * failure of either key is indeed enough to stop the scan.  (In general, when
721  * inequality keys are present, the initial-positioning code only promises to
722  * position before the first possible match, not exactly at the first match,
723  * for a forward scan; or after the last match for a backward scan.)
724  *
725  * As a byproduct of this work, we can detect contradictory quals such
726  * as "x = 1 AND x > 2".  If we see that, we return so->qual_ok = false,
727  * indicating the scan need not be run at all since no tuples can match.
728  * (In this case we do not bother completing the output key array!)
729  * Again, missing cross-type operators might cause us to fail to prove the
730  * quals contradictory when they really are, but the scan will work correctly.
731  *
732  * Row comparison keys are currently also treated without any smarts:
733  * we just transfer them into the preprocessed array without any
734  * editorialization.  We can treat them the same as an ordinary inequality
735  * comparison on the row's first index column, for the purposes of the logic
736  * about required keys.
737  *
738  * Note: the reason we have to copy the preprocessed scan keys into private
739  * storage is that we are modifying the array based on comparisons of the
740  * key argument values, which could change on a rescan or after moving to
741  * new elements of array keys.  Therefore we can't overwrite the source data.
742  */
743 void
_bt_preprocess_keys(IndexScanDesc scan)744 _bt_preprocess_keys(IndexScanDesc scan)
745 {
746 	BTScanOpaque so = (BTScanOpaque) scan->opaque;
747 	int			numberOfKeys = scan->numberOfKeys;
748 	int16	   *indoption = scan->indexRelation->rd_indoption;
749 	int			new_numberOfKeys;
750 	int			numberOfEqualCols;
751 	ScanKey		inkeys;
752 	ScanKey		outkeys;
753 	ScanKey		cur;
754 	ScanKey		xform[BTMaxStrategyNumber];
755 	bool		test_result;
756 	int			i,
757 				j;
758 	AttrNumber	attno;
759 
760 	/* initialize result variables */
761 	so->qual_ok = true;
762 	so->numberOfKeys = 0;
763 
764 	if (numberOfKeys < 1)
765 		return;					/* done if qual-less scan */
766 
767 	/*
768 	 * Read so->arrayKeyData if array keys are present, else scan->keyData
769 	 */
770 	if (so->arrayKeyData != NULL)
771 		inkeys = so->arrayKeyData;
772 	else
773 		inkeys = scan->keyData;
774 
775 	outkeys = so->keyData;
776 	cur = &inkeys[0];
777 	/* we check that input keys are correctly ordered */
778 	if (cur->sk_attno < 1)
779 		elog(ERROR, "btree index keys must be ordered by attribute");
780 
781 	/* We can short-circuit most of the work if there's just one key */
782 	if (numberOfKeys == 1)
783 	{
784 		/* Apply indoption to scankey (might change sk_strategy!) */
785 		if (!_bt_fix_scankey_strategy(cur, indoption))
786 			so->qual_ok = false;
787 		memcpy(outkeys, cur, sizeof(ScanKeyData));
788 		so->numberOfKeys = 1;
789 		/* We can mark the qual as required if it's for first index col */
790 		if (cur->sk_attno == 1)
791 			_bt_mark_scankey_required(outkeys);
792 		return;
793 	}
794 
795 	/*
796 	 * Otherwise, do the full set of pushups.
797 	 */
798 	new_numberOfKeys = 0;
799 	numberOfEqualCols = 0;
800 
801 	/*
802 	 * Initialize for processing of keys for attr 1.
803 	 *
804 	 * xform[i] points to the currently best scan key of strategy type i+1; it
805 	 * is NULL if we haven't yet found such a key for this attr.
806 	 */
807 	attno = 1;
808 	memset(xform, 0, sizeof(xform));
809 
810 	/*
811 	 * Loop iterates from 0 to numberOfKeys inclusive; we use the last pass to
812 	 * handle after-last-key processing.  Actual exit from the loop is at the
813 	 * "break" statement below.
814 	 */
815 	for (i = 0;; cur++, i++)
816 	{
817 		if (i < numberOfKeys)
818 		{
819 			/* Apply indoption to scankey (might change sk_strategy!) */
820 			if (!_bt_fix_scankey_strategy(cur, indoption))
821 			{
822 				/* NULL can't be matched, so give up */
823 				so->qual_ok = false;
824 				return;
825 			}
826 		}
827 
828 		/*
829 		 * If we are at the end of the keys for a particular attr, finish up
830 		 * processing and emit the cleaned-up keys.
831 		 */
832 		if (i == numberOfKeys || cur->sk_attno != attno)
833 		{
834 			int			priorNumberOfEqualCols = numberOfEqualCols;
835 
836 			/* check input keys are correctly ordered */
837 			if (i < numberOfKeys && cur->sk_attno < attno)
838 				elog(ERROR, "btree index keys must be ordered by attribute");
839 
840 			/*
841 			 * If = has been specified, all other keys can be eliminated as
842 			 * redundant.  If we have a case like key = 1 AND key > 2, we can
843 			 * set qual_ok to false and abandon further processing.
844 			 *
845 			 * We also have to deal with the case of "key IS NULL", which is
846 			 * unsatisfiable in combination with any other index condition. By
847 			 * the time we get here, that's been classified as an equality
848 			 * check, and we've rejected any combination of it with a regular
849 			 * equality condition; but not with other types of conditions.
850 			 */
851 			if (xform[BTEqualStrategyNumber - 1])
852 			{
853 				ScanKey		eq = xform[BTEqualStrategyNumber - 1];
854 
855 				for (j = BTMaxStrategyNumber; --j >= 0;)
856 				{
857 					ScanKey		chk = xform[j];
858 
859 					if (!chk || j == (BTEqualStrategyNumber - 1))
860 						continue;
861 
862 					if (eq->sk_flags & SK_SEARCHNULL)
863 					{
864 						/* IS NULL is contradictory to anything else */
865 						so->qual_ok = false;
866 						return;
867 					}
868 
869 					if (_bt_compare_scankey_args(scan, chk, eq, chk,
870 												 &test_result))
871 					{
872 						if (!test_result)
873 						{
874 							/* keys proven mutually contradictory */
875 							so->qual_ok = false;
876 							return;
877 						}
878 						/* else discard the redundant non-equality key */
879 						xform[j] = NULL;
880 					}
881 					/* else, cannot determine redundancy, keep both keys */
882 				}
883 				/* track number of attrs for which we have "=" keys */
884 				numberOfEqualCols++;
885 			}
886 
887 			/* try to keep only one of <, <= */
888 			if (xform[BTLessStrategyNumber - 1]
889 				&& xform[BTLessEqualStrategyNumber - 1])
890 			{
891 				ScanKey		lt = xform[BTLessStrategyNumber - 1];
892 				ScanKey		le = xform[BTLessEqualStrategyNumber - 1];
893 
894 				if (_bt_compare_scankey_args(scan, le, lt, le,
895 											 &test_result))
896 				{
897 					if (test_result)
898 						xform[BTLessEqualStrategyNumber - 1] = NULL;
899 					else
900 						xform[BTLessStrategyNumber - 1] = NULL;
901 				}
902 			}
903 
904 			/* try to keep only one of >, >= */
905 			if (xform[BTGreaterStrategyNumber - 1]
906 				&& xform[BTGreaterEqualStrategyNumber - 1])
907 			{
908 				ScanKey		gt = xform[BTGreaterStrategyNumber - 1];
909 				ScanKey		ge = xform[BTGreaterEqualStrategyNumber - 1];
910 
911 				if (_bt_compare_scankey_args(scan, ge, gt, ge,
912 											 &test_result))
913 				{
914 					if (test_result)
915 						xform[BTGreaterEqualStrategyNumber - 1] = NULL;
916 					else
917 						xform[BTGreaterStrategyNumber - 1] = NULL;
918 				}
919 			}
920 
921 			/*
922 			 * Emit the cleaned-up keys into the outkeys[] array, and then
923 			 * mark them if they are required.  They are required (possibly
924 			 * only in one direction) if all attrs before this one had "=".
925 			 */
926 			for (j = BTMaxStrategyNumber; --j >= 0;)
927 			{
928 				if (xform[j])
929 				{
930 					ScanKey		outkey = &outkeys[new_numberOfKeys++];
931 
932 					memcpy(outkey, xform[j], sizeof(ScanKeyData));
933 					if (priorNumberOfEqualCols == attno - 1)
934 						_bt_mark_scankey_required(outkey);
935 				}
936 			}
937 
938 			/*
939 			 * Exit loop here if done.
940 			 */
941 			if (i == numberOfKeys)
942 				break;
943 
944 			/* Re-initialize for new attno */
945 			attno = cur->sk_attno;
946 			memset(xform, 0, sizeof(xform));
947 		}
948 
949 		/* check strategy this key's operator corresponds to */
950 		j = cur->sk_strategy - 1;
951 
952 		/* if row comparison, push it directly to the output array */
953 		if (cur->sk_flags & SK_ROW_HEADER)
954 		{
955 			ScanKey		outkey = &outkeys[new_numberOfKeys++];
956 
957 			memcpy(outkey, cur, sizeof(ScanKeyData));
958 			if (numberOfEqualCols == attno - 1)
959 				_bt_mark_scankey_required(outkey);
960 
961 			/*
962 			 * We don't support RowCompare using equality; such a qual would
963 			 * mess up the numberOfEqualCols tracking.
964 			 */
965 			Assert(j != (BTEqualStrategyNumber - 1));
966 			continue;
967 		}
968 
969 		/* have we seen one of these before? */
970 		if (xform[j] == NULL)
971 		{
972 			/* nope, so remember this scankey */
973 			xform[j] = cur;
974 		}
975 		else
976 		{
977 			/* yup, keep only the more restrictive key */
978 			if (_bt_compare_scankey_args(scan, cur, cur, xform[j],
979 										 &test_result))
980 			{
981 				if (test_result)
982 					xform[j] = cur;
983 				else if (j == (BTEqualStrategyNumber - 1))
984 				{
985 					/* key == a && key == b, but a != b */
986 					so->qual_ok = false;
987 					return;
988 				}
989 				/* else old key is more restrictive, keep it */
990 			}
991 			else
992 			{
993 				/*
994 				 * We can't determine which key is more restrictive.  Keep the
995 				 * previous one in xform[j] and push this one directly to the
996 				 * output array.
997 				 */
998 				ScanKey		outkey = &outkeys[new_numberOfKeys++];
999 
1000 				memcpy(outkey, cur, sizeof(ScanKeyData));
1001 				if (numberOfEqualCols == attno - 1)
1002 					_bt_mark_scankey_required(outkey);
1003 			}
1004 		}
1005 	}
1006 
1007 	so->numberOfKeys = new_numberOfKeys;
1008 }
1009 
1010 /*
1011  * Compare two scankey values using a specified operator.
1012  *
1013  * The test we want to perform is logically "leftarg op rightarg", where
1014  * leftarg and rightarg are the sk_argument values in those ScanKeys, and
1015  * the comparison operator is the one in the op ScanKey.  However, in
1016  * cross-data-type situations we may need to look up the correct operator in
1017  * the index's opfamily: it is the one having amopstrategy = op->sk_strategy
1018  * and amoplefttype/amoprighttype equal to the two argument datatypes.
1019  *
1020  * If the opfamily doesn't supply a complete set of cross-type operators we
1021  * may not be able to make the comparison.  If we can make the comparison
1022  * we store the operator result in *result and return true.  We return false
1023  * if the comparison could not be made.
1024  *
1025  * Note: op always points at the same ScanKey as either leftarg or rightarg.
1026  * Since we don't scribble on the scankeys, this aliasing should cause no
1027  * trouble.
1028  *
1029  * Note: this routine needs to be insensitive to any DESC option applied
1030  * to the index column.  For example, "x < 4" is a tighter constraint than
1031  * "x < 5" regardless of which way the index is sorted.
1032  */
1033 static bool
_bt_compare_scankey_args(IndexScanDesc scan,ScanKey op,ScanKey leftarg,ScanKey rightarg,bool * result)1034 _bt_compare_scankey_args(IndexScanDesc scan, ScanKey op,
1035 						 ScanKey leftarg, ScanKey rightarg,
1036 						 bool *result)
1037 {
1038 	Relation	rel = scan->indexRelation;
1039 	Oid			lefttype,
1040 				righttype,
1041 				optype,
1042 				opcintype,
1043 				cmp_op;
1044 	StrategyNumber strat;
1045 
1046 	/*
1047 	 * First, deal with cases where one or both args are NULL.  This should
1048 	 * only happen when the scankeys represent IS NULL/NOT NULL conditions.
1049 	 */
1050 	if ((leftarg->sk_flags | rightarg->sk_flags) & SK_ISNULL)
1051 	{
1052 		bool		leftnull,
1053 					rightnull;
1054 
1055 		if (leftarg->sk_flags & SK_ISNULL)
1056 		{
1057 			Assert(leftarg->sk_flags & (SK_SEARCHNULL | SK_SEARCHNOTNULL));
1058 			leftnull = true;
1059 		}
1060 		else
1061 			leftnull = false;
1062 		if (rightarg->sk_flags & SK_ISNULL)
1063 		{
1064 			Assert(rightarg->sk_flags & (SK_SEARCHNULL | SK_SEARCHNOTNULL));
1065 			rightnull = true;
1066 		}
1067 		else
1068 			rightnull = false;
1069 
1070 		/*
1071 		 * We treat NULL as either greater than or less than all other values.
1072 		 * Since true > false, the tests below work correctly for NULLS LAST
1073 		 * logic.  If the index is NULLS FIRST, we need to flip the strategy.
1074 		 */
1075 		strat = op->sk_strategy;
1076 		if (op->sk_flags & SK_BT_NULLS_FIRST)
1077 			strat = BTCommuteStrategyNumber(strat);
1078 
1079 		switch (strat)
1080 		{
1081 			case BTLessStrategyNumber:
1082 				*result = (leftnull < rightnull);
1083 				break;
1084 			case BTLessEqualStrategyNumber:
1085 				*result = (leftnull <= rightnull);
1086 				break;
1087 			case BTEqualStrategyNumber:
1088 				*result = (leftnull == rightnull);
1089 				break;
1090 			case BTGreaterEqualStrategyNumber:
1091 				*result = (leftnull >= rightnull);
1092 				break;
1093 			case BTGreaterStrategyNumber:
1094 				*result = (leftnull > rightnull);
1095 				break;
1096 			default:
1097 				elog(ERROR, "unrecognized StrategyNumber: %d", (int) strat);
1098 				*result = false;	/* keep compiler quiet */
1099 				break;
1100 		}
1101 		return true;
1102 	}
1103 
1104 	/*
1105 	 * The opfamily we need to worry about is identified by the index column.
1106 	 */
1107 	Assert(leftarg->sk_attno == rightarg->sk_attno);
1108 
1109 	opcintype = rel->rd_opcintype[leftarg->sk_attno - 1];
1110 
1111 	/*
1112 	 * Determine the actual datatypes of the ScanKey arguments.  We have to
1113 	 * support the convention that sk_subtype == InvalidOid means the opclass
1114 	 * input type; this is a hack to simplify life for ScanKeyInit().
1115 	 */
1116 	lefttype = leftarg->sk_subtype;
1117 	if (lefttype == InvalidOid)
1118 		lefttype = opcintype;
1119 	righttype = rightarg->sk_subtype;
1120 	if (righttype == InvalidOid)
1121 		righttype = opcintype;
1122 	optype = op->sk_subtype;
1123 	if (optype == InvalidOid)
1124 		optype = opcintype;
1125 
1126 	/*
1127 	 * If leftarg and rightarg match the types expected for the "op" scankey,
1128 	 * we can use its already-looked-up comparison function.
1129 	 */
1130 	if (lefttype == opcintype && righttype == optype)
1131 	{
1132 		*result = DatumGetBool(FunctionCall2Coll(&op->sk_func,
1133 												 op->sk_collation,
1134 												 leftarg->sk_argument,
1135 												 rightarg->sk_argument));
1136 		return true;
1137 	}
1138 
1139 	/*
1140 	 * Otherwise, we need to go to the syscache to find the appropriate
1141 	 * operator.  (This cannot result in infinite recursion, since no
1142 	 * indexscan initiated by syscache lookup will use cross-data-type
1143 	 * operators.)
1144 	 *
1145 	 * If the sk_strategy was flipped by _bt_fix_scankey_strategy, we have to
1146 	 * un-flip it to get the correct opfamily member.
1147 	 */
1148 	strat = op->sk_strategy;
1149 	if (op->sk_flags & SK_BT_DESC)
1150 		strat = BTCommuteStrategyNumber(strat);
1151 
1152 	cmp_op = get_opfamily_member(rel->rd_opfamily[leftarg->sk_attno - 1],
1153 								 lefttype,
1154 								 righttype,
1155 								 strat);
1156 	if (OidIsValid(cmp_op))
1157 	{
1158 		RegProcedure cmp_proc = get_opcode(cmp_op);
1159 
1160 		if (RegProcedureIsValid(cmp_proc))
1161 		{
1162 			*result = DatumGetBool(OidFunctionCall2Coll(cmp_proc,
1163 														op->sk_collation,
1164 														leftarg->sk_argument,
1165 														rightarg->sk_argument));
1166 			return true;
1167 		}
1168 	}
1169 
1170 	/* Can't make the comparison */
1171 	*result = false;			/* suppress compiler warnings */
1172 	return false;
1173 }
1174 
1175 /*
1176  * Adjust a scankey's strategy and flags setting as needed for indoptions.
1177  *
1178  * We copy the appropriate indoption value into the scankey sk_flags
1179  * (shifting to avoid clobbering system-defined flag bits).  Also, if
1180  * the DESC option is set, commute (flip) the operator strategy number.
1181  *
1182  * A secondary purpose is to check for IS NULL/NOT NULL scankeys and set up
1183  * the strategy field correctly for them.
1184  *
1185  * Lastly, for ordinary scankeys (not IS NULL/NOT NULL), we check for a
1186  * NULL comparison value.  Since all btree operators are assumed strict,
1187  * a NULL means that the qual cannot be satisfied.  We return true if the
1188  * comparison value isn't NULL, or false if the scan should be abandoned.
1189  *
1190  * This function is applied to the *input* scankey structure; therefore
1191  * on a rescan we will be looking at already-processed scankeys.  Hence
1192  * we have to be careful not to re-commute the strategy if we already did it.
1193  * It's a bit ugly to modify the caller's copy of the scankey but in practice
1194  * there shouldn't be any problem, since the index's indoptions are certainly
1195  * not going to change while the scankey survives.
1196  */
1197 static bool
_bt_fix_scankey_strategy(ScanKey skey,int16 * indoption)1198 _bt_fix_scankey_strategy(ScanKey skey, int16 *indoption)
1199 {
1200 	int			addflags;
1201 
1202 	addflags = indoption[skey->sk_attno - 1] << SK_BT_INDOPTION_SHIFT;
1203 
1204 	/*
1205 	 * We treat all btree operators as strict (even if they're not so marked
1206 	 * in pg_proc). This means that it is impossible for an operator condition
1207 	 * with a NULL comparison constant to succeed, and we can reject it right
1208 	 * away.
1209 	 *
1210 	 * However, we now also support "x IS NULL" clauses as search conditions,
1211 	 * so in that case keep going. The planner has not filled in any
1212 	 * particular strategy in this case, so set it to BTEqualStrategyNumber
1213 	 * --- we can treat IS NULL as an equality operator for purposes of search
1214 	 * strategy.
1215 	 *
1216 	 * Likewise, "x IS NOT NULL" is supported.  We treat that as either "less
1217 	 * than NULL" in a NULLS LAST index, or "greater than NULL" in a NULLS
1218 	 * FIRST index.
1219 	 *
1220 	 * Note: someday we might have to fill in sk_collation from the index
1221 	 * column's collation.  At the moment this is a non-issue because we'll
1222 	 * never actually call the comparison operator on a NULL.
1223 	 */
1224 	if (skey->sk_flags & SK_ISNULL)
1225 	{
1226 		/* SK_ISNULL shouldn't be set in a row header scankey */
1227 		Assert(!(skey->sk_flags & SK_ROW_HEADER));
1228 
1229 		/* Set indoption flags in scankey (might be done already) */
1230 		skey->sk_flags |= addflags;
1231 
1232 		/* Set correct strategy for IS NULL or NOT NULL search */
1233 		if (skey->sk_flags & SK_SEARCHNULL)
1234 		{
1235 			skey->sk_strategy = BTEqualStrategyNumber;
1236 			skey->sk_subtype = InvalidOid;
1237 			skey->sk_collation = InvalidOid;
1238 		}
1239 		else if (skey->sk_flags & SK_SEARCHNOTNULL)
1240 		{
1241 			if (skey->sk_flags & SK_BT_NULLS_FIRST)
1242 				skey->sk_strategy = BTGreaterStrategyNumber;
1243 			else
1244 				skey->sk_strategy = BTLessStrategyNumber;
1245 			skey->sk_subtype = InvalidOid;
1246 			skey->sk_collation = InvalidOid;
1247 		}
1248 		else
1249 		{
1250 			/* regular qual, so it cannot be satisfied */
1251 			return false;
1252 		}
1253 
1254 		/* Needn't do the rest */
1255 		return true;
1256 	}
1257 
1258 	/* Adjust strategy for DESC, if we didn't already */
1259 	if ((addflags & SK_BT_DESC) && !(skey->sk_flags & SK_BT_DESC))
1260 		skey->sk_strategy = BTCommuteStrategyNumber(skey->sk_strategy);
1261 	skey->sk_flags |= addflags;
1262 
1263 	/* If it's a row header, fix row member flags and strategies similarly */
1264 	if (skey->sk_flags & SK_ROW_HEADER)
1265 	{
1266 		ScanKey		subkey = (ScanKey) DatumGetPointer(skey->sk_argument);
1267 
1268 		for (;;)
1269 		{
1270 			Assert(subkey->sk_flags & SK_ROW_MEMBER);
1271 			addflags = indoption[subkey->sk_attno - 1] << SK_BT_INDOPTION_SHIFT;
1272 			if ((addflags & SK_BT_DESC) && !(subkey->sk_flags & SK_BT_DESC))
1273 				subkey->sk_strategy = BTCommuteStrategyNumber(subkey->sk_strategy);
1274 			subkey->sk_flags |= addflags;
1275 			if (subkey->sk_flags & SK_ROW_END)
1276 				break;
1277 			subkey++;
1278 		}
1279 	}
1280 
1281 	return true;
1282 }
1283 
1284 /*
1285  * Mark a scankey as "required to continue the scan".
1286  *
1287  * Depending on the operator type, the key may be required for both scan
1288  * directions or just one.  Also, if the key is a row comparison header,
1289  * we have to mark its first subsidiary ScanKey as required.  (Subsequent
1290  * subsidiary ScanKeys are normally for lower-order columns, and thus
1291  * cannot be required, since they're after the first non-equality scankey.)
1292  *
1293  * Note: when we set required-key flag bits in a subsidiary scankey, we are
1294  * scribbling on a data structure belonging to the index AM's caller, not on
1295  * our private copy.  This should be OK because the marking will not change
1296  * from scan to scan within a query, and so we'd just re-mark the same way
1297  * anyway on a rescan.  Something to keep an eye on though.
1298  */
1299 static void
_bt_mark_scankey_required(ScanKey skey)1300 _bt_mark_scankey_required(ScanKey skey)
1301 {
1302 	int			addflags;
1303 
1304 	switch (skey->sk_strategy)
1305 	{
1306 		case BTLessStrategyNumber:
1307 		case BTLessEqualStrategyNumber:
1308 			addflags = SK_BT_REQFWD;
1309 			break;
1310 		case BTEqualStrategyNumber:
1311 			addflags = SK_BT_REQFWD | SK_BT_REQBKWD;
1312 			break;
1313 		case BTGreaterEqualStrategyNumber:
1314 		case BTGreaterStrategyNumber:
1315 			addflags = SK_BT_REQBKWD;
1316 			break;
1317 		default:
1318 			elog(ERROR, "unrecognized StrategyNumber: %d",
1319 				 (int) skey->sk_strategy);
1320 			addflags = 0;		/* keep compiler quiet */
1321 			break;
1322 	}
1323 
1324 	skey->sk_flags |= addflags;
1325 
1326 	if (skey->sk_flags & SK_ROW_HEADER)
1327 	{
1328 		ScanKey		subkey = (ScanKey) DatumGetPointer(skey->sk_argument);
1329 
1330 		/* First subkey should be same column/operator as the header */
1331 		Assert(subkey->sk_flags & SK_ROW_MEMBER);
1332 		Assert(subkey->sk_attno == skey->sk_attno);
1333 		Assert(subkey->sk_strategy == skey->sk_strategy);
1334 		subkey->sk_flags |= addflags;
1335 	}
1336 }
1337 
1338 /*
1339  * Test whether an indextuple satisfies all the scankey conditions.
1340  *
1341  * Return true if so, false if not.  If the tuple fails to pass the qual,
1342  * we also determine whether there's any need to continue the scan beyond
1343  * this tuple, and set *continuescan accordingly.  See comments for
1344  * _bt_preprocess_keys(), above, about how this is done.
1345  *
1346  * Forward scan callers can pass a high key tuple in the hopes of having
1347  * us set *continuescan to false, and avoiding an unnecessary visit to
1348  * the page to the right.
1349  *
1350  * scan: index scan descriptor (containing a search-type scankey)
1351  * tuple: index tuple to test
1352  * tupnatts: number of attributes in tupnatts (high key may be truncated)
1353  * dir: direction we are scanning in
1354  * continuescan: output parameter (will be set correctly in all cases)
1355  */
1356 bool
_bt_checkkeys(IndexScanDesc scan,IndexTuple tuple,int tupnatts,ScanDirection dir,bool * continuescan)1357 _bt_checkkeys(IndexScanDesc scan, IndexTuple tuple, int tupnatts,
1358 			  ScanDirection dir, bool *continuescan)
1359 {
1360 	TupleDesc	tupdesc;
1361 	BTScanOpaque so;
1362 	int			keysz;
1363 	int			ikey;
1364 	ScanKey		key;
1365 
1366 	Assert(BTreeTupleGetNAtts(tuple, scan->indexRelation) == tupnatts);
1367 
1368 	*continuescan = true;		/* default assumption */
1369 
1370 	tupdesc = RelationGetDescr(scan->indexRelation);
1371 	so = (BTScanOpaque) scan->opaque;
1372 	keysz = so->numberOfKeys;
1373 
1374 	for (key = so->keyData, ikey = 0; ikey < keysz; key++, ikey++)
1375 	{
1376 		Datum		datum;
1377 		bool		isNull;
1378 		Datum		test;
1379 
1380 		if (key->sk_attno > tupnatts)
1381 		{
1382 			/*
1383 			 * This attribute is truncated (must be high key).  The value for
1384 			 * this attribute in the first non-pivot tuple on the page to the
1385 			 * right could be any possible value.  Assume that truncated
1386 			 * attribute passes the qual.
1387 			 */
1388 			Assert(ScanDirectionIsForward(dir));
1389 			continue;
1390 		}
1391 
1392 		/* row-comparison keys need special processing */
1393 		if (key->sk_flags & SK_ROW_HEADER)
1394 		{
1395 			if (_bt_check_rowcompare(key, tuple, tupnatts, tupdesc, dir,
1396 									 continuescan))
1397 				continue;
1398 			return false;
1399 		}
1400 
1401 		datum = index_getattr(tuple,
1402 							  key->sk_attno,
1403 							  tupdesc,
1404 							  &isNull);
1405 
1406 		if (key->sk_flags & SK_ISNULL)
1407 		{
1408 			/* Handle IS NULL/NOT NULL tests */
1409 			if (key->sk_flags & SK_SEARCHNULL)
1410 			{
1411 				if (isNull)
1412 					continue;	/* tuple satisfies this qual */
1413 			}
1414 			else
1415 			{
1416 				Assert(key->sk_flags & SK_SEARCHNOTNULL);
1417 				if (!isNull)
1418 					continue;	/* tuple satisfies this qual */
1419 			}
1420 
1421 			/*
1422 			 * Tuple fails this qual.  If it's a required qual for the current
1423 			 * scan direction, then we can conclude no further tuples will
1424 			 * pass, either.
1425 			 */
1426 			if ((key->sk_flags & SK_BT_REQFWD) &&
1427 				ScanDirectionIsForward(dir))
1428 				*continuescan = false;
1429 			else if ((key->sk_flags & SK_BT_REQBKWD) &&
1430 					 ScanDirectionIsBackward(dir))
1431 				*continuescan = false;
1432 
1433 			/*
1434 			 * In any case, this indextuple doesn't match the qual.
1435 			 */
1436 			return false;
1437 		}
1438 
1439 		if (isNull)
1440 		{
1441 			if (key->sk_flags & SK_BT_NULLS_FIRST)
1442 			{
1443 				/*
1444 				 * Since NULLs are sorted before non-NULLs, we know we have
1445 				 * reached the lower limit of the range of values for this
1446 				 * index attr.  On a backward scan, we can stop if this qual
1447 				 * is one of the "must match" subset.  We can stop regardless
1448 				 * of whether the qual is > or <, so long as it's required,
1449 				 * because it's not possible for any future tuples to pass. On
1450 				 * a forward scan, however, we must keep going, because we may
1451 				 * have initially positioned to the start of the index.
1452 				 */
1453 				if ((key->sk_flags & (SK_BT_REQFWD | SK_BT_REQBKWD)) &&
1454 					ScanDirectionIsBackward(dir))
1455 					*continuescan = false;
1456 			}
1457 			else
1458 			{
1459 				/*
1460 				 * Since NULLs are sorted after non-NULLs, we know we have
1461 				 * reached the upper limit of the range of values for this
1462 				 * index attr.  On a forward scan, we can stop if this qual is
1463 				 * one of the "must match" subset.  We can stop regardless of
1464 				 * whether the qual is > or <, so long as it's required,
1465 				 * because it's not possible for any future tuples to pass. On
1466 				 * a backward scan, however, we must keep going, because we
1467 				 * may have initially positioned to the end of the index.
1468 				 */
1469 				if ((key->sk_flags & (SK_BT_REQFWD | SK_BT_REQBKWD)) &&
1470 					ScanDirectionIsForward(dir))
1471 					*continuescan = false;
1472 			}
1473 
1474 			/*
1475 			 * In any case, this indextuple doesn't match the qual.
1476 			 */
1477 			return false;
1478 		}
1479 
1480 		test = FunctionCall2Coll(&key->sk_func, key->sk_collation,
1481 								 datum, key->sk_argument);
1482 
1483 		if (!DatumGetBool(test))
1484 		{
1485 			/*
1486 			 * Tuple fails this qual.  If it's a required qual for the current
1487 			 * scan direction, then we can conclude no further tuples will
1488 			 * pass, either.
1489 			 *
1490 			 * Note: because we stop the scan as soon as any required equality
1491 			 * qual fails, it is critical that equality quals be used for the
1492 			 * initial positioning in _bt_first() when they are available. See
1493 			 * comments in _bt_first().
1494 			 */
1495 			if ((key->sk_flags & SK_BT_REQFWD) &&
1496 				ScanDirectionIsForward(dir))
1497 				*continuescan = false;
1498 			else if ((key->sk_flags & SK_BT_REQBKWD) &&
1499 					 ScanDirectionIsBackward(dir))
1500 				*continuescan = false;
1501 
1502 			/*
1503 			 * In any case, this indextuple doesn't match the qual.
1504 			 */
1505 			return false;
1506 		}
1507 	}
1508 
1509 	/* If we get here, the tuple passes all index quals. */
1510 	return true;
1511 }
1512 
1513 /*
1514  * Test whether an indextuple satisfies a row-comparison scan condition.
1515  *
1516  * Return true if so, false if not.  If not, also clear *continuescan if
1517  * it's not possible for any future tuples in the current scan direction
1518  * to pass the qual.
1519  *
1520  * This is a subroutine for _bt_checkkeys, which see for more info.
1521  */
1522 static bool
_bt_check_rowcompare(ScanKey skey,IndexTuple tuple,int tupnatts,TupleDesc tupdesc,ScanDirection dir,bool * continuescan)1523 _bt_check_rowcompare(ScanKey skey, IndexTuple tuple, int tupnatts,
1524 					 TupleDesc tupdesc, ScanDirection dir, bool *continuescan)
1525 {
1526 	ScanKey		subkey = (ScanKey) DatumGetPointer(skey->sk_argument);
1527 	int32		cmpresult = 0;
1528 	bool		result;
1529 
1530 	/* First subkey should be same as the header says */
1531 	Assert(subkey->sk_attno == skey->sk_attno);
1532 
1533 	/* Loop over columns of the row condition */
1534 	for (;;)
1535 	{
1536 		Datum		datum;
1537 		bool		isNull;
1538 
1539 		Assert(subkey->sk_flags & SK_ROW_MEMBER);
1540 
1541 		if (subkey->sk_attno > tupnatts)
1542 		{
1543 			/*
1544 			 * This attribute is truncated (must be high key).  The value for
1545 			 * this attribute in the first non-pivot tuple on the page to the
1546 			 * right could be any possible value.  Assume that truncated
1547 			 * attribute passes the qual.
1548 			 */
1549 			Assert(ScanDirectionIsForward(dir));
1550 			cmpresult = 0;
1551 			if (subkey->sk_flags & SK_ROW_END)
1552 				break;
1553 			subkey++;
1554 			continue;
1555 		}
1556 
1557 		datum = index_getattr(tuple,
1558 							  subkey->sk_attno,
1559 							  tupdesc,
1560 							  &isNull);
1561 
1562 		if (isNull)
1563 		{
1564 			if (subkey->sk_flags & SK_BT_NULLS_FIRST)
1565 			{
1566 				/*
1567 				 * Since NULLs are sorted before non-NULLs, we know we have
1568 				 * reached the lower limit of the range of values for this
1569 				 * index attr.  On a backward scan, we can stop if this qual
1570 				 * is one of the "must match" subset.  We can stop regardless
1571 				 * of whether the qual is > or <, so long as it's required,
1572 				 * because it's not possible for any future tuples to pass. On
1573 				 * a forward scan, however, we must keep going, because we may
1574 				 * have initially positioned to the start of the index.
1575 				 */
1576 				if ((subkey->sk_flags & (SK_BT_REQFWD | SK_BT_REQBKWD)) &&
1577 					ScanDirectionIsBackward(dir))
1578 					*continuescan = false;
1579 			}
1580 			else
1581 			{
1582 				/*
1583 				 * Since NULLs are sorted after non-NULLs, we know we have
1584 				 * reached the upper limit of the range of values for this
1585 				 * index attr.  On a forward scan, we can stop if this qual is
1586 				 * one of the "must match" subset.  We can stop regardless of
1587 				 * whether the qual is > or <, so long as it's required,
1588 				 * because it's not possible for any future tuples to pass. On
1589 				 * a backward scan, however, we must keep going, because we
1590 				 * may have initially positioned to the end of the index.
1591 				 */
1592 				if ((subkey->sk_flags & (SK_BT_REQFWD | SK_BT_REQBKWD)) &&
1593 					ScanDirectionIsForward(dir))
1594 					*continuescan = false;
1595 			}
1596 
1597 			/*
1598 			 * In any case, this indextuple doesn't match the qual.
1599 			 */
1600 			return false;
1601 		}
1602 
1603 		if (subkey->sk_flags & SK_ISNULL)
1604 		{
1605 			/*
1606 			 * Unlike the simple-scankey case, this isn't a disallowed case.
1607 			 * But it can never match.  If all the earlier row comparison
1608 			 * columns are required for the scan direction, we can stop the
1609 			 * scan, because there can't be another tuple that will succeed.
1610 			 */
1611 			if (subkey != (ScanKey) DatumGetPointer(skey->sk_argument))
1612 				subkey--;
1613 			if ((subkey->sk_flags & SK_BT_REQFWD) &&
1614 				ScanDirectionIsForward(dir))
1615 				*continuescan = false;
1616 			else if ((subkey->sk_flags & SK_BT_REQBKWD) &&
1617 					 ScanDirectionIsBackward(dir))
1618 				*continuescan = false;
1619 			return false;
1620 		}
1621 
1622 		/* Perform the test --- three-way comparison not bool operator */
1623 		cmpresult = DatumGetInt32(FunctionCall2Coll(&subkey->sk_func,
1624 													subkey->sk_collation,
1625 													datum,
1626 													subkey->sk_argument));
1627 
1628 		if (subkey->sk_flags & SK_BT_DESC)
1629 			INVERT_COMPARE_RESULT(cmpresult);
1630 
1631 		/* Done comparing if unequal, else advance to next column */
1632 		if (cmpresult != 0)
1633 			break;
1634 
1635 		if (subkey->sk_flags & SK_ROW_END)
1636 			break;
1637 		subkey++;
1638 	}
1639 
1640 	/*
1641 	 * At this point cmpresult indicates the overall result of the row
1642 	 * comparison, and subkey points to the deciding column (or the last
1643 	 * column if the result is "=").
1644 	 */
1645 	switch (subkey->sk_strategy)
1646 	{
1647 			/* EQ and NE cases aren't allowed here */
1648 		case BTLessStrategyNumber:
1649 			result = (cmpresult < 0);
1650 			break;
1651 		case BTLessEqualStrategyNumber:
1652 			result = (cmpresult <= 0);
1653 			break;
1654 		case BTGreaterEqualStrategyNumber:
1655 			result = (cmpresult >= 0);
1656 			break;
1657 		case BTGreaterStrategyNumber:
1658 			result = (cmpresult > 0);
1659 			break;
1660 		default:
1661 			elog(ERROR, "unrecognized RowCompareType: %d",
1662 				 (int) subkey->sk_strategy);
1663 			result = 0;			/* keep compiler quiet */
1664 			break;
1665 	}
1666 
1667 	if (!result)
1668 	{
1669 		/*
1670 		 * Tuple fails this qual.  If it's a required qual for the current
1671 		 * scan direction, then we can conclude no further tuples will pass,
1672 		 * either.  Note we have to look at the deciding column, not
1673 		 * necessarily the first or last column of the row condition.
1674 		 */
1675 		if ((subkey->sk_flags & SK_BT_REQFWD) &&
1676 			ScanDirectionIsForward(dir))
1677 			*continuescan = false;
1678 		else if ((subkey->sk_flags & SK_BT_REQBKWD) &&
1679 				 ScanDirectionIsBackward(dir))
1680 			*continuescan = false;
1681 	}
1682 
1683 	return result;
1684 }
1685 
1686 /*
1687  * _bt_killitems - set LP_DEAD state for items an indexscan caller has
1688  * told us were killed
1689  *
1690  * scan->opaque, referenced locally through so, contains information about the
1691  * current page and killed tuples thereon (generally, this should only be
1692  * called if so->numKilled > 0).
1693  *
1694  * The caller does not have a lock on the page and may or may not have the
1695  * page pinned in a buffer.  Note that read-lock is sufficient for setting
1696  * LP_DEAD status (which is only a hint).
1697  *
1698  * We match items by heap TID before assuming they are the right ones to
1699  * delete.  We cope with cases where items have moved right due to insertions.
1700  * If an item has moved off the current page due to a split, we'll fail to
1701  * find it and do nothing (this is not an error case --- we assume the item
1702  * will eventually get marked in a future indexscan).
1703  *
1704  * Note that if we hold a pin on the target page continuously from initially
1705  * reading the items until applying this function, VACUUM cannot have deleted
1706  * any items from the page, and so there is no need to search left from the
1707  * recorded offset.  (This observation also guarantees that the item is still
1708  * the right one to delete, which might otherwise be questionable since heap
1709  * TIDs can get recycled.)	This holds true even if the page has been modified
1710  * by inserts and page splits, so there is no need to consult the LSN.
1711  *
1712  * If the pin was released after reading the page, then we re-read it.  If it
1713  * has been modified since we read it (as determined by the LSN), we dare not
1714  * flag any entries because it is possible that the old entry was vacuumed
1715  * away and the TID was re-used by a completely different heap tuple.
1716  */
1717 void
_bt_killitems(IndexScanDesc scan)1718 _bt_killitems(IndexScanDesc scan)
1719 {
1720 	BTScanOpaque so = (BTScanOpaque) scan->opaque;
1721 	Page		page;
1722 	BTPageOpaque opaque;
1723 	OffsetNumber minoff;
1724 	OffsetNumber maxoff;
1725 	int			i;
1726 	int			numKilled = so->numKilled;
1727 	bool		killedsomething = false;
1728 
1729 	Assert(BTScanPosIsValid(so->currPos));
1730 
1731 	/*
1732 	 * Always reset the scan state, so we don't look for same items on other
1733 	 * pages.
1734 	 */
1735 	so->numKilled = 0;
1736 
1737 	if (BTScanPosIsPinned(so->currPos))
1738 	{
1739 		/*
1740 		 * We have held the pin on this page since we read the index tuples,
1741 		 * so all we need to do is lock it.  The pin will have prevented
1742 		 * re-use of any TID on the page, so there is no need to check the
1743 		 * LSN.
1744 		 */
1745 		LockBuffer(so->currPos.buf, BT_READ);
1746 
1747 		page = BufferGetPage(so->currPos.buf);
1748 	}
1749 	else
1750 	{
1751 		Buffer		buf;
1752 
1753 		/* Attempt to re-read the buffer, getting pin and lock. */
1754 		buf = _bt_getbuf(scan->indexRelation, so->currPos.currPage, BT_READ);
1755 
1756 		/* It might not exist anymore; in which case we can't hint it. */
1757 		if (!BufferIsValid(buf))
1758 			return;
1759 
1760 		page = BufferGetPage(buf);
1761 		if (BufferGetLSNAtomic(buf) == so->currPos.lsn)
1762 			so->currPos.buf = buf;
1763 		else
1764 		{
1765 			/* Modified while not pinned means hinting is not safe. */
1766 			_bt_relbuf(scan->indexRelation, buf);
1767 			return;
1768 		}
1769 	}
1770 
1771 	opaque = (BTPageOpaque) PageGetSpecialPointer(page);
1772 	minoff = P_FIRSTDATAKEY(opaque);
1773 	maxoff = PageGetMaxOffsetNumber(page);
1774 
1775 	for (i = 0; i < numKilled; i++)
1776 	{
1777 		int			itemIndex = so->killedItems[i];
1778 		BTScanPosItem *kitem = &so->currPos.items[itemIndex];
1779 		OffsetNumber offnum = kitem->indexOffset;
1780 
1781 		Assert(itemIndex >= so->currPos.firstItem &&
1782 			   itemIndex <= so->currPos.lastItem);
1783 		if (offnum < minoff)
1784 			continue;			/* pure paranoia */
1785 		while (offnum <= maxoff)
1786 		{
1787 			ItemId		iid = PageGetItemId(page, offnum);
1788 			IndexTuple	ituple = (IndexTuple) PageGetItem(page, iid);
1789 
1790 			if (ItemPointerEquals(&ituple->t_tid, &kitem->heapTid))
1791 			{
1792 				/*
1793 				 * Found the item.  Mark it as dead, if it isn't already.
1794 				 * Since this happens while holding a buffer lock possibly in
1795 				 * shared mode, it's possible that multiple processes attempt
1796 				 * to do this simultaneously, leading to multiple full-page
1797 				 * images being sent to WAL (if wal_log_hints or data checksums
1798 				 * are enabled), which is undesirable.
1799 				 */
1800 				if (!ItemIdIsDead(iid))
1801 				{
1802 					ItemIdMarkDead(iid);
1803 					killedsomething = true;
1804 				}
1805 				break;			/* out of inner search loop */
1806 			}
1807 			offnum = OffsetNumberNext(offnum);
1808 		}
1809 	}
1810 
1811 	/*
1812 	 * Since this can be redone later if needed, mark as dirty hint.
1813 	 *
1814 	 * Whenever we mark anything LP_DEAD, we also set the page's
1815 	 * BTP_HAS_GARBAGE flag, which is likewise just a hint.
1816 	 */
1817 	if (killedsomething)
1818 	{
1819 		opaque->btpo_flags |= BTP_HAS_GARBAGE;
1820 		MarkBufferDirtyHint(so->currPos.buf, true);
1821 	}
1822 
1823 	LockBuffer(so->currPos.buf, BUFFER_LOCK_UNLOCK);
1824 }
1825 
1826 
1827 /*
1828  * The following routines manage a shared-memory area in which we track
1829  * assignment of "vacuum cycle IDs" to currently-active btree vacuuming
1830  * operations.  There is a single counter which increments each time we
1831  * start a vacuum to assign it a cycle ID.  Since multiple vacuums could
1832  * be active concurrently, we have to track the cycle ID for each active
1833  * vacuum; this requires at most MaxBackends entries (usually far fewer).
1834  * We assume at most one vacuum can be active for a given index.
1835  *
1836  * Access to the shared memory area is controlled by BtreeVacuumLock.
1837  * In principle we could use a separate lmgr locktag for each index,
1838  * but a single LWLock is much cheaper, and given the short time that
1839  * the lock is ever held, the concurrency hit should be minimal.
1840  */
1841 
1842 typedef struct BTOneVacInfo
1843 {
1844 	LockRelId	relid;			/* global identifier of an index */
1845 	BTCycleId	cycleid;		/* cycle ID for its active VACUUM */
1846 } BTOneVacInfo;
1847 
1848 typedef struct BTVacInfo
1849 {
1850 	BTCycleId	cycle_ctr;		/* cycle ID most recently assigned */
1851 	int			num_vacuums;	/* number of currently active VACUUMs */
1852 	int			max_vacuums;	/* allocated length of vacuums[] array */
1853 	BTOneVacInfo vacuums[FLEXIBLE_ARRAY_MEMBER];
1854 } BTVacInfo;
1855 
1856 static BTVacInfo *btvacinfo;
1857 
1858 
1859 /*
1860  * _bt_vacuum_cycleid --- get the active vacuum cycle ID for an index,
1861  *		or zero if there is no active VACUUM
1862  *
1863  * Note: for correct interlocking, the caller must already hold pin and
1864  * exclusive lock on each buffer it will store the cycle ID into.  This
1865  * ensures that even if a VACUUM starts immediately afterwards, it cannot
1866  * process those pages until the page split is complete.
1867  */
1868 BTCycleId
_bt_vacuum_cycleid(Relation rel)1869 _bt_vacuum_cycleid(Relation rel)
1870 {
1871 	BTCycleId	result = 0;
1872 	int			i;
1873 
1874 	/* Share lock is enough since this is a read-only operation */
1875 	LWLockAcquire(BtreeVacuumLock, LW_SHARED);
1876 
1877 	for (i = 0; i < btvacinfo->num_vacuums; i++)
1878 	{
1879 		BTOneVacInfo *vac = &btvacinfo->vacuums[i];
1880 
1881 		if (vac->relid.relId == rel->rd_lockInfo.lockRelId.relId &&
1882 			vac->relid.dbId == rel->rd_lockInfo.lockRelId.dbId)
1883 		{
1884 			result = vac->cycleid;
1885 			break;
1886 		}
1887 	}
1888 
1889 	LWLockRelease(BtreeVacuumLock);
1890 	return result;
1891 }
1892 
1893 /*
1894  * _bt_start_vacuum --- assign a cycle ID to a just-starting VACUUM operation
1895  *
1896  * Note: the caller must guarantee that it will eventually call
1897  * _bt_end_vacuum, else we'll permanently leak an array slot.  To ensure
1898  * that this happens even in elog(FATAL) scenarios, the appropriate coding
1899  * is not just a PG_TRY, but
1900  *		PG_ENSURE_ERROR_CLEANUP(_bt_end_vacuum_callback, PointerGetDatum(rel))
1901  */
1902 BTCycleId
_bt_start_vacuum(Relation rel)1903 _bt_start_vacuum(Relation rel)
1904 {
1905 	BTCycleId	result;
1906 	int			i;
1907 	BTOneVacInfo *vac;
1908 
1909 	LWLockAcquire(BtreeVacuumLock, LW_EXCLUSIVE);
1910 
1911 	/*
1912 	 * Assign the next cycle ID, being careful to avoid zero as well as the
1913 	 * reserved high values.
1914 	 */
1915 	result = ++(btvacinfo->cycle_ctr);
1916 	if (result == 0 || result > MAX_BT_CYCLE_ID)
1917 		result = btvacinfo->cycle_ctr = 1;
1918 
1919 	/* Let's just make sure there's no entry already for this index */
1920 	for (i = 0; i < btvacinfo->num_vacuums; i++)
1921 	{
1922 		vac = &btvacinfo->vacuums[i];
1923 		if (vac->relid.relId == rel->rd_lockInfo.lockRelId.relId &&
1924 			vac->relid.dbId == rel->rd_lockInfo.lockRelId.dbId)
1925 		{
1926 			/*
1927 			 * Unlike most places in the backend, we have to explicitly
1928 			 * release our LWLock before throwing an error.  This is because
1929 			 * we expect _bt_end_vacuum() to be called before transaction
1930 			 * abort cleanup can run to release LWLocks.
1931 			 */
1932 			LWLockRelease(BtreeVacuumLock);
1933 			elog(ERROR, "multiple active vacuums for index \"%s\"",
1934 				 RelationGetRelationName(rel));
1935 		}
1936 	}
1937 
1938 	/* OK, add an entry */
1939 	if (btvacinfo->num_vacuums >= btvacinfo->max_vacuums)
1940 	{
1941 		LWLockRelease(BtreeVacuumLock);
1942 		elog(ERROR, "out of btvacinfo slots");
1943 	}
1944 	vac = &btvacinfo->vacuums[btvacinfo->num_vacuums];
1945 	vac->relid = rel->rd_lockInfo.lockRelId;
1946 	vac->cycleid = result;
1947 	btvacinfo->num_vacuums++;
1948 
1949 	LWLockRelease(BtreeVacuumLock);
1950 	return result;
1951 }
1952 
1953 /*
1954  * _bt_end_vacuum --- mark a btree VACUUM operation as done
1955  *
1956  * Note: this is deliberately coded not to complain if no entry is found;
1957  * this allows the caller to put PG_TRY around the start_vacuum operation.
1958  */
1959 void
_bt_end_vacuum(Relation rel)1960 _bt_end_vacuum(Relation rel)
1961 {
1962 	int			i;
1963 
1964 	LWLockAcquire(BtreeVacuumLock, LW_EXCLUSIVE);
1965 
1966 	/* Find the array entry */
1967 	for (i = 0; i < btvacinfo->num_vacuums; i++)
1968 	{
1969 		BTOneVacInfo *vac = &btvacinfo->vacuums[i];
1970 
1971 		if (vac->relid.relId == rel->rd_lockInfo.lockRelId.relId &&
1972 			vac->relid.dbId == rel->rd_lockInfo.lockRelId.dbId)
1973 		{
1974 			/* Remove it by shifting down the last entry */
1975 			*vac = btvacinfo->vacuums[btvacinfo->num_vacuums - 1];
1976 			btvacinfo->num_vacuums--;
1977 			break;
1978 		}
1979 	}
1980 
1981 	LWLockRelease(BtreeVacuumLock);
1982 }
1983 
1984 /*
1985  * _bt_end_vacuum wrapped as an on_shmem_exit callback function
1986  */
1987 void
_bt_end_vacuum_callback(int code,Datum arg)1988 _bt_end_vacuum_callback(int code, Datum arg)
1989 {
1990 	_bt_end_vacuum((Relation) DatumGetPointer(arg));
1991 }
1992 
1993 /*
1994  * BTreeShmemSize --- report amount of shared memory space needed
1995  */
1996 Size
BTreeShmemSize(void)1997 BTreeShmemSize(void)
1998 {
1999 	Size		size;
2000 
2001 	size = offsetof(BTVacInfo, vacuums);
2002 	size = add_size(size, mul_size(MaxBackends, sizeof(BTOneVacInfo)));
2003 	return size;
2004 }
2005 
2006 /*
2007  * BTreeShmemInit --- initialize this module's shared memory
2008  */
2009 void
BTreeShmemInit(void)2010 BTreeShmemInit(void)
2011 {
2012 	bool		found;
2013 
2014 	btvacinfo = (BTVacInfo *) ShmemInitStruct("BTree Vacuum State",
2015 											  BTreeShmemSize(),
2016 											  &found);
2017 
2018 	if (!IsUnderPostmaster)
2019 	{
2020 		/* Initialize shared memory area */
2021 		Assert(!found);
2022 
2023 		/*
2024 		 * It doesn't really matter what the cycle counter starts at, but
2025 		 * having it always start the same doesn't seem good.  Seed with
2026 		 * low-order bits of time() instead.
2027 		 */
2028 		btvacinfo->cycle_ctr = (BTCycleId) time(NULL);
2029 
2030 		btvacinfo->num_vacuums = 0;
2031 		btvacinfo->max_vacuums = MaxBackends;
2032 	}
2033 	else
2034 		Assert(found);
2035 }
2036 
2037 bytea *
btoptions(Datum reloptions,bool validate)2038 btoptions(Datum reloptions, bool validate)
2039 {
2040 	return default_reloptions(reloptions, validate, RELOPT_KIND_BTREE);
2041 }
2042 
2043 /*
2044  *	btproperty() -- Check boolean properties of indexes.
2045  *
2046  * This is optional, but handling AMPROP_RETURNABLE here saves opening the rel
2047  * to call btcanreturn.
2048  */
2049 bool
btproperty(Oid index_oid,int attno,IndexAMProperty prop,const char * propname,bool * res,bool * isnull)2050 btproperty(Oid index_oid, int attno,
2051 		   IndexAMProperty prop, const char *propname,
2052 		   bool *res, bool *isnull)
2053 {
2054 	switch (prop)
2055 	{
2056 		case AMPROP_RETURNABLE:
2057 			/* answer only for columns, not AM or whole index */
2058 			if (attno == 0)
2059 				return false;
2060 			/* otherwise, btree can always return data */
2061 			*res = true;
2062 			return true;
2063 
2064 		default:
2065 			return false;		/* punt to generic code */
2066 	}
2067 }
2068 
2069 /*
2070  *	btbuildphasename() -- Return name of index build phase.
2071  */
2072 char *
btbuildphasename(int64 phasenum)2073 btbuildphasename(int64 phasenum)
2074 {
2075 	switch (phasenum)
2076 	{
2077 		case PROGRESS_CREATEIDX_SUBPHASE_INITIALIZE:
2078 			return "initializing";
2079 		case PROGRESS_BTREE_PHASE_INDEXBUILD_TABLESCAN:
2080 			return "scanning table";
2081 		case PROGRESS_BTREE_PHASE_PERFORMSORT_1:
2082 			return "sorting live tuples";
2083 		case PROGRESS_BTREE_PHASE_PERFORMSORT_2:
2084 			return "sorting dead tuples";
2085 		case PROGRESS_BTREE_PHASE_LEAF_LOAD:
2086 			return "loading tuples in tree";
2087 		default:
2088 			return NULL;
2089 	}
2090 }
2091 
2092 /*
2093  *	_bt_truncate() -- create tuple without unneeded suffix attributes.
2094  *
2095  * Returns truncated pivot index tuple allocated in caller's memory context,
2096  * with key attributes copied from caller's firstright argument.  If rel is
2097  * an INCLUDE index, non-key attributes will definitely be truncated away,
2098  * since they're not part of the key space.  More aggressive suffix
2099  * truncation can take place when it's clear that the returned tuple does not
2100  * need one or more suffix key attributes.  We only need to keep firstright
2101  * attributes up to and including the first non-lastleft-equal attribute.
2102  * Caller's insertion scankey is used to compare the tuples; the scankey's
2103  * argument values are not considered here.
2104  *
2105  * Sometimes this routine will return a new pivot tuple that takes up more
2106  * space than firstright, because a new heap TID attribute had to be added to
2107  * distinguish lastleft from firstright.  This should only happen when the
2108  * caller is in the process of splitting a leaf page that has many logical
2109  * duplicates, where it's unavoidable.
2110  *
2111  * Note that returned tuple's t_tid offset will hold the number of attributes
2112  * present, so the original item pointer offset is not represented.  Caller
2113  * should only change truncated tuple's downlink.  Note also that truncated
2114  * key attributes are treated as containing "minus infinity" values by
2115  * _bt_compare().
2116  *
2117  * In the worst case (when a heap TID is appended) the size of the returned
2118  * tuple is the size of the first right tuple plus an additional MAXALIGN()'d
2119  * item pointer.  This guarantee is important, since callers need to stay
2120  * under the 1/3 of a page restriction on tuple size.  If this routine is ever
2121  * taught to truncate within an attribute/datum, it will need to avoid
2122  * returning an enlarged tuple to caller when truncation + TOAST compression
2123  * ends up enlarging the final datum.
2124  */
2125 IndexTuple
_bt_truncate(Relation rel,IndexTuple lastleft,IndexTuple firstright,BTScanInsert itup_key)2126 _bt_truncate(Relation rel, IndexTuple lastleft, IndexTuple firstright,
2127 			 BTScanInsert itup_key)
2128 {
2129 	TupleDesc	itupdesc = RelationGetDescr(rel);
2130 	int16		natts = IndexRelationGetNumberOfAttributes(rel);
2131 	int16		nkeyatts = IndexRelationGetNumberOfKeyAttributes(rel);
2132 	int			keepnatts;
2133 	IndexTuple	pivot;
2134 	ItemPointer pivotheaptid;
2135 	Size		newsize;
2136 
2137 	/*
2138 	 * We should only ever truncate leaf index tuples.  It's never okay to
2139 	 * truncate a second time.
2140 	 */
2141 	Assert(BTreeTupleGetNAtts(lastleft, rel) == natts);
2142 	Assert(BTreeTupleGetNAtts(firstright, rel) == natts);
2143 
2144 	/* Determine how many attributes must be kept in truncated tuple */
2145 	keepnatts = _bt_keep_natts(rel, lastleft, firstright, itup_key);
2146 
2147 #ifdef DEBUG_NO_TRUNCATE
2148 	/* Force truncation to be ineffective for testing purposes */
2149 	keepnatts = nkeyatts + 1;
2150 #endif
2151 
2152 	if (keepnatts <= natts)
2153 	{
2154 		IndexTuple	tidpivot;
2155 
2156 		pivot = index_truncate_tuple(itupdesc, firstright,
2157 									 Min(keepnatts, nkeyatts));
2158 
2159 		/*
2160 		 * If there is a distinguishing key attribute within new pivot tuple,
2161 		 * there is no need to add an explicit heap TID attribute
2162 		 */
2163 		if (keepnatts <= nkeyatts)
2164 		{
2165 			BTreeTupleSetNAtts(pivot, keepnatts);
2166 			return pivot;
2167 		}
2168 
2169 		/*
2170 		 * Only truncation of non-key attributes was possible, since key
2171 		 * attributes are all equal.  It's necessary to add a heap TID
2172 		 * attribute to the new pivot tuple.
2173 		 */
2174 		Assert(natts != nkeyatts);
2175 		newsize = IndexTupleSize(pivot) + MAXALIGN(sizeof(ItemPointerData));
2176 		tidpivot = palloc0(newsize);
2177 		memcpy(tidpivot, pivot, IndexTupleSize(pivot));
2178 		/* cannot leak memory here */
2179 		pfree(pivot);
2180 		pivot = tidpivot;
2181 	}
2182 	else
2183 	{
2184 		/*
2185 		 * No truncation was possible, since key attributes are all equal.
2186 		 * It's necessary to add a heap TID attribute to the new pivot tuple.
2187 		 *
2188 		 * This path is only taken when rel is not an INCLUDE index.  It
2189 		 * avoids a second palloc0() by avoiding the index_truncate_tuple()
2190 		 * call completely.
2191 		 */
2192 		Assert(natts == nkeyatts);
2193 		newsize = IndexTupleSize(firstright) + MAXALIGN(sizeof(ItemPointerData));
2194 		pivot = palloc0(newsize);
2195 		memcpy(pivot, firstright, IndexTupleSize(firstright));
2196 	}
2197 
2198 	/*
2199 	 * We have to use heap TID as a unique-ifier in the new pivot tuple, since
2200 	 * no non-TID key attribute in the right item readily distinguishes the
2201 	 * right side of the split from the left side.  Use enlarged space that
2202 	 * holds a copy of first right tuple; place a heap TID value within the
2203 	 * extra space that remains at the end.
2204 	 *
2205 	 * nbtree conceptualizes this case as an inability to truncate away any
2206 	 * key attribute.  We must use an alternative representation of heap TID
2207 	 * within pivots because heap TID is only treated as an attribute within
2208 	 * nbtree (e.g., there is no pg_attribute entry).
2209 	 */
2210 	Assert(itup_key->heapkeyspace);
2211 	pivot->t_info &= ~INDEX_SIZE_MASK;
2212 	pivot->t_info |= newsize;
2213 
2214 	/*
2215 	 * Lehman & Yao use lastleft as the leaf high key in all cases, but don't
2216 	 * consider suffix truncation.  It seems like a good idea to follow that
2217 	 * example in cases where no truncation takes place -- use lastleft's heap
2218 	 * TID.  (This is also the closest value to negative infinity that's
2219 	 * legally usable.)
2220 	 */
2221 	pivotheaptid = (ItemPointer) ((char *) pivot + newsize -
2222 								  sizeof(ItemPointerData));
2223 	ItemPointerCopy(&lastleft->t_tid, pivotheaptid);
2224 
2225 	/*
2226 	 * Lehman and Yao require that the downlink to the right page, which is to
2227 	 * be inserted into the parent page in the second phase of a page split be
2228 	 * a strict lower bound on items on the right page, and a non-strict upper
2229 	 * bound for items on the left page.  Assert that heap TIDs follow these
2230 	 * invariants, since a heap TID value is apparently needed as a
2231 	 * tiebreaker.
2232 	 */
2233 #ifndef DEBUG_NO_TRUNCATE
2234 	Assert(ItemPointerCompare(&lastleft->t_tid, &firstright->t_tid) < 0);
2235 	Assert(ItemPointerCompare(pivotheaptid, &lastleft->t_tid) >= 0);
2236 	Assert(ItemPointerCompare(pivotheaptid, &firstright->t_tid) < 0);
2237 #else
2238 
2239 	/*
2240 	 * Those invariants aren't guaranteed to hold for lastleft + firstright
2241 	 * heap TID attribute values when they're considered here only because
2242 	 * DEBUG_NO_TRUNCATE is defined (a heap TID is probably not actually
2243 	 * needed as a tiebreaker).  DEBUG_NO_TRUNCATE must therefore use a heap
2244 	 * TID value that always works as a strict lower bound for items to the
2245 	 * right.  In particular, it must avoid using firstright's leading key
2246 	 * attribute values along with lastleft's heap TID value when lastleft's
2247 	 * TID happens to be greater than firstright's TID.
2248 	 */
2249 	ItemPointerCopy(&firstright->t_tid, pivotheaptid);
2250 
2251 	/*
2252 	 * Pivot heap TID should never be fully equal to firstright.  Note that
2253 	 * the pivot heap TID will still end up equal to lastleft's heap TID when
2254 	 * that's the only usable value.
2255 	 */
2256 	ItemPointerSetOffsetNumber(pivotheaptid,
2257 							   OffsetNumberPrev(ItemPointerGetOffsetNumber(pivotheaptid)));
2258 	Assert(ItemPointerCompare(pivotheaptid, &firstright->t_tid) < 0);
2259 #endif
2260 
2261 	BTreeTupleSetNAtts(pivot, nkeyatts);
2262 	BTreeTupleSetAltHeapTID(pivot);
2263 
2264 	return pivot;
2265 }
2266 
2267 /*
2268  * _bt_keep_natts - how many key attributes to keep when truncating.
2269  *
2270  * Caller provides two tuples that enclose a split point.  Caller's insertion
2271  * scankey is used to compare the tuples; the scankey's argument values are
2272  * not considered here.
2273  *
2274  * This can return a number of attributes that is one greater than the
2275  * number of key attributes for the index relation.  This indicates that the
2276  * caller must use a heap TID as a unique-ifier in new pivot tuple.
2277  */
2278 static int
_bt_keep_natts(Relation rel,IndexTuple lastleft,IndexTuple firstright,BTScanInsert itup_key)2279 _bt_keep_natts(Relation rel, IndexTuple lastleft, IndexTuple firstright,
2280 			   BTScanInsert itup_key)
2281 {
2282 	int			nkeyatts = IndexRelationGetNumberOfKeyAttributes(rel);
2283 	TupleDesc	itupdesc = RelationGetDescr(rel);
2284 	int			keepnatts;
2285 	ScanKey		scankey;
2286 
2287 	/*
2288 	 * Be consistent about the representation of BTREE_VERSION 2/3 tuples
2289 	 * across Postgres versions; don't allow new pivot tuples to have
2290 	 * truncated key attributes there.  _bt_compare() treats truncated key
2291 	 * attributes as having the value minus infinity, which would break
2292 	 * searches within !heapkeyspace indexes.
2293 	 */
2294 	if (!itup_key->heapkeyspace)
2295 	{
2296 		Assert(nkeyatts != IndexRelationGetNumberOfAttributes(rel));
2297 		return nkeyatts;
2298 	}
2299 
2300 	scankey = itup_key->scankeys;
2301 	keepnatts = 1;
2302 	for (int attnum = 1; attnum <= nkeyatts; attnum++, scankey++)
2303 	{
2304 		Datum		datum1,
2305 					datum2;
2306 		bool		isNull1,
2307 					isNull2;
2308 
2309 		datum1 = index_getattr(lastleft, attnum, itupdesc, &isNull1);
2310 		datum2 = index_getattr(firstright, attnum, itupdesc, &isNull2);
2311 
2312 		if (isNull1 != isNull2)
2313 			break;
2314 
2315 		if (!isNull1 &&
2316 			DatumGetInt32(FunctionCall2Coll(&scankey->sk_func,
2317 											scankey->sk_collation,
2318 											datum1,
2319 											datum2)) != 0)
2320 			break;
2321 
2322 		keepnatts++;
2323 	}
2324 
2325 	return keepnatts;
2326 }
2327 
2328 /*
2329  * _bt_keep_natts_fast - fast bitwise variant of _bt_keep_natts.
2330  *
2331  * This is exported so that a candidate split point can have its effect on
2332  * suffix truncation inexpensively evaluated ahead of time when finding a
2333  * split location.  A naive bitwise approach to datum comparisons is used to
2334  * save cycles.
2335  *
2336  * The approach taken here usually provides the same answer as _bt_keep_natts
2337  * will (for the same pair of tuples from a heapkeyspace index), since the
2338  * majority of btree opclasses can never indicate that two datums are equal
2339  * unless they're bitwise equal (once detoasted).  Similarly, result may
2340  * differ from the _bt_keep_natts result when either tuple has TOASTed datums,
2341  * though this is barely possible in practice.
2342  *
2343  * These issues must be acceptable to callers, typically because they're only
2344  * concerned about making suffix truncation as effective as possible without
2345  * leaving excessive amounts of free space on either side of page split.
2346  * Callers can rely on the fact that attributes considered equal here are
2347  * definitely also equal according to _bt_keep_natts.
2348  */
2349 int
_bt_keep_natts_fast(Relation rel,IndexTuple lastleft,IndexTuple firstright)2350 _bt_keep_natts_fast(Relation rel, IndexTuple lastleft, IndexTuple firstright)
2351 {
2352 	TupleDesc	itupdesc = RelationGetDescr(rel);
2353 	int			keysz = IndexRelationGetNumberOfKeyAttributes(rel);
2354 	int			keepnatts;
2355 
2356 	keepnatts = 1;
2357 	for (int attnum = 1; attnum <= keysz; attnum++)
2358 	{
2359 		Datum		datum1,
2360 					datum2;
2361 		bool		isNull1,
2362 					isNull2;
2363 		Form_pg_attribute att;
2364 
2365 		datum1 = index_getattr(lastleft, attnum, itupdesc, &isNull1);
2366 		datum2 = index_getattr(firstright, attnum, itupdesc, &isNull2);
2367 		att = TupleDescAttr(itupdesc, attnum - 1);
2368 
2369 		if (isNull1 != isNull2)
2370 			break;
2371 
2372 		if (!isNull1 &&
2373 			!datumIsEqual(datum1, datum2, att->attbyval, att->attlen))
2374 			break;
2375 
2376 		keepnatts++;
2377 	}
2378 
2379 	return keepnatts;
2380 }
2381 
2382 /*
2383  *  _bt_check_natts() -- Verify tuple has expected number of attributes.
2384  *
2385  * Returns value indicating if the expected number of attributes were found
2386  * for a particular offset on page.  This can be used as a general purpose
2387  * sanity check.
2388  *
2389  * Testing a tuple directly with BTreeTupleGetNAtts() should generally be
2390  * preferred to calling here.  That's usually more convenient, and is always
2391  * more explicit.  Call here instead when offnum's tuple may be a negative
2392  * infinity tuple that uses the pre-v11 on-disk representation, or when a low
2393  * context check is appropriate.  This routine is as strict as possible about
2394  * what is expected on each version of btree.
2395  */
2396 bool
_bt_check_natts(Relation rel,bool heapkeyspace,Page page,OffsetNumber offnum)2397 _bt_check_natts(Relation rel, bool heapkeyspace, Page page, OffsetNumber offnum)
2398 {
2399 	int16		natts = IndexRelationGetNumberOfAttributes(rel);
2400 	int16		nkeyatts = IndexRelationGetNumberOfKeyAttributes(rel);
2401 	BTPageOpaque opaque = (BTPageOpaque) PageGetSpecialPointer(page);
2402 	IndexTuple	itup;
2403 	int			tupnatts;
2404 
2405 	/*
2406 	 * We cannot reliably test a deleted or half-deleted page, since they have
2407 	 * dummy high keys
2408 	 */
2409 	if (P_IGNORE(opaque))
2410 		return true;
2411 
2412 	Assert(offnum >= FirstOffsetNumber &&
2413 		   offnum <= PageGetMaxOffsetNumber(page));
2414 
2415 	/*
2416 	 * Mask allocated for number of keys in index tuple must be able to fit
2417 	 * maximum possible number of index attributes
2418 	 */
2419 	StaticAssertStmt(BT_N_KEYS_OFFSET_MASK >= INDEX_MAX_KEYS,
2420 					 "BT_N_KEYS_OFFSET_MASK can't fit INDEX_MAX_KEYS");
2421 
2422 	itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, offnum));
2423 	tupnatts = BTreeTupleGetNAtts(itup, rel);
2424 
2425 	if (P_ISLEAF(opaque))
2426 	{
2427 		if (offnum >= P_FIRSTDATAKEY(opaque))
2428 		{
2429 			/*
2430 			 * Non-pivot tuples currently never use alternative heap TID
2431 			 * representation -- even those within heapkeyspace indexes
2432 			 */
2433 			if ((itup->t_info & INDEX_ALT_TID_MASK) != 0)
2434 				return false;
2435 
2436 			/*
2437 			 * Leaf tuples that are not the page high key (non-pivot tuples)
2438 			 * should never be truncated.  (Note that tupnatts must have been
2439 			 * inferred, rather than coming from an explicit on-disk
2440 			 * representation.)
2441 			 */
2442 			return tupnatts == natts;
2443 		}
2444 		else
2445 		{
2446 			/*
2447 			 * Rightmost page doesn't contain a page high key, so tuple was
2448 			 * checked above as ordinary leaf tuple
2449 			 */
2450 			Assert(!P_RIGHTMOST(opaque));
2451 
2452 			/*
2453 			 * !heapkeyspace high key tuple contains only key attributes. Note
2454 			 * that tupnatts will only have been explicitly represented in
2455 			 * !heapkeyspace indexes that happen to have non-key attributes.
2456 			 */
2457 			if (!heapkeyspace)
2458 				return tupnatts == nkeyatts;
2459 
2460 			/* Use generic heapkeyspace pivot tuple handling */
2461 		}
2462 	}
2463 	else						/* !P_ISLEAF(opaque) */
2464 	{
2465 		if (offnum == P_FIRSTDATAKEY(opaque))
2466 		{
2467 			/*
2468 			 * The first tuple on any internal page (possibly the first after
2469 			 * its high key) is its negative infinity tuple.  Negative
2470 			 * infinity tuples are always truncated to zero attributes.  They
2471 			 * are a particular kind of pivot tuple.
2472 			 */
2473 			if (heapkeyspace)
2474 				return tupnatts == 0;
2475 
2476 			/*
2477 			 * The number of attributes won't be explicitly represented if the
2478 			 * negative infinity tuple was generated during a page split that
2479 			 * occurred with a version of Postgres before v11.  There must be
2480 			 * a problem when there is an explicit representation that is
2481 			 * non-zero, or when there is no explicit representation and the
2482 			 * tuple is evidently not a pre-pg_upgrade tuple.
2483 			 *
2484 			 * Prior to v11, downlinks always had P_HIKEY as their offset. Use
2485 			 * that to decide if the tuple is a pre-v11 tuple.
2486 			 */
2487 			return tupnatts == 0 ||
2488 				((itup->t_info & INDEX_ALT_TID_MASK) == 0 &&
2489 				 ItemPointerGetOffsetNumber(&(itup->t_tid)) == P_HIKEY);
2490 		}
2491 		else
2492 		{
2493 			/*
2494 			 * !heapkeyspace downlink tuple with separator key contains only
2495 			 * key attributes.  Note that tupnatts will only have been
2496 			 * explicitly represented in !heapkeyspace indexes that happen to
2497 			 * have non-key attributes.
2498 			 */
2499 			if (!heapkeyspace)
2500 				return tupnatts == nkeyatts;
2501 
2502 			/* Use generic heapkeyspace pivot tuple handling */
2503 		}
2504 
2505 	}
2506 
2507 	/* Handle heapkeyspace pivot tuples (excluding minus infinity items) */
2508 	Assert(heapkeyspace);
2509 
2510 	/*
2511 	 * Explicit representation of the number of attributes is mandatory with
2512 	 * heapkeyspace index pivot tuples, regardless of whether or not there are
2513 	 * non-key attributes.
2514 	 */
2515 	if ((itup->t_info & INDEX_ALT_TID_MASK) == 0)
2516 		return false;
2517 
2518 	/*
2519 	 * Heap TID is a tiebreaker key attribute, so it cannot be untruncated
2520 	 * when any other key attribute is truncated
2521 	 */
2522 	if (BTreeTupleGetHeapTID(itup) != NULL && tupnatts != nkeyatts)
2523 		return false;
2524 
2525 	/*
2526 	 * Pivot tuple must have at least one untruncated key attribute (minus
2527 	 * infinity pivot tuples are the only exception).  Pivot tuples can never
2528 	 * represent that there is a value present for a key attribute that
2529 	 * exceeds pg_index.indnkeyatts for the index.
2530 	 */
2531 	return tupnatts > 0 && tupnatts <= nkeyatts;
2532 }
2533 
2534 /*
2535  *
2536  *  _bt_check_third_page() -- check whether tuple fits on a btree page at all.
2537  *
2538  * We actually need to be able to fit three items on every page, so restrict
2539  * any one item to 1/3 the per-page available space.  Note that itemsz should
2540  * not include the ItemId overhead.
2541  *
2542  * It might be useful to apply TOAST methods rather than throw an error here.
2543  * Using out of line storage would break assumptions made by suffix truncation
2544  * and by contrib/amcheck, though.
2545  */
2546 void
_bt_check_third_page(Relation rel,Relation heap,bool needheaptidspace,Page page,IndexTuple newtup)2547 _bt_check_third_page(Relation rel, Relation heap, bool needheaptidspace,
2548 					 Page page, IndexTuple newtup)
2549 {
2550 	Size		itemsz;
2551 	BTPageOpaque opaque;
2552 
2553 	itemsz = MAXALIGN(IndexTupleSize(newtup));
2554 
2555 	/* Double check item size against limit */
2556 	if (itemsz <= BTMaxItemSize(page))
2557 		return;
2558 
2559 	/*
2560 	 * Tuple is probably too large to fit on page, but it's possible that the
2561 	 * index uses version 2 or version 3, or that page is an internal page, in
2562 	 * which case a slightly higher limit applies.
2563 	 */
2564 	if (!needheaptidspace && itemsz <= BTMaxItemSizeNoHeapTid(page))
2565 		return;
2566 
2567 	/*
2568 	 * Internal page insertions cannot fail here, because that would mean that
2569 	 * an earlier leaf level insertion that should have failed didn't
2570 	 */
2571 	opaque = (BTPageOpaque) PageGetSpecialPointer(page);
2572 	if (!P_ISLEAF(opaque))
2573 		elog(ERROR, "cannot insert oversized tuple of size %zu on internal page of index \"%s\"",
2574 			 itemsz, RelationGetRelationName(rel));
2575 
2576 	ereport(ERROR,
2577 			(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
2578 			 errmsg("index row size %zu exceeds btree version %u maximum %zu for index \"%s\"",
2579 					itemsz,
2580 					needheaptidspace ? BTREE_VERSION : BTREE_NOVAC_VERSION,
2581 					needheaptidspace ? BTMaxItemSize(page) :
2582 					BTMaxItemSizeNoHeapTid(page),
2583 					RelationGetRelationName(rel)),
2584 			 errdetail("Index row references tuple (%u,%u) in relation \"%s\".",
2585 					   ItemPointerGetBlockNumber(&newtup->t_tid),
2586 					   ItemPointerGetOffsetNumber(&newtup->t_tid),
2587 					   RelationGetRelationName(heap)),
2588 			 errhint("Values larger than 1/3 of a buffer page cannot be indexed.\n"
2589 					 "Consider a function index of an MD5 hash of the value, "
2590 					 "or use full text indexing."),
2591 			 errtableconstraint(heap, RelationGetRelationName(rel))));
2592 }
2593