1 /*
2  * brin_inclusion.c
3  *		Implementation of inclusion opclasses for BRIN
4  *
5  * This module provides framework BRIN support functions for the "inclusion"
6  * operator classes.  A few SQL-level support functions are also required for
7  * each opclass.
8  *
9  * The "inclusion" BRIN strategy is useful for types that support R-Tree
10  * operations.  This implementation is a straight mapping of those operations
11  * to the block-range nature of BRIN, with two exceptions: (a) we explicitly
12  * support "empty" elements: at least with range types, we need to consider
13  * emptiness separately from regular R-Tree strategies; and (b) we need to
14  * consider "unmergeable" elements, that is, a set of elements for whose union
15  * no representation exists.  The only case where that happens as of this
16  * writing is the INET type, where IPv6 values cannot be merged with IPv4
17  * values.
18  *
19  * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
20  * Portions Copyright (c) 1994, Regents of the University of California
21  *
22  * IDENTIFICATION
23  *	  src/backend/access/brin/brin_inclusion.c
24  */
25 #include "postgres.h"
26 
27 #include "access/brin_internal.h"
28 #include "access/brin_tuple.h"
29 #include "access/genam.h"
30 #include "access/skey.h"
31 #include "catalog/pg_amop.h"
32 #include "catalog/pg_type.h"
33 #include "utils/datum.h"
34 #include "utils/lsyscache.h"
35 #include "utils/rel.h"
36 #include "utils/syscache.h"
37 
38 
/*
 * Extra SQL-level support procedures used by the inclusion opclasses.
 *
 * The procedure numbers chosen here must stay clear of the ones BRIN
 * reserves for itself; see brin_internal.h.
 */
#define		INCLUSION_MAX_PROCNUMS	4	/* number of extra support procs */
#define		PROCNUM_MERGE			11	/* required */
#define		PROCNUM_MERGEABLE		12	/* optional */
#define		PROCNUM_CONTAINS		13	/* optional */
#define		PROCNUM_EMPTY			14	/* optional */


/*
 * Offset to subtract from a procedure number to get its slot in the
 * InclusionOpaque arrays.  This has to equal the smallest of the private
 * procedure numbers defined above.
 */
#define		PROCNUM_BASE			11
57 
/*-
 * Slots of the bv_values array kept for every summarized block range:
 *
 * INCLUSION_UNION
 *		union of all the values found in the block range
 * INCLUSION_UNMERGEABLE
 *		true when the block range holds values that cannot be merged
 *		into a single union (e.g. an IPv6 address amidst IPv4 addresses)
 * INCLUSION_CONTAINS_EMPTY
 *		true when some tuple in the block range holds an empty value
 */
