1 /*-------------------------------------------------------------------------
2  *
3  * tablespace.c
4  *	  Commands to manipulate table spaces
5  *
6  * Tablespaces in PostgreSQL are designed to allow users to determine
7  * where the data file(s) for a given database object reside on the file
8  * system.
9  *
10  * A tablespace represents a directory on the file system. At tablespace
11  * creation time, the directory must be empty. To simplify things and
12  * remove the possibility of having file name conflicts, we isolate
13  * files within a tablespace into database-specific subdirectories.
14  *
15  * To support file access via the information given in RelFileNode, we
16  * maintain a symbolic-link map in $PGDATA/pg_tblspc. The symlinks are
17  * named by tablespace OIDs and point to the actual tablespace directories.
18  * There is also a per-cluster version directory in each tablespace.
19  * Thus the full path to an arbitrary file is
20  *			$PGDATA/pg_tblspc/spcoid/PG_MAJORVER_CATVER/dboid/relfilenode
21  * e.g.
22  *			$PGDATA/pg_tblspc/20981/PG_9.0_201002161/719849/83292814
23  *
24  * There are two tablespaces created at initdb time: pg_global (for shared
25  * tables) and pg_default (for everything else).  For backwards compatibility
26  * and to remain functional on platforms without symlinks, these tablespaces
27  * are accessed specially: they are respectively
28  *			$PGDATA/global/relfilenode
29  *			$PGDATA/base/dboid/relfilenode
30  *
31  * To allow CREATE DATABASE to give a new database a default tablespace
32  * that's different from the template database's default, we make the
33  * provision that a zero in pg_class.reltablespace means the database's
34  * default tablespace.  Without this, CREATE DATABASE would have to go in
35  * and munge the system catalogs of the new database.
36  *
37  *
38  * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
39  * Portions Copyright (c) 1994, Regents of the University of California
40  *
41  *
42  * IDENTIFICATION
43  *	  src/backend/commands/tablespace.c
44  *
45  *-------------------------------------------------------------------------
46  */
47 #include "postgres.h"
48 
49 #include <unistd.h>
50 #include <dirent.h>
51 #include <sys/types.h>
52 #include <sys/stat.h>
53 
54 #include "access/heapam.h"
55 #include "access/reloptions.h"
56 #include "access/htup_details.h"
57 #include "access/sysattr.h"
58 #include "access/xact.h"
59 #include "access/xlog.h"
60 #include "access/xloginsert.h"
61 #include "catalog/catalog.h"
62 #include "catalog/dependency.h"
63 #include "catalog/indexing.h"
64 #include "catalog/namespace.h"
65 #include "catalog/objectaccess.h"
66 #include "catalog/pg_namespace.h"
67 #include "catalog/pg_tablespace.h"
68 #include "commands/comment.h"
69 #include "commands/seclabel.h"
70 #include "commands/tablecmds.h"
71 #include "commands/tablespace.h"
72 #include "miscadmin.h"
73 #include "postmaster/bgwriter.h"
74 #include "storage/fd.h"
75 #include "storage/lmgr.h"
76 #include "storage/standby.h"
77 #include "utils/acl.h"
78 #include "utils/builtins.h"
79 #include "utils/fmgroids.h"
80 #include "utils/guc.h"
81 #include "utils/lsyscache.h"
82 #include "utils/memutils.h"
83 #include "utils/rel.h"
84 #include "utils/tqual.h"
85 
86 
87 /* GUC variables */
88 char	   *default_tablespace = NULL;
89 char	   *temp_tablespaces = NULL;
90 
91 
92 static void create_tablespace_directories(const char *location,
93 							  const Oid tablespaceoid);
94 static bool destroy_tablespace_directories(Oid tablespaceoid, bool redo);
95 
96 
97 /*
98  * Each database using a table space is isolated into its own name space
99  * by a subdirectory named for the database OID.  On first creation of an
100  * object in the tablespace, create the subdirectory.  If the subdirectory
101  * already exists, fall through quietly.
102  *
103  * isRedo indicates that we are creating an object during WAL replay.
104  * In this case we will cope with the possibility of the tablespace
105  * directory not being there either --- this could happen if we are
106  * replaying an operation on a table in a subsequently-dropped tablespace.
107  * We handle this by making a directory in the place where the tablespace
108  * symlink would normally be.  This isn't an exact replay of course, but
109  * it's the best we can do given the available information.
110  *
111  * If tablespaces are not supported, we still need it in case we have to
112  * re-create a database subdirectory (of $PGDATA/base) during WAL replay.
113  */
114 void
TablespaceCreateDbspace(Oid spcNode,Oid dbNode,bool isRedo)115 TablespaceCreateDbspace(Oid spcNode, Oid dbNode, bool isRedo)
116 {
117 	struct stat st;
118 	char	   *dir;
119 
120 	/*
121 	 * The global tablespace doesn't have per-database subdirectories, so
122 	 * nothing to do for it.
123 	 */
124 	if (spcNode == GLOBALTABLESPACE_OID)
125 		return;
126 
127 	Assert(OidIsValid(spcNode));
128 	Assert(OidIsValid(dbNode));
129 
130 	dir = GetDatabasePath(dbNode, spcNode);
131 
132 	if (stat(dir, &st) < 0)
133 	{
134 		/* Directory does not exist? */
135 		if (errno == ENOENT)
136 		{
137 			/*
138 			 * Acquire TablespaceCreateLock to ensure that no DROP TABLESPACE
139 			 * or TablespaceCreateDbspace is running concurrently.
140 			 */
141 			LWLockAcquire(TablespaceCreateLock, LW_EXCLUSIVE);
142 
143 			/*
144 			 * Recheck to see if someone created the directory while we were
145 			 * waiting for lock.
146 			 */
147 			if (stat(dir, &st) == 0 && S_ISDIR(st.st_mode))
148 			{
149 				/* Directory was created */
150 			}
151 			else
152 			{
153 				/* Directory creation failed? */
154 				if (mkdir(dir, S_IRWXU) < 0)
155 				{
156 					char	   *parentdir;
157 
158 					/* Failure other than not exists or not in WAL replay? */
159 					if (errno != ENOENT || !isRedo)
160 						ereport(ERROR,
161 								(errcode_for_file_access(),
162 							  errmsg("could not create directory \"%s\": %m",
163 									 dir)));
164 
165 					/*
166 					 * Parent directories are missing during WAL replay, so
167 					 * continue by creating simple parent directories rather
168 					 * than a symlink.
169 					 */
170 
171 					/* create two parents up if not exist */
172 					parentdir = pstrdup(dir);
173 					get_parent_directory(parentdir);
174 					get_parent_directory(parentdir);
175 					/* Can't create parent and it doesn't already exist? */
176 					if (mkdir(parentdir, S_IRWXU) < 0 && errno != EEXIST)
177 						ereport(ERROR,
178 								(errcode_for_file_access(),
179 							  errmsg("could not create directory \"%s\": %m",
180 									 parentdir)));
181 					pfree(parentdir);
182 
183 					/* create one parent up if not exist */
184 					parentdir = pstrdup(dir);
185 					get_parent_directory(parentdir);
186 					/* Can't create parent and it doesn't already exist? */
187 					if (mkdir(parentdir, S_IRWXU) < 0 && errno != EEXIST)
188 						ereport(ERROR,
189 								(errcode_for_file_access(),
190 							  errmsg("could not create directory \"%s\": %m",
191 									 parentdir)));
192 					pfree(parentdir);
193 
194 					/* Create database directory */
195 					if (mkdir(dir, S_IRWXU) < 0)
196 						ereport(ERROR,
197 								(errcode_for_file_access(),
198 							  errmsg("could not create directory \"%s\": %m",
199 									 dir)));
200 				}
201 			}
202 
203 			LWLockRelease(TablespaceCreateLock);
204 		}
205 		else
206 		{
207 			ereport(ERROR,
208 					(errcode_for_file_access(),
209 					 errmsg("could not stat directory \"%s\": %m", dir)));
210 		}
211 	}
212 	else
213 	{
214 		/* Is it not a directory? */
215 		if (!S_ISDIR(st.st_mode))
216 			ereport(ERROR,
217 					(errcode(ERRCODE_WRONG_OBJECT_TYPE),
218 					 errmsg("\"%s\" exists but is not a directory",
219 							dir)));
220 	}
221 
222 	pfree(dir);
223 }
224 
225 /*
226  * Create a table space
227  *
228  * Only superusers can create a tablespace. This seems a reasonable restriction
229  * since we're determining the system layout and, anyway, we probably have
230  * root if we're doing this kind of activity
231  */
232 Oid
CreateTableSpace(CreateTableSpaceStmt * stmt)233 CreateTableSpace(CreateTableSpaceStmt *stmt)
234 {
235 #ifdef HAVE_SYMLINK
236 	Relation	rel;
237 	Datum		values[Natts_pg_tablespace];
238 	bool		nulls[Natts_pg_tablespace];
239 	HeapTuple	tuple;
240 	Oid			tablespaceoid;
241 	char	   *location;
242 	Oid			ownerId;
243 	Datum		newOptions;
244 
245 	/* Must be super user */
246 	if (!superuser())
247 		ereport(ERROR,
248 				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
249 				 errmsg("permission denied to create tablespace \"%s\"",
250 						stmt->tablespacename),
251 				 errhint("Must be superuser to create a tablespace.")));
252 
253 	/* However, the eventual owner of the tablespace need not be */
254 	if (stmt->owner)
255 		ownerId = get_rolespec_oid(stmt->owner, false);
256 	else
257 		ownerId = GetUserId();
258 
259 	/* Unix-ify the offered path, and strip any trailing slashes */
260 	location = pstrdup(stmt->location);
261 	canonicalize_path(location);
262 
263 	/* disallow quotes, else CREATE DATABASE would be at risk */
264 	if (strchr(location, '\''))
265 		ereport(ERROR,
266 				(errcode(ERRCODE_INVALID_NAME),
267 				 errmsg("tablespace location cannot contain single quotes")));
268 
269 	/*
270 	 * Allowing relative paths seems risky
271 	 *
272 	 * this also helps us ensure that location is not empty or whitespace
273 	 */
274 	if (!is_absolute_path(location))
275 		ereport(ERROR,
276 				(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
277 				 errmsg("tablespace location must be an absolute path")));
278 
279 	/*
280 	 * Check that location isn't too long. Remember that we're going to append
281 	 * 'PG_XXX/<dboid>/<relid>_<fork>.<nnn>'.  FYI, we never actually
282 	 * reference the whole path here, but mkdir() uses the first two parts.
283 	 */
284 	if (strlen(location) + 1 + strlen(TABLESPACE_VERSION_DIRECTORY) + 1 +
285 	  OIDCHARS + 1 + OIDCHARS + 1 + FORKNAMECHARS + 1 + OIDCHARS > MAXPGPATH)
286 		ereport(ERROR,
287 				(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
288 				 errmsg("tablespace location \"%s\" is too long",
289 						location)));
290 
291 	/* Warn if the tablespace is in the data directory. */
292 	if (path_is_prefix_of_path(DataDir, location))
293 		ereport(WARNING,
294 				(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
295 				 errmsg("tablespace location should not be inside the data directory")));
296 
297 	/*
298 	 * Disallow creation of tablespaces named "pg_xxx"; we reserve this
299 	 * namespace for system purposes.
300 	 */
301 	if (!allowSystemTableMods && IsReservedName(stmt->tablespacename))
302 		ereport(ERROR,
303 				(errcode(ERRCODE_RESERVED_NAME),
304 				 errmsg("unacceptable tablespace name \"%s\"",
305 						stmt->tablespacename),
306 		errdetail("The prefix \"pg_\" is reserved for system tablespaces.")));
307 
308 	/*
309 	 * Check that there is no other tablespace by this name.  (The unique
310 	 * index would catch this anyway, but might as well give a friendlier
311 	 * message.)
312 	 */
313 	if (OidIsValid(get_tablespace_oid(stmt->tablespacename, true)))
314 		ereport(ERROR,
315 				(errcode(ERRCODE_DUPLICATE_OBJECT),
316 				 errmsg("tablespace \"%s\" already exists",
317 						stmt->tablespacename)));
318 
319 	/*
320 	 * Insert tuple into pg_tablespace.  The purpose of doing this first is to
321 	 * lock the proposed tablename against other would-be creators. The
322 	 * insertion will roll back if we find problems below.
323 	 */
324 	rel = heap_open(TableSpaceRelationId, RowExclusiveLock);
325 
326 	MemSet(nulls, false, sizeof(nulls));
327 
328 	values[Anum_pg_tablespace_spcname - 1] =
329 		DirectFunctionCall1(namein, CStringGetDatum(stmt->tablespacename));
330 	values[Anum_pg_tablespace_spcowner - 1] =
331 		ObjectIdGetDatum(ownerId);
332 	nulls[Anum_pg_tablespace_spcacl - 1] = true;
333 
334 	/* Generate new proposed spcoptions (text array) */
335 	newOptions = transformRelOptions((Datum) 0,
336 									 stmt->options,
337 									 NULL, NULL, false, false);
338 	(void) tablespace_reloptions(newOptions, true);
339 	if (newOptions != (Datum) 0)
340 		values[Anum_pg_tablespace_spcoptions - 1] = newOptions;
341 	else
342 		nulls[Anum_pg_tablespace_spcoptions - 1] = true;
343 
344 	tuple = heap_form_tuple(rel->rd_att, values, nulls);
345 
346 	tablespaceoid = simple_heap_insert(rel, tuple);
347 
348 	CatalogUpdateIndexes(rel, tuple);
349 
350 	heap_freetuple(tuple);
351 
352 	/* Record dependency on owner */
353 	recordDependencyOnOwner(TableSpaceRelationId, tablespaceoid, ownerId);
354 
355 	/* Post creation hook for new tablespace */
356 	InvokeObjectPostCreateHook(TableSpaceRelationId, tablespaceoid, 0);
357 
358 	create_tablespace_directories(location, tablespaceoid);
359 
360 	/* Record the filesystem change in XLOG */
361 	{
362 		xl_tblspc_create_rec xlrec;
363 
364 		xlrec.ts_id = tablespaceoid;
365 
366 		XLogBeginInsert();
367 		XLogRegisterData((char *) &xlrec,
368 						 offsetof(xl_tblspc_create_rec, ts_path));
369 		XLogRegisterData((char *) location, strlen(location) + 1);
370 
371 		(void) XLogInsert(RM_TBLSPC_ID, XLOG_TBLSPC_CREATE);
372 	}
373 
374 	/*
375 	 * Force synchronous commit, to minimize the window between creating the
376 	 * symlink on-disk and marking the transaction committed.  It's not great
377 	 * that there is any window at all, but definitely we don't want to make
378 	 * it larger than necessary.
379 	 */
380 	ForceSyncCommit();
381 
382 	pfree(location);
383 
384 	/* We keep the lock on pg_tablespace until commit */
385 	heap_close(rel, NoLock);
386 
387 	return tablespaceoid;
388 #else							/* !HAVE_SYMLINK */
389 	ereport(ERROR,
390 			(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
391 			 errmsg("tablespaces are not supported on this platform")));
392 	return InvalidOid;			/* keep compiler quiet */
393 #endif   /* HAVE_SYMLINK */
394 }
395 
396 /*
397  * Drop a table space
398  *
399  * Be careful to check that the tablespace is empty.
400  */
401 void
DropTableSpace(DropTableSpaceStmt * stmt)402 DropTableSpace(DropTableSpaceStmt *stmt)
403 {
404 #ifdef HAVE_SYMLINK
405 	char	   *tablespacename = stmt->tablespacename;
406 	HeapScanDesc scandesc;
407 	Relation	rel;
408 	HeapTuple	tuple;
409 	ScanKeyData entry[1];
410 	Oid			tablespaceoid;
411 
412 	/*
413 	 * Find the target tuple
414 	 */
415 	rel = heap_open(TableSpaceRelationId, RowExclusiveLock);
416 
417 	ScanKeyInit(&entry[0],
418 				Anum_pg_tablespace_spcname,
419 				BTEqualStrategyNumber, F_NAMEEQ,
420 				CStringGetDatum(tablespacename));
421 	scandesc = heap_beginscan_catalog(rel, 1, entry);
422 	tuple = heap_getnext(scandesc, ForwardScanDirection);
423 
424 	if (!HeapTupleIsValid(tuple))
425 	{
426 		if (!stmt->missing_ok)
427 		{
428 			ereport(ERROR,
429 					(errcode(ERRCODE_UNDEFINED_OBJECT),
430 					 errmsg("tablespace \"%s\" does not exist",
431 							tablespacename)));
432 		}
433 		else
434 		{
435 			ereport(NOTICE,
436 					(errmsg("tablespace \"%s\" does not exist, skipping",
437 							tablespacename)));
438 			/* XXX I assume I need one or both of these next two calls */
439 			heap_endscan(scandesc);
440 			heap_close(rel, NoLock);
441 		}
442 		return;
443 	}
444 
445 	tablespaceoid = HeapTupleGetOid(tuple);
446 
447 	/* Must be tablespace owner */
448 	if (!pg_tablespace_ownercheck(tablespaceoid, GetUserId()))
449 		aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_TABLESPACE,
450 					   tablespacename);
451 
452 	/* Disallow drop of the standard tablespaces, even by superuser */
453 	if (tablespaceoid == GLOBALTABLESPACE_OID ||
454 		tablespaceoid == DEFAULTTABLESPACE_OID)
455 		aclcheck_error(ACLCHECK_NO_PRIV, ACL_KIND_TABLESPACE,
456 					   tablespacename);
457 
458 	/* DROP hook for the tablespace being removed */
459 	InvokeObjectDropHook(TableSpaceRelationId, tablespaceoid, 0);
460 
461 	/*
462 	 * Remove the pg_tablespace tuple (this will roll back if we fail below)
463 	 */
464 	simple_heap_delete(rel, &tuple->t_self);
465 
466 	heap_endscan(scandesc);
467 
468 	/*
469 	 * Remove any comments or security labels on this tablespace.
470 	 */
471 	DeleteSharedComments(tablespaceoid, TableSpaceRelationId);
472 	DeleteSharedSecurityLabel(tablespaceoid, TableSpaceRelationId);
473 
474 	/*
475 	 * Remove dependency on owner.
476 	 */
477 	deleteSharedDependencyRecordsFor(TableSpaceRelationId, tablespaceoid, 0);
478 
479 	/*
480 	 * Acquire TablespaceCreateLock to ensure that no TablespaceCreateDbspace
481 	 * is running concurrently.
482 	 */
483 	LWLockAcquire(TablespaceCreateLock, LW_EXCLUSIVE);
484 
485 	/*
486 	 * Try to remove the physical infrastructure.
487 	 */
488 	if (!destroy_tablespace_directories(tablespaceoid, false))
489 	{
490 		/*
491 		 * Not all files deleted?  However, there can be lingering empty files
492 		 * in the directories, left behind by for example DROP TABLE, that
493 		 * have been scheduled for deletion at next checkpoint (see comments
494 		 * in mdunlink() for details).  We could just delete them immediately,
495 		 * but we can't tell them apart from important data files that we
496 		 * mustn't delete.  So instead, we force a checkpoint which will clean
497 		 * out any lingering files, and try again.
498 		 *
499 		 * XXX On Windows, an unlinked file persists in the directory listing
500 		 * until no process retains an open handle for the file.  The DDL
501 		 * commands that schedule files for unlink send invalidation messages
502 		 * directing other PostgreSQL processes to close the files.  DROP
503 		 * TABLESPACE should not give up on the tablespace becoming empty
504 		 * until all relevant invalidation processing is complete.
505 		 */
506 		RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT);
507 		if (!destroy_tablespace_directories(tablespaceoid, false))
508 		{
509 			/* Still not empty, the files must be important then */
510 			ereport(ERROR,
511 					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
512 					 errmsg("tablespace \"%s\" is not empty",
513 							tablespacename)));
514 		}
515 	}
516 
517 	/* Record the filesystem change in XLOG */
518 	{
519 		xl_tblspc_drop_rec xlrec;
520 
521 		xlrec.ts_id = tablespaceoid;
522 
523 		XLogBeginInsert();
524 		XLogRegisterData((char *) &xlrec, sizeof(xl_tblspc_drop_rec));
525 
526 		(void) XLogInsert(RM_TBLSPC_ID, XLOG_TBLSPC_DROP);
527 	}
528 
529 	/*
530 	 * Note: because we checked that the tablespace was empty, there should be
531 	 * no need to worry about flushing shared buffers or free space map
532 	 * entries for relations in the tablespace.
533 	 */
534 
535 	/*
536 	 * Force synchronous commit, to minimize the window between removing the
537 	 * files on-disk and marking the transaction committed.  It's not great
538 	 * that there is any window at all, but definitely we don't want to make
539 	 * it larger than necessary.
540 	 */
541 	ForceSyncCommit();
542 
543 	/*
544 	 * Allow TablespaceCreateDbspace again.
545 	 */
546 	LWLockRelease(TablespaceCreateLock);
547 
548 	/* We keep the lock on pg_tablespace until commit */
549 	heap_close(rel, NoLock);
550 #else							/* !HAVE_SYMLINK */
551 	ereport(ERROR,
552 			(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
553 			 errmsg("tablespaces are not supported on this platform")));
554 #endif   /* HAVE_SYMLINK */
555 }
556 
557 
558 /*
559  * create_tablespace_directories
560  *
561  *	Attempt to create filesystem infrastructure linking $PGDATA/pg_tblspc/
562  *	to the specified directory
563  */
564 static void
create_tablespace_directories(const char * location,const Oid tablespaceoid)565 create_tablespace_directories(const char *location, const Oid tablespaceoid)
566 {
567 	char	   *linkloc;
568 	char	   *location_with_version_dir;
569 	struct stat st;
570 
571 	linkloc = psprintf("pg_tblspc/%u", tablespaceoid);
572 	location_with_version_dir = psprintf("%s/%s", location,
573 										 TABLESPACE_VERSION_DIRECTORY);
574 
575 	/*
576 	 * Attempt to coerce target directory to safe permissions.  If this fails,
577 	 * it doesn't exist or has the wrong owner.
578 	 */
579 	if (chmod(location, S_IRWXU) != 0)
580 	{
581 		if (errno == ENOENT)
582 			ereport(ERROR,
583 					(errcode(ERRCODE_UNDEFINED_FILE),
584 					 errmsg("directory \"%s\" does not exist", location),
585 					 InRecovery ? errhint("Create this directory for the tablespace before "
586 										  "restarting the server.") : 0));
587 		else
588 			ereport(ERROR,
589 					(errcode_for_file_access(),
590 				  errmsg("could not set permissions on directory \"%s\": %m",
591 						 location)));
592 	}
593 
594 	/*
595 	 * The creation of the version directory prevents more than one tablespace
596 	 * in a single location.  This imitates TablespaceCreateDbspace(), but it
597 	 * ignores concurrency and missing parent directories.  The chmod() would
598 	 * have failed in the absence of a parent.  pg_tablespace_spcname_index
599 	 * prevents concurrency.
600 	 */
601 	if (stat(location_with_version_dir, &st) < 0)
602 	{
603 		if (errno != ENOENT)
604 			ereport(ERROR,
605 					(errcode_for_file_access(),
606 					 errmsg("could not stat directory \"%s\": %m",
607 							location_with_version_dir)));
608 		else if (mkdir(location_with_version_dir, S_IRWXU) < 0)
609 			ereport(ERROR,
610 					(errcode_for_file_access(),
611 					 errmsg("could not create directory \"%s\": %m",
612 							location_with_version_dir)));
613 	}
614 	else if (!S_ISDIR(st.st_mode))
615 		ereport(ERROR,
616 				(errcode(ERRCODE_WRONG_OBJECT_TYPE),
617 				 errmsg("\"%s\" exists but is not a directory",
618 						location_with_version_dir)));
619 	else if (!InRecovery)
620 		ereport(ERROR,
621 				(errcode(ERRCODE_OBJECT_IN_USE),
622 				 errmsg("directory \"%s\" already in use as a tablespace",
623 						location_with_version_dir)));
624 
625 	/*
626 	 * In recovery, remove old symlink, in case it points to the wrong place.
627 	 */
628 	if (InRecovery)
629 		remove_tablespace_symlink(linkloc);
630 
631 	/*
632 	 * Create the symlink under PGDATA
633 	 */
634 	if (symlink(location, linkloc) < 0)
635 		ereport(ERROR,
636 				(errcode_for_file_access(),
637 				 errmsg("could not create symbolic link \"%s\": %m",
638 						linkloc)));
639 
640 	pfree(linkloc);
641 	pfree(location_with_version_dir);
642 }
643 
644 
645 /*
646  * destroy_tablespace_directories
647  *
648  * Attempt to remove filesystem infrastructure for the tablespace.
649  *
650  * 'redo' indicates we are redoing a drop from XLOG; in that case we should
651  * not throw an ERROR for problems, just LOG them.  The worst consequence of
652  * not removing files here would be failure to release some disk space, which
653  * does not justify throwing an error that would require manual intervention
654  * to get the database running again.
655  *
656  * Returns TRUE if successful, FALSE if some subdirectory is not empty
657  */
658 static bool
destroy_tablespace_directories(Oid tablespaceoid,bool redo)659 destroy_tablespace_directories(Oid tablespaceoid, bool redo)
660 {
661 	char	   *linkloc;
662 	char	   *linkloc_with_version_dir;
663 	DIR		   *dirdesc;
664 	struct dirent *de;
665 	char	   *subfile;
666 	struct stat st;
667 
668 	linkloc_with_version_dir = psprintf("pg_tblspc/%u/%s", tablespaceoid,
669 										TABLESPACE_VERSION_DIRECTORY);
670 
671 	/*
672 	 * Check if the tablespace still contains any files.  We try to rmdir each
673 	 * per-database directory we find in it.  rmdir failure implies there are
674 	 * still files in that subdirectory, so give up.  (We do not have to worry
675 	 * about undoing any already completed rmdirs, since the next attempt to
676 	 * use the tablespace from that database will simply recreate the
677 	 * subdirectory via TablespaceCreateDbspace.)
678 	 *
679 	 * Since we hold TablespaceCreateLock, no one else should be creating any
680 	 * fresh subdirectories in parallel. It is possible that new files are
681 	 * being created within subdirectories, though, so the rmdir call could
682 	 * fail.  Worst consequence is a less friendly error message.
683 	 *
684 	 * If redo is true then ENOENT is a likely outcome here, and we allow it
685 	 * to pass without comment.  In normal operation we still allow it, but
686 	 * with a warning.  This is because even though ProcessUtility disallows
687 	 * DROP TABLESPACE in a transaction block, it's possible that a previous
688 	 * DROP failed and rolled back after removing the tablespace directories
689 	 * and/or symlink.  We want to allow a new DROP attempt to succeed at
690 	 * removing the catalog entries (and symlink if still present), so we
691 	 * should not give a hard error here.
692 	 */
693 	dirdesc = AllocateDir(linkloc_with_version_dir);
694 	if (dirdesc == NULL)
695 	{
696 		if (errno == ENOENT)
697 		{
698 			if (!redo)
699 				ereport(WARNING,
700 						(errcode_for_file_access(),
701 						 errmsg("could not open directory \"%s\": %m",
702 								linkloc_with_version_dir)));
703 			/* The symlink might still exist, so go try to remove it */
704 			goto remove_symlink;
705 		}
706 		else if (redo)
707 		{
708 			/* in redo, just log other types of error */
709 			ereport(LOG,
710 					(errcode_for_file_access(),
711 					 errmsg("could not open directory \"%s\": %m",
712 							linkloc_with_version_dir)));
713 			pfree(linkloc_with_version_dir);
714 			return false;
715 		}
716 		/* else let ReadDir report the error */
717 	}
718 
719 	while ((de = ReadDir(dirdesc, linkloc_with_version_dir)) != NULL)
720 	{
721 		if (strcmp(de->d_name, ".") == 0 ||
722 			strcmp(de->d_name, "..") == 0)
723 			continue;
724 
725 		subfile = psprintf("%s/%s", linkloc_with_version_dir, de->d_name);
726 
727 		/* This check is just to deliver a friendlier error message */
728 		if (!redo && !directory_is_empty(subfile))
729 		{
730 			FreeDir(dirdesc);
731 			pfree(subfile);
732 			pfree(linkloc_with_version_dir);
733 			return false;
734 		}
735 
736 		/* remove empty directory */
737 		if (rmdir(subfile) < 0)
738 			ereport(redo ? LOG : ERROR,
739 					(errcode_for_file_access(),
740 					 errmsg("could not remove directory \"%s\": %m",
741 							subfile)));
742 
743 		pfree(subfile);
744 	}
745 
746 	FreeDir(dirdesc);
747 
748 	/* remove version directory */
749 	if (rmdir(linkloc_with_version_dir) < 0)
750 	{
751 		ereport(redo ? LOG : ERROR,
752 				(errcode_for_file_access(),
753 				 errmsg("could not remove directory \"%s\": %m",
754 						linkloc_with_version_dir)));
755 		pfree(linkloc_with_version_dir);
756 		return false;
757 	}
758 
759 	/*
760 	 * Try to remove the symlink.  We must however deal with the possibility
761 	 * that it's a directory instead of a symlink --- this could happen during
762 	 * WAL replay (see TablespaceCreateDbspace), and it is also the case on
763 	 * Windows where junction points lstat() as directories.
764 	 *
765 	 * Note: in the redo case, we'll return true if this final step fails;
766 	 * there's no point in retrying it.  Also, ENOENT should provoke no more
767 	 * than a warning.
768 	 */
769 remove_symlink:
770 	linkloc = pstrdup(linkloc_with_version_dir);
771 	get_parent_directory(linkloc);
772 	if (lstat(linkloc, &st) < 0)
773 	{
774 		int			saved_errno = errno;
775 
776 		ereport(redo ? LOG : (saved_errno == ENOENT ? WARNING : ERROR),
777 				(errcode_for_file_access(),
778 				 errmsg("could not stat file \"%s\": %m",
779 						linkloc)));
780 	}
781 	else if (S_ISDIR(st.st_mode))
782 	{
783 		if (rmdir(linkloc) < 0)
784 		{
785 			int			saved_errno = errno;
786 
787 			ereport(redo ? LOG : (saved_errno == ENOENT ? WARNING : ERROR),
788 					(errcode_for_file_access(),
789 					 errmsg("could not remove directory \"%s\": %m",
790 							linkloc)));
791 		}
792 	}
793 #ifdef S_ISLNK
794 	else if (S_ISLNK(st.st_mode))
795 	{
796 		if (unlink(linkloc) < 0)
797 		{
798 			int			saved_errno = errno;
799 
800 			ereport(redo ? LOG : (saved_errno == ENOENT ? WARNING : ERROR),
801 					(errcode_for_file_access(),
802 					 errmsg("could not remove symbolic link \"%s\": %m",
803 							linkloc)));
804 		}
805 	}
806 #endif
807 	else
808 	{
809 		/* Refuse to remove anything that's not a directory or symlink */
810 		ereport(redo ? LOG : ERROR,
811 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
812 				 errmsg("\"%s\" is not a directory or symbolic link",
813 						linkloc)));
814 	}
815 
816 	pfree(linkloc_with_version_dir);
817 	pfree(linkloc);
818 
819 	return true;
820 }
821 
822 
823 /*
824  * Check if a directory is empty.
825  *
826  * This probably belongs somewhere else, but not sure where...
827  */
828 bool
directory_is_empty(const char * path)829 directory_is_empty(const char *path)
830 {
831 	DIR		   *dirdesc;
832 	struct dirent *de;
833 
834 	dirdesc = AllocateDir(path);
835 
836 	while ((de = ReadDir(dirdesc, path)) != NULL)
837 	{
838 		if (strcmp(de->d_name, ".") == 0 ||
839 			strcmp(de->d_name, "..") == 0)
840 			continue;
841 		FreeDir(dirdesc);
842 		return false;
843 	}
844 
845 	FreeDir(dirdesc);
846 	return true;
847 }
848 
849 /*
850  *	remove_tablespace_symlink
851  *
852  * This function removes symlinks in pg_tblspc.  On Windows, junction points
853  * act like directories so we must be able to apply rmdir.  This function
854  * works like the symlink removal code in destroy_tablespace_directories,
855  * except that failure to remove is always an ERROR.  But if the file doesn't
856  * exist at all, that's OK.
857  */
858 void
remove_tablespace_symlink(const char * linkloc)859 remove_tablespace_symlink(const char *linkloc)
860 {
861 	struct stat st;
862 
863 	if (lstat(linkloc, &st) < 0)
864 	{
865 		if (errno == ENOENT)
866 			return;
867 		ereport(ERROR,
868 				(errcode_for_file_access(),
869 				 errmsg("could not stat file \"%s\": %m", linkloc)));
870 	}
871 
872 	if (S_ISDIR(st.st_mode))
873 	{
874 		/*
875 		 * This will fail if the directory isn't empty, but not if it's a
876 		 * junction point.
877 		 */
878 		if (rmdir(linkloc) < 0 && errno != ENOENT)
879 			ereport(ERROR,
880 					(errcode_for_file_access(),
881 					 errmsg("could not remove directory \"%s\": %m",
882 							linkloc)));
883 	}
884 #ifdef S_ISLNK
885 	else if (S_ISLNK(st.st_mode))
886 	{
887 		if (unlink(linkloc) < 0 && errno != ENOENT)
888 			ereport(ERROR,
889 					(errcode_for_file_access(),
890 					 errmsg("could not remove symbolic link \"%s\": %m",
891 							linkloc)));
892 	}
893 #endif
894 	else
895 	{
896 		/* Refuse to remove anything that's not a directory or symlink */
897 		ereport(ERROR,
898 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
899 				 errmsg("\"%s\" is not a directory or symbolic link",
900 						linkloc)));
901 	}
902 }
903 
904 /*
905  * Rename a tablespace
906  */
907 ObjectAddress
RenameTableSpace(const char * oldname,const char * newname)908 RenameTableSpace(const char *oldname, const char *newname)
909 {
910 	Oid			tspId;
911 	Relation	rel;
912 	ScanKeyData entry[1];
913 	HeapScanDesc scan;
914 	HeapTuple	tup;
915 	HeapTuple	newtuple;
916 	Form_pg_tablespace newform;
917 	ObjectAddress address;
918 
919 	/* Search pg_tablespace */
920 	rel = heap_open(TableSpaceRelationId, RowExclusiveLock);
921 
922 	ScanKeyInit(&entry[0],
923 				Anum_pg_tablespace_spcname,
924 				BTEqualStrategyNumber, F_NAMEEQ,
925 				CStringGetDatum(oldname));
926 	scan = heap_beginscan_catalog(rel, 1, entry);
927 	tup = heap_getnext(scan, ForwardScanDirection);
928 	if (!HeapTupleIsValid(tup))
929 		ereport(ERROR,
930 				(errcode(ERRCODE_UNDEFINED_OBJECT),
931 				 errmsg("tablespace \"%s\" does not exist",
932 						oldname)));
933 
934 	tspId = HeapTupleGetOid(tup);
935 	newtuple = heap_copytuple(tup);
936 	newform = (Form_pg_tablespace) GETSTRUCT(newtuple);
937 
938 	heap_endscan(scan);
939 
940 	/* Must be owner */
941 	if (!pg_tablespace_ownercheck(HeapTupleGetOid(newtuple), GetUserId()))
942 		aclcheck_error(ACLCHECK_NO_PRIV, ACL_KIND_TABLESPACE, oldname);
943 
944 	/* Validate new name */
945 	if (!allowSystemTableMods && IsReservedName(newname))
946 		ereport(ERROR,
947 				(errcode(ERRCODE_RESERVED_NAME),
948 				 errmsg("unacceptable tablespace name \"%s\"", newname),
949 		errdetail("The prefix \"pg_\" is reserved for system tablespaces.")));
950 
951 	/* Make sure the new name doesn't exist */
952 	ScanKeyInit(&entry[0],
953 				Anum_pg_tablespace_spcname,
954 				BTEqualStrategyNumber, F_NAMEEQ,
955 				CStringGetDatum(newname));
956 	scan = heap_beginscan_catalog(rel, 1, entry);
957 	tup = heap_getnext(scan, ForwardScanDirection);
958 	if (HeapTupleIsValid(tup))
959 		ereport(ERROR,
960 				(errcode(ERRCODE_DUPLICATE_OBJECT),
961 				 errmsg("tablespace \"%s\" already exists",
962 						newname)));
963 
964 	heap_endscan(scan);
965 
966 	/* OK, update the entry */
967 	namestrcpy(&(newform->spcname), newname);
968 
969 	simple_heap_update(rel, &newtuple->t_self, newtuple);
970 	CatalogUpdateIndexes(rel, newtuple);
971 
972 	InvokeObjectPostAlterHook(TableSpaceRelationId, tspId, 0);
973 
974 	ObjectAddressSet(address, TableSpaceRelationId, tspId);
975 
976 	heap_close(rel, NoLock);
977 
978 	return address;
979 }
980 
981 /*
982  * Alter table space options
983  */
984 Oid
AlterTableSpaceOptions(AlterTableSpaceOptionsStmt * stmt)985 AlterTableSpaceOptions(AlterTableSpaceOptionsStmt *stmt)
986 {
987 	Relation	rel;
988 	ScanKeyData entry[1];
989 	HeapScanDesc scandesc;
990 	HeapTuple	tup;
991 	Oid			tablespaceoid;
992 	Datum		datum;
993 	Datum		newOptions;
994 	Datum		repl_val[Natts_pg_tablespace];
995 	bool		isnull;
996 	bool		repl_null[Natts_pg_tablespace];
997 	bool		repl_repl[Natts_pg_tablespace];
998 	HeapTuple	newtuple;
999 
1000 	/* Search pg_tablespace */
1001 	rel = heap_open(TableSpaceRelationId, RowExclusiveLock);
1002 
1003 	ScanKeyInit(&entry[0],
1004 				Anum_pg_tablespace_spcname,
1005 				BTEqualStrategyNumber, F_NAMEEQ,
1006 				CStringGetDatum(stmt->tablespacename));
1007 	scandesc = heap_beginscan_catalog(rel, 1, entry);
1008 	tup = heap_getnext(scandesc, ForwardScanDirection);
1009 	if (!HeapTupleIsValid(tup))
1010 		ereport(ERROR,
1011 				(errcode(ERRCODE_UNDEFINED_OBJECT),
1012 				 errmsg("tablespace \"%s\" does not exist",
1013 						stmt->tablespacename)));
1014 
1015 	tablespaceoid = HeapTupleGetOid(tup);
1016 
1017 	/* Must be owner of the existing object */
1018 	if (!pg_tablespace_ownercheck(HeapTupleGetOid(tup), GetUserId()))
1019 		aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_TABLESPACE,
1020 					   stmt->tablespacename);
1021 
1022 	/* Generate new proposed spcoptions (text array) */
1023 	datum = heap_getattr(tup, Anum_pg_tablespace_spcoptions,
1024 						 RelationGetDescr(rel), &isnull);
1025 	newOptions = transformRelOptions(isnull ? (Datum) 0 : datum,
1026 									 stmt->options, NULL, NULL, false,
1027 									 stmt->isReset);
1028 	(void) tablespace_reloptions(newOptions, true);
1029 
1030 	/* Build new tuple. */
1031 	memset(repl_null, false, sizeof(repl_null));
1032 	memset(repl_repl, false, sizeof(repl_repl));
1033 	if (newOptions != (Datum) 0)
1034 		repl_val[Anum_pg_tablespace_spcoptions - 1] = newOptions;
1035 	else
1036 		repl_null[Anum_pg_tablespace_spcoptions - 1] = true;
1037 	repl_repl[Anum_pg_tablespace_spcoptions - 1] = true;
1038 	newtuple = heap_modify_tuple(tup, RelationGetDescr(rel), repl_val,
1039 								 repl_null, repl_repl);
1040 
1041 	/* Update system catalog. */
1042 	simple_heap_update(rel, &newtuple->t_self, newtuple);
1043 	CatalogUpdateIndexes(rel, newtuple);
1044 
1045 	InvokeObjectPostAlterHook(TableSpaceRelationId, HeapTupleGetOid(tup), 0);
1046 
1047 	heap_freetuple(newtuple);
1048 
1049 	/* Conclude heap scan. */
1050 	heap_endscan(scandesc);
1051 	heap_close(rel, NoLock);
1052 
1053 	return tablespaceoid;
1054 }
1055 
1056 /*
1057  * Routines for handling the GUC variable 'default_tablespace'.
1058  */
1059 
1060 /* check_hook: validate new default_tablespace */
1061 bool
check_default_tablespace(char ** newval,void ** extra,GucSource source)1062 check_default_tablespace(char **newval, void **extra, GucSource source)
1063 {
1064 	/*
1065 	 * If we aren't inside a transaction, or connected to a database, we
1066 	 * cannot do the catalog accesses necessary to verify the name.  Must
1067 	 * accept the value on faith.
1068 	 */
1069 	if (IsTransactionState() && MyDatabaseId != InvalidOid)
1070 	{
1071 		if (**newval != '\0' &&
1072 			!OidIsValid(get_tablespace_oid(*newval, true)))
1073 		{
1074 			/*
1075 			 * When source == PGC_S_TEST, don't throw a hard error for a
1076 			 * nonexistent tablespace, only a NOTICE.  See comments in guc.h.
1077 			 */
1078 			if (source == PGC_S_TEST)
1079 			{
1080 				ereport(NOTICE,
1081 						(errcode(ERRCODE_UNDEFINED_OBJECT),
1082 						 errmsg("tablespace \"%s\" does not exist",
1083 								*newval)));
1084 			}
1085 			else
1086 			{
1087 				GUC_check_errdetail("Tablespace \"%s\" does not exist.",
1088 									*newval);
1089 				return false;
1090 			}
1091 		}
1092 	}
1093 
1094 	return true;
1095 }
1096 
1097 /*
1098  * GetDefaultTablespace -- get the OID of the current default tablespace
1099  *
1100  * Temporary objects have different default tablespaces, hence the
1101  * relpersistence parameter must be specified.
1102  *
1103  * May return InvalidOid to indicate "use the database's default tablespace".
1104  *
1105  * Note that caller is expected to check appropriate permissions for any
1106  * result other than InvalidOid.
1107  *
1108  * This exists to hide (and possibly optimize the use of) the
1109  * default_tablespace GUC variable.
1110  */
1111 Oid
GetDefaultTablespace(char relpersistence)1112 GetDefaultTablespace(char relpersistence)
1113 {
1114 	Oid			result;
1115 
1116 	/* The temp-table case is handled elsewhere */
1117 	if (relpersistence == RELPERSISTENCE_TEMP)
1118 	{
1119 		PrepareTempTablespaces();
1120 		return GetNextTempTableSpace();
1121 	}
1122 
1123 	/* Fast path for default_tablespace == "" */
1124 	if (default_tablespace == NULL || default_tablespace[0] == '\0')
1125 		return InvalidOid;
1126 
1127 	/*
1128 	 * It is tempting to cache this lookup for more speed, but then we would
1129 	 * fail to detect the case where the tablespace was dropped since the GUC
1130 	 * variable was set.  Note also that we don't complain if the value fails
1131 	 * to refer to an existing tablespace; we just silently return InvalidOid,
1132 	 * causing the new object to be created in the database's tablespace.
1133 	 */
1134 	result = get_tablespace_oid(default_tablespace, true);
1135 
1136 	/*
1137 	 * Allow explicit specification of database's default tablespace in
1138 	 * default_tablespace without triggering permissions checks.
1139 	 */
1140 	if (result == MyDatabaseTableSpace)
1141 		result = InvalidOid;
1142 	return result;
1143 }
1144 
1145 
1146 /*
1147  * Routines for handling the GUC variable 'temp_tablespaces'.
1148  */
1149 
1150 typedef struct
1151 {
1152 	/* Array of OIDs to be passed to SetTempTablespaces() */
1153 	int			numSpcs;
1154 	Oid			tblSpcs[FLEXIBLE_ARRAY_MEMBER];
1155 } temp_tablespaces_extra;
1156 
1157 /* check_hook: validate new temp_tablespaces */
1158 bool
check_temp_tablespaces(char ** newval,void ** extra,GucSource source)1159 check_temp_tablespaces(char **newval, void **extra, GucSource source)
1160 {
1161 	char	   *rawname;
1162 	List	   *namelist;
1163 
1164 	/* Need a modifiable copy of string */
1165 	rawname = pstrdup(*newval);
1166 
1167 	/* Parse string into list of identifiers */
1168 	if (!SplitIdentifierString(rawname, ',', &namelist))
1169 	{
1170 		/* syntax error in name list */
1171 		GUC_check_errdetail("List syntax is invalid.");
1172 		pfree(rawname);
1173 		list_free(namelist);
1174 		return false;
1175 	}
1176 
1177 	/*
1178 	 * If we aren't inside a transaction, or connected to a database, we
1179 	 * cannot do the catalog accesses necessary to verify the name.  Must
1180 	 * accept the value on faith.
1181 	 * Fortunately, there's then also no need to pass the data to fd.c.
1182 	 */
1183 	if (IsTransactionState() && MyDatabaseId != InvalidOid)
1184 	{
1185 		temp_tablespaces_extra *myextra;
1186 		Oid		   *tblSpcs;
1187 		int			numSpcs;
1188 		ListCell   *l;
1189 
1190 		/* temporary workspace until we are done verifying the list */
1191 		tblSpcs = (Oid *) palloc(list_length(namelist) * sizeof(Oid));
1192 		numSpcs = 0;
1193 		foreach(l, namelist)
1194 		{
1195 			char	   *curname = (char *) lfirst(l);
1196 			Oid			curoid;
1197 			AclResult	aclresult;
1198 
1199 			/* Allow an empty string (signifying database default) */
1200 			if (curname[0] == '\0')
1201 			{
1202 				/* InvalidOid signifies database's default tablespace */
1203 				tblSpcs[numSpcs++] = InvalidOid;
1204 				continue;
1205 			}
1206 
1207 			/*
1208 			 * In an interactive SET command, we ereport for bad info.  When
1209 			 * source == PGC_S_TEST, don't throw a hard error for a
1210 			 * nonexistent tablespace, only a NOTICE.  See comments in guc.h.
1211 			 */
1212 			curoid = get_tablespace_oid(curname, source <= PGC_S_TEST);
1213 			if (curoid == InvalidOid)
1214 			{
1215 				if (source == PGC_S_TEST)
1216 					ereport(NOTICE,
1217 							(errcode(ERRCODE_UNDEFINED_OBJECT),
1218 							 errmsg("tablespace \"%s\" does not exist",
1219 									curname)));
1220 				continue;
1221 			}
1222 
1223 			/*
1224 			 * Allow explicit specification of database's default tablespace
1225 			 * in temp_tablespaces without triggering permissions checks.
1226 			 */
1227 			if (curoid == MyDatabaseTableSpace)
1228 			{
1229 				/* InvalidOid signifies database's default tablespace */
1230 				tblSpcs[numSpcs++] = InvalidOid;
1231 				continue;
1232 			}
1233 
1234 			/* Check permissions, similarly complaining only if interactive */
1235 			aclresult = pg_tablespace_aclcheck(curoid, GetUserId(),
1236 											   ACL_CREATE);
1237 			if (aclresult != ACLCHECK_OK)
1238 			{
1239 				if (source >= PGC_S_INTERACTIVE)
1240 					aclcheck_error(aclresult, ACL_KIND_TABLESPACE, curname);
1241 				continue;
1242 			}
1243 
1244 			tblSpcs[numSpcs++] = curoid;
1245 		}
1246 
1247 		/* Now prepare an "extra" struct for assign_temp_tablespaces */
1248 		myextra = malloc(offsetof(temp_tablespaces_extra, tblSpcs) +
1249 						 numSpcs * sizeof(Oid));
1250 		if (!myextra)
1251 			return false;
1252 		myextra->numSpcs = numSpcs;
1253 		memcpy(myextra->tblSpcs, tblSpcs, numSpcs * sizeof(Oid));
1254 		*extra = (void *) myextra;
1255 
1256 		pfree(tblSpcs);
1257 	}
1258 
1259 	pfree(rawname);
1260 	list_free(namelist);
1261 
1262 	return true;
1263 }
1264 
1265 /* assign_hook: do extra actions as needed */
1266 void
assign_temp_tablespaces(const char * newval,void * extra)1267 assign_temp_tablespaces(const char *newval, void *extra)
1268 {
1269 	temp_tablespaces_extra *myextra = (temp_tablespaces_extra *) extra;
1270 
1271 	/*
1272 	 * If check_temp_tablespaces was executed inside a transaction, then pass
1273 	 * the list it made to fd.c.  Otherwise, clear fd.c's list; we must be
1274 	 * still outside a transaction, or else restoring during transaction exit,
1275 	 * and in either case we can just let the next PrepareTempTablespaces call
1276 	 * make things sane.
1277 	 */
1278 	if (myextra)
1279 		SetTempTablespaces(myextra->tblSpcs, myextra->numSpcs);
1280 	else
1281 		SetTempTablespaces(NULL, 0);
1282 }
1283 
1284 /*
1285  * PrepareTempTablespaces -- prepare to use temp tablespaces
1286  *
1287  * If we have not already done so in the current transaction, parse the
1288  * temp_tablespaces GUC variable and tell fd.c which tablespace(s) to use
1289  * for temp files.
1290  */
1291 void
PrepareTempTablespaces(void)1292 PrepareTempTablespaces(void)
1293 {
1294 	char	   *rawname;
1295 	List	   *namelist;
1296 	Oid		   *tblSpcs;
1297 	int			numSpcs;
1298 	ListCell   *l;
1299 
1300 	/* No work if already done in current transaction */
1301 	if (TempTablespacesAreSet())
1302 		return;
1303 
1304 	/*
1305 	 * Can't do catalog access unless within a transaction.  This is just a
1306 	 * safety check in case this function is called by low-level code that
1307 	 * could conceivably execute outside a transaction.  Note that in such a
1308 	 * scenario, fd.c will fall back to using the current database's default
1309 	 * tablespace, which should always be OK.
1310 	 */
1311 	if (!IsTransactionState())
1312 		return;
1313 
1314 	/* Need a modifiable copy of string */
1315 	rawname = pstrdup(temp_tablespaces);
1316 
1317 	/* Parse string into list of identifiers */
1318 	if (!SplitIdentifierString(rawname, ',', &namelist))
1319 	{
1320 		/* syntax error in name list */
1321 		SetTempTablespaces(NULL, 0);
1322 		pfree(rawname);
1323 		list_free(namelist);
1324 		return;
1325 	}
1326 
1327 	/* Store tablespace OIDs in an array in TopTransactionContext */
1328 	tblSpcs = (Oid *) MemoryContextAlloc(TopTransactionContext,
1329 										 list_length(namelist) * sizeof(Oid));
1330 	numSpcs = 0;
1331 	foreach(l, namelist)
1332 	{
1333 		char	   *curname = (char *) lfirst(l);
1334 		Oid			curoid;
1335 		AclResult	aclresult;
1336 
1337 		/* Allow an empty string (signifying database default) */
1338 		if (curname[0] == '\0')
1339 		{
1340 			/* InvalidOid signifies database's default tablespace */
1341 			tblSpcs[numSpcs++] = InvalidOid;
1342 			continue;
1343 		}
1344 
1345 		/* Else verify that name is a valid tablespace name */
1346 		curoid = get_tablespace_oid(curname, true);
1347 		if (curoid == InvalidOid)
1348 		{
1349 			/* Skip any bad list elements */
1350 			continue;
1351 		}
1352 
1353 		/*
1354 		 * Allow explicit specification of database's default tablespace in
1355 		 * temp_tablespaces without triggering permissions checks.
1356 		 */
1357 		if (curoid == MyDatabaseTableSpace)
1358 		{
1359 			/* InvalidOid signifies database's default tablespace */
1360 			tblSpcs[numSpcs++] = InvalidOid;
1361 			continue;
1362 		}
1363 
1364 		/* Check permissions similarly */
1365 		aclresult = pg_tablespace_aclcheck(curoid, GetUserId(),
1366 										   ACL_CREATE);
1367 		if (aclresult != ACLCHECK_OK)
1368 			continue;
1369 
1370 		tblSpcs[numSpcs++] = curoid;
1371 	}
1372 
1373 	SetTempTablespaces(tblSpcs, numSpcs);
1374 
1375 	pfree(rawname);
1376 	list_free(namelist);
1377 }
1378 
1379 
1380 /*
1381  * get_tablespace_oid - given a tablespace name, look up the OID
1382  *
1383  * If missing_ok is false, throw an error if tablespace name not found.  If
1384  * true, just return InvalidOid.
1385  */
1386 Oid
get_tablespace_oid(const char * tablespacename,bool missing_ok)1387 get_tablespace_oid(const char *tablespacename, bool missing_ok)
1388 {
1389 	Oid			result;
1390 	Relation	rel;
1391 	HeapScanDesc scandesc;
1392 	HeapTuple	tuple;
1393 	ScanKeyData entry[1];
1394 
1395 	/*
1396 	 * Search pg_tablespace.  We use a heapscan here even though there is an
1397 	 * index on name, on the theory that pg_tablespace will usually have just
1398 	 * a few entries and so an indexed lookup is a waste of effort.
1399 	 */
1400 	rel = heap_open(TableSpaceRelationId, AccessShareLock);
1401 
1402 	ScanKeyInit(&entry[0],
1403 				Anum_pg_tablespace_spcname,
1404 				BTEqualStrategyNumber, F_NAMEEQ,
1405 				CStringGetDatum(tablespacename));
1406 	scandesc = heap_beginscan_catalog(rel, 1, entry);
1407 	tuple = heap_getnext(scandesc, ForwardScanDirection);
1408 
1409 	/* We assume that there can be at most one matching tuple */
1410 	if (HeapTupleIsValid(tuple))
1411 		result = HeapTupleGetOid(tuple);
1412 	else
1413 		result = InvalidOid;
1414 
1415 	heap_endscan(scandesc);
1416 	heap_close(rel, AccessShareLock);
1417 
1418 	if (!OidIsValid(result) && !missing_ok)
1419 		ereport(ERROR,
1420 				(errcode(ERRCODE_UNDEFINED_OBJECT),
1421 				 errmsg("tablespace \"%s\" does not exist",
1422 						tablespacename)));
1423 
1424 	return result;
1425 }
1426 
1427 /*
1428  * get_tablespace_name - given a tablespace OID, look up the name
1429  *
1430  * Returns a palloc'd string, or NULL if no such tablespace.
1431  */
1432 char *
get_tablespace_name(Oid spc_oid)1433 get_tablespace_name(Oid spc_oid)
1434 {
1435 	char	   *result;
1436 	Relation	rel;
1437 	HeapScanDesc scandesc;
1438 	HeapTuple	tuple;
1439 	ScanKeyData entry[1];
1440 
1441 	/*
1442 	 * Search pg_tablespace.  We use a heapscan here even though there is an
1443 	 * index on oid, on the theory that pg_tablespace will usually have just a
1444 	 * few entries and so an indexed lookup is a waste of effort.
1445 	 */
1446 	rel = heap_open(TableSpaceRelationId, AccessShareLock);
1447 
1448 	ScanKeyInit(&entry[0],
1449 				ObjectIdAttributeNumber,
1450 				BTEqualStrategyNumber, F_OIDEQ,
1451 				ObjectIdGetDatum(spc_oid));
1452 	scandesc = heap_beginscan_catalog(rel, 1, entry);
1453 	tuple = heap_getnext(scandesc, ForwardScanDirection);
1454 
1455 	/* We assume that there can be at most one matching tuple */
1456 	if (HeapTupleIsValid(tuple))
1457 		result = pstrdup(NameStr(((Form_pg_tablespace) GETSTRUCT(tuple))->spcname));
1458 	else
1459 		result = NULL;
1460 
1461 	heap_endscan(scandesc);
1462 	heap_close(rel, AccessShareLock);
1463 
1464 	return result;
1465 }
1466 
1467 
1468 /*
1469  * TABLESPACE resource manager's routines
1470  */
1471 void
tblspc_redo(XLogReaderState * record)1472 tblspc_redo(XLogReaderState *record)
1473 {
1474 	uint8		info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
1475 
1476 	/* Backup blocks are not used in tblspc records */
1477 	Assert(!XLogRecHasAnyBlockRefs(record));
1478 
1479 	if (info == XLOG_TBLSPC_CREATE)
1480 	{
1481 		xl_tblspc_create_rec *xlrec = (xl_tblspc_create_rec *) XLogRecGetData(record);
1482 		char	   *location = xlrec->ts_path;
1483 
1484 		create_tablespace_directories(location, xlrec->ts_id);
1485 	}
1486 	else if (info == XLOG_TBLSPC_DROP)
1487 	{
1488 		xl_tblspc_drop_rec *xlrec = (xl_tblspc_drop_rec *) XLogRecGetData(record);
1489 
1490 		/*
1491 		 * If we issued a WAL record for a drop tablespace it implies that
1492 		 * there were no files in it at all when the DROP was done. That means
1493 		 * that no permanent objects can exist in it at this point.
1494 		 *
1495 		 * It is possible for standby users to be using this tablespace as a
1496 		 * location for their temporary files, so if we fail to remove all
1497 		 * files then do conflict processing and try again, if currently
1498 		 * enabled.
1499 		 *
1500 		 * Other possible reasons for failure include bollixed file
1501 		 * permissions on a standby server when they were okay on the primary,
1502 		 * etc etc. There's not much we can do about that, so just remove what
1503 		 * we can and press on.
1504 		 */
1505 		if (!destroy_tablespace_directories(xlrec->ts_id, true))
1506 		{
1507 			ResolveRecoveryConflictWithTablespace(xlrec->ts_id);
1508 
1509 			/*
1510 			 * If we did recovery processing then hopefully the backends who
1511 			 * wrote temp files should have cleaned up and exited by now.  So
1512 			 * retry before complaining.  If we fail again, this is just a LOG
1513 			 * condition, because it's not worth throwing an ERROR for (as
1514 			 * that would crash the database and require manual intervention
1515 			 * before we could get past this WAL record on restart).
1516 			 */
1517 			if (!destroy_tablespace_directories(xlrec->ts_id, true))
1518 				ereport(LOG,
1519 						(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1520 				 errmsg("directories for tablespace %u could not be removed",
1521 						xlrec->ts_id),
1522 						 errhint("You can remove the directories manually if necessary.")));
1523 		}
1524 	}
1525 	else
1526 		elog(PANIC, "tblspc_redo: unknown op code %u", info);
1527 }
1528