1 /*-------------------------------------------------------------------------
2  *
3  * dbcommands.c
4  *		Database management commands (create/drop database).
5  *
6  * Note: database creation/destruction commands use exclusive locks on
7  * the database objects (as expressed by LockSharedObject()) to avoid
8  * stepping on each others' toes.  Formerly we used table-level locks
9  * on pg_database, but that's too coarse-grained.
10  *
11  * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
12  * Portions Copyright (c) 1994, Regents of the University of California
13  *
14  *
15  * IDENTIFICATION
16  *	  src/backend/commands/dbcommands.c
17  *
18  *-------------------------------------------------------------------------
19  */
20 #include "postgres.h"
21 
22 #include <fcntl.h>
23 #include <unistd.h>
24 #include <sys/stat.h>
25 
26 #include "access/genam.h"
27 #include "access/heapam.h"
28 #include "access/htup_details.h"
29 #include "access/xact.h"
30 #include "access/xloginsert.h"
31 #include "access/xlogutils.h"
32 #include "catalog/catalog.h"
33 #include "catalog/dependency.h"
34 #include "catalog/indexing.h"
35 #include "catalog/objectaccess.h"
36 #include "catalog/pg_authid.h"
37 #include "catalog/pg_database.h"
38 #include "catalog/pg_db_role_setting.h"
39 #include "catalog/pg_subscription.h"
40 #include "catalog/pg_tablespace.h"
41 #include "commands/comment.h"
42 #include "commands/dbcommands.h"
43 #include "commands/dbcommands_xlog.h"
44 #include "commands/defrem.h"
45 #include "commands/seclabel.h"
46 #include "commands/tablespace.h"
47 #include "mb/pg_wchar.h"
48 #include "miscadmin.h"
49 #include "pgstat.h"
50 #include "postmaster/bgwriter.h"
51 #include "replication/slot.h"
52 #include "storage/copydir.h"
53 #include "storage/fd.h"
54 #include "storage/lmgr.h"
55 #include "storage/ipc.h"
56 #include "storage/procarray.h"
57 #include "storage/smgr.h"
58 #include "utils/acl.h"
59 #include "utils/builtins.h"
60 #include "utils/fmgroids.h"
61 #include "utils/pg_locale.h"
62 #include "utils/snapmgr.h"
63 #include "utils/syscache.h"
64 #include "utils/tqual.h"
65 
66 
67 typedef struct
68 {
69 	Oid			src_dboid;		/* source (template) DB */
70 	Oid			dest_dboid;		/* DB we are trying to create */
71 } createdb_failure_params;
72 
73 typedef struct
74 {
75 	Oid			dest_dboid;		/* DB we are trying to move */
76 	Oid			dest_tsoid;		/* tablespace we are trying to move to */
77 } movedb_failure_params;
78 
/*
 * non-export function prototypes
 *
 * Forward declarations of the file-local (static) helpers, so that they can
 * be referenced ahead of their definitions.
 */
static void createdb_failure_callback(int code, Datum arg);
static void movedb(const char *dbname, const char *tblspcname);
static void movedb_failure_callback(int code, Datum arg);
static bool get_db_info(const char *name, LOCKMODE lockmode,
			Oid *dbIdP, Oid *ownerIdP,
			int *encodingP, bool *dbIsTemplateP, bool *dbAllowConnP,
			Oid *dbLastSysOidP, TransactionId *dbFrozenXidP,
			MultiXactId *dbMinMultiP,
			Oid *dbTablespace, char **dbCollate, char **dbCtype);
