1 /*-------------------------------------------------------------------------
2  *
3  * tablespace.c
4  *	  Commands to manipulate table spaces
5  *
6  * Tablespaces in PostgreSQL are designed to allow users to determine
7  * where the data file(s) for a given database object reside on the file
8  * system.
9  *
10  * A tablespace represents a directory on the file system. At tablespace
11  * creation time, the directory must be empty. To simplify things and
12  * remove the possibility of having file name conflicts, we isolate
13  * files within a tablespace into database-specific subdirectories.
14  *
15  * To support file access via the information given in RelFileNode, we
16  * maintain a symbolic-link map in $PGDATA/pg_tblspc. The symlinks are
17  * named by tablespace OIDs and point to the actual tablespace directories.
18  * There is also a per-cluster version directory in each tablespace.
19  * Thus the full path to an arbitrary file is
20  *			$PGDATA/pg_tblspc/spcoid/PG_MAJORVER_CATVER/dboid/relfilenode
21  * e.g.
22  *			$PGDATA/pg_tblspc/20981/PG_9.0_201002161/719849/83292814
23  *
24  * There are two tablespaces created at initdb time: pg_global (for shared
25  * tables) and pg_default (for everything else).  For backwards compatibility
26  * and to remain functional on platforms without symlinks, these tablespaces
27  * are accessed specially: they are respectively
28  *			$PGDATA/global/relfilenode
29  *			$PGDATA/base/dboid/relfilenode
30  *
31  * To allow CREATE DATABASE to give a new database a default tablespace
32  * that's different from the template database's default, we make the
33  * provision that a zero in pg_class.reltablespace means the database's
34  * default tablespace.  Without this, CREATE DATABASE would have to go in
35  * and munge the system catalogs of the new database.
36  *
37  *
38  * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
39  * Portions Copyright (c) 1994, Regents of the University of California
40  *
41  *
42  * IDENTIFICATION
43  *	  src/backend/commands/tablespace.c
44  *
45  *-------------------------------------------------------------------------
46  */
47 #include "postgres.h"
48 
49 #include <unistd.h>
50 #include <dirent.h>
51 #include <sys/stat.h>
52 
53 #include "access/heapam.h"
54 #include "access/reloptions.h"
55 #include "access/htup_details.h"
56 #include "access/sysattr.h"
57 #include "access/tableam.h"
58 #include "access/xact.h"
59 #include "access/xlog.h"
60 #include "access/xloginsert.h"
61 #include "catalog/catalog.h"
62 #include "catalog/dependency.h"
63 #include "catalog/indexing.h"
64 #include "catalog/namespace.h"
65 #include "catalog/objectaccess.h"
66 #include "catalog/pg_namespace.h"
67 #include "catalog/pg_tablespace.h"
68 #include "commands/comment.h"
69 #include "commands/seclabel.h"
70 #include "commands/tablecmds.h"
71 #include "commands/tablespace.h"
72 #include "common/file_perm.h"
73 #include "miscadmin.h"
74 #include "postmaster/bgwriter.h"
75 #include "storage/fd.h"
76 #include "storage/lmgr.h"
77 #include "storage/standby.h"
78 #include "utils/acl.h"
79 #include "utils/builtins.h"
80 #include "utils/fmgroids.h"
81 #include "utils/guc.h"
82 #include "utils/lsyscache.h"
83 #include "utils/memutils.h"
84 #include "utils/rel.h"
85 #include "utils/varlena.h"
86 
87 
/*
 * GUC variables
 *
 * Both pointers are initialized to NULL here.
 * NOTE(review): presumably these hold the values of the default_tablespace
 * and temp_tablespaces settings and are assigned by the GUC machinery
 * elsewhere -- confirm against the GUC tables.
 */
char	   *default_tablespace = NULL;
char	   *temp_tablespaces = NULL;


/* Forward declarations of file-local helpers that are defined further down */
static void create_tablespace_directories(const char *location,
										  const Oid tablespaceoid);
static bool destroy_tablespace_directories(Oid tablespaceoid, bool redo);
96 
97 
98 /*
99  * Each database using a table space is isolated into its own name space
100  * by a subdirectory named for the database OID.  On first creation of an
101  * object in the tablespace, create the subdirectory.  If the subdirectory
102  * already exists, fall through quietly.
103  *
104  * isRedo indicates that we are creating an object during WAL replay.
105  * In this case we will cope with the possibility of the tablespace
106  * directory not being there either --- this could happen if we are
107  * replaying an operation on a table in a subsequently-dropped tablespace.
108  * We handle this by making a directory in the place where the tablespace
109  * symlink would normally be.  This isn't an exact replay of course, but
110  * it's the best we can do given the available information.
111  *
112  * If tablespaces are not supported, we still need it in case we have to
113  * re-create a database subdirectory (of $PGDATA/base) during WAL replay.
114  */
115 void
TablespaceCreateDbspace(Oid spcNode,Oid dbNode,bool isRedo)116 TablespaceCreateDbspace(Oid spcNode, Oid dbNode, bool isRedo)
117 {
118 	struct stat st;
119 	char	   *dir;
120 
121 	/*
122 	 * The global tablespace doesn't have per-database subdirectories, so
123 	 * nothing to do for it.
124 	 */
125 	if (spcNode == GLOBALTABLESPACE_OID)
126 		return;
127 
128 	Assert(OidIsValid(spcNode));
129 	Assert(OidIsValid(dbNode));
130 
131 	dir = GetDatabasePath(dbNode, spcNode);
132 
133 	if (stat(dir, &st) < 0)
134 	{
135 		/* Directory does not exist? */
136 		if (errno == ENOENT)
137 		{
138 			/*
139 			 * Acquire TablespaceCreateLock to ensure that no DROP TABLESPACE
140 			 * or TablespaceCreateDbspace is running concurrently.
141 			 */
142 			LWLockAcquire(TablespaceCreateLock, LW_EXCLUSIVE);
143 
144 			/*
145 			 * Recheck to see if someone created the directory while we were
146 			 * waiting for lock.
147 			 */
148 			if (stat(dir, &st) == 0 && S_ISDIR(st.st_mode))
149 			{
150 				/* Directory was created */
151 			}
152 			else
153 			{
154 				/* Directory creation failed? */
155 				if (MakePGDirectory(dir) < 0)
156 				{
157 					char	   *parentdir;
158 
159 					/* Failure other than not exists or not in WAL replay? */
160 					if (errno != ENOENT || !isRedo)
161 						ereport(ERROR,
162 								(errcode_for_file_access(),
163 								 errmsg("could not create directory \"%s\": %m",
164 										dir)));
165 
166 					/*
167 					 * Parent directories are missing during WAL replay, so
168 					 * continue by creating simple parent directories rather
169 					 * than a symlink.
170 					 */
171 
172 					/* create two parents up if not exist */
173 					parentdir = pstrdup(dir);
174 					get_parent_directory(parentdir);
175 					get_parent_directory(parentdir);
176 					/* Can't create parent and it doesn't already exist? */
177 					if (MakePGDirectory(parentdir) < 0 && errno != EEXIST)
178 						ereport(ERROR,
179 								(errcode_for_file_access(),
180 								 errmsg("could not create directory \"%s\": %m",
181 										parentdir)));
182 					pfree(parentdir);
183 
184 					/* create one parent up if not exist */
185 					parentdir = pstrdup(dir);
186 					get_parent_directory(parentdir);
187 					/* Can't create parent and it doesn't already exist? */
188 					if (MakePGDirectory(parentdir) < 0 && errno != EEXIST)
189 						ereport(ERROR,
190 								(errcode_for_file_access(),
191 								 errmsg("could not create directory \"%s\": %m",
192 										parentdir)));
193 					pfree(parentdir);
194 
195 					/* Create database directory */
196 					if (MakePGDirectory(dir) < 0)
197 						ereport(ERROR,
198 								(errcode_for_file_access(),
199 								 errmsg("could not create directory \"%s\": %m",
200 										dir)));
201 				}
202 			}
203 
204 			LWLockRelease(TablespaceCreateLock);
205 		}
206 		else
207 		{
208 			ereport(ERROR,
209 					(errcode_for_file_access(),
210 					 errmsg("could not stat directory \"%s\": %m", dir)));
211 		}
212 	}
213 	else
214 	{
215 		/* Is it not a directory? */
216 		if (!S_ISDIR(st.st_mode))
217 			ereport(ERROR,
218 					(errcode(ERRCODE_WRONG_OBJECT_TYPE),
219 					 errmsg("\"%s\" exists but is not a directory",
220 							dir)));
221 	}
222 
223 	pfree(dir);
224 }
225 
/*
 * Create a table space
 *
 * Only superusers can create a tablespace. This seems a reasonable restriction
 * since we're determining the system layout and, anyway, we probably have
 * root if we're doing this kind of activity.
 *
 * stmt supplies the name of the new tablespace (tablespacename), the
 * directory it should live in (location), an optional owner role (owner)
 * and optional storage options (options).
 *
 * The steps are: validate the request, insert the pg_tablespace row, create
 * the directories and symlink on disk, and finally write a WAL record for
 * the filesystem change.  Every validation failure is raised with
 * ereport(ERROR); a location inside the data directory only draws a WARNING.
 *
 * Returns the OID assigned to the new tablespace.  In builds without
 * HAVE_SYMLINK the function always raises a "not supported" error.
 */
