1 /*-------------------------------------------------------------------------
2  *
3  * insert_select_planner.c
4  *
5  * Planning logic for INSERT..SELECT.
6  *
7  * Copyright (c) Citus Data, Inc.
8  *-------------------------------------------------------------------------
9  */
10 
11 #include "postgres.h"
12 
13 #include "distributed/pg_version_constants.h"
14 
15 #include "catalog/pg_class.h"
16 #include "catalog/pg_type.h"
17 #include "distributed/citus_clauses.h"
18 #include "distributed/citus_ruleutils.h"
19 #include "distributed/colocation_utils.h"
20 #include "distributed/errormessage.h"
21 #include "distributed/listutils.h"
22 #include "distributed/log_utils.h"
23 #include "distributed/insert_select_executor.h"
24 #include "distributed/insert_select_planner.h"
25 #include "distributed/metadata_cache.h"
26 #include "distributed/multi_executor.h"
27 #include "distributed/multi_logical_planner.h"
28 #include "distributed/multi_logical_optimizer.h"
29 #include "distributed/multi_physical_planner.h"
30 #include "distributed/multi_router_planner.h"
31 #include "distributed/pg_dist_partition.h"
32 #include "distributed/query_pushdown_planning.h"
33 #include "distributed/recursive_planning.h"
34 #include "distributed/resource_lock.h"
35 #include "distributed/version_compat.h"
36 #include "nodes/makefuncs.h"
37 #include "nodes/nodeFuncs.h"
38 #include "nodes/parsenodes.h"
39 #include "optimizer/clauses.h"
40 #include "optimizer/planner.h"
41 #include "optimizer/restrictinfo.h"
42 #include "optimizer/tlist.h"
43 #include "optimizer/optimizer.h"
44 #include "parser/parsetree.h"
45 #include "parser/parse_coerce.h"
46 #include "parser/parse_relation.h"
47 #include "tcop/tcopprot.h"
48 #include "utils/builtins.h"
49 #include "utils/lsyscache.h"
50 #include "utils/rel.h"
51 
52 
53 static DistributedPlan * CreateInsertSelectPlanInternal(uint64 planId,
54 														Query *originalQuery,
55 														PlannerRestrictionContext *
56 														plannerRestrictionContext,
57 														ParamListInfo boundParams);
58 static DistributedPlan * CreateDistributedInsertSelectPlan(Query *originalQuery,
59 														   PlannerRestrictionContext *
60 														   plannerRestrictionContext);
61 static Task * RouterModifyTaskForShardInterval(Query *originalQuery,
62 											   CitusTableCacheEntry *targetTableCacheEntry,
63 											   ShardInterval *shardInterval,
64 											   PlannerRestrictionContext *
65 											   plannerRestrictionContext,
66 											   uint32 taskIdIndex,
67 											   bool allRelationsJoinedOnPartitionKey,
68 											   DeferredErrorMessage **routerPlannerError);
69 static Query * CreateCombineQueryForRouterPlan(DistributedPlan *distPlan);
70 static List * CreateTargetListForCombineQuery(List *targetList);
71 static DeferredErrorMessage * DistributedInsertSelectSupported(Query *queryTree,
72 															   RangeTblEntry *insertRte,
73 															   RangeTblEntry *subqueryRte,
74 															   bool allReferenceTables);
75 static DeferredErrorMessage * MultiTaskRouterSelectQuerySupported(Query *query);
76 static bool HasUnsupportedDistinctOn(Query *query);
77 static DeferredErrorMessage * InsertPartitionColumnMatchesSelect(Query *query,
78 																 RangeTblEntry *insertRte,
79 																 RangeTblEntry *
80 																 subqueryRte,
81 																 Oid *
82 																 selectPartitionColumnTableId);
83 static DistributedPlan * CreateNonPushableInsertSelectPlan(uint64 planId, Query *parse,
84 														   ParamListInfo boundParams);
85 static DeferredErrorMessage * NonPushableInsertSelectSupported(Query *insertSelectQuery);
86 static void RelabelTargetEntryList(List *selectTargetList, List *insertTargetList);
87 static List * AddInsertSelectCasts(List *insertTargetList, List *selectTargetList,
88 								   Oid targetRelationId);
89 static Expr * CastExpr(Expr *expr, Oid sourceType, Oid targetType, Oid targetCollation,
90 					   int targetTypeMod);
91 
92 
93 /* depth of current insert/select planner. */
94 static int insertSelectPlannerLevel = 0;
95 
96 
97 /*
98  * InsertSelectIntoCitusTable returns true when the input query is an
99  * INSERT INTO ... SELECT kind of query and the target is a citus
100  * table.
101  *
102  * Note that the input query should be the original parsetree of
103  * the query (i.e., not passed trough the standard planner).
104  */
105 bool
InsertSelectIntoCitusTable(Query * query)106 InsertSelectIntoCitusTable(Query *query)
107 {
108 	bool insertSelectQuery = CheckInsertSelectQuery(query);
109 
110 	if (insertSelectQuery)
111 	{
112 		RangeTblEntry *insertRte = ExtractResultRelationRTE(query);
113 		if (IsCitusTable(insertRte->relid))
114 		{
115 			return true;
116 		}
117 	}
118 
119 	return false;
120 }
121 
122 
123 /*
124  * InsertSelectIntoLocalTable checks whether INSERT INTO ... SELECT inserts
125  * into local table. Note that query must be a sample of INSERT INTO ... SELECT
126  * type of query.
127  */
128 bool
InsertSelectIntoLocalTable(Query * query)129 InsertSelectIntoLocalTable(Query *query)
130 {
131 	bool insertSelectQuery = CheckInsertSelectQuery(query);
132 
133 	if (insertSelectQuery)
134 	{
135 		RangeTblEntry *insertRte = ExtractResultRelationRTE(query);
136 		if (!IsCitusTable(insertRte->relid))
137 		{
138 			return true;
139 		}
140 	}
141 
142 	return false;
143 }
144 
145 
146 /*
147  * CheckInsertSelectQuery returns true when the input query is an INSERT INTO
148  * ... SELECT kind of query.
149  *
150  * This function is inspired from getInsertSelectQuery() on
151  * rewrite/rewriteManip.c.
152  */
153 bool
CheckInsertSelectQuery(Query * query)154 CheckInsertSelectQuery(Query *query)
155 {
156 	CmdType commandType = query->commandType;
157 
158 	if (commandType != CMD_INSERT)
159 	{
160 		return false;
161 	}
162 
163 	if (query->jointree == NULL || !IsA(query->jointree, FromExpr))
164 	{
165 		return false;
166 	}
167 
168 	List *fromList = query->jointree->fromlist;
169 	if (list_length(fromList) != 1)
170 	{
171 		return false;
172 	}
173 
174 	RangeTblRef *rangeTableReference = linitial(fromList);
175 	if (!IsA(rangeTableReference, RangeTblRef))
176 	{
177 		return false;
178 	}
179 
180 	RangeTblEntry *subqueryRte = rt_fetch(rangeTableReference->rtindex, query->rtable);
181 	if (subqueryRte->rtekind != RTE_SUBQUERY)
182 	{
183 		return false;
184 	}
185 
186 	/* ensure that there is a query */
187 	Assert(IsA(subqueryRte->subquery, Query));
188 
189 	return true;
190 }
191 
192 
193 /*
194  * CoordinatorInsertSelectExecScan is a wrapper around
195  * CoordinatorInsertSelectExecScanInternal which also properly increments
196  * or decrements insertSelectExecutorLevel.
197  */
198 DistributedPlan *
CreateInsertSelectPlan(uint64 planId,Query * originalQuery,PlannerRestrictionContext * plannerRestrictionContext,ParamListInfo boundParams)199 CreateInsertSelectPlan(uint64 planId, Query *originalQuery,
200 					   PlannerRestrictionContext *plannerRestrictionContext,
201 					   ParamListInfo boundParams)
202 {
203 	DistributedPlan *result = NULL;
204 	insertSelectPlannerLevel++;
205 
206 	PG_TRY();
207 	{
208 		result = CreateInsertSelectPlanInternal(planId, originalQuery,
209 												plannerRestrictionContext, boundParams);
210 	}
211 	PG_CATCH();
212 	{
213 		insertSelectPlannerLevel--;
214 		PG_RE_THROW();
215 	}
216 	PG_END_TRY();
217 
218 	insertSelectPlannerLevel--;
219 	return result;
220 }
221 
222 
223 /*
224  * CreateInsertSelectPlan tries to create a distributed plan for an
225  * INSERT INTO distributed_table SELECT ... query by push down the
226  * command to the workers and if that is not possible it creates a
227  * plan for evaluating the SELECT on the coordinator.
228  */
229 static DistributedPlan *
CreateInsertSelectPlanInternal(uint64 planId,Query * originalQuery,PlannerRestrictionContext * plannerRestrictionContext,ParamListInfo boundParams)230 CreateInsertSelectPlanInternal(uint64 planId, Query *originalQuery,
231 							   PlannerRestrictionContext *plannerRestrictionContext,
232 							   ParamListInfo boundParams)
233 {
234 	DeferredErrorMessage *deferredError = ErrorIfOnConflictNotSupported(originalQuery);
235 	if (deferredError != NULL)
236 	{
237 		/* raising the error as there is no possible solution for the unsupported on conflict statements */
238 		RaiseDeferredError(deferredError, ERROR);
239 	}
240 
241 	DistributedPlan *distributedPlan = CreateDistributedInsertSelectPlan(originalQuery,
242 																		 plannerRestrictionContext);
243 
244 	if (distributedPlan->planningError != NULL)
245 	{
246 		RaiseDeferredError(distributedPlan->planningError, DEBUG1);
247 
248 		/*
249 		 * If INSERT..SELECT cannot be distributed, pull to coordinator or use
250 		 * repartitioning.
251 		 */
252 		distributedPlan = CreateNonPushableInsertSelectPlan(planId, originalQuery,
253 															boundParams);
254 	}
255 
256 	return distributedPlan;
257 }
258 
259 
260 /*
261  * CreateDistributedInsertSelectPlan creates a DistributedPlan for distributed
262  * INSERT ... SELECT queries which could consist of multiple tasks.
263  *
264  * The function never returns NULL, it errors out if cannot create the DistributedPlan.
265  */
266 static DistributedPlan *
CreateDistributedInsertSelectPlan(Query * originalQuery,PlannerRestrictionContext * plannerRestrictionContext)267 CreateDistributedInsertSelectPlan(Query *originalQuery,
268 								  PlannerRestrictionContext *plannerRestrictionContext)
269 {
270 	List *sqlTaskList = NIL;
271 	uint32 taskIdIndex = 1;     /* 0 is reserved for invalid taskId */
272 	uint64 jobId = INVALID_JOB_ID;
273 	DistributedPlan *distributedPlan = CitusMakeNode(DistributedPlan);
274 	RangeTblEntry *insertRte = ExtractResultRelationRTEOrError(originalQuery);
275 	RangeTblEntry *subqueryRte = ExtractSelectRangeTableEntry(originalQuery);
276 	Oid targetRelationId = insertRte->relid;
277 	CitusTableCacheEntry *targetCacheEntry = GetCitusTableCacheEntry(targetRelationId);
278 	int shardCount = targetCacheEntry->shardIntervalArrayLength;
279 	RelationRestrictionContext *relationRestrictionContext =
280 		plannerRestrictionContext->relationRestrictionContext;
281 	bool allReferenceTables = relationRestrictionContext->allReferenceTables;
282 
283 	distributedPlan->modLevel = RowModifyLevelForQuery(originalQuery);
284 
285 	/*
286 	 * Error semantics for INSERT ... SELECT queries are different than regular
287 	 * modify queries. Thus, handle separately.
288 	 */
289 	distributedPlan->planningError = DistributedInsertSelectSupported(originalQuery,
290 																	  insertRte,
291 																	  subqueryRte,
292 																	  allReferenceTables);
293 	if (distributedPlan->planningError)
294 	{
295 		return distributedPlan;
296 	}
297 
298 	bool allDistributionKeysInQueryAreEqual =
299 		AllDistributionKeysInQueryAreEqual(originalQuery, plannerRestrictionContext);
300 
301 	/*
302 	 * Plan select query for each shard in the target table. Do so by replacing the
303 	 * partitioning qual parameter added in distributed_planner() using the current shard's
304 	 * actual boundary values. Also, add the current shard's boundary values to the
305 	 * top level subquery to ensure that even if the partitioning qual is not distributed
306 	 * to all the tables, we never run the queries on the shards that don't match with
307 	 * the current shard boundaries. Finally, perform the normal shard pruning to
308 	 * decide on whether to push the query to the current shard or not.
309 	 */
310 	for (int shardOffset = 0; shardOffset < shardCount; shardOffset++)
311 	{
312 		ShardInterval *targetShardInterval =
313 			targetCacheEntry->sortedShardIntervalArray[shardOffset];
314 
315 		Task *modifyTask = RouterModifyTaskForShardInterval(originalQuery,
316 															targetCacheEntry,
317 															targetShardInterval,
318 															plannerRestrictionContext,
319 															taskIdIndex,
320 															allDistributionKeysInQueryAreEqual,
321 															&distributedPlan->
322 															planningError);
323 
324 		if (distributedPlan->planningError != NULL)
325 		{
326 			return distributedPlan;
327 		}
328 
329 		/* add the task if it could be created */
330 		if (modifyTask != NULL)
331 		{
332 			modifyTask->modifyWithSubquery = true;
333 
334 			sqlTaskList = lappend(sqlTaskList, modifyTask);
335 		}
336 
337 		taskIdIndex++;
338 	}
339 
340 	/* Create the worker job */
341 	Job *workerJob = CitusMakeNode(Job);
342 	workerJob->taskList = sqlTaskList;
343 	workerJob->subqueryPushdown = false;
344 	workerJob->dependentJobList = NIL;
345 	workerJob->jobId = jobId;
346 	workerJob->jobQuery = originalQuery;
347 	workerJob->requiresCoordinatorEvaluation =
348 		RequiresCoordinatorEvaluation(originalQuery);
349 
350 	/* and finally the multi plan */
351 	distributedPlan->workerJob = workerJob;
352 	distributedPlan->combineQuery = NULL;
353 	distributedPlan->expectResults = originalQuery->returningList != NIL;
354 	distributedPlan->targetRelationId = targetRelationId;
355 
356 	return distributedPlan;
357 }
358 
359 
360 /*
361  * CreateInsertSelectIntoLocalTablePlan creates the plan for INSERT .. SELECT queries
362  * where the selected table is distributed and the inserted table is not.
363  *
364  * To create the plan, this function first creates a distributed plan for the SELECT
365  * part. Then puts it as a subquery to the original (non-distributed) INSERT query as
366  * a subquery. Finally, it puts this INSERT query, which now has a distributed SELECT
367  * subquery, in the combineQuery.
368  *
369  * If the SELECT query is a router query, whose distributed plan does not have a
370  * combineQuery, this function also creates a dummy combineQuery for that.
371  */
372 DistributedPlan *
CreateInsertSelectIntoLocalTablePlan(uint64 planId,Query * originalQuery,ParamListInfo boundParams,bool hasUnresolvedParams,PlannerRestrictionContext * plannerRestrictionContext)373 CreateInsertSelectIntoLocalTablePlan(uint64 planId, Query *originalQuery, ParamListInfo
374 									 boundParams, bool hasUnresolvedParams,
375 									 PlannerRestrictionContext *plannerRestrictionContext)
376 {
377 	RangeTblEntry *selectRte = ExtractSelectRangeTableEntry(originalQuery);
378 
379 	Query *selectQuery = BuildSelectForInsertSelect(originalQuery);
380 	originalQuery->cteList = NIL;
381 	DistributedPlan *distPlan = CreateDistributedPlan(planId, selectQuery,
382 													  copyObject(selectQuery),
383 													  boundParams, hasUnresolvedParams,
384 													  plannerRestrictionContext);
385 
386 	/*
387 	 * We don't expect distPlan to be NULL here because hasUnresolvedParams is
388 	 * already checked before this function and CreateDistributedPlan only returns
389 	 * NULL when there are unresolved parameters.
390 	 */
391 	Assert(distPlan != NULL);
392 
393 	if (distPlan->planningError)
394 	{
395 		return distPlan;
396 	}
397 
398 	if (distPlan->combineQuery == NULL)
399 	{
400 		/*
401 		 * For router queries, we construct a synthetic master query that simply passes
402 		 * on the results of the remote tasks, which we can then use as the select in
403 		 * the INSERT .. SELECT.
404 		 */
405 		distPlan->combineQuery = CreateCombineQueryForRouterPlan(
406 			distPlan);
407 	}
408 
409 	/*
410 	 * combineQuery of a distributed select is for combining the results from
411 	 * worker nodes on the coordinator node. Putting it as a subquery to the
412 	 * INSERT query, causes the INSERT query to insert the combined select value
413 	 * from the workers. And making the resulting insert query the combineQuery
414 	 * let's us execute this insert command.
415 	 *
416 	 * So this operation makes the master query insert the result of the
417 	 * distributed select instead of returning it.
418 	 */
419 	selectRte->subquery = distPlan->combineQuery;
420 	distPlan->combineQuery = originalQuery;
421 
422 	return distPlan;
423 }
424 
425 
426 /*
427  * CreateCombineQueryForRouterPlan is used for creating a dummy combineQuery
428  * for a router plan, since router plans normally don't have one.
429  */
430 static Query *
CreateCombineQueryForRouterPlan(DistributedPlan * distPlan)431 CreateCombineQueryForRouterPlan(DistributedPlan *distPlan)
432 {
433 	const Index insertTableId = 1;
434 	List *tableIdList = list_make1(makeInteger(insertTableId));
435 	Job *dependentJob = distPlan->workerJob;
436 	List *dependentTargetList = dependentJob->jobQuery->targetList;
437 
438 	/* compute column names for the derived table */
439 	uint32 columnCount = (uint32) list_length(dependentTargetList);
440 	List *columnNameList = DerivedColumnNameList(columnCount,
441 												 dependentJob->jobId);
442 
443 	List *funcColumnNames = NIL;
444 	List *funcColumnTypes = NIL;
445 	List *funcColumnTypeMods = NIL;
446 	List *funcCollations = NIL;
447 
448 	TargetEntry *targetEntry = NULL;
449 	foreach_ptr(targetEntry, dependentTargetList)
450 	{
451 		Node *expr = (Node *) targetEntry->expr;
452 
453 		char *name = targetEntry->resname;
454 		if (name == NULL)
455 		{
456 			name = pstrdup("unnamed");
457 		}
458 
459 		funcColumnNames = lappend(funcColumnNames, makeString(name));
460 
461 		funcColumnTypes = lappend_oid(funcColumnTypes, exprType(expr));
462 		funcColumnTypeMods = lappend_int(funcColumnTypeMods, exprTypmod(expr));
463 		funcCollations = lappend_oid(funcCollations, exprCollation(expr));
464 	}
465 
466 	RangeTblEntry *rangeTableEntry = DerivedRangeTableEntry(NULL,
467 															columnNameList,
468 															tableIdList,
469 															funcColumnNames,
470 															funcColumnTypes,
471 															funcColumnTypeMods,
472 															funcCollations);
473 
474 	List *targetList = CreateTargetListForCombineQuery(dependentTargetList);
475 
476 	RangeTblRef *rangeTableRef = makeNode(RangeTblRef);
477 	rangeTableRef->rtindex = 1;
478 
479 	FromExpr *joinTree = makeNode(FromExpr);
480 	joinTree->quals = NULL;
481 	joinTree->fromlist = list_make1(rangeTableRef);
482 
483 	Query *combineQuery = makeNode(Query);
484 	combineQuery->commandType = CMD_SELECT;
485 	combineQuery->querySource = QSRC_ORIGINAL;
486 	combineQuery->canSetTag = true;
487 	combineQuery->rtable = list_make1(rangeTableEntry);
488 	combineQuery->targetList = targetList;
489 	combineQuery->jointree = joinTree;
490 	return combineQuery;
491 }
492 
493 
494 /*
495  * CreateTargetListForCombineQuery is used for creating a target list for
496  * master query.
497  */
498 static List *
CreateTargetListForCombineQuery(List * targetList)499 CreateTargetListForCombineQuery(List *targetList)
500 {
501 	List *newTargetEntryList = NIL;
502 	const uint32 masterTableId = 1;
503 	int columnId = 1;
504 
505 	/* iterate over original target entries */
506 	TargetEntry *originalTargetEntry = NULL;
507 	foreach_ptr(originalTargetEntry, targetList)
508 	{
509 		TargetEntry *newTargetEntry = flatCopyTargetEntry(originalTargetEntry);
510 
511 		Var *column = makeVarFromTargetEntry(masterTableId, originalTargetEntry);
512 		column->varattno = columnId;
513 		column->varattnosyn = columnId;
514 		columnId++;
515 
516 		if (column->vartype == RECORDOID || column->vartype == RECORDARRAYOID)
517 		{
518 			column->vartypmod = BlessRecordExpression(originalTargetEntry->expr);
519 		}
520 
521 		Expr *newExpression = (Expr *) column;
522 
523 		newTargetEntry->expr = newExpression;
524 		newTargetEntryList = lappend(newTargetEntryList, newTargetEntry);
525 	}
526 	return newTargetEntryList;
527 }
528 
529 
530 /*
531  * DistributedInsertSelectSupported returns NULL if the INSERT ... SELECT query
532  * is supported, or a description why not.
533  */
534 static DeferredErrorMessage *
DistributedInsertSelectSupported(Query * queryTree,RangeTblEntry * insertRte,RangeTblEntry * subqueryRte,bool allReferenceTables)535 DistributedInsertSelectSupported(Query *queryTree, RangeTblEntry *insertRte,
536 								 RangeTblEntry *subqueryRte, bool allReferenceTables)
537 {
538 	Oid selectPartitionColumnTableId = InvalidOid;
539 	Oid targetRelationId = insertRte->relid;
540 	ListCell *rangeTableCell = NULL;
541 
542 	/* we only do this check for INSERT ... SELECT queries */
543 	AssertArg(InsertSelectIntoCitusTable(queryTree));
544 
545 	Query *subquery = subqueryRte->subquery;
546 
547 	if (!NeedsDistributedPlanning(subquery))
548 	{
549 		return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
550 							 "distributed INSERT ... SELECT can only select from "
551 							 "distributed tables",
552 							 NULL, NULL);
553 	}
554 
555 	RTEListProperties *subqueryRteListProperties = GetRTEListPropertiesForQuery(subquery);
556 	if (subqueryRteListProperties->hasDistributedTable &&
557 		(subqueryRteListProperties->hasCitusLocalTable ||
558 		 subqueryRteListProperties->hasPostgresLocalTable))
559 	{
560 		return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
561 							 "distributed INSERT ... SELECT cannot select from "
562 							 "distributed tables and local tables at the same time",
563 							 NULL, NULL);
564 	}
565 
566 	if (subqueryRteListProperties->hasDistributedTable &&
567 		IsCitusTableType(targetRelationId, CITUS_LOCAL_TABLE))
568 	{
569 		return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
570 							 "distributed INSERT ... SELECT cannot insert into a "
571 							 "local table that is added to metadata",
572 							 NULL, NULL);
573 	}
574 
575 	/*
576 	 * In some cases, it might be possible to allow postgres local tables
577 	 * in distributed insert select. However, we want to behave consistent
578 	 * on all cases including Citus MX, and let insert select via coordinator
579 	 * to kick-in.
580 	 */
581 	if (subqueryRteListProperties->hasPostgresLocalTable)
582 	{
583 		return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
584 							 "distributed INSERT ... SELECT cannot select from "
585 							 "a local table", NULL, NULL);
586 		return NULL;
587 	}
588 
589 	/* we do not expect to see a view in modify target */
590 	foreach(rangeTableCell, queryTree->rtable)
591 	{
592 		RangeTblEntry *rangeTableEntry = (RangeTblEntry *) lfirst(rangeTableCell);
593 		if (rangeTableEntry->rtekind == RTE_RELATION &&
594 			rangeTableEntry->relkind == RELKIND_VIEW)
595 		{
596 			return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
597 								 "cannot insert into view over distributed table",
598 								 NULL, NULL);
599 		}
600 	}
601 
602 	if (FindNodeMatchingCheckFunction((Node *) queryTree, CitusIsVolatileFunction))
603 	{
604 		return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
605 							 "volatile functions are not allowed in distributed "
606 							 "INSERT ... SELECT queries",
607 							 NULL, NULL);
608 	}
609 
610 	/* we don't support LIMIT, OFFSET and WINDOW functions */
611 	DeferredErrorMessage *error = MultiTaskRouterSelectQuerySupported(subquery);
612 	if (error)
613 	{
614 		return error;
615 	}
616 
617 	if (IsCitusTableType(targetRelationId, CITUS_LOCAL_TABLE))
618 	{
619 		/*
620 		 * If we're inserting into a citus local table, it is ok because we've
621 		 * checked the non-existence of distributed tables in the subquery.
622 		 */
623 	}
624 	else if (IsCitusTableType(targetRelationId, REFERENCE_TABLE))
625 	{
626 		/*
627 		 * If we're inserting into a reference table, all participating tables
628 		 * should be reference tables as well.
629 		 */
630 		if (!allReferenceTables)
631 		{
632 			return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
633 								 "only reference tables may be queried when targeting "
634 								 "a reference table with distributed INSERT ... SELECT",
635 								 NULL, NULL);
636 		}
637 	}
638 	else
639 	{
640 		/* ensure that INSERT's partition column comes from SELECT's partition column */
641 		error = InsertPartitionColumnMatchesSelect(queryTree, insertRte, subqueryRte,
642 												   &selectPartitionColumnTableId);
643 		if (error)
644 		{
645 			return error;
646 		}
647 
648 		/*
649 		 * We expect partition column values come from colocated tables. Note that we
650 		 * skip this check from the reference table case given that all reference tables
651 		 * are already (and by default) co-located.
652 		 */
653 		if (!TablesColocated(insertRte->relid, selectPartitionColumnTableId))
654 		{
655 			return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
656 								 "INSERT target table and the source relation of the SELECT partition "
657 								 "column value must be colocated in distributed INSERT ... SELECT",
658 								 NULL, NULL);
659 		}
660 	}
661 
662 	return NULL;
663 }
664 
665 
666 /*
667  * RouterModifyTaskForShardInterval creates a modify task by
668  * replacing the partitioning qual parameter added in distributed_planner()
669  * with the shardInterval's boundary value. Then perform the normal
670  * shard pruning on the subquery. Finally, checks if the target shardInterval
671  * has exactly same placements with the select task's available anchor
672  * placements.
673  *
674  * The function errors out if the subquery is not router select query (i.e.,
675  * subqueries with non equi-joins.).
676  */
677 static Task *
RouterModifyTaskForShardInterval(Query * originalQuery,CitusTableCacheEntry * targetTableCacheEntry,ShardInterval * shardInterval,PlannerRestrictionContext * plannerRestrictionContext,uint32 taskIdIndex,bool safeToPushdownSubquery,DeferredErrorMessage ** routerPlannerError)678 RouterModifyTaskForShardInterval(Query *originalQuery,
679 								 CitusTableCacheEntry *targetTableCacheEntry,
680 								 ShardInterval *shardInterval,
681 								 PlannerRestrictionContext *plannerRestrictionContext,
682 								 uint32 taskIdIndex,
683 								 bool safeToPushdownSubquery,
684 								 DeferredErrorMessage **routerPlannerError)
685 {
686 	Query *copiedQuery = copyObject(originalQuery);
687 	RangeTblEntry *copiedInsertRte = ExtractResultRelationRTEOrError(copiedQuery);
688 	RangeTblEntry *copiedSubqueryRte = ExtractSelectRangeTableEntry(copiedQuery);
689 	Query *copiedSubquery = (Query *) copiedSubqueryRte->subquery;
690 
691 	uint64 shardId = shardInterval->shardId;
692 	Oid distributedTableId = shardInterval->relationId;
693 
694 	PlannerRestrictionContext *copyOfPlannerRestrictionContext = palloc0(
695 		sizeof(PlannerRestrictionContext));
696 
697 	StringInfo queryString = makeStringInfo();
698 	ListCell *restrictionCell = NULL;
699 	List *selectPlacementList = NIL;
700 	uint64 selectAnchorShardId = INVALID_SHARD_ID;
701 	List *relationShardList = NIL;
702 	List *prunedShardIntervalListList = NIL;
703 	uint64 jobId = INVALID_JOB_ID;
704 	bool allReferenceTables =
705 		plannerRestrictionContext->relationRestrictionContext->allReferenceTables;
706 	List *shardOpExpressions = NIL;
707 	RestrictInfo *shardRestrictionList = NULL;
708 	bool multiShardModifyQuery = false;
709 	List *relationRestrictionList = NIL;
710 
711 	copyOfPlannerRestrictionContext->relationRestrictionContext =
712 		CopyRelationRestrictionContext(
713 			plannerRestrictionContext->relationRestrictionContext);
714 	copyOfPlannerRestrictionContext->joinRestrictionContext =
715 		plannerRestrictionContext->joinRestrictionContext;
716 	copyOfPlannerRestrictionContext->fastPathRestrictionContext =
717 		plannerRestrictionContext->fastPathRestrictionContext;
718 
719 	relationRestrictionList =
720 		copyOfPlannerRestrictionContext->relationRestrictionContext->
721 		relationRestrictionList;
722 
723 	/* grab shared metadata lock to stop concurrent placement additions */
724 	LockShardDistributionMetadata(shardId, ShareLock);
725 
726 	/*
727 	 * Replace the partitioning qual parameter value in all baserestrictinfos.
728 	 * Note that this has to be done on a copy, as the walker modifies in place.
729 	 */
730 	foreach(restrictionCell, relationRestrictionList)
731 	{
732 		RelationRestriction *restriction = lfirst(restrictionCell);
733 		List *originalBaseRestrictInfo = restriction->relOptInfo->baserestrictinfo;
734 		List *extendedBaseRestrictInfo = originalBaseRestrictInfo;
735 		Index rteIndex = restriction->index;
736 
737 		if (!safeToPushdownSubquery || allReferenceTables)
738 		{
739 			continue;
740 		}
741 
742 		shardOpExpressions = ShardIntervalOpExpressions(shardInterval, rteIndex);
743 
744 		/* means it is a reference table and do not add any shard interval information  */
745 		if (shardOpExpressions == NIL)
746 		{
747 			continue;
748 		}
749 
750 
751 		/*
752 		 * passing NULL for plannerInfo will be problematic if we have placeholder
753 		 * vars. However, it won't be the case here because we are building
754 		 * the expression from shard intervals which don't have placeholder vars.
755 		 * Note that this is only the case with PG14 as the parameter doesn't exist
756 		 * prior to that.
757 		 */
758 		shardRestrictionList = make_simple_restrictinfo_compat(NULL,
759 															   (Expr *) shardOpExpressions);
760 		extendedBaseRestrictInfo = lappend(extendedBaseRestrictInfo,
761 										   shardRestrictionList);
762 
763 		restriction->relOptInfo->baserestrictinfo = extendedBaseRestrictInfo;
764 	}
765 
766 	/*
767 	 * We also need to add shard interval range to the subquery in case
768 	 * the partition qual not distributed all tables such as some
769 	 * subqueries in WHERE clause.
770 	 *
771 	 * Note that we need to add the ranges before the shard pruning to
772 	 * prevent shard pruning logic (i.e, namely UpdateRelationNames())
773 	 * modifies range table entries, which makes hard to add the quals.
774 	 */
775 	RTEListProperties *subqueryRteListProperties = GetRTEListPropertiesForQuery(
776 		copiedSubquery);
777 	if (subqueryRteListProperties->hasDistributedTable)
778 	{
779 		AddPartitionKeyNotNullFilterToSelect(copiedSubquery);
780 	}
781 
782 	/* mark that we don't want the router planner to generate dummy hosts/queries */
783 	bool replacePrunedQueryWithDummy = false;
784 
785 	/*
786 	 * Use router planner to decide on whether we can push down the query or not.
787 	 * If we can, we also rely on the side-effects that all RTEs have been updated
788 	 * to point to the relevant nodes and selectPlacementList is determined.
789 	 */
790 	DeferredErrorMessage *planningError = PlanRouterQuery(copiedSubquery,
791 														  copyOfPlannerRestrictionContext,
792 														  &selectPlacementList,
793 														  &selectAnchorShardId,
794 														  &relationShardList,
795 														  &prunedShardIntervalListList,
796 														  replacePrunedQueryWithDummy,
797 														  &multiShardModifyQuery, NULL,
798 														  false);
799 
800 	Assert(!multiShardModifyQuery);
801 
802 	if (planningError)
803 	{
804 		*routerPlannerError = planningError;
805 		return NULL;
806 	}
807 
808 
809 	/* ensure that we do not send queries where select is pruned away completely */
810 	if (list_length(selectPlacementList) == 0)
811 	{
812 		ereport(DEBUG2, (errmsg("Skipping target shard interval " UINT64_FORMAT
813 								" since SELECT query for it pruned away",
814 								shardId)));
815 
816 		return NULL;
817 	}
818 
819 	/* get the placements for insert target shard and its intersection with select */
820 	List *insertShardPlacementList = ActiveShardPlacementList(shardId);
821 	List *intersectedPlacementList = IntersectPlacementList(insertShardPlacementList,
822 															selectPlacementList);
823 
824 	/*
825 	 * If insert target does not have exactly the same placements with the select,
826 	 * we sholdn't run the query.
827 	 */
828 	if (list_length(insertShardPlacementList) != list_length(intersectedPlacementList))
829 	{
830 		ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
831 						errmsg("cannot perform distributed planning for the given "
832 							   "modification"),
833 						errdetail("Insert query cannot be executed on all placements "
834 								  "for shard " UINT64_FORMAT "", shardId)));
835 	}
836 
837 
838 	/* this is required for correct deparsing of the query */
839 	ReorderInsertSelectTargetLists(copiedQuery, copiedInsertRte, copiedSubqueryRte);
840 
841 	/* setting an alias simplifies deparsing of RETURNING */
842 	if (copiedInsertRte->alias == NULL)
843 	{
844 		Alias *alias = makeAlias(CITUS_TABLE_ALIAS, NIL);
845 		copiedInsertRte->alias = alias;
846 	}
847 
848 	/* and generate the full query string */
849 	deparse_shard_query(copiedQuery, distributedTableId, shardInterval->shardId,
850 						queryString);
851 	ereport(DEBUG2, (errmsg("distributed statement: %s",
852 							ApplyLogRedaction(queryString->data))));
853 
854 	Task *modifyTask = CreateBasicTask(jobId, taskIdIndex, MODIFY_TASK,
855 									   queryString->data);
856 	modifyTask->dependentTaskList = NULL;
857 	modifyTask->anchorShardId = shardId;
858 	modifyTask->taskPlacementList = insertShardPlacementList;
859 	modifyTask->relationShardList = relationShardList;
860 	modifyTask->replicationModel = targetTableCacheEntry->replicationModel;
861 
862 	return modifyTask;
863 }
864 
865 
866 /*
867  * ReorderInsertSelectTargetLists reorders the target lists of INSERT/SELECT
868  * query which is required for deparsing purposes. The reordered query is returned.
869  *
870  * The necessity for this function comes from the fact that ruleutils.c is not supposed
871  * to be used on "rewritten" queries (i.e. ones that have been passed through
872  * QueryRewrite()). Query rewriting is the process in which views and such are expanded,
873  * and, INSERT/UPDATE targetlists are reordered to match the physical order,
874  * defaults etc. For the details of reordeing, see transformInsertRow() and
875  * rewriteTargetListIU().
876  */
877 Query *
ReorderInsertSelectTargetLists(Query * originalQuery,RangeTblEntry * insertRte,RangeTblEntry * subqueryRte)878 ReorderInsertSelectTargetLists(Query *originalQuery, RangeTblEntry *insertRte,
879 							   RangeTblEntry *subqueryRte)
880 {
881 	ListCell *insertTargetEntryCell;
882 	List *newSubqueryTargetlist = NIL;
883 	List *newInsertTargetlist = NIL;
884 	int resno = 1;
885 	Index insertTableId = 1;
886 	int targetEntryIndex = 0;
887 
888 	AssertArg(InsertSelectIntoCitusTable(originalQuery));
889 
890 	Query *subquery = subqueryRte->subquery;
891 
892 	Oid insertRelationId = insertRte->relid;
893 
894 	/*
895 	 * We implement the following algorithm for the reoderding:
896 	 *  - Iterate over the INSERT target list entries
897 	 *    - If the target entry includes a Var, find the corresponding
898 	 *      SELECT target entry on the original query and update resno
899 	 *    - If the target entry does not include a Var (i.e., defaults
900 	 *      or constants), create new target entry and add that to
901 	 *      SELECT target list
902 	 *    - Create a new INSERT target entry with respect to the new
903 	 *      SELECT target entry created.
904 	 */
905 	foreach(insertTargetEntryCell, originalQuery->targetList)
906 	{
907 		TargetEntry *oldInsertTargetEntry = lfirst(insertTargetEntryCell);
908 		TargetEntry *newSubqueryTargetEntry = NULL;
909 		AttrNumber originalAttrNo = get_attnum(insertRelationId,
910 											   oldInsertTargetEntry->resname);
911 
912 		/* see transformInsertRow() for the details */
913 		if (IsA(oldInsertTargetEntry->expr, ArrayRef) ||
914 			IsA(oldInsertTargetEntry->expr, FieldStore))
915 		{
916 			ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE),
917 							errmsg(
918 								"cannot plan distributed INSERT INTO ... SELECT query"),
919 							errhint("Do not use array references and field stores "
920 									"on the INSERT target list.")));
921 		}
922 
923 		/*
924 		 * It is safe to pull Var clause and ignore the coercions since that
925 		 * are already going to be added on the workers implicitly.
926 		 */
927 		List *targetVarList = pull_var_clause((Node *) oldInsertTargetEntry->expr,
928 											  PVC_RECURSE_AGGREGATES);
929 
930 		int targetVarCount = list_length(targetVarList);
931 
932 		/* a single INSERT target entry cannot have more than one Var */
933 		Assert(targetVarCount <= 1);
934 
935 		if (targetVarCount == 1)
936 		{
937 			Var *oldInsertVar = (Var *) linitial(targetVarList);
938 			TargetEntry *oldSubqueryTle = list_nth(subquery->targetList,
939 												   oldInsertVar->varattno - 1);
940 
941 			newSubqueryTargetEntry = copyObject(oldSubqueryTle);
942 
943 			newSubqueryTargetEntry->resno = resno;
944 			newSubqueryTargetlist = lappend(newSubqueryTargetlist,
945 											newSubqueryTargetEntry);
946 		}
947 		else
948 		{
949 			newSubqueryTargetEntry = makeTargetEntry(oldInsertTargetEntry->expr,
950 													 resno,
951 													 oldInsertTargetEntry->resname,
952 													 oldInsertTargetEntry->resjunk);
953 			newSubqueryTargetlist = lappend(newSubqueryTargetlist,
954 											newSubqueryTargetEntry);
955 		}
956 
957 		/*
958 		 * The newly created select target entry cannot be a junk entry since junk
959 		 * entries are not in the final target list and we're processing the
960 		 * final target list entries.
961 		 */
962 		Assert(!newSubqueryTargetEntry->resjunk);
963 
964 		Var *newInsertVar = makeVar(insertTableId, originalAttrNo,
965 									exprType((Node *) newSubqueryTargetEntry->expr),
966 									exprTypmod((Node *) newSubqueryTargetEntry->expr),
967 									exprCollation((Node *) newSubqueryTargetEntry->expr),
968 									0);
969 		TargetEntry *newInsertTargetEntry = makeTargetEntry((Expr *) newInsertVar,
970 															originalAttrNo,
971 															oldInsertTargetEntry->resname,
972 															oldInsertTargetEntry->resjunk);
973 
974 		newInsertTargetlist = lappend(newInsertTargetlist, newInsertTargetEntry);
975 		resno++;
976 	}
977 
978 	/*
979 	 * if there are any remaining target list entries (i.e., GROUP BY column not on the
980 	 * target list of subquery), update the remaining resnos.
981 	 */
982 	int subqueryTargetLength = list_length(subquery->targetList);
983 	for (; targetEntryIndex < subqueryTargetLength; ++targetEntryIndex)
984 	{
985 		TargetEntry *oldSubqueryTle = list_nth(subquery->targetList,
986 											   targetEntryIndex);
987 
988 		/*
989 		 * Skip non-junk entries since we've already processed them above and this
990 		 * loop only is intended for junk entries.
991 		 */
992 		if (!oldSubqueryTle->resjunk)
993 		{
994 			continue;
995 		}
996 
997 		TargetEntry *newSubqueryTargetEntry = copyObject(oldSubqueryTle);
998 
999 		newSubqueryTargetEntry->resno = resno;
1000 		newSubqueryTargetlist = lappend(newSubqueryTargetlist,
1001 										newSubqueryTargetEntry);
1002 
1003 		resno++;
1004 	}
1005 
1006 	originalQuery->targetList = newInsertTargetlist;
1007 	subquery->targetList = newSubqueryTargetlist;
1008 
1009 	return NULL;
1010 }
1011 
1012 
1013 /*
1014  * MultiTaskRouterSelectQuerySupported returns NULL if the query may be used
1015  * as the source for an INSERT ... SELECT or returns a description why not.
1016  */
1017 static DeferredErrorMessage *
MultiTaskRouterSelectQuerySupported(Query * query)1018 MultiTaskRouterSelectQuerySupported(Query *query)
1019 {
1020 	List *queryList = NIL;
1021 	ListCell *queryCell = NULL;
1022 	StringInfo errorDetail = NULL;
1023 	bool hasUnsupportedDistinctOn = false;
1024 
1025 	ExtractQueryWalker((Node *) query, &queryList);
1026 	foreach(queryCell, queryList)
1027 	{
1028 		Query *subquery = (Query *) lfirst(queryCell);
1029 
1030 		Assert(subquery->commandType == CMD_SELECT);
1031 
1032 		/* pushing down rtes without relations yields (shardCount * expectedRows) */
1033 		if (HasEmptyJoinTree(subquery))
1034 		{
1035 			return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
1036 								 "Subqueries without relations are not allowed in "
1037 								 "distributed INSERT ... SELECT queries",
1038 								 NULL, NULL);
1039 		}
1040 
1041 		/* pushing down limit per shard would yield wrong results */
1042 		if (subquery->limitCount != NULL)
1043 		{
1044 			return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
1045 								 "LIMIT clauses are not allowed in distributed INSERT "
1046 								 "... SELECT queries",
1047 								 NULL, NULL);
1048 		}
1049 
1050 		/* pushing down limit offest per shard would yield wrong results */
1051 		if (subquery->limitOffset != NULL)
1052 		{
1053 			return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
1054 								 "OFFSET clauses are not allowed in distributed "
1055 								 "INSERT ... SELECT queries",
1056 								 NULL, NULL);
1057 		}
1058 
1059 		/* group clause list must include partition column */
1060 		if (subquery->groupClause)
1061 		{
1062 			List *groupClauseList = subquery->groupClause;
1063 			List *targetEntryList = subquery->targetList;
1064 			List *groupTargetEntryList = GroupTargetEntryList(groupClauseList,
1065 															  targetEntryList);
1066 			bool groupOnPartitionColumn = TargetListOnPartitionColumn(subquery,
1067 																	  groupTargetEntryList);
1068 			if (!groupOnPartitionColumn)
1069 			{
1070 				return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
1071 									 "Group by list without distribution column is "
1072 									 "not allowed  in distributed INSERT ... "
1073 									 "SELECT queries",
1074 									 NULL, NULL);
1075 			}
1076 		}
1077 
1078 		/*
1079 		 * We support window functions when the window function
1080 		 * is partitioned on distribution column.
1081 		 */
1082 		if (subquery->windowClause && !SafeToPushdownWindowFunction(subquery,
1083 																	&errorDetail))
1084 		{
1085 			return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, errorDetail->data, NULL,
1086 								 NULL);
1087 		}
1088 
1089 		if (subquery->setOperations != NULL)
1090 		{
1091 			return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
1092 								 "Set operations are not allowed in distributed "
1093 								 "INSERT ... SELECT queries",
1094 								 NULL, NULL);
1095 		}
1096 
1097 		/*
1098 		 * We currently do not support grouping sets since it could generate NULL
1099 		 * results even after the restrictions are applied to the query. A solution
1100 		 * would be to add the whole query into a subquery and add the restrictions
1101 		 * on that subquery.
1102 		 */
1103 		if (subquery->groupingSets != NULL)
1104 		{
1105 			return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
1106 								 "grouping sets are not allowed in distributed "
1107 								 "INSERT ... SELECT queries",
1108 								 NULL, NULL);
1109 		}
1110 
1111 		/*
1112 		 * We don't support DISTINCT ON clauses on non-partition columns.
1113 		 */
1114 		hasUnsupportedDistinctOn = HasUnsupportedDistinctOn(subquery);
1115 		if (hasUnsupportedDistinctOn)
1116 		{
1117 			return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
1118 								 "DISTINCT ON (non-partition column) clauses are not "
1119 								 "allowed in distributed INSERT ... SELECT queries",
1120 								 NULL, NULL);
1121 		}
1122 	}
1123 
1124 	return NULL;
1125 }
1126 
1127 
1128 /*
1129  * HasUnsupportedDistinctOn returns true if the query has distinct on and
1130  * distinct targets do not contain partition column.
1131  */
1132 static bool
HasUnsupportedDistinctOn(Query * query)1133 HasUnsupportedDistinctOn(Query *query)
1134 {
1135 	ListCell *distinctCell = NULL;
1136 
1137 	if (!query->hasDistinctOn)
1138 	{
1139 		return false;
1140 	}
1141 
1142 	foreach(distinctCell, query->distinctClause)
1143 	{
1144 		SortGroupClause *distinctClause = lfirst(distinctCell);
1145 		TargetEntry *distinctEntry = get_sortgroupclause_tle(distinctClause,
1146 															 query->targetList);
1147 
1148 		bool skipOuterVars = true;
1149 		if (IsPartitionColumn(distinctEntry->expr, query, skipOuterVars))
1150 		{
1151 			return false;
1152 		}
1153 	}
1154 
1155 	return true;
1156 }
1157 
1158 
1159 /*
1160  * InsertPartitionColumnMatchesSelect returns NULL the partition column in the
1161  * table targeted by INSERTed matches with the any of the SELECTed table's
1162  * partition column.  Returns the error description if there's no match.
1163  *
1164  * On return without error (i.e., if partition columns match), the function
1165  * also sets selectPartitionColumnTableId.
1166  */
1167 static DeferredErrorMessage *
InsertPartitionColumnMatchesSelect(Query * query,RangeTblEntry * insertRte,RangeTblEntry * subqueryRte,Oid * selectPartitionColumnTableId)1168 InsertPartitionColumnMatchesSelect(Query *query, RangeTblEntry *insertRte,
1169 								   RangeTblEntry *subqueryRte,
1170 								   Oid *selectPartitionColumnTableId)
1171 {
1172 	ListCell *targetEntryCell = NULL;
1173 	uint32 rangeTableId = 1;
1174 	Oid insertRelationId = insertRte->relid;
1175 	Var *insertPartitionColumn = PartitionColumn(insertRelationId, rangeTableId);
1176 	Query *subquery = subqueryRte->subquery;
1177 	bool targetTableHasPartitionColumn = false;
1178 
1179 	foreach(targetEntryCell, query->targetList)
1180 	{
1181 		TargetEntry *targetEntry = (TargetEntry *) lfirst(targetEntryCell);
1182 		List *insertTargetEntryColumnList = pull_var_clause_default((Node *) targetEntry);
1183 		Var *subqueryPartitionColumn = NULL;
1184 
1185 		/*
1186 		 * We only consider target entries that include a single column. Note that this
1187 		 * is slightly different than directly checking the whether the targetEntry->expr
1188 		 * is a var since the var could be wrapped into an implicit/explicit casting.
1189 		 *
1190 		 * Also note that we skip the target entry if it does not contain a Var, which
1191 		 * corresponds to columns with DEFAULT values on the target list.
1192 		 */
1193 		if (list_length(insertTargetEntryColumnList) != 1)
1194 		{
1195 			continue;
1196 		}
1197 
1198 		Var *insertVar = (Var *) linitial(insertTargetEntryColumnList);
1199 		AttrNumber originalAttrNo = targetEntry->resno;
1200 
1201 		/* skip processing of target table non-partition columns */
1202 		if (originalAttrNo != insertPartitionColumn->varattno)
1203 		{
1204 			continue;
1205 		}
1206 
1207 		/* INSERT query includes the partition column */
1208 		targetTableHasPartitionColumn = true;
1209 
1210 		TargetEntry *subqueryTargetEntry = list_nth(subquery->targetList,
1211 													insertVar->varattno - 1);
1212 		Expr *selectTargetExpr = subqueryTargetEntry->expr;
1213 
1214 		RangeTblEntry *subqueryPartitionColumnRelationIdRTE = NULL;
1215 		List *parentQueryList = list_make2(query, subquery);
1216 		bool skipOuterVars = true;
1217 		FindReferencedTableColumn(selectTargetExpr,
1218 								  parentQueryList, subquery,
1219 								  &subqueryPartitionColumn,
1220 								  &subqueryPartitionColumnRelationIdRTE,
1221 								  skipOuterVars);
1222 		Oid subqueryPartitionColumnRelationId = subqueryPartitionColumnRelationIdRTE ?
1223 												subqueryPartitionColumnRelationIdRTE->
1224 												relid :
1225 												InvalidOid;
1226 
1227 		/*
1228 		 * Corresponding (i.e., in the same ordinal position as the target table's
1229 		 * partition column) select target entry does not directly belong a table.
1230 		 * Evaluate its expression type and error out properly.
1231 		 */
1232 		if (subqueryPartitionColumnRelationId == InvalidOid)
1233 		{
1234 			char *errorDetailTemplate = "Subquery contains %s in the "
1235 										"same position as the target table's "
1236 										"partition column.";
1237 
1238 			char *exprDescription = "";
1239 
1240 			switch (selectTargetExpr->type)
1241 			{
1242 				case T_Const:
1243 				{
1244 					exprDescription = "a constant value";
1245 					break;
1246 				}
1247 
1248 				case T_OpExpr:
1249 				{
1250 					exprDescription = "an operator";
1251 					break;
1252 				}
1253 
1254 				case T_FuncExpr:
1255 				{
1256 					FuncExpr *subqueryFunctionExpr = (FuncExpr *) selectTargetExpr;
1257 
1258 					switch (subqueryFunctionExpr->funcformat)
1259 					{
1260 						case COERCE_EXPLICIT_CALL:
1261 						{
1262 							exprDescription = "a function call";
1263 							break;
1264 						}
1265 
1266 						case COERCE_EXPLICIT_CAST:
1267 						{
1268 							exprDescription = "an explicit cast";
1269 							break;
1270 						}
1271 
1272 						case COERCE_IMPLICIT_CAST:
1273 						{
1274 							exprDescription = "an implicit cast";
1275 							break;
1276 						}
1277 
1278 						default:
1279 						{
1280 							exprDescription = "a function call";
1281 							break;
1282 						}
1283 					}
1284 					break;
1285 				}
1286 
1287 				case T_Aggref:
1288 				{
1289 					exprDescription = "an aggregation";
1290 					break;
1291 				}
1292 
1293 				case T_CaseExpr:
1294 				{
1295 					exprDescription = "a case expression";
1296 					break;
1297 				}
1298 
1299 				case T_CoalesceExpr:
1300 				{
1301 					exprDescription = "a coalesce expression";
1302 					break;
1303 				}
1304 
1305 				case T_RowExpr:
1306 				{
1307 					exprDescription = "a row expression";
1308 					break;
1309 				}
1310 
1311 				case T_MinMaxExpr:
1312 				{
1313 					exprDescription = "a min/max expression";
1314 					break;
1315 				}
1316 
1317 				case T_CoerceViaIO:
1318 				{
1319 					exprDescription = "an explicit coercion";
1320 					break;
1321 				}
1322 
1323 				default:
1324 				{
1325 					exprDescription =
1326 						"an expression that is not a simple column reference";
1327 					break;
1328 				}
1329 			}
1330 
1331 			return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
1332 								 "cannot perform distributed INSERT INTO ... SELECT "
1333 								 "because the partition columns in the source table "
1334 								 "and subquery do not match",
1335 								 psprintf(errorDetailTemplate, exprDescription),
1336 								 "Ensure the target table's partition column has a "
1337 								 "corresponding simple column reference to a distributed "
1338 								 "table's partition column in the subquery.");
1339 		}
1340 
1341 		/*
1342 		 * Insert target expression could only be non-var if the select target
1343 		 * entry does not have the same type (i.e., target column requires casting).
1344 		 */
1345 		if (!IsA(targetEntry->expr, Var))
1346 		{
1347 			return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
1348 								 "cannot perform distributed INSERT INTO ... SELECT "
1349 								 "because the partition columns in the source table "
1350 								 "and subquery do not match",
1351 								 "The data type of the target table's partition column "
1352 								 "should exactly match the data type of the "
1353 								 "corresponding simple column reference in the subquery.",
1354 								 NULL);
1355 		}
1356 
1357 		/* finally, check that the select target column is a partition column */
1358 		if (!IsPartitionColumn(selectTargetExpr, subquery, skipOuterVars))
1359 		{
1360 			return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
1361 								 "cannot perform distributed INSERT INTO ... SELECT "
1362 								 "because the partition columns in the source table "
1363 								 "and subquery do not match",
1364 								 "The target table's partition column should correspond "
1365 								 "to a partition column in the subquery.",
1366 								 NULL);
1367 		}
1368 
1369 		/* finally, check that the select target column is a partition column */
1370 		/* we can set the select relation id */
1371 		*selectPartitionColumnTableId = subqueryPartitionColumnRelationId;
1372 
1373 		break;
1374 	}
1375 
1376 	if (!targetTableHasPartitionColumn)
1377 	{
1378 		return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
1379 							 "cannot perform distributed INSERT INTO ... SELECT "
1380 							 "because the partition columns in the source table "
1381 							 "and subquery do not match",
1382 							 "the query doesn't include the target table's "
1383 							 "partition column",
1384 							 NULL);
1385 	}
1386 
1387 	return NULL;
1388 }
1389 
1390 
1391 /*
1392  * CreateNonPushableInsertSelectPlan creates a query plan for a SELECT into a
1393  * distributed table. The query plan can also be executed on a worker in MX.
1394  */
1395 static DistributedPlan *
CreateNonPushableInsertSelectPlan(uint64 planId,Query * parse,ParamListInfo boundParams)1396 CreateNonPushableInsertSelectPlan(uint64 planId, Query *parse, ParamListInfo boundParams)
1397 {
1398 	Query *insertSelectQuery = copyObject(parse);
1399 
1400 	RangeTblEntry *selectRte = ExtractSelectRangeTableEntry(insertSelectQuery);
1401 	RangeTblEntry *insertRte = ExtractResultRelationRTEOrError(insertSelectQuery);
1402 	Oid targetRelationId = insertRte->relid;
1403 
1404 	DistributedPlan *distributedPlan = CitusMakeNode(DistributedPlan);
1405 	distributedPlan->modLevel = RowModifyLevelForQuery(insertSelectQuery);
1406 
1407 	distributedPlan->planningError =
1408 		NonPushableInsertSelectSupported(insertSelectQuery);
1409 
1410 	if (distributedPlan->planningError != NULL)
1411 	{
1412 		return distributedPlan;
1413 	}
1414 
1415 	Query *selectQuery = BuildSelectForInsertSelect(insertSelectQuery);
1416 
1417 	selectRte->subquery = selectQuery;
1418 	ReorderInsertSelectTargetLists(insertSelectQuery, insertRte, selectRte);
1419 
1420 	/*
1421 	 * Cast types of insert target list and select projection list to
1422 	 * match the column types of the target relation.
1423 	 */
1424 	selectQuery->targetList =
1425 		AddInsertSelectCasts(insertSelectQuery->targetList,
1426 							 selectQuery->targetList,
1427 							 targetRelationId);
1428 
1429 	/*
1430 	 * Later we might need to call WrapTaskListForProjection(), which requires
1431 	 * that select target list has unique names, otherwise the outer query
1432 	 * cannot select columns unambiguously. So we relabel select columns to
1433 	 * match target columns.
1434 	 */
1435 	List *insertTargetList = insertSelectQuery->targetList;
1436 	RelabelTargetEntryList(selectQuery->targetList, insertTargetList);
1437 
1438 	/*
1439 	 * Make a copy of the select query, since following code scribbles it
1440 	 * but we need to keep the original for EXPLAIN.
1441 	 */
1442 	Query *selectQueryCopy = copyObject(selectQuery);
1443 
1444 	/* plan the subquery, this may be another distributed query */
1445 	int cursorOptions = CURSOR_OPT_PARALLEL_OK;
1446 	PlannedStmt *selectPlan = pg_plan_query_compat(selectQueryCopy, NULL, cursorOptions,
1447 												   boundParams);
1448 
1449 	bool repartitioned = IsRedistributablePlan(selectPlan->planTree) &&
1450 						 IsSupportedRedistributionTarget(targetRelationId);
1451 
1452 	distributedPlan->insertSelectQuery = insertSelectQuery;
1453 	distributedPlan->selectPlanForInsertSelect = selectPlan;
1454 	distributedPlan->insertSelectMethod = repartitioned ?
1455 										  INSERT_SELECT_REPARTITION :
1456 										  INSERT_SELECT_VIA_COORDINATOR;
1457 	distributedPlan->expectResults = insertSelectQuery->returningList != NIL;
1458 	distributedPlan->intermediateResultIdPrefix = InsertSelectResultIdPrefix(planId);
1459 	distributedPlan->targetRelationId = targetRelationId;
1460 
1461 	return distributedPlan;
1462 }
1463 
1464 
1465 /*
1466  * NonPushableInsertSelectSupported returns an error if executing an
1467  * INSERT ... SELECT command by pulling results of the SELECT to the coordinator
1468  * or with repartitioning is unsupported because it needs to generate sequence
1469  * values or insert into an append-distributed table.
1470  */
1471 static DeferredErrorMessage *
NonPushableInsertSelectSupported(Query * insertSelectQuery)1472 NonPushableInsertSelectSupported(Query *insertSelectQuery)
1473 {
1474 	DeferredErrorMessage *deferredError = ErrorIfOnConflictNotSupported(
1475 		insertSelectQuery);
1476 	if (deferredError)
1477 	{
1478 		return deferredError;
1479 	}
1480 
1481 	RangeTblEntry *insertRte = ExtractResultRelationRTE(insertSelectQuery);
1482 	if (IsCitusTableType(insertRte->relid, APPEND_DISTRIBUTED))
1483 	{
1484 		return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
1485 							 "INSERT ... SELECT into an append-distributed table is "
1486 							 "not supported", NULL, NULL);
1487 	}
1488 
1489 	return NULL;
1490 }
1491 
1492 
1493 /*
1494  * InsertSelectResultPrefix returns the prefix to use for intermediate
1495  * results of an INSERT ... SELECT via the coordinator that runs in two
1496  * phases in order to do RETURNING or ON CONFLICT.
1497  */
1498 char *
InsertSelectResultIdPrefix(uint64 planId)1499 InsertSelectResultIdPrefix(uint64 planId)
1500 {
1501 	StringInfo resultIdPrefix = makeStringInfo();
1502 
1503 	appendStringInfo(resultIdPrefix, "insert_select_" UINT64_FORMAT, planId);
1504 
1505 	return resultIdPrefix->data;
1506 }
1507 
1508 
1509 /*
1510  * RelabelTargetEntryList relabels select target list to have matching names with
1511  * insert target list.
1512  */
1513 static void
RelabelTargetEntryList(List * selectTargetList,List * insertTargetList)1514 RelabelTargetEntryList(List *selectTargetList, List *insertTargetList)
1515 {
1516 	ListCell *selectTargetCell = NULL;
1517 	ListCell *insertTargetCell = NULL;
1518 
1519 	forboth(selectTargetCell, selectTargetList, insertTargetCell, insertTargetList)
1520 	{
1521 		TargetEntry *selectTargetEntry = lfirst(selectTargetCell);
1522 		TargetEntry *insertTargetEntry = lfirst(insertTargetCell);
1523 
1524 		selectTargetEntry->resname = insertTargetEntry->resname;
1525 	}
1526 }
1527 
1528 
1529 /*
1530  * AddInsertSelectCasts makes sure that the types in columns in the given
1531  * target lists have the same type as the columns of the given relation.
1532  * It might add casts to ensure that.
1533  *
1534  * It returns the updated selectTargetList.
1535  */
1536 static List *
AddInsertSelectCasts(List * insertTargetList,List * selectTargetList,Oid targetRelationId)1537 AddInsertSelectCasts(List *insertTargetList, List *selectTargetList,
1538 					 Oid targetRelationId)
1539 {
1540 	ListCell *insertEntryCell = NULL;
1541 	ListCell *selectEntryCell = NULL;
1542 	List *projectedEntries = NIL;
1543 	List *nonProjectedEntries = NIL;
1544 
1545 	/*
1546 	 * ReorderInsertSelectTargetLists() makes sure that first few columns of
1547 	 * the SELECT query match the insert targets. It might contain additional
1548 	 * items for GROUP BY, etc.
1549 	 */
1550 	Assert(list_length(insertTargetList) <= list_length(selectTargetList));
1551 
1552 	Relation distributedRelation = table_open(targetRelationId, RowExclusiveLock);
1553 	TupleDesc destTupleDescriptor = RelationGetDescr(distributedRelation);
1554 
1555 	int targetEntryIndex = 0;
1556 	forboth(insertEntryCell, insertTargetList, selectEntryCell, selectTargetList)
1557 	{
1558 		TargetEntry *insertEntry = (TargetEntry *) lfirst(insertEntryCell);
1559 		TargetEntry *selectEntry = (TargetEntry *) lfirst(selectEntryCell);
1560 		Var *insertColumn = (Var *) insertEntry->expr;
1561 		Form_pg_attribute attr = TupleDescAttr(destTupleDescriptor,
1562 											   insertEntry->resno - 1);
1563 
1564 		Oid sourceType = insertColumn->vartype;
1565 		Oid targetType = attr->atttypid;
1566 		if (sourceType != targetType)
1567 		{
1568 			insertEntry->expr = CastExpr((Expr *) insertColumn, sourceType, targetType,
1569 										 attr->attcollation, attr->atttypmod);
1570 
1571 			/*
1572 			 * We cannot modify the selectEntry in-place, because ORDER BY or
1573 			 * GROUP BY clauses might be pointing to it with comparison types
1574 			 * of the source type. So instead we keep the original one as a
1575 			 * non-projected entry, so GROUP BY and ORDER BY are happy, and
1576 			 * create a duplicated projected entry with the coerced expression.
1577 			 */
1578 			TargetEntry *coercedEntry = copyObject(selectEntry);
1579 			coercedEntry->expr = CastExpr((Expr *) selectEntry->expr, sourceType,
1580 										  targetType, attr->attcollation,
1581 										  attr->atttypmod);
1582 			coercedEntry->ressortgroupref = 0;
1583 
1584 			/*
1585 			 * The only requirement is that users don't use this name in ORDER BY
1586 			 * or GROUP BY, and it should be unique across the same query.
1587 			 */
1588 			StringInfo resnameString = makeStringInfo();
1589 			appendStringInfo(resnameString, "auto_coerced_by_citus_%d", targetEntryIndex);
1590 			coercedEntry->resname = resnameString->data;
1591 
1592 			projectedEntries = lappend(projectedEntries, coercedEntry);
1593 
1594 			if (selectEntry->ressortgroupref != 0)
1595 			{
1596 				selectEntry->resjunk = true;
1597 
1598 				/*
1599 				 * This entry might still end up in the SELECT output list, so
1600 				 * rename it to avoid ambiguity.
1601 				 *
1602 				 * See https://github.com/citusdata/citus/pull/3470.
1603 				 */
1604 				resnameString = makeStringInfo();
1605 				appendStringInfo(resnameString, "discarded_target_item_%d",
1606 								 targetEntryIndex);
1607 				selectEntry->resname = resnameString->data;
1608 
1609 				nonProjectedEntries = lappend(nonProjectedEntries, selectEntry);
1610 			}
1611 		}
1612 		else
1613 		{
1614 			projectedEntries = lappend(projectedEntries, selectEntry);
1615 		}
1616 
1617 		targetEntryIndex++;
1618 	}
1619 
1620 	for (int entryIndex = list_length(insertTargetList);
1621 		 entryIndex < list_length(selectTargetList);
1622 		 entryIndex++)
1623 	{
1624 		nonProjectedEntries = lappend(nonProjectedEntries, list_nth(selectTargetList,
1625 																	entryIndex));
1626 	}
1627 
1628 	/* selectEntry->resno must be the ordinal number of the entry */
1629 	selectTargetList = list_concat(projectedEntries, nonProjectedEntries);
1630 	int entryResNo = 1;
1631 	TargetEntry *selectTargetEntry = NULL;
1632 	foreach_ptr(selectTargetEntry, selectTargetList)
1633 	{
1634 		selectTargetEntry->resno = entryResNo++;
1635 	}
1636 
1637 	table_close(distributedRelation, NoLock);
1638 
1639 	return selectTargetList;
1640 }
1641 
1642 
1643 /*
1644  * CastExpr returns an expression which casts the given expr from sourceType to
1645  * the given targetType.
1646  */
1647 static Expr *
CastExpr(Expr * expr,Oid sourceType,Oid targetType,Oid targetCollation,int targetTypeMod)1648 CastExpr(Expr *expr, Oid sourceType, Oid targetType, Oid targetCollation,
1649 		 int targetTypeMod)
1650 {
1651 	Oid coercionFuncId = InvalidOid;
1652 	CoercionPathType coercionType = find_coercion_pathway(targetType, sourceType,
1653 														  COERCION_EXPLICIT,
1654 														  &coercionFuncId);
1655 
1656 	if (coercionType == COERCION_PATH_FUNC)
1657 	{
1658 		FuncExpr *coerceExpr = makeNode(FuncExpr);
1659 		coerceExpr->funcid = coercionFuncId;
1660 		coerceExpr->args = list_make1(copyObject(expr));
1661 		coerceExpr->funccollid = targetCollation;
1662 		coerceExpr->funcresulttype = targetType;
1663 
1664 		return (Expr *) coerceExpr;
1665 	}
1666 	else if (coercionType == COERCION_PATH_RELABELTYPE)
1667 	{
1668 		RelabelType *coerceExpr = makeNode(RelabelType);
1669 		coerceExpr->arg = copyObject(expr);
1670 		coerceExpr->resulttype = targetType;
1671 		coerceExpr->resulttypmod = targetTypeMod;
1672 		coerceExpr->resultcollid = targetCollation;
1673 		coerceExpr->relabelformat = COERCE_IMPLICIT_CAST;
1674 		coerceExpr->location = -1;
1675 
1676 		return (Expr *) coerceExpr;
1677 	}
1678 	else if (coercionType == COERCION_PATH_ARRAYCOERCE)
1679 	{
1680 		Oid sourceBaseType = get_base_element_type(sourceType);
1681 		Oid targetBaseType = get_base_element_type(targetType);
1682 
1683 		CaseTestExpr *elemExpr = makeNode(CaseTestExpr);
1684 		elemExpr->collation = targetCollation;
1685 		elemExpr->typeId = sourceBaseType;
1686 		elemExpr->typeMod = -1;
1687 
1688 		Expr *elemCastExpr = CastExpr((Expr *) elemExpr, sourceBaseType,
1689 									  targetBaseType, targetCollation,
1690 									  targetTypeMod);
1691 
1692 		ArrayCoerceExpr *coerceExpr = makeNode(ArrayCoerceExpr);
1693 		coerceExpr->arg = copyObject(expr);
1694 		coerceExpr->elemexpr = elemCastExpr;
1695 		coerceExpr->resultcollid = targetCollation;
1696 		coerceExpr->resulttype = targetType;
1697 		coerceExpr->resulttypmod = targetTypeMod;
1698 		coerceExpr->location = -1;
1699 		coerceExpr->coerceformat = COERCE_IMPLICIT_CAST;
1700 
1701 		return (Expr *) coerceExpr;
1702 	}
1703 	else if (coercionType == COERCION_PATH_COERCEVIAIO)
1704 	{
1705 		CoerceViaIO *coerceExpr = makeNode(CoerceViaIO);
1706 		coerceExpr->arg = (Expr *) copyObject(expr);
1707 		coerceExpr->resulttype = targetType;
1708 		coerceExpr->resultcollid = targetCollation;
1709 		coerceExpr->coerceformat = COERCE_IMPLICIT_CAST;
1710 		coerceExpr->location = -1;
1711 
1712 		return (Expr *) coerceExpr;
1713 	}
1714 	else
1715 	{
1716 		ereport(ERROR, (errmsg("could not find a conversion path from type %d to %d",
1717 							   sourceType, targetType)));
1718 	}
1719 }
1720 
1721 
1722 /* PlanningInsertSelect returns true if we are planning an INSERT ...SELECT query */
1723 bool
PlanningInsertSelect(void)1724 PlanningInsertSelect(void)
1725 {
1726 	return insertSelectPlannerLevel > 0;
1727 }
1728