/*-------------------------------------------------------------------------
 *
 * multi_physical_planner.h
 *	  Type and function declarations used in creating the distributed execution
 *	  plan.
 *
 * Copyright (c) Citus Data, Inc.
 *
 * $Id$
 *
 *-------------------------------------------------------------------------
 */

#ifndef MULTI_PHYSICAL_PLANNER_H
#define MULTI_PHYSICAL_PLANNER_H

#include "postgres.h"

#include "distributed/pg_version_constants.h"

#include "c.h"

#include "datatype/timestamp.h"
#include "distributed/citus_nodes.h"
#include "distributed/errormessage.h"
#include "distributed/log_utils.h"
#include "distributed/metadata_utility.h"
#include "distributed/worker_manager.h"
#include "distributed/multi_logical_planner.h"
#include "distributed/distributed_planner.h"
#include "lib/stringinfo.h"
#include "nodes/parsenodes.h"
#include "utils/array.h"


/* Definitions local to the physical planner */
#define NON_PRUNABLE_JOIN -1
#define RESERVED_HASHED_COLUMN_ID MaxAttrNumber
#define MERGE_COLUMN_FORMAT "merge_column_%u"

/*
 * SQL command templates for repartition-join worker functions. The
 * backslash-newline continuations are inside the string literals, so the
 * continuation lines must not be indented.
 */
#define MAP_OUTPUT_FETCH_COMMAND "SELECT worker_fetch_partition_file \
(" UINT64_FORMAT ", %u, %u, %u, '%s', %u)"
#define RANGE_PARTITION_COMMAND "SELECT worker_range_partition_table \
(" UINT64_FORMAT ", %d, %s, '%s', '%s'::regtype, %s)"
#define HASH_PARTITION_COMMAND "SELECT worker_hash_partition_table \
(" UINT64_FORMAT ", %d, %s, '%s', '%s'::regtype, %s)"
#define MERGE_FILES_INTO_TABLE_COMMAND "SELECT worker_merge_files_into_table \
(" UINT64_FORMAT ", %d, '%s', '%s')"

extern int RepartitionJoinBucketCountPerNode;

/*
 * CitusRTEKind extends PostgreSQL's RTEKind with Citus-specific range table
 * entry kinds. The shared members are explicitly assigned the corresponding
 * RTEKind values so the two enums stay interchangeable.
 */
typedef enum CitusRTEKind
{
	CITUS_RTE_RELATION = RTE_RELATION,  /* ordinary relation reference */
	CITUS_RTE_SUBQUERY = RTE_SUBQUERY,  /* subquery in FROM */
	CITUS_RTE_JOIN = RTE_JOIN,          /* join */
	CITUS_RTE_FUNCTION = RTE_FUNCTION,  /* function in FROM */
	CITUS_RTE_TABLEFUNC = RTE_TABLEFUNC, /* TableFunc(.., column list) */
	CITUS_RTE_VALUES = RTE_VALUES,      /* VALUES (<exprlist>), (<exprlist>), ... */
	CITUS_RTE_CTE = RTE_CTE,            /* common table expr (WITH list element) */
	CITUS_RTE_NAMEDTUPLESTORE = RTE_NAMEDTUPLESTORE, /* tuplestore, e.g. for triggers */
	CITUS_RTE_RESULT = RTE_RESULT,      /* RTE represents an empty FROM clause */

	/* Citus-specific kinds with no PostgreSQL equivalent */
	CITUS_RTE_SHARD,
	CITUS_RTE_REMOTE_QUERY
} CitusRTEKind;


/* Enumeration that defines the partition type for a remote job */
typedef enum
{
	PARTITION_INVALID_FIRST = 0,
	RANGE_PARTITION_TYPE = 1,
	SINGLE_HASH_PARTITION_TYPE = 2,
	DUAL_HASH_PARTITION_TYPE = 3
} PartitionType;


/* Enumeration that defines different task types */
typedef enum
{
	TASK_TYPE_INVALID_FIRST,
	READ_TASK,
	MAP_TASK,
	MERGE_TASK,
	MAP_OUTPUT_FETCH_TASK,
	MERGE_FETCH_TASK,
	MODIFY_TASK,
	DDL_TASK,
	VACUUM_ANALYZE_TASK
} TaskType;


