1 /*-------------------------------------------------------------------------
2  *
3  * query_colocation_checker.c implements the logic for determining
4  * whether any subqueries in a given query are co-located (e.g.,
5  * distribution keys of the relations inside subqueries are equal).
6  *
7  * The main logic behind non colocated subquery joins is that we pick
8  * an anchor range table entry and check for distribution key equality
9  * of any other subqueries in the given query. If for a given subquery,
10  * we cannot find distribution key equality with the anchor rte, we
11  * recursively plan that subquery.
12  *
13  * We also used a hacky solution for picking relations as the anchor range
14  * table entries. The hack is that we wrap them into a subquery. This is only
15  * necessary since some of the attribute equivalence checks are based on
16  * queries rather than range table entries.
17  *
18  * Copyright (c) Citus Data, Inc.
19  *-------------------------------------------------------------------------
20  */
21 
22 #include "postgres.h"
23 
24 #include "distributed/pg_version_constants.h"
25 
26 #include "access/relation.h"
27 #include "distributed/multi_logical_planner.h"
28 #include "distributed/query_colocation_checker.h"
29 #include "distributed/pg_dist_partition.h"
30 #include "distributed/relation_restriction_equivalence.h"
31 #include "distributed/metadata_cache.h"
32 #include "distributed/multi_logical_planner.h" /* only to access utility functions */
33 
34 #include "catalog/pg_type.h"
35 #include "nodes/makefuncs.h"
36 #include "nodes/nodeFuncs.h"
37 #include "parser/parsetree.h"
38 #include "distributed/listutils.h"
39 #include "parser/parse_relation.h"
40 #include "optimizer/planner.h"
41 #include "optimizer/prep.h"
42 #include "utils/rel.h"
43 
44 
45 static RangeTblEntry * AnchorRte(Query *subquery);
46 static List * UnionRelationRestrictionLists(List *firstRelationList,
47 											List *secondRelationList);
48 static List * CreateFilteredTargetListForRelation(Oid relationId,
49 												  List *requiredAttributes);
50 static List * CreateDummyTargetList(Oid relationId, List *requiredAttributes);
51 static TargetEntry * CreateTargetEntryForColumn(Form_pg_attribute attributeTuple, Index
52 												rteIndex,
53 												int attributeNumber, int resno);
54 static TargetEntry * CreateTargetEntryForNullCol(Form_pg_attribute attributeTuple, int
55 												 resno);
56 static TargetEntry * CreateUnusedTargetEntry(int resno);
57 
58 /*
59  * CreateColocatedJoinChecker is a helper function that simply calculates
60  * a ColocatedJoinChecker with the given input and returns it.
61  */
62 ColocatedJoinChecker
CreateColocatedJoinChecker(Query * subquery,PlannerRestrictionContext * restrictionContext)63 CreateColocatedJoinChecker(Query *subquery, PlannerRestrictionContext *restrictionContext)
64 {
65 	ColocatedJoinChecker colocatedJoinChecker = { 0 };
66 
67 	Query *anchorSubquery = NULL;
68 
69 	/* we couldn't pick an anchor subquery, no need to continue */
70 	RangeTblEntry *anchorRangeTblEntry = AnchorRte(subquery);
71 	if (anchorRangeTblEntry == NULL)
72 	{
73 		colocatedJoinChecker.anchorRelationRestrictionList = NIL;
74 
75 		return colocatedJoinChecker;
76 	}
77 
78 	if (anchorRangeTblEntry->rtekind == RTE_RELATION)
79 	{
80 		/*
81 		 * If we get a relation as our anchor, wrap into a subquery. The only
82 		 * reason that we wrap the relation into a subquery is that some of the utility
83 		 * functions (i.e., FilterPlannerRestrictionForQuery()) rely on queries
84 		 * not relations.
85 		 */
86 		anchorSubquery = WrapRteRelationIntoSubquery(anchorRangeTblEntry, NIL);
87 	}
88 	else if (anchorRangeTblEntry->rtekind == RTE_SUBQUERY)
89 	{
90 		anchorSubquery = anchorRangeTblEntry->subquery;
91 	}
92 	else
93 	{
94 		/* we don't expect any other RTE type here */
95 		pg_unreachable();
96 	}
97 
98 	PlannerRestrictionContext *anchorPlannerRestrictionContext =
99 		FilterPlannerRestrictionForQuery(restrictionContext, anchorSubquery);
100 	RelationRestrictionContext *anchorRelationRestrictionContext =
101 		anchorPlannerRestrictionContext->relationRestrictionContext;
102 	List *anchorRestrictionEquivalences =
103 		GenerateAllAttributeEquivalences(anchorPlannerRestrictionContext);
104 
105 	/* fill the non colocated planning context */
106 	colocatedJoinChecker.subquery = subquery;
107 	colocatedJoinChecker.subqueryPlannerRestriction = restrictionContext;
108 
109 	colocatedJoinChecker.anchorRelationRestrictionList =
110 		anchorRelationRestrictionContext->relationRestrictionList;
111 	colocatedJoinChecker.anchorAttributeEquivalences = anchorRestrictionEquivalences;
112 
113 	return colocatedJoinChecker;
114 }
115 
116 
117 /*
118  * AnchorRte gets a query and searches for a relation or a subquery within
119  * the join tree of the query such that we can use it as our anchor range
120  * table entry during our non colocated subquery planning.
121  *
122  * The function returns NULL if it cannot find a proper range table entry for our
123  * purposes. See the function for the details.
124  */
125 static RangeTblEntry *
AnchorRte(Query * subquery)126 AnchorRte(Query *subquery)
127 {
128 	FromExpr *joinTree = subquery->jointree;
129 	Relids joinRelIds = get_relids_in_jointree((Node *) joinTree, false);
130 	int currentRTEIndex = -1;
131 	RangeTblEntry *anchorRangeTblEntry = NULL;
132 
133 	/*
134 	 * Pick a random anchor relation or subquery (i.e., the first) for now. We
135 	 * might consider picking a better rte as the anchor. For example, we could
136 	 * iterate on the joinRelIds, and check which rteIndex has more distribution
137 	 * key equiality with rteIndexes. For the time being, the current primitive
138 	 * approach helps us in many cases.
139 	 */
140 	while ((currentRTEIndex = bms_next_member(joinRelIds, currentRTEIndex)) >= 0)
141 	{
142 		RangeTblEntry *currentRte = rt_fetch(currentRTEIndex, subquery->rtable);
143 
144 		/*
145 		 * We always prefer distributed relations if we can find any. The
146 		 * reason is that Citus is currently able to recursively plan
147 		 * subqueries, but not relations.
148 		 *
149 		 * For the subqueries, make sure that the subquery contains at least one
150 		 * distributed table and doesn't have a set operation.
151 		 *
152 		 * TODO: The set operation restriction might sound weird, but, the restriction
153 		 * equivalence generation functions ignore set operations. We should
154 		 * integrate the logic in SafeToPushdownUnionSubquery() to
155 		 * GenerateAllAttributeEquivalences() such that the latter becomes aware of
156 		 * the set operations.
157 		 */
158 		if (anchorRangeTblEntry == NULL && currentRte->rtekind == RTE_SUBQUERY &&
159 			FindNodeMatchingCheckFunction((Node *) currentRte->subquery,
160 										  IsDistributedTableRTE) &&
161 			currentRte->subquery->setOperations == NULL &&
162 			!ContainsUnionSubquery(currentRte->subquery))
163 		{
164 			/* found a subquery, keep it if we cannot find a relation */
165 			anchorRangeTblEntry = currentRte;
166 		}
167 		else if (currentRte->rtekind == RTE_RELATION)
168 		{
169 			Oid relationId = currentRte->relid;
170 
171 			if (IsCitusTableType(relationId, CITUS_TABLE_WITH_NO_DIST_KEY))
172 			{
173 				/*
174 				 * Non-distributed tables should not be the anchor rte since they
175 				 * don't have distribution key.
176 				 */
177 				continue;
178 			}
179 
180 			anchorRangeTblEntry = currentRte;
181 			break;
182 		}
183 	}
184 
185 	return anchorRangeTblEntry;
186 }
187 
188 
189 /*
190  * SubqueryColocated returns true if the input subquery has a distribution
191  * key equality with the anchor subquery. In other words, we refer the
192  * distribution key equality of relations as "colocation" in this context.
193  */
194 bool
SubqueryColocated(Query * subquery,ColocatedJoinChecker * checker)195 SubqueryColocated(Query *subquery, ColocatedJoinChecker *checker)
196 {
197 	List *anchorRelationRestrictionList = checker->anchorRelationRestrictionList;
198 	List *anchorAttributeEquivalences = checker->anchorAttributeEquivalences;
199 
200 	PlannerRestrictionContext *restrictionContext = checker->subqueryPlannerRestriction;
201 	PlannerRestrictionContext *filteredPlannerContext =
202 		FilterPlannerRestrictionForQuery(restrictionContext, subquery);
203 	List *filteredRestrictionList =
204 		filteredPlannerContext->relationRestrictionContext->relationRestrictionList;
205 
206 
207 	/*
208 	 * There are no relations in the input subquery, such as a subquery
209 	 * that consist of only intermediate results or without FROM
210 	 * clause or subquery in WHERE clause anded with FALSE.
211 	 *
212 	 * Note that for the subquery in WHERE clause, the input original
213 	 * subquery (a.k.a., which didn't go through standard_planner()) may
214 	 * contain distributed relations, but postgres is smart enough to
215 	 * not generate the restriction information. That's the reason for
216 	 * not asserting non-existence of distributed relations.
217 	 */
218 	if (list_length(filteredRestrictionList) == 0)
219 	{
220 		return true;
221 	}
222 
223 	/*
224 	 * We merge the relation restrictions of the input subquery and the anchor
225 	 * restrictions to form a temporary relation restriction context. The aim of
226 	 * forming this temporary context is to check whether the context contains
227 	 * distribution key equality or not.
228 	 */
229 	List *unionedRelationRestrictionList =
230 		UnionRelationRestrictionLists(anchorRelationRestrictionList,
231 									  filteredRestrictionList);
232 
233 	/*
234 	 * We already have the attributeEquivalences, thus, only need to prepare
235 	 * the planner restrictions with unioned relations for our purpose of
236 	 * distribution key equality. Note that we don't need to calculate the
237 	 * join restrictions, we're already relying on the attributeEquivalences
238 	 * provided by the context.
239 	 */
240 	RelationRestrictionContext *unionedRelationRestrictionContext = palloc0(
241 		sizeof(RelationRestrictionContext));
242 	unionedRelationRestrictionContext->relationRestrictionList =
243 		unionedRelationRestrictionList;
244 
245 	PlannerRestrictionContext *unionedPlannerRestrictionContext = palloc0(
246 		sizeof(PlannerRestrictionContext));
247 	unionedPlannerRestrictionContext->relationRestrictionContext =
248 		unionedRelationRestrictionContext;
249 
250 	if (!RestrictionEquivalenceForPartitionKeysViaEquivalences(
251 			unionedPlannerRestrictionContext,
252 			anchorAttributeEquivalences))
253 	{
254 		return false;
255 	}
256 
257 	return true;
258 }
259 
260 
261 /*
262  * WrapRteRelationIntoSubquery wraps the given relation range table entry
263  * in a newly constructed "(SELECT * FROM table_name as anchor_relation)" query.
264  *
265  * Note that the query returned by this function does not contain any filters or
266  * projections. The returned query should be used cautiosly and it is mostly
267  * designed for generating a stub query.
268  */
269 Query *
WrapRteRelationIntoSubquery(RangeTblEntry * rteRelation,List * requiredAttributes)270 WrapRteRelationIntoSubquery(RangeTblEntry *rteRelation, List *requiredAttributes)
271 {
272 	Query *subquery = makeNode(Query);
273 	RangeTblRef *newRangeTableRef = makeNode(RangeTblRef);
274 
275 	subquery->commandType = CMD_SELECT;
276 
277 	/* we copy the input rteRelation to preserve the rteIdentity */
278 	RangeTblEntry *newRangeTableEntry = copyObject(rteRelation);
279 	subquery->rtable = list_make1(newRangeTableEntry);
280 
281 	/* set the FROM expression to the subquery */
282 	newRangeTableRef = makeNode(RangeTblRef);
283 	newRangeTableRef->rtindex = SINGLE_RTE_INDEX;
284 	subquery->jointree = makeFromExpr(list_make1(newRangeTableRef), NULL);
285 
286 	subquery->targetList =
287 		CreateFilteredTargetListForRelation(rteRelation->relid, requiredAttributes);
288 
289 	if (list_length(subquery->targetList) == 0)
290 	{
291 		/*
292 		 * in case there is no required column, we assign one dummy NULL target entry
293 		 * to the subquery targetList so that it has at least one target.
294 		 * (targetlist should have at least one element)
295 		 */
296 		subquery->targetList = CreateDummyTargetList(rteRelation->relid,
297 													 requiredAttributes);
298 	}
299 
300 	return subquery;
301 }
302 
303 
304 /*
305  * CreateAllTargetListForRelation creates a target list which contains all the columns
306  * of the given relation. If the column is not in required columns, then it is added
307  * as a NULL column.
308  */
309 List *
CreateAllTargetListForRelation(Oid relationId,List * requiredAttributes)310 CreateAllTargetListForRelation(Oid relationId, List *requiredAttributes)
311 {
312 	Relation relation = relation_open(relationId, AccessShareLock);
313 	int numberOfAttributes = RelationGetNumberOfAttributes(relation);
314 
315 	List *targetList = NIL;
316 	int varAttrNo = 1;
317 
318 	for (int attrNum = 1; attrNum <= numberOfAttributes; attrNum++)
319 	{
320 		Form_pg_attribute attributeTuple =
321 			TupleDescAttr(relation->rd_att, attrNum - 1);
322 
323 		int resNo = attrNum;
324 
325 		if (attributeTuple->attisdropped)
326 		{
327 			/*
328 			 * For dropped columns, we generate a dummy null column because
329 			 * varattno in relation and subquery are different things, however if
330 			 * we put the NULL columns to the subquery for the dropped columns,
331 			 * they will point to the same variable.
332 			 */
333 			TargetEntry *nullTargetEntry = CreateUnusedTargetEntry(resNo);
334 			targetList = lappend(targetList, nullTargetEntry);
335 			continue;
336 		}
337 
338 		if (!list_member_int(requiredAttributes, attrNum))
339 		{
340 			TargetEntry *nullTargetEntry =
341 				CreateTargetEntryForNullCol(attributeTuple, resNo);
342 			targetList = lappend(targetList, nullTargetEntry);
343 		}
344 		else
345 		{
346 			TargetEntry *targetEntry =
347 				CreateTargetEntryForColumn(attributeTuple, SINGLE_RTE_INDEX, varAttrNo++,
348 										   resNo);
349 			targetList = lappend(targetList, targetEntry);
350 		}
351 	}
352 
353 	relation_close(relation, NoLock);
354 	return targetList;
355 }
356 
357 
358 /*
359  * CreateFilteredTargetListForRelation creates a target list which contains
360  * only the required columns of the given relation. If there is not required
361  * columns then a dummy NULL column is put as the only entry.
362  */
363 static List *
CreateFilteredTargetListForRelation(Oid relationId,List * requiredAttributes)364 CreateFilteredTargetListForRelation(Oid relationId, List *requiredAttributes)
365 {
366 	Relation relation = relation_open(relationId, AccessShareLock);
367 	int numberOfAttributes = RelationGetNumberOfAttributes(relation);
368 
369 	List *targetList = NIL;
370 	int resultNo = 1;
371 	for (int attrNum = 1; attrNum <= numberOfAttributes; attrNum++)
372 	{
373 		Form_pg_attribute attributeTuple =
374 			TupleDescAttr(relation->rd_att, attrNum - 1);
375 
376 		if (list_member_int(requiredAttributes, attrNum))
377 		{
378 			/* In the subquery with only required attribute numbers, the result no
379 			 * corresponds to the ordinal index of it in targetList.
380 			 */
381 			TargetEntry *targetEntry =
382 				CreateTargetEntryForColumn(attributeTuple, SINGLE_RTE_INDEX, attrNum,
383 										   resultNo++);
384 			targetList = lappend(targetList, targetEntry);
385 		}
386 	}
387 	relation_close(relation, NoLock);
388 	return targetList;
389 }
390 
391 
392 /*
393  * CreateDummyTargetList creates a target list which contains only a
394  * NULL entry.
395  */
396 static List *
CreateDummyTargetList(Oid relationId,List * requiredAttributes)397 CreateDummyTargetList(Oid relationId, List *requiredAttributes)
398 {
399 	int resno = 1;
400 	TargetEntry *dummyTargetEntry = CreateUnusedTargetEntry(resno);
401 	return list_make1(dummyTargetEntry);
402 }
403 
404 
405 /*
406  * CreateTargetEntryForColumn creates a target entry for the given
407  * column.
408  */
409 static TargetEntry *
CreateTargetEntryForColumn(Form_pg_attribute attributeTuple,Index rteIndex,int attributeNumber,int resno)410 CreateTargetEntryForColumn(Form_pg_attribute attributeTuple, Index rteIndex,
411 						   int attributeNumber, int resno)
412 {
413 	Var *targetColumn =
414 		makeVar(rteIndex, attributeNumber, attributeTuple->atttypid,
415 				attributeTuple->atttypmod, attributeTuple->attcollation, 0);
416 	TargetEntry *targetEntry =
417 		makeTargetEntry((Expr *) targetColumn, resno,
418 						strdup(attributeTuple->attname.data), false);
419 	return targetEntry;
420 }
421 
422 
423 /*
424  * CreateTargetEntryForNullCol creates a target entry that has a NULL expression.
425  */
426 static TargetEntry *
CreateTargetEntryForNullCol(Form_pg_attribute attributeTuple,int resno)427 CreateTargetEntryForNullCol(Form_pg_attribute attributeTuple, int resno)
428 {
429 	Expr *nullExpr = (Expr *) makeNullConst(attributeTuple->atttypid,
430 											attributeTuple->atttypmod,
431 											attributeTuple->attcollation);
432 	char *resName = attributeTuple->attname.data;
433 	TargetEntry *targetEntry =
434 		makeTargetEntry(nullExpr, resno, strdup(resName), false);
435 	return targetEntry;
436 }
437 
438 
439 /*
440  * CreateUnusedTargetEntry creates a dummy target entry which is not used
441  * in postgres query.
442  */
443 static TargetEntry *
CreateUnusedTargetEntry(int resno)444 CreateUnusedTargetEntry(int resno)
445 {
446 	StringInfo colname = makeStringInfo();
447 	appendStringInfo(colname, "dummy-%d", resno);
448 	Expr *nullExpr = (Expr *) makeNullConst(INT4OID,
449 											0,
450 											InvalidOid);
451 	TargetEntry *targetEntry =
452 		makeTargetEntry(nullExpr, resno, colname->data, false);
453 	return targetEntry;
454 }
455 
456 
457 /*
458  * UnionRelationRestrictionLists merges two relation restriction lists
459  * and returns a newly allocated list. The merged relation restriction
460  * list doesn't contain any duplicate elements.
461  */
462 static List *
UnionRelationRestrictionLists(List * firstRelationList,List * secondRelationList)463 UnionRelationRestrictionLists(List *firstRelationList, List *secondRelationList)
464 {
465 	List *unionedRelationRestrictionList = NULL;
466 	ListCell *relationRestrictionCell = NULL;
467 	Relids rteIdentities = NULL;
468 
469 	/* list_concat destructively modifies the first list, thus copy it */
470 	firstRelationList = list_copy(firstRelationList);
471 	List *allRestrictionList = list_concat(firstRelationList, secondRelationList);
472 
473 	foreach(relationRestrictionCell, allRestrictionList)
474 	{
475 		RelationRestriction *restriction =
476 			(RelationRestriction *) lfirst(relationRestrictionCell);
477 		int rteIdentity = GetRTEIdentity(restriction->rte);
478 
479 		/* already have the same rte, skip */
480 		if (bms_is_member(rteIdentity, rteIdentities))
481 		{
482 			continue;
483 		}
484 
485 		unionedRelationRestrictionList =
486 			lappend(unionedRelationRestrictionList, restriction);
487 
488 		rteIdentities = bms_add_member(rteIdentities, rteIdentity);
489 	}
490 
491 	RelationRestrictionContext *unionedRestrictionContext = palloc0(
492 		sizeof(RelationRestrictionContext));
493 	unionedRestrictionContext->relationRestrictionList = unionedRelationRestrictionList;
494 
495 	return unionedRelationRestrictionList;
496 }
497