1 /*-------------------------------------------------------------------------
2 *
3 * distributed_planner.c
4 * General Citus planner code.
5 *
6 * Copyright (c) Citus Data, Inc.
7 *-------------------------------------------------------------------------
8 */
9
10 #include "postgres.h"
11
12 #include "distributed/pg_version_constants.h"
13
14 #include "funcapi.h"
15
16 #include <float.h>
17 #include <limits.h>
18
19 #include "access/htup_details.h"
20 #include "catalog/pg_class.h"
21 #include "catalog/pg_proc.h"
22 #include "catalog/pg_type.h"
23 #include "distributed/citus_nodefuncs.h"
24 #include "distributed/citus_nodes.h"
25 #include "distributed/citus_ruleutils.h"
26 #include "distributed/cte_inline.h"
27 #include "distributed/function_call_delegation.h"
28 #include "distributed/insert_select_planner.h"
29 #include "distributed/intermediate_result_pruning.h"
30 #include "distributed/intermediate_results.h"
31 #include "distributed/listutils.h"
32 #include "distributed/coordinator_protocol.h"
33 #include "distributed/metadata_cache.h"
34 #include "distributed/multi_executor.h"
35 #include "distributed/distributed_planner.h"
36 #include "distributed/query_pushdown_planning.h"
37 #include "distributed/multi_logical_optimizer.h"
38 #include "distributed/multi_logical_planner.h"
39 #include "distributed/multi_partitioning_utils.h"
40 #include "distributed/multi_physical_planner.h"
41 #include "distributed/combine_query_planner.h"
42 #include "distributed/multi_router_planner.h"
43 #include "distributed/query_utils.h"
44 #include "distributed/recursive_planning.h"
45 #include "distributed/shardinterval_utils.h"
46 #include "distributed/shard_utils.h"
47 #include "distributed/version_compat.h"
48 #include "distributed/worker_shard_visibility.h"
49 #include "executor/executor.h"
50 #include "nodes/makefuncs.h"
51 #include "nodes/nodeFuncs.h"
52 #include "nodes/pg_list.h"
53 #include "parser/parsetree.h"
54 #include "parser/parse_type.h"
55 #include "optimizer/optimizer.h"
56 #include "optimizer/plancat.h"
57 #include "optimizer/pathnode.h"
58 #include "optimizer/planner.h"
59 #include "optimizer/planmain.h"
60 #include "utils/builtins.h"
61 #include "utils/datum.h"
62 #include "utils/lsyscache.h"
63 #include "utils/memutils.h"
64 #include "utils/syscache.h"
65
66
/* stack of per-invocation planner restriction contexts (LIFO, see Push/Pop below) */
static List *plannerRestrictionContextList = NIL;
int MultiTaskQueryLogLevel = CITUS_LOG_LEVEL_OFF; /* multi-task query log level */
static uint64 NextPlanId = 1; /* monotonically increasing id assigned to each distributed plan */

/* keep track of planner call stack levels */
int PlannerLevel = 0;

/* local function forward declarations */
static bool ListContainsDistributedTableRTE(List *rangeTableList);
static bool IsUpdateOrDelete(Query *query);
static PlannedStmt * CreateDistributedPlannedStmt(
	DistributedPlanningContext *planContext);
static PlannedStmt * InlineCtesAndCreateDistributedPlannedStmt(uint64 planId,
															   DistributedPlanningContext
															   *planContext);
static PlannedStmt * TryCreateDistributedPlannedStmt(PlannedStmt *localPlan,
													 Query *originalQuery,
													 Query *query, ParamListInfo
													 boundParams,
													 PlannerRestrictionContext *
													 plannerRestrictionContext);
static DeferredErrorMessage * DeferErrorIfPartitionTableNotSingleReplicated(Oid
																			relationId);

static int AssignRTEIdentities(List *rangeTableList, int rteIdCounter);
static void AssignRTEIdentity(RangeTblEntry *rangeTableEntry, int rteIdentifier);
static void AdjustPartitioningForDistributedPlanning(List *rangeTableList,
													 bool setPartitionedTablesInherited);
static PlannedStmt * FinalizeNonRouterPlan(PlannedStmt *localPlan,
										   DistributedPlan *distributedPlan,
										   CustomScan *customScan);
static PlannedStmt * FinalizeRouterPlan(PlannedStmt *localPlan, CustomScan *customScan);
static AppendRelInfo * FindTargetAppendRelInfo(PlannerInfo *root, int relationRteIndex);
static List * makeTargetListFromCustomScanList(List *custom_scan_tlist);
static List * makeCustomScanTargetlistFromExistingTargetList(List *existingTargetlist);
static int32 BlessRecordExpressionList(List *exprs);
static void CheckNodeIsDumpable(Node *node);
static Node * CheckNodeCopyAndSerialization(Node *node);
static void AdjustReadIntermediateResultCost(RangeTblEntry *rangeTableEntry,
											 RelOptInfo *relOptInfo);
static void AdjustReadIntermediateResultArrayCost(RangeTblEntry *rangeTableEntry,
												  RelOptInfo *relOptInfo);
static void AdjustReadIntermediateResultsCostInternal(RelOptInfo *relOptInfo,
													  List *columnTypes,
													  int resultIdCount,
													  Datum *resultIds,
													  Const *resultFormatConst);
static List * OuterPlanParamsList(PlannerInfo *root);
static List * CopyPlanParamList(List *originalPlanParamList);
static PlannerRestrictionContext * CreateAndPushPlannerRestrictionContext(void);
static PlannerRestrictionContext * CurrentPlannerRestrictionContext(void);
static void PopPlannerRestrictionContext(void);
static void ResetPlannerRestrictionContext(
	PlannerRestrictionContext *plannerRestrictionContext);
static PlannedStmt * PlanFastPathDistributedStmt(DistributedPlanningContext *planContext,
												 Node *distributionKeyValue);
static PlannedStmt * PlanDistributedStmt(DistributedPlanningContext *planContext,
										 int rteIdCounter);
static RTEListProperties * GetRTEListProperties(List *rangeTableList);
static List * TranslatedVars(PlannerInfo *root, int relationIndex);
126
127
/*
 * distributed_planner is Citus' planner_hook entry point. It decides whether
 * the query needs distributed planning at all and, if so, routes it either
 * through the fast-path router planner or through standard_planner followed
 * by the full distributed planning machinery. Queries without any Citus
 * table (and without CURSOR_OPT_FORCE_DISTRIBUTED) fall through to the
 * plan produced by standard_planner.
 */