/* Enumeration that defines the task assignment policy to use */
typedef enum
{
	TASK_ASSIGNMENT_INVALID_FIRST = 0,
	TASK_ASSIGNMENT_GREEDY = 1,
	TASK_ASSIGNMENT_ROUND_ROBIN = 2,
	TASK_ASSIGNMENT_FIRST_REPLICA = 3
} TaskAssignmentPolicyType;


/* Enumeration that defines different job types */
typedef enum
{
	JOB_INVALID_FIRST = 0,
	JOIN_MAP_MERGE_JOB = 1,
	SUBQUERY_MAP_MERGE_JOB = 2,
	TOP_LEVEL_WORKER_JOB = 3
} BoundaryNodeJobType;


/* Enumeration that specifies extent of DML modifications */
typedef enum RowModifyLevel
{
	ROW_MODIFY_NONE = 0,
	ROW_MODIFY_READONLY = 1,
	ROW_MODIFY_COMMUTATIVE = 2,
	ROW_MODIFY_NONCOMMUTATIVE = 3
} RowModifyLevel;


/*
 * LocalPlannedStatement represents a local plan of a shard. The scope
 * for the LocalPlannedStatement is Task.
 */
typedef struct LocalPlannedStatement
{
	CitusNode type;

	uint64 shardId;         /* shard this local plan was created for */
	uint32 localGroupId;
	PlannedStmt *localPlan; /* locally planned statement for the shard query */
} LocalPlannedStatement;


/*
 * Job represents a logical unit of work that contains one set of data transfers
 * in our physical plan. The physical planner maps each SQL query into one or
 * more jobs depending on the query's complexity, and sets dependencies between
 * these jobs. Each job consists of multiple executable tasks; and these tasks
 * either operate on base shards, or repartitioned tables.
 */
typedef struct Job
{
	CitusNode type;
	uint64 jobId;
	Query *jobQuery;
	List *taskList;
	List *dependentJobList;
	bool subqueryPushdown;
	bool requiresCoordinatorEvaluation; /* only applies to modify jobs */
	bool deferredPruning;
	Const *partitionKeyValue;

	/* for local shard queries, we may save the local plan here */
	List *localPlannedStatements;

	/*
	 * When we evaluate functions and parameters in jobQuery then we
	 * should no longer send the list of parameters along with the
	 * query.
	 */
	bool parametersInJobQueryResolved;
} Job;


/* Defines a repartitioning job and holds additional related data. */
typedef struct MapMergeJob
{
	Job job;
	Query *reduceQuery;
	PartitionType partitionType;
	Var *partitionColumn;
	uint32 partitionCount;
	int sortedShardIntervalArrayLength;
	ShardInterval **sortedShardIntervalArray; /* only applies to range partitioning */
	List *mapTaskList;
	List *mergeTaskList;
} MapMergeJob;


/* Discriminator for the union inside TaskQuery */
typedef enum TaskQueryType
{
	TASK_QUERY_NULL,
	TASK_QUERY_TEXT,
	TASK_QUERY_OBJECT,
	TASK_QUERY_TEXT_LIST
} TaskQueryType;

typedef struct TaskQuery
{
	TaskQueryType queryType;

	union
	{
		/*
		 * For most queries jobQueryReferenceForLazyDeparsing and/or queryStringLazy is not
		 * NULL. This means we have a single query for all placements.
		 *
		 * If this is not the case, the length of perPlacementQueryStrings is
		 * non-zero and equal to length of taskPlacementList. Like this it can
		 * assign a different query for each placement. We need this flexibility
		 * when a query should return node specific values. For example, on which
		 * node did we succeed storing some result files?
		 *
		 * jobQueryReferenceForLazyDeparsing is only not null when the planner thinks the
		 * query could possibly be locally executed. In that case deparsing+parsing
		 * the query might not be necessary, so we do that lazily.
		 *
		 * jobQueryReferenceForLazyDeparsing should only be set by using SetTaskQueryIfShouldLazyDeparse()
		 */
		Query *jobQueryReferenceForLazyDeparsing;

		/*
		 * In almost all cases queryStringLazy should be read only indirectly by
		 * using TaskQueryString(). This will populate the field only if the
		 * jobQueryReferenceForLazyDeparsing field is not NULL.
		 *
		 * This field should only be set by using SetTaskQueryString() (or as a
		 * side effect from TaskQueryString()). Otherwise it might not be in sync
		 * with jobQueryReferenceForLazyDeparsing.
		 */
		char *queryStringLazy;

		/*
		 * queryStringList contains query strings. They should be
		 * run sequentially. The concatenated version of this list
		 * will already be set for queryStringLazy, this can be useful
		 * when we want to access each query string.
		 */
		List *queryStringList;
	} data;
} TaskQuery;