static bool have_createdb_privilege(void);
static void remove_dbtablespaces(Oid db_id);
static bool check_db_file_conflict(Oid db_id);
static int	errdetail_busy_db(int notherbackends, int npreparedxacts);
93 
94 
95 /*
96  * CREATE DATABASE
97  */
98 Oid
createdb(ParseState * pstate,const CreatedbStmt * stmt)99 createdb(ParseState *pstate, const CreatedbStmt *stmt)
100 {
101 	HeapScanDesc scan;
102 	Relation	rel;
103 	Oid			src_dboid;
104 	Oid			src_owner;
105 	int			src_encoding;
106 	char	   *src_collate;
107 	char	   *src_ctype;
108 	bool		src_istemplate;
109 	bool		src_allowconn;
110 	Oid			src_lastsysoid;
111 	TransactionId src_frozenxid;
112 	MultiXactId src_minmxid;
113 	Oid			src_deftablespace;
114 	volatile Oid dst_deftablespace;
115 	Relation	pg_database_rel;
116 	HeapTuple	tuple;
117 	Datum		new_record[Natts_pg_database];
118 	bool		new_record_nulls[Natts_pg_database];
119 	Oid			dboid;
120 	Oid			datdba;
121 	ListCell   *option;
122 	DefElem    *dtablespacename = NULL;
123 	DefElem    *downer = NULL;
124 	DefElem    *dtemplate = NULL;
125 	DefElem    *dencoding = NULL;
126 	DefElem    *dcollate = NULL;
127 	DefElem    *dctype = NULL;
128 	DefElem    *distemplate = NULL;
129 	DefElem    *dallowconnections = NULL;
130 	DefElem    *dconnlimit = NULL;
131 	char	   *dbname = stmt->dbname;
132 	char	   *dbowner = NULL;
133 	const char *dbtemplate = NULL;
134 	char	   *dbcollate = NULL;
135 	char	   *dbctype = NULL;
136 	char	   *canonname;
137 	int			encoding = -1;
138 	bool		dbistemplate = false;
139 	bool		dballowconnections = true;
140 	int			dbconnlimit = -1;
141 	int			notherbackends;
142 	int			npreparedxacts;
143 	createdb_failure_params fparms;
144 
145 	/* Extract options from the statement node tree */
146 	foreach(option, stmt->options)
147 	{
148 		DefElem    *defel = (DefElem *) lfirst(option);
149 
150 		if (strcmp(defel->defname, "tablespace") == 0)
151 		{
152 			if (dtablespacename)
153 				ereport(ERROR,
154 						(errcode(ERRCODE_SYNTAX_ERROR),
155 						 errmsg("conflicting or redundant options"),
156 						 parser_errposition(pstate, defel->location)));
157 			dtablespacename = defel;
158 		}
159 		else if (strcmp(defel->defname, "owner") == 0)
160 		{
161 			if (downer)
162 				ereport(ERROR,
163 						(errcode(ERRCODE_SYNTAX_ERROR),
164 						 errmsg("conflicting or redundant options"),
165 						 parser_errposition(pstate, defel->location)));
166 			downer = defel;
167 		}
168 		else if (strcmp(defel->defname, "template") == 0)
169 		{
170 			if (dtemplate)
171 				ereport(ERROR,
172 						(errcode(ERRCODE_SYNTAX_ERROR),
173 						 errmsg("conflicting or redundant options"),
174 						 parser_errposition(pstate, defel->location)));
175 			dtemplate = defel;
176 		}
177 		else if (strcmp(defel->defname, "encoding") == 0)
178 		{
179 			if (dencoding)
180 				ereport(ERROR,
181 						(errcode(ERRCODE_SYNTAX_ERROR),
182 						 errmsg("conflicting or redundant options"),
183 						 parser_errposition(pstate, defel->location)));
184 			dencoding = defel;
185 		}
186 		else if (strcmp(defel->defname, "lc_collate") == 0)
187 		{
188 			if (dcollate)
189 				ereport(ERROR,
190 						(errcode(ERRCODE_SYNTAX_ERROR),
191 						 errmsg("conflicting or redundant options"),
192 						 parser_errposition(pstate, defel->location)));
193 			dcollate = defel;
194 		}
195 		else if (strcmp(defel->defname, "lc_ctype") == 0)
196 		{
197 			if (dctype)
198 				ereport(ERROR,
199 						(errcode(ERRCODE_SYNTAX_ERROR),
200 						 errmsg("conflicting or redundant options"),
201 						 parser_errposition(pstate, defel->location)));
202 			dctype = defel;
203 		}
204 		else if (strcmp(defel->defname, "is_template") == 0)
205 		{
206 			if (distemplate)
207 				ereport(ERROR,
208 						(errcode(ERRCODE_SYNTAX_ERROR),
209 						 errmsg("conflicting or redundant options"),
210 						 parser_errposition(pstate, defel->location)));
211 			distemplate = defel;
212 		}
213 		else if (strcmp(defel->defname, "allow_connections") == 0)
214 		{
215 			if (dallowconnections)
216 				ereport(ERROR,
217 						(errcode(ERRCODE_SYNTAX_ERROR),
218 						 errmsg("conflicting or redundant options"),
219 						 parser_errposition(pstate, defel->location)));
220 			dallowconnections = defel;
221 		}
222 		else if (strcmp(defel->defname, "connection_limit") == 0)
223 		{
224 			if (dconnlimit)
225 				ereport(ERROR,
226 						(errcode(ERRCODE_SYNTAX_ERROR),
227 						 errmsg("conflicting or redundant options"),
228 						 parser_errposition(pstate, defel->location)));
229 			dconnlimit = defel;
230 		}
231 		else if (strcmp(defel->defname, "location") == 0)
232 		{
233 			ereport(WARNING,
234 					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
235 					 errmsg("LOCATION is not supported anymore"),
236 					 errhint("Consider using tablespaces instead."),
237 					 parser_errposition(pstate, defel->location)));
238 		}
239 		else
240 			ereport(ERROR,
241 					(errcode(ERRCODE_SYNTAX_ERROR),
242 					 errmsg("option \"%s\" not recognized", defel->defname),
243 					 parser_errposition(pstate, defel->location)));
244 	}
245 
246 	if (downer && downer->arg)
247 		dbowner = defGetString(downer);
248 	if (dtemplate && dtemplate->arg)
249 		dbtemplate = defGetString(dtemplate);
250 	if (dencoding && dencoding->arg)
251 	{
252 		const char *encoding_name;
253 
254 		if (IsA(dencoding->arg, Integer))
255 		{
256 			encoding = defGetInt32(dencoding);
257 			encoding_name = pg_encoding_to_char(encoding);
258 			if (strcmp(encoding_name, "") == 0 ||
259 				pg_valid_server_encoding(encoding_name) < 0)
260 				ereport(ERROR,
261 						(errcode(ERRCODE_UNDEFINED_OBJECT),
262 						 errmsg("%d is not a valid encoding code",
263 								encoding),
264 						 parser_errposition(pstate, dencoding->location)));
265 		}
266 		else
267 		{
268 			encoding_name = defGetString(dencoding);
269 			encoding = pg_valid_server_encoding(encoding_name);
270 			if (encoding < 0)
271 				ereport(ERROR,
272 						(errcode(ERRCODE_UNDEFINED_OBJECT),
273 						 errmsg("%s is not a valid encoding name",
274 								encoding_name),
275 						 parser_errposition(pstate, dencoding->location)));
276 		}
277 	}
278 	if (dcollate && dcollate->arg)
279 		dbcollate = defGetString(dcollate);
280 	if (dctype && dctype->arg)
281 		dbctype = defGetString(dctype);
282 	if (distemplate && distemplate->arg)
283 		dbistemplate = defGetBoolean(distemplate);
284 	if (dallowconnections && dallowconnections->arg)
285 		dballowconnections = defGetBoolean(dallowconnections);
286 	if (dconnlimit && dconnlimit->arg)
287 	{
288 		dbconnlimit = defGetInt32(dconnlimit);
289 		if (dbconnlimit < -1)
290 			ereport(ERROR,
291 					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
292 					 errmsg("invalid connection limit: %d", dbconnlimit)));
293 	}
294 
295 	/* obtain OID of proposed owner */
296 	if (dbowner)
297 		datdba = get_role_oid(dbowner, false);
298 	else
299 		datdba = GetUserId();
300 
301 	/*
302 	 * To create a database, must have createdb privilege and must be able to
303 	 * become the target role (this does not imply that the target role itself
304 	 * must have createdb privilege).  The latter provision guards against
305 	 * "giveaway" attacks.  Note that a superuser will always have both of
306 	 * these privileges a fortiori.
307 	 */
308 	if (!have_createdb_privilege())
309 		ereport(ERROR,
310 				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
311 				 errmsg("permission denied to create database")));
312 
313 	check_is_member_of_role(GetUserId(), datdba);
314 
315 	/*
316 	 * Lookup database (template) to be cloned, and obtain share lock on it.
317 	 * ShareLock allows two CREATE DATABASEs to work from the same template
318 	 * concurrently, while ensuring no one is busy dropping it in parallel
319 	 * (which would be Very Bad since we'd likely get an incomplete copy
320 	 * without knowing it).  This also prevents any new connections from being
321 	 * made to the source until we finish copying it, so we can be sure it
322 	 * won't change underneath us.
323 	 */
324 	if (!dbtemplate)
325 		dbtemplate = "template1";	/* Default template database name */
326 
327 	if (!get_db_info(dbtemplate, ShareLock,
328 					 &src_dboid, &src_owner, &src_encoding,
329 					 &src_istemplate, &src_allowconn, &src_lastsysoid,
330 					 &src_frozenxid, &src_minmxid, &src_deftablespace,
331 					 &src_collate, &src_ctype))
332 		ereport(ERROR,
333 				(errcode(ERRCODE_UNDEFINED_DATABASE),
334 				 errmsg("template database \"%s\" does not exist",
335 						dbtemplate)));
336 
337 	/*
338 	 * Permission check: to copy a DB that's not marked datistemplate, you
339 	 * must be superuser or the owner thereof.
340 	 */
341 	if (!src_istemplate)
342 	{
343 		if (!pg_database_ownercheck(src_dboid, GetUserId()))
344 			ereport(ERROR,
345 					(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
346 					 errmsg("permission denied to copy database \"%s\"",
347 							dbtemplate)));
348 	}
349 
350 	/* If encoding or locales are defaulted, use source's setting */
351 	if (encoding < 0)
352 		encoding = src_encoding;
353 	if (dbcollate == NULL)
354 		dbcollate = src_collate;
355 	if (dbctype == NULL)
356 		dbctype = src_ctype;
357 
358 	/* Some encodings are client only */
359 	if (!PG_VALID_BE_ENCODING(encoding))
360 		ereport(ERROR,
361 				(errcode(ERRCODE_WRONG_OBJECT_TYPE),
362 				 errmsg("invalid server encoding %d", encoding)));
363 
364 	/* Check that the chosen locales are valid, and get canonical spellings */
365 	if (!check_locale(LC_COLLATE, dbcollate, &canonname))
366 		ereport(ERROR,
367 				(errcode(ERRCODE_WRONG_OBJECT_TYPE),
368 				 errmsg("invalid locale name: \"%s\"", dbcollate)));
369 	dbcollate = canonname;
370 	if (!check_locale(LC_CTYPE, dbctype, &canonname))
371 		ereport(ERROR,
372 				(errcode(ERRCODE_WRONG_OBJECT_TYPE),
373 				 errmsg("invalid locale name: \"%s\"", dbctype)));
374 	dbctype = canonname;
375 
376 	check_encoding_locale_matches(encoding, dbcollate, dbctype);
377 
378 	/*
379 	 * Check that the new encoding and locale settings match the source
380 	 * database.  We insist on this because we simply copy the source data ---
381 	 * any non-ASCII data would be wrongly encoded, and any indexes sorted
382 	 * according to the source locale would be wrong.
383 	 *
384 	 * However, we assume that template0 doesn't contain any non-ASCII data
385 	 * nor any indexes that depend on collation or ctype, so template0 can be
386 	 * used as template for creating a database with any encoding or locale.
387 	 */
388 	if (strcmp(dbtemplate, "template0") != 0)
389 	{
390 		if (encoding != src_encoding)
391 			ereport(ERROR,
392 					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
393 					 errmsg("new encoding (%s) is incompatible with the encoding of the template database (%s)",
394 							pg_encoding_to_char(encoding),
395 							pg_encoding_to_char(src_encoding)),
396 					 errhint("Use the same encoding as in the template database, or use template0 as template.")));
397 
398 		if (strcmp(dbcollate, src_collate) != 0)
399 			ereport(ERROR,
400 					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
401 					 errmsg("new collation (%s) is incompatible with the collation of the template database (%s)",
402 							dbcollate, src_collate),
403 					 errhint("Use the same collation as in the template database, or use template0 as template.")));
404 
405 		if (strcmp(dbctype, src_ctype) != 0)
406 			ereport(ERROR,
407 					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
408 					 errmsg("new LC_CTYPE (%s) is incompatible with the LC_CTYPE of the template database (%s)",
409 							dbctype, src_ctype),
410 					 errhint("Use the same LC_CTYPE as in the template database, or use template0 as template.")));
411 	}
412 
413 	/* Resolve default tablespace for new database */
414 	if (dtablespacename && dtablespacename->arg)
415 	{
416 		char	   *tablespacename;
417 		AclResult	aclresult;
418 
419 		tablespacename = defGetString(dtablespacename);
420 		dst_deftablespace = get_tablespace_oid(tablespacename, false);
421 		/* check permissions */
422 		aclresult = pg_tablespace_aclcheck(dst_deftablespace, GetUserId(),
423 										   ACL_CREATE);
424 		if (aclresult != ACLCHECK_OK)
425 			aclcheck_error(aclresult, OBJECT_TABLESPACE,
426 						   tablespacename);
427 
428 		/* pg_global must never be the default tablespace */
429 		if (dst_deftablespace == GLOBALTABLESPACE_OID)
430 			ereport(ERROR,
431 					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
432 					 errmsg("pg_global cannot be used as default tablespace")));
433 
434 		/*
435 		 * If we are trying to change the default tablespace of the template,
436 		 * we require that the template not have any files in the new default
437 		 * tablespace.  This is necessary because otherwise the copied
438 		 * database would contain pg_class rows that refer to its default
439 		 * tablespace both explicitly (by OID) and implicitly (as zero), which
440 		 * would cause problems.  For example another CREATE DATABASE using
441 		 * the copied database as template, and trying to change its default
442 		 * tablespace again, would yield outright incorrect results (it would
443 		 * improperly move tables to the new default tablespace that should
444 		 * stay in the same tablespace).
445 		 */
446 		if (dst_deftablespace != src_deftablespace)
447 		{
448 			char	   *srcpath;
449 			struct stat st;
450 
451 			srcpath = GetDatabasePath(src_dboid, dst_deftablespace);
452 
453 			if (stat(srcpath, &st) == 0 &&
454 				S_ISDIR(st.st_mode) &&
455 				!directory_is_empty(srcpath))
456 				ereport(ERROR,
457 						(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
458 						 errmsg("cannot assign new default tablespace \"%s\"",
459 								tablespacename),
460 						 errdetail("There is a conflict because database \"%s\" already has some tables in this tablespace.",
461 								   dbtemplate)));
462 			pfree(srcpath);
463 		}
464 	}
465 	else
466 	{
467 		/* Use template database's default tablespace */
468 		dst_deftablespace = src_deftablespace;
469 		/* Note there is no additional permission check in this path */
470 	}
471 
472 	/*
473 	 * Check for db name conflict.  This is just to give a more friendly error
474 	 * message than "unique index violation".  There's a race condition but
475 	 * we're willing to accept the less friendly message in that case.
476 	 */
477 	if (OidIsValid(get_database_oid(dbname, true)))
478 		ereport(ERROR,
479 				(errcode(ERRCODE_DUPLICATE_DATABASE),
480 				 errmsg("database \"%s\" already exists", dbname)));
481 
482 	/*
483 	 * The source DB can't have any active backends, except this one
484 	 * (exception is to allow CREATE DB while connected to template1).
485 	 * Otherwise we might copy inconsistent data.
486 	 *
487 	 * This should be last among the basic error checks, because it involves
488 	 * potential waiting; we may as well throw an error first if we're gonna
489 	 * throw one.
490 	 */
491 	if (CountOtherDBBackends(src_dboid, &notherbackends, &npreparedxacts))
492 		ereport(ERROR,
493 				(errcode(ERRCODE_OBJECT_IN_USE),
494 				 errmsg("source database \"%s\" is being accessed by other users",
495 						dbtemplate),
496 				 errdetail_busy_db(notherbackends, npreparedxacts)));
497 
498 	/*
499 	 * Select an OID for the new database, checking that it doesn't have a
500 	 * filename conflict with anything already existing in the tablespace
501 	 * directories.
502 	 */
503 	pg_database_rel = heap_open(DatabaseRelationId, RowExclusiveLock);
504 
505 	do
506 	{
507 		dboid = GetNewOid(pg_database_rel);
508 	} while (check_db_file_conflict(dboid));
509 
510 	/*
511 	 * Insert a new tuple into pg_database.  This establishes our ownership of
512 	 * the new database name (anyone else trying to insert the same name will
513 	 * block on the unique index, and fail after we commit).
514 	 */
515 
516 	/* Form tuple */
517 	MemSet(new_record, 0, sizeof(new_record));
518 	MemSet(new_record_nulls, false, sizeof(new_record_nulls));
519 
520 	new_record[Anum_pg_database_datname - 1] =
521 		DirectFunctionCall1(namein, CStringGetDatum(dbname));
522 	new_record[Anum_pg_database_datdba - 1] = ObjectIdGetDatum(datdba);
523 	new_record[Anum_pg_database_encoding - 1] = Int32GetDatum(encoding);
524 	new_record[Anum_pg_database_datcollate - 1] =
525 		DirectFunctionCall1(namein, CStringGetDatum(dbcollate));
526 	new_record[Anum_pg_database_datctype - 1] =
527 		DirectFunctionCall1(namein, CStringGetDatum(dbctype));
528 	new_record[Anum_pg_database_datistemplate - 1] = BoolGetDatum(dbistemplate);
529 	new_record[Anum_pg_database_datallowconn - 1] = BoolGetDatum(dballowconnections);
530 	new_record[Anum_pg_database_datconnlimit - 1] = Int32GetDatum(dbconnlimit);
531 	new_record[Anum_pg_database_datlastsysoid - 1] = ObjectIdGetDatum(src_lastsysoid);
532 	new_record[Anum_pg_database_datfrozenxid - 1] = TransactionIdGetDatum(src_frozenxid);
533 	new_record[Anum_pg_database_datminmxid - 1] = TransactionIdGetDatum(src_minmxid);
534 	new_record[Anum_pg_database_dattablespace - 1] = ObjectIdGetDatum(dst_deftablespace);
535 
536 	/*
537 	 * We deliberately set datacl to default (NULL), rather than copying it
538 	 * from the template database.  Copying it would be a bad idea when the
539 	 * owner is not the same as the template's owner.
540 	 */
541 	new_record_nulls[Anum_pg_database_datacl - 1] = true;
542 
543 	tuple = heap_form_tuple(RelationGetDescr(pg_database_rel),
544 							new_record, new_record_nulls);
545 
546 	HeapTupleSetOid(tuple, dboid);
547 
548 	CatalogTupleInsert(pg_database_rel, tuple);
549 
550 	/*
551 	 * Now generate additional catalog entries associated with the new DB
552 	 */
553 
554 	/* Register owner dependency */
555 	recordDependencyOnOwner(DatabaseRelationId, dboid, datdba);
556 
557 	/* Create pg_shdepend entries for objects within database */
558 	copyTemplateDependencies(src_dboid, dboid);
559 
560 	/* Post creation hook for new database */
561 	InvokeObjectPostCreateHook(DatabaseRelationId, dboid, 0);
562 
563 	/*
564 	 * Force a checkpoint before starting the copy. This will force all dirty
565 	 * buffers, including those of unlogged tables, out to disk, to ensure
566 	 * source database is up-to-date on disk for the copy.
567 	 * FlushDatabaseBuffers() would suffice for that, but we also want to
568 	 * process any pending unlink requests. Otherwise, if a checkpoint
569 	 * happened while we're copying files, a file might be deleted just when
570 	 * we're about to copy it, causing the lstat() call in copydir() to fail
571 	 * with ENOENT.
572 	 */
573 	RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT
574 					  | CHECKPOINT_FLUSH_ALL);
575 
576 	/*
577 	 * Once we start copying subdirectories, we need to be able to clean 'em
578 	 * up if we fail.  Use an ENSURE block to make sure this happens.  (This
579 	 * is not a 100% solution, because of the possibility of failure during
580 	 * transaction commit after we leave this routine, but it should handle
581 	 * most scenarios.)
582 	 */
583 	fparms.src_dboid = src_dboid;
584 	fparms.dest_dboid = dboid;
585 	PG_ENSURE_ERROR_CLEANUP(createdb_failure_callback,
586 							PointerGetDatum(&fparms));
587 	{
588 		/*
589 		 * Iterate through all tablespaces of the template database, and copy
590 		 * each one to the new database.
591 		 */
592 		rel = heap_open(TableSpaceRelationId, AccessShareLock);
593 		scan = heap_beginscan_catalog(rel, 0, NULL);
594 		while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
595 		{
596 			Oid			srctablespace = HeapTupleGetOid(tuple);
597 			Oid			dsttablespace;
598 			char	   *srcpath;
599 			char	   *dstpath;
600 			struct stat st;
601 
602 			/* No need to copy global tablespace */
603 			if (srctablespace == GLOBALTABLESPACE_OID)
604 				continue;
605 
606 			srcpath = GetDatabasePath(src_dboid, srctablespace);
607 
608 			if (stat(srcpath, &st) < 0 || !S_ISDIR(st.st_mode) ||
609 				directory_is_empty(srcpath))
610 			{
611 				/* Assume we can ignore it */
612 				pfree(srcpath);
613 				continue;
614 			}
615 
616 			if (srctablespace == src_deftablespace)
617 				dsttablespace = dst_deftablespace;
618 			else
619 				dsttablespace = srctablespace;
620 
621 			dstpath = GetDatabasePath(dboid, dsttablespace);
622 
623 			/*
624 			 * Copy this subdirectory to the new location
625 			 *
626 			 * We don't need to copy subdirectories
627 			 */
628 			copydir(srcpath, dstpath, false);
629 
630 			/* Record the filesystem change in XLOG */
631 			{
632 				xl_dbase_create_rec xlrec;
633 
634 				xlrec.db_id = dboid;
635 				xlrec.tablespace_id = dsttablespace;
636 				xlrec.src_db_id = src_dboid;
637 				xlrec.src_tablespace_id = srctablespace;
638 
639 				XLogBeginInsert();
640 				XLogRegisterData((char *) &xlrec, sizeof(xl_dbase_create_rec));
641 
642 				(void) XLogInsert(RM_DBASE_ID,
643 								  XLOG_DBASE_CREATE | XLR_SPECIAL_REL_UPDATE);
644 			}
645 		}
646 		heap_endscan(scan);
647 		heap_close(rel, AccessShareLock);
648 
649 		/*
650 		 * We force a checkpoint before committing.  This effectively means
651 		 * that committed XLOG_DBASE_CREATE operations will never need to be
652 		 * replayed (at least not in ordinary crash recovery; we still have to
653 		 * make the XLOG entry for the benefit of PITR operations). This
654 		 * avoids two nasty scenarios:
655 		 *
656 		 * #1: When PITR is off, we don't XLOG the contents of newly created
657 		 * indexes; therefore the drop-and-recreate-whole-directory behavior
658 		 * of DBASE_CREATE replay would lose such indexes.
659 		 *
660 		 * #2: Since we have to recopy the source database during DBASE_CREATE
661 		 * replay, we run the risk of copying changes in it that were
662 		 * committed after the original CREATE DATABASE command but before the
663 		 * system crash that led to the replay.  This is at least unexpected
664 		 * and at worst could lead to inconsistencies, eg duplicate table
665 		 * names.
666 		 *
667 		 * (Both of these were real bugs in releases 8.0 through 8.0.3.)
668 		 *
669 		 * In PITR replay, the first of these isn't an issue, and the second
670 		 * is only a risk if the CREATE DATABASE and subsequent template
671 		 * database change both occur while a base backup is being taken.
672 		 * There doesn't seem to be much we can do about that except document
673 		 * it as a limitation.
674 		 *
675 		 * Perhaps if we ever implement CREATE DATABASE in a less cheesy way,
676 		 * we can avoid this.
677 		 */
678 		RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT);
679 
680 		/*
681 		 * Close pg_database, but keep lock till commit.
682 		 */
683 		heap_close(pg_database_rel, NoLock);
684 
685 		/*
686 		 * Force synchronous commit, thus minimizing the window between
687 		 * creation of the database files and committal of the transaction. If
688 		 * we crash before committing, we'll have a DB that's taking up disk
689 		 * space but is not in pg_database, which is not good.
690 		 */
691 		ForceSyncCommit();
692 	}
693 	PG_END_ENSURE_ERROR_CLEANUP(createdb_failure_callback,
694 								PointerGetDatum(&fparms));
695 
696 	return dboid;
697 }
698 
699 /*
700  * Check whether chosen encoding matches chosen locale settings.  This
701  * restriction is necessary because libc's locale-specific code usually
702  * fails when presented with data in an encoding it's not expecting. We
703  * allow mismatch in four cases:
704  *
705  * 1. locale encoding = SQL_ASCII, which means that the locale is C/POSIX
706  * which works with any encoding.
707  *
708  * 2. locale encoding = -1, which means that we couldn't determine the
709  * locale's encoding and have to trust the user to get it right.
710  *
711  * 3. selected encoding is UTF8 and platform is win32. This is because
712  * UTF8 is a pseudo codepage that is supported in all locales since it's
713  * converted to UTF16 before being used.
714  *
715  * 4. selected encoding is SQL_ASCII, but only if you're a superuser. This
716  * is risky but we have historically allowed it --- notably, the
717  * regression tests require it.
718  *
719  * Note: if you change this policy, fix initdb to match.
720  */
721 void
check_encoding_locale_matches(int encoding,const char * collate,const char * ctype)722 check_encoding_locale_matches(int encoding, const char *collate, const char *ctype)
723 {
724 	int			ctype_encoding = pg_get_encoding_from_locale(ctype, true);
725 	int			collate_encoding = pg_get_encoding_from_locale(collate, true);
726 
727 	if (!(ctype_encoding == encoding ||
728 		  ctype_encoding == PG_SQL_ASCII ||
729 		  ctype_encoding == -1 ||
730 #ifdef WIN32
731 		  encoding == PG_UTF8 ||
732 #endif
733 		  (encoding == PG_SQL_ASCII && superuser())))
734 		ereport(ERROR,
735 				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
736 				 errmsg("encoding \"%s\" does not match locale \"%s\"",
737 						pg_encoding_to_char(encoding),
738 						ctype),
739 				 errdetail("The chosen LC_CTYPE setting requires encoding \"%s\".",
740 						   pg_encoding_to_char(ctype_encoding))));
741 
742 	if (!(collate_encoding == encoding ||
743 		  collate_encoding == PG_SQL_ASCII ||
744 		  collate_encoding == -1 ||
745 #ifdef WIN32
746 		  encoding == PG_UTF8 ||
747 #endif
748 		  (encoding == PG_SQL_ASCII && superuser())))
749 		ereport(ERROR,
750 				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
751 				 errmsg("encoding \"%s\" does not match locale \"%s\"",
752 						pg_encoding_to_char(encoding),
753 						collate),
754 				 errdetail("The chosen LC_COLLATE setting requires encoding \"%s\".",
755 						   pg_encoding_to_char(collate_encoding))));
756 }
757 
758 /* Error cleanup callback for createdb */
759 static void
createdb_failure_callback(int code,Datum arg)760 createdb_failure_callback(int code, Datum arg)
761 {
762 	createdb_failure_params *fparms = (createdb_failure_params *) DatumGetPointer(arg);
763 
764 	/*
765 	 * Release lock on source database before doing recursive remove. This is
766 	 * not essential but it seems desirable to release the lock as soon as
767 	 * possible.
768 	 */
769 	UnlockSharedObject(DatabaseRelationId, fparms->src_dboid, 0, ShareLock);
770 
771 	/* Throw away any successfully copied subdirectories */
772 	remove_dbtablespaces(fparms->dest_dboid);
773 }
774 
775 
776 /*
777  * DROP DATABASE
778  */
779 void
dropdb(const char * dbname,bool missing_ok)780 dropdb(const char *dbname, bool missing_ok)
781 {
782 	Oid			db_id;
783 	bool		db_istemplate;
784 	Relation	pgdbrel;
785 	HeapTuple	tup;
786 	int			notherbackends;
787 	int			npreparedxacts;
788 	int			nslots,
789 				nslots_active;
790 	int			nsubscriptions;
791 
792 	/*
793 	 * Look up the target database's OID, and get exclusive lock on it. We
794 	 * need this to ensure that no new backend starts up in the target
795 	 * database while we are deleting it (see postinit.c), and that no one is
796 	 * using it as a CREATE DATABASE template or trying to delete it for
797 	 * themselves.
798 	 */
799 	pgdbrel = heap_open(DatabaseRelationId, RowExclusiveLock);
800 
801 	if (!get_db_info(dbname, AccessExclusiveLock, &db_id, NULL, NULL,
802 					 &db_istemplate, NULL, NULL, NULL, NULL, NULL, NULL, NULL))
803 	{
804 		if (!missing_ok)
805 		{
806 			ereport(ERROR,
807 					(errcode(ERRCODE_UNDEFINED_DATABASE),
808 					 errmsg("database \"%s\" does not exist", dbname)));
809 		}
810 		else
811 		{
812 			/* Close pg_database, release the lock, since we changed nothing */
813 			heap_close(pgdbrel, RowExclusiveLock);
814 			ereport(NOTICE,
815 					(errmsg("database \"%s\" does not exist, skipping",
816 							dbname)));
817 			return;
818 		}
819 	}
820 
821 	/*
822 	 * Permission checks
823 	 */
824 	if (!pg_database_ownercheck(db_id, GetUserId()))
825 		aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
826 					   dbname);
827 
828 	/* DROP hook for the database being removed */
829 	InvokeObjectDropHook(DatabaseRelationId, db_id, 0);
830 
831 	/*
832 	 * Disallow dropping a DB that is marked istemplate.  This is just to
833 	 * prevent people from accidentally dropping template0 or template1; they
834 	 * can do so if they're really determined ...
835 	 */
836 	if (db_istemplate)
837 		ereport(ERROR,
838 				(errcode(ERRCODE_WRONG_OBJECT_TYPE),
839 				 errmsg("cannot drop a template database")));
840 
841 	/* Obviously can't drop my own database */
842 	if (db_id == MyDatabaseId)
843 		ereport(ERROR,
844 				(errcode(ERRCODE_OBJECT_IN_USE),
845 				 errmsg("cannot drop the currently open database")));
846 
847 	/*
848 	 * Check whether there are active logical slots that refer to the
849 	 * to-be-dropped database. The database lock we are holding prevents the
850 	 * creation of new slots using the database or existing slots becoming
851 	 * active.
852 	 */
853 	(void) ReplicationSlotsCountDBSlots(db_id, &nslots, &nslots_active);
854 	if (nslots_active)
855 	{
856 		ereport(ERROR,
857 				(errcode(ERRCODE_OBJECT_IN_USE),
858 				 errmsg("database \"%s\" is used by an active logical replication slot",
859 						dbname),
860 				 errdetail_plural("There is %d active slot.",
861 								  "There are %d active slots.",
862 								  nslots_active, nslots_active)));
863 	}
864 
865 	/*
866 	 * Check for other backends in the target database.  (Because we hold the
867 	 * database lock, no new ones can start after this.)
868 	 *
869 	 * As in CREATE DATABASE, check this after other error conditions.
870 	 */
871 	if (CountOtherDBBackends(db_id, &notherbackends, &npreparedxacts))
872 		ereport(ERROR,
873 				(errcode(ERRCODE_OBJECT_IN_USE),
874 				 errmsg("database \"%s\" is being accessed by other users",
875 						dbname),
876 				 errdetail_busy_db(notherbackends, npreparedxacts)));
877 
878 	/*
879 	 * Check if there are subscriptions defined in the target database.
880 	 *
881 	 * We can't drop them automatically because they might be holding
882 	 * resources in other databases/instances.
883 	 */
884 	if ((nsubscriptions = CountDBSubscriptions(db_id)) > 0)
885 		ereport(ERROR,
886 				(errcode(ERRCODE_OBJECT_IN_USE),
887 				 errmsg("database \"%s\" is being used by logical replication subscription",
888 						dbname),
889 				 errdetail_plural("There is %d subscription.",
890 								  "There are %d subscriptions.",
891 								  nsubscriptions, nsubscriptions)));
892 
893 	/*
894 	 * Remove the database's tuple from pg_database.
895 	 */
896 	tup = SearchSysCache1(DATABASEOID, ObjectIdGetDatum(db_id));
897 	if (!HeapTupleIsValid(tup))
898 		elog(ERROR, "cache lookup failed for database %u", db_id);
899 
900 	CatalogTupleDelete(pgdbrel, &tup->t_self);
901 
902 	ReleaseSysCache(tup);
903 
904 	/*
905 	 * Delete any comments or security labels associated with the database.
906 	 */
907 	DeleteSharedComments(db_id, DatabaseRelationId);
908 	DeleteSharedSecurityLabel(db_id, DatabaseRelationId);
909 
910 	/*
911 	 * Remove settings associated with this database
912 	 */
913 	DropSetting(db_id, InvalidOid);
914 
915 	/*
916 	 * Remove shared dependency references for the database.
917 	 */
918 	dropDatabaseDependencies(db_id);
919 
920 	/*
921 	 * Drop db-specific replication slots.
922 	 */
923 	ReplicationSlotsDropDBSlots(db_id);
924 
925 	/*
926 	 * Drop pages for this database that are in the shared buffer cache. This
927 	 * is important to ensure that no remaining backend tries to write out a
928 	 * dirty buffer to the dead database later...
929 	 */
930 	DropDatabaseBuffers(db_id);
931 
932 	/*
933 	 * Tell the stats collector to forget it immediately, too.
934 	 */
935 	pgstat_drop_database(db_id);
936 
937 	/*
938 	 * Tell checkpointer to forget any pending fsync and unlink requests for
939 	 * files in the database; else the fsyncs will fail at next checkpoint, or
940 	 * worse, it will delete files that belong to a newly created database
941 	 * with the same OID.
942 	 */
943 	ForgetDatabaseFsyncRequests(db_id);
944 
945 	/*
946 	 * Force a checkpoint to make sure the checkpointer has received the
947 	 * message sent by ForgetDatabaseFsyncRequests. On Windows, this also
948 	 * ensures that background procs don't hold any open files, which would
949 	 * cause rmdir() to fail.
950 	 */
951 	RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT);
952 
953 	/*
954 	 * Remove all tablespace subdirs belonging to the database.
955 	 */
956 	remove_dbtablespaces(db_id);
957 
958 	/*
959 	 * Close pg_database, but keep lock till commit.
960 	 */
961 	heap_close(pgdbrel, NoLock);
962 
963 	/*
964 	 * Force synchronous commit, thus minimizing the window between removal of
965 	 * the database files and committal of the transaction. If we crash before
966 	 * committing, we'll have a DB that's gone on disk but still there
967 	 * according to pg_database, which is not good.
968 	 */
969 	ForceSyncCommit();
970 }
971 
972 
973 /*
974  * Rename database
975  */
976 ObjectAddress
RenameDatabase(const char * oldname,const char * newname)977 RenameDatabase(const char *oldname, const char *newname)
978 {
979 	Oid			db_id;
980 	HeapTuple	newtup;
981 	Relation	rel;
982 	int			notherbackends;
983 	int			npreparedxacts;
984 	ObjectAddress address;
985 
986 	/*
987 	 * Look up the target database's OID, and get exclusive lock on it. We
988 	 * need this for the same reasons as DROP DATABASE.
989 	 */
990 	rel = heap_open(DatabaseRelationId, RowExclusiveLock);
991 
992 	if (!get_db_info(oldname, AccessExclusiveLock, &db_id, NULL, NULL,
993 					 NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL))
994 		ereport(ERROR,
995 				(errcode(ERRCODE_UNDEFINED_DATABASE),
996 				 errmsg("database \"%s\" does not exist", oldname)));
997 
998 	/* must be owner */
999 	if (!pg_database_ownercheck(db_id, GetUserId()))
1000 		aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
1001 					   oldname);
1002 
1003 	/* must have createdb rights */
1004 	if (!have_createdb_privilege())
1005 		ereport(ERROR,
1006 				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1007 				 errmsg("permission denied to rename database")));
1008 
1009 	/*
1010 	 * Make sure the new name doesn't exist.  See notes for same error in
1011 	 * CREATE DATABASE.
1012 	 */
1013 	if (OidIsValid(get_database_oid(newname, true)))
1014 		ereport(ERROR,
1015 				(errcode(ERRCODE_DUPLICATE_DATABASE),
1016 				 errmsg("database \"%s\" already exists", newname)));
1017 
1018 	/*
1019 	 * XXX Client applications probably store the current database somewhere,
1020 	 * so renaming it could cause confusion.  On the other hand, there may not
1021 	 * be an actual problem besides a little confusion, so think about this
1022 	 * and decide.
1023 	 */
1024 	if (db_id == MyDatabaseId)
1025 		ereport(ERROR,
1026 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1027 				 errmsg("current database cannot be renamed")));
1028 
1029 	/*
1030 	 * Make sure the database does not have active sessions.  This is the same
1031 	 * concern as above, but applied to other sessions.
1032 	 *
1033 	 * As in CREATE DATABASE, check this after other error conditions.
1034 	 */
1035 	if (CountOtherDBBackends(db_id, &notherbackends, &npreparedxacts))
1036 		ereport(ERROR,
1037 				(errcode(ERRCODE_OBJECT_IN_USE),
1038 				 errmsg("database \"%s\" is being accessed by other users",
1039 						oldname),
1040 				 errdetail_busy_db(notherbackends, npreparedxacts)));
1041 
1042 	/* rename */
1043 	newtup = SearchSysCacheCopy1(DATABASEOID, ObjectIdGetDatum(db_id));
1044 	if (!HeapTupleIsValid(newtup))
1045 		elog(ERROR, "cache lookup failed for database %u", db_id);
1046 	namestrcpy(&(((Form_pg_database) GETSTRUCT(newtup))->datname), newname);
1047 	CatalogTupleUpdate(rel, &newtup->t_self, newtup);
1048 
1049 	InvokeObjectPostAlterHook(DatabaseRelationId, db_id, 0);
1050 
1051 	ObjectAddressSet(address, DatabaseRelationId, db_id);
1052 
1053 	/*
1054 	 * Close pg_database, but keep lock till commit.
1055 	 */
1056 	heap_close(rel, NoLock);
1057 
1058 	return address;
1059 }
1060 
1061 
1062 /*
1063  * ALTER DATABASE SET TABLESPACE
1064  */
1065 static void
movedb(const char * dbname,const char * tblspcname)1066 movedb(const char *dbname, const char *tblspcname)
1067 {
1068 	Oid			db_id;
1069 	Relation	pgdbrel;
1070 	int			notherbackends;
1071 	int			npreparedxacts;
1072 	HeapTuple	oldtuple,
1073 				newtuple;
1074 	Oid			src_tblspcoid,
1075 				dst_tblspcoid;
1076 	Datum		new_record[Natts_pg_database];
1077 	bool		new_record_nulls[Natts_pg_database];
1078 	bool		new_record_repl[Natts_pg_database];
1079 	ScanKeyData scankey;
1080 	SysScanDesc sysscan;
1081 	AclResult	aclresult;
1082 	char	   *src_dbpath;
1083 	char	   *dst_dbpath;
1084 	DIR		   *dstdir;
1085 	struct dirent *xlde;
1086 	movedb_failure_params fparms;
1087 
1088 	/*
1089 	 * Look up the target database's OID, and get exclusive lock on it. We
1090 	 * need this to ensure that no new backend starts up in the database while
1091 	 * we are moving it, and that no one is using it as a CREATE DATABASE
1092 	 * template or trying to delete it.
1093 	 */
1094 	pgdbrel = heap_open(DatabaseRelationId, RowExclusiveLock);
1095 
1096 	if (!get_db_info(dbname, AccessExclusiveLock, &db_id, NULL, NULL,
1097 					 NULL, NULL, NULL, NULL, NULL, &src_tblspcoid, NULL, NULL))
1098 		ereport(ERROR,
1099 				(errcode(ERRCODE_UNDEFINED_DATABASE),
1100 				 errmsg("database \"%s\" does not exist", dbname)));
1101 
1102 	/*
1103 	 * We actually need a session lock, so that the lock will persist across
1104 	 * the commit/restart below.  (We could almost get away with letting the
1105 	 * lock be released at commit, except that someone could try to move
1106 	 * relations of the DB back into the old directory while we rmtree() it.)
1107 	 */
1108 	LockSharedObjectForSession(DatabaseRelationId, db_id, 0,
1109 							   AccessExclusiveLock);
1110 
1111 	/*
1112 	 * Permission checks
1113 	 */
1114 	if (!pg_database_ownercheck(db_id, GetUserId()))
1115 		aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
1116 					   dbname);
1117 
1118 	/*
1119 	 * Obviously can't move the tables of my own database
1120 	 */
1121 	if (db_id == MyDatabaseId)
1122 		ereport(ERROR,
1123 				(errcode(ERRCODE_OBJECT_IN_USE),
1124 				 errmsg("cannot change the tablespace of the currently open database")));
1125 
1126 	/*
1127 	 * Get tablespace's oid
1128 	 */
1129 	dst_tblspcoid = get_tablespace_oid(tblspcname, false);
1130 
1131 	/*
1132 	 * Permission checks
1133 	 */
1134 	aclresult = pg_tablespace_aclcheck(dst_tblspcoid, GetUserId(),
1135 									   ACL_CREATE);
1136 	if (aclresult != ACLCHECK_OK)
1137 		aclcheck_error(aclresult, OBJECT_TABLESPACE,
1138 					   tblspcname);
1139 
1140 	/*
1141 	 * pg_global must never be the default tablespace
1142 	 */
1143 	if (dst_tblspcoid == GLOBALTABLESPACE_OID)
1144 		ereport(ERROR,
1145 				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1146 				 errmsg("pg_global cannot be used as default tablespace")));
1147 
1148 	/*
1149 	 * No-op if same tablespace
1150 	 */
1151 	if (src_tblspcoid == dst_tblspcoid)
1152 	{
1153 		heap_close(pgdbrel, NoLock);
1154 		UnlockSharedObjectForSession(DatabaseRelationId, db_id, 0,
1155 									 AccessExclusiveLock);
1156 		return;
1157 	}
1158 
1159 	/*
1160 	 * Check for other backends in the target database.  (Because we hold the
1161 	 * database lock, no new ones can start after this.)
1162 	 *
1163 	 * As in CREATE DATABASE, check this after other error conditions.
1164 	 */
1165 	if (CountOtherDBBackends(db_id, &notherbackends, &npreparedxacts))
1166 		ereport(ERROR,
1167 				(errcode(ERRCODE_OBJECT_IN_USE),
1168 				 errmsg("database \"%s\" is being accessed by other users",
1169 						dbname),
1170 				 errdetail_busy_db(notherbackends, npreparedxacts)));
1171 
1172 	/*
1173 	 * Get old and new database paths
1174 	 */
1175 	src_dbpath = GetDatabasePath(db_id, src_tblspcoid);
1176 	dst_dbpath = GetDatabasePath(db_id, dst_tblspcoid);
1177 
1178 	/*
1179 	 * Force a checkpoint before proceeding. This will force all dirty
1180 	 * buffers, including those of unlogged tables, out to disk, to ensure
1181 	 * source database is up-to-date on disk for the copy.
1182 	 * FlushDatabaseBuffers() would suffice for that, but we also want to
1183 	 * process any pending unlink requests. Otherwise, the check for existing
1184 	 * files in the target directory might fail unnecessarily, not to mention
1185 	 * that the copy might fail due to source files getting deleted under it.
1186 	 * On Windows, this also ensures that background procs don't hold any open
1187 	 * files, which would cause rmdir() to fail.
1188 	 */
1189 	RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT
1190 					  | CHECKPOINT_FLUSH_ALL);
1191 
1192 	/*
1193 	 * Now drop all buffers holding data of the target database; they should
1194 	 * no longer be dirty so DropDatabaseBuffers is safe.
1195 	 *
1196 	 * It might seem that we could just let these buffers age out of shared
1197 	 * buffers naturally, since they should not get referenced anymore.  The
1198 	 * problem with that is that if the user later moves the database back to
1199 	 * its original tablespace, any still-surviving buffers would appear to
1200 	 * contain valid data again --- but they'd be missing any changes made in
1201 	 * the database while it was in the new tablespace.  In any case, freeing
1202 	 * buffers that should never be used again seems worth the cycles.
1203 	 *
1204 	 * Note: it'd be sufficient to get rid of buffers matching db_id and
1205 	 * src_tblspcoid, but bufmgr.c presently provides no API for that.
1206 	 */
1207 	DropDatabaseBuffers(db_id);
1208 
1209 	/*
1210 	 * Check for existence of files in the target directory, i.e., objects of
1211 	 * this database that are already in the target tablespace.  We can't
1212 	 * allow the move in such a case, because we would need to change those
1213 	 * relations' pg_class.reltablespace entries to zero, and we don't have
1214 	 * access to the DB's pg_class to do so.
1215 	 */
1216 	dstdir = AllocateDir(dst_dbpath);
1217 	if (dstdir != NULL)
1218 	{
1219 		while ((xlde = ReadDir(dstdir, dst_dbpath)) != NULL)
1220 		{
1221 			if (strcmp(xlde->d_name, ".") == 0 ||
1222 				strcmp(xlde->d_name, "..") == 0)
1223 				continue;
1224 
1225 			ereport(ERROR,
1226 					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1227 					 errmsg("some relations of database \"%s\" are already in tablespace \"%s\"",
1228 							dbname, tblspcname),
1229 					 errhint("You must move them back to the database's default tablespace before using this command.")));
1230 		}
1231 
1232 		FreeDir(dstdir);
1233 
1234 		/*
1235 		 * The directory exists but is empty. We must remove it before using
1236 		 * the copydir function.
1237 		 */
1238 		if (rmdir(dst_dbpath) != 0)
1239 			elog(ERROR, "could not remove directory \"%s\": %m",
1240 				 dst_dbpath);
1241 	}
1242 
1243 	/*
1244 	 * Use an ENSURE block to make sure we remove the debris if the copy fails
1245 	 * (eg, due to out-of-disk-space).  This is not a 100% solution, because
1246 	 * of the possibility of failure during transaction commit, but it should
1247 	 * handle most scenarios.
1248 	 */
1249 	fparms.dest_dboid = db_id;
1250 	fparms.dest_tsoid = dst_tblspcoid;
1251 	PG_ENSURE_ERROR_CLEANUP(movedb_failure_callback,
1252 							PointerGetDatum(&fparms));
1253 	{
1254 		/*
1255 		 * Copy files from the old tablespace to the new one
1256 		 */
1257 		copydir(src_dbpath, dst_dbpath, false);
1258 
1259 		/*
1260 		 * Record the filesystem change in XLOG
1261 		 */
1262 		{
1263 			xl_dbase_create_rec xlrec;
1264 
1265 			xlrec.db_id = db_id;
1266 			xlrec.tablespace_id = dst_tblspcoid;
1267 			xlrec.src_db_id = db_id;
1268 			xlrec.src_tablespace_id = src_tblspcoid;
1269 
1270 			XLogBeginInsert();
1271 			XLogRegisterData((char *) &xlrec, sizeof(xl_dbase_create_rec));
1272 
1273 			(void) XLogInsert(RM_DBASE_ID,
1274 							  XLOG_DBASE_CREATE | XLR_SPECIAL_REL_UPDATE);
1275 		}
1276 
1277 		/*
1278 		 * Update the database's pg_database tuple
1279 		 */
1280 		ScanKeyInit(&scankey,
1281 					Anum_pg_database_datname,
1282 					BTEqualStrategyNumber, F_NAMEEQ,
1283 					CStringGetDatum(dbname));
1284 		sysscan = systable_beginscan(pgdbrel, DatabaseNameIndexId, true,
1285 									 NULL, 1, &scankey);
1286 		oldtuple = systable_getnext(sysscan);
1287 		if (!HeapTupleIsValid(oldtuple))	/* shouldn't happen... */
1288 			ereport(ERROR,
1289 					(errcode(ERRCODE_UNDEFINED_DATABASE),
1290 					 errmsg("database \"%s\" does not exist", dbname)));
1291 
1292 		MemSet(new_record, 0, sizeof(new_record));
1293 		MemSet(new_record_nulls, false, sizeof(new_record_nulls));
1294 		MemSet(new_record_repl, false, sizeof(new_record_repl));
1295 
1296 		new_record[Anum_pg_database_dattablespace - 1] = ObjectIdGetDatum(dst_tblspcoid);
1297 		new_record_repl[Anum_pg_database_dattablespace - 1] = true;
1298 
1299 		newtuple = heap_modify_tuple(oldtuple, RelationGetDescr(pgdbrel),
1300 									 new_record,
1301 									 new_record_nulls, new_record_repl);
1302 		CatalogTupleUpdate(pgdbrel, &oldtuple->t_self, newtuple);
1303 
1304 		InvokeObjectPostAlterHook(DatabaseRelationId,
1305 								  HeapTupleGetOid(newtuple), 0);
1306 
1307 		systable_endscan(sysscan);
1308 
1309 		/*
1310 		 * Force another checkpoint here.  As in CREATE DATABASE, this is to
1311 		 * ensure that we don't have to replay a committed XLOG_DBASE_CREATE
1312 		 * operation, which would cause us to lose any unlogged operations
1313 		 * done in the new DB tablespace before the next checkpoint.
1314 		 */
1315 		RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT);
1316 
1317 		/*
1318 		 * Force synchronous commit, thus minimizing the window between
1319 		 * copying the database files and committal of the transaction. If we
1320 		 * crash before committing, we'll leave an orphaned set of files on
1321 		 * disk, which is not fatal but not good either.
1322 		 */
1323 		ForceSyncCommit();
1324 
1325 		/*
1326 		 * Close pg_database, but keep lock till commit.
1327 		 */
1328 		heap_close(pgdbrel, NoLock);
1329 	}
1330 	PG_END_ENSURE_ERROR_CLEANUP(movedb_failure_callback,
1331 								PointerGetDatum(&fparms));
1332 
1333 	/*
1334 	 * Commit the transaction so that the pg_database update is committed. If
1335 	 * we crash while removing files, the database won't be corrupt, we'll
1336 	 * just leave some orphaned files in the old directory.
1337 	 *
1338 	 * (This is OK because we know we aren't inside a transaction block.)
1339 	 *
1340 	 * XXX would it be safe/better to do this inside the ensure block?	Not
1341 	 * convinced it's a good idea; consider elog just after the transaction
1342 	 * really commits.
1343 	 */
1344 	PopActiveSnapshot();
1345 	CommitTransactionCommand();
1346 
1347 	/* Start new transaction for the remaining work; don't need a snapshot */
1348 	StartTransactionCommand();
1349 
1350 	/*
1351 	 * Remove files from the old tablespace
1352 	 */
1353 	if (!rmtree(src_dbpath, true))
1354 		ereport(WARNING,
1355 				(errmsg("some useless files may be left behind in old database directory \"%s\"",
1356 						src_dbpath)));
1357 
1358 	/*
1359 	 * Record the filesystem change in XLOG
1360 	 */
1361 	{
1362 		xl_dbase_drop_rec xlrec;
1363 
1364 		xlrec.db_id = db_id;
1365 		xlrec.tablespace_id = src_tblspcoid;
1366 
1367 		XLogBeginInsert();
1368 		XLogRegisterData((char *) &xlrec, sizeof(xl_dbase_drop_rec));
1369 
1370 		(void) XLogInsert(RM_DBASE_ID,
1371 						  XLOG_DBASE_DROP | XLR_SPECIAL_REL_UPDATE);
1372 	}
1373 
1374 	/* Now it's safe to release the database lock */
1375 	UnlockSharedObjectForSession(DatabaseRelationId, db_id, 0,
1376 								 AccessExclusiveLock);
1377 }
1378 
1379 /* Error cleanup callback for movedb */
1380 static void
movedb_failure_callback(int code,Datum arg)1381 movedb_failure_callback(int code, Datum arg)
1382 {
1383 	movedb_failure_params *fparms = (movedb_failure_params *) DatumGetPointer(arg);
1384 	char	   *dstpath;
1385 
1386 	/* Get rid of anything we managed to copy to the target directory */
1387 	dstpath = GetDatabasePath(fparms->dest_dboid, fparms->dest_tsoid);
1388 
1389 	(void) rmtree(dstpath, true);
1390 }
1391 
1392 
1393 /*
1394  * ALTER DATABASE name ...
1395  */
1396 Oid
AlterDatabase(ParseState * pstate,AlterDatabaseStmt * stmt,bool isTopLevel)1397 AlterDatabase(ParseState *pstate, AlterDatabaseStmt *stmt, bool isTopLevel)
1398 {
1399 	Relation	rel;
1400 	Oid			dboid;
1401 	HeapTuple	tuple,
1402 				newtuple;
1403 	ScanKeyData scankey;
1404 	SysScanDesc scan;
1405 	ListCell   *option;
1406 	bool		dbistemplate = false;
1407 	bool		dballowconnections = true;
1408 	int			dbconnlimit = -1;
1409 	DefElem    *distemplate = NULL;
1410 	DefElem    *dallowconnections = NULL;
1411 	DefElem    *dconnlimit = NULL;
1412 	DefElem    *dtablespace = NULL;
1413 	Datum		new_record[Natts_pg_database];
1414 	bool		new_record_nulls[Natts_pg_database];
1415 	bool		new_record_repl[Natts_pg_database];
1416 
1417 	/* Extract options from the statement node tree */
1418 	foreach(option, stmt->options)
1419 	{
1420 		DefElem    *defel = (DefElem *) lfirst(option);
1421 
1422 		if (strcmp(defel->defname, "is_template") == 0)
1423 		{
1424 			if (distemplate)
1425 				ereport(ERROR,
1426 						(errcode(ERRCODE_SYNTAX_ERROR),
1427 						 errmsg("conflicting or redundant options"),
1428 						 parser_errposition(pstate, defel->location)));
1429 			distemplate = defel;
1430 		}
1431 		else if (strcmp(defel->defname, "allow_connections") == 0)
1432 		{
1433 			if (dallowconnections)
1434 				ereport(ERROR,
1435 						(errcode(ERRCODE_SYNTAX_ERROR),
1436 						 errmsg("conflicting or redundant options"),
1437 						 parser_errposition(pstate, defel->location)));
1438 			dallowconnections = defel;
1439 		}
1440 		else if (strcmp(defel->defname, "connection_limit") == 0)
1441 		{
1442 			if (dconnlimit)
1443 				ereport(ERROR,
1444 						(errcode(ERRCODE_SYNTAX_ERROR),
1445 						 errmsg("conflicting or redundant options"),
1446 						 parser_errposition(pstate, defel->location)));
1447 			dconnlimit = defel;
1448 		}
1449 		else if (strcmp(defel->defname, "tablespace") == 0)
1450 		{
1451 			if (dtablespace)
1452 				ereport(ERROR,
1453 						(errcode(ERRCODE_SYNTAX_ERROR),
1454 						 errmsg("conflicting or redundant options"),
1455 						 parser_errposition(pstate, defel->location)));
1456 			dtablespace = defel;
1457 		}
1458 		else
1459 			ereport(ERROR,
1460 					(errcode(ERRCODE_SYNTAX_ERROR),
1461 					 errmsg("option \"%s\" not recognized", defel->defname),
1462 					 parser_errposition(pstate, defel->location)));
1463 	}
1464 
1465 	if (dtablespace)
1466 	{
1467 		/*
1468 		 * While the SET TABLESPACE syntax doesn't allow any other options,
1469 		 * somebody could write "WITH TABLESPACE ...".  Forbid any other
1470 		 * options from being specified in that case.
1471 		 */
1472 		if (list_length(stmt->options) != 1)
1473 			ereport(ERROR,
1474 					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1475 					 errmsg("option \"%s\" cannot be specified with other options",
1476 							dtablespace->defname),
1477 					 parser_errposition(pstate, dtablespace->location)));
1478 		/* this case isn't allowed within a transaction block */
1479 		PreventInTransactionBlock(isTopLevel, "ALTER DATABASE SET TABLESPACE");
1480 		movedb(stmt->dbname, defGetString(dtablespace));
1481 		return InvalidOid;
1482 	}
1483 
1484 	if (distemplate && distemplate->arg)
1485 		dbistemplate = defGetBoolean(distemplate);
1486 	if (dallowconnections && dallowconnections->arg)
1487 		dballowconnections = defGetBoolean(dallowconnections);
1488 	if (dconnlimit && dconnlimit->arg)
1489 	{
1490 		dbconnlimit = defGetInt32(dconnlimit);
1491 		if (dbconnlimit < -1)
1492 			ereport(ERROR,
1493 					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1494 					 errmsg("invalid connection limit: %d", dbconnlimit)));
1495 	}
1496 
1497 	/*
1498 	 * Get the old tuple.  We don't need a lock on the database per se,
1499 	 * because we're not going to do anything that would mess up incoming
1500 	 * connections.
1501 	 */
1502 	rel = heap_open(DatabaseRelationId, RowExclusiveLock);
1503 	ScanKeyInit(&scankey,
1504 				Anum_pg_database_datname,
1505 				BTEqualStrategyNumber, F_NAMEEQ,
1506 				CStringGetDatum(stmt->dbname));
1507 	scan = systable_beginscan(rel, DatabaseNameIndexId, true,
1508 							  NULL, 1, &scankey);
1509 	tuple = systable_getnext(scan);
1510 	if (!HeapTupleIsValid(tuple))
1511 		ereport(ERROR,
1512 				(errcode(ERRCODE_UNDEFINED_DATABASE),
1513 				 errmsg("database \"%s\" does not exist", stmt->dbname)));
1514 
1515 	dboid = HeapTupleGetOid(tuple);
1516 
1517 	if (!pg_database_ownercheck(HeapTupleGetOid(tuple), GetUserId()))
1518 		aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
1519 					   stmt->dbname);
1520 
1521 	/*
1522 	 * In order to avoid getting locked out and having to go through
1523 	 * standalone mode, we refuse to disallow connections to the database
1524 	 * we're currently connected to.  Lockout can still happen with concurrent
1525 	 * sessions but the likeliness of that is not high enough to worry about.
1526 	 */
1527 	if (!dballowconnections && dboid == MyDatabaseId)
1528 		ereport(ERROR,
1529 				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1530 				 errmsg("cannot disallow connections for current database")));
1531 
1532 	/*
1533 	 * Build an updated tuple, perusing the information just obtained
1534 	 */
1535 	MemSet(new_record, 0, sizeof(new_record));
1536 	MemSet(new_record_nulls, false, sizeof(new_record_nulls));
1537 	MemSet(new_record_repl, false, sizeof(new_record_repl));
1538 
1539 	if (distemplate)
1540 	{
1541 		new_record[Anum_pg_database_datistemplate - 1] = BoolGetDatum(dbistemplate);
1542 		new_record_repl[Anum_pg_database_datistemplate - 1] = true;
1543 	}
1544 	if (dallowconnections)
1545 	{
1546 		new_record[Anum_pg_database_datallowconn - 1] = BoolGetDatum(dballowconnections);
1547 		new_record_repl[Anum_pg_database_datallowconn - 1] = true;
1548 	}
1549 	if (dconnlimit)
1550 	{
1551 		new_record[Anum_pg_database_datconnlimit - 1] = Int32GetDatum(dbconnlimit);
1552 		new_record_repl[Anum_pg_database_datconnlimit - 1] = true;
1553 	}
1554 
1555 	newtuple = heap_modify_tuple(tuple, RelationGetDescr(rel), new_record,
1556 								 new_record_nulls, new_record_repl);
1557 	CatalogTupleUpdate(rel, &tuple->t_self, newtuple);
1558 
1559 	InvokeObjectPostAlterHook(DatabaseRelationId,
1560 							  HeapTupleGetOid(newtuple), 0);
1561 
1562 	systable_endscan(scan);
1563 
1564 	/* Close pg_database, but keep lock till commit */
1565 	heap_close(rel, NoLock);
1566 
1567 	return dboid;
1568 }
1569 
1570 
1571 /*
1572  * ALTER DATABASE name SET ...
1573  */
1574 Oid
AlterDatabaseSet(AlterDatabaseSetStmt * stmt)1575 AlterDatabaseSet(AlterDatabaseSetStmt *stmt)
1576 {
1577 	Oid			datid = get_database_oid(stmt->dbname, false);
1578 
1579 	/*
1580 	 * Obtain a lock on the database and make sure it didn't go away in the
1581 	 * meantime.
1582 	 */
1583 	shdepLockAndCheckObject(DatabaseRelationId, datid);
1584 
1585 	if (!pg_database_ownercheck(datid, GetUserId()))
1586 		aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
1587 					   stmt->dbname);
1588 
1589 	AlterSetting(datid, InvalidOid, stmt->setstmt);
1590 
1591 	UnlockSharedObject(DatabaseRelationId, datid, 0, AccessShareLock);
1592 
1593 	return datid;
1594 }
1595 
1596 
1597 /*
1598  * ALTER DATABASE name OWNER TO newowner
1599  */
1600 ObjectAddress
AlterDatabaseOwner(const char * dbname,Oid newOwnerId)1601 AlterDatabaseOwner(const char *dbname, Oid newOwnerId)
1602 {
1603 	Oid			db_id;
1604 	HeapTuple	tuple;
1605 	Relation	rel;
1606 	ScanKeyData scankey;
1607 	SysScanDesc scan;
1608 	Form_pg_database datForm;
1609 	ObjectAddress address;
1610 
1611 	/*
1612 	 * Get the old tuple.  We don't need a lock on the database per se,
1613 	 * because we're not going to do anything that would mess up incoming
1614 	 * connections.
1615 	 */
1616 	rel = heap_open(DatabaseRelationId, RowExclusiveLock);
1617 	ScanKeyInit(&scankey,
1618 				Anum_pg_database_datname,
1619 				BTEqualStrategyNumber, F_NAMEEQ,
1620 				CStringGetDatum(dbname));
1621 	scan = systable_beginscan(rel, DatabaseNameIndexId, true,
1622 							  NULL, 1, &scankey);
1623 	tuple = systable_getnext(scan);
1624 	if (!HeapTupleIsValid(tuple))
1625 		ereport(ERROR,
1626 				(errcode(ERRCODE_UNDEFINED_DATABASE),
1627 				 errmsg("database \"%s\" does not exist", dbname)));
1628 
1629 	db_id = HeapTupleGetOid(tuple);
1630 	datForm = (Form_pg_database) GETSTRUCT(tuple);
1631 
1632 	/*
1633 	 * If the new owner is the same as the existing owner, consider the
1634 	 * command to have succeeded.  This is to be consistent with other
1635 	 * objects.
1636 	 */
1637 	if (datForm->datdba != newOwnerId)
1638 	{
1639 		Datum		repl_val[Natts_pg_database];
1640 		bool		repl_null[Natts_pg_database];
1641 		bool		repl_repl[Natts_pg_database];
1642 		Acl		   *newAcl;
1643 		Datum		aclDatum;
1644 		bool		isNull;
1645 		HeapTuple	newtuple;
1646 
1647 		/* Otherwise, must be owner of the existing object */
1648 		if (!pg_database_ownercheck(HeapTupleGetOid(tuple), GetUserId()))
1649 			aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
1650 						   dbname);
1651 
1652 		/* Must be able to become new owner */
1653 		check_is_member_of_role(GetUserId(), newOwnerId);
1654 
1655 		/*
1656 		 * must have createdb rights
1657 		 *
1658 		 * NOTE: This is different from other alter-owner checks in that the
1659 		 * current user is checked for createdb privileges instead of the
1660 		 * destination owner.  This is consistent with the CREATE case for
1661 		 * databases.  Because superusers will always have this right, we need
1662 		 * no special case for them.
1663 		 */
1664 		if (!have_createdb_privilege())
1665 			ereport(ERROR,
1666 					(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1667 					 errmsg("permission denied to change owner of database")));
1668 
1669 		memset(repl_null, false, sizeof(repl_null));
1670 		memset(repl_repl, false, sizeof(repl_repl));
1671 
1672 		repl_repl[Anum_pg_database_datdba - 1] = true;
1673 		repl_val[Anum_pg_database_datdba - 1] = ObjectIdGetDatum(newOwnerId);
1674 
1675 		/*
1676 		 * Determine the modified ACL for the new owner.  This is only
1677 		 * necessary when the ACL is non-null.
1678 		 */
1679 		aclDatum = heap_getattr(tuple,
1680 								Anum_pg_database_datacl,
1681 								RelationGetDescr(rel),
1682 								&isNull);
1683 		if (!isNull)
1684 		{
1685 			newAcl = aclnewowner(DatumGetAclP(aclDatum),
1686 								 datForm->datdba, newOwnerId);
1687 			repl_repl[Anum_pg_database_datacl - 1] = true;
1688 			repl_val[Anum_pg_database_datacl - 1] = PointerGetDatum(newAcl);
1689 		}
1690 
1691 		newtuple = heap_modify_tuple(tuple, RelationGetDescr(rel), repl_val, repl_null, repl_repl);
1692 		CatalogTupleUpdate(rel, &newtuple->t_self, newtuple);
1693 
1694 		heap_freetuple(newtuple);
1695 
1696 		/* Update owner dependency reference */
1697 		changeDependencyOnOwner(DatabaseRelationId, HeapTupleGetOid(tuple),
1698 								newOwnerId);
1699 	}
1700 
1701 	InvokeObjectPostAlterHook(DatabaseRelationId, HeapTupleGetOid(tuple), 0);
1702 
1703 	ObjectAddressSet(address, DatabaseRelationId, db_id);
1704 
1705 	systable_endscan(scan);
1706 
1707 	/* Close pg_database, but keep lock till commit */
1708 	heap_close(rel, NoLock);
1709 
1710 	return address;
1711 }
1712 
1713 
1714 /*
1715  * Helper functions
1716  */
1717 
1718 /*
1719  * Look up info about the database named "name".  If the database exists,
1720  * obtain the specified lock type on it, fill in any of the remaining
1721  * parameters that aren't NULL, and return true.  If no such database,
1722  * return false.
1723  */
1724 static bool
get_db_info(const char * name,LOCKMODE lockmode,Oid * dbIdP,Oid * ownerIdP,int * encodingP,bool * dbIsTemplateP,bool * dbAllowConnP,Oid * dbLastSysOidP,TransactionId * dbFrozenXidP,MultiXactId * dbMinMultiP,Oid * dbTablespace,char ** dbCollate,char ** dbCtype)1725 get_db_info(const char *name, LOCKMODE lockmode,
1726 			Oid *dbIdP, Oid *ownerIdP,
1727 			int *encodingP, bool *dbIsTemplateP, bool *dbAllowConnP,
1728 			Oid *dbLastSysOidP, TransactionId *dbFrozenXidP,
1729 			MultiXactId *dbMinMultiP,
1730 			Oid *dbTablespace, char **dbCollate, char **dbCtype)
1731 {
1732 	bool		result = false;
1733 	Relation	relation;
1734 
1735 	AssertArg(name);
1736 
1737 	/* Caller may wish to grab a better lock on pg_database beforehand... */
1738 	relation = heap_open(DatabaseRelationId, AccessShareLock);
1739 
1740 	/*
1741 	 * Loop covers the rare case where the database is renamed before we can
1742 	 * lock it.  We try again just in case we can find a new one of the same
1743 	 * name.
1744 	 */
1745 	for (;;)
1746 	{
1747 		ScanKeyData scanKey;
1748 		SysScanDesc scan;
1749 		HeapTuple	tuple;
1750 		Oid			dbOid;
1751 
1752 		/*
1753 		 * there's no syscache for database-indexed-by-name, so must do it the
1754 		 * hard way
1755 		 */
1756 		ScanKeyInit(&scanKey,
1757 					Anum_pg_database_datname,
1758 					BTEqualStrategyNumber, F_NAMEEQ,
1759 					CStringGetDatum(name));
1760 
1761 		scan = systable_beginscan(relation, DatabaseNameIndexId, true,
1762 								  NULL, 1, &scanKey);
1763 
1764 		tuple = systable_getnext(scan);
1765 
1766 		if (!HeapTupleIsValid(tuple))
1767 		{
1768 			/* definitely no database of that name */
1769 			systable_endscan(scan);
1770 			break;
1771 		}
1772 
1773 		dbOid = HeapTupleGetOid(tuple);
1774 
1775 		systable_endscan(scan);
1776 
1777 		/*
1778 		 * Now that we have a database OID, we can try to lock the DB.
1779 		 */
1780 		if (lockmode != NoLock)
1781 			LockSharedObject(DatabaseRelationId, dbOid, 0, lockmode);
1782 
1783 		/*
1784 		 * And now, re-fetch the tuple by OID.  If it's still there and still
1785 		 * the same name, we win; else, drop the lock and loop back to try
1786 		 * again.
1787 		 */
1788 		tuple = SearchSysCache1(DATABASEOID, ObjectIdGetDatum(dbOid));
1789 		if (HeapTupleIsValid(tuple))
1790 		{
1791 			Form_pg_database dbform = (Form_pg_database) GETSTRUCT(tuple);
1792 
1793 			if (strcmp(name, NameStr(dbform->datname)) == 0)
1794 			{
1795 				/* oid of the database */
1796 				if (dbIdP)
1797 					*dbIdP = dbOid;
1798 				/* oid of the owner */
1799 				if (ownerIdP)
1800 					*ownerIdP = dbform->datdba;
1801 				/* character encoding */
1802 				if (encodingP)
1803 					*encodingP = dbform->encoding;
1804 				/* allowed as template? */
1805 				if (dbIsTemplateP)
1806 					*dbIsTemplateP = dbform->datistemplate;
1807 				/* allowing connections? */
1808 				if (dbAllowConnP)
1809 					*dbAllowConnP = dbform->datallowconn;
1810 				/* last system OID used in database */
1811 				if (dbLastSysOidP)
1812 					*dbLastSysOidP = dbform->datlastsysoid;
1813 				/* limit of frozen XIDs */
1814 				if (dbFrozenXidP)
1815 					*dbFrozenXidP = dbform->datfrozenxid;
1816 				/* minimum MultixactId */
1817 				if (dbMinMultiP)
1818 					*dbMinMultiP = dbform->datminmxid;
1819 				/* default tablespace for this database */
1820 				if (dbTablespace)
1821 					*dbTablespace = dbform->dattablespace;
1822 				/* default locale settings for this database */
1823 				if (dbCollate)
1824 					*dbCollate = pstrdup(NameStr(dbform->datcollate));
1825 				if (dbCtype)
1826 					*dbCtype = pstrdup(NameStr(dbform->datctype));
1827 				ReleaseSysCache(tuple);
1828 				result = true;
1829 				break;
1830 			}
1831 			/* can only get here if it was just renamed */
1832 			ReleaseSysCache(tuple);
1833 		}
1834 
1835 		if (lockmode != NoLock)
1836 			UnlockSharedObject(DatabaseRelationId, dbOid, 0, lockmode);
1837 	}
1838 
1839 	heap_close(relation, AccessShareLock);
1840 
1841 	return result;
1842 }
1843 
1844 /* Check if current user has createdb privileges */
1845 static bool
have_createdb_privilege(void)1846 have_createdb_privilege(void)
1847 {
1848 	bool		result = false;
1849 	HeapTuple	utup;
1850 
1851 	/* Superusers can always do everything */
1852 	if (superuser())
1853 		return true;
1854 
1855 	utup = SearchSysCache1(AUTHOID, ObjectIdGetDatum(GetUserId()));
1856 	if (HeapTupleIsValid(utup))
1857 	{
1858 		result = ((Form_pg_authid) GETSTRUCT(utup))->rolcreatedb;
1859 		ReleaseSysCache(utup);
1860 	}
1861 	return result;
1862 }
1863 
1864 /*
1865  * Remove tablespace directories
1866  *
1867  * We don't know what tablespaces db_id is using, so iterate through all
1868  * tablespaces removing <tablespace>/db_id
1869  */
1870 static void
remove_dbtablespaces(Oid db_id)1871 remove_dbtablespaces(Oid db_id)
1872 {
1873 	Relation	rel;
1874 	HeapScanDesc scan;
1875 	HeapTuple	tuple;
1876 
1877 	rel = heap_open(TableSpaceRelationId, AccessShareLock);
1878 	scan = heap_beginscan_catalog(rel, 0, NULL);
1879 	while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
1880 	{
1881 		Oid			dsttablespace = HeapTupleGetOid(tuple);
1882 		char	   *dstpath;
1883 		struct stat st;
1884 
1885 		/* Don't mess with the global tablespace */
1886 		if (dsttablespace == GLOBALTABLESPACE_OID)
1887 			continue;
1888 
1889 		dstpath = GetDatabasePath(db_id, dsttablespace);
1890 
1891 		if (lstat(dstpath, &st) < 0 || !S_ISDIR(st.st_mode))
1892 		{
1893 			/* Assume we can ignore it */
1894 			pfree(dstpath);
1895 			continue;
1896 		}
1897 
1898 		if (!rmtree(dstpath, true))
1899 			ereport(WARNING,
1900 					(errmsg("some useless files may be left behind in old database directory \"%s\"",
1901 							dstpath)));
1902 
1903 		/* Record the filesystem change in XLOG */
1904 		{
1905 			xl_dbase_drop_rec xlrec;
1906 
1907 			xlrec.db_id = db_id;
1908 			xlrec.tablespace_id = dsttablespace;
1909 
1910 			XLogBeginInsert();
1911 			XLogRegisterData((char *) &xlrec, sizeof(xl_dbase_drop_rec));
1912 
1913 			(void) XLogInsert(RM_DBASE_ID,
1914 							  XLOG_DBASE_DROP | XLR_SPECIAL_REL_UPDATE);
1915 		}
1916 
1917 		pfree(dstpath);
1918 	}
1919 
1920 	heap_endscan(scan);
1921 	heap_close(rel, AccessShareLock);
1922 }
1923 
1924 /*
1925  * Check for existing files that conflict with a proposed new DB OID;
1926  * return true if there are any
1927  *
1928  * If there were a subdirectory in any tablespace matching the proposed new
1929  * OID, we'd get a create failure due to the duplicate name ... and then we'd
1930  * try to remove that already-existing subdirectory during the cleanup in
1931  * remove_dbtablespaces.  Nuking existing files seems like a bad idea, so
1932  * instead we make this extra check before settling on the OID of the new
1933  * database.  This exactly parallels what GetNewRelFileNode() does for table
1934  * relfilenode values.
1935  */
1936 static bool
check_db_file_conflict(Oid db_id)1937 check_db_file_conflict(Oid db_id)
1938 {
1939 	bool		result = false;
1940 	Relation	rel;
1941 	HeapScanDesc scan;
1942 	HeapTuple	tuple;
1943 
1944 	rel = heap_open(TableSpaceRelationId, AccessShareLock);
1945 	scan = heap_beginscan_catalog(rel, 0, NULL);
1946 	while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
1947 	{
1948 		Oid			dsttablespace = HeapTupleGetOid(tuple);
1949 		char	   *dstpath;
1950 		struct stat st;
1951 
1952 		/* Don't mess with the global tablespace */
1953 		if (dsttablespace == GLOBALTABLESPACE_OID)
1954 			continue;
1955 
1956 		dstpath = GetDatabasePath(db_id, dsttablespace);
1957 
1958 		if (lstat(dstpath, &st) == 0)
1959 		{
1960 			/* Found a conflicting file (or directory, whatever) */
1961 			pfree(dstpath);
1962 			result = true;
1963 			break;
1964 		}
1965 
1966 		pfree(dstpath);
1967 	}
1968 
1969 	heap_endscan(scan);
1970 	heap_close(rel, AccessShareLock);
1971 
1972 	return result;
1973 }
1974 
/*
 * errdetail_busy_db
 *		Attach an errdetail explaining why a database counts as busy.
 *
 * notherbackends is the number of other sessions using the database and
 * npreparedxacts the number of prepared transactions.  The value returned is
 * always 0 and exists only so the call can sit inside an ereport() argument
 * list.
 */