PlannedStmt *
distributed_planner(Query *parse,
#if PG_VERSION_NUM >= PG_VERSION_13
					const char *query_string,
#endif
					int cursorOptions,
					ParamListInfo boundParams)
{
	bool needsDistributedPlanning = false;
	bool fastPathRouterQuery = false;
	Node *distributionKeyValue = NULL;

	List *rangeTableList = ExtractRangeTableEntryList(parse);

	if (cursorOptions & CURSOR_OPT_FORCE_DISTRIBUTED)
	{
		/* this cursor flag could only be set when Citus has been loaded */
		Assert(CitusHasBeenLoaded());

		needsDistributedPlanning = true;
	}
	else if (CitusHasBeenLoaded())
	{
		/* only queries referencing at least one Citus table need our planner */
		needsDistributedPlanning = ListContainsDistributedTableRTE(rangeTableList);
		if (needsDistributedPlanning)
		{
			fastPathRouterQuery = FastPathRouterQuery(parse, &distributionKeyValue);
		}
	}

	/* next RTE identity to hand out; identities are 1-based */
	int rteIdCounter = 1;

	DistributedPlanningContext planContext = {
		.query = parse,
		.cursorOptions = cursorOptions,
		.boundParams = boundParams,
	};

	if (needsDistributedPlanning)
	{
		/*
		 * standard_planner scribbles on its input, but for deparsing we need the
		 * unmodified form. Before copying we call AssignRTEIdentities to be able
		 * to match RTEs in the rewritten query tree with those in the original
		 * tree.
		 */
		rteIdCounter = AssignRTEIdentities(rangeTableList, rteIdCounter);

		planContext.originalQuery = copyObject(parse);

		/*
		 * When there are partitioned tables (not applicable to fast path),
		 * pretend that they are regular tables to avoid unnecessary work
		 * in standard_planner.
		 */
		if (!fastPathRouterQuery)
		{
			bool setPartitionedTablesInherited = false;
			AdjustPartitioningForDistributedPlanning(rangeTableList,
													 setPartitionedTablesInherited);
		}
	}

	/*
	 * Make sure that we hide shard names on the Citus MX worker nodes. See comments in
	 * ReplaceTableVisibleFunction() for the details.
	 */
	ReplaceTableVisibleFunction((Node *) parse);

	/* create a restriction context and put it at the end of the context list */
	planContext.plannerRestrictionContext = CreateAndPushPlannerRestrictionContext();

	/*
	 * We keep track of how many times we've recursed into the planner, primarily
	 * to detect whether we are in a function call. We need to make sure that the
	 * PlannerLevel is decremented exactly once at the end of the next PG_TRY
	 * block, both in the happy case and when an error occurs.
	 */
	PlannerLevel++;

	PlannedStmt *result = NULL;

	PG_TRY();
	{
		if (fastPathRouterQuery)
		{
			result = PlanFastPathDistributedStmt(&planContext, distributionKeyValue);
		}
		else
		{
			/*
			 * Call into standard_planner because the Citus planner relies on both the
			 * restriction information per table and parse tree transformations made by
			 * postgres' planner.
			 */
			planContext.plan = standard_planner_compat(planContext.query,
													   planContext.cursorOptions,
													   planContext.boundParams);
			if (needsDistributedPlanning)
			{
				result = PlanDistributedStmt(&planContext, rteIdCounter);
			}
			else if ((result = TryToDelegateFunctionCall(&planContext)) == NULL)
			{
				/* neither distributed nor delegated: use postgres' plan as-is */
				result = planContext.plan;
			}
		}
	}
	PG_CATCH();
	{
		/* undo the context push and level increment before propagating the error */
		PopPlannerRestrictionContext();

		PlannerLevel--;

		PG_RE_THROW();
	}
	PG_END_TRY();

	PlannerLevel--;

	/* remove the context from the context list */
	PopPlannerRestrictionContext();

	/*
	 * In some cases, for example; parameterized SQL functions, we may miss that
	 * there is a need for distributed planning. Such cases only become clear after
	 * standard_planner performs some modifications on parse tree. In such cases
	 * we will simply error out.
	 */
	if (!needsDistributedPlanning && NeedsDistributedPlanning(parse))
	{
		ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
						errmsg("cannot perform distributed planning on this "
							   "query because parameterized queries for SQL "
							   "functions referencing distributed tables are "
							   "not supported"),
						errhint("Consider using PL/pgSQL functions instead.")));
	}

	return result;
}
270
271
272 /*
273 * ExtractRangeTableEntryList is a wrapper around ExtractRangeTableEntryWalker.
274 * The function traverses the input query and returns all the range table
275 * entries that are in the query tree.
276 */
277 List *
ExtractRangeTableEntryList(Query * query)278 ExtractRangeTableEntryList(Query *query)
279 {
280 List *rteList = NIL;
281
282 ExtractRangeTableEntryWalker((Node *) query, &rteList);
283
284 return rteList;
285 }
286
287
288 /*
289 * NeedsDistributedPlanning returns true if the Citus extension is loaded and
290 * the query contains a distributed table.
291 *
292 * This function allows queries containing local tables to pass through the
293 * distributed planner. How to handle local tables is a decision that should
294 * be made within the planner
295 */
296 bool
NeedsDistributedPlanning(Query * query)297 NeedsDistributedPlanning(Query *query)
298 {
299 if (!CitusHasBeenLoaded())
300 {
301 return false;
302 }
303
304 CmdType commandType = query->commandType;
305
306 if (commandType != CMD_SELECT && commandType != CMD_INSERT &&
307 commandType != CMD_UPDATE && commandType != CMD_DELETE)
308 {
309 return false;
310 }
311
312 List *allRTEs = ExtractRangeTableEntryList(query);
313
314 return ListContainsDistributedTableRTE(allRTEs);
315 }
316
317
318 /*
319 * ListContainsDistributedTableRTE gets a list of range table entries
320 * and returns true if there is at least one distributed relation range
321 * table entry in the list.
322 */
323 static bool
ListContainsDistributedTableRTE(List * rangeTableList)324 ListContainsDistributedTableRTE(List *rangeTableList)
325 {
326 ListCell *rangeTableCell = NULL;
327
328 foreach(rangeTableCell, rangeTableList)
329 {
330 RangeTblEntry *rangeTableEntry = (RangeTblEntry *) lfirst(rangeTableCell);
331
332 if (rangeTableEntry->rtekind != RTE_RELATION)
333 {
334 continue;
335 }
336
337 if (IsCitusTable(rangeTableEntry->relid))
338 {
339 return true;
340 }
341 }
342
343 return false;
344 }
345
346
347 /*
348 * AssignRTEIdentities function modifies query tree by adding RTE identities to the
349 * RTE_RELATIONs.
350 *
351 * Please note that, we want to avoid modifying query tree as much as possible
352 * because if PostgreSQL changes the way it uses modified fields, that may break
353 * our logic.
354 *
355 * Returns the next id. This can be used to call on a rangeTableList that may've
356 * been partially assigned. Should be set to 1 initially.
357 */
358 static int
AssignRTEIdentities(List * rangeTableList,int rteIdCounter)359 AssignRTEIdentities(List *rangeTableList, int rteIdCounter)
360 {
361 ListCell *rangeTableCell = NULL;
362
363 foreach(rangeTableCell, rangeTableList)
364 {
365 RangeTblEntry *rangeTableEntry = (RangeTblEntry *) lfirst(rangeTableCell);
366
367 /*
368 * To be able to track individual RTEs through PostgreSQL's query
369 * planning, we need to be able to figure out whether an RTE is
370 * actually a copy of another, rather than a different one. We
371 * simply number the RTEs starting from 1.
372 *
373 * Note that we're only interested in RTE_RELATIONs and thus assigning
374 * identifiers to those RTEs only.
375 */
376 if (rangeTableEntry->rtekind == RTE_RELATION &&
377 rangeTableEntry->values_lists == NIL)
378 {
379 AssignRTEIdentity(rangeTableEntry, rteIdCounter++);
380 }
381 }
382
383 return rteIdCounter;
384 }
385
386
387 /*
388 * AdjustPartitioningForDistributedPlanning function modifies query tree by
389 * changing inh flag and relkind of partitioned tables. We want Postgres to
390 * treat partitioned tables as regular relations (i.e. we do not want to
391 * expand them to their partitions) since it breaks Citus planning in different
392 * ways. We let anything related to partitioning happen on the shards.
393 *
394 * Please note that, we want to avoid modifying query tree as much as possible
395 * because if PostgreSQL changes the way it uses modified fields, that may break
396 * our logic.
397 */
398 static void
AdjustPartitioningForDistributedPlanning(List * rangeTableList,bool setPartitionedTablesInherited)399 AdjustPartitioningForDistributedPlanning(List *rangeTableList,
400 bool setPartitionedTablesInherited)
401 {
402 ListCell *rangeTableCell = NULL;
403
404 foreach(rangeTableCell, rangeTableList)
405 {
406 RangeTblEntry *rangeTableEntry = (RangeTblEntry *) lfirst(rangeTableCell);
407
408 /*
409 * We want Postgres to behave partitioned tables as regular relations
410 * (i.e. we do not want to expand them to their partitions). To do this
411 * we set each partitioned table's inh flag to appropriate
412 * value before and after dropping to the standart_planner.
413 */
414 if (rangeTableEntry->rtekind == RTE_RELATION &&
415 PartitionedTable(rangeTableEntry->relid))
416 {
417 rangeTableEntry->inh = setPartitionedTablesInherited;
418
419 if (setPartitionedTablesInherited)
420 {
421 rangeTableEntry->relkind = RELKIND_PARTITIONED_TABLE;
422 }
423 else
424 {
425 rangeTableEntry->relkind = RELKIND_RELATION;
426 }
427 }
428 }
429 }
430
431
/*
 * AssignRTEIdentity assigns the given rteIdentifier to the given range table
 * entry.
 *
 * To be able to track RTEs through postgres' query planning, which copies and
 * duplicate, and modifies them, we sometimes need to figure out whether two
 * RTEs are copies of the same original RTE. For that we, hackishly, use a
 * field normally unused in RTE_RELATION RTEs: values_lists. Besides the
 * identifier we also stash the RTE's current inh flag, so it can be
 * recovered later via GetOriginalInh().
 *
 * The assigned identifier better be unique within a plantree.
 */
static void
AssignRTEIdentity(RangeTblEntry *rangeTableEntry, int rteIdentifier)
{
	Assert(rangeTableEntry->rtekind == RTE_RELATION);

	/* element 0: identity, element 1: original inh flag */
	rangeTableEntry->values_lists = list_make2_int(rteIdentifier, rangeTableEntry->inh);
}
450
451
/*
 * GetRTEIdentity returns the identity assigned with AssignRTEIdentity.
 * Must only be called on RTE_RELATION entries that were previously stamped;
 * the asserts document that contract.
 */
int
GetRTEIdentity(RangeTblEntry *rte)
{
	Assert(rte->rtekind == RTE_RELATION);
	Assert(rte->values_lists != NIL);
	Assert(IsA(rte->values_lists, IntList));
	Assert(list_length(rte->values_lists) == 2);

	/* first element of the stashed int list is the identity */
	return linitial_int(rte->values_lists);
}
463
464
/*
 * GetOriginalInh gets the original value of the inheritance flag set by
 * AssignRTEIdentity. The planner resets this flag in the rewritten query,
 * but we need it during deparsing. The caller is expected to pass an RTE
 * previously stamped by AssignRTEIdentity (second stashed element).
 */
bool
GetOriginalInh(RangeTblEntry *rte)
{
	return lsecond_int(rte->values_lists);
}
475
476
477 /*
478 * GetQueryLockMode returns the necessary lock mode to be acquired for the
479 * given query. (See comment written in RangeTblEntry->rellockmode)
480 */
481 LOCKMODE
GetQueryLockMode(Query * query)482 GetQueryLockMode(Query *query)
483 {
484 if (IsModifyCommand(query))
485 {
486 return RowExclusiveLock;
487 }
488 else if (query->hasForUpdate)
489 {
490 return RowShareLock;
491 }
492 else
493 {
494 return AccessShareLock;
495 }
496 }
497
498
499 /*
500 * IsModifyCommand returns true if the query performs modifications, false
501 * otherwise.
502 */
503 bool
IsModifyCommand(Query * query)504 IsModifyCommand(Query *query)
505 {
506 CmdType commandType = query->commandType;
507
508 if (commandType == CMD_INSERT || commandType == CMD_UPDATE ||
509 commandType == CMD_DELETE)
510 {
511 return true;
512 }
513
514 return false;
515 }
516
517
518 /*
519 * IsMultiTaskPlan returns true if job contains multiple tasks.
520 */
521 bool
IsMultiTaskPlan(DistributedPlan * distributedPlan)522 IsMultiTaskPlan(DistributedPlan *distributedPlan)
523 {
524 Job *workerJob = distributedPlan->workerJob;
525
526 if (workerJob != NULL && list_length(workerJob->taskList) > 1)
527 {
528 return true;
529 }
530
531 return false;
532 }
533
534
535 /*
536 * IsUpdateOrDelete returns true if the query performs an update or delete.
537 */
538 bool
IsUpdateOrDelete(Query * query)539 IsUpdateOrDelete(Query *query)
540 {
541 return query->commandType == CMD_UPDATE ||
542 query->commandType == CMD_DELETE;
543 }
544
545
546 /*
547 * PlanFastPathDistributedStmt creates a distributed planned statement using
548 * the FastPathPlanner.
549 */
550 static PlannedStmt *
PlanFastPathDistributedStmt(DistributedPlanningContext * planContext,Node * distributionKeyValue)551 PlanFastPathDistributedStmt(DistributedPlanningContext *planContext,
552 Node *distributionKeyValue)
553 {
554 FastPathRestrictionContext *fastPathContext =
555 planContext->plannerRestrictionContext->fastPathRestrictionContext;
556
557 planContext->plannerRestrictionContext->fastPathRestrictionContext->
558 fastPathRouterQuery = true;
559
560 if (distributionKeyValue == NULL)
561 {
562 /* nothing to record */
563 }
564 else if (IsA(distributionKeyValue, Const))
565 {
566 fastPathContext->distributionKeyValue = (Const *) distributionKeyValue;
567 }
568 else if (IsA(distributionKeyValue, Param))
569 {
570 fastPathContext->distributionKeyHasParam = true;
571 }
572
573 planContext->plan = FastPathPlanner(planContext->originalQuery, planContext->query,
574 planContext->boundParams);
575
576 return CreateDistributedPlannedStmt(planContext);
577 }
578
579
/*
 * PlanDistributedStmt creates a distributed planned statement using the PG
 * planner. It is the non-fast-path counterpart of
 * PlanFastPathDistributedStmt and runs after standard_planner has already
 * been called on planContext->query.
 */
static PlannedStmt *
PlanDistributedStmt(DistributedPlanningContext *planContext,
					int rteIdCounter)
{
	/* may've inlined new relation rtes */
	List *rangeTableList = ExtractRangeTableEntryList(planContext->query);
	rteIdCounter = AssignRTEIdentities(rangeTableList, rteIdCounter);


	PlannedStmt *result = CreateDistributedPlannedStmt(planContext);

	/*
	 * Restore the partitioned-table representation that distributed_planner
	 * flattened before calling standard_planner.
	 */
	bool setPartitionedTablesInherited = true;
	AdjustPartitioningForDistributedPlanning(rangeTableList,
											 setPartitionedTablesInherited);

	return result;
}
601
602
/*
 * DissuadePlannerFromUsingPlan try dissuade planner when planning a plan that
 * potentially failed due to unresolved prepared statement parameters. Inflating
 * the plan's cost makes postgres' prepared-statement machinery prefer a custom
 * (parameter-bound) plan over this generic one.
 */
void
DissuadePlannerFromUsingPlan(PlannedStmt *plan)
{
	/*
	 * Arbitrarily high cost, but low enough that it can be added up
	 * without overflowing by choose_custom_plan().
	 */
	plan->planTree->total_cost = FLT_MAX / 100000000;
}
616
617
/*
 * CreateDistributedPlannedStmt encapsulates the logic needed to transform a particular
 * query into a distributed plan that is encapsulated by a PlannedStmt. It first tries
 * planning with inlinable CTEs inlined, then falls back to planning the query as-is,
 * and finally combines the resulting distributed plan with the local plan.
 */
