1 /*-------------------------------------------------------------------------
2 * utility_hook.c
3 * Citus utility hook and related functionality.
4 *
 * The utility hook is called by PostgreSQL when processing any command
 * that is not SELECT, UPDATE, DELETE, INSERT, in place of the regular
 * ProcessUtility function. We use this primarily to implement (or in
 * some cases prevent) DDL commands and COPY on distributed tables.
9 *
10 * For DDL commands that affect distributed tables, we check whether
11 * they are valid (and implemented) for the distributed table and then
12 * propagate the command to all shards and, in case of MX, to distributed
13 * tables on other nodes. We still call the original ProcessUtility
14 * function to apply catalog changes on the coordinator.
15 *
16 * For COPY into a distributed table, we provide an alternative
17 * implementation in ProcessCopyStmt that sends rows to shards based
18 * on their distribution column value instead of writing it to the local
 * table on the coordinator. For COPY from a distributed table, we
 * replace the table with a SELECT * FROM table and pass it back to
 * ProcessUtility, which will plan the query via the distributed planner
 * hook.
23 *
24 * Copyright (c) Citus Data, Inc.
25 *-------------------------------------------------------------------------
26 */
27
28 #include "distributed/pg_version_constants.h"
29
30 #include "postgres.h"
31 #include "miscadmin.h"
32
33 #include "access/attnum.h"
34 #include "access/heapam.h"
35 #include "access/htup_details.h"
36 #include "access/xact.h"
37 #include "catalog/catalog.h"
38 #include "catalog/dependency.h"
39 #include "commands/dbcommands.h"
40 #include "commands/defrem.h"
41 #include "commands/tablecmds.h"
42 #include "distributed/adaptive_executor.h"
43 #include "distributed/colocation_utils.h"
44 #include "distributed/commands.h"
45 #include "distributed/commands/multi_copy.h"
46 #include "distributed/commands/utility_hook.h" /* IWYU pragma: keep */
47 #include "distributed/deparser.h"
48 #include "distributed/deparse_shard_query.h"
49 #include "distributed/foreign_key_relationship.h"
50 #include "distributed/listutils.h"
51 #include "distributed/local_executor.h"
52 #include "distributed/maintenanced.h"
53 #include "distributed/coordinator_protocol.h"
54 #include "distributed/metadata_cache.h"
55 #include "distributed/metadata_sync.h"
56 #include "distributed/multi_executor.h"
57 #include "distributed/multi_explain.h"
58 #include "distributed/multi_physical_planner.h"
59 #include "distributed/reference_table_utils.h"
60 #include "distributed/resource_lock.h"
61 #include "distributed/transmit.h"
62 #include "distributed/version_compat.h"
63 #include "distributed/worker_transaction.h"
64 #include "lib/stringinfo.h"
65 #include "nodes/parsenodes.h"
66 #include "nodes/pg_list.h"
67 #include "tcop/utility.h"
68 #include "utils/builtins.h"
69 #include "utils/lsyscache.h"
70 #include "utils/syscache.h"
71
72 bool EnableDDLPropagation = true; /* ddl propagation is enabled */
73 PropSetCmdBehavior PropagateSetCommands = PROPSETCMD_NONE; /* SET prop off */
74 static bool shouldInvalidateForeignKeyGraph = false;
75 static int activeAlterTables = 0;
76 static int activeDropSchemaOrDBs = 0;
77 static bool ConstraintDropped = false;
78
79
80 int UtilityHookLevel = 0;
81
82
83 /* Local functions forward declarations for helper functions */
84 static void ProcessUtilityInternal(PlannedStmt *pstmt,
85 const char *queryString,
86 ProcessUtilityContext context,
87 ParamListInfo params,
88 struct QueryEnvironment *queryEnv,
89 DestReceiver *dest,
90 QueryCompletionCompat *completionTag);
91 static char * SetSearchPathToCurrentSearchPathCommand(void);
92 static char * CurrentSearchPath(void);
93 static void IncrementUtilityHookCountersIfNecessary(Node *parsetree);
94 static void PostStandardProcessUtility(Node *parsetree);
95 static void DecrementUtilityHookCountersIfNecessary(Node *parsetree);
96 static bool IsDropSchemaOrDB(Node *parsetree);
97 static bool ShouldUndistributeCitusLocalTables(void);
98
99
100 /*
101 * ProcessUtilityForParseTree is a convenience method to create a PlannedStmt out of
102 * pieces of a utility statement before invoking ProcessUtility.
103 */
104 void
ProcessUtilityParseTree(Node * node,const char * queryString,ProcessUtilityContext context,ParamListInfo params,DestReceiver * dest,QueryCompletionCompat * completionTag)105 ProcessUtilityParseTree(Node *node, const char *queryString, ProcessUtilityContext
106 context,
107 ParamListInfo params, DestReceiver *dest,
108 QueryCompletionCompat *completionTag)
109 {
110 PlannedStmt *plannedStmt = makeNode(PlannedStmt);
111 plannedStmt->commandType = CMD_UTILITY;
112 plannedStmt->utilityStmt = node;
113
114 ProcessUtility_compat(plannedStmt, queryString, false, context, params, NULL, dest,
115 completionTag);
116 }
117
118
119 /*
120 * multi_ProcessUtility is the main entry hook for implementing Citus-specific
121 * utility behavior. Its primary responsibilities are intercepting COPY and DDL
122 * commands and augmenting the coordinator's command with corresponding tasks
123 * to be run on worker nodes, after suitably ensuring said commands' options
124 * are fully supported by Citus. Much of the DDL behavior is toggled by Citus'
125 * enable_ddl_propagation GUC. In addition to DDL and COPY, utilities such as
126 * TRUNCATE and VACUUM are also supported.
127 */
128 void
multi_ProcessUtility(PlannedStmt * pstmt,const char * queryString,bool readOnlyTree,ProcessUtilityContext context,ParamListInfo params,struct QueryEnvironment * queryEnv,DestReceiver * dest,QueryCompletionCompat * completionTag)129 multi_ProcessUtility(PlannedStmt *pstmt,
130 const char *queryString,
131 #if PG_VERSION_NUM >= PG_VERSION_14
132 bool readOnlyTree,
133 #endif
134 ProcessUtilityContext context,
135 ParamListInfo params,
136 struct QueryEnvironment *queryEnv,
137 DestReceiver *dest,
138 QueryCompletionCompat *completionTag)
139 {
140 Node *parsetree;
141
142 #if PG_VERSION_NUM >= PG_VERSION_14
143 if (readOnlyTree)
144 {
145 pstmt = copyObject(pstmt);
146 }
147 #endif
148
149 parsetree = pstmt->utilityStmt;
150
151 if (IsA(parsetree, TransactionStmt) ||
152 IsA(parsetree, LockStmt) ||
153 IsA(parsetree, ListenStmt) ||
154 IsA(parsetree, NotifyStmt) ||
155 IsA(parsetree, ExecuteStmt) ||
156 IsA(parsetree, PrepareStmt) ||
157 IsA(parsetree, DiscardStmt) ||
158 IsA(parsetree, DeallocateStmt))
159 {
160 /*
161 * Skip additional checks for common commands that do not have any
162 * Citus-specific logic.
163 *
164 * Transaction statements (e.g. ABORT, COMMIT) can be run in aborted
165 * transactions in which case a lot of checks cannot be done safely in
166 * that state. Since we never need to intercept transaction statements,
167 * skip our checks and immediately fall into standard_ProcessUtility.
168 */
169 standard_ProcessUtility_compat(pstmt, queryString, false, context,
170 params, queryEnv, dest, completionTag);
171
172 return;
173 }
174
175 bool isCreateAlterExtensionUpdateCitusStmt = IsCreateAlterExtensionUpdateCitusStmt(
176 parsetree);
177 if (EnableVersionChecks && isCreateAlterExtensionUpdateCitusStmt)
178 {
179 ErrorIfUnstableCreateOrAlterExtensionStmt(parsetree);
180 }
181
182 if (!CitusHasBeenLoaded())
183 {
184 /*
185 * Ensure that utility commands do not behave any differently until CREATE
186 * EXTENSION is invoked.
187 */
188 standard_ProcessUtility_compat(pstmt, queryString, false, context,
189 params, queryEnv, dest, completionTag);
190
191 return;
192 }
193 else if (IsA(parsetree, CallStmt))
194 {
195 CallStmt *callStmt = (CallStmt *) parsetree;
196
197 /*
198 * If the procedure is distributed and we are using MX then we have the
199 * possibility of calling it on the worker. If the data is located on
200 * the worker this can avoid making many network round trips.
201 */
202 if (context == PROCESS_UTILITY_TOPLEVEL &&
203 CallDistributedProcedureRemotely(callStmt, dest))
204 {
205 return;
206 }
207
208 /*
209 * Stored procedures are a bit strange in the sense that some statements
210 * are not in a transaction block, but can be rolled back. We need to
211 * make sure we send all statements in a transaction block. The
212 * StoredProcedureLevel variable signals this to the router executor
213 * and indicates how deep in the call stack we are in case of nested
214 * stored procedures.
215 */
216 StoredProcedureLevel += 1;
217
218 PG_TRY();
219 {
220 standard_ProcessUtility_compat(pstmt, queryString, false, context,
221 params, queryEnv, dest, completionTag);
222
223 StoredProcedureLevel -= 1;
224 }
225 PG_CATCH();
226 {
227 StoredProcedureLevel -= 1;
228 PG_RE_THROW();
229 }
230 PG_END_TRY();
231
232 return;
233 }
234 else if (IsA(parsetree, DoStmt))
235 {
236 /*
237 * All statements in a DO block are executed in a single transaciton,
238 * so we need to keep track of whether we are inside a DO block.
239 */
240 DoBlockLevel += 1;
241
242 PG_TRY();
243 {
244 standard_ProcessUtility_compat(pstmt, queryString, false, context,
245 params, queryEnv, dest, completionTag);
246
247 DoBlockLevel -= 1;
248 }
249 PG_CATCH();
250 {
251 DoBlockLevel -= 1;
252 PG_RE_THROW();
253 }
254 PG_END_TRY();
255
256 return;
257 }
258
259 UtilityHookLevel++;
260
261 PG_TRY();
262 {
263 ProcessUtilityInternal(pstmt, queryString, context, params, queryEnv, dest,
264 completionTag);
265
266 if (UtilityHookLevel == 1)
267 {
268 /*
269 * When Citus local tables are disconnected from the foreign key graph, which
270 * can happen due to various kinds of drop commands, we immediately
271 * undistribute them at the end of the command.
272 */
273 if (ShouldUndistributeCitusLocalTables())
274 {
275 UndistributeDisconnectedCitusLocalTables();
276 }
277 ResetConstraintDropped();
278 }
279
280 UtilityHookLevel--;
281 }
282 PG_CATCH();
283 {
284 if (UtilityHookLevel == 1)
285 {
286 ResetConstraintDropped();
287 }
288
289 UtilityHookLevel--;
290
291 PG_RE_THROW();
292 }
293 PG_END_TRY();
294 }
295
296
297 /*
298 * ProcessUtilityInternal is a helper function for multi_ProcessUtility where majority
299 * of the Citus specific utility statements are handled here. The distinction between
300 * both functions is that Citus_ProcessUtility does not handle CALL and DO statements.
301 * The reason for the distinction is implemented to be able to find the "top-level" DDL
302 * commands (not internal/cascading ones). UtilityHookLevel variable is used to achieve
303 * this goal.
304 */
305 static void
ProcessUtilityInternal(PlannedStmt * pstmt,const char * queryString,ProcessUtilityContext context,ParamListInfo params,struct QueryEnvironment * queryEnv,DestReceiver * dest,QueryCompletionCompat * completionTag)306 ProcessUtilityInternal(PlannedStmt *pstmt,
307 const char *queryString,
308 ProcessUtilityContext context,
309 ParamListInfo params,
310 struct QueryEnvironment *queryEnv,
311 DestReceiver *dest,
312 QueryCompletionCompat *completionTag)
313 {
314 Node *parsetree = pstmt->utilityStmt;
315 List *ddlJobs = NIL;
316
317 if (IsA(parsetree, ExplainStmt) &&
318 IsA(((ExplainStmt *) parsetree)->query, Query))
319 {
320 ExplainStmt *explainStmt = (ExplainStmt *) parsetree;
321
322 if (IsTransactionBlock())
323 {
324 bool analyze = false;
325
326 DefElem *option = NULL;
327 foreach_ptr(option, explainStmt->options)
328 {
329 if (strcmp(option->defname, "analyze") == 0)
330 {
331 analyze = defGetBoolean(option);
332 }
333
334 /* don't "break", as explain.c will use the last value */
335 }
336
337 if (analyze)
338 {
339 /*
340 * Since we cannot execute EXPLAIN ANALYZE locally, we
341 * cannot continue.
342 */
343 ErrorIfTransactionAccessedPlacementsLocally();
344 }
345 }
346
347 /*
348 * EXPLAIN ANALYZE is tricky with local execution, and there is not
349 * much difference between the local and distributed execution in terms
350 * of the actual EXPLAIN output.
351 *
352 * TODO: It might be nice to have a way to show that the query is locally
353 * executed. Shall we add a INFO output?
354 */
355 DisableLocalExecution();
356 }
357
358 if (IsA(parsetree, CreateSubscriptionStmt))
359 {
360 CreateSubscriptionStmt *createSubStmt = (CreateSubscriptionStmt *) parsetree;
361
362 parsetree = ProcessCreateSubscriptionStmt(createSubStmt);
363 }
364
365 /* process SET LOCAL stmts of allowed GUCs in multi-stmt xacts */
366 if (IsA(parsetree, VariableSetStmt))
367 {
368 VariableSetStmt *setStmt = (VariableSetStmt *) parsetree;
369
370 /* at present, we only implement the NONE and LOCAL behaviors */
371 AssertState(PropagateSetCommands == PROPSETCMD_NONE ||
372 PropagateSetCommands == PROPSETCMD_LOCAL);
373
374 if (IsMultiStatementTransaction() && ShouldPropagateSetCommand(setStmt))
375 {
376 PostprocessVariableSetStmt(setStmt, queryString);
377 }
378 }
379
380 /*
381 * TRANSMIT used to be separate command, but to avoid patching the grammar
382 * it's now overlaid onto COPY, but with FORMAT = 'transmit' instead of the
383 * normal FORMAT options.
384 */
385 if (IsTransmitStmt(parsetree))
386 {
387 CopyStmt *copyStatement = (CopyStmt *) parsetree;
388 char *userName = TransmitStatementUser(copyStatement);
389 bool missingOK = false;
390 StringInfo transmitPath = makeStringInfo();
391
392 VerifyTransmitStmt(copyStatement);
393
394 /* ->relation->relname is the target file in our overloaded COPY */
395 appendStringInfoString(transmitPath, copyStatement->relation->relname);
396
397 if (userName != NULL)
398 {
399 Oid userId = get_role_oid(userName, missingOK);
400 appendStringInfo(transmitPath, ".%d", userId);
401 }
402
403 if (copyStatement->is_from)
404 {
405 RedirectCopyDataToRegularFile(transmitPath->data);
406 }
407 else
408 {
409 SendRegularFile(transmitPath->data);
410 }
411
412 /* Don't execute the faux copy statement */
413 return;
414 }
415
416 if (IsA(parsetree, CopyStmt))
417 {
418 MemoryContext planContext = GetMemoryChunkContext(parsetree);
419
420 parsetree = copyObject(parsetree);
421 parsetree = ProcessCopyStmt((CopyStmt *) parsetree, completionTag, queryString);
422
423 if (parsetree == NULL)
424 {
425 return;
426 }
427
428 MemoryContext previousContext = MemoryContextSwitchTo(planContext);
429 parsetree = copyObject(parsetree);
430 MemoryContextSwitchTo(previousContext);
431
432 /*
433 * we need to set the parsetree here already as we copy and replace the original
434 * parsetree during ddl propagation. In reality we need to refactor the code above
435 * to not juggle copy the parsetree and leak it to a potential cache above the
436 * utility hook.
437 */
438 pstmt->utilityStmt = parsetree;
439 }
440
441 /* we're mostly in DDL (and VACUUM/TRUNCATE) territory at this point... */
442
443 if (IsA(parsetree, CreateSeqStmt))
444 {
445 ErrorIfUnsupportedSeqStmt((CreateSeqStmt *) parsetree);
446 }
447
448 if (IsA(parsetree, AlterSeqStmt))
449 {
450 ErrorIfDistributedAlterSeqOwnedBy((AlterSeqStmt *) parsetree);
451 }
452
453 if (IsA(parsetree, TruncateStmt))
454 {
455 PreprocessTruncateStatement((TruncateStmt *) parsetree);
456 }
457
458 /*
459 * We only process ALTER TABLE ... ATTACH PARTITION commands in the function below
460 * and distribute the partition if necessary.
461 */
462 if (IsA(parsetree, AlterTableStmt))
463 {
464 AlterTableStmt *alterTableStatement = (AlterTableStmt *) parsetree;
465
466 PreprocessAlterTableStmtAttachPartition(alterTableStatement, queryString);
467 }
468
469 /* only generate worker DDLJobs if propagation is enabled */
470 const DistributeObjectOps *ops = NULL;
471 if (EnableDDLPropagation)
472 {
473 /* copy planned statement since we might scribble on it or its utilityStmt */
474 pstmt = copyObject(pstmt);
475 parsetree = pstmt->utilityStmt;
476 ops = GetDistributeObjectOps(parsetree);
477
478 if (ops && ops->preprocess)
479 {
480 ddlJobs = ops->preprocess(parsetree, queryString, context);
481 }
482 }
483 else
484 {
485 /*
486 * citus.enable_ddl_propagation is disabled, which means that PostgreSQL
487 * should handle the DDL command on a distributed table directly, without
488 * Citus intervening. The only exception is partition column drop, in
489 * which case we error out. Advanced Citus users use this to implement their
490 * own DDL propagation. We also use it to avoid re-propagating DDL commands
491 * when changing MX tables on workers. Below, we also make sure that DDL
492 * commands don't run queries that might get intercepted by Citus and error
493 * out, specifically we skip validation in foreign keys.
494 */
495
496 if (IsA(parsetree, AlterTableStmt))
497 {
498 AlterTableStmt *alterTableStmt = (AlterTableStmt *) parsetree;
499 if (AlterTableStmtObjType_compat(alterTableStmt) == OBJECT_TABLE ||
500 AlterTableStmtObjType_compat(alterTableStmt) == OBJECT_FOREIGN_TABLE)
501 {
502 ErrorIfAlterDropsPartitionColumn(alterTableStmt);
503
504 /*
505 * When issuing an ALTER TABLE ... ADD FOREIGN KEY command, the
506 * the validation step should be skipped on the distributed table.
507 * Therefore, we check whether the given ALTER TABLE statement is a
508 * FOREIGN KEY constraint and if so disable the validation step.
509 * Note validation is done on the shard level when DDL propagation
510 * is enabled. The following eagerly executes some tasks on workers.
511 */
512 parsetree =
513 SkipForeignKeyValidationIfConstraintIsFkey(alterTableStmt, false);
514 }
515 }
516 }
517
518 /* inform the user about potential caveats */
519 if (IsA(parsetree, CreatedbStmt))
520 {
521 ereport(NOTICE, (errmsg("Citus partially supports CREATE DATABASE for "
522 "distributed databases"),
523 errdetail("Citus does not propagate CREATE DATABASE "
524 "command to workers"),
525 errhint("You can manually create a database and its "
526 "extensions on workers.")));
527 }
528 else if (IsA(parsetree, CreateRoleStmt))
529 {
530 ereport(NOTICE, (errmsg("not propagating CREATE ROLE/USER commands to worker"
531 " nodes"),
532 errhint("Connect to worker nodes directly to manually create all"
533 " necessary users and roles.")));
534 }
535
536 /*
537 * Make sure that on DROP DATABASE we terminate the background daemon
538 * associated with it.
539 */
540 if (IsA(parsetree, DropdbStmt))
541 {
542 const bool missingOK = true;
543 DropdbStmt *dropDbStatement = (DropdbStmt *) parsetree;
544 char *dbname = dropDbStatement->dbname;
545 Oid databaseOid = get_database_oid(dbname, missingOK);
546
547 if (OidIsValid(databaseOid))
548 {
549 StopMaintenanceDaemon(databaseOid);
550 }
551 }
552
553 if (IsDropCitusExtensionStmt(parsetree))
554 {
555 StopMaintenanceDaemon(MyDatabaseId);
556 }
557
558 pstmt->utilityStmt = parsetree;
559
560 PG_TRY();
561 {
562 IncrementUtilityHookCountersIfNecessary(parsetree);
563
564 /*
565 * Check if we are running ALTER EXTENSION citus UPDATE (TO "<version>") command and
566 * the available version is different than the current version of Citus. In this case,
567 * ALTER EXTENSION citus UPDATE command can actually update Citus to a new version.
568 */
569 bool isCreateAlterExtensionUpdateCitusStmt =
570 IsCreateAlterExtensionUpdateCitusStmt(parsetree);
571 bool isAlterExtensionUpdateCitusStmt = isCreateAlterExtensionUpdateCitusStmt &&
572 IsA(parsetree, AlterExtensionStmt);
573
574 bool citusCanBeUpdatedToAvailableVersion = false;
575
576 if (isAlterExtensionUpdateCitusStmt)
577 {
578 citusCanBeUpdatedToAvailableVersion = !InstalledAndAvailableVersionsSame();
579 }
580
581 standard_ProcessUtility_compat(pstmt, queryString, false, context,
582 params, queryEnv, dest, completionTag);
583
584 /*
585 * if we are running ALTER EXTENSION citus UPDATE (to "<version>") command, we may need
586 * to mark existing objects as distributed depending on the "version" parameter if
587 * specified in "ALTER EXTENSION citus UPDATE" command
588 */
589 if (isAlterExtensionUpdateCitusStmt && citusCanBeUpdatedToAvailableVersion)
590 {
591 PostprocessAlterExtensionCitusUpdateStmt(parsetree);
592 }
593
594 PostStandardProcessUtility(parsetree);
595 }
596 PG_CATCH();
597 {
598 PostStandardProcessUtility(parsetree);
599
600 PG_RE_THROW();
601 }
602 PG_END_TRY();
603
604 /*
605 * Post process for ddl statements
606 */
607 if (EnableDDLPropagation)
608 {
609 if (ops && ops->postprocess)
610 {
611 List *processJobs = ops->postprocess(parsetree, queryString);
612
613 if (processJobs)
614 {
615 Assert(ddlJobs == NIL); /* jobs should not have been set before */
616 ddlJobs = processJobs;
617 }
618 }
619
620 if (IsA(parsetree, RenameStmt) && ((RenameStmt *) parsetree)->renameType ==
621 OBJECT_ROLE && EnableAlterRolePropagation)
622 {
623 ereport(NOTICE, (errmsg("not propagating ALTER ROLE ... RENAME TO commands "
624 "to worker nodes"),
625 errhint("Connect to worker nodes directly to manually "
626 "rename the role")));
627 }
628 }
629
630 if (IsA(parsetree, CreateStmt))
631 {
632 CreateStmt *createStatement = (CreateStmt *) parsetree;
633
634 PostprocessCreateTableStmt(createStatement, queryString);
635 }
636
637 /* after local command has completed, finish by executing worker DDLJobs, if any */
638 if (ddlJobs != NIL)
639 {
640 if (IsA(parsetree, AlterTableStmt))
641 {
642 PostprocessAlterTableStmt(castNode(AlterTableStmt, parsetree));
643 }
644
645 DDLJob *ddlJob = NULL;
646 foreach_ptr(ddlJob, ddlJobs)
647 {
648 ExecuteDistributedDDLJob(ddlJob);
649 }
650
651 /*
652 * For CREATE/DROP/REINDEX CONCURRENTLY we mark the index as valid
653 * after successfully completing the distributed DDL job.
654 */
655 if (IsA(parsetree, IndexStmt))
656 {
657 IndexStmt *indexStmt = (IndexStmt *) parsetree;
658
659 if (indexStmt->concurrent)
660 {
661 /* no failures during CONCURRENTLY, mark the index as valid */
662 MarkIndexValid(indexStmt);
663 }
664 }
665 }
666
667 /* TODO: fold VACUUM's processing into the above block */
668 if (IsA(parsetree, VacuumStmt))
669 {
670 VacuumStmt *vacuumStmt = (VacuumStmt *) parsetree;
671
672 PostprocessVacuumStmt(vacuumStmt, queryString);
673 }
674
675 if (!IsDropCitusExtensionStmt(parsetree) && !IsA(parsetree, DropdbStmt))
676 {
677 /*
678 * Ensure value is valid, we can't do some checks during CREATE
679 * EXTENSION. This is important to register some invalidation callbacks.
680 */
681 CitusHasBeenLoaded(); /* lgtm[cpp/return-value-ignored] */
682 }
683 }
684
685
686 /*
687 * UndistributeDisconnectedCitusLocalTables undistributes citus local tables that
688 * are not connected to any reference tables via their individual foreign key
689 * subgraphs.
690 */
691 void
UndistributeDisconnectedCitusLocalTables(void)692 UndistributeDisconnectedCitusLocalTables(void)
693 {
694 List *citusLocalTableIdList = CitusTableTypeIdList(CITUS_LOCAL_TABLE);
695 citusLocalTableIdList = SortList(citusLocalTableIdList, CompareOids);
696
697 Oid citusLocalTableId = InvalidOid;
698 foreach_oid(citusLocalTableId, citusLocalTableIdList)
699 {
700 /* acquire ShareRowExclusiveLock to prevent concurrent foreign key creation */
701 LOCKMODE lockMode = ShareRowExclusiveLock;
702 LockRelationOid(citusLocalTableId, lockMode);
703
704 HeapTuple heapTuple =
705 SearchSysCache1(RELOID, ObjectIdGetDatum(citusLocalTableId));
706 if (!HeapTupleIsValid(heapTuple))
707 {
708 /*
709 * UndistributeTable drops relation, skip if already undistributed
710 * via cascade.
711 */
712 continue;
713 }
714 ReleaseSysCache(heapTuple);
715
716 if (ConnectedToReferenceTableViaFKey(citusLocalTableId))
717 {
718 /* still connected to a reference table, skip it */
719 UnlockRelationOid(citusLocalTableId, lockMode);
720 continue;
721 }
722
723 /*
724 * Citus local table is not connected to any reference tables, then
725 * undistribute it via cascade. Here, instead of first dropping foreing
726 * keys then undistributing the table, we just set cascadeViaForeignKeys
727 * to true for simplicity.
728 *
729 * We suppress notices messages not to be too verbose. On the other hand,
730 * as UndistributeTable moves data to a new table, we want to inform user
731 * as it might take some time.
732 */
733 ereport(NOTICE, (errmsg("removing table %s from metadata as it is not "
734 "connected to any reference tables via foreign keys",
735 generate_qualified_relation_name(citusLocalTableId))));
736 TableConversionParameters params = {
737 .relationId = citusLocalTableId,
738 .cascadeViaForeignKeys = true,
739 .suppressNoticeMessages = true
740 };
741 UndistributeTable(¶ms);
742 }
743 }
744
745
746 /*
747 * ShouldUndistributeCitusLocalTables returns true if we might need to check
748 * citus local tables for their connectivity to reference tables.
749 */
750 static bool
ShouldUndistributeCitusLocalTables(void)751 ShouldUndistributeCitusLocalTables(void)
752 {
753 if (!ConstraintDropped)
754 {
755 /*
756 * citus_drop_trigger executes notify_constraint_dropped to set
757 * ConstraintDropped to true, which means that last command dropped
758 * a table constraint.
759 */
760 return false;
761 }
762
763 if (!CitusHasBeenLoaded())
764 {
765 /*
766 * If we are dropping citus, we should not try to undistribute citus
767 * local tables as they will also be dropped.
768 */
769 return false;
770 }
771
772 if (!InCoordinatedTransaction())
773 {
774 /* not interacting with any Citus objects */
775 return false;
776 }
777
778 if (IsCitusInitiatedRemoteBackend())
779 {
780 /* connection from the coordinator operating on a shard */
781 return false;
782 }
783
784 if (!ShouldEnableLocalReferenceForeignKeys())
785 {
786 /*
787 * If foreign keys between reference tables and local tables are
788 * disabled, then user might be using citus_add_local_table_to_metadata for
789 * their own purposes. In that case, we should not undistribute
790 * citus local tables.
791 */
792 return false;
793 }
794
795 if (!IsCoordinator())
796 {
797 /* we should not perform this operation in worker nodes */
798 return false;
799 }
800
801 return true;
802 }
803
804
805 /*
806 * NotifyUtilityHookConstraintDropped sets ConstraintDropped to true to tell us
807 * last command dropped a table constraint.
808 */
809 void
NotifyUtilityHookConstraintDropped(void)810 NotifyUtilityHookConstraintDropped(void)
811 {
812 ConstraintDropped = true;
813 }
814
815
816 /*
817 * ResetConstraintDropped sets ConstraintDropped to false.
818 */
819 void
ResetConstraintDropped(void)820 ResetConstraintDropped(void)
821 {
822 ConstraintDropped = false;
823 }
824
825
826 /*
827 * IsDropSchemaOrDB returns true if parsetree represents DROP SCHEMA ...or
828 * a DROP DATABASE.
829 */
830 static bool
IsDropSchemaOrDB(Node * parsetree)831 IsDropSchemaOrDB(Node *parsetree)
832 {
833 if (!IsA(parsetree, DropStmt))
834 {
835 return false;
836 }
837
838 DropStmt *dropStatement = (DropStmt *) parsetree;
839 return (dropStatement->removeType == OBJECT_SCHEMA) ||
840 (dropStatement->removeType == OBJECT_DATABASE);
841 }
842
843
844 /*
845 * ExecuteDistributedDDLJob simply executes a provided DDLJob in a distributed trans-
846 * action, including metadata sync if needed. If the multi shard commit protocol is
847 * in its default value of '1pc', then a notice message indicating that '2pc' might be
848 * used for extra safety. In the commit protocol, a BEGIN is sent after connection to
849 * each shard placement and COMMIT/ROLLBACK is handled by
850 * CoordinatedTransactionCallback function.
851 *
852 * The function errors out if the node is not the coordinator or if the DDL is on
853 * a partitioned table which has replication factor > 1.
854 *
855 */
856 void
ExecuteDistributedDDLJob(DDLJob * ddlJob)857 ExecuteDistributedDDLJob(DDLJob *ddlJob)
858 {
859 bool shouldSyncMetadata = false;
860
861 EnsureCoordinator();
862
863 Oid targetRelationId = ddlJob->targetRelationId;
864
865 if (OidIsValid(targetRelationId))
866 {
867 /*
868 * Only for ddlJobs that are targetting a relation (table) we want to sync
869 * its metadata and verify some properties around the table.
870 */
871 shouldSyncMetadata = ShouldSyncTableMetadata(targetRelationId);
872 EnsurePartitionTableNotReplicated(targetRelationId);
873 }
874
875 bool localExecutionSupported = true;
876
877 if (!ddlJob->concurrentIndexCmd)
878 {
879 if (shouldSyncMetadata)
880 {
881 char *setSearchPathCommand = SetSearchPathToCurrentSearchPathCommand();
882
883 SendCommandToWorkersWithMetadata(DISABLE_DDL_PROPAGATION);
884
885 /*
886 * Given that we're relaying the query to the worker nodes directly,
887 * we should set the search path exactly the same when necessary.
888 */
889 if (setSearchPathCommand != NULL)
890 {
891 SendCommandToWorkersWithMetadata(setSearchPathCommand);
892 }
893
894 if (ddlJob->commandString != NULL)
895 {
896 SendCommandToWorkersWithMetadata((char *) ddlJob->commandString);
897 }
898 }
899
900 ExecuteUtilityTaskList(ddlJob->taskList, localExecutionSupported);
901 }
902 else
903 {
904 localExecutionSupported = false;
905
906 /*
907 * Start a new transaction to make sure CONCURRENTLY commands
908 * on localhost do not block waiting for this transaction to finish.
909 */
910 if (ddlJob->startNewTransaction)
911 {
912 /*
913 * If cache is not populated, system catalog lookups will cause
914 * the xmin of current backend to change. Then the last phase
915 * of CREATE INDEX CONCURRENTLY, which is in a separate backend,
916 * will hang waiting for our backend and result in a deadlock.
917 *
918 * We populate the cache before starting the next transaction to
919 * avoid this. Most of the metadata has already been resolved in
920 * planning phase, we only need to lookup metadata needed for
921 * connection establishment.
922 */
923 (void) CurrentDatabaseName();
924
925 /*
926 * ConnParams (AuthInfo and PoolInfo) gets a snapshot, which
927 * will blocks the remote connections to localhost. Hence we warm up
928 * the cache here so that after we start a new transaction, the entries
929 * will already be in the hash table, hence we won't be holding any snapshots.
930 */
931 WarmUpConnParamsHash();
932 CommitTransactionCommand();
933 StartTransactionCommand();
934 }
935
936 /* save old commit protocol to restore at xact end */
937 Assert(SavedMultiShardCommitProtocol == COMMIT_PROTOCOL_BARE);
938 SavedMultiShardCommitProtocol = MultiShardCommitProtocol;
939 MultiShardCommitProtocol = COMMIT_PROTOCOL_BARE;
940 MemoryContext savedContext = CurrentMemoryContext;
941
942 PG_TRY();
943 {
944 ExecuteUtilityTaskList(ddlJob->taskList, localExecutionSupported);
945
946 if (shouldSyncMetadata)
947 {
948 List *commandList = list_make1(DISABLE_DDL_PROPAGATION);
949 char *setSearchPathCommand = SetSearchPathToCurrentSearchPathCommand();
950
951 /*
952 * Given that we're relaying the query to the worker nodes directly,
953 * we should set the search path exactly the same when necessary.
954 */
955 if (setSearchPathCommand != NULL)
956 {
957 commandList = lappend(commandList, setSearchPathCommand);
958 }
959
960 commandList = lappend(commandList, (char *) ddlJob->commandString);
961
962 SendBareCommandListToMetadataWorkers(commandList);
963 }
964 }
965 PG_CATCH();
966 {
967 /* CopyErrorData() requires (CurrentMemoryContext != ErrorContext) */
968 MemoryContextSwitchTo(savedContext);
969 ErrorData *edata = CopyErrorData();
970
971 /*
972 * In concurrent index creation, if a worker index with the same name already
973 * exists, prompt to DROP the current index and retry the original command
974 */
975 if (edata->sqlerrcode == ERRCODE_DUPLICATE_TABLE)
976 {
977 ereport(ERROR,
978 (errmsg("CONCURRENTLY-enabled index command failed"),
979 errdetail(
980 "CONCURRENTLY-enabled index commands can fail partially, "
981 "leaving behind an INVALID index."),
982 errhint("Use DROP INDEX CONCURRENTLY IF EXISTS to remove the "
983 "invalid index, then retry the original command.")));
984 }
985 else
986 {
987 ereport(WARNING,
988 (errmsg(
989 "CONCURRENTLY-enabled index commands can fail partially, "
990 "leaving behind an INVALID index.\n Use DROP INDEX "
991 "CONCURRENTLY IF EXISTS to remove the invalid index.")));
992 PG_RE_THROW();
993 }
994 }
995 PG_END_TRY();
996 }
997 }
998
999
1000 /*
1001 * CreateCustomDDLTaskList creates a DDLJob which will apply a command to all placements
1002 * of shards of a distributed table. The command to be applied is generated by the
1003 * TableDDLCommand structure passed in.
1004 */
1005 DDLJob *
CreateCustomDDLTaskList(Oid relationId,TableDDLCommand * command)1006 CreateCustomDDLTaskList(Oid relationId, TableDDLCommand *command)
1007 {
1008 List *taskList = NIL;
1009 List *shardIntervalList = LoadShardIntervalList(relationId);
1010 uint64 jobId = INVALID_JOB_ID;
1011 Oid namespace = get_rel_namespace(relationId);
1012 char *namespaceName = get_namespace_name(namespace);
1013 int taskId = 1;
1014
1015 /* lock metadata before getting placement lists */
1016 LockShardListMetadata(shardIntervalList, ShareLock);
1017
1018 ShardInterval *shardInterval = NULL;
1019 foreach_ptr(shardInterval, shardIntervalList)
1020 {
1021 uint64 shardId = shardInterval->shardId;
1022
1023 char *commandStr = GetShardedTableDDLCommand(command, shardId, namespaceName);
1024
1025 Task *task = CitusMakeNode(Task);
1026 task->jobId = jobId;
1027 task->taskId = taskId++;
1028 task->taskType = DDL_TASK;
1029 SetTaskQueryString(task, commandStr);
1030 task->replicationModel = REPLICATION_MODEL_INVALID;
1031 task->dependentTaskList = NULL;
1032 task->anchorShardId = shardId;
1033 task->taskPlacementList = ActiveShardPlacementList(shardId);
1034
1035 taskList = lappend(taskList, task);
1036 }
1037
1038 DDLJob *ddlJob = palloc0(sizeof(DDLJob));
1039 ddlJob->targetRelationId = relationId;
1040 ddlJob->concurrentIndexCmd = false;
1041 ddlJob->commandString = GetTableDDLCommand(command);
1042 ddlJob->taskList = taskList;
1043
1044 return ddlJob;
1045 }
1046
1047
1048 /*
1049 * SetSearchPathToCurrentSearchPathCommand generates a command which can
1050 * set the search path to the exact same search path that the issueing node
1051 * has.
1052 *
1053 * If the current search path is null (or doesn't have any valid schemas),
1054 * the function returns NULL.
1055 */
1056 static char *
SetSearchPathToCurrentSearchPathCommand(void)1057 SetSearchPathToCurrentSearchPathCommand(void)
1058 {
1059 char *currentSearchPath = CurrentSearchPath();
1060
1061 if (currentSearchPath == NULL)
1062 {
1063 return NULL;
1064 }
1065
1066 StringInfo setCommand = makeStringInfo();
1067 appendStringInfo(setCommand, "SET search_path TO %s;", currentSearchPath);
1068
1069 return setCommand->data;
1070 }
1071
1072
1073 /*
1074 * CurrentSearchPath is a C interface for calling current_schemas(bool) that
1075 * PostgreSQL exports.
1076 *
1077 * CurrentSchemas returns all the schemas in the seach_path that are seperated
1078 * with comma (,) sign. The returned string can be used to set the search_path.
1079 *
1080 * The function omits implicit schemas.
1081 *
1082 * The function returns NULL if there are no valid schemas in the search_path,
1083 * mimicing current_schemas(false) function.
1084 */
1085 static char *
CurrentSearchPath(void)1086 CurrentSearchPath(void)
1087 {
1088 StringInfo currentSearchPath = makeStringInfo();
1089 List *searchPathList = fetch_search_path(false);
1090 bool schemaAdded = false;
1091
1092 Oid searchPathOid = InvalidOid;
1093 foreach_oid(searchPathOid, searchPathList)
1094 {
1095 char *schemaName = get_namespace_name(searchPathOid);
1096
1097 /* watch out for deleted namespace */
1098 if (schemaName)
1099 {
1100 if (schemaAdded)
1101 {
1102 appendStringInfoString(currentSearchPath, ",");
1103 schemaAdded = false;
1104 }
1105
1106 appendStringInfoString(currentSearchPath, quote_identifier(schemaName));
1107 schemaAdded = true;
1108 }
1109 }
1110
1111 /* fetch_search_path() returns a palloc'd list that we should free now */
1112 list_free(searchPathList);
1113
1114 return (currentSearchPath->len > 0 ? currentSearchPath->data : NULL);
1115 }
1116
1117
1118 /*
1119 * IncrementUtilityHookCountersIfNecessary increments activeAlterTables and
1120 * activeDropSchemaOrDBs counters if utility command being processed implies
1121 * to do so.
1122 */
1123 static void
IncrementUtilityHookCountersIfNecessary(Node * parsetree)1124 IncrementUtilityHookCountersIfNecessary(Node *parsetree)
1125 {
1126 if (IsA(parsetree, AlterTableStmt))
1127 {
1128 activeAlterTables++;
1129 }
1130
1131 if (IsDropSchemaOrDB(parsetree))
1132 {
1133 activeDropSchemaOrDBs++;
1134 }
1135 }
1136
1137
1138 /*
1139 * PostStandardProcessUtility performs operations to alter (backend) global
1140 * state of citus utility hook. Those operations should be done after standard
1141 * process utility executes even if it errors out.
1142 */
1143 static void
PostStandardProcessUtility(Node * parsetree)1144 PostStandardProcessUtility(Node *parsetree)
1145 {
1146 DecrementUtilityHookCountersIfNecessary(parsetree);
1147
1148 /*
1149 * Re-forming the foreign key graph relies on the command being executed
1150 * on the local table first. However, in order to decide whether the
1151 * command leads to an invalidation, we need to check before the command
1152 * is being executed since we read pg_constraint table. Thus, we maintain a
1153 * local flag and do the invalidation after multi_ProcessUtility,
1154 * before ExecuteDistributedDDLJob().
1155 */
1156 InvalidateForeignKeyGraphForDDL();
1157 }
1158
1159
1160 /*
1161 * DecrementUtilityHookCountersIfNecessary decrements activeAlterTables and
1162 * activeDropSchemaOrDBs counters if utility command being processed implies
1163 * to do so.
1164 */
1165 static void
DecrementUtilityHookCountersIfNecessary(Node * parsetree)1166 DecrementUtilityHookCountersIfNecessary(Node *parsetree)
1167 {
1168 if (IsA(parsetree, AlterTableStmt))
1169 {
1170 activeAlterTables--;
1171 }
1172
1173 if (IsDropSchemaOrDB(parsetree))
1174 {
1175 activeDropSchemaOrDBs--;
1176 }
1177 }
1178
1179
1180 /*
1181 * MarkInvalidateForeignKeyGraph marks whether the foreign key graph should be
1182 * invalidated due to a DDL.
1183 */
1184 void
MarkInvalidateForeignKeyGraph()1185 MarkInvalidateForeignKeyGraph()
1186 {
1187 shouldInvalidateForeignKeyGraph = true;
1188 }
1189
1190
1191 /*
1192 * InvalidateForeignKeyGraphForDDL simply keeps track of whether
1193 * the foreign key graph should be invalidated due to a DDL.
1194 */
1195 void
InvalidateForeignKeyGraphForDDL(void)1196 InvalidateForeignKeyGraphForDDL(void)
1197 {
1198 if (shouldInvalidateForeignKeyGraph)
1199 {
1200 InvalidateForeignKeyGraph();
1201
1202 shouldInvalidateForeignKeyGraph = false;
1203 }
1204 }
1205
1206
1207 /*
1208 * DDLTaskList builds a list of tasks to execute a DDL command on a
1209 * given list of shards.
1210 */
1211 List *
DDLTaskList(Oid relationId,const char * commandString)1212 DDLTaskList(Oid relationId, const char *commandString)
1213 {
1214 List *taskList = NIL;
1215 List *shardIntervalList = LoadShardIntervalList(relationId);
1216 Oid schemaId = get_rel_namespace(relationId);
1217 char *schemaName = get_namespace_name(schemaId);
1218 char *escapedSchemaName = quote_literal_cstr(schemaName);
1219 char *escapedCommandString = quote_literal_cstr(commandString);
1220 uint64 jobId = INVALID_JOB_ID;
1221 int taskId = 1;
1222
1223 /* lock metadata before getting placement lists */
1224 LockShardListMetadata(shardIntervalList, ShareLock);
1225
1226 ShardInterval *shardInterval = NULL;
1227 foreach_ptr(shardInterval, shardIntervalList)
1228 {
1229 uint64 shardId = shardInterval->shardId;
1230 StringInfo applyCommand = makeStringInfo();
1231
1232 /*
1233 * If rightRelationId is not InvalidOid, instead of worker_apply_shard_ddl_command
1234 * we use worker_apply_inter_shard_ddl_command.
1235 */
1236 appendStringInfo(applyCommand, WORKER_APPLY_SHARD_DDL_COMMAND, shardId,
1237 escapedSchemaName, escapedCommandString);
1238
1239 Task *task = CitusMakeNode(Task);
1240 task->jobId = jobId;
1241 task->taskId = taskId++;
1242 task->taskType = DDL_TASK;
1243 SetTaskQueryString(task, applyCommand->data);
1244 task->replicationModel = REPLICATION_MODEL_INVALID;
1245 task->dependentTaskList = NULL;
1246 task->anchorShardId = shardId;
1247 task->taskPlacementList = ActiveShardPlacementList(shardId);
1248
1249 taskList = lappend(taskList, task);
1250 }
1251
1252 return taskList;
1253 }
1254
1255
1256 /*
1257 * NodeDDLTaskList builds a list of tasks to execute a DDL command on a
1258 * given target set of nodes.
1259 */
1260 List *
NodeDDLTaskList(TargetWorkerSet targets,List * commands)1261 NodeDDLTaskList(TargetWorkerSet targets, List *commands)
1262 {
1263 List *workerNodes = TargetWorkerSetNodeList(targets, NoLock);
1264
1265 if (list_length(workerNodes) <= 0)
1266 {
1267 /*
1268 * if there are no nodes we don't have to plan any ddl tasks. Planning them would
1269 * cause the executor to stop responding.
1270 */
1271 return NIL;
1272 }
1273
1274 Task *task = CitusMakeNode(Task);
1275 task->taskType = DDL_TASK;
1276 SetTaskQueryStringList(task, commands);
1277
1278 WorkerNode *workerNode = NULL;
1279 foreach_ptr(workerNode, workerNodes)
1280 {
1281 ShardPlacement *targetPlacement = CitusMakeNode(ShardPlacement);
1282 targetPlacement->nodeName = workerNode->workerName;
1283 targetPlacement->nodePort = workerNode->workerPort;
1284 targetPlacement->groupId = workerNode->groupId;
1285
1286 task->taskPlacementList = lappend(task->taskPlacementList, targetPlacement);
1287 }
1288
1289 DDLJob *ddlJob = palloc0(sizeof(DDLJob));
1290 ddlJob->targetRelationId = InvalidOid;
1291 ddlJob->concurrentIndexCmd = false;
1292 ddlJob->commandString = NULL;
1293 ddlJob->taskList = list_make1(task);
1294
1295 return list_make1(ddlJob);
1296 }
1297
1298
1299 /*
1300 * AlterTableInProgress returns true if we're processing an ALTER TABLE command
1301 * right now.
1302 */
1303 bool
AlterTableInProgress(void)1304 AlterTableInProgress(void)
1305 {
1306 return activeAlterTables > 0;
1307 }
1308
1309
1310 /*
1311 * DropSchemaOrDBInProgress returns true if we're processing a DROP SCHEMA
1312 * or a DROP DATABASE command right now.
1313 */
1314 bool
DropSchemaOrDBInProgress(void)1315 DropSchemaOrDBInProgress(void)
1316 {
1317 return activeDropSchemaOrDBs > 0;
1318 }
1319