1 /*-------------------------------------------------------------------------
2  *
3  * multi_physical_planner.h
4  *	  Type and function declarations used in creating the distributed execution
5  *	  plan.
6  *
7  * Copyright (c) Citus Data, Inc.
8  *
9  * $Id$
10  *
11  *-------------------------------------------------------------------------
12  */
13 
14 #ifndef MULTI_PHYSICAL_PLANNER_H
15 #define MULTI_PHYSICAL_PLANNER_H
16 
17 #include "postgres.h"
18 
19 #include "distributed/pg_version_constants.h"
20 
21 #include "c.h"
22 
23 #include "datatype/timestamp.h"
24 #include "distributed/citus_nodes.h"
25 #include "distributed/errormessage.h"
26 #include "distributed/log_utils.h"
27 #include "distributed/metadata_utility.h"
28 #include "distributed/worker_manager.h"
29 #include "distributed/multi_logical_planner.h"
30 #include "distributed/distributed_planner.h"
31 #include "lib/stringinfo.h"
32 #include "nodes/parsenodes.h"
33 #include "utils/array.h"
34 
35 
/* Definitions local to the physical planner */
#define NON_PRUNABLE_JOIN -1
#define RESERVED_HASHED_COLUMN_ID MaxAttrNumber
#define MERGE_COLUMN_FORMAT "merge_column_%u"

/*
 * SQL command templates for worker-side repartition-join UDF calls. Each
 * template starts with the 64-bit job id (UINT64_FORMAT), followed by the
 * UDF-specific arguments filled in at task-generation time.
 */
#define MAP_OUTPUT_FETCH_COMMAND "SELECT worker_fetch_partition_file \
 (" UINT64_FORMAT ", %u, %u, %u, '%s', %u)"
#define RANGE_PARTITION_COMMAND "SELECT worker_range_partition_table \
 (" UINT64_FORMAT ", %d, %s, '%s', '%s'::regtype, %s)"
#define HASH_PARTITION_COMMAND "SELECT worker_hash_partition_table \
 (" UINT64_FORMAT ", %d, %s, '%s', '%s'::regtype, %s)"
#define MERGE_FILES_INTO_TABLE_COMMAND "SELECT worker_merge_files_into_table \
 (" UINT64_FORMAT ", %d, '%s', '%s')"

/* number of partition buckets per node used by repartition joins (configurable) */
extern int RepartitionJoinBucketCountPerNode;
50 
/*
 * CitusRTEKind extends PostgreSQL's RTEKind with Citus-specific range table
 * entry kinds. The first entries deliberately reuse the numeric values of the
 * corresponding RTE_* constants so the two enums stay interchangeable; the
 * Citus-specific kinds follow after the last PostgreSQL value.
 */
typedef enum CitusRTEKind
{
	CITUS_RTE_RELATION = RTE_RELATION,  /* ordinary relation reference */
	CITUS_RTE_SUBQUERY = RTE_SUBQUERY,  /* subquery in FROM */
	CITUS_RTE_JOIN = RTE_JOIN,          /* join */
	CITUS_RTE_FUNCTION = RTE_FUNCTION,  /* function in FROM */
	CITUS_RTE_TABLEFUNC = RTE_TABLEFUNC, /* TableFunc(.., column list) */
	CITUS_RTE_VALUES = RTE_VALUES,      /* VALUES (<exprlist>), (<exprlist>), ... */
	CITUS_RTE_CTE = RTE_CTE,            /* common table expr (WITH list element) */
	CITUS_RTE_NAMEDTUPLESTORE = RTE_NAMEDTUPLESTORE, /* tuplestore, e.g. for triggers */
	CITUS_RTE_RESULT = RTE_RESULT,      /* RTE represents an empty FROM clause */
	CITUS_RTE_SHARD,                    /* Citus: RTE refers to a shard */
	CITUS_RTE_REMOTE_QUERY              /* Citus: RTE refers to a remote query */
} CitusRTEKind;
65 
66 
/* Enumeration that defines the partition type for a remote job */
typedef enum
{
	PARTITION_INVALID_FIRST = 0,       /* sentinel; not a valid partition type */
	RANGE_PARTITION_TYPE = 1,          /* partition rows by value ranges */
	SINGLE_HASH_PARTITION_TYPE = 2,    /* hash partition (single side of the join) */
	DUAL_HASH_PARTITION_TYPE = 3       /* hash partition (both sides of the join) */
} PartitionType;
75 
76 
/*
 * TaskType distinguishes the different kinds of tasks the physical planner
 * creates. Enumerator values are spelled out explicitly to make the wire/
 * serialization values obvious at a glance; they match the implicit ordering.
 */
