1 /*-------------------------------------------------------------------------
2  *
3  * local_distributed_join_planner.c
4  *
 * This file contains functions to convert local-distributed table joins by
 * converting tables to subqueries so that they can be planned by the router planner.
7  *
8  *
9  * The current algorithm checks if there is any table in the `jointree` that
10  * should be converted, if so it creates conversion candidates.
11  * With conversion candidates, it will convert either a distributed table or a local table to a
12  * subquery until it is plannable by router planner. It will choose a distributed table if we
13  * expect it to return few rows, such as a constant equality filter on a unique column.
14  *
15  * ```sql
 * -- assuming dist.a is a unique column, this will convert the distributed table
 * SELECT * FROM dist JOIN local USING (a) WHERE dist.a = 5;
18  * ```
19  *
20  * If the uniqueness is defined on multiple columns such as `dist.a, dist.b`
21  * then distributed table will only be chosen if there is a constant equality in all of the columns such as:
22  *
23  * ```sql
 * SELECT * FROM dist JOIN local USING (a) WHERE dist.a = 5 AND dist.b = 10; -- this will choose the distributed table
 * SELECT * FROM dist JOIN local USING (a) WHERE dist.a = 5 AND dist.b > 10; -- this won't since there is no equality on dist.b
 * SELECT * FROM dist JOIN local USING (a) WHERE dist.a = 5; -- this won't since there is no equality on dist.b
27  * ```
28  *
29  * The algorithm will also not favor distributed tables if there exists a
30  * distributed table which is expected to return many rows, because in that
31  * case we will already plan local tables hence there is no point in converting some distributed tables.
32  *
33  * ```sql
34  * -- here only the local table will be chosen
35  * SELECT * FROM dist_without_unique JOIN dist_with_unique USING(a) join local USING (a);
36  * ```
37  *
38  * this also makes the algorithm consistent.
39  *
40  * The algorithm can understand `OR` and `AND` expressions in the filters.
41  *
 * There is a GUC called `local_table_join_policy` consisting of 4 modes:
 * `never`: don't do any conversion
 * `prefer-local`: prefer converting local tables, if there are any
 * `prefer-distributed`: prefer converting distributed tables, if there are any
 * `auto`: use the above mechanism to decide (constant equality on a unique column)
47  *
48  * `auto` mode is the default.
49  *
 * While converting to a subquery, we use a trick to avoid unnecessary network bandwidth:
 * if there are columns that are not required in a table that will be converted to a subquery, we do:
 *
 * ```sql
 * SELECT t.a, NULL, NULL FROM (SELECT a FROM table) t
55  * ```
56  *
57  * instead of
58  *
59  * ```sql
60  * SELECT a, NULL, NULL FROM table
61  * ```
62  *
63  * There are NULLs in the query because we currently don't have an easy way to update the Vars
64  * that reference the non-required ones and we don't want to break the postgres query.
65  *
66  *
67  * Copyright (c) Citus Data, Inc.
68  *
69  *-------------------------------------------------------------------------
70  */
71 
72 #include "postgres.h"
73 
74 #include "distributed/pg_version_constants.h"
75 
76 #include "funcapi.h"
77 
78 #include "catalog/pg_type.h"
79 #include "catalog/pg_class.h"
80 #include "catalog/pg_index.h"
81 #include "distributed/citus_nodes.h"
82 #include "distributed/citus_ruleutils.h"
83 #include "distributed/commands.h"
84 #include "distributed/commands/multi_copy.h"
85 #include "distributed/distributed_planner.h"
86 #include "distributed/errormessage.h"
87 #include "distributed/local_distributed_join_planner.h"
88 #include "distributed/listutils.h"
89 #include "distributed/log_utils.h"
90 #include "distributed/metadata_cache.h"
91 #include "distributed/multi_logical_planner.h"
92 #include "distributed/multi_logical_optimizer.h"
93 #include "distributed/multi_router_planner.h"
94 #include "distributed/multi_physical_planner.h"
95 #include "distributed/multi_server_executor.h"
96 #include "distributed/multi_router_planner.h"
97 #include "distributed/coordinator_protocol.h"
98 #include "distributed/query_colocation_checker.h"
99 #include "distributed/query_pushdown_planning.h"
100 #include "distributed/recursive_planning.h"
101 #include "distributed/relation_restriction_equivalence.h"
102 #include "distributed/log_utils.h"
103 #include "distributed/shard_pruning.h"
104 #include "distributed/version_compat.h"
105 #include "lib/stringinfo.h"
106 #include "optimizer/clauses.h"
107 #include "optimizer/optimizer.h"
108 #include "optimizer/planner.h"
109 #include "optimizer/prep.h"
110 #include "parser/parsetree.h"
111 #include "nodes/makefuncs.h"
112 #include "nodes/nodeFuncs.h"
113 #include "nodes/nodes.h"
114 #include "nodes/nodeFuncs.h"
115 #include "nodes/pg_list.h"
116 #include "nodes/primnodes.h"
117 #include "nodes/pathnodes.h"
118 #include "utils/builtins.h"
119 #include "utils/guc.h"
120 #include "utils/lsyscache.h"
121 
122 #define INVALID_RTE_IDENTITY -1
123 
124 /*
125  * Managed via a GUC
126  */
127 int LocalTableJoinPolicy = LOCAL_JOIN_POLICY_AUTO;
128 
129 /*
130  * RangeTableEntryDetails contains some information about
131  * a range table entry so that we don't need to calculate
132  * them over and over.
133  */
134 typedef struct RangeTableEntryDetails
135 {
136 	RangeTblEntry *rangeTableEntry;
137 	List *requiredAttributeNumbers;
138 	bool hasConstantFilterOnUniqueColumn;
139 } RangeTableEntryDetails;
140 
141 /*
142  * ConversionCandidates contains candidates that could
143  * be converted to a subquery. This is used as a convenience to
144  * first generate all the candidates and then choose which ones to convert.
145  */
146 typedef struct ConversionCandidates
147 {
148 	List *distributedTableList; /* reference or distributed table */
149 	List *localTableList; /* local or citus local table */
150 }ConversionCandidates;
151 
152 
153 /*
154  * IndexColumns contains the column numbers for an index.
155  * For example if there is an index on (a, b) then it will contain
156  * their column numbers (1,2).
157  */
158 typedef struct IndexColumns
159 {
160 	List *indexColumnNos;
161 }IndexColumns;
162 
/*
 * ConversionChoice names the candidate group that gets replaced by
 * subqueries. A group is always converted as a whole: either every local
 * table or every distributed table.
 */
