/*-------------------------------------------------------------------------
 *
 * dbcommands.c
 *      Database management commands (create/drop database).
 *
 * Note: database creation/destruction commands use exclusive locks on
 * the database objects (as expressed by LockSharedObject()) to avoid
 * stepping on each others' toes.  Formerly we used table-level locks
 * on pg_database, but that's too coarse-grained.
 *
 * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *      src/backend/commands/dbcommands.c
 *
 *-------------------------------------------------------------------------
 */
20 #include "postgres.h"
21
22 #include <stdbool.h>
23 #include <fcntl.h>
24 #include <locale.h>
25 #include <unistd.h>
26 #include <sys/stat.h>
27
28 #include "access/genam.h"
29 #include "access/heapam.h"
30 #include "access/htup_details.h"
31 #include "access/xact.h"
32 #include "access/xloginsert.h"
33 #include "access/xlogutils.h"
34 #include "catalog/catalog.h"
35 #include "catalog/dependency.h"
36 #include "catalog/indexing.h"
37 #include "catalog/objectaccess.h"
38 #include "catalog/pg_authid.h"
39 #include "catalog/pg_database.h"
40 #include "catalog/pg_db_role_setting.h"
41 #include "catalog/pg_tablespace.h"
42 #include "commands/comment.h"
43 #include "commands/dbcommands.h"
44 #include "commands/dbcommands_xlog.h"
45 #include "commands/defrem.h"
46 #include "commands/seclabel.h"
47 #include "commands/tablespace.h"
48 #include "mb/pg_wchar.h"
49 #include "miscadmin.h"
50 #include "pgstat.h"
51 #include "postmaster/bgwriter.h"
52 #include "replication/slot.h"
53 #include "storage/copydir.h"
54 #include "storage/fd.h"
55 #include "storage/lmgr.h"
56 #include "storage/ipc.h"
57 #include "storage/procarray.h"
58 #include "storage/smgr.h"
59 #include "utils/acl.h"
60 #include "utils/builtins.h"
61 #include "utils/fmgroids.h"
62 #include "utils/pg_locale.h"
63 #include "utils/snapmgr.h"
64 #include "utils/syscache.h"
65 #include "utils/tqual.h"
66
67
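/*
 * Argument block for createdb_failure_callback(): createdb() fills it in
 * just before it starts copying files, so that a failure can undo the copy.
 */
typedef struct
{
    Oid         src_dboid;      /* source (template) DB */
    Oid         dest_dboid;     /* DB we are trying to create */
} createdb_failure_params;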
73
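/*
 * State describing a database move to another tablespace.  Going by its
 * name, this is the argument block for movedb_failure_callback().
 */
typedef struct
{
    Oid         dest_dboid;     /* DB we are trying to move */
    Oid         dest_tsoid;     /* tablespace we are trying to move to */
} movedb_failure_params;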
79
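/* non-export function prototypes */
static void createdb_failure_callback(int code, Datum arg);
static void movedb(const char *dbname, const char *tblspcname);
static void movedb_failure_callback(int code, Datum arg);
static bool get_db_info(const char *name, LOCKMODE lockmode,
            Oid *dbIdP, Oid *ownerIdP,
            int *encodingP, bool *dbIsTemplateP, bool *dbAllowConnP,
            Oid *dbLastSysOidP, TransactionId *dbFrozenXidP,
            MultiXactId *dbMinMultiP,
            Oid *dbTablespace, char **dbCollate, char **dbCtype);
static bool have_createdb_privilege(void);
static void remove_dbtablespaces(Oid db_id);
static bool check_db_file_conflict(Oid db_id);
static int  errdetail_busy_db(int notherbackends, int npreparedxacts);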
94
95
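/*
 * CREATE DATABASE
 *
 * Creates a new database by copying a template database and returns the OID
 * given to the new pg_database row.
 *
 * stmt supplies the name of the new database (stmt->dbname) and the list of
 * options (stmt->options).  Every failure is reported with ereport(ERROR).
 *
 * Authorization, in the order the checks run:
 *  - the caller must have the createdb privilege
 *    (have_createdb_privilege());
 *  - the caller must be a member of the role that will own the database
 *    (check_is_member_of_role());
 *  - a source database that is not marked datistemplate can only be copied
 *    by a caller that passes pg_database_ownercheck() on it;
 *  - a tablespace named explicitly requires ACL_CREATE on that tablespace;
 *    taking over the template's default tablespace is not checked.
 * The new row's datacl is left NULL instead of being copied from the
 * template.  The contents of the template are copied directory by directory
 * with copydir(), so whatever the template holds is carried over unchanged.
 */
Oid
createdb(const CreatedbStmt *stmt)
{
    HeapScanDesc scan;
    Relation    rel;
    Oid         src_dboid;
    Oid         src_owner;
    int         src_encoding;
    char       *src_collate;
    char       *src_ctype;
    bool        src_istemplate;
    bool        src_allowconn;
    Oid         src_lastsysoid;
    TransactionId src_frozenxid;
    MultiXactId src_minmxid;
    Oid         src_deftablespace;
    volatile Oid dst_deftablespace;
    Relation    pg_database_rel;
    HeapTuple   tuple;
    Datum       new_record[Natts_pg_database];
    bool        new_record_nulls[Natts_pg_database];
    Oid         dboid;
    Oid         datdba;
    ListCell   *option;
    DefElem    *dtablespacename = NULL;
    DefElem    *downer = NULL;
    DefElem    *dtemplate = NULL;
    DefElem    *dencoding = NULL;
    DefElem    *dcollate = NULL;
    DefElem    *dctype = NULL;
    DefElem    *distemplate = NULL;
    DefElem    *dallowconnections = NULL;
    DefElem    *dconnlimit = NULL;
    char       *dbname = stmt->dbname;
    char       *dbowner = NULL;
    const char *dbtemplate = NULL;
    char       *dbcollate = NULL;
    char       *dbctype = NULL;
    char       *canonname;
    int         encoding = -1;
    bool        dbistemplate = false;
    bool        dballowconnections = true;
    int         dbconnlimit = -1;
    int         notherbackends;
    int         npreparedxacts;
    createdb_failure_params fparms;

    /*
     * Extract options from the statement node tree.  Each option may be
     * given at most once, and an unrecognized option name is an error.
     */
    foreach(option, stmt->options)
    {
        DefElem    *defel = (DefElem *) lfirst(option);

        if (strcmp(defel->defname, "tablespace") == 0)
        {
            if (dtablespacename)
                ereport(ERROR,
                        (errcode(ERRCODE_SYNTAX_ERROR),
                         errmsg("conflicting or redundant options")));
            dtablespacename = defel;
        }
        else if (strcmp(defel->defname, "owner") == 0)
        {
            if (downer)
                ereport(ERROR,
                        (errcode(ERRCODE_SYNTAX_ERROR),
                         errmsg("conflicting or redundant options")));
            downer = defel;
        }
        else if (strcmp(defel->defname, "template") == 0)
        {
            if (dtemplate)
                ereport(ERROR,
                        (errcode(ERRCODE_SYNTAX_ERROR),
                         errmsg("conflicting or redundant options")));
            dtemplate = defel;
        }
        else if (strcmp(defel->defname, "encoding") == 0)
        {
            if (dencoding)
                ereport(ERROR,
                        (errcode(ERRCODE_SYNTAX_ERROR),
                         errmsg("conflicting or redundant options")));
            dencoding = defel;
        }
        else if (strcmp(defel->defname, "lc_collate") == 0)
        {
            if (dcollate)
                ereport(ERROR,
                        (errcode(ERRCODE_SYNTAX_ERROR),
                         errmsg("conflicting or redundant options")));
            dcollate = defel;
        }
        else if (strcmp(defel->defname, "lc_ctype") == 0)
        {
            if (dctype)
                ereport(ERROR,
                        (errcode(ERRCODE_SYNTAX_ERROR),
                         errmsg("conflicting or redundant options")));
            dctype = defel;
        }
        else if (strcmp(defel->defname, "is_template") == 0)
        {
            if (distemplate)
                ereport(ERROR,
                        (errcode(ERRCODE_SYNTAX_ERROR),
                         errmsg("conflicting or redundant options")));
            distemplate = defel;
        }
        else if (strcmp(defel->defname, "allow_connections") == 0)
        {
            if (dallowconnections)
                ereport(ERROR,
                        (errcode(ERRCODE_SYNTAX_ERROR),
                         errmsg("conflicting or redundant options")));
            dallowconnections = defel;
        }
        else if (strcmp(defel->defname, "connection_limit") == 0)
        {
            if (dconnlimit)
                ereport(ERROR,
                        (errcode(ERRCODE_SYNTAX_ERROR),
                         errmsg("conflicting or redundant options")));
            dconnlimit = defel;
        }
        else if (strcmp(defel->defname, "location") == 0)
        {
            /* Accepted for compatibility, but only warned about and ignored */
            ereport(WARNING,
                    (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                     errmsg("LOCATION is not supported anymore"),
                     errhint("Consider using tablespaces instead.")));
        }
        else
            ereport(ERROR,
                    (errcode(ERRCODE_SYNTAX_ERROR),
                     errmsg("option \"%s\" not recognized", defel->defname)));
    }

    /* An option given without an argument leaves the default in place */
    if (downer && downer->arg)
        dbowner = defGetString(downer);
    if (dtemplate && dtemplate->arg)
        dbtemplate = defGetString(dtemplate);
    if (dencoding && dencoding->arg)
    {
        const char *encoding_name;

        /* The encoding may be given either as a number or as a name */
        if (IsA(dencoding->arg, Integer))
        {
            encoding = defGetInt32(dencoding);
            encoding_name = pg_encoding_to_char(encoding);
            if (strcmp(encoding_name, "") == 0 ||
                pg_valid_server_encoding(encoding_name) < 0)
                ereport(ERROR,
                        (errcode(ERRCODE_UNDEFINED_OBJECT),
                         errmsg("%d is not a valid encoding code",
                                encoding)));
        }
        else
        {
            encoding_name = defGetString(dencoding);
            encoding = pg_valid_server_encoding(encoding_name);
            if (encoding < 0)
                ereport(ERROR,
                        (errcode(ERRCODE_UNDEFINED_OBJECT),
                         errmsg("%s is not a valid encoding name",
                                encoding_name)));
        }
    }
    if (dcollate && dcollate->arg)
        dbcollate = defGetString(dcollate);
    if (dctype && dctype->arg)
        dbctype = defGetString(dctype);
    if (distemplate && distemplate->arg)
        dbistemplate = defGetBoolean(distemplate);
    if (dallowconnections && dallowconnections->arg)
        dballowconnections = defGetBoolean(dallowconnections);
    if (dconnlimit && dconnlimit->arg)
    {
        dbconnlimit = defGetInt32(dconnlimit);
        /* -1 is the "not set" default used above; anything lower is rejected */
        if (dbconnlimit < -1)
            ereport(ERROR,
                    (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                     errmsg("invalid connection limit: %d", dbconnlimit)));
    }

    /* obtain OID of proposed owner; without OWNER it is the current user */
    if (dbowner)
        datdba = get_role_oid(dbowner, false);
    else
        datdba = GetUserId();

    /*
     * To create a database, must have createdb privilege and must be able to
     * become the target role (this does not imply that the target role itself
     * must have createdb privilege).  The latter provision guards against
     * "giveaway" attacks.  Note that a superuser will always have both of
     * these privileges a fortiori.
     */
    if (!have_createdb_privilege())
        ereport(ERROR,
                (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
                 errmsg("permission denied to create database")));

    check_is_member_of_role(GetUserId(), datdba);

    /*
     * Lookup database (template) to be cloned, and obtain share lock on it.
     * ShareLock allows two CREATE DATABASEs to work from the same template
     * concurrently, while ensuring no one is busy dropping it in parallel
     * (which would be Very Bad since we'd likely get an incomplete copy
     * without knowing it).  This also prevents any new connections from being
     * made to the source until we finish copying it, so we can be sure it
     * won't change underneath us.
     */
    if (!dbtemplate)
        dbtemplate = "template1";   /* Default template database name */

    if (!get_db_info(dbtemplate, ShareLock,
                     &src_dboid, &src_owner, &src_encoding,
                     &src_istemplate, &src_allowconn, &src_lastsysoid,
                     &src_frozenxid, &src_minmxid, &src_deftablespace,
                     &src_collate, &src_ctype))
        ereport(ERROR,
                (errcode(ERRCODE_UNDEFINED_DATABASE),
                 errmsg("template database \"%s\" does not exist",
                        dbtemplate)));

    /*
     * Permission check: to copy a DB that's not marked datistemplate, you
     * must be superuser or the owner thereof.
     */
    if (!src_istemplate)
    {
        if (!pg_database_ownercheck(src_dboid, GetUserId()))
            ereport(ERROR,
                    (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
                     errmsg("permission denied to copy database \"%s\"",
                            dbtemplate)));
    }

    /* If encoding or locales are defaulted, use source's setting */
    if (encoding < 0)
        encoding = src_encoding;
    if (dbcollate == NULL)
        dbcollate = src_collate;
    if (dbctype == NULL)
        dbctype = src_ctype;

    /* Some encodings are client only */
    if (!PG_VALID_BE_ENCODING(encoding))
        ereport(ERROR,
                (errcode(ERRCODE_WRONG_OBJECT_TYPE),
                 errmsg("invalid server encoding %d", encoding)));

    /* Check that the chosen locales are valid, and get canonical spellings */
    if (!check_locale(LC_COLLATE, dbcollate, &canonname))
        ereport(ERROR,
                (errcode(ERRCODE_WRONG_OBJECT_TYPE),
                 errmsg("invalid locale name: \"%s\"", dbcollate)));
    dbcollate = canonname;
    if (!check_locale(LC_CTYPE, dbctype, &canonname))
        ereport(ERROR,
                (errcode(ERRCODE_WRONG_OBJECT_TYPE),
                 errmsg("invalid locale name: \"%s\"", dbctype)));
    dbctype = canonname;

    check_encoding_locale_matches(encoding, dbcollate, dbctype);

    /*
     * Check that the new encoding and locale settings match the source
     * database.  We insist on this because we simply copy the source data ---
     * any non-ASCII data would be wrongly encoded, and any indexes sorted
     * according to the source locale would be wrong.
     *
     * However, we assume that template0 doesn't contain any non-ASCII data
     * nor any indexes that depend on collation or ctype, so template0 can be
     * used as template for creating a database with any encoding or locale.
     *
     * The exemption is keyed on the template's name, not on its contents.
     */
    if (strcmp(dbtemplate, "template0") != 0)
    {
        if (encoding != src_encoding)
            ereport(ERROR,
                    (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                     errmsg("new encoding (%s) is incompatible with the encoding of the template database (%s)",
                            pg_encoding_to_char(encoding),
                            pg_encoding_to_char(src_encoding)),
                     errhint("Use the same encoding as in the template database, or use template0 as template.")));

        if (strcmp(dbcollate, src_collate) != 0)
            ereport(ERROR,
                    (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                     errmsg("new collation (%s) is incompatible with the collation of the template database (%s)",
                            dbcollate, src_collate),
                     errhint("Use the same collation as in the template database, or use template0 as template.")));

        if (strcmp(dbctype, src_ctype) != 0)
            ereport(ERROR,
                    (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                     errmsg("new LC_CTYPE (%s) is incompatible with the LC_CTYPE of the template database (%s)",
                            dbctype, src_ctype),
                     errhint("Use the same LC_CTYPE as in the template database, or use template0 as template.")));
    }

    /* Resolve default tablespace for new database */
    if (dtablespacename && dtablespacename->arg)
    {
        char       *tablespacename;
        AclResult   aclresult;

        tablespacename = defGetString(dtablespacename);
        dst_deftablespace = get_tablespace_oid(tablespacename, false);
        /* check permissions */
        aclresult = pg_tablespace_aclcheck(dst_deftablespace, GetUserId(),
                                           ACL_CREATE);
        if (aclresult != ACLCHECK_OK)
            aclcheck_error(aclresult, ACL_KIND_TABLESPACE,
                           tablespacename);

        /* pg_global must never be the default tablespace */
        if (dst_deftablespace == GLOBALTABLESPACE_OID)
            ereport(ERROR,
                    (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                     errmsg("pg_global cannot be used as default tablespace")));

        /*
         * If we are trying to change the default tablespace of the template,
         * we require that the template not have any files in the new default
         * tablespace.  This is necessary because otherwise the copied
         * database would contain pg_class rows that refer to its default
         * tablespace both explicitly (by OID) and implicitly (as zero), which
         * would cause problems.  For example another CREATE DATABASE using
         * the copied database as template, and trying to change its default
         * tablespace again, would yield outright incorrect results (it would
         * improperly move tables to the new default tablespace that should
         * stay in the same tablespace).
         */
        if (dst_deftablespace != src_deftablespace)
        {
            char       *srcpath;
            struct stat st;

            srcpath = GetDatabasePath(src_dboid, dst_deftablespace);

            /* A path that cannot be stat()ed counts as "no conflict" */
            if (stat(srcpath, &st) == 0 &&
                S_ISDIR(st.st_mode) &&
                !directory_is_empty(srcpath))
                ereport(ERROR,
                        (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                         errmsg("cannot assign new default tablespace \"%s\"",
                                tablespacename),
                         errdetail("There is a conflict because database \"%s\" already has some tables in this tablespace.",
                                   dbtemplate)));
            pfree(srcpath);
        }
    }
    else
    {
        /* Use template database's default tablespace */
        dst_deftablespace = src_deftablespace;
        /* Note there is no additional permission check in this path */
    }

    /*
     * Check for db name conflict.  This is just to give a more friendly error
     * message than "unique index violation".  There's a race condition but
     * we're willing to accept the less friendly message in that case.
     */
    if (OidIsValid(get_database_oid(dbname, true)))
        ereport(ERROR,
                (errcode(ERRCODE_DUPLICATE_DATABASE),
                 errmsg("database \"%s\" already exists", dbname)));

    /*
     * The source DB can't have any active backends, except this one
     * (exception is to allow CREATE DB while connected to template1).
     * Otherwise we might copy inconsistent data.
     *
     * This should be last among the basic error checks, because it involves
     * potential waiting; we may as well throw an error first if we're gonna
     * throw one.
     */
    if (CountOtherDBBackends(src_dboid, &notherbackends, &npreparedxacts))
        ereport(ERROR,
                (errcode(ERRCODE_OBJECT_IN_USE),
                 errmsg("source database \"%s\" is being accessed by other users",
                        dbtemplate),
                 errdetail_busy_db(notherbackends, npreparedxacts)));

    /*
     * Select an OID for the new database, checking that it doesn't have a
     * filename conflict with anything already existing in the tablespace
     * directories.
     */
    pg_database_rel = heap_open(DatabaseRelationId, RowExclusiveLock);

    do
    {
        dboid = GetNewOid(pg_database_rel);
    } while (check_db_file_conflict(dboid));

    /*
     * Insert a new tuple into pg_database.  This establishes our ownership of
     * the new database name (anyone else trying to insert the same name will
     * block on the unique index, and fail after we commit).
     */

    /* Form tuple */
    MemSet(new_record, 0, sizeof(new_record));
    MemSet(new_record_nulls, false, sizeof(new_record_nulls));

    new_record[Anum_pg_database_datname - 1] =
        DirectFunctionCall1(namein, CStringGetDatum(dbname));
    new_record[Anum_pg_database_datdba - 1] = ObjectIdGetDatum(datdba);
    new_record[Anum_pg_database_encoding - 1] = Int32GetDatum(encoding);
    new_record[Anum_pg_database_datcollate - 1] =
        DirectFunctionCall1(namein, CStringGetDatum(dbcollate));
    new_record[Anum_pg_database_datctype - 1] =
        DirectFunctionCall1(namein, CStringGetDatum(dbctype));
    new_record[Anum_pg_database_datistemplate - 1] = BoolGetDatum(dbistemplate);
    new_record[Anum_pg_database_datallowconn - 1] = BoolGetDatum(dballowconnections);
    new_record[Anum_pg_database_datconnlimit - 1] = Int32GetDatum(dbconnlimit);
    new_record[Anum_pg_database_datlastsysoid - 1] = ObjectIdGetDatum(src_lastsysoid);
    new_record[Anum_pg_database_datfrozenxid - 1] = TransactionIdGetDatum(src_frozenxid);
    new_record[Anum_pg_database_datminmxid - 1] = TransactionIdGetDatum(src_minmxid);
    new_record[Anum_pg_database_dattablespace - 1] = ObjectIdGetDatum(dst_deftablespace);

    /*
     * We deliberately set datacl to default (NULL), rather than copying it
     * from the template database.  Copying it would be a bad idea when the
     * owner is not the same as the template's owner.
     */
    new_record_nulls[Anum_pg_database_datacl - 1] = true;

    tuple = heap_form_tuple(RelationGetDescr(pg_database_rel),
                            new_record, new_record_nulls);

    HeapTupleSetOid(tuple, dboid);

    simple_heap_insert(pg_database_rel, tuple);

    /* Update indexes */
    CatalogUpdateIndexes(pg_database_rel, tuple);

    /*
     * Now generate additional catalog entries associated with the new DB
     */

    /* Register owner dependency */
    recordDependencyOnOwner(DatabaseRelationId, dboid, datdba);

    /* Create pg_shdepend entries for objects within database */
    copyTemplateDependencies(src_dboid, dboid);

    /* Post creation hook for new database */
    InvokeObjectPostCreateHook(DatabaseRelationId, dboid, 0);

    /*
     * Force a checkpoint before starting the copy.  This will force all dirty
     * buffers, including those of unlogged tables, out to disk, to ensure
     * source database is up-to-date on disk for the copy.
     * FlushDatabaseBuffers() would suffice for that, but we also want to
     * process any pending unlink requests.  Otherwise, if a checkpoint
     * happened while we're copying files, a file might be deleted just when
     * we're about to copy it, causing the lstat() call in copydir() to fail
     * with ENOENT.
     */
    RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT
                      | CHECKPOINT_FLUSH_ALL);

    /*
     * Once we start copying subdirectories, we need to be able to clean 'em
     * up if we fail.  Use an ENSURE block to make sure this happens.  (This
     * is not a 100% solution, because of the possibility of failure during
     * transaction commit after we leave this routine, but it should handle
     * most scenarios.)
     */
    fparms.src_dboid = src_dboid;
    fparms.dest_dboid = dboid;
    PG_ENSURE_ERROR_CLEANUP(createdb_failure_callback,
                            PointerGetDatum(&fparms));
    {
        /*
         * Iterate through all tablespaces of the template database, and copy
         * each one to the new database.
         */
        rel = heap_open(TableSpaceRelationId, AccessShareLock);
        scan = heap_beginscan_catalog(rel, 0, NULL);
        while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
        {
            Oid         srctablespace = HeapTupleGetOid(tuple);
            Oid         dsttablespace;
            char       *srcpath;
            char       *dstpath;
            struct stat st;

            /* No need to copy global tablespace */
            if (srctablespace == GLOBALTABLESPACE_OID)
                continue;

            srcpath = GetDatabasePath(src_dboid, srctablespace);

            /*
             * A path that cannot be stat()ed, is not a directory, or is an
             * empty directory is skipped without any report.
             */
            if (stat(srcpath, &st) < 0 || !S_ISDIR(st.st_mode) ||
                directory_is_empty(srcpath))
            {
                /* Assume we can ignore it */
                pfree(srcpath);
                continue;
            }

            /* Only the template's default tablespace is remapped */
            if (srctablespace == src_deftablespace)
                dsttablespace = dst_deftablespace;
            else
                dsttablespace = srctablespace;

            dstpath = GetDatabasePath(dboid, dsttablespace);

            /*
             * Copy this subdirectory to the new location
             *
             * We don't need to copy subdirectories
             */
            copydir(srcpath, dstpath, false);

            /* Record the filesystem change in XLOG */
            {
                xl_dbase_create_rec xlrec;

                xlrec.db_id = dboid;
                xlrec.tablespace_id = dsttablespace;
                xlrec.src_db_id = src_dboid;
                xlrec.src_tablespace_id = srctablespace;

                XLogBeginInsert();
                XLogRegisterData((char *) &xlrec, sizeof(xl_dbase_create_rec));

                (void) XLogInsert(RM_DBASE_ID,
                                  XLOG_DBASE_CREATE | XLR_SPECIAL_REL_UPDATE);
            }
        }
        heap_endscan(scan);
        heap_close(rel, AccessShareLock);

        /*
         * We force a checkpoint before committing.  This effectively means
         * that committed XLOG_DBASE_CREATE operations will never need to be
         * replayed (at least not in ordinary crash recovery; we still have to
         * make the XLOG entry for the benefit of PITR operations).  This
         * avoids two nasty scenarios:
         *
         * #1: When PITR is off, we don't XLOG the contents of newly created
         * indexes; therefore the drop-and-recreate-whole-directory behavior
         * of DBASE_CREATE replay would lose such indexes.
         *
         * #2: Since we have to recopy the source database during DBASE_CREATE
         * replay, we run the risk of copying changes in it that were
         * committed after the original CREATE DATABASE command but before the
         * system crash that led to the replay.  This is at least unexpected
         * and at worst could lead to inconsistencies, eg duplicate table
         * names.
         *
         * (Both of these were real bugs in releases 8.0 through 8.0.3.)
         *
         * In PITR replay, the first of these isn't an issue, and the second
         * is only a risk if the CREATE DATABASE and subsequent template
         * database change both occur while a base backup is being taken.
         * There doesn't seem to be much we can do about that except document
         * it as a limitation.
         *
         * Perhaps if we ever implement CREATE DATABASE in a less cheesy way,
         * we can avoid this.
         */
        RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT);

        /*
         * Close pg_database, but keep lock till commit.
         */
        heap_close(pg_database_rel, NoLock);

        /*
         * Force synchronous commit, thus minimizing the window between
         * creation of the database files and committal of the transaction. If
         * we crash before committing, we'll have a DB that's taking up disk
         * space but is not in pg_database, which is not good.
         */
        ForceSyncCommit();
    }
    PG_END_ENSURE_ERROR_CLEANUP(createdb_failure_callback,
                                PointerGetDatum(&fparms));

    return dboid;
}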
689
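/*
 * Check whether chosen encoding matches chosen locale settings.  This
 * restriction is necessary because libc's locale-specific code usually
 * fails when presented with data in an encoding it's not expecting.  We
 * allow mismatch in four cases:
 *
 * 1. locale encoding = SQL_ASCII, which means that the locale is C/POSIX
 * which works with any encoding.
 *
 * 2. locale encoding = -1, which means that we couldn't determine the
 * locale's encoding and have to trust the user to get it right.
 *
 * 3. selected encoding is UTF8 and platform is win32.  This is because
 * UTF8 is a pseudo codepage that is supported in all locales since it's
 * converted to UTF16 before being used.
 *
 * 4. selected encoding is SQL_ASCII, but only if you're a superuser.  This
 * is risky but we have historically allowed it --- notably, the
 * regression tests require it.
 *
 * Note: if you change this policy, fix initdb to match.
 *
 * encoding is the selected encoding's ID; collate and ctype are the locale
 * names chosen for LC_COLLATE and LC_CTYPE.  A mismatch outside the four
 * cases above is reported with ereport(ERROR); LC_CTYPE is tested first, so
 * that is the setting named when both mismatch.  Nothing is returned.
 */
void
check_encoding_locale_matches(int encoding, const char *collate, const char *ctype)
{
    int         ctype_encoding = pg_get_encoding_from_locale(ctype, true);
    int         collate_encoding = pg_get_encoding_from_locale(collate, true);

    /* superuser() is consulted only when no other exemption applies */
    if (!(ctype_encoding == encoding ||
          ctype_encoding == PG_SQL_ASCII ||
          ctype_encoding == -1 ||
#ifdef WIN32
          encoding == PG_UTF8 ||
#endif
          (encoding == PG_SQL_ASCII && superuser())))
        ereport(ERROR,
                (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                 errmsg("encoding \"%s\" does not match locale \"%s\"",
                        pg_encoding_to_char(encoding),
                        ctype),
                 errdetail("The chosen LC_CTYPE setting requires encoding \"%s\".",
                           pg_encoding_to_char(ctype_encoding))));

    /* Same policy, applied to the LC_COLLATE locale */
    if (!(collate_encoding == encoding ||
          collate_encoding == PG_SQL_ASCII ||
          collate_encoding == -1 ||
#ifdef WIN32
          encoding == PG_UTF8 ||
#endif
          (encoding == PG_SQL_ASCII && superuser())))
        ereport(ERROR,
                (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                 errmsg("encoding \"%s\" does not match locale \"%s\"",
                        pg_encoding_to_char(encoding),
                        collate),
                 errdetail("The chosen LC_COLLATE setting requires encoding \"%s\".",
                           pg_encoding_to_char(collate_encoding))));
}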
748
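/*
 * Error cleanup callback for createdb
 *
 * arg carries a pointer to the createdb_failure_params that createdb()
 * filled in; code is not used here.
 */
static void
createdb_failure_callback(int code, Datum arg)
{
    createdb_failure_params *fparms = (createdb_failure_params *) DatumGetPointer(arg);

    /*
     * Release lock on source database before doing recursive remove. This is
     * not essential but it seems desirable to release the lock as soon as
     * possible.
     */
    UnlockSharedObject(DatabaseRelationId, fparms->src_dboid, 0, ShareLock);

    /* Throw away any successfully copied subdirectories */
    remove_dbtablespaces(fparms->dest_dboid);
}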
765
766
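/*
 * DROP DATABASE
 *
 * dbname names the database to remove.  When it does not exist, missing_ok
 * decides between an ERROR and a NOTICE followed by a plain return.
 *
 * The only authorization test is pg_database_ownercheck().  The template,
 * current-database, replication-slot and other-backend tests that follow it
 * guard against accidents and inconsistent state; they are not permission
 * checks.
 */
void
dropdb(const char *dbname, bool missing_ok)
{
    Oid         db_id;
    bool        db_istemplate;
    Relation    pgdbrel;
    HeapTuple   tup;
    int         notherbackends;
    int         npreparedxacts;
    int         nslots,
                nslots_active;

    /*
     * Look up the target database's OID, and get exclusive lock on it.  We
     * need this to ensure that no new backend starts up in the target
     * database while we are deleting it (see postinit.c), and that no one is
     * using it as a CREATE DATABASE template or trying to delete it for
     * themselves.
     */
    pgdbrel = heap_open(DatabaseRelationId, RowExclusiveLock);

    if (!get_db_info(dbname, AccessExclusiveLock, &db_id, NULL, NULL,
                     &db_istemplate, NULL, NULL, NULL, NULL, NULL, NULL, NULL))
    {
        if (!missing_ok)
        {
            ereport(ERROR,
                    (errcode(ERRCODE_UNDEFINED_DATABASE),
                     errmsg("database \"%s\" does not exist", dbname)));
        }
        else
        {
            /* Close pg_database, release the lock, since we changed nothing */
            heap_close(pgdbrel, RowExclusiveLock);
            ereport(NOTICE,
                    (errmsg("database \"%s\" does not exist, skipping",
                            dbname)));
            return;
        }
    }

    /*
     * Permission checks
     */
    if (!pg_database_ownercheck(db_id, GetUserId()))
        aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE,
                       dbname);

    /*
     * DROP hook for the database being removed.  It runs before the checks
     * below, so a hook can be invoked for a DROP that is then rejected.
     */
    InvokeObjectDropHook(DatabaseRelationId, db_id, 0);

    /*
     * Disallow dropping a DB that is marked istemplate.  This is just to
     * prevent people from accidentally dropping template0 or template1; they
     * can do so if they're really determined ...
     */
    if (db_istemplate)
        ereport(ERROR,
                (errcode(ERRCODE_WRONG_OBJECT_TYPE),
                 errmsg("cannot drop a template database")));

    /* Obviously can't drop my own database */
    if (db_id == MyDatabaseId)
        ereport(ERROR,
                (errcode(ERRCODE_OBJECT_IN_USE),
                 errmsg("cannot drop the currently open database")));

    /*
     * Check whether there are, possibly unconnected, logical slots that refer
     * to the to-be-dropped database.  The database lock we are holding
     * prevents the creation of new slots using the database.
     */
    if (ReplicationSlotsCountDBSlots(db_id, &nslots, &nslots_active))
        ereport(ERROR,
                (errcode(ERRCODE_OBJECT_IN_USE),
                 errmsg("database \"%s\" is used by a logical replication slot",
                        dbname),
                 errdetail_plural("There is %d slot, %d of them active.",
                                  "There are %d slots, %d of them active.",
                                  nslots,
                                  nslots, nslots_active)));

    /*
     * Check for other backends in the target database.  (Because we hold the
     * database lock, no new ones can start after this.)
     *
     * As in CREATE DATABASE, check this after other error conditions.
     */
    if (CountOtherDBBackends(db_id, &notherbackends, &npreparedxacts))
        ereport(ERROR,
                (errcode(ERRCODE_OBJECT_IN_USE),
                 errmsg("database \"%s\" is being accessed by other users",
                        dbname),
                 errdetail_busy_db(notherbackends, npreparedxacts)));

    /*
     * Remove the database's tuple from pg_database.
     */
    tup = SearchSysCache1(DATABASEOID, ObjectIdGetDatum(db_id));
    if (!HeapTupleIsValid(tup))
        elog(ERROR, "cache lookup failed for database %u", db_id);

    simple_heap_delete(pgdbrel, &tup->t_self);

    ReleaseSysCache(tup);

    /*
     * Delete any comments or security labels associated with the database.
     */
    DeleteSharedComments(db_id, DatabaseRelationId);
    DeleteSharedSecurityLabel(db_id, DatabaseRelationId);

    /*
     * Remove settings associated with this database
     */
    DropSetting(db_id, InvalidOid);

    /*
     * Remove shared dependency references for the database.
     */
    dropDatabaseDependencies(db_id);

    /*
     * Drop pages for this database that are in the shared buffer cache. This
     * is important to ensure that no remaining backend tries to write out a
     * dirty buffer to the dead database later...
     */
    DropDatabaseBuffers(db_id);

    /*
     * Tell the stats collector to forget it immediately, too.
     */
    pgstat_drop_database(db_id);

    /*
     * Tell checkpointer to forget any pending fsync and unlink requests for
     * files in the database; else the fsyncs will fail at next checkpoint, or
     * worse, it will delete files that belong to a newly created database
     * with the same OID.
     */
    ForgetDatabaseFsyncRequests(db_id);

    /*
     * Force a checkpoint to make sure the checkpointer has received the
     * message sent by ForgetDatabaseFsyncRequests. On Windows, this also
     * ensures that background procs don't hold any open files, which would
     * cause rmdir() to fail.
     */
    RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT);

    /*
     * Remove all tablespace subdirs belonging to the database.
     */
    remove_dbtablespaces(db_id);

    /*
     * Close pg_database, but keep lock till commit.
     */
    heap_close(pgdbrel, NoLock);

    /*
     * Force synchronous commit, thus minimizing the window between removal of
     * the database files and committal of the transaction. If we crash before
     * committing, we'll have a DB that's gone on disk but still there
     * according to pg_database, which is not good.
     */
    ForceSyncCommit();
}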
938
939
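/*
 * RenameDatabase - rename database "oldname" to "newname".
 *
 * The caller must own the database and have the createdb privilege.  The
 * rename is refused when "newname" is already in use, when the target is the
 * database this backend is connected to, or when other sessions or prepared
 * transactions are using the target.
 *
 * Returns the ObjectAddress of the renamed database.
 */
ObjectAddress
RenameDatabase(const char *oldname, const char *newname)
{
	Oid			db_id;
	HeapTuple	newtup;
	Relation	rel;
	int			notherbackends;
	int			npreparedxacts;
	ObjectAddress address;

	/*
	 * Look up the target database's OID, and get exclusive lock on it.  We
	 * need this for the same reasons as DROP DATABASE.
	 *
	 * NOTE(review): the exclusive lock is taken before the ownership and
	 * createdb checks below, so a caller that fails them has still held the
	 * lock for a moment -- presumably it is dropped when the error aborts the
	 * transaction; confirm.
	 */
	rel = heap_open(DatabaseRelationId, RowExclusiveLock);

	if (!get_db_info(oldname, AccessExclusiveLock, &db_id, NULL, NULL,
					 NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL))
		ereport(ERROR,
				(errcode(ERRCODE_UNDEFINED_DATABASE),
				 errmsg("database \"%s\" does not exist", oldname)));

	/* must be owner */
	if (!pg_database_ownercheck(db_id, GetUserId()))
		aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE,
					   oldname);

	/* must have createdb rights */
	if (!have_createdb_privilege())
		ereport(ERROR,
				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
				 errmsg("permission denied to rename database")));

	/*
	 * Make sure the new name doesn't exist.  See notes for same error in
	 * CREATE DATABASE.
	 */
	if (OidIsValid(get_database_oid(newname, true)))
		ereport(ERROR,
				(errcode(ERRCODE_DUPLICATE_DATABASE),
				 errmsg("database \"%s\" already exists", newname)));

	/*
	 * XXX Client applications probably store the current database somewhere,
	 * so renaming it could cause confusion.  On the other hand, there may not
	 * be an actual problem besides a little confusion, so think about this
	 * and decide.
	 */
	if (db_id == MyDatabaseId)
		ereport(ERROR,
				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
				 errmsg("current database cannot be renamed")));

	/*
	 * Make sure the database does not have active sessions.  This is the same
	 * concern as above, but applied to other sessions.
	 *
	 * As in CREATE DATABASE, check this after other error conditions.
	 */
	if (CountOtherDBBackends(db_id, &notherbackends, &npreparedxacts))
		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_IN_USE),
				 errmsg("database \"%s\" is being accessed by other users",
						oldname),
				 errdetail_busy_db(notherbackends, npreparedxacts)));

	/* rename: overwrite datname in a private copy of the tuple, then store it */
	newtup = SearchSysCacheCopy1(DATABASEOID, ObjectIdGetDatum(db_id));
	if (!HeapTupleIsValid(newtup))
		elog(ERROR, "cache lookup failed for database %u", db_id);
	namestrcpy(&(((Form_pg_database) GETSTRUCT(newtup))->datname), newname);
	simple_heap_update(rel, &newtup->t_self, newtup);
	CatalogUpdateIndexes(rel, newtup);

	InvokeObjectPostAlterHook(DatabaseRelationId, db_id, 0);

	ObjectAddressSet(address, DatabaseRelationId, db_id);

	/*
	 * Close pg_database, but keep lock till commit.
	 */
	heap_close(rel, NoLock);

	return address;
}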
1028
1029
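/*
 * movedb - implement ALTER DATABASE ... SET TABLESPACE.
 *
 * Copies the directory of database "dbname" from its current default
 * tablespace into tablespace "tblspcname", points pg_database.dattablespace
 * at the new tablespace, commits, and then removes the old directory.
 *
 * The caller must own the database and have CREATE permission on the
 * destination tablespace.  The command is refused for the currently open
 * database, for pg_global as destination, while other sessions or prepared
 * transactions use the database, and when the destination directory already
 * holds files of this database.  Moving to the current tablespace is a no-op.
 *
 * The transaction is committed and a new one started partway through, which
 * is why the database lock is held at session level.
 */
static void
movedb(const char *dbname, const char *tblspcname)
{
	Oid			db_id;
	Relation	pgdbrel;
	int			notherbackends;
	int			npreparedxacts;
	HeapTuple	oldtuple,
				newtuple;
	Oid			src_tblspcoid,
				dst_tblspcoid;
	Datum		new_record[Natts_pg_database];
	bool		new_record_nulls[Natts_pg_database];
	bool		new_record_repl[Natts_pg_database];
	ScanKeyData scankey;
	SysScanDesc sysscan;
	AclResult	aclresult;
	char	   *src_dbpath;
	char	   *dst_dbpath;
	DIR		   *dstdir;
	struct dirent *xlde;
	movedb_failure_params fparms;

	/*
	 * Look up the target database's OID, and get exclusive lock on it.  We
	 * need this to ensure that no new backend starts up in the database while
	 * we are moving it, and that no one is using it as a CREATE DATABASE
	 * template or trying to delete it.
	 */
	pgdbrel = heap_open(DatabaseRelationId, RowExclusiveLock);

	if (!get_db_info(dbname, AccessExclusiveLock, &db_id, NULL, NULL,
					 NULL, NULL, NULL, NULL, NULL, &src_tblspcoid, NULL, NULL))
		ereport(ERROR,
				(errcode(ERRCODE_UNDEFINED_DATABASE),
				 errmsg("database \"%s\" does not exist", dbname)));

	/*
	 * We actually need a session lock, so that the lock will persist across
	 * the commit/restart below.  (We could almost get away with letting the
	 * lock be released at commit, except that someone could try to move
	 * relations of the DB back into the old directory while we rmtree() it.)
	 *
	 * NOTE(review): both this session lock and the lock taken by
	 * get_db_info() are acquired before the ownership check below.  Confirm
	 * that every error exit from here on releases the session-level lock.
	 */
	LockSharedObjectForSession(DatabaseRelationId, db_id, 0,
							   AccessExclusiveLock);

	/*
	 * Permission checks
	 */
	if (!pg_database_ownercheck(db_id, GetUserId()))
		aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE,
					   dbname);

	/*
	 * Obviously can't move the tables of my own database
	 */
	if (db_id == MyDatabaseId)
		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_IN_USE),
				 errmsg("cannot change the tablespace of the currently open database")));

	/*
	 * Get tablespace's oid
	 */
	dst_tblspcoid = get_tablespace_oid(tblspcname, false);

	/*
	 * Permission checks
	 */
	aclresult = pg_tablespace_aclcheck(dst_tblspcoid, GetUserId(),
									   ACL_CREATE);
	if (aclresult != ACLCHECK_OK)
		aclcheck_error(aclresult, ACL_KIND_TABLESPACE,
					   tblspcname);

	/*
	 * pg_global must never be the default tablespace
	 */
	if (dst_tblspcoid == GLOBALTABLESPACE_OID)
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
				 errmsg("pg_global cannot be used as default tablespace")));

	/*
	 * No-op if same tablespace
	 */
	if (src_tblspcoid == dst_tblspcoid)
	{
		heap_close(pgdbrel, NoLock);
		UnlockSharedObjectForSession(DatabaseRelationId, db_id, 0,
									 AccessExclusiveLock);
		return;
	}

	/*
	 * Check for other backends in the target database.  (Because we hold the
	 * database lock, no new ones can start after this.)
	 *
	 * As in CREATE DATABASE, check this after other error conditions.
	 */
	if (CountOtherDBBackends(db_id, &notherbackends, &npreparedxacts))
		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_IN_USE),
				 errmsg("database \"%s\" is being accessed by other users",
						dbname),
				 errdetail_busy_db(notherbackends, npreparedxacts)));

	/*
	 * Get old and new database paths
	 */
	src_dbpath = GetDatabasePath(db_id, src_tblspcoid);
	dst_dbpath = GetDatabasePath(db_id, dst_tblspcoid);

	/*
	 * Force a checkpoint before proceeding.  This will force all dirty
	 * buffers, including those of unlogged tables, out to disk, to ensure
	 * source database is up-to-date on disk for the copy.
	 * FlushDatabaseBuffers() would suffice for that, but we also want to
	 * process any pending unlink requests.  Otherwise, the check for existing
	 * files in the target directory might fail unnecessarily, not to mention
	 * that the copy might fail due to source files getting deleted under it.
	 * On Windows, this also ensures that background procs don't hold any open
	 * files, which would cause rmdir() to fail.
	 */
	RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT
					  | CHECKPOINT_FLUSH_ALL);

	/*
	 * Now drop all buffers holding data of the target database; they should
	 * no longer be dirty so DropDatabaseBuffers is safe.
	 *
	 * It might seem that we could just let these buffers age out of shared
	 * buffers naturally, since they should not get referenced anymore.  The
	 * problem with that is that if the user later moves the database back to
	 * its original tablespace, any still-surviving buffers would appear to
	 * contain valid data again --- but they'd be missing any changes made in
	 * the database while it was in the new tablespace.  In any case, freeing
	 * buffers that should never be used again seems worth the cycles.
	 *
	 * Note: it'd be sufficient to get rid of buffers matching db_id and
	 * src_tblspcoid, but bufmgr.c presently provides no API for that.
	 */
	DropDatabaseBuffers(db_id);

	/*
	 * Check for existence of files in the target directory, i.e., objects of
	 * this database that are already in the target tablespace.  We can't
	 * allow the move in such a case, because we would need to change those
	 * relations' pg_class.reltablespace entries to zero, and we don't have
	 * access to the DB's pg_class to do so.
	 */
	dstdir = AllocateDir(dst_dbpath);
	if (dstdir != NULL)
	{
		while ((xlde = ReadDir(dstdir, dst_dbpath)) != NULL)
		{
			if (strcmp(xlde->d_name, ".") == 0 ||
				strcmp(xlde->d_name, "..") == 0)
				continue;

			/* any entry other than "." and ".." makes the move illegal */
			ereport(ERROR,
					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
					 errmsg("some relations of database \"%s\" are already in tablespace \"%s\"",
							dbname, tblspcname),
					 errhint("You must move them back to the database's default tablespace before using this command.")));
		}

		FreeDir(dstdir);

		/*
		 * The directory exists but is empty.  We must remove it before using
		 * the copydir function.
		 */
		if (rmdir(dst_dbpath) != 0)
			elog(ERROR, "could not remove directory \"%s\": %m",
				 dst_dbpath);
	}

	/*
	 * Use an ENSURE block to make sure we remove the debris if the copy fails
	 * (eg, due to out-of-disk-space).  This is not a 100% solution, because
	 * of the possibility of failure during transaction commit, but it should
	 * handle most scenarios.
	 */
	fparms.dest_dboid = db_id;
	fparms.dest_tsoid = dst_tblspcoid;
	PG_ENSURE_ERROR_CLEANUP(movedb_failure_callback,
							PointerGetDatum(&fparms));
	{
		/*
		 * Copy files from the old tablespace to the new one
		 */
		copydir(src_dbpath, dst_dbpath, false);

		/*
		 * Record the filesystem change in XLOG
		 */
		{
			xl_dbase_create_rec xlrec;

			xlrec.db_id = db_id;
			xlrec.tablespace_id = dst_tblspcoid;
			xlrec.src_db_id = db_id;
			xlrec.src_tablespace_id = src_tblspcoid;

			XLogBeginInsert();
			XLogRegisterData((char *) &xlrec, sizeof(xl_dbase_create_rec));

			(void) XLogInsert(RM_DBASE_ID,
							  XLOG_DBASE_CREATE | XLR_SPECIAL_REL_UPDATE);
		}

		/*
		 * Update the database's pg_database tuple
		 */
		ScanKeyInit(&scankey,
					Anum_pg_database_datname,
					BTEqualStrategyNumber, F_NAMEEQ,
					NameGetDatum(dbname));
		sysscan = systable_beginscan(pgdbrel, DatabaseNameIndexId, true,
									 NULL, 1, &scankey);
		oldtuple = systable_getnext(sysscan);
		if (!HeapTupleIsValid(oldtuple))	/* shouldn't happen... */
			ereport(ERROR,
					(errcode(ERRCODE_UNDEFINED_DATABASE),
					 errmsg("database \"%s\" does not exist", dbname)));

		/* replace only dattablespace; every other column is left as it is */
		MemSet(new_record, 0, sizeof(new_record));
		MemSet(new_record_nulls, false, sizeof(new_record_nulls));
		MemSet(new_record_repl, false, sizeof(new_record_repl));

		new_record[Anum_pg_database_dattablespace - 1] = ObjectIdGetDatum(dst_tblspcoid);
		new_record_repl[Anum_pg_database_dattablespace - 1] = true;

		newtuple = heap_modify_tuple(oldtuple, RelationGetDescr(pgdbrel),
									 new_record,
									 new_record_nulls, new_record_repl);
		simple_heap_update(pgdbrel, &oldtuple->t_self, newtuple);

		/* Update indexes */
		CatalogUpdateIndexes(pgdbrel, newtuple);

		InvokeObjectPostAlterHook(DatabaseRelationId,
								  HeapTupleGetOid(newtuple), 0);

		systable_endscan(sysscan);

		/*
		 * Force another checkpoint here.  As in CREATE DATABASE, this is to
		 * ensure that we don't have to replay a committed XLOG_DBASE_CREATE
		 * operation, which would cause us to lose any unlogged operations
		 * done in the new DB tablespace before the next checkpoint.
		 */
		RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT);

		/*
		 * Force synchronous commit, thus minimizing the window between
		 * copying the database files and committal of the transaction.  If we
		 * crash before committing, we'll leave an orphaned set of files on
		 * disk, which is not fatal but not good either.
		 */
		ForceSyncCommit();

		/*
		 * Close pg_database, but keep lock till commit.
		 */
		heap_close(pgdbrel, NoLock);
	}
	PG_END_ENSURE_ERROR_CLEANUP(movedb_failure_callback,
								PointerGetDatum(&fparms));

	/*
	 * Commit the transaction so that the pg_database update is committed.  If
	 * we crash while removing files, the database won't be corrupt, we'll
	 * just leave some orphaned files in the old directory.
	 *
	 * (This is OK because we know we aren't inside a transaction block.)
	 *
	 * XXX would it be safe/better to do this inside the ensure block?  Not
	 * convinced it's a good idea; consider elog just after the transaction
	 * really commits.
	 */
	PopActiveSnapshot();
	CommitTransactionCommand();

	/* Start new transaction for the remaining work; don't need a snapshot */
	StartTransactionCommand();

	/*
	 * Remove files from the old tablespace.  A failure here is reported as a
	 * WARNING only, because the catalog change is already committed.
	 */
	if (!rmtree(src_dbpath, true))
		ereport(WARNING,
				(errmsg("some useless files may be left behind in old database directory \"%s\"",
						src_dbpath)));

	/*
	 * Record the filesystem change in XLOG
	 */
	{
		xl_dbase_drop_rec xlrec;

		xlrec.db_id = db_id;
		xlrec.tablespace_id = src_tblspcoid;

		XLogBeginInsert();
		XLogRegisterData((char *) &xlrec, sizeof(xl_dbase_drop_rec));

		(void) XLogInsert(RM_DBASE_ID,
						  XLOG_DBASE_DROP | XLR_SPECIAL_REL_UPDATE);
	}

	/* Now it's safe to release the database lock */
	UnlockSharedObjectForSession(DatabaseRelationId, db_id, 0,
								 AccessExclusiveLock);
}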
1349
/*
 * Error cleanup callback for movedb.
 *
 * "arg" carries a pointer to the movedb_failure_params filled in by movedb;
 * "code" is not used.  The destination directory named by those parameters is
 * removed, and the result of the removal is deliberately ignored.
 */
static void
movedb_failure_callback(int code, Datum arg)
{
	movedb_failure_params *params;
	char	   *partial_copy_path;

	params = (movedb_failure_params *) DatumGetPointer(arg);

	/* Get rid of anything we managed to copy to the target directory */
	partial_copy_path = GetDatabasePath(params->dest_dboid,
										params->dest_tsoid);
	(void) rmtree(partial_copy_path, true);
}
1362
1363
/*
 * AlterDatabase - implement ALTER DATABASE name with the is_template,
 * allow_connections, connection_limit and tablespace options.
 *
 * A "tablespace" option must be the only option given; it is handed to
 * movedb() and InvalidOid is returned.  For the other options the caller
 * must own the database; the pg_database row is updated and the database's
 * OID is returned.
 */
Oid
AlterDatabase(AlterDatabaseStmt *stmt, bool isTopLevel)
{
	Relation	pgdbrel;
	Oid			dboid;
	HeapTuple	oldtup,
				modtup;
	ScanKeyData key;
	SysScanDesc sscan;
	ListCell   *lc;
	bool		dbistemplate = false;
	bool		dballowconnections = true;
	int			dbconnlimit = -1;
	DefElem    *distemplate = NULL;
	DefElem    *dallowconnections = NULL;
	DefElem    *dconnlimit = NULL;
	DefElem    *dtablespace = NULL;
	Datum		new_record[Natts_pg_database];
	bool		new_record_nulls[Natts_pg_database];
	bool		new_record_repl[Natts_pg_database];

	/*
	 * Extract options from the statement node tree.  Each recognized option
	 * name maps to one DefElem slot; a slot that is already filled means the
	 * option was given twice.
	 */
	foreach(lc, stmt->options)
	{
		DefElem    *defel = (DefElem *) lfirst(lc);
		DefElem   **slot = NULL;

		if (strcmp(defel->defname, "is_template") == 0)
			slot = &distemplate;
		else if (strcmp(defel->defname, "allow_connections") == 0)
			slot = &dallowconnections;
		else if (strcmp(defel->defname, "connection_limit") == 0)
			slot = &dconnlimit;
		else if (strcmp(defel->defname, "tablespace") == 0)
			slot = &dtablespace;
		else
			ereport(ERROR,
					(errcode(ERRCODE_SYNTAX_ERROR),
					 errmsg("option \"%s\" not recognized", defel->defname)));

		/* only reached for a recognized option: ereport(ERROR) does not return */
		if (*slot != NULL)
			ereport(ERROR,
					(errcode(ERRCODE_SYNTAX_ERROR),
					 errmsg("conflicting or redundant options")));
		*slot = defel;
	}

	if (dtablespace != NULL)
	{
		/*
		 * While the SET TABLESPACE syntax doesn't allow any other options,
		 * somebody could write "WITH TABLESPACE ...".  Forbid any other
		 * options from being specified in that case.
		 */
		if (list_length(stmt->options) != 1)
			ereport(ERROR,
					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
					 errmsg("option \"%s\" cannot be specified with other options",
							dtablespace->defname)));

		/* this case isn't allowed within a transaction block */
		PreventTransactionChain(isTopLevel, "ALTER DATABASE SET TABLESPACE");
		movedb(stmt->dbname, defGetString(dtablespace));
		return InvalidOid;
	}

	/* An option written without an argument keeps the default set above */
	if (distemplate != NULL && distemplate->arg != NULL)
		dbistemplate = defGetBoolean(distemplate);
	if (dallowconnections != NULL && dallowconnections->arg != NULL)
		dballowconnections = defGetBoolean(dallowconnections);
	if (dconnlimit != NULL && dconnlimit->arg != NULL)
	{
		dbconnlimit = defGetInt32(dconnlimit);
		/* -1 is accepted; anything below it is rejected */
		if (dbconnlimit < -1)
			ereport(ERROR,
					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
					 errmsg("invalid connection limit: %d", dbconnlimit)));
	}

	/*
	 * Get the old tuple.  We don't need a lock on the database per se,
	 * because we're not going to do anything that would mess up incoming
	 * connections.
	 */
	pgdbrel = heap_open(DatabaseRelationId, RowExclusiveLock);
	ScanKeyInit(&key,
				Anum_pg_database_datname,
				BTEqualStrategyNumber, F_NAMEEQ,
				NameGetDatum(stmt->dbname));
	sscan = systable_beginscan(pgdbrel, DatabaseNameIndexId, true,
							   NULL, 1, &key);
	oldtup = systable_getnext(sscan);
	if (!HeapTupleIsValid(oldtup))
		ereport(ERROR,
				(errcode(ERRCODE_UNDEFINED_DATABASE),
				 errmsg("database \"%s\" does not exist", stmt->dbname)));

	dboid = HeapTupleGetOid(oldtup);

	/* Only the owner may alter these settings */
	if (!pg_database_ownercheck(dboid, GetUserId()))
		aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE,
					   stmt->dbname);

	/*
	 * In order to avoid getting locked out and having to go through
	 * standalone mode, we refuse to disallow connections to the database
	 * we're currently connected to.  Lockout can still happen with concurrent
	 * sessions but the likeliness of that is not high enough to worry about.
	 */
	if (dboid == MyDatabaseId && !dballowconnections)
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
				 errmsg("cannot disallow connections for current database")));

	/*
	 * Build an updated tuple: only the columns whose option was present in
	 * the statement are marked for replacement.
	 */
	MemSet(new_record, 0, sizeof(new_record));
	MemSet(new_record_nulls, false, sizeof(new_record_nulls));
	MemSet(new_record_repl, false, sizeof(new_record_repl));

	if (distemplate != NULL)
	{
		new_record_repl[Anum_pg_database_datistemplate - 1] = true;
		new_record[Anum_pg_database_datistemplate - 1] = BoolGetDatum(dbistemplate);
	}
	if (dallowconnections != NULL)
	{
		new_record_repl[Anum_pg_database_datallowconn - 1] = true;
		new_record[Anum_pg_database_datallowconn - 1] = BoolGetDatum(dballowconnections);
	}
	if (dconnlimit != NULL)
	{
		new_record_repl[Anum_pg_database_datconnlimit - 1] = true;
		new_record[Anum_pg_database_datconnlimit - 1] = Int32GetDatum(dbconnlimit);
	}

	modtup = heap_modify_tuple(oldtup, RelationGetDescr(pgdbrel), new_record,
							   new_record_nulls, new_record_repl);
	simple_heap_update(pgdbrel, &oldtup->t_self, modtup);

	/* Update indexes */
	CatalogUpdateIndexes(pgdbrel, modtup);

	InvokeObjectPostAlterHook(DatabaseRelationId,
							  HeapTupleGetOid(modtup), 0);

	systable_endscan(sscan);

	/* Close pg_database, but keep lock till commit */
	heap_close(pgdbrel, NoLock);

	return dboid;
}
1537
1538
/*
 * AlterDatabaseSet - implement ALTER DATABASE name SET ...
 *
 * Looks the database up by name (erroring out if it does not exist), checks
 * that the caller owns it, and applies stmt->setstmt as a setting of that
 * database.  Returns the database's OID.
 */
Oid
AlterDatabaseSet(AlterDatabaseSetStmt *stmt)
{
	Oid			datid;

	datid = get_database_oid(stmt->dbname, false);

	/*
	 * Obtain a lock on the database and make sure it didn't go away in the
	 * meantime.
	 */
	shdepLockAndCheckObject(DatabaseRelationId, datid);

	/* Only the owner may change the database's settings */
	if (!pg_database_ownercheck(datid, GetUserId()))
		aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE,
					   stmt->dbname);

	AlterSetting(datid, InvalidOid, stmt->setstmt);

	/*
	 * NOTE(review): presumably AccessShareLock is the mode that
	 * shdepLockAndCheckObject() took above -- confirm against its definition.
	 */
	UnlockSharedObject(DatabaseRelationId, datid, 0, AccessShareLock);

	return datid;
}
1563
1564
/*
 * AlterDatabaseOwner - implement ALTER DATABASE name OWNER TO newowner.
 *
 * When the owner actually changes, the caller must own the database, must be
 * a member of the new owning role, and must have the createdb privilege.
 * The datdba column, the ACL (when it is non-null) and the owner dependency
 * are then updated.  Naming the existing owner succeeds without any of those
 * checks.
 *
 * Returns the ObjectAddress of the database.
 */
ObjectAddress
AlterDatabaseOwner(const char *dbname, Oid newOwnerId)
{
	Oid			db_id;
	HeapTuple	dbtup;
	Relation	pgdbrel;
	ScanKeyData key;
	SysScanDesc sscan;
	Form_pg_database dbform;
	ObjectAddress address;

	/*
	 * Get the old tuple.  We don't need a lock on the database per se,
	 * because we're not going to do anything that would mess up incoming
	 * connections.
	 */
	pgdbrel = heap_open(DatabaseRelationId, RowExclusiveLock);
	ScanKeyInit(&key,
				Anum_pg_database_datname,
				BTEqualStrategyNumber, F_NAMEEQ,
				NameGetDatum(dbname));
	sscan = systable_beginscan(pgdbrel, DatabaseNameIndexId, true,
							   NULL, 1, &key);
	dbtup = systable_getnext(sscan);
	if (!HeapTupleIsValid(dbtup))
		ereport(ERROR,
				(errcode(ERRCODE_UNDEFINED_DATABASE),
				 errmsg("database \"%s\" does not exist", dbname)));

	dbform = (Form_pg_database) GETSTRUCT(dbtup);
	db_id = HeapTupleGetOid(dbtup);

	/*
	 * If the new owner is the same as the existing owner, consider the
	 * command to have succeeded.  This is to be consistent with other
	 * objects.
	 */
	if (dbform->datdba != newOwnerId)
	{
		Oid			prev_owner = dbform->datdba;
		Datum		repl_val[Natts_pg_database];
		bool		repl_null[Natts_pg_database];
		bool		repl_repl[Natts_pg_database];
		Acl		   *owner_acl;
		Datum		acl_datum;
		bool		acl_is_null;
		HeapTuple	modtup;

		/* Otherwise, must be owner of the existing object */
		if (!pg_database_ownercheck(db_id, GetUserId()))
			aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE,
						   dbname);

		/* Must be able to become new owner */
		check_is_member_of_role(GetUserId(), newOwnerId);

		/*
		 * must have createdb rights
		 *
		 * NOTE: This is different from other alter-owner checks in that the
		 * current user is checked for createdb privileges instead of the
		 * destination owner.  This is consistent with the CREATE case for
		 * databases.  Because superusers will always have this right, we need
		 * no special case for them.
		 */
		if (!have_createdb_privilege())
			ereport(ERROR,
					(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
					 errmsg("permission denied to change owner of database")));

		/*
		 * repl_val is left unset except for the entries flagged in repl_repl
		 * below; presumably heap_modify_tuple reads only those -- confirm.
		 */
		memset(repl_null, false, sizeof(repl_null));
		memset(repl_repl, false, sizeof(repl_repl));

		repl_val[Anum_pg_database_datdba - 1] = ObjectIdGetDatum(newOwnerId);
		repl_repl[Anum_pg_database_datdba - 1] = true;

		/*
		 * Determine the modified ACL for the new owner.  This is only
		 * necessary when the ACL is non-null.
		 */
		acl_datum = heap_getattr(dbtup,
								 Anum_pg_database_datacl,
								 RelationGetDescr(pgdbrel),
								 &acl_is_null);
		if (!acl_is_null)
		{
			owner_acl = aclnewowner(DatumGetAclP(acl_datum),
									prev_owner, newOwnerId);
			repl_val[Anum_pg_database_datacl - 1] = PointerGetDatum(owner_acl);
			repl_repl[Anum_pg_database_datacl - 1] = true;
		}

		modtup = heap_modify_tuple(dbtup, RelationGetDescr(pgdbrel),
								   repl_val, repl_null, repl_repl);
		simple_heap_update(pgdbrel, &modtup->t_self, modtup);
		CatalogUpdateIndexes(pgdbrel, modtup);

		heap_freetuple(modtup);

		/* Update owner dependency reference */
		changeDependencyOnOwner(DatabaseRelationId, db_id,
								newOwnerId);
	}

	/* The hook is invoked whether or not the owner changed */
	InvokeObjectPostAlterHook(DatabaseRelationId, db_id, 0);

	ObjectAddressSet(address, DatabaseRelationId, db_id);

	systable_endscan(sscan);

	/* Close pg_database, but keep lock till commit */
	heap_close(pgdbrel, NoLock);

	return address;
}
1681
1682
1683 /*
1684 * Helper functions
1685 */
1686
/*
 * get_db_info - look up the database named "name".
 *
 * If the database exists, the requested lock mode is taken on it (nothing is
 * locked when lockmode is NoLock), every output parameter that is not NULL
 * is filled in, and true is returned.  dbCollate and dbCtype receive
 * pstrdup'd copies.  If there is no such database, false is returned and no
 * database lock is left held.
 */
static bool
get_db_info(const char *name, LOCKMODE lockmode,
			Oid *dbIdP, Oid *ownerIdP,
			int *encodingP, bool *dbIsTemplateP, bool *dbAllowConnP,
			Oid *dbLastSysOidP, TransactionId *dbFrozenXidP,
			MultiXactId *dbMinMultiP,
			Oid *dbTablespace, char **dbCollate, char **dbCtype)
{
	bool		found = false;
	Relation	pgdbrel;

	AssertArg(name);

	/* Caller may wish to grab a better lock on pg_database beforehand... */
	pgdbrel = heap_open(DatabaseRelationId, AccessShareLock);

	/*
	 * Loop covers the rare case where the database is renamed before we can
	 * lock it.  We try again just in case we can find a new one of the same
	 * name.
	 */
	for (;;)
	{
		ScanKeyData key;
		SysScanDesc sscan;
		HeapTuple	tup;
		Oid			dbOid;
		bool		have_match;

		/*
		 * there's no syscache for database-indexed-by-name, so must do it the
		 * hard way
		 */
		ScanKeyInit(&key,
					Anum_pg_database_datname,
					BTEqualStrategyNumber, F_NAMEEQ,
					NameGetDatum(name));
		sscan = systable_beginscan(pgdbrel, DatabaseNameIndexId, true,
								   NULL, 1, &key);
		tup = systable_getnext(sscan);
		have_match = HeapTupleIsValid(tup);

		/* the OID is read before the scan is closed */
		if (have_match)
			dbOid = HeapTupleGetOid(tup);
		systable_endscan(sscan);

		/* definitely no database of that name */
		if (!have_match)
			break;

		/*
		 * Now that we have a database OID, we can try to lock the DB.
		 */
		if (lockmode != NoLock)
			LockSharedObject(DatabaseRelationId, dbOid, 0, lockmode);

		/*
		 * And now, re-fetch the tuple by OID.  If it's still there and still
		 * the same name, we win; else, drop the lock and loop back to try
		 * again.
		 */
		tup = SearchSysCache1(DATABASEOID, ObjectIdGetDatum(dbOid));
		if (HeapTupleIsValid(tup))
		{
			Form_pg_database dbform = (Form_pg_database) GETSTRUCT(tup);

			found = (strcmp(name, NameStr(dbform->datname)) == 0);
			if (found)
			{
				/* oid of the database */
				if (dbIdP != NULL)
					*dbIdP = dbOid;
				/* oid of the owner */
				if (ownerIdP != NULL)
					*ownerIdP = dbform->datdba;
				/* character encoding */
				if (encodingP != NULL)
					*encodingP = dbform->encoding;
				/* allowed as template? */
				if (dbIsTemplateP != NULL)
					*dbIsTemplateP = dbform->datistemplate;
				/* allowing connections? */
				if (dbAllowConnP != NULL)
					*dbAllowConnP = dbform->datallowconn;
				/* last system OID used in database */
				if (dbLastSysOidP != NULL)
					*dbLastSysOidP = dbform->datlastsysoid;
				/* limit of frozen XIDs */
				if (dbFrozenXidP != NULL)
					*dbFrozenXidP = dbform->datfrozenxid;
				/* minimum MultixactId */
				if (dbMinMultiP != NULL)
					*dbMinMultiP = dbform->datminmxid;
				/* default tablespace for this database */
				if (dbTablespace != NULL)
					*dbTablespace = dbform->dattablespace;
				/* default locale settings for this database */
				if (dbCollate != NULL)
					*dbCollate = pstrdup(NameStr(dbform->datcollate));
				if (dbCtype != NULL)
					*dbCtype = pstrdup(NameStr(dbform->datctype));
			}

			/* the strings were copied above, so the cache entry can go */
			ReleaseSysCache(tup);

			/* success: leave the loop with the database lock still held */
			if (found)
				break;

			/* can only get here if it was just renamed */
		}

		if (lockmode != NoLock)
			UnlockSharedObject(DatabaseRelationId, dbOid, 0, lockmode);
	}

	heap_close(pgdbrel, AccessShareLock);

	return found;
}
1812
/*
 * Check if current user has createdb privileges.
 *
 * Superusers always pass.  Otherwise the answer is the rolcreatedb flag of
 * the current user's pg_authid row; when that row cannot be found the
 * function returns false.
 */
static bool
have_createdb_privilege(void)
{
	HeapTuple	roletup;
	bool		can_createdb;

	/* Superusers can always do everything */
	if (superuser())
		return true;

	roletup = SearchSysCache1(AUTHOID, ObjectIdGetDatum(GetUserId()));
	if (!HeapTupleIsValid(roletup))
		return false;

	can_createdb = ((Form_pg_authid) GETSTRUCT(roletup))->rolcreatedb;
	ReleaseSysCache(roletup);

	return can_createdb;
}
1832
/*
 * Remove tablespace directories
 *
 * We don't know what tablespaces db_id is using, so iterate through all
 * tablespaces removing <tablespace>/db_id.  The global tablespace is
 * skipped.  Each removal is recorded in XLOG, even when rmtree() reported
 * that it could not remove everything.
 */
static void
remove_dbtablespaces(Oid db_id)
{
	Relation	tsrel;
	HeapScanDesc tsscan;
	HeapTuple	tstup;

	tsrel = heap_open(TableSpaceRelationId, AccessShareLock);
	tsscan = heap_beginscan_catalog(tsrel, 0, NULL);

	while ((tstup = heap_getnext(tsscan, ForwardScanDirection)) != NULL)
	{
		Oid			tsoid = HeapTupleGetOid(tstup);
		char	   *dbdir;
		struct stat st;

		/* Don't mess with the global tablespace */
		if (tsoid == GLOBALTABLESPACE_OID)
			continue;

		dbdir = GetDatabasePath(db_id, tsoid);

		/*
		 * Act only on an existing directory.  lstat() does not follow a
		 * symbolic link, so a link at this path fails the S_ISDIR test and is
		 * left alone.  Anything else we assume we can ignore.
		 */
		if (lstat(dbdir, &st) == 0 && S_ISDIR(st.st_mode))
		{
			xl_dbase_drop_rec xlrec;

			if (!rmtree(dbdir, true))
				ereport(WARNING,
						(errmsg("some useless files may be left behind in old database directory \"%s\"",
								dbdir)));

			/* Record the filesystem change in XLOG */
			xlrec.db_id = db_id;
			xlrec.tablespace_id = tsoid;

			XLogBeginInsert();
			XLogRegisterData((char *) &xlrec, sizeof(xl_dbase_drop_rec));

			(void) XLogInsert(RM_DBASE_ID,
							  XLOG_DBASE_DROP | XLR_SPECIAL_REL_UPDATE);
		}

		pfree(dbdir);
	}

	heap_endscan(tsscan);
	heap_close(tsrel, AccessShareLock);
}
1892
/*
 * Check for existing files that conflict with a proposed new DB OID;
 * return TRUE if there are any
 *
 * If there were a subdirectory in any tablespace matching the proposed new
 * OID, we'd get a create failure due to the duplicate name ... and then we'd
 * try to remove that already-existing subdirectory during the cleanup in
 * remove_dbtablespaces.  Nuking existing files seems like a bad idea, so
 * instead we make this extra check before settling on the OID of the new
 * database.  This exactly parallels what GetNewRelFileNode() does for table
 * relfilenode values.
 */
static bool
check_db_file_conflict(Oid db_id)
{
	bool		conflict = false;
	Relation	tsrel;
	HeapScanDesc tsscan;
	HeapTuple	tstup;

	tsrel = heap_open(TableSpaceRelationId, AccessShareLock);
	tsscan = heap_beginscan_catalog(tsrel, 0, NULL);

	/* Stop at the first tablespace in which something already has that name */
	while (!conflict &&
		   (tstup = heap_getnext(tsscan, ForwardScanDirection)) != NULL)
	{
		Oid			tsoid = HeapTupleGetOid(tstup);
		char	   *candidate;
		struct stat st;

		/* Don't mess with the global tablespace */
		if (tsoid == GLOBALTABLESPACE_OID)
			continue;

		candidate = GetDatabasePath(db_id, tsoid);

		/*
		 * Any successful lstat() counts as a conflict: a file, a directory,
		 * whatever.  The file type in "st" is not inspected.
		 */
		conflict = (lstat(candidate, &st) == 0);

		pfree(candidate);
	}

	heap_endscan(tsscan);
	heap_close(tsrel, AccessShareLock);

	return conflict;
}
1943
/*
 * errdetail_busy_db - issue a suitable errdetail message for a busy database
 *
 * notherbackends is the count of other sessions and npreparedxacts the count
 * of prepared transactions to report.  The return value is always 0; it
 * exists only so the call can be used as an ereport() argument.
 */
static int
errdetail_busy_db(int notherbackends, int npreparedxacts)
{
	const bool have_sessions = (notherbackends > 0);
	const bool have_prepared = (npreparedxacts > 0);

	/* No other sessions: report the prepared transactions only */
	if (!have_sessions)
	{
		errdetail_plural("There is %d prepared transaction using the database.",
						 "There are %d prepared transactions using the database.",
						 npreparedxacts,
						 npreparedxacts);
		return 0;
	}

	/* Other sessions but no prepared transactions */
	if (!have_prepared)
	{
		errdetail_plural("There is %d other session using the database.",
						 "There are %d other sessions using the database.",
						 notherbackends,
						 notherbackends);
		return 0;
	}

	/*
	 * Both kinds are present.  Singular versus plural is not handled here,
	 * since gettext doesn't support multiple plurals in one string.
	 */
	errdetail("There are %d other session(s) and %d prepared transaction(s) using the database.",
			  notherbackends, npreparedxacts);

	return 0;					/* just to keep ereport macro happy */
}
1970
/*
 * get_database_oid - given a database name, look up the OID
 *
 * When the name is not found: with missing_ok false an
 * ERRCODE_UNDEFINED_DATABASE error is raised, with missing_ok true
 * InvalidOid is returned.
 *
 * dbname is used only as the comparison value of the index scan key and as
 * a "%s" argument of the error message.
 */
Oid
get_database_oid(const char *dbname, bool missing_ok)
{
	Relation dbrel;
	SysScanDesc namescan;
	ScanKeyData namekey;
	HeapTuple tup;
	Oid found;

	/*
	 * pg_database has no syscache indexed by name, so the lookup is done
	 * the hard way, with a scan of the name index.
	 */
	dbrel = heap_open(DatabaseRelationId, AccessShareLock);

	ScanKeyInit(&namekey,
				Anum_pg_database_datname,
				BTEqualStrategyNumber, F_NAMEEQ,
				CStringGetDatum(dbname));

	namescan = systable_beginscan(dbrel, DatabaseNameIndexId, true,
								  NULL, 1, &namekey);

	/* At most one matching tuple is assumed, so only the first is fetched */
	tup = systable_getnext(namescan);
	found = HeapTupleIsValid(tup) ? HeapTupleGetOid(tup) : InvalidOid;

	systable_endscan(namescan);
	heap_close(dbrel, AccessShareLock);

	/* Complain only after the scan and the relation have been closed */
	if (!missing_ok && !OidIsValid(found))
		ereport(ERROR,
				(errcode(ERRCODE_UNDEFINED_DATABASE),
				 errmsg("database \"%s\" does not exist",
						dbname)));

	return found;
}
2017
2018
/*
 * get_database_name - given a database OID, look up the name
 *
 * Returns a palloc'd copy of the name, or NULL if the syscache has no
 * database entry for dbid.
 */
char *
get_database_name(Oid dbid)
{
	HeapTuple tup = SearchSysCache1(DATABASEOID, ObjectIdGetDatum(dbid));
	Form_pg_database dbform;
	char *name;

	if (!HeapTupleIsValid(tup))
		return NULL;

	/* Copy the name out before the cache entry is released */
	dbform = (Form_pg_database) GETSTRUCT(tup);
	name = pstrdup(NameStr(dbform->datname));
	ReleaseSysCache(tup);

	return name;
}
2041
/*
 * DATABASE resource manager's routines
 *
 * dbase_redo - replay one database WAL record
 *
 * Handles XLOG_DBASE_CREATE (re-copy the source database directory to the
 * target) and XLOG_DBASE_DROP (discard cached state for the database and
 * remove its directory).  Any other op code raises PANIC.
 */
void
dbase_redo(XLogReaderState *record)
{
	uint8 opcode = XLogRecGetInfo(record) & ~XLR_INFO_MASK;

	/* Backup blocks are not used in dbase records */
	Assert(!XLogRecHasAnyBlockRefs(record));

	switch (opcode)
	{
		case XLOG_DBASE_CREATE:
			{
				xl_dbase_create_rec *create_rec =
					(xl_dbase_create_rec *) XLogRecGetData(record);
				char *from_dir = GetDatabasePath(create_rec->src_db_id,
												 create_rec->src_tablespace_id);
				char *to_dir = GetDatabasePath(create_rec->db_id,
											   create_rec->tablespace_id);
				struct stat statbuf;

				/*
				 * Our theory for replaying a CREATE is to forcibly drop the
				 * target subdirectory if present, then re-copy the source
				 * data.  This may be more work than needed, but it is simple
				 * to implement.
				 *
				 * NOTE(review): stat() follows symbolic links, whereas
				 * check_db_file_conflict() in this file uses lstat(); confirm
				 * that following a link at the target path is intended here.
				 */
				if (stat(to_dir, &statbuf) == 0 && S_ISDIR(statbuf.st_mode))
				{
					/* If this fails, copydir() below is going to error. */
					if (!rmtree(to_dir, true))
						ereport(WARNING,
								(errmsg("some useless files may be left behind in old database directory \"%s\"",
										to_dir)));
				}

				/*
				 * Force dirty buffers out to disk, to ensure the source
				 * database is up-to-date for the copy.
				 */
				FlushDatabaseBuffers(create_rec->src_db_id);

				/*
				 * Copy this subdirectory to the new location.
				 *
				 * We don't need to copy subdirectories.
				 */
				copydir(from_dir, to_dir, false);
				break;
			}

		case XLOG_DBASE_DROP:
			{
				xl_dbase_drop_rec *drop_rec =
					(xl_dbase_drop_rec *) XLogRecGetData(record);
				char *victim_dir = GetDatabasePath(drop_rec->db_id,
												   drop_rec->tablespace_id);

				if (InHotStandby)
				{
					/*
					 * Lock database while we resolve conflicts to ensure
					 * that InitPostgres() cannot fully re-execute
					 * concurrently.  This avoids backends re-connecting
					 * automatically to same database, which can happen in
					 * some cases.
					 */
					LockSharedObjectForSession(DatabaseRelationId,
											   drop_rec->db_id, 0,
											   AccessExclusiveLock);
					ResolveRecoveryConflictWithDatabase(drop_rec->db_id);
				}

				/* Drop pages for this database that are in the shared buffer cache */
				DropDatabaseBuffers(drop_rec->db_id);

				/* Also, clean out any fsync requests that might be pending in md.c */
				ForgetDatabaseFsyncRequests(drop_rec->db_id);

				/* Clean out the xlog relcache too */
				XLogDropDatabase(drop_rec->db_id);

				/* And remove the physical files; a failure is only a WARNING */
				if (!rmtree(victim_dir, true))
					ereport(WARNING,
							(errmsg("some useless files may be left behind in old database directory \"%s\"",
									victim_dir)));

				if (InHotStandby)
				{
					/*
					 * Release locks prior to commit.  XXX There is a race
					 * condition here that may allow backends to reconnect,
					 * but the window for this is small because the gap
					 * between here and commit is mostly fairly small and it
					 * is unlikely that people will be dropping databases
					 * that we are trying to connect to anyway.
					 */
					UnlockSharedObjectForSession(DatabaseRelationId,
												 drop_rec->db_id, 0,
												 AccessExclusiveLock);
				}
				break;
			}

		default:
			elog(PANIC, "dbase_redo: unknown op code %u", opcode);
	}
}
2139