1 /*-------------------------------------------------------------------------
2 *
3 * local_distributed_join_planner.c
4 *
 * This file contains functions to convert the tables of local-distributed
 * joins to subqueries so that such joins can be planned by the router planner.
7 *
8 *
9 * The current algorithm checks if there is any table in the `jointree` that
10 * should be converted, if so it creates conversion candidates.
 * Using the conversion candidates, it converts either the distributed tables or the local
 * tables to subqueries so that the query becomes plannable by the router planner. It chooses
 * the distributed tables if we expect them to return few rows, such as when there is a
 * constant equality filter on a unique column.
14 *
15 * ```sql
 * -- assuming dist.a is a unique column, this will convert the distributed table
 * SELECT * FROM dist JOIN local USING (a) WHERE dist.a = 5;
18 * ```
19 *
 * If the uniqueness is defined on multiple columns such as `dist.a, dist.b`,
 * then the distributed table will only be chosen if there is a constant equality
 * filter on all of those columns, such as:
 *
 * ```sql
 * SELECT * FROM dist JOIN local USING (a) WHERE dist.a = 5 AND dist.b = 10; -- this will choose the distributed table
 * SELECT * FROM dist JOIN local USING (a) WHERE dist.a = 5 AND dist.b > 10; -- this won't, since there is no equality on dist.b
 * SELECT * FROM dist JOIN local USING (a) WHERE dist.a = 5; -- this won't, since there is no equality on dist.b
27 * ```
28 *
 * The algorithm will also not favor distributed tables if there exists a
 * distributed table that is expected to return many rows, because in that
 * case we have to convert the local tables anyway, so there is no point in
 * also converting some of the distributed tables.
32 *
33 * ```sql
34 * -- here only the local table will be chosen
35 * SELECT * FROM dist_without_unique JOIN dist_with_unique USING(a) join local USING (a);
36 * ```
37 *
38 * this also makes the algorithm consistent.
39 *
40 * The algorithm can understand `OR` and `AND` expressions in the filters.
41 *
 * There is a GUC called `local_table_join_policy` with 4 modes:
 * `never`: don't do any conversion
 * `prefer-local`: prefer converting local tables, if there are any
 * `prefer-distributed`: prefer converting distributed tables, if there are any
 * `auto`: use the above mechanism to decide (constant equality on a unique column)
47 *
48 * `auto` mode is the default.
49 *
 * While converting to a subquery, we use a trick to avoid unnecessary network bandwidth:
 * if some columns of a table that will be converted to a subquery are not required, we do:
 *
 * ```sql
 * SELECT t.a, NULL, NULL FROM (SELECT a FROM table) t
 * ```
56 *
57 * instead of
58 *
59 * ```sql
60 * SELECT a, NULL, NULL FROM table
61 * ```
62 *
63 * There are NULLs in the query because we currently don't have an easy way to update the Vars
64 * that reference the non-required ones and we don't want to break the postgres query.
65 *
66 *
67 * Copyright (c) Citus Data, Inc.
68 *
69 *-------------------------------------------------------------------------
70 */
71
72 #include "postgres.h"
73
74 #include "distributed/pg_version_constants.h"
75
76 #include "funcapi.h"
77
78 #include "catalog/pg_type.h"
79 #include "catalog/pg_class.h"
80 #include "catalog/pg_index.h"
81 #include "distributed/citus_nodes.h"
82 #include "distributed/citus_ruleutils.h"
83 #include "distributed/commands.h"
84 #include "distributed/commands/multi_copy.h"
85 #include "distributed/distributed_planner.h"
86 #include "distributed/errormessage.h"
87 #include "distributed/local_distributed_join_planner.h"
88 #include "distributed/listutils.h"
89 #include "distributed/log_utils.h"
90 #include "distributed/metadata_cache.h"
91 #include "distributed/multi_logical_planner.h"
92 #include "distributed/multi_logical_optimizer.h"
93 #include "distributed/multi_router_planner.h"
94 #include "distributed/multi_physical_planner.h"
95 #include "distributed/multi_server_executor.h"
96 #include "distributed/multi_router_planner.h"
97 #include "distributed/coordinator_protocol.h"
98 #include "distributed/query_colocation_checker.h"
99 #include "distributed/query_pushdown_planning.h"
100 #include "distributed/recursive_planning.h"
101 #include "distributed/relation_restriction_equivalence.h"
102 #include "distributed/log_utils.h"
103 #include "distributed/shard_pruning.h"
104 #include "distributed/version_compat.h"
105 #include "lib/stringinfo.h"
106 #include "optimizer/clauses.h"
107 #include "optimizer/optimizer.h"
108 #include "optimizer/planner.h"
109 #include "optimizer/prep.h"
110 #include "parser/parsetree.h"
111 #include "nodes/makefuncs.h"
112 #include "nodes/nodeFuncs.h"
113 #include "nodes/nodes.h"
114 #include "nodes/nodeFuncs.h"
115 #include "nodes/pg_list.h"
116 #include "nodes/primnodes.h"
117 #include "nodes/pathnodes.h"
118 #include "utils/builtins.h"
119 #include "utils/guc.h"
120 #include "utils/lsyscache.h"
121
122 #define INVALID_RTE_IDENTITY -1
123
124 /*
125 * Managed via a GUC
126 */
127 int LocalTableJoinPolicy = LOCAL_JOIN_POLICY_AUTO;
128
129 /*
130 * RangeTableEntryDetails contains some information about
131 * a range table entry so that we don't need to calculate
132 * them over and over.
133 */
134 typedef struct RangeTableEntryDetails
135 {
136 RangeTblEntry *rangeTableEntry;
137 List *requiredAttributeNumbers;
138 bool hasConstantFilterOnUniqueColumn;
139 } RangeTableEntryDetails;
140
141 /*
142 * ConversionCandidates contains candidates that could
143 * be converted to a subquery. This is used as a convenience to
144 * first generate all the candidates and then choose which ones to convert.
145 */
146 typedef struct ConversionCandidates
147 {
148 List *distributedTableList; /* reference or distributed table */
149 List *localTableList; /* local or citus local table */
150 }ConversionCandidates;
151
152
153 /*
154 * IndexColumns contains the column numbers for an index.
155 * For example if there is an index on (a, b) then it will contain
156 * their column numbers (1,2).
157 */
158 typedef struct IndexColumns
159 {
160 List *indexColumnNos;
161 }IndexColumns;
162
/*
 * ConversionChoice names the group of candidates that is replaced with
 * subqueries. The decision is made per group: either every local table or
 * every reference/distributed table is converted.
 */