typedef enum ConversionChoice
{
	/* convert all local (and citus local) table candidates */
	CONVERT_LOCAL_TABLES = 1,

	/* convert all distributed (and reference) table candidates */
	CONVERT_DISTRIBUTED_TABLES = 2
} ConversionChoice;
173 
174 static bool HasConstantFilterOnUniqueColumn(RangeTblEntry *rangeTableEntry,
175 											RelationRestriction *relationRestriction);
176 static List * RequiredAttrNumbersForRelation(RangeTblEntry *relationRte,
177 											 PlannerRestrictionContext *
178 											 plannerRestrictionContext);
179 static ConversionCandidates * CreateConversionCandidates(PlannerRestrictionContext *
180 														 plannerRestrictionContext,
181 														 List *rangeTableList,
182 														 int resultRTEIdentity);
183 static void AppendUniqueIndexColumnsToList(Form_pg_index indexForm, List **uniqueIndexes,
184 										   int flags);
185 static ConversionChoice GetConversionChoice(ConversionCandidates *
186 											conversionCandidates,
187 											PlannerRestrictionContext *
188 											plannerRestrictionContext);
189 static bool AllRangeTableEntriesHaveUniqueIndex(List *rangeTableEntryDetailsList);
190 static bool FirstIsSuperSetOfSecond(List *firstIntList, List *secondIntList);
191 static void ConvertRTEsToSubquery(List *rangeTableEntryDetailsList,
192 								  RecursivePlanningContext *context);
193 static int ResultRTEIdentity(Query *query);
194 static List * RTEListToConvert(ConversionCandidates *conversionCandidates,
195 							   ConversionChoice conversionChoice);
196 
197 
198 /*
199  * RecursivelyPlanLocalTableJoins gets a query and the planner
200  * restrictions. As long as the query is not plannable by router planner,
201  * it converts either a local or distributed table to a subquery.
202  */
203 void
RecursivelyPlanLocalTableJoins(Query * query,RecursivePlanningContext * context)204 RecursivelyPlanLocalTableJoins(Query *query,
205 							   RecursivePlanningContext *context)
206 {
207 	PlannerRestrictionContext *plannerRestrictionContext =
208 		GetPlannerRestrictionContext(context);
209 
210 	List *rangeTableList = query->rtable;
211 	int resultRTEIdentity = ResultRTEIdentity(query);
212 	ConversionCandidates *conversionCandidates =
213 		CreateConversionCandidates(plannerRestrictionContext,
214 								   rangeTableList, resultRTEIdentity);
215 
216 	ConversionChoice conversionChoise =
217 		GetConversionChoice(conversionCandidates, plannerRestrictionContext);
218 
219 
220 	List *rteListToConvert = RTEListToConvert(conversionCandidates, conversionChoise);
221 	ConvertRTEsToSubquery(rteListToConvert, context);
222 }
223 
224 
225 /*
226  * ResultRTEIdentity returns the result RTE's identity if it exists,
227  * otherwise it returns INVALID_RTE_INDENTITY
228  */
229 static int
ResultRTEIdentity(Query * query)230 ResultRTEIdentity(Query *query)
231 {
232 	int resultRTEIdentity = INVALID_RTE_IDENTITY;
233 	if (IsModifyCommand(query))
234 	{
235 		RangeTblEntry *resultRTE = ExtractResultRelationRTEOrError(query);
236 		resultRTEIdentity = GetRTEIdentity(resultRTE);
237 	}
238 	return resultRTEIdentity;
239 }
240 
241 
242 /*
243  * RTEListToConvert to converts returns a list of RTEs that should
244  * be converted to a subquery.
245  */
246 static List *
RTEListToConvert(ConversionCandidates * conversionCandidates,ConversionChoice conversionChoice)247 RTEListToConvert(ConversionCandidates *conversionCandidates, ConversionChoice
248 				 conversionChoice)
249 {
250 	List *rtesToConvert = NIL;
251 	if (conversionChoice == CONVERT_LOCAL_TABLES)
252 	{
253 		rtesToConvert = list_concat(rtesToConvert, conversionCandidates->localTableList);
254 	}
255 	else
256 	{
257 		rtesToConvert = list_concat(rtesToConvert,
258 									conversionCandidates->distributedTableList);
259 	}
260 	return rtesToConvert;
261 }
262 
263 
264 /*
265  * GetConversionChoice returns the conversion choice considering the local table
266  * join policy.
267  */
268 static ConversionChoice
GetConversionChoice(ConversionCandidates * conversionCandidates,PlannerRestrictionContext * plannerRestrictionContext)269 GetConversionChoice(ConversionCandidates *conversionCandidates,
270 					PlannerRestrictionContext *plannerRestrictionContext)
271 {
272 	RangeTableEntryDetails *localRTECandidate = NULL;
273 	RangeTableEntryDetails *distributedRTECandidate = NULL;
274 
275 	if (list_length(conversionCandidates->localTableList) > 0)
276 	{
277 		localRTECandidate = linitial(conversionCandidates->localTableList);
278 	}
279 	if (list_length(conversionCandidates->distributedTableList) > 0)
280 	{
281 		distributedRTECandidate = linitial(conversionCandidates->distributedTableList);
282 	}
283 
284 	if (LocalTableJoinPolicy == LOCAL_JOIN_POLICY_PREFER_LOCAL)
285 	{
286 		return localRTECandidate ? CONVERT_LOCAL_TABLES : CONVERT_DISTRIBUTED_TABLES;
287 	}
288 	else if (LocalTableJoinPolicy == LOCAL_JOIN_POLICY_PREFER_DISTRIBUTED)
289 	{
290 		return distributedRTECandidate ? CONVERT_DISTRIBUTED_TABLES :
291 			   CONVERT_LOCAL_TABLES;
292 	}
293 	else
294 	{
295 		/*
296 		 * We want to convert distributed tables only if all the distributed tables
297 		 * have a constant filter on a unique index, otherwise we would be redundantly
298 		 * converting a distributed table as we will convert all the other local tables.
299 		 */
300 		bool allRangeTableEntriesHaveUniqueIndex = AllRangeTableEntriesHaveUniqueIndex(
301 			conversionCandidates->distributedTableList);
302 
303 		if (allRangeTableEntriesHaveUniqueIndex)
304 		{
305 			return distributedRTECandidate ? CONVERT_DISTRIBUTED_TABLES :
306 				   CONVERT_LOCAL_TABLES;
307 		}
308 		else
309 		{
310 			return localRTECandidate ? CONVERT_LOCAL_TABLES : CONVERT_DISTRIBUTED_TABLES;
311 		}
312 	}
313 }
314 
315 
316 /*
317  * ConvertRTEsToSubquery converts all the given range table entries
318  * to a subquery.
319  */
320 static void
ConvertRTEsToSubquery(List * rangeTableEntryDetailsList,RecursivePlanningContext * context)321 ConvertRTEsToSubquery(List *rangeTableEntryDetailsList, RecursivePlanningContext *context)
322 {
323 	RangeTableEntryDetails *rangeTableEntryDetails = NULL;
324 	foreach_ptr(rangeTableEntryDetails, rangeTableEntryDetailsList)
325 	{
326 		RangeTblEntry *rangeTableEntry = rangeTableEntryDetails->rangeTableEntry;
327 		List *requiredAttributeNumbers = rangeTableEntryDetails->requiredAttributeNumbers;
328 		ReplaceRTERelationWithRteSubquery(rangeTableEntry,
329 										  requiredAttributeNumbers, context);
330 	}
331 }
332 
333 
334 /*
335  * AllRangeTableEntriesHaveUniqueIndex returns true if all of the RTE's in the given
336  * list have a unique index.
337  */
338 static bool
AllRangeTableEntriesHaveUniqueIndex(List * rangeTableEntryDetailsList)339 AllRangeTableEntriesHaveUniqueIndex(List *rangeTableEntryDetailsList)
340 {
341 	RangeTableEntryDetails *rangeTableEntryDetails = NULL;
342 	foreach_ptr(rangeTableEntryDetails, rangeTableEntryDetailsList)
343 	{
344 		if (!rangeTableEntryDetails->hasConstantFilterOnUniqueColumn)
345 		{
346 			return false;
347 		}
348 	}
349 	return true;
350 }
351 
352 
353 /*
354  * ShouldConvertLocalTableJoinsToSubqueries returns true if we should
355  * convert local-dist table joins to subqueries.
356  */
357 bool
ShouldConvertLocalTableJoinsToSubqueries(List * rangeTableList)358 ShouldConvertLocalTableJoinsToSubqueries(List *rangeTableList)
359 {
360 	if (LocalTableJoinPolicy == LOCAL_JOIN_POLICY_NEVER)
361 	{
362 		/* user doesn't want Citus to enable local table joins */
363 		return false;
364 	}
365 
366 	if (!ContainsLocalTableDistributedTableJoin(rangeTableList))
367 	{
368 		return false;
369 	}
370 	return true;
371 }
372 
373 
374 /*
375  * HasConstantFilterOnUniqueColumn returns true if the given rangeTableEntry has a constant
376  * filter on a unique column.
377  */
378 static bool
HasConstantFilterOnUniqueColumn(RangeTblEntry * rangeTableEntry,RelationRestriction * relationRestriction)379 HasConstantFilterOnUniqueColumn(RangeTblEntry *rangeTableEntry,
380 								RelationRestriction *relationRestriction)
381 {
382 	if (rangeTableEntry == NULL || relationRestriction == NULL)
383 	{
384 		/*
385 		 * Postgres might not pass relationRestriction info with hooks if
386 		 * the table doesn't contribute to the result, and in that case
387 		 * relationRestriction will be NULL. Ideally it doesn't make sense
388 		 * to recursively plan such tables but for the time being we don't
389 		 * add any special logic for these tables as it might introduce bugs.
390 		 */
391 		return false;
392 	}
393 
394 	bool joinOnFalse = JoinConditionIsOnFalse(relationRestriction->relOptInfo->joininfo);
395 	if (joinOnFalse)
396 	{
397 		/* If there is a WHERE FALSE, we consider it as a constant filter. */
398 		return true;
399 	}
400 
401 	List *baseRestrictionList = relationRestriction->relOptInfo->baserestrictinfo;
402 	List *restrictClauseList = get_all_actual_clauses(baseRestrictionList);
403 
404 	List *rteEqualityColumnsNos =
405 		FetchEqualityAttrNumsForRTE((Node *) restrictClauseList);
406 
407 	List *uniqueIndexColumnsList = ExecuteFunctionOnEachTableIndex(rangeTableEntry->relid,
408 																   AppendUniqueIndexColumnsToList,
409 																   INCLUDE_INDEX_ALL_STATEMENTS);
410 	IndexColumns *indexColumns = NULL;
411 	foreach_ptr(indexColumns, uniqueIndexColumnsList)
412 	{
413 		List *uniqueIndexColumnNos = indexColumns->indexColumnNos;
414 		if (FirstIsSuperSetOfSecond(rteEqualityColumnsNos,
415 									uniqueIndexColumnNos))
416 		{
417 			return true;
418 		}
419 	}
420 	return false;
421 }
422 
423 
424 /*
425  * FirstIsSuperSetOfSecond returns true if the first int List
426  * contains every element of the second int List.
427  */
428 static bool
FirstIsSuperSetOfSecond(List * firstIntList,List * secondIntList)429 FirstIsSuperSetOfSecond(List *firstIntList, List *secondIntList)
430 {
431 	int curInt = 0;
432 	foreach_int(curInt, secondIntList)
433 	{
434 		if (!list_member_int(firstIntList, curInt))
435 		{
436 			return false;
437 		}
438 	}
439 	return true;
440 }
441 
442 
443 /*
444  * AppendUniqueIndexColumnsToList adds the given index's column numbers if it is a
445  * unique index.
446  */
447 static void
AppendUniqueIndexColumnsToList(Form_pg_index indexForm,List ** uniqueIndexGroups,int flags)448 AppendUniqueIndexColumnsToList(Form_pg_index indexForm, List **uniqueIndexGroups,
449 							   int flags)
450 {
451 	if (indexForm->indisunique || indexForm->indisprimary)
452 	{
453 		IndexColumns *indexColumns = palloc0(sizeof(IndexColumns));
454 		List *uniqueIndexes = NIL;
455 		for (int i = 0; i < indexForm->indkey.dim1; i++)
456 		{
457 			uniqueIndexes = list_append_unique_int(uniqueIndexes,
458 												   indexForm->indkey.values[i]);
459 		}
460 		if (list_length(uniqueIndexes) == 0)
461 		{
462 			return;
463 		}
464 		indexColumns->indexColumnNos = uniqueIndexes;
465 		*uniqueIndexGroups = lappend(*uniqueIndexGroups, indexColumns);
466 	}
467 }
468 
469 
470 /*
471  * RequiredAttrNumbersForRelation returns the required attribute numbers for
472  * the input RTE relation in order for the planning to succeed.
473  *
474  * The function could be optimized by not adding the columns that only appear
475  * WHERE clause as a filter (e.g., not a join clause).
476  */
477 static List *
RequiredAttrNumbersForRelation(RangeTblEntry * rangeTableEntry,PlannerRestrictionContext * plannerRestrictionContext)478 RequiredAttrNumbersForRelation(RangeTblEntry *rangeTableEntry,
479 							   PlannerRestrictionContext *plannerRestrictionContext)
480 {
481 	RelationRestriction *relationRestriction =
482 		RelationRestrictionForRelation(rangeTableEntry, plannerRestrictionContext);
483 
484 	if (relationRestriction == NULL)
485 	{
486 		return NIL;
487 	}
488 
489 	PlannerInfo *plannerInfo = relationRestriction->plannerInfo;
490 
491 	/*
492 	 * Here we used the query from plannerInfo because it has the optimizations
493 	 * so that it doesn't have unnecessary columns. The original query doesn't have
494 	 * some of these optimizations hence if we use it here, we don't get the
495 	 * 'required' attributes.
496 	 */
497 	Query *queryToProcess = plannerInfo->parse;
498 	int rteIndex = relationRestriction->index;
499 
500 	List *allVarsInQuery = pull_vars_of_level((Node *) queryToProcess, 0);
501 
502 	List *requiredAttrNumbers = NIL;
503 
504 	Var *var = NULL;
505 	foreach_ptr(var, allVarsInQuery)
506 	{
507 		if (var->varno == rteIndex)
508 		{
509 			requiredAttrNumbers = list_append_unique_int(requiredAttrNumbers,
510 														 var->varattno);
511 		}
512 	}
513 
514 	return requiredAttrNumbers;
515 }
516 
517 
518 /*
519  * CreateConversionCandidates creates the conversion candidates that might
520  * be converted to a subquery so that citus planners can work.
521  */
522 static ConversionCandidates *
CreateConversionCandidates(PlannerRestrictionContext * plannerRestrictionContext,List * rangeTableList,int resultRTEIdentity)523 CreateConversionCandidates(PlannerRestrictionContext *plannerRestrictionContext,
524 						   List *rangeTableList, int resultRTEIdentity)
525 {
526 	ConversionCandidates *conversionCandidates =
527 		palloc0(sizeof(ConversionCandidates));
528 
529 
530 	RangeTblEntry *rangeTableEntry = NULL;
531 	foreach_ptr(rangeTableEntry, rangeTableList)
532 	{
533 		/* we're only interested in tables */
534 		if (!IsRecursivelyPlannableRelation(rangeTableEntry))
535 		{
536 			continue;
537 		}
538 
539 		int rteIdentity = GetRTEIdentity(rangeTableEntry);
540 
541 		/* result relation cannot converted to a subquery */
542 		if (resultRTEIdentity == rteIdentity)
543 		{
544 			continue;
545 		}
546 
547 		RelationRestriction *relationRestriction =
548 			RelationRestrictionForRelation(rangeTableEntry, plannerRestrictionContext);
549 
550 		RangeTableEntryDetails *rangeTableEntryDetails =
551 			palloc0(sizeof(RangeTableEntryDetails));
552 
553 		rangeTableEntryDetails->rangeTableEntry = rangeTableEntry;
554 		rangeTableEntryDetails->requiredAttributeNumbers =
555 			RequiredAttrNumbersForRelation(rangeTableEntry, plannerRestrictionContext);
556 		rangeTableEntryDetails->hasConstantFilterOnUniqueColumn =
557 			HasConstantFilterOnUniqueColumn(rangeTableEntry, relationRestriction);
558 
559 		bool referenceOrDistributedTable =
560 			IsCitusTableType(rangeTableEntry->relid, REFERENCE_TABLE) ||
561 			IsCitusTableType(rangeTableEntry->relid, DISTRIBUTED_TABLE);
562 		if (referenceOrDistributedTable)
563 		{
564 			conversionCandidates->distributedTableList =
565 				lappend(conversionCandidates->distributedTableList,
566 						rangeTableEntryDetails);
567 		}
568 		else
569 		{
570 			conversionCandidates->localTableList =
571 				lappend(conversionCandidates->localTableList,
572 						rangeTableEntryDetails);
573 		}
574 	}
575 	return conversionCandidates;
576 }
577