Oid
CreateTableSpace(CreateTableSpaceStmt *stmt)
{
#ifdef HAVE_SYMLINK
	Relation	rel;
	Datum		values[Natts_pg_tablespace];
	bool		nulls[Natts_pg_tablespace];
	HeapTuple	tuple;
	Oid			tablespaceoid;
	char	   *location;
	Oid			ownerId;
	Datum		newOptions;

	/* Must be super user */
	if (!superuser())
		ereport(ERROR,
				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
				 errmsg("permission denied to create tablespace \"%s\"",
						stmt->tablespacename),
				 errhint("Must be superuser to create a tablespace.")));

	/* However, the eventual owner of the tablespace need not be */
	if (stmt->owner)
		ownerId = get_rolespec_oid(stmt->owner, false);
	else
		ownerId = GetUserId();

	/*
	 * Unix-ify the offered path, and strip any trailing slashes.  We work on
	 * a private copy so that the statement node itself is left untouched.
	 */
	location = pstrdup(stmt->location);
	canonicalize_path(location);

	/* disallow quotes, else CREATE DATABASE would be at risk */
	if (strchr(location, '\''))
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_NAME),
				 errmsg("tablespace location cannot contain single quotes")));

	/*
	 * Allowing relative paths seems risky
	 *
	 * this also helps us ensure that location is not empty or whitespace
	 */
	if (!is_absolute_path(location))
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
				 errmsg("tablespace location must be an absolute path")));

	/*
	 * Check that location isn't too long. Remember that we're going to append
	 * 'PG_XXX/<dboid>/<relid>_<fork>.<nnn>'.  FYI, we never actually
	 * reference the whole path here, but MakePGDirectory() uses the first two
	 * parts.
	 */
	if (strlen(location) + 1 + strlen(TABLESPACE_VERSION_DIRECTORY) + 1 +
		OIDCHARS + 1 + OIDCHARS + 1 + FORKNAMECHARS + 1 + OIDCHARS > MAXPGPATH)
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
				 errmsg("tablespace location \"%s\" is too long",
						location)));

	/* Warn if the tablespace is in the data directory. */
	if (path_is_prefix_of_path(DataDir, location))
		ereport(WARNING,
				(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
				 errmsg("tablespace location should not be inside the data directory")));

	/*
	 * Disallow creation of tablespaces named "pg_xxx"; we reserve this
	 * namespace for system purposes.
	 */
	if (!allowSystemTableMods && IsReservedName(stmt->tablespacename))
		ereport(ERROR,
				(errcode(ERRCODE_RESERVED_NAME),
				 errmsg("unacceptable tablespace name \"%s\"",
						stmt->tablespacename),
				 errdetail("The prefix \"pg_\" is reserved for system tablespaces.")));

	/*
	 * If built with appropriate switch, whine when regression-testing
	 * conventions for tablespace names are violated.
	 */
#ifdef ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS
	if (strncmp(stmt->tablespacename, "regress_", 8) != 0)
		elog(WARNING, "tablespaces created by regression test cases should have names starting with \"regress_\"");
#endif

	/*
	 * Check that there is no other tablespace by this name.  (The unique
	 * index would catch this anyway, but might as well give a friendlier
	 * message.)
	 */
	if (OidIsValid(get_tablespace_oid(stmt->tablespacename, true)))
		ereport(ERROR,
				(errcode(ERRCODE_DUPLICATE_OBJECT),
				 errmsg("tablespace \"%s\" already exists",
						stmt->tablespacename)));

	/*
	 * Insert tuple into pg_tablespace.  The purpose of doing this first is to
	 * lock the proposed tablespace name against other would-be creators. The
	 * insertion will roll back if we find problems below.
	 */
	rel = table_open(TableSpaceRelationId, RowExclusiveLock);

	/* Start with every column marked non-null; exceptions are set below */
	MemSet(nulls, false, sizeof(nulls));

	tablespaceoid = GetNewOidWithIndex(rel, TablespaceOidIndexId,
									   Anum_pg_tablespace_oid);
	values[Anum_pg_tablespace_oid - 1] = ObjectIdGetDatum(tablespaceoid);
	values[Anum_pg_tablespace_spcname - 1] =
		DirectFunctionCall1(namein, CStringGetDatum(stmt->tablespacename));
	values[Anum_pg_tablespace_spcowner - 1] =
		ObjectIdGetDatum(ownerId);
	/* A new tablespace is stored with a null spcacl */
	nulls[Anum_pg_tablespace_spcacl - 1] = true;

	/* Generate new proposed spcoptions (text array) */
	newOptions = transformRelOptions((Datum) 0,
									 stmt->options,
									 NULL, NULL, false, false);

	/*
	 * The result is discarded.  NOTE(review): presumably the call is made
	 * only to validate the options (second argument true) -- confirm.
	 */
	(void) tablespace_reloptions(newOptions, true);
	if (newOptions != (Datum) 0)
		values[Anum_pg_tablespace_spcoptions - 1] = newOptions;
	else
		nulls[Anum_pg_tablespace_spcoptions - 1] = true;

	tuple = heap_form_tuple(rel->rd_att, values, nulls);

	CatalogTupleInsert(rel, tuple);

	heap_freetuple(tuple);

	/* Record dependency on owner */
	recordDependencyOnOwner(TableSpaceRelationId, tablespaceoid, ownerId);

	/* Post creation hook for new tablespace */
	InvokeObjectPostCreateHook(TableSpaceRelationId, tablespaceoid, 0);

	/* Now make the version directory and the pg_tblspc symlink on disk */
	create_tablespace_directories(location, tablespaceoid);

	/* Record the filesystem change in XLOG */
	{
		xl_tblspc_create_rec xlrec;

		xlrec.ts_id = tablespaceoid;

		/*
		 * The record is registered in two pieces: the fixed part of xlrec up
		 * to (not including) ts_path, followed by the location string with
		 * its terminating NUL.
		 */
		XLogBeginInsert();
		XLogRegisterData((char *) &xlrec,
						 offsetof(xl_tblspc_create_rec, ts_path));
		XLogRegisterData((char *) location, strlen(location) + 1);

		(void) XLogInsert(RM_TBLSPC_ID, XLOG_TBLSPC_CREATE);
	}

	/*
	 * Force synchronous commit, to minimize the window between creating the
	 * symlink on-disk and marking the transaction committed.  It's not great
	 * that there is any window at all, but definitely we don't want to make
	 * it larger than necessary.
	 */
	ForceSyncCommit();

	pfree(location);

	/* We keep the lock on pg_tablespace until commit */
	table_close(rel, NoLock);

	return tablespaceoid;