typedef enum
{
	TASK_TYPE_INVALID_FIRST = 0,  /* sentinel; not a valid task type */
	READ_TASK = 1,                /* reads data, e.g. a SELECT on shards */
	MAP_TASK = 2,                 /* map phase of a repartition job */
	MERGE_TASK = 3,               /* merge phase of a repartition job */
	MAP_OUTPUT_FETCH_TASK = 4,    /* fetches map output between workers */
	MERGE_FETCH_TASK = 5,         /* fetches merged results */
	MODIFY_TASK = 6,              /* DML modification */
	DDL_TASK = 7,                 /* DDL command */
	VACUUM_ANALYZE_TASK = 8       /* VACUUM or ANALYZE command */
} TaskType;
90 
91 
/* Enumeration that defines the task assignment policy to use */
typedef enum
{
	TASK_ASSIGNMENT_INVALID_FIRST = 0,   /* sentinel; not a valid policy */
	TASK_ASSIGNMENT_GREEDY = 1,          /* greedily spread tasks across placements */
	TASK_ASSIGNMENT_ROUND_ROBIN = 2,     /* rotate across placements between executions */
	TASK_ASSIGNMENT_FIRST_REPLICA = 3    /* always pick the first replica */
} TaskAssignmentPolicyType;
100 
101 
/* Enumeration that defines different job types */
typedef enum
{
	JOB_INVALID_FIRST = 0,        /* sentinel; not a valid job type */
	JOIN_MAP_MERGE_JOB = 1,       /* repartition (map/merge) job for a join */
	SUBQUERY_MAP_MERGE_JOB = 2,   /* repartition job for a subquery */
	TOP_LEVEL_WORKER_JOB = 3      /* topmost job in the job tree */
} BoundaryNodeJobType;
110 
111 
/* Enumeration that specifies extent of DML modifications */
typedef enum RowModifyLevel
{
	ROW_MODIFY_NONE = 0,            /* query modifies no rows */
	ROW_MODIFY_READONLY = 1,        /* read-only access to rows */
	ROW_MODIFY_COMMUTATIVE = 2,     /* modifications commute (e.g. plain INSERTs) */
	ROW_MODIFY_NONCOMMUTATIVE = 3   /* modifications require ordering guarantees */
} RowModifyLevel;
120 
121 
122 /*
123  * LocalPlannedStatement represents a local plan of a shard. The scope
124  * for the LocalPlannedStatement is Task.
125  */
126 typedef struct LocalPlannedStatement
127 {
128 	CitusNode type;
129 
130 	uint64 shardId;
131 	uint32 localGroupId;
132 	PlannedStmt *localPlan;
133 } LocalPlannedStatement;
134 
135 
136 /*
137  * Job represents a logical unit of work that contains one set of data transfers
138  * in our physical plan. The physical planner maps each SQL query into one or
139  * more jobs depending on the query's complexity, and sets dependencies between
140  * these jobs. Each job consists of multiple executable tasks; and these tasks
141  * either operate on base shards, or repartitioned tables.
142  */
143 typedef struct Job
144 {
145 	CitusNode type;
146 	uint64 jobId;
147 	Query *jobQuery;
148 	List *taskList;
149 	List *dependentJobList;
150 	bool subqueryPushdown;
151 	bool requiresCoordinatorEvaluation; /* only applies to modify jobs */
152 	bool deferredPruning;
153 	Const *partitionKeyValue;
154 
155 	/* for local shard queries, we may save the local plan here */
156 	List *localPlannedStatements;
157 
158 	/*
159 	 * When we evaluate functions and parameters in jobQuery then we
160 	 * should no longer send the list of parameters along with the
161 	 * query.
162 	 */
163 	bool parametersInJobQueryResolved;
164 } Job;
165 
166 
/* Defines a repartitioning job and holds additional related data. */
typedef struct MapMergeJob
{
	Job job;                       /* base Job; must be the first field */
	Query *reduceQuery;            /* query to run over the merged (reduced) data */
	PartitionType partitionType;   /* how map output is partitioned */
	Var *partitionColumn;          /* column used to partition the data */
	uint32 partitionCount;         /* number of partitions to produce */
	int sortedShardIntervalArrayLength;
	ShardInterval **sortedShardIntervalArray; /* only applies to range partitioning */
	List *mapTaskList;             /* tasks for the map phase */
	List *mergeTaskList;           /* tasks for the merge phase */
} MapMergeJob;
180 
/*
 * TaskQueryType tags which member of the TaskQuery union below is currently
 * valid. Values are written out explicitly; they match the implicit ordering.
 */
