1 /*-------------------------------------------------------------------------
2  * utility_hook.c
3  *	  Citus utility hook and related functionality.
4  *
5  * The utility hook is called by PostgreSQL when processing any command
6  * that is not SELECT, UPDATE, DELETE, INSERT, in place of the regular
7  * PostprocessUtility function. We use this primarily to implement (or in
8  * some cases prevent) DDL commands and COPY on distributed tables.
9  *
10  * For DDL commands that affect distributed tables, we check whether
11  * they are valid (and implemented) for the distributed table and then
12  * propagate the command to all shards and, in case of MX, to distributed
13  * tables on other nodes. We still call the original ProcessUtility
14  * function to apply catalog changes on the coordinator.
15  *
16  * For COPY into a distributed table, we provide an alternative
17  * implementation in ProcessCopyStmt that sends rows to shards based
18  * on their distribution column value instead of writing it to the local
19  * table on the coordinator. For COPY from a distributed table, we
20  * replace the table with a SELECT * FROM table and pass it back to
21  * PostprocessUtility, which will plan the query via the distributed planner
22  * hook.
23  *
24  * Copyright (c) Citus Data, Inc.
25  *-------------------------------------------------------------------------
26  */
27 
28 #include "distributed/pg_version_constants.h"
29 
30 #include "postgres.h"
31 #include "miscadmin.h"
32 
33 #include "access/attnum.h"
34 #include "access/heapam.h"
35 #include "access/htup_details.h"
36 #include "access/xact.h"
37 #include "catalog/catalog.h"
38 #include "catalog/dependency.h"
39 #include "commands/dbcommands.h"
40 #include "commands/defrem.h"
41 #include "commands/tablecmds.h"
42 #include "distributed/adaptive_executor.h"
43 #include "distributed/colocation_utils.h"
44 #include "distributed/commands.h"
45 #include "distributed/commands/multi_copy.h"
46 #include "distributed/commands/utility_hook.h" /* IWYU pragma: keep */
47 #include "distributed/deparser.h"
48 #include "distributed/deparse_shard_query.h"
49 #include "distributed/foreign_key_relationship.h"
50 #include "distributed/listutils.h"
51 #include "distributed/local_executor.h"
52 #include "distributed/maintenanced.h"
53 #include "distributed/coordinator_protocol.h"
54 #include "distributed/metadata_cache.h"
55 #include "distributed/metadata_sync.h"
56 #include "distributed/multi_executor.h"
57 #include "distributed/multi_explain.h"
58 #include "distributed/multi_physical_planner.h"
59 #include "distributed/reference_table_utils.h"
60 #include "distributed/resource_lock.h"
61 #include "distributed/transmit.h"
62 #include "distributed/version_compat.h"
63 #include "distributed/worker_transaction.h"
64 #include "lib/stringinfo.h"
65 #include "nodes/parsenodes.h"
66 #include "nodes/pg_list.h"
67 #include "tcop/utility.h"
68 #include "utils/builtins.h"
69 #include "utils/lsyscache.h"
70 #include "utils/syscache.h"
71 
72 bool EnableDDLPropagation = true; /* ddl propagation is enabled */
73 PropSetCmdBehavior PropagateSetCommands = PROPSETCMD_NONE; /* SET prop off */
74 static bool shouldInvalidateForeignKeyGraph = false;
75 static int activeAlterTables = 0;
76 static int activeDropSchemaOrDBs = 0;
77 static bool ConstraintDropped = false;
78 
79 
80 int UtilityHookLevel = 0;
81 
82 
83 /* Local functions forward declarations for helper functions */
84 static void ProcessUtilityInternal(PlannedStmt *pstmt,
85 								   const char *queryString,
86 								   ProcessUtilityContext context,
87 								   ParamListInfo params,
88 								   struct QueryEnvironment *queryEnv,
89 								   DestReceiver *dest,
90 								   QueryCompletionCompat *completionTag);
91 static char * SetSearchPathToCurrentSearchPathCommand(void);
92 static char * CurrentSearchPath(void);
93 static void IncrementUtilityHookCountersIfNecessary(Node *parsetree);
94 static void PostStandardProcessUtility(Node *parsetree);
95 static void DecrementUtilityHookCountersIfNecessary(Node *parsetree);
96 static bool IsDropSchemaOrDB(Node *parsetree);
97 static bool ShouldUndistributeCitusLocalTables(void);
98 
99 
100 /*
101  * ProcessUtilityForParseTree is a convenience method to create a PlannedStmt out of
102  * pieces of a utility statement before invoking ProcessUtility.
103  */
104 void
ProcessUtilityParseTree(Node * node,const char * queryString,ProcessUtilityContext context,ParamListInfo params,DestReceiver * dest,QueryCompletionCompat * completionTag)105 ProcessUtilityParseTree(Node *node, const char *queryString, ProcessUtilityContext
106 						context,
107 						ParamListInfo params, DestReceiver *dest,
108 						QueryCompletionCompat *completionTag)
109 {
110 	PlannedStmt *plannedStmt = makeNode(PlannedStmt);
111 	plannedStmt->commandType = CMD_UTILITY;
112 	plannedStmt->utilityStmt = node;
113 
114 	ProcessUtility_compat(plannedStmt, queryString, false, context, params, NULL, dest,
115 						  completionTag);
116 }
117 
118 
119 /*
120  * multi_ProcessUtility is the main entry hook for implementing Citus-specific
121  * utility behavior. Its primary responsibilities are intercepting COPY and DDL
122  * commands and augmenting the coordinator's command with corresponding tasks
123  * to be run on worker nodes, after suitably ensuring said commands' options
124  * are fully supported by Citus. Much of the DDL behavior is toggled by Citus'
125  * enable_ddl_propagation GUC. In addition to DDL and COPY, utilities such as
126  * TRUNCATE and VACUUM are also supported.
127  */
128 void
multi_ProcessUtility(PlannedStmt * pstmt,const char * queryString,bool readOnlyTree,ProcessUtilityContext context,ParamListInfo params,struct QueryEnvironment * queryEnv,DestReceiver * dest,QueryCompletionCompat * completionTag)129 multi_ProcessUtility(PlannedStmt *pstmt,
130 					 const char *queryString,
131 #if PG_VERSION_NUM >= PG_VERSION_14
132 					 bool readOnlyTree,
133 #endif
134 					 ProcessUtilityContext context,
135 					 ParamListInfo params,
136 					 struct QueryEnvironment *queryEnv,
137 					 DestReceiver *dest,
138 					 QueryCompletionCompat *completionTag)
139 {
140 	Node *parsetree;
141 
142 #if PG_VERSION_NUM >= PG_VERSION_14
143 	if (readOnlyTree)
144 	{
145 		pstmt = copyObject(pstmt);
146 	}
147 #endif
148 
149 	parsetree = pstmt->utilityStmt;
150 
151 	if (IsA(parsetree, TransactionStmt) ||
152 		IsA(parsetree, LockStmt) ||
153 		IsA(parsetree, ListenStmt) ||
154 		IsA(parsetree, NotifyStmt) ||
155 		IsA(parsetree, ExecuteStmt) ||
156 		IsA(parsetree, PrepareStmt) ||
157 		IsA(parsetree, DiscardStmt) ||
158 		IsA(parsetree, DeallocateStmt))
159 	{
160 		/*
161 		 * Skip additional checks for common commands that do not have any
162 		 * Citus-specific logic.
163 		 *
164 		 * Transaction statements (e.g. ABORT, COMMIT) can be run in aborted
165 		 * transactions in which case a lot of checks cannot be done safely in
166 		 * that state. Since we never need to intercept transaction statements,
167 		 * skip our checks and immediately fall into standard_ProcessUtility.
168 		 */
169 		standard_ProcessUtility_compat(pstmt, queryString, false, context,
170 									   params, queryEnv, dest, completionTag);
171 
172 		return;
173 	}
174 
175 	bool isCreateAlterExtensionUpdateCitusStmt = IsCreateAlterExtensionUpdateCitusStmt(
176 		parsetree);
177 	if (EnableVersionChecks && isCreateAlterExtensionUpdateCitusStmt)
178 	{
179 		ErrorIfUnstableCreateOrAlterExtensionStmt(parsetree);
180 	}
181 
182 	if (!CitusHasBeenLoaded())
183 	{
184 		/*
185 		 * Ensure that utility commands do not behave any differently until CREATE
186 		 * EXTENSION is invoked.
187 		 */
188 		standard_ProcessUtility_compat(pstmt, queryString, false, context,
189 									   params, queryEnv, dest, completionTag);
190 
191 		return;
192 	}
193 	else if (IsA(parsetree, CallStmt))
194 	{
195 		CallStmt *callStmt = (CallStmt *) parsetree;
196 
197 		/*
198 		 * If the procedure is distributed and we are using MX then we have the
199 		 * possibility of calling it on the worker. If the data is located on
200 		 * the worker this can avoid making many network round trips.
201 		 */
202 		if (context == PROCESS_UTILITY_TOPLEVEL &&
203 			CallDistributedProcedureRemotely(callStmt, dest))
204 		{
205 			return;
206 		}
207 
208 		/*
209 		 * Stored procedures are a bit strange in the sense that some statements
210 		 * are not in a transaction block, but can be rolled back. We need to
211 		 * make sure we send all statements in a transaction block. The
212 		 * StoredProcedureLevel variable signals this to the router executor
213 		 * and indicates how deep in the call stack we are in case of nested
214 		 * stored procedures.
215 		 */
216 		StoredProcedureLevel += 1;
217 
218 		PG_TRY();
219 		{
220 			standard_ProcessUtility_compat(pstmt, queryString, false, context,
221 										   params, queryEnv, dest, completionTag);
222 
223 			StoredProcedureLevel -= 1;
224 		}
225 		PG_CATCH();
226 		{
227 			StoredProcedureLevel -= 1;
228 			PG_RE_THROW();
229 		}
230 		PG_END_TRY();
231 
232 		return;
233 	}
234 	else if (IsA(parsetree, DoStmt))
235 	{
236 		/*
237 		 * All statements in a DO block are executed in a single transaciton,
238 		 * so we need to keep track of whether we are inside a DO block.
239 		 */
240 		DoBlockLevel += 1;
241 
242 		PG_TRY();
243 		{
244 			standard_ProcessUtility_compat(pstmt, queryString, false, context,
245 										   params, queryEnv, dest, completionTag);
246 
247 			DoBlockLevel -= 1;
248 		}
249 		PG_CATCH();
250 		{
251 			DoBlockLevel -= 1;
252 			PG_RE_THROW();
253 		}
254 		PG_END_TRY();
255 
256 		return;
257 	}
258 
259 	UtilityHookLevel++;
260 
261 	PG_TRY();
262 	{
263 		ProcessUtilityInternal(pstmt, queryString, context, params, queryEnv, dest,
264 							   completionTag);
265 
266 		if (UtilityHookLevel == 1)
267 		{
268 			/*
269 			 * When Citus local tables are disconnected from the foreign key graph, which
270 			 * can happen due to various kinds of drop commands, we immediately
271 			 * undistribute them at the end of the command.
272 			 */
273 			if (ShouldUndistributeCitusLocalTables())
274 			{
275 				UndistributeDisconnectedCitusLocalTables();
276 			}
277 			ResetConstraintDropped();
278 		}
279 
280 		UtilityHookLevel--;
281 	}
282 	PG_CATCH();
283 	{
284 		if (UtilityHookLevel == 1)
285 		{
286 			ResetConstraintDropped();
287 		}
288 
289 		UtilityHookLevel--;
290 
291 		PG_RE_THROW();
292 	}
293 	PG_END_TRY();
294 }
295 
296 
297 /*
298  * ProcessUtilityInternal is a helper function for multi_ProcessUtility where majority
299  * of the Citus specific utility statements are handled here. The distinction between
300  * both functions is that Citus_ProcessUtility does not handle CALL and DO statements.
301  * The reason for the distinction is implemented to be able to find the "top-level" DDL
302  * commands (not internal/cascading ones). UtilityHookLevel variable is used to achieve
303  * this goal.
304  */
305 static void
ProcessUtilityInternal(PlannedStmt * pstmt,const char * queryString,ProcessUtilityContext context,ParamListInfo params,struct QueryEnvironment * queryEnv,DestReceiver * dest,QueryCompletionCompat * completionTag)306 ProcessUtilityInternal(PlannedStmt *pstmt,
307 					   const char *queryString,
308 					   ProcessUtilityContext context,
309 					   ParamListInfo params,
310 					   struct QueryEnvironment *queryEnv,
311 					   DestReceiver *dest,
312 					   QueryCompletionCompat *completionTag)
313 {
314 	Node *parsetree = pstmt->utilityStmt;
315 	List *ddlJobs = NIL;
316 
317 	if (IsA(parsetree, ExplainStmt) &&
318 		IsA(((ExplainStmt *) parsetree)->query, Query))
319 	{
320 		ExplainStmt *explainStmt = (ExplainStmt *) parsetree;
321 
322 		if (IsTransactionBlock())
323 		{
324 			bool analyze = false;
325 
326 			DefElem *option = NULL;
327 			foreach_ptr(option, explainStmt->options)
328 			{
329 				if (strcmp(option->defname, "analyze") == 0)
330 				{
331 					analyze = defGetBoolean(option);
332 				}
333 
334 				/* don't "break", as explain.c will use the last value */
335 			}
336 
337 			if (analyze)
338 			{
339 				/*
340 				 * Since we cannot execute EXPLAIN ANALYZE locally, we
341 				 * cannot continue.
342 				 */
343 				ErrorIfTransactionAccessedPlacementsLocally();
344 			}
345 		}
346 
347 		/*
348 		 * EXPLAIN ANALYZE is tricky with local execution, and there is not
349 		 * much difference between the local and distributed execution in terms
350 		 * of the actual EXPLAIN output.
351 		 *
352 		 * TODO: It might be nice to have a way to show that the query is locally
353 		 * executed. Shall we add a INFO output?
354 		 */
355 		DisableLocalExecution();
356 	}
357 
358 	if (IsA(parsetree, CreateSubscriptionStmt))
359 	{
360 		CreateSubscriptionStmt *createSubStmt = (CreateSubscriptionStmt *) parsetree;
361 
362 		parsetree = ProcessCreateSubscriptionStmt(createSubStmt);
363 	}
364 
365 	/* process SET LOCAL stmts of allowed GUCs in multi-stmt xacts */
366 	if (IsA(parsetree, VariableSetStmt))
367 	{
368 		VariableSetStmt *setStmt = (VariableSetStmt *) parsetree;
369 
370 		/* at present, we only implement the NONE and LOCAL behaviors */
371 		AssertState(PropagateSetCommands == PROPSETCMD_NONE ||
372 					PropagateSetCommands == PROPSETCMD_LOCAL);
373 
374 		if (IsMultiStatementTransaction() && ShouldPropagateSetCommand(setStmt))
375 		{
376 			PostprocessVariableSetStmt(setStmt, queryString);
377 		}
378 	}
379 
380 	/*
381 	 * TRANSMIT used to be separate command, but to avoid patching the grammar
382 	 * it's now overlaid onto COPY, but with FORMAT = 'transmit' instead of the
383 	 * normal FORMAT options.
384 	 */
385 	if (IsTransmitStmt(parsetree))
386 	{
387 		CopyStmt *copyStatement = (CopyStmt *) parsetree;
388 		char *userName = TransmitStatementUser(copyStatement);
389 		bool missingOK = false;
390 		StringInfo transmitPath = makeStringInfo();
391 
392 		VerifyTransmitStmt(copyStatement);
393 
394 		/* ->relation->relname is the target file in our overloaded COPY */
395 		appendStringInfoString(transmitPath, copyStatement->relation->relname);
396 
397 		if (userName != NULL)
398 		{
399 			Oid userId = get_role_oid(userName, missingOK);
400 			appendStringInfo(transmitPath, ".%d", userId);
401 		}
402 
403 		if (copyStatement->is_from)
404 		{
405 			RedirectCopyDataToRegularFile(transmitPath->data);
406 		}
407 		else
408 		{
409 			SendRegularFile(transmitPath->data);
410 		}
411 
412 		/* Don't execute the faux copy statement */
413 		return;
414 	}
415 
416 	if (IsA(parsetree, CopyStmt))
417 	{
418 		MemoryContext planContext = GetMemoryChunkContext(parsetree);
419 
420 		parsetree = copyObject(parsetree);
421 		parsetree = ProcessCopyStmt((CopyStmt *) parsetree, completionTag, queryString);
422 
423 		if (parsetree == NULL)
424 		{
425 			return;
426 		}
427 
428 		MemoryContext previousContext = MemoryContextSwitchTo(planContext);
429 		parsetree = copyObject(parsetree);
430 		MemoryContextSwitchTo(previousContext);
431 
432 		/*
433 		 * we need to set the parsetree here already as we copy and replace the original
434 		 * parsetree during ddl propagation. In reality we need to refactor the code above
435 		 * to not juggle copy the parsetree and leak it to a potential cache above the
436 		 * utility hook.
437 		 */
438 		pstmt->utilityStmt = parsetree;
439 	}
440 
441 	/* we're mostly in DDL (and VACUUM/TRUNCATE) territory at this point... */
442 
443 	if (IsA(parsetree, CreateSeqStmt))
444 	{
445 		ErrorIfUnsupportedSeqStmt((CreateSeqStmt *) parsetree);
446 	}
447 
448 	if (IsA(parsetree, AlterSeqStmt))
449 	{
450 		ErrorIfDistributedAlterSeqOwnedBy((AlterSeqStmt *) parsetree);
451 	}
452 
453 	if (IsA(parsetree, TruncateStmt))
454 	{
455 		PreprocessTruncateStatement((TruncateStmt *) parsetree);
456 	}
457 
458 	/*
459 	 * We only process ALTER TABLE ... ATTACH PARTITION commands in the function below
460 	 * and distribute the partition if necessary.
461 	 */
462 	if (IsA(parsetree, AlterTableStmt))
463 	{
464 		AlterTableStmt *alterTableStatement = (AlterTableStmt *) parsetree;
465 
466 		PreprocessAlterTableStmtAttachPartition(alterTableStatement, queryString);
467 	}
468 
469 	/* only generate worker DDLJobs if propagation is enabled */
470 	const DistributeObjectOps *ops = NULL;
471 	if (EnableDDLPropagation)
472 	{
473 		/* copy planned statement since we might scribble on it or its utilityStmt */
474 		pstmt = copyObject(pstmt);
475 		parsetree = pstmt->utilityStmt;
476 		ops = GetDistributeObjectOps(parsetree);
477 
478 		if (ops && ops->preprocess)
479 		{
480 			ddlJobs = ops->preprocess(parsetree, queryString, context);
481 		}
482 	}
483 	else
484 	{
485 		/*
486 		 * citus.enable_ddl_propagation is disabled, which means that PostgreSQL
487 		 * should handle the DDL command on a distributed table directly, without
488 		 * Citus intervening. The only exception is partition column drop, in
489 		 * which case we error out. Advanced Citus users use this to implement their
490 		 * own DDL propagation. We also use it to avoid re-propagating DDL commands
491 		 * when changing MX tables on workers. Below, we also make sure that DDL
492 		 * commands don't run queries that might get intercepted by Citus and error
493 		 * out, specifically we skip validation in foreign keys.
494 		 */
495 
496 		if (IsA(parsetree, AlterTableStmt))
497 		{
498 			AlterTableStmt *alterTableStmt = (AlterTableStmt *) parsetree;
499 			if (AlterTableStmtObjType_compat(alterTableStmt) == OBJECT_TABLE ||
500 				AlterTableStmtObjType_compat(alterTableStmt) == OBJECT_FOREIGN_TABLE)
501 			{
502 				ErrorIfAlterDropsPartitionColumn(alterTableStmt);
503 
504 				/*
505 				 * When issuing an ALTER TABLE ... ADD FOREIGN KEY command, the
506 				 * the validation step should be skipped on the distributed table.
507 				 * Therefore, we check whether the given ALTER TABLE statement is a
508 				 * FOREIGN KEY constraint and if so disable the validation step.
509 				 * Note validation is done on the shard level when DDL propagation
510 				 * is enabled. The following eagerly executes some tasks on workers.
511 				 */
512 				parsetree =
513 					SkipForeignKeyValidationIfConstraintIsFkey(alterTableStmt, false);
514 			}
515 		}
516 	}
517 
518 	/* inform the user about potential caveats */
519 	if (IsA(parsetree, CreatedbStmt))
520 	{
521 		ereport(NOTICE, (errmsg("Citus partially supports CREATE DATABASE for "
522 								"distributed databases"),
523 						 errdetail("Citus does not propagate CREATE DATABASE "
524 								   "command to workers"),
525 						 errhint("You can manually create a database and its "
526 								 "extensions on workers.")));
527 	}
528 	else if (IsA(parsetree, CreateRoleStmt))
529 	{
530 		ereport(NOTICE, (errmsg("not propagating CREATE ROLE/USER commands to worker"
531 								" nodes"),
532 						 errhint("Connect to worker nodes directly to manually create all"
533 								 " necessary users and roles.")));
534 	}
535 
536 	/*
537 	 * Make sure that on DROP DATABASE we terminate the background daemon
538 	 * associated with it.
539 	 */
540 	if (IsA(parsetree, DropdbStmt))
541 	{
542 		const bool missingOK = true;
543 		DropdbStmt *dropDbStatement = (DropdbStmt *) parsetree;
544 		char *dbname = dropDbStatement->dbname;
545 		Oid databaseOid = get_database_oid(dbname, missingOK);
546 
547 		if (OidIsValid(databaseOid))
548 		{
549 			StopMaintenanceDaemon(databaseOid);
550 		}
551 	}
552 
553 	if (IsDropCitusExtensionStmt(parsetree))
554 	{
555 		StopMaintenanceDaemon(MyDatabaseId);
556 	}
557 
558 	pstmt->utilityStmt = parsetree;
559 
560 	PG_TRY();
561 	{
562 		IncrementUtilityHookCountersIfNecessary(parsetree);
563 
564 		/*
565 		 * Check if we are running ALTER EXTENSION citus UPDATE (TO "<version>") command and
566 		 * the available version is different than the current version of Citus. In this case,
567 		 * ALTER EXTENSION citus UPDATE command can actually update Citus to a new version.
568 		 */
569 		bool isCreateAlterExtensionUpdateCitusStmt =
570 			IsCreateAlterExtensionUpdateCitusStmt(parsetree);
571 		bool isAlterExtensionUpdateCitusStmt = isCreateAlterExtensionUpdateCitusStmt &&
572 											   IsA(parsetree, AlterExtensionStmt);
573 
574 		bool citusCanBeUpdatedToAvailableVersion = false;
575 
576 		if (isAlterExtensionUpdateCitusStmt)
577 		{
578 			citusCanBeUpdatedToAvailableVersion = !InstalledAndAvailableVersionsSame();
579 		}
580 
581 		standard_ProcessUtility_compat(pstmt, queryString, false, context,
582 									   params, queryEnv, dest, completionTag);
583 
584 		/*
585 		 * if we are running ALTER EXTENSION citus UPDATE (to "<version>") command, we may need
586 		 * to mark existing objects as distributed depending on the "version" parameter if
587 		 * specified in "ALTER EXTENSION citus UPDATE" command
588 		 */
589 		if (isAlterExtensionUpdateCitusStmt && citusCanBeUpdatedToAvailableVersion)
590 		{
591 			PostprocessAlterExtensionCitusUpdateStmt(parsetree);
592 		}
593 
594 		PostStandardProcessUtility(parsetree);
595 	}
596 	PG_CATCH();
597 	{
598 		PostStandardProcessUtility(parsetree);
599 
600 		PG_RE_THROW();
601 	}
602 	PG_END_TRY();
603 
604 	/*
605 	 * Post process for ddl statements
606 	 */
607 	if (EnableDDLPropagation)
608 	{
609 		if (ops && ops->postprocess)
610 		{
611 			List *processJobs = ops->postprocess(parsetree, queryString);
612 
613 			if (processJobs)
614 			{
615 				Assert(ddlJobs == NIL); /* jobs should not have been set before */
616 				ddlJobs = processJobs;
617 			}
618 		}
619 
620 		if (IsA(parsetree, RenameStmt) && ((RenameStmt *) parsetree)->renameType ==
621 			OBJECT_ROLE && EnableAlterRolePropagation)
622 		{
623 			ereport(NOTICE, (errmsg("not propagating ALTER ROLE ... RENAME TO commands "
624 									"to worker nodes"),
625 							 errhint("Connect to worker nodes directly to manually "
626 									 "rename the role")));
627 		}
628 	}
629 
630 	if (IsA(parsetree, CreateStmt))
631 	{
632 		CreateStmt *createStatement = (CreateStmt *) parsetree;
633 
634 		PostprocessCreateTableStmt(createStatement, queryString);
635 	}
636 
637 	/* after local command has completed, finish by executing worker DDLJobs, if any */
638 	if (ddlJobs != NIL)
639 	{
640 		if (IsA(parsetree, AlterTableStmt))
641 		{
642 			PostprocessAlterTableStmt(castNode(AlterTableStmt, parsetree));
643 		}
644 
645 		DDLJob *ddlJob = NULL;
646 		foreach_ptr(ddlJob, ddlJobs)
647 		{
648 			ExecuteDistributedDDLJob(ddlJob);
649 		}
650 
651 		/*
652 		 * For CREATE/DROP/REINDEX CONCURRENTLY we mark the index as valid
653 		 * after successfully completing the distributed DDL job.
654 		 */
655 		if (IsA(parsetree, IndexStmt))
656 		{
657 			IndexStmt *indexStmt = (IndexStmt *) parsetree;
658 
659 			if (indexStmt->concurrent)
660 			{
661 				/* no failures during CONCURRENTLY, mark the index as valid */
662 				MarkIndexValid(indexStmt);
663 			}
664 		}
665 	}
666 
667 	/* TODO: fold VACUUM's processing into the above block */
668 	if (IsA(parsetree, VacuumStmt))
669 	{
670 		VacuumStmt *vacuumStmt = (VacuumStmt *) parsetree;
671 
672 		PostprocessVacuumStmt(vacuumStmt, queryString);
673 	}
674 
675 	if (!IsDropCitusExtensionStmt(parsetree) && !IsA(parsetree, DropdbStmt))
676 	{
677 		/*
678 		 * Ensure value is valid, we can't do some checks during CREATE
679 		 * EXTENSION. This is important to register some invalidation callbacks.
680 		 */
681 		CitusHasBeenLoaded(); /* lgtm[cpp/return-value-ignored] */
682 	}
683 }
684 
685 
686 /*
687  * UndistributeDisconnectedCitusLocalTables undistributes citus local tables that
688  * are not connected to any reference tables via their individual foreign key
689  * subgraphs.
690  */
691 void
UndistributeDisconnectedCitusLocalTables(void)692 UndistributeDisconnectedCitusLocalTables(void)
693 {
694 	List *citusLocalTableIdList = CitusTableTypeIdList(CITUS_LOCAL_TABLE);
695 	citusLocalTableIdList = SortList(citusLocalTableIdList, CompareOids);
696 
697 	Oid citusLocalTableId = InvalidOid;
698 	foreach_oid(citusLocalTableId, citusLocalTableIdList)
699 	{
700 		/* acquire ShareRowExclusiveLock to prevent concurrent foreign key creation */
701 		LOCKMODE lockMode = ShareRowExclusiveLock;
702 		LockRelationOid(citusLocalTableId, lockMode);
703 
704 		HeapTuple heapTuple =
705 			SearchSysCache1(RELOID, ObjectIdGetDatum(citusLocalTableId));
706 		if (!HeapTupleIsValid(heapTuple))
707 		{
708 			/*
709 			 * UndistributeTable drops relation, skip if already undistributed
710 			 * via cascade.
711 			 */
712 			continue;
713 		}
714 		ReleaseSysCache(heapTuple);
715 
716 		if (ConnectedToReferenceTableViaFKey(citusLocalTableId))
717 		{
718 			/* still connected to a reference table, skip it */
719 			UnlockRelationOid(citusLocalTableId, lockMode);
720 			continue;
721 		}
722 
723 		/*
724 		 * Citus local table is not connected to any reference tables, then
725 		 * undistribute it via cascade. Here, instead of first dropping foreing
726 		 * keys then undistributing the table, we just set cascadeViaForeignKeys
727 		 * to true for simplicity.
728 		 *
729 		 * We suppress notices messages not to be too verbose. On the other hand,
730 		 * as UndistributeTable moves data to a new table, we want to inform user
731 		 * as it might take some time.
732 		 */
733 		ereport(NOTICE, (errmsg("removing table %s from metadata as it is not "
734 								"connected to any reference tables via foreign keys",
735 								generate_qualified_relation_name(citusLocalTableId))));
736 		TableConversionParameters params = {
737 			.relationId = citusLocalTableId,
738 			.cascadeViaForeignKeys = true,
739 			.suppressNoticeMessages = true
740 		};
741 		UndistributeTable(&params);
742 	}
743 }
744 
745 
746 /*
747  * ShouldUndistributeCitusLocalTables returns true if we might need to check
748  * citus local tables for their connectivity to reference tables.
749  */
750 static bool
ShouldUndistributeCitusLocalTables(void)751 ShouldUndistributeCitusLocalTables(void)
752 {
753 	if (!ConstraintDropped)
754 	{
755 		/*
756 		 * citus_drop_trigger executes notify_constraint_dropped to set
757 		 * ConstraintDropped to true, which means that last command dropped
758 		 * a table constraint.
759 		 */
760 		return false;
761 	}
762 
763 	if (!CitusHasBeenLoaded())
764 	{
765 		/*
766 		 * If we are dropping citus, we should not try to undistribute citus
767 		 * local tables as they will also be dropped.
768 		 */
769 		return false;
770 	}
771 
772 	if (!InCoordinatedTransaction())
773 	{
774 		/* not interacting with any Citus objects */
775 		return false;
776 	}
777 
778 	if (IsCitusInitiatedRemoteBackend())
779 	{
780 		/* connection from the coordinator operating on a shard */
781 		return false;
782 	}
783 
784 	if (!ShouldEnableLocalReferenceForeignKeys())
785 	{
786 		/*
787 		 * If foreign keys between reference tables and local tables are
788 		 * disabled, then user might be using citus_add_local_table_to_metadata for
789 		 * their own purposes. In that case, we should not undistribute
790 		 * citus local tables.
791 		 */
792 		return false;
793 	}
794 
795 	if (!IsCoordinator())
796 	{
797 		/* we should not perform this operation in worker nodes */
798 		return false;
799 	}
800 
801 	return true;
802 }
803 
804 
805 /*
806  * NotifyUtilityHookConstraintDropped sets ConstraintDropped to true to tell us
807  * last command dropped a table constraint.
808  */
809 void
NotifyUtilityHookConstraintDropped(void)810 NotifyUtilityHookConstraintDropped(void)
811 {
812 	ConstraintDropped = true;
813 }
814 
815 
816 /*
817  * ResetConstraintDropped sets ConstraintDropped to false.
818  */
819 void
ResetConstraintDropped(void)820 ResetConstraintDropped(void)
821 {
822 	ConstraintDropped = false;
823 }
824 
825 
826 /*
827  * IsDropSchemaOrDB returns true if parsetree represents DROP SCHEMA ...or
828  * a DROP DATABASE.
829  */
830 static bool
IsDropSchemaOrDB(Node * parsetree)831 IsDropSchemaOrDB(Node *parsetree)
832 {
833 	if (!IsA(parsetree, DropStmt))
834 	{
835 		return false;
836 	}
837 
838 	DropStmt *dropStatement = (DropStmt *) parsetree;
839 	return (dropStatement->removeType == OBJECT_SCHEMA) ||
840 		   (dropStatement->removeType == OBJECT_DATABASE);
841 }
842 
843 
844 /*
845  * ExecuteDistributedDDLJob simply executes a provided DDLJob in a distributed trans-
846  * action, including metadata sync if needed. If the multi shard commit protocol is
847  * in its default value of '1pc', then a notice message indicating that '2pc' might be
848  * used for extra safety. In the commit protocol, a BEGIN is sent after connection to
849  * each shard placement and COMMIT/ROLLBACK is handled by
850  * CoordinatedTransactionCallback function.
851  *
852  * The function errors out if the node is not the coordinator or if the DDL is on
853  * a partitioned table which has replication factor > 1.
854  *
855  */
856 void
ExecuteDistributedDDLJob(DDLJob * ddlJob)857 ExecuteDistributedDDLJob(DDLJob *ddlJob)
858 {
859 	bool shouldSyncMetadata = false;
860 
861 	EnsureCoordinator();
862 
863 	Oid targetRelationId = ddlJob->targetRelationId;
864 
865 	if (OidIsValid(targetRelationId))
866 	{
867 		/*
868 		 * Only for ddlJobs that are targetting a relation (table) we want to sync
869 		 * its metadata and verify some properties around the table.
870 		 */
871 		shouldSyncMetadata = ShouldSyncTableMetadata(targetRelationId);
872 		EnsurePartitionTableNotReplicated(targetRelationId);
873 	}
874 
875 	bool localExecutionSupported = true;
876 
877 	if (!ddlJob->concurrentIndexCmd)
878 	{
879 		if (shouldSyncMetadata)
880 		{
881 			char *setSearchPathCommand = SetSearchPathToCurrentSearchPathCommand();
882 
883 			SendCommandToWorkersWithMetadata(DISABLE_DDL_PROPAGATION);
884 
885 			/*
886 			 * Given that we're relaying the query to the worker nodes directly,
887 			 * we should set the search path exactly the same when necessary.
888 			 */
889 			if (setSearchPathCommand != NULL)
890 			{
891 				SendCommandToWorkersWithMetadata(setSearchPathCommand);
892 			}
893 
894 			if (ddlJob->commandString != NULL)
895 			{
896 				SendCommandToWorkersWithMetadata((char *) ddlJob->commandString);
897 			}
898 		}
899 
900 		ExecuteUtilityTaskList(ddlJob->taskList, localExecutionSupported);
901 	}
902 	else
903 	{
904 		localExecutionSupported = false;
905 
906 		/*
907 		 * Start a new transaction to make sure CONCURRENTLY commands
908 		 * on localhost do not block waiting for this transaction to finish.
909 		 */
910 		if (ddlJob->startNewTransaction)
911 		{
912 			/*
913 			 * If cache is not populated, system catalog lookups will cause
914 			 * the xmin of current backend to change. Then the last phase
915 			 * of CREATE INDEX CONCURRENTLY, which is in a separate backend,
916 			 * will hang waiting for our backend and result in a deadlock.
917 			 *
918 			 * We populate the cache before starting the next transaction to
919 			 * avoid this. Most of the metadata has already been resolved in
920 			 * planning phase, we only need to lookup metadata needed for
921 			 * connection establishment.
922 			 */
923 			(void) CurrentDatabaseName();
924 
925 			/*
926 			 * ConnParams (AuthInfo and PoolInfo) gets a snapshot, which
927 			 * will blocks the remote connections to localhost. Hence we warm up
928 			 * the cache here so that after we start a new transaction, the entries
929 			 * will already be in the hash table, hence we won't be holding any snapshots.
930 			 */
931 			WarmUpConnParamsHash();
932 			CommitTransactionCommand();
933 			StartTransactionCommand();
934 		}
935 
936 		/* save old commit protocol to restore at xact end */
937 		Assert(SavedMultiShardCommitProtocol == COMMIT_PROTOCOL_BARE);
938 		SavedMultiShardCommitProtocol = MultiShardCommitProtocol;
939 		MultiShardCommitProtocol = COMMIT_PROTOCOL_BARE;
940 		MemoryContext savedContext = CurrentMemoryContext;
941 
942 		PG_TRY();
943 		{
944 			ExecuteUtilityTaskList(ddlJob->taskList, localExecutionSupported);
945 
946 			if (shouldSyncMetadata)
947 			{
948 				List *commandList = list_make1(DISABLE_DDL_PROPAGATION);
949 				char *setSearchPathCommand = SetSearchPathToCurrentSearchPathCommand();
950 
951 				/*
952 				 * Given that we're relaying the query to the worker nodes directly,
953 				 * we should set the search path exactly the same when necessary.
954 				 */
955 				if (setSearchPathCommand != NULL)
956 				{
957 					commandList = lappend(commandList, setSearchPathCommand);
958 				}
959 
960 				commandList = lappend(commandList, (char *) ddlJob->commandString);
961 
962 				SendBareCommandListToMetadataWorkers(commandList);
963 			}
964 		}
965 		PG_CATCH();
966 		{
967 			/* CopyErrorData() requires (CurrentMemoryContext != ErrorContext) */
968 			MemoryContextSwitchTo(savedContext);
969 			ErrorData *edata = CopyErrorData();
970 
971 			/*
972 			 * In concurrent index creation, if a worker index with the same name already
973 			 * exists, prompt to DROP the current index and retry the original command
974 			 */
975 			if (edata->sqlerrcode == ERRCODE_DUPLICATE_TABLE)
976 			{
977 				ereport(ERROR,
978 						(errmsg("CONCURRENTLY-enabled index command failed"),
979 						 errdetail(
980 							 "CONCURRENTLY-enabled index commands can fail partially, "
981 							 "leaving behind an INVALID index."),
982 						 errhint("Use DROP INDEX CONCURRENTLY IF EXISTS to remove the "
983 								 "invalid index, then retry the original command.")));
984 			}
985 			else
986 			{
987 				ereport(WARNING,
988 						(errmsg(
989 							 "CONCURRENTLY-enabled index commands can fail partially, "
990 							 "leaving behind an INVALID index.\n Use DROP INDEX "
991 							 "CONCURRENTLY IF EXISTS to remove the invalid index.")));
992 				PG_RE_THROW();
993 			}
994 		}
995 		PG_END_TRY();
996 	}
997 }
998 
999 
1000 /*
1001  * CreateCustomDDLTaskList creates a DDLJob which will apply a command to all placements
1002  * of shards of a distributed table. The command to be applied is generated by the
1003  * TableDDLCommand structure passed in.
1004  */
1005 DDLJob *
CreateCustomDDLTaskList(Oid relationId,TableDDLCommand * command)1006 CreateCustomDDLTaskList(Oid relationId, TableDDLCommand *command)
1007 {
1008 	List *taskList = NIL;
1009 	List *shardIntervalList = LoadShardIntervalList(relationId);
1010 	uint64 jobId = INVALID_JOB_ID;
1011 	Oid namespace = get_rel_namespace(relationId);
1012 	char *namespaceName = get_namespace_name(namespace);
1013 	int taskId = 1;
1014 
1015 	/* lock metadata before getting placement lists */
1016 	LockShardListMetadata(shardIntervalList, ShareLock);
1017 
1018 	ShardInterval *shardInterval = NULL;
1019 	foreach_ptr(shardInterval, shardIntervalList)
1020 	{
1021 		uint64 shardId = shardInterval->shardId;
1022 
1023 		char *commandStr = GetShardedTableDDLCommand(command, shardId, namespaceName);
1024 
1025 		Task *task = CitusMakeNode(Task);
1026 		task->jobId = jobId;
1027 		task->taskId = taskId++;
1028 		task->taskType = DDL_TASK;
1029 		SetTaskQueryString(task, commandStr);
1030 		task->replicationModel = REPLICATION_MODEL_INVALID;
1031 		task->dependentTaskList = NULL;
1032 		task->anchorShardId = shardId;
1033 		task->taskPlacementList = ActiveShardPlacementList(shardId);
1034 
1035 		taskList = lappend(taskList, task);
1036 	}
1037 
1038 	DDLJob *ddlJob = palloc0(sizeof(DDLJob));
1039 	ddlJob->targetRelationId = relationId;
1040 	ddlJob->concurrentIndexCmd = false;
1041 	ddlJob->commandString = GetTableDDLCommand(command);
1042 	ddlJob->taskList = taskList;
1043 
1044 	return ddlJob;
1045 }
1046 
1047 
1048 /*
1049  * SetSearchPathToCurrentSearchPathCommand generates a command which can
1050  * set the search path to the exact same search path that the issueing node
1051  * has.
1052  *
1053  * If the current search path is null (or doesn't have any valid schemas),
1054  * the function returns NULL.
1055  */
1056 static char *
SetSearchPathToCurrentSearchPathCommand(void)1057 SetSearchPathToCurrentSearchPathCommand(void)
1058 {
1059 	char *currentSearchPath = CurrentSearchPath();
1060 
1061 	if (currentSearchPath == NULL)
1062 	{
1063 		return NULL;
1064 	}
1065 
1066 	StringInfo setCommand = makeStringInfo();
1067 	appendStringInfo(setCommand, "SET search_path TO %s;", currentSearchPath);
1068 
1069 	return setCommand->data;
1070 }
1071 
1072 
1073 /*
1074  * CurrentSearchPath is a C interface for calling current_schemas(bool) that
1075  * PostgreSQL exports.
1076  *
1077  * CurrentSchemas returns all the schemas in the seach_path that are seperated
1078  * with comma (,) sign. The returned string can be used to set the search_path.
1079  *
1080  * The function omits implicit schemas.
1081  *
1082  * The function returns NULL if there are no valid schemas in the search_path,
1083  * mimicing current_schemas(false) function.
1084  */
1085 static char *
CurrentSearchPath(void)1086 CurrentSearchPath(void)
1087 {
1088 	StringInfo currentSearchPath = makeStringInfo();
1089 	List *searchPathList = fetch_search_path(false);
1090 	bool schemaAdded = false;
1091 
1092 	Oid searchPathOid = InvalidOid;
1093 	foreach_oid(searchPathOid, searchPathList)
1094 	{
1095 		char *schemaName = get_namespace_name(searchPathOid);
1096 
1097 		/* watch out for deleted namespace */
1098 		if (schemaName)
1099 		{
1100 			if (schemaAdded)
1101 			{
1102 				appendStringInfoString(currentSearchPath, ",");
1103 				schemaAdded = false;
1104 			}
1105 
1106 			appendStringInfoString(currentSearchPath, quote_identifier(schemaName));
1107 			schemaAdded = true;
1108 		}
1109 	}
1110 
1111 	/* fetch_search_path() returns a palloc'd list that we should free now */
1112 	list_free(searchPathList);
1113 
1114 	return (currentSearchPath->len > 0 ? currentSearchPath->data : NULL);
1115 }
1116 
1117 
1118 /*
1119  * IncrementUtilityHookCountersIfNecessary increments activeAlterTables and
1120  * activeDropSchemaOrDBs counters if utility command being processed implies
1121  * to do so.
1122  */
1123 static void
IncrementUtilityHookCountersIfNecessary(Node * parsetree)1124 IncrementUtilityHookCountersIfNecessary(Node *parsetree)
1125 {
1126 	if (IsA(parsetree, AlterTableStmt))
1127 	{
1128 		activeAlterTables++;
1129 	}
1130 
1131 	if (IsDropSchemaOrDB(parsetree))
1132 	{
1133 		activeDropSchemaOrDBs++;
1134 	}
1135 }
1136 
1137 
1138 /*
1139  * PostStandardProcessUtility performs operations to alter (backend) global
1140  * state of citus utility hook. Those operations should be done after standard
1141  * process utility executes even if it errors out.
1142  */
1143 static void
PostStandardProcessUtility(Node * parsetree)1144 PostStandardProcessUtility(Node *parsetree)
1145 {
1146 	DecrementUtilityHookCountersIfNecessary(parsetree);
1147 
1148 	/*
1149 	 * Re-forming the foreign key graph relies on the command being executed
1150 	 * on the local table first. However, in order to decide whether the
1151 	 * command leads to an invalidation, we need to check before the command
1152 	 * is being executed since we read pg_constraint table. Thus, we maintain a
1153 	 * local flag and do the invalidation after multi_ProcessUtility,
1154 	 * before ExecuteDistributedDDLJob().
1155 	 */
1156 	InvalidateForeignKeyGraphForDDL();
1157 }
1158 
1159 
1160 /*
1161  * DecrementUtilityHookCountersIfNecessary decrements activeAlterTables and
1162  * activeDropSchemaOrDBs counters if utility command being processed implies
1163  * to do so.
1164  */
1165 static void
DecrementUtilityHookCountersIfNecessary(Node * parsetree)1166 DecrementUtilityHookCountersIfNecessary(Node *parsetree)
1167 {
1168 	if (IsA(parsetree, AlterTableStmt))
1169 	{
1170 		activeAlterTables--;
1171 	}
1172 
1173 	if (IsDropSchemaOrDB(parsetree))
1174 	{
1175 		activeDropSchemaOrDBs--;
1176 	}
1177 }
1178 
1179 
1180 /*
1181  * MarkInvalidateForeignKeyGraph marks whether the foreign key graph should be
1182  * invalidated due to a DDL.
1183  */
1184 void
MarkInvalidateForeignKeyGraph()1185 MarkInvalidateForeignKeyGraph()
1186 {
1187 	shouldInvalidateForeignKeyGraph = true;
1188 }
1189 
1190 
1191 /*
1192  * InvalidateForeignKeyGraphForDDL simply keeps track of whether
1193  * the foreign key graph should be invalidated due to a DDL.
1194  */
1195 void
InvalidateForeignKeyGraphForDDL(void)1196 InvalidateForeignKeyGraphForDDL(void)
1197 {
1198 	if (shouldInvalidateForeignKeyGraph)
1199 	{
1200 		InvalidateForeignKeyGraph();
1201 
1202 		shouldInvalidateForeignKeyGraph = false;
1203 	}
1204 }
1205 
1206 
1207 /*
1208  * DDLTaskList builds a list of tasks to execute a DDL command on a
1209  * given list of shards.
1210  */
1211 List *
DDLTaskList(Oid relationId,const char * commandString)1212 DDLTaskList(Oid relationId, const char *commandString)
1213 {
1214 	List *taskList = NIL;
1215 	List *shardIntervalList = LoadShardIntervalList(relationId);
1216 	Oid schemaId = get_rel_namespace(relationId);
1217 	char *schemaName = get_namespace_name(schemaId);
1218 	char *escapedSchemaName = quote_literal_cstr(schemaName);
1219 	char *escapedCommandString = quote_literal_cstr(commandString);
1220 	uint64 jobId = INVALID_JOB_ID;
1221 	int taskId = 1;
1222 
1223 	/* lock metadata before getting placement lists */
1224 	LockShardListMetadata(shardIntervalList, ShareLock);
1225 
1226 	ShardInterval *shardInterval = NULL;
1227 	foreach_ptr(shardInterval, shardIntervalList)
1228 	{
1229 		uint64 shardId = shardInterval->shardId;
1230 		StringInfo applyCommand = makeStringInfo();
1231 
1232 		/*
1233 		 * If rightRelationId is not InvalidOid, instead of worker_apply_shard_ddl_command
1234 		 * we use worker_apply_inter_shard_ddl_command.
1235 		 */
1236 		appendStringInfo(applyCommand, WORKER_APPLY_SHARD_DDL_COMMAND, shardId,
1237 						 escapedSchemaName, escapedCommandString);
1238 
1239 		Task *task = CitusMakeNode(Task);
1240 		task->jobId = jobId;
1241 		task->taskId = taskId++;
1242 		task->taskType = DDL_TASK;
1243 		SetTaskQueryString(task, applyCommand->data);
1244 		task->replicationModel = REPLICATION_MODEL_INVALID;
1245 		task->dependentTaskList = NULL;
1246 		task->anchorShardId = shardId;
1247 		task->taskPlacementList = ActiveShardPlacementList(shardId);
1248 
1249 		taskList = lappend(taskList, task);
1250 	}
1251 
1252 	return taskList;
1253 }
1254 
1255 
1256 /*
1257  * NodeDDLTaskList builds a list of tasks to execute a DDL command on a
1258  * given target set of nodes.
1259  */
1260 List *
NodeDDLTaskList(TargetWorkerSet targets,List * commands)1261 NodeDDLTaskList(TargetWorkerSet targets, List *commands)
1262 {
1263 	List *workerNodes = TargetWorkerSetNodeList(targets, NoLock);
1264 
1265 	if (list_length(workerNodes) <= 0)
1266 	{
1267 		/*
1268 		 * if there are no nodes we don't have to plan any ddl tasks. Planning them would
1269 		 * cause the executor to stop responding.
1270 		 */
1271 		return NIL;
1272 	}
1273 
1274 	Task *task = CitusMakeNode(Task);
1275 	task->taskType = DDL_TASK;
1276 	SetTaskQueryStringList(task, commands);
1277 
1278 	WorkerNode *workerNode = NULL;
1279 	foreach_ptr(workerNode, workerNodes)
1280 	{
1281 		ShardPlacement *targetPlacement = CitusMakeNode(ShardPlacement);
1282 		targetPlacement->nodeName = workerNode->workerName;
1283 		targetPlacement->nodePort = workerNode->workerPort;
1284 		targetPlacement->groupId = workerNode->groupId;
1285 
1286 		task->taskPlacementList = lappend(task->taskPlacementList, targetPlacement);
1287 	}
1288 
1289 	DDLJob *ddlJob = palloc0(sizeof(DDLJob));
1290 	ddlJob->targetRelationId = InvalidOid;
1291 	ddlJob->concurrentIndexCmd = false;
1292 	ddlJob->commandString = NULL;
1293 	ddlJob->taskList = list_make1(task);
1294 
1295 	return list_make1(ddlJob);
1296 }
1297 
1298 
1299 /*
1300  * AlterTableInProgress returns true if we're processing an ALTER TABLE command
1301  * right now.
1302  */
1303 bool
AlterTableInProgress(void)1304 AlterTableInProgress(void)
1305 {
1306 	return activeAlterTables > 0;
1307 }
1308 
1309 
1310 /*
1311  * DropSchemaOrDBInProgress returns true if we're processing a DROP SCHEMA
1312  * or a DROP DATABASE command right now.
1313  */
1314 bool
DropSchemaOrDBInProgress(void)1315 DropSchemaOrDBInProgress(void)
1316 {
1317 	return activeDropSchemaOrDBs > 0;
1318 }
1319