struct TupleDestination;

typedef struct Task
{
	CitusNode type;
	TaskType taskType;
	uint64 jobId;
	uint32 taskId;

	/*
	 * taskQuery contains query string information. The way we get queryString can be different
	 * so this is abstracted with taskQuery.
	 */
	TaskQuery taskQuery;

	/*
	 * A task can have multiple queries, in which case queryCount will be > 1. If
	 * a task has more than one query, then taskQuery.queryType == TASK_QUERY_TEXT_LIST.
	 */
	int queryCount;

	Oid anchorDistributedTableId; /* only applies to insert tasks */
	uint64 anchorShardId;         /* only applies to compute tasks */
	List *taskPlacementList;      /* only applies to compute tasks */
	List *dependentTaskList;      /* only applies to compute tasks */

	uint32 partitionId;
	uint32 upstreamTaskId;        /* only applies to data fetch tasks */
	ShardInterval *shardInterval; /* only applies to merge tasks */
	bool assignmentConstrained;   /* only applies to merge tasks */
	char replicationModel;        /* only applies to modify tasks */

	/*
	 * List of struct RelationRowLock. This contains an entry for each
	 * query identified as a FOR [KEY] UPDATE/SHARE target. Citus
	 * converts PostgreSQL's RowMarkClause to RelationRowLock in
	 * RowLocksOnRelations().
	 */
	List *relationRowLockList;

	bool modifyWithSubquery;

	/*
	 * List of struct RelationShard. This represents the mapping of relations
	 * in the RTE list to shard IDs for a task for the purposes of:
	 *  - Locking: See AcquireExecutorShardLocks()
	 *  - Deparsing: See UpdateRelationToShardNames()
	 *  - Relation Access Tracking
	 */
	List *relationShardList;

	List *rowValuesLists; /* rows to use when building multi-row INSERT */

	/*
	 * Used only when local execution happens. Indicates that this task is part of
	 * both local and remote executions. We use "or" in the field name because this
	 * is set to true for both the remote and local tasks generated for such
	 * executions. The most common example is modifications to reference tables where
	 * the task is split into local and remote tasks.
	 */
	bool partiallyLocalOrRemote;

	/*
	 * When we evaluate functions and parameters in the query string then
	 * we should no longer send the list of parameters along with the
	 * query.
	 */
	bool parametersInQueryStringResolved;

	/*
	 * Destination of tuples generated as a result of executing this task. Can be
	 * NULL, in which case executor might use a default destination.
	 */
	struct TupleDestination *tupleDest;

	/*
	 * totalReceivedTupleData only counts the data for a single placement. So
	 * for RETURNING DML this is not really correct. This is used by
	 * EXPLAIN ANALYZE, to display the amount of received bytes. The local execution
	 * does not increment this value, so only used for remote execution.
	 */
	uint64 totalReceivedTupleData;

	/*
	 * EXPLAIN ANALYZE output fetched from worker. This is saved to be used later
	 * by RemoteExplain().
	 */
	char *fetchedExplainAnalyzePlan;
	int fetchedExplainAnalyzePlacementIndex;

	/*
	 * Execution Duration fetched from worker. This is saved to be used later by
	 * ExplainTaskList().
	 */
	double fetchedExplainAnalyzeExecutionDuration;

	/*
	 * isLocalTableModification is true if the task is modifying a local table.
	 */
	bool isLocalTableModification;
} Task;


/*
 * RangeTableFragment represents a fragment of a range table. This fragment
 * could be a regular shard or a merged table formed in a MapMerge job.
 */
typedef struct RangeTableFragment
{
	CitusRTEKind fragmentType;
	void *fragmentReference;
	uint32 rangeTableId;
} RangeTableFragment;


/*
 * JoinSequenceNode represents a range table in an ordered sequence of tables
 * joined together. This representation helps build combinations of all range
 * table fragments during task generation.
 */
typedef struct JoinSequenceNode
{
	uint32 rangeTableId;
	int32 joiningRangeTableId;
} JoinSequenceNode;


