1 /*-------------------------------------------------------------------------
2 *
3 * insert_select_planner.c
4 *
5 * Planning logic for INSERT..SELECT.
6 *
7 * Copyright (c) Citus Data, Inc.
8 *-------------------------------------------------------------------------
9 */
10
11 #include "postgres.h"
12
13 #include "distributed/pg_version_constants.h"
14
15 #include "catalog/pg_class.h"
16 #include "catalog/pg_type.h"
17 #include "distributed/citus_clauses.h"
18 #include "distributed/citus_ruleutils.h"
19 #include "distributed/colocation_utils.h"
20 #include "distributed/errormessage.h"
21 #include "distributed/listutils.h"
22 #include "distributed/log_utils.h"
23 #include "distributed/insert_select_executor.h"
24 #include "distributed/insert_select_planner.h"
25 #include "distributed/metadata_cache.h"
26 #include "distributed/multi_executor.h"
27 #include "distributed/multi_logical_planner.h"
28 #include "distributed/multi_logical_optimizer.h"
29 #include "distributed/multi_physical_planner.h"
30 #include "distributed/multi_router_planner.h"
31 #include "distributed/pg_dist_partition.h"
32 #include "distributed/query_pushdown_planning.h"
33 #include "distributed/recursive_planning.h"
34 #include "distributed/resource_lock.h"
35 #include "distributed/version_compat.h"
36 #include "nodes/makefuncs.h"
37 #include "nodes/nodeFuncs.h"
38 #include "nodes/parsenodes.h"
39 #include "optimizer/clauses.h"
40 #include "optimizer/planner.h"
41 #include "optimizer/restrictinfo.h"
42 #include "optimizer/tlist.h"
43 #include "optimizer/optimizer.h"
44 #include "parser/parsetree.h"
45 #include "parser/parse_coerce.h"
46 #include "parser/parse_relation.h"
47 #include "tcop/tcopprot.h"
48 #include "utils/builtins.h"
49 #include "utils/lsyscache.h"
50 #include "utils/rel.h"
51
52
53 static DistributedPlan * CreateInsertSelectPlanInternal(uint64 planId,
54 Query *originalQuery,
55 PlannerRestrictionContext *
56 plannerRestrictionContext,
57 ParamListInfo boundParams);
58 static DistributedPlan * CreateDistributedInsertSelectPlan(Query *originalQuery,
59 PlannerRestrictionContext *
60 plannerRestrictionContext);
61 static Task * RouterModifyTaskForShardInterval(Query *originalQuery,
62 CitusTableCacheEntry *targetTableCacheEntry,
63 ShardInterval *shardInterval,
64 PlannerRestrictionContext *
65 plannerRestrictionContext,
66 uint32 taskIdIndex,
67 bool allRelationsJoinedOnPartitionKey,
68 DeferredErrorMessage **routerPlannerError);
69 static Query * CreateCombineQueryForRouterPlan(DistributedPlan *distPlan);
70 static List * CreateTargetListForCombineQuery(List *targetList);
71 static DeferredErrorMessage * DistributedInsertSelectSupported(Query *queryTree,
72 RangeTblEntry *insertRte,
73 RangeTblEntry *subqueryRte,
74 bool allReferenceTables);
75 static DeferredErrorMessage * MultiTaskRouterSelectQuerySupported(Query *query);
76 static bool HasUnsupportedDistinctOn(Query *query);
77 static DeferredErrorMessage * InsertPartitionColumnMatchesSelect(Query *query,
78 RangeTblEntry *insertRte,
79 RangeTblEntry *
80 subqueryRte,
81 Oid *
82 selectPartitionColumnTableId);
83 static DistributedPlan * CreateNonPushableInsertSelectPlan(uint64 planId, Query *parse,
84 ParamListInfo boundParams);
85 static DeferredErrorMessage * NonPushableInsertSelectSupported(Query *insertSelectQuery);
86 static void RelabelTargetEntryList(List *selectTargetList, List *insertTargetList);
87 static List * AddInsertSelectCasts(List *insertTargetList, List *selectTargetList,
88 Oid targetRelationId);
89 static Expr * CastExpr(Expr *expr, Oid sourceType, Oid targetType, Oid targetCollation,
90 int targetTypeMod);
91
92
93 /* depth of current insert/select planner. */
94 static int insertSelectPlannerLevel = 0;
95
96
97 /*
98 * InsertSelectIntoCitusTable returns true when the input query is an
99 * INSERT INTO ... SELECT kind of query and the target is a citus
100 * table.
101 *
102 * Note that the input query should be the original parsetree of
103 * the query (i.e., not passed trough the standard planner).
104 */
105 bool
InsertSelectIntoCitusTable(Query * query)106 InsertSelectIntoCitusTable(Query *query)
107 {
108 bool insertSelectQuery = CheckInsertSelectQuery(query);
109
110 if (insertSelectQuery)
111 {
112 RangeTblEntry *insertRte = ExtractResultRelationRTE(query);
113 if (IsCitusTable(insertRte->relid))
114 {
115 return true;
116 }
117 }
118
119 return false;
120 }
121
122
123 /*
124 * InsertSelectIntoLocalTable checks whether INSERT INTO ... SELECT inserts
125 * into local table. Note that query must be a sample of INSERT INTO ... SELECT
126 * type of query.
127 */
128 bool
InsertSelectIntoLocalTable(Query * query)129 InsertSelectIntoLocalTable(Query *query)
130 {
131 bool insertSelectQuery = CheckInsertSelectQuery(query);
132
133 if (insertSelectQuery)
134 {
135 RangeTblEntry *insertRte = ExtractResultRelationRTE(query);
136 if (!IsCitusTable(insertRte->relid))
137 {
138 return true;
139 }
140 }
141
142 return false;
143 }
144
145
146 /*
147 * CheckInsertSelectQuery returns true when the input query is an INSERT INTO
148 * ... SELECT kind of query.
149 *
150 * This function is inspired from getInsertSelectQuery() on
151 * rewrite/rewriteManip.c.
152 */
153 bool
CheckInsertSelectQuery(Query * query)154 CheckInsertSelectQuery(Query *query)
155 {
156 CmdType commandType = query->commandType;
157
158 if (commandType != CMD_INSERT)
159 {
160 return false;
161 }
162
163 if (query->jointree == NULL || !IsA(query->jointree, FromExpr))
164 {
165 return false;
166 }
167
168 List *fromList = query->jointree->fromlist;
169 if (list_length(fromList) != 1)
170 {
171 return false;
172 }
173
174 RangeTblRef *rangeTableReference = linitial(fromList);
175 if (!IsA(rangeTableReference, RangeTblRef))
176 {
177 return false;
178 }
179
180 RangeTblEntry *subqueryRte = rt_fetch(rangeTableReference->rtindex, query->rtable);
181 if (subqueryRte->rtekind != RTE_SUBQUERY)
182 {
183 return false;
184 }
185
186 /* ensure that there is a query */
187 Assert(IsA(subqueryRte->subquery, Query));
188
189 return true;
190 }
191
192
193 /*
194 * CoordinatorInsertSelectExecScan is a wrapper around
195 * CoordinatorInsertSelectExecScanInternal which also properly increments
196 * or decrements insertSelectExecutorLevel.
197 */
198 DistributedPlan *
CreateInsertSelectPlan(uint64 planId,Query * originalQuery,PlannerRestrictionContext * plannerRestrictionContext,ParamListInfo boundParams)199 CreateInsertSelectPlan(uint64 planId, Query *originalQuery,
200 PlannerRestrictionContext *plannerRestrictionContext,
201 ParamListInfo boundParams)
202 {
203 DistributedPlan *result = NULL;
204 insertSelectPlannerLevel++;
205
206 PG_TRY();
207 {
208 result = CreateInsertSelectPlanInternal(planId, originalQuery,
209 plannerRestrictionContext, boundParams);
210 }
211 PG_CATCH();
212 {
213 insertSelectPlannerLevel--;
214 PG_RE_THROW();
215 }
216 PG_END_TRY();
217
218 insertSelectPlannerLevel--;
219 return result;
220 }
221
222
223 /*
224 * CreateInsertSelectPlan tries to create a distributed plan for an
225 * INSERT INTO distributed_table SELECT ... query by push down the
226 * command to the workers and if that is not possible it creates a
227 * plan for evaluating the SELECT on the coordinator.
228 */
229 static DistributedPlan *
CreateInsertSelectPlanInternal(uint64 planId,Query * originalQuery,PlannerRestrictionContext * plannerRestrictionContext,ParamListInfo boundParams)230 CreateInsertSelectPlanInternal(uint64 planId, Query *originalQuery,
231 PlannerRestrictionContext *plannerRestrictionContext,
232 ParamListInfo boundParams)
233 {
234 DeferredErrorMessage *deferredError = ErrorIfOnConflictNotSupported(originalQuery);
235 if (deferredError != NULL)
236 {
237 /* raising the error as there is no possible solution for the unsupported on conflict statements */
238 RaiseDeferredError(deferredError, ERROR);
239 }
240
241 DistributedPlan *distributedPlan = CreateDistributedInsertSelectPlan(originalQuery,
242 plannerRestrictionContext);
243
244 if (distributedPlan->planningError != NULL)
245 {
246 RaiseDeferredError(distributedPlan->planningError, DEBUG1);
247
248 /*
249 * If INSERT..SELECT cannot be distributed, pull to coordinator or use
250 * repartitioning.
251 */
252 distributedPlan = CreateNonPushableInsertSelectPlan(planId, originalQuery,
253 boundParams);
254 }
255
256 return distributedPlan;
257 }
258
259
260 /*
261 * CreateDistributedInsertSelectPlan creates a DistributedPlan for distributed
262 * INSERT ... SELECT queries which could consist of multiple tasks.
263 *
264 * The function never returns NULL, it errors out if cannot create the DistributedPlan.
265 */
266 static DistributedPlan *
CreateDistributedInsertSelectPlan(Query * originalQuery,PlannerRestrictionContext * plannerRestrictionContext)267 CreateDistributedInsertSelectPlan(Query *originalQuery,
268 PlannerRestrictionContext *plannerRestrictionContext)
269 {
270 List *sqlTaskList = NIL;
271 uint32 taskIdIndex = 1; /* 0 is reserved for invalid taskId */
272 uint64 jobId = INVALID_JOB_ID;
273 DistributedPlan *distributedPlan = CitusMakeNode(DistributedPlan);
274 RangeTblEntry *insertRte = ExtractResultRelationRTEOrError(originalQuery);
275 RangeTblEntry *subqueryRte = ExtractSelectRangeTableEntry(originalQuery);
276 Oid targetRelationId = insertRte->relid;
277 CitusTableCacheEntry *targetCacheEntry = GetCitusTableCacheEntry(targetRelationId);
278 int shardCount = targetCacheEntry->shardIntervalArrayLength;
279 RelationRestrictionContext *relationRestrictionContext =
280 plannerRestrictionContext->relationRestrictionContext;
281 bool allReferenceTables = relationRestrictionContext->allReferenceTables;
282
283 distributedPlan->modLevel = RowModifyLevelForQuery(originalQuery);
284
285 /*
286 * Error semantics for INSERT ... SELECT queries are different than regular
287 * modify queries. Thus, handle separately.
288 */
289 distributedPlan->planningError = DistributedInsertSelectSupported(originalQuery,
290 insertRte,
291 subqueryRte,
292 allReferenceTables);
293 if (distributedPlan->planningError)
294 {
295 return distributedPlan;
296 }
297
298 bool allDistributionKeysInQueryAreEqual =
299 AllDistributionKeysInQueryAreEqual(originalQuery, plannerRestrictionContext);
300
301 /*
302 * Plan select query for each shard in the target table. Do so by replacing the
303 * partitioning qual parameter added in distributed_planner() using the current shard's
304 * actual boundary values. Also, add the current shard's boundary values to the
305 * top level subquery to ensure that even if the partitioning qual is not distributed
306 * to all the tables, we never run the queries on the shards that don't match with
307 * the current shard boundaries. Finally, perform the normal shard pruning to
308 * decide on whether to push the query to the current shard or not.
309 */
310 for (int shardOffset = 0; shardOffset < shardCount; shardOffset++)
311 {
312 ShardInterval *targetShardInterval =
313 targetCacheEntry->sortedShardIntervalArray[shardOffset];
314
315 Task *modifyTask = RouterModifyTaskForShardInterval(originalQuery,
316 targetCacheEntry,
317 targetShardInterval,
318 plannerRestrictionContext,
319 taskIdIndex,
320 allDistributionKeysInQueryAreEqual,
321 &distributedPlan->
322 planningError);
323
324 if (distributedPlan->planningError != NULL)
325 {
326 return distributedPlan;
327 }
328
329 /* add the task if it could be created */
330 if (modifyTask != NULL)
331 {
332 modifyTask->modifyWithSubquery = true;
333
334 sqlTaskList = lappend(sqlTaskList, modifyTask);
335 }
336
337 taskIdIndex++;
338 }
339
340 /* Create the worker job */
341 Job *workerJob = CitusMakeNode(Job);
342 workerJob->taskList = sqlTaskList;
343 workerJob->subqueryPushdown = false;
344 workerJob->dependentJobList = NIL;
345 workerJob->jobId = jobId;
346 workerJob->jobQuery = originalQuery;
347 workerJob->requiresCoordinatorEvaluation =
348 RequiresCoordinatorEvaluation(originalQuery);
349
350 /* and finally the multi plan */
351 distributedPlan->workerJob = workerJob;
352 distributedPlan->combineQuery = NULL;
353 distributedPlan->expectResults = originalQuery->returningList != NIL;
354 distributedPlan->targetRelationId = targetRelationId;
355
356 return distributedPlan;
357 }
358
359
360 /*
361 * CreateInsertSelectIntoLocalTablePlan creates the plan for INSERT .. SELECT queries
362 * where the selected table is distributed and the inserted table is not.
363 *
364 * To create the plan, this function first creates a distributed plan for the SELECT
365 * part. Then puts it as a subquery to the original (non-distributed) INSERT query as
366 * a subquery. Finally, it puts this INSERT query, which now has a distributed SELECT
367 * subquery, in the combineQuery.
368 *
369 * If the SELECT query is a router query, whose distributed plan does not have a
370 * combineQuery, this function also creates a dummy combineQuery for that.
371 */
372 DistributedPlan *
CreateInsertSelectIntoLocalTablePlan(uint64 planId,Query * originalQuery,ParamListInfo boundParams,bool hasUnresolvedParams,PlannerRestrictionContext * plannerRestrictionContext)373 CreateInsertSelectIntoLocalTablePlan(uint64 planId, Query *originalQuery, ParamListInfo
374 boundParams, bool hasUnresolvedParams,
375 PlannerRestrictionContext *plannerRestrictionContext)
376 {
377 RangeTblEntry *selectRte = ExtractSelectRangeTableEntry(originalQuery);
378
379 Query *selectQuery = BuildSelectForInsertSelect(originalQuery);
380 originalQuery->cteList = NIL;
381 DistributedPlan *distPlan = CreateDistributedPlan(planId, selectQuery,
382 copyObject(selectQuery),
383 boundParams, hasUnresolvedParams,
384 plannerRestrictionContext);
385
386 /*
387 * We don't expect distPlan to be NULL here because hasUnresolvedParams is
388 * already checked before this function and CreateDistributedPlan only returns
389 * NULL when there are unresolved parameters.
390 */
391 Assert(distPlan != NULL);
392
393 if (distPlan->planningError)
394 {
395 return distPlan;
396 }
397
398 if (distPlan->combineQuery == NULL)
399 {
400 /*
401 * For router queries, we construct a synthetic master query that simply passes
402 * on the results of the remote tasks, which we can then use as the select in
403 * the INSERT .. SELECT.
404 */
405 distPlan->combineQuery = CreateCombineQueryForRouterPlan(
406 distPlan);
407 }
408
409 /*
410 * combineQuery of a distributed select is for combining the results from
411 * worker nodes on the coordinator node. Putting it as a subquery to the
412 * INSERT query, causes the INSERT query to insert the combined select value
413 * from the workers. And making the resulting insert query the combineQuery
414 * let's us execute this insert command.
415 *
416 * So this operation makes the master query insert the result of the
417 * distributed select instead of returning it.
418 */
419 selectRte->subquery = distPlan->combineQuery;
420 distPlan->combineQuery = originalQuery;
421
422 return distPlan;
423 }
424
425
426 /*
427 * CreateCombineQueryForRouterPlan is used for creating a dummy combineQuery
428 * for a router plan, since router plans normally don't have one.
429 */
430 static Query *
CreateCombineQueryForRouterPlan(DistributedPlan * distPlan)431 CreateCombineQueryForRouterPlan(DistributedPlan *distPlan)
432 {
433 const Index insertTableId = 1;
434 List *tableIdList = list_make1(makeInteger(insertTableId));
435 Job *dependentJob = distPlan->workerJob;
436 List *dependentTargetList = dependentJob->jobQuery->targetList;
437
438 /* compute column names for the derived table */
439 uint32 columnCount = (uint32) list_length(dependentTargetList);
440 List *columnNameList = DerivedColumnNameList(columnCount,
441 dependentJob->jobId);
442
443 List *funcColumnNames = NIL;
444 List *funcColumnTypes = NIL;
445 List *funcColumnTypeMods = NIL;
446 List *funcCollations = NIL;
447
448 TargetEntry *targetEntry = NULL;
449 foreach_ptr(targetEntry, dependentTargetList)
450 {
451 Node *expr = (Node *) targetEntry->expr;
452
453 char *name = targetEntry->resname;
454 if (name == NULL)
455 {
456 name = pstrdup("unnamed");
457 }
458
459 funcColumnNames = lappend(funcColumnNames, makeString(name));
460
461 funcColumnTypes = lappend_oid(funcColumnTypes, exprType(expr));
462 funcColumnTypeMods = lappend_int(funcColumnTypeMods, exprTypmod(expr));
463 funcCollations = lappend_oid(funcCollations, exprCollation(expr));
464 }
465
466 RangeTblEntry *rangeTableEntry = DerivedRangeTableEntry(NULL,
467 columnNameList,
468 tableIdList,
469 funcColumnNames,
470 funcColumnTypes,
471 funcColumnTypeMods,
472 funcCollations);
473
474 List *targetList = CreateTargetListForCombineQuery(dependentTargetList);
475
476 RangeTblRef *rangeTableRef = makeNode(RangeTblRef);
477 rangeTableRef->rtindex = 1;
478
479 FromExpr *joinTree = makeNode(FromExpr);
480 joinTree->quals = NULL;
481 joinTree->fromlist = list_make1(rangeTableRef);
482
483 Query *combineQuery = makeNode(Query);
484 combineQuery->commandType = CMD_SELECT;
485 combineQuery->querySource = QSRC_ORIGINAL;
486 combineQuery->canSetTag = true;
487 combineQuery->rtable = list_make1(rangeTableEntry);
488 combineQuery->targetList = targetList;
489 combineQuery->jointree = joinTree;
490 return combineQuery;
491 }
492
493
494 /*
495 * CreateTargetListForCombineQuery is used for creating a target list for
496 * master query.
497 */
498 static List *
CreateTargetListForCombineQuery(List * targetList)499 CreateTargetListForCombineQuery(List *targetList)
500 {
501 List *newTargetEntryList = NIL;
502 const uint32 masterTableId = 1;
503 int columnId = 1;
504
505 /* iterate over original target entries */
506 TargetEntry *originalTargetEntry = NULL;
507 foreach_ptr(originalTargetEntry, targetList)
508 {
509 TargetEntry *newTargetEntry = flatCopyTargetEntry(originalTargetEntry);
510
511 Var *column = makeVarFromTargetEntry(masterTableId, originalTargetEntry);
512 column->varattno = columnId;
513 column->varattnosyn = columnId;
514 columnId++;
515
516 if (column->vartype == RECORDOID || column->vartype == RECORDARRAYOID)
517 {
518 column->vartypmod = BlessRecordExpression(originalTargetEntry->expr);
519 }
520
521 Expr *newExpression = (Expr *) column;
522
523 newTargetEntry->expr = newExpression;
524 newTargetEntryList = lappend(newTargetEntryList, newTargetEntry);
525 }
526 return newTargetEntryList;
527 }
528
529
530 /*
531 * DistributedInsertSelectSupported returns NULL if the INSERT ... SELECT query
532 * is supported, or a description why not.
533 */
534 static DeferredErrorMessage *
DistributedInsertSelectSupported(Query * queryTree,RangeTblEntry * insertRte,RangeTblEntry * subqueryRte,bool allReferenceTables)535 DistributedInsertSelectSupported(Query *queryTree, RangeTblEntry *insertRte,
536 RangeTblEntry *subqueryRte, bool allReferenceTables)
537 {
538 Oid selectPartitionColumnTableId = InvalidOid;
539 Oid targetRelationId = insertRte->relid;
540 ListCell *rangeTableCell = NULL;
541
542 /* we only do this check for INSERT ... SELECT queries */
543 AssertArg(InsertSelectIntoCitusTable(queryTree));
544
545 Query *subquery = subqueryRte->subquery;
546
547 if (!NeedsDistributedPlanning(subquery))
548 {
549 return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
550 "distributed INSERT ... SELECT can only select from "
551 "distributed tables",
552 NULL, NULL);
553 }
554
555 RTEListProperties *subqueryRteListProperties = GetRTEListPropertiesForQuery(subquery);
556 if (subqueryRteListProperties->hasDistributedTable &&
557 (subqueryRteListProperties->hasCitusLocalTable ||
558 subqueryRteListProperties->hasPostgresLocalTable))
559 {
560 return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
561 "distributed INSERT ... SELECT cannot select from "
562 "distributed tables and local tables at the same time",
563 NULL, NULL);
564 }
565
566 if (subqueryRteListProperties->hasDistributedTable &&
567 IsCitusTableType(targetRelationId, CITUS_LOCAL_TABLE))
568 {
569 return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
570 "distributed INSERT ... SELECT cannot insert into a "
571 "local table that is added to metadata",
572 NULL, NULL);
573 }
574
575 /*
576 * In some cases, it might be possible to allow postgres local tables
577 * in distributed insert select. However, we want to behave consistent
578 * on all cases including Citus MX, and let insert select via coordinator
579 * to kick-in.
580 */
581 if (subqueryRteListProperties->hasPostgresLocalTable)
582 {
583 return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
584 "distributed INSERT ... SELECT cannot select from "
585 "a local table", NULL, NULL);
586 return NULL;
587 }
588
589 /* we do not expect to see a view in modify target */
590 foreach(rangeTableCell, queryTree->rtable)
591 {
592 RangeTblEntry *rangeTableEntry = (RangeTblEntry *) lfirst(rangeTableCell);
593 if (rangeTableEntry->rtekind == RTE_RELATION &&
594 rangeTableEntry->relkind == RELKIND_VIEW)
595 {
596 return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
597 "cannot insert into view over distributed table",
598 NULL, NULL);
599 }
600 }
601
602 if (FindNodeMatchingCheckFunction((Node *) queryTree, CitusIsVolatileFunction))
603 {
604 return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
605 "volatile functions are not allowed in distributed "
606 "INSERT ... SELECT queries",
607 NULL, NULL);
608 }
609
610 /* we don't support LIMIT, OFFSET and WINDOW functions */
611 DeferredErrorMessage *error = MultiTaskRouterSelectQuerySupported(subquery);
612 if (error)
613 {
614 return error;
615 }
616
617 if (IsCitusTableType(targetRelationId, CITUS_LOCAL_TABLE))
618 {
619 /*
620 * If we're inserting into a citus local table, it is ok because we've
621 * checked the non-existence of distributed tables in the subquery.
622 */
623 }
624 else if (IsCitusTableType(targetRelationId, REFERENCE_TABLE))
625 {
626 /*
627 * If we're inserting into a reference table, all participating tables
628 * should be reference tables as well.
629 */
630 if (!allReferenceTables)
631 {
632 return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
633 "only reference tables may be queried when targeting "
634 "a reference table with distributed INSERT ... SELECT",
635 NULL, NULL);
636 }
637 }
638 else
639 {
640 /* ensure that INSERT's partition column comes from SELECT's partition column */
641 error = InsertPartitionColumnMatchesSelect(queryTree, insertRte, subqueryRte,
642 &selectPartitionColumnTableId);
643 if (error)
644 {
645 return error;
646 }
647
648 /*
649 * We expect partition column values come from colocated tables. Note that we
650 * skip this check from the reference table case given that all reference tables
651 * are already (and by default) co-located.
652 */
653 if (!TablesColocated(insertRte->relid, selectPartitionColumnTableId))
654 {
655 return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
656 "INSERT target table and the source relation of the SELECT partition "
657 "column value must be colocated in distributed INSERT ... SELECT",
658 NULL, NULL);
659 }
660 }
661
662 return NULL;
663 }
664
665
666 /*
667 * RouterModifyTaskForShardInterval creates a modify task by
668 * replacing the partitioning qual parameter added in distributed_planner()
669 * with the shardInterval's boundary value. Then perform the normal
670 * shard pruning on the subquery. Finally, checks if the target shardInterval
671 * has exactly same placements with the select task's available anchor
672 * placements.
673 *
674 * The function errors out if the subquery is not router select query (i.e.,
675 * subqueries with non equi-joins.).
676 */
677 static Task *
RouterModifyTaskForShardInterval(Query * originalQuery,CitusTableCacheEntry * targetTableCacheEntry,ShardInterval * shardInterval,PlannerRestrictionContext * plannerRestrictionContext,uint32 taskIdIndex,bool safeToPushdownSubquery,DeferredErrorMessage ** routerPlannerError)678 RouterModifyTaskForShardInterval(Query *originalQuery,
679 CitusTableCacheEntry *targetTableCacheEntry,
680 ShardInterval *shardInterval,
681 PlannerRestrictionContext *plannerRestrictionContext,
682 uint32 taskIdIndex,
683 bool safeToPushdownSubquery,
684 DeferredErrorMessage **routerPlannerError)
685 {
686 Query *copiedQuery = copyObject(originalQuery);
687 RangeTblEntry *copiedInsertRte = ExtractResultRelationRTEOrError(copiedQuery);
688 RangeTblEntry *copiedSubqueryRte = ExtractSelectRangeTableEntry(copiedQuery);
689 Query *copiedSubquery = (Query *) copiedSubqueryRte->subquery;
690
691 uint64 shardId = shardInterval->shardId;
692 Oid distributedTableId = shardInterval->relationId;
693
694 PlannerRestrictionContext *copyOfPlannerRestrictionContext = palloc0(
695 sizeof(PlannerRestrictionContext));
696
697 StringInfo queryString = makeStringInfo();
698 ListCell *restrictionCell = NULL;
699 List *selectPlacementList = NIL;
700 uint64 selectAnchorShardId = INVALID_SHARD_ID;
701 List *relationShardList = NIL;
702 List *prunedShardIntervalListList = NIL;
703 uint64 jobId = INVALID_JOB_ID;
704 bool allReferenceTables =
705 plannerRestrictionContext->relationRestrictionContext->allReferenceTables;
706 List *shardOpExpressions = NIL;
707 RestrictInfo *shardRestrictionList = NULL;
708 bool multiShardModifyQuery = false;
709 List *relationRestrictionList = NIL;
710
711 copyOfPlannerRestrictionContext->relationRestrictionContext =
712 CopyRelationRestrictionContext(
713 plannerRestrictionContext->relationRestrictionContext);
714 copyOfPlannerRestrictionContext->joinRestrictionContext =
715 plannerRestrictionContext->joinRestrictionContext;
716 copyOfPlannerRestrictionContext->fastPathRestrictionContext =
717 plannerRestrictionContext->fastPathRestrictionContext;
718
719 relationRestrictionList =
720 copyOfPlannerRestrictionContext->relationRestrictionContext->
721 relationRestrictionList;
722
723 /* grab shared metadata lock to stop concurrent placement additions */
724 LockShardDistributionMetadata(shardId, ShareLock);
725
726 /*
727 * Replace the partitioning qual parameter value in all baserestrictinfos.
728 * Note that this has to be done on a copy, as the walker modifies in place.
729 */
730 foreach(restrictionCell, relationRestrictionList)
731 {
732 RelationRestriction *restriction = lfirst(restrictionCell);
733 List *originalBaseRestrictInfo = restriction->relOptInfo->baserestrictinfo;
734 List *extendedBaseRestrictInfo = originalBaseRestrictInfo;
735 Index rteIndex = restriction->index;
736
737 if (!safeToPushdownSubquery || allReferenceTables)
738 {
739 continue;
740 }
741
742 shardOpExpressions = ShardIntervalOpExpressions(shardInterval, rteIndex);
743
744 /* means it is a reference table and do not add any shard interval information */
745 if (shardOpExpressions == NIL)
746 {
747 continue;
748 }
749
750
751 /*
752 * passing NULL for plannerInfo will be problematic if we have placeholder
753 * vars. However, it won't be the case here because we are building
754 * the expression from shard intervals which don't have placeholder vars.
755 * Note that this is only the case with PG14 as the parameter doesn't exist
756 * prior to that.
757 */
758 shardRestrictionList = make_simple_restrictinfo_compat(NULL,
759 (Expr *) shardOpExpressions);
760 extendedBaseRestrictInfo = lappend(extendedBaseRestrictInfo,
761 shardRestrictionList);
762
763 restriction->relOptInfo->baserestrictinfo = extendedBaseRestrictInfo;
764 }
765
766 /*
767 * We also need to add shard interval range to the subquery in case
768 * the partition qual not distributed all tables such as some
769 * subqueries in WHERE clause.
770 *
771 * Note that we need to add the ranges before the shard pruning to
772 * prevent shard pruning logic (i.e, namely UpdateRelationNames())
773 * modifies range table entries, which makes hard to add the quals.
774 */
775 RTEListProperties *subqueryRteListProperties = GetRTEListPropertiesForQuery(
776 copiedSubquery);
777 if (subqueryRteListProperties->hasDistributedTable)
778 {
779 AddPartitionKeyNotNullFilterToSelect(copiedSubquery);
780 }
781
782 /* mark that we don't want the router planner to generate dummy hosts/queries */
783 bool replacePrunedQueryWithDummy = false;
784
785 /*
786 * Use router planner to decide on whether we can push down the query or not.
787 * If we can, we also rely on the side-effects that all RTEs have been updated
788 * to point to the relevant nodes and selectPlacementList is determined.
789 */
790 DeferredErrorMessage *planningError = PlanRouterQuery(copiedSubquery,
791 copyOfPlannerRestrictionContext,
792 &selectPlacementList,
793 &selectAnchorShardId,
794 &relationShardList,
795 &prunedShardIntervalListList,
796 replacePrunedQueryWithDummy,
797 &multiShardModifyQuery, NULL,
798 false);
799
800 Assert(!multiShardModifyQuery);
801
802 if (planningError)
803 {
804 *routerPlannerError = planningError;
805 return NULL;
806 }
807
808
809 /* ensure that we do not send queries where select is pruned away completely */
810 if (list_length(selectPlacementList) == 0)
811 {
812 ereport(DEBUG2, (errmsg("Skipping target shard interval " UINT64_FORMAT
813 " since SELECT query for it pruned away",
814 shardId)));
815
816 return NULL;
817 }
818
819 /* get the placements for insert target shard and its intersection with select */
820 List *insertShardPlacementList = ActiveShardPlacementList(shardId);
821 List *intersectedPlacementList = IntersectPlacementList(insertShardPlacementList,
822 selectPlacementList);
823
824 /*
825 * If insert target does not have exactly the same placements with the select,
826 * we sholdn't run the query.
827 */
828 if (list_length(insertShardPlacementList) != list_length(intersectedPlacementList))
829 {
830 ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
831 errmsg("cannot perform distributed planning for the given "
832 "modification"),
833 errdetail("Insert query cannot be executed on all placements "
834 "for shard " UINT64_FORMAT "", shardId)));
835 }
836
837
838 /* this is required for correct deparsing of the query */
839 ReorderInsertSelectTargetLists(copiedQuery, copiedInsertRte, copiedSubqueryRte);
840
841 /* setting an alias simplifies deparsing of RETURNING */
842 if (copiedInsertRte->alias == NULL)
843 {
844 Alias *alias = makeAlias(CITUS_TABLE_ALIAS, NIL);
845 copiedInsertRte->alias = alias;
846 }
847
848 /* and generate the full query string */
849 deparse_shard_query(copiedQuery, distributedTableId, shardInterval->shardId,
850 queryString);
851 ereport(DEBUG2, (errmsg("distributed statement: %s",
852 ApplyLogRedaction(queryString->data))));
853
854 Task *modifyTask = CreateBasicTask(jobId, taskIdIndex, MODIFY_TASK,
855 queryString->data);
856 modifyTask->dependentTaskList = NULL;
857 modifyTask->anchorShardId = shardId;
858 modifyTask->taskPlacementList = insertShardPlacementList;
859 modifyTask->relationShardList = relationShardList;
860 modifyTask->replicationModel = targetTableCacheEntry->replicationModel;
861
862 return modifyTask;
863 }
864
865
866 /*
867 * ReorderInsertSelectTargetLists reorders the target lists of INSERT/SELECT
868 * query which is required for deparsing purposes. The reordered query is returned.
869 *
870 * The necessity for this function comes from the fact that ruleutils.c is not supposed
871 * to be used on "rewritten" queries (i.e. ones that have been passed through
872 * QueryRewrite()). Query rewriting is the process in which views and such are expanded,
873 * and, INSERT/UPDATE targetlists are reordered to match the physical order,
874 * defaults etc. For the details of reordeing, see transformInsertRow() and
875 * rewriteTargetListIU().
876 */
877 Query *
ReorderInsertSelectTargetLists(Query * originalQuery,RangeTblEntry * insertRte,RangeTblEntry * subqueryRte)878 ReorderInsertSelectTargetLists(Query *originalQuery, RangeTblEntry *insertRte,
879 RangeTblEntry *subqueryRte)
880 {
881 ListCell *insertTargetEntryCell;
882 List *newSubqueryTargetlist = NIL;
883 List *newInsertTargetlist = NIL;
884 int resno = 1;
885 Index insertTableId = 1;
886 int targetEntryIndex = 0;
887
888 AssertArg(InsertSelectIntoCitusTable(originalQuery));
889
890 Query *subquery = subqueryRte->subquery;
891
892 Oid insertRelationId = insertRte->relid;
893
894 /*
895 * We implement the following algorithm for the reoderding:
896 * - Iterate over the INSERT target list entries
897 * - If the target entry includes a Var, find the corresponding
898 * SELECT target entry on the original query and update resno
899 * - If the target entry does not include a Var (i.e., defaults
900 * or constants), create new target entry and add that to
901 * SELECT target list
902 * - Create a new INSERT target entry with respect to the new
903 * SELECT target entry created.
904 */
905 foreach(insertTargetEntryCell, originalQuery->targetList)
906 {
907 TargetEntry *oldInsertTargetEntry = lfirst(insertTargetEntryCell);
908 TargetEntry *newSubqueryTargetEntry = NULL;
909 AttrNumber originalAttrNo = get_attnum(insertRelationId,
910 oldInsertTargetEntry->resname);
911
912 /* see transformInsertRow() for the details */
913 if (IsA(oldInsertTargetEntry->expr, ArrayRef) ||
914 IsA(oldInsertTargetEntry->expr, FieldStore))
915 {
916 ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE),
917 errmsg(
918 "cannot plan distributed INSERT INTO ... SELECT query"),
919 errhint("Do not use array references and field stores "
920 "on the INSERT target list.")));
921 }
922
923 /*
924 * It is safe to pull Var clause and ignore the coercions since that
925 * are already going to be added on the workers implicitly.
926 */
927 List *targetVarList = pull_var_clause((Node *) oldInsertTargetEntry->expr,
928 PVC_RECURSE_AGGREGATES);
929
930 int targetVarCount = list_length(targetVarList);
931
932 /* a single INSERT target entry cannot have more than one Var */
933 Assert(targetVarCount <= 1);
934
935 if (targetVarCount == 1)
936 {
937 Var *oldInsertVar = (Var *) linitial(targetVarList);
938 TargetEntry *oldSubqueryTle = list_nth(subquery->targetList,
939 oldInsertVar->varattno - 1);
940
941 newSubqueryTargetEntry = copyObject(oldSubqueryTle);
942
943 newSubqueryTargetEntry->resno = resno;
944 newSubqueryTargetlist = lappend(newSubqueryTargetlist,
945 newSubqueryTargetEntry);
946 }
947 else
948 {
949 newSubqueryTargetEntry = makeTargetEntry(oldInsertTargetEntry->expr,
950 resno,
951 oldInsertTargetEntry->resname,
952 oldInsertTargetEntry->resjunk);
953 newSubqueryTargetlist = lappend(newSubqueryTargetlist,
954 newSubqueryTargetEntry);
955 }
956
957 /*
958 * The newly created select target entry cannot be a junk entry since junk
959 * entries are not in the final target list and we're processing the
960 * final target list entries.
961 */
962 Assert(!newSubqueryTargetEntry->resjunk);
963
964 Var *newInsertVar = makeVar(insertTableId, originalAttrNo,
965 exprType((Node *) newSubqueryTargetEntry->expr),
966 exprTypmod((Node *) newSubqueryTargetEntry->expr),
967 exprCollation((Node *) newSubqueryTargetEntry->expr),
968 0);
969 TargetEntry *newInsertTargetEntry = makeTargetEntry((Expr *) newInsertVar,
970 originalAttrNo,
971 oldInsertTargetEntry->resname,
972 oldInsertTargetEntry->resjunk);
973
974 newInsertTargetlist = lappend(newInsertTargetlist, newInsertTargetEntry);
975 resno++;
976 }
977
978 /*
979 * if there are any remaining target list entries (i.e., GROUP BY column not on the
980 * target list of subquery), update the remaining resnos.
981 */
982 int subqueryTargetLength = list_length(subquery->targetList);
983 for (; targetEntryIndex < subqueryTargetLength; ++targetEntryIndex)
984 {
985 TargetEntry *oldSubqueryTle = list_nth(subquery->targetList,
986 targetEntryIndex);
987
988 /*
989 * Skip non-junk entries since we've already processed them above and this
990 * loop only is intended for junk entries.
991 */
992 if (!oldSubqueryTle->resjunk)
993 {
994 continue;
995 }
996
997 TargetEntry *newSubqueryTargetEntry = copyObject(oldSubqueryTle);
998
999 newSubqueryTargetEntry->resno = resno;
1000 newSubqueryTargetlist = lappend(newSubqueryTargetlist,
1001 newSubqueryTargetEntry);
1002
1003 resno++;
1004 }
1005
1006 originalQuery->targetList = newInsertTargetlist;
1007 subquery->targetList = newSubqueryTargetlist;
1008
1009 return NULL;
1010 }
1011
1012
1013 /*
1014 * MultiTaskRouterSelectQuerySupported returns NULL if the query may be used
1015 * as the source for an INSERT ... SELECT or returns a description why not.
1016 */
1017 static DeferredErrorMessage *
MultiTaskRouterSelectQuerySupported(Query * query)1018 MultiTaskRouterSelectQuerySupported(Query *query)
1019 {
1020 List *queryList = NIL;
1021 ListCell *queryCell = NULL;
1022 StringInfo errorDetail = NULL;
1023 bool hasUnsupportedDistinctOn = false;
1024
1025 ExtractQueryWalker((Node *) query, &queryList);
1026 foreach(queryCell, queryList)
1027 {
1028 Query *subquery = (Query *) lfirst(queryCell);
1029
1030 Assert(subquery->commandType == CMD_SELECT);
1031
1032 /* pushing down rtes without relations yields (shardCount * expectedRows) */
1033 if (HasEmptyJoinTree(subquery))
1034 {
1035 return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
1036 "Subqueries without relations are not allowed in "
1037 "distributed INSERT ... SELECT queries",
1038 NULL, NULL);
1039 }
1040
1041 /* pushing down limit per shard would yield wrong results */
1042 if (subquery->limitCount != NULL)
1043 {
1044 return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
1045 "LIMIT clauses are not allowed in distributed INSERT "
1046 "... SELECT queries",
1047 NULL, NULL);
1048 }
1049
1050 /* pushing down limit offest per shard would yield wrong results */
1051 if (subquery->limitOffset != NULL)
1052 {
1053 return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
1054 "OFFSET clauses are not allowed in distributed "
1055 "INSERT ... SELECT queries",
1056 NULL, NULL);
1057 }
1058
1059 /* group clause list must include partition column */
1060 if (subquery->groupClause)
1061 {
1062 List *groupClauseList = subquery->groupClause;
1063 List *targetEntryList = subquery->targetList;
1064 List *groupTargetEntryList = GroupTargetEntryList(groupClauseList,
1065 targetEntryList);
1066 bool groupOnPartitionColumn = TargetListOnPartitionColumn(subquery,
1067 groupTargetEntryList);
1068 if (!groupOnPartitionColumn)
1069 {
1070 return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
1071 "Group by list without distribution column is "
1072 "not allowed in distributed INSERT ... "
1073 "SELECT queries",
1074 NULL, NULL);
1075 }
1076 }
1077
1078 /*
1079 * We support window functions when the window function
1080 * is partitioned on distribution column.
1081 */
1082 if (subquery->windowClause && !SafeToPushdownWindowFunction(subquery,
1083 &errorDetail))
1084 {
1085 return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, errorDetail->data, NULL,
1086 NULL);
1087 }
1088
1089 if (subquery->setOperations != NULL)
1090 {
1091 return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
1092 "Set operations are not allowed in distributed "
1093 "INSERT ... SELECT queries",
1094 NULL, NULL);
1095 }
1096
1097 /*
1098 * We currently do not support grouping sets since it could generate NULL
1099 * results even after the restrictions are applied to the query. A solution
1100 * would be to add the whole query into a subquery and add the restrictions
1101 * on that subquery.
1102 */
1103 if (subquery->groupingSets != NULL)
1104 {
1105 return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
1106 "grouping sets are not allowed in distributed "
1107 "INSERT ... SELECT queries",
1108 NULL, NULL);
1109 }
1110
1111 /*
1112 * We don't support DISTINCT ON clauses on non-partition columns.
1113 */
1114 hasUnsupportedDistinctOn = HasUnsupportedDistinctOn(subquery);
1115 if (hasUnsupportedDistinctOn)
1116 {
1117 return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
1118 "DISTINCT ON (non-partition column) clauses are not "
1119 "allowed in distributed INSERT ... SELECT queries",
1120 NULL, NULL);
1121 }
1122 }
1123
1124 return NULL;
1125 }
1126
1127
1128 /*
1129 * HasUnsupportedDistinctOn returns true if the query has distinct on and
1130 * distinct targets do not contain partition column.
1131 */
1132 static bool
HasUnsupportedDistinctOn(Query * query)1133 HasUnsupportedDistinctOn(Query *query)
1134 {
1135 ListCell *distinctCell = NULL;
1136
1137 if (!query->hasDistinctOn)
1138 {
1139 return false;
1140 }
1141
1142 foreach(distinctCell, query->distinctClause)
1143 {
1144 SortGroupClause *distinctClause = lfirst(distinctCell);
1145 TargetEntry *distinctEntry = get_sortgroupclause_tle(distinctClause,
1146 query->targetList);
1147
1148 bool skipOuterVars = true;
1149 if (IsPartitionColumn(distinctEntry->expr, query, skipOuterVars))
1150 {
1151 return false;
1152 }
1153 }
1154
1155 return true;
1156 }
1157
1158
1159 /*
1160 * InsertPartitionColumnMatchesSelect returns NULL the partition column in the
1161 * table targeted by INSERTed matches with the any of the SELECTed table's
1162 * partition column. Returns the error description if there's no match.
1163 *
1164 * On return without error (i.e., if partition columns match), the function
1165 * also sets selectPartitionColumnTableId.
1166 */
1167 static DeferredErrorMessage *
InsertPartitionColumnMatchesSelect(Query * query,RangeTblEntry * insertRte,RangeTblEntry * subqueryRte,Oid * selectPartitionColumnTableId)1168 InsertPartitionColumnMatchesSelect(Query *query, RangeTblEntry *insertRte,
1169 RangeTblEntry *subqueryRte,
1170 Oid *selectPartitionColumnTableId)
1171 {
1172 ListCell *targetEntryCell = NULL;
1173 uint32 rangeTableId = 1;
1174 Oid insertRelationId = insertRte->relid;
1175 Var *insertPartitionColumn = PartitionColumn(insertRelationId, rangeTableId);
1176 Query *subquery = subqueryRte->subquery;
1177 bool targetTableHasPartitionColumn = false;
1178
1179 foreach(targetEntryCell, query->targetList)
1180 {
1181 TargetEntry *targetEntry = (TargetEntry *) lfirst(targetEntryCell);
1182 List *insertTargetEntryColumnList = pull_var_clause_default((Node *) targetEntry);
1183 Var *subqueryPartitionColumn = NULL;
1184
1185 /*
1186 * We only consider target entries that include a single column. Note that this
1187 * is slightly different than directly checking the whether the targetEntry->expr
1188 * is a var since the var could be wrapped into an implicit/explicit casting.
1189 *
1190 * Also note that we skip the target entry if it does not contain a Var, which
1191 * corresponds to columns with DEFAULT values on the target list.
1192 */
1193 if (list_length(insertTargetEntryColumnList) != 1)
1194 {
1195 continue;
1196 }
1197
1198 Var *insertVar = (Var *) linitial(insertTargetEntryColumnList);
1199 AttrNumber originalAttrNo = targetEntry->resno;
1200
1201 /* skip processing of target table non-partition columns */
1202 if (originalAttrNo != insertPartitionColumn->varattno)
1203 {
1204 continue;
1205 }
1206
1207 /* INSERT query includes the partition column */
1208 targetTableHasPartitionColumn = true;
1209
1210 TargetEntry *subqueryTargetEntry = list_nth(subquery->targetList,
1211 insertVar->varattno - 1);
1212 Expr *selectTargetExpr = subqueryTargetEntry->expr;
1213
1214 RangeTblEntry *subqueryPartitionColumnRelationIdRTE = NULL;
1215 List *parentQueryList = list_make2(query, subquery);
1216 bool skipOuterVars = true;
1217 FindReferencedTableColumn(selectTargetExpr,
1218 parentQueryList, subquery,
1219 &subqueryPartitionColumn,
1220 &subqueryPartitionColumnRelationIdRTE,
1221 skipOuterVars);
1222 Oid subqueryPartitionColumnRelationId = subqueryPartitionColumnRelationIdRTE ?
1223 subqueryPartitionColumnRelationIdRTE->
1224 relid :
1225 InvalidOid;
1226
1227 /*
1228 * Corresponding (i.e., in the same ordinal position as the target table's
1229 * partition column) select target entry does not directly belong a table.
1230 * Evaluate its expression type and error out properly.
1231 */
1232 if (subqueryPartitionColumnRelationId == InvalidOid)
1233 {
1234 char *errorDetailTemplate = "Subquery contains %s in the "
1235 "same position as the target table's "
1236 "partition column.";
1237
1238 char *exprDescription = "";
1239
1240 switch (selectTargetExpr->type)
1241 {
1242 case T_Const:
1243 {
1244 exprDescription = "a constant value";
1245 break;
1246 }
1247
1248 case T_OpExpr:
1249 {
1250 exprDescription = "an operator";
1251 break;
1252 }
1253
1254 case T_FuncExpr:
1255 {
1256 FuncExpr *subqueryFunctionExpr = (FuncExpr *) selectTargetExpr;
1257
1258 switch (subqueryFunctionExpr->funcformat)
1259 {
1260 case COERCE_EXPLICIT_CALL:
1261 {
1262 exprDescription = "a function call";
1263 break;
1264 }
1265
1266 case COERCE_EXPLICIT_CAST:
1267 {
1268 exprDescription = "an explicit cast";
1269 break;
1270 }
1271
1272 case COERCE_IMPLICIT_CAST:
1273 {
1274 exprDescription = "an implicit cast";
1275 break;
1276 }
1277
1278 default:
1279 {
1280 exprDescription = "a function call";
1281 break;
1282 }
1283 }
1284 break;
1285 }
1286
1287 case T_Aggref:
1288 {
1289 exprDescription = "an aggregation";
1290 break;
1291 }
1292
1293 case T_CaseExpr:
1294 {
1295 exprDescription = "a case expression";
1296 break;
1297 }
1298
1299 case T_CoalesceExpr:
1300 {
1301 exprDescription = "a coalesce expression";
1302 break;
1303 }
1304
1305 case T_RowExpr:
1306 {
1307 exprDescription = "a row expression";
1308 break;
1309 }
1310
1311 case T_MinMaxExpr:
1312 {
1313 exprDescription = "a min/max expression";
1314 break;
1315 }
1316
1317 case T_CoerceViaIO:
1318 {
1319 exprDescription = "an explicit coercion";
1320 break;
1321 }
1322
1323 default:
1324 {
1325 exprDescription =
1326 "an expression that is not a simple column reference";
1327 break;
1328 }
1329 }
1330
1331 return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
1332 "cannot perform distributed INSERT INTO ... SELECT "
1333 "because the partition columns in the source table "
1334 "and subquery do not match",
1335 psprintf(errorDetailTemplate, exprDescription),
1336 "Ensure the target table's partition column has a "
1337 "corresponding simple column reference to a distributed "
1338 "table's partition column in the subquery.");
1339 }
1340
1341 /*
1342 * Insert target expression could only be non-var if the select target
1343 * entry does not have the same type (i.e., target column requires casting).
1344 */
1345 if (!IsA(targetEntry->expr, Var))
1346 {
1347 return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
1348 "cannot perform distributed INSERT INTO ... SELECT "
1349 "because the partition columns in the source table "
1350 "and subquery do not match",
1351 "The data type of the target table's partition column "
1352 "should exactly match the data type of the "
1353 "corresponding simple column reference in the subquery.",
1354 NULL);
1355 }
1356
1357 /* finally, check that the select target column is a partition column */
1358 if (!IsPartitionColumn(selectTargetExpr, subquery, skipOuterVars))
1359 {
1360 return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
1361 "cannot perform distributed INSERT INTO ... SELECT "
1362 "because the partition columns in the source table "
1363 "and subquery do not match",
1364 "The target table's partition column should correspond "
1365 "to a partition column in the subquery.",
1366 NULL);
1367 }
1368
1369 /* finally, check that the select target column is a partition column */
1370 /* we can set the select relation id */
1371 *selectPartitionColumnTableId = subqueryPartitionColumnRelationId;
1372
1373 break;
1374 }
1375
1376 if (!targetTableHasPartitionColumn)
1377 {
1378 return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
1379 "cannot perform distributed INSERT INTO ... SELECT "
1380 "because the partition columns in the source table "
1381 "and subquery do not match",
1382 "the query doesn't include the target table's "
1383 "partition column",
1384 NULL);
1385 }
1386
1387 return NULL;
1388 }
1389
1390
1391 /*
1392 * CreateNonPushableInsertSelectPlan creates a query plan for a SELECT into a
1393 * distributed table. The query plan can also be executed on a worker in MX.
1394 */
1395 static DistributedPlan *
CreateNonPushableInsertSelectPlan(uint64 planId,Query * parse,ParamListInfo boundParams)1396 CreateNonPushableInsertSelectPlan(uint64 planId, Query *parse, ParamListInfo boundParams)
1397 {
1398 Query *insertSelectQuery = copyObject(parse);
1399
1400 RangeTblEntry *selectRte = ExtractSelectRangeTableEntry(insertSelectQuery);
1401 RangeTblEntry *insertRte = ExtractResultRelationRTEOrError(insertSelectQuery);
1402 Oid targetRelationId = insertRte->relid;
1403
1404 DistributedPlan *distributedPlan = CitusMakeNode(DistributedPlan);
1405 distributedPlan->modLevel = RowModifyLevelForQuery(insertSelectQuery);
1406
1407 distributedPlan->planningError =
1408 NonPushableInsertSelectSupported(insertSelectQuery);
1409
1410 if (distributedPlan->planningError != NULL)
1411 {
1412 return distributedPlan;
1413 }
1414
1415 Query *selectQuery = BuildSelectForInsertSelect(insertSelectQuery);
1416
1417 selectRte->subquery = selectQuery;
1418 ReorderInsertSelectTargetLists(insertSelectQuery, insertRte, selectRte);
1419
1420 /*
1421 * Cast types of insert target list and select projection list to
1422 * match the column types of the target relation.
1423 */
1424 selectQuery->targetList =
1425 AddInsertSelectCasts(insertSelectQuery->targetList,
1426 selectQuery->targetList,
1427 targetRelationId);
1428
1429 /*
1430 * Later we might need to call WrapTaskListForProjection(), which requires
1431 * that select target list has unique names, otherwise the outer query
1432 * cannot select columns unambiguously. So we relabel select columns to
1433 * match target columns.
1434 */
1435 List *insertTargetList = insertSelectQuery->targetList;
1436 RelabelTargetEntryList(selectQuery->targetList, insertTargetList);
1437
1438 /*
1439 * Make a copy of the select query, since following code scribbles it
1440 * but we need to keep the original for EXPLAIN.
1441 */
1442 Query *selectQueryCopy = copyObject(selectQuery);
1443
1444 /* plan the subquery, this may be another distributed query */
1445 int cursorOptions = CURSOR_OPT_PARALLEL_OK;
1446 PlannedStmt *selectPlan = pg_plan_query_compat(selectQueryCopy, NULL, cursorOptions,
1447 boundParams);
1448
1449 bool repartitioned = IsRedistributablePlan(selectPlan->planTree) &&
1450 IsSupportedRedistributionTarget(targetRelationId);
1451
1452 distributedPlan->insertSelectQuery = insertSelectQuery;
1453 distributedPlan->selectPlanForInsertSelect = selectPlan;
1454 distributedPlan->insertSelectMethod = repartitioned ?
1455 INSERT_SELECT_REPARTITION :
1456 INSERT_SELECT_VIA_COORDINATOR;
1457 distributedPlan->expectResults = insertSelectQuery->returningList != NIL;
1458 distributedPlan->intermediateResultIdPrefix = InsertSelectResultIdPrefix(planId);
1459 distributedPlan->targetRelationId = targetRelationId;
1460
1461 return distributedPlan;
1462 }
1463
1464
1465 /*
1466 * NonPushableInsertSelectSupported returns an error if executing an
1467 * INSERT ... SELECT command by pulling results of the SELECT to the coordinator
1468 * or with repartitioning is unsupported because it needs to generate sequence
1469 * values or insert into an append-distributed table.
1470 */
1471 static DeferredErrorMessage *
NonPushableInsertSelectSupported(Query * insertSelectQuery)1472 NonPushableInsertSelectSupported(Query *insertSelectQuery)
1473 {
1474 DeferredErrorMessage *deferredError = ErrorIfOnConflictNotSupported(
1475 insertSelectQuery);
1476 if (deferredError)
1477 {
1478 return deferredError;
1479 }
1480
1481 RangeTblEntry *insertRte = ExtractResultRelationRTE(insertSelectQuery);
1482 if (IsCitusTableType(insertRte->relid, APPEND_DISTRIBUTED))
1483 {
1484 return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
1485 "INSERT ... SELECT into an append-distributed table is "
1486 "not supported", NULL, NULL);
1487 }
1488
1489 return NULL;
1490 }
1491
1492
1493 /*
1494 * InsertSelectResultPrefix returns the prefix to use for intermediate
1495 * results of an INSERT ... SELECT via the coordinator that runs in two
1496 * phases in order to do RETURNING or ON CONFLICT.
1497 */
1498 char *
InsertSelectResultIdPrefix(uint64 planId)1499 InsertSelectResultIdPrefix(uint64 planId)
1500 {
1501 StringInfo resultIdPrefix = makeStringInfo();
1502
1503 appendStringInfo(resultIdPrefix, "insert_select_" UINT64_FORMAT, planId);
1504
1505 return resultIdPrefix->data;
1506 }
1507
1508
1509 /*
1510 * RelabelTargetEntryList relabels select target list to have matching names with
1511 * insert target list.
1512 */
1513 static void
RelabelTargetEntryList(List * selectTargetList,List * insertTargetList)1514 RelabelTargetEntryList(List *selectTargetList, List *insertTargetList)
1515 {
1516 ListCell *selectTargetCell = NULL;
1517 ListCell *insertTargetCell = NULL;
1518
1519 forboth(selectTargetCell, selectTargetList, insertTargetCell, insertTargetList)
1520 {
1521 TargetEntry *selectTargetEntry = lfirst(selectTargetCell);
1522 TargetEntry *insertTargetEntry = lfirst(insertTargetCell);
1523
1524 selectTargetEntry->resname = insertTargetEntry->resname;
1525 }
1526 }
1527
1528
1529 /*
1530 * AddInsertSelectCasts makes sure that the types in columns in the given
1531 * target lists have the same type as the columns of the given relation.
1532 * It might add casts to ensure that.
1533 *
1534 * It returns the updated selectTargetList.
1535 */
1536 static List *
AddInsertSelectCasts(List * insertTargetList,List * selectTargetList,Oid targetRelationId)1537 AddInsertSelectCasts(List *insertTargetList, List *selectTargetList,
1538 Oid targetRelationId)
1539 {
1540 ListCell *insertEntryCell = NULL;
1541 ListCell *selectEntryCell = NULL;
1542 List *projectedEntries = NIL;
1543 List *nonProjectedEntries = NIL;
1544
1545 /*
1546 * ReorderInsertSelectTargetLists() makes sure that first few columns of
1547 * the SELECT query match the insert targets. It might contain additional
1548 * items for GROUP BY, etc.
1549 */
1550 Assert(list_length(insertTargetList) <= list_length(selectTargetList));
1551
1552 Relation distributedRelation = table_open(targetRelationId, RowExclusiveLock);
1553 TupleDesc destTupleDescriptor = RelationGetDescr(distributedRelation);
1554
1555 int targetEntryIndex = 0;
1556 forboth(insertEntryCell, insertTargetList, selectEntryCell, selectTargetList)
1557 {
1558 TargetEntry *insertEntry = (TargetEntry *) lfirst(insertEntryCell);
1559 TargetEntry *selectEntry = (TargetEntry *) lfirst(selectEntryCell);
1560 Var *insertColumn = (Var *) insertEntry->expr;
1561 Form_pg_attribute attr = TupleDescAttr(destTupleDescriptor,
1562 insertEntry->resno - 1);
1563
1564 Oid sourceType = insertColumn->vartype;
1565 Oid targetType = attr->atttypid;
1566 if (sourceType != targetType)
1567 {
1568 insertEntry->expr = CastExpr((Expr *) insertColumn, sourceType, targetType,
1569 attr->attcollation, attr->atttypmod);
1570
1571 /*
1572 * We cannot modify the selectEntry in-place, because ORDER BY or
1573 * GROUP BY clauses might be pointing to it with comparison types
1574 * of the source type. So instead we keep the original one as a
1575 * non-projected entry, so GROUP BY and ORDER BY are happy, and
1576 * create a duplicated projected entry with the coerced expression.
1577 */
1578 TargetEntry *coercedEntry = copyObject(selectEntry);
1579 coercedEntry->expr = CastExpr((Expr *) selectEntry->expr, sourceType,
1580 targetType, attr->attcollation,
1581 attr->atttypmod);
1582 coercedEntry->ressortgroupref = 0;
1583
1584 /*
1585 * The only requirement is that users don't use this name in ORDER BY
1586 * or GROUP BY, and it should be unique across the same query.
1587 */
1588 StringInfo resnameString = makeStringInfo();
1589 appendStringInfo(resnameString, "auto_coerced_by_citus_%d", targetEntryIndex);
1590 coercedEntry->resname = resnameString->data;
1591
1592 projectedEntries = lappend(projectedEntries, coercedEntry);
1593
1594 if (selectEntry->ressortgroupref != 0)
1595 {
1596 selectEntry->resjunk = true;
1597
1598 /*
1599 * This entry might still end up in the SELECT output list, so
1600 * rename it to avoid ambiguity.
1601 *
1602 * See https://github.com/citusdata/citus/pull/3470.
1603 */
1604 resnameString = makeStringInfo();
1605 appendStringInfo(resnameString, "discarded_target_item_%d",
1606 targetEntryIndex);
1607 selectEntry->resname = resnameString->data;
1608
1609 nonProjectedEntries = lappend(nonProjectedEntries, selectEntry);
1610 }
1611 }
1612 else
1613 {
1614 projectedEntries = lappend(projectedEntries, selectEntry);
1615 }
1616
1617 targetEntryIndex++;
1618 }
1619
1620 for (int entryIndex = list_length(insertTargetList);
1621 entryIndex < list_length(selectTargetList);
1622 entryIndex++)
1623 {
1624 nonProjectedEntries = lappend(nonProjectedEntries, list_nth(selectTargetList,
1625 entryIndex));
1626 }
1627
1628 /* selectEntry->resno must be the ordinal number of the entry */
1629 selectTargetList = list_concat(projectedEntries, nonProjectedEntries);
1630 int entryResNo = 1;
1631 TargetEntry *selectTargetEntry = NULL;
1632 foreach_ptr(selectTargetEntry, selectTargetList)
1633 {
1634 selectTargetEntry->resno = entryResNo++;
1635 }
1636
1637 table_close(distributedRelation, NoLock);
1638
1639 return selectTargetList;
1640 }
1641
1642
1643 /*
1644 * CastExpr returns an expression which casts the given expr from sourceType to
1645 * the given targetType.
1646 */
1647 static Expr *
CastExpr(Expr * expr,Oid sourceType,Oid targetType,Oid targetCollation,int targetTypeMod)1648 CastExpr(Expr *expr, Oid sourceType, Oid targetType, Oid targetCollation,
1649 int targetTypeMod)
1650 {
1651 Oid coercionFuncId = InvalidOid;
1652 CoercionPathType coercionType = find_coercion_pathway(targetType, sourceType,
1653 COERCION_EXPLICIT,
1654 &coercionFuncId);
1655
1656 if (coercionType == COERCION_PATH_FUNC)
1657 {
1658 FuncExpr *coerceExpr = makeNode(FuncExpr);
1659 coerceExpr->funcid = coercionFuncId;
1660 coerceExpr->args = list_make1(copyObject(expr));
1661 coerceExpr->funccollid = targetCollation;
1662 coerceExpr->funcresulttype = targetType;
1663
1664 return (Expr *) coerceExpr;
1665 }
1666 else if (coercionType == COERCION_PATH_RELABELTYPE)
1667 {
1668 RelabelType *coerceExpr = makeNode(RelabelType);
1669 coerceExpr->arg = copyObject(expr);
1670 coerceExpr->resulttype = targetType;
1671 coerceExpr->resulttypmod = targetTypeMod;
1672 coerceExpr->resultcollid = targetCollation;
1673 coerceExpr->relabelformat = COERCE_IMPLICIT_CAST;
1674 coerceExpr->location = -1;
1675
1676 return (Expr *) coerceExpr;
1677 }
1678 else if (coercionType == COERCION_PATH_ARRAYCOERCE)
1679 {
1680 Oid sourceBaseType = get_base_element_type(sourceType);
1681 Oid targetBaseType = get_base_element_type(targetType);
1682
1683 CaseTestExpr *elemExpr = makeNode(CaseTestExpr);
1684 elemExpr->collation = targetCollation;
1685 elemExpr->typeId = sourceBaseType;
1686 elemExpr->typeMod = -1;
1687
1688 Expr *elemCastExpr = CastExpr((Expr *) elemExpr, sourceBaseType,
1689 targetBaseType, targetCollation,
1690 targetTypeMod);
1691
1692 ArrayCoerceExpr *coerceExpr = makeNode(ArrayCoerceExpr);
1693 coerceExpr->arg = copyObject(expr);
1694 coerceExpr->elemexpr = elemCastExpr;
1695 coerceExpr->resultcollid = targetCollation;
1696 coerceExpr->resulttype = targetType;
1697 coerceExpr->resulttypmod = targetTypeMod;
1698 coerceExpr->location = -1;
1699 coerceExpr->coerceformat = COERCE_IMPLICIT_CAST;
1700
1701 return (Expr *) coerceExpr;
1702 }
1703 else if (coercionType == COERCION_PATH_COERCEVIAIO)
1704 {
1705 CoerceViaIO *coerceExpr = makeNode(CoerceViaIO);
1706 coerceExpr->arg = (Expr *) copyObject(expr);
1707 coerceExpr->resulttype = targetType;
1708 coerceExpr->resultcollid = targetCollation;
1709 coerceExpr->coerceformat = COERCE_IMPLICIT_CAST;
1710 coerceExpr->location = -1;
1711
1712 return (Expr *) coerceExpr;
1713 }
1714 else
1715 {
1716 ereport(ERROR, (errmsg("could not find a conversion path from type %d to %d",
1717 sourceType, targetType)));
1718 }
1719 }
1720
1721
1722 /* PlanningInsertSelect returns true if we are planning an INSERT ...SELECT query */
1723 bool
PlanningInsertSelect(void)1724 PlanningInsertSelect(void)
1725 {
1726 return insertSelectPlannerLevel > 0;
1727 }
1728