1 /*-------------------------------------------------------------------------
2  *
3  * blvalidate.c
4  *	  Opclass validator for bloom.
5  *
6  * Copyright (c) 2016-2021, PostgreSQL Global Development Group
7  *
8  * IDENTIFICATION
9  *	  contrib/bloom/blvalidate.c
10  *
11  *-------------------------------------------------------------------------
12  */
13 #include "postgres.h"
14 
15 #include "access/amvalidate.h"
16 #include "access/htup_details.h"
17 #include "bloom.h"
18 #include "catalog/pg_amop.h"
19 #include "catalog/pg_amproc.h"
20 #include "catalog/pg_opclass.h"
21 #include "catalog/pg_opfamily.h"
22 #include "catalog/pg_type.h"
23 #include "utils/builtins.h"
24 #include "utils/lsyscache.h"
25 #include "utils/regproc.h"
26 #include "utils/syscache.h"
27 
28 /*
29  * Validator for a bloom opclass.
30  */
31 bool
blvalidate(Oid opclassoid)32 blvalidate(Oid opclassoid)
33 {
34 	bool		result = true;
35 	HeapTuple	classtup;
36 	Form_pg_opclass classform;
37 	Oid			opfamilyoid;
38 	Oid			opcintype;
39 	Oid			opckeytype;
40 	char	   *opclassname;
41 	HeapTuple	familytup;
42 	Form_pg_opfamily familyform;
43 	char	   *opfamilyname;
44 	CatCList   *proclist,
45 			   *oprlist;
46 	List	   *grouplist;
47 	OpFamilyOpFuncGroup *opclassgroup;
48 	int			i;
49 	ListCell   *lc;
50 
51 	/* Fetch opclass information */
52 	classtup = SearchSysCache1(CLAOID, ObjectIdGetDatum(opclassoid));
53 	if (!HeapTupleIsValid(classtup))
54 		elog(ERROR, "cache lookup failed for operator class %u", opclassoid);
55 	classform = (Form_pg_opclass) GETSTRUCT(classtup);
56 
57 	opfamilyoid = classform->opcfamily;
58 	opcintype = classform->opcintype;
59 	opckeytype = classform->opckeytype;
60 	if (!OidIsValid(opckeytype))
61 		opckeytype = opcintype;
62 	opclassname = NameStr(classform->opcname);
63 
64 	/* Fetch opfamily information */
65 	familytup = SearchSysCache1(OPFAMILYOID, ObjectIdGetDatum(opfamilyoid));
66 	if (!HeapTupleIsValid(familytup))
67 		elog(ERROR, "cache lookup failed for operator family %u", opfamilyoid);
68 	familyform = (Form_pg_opfamily) GETSTRUCT(familytup);
69 
70 	opfamilyname = NameStr(familyform->opfname);
71 
72 	/* Fetch all operators and support functions of the opfamily */
73 	oprlist = SearchSysCacheList1(AMOPSTRATEGY, ObjectIdGetDatum(opfamilyoid));
74 	proclist = SearchSysCacheList1(AMPROCNUM, ObjectIdGetDatum(opfamilyoid));
75 
76 	/* Check individual support functions */
77 	for (i = 0; i < proclist->n_members; i++)
78 	{
79 		HeapTuple	proctup = &proclist->members[i]->tuple;
80 		Form_pg_amproc procform = (Form_pg_amproc) GETSTRUCT(proctup);
81 		bool		ok;
82 
83 		/*
84 		 * All bloom support functions should be registered with matching
85 		 * left/right types
86 		 */
87 		if (procform->amproclefttype != procform->amprocrighttype)
88 		{
89 			ereport(INFO,
90 					(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
91 					 errmsg("bloom opfamily %s contains support procedure %s with cross-type registration",
92 							opfamilyname,
93 							format_procedure(procform->amproc))));
94 			result = false;
95 		}
96 
97 		/*
98 		 * We can't check signatures except within the specific opclass, since
99 		 * we need to know the associated opckeytype in many cases.
100 		 */
101 		if (procform->amproclefttype != opcintype)
102 			continue;
103 
104 		/* Check procedure numbers and function signatures */
105 		switch (procform->amprocnum)
106 		{
107 			case BLOOM_HASH_PROC:
108 				ok = check_amproc_signature(procform->amproc, INT4OID, false,
109 											1, 1, opckeytype);
110 				break;
111 			case BLOOM_OPTIONS_PROC:
112 				ok = check_amoptsproc_signature(procform->amproc);
113 				break;
114 			default:
115 				ereport(INFO,
116 						(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
117 						 errmsg("bloom opfamily %s contains function %s with invalid support number %d",
118 								opfamilyname,
119 								format_procedure(procform->amproc),
120 								procform->amprocnum)));
121 				result = false;
122 				continue;		/* don't want additional message */
123 		}
124 
125 		if (!ok)
126 		{
127 			ereport(INFO,
128 					(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
129 					 errmsg("gist opfamily %s contains function %s with wrong signature for support number %d",
130 							opfamilyname,
131 							format_procedure(procform->amproc),
132 							procform->amprocnum)));
133 			result = false;
134 		}
135 	}
136 
137 	/* Check individual operators */
138 	for (i = 0; i < oprlist->n_members; i++)
139 	{
140 		HeapTuple	oprtup = &oprlist->members[i]->tuple;
141 		Form_pg_amop oprform = (Form_pg_amop) GETSTRUCT(oprtup);
142 
143 		/* Check it's allowed strategy for bloom */
144 		if (oprform->amopstrategy < 1 ||
145 			oprform->amopstrategy > BLOOM_NSTRATEGIES)
146 		{
147 			ereport(INFO,
148 					(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
149 					 errmsg("bloom opfamily %s contains operator %s with invalid strategy number %d",
150 							opfamilyname,
151 							format_operator(oprform->amopopr),
152 							oprform->amopstrategy)));
153 			result = false;
154 		}
155 
156 		/* bloom doesn't support ORDER BY operators */
157 		if (oprform->amoppurpose != AMOP_SEARCH ||
158 			OidIsValid(oprform->amopsortfamily))
159 		{
160 			ereport(INFO,
161 					(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
162 					 errmsg("bloom opfamily %s contains invalid ORDER BY specification for operator %s",
163 							opfamilyname,
164 							format_operator(oprform->amopopr))));
165 			result = false;
166 		}
167 
168 		/* Check operator signature --- same for all bloom strategies */
169 		if (!check_amop_signature(oprform->amopopr, BOOLOID,
170 								  oprform->amoplefttype,
171 								  oprform->amoprighttype))
172 		{
173 			ereport(INFO,
174 					(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
175 					 errmsg("bloom opfamily %s contains operator %s with wrong signature",
176 							opfamilyname,
177 							format_operator(oprform->amopopr))));
178 			result = false;
179 		}
180 	}
181 
182 	/* Now check for inconsistent groups of operators/functions */
183 	grouplist = identify_opfamily_groups(oprlist, proclist);
184 	opclassgroup = NULL;
185 	foreach(lc, grouplist)
186 	{
187 		OpFamilyOpFuncGroup *thisgroup = (OpFamilyOpFuncGroup *) lfirst(lc);
188 
189 		/* Remember the group exactly matching the test opclass */
190 		if (thisgroup->lefttype == opcintype &&
191 			thisgroup->righttype == opcintype)
192 			opclassgroup = thisgroup;
193 
194 		/*
195 		 * There is not a lot we can do to check the operator sets, since each
196 		 * bloom opclass is more or less a law unto itself, and some contain
197 		 * only operators that are binary-compatible with the opclass datatype
198 		 * (meaning that empty operator sets can be OK).  That case also means
199 		 * that we shouldn't insist on nonempty function sets except for the
200 		 * opclass's own group.
201 		 */
202 	}
203 
204 	/* Check that the originally-named opclass is complete */
205 	for (i = 1; i <= BLOOM_NPROC; i++)
206 	{
207 		if (opclassgroup &&
208 			(opclassgroup->functionset & (((uint64) 1) << i)) != 0)
209 			continue;			/* got it */
210 		if (i == BLOOM_OPTIONS_PROC)
211 			continue;			/* optional method */
212 		ereport(INFO,
213 				(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
214 				 errmsg("bloom opclass %s is missing support function %d",
215 						opclassname, i)));
216 		result = false;
217 	}
218 
219 	ReleaseCatCacheList(proclist);
220 	ReleaseCatCacheList(oprlist);
221 	ReleaseSysCache(familytup);
222 	ReleaseSysCache(classtup);
223 
224 	return result;
225 }
226