static PlannedStmt *
CreateDistributedPlannedStmt(DistributedPlanningContext *planContext)
{
	uint64 planId = NextPlanId++;
	bool hasUnresolvedParams = false;

	PlannedStmt *resultPlan = NULL;

	if (QueryTreeContainsInlinableCTE(planContext->originalQuery))
	{
		/*
		 * Inlining CTEs as subqueries in the query can avoid recursively
		 * planning some (or all) of the CTEs. In other words, the inlined
		 * CTEs could become part of query pushdown planning, which is much
		 * more efficient than recursively planning. So, first try distributed
		 * planning on the inlined CTEs in the query tree.
		 *
		 * We also should fallback to distributed planning with non-inlined CTEs
		 * if the distributed planning fails with inlined CTEs, because recursively
		 * planning CTEs can provide full SQL coverage, although it might be slow.
		 */
		resultPlan = InlineCtesAndCreateDistributedPlannedStmt(planId, planContext);
		if (resultPlan != NULL)
		{
			/* planning with inlined CTEs succeeded; use that plan */
			return resultPlan;
		}
	}

	if (HasUnresolvedExternParamsWalker((Node *) planContext->originalQuery,
										planContext->boundParams))
	{
		hasUnresolvedParams = true;
	}

	DistributedPlan *distributedPlan =
		CreateDistributedPlan(planId, planContext->originalQuery, planContext->query,
							  planContext->boundParams,
							  hasUnresolvedParams,
							  planContext->plannerRestrictionContext);

	/*
	 * If no plan was generated, prepare a generic error to be emitted.
	 * Normally this error message will never be returned to the user, as it's
	 * usually due to unresolved prepared statement parameters - in that case
	 * the logic below will force a custom plan (i.e. with parameters bound to
	 * specific values) to be generated. But sql (not plpgsql) functions
	 * unfortunately don't go through a codepath supporting custom plans - so
	 * we still need to have an error prepared.
	 */
	if (!distributedPlan)
	{
		/* currently always should have a more specific error otherwise */
		Assert(hasUnresolvedParams);
		distributedPlan = CitusMakeNode(DistributedPlan);
		distributedPlan->planningError =
			DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
						  "could not create distributed plan",
						  "Possibly this is caused by the use of parameters in SQL "
						  "functions, which is not supported in Citus.",
						  "Consider using PL/pgSQL functions instead.");
	}

	/*
	 * Error out if none of the planners resulted in a usable plan, unless the
	 * error was possibly triggered by missing parameters. In that case we'll
	 * not error out here, but instead rely on postgres' custom plan logic.
	 * Postgres re-plans prepared statements the first five executions
	 * (i.e. it produces custom plans), after that the cost of a generic plan
	 * is compared with the average custom plan cost. We support otherwise
	 * unsupported prepared statement parameters by assigning an exorbitant
	 * cost to the unsupported query. That'll lead to the custom plan being
	 * chosen. But for that to be possible we can't error out here, as
	 * otherwise that logic is never reached.
	 */
	if (distributedPlan->planningError && !hasUnresolvedParams)
	{
		RaiseDeferredError(distributedPlan->planningError, ERROR);
	}

	/* remember the plan's identifier for identifying subplans */
	distributedPlan->planId = planId;

	/* create final plan by combining local plan with distributed plan */
	resultPlan = FinalizePlan(planContext->plan, distributedPlan);

	/*
	 * As explained above, force planning costs to be unrealistically high if
	 * query planning failed (possibly) due to prepared statement parameters or
	 * if it is planned as a multi shard modify query.
	 */
	if ((distributedPlan->planningError ||
		 (IsUpdateOrDelete(planContext->originalQuery) && IsMultiTaskPlan(
			  distributedPlan))) &&
		hasUnresolvedParams)
	{
		DissuadePlannerFromUsingPlan(resultPlan);
	}

	return resultPlan;
}
722
723
/*
 * InlineCtesAndCreateDistributedPlannedStmt gets all the parameters required
 * for creating a distributed planned statement. The function is primarily a
 * wrapper on top of CreateDistributedPlannedStmt(), by first inlining the
 * CTEs and calling CreateDistributedPlannedStmt() in PG_TRY() block. The
 * function returns NULL if the planning fails on the query where eligible
 * CTEs are inlined.
 */
static PlannedStmt *
InlineCtesAndCreateDistributedPlannedStmt(uint64 planId,
										  DistributedPlanningContext *planContext)
{
	if (!EnableCTEInlining)
	{
		/*
		 * In Postgres 12+, users can adjust whether to inline/not inline CTEs
		 * by [NOT] MATERIALIZED keywords. However, in PG 11, that's not possible.
		 * So, with this we provide a way to prevent CTE inlining on Postgres 11.
		 *
		 * The main use-case for this is not to have divergent test outputs between
		 * PG 11 vs PG 12, so not very much intended for users.
		 */
		return NULL;
	}

	/*
	 * We'll inline the CTEs and try distributed planning, preserve the original
	 * query in case the planning fails and we fallback to recursive planning of
	 * CTEs. Note that only the copy is mutated below; planContext->originalQuery
	 * stays intact for the fallback path.
	 */
	Query *copyOfOriginalQuery = copyObject(planContext->originalQuery);

	RecursivelyInlineCtesInQueryTree(copyOfOriginalQuery);

	/* after inlining, we shouldn't have any inlinable CTEs */
	Assert(!QueryTreeContainsInlinableCTE(copyOfOriginalQuery));

	/* simply recurse into CreateDistributedPlannedStmt() in a PG_TRY() block */
	PlannedStmt *result = TryCreateDistributedPlannedStmt(planContext->plan,
														  copyOfOriginalQuery,
														  planContext->query,
														  planContext->boundParams,
														  planContext->
														  plannerRestrictionContext);

	return result;
}
771
772
/*
 * TryCreateDistributedPlannedStmt is a wrapper around CreateDistributedPlannedStmt, simply
 * calling it in PG_TRY()/PG_CATCH() block. The function returns a PlannedStmt if the input
 * query can be planned by Citus. If not, the function returns NULL and generates a DEBUG4
 * message with the reason for the failure.
 */