#define INCLUSION_UNION				0
#define INCLUSION_UNMERGEABLE		1
#define INCLUSION_CONTAINS_EMPTY	2
73 
74 
75 typedef struct InclusionOpaque
76 {
77 	FmgrInfo	extra_procinfos[INCLUSION_MAX_PROCNUMS];
78 	bool		extra_proc_missing[INCLUSION_MAX_PROCNUMS];
79 	Oid			cached_subtype;
80 	FmgrInfo	strategy_procinfos[RTMaxStrategyNumber];
81 } InclusionOpaque;
82 
83 Datum		brin_inclusion_opcinfo(PG_FUNCTION_ARGS);
84 Datum		brin_inclusion_add_value(PG_FUNCTION_ARGS);
85 Datum		brin_inclusion_consistent(PG_FUNCTION_ARGS);
86 Datum		brin_inclusion_union(PG_FUNCTION_ARGS);
87 static FmgrInfo *inclusion_get_procinfo(BrinDesc *bdesc, uint16 attno,
88 					   uint16 procnum);
89 static FmgrInfo *inclusion_get_strategy_procinfo(BrinDesc *bdesc, uint16 attno,
90 								Oid subtype, uint16 strategynum);
91 
92 
93 /*
94  * BRIN inclusion OpcInfo function
95  */
96 Datum
brin_inclusion_opcinfo(PG_FUNCTION_ARGS)97 brin_inclusion_opcinfo(PG_FUNCTION_ARGS)
98 {
99 	Oid			typoid = PG_GETARG_OID(0);
100 	BrinOpcInfo *result;
101 	TypeCacheEntry *bool_typcache = lookup_type_cache(BOOLOID, 0);
102 
103 	/*
104 	 * All members of opaque are initialized lazily; both procinfo arrays
105 	 * start out as non-initialized by having fn_oid be InvalidOid, and
106 	 * "missing" to false, by zeroing here.  strategy_procinfos elements can
107 	 * be invalidated when cached_subtype changes by zeroing fn_oid.
108 	 * extra_procinfo entries are never invalidated, but if a lookup fails
109 	 * (which is expected), extra_proc_missing is set to true, indicating not
110 	 * to look it up again.
111 	 */
112 	result = palloc0(MAXALIGN(SizeofBrinOpcInfo(3)) + sizeof(InclusionOpaque));
113 	result->oi_nstored = 3;
114 	result->oi_opaque = (InclusionOpaque *)
115 		MAXALIGN((char *) result + SizeofBrinOpcInfo(3));
116 
117 	/* the union */
118 	result->oi_typcache[INCLUSION_UNION] =
119 		lookup_type_cache(typoid, 0);
120 
121 	/* includes elements that are not mergeable */
122 	result->oi_typcache[INCLUSION_UNMERGEABLE] = bool_typcache;
123 
124 	/* includes the empty element */
125 	result->oi_typcache[INCLUSION_CONTAINS_EMPTY] = bool_typcache;
126 
127 	PG_RETURN_POINTER(result);
128 }
129 
130 /*
131  * BRIN inclusion add value function
132  *
133  * Examine the given index tuple (which contains partial status of a certain
134  * page range) by comparing it to the given value that comes from another heap
135  * tuple.  If the new value is outside the union specified by the existing
136  * tuple values, update the index tuple and return true.  Otherwise, return
137  * false and do not modify in this case.
138  */
139 Datum
brin_inclusion_add_value(PG_FUNCTION_ARGS)140 brin_inclusion_add_value(PG_FUNCTION_ARGS)
141 {
142 	BrinDesc   *bdesc = (BrinDesc *) PG_GETARG_POINTER(0);
143 	BrinValues *column = (BrinValues *) PG_GETARG_POINTER(1);
144 	Datum		newval = PG_GETARG_DATUM(2);
145 	bool		isnull = PG_GETARG_BOOL(3);
146 	Oid			colloid = PG_GET_COLLATION();
147 	FmgrInfo   *finfo;
148 	Datum		result;
149 	bool		new = false;
150 	AttrNumber	attno;
151 	Form_pg_attribute attr;
152 
153 	/*
154 	 * If the new value is null, we record that we saw it if it's the first
155 	 * one; otherwise, there's nothing to do.
156 	 */
157 	if (isnull)
158 	{
159 		if (column->bv_hasnulls)
160 			PG_RETURN_BOOL(false);
161 
162 		column->bv_hasnulls = true;
163 		PG_RETURN_BOOL(true);
164 	}
165 
166 	attno = column->bv_attno;
167 	attr = bdesc->bd_tupdesc->attrs[attno - 1];
168 
169 	/*
170 	 * If the recorded value is null, copy the new value (which we know to be
171 	 * not null), and we're almost done.
172 	 */
173 	if (column->bv_allnulls)
174 	{
175 		column->bv_values[INCLUSION_UNION] =
176 			datumCopy(newval, attr->attbyval, attr->attlen);
177 		column->bv_values[INCLUSION_UNMERGEABLE] = BoolGetDatum(false);
178 		column->bv_values[INCLUSION_CONTAINS_EMPTY] = BoolGetDatum(false);
179 		column->bv_allnulls = false;
180 		new = true;
181 	}
182 
183 	/*
184 	 * No need for further processing if the block range is marked as
185 	 * containing unmergeable values.
186 	 */
187 	if (DatumGetBool(column->bv_values[INCLUSION_UNMERGEABLE]))
188 		PG_RETURN_BOOL(false);
189 
190 	/*
191 	 * If the opclass supports the concept of empty values, test the passed
192 	 * new value for emptiness; if it returns true, we need to set the
193 	 * "contains empty" flag in the element (unless already set).
194 	 */
195 	finfo = inclusion_get_procinfo(bdesc, attno, PROCNUM_EMPTY);
196 	if (finfo != NULL && DatumGetBool(FunctionCall1Coll(finfo, colloid, newval)))
197 	{
198 		if (!DatumGetBool(column->bv_values[INCLUSION_CONTAINS_EMPTY]))
199 		{
200 			column->bv_values[INCLUSION_CONTAINS_EMPTY] = BoolGetDatum(true);
201 			PG_RETURN_BOOL(true);
202 		}
203 
204 		PG_RETURN_BOOL(false);
205 	}
206 
207 	if (new)
208 		PG_RETURN_BOOL(true);
209 
210 	/* Check if the new value is already contained. */
211 	finfo = inclusion_get_procinfo(bdesc, attno, PROCNUM_CONTAINS);
212 	if (finfo != NULL &&
213 		DatumGetBool(FunctionCall2Coll(finfo, colloid,
214 									   column->bv_values[INCLUSION_UNION],
215 									   newval)))
216 		PG_RETURN_BOOL(false);
217 
218 	/*
219 	 * Check if the new value is mergeable to the existing union.  If it is
220 	 * not, mark the value as containing unmergeable elements and get out.
221 	 *
222 	 * Note: at this point we could remove the value from the union, since
223 	 * it's not going to be used any longer.  However, the BRIN framework
224 	 * doesn't allow for the value not being present.  Improve someday.
225 	 */
226 	finfo = inclusion_get_procinfo(bdesc, attno, PROCNUM_MERGEABLE);
227 	if (finfo != NULL &&
228 		!DatumGetBool(FunctionCall2Coll(finfo, colloid,
229 										column->bv_values[INCLUSION_UNION],
230 										newval)))
231 	{
232 		column->bv_values[INCLUSION_UNMERGEABLE] = BoolGetDatum(true);
233 		PG_RETURN_BOOL(true);
234 	}
235 
236 	/* Finally, merge the new value to the existing union. */
237 	finfo = inclusion_get_procinfo(bdesc, attno, PROCNUM_MERGE);
238 	Assert(finfo != NULL);
239 	result = FunctionCall2Coll(finfo, colloid,
240 							   column->bv_values[INCLUSION_UNION], newval);
241 	if (!attr->attbyval &&
242 		DatumGetPointer(result) != DatumGetPointer(column->bv_values[INCLUSION_UNION]))
243 	{
244 		pfree(DatumGetPointer(column->bv_values[INCLUSION_UNION]));
245 
246 		if (result == newval)
247 			result = datumCopy(result, attr->attbyval, attr->attlen);
248 	}
249 	column->bv_values[INCLUSION_UNION] = result;
250 
251 	PG_RETURN_BOOL(true);
252 }
253 
254 /*
255  * BRIN inclusion consistent function
256  *
257  * All of the strategies are optional.
258  */
259 Datum
brin_inclusion_consistent(PG_FUNCTION_ARGS)260 brin_inclusion_consistent(PG_FUNCTION_ARGS)
261 {
262 	BrinDesc   *bdesc = (BrinDesc *) PG_GETARG_POINTER(0);
263 	BrinValues *column = (BrinValues *) PG_GETARG_POINTER(1);
264 	ScanKey		key = (ScanKey) PG_GETARG_POINTER(2);
265 	Oid			colloid = PG_GET_COLLATION(),
266 				subtype;
267 	Datum		unionval;
268 	AttrNumber	attno;
269 	Datum		query;
270 	FmgrInfo   *finfo;
271 	Datum		result;
272 
273 	Assert(key->sk_attno == column->bv_attno);
274 
275 	/* Handle IS NULL/IS NOT NULL tests. */
276 	if (key->sk_flags & SK_ISNULL)
277 	{
278 		if (key->sk_flags & SK_SEARCHNULL)
279 		{
280 			if (column->bv_allnulls || column->bv_hasnulls)
281 				PG_RETURN_BOOL(true);
282 			PG_RETURN_BOOL(false);
283 		}
284 
285 		/*
286 		 * For IS NOT NULL, we can only skip ranges that are known to have
287 		 * only nulls.
288 		 */
289 		if (key->sk_flags & SK_SEARCHNOTNULL)
290 			PG_RETURN_BOOL(!column->bv_allnulls);
291 
292 		/*
293 		 * Neither IS NULL nor IS NOT NULL was used; assume all indexable
294 		 * operators are strict and return false.
295 		 */
296 		PG_RETURN_BOOL(false);
297 	}
298 
299 	/* If it is all nulls, it cannot possibly be consistent. */
300 	if (column->bv_allnulls)
301 		PG_RETURN_BOOL(false);
302 
303 	/* It has to be checked, if it contains elements that are not mergeable. */
304 	if (DatumGetBool(column->bv_values[INCLUSION_UNMERGEABLE]))
305 		PG_RETURN_BOOL(true);
306 
307 	attno = key->sk_attno;
308 	subtype = key->sk_subtype;
309 	query = key->sk_argument;
310 	unionval = column->bv_values[INCLUSION_UNION];
311 	switch (key->sk_strategy)
312 	{
313 			/*
314 			 * Placement strategies
315 			 *
316 			 * These are implemented by logically negating the result of the
317 			 * converse placement operator; for this to work, the converse
318 			 * operator must be part of the opclass.  An error will be thrown
319 			 * by inclusion_get_strategy_procinfo() if the required strategy
320 			 * is not part of the opclass.
321 			 *
322 			 * These all return false if either argument is empty, so there is
323 			 * no need to check for empty elements.
324 			 */
325 
326 		case RTLeftStrategyNumber:
327 			finfo = inclusion_get_strategy_procinfo(bdesc, attno, subtype,
328 												  RTOverRightStrategyNumber);
329 			result = FunctionCall2Coll(finfo, colloid, unionval, query);
330 			PG_RETURN_BOOL(!DatumGetBool(result));
331 
332 		case RTOverLeftStrategyNumber:
333 			finfo = inclusion_get_strategy_procinfo(bdesc, attno, subtype,
334 													RTRightStrategyNumber);
335 			result = FunctionCall2Coll(finfo, colloid, unionval, query);
336 			PG_RETURN_BOOL(!DatumGetBool(result));
337 
338 		case RTOverRightStrategyNumber:
339 			finfo = inclusion_get_strategy_procinfo(bdesc, attno, subtype,
340 													RTLeftStrategyNumber);
341 			result = FunctionCall2Coll(finfo, colloid, unionval, query);
342 			PG_RETURN_BOOL(!DatumGetBool(result));
343 
344 		case RTRightStrategyNumber:
345 			finfo = inclusion_get_strategy_procinfo(bdesc, attno, subtype,
346 													RTOverLeftStrategyNumber);
347 			result = FunctionCall2Coll(finfo, colloid, unionval, query);
348 			PG_RETURN_BOOL(!DatumGetBool(result));
349 
350 		case RTBelowStrategyNumber:
351 			finfo = inclusion_get_strategy_procinfo(bdesc, attno, subtype,
352 												  RTOverAboveStrategyNumber);
353 			result = FunctionCall2Coll(finfo, colloid, unionval, query);
354 			PG_RETURN_BOOL(!DatumGetBool(result));
355 
356 		case RTOverBelowStrategyNumber:
357 			finfo = inclusion_get_strategy_procinfo(bdesc, attno, subtype,
358 													RTAboveStrategyNumber);
359 			result = FunctionCall2Coll(finfo, colloid, unionval, query);
360 			PG_RETURN_BOOL(!DatumGetBool(result));
361 
362 		case RTOverAboveStrategyNumber:
363 			finfo = inclusion_get_strategy_procinfo(bdesc, attno, subtype,
364 													RTBelowStrategyNumber);
365 			result = FunctionCall2Coll(finfo, colloid, unionval, query);
366 			PG_RETURN_BOOL(!DatumGetBool(result));
367 
368 		case RTAboveStrategyNumber:
369 			finfo = inclusion_get_strategy_procinfo(bdesc, attno, subtype,
370 												  RTOverBelowStrategyNumber);
371 			result = FunctionCall2Coll(finfo, colloid, unionval, query);
372 			PG_RETURN_BOOL(!DatumGetBool(result));
373 
374 			/*
375 			 * Overlap and contains strategies
376 			 *
377 			 * These strategies are simple enough that we can simply call the
378 			 * operator and return its result.  Empty elements don't change
379 			 * the result.
380 			 */
381 
382 		case RTOverlapStrategyNumber:
383 		case RTContainsStrategyNumber:
384 		case RTOldContainsStrategyNumber:
385 		case RTContainsElemStrategyNumber:
386 		case RTSubStrategyNumber:
387 		case RTSubEqualStrategyNumber:
388 			finfo = inclusion_get_strategy_procinfo(bdesc, attno, subtype,
389 													key->sk_strategy);
390 			result = FunctionCall2Coll(finfo, colloid, unionval, query);
391 			PG_RETURN_DATUM(result);
392 
393 			/*
394 			 * Contained by strategies
395 			 *
396 			 * We cannot just call the original operator for the contained by
397 			 * strategies because some elements can be contained even though
398 			 * the union is not; instead we use the overlap operator.
399 			 *
400 			 * We check for empty elements separately as they are not merged
401 			 * to the union but contained by everything.
402 			 */
403 
404 		case RTContainedByStrategyNumber:
405 		case RTOldContainedByStrategyNumber:
406 		case RTSuperStrategyNumber:
407 		case RTSuperEqualStrategyNumber:
408 			finfo = inclusion_get_strategy_procinfo(bdesc, attno, subtype,
409 													RTOverlapStrategyNumber);
410 			result = FunctionCall2Coll(finfo, colloid, unionval, query);
411 			if (DatumGetBool(result))
412 				PG_RETURN_BOOL(true);
413 
414 			PG_RETURN_DATUM(column->bv_values[INCLUSION_CONTAINS_EMPTY]);
415 
416 			/*
417 			 * Adjacent strategy
418 			 *
419 			 * We test for overlap first but to be safe we need to call the
420 			 * actual adjacent operator also.
421 			 *
422 			 * An empty element cannot be adjacent to any other, so there is
423 			 * no need to check for it.
424 			 */
425 
426 		case RTAdjacentStrategyNumber:
427 			finfo = inclusion_get_strategy_procinfo(bdesc, attno, subtype,
428 													RTOverlapStrategyNumber);
429 			result = FunctionCall2Coll(finfo, colloid, unionval, query);
430 			if (DatumGetBool(result))
431 				PG_RETURN_BOOL(true);
432 
433 			finfo = inclusion_get_strategy_procinfo(bdesc, attno, subtype,
434 													RTAdjacentStrategyNumber);
435 			result = FunctionCall2Coll(finfo, colloid, unionval, query);
436 			PG_RETURN_DATUM(result);
437 
438 			/*
439 			 * Basic comparison strategies
440 			 *
441 			 * It is straightforward to support the equality strategies with
442 			 * the contains operator.  Generally, inequality strategies do not
443 			 * make much sense for the types which will be used with the
444 			 * inclusion BRIN family of opclasses, but is is possible to
445 			 * implement them with logical negation of the left-of and
446 			 * right-of operators.
447 			 *
448 			 * NB: These strategies cannot be used with geometric datatypes
449 			 * that use comparison of areas!  The only exception is the "same"
450 			 * strategy.
451 			 *
452 			 * Empty elements are considered to be less than the others.  We
453 			 * cannot use the empty support function to check the query is an
454 			 * empty element, because the query can be another data type than
455 			 * the empty support function argument.  So we will return true,
456 			 * if there is a possibility that empty elements will change the
457 			 * result.
458 			 */
459 
460 		case RTLessStrategyNumber:
461 		case RTLessEqualStrategyNumber:
462 			finfo = inclusion_get_strategy_procinfo(bdesc, attno, subtype,
463 													RTRightStrategyNumber);
464 			result = FunctionCall2Coll(finfo, colloid, unionval, query);
465 			if (!DatumGetBool(result))
466 				PG_RETURN_BOOL(true);
467 
468 			PG_RETURN_DATUM(column->bv_values[INCLUSION_CONTAINS_EMPTY]);
469 
470 		case RTSameStrategyNumber:
471 		case RTEqualStrategyNumber:
472 			finfo = inclusion_get_strategy_procinfo(bdesc, attno, subtype,
473 													RTContainsStrategyNumber);
474 			result = FunctionCall2Coll(finfo, colloid, unionval, query);
475 			if (DatumGetBool(result))
476 				PG_RETURN_BOOL(true);
477 
478 			PG_RETURN_DATUM(column->bv_values[INCLUSION_CONTAINS_EMPTY]);
479 
480 		case RTGreaterEqualStrategyNumber:
481 			finfo = inclusion_get_strategy_procinfo(bdesc, attno, subtype,
482 													RTLeftStrategyNumber);
483 			result = FunctionCall2Coll(finfo, colloid, unionval, query);
484 			if (!DatumGetBool(result))
485 				PG_RETURN_BOOL(true);
486 
487 			PG_RETURN_DATUM(column->bv_values[INCLUSION_CONTAINS_EMPTY]);
488 
489 		case RTGreaterStrategyNumber:
490 			/* no need to check for empty elements */
491 			finfo = inclusion_get_strategy_procinfo(bdesc, attno, subtype,
492 													RTLeftStrategyNumber);
493 			result = FunctionCall2Coll(finfo, colloid, unionval, query);
494 			PG_RETURN_BOOL(!DatumGetBool(result));
495 
496 		default:
497 			/* shouldn't happen */
498 			elog(ERROR, "invalid strategy number %d", key->sk_strategy);
499 			PG_RETURN_BOOL(false);
500 	}
501 }
502 
503 /*
504  * BRIN inclusion union function
505  *
506  * Given two BrinValues, update the first of them as a union of the summary
507  * values contained in both.  The second one is untouched.
508  */
509 Datum
brin_inclusion_union(PG_FUNCTION_ARGS)510 brin_inclusion_union(PG_FUNCTION_ARGS)
511 {
512 	BrinDesc   *bdesc = (BrinDesc *) PG_GETARG_POINTER(0);
513 	BrinValues *col_a = (BrinValues *) PG_GETARG_POINTER(1);
514 	BrinValues *col_b = (BrinValues *) PG_GETARG_POINTER(2);
515 	Oid			colloid = PG_GET_COLLATION();
516 	AttrNumber	attno;
517 	Form_pg_attribute attr;
518 	FmgrInfo   *finfo;
519 	Datum		result;
520 
521 	Assert(col_a->bv_attno == col_b->bv_attno);
522 
523 	/* Adjust "hasnulls". */
524 	if (!col_a->bv_hasnulls && col_b->bv_hasnulls)
525 		col_a->bv_hasnulls = true;
526 
527 	/* If there are no values in B, there's nothing left to do. */
528 	if (col_b->bv_allnulls)
529 		PG_RETURN_VOID();
530 
531 	attno = col_a->bv_attno;
532 	attr = bdesc->bd_tupdesc->attrs[attno - 1];
533 
534 	/*
535 	 * Adjust "allnulls".  If A doesn't have values, just copy the values from
536 	 * B into A, and we're done.  We cannot run the operators in this case,
537 	 * because values in A might contain garbage.  Note we already established
538 	 * that B contains values.
539 	 */
540 	if (col_a->bv_allnulls)
541 	{
542 		col_a->bv_allnulls = false;
543 		col_a->bv_values[INCLUSION_UNION] =
544 			datumCopy(col_b->bv_values[INCLUSION_UNION],
545 					  attr->attbyval, attr->attlen);
546 		col_a->bv_values[INCLUSION_UNMERGEABLE] =
547 			col_b->bv_values[INCLUSION_UNMERGEABLE];
548 		col_a->bv_values[INCLUSION_CONTAINS_EMPTY] =
549 			col_b->bv_values[INCLUSION_CONTAINS_EMPTY];
550 		PG_RETURN_VOID();
551 	}
552 
553 	/* If B includes empty elements, mark A similarly, if needed. */
554 	if (!DatumGetBool(col_a->bv_values[INCLUSION_CONTAINS_EMPTY]) &&
555 		DatumGetBool(col_b->bv_values[INCLUSION_CONTAINS_EMPTY]))
556 		col_a->bv_values[INCLUSION_CONTAINS_EMPTY] = BoolGetDatum(true);
557 
558 	/* Check if A includes elements that are not mergeable. */
559 	if (DatumGetBool(col_a->bv_values[INCLUSION_UNMERGEABLE]))
560 		PG_RETURN_VOID();
561 
562 	/* If B includes elements that are not mergeable, mark A similarly. */
563 	if (DatumGetBool(col_b->bv_values[INCLUSION_UNMERGEABLE]))
564 	{
565 		col_a->bv_values[INCLUSION_UNMERGEABLE] = BoolGetDatum(true);
566 		PG_RETURN_VOID();
567 	}
568 
569 	/* Check if A and B are mergeable; if not, mark A unmergeable. */
570 	finfo = inclusion_get_procinfo(bdesc, attno, PROCNUM_MERGEABLE);
571 	if (finfo != NULL &&
572 		!DatumGetBool(FunctionCall2Coll(finfo, colloid,
573 										col_a->bv_values[INCLUSION_UNION],
574 										col_b->bv_values[INCLUSION_UNION])))
575 	{
576 		col_a->bv_values[INCLUSION_UNMERGEABLE] = BoolGetDatum(true);
577 		PG_RETURN_VOID();
578 	}
579 
580 	/* Finally, merge B to A. */
581 	finfo = inclusion_get_procinfo(bdesc, attno, PROCNUM_MERGE);
582 	Assert(finfo != NULL);
583 	result = FunctionCall2Coll(finfo, colloid,
584 							   col_a->bv_values[INCLUSION_UNION],
585 							   col_b->bv_values[INCLUSION_UNION]);
586 	if (!attr->attbyval &&
587 		DatumGetPointer(result) != DatumGetPointer(col_a->bv_values[INCLUSION_UNION]))
588 	{
589 		pfree(DatumGetPointer(col_a->bv_values[INCLUSION_UNION]));
590 
591 		if (result == col_b->bv_values[INCLUSION_UNION])
592 			result = datumCopy(result, attr->attbyval, attr->attlen);
593 	}
594 	col_a->bv_values[INCLUSION_UNION] = result;
595 
596 	PG_RETURN_VOID();
597 }
598 
599 /*
600  * Cache and return inclusion opclass support procedure
601  *
602  * Return the procedure corresponding to the given function support number
603  * or null if it is not exists.
604  */
605 static FmgrInfo *
inclusion_get_procinfo(BrinDesc * bdesc,uint16 attno,uint16 procnum)606 inclusion_get_procinfo(BrinDesc *bdesc, uint16 attno, uint16 procnum)
607 {
608 	InclusionOpaque *opaque;
609 	uint16		basenum = procnum - PROCNUM_BASE;
610 
611 	/*
612 	 * We cache these in the opaque struct, to avoid repetitive syscache
613 	 * lookups.
614 	 */
615 	opaque = (InclusionOpaque *) bdesc->bd_info[attno - 1]->oi_opaque;
616 
617 	/*
618 	 * If we already searched for this proc and didn't find it, don't bother
619 	 * searching again.
620 	 */
621 	if (opaque->extra_proc_missing[basenum])
622 		return NULL;
623 
624 	if (opaque->extra_procinfos[basenum].fn_oid == InvalidOid)
625 	{
626 		if (RegProcedureIsValid(index_getprocid(bdesc->bd_index, attno,
627 												procnum)))
628 		{
629 			fmgr_info_copy(&opaque->extra_procinfos[basenum],
630 						   index_getprocinfo(bdesc->bd_index, attno, procnum),
631 						   bdesc->bd_context);
632 		}
633 		else
634 		{
635 			opaque->extra_proc_missing[basenum] = true;
636 			return NULL;
637 		}
638 	}
639 
640 	return &opaque->extra_procinfos[basenum];
641 }
642 
643 /*
644  * Cache and return the procedure of the given strategy
645  *
646  * Return the procedure corresponding to the given sub-type and strategy
647  * number.  The data type of the index will be used as the left hand side of
648  * the operator and the given sub-type will be used as the right hand side.
649  * Throws an error if the pg_amop row does not exist, but that should not
650  * happen with a properly configured opclass.
651  *
652  * It always throws an error when the data type of the opclass is different
653  * from the data type of the column or the expression.  That happens when the
654  * column data type has implicit cast to the opclass data type.  We don't
655  * bother casting types, because this situation can easily be avoided by
656  * setting storage data type to that of the opclass.  The same problem does not
657  * apply to the data type of the right hand side, because the type in the
658  * ScanKey always matches the opclass' one.
659  *
660  * Note: this function mirrors minmax_get_strategy_procinfo; if changes are
661  * made here, see that function too.
662  */
663 static FmgrInfo *
inclusion_get_strategy_procinfo(BrinDesc * bdesc,uint16 attno,Oid subtype,uint16 strategynum)664 inclusion_get_strategy_procinfo(BrinDesc *bdesc, uint16 attno, Oid subtype,
665 								uint16 strategynum)
666 {
667 	InclusionOpaque *opaque;
668 
669 	Assert(strategynum >= 1 &&
670 		   strategynum <= RTMaxStrategyNumber);
671 
672 	opaque = (InclusionOpaque *) bdesc->bd_info[attno - 1]->oi_opaque;
673 
674 	/*
675 	 * We cache the procedures for the last sub-type in the opaque struct, to
676 	 * avoid repetitive syscache lookups.  If the sub-type is changed,
677 	 * invalidate all the cached entries.
678 	 */
679 	if (opaque->cached_subtype != subtype)
680 	{
681 		uint16		i;
682 
683 		for (i = 1; i <= RTMaxStrategyNumber; i++)
684 			opaque->strategy_procinfos[i - 1].fn_oid = InvalidOid;
685 		opaque->cached_subtype = subtype;
686 	}
687 
688 	if (opaque->strategy_procinfos[strategynum - 1].fn_oid == InvalidOid)
689 	{
690 		Form_pg_attribute attr;
691 		HeapTuple	tuple;
692 		Oid			opfamily,
693 					oprid;
694 		bool		isNull;
695 
696 		opfamily = bdesc->bd_index->rd_opfamily[attno - 1];
697 		attr = bdesc->bd_tupdesc->attrs[attno - 1];
698 		tuple = SearchSysCache4(AMOPSTRATEGY, ObjectIdGetDatum(opfamily),
699 								ObjectIdGetDatum(attr->atttypid),
700 								ObjectIdGetDatum(subtype),
701 								Int16GetDatum(strategynum));
702 
703 		if (!HeapTupleIsValid(tuple))
704 			elog(ERROR, "missing operator %d(%u,%u) in opfamily %u",
705 				 strategynum, attr->atttypid, subtype, opfamily);
706 
707 		oprid = DatumGetObjectId(SysCacheGetAttr(AMOPSTRATEGY, tuple,
708 											 Anum_pg_amop_amopopr, &isNull));
709 		ReleaseSysCache(tuple);
710 		Assert(!isNull && RegProcedureIsValid(oprid));
711 
712 		fmgr_info_cxt(get_opcode(oprid),
713 					  &opaque->strategy_procinfos[strategynum - 1],
714 					  bdesc->bd_context);
715 	}
716 
717 	return &opaque->strategy_procinfos[strategynum - 1];
718 }
719