static int
errdetail_busy_db(int notherbackends, int npreparedxacts)
{
	bool		have_sessions = (notherbackends > 0);
	bool		have_prepared = (npreparedxacts > 0);

	if (!have_sessions)
		errdetail_plural("There is %d prepared transaction using the database.",
						 "There are %d prepared transactions using the database.",
						 npreparedxacts,
						 npreparedxacts);
	else if (!have_prepared)
		errdetail_plural("There is %d other session using the database.",
						 "There are %d other sessions using the database.",
						 notherbackends,
						 notherbackends);
	else
	{
		/*
		 * Both counts are reported in one message.  gettext cannot handle
		 * two independent plurals within a single string, so no attempt is
		 * made to distinguish singular from plural here.
		 */
		errdetail("There are %d other session(s) and %d prepared transaction(s) using the database.",
				  notherbackends, npreparedxacts);
	}

	return 0;					/* value is irrelevant; ereport needs one */
}
2001 
2002 /*
2003  * get_database_oid - given a database name, look up the OID
2004  *
2005  * If missing_ok is false, throw an error if database name not found.  If
2006  * true, just return InvalidOid.
2007  */
2008 Oid
get_database_oid(const char * dbname,bool missing_ok)2009 get_database_oid(const char *dbname, bool missing_ok)
2010 {
2011 	Relation	pg_database;
2012 	ScanKeyData entry[1];
2013 	SysScanDesc scan;
2014 	HeapTuple	dbtuple;
2015 	Oid			oid;
2016 
2017 	/*
2018 	 * There's no syscache for pg_database indexed by name, so we must look
2019 	 * the hard way.
2020 	 */
2021 	pg_database = heap_open(DatabaseRelationId, AccessShareLock);
2022 	ScanKeyInit(&entry[0],
2023 				Anum_pg_database_datname,
2024 				BTEqualStrategyNumber, F_NAMEEQ,
2025 				CStringGetDatum(dbname));
2026 	scan = systable_beginscan(pg_database, DatabaseNameIndexId, true,
2027 							  NULL, 1, entry);
2028 
2029 	dbtuple = systable_getnext(scan);
2030 
2031 	/* We assume that there can be at most one matching tuple */
2032 	if (HeapTupleIsValid(dbtuple))
2033 		oid = HeapTupleGetOid(dbtuple);
2034 	else
2035 		oid = InvalidOid;
2036 
2037 	systable_endscan(scan);
2038 	heap_close(pg_database, AccessShareLock);
2039 
2040 	if (!OidIsValid(oid) && !missing_ok)
2041 		ereport(ERROR,
2042 				(errcode(ERRCODE_UNDEFINED_DATABASE),
2043 				 errmsg("database \"%s\" does not exist",
2044 						dbname)));
2045 
2046 	return oid;
2047 }
2048 
2049 
2050 /*
2051  * get_database_name - given a database OID, look up the name
2052  *
2053  * Returns a palloc'd string, or NULL if no such database.
2054  */
2055 char *
get_database_name(Oid dbid)2056 get_database_name(Oid dbid)
2057 {
2058 	HeapTuple	dbtuple;
2059 	char	   *result;
2060 
2061 	dbtuple = SearchSysCache1(DATABASEOID, ObjectIdGetDatum(dbid));
2062 	if (HeapTupleIsValid(dbtuple))
2063 	{
2064 		result = pstrdup(NameStr(((Form_pg_database) GETSTRUCT(dbtuple))->datname));
2065 		ReleaseSysCache(dbtuple);
2066 	}
2067 	else
2068 		result = NULL;
2069 
2070 	return result;
2071 }
2072 
2073 /*
2074  * DATABASE resource manager's routines
2075  */
2076 void
dbase_redo(XLogReaderState * record)2077 dbase_redo(XLogReaderState *record)
2078 {
2079 	uint8		info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
2080 
2081 	/* Backup blocks are not used in dbase records */
2082 	Assert(!XLogRecHasAnyBlockRefs(record));
2083 
2084 	if (info == XLOG_DBASE_CREATE)
2085 	{
2086 		xl_dbase_create_rec *xlrec = (xl_dbase_create_rec *) XLogRecGetData(record);
2087 		char	   *src_path;
2088 		char	   *dst_path;
2089 		struct stat st;
2090 
2091 		src_path = GetDatabasePath(xlrec->src_db_id, xlrec->src_tablespace_id);
2092 		dst_path = GetDatabasePath(xlrec->db_id, xlrec->tablespace_id);
2093 
2094 		/*
2095 		 * Our theory for replaying a CREATE is to forcibly drop the target
2096 		 * subdirectory if present, then re-copy the source data. This may be
2097 		 * more work than needed, but it is simple to implement.
2098 		 */
2099 		if (stat(dst_path, &st) == 0 && S_ISDIR(st.st_mode))
2100 		{
2101 			if (!rmtree(dst_path, true))
2102 				/* If this failed, copydir() below is going to error. */
2103 				ereport(WARNING,
2104 						(errmsg("some useless files may be left behind in old database directory \"%s\"",
2105 								dst_path)));
2106 		}
2107 
2108 		/*
2109 		 * Force dirty buffers out to disk, to ensure source database is
2110 		 * up-to-date for the copy.
2111 		 */
2112 		FlushDatabaseBuffers(xlrec->src_db_id);
2113 
2114 		/*
2115 		 * Copy this subdirectory to the new location
2116 		 *
2117 		 * We don't need to copy subdirectories
2118 		 */
2119 		copydir(src_path, dst_path, false);
2120 	}
2121 	else if (info == XLOG_DBASE_DROP)
2122 	{
2123 		xl_dbase_drop_rec *xlrec = (xl_dbase_drop_rec *) XLogRecGetData(record);
2124 		char	   *dst_path;
2125 
2126 		dst_path = GetDatabasePath(xlrec->db_id, xlrec->tablespace_id);
2127 
2128 		if (InHotStandby)
2129 		{
2130 			/*
2131 			 * Lock database while we resolve conflicts to ensure that
2132 			 * InitPostgres() cannot fully re-execute concurrently. This
2133 			 * avoids backends re-connecting automatically to same database,
2134 			 * which can happen in some cases.
2135 			 *
2136 			 * This will lock out walsenders trying to connect to db-specific
2137 			 * slots for logical decoding too, so it's safe for us to drop
2138 			 * slots.
2139 			 */
2140 			LockSharedObjectForSession(DatabaseRelationId, xlrec->db_id, 0, AccessExclusiveLock);
2141 			ResolveRecoveryConflictWithDatabase(xlrec->db_id);
2142 		}
2143 
2144 		/* Drop any database-specific replication slots */
2145 		ReplicationSlotsDropDBSlots(xlrec->db_id);
2146 
2147 		/* Drop pages for this database that are in the shared buffer cache */
2148 		DropDatabaseBuffers(xlrec->db_id);
2149 
2150 		/* Also, clean out any fsync requests that might be pending in md.c */
2151 		ForgetDatabaseFsyncRequests(xlrec->db_id);
2152 
2153 		/* Clean out the xlog relcache too */
2154 		XLogDropDatabase(xlrec->db_id);
2155 
2156 		/* And remove the physical files */
2157 		if (!rmtree(dst_path, true))
2158 			ereport(WARNING,
2159 					(errmsg("some useless files may be left behind in old database directory \"%s\"",
2160 							dst_path)));
2161 
2162 		if (InHotStandby)
2163 		{
2164 			/*
2165 			 * Release locks prior to commit. XXX There is a race condition
2166 			 * here that may allow backends to reconnect, but the window for
2167 			 * this is small because the gap between here and commit is mostly
2168 			 * fairly small and it is unlikely that people will be dropping
2169 			 * databases that we are trying to connect to anyway.
2170 			 */
2171 			UnlockSharedObjectForSession(DatabaseRelationId, xlrec->db_id, 0, AccessExclusiveLock);
2172 		}
2173 	}
2174 	else
2175 		elog(PANIC, "dbase_redo: unknown op code %u", info);
2176 }
2177