1 /*-------------------------------------------------------------------------
2  *
3  * blvalidate.c
4  *	  Opclass validator for bloom.
5  *
6  * Copyright (c) 2016-2019, PostgreSQL Global Development Group
7  *
8  * IDENTIFICATION
9  *	  contrib/bloom/blvalidate.c
10  *
11  *-------------------------------------------------------------------------
12  */
13 #include "postgres.h"
14 
15 #include "access/amvalidate.h"
16 #include "access/htup_details.h"
17 #include "catalog/pg_amop.h"
18 #include "catalog/pg_amproc.h"
19 #include "catalog/pg_opclass.h"
20 #include "catalog/pg_opfamily.h"
21 #include "catalog/pg_type.h"
22 #include "utils/builtins.h"
23 #include "utils/lsyscache.h"
24 #include "utils/regproc.h"
25 #include "utils/syscache.h"
26 
27 #include "bloom.h"
28 
29 /*
30  * Validator for a bloom opclass.
31  */
32 bool
blvalidate(Oid opclassoid)33 blvalidate(Oid opclassoid)
34 {
35 	bool		result = true;
36 	HeapTuple	classtup;
37 	Form_pg_opclass classform;
38 	Oid			opfamilyoid;
39 	Oid			opcintype;
40 	Oid			opckeytype;
41 	char	   *opclassname;
42 	HeapTuple	familytup;
43 	Form_pg_opfamily familyform;
44 	char	   *opfamilyname;
45 	CatCList   *proclist,
46 			   *oprlist;
47 	List	   *grouplist;
48 	OpFamilyOpFuncGroup *opclassgroup;
49 	int			i;
50 	ListCell   *lc;
51 
52 	/* Fetch opclass information */
53 	classtup = SearchSysCache1(CLAOID, ObjectIdGetDatum(opclassoid));
54 	if (!HeapTupleIsValid(classtup))
55 		elog(ERROR, "cache lookup failed for operator class %u", opclassoid);
56 	classform = (Form_pg_opclass) GETSTRUCT(classtup);
57 
58 	opfamilyoid = classform->opcfamily;
59 	opcintype = classform->opcintype;
60 	opckeytype = classform->opckeytype;
61 	if (!OidIsValid(opckeytype))
62 		opckeytype = opcintype;
63 	opclassname = NameStr(classform->opcname);
64 
65 	/* Fetch opfamily information */
66 	familytup = SearchSysCache1(OPFAMILYOID, ObjectIdGetDatum(opfamilyoid));
67 	if (!HeapTupleIsValid(familytup))
68 		elog(ERROR, "cache lookup failed for operator family %u", opfamilyoid);
69 	familyform = (Form_pg_opfamily) GETSTRUCT(familytup);
70 
71 	opfamilyname = NameStr(familyform->opfname);
72 
73 	/* Fetch all operators and support functions of the opfamily */
74 	oprlist = SearchSysCacheList1(AMOPSTRATEGY, ObjectIdGetDatum(opfamilyoid));
75 	proclist = SearchSysCacheList1(AMPROCNUM, ObjectIdGetDatum(opfamilyoid));
76 
77 	/* Check individual support functions */
78 	for (i = 0; i < proclist->n_members; i++)
79 	{
80 		HeapTuple	proctup = &proclist->members[i]->tuple;
81 		Form_pg_amproc procform = (Form_pg_amproc) GETSTRUCT(proctup);
82 		bool		ok;
83 
84 		/*
85 		 * All bloom support functions should be registered with matching
86 		 * left/right types
87 		 */
88 		if (procform->amproclefttype != procform->amprocrighttype)
89 		{
90 			ereport(INFO,
91 					(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
92 					 errmsg("bloom opfamily %s contains support procedure %s with cross-type registration",
93 							opfamilyname,
94 							format_procedure(procform->amproc))));
95 			result = false;
96 		}
97 
98 		/*
99 		 * We can't check signatures except within the specific opclass, since
100 		 * we need to know the associated opckeytype in many cases.
101 		 */
102 		if (procform->amproclefttype != opcintype)
103 			continue;
104 
105 		/* Check procedure numbers and function signatures */
106 		switch (procform->amprocnum)
107 		{
108 			case BLOOM_HASH_PROC:
109 				ok = check_amproc_signature(procform->amproc, INT4OID, false,
110 											1, 1, opckeytype);
111 				break;
112 			default:
113 				ereport(INFO,
114 						(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
115 						 errmsg("bloom opfamily %s contains function %s with invalid support number %d",
116 								opfamilyname,
117 								format_procedure(procform->amproc),
118 								procform->amprocnum)));
119 				result = false;
120 				continue;		/* don't want additional message */
121 		}
122 
123 		if (!ok)
124 		{
125 			ereport(INFO,
126 					(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
127 					 errmsg("gist opfamily %s contains function %s with wrong signature for support number %d",
128 							opfamilyname,
129 							format_procedure(procform->amproc),
130 							procform->amprocnum)));
131 			result = false;
132 		}
133 	}
134 
135 	/* Check individual operators */
136 	for (i = 0; i < oprlist->n_members; i++)
137 	{
138 		HeapTuple	oprtup = &oprlist->members[i]->tuple;
139 		Form_pg_amop oprform = (Form_pg_amop) GETSTRUCT(oprtup);
140 
141 		/* Check it's allowed strategy for bloom */
142 		if (oprform->amopstrategy < 1 ||
143 			oprform->amopstrategy > BLOOM_NSTRATEGIES)
144 		{
145 			ereport(INFO,
146 					(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
147 					 errmsg("bloom opfamily %s contains operator %s with invalid strategy number %d",
148 							opfamilyname,
149 							format_operator(oprform->amopopr),
150 							oprform->amopstrategy)));
151 			result = false;
152 		}
153 
154 		/* bloom doesn't support ORDER BY operators */
155 		if (oprform->amoppurpose != AMOP_SEARCH ||
156 			OidIsValid(oprform->amopsortfamily))
157 		{
158 			ereport(INFO,
159 					(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
160 					 errmsg("bloom opfamily %s contains invalid ORDER BY specification for operator %s",
161 							opfamilyname,
162 							format_operator(oprform->amopopr))));
163 			result = false;
164 		}
165 
166 		/* Check operator signature --- same for all bloom strategies */
167 		if (!check_amop_signature(oprform->amopopr, BOOLOID,
168 								  oprform->amoplefttype,
169 								  oprform->amoprighttype))
170 		{
171 			ereport(INFO,
172 					(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
173 					 errmsg("bloom opfamily %s contains operator %s with wrong signature",
174 							opfamilyname,
175 							format_operator(oprform->amopopr))));
176 			result = false;
177 		}
178 	}
179 
180 	/* Now check for inconsistent groups of operators/functions */
181 	grouplist = identify_opfamily_groups(oprlist, proclist);
182 	opclassgroup = NULL;
183 	foreach(lc, grouplist)
184 	{
185 		OpFamilyOpFuncGroup *thisgroup = (OpFamilyOpFuncGroup *) lfirst(lc);
186 
187 		/* Remember the group exactly matching the test opclass */
188 		if (thisgroup->lefttype == opcintype &&
189 			thisgroup->righttype == opcintype)
190 			opclassgroup = thisgroup;
191 
192 		/*
193 		 * There is not a lot we can do to check the operator sets, since each
194 		 * bloom opclass is more or less a law unto itself, and some contain
195 		 * only operators that are binary-compatible with the opclass datatype
196 		 * (meaning that empty operator sets can be OK).  That case also means
197 		 * that we shouldn't insist on nonempty function sets except for the
198 		 * opclass's own group.
199 		 */
200 	}
201 
202 	/* Check that the originally-named opclass is complete */
203 	for (i = 1; i <= BLOOM_NPROC; i++)
204 	{
205 		if (opclassgroup &&
206 			(opclassgroup->functionset & (((uint64) 1) << i)) != 0)
207 			continue;			/* got it */
208 		ereport(INFO,
209 				(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
210 				 errmsg("bloom opclass %s is missing support function %d",
211 						opclassname, i)));
212 		result = false;
213 	}
214 
215 	ReleaseCatCacheList(proclist);
216 	ReleaseCatCacheList(oprlist);
217 	ReleaseSysCache(familytup);
218 	ReleaseSysCache(classtup);
219 
220 	return result;
221 }
222