1 /*-------------------------------------------------------------------------
2  *
3  * amutils.c
4  *	  SQL-level APIs related to index access methods.
5  *
6  * Copyright (c) 2016-2018, PostgreSQL Global Development Group
7  *
8  *
9  * IDENTIFICATION
10  *	  src/backend/utils/adt/amutils.c
11  *
12  *-------------------------------------------------------------------------
13  */
14 #include "postgres.h"
15 
16 #include "access/amapi.h"
17 #include "access/htup_details.h"
18 #include "catalog/pg_class.h"
19 #include "catalog/pg_index.h"
20 #include "utils/builtins.h"
21 #include "utils/syscache.h"
22 
23 
24 /* Convert string property name to enum, for efficiency */
25 struct am_propname
26 {
27 	const char *name;
28 	IndexAMProperty prop;
29 };
30 
31 static const struct am_propname am_propnames[] =
32 {
33 	{
34 		"asc", AMPROP_ASC
35 	},
36 	{
37 		"desc", AMPROP_DESC
38 	},
39 	{
40 		"nulls_first", AMPROP_NULLS_FIRST
41 	},
42 	{
43 		"nulls_last", AMPROP_NULLS_LAST
44 	},
45 	{
46 		"orderable", AMPROP_ORDERABLE
47 	},
48 	{
49 		"distance_orderable", AMPROP_DISTANCE_ORDERABLE
50 	},
51 	{
52 		"returnable", AMPROP_RETURNABLE
53 	},
54 	{
55 		"search_array", AMPROP_SEARCH_ARRAY
56 	},
57 	{
58 		"search_nulls", AMPROP_SEARCH_NULLS
59 	},
60 	{
61 		"clusterable", AMPROP_CLUSTERABLE
62 	},
63 	{
64 		"index_scan", AMPROP_INDEX_SCAN
65 	},
66 	{
67 		"bitmap_scan", AMPROP_BITMAP_SCAN
68 	},
69 	{
70 		"backward_scan", AMPROP_BACKWARD_SCAN
71 	},
72 	{
73 		"can_order", AMPROP_CAN_ORDER
74 	},
75 	{
76 		"can_unique", AMPROP_CAN_UNIQUE
77 	},
78 	{
79 		"can_multi_col", AMPROP_CAN_MULTI_COL
80 	},
81 	{
82 		"can_exclude", AMPROP_CAN_EXCLUDE
83 	},
84 	{
85 		"can_include", AMPROP_CAN_INCLUDE
86 	},
87 };
88 
89 static IndexAMProperty
lookup_prop_name(const char * name)90 lookup_prop_name(const char *name)
91 {
92 	int			i;
93 
94 	for (i = 0; i < lengthof(am_propnames); i++)
95 	{
96 		if (pg_strcasecmp(am_propnames[i].name, name) == 0)
97 			return am_propnames[i].prop;
98 	}
99 
100 	/* We do not throw an error, so that AMs can define their own properties */
101 	return AMPROP_UNKNOWN;
102 }
103 
104 /*
105  * Common code for properties that are just bit tests of indoptions.
106  *
107  * tuple: the pg_index heaptuple
108  * attno: identify the index column to test the indoptions of.
109  * guard: if false, a boolean false result is forced (saves code in caller).
110  * iopt_mask: mask for interesting indoption bit.
111  * iopt_expect: value for a "true" result (should be 0 or iopt_mask).
112  *
113  * Returns false to indicate a NULL result (for "unknown/inapplicable"),
114  * otherwise sets *res to the boolean value to return.
115  */
116 static bool
test_indoption(HeapTuple tuple,int attno,bool guard,int16 iopt_mask,int16 iopt_expect,bool * res)117 test_indoption(HeapTuple tuple, int attno, bool guard,
118 			   int16 iopt_mask, int16 iopt_expect,
119 			   bool *res)
120 {
121 	Datum		datum;
122 	bool		isnull;
123 	int2vector *indoption;
124 	int16		indoption_val;
125 
126 	if (!guard)
127 	{
128 		*res = false;
129 		return true;
130 	}
131 
132 	datum = SysCacheGetAttr(INDEXRELID, tuple,
133 							Anum_pg_index_indoption, &isnull);
134 	Assert(!isnull);
135 
136 	indoption = ((int2vector *) DatumGetPointer(datum));
137 	indoption_val = indoption->values[attno - 1];
138 
139 	*res = (indoption_val & iopt_mask) == iopt_expect;
140 
141 	return true;
142 }
143 
144 
145 /*
146  * Test property of an index AM, index, or index column.
147  *
148  * This is common code for different SQL-level funcs, so the amoid and
149  * index_oid parameters are mutually exclusive; we look up the amoid from the
150  * index_oid if needed, or if no index oid is given, we're looking at AM-wide
151  * properties.
152  */
153 static Datum
indexam_property(FunctionCallInfo fcinfo,const char * propname,Oid amoid,Oid index_oid,int attno)154 indexam_property(FunctionCallInfo fcinfo,
155 				 const char *propname,
156 				 Oid amoid, Oid index_oid, int attno)
157 {
158 	bool		res = false;
159 	bool		isnull = false;
160 	int			natts = 0;
161 	IndexAMProperty prop;
162 	IndexAmRoutine *routine;
163 
164 	/* Try to convert property name to enum (no error if not known) */
165 	prop = lookup_prop_name(propname);
166 
167 	/* If we have an index OID, look up the AM, and get # of columns too */
168 	if (OidIsValid(index_oid))
169 	{
170 		HeapTuple	tuple;
171 		Form_pg_class rd_rel;
172 
173 		Assert(!OidIsValid(amoid));
174 		tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(index_oid));
175 		if (!HeapTupleIsValid(tuple))
176 			PG_RETURN_NULL();
177 		rd_rel = (Form_pg_class) GETSTRUCT(tuple);
178 		if (rd_rel->relkind != RELKIND_INDEX &&
179 			rd_rel->relkind != RELKIND_PARTITIONED_INDEX)
180 		{
181 			ReleaseSysCache(tuple);
182 			PG_RETURN_NULL();
183 		}
184 		amoid = rd_rel->relam;
185 		natts = rd_rel->relnatts;
186 		ReleaseSysCache(tuple);
187 	}
188 
189 	/*
190 	 * At this point, either index_oid == InvalidOid or it's a valid index
191 	 * OID. Also, after this test and the one below, either attno == 0 for
192 	 * index-wide or AM-wide tests, or it's a valid column number in a valid
193 	 * index.
194 	 */
195 	if (attno < 0 || attno > natts)
196 		PG_RETURN_NULL();
197 
198 	/*
199 	 * Get AM information.  If we don't have a valid AM OID, return NULL.
200 	 */
201 	routine = GetIndexAmRoutineByAmId(amoid, true);
202 	if (routine == NULL)
203 		PG_RETURN_NULL();
204 
205 	/*
206 	 * If there's an AM property routine, give it a chance to override the
207 	 * generic logic.  Proceed if it returns false.
208 	 */
209 	if (routine->amproperty &&
210 		routine->amproperty(index_oid, attno, prop, propname,
211 							&res, &isnull))
212 	{
213 		if (isnull)
214 			PG_RETURN_NULL();
215 		PG_RETURN_BOOL(res);
216 	}
217 
218 	if (attno > 0)
219 	{
220 		HeapTuple	tuple;
221 		Form_pg_index rd_index;
222 		bool		iskey = true;
223 
224 		/*
225 		 * Handle column-level properties. Many of these need the pg_index row
226 		 * (which we also need to use to check for nonkey atts) so we fetch
227 		 * that first.
228 		 */
229 		tuple = SearchSysCache1(INDEXRELID, ObjectIdGetDatum(index_oid));
230 		if (!HeapTupleIsValid(tuple))
231 			PG_RETURN_NULL();
232 		rd_index = (Form_pg_index) GETSTRUCT(tuple);
233 
234 		Assert(index_oid == rd_index->indexrelid);
235 		Assert(attno > 0 && attno <= rd_index->indnatts);
236 
237 		isnull = true;
238 
239 		/*
240 		 * If amcaninclude, we might be looking at an attno for a nonkey
241 		 * column, for which we (generically) assume that most properties are
242 		 * null.
243 		 */
244 		if (routine->amcaninclude
245 			&& attno > rd_index->indnkeyatts)
246 			iskey = false;
247 
248 		switch (prop)
249 		{
250 			case AMPROP_ASC:
251 				if (iskey &&
252 					test_indoption(tuple, attno, routine->amcanorder,
253 								   INDOPTION_DESC, 0, &res))
254 					isnull = false;
255 				break;
256 
257 			case AMPROP_DESC:
258 				if (iskey &&
259 					test_indoption(tuple, attno, routine->amcanorder,
260 								   INDOPTION_DESC, INDOPTION_DESC, &res))
261 					isnull = false;
262 				break;
263 
264 			case AMPROP_NULLS_FIRST:
265 				if (iskey &&
266 					test_indoption(tuple, attno, routine->amcanorder,
267 								   INDOPTION_NULLS_FIRST, INDOPTION_NULLS_FIRST, &res))
268 					isnull = false;
269 				break;
270 
271 			case AMPROP_NULLS_LAST:
272 				if (iskey &&
273 					test_indoption(tuple, attno, routine->amcanorder,
274 								   INDOPTION_NULLS_FIRST, 0, &res))
275 					isnull = false;
276 				break;
277 
278 			case AMPROP_ORDERABLE:
279 
280 				/*
281 				 * generic assumption is that nonkey columns are not orderable
282 				 */
283 				res = iskey ? routine->amcanorder : false;
284 				isnull = false;
285 				break;
286 
287 			case AMPROP_DISTANCE_ORDERABLE:
288 
289 				/*
290 				 * The conditions for whether a column is distance-orderable
291 				 * are really up to the AM (at time of writing, only GiST
292 				 * supports it at all). The planner has its own idea based on
293 				 * whether it finds an operator with amoppurpose 'o', but
294 				 * getting there from just the index column type seems like a
295 				 * lot of work. So instead we expect the AM to handle this in
296 				 * its amproperty routine. The generic result is to return
297 				 * false if the AM says it never supports this, or if this is
298 				 * a nonkey column, and null otherwise (meaning we don't
299 				 * know).
300 				 */
301 				if (!iskey || !routine->amcanorderbyop)
302 				{
303 					res = false;
304 					isnull = false;
305 				}
306 				break;
307 
308 			case AMPROP_RETURNABLE:
309 
310 				/* note that we ignore iskey for this property */
311 
312 				isnull = false;
313 				res = false;
314 
315 				if (routine->amcanreturn)
316 				{
317 					/*
318 					 * If possible, the AM should handle this test in its
319 					 * amproperty function without opening the rel. But this
320 					 * is the generic fallback if it does not.
321 					 */
322 					Relation	indexrel = index_open(index_oid, AccessShareLock);
323 
324 					res = index_can_return(indexrel, attno);
325 					index_close(indexrel, AccessShareLock);
326 				}
327 				break;
328 
329 			case AMPROP_SEARCH_ARRAY:
330 				if (iskey)
331 				{
332 					res = routine->amsearcharray;
333 					isnull = false;
334 				}
335 				break;
336 
337 			case AMPROP_SEARCH_NULLS:
338 				if (iskey)
339 				{
340 					res = routine->amsearchnulls;
341 					isnull = false;
342 				}
343 				break;
344 
345 			default:
346 				break;
347 		}
348 
349 		ReleaseSysCache(tuple);
350 
351 		if (!isnull)
352 			PG_RETURN_BOOL(res);
353 		PG_RETURN_NULL();
354 	}
355 
356 	if (OidIsValid(index_oid))
357 	{
358 		/*
359 		 * Handle index-level properties.  Currently, these only depend on the
360 		 * AM, but that might not be true forever, so we make users name an
361 		 * index not just an AM.
362 		 */
363 		switch (prop)
364 		{
365 			case AMPROP_CLUSTERABLE:
366 				PG_RETURN_BOOL(routine->amclusterable);
367 
368 			case AMPROP_INDEX_SCAN:
369 				PG_RETURN_BOOL(routine->amgettuple ? true : false);
370 
371 			case AMPROP_BITMAP_SCAN:
372 				PG_RETURN_BOOL(routine->amgetbitmap ? true : false);
373 
374 			case AMPROP_BACKWARD_SCAN:
375 				PG_RETURN_BOOL(routine->amcanbackward);
376 
377 			default:
378 				PG_RETURN_NULL();
379 		}
380 	}
381 
382 	/*
383 	 * Handle AM-level properties (those that control what you can say in
384 	 * CREATE INDEX).
385 	 */
386 	switch (prop)
387 	{
388 		case AMPROP_CAN_ORDER:
389 			PG_RETURN_BOOL(routine->amcanorder);
390 
391 		case AMPROP_CAN_UNIQUE:
392 			PG_RETURN_BOOL(routine->amcanunique);
393 
394 		case AMPROP_CAN_MULTI_COL:
395 			PG_RETURN_BOOL(routine->amcanmulticol);
396 
397 		case AMPROP_CAN_EXCLUDE:
398 			PG_RETURN_BOOL(routine->amgettuple ? true : false);
399 
400 		case AMPROP_CAN_INCLUDE:
401 			PG_RETURN_BOOL(routine->amcaninclude);
402 
403 		default:
404 			PG_RETURN_NULL();
405 	}
406 }
407 
408 /*
409  * Test property of an AM specified by AM OID
410  */
411 Datum
pg_indexam_has_property(PG_FUNCTION_ARGS)412 pg_indexam_has_property(PG_FUNCTION_ARGS)
413 {
414 	Oid			amoid = PG_GETARG_OID(0);
415 	char	   *propname = text_to_cstring(PG_GETARG_TEXT_PP(1));
416 
417 	return indexam_property(fcinfo, propname, amoid, InvalidOid, 0);
418 }
419 
420 /*
421  * Test property of an index specified by index OID
422  */
423 Datum
pg_index_has_property(PG_FUNCTION_ARGS)424 pg_index_has_property(PG_FUNCTION_ARGS)
425 {
426 	Oid			relid = PG_GETARG_OID(0);
427 	char	   *propname = text_to_cstring(PG_GETARG_TEXT_PP(1));
428 
429 	return indexam_property(fcinfo, propname, InvalidOid, relid, 0);
430 }
431 
432 /*
433  * Test property of an index column specified by index OID and column number
434  */
435 Datum
pg_index_column_has_property(PG_FUNCTION_ARGS)436 pg_index_column_has_property(PG_FUNCTION_ARGS)
437 {
438 	Oid			relid = PG_GETARG_OID(0);
439 	int32		attno = PG_GETARG_INT32(1);
440 	char	   *propname = text_to_cstring(PG_GETARG_TEXT_PP(2));
441 
442 	/* Reject attno 0 immediately, so that attno > 0 identifies this case */
443 	if (attno <= 0)
444 		PG_RETURN_NULL();
445 
446 	return indexam_property(fcinfo, propname, InvalidOid, relid, attno);
447 }
448