1 /*-------------------------------------------------------------------------
2  *
3  * tablespace.c
4  *	  Commands to manipulate table spaces
5  *
6  * Tablespaces in PostgreSQL are designed to allow users to determine
7  * where the data file(s) for a given database object reside on the file
8  * system.
9  *
10  * A tablespace represents a directory on the file system. At tablespace
11  * creation time, the directory must be empty. To simplify things and
12  * remove the possibility of having file name conflicts, we isolate
13  * files within a tablespace into database-specific subdirectories.
14  *
15  * To support file access via the information given in RelFileNode, we
16  * maintain a symbolic-link map in $PGDATA/pg_tblspc. The symlinks are
17  * named by tablespace OIDs and point to the actual tablespace directories.
18  * There is also a per-cluster version directory in each tablespace.
19  * Thus the full path to an arbitrary file is
20  *			$PGDATA/pg_tblspc/spcoid/PG_MAJORVER_CATVER/dboid/relfilenode
21  * e.g.
22  *			$PGDATA/pg_tblspc/20981/PG_9.0_201002161/719849/83292814
23  *
24  * There are two tablespaces created at initdb time: pg_global (for shared
25  * tables) and pg_default (for everything else).  For backwards compatibility
26  * and to remain functional on platforms without symlinks, these tablespaces
27  * are accessed specially: they are respectively
28  *			$PGDATA/global/relfilenode
29  *			$PGDATA/base/dboid/relfilenode
30  *
31  * To allow CREATE DATABASE to give a new database a default tablespace
32  * that's different from the template database's default, we make the
33  * provision that a zero in pg_class.reltablespace means the database's
34  * default tablespace.  Without this, CREATE DATABASE would have to go in
35  * and munge the system catalogs of the new database.
36  *
37  *
38  * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
39  * Portions Copyright (c) 1994, Regents of the University of California
40  *
41  *
42  * IDENTIFICATION
43  *	  src/backend/commands/tablespace.c
44  *
45  *-------------------------------------------------------------------------
46  */
47 #include "postgres.h"
48 
49 #include <unistd.h>
50 #include <dirent.h>
51 #include <sys/stat.h>
52 
53 #include "access/heapam.h"
54 #include "access/reloptions.h"
55 #include "access/htup_details.h"
56 #include "access/sysattr.h"
57 #include "access/xact.h"
58 #include "access/xlog.h"
59 #include "access/xloginsert.h"
60 #include "catalog/catalog.h"
61 #include "catalog/dependency.h"
62 #include "catalog/indexing.h"
63 #include "catalog/namespace.h"
64 #include "catalog/objectaccess.h"
65 #include "catalog/pg_namespace.h"
66 #include "catalog/pg_tablespace.h"
67 #include "commands/comment.h"
68 #include "commands/seclabel.h"
69 #include "commands/tablecmds.h"
70 #include "commands/tablespace.h"
71 #include "miscadmin.h"
72 #include "postmaster/bgwriter.h"
73 #include "storage/fd.h"
74 #include "storage/lmgr.h"
75 #include "storage/standby.h"
76 #include "utils/acl.h"
77 #include "utils/builtins.h"
78 #include "utils/fmgroids.h"
79 #include "utils/guc.h"
80 #include "utils/lsyscache.h"
81 #include "utils/memutils.h"
82 #include "utils/rel.h"
83 #include "utils/tqual.h"
84 #include "utils/varlena.h"
85 
86 
/*
 * GUC variables
 *
 * Both are plain C strings that start out as NULL here.
 */
char	   *default_tablespace = NULL;
char	   *temp_tablespaces = NULL;


/* Forward declarations of the file-local helpers defined further down */
static void create_tablespace_directories(const char *location,
							  const Oid tablespaceoid);
