/*-------------------------------------------------------------------------
 *
 * citus_custom_scan.c
 *
 * Definitions of custom scan methods for all executor types.
 *
 * Copyright (c) Citus Data, Inc.
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include "distributed/pg_version_constants.h"

#include "miscadmin.h"

#include "commands/copy.h"
#include "distributed/backend_data.h"
#include "distributed/citus_clauses.h"
#include "distributed/citus_custom_scan.h"
#include "distributed/citus_nodefuncs.h"
#include "distributed/citus_ruleutils.h"
#include "distributed/connection_management.h"
#include "distributed/deparse_shard_query.h"
#include "distributed/distributed_execution_locks.h"
#include "distributed/insert_select_executor.h"
#include "distributed/insert_select_planner.h"
#include "distributed/listutils.h"
#include "distributed/local_executor.h"
#include "distributed/local_plan_cache.h"
#include "distributed/multi_executor.h"
#include "distributed/multi_server_executor.h"
#include "distributed/multi_router_planner.h"
#include "distributed/query_stats.h"
#include "distributed/subplan_execution.h"
#include "distributed/worker_log_messages.h"
#include "distributed/worker_protocol.h"
#include "executor/executor.h"
#include "nodes/makefuncs.h"
#include "optimizer/optimizer.h"
#include "optimizer/clauses.h"
#include "utils/memutils.h"
#include "utils/rel.h"


/* functions for creating custom scan nodes */
static Node * AdaptiveExecutorCreateScan(CustomScan *scan);
static Node * NonPushableInsertSelectCreateScan(CustomScan *scan);
static Node * DelayedErrorCreateScan(CustomScan *scan);

/* functions that are common to different scans */
static void CitusBeginScan(CustomScanState *node, EState *estate, int eflags);
static void CitusBeginReadOnlyScan(CustomScanState *node, EState *estate, int eflags);
static void CitusBeginModifyScan(CustomScanState *node, EState *estate, int eflags);
static void CitusPreExecScan(CitusScanState *scanState);
static bool ModifyJobNeedsEvaluation(Job *workerJob);
static void RegenerateTaskForFasthPathQuery(Job *workerJob);
static void RegenerateTaskListForInsert(Job *workerJob);
static DistributedPlan * CopyDistributedPlanWithoutCache(
	DistributedPlan *originalDistributedPlan);
static void CitusEndScan(CustomScanState *node);
static void CitusReScan(CustomScanState *node);


/* create custom scan methods for all executors */
CustomScanMethods AdaptiveExecutorCustomScanMethods = {
	"Citus Adaptive",
	AdaptiveExecutorCreateScan
};

CustomScanMethods NonPushableInsertSelectCustomScanMethods = {
	"Citus INSERT ... SELECT",
	NonPushableInsertSelectCreateScan
};

CustomScanMethods DelayedErrorCustomScanMethods = {
	"Citus Delayed Error",
	DelayedErrorCreateScan
};


/*
 * Define executor methods for the different executor types.
 */
static CustomExecMethods AdaptiveExecutorCustomExecMethods = {
	.CustomName = "AdaptiveExecutorScan",
	.BeginCustomScan = CitusBeginScan,
	.ExecCustomScan = CitusExecScan,
	.EndCustomScan = CitusEndScan,
	.ReScanCustomScan = CitusReScan,
	.ExplainCustomScan = CitusExplainScan
};

static CustomExecMethods NonPushableInsertSelectCustomExecMethods = {
	.CustomName = "NonPushableInsertSelectScan",
	.BeginCustomScan = CitusBeginScan,
	.ExecCustomScan = NonPushableInsertSelectExecScan,
	.EndCustomScan = CitusEndScan,
	.ReScanCustomScan = CitusReScan,
	.ExplainCustomScan = NonPushableInsertSelectExplainScan
};


/*
 * IsCitusCustomState returns whether a given PlanState node is a CitusCustomState node.
 */
bool
IsCitusCustomState(PlanState *planState)
{
	if (!IsA(planState, CustomScanState))
	{
		return false;
	}

	CustomScanState *css = castNode(CustomScanState, planState);
	if (css->methods == &AdaptiveExecutorCustomExecMethods ||
		css->methods == &NonPushableInsertSelectCustomExecMethods)
	{
		return true;
	}

	return false;
}


/*
 * Let PostgreSQL know about Citus' custom scan nodes.
 */
void
RegisterCitusCustomScanMethods(void)
{
	RegisterCustomScanMethods(&AdaptiveExecutorCustomScanMethods);
	RegisterCustomScanMethods(&NonPushableInsertSelectCustomScanMethods);
	RegisterCustomScanMethods(&DelayedErrorCustomScanMethods);
}
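
/*
 * A minimal usage sketch (assuming the extension's _PG_init entry point, not
 * shown in this file):
 *
 *   void
 *   _PG_init(void)
 *   {
 *       ...
 *       RegisterCitusCustomScanMethods();
 *       ...
 *   }
 *
 * Registering the methods by name is what allows PostgreSQL to map a
 * serialized CustomScan node (read back via stringToNode) to the method
 * tables defined above.
 */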


/*
 * CitusBeginScan is the BeginCustomScan callback for all Citus custom scans.
 * It marks the backend as a coordinator backend initiated by Citus and handles
 * deferred shard pruning along with function evaluation.
 */
static void
CitusBeginScan(CustomScanState *node, EState *estate, int eflags)
{
	MarkCitusInitiatedCoordinatorBackend();

	CitusScanState *scanState = (CitusScanState *) node;

	/*
	 * Make sure we can see notices during regular queries, which would typically
	 * be the result of a function that raises a notice being called.
	 */
	EnableWorkerMessagePropagation();


	/*
	 * Since we are using a tuplestore we cannot use the virtual tuples postgres had
	 * already set up on the CustomScan. Instead we need to reinitialize the tuples as
	 * minimal.
	 *
	 * During initialization postgres also created the projection information and the
	 * quals, but both are 'compiled' to be executed on virtual tuples. Since we replaced
	 * the tuples with minimal tuples we also compile both the projection and the quals
	 * onto these 'new' tuples.
	 */
	ExecInitResultSlot(&scanState->customScanState.ss.ps, &TTSOpsMinimalTuple);

	ExecInitScanTupleSlot(node->ss.ps.state, &node->ss, node->ss.ps.scandesc,
						  &TTSOpsMinimalTuple);
	ExecAssignScanProjectionInfoWithVarno(&node->ss, INDEX_VAR);

	node->ss.ps.qual = ExecInitQual(node->ss.ps.plan->qual, (PlanState *) node);

	DistributedPlan *distributedPlan = scanState->distributedPlan;
	if (distributedPlan->insertSelectQuery != NULL)
	{
		/*
		 * INSERT..SELECT via coordinator or re-partitioning is special because
		 * the SELECT part is planned separately.
		 */
		return;
	}
	else if (distributedPlan->modLevel == ROW_MODIFY_READONLY)
	{
		CitusBeginReadOnlyScan(node, estate, eflags);
	}
	else
	{
		CitusBeginModifyScan(node, estate, eflags);
	}

	/*
	 * In case of a prepared statement, we will see this distributed plan again
	 * on the next execution with a higher usage counter.
	 */
	distributedPlan->numberOfTimesExecuted++;
}


/*
 * CitusPreExecScan is called right before postgres' executor starts pulling tuples.
 */
static void
CitusPreExecScan(CitusScanState *scanState)
{
	AdaptiveExecutorPreExecutorRun(scanState);
}


/*
 * CitusExecScan is called when a tuple is pulled from a custom scan.
 * On the first call, it executes the distributed query and writes the
 * results to a tuple store. The postgres executor calls this function
 * repeatedly to read tuples from the tuple store.
 */
TupleTableSlot *
CitusExecScan(CustomScanState *node)
{
	CitusScanState *scanState = (CitusScanState *) node;

	if (!scanState->finishedRemoteScan)
	{
		AdaptiveExecutor(scanState);

		scanState->finishedRemoteScan = true;
	}

	return ReturnTupleFromTuplestore(scanState);
}


/*
 * CitusBeginReadOnlyScan handles deferred pruning and plan caching for SELECTs.
 */
static void
CitusBeginReadOnlyScan(CustomScanState *node, EState *estate, int eflags)
{
	CitusScanState *scanState = (CitusScanState *) node;
	DistributedPlan *originalDistributedPlan = scanState->distributedPlan;

	Assert(originalDistributedPlan->workerJob->jobQuery->commandType == CMD_SELECT);

	if (!originalDistributedPlan->workerJob->deferredPruning)
	{
		/*
		 * For SELECT queries that have already been pruned we can proceed straight
		 * to execution, since none of the prepared statement logic applies.
		 */
		return;
	}

	/*
	 * Create a copy of the generic plan for the current execution, but make a shallow
	 * copy of the plan cache. That means we'll be able to access the plan cache via
	 * currentPlan->workerJob->localPlannedStatements, but it will be preserved across
	 * executions by the prepared statement logic.
	 */
	DistributedPlan *currentPlan =
		CopyDistributedPlanWithoutCache(originalDistributedPlan);
	scanState->distributedPlan = currentPlan;

	Job *workerJob = currentPlan->workerJob;
	Query *jobQuery = workerJob->jobQuery;
	PlanState *planState = &(scanState->customScanState.ss.ps);

	/*
	 * We only do deferred pruning for fast path queries, which have a single
	 * partition column value.
	 */
	Assert(currentPlan->fastPathRouterPlan || !EnableFastPathRouterPlanner);

	/*
	 * Evaluate parameters, because the parameters are only available on the
	 * coordinator and are required for pruning.
	 *
	 * We don't evaluate functions for read-only queries on the coordinator
	 * at the moment. Most function calls would be in a context where they
	 * should be re-evaluated for every row in case of volatile functions.
	 *
	 * TODO: evaluate stable functions
	 */
	ExecuteCoordinatorEvaluableExpressions(jobQuery, planState);

	/* job query no longer has parameters, so we should not send any */
	workerJob->parametersInJobQueryResolved = true;

	/* parameters are filled in, so we can generate a task for this execution */
	RegenerateTaskForFasthPathQuery(workerJob);

	if (IsLocalPlanCachingSupported(workerJob, originalDistributedPlan))
	{
		Task *task = linitial(workerJob->taskList);

		/*
		 * We are going to execute this task locally. If it's not already in
		 * the cache, create a local plan now and add it to the cache. During
		 * execution, we will get the plan from the cache.
		 *
		 * The plan will be cached across executions when originalDistributedPlan
		 * represents a prepared statement.
		 */
		CacheLocalPlanForShardQuery(task, originalDistributedPlan,
									estate->es_param_list_info);
	}
}
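
/*
 * As an illustration (the table and statement names are hypothetical, not
 * taken from this file): with a distributed table dist_table distributed on
 * dist_col,
 *
 *   PREPARE p(int) AS SELECT * FROM dist_table WHERE dist_col = $1;
 *
 * may eventually be executed with a cached generic plan. In that case shard
 * pruning was deferred at plan time and CitusBeginReadOnlyScan prunes on
 * every execution once the value of $1 is known.
 */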


/*
 * CitusBeginModifyScan prepares the scan state for a modification.
 *
 * Modifications are special because:
 * a) we evaluate function calls (e.g. nextval) here and the outcome may
 *    determine which shards are affected by this query.
 * b) we need to take metadata locks to make sure no write is left behind
 *    when finalizing a shard move.
 */
static void
CitusBeginModifyScan(CustomScanState *node, EState *estate, int eflags)
{
	CitusScanState *scanState = (CitusScanState *) node;
	PlanState *planState = &(scanState->customScanState.ss.ps);
	DistributedPlan *originalDistributedPlan = scanState->distributedPlan;

	MemoryContext localContext = AllocSetContextCreate(CurrentMemoryContext,
													   "CitusBeginModifyScan",
													   ALLOCSET_DEFAULT_SIZES);
	MemoryContext oldContext = MemoryContextSwitchTo(localContext);

	DistributedPlan *currentPlan =
		CopyDistributedPlanWithoutCache(originalDistributedPlan);
	scanState->distributedPlan = currentPlan;

	Job *workerJob = currentPlan->workerJob;

	Query *jobQuery = workerJob->jobQuery;

	if (ModifyJobNeedsEvaluation(workerJob))
	{
		ExecuteCoordinatorEvaluableExpressions(jobQuery, planState);

		/* job query no longer has parameters, so we should not send any */
		workerJob->parametersInJobQueryResolved = true;
	}

	if (workerJob->deferredPruning)
	{
		/*
		 * At this point, we're about to do the shard pruning for fast-path queries.
		 * Given that pruning is always deferred for INSERTs, we can also get here
		 * with !EnableFastPathRouterPlanner. And since INSERT statements with
		 * CTEs/sublinks etc. are not eligible for fast-path router plans, we can
		 * also get here with jobQuery->commandType == CMD_INSERT.
		 */
		Assert(currentPlan->fastPathRouterPlan || !EnableFastPathRouterPlanner ||
			   jobQuery->commandType == CMD_INSERT);

		/*
		 * We can only now decide which shard to use, so we need to build a new task
		 * list.
		 */
		if (jobQuery->commandType == CMD_INSERT)
		{
			RegenerateTaskListForInsert(workerJob);
		}
		else
		{
			RegenerateTaskForFasthPathQuery(workerJob);
		}
	}
	else if (workerJob->requiresCoordinatorEvaluation)
	{
		/*
		 * When there is no deferred pruning, but we did evaluate functions, then
		 * we only rebuild the query strings in the existing tasks.
		 */
		RebuildQueryStrings(workerJob);
	}


	/* we skip shard-related work if the job contains only local tables */
	if (!ModifyLocalTableJob(workerJob))
	{
		/*
		 * Now that we know the shard ID(s) we can acquire the necessary shard metadata
		 * locks. Once we have the locks it's safe to load the placement metadata.
		 */

		/* prevent concurrent placement changes */
		AcquireMetadataLocks(workerJob->taskList);

		/* modify tasks are always assigned using first-replica policy */
		workerJob->taskList = FirstReplicaAssignTaskList(workerJob->taskList);
	}


	/*
	 * Now that we have populated the task placements we can determine whether
	 * any of them are local to this node and cache a plan if needed.
	 */
	if (IsLocalPlanCachingSupported(workerJob, originalDistributedPlan))
	{
		Task *task = linitial(workerJob->taskList);

		/*
		 * We are going to execute this task locally. If it's not already in
		 * the cache, create a local plan now and add it to the cache. During
		 * execution, we will get the plan from the cache.
		 *
		 * WARNING: In this function we'll use the original plan with the original
		 * query tree, meaning parameters and function calls are back and we'll
		 * redo evaluation in the local (Postgres) executor. The reason we do this
		 * is that we only need to cache one generic plan per shard.
		 *
		 * The plan will be cached across executions when originalDistributedPlan
		 * represents a prepared statement.
		 */
		CacheLocalPlanForShardQuery(task, originalDistributedPlan,
									estate->es_param_list_info);
	}

	MemoryContextSwitchTo(oldContext);
}


/*
 * ModifyJobNeedsEvaluation checks whether the functions and parameters in the job query
 * need to be evaluated before we can build task query strings.
 */
static bool
ModifyJobNeedsEvaluation(Job *workerJob)
{
	if (workerJob->requiresCoordinatorEvaluation)
	{
		/* query contains functions that need to be evaluated on the coordinator */
		return true;
	}

	if (workerJob->partitionKeyValue != NULL)
	{
		/* the value of the distribution column is already known */
		return false;
	}

	/* pruning was deferred due to a parameter in the partition column */
	return workerJob->deferredPruning;
}
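
/*
 * For illustration (hypothetical statements, not taken from this file):
 *
 *   INSERT INTO dist_table VALUES (nextval('dist_seq'), 'x');
 *     -- requiresCoordinatorEvaluation: nextval() is evaluated on the coordinator
 *
 *   PREPARE p(int) AS DELETE FROM dist_table WHERE dist_col = $1;
 *     -- deferredPruning: the target shard is only known once $1 is bound
 *
 * Both cases make ModifyJobNeedsEvaluation return true.
 */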


/*
 * CopyDistributedPlanWithoutCache is a helper function which copies the
 * distributedPlan into the current memory context.
 *
 * We must not change the distributed plan since it may be reused across multiple
 * executions of a prepared statement. Instead we create a deep copy that we only
 * use for the current execution.
 *
 * We also exclude localPlannedStatements from the copyObject call for performance
 * reasons, as they are immutable, so no need to have a deep copy.
 */
static DistributedPlan *
CopyDistributedPlanWithoutCache(DistributedPlan *originalDistributedPlan)
{
	List *localPlannedStatements =
		originalDistributedPlan->workerJob->localPlannedStatements;
	originalDistributedPlan->workerJob->localPlannedStatements = NIL;

	DistributedPlan *distributedPlan = copyObject(originalDistributedPlan);

	/* set back the immutable field */
	originalDistributedPlan->workerJob->localPlannedStatements = localPlannedStatements;
	distributedPlan->workerJob->localPlannedStatements = localPlannedStatements;

	return distributedPlan;
}


/*
 * RegenerateTaskListForInsert does the shard pruning for an INSERT query
 * and rebuilds the query strings.
 */
static void
RegenerateTaskListForInsert(Job *workerJob)
{
	Query *jobQuery = workerJob->jobQuery;
	bool parametersInJobQueryResolved = workerJob->parametersInJobQueryResolved;
	DeferredErrorMessage *planningError = NULL;

	/* need to perform shard pruning, rebuild the task list from scratch */
	List *taskList = RouterInsertTaskList(jobQuery, parametersInJobQueryResolved,
										  &planningError);
	if (planningError != NULL)
	{
		RaiseDeferredError(planningError, ERROR);
	}

	workerJob->taskList = taskList;

	if (workerJob->partitionKeyValue == NULL)
	{
		/*
		 * If we were not able to determine the partition key value in the planner,
		 * take another shot now. It may still be NULL in case of a multi-row
		 * insert.
		 */
		workerJob->partitionKeyValue = ExtractInsertPartitionKeyValue(jobQuery);
	}

	RebuildQueryStrings(workerJob);
}


/*
 * RegenerateTaskForFasthPathQuery does the shard pruning for
 * UPDATE/DELETE/SELECT fast path router queries and rebuilds the query strings.
 */
static void
RegenerateTaskForFasthPathQuery(Job *workerJob)
{
	bool isMultiShardQuery = false;
	List *shardIntervalList =
		TargetShardIntervalForFastPathQuery(workerJob->jobQuery,
											&isMultiShardQuery, NULL,
											&workerJob->partitionKeyValue);

	/*
	 * A fast-path router query can only yield multiple shards when the parameter
	 * cannot be resolved properly, which can be triggered by a SQL function.
	 */
	if (isMultiShardQuery)
	{
		ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
						errmsg("cannot perform distributed planning on this "
							   "query because parameterized queries for SQL "
							   "functions referencing distributed tables are "
							   "not supported"),
						errhint("Consider using PL/pgSQL functions instead.")));
	}

	bool shardsPresent = false;
	List *relationShardList =
		RelationShardListForShardIntervalList(shardIntervalList, &shardsPresent);

	UpdateRelationToShardNames((Node *) workerJob->jobQuery, relationShardList);

	/* fast path queries cannot have local tables */
	bool hasLocalRelation = false;

	List *placementList =
		CreateTaskPlacementListForShardIntervals(shardIntervalList, shardsPresent, true,
												 hasLocalRelation);
	uint64 shardId = INVALID_SHARD_ID;

	if (shardsPresent)
	{
		shardId = GetAnchorShardId(shardIntervalList);
	}

	bool isLocalTableModification = false;
	GenerateSingleShardRouterTaskList(workerJob,
									  relationShardList,
									  placementList,
									  shardId,
									  isLocalTableModification);
}


/*
 * AdaptiveExecutorCreateScan creates the scan state for the adaptive executor.
 */
static Node *
AdaptiveExecutorCreateScan(CustomScan *scan)
{
	CitusScanState *scanState = palloc0(sizeof(CitusScanState));

	scanState->executorType = MULTI_EXECUTOR_ADAPTIVE;
	scanState->customScanState.ss.ps.type = T_CustomScanState;
	scanState->distributedPlan = GetDistributedPlan(scan);

	scanState->customScanState.methods = &AdaptiveExecutorCustomExecMethods;
	scanState->PreExecScan = &CitusPreExecScan;

	scanState->finishedPreScan = false;
	scanState->finishedRemoteScan = false;

	return (Node *) scanState;
}


/*
 * NonPushableInsertSelectCreateScan creates the scan state for executing
 * INSERT..SELECT into a distributed table via the coordinator.
 */
static Node *
NonPushableInsertSelectCreateScan(CustomScan *scan)
{
	CitusScanState *scanState = palloc0(sizeof(CitusScanState));

	scanState->executorType = MULTI_EXECUTOR_NON_PUSHABLE_INSERT_SELECT;
	scanState->customScanState.ss.ps.type = T_CustomScanState;
	scanState->distributedPlan = GetDistributedPlan(scan);

	scanState->customScanState.methods =
		&NonPushableInsertSelectCustomExecMethods;

	scanState->finishedPreScan = false;
	scanState->finishedRemoteScan = false;

	return (Node *) scanState;
}


/*
 * DelayedErrorCreateScan is only called if we could not plan for the given
 * query. This is the case when a plan is not ready for execution because
 * CreateDistributedPlan() couldn't find a plan due to unresolved prepared
 * statement parameters, but didn't error out, because we expect custom plans
 * to come to our rescue. But SQL (not PL/pgSQL) functions unfortunately don't
 * go through a code path supporting custom plans. Here, we error out with this
 * delayed error message.
 */
static Node *
DelayedErrorCreateScan(CustomScan *scan)
{
	DistributedPlan *distributedPlan = GetDistributedPlan(scan);

	/* raise the deferred error */
	RaiseDeferredError(distributedPlan->planningError, ERROR);

	return NULL;
}


/*
 * CitusEndScan is used to clean up the tuple store of the given custom scan state.
 */
static void
CitusEndScan(CustomScanState *node)
{
	CitusScanState *scanState = (CitusScanState *) node;
	Job *workerJob = scanState->distributedPlan->workerJob;
	uint64 queryId = scanState->distributedPlan->queryId;
	MultiExecutorType executorType = scanState->executorType;
	Const *partitionKeyConst = NULL;
	char *partitionKeyString = NULL;

	/* stop propagating notices */
	DisableWorkerMessagePropagation();

	/*
	 * Check whether we received warnings that should not have been
	 * ignored.
	 */
	ErrorIfWorkerErrorIndicationReceived();

	if (workerJob != NULL)
	{
		partitionKeyConst = workerJob->partitionKeyValue;
	}

	/* queryId is not set if pg_stat_statements is not installed */
	if (queryId != 0)
	{
		if (partitionKeyConst != NULL && executorType == MULTI_EXECUTOR_ADAPTIVE)
		{
			partitionKeyString = DatumToString(partitionKeyConst->constvalue,
											   partitionKeyConst->consttype);
		}

		/* queries without partition key are also recorded */
		CitusQueryStatsExecutorsEntry(queryId, executorType, partitionKeyString);
	}

	if (scanState->tuplestorestate)
	{
		tuplestore_end(scanState->tuplestorestate);
		scanState->tuplestorestate = NULL;
	}
}


/*
 * CitusReScan is not normally called, except in certain cases of
 * DECLARE .. CURSOR WITH HOLD ..
 */
static void
CitusReScan(CustomScanState *node)
{
	CitusScanState *scanState = (CitusScanState *) node;
	Job *workerJob = scanState->distributedPlan->workerJob;
	EState *executorState = ScanStateGetExecutorState(scanState);
	ParamListInfo paramListInfo = executorState->es_param_list_info;

	if (paramListInfo != NULL && !workerJob->parametersInJobQueryResolved)
	{
		ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
						errmsg("Cursors for queries on distributed tables with "
							   "parameters are currently unsupported")));
	}
}
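
/*
 * Illustrative example (hypothetical statements, not taken from this file):
 * a held cursor such as
 *
 *   BEGIN;
 *   DECLARE c CURSOR WITH HOLD FOR SELECT * FROM dist_table;
 *   COMMIT;
 *
 * is persisted at COMMIT; persisting the portal rewinds the executor, which
 * is how CitusReScan can end up being called.
 */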


/*
 * ScanStateGetTupleDescriptor returns the tuple descriptor for the given
 * scan state.
 */
TupleDesc
ScanStateGetTupleDescriptor(CitusScanState *scanState)
{
	return scanState->customScanState.ss.ss_ScanTupleSlot->tts_tupleDescriptor;
}


/*
 * ScanStateGetExecutorState returns the executor state for the given scan
 * state.
 */
EState *
ScanStateGetExecutorState(CitusScanState *scanState)
{
	return scanState->customScanState.ss.ps.state;
}


/*
 * FetchCitusCustomScanIfExists traverses a given plan and returns a Citus CustomScan
 * if it has any.
 */
CustomScan *
FetchCitusCustomScanIfExists(Plan *plan)
{
	if (plan == NULL)
	{
		return NULL;
	}

	if (IsCitusCustomScan(plan))
	{
		return (CustomScan *) plan;
	}

	CustomScan *customScan = FetchCitusCustomScanIfExists(plan->lefttree);

	if (customScan == NULL)
	{
		customScan = FetchCitusCustomScanIfExists(plan->righttree);
	}

	return customScan;
}


/*
 * IsCitusPlan returns whether a Plan contains a CustomScan generated by Citus
 * by recursively walking through the plan tree.
 */
bool
IsCitusPlan(Plan *plan)
{
	if (plan == NULL)
	{
		return false;
	}

	if (IsCitusCustomScan(plan))
	{
		return true;
	}

	return IsCitusPlan(plan->lefttree) || IsCitusPlan(plan->righttree);
}


/*
 * IsCitusCustomScan returns whether a Plan node is a CustomScan generated by Citus.
 */
bool
IsCitusCustomScan(Plan *plan)
{
	if (plan == NULL)
	{
		return false;
	}

	if (!IsA(plan, CustomScan))
	{
		return false;
	}

	CustomScan *customScan = (CustomScan *) plan;
	if (list_length(customScan->custom_private) == 0)
	{
		return false;
	}

	Node *privateNode = (Node *) linitial(customScan->custom_private);
	if (!CitusIsA(privateNode, DistributedPlan))
	{
		return false;
	}

	return true;
}