1 /*-------------------------------------------------------------------------
2 *
3 * database.c
4 * Commands to interact with the database object in a distributed
5 * environment.
6 *
7 * Copyright (c) Citus Data, Inc.
8 *
9 *-------------------------------------------------------------------------
10 */
11
12 #include "postgres.h"
13
14 #include "access/htup_details.h"
15 #include "access/xact.h"
16 #include "catalog/objectaddress.h"
17 #include "catalog/pg_database.h"
18 #include "commands/dbcommands.h"
19 #include "miscadmin.h"
20 #include "nodes/parsenodes.h"
21 #include "utils/syscache.h"
22
23 #include "distributed/commands.h"
24 #include "distributed/commands/utility_hook.h"
25 #include "distributed/deparser.h"
26 #include "distributed/metadata_sync.h"
27 #include "distributed/metadata_utility.h"
28 #include "distributed/multi_executor.h"
29 #include "distributed/relation_access_tracking.h"
30 #include "distributed/worker_transaction.h"
31
/* forward declarations of the helpers that are local to this file */
static void EnsureSequentialModeForDatabaseDDL(void);
static AlterOwnerStmt * RecreateAlterDatabaseOwnerStmt(Oid databaseOid);
static Oid get_database_owner(Oid db_oid);

/*
 * controlled via GUC
 *
 * While this is false the pre- and postprocess functions for the alter database owner
 * statement in this file return NIL without preparing any work for other nodes.
 */
