1 /*-------------------------------------------------------------------------
2  *
3  * spgvalidate.c
4  *	  Opclass validator for SP-GiST.
5  *
6  * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  * IDENTIFICATION
10  *			src/backend/access/spgist/spgvalidate.c
11  *
12  *-------------------------------------------------------------------------
13  */
14 #include "postgres.h"
15 
16 #include "access/amvalidate.h"
17 #include "access/htup_details.h"
18 #include "access/spgist_private.h"
19 #include "catalog/pg_amop.h"
20 #include "catalog/pg_amproc.h"
21 #include "catalog/pg_opclass.h"
22 #include "catalog/pg_opfamily.h"
23 #include "catalog/pg_type.h"
24 #include "utils/builtins.h"
25 #include "utils/lsyscache.h"
26 #include "utils/regproc.h"
27 #include "utils/syscache.h"
28 
29 
30 /*
31  * Validator for an SP-GiST opclass.
32  *
33  * Some of the checks done here cover the whole opfamily, and therefore are
34  * redundant when checking each opclass in a family.  But they don't run long
35  * enough to be much of a problem, so we accept the duplication rather than
36  * complicate the amvalidate API.
37  */
38 bool
spgvalidate(Oid opclassoid)39 spgvalidate(Oid opclassoid)
40 {
41 	bool		result = true;
42 	HeapTuple	classtup;
43 	Form_pg_opclass classform;
44 	Oid			opfamilyoid;
45 	Oid			opcintype;
46 	Oid			opckeytype;
47 	char	   *opclassname;
48 	HeapTuple	familytup;
49 	Form_pg_opfamily familyform;
50 	char	   *opfamilyname;
51 	CatCList   *proclist,
52 			   *oprlist;
53 	List	   *grouplist;
54 	OpFamilyOpFuncGroup *opclassgroup;
55 	int			i;
56 	ListCell   *lc;
57 	spgConfigIn configIn;
58 	spgConfigOut configOut;
59 	Oid			configOutLefttype = InvalidOid;
60 	Oid			configOutRighttype = InvalidOid;
61 	Oid			configOutLeafType = InvalidOid;
62 
63 	/* Fetch opclass information */
64 	classtup = SearchSysCache1(CLAOID, ObjectIdGetDatum(opclassoid));
65 	if (!HeapTupleIsValid(classtup))
66 		elog(ERROR, "cache lookup failed for operator class %u", opclassoid);
67 	classform = (Form_pg_opclass) GETSTRUCT(classtup);
68 
69 	opfamilyoid = classform->opcfamily;
70 	opcintype = classform->opcintype;
71 	opckeytype = classform->opckeytype;
72 	opclassname = NameStr(classform->opcname);
73 
74 	/* Fetch opfamily information */
75 	familytup = SearchSysCache1(OPFAMILYOID, ObjectIdGetDatum(opfamilyoid));
76 	if (!HeapTupleIsValid(familytup))
77 		elog(ERROR, "cache lookup failed for operator family %u", opfamilyoid);
78 	familyform = (Form_pg_opfamily) GETSTRUCT(familytup);
79 
80 	opfamilyname = NameStr(familyform->opfname);
81 
82 	/* Fetch all operators and support functions of the opfamily */
83 	oprlist = SearchSysCacheList1(AMOPSTRATEGY, ObjectIdGetDatum(opfamilyoid));
84 	proclist = SearchSysCacheList1(AMPROCNUM, ObjectIdGetDatum(opfamilyoid));
85 	grouplist = identify_opfamily_groups(oprlist, proclist);
86 
87 	/* Check individual support functions */
88 	for (i = 0; i < proclist->n_members; i++)
89 	{
90 		HeapTuple	proctup = &proclist->members[i]->tuple;
91 		Form_pg_amproc procform = (Form_pg_amproc) GETSTRUCT(proctup);
92 		bool		ok;
93 
94 		/*
95 		 * All SP-GiST support functions should be registered with matching
96 		 * left/right types
97 		 */
98 		if (procform->amproclefttype != procform->amprocrighttype)
99 		{
100 			ereport(INFO,
101 					(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
102 					 errmsg("operator family \"%s\" of access method %s contains support function %s with different left and right input types",
103 							opfamilyname, "spgist",
104 							format_procedure(procform->amproc))));
105 			result = false;
106 		}
107 
108 		/* Check procedure numbers and function signatures */
109 		switch (procform->amprocnum)
110 		{
111 			case SPGIST_CONFIG_PROC:
112 				ok = check_amproc_signature(procform->amproc, VOIDOID, true,
113 											2, 2, INTERNALOID, INTERNALOID);
114 				configIn.attType = procform->amproclefttype;
115 				memset(&configOut, 0, sizeof(configOut));
116 
117 				OidFunctionCall2(procform->amproc,
118 								 PointerGetDatum(&configIn),
119 								 PointerGetDatum(&configOut));
120 
121 				configOutLefttype = procform->amproclefttype;
122 				configOutRighttype = procform->amprocrighttype;
123 
124 				/* Default leaf type is opckeytype or input type */
125 				if (OidIsValid(opckeytype))
126 					configOutLeafType = opckeytype;
127 				else
128 					configOutLeafType = procform->amproclefttype;
129 
130 				/* If some other leaf datum type is specified, warn */
131 				if (OidIsValid(configOut.leafType) &&
132 					configOutLeafType != configOut.leafType)
133 				{
134 					ereport(INFO,
135 							(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
136 							 errmsg("SP-GiST leaf data type %s does not match declared type %s",
137 									format_type_be(configOut.leafType),
138 									format_type_be(configOutLeafType))));
139 					result = false;
140 					configOutLeafType = configOut.leafType;
141 				}
142 
143 				/*
144 				 * When leaf and attribute types are the same, compress
145 				 * function is not required and we set corresponding bit in
146 				 * functionset for later group consistency check.
147 				 */
148 				if (configOutLeafType == configIn.attType)
149 				{
150 					foreach(lc, grouplist)
151 					{
152 						OpFamilyOpFuncGroup *group = lfirst(lc);
153 
154 						if (group->lefttype == procform->amproclefttype &&
155 							group->righttype == procform->amprocrighttype)
156 						{
157 							group->functionset |=
158 								((uint64) 1) << SPGIST_COMPRESS_PROC;
159 							break;
160 						}
161 					}
162 				}
163 				break;
164 			case SPGIST_CHOOSE_PROC:
165 			case SPGIST_PICKSPLIT_PROC:
166 			case SPGIST_INNER_CONSISTENT_PROC:
167 				ok = check_amproc_signature(procform->amproc, VOIDOID, true,
168 											2, 2, INTERNALOID, INTERNALOID);
169 				break;
170 			case SPGIST_LEAF_CONSISTENT_PROC:
171 				ok = check_amproc_signature(procform->amproc, BOOLOID, true,
172 											2, 2, INTERNALOID, INTERNALOID);
173 				break;
174 			case SPGIST_COMPRESS_PROC:
175 				if (configOutLefttype != procform->amproclefttype ||
176 					configOutRighttype != procform->amprocrighttype)
177 					ok = false;
178 				else
179 					ok = check_amproc_signature(procform->amproc,
180 												configOutLeafType, true,
181 												1, 1, procform->amproclefttype);
182 				break;
183 			case SPGIST_OPTIONS_PROC:
184 				ok = check_amoptsproc_signature(procform->amproc);
185 				break;
186 			default:
187 				ereport(INFO,
188 						(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
189 						 errmsg("operator family \"%s\" of access method %s contains function %s with invalid support number %d",
190 								opfamilyname, "spgist",
191 								format_procedure(procform->amproc),
192 								procform->amprocnum)));
193 				result = false;
194 				continue;		/* don't want additional message */
195 		}
196 
197 		if (!ok)
198 		{
199 			ereport(INFO,
200 					(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
201 					 errmsg("operator family \"%s\" of access method %s contains function %s with wrong signature for support number %d",
202 							opfamilyname, "spgist",
203 							format_procedure(procform->amproc),
204 							procform->amprocnum)));
205 			result = false;
206 		}
207 	}
208 
209 	/* Check individual operators */
210 	for (i = 0; i < oprlist->n_members; i++)
211 	{
212 		HeapTuple	oprtup = &oprlist->members[i]->tuple;
213 		Form_pg_amop oprform = (Form_pg_amop) GETSTRUCT(oprtup);
214 		Oid			op_rettype;
215 
216 		/* TODO: Check that only allowed strategy numbers exist */
217 		if (oprform->amopstrategy < 1 || oprform->amopstrategy > 63)
218 		{
219 			ereport(INFO,
220 					(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
221 					 errmsg("operator family \"%s\" of access method %s contains operator %s with invalid strategy number %d",
222 							opfamilyname, "spgist",
223 							format_operator(oprform->amopopr),
224 							oprform->amopstrategy)));
225 			result = false;
226 		}
227 
228 		/* spgist supports ORDER BY operators */
229 		if (oprform->amoppurpose != AMOP_SEARCH)
230 		{
231 			/* ... and operator result must match the claimed btree opfamily */
232 			op_rettype = get_op_rettype(oprform->amopopr);
233 			if (!opfamily_can_sort_type(oprform->amopsortfamily, op_rettype))
234 			{
235 				ereport(INFO,
236 						(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
237 						 errmsg("operator family \"%s\" of access method %s contains invalid ORDER BY specification for operator %s",
238 								opfamilyname, "spgist",
239 								format_operator(oprform->amopopr))));
240 				result = false;
241 			}
242 		}
243 		else
244 			op_rettype = BOOLOID;
245 
246 		/* Check operator signature --- same for all spgist strategies */
247 		if (!check_amop_signature(oprform->amopopr, op_rettype,
248 								  oprform->amoplefttype,
249 								  oprform->amoprighttype))
250 		{
251 			ereport(INFO,
252 					(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
253 					 errmsg("operator family \"%s\" of access method %s contains operator %s with wrong signature",
254 							opfamilyname, "spgist",
255 							format_operator(oprform->amopopr))));
256 			result = false;
257 		}
258 	}
259 
260 	/* Now check for inconsistent groups of operators/functions */
261 	opclassgroup = NULL;
262 	foreach(lc, grouplist)
263 	{
264 		OpFamilyOpFuncGroup *thisgroup = (OpFamilyOpFuncGroup *) lfirst(lc);
265 
266 		/* Remember the group exactly matching the test opclass */
267 		if (thisgroup->lefttype == opcintype &&
268 			thisgroup->righttype == opcintype)
269 			opclassgroup = thisgroup;
270 
271 		/*
272 		 * Complain if there are any datatype pairs with functions but no
273 		 * operators.  This is about the best we can do for now to detect
274 		 * missing operators.
275 		 */
276 		if (thisgroup->operatorset == 0)
277 		{
278 			ereport(INFO,
279 					(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
280 					 errmsg("operator family \"%s\" of access method %s is missing operator(s) for types %s and %s",
281 							opfamilyname, "spgist",
282 							format_type_be(thisgroup->lefttype),
283 							format_type_be(thisgroup->righttype))));
284 			result = false;
285 		}
286 
287 		/*
288 		 * Complain if we're missing functions for any datatype, remembering
289 		 * that SP-GiST doesn't use cross-type support functions.
290 		 */
291 		if (thisgroup->lefttype != thisgroup->righttype)
292 			continue;
293 
294 		for (i = 1; i <= SPGISTNProc; i++)
295 		{
296 			if ((thisgroup->functionset & (((uint64) 1) << i)) != 0)
297 				continue;		/* got it */
298 			if (i == SPGIST_OPTIONS_PROC)
299 				continue;		/* optional method */
300 			ereport(INFO,
301 					(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
302 					 errmsg("operator family \"%s\" of access method %s is missing support function %d for type %s",
303 							opfamilyname, "spgist", i,
304 							format_type_be(thisgroup->lefttype))));
305 			result = false;
306 		}
307 	}
308 
309 	/* Check that the originally-named opclass is supported */
310 	/* (if group is there, we already checked it adequately above) */
311 	if (!opclassgroup)
312 	{
313 		ereport(INFO,
314 				(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
315 				 errmsg("operator class \"%s\" of access method %s is missing operator(s)",
316 						opclassname, "spgist")));
317 		result = false;
318 	}
319 
320 	ReleaseCatCacheList(proclist);
321 	ReleaseCatCacheList(oprlist);
322 	ReleaseSysCache(familytup);
323 	ReleaseSysCache(classtup);
324 
325 	return result;
326 }
327 
328 /*
329  * Prechecking function for adding operators/functions to an SP-GiST opfamily.
330  */
331 void
spgadjustmembers(Oid opfamilyoid,Oid opclassoid,List * operators,List * functions)332 spgadjustmembers(Oid opfamilyoid,
333 				 Oid opclassoid,
334 				 List *operators,
335 				 List *functions)
336 {
337 	ListCell   *lc;
338 
339 	/*
340 	 * Operator members of an SP-GiST opfamily should never have hard
341 	 * dependencies, since their connection to the opfamily depends only on
342 	 * what the support functions think, and that can be altered.  For
343 	 * consistency, we make all soft dependencies point to the opfamily,
344 	 * though a soft dependency on the opclass would work as well in the
345 	 * CREATE OPERATOR CLASS case.
346 	 */
347 	foreach(lc, operators)
348 	{
349 		OpFamilyMember *op = (OpFamilyMember *) lfirst(lc);
350 
351 		op->ref_is_hard = false;
352 		op->ref_is_family = true;
353 		op->refobjid = opfamilyoid;
354 	}
355 
356 	/*
357 	 * Required support functions should have hard dependencies.  Preferably
358 	 * those are just dependencies on the opclass, but if we're in ALTER
359 	 * OPERATOR FAMILY, we leave the dependency pointing at the whole
360 	 * opfamily.  (Given that SP-GiST opclasses generally don't share
361 	 * opfamilies, it seems unlikely to be worth working harder.)
362 	 */
363 	foreach(lc, functions)
364 	{
365 		OpFamilyMember *op = (OpFamilyMember *) lfirst(lc);
366 
367 		switch (op->number)
368 		{
369 			case SPGIST_CONFIG_PROC:
370 			case SPGIST_CHOOSE_PROC:
371 			case SPGIST_PICKSPLIT_PROC:
372 			case SPGIST_INNER_CONSISTENT_PROC:
373 			case SPGIST_LEAF_CONSISTENT_PROC:
374 				/* Required support function */
375 				op->ref_is_hard = true;
376 				break;
377 			case SPGIST_COMPRESS_PROC:
378 			case SPGIST_OPTIONS_PROC:
379 				/* Optional, so force it to be a soft family dependency */
380 				op->ref_is_hard = false;
381 				op->ref_is_family = true;
382 				op->refobjid = opfamilyoid;
383 				break;
384 			default:
385 				ereport(ERROR,
386 						(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
387 						 errmsg("support function number %d is invalid for access method %s",
388 								op->number, "spgist")));
389 				break;
390 		}
391 	}
392 }
393