1 /*-------------------------------------------------------------------------
2  *
3  * distributed_planner.c
4  *	  General Citus planner code.
5  *
6  * Copyright (c) Citus Data, Inc.
7  *-------------------------------------------------------------------------
8  */
9 
10 #include "postgres.h"
11 
12 #include "distributed/pg_version_constants.h"
13 
14 #include "funcapi.h"
15 
16 #include <float.h>
17 #include <limits.h>
18 
19 #include "access/htup_details.h"
20 #include "catalog/pg_class.h"
21 #include "catalog/pg_proc.h"
22 #include "catalog/pg_type.h"
23 #include "distributed/citus_nodefuncs.h"
24 #include "distributed/citus_nodes.h"
25 #include "distributed/citus_ruleutils.h"
26 #include "distributed/cte_inline.h"
27 #include "distributed/function_call_delegation.h"
28 #include "distributed/insert_select_planner.h"
29 #include "distributed/intermediate_result_pruning.h"
30 #include "distributed/intermediate_results.h"
31 #include "distributed/listutils.h"
32 #include "distributed/coordinator_protocol.h"
33 #include "distributed/metadata_cache.h"
34 #include "distributed/multi_executor.h"
35 #include "distributed/distributed_planner.h"
36 #include "distributed/query_pushdown_planning.h"
37 #include "distributed/multi_logical_optimizer.h"
38 #include "distributed/multi_logical_planner.h"
39 #include "distributed/multi_partitioning_utils.h"
40 #include "distributed/multi_physical_planner.h"
41 #include "distributed/combine_query_planner.h"
42 #include "distributed/multi_router_planner.h"
43 #include "distributed/query_utils.h"
44 #include "distributed/recursive_planning.h"
45 #include "distributed/shardinterval_utils.h"
46 #include "distributed/shard_utils.h"
47 #include "distributed/version_compat.h"
48 #include "distributed/worker_shard_visibility.h"
49 #include "executor/executor.h"
50 #include "nodes/makefuncs.h"
51 #include "nodes/nodeFuncs.h"
52 #include "nodes/pg_list.h"
53 #include "parser/parsetree.h"
54 #include "parser/parse_type.h"
55 #include "optimizer/optimizer.h"
56 #include "optimizer/plancat.h"
57 #include "optimizer/pathnode.h"
58 #include "optimizer/planner.h"
59 #include "optimizer/planmain.h"
60 #include "utils/builtins.h"
61 #include "utils/datum.h"
62 #include "utils/lsyscache.h"
63 #include "utils/memutils.h"
64 #include "utils/syscache.h"
65 
66 
67 static List *plannerRestrictionContextList = NIL;
68 int MultiTaskQueryLogLevel = CITUS_LOG_LEVEL_OFF; /* multi-task query log level */
69 static uint64 NextPlanId = 1;
70 
71 /* keep track of planner call stack levels */
72 int PlannerLevel = 0;
73 
74 static bool ListContainsDistributedTableRTE(List *rangeTableList);
75 static bool IsUpdateOrDelete(Query *query);
76 static PlannedStmt * CreateDistributedPlannedStmt(
77 	DistributedPlanningContext *planContext);
78 static PlannedStmt * InlineCtesAndCreateDistributedPlannedStmt(uint64 planId,
79 															   DistributedPlanningContext
80 															   *planContext);
81 static PlannedStmt * TryCreateDistributedPlannedStmt(PlannedStmt *localPlan,
82 													 Query *originalQuery,
83 													 Query *query, ParamListInfo
84 													 boundParams,
85 													 PlannerRestrictionContext *
86 													 plannerRestrictionContext);
87 static DeferredErrorMessage * DeferErrorIfPartitionTableNotSingleReplicated(Oid
88 																			relationId);
89 
90 static int AssignRTEIdentities(List *rangeTableList, int rteIdCounter);
91 static void AssignRTEIdentity(RangeTblEntry *rangeTableEntry, int rteIdentifier);
92 static void AdjustPartitioningForDistributedPlanning(List *rangeTableList,
93 													 bool setPartitionedTablesInherited);
94 static PlannedStmt * FinalizeNonRouterPlan(PlannedStmt *localPlan,
95 										   DistributedPlan *distributedPlan,
96 										   CustomScan *customScan);
97 static PlannedStmt * FinalizeRouterPlan(PlannedStmt *localPlan, CustomScan *customScan);
98 static AppendRelInfo * FindTargetAppendRelInfo(PlannerInfo *root, int relationRteIndex);
99 static List * makeTargetListFromCustomScanList(List *custom_scan_tlist);
100 static List * makeCustomScanTargetlistFromExistingTargetList(List *existingTargetlist);
101 static int32 BlessRecordExpressionList(List *exprs);
102 static void CheckNodeIsDumpable(Node *node);
103 static Node * CheckNodeCopyAndSerialization(Node *node);
104 static void AdjustReadIntermediateResultCost(RangeTblEntry *rangeTableEntry,
105 											 RelOptInfo *relOptInfo);
106 static void AdjustReadIntermediateResultArrayCost(RangeTblEntry *rangeTableEntry,
107 												  RelOptInfo *relOptInfo);
108 static void AdjustReadIntermediateResultsCostInternal(RelOptInfo *relOptInfo,
109 													  List *columnTypes,
110 													  int resultIdCount,
111 													  Datum *resultIds,
112 													  Const *resultFormatConst);
113 static List * OuterPlanParamsList(PlannerInfo *root);
114 static List * CopyPlanParamList(List *originalPlanParamList);
115 static PlannerRestrictionContext * CreateAndPushPlannerRestrictionContext(void);
116 static PlannerRestrictionContext * CurrentPlannerRestrictionContext(void);
117 static void PopPlannerRestrictionContext(void);
118 static void ResetPlannerRestrictionContext(
119 	PlannerRestrictionContext *plannerRestrictionContext);
120 static PlannedStmt * PlanFastPathDistributedStmt(DistributedPlanningContext *planContext,
121 												 Node *distributionKeyValue);
122 static PlannedStmt * PlanDistributedStmt(DistributedPlanningContext *planContext,
123 										 int rteIdCounter);
124 static RTEListProperties * GetRTEListProperties(List *rangeTableList);
125 static List * TranslatedVars(PlannerInfo *root, int relationIndex);
126 
127 
128 /* Distributed planner hook */
129 PlannedStmt *
distributed_planner(Query * parse,const char * query_string,int cursorOptions,ParamListInfo boundParams)130 distributed_planner(Query *parse,
131 	#if PG_VERSION_NUM >= PG_VERSION_13
132 					const char *query_string,
133 	#endif
134 					int cursorOptions,
135 					ParamListInfo boundParams)
136 {
137 	bool needsDistributedPlanning = false;
138 	bool fastPathRouterQuery = false;
139 	Node *distributionKeyValue = NULL;
140 
141 	List *rangeTableList = ExtractRangeTableEntryList(parse);
142 
143 	if (cursorOptions & CURSOR_OPT_FORCE_DISTRIBUTED)
144 	{
145 		/* this cursor flag could only be set when Citus has been loaded */
146 		Assert(CitusHasBeenLoaded());
147 
148 		needsDistributedPlanning = true;
149 	}
150 	else if (CitusHasBeenLoaded())
151 	{
152 		needsDistributedPlanning = ListContainsDistributedTableRTE(rangeTableList);
153 		if (needsDistributedPlanning)
154 		{
155 			fastPathRouterQuery = FastPathRouterQuery(parse, &distributionKeyValue);
156 		}
157 	}
158 
159 	int rteIdCounter = 1;
160 
161 	DistributedPlanningContext planContext = {
162 		.query = parse,
163 		.cursorOptions = cursorOptions,
164 		.boundParams = boundParams,
165 	};
166 
167 	if (needsDistributedPlanning)
168 	{
169 		/*
170 		 * standard_planner scribbles on it's input, but for deparsing we need the
171 		 * unmodified form. Before copying we call AssignRTEIdentities to be able
172 		 * to match RTEs in the rewritten query tree with those in the original
173 		 * tree.
174 		 */
175 		rteIdCounter = AssignRTEIdentities(rangeTableList, rteIdCounter);
176 
177 		planContext.originalQuery = copyObject(parse);
178 
179 		/*
180 		 * When there are partitioned tables (not applicable to fast path),
181 		 * pretend that they are regular tables to avoid unnecessary work
182 		 * in standard_planner.
183 		 */
184 		if (!fastPathRouterQuery)
185 		{
186 			bool setPartitionedTablesInherited = false;
187 			AdjustPartitioningForDistributedPlanning(rangeTableList,
188 													 setPartitionedTablesInherited);
189 		}
190 	}
191 
192 	/*
193 	 * Make sure that we hide shard names on the Citus MX worker nodes. See comments in
194 	 * ReplaceTableVisibleFunction() for the details.
195 	 */
196 	ReplaceTableVisibleFunction((Node *) parse);
197 
198 	/* create a restriction context and put it at the end if context list */
199 	planContext.plannerRestrictionContext = CreateAndPushPlannerRestrictionContext();
200 
201 	/*
202 	 * We keep track of how many times we've recursed into the planner, primarily
203 	 * to detect whether we are in a function call. We need to make sure that the
204 	 * PlannerLevel is decremented exactly once at the end of the next PG_TRY
205 	 * block, both in the happy case and when an error occurs.
206 	 */
207 	PlannerLevel++;
208 
209 	PlannedStmt *result = NULL;
210 
211 	PG_TRY();
212 	{
213 		if (fastPathRouterQuery)
214 		{
215 			result = PlanFastPathDistributedStmt(&planContext, distributionKeyValue);
216 		}
217 		else
218 		{
219 			/*
220 			 * Call into standard_planner because the Citus planner relies on both the
221 			 * restriction information per table and parse tree transformations made by
222 			 * postgres' planner.
223 			 */
224 			planContext.plan = standard_planner_compat(planContext.query,
225 													   planContext.cursorOptions,
226 													   planContext.boundParams);
227 			if (needsDistributedPlanning)
228 			{
229 				result = PlanDistributedStmt(&planContext, rteIdCounter);
230 			}
231 			else if ((result = TryToDelegateFunctionCall(&planContext)) == NULL)
232 			{
233 				result = planContext.plan;
234 			}
235 		}
236 	}
237 	PG_CATCH();
238 	{
239 		PopPlannerRestrictionContext();
240 
241 		PlannerLevel--;
242 
243 		PG_RE_THROW();
244 	}
245 	PG_END_TRY();
246 
247 	PlannerLevel--;
248 
249 	/* remove the context from the context list */
250 	PopPlannerRestrictionContext();
251 
252 	/*
253 	 * In some cases, for example; parameterized SQL functions, we may miss that
254 	 * there is a need for distributed planning. Such cases only become clear after
255 	 * standard_planner performs some modifications on parse tree. In such cases
256 	 * we will simply error out.
257 	 */
258 	if (!needsDistributedPlanning && NeedsDistributedPlanning(parse))
259 	{
260 		ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
261 						errmsg("cannot perform distributed planning on this "
262 							   "query because parameterized queries for SQL "
263 							   "functions referencing distributed tables are "
264 							   "not supported"),
265 						errhint("Consider using PL/pgSQL functions instead.")));
266 	}
267 
268 	return result;
269 }
270 
271 
272 /*
273  * ExtractRangeTableEntryList is a wrapper around ExtractRangeTableEntryWalker.
274  * The function traverses the input query and returns all the range table
275  * entries that are in the query tree.
276  */
277 List *
ExtractRangeTableEntryList(Query * query)278 ExtractRangeTableEntryList(Query *query)
279 {
280 	List *rteList = NIL;
281 
282 	ExtractRangeTableEntryWalker((Node *) query, &rteList);
283 
284 	return rteList;
285 }
286 
287 
288 /*
289  * NeedsDistributedPlanning returns true if the Citus extension is loaded and
290  * the query contains a distributed table.
291  *
292  * This function allows queries containing local tables to pass through the
293  * distributed planner. How to handle local tables is a decision that should
294  * be made within the planner
295  */
296 bool
NeedsDistributedPlanning(Query * query)297 NeedsDistributedPlanning(Query *query)
298 {
299 	if (!CitusHasBeenLoaded())
300 	{
301 		return false;
302 	}
303 
304 	CmdType commandType = query->commandType;
305 
306 	if (commandType != CMD_SELECT && commandType != CMD_INSERT &&
307 		commandType != CMD_UPDATE && commandType != CMD_DELETE)
308 	{
309 		return false;
310 	}
311 
312 	List *allRTEs = ExtractRangeTableEntryList(query);
313 
314 	return ListContainsDistributedTableRTE(allRTEs);
315 }
316 
317 
318 /*
319  * ListContainsDistributedTableRTE gets a list of range table entries
320  * and returns true if there is at least one distributed relation range
321  * table entry in the list.
322  */
323 static bool
ListContainsDistributedTableRTE(List * rangeTableList)324 ListContainsDistributedTableRTE(List *rangeTableList)
325 {
326 	ListCell *rangeTableCell = NULL;
327 
328 	foreach(rangeTableCell, rangeTableList)
329 	{
330 		RangeTblEntry *rangeTableEntry = (RangeTblEntry *) lfirst(rangeTableCell);
331 
332 		if (rangeTableEntry->rtekind != RTE_RELATION)
333 		{
334 			continue;
335 		}
336 
337 		if (IsCitusTable(rangeTableEntry->relid))
338 		{
339 			return true;
340 		}
341 	}
342 
343 	return false;
344 }
345 
346 
347 /*
348  * AssignRTEIdentities function modifies query tree by adding RTE identities to the
349  * RTE_RELATIONs.
350  *
351  * Please note that, we want to avoid modifying query tree as much as possible
352  * because if PostgreSQL changes the way it uses modified fields, that may break
353  * our logic.
354  *
355  * Returns the next id. This can be used to call on a rangeTableList that may've
356  * been partially assigned. Should be set to 1 initially.
357  */
358 static int
AssignRTEIdentities(List * rangeTableList,int rteIdCounter)359 AssignRTEIdentities(List *rangeTableList, int rteIdCounter)
360 {
361 	ListCell *rangeTableCell = NULL;
362 
363 	foreach(rangeTableCell, rangeTableList)
364 	{
365 		RangeTblEntry *rangeTableEntry = (RangeTblEntry *) lfirst(rangeTableCell);
366 
367 		/*
368 		 * To be able to track individual RTEs through PostgreSQL's query
369 		 * planning, we need to be able to figure out whether an RTE is
370 		 * actually a copy of another, rather than a different one. We
371 		 * simply number the RTEs starting from 1.
372 		 *
373 		 * Note that we're only interested in RTE_RELATIONs and thus assigning
374 		 * identifiers to those RTEs only.
375 		 */
376 		if (rangeTableEntry->rtekind == RTE_RELATION &&
377 			rangeTableEntry->values_lists == NIL)
378 		{
379 			AssignRTEIdentity(rangeTableEntry, rteIdCounter++);
380 		}
381 	}
382 
383 	return rteIdCounter;
384 }
385 
386 
387 /*
388  * AdjustPartitioningForDistributedPlanning function modifies query tree by
389  * changing inh flag and relkind of partitioned tables. We want Postgres to
390  * treat partitioned tables as regular relations (i.e. we do not want to
391  * expand them to their partitions) since it breaks Citus planning in different
392  * ways. We let anything related to partitioning happen on the shards.
393  *
394  * Please note that, we want to avoid modifying query tree as much as possible
395  * because if PostgreSQL changes the way it uses modified fields, that may break
396  * our logic.
397  */
398 static void
AdjustPartitioningForDistributedPlanning(List * rangeTableList,bool setPartitionedTablesInherited)399 AdjustPartitioningForDistributedPlanning(List *rangeTableList,
400 										 bool setPartitionedTablesInherited)
401 {
402 	ListCell *rangeTableCell = NULL;
403 
404 	foreach(rangeTableCell, rangeTableList)
405 	{
406 		RangeTblEntry *rangeTableEntry = (RangeTblEntry *) lfirst(rangeTableCell);
407 
408 		/*
409 		 * We want Postgres to behave partitioned tables as regular relations
410 		 * (i.e. we do not want to expand them to their partitions). To do this
411 		 * we set each partitioned table's inh flag to appropriate
412 		 * value before and after dropping to the standart_planner.
413 		 */
414 		if (rangeTableEntry->rtekind == RTE_RELATION &&
415 			PartitionedTable(rangeTableEntry->relid))
416 		{
417 			rangeTableEntry->inh = setPartitionedTablesInherited;
418 
419 			if (setPartitionedTablesInherited)
420 			{
421 				rangeTableEntry->relkind = RELKIND_PARTITIONED_TABLE;
422 			}
423 			else
424 			{
425 				rangeTableEntry->relkind = RELKIND_RELATION;
426 			}
427 		}
428 	}
429 }
430 
431 
432 /*
433  * AssignRTEIdentity assigns the given rteIdentifier to the given range table
434  * entry.
435  *
436  * To be able to track RTEs through postgres' query planning, which copies and
437  * duplicate, and modifies them, we sometimes need to figure out whether two
438  * RTEs are copies of the same original RTE. For that we, hackishly, use a
439  * field normally unused in RTE_RELATION RTEs.
440  *
441  * The assigned identifier better be unique within a plantree.
442  */
443 static void
AssignRTEIdentity(RangeTblEntry * rangeTableEntry,int rteIdentifier)444 AssignRTEIdentity(RangeTblEntry *rangeTableEntry, int rteIdentifier)
445 {
446 	Assert(rangeTableEntry->rtekind == RTE_RELATION);
447 
448 	rangeTableEntry->values_lists = list_make2_int(rteIdentifier, rangeTableEntry->inh);
449 }
450 
451 
452 /* GetRTEIdentity returns the identity assigned with AssignRTEIdentity. */
453 int
GetRTEIdentity(RangeTblEntry * rte)454 GetRTEIdentity(RangeTblEntry *rte)
455 {
456 	Assert(rte->rtekind == RTE_RELATION);
457 	Assert(rte->values_lists != NIL);
458 	Assert(IsA(rte->values_lists, IntList));
459 	Assert(list_length(rte->values_lists) == 2);
460 
461 	return linitial_int(rte->values_lists);
462 }
463 
464 
465 /*
466  * GetOriginalInh gets the original value of the inheritance flag set by
467  * AssignRTEIdentity. The planner resets this flag in the rewritten query,
468  * but we need it during deparsing.
469  */
470 bool
GetOriginalInh(RangeTblEntry * rte)471 GetOriginalInh(RangeTblEntry *rte)
472 {
473 	return lsecond_int(rte->values_lists);
474 }
475 
476 
477 /*
478  * GetQueryLockMode returns the necessary lock mode to be acquired for the
479  * given query. (See comment written in RangeTblEntry->rellockmode)
480  */
481 LOCKMODE
GetQueryLockMode(Query * query)482 GetQueryLockMode(Query *query)
483 {
484 	if (IsModifyCommand(query))
485 	{
486 		return RowExclusiveLock;
487 	}
488 	else if (query->hasForUpdate)
489 	{
490 		return RowShareLock;
491 	}
492 	else
493 	{
494 		return AccessShareLock;
495 	}
496 }
497 
498 
499 /*
500  * IsModifyCommand returns true if the query performs modifications, false
501  * otherwise.
502  */
503 bool
IsModifyCommand(Query * query)504 IsModifyCommand(Query *query)
505 {
506 	CmdType commandType = query->commandType;
507 
508 	if (commandType == CMD_INSERT || commandType == CMD_UPDATE ||
509 		commandType == CMD_DELETE)
510 	{
511 		return true;
512 	}
513 
514 	return false;
515 }
516 
517 
518 /*
519  * IsMultiTaskPlan returns true if job contains multiple tasks.
520  */
521 bool
IsMultiTaskPlan(DistributedPlan * distributedPlan)522 IsMultiTaskPlan(DistributedPlan *distributedPlan)
523 {
524 	Job *workerJob = distributedPlan->workerJob;
525 
526 	if (workerJob != NULL && list_length(workerJob->taskList) > 1)
527 	{
528 		return true;
529 	}
530 
531 	return false;
532 }
533 
534 
535 /*
536  * IsUpdateOrDelete returns true if the query performs an update or delete.
537  */
538 bool
IsUpdateOrDelete(Query * query)539 IsUpdateOrDelete(Query *query)
540 {
541 	return query->commandType == CMD_UPDATE ||
542 		   query->commandType == CMD_DELETE;
543 }
544 
545 
546 /*
547  * PlanFastPathDistributedStmt creates a distributed planned statement using
548  * the FastPathPlanner.
549  */
550 static PlannedStmt *
PlanFastPathDistributedStmt(DistributedPlanningContext * planContext,Node * distributionKeyValue)551 PlanFastPathDistributedStmt(DistributedPlanningContext *planContext,
552 							Node *distributionKeyValue)
553 {
554 	FastPathRestrictionContext *fastPathContext =
555 		planContext->plannerRestrictionContext->fastPathRestrictionContext;
556 
557 	planContext->plannerRestrictionContext->fastPathRestrictionContext->
558 	fastPathRouterQuery = true;
559 
560 	if (distributionKeyValue == NULL)
561 	{
562 		/* nothing to record */
563 	}
564 	else if (IsA(distributionKeyValue, Const))
565 	{
566 		fastPathContext->distributionKeyValue = (Const *) distributionKeyValue;
567 	}
568 	else if (IsA(distributionKeyValue, Param))
569 	{
570 		fastPathContext->distributionKeyHasParam = true;
571 	}
572 
573 	planContext->plan = FastPathPlanner(planContext->originalQuery, planContext->query,
574 										planContext->boundParams);
575 
576 	return CreateDistributedPlannedStmt(planContext);
577 }
578 
579 
580 /*
581  * PlanDistributedStmt creates a distributed planned statement using the PG
582  * planner.
583  */
584 static PlannedStmt *
PlanDistributedStmt(DistributedPlanningContext * planContext,int rteIdCounter)585 PlanDistributedStmt(DistributedPlanningContext *planContext,
586 					int rteIdCounter)
587 {
588 	/* may've inlined new relation rtes */
589 	List *rangeTableList = ExtractRangeTableEntryList(planContext->query);
590 	rteIdCounter = AssignRTEIdentities(rangeTableList, rteIdCounter);
591 
592 
593 	PlannedStmt *result = CreateDistributedPlannedStmt(planContext);
594 
595 	bool setPartitionedTablesInherited = true;
596 	AdjustPartitioningForDistributedPlanning(rangeTableList,
597 											 setPartitionedTablesInherited);
598 
599 	return result;
600 }
601 
602 
603 /*
604  * DissuadePlannerFromUsingPlan try dissuade planner when planning a plan that
605  * potentially failed due to unresolved prepared statement parameters.
606  */
607 void
DissuadePlannerFromUsingPlan(PlannedStmt * plan)608 DissuadePlannerFromUsingPlan(PlannedStmt *plan)
609 {
610 	/*
611 	 * Arbitrarily high cost, but low enough that it can be added up
612 	 * without overflowing by choose_custom_plan().
613 	 */
614 	plan->planTree->total_cost = FLT_MAX / 100000000;
615 }
616 
617 
618 /*
619  * CreateDistributedPlannedStmt encapsulates the logic needed to transform a particular
620  * query into a distributed plan that is encapsulated by a PlannedStmt.
621  */
622 static PlannedStmt *
CreateDistributedPlannedStmt(DistributedPlanningContext * planContext)623 CreateDistributedPlannedStmt(DistributedPlanningContext *planContext)
624 {
625 	uint64 planId = NextPlanId++;
626 	bool hasUnresolvedParams = false;
627 
628 	PlannedStmt *resultPlan = NULL;
629 
630 	if (QueryTreeContainsInlinableCTE(planContext->originalQuery))
631 	{
632 		/*
633 		 * Inlining CTEs as subqueries in the query can avoid recursively
634 		 * planning some (or all) of the CTEs. In other words, the inlined
635 		 * CTEs could become part of query pushdown planning, which is much
636 		 * more efficient than recursively planning. So, first try distributed
637 		 * planning on the inlined CTEs in the query tree.
638 		 *
639 		 * We also should fallback to distributed planning with non-inlined CTEs
640 		 * if the distributed planning fails with inlined CTEs, because recursively
641 		 * planning CTEs can provide full SQL coverage, although it might be slow.
642 		 */
643 		resultPlan = InlineCtesAndCreateDistributedPlannedStmt(planId, planContext);
644 		if (resultPlan != NULL)
645 		{
646 			return resultPlan;
647 		}
648 	}
649 
650 	if (HasUnresolvedExternParamsWalker((Node *) planContext->originalQuery,
651 										planContext->boundParams))
652 	{
653 		hasUnresolvedParams = true;
654 	}
655 
656 	DistributedPlan *distributedPlan =
657 		CreateDistributedPlan(planId, planContext->originalQuery, planContext->query,
658 							  planContext->boundParams,
659 							  hasUnresolvedParams,
660 							  planContext->plannerRestrictionContext);
661 
662 	/*
663 	 * If no plan was generated, prepare a generic error to be emitted.
664 	 * Normally this error message will never returned to the user, as it's
665 	 * usually due to unresolved prepared statement parameters - in that case
666 	 * the logic below will force a custom plan (i.e. with parameters bound to
667 	 * specific values) to be generated.  But sql (not plpgsql) functions
668 	 * unfortunately don't go through a codepath supporting custom plans - so
669 	 * we still need to have an error prepared.
670 	 */
671 	if (!distributedPlan)
672 	{
673 		/* currently always should have a more specific error otherwise */
674 		Assert(hasUnresolvedParams);
675 		distributedPlan = CitusMakeNode(DistributedPlan);
676 		distributedPlan->planningError =
677 			DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
678 						  "could not create distributed plan",
679 						  "Possibly this is caused by the use of parameters in SQL "
680 						  "functions, which is not supported in Citus.",
681 						  "Consider using PL/pgSQL functions instead.");
682 	}
683 
684 	/*
685 	 * Error out if none of the planners resulted in a usable plan, unless the
686 	 * error was possibly triggered by missing parameters.  In that case we'll
687 	 * not error out here, but instead rely on postgres' custom plan logic.
688 	 * Postgres re-plans prepared statements the first five executions
689 	 * (i.e. it produces custom plans), after that the cost of a generic plan
690 	 * is compared with the average custom plan cost.  We support otherwise
691 	 * unsupported prepared statement parameters by assigning an exorbitant
692 	 * cost to the unsupported query.  That'll lead to the custom plan being
693 	 * chosen.  But for that to be possible we can't error out here, as
694 	 * otherwise that logic is never reached.
695 	 */
696 	if (distributedPlan->planningError && !hasUnresolvedParams)
697 	{
698 		RaiseDeferredError(distributedPlan->planningError, ERROR);
699 	}
700 
701 	/* remember the plan's identifier for identifying subplans */
702 	distributedPlan->planId = planId;
703 
704 	/* create final plan by combining local plan with distributed plan */
705 	resultPlan = FinalizePlan(planContext->plan, distributedPlan);
706 
707 	/*
708 	 * As explained above, force planning costs to be unrealistically high if
709 	 * query planning failed (possibly) due to prepared statement parameters or
710 	 * if it is planned as a multi shard modify query.
711 	 */
712 	if ((distributedPlan->planningError ||
713 		 (IsUpdateOrDelete(planContext->originalQuery) && IsMultiTaskPlan(
714 			  distributedPlan))) &&
715 		hasUnresolvedParams)
716 	{
717 		DissuadePlannerFromUsingPlan(resultPlan);
718 	}
719 
720 	return resultPlan;
721 }
722 
723 
724 /*
725  * InlineCtesAndCreateDistributedPlannedStmt gets all the parameters required
726  * for creating a distributed planned statement. The function is primarily a
727  * wrapper on top of CreateDistributedPlannedStmt(), by first inlining the
728  * CTEs and calling CreateDistributedPlannedStmt() in PG_TRY() block. The
729  * function returns NULL if the planning fails on the query where eligable
730  * CTEs are inlined.
731  */
732 static PlannedStmt *
InlineCtesAndCreateDistributedPlannedStmt(uint64 planId,DistributedPlanningContext * planContext)733 InlineCtesAndCreateDistributedPlannedStmt(uint64 planId,
734 										  DistributedPlanningContext *planContext)
735 {
736 	if (!EnableCTEInlining)
737 	{
738 		/*
739 		 * In Postgres 12+, users can adjust whether to inline/not inline CTEs
740 		 * by [NOT] MATERIALIZED keywords. However, in PG 11, that's not possible.
741 		 * So, with this we provide a way to prevent CTE inlining on Postgres 11.
742 		 *
743 		 * The main use-case for this is not to have divergent test outputs between
744 		 * PG 11 vs PG 12, so not very much intended for users.
745 		 */
746 		return NULL;
747 	}
748 
749 	/*
750 	 * We'll inline the CTEs and try distributed planning, preserve the original
751 	 * query in case the planning fails and we fallback to recursive planning of
752 	 * CTEs.
753 	 */
754 	Query *copyOfOriginalQuery = copyObject(planContext->originalQuery);
755 
756 	RecursivelyInlineCtesInQueryTree(copyOfOriginalQuery);
757 
758 	/* after inlining, we shouldn't have any inlinable CTEs */
759 	Assert(!QueryTreeContainsInlinableCTE(copyOfOriginalQuery));
760 
761 	/* simply recurse into CreateDistributedPlannedStmt() in a PG_TRY() block */
762 	PlannedStmt *result = TryCreateDistributedPlannedStmt(planContext->plan,
763 														  copyOfOriginalQuery,
764 														  planContext->query,
765 														  planContext->boundParams,
766 														  planContext->
767 														  plannerRestrictionContext);
768 
769 	return result;
770 }
771 
772 
773 /*
774  * TryCreateDistributedPlannedStmt is a wrapper around CreateDistributedPlannedStmt, simply
775  * calling it in PG_TRY()/PG_CATCH() block. The function returns a PlannedStmt if the input
776  * query can be planned by Citus. If not, the function returns NULL and generates a DEBUG4
777  * message with the reason for the failure.
778  */
779 static PlannedStmt *
TryCreateDistributedPlannedStmt(PlannedStmt * localPlan,Query * originalQuery,Query * query,ParamListInfo boundParams,PlannerRestrictionContext * plannerRestrictionContext)780 TryCreateDistributedPlannedStmt(PlannedStmt *localPlan,
781 								Query *originalQuery,
782 								Query *query, ParamListInfo boundParams,
783 								PlannerRestrictionContext *plannerRestrictionContext)
784 {
785 	MemoryContext savedContext = CurrentMemoryContext;
786 	PlannedStmt *result = NULL;
787 
788 	DistributedPlanningContext *planContext = palloc0(sizeof(DistributedPlanningContext));
789 
790 	planContext->plan = localPlan;
791 	planContext->boundParams = boundParams;
792 	planContext->originalQuery = originalQuery;
793 	planContext->query = query;
794 	planContext->plannerRestrictionContext = plannerRestrictionContext;
795 
796 
797 	PG_TRY();
798 	{
799 		result = CreateDistributedPlannedStmt(planContext);
800 	}
801 	PG_CATCH();
802 	{
803 		MemoryContextSwitchTo(savedContext);
804 		ErrorData *edata = CopyErrorData();
805 		FlushErrorState();
806 
807 		/* don't try to intercept PANIC or FATAL, let those breeze past us */
808 		if (edata->elevel != ERROR)
809 		{
810 			PG_RE_THROW();
811 		}
812 
813 		ereport(DEBUG4, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
814 						 errmsg("Planning after CTEs inlined failed with "
815 								"\nmessage: %s\ndetail: %s\nhint: %s",
816 								edata->message ? edata->message : "",
817 								edata->detail ? edata->detail : "",
818 								edata->hint ? edata->hint : "")));
819 
820 		/* leave the error handling system */
821 		FreeErrorData(edata);
822 
823 		result = NULL;
824 	}
825 	PG_END_TRY();
826 
827 	return result;
828 }
829 
830 
831 /*
832  * CreateDistributedPlan generates a distributed plan for a query.
833  * It goes through 3 steps:
834  *
835  * 1. Try router planner
836  * 2. Generate subplans for CTEs and complex subqueries
837  *    - If any, go back to step 1 by calling itself recursively
838  * 3. Logical planner
839  */
840 DistributedPlan *
CreateDistributedPlan(uint64 planId,Query * originalQuery,Query * query,ParamListInfo boundParams,bool hasUnresolvedParams,PlannerRestrictionContext * plannerRestrictionContext)841 CreateDistributedPlan(uint64 planId, Query *originalQuery, Query *query, ParamListInfo
842 					  boundParams, bool hasUnresolvedParams,
843 					  PlannerRestrictionContext *plannerRestrictionContext)
844 {
845 	DistributedPlan *distributedPlan = NULL;
846 	bool hasCtes = originalQuery->cteList != NIL;
847 
848 	if (IsModifyCommand(originalQuery))
849 	{
850 		EnsureModificationsCanRun();
851 
852 		Oid targetRelationId = ModifyQueryResultRelationId(query);
853 		EnsurePartitionTableNotReplicated(targetRelationId);
854 
855 		if (InsertSelectIntoCitusTable(originalQuery))
856 		{
857 			if (hasUnresolvedParams)
858 			{
859 				/*
860 				 * Unresolved parameters can cause performance regressions in
861 				 * INSERT...SELECT when the partition column is a parameter
862 				 * because we don't perform any additional pruning in the executor.
863 				 */
864 				return NULL;
865 			}
866 
867 			distributedPlan =
868 				CreateInsertSelectPlan(planId, originalQuery, plannerRestrictionContext,
869 									   boundParams);
870 		}
871 		else if (InsertSelectIntoLocalTable(originalQuery))
872 		{
873 			if (hasUnresolvedParams)
874 			{
875 				/*
876 				 * Unresolved parameters can cause performance regressions in
877 				 * INSERT...SELECT when the partition column is a parameter
878 				 * because we don't perform any additional pruning in the executor.
879 				 */
880 				return NULL;
881 			}
882 			distributedPlan =
883 				CreateInsertSelectIntoLocalTablePlan(planId, originalQuery, boundParams,
884 													 hasUnresolvedParams,
885 													 plannerRestrictionContext);
886 		}
887 		else
888 		{
889 			/* modifications are always routed through the same planner/executor */
890 			distributedPlan =
891 				CreateModifyPlan(originalQuery, query, plannerRestrictionContext);
892 		}
893 
894 		/* the functions above always return a plan, possibly with an error */
895 		Assert(distributedPlan);
896 
897 		if (distributedPlan->planningError == NULL)
898 		{
899 			return distributedPlan;
900 		}
901 		else
902 		{
903 			RaiseDeferredError(distributedPlan->planningError, DEBUG2);
904 		}
905 	}
906 	else
907 	{
908 		/*
909 		 * For select queries we, if router executor is enabled, first try to
910 		 * plan the query as a router query. If not supported, otherwise try
911 		 * the full blown plan/optimize/physical planning process needed to
912 		 * produce distributed query plans.
913 		 */
914 
915 		distributedPlan = CreateRouterPlan(originalQuery, query,
916 										   plannerRestrictionContext);
917 		if (distributedPlan->planningError == NULL)
918 		{
919 			return distributedPlan;
920 		}
921 		else
922 		{
923 			/*
924 			 * For debugging it's useful to display why query was not
925 			 * router plannable.
926 			 */
927 			RaiseDeferredError(distributedPlan->planningError, DEBUG2);
928 		}
929 	}
930 
931 	if (hasUnresolvedParams)
932 	{
933 		/*
934 		 * There are parameters that don't have a value in boundParams.
935 		 *
936 		 * The remainder of the planning logic cannot handle unbound
937 		 * parameters. We return a NULL plan, which will have an
938 		 * extremely high cost, such that postgres will replan with
939 		 * bound parameters.
940 		 */
941 		return NULL;
942 	}
943 
944 	/* force evaluation of bound params */
945 	boundParams = copyParamList(boundParams);
946 
947 	/*
948 	 * If there are parameters that do have a value in boundParams, replace
949 	 * them in the original query. This allows us to more easily cut the
950 	 * query into pieces (during recursive planning) or deparse parts of
951 	 * the query (during subquery pushdown planning).
952 	 */
953 	originalQuery = (Query *) ResolveExternalParams((Node *) originalQuery,
954 													boundParams);
955 	Assert(originalQuery != NULL);
956 
957 	/*
958 	 * Plan subqueries and CTEs that cannot be pushed down by recursively
959 	 * calling the planner and return the resulting plans to subPlanList.
960 	 */
961 	List *subPlanList = GenerateSubplansForSubqueriesAndCTEs(planId, originalQuery,
962 															 plannerRestrictionContext);
963 
964 	/*
965 	 * If subqueries were recursively planned then we need to replan the query
966 	 * to get the new planner restriction context and apply planner transformations.
967 	 *
968 	 * We could simplify this code if the logical planner was capable of dealing
969 	 * with an original query. In that case, we would only have to filter the
970 	 * planner restriction context.
971 	 *
972 	 * Note that we check both for subplans and whether the query had CTEs
973 	 * prior to calling GenerateSubplansForSubqueriesAndCTEs. If none of
974 	 * the CTEs are referenced then there are no subplans, but we still want
975 	 * to retry the router planner.
976 	 */
977 	if (list_length(subPlanList) > 0 || hasCtes)
978 	{
979 		Query *newQuery = copyObject(originalQuery);
980 		bool setPartitionedTablesInherited = false;
981 		PlannerRestrictionContext *currentPlannerRestrictionContext =
982 			CurrentPlannerRestrictionContext();
983 
984 		/* reset the current planner restrictions context */
985 		ResetPlannerRestrictionContext(currentPlannerRestrictionContext);
986 
987 		/*
988 		 * We force standard_planner to treat partitioned tables as regular tables
989 		 * by clearing the inh flag on RTEs. We already did this at the start of
990 		 * distributed_planner, but on a copy of the original query, so we need
991 		 * to do it again here.
992 		 */
993 		AdjustPartitioningForDistributedPlanning(ExtractRangeTableEntryList(newQuery),
994 												 setPartitionedTablesInherited);
995 
996 		/*
997 		 * Some relations may have been removed from the query, but we can skip
998 		 * AssignRTEIdentities since we currently do not rely on RTE identities
999 		 * being contiguous.
1000 		 */
1001 
1002 		standard_planner_compat(newQuery, 0, boundParams);
1003 
1004 		/* overwrite the old transformed query with the new transformed query */
1005 		*query = *newQuery;
1006 
1007 		/* recurse into CreateDistributedPlan with subqueries/CTEs replaced */
1008 		distributedPlan = CreateDistributedPlan(planId, originalQuery, query, NULL, false,
1009 												plannerRestrictionContext);
1010 
1011 		/* distributedPlan cannot be null since hasUnresolvedParams argument was false */
1012 		Assert(distributedPlan != NULL);
1013 		distributedPlan->subPlanList = subPlanList;
1014 
1015 		return distributedPlan;
1016 	}
1017 
1018 	/*
1019 	 * DML command returns a planning error, even after recursive planning. The
1020 	 * logical planner cannot handle DML commands so return the plan with the
1021 	 * error.
1022 	 */
1023 	if (IsModifyCommand(originalQuery))
1024 	{
1025 		return distributedPlan;
1026 	}
1027 
1028 	/*
1029 	 * CTEs are stripped from the original query by RecursivelyPlanSubqueriesAndCTEs.
1030 	 * If we get here and there are still CTEs that means that none of the CTEs are
1031 	 * referenced. We therefore also strip the CTEs from the rewritten query.
1032 	 */
1033 	query->cteList = NIL;
1034 	Assert(originalQuery->cteList == NIL);
1035 
1036 	MultiTreeRoot *logicalPlan = MultiLogicalPlanCreate(originalQuery, query,
1037 														plannerRestrictionContext);
1038 	MultiLogicalPlanOptimize(logicalPlan);
1039 
1040 	/*
1041 	 * This check is here to make it likely that all node types used in
1042 	 * Citus are dumpable. Explain can dump logical and physical plans
1043 	 * using the extended outfuncs infrastructure, but it's infeasible to
1044 	 * test most plans. MultiQueryContainerNode always serializes the
1045 	 * physical plan, so there's no need to check that separately
1046 	 */
1047 	CheckNodeIsDumpable((Node *) logicalPlan);
1048 
1049 	/* Create the physical plan */
1050 	distributedPlan = CreatePhysicalDistributedPlan(logicalPlan,
1051 													plannerRestrictionContext);
1052 
1053 	/* distributed plan currently should always succeed or error out */
1054 	Assert(distributedPlan && distributedPlan->planningError == NULL);
1055 
1056 	return distributedPlan;
1057 }
1058 
1059 
1060 /*
1061  * EnsurePartitionTableNotReplicated errors out if the infput relation is
1062  * a partition table and the table has a replication factor greater than
1063  * one.
1064  *
1065  * If the table is not a partition or replication factor is 1, the function
1066  * becomes a no-op.
1067  */
1068 void
EnsurePartitionTableNotReplicated(Oid relationId)1069 EnsurePartitionTableNotReplicated(Oid relationId)
1070 {
1071 	DeferredErrorMessage *deferredError =
1072 		DeferErrorIfPartitionTableNotSingleReplicated(relationId);
1073 	if (deferredError != NULL)
1074 	{
1075 		RaiseDeferredError(deferredError, ERROR);
1076 	}
1077 }
1078 
1079 
1080 /*
1081  * DeferErrorIfPartitionTableNotSingleReplicated defers error if the input relation
1082  * is a partition table with replication factor > 1. Otherwise, the function returns
1083  * NULL.
1084  */
1085 static DeferredErrorMessage *
DeferErrorIfPartitionTableNotSingleReplicated(Oid relationId)1086 DeferErrorIfPartitionTableNotSingleReplicated(Oid relationId)
1087 {
1088 	if (PartitionTableNoLock(relationId) && !SingleReplicatedTable(relationId))
1089 	{
1090 		Oid parentOid = PartitionParentOid(relationId);
1091 		char *parentRelationTest = get_rel_name(parentOid);
1092 		StringInfo errorHint = makeStringInfo();
1093 
1094 		appendStringInfo(errorHint, "Run the query on the parent table "
1095 									"\"%s\" instead.", parentRelationTest);
1096 
1097 		return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
1098 							 "modifications on partitions when replication "
1099 							 "factor is greater than 1 is not supported",
1100 							 NULL, errorHint->data);
1101 	}
1102 
1103 	return NULL;
1104 }
1105 
1106 
1107 /*
1108  * ResolveExternalParams replaces the external parameters that appears
1109  * in the query with the corresponding entries in the boundParams.
1110  *
1111  * Note that this function is inspired by eval_const_expr() on Postgres.
1112  * We cannot use that function because it requires access to PlannerInfo.
1113  */
1114 Node *
ResolveExternalParams(Node * inputNode,ParamListInfo boundParams)1115 ResolveExternalParams(Node *inputNode, ParamListInfo boundParams)
1116 {
1117 	/* consider resolving external parameters only when boundParams exists */
1118 	if (!boundParams)
1119 	{
1120 		return inputNode;
1121 	}
1122 
1123 	if (inputNode == NULL)
1124 	{
1125 		return NULL;
1126 	}
1127 
1128 	if (IsA(inputNode, Param))
1129 	{
1130 		Param *paramToProcess = (Param *) inputNode;
1131 		int numberOfParameters = boundParams->numParams;
1132 		int parameterId = paramToProcess->paramid;
1133 		int16 typeLength = 0;
1134 		bool typeByValue = false;
1135 		Datum constValue = 0;
1136 
1137 		if (paramToProcess->paramkind != PARAM_EXTERN)
1138 		{
1139 			return inputNode;
1140 		}
1141 
1142 		if (parameterId < 0)
1143 		{
1144 			return inputNode;
1145 		}
1146 
1147 		/* parameterId starts from 1 */
1148 		int parameterIndex = parameterId - 1;
1149 		if (parameterIndex >= numberOfParameters)
1150 		{
1151 			return inputNode;
1152 		}
1153 
1154 		ParamExternData *correspondingParameterData =
1155 			&boundParams->params[parameterIndex];
1156 
1157 		if (!(correspondingParameterData->pflags & PARAM_FLAG_CONST))
1158 		{
1159 			return inputNode;
1160 		}
1161 
1162 		get_typlenbyval(paramToProcess->paramtype, &typeLength, &typeByValue);
1163 
1164 		bool paramIsNull = correspondingParameterData->isnull;
1165 		if (paramIsNull)
1166 		{
1167 			constValue = 0;
1168 		}
1169 		else if (typeByValue)
1170 		{
1171 			constValue = correspondingParameterData->value;
1172 		}
1173 		else
1174 		{
1175 			/*
1176 			 * Out of paranoia ensure that datum lives long enough,
1177 			 * although bind params currently should always live
1178 			 * long enough.
1179 			 */
1180 			constValue = datumCopy(correspondingParameterData->value, typeByValue,
1181 								   typeLength);
1182 		}
1183 
1184 		return (Node *) makeConst(paramToProcess->paramtype, paramToProcess->paramtypmod,
1185 								  paramToProcess->paramcollid, typeLength, constValue,
1186 								  paramIsNull, typeByValue);
1187 	}
1188 	else if (IsA(inputNode, Query))
1189 	{
1190 		return (Node *) query_tree_mutator((Query *) inputNode, ResolveExternalParams,
1191 										   boundParams, 0);
1192 	}
1193 
1194 	return expression_tree_mutator(inputNode, ResolveExternalParams, boundParams);
1195 }
1196 
1197 
1198 /*
1199  * GetDistributedPlan returns the associated DistributedPlan for a CustomScan.
1200  *
1201  * Callers should only read from the returned data structure, since it may be
1202  * the plan of a prepared statement and may therefore be reused.
1203  */
1204 DistributedPlan *
GetDistributedPlan(CustomScan * customScan)1205 GetDistributedPlan(CustomScan *customScan)
1206 {
1207 	Assert(list_length(customScan->custom_private) == 1);
1208 
1209 	Node *node = (Node *) linitial(customScan->custom_private);
1210 	Assert(CitusIsA(node, DistributedPlan));
1211 
1212 	CheckNodeCopyAndSerialization(node);
1213 
1214 	DistributedPlan *distributedPlan = (DistributedPlan *) node;
1215 
1216 	return distributedPlan;
1217 }
1218 
1219 
1220 /*
1221  * FinalizePlan combines local plan with distributed plan and creates a plan
1222  * which can be run by the PostgreSQL executor.
1223  */
1224 PlannedStmt *
FinalizePlan(PlannedStmt * localPlan,DistributedPlan * distributedPlan)1225 FinalizePlan(PlannedStmt *localPlan, DistributedPlan *distributedPlan)
1226 {
1227 	PlannedStmt *finalPlan = NULL;
1228 	CustomScan *customScan = makeNode(CustomScan);
1229 	MultiExecutorType executorType = MULTI_EXECUTOR_INVALID_FIRST;
1230 
1231 	/* this field is used in JobExecutorType */
1232 	distributedPlan->relationIdList = localPlan->relationOids;
1233 
1234 	if (!distributedPlan->planningError)
1235 	{
1236 		executorType = JobExecutorType(distributedPlan);
1237 	}
1238 
1239 	switch (executorType)
1240 	{
1241 		case MULTI_EXECUTOR_ADAPTIVE:
1242 		{
1243 			customScan->methods = &AdaptiveExecutorCustomScanMethods;
1244 			break;
1245 		}
1246 
1247 		case MULTI_EXECUTOR_NON_PUSHABLE_INSERT_SELECT:
1248 		{
1249 			customScan->methods = &NonPushableInsertSelectCustomScanMethods;
1250 			break;
1251 		}
1252 
1253 		default:
1254 		{
1255 			customScan->methods = &DelayedErrorCustomScanMethods;
1256 			break;
1257 		}
1258 	}
1259 
1260 	if (IsMultiTaskPlan(distributedPlan))
1261 	{
1262 		/* if it is not a single task executable plan, inform user according to the log level */
1263 		if (MultiTaskQueryLogLevel != CITUS_LOG_LEVEL_OFF)
1264 		{
1265 			ereport(MultiTaskQueryLogLevel, (errmsg(
1266 												 "multi-task query about to be executed"),
1267 											 errhint(
1268 												 "Queries are split to multiple tasks "
1269 												 "if they have to be split into several"
1270 												 " queries on the workers.")));
1271 		}
1272 	}
1273 
1274 	distributedPlan->queryId = localPlan->queryId;
1275 
1276 	Node *distributedPlanData = (Node *) distributedPlan;
1277 
1278 	customScan->custom_private = list_make1(distributedPlanData);
1279 	customScan->flags = CUSTOMPATH_SUPPORT_BACKWARD_SCAN;
1280 
1281 	/*
1282 	 * Fast path queries cannot have any subplans by definition, so skip
1283 	 * expensive traversals.
1284 	 */
1285 	if (!distributedPlan->fastPathRouterPlan)
1286 	{
1287 		/*
1288 		 * Record subplans used by distributed plan to make intermediate result
1289 		 * pruning easier.
1290 		 *
1291 		 * We do this before finalizing the plan, because the combineQuery is
1292 		 * rewritten by standard_planner in FinalizeNonRouterPlan.
1293 		 */
1294 		distributedPlan->usedSubPlanNodeList = FindSubPlanUsages(distributedPlan);
1295 	}
1296 
1297 	if (distributedPlan->combineQuery)
1298 	{
1299 		finalPlan = FinalizeNonRouterPlan(localPlan, distributedPlan, customScan);
1300 	}
1301 	else
1302 	{
1303 		finalPlan = FinalizeRouterPlan(localPlan, customScan);
1304 	}
1305 
1306 	return finalPlan;
1307 }
1308 
1309 
1310 /*
1311  * FinalizeNonRouterPlan gets the distributed custom scan plan, and creates the
1312  * final master select plan on the top of this distributed plan for adaptive executor.
1313  */
1314 static PlannedStmt *
FinalizeNonRouterPlan(PlannedStmt * localPlan,DistributedPlan * distributedPlan,CustomScan * customScan)1315 FinalizeNonRouterPlan(PlannedStmt *localPlan, DistributedPlan *distributedPlan,
1316 					  CustomScan *customScan)
1317 {
1318 	PlannedStmt *finalPlan = PlanCombineQuery(distributedPlan, customScan);
1319 	finalPlan->queryId = localPlan->queryId;
1320 	finalPlan->utilityStmt = localPlan->utilityStmt;
1321 
1322 	/* add original range table list for access permission checks */
1323 	finalPlan->rtable = list_concat(finalPlan->rtable, localPlan->rtable);
1324 
1325 	return finalPlan;
1326 }
1327 
1328 
1329 /*
1330  * FinalizeRouterPlan gets a CustomScan node which already wrapped distributed
1331  * part of a router plan and sets it as the direct child of the router plan
1332  * because we don't run any query on master node for router executable queries.
1333  * Here, we also rebuild the column list to read from the remote scan.
1334  */
1335 static PlannedStmt *
FinalizeRouterPlan(PlannedStmt * localPlan,CustomScan * customScan)1336 FinalizeRouterPlan(PlannedStmt *localPlan, CustomScan *customScan)
1337 {
1338 	List *columnNameList = NIL;
1339 
1340 	customScan->custom_scan_tlist =
1341 		makeCustomScanTargetlistFromExistingTargetList(localPlan->planTree->targetlist);
1342 	customScan->scan.plan.targetlist =
1343 		makeTargetListFromCustomScanList(customScan->custom_scan_tlist);
1344 
1345 	/* extract the column names from the final targetlist*/
1346 	TargetEntry *targetEntry = NULL;
1347 	foreach_ptr(targetEntry, customScan->scan.plan.targetlist)
1348 	{
1349 		Value *columnName = makeString(targetEntry->resname);
1350 		columnNameList = lappend(columnNameList, columnName);
1351 	}
1352 
1353 	PlannedStmt *routerPlan = makeNode(PlannedStmt);
1354 	routerPlan->planTree = (Plan *) customScan;
1355 
1356 	RangeTblEntry *remoteScanRangeTableEntry = RemoteScanRangeTableEntry(columnNameList);
1357 	routerPlan->rtable = list_make1(remoteScanRangeTableEntry);
1358 
1359 	/* add original range table list for access permission checks */
1360 	routerPlan->rtable = list_concat(routerPlan->rtable, localPlan->rtable);
1361 
1362 	routerPlan->canSetTag = true;
1363 	routerPlan->relationOids = NIL;
1364 
1365 	routerPlan->queryId = localPlan->queryId;
1366 	routerPlan->utilityStmt = localPlan->utilityStmt;
1367 	routerPlan->commandType = localPlan->commandType;
1368 	routerPlan->hasReturning = localPlan->hasReturning;
1369 
1370 	return routerPlan;
1371 }
1372 
1373 
1374 /*
1375  * makeCustomScanTargetlistFromExistingTargetList rebuilds the targetlist from the remote
1376  * query into a list that can be used as the custom_scan_tlist for our Citus Custom Scan.
1377  */
1378 static List *
makeCustomScanTargetlistFromExistingTargetList(List * existingTargetlist)1379 makeCustomScanTargetlistFromExistingTargetList(List *existingTargetlist)
1380 {
1381 	List *custom_scan_tlist = NIL;
1382 
1383 	/* we will have custom scan range table entry as the first one in the list */
1384 	const int customScanRangeTableIndex = 1;
1385 
1386 	/* build a targetlist to read from the custom scan output */
1387 	TargetEntry *targetEntry = NULL;
1388 	foreach_ptr(targetEntry, existingTargetlist)
1389 	{
1390 		Assert(IsA(targetEntry, TargetEntry));
1391 
1392 		/*
1393 		 * This is unlikely to be hit because we would not need resjunk stuff
1394 		 * at the toplevel of a router query - all things needing it have been
1395 		 * pushed down.
1396 		 */
1397 		if (targetEntry->resjunk)
1398 		{
1399 			continue;
1400 		}
1401 
1402 		/* build target entry pointing to remote scan range table entry */
1403 		Var *newVar = makeVarFromTargetEntry(customScanRangeTableIndex, targetEntry);
1404 
1405 		if (newVar->vartype == RECORDOID || newVar->vartype == RECORDARRAYOID)
1406 		{
1407 			/*
1408 			 * Add the anonymous composite type to the type cache and store
1409 			 * the key in vartypmod. Eventually this makes its way into the
1410 			 * TupleDesc used by the executor, which uses it to parse the
1411 			 * query results from the workers in BuildTupleFromCStrings.
1412 			 */
1413 			newVar->vartypmod = BlessRecordExpression(targetEntry->expr);
1414 		}
1415 
1416 		TargetEntry *newTargetEntry = flatCopyTargetEntry(targetEntry);
1417 		newTargetEntry->expr = (Expr *) newVar;
1418 		custom_scan_tlist = lappend(custom_scan_tlist, newTargetEntry);
1419 	}
1420 
1421 	return custom_scan_tlist;
1422 }
1423 
1424 
1425 /*
1426  * makeTargetListFromCustomScanList based on a custom_scan_tlist create the target list to
1427  * use on the Citus Custom Scan Node. The targetlist differs from the custom_scan_tlist in
1428  * a way that the expressions in the targetlist all are references to the index (resno) in
1429  * the custom_scan_tlist in their varattno while the varno is replaced with INDEX_VAR
1430  * instead of the range table entry index.
1431  */
1432 static List *
makeTargetListFromCustomScanList(List * custom_scan_tlist)1433 makeTargetListFromCustomScanList(List *custom_scan_tlist)
1434 {
1435 	List *targetList = NIL;
1436 	TargetEntry *targetEntry = NULL;
1437 	int resno = 1;
1438 	foreach_ptr(targetEntry, custom_scan_tlist)
1439 	{
1440 		/*
1441 		 * INDEX_VAR is used to reference back to the TargetEntry in custom_scan_tlist by
1442 		 * its resno (index)
1443 		 */
1444 		Var *newVar = makeVarFromTargetEntry(INDEX_VAR, targetEntry);
1445 		TargetEntry *newTargetEntry = makeTargetEntry((Expr *) newVar, resno,
1446 													  targetEntry->resname,
1447 													  targetEntry->resjunk);
1448 		targetList = lappend(targetList, newTargetEntry);
1449 		resno++;
1450 	}
1451 	return targetList;
1452 }
1453 
1454 
1455 /*
1456  * BlessRecordExpression ensures we can parse an anonymous composite type on the
1457  * target list of a query that is sent to the worker.
1458  *
1459  * We cannot normally parse record types coming from the workers unless we
1460  * "bless" the tuple descriptor, which adds a transient type to the type cache
1461  * and assigns it a type mod value, which is the key in the type cache.
1462  */
1463 int32
BlessRecordExpression(Expr * expr)1464 BlessRecordExpression(Expr *expr)
1465 {
1466 	int32 typeMod = -1;
1467 
1468 	if (IsA(expr, FuncExpr) || IsA(expr, OpExpr))
1469 	{
1470 		/*
1471 		 * Handle functions that return records on the target
1472 		 * list, e.g. SELECT function_call(1,2);
1473 		 */
1474 		Oid resultTypeId = InvalidOid;
1475 		TupleDesc resultTupleDesc = NULL;
1476 
1477 		/* get_expr_result_type blesses the tuple descriptor */
1478 		TypeFuncClass typeClass = get_expr_result_type((Node *) expr, &resultTypeId,
1479 													   &resultTupleDesc);
1480 
1481 		if (typeClass == TYPEFUNC_COMPOSITE)
1482 		{
1483 			typeMod = resultTupleDesc->tdtypmod;
1484 		}
1485 	}
1486 	else if (IsA(expr, RowExpr))
1487 	{
1488 		/*
1489 		 * Handle row expressions, e.g. SELECT (1,2);
1490 		 */
1491 		RowExpr *rowExpr = (RowExpr *) expr;
1492 		ListCell *argCell = NULL;
1493 		int currentResno = 1;
1494 
1495 		TupleDesc rowTupleDesc = CreateTemplateTupleDesc(list_length(rowExpr->args));
1496 
1497 		foreach(argCell, rowExpr->args)
1498 		{
1499 			Node *rowArg = (Node *) lfirst(argCell);
1500 			Oid rowArgTypeId = exprType(rowArg);
1501 			int rowArgTypeMod = exprTypmod(rowArg);
1502 
1503 			if (rowArgTypeId == RECORDOID || rowArgTypeId == RECORDARRAYOID)
1504 			{
1505 				/* ensure nested rows are blessed as well */
1506 				rowArgTypeMod = BlessRecordExpression((Expr *) rowArg);
1507 			}
1508 
1509 			TupleDescInitEntry(rowTupleDesc, currentResno, NULL,
1510 							   rowArgTypeId, rowArgTypeMod, 0);
1511 			TupleDescInitEntryCollation(rowTupleDesc, currentResno,
1512 										exprCollation(rowArg));
1513 
1514 			currentResno++;
1515 		}
1516 
1517 		BlessTupleDesc(rowTupleDesc);
1518 
1519 		typeMod = rowTupleDesc->tdtypmod;
1520 	}
1521 	else if (IsA(expr, ArrayExpr))
1522 	{
1523 		/*
1524 		 * Handle row array expressions, e.g. SELECT ARRAY[(1,2)];
1525 		 * Postgres allows ARRAY[(1,2),(1,2,3)]. We do not.
1526 		 */
1527 		ArrayExpr *arrayExpr = (ArrayExpr *) expr;
1528 
1529 		typeMod = BlessRecordExpressionList(arrayExpr->elements);
1530 	}
1531 	else if (IsA(expr, NullIfExpr))
1532 	{
1533 		NullIfExpr *nullIfExpr = (NullIfExpr *) expr;
1534 
1535 		typeMod = BlessRecordExpressionList(nullIfExpr->args);
1536 	}
1537 	else if (IsA(expr, MinMaxExpr))
1538 	{
1539 		MinMaxExpr *minMaxExpr = (MinMaxExpr *) expr;
1540 
1541 		typeMod = BlessRecordExpressionList(minMaxExpr->args);
1542 	}
1543 	else if (IsA(expr, CoalesceExpr))
1544 	{
1545 		CoalesceExpr *coalesceExpr = (CoalesceExpr *) expr;
1546 
1547 		typeMod = BlessRecordExpressionList(coalesceExpr->args);
1548 	}
1549 	else if (IsA(expr, CaseExpr))
1550 	{
1551 		CaseExpr *caseExpr = (CaseExpr *) expr;
1552 		List *results = NIL;
1553 		ListCell *whenCell = NULL;
1554 
1555 		foreach(whenCell, caseExpr->args)
1556 		{
1557 			CaseWhen *whenArg = (CaseWhen *) lfirst(whenCell);
1558 
1559 			results = lappend(results, whenArg->result);
1560 		}
1561 
1562 		if (caseExpr->defresult != NULL)
1563 		{
1564 			results = lappend(results, caseExpr->defresult);
1565 		}
1566 
1567 		typeMod = BlessRecordExpressionList(results);
1568 	}
1569 
1570 	return typeMod;
1571 }
1572 
1573 
1574 /*
1575  * BlessRecordExpressionList maps BlessRecordExpression over a list.
1576  * Returns typmod of all expressions, or -1 if they are not all the same.
1577  * Ignores expressions with a typmod of -1.
1578  */
1579 static int32
BlessRecordExpressionList(List * exprs)1580 BlessRecordExpressionList(List *exprs)
1581 {
1582 	int32 finalTypeMod = -1;
1583 	ListCell *exprCell = NULL;
1584 	foreach(exprCell, exprs)
1585 	{
1586 		Node *exprArg = (Node *) lfirst(exprCell);
1587 		int32 exprTypeMod = BlessRecordExpression((Expr *) exprArg);
1588 
1589 		if (exprTypeMod == -1)
1590 		{
1591 			continue;
1592 		}
1593 		else if (finalTypeMod == -1)
1594 		{
1595 			finalTypeMod = exprTypeMod;
1596 		}
1597 		else if (finalTypeMod != exprTypeMod)
1598 		{
1599 			return -1;
1600 		}
1601 	}
1602 	return finalTypeMod;
1603 }
1604 
1605 
1606 /*
1607  * RemoteScanRangeTableEntry creates a range table entry from given column name
1608  * list to represent a remote scan.
1609  */
1610 RangeTblEntry *
RemoteScanRangeTableEntry(List * columnNameList)1611 RemoteScanRangeTableEntry(List *columnNameList)
1612 {
1613 	RangeTblEntry *remoteScanRangeTableEntry = makeNode(RangeTblEntry);
1614 
1615 	/* we use RTE_VALUES for custom scan because we can't look up relation */
1616 	remoteScanRangeTableEntry->rtekind = RTE_VALUES;
1617 	remoteScanRangeTableEntry->eref = makeAlias("remote_scan", columnNameList);
1618 	remoteScanRangeTableEntry->inh = false;
1619 	remoteScanRangeTableEntry->inFromCl = true;
1620 
1621 	return remoteScanRangeTableEntry;
1622 }
1623 
1624 
1625 /*
1626  * CheckNodeIsDumpable checks that the passed node can be dumped using
1627  * nodeToString(). As this checks is expensive, it's only active when
1628  * assertions are enabled.
1629  */
1630 static void
CheckNodeIsDumpable(Node * node)1631 CheckNodeIsDumpable(Node *node)
1632 {
1633 #ifdef USE_ASSERT_CHECKING
1634 	char *out = nodeToString(node);
1635 	pfree(out);
1636 #endif
1637 }
1638 
1639 
1640 /*
1641  * CheckNodeCopyAndSerialization checks copy/dump/read functions
1642  * for nodes and returns copy of the input.
1643  *
1644  * It is only active when assertions are enabled, otherwise it returns
1645  * the input directly. We use this to confirm that our serialization
1646  * and copy logic produces the correct plan during regression tests.
1647  *
1648  * It does not check string equality on node dumps due to differences
1649  * in some Postgres types.
1650  */
1651 static Node *
CheckNodeCopyAndSerialization(Node * node)1652 CheckNodeCopyAndSerialization(Node *node)
1653 {
1654 #ifdef USE_ASSERT_CHECKING
1655 	char *out = nodeToString(node);
1656 	Node *nodeCopy = copyObject(node);
1657 	char *outCopy = nodeToString(nodeCopy);
1658 
1659 	pfree(out);
1660 	pfree(outCopy);
1661 
1662 	return nodeCopy;
1663 #else
1664 	return node;
1665 #endif
1666 }
1667 
1668 
1669 /*
1670  * multi_join_restriction_hook is a hook called by postgresql standard planner
1671  * to notify us about various planning information regarding joins. We use
1672  * it to learn about the joining column.
1673  */
1674 void
multi_join_restriction_hook(PlannerInfo * root,RelOptInfo * joinrel,RelOptInfo * outerrel,RelOptInfo * innerrel,JoinType jointype,JoinPathExtraData * extra)1675 multi_join_restriction_hook(PlannerInfo *root,
1676 							RelOptInfo *joinrel,
1677 							RelOptInfo *outerrel,
1678 							RelOptInfo *innerrel,
1679 							JoinType jointype,
1680 							JoinPathExtraData *extra)
1681 {
1682 	if (bms_is_empty(innerrel->relids) || bms_is_empty(outerrel->relids))
1683 	{
1684 		/*
1685 		 * We do not expect empty relids. Still, ignoring such JoinRestriction is
1686 		 * preferable for two reasons:
1687 		 * 1. This might be a query that doesn't rely on JoinRestrictions at all (e.g.,
1688 		 * local query).
1689 		 * 2. We cannot process them when they are empty (and likely to segfault if
1690 		 * we allow as-is).
1691 		 */
1692 		ereport(DEBUG1, (errmsg("Join restriction information is NULL")));
1693 	}
1694 
1695 	/*
1696 	 * Use a memory context that's guaranteed to live long enough, could be
1697 	 * called in a more shortly lived one (e.g. with GEQO).
1698 	 */
1699 	PlannerRestrictionContext *plannerRestrictionContext =
1700 		CurrentPlannerRestrictionContext();
1701 	MemoryContext restrictionsMemoryContext = plannerRestrictionContext->memoryContext;
1702 	MemoryContext oldMemoryContext = MemoryContextSwitchTo(restrictionsMemoryContext);
1703 
1704 	JoinRestrictionContext *joinRestrictionContext =
1705 		plannerRestrictionContext->joinRestrictionContext;
1706 	Assert(joinRestrictionContext != NULL);
1707 
1708 	JoinRestriction *joinRestriction = palloc0(sizeof(JoinRestriction));
1709 	joinRestriction->joinType = jointype;
1710 	joinRestriction->plannerInfo = root;
1711 
1712 	/*
1713 	 * We create a copy of restrictInfoList and relids because with geqo they may
1714 	 * be created in a memory context which will be deleted when we still need it,
1715 	 * thus we create a copy of it in our memory context.
1716 	 */
1717 	joinRestriction->joinRestrictInfoList = copyObject(extra->restrictlist);
1718 	joinRestriction->innerrelRelids = bms_copy(innerrel->relids);
1719 	joinRestriction->outerrelRelids = bms_copy(outerrel->relids);
1720 
1721 	joinRestrictionContext->joinRestrictionList =
1722 		lappend(joinRestrictionContext->joinRestrictionList, joinRestriction);
1723 
1724 	/*
1725 	 * Keep track if we received any semi joins here. If we didn't we can
1726 	 * later safely convert any semi joins in the rewritten query to inner
1727 	 * joins.
1728 	 */
1729 	joinRestrictionContext->hasSemiJoin = joinRestrictionContext->hasSemiJoin ||
1730 										  extra->sjinfo->jointype == JOIN_SEMI;
1731 
1732 	MemoryContextSwitchTo(oldMemoryContext);
1733 }
1734 
1735 
1736 /*
1737  * multi_relation_restriction_hook is a hook called by postgresql standard planner
1738  * to notify us about various planning information regarding a relation. We use
1739  * it to retrieve restrictions on relations.
1740  */
1741 void
multi_relation_restriction_hook(PlannerInfo * root,RelOptInfo * relOptInfo,Index restrictionIndex,RangeTblEntry * rte)1742 multi_relation_restriction_hook(PlannerInfo *root, RelOptInfo *relOptInfo,
1743 								Index restrictionIndex, RangeTblEntry *rte)
1744 {
1745 	CitusTableCacheEntry *cacheEntry = NULL;
1746 
1747 	if (ReplaceCitusExtraDataContainer && IsCitusExtraDataContainerRelation(rte))
1748 	{
1749 		/*
1750 		 * We got here by planning the query part that needs to be executed on the query
1751 		 * coordinator node.
1752 		 * We have verified the occurrence of the citus_extra_datacontainer function
1753 		 * encoding the remote scan we plan to execute here. We will replace all paths
1754 		 * with a path describing our custom scan.
1755 		 */
1756 		Path *path = CreateCitusCustomScanPath(root, relOptInfo, restrictionIndex, rte,
1757 											   ReplaceCitusExtraDataContainerWithCustomScan);
1758 
1759 		/* replace all paths with our custom scan and recalculate cheapest */
1760 		relOptInfo->pathlist = list_make1(path);
1761 		set_cheapest(relOptInfo);
1762 
1763 		return;
1764 	}
1765 
1766 	AdjustReadIntermediateResultCost(rte, relOptInfo);
1767 	AdjustReadIntermediateResultArrayCost(rte, relOptInfo);
1768 
1769 	if (rte->rtekind != RTE_RELATION)
1770 	{
1771 		return;
1772 	}
1773 
1774 	/*
1775 	 * Use a memory context that's guaranteed to live long enough, could be
1776 	 * called in a more shortly lived one (e.g. with GEQO).
1777 	 */
1778 	PlannerRestrictionContext *plannerRestrictionContext =
1779 		CurrentPlannerRestrictionContext();
1780 	MemoryContext restrictionsMemoryContext = plannerRestrictionContext->memoryContext;
1781 	MemoryContext oldMemoryContext = MemoryContextSwitchTo(restrictionsMemoryContext);
1782 
1783 	bool distributedTable = IsCitusTable(rte->relid);
1784 
1785 	RelationRestriction *relationRestriction = palloc0(sizeof(RelationRestriction));
1786 	relationRestriction->index = restrictionIndex;
1787 	relationRestriction->relationId = rte->relid;
1788 	relationRestriction->rte = rte;
1789 	relationRestriction->relOptInfo = relOptInfo;
1790 	relationRestriction->distributedRelation = distributedTable;
1791 	relationRestriction->plannerInfo = root;
1792 
1793 	/* see comments on GetVarFromAssignedParam() */
1794 	relationRestriction->outerPlanParamsList = OuterPlanParamsList(root);
1795 	relationRestriction->translatedVars = TranslatedVars(root,
1796 														 relationRestriction->index);
1797 
1798 	RelationRestrictionContext *relationRestrictionContext =
1799 		plannerRestrictionContext->relationRestrictionContext;
1800 
1801 	/*
1802 	 * We're also keeping track of whether all participant
1803 	 * tables are reference tables.
1804 	 */
1805 	if (distributedTable)
1806 	{
1807 		cacheEntry = GetCitusTableCacheEntry(rte->relid);
1808 
1809 		relationRestrictionContext->allReferenceTables &=
1810 			IsCitusTableTypeCacheEntry(cacheEntry, REFERENCE_TABLE);
1811 	}
1812 
1813 	relationRestrictionContext->relationRestrictionList =
1814 		lappend(relationRestrictionContext->relationRestrictionList, relationRestriction);
1815 
1816 	MemoryContextSwitchTo(oldMemoryContext);
1817 }
1818 
1819 
1820 /*
1821  * TranslatedVars deep copies the translated vars for the given relation index
1822  * if there is any append rel list.
1823  */
1824 static List *
TranslatedVars(PlannerInfo * root,int relationIndex)1825 TranslatedVars(PlannerInfo *root, int relationIndex)
1826 {
1827 	List *translatedVars = NIL;
1828 
1829 	if (root->append_rel_list != NIL)
1830 	{
1831 		AppendRelInfo *targetAppendRelInfo =
1832 			FindTargetAppendRelInfo(root, relationIndex);
1833 		if (targetAppendRelInfo != NULL)
1834 		{
1835 			/* postgres deletes translated_vars after pg13, hence we deep copy them here */
1836 			Node *targetNode = NULL;
1837 			foreach_ptr(targetNode, targetAppendRelInfo->translated_vars)
1838 			{
1839 				translatedVars =
1840 					lappend(translatedVars, copyObject(targetNode));
1841 			}
1842 		}
1843 	}
1844 	return translatedVars;
1845 }
1846 
1847 
1848 /*
1849  * FindTargetAppendRelInfo finds the target append rel info for the given
1850  * relation rte index.
1851  */
1852 static AppendRelInfo *
FindTargetAppendRelInfo(PlannerInfo * root,int relationRteIndex)1853 FindTargetAppendRelInfo(PlannerInfo *root, int relationRteIndex)
1854 {
1855 	AppendRelInfo *appendRelInfo = NULL;
1856 
1857 	/* iterate on the queries that are part of UNION ALL subselects */
1858 	foreach_ptr(appendRelInfo, root->append_rel_list)
1859 	{
1860 		/*
1861 		 * We're only interested in the child rel that is equal to the
1862 		 * relation we're investigating. Here we don't need to find the offset
1863 		 * because postgres adds an offset to child_relid and parent_relid after
1864 		 * calling multi_relation_restriction_hook.
1865 		 */
1866 		if (appendRelInfo->child_relid == relationRteIndex)
1867 		{
1868 			return appendRelInfo;
1869 		}
1870 	}
1871 	return NULL;
1872 }
1873 
1874 
1875 /*
1876  * AdjustReadIntermediateResultCost adjusts the row count and total cost
1877  * of a read_intermediate_result call based on the file size.
1878  */
1879 static void
AdjustReadIntermediateResultCost(RangeTblEntry * rangeTableEntry,RelOptInfo * relOptInfo)1880 AdjustReadIntermediateResultCost(RangeTblEntry *rangeTableEntry, RelOptInfo *relOptInfo)
1881 {
1882 	if (rangeTableEntry->rtekind != RTE_FUNCTION ||
1883 		list_length(rangeTableEntry->functions) != 1)
1884 	{
1885 		/* avoid more expensive checks below for non-functions */
1886 		return;
1887 	}
1888 
1889 	if (!CitusHasBeenLoaded() || !CheckCitusVersion(DEBUG5))
1890 	{
1891 		/* read_intermediate_result may not exist */
1892 		return;
1893 	}
1894 
1895 	if (!ContainsReadIntermediateResultFunction((Node *) rangeTableEntry->functions))
1896 	{
1897 		return;
1898 	}
1899 
1900 	RangeTblFunction *rangeTableFunction = (RangeTblFunction *) linitial(
1901 		rangeTableEntry->functions);
1902 	FuncExpr *funcExpression = (FuncExpr *) rangeTableFunction->funcexpr;
1903 	Const *resultIdConst = (Const *) linitial(funcExpression->args);
1904 	if (!IsA(resultIdConst, Const))
1905 	{
1906 		/* not sure how to interpret non-const */
1907 		return;
1908 	}
1909 
1910 	Datum resultIdDatum = resultIdConst->constvalue;
1911 
1912 	Const *resultFormatConst = (Const *) lsecond(funcExpression->args);
1913 	if (!IsA(resultFormatConst, Const))
1914 	{
1915 		/* not sure how to interpret non-const */
1916 		return;
1917 	}
1918 
1919 	AdjustReadIntermediateResultsCostInternal(relOptInfo,
1920 											  rangeTableFunction->funccoltypes,
1921 											  1, &resultIdDatum, resultFormatConst);
1922 }
1923 
1924 
1925 /*
1926  * AdjustReadIntermediateResultArrayCost adjusts the row count and total cost
1927  * of a read_intermediate_results(resultIds, format) call based on the file size.
1928  */
1929 static void
AdjustReadIntermediateResultArrayCost(RangeTblEntry * rangeTableEntry,RelOptInfo * relOptInfo)1930 AdjustReadIntermediateResultArrayCost(RangeTblEntry *rangeTableEntry,
1931 									  RelOptInfo *relOptInfo)
1932 {
1933 	Datum *resultIdArray = NULL;
1934 	int resultIdCount = 0;
1935 
1936 	if (rangeTableEntry->rtekind != RTE_FUNCTION ||
1937 		list_length(rangeTableEntry->functions) != 1)
1938 	{
1939 		/* avoid more expensive checks below for non-functions */
1940 		return;
1941 	}
1942 
1943 	if (!CitusHasBeenLoaded() || !CheckCitusVersion(DEBUG5))
1944 	{
1945 		/* read_intermediate_result may not exist */
1946 		return;
1947 	}
1948 
1949 	if (!ContainsReadIntermediateResultArrayFunction((Node *) rangeTableEntry->functions))
1950 	{
1951 		return;
1952 	}
1953 
1954 	RangeTblFunction *rangeTableFunction =
1955 		(RangeTblFunction *) linitial(rangeTableEntry->functions);
1956 	FuncExpr *funcExpression = (FuncExpr *) rangeTableFunction->funcexpr;
1957 	Const *resultIdConst = (Const *) linitial(funcExpression->args);
1958 	if (!IsA(resultIdConst, Const))
1959 	{
1960 		/* not sure how to interpret non-const */
1961 		return;
1962 	}
1963 
1964 	Datum resultIdArrayDatum = resultIdConst->constvalue;
1965 	deconstruct_array(DatumGetArrayTypeP(resultIdArrayDatum), TEXTOID, -1, false,
1966 					  'i', &resultIdArray, NULL, &resultIdCount);
1967 
1968 	Const *resultFormatConst = (Const *) lsecond(funcExpression->args);
1969 	if (!IsA(resultFormatConst, Const))
1970 	{
1971 		/* not sure how to interpret non-const */
1972 		return;
1973 	}
1974 
1975 	AdjustReadIntermediateResultsCostInternal(relOptInfo,
1976 											  rangeTableFunction->funccoltypes,
1977 											  resultIdCount, resultIdArray,
1978 											  resultFormatConst);
1979 }
1980 
1981 
1982 /*
1983  * AdjustReadIntermediateResultsCostInternal adjusts the row count and total cost
1984  * of reading intermediate results based on file sizes.
1985  */
1986 static void
AdjustReadIntermediateResultsCostInternal(RelOptInfo * relOptInfo,List * columnTypes,int resultIdCount,Datum * resultIds,Const * resultFormatConst)1987 AdjustReadIntermediateResultsCostInternal(RelOptInfo *relOptInfo, List *columnTypes,
1988 										  int resultIdCount, Datum *resultIds,
1989 										  Const *resultFormatConst)
1990 {
1991 	PathTarget *reltarget = relOptInfo->reltarget;
1992 	List *pathList = relOptInfo->pathlist;
1993 	double rowCost = 0.;
1994 	double rowSizeEstimate = 0;
1995 	double rowCountEstimate = 0.;
1996 	double ioCost = 0.;
1997 	QualCost funcCost = { 0., 0. };
1998 	int64 totalResultSize = 0;
1999 	ListCell *typeCell = NULL;
2000 
2001 	Datum resultFormatDatum = resultFormatConst->constvalue;
2002 	Oid resultFormatId = DatumGetObjectId(resultFormatDatum);
2003 	bool binaryFormat = (resultFormatId == BinaryCopyFormatId());
2004 
2005 	for (int index = 0; index < resultIdCount; index++)
2006 	{
2007 		char *resultId = TextDatumGetCString(resultIds[index]);
2008 		int64 resultSize = IntermediateResultSize(resultId);
2009 		if (resultSize < 0)
2010 		{
2011 			/* result does not exist, will probably error out later on */
2012 			return;
2013 		}
2014 
2015 		if (binaryFormat)
2016 		{
2017 			/* subtract 11-byte signature + 8 byte header + 2-byte footer */
2018 			totalResultSize -= 21;
2019 		}
2020 
2021 		totalResultSize += resultSize;
2022 	}
2023 
2024 	/* start with the cost of evaluating quals */
2025 	rowCost += relOptInfo->baserestrictcost.per_tuple;
2026 
2027 	/* postgres' estimate for the width of the rows */
2028 	rowSizeEstimate += reltarget->width;
2029 
2030 	/* add 2 bytes for column count (binary) or line separator (text) */
2031 	rowSizeEstimate += 2;
2032 
2033 	foreach(typeCell, columnTypes)
2034 	{
2035 		Oid columnTypeId = lfirst_oid(typeCell);
2036 		Oid inputFunctionId = InvalidOid;
2037 		Oid typeIOParam = InvalidOid;
2038 
2039 		if (binaryFormat)
2040 		{
2041 			getTypeBinaryInputInfo(columnTypeId, &inputFunctionId, &typeIOParam);
2042 
2043 			/* binary format: 4 bytes for field size */
2044 			rowSizeEstimate += 4;
2045 		}
2046 		else
2047 		{
2048 			getTypeInputInfo(columnTypeId, &inputFunctionId, &typeIOParam);
2049 
2050 			/* text format: 1 byte for tab separator */
2051 			rowSizeEstimate += 1;
2052 		}
2053 
2054 
2055 		/* add the cost of parsing a column */
2056 		add_function_cost(NULL, inputFunctionId, NULL, &funcCost);
2057 	}
2058 	rowCost += funcCost.per_tuple;
2059 
2060 	/* estimate the number of rows based on the file size and estimated row size */
2061 	rowCountEstimate = Max(1, (double) totalResultSize / rowSizeEstimate);
2062 
2063 	/* cost of reading the data */
2064 	ioCost = seq_page_cost * totalResultSize / BLCKSZ;
2065 
2066 	Assert(pathList != NIL);
2067 
2068 	/* tell the planner about the cost and row count of the function */
2069 	Path *path = (Path *) linitial(pathList);
2070 	path->rows = rowCountEstimate;
2071 	path->total_cost = rowCountEstimate * rowCost + ioCost;
2072 
2073 	path->startup_cost = funcCost.startup + relOptInfo->baserestrictcost.startup;
2074 }
2075 
2076 
2077 /*
2078  * OuterPlanParamsList creates a list of RootPlanParams for outer nodes of the
2079  * given root. The first item in the list corresponds to parent_root, and the
2080  * last item corresponds to the outer most node.
2081  */
2082 static List *
OuterPlanParamsList(PlannerInfo * root)2083 OuterPlanParamsList(PlannerInfo *root)
2084 {
2085 	List *planParamsList = NIL;
2086 
2087 	for (PlannerInfo *outerNodeRoot = root->parent_root; outerNodeRoot != NULL;
2088 		 outerNodeRoot = outerNodeRoot->parent_root)
2089 	{
2090 		RootPlanParams *rootPlanParams = palloc0(sizeof(RootPlanParams));
2091 		rootPlanParams->root = outerNodeRoot;
2092 
2093 		/*
2094 		 * TODO: In SearchPlannerParamList() we are only interested in Var plan
2095 		 * params, consider copying just them here.
2096 		 */
2097 		rootPlanParams->plan_params = CopyPlanParamList(outerNodeRoot->plan_params);
2098 
2099 		planParamsList = lappend(planParamsList, rootPlanParams);
2100 	}
2101 
2102 	return planParamsList;
2103 }
2104 
2105 
2106 /*
2107  * CopyPlanParamList deep copies the input PlannerParamItem list and returns the newly
2108  * allocated list.
2109  * Note that we cannot use copyObject() function directly since there is no support for
2110  * copying PlannerParamItem structs.
2111  */
2112 static List *
CopyPlanParamList(List * originalPlanParamList)2113 CopyPlanParamList(List *originalPlanParamList)
2114 {
2115 	ListCell *planParamCell = NULL;
2116 	List *copiedPlanParamList = NIL;
2117 
2118 	foreach(planParamCell, originalPlanParamList)
2119 	{
2120 		PlannerParamItem *originalParamItem = lfirst(planParamCell);
2121 		PlannerParamItem *copiedParamItem = makeNode(PlannerParamItem);
2122 
2123 		copiedParamItem->paramId = originalParamItem->paramId;
2124 		copiedParamItem->item = copyObject(originalParamItem->item);
2125 
2126 		copiedPlanParamList = lappend(copiedPlanParamList, copiedParamItem);
2127 	}
2128 
2129 	return copiedPlanParamList;
2130 }
2131 
2132 
2133 /*
2134  * CreateAndPushPlannerRestrictionContext creates a new relation restriction context
2135  * and a new join context, inserts it to the beginning of the
2136  * plannerRestrictionContextList. Finally, the planner restriction context is
2137  * inserted to the beginning of the plannerRestrictionContextList and it is returned.
2138  */
2139 static PlannerRestrictionContext *
CreateAndPushPlannerRestrictionContext(void)2140 CreateAndPushPlannerRestrictionContext(void)
2141 {
2142 	PlannerRestrictionContext *plannerRestrictionContext =
2143 		palloc0(sizeof(PlannerRestrictionContext));
2144 
2145 	plannerRestrictionContext->relationRestrictionContext =
2146 		palloc0(sizeof(RelationRestrictionContext));
2147 
2148 	plannerRestrictionContext->joinRestrictionContext =
2149 		palloc0(sizeof(JoinRestrictionContext));
2150 
2151 	plannerRestrictionContext->fastPathRestrictionContext =
2152 		palloc0(sizeof(FastPathRestrictionContext));
2153 
2154 	plannerRestrictionContext->memoryContext = CurrentMemoryContext;
2155 
2156 	/* we'll apply logical AND as we add tables */
2157 	plannerRestrictionContext->relationRestrictionContext->allReferenceTables = true;
2158 
2159 	plannerRestrictionContextList = lcons(plannerRestrictionContext,
2160 										  plannerRestrictionContextList);
2161 
2162 	return plannerRestrictionContext;
2163 }
2164 
2165 
2166 /*
2167  * TranslatedVarsForRteIdentity gets an rteIdentity and returns the
2168  * translatedVars that belong to the range table relation. If no
2169  * translatedVars found, the function returns NIL;
2170  */
2171 List *
TranslatedVarsForRteIdentity(int rteIdentity)2172 TranslatedVarsForRteIdentity(int rteIdentity)
2173 {
2174 	PlannerRestrictionContext *currentPlannerRestrictionContext =
2175 		CurrentPlannerRestrictionContext();
2176 
2177 	List *relationRestrictionList =
2178 		currentPlannerRestrictionContext->relationRestrictionContext->
2179 		relationRestrictionList;
2180 	RelationRestriction *relationRestriction = NULL;
2181 	foreach_ptr(relationRestriction, relationRestrictionList)
2182 	{
2183 		if (GetRTEIdentity(relationRestriction->rte) == rteIdentity)
2184 		{
2185 			return relationRestriction->translatedVars;
2186 		}
2187 	}
2188 
2189 	return NIL;
2190 }
2191 
2192 
2193 /*
2194  * CurrentRestrictionContext returns the most recently added
2195  * PlannerRestrictionContext from the plannerRestrictionContextList list.
2196  */
2197 static PlannerRestrictionContext *
CurrentPlannerRestrictionContext(void)2198 CurrentPlannerRestrictionContext(void)
2199 {
2200 	Assert(plannerRestrictionContextList != NIL);
2201 
2202 	PlannerRestrictionContext *plannerRestrictionContext =
2203 		(PlannerRestrictionContext *) linitial(plannerRestrictionContextList);
2204 
2205 	if (plannerRestrictionContext == NULL)
2206 	{
2207 		ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR),
2208 						errmsg("planner restriction context stack was empty"),
2209 						errdetail("Please report this to the Citus core team.")));
2210 	}
2211 
2212 	return plannerRestrictionContext;
2213 }
2214 
2215 
2216 /*
2217  * PopPlannerRestrictionContext removes the most recently added restriction contexts from
2218  * the planner restriction context list. The function assumes the list is not empty.
2219  */
2220 static void
PopPlannerRestrictionContext(void)2221 PopPlannerRestrictionContext(void)
2222 {
2223 	plannerRestrictionContextList = list_delete_first(plannerRestrictionContextList);
2224 }
2225 
2226 
2227 /*
2228  * ResetPlannerRestrictionContext resets the element of the given planner
2229  * restriction context.
2230  */
2231 static void
ResetPlannerRestrictionContext(PlannerRestrictionContext * plannerRestrictionContext)2232 ResetPlannerRestrictionContext(PlannerRestrictionContext *plannerRestrictionContext)
2233 {
2234 	plannerRestrictionContext->relationRestrictionContext =
2235 		palloc0(sizeof(RelationRestrictionContext));
2236 
2237 	plannerRestrictionContext->joinRestrictionContext =
2238 		palloc0(sizeof(JoinRestrictionContext));
2239 
2240 	plannerRestrictionContext->fastPathRestrictionContext =
2241 		palloc0(sizeof(FastPathRestrictionContext));
2242 
2243 
2244 	/* we'll apply logical AND as we add tables */
2245 	plannerRestrictionContext->relationRestrictionContext->allReferenceTables = true;
2246 }
2247 
2248 
2249 /*
2250  * HasUnresolvedExternParamsWalker returns true if the passed in expression
2251  * has external parameters that are not contained in boundParams, false
2252  * otherwise.
2253  */
2254 bool
HasUnresolvedExternParamsWalker(Node * expression,ParamListInfo boundParams)2255 HasUnresolvedExternParamsWalker(Node *expression, ParamListInfo boundParams)
2256 {
2257 	if (expression == NULL)
2258 	{
2259 		return false;
2260 	}
2261 
2262 	if (IsA(expression, Param))
2263 	{
2264 		Param *param = (Param *) expression;
2265 		int paramId = param->paramid;
2266 
2267 		/* only care about user supplied parameters */
2268 		if (param->paramkind != PARAM_EXTERN)
2269 		{
2270 			return false;
2271 		}
2272 
2273 		/* check whether parameter is available (and valid) */
2274 		if (boundParams && paramId > 0 && paramId <= boundParams->numParams)
2275 		{
2276 			ParamExternData *externParam = NULL;
2277 
2278 			/* give hook a chance in case parameter is dynamic */
2279 			if (boundParams->paramFetch != NULL)
2280 			{
2281 				ParamExternData externParamPlaceholder;
2282 				externParam = (*boundParams->paramFetch)(boundParams, paramId, false,
2283 														 &externParamPlaceholder);
2284 			}
2285 			else
2286 			{
2287 				externParam = &boundParams->params[paramId - 1];
2288 			}
2289 
2290 			Oid paramType = externParam->ptype;
2291 			if (OidIsValid(paramType))
2292 			{
2293 				return false;
2294 			}
2295 		}
2296 
2297 		return true;
2298 	}
2299 
2300 	/* keep traversing */
2301 	if (IsA(expression, Query))
2302 	{
2303 		return query_tree_walker((Query *) expression,
2304 								 HasUnresolvedExternParamsWalker,
2305 								 boundParams,
2306 								 0);
2307 	}
2308 	else
2309 	{
2310 		return expression_tree_walker(expression,
2311 									  HasUnresolvedExternParamsWalker,
2312 									  boundParams);
2313 	}
2314 }
2315 
2316 
2317 /*
2318  * GetRTEListPropertiesForQuery is a wrapper around GetRTEListProperties that
2319  * returns RTEListProperties for the rte list retrieved from query.
2320  */
2321 RTEListProperties *
GetRTEListPropertiesForQuery(Query * query)2322 GetRTEListPropertiesForQuery(Query *query)
2323 {
2324 	List *rteList = ExtractRangeTableEntryList(query);
2325 	return GetRTEListProperties(rteList);
2326 }
2327 
2328 
2329 /*
2330  * GetRTEListProperties returns RTEListProperties struct processing the given
2331  * rangeTableList.
2332  */
2333 static RTEListProperties *
GetRTEListProperties(List * rangeTableList)2334 GetRTEListProperties(List *rangeTableList)
2335 {
2336 	RTEListProperties *rteListProperties = palloc0(sizeof(RTEListProperties));
2337 
2338 	RangeTblEntry *rangeTableEntry = NULL;
2339 	foreach_ptr(rangeTableEntry, rangeTableList)
2340 	{
2341 		if (rangeTableEntry->rtekind != RTE_RELATION)
2342 		{
2343 			continue;
2344 		}
2345 		else if (rangeTableEntry->relkind == RELKIND_VIEW)
2346 		{
2347 			/*
2348 			 * Skip over views, distributed tables within (regular) views are
2349 			 * already in rangeTableList.
2350 			 */
2351 			continue;
2352 		}
2353 
2354 
2355 		if (rangeTableEntry->relkind == RELKIND_MATVIEW)
2356 		{
2357 			/*
2358 			 * Record materialized views as they are similar to postgres local tables
2359 			 * but it is nice to record them separately.
2360 			 *
2361 			 * Regular tables, partitioned tables or foreign tables can be a local or
2362 			 * distributed tables and we can qualify them accurately.
2363 			 *
2364 			 * For regular views, we don't care because their definitions are already
2365 			 * in the same query tree and we can detect what is inside the view definition.
2366 			 *
2367 			 * For materialized views, they are just local tables in the queries. But, when
2368 			 * REFRESH MATERIALIZED VIEW is used, they behave similar to regular views, adds
2369 			 * the view definition to the query. Hence, it is useful to record it seperately
2370 			 * and let the callers decide on what to do.
2371 			 */
2372 			rteListProperties->hasMaterializedView = true;
2373 			continue;
2374 		}
2375 
2376 		Oid relationId = rangeTableEntry->relid;
2377 		CitusTableCacheEntry *cacheEntry = LookupCitusTableCacheEntry(relationId);
2378 		if (!cacheEntry)
2379 		{
2380 			rteListProperties->hasPostgresLocalTable = true;
2381 		}
2382 		else if (IsCitusTableTypeCacheEntry(cacheEntry, REFERENCE_TABLE))
2383 		{
2384 			rteListProperties->hasReferenceTable = true;
2385 		}
2386 		else if (IsCitusTableTypeCacheEntry(cacheEntry, CITUS_LOCAL_TABLE))
2387 		{
2388 			rteListProperties->hasCitusLocalTable = true;
2389 		}
2390 		else if (IsCitusTableTypeCacheEntry(cacheEntry, DISTRIBUTED_TABLE))
2391 		{
2392 			rteListProperties->hasDistributedTable = true;
2393 		}
2394 		else
2395 		{
2396 			/* it's not expected, but let's do a bug catch here */
2397 			ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR),
2398 							errmsg("encountered with an unexpected citus "
2399 								   "table type while processing range table "
2400 								   "entries of query")));
2401 		}
2402 	}
2403 
2404 	rteListProperties->hasCitusTable = (rteListProperties->hasDistributedTable ||
2405 										rteListProperties->hasReferenceTable ||
2406 										rteListProperties->hasCitusLocalTable);
2407 
2408 	return rteListProperties;
2409 }
2410