static PlannedStmt *
TryCreateDistributedPlannedStmt(PlannedStmt *localPlan,
								Query *originalQuery,
								Query *query, ParamListInfo boundParams,
								PlannerRestrictionContext *plannerRestrictionContext)
{
	/* remember the caller's memory context so the catch block can restore it */
	MemoryContext savedContext = CurrentMemoryContext;
	PlannedStmt *result = NULL;

	DistributedPlanningContext *planContext = palloc0(sizeof(DistributedPlanningContext));

	planContext->plan = localPlan;
	planContext->boundParams = boundParams;
	planContext->originalQuery = originalQuery;
	planContext->query = query;
	planContext->plannerRestrictionContext = plannerRestrictionContext;


	PG_TRY();
	{
		result = CreateDistributedPlannedStmt(planContext);
	}
	PG_CATCH();
	{
		/* restore the saved context before CopyErrorData(), per the elog.h idiom */
		MemoryContextSwitchTo(savedContext);
		ErrorData *edata = CopyErrorData();
		FlushErrorState();

		/* don't try to intercept PANIC or FATAL, let those breeze past us */
		if (edata->elevel != ERROR)
		{
			PG_RE_THROW();
		}

		ereport(DEBUG4, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
						 errmsg("Planning after CTEs inlined failed with "
								"\nmessage: %s\ndetail: %s\nhint: %s",
								edata->message ? edata->message : "",
								edata->detail ? edata->detail : "",
								edata->hint ? edata->hint : "")));

		/* leave the error handling system */
		FreeErrorData(edata);

		/* NULL signals the caller to fall back to non-inlined CTE planning */
		result = NULL;
	}
	PG_END_TRY();

	return result;
}
829
830
831 /*
832 * CreateDistributedPlan generates a distributed plan for a query.
833 * It goes through 3 steps:
834 *
835 * 1. Try router planner
836 * 2. Generate subplans for CTEs and complex subqueries
837 * - If any, go back to step 1 by calling itself recursively
838 * 3. Logical planner
839 */
840 DistributedPlan *
CreateDistributedPlan(uint64 planId,Query * originalQuery,Query * query,ParamListInfo boundParams,bool hasUnresolvedParams,PlannerRestrictionContext * plannerRestrictionContext)841 CreateDistributedPlan(uint64 planId, Query *originalQuery, Query *query, ParamListInfo
842 boundParams, bool hasUnresolvedParams,
843 PlannerRestrictionContext *plannerRestrictionContext)
844 {
845 DistributedPlan *distributedPlan = NULL;
846 bool hasCtes = originalQuery->cteList != NIL;
847
848 if (IsModifyCommand(originalQuery))
849 {
850 EnsureModificationsCanRun();
851
852 Oid targetRelationId = ModifyQueryResultRelationId(query);
853 EnsurePartitionTableNotReplicated(targetRelationId);
854
855 if (InsertSelectIntoCitusTable(originalQuery))
856 {
857 if (hasUnresolvedParams)
858 {
859 /*
860 * Unresolved parameters can cause performance regressions in
861 * INSERT...SELECT when the partition column is a parameter
862 * because we don't perform any additional pruning in the executor.
863 */
864 return NULL;
865 }
866
867 distributedPlan =
868 CreateInsertSelectPlan(planId, originalQuery, plannerRestrictionContext,
869 boundParams);
870 }
871 else if (InsertSelectIntoLocalTable(originalQuery))
872 {
873 if (hasUnresolvedParams)
874 {
875 /*
876 * Unresolved parameters can cause performance regressions in
877 * INSERT...SELECT when the partition column is a parameter
878 * because we don't perform any additional pruning in the executor.
879 */
880 return NULL;
881 }
882 distributedPlan =
883 CreateInsertSelectIntoLocalTablePlan(planId, originalQuery, boundParams,
884 hasUnresolvedParams,
885 plannerRestrictionContext);
886 }
887 else
888 {
889 /* modifications are always routed through the same planner/executor */
890 distributedPlan =
891 CreateModifyPlan(originalQuery, query, plannerRestrictionContext);
892 }
893
894 /* the functions above always return a plan, possibly with an error */
895 Assert(distributedPlan);
896
897 if (distributedPlan->planningError == NULL)
898 {
899 return distributedPlan;
900 }
901 else
902 {
903 RaiseDeferredError(distributedPlan->planningError, DEBUG2);
904 }
905 }
906 else
907 {
908 /*
909 * For select queries we, if router executor is enabled, first try to
910 * plan the query as a router query. If not supported, otherwise try
911 * the full blown plan/optimize/physical planning process needed to
912 * produce distributed query plans.
913 */
914
915 distributedPlan = CreateRouterPlan(originalQuery, query,
916 plannerRestrictionContext);
917 if (distributedPlan->planningError == NULL)
918 {
919 return distributedPlan;
920 }
921 else
922 {
923 /*
924 * For debugging it's useful to display why query was not
925 * router plannable.
926 */
927 RaiseDeferredError(distributedPlan->planningError, DEBUG2);
928 }
929 }
930
931 if (hasUnresolvedParams)
932 {
933 /*
934 * There are parameters that don't have a value in boundParams.
935 *
936 * The remainder of the planning logic cannot handle unbound
937 * parameters. We return a NULL plan, which will have an
938 * extremely high cost, such that postgres will replan with
939 * bound parameters.
940 */
941 return NULL;
942 }
943
944 /* force evaluation of bound params */
945 boundParams = copyParamList(boundParams);
946
947 /*
948 * If there are parameters that do have a value in boundParams, replace
949 * them in the original query. This allows us to more easily cut the
950 * query into pieces (during recursive planning) or deparse parts of
951 * the query (during subquery pushdown planning).
952 */
953 originalQuery = (Query *) ResolveExternalParams((Node *) originalQuery,
954 boundParams);
955 Assert(originalQuery != NULL);
956
957 /*
958 * Plan subqueries and CTEs that cannot be pushed down by recursively
959 * calling the planner and return the resulting plans to subPlanList.
960 */
961 List *subPlanList = GenerateSubplansForSubqueriesAndCTEs(planId, originalQuery,
962 plannerRestrictionContext);
963
964 /*
965 * If subqueries were recursively planned then we need to replan the query
966 * to get the new planner restriction context and apply planner transformations.
967 *
968 * We could simplify this code if the logical planner was capable of dealing
969 * with an original query. In that case, we would only have to filter the
970 * planner restriction context.
971 *
972 * Note that we check both for subplans and whether the query had CTEs
973 * prior to calling GenerateSubplansForSubqueriesAndCTEs. If none of
974 * the CTEs are referenced then there are no subplans, but we still want
975 * to retry the router planner.
976 */
977 if (list_length(subPlanList) > 0 || hasCtes)
978 {
979 Query *newQuery = copyObject(originalQuery);
980 bool setPartitionedTablesInherited = false;
981 PlannerRestrictionContext *currentPlannerRestrictionContext =
982 CurrentPlannerRestrictionContext();
983
984 /* reset the current planner restrictions context */
985 ResetPlannerRestrictionContext(currentPlannerRestrictionContext);
986
987 /*
988 * We force standard_planner to treat partitioned tables as regular tables
989 * by clearing the inh flag on RTEs. We already did this at the start of
990 * distributed_planner, but on a copy of the original query, so we need
991 * to do it again here.
992 */
993 AdjustPartitioningForDistributedPlanning(ExtractRangeTableEntryList(newQuery),
994 setPartitionedTablesInherited);
995
996 /*
997 * Some relations may have been removed from the query, but we can skip
998 * AssignRTEIdentities since we currently do not rely on RTE identities
999 * being contiguous.
1000 */
1001
1002 standard_planner_compat(newQuery, 0, boundParams);
1003
1004 /* overwrite the old transformed query with the new transformed query */
1005 *query = *newQuery;
1006
1007 /* recurse into CreateDistributedPlan with subqueries/CTEs replaced */
1008 distributedPlan = CreateDistributedPlan(planId, originalQuery, query, NULL, false,
1009 plannerRestrictionContext);
1010
1011 /* distributedPlan cannot be null since hasUnresolvedParams argument was false */
1012 Assert(distributedPlan != NULL);
1013 distributedPlan->subPlanList = subPlanList;
1014
1015 return distributedPlan;
1016 }
1017
1018 /*
1019 * DML command returns a planning error, even after recursive planning. The
1020 * logical planner cannot handle DML commands so return the plan with the
1021 * error.
1022 */
1023 if (IsModifyCommand(originalQuery))
1024 {
1025 return distributedPlan;
1026 }
1027
1028 /*
1029 * CTEs are stripped from the original query by RecursivelyPlanSubqueriesAndCTEs.
1030 * If we get here and there are still CTEs that means that none of the CTEs are
1031 * referenced. We therefore also strip the CTEs from the rewritten query.
1032 */
1033 query->cteList = NIL;
1034 Assert(originalQuery->cteList == NIL);
1035
1036 MultiTreeRoot *logicalPlan = MultiLogicalPlanCreate(originalQuery, query,
1037 plannerRestrictionContext);
1038 MultiLogicalPlanOptimize(logicalPlan);
1039
1040 /*
1041 * This check is here to make it likely that all node types used in
1042 * Citus are dumpable. Explain can dump logical and physical plans
1043 * using the extended outfuncs infrastructure, but it's infeasible to
1044 * test most plans. MultiQueryContainerNode always serializes the
1045 * physical plan, so there's no need to check that separately
1046 */
1047 CheckNodeIsDumpable((Node *) logicalPlan);
1048
1049 /* Create the physical plan */
1050 distributedPlan = CreatePhysicalDistributedPlan(logicalPlan,
1051 plannerRestrictionContext);
1052
1053 /* distributed plan currently should always succeed or error out */
1054 Assert(distributedPlan && distributedPlan->planningError == NULL);
1055
1056 return distributedPlan;
1057 }
1058
1059
1060 /*
1061 * EnsurePartitionTableNotReplicated errors out if the infput relation is
1062 * a partition table and the table has a replication factor greater than
1063 * one.
1064 *
1065 * If the table is not a partition or replication factor is 1, the function
1066 * becomes a no-op.
1067 */
1068 void
EnsurePartitionTableNotReplicated(Oid relationId)1069 EnsurePartitionTableNotReplicated(Oid relationId)
1070 {
1071 DeferredErrorMessage *deferredError =
1072 DeferErrorIfPartitionTableNotSingleReplicated(relationId);
1073 if (deferredError != NULL)
1074 {
1075 RaiseDeferredError(deferredError, ERROR);
1076 }
1077 }
1078
1079
1080 /*
1081 * DeferErrorIfPartitionTableNotSingleReplicated defers error if the input relation
1082 * is a partition table with replication factor > 1. Otherwise, the function returns
1083 * NULL.
1084 */
1085 static DeferredErrorMessage *
DeferErrorIfPartitionTableNotSingleReplicated(Oid relationId)1086 DeferErrorIfPartitionTableNotSingleReplicated(Oid relationId)
1087 {
1088 if (PartitionTableNoLock(relationId) && !SingleReplicatedTable(relationId))
1089 {
1090 Oid parentOid = PartitionParentOid(relationId);
1091 char *parentRelationTest = get_rel_name(parentOid);
1092 StringInfo errorHint = makeStringInfo();
1093
1094 appendStringInfo(errorHint, "Run the query on the parent table "
1095 "\"%s\" instead.", parentRelationTest);
1096
1097 return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
1098 "modifications on partitions when replication "
1099 "factor is greater than 1 is not supported",
1100 NULL, errorHint->data);
1101 }
1102
1103 return NULL;
1104 }
1105
1106
1107 /*
1108 * ResolveExternalParams replaces the external parameters that appears
1109 * in the query with the corresponding entries in the boundParams.
1110 *
1111 * Note that this function is inspired by eval_const_expr() on Postgres.
1112 * We cannot use that function because it requires access to PlannerInfo.
1113 */
1114 Node *
ResolveExternalParams(Node * inputNode,ParamListInfo boundParams)1115 ResolveExternalParams(Node *inputNode, ParamListInfo boundParams)
1116 {
1117 /* consider resolving external parameters only when boundParams exists */
1118 if (!boundParams)
1119 {
1120 return inputNode;
1121 }
1122
1123 if (inputNode == NULL)
1124 {
1125 return NULL;
1126 }
1127
1128 if (IsA(inputNode, Param))
1129 {
1130 Param *paramToProcess = (Param *) inputNode;
1131 int numberOfParameters = boundParams->numParams;
1132 int parameterId = paramToProcess->paramid;
1133 int16 typeLength = 0;
1134 bool typeByValue = false;
1135 Datum constValue = 0;
1136
1137 if (paramToProcess->paramkind != PARAM_EXTERN)
1138 {
1139 return inputNode;
1140 }
1141
1142 if (parameterId < 0)
1143 {
1144 return inputNode;
1145 }
1146
1147 /* parameterId starts from 1 */
1148 int parameterIndex = parameterId - 1;
1149 if (parameterIndex >= numberOfParameters)
1150 {
1151 return inputNode;
1152 }
1153
1154 ParamExternData *correspondingParameterData =
1155 &boundParams->params[parameterIndex];
1156
1157 if (!(correspondingParameterData->pflags & PARAM_FLAG_CONST))
1158 {
1159 return inputNode;
1160 }
1161
1162 get_typlenbyval(paramToProcess->paramtype, &typeLength, &typeByValue);
1163
1164 bool paramIsNull = correspondingParameterData->isnull;
1165 if (paramIsNull)
1166 {
1167 constValue = 0;
1168 }
1169 else if (typeByValue)
1170 {
1171 constValue = correspondingParameterData->value;
1172 }
1173 else
1174 {
1175 /*
1176 * Out of paranoia ensure that datum lives long enough,
1177 * although bind params currently should always live
1178 * long enough.
1179 */
1180 constValue = datumCopy(correspondingParameterData->value, typeByValue,
1181 typeLength);
1182 }
1183
1184 return (Node *) makeConst(paramToProcess->paramtype, paramToProcess->paramtypmod,
1185 paramToProcess->paramcollid, typeLength, constValue,
1186 paramIsNull, typeByValue);
1187 }
1188 else if (IsA(inputNode, Query))
1189 {
1190 return (Node *) query_tree_mutator((Query *) inputNode, ResolveExternalParams,
1191 boundParams, 0);
1192 }
1193
1194 return expression_tree_mutator(inputNode, ResolveExternalParams, boundParams);
1195 }
1196
1197
1198 /*
1199 * GetDistributedPlan returns the associated DistributedPlan for a CustomScan.
1200 *
1201 * Callers should only read from the returned data structure, since it may be
1202 * the plan of a prepared statement and may therefore be reused.
1203 */
1204 DistributedPlan *
GetDistributedPlan(CustomScan * customScan)1205 GetDistributedPlan(CustomScan *customScan)
1206 {
1207 Assert(list_length(customScan->custom_private) == 1);
1208
1209 Node *node = (Node *) linitial(customScan->custom_private);
1210 Assert(CitusIsA(node, DistributedPlan));
1211
1212 CheckNodeCopyAndSerialization(node);
1213
1214 DistributedPlan *distributedPlan = (DistributedPlan *) node;
1215
1216 return distributedPlan;
1217 }
1218
1219
1220 /*
1221 * FinalizePlan combines local plan with distributed plan and creates a plan
1222 * which can be run by the PostgreSQL executor.
1223 */
1224 PlannedStmt *
FinalizePlan(PlannedStmt * localPlan,DistributedPlan * distributedPlan)1225 FinalizePlan(PlannedStmt *localPlan, DistributedPlan *distributedPlan)
1226 {
1227 PlannedStmt *finalPlan = NULL;
1228 CustomScan *customScan = makeNode(CustomScan);
1229 MultiExecutorType executorType = MULTI_EXECUTOR_INVALID_FIRST;
1230
1231 /* this field is used in JobExecutorType */
1232 distributedPlan->relationIdList = localPlan->relationOids;
1233
1234 if (!distributedPlan->planningError)
1235 {
1236 executorType = JobExecutorType(distributedPlan);
1237 }
1238
1239 switch (executorType)
1240 {
1241 case MULTI_EXECUTOR_ADAPTIVE:
1242 {
1243 customScan->methods = &AdaptiveExecutorCustomScanMethods;
1244 break;
1245 }
1246
1247 case MULTI_EXECUTOR_NON_PUSHABLE_INSERT_SELECT:
1248 {
1249 customScan->methods = &NonPushableInsertSelectCustomScanMethods;
1250 break;
1251 }
1252
1253 default:
1254 {
1255 customScan->methods = &DelayedErrorCustomScanMethods;
1256 break;
1257 }
1258 }
1259
1260 if (IsMultiTaskPlan(distributedPlan))
1261 {
1262 /* if it is not a single task executable plan, inform user according to the log level */
1263 if (MultiTaskQueryLogLevel != CITUS_LOG_LEVEL_OFF)
1264 {
1265 ereport(MultiTaskQueryLogLevel, (errmsg(
1266 "multi-task query about to be executed"),
1267 errhint(
1268 "Queries are split to multiple tasks "
1269 "if they have to be split into several"
1270 " queries on the workers.")));
1271 }
1272 }
1273
1274 distributedPlan->queryId = localPlan->queryId;
1275
1276 Node *distributedPlanData = (Node *) distributedPlan;
1277
1278 customScan->custom_private = list_make1(distributedPlanData);
1279 customScan->flags = CUSTOMPATH_SUPPORT_BACKWARD_SCAN;
1280
1281 /*
1282 * Fast path queries cannot have any subplans by definition, so skip
1283 * expensive traversals.
1284 */
1285 if (!distributedPlan->fastPathRouterPlan)
1286 {
1287 /*
1288 * Record subplans used by distributed plan to make intermediate result
1289 * pruning easier.
1290 *
1291 * We do this before finalizing the plan, because the combineQuery is
1292 * rewritten by standard_planner in FinalizeNonRouterPlan.
1293 */
1294 distributedPlan->usedSubPlanNodeList = FindSubPlanUsages(distributedPlan);
1295 }
1296
1297 if (distributedPlan->combineQuery)
1298 {
1299 finalPlan = FinalizeNonRouterPlan(localPlan, distributedPlan, customScan);
1300 }
1301 else
1302 {
1303 finalPlan = FinalizeRouterPlan(localPlan, customScan);
1304 }
1305
1306 return finalPlan;
1307 }
1308
1309
1310 /*
1311 * FinalizeNonRouterPlan gets the distributed custom scan plan, and creates the
1312 * final master select plan on the top of this distributed plan for adaptive executor.
1313 */
1314 static PlannedStmt *
FinalizeNonRouterPlan(PlannedStmt * localPlan,DistributedPlan * distributedPlan,CustomScan * customScan)1315 FinalizeNonRouterPlan(PlannedStmt *localPlan, DistributedPlan *distributedPlan,
1316 CustomScan *customScan)
1317 {
1318 PlannedStmt *finalPlan = PlanCombineQuery(distributedPlan, customScan);
1319 finalPlan->queryId = localPlan->queryId;
1320 finalPlan->utilityStmt = localPlan->utilityStmt;
1321
1322 /* add original range table list for access permission checks */
1323 finalPlan->rtable = list_concat(finalPlan->rtable, localPlan->rtable);
1324
1325 return finalPlan;
1326 }
1327
1328
1329 /*
1330 * FinalizeRouterPlan gets a CustomScan node which already wrapped distributed
1331 * part of a router plan and sets it as the direct child of the router plan
1332 * because we don't run any query on master node for router executable queries.
1333 * Here, we also rebuild the column list to read from the remote scan.
1334 */
1335 static PlannedStmt *
FinalizeRouterPlan(PlannedStmt * localPlan,CustomScan * customScan)1336 FinalizeRouterPlan(PlannedStmt *localPlan, CustomScan *customScan)
1337 {
1338 List *columnNameList = NIL;
1339
1340 customScan->custom_scan_tlist =
1341 makeCustomScanTargetlistFromExistingTargetList(localPlan->planTree->targetlist);
1342 customScan->scan.plan.targetlist =
1343 makeTargetListFromCustomScanList(customScan->custom_scan_tlist);
1344
1345 /* extract the column names from the final targetlist*/
1346 TargetEntry *targetEntry = NULL;
1347 foreach_ptr(targetEntry, customScan->scan.plan.targetlist)
1348 {
1349 Value *columnName = makeString(targetEntry->resname);
1350 columnNameList = lappend(columnNameList, columnName);
1351 }
1352
1353 PlannedStmt *routerPlan = makeNode(PlannedStmt);
1354 routerPlan->planTree = (Plan *) customScan;
1355
1356 RangeTblEntry *remoteScanRangeTableEntry = RemoteScanRangeTableEntry(columnNameList);
1357 routerPlan->rtable = list_make1(remoteScanRangeTableEntry);
1358
1359 /* add original range table list for access permission checks */
1360 routerPlan->rtable = list_concat(routerPlan->rtable, localPlan->rtable);
1361
1362 routerPlan->canSetTag = true;
1363 routerPlan->relationOids = NIL;
1364
1365 routerPlan->queryId = localPlan->queryId;
1366 routerPlan->utilityStmt = localPlan->utilityStmt;
1367 routerPlan->commandType = localPlan->commandType;
1368 routerPlan->hasReturning = localPlan->hasReturning;
1369
1370 return routerPlan;
1371 }
1372
1373
1374 /*
1375 * makeCustomScanTargetlistFromExistingTargetList rebuilds the targetlist from the remote
1376 * query into a list that can be used as the custom_scan_tlist for our Citus Custom Scan.
1377 */
1378 static List *
makeCustomScanTargetlistFromExistingTargetList(List * existingTargetlist)1379 makeCustomScanTargetlistFromExistingTargetList(List *existingTargetlist)
1380 {
1381 List *custom_scan_tlist = NIL;
1382
1383 /* we will have custom scan range table entry as the first one in the list */
1384 const int customScanRangeTableIndex = 1;
1385
1386 /* build a targetlist to read from the custom scan output */
1387 TargetEntry *targetEntry = NULL;
1388 foreach_ptr(targetEntry, existingTargetlist)
1389 {
1390 Assert(IsA(targetEntry, TargetEntry));
1391
1392 /*
1393 * This is unlikely to be hit because we would not need resjunk stuff
1394 * at the toplevel of a router query - all things needing it have been
1395 * pushed down.
1396 */
1397 if (targetEntry->resjunk)
1398 {
1399 continue;
1400 }
1401
1402 /* build target entry pointing to remote scan range table entry */
1403 Var *newVar = makeVarFromTargetEntry(customScanRangeTableIndex, targetEntry);
1404
1405 if (newVar->vartype == RECORDOID || newVar->vartype == RECORDARRAYOID)
1406 {
1407 /*
1408 * Add the anonymous composite type to the type cache and store
1409 * the key in vartypmod. Eventually this makes its way into the
1410 * TupleDesc used by the executor, which uses it to parse the
1411 * query results from the workers in BuildTupleFromCStrings.
1412 */
1413 newVar->vartypmod = BlessRecordExpression(targetEntry->expr);
1414 }
1415
1416 TargetEntry *newTargetEntry = flatCopyTargetEntry(targetEntry);
1417 newTargetEntry->expr = (Expr *) newVar;
1418 custom_scan_tlist = lappend(custom_scan_tlist, newTargetEntry);
1419 }
1420
1421 return custom_scan_tlist;
1422 }
1423
1424
1425 /*
1426 * makeTargetListFromCustomScanList based on a custom_scan_tlist create the target list to
1427 * use on the Citus Custom Scan Node. The targetlist differs from the custom_scan_tlist in
1428 * a way that the expressions in the targetlist all are references to the index (resno) in
1429 * the custom_scan_tlist in their varattno while the varno is replaced with INDEX_VAR
1430 * instead of the range table entry index.
1431 */
1432 static List *
makeTargetListFromCustomScanList(List * custom_scan_tlist)1433 makeTargetListFromCustomScanList(List *custom_scan_tlist)
1434 {
1435 List *targetList = NIL;
1436 TargetEntry *targetEntry = NULL;
1437 int resno = 1;
1438 foreach_ptr(targetEntry, custom_scan_tlist)
1439 {
1440 /*
1441 * INDEX_VAR is used to reference back to the TargetEntry in custom_scan_tlist by
1442 * its resno (index)
1443 */
1444 Var *newVar = makeVarFromTargetEntry(INDEX_VAR, targetEntry);
1445 TargetEntry *newTargetEntry = makeTargetEntry((Expr *) newVar, resno,
1446 targetEntry->resname,
1447 targetEntry->resjunk);
1448 targetList = lappend(targetList, newTargetEntry);
1449 resno++;
1450 }
1451 return targetList;
1452 }
1453
1454
1455 /*
1456 * BlessRecordExpression ensures we can parse an anonymous composite type on the
1457 * target list of a query that is sent to the worker.
1458 *
1459 * We cannot normally parse record types coming from the workers unless we
1460 * "bless" the tuple descriptor, which adds a transient type to the type cache
1461 * and assigns it a type mod value, which is the key in the type cache.
1462 */
1463 int32
BlessRecordExpression(Expr * expr)1464 BlessRecordExpression(Expr *expr)
1465 {
1466 int32 typeMod = -1;
1467
1468 if (IsA(expr, FuncExpr) || IsA(expr, OpExpr))
1469 {
1470 /*
1471 * Handle functions that return records on the target
1472 * list, e.g. SELECT function_call(1,2);
1473 */
1474 Oid resultTypeId = InvalidOid;
1475 TupleDesc resultTupleDesc = NULL;
1476
1477 /* get_expr_result_type blesses the tuple descriptor */
1478 TypeFuncClass typeClass = get_expr_result_type((Node *) expr, &resultTypeId,
1479 &resultTupleDesc);
1480
1481 if (typeClass == TYPEFUNC_COMPOSITE)
1482 {
1483 typeMod = resultTupleDesc->tdtypmod;
1484 }
1485 }
1486 else if (IsA(expr, RowExpr))
1487 {
1488 /*
1489 * Handle row expressions, e.g. SELECT (1,2);
1490 */
1491 RowExpr *rowExpr = (RowExpr *) expr;
1492 ListCell *argCell = NULL;
1493 int currentResno = 1;
1494
1495 TupleDesc rowTupleDesc = CreateTemplateTupleDesc(list_length(rowExpr->args));
1496
1497 foreach(argCell, rowExpr->args)
1498 {
1499 Node *rowArg = (Node *) lfirst(argCell);
1500 Oid rowArgTypeId = exprType(rowArg);
1501 int rowArgTypeMod = exprTypmod(rowArg);
1502
1503 if (rowArgTypeId == RECORDOID || rowArgTypeId == RECORDARRAYOID)
1504 {
1505 /* ensure nested rows are blessed as well */
1506 rowArgTypeMod = BlessRecordExpression((Expr *) rowArg);
1507 }
1508
1509 TupleDescInitEntry(rowTupleDesc, currentResno, NULL,
1510 rowArgTypeId, rowArgTypeMod, 0);
1511 TupleDescInitEntryCollation(rowTupleDesc, currentResno,
1512 exprCollation(rowArg));
1513
1514 currentResno++;
1515 }
1516
1517 BlessTupleDesc(rowTupleDesc);
1518
1519 typeMod = rowTupleDesc->tdtypmod;
1520 }
1521 else if (IsA(expr, ArrayExpr))
1522 {
1523 /*
1524 * Handle row array expressions, e.g. SELECT ARRAY[(1,2)];
1525 * Postgres allows ARRAY[(1,2),(1,2,3)]. We do not.
1526 */
1527 ArrayExpr *arrayExpr = (ArrayExpr *) expr;
1528
1529 typeMod = BlessRecordExpressionList(arrayExpr->elements);
1530 }
1531 else if (IsA(expr, NullIfExpr))
1532 {
1533 NullIfExpr *nullIfExpr = (NullIfExpr *) expr;
1534
1535 typeMod = BlessRecordExpressionList(nullIfExpr->args);
1536 }
1537 else if (IsA(expr, MinMaxExpr))
1538 {
1539 MinMaxExpr *minMaxExpr = (MinMaxExpr *) expr;
1540
1541 typeMod = BlessRecordExpressionList(minMaxExpr->args);
1542 }
1543 else if (IsA(expr, CoalesceExpr))
1544 {
1545 CoalesceExpr *coalesceExpr = (CoalesceExpr *) expr;
1546
1547 typeMod = BlessRecordExpressionList(coalesceExpr->args);
1548 }
1549 else if (IsA(expr, CaseExpr))
1550 {
1551 CaseExpr *caseExpr = (CaseExpr *) expr;
1552 List *results = NIL;
1553 ListCell *whenCell = NULL;
1554
1555 foreach(whenCell, caseExpr->args)
1556 {
1557 CaseWhen *whenArg = (CaseWhen *) lfirst(whenCell);
1558
1559 results = lappend(results, whenArg->result);
1560 }
1561
1562 if (caseExpr->defresult != NULL)
1563 {
1564 results = lappend(results, caseExpr->defresult);
1565 }
1566
1567 typeMod = BlessRecordExpressionList(results);
1568 }
1569
1570 return typeMod;
1571 }
1572
1573
1574 /*
1575 * BlessRecordExpressionList maps BlessRecordExpression over a list.
1576 * Returns typmod of all expressions, or -1 if they are not all the same.
1577 * Ignores expressions with a typmod of -1.
1578 */
1579 static int32
BlessRecordExpressionList(List * exprs)1580 BlessRecordExpressionList(List *exprs)
1581 {
1582 int32 finalTypeMod = -1;
1583 ListCell *exprCell = NULL;
1584 foreach(exprCell, exprs)
1585 {
1586 Node *exprArg = (Node *) lfirst(exprCell);
1587 int32 exprTypeMod = BlessRecordExpression((Expr *) exprArg);
1588
1589 if (exprTypeMod == -1)
1590 {
1591 continue;
1592 }
1593 else if (finalTypeMod == -1)
1594 {
1595 finalTypeMod = exprTypeMod;
1596 }
1597 else if (finalTypeMod != exprTypeMod)
1598 {
1599 return -1;
1600 }
1601 }
1602 return finalTypeMod;
1603 }
1604
1605
1606 /*
1607 * RemoteScanRangeTableEntry creates a range table entry from given column name
1608 * list to represent a remote scan.
1609 */
1610 RangeTblEntry *
RemoteScanRangeTableEntry(List * columnNameList)1611 RemoteScanRangeTableEntry(List *columnNameList)
1612 {
1613 RangeTblEntry *remoteScanRangeTableEntry = makeNode(RangeTblEntry);
1614
1615 /* we use RTE_VALUES for custom scan because we can't look up relation */
1616 remoteScanRangeTableEntry->rtekind = RTE_VALUES;
1617 remoteScanRangeTableEntry->eref = makeAlias("remote_scan", columnNameList);
1618 remoteScanRangeTableEntry->inh = false;
1619 remoteScanRangeTableEntry->inFromCl = true;
1620
1621 return remoteScanRangeTableEntry;
1622 }
1623
1624
1625 /*
1626 * CheckNodeIsDumpable checks that the passed node can be dumped using
1627 * nodeToString(). As this checks is expensive, it's only active when
1628 * assertions are enabled.
1629 */
1630 static void
CheckNodeIsDumpable(Node * node)1631 CheckNodeIsDumpable(Node *node)
1632 {
1633 #ifdef USE_ASSERT_CHECKING
1634 char *out = nodeToString(node);
1635 pfree(out);
1636 #endif
1637 }
1638
1639
1640 /*
1641 * CheckNodeCopyAndSerialization checks copy/dump/read functions
1642 * for nodes and returns copy of the input.
1643 *
1644 * It is only active when assertions are enabled, otherwise it returns
1645 * the input directly. We use this to confirm that our serialization
1646 * and copy logic produces the correct plan during regression tests.
1647 *
1648 * It does not check string equality on node dumps due to differences
1649 * in some Postgres types.
1650 */
1651 static Node *
CheckNodeCopyAndSerialization(Node * node)1652 CheckNodeCopyAndSerialization(Node *node)
1653 {
1654 #ifdef USE_ASSERT_CHECKING
1655 char *out = nodeToString(node);
1656 Node *nodeCopy = copyObject(node);
1657 char *outCopy = nodeToString(nodeCopy);
1658
1659 pfree(out);
1660 pfree(outCopy);
1661
1662 return nodeCopy;
1663 #else
1664 return node;
1665 #endif
1666 }
1667
1668
1669 /*
1670 * multi_join_restriction_hook is a hook called by postgresql standard planner
1671 * to notify us about various planning information regarding joins. We use
1672 * it to learn about the joining column.
1673 */
1674 void
multi_join_restriction_hook(PlannerInfo * root,RelOptInfo * joinrel,RelOptInfo * outerrel,RelOptInfo * innerrel,JoinType jointype,JoinPathExtraData * extra)1675 multi_join_restriction_hook(PlannerInfo *root,
1676 RelOptInfo *joinrel,
1677 RelOptInfo *outerrel,
1678 RelOptInfo *innerrel,
1679 JoinType jointype,
1680 JoinPathExtraData *extra)
1681 {
1682 if (bms_is_empty(innerrel->relids) || bms_is_empty(outerrel->relids))
1683 {
1684 /*
1685 * We do not expect empty relids. Still, ignoring such JoinRestriction is
1686 * preferable for two reasons:
1687 * 1. This might be a query that doesn't rely on JoinRestrictions at all (e.g.,
1688 * local query).
1689 * 2. We cannot process them when they are empty (and likely to segfault if
1690 * we allow as-is).
1691 */
1692 ereport(DEBUG1, (errmsg("Join restriction information is NULL")));
1693 }
1694
1695 /*
1696 * Use a memory context that's guaranteed to live long enough, could be
1697 * called in a more shortly lived one (e.g. with GEQO).
1698 */
1699 PlannerRestrictionContext *plannerRestrictionContext =
1700 CurrentPlannerRestrictionContext();
1701 MemoryContext restrictionsMemoryContext = plannerRestrictionContext->memoryContext;
1702 MemoryContext oldMemoryContext = MemoryContextSwitchTo(restrictionsMemoryContext);
1703
1704 JoinRestrictionContext *joinRestrictionContext =
1705 plannerRestrictionContext->joinRestrictionContext;
1706 Assert(joinRestrictionContext != NULL);
1707
1708 JoinRestriction *joinRestriction = palloc0(sizeof(JoinRestriction));
1709 joinRestriction->joinType = jointype;
1710 joinRestriction->plannerInfo = root;
1711
1712 /*
1713 * We create a copy of restrictInfoList and relids because with geqo they may
1714 * be created in a memory context which will be deleted when we still need it,
1715 * thus we create a copy of it in our memory context.
1716 */
1717 joinRestriction->joinRestrictInfoList = copyObject(extra->restrictlist);
1718 joinRestriction->innerrelRelids = bms_copy(innerrel->relids);
1719 joinRestriction->outerrelRelids = bms_copy(outerrel->relids);
1720
1721 joinRestrictionContext->joinRestrictionList =
1722 lappend(joinRestrictionContext->joinRestrictionList, joinRestriction);
1723
1724 /*
1725 * Keep track if we received any semi joins here. If we didn't we can
1726 * later safely convert any semi joins in the rewritten query to inner
1727 * joins.
1728 */
1729 joinRestrictionContext->hasSemiJoin = joinRestrictionContext->hasSemiJoin ||
1730 extra->sjinfo->jointype == JOIN_SEMI;
1731
1732 MemoryContextSwitchTo(oldMemoryContext);
1733 }
1734
1735
1736 /*
1737 * multi_relation_restriction_hook is a hook called by postgresql standard planner
1738 * to notify us about various planning information regarding a relation. We use
1739 * it to retrieve restrictions on relations.
1740 */
1741 void
multi_relation_restriction_hook(PlannerInfo * root,RelOptInfo * relOptInfo,Index restrictionIndex,RangeTblEntry * rte)1742 multi_relation_restriction_hook(PlannerInfo *root, RelOptInfo *relOptInfo,
1743 Index restrictionIndex, RangeTblEntry *rte)
1744 {
1745 CitusTableCacheEntry *cacheEntry = NULL;
1746
1747 if (ReplaceCitusExtraDataContainer && IsCitusExtraDataContainerRelation(rte))
1748 {
1749 /*
1750 * We got here by planning the query part that needs to be executed on the query
1751 * coordinator node.
1752 * We have verified the occurrence of the citus_extra_datacontainer function
1753 * encoding the remote scan we plan to execute here. We will replace all paths
1754 * with a path describing our custom scan.
1755 */
1756 Path *path = CreateCitusCustomScanPath(root, relOptInfo, restrictionIndex, rte,
1757 ReplaceCitusExtraDataContainerWithCustomScan);
1758
1759 /* replace all paths with our custom scan and recalculate cheapest */
1760 relOptInfo->pathlist = list_make1(path);
1761 set_cheapest(relOptInfo);
1762
1763 return;
1764 }
1765
1766 AdjustReadIntermediateResultCost(rte, relOptInfo);
1767 AdjustReadIntermediateResultArrayCost(rte, relOptInfo);
1768
1769 if (rte->rtekind != RTE_RELATION)
1770 {
1771 return;
1772 }
1773
1774 /*
1775 * Use a memory context that's guaranteed to live long enough, could be
1776 * called in a more shortly lived one (e.g. with GEQO).
1777 */
1778 PlannerRestrictionContext *plannerRestrictionContext =
1779 CurrentPlannerRestrictionContext();
1780 MemoryContext restrictionsMemoryContext = plannerRestrictionContext->memoryContext;
1781 MemoryContext oldMemoryContext = MemoryContextSwitchTo(restrictionsMemoryContext);
1782
1783 bool distributedTable = IsCitusTable(rte->relid);
1784
1785 RelationRestriction *relationRestriction = palloc0(sizeof(RelationRestriction));
1786 relationRestriction->index = restrictionIndex;
1787 relationRestriction->relationId = rte->relid;
1788 relationRestriction->rte = rte;
1789 relationRestriction->relOptInfo = relOptInfo;
1790 relationRestriction->distributedRelation = distributedTable;
1791 relationRestriction->plannerInfo = root;
1792
1793 /* see comments on GetVarFromAssignedParam() */
1794 relationRestriction->outerPlanParamsList = OuterPlanParamsList(root);
1795 relationRestriction->translatedVars = TranslatedVars(root,
1796 relationRestriction->index);
1797
1798 RelationRestrictionContext *relationRestrictionContext =
1799 plannerRestrictionContext->relationRestrictionContext;
1800
1801 /*
1802 * We're also keeping track of whether all participant
1803 * tables are reference tables.
1804 */
1805 if (distributedTable)
1806 {
1807 cacheEntry = GetCitusTableCacheEntry(rte->relid);
1808
1809 relationRestrictionContext->allReferenceTables &=
1810 IsCitusTableTypeCacheEntry(cacheEntry, REFERENCE_TABLE);
1811 }
1812
1813 relationRestrictionContext->relationRestrictionList =
1814 lappend(relationRestrictionContext->relationRestrictionList, relationRestriction);
1815
1816 MemoryContextSwitchTo(oldMemoryContext);
1817 }
1818
1819
1820 /*
1821 * TranslatedVars deep copies the translated vars for the given relation index
1822 * if there is any append rel list.
1823 */
1824 static List *
TranslatedVars(PlannerInfo * root,int relationIndex)1825 TranslatedVars(PlannerInfo *root, int relationIndex)
1826 {
1827 List *translatedVars = NIL;
1828
1829 if (root->append_rel_list != NIL)
1830 {
1831 AppendRelInfo *targetAppendRelInfo =
1832 FindTargetAppendRelInfo(root, relationIndex);
1833 if (targetAppendRelInfo != NULL)
1834 {
1835 /* postgres deletes translated_vars after pg13, hence we deep copy them here */
1836 Node *targetNode = NULL;
1837 foreach_ptr(targetNode, targetAppendRelInfo->translated_vars)
1838 {
1839 translatedVars =
1840 lappend(translatedVars, copyObject(targetNode));
1841 }
1842 }
1843 }
1844 return translatedVars;
1845 }
1846
1847
1848 /*
1849 * FindTargetAppendRelInfo finds the target append rel info for the given
1850 * relation rte index.
1851 */
1852 static AppendRelInfo *
FindTargetAppendRelInfo(PlannerInfo * root,int relationRteIndex)1853 FindTargetAppendRelInfo(PlannerInfo *root, int relationRteIndex)
1854 {
1855 AppendRelInfo *appendRelInfo = NULL;
1856
1857 /* iterate on the queries that are part of UNION ALL subselects */
1858 foreach_ptr(appendRelInfo, root->append_rel_list)
1859 {
1860 /*
1861 * We're only interested in the child rel that is equal to the
1862 * relation we're investigating. Here we don't need to find the offset
1863 * because postgres adds an offset to child_relid and parent_relid after
1864 * calling multi_relation_restriction_hook.
1865 */
1866 if (appendRelInfo->child_relid == relationRteIndex)
1867 {
1868 return appendRelInfo;
1869 }
1870 }
1871 return NULL;
1872 }
1873
1874
1875 /*
1876 * AdjustReadIntermediateResultCost adjusts the row count and total cost
1877 * of a read_intermediate_result call based on the file size.
1878 */
1879 static void
AdjustReadIntermediateResultCost(RangeTblEntry * rangeTableEntry,RelOptInfo * relOptInfo)1880 AdjustReadIntermediateResultCost(RangeTblEntry *rangeTableEntry, RelOptInfo *relOptInfo)
1881 {
1882 if (rangeTableEntry->rtekind != RTE_FUNCTION ||
1883 list_length(rangeTableEntry->functions) != 1)
1884 {
1885 /* avoid more expensive checks below for non-functions */
1886 return;
1887 }
1888
1889 if (!CitusHasBeenLoaded() || !CheckCitusVersion(DEBUG5))
1890 {
1891 /* read_intermediate_result may not exist */
1892 return;
1893 }
1894
1895 if (!ContainsReadIntermediateResultFunction((Node *) rangeTableEntry->functions))
1896 {
1897 return;
1898 }
1899
1900 RangeTblFunction *rangeTableFunction = (RangeTblFunction *) linitial(
1901 rangeTableEntry->functions);
1902 FuncExpr *funcExpression = (FuncExpr *) rangeTableFunction->funcexpr;
1903 Const *resultIdConst = (Const *) linitial(funcExpression->args);
1904 if (!IsA(resultIdConst, Const))
1905 {
1906 /* not sure how to interpret non-const */
1907 return;
1908 }
1909
1910 Datum resultIdDatum = resultIdConst->constvalue;
1911
1912 Const *resultFormatConst = (Const *) lsecond(funcExpression->args);
1913 if (!IsA(resultFormatConst, Const))
1914 {
1915 /* not sure how to interpret non-const */
1916 return;
1917 }
1918
1919 AdjustReadIntermediateResultsCostInternal(relOptInfo,
1920 rangeTableFunction->funccoltypes,
1921 1, &resultIdDatum, resultFormatConst);
1922 }
1923
1924
1925 /*
1926 * AdjustReadIntermediateResultArrayCost adjusts the row count and total cost
1927 * of a read_intermediate_results(resultIds, format) call based on the file size.
1928 */
1929 static void
AdjustReadIntermediateResultArrayCost(RangeTblEntry * rangeTableEntry,RelOptInfo * relOptInfo)1930 AdjustReadIntermediateResultArrayCost(RangeTblEntry *rangeTableEntry,
1931 RelOptInfo *relOptInfo)
1932 {
1933 Datum *resultIdArray = NULL;
1934 int resultIdCount = 0;
1935
1936 if (rangeTableEntry->rtekind != RTE_FUNCTION ||
1937 list_length(rangeTableEntry->functions) != 1)
1938 {
1939 /* avoid more expensive checks below for non-functions */
1940 return;
1941 }
1942
1943 if (!CitusHasBeenLoaded() || !CheckCitusVersion(DEBUG5))
1944 {
1945 /* read_intermediate_result may not exist */
1946 return;
1947 }
1948
1949 if (!ContainsReadIntermediateResultArrayFunction((Node *) rangeTableEntry->functions))
1950 {
1951 return;
1952 }
1953
1954 RangeTblFunction *rangeTableFunction =
1955 (RangeTblFunction *) linitial(rangeTableEntry->functions);
1956 FuncExpr *funcExpression = (FuncExpr *) rangeTableFunction->funcexpr;
1957 Const *resultIdConst = (Const *) linitial(funcExpression->args);
1958 if (!IsA(resultIdConst, Const))
1959 {
1960 /* not sure how to interpret non-const */
1961 return;
1962 }
1963
1964 Datum resultIdArrayDatum = resultIdConst->constvalue;
1965 deconstruct_array(DatumGetArrayTypeP(resultIdArrayDatum), TEXTOID, -1, false,
1966 'i', &resultIdArray, NULL, &resultIdCount);
1967
1968 Const *resultFormatConst = (Const *) lsecond(funcExpression->args);
1969 if (!IsA(resultFormatConst, Const))
1970 {
1971 /* not sure how to interpret non-const */
1972 return;
1973 }
1974
1975 AdjustReadIntermediateResultsCostInternal(relOptInfo,
1976 rangeTableFunction->funccoltypes,
1977 resultIdCount, resultIdArray,
1978 resultFormatConst);
1979 }
1980
1981
1982 /*
1983 * AdjustReadIntermediateResultsCostInternal adjusts the row count and total cost
1984 * of reading intermediate results based on file sizes.
1985 */
1986 static void
AdjustReadIntermediateResultsCostInternal(RelOptInfo * relOptInfo,List * columnTypes,int resultIdCount,Datum * resultIds,Const * resultFormatConst)1987 AdjustReadIntermediateResultsCostInternal(RelOptInfo *relOptInfo, List *columnTypes,
1988 int resultIdCount, Datum *resultIds,
1989 Const *resultFormatConst)
1990 {
1991 PathTarget *reltarget = relOptInfo->reltarget;
1992 List *pathList = relOptInfo->pathlist;
1993 double rowCost = 0.;
1994 double rowSizeEstimate = 0;
1995 double rowCountEstimate = 0.;
1996 double ioCost = 0.;
1997 QualCost funcCost = { 0., 0. };
1998 int64 totalResultSize = 0;
1999 ListCell *typeCell = NULL;
2000
2001 Datum resultFormatDatum = resultFormatConst->constvalue;
2002 Oid resultFormatId = DatumGetObjectId(resultFormatDatum);
2003 bool binaryFormat = (resultFormatId == BinaryCopyFormatId());
2004
2005 for (int index = 0; index < resultIdCount; index++)
2006 {
2007 char *resultId = TextDatumGetCString(resultIds[index]);
2008 int64 resultSize = IntermediateResultSize(resultId);
2009 if (resultSize < 0)
2010 {
2011 /* result does not exist, will probably error out later on */
2012 return;
2013 }
2014
2015 if (binaryFormat)
2016 {
2017 /* subtract 11-byte signature + 8 byte header + 2-byte footer */
2018 totalResultSize -= 21;
2019 }
2020
2021 totalResultSize += resultSize;
2022 }
2023
2024 /* start with the cost of evaluating quals */
2025 rowCost += relOptInfo->baserestrictcost.per_tuple;
2026
2027 /* postgres' estimate for the width of the rows */
2028 rowSizeEstimate += reltarget->width;
2029
2030 /* add 2 bytes for column count (binary) or line separator (text) */
2031 rowSizeEstimate += 2;
2032
2033 foreach(typeCell, columnTypes)
2034 {
2035 Oid columnTypeId = lfirst_oid(typeCell);
2036 Oid inputFunctionId = InvalidOid;
2037 Oid typeIOParam = InvalidOid;
2038
2039 if (binaryFormat)
2040 {
2041 getTypeBinaryInputInfo(columnTypeId, &inputFunctionId, &typeIOParam);
2042
2043 /* binary format: 4 bytes for field size */
2044 rowSizeEstimate += 4;
2045 }
2046 else
2047 {
2048 getTypeInputInfo(columnTypeId, &inputFunctionId, &typeIOParam);
2049
2050 /* text format: 1 byte for tab separator */
2051 rowSizeEstimate += 1;
2052 }
2053
2054
2055 /* add the cost of parsing a column */
2056 add_function_cost(NULL, inputFunctionId, NULL, &funcCost);
2057 }
2058 rowCost += funcCost.per_tuple;
2059
2060 /* estimate the number of rows based on the file size and estimated row size */
2061 rowCountEstimate = Max(1, (double) totalResultSize / rowSizeEstimate);
2062
2063 /* cost of reading the data */
2064 ioCost = seq_page_cost * totalResultSize / BLCKSZ;
2065
2066 Assert(pathList != NIL);
2067
2068 /* tell the planner about the cost and row count of the function */
2069 Path *path = (Path *) linitial(pathList);
2070 path->rows = rowCountEstimate;
2071 path->total_cost = rowCountEstimate * rowCost + ioCost;
2072
2073 path->startup_cost = funcCost.startup + relOptInfo->baserestrictcost.startup;
2074 }
2075
2076
2077 /*
2078 * OuterPlanParamsList creates a list of RootPlanParams for outer nodes of the
2079 * given root. The first item in the list corresponds to parent_root, and the
2080 * last item corresponds to the outer most node.
2081 */
2082 static List *
OuterPlanParamsList(PlannerInfo * root)2083 OuterPlanParamsList(PlannerInfo *root)
2084 {
2085 List *planParamsList = NIL;
2086
2087 for (PlannerInfo *outerNodeRoot = root->parent_root; outerNodeRoot != NULL;
2088 outerNodeRoot = outerNodeRoot->parent_root)
2089 {
2090 RootPlanParams *rootPlanParams = palloc0(sizeof(RootPlanParams));
2091 rootPlanParams->root = outerNodeRoot;
2092
2093 /*
2094 * TODO: In SearchPlannerParamList() we are only interested in Var plan
2095 * params, consider copying just them here.
2096 */
2097 rootPlanParams->plan_params = CopyPlanParamList(outerNodeRoot->plan_params);
2098
2099 planParamsList = lappend(planParamsList, rootPlanParams);
2100 }
2101
2102 return planParamsList;
2103 }
2104
2105
2106 /*
2107 * CopyPlanParamList deep copies the input PlannerParamItem list and returns the newly
2108 * allocated list.
2109 * Note that we cannot use copyObject() function directly since there is no support for
2110 * copying PlannerParamItem structs.
2111 */
2112 static List *
CopyPlanParamList(List * originalPlanParamList)2113 CopyPlanParamList(List *originalPlanParamList)
2114 {
2115 ListCell *planParamCell = NULL;
2116 List *copiedPlanParamList = NIL;
2117
2118 foreach(planParamCell, originalPlanParamList)
2119 {
2120 PlannerParamItem *originalParamItem = lfirst(planParamCell);
2121 PlannerParamItem *copiedParamItem = makeNode(PlannerParamItem);
2122
2123 copiedParamItem->paramId = originalParamItem->paramId;
2124 copiedParamItem->item = copyObject(originalParamItem->item);
2125
2126 copiedPlanParamList = lappend(copiedPlanParamList, copiedParamItem);
2127 }
2128
2129 return copiedPlanParamList;
2130 }
2131
2132
2133 /*
2134 * CreateAndPushPlannerRestrictionContext creates a new relation restriction context
2135 * and a new join context, inserts it to the beginning of the
2136 * plannerRestrictionContextList. Finally, the planner restriction context is
2137 * inserted to the beginning of the plannerRestrictionContextList and it is returned.
2138 */
2139 static PlannerRestrictionContext *
CreateAndPushPlannerRestrictionContext(void)2140 CreateAndPushPlannerRestrictionContext(void)
2141 {
2142 PlannerRestrictionContext *plannerRestrictionContext =
2143 palloc0(sizeof(PlannerRestrictionContext));
2144
2145 plannerRestrictionContext->relationRestrictionContext =
2146 palloc0(sizeof(RelationRestrictionContext));
2147
2148 plannerRestrictionContext->joinRestrictionContext =
2149 palloc0(sizeof(JoinRestrictionContext));
2150
2151 plannerRestrictionContext->fastPathRestrictionContext =
2152 palloc0(sizeof(FastPathRestrictionContext));
2153
2154 plannerRestrictionContext->memoryContext = CurrentMemoryContext;
2155
2156 /* we'll apply logical AND as we add tables */
2157 plannerRestrictionContext->relationRestrictionContext->allReferenceTables = true;
2158
2159 plannerRestrictionContextList = lcons(plannerRestrictionContext,
2160 plannerRestrictionContextList);
2161
2162 return plannerRestrictionContext;
2163 }
2164
2165
2166 /*
2167 * TranslatedVarsForRteIdentity gets an rteIdentity and returns the
2168 * translatedVars that belong to the range table relation. If no
2169 * translatedVars found, the function returns NIL;
2170 */
2171 List *
TranslatedVarsForRteIdentity(int rteIdentity)2172 TranslatedVarsForRteIdentity(int rteIdentity)
2173 {
2174 PlannerRestrictionContext *currentPlannerRestrictionContext =
2175 CurrentPlannerRestrictionContext();
2176
2177 List *relationRestrictionList =
2178 currentPlannerRestrictionContext->relationRestrictionContext->
2179 relationRestrictionList;
2180 RelationRestriction *relationRestriction = NULL;
2181 foreach_ptr(relationRestriction, relationRestrictionList)
2182 {
2183 if (GetRTEIdentity(relationRestriction->rte) == rteIdentity)
2184 {
2185 return relationRestriction->translatedVars;
2186 }
2187 }
2188
2189 return NIL;
2190 }
2191
2192
2193 /*
2194 * CurrentRestrictionContext returns the most recently added
2195 * PlannerRestrictionContext from the plannerRestrictionContextList list.
2196 */
2197 static PlannerRestrictionContext *
CurrentPlannerRestrictionContext(void)2198 CurrentPlannerRestrictionContext(void)
2199 {
2200 Assert(plannerRestrictionContextList != NIL);
2201
2202 PlannerRestrictionContext *plannerRestrictionContext =
2203 (PlannerRestrictionContext *) linitial(plannerRestrictionContextList);
2204
2205 if (plannerRestrictionContext == NULL)
2206 {
2207 ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR),
2208 errmsg("planner restriction context stack was empty"),
2209 errdetail("Please report this to the Citus core team.")));
2210 }
2211
2212 return plannerRestrictionContext;
2213 }
2214
2215
2216 /*
2217 * PopPlannerRestrictionContext removes the most recently added restriction contexts from
2218 * the planner restriction context list. The function assumes the list is not empty.
2219 */
2220 static void
PopPlannerRestrictionContext(void)2221 PopPlannerRestrictionContext(void)
2222 {
2223 plannerRestrictionContextList = list_delete_first(plannerRestrictionContextList);
2224 }
2225
2226
2227 /*
2228 * ResetPlannerRestrictionContext resets the element of the given planner
2229 * restriction context.
2230 */
2231 static void
ResetPlannerRestrictionContext(PlannerRestrictionContext * plannerRestrictionContext)2232 ResetPlannerRestrictionContext(PlannerRestrictionContext *plannerRestrictionContext)
2233 {
2234 plannerRestrictionContext->relationRestrictionContext =
2235 palloc0(sizeof(RelationRestrictionContext));
2236
2237 plannerRestrictionContext->joinRestrictionContext =
2238 palloc0(sizeof(JoinRestrictionContext));
2239
2240 plannerRestrictionContext->fastPathRestrictionContext =
2241 palloc0(sizeof(FastPathRestrictionContext));
2242
2243
2244 /* we'll apply logical AND as we add tables */
2245 plannerRestrictionContext->relationRestrictionContext->allReferenceTables = true;
2246 }
2247
2248
2249 /*
2250 * HasUnresolvedExternParamsWalker returns true if the passed in expression
2251 * has external parameters that are not contained in boundParams, false
2252 * otherwise.
2253 */
2254 bool
HasUnresolvedExternParamsWalker(Node * expression,ParamListInfo boundParams)2255 HasUnresolvedExternParamsWalker(Node *expression, ParamListInfo boundParams)
2256 {
2257 if (expression == NULL)
2258 {
2259 return false;
2260 }
2261
2262 if (IsA(expression, Param))
2263 {
2264 Param *param = (Param *) expression;
2265 int paramId = param->paramid;
2266
2267 /* only care about user supplied parameters */
2268 if (param->paramkind != PARAM_EXTERN)
2269 {
2270 return false;
2271 }
2272
2273 /* check whether parameter is available (and valid) */
2274 if (boundParams && paramId > 0 && paramId <= boundParams->numParams)
2275 {
2276 ParamExternData *externParam = NULL;
2277
2278 /* give hook a chance in case parameter is dynamic */
2279 if (boundParams->paramFetch != NULL)
2280 {
2281 ParamExternData externParamPlaceholder;
2282 externParam = (*boundParams->paramFetch)(boundParams, paramId, false,
2283 &externParamPlaceholder);
2284 }
2285 else
2286 {
2287 externParam = &boundParams->params[paramId - 1];
2288 }
2289
2290 Oid paramType = externParam->ptype;
2291 if (OidIsValid(paramType))
2292 {
2293 return false;
2294 }
2295 }
2296
2297 return true;
2298 }
2299
2300 /* keep traversing */
2301 if (IsA(expression, Query))
2302 {
2303 return query_tree_walker((Query *) expression,
2304 HasUnresolvedExternParamsWalker,
2305 boundParams,
2306 0);
2307 }
2308 else
2309 {
2310 return expression_tree_walker(expression,
2311 HasUnresolvedExternParamsWalker,
2312 boundParams);
2313 }
2314 }
2315
2316
2317 /*
2318 * GetRTEListPropertiesForQuery is a wrapper around GetRTEListProperties that
2319 * returns RTEListProperties for the rte list retrieved from query.
2320 */
2321 RTEListProperties *
GetRTEListPropertiesForQuery(Query * query)2322 GetRTEListPropertiesForQuery(Query *query)
2323 {
2324 List *rteList = ExtractRangeTableEntryList(query);
2325 return GetRTEListProperties(rteList);
2326 }
2327
2328
2329 /*
2330 * GetRTEListProperties returns RTEListProperties struct processing the given
2331 * rangeTableList.
2332 */
2333 static RTEListProperties *
GetRTEListProperties(List * rangeTableList)2334 GetRTEListProperties(List *rangeTableList)
2335 {
2336 RTEListProperties *rteListProperties = palloc0(sizeof(RTEListProperties));
2337
2338 RangeTblEntry *rangeTableEntry = NULL;
2339 foreach_ptr(rangeTableEntry, rangeTableList)
2340 {
2341 if (rangeTableEntry->rtekind != RTE_RELATION)
2342 {
2343 continue;
2344 }
2345 else if (rangeTableEntry->relkind == RELKIND_VIEW)
2346 {
2347 /*
2348 * Skip over views, distributed tables within (regular) views are
2349 * already in rangeTableList.
2350 */
2351 continue;
2352 }
2353
2354
2355 if (rangeTableEntry->relkind == RELKIND_MATVIEW)
2356 {
2357 /*
2358 * Record materialized views as they are similar to postgres local tables
2359 * but it is nice to record them separately.
2360 *
2361 * Regular tables, partitioned tables or foreign tables can be a local or
2362 * distributed tables and we can qualify them accurately.
2363 *
2364 * For regular views, we don't care because their definitions are already
2365 * in the same query tree and we can detect what is inside the view definition.
2366 *
2367 * For materialized views, they are just local tables in the queries. But, when
2368 * REFRESH MATERIALIZED VIEW is used, they behave similar to regular views, adds
2369 * the view definition to the query. Hence, it is useful to record it seperately
2370 * and let the callers decide on what to do.
2371 */
2372 rteListProperties->hasMaterializedView = true;
2373 continue;
2374 }
2375
2376 Oid relationId = rangeTableEntry->relid;
2377 CitusTableCacheEntry *cacheEntry = LookupCitusTableCacheEntry(relationId);
2378 if (!cacheEntry)
2379 {
2380 rteListProperties->hasPostgresLocalTable = true;
2381 }
2382 else if (IsCitusTableTypeCacheEntry(cacheEntry, REFERENCE_TABLE))
2383 {
2384 rteListProperties->hasReferenceTable = true;
2385 }
2386 else if (IsCitusTableTypeCacheEntry(cacheEntry, CITUS_LOCAL_TABLE))
2387 {
2388 rteListProperties->hasCitusLocalTable = true;
2389 }
2390 else if (IsCitusTableTypeCacheEntry(cacheEntry, DISTRIBUTED_TABLE))
2391 {
2392 rteListProperties->hasDistributedTable = true;
2393 }
2394 else
2395 {
2396 /* it's not expected, but let's do a bug catch here */
2397 ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR),
2398 errmsg("encountered with an unexpected citus "
2399 "table type while processing range table "
2400 "entries of query")));
2401 }
2402 }
2403
2404 rteListProperties->hasCitusTable = (rteListProperties->hasDistributedTable ||
2405 rteListProperties->hasReferenceTable ||
2406 rteListProperties->hasCitusLocalTable);
2407
2408 return rteListProperties;
2409 }
2410