/*
 * InsertSelectMethod represents the method to use for INSERT INTO ... SELECT
 * queries.
 *
 * Note that there is a third method which is not represented here, which is
 * pushing down the INSERT INTO ... SELECT to workers. This method is executed
 * similar to other distributed queries and doesn't need a special execution
 * code, so we don't need to represent it here.
 */
typedef enum InsertSelectMethod
{
	INSERT_SELECT_VIA_COORDINATOR,
	INSERT_SELECT_REPARTITION
} InsertSelectMethod;


/*
 * DistributedPlan contains all information necessary to execute a
 * distributed query.
 */
typedef struct DistributedPlan
{
	CitusNode type;

	/* unique identifier of the plan within the session */
	uint64 planId;

	/* specifies nature of modifications in query */
	RowModifyLevel modLevel;

	/*
	 * specifies whether plan returns results,
	 * either as a SELECT or a DML which has RETURNING.
	 */
	bool expectResults;

	/* job tree containing the tasks to be executed on workers */
	Job *workerJob;

	/* local query that merges results from the workers */
	Query *combineQuery;

	/* query identifier (copied from the top-level PlannedStmt) */
	uint64 queryId;

	/* which relations are accessed by this distributed plan */
	List *relationIdList;

	/* target relation of a modification */
	Oid targetRelationId;

	/* INSERT .. SELECT via the coordinator or repartition */
	Query *insertSelectQuery;
	PlannedStmt *selectPlanForInsertSelect;
	InsertSelectMethod insertSelectMethod;

	/*
	 * If intermediateResultIdPrefix is non-null, an INSERT ... SELECT
	 * via the coordinator is written to a set of intermediate results
	 * named according to <intermediateResultIdPrefix>_<anchorShardId>.
	 * That way we can run a distributed INSERT ... SELECT with
	 * RETURNING or ON CONFLICT from the intermediate results to the
	 * target relation.
	 */
	char *intermediateResultIdPrefix;

	/* list of subplans to execute before the distributed query */
	List *subPlanList;

	/*
	 * List of subPlans that are used in the DistributedPlan.
	 * Note that this is different from the "subPlanList" field which
	 * contains the subplans generated as part of the DistributedPlan.
	 *
	 * On the other hand, usedSubPlanNodeList keeps track of which subPlans
	 * are used within this distributed plan as a list of
	 * UsedDistributedSubPlan pointers.
	 *
	 * The list may contain duplicates if the subplan is referenced multiple
	 * times (e.g. a CTE appears in the query tree multiple times).
	 */
	List *usedSubPlanNodeList;

	/*
	 * When the query is very simple such that we don't need to call
	 * standard_planner(). See FastPathRouterQuery() for the definition.
	 */
	bool fastPathRouterPlan;

	/* number of times this plan has been used (as a prepared statement) */
	uint32 numberOfTimesExecuted;

	/*
	 * NULL if this is a valid plan, an error description otherwise. This will
	 * e.g. be set if SQL features are present that a planner doesn't support,
	 * or if prepared statement parameters prevented successful planning.
	 */
	DeferredErrorMessage *planningError;
} DistributedPlan;


/*
 * DistributedSubPlan contains a subplan of a distributed plan. Subplans are
 * executed before the distributed query and their results are written to
 * temporary files. This is used to execute CTEs and subquery joins that
 * cannot be distributed.
 */
typedef struct DistributedSubPlan
{
	CitusNode type;

	uint32 subPlanId;
	PlannedStmt *plan;

	/* EXPLAIN ANALYZE instrumentations */
	uint64 bytesSentPerWorker;
	uint32 remoteWorkerCount;
	double durationMillisecs;
	bool writeLocalFile;
} DistributedSubPlan;


/* defines how a subplan is used by a distributed query */
typedef enum SubPlanAccessType
{
	SUBPLAN_ACCESS_NONE,
	SUBPLAN_ACCESS_LOCAL,
	SUBPLAN_ACCESS_REMOTE,
	SUBPLAN_ACCESS_ANYWHERE
} SubPlanAccessType;


/*
 * UsedDistributedSubPlan contains information about a subPlan that is used in a
 * distributed plan.
 */
typedef struct UsedDistributedSubPlan
{
	CitusNode type;

	/* subplan used by the distributed query */
	char *subPlanId;

	/* how the subplan is used by a distributed query */
	SubPlanAccessType accessType;
} UsedDistributedSubPlan;