#else							/* !HAVE_SYMLINK */
	ereport(ERROR,
			(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
			 errmsg("tablespaces are not supported on this platform")));
	return InvalidOid;			/* keep compiler quiet */
#endif							/* HAVE_SYMLINK */
}
407 
/*
 * Drop a table space
 *
 * Be careful to check that the tablespace is empty.
 *
 * stmt->tablespacename names the tablespace to remove.  If no such
 * tablespace exists, stmt->missing_ok decides between an ERROR and a NOTICE
 * followed by a quiet return.
 *
 * The caller must own the tablespace, and the two standard tablespaces
 * (GLOBALTABLESPACE_OID and DEFAULTTABLESPACE_OID) can never be dropped.
 * The catalog row and its shared dependencies, comments and security labels
 * are removed first; then, while holding TablespaceCreateLock, the on-disk
 * directories are removed and the change is WAL-logged.  If the directories
 * are not empty even after a forced checkpoint, an ERROR is raised.
 *
 * In builds without HAVE_SYMLINK the function always raises a "not
 * supported" error.
 */
void
DropTableSpace(DropTableSpaceStmt *stmt)
{
#ifdef HAVE_SYMLINK
	char	   *tablespacename = stmt->tablespacename;
	TableScanDesc scandesc;
	Relation	rel;
	HeapTuple	tuple;
	Form_pg_tablespace spcform;
	ScanKeyData entry[1];
	Oid			tablespaceoid;
	char	   *detail;
	char	   *detail_log;

	/*
	 * Find the target tuple
	 */
	rel = table_open(TableSpaceRelationId, RowExclusiveLock);

	ScanKeyInit(&entry[0],
				Anum_pg_tablespace_spcname,
				BTEqualStrategyNumber, F_NAMEEQ,
				CStringGetDatum(tablespacename));
	scandesc = table_beginscan_catalog(rel, 1, entry);
	tuple = heap_getnext(scandesc, ForwardScanDirection);

	if (!HeapTupleIsValid(tuple))
	{
		if (!stmt->missing_ok)
		{
			ereport(ERROR,
					(errcode(ERRCODE_UNDEFINED_OBJECT),
					 errmsg("tablespace \"%s\" does not exist",
							tablespacename)));
		}
		else
		{
			ereport(NOTICE,
					(errmsg("tablespace \"%s\" does not exist, skipping",
							tablespacename)));
			/* Nothing to drop: end the scan and close the relation (NoLock) */
			table_endscan(scandesc);
			table_close(rel, NoLock);
		}
		return;
	}

	spcform = (Form_pg_tablespace) GETSTRUCT(tuple);
	tablespaceoid = spcform->oid;

	/* Must be tablespace owner */
	if (!pg_tablespace_ownercheck(tablespaceoid, GetUserId()))
		aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_TABLESPACE,
					   tablespacename);

	/* Disallow drop of the standard tablespaces, even by superuser */
	if (tablespaceoid == GLOBALTABLESPACE_OID ||
		tablespaceoid == DEFAULTTABLESPACE_OID)
		aclcheck_error(ACLCHECK_NO_PRIV, OBJECT_TABLESPACE,
					   tablespacename);

	/* Check for pg_shdepend entries depending on this tablespace */
	if (checkSharedDependencies(TableSpaceRelationId, tablespaceoid,
								&detail, &detail_log))
		ereport(ERROR,
				(errcode(ERRCODE_DEPENDENT_OBJECTS_STILL_EXIST),
				 errmsg("tablespace \"%s\" cannot be dropped because some objects depend on it",
						tablespacename),
				 errdetail_internal("%s", detail),
				 errdetail_log("%s", detail_log)));

	/* DROP hook for the tablespace being removed */
	InvokeObjectDropHook(TableSpaceRelationId, tablespaceoid, 0);

	/*
	 * Remove the pg_tablespace tuple (this will roll back if we fail below)
	 */
	CatalogTupleDelete(rel, &tuple->t_self);

	/* The scan is ended only here, after the last use of tuple above */
	table_endscan(scandesc);

	/*
	 * Remove any comments or security labels on this tablespace.
	 */
	DeleteSharedComments(tablespaceoid, TableSpaceRelationId);
	DeleteSharedSecurityLabel(tablespaceoid, TableSpaceRelationId);

	/*
	 * Remove dependency on owner.
	 */
	deleteSharedDependencyRecordsFor(TableSpaceRelationId, tablespaceoid, 0);

	/*
	 * Acquire TablespaceCreateLock to ensure that no TablespaceCreateDbspace
	 * is running concurrently.
	 */
	LWLockAcquire(TablespaceCreateLock, LW_EXCLUSIVE);

	/*
	 * Try to remove the physical infrastructure.
	 */
	if (!destroy_tablespace_directories(tablespaceoid, false))
	{
		/*
		 * Not all files deleted?  However, there can be lingering empty files
		 * in the directories, left behind by for example DROP TABLE, that
		 * have been scheduled for deletion at next checkpoint (see comments
		 * in mdunlink() for details).  We could just delete them immediately,
		 * but we can't tell them apart from important data files that we
		 * mustn't delete.  So instead, we force a checkpoint which will clean
		 * out any lingering files, and try again.
		 *
		 * XXX On Windows, an unlinked file persists in the directory listing
		 * until no process retains an open handle for the file.  The DDL
		 * commands that schedule files for unlink send invalidation messages
		 * directing other PostgreSQL processes to close the files.  DROP
		 * TABLESPACE should not give up on the tablespace becoming empty
		 * until all relevant invalidation processing is complete.
		 */
		RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT);
		if (!destroy_tablespace_directories(tablespaceoid, false))
		{
			/* Still not empty, the files must be important then */
			ereport(ERROR,
					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
					 errmsg("tablespace \"%s\" is not empty",
							tablespacename)));
		}
	}

	/* Record the filesystem change in XLOG */
	{
		xl_tblspc_drop_rec xlrec;

		xlrec.ts_id = tablespaceoid;

		XLogBeginInsert();
		XLogRegisterData((char *) &xlrec, sizeof(xl_tblspc_drop_rec));

		(void) XLogInsert(RM_TBLSPC_ID, XLOG_TBLSPC_DROP);
	}

	/*
	 * Note: because we checked that the tablespace was empty, there should be
	 * no need to worry about flushing shared buffers or free space map
	 * entries for relations in the tablespace.
	 */

	/*
	 * Force synchronous commit, to minimize the window between removing the
	 * files on-disk and marking the transaction committed.  It's not great
	 * that there is any window at all, but definitely we don't want to make
	 * it larger than necessary.
	 */
	ForceSyncCommit();

	/*
	 * Allow TablespaceCreateDbspace again.
	 */
	LWLockRelease(TablespaceCreateLock);

	/* We keep the lock on pg_tablespace until commit */
	table_close(rel, NoLock);
#else							/* !HAVE_SYMLINK */
	ereport(ERROR,
			(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
			 errmsg("tablespaces are not supported on this platform")));