typedef enum TaskQueryType
{
	TASK_QUERY_NULL = 0,       /* no query has been set */
	TASK_QUERY_TEXT = 1,       /* queryStringLazy holds the query text */
	TASK_QUERY_OBJECT = 2,     /* jobQueryReferenceForLazyDeparsing holds a Query */
	TASK_QUERY_TEXT_LIST = 3   /* queryStringList holds multiple query strings */
} TaskQueryType;
188 
189 typedef struct TaskQuery
190 {
191 	TaskQueryType queryType;
192 
193 	union
194 	{
195 		/*
196 		 * For most queries jobQueryReferenceForLazyDeparsing and/or queryStringLazy is not
197 		 * NULL. This means we have a single query for all placements.
198 		 *
199 		 * If this is not the case, the length of perPlacementQueryStrings is
200 		 * non-zero and equal to length of taskPlacementList. Like this it can
201 		 * assign a different query for each placement. We need this flexibility
202 		 * when a query should return node specific values. For example, on which
203 		 * node did we succeed storing some result files?
204 		 *
205 		 * jobQueryReferenceForLazyDeparsing is only not null when the planner thinks the
206 		 * query could possibly be locally executed. In that case deparsing+parsing
207 		 * the query might not be necessary, so we do that lazily.
208 		 *
209 		 * jobQueryReferenceForLazyDeparsing should only be set by using SetTaskQueryIfShouldLazyDeparse()
210 		 */
211 		Query *jobQueryReferenceForLazyDeparsing;
212 
213 		/*
214 		 * In almost all cases queryStringLazy should be read only indirectly by
215 		 * using TaskQueryString(). This will populate the field if only the
216 		 * jobQueryReferenceForLazyDeparsing field is not NULL.
217 		 *
218 		 * This field should only be set by using SetTaskQueryString() (or as a
219 		 * side effect from TaskQueryString()). Otherwise it might not be in sync
220 		 * with jobQueryReferenceForLazyDeparsing.
221 		 */
222 		char *queryStringLazy;
223 
224 		/*
225 		 * queryStringList contains query strings. They should be
226 		 * run sequentially. The concatenated version of this list
227 		 * will already be set for queryStringLazy, this can be useful
228 		 * when we want to access each query string.
229 		 */
230 		List *queryStringList;
231 	}data;
232 }TaskQuery;
233 
/* forward declaration; full definition lives in the tuple destination module */
struct TupleDestination;

/*
 * Task represents a single executable unit of work within a Job. The task
 * type determines which of the fields below are meaningful; several fields
 * apply only to particular task kinds (see the inline comments).
 */