bool EnableAlterDatabaseOwner = false;
38
39
40 /*
41 * PreprocessAlterDatabaseOwnerStmt is called during the utility hook before the alter
42 * command is applied locally on the coordinator. This will verify if the command needs to
43 * be propagated to the workers and if so prepares a list of ddl commands to execute.
44 */
45 List *
PreprocessAlterDatabaseOwnerStmt(Node * node,const char * queryString,ProcessUtilityContext processUtilityContext)46 PreprocessAlterDatabaseOwnerStmt(Node *node, const char *queryString,
47 ProcessUtilityContext processUtilityContext)
48 {
49 AlterOwnerStmt *stmt = castNode(AlterOwnerStmt, node);
50 Assert(stmt->objectType == OBJECT_DATABASE);
51
52 ObjectAddress typeAddress = GetObjectAddressFromParseTree((Node *) stmt, false);
53 if (!ShouldPropagateObject(&typeAddress))
54 {
55 return NIL;
56 }
57
58 if (!EnableAlterDatabaseOwner)
59 {
60 /* don't propagate if GUC is turned off */
61 return NIL;
62 }
63
64 EnsureCoordinator();
65
66 QualifyTreeNode((Node *) stmt);
67 const char *sql = DeparseTreeNode((Node *) stmt);
68
69 EnsureSequentialModeForDatabaseDDL();
70 List *commands = list_make3(DISABLE_DDL_PROPAGATION,
71 (void *) sql,
72 ENABLE_DDL_PROPAGATION);
73
74 return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
75 }
76
77
78 /*
79 * PostprocessAlterDatabaseOwnerStmt is called during the utility hook after the alter
80 * database command has been applied locally.
81 *
82 * Its main purpose is to propagate the newly formed dependencies onto the nodes before
83 * applying the change of owner of the databse. This ensures, for systems that have role
84 * management, that the roles will be created before applying the alter owner command.
85 */
86 List *
PostprocessAlterDatabaseOwnerStmt(Node * node,const char * queryString)87 PostprocessAlterDatabaseOwnerStmt(Node *node, const char *queryString)
88 {
89 AlterOwnerStmt *stmt = castNode(AlterOwnerStmt, node);
90 Assert(stmt->objectType == OBJECT_DATABASE);
91
92 ObjectAddress typeAddress = GetObjectAddressFromParseTree((Node *) stmt, false);
93 if (!ShouldPropagateObject(&typeAddress))
94 {
95 return NIL;
96 }
97
98 if (!EnableAlterDatabaseOwner)
99 {
100 /* don't propagate if GUC is turned off */
101 return NIL;
102 }
103
104 EnsureDependenciesExistOnAllNodes(&typeAddress);
105 return NIL;
106 }
107
108
109 /*
110 * AlterDatabaseOwnerObjectAddress returns the ObjectAddress of the database that is the
111 * object of the AlterOwnerStmt. Errors if missing_ok is false.
112 */
113 ObjectAddress
AlterDatabaseOwnerObjectAddress(Node * node,bool missing_ok)114 AlterDatabaseOwnerObjectAddress(Node *node, bool missing_ok)
115 {
116 AlterOwnerStmt *stmt = castNode(AlterOwnerStmt, node);
117 Assert(stmt->objectType == OBJECT_DATABASE);
118
119 Oid databaseOid = get_database_oid(strVal((Value *) stmt->object), missing_ok);
120 ObjectAddress address = { 0 };
121 ObjectAddressSet(address, DatabaseRelationId, databaseOid);
122
123 return address;
124 }
125
126
127 /*
128 * DatabaseOwnerDDLCommands returns a list of sql statements to idempotently apply a
129 * change of the database owner on the workers so that the database is owned by the same
130 * user on all nodes in the cluster.
131 */
132 List *
DatabaseOwnerDDLCommands(const ObjectAddress * address)133 DatabaseOwnerDDLCommands(const ObjectAddress *address)
134 {
135 Node *stmt = (Node *) RecreateAlterDatabaseOwnerStmt(address->objectId);
136 return list_make1(DeparseTreeNode(stmt));
137 }
138
139
140 /*
141 * RecreateAlterDatabaseOwnerStmt creates an AlterOwnerStmt that represents the operation
142 * of changing the owner of the database to its current owner.
143 */
144 static AlterOwnerStmt *
RecreateAlterDatabaseOwnerStmt(Oid databaseOid)145 RecreateAlterDatabaseOwnerStmt(Oid databaseOid)
146 {
147 AlterOwnerStmt *stmt = makeNode(AlterOwnerStmt);
148
149 stmt->objectType = OBJECT_DATABASE;
150 stmt->object = (Node *) makeString(get_database_name(databaseOid));
151
152 Oid ownerOid = get_database_owner(databaseOid);
153 stmt->newowner = makeNode(RoleSpec);
154 stmt->newowner->roletype = ROLESPEC_CSTRING;
155 stmt->newowner->rolename = GetUserNameFromId(ownerOid, false);
156
157 return stmt;
158 }
159
160
161 /*
162 * get_database_owner returns the Oid of the role owning the database
163 */
164 static Oid
get_database_owner(Oid db_oid)165 get_database_owner(Oid db_oid)
166 {
167 HeapTuple tuple = SearchSysCache1(DATABASEOID, ObjectIdGetDatum(db_oid));
168 if (!HeapTupleIsValid(tuple))
169 {
170 ereport(ERROR, (errcode(ERRCODE_UNDEFINED_DATABASE),
171 errmsg("database with OID %u does not exist", db_oid)));
172 }
173
174 Oid dba = ((Form_pg_database) GETSTRUCT(tuple))->datdba;
175
176 ReleaseSysCache(tuple);
177
178 return dba;
179 }
180
181
182 /*
183 * EnsureSequentialModeForDatabaseDDL makes sure that the current transaction is already
184 * in sequential mode, or can still safely be put in sequential mode, it errors if that is
185 * not possible. The error contains information for the user to retry the transaction with
186 * sequential mode set from the beginning.
187 */
188 static void
EnsureSequentialModeForDatabaseDDL(void)189 EnsureSequentialModeForDatabaseDDL(void)
190 {
191 if (!IsTransactionBlock())
192 {
193 /* we do not need to switch to sequential mode if we are not in a transaction */
194 return;
195 }
196
197 if (ParallelQueryExecutedInTransaction())
198 {
199 ereport(ERROR, (errmsg("cannot create or modify database because there was a "
200 "parallel operation on a distributed table in the "
201 "transaction"),
202 errdetail("When creating or altering a database, Citus needs to "
203 "perform all operations over a single connection per "
204 "node to ensure consistency."),
205 errhint("Try re-running the transaction with "
206 "\"SET LOCAL citus.multi_shard_modify_mode TO "
207 "\'sequential\';\"")));
208 }
209
210 ereport(DEBUG1, (errmsg("switching to sequential query execution mode"),
211 errdetail("Database is created or altered. To make sure subsequent "
212 "commands see the type correctly we need to make sure to "
213 "use only one connection for all future commands")));
214 SetLocalMultiShardModifyModeToSequential();
215 }
216