#endif							/* HAVE_SYMLINK */
}
582 
583 
584 /*
585  * create_tablespace_directories
586  *
587  *	Attempt to create filesystem infrastructure linking $PGDATA/pg_tblspc/
588  *	to the specified directory
589  */
590 static void
create_tablespace_directories(const char * location,const Oid tablespaceoid)591 create_tablespace_directories(const char *location, const Oid tablespaceoid)
592 {
593 	char	   *linkloc;
594 	char	   *location_with_version_dir;
595 	struct stat st;
596 
597 	linkloc = psprintf("pg_tblspc/%u", tablespaceoid);
598 	location_with_version_dir = psprintf("%s/%s", location,
599 										 TABLESPACE_VERSION_DIRECTORY);
600 
601 	/*
602 	 * Attempt to coerce target directory to safe permissions.  If this fails,
603 	 * it doesn't exist or has the wrong owner.
604 	 */
605 	if (chmod(location, pg_dir_create_mode) != 0)
606 	{
607 		if (errno == ENOENT)
608 			ereport(ERROR,
609 					(errcode(ERRCODE_UNDEFINED_FILE),
610 					 errmsg("directory \"%s\" does not exist", location),
611 					 InRecovery ? errhint("Create this directory for the tablespace before "
612 										  "restarting the server.") : 0));
613 		else
614 			ereport(ERROR,
615 					(errcode_for_file_access(),
616 					 errmsg("could not set permissions on directory \"%s\": %m",
617 							location)));
618 	}
619 
620 	/*
621 	 * The creation of the version directory prevents more than one tablespace
622 	 * in a single location.  This imitates TablespaceCreateDbspace(), but it
623 	 * ignores concurrency and missing parent directories.  The chmod() would
624 	 * have failed in the absence of a parent.  pg_tablespace_spcname_index
625 	 * prevents concurrency.
626 	 */
627 	if (stat(location_with_version_dir, &st) < 0)
628 	{
629 		if (errno != ENOENT)
630 			ereport(ERROR,
631 					(errcode_for_file_access(),
632 					 errmsg("could not stat directory \"%s\": %m",
633 							location_with_version_dir)));
634 		else if (MakePGDirectory(location_with_version_dir) < 0)
635 			ereport(ERROR,
636 					(errcode_for_file_access(),
637 					 errmsg("could not create directory \"%s\": %m",
638 							location_with_version_dir)));
639 	}
640 	else if (!S_ISDIR(st.st_mode))
641 		ereport(ERROR,
642 				(errcode(ERRCODE_WRONG_OBJECT_TYPE),
643 				 errmsg("\"%s\" exists but is not a directory",
644 						location_with_version_dir)));
645 	else if (!InRecovery)
646 		ereport(ERROR,
647 				(errcode(ERRCODE_OBJECT_IN_USE),
648 				 errmsg("directory \"%s\" already in use as a tablespace",
649 						location_with_version_dir)));
650 
651 	/*
652 	 * In recovery, remove old symlink, in case it points to the wrong place.
653 	 */
654 	if (InRecovery)
655 		remove_tablespace_symlink(linkloc);
656 
657 	/*
658 	 * Create the symlink under PGDATA
659 	 */
660 	if (symlink(location, linkloc) < 0)
661 		ereport(ERROR,
662 				(errcode_for_file_access(),
663 				 errmsg("could not create symbolic link \"%s\": %m",
664 						linkloc)));
665 
666 	pfree(linkloc);
667 	pfree(location_with_version_dir);
668 }
669 
670 
/*
 * destroy_tablespace_directories
 *
 * Attempt to remove filesystem infrastructure for the tablespace.
 *
 * tablespaceoid identifies the tablespace; all paths are built from it
 * under pg_tblspc/.
 *
 * 'redo' indicates we are redoing a drop from XLOG; in that case we should
 * not throw an ERROR for problems, just LOG them.  The worst consequence of
 * not removing files here would be failure to release some disk space, which
 * does not justify throwing an error that would require manual intervention
 * to get the database running again.
 *
 * The work proceeds in three stages: remove every (empty) per-database
 * subdirectory of the version directory, remove the version directory
 * itself, and finally remove the pg_tblspc entry, which may be a symlink or
 * a directory.
 *
 * Returns true if successful, false if some subdirectory is not empty.
 * In redo mode false is also returned when the version directory cannot be
 * opened (for a reason other than ENOENT) or cannot be removed.
 */
static bool
destroy_tablespace_directories(Oid tablespaceoid, bool redo)
{
	char	   *linkloc;
	char	   *linkloc_with_version_dir;
	DIR		   *dirdesc;
	struct dirent *de;
	char	   *subfile;
	struct stat st;

	linkloc_with_version_dir = psprintf("pg_tblspc/%u/%s", tablespaceoid,
										TABLESPACE_VERSION_DIRECTORY);

	/*
	 * Check if the tablespace still contains any files.  We try to rmdir each
	 * per-database directory we find in it.  rmdir failure implies there are
	 * still files in that subdirectory, so give up.  (We do not have to worry
	 * about undoing any already completed rmdirs, since the next attempt to
	 * use the tablespace from that database will simply recreate the
	 * subdirectory via TablespaceCreateDbspace.)
	 *
	 * Since we hold TablespaceCreateLock, no one else should be creating any
	 * fresh subdirectories in parallel. It is possible that new files are
	 * being created within subdirectories, though, so the rmdir call could
	 * fail.  Worst consequence is a less friendly error message.
	 *
	 * If redo is true then ENOENT is a likely outcome here, and we allow it
	 * to pass without comment.  In normal operation we still allow it, but
	 * with a warning.  This is because even though ProcessUtility disallows
	 * DROP TABLESPACE in a transaction block, it's possible that a previous
	 * DROP failed and rolled back after removing the tablespace directories
	 * and/or symlink.  We want to allow a new DROP attempt to succeed at
	 * removing the catalog entries (and symlink if still present), so we
	 * should not give a hard error here.
	 */
	dirdesc = AllocateDir(linkloc_with_version_dir);
	if (dirdesc == NULL)
	{
		if (errno == ENOENT)
		{
			if (!redo)
				ereport(WARNING,
						(errcode_for_file_access(),
						 errmsg("could not open directory \"%s\": %m",
								linkloc_with_version_dir)));
			/* The symlink might still exist, so go try to remove it */
			goto remove_symlink;
		}
		else if (redo)
		{
			/* in redo, just log other types of error */
			ereport(LOG,
					(errcode_for_file_access(),
					 errmsg("could not open directory \"%s\": %m",
							linkloc_with_version_dir)));
			pfree(linkloc_with_version_dir);
			return false;
		}
		/* else let ReadDir report the error */
	}

	/* Walk the version directory; each entry should be a database directory */
	while ((de = ReadDir(dirdesc, linkloc_with_version_dir)) != NULL)
	{
		if (strcmp(de->d_name, ".") == 0 ||
			strcmp(de->d_name, "..") == 0)
			continue;

		subfile = psprintf("%s/%s", linkloc_with_version_dir, de->d_name);

		/* This check is just to deliver a friendlier error message */
		if (!redo && !directory_is_empty(subfile))
		{
			/* Release everything acquired so far before reporting failure */
			FreeDir(dirdesc);
			pfree(subfile);
			pfree(linkloc_with_version_dir);
			return false;
		}

		/* remove empty directory */
		if (rmdir(subfile) < 0)
			ereport(redo ? LOG : ERROR,
					(errcode_for_file_access(),
					 errmsg("could not remove directory \"%s\": %m",
							subfile)));

		pfree(subfile);
	}

	FreeDir(dirdesc);

	/* remove version directory */
	if (rmdir(linkloc_with_version_dir) < 0)
	{
		ereport(redo ? LOG : ERROR,
				(errcode_for_file_access(),
				 errmsg("could not remove directory \"%s\": %m",
						linkloc_with_version_dir)));
		/* Only reached in redo mode, where the failure was merely logged */
		pfree(linkloc_with_version_dir);
		return false;
	}

	/*
	 * Try to remove the symlink.  We must however deal with the possibility
	 * that it's a directory instead of a symlink --- this could happen during
	 * WAL replay (see TablespaceCreateDbspace), and it is also the case on
	 * Windows where junction points lstat() as directories.
	 *
	 * Note: in the redo case, we'll return true if this final step fails;
	 * there's no point in retrying it.  Also, ENOENT should provoke no more
	 * than a warning.
	 */
remove_symlink:
	/* Derive pg_tblspc/<oid> by dropping the version directory component */
	linkloc = pstrdup(linkloc_with_version_dir);
	get_parent_directory(linkloc);
	if (lstat(linkloc, &st) < 0)
	{
		/* Capture errno so the severity reflects the failed lstat() */
		int			saved_errno = errno;

		ereport(redo ? LOG : (saved_errno == ENOENT ? WARNING : ERROR),
				(errcode_for_file_access(),
				 errmsg("could not stat file \"%s\": %m",
						linkloc)));
	}
	else if (S_ISDIR(st.st_mode))
	{
		if (rmdir(linkloc) < 0)
		{
			int			saved_errno = errno;

			ereport(redo ? LOG : (saved_errno == ENOENT ? WARNING : ERROR),
					(errcode_for_file_access(),
					 errmsg("could not remove directory \"%s\": %m",
							linkloc)));
		}
	}
#ifdef S_ISLNK
	else if (S_ISLNK(st.st_mode))
	{
		if (unlink(linkloc) < 0)
		{
			int			saved_errno = errno;

			ereport(redo ? LOG : (saved_errno == ENOENT ? WARNING : ERROR),
					(errcode_for_file_access(),
					 errmsg("could not remove symbolic link \"%s\": %m",
							linkloc)));
		}
	}
#endif
	else
	{
		/* Refuse to remove anything that's not a directory or symlink */
		ereport(redo ? LOG : ERROR,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				 errmsg("\"%s\" is not a directory or symbolic link",
						linkloc)));
	}

	pfree(linkloc_with_version_dir);
	pfree(linkloc);

	return true;
}
847 
848 
/*
 * directory_is_empty
 *
 * Report whether the directory at 'path' has no entries other than "." and
 * "..".  Returns true when it is empty, false as soon as any other entry is
 * seen.
 *
 * This probably belongs somewhere else, but not sure where...
 */