typedef struct Task
{
	CitusNode type;
	TaskType taskType;
	uint64 jobId;       /* identifier of the Job this task belongs to */
	uint32 taskId;      /* identifier of the task within the job */

	/*
	 * taskQuery contains query string information. The way we get queryString can be different
	 * so this is abstracted with taskQuery.
	 */
	TaskQuery taskQuery;

	/*
	 * A task can have multiple queries, in which case queryCount will be > 1. If
	 * a task has more than one query, then taskQuery->queryType == TASK_QUERY_TEXT_LIST.
	 */
	int queryCount;

	Oid anchorDistributedTableId;     /* only applies to insert tasks */
	uint64 anchorShardId;       /* only applies to compute tasks */
	List *taskPlacementList;    /* only applies to compute tasks */
	List *dependentTaskList;     /* only applies to compute tasks */

	uint32 partitionId;
	uint32 upstreamTaskId;         /* only applies to data fetch tasks */
	ShardInterval *shardInterval;  /* only applies to merge tasks */
	bool assignmentConstrained;    /* only applies to merge tasks */
	char replicationModel;         /* only applies to modify tasks */

	/*
	 * List of struct RelationRowLock. This contains an entry for each
	 * query identified as a FOR [KEY] UPDATE/SHARE target. Citus
	 * converts PostgreSQL's RowMarkClause to RelationRowLock in
	 * RowLocksOnRelations().
	 */
	List *relationRowLockList;

	bool modifyWithSubquery;

	/*
	 * List of struct RelationShard. This represents the mapping of relations
	 * in the RTE list to shard IDs for a task for the purposes of:
	 *  - Locking: See AcquireExecutorShardLocks()
	 *  - Deparsing: See UpdateRelationToShardNames()
	 *  - Relation Access Tracking
	 */
	List *relationShardList;

	List *rowValuesLists;          /* rows to use when building multi-row INSERT */

	/*
	 * Used only when local execution happens. Indicates that this task is part of
	 * both local and remote executions. We use "or" in the field name because this
	 * is set to true for both the remote and local tasks generated for such
	 * executions. The most common example is modifications to reference tables where
	 * the task is split into local and remote tasks.
	 */
	bool partiallyLocalOrRemote;

	/*
	 * When we evaluate functions and parameters in the query string then
	 * we should no longer send the list of parameters along with the
	 * query.
	 */
	bool parametersInQueryStringResolved;

	/*
	 * Destination of tuples generated as a result of executing this task. Can be
	 * NULL, in which case executor might use a default destination.
	 */
	struct TupleDestination *tupleDest;

	/*
	 * totalReceivedTupleData only counts the data for a single placement. So
	 * for RETURNING DML this is not really correct. This is used by
	 * EXPLAIN ANALYZE, to display the amount of received bytes. The local execution
	 * does not increment this value, so only used for remote execution.
	 */
	uint64 totalReceivedTupleData;

	/*
	 * EXPLAIN ANALYZE output fetched from worker. This is saved to be used later
	 * by RemoteExplain().
	 */
	char *fetchedExplainAnalyzePlan;
	int fetchedExplainAnalyzePlacementIndex;

	/*
	 * Execution Duration fetched from worker. This is saved to be used later by
	 * ExplainTaskList().
	 */
	double fetchedExplainAnalyzeExecutionDuration;

	/*
	 * isLocalTableModification is true if the task modifies a local table.
	 */
	bool isLocalTableModification;
} Task;
335 
336 
337 /*
338  * RangeTableFragment represents a fragment of a range table. This fragment
339  * could be a regular shard or a merged table formed in a MapMerge job.
340  */
341 typedef struct RangeTableFragment
342 {
343 	CitusRTEKind fragmentType;
344 	void *fragmentReference;
345 	uint32 rangeTableId;
346 } RangeTableFragment;
347 
348 
349 /*
350  * JoinSequenceNode represents a range table in an ordered sequence of tables
351  * joined together. This representation helps build combinations of all range
352  * table fragments during task generation.
353  */
354 typedef struct JoinSequenceNode
355 {
356 	uint32 rangeTableId;
357 	int32 joiningRangeTableId;
358 } JoinSequenceNode;
359 
360 
361 /*
362  * InsertSelectMethod represents the method to use for INSERT INTO ... SELECT
363  * queries.
364  *
365  * Note that there is a third method which is not represented here, which is
366  * pushing down the INSERT INTO ... SELECT to workers. This method is executed
367  * similar to other distributed queries and doesn't need a special execution
368  * code, so we don't need to represent it here.
369  */
370 typedef enum InsertSelectMethod
371 {
372 	INSERT_SELECT_VIA_COORDINATOR,
373 	INSERT_SELECT_REPARTITION
374 } InsertSelectMethod;
375 
376 
377 /*
378  * DistributedPlan contains all information necessary to execute a
379  * distribute query.
380  */
381 typedef struct DistributedPlan
382 {
383 	CitusNode type;
384 
385 	/* unique identifier of the plan within the session */
386 	uint64 planId;
387 
388 	/* specifies nature of modifications in query */
389 	RowModifyLevel modLevel;
390 
391 	/*
392 	 * specifies whether plan returns results,
393 	 * either as a SELECT or a DML which has RETURNING.
394 	 */
395 	bool expectResults;
396 
397 	/* job tree containing the tasks to be executed on workers */
398 	Job *workerJob;
399 
400 	/* local query that merges results from the workers */
401 	Query *combineQuery;
402 
403 	/* query identifier (copied from the top-level PlannedStmt) */
404 	uint64 queryId;
405 
406 	/* which relations are accessed by this distributed plan */
407 	List *relationIdList;
408 
409 	/* target relation of a modification */
410 	Oid targetRelationId;
411 
412 	/*
413 	 * INSERT .. SELECT via the coordinator or repartition */
414 	Query *insertSelectQuery;
415 	PlannedStmt *selectPlanForInsertSelect;
416 	InsertSelectMethod insertSelectMethod;
417 
418 	/*
419 	 * If intermediateResultIdPrefix is non-null, an INSERT ... SELECT
420 	 * via the coordinator is written to a set of intermediate results
421 	 * named according to <intermediateResultIdPrefix>_<anchorShardId>.
422 	 * That way we can run a distributed INSERT ... SELECT with
423 	 * RETURNING or ON CONFLICT from the intermediate results to the
424 	 * target relation.
425 	 */
426 	char *intermediateResultIdPrefix;
427 
428 	/* list of subplans to execute before the distributed query */
429 	List *subPlanList;
430 
431 	/*
432 	 * List of subPlans that are used in the DistributedPlan
433 	 * Note that this is different that "subPlanList" field which
434 	 * contains the subplans generated as part of the DistributedPlan.
435 	 *
436 	 * On the other hand, usedSubPlanNodeList keeps track of which subPlans
437 	 * are used within this distributed plan as a list of
438 	 * UsedDistributedSubPlan pointers.
439 	 *
440 	 * The list may contain duplicates if the subplan is referenced multiple
441 	 * times (e.g. a CTE appears in the query tree multiple times).
442 	 */
443 	List *usedSubPlanNodeList;
444 
445 	/*
446 	 * When the query is very simple such that we don't need to call
447 	 * standard_planner(). See FastPathRouterQuery() for the definition.
448 	 */
449 	bool fastPathRouterPlan;
450 
451 	/* number of times this plan has been used (as a prepared statement) */
452 	uint32 numberOfTimesExecuted;
453 
454 	/*
455 	 * NULL if this a valid plan, an error description otherwise. This will
456 	 * e.g. be set if SQL features are present that a planner doesn't support,
457 	 * or if prepared statement parameters prevented successful planning.
458 	 */
459 	DeferredErrorMessage *planningError;
460 } DistributedPlan;
461 
462 
463 /*
464  * DistributedSubPlan contains a subplan of a distributed plan. Subplans are
465  * executed before the distributed query and their results are written to
466  * temporary files. This is used to execute CTEs and subquery joins that
467  * cannot be distributed.
468  */
469 typedef struct DistributedSubPlan
470 {
471 	CitusNode type;
472 
473 	uint32 subPlanId;
474 	PlannedStmt *plan;
475 
476 	/* EXPLAIN ANALYZE instrumentations */
477 	uint64 bytesSentPerWorker;
478 	uint32 remoteWorkerCount;
479 	double durationMillisecs;
480 	bool writeLocalFile;
481 } DistributedSubPlan;
482 
483 
/*
 * SubPlanAccessType describes how (where) a distributed query accesses a
 * subplan's result. Enumerator values are explicit; they match the implicit
 * ordering.
 */
