1 /*-------------------------------------------------------------------------
2 *
3 * schemacmds.c
4 * schema creation/manipulation commands
5 *
6 * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
7 * Portions Copyright (c) 1994, Regents of the University of California
8 *
9 *
10 * IDENTIFICATION
11 * src/backend/commands/schemacmds.c
12 *
13 *-------------------------------------------------------------------------
14 */
15 #include "postgres.h"
16
17 #include "access/htup_details.h"
18 #include "access/table.h"
19 #include "access/xact.h"
20 #include "catalog/catalog.h"
21 #include "catalog/dependency.h"
22 #include "catalog/indexing.h"
23 #include "catalog/namespace.h"
24 #include "catalog/objectaccess.h"
25 #include "catalog/pg_authid.h"
26 #include "catalog/pg_namespace.h"
27 #include "commands/dbcommands.h"
28 #include "commands/event_trigger.h"
29 #include "commands/schemacmds.h"
30 #include "miscadmin.h"
31 #include "parser/parse_utilcmd.h"
32 #include "tcop/utility.h"
33 #include "utils/acl.h"
34 #include "utils/builtins.h"
35 #include "utils/rel.h"
36 #include "utils/syscache.h"
37
38 static void AlterSchemaOwner_internal(HeapTuple tup, Relation rel, Oid newOwnerId);
39
40 /*
41 * CREATE SCHEMA
42 *
43 * Note: caller should pass in location information for the whole
44 * CREATE SCHEMA statement, which in turn we pass down as the location
45 * of the component commands. This comports with our general plan of
46 * reporting location/len for the whole command even when executing
47 * a subquery.
48 */
49 Oid
CreateSchemaCommand(CreateSchemaStmt * stmt,const char * queryString,int stmt_location,int stmt_len)50 CreateSchemaCommand(CreateSchemaStmt *stmt, const char *queryString,
51 int stmt_location, int stmt_len)
52 {
53 const char *schemaName = stmt->schemaname;
54 Oid namespaceId;
55 OverrideSearchPath *overridePath;
56 List *parsetree_list;
57 ListCell *parsetree_item;
58 Oid owner_uid;
59 Oid saved_uid;
60 int save_sec_context;
61 AclResult aclresult;
62 ObjectAddress address;
63
64 GetUserIdAndSecContext(&saved_uid, &save_sec_context);
65
66 /*
67 * Who is supposed to own the new schema?
68 */
69 if (stmt->authrole)
70 owner_uid = get_rolespec_oid(stmt->authrole, false);
71 else
72 owner_uid = saved_uid;
73
74 /* fill schema name with the user name if not specified */
75 if (!schemaName)
76 {
77 HeapTuple tuple;
78
79 tuple = SearchSysCache1(AUTHOID, ObjectIdGetDatum(owner_uid));
80 if (!HeapTupleIsValid(tuple))
81 elog(ERROR, "cache lookup failed for role %u", owner_uid);
82 schemaName =
83 pstrdup(NameStr(((Form_pg_authid) GETSTRUCT(tuple))->rolname));
84 ReleaseSysCache(tuple);
85 }
86
87 /*
88 * To create a schema, must have schema-create privilege on the current
89 * database and must be able to become the target role (this does not
90 * imply that the target role itself must have create-schema privilege).
91 * The latter provision guards against "giveaway" attacks. Note that a
92 * superuser will always have both of these privileges a fortiori.
93 */
94 aclresult = pg_database_aclcheck(MyDatabaseId, saved_uid, ACL_CREATE);
95 if (aclresult != ACLCHECK_OK)
96 aclcheck_error(aclresult, OBJECT_DATABASE,
97 get_database_name(MyDatabaseId));
98
99 check_is_member_of_role(saved_uid, owner_uid);
100
101 /* Additional check to protect reserved schema names */
102 if (!allowSystemTableMods && IsReservedName(schemaName))
103 ereport(ERROR,
104 (errcode(ERRCODE_RESERVED_NAME),
105 errmsg("unacceptable schema name \"%s\"", schemaName),
106 errdetail("The prefix \"pg_\" is reserved for system schemas.")));
107
108 /*
109 * If if_not_exists was given and the schema already exists, bail out.
110 * (Note: we needn't check this when not if_not_exists, because
111 * NamespaceCreate will complain anyway.) We could do this before making
112 * the permissions checks, but since CREATE TABLE IF NOT EXISTS makes its
113 * creation-permission check first, we do likewise.
114 */
115 if (stmt->if_not_exists &&
116 SearchSysCacheExists1(NAMESPACENAME, PointerGetDatum(schemaName)))
117 {
118 ereport(NOTICE,
119 (errcode(ERRCODE_DUPLICATE_SCHEMA),
120 errmsg("schema \"%s\" already exists, skipping",
121 schemaName)));
122 return InvalidOid;
123 }
124
125 /*
126 * If the requested authorization is different from the current user,
127 * temporarily set the current user so that the object(s) will be created
128 * with the correct ownership.
129 *
130 * (The setting will be restored at the end of this routine, or in case of
131 * error, transaction abort will clean things up.)
132 */
133 if (saved_uid != owner_uid)
134 SetUserIdAndSecContext(owner_uid,
135 save_sec_context | SECURITY_LOCAL_USERID_CHANGE);
136
137 /* Create the schema's namespace */
138 namespaceId = NamespaceCreate(schemaName, owner_uid, false);
139
140 /* Advance cmd counter to make the namespace visible */
141 CommandCounterIncrement();
142
143 /*
144 * Temporarily make the new namespace be the front of the search path, as
145 * well as the default creation target namespace. This will be undone at
146 * the end of this routine, or upon error.
147 */
148 overridePath = GetOverrideSearchPath(CurrentMemoryContext);
149 overridePath->schemas = lcons_oid(namespaceId, overridePath->schemas);
150 /* XXX should we clear overridePath->useTemp? */
151 PushOverrideSearchPath(overridePath);
152
153 /*
154 * Report the new schema to possibly interested event triggers. Note we
155 * must do this here and not in ProcessUtilitySlow because otherwise the
156 * objects created below are reported before the schema, which would be
157 * wrong.
158 */
159 ObjectAddressSet(address, NamespaceRelationId, namespaceId);
160 EventTriggerCollectSimpleCommand(address, InvalidObjectAddress,
161 (Node *) stmt);
162
163 /*
164 * Examine the list of commands embedded in the CREATE SCHEMA command, and
165 * reorganize them into a sequentially executable order with no forward
166 * references. Note that the result is still a list of raw parsetrees ---
167 * we cannot, in general, run parse analysis on one statement until we
168 * have actually executed the prior ones.
169 */
170 parsetree_list = transformCreateSchemaStmt(stmt);
171
172 /*
173 * Execute each command contained in the CREATE SCHEMA. Since the grammar
174 * allows only utility commands in CREATE SCHEMA, there is no need to pass
175 * them through parse_analyze() or the rewriter; we can just hand them
176 * straight to ProcessUtility.
177 */
178 foreach(parsetree_item, parsetree_list)
179 {
180 Node *stmt = (Node *) lfirst(parsetree_item);
181 PlannedStmt *wrapper;
182
183 /* need to make a wrapper PlannedStmt */
184 wrapper = makeNode(PlannedStmt);
185 wrapper->commandType = CMD_UTILITY;
186 wrapper->canSetTag = false;
187 wrapper->utilityStmt = stmt;
188 wrapper->stmt_location = stmt_location;
189 wrapper->stmt_len = stmt_len;
190
191 /* do this step */
192 ProcessUtility(wrapper,
193 queryString,
194 false,
195 PROCESS_UTILITY_SUBCOMMAND,
196 NULL,
197 NULL,
198 None_Receiver,
199 NULL);
200
201 /* make sure later steps can see the object created here */
202 CommandCounterIncrement();
203 }
204
205 /* Reset search path to normal state */
206 PopOverrideSearchPath();
207
208 /* Reset current user and security context */
209 SetUserIdAndSecContext(saved_uid, save_sec_context);
210
211 return namespaceId;
212 }
213
214
215 /*
216 * Rename schema
217 */
218 ObjectAddress
RenameSchema(const char * oldname,const char * newname)219 RenameSchema(const char *oldname, const char *newname)
220 {
221 Oid nspOid;
222 HeapTuple tup;
223 Relation rel;
224 AclResult aclresult;
225 ObjectAddress address;
226 Form_pg_namespace nspform;
227
228 rel = table_open(NamespaceRelationId, RowExclusiveLock);
229
230 tup = SearchSysCacheCopy1(NAMESPACENAME, CStringGetDatum(oldname));
231 if (!HeapTupleIsValid(tup))
232 ereport(ERROR,
233 (errcode(ERRCODE_UNDEFINED_SCHEMA),
234 errmsg("schema \"%s\" does not exist", oldname)));
235
236 nspform = (Form_pg_namespace) GETSTRUCT(tup);
237 nspOid = nspform->oid;
238
239 /* make sure the new name doesn't exist */
240 if (OidIsValid(get_namespace_oid(newname, true)))
241 ereport(ERROR,
242 (errcode(ERRCODE_DUPLICATE_SCHEMA),
243 errmsg("schema \"%s\" already exists", newname)));
244
245 /* must be owner */
246 if (!pg_namespace_ownercheck(nspOid, GetUserId()))
247 aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_SCHEMA,
248 oldname);
249
250 /* must have CREATE privilege on database */
251 aclresult = pg_database_aclcheck(MyDatabaseId, GetUserId(), ACL_CREATE);
252 if (aclresult != ACLCHECK_OK)
253 aclcheck_error(aclresult, OBJECT_DATABASE,
254 get_database_name(MyDatabaseId));
255
256 if (!allowSystemTableMods && IsReservedName(newname))
257 ereport(ERROR,
258 (errcode(ERRCODE_RESERVED_NAME),
259 errmsg("unacceptable schema name \"%s\"", newname),
260 errdetail("The prefix \"pg_\" is reserved for system schemas.")));
261
262 /* rename */
263 namestrcpy(&nspform->nspname, newname);
264 CatalogTupleUpdate(rel, &tup->t_self, tup);
265
266 InvokeObjectPostAlterHook(NamespaceRelationId, nspOid, 0);
267
268 ObjectAddressSet(address, NamespaceRelationId, nspOid);
269
270 table_close(rel, NoLock);
271 heap_freetuple(tup);
272
273 return address;
274 }
275
276 void
AlterSchemaOwner_oid(Oid oid,Oid newOwnerId)277 AlterSchemaOwner_oid(Oid oid, Oid newOwnerId)
278 {
279 HeapTuple tup;
280 Relation rel;
281
282 rel = table_open(NamespaceRelationId, RowExclusiveLock);
283
284 tup = SearchSysCache1(NAMESPACEOID, ObjectIdGetDatum(oid));
285 if (!HeapTupleIsValid(tup))
286 elog(ERROR, "cache lookup failed for schema %u", oid);
287
288 AlterSchemaOwner_internal(tup, rel, newOwnerId);
289
290 ReleaseSysCache(tup);
291
292 table_close(rel, RowExclusiveLock);
293 }
294
295
296 /*
297 * Change schema owner
298 */
299 ObjectAddress
AlterSchemaOwner(const char * name,Oid newOwnerId)300 AlterSchemaOwner(const char *name, Oid newOwnerId)
301 {
302 Oid nspOid;
303 HeapTuple tup;
304 Relation rel;
305 ObjectAddress address;
306 Form_pg_namespace nspform;
307
308 rel = table_open(NamespaceRelationId, RowExclusiveLock);
309
310 tup = SearchSysCache1(NAMESPACENAME, CStringGetDatum(name));
311 if (!HeapTupleIsValid(tup))
312 ereport(ERROR,
313 (errcode(ERRCODE_UNDEFINED_SCHEMA),
314 errmsg("schema \"%s\" does not exist", name)));
315
316 nspform = (Form_pg_namespace) GETSTRUCT(tup);
317 nspOid = nspform->oid;
318
319 AlterSchemaOwner_internal(tup, rel, newOwnerId);
320
321 ObjectAddressSet(address, NamespaceRelationId, nspOid);
322
323 ReleaseSysCache(tup);
324
325 table_close(rel, RowExclusiveLock);
326
327 return address;
328 }
329
330 static void
AlterSchemaOwner_internal(HeapTuple tup,Relation rel,Oid newOwnerId)331 AlterSchemaOwner_internal(HeapTuple tup, Relation rel, Oid newOwnerId)
332 {
333 Form_pg_namespace nspForm;
334
335 Assert(tup->t_tableOid == NamespaceRelationId);
336 Assert(RelationGetRelid(rel) == NamespaceRelationId);
337
338 nspForm = (Form_pg_namespace) GETSTRUCT(tup);
339
340 /*
341 * If the new owner is the same as the existing owner, consider the
342 * command to have succeeded. This is for dump restoration purposes.
343 */
344 if (nspForm->nspowner != newOwnerId)
345 {
346 Datum repl_val[Natts_pg_namespace];
347 bool repl_null[Natts_pg_namespace];
348 bool repl_repl[Natts_pg_namespace];
349 Acl *newAcl;
350 Datum aclDatum;
351 bool isNull;
352 HeapTuple newtuple;
353 AclResult aclresult;
354
355 /* Otherwise, must be owner of the existing object */
356 if (!pg_namespace_ownercheck(nspForm->oid, GetUserId()))
357 aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_SCHEMA,
358 NameStr(nspForm->nspname));
359
360 /* Must be able to become new owner */
361 check_is_member_of_role(GetUserId(), newOwnerId);
362
363 /*
364 * must have create-schema rights
365 *
366 * NOTE: This is different from other alter-owner checks in that the
367 * current user is checked for create privileges instead of the
368 * destination owner. This is consistent with the CREATE case for
369 * schemas. Because superusers will always have this right, we need
370 * no special case for them.
371 */
372 aclresult = pg_database_aclcheck(MyDatabaseId, GetUserId(),
373 ACL_CREATE);
374 if (aclresult != ACLCHECK_OK)
375 aclcheck_error(aclresult, OBJECT_DATABASE,
376 get_database_name(MyDatabaseId));
377
378 memset(repl_null, false, sizeof(repl_null));
379 memset(repl_repl, false, sizeof(repl_repl));
380
381 repl_repl[Anum_pg_namespace_nspowner - 1] = true;
382 repl_val[Anum_pg_namespace_nspowner - 1] = ObjectIdGetDatum(newOwnerId);
383
384 /*
385 * Determine the modified ACL for the new owner. This is only
386 * necessary when the ACL is non-null.
387 */
388 aclDatum = SysCacheGetAttr(NAMESPACENAME, tup,
389 Anum_pg_namespace_nspacl,
390 &isNull);
391 if (!isNull)
392 {
393 newAcl = aclnewowner(DatumGetAclP(aclDatum),
394 nspForm->nspowner, newOwnerId);
395 repl_repl[Anum_pg_namespace_nspacl - 1] = true;
396 repl_val[Anum_pg_namespace_nspacl - 1] = PointerGetDatum(newAcl);
397 }
398
399 newtuple = heap_modify_tuple(tup, RelationGetDescr(rel), repl_val, repl_null, repl_repl);
400
401 CatalogTupleUpdate(rel, &newtuple->t_self, newtuple);
402
403 heap_freetuple(newtuple);
404
405 /* Update owner dependency reference */
406 changeDependencyOnOwner(NamespaceRelationId, nspForm->oid,
407 newOwnerId);
408 }
409
410 InvokeObjectPostAlterHook(NamespaceRelationId,
411 nspForm->oid, 0);
412 }
413