1 /*-------------------------------------------------------------------------
2  *
3  * amvalidate.c
4  *	  Support routines for index access methods' amvalidate and
5  *	  amadjustmembers functions.
6  *
7  * Copyright (c) 2016-2021, PostgreSQL Global Development Group
8  *
9  *
10  * IDENTIFICATION
11  *	  src/backend/access/index/amvalidate.c
12  *
13  *-------------------------------------------------------------------------
14  */
15 #include "postgres.h"
16 
17 #include "access/amvalidate.h"
18 #include "access/htup_details.h"
19 #include "catalog/pg_am.h"
20 #include "catalog/pg_amop.h"
21 #include "catalog/pg_amproc.h"
22 #include "catalog/pg_opclass.h"
23 #include "catalog/pg_operator.h"
24 #include "catalog/pg_proc.h"
25 #include "catalog/pg_type.h"
26 #include "parser/parse_coerce.h"
27 #include "utils/syscache.h"
28 
29 
30 /*
31  * identify_opfamily_groups() returns a List of OpFamilyOpFuncGroup structs,
32  * one for each combination of lefttype/righttype present in the family's
33  * operator and support function lists.  If amopstrategy K is present for
34  * this datatype combination, we set bit 1 << K in operatorset, and similarly
35  * for the support functions.  With uint64 fields we can handle operator and
36  * function numbers up to 63, which is plenty for the foreseeable future.
37  *
38  * The given CatCLists are expected to represent a single opfamily fetched
39  * from the AMOPSTRATEGY and AMPROCNUM caches, so that they will be in
40  * order by those caches' second and third cache keys, namely the datatypes.
41  */
42 List *
identify_opfamily_groups(CatCList * oprlist,CatCList * proclist)43 identify_opfamily_groups(CatCList *oprlist, CatCList *proclist)
44 {
45 	List	   *result = NIL;
46 	OpFamilyOpFuncGroup *thisgroup;
47 	Form_pg_amop oprform;
48 	Form_pg_amproc procform;
49 	int			io,
50 				ip;
51 
52 	/* We need the lists to be ordered; should be true in normal operation */
53 	if (!oprlist->ordered || !proclist->ordered)
54 		elog(ERROR, "cannot validate operator family without ordered data");
55 
56 	/*
57 	 * Advance through the lists concurrently.  Thanks to the ordering, we
58 	 * should see all operators and functions of a given datatype pair
59 	 * consecutively.
60 	 */
61 	thisgroup = NULL;
62 	io = ip = 0;
63 	if (io < oprlist->n_members)
64 	{
65 		oprform = (Form_pg_amop) GETSTRUCT(&oprlist->members[io]->tuple);
66 		io++;
67 	}
68 	else
69 		oprform = NULL;
70 	if (ip < proclist->n_members)
71 	{
72 		procform = (Form_pg_amproc) GETSTRUCT(&proclist->members[ip]->tuple);
73 		ip++;
74 	}
75 	else
76 		procform = NULL;
77 
78 	while (oprform || procform)
79 	{
80 		if (oprform && thisgroup &&
81 			oprform->amoplefttype == thisgroup->lefttype &&
82 			oprform->amoprighttype == thisgroup->righttype)
83 		{
84 			/* Operator belongs to current group; include it and advance */
85 
86 			/* Ignore strategy numbers outside supported range */
87 			if (oprform->amopstrategy > 0 && oprform->amopstrategy < 64)
88 				thisgroup->operatorset |= ((uint64) 1) << oprform->amopstrategy;
89 
90 			if (io < oprlist->n_members)
91 			{
92 				oprform = (Form_pg_amop) GETSTRUCT(&oprlist->members[io]->tuple);
93 				io++;
94 			}
95 			else
96 				oprform = NULL;
97 			continue;
98 		}
99 
100 		if (procform && thisgroup &&
101 			procform->amproclefttype == thisgroup->lefttype &&
102 			procform->amprocrighttype == thisgroup->righttype)
103 		{
104 			/* Procedure belongs to current group; include it and advance */
105 
106 			/* Ignore function numbers outside supported range */
107 			if (procform->amprocnum > 0 && procform->amprocnum < 64)
108 				thisgroup->functionset |= ((uint64) 1) << procform->amprocnum;
109 
110 			if (ip < proclist->n_members)
111 			{
112 				procform = (Form_pg_amproc) GETSTRUCT(&proclist->members[ip]->tuple);
113 				ip++;
114 			}
115 			else
116 				procform = NULL;
117 			continue;
118 		}
119 
120 		/* Time for a new group */
121 		thisgroup = (OpFamilyOpFuncGroup *) palloc(sizeof(OpFamilyOpFuncGroup));
122 		if (oprform &&
123 			(!procform ||
124 			 (oprform->amoplefttype < procform->amproclefttype ||
125 			  (oprform->amoplefttype == procform->amproclefttype &&
126 			   oprform->amoprighttype < procform->amprocrighttype))))
127 		{
128 			thisgroup->lefttype = oprform->amoplefttype;
129 			thisgroup->righttype = oprform->amoprighttype;
130 		}
131 		else
132 		{
133 			thisgroup->lefttype = procform->amproclefttype;
134 			thisgroup->righttype = procform->amprocrighttype;
135 		}
136 		thisgroup->operatorset = thisgroup->functionset = 0;
137 		result = lappend(result, thisgroup);
138 	}
139 
140 	return result;
141 }
142 
143 /*
144  * Validate the signature (argument and result types) of an opclass support
145  * function.  Return true if OK, false if not.
146  *
147  * The "..." represents maxargs argument-type OIDs.  If "exact" is true, they
148  * must match the function arg types exactly, else only binary-coercibly.
149  * In any case the function result type must match restype exactly.
150  */
151 bool
check_amproc_signature(Oid funcid,Oid restype,bool exact,int minargs,int maxargs,...)152 check_amproc_signature(Oid funcid, Oid restype, bool exact,
153 					   int minargs, int maxargs,...)
154 {
155 	bool		result = true;
156 	HeapTuple	tp;
157 	Form_pg_proc procform;
158 	va_list		ap;
159 	int			i;
160 
161 	tp = SearchSysCache1(PROCOID, ObjectIdGetDatum(funcid));
162 	if (!HeapTupleIsValid(tp))
163 		elog(ERROR, "cache lookup failed for function %u", funcid);
164 	procform = (Form_pg_proc) GETSTRUCT(tp);
165 
166 	if (procform->prorettype != restype || procform->proretset ||
167 		procform->pronargs < minargs || procform->pronargs > maxargs)
168 		result = false;
169 
170 	va_start(ap, maxargs);
171 	for (i = 0; i < maxargs; i++)
172 	{
173 		Oid			argtype = va_arg(ap, Oid);
174 
175 		if (i >= procform->pronargs)
176 			continue;
177 		if (exact ? (argtype != procform->proargtypes.values[i]) :
178 			!IsBinaryCoercible(argtype, procform->proargtypes.values[i]))
179 			result = false;
180 	}
181 	va_end(ap);
182 
183 	ReleaseSysCache(tp);
184 	return result;
185 }
186 
187 /*
188  * Validate the signature of an opclass options support function, that should
189  * be 'void(internal)'.
190  */
191 bool
check_amoptsproc_signature(Oid funcid)192 check_amoptsproc_signature(Oid funcid)
193 {
194 	return check_amproc_signature(funcid, VOIDOID, true, 1, 1, INTERNALOID);
195 }
196 
197 /*
198  * Validate the signature (argument and result types) of an opclass operator.
199  * Return true if OK, false if not.
200  *
201  * Currently, we can hard-wire this as accepting only binary operators.  Also,
202  * we can insist on exact type matches, since the given lefttype/righttype
203  * come from pg_amop and should always match the operator exactly.
204  */
205 bool
check_amop_signature(Oid opno,Oid restype,Oid lefttype,Oid righttype)206 check_amop_signature(Oid opno, Oid restype, Oid lefttype, Oid righttype)
207 {
208 	bool		result = true;
209 	HeapTuple	tp;
210 	Form_pg_operator opform;
211 
212 	tp = SearchSysCache1(OPEROID, ObjectIdGetDatum(opno));
213 	if (!HeapTupleIsValid(tp))	/* shouldn't happen */
214 		elog(ERROR, "cache lookup failed for operator %u", opno);
215 	opform = (Form_pg_operator) GETSTRUCT(tp);
216 
217 	if (opform->oprresult != restype || opform->oprkind != 'b' ||
218 		opform->oprleft != lefttype || opform->oprright != righttype)
219 		result = false;
220 
221 	ReleaseSysCache(tp);
222 	return result;
223 }
224 
225 /*
226  * Get the OID of the opclass belonging to an opfamily and accepting
227  * the specified type as input type.  Returns InvalidOid if no such opclass.
228  *
229  * If there is more than one such opclass, you get a random one of them.
230  * Since that shouldn't happen, we don't waste cycles checking.
231  *
232  * We could look up the AM's OID from the opfamily, but all existing callers
233  * know that or can get it without an extra lookup, so we make them pass it.
234  */
235 Oid
opclass_for_family_datatype(Oid amoid,Oid opfamilyoid,Oid datatypeoid)236 opclass_for_family_datatype(Oid amoid, Oid opfamilyoid, Oid datatypeoid)
237 {
238 	Oid			result = InvalidOid;
239 	CatCList   *opclist;
240 	int			i;
241 
242 	/*
243 	 * We search through all the AM's opclasses to see if one matches.  This
244 	 * is a bit inefficient but there is no better index available.  It also
245 	 * saves making an explicit check that the opfamily belongs to the AM.
246 	 */
247 	opclist = SearchSysCacheList1(CLAAMNAMENSP, ObjectIdGetDatum(amoid));
248 
249 	for (i = 0; i < opclist->n_members; i++)
250 	{
251 		HeapTuple	classtup = &opclist->members[i]->tuple;
252 		Form_pg_opclass classform = (Form_pg_opclass) GETSTRUCT(classtup);
253 
254 		if (classform->opcfamily == opfamilyoid &&
255 			classform->opcintype == datatypeoid)
256 		{
257 			result = classform->oid;
258 			break;
259 		}
260 	}
261 
262 	ReleaseCatCacheList(opclist);
263 
264 	return result;
265 }
266 
267 /*
268  * Is the datatype a legitimate input type for the btree opfamily?
269  */
270 bool
opfamily_can_sort_type(Oid opfamilyoid,Oid datatypeoid)271 opfamily_can_sort_type(Oid opfamilyoid, Oid datatypeoid)
272 {
273 	return OidIsValid(opclass_for_family_datatype(BTREE_AM_OID,
274 												  opfamilyoid,
275 												  datatypeoid));
276 }
277