typedef enum SubPlanAccessType
{
	SUBPLAN_ACCESS_NONE = 0,      /* the subplan result is not accessed */
	SUBPLAN_ACCESS_LOCAL = 1,     /* accessed only locally */
	SUBPLAN_ACCESS_REMOTE = 2,    /* accessed only remotely */
	SUBPLAN_ACCESS_ANYWHERE = 3   /* may be accessed both locally and remotely */
} SubPlanAccessType;
492 
493 
494 /*
495  * UsedDistributedSubPlan contains information about a subPlan that is used in a
496  * distributed plan.
497  */
498 typedef struct UsedDistributedSubPlan
499 {
500 	CitusNode type;
501 
502 	/* subplan used by the distributed query */
503 	char *subPlanId;
504 
505 	/* how the subplan is used by a distributed query */
506 	SubPlanAccessType accessType;
507 } UsedDistributedSubPlan;
508 
509 
/* OperatorCacheEntry contains information for each element in OperatorCache */
typedef struct OperatorCacheEntry
{
	/* cache key consists of typeId, accessMethodId and strategyNumber */
	Oid typeId;
	Oid accessMethodId;
	int16 strategyNumber;
	Oid operatorId;               /* cached operator for the key above */
	Oid operatorClassInputType;   /* input type of the operator class */
	char typeType;                /* pg_type.typtype of the cached type */
} OperatorCacheEntry;
521 
522 
/* Named function pointer type for reordering Task lists */
typedef List *(*ReorderFunction)(List *);


