1 /*-------------------------------------------------------------------------
2  *
3  * tablespace.c
4  *	  Commands to manipulate table spaces
5  *
6  * Tablespaces in PostgreSQL are designed to allow users to determine
7  * where the data file(s) for a given database object reside on the file
8  * system.
9  *
10  * A tablespace represents a directory on the file system. At tablespace
11  * creation time, the directory must be empty. To simplify things and
12  * remove the possibility of having file name conflicts, we isolate
13  * files within a tablespace into database-specific subdirectories.
14  *
15  * To support file access via the information given in RelFileNode, we
16  * maintain a symbolic-link map in $PGDATA/pg_tblspc. The symlinks are
17  * named by tablespace OIDs and point to the actual tablespace directories.
18  * There is also a per-cluster version directory in each tablespace.
19  * Thus the full path to an arbitrary file is
20  *			$PGDATA/pg_tblspc/spcoid/PG_MAJORVER_CATVER/dboid/relfilenode
21  * e.g.
22  *			$PGDATA/pg_tblspc/20981/PG_9.0_201002161/719849/83292814
23  *
24  * There are two tablespaces created at initdb time: pg_global (for shared
25  * tables) and pg_default (for everything else).  For backwards compatibility
26  * and to remain functional on platforms without symlinks, these tablespaces
27  * are accessed specially: they are respectively
28  *			$PGDATA/global/relfilenode
29  *			$PGDATA/base/dboid/relfilenode
30  *
31  * To allow CREATE DATABASE to give a new database a default tablespace
32  * that's different from the template database's default, we make the
33  * provision that a zero in pg_class.reltablespace means the database's
34  * default tablespace.  Without this, CREATE DATABASE would have to go in
35  * and munge the system catalogs of the new database.
36  *
37  *
38  * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group
39  * Portions Copyright (c) 1994, Regents of the University of California
40  *
41  *
42  * IDENTIFICATION
43  *	  src/backend/commands/tablespace.c
44  *
45  *-------------------------------------------------------------------------
46  */
47 #include "postgres.h"
48 
49 #include <unistd.h>
50 #include <dirent.h>
51 #include <sys/stat.h>
52 
53 #include "access/heapam.h"
54 #include "access/htup_details.h"
55 #include "access/reloptions.h"
56 #include "access/sysattr.h"
57 #include "access/tableam.h"
58 #include "access/xact.h"
59 #include "access/xlog.h"
60 #include "access/xloginsert.h"
61 #include "catalog/catalog.h"
62 #include "catalog/dependency.h"
63 #include "catalog/indexing.h"
64 #include "catalog/namespace.h"
65 #include "catalog/objectaccess.h"
66 #include "catalog/pg_namespace.h"
67 #include "catalog/pg_tablespace.h"
68 #include "commands/comment.h"
69 #include "commands/seclabel.h"
70 #include "commands/tablecmds.h"
71 #include "commands/tablespace.h"
72 #include "common/file_perm.h"
73 #include "miscadmin.h"
74 #include "postmaster/bgwriter.h"
75 #include "storage/fd.h"
76 #include "storage/lmgr.h"
77 #include "storage/standby.h"
78 #include "utils/acl.h"
79 #include "utils/builtins.h"
80 #include "utils/fmgroids.h"
81 #include "utils/guc.h"
82 #include "utils/lsyscache.h"
83 #include "utils/memutils.h"
84 #include "utils/rel.h"
85 #include "utils/varlena.h"
86 
87 /* GUC variables */
88 char	   *default_tablespace = NULL;
89 char	   *temp_tablespaces = NULL;
90 
91 
92 static void create_tablespace_directories(const char *location,
93 										  const Oid tablespaceoid);
94 static bool destroy_tablespace_directories(Oid tablespaceoid, bool redo);
95 
96 
97 /*
98  * Each database using a table space is isolated into its own name space
99  * by a subdirectory named for the database OID.  On first creation of an
100  * object in the tablespace, create the subdirectory.  If the subdirectory
101  * already exists, fall through quietly.
102  *
103  * isRedo indicates that we are creating an object during WAL replay.
104  * In this case we will cope with the possibility of the tablespace
105  * directory not being there either --- this could happen if we are
106  * replaying an operation on a table in a subsequently-dropped tablespace.
107  * We handle this by making a directory in the place where the tablespace
108  * symlink would normally be.  This isn't an exact replay of course, but
109  * it's the best we can do given the available information.
110  *
111  * If tablespaces are not supported, we still need it in case we have to
112  * re-create a database subdirectory (of $PGDATA/base) during WAL replay.
113  */
114 void
TablespaceCreateDbspace(Oid spcNode,Oid dbNode,bool isRedo)115 TablespaceCreateDbspace(Oid spcNode, Oid dbNode, bool isRedo)
116 {
117 	struct stat st;
118 	char	   *dir;
119 
120 	/*
121 	 * The global tablespace doesn't have per-database subdirectories, so
122 	 * nothing to do for it.
123 	 */
124 	if (spcNode == GLOBALTABLESPACE_OID)
125 		return;
126 
127 	Assert(OidIsValid(spcNode));
128 	Assert(OidIsValid(dbNode));
129 
130 	dir = GetDatabasePath(dbNode, spcNode);
131 
132 	if (stat(dir, &st) < 0)
133 	{
134 		/* Directory does not exist? */
135 		if (errno == ENOENT)
136 		{
137 			/*
138 			 * Acquire TablespaceCreateLock to ensure that no DROP TABLESPACE
139 			 * or TablespaceCreateDbspace is running concurrently.
140 			 */
141 			LWLockAcquire(TablespaceCreateLock, LW_EXCLUSIVE);
142 
143 			/*
144 			 * Recheck to see if someone created the directory while we were
145 			 * waiting for lock.
146 			 */
147 			if (stat(dir, &st) == 0 && S_ISDIR(st.st_mode))
148 			{
149 				/* Directory was created */
150 			}
151 			else
152 			{
153 				/* Directory creation failed? */
154 				if (MakePGDirectory(dir) < 0)
155 				{
156 					char	   *parentdir;
157 
158 					/* Failure other than not exists or not in WAL replay? */
159 					if (errno != ENOENT || !isRedo)
160 						ereport(ERROR,
161 								(errcode_for_file_access(),
162 								 errmsg("could not create directory \"%s\": %m",
163 										dir)));
164 
165 					/*
166 					 * Parent directories are missing during WAL replay, so
167 					 * continue by creating simple parent directories rather
168 					 * than a symlink.
169 					 */
170 
171 					/* create two parents up if not exist */
172 					parentdir = pstrdup(dir);
173 					get_parent_directory(parentdir);
174 					get_parent_directory(parentdir);
175 					/* Can't create parent and it doesn't already exist? */
176 					if (MakePGDirectory(parentdir) < 0 && errno != EEXIST)
177 						ereport(ERROR,
178 								(errcode_for_file_access(),
179 								 errmsg("could not create directory \"%s\": %m",
180 										parentdir)));
181 					pfree(parentdir);
182 
183 					/* create one parent up if not exist */
184 					parentdir = pstrdup(dir);
185 					get_parent_directory(parentdir);
186 					/* Can't create parent and it doesn't already exist? */
187 					if (MakePGDirectory(parentdir) < 0 && errno != EEXIST)
188 						ereport(ERROR,
189 								(errcode_for_file_access(),
190 								 errmsg("could not create directory \"%s\": %m",
191 										parentdir)));
192 					pfree(parentdir);
193 
194 					/* Create database directory */
195 					if (MakePGDirectory(dir) < 0)
196 						ereport(ERROR,
197 								(errcode_for_file_access(),
198 								 errmsg("could not create directory \"%s\": %m",
199 										dir)));
200 				}
201 			}
202 
203 			LWLockRelease(TablespaceCreateLock);
204 		}
205 		else
206 		{
207 			ereport(ERROR,
208 					(errcode_for_file_access(),
209 					 errmsg("could not stat directory \"%s\": %m", dir)));
210 		}
211 	}
212 	else
213 	{
214 		/* Is it not a directory? */
215 		if (!S_ISDIR(st.st_mode))
216 			ereport(ERROR,
217 					(errcode(ERRCODE_WRONG_OBJECT_TYPE),
218 					 errmsg("\"%s\" exists but is not a directory",
219 							dir)));
220 	}
221 
222 	pfree(dir);
223 }
224 
225 /*
226  * Create a table space
227  *
228  * Only superusers can create a tablespace. This seems a reasonable restriction
229  * since we're determining the system layout and, anyway, we probably have
230  * root if we're doing this kind of activity
231  */
232 Oid
CreateTableSpace(CreateTableSpaceStmt * stmt)233 CreateTableSpace(CreateTableSpaceStmt *stmt)
234 {
235 #ifdef HAVE_SYMLINK
236 	Relation	rel;
237 	Datum		values[Natts_pg_tablespace];
238 	bool		nulls[Natts_pg_tablespace];
239 	HeapTuple	tuple;
240 	Oid			tablespaceoid;
241 	char	   *location;
242 	Oid			ownerId;
243 	Datum		newOptions;
244 
245 	/* Must be super user */
246 	if (!superuser())
247 		ereport(ERROR,
248 				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
249 				 errmsg("permission denied to create tablespace \"%s\"",
250 						stmt->tablespacename),
251 				 errhint("Must be superuser to create a tablespace.")));
252 
253 	/* However, the eventual owner of the tablespace need not be */
254 	if (stmt->owner)
255 		ownerId = get_rolespec_oid(stmt->owner, false);
256 	else
257 		ownerId = GetUserId();
258 
259 	/* Unix-ify the offered path, and strip any trailing slashes */
260 	location = pstrdup(stmt->location);
261 	canonicalize_path(location);
262 
263 	/* disallow quotes, else CREATE DATABASE would be at risk */
264 	if (strchr(location, '\''))
265 		ereport(ERROR,
266 				(errcode(ERRCODE_INVALID_NAME),
267 				 errmsg("tablespace location cannot contain single quotes")));
268 
269 	/*
270 	 * Allowing relative paths seems risky
271 	 *
272 	 * this also helps us ensure that location is not empty or whitespace
273 	 */
274 	if (!is_absolute_path(location))
275 		ereport(ERROR,
276 				(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
277 				 errmsg("tablespace location must be an absolute path")));
278 
279 	/*
280 	 * Check that location isn't too long. Remember that we're going to append
281 	 * 'PG_XXX/<dboid>/<relid>_<fork>.<nnn>'.  FYI, we never actually
282 	 * reference the whole path here, but MakePGDirectory() uses the first two
283 	 * parts.
284 	 */
285 	if (strlen(location) + 1 + strlen(TABLESPACE_VERSION_DIRECTORY) + 1 +
286 		OIDCHARS + 1 + OIDCHARS + 1 + FORKNAMECHARS + 1 + OIDCHARS > MAXPGPATH)
287 		ereport(ERROR,
288 				(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
289 				 errmsg("tablespace location \"%s\" is too long",
290 						location)));
291 
292 	/* Warn if the tablespace is in the data directory. */
293 	if (path_is_prefix_of_path(DataDir, location))
294 		ereport(WARNING,
295 				(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
296 				 errmsg("tablespace location should not be inside the data directory")));
297 
298 	/*
299 	 * Disallow creation of tablespaces named "pg_xxx"; we reserve this
300 	 * namespace for system purposes.
301 	 */
302 	if (!allowSystemTableMods && IsReservedName(stmt->tablespacename))
303 		ereport(ERROR,
304 				(errcode(ERRCODE_RESERVED_NAME),
305 				 errmsg("unacceptable tablespace name \"%s\"",
306 						stmt->tablespacename),
307 				 errdetail("The prefix \"pg_\" is reserved for system tablespaces.")));
308 
309 	/*
310 	 * If built with appropriate switch, whine when regression-testing
311 	 * conventions for tablespace names are violated.
312 	 */
313 #ifdef ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS
314 	if (strncmp(stmt->tablespacename, "regress_", 8) != 0)
315 		elog(WARNING, "tablespaces created by regression test cases should have names starting with \"regress_\"");
316 #endif
317 
318 	/*
319 	 * Check that there is no other tablespace by this name.  (The unique
320 	 * index would catch this anyway, but might as well give a friendlier
321 	 * message.)
322 	 */
323 	if (OidIsValid(get_tablespace_oid(stmt->tablespacename, true)))
324 		ereport(ERROR,
325 				(errcode(ERRCODE_DUPLICATE_OBJECT),
326 				 errmsg("tablespace \"%s\" already exists",
327 						stmt->tablespacename)));
328 
329 	/*
330 	 * Insert tuple into pg_tablespace.  The purpose of doing this first is to
331 	 * lock the proposed tablename against other would-be creators. The
332 	 * insertion will roll back if we find problems below.
333 	 */
334 	rel = table_open(TableSpaceRelationId, RowExclusiveLock);
335 
336 	MemSet(nulls, false, sizeof(nulls));
337 
338 	tablespaceoid = GetNewOidWithIndex(rel, TablespaceOidIndexId,
339 									   Anum_pg_tablespace_oid);
340 	values[Anum_pg_tablespace_oid - 1] = ObjectIdGetDatum(tablespaceoid);
341 	values[Anum_pg_tablespace_spcname - 1] =
342 		DirectFunctionCall1(namein, CStringGetDatum(stmt->tablespacename));
343 	values[Anum_pg_tablespace_spcowner - 1] =
344 		ObjectIdGetDatum(ownerId);
345 	nulls[Anum_pg_tablespace_spcacl - 1] = true;
346 
347 	/* Generate new proposed spcoptions (text array) */
348 	newOptions = transformRelOptions((Datum) 0,
349 									 stmt->options,
350 									 NULL, NULL, false, false);
351 	(void) tablespace_reloptions(newOptions, true);
352 	if (newOptions != (Datum) 0)
353 		values[Anum_pg_tablespace_spcoptions - 1] = newOptions;
354 	else
355 		nulls[Anum_pg_tablespace_spcoptions - 1] = true;
356 
357 	tuple = heap_form_tuple(rel->rd_att, values, nulls);
358 
359 	CatalogTupleInsert(rel, tuple);
360 
361 	heap_freetuple(tuple);
362 
363 	/* Record dependency on owner */
364 	recordDependencyOnOwner(TableSpaceRelationId, tablespaceoid, ownerId);
365 
366 	/* Post creation hook for new tablespace */
367 	InvokeObjectPostCreateHook(TableSpaceRelationId, tablespaceoid, 0);
368 
369 	create_tablespace_directories(location, tablespaceoid);
370 
371 	/* Record the filesystem change in XLOG */
372 	{
373 		xl_tblspc_create_rec xlrec;
374 
375 		xlrec.ts_id = tablespaceoid;
376 
377 		XLogBeginInsert();
378 		XLogRegisterData((char *) &xlrec,
379 						 offsetof(xl_tblspc_create_rec, ts_path));
380 		XLogRegisterData((char *) location, strlen(location) + 1);
381 
382 		(void) XLogInsert(RM_TBLSPC_ID, XLOG_TBLSPC_CREATE);
383 	}
384 
385 	/*
386 	 * Force synchronous commit, to minimize the window between creating the
387 	 * symlink on-disk and marking the transaction committed.  It's not great
388 	 * that there is any window at all, but definitely we don't want to make
389 	 * it larger than necessary.
390 	 */
391 	ForceSyncCommit();
392 
393 	pfree(location);
394 
395 	/* We keep the lock on pg_tablespace until commit */
396 	table_close(rel, NoLock);
397 
398 	return tablespaceoid;
399 #else							/* !HAVE_SYMLINK */
400 	ereport(ERROR,
401 			(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
402 			 errmsg("tablespaces are not supported on this platform")));
403 	return InvalidOid;			/* keep compiler quiet */
404 #endif							/* HAVE_SYMLINK */
405 }
406 
407 /*
408  * Drop a table space
409  *
410  * Be careful to check that the tablespace is empty.
411  */
412 void
DropTableSpace(DropTableSpaceStmt * stmt)413 DropTableSpace(DropTableSpaceStmt *stmt)
414 {
415 #ifdef HAVE_SYMLINK
416 	char	   *tablespacename = stmt->tablespacename;
417 	TableScanDesc scandesc;
418 	Relation	rel;
419 	HeapTuple	tuple;
420 	Form_pg_tablespace spcform;
421 	ScanKeyData entry[1];
422 	Oid			tablespaceoid;
423 	char	   *detail;
424 	char	   *detail_log;
425 
426 	/*
427 	 * Find the target tuple
428 	 */
429 	rel = table_open(TableSpaceRelationId, RowExclusiveLock);
430 
431 	ScanKeyInit(&entry[0],
432 				Anum_pg_tablespace_spcname,
433 				BTEqualStrategyNumber, F_NAMEEQ,
434 				CStringGetDatum(tablespacename));
435 	scandesc = table_beginscan_catalog(rel, 1, entry);
436 	tuple = heap_getnext(scandesc, ForwardScanDirection);
437 
438 	if (!HeapTupleIsValid(tuple))
439 	{
440 		if (!stmt->missing_ok)
441 		{
442 			ereport(ERROR,
443 					(errcode(ERRCODE_UNDEFINED_OBJECT),
444 					 errmsg("tablespace \"%s\" does not exist",
445 							tablespacename)));
446 		}
447 		else
448 		{
449 			ereport(NOTICE,
450 					(errmsg("tablespace \"%s\" does not exist, skipping",
451 							tablespacename)));
452 			/* XXX I assume I need one or both of these next two calls */
453 			table_endscan(scandesc);
454 			table_close(rel, NoLock);
455 		}
456 		return;
457 	}
458 
459 	spcform = (Form_pg_tablespace) GETSTRUCT(tuple);
460 	tablespaceoid = spcform->oid;
461 
462 	/* Must be tablespace owner */
463 	if (!pg_tablespace_ownercheck(tablespaceoid, GetUserId()))
464 		aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_TABLESPACE,
465 					   tablespacename);
466 
467 	/* Disallow drop of the standard tablespaces, even by superuser */
468 	if (tablespaceoid == GLOBALTABLESPACE_OID ||
469 		tablespaceoid == DEFAULTTABLESPACE_OID)
470 		aclcheck_error(ACLCHECK_NO_PRIV, OBJECT_TABLESPACE,
471 					   tablespacename);
472 
473 	/* Check for pg_shdepend entries depending on this tablespace */
474 	if (checkSharedDependencies(TableSpaceRelationId, tablespaceoid,
475 								&detail, &detail_log))
476 		ereport(ERROR,
477 				(errcode(ERRCODE_DEPENDENT_OBJECTS_STILL_EXIST),
478 				 errmsg("tablespace \"%s\" cannot be dropped because some objects depend on it",
479 						tablespacename),
480 				 errdetail_internal("%s", detail),
481 				 errdetail_log("%s", detail_log)));
482 
483 	/* DROP hook for the tablespace being removed */
484 	InvokeObjectDropHook(TableSpaceRelationId, tablespaceoid, 0);
485 
486 	/*
487 	 * Remove the pg_tablespace tuple (this will roll back if we fail below)
488 	 */
489 	CatalogTupleDelete(rel, &tuple->t_self);
490 
491 	table_endscan(scandesc);
492 
493 	/*
494 	 * Remove any comments or security labels on this tablespace.
495 	 */
496 	DeleteSharedComments(tablespaceoid, TableSpaceRelationId);
497 	DeleteSharedSecurityLabel(tablespaceoid, TableSpaceRelationId);
498 
499 	/*
500 	 * Remove dependency on owner.
501 	 */
502 	deleteSharedDependencyRecordsFor(TableSpaceRelationId, tablespaceoid, 0);
503 
504 	/*
505 	 * Acquire TablespaceCreateLock to ensure that no TablespaceCreateDbspace
506 	 * is running concurrently.
507 	 */
508 	LWLockAcquire(TablespaceCreateLock, LW_EXCLUSIVE);
509 
510 	/*
511 	 * Try to remove the physical infrastructure.
512 	 */
513 	if (!destroy_tablespace_directories(tablespaceoid, false))
514 	{
515 		/*
516 		 * Not all files deleted?  However, there can be lingering empty files
517 		 * in the directories, left behind by for example DROP TABLE, that
518 		 * have been scheduled for deletion at next checkpoint (see comments
519 		 * in mdunlink() for details).  We could just delete them immediately,
520 		 * but we can't tell them apart from important data files that we
521 		 * mustn't delete.  So instead, we force a checkpoint which will clean
522 		 * out any lingering files, and try again.
523 		 *
524 		 * XXX On Windows, an unlinked file persists in the directory listing
525 		 * until no process retains an open handle for the file.  The DDL
526 		 * commands that schedule files for unlink send invalidation messages
527 		 * directing other PostgreSQL processes to close the files.  DROP
528 		 * TABLESPACE should not give up on the tablespace becoming empty
529 		 * until all relevant invalidation processing is complete.
530 		 */
531 		RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT);
532 		if (!destroy_tablespace_directories(tablespaceoid, false))
533 		{
534 			/* Still not empty, the files must be important then */
535 			ereport(ERROR,
536 					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
537 					 errmsg("tablespace \"%s\" is not empty",
538 							tablespacename)));
539 		}
540 	}
541 
542 	/* Record the filesystem change in XLOG */
543 	{
544 		xl_tblspc_drop_rec xlrec;
545 
546 		xlrec.ts_id = tablespaceoid;
547 
548 		XLogBeginInsert();
549 		XLogRegisterData((char *) &xlrec, sizeof(xl_tblspc_drop_rec));
550 
551 		(void) XLogInsert(RM_TBLSPC_ID, XLOG_TBLSPC_DROP);
552 	}
553 
554 	/*
555 	 * Note: because we checked that the tablespace was empty, there should be
556 	 * no need to worry about flushing shared buffers or free space map
557 	 * entries for relations in the tablespace.
558 	 */
559 
560 	/*
561 	 * Force synchronous commit, to minimize the window between removing the
562 	 * files on-disk and marking the transaction committed.  It's not great
563 	 * that there is any window at all, but definitely we don't want to make
564 	 * it larger than necessary.
565 	 */
566 	ForceSyncCommit();
567 
568 	/*
569 	 * Allow TablespaceCreateDbspace again.
570 	 */
571 	LWLockRelease(TablespaceCreateLock);
572 
573 	/* We keep the lock on pg_tablespace until commit */
574 	table_close(rel, NoLock);
575 #else							/* !HAVE_SYMLINK */
576 	ereport(ERROR,
577 			(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
578 			 errmsg("tablespaces are not supported on this platform")));
579 #endif							/* HAVE_SYMLINK */
580 }
581 
582 
583 /*
584  * create_tablespace_directories
585  *
586  *	Attempt to create filesystem infrastructure linking $PGDATA/pg_tblspc/
587  *	to the specified directory
588  */
589 static void
create_tablespace_directories(const char * location,const Oid tablespaceoid)590 create_tablespace_directories(const char *location, const Oid tablespaceoid)
591 {
592 	char	   *linkloc;
593 	char	   *location_with_version_dir;
594 	struct stat st;
595 
596 	linkloc = psprintf("pg_tblspc/%u", tablespaceoid);
597 	location_with_version_dir = psprintf("%s/%s", location,
598 										 TABLESPACE_VERSION_DIRECTORY);
599 
600 	/*
601 	 * Attempt to coerce target directory to safe permissions.  If this fails,
602 	 * it doesn't exist or has the wrong owner.
603 	 */
604 	if (chmod(location, pg_dir_create_mode) != 0)
605 	{
606 		if (errno == ENOENT)
607 			ereport(ERROR,
608 					(errcode(ERRCODE_UNDEFINED_FILE),
609 					 errmsg("directory \"%s\" does not exist", location),
610 					 InRecovery ? errhint("Create this directory for the tablespace before "
611 										  "restarting the server.") : 0));
612 		else
613 			ereport(ERROR,
614 					(errcode_for_file_access(),
615 					 errmsg("could not set permissions on directory \"%s\": %m",
616 							location)));
617 	}
618 
619 	/*
620 	 * The creation of the version directory prevents more than one tablespace
621 	 * in a single location.  This imitates TablespaceCreateDbspace(), but it
622 	 * ignores concurrency and missing parent directories.  The chmod() would
623 	 * have failed in the absence of a parent.  pg_tablespace_spcname_index
624 	 * prevents concurrency.
625 	 */
626 	if (stat(location_with_version_dir, &st) < 0)
627 	{
628 		if (errno != ENOENT)
629 			ereport(ERROR,
630 					(errcode_for_file_access(),
631 					 errmsg("could not stat directory \"%s\": %m",
632 							location_with_version_dir)));
633 		else if (MakePGDirectory(location_with_version_dir) < 0)
634 			ereport(ERROR,
635 					(errcode_for_file_access(),
636 					 errmsg("could not create directory \"%s\": %m",
637 							location_with_version_dir)));
638 	}
639 	else if (!S_ISDIR(st.st_mode))
640 		ereport(ERROR,
641 				(errcode(ERRCODE_WRONG_OBJECT_TYPE),
642 				 errmsg("\"%s\" exists but is not a directory",
643 						location_with_version_dir)));
644 	else if (!InRecovery)
645 		ereport(ERROR,
646 				(errcode(ERRCODE_OBJECT_IN_USE),
647 				 errmsg("directory \"%s\" already in use as a tablespace",
648 						location_with_version_dir)));
649 
650 	/*
651 	 * In recovery, remove old symlink, in case it points to the wrong place.
652 	 */
653 	if (InRecovery)
654 		remove_tablespace_symlink(linkloc);
655 
656 	/*
657 	 * Create the symlink under PGDATA
658 	 */
659 	if (symlink(location, linkloc) < 0)
660 		ereport(ERROR,
661 				(errcode_for_file_access(),
662 				 errmsg("could not create symbolic link \"%s\": %m",
663 						linkloc)));
664 
665 	pfree(linkloc);
666 	pfree(location_with_version_dir);
667 }
668 
669 
670 /*
671  * destroy_tablespace_directories
672  *
673  * Attempt to remove filesystem infrastructure for the tablespace.
674  *
675  * 'redo' indicates we are redoing a drop from XLOG; in that case we should
676  * not throw an ERROR for problems, just LOG them.  The worst consequence of
677  * not removing files here would be failure to release some disk space, which
678  * does not justify throwing an error that would require manual intervention
679  * to get the database running again.
680  *
681  * Returns true if successful, false if some subdirectory is not empty
682  */
683 static bool
destroy_tablespace_directories(Oid tablespaceoid,bool redo)684 destroy_tablespace_directories(Oid tablespaceoid, bool redo)
685 {
686 	char	   *linkloc;
687 	char	   *linkloc_with_version_dir;
688 	DIR		   *dirdesc;
689 	struct dirent *de;
690 	char	   *subfile;
691 	struct stat st;
692 
693 	linkloc_with_version_dir = psprintf("pg_tblspc/%u/%s", tablespaceoid,
694 										TABLESPACE_VERSION_DIRECTORY);
695 
696 	/*
697 	 * Check if the tablespace still contains any files.  We try to rmdir each
698 	 * per-database directory we find in it.  rmdir failure implies there are
699 	 * still files in that subdirectory, so give up.  (We do not have to worry
700 	 * about undoing any already completed rmdirs, since the next attempt to
701 	 * use the tablespace from that database will simply recreate the
702 	 * subdirectory via TablespaceCreateDbspace.)
703 	 *
704 	 * Since we hold TablespaceCreateLock, no one else should be creating any
705 	 * fresh subdirectories in parallel. It is possible that new files are
706 	 * being created within subdirectories, though, so the rmdir call could
707 	 * fail.  Worst consequence is a less friendly error message.
708 	 *
709 	 * If redo is true then ENOENT is a likely outcome here, and we allow it
710 	 * to pass without comment.  In normal operation we still allow it, but
711 	 * with a warning.  This is because even though ProcessUtility disallows
712 	 * DROP TABLESPACE in a transaction block, it's possible that a previous
713 	 * DROP failed and rolled back after removing the tablespace directories
714 	 * and/or symlink.  We want to allow a new DROP attempt to succeed at
715 	 * removing the catalog entries (and symlink if still present), so we
716 	 * should not give a hard error here.
717 	 */
718 	dirdesc = AllocateDir(linkloc_with_version_dir);
719 	if (dirdesc == NULL)
720 	{
721 		if (errno == ENOENT)
722 		{
723 			if (!redo)
724 				ereport(WARNING,
725 						(errcode_for_file_access(),
726 						 errmsg("could not open directory \"%s\": %m",
727 								linkloc_with_version_dir)));
728 			/* The symlink might still exist, so go try to remove it */
729 			goto remove_symlink;
730 		}
731 		else if (redo)
732 		{
733 			/* in redo, just log other types of error */
734 			ereport(LOG,
735 					(errcode_for_file_access(),
736 					 errmsg("could not open directory \"%s\": %m",
737 							linkloc_with_version_dir)));
738 			pfree(linkloc_with_version_dir);
739 			return false;
740 		}
741 		/* else let ReadDir report the error */
742 	}
743 
744 	while ((de = ReadDir(dirdesc, linkloc_with_version_dir)) != NULL)
745 	{
746 		if (strcmp(de->d_name, ".") == 0 ||
747 			strcmp(de->d_name, "..") == 0)
748 			continue;
749 
750 		subfile = psprintf("%s/%s", linkloc_with_version_dir, de->d_name);
751 
752 		/* This check is just to deliver a friendlier error message */
753 		if (!redo && !directory_is_empty(subfile))
754 		{
755 			FreeDir(dirdesc);
756 			pfree(subfile);
757 			pfree(linkloc_with_version_dir);
758 			return false;
759 		}
760 
761 		/* remove empty directory */
762 		if (rmdir(subfile) < 0)
763 			ereport(redo ? LOG : ERROR,
764 					(errcode_for_file_access(),
765 					 errmsg("could not remove directory \"%s\": %m",
766 							subfile)));
767 
768 		pfree(subfile);
769 	}
770 
771 	FreeDir(dirdesc);
772 
773 	/* remove version directory */
774 	if (rmdir(linkloc_with_version_dir) < 0)
775 	{
776 		ereport(redo ? LOG : ERROR,
777 				(errcode_for_file_access(),
778 				 errmsg("could not remove directory \"%s\": %m",
779 						linkloc_with_version_dir)));
780 		pfree(linkloc_with_version_dir);
781 		return false;
782 	}
783 
784 	/*
785 	 * Try to remove the symlink.  We must however deal with the possibility
786 	 * that it's a directory instead of a symlink --- this could happen during
787 	 * WAL replay (see TablespaceCreateDbspace), and it is also the case on
788 	 * Windows where junction points lstat() as directories.
789 	 *
790 	 * Note: in the redo case, we'll return true if this final step fails;
791 	 * there's no point in retrying it.  Also, ENOENT should provoke no more
792 	 * than a warning.
793 	 */
794 remove_symlink:
795 	linkloc = pstrdup(linkloc_with_version_dir);
796 	get_parent_directory(linkloc);
797 	if (lstat(linkloc, &st) < 0)
798 	{
799 		int			saved_errno = errno;
800 
801 		ereport(redo ? LOG : (saved_errno == ENOENT ? WARNING : ERROR),
802 				(errcode_for_file_access(),
803 				 errmsg("could not stat file \"%s\": %m",
804 						linkloc)));
805 	}
806 	else if (S_ISDIR(st.st_mode))
807 	{
808 		if (rmdir(linkloc) < 0)
809 		{
810 			int			saved_errno = errno;
811 
812 			ereport(redo ? LOG : (saved_errno == ENOENT ? WARNING : ERROR),
813 					(errcode_for_file_access(),
814 					 errmsg("could not remove directory \"%s\": %m",
815 							linkloc)));
816 		}
817 	}
818 #ifdef S_ISLNK
819 	else if (S_ISLNK(st.st_mode))
820 	{
821 		if (unlink(linkloc) < 0)
822 		{
823 			int			saved_errno = errno;
824 
825 			ereport(redo ? LOG : (saved_errno == ENOENT ? WARNING : ERROR),
826 					(errcode_for_file_access(),
827 					 errmsg("could not remove symbolic link \"%s\": %m",
828 							linkloc)));
829 		}
830 	}
831 #endif
832 	else
833 	{
834 		/* Refuse to remove anything that's not a directory or symlink */
835 		ereport(redo ? LOG : ERROR,
836 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
837 				 errmsg("\"%s\" is not a directory or symbolic link",
838 						linkloc)));
839 	}
840 
841 	pfree(linkloc_with_version_dir);
842 	pfree(linkloc);
843 
844 	return true;
845 }
846 
847 
848 /*
849  * Check if a directory is empty.
850  *
851  * This probably belongs somewhere else, but not sure where...
852  */
853 bool
directory_is_empty(const char * path)854 directory_is_empty(const char *path)
855 {
856 	DIR		   *dirdesc;
857 	struct dirent *de;
858 
859 	dirdesc = AllocateDir(path);
860 
861 	while ((de = ReadDir(dirdesc, path)) != NULL)
862 	{
863 		if (strcmp(de->d_name, ".") == 0 ||
864 			strcmp(de->d_name, "..") == 0)
865 			continue;
866 		FreeDir(dirdesc);
867 		return false;
868 	}
869 
870 	FreeDir(dirdesc);
871 	return true;
872 }
873 
874 /*
875  *	remove_tablespace_symlink
876  *
877  * This function removes symlinks in pg_tblspc.  On Windows, junction points
878  * act like directories so we must be able to apply rmdir.  This function
879  * works like the symlink removal code in destroy_tablespace_directories,
880  * except that failure to remove is always an ERROR.  But if the file doesn't
881  * exist at all, that's OK.
882  */
883 void
remove_tablespace_symlink(const char * linkloc)884 remove_tablespace_symlink(const char *linkloc)
885 {
886 	struct stat st;
887 
888 	if (lstat(linkloc, &st) < 0)
889 	{
890 		if (errno == ENOENT)
891 			return;
892 		ereport(ERROR,
893 				(errcode_for_file_access(),
894 				 errmsg("could not stat file \"%s\": %m", linkloc)));
895 	}
896 
897 	if (S_ISDIR(st.st_mode))
898 	{
899 		/*
900 		 * This will fail if the directory isn't empty, but not if it's a
901 		 * junction point.
902 		 */
903 		if (rmdir(linkloc) < 0 && errno != ENOENT)
904 			ereport(ERROR,
905 					(errcode_for_file_access(),
906 					 errmsg("could not remove directory \"%s\": %m",
907 							linkloc)));
908 	}
909 #ifdef S_ISLNK
910 	else if (S_ISLNK(st.st_mode))
911 	{
912 		if (unlink(linkloc) < 0 && errno != ENOENT)
913 			ereport(ERROR,
914 					(errcode_for_file_access(),
915 					 errmsg("could not remove symbolic link \"%s\": %m",
916 							linkloc)));
917 	}
918 #endif
919 	else
920 	{
921 		/* Refuse to remove anything that's not a directory or symlink */
922 		ereport(ERROR,
923 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
924 				 errmsg("\"%s\" is not a directory or symbolic link",
925 						linkloc)));
926 	}
927 }
928 
929 /*
930  * Rename a tablespace
931  */
932 ObjectAddress
RenameTableSpace(const char * oldname,const char * newname)933 RenameTableSpace(const char *oldname, const char *newname)
934 {
935 	Oid			tspId;
936 	Relation	rel;
937 	ScanKeyData entry[1];
938 	TableScanDesc scan;
939 	HeapTuple	tup;
940 	HeapTuple	newtuple;
941 	Form_pg_tablespace newform;
942 	ObjectAddress address;
943 
944 	/* Search pg_tablespace */
945 	rel = table_open(TableSpaceRelationId, RowExclusiveLock);
946 
947 	ScanKeyInit(&entry[0],
948 				Anum_pg_tablespace_spcname,
949 				BTEqualStrategyNumber, F_NAMEEQ,
950 				CStringGetDatum(oldname));
951 	scan = table_beginscan_catalog(rel, 1, entry);
952 	tup = heap_getnext(scan, ForwardScanDirection);
953 	if (!HeapTupleIsValid(tup))
954 		ereport(ERROR,
955 				(errcode(ERRCODE_UNDEFINED_OBJECT),
956 				 errmsg("tablespace \"%s\" does not exist",
957 						oldname)));
958 
959 	newtuple = heap_copytuple(tup);
960 	newform = (Form_pg_tablespace) GETSTRUCT(newtuple);
961 	tspId = newform->oid;
962 
963 	table_endscan(scan);
964 
965 	/* Must be owner */
966 	if (!pg_tablespace_ownercheck(tspId, GetUserId()))
967 		aclcheck_error(ACLCHECK_NO_PRIV, OBJECT_TABLESPACE, oldname);
968 
969 	/* Validate new name */
970 	if (!allowSystemTableMods && IsReservedName(newname))
971 		ereport(ERROR,
972 				(errcode(ERRCODE_RESERVED_NAME),
973 				 errmsg("unacceptable tablespace name \"%s\"", newname),
974 				 errdetail("The prefix \"pg_\" is reserved for system tablespaces.")));
975 
976 	/*
977 	 * If built with appropriate switch, whine when regression-testing
978 	 * conventions for tablespace names are violated.
979 	 */
980 #ifdef ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS
981 	if (strncmp(newname, "regress_", 8) != 0)
982 		elog(WARNING, "tablespaces created by regression test cases should have names starting with \"regress_\"");
983 #endif
984 
985 	/* Make sure the new name doesn't exist */
986 	ScanKeyInit(&entry[0],
987 				Anum_pg_tablespace_spcname,
988 				BTEqualStrategyNumber, F_NAMEEQ,
989 				CStringGetDatum(newname));
990 	scan = table_beginscan_catalog(rel, 1, entry);
991 	tup = heap_getnext(scan, ForwardScanDirection);
992 	if (HeapTupleIsValid(tup))
993 		ereport(ERROR,
994 				(errcode(ERRCODE_DUPLICATE_OBJECT),
995 				 errmsg("tablespace \"%s\" already exists",
996 						newname)));
997 
998 	table_endscan(scan);
999 
1000 	/* OK, update the entry */
1001 	namestrcpy(&(newform->spcname), newname);
1002 
1003 	CatalogTupleUpdate(rel, &newtuple->t_self, newtuple);
1004 
1005 	InvokeObjectPostAlterHook(TableSpaceRelationId, tspId, 0);
1006 
1007 	ObjectAddressSet(address, TableSpaceRelationId, tspId);
1008 
1009 	table_close(rel, NoLock);
1010 
1011 	return address;
1012 }
1013 
1014 /*
1015  * Alter table space options
1016  */
1017 Oid
AlterTableSpaceOptions(AlterTableSpaceOptionsStmt * stmt)1018 AlterTableSpaceOptions(AlterTableSpaceOptionsStmt *stmt)
1019 {
1020 	Relation	rel;
1021 	ScanKeyData entry[1];
1022 	TableScanDesc scandesc;
1023 	HeapTuple	tup;
1024 	Oid			tablespaceoid;
1025 	Datum		datum;
1026 	Datum		newOptions;
1027 	Datum		repl_val[Natts_pg_tablespace];
1028 	bool		isnull;
1029 	bool		repl_null[Natts_pg_tablespace];
1030 	bool		repl_repl[Natts_pg_tablespace];
1031 	HeapTuple	newtuple;
1032 
1033 	/* Search pg_tablespace */
1034 	rel = table_open(TableSpaceRelationId, RowExclusiveLock);
1035 
1036 	ScanKeyInit(&entry[0],
1037 				Anum_pg_tablespace_spcname,
1038 				BTEqualStrategyNumber, F_NAMEEQ,
1039 				CStringGetDatum(stmt->tablespacename));
1040 	scandesc = table_beginscan_catalog(rel, 1, entry);
1041 	tup = heap_getnext(scandesc, ForwardScanDirection);
1042 	if (!HeapTupleIsValid(tup))
1043 		ereport(ERROR,
1044 				(errcode(ERRCODE_UNDEFINED_OBJECT),
1045 				 errmsg("tablespace \"%s\" does not exist",
1046 						stmt->tablespacename)));
1047 
1048 	tablespaceoid = ((Form_pg_tablespace) GETSTRUCT(tup))->oid;
1049 
1050 	/* Must be owner of the existing object */
1051 	if (!pg_tablespace_ownercheck(tablespaceoid, GetUserId()))
1052 		aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_TABLESPACE,
1053 					   stmt->tablespacename);
1054 
1055 	/* Generate new proposed spcoptions (text array) */
1056 	datum = heap_getattr(tup, Anum_pg_tablespace_spcoptions,
1057 						 RelationGetDescr(rel), &isnull);
1058 	newOptions = transformRelOptions(isnull ? (Datum) 0 : datum,
1059 									 stmt->options, NULL, NULL, false,
1060 									 stmt->isReset);
1061 	(void) tablespace_reloptions(newOptions, true);
1062 
1063 	/* Build new tuple. */
1064 	memset(repl_null, false, sizeof(repl_null));
1065 	memset(repl_repl, false, sizeof(repl_repl));
1066 	if (newOptions != (Datum) 0)
1067 		repl_val[Anum_pg_tablespace_spcoptions - 1] = newOptions;
1068 	else
1069 		repl_null[Anum_pg_tablespace_spcoptions - 1] = true;
1070 	repl_repl[Anum_pg_tablespace_spcoptions - 1] = true;
1071 	newtuple = heap_modify_tuple(tup, RelationGetDescr(rel), repl_val,
1072 								 repl_null, repl_repl);
1073 
1074 	/* Update system catalog. */
1075 	CatalogTupleUpdate(rel, &newtuple->t_self, newtuple);
1076 
1077 	InvokeObjectPostAlterHook(TableSpaceRelationId, tablespaceoid, 0);
1078 
1079 	heap_freetuple(newtuple);
1080 
1081 	/* Conclude heap scan. */
1082 	table_endscan(scandesc);
1083 	table_close(rel, NoLock);
1084 
1085 	return tablespaceoid;
1086 }
1087 
1088 /*
1089  * Routines for handling the GUC variable 'default_tablespace'.
1090  */
1091 
1092 /* check_hook: validate new default_tablespace */
1093 bool
check_default_tablespace(char ** newval,void ** extra,GucSource source)1094 check_default_tablespace(char **newval, void **extra, GucSource source)
1095 {
1096 	/*
1097 	 * If we aren't inside a transaction, or connected to a database, we
1098 	 * cannot do the catalog accesses necessary to verify the name.  Must
1099 	 * accept the value on faith.
1100 	 */
1101 	if (IsTransactionState() && MyDatabaseId != InvalidOid)
1102 	{
1103 		if (**newval != '\0' &&
1104 			!OidIsValid(get_tablespace_oid(*newval, true)))
1105 		{
1106 			/*
1107 			 * When source == PGC_S_TEST, don't throw a hard error for a
1108 			 * nonexistent tablespace, only a NOTICE.  See comments in guc.h.
1109 			 */
1110 			if (source == PGC_S_TEST)
1111 			{
1112 				ereport(NOTICE,
1113 						(errcode(ERRCODE_UNDEFINED_OBJECT),
1114 						 errmsg("tablespace \"%s\" does not exist",
1115 								*newval)));
1116 			}
1117 			else
1118 			{
1119 				GUC_check_errdetail("Tablespace \"%s\" does not exist.",
1120 									*newval);
1121 				return false;
1122 			}
1123 		}
1124 	}
1125 
1126 	return true;
1127 }
1128 
1129 /*
1130  * GetDefaultTablespace -- get the OID of the current default tablespace
1131  *
1132  * Temporary objects have different default tablespaces, hence the
1133  * relpersistence parameter must be specified.  Also, for partitioned tables,
1134  * we disallow specifying the database default, so that needs to be specified
1135  * too.
1136  *
1137  * May return InvalidOid to indicate "use the database's default tablespace".
1138  *
1139  * Note that caller is expected to check appropriate permissions for any
1140  * result other than InvalidOid.
1141  *
1142  * This exists to hide (and possibly optimize the use of) the
1143  * default_tablespace GUC variable.
1144  */
1145 Oid
GetDefaultTablespace(char relpersistence,bool partitioned)1146 GetDefaultTablespace(char relpersistence, bool partitioned)
1147 {
1148 	Oid			result;
1149 
1150 	/* The temp-table case is handled elsewhere */
1151 	if (relpersistence == RELPERSISTENCE_TEMP)
1152 	{
1153 		PrepareTempTablespaces();
1154 		return GetNextTempTableSpace();
1155 	}
1156 
1157 	/* Fast path for default_tablespace == "" */
1158 	if (default_tablespace == NULL || default_tablespace[0] == '\0')
1159 		return InvalidOid;
1160 
1161 	/*
1162 	 * It is tempting to cache this lookup for more speed, but then we would
1163 	 * fail to detect the case where the tablespace was dropped since the GUC
1164 	 * variable was set.  Note also that we don't complain if the value fails
1165 	 * to refer to an existing tablespace; we just silently return InvalidOid,
1166 	 * causing the new object to be created in the database's tablespace.
1167 	 */
1168 	result = get_tablespace_oid(default_tablespace, true);
1169 
1170 	/*
1171 	 * Allow explicit specification of database's default tablespace in
1172 	 * default_tablespace without triggering permissions checks.  Don't allow
1173 	 * specifying that when creating a partitioned table, however, since the
1174 	 * result is confusing.
1175 	 */
1176 	if (result == MyDatabaseTableSpace)
1177 	{
1178 		if (partitioned)
1179 			ereport(ERROR,
1180 					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1181 					 errmsg("cannot specify default tablespace for partitioned relations")));
1182 		result = InvalidOid;
1183 	}
1184 	return result;
1185 }
1186 
1187 
1188 /*
1189  * Routines for handling the GUC variable 'temp_tablespaces'.
1190  */
1191 
1192 typedef struct
1193 {
1194 	/* Array of OIDs to be passed to SetTempTablespaces() */
1195 	int			numSpcs;
1196 	Oid			tblSpcs[FLEXIBLE_ARRAY_MEMBER];
1197 } temp_tablespaces_extra;
1198 
1199 /* check_hook: validate new temp_tablespaces */
1200 bool
check_temp_tablespaces(char ** newval,void ** extra,GucSource source)1201 check_temp_tablespaces(char **newval, void **extra, GucSource source)
1202 {
1203 	char	   *rawname;
1204 	List	   *namelist;
1205 
1206 	/* Need a modifiable copy of string */
1207 	rawname = pstrdup(*newval);
1208 
1209 	/* Parse string into list of identifiers */
1210 	if (!SplitIdentifierString(rawname, ',', &namelist))
1211 	{
1212 		/* syntax error in name list */
1213 		GUC_check_errdetail("List syntax is invalid.");
1214 		pfree(rawname);
1215 		list_free(namelist);
1216 		return false;
1217 	}
1218 
1219 	/*
1220 	 * If we aren't inside a transaction, or connected to a database, we
1221 	 * cannot do the catalog accesses necessary to verify the name.  Must
1222 	 * accept the value on faith. Fortunately, there's then also no need to
1223 	 * pass the data to fd.c.
1224 	 */
1225 	if (IsTransactionState() && MyDatabaseId != InvalidOid)
1226 	{
1227 		temp_tablespaces_extra *myextra;
1228 		Oid		   *tblSpcs;
1229 		int			numSpcs;
1230 		ListCell   *l;
1231 
1232 		/* temporary workspace until we are done verifying the list */
1233 		tblSpcs = (Oid *) palloc(list_length(namelist) * sizeof(Oid));
1234 		numSpcs = 0;
1235 		foreach(l, namelist)
1236 		{
1237 			char	   *curname = (char *) lfirst(l);
1238 			Oid			curoid;
1239 			AclResult	aclresult;
1240 
1241 			/* Allow an empty string (signifying database default) */
1242 			if (curname[0] == '\0')
1243 			{
1244 				/* InvalidOid signifies database's default tablespace */
1245 				tblSpcs[numSpcs++] = InvalidOid;
1246 				continue;
1247 			}
1248 
1249 			/*
1250 			 * In an interactive SET command, we ereport for bad info.  When
1251 			 * source == PGC_S_TEST, don't throw a hard error for a
1252 			 * nonexistent tablespace, only a NOTICE.  See comments in guc.h.
1253 			 */
1254 			curoid = get_tablespace_oid(curname, source <= PGC_S_TEST);
1255 			if (curoid == InvalidOid)
1256 			{
1257 				if (source == PGC_S_TEST)
1258 					ereport(NOTICE,
1259 							(errcode(ERRCODE_UNDEFINED_OBJECT),
1260 							 errmsg("tablespace \"%s\" does not exist",
1261 									curname)));
1262 				continue;
1263 			}
1264 
1265 			/*
1266 			 * Allow explicit specification of database's default tablespace
1267 			 * in temp_tablespaces without triggering permissions checks.
1268 			 */
1269 			if (curoid == MyDatabaseTableSpace)
1270 			{
1271 				/* InvalidOid signifies database's default tablespace */
1272 				tblSpcs[numSpcs++] = InvalidOid;
1273 				continue;
1274 			}
1275 
1276 			/* Check permissions, similarly complaining only if interactive */
1277 			aclresult = pg_tablespace_aclcheck(curoid, GetUserId(),
1278 											   ACL_CREATE);
1279 			if (aclresult != ACLCHECK_OK)
1280 			{
1281 				if (source >= PGC_S_INTERACTIVE)
1282 					aclcheck_error(aclresult, OBJECT_TABLESPACE, curname);
1283 				continue;
1284 			}
1285 
1286 			tblSpcs[numSpcs++] = curoid;
1287 		}
1288 
1289 		/* Now prepare an "extra" struct for assign_temp_tablespaces */
1290 		myextra = malloc(offsetof(temp_tablespaces_extra, tblSpcs) +
1291 						 numSpcs * sizeof(Oid));
1292 		if (!myextra)
1293 			return false;
1294 		myextra->numSpcs = numSpcs;
1295 		memcpy(myextra->tblSpcs, tblSpcs, numSpcs * sizeof(Oid));
1296 		*extra = (void *) myextra;
1297 
1298 		pfree(tblSpcs);
1299 	}
1300 
1301 	pfree(rawname);
1302 	list_free(namelist);
1303 
1304 	return true;
1305 }
1306 
1307 /* assign_hook: do extra actions as needed */
1308 void
assign_temp_tablespaces(const char * newval,void * extra)1309 assign_temp_tablespaces(const char *newval, void *extra)
1310 {
1311 	temp_tablespaces_extra *myextra = (temp_tablespaces_extra *) extra;
1312 
1313 	/*
1314 	 * If check_temp_tablespaces was executed inside a transaction, then pass
1315 	 * the list it made to fd.c.  Otherwise, clear fd.c's list; we must be
1316 	 * still outside a transaction, or else restoring during transaction exit,
1317 	 * and in either case we can just let the next PrepareTempTablespaces call
1318 	 * make things sane.
1319 	 */
1320 	if (myextra)
1321 		SetTempTablespaces(myextra->tblSpcs, myextra->numSpcs);
1322 	else
1323 		SetTempTablespaces(NULL, 0);
1324 }
1325 
1326 /*
1327  * PrepareTempTablespaces -- prepare to use temp tablespaces
1328  *
1329  * If we have not already done so in the current transaction, parse the
1330  * temp_tablespaces GUC variable and tell fd.c which tablespace(s) to use
1331  * for temp files.
1332  */
1333 void
PrepareTempTablespaces(void)1334 PrepareTempTablespaces(void)
1335 {
1336 	char	   *rawname;
1337 	List	   *namelist;
1338 	Oid		   *tblSpcs;
1339 	int			numSpcs;
1340 	ListCell   *l;
1341 
1342 	/* No work if already done in current transaction */
1343 	if (TempTablespacesAreSet())
1344 		return;
1345 
1346 	/*
1347 	 * Can't do catalog access unless within a transaction.  This is just a
1348 	 * safety check in case this function is called by low-level code that
1349 	 * could conceivably execute outside a transaction.  Note that in such a
1350 	 * scenario, fd.c will fall back to using the current database's default
1351 	 * tablespace, which should always be OK.
1352 	 */
1353 	if (!IsTransactionState())
1354 		return;
1355 
1356 	/* Need a modifiable copy of string */
1357 	rawname = pstrdup(temp_tablespaces);
1358 
1359 	/* Parse string into list of identifiers */
1360 	if (!SplitIdentifierString(rawname, ',', &namelist))
1361 	{
1362 		/* syntax error in name list */
1363 		SetTempTablespaces(NULL, 0);
1364 		pfree(rawname);
1365 		list_free(namelist);
1366 		return;
1367 	}
1368 
1369 	/* Store tablespace OIDs in an array in TopTransactionContext */
1370 	tblSpcs = (Oid *) MemoryContextAlloc(TopTransactionContext,
1371 										 list_length(namelist) * sizeof(Oid));
1372 	numSpcs = 0;
1373 	foreach(l, namelist)
1374 	{
1375 		char	   *curname = (char *) lfirst(l);
1376 		Oid			curoid;
1377 		AclResult	aclresult;
1378 
1379 		/* Allow an empty string (signifying database default) */
1380 		if (curname[0] == '\0')
1381 		{
1382 			/* InvalidOid signifies database's default tablespace */
1383 			tblSpcs[numSpcs++] = InvalidOid;
1384 			continue;
1385 		}
1386 
1387 		/* Else verify that name is a valid tablespace name */
1388 		curoid = get_tablespace_oid(curname, true);
1389 		if (curoid == InvalidOid)
1390 		{
1391 			/* Skip any bad list elements */
1392 			continue;
1393 		}
1394 
1395 		/*
1396 		 * Allow explicit specification of database's default tablespace in
1397 		 * temp_tablespaces without triggering permissions checks.
1398 		 */
1399 		if (curoid == MyDatabaseTableSpace)
1400 		{
1401 			/* InvalidOid signifies database's default tablespace */
1402 			tblSpcs[numSpcs++] = InvalidOid;
1403 			continue;
1404 		}
1405 
1406 		/* Check permissions similarly */
1407 		aclresult = pg_tablespace_aclcheck(curoid, GetUserId(),
1408 										   ACL_CREATE);
1409 		if (aclresult != ACLCHECK_OK)
1410 			continue;
1411 
1412 		tblSpcs[numSpcs++] = curoid;
1413 	}
1414 
1415 	SetTempTablespaces(tblSpcs, numSpcs);
1416 
1417 	pfree(rawname);
1418 	list_free(namelist);
1419 }
1420 
1421 
1422 /*
1423  * get_tablespace_oid - given a tablespace name, look up the OID
1424  *
1425  * If missing_ok is false, throw an error if tablespace name not found.  If
1426  * true, just return InvalidOid.
1427  */
1428 Oid
get_tablespace_oid(const char * tablespacename,bool missing_ok)1429 get_tablespace_oid(const char *tablespacename, bool missing_ok)
1430 {
1431 	Oid			result;
1432 	Relation	rel;
1433 	TableScanDesc scandesc;
1434 	HeapTuple	tuple;
1435 	ScanKeyData entry[1];
1436 
1437 	/*
1438 	 * Search pg_tablespace.  We use a heapscan here even though there is an
1439 	 * index on name, on the theory that pg_tablespace will usually have just
1440 	 * a few entries and so an indexed lookup is a waste of effort.
1441 	 */
1442 	rel = table_open(TableSpaceRelationId, AccessShareLock);
1443 
1444 	ScanKeyInit(&entry[0],
1445 				Anum_pg_tablespace_spcname,
1446 				BTEqualStrategyNumber, F_NAMEEQ,
1447 				CStringGetDatum(tablespacename));
1448 	scandesc = table_beginscan_catalog(rel, 1, entry);
1449 	tuple = heap_getnext(scandesc, ForwardScanDirection);
1450 
1451 	/* We assume that there can be at most one matching tuple */
1452 	if (HeapTupleIsValid(tuple))
1453 		result = ((Form_pg_tablespace) GETSTRUCT(tuple))->oid;
1454 	else
1455 		result = InvalidOid;
1456 
1457 	table_endscan(scandesc);
1458 	table_close(rel, AccessShareLock);
1459 
1460 	if (!OidIsValid(result) && !missing_ok)
1461 		ereport(ERROR,
1462 				(errcode(ERRCODE_UNDEFINED_OBJECT),
1463 				 errmsg("tablespace \"%s\" does not exist",
1464 						tablespacename)));
1465 
1466 	return result;
1467 }
1468 
1469 /*
1470  * get_tablespace_name - given a tablespace OID, look up the name
1471  *
1472  * Returns a palloc'd string, or NULL if no such tablespace.
1473  */
1474 char *
get_tablespace_name(Oid spc_oid)1475 get_tablespace_name(Oid spc_oid)
1476 {
1477 	char	   *result;
1478 	Relation	rel;
1479 	TableScanDesc scandesc;
1480 	HeapTuple	tuple;
1481 	ScanKeyData entry[1];
1482 
1483 	/*
1484 	 * Search pg_tablespace.  We use a heapscan here even though there is an
1485 	 * index on oid, on the theory that pg_tablespace will usually have just a
1486 	 * few entries and so an indexed lookup is a waste of effort.
1487 	 */
1488 	rel = table_open(TableSpaceRelationId, AccessShareLock);
1489 
1490 	ScanKeyInit(&entry[0],
1491 				Anum_pg_tablespace_oid,
1492 				BTEqualStrategyNumber, F_OIDEQ,
1493 				ObjectIdGetDatum(spc_oid));
1494 	scandesc = table_beginscan_catalog(rel, 1, entry);
1495 	tuple = heap_getnext(scandesc, ForwardScanDirection);
1496 
1497 	/* We assume that there can be at most one matching tuple */
1498 	if (HeapTupleIsValid(tuple))
1499 		result = pstrdup(NameStr(((Form_pg_tablespace) GETSTRUCT(tuple))->spcname));
1500 	else
1501 		result = NULL;
1502 
1503 	table_endscan(scandesc);
1504 	table_close(rel, AccessShareLock);
1505 
1506 	return result;
1507 }
1508 
1509 
1510 /*
1511  * TABLESPACE resource manager's routines
1512  */
1513 void
tblspc_redo(XLogReaderState * record)1514 tblspc_redo(XLogReaderState *record)
1515 {
1516 	uint8		info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
1517 
1518 	/* Backup blocks are not used in tblspc records */
1519 	Assert(!XLogRecHasAnyBlockRefs(record));
1520 
1521 	if (info == XLOG_TBLSPC_CREATE)
1522 	{
1523 		xl_tblspc_create_rec *xlrec = (xl_tblspc_create_rec *) XLogRecGetData(record);
1524 		char	   *location = xlrec->ts_path;
1525 
1526 		create_tablespace_directories(location, xlrec->ts_id);
1527 	}
1528 	else if (info == XLOG_TBLSPC_DROP)
1529 	{
1530 		xl_tblspc_drop_rec *xlrec = (xl_tblspc_drop_rec *) XLogRecGetData(record);
1531 
1532 		/*
1533 		 * If we issued a WAL record for a drop tablespace it implies that
1534 		 * there were no files in it at all when the DROP was done. That means
1535 		 * that no permanent objects can exist in it at this point.
1536 		 *
1537 		 * It is possible for standby users to be using this tablespace as a
1538 		 * location for their temporary files, so if we fail to remove all
1539 		 * files then do conflict processing and try again, if currently
1540 		 * enabled.
1541 		 *
1542 		 * Other possible reasons for failure include bollixed file
1543 		 * permissions on a standby server when they were okay on the primary,
1544 		 * etc etc. There's not much we can do about that, so just remove what
1545 		 * we can and press on.
1546 		 */
1547 		if (!destroy_tablespace_directories(xlrec->ts_id, true))
1548 		{
1549 			ResolveRecoveryConflictWithTablespace(xlrec->ts_id);
1550 
1551 			/*
1552 			 * If we did recovery processing then hopefully the backends who
1553 			 * wrote temp files should have cleaned up and exited by now.  So
1554 			 * retry before complaining.  If we fail again, this is just a LOG
1555 			 * condition, because it's not worth throwing an ERROR for (as
1556 			 * that would crash the database and require manual intervention
1557 			 * before we could get past this WAL record on restart).
1558 			 */
1559 			if (!destroy_tablespace_directories(xlrec->ts_id, true))
1560 				ereport(LOG,
1561 						(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1562 						 errmsg("directories for tablespace %u could not be removed",
1563 								xlrec->ts_id),
1564 						 errhint("You can remove the directories manually if necessary.")));
1565 		}
1566 	}
1567 	else
1568 		elog(PANIC, "tblspc_redo: unknown op code %u", info);
1569 }
1570