/*-------------------------------------------------------------------------
 *
 * citus_custom_scan.c
 *
 * Definitions of custom scan methods for all executor types.
 *
 * Copyright (c) Citus Data, Inc.
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include "distributed/pg_version_constants.h"

#include "miscadmin.h"

#include "commands/copy.h"
#include "distributed/backend_data.h"
#include "distributed/citus_clauses.h"
#include "distributed/citus_custom_scan.h"
#include "distributed/citus_nodefuncs.h"
#include "distributed/citus_ruleutils.h"
#include "distributed/connection_management.h"
#include "distributed/deparse_shard_query.h"
#include "distributed/distributed_execution_locks.h"
#include "distributed/insert_select_executor.h"
#include "distributed/insert_select_planner.h"
#include "distributed/listutils.h"
#include "distributed/local_executor.h"
#include "distributed/local_plan_cache.h"
#include "distributed/multi_executor.h"
#include "distributed/multi_server_executor.h"
#include "distributed/multi_router_planner.h"
#include "distributed/query_stats.h"
#include "distributed/subplan_execution.h"
#include "distributed/worker_log_messages.h"
#include "distributed/worker_protocol.h"
#include "executor/executor.h"
#include "nodes/makefuncs.h"
#include "optimizer/optimizer.h"
#include "optimizer/clauses.h"
#include "utils/memutils.h"
#include "utils/rel.h"


/* functions for creating custom scan nodes */
static Node * AdaptiveExecutorCreateScan(CustomScan *scan);
static Node * NonPushableInsertSelectCreateScan(CustomScan *scan);
static Node * DelayedErrorCreateScan(CustomScan *scan);

/* functions that are common to different scans */
static void CitusBeginScan(CustomScanState *node, EState *estate, int eflags);
static void CitusBeginReadOnlyScan(CustomScanState *node, EState *estate, int eflags);
static void CitusBeginModifyScan(CustomScanState *node, EState *estate, int eflags);
static void CitusPreExecScan(CitusScanState *scanState);
static bool ModifyJobNeedsEvaluation(Job *workerJob);
static void RegenerateTaskForFastPathQuery(Job *workerJob);
static void RegenerateTaskListForInsert(Job *workerJob);
static DistributedPlan * CopyDistributedPlanWithoutCache(
	DistributedPlan *originalDistributedPlan);
static void CitusEndScan(CustomScanState *node);
static void CitusReScan(CustomScanState *node);


/* create custom scan methods for all executors */
CustomScanMethods AdaptiveExecutorCustomScanMethods = {
	"Citus Adaptive",
	AdaptiveExecutorCreateScan
};

CustomScanMethods NonPushableInsertSelectCustomScanMethods = {
	"Citus INSERT ... SELECT",
	NonPushableInsertSelectCreateScan
};

CustomScanMethods DelayedErrorCustomScanMethods = {
	"Citus Delayed Error",
	DelayedErrorCreateScan
};


/*
 * Define executor methods for the different executor types.
 */
static CustomExecMethods AdaptiveExecutorCustomExecMethods = {
	.CustomName = "AdaptiveExecutorScan",
	.BeginCustomScan = CitusBeginScan,
	.ExecCustomScan = CitusExecScan,
	.EndCustomScan = CitusEndScan,
	.ReScanCustomScan = CitusReScan,
	.ExplainCustomScan = CitusExplainScan
};

static CustomExecMethods NonPushableInsertSelectCustomExecMethods = {
	.CustomName = "NonPushableInsertSelectScan",
	.BeginCustomScan = CitusBeginScan,
	.ExecCustomScan = NonPushableInsertSelectExecScan,
	.EndCustomScan = CitusEndScan,
	.ReScanCustomScan = CitusReScan,
	.ExplainCustomScan = NonPushableInsertSelectExplainScan
};


/*
 * IsCitusCustomState returns whether a given PlanState node is a
 * CitusCustomState node.
 */
bool
IsCitusCustomState(PlanState *planState)
{
	if (!IsA(planState, CustomScanState))
	{
		return false;
	}

	CustomScanState *css = castNode(CustomScanState, planState);
	if (css->methods == &AdaptiveExecutorCustomExecMethods ||
		css->methods == &NonPushableInsertSelectCustomExecMethods)
	{
		return true;
	}

	return false;
}


/*
 * Let PostgreSQL know about Citus' custom scan nodes.
 */
void
RegisterCitusCustomScanMethods(void)
{
	RegisterCustomScanMethods(&AdaptiveExecutorCustomScanMethods);
	RegisterCustomScanMethods(&NonPushableInsertSelectCustomScanMethods);
	RegisterCustomScanMethods(&DelayedErrorCustomScanMethods);
}
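

/*
 * Illustrative sketch only: the actual call site for the registration
 * function above lives in Citus' shared library initialization, not in
 * this file. Registration must happen once at backend startup so that
 * plans referencing these custom scan providers by name can be resolved
 * again. Roughly (hypothetical placement):
 */
#if 0
void
_PG_init(void)
{
	/* ... other extension initialization ... */
	RegisterCitusCustomScanMethods();
}
#endif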


/*
 * CitusBeginScan marks the current backend as a Citus-initiated coordinator
 * backend for all queries that use this function as their BeginCustomScan
 * callback.
 *
 * The function also handles deferred shard pruning and function evaluation.
 */
static void
CitusBeginScan(CustomScanState *node, EState *estate, int eflags)
{
	MarkCitusInitiatedCoordinatorBackend();

	CitusScanState *scanState = (CitusScanState *) node;

	/*
	 * Make sure we can see notices during regular queries, which would
	 * typically be the result of a call to a function that raises a notice.
	 */
	EnableWorkerMessagePropagation();


	/*
	 * Since we are using a tuplestore we cannot use the virtual tuples postgres
	 * had already set up on the CustomScan. Instead we need to reinitialize the
	 * tuples as minimal.
	 *
	 * During initialization postgres also created the projection information
	 * and the quals, but both are 'compiled' to be executed on virtual tuples.
	 * Since we replaced the tuples with minimal tuples we also recompile both
	 * the projection and the quals for these 'new' tuples.
	 */
	ExecInitResultSlot(&scanState->customScanState.ss.ps, &TTSOpsMinimalTuple);

	ExecInitScanTupleSlot(node->ss.ps.state, &node->ss, node->ss.ps.scandesc,
						  &TTSOpsMinimalTuple);
	ExecAssignScanProjectionInfoWithVarno(&node->ss, INDEX_VAR);

	node->ss.ps.qual = ExecInitQual(node->ss.ps.plan->qual, (PlanState *) node);

	DistributedPlan *distributedPlan = scanState->distributedPlan;
	if (distributedPlan->insertSelectQuery != NULL)
	{
		/*
		 * INSERT..SELECT via coordinator or re-partitioning is special because
		 * the SELECT part is planned separately.
		 */
		return;
	}
	else if (distributedPlan->modLevel == ROW_MODIFY_READONLY)
	{
		CitusBeginReadOnlyScan(node, estate, eflags);
	}
	else
	{
		CitusBeginModifyScan(node, estate, eflags);
	}

	/*
	 * In case of a prepared statement, we will see this distributed plan again
	 * on the next execution with a higher usage counter.
	 */
	distributedPlan->numberOfTimesExecuted++;
}


/*
 * CitusPreExecScan is called right before postgres' executor starts pulling
 * tuples.
 */
static void
CitusPreExecScan(CitusScanState *scanState)
{
	AdaptiveExecutorPreExecutorRun(scanState);
}


/*
 * CitusExecScan is called when a tuple is pulled from a custom scan.
 * On the first call, it executes the distributed query and writes the
 * results to a tuple store. The postgres executor calls this function
 * repeatedly to read tuples from the tuple store.
 */
TupleTableSlot *
CitusExecScan(CustomScanState *node)
{
	CitusScanState *scanState = (CitusScanState *) node;

	if (!scanState->finishedRemoteScan)
	{
		AdaptiveExecutor(scanState);

		scanState->finishedRemoteScan = true;
	}

	return ReturnTupleFromTuplestore(scanState);
}
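

/*
 * A minimal sketch of how the postgres executor drives CitusExecScan
 * (conceptual only; the real driver is ExecProcNode): the first call
 * materializes all results into the tuplestore, and every subsequent
 * call streams one tuple out until the returned slot is empty.
 */
#if 0
for (;;)
{
	TupleTableSlot *slot = CitusExecScan(node);
	if (TupIsNull(slot))
	{
		break;              /* tuplestore exhausted */
	}
	/* ... hand the tuple to the parent plan node ... */
}
#endif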


/*
 * CitusBeginReadOnlyScan handles deferred pruning and plan caching for SELECTs.
 */
static void
CitusBeginReadOnlyScan(CustomScanState *node, EState *estate, int eflags)
{
	CitusScanState *scanState = (CitusScanState *) node;
	DistributedPlan *originalDistributedPlan = scanState->distributedPlan;

	Assert(originalDistributedPlan->workerJob->jobQuery->commandType == CMD_SELECT);

	if (!originalDistributedPlan->workerJob->deferredPruning)
	{
		/*
		 * For SELECT queries that have already been pruned we can proceed
		 * straight to execution, since none of the prepared statement logic
		 * applies.
		 */
		return;
	}

	/*
	 * Create a copy of the generic plan for the current execution, but make a
	 * shallow copy of the plan cache. That means we'll be able to access the
	 * plan cache via currentPlan->workerJob->localPlannedStatements, but it
	 * will be preserved across executions by the prepared statement logic.
	 */
	DistributedPlan *currentPlan =
		CopyDistributedPlanWithoutCache(originalDistributedPlan);
	scanState->distributedPlan = currentPlan;

	Job *workerJob = currentPlan->workerJob;
	Query *jobQuery = workerJob->jobQuery;
	PlanState *planState = &(scanState->customScanState.ss.ps);

	/*
	 * We only do deferred pruning for fast path queries, which have a single
	 * partition column value.
	 */
	Assert(currentPlan->fastPathRouterPlan || !EnableFastPathRouterPlanner);

	/*
	 * Evaluate parameters, because the parameters are only available on the
	 * coordinator and are required for pruning.
	 *
	 * We don't evaluate functions for read-only queries on the coordinator
	 * at the moment. Most function calls would be in a context where they
	 * should be re-evaluated for every row in case of volatile functions.
	 *
	 * TODO: evaluate stable functions
	 */
	ExecuteCoordinatorEvaluableExpressions(jobQuery, planState);

	/* job query no longer has parameters, so we should not send any */
	workerJob->parametersInJobQueryResolved = true;

	/* parameters are filled in, so we can generate a task for this execution */
	RegenerateTaskForFastPathQuery(workerJob);

	if (IsLocalPlanCachingSupported(workerJob, originalDistributedPlan))
	{
		Task *task = linitial(workerJob->taskList);

		/*
		 * We are going to execute this task locally. If it's not already in
		 * the cache, create a local plan now and add it to the cache. During
		 * execution, we will get the plan from the cache.
		 *
		 * The plan will be cached across executions when originalDistributedPlan
		 * represents a prepared statement.
		 */
		CacheLocalPlanForShardQuery(task, originalDistributedPlan,
									estate->es_param_list_info);
	}
}
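

/*
 * Example of a query that takes the deferred-pruning path above
 * (illustrative; the table and statement names are made up, and order_id
 * is assumed to be the distribution column):
 *
 *   PREPARE fast_select(int) AS
 *       SELECT * FROM orders WHERE order_id = $1;
 *   EXECUTE fast_select(42);
 *
 * At planning time $1 is unknown, so the target shard cannot be chosen.
 * Once postgres switches to a generic plan, CitusBeginReadOnlyScan
 * evaluates $1, prunes to a single shard, and regenerates the task.
 */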


/*
 * CitusBeginModifyScan prepares the scan state for a modification.
 *
 * Modifications are special because:
 * a) we evaluate function calls (e.g. nextval) here and the outcome may
 *    determine which shards are affected by this query.
 * b) we need to take metadata locks to make sure no write is left behind
 *    when finalizing a shard move.
 */
static void
CitusBeginModifyScan(CustomScanState *node, EState *estate, int eflags)
{
	CitusScanState *scanState = (CitusScanState *) node;
	PlanState *planState = &(scanState->customScanState.ss.ps);
	DistributedPlan *originalDistributedPlan = scanState->distributedPlan;

	MemoryContext localContext = AllocSetContextCreate(CurrentMemoryContext,
													   "CitusBeginModifyScan",
													   ALLOCSET_DEFAULT_SIZES);
	MemoryContext oldContext = MemoryContextSwitchTo(localContext);

	DistributedPlan *currentPlan =
		CopyDistributedPlanWithoutCache(originalDistributedPlan);
	scanState->distributedPlan = currentPlan;

	Job *workerJob = currentPlan->workerJob;

	Query *jobQuery = workerJob->jobQuery;

	if (ModifyJobNeedsEvaluation(workerJob))
	{
		ExecuteCoordinatorEvaluableExpressions(jobQuery, planState);

		/* job query no longer has parameters, so we should not send any */
		workerJob->parametersInJobQueryResolved = true;
	}

	if (workerJob->deferredPruning)
	{
		/*
		 * At this point, we're about to do the shard pruning for fast-path
		 * queries. Since pruning is always deferred for INSERTs, we can also
		 * get here with !EnableFastPathRouterPlanner. And since INSERT
		 * statements with CTEs/sublinks etc. are not eligible for the
		 * fast-path router plan, we can also get here with
		 * jobQuery->commandType == CMD_INSERT.
		 */
		Assert(currentPlan->fastPathRouterPlan || !EnableFastPathRouterPlanner ||
			   jobQuery->commandType == CMD_INSERT);

		/*
		 * We can only now decide which shard to use, so we need to build a new
		 * task list.
		 */
		if (jobQuery->commandType == CMD_INSERT)
		{
			RegenerateTaskListForInsert(workerJob);
		}
		else
		{
			RegenerateTaskForFastPathQuery(workerJob);
		}
	}
	else if (workerJob->requiresCoordinatorEvaluation)
	{
		/*
		 * When there is no deferred pruning, but we did evaluate functions,
		 * then we only rebuild the query strings in the existing tasks.
		 */
		RebuildQueryStrings(workerJob);
	}


	/* we skip shard-related work if the job contains only local tables */
	if (!ModifyLocalTableJob(workerJob))
	{
		/*
		 * Now that we know the shard ID(s) we can acquire the necessary shard
		 * metadata locks. Once we have the locks it's safe to load the
		 * placement metadata.
		 */

		/* prevent concurrent placement changes */
		AcquireMetadataLocks(workerJob->taskList);

		/* modify tasks are always assigned using first-replica policy */
		workerJob->taskList = FirstReplicaAssignTaskList(workerJob->taskList);
	}


	/*
	 * Now that we have populated the task placements we can determine whether
	 * any of them are local to this node and cache a plan if needed.
	 */
	if (IsLocalPlanCachingSupported(workerJob, originalDistributedPlan))
	{
		Task *task = linitial(workerJob->taskList);

		/*
		 * We are going to execute this task locally. If it's not already in
		 * the cache, create a local plan now and add it to the cache. During
		 * execution, we will get the plan from the cache.
		 *
		 * WARNING: In this function we'll use the original plan with the
		 * original query tree, meaning parameters and function calls are back
		 * and we'll redo evaluation in the local (Postgres) executor. The
		 * reason we do this is that we only need to cache one generic plan per
		 * shard.
		 *
		 * The plan will be cached across executions when originalDistributedPlan
		 * represents a prepared statement.
		 */
		CacheLocalPlanForShardQuery(task, originalDistributedPlan,
									estate->es_param_list_info);
	}

	MemoryContextSwitchTo(oldContext);
}


/*
 * ModifyJobNeedsEvaluation checks whether the functions and parameters in the
 * job query need to be evaluated before we can build task query strings.
 */
static bool
ModifyJobNeedsEvaluation(Job *workerJob)
{
	if (workerJob->requiresCoordinatorEvaluation)
	{
		/* query contains functions that need to be evaluated on the coordinator */
		return true;
	}

	if (workerJob->partitionKeyValue != NULL)
	{
		/* the value of the distribution column is already known */
		return false;
	}

	/* pruning was deferred due to a parameter in the partition column */
	return workerJob->deferredPruning;
}
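

/*
 * Illustrative examples for each branch above (assuming a distributed
 * table 'orders' whose distribution column is 'order_id', and a
 * hypothetical sequence 'order_seq'):
 *
 *   INSERT INTO orders (order_id) VALUES (nextval('order_seq'));
 *       requiresCoordinatorEvaluation: nextval() must run on the
 *       coordinator before we know which shard is affected.
 *
 *   UPDATE orders SET total = 0 WHERE order_id = 42;
 *       partitionKeyValue is already known, so no evaluation is needed.
 *
 *   UPDATE orders SET total = 0 WHERE order_id = $1;
 *       pruning was deferred because of the parameter.
 */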


/*
 * CopyDistributedPlanWithoutCache is a helper function that copies the
 * distributed plan into the current memory context.
 *
 * We must not change the distributed plan since it may be reused across
 * multiple executions of a prepared statement. Instead we create a deep copy
 * that we only use for the current execution.
 *
 * For performance reasons we also exclude localPlannedStatements from the
 * copyObject call: they are immutable and therefore do not need a deep copy.
 */
static DistributedPlan *
CopyDistributedPlanWithoutCache(DistributedPlan *originalDistributedPlan)
{
	List *localPlannedStatements =
		originalDistributedPlan->workerJob->localPlannedStatements;
	originalDistributedPlan->workerJob->localPlannedStatements = NIL;

	DistributedPlan *distributedPlan = copyObject(originalDistributedPlan);

	/* set back the immutable field */
	originalDistributedPlan->workerJob->localPlannedStatements = localPlannedStatements;
	distributedPlan->workerJob->localPlannedStatements = localPlannedStatements;

	return distributedPlan;
}


/*
 * RegenerateTaskListForInsert does the shard pruning for an INSERT query and
 * rebuilds the query strings.
 */
static void
RegenerateTaskListForInsert(Job *workerJob)
{
	Query *jobQuery = workerJob->jobQuery;
	bool parametersInJobQueryResolved = workerJob->parametersInJobQueryResolved;
	DeferredErrorMessage *planningError = NULL;

	/* need to perform shard pruning, rebuild the task list from scratch */
	List *taskList = RouterInsertTaskList(jobQuery, parametersInJobQueryResolved,
										  &planningError);
	if (planningError != NULL)
	{
		RaiseDeferredError(planningError, ERROR);
	}

	workerJob->taskList = taskList;

	if (workerJob->partitionKeyValue == NULL)
	{
		/*
		 * If we were not able to determine the partition key value in the
		 * planner, take another shot now. It may still be NULL in case of a
		 * multi-row insert.
		 */
		workerJob->partitionKeyValue = ExtractInsertPartitionKeyValue(jobQuery);
	}

	RebuildQueryStrings(workerJob);
}
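

/*
 * Illustrative example of the multi-row case mentioned above (hypothetical
 * table 'orders' distributed by 'order_id'):
 *
 *   INSERT INTO orders (order_id) VALUES (1), (2);
 *
 * may target several shards, in which case there is no single distribution
 * column value and partitionKeyValue remains NULL.
 */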


/*
 * RegenerateTaskForFastPathQuery does the shard pruning for
 * UPDATE/DELETE/SELECT fast path router queries and rebuilds the query
 * strings.
 */
static void
RegenerateTaskForFastPathQuery(Job *workerJob)
{
	bool isMultiShardQuery = false;
	List *shardIntervalList =
		TargetShardIntervalForFastPathQuery(workerJob->jobQuery,
											&isMultiShardQuery, NULL,
											&workerJob->partitionKeyValue);

	/*
	 * A fast-path router query can only yield multiple shards when the
	 * parameter cannot be resolved properly, which can be triggered by a SQL
	 * function.
	 */
	if (isMultiShardQuery)
	{
		ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
						errmsg("cannot perform distributed planning on this "
							   "query because parameterized queries for SQL "
							   "functions referencing distributed tables are "
							   "not supported"),
						errhint("Consider using PL/pgSQL functions instead.")));
	}

	bool shardsPresent = false;
	List *relationShardList =
		RelationShardListForShardIntervalList(shardIntervalList, &shardsPresent);

	UpdateRelationToShardNames((Node *) workerJob->jobQuery, relationShardList);

	/* fast path queries cannot have local tables */
	bool hasLocalRelation = false;

	List *placementList =
		CreateTaskPlacementListForShardIntervals(shardIntervalList, shardsPresent, true,
												 hasLocalRelation);
	uint64 shardId = INVALID_SHARD_ID;

	if (shardsPresent)
	{
		shardId = GetAnchorShardId(shardIntervalList);
	}

	bool isLocalTableModification = false;
	GenerateSingleShardRouterTaskList(workerJob,
									  relationShardList,
									  placementList,
									  shardId,
									  isLocalTableModification);
}


/*
 * AdaptiveExecutorCreateScan creates the scan state for the adaptive executor.
 */
static Node *
AdaptiveExecutorCreateScan(CustomScan *scan)
{
	CitusScanState *scanState = palloc0(sizeof(CitusScanState));

	scanState->executorType = MULTI_EXECUTOR_ADAPTIVE;
	scanState->customScanState.ss.ps.type = T_CustomScanState;
	scanState->distributedPlan = GetDistributedPlan(scan);

	scanState->customScanState.methods = &AdaptiveExecutorCustomExecMethods;
	scanState->PreExecScan = &CitusPreExecScan;

	scanState->finishedPreScan = false;
	scanState->finishedRemoteScan = false;

	return (Node *) scanState;
}


/*
 * NonPushableInsertSelectCreateScan creates the scan state for executing
 * INSERT..SELECT into a distributed table via the coordinator.
 */
static Node *
NonPushableInsertSelectCreateScan(CustomScan *scan)
{
	CitusScanState *scanState = palloc0(sizeof(CitusScanState));

	scanState->executorType = MULTI_EXECUTOR_NON_PUSHABLE_INSERT_SELECT;
	scanState->customScanState.ss.ps.type = T_CustomScanState;
	scanState->distributedPlan = GetDistributedPlan(scan);

	scanState->customScanState.methods =
		&NonPushableInsertSelectCustomExecMethods;

	scanState->finishedPreScan = false;
	scanState->finishedRemoteScan = false;

	return (Node *) scanState;
}


/*
 * DelayedErrorCreateScan is only called if we could not plan for the given
 * query. This is the case when a plan is not ready for execution because
 * CreateDistributedPlan() couldn't find a plan due to unresolved prepared
 * statement parameters, but didn't error out, because we expect custom plans
 * to come to our rescue. However, SQL (not PL/pgSQL) functions unfortunately
 * don't go through a code path that supports custom plans. Here, we error out
 * with this delayed error message.
 */
static Node *
DelayedErrorCreateScan(CustomScan *scan)
{
	DistributedPlan *distributedPlan = GetDistributedPlan(scan);

	/* raise the deferred error */
	RaiseDeferredError(distributedPlan->planningError, ERROR);

	return NULL;
}
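

/*
 * Illustrative example (hypothetical names) of a statement that ends up in
 * DelayedErrorCreateScan: a SQL function body parameterizes the distribution
 * column, and SQL functions never fall back to custom plans, so the
 * parameter is never resolved at plan time.
 *
 *   CREATE FUNCTION order_total(int) RETURNS numeric LANGUAGE sql AS $$
 *       SELECT total FROM orders WHERE order_id = $1;
 *   $$;
 *   SELECT order_total(42);    -- raises the deferred error
 */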


/*
 * CitusEndScan is used to clean up the tuple store of the given custom scan
 * state.
 */
static void
CitusEndScan(CustomScanState *node)
{
	CitusScanState *scanState = (CitusScanState *) node;
	Job *workerJob = scanState->distributedPlan->workerJob;
	uint64 queryId = scanState->distributedPlan->queryId;
	MultiExecutorType executorType = scanState->executorType;
	Const *partitionKeyConst = NULL;
	char *partitionKeyString = NULL;

	/* stop propagating notices */
	DisableWorkerMessagePropagation();

	/*
	 * Check whether we received warnings that should not have been
	 * ignored.
	 */
	ErrorIfWorkerErrorIndicationReceived();

	if (workerJob != NULL)
	{
		partitionKeyConst = workerJob->partitionKeyValue;
	}

	/* queryId is not set if pg_stat_statements is not installed */
	if (queryId != 0)
	{
		if (partitionKeyConst != NULL && executorType == MULTI_EXECUTOR_ADAPTIVE)
		{
			partitionKeyString = DatumToString(partitionKeyConst->constvalue,
											   partitionKeyConst->consttype);
		}

		/* queries without partition key are also recorded */
		CitusQueryStatsExecutorsEntry(queryId, executorType, partitionKeyString);
	}

	if (scanState->tuplestorestate)
	{
		tuplestore_end(scanState->tuplestorestate);
		scanState->tuplestorestate = NULL;
	}
}


/*
 * CitusReScan is not normally called, except in certain cases of
 * DECLARE .. CURSOR WITH HOLD ..
 */
static void
CitusReScan(CustomScanState *node)
{
	CitusScanState *scanState = (CitusScanState *) node;
	Job *workerJob = scanState->distributedPlan->workerJob;
	EState *executorState = ScanStateGetExecutorState(scanState);
	ParamListInfo paramListInfo = executorState->es_param_list_info;

	if (paramListInfo != NULL && !workerJob->parametersInJobQueryResolved)
	{
		ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
						errmsg("Cursors for queries on distributed tables with "
							   "parameters are currently unsupported")));
	}
}
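

/*
 * Illustrative example (hypothetical names) of a statement that can reach
 * CitusReScan:
 *
 *   BEGIN;
 *   DECLARE order_cursor CURSOR WITH HOLD FOR SELECT * FROM orders;
 *   COMMIT;
 *
 * At COMMIT, postgres materializes the holdable portal by rewinding and
 * re-running the plan, which invokes the ReScanCustomScan callback.
 */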


/*
 * ScanStateGetTupleDescriptor returns the tuple descriptor for the given
 * scan state.
 */
TupleDesc
ScanStateGetTupleDescriptor(CitusScanState *scanState)
{
	return scanState->customScanState.ss.ss_ScanTupleSlot->tts_tupleDescriptor;
}


/*
 * ScanStateGetExecutorState returns the executor state for the given scan
 * state.
 */
EState *
ScanStateGetExecutorState(CitusScanState *scanState)
{
	return scanState->customScanState.ss.ps.state;
}


/*
 * FetchCitusCustomScanIfExists traverses a given plan and returns a Citus
 * CustomScan if it contains one.
 */
CustomScan *
FetchCitusCustomScanIfExists(Plan *plan)
{
	if (plan == NULL)
	{
		return NULL;
	}

	if (IsCitusCustomScan(plan))
	{
		return (CustomScan *) plan;
	}

	CustomScan *customScan = FetchCitusCustomScanIfExists(plan->lefttree);

	if (customScan == NULL)
	{
		customScan = FetchCitusCustomScanIfExists(plan->righttree);
	}

	return customScan;
}


/*
 * IsCitusPlan returns whether a Plan contains a CustomScan generated by Citus
 * by recursively walking through the plan tree.
 */
bool
IsCitusPlan(Plan *plan)
{
	if (plan == NULL)
	{
		return false;
	}

	if (IsCitusCustomScan(plan))
	{
		return true;
	}

	return IsCitusPlan(plan->lefttree) || IsCitusPlan(plan->righttree);
}


/*
 * IsCitusCustomScan returns whether a Plan node is a CustomScan generated by
 * Citus.
 */
bool
IsCitusCustomScan(Plan *plan)
{
	if (plan == NULL)
	{
		return false;
	}

	if (!IsA(plan, CustomScan))
	{
		return false;
	}

	CustomScan *customScan = (CustomScan *) plan;
	if (list_length(customScan->custom_private) == 0)
	{
		return false;
	}

	Node *privateNode = (Node *) linitial(customScan->custom_private);
	if (!CitusIsA(privateNode, DistributedPlan))
	{
		return false;
	}

	return true;
}