/* Config variable managed via guc.c */
extern int TaskAssignmentPolicy;   /* which TaskAssignmentPolicyType to use */
extern bool EnableUniqueJobIds;    /* whether generated job ids must be unique */
530 
531 
/* Function declarations for building physical plans and constructing queries */
extern DistributedPlan * CreatePhysicalDistributedPlan(MultiTreeRoot *multiTree,
													   PlannerRestrictionContext *
													   plannerRestrictionContext);
extern Task * CreateBasicTask(uint64 jobId, uint32 taskId, TaskType taskType,
							  char *queryString);

extern OpExpr * MakeOpExpression(Var *variable, int16 strategyNumber);
extern Node *  WrapUngroupedVarsInAnyValueAggregate(Node *expression,
													List *groupClauseList,
													List *targetList,
													bool checkExpressionEquality);
extern CollateExpr * RelabelTypeToCollateExpr(RelabelType *relabelType);

/*
 * Function declarations for building, updating constraints and simple operator
 * expression check.
 */
extern Node * BuildBaseConstraint(Var *column);
extern void UpdateConstraint(Node *baseConstraint, ShardInterval *shardInterval);
extern bool BinaryOpExpression(Expr *clause, Node **leftOperand, Node **rightOperand);
extern bool SimpleOpExpression(Expr *clause);

/* helper functions */
extern Var * MakeInt4Column(void);
/* qsort-style comparators return <0, 0 or >0 */
extern int CompareShardPlacements(const void *leftElement, const void *rightElement);
extern bool ShardIntervalsOverlap(ShardInterval *firstInterval,
								  ShardInterval *secondInterval);
extern bool ShardIntervalsOverlapWithParams(Datum firstMin, Datum firstMax,
											Datum secondMin, Datum secondMax,
											FmgrInfo *comparisonFunction,
											Oid collation);
extern bool CoPartitionedTables(Oid firstRelationId, Oid secondRelationId);
extern ShardInterval ** GenerateSyntheticShardIntervalArray(int partitionCount);
extern RowModifyLevel RowModifyLevelForQuery(Query *query);
extern StringInfo ArrayObjectToString(ArrayType *arrayObject,
									  Oid columnType, int32 columnTypeMod);
569 
570 
/* function declarations for Task and Task list operations */
extern bool TasksEqual(const Task *a, const Task *b);
extern bool TaskListMember(const List *taskList, const Task *task);
extern List * TaskListDifference(const List *list1, const List *list2);
extern List * AssignAnchorShardTaskList(List *taskList);
extern List * FirstReplicaAssignTaskList(List *taskList);
extern List * RoundRobinAssignTaskList(List *taskList);
extern List * RoundRobinReorder(List *placementList);
extern void SetPlacementNodeMetadata(ShardPlacement *placement, WorkerNode *workerNode);
/* qsort-style comparators return <0, 0 or >0 */
extern int CompareTasksByTaskId(const void *leftElement, const void *rightElement);
extern int CompareTasksByExecutionDuration(const void *leftElement, const
										   void *rightElement);

/* function declaration for creating Task */
extern List * QueryPushdownSqlTaskList(Query *query, uint64 jobId,
									   RelationRestrictionContext *
									   relationRestrictionContext,
									   List *prunedRelationShardList, TaskType taskType,
									   bool modifyRequiresCoordinatorEvaluation,
									   DeferredErrorMessage **planningError);

extern bool ModifyLocalTableJob(Job *job);

/* function declarations for managing jobs */
extern uint64 UniqueJobId(void);


extern List * DerivedColumnNameList(uint32 columnCount, uint64 generatingJobId);
extern RangeTblEntry * DerivedRangeTableEntry(MultiNode *multiNode, List *columnList,
											  List *tableIdList,
											  List *funcColumnNames,
											  List *funcColumnTypes,
											  List *funcColumnTypeMods,
											  List *funcCollations);

extern List * FetchEqualityAttrNumsForRTE(Node *quals);
607 
608 #endif   /* MULTI_PHYSICAL_PLANNER_H */
609