1 /*-------------------------------------------------------------------------
2  *
3  * spgvalidate.c
4  *	  Opclass validator for SP-GiST.
5  *
6  * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  * IDENTIFICATION
10  *			src/backend/access/spgist/spgvalidate.c
11  *
12  *-------------------------------------------------------------------------
13  */
14 #include "postgres.h"
15 
16 #include "access/amvalidate.h"
17 #include "access/htup_details.h"
18 #include "access/spgist_private.h"
19 #include "catalog/pg_amop.h"
20 #include "catalog/pg_amproc.h"
21 #include "catalog/pg_opclass.h"
22 #include "catalog/pg_opfamily.h"
23 #include "catalog/pg_type.h"
24 #include "utils/builtins.h"
25 #include "utils/lsyscache.h"
26 #include "utils/regproc.h"
27 #include "utils/syscache.h"
28 
29 
30 /*
31  * Validator for an SP-GiST opclass.
32  *
33  * Some of the checks done here cover the whole opfamily, and therefore are
34  * redundant when checking each opclass in a family.  But they don't run long
35  * enough to be much of a problem, so we accept the duplication rather than
36  * complicate the amvalidate API.
37  */
38 bool
spgvalidate(Oid opclassoid)39 spgvalidate(Oid opclassoid)
40 {
41 	bool		result = true;
42 	HeapTuple	classtup;
43 	Form_pg_opclass classform;
44 	Oid			opfamilyoid;
45 	Oid			opcintype;
46 	char	   *opclassname;
47 	HeapTuple	familytup;
48 	Form_pg_opfamily familyform;
49 	char	   *opfamilyname;
50 	CatCList   *proclist,
51 			   *oprlist;
52 	List	   *grouplist;
53 	OpFamilyOpFuncGroup *opclassgroup;
54 	int			i;
55 	ListCell   *lc;
56 	spgConfigIn configIn;
57 	spgConfigOut configOut;
58 	Oid			configOutLefttype = InvalidOid;
59 	Oid			configOutRighttype = InvalidOid;
60 
61 	/* Fetch opclass information */
62 	classtup = SearchSysCache1(CLAOID, ObjectIdGetDatum(opclassoid));
63 	if (!HeapTupleIsValid(classtup))
64 		elog(ERROR, "cache lookup failed for operator class %u", opclassoid);
65 	classform = (Form_pg_opclass) GETSTRUCT(classtup);
66 
67 	opfamilyoid = classform->opcfamily;
68 	opcintype = classform->opcintype;
69 	opclassname = NameStr(classform->opcname);
70 
71 	/* Fetch opfamily information */
72 	familytup = SearchSysCache1(OPFAMILYOID, ObjectIdGetDatum(opfamilyoid));
73 	if (!HeapTupleIsValid(familytup))
74 		elog(ERROR, "cache lookup failed for operator family %u", opfamilyoid);
75 	familyform = (Form_pg_opfamily) GETSTRUCT(familytup);
76 
77 	opfamilyname = NameStr(familyform->opfname);
78 
79 	/* Fetch all operators and support functions of the opfamily */
80 	oprlist = SearchSysCacheList1(AMOPSTRATEGY, ObjectIdGetDatum(opfamilyoid));
81 	proclist = SearchSysCacheList1(AMPROCNUM, ObjectIdGetDatum(opfamilyoid));
82 	grouplist = identify_opfamily_groups(oprlist, proclist);
83 
84 	/* Check individual support functions */
85 	for (i = 0; i < proclist->n_members; i++)
86 	{
87 		HeapTuple	proctup = &proclist->members[i]->tuple;
88 		Form_pg_amproc procform = (Form_pg_amproc) GETSTRUCT(proctup);
89 		bool		ok;
90 
91 		/*
92 		 * All SP-GiST support functions should be registered with matching
93 		 * left/right types
94 		 */
95 		if (procform->amproclefttype != procform->amprocrighttype)
96 		{
97 			ereport(INFO,
98 					(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
99 					 errmsg("operator family \"%s\" of access method %s contains support function %s with different left and right input types",
100 							opfamilyname, "spgist",
101 							format_procedure(procform->amproc))));
102 			result = false;
103 		}
104 
105 		/* Check procedure numbers and function signatures */
106 		switch (procform->amprocnum)
107 		{
108 			case SPGIST_CONFIG_PROC:
109 				ok = check_amproc_signature(procform->amproc, VOIDOID, true,
110 											2, 2, INTERNALOID, INTERNALOID);
111 				configIn.attType = procform->amproclefttype;
112 				memset(&configOut, 0, sizeof(configOut));
113 
114 				OidFunctionCall2(procform->amproc,
115 								 PointerGetDatum(&configIn),
116 								 PointerGetDatum(&configOut));
117 
118 				configOutLefttype = procform->amproclefttype;
119 				configOutRighttype = procform->amprocrighttype;
120 
121 				/*
122 				 * When leaf and attribute types are the same, compress
123 				 * function is not required and we set corresponding bit in
124 				 * functionset for later group consistency check.
125 				 */
126 				if (!OidIsValid(configOut.leafType) ||
127 					configOut.leafType == configIn.attType)
128 				{
129 					foreach(lc, grouplist)
130 					{
131 						OpFamilyOpFuncGroup *group = lfirst(lc);
132 
133 						if (group->lefttype == procform->amproclefttype &&
134 							group->righttype == procform->amprocrighttype)
135 						{
136 							group->functionset |=
137 								((uint64) 1) << SPGIST_COMPRESS_PROC;
138 							break;
139 						}
140 					}
141 				}
142 				break;
143 			case SPGIST_CHOOSE_PROC:
144 			case SPGIST_PICKSPLIT_PROC:
145 			case SPGIST_INNER_CONSISTENT_PROC:
146 				ok = check_amproc_signature(procform->amproc, VOIDOID, true,
147 											2, 2, INTERNALOID, INTERNALOID);
148 				break;
149 			case SPGIST_LEAF_CONSISTENT_PROC:
150 				ok = check_amproc_signature(procform->amproc, BOOLOID, true,
151 											2, 2, INTERNALOID, INTERNALOID);
152 				break;
153 			case SPGIST_COMPRESS_PROC:
154 				if (configOutLefttype != procform->amproclefttype ||
155 					configOutRighttype != procform->amprocrighttype)
156 					ok = false;
157 				else
158 					ok = check_amproc_signature(procform->amproc,
159 												configOut.leafType, true,
160 												1, 1, procform->amproclefttype);
161 				break;
162 			case SPGIST_OPTIONS_PROC:
163 				ok = check_amoptsproc_signature(procform->amproc);
164 				break;
165 			default:
166 				ereport(INFO,
167 						(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
168 						 errmsg("operator family \"%s\" of access method %s contains function %s with invalid support number %d",
169 								opfamilyname, "spgist",
170 								format_procedure(procform->amproc),
171 								procform->amprocnum)));
172 				result = false;
173 				continue;		/* don't want additional message */
174 		}
175 
176 		if (!ok)
177 		{
178 			ereport(INFO,
179 					(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
180 					 errmsg("operator family \"%s\" of access method %s contains function %s with wrong signature for support number %d",
181 							opfamilyname, "spgist",
182 							format_procedure(procform->amproc),
183 							procform->amprocnum)));
184 			result = false;
185 		}
186 	}
187 
188 	/* Check individual operators */
189 	for (i = 0; i < oprlist->n_members; i++)
190 	{
191 		HeapTuple	oprtup = &oprlist->members[i]->tuple;
192 		Form_pg_amop oprform = (Form_pg_amop) GETSTRUCT(oprtup);
193 		Oid			op_rettype;
194 
195 		/* TODO: Check that only allowed strategy numbers exist */
196 		if (oprform->amopstrategy < 1 || oprform->amopstrategy > 63)
197 		{
198 			ereport(INFO,
199 					(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
200 					 errmsg("operator family \"%s\" of access method %s contains operator %s with invalid strategy number %d",
201 							opfamilyname, "spgist",
202 							format_operator(oprform->amopopr),
203 							oprform->amopstrategy)));
204 			result = false;
205 		}
206 
207 		/* spgist supports ORDER BY operators */
208 		if (oprform->amoppurpose != AMOP_SEARCH)
209 		{
210 			/* ... and operator result must match the claimed btree opfamily */
211 			op_rettype = get_op_rettype(oprform->amopopr);
212 			if (!opfamily_can_sort_type(oprform->amopsortfamily, op_rettype))
213 			{
214 				ereport(INFO,
215 						(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
216 						 errmsg("operator family \"%s\" of access method %s contains invalid ORDER BY specification for operator %s",
217 								opfamilyname, "spgist",
218 								format_operator(oprform->amopopr))));
219 				result = false;
220 			}
221 		}
222 		else
223 			op_rettype = BOOLOID;
224 
225 		/* Check operator signature --- same for all spgist strategies */
226 		if (!check_amop_signature(oprform->amopopr, op_rettype,
227 								  oprform->amoplefttype,
228 								  oprform->amoprighttype))
229 		{
230 			ereport(INFO,
231 					(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
232 					 errmsg("operator family \"%s\" of access method %s contains operator %s with wrong signature",
233 							opfamilyname, "spgist",
234 							format_operator(oprform->amopopr))));
235 			result = false;
236 		}
237 	}
238 
239 	/* Now check for inconsistent groups of operators/functions */
240 	opclassgroup = NULL;
241 	foreach(lc, grouplist)
242 	{
243 		OpFamilyOpFuncGroup *thisgroup = (OpFamilyOpFuncGroup *) lfirst(lc);
244 
245 		/* Remember the group exactly matching the test opclass */
246 		if (thisgroup->lefttype == opcintype &&
247 			thisgroup->righttype == opcintype)
248 			opclassgroup = thisgroup;
249 
250 		/*
251 		 * Complain if there are any datatype pairs with functions but no
252 		 * operators.  This is about the best we can do for now to detect
253 		 * missing operators.
254 		 */
255 		if (thisgroup->operatorset == 0)
256 		{
257 			ereport(INFO,
258 					(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
259 					 errmsg("operator family \"%s\" of access method %s is missing operator(s) for types %s and %s",
260 							opfamilyname, "spgist",
261 							format_type_be(thisgroup->lefttype),
262 							format_type_be(thisgroup->righttype))));
263 			result = false;
264 		}
265 
266 		/*
267 		 * Complain if we're missing functions for any datatype, remembering
268 		 * that SP-GiST doesn't use cross-type support functions.
269 		 */
270 		if (thisgroup->lefttype != thisgroup->righttype)
271 			continue;
272 
273 		for (i = 1; i <= SPGISTNProc; i++)
274 		{
275 			if ((thisgroup->functionset & (((uint64) 1) << i)) != 0)
276 				continue;		/* got it */
277 			if (i == SPGIST_OPTIONS_PROC)
278 				continue;		/* optional method */
279 			ereport(INFO,
280 					(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
281 					 errmsg("operator family \"%s\" of access method %s is missing support function %d for type %s",
282 							opfamilyname, "spgist", i,
283 							format_type_be(thisgroup->lefttype))));
284 			result = false;
285 		}
286 	}
287 
288 	/* Check that the originally-named opclass is supported */
289 	/* (if group is there, we already checked it adequately above) */
290 	if (!opclassgroup)
291 	{
292 		ereport(INFO,
293 				(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
294 				 errmsg("operator class \"%s\" of access method %s is missing operator(s)",
295 						opclassname, "spgist")));
296 		result = false;
297 	}
298 
299 	ReleaseCatCacheList(proclist);
300 	ReleaseCatCacheList(oprlist);
301 	ReleaseSysCache(familytup);
302 	ReleaseSysCache(classtup);
303 
304 	return result;
305 }
306