bool
directory_is_empty(const char *path)
{
	DIR		   *dir = AllocateDir(path);
	struct dirent *entry;
	bool		empty = true;

	while ((entry = ReadDir(dir, path)) != NULL)
	{
		/* The self and parent links do not count as content */
		if (strcmp(entry->d_name, ".") != 0 &&
			strcmp(entry->d_name, "..") != 0)
		{
			empty = false;
			break;
		}
	}

	FreeDir(dir);
	return empty;
}
874 
875 /*
876  *	remove_tablespace_symlink
877  *
878  * This function removes symlinks in pg_tblspc.  On Windows, junction points
879  * act like directories so we must be able to apply rmdir.  This function
880  * works like the symlink removal code in destroy_tablespace_directories,
881  * except that failure to remove is always an ERROR.  But if the file doesn't
882  * exist at all, that's OK.
883  */
884 void
remove_tablespace_symlink(const char * linkloc)885 remove_tablespace_symlink(const char *linkloc)
886 {
887 	struct stat st;
888 
889 	if (lstat(linkloc, &st) < 0)
890 	{
891 		if (errno == ENOENT)
892 			return;
893 		ereport(ERROR,
894 				(errcode_for_file_access(),
895 				 errmsg("could not stat file \"%s\": %m", linkloc)));
896 	}
897 
898 	if (S_ISDIR(st.st_mode))
899 	{
900 		/*
901 		 * This will fail if the directory isn't empty, but not if it's a
902 		 * junction point.
903 		 */
904 		if (rmdir(linkloc) < 0 && errno != ENOENT)
905 			ereport(ERROR,
906 					(errcode_for_file_access(),
907 					 errmsg("could not remove directory \"%s\": %m",
908 							linkloc)));
909 	}
910 #ifdef S_ISLNK
911 	else if (S_ISLNK(st.st_mode))
912 	{
913 		if (unlink(linkloc) < 0 && errno != ENOENT)
914 			ereport(ERROR,
915 					(errcode_for_file_access(),
916 					 errmsg("could not remove symbolic link \"%s\": %m",
917 							linkloc)));
918 	}
919 #endif
920 	else
921 	{
922 		/* Refuse to remove anything that's not a directory or symlink */
923 		ereport(ERROR,
924 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
925 				 errmsg("\"%s\" is not a directory or symbolic link",
926 						linkloc)));
927 	}
928 }
929 
930 /*
931  * Rename a tablespace
932  */
933 ObjectAddress
RenameTableSpace(const char * oldname,const char * newname)934 RenameTableSpace(const char *oldname, const char *newname)
935 {
936 	Oid			tspId;
937 	Relation	rel;
938 	ScanKeyData entry[1];
939 	TableScanDesc scan;
940 	HeapTuple	tup;
941 	HeapTuple	newtuple;
942 	Form_pg_tablespace newform;
943 	ObjectAddress address;
944 
945 	/* Search pg_tablespace */
946 	rel = table_open(TableSpaceRelationId, RowExclusiveLock);
947 
948 	ScanKeyInit(&entry[0],
949 				Anum_pg_tablespace_spcname,
950 				BTEqualStrategyNumber, F_NAMEEQ,
951 				CStringGetDatum(oldname));
952 	scan = table_beginscan_catalog(rel, 1, entry);
953 	tup = heap_getnext(scan, ForwardScanDirection);
954 	if (!HeapTupleIsValid(tup))
955 		ereport(ERROR,
956 				(errcode(ERRCODE_UNDEFINED_OBJECT),
957 				 errmsg("tablespace \"%s\" does not exist",
958 						oldname)));
959 
960 	newtuple = heap_copytuple(tup);
961 	newform = (Form_pg_tablespace) GETSTRUCT(newtuple);
962 	tspId = newform->oid;
963 
964 	table_endscan(scan);
965 
966 	/* Must be owner */
967 	if (!pg_tablespace_ownercheck(tspId, GetUserId()))
968 		aclcheck_error(ACLCHECK_NO_PRIV, OBJECT_TABLESPACE, oldname);
969 
970 	/* Validate new name */
971 	if (!allowSystemTableMods && IsReservedName(newname))
972 		ereport(ERROR,
973 				(errcode(ERRCODE_RESERVED_NAME),
974 				 errmsg("unacceptable tablespace name \"%s\"", newname),
975 				 errdetail("The prefix \"pg_\" is reserved for system tablespaces.")));
976 
977 	/*
978 	 * If built with appropriate switch, whine when regression-testing
979 	 * conventions for tablespace names are violated.
980 	 */
981 #ifdef ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS
982 	if (strncmp(newname, "regress_", 8) != 0)
983 		elog(WARNING, "tablespaces created by regression test cases should have names starting with \"regress_\"");
984 #endif
985 
986 	/* Make sure the new name doesn't exist */
987 	ScanKeyInit(&entry[0],
988 				Anum_pg_tablespace_spcname,
989 				BTEqualStrategyNumber, F_NAMEEQ,
990 				CStringGetDatum(newname));
991 	scan = table_beginscan_catalog(rel, 1, entry);
992 	tup = heap_getnext(scan, ForwardScanDirection);
993 	if (HeapTupleIsValid(tup))
994 		ereport(ERROR,
995 				(errcode(ERRCODE_DUPLICATE_OBJECT),
996 				 errmsg("tablespace \"%s\" already exists",
997 						newname)));
998 
999 	table_endscan(scan);
1000 
1001 	/* OK, update the entry */
1002 	namestrcpy(&(newform->spcname), newname);
1003 
1004 	CatalogTupleUpdate(rel, &newtuple->t_self, newtuple);
1005 
1006 	InvokeObjectPostAlterHook(TableSpaceRelationId, tspId, 0);
1007 
1008 	ObjectAddressSet(address, TableSpaceRelationId, tspId);
1009 
1010 	table_close(rel, NoLock);
1011 
1012 	return address;
1013 }
1014 
1015 /*
1016  * Alter table space options
1017  */
1018 Oid
AlterTableSpaceOptions(AlterTableSpaceOptionsStmt * stmt)1019 AlterTableSpaceOptions(AlterTableSpaceOptionsStmt *stmt)
1020 {
1021 	Relation	rel;
1022 	ScanKeyData entry[1];
1023 	TableScanDesc scandesc;
1024 	HeapTuple	tup;
1025 	Oid			tablespaceoid;
1026 	Datum		datum;
1027 	Datum		newOptions;
1028 	Datum		repl_val[Natts_pg_tablespace];
1029 	bool		isnull;
1030 	bool		repl_null[Natts_pg_tablespace];
1031 	bool		repl_repl[Natts_pg_tablespace];
1032 	HeapTuple	newtuple;
1033 
1034 	/* Search pg_tablespace */
1035 	rel = table_open(TableSpaceRelationId, RowExclusiveLock);
1036 
1037 	ScanKeyInit(&entry[0],
1038 				Anum_pg_tablespace_spcname,
1039 				BTEqualStrategyNumber, F_NAMEEQ,
1040 				CStringGetDatum(stmt->tablespacename));
1041 	scandesc = table_beginscan_catalog(rel, 1, entry);
1042 	tup = heap_getnext(scandesc, ForwardScanDirection);
1043 	if (!HeapTupleIsValid(tup))
1044 		ereport(ERROR,
1045 				(errcode(ERRCODE_UNDEFINED_OBJECT),
1046 				 errmsg("tablespace \"%s\" does not exist",
1047 						stmt->tablespacename)));
1048 
1049 	tablespaceoid = ((Form_pg_tablespace) GETSTRUCT(tup))->oid;
1050 
1051 	/* Must be owner of the existing object */
1052 	if (!pg_tablespace_ownercheck(tablespaceoid, GetUserId()))
1053 		aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_TABLESPACE,
1054 					   stmt->tablespacename);
1055 
1056 	/* Generate new proposed spcoptions (text array) */
1057 	datum = heap_getattr(tup, Anum_pg_tablespace_spcoptions,
1058 						 RelationGetDescr(rel), &isnull);
1059 	newOptions = transformRelOptions(isnull ? (Datum) 0 : datum,
1060 									 stmt->options, NULL, NULL, false,
1061 									 stmt->isReset);
1062 	(void) tablespace_reloptions(newOptions, true);
1063 
1064 	/* Build new tuple. */
1065 	memset(repl_null, false, sizeof(repl_null));
1066 	memset(repl_repl, false, sizeof(repl_repl));
1067 	if (newOptions != (Datum) 0)
1068 		repl_val[Anum_pg_tablespace_spcoptions - 1] = newOptions;
1069 	else
1070 		repl_null[Anum_pg_tablespace_spcoptions - 1] = true;
1071 	repl_repl[Anum_pg_tablespace_spcoptions - 1] = true;
1072 	newtuple = heap_modify_tuple(tup, RelationGetDescr(rel), repl_val,
1073 								 repl_null, repl_repl);
1074 
1075 	/* Update system catalog. */
1076 	CatalogTupleUpdate(rel, &newtuple->t_self, newtuple);
1077 
1078 	InvokeObjectPostAlterHook(TableSpaceRelationId, tablespaceoid, 0);
1079 
1080 	heap_freetuple(newtuple);
1081 
1082 	/* Conclude heap scan. */
1083 	table_endscan(scandesc);
1084 	table_close(rel, NoLock);
1085 
1086 	return tablespaceoid;
1087 }
1088 
1089 /*
1090  * Routines for handling the GUC variable 'default_tablespace'.
1091  */
1092 
1093 /* check_hook: validate new default_tablespace */
1094 bool
check_default_tablespace(char ** newval,void ** extra,GucSource source)1095 check_default_tablespace(char **newval, void **extra, GucSource source)
1096 {
1097 	/*
1098 	 * If we aren't inside a transaction, or connected to a database, we
1099 	 * cannot do the catalog accesses necessary to verify the name.  Must
1100 	 * accept the value on faith.
1101 	 */
1102 	if (IsTransactionState() && MyDatabaseId != InvalidOid)
1103 	{
1104 		if (**newval != '\0' &&
1105 			!OidIsValid(get_tablespace_oid(*newval, true)))
1106 		{
1107 			/*
1108 			 * When source == PGC_S_TEST, don't throw a hard error for a
1109 			 * nonexistent tablespace, only a NOTICE.  See comments in guc.h.
1110 			 */
1111 			if (source == PGC_S_TEST)
1112 			{
1113 				ereport(NOTICE,
1114 						(errcode(ERRCODE_UNDEFINED_OBJECT),
1115 						 errmsg("tablespace \"%s\" does not exist",
1116 								*newval)));
1117 			}
1118 			else
1119 			{
1120 				GUC_check_errdetail("Tablespace \"%s\" does not exist.",
1121 									*newval);
1122 				return false;
1123 			}
1124 		}
1125 	}
1126 
1127 	return true;
1128 }
1129 
1130 /*
1131  * GetDefaultTablespace -- get the OID of the current default tablespace
1132  *
1133  * Temporary objects have different default tablespaces, hence the
1134  * relpersistence parameter must be specified.  Also, for partitioned tables,
1135  * we disallow specifying the database default, so that needs to be specified
1136  * too.
1137  *
1138  * May return InvalidOid to indicate "use the database's default tablespace".
1139  *
1140  * Note that caller is expected to check appropriate permissions for any
1141  * result other than InvalidOid.
1142  *
1143  * This exists to hide (and possibly optimize the use of) the
1144  * default_tablespace GUC variable.
1145  */
1146 Oid
GetDefaultTablespace(char relpersistence,bool partitioned)1147 GetDefaultTablespace(char relpersistence, bool partitioned)
1148 {
1149 	Oid			result;
1150 
1151 	/* The temp-table case is handled elsewhere */
1152 	if (relpersistence == RELPERSISTENCE_TEMP)
1153 	{
1154 		PrepareTempTablespaces();
1155 		return GetNextTempTableSpace();
1156 	}
1157 
1158 	/* Fast path for default_tablespace == "" */
1159 	if (default_tablespace == NULL || default_tablespace[0] == '\0')
1160 		return InvalidOid;
1161 
1162 	/*
1163 	 * It is tempting to cache this lookup for more speed, but then we would
1164 	 * fail to detect the case where the tablespace was dropped since the GUC
1165 	 * variable was set.  Note also that we don't complain if the value fails
1166 	 * to refer to an existing tablespace; we just silently return InvalidOid,
1167 	 * causing the new object to be created in the database's tablespace.
1168 	 */
1169 	result = get_tablespace_oid(default_tablespace, true);
1170 
1171 	/*
1172 	 * Allow explicit specification of database's default tablespace in
1173 	 * default_tablespace without triggering permissions checks.  Don't allow
1174 	 * specifying that when creating a partitioned table, however, since the
1175 	 * result is confusing.
1176 	 */
1177 	if (result == MyDatabaseTableSpace)
1178 	{
1179 		if (partitioned)
1180 			ereport(ERROR,
1181 					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1182 					 errmsg("cannot specify default tablespace for partitioned relations")));
1183 		result = InvalidOid;
1184 	}
1185 	return result;
1186 }
1187 
1188 
1189 /*
1190  * Routines for handling the GUC variable 'temp_tablespaces'.
1191  */
1192 
/*
 * "extra" data built by check_temp_tablespaces and consumed by
 * assign_temp_tablespaces: the tablespace OIDs that the check hook derived
 * from the temp_tablespaces string.
 */