static bool destroy_tablespace_directories(Oid tablespaceoid, bool redo);
95 
96 
97 /*
98  * Each database using a table space is isolated into its own name space
99  * by a subdirectory named for the database OID.  On first creation of an
100  * object in the tablespace, create the subdirectory.  If the subdirectory
101  * already exists, fall through quietly.
102  *
103  * isRedo indicates that we are creating an object during WAL replay.
104  * In this case we will cope with the possibility of the tablespace
105  * directory not being there either --- this could happen if we are
106  * replaying an operation on a table in a subsequently-dropped tablespace.
107  * We handle this by making a directory in the place where the tablespace
108  * symlink would normally be.  This isn't an exact replay of course, but
109  * it's the best we can do given the available information.
110  *
111  * If tablespaces are not supported, we still need it in case we have to
112  * re-create a database subdirectory (of $PGDATA/base) during WAL replay.
113  */
114 void
TablespaceCreateDbspace(Oid spcNode,Oid dbNode,bool isRedo)115 TablespaceCreateDbspace(Oid spcNode, Oid dbNode, bool isRedo)
116 {
117 	struct stat st;
118 	char	   *dir;
119 
120 	/*
121 	 * The global tablespace doesn't have per-database subdirectories, so
122 	 * nothing to do for it.
123 	 */
124 	if (spcNode == GLOBALTABLESPACE_OID)
125 		return;
126 
127 	Assert(OidIsValid(spcNode));
128 	Assert(OidIsValid(dbNode));
129 
130 	dir = GetDatabasePath(dbNode, spcNode);
131 
132 	if (stat(dir, &st) < 0)
133 	{
134 		/* Directory does not exist? */
135 		if (errno == ENOENT)
136 		{
137 			/*
138 			 * Acquire TablespaceCreateLock to ensure that no DROP TABLESPACE
139 			 * or TablespaceCreateDbspace is running concurrently.
140 			 */
141 			LWLockAcquire(TablespaceCreateLock, LW_EXCLUSIVE);
142 
143 			/*
144 			 * Recheck to see if someone created the directory while we were
145 			 * waiting for lock.
146 			 */
147 			if (stat(dir, &st) == 0 && S_ISDIR(st.st_mode))
148 			{
149 				/* Directory was created */
150 			}
151 			else
152 			{
153 				/* Directory creation failed? */
154 				if (mkdir(dir, S_IRWXU) < 0)
155 				{
156 					char	   *parentdir;
157 
158 					/* Failure other than not exists or not in WAL replay? */
159 					if (errno != ENOENT || !isRedo)
160 						ereport(ERROR,
161 								(errcode_for_file_access(),
162 								 errmsg("could not create directory \"%s\": %m",
163 										dir)));
164 
165 					/*
166 					 * Parent directories are missing during WAL replay, so
167 					 * continue by creating simple parent directories rather
168 					 * than a symlink.
169 					 */
170 
171 					/* create two parents up if not exist */
172 					parentdir = pstrdup(dir);
173 					get_parent_directory(parentdir);
174 					get_parent_directory(parentdir);
175 					/* Can't create parent and it doesn't already exist? */
176 					if (mkdir(parentdir, S_IRWXU) < 0 && errno != EEXIST)
177 						ereport(ERROR,
178 								(errcode_for_file_access(),
179 								 errmsg("could not create directory \"%s\": %m",
180 										parentdir)));
181 					pfree(parentdir);
182 
183 					/* create one parent up if not exist */
184 					parentdir = pstrdup(dir);
185 					get_parent_directory(parentdir);
186 					/* Can't create parent and it doesn't already exist? */
187 					if (mkdir(parentdir, S_IRWXU) < 0 && errno != EEXIST)
188 						ereport(ERROR,
189 								(errcode_for_file_access(),
190 								 errmsg("could not create directory \"%s\": %m",
191 										parentdir)));
192 					pfree(parentdir);
193 
194 					/* Create database directory */
195 					if (mkdir(dir, S_IRWXU) < 0)
196 						ereport(ERROR,
197 								(errcode_for_file_access(),
198 								 errmsg("could not create directory \"%s\": %m",
199 										dir)));
200 				}
201 			}
202 
203 			LWLockRelease(TablespaceCreateLock);
204 		}
205 		else
206 		{
207 			ereport(ERROR,
208 					(errcode_for_file_access(),
209 					 errmsg("could not stat directory \"%s\": %m", dir)));
210 		}
211 	}
212 	else
213 	{
214 		/* Is it not a directory? */
215 		if (!S_ISDIR(st.st_mode))
216 			ereport(ERROR,
217 					(errcode(ERRCODE_WRONG_OBJECT_TYPE),
218 					 errmsg("\"%s\" exists but is not a directory",
219 							dir)));
220 	}
221 
222 	pfree(dir);
223 }
224 
225 /*
226  * Create a table space
227  *
228  * Only superusers can create a tablespace. This seems a reasonable restriction
229  * since we're determining the system layout and, anyway, we probably have
230  * root if we're doing this kind of activity
231  */
232 Oid
CreateTableSpace(CreateTableSpaceStmt * stmt)233 CreateTableSpace(CreateTableSpaceStmt *stmt)
234 {
235 #ifdef HAVE_SYMLINK
236 	Relation	rel;
237 	Datum		values[Natts_pg_tablespace];
238 	bool		nulls[Natts_pg_tablespace];
239 	HeapTuple	tuple;
240 	Oid			tablespaceoid;
241 	char	   *location;
242 	Oid			ownerId;
243 	Datum		newOptions;
244 
245 	/* Must be super user */
246 	if (!superuser())
247 		ereport(ERROR,
248 				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
249 				 errmsg("permission denied to create tablespace \"%s\"",
250 						stmt->tablespacename),
251 				 errhint("Must be superuser to create a tablespace.")));
252 
253 	/* However, the eventual owner of the tablespace need not be */
254 	if (stmt->owner)
255 		ownerId = get_rolespec_oid(stmt->owner, false);
256 	else
257 		ownerId = GetUserId();
258 
259 	/* Unix-ify the offered path, and strip any trailing slashes */
260 	location = pstrdup(stmt->location);
261 	canonicalize_path(location);
262 
263 	/* disallow quotes, else CREATE DATABASE would be at risk */
264 	if (strchr(location, '\''))
265 		ereport(ERROR,
266 				(errcode(ERRCODE_INVALID_NAME),
267 				 errmsg("tablespace location cannot contain single quotes")));
268 
269 	/*
270 	 * Allowing relative paths seems risky
271 	 *
272 	 * this also helps us ensure that location is not empty or whitespace
273 	 */
274 	if (!is_absolute_path(location))
275 		ereport(ERROR,
276 				(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
277 				 errmsg("tablespace location must be an absolute path")));
278 
279 	/*
280 	 * Check that location isn't too long. Remember that we're going to append
281 	 * 'PG_XXX/<dboid>/<relid>_<fork>.<nnn>'.  FYI, we never actually
282 	 * reference the whole path here, but mkdir() uses the first two parts.
283 	 */
284 	if (strlen(location) + 1 + strlen(TABLESPACE_VERSION_DIRECTORY) + 1 +
285 		OIDCHARS + 1 + OIDCHARS + 1 + FORKNAMECHARS + 1 + OIDCHARS > MAXPGPATH)
286 		ereport(ERROR,
287 				(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
288 				 errmsg("tablespace location \"%s\" is too long",
289 						location)));
290 
291 	/* Warn if the tablespace is in the data directory. */
292 	if (path_is_prefix_of_path(DataDir, location))
293 		ereport(WARNING,
294 				(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
295 				 errmsg("tablespace location should not be inside the data directory")));
296 
297 	/*
298 	 * Disallow creation of tablespaces named "pg_xxx"; we reserve this
299 	 * namespace for system purposes.
300 	 */
301 	if (!allowSystemTableMods && IsReservedName(stmt->tablespacename))
302 		ereport(ERROR,
303 				(errcode(ERRCODE_RESERVED_NAME),
304 				 errmsg("unacceptable tablespace name \"%s\"",
305 						stmt->tablespacename),
306 				 errdetail("The prefix \"pg_\" is reserved for system tablespaces.")));
307 
308 	/*
309 	 * Check that there is no other tablespace by this name.  (The unique
310 	 * index would catch this anyway, but might as well give a friendlier
311 	 * message.)
312 	 */
313 	if (OidIsValid(get_tablespace_oid(stmt->tablespacename, true)))
314 		ereport(ERROR,
315 				(errcode(ERRCODE_DUPLICATE_OBJECT),
316 				 errmsg("tablespace \"%s\" already exists",
317 						stmt->tablespacename)));
318 
319 	/*
320 	 * Insert tuple into pg_tablespace.  The purpose of doing this first is to
321 	 * lock the proposed tablename against other would-be creators. The
322 	 * insertion will roll back if we find problems below.
323 	 */
324 	rel = heap_open(TableSpaceRelationId, RowExclusiveLock);
325 
326 	MemSet(nulls, false, sizeof(nulls));
327 
328 	values[Anum_pg_tablespace_spcname - 1] =
329 		DirectFunctionCall1(namein, CStringGetDatum(stmt->tablespacename));
330 	values[Anum_pg_tablespace_spcowner - 1] =
331 		ObjectIdGetDatum(ownerId);
332 	nulls[Anum_pg_tablespace_spcacl - 1] = true;
333 
334 	/* Generate new proposed spcoptions (text array) */
335 	newOptions = transformRelOptions((Datum) 0,
336 									 stmt->options,
337 									 NULL, NULL, false, false);
338 	(void) tablespace_reloptions(newOptions, true);
339 	if (newOptions != (Datum) 0)
340 		values[Anum_pg_tablespace_spcoptions - 1] = newOptions;
341 	else
342 		nulls[Anum_pg_tablespace_spcoptions - 1] = true;
343 
344 	tuple = heap_form_tuple(rel->rd_att, values, nulls);
345 
346 	tablespaceoid = CatalogTupleInsert(rel, tuple);
347 
348 	heap_freetuple(tuple);
349 
350 	/* Record dependency on owner */
351 	recordDependencyOnOwner(TableSpaceRelationId, tablespaceoid, ownerId);
352 
353 	/* Post creation hook for new tablespace */
354 	InvokeObjectPostCreateHook(TableSpaceRelationId, tablespaceoid, 0);
355 
356 	create_tablespace_directories(location, tablespaceoid);
357 
358 	/* Record the filesystem change in XLOG */
359 	{
360 		xl_tblspc_create_rec xlrec;
361 
362 		xlrec.ts_id = tablespaceoid;
363 
364 		XLogBeginInsert();
365 		XLogRegisterData((char *) &xlrec,
366 						 offsetof(xl_tblspc_create_rec, ts_path));
367 		XLogRegisterData((char *) location, strlen(location) + 1);
368 
369 		(void) XLogInsert(RM_TBLSPC_ID, XLOG_TBLSPC_CREATE);
370 	}
371 
372 	/*
373 	 * Force synchronous commit, to minimize the window between creating the
374 	 * symlink on-disk and marking the transaction committed.  It's not great
375 	 * that there is any window at all, but definitely we don't want to make
376 	 * it larger than necessary.
377 	 */
378 	ForceSyncCommit();
379 
380 	pfree(location);
381 
382 	/* We keep the lock on pg_tablespace until commit */
383 	heap_close(rel, NoLock);
384 
385 	return tablespaceoid;
386 #else							/* !HAVE_SYMLINK */
387 	ereport(ERROR,
388 			(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
389 			 errmsg("tablespaces are not supported on this platform")));
390 	return InvalidOid;			/* keep compiler quiet */
391 #endif							/* HAVE_SYMLINK */
392 }
393 
394 /*
395  * Drop a table space
396  *
397  * Be careful to check that the tablespace is empty.
398  */
399 void
DropTableSpace(DropTableSpaceStmt * stmt)400 DropTableSpace(DropTableSpaceStmt *stmt)
401 {
402 #ifdef HAVE_SYMLINK
403 	char	   *tablespacename = stmt->tablespacename;
404 	HeapScanDesc scandesc;
405 	Relation	rel;
406 	HeapTuple	tuple;
407 	ScanKeyData entry[1];
408 	Oid			tablespaceoid;
409 
410 	/*
411 	 * Find the target tuple
412 	 */
413 	rel = heap_open(TableSpaceRelationId, RowExclusiveLock);
414 
415 	ScanKeyInit(&entry[0],
416 				Anum_pg_tablespace_spcname,
417 				BTEqualStrategyNumber, F_NAMEEQ,
418 				CStringGetDatum(tablespacename));
419 	scandesc = heap_beginscan_catalog(rel, 1, entry);
420 	tuple = heap_getnext(scandesc, ForwardScanDirection);
421 
422 	if (!HeapTupleIsValid(tuple))
423 	{
424 		if (!stmt->missing_ok)
425 		{
426 			ereport(ERROR,
427 					(errcode(ERRCODE_UNDEFINED_OBJECT),
428 					 errmsg("tablespace \"%s\" does not exist",
429 							tablespacename)));
430 		}
431 		else
432 		{
433 			ereport(NOTICE,
434 					(errmsg("tablespace \"%s\" does not exist, skipping",
435 							tablespacename)));
436 			/* XXX I assume I need one or both of these next two calls */
437 			heap_endscan(scandesc);
438 			heap_close(rel, NoLock);
439 		}
440 		return;
441 	}
442 
443 	tablespaceoid = HeapTupleGetOid(tuple);
444 
445 	/* Must be tablespace owner */
446 	if (!pg_tablespace_ownercheck(tablespaceoid, GetUserId()))
447 		aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_TABLESPACE,
448 					   tablespacename);
449 
450 	/* Disallow drop of the standard tablespaces, even by superuser */
451 	if (tablespaceoid == GLOBALTABLESPACE_OID ||
452 		tablespaceoid == DEFAULTTABLESPACE_OID)
453 		aclcheck_error(ACLCHECK_NO_PRIV, ACL_KIND_TABLESPACE,
454 					   tablespacename);
455 
456 	/* DROP hook for the tablespace being removed */
457 	InvokeObjectDropHook(TableSpaceRelationId, tablespaceoid, 0);
458 
459 	/*
460 	 * Remove the pg_tablespace tuple (this will roll back if we fail below)
461 	 */
462 	CatalogTupleDelete(rel, &tuple->t_self);
463 
464 	heap_endscan(scandesc);
465 
466 	/*
467 	 * Remove any comments or security labels on this tablespace.
468 	 */
469 	DeleteSharedComments(tablespaceoid, TableSpaceRelationId);
470 	DeleteSharedSecurityLabel(tablespaceoid, TableSpaceRelationId);
471 
472 	/*
473 	 * Remove dependency on owner.
474 	 */
475 	deleteSharedDependencyRecordsFor(TableSpaceRelationId, tablespaceoid, 0);
476 
477 	/*
478 	 * Acquire TablespaceCreateLock to ensure that no TablespaceCreateDbspace
479 	 * is running concurrently.
480 	 */
481 	LWLockAcquire(TablespaceCreateLock, LW_EXCLUSIVE);
482 
483 	/*
484 	 * Try to remove the physical infrastructure.
485 	 */
486 	if (!destroy_tablespace_directories(tablespaceoid, false))
487 	{
488 		/*
489 		 * Not all files deleted?  However, there can be lingering empty files
490 		 * in the directories, left behind by for example DROP TABLE, that
491 		 * have been scheduled for deletion at next checkpoint (see comments
492 		 * in mdunlink() for details).  We could just delete them immediately,
493 		 * but we can't tell them apart from important data files that we
494 		 * mustn't delete.  So instead, we force a checkpoint which will clean
495 		 * out any lingering files, and try again.
496 		 *
497 		 * XXX On Windows, an unlinked file persists in the directory listing
498 		 * until no process retains an open handle for the file.  The DDL
499 		 * commands that schedule files for unlink send invalidation messages
500 		 * directing other PostgreSQL processes to close the files.  DROP
501 		 * TABLESPACE should not give up on the tablespace becoming empty
502 		 * until all relevant invalidation processing is complete.
503 		 */
504 		RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_FORCE | CHECKPOINT_WAIT);
505 		if (!destroy_tablespace_directories(tablespaceoid, false))
506 		{
507 			/* Still not empty, the files must be important then */
508 			ereport(ERROR,
509 					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
510 					 errmsg("tablespace \"%s\" is not empty",
511 							tablespacename)));
512 		}
513 	}
514 
515 	/* Record the filesystem change in XLOG */
516 	{
517 		xl_tblspc_drop_rec xlrec;
518 
519 		xlrec.ts_id = tablespaceoid;
520 
521 		XLogBeginInsert();
522 		XLogRegisterData((char *) &xlrec, sizeof(xl_tblspc_drop_rec));
523 
524 		(void) XLogInsert(RM_TBLSPC_ID, XLOG_TBLSPC_DROP);
525 	}
526 
527 	/*
528 	 * Note: because we checked that the tablespace was empty, there should be
529 	 * no need to worry about flushing shared buffers or free space map
530 	 * entries for relations in the tablespace.
531 	 */
532 
533 	/*
534 	 * Force synchronous commit, to minimize the window between removing the
535 	 * files on-disk and marking the transaction committed.  It's not great
536 	 * that there is any window at all, but definitely we don't want to make
537 	 * it larger than necessary.
538 	 */
539 	ForceSyncCommit();
540 
541 	/*
542 	 * Allow TablespaceCreateDbspace again.
543 	 */
544 	LWLockRelease(TablespaceCreateLock);
545 
546 	/* We keep the lock on pg_tablespace until commit */
547 	heap_close(rel, NoLock);
548 #else							/* !HAVE_SYMLINK */
549 	ereport(ERROR,
550 			(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
551 			 errmsg("tablespaces are not supported on this platform")));
552 #endif							/* HAVE_SYMLINK */
553 }
554 
555 
556 /*
557  * create_tablespace_directories
558  *
559  *	Attempt to create filesystem infrastructure linking $PGDATA/pg_tblspc/
560  *	to the specified directory
561  */
562 static void
create_tablespace_directories(const char * location,const Oid tablespaceoid)563 create_tablespace_directories(const char *location, const Oid tablespaceoid)
564 {
565 	char	   *linkloc;
566 	char	   *location_with_version_dir;
567 	struct stat st;
568 
569 	linkloc = psprintf("pg_tblspc/%u", tablespaceoid);
570 	location_with_version_dir = psprintf("%s/%s", location,
571 										 TABLESPACE_VERSION_DIRECTORY);
572 
573 	/*
574 	 * Attempt to coerce target directory to safe permissions.  If this fails,
575 	 * it doesn't exist or has the wrong owner.
576 	 */
577 	if (chmod(location, S_IRWXU) != 0)
578 	{
579 		if (errno == ENOENT)
580 			ereport(ERROR,
581 					(errcode(ERRCODE_UNDEFINED_FILE),
582 					 errmsg("directory \"%s\" does not exist", location),
583 					 InRecovery ? errhint("Create this directory for the tablespace before "
584 										  "restarting the server.") : 0));
585 		else
586 			ereport(ERROR,
587 					(errcode_for_file_access(),
588 					 errmsg("could not set permissions on directory \"%s\": %m",
589 							location)));
590 	}
591 
592 	/*
593 	 * The creation of the version directory prevents more than one tablespace
594 	 * in a single location.  This imitates TablespaceCreateDbspace(), but it
595 	 * ignores concurrency and missing parent directories.  The chmod() would
596 	 * have failed in the absence of a parent.  pg_tablespace_spcname_index
597 	 * prevents concurrency.
598 	 */
599 	if (stat(location_with_version_dir, &st) < 0)
600 	{
601 		if (errno != ENOENT)
602 			ereport(ERROR,
603 					(errcode_for_file_access(),
604 					 errmsg("could not stat directory \"%s\": %m",
605 							location_with_version_dir)));
606 		else if (mkdir(location_with_version_dir, S_IRWXU) < 0)
607 			ereport(ERROR,
608 					(errcode_for_file_access(),
609 					 errmsg("could not create directory \"%s\": %m",
610 							location_with_version_dir)));
611 	}
612 	else if (!S_ISDIR(st.st_mode))
613 		ereport(ERROR,
614 				(errcode(ERRCODE_WRONG_OBJECT_TYPE),
615 				 errmsg("\"%s\" exists but is not a directory",
616 						location_with_version_dir)));
617 	else if (!InRecovery)
618 		ereport(ERROR,
619 				(errcode(ERRCODE_OBJECT_IN_USE),
620 				 errmsg("directory \"%s\" already in use as a tablespace",
621 						location_with_version_dir)));
622 
623 	/*
624 	 * In recovery, remove old symlink, in case it points to the wrong place.
625 	 */
626 	if (InRecovery)
627 		remove_tablespace_symlink(linkloc);
628 
629 	/*
630 	 * Create the symlink under PGDATA
631 	 */
632 	if (symlink(location, linkloc) < 0)
633 		ereport(ERROR,
634 				(errcode_for_file_access(),
635 				 errmsg("could not create symbolic link \"%s\": %m",
636 						linkloc)));
637 
638 	pfree(linkloc);
639 	pfree(location_with_version_dir);
640 }
641 
642 
643 /*
644  * destroy_tablespace_directories
645  *
646  * Attempt to remove filesystem infrastructure for the tablespace.
647  *
648  * 'redo' indicates we are redoing a drop from XLOG; in that case we should
649  * not throw an ERROR for problems, just LOG them.  The worst consequence of
650  * not removing files here would be failure to release some disk space, which
651  * does not justify throwing an error that would require manual intervention
652  * to get the database running again.
653  *
654  * Returns TRUE if successful, FALSE if some subdirectory is not empty
655  */
656 static bool
destroy_tablespace_directories(Oid tablespaceoid,bool redo)657 destroy_tablespace_directories(Oid tablespaceoid, bool redo)
658 {
659 	char	   *linkloc;
660 	char	   *linkloc_with_version_dir;
661 	DIR		   *dirdesc;
662 	struct dirent *de;
663 	char	   *subfile;
664 	struct stat st;
665 
666 	linkloc_with_version_dir = psprintf("pg_tblspc/%u/%s", tablespaceoid,
667 										TABLESPACE_VERSION_DIRECTORY);
668 
669 	/*
670 	 * Check if the tablespace still contains any files.  We try to rmdir each
671 	 * per-database directory we find in it.  rmdir failure implies there are
672 	 * still files in that subdirectory, so give up.  (We do not have to worry
673 	 * about undoing any already completed rmdirs, since the next attempt to
674 	 * use the tablespace from that database will simply recreate the
675 	 * subdirectory via TablespaceCreateDbspace.)
676 	 *
677 	 * Since we hold TablespaceCreateLock, no one else should be creating any
678 	 * fresh subdirectories in parallel. It is possible that new files are
679 	 * being created within subdirectories, though, so the rmdir call could
680 	 * fail.  Worst consequence is a less friendly error message.
681 	 *
682 	 * If redo is true then ENOENT is a likely outcome here, and we allow it
683 	 * to pass without comment.  In normal operation we still allow it, but
684 	 * with a warning.  This is because even though ProcessUtility disallows
685 	 * DROP TABLESPACE in a transaction block, it's possible that a previous
686 	 * DROP failed and rolled back after removing the tablespace directories
687 	 * and/or symlink.  We want to allow a new DROP attempt to succeed at
688 	 * removing the catalog entries (and symlink if still present), so we
689 	 * should not give a hard error here.
690 	 */
691 	dirdesc = AllocateDir(linkloc_with_version_dir);
692 	if (dirdesc == NULL)
693 	{
694 		if (errno == ENOENT)
695 		{
696 			if (!redo)
697 				ereport(WARNING,
698 						(errcode_for_file_access(),
699 						 errmsg("could not open directory \"%s\": %m",
700 								linkloc_with_version_dir)));
701 			/* The symlink might still exist, so go try to remove it */
702 			goto remove_symlink;
703 		}
704 		else if (redo)
705 		{
706 			/* in redo, just log other types of error */
707 			ereport(LOG,
708 					(errcode_for_file_access(),
709 					 errmsg("could not open directory \"%s\": %m",
710 							linkloc_with_version_dir)));
711 			pfree(linkloc_with_version_dir);
712 			return false;
713 		}
714 		/* else let ReadDir report the error */
715 	}
716 
717 	while ((de = ReadDir(dirdesc, linkloc_with_version_dir)) != NULL)
718 	{
719 		if (strcmp(de->d_name, ".") == 0 ||
720 			strcmp(de->d_name, "..") == 0)
721 			continue;
722 
723 		subfile = psprintf("%s/%s", linkloc_with_version_dir, de->d_name);
724 
725 		/* This check is just to deliver a friendlier error message */
726 		if (!redo && !directory_is_empty(subfile))
727 		{
728 			FreeDir(dirdesc);
729 			pfree(subfile);
730 			pfree(linkloc_with_version_dir);
731 			return false;
732 		}
733 
734 		/* remove empty directory */
735 		if (rmdir(subfile) < 0)
736 			ereport(redo ? LOG : ERROR,
737 					(errcode_for_file_access(),
738 					 errmsg("could not remove directory \"%s\": %m",
739 							subfile)));
740 
741 		pfree(subfile);
742 	}
743 
744 	FreeDir(dirdesc);
745 
746 	/* remove version directory */
747 	if (rmdir(linkloc_with_version_dir) < 0)
748 	{
749 		ereport(redo ? LOG : ERROR,
750 				(errcode_for_file_access(),
751 				 errmsg("could not remove directory \"%s\": %m",
752 						linkloc_with_version_dir)));
753 		pfree(linkloc_with_version_dir);
754 		return false;
755 	}
756 
757 	/*
758 	 * Try to remove the symlink.  We must however deal with the possibility
759 	 * that it's a directory instead of a symlink --- this could happen during
760 	 * WAL replay (see TablespaceCreateDbspace), and it is also the case on
761 	 * Windows where junction points lstat() as directories.
762 	 *
763 	 * Note: in the redo case, we'll return true if this final step fails;
764 	 * there's no point in retrying it.  Also, ENOENT should provoke no more
765 	 * than a warning.
766 	 */
767 remove_symlink:
768 	linkloc = pstrdup(linkloc_with_version_dir);
769 	get_parent_directory(linkloc);
770 	if (lstat(linkloc, &st) < 0)
771 	{
772 		int			saved_errno = errno;
773 
774 		ereport(redo ? LOG : (saved_errno == ENOENT ? WARNING : ERROR),
775 				(errcode_for_file_access(),
776 				 errmsg("could not stat file \"%s\": %m",
777 						linkloc)));
778 	}
779 	else if (S_ISDIR(st.st_mode))
780 	{
781 		if (rmdir(linkloc) < 0)
782 		{
783 			int			saved_errno = errno;
784 
785 			ereport(redo ? LOG : (saved_errno == ENOENT ? WARNING : ERROR),
786 					(errcode_for_file_access(),
787 					 errmsg("could not remove directory \"%s\": %m",
788 							linkloc)));
789 		}
790 	}
791 #ifdef S_ISLNK
792 	else if (S_ISLNK(st.st_mode))
793 	{
794 		if (unlink(linkloc) < 0)
795 		{
796 			int			saved_errno = errno;
797 
798 			ereport(redo ? LOG : (saved_errno == ENOENT ? WARNING : ERROR),
799 					(errcode_for_file_access(),
800 					 errmsg("could not remove symbolic link \"%s\": %m",
801 							linkloc)));
802 		}
803 	}
804 #endif
805 	else
806 	{
807 		/* Refuse to remove anything that's not a directory or symlink */
808 		ereport(redo ? LOG : ERROR,
809 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
810 				 errmsg("\"%s\" is not a directory or symbolic link",
811 						linkloc)));
812 	}
813 
814 	pfree(linkloc_with_version_dir);
815 	pfree(linkloc);
816 
817 	return true;
818 }
819 
820 
/*
 * directory_is_empty
 *
 * Report whether the directory at 'path' has no entries other than "." and
 * "..".  Scanning stops at the first real entry found.
 *
 * This probably belongs somewhere else, but it is not clear where...
 */