typedef enum ConversionChoice
{
	/* convert every entry of ConversionCandidates.localTableList */
	CONVERT_LOCAL_TABLES = 1,

	/* convert every entry of ConversionCandidates.distributedTableList */
	CONVERT_DISTRIBUTED_TABLES = 2
} ConversionChoice;
173
174 static bool HasConstantFilterOnUniqueColumn(RangeTblEntry *rangeTableEntry,
175 RelationRestriction *relationRestriction);
176 static List * RequiredAttrNumbersForRelation(RangeTblEntry *relationRte,
177 PlannerRestrictionContext *
178 plannerRestrictionContext);
179 static ConversionCandidates * CreateConversionCandidates(PlannerRestrictionContext *
180 plannerRestrictionContext,
181 List *rangeTableList,
182 int resultRTEIdentity);
183 static void AppendUniqueIndexColumnsToList(Form_pg_index indexForm, List **uniqueIndexes,
184 int flags);
185 static ConversionChoice GetConversionChoice(ConversionCandidates *
186 conversionCandidates,
187 PlannerRestrictionContext *
188 plannerRestrictionContext);
189 static bool AllRangeTableEntriesHaveUniqueIndex(List *rangeTableEntryDetailsList);
190 static bool FirstIsSuperSetOfSecond(List *firstIntList, List *secondIntList);
191 static void ConvertRTEsToSubquery(List *rangeTableEntryDetailsList,
192 RecursivePlanningContext *context);
193 static int ResultRTEIdentity(Query *query);
194 static List * RTEListToConvert(ConversionCandidates *conversionCandidates,
195 ConversionChoice conversionChoice);
196
197
198 /*
199 * RecursivelyPlanLocalTableJoins gets a query and the planner
200 * restrictions. As long as the query is not plannable by router planner,
201 * it converts either a local or distributed table to a subquery.
202 */
203 void
RecursivelyPlanLocalTableJoins(Query * query,RecursivePlanningContext * context)204 RecursivelyPlanLocalTableJoins(Query *query,
205 RecursivePlanningContext *context)
206 {
207 PlannerRestrictionContext *plannerRestrictionContext =
208 GetPlannerRestrictionContext(context);
209
210 List *rangeTableList = query->rtable;
211 int resultRTEIdentity = ResultRTEIdentity(query);
212 ConversionCandidates *conversionCandidates =
213 CreateConversionCandidates(plannerRestrictionContext,
214 rangeTableList, resultRTEIdentity);
215
216 ConversionChoice conversionChoise =
217 GetConversionChoice(conversionCandidates, plannerRestrictionContext);
218
219
220 List *rteListToConvert = RTEListToConvert(conversionCandidates, conversionChoise);
221 ConvertRTEsToSubquery(rteListToConvert, context);
222 }
223
224
225 /*
226 * ResultRTEIdentity returns the result RTE's identity if it exists,
227 * otherwise it returns INVALID_RTE_INDENTITY
228 */
229 static int
ResultRTEIdentity(Query * query)230 ResultRTEIdentity(Query *query)
231 {
232 int resultRTEIdentity = INVALID_RTE_IDENTITY;
233 if (IsModifyCommand(query))
234 {
235 RangeTblEntry *resultRTE = ExtractResultRelationRTEOrError(query);
236 resultRTEIdentity = GetRTEIdentity(resultRTE);
237 }
238 return resultRTEIdentity;
239 }
240
241
242 /*
243 * RTEListToConvert to converts returns a list of RTEs that should
244 * be converted to a subquery.
245 */
246 static List *
RTEListToConvert(ConversionCandidates * conversionCandidates,ConversionChoice conversionChoice)247 RTEListToConvert(ConversionCandidates *conversionCandidates, ConversionChoice
248 conversionChoice)
249 {
250 List *rtesToConvert = NIL;
251 if (conversionChoice == CONVERT_LOCAL_TABLES)
252 {
253 rtesToConvert = list_concat(rtesToConvert, conversionCandidates->localTableList);
254 }
255 else
256 {
257 rtesToConvert = list_concat(rtesToConvert,
258 conversionCandidates->distributedTableList);
259 }
260 return rtesToConvert;
261 }
262
263
264 /*
265 * GetConversionChoice returns the conversion choice considering the local table
266 * join policy.
267 */
268 static ConversionChoice
GetConversionChoice(ConversionCandidates * conversionCandidates,PlannerRestrictionContext * plannerRestrictionContext)269 GetConversionChoice(ConversionCandidates *conversionCandidates,
270 PlannerRestrictionContext *plannerRestrictionContext)
271 {
272 RangeTableEntryDetails *localRTECandidate = NULL;
273 RangeTableEntryDetails *distributedRTECandidate = NULL;
274
275 if (list_length(conversionCandidates->localTableList) > 0)
276 {
277 localRTECandidate = linitial(conversionCandidates->localTableList);
278 }
279 if (list_length(conversionCandidates->distributedTableList) > 0)
280 {
281 distributedRTECandidate = linitial(conversionCandidates->distributedTableList);
282 }
283
284 if (LocalTableJoinPolicy == LOCAL_JOIN_POLICY_PREFER_LOCAL)
285 {
286 return localRTECandidate ? CONVERT_LOCAL_TABLES : CONVERT_DISTRIBUTED_TABLES;
287 }
288 else if (LocalTableJoinPolicy == LOCAL_JOIN_POLICY_PREFER_DISTRIBUTED)
289 {
290 return distributedRTECandidate ? CONVERT_DISTRIBUTED_TABLES :
291 CONVERT_LOCAL_TABLES;
292 }
293 else
294 {
295 /*
296 * We want to convert distributed tables only if all the distributed tables
297 * have a constant filter on a unique index, otherwise we would be redundantly
298 * converting a distributed table as we will convert all the other local tables.
299 */
300 bool allRangeTableEntriesHaveUniqueIndex = AllRangeTableEntriesHaveUniqueIndex(
301 conversionCandidates->distributedTableList);
302
303 if (allRangeTableEntriesHaveUniqueIndex)
304 {
305 return distributedRTECandidate ? CONVERT_DISTRIBUTED_TABLES :
306 CONVERT_LOCAL_TABLES;
307 }
308 else
309 {
310 return localRTECandidate ? CONVERT_LOCAL_TABLES : CONVERT_DISTRIBUTED_TABLES;
311 }
312 }
313 }
314
315
316 /*
317 * ConvertRTEsToSubquery converts all the given range table entries
318 * to a subquery.
319 */
320 static void
ConvertRTEsToSubquery(List * rangeTableEntryDetailsList,RecursivePlanningContext * context)321 ConvertRTEsToSubquery(List *rangeTableEntryDetailsList, RecursivePlanningContext *context)
322 {
323 RangeTableEntryDetails *rangeTableEntryDetails = NULL;
324 foreach_ptr(rangeTableEntryDetails, rangeTableEntryDetailsList)
325 {
326 RangeTblEntry *rangeTableEntry = rangeTableEntryDetails->rangeTableEntry;
327 List *requiredAttributeNumbers = rangeTableEntryDetails->requiredAttributeNumbers;
328 ReplaceRTERelationWithRteSubquery(rangeTableEntry,
329 requiredAttributeNumbers, context);
330 }
331 }
332
333
334 /*
335 * AllRangeTableEntriesHaveUniqueIndex returns true if all of the RTE's in the given
336 * list have a unique index.
337 */
338 static bool
AllRangeTableEntriesHaveUniqueIndex(List * rangeTableEntryDetailsList)339 AllRangeTableEntriesHaveUniqueIndex(List *rangeTableEntryDetailsList)
340 {
341 RangeTableEntryDetails *rangeTableEntryDetails = NULL;
342 foreach_ptr(rangeTableEntryDetails, rangeTableEntryDetailsList)
343 {
344 if (!rangeTableEntryDetails->hasConstantFilterOnUniqueColumn)
345 {
346 return false;
347 }
348 }
349 return true;
350 }
351
352
353 /*
354 * ShouldConvertLocalTableJoinsToSubqueries returns true if we should
355 * convert local-dist table joins to subqueries.
356 */
357 bool
ShouldConvertLocalTableJoinsToSubqueries(List * rangeTableList)358 ShouldConvertLocalTableJoinsToSubqueries(List *rangeTableList)
359 {
360 if (LocalTableJoinPolicy == LOCAL_JOIN_POLICY_NEVER)
361 {
362 /* user doesn't want Citus to enable local table joins */
363 return false;
364 }
365
366 if (!ContainsLocalTableDistributedTableJoin(rangeTableList))
367 {
368 return false;
369 }
370 return true;
371 }
372
373
374 /*
375 * HasConstantFilterOnUniqueColumn returns true if the given rangeTableEntry has a constant
376 * filter on a unique column.
377 */
378 static bool
HasConstantFilterOnUniqueColumn(RangeTblEntry * rangeTableEntry,RelationRestriction * relationRestriction)379 HasConstantFilterOnUniqueColumn(RangeTblEntry *rangeTableEntry,
380 RelationRestriction *relationRestriction)
381 {
382 if (rangeTableEntry == NULL || relationRestriction == NULL)
383 {
384 /*
385 * Postgres might not pass relationRestriction info with hooks if
386 * the table doesn't contribute to the result, and in that case
387 * relationRestriction will be NULL. Ideally it doesn't make sense
388 * to recursively plan such tables but for the time being we don't
389 * add any special logic for these tables as it might introduce bugs.
390 */
391 return false;
392 }
393
394 bool joinOnFalse = JoinConditionIsOnFalse(relationRestriction->relOptInfo->joininfo);
395 if (joinOnFalse)
396 {
397 /* If there is a WHERE FALSE, we consider it as a constant filter. */
398 return true;
399 }
400
401 List *baseRestrictionList = relationRestriction->relOptInfo->baserestrictinfo;
402 List *restrictClauseList = get_all_actual_clauses(baseRestrictionList);
403
404 List *rteEqualityColumnsNos =
405 FetchEqualityAttrNumsForRTE((Node *) restrictClauseList);
406
407 List *uniqueIndexColumnsList = ExecuteFunctionOnEachTableIndex(rangeTableEntry->relid,
408 AppendUniqueIndexColumnsToList,
409 INCLUDE_INDEX_ALL_STATEMENTS);
410 IndexColumns *indexColumns = NULL;
411 foreach_ptr(indexColumns, uniqueIndexColumnsList)
412 {
413 List *uniqueIndexColumnNos = indexColumns->indexColumnNos;
414 if (FirstIsSuperSetOfSecond(rteEqualityColumnsNos,
415 uniqueIndexColumnNos))
416 {
417 return true;
418 }
419 }
420 return false;
421 }
422
423
424 /*
425 * FirstIsSuperSetOfSecond returns true if the first int List
426 * contains every element of the second int List.
427 */
428 static bool
FirstIsSuperSetOfSecond(List * firstIntList,List * secondIntList)429 FirstIsSuperSetOfSecond(List *firstIntList, List *secondIntList)
430 {
431 int curInt = 0;
432 foreach_int(curInt, secondIntList)
433 {
434 if (!list_member_int(firstIntList, curInt))
435 {
436 return false;
437 }
438 }
439 return true;
440 }
441
442
443 /*
444 * AppendUniqueIndexColumnsToList adds the given index's column numbers if it is a
445 * unique index.
446 */
447 static void
AppendUniqueIndexColumnsToList(Form_pg_index indexForm,List ** uniqueIndexGroups,int flags)448 AppendUniqueIndexColumnsToList(Form_pg_index indexForm, List **uniqueIndexGroups,
449 int flags)
450 {
451 if (indexForm->indisunique || indexForm->indisprimary)
452 {
453 IndexColumns *indexColumns = palloc0(sizeof(IndexColumns));
454 List *uniqueIndexes = NIL;
455 for (int i = 0; i < indexForm->indkey.dim1; i++)
456 {
457 uniqueIndexes = list_append_unique_int(uniqueIndexes,
458 indexForm->indkey.values[i]);
459 }
460 if (list_length(uniqueIndexes) == 0)
461 {
462 return;
463 }
464 indexColumns->indexColumnNos = uniqueIndexes;
465 *uniqueIndexGroups = lappend(*uniqueIndexGroups, indexColumns);
466 }
467 }
468
469
470 /*
471 * RequiredAttrNumbersForRelation returns the required attribute numbers for
472 * the input RTE relation in order for the planning to succeed.
473 *
474 * The function could be optimized by not adding the columns that only appear
475 * WHERE clause as a filter (e.g., not a join clause).
476 */
477 static List *
RequiredAttrNumbersForRelation(RangeTblEntry * rangeTableEntry,PlannerRestrictionContext * plannerRestrictionContext)478 RequiredAttrNumbersForRelation(RangeTblEntry *rangeTableEntry,
479 PlannerRestrictionContext *plannerRestrictionContext)
480 {
481 RelationRestriction *relationRestriction =
482 RelationRestrictionForRelation(rangeTableEntry, plannerRestrictionContext);
483
484 if (relationRestriction == NULL)
485 {
486 return NIL;
487 }
488
489 PlannerInfo *plannerInfo = relationRestriction->plannerInfo;
490
491 /*
492 * Here we used the query from plannerInfo because it has the optimizations
493 * so that it doesn't have unnecessary columns. The original query doesn't have
494 * some of these optimizations hence if we use it here, we don't get the
495 * 'required' attributes.
496 */
497 Query *queryToProcess = plannerInfo->parse;
498 int rteIndex = relationRestriction->index;
499
500 List *allVarsInQuery = pull_vars_of_level((Node *) queryToProcess, 0);
501
502 List *requiredAttrNumbers = NIL;
503
504 Var *var = NULL;
505 foreach_ptr(var, allVarsInQuery)
506 {
507 if (var->varno == rteIndex)
508 {
509 requiredAttrNumbers = list_append_unique_int(requiredAttrNumbers,
510 var->varattno);
511 }
512 }
513
514 return requiredAttrNumbers;
515 }
516
517
518 /*
519 * CreateConversionCandidates creates the conversion candidates that might
520 * be converted to a subquery so that citus planners can work.
521 */
522 static ConversionCandidates *
CreateConversionCandidates(PlannerRestrictionContext * plannerRestrictionContext,List * rangeTableList,int resultRTEIdentity)523 CreateConversionCandidates(PlannerRestrictionContext *plannerRestrictionContext,
524 List *rangeTableList, int resultRTEIdentity)
525 {
526 ConversionCandidates *conversionCandidates =
527 palloc0(sizeof(ConversionCandidates));
528
529
530 RangeTblEntry *rangeTableEntry = NULL;
531 foreach_ptr(rangeTableEntry, rangeTableList)
532 {
533 /* we're only interested in tables */
534 if (!IsRecursivelyPlannableRelation(rangeTableEntry))
535 {
536 continue;
537 }
538
539 int rteIdentity = GetRTEIdentity(rangeTableEntry);
540
541 /* result relation cannot converted to a subquery */
542 if (resultRTEIdentity == rteIdentity)
543 {
544 continue;
545 }
546
547 RelationRestriction *relationRestriction =
548 RelationRestrictionForRelation(rangeTableEntry, plannerRestrictionContext);
549
550 RangeTableEntryDetails *rangeTableEntryDetails =
551 palloc0(sizeof(RangeTableEntryDetails));
552
553 rangeTableEntryDetails->rangeTableEntry = rangeTableEntry;
554 rangeTableEntryDetails->requiredAttributeNumbers =
555 RequiredAttrNumbersForRelation(rangeTableEntry, plannerRestrictionContext);
556 rangeTableEntryDetails->hasConstantFilterOnUniqueColumn =
557 HasConstantFilterOnUniqueColumn(rangeTableEntry, relationRestriction);
558
559 bool referenceOrDistributedTable =
560 IsCitusTableType(rangeTableEntry->relid, REFERENCE_TABLE) ||
561 IsCitusTableType(rangeTableEntry->relid, DISTRIBUTED_TABLE);
562 if (referenceOrDistributedTable)
563 {
564 conversionCandidates->distributedTableList =
565 lappend(conversionCandidates->distributedTableList,
566 rangeTableEntryDetails);
567 }
568 else
569 {
570 conversionCandidates->localTableList =
571 lappend(conversionCandidates->localTableList,
572 rangeTableEntryDetails);
573 }
574 }
575 return conversionCandidates;
576 }
577