/* OperatorCacheEntry contains information for each element in OperatorCache */
typedef struct OperatorCacheEntry
{
	/* cache key consists of typeId, accessMethodId and strategyNumber */
	Oid typeId;
	Oid accessMethodId;
	int16 strategyNumber;
	Oid operatorId;
	Oid operatorClassInputType;
	char typeType;
} OperatorCacheEntry;


/* Named function pointer type for reordering Task lists */
typedef List *(*ReorderFunction)(List *);


/* Config variable managed via guc.c */
extern int TaskAssignmentPolicy;
extern bool EnableUniqueJobIds;


/* Function declarations for building physical plans and constructing queries */
extern DistributedPlan * CreatePhysicalDistributedPlan(MultiTreeRoot *multiTree,
													   PlannerRestrictionContext *
													   plannerRestrictionContext);
extern Task * CreateBasicTask(uint64 jobId, uint32 taskId, TaskType taskType,
							  char *queryString);

extern OpExpr * MakeOpExpression(Var *variable, int16 strategyNumber);

extern Node * WrapUngroupedVarsInAnyValueAggregate(Node *expression,
												   List *groupClauseList,
												   List *targetList,
												   bool checkExpressionEquality);
extern CollateExpr * RelabelTypeToCollateExpr(RelabelType *relabelType);

/*
 * Function declarations for building, updating constraints and simple operator
 * expression check.
 */
extern Node * BuildBaseConstraint(Var *column);
extern void UpdateConstraint(Node *baseConstraint, ShardInterval *shardInterval);
extern bool BinaryOpExpression(Expr *clause, Node **leftOperand, Node **rightOperand);
extern bool SimpleOpExpression(Expr *clause);

/* helper functions */
extern Var * MakeInt4Column(void);
extern int CompareShardPlacements(const void *leftElement, const void *rightElement);
extern bool ShardIntervalsOverlap(ShardInterval *firstInterval,
								  ShardInterval *secondInterval);
extern bool ShardIntervalsOverlapWithParams(Datum firstMin, Datum firstMax,
											Datum secondMin, Datum secondMax,
											FmgrInfo *comparisonFunction,
											Oid collation);
extern bool CoPartitionedTables(Oid firstRelationId, Oid secondRelationId);
extern ShardInterval ** GenerateSyntheticShardIntervalArray(int partitionCount);
extern RowModifyLevel RowModifyLevelForQuery(Query *query);
extern StringInfo ArrayObjectToString(ArrayType *arrayObject,
									  Oid columnType, int32 columnTypeMod);


/* function declarations for Task and Task list operations */
extern bool TasksEqual(const Task *a, const Task *b);
extern bool TaskListMember(const List *taskList, const Task *task);
extern List * TaskListDifference(const List *list1, const List *list2);
extern List * AssignAnchorShardTaskList(List *taskList);
extern List * FirstReplicaAssignTaskList(List *taskList);
extern List * RoundRobinAssignTaskList(List *taskList);
extern List * RoundRobinReorder(List *placementList);
extern void SetPlacementNodeMetadata(ShardPlacement *placement,
									 WorkerNode *workerNode);
extern int CompareTasksByTaskId(const void *leftElement, const void *rightElement);
extern int CompareTasksByExecutionDuration(const void *leftElement, const
										   void *rightElement);

/* function declaration for creating Task */
extern List * QueryPushdownSqlTaskList(Query *query, uint64 jobId,
									   RelationRestrictionContext *
									   relationRestrictionContext,
									   List *prunedRelationShardList, TaskType taskType,
									   bool modifyRequiresCoordinatorEvaluation,
									   DeferredErrorMessage **planningError);

extern bool ModifyLocalTableJob(Job *job);

/* function declarations for managing jobs */
extern uint64 UniqueJobId(void);


extern List * DerivedColumnNameList(uint32 columnCount, uint64 generatingJobId);
extern RangeTblEntry * DerivedRangeTableEntry(MultiNode *multiNode, List *columnList,
											  List *tableIdList,
											  List *funcColumnNames,
											  List *funcColumnTypes,
											  List *funcColumnTypeMods,
											  List *funcCollations);

extern List * FetchEqualityAttrNumsForRTE(Node *quals);

#endif /* MULTI_PHYSICAL_PLANNER_H */