bool
directory_is_empty(const char *path)
{
	DIR		   *dir = AllocateDir(path);
	struct dirent *entry;
	bool		empty = true;

	/* Stop reading as soon as one real entry has been seen */
	while (empty && (entry = ReadDir(dir, path)) != NULL)
	{
		if (strcmp(entry->d_name, ".") != 0 &&
			strcmp(entry->d_name, "..") != 0)
			empty = false;
	}

	FreeDir(dir);
	return empty;
}
846 
847 /*
848  *	remove_tablespace_symlink
849  *
850  * This function removes symlinks in pg_tblspc.  On Windows, junction points
851  * act like directories so we must be able to apply rmdir.  This function
852  * works like the symlink removal code in destroy_tablespace_directories,
853  * except that failure to remove is always an ERROR.  But if the file doesn't
854  * exist at all, that's OK.
855  */
856 void
remove_tablespace_symlink(const char * linkloc)857 remove_tablespace_symlink(const char *linkloc)
858 {
859 	struct stat st;
860 
861 	if (lstat(linkloc, &st) < 0)
862 	{
863 		if (errno == ENOENT)
864 			return;
865 		ereport(ERROR,
866 				(errcode_for_file_access(),
867 				 errmsg("could not stat file \"%s\": %m", linkloc)));
868 	}
869 
870 	if (S_ISDIR(st.st_mode))
871 	{
872 		/*
873 		 * This will fail if the directory isn't empty, but not if it's a
874 		 * junction point.
875 		 */
876 		if (rmdir(linkloc) < 0 && errno != ENOENT)
877 			ereport(ERROR,
878 					(errcode_for_file_access(),
879 					 errmsg("could not remove directory \"%s\": %m",
880 							linkloc)));
881 	}
882 #ifdef S_ISLNK
883 	else if (S_ISLNK(st.st_mode))
884 	{
885 		if (unlink(linkloc) < 0 && errno != ENOENT)
886 			ereport(ERROR,
887 					(errcode_for_file_access(),
888 					 errmsg("could not remove symbolic link \"%s\": %m",
889 							linkloc)));
890 	}
891 #endif
892 	else
893 	{
894 		/* Refuse to remove anything that's not a directory or symlink */
895 		ereport(ERROR,
896 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
897 				 errmsg("\"%s\" is not a directory or symbolic link",
898 						linkloc)));
899 	}
900 }
901 
902 /*
903  * Rename a tablespace
904  */
905 ObjectAddress
RenameTableSpace(const char * oldname,const char * newname)906 RenameTableSpace(const char *oldname, const char *newname)
907 {
908 	Oid			tspId;
909 	Relation	rel;
910 	ScanKeyData entry[1];
911 	HeapScanDesc scan;
912 	HeapTuple	tup;
913 	HeapTuple	newtuple;
914 	Form_pg_tablespace newform;
915 	ObjectAddress address;
916 
917 	/* Search pg_tablespace */
918 	rel = heap_open(TableSpaceRelationId, RowExclusiveLock);
919 
920 	ScanKeyInit(&entry[0],
921 				Anum_pg_tablespace_spcname,
922 				BTEqualStrategyNumber, F_NAMEEQ,
923 				CStringGetDatum(oldname));
924 	scan = heap_beginscan_catalog(rel, 1, entry);
925 	tup = heap_getnext(scan, ForwardScanDirection);
926 	if (!HeapTupleIsValid(tup))
927 		ereport(ERROR,
928 				(errcode(ERRCODE_UNDEFINED_OBJECT),
929 				 errmsg("tablespace \"%s\" does not exist",
930 						oldname)));
931 
932 	tspId = HeapTupleGetOid(tup);
933 	newtuple = heap_copytuple(tup);
934 	newform = (Form_pg_tablespace) GETSTRUCT(newtuple);
935 
936 	heap_endscan(scan);
937 
938 	/* Must be owner */
939 	if (!pg_tablespace_ownercheck(HeapTupleGetOid(newtuple), GetUserId()))
940 		aclcheck_error(ACLCHECK_NO_PRIV, ACL_KIND_TABLESPACE, oldname);
941 
942 	/* Validate new name */
943 	if (!allowSystemTableMods && IsReservedName(newname))
944 		ereport(ERROR,
945 				(errcode(ERRCODE_RESERVED_NAME),
946 				 errmsg("unacceptable tablespace name \"%s\"", newname),
947 				 errdetail("The prefix \"pg_\" is reserved for system tablespaces.")));
948 
949 	/* Make sure the new name doesn't exist */
950 	ScanKeyInit(&entry[0],
951 				Anum_pg_tablespace_spcname,
952 				BTEqualStrategyNumber, F_NAMEEQ,
953 				CStringGetDatum(newname));
954 	scan = heap_beginscan_catalog(rel, 1, entry);
955 	tup = heap_getnext(scan, ForwardScanDirection);
956 	if (HeapTupleIsValid(tup))
957 		ereport(ERROR,
958 				(errcode(ERRCODE_DUPLICATE_OBJECT),
959 				 errmsg("tablespace \"%s\" already exists",
960 						newname)));
961 
962 	heap_endscan(scan);
963 
964 	/* OK, update the entry */
965 	namestrcpy(&(newform->spcname), newname);
966 
967 	CatalogTupleUpdate(rel, &newtuple->t_self, newtuple);
968 
969 	InvokeObjectPostAlterHook(TableSpaceRelationId, tspId, 0);
970 
971 	ObjectAddressSet(address, TableSpaceRelationId, tspId);
972 
973 	heap_close(rel, NoLock);
974 
975 	return address;
976 }
977 
978 /*
979  * Alter table space options
980  */
981 Oid
AlterTableSpaceOptions(AlterTableSpaceOptionsStmt * stmt)982 AlterTableSpaceOptions(AlterTableSpaceOptionsStmt *stmt)
983 {
984 	Relation	rel;
985 	ScanKeyData entry[1];
986 	HeapScanDesc scandesc;
987 	HeapTuple	tup;
988 	Oid			tablespaceoid;
989 	Datum		datum;
990 	Datum		newOptions;
991 	Datum		repl_val[Natts_pg_tablespace];
992 	bool		isnull;
993 	bool		repl_null[Natts_pg_tablespace];
994 	bool		repl_repl[Natts_pg_tablespace];
995 	HeapTuple	newtuple;
996 
997 	/* Search pg_tablespace */
998 	rel = heap_open(TableSpaceRelationId, RowExclusiveLock);
999 
1000 	ScanKeyInit(&entry[0],
1001 				Anum_pg_tablespace_spcname,
1002 				BTEqualStrategyNumber, F_NAMEEQ,
1003 				CStringGetDatum(stmt->tablespacename));
1004 	scandesc = heap_beginscan_catalog(rel, 1, entry);
1005 	tup = heap_getnext(scandesc, ForwardScanDirection);
1006 	if (!HeapTupleIsValid(tup))
1007 		ereport(ERROR,
1008 				(errcode(ERRCODE_UNDEFINED_OBJECT),
1009 				 errmsg("tablespace \"%s\" does not exist",
1010 						stmt->tablespacename)));
1011 
1012 	tablespaceoid = HeapTupleGetOid(tup);
1013 
1014 	/* Must be owner of the existing object */
1015 	if (!pg_tablespace_ownercheck(HeapTupleGetOid(tup), GetUserId()))
1016 		aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_TABLESPACE,
1017 					   stmt->tablespacename);
1018 
1019 	/* Generate new proposed spcoptions (text array) */
1020 	datum = heap_getattr(tup, Anum_pg_tablespace_spcoptions,
1021 						 RelationGetDescr(rel), &isnull);
1022 	newOptions = transformRelOptions(isnull ? (Datum) 0 : datum,
1023 									 stmt->options, NULL, NULL, false,
1024 									 stmt->isReset);
1025 	(void) tablespace_reloptions(newOptions, true);
1026 
1027 	/* Build new tuple. */
1028 	memset(repl_null, false, sizeof(repl_null));
1029 	memset(repl_repl, false, sizeof(repl_repl));
1030 	if (newOptions != (Datum) 0)
1031 		repl_val[Anum_pg_tablespace_spcoptions - 1] = newOptions;
1032 	else
1033 		repl_null[Anum_pg_tablespace_spcoptions - 1] = true;
1034 	repl_repl[Anum_pg_tablespace_spcoptions - 1] = true;
1035 	newtuple = heap_modify_tuple(tup, RelationGetDescr(rel), repl_val,
1036 								 repl_null, repl_repl);
1037 
1038 	/* Update system catalog. */
1039 	CatalogTupleUpdate(rel, &newtuple->t_self, newtuple);
1040 
1041 	InvokeObjectPostAlterHook(TableSpaceRelationId, HeapTupleGetOid(tup), 0);
1042 
1043 	heap_freetuple(newtuple);
1044 
1045 	/* Conclude heap scan. */
1046 	heap_endscan(scandesc);
1047 	heap_close(rel, NoLock);
1048 
1049 	return tablespaceoid;
1050 }
1051 
1052 /*
1053  * Routines for handling the GUC variable 'default_tablespace'.
1054  */
1055 
1056 /* check_hook: validate new default_tablespace */
1057 bool
check_default_tablespace(char ** newval,void ** extra,GucSource source)1058 check_default_tablespace(char **newval, void **extra, GucSource source)
1059 {
1060 	/*
1061 	 * If we aren't inside a transaction, or connected to a database, we
1062 	 * cannot do the catalog accesses necessary to verify the name.  Must
1063 	 * accept the value on faith.
1064 	 */
1065 	if (IsTransactionState() && MyDatabaseId != InvalidOid)
1066 	{
1067 		if (**newval != '\0' &&
1068 			!OidIsValid(get_tablespace_oid(*newval, true)))
1069 		{
1070 			/*
1071 			 * When source == PGC_S_TEST, don't throw a hard error for a
1072 			 * nonexistent tablespace, only a NOTICE.  See comments in guc.h.
1073 			 */
1074 			if (source == PGC_S_TEST)
1075 			{
1076 				ereport(NOTICE,
1077 						(errcode(ERRCODE_UNDEFINED_OBJECT),
1078 						 errmsg("tablespace \"%s\" does not exist",
1079 								*newval)));
1080 			}
1081 			else
1082 			{
1083 				GUC_check_errdetail("Tablespace \"%s\" does not exist.",
1084 									*newval);
1085 				return false;
1086 			}
1087 		}
1088 	}
1089 
1090 	return true;
1091 }
1092 
1093 /*
1094  * GetDefaultTablespace -- get the OID of the current default tablespace
1095  *
1096  * Temporary objects have different default tablespaces, hence the
1097  * relpersistence parameter must be specified.
1098  *
1099  * May return InvalidOid to indicate "use the database's default tablespace".
1100  *
1101  * Note that caller is expected to check appropriate permissions for any
1102  * result other than InvalidOid.
1103  *
1104  * This exists to hide (and possibly optimize the use of) the
1105  * default_tablespace GUC variable.
1106  */
1107 Oid
GetDefaultTablespace(char relpersistence)1108 GetDefaultTablespace(char relpersistence)
1109 {
1110 	Oid			result;
1111 
1112 	/* The temp-table case is handled elsewhere */
1113 	if (relpersistence == RELPERSISTENCE_TEMP)
1114 	{
1115 		PrepareTempTablespaces();
1116 		return GetNextTempTableSpace();
1117 	}
1118 
1119 	/* Fast path for default_tablespace == "" */
1120 	if (default_tablespace == NULL || default_tablespace[0] == '\0')
1121 		return InvalidOid;
1122 
1123 	/*
1124 	 * It is tempting to cache this lookup for more speed, but then we would
1125 	 * fail to detect the case where the tablespace was dropped since the GUC
1126 	 * variable was set.  Note also that we don't complain if the value fails
1127 	 * to refer to an existing tablespace; we just silently return InvalidOid,
1128 	 * causing the new object to be created in the database's tablespace.
1129 	 */
1130 	result = get_tablespace_oid(default_tablespace, true);
1131 
1132 	/*
1133 	 * Allow explicit specification of database's default tablespace in
1134 	 * default_tablespace without triggering permissions checks.
1135 	 */
1136 	if (result == MyDatabaseTableSpace)
1137 		result = InvalidOid;
1138 	return result;
1139 }
1140 
1141 
1142 /*
1143  * Routines for handling the GUC variable 'temp_tablespaces'.
1144  */
1145 
1146 typedef struct
1147 {
1148 	/* Array of OIDs to be passed to SetTempTablespaces() */
1149 	int			numSpcs;
1150 	Oid			tblSpcs[FLEXIBLE_ARRAY_MEMBER];
1151 } temp_tablespaces_extra;
1152 
1153 /* check_hook: validate new temp_tablespaces */
1154 bool
check_temp_tablespaces(char ** newval,void ** extra,GucSource source)1155 check_temp_tablespaces(char **newval, void **extra, GucSource source)
1156 {
1157 	char	   *rawname;
1158 	List	   *namelist;
1159 
1160 	/* Need a modifiable copy of string */
1161 	rawname = pstrdup(*newval);
1162 
1163 	/* Parse string into list of identifiers */
1164 	if (!SplitIdentifierString(rawname, ',', &namelist))
1165 	{
1166 		/* syntax error in name list */
1167 		GUC_check_errdetail("List syntax is invalid.");
1168 		pfree(rawname);
1169 		list_free(namelist);
1170 		return false;
1171 	}
1172 
1173 	/*
1174 	 * If we aren't inside a transaction, or connected to a database, we
1175 	 * cannot do the catalog accesses necessary to verify the name.  Must
1176 	 * accept the value on faith.
1177 	 * Fortunately, there's then also no need to pass the data to fd.c.
1178 	 */
1179 	if (IsTransactionState() && MyDatabaseId != InvalidOid)
1180 	{
1181 		temp_tablespaces_extra *myextra;
1182 		Oid		   *tblSpcs;
1183 		int			numSpcs;
1184 		ListCell   *l;
1185 
1186 		/* temporary workspace until we are done verifying the list */
1187 		tblSpcs = (Oid *) palloc(list_length(namelist) * sizeof(Oid));
1188 		numSpcs = 0;
1189 		foreach(l, namelist)
1190 		{
1191 			char	   *curname = (char *) lfirst(l);
1192 			Oid			curoid;
1193 			AclResult	aclresult;
1194 
1195 			/* Allow an empty string (signifying database default) */
1196 			if (curname[0] == '\0')
1197 			{
1198 				/* InvalidOid signifies database's default tablespace */
1199 				tblSpcs[numSpcs++] = InvalidOid;
1200 				continue;
1201 			}
1202 
1203 			/*
1204 			 * In an interactive SET command, we ereport for bad info.  When
1205 			 * source == PGC_S_TEST, don't throw a hard error for a
1206 			 * nonexistent tablespace, only a NOTICE.  See comments in guc.h.
1207 			 */
1208 			curoid = get_tablespace_oid(curname, source <= PGC_S_TEST);
1209 			if (curoid == InvalidOid)
1210 			{
1211 				if (source == PGC_S_TEST)
1212 					ereport(NOTICE,
1213 							(errcode(ERRCODE_UNDEFINED_OBJECT),
1214 							 errmsg("tablespace \"%s\" does not exist",
1215 									curname)));
1216 				continue;
1217 			}
1218 
1219 			/*
1220 			 * Allow explicit specification of database's default tablespace
1221 			 * in temp_tablespaces without triggering permissions checks.
1222 			 */
1223 			if (curoid == MyDatabaseTableSpace)
1224 			{
1225 				/* InvalidOid signifies database's default tablespace */
1226 				tblSpcs[numSpcs++] = InvalidOid;
1227 				continue;
1228 			}
1229 
1230 			/* Check permissions, similarly complaining only if interactive */
1231 			aclresult = pg_tablespace_aclcheck(curoid, GetUserId(),
1232 											   ACL_CREATE);
1233 			if (aclresult != ACLCHECK_OK)
1234 			{
1235 				if (source >= PGC_S_INTERACTIVE)
1236 					aclcheck_error(aclresult, ACL_KIND_TABLESPACE, curname);
1237 				continue;
1238 			}
1239 
1240 			tblSpcs[numSpcs++] = curoid;
1241 		}
1242 
1243 		/* Now prepare an "extra" struct for assign_temp_tablespaces */
1244 		myextra = malloc(offsetof(temp_tablespaces_extra, tblSpcs) +
1245 						 numSpcs * sizeof(Oid));
1246 		if (!myextra)
1247 			return false;
1248 		myextra->numSpcs = numSpcs;
1249 		memcpy(myextra->tblSpcs, tblSpcs, numSpcs * sizeof(Oid));
1250 		*extra = (void *) myextra;
1251 
1252 		pfree(tblSpcs);
1253 	}
1254 
1255 	pfree(rawname);
1256 	list_free(namelist);
1257 
1258 	return true;
1259 }
1260 
1261 /* assign_hook: do extra actions as needed */
1262 void
assign_temp_tablespaces(const char * newval,void * extra)1263 assign_temp_tablespaces(const char *newval, void *extra)
1264 {
1265 	temp_tablespaces_extra *myextra = (temp_tablespaces_extra *) extra;
1266 
1267 	/*
1268 	 * If check_temp_tablespaces was executed inside a transaction, then pass
1269 	 * the list it made to fd.c.  Otherwise, clear fd.c's list; we must be
1270 	 * still outside a transaction, or else restoring during transaction exit,
1271 	 * and in either case we can just let the next PrepareTempTablespaces call
1272 	 * make things sane.
1273 	 */
1274 	if (myextra)
1275 		SetTempTablespaces(myextra->tblSpcs, myextra->numSpcs);
1276 	else
1277 		SetTempTablespaces(NULL, 0);
1278 }
1279 
1280 /*
1281  * PrepareTempTablespaces -- prepare to use temp tablespaces
1282  *
1283  * If we have not already done so in the current transaction, parse the
1284  * temp_tablespaces GUC variable and tell fd.c which tablespace(s) to use
1285  * for temp files.
1286  */
1287 void
PrepareTempTablespaces(void)1288 PrepareTempTablespaces(void)
1289 {
1290 	char	   *rawname;
1291 	List	   *namelist;
1292 	Oid		   *tblSpcs;
1293 	int			numSpcs;
1294 	ListCell   *l;
1295 
1296 	/* No work if already done in current transaction */
1297 	if (TempTablespacesAreSet())
1298 		return;
1299 
1300 	/*
1301 	 * Can't do catalog access unless within a transaction.  This is just a
1302 	 * safety check in case this function is called by low-level code that
1303 	 * could conceivably execute outside a transaction.  Note that in such a
1304 	 * scenario, fd.c will fall back to using the current database's default
1305 	 * tablespace, which should always be OK.
1306 	 */
1307 	if (!IsTransactionState())
1308 		return;
1309 
1310 	/* Need a modifiable copy of string */
1311 	rawname = pstrdup(temp_tablespaces);
1312 
1313 	/* Parse string into list of identifiers */
1314 	if (!SplitIdentifierString(rawname, ',', &namelist))
1315 	{
1316 		/* syntax error in name list */
1317 		SetTempTablespaces(NULL, 0);
1318 		pfree(rawname);
1319 		list_free(namelist);
1320 		return;
1321 	}
1322 
1323 	/* Store tablespace OIDs in an array in TopTransactionContext */
1324 	tblSpcs = (Oid *) MemoryContextAlloc(TopTransactionContext,
1325 										 list_length(namelist) * sizeof(Oid));
1326 	numSpcs = 0;
1327 	foreach(l, namelist)
1328 	{
1329 		char	   *curname = (char *) lfirst(l);
1330 		Oid			curoid;
1331 		AclResult	aclresult;
1332 
1333 		/* Allow an empty string (signifying database default) */
1334 		if (curname[0] == '\0')
1335 		{
1336 			/* InvalidOid signifies database's default tablespace */
1337 			tblSpcs[numSpcs++] = InvalidOid;
1338 			continue;
1339 		}
1340 
1341 		/* Else verify that name is a valid tablespace name */
1342 		curoid = get_tablespace_oid(curname, true);
1343 		if (curoid == InvalidOid)
1344 		{
1345 			/* Skip any bad list elements */
1346 			continue;
1347 		}
1348 
1349 		/*
1350 		 * Allow explicit specification of database's default tablespace in
1351 		 * temp_tablespaces without triggering permissions checks.
1352 		 */
1353 		if (curoid == MyDatabaseTableSpace)
1354 		{
1355 			/* InvalidOid signifies database's default tablespace */
1356 			tblSpcs[numSpcs++] = InvalidOid;
1357 			continue;
1358 		}
1359 
1360 		/* Check permissions similarly */
1361 		aclresult = pg_tablespace_aclcheck(curoid, GetUserId(),
1362 										   ACL_CREATE);
1363 		if (aclresult != ACLCHECK_OK)
1364 			continue;
1365 
1366 		tblSpcs[numSpcs++] = curoid;
1367 	}
1368 
1369 	SetTempTablespaces(tblSpcs, numSpcs);
1370 
1371 	pfree(rawname);
1372 	list_free(namelist);
1373 }
1374 
1375 
1376 /*
1377  * get_tablespace_oid - given a tablespace name, look up the OID
1378  *
1379  * If missing_ok is false, throw an error if tablespace name not found.  If
1380  * true, just return InvalidOid.
1381  */
1382 Oid
get_tablespace_oid(const char * tablespacename,bool missing_ok)1383 get_tablespace_oid(const char *tablespacename, bool missing_ok)
1384 {
1385 	Oid			result;
1386 	Relation	rel;
1387 	HeapScanDesc scandesc;
1388 	HeapTuple	tuple;
1389 	ScanKeyData entry[1];
1390 
1391 	/*
1392 	 * Search pg_tablespace.  We use a heapscan here even though there is an
1393 	 * index on name, on the theory that pg_tablespace will usually have just
1394 	 * a few entries and so an indexed lookup is a waste of effort.
1395 	 */
1396 	rel = heap_open(TableSpaceRelationId, AccessShareLock);
1397 
1398 	ScanKeyInit(&entry[0],
1399 				Anum_pg_tablespace_spcname,
1400 				BTEqualStrategyNumber, F_NAMEEQ,
1401 				CStringGetDatum(tablespacename));
1402 	scandesc = heap_beginscan_catalog(rel, 1, entry);
1403 	tuple = heap_getnext(scandesc, ForwardScanDirection);
1404 
1405 	/* We assume that there can be at most one matching tuple */
1406 	if (HeapTupleIsValid(tuple))
1407 		result = HeapTupleGetOid(tuple);
1408 	else
1409 		result = InvalidOid;
1410 
1411 	heap_endscan(scandesc);
1412 	heap_close(rel, AccessShareLock);
1413 
1414 	if (!OidIsValid(result) && !missing_ok)
1415 		ereport(ERROR,
1416 				(errcode(ERRCODE_UNDEFINED_OBJECT),
1417 				 errmsg("tablespace \"%s\" does not exist",
1418 						tablespacename)));
1419 
1420 	return result;
1421 }
1422 
1423 /*
1424  * get_tablespace_name - given a tablespace OID, look up the name
1425  *
1426  * Returns a palloc'd string, or NULL if no such tablespace.
1427  */
1428 char *
get_tablespace_name(Oid spc_oid)1429 get_tablespace_name(Oid spc_oid)
1430 {
1431 	char	   *result;
1432 	Relation	rel;
1433 	HeapScanDesc scandesc;
1434 	HeapTuple	tuple;
1435 	ScanKeyData entry[1];
1436 
1437 	/*
1438 	 * Search pg_tablespace.  We use a heapscan here even though there is an
1439 	 * index on oid, on the theory that pg_tablespace will usually have just a
1440 	 * few entries and so an indexed lookup is a waste of effort.
1441 	 */
1442 	rel = heap_open(TableSpaceRelationId, AccessShareLock);
1443 
1444 	ScanKeyInit(&entry[0],
1445 				ObjectIdAttributeNumber,
1446 				BTEqualStrategyNumber, F_OIDEQ,
1447 				ObjectIdGetDatum(spc_oid));
1448 	scandesc = heap_beginscan_catalog(rel, 1, entry);
1449 	tuple = heap_getnext(scandesc, ForwardScanDirection);
1450 
1451 	/* We assume that there can be at most one matching tuple */
1452 	if (HeapTupleIsValid(tuple))
1453 		result = pstrdup(NameStr(((Form_pg_tablespace) GETSTRUCT(tuple))->spcname));
1454 	else
1455 		result = NULL;
1456 
1457 	heap_endscan(scandesc);
1458 	heap_close(rel, AccessShareLock);
1459 
1460 	return result;
1461 }
1462 
1463 
1464 /*
1465  * TABLESPACE resource manager's routines
1466  */
1467 void
tblspc_redo(XLogReaderState * record)1468 tblspc_redo(XLogReaderState *record)
1469 {
1470 	uint8		info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
1471 
1472 	/* Backup blocks are not used in tblspc records */
1473 	Assert(!XLogRecHasAnyBlockRefs(record));
1474 
1475 	if (info == XLOG_TBLSPC_CREATE)
1476 	{
1477 		xl_tblspc_create_rec *xlrec = (xl_tblspc_create_rec *) XLogRecGetData(record);
1478 		char	   *location = xlrec->ts_path;
1479 
1480 		create_tablespace_directories(location, xlrec->ts_id);
1481 	}
1482 	else if (info == XLOG_TBLSPC_DROP)
1483 	{
1484 		xl_tblspc_drop_rec *xlrec = (xl_tblspc_drop_rec *) XLogRecGetData(record);
1485 
1486 		/*
1487 		 * If we issued a WAL record for a drop tablespace it implies that
1488 		 * there were no files in it at all when the DROP was done. That means
1489 		 * that no permanent objects can exist in it at this point.
1490 		 *
1491 		 * It is possible for standby users to be using this tablespace as a
1492 		 * location for their temporary files, so if we fail to remove all
1493 		 * files then do conflict processing and try again, if currently
1494 		 * enabled.
1495 		 *
1496 		 * Other possible reasons for failure include bollixed file
1497 		 * permissions on a standby server when they were okay on the primary,
1498 		 * etc etc. There's not much we can do about that, so just remove what
1499 		 * we can and press on.
1500 		 */
1501 		if (!destroy_tablespace_directories(xlrec->ts_id, true))
1502 		{
1503 			ResolveRecoveryConflictWithTablespace(xlrec->ts_id);
1504 
1505 			/*
1506 			 * If we did recovery processing then hopefully the backends who
1507 			 * wrote temp files should have cleaned up and exited by now.  So
1508 			 * retry before complaining.  If we fail again, this is just a LOG
1509 			 * condition, because it's not worth throwing an ERROR for (as
1510 			 * that would crash the database and require manual intervention
1511 			 * before we could get past this WAL record on restart).
1512 			 */
1513 			if (!destroy_tablespace_directories(xlrec->ts_id, true))
1514 				ereport(LOG,
1515 						(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1516 						 errmsg("directories for tablespace %u could not be removed",
1517 								xlrec->ts_id),
1518 						 errhint("You can remove the directories manually if necessary.")));
1519 		}
1520 	}
1521 	else
1522 		elog(PANIC, "tblspc_redo: unknown op code %u", info);
1523 }
1524