typedef struct
{
	/* Array of OIDs to be passed to SetTempTablespaces() */
	int			numSpcs;		/* number of entries stored in tblSpcs[] */
	Oid			tblSpcs[FLEXIBLE_ARRAY_MEMBER]; /* InvalidOid entries stand
												 * for the database's
												 * default tablespace */
} temp_tablespaces_extra;
1199 
1200 /* check_hook: validate new temp_tablespaces */
1201 bool
check_temp_tablespaces(char ** newval,void ** extra,GucSource source)1202 check_temp_tablespaces(char **newval, void **extra, GucSource source)
1203 {
1204 	char	   *rawname;
1205 	List	   *namelist;
1206 
1207 	/* Need a modifiable copy of string */
1208 	rawname = pstrdup(*newval);
1209 
1210 	/* Parse string into list of identifiers */
1211 	if (!SplitIdentifierString(rawname, ',', &namelist))
1212 	{
1213 		/* syntax error in name list */
1214 		GUC_check_errdetail("List syntax is invalid.");
1215 		pfree(rawname);
1216 		list_free(namelist);
1217 		return false;
1218 	}
1219 
1220 	/*
1221 	 * If we aren't inside a transaction, or connected to a database, we
1222 	 * cannot do the catalog accesses necessary to verify the name.  Must
1223 	 * accept the value on faith. Fortunately, there's then also no need to
1224 	 * pass the data to fd.c.
1225 	 */
1226 	if (IsTransactionState() && MyDatabaseId != InvalidOid)
1227 	{
1228 		temp_tablespaces_extra *myextra;
1229 		Oid		   *tblSpcs;
1230 		int			numSpcs;
1231 		ListCell   *l;
1232 
1233 		/* temporary workspace until we are done verifying the list */
1234 		tblSpcs = (Oid *) palloc(list_length(namelist) * sizeof(Oid));
1235 		numSpcs = 0;
1236 		foreach(l, namelist)
1237 		{
1238 			char	   *curname = (char *) lfirst(l);
1239 			Oid			curoid;
1240 			AclResult	aclresult;
1241 
1242 			/* Allow an empty string (signifying database default) */
1243 			if (curname[0] == '\0')
1244 			{
1245 				/* InvalidOid signifies database's default tablespace */
1246 				tblSpcs[numSpcs++] = InvalidOid;
1247 				continue;
1248 			}
1249 
1250 			/*
1251 			 * In an interactive SET command, we ereport for bad info.  When
1252 			 * source == PGC_S_TEST, don't throw a hard error for a
1253 			 * nonexistent tablespace, only a NOTICE.  See comments in guc.h.
1254 			 */
1255 			curoid = get_tablespace_oid(curname, source <= PGC_S_TEST);
1256 			if (curoid == InvalidOid)
1257 			{
1258 				if (source == PGC_S_TEST)
1259 					ereport(NOTICE,
1260 							(errcode(ERRCODE_UNDEFINED_OBJECT),
1261 							 errmsg("tablespace \"%s\" does not exist",
1262 									curname)));
1263 				continue;
1264 			}
1265 
1266 			/*
1267 			 * Allow explicit specification of database's default tablespace
1268 			 * in temp_tablespaces without triggering permissions checks.
1269 			 */
1270 			if (curoid == MyDatabaseTableSpace)
1271 			{
1272 				/* InvalidOid signifies database's default tablespace */
1273 				tblSpcs[numSpcs++] = InvalidOid;
1274 				continue;
1275 			}
1276 
1277 			/* Check permissions, similarly complaining only if interactive */
1278 			aclresult = pg_tablespace_aclcheck(curoid, GetUserId(),
1279 											   ACL_CREATE);
1280 			if (aclresult != ACLCHECK_OK)
1281 			{
1282 				if (source >= PGC_S_INTERACTIVE)
1283 					aclcheck_error(aclresult, OBJECT_TABLESPACE, curname);
1284 				continue;
1285 			}
1286 
1287 			tblSpcs[numSpcs++] = curoid;
1288 		}
1289 
1290 		/* Now prepare an "extra" struct for assign_temp_tablespaces */
1291 		myextra = malloc(offsetof(temp_tablespaces_extra, tblSpcs) +
1292 						 numSpcs * sizeof(Oid));
1293 		if (!myextra)
1294 			return false;
1295 		myextra->numSpcs = numSpcs;
1296 		memcpy(myextra->tblSpcs, tblSpcs, numSpcs * sizeof(Oid));
1297 		*extra = (void *) myextra;
1298 
1299 		pfree(tblSpcs);
1300 	}
1301 
1302 	pfree(rawname);
1303 	list_free(namelist);
1304 
1305 	return true;
1306 }
1307 
1308 /* assign_hook: do extra actions as needed */
1309 void
assign_temp_tablespaces(const char * newval,void * extra)1310 assign_temp_tablespaces(const char *newval, void *extra)
1311 {
1312 	temp_tablespaces_extra *myextra = (temp_tablespaces_extra *) extra;
1313 
1314 	/*
1315 	 * If check_temp_tablespaces was executed inside a transaction, then pass
1316 	 * the list it made to fd.c.  Otherwise, clear fd.c's list; we must be
1317 	 * still outside a transaction, or else restoring during transaction exit,
1318 	 * and in either case we can just let the next PrepareTempTablespaces call
1319 	 * make things sane.
1320 	 */
1321 	if (myextra)
1322 		SetTempTablespaces(myextra->tblSpcs, myextra->numSpcs);
1323 	else
1324 		SetTempTablespaces(NULL, 0);
1325 }
1326 
1327 /*
1328  * PrepareTempTablespaces -- prepare to use temp tablespaces
1329  *
1330  * If we have not already done so in the current transaction, parse the
1331  * temp_tablespaces GUC variable and tell fd.c which tablespace(s) to use
1332  * for temp files.
1333  */
1334 void
PrepareTempTablespaces(void)1335 PrepareTempTablespaces(void)
1336 {
1337 	char	   *rawname;
1338 	List	   *namelist;
1339 	Oid		   *tblSpcs;
1340 	int			numSpcs;
1341 	ListCell   *l;
1342 
1343 	/* No work if already done in current transaction */
1344 	if (TempTablespacesAreSet())
1345 		return;
1346 
1347 	/*
1348 	 * Can't do catalog access unless within a transaction.  This is just a
1349 	 * safety check in case this function is called by low-level code that
1350 	 * could conceivably execute outside a transaction.  Note that in such a
1351 	 * scenario, fd.c will fall back to using the current database's default
1352 	 * tablespace, which should always be OK.
1353 	 */
1354 	if (!IsTransactionState())
1355 		return;
1356 
1357 	/* Need a modifiable copy of string */
1358 	rawname = pstrdup(temp_tablespaces);
1359 
1360 	/* Parse string into list of identifiers */
1361 	if (!SplitIdentifierString(rawname, ',', &namelist))
1362 	{
1363 		/* syntax error in name list */
1364 		SetTempTablespaces(NULL, 0);
1365 		pfree(rawname);
1366 		list_free(namelist);
1367 		return;
1368 	}
1369 
1370 	/* Store tablespace OIDs in an array in TopTransactionContext */
1371 	tblSpcs = (Oid *) MemoryContextAlloc(TopTransactionContext,
1372 										 list_length(namelist) * sizeof(Oid));
1373 	numSpcs = 0;
1374 	foreach(l, namelist)
1375 	{
1376 		char	   *curname = (char *) lfirst(l);
1377 		Oid			curoid;
1378 		AclResult	aclresult;
1379 
1380 		/* Allow an empty string (signifying database default) */
1381 		if (curname[0] == '\0')
1382 		{
1383 			/* InvalidOid signifies database's default tablespace */
1384 			tblSpcs[numSpcs++] = InvalidOid;
1385 			continue;
1386 		}
1387 
1388 		/* Else verify that name is a valid tablespace name */
1389 		curoid = get_tablespace_oid(curname, true);
1390 		if (curoid == InvalidOid)
1391 		{
1392 			/* Skip any bad list elements */
1393 			continue;
1394 		}
1395 
1396 		/*
1397 		 * Allow explicit specification of database's default tablespace in
1398 		 * temp_tablespaces without triggering permissions checks.
1399 		 */
1400 		if (curoid == MyDatabaseTableSpace)
1401 		{
1402 			/* InvalidOid signifies database's default tablespace */
1403 			tblSpcs[numSpcs++] = InvalidOid;
1404 			continue;
1405 		}
1406 
1407 		/* Check permissions similarly */
1408 		aclresult = pg_tablespace_aclcheck(curoid, GetUserId(),
1409 										   ACL_CREATE);
1410 		if (aclresult != ACLCHECK_OK)
1411 			continue;
1412 
1413 		tblSpcs[numSpcs++] = curoid;
1414 	}
1415 
1416 	SetTempTablespaces(tblSpcs, numSpcs);
1417 
1418 	pfree(rawname);
1419 	list_free(namelist);
1420 }
1421 
1422 
1423 /*
1424  * get_tablespace_oid - given a tablespace name, look up the OID
1425  *
1426  * If missing_ok is false, throw an error if tablespace name not found.  If
1427  * true, just return InvalidOid.
1428  */
1429 Oid
get_tablespace_oid(const char * tablespacename,bool missing_ok)1430 get_tablespace_oid(const char *tablespacename, bool missing_ok)
1431 {
1432 	Oid			result;
1433 	Relation	rel;
1434 	TableScanDesc scandesc;
1435 	HeapTuple	tuple;
1436 	ScanKeyData entry[1];
1437 
1438 	/*
1439 	 * Search pg_tablespace.  We use a heapscan here even though there is an
1440 	 * index on name, on the theory that pg_tablespace will usually have just
1441 	 * a few entries and so an indexed lookup is a waste of effort.
1442 	 */
1443 	rel = table_open(TableSpaceRelationId, AccessShareLock);
1444 
1445 	ScanKeyInit(&entry[0],
1446 				Anum_pg_tablespace_spcname,
1447 				BTEqualStrategyNumber, F_NAMEEQ,
1448 				CStringGetDatum(tablespacename));
1449 	scandesc = table_beginscan_catalog(rel, 1, entry);
1450 	tuple = heap_getnext(scandesc, ForwardScanDirection);
1451 
1452 	/* We assume that there can be at most one matching tuple */
1453 	if (HeapTupleIsValid(tuple))
1454 		result = ((Form_pg_tablespace) GETSTRUCT(tuple))->oid;
1455 	else
1456 		result = InvalidOid;
1457 
1458 	table_endscan(scandesc);
1459 	table_close(rel, AccessShareLock);
1460 
1461 	if (!OidIsValid(result) && !missing_ok)
1462 		ereport(ERROR,
1463 				(errcode(ERRCODE_UNDEFINED_OBJECT),
1464 				 errmsg("tablespace \"%s\" does not exist",
1465 						tablespacename)));
1466 
1467 	return result;
1468 }
1469 
1470 /*
1471  * get_tablespace_name - given a tablespace OID, look up the name
1472  *
1473  * Returns a palloc'd string, or NULL if no such tablespace.
1474  */
1475 char *
get_tablespace_name(Oid spc_oid)1476 get_tablespace_name(Oid spc_oid)
1477 {
1478 	char	   *result;
1479 	Relation	rel;
1480 	TableScanDesc scandesc;
1481 	HeapTuple	tuple;
1482 	ScanKeyData entry[1];
1483 
1484 	/*
1485 	 * Search pg_tablespace.  We use a heapscan here even though there is an
1486 	 * index on oid, on the theory that pg_tablespace will usually have just a
1487 	 * few entries and so an indexed lookup is a waste of effort.
1488 	 */
1489 	rel = table_open(TableSpaceRelationId, AccessShareLock);
1490 
1491 	ScanKeyInit(&entry[0],
1492 				Anum_pg_tablespace_oid,
1493 				BTEqualStrategyNumber, F_OIDEQ,
1494 				ObjectIdGetDatum(spc_oid));
1495 	scandesc = table_beginscan_catalog(rel, 1, entry);
1496 	tuple = heap_getnext(scandesc, ForwardScanDirection);
1497 
1498 	/* We assume that there can be at most one matching tuple */
1499 	if (HeapTupleIsValid(tuple))
1500 		result = pstrdup(NameStr(((Form_pg_tablespace) GETSTRUCT(tuple))->spcname));
1501 	else
1502 		result = NULL;
1503 
1504 	table_endscan(scandesc);
1505 	table_close(rel, AccessShareLock);
1506 
1507 	return result;
1508 }
1509 
1510 
1511 /*
1512  * TABLESPACE resource manager's routines
1513  */
1514 void
tblspc_redo(XLogReaderState * record)1515 tblspc_redo(XLogReaderState *record)
1516 {
1517 	uint8		info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
1518 
1519 	/* Backup blocks are not used in tblspc records */
1520 	Assert(!XLogRecHasAnyBlockRefs(record));
1521 
1522 	if (info == XLOG_TBLSPC_CREATE)
1523 	{
1524 		xl_tblspc_create_rec *xlrec = (xl_tblspc_create_rec *) XLogRecGetData(record);
1525 		char	   *location = xlrec->ts_path;
1526 
1527 		create_tablespace_directories(location, xlrec->ts_id);
1528 	}
1529 	else if (info == XLOG_TBLSPC_DROP)
1530 	{
1531 		xl_tblspc_drop_rec *xlrec = (xl_tblspc_drop_rec *) XLogRecGetData(record);
1532 
1533 		/*
1534 		 * If we issued a WAL record for a drop tablespace it implies that
1535 		 * there were no files in it at all when the DROP was done. That means
1536 		 * that no permanent objects can exist in it at this point.
1537 		 *
1538 		 * It is possible for standby users to be using this tablespace as a
1539 		 * location for their temporary files, so if we fail to remove all
1540 		 * files then do conflict processing and try again, if currently
1541 		 * enabled.
1542 		 *
1543 		 * Other possible reasons for failure include bollixed file
1544 		 * permissions on a standby server when they were okay on the primary,
1545 		 * etc etc. There's not much we can do about that, so just remove what
1546 		 * we can and press on.
1547 		 */
1548 		if (!destroy_tablespace_directories(xlrec->ts_id, true))
1549 		{
1550 			ResolveRecoveryConflictWithTablespace(xlrec->ts_id);
1551 
1552 			/*
1553 			 * If we did recovery processing then hopefully the backends who
1554 			 * wrote temp files should have cleaned up and exited by now.  So
1555 			 * retry before complaining.  If we fail again, this is just a LOG
1556 			 * condition, because it's not worth throwing an ERROR for (as
1557 			 * that would crash the database and require manual intervention
1558 			 * before we could get past this WAL record on restart).
1559 			 */
1560 			if (!destroy_tablespace_directories(xlrec->ts_id, true))
1561 				ereport(LOG,
1562 						(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1563 						 errmsg("directories for tablespace %u could not be removed",
1564 								xlrec->ts_id),
1565 						 errhint("You can remove the directories manually if necessary.")));
1566 		}
1567 	}
1568 	else
1569 		elog(PANIC, "tblspc_redo: unknown op code %u", info);
1570 }
1571