1 /*-------------------------------------------------------------------------
2  *
3  * indexcmds.c
4  *	  POSTGRES define and remove index code.
5  *
6  * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *	  src/backend/commands/indexcmds.c
12  *
13  *-------------------------------------------------------------------------
14  */
15 
#include "postgres.h"

#include "access/amapi.h"
#include "access/heapam.h"
#include "access/htup_details.h"
#include "access/reloptions.h"
#include "access/sysattr.h"
#include "access/tableam.h"
#include "access/xact.h"
#include "catalog/catalog.h"
#include "catalog/index.h"
#include "catalog/indexing.h"
#include "catalog/pg_am.h"
#include "catalog/pg_collation.h"
#include "catalog/pg_constraint.h"
#include "catalog/pg_inherits.h"
#include "catalog/pg_opclass.h"
#include "catalog/pg_opfamily.h"
#include "catalog/pg_tablespace.h"
#include "catalog/pg_type.h"
#include "commands/comment.h"
#include "commands/dbcommands.h"
#include "commands/defrem.h"
#include "commands/event_trigger.h"
#include "commands/progress.h"
#include "commands/tablecmds.h"
#include "commands/tablespace.h"
#include "mb/pg_wchar.h"
#include "miscadmin.h"
#include "nodes/makefuncs.h"
#include "nodes/nodeFuncs.h"
#include "optimizer/optimizer.h"
#include "parser/parse_coerce.h"
#include "parser/parse_func.h"
#include "parser/parse_oper.h"
#include "partitioning/partdesc.h"
#include "pgstat.h"
#include "rewrite/rewriteManip.h"
#include "storage/lmgr.h"
#include "storage/proc.h"
#include "storage/procarray.h"
#include "storage/sinvaladt.h"
#include "utils/acl.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
#include "utils/inval.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/partcache.h"
#include "utils/pg_rusage.h"
#include "utils/regproc.h"
#include "utils/snapmgr.h"
#include "utils/syscache.h"
68 
69 
70 /* non-export function prototypes */
71 static void CheckPredicate(Expr *predicate);
72 static void ComputeIndexAttrs(IndexInfo *indexInfo,
73 							  Oid *typeOidP,
74 							  Oid *collationOidP,
75 							  Oid *classOidP,
76 							  int16 *colOptionP,
77 							  List *attList,
78 							  List *exclusionOpNames,
79 							  Oid relId,
80 							  const char *accessMethodName, Oid accessMethodId,
81 							  bool amcanorder,
82 							  bool isconstraint);
83 static char *ChooseIndexName(const char *tabname, Oid namespaceId,
84 							 List *colnames, List *exclusionOpNames,
85 							 bool primary, bool isconstraint);
86 static char *ChooseIndexNameAddition(List *colnames);
87 static List *ChooseIndexColumnNames(List *indexElems);
88 static void RangeVarCallbackForReindexIndex(const RangeVar *relation,
89 											Oid relId, Oid oldRelId, void *arg);
90 static bool ReindexRelationConcurrently(Oid relationOid, int options);
91 static void ReindexPartitionedIndex(Relation parentIdx);
92 static void update_relispartition(Oid relationId, bool newval);
93 static bool CompareOpclassOptions(Datum *opts1, Datum *opts2, int natts);
94 
95 /*
96  * callback argument type for RangeVarCallbackForReindexIndex()
97  */
98 struct ReindexIndexCallbackState
99 {
100 	bool		concurrent;		/* flag from statement */
101 	Oid			locked_table_oid;	/* tracks previously locked table */
102 };
103 
104 /*
105  * CheckIndexCompatible
106  *		Determine whether an existing index definition is compatible with a
107  *		prospective index definition, such that the existing index storage
108  *		could become the storage of the new index, avoiding a rebuild.
109  *
110  * 'heapRelation': the relation the index would apply to.
111  * 'accessMethodName': name of the AM to use.
112  * 'attributeList': a list of IndexElem specifying columns and expressions
113  *		to index on.
114  * 'exclusionOpNames': list of names of exclusion-constraint operators,
115  *		or NIL if not an exclusion constraint.
116  *
117  * This is tailored to the needs of ALTER TABLE ALTER TYPE, which recreates
118  * any indexes that depended on a changing column from their pg_get_indexdef
119  * or pg_get_constraintdef definitions.  We omit some of the sanity checks of
120  * DefineIndex.  We assume that the old and new indexes have the same number
121  * of columns and that if one has an expression column or predicate, both do.
122  * Errors arising from the attribute list still apply.
123  *
124  * Most column type changes that can skip a table rewrite do not invalidate
125  * indexes.  We acknowledge this when all operator classes, collations and
126  * exclusion operators match.  Though we could further permit intra-opfamily
127  * changes for btree and hash indexes, that adds subtle complexity with no
128  * concrete benefit for core types. Note, that INCLUDE columns aren't
129  * checked by this function, for them it's enough that table rewrite is
130  * skipped.
131  *
132  * When a comparison or exclusion operator has a polymorphic input type, the
133  * actual input types must also match.  This defends against the possibility
134  * that operators could vary behavior in response to get_fn_expr_argtype().
135  * At present, this hazard is theoretical: check_exclusion_constraint() and
136  * all core index access methods decline to set fn_expr for such calls.
137  *
138  * We do not yet implement a test to verify compatibility of expression
139  * columns or predicates, so assume any such index is incompatible.
140  */
141 bool
CheckIndexCompatible(Oid oldId,const char * accessMethodName,List * attributeList,List * exclusionOpNames)142 CheckIndexCompatible(Oid oldId,
143 					 const char *accessMethodName,
144 					 List *attributeList,
145 					 List *exclusionOpNames)
146 {
147 	bool		isconstraint;
148 	Oid		   *typeObjectId;
149 	Oid		   *collationObjectId;
150 	Oid		   *classObjectId;
151 	Oid			accessMethodId;
152 	Oid			relationId;
153 	HeapTuple	tuple;
154 	Form_pg_index indexForm;
155 	Form_pg_am	accessMethodForm;
156 	IndexAmRoutine *amRoutine;
157 	bool		amcanorder;
158 	int16	   *coloptions;
159 	IndexInfo  *indexInfo;
160 	int			numberOfAttributes;
161 	int			old_natts;
162 	bool		isnull;
163 	bool		ret = true;
164 	oidvector  *old_indclass;
165 	oidvector  *old_indcollation;
166 	Relation	irel;
167 	int			i;
168 	Datum		d;
169 
170 	/* Caller should already have the relation locked in some way. */
171 	relationId = IndexGetRelation(oldId, false);
172 
173 	/*
174 	 * We can pretend isconstraint = false unconditionally.  It only serves to
175 	 * decide the text of an error message that should never happen for us.
176 	 */
177 	isconstraint = false;
178 
179 	numberOfAttributes = list_length(attributeList);
180 	Assert(numberOfAttributes > 0);
181 	Assert(numberOfAttributes <= INDEX_MAX_KEYS);
182 
183 	/* look up the access method */
184 	tuple = SearchSysCache1(AMNAME, PointerGetDatum(accessMethodName));
185 	if (!HeapTupleIsValid(tuple))
186 		ereport(ERROR,
187 				(errcode(ERRCODE_UNDEFINED_OBJECT),
188 				 errmsg("access method \"%s\" does not exist",
189 						accessMethodName)));
190 	accessMethodForm = (Form_pg_am) GETSTRUCT(tuple);
191 	accessMethodId = accessMethodForm->oid;
192 	amRoutine = GetIndexAmRoutine(accessMethodForm->amhandler);
193 	ReleaseSysCache(tuple);
194 
195 	amcanorder = amRoutine->amcanorder;
196 
197 	/*
198 	 * Compute the operator classes, collations, and exclusion operators for
199 	 * the new index, so we can test whether it's compatible with the existing
200 	 * one.  Note that ComputeIndexAttrs might fail here, but that's OK:
201 	 * DefineIndex would have called this function with the same arguments
202 	 * later on, and it would have failed then anyway.  Our attributeList
203 	 * contains only key attributes, thus we're filling ii_NumIndexAttrs and
204 	 * ii_NumIndexKeyAttrs with same value.
205 	 */
206 	indexInfo = makeIndexInfo(numberOfAttributes, numberOfAttributes,
207 							  accessMethodId, NIL, NIL, false, false, false);
208 	typeObjectId = (Oid *) palloc(numberOfAttributes * sizeof(Oid));
209 	collationObjectId = (Oid *) palloc(numberOfAttributes * sizeof(Oid));
210 	classObjectId = (Oid *) palloc(numberOfAttributes * sizeof(Oid));
211 	coloptions = (int16 *) palloc(numberOfAttributes * sizeof(int16));
212 	ComputeIndexAttrs(indexInfo,
213 					  typeObjectId, collationObjectId, classObjectId,
214 					  coloptions, attributeList,
215 					  exclusionOpNames, relationId,
216 					  accessMethodName, accessMethodId,
217 					  amcanorder, isconstraint);
218 
219 
220 	/* Get the soon-obsolete pg_index tuple. */
221 	tuple = SearchSysCache1(INDEXRELID, ObjectIdGetDatum(oldId));
222 	if (!HeapTupleIsValid(tuple))
223 		elog(ERROR, "cache lookup failed for index %u", oldId);
224 	indexForm = (Form_pg_index) GETSTRUCT(tuple);
225 
226 	/*
227 	 * We don't assess expressions or predicates; assume incompatibility.
228 	 * Also, if the index is invalid for any reason, treat it as incompatible.
229 	 */
230 	if (!(heap_attisnull(tuple, Anum_pg_index_indpred, NULL) &&
231 		  heap_attisnull(tuple, Anum_pg_index_indexprs, NULL) &&
232 		  indexForm->indisvalid))
233 	{
234 		ReleaseSysCache(tuple);
235 		return false;
236 	}
237 
238 	/* Any change in operator class or collation breaks compatibility. */
239 	old_natts = indexForm->indnkeyatts;
240 	Assert(old_natts == numberOfAttributes);
241 
242 	d = SysCacheGetAttr(INDEXRELID, tuple, Anum_pg_index_indcollation, &isnull);
243 	Assert(!isnull);
244 	old_indcollation = (oidvector *) DatumGetPointer(d);
245 
246 	d = SysCacheGetAttr(INDEXRELID, tuple, Anum_pg_index_indclass, &isnull);
247 	Assert(!isnull);
248 	old_indclass = (oidvector *) DatumGetPointer(d);
249 
250 	ret = (memcmp(old_indclass->values, classObjectId,
251 				  old_natts * sizeof(Oid)) == 0 &&
252 		   memcmp(old_indcollation->values, collationObjectId,
253 				  old_natts * sizeof(Oid)) == 0);
254 
255 	ReleaseSysCache(tuple);
256 
257 	if (!ret)
258 		return false;
259 
260 	/* For polymorphic opcintype, column type changes break compatibility. */
261 	irel = index_open(oldId, AccessShareLock);	/* caller probably has a lock */
262 	for (i = 0; i < old_natts; i++)
263 	{
264 		if (IsPolymorphicType(get_opclass_input_type(classObjectId[i])) &&
265 			TupleDescAttr(irel->rd_att, i)->atttypid != typeObjectId[i])
266 		{
267 			ret = false;
268 			break;
269 		}
270 	}
271 
272 	/* Any change in opclass options break compatibility. */
273 	if (ret)
274 	{
275 		Datum	   *opclassOptions = RelationGetIndexRawAttOptions(irel);
276 
277 		ret = CompareOpclassOptions(opclassOptions,
278 									indexInfo->ii_OpclassOptions, old_natts);
279 
280 		if (opclassOptions)
281 			pfree(opclassOptions);
282 	}
283 
284 	/* Any change in exclusion operator selections breaks compatibility. */
285 	if (ret && indexInfo->ii_ExclusionOps != NULL)
286 	{
287 		Oid		   *old_operators,
288 				   *old_procs;
289 		uint16	   *old_strats;
290 
291 		RelationGetExclusionInfo(irel, &old_operators, &old_procs, &old_strats);
292 		ret = memcmp(old_operators, indexInfo->ii_ExclusionOps,
293 					 old_natts * sizeof(Oid)) == 0;
294 
295 		/* Require an exact input type match for polymorphic operators. */
296 		if (ret)
297 		{
298 			for (i = 0; i < old_natts && ret; i++)
299 			{
300 				Oid			left,
301 							right;
302 
303 				op_input_types(indexInfo->ii_ExclusionOps[i], &left, &right);
304 				if ((IsPolymorphicType(left) || IsPolymorphicType(right)) &&
305 					TupleDescAttr(irel->rd_att, i)->atttypid != typeObjectId[i])
306 				{
307 					ret = false;
308 					break;
309 				}
310 			}
311 		}
312 	}
313 
314 	index_close(irel, NoLock);
315 	return ret;
316 }
317 
318 /*
319  * CompareOpclassOptions
320  *
321  * Compare per-column opclass options which are represented by arrays of text[]
322  * datums.  Both elements of arrays and array themselves can be NULL.
323  */
324 static bool
CompareOpclassOptions(Datum * opts1,Datum * opts2,int natts)325 CompareOpclassOptions(Datum *opts1, Datum *opts2, int natts)
326 {
327 	int			i;
328 
329 	if (!opts1 && !opts2)
330 		return true;
331 
332 	for (i = 0; i < natts; i++)
333 	{
334 		Datum		opt1 = opts1 ? opts1[i] : (Datum) 0;
335 		Datum		opt2 = opts2 ? opts2[i] : (Datum) 0;
336 
337 		if (opt1 == (Datum) 0)
338 		{
339 			if (opt2 == (Datum) 0)
340 				continue;
341 			else
342 				return false;
343 		}
344 		else if (opt2 == (Datum) 0)
345 			return false;
346 
347 		/* Compare non-NULL text[] datums. */
348 		if (!DatumGetBool(DirectFunctionCall2(array_eq, opt1, opt2)))
349 			return false;
350 	}
351 
352 	return true;
353 }
354 
355 /*
356  * WaitForOlderSnapshots
357  *
358  * Wait for transactions that might have an older snapshot than the given xmin
359  * limit, because it might not contain tuples deleted just before it has
360  * been taken. Obtain a list of VXIDs of such transactions, and wait for them
361  * individually. This is used when building an index concurrently.
362  *
363  * We can exclude any running transactions that have xmin > the xmin given;
364  * their oldest snapshot must be newer than our xmin limit.
365  * We can also exclude any transactions that have xmin = zero, since they
366  * evidently have no live snapshot at all (and any one they might be in
367  * process of taking is certainly newer than ours).  Transactions in other
368  * DBs can be ignored too, since they'll never even be able to see the
369  * index being worked on.
370  *
371  * We can also exclude autovacuum processes and processes running manual
372  * lazy VACUUMs, because they won't be fazed by missing index entries
373  * either.  (Manual ANALYZEs, however, can't be excluded because they
374  * might be within transactions that are going to do arbitrary operations
375  * later.)
376  *
377  * Also, GetCurrentVirtualXIDs never reports our own vxid, so we need not
378  * check for that.
379  *
380  * If a process goes idle-in-transaction with xmin zero, we do not need to
381  * wait for it anymore, per the above argument.  We do not have the
382  * infrastructure right now to stop waiting if that happens, but we can at
383  * least avoid the folly of waiting when it is idle at the time we would
384  * begin to wait.  We do this by repeatedly rechecking the output of
385  * GetCurrentVirtualXIDs.  If, during any iteration, a particular vxid
386  * doesn't show up in the output, we know we can forget about it.
387  */
388 static void
WaitForOlderSnapshots(TransactionId limitXmin,bool progress)389 WaitForOlderSnapshots(TransactionId limitXmin, bool progress)
390 {
391 	int			n_old_snapshots;
392 	int			i;
393 	VirtualTransactionId *old_snapshots;
394 
395 	old_snapshots = GetCurrentVirtualXIDs(limitXmin, true, false,
396 										  PROC_IS_AUTOVACUUM | PROC_IN_VACUUM,
397 										  &n_old_snapshots);
398 	if (progress)
399 		pgstat_progress_update_param(PROGRESS_WAITFOR_TOTAL, n_old_snapshots);
400 
401 	for (i = 0; i < n_old_snapshots; i++)
402 	{
403 		if (!VirtualTransactionIdIsValid(old_snapshots[i]))
404 			continue;			/* found uninteresting in previous cycle */
405 
406 		if (i > 0)
407 		{
408 			/* see if anything's changed ... */
409 			VirtualTransactionId *newer_snapshots;
410 			int			n_newer_snapshots;
411 			int			j;
412 			int			k;
413 
414 			newer_snapshots = GetCurrentVirtualXIDs(limitXmin,
415 													true, false,
416 													PROC_IS_AUTOVACUUM | PROC_IN_VACUUM,
417 													&n_newer_snapshots);
418 			for (j = i; j < n_old_snapshots; j++)
419 			{
420 				if (!VirtualTransactionIdIsValid(old_snapshots[j]))
421 					continue;	/* found uninteresting in previous cycle */
422 				for (k = 0; k < n_newer_snapshots; k++)
423 				{
424 					if (VirtualTransactionIdEquals(old_snapshots[j],
425 												   newer_snapshots[k]))
426 						break;
427 				}
428 				if (k >= n_newer_snapshots) /* not there anymore */
429 					SetInvalidVirtualTransactionId(old_snapshots[j]);
430 			}
431 			pfree(newer_snapshots);
432 		}
433 
434 		if (VirtualTransactionIdIsValid(old_snapshots[i]))
435 		{
436 			/* If requested, publish who we're going to wait for. */
437 			if (progress)
438 			{
439 				PGPROC	   *holder = BackendIdGetProc(old_snapshots[i].backendId);
440 
441 				if (holder)
442 					pgstat_progress_update_param(PROGRESS_WAITFOR_CURRENT_PID,
443 												 holder->pid);
444 			}
445 			VirtualXactLock(old_snapshots[i], true);
446 		}
447 
448 		if (progress)
449 			pgstat_progress_update_param(PROGRESS_WAITFOR_DONE, i + 1);
450 	}
451 }
452 
453 
454 /*
455  * DefineIndex
456  *		Creates a new index.
457  *
458  * 'relationId': the OID of the heap relation on which the index is to be
459  *		created
460  * 'stmt': IndexStmt describing the properties of the new index.
461  * 'indexRelationId': normally InvalidOid, but during bootstrap can be
462  *		nonzero to specify a preselected OID for the index.
463  * 'parentIndexId': the OID of the parent index; InvalidOid if not the child
464  *		of a partitioned index.
465  * 'parentConstraintId': the OID of the parent constraint; InvalidOid if not
466  *		the child of a constraint (only used when recursing)
467  * 'is_alter_table': this is due to an ALTER rather than a CREATE operation.
468  * 'check_rights': check for CREATE rights in namespace and tablespace.  (This
469  *		should be true except when ALTER is deleting/recreating an index.)
470  * 'check_not_in_use': check for table not already in use in current session.
471  *		This should be true unless caller is holding the table open, in which
472  *		case the caller had better have checked it earlier.
473  * 'skip_build': make the catalog entries but don't create the index files
474  * 'quiet': suppress the NOTICE chatter ordinarily provided for constraints.
475  *
476  * Returns the object address of the created index.
477  */
478 ObjectAddress
DefineIndex(Oid relationId,IndexStmt * stmt,Oid indexRelationId,Oid parentIndexId,Oid parentConstraintId,bool is_alter_table,bool check_rights,bool check_not_in_use,bool skip_build,bool quiet)479 DefineIndex(Oid relationId,
480 			IndexStmt *stmt,
481 			Oid indexRelationId,
482 			Oid parentIndexId,
483 			Oid parentConstraintId,
484 			bool is_alter_table,
485 			bool check_rights,
486 			bool check_not_in_use,
487 			bool skip_build,
488 			bool quiet)
489 {
490 	bool		concurrent;
491 	char	   *indexRelationName;
492 	char	   *accessMethodName;
493 	Oid		   *typeObjectId;
494 	Oid		   *collationObjectId;
495 	Oid		   *classObjectId;
496 	Oid			accessMethodId;
497 	Oid			namespaceId;
498 	Oid			tablespaceId;
499 	Oid			createdConstraintId = InvalidOid;
500 	List	   *indexColNames;
501 	List	   *allIndexParams;
502 	Relation	rel;
503 	HeapTuple	tuple;
504 	Form_pg_am	accessMethodForm;
505 	IndexAmRoutine *amRoutine;
506 	bool		amcanorder;
507 	amoptions_function amoptions;
508 	bool		partitioned;
509 	Datum		reloptions;
510 	int16	   *coloptions;
511 	IndexInfo  *indexInfo;
512 	bits16		flags;
513 	bits16		constr_flags;
514 	int			numberOfAttributes;
515 	int			numberOfKeyAttributes;
516 	TransactionId limitXmin;
517 	ObjectAddress address;
518 	LockRelId	heaprelid;
519 	LOCKTAG		heaplocktag;
520 	LOCKMODE	lockmode;
521 	Snapshot	snapshot;
522 	int			save_nestlevel = -1;
523 	int			i;
524 
525 	/*
526 	 * Some callers need us to run with an empty default_tablespace; this is a
527 	 * necessary hack to be able to reproduce catalog state accurately when
528 	 * recreating indexes after table-rewriting ALTER TABLE.
529 	 */
530 	if (stmt->reset_default_tblspc)
531 	{
532 		save_nestlevel = NewGUCNestLevel();
533 		(void) set_config_option("default_tablespace", "",
534 								 PGC_USERSET, PGC_S_SESSION,
535 								 GUC_ACTION_SAVE, true, 0, false);
536 	}
537 
538 	/*
539 	 * Force non-concurrent build on temporary relations, even if CONCURRENTLY
540 	 * was requested.  Other backends can't access a temporary relation, so
541 	 * there's no harm in grabbing a stronger lock, and a non-concurrent DROP
542 	 * is more efficient.  Do this before any use of the concurrent option is
543 	 * done.
544 	 */
545 	if (stmt->concurrent && get_rel_persistence(relationId) != RELPERSISTENCE_TEMP)
546 		concurrent = true;
547 	else
548 		concurrent = false;
549 
550 	/*
551 	 * Start progress report.  If we're building a partition, this was already
552 	 * done.
553 	 */
554 	if (!OidIsValid(parentIndexId))
555 	{
556 		pgstat_progress_start_command(PROGRESS_COMMAND_CREATE_INDEX,
557 									  relationId);
558 		pgstat_progress_update_param(PROGRESS_CREATEIDX_COMMAND,
559 									 concurrent ?
560 									 PROGRESS_CREATEIDX_COMMAND_CREATE_CONCURRENTLY :
561 									 PROGRESS_CREATEIDX_COMMAND_CREATE);
562 	}
563 
564 	/*
565 	 * No index OID to report yet
566 	 */
567 	pgstat_progress_update_param(PROGRESS_CREATEIDX_INDEX_OID,
568 								 InvalidOid);
569 
570 	/*
571 	 * count key attributes in index
572 	 */
573 	numberOfKeyAttributes = list_length(stmt->indexParams);
574 
575 	/*
576 	 * Calculate the new list of index columns including both key columns and
577 	 * INCLUDE columns.  Later we can determine which of these are key
578 	 * columns, and which are just part of the INCLUDE list by checking the
579 	 * list position.  A list item in a position less than ii_NumIndexKeyAttrs
580 	 * is part of the key columns, and anything equal to and over is part of
581 	 * the INCLUDE columns.
582 	 */
583 	allIndexParams = list_concat_copy(stmt->indexParams,
584 									  stmt->indexIncludingParams);
585 	numberOfAttributes = list_length(allIndexParams);
586 
587 	if (numberOfKeyAttributes <= 0)
588 		ereport(ERROR,
589 				(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
590 				 errmsg("must specify at least one column")));
591 	if (numberOfAttributes > INDEX_MAX_KEYS)
592 		ereport(ERROR,
593 				(errcode(ERRCODE_TOO_MANY_COLUMNS),
594 				 errmsg("cannot use more than %d columns in an index",
595 						INDEX_MAX_KEYS)));
596 
597 	/*
598 	 * Only SELECT ... FOR UPDATE/SHARE are allowed while doing a standard
599 	 * index build; but for concurrent builds we allow INSERT/UPDATE/DELETE
600 	 * (but not VACUUM).
601 	 *
602 	 * NB: Caller is responsible for making sure that relationId refers to the
603 	 * relation on which the index should be built; except in bootstrap mode,
604 	 * this will typically require the caller to have already locked the
605 	 * relation.  To avoid lock upgrade hazards, that lock should be at least
606 	 * as strong as the one we take here.
607 	 *
608 	 * NB: If the lock strength here ever changes, code that is run by
609 	 * parallel workers under the control of certain particular ambuild
610 	 * functions will need to be updated, too.
611 	 */
612 	lockmode = concurrent ? ShareUpdateExclusiveLock : ShareLock;
613 	rel = table_open(relationId, lockmode);
614 
615 	namespaceId = RelationGetNamespace(rel);
616 
617 	/* Ensure that it makes sense to index this kind of relation */
618 	switch (rel->rd_rel->relkind)
619 	{
620 		case RELKIND_RELATION:
621 		case RELKIND_MATVIEW:
622 		case RELKIND_PARTITIONED_TABLE:
623 			/* OK */
624 			break;
625 		case RELKIND_FOREIGN_TABLE:
626 
627 			/*
628 			 * Custom error message for FOREIGN TABLE since the term is close
629 			 * to a regular table and can confuse the user.
630 			 */
631 			ereport(ERROR,
632 					(errcode(ERRCODE_WRONG_OBJECT_TYPE),
633 					 errmsg("cannot create index on foreign table \"%s\"",
634 							RelationGetRelationName(rel))));
635 			break;
636 		default:
637 			ereport(ERROR,
638 					(errcode(ERRCODE_WRONG_OBJECT_TYPE),
639 					 errmsg("\"%s\" is not a table or materialized view",
640 							RelationGetRelationName(rel))));
641 			break;
642 	}
643 
644 	/*
645 	 * Establish behavior for partitioned tables, and verify sanity of
646 	 * parameters.
647 	 *
648 	 * We do not build an actual index in this case; we only create a few
649 	 * catalog entries.  The actual indexes are built by recursing for each
650 	 * partition.
651 	 */
652 	partitioned = rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE;
653 	if (partitioned)
654 	{
655 		/*
656 		 * Note: we check 'stmt->concurrent' rather than 'concurrent', so that
657 		 * the error is thrown also for temporary tables.  Seems better to be
658 		 * consistent, even though we could do it on temporary table because
659 		 * we're not actually doing it concurrently.
660 		 */
661 		if (stmt->concurrent)
662 			ereport(ERROR,
663 					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
664 					 errmsg("cannot create index on partitioned table \"%s\" concurrently",
665 							RelationGetRelationName(rel))));
666 		if (stmt->excludeOpNames)
667 			ereport(ERROR,
668 					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
669 					 errmsg("cannot create exclusion constraints on partitioned table \"%s\"",
670 							RelationGetRelationName(rel))));
671 	}
672 
673 	/*
674 	 * Don't try to CREATE INDEX on temp tables of other backends.
675 	 */
676 	if (RELATION_IS_OTHER_TEMP(rel))
677 		ereport(ERROR,
678 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
679 				 errmsg("cannot create indexes on temporary tables of other sessions")));
680 
681 	/*
682 	 * Unless our caller vouches for having checked this already, insist that
683 	 * the table not be in use by our own session, either.  Otherwise we might
684 	 * fail to make entries in the new index (for instance, if an INSERT or
685 	 * UPDATE is in progress and has already made its list of target indexes).
686 	 */
687 	if (check_not_in_use)
688 		CheckTableNotInUse(rel, "CREATE INDEX");
689 
690 	/*
691 	 * Verify we (still) have CREATE rights in the rel's namespace.
692 	 * (Presumably we did when the rel was created, but maybe not anymore.)
693 	 * Skip check if caller doesn't want it.  Also skip check if
694 	 * bootstrapping, since permissions machinery may not be working yet.
695 	 */
696 	if (check_rights && !IsBootstrapProcessingMode())
697 	{
698 		AclResult	aclresult;
699 
700 		aclresult = pg_namespace_aclcheck(namespaceId, GetUserId(),
701 										  ACL_CREATE);
702 		if (aclresult != ACLCHECK_OK)
703 			aclcheck_error(aclresult, OBJECT_SCHEMA,
704 						   get_namespace_name(namespaceId));
705 	}
706 
707 	/*
708 	 * Select tablespace to use.  If not specified, use default tablespace
709 	 * (which may in turn default to database's default).
710 	 */
711 	if (stmt->tableSpace)
712 	{
713 		tablespaceId = get_tablespace_oid(stmt->tableSpace, false);
714 		if (partitioned && tablespaceId == MyDatabaseTableSpace)
715 			ereport(ERROR,
716 					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
717 					 errmsg("cannot specify default tablespace for partitioned relations")));
718 	}
719 	else
720 	{
721 		tablespaceId = GetDefaultTablespace(rel->rd_rel->relpersistence,
722 											partitioned);
723 		/* note InvalidOid is OK in this case */
724 	}
725 
726 	/* Check tablespace permissions */
727 	if (check_rights &&
728 		OidIsValid(tablespaceId) && tablespaceId != MyDatabaseTableSpace)
729 	{
730 		AclResult	aclresult;
731 
732 		aclresult = pg_tablespace_aclcheck(tablespaceId, GetUserId(),
733 										   ACL_CREATE);
734 		if (aclresult != ACLCHECK_OK)
735 			aclcheck_error(aclresult, OBJECT_TABLESPACE,
736 						   get_tablespace_name(tablespaceId));
737 	}
738 
739 	/*
740 	 * Force shared indexes into the pg_global tablespace.  This is a bit of a
741 	 * hack but seems simpler than marking them in the BKI commands.  On the
742 	 * other hand, if it's not shared, don't allow it to be placed there.
743 	 */
744 	if (rel->rd_rel->relisshared)
745 		tablespaceId = GLOBALTABLESPACE_OID;
746 	else if (tablespaceId == GLOBALTABLESPACE_OID)
747 		ereport(ERROR,
748 				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
749 				 errmsg("only shared relations can be placed in pg_global tablespace")));
750 
751 	/*
752 	 * Choose the index column names.
753 	 */
754 	indexColNames = ChooseIndexColumnNames(allIndexParams);
755 
756 	/*
757 	 * Select name for index if caller didn't specify
758 	 */
759 	indexRelationName = stmt->idxname;
760 	if (indexRelationName == NULL)
761 		indexRelationName = ChooseIndexName(RelationGetRelationName(rel),
762 											namespaceId,
763 											indexColNames,
764 											stmt->excludeOpNames,
765 											stmt->primary,
766 											stmt->isconstraint);
767 
768 	/*
769 	 * look up the access method, verify it can handle the requested features
770 	 */
771 	accessMethodName = stmt->accessMethod;
772 	tuple = SearchSysCache1(AMNAME, PointerGetDatum(accessMethodName));
773 	if (!HeapTupleIsValid(tuple))
774 	{
775 		/*
776 		 * Hack to provide more-or-less-transparent updating of old RTREE
777 		 * indexes to GiST: if RTREE is requested and not found, use GIST.
778 		 */
779 		if (strcmp(accessMethodName, "rtree") == 0)
780 		{
781 			ereport(NOTICE,
782 					(errmsg("substituting access method \"gist\" for obsolete method \"rtree\"")));
783 			accessMethodName = "gist";
784 			tuple = SearchSysCache1(AMNAME, PointerGetDatum(accessMethodName));
785 		}
786 
787 		if (!HeapTupleIsValid(tuple))
788 			ereport(ERROR,
789 					(errcode(ERRCODE_UNDEFINED_OBJECT),
790 					 errmsg("access method \"%s\" does not exist",
791 							accessMethodName)));
792 	}
793 	accessMethodForm = (Form_pg_am) GETSTRUCT(tuple);
794 	accessMethodId = accessMethodForm->oid;
795 	amRoutine = GetIndexAmRoutine(accessMethodForm->amhandler);
796 
797 	pgstat_progress_update_param(PROGRESS_CREATEIDX_ACCESS_METHOD_OID,
798 								 accessMethodId);
799 
800 	if (stmt->unique && !amRoutine->amcanunique)
801 		ereport(ERROR,
802 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
803 				 errmsg("access method \"%s\" does not support unique indexes",
804 						accessMethodName)));
805 	if (stmt->indexIncludingParams != NIL && !amRoutine->amcaninclude)
806 		ereport(ERROR,
807 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
808 				 errmsg("access method \"%s\" does not support included columns",
809 						accessMethodName)));
810 	if (numberOfKeyAttributes > 1 && !amRoutine->amcanmulticol)
811 		ereport(ERROR,
812 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
813 				 errmsg("access method \"%s\" does not support multicolumn indexes",
814 						accessMethodName)));
815 	if (stmt->excludeOpNames && amRoutine->amgettuple == NULL)
816 		ereport(ERROR,
817 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
818 				 errmsg("access method \"%s\" does not support exclusion constraints",
819 						accessMethodName)));
820 
821 	amcanorder = amRoutine->amcanorder;
822 	amoptions = amRoutine->amoptions;
823 
824 	pfree(amRoutine);
825 	ReleaseSysCache(tuple);
826 
827 	/*
828 	 * Validate predicate, if given
829 	 */
830 	if (stmt->whereClause)
831 		CheckPredicate((Expr *) stmt->whereClause);
832 
833 	/*
834 	 * Parse AM-specific options, convert to text array form, validate.
835 	 */
836 	reloptions = transformRelOptions((Datum) 0, stmt->options,
837 									 NULL, NULL, false, false);
838 
839 	(void) index_reloptions(amoptions, reloptions, true);
840 
841 	/*
842 	 * Prepare arguments for index_create, primarily an IndexInfo structure.
843 	 * Note that predicates must be in implicit-AND format.  In a concurrent
844 	 * build, mark it not-ready-for-inserts.
845 	 */
846 	indexInfo = makeIndexInfo(numberOfAttributes,
847 							  numberOfKeyAttributes,
848 							  accessMethodId,
849 							  NIL,	/* expressions, NIL for now */
850 							  make_ands_implicit((Expr *) stmt->whereClause),
851 							  stmt->unique,
852 							  !concurrent,
853 							  concurrent);
854 
855 	typeObjectId = (Oid *) palloc(numberOfAttributes * sizeof(Oid));
856 	collationObjectId = (Oid *) palloc(numberOfAttributes * sizeof(Oid));
857 	classObjectId = (Oid *) palloc(numberOfAttributes * sizeof(Oid));
858 	coloptions = (int16 *) palloc(numberOfAttributes * sizeof(int16));
859 	ComputeIndexAttrs(indexInfo,
860 					  typeObjectId, collationObjectId, classObjectId,
861 					  coloptions, allIndexParams,
862 					  stmt->excludeOpNames, relationId,
863 					  accessMethodName, accessMethodId,
864 					  amcanorder, stmt->isconstraint);
865 
866 	/*
867 	 * Extra checks when creating a PRIMARY KEY index.
868 	 */
869 	if (stmt->primary)
870 		index_check_primary_key(rel, indexInfo, is_alter_table, stmt);
871 
872 	/*
873 	 * If this table is partitioned and we're creating a unique index or a
874 	 * primary key, make sure that the partition key is a subset of the
875 	 * index's columns.  Otherwise it would be possible to violate uniqueness
876 	 * by putting values that ought to be unique in different partitions.
877 	 *
878 	 * We could lift this limitation if we had global indexes, but those have
879 	 * their own problems, so this is a useful feature combination.
880 	 */
881 	if (partitioned && (stmt->unique || stmt->primary))
882 	{
883 		PartitionKey key = RelationGetPartitionKey(rel);
884 		const char *constraint_type;
885 		int			i;
886 
887 		if (stmt->primary)
888 			constraint_type = "PRIMARY KEY";
889 		else if (stmt->unique)
890 			constraint_type = "UNIQUE";
891 		else if (stmt->excludeOpNames != NIL)
892 			constraint_type = "EXCLUDE";
893 		else
894 		{
895 			elog(ERROR, "unknown constraint type");
896 			constraint_type = NULL; /* keep compiler quiet */
897 		}
898 
899 		/*
900 		 * Verify that all the columns in the partition key appear in the
901 		 * unique key definition, with the same notion of equality.
902 		 */
903 		for (i = 0; i < key->partnatts; i++)
904 		{
905 			bool		found = false;
906 			int			eq_strategy;
907 			Oid			ptkey_eqop;
908 			int			j;
909 
910 			/*
911 			 * Identify the equality operator associated with this partkey
912 			 * column.  For list and range partitioning, partkeys use btree
913 			 * operator classes; hash partitioning uses hash operator classes.
914 			 * (Keep this in sync with ComputePartitionAttrs!)
915 			 */
916 			if (key->strategy == PARTITION_STRATEGY_HASH)
917 				eq_strategy = HTEqualStrategyNumber;
918 			else
919 				eq_strategy = BTEqualStrategyNumber;
920 
921 			ptkey_eqop = get_opfamily_member(key->partopfamily[i],
922 											 key->partopcintype[i],
923 											 key->partopcintype[i],
924 											 eq_strategy);
925 			if (!OidIsValid(ptkey_eqop))
926 				elog(ERROR, "missing operator %d(%u,%u) in partition opfamily %u",
927 					 eq_strategy, key->partopcintype[i], key->partopcintype[i],
928 					 key->partopfamily[i]);
929 
930 			/*
931 			 * We'll need to be able to identify the equality operators
932 			 * associated with index columns, too.  We know what to do with
933 			 * btree opclasses; if there are ever any other index types that
934 			 * support unique indexes, this logic will need extension.
935 			 */
936 			if (accessMethodId == BTREE_AM_OID)
937 				eq_strategy = BTEqualStrategyNumber;
938 			else
939 				ereport(ERROR,
940 						(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
941 						 errmsg("cannot match partition key to an index using access method \"%s\"",
942 								accessMethodName)));
943 
944 			/*
945 			 * It may be possible to support UNIQUE constraints when partition
946 			 * keys are expressions, but is it worth it?  Give up for now.
947 			 */
948 			if (key->partattrs[i] == 0)
949 				ereport(ERROR,
950 						(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
951 						 errmsg("unsupported %s constraint with partition key definition",
952 								constraint_type),
953 						 errdetail("%s constraints cannot be used when partition keys include expressions.",
954 								   constraint_type)));
955 
956 			/* Search the index column(s) for a match */
957 			for (j = 0; j < indexInfo->ii_NumIndexKeyAttrs; j++)
958 			{
959 				if (key->partattrs[i] == indexInfo->ii_IndexAttrNumbers[j])
960 				{
961 					/* Matched the column, now what about the equality op? */
962 					Oid			idx_opfamily;
963 					Oid			idx_opcintype;
964 
965 					if (get_opclass_opfamily_and_input_type(classObjectId[j],
966 															&idx_opfamily,
967 															&idx_opcintype))
968 					{
969 						Oid			idx_eqop;
970 
971 						idx_eqop = get_opfamily_member(idx_opfamily,
972 													   idx_opcintype,
973 													   idx_opcintype,
974 													   eq_strategy);
975 						if (ptkey_eqop == idx_eqop)
976 						{
977 							found = true;
978 							break;
979 						}
980 					}
981 				}
982 			}
983 
984 			if (!found)
985 			{
986 				Form_pg_attribute att;
987 
988 				att = TupleDescAttr(RelationGetDescr(rel),
989 									key->partattrs[i] - 1);
990 				ereport(ERROR,
991 						(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
992 						 errmsg("unique constraint on partitioned table must include all partitioning columns"),
993 						 errdetail("%s constraint on table \"%s\" lacks column \"%s\" which is part of the partition key.",
994 								   constraint_type, RelationGetRelationName(rel),
995 								   NameStr(att->attname))));
996 			}
997 		}
998 	}
999 
1000 
1001 	/*
1002 	 * We disallow indexes on system columns.  They would not necessarily get
1003 	 * updated correctly, and they don't seem useful anyway.
1004 	 */
1005 	for (i = 0; i < indexInfo->ii_NumIndexAttrs; i++)
1006 	{
1007 		AttrNumber	attno = indexInfo->ii_IndexAttrNumbers[i];
1008 
1009 		if (attno < 0)
1010 			ereport(ERROR,
1011 					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1012 					 errmsg("index creation on system columns is not supported")));
1013 	}
1014 
1015 	/*
1016 	 * Also check for system columns used in expressions or predicates.
1017 	 */
1018 	if (indexInfo->ii_Expressions || indexInfo->ii_Predicate)
1019 	{
1020 		Bitmapset  *indexattrs = NULL;
1021 
1022 		pull_varattnos((Node *) indexInfo->ii_Expressions, 1, &indexattrs);
1023 		pull_varattnos((Node *) indexInfo->ii_Predicate, 1, &indexattrs);
1024 
1025 		for (i = FirstLowInvalidHeapAttributeNumber + 1; i < 0; i++)
1026 		{
1027 			if (bms_is_member(i - FirstLowInvalidHeapAttributeNumber,
1028 							  indexattrs))
1029 				ereport(ERROR,
1030 						(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1031 						 errmsg("index creation on system columns is not supported")));
1032 		}
1033 	}
1034 
1035 	/*
1036 	 * Report index creation if appropriate (delay this till after most of the
1037 	 * error checks)
1038 	 */
1039 	if (stmt->isconstraint && !quiet)
1040 	{
1041 		const char *constraint_type;
1042 
1043 		if (stmt->primary)
1044 			constraint_type = "PRIMARY KEY";
1045 		else if (stmt->unique)
1046 			constraint_type = "UNIQUE";
1047 		else if (stmt->excludeOpNames != NIL)
1048 			constraint_type = "EXCLUDE";
1049 		else
1050 		{
1051 			elog(ERROR, "unknown constraint type");
1052 			constraint_type = NULL; /* keep compiler quiet */
1053 		}
1054 
1055 		ereport(DEBUG1,
1056 				(errmsg("%s %s will create implicit index \"%s\" for table \"%s\"",
1057 						is_alter_table ? "ALTER TABLE / ADD" : "CREATE TABLE /",
1058 						constraint_type,
1059 						indexRelationName, RelationGetRelationName(rel))));
1060 	}
1061 
1062 	/*
1063 	 * A valid stmt->oldNode implies that we already have a built form of the
1064 	 * index.  The caller should also decline any index build.
1065 	 */
1066 	Assert(!OidIsValid(stmt->oldNode) || (skip_build && !concurrent));
1067 
1068 	/*
1069 	 * Make the catalog entries for the index, including constraints. This
1070 	 * step also actually builds the index, except if caller requested not to
1071 	 * or in concurrent mode, in which case it'll be done later, or doing a
1072 	 * partitioned index (because those don't have storage).
1073 	 */
1074 	flags = constr_flags = 0;
1075 	if (stmt->isconstraint)
1076 		flags |= INDEX_CREATE_ADD_CONSTRAINT;
1077 	if (skip_build || concurrent || partitioned)
1078 		flags |= INDEX_CREATE_SKIP_BUILD;
1079 	if (stmt->if_not_exists)
1080 		flags |= INDEX_CREATE_IF_NOT_EXISTS;
1081 	if (concurrent)
1082 		flags |= INDEX_CREATE_CONCURRENT;
1083 	if (partitioned)
1084 		flags |= INDEX_CREATE_PARTITIONED;
1085 	if (stmt->primary)
1086 		flags |= INDEX_CREATE_IS_PRIMARY;
1087 
1088 	/*
1089 	 * If the table is partitioned, and recursion was declined but partitions
1090 	 * exist, mark the index as invalid.
1091 	 */
1092 	if (partitioned && stmt->relation && !stmt->relation->inh)
1093 	{
1094 		PartitionDesc pd = RelationGetPartitionDesc(rel);
1095 
1096 		if (pd->nparts != 0)
1097 			flags |= INDEX_CREATE_INVALID;
1098 	}
1099 
1100 	if (stmt->deferrable)
1101 		constr_flags |= INDEX_CONSTR_CREATE_DEFERRABLE;
1102 	if (stmt->initdeferred)
1103 		constr_flags |= INDEX_CONSTR_CREATE_INIT_DEFERRED;
1104 
1105 	indexRelationId =
1106 		index_create(rel, indexRelationName, indexRelationId, parentIndexId,
1107 					 parentConstraintId,
1108 					 stmt->oldNode, indexInfo, indexColNames,
1109 					 accessMethodId, tablespaceId,
1110 					 collationObjectId, classObjectId,
1111 					 coloptions, reloptions,
1112 					 flags, constr_flags,
1113 					 allowSystemTableMods, !check_rights,
1114 					 &createdConstraintId);
1115 
1116 	ObjectAddressSet(address, RelationRelationId, indexRelationId);
1117 
1118 	/*
1119 	 * Revert to original default_tablespace.  Must do this before any return
1120 	 * from this function, but after index_create, so this is a good time.
1121 	 */
1122 	if (save_nestlevel >= 0)
1123 		AtEOXact_GUC(true, save_nestlevel);
1124 
1125 	if (!OidIsValid(indexRelationId))
1126 	{
1127 		table_close(rel, NoLock);
1128 
1129 		/* If this is the top-level index, we're done */
1130 		if (!OidIsValid(parentIndexId))
1131 			pgstat_progress_end_command();
1132 
1133 		return address;
1134 	}
1135 
1136 	/* Add any requested comment */
1137 	if (stmt->idxcomment != NULL)
1138 		CreateComments(indexRelationId, RelationRelationId, 0,
1139 					   stmt->idxcomment);
1140 
1141 	if (partitioned)
1142 	{
1143 		PartitionDesc partdesc;
1144 
1145 		/*
1146 		 * Unless caller specified to skip this step (via ONLY), process each
1147 		 * partition to make sure they all contain a corresponding index.
1148 		 *
1149 		 * If we're called internally (no stmt->relation), recurse always.
1150 		 */
1151 		partdesc = RelationGetPartitionDesc(rel);
1152 		if ((!stmt->relation || stmt->relation->inh) && partdesc->nparts > 0)
1153 		{
1154 			int			nparts = partdesc->nparts;
1155 			Oid		   *part_oids = palloc(sizeof(Oid) * nparts);
1156 			bool		invalidate_parent = false;
1157 			TupleDesc	parentDesc;
1158 			Oid		   *opfamOids;
1159 
1160 			pgstat_progress_update_param(PROGRESS_CREATEIDX_PARTITIONS_TOTAL,
1161 										 nparts);
1162 
1163 			memcpy(part_oids, partdesc->oids, sizeof(Oid) * nparts);
1164 
1165 			parentDesc = RelationGetDescr(rel);
1166 			opfamOids = palloc(sizeof(Oid) * numberOfKeyAttributes);
1167 			for (i = 0; i < numberOfKeyAttributes; i++)
1168 				opfamOids[i] = get_opclass_family(classObjectId[i]);
1169 
1170 			/*
1171 			 * For each partition, scan all existing indexes; if one matches
1172 			 * our index definition and is not already attached to some other
1173 			 * parent index, attach it to the one we just created.
1174 			 *
1175 			 * If none matches, build a new index by calling ourselves
1176 			 * recursively with the same options (except for the index name).
1177 			 */
1178 			for (i = 0; i < nparts; i++)
1179 			{
1180 				Oid			childRelid = part_oids[i];
1181 				Relation	childrel;
1182 				List	   *childidxs;
1183 				ListCell   *cell;
1184 				AttrMap    *attmap;
1185 				bool		found = false;
1186 
1187 				childrel = table_open(childRelid, lockmode);
1188 
1189 				/*
1190 				 * Don't try to create indexes on foreign tables, though. Skip
1191 				 * those if a regular index, or fail if trying to create a
1192 				 * constraint index.
1193 				 */
1194 				if (childrel->rd_rel->relkind == RELKIND_FOREIGN_TABLE)
1195 				{
1196 					if (stmt->unique || stmt->primary)
1197 						ereport(ERROR,
1198 								(errcode(ERRCODE_WRONG_OBJECT_TYPE),
1199 								 errmsg("cannot create unique index on partitioned table \"%s\"",
1200 										RelationGetRelationName(rel)),
1201 								 errdetail("Table \"%s\" contains partitions that are foreign tables.",
1202 										   RelationGetRelationName(rel))));
1203 
1204 					table_close(childrel, lockmode);
1205 					continue;
1206 				}
1207 
1208 				childidxs = RelationGetIndexList(childrel);
1209 				attmap =
1210 					build_attrmap_by_name(RelationGetDescr(childrel),
1211 										  parentDesc);
1212 
1213 				foreach(cell, childidxs)
1214 				{
1215 					Oid			cldidxid = lfirst_oid(cell);
1216 					Relation	cldidx;
1217 					IndexInfo  *cldIdxInfo;
1218 
1219 					/* this index is already partition of another one */
1220 					if (has_superclass(cldidxid))
1221 						continue;
1222 
1223 					cldidx = index_open(cldidxid, lockmode);
1224 					cldIdxInfo = BuildIndexInfo(cldidx);
1225 					if (CompareIndexInfo(cldIdxInfo, indexInfo,
1226 										 cldidx->rd_indcollation,
1227 										 collationObjectId,
1228 										 cldidx->rd_opfamily,
1229 										 opfamOids,
1230 										 attmap))
1231 					{
1232 						Oid			cldConstrOid = InvalidOid;
1233 
1234 						/*
1235 						 * Found a match.
1236 						 *
1237 						 * If this index is being created in the parent
1238 						 * because of a constraint, then the child needs to
1239 						 * have a constraint also, so look for one.  If there
1240 						 * is no such constraint, this index is no good, so
1241 						 * keep looking.
1242 						 */
1243 						if (createdConstraintId != InvalidOid)
1244 						{
1245 							cldConstrOid =
1246 								get_relation_idx_constraint_oid(childRelid,
1247 																cldidxid);
1248 							if (cldConstrOid == InvalidOid)
1249 							{
1250 								index_close(cldidx, lockmode);
1251 								continue;
1252 							}
1253 						}
1254 
1255 						/* Attach index to parent and we're done. */
1256 						IndexSetParentIndex(cldidx, indexRelationId);
1257 						if (createdConstraintId != InvalidOid)
1258 							ConstraintSetParentConstraint(cldConstrOid,
1259 														  createdConstraintId,
1260 														  childRelid);
1261 
1262 						if (!cldidx->rd_index->indisvalid)
1263 							invalidate_parent = true;
1264 
1265 						found = true;
1266 						/* keep lock till commit */
1267 						index_close(cldidx, NoLock);
1268 						break;
1269 					}
1270 
1271 					index_close(cldidx, lockmode);
1272 				}
1273 
1274 				list_free(childidxs);
1275 				table_close(childrel, NoLock);
1276 
1277 				/*
1278 				 * If no matching index was found, create our own.
1279 				 */
1280 				if (!found)
1281 				{
1282 					IndexStmt  *childStmt = copyObject(stmt);
1283 					bool		found_whole_row;
1284 					ListCell   *lc;
1285 
1286 					/*
1287 					 * We can't use the same index name for the child index,
1288 					 * so clear idxname to let the recursive invocation choose
1289 					 * a new name.  Likewise, the existing target relation
1290 					 * field is wrong, and if indexOid or oldNode are set,
1291 					 * they mustn't be applied to the child either.
1292 					 */
1293 					childStmt->idxname = NULL;
1294 					childStmt->relation = NULL;
1295 					childStmt->indexOid = InvalidOid;
1296 					childStmt->oldNode = InvalidOid;
1297 					childStmt->oldCreateSubid = InvalidSubTransactionId;
1298 					childStmt->oldFirstRelfilenodeSubid = InvalidSubTransactionId;
1299 
1300 					/*
1301 					 * Adjust any Vars (both in expressions and in the index's
1302 					 * WHERE clause) to match the partition's column numbering
1303 					 * in case it's different from the parent's.
1304 					 */
1305 					foreach(lc, childStmt->indexParams)
1306 					{
1307 						IndexElem  *ielem = lfirst(lc);
1308 
1309 						/*
1310 						 * If the index parameter is an expression, we must
1311 						 * translate it to contain child Vars.
1312 						 */
1313 						if (ielem->expr)
1314 						{
1315 							ielem->expr =
1316 								map_variable_attnos((Node *) ielem->expr,
1317 													1, 0, attmap,
1318 													InvalidOid,
1319 													&found_whole_row);
1320 							if (found_whole_row)
1321 								elog(ERROR, "cannot convert whole-row table reference");
1322 						}
1323 					}
1324 					childStmt->whereClause =
1325 						map_variable_attnos(stmt->whereClause, 1, 0,
1326 											attmap,
1327 											InvalidOid, &found_whole_row);
1328 					if (found_whole_row)
1329 						elog(ERROR, "cannot convert whole-row table reference");
1330 
1331 					DefineIndex(childRelid, childStmt,
1332 								InvalidOid, /* no predefined OID */
1333 								indexRelationId,	/* this is our child */
1334 								createdConstraintId,
1335 								is_alter_table, check_rights, check_not_in_use,
1336 								skip_build, quiet);
1337 				}
1338 
1339 				pgstat_progress_update_param(PROGRESS_CREATEIDX_PARTITIONS_DONE,
1340 											 i + 1);
1341 				free_attrmap(attmap);
1342 			}
1343 
1344 			/*
1345 			 * The pg_index row we inserted for this index was marked
1346 			 * indisvalid=true.  But if we attached an existing index that is
1347 			 * invalid, this is incorrect, so update our row to invalid too.
1348 			 */
1349 			if (invalidate_parent)
1350 			{
1351 				Relation	pg_index = table_open(IndexRelationId, RowExclusiveLock);
1352 				HeapTuple	tup,
1353 							newtup;
1354 
1355 				tup = SearchSysCache1(INDEXRELID,
1356 									  ObjectIdGetDatum(indexRelationId));
1357 				if (!HeapTupleIsValid(tup))
1358 					elog(ERROR, "cache lookup failed for index %u",
1359 						 indexRelationId);
1360 				newtup = heap_copytuple(tup);
1361 				((Form_pg_index) GETSTRUCT(newtup))->indisvalid = false;
1362 				CatalogTupleUpdate(pg_index, &tup->t_self, newtup);
1363 				ReleaseSysCache(tup);
1364 				table_close(pg_index, RowExclusiveLock);
1365 				heap_freetuple(newtup);
1366 			}
1367 		}
1368 
1369 		/*
1370 		 * Indexes on partitioned tables are not themselves built, so we're
1371 		 * done here.
1372 		 */
1373 		table_close(rel, NoLock);
1374 		if (!OidIsValid(parentIndexId))
1375 			pgstat_progress_end_command();
1376 		return address;
1377 	}
1378 
1379 	if (!concurrent)
1380 	{
1381 		/* Close the heap and we're done, in the non-concurrent case */
1382 		table_close(rel, NoLock);
1383 
1384 		/* If this is the top-level index, we're done. */
1385 		if (!OidIsValid(parentIndexId))
1386 			pgstat_progress_end_command();
1387 
1388 		return address;
1389 	}
1390 
1391 	/* save lockrelid and locktag for below, then close rel */
1392 	heaprelid = rel->rd_lockInfo.lockRelId;
1393 	SET_LOCKTAG_RELATION(heaplocktag, heaprelid.dbId, heaprelid.relId);
1394 	table_close(rel, NoLock);
1395 
1396 	/*
1397 	 * For a concurrent build, it's important to make the catalog entries
1398 	 * visible to other transactions before we start to build the index. That
1399 	 * will prevent them from making incompatible HOT updates.  The new index
1400 	 * will be marked not indisready and not indisvalid, so that no one else
1401 	 * tries to either insert into it or use it for queries.
1402 	 *
1403 	 * We must commit our current transaction so that the index becomes
1404 	 * visible; then start another.  Note that all the data structures we just
1405 	 * built are lost in the commit.  The only data we keep past here are the
1406 	 * relation IDs.
1407 	 *
1408 	 * Before committing, get a session-level lock on the table, to ensure
1409 	 * that neither it nor the index can be dropped before we finish. This
1410 	 * cannot block, even if someone else is waiting for access, because we
1411 	 * already have the same lock within our transaction.
1412 	 *
1413 	 * Note: we don't currently bother with a session lock on the index,
1414 	 * because there are no operations that could change its state while we
1415 	 * hold lock on the parent table.  This might need to change later.
1416 	 */
1417 	LockRelationIdForSession(&heaprelid, ShareUpdateExclusiveLock);
1418 
1419 	PopActiveSnapshot();
1420 	CommitTransactionCommand();
1421 	StartTransactionCommand();
1422 
1423 	/*
1424 	 * The index is now visible, so we can report the OID.
1425 	 */
1426 	pgstat_progress_update_param(PROGRESS_CREATEIDX_INDEX_OID,
1427 								 indexRelationId);
1428 
1429 	/*
1430 	 * Phase 2 of concurrent index build (see comments for validate_index()
1431 	 * for an overview of how this works)
1432 	 *
1433 	 * Now we must wait until no running transaction could have the table open
1434 	 * with the old list of indexes.  Use ShareLock to consider running
1435 	 * transactions that hold locks that permit writing to the table.  Note we
1436 	 * do not need to worry about xacts that open the table for writing after
1437 	 * this point; they will see the new index when they open it.
1438 	 *
1439 	 * Note: the reason we use actual lock acquisition here, rather than just
1440 	 * checking the ProcArray and sleeping, is that deadlock is possible if
1441 	 * one of the transactions in question is blocked trying to acquire an
1442 	 * exclusive lock on our table.  The lock code will detect deadlock and
1443 	 * error out properly.
1444 	 */
1445 	pgstat_progress_update_param(PROGRESS_CREATEIDX_PHASE,
1446 								 PROGRESS_CREATEIDX_PHASE_WAIT_1);
1447 	WaitForLockers(heaplocktag, ShareLock, true);
1448 
1449 	/*
1450 	 * At this moment we are sure that there are no transactions with the
1451 	 * table open for write that don't have this new index in their list of
1452 	 * indexes.  We have waited out all the existing transactions and any new
1453 	 * transaction will have the new index in its list, but the index is still
1454 	 * marked as "not-ready-for-inserts".  The index is consulted while
1455 	 * deciding HOT-safety though.  This arrangement ensures that no new HOT
1456 	 * chains can be created where the new tuple and the old tuple in the
1457 	 * chain have different index keys.
1458 	 *
1459 	 * We now take a new snapshot, and build the index using all tuples that
1460 	 * are visible in this snapshot.  We can be sure that any HOT updates to
1461 	 * these tuples will be compatible with the index, since any updates made
1462 	 * by transactions that didn't know about the index are now committed or
1463 	 * rolled back.  Thus, each visible tuple is either the end of its
1464 	 * HOT-chain or the extension of the chain is HOT-safe for this index.
1465 	 */
1466 
1467 	/* Set ActiveSnapshot since functions in the indexes may need it */
1468 	PushActiveSnapshot(GetTransactionSnapshot());
1469 
1470 	/* Perform concurrent build of index */
1471 	index_concurrently_build(relationId, indexRelationId);
1472 
1473 	/* we can do away with our snapshot */
1474 	PopActiveSnapshot();
1475 
1476 	/*
1477 	 * Commit this transaction to make the indisready update visible.
1478 	 */
1479 	CommitTransactionCommand();
1480 	StartTransactionCommand();
1481 
1482 	/*
1483 	 * Phase 3 of concurrent index build
1484 	 *
1485 	 * We once again wait until no transaction can have the table open with
1486 	 * the index marked as read-only for updates.
1487 	 */
1488 	pgstat_progress_update_param(PROGRESS_CREATEIDX_PHASE,
1489 								 PROGRESS_CREATEIDX_PHASE_WAIT_2);
1490 	WaitForLockers(heaplocktag, ShareLock, true);
1491 
1492 	/*
1493 	 * Now take the "reference snapshot" that will be used by validate_index()
1494 	 * to filter candidate tuples.  Beware!  There might still be snapshots in
1495 	 * use that treat some transaction as in-progress that our reference
1496 	 * snapshot treats as committed.  If such a recently-committed transaction
1497 	 * deleted tuples in the table, we will not include them in the index; yet
1498 	 * those transactions which see the deleting one as still-in-progress will
1499 	 * expect such tuples to be there once we mark the index as valid.
1500 	 *
1501 	 * We solve this by waiting for all endangered transactions to exit before
1502 	 * we mark the index as valid.
1503 	 *
1504 	 * We also set ActiveSnapshot to this snap, since functions in indexes may
1505 	 * need a snapshot.
1506 	 */
1507 	snapshot = RegisterSnapshot(GetTransactionSnapshot());
1508 	PushActiveSnapshot(snapshot);
1509 
1510 	/*
1511 	 * Scan the index and the heap, insert any missing index entries.
1512 	 */
1513 	validate_index(relationId, indexRelationId, snapshot);
1514 
1515 	/*
1516 	 * Drop the reference snapshot.  We must do this before waiting out other
1517 	 * snapshot holders, else we will deadlock against other processes also
1518 	 * doing CREATE INDEX CONCURRENTLY, which would see our snapshot as one
1519 	 * they must wait for.  But first, save the snapshot's xmin to use as
1520 	 * limitXmin for GetCurrentVirtualXIDs().
1521 	 */
1522 	limitXmin = snapshot->xmin;
1523 
1524 	PopActiveSnapshot();
1525 	UnregisterSnapshot(snapshot);
1526 
1527 	/*
1528 	 * The snapshot subsystem could still contain registered snapshots that
1529 	 * are holding back our process's advertised xmin; in particular, if
1530 	 * default_transaction_isolation = serializable, there is a transaction
1531 	 * snapshot that is still active.  The CatalogSnapshot is likewise a
1532 	 * hazard.  To ensure no deadlocks, we must commit and start yet another
1533 	 * transaction, and do our wait before any snapshot has been taken in it.
1534 	 */
1535 	CommitTransactionCommand();
1536 	StartTransactionCommand();
1537 
1538 	/* We should now definitely not be advertising any xmin. */
1539 	Assert(MyPgXact->xmin == InvalidTransactionId);
1540 
1541 	/*
1542 	 * The index is now valid in the sense that it contains all currently
1543 	 * interesting tuples.  But since it might not contain tuples deleted just
1544 	 * before the reference snap was taken, we have to wait out any
1545 	 * transactions that might have older snapshots.
1546 	 */
1547 	pgstat_progress_update_param(PROGRESS_CREATEIDX_PHASE,
1548 								 PROGRESS_CREATEIDX_PHASE_WAIT_3);
1549 	WaitForOlderSnapshots(limitXmin, true);
1550 
1551 	/*
1552 	 * Index can now be marked valid -- update its pg_index entry
1553 	 */
1554 	index_set_state_flags(indexRelationId, INDEX_CREATE_SET_VALID);
1555 
1556 	/*
1557 	 * The pg_index update will cause backends (including this one) to update
1558 	 * relcache entries for the index itself, but we should also send a
1559 	 * relcache inval on the parent table to force replanning of cached plans.
1560 	 * Otherwise existing sessions might fail to use the new index where it
1561 	 * would be useful.  (Note that our earlier commits did not create reasons
1562 	 * to replan; so relcache flush on the index itself was sufficient.)
1563 	 */
1564 	CacheInvalidateRelcacheByRelid(heaprelid.relId);
1565 
1566 	/*
1567 	 * Last thing to do is release the session-level lock on the parent table.
1568 	 */
1569 	UnlockRelationIdForSession(&heaprelid, ShareUpdateExclusiveLock);
1570 
1571 	pgstat_progress_end_command();
1572 
1573 	return address;
1574 }
1575 
1576 
1577 /*
1578  * CheckMutability
1579  *		Test whether given expression is mutable
1580  */
1581 static bool
CheckMutability(Expr * expr)1582 CheckMutability(Expr *expr)
1583 {
1584 	/*
1585 	 * First run the expression through the planner.  This has a couple of
1586 	 * important consequences.  First, function default arguments will get
1587 	 * inserted, which may affect volatility (consider "default now()").
1588 	 * Second, inline-able functions will get inlined, which may allow us to
1589 	 * conclude that the function is really less volatile than it's marked. As
1590 	 * an example, polymorphic functions must be marked with the most volatile
1591 	 * behavior that they have for any input type, but once we inline the
1592 	 * function we may be able to conclude that it's not so volatile for the
1593 	 * particular input type we're dealing with.
1594 	 *
1595 	 * We assume here that expression_planner() won't scribble on its input.
1596 	 */
1597 	expr = expression_planner(expr);
1598 
1599 	/* Now we can search for non-immutable functions */
1600 	return contain_mutable_functions((Node *) expr);
1601 }
1602 
1603 
1604 /*
1605  * CheckPredicate
1606  *		Checks that the given partial-index predicate is valid.
1607  *
1608  * This used to also constrain the form of the predicate to forms that
1609  * indxpath.c could do something with.  However, that seems overly
1610  * restrictive.  One useful application of partial indexes is to apply
1611  * a UNIQUE constraint across a subset of a table, and in that scenario
1612  * any evaluable predicate will work.  So accept any predicate here
1613  * (except ones requiring a plan), and let indxpath.c fend for itself.
1614  */
1615 static void
CheckPredicate(Expr * predicate)1616 CheckPredicate(Expr *predicate)
1617 {
1618 	/*
1619 	 * transformExpr() should have already rejected subqueries, aggregates,
1620 	 * and window functions, based on the EXPR_KIND_ for a predicate.
1621 	 */
1622 
1623 	/*
1624 	 * A predicate using mutable functions is probably wrong, for the same
1625 	 * reasons that we don't allow an index expression to use one.
1626 	 */
1627 	if (CheckMutability(predicate))
1628 		ereport(ERROR,
1629 				(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
1630 				 errmsg("functions in index predicate must be marked IMMUTABLE")));
1631 }
1632 
1633 /*
1634  * Compute per-index-column information, including indexed column numbers
1635  * or index expressions, opclasses and their options. Note, all output vectors
1636  * should be allocated for all columns, including "including" ones.
1637  */
1638 static void
ComputeIndexAttrs(IndexInfo * indexInfo,Oid * typeOidP,Oid * collationOidP,Oid * classOidP,int16 * colOptionP,List * attList,List * exclusionOpNames,Oid relId,const char * accessMethodName,Oid accessMethodId,bool amcanorder,bool isconstraint)1639 ComputeIndexAttrs(IndexInfo *indexInfo,
1640 				  Oid *typeOidP,
1641 				  Oid *collationOidP,
1642 				  Oid *classOidP,
1643 				  int16 *colOptionP,
1644 				  List *attList,	/* list of IndexElem's */
1645 				  List *exclusionOpNames,
1646 				  Oid relId,
1647 				  const char *accessMethodName,
1648 				  Oid accessMethodId,
1649 				  bool amcanorder,
1650 				  bool isconstraint)
1651 {
1652 	ListCell   *nextExclOp;
1653 	ListCell   *lc;
1654 	int			attn;
1655 	int			nkeycols = indexInfo->ii_NumIndexKeyAttrs;
1656 
1657 	/* Allocate space for exclusion operator info, if needed */
1658 	if (exclusionOpNames)
1659 	{
1660 		Assert(list_length(exclusionOpNames) == nkeycols);
1661 		indexInfo->ii_ExclusionOps = (Oid *) palloc(sizeof(Oid) * nkeycols);
1662 		indexInfo->ii_ExclusionProcs = (Oid *) palloc(sizeof(Oid) * nkeycols);
1663 		indexInfo->ii_ExclusionStrats = (uint16 *) palloc(sizeof(uint16) * nkeycols);
1664 		nextExclOp = list_head(exclusionOpNames);
1665 	}
1666 	else
1667 		nextExclOp = NULL;
1668 
1669 	/*
1670 	 * process attributeList
1671 	 */
1672 	attn = 0;
1673 	foreach(lc, attList)
1674 	{
1675 		IndexElem  *attribute = (IndexElem *) lfirst(lc);
1676 		Oid			atttype;
1677 		Oid			attcollation;
1678 
1679 		/*
1680 		 * Process the column-or-expression to be indexed.
1681 		 */
1682 		if (attribute->name != NULL)
1683 		{
1684 			/* Simple index attribute */
1685 			HeapTuple	atttuple;
1686 			Form_pg_attribute attform;
1687 
1688 			Assert(attribute->expr == NULL);
1689 			atttuple = SearchSysCacheAttName(relId, attribute->name);
1690 			if (!HeapTupleIsValid(atttuple))
1691 			{
1692 				/* difference in error message spellings is historical */
1693 				if (isconstraint)
1694 					ereport(ERROR,
1695 							(errcode(ERRCODE_UNDEFINED_COLUMN),
1696 							 errmsg("column \"%s\" named in key does not exist",
1697 									attribute->name)));
1698 				else
1699 					ereport(ERROR,
1700 							(errcode(ERRCODE_UNDEFINED_COLUMN),
1701 							 errmsg("column \"%s\" does not exist",
1702 									attribute->name)));
1703 			}
1704 			attform = (Form_pg_attribute) GETSTRUCT(atttuple);
1705 			indexInfo->ii_IndexAttrNumbers[attn] = attform->attnum;
1706 			atttype = attform->atttypid;
1707 			attcollation = attform->attcollation;
1708 			ReleaseSysCache(atttuple);
1709 		}
1710 		else
1711 		{
1712 			/* Index expression */
1713 			Node	   *expr = attribute->expr;
1714 
1715 			Assert(expr != NULL);
1716 
1717 			if (attn >= nkeycols)
1718 				ereport(ERROR,
1719 						(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1720 						 errmsg("expressions are not supported in included columns")));
1721 			atttype = exprType(expr);
1722 			attcollation = exprCollation(expr);
1723 
1724 			/*
1725 			 * Strip any top-level COLLATE clause.  This ensures that we treat
1726 			 * "x COLLATE y" and "(x COLLATE y)" alike.
1727 			 */
1728 			while (IsA(expr, CollateExpr))
1729 				expr = (Node *) ((CollateExpr *) expr)->arg;
1730 
1731 			if (IsA(expr, Var) &&
1732 				((Var *) expr)->varattno != InvalidAttrNumber)
1733 			{
1734 				/*
1735 				 * User wrote "(column)" or "(column COLLATE something)".
1736 				 * Treat it like simple attribute anyway.
1737 				 */
1738 				indexInfo->ii_IndexAttrNumbers[attn] = ((Var *) expr)->varattno;
1739 			}
1740 			else
1741 			{
1742 				indexInfo->ii_IndexAttrNumbers[attn] = 0;	/* marks expression */
1743 				indexInfo->ii_Expressions = lappend(indexInfo->ii_Expressions,
1744 													expr);
1745 
1746 				/*
1747 				 * transformExpr() should have already rejected subqueries,
1748 				 * aggregates, and window functions, based on the EXPR_KIND_
1749 				 * for an index expression.
1750 				 */
1751 
1752 				/*
1753 				 * An expression using mutable functions is probably wrong,
1754 				 * since if you aren't going to get the same result for the
1755 				 * same data every time, it's not clear what the index entries
1756 				 * mean at all.
1757 				 */
1758 				if (CheckMutability((Expr *) expr))
1759 					ereport(ERROR,
1760 							(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
1761 							 errmsg("functions in index expression must be marked IMMUTABLE")));
1762 			}
1763 		}
1764 
1765 		typeOidP[attn] = atttype;
1766 
1767 		/*
1768 		 * Included columns have no collation, no opclass and no ordering
1769 		 * options.
1770 		 */
1771 		if (attn >= nkeycols)
1772 		{
1773 			if (attribute->collation)
1774 				ereport(ERROR,
1775 						(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
1776 						 errmsg("including column does not support a collation")));
1777 			if (attribute->opclass)
1778 				ereport(ERROR,
1779 						(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
1780 						 errmsg("including column does not support an operator class")));
1781 			if (attribute->ordering != SORTBY_DEFAULT)
1782 				ereport(ERROR,
1783 						(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
1784 						 errmsg("including column does not support ASC/DESC options")));
1785 			if (attribute->nulls_ordering != SORTBY_NULLS_DEFAULT)
1786 				ereport(ERROR,
1787 						(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
1788 						 errmsg("including column does not support NULLS FIRST/LAST options")));
1789 
1790 			classOidP[attn] = InvalidOid;
1791 			colOptionP[attn] = 0;
1792 			collationOidP[attn] = InvalidOid;
1793 			attn++;
1794 
1795 			continue;
1796 		}
1797 
1798 		/*
1799 		 * Apply collation override if any
1800 		 */
1801 		if (attribute->collation)
1802 			attcollation = get_collation_oid(attribute->collation, false);
1803 
1804 		/*
1805 		 * Check we have a collation iff it's a collatable type.  The only
1806 		 * expected failures here are (1) COLLATE applied to a noncollatable
1807 		 * type, or (2) index expression had an unresolved collation.  But we
1808 		 * might as well code this to be a complete consistency check.
1809 		 */
1810 		if (type_is_collatable(atttype))
1811 		{
1812 			if (!OidIsValid(attcollation))
1813 				ereport(ERROR,
1814 						(errcode(ERRCODE_INDETERMINATE_COLLATION),
1815 						 errmsg("could not determine which collation to use for index expression"),
1816 						 errhint("Use the COLLATE clause to set the collation explicitly.")));
1817 		}
1818 		else
1819 		{
1820 			if (OidIsValid(attcollation))
1821 				ereport(ERROR,
1822 						(errcode(ERRCODE_DATATYPE_MISMATCH),
1823 						 errmsg("collations are not supported by type %s",
1824 								format_type_be(atttype))));
1825 		}
1826 
1827 		collationOidP[attn] = attcollation;
1828 
1829 		/*
1830 		 * Identify the opclass to use.
1831 		 */
1832 		classOidP[attn] = ResolveOpClass(attribute->opclass,
1833 										 atttype,
1834 										 accessMethodName,
1835 										 accessMethodId);
1836 
1837 		/*
1838 		 * Identify the exclusion operator, if any.
1839 		 */
1840 		if (nextExclOp)
1841 		{
1842 			List	   *opname = (List *) lfirst(nextExclOp);
1843 			Oid			opid;
1844 			Oid			opfamily;
1845 			int			strat;
1846 
1847 			/*
1848 			 * Find the operator --- it must accept the column datatype
1849 			 * without runtime coercion (but binary compatibility is OK)
1850 			 */
1851 			opid = compatible_oper_opid(opname, atttype, atttype, false);
1852 
1853 			/*
1854 			 * Only allow commutative operators to be used in exclusion
1855 			 * constraints. If X conflicts with Y, but Y does not conflict
1856 			 * with X, bad things will happen.
1857 			 */
1858 			if (get_commutator(opid) != opid)
1859 				ereport(ERROR,
1860 						(errcode(ERRCODE_WRONG_OBJECT_TYPE),
1861 						 errmsg("operator %s is not commutative",
1862 								format_operator(opid)),
1863 						 errdetail("Only commutative operators can be used in exclusion constraints.")));
1864 
1865 			/*
1866 			 * Operator must be a member of the right opfamily, too
1867 			 */
1868 			opfamily = get_opclass_family(classOidP[attn]);
1869 			strat = get_op_opfamily_strategy(opid, opfamily);
1870 			if (strat == 0)
1871 			{
1872 				HeapTuple	opftuple;
1873 				Form_pg_opfamily opfform;
1874 
1875 				/*
1876 				 * attribute->opclass might not explicitly name the opfamily,
1877 				 * so fetch the name of the selected opfamily for use in the
1878 				 * error message.
1879 				 */
1880 				opftuple = SearchSysCache1(OPFAMILYOID,
1881 										   ObjectIdGetDatum(opfamily));
1882 				if (!HeapTupleIsValid(opftuple))
1883 					elog(ERROR, "cache lookup failed for opfamily %u",
1884 						 opfamily);
1885 				opfform = (Form_pg_opfamily) GETSTRUCT(opftuple);
1886 
1887 				ereport(ERROR,
1888 						(errcode(ERRCODE_WRONG_OBJECT_TYPE),
1889 						 errmsg("operator %s is not a member of operator family \"%s\"",
1890 								format_operator(opid),
1891 								NameStr(opfform->opfname)),
1892 						 errdetail("The exclusion operator must be related to the index operator class for the constraint.")));
1893 			}
1894 
1895 			indexInfo->ii_ExclusionOps[attn] = opid;
1896 			indexInfo->ii_ExclusionProcs[attn] = get_opcode(opid);
1897 			indexInfo->ii_ExclusionStrats[attn] = strat;
1898 			nextExclOp = lnext(exclusionOpNames, nextExclOp);
1899 		}
1900 
1901 		/*
1902 		 * Set up the per-column options (indoption field).  For now, this is
1903 		 * zero for any un-ordered index, while ordered indexes have DESC and
1904 		 * NULLS FIRST/LAST options.
1905 		 */
1906 		colOptionP[attn] = 0;
1907 		if (amcanorder)
1908 		{
1909 			/* default ordering is ASC */
1910 			if (attribute->ordering == SORTBY_DESC)
1911 				colOptionP[attn] |= INDOPTION_DESC;
1912 			/* default null ordering is LAST for ASC, FIRST for DESC */
1913 			if (attribute->nulls_ordering == SORTBY_NULLS_DEFAULT)
1914 			{
1915 				if (attribute->ordering == SORTBY_DESC)
1916 					colOptionP[attn] |= INDOPTION_NULLS_FIRST;
1917 			}
1918 			else if (attribute->nulls_ordering == SORTBY_NULLS_FIRST)
1919 				colOptionP[attn] |= INDOPTION_NULLS_FIRST;
1920 		}
1921 		else
1922 		{
1923 			/* index AM does not support ordering */
1924 			if (attribute->ordering != SORTBY_DEFAULT)
1925 				ereport(ERROR,
1926 						(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1927 						 errmsg("access method \"%s\" does not support ASC/DESC options",
1928 								accessMethodName)));
1929 			if (attribute->nulls_ordering != SORTBY_NULLS_DEFAULT)
1930 				ereport(ERROR,
1931 						(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1932 						 errmsg("access method \"%s\" does not support NULLS FIRST/LAST options",
1933 								accessMethodName)));
1934 		}
1935 
1936 		/* Set up the per-column opclass options (attoptions field). */
1937 		if (attribute->opclassopts)
1938 		{
1939 			Assert(attn < nkeycols);
1940 
1941 			if (!indexInfo->ii_OpclassOptions)
1942 				indexInfo->ii_OpclassOptions =
1943 					palloc0(sizeof(Datum) * indexInfo->ii_NumIndexAttrs);
1944 
1945 			indexInfo->ii_OpclassOptions[attn] =
1946 				transformRelOptions((Datum) 0, attribute->opclassopts,
1947 									NULL, NULL, false, false);
1948 		}
1949 
1950 		attn++;
1951 	}
1952 }
1953 
1954 /*
1955  * Resolve possibly-defaulted operator class specification
1956  *
1957  * Note: This is used to resolve operator class specifications in index and
1958  * partition key definitions.
1959  */
1960 Oid
ResolveOpClass(List * opclass,Oid attrType,const char * accessMethodName,Oid accessMethodId)1961 ResolveOpClass(List *opclass, Oid attrType,
1962 			   const char *accessMethodName, Oid accessMethodId)
1963 {
1964 	char	   *schemaname;
1965 	char	   *opcname;
1966 	HeapTuple	tuple;
1967 	Form_pg_opclass opform;
1968 	Oid			opClassId,
1969 				opInputType;
1970 
1971 	if (opclass == NIL)
1972 	{
1973 		/* no operator class specified, so find the default */
1974 		opClassId = GetDefaultOpClass(attrType, accessMethodId);
1975 		if (!OidIsValid(opClassId))
1976 			ereport(ERROR,
1977 					(errcode(ERRCODE_UNDEFINED_OBJECT),
1978 					 errmsg("data type %s has no default operator class for access method \"%s\"",
1979 							format_type_be(attrType), accessMethodName),
1980 					 errhint("You must specify an operator class for the index or define a default operator class for the data type.")));
1981 		return opClassId;
1982 	}
1983 
1984 	/*
1985 	 * Specific opclass name given, so look up the opclass.
1986 	 */
1987 
1988 	/* deconstruct the name list */
1989 	DeconstructQualifiedName(opclass, &schemaname, &opcname);
1990 
1991 	if (schemaname)
1992 	{
1993 		/* Look in specific schema only */
1994 		Oid			namespaceId;
1995 
1996 		namespaceId = LookupExplicitNamespace(schemaname, false);
1997 		tuple = SearchSysCache3(CLAAMNAMENSP,
1998 								ObjectIdGetDatum(accessMethodId),
1999 								PointerGetDatum(opcname),
2000 								ObjectIdGetDatum(namespaceId));
2001 	}
2002 	else
2003 	{
2004 		/* Unqualified opclass name, so search the search path */
2005 		opClassId = OpclassnameGetOpcid(accessMethodId, opcname);
2006 		if (!OidIsValid(opClassId))
2007 			ereport(ERROR,
2008 					(errcode(ERRCODE_UNDEFINED_OBJECT),
2009 					 errmsg("operator class \"%s\" does not exist for access method \"%s\"",
2010 							opcname, accessMethodName)));
2011 		tuple = SearchSysCache1(CLAOID, ObjectIdGetDatum(opClassId));
2012 	}
2013 
2014 	if (!HeapTupleIsValid(tuple))
2015 		ereport(ERROR,
2016 				(errcode(ERRCODE_UNDEFINED_OBJECT),
2017 				 errmsg("operator class \"%s\" does not exist for access method \"%s\"",
2018 						NameListToString(opclass), accessMethodName)));
2019 
2020 	/*
2021 	 * Verify that the index operator class accepts this datatype.  Note we
2022 	 * will accept binary compatibility.
2023 	 */
2024 	opform = (Form_pg_opclass) GETSTRUCT(tuple);
2025 	opClassId = opform->oid;
2026 	opInputType = opform->opcintype;
2027 
2028 	if (!IsBinaryCoercible(attrType, opInputType))
2029 		ereport(ERROR,
2030 				(errcode(ERRCODE_DATATYPE_MISMATCH),
2031 				 errmsg("operator class \"%s\" does not accept data type %s",
2032 						NameListToString(opclass), format_type_be(attrType))));
2033 
2034 	ReleaseSysCache(tuple);
2035 
2036 	return opClassId;
2037 }
2038 
2039 /*
2040  * GetDefaultOpClass
2041  *
2042  * Given the OIDs of a datatype and an access method, find the default
2043  * operator class, if any.  Returns InvalidOid if there is none.
2044  */
2045 Oid
GetDefaultOpClass(Oid type_id,Oid am_id)2046 GetDefaultOpClass(Oid type_id, Oid am_id)
2047 {
2048 	Oid			result = InvalidOid;
2049 	int			nexact = 0;
2050 	int			ncompatible = 0;
2051 	int			ncompatiblepreferred = 0;
2052 	Relation	rel;
2053 	ScanKeyData skey[1];
2054 	SysScanDesc scan;
2055 	HeapTuple	tup;
2056 	TYPCATEGORY tcategory;
2057 
2058 	/* If it's a domain, look at the base type instead */
2059 	type_id = getBaseType(type_id);
2060 
2061 	tcategory = TypeCategory(type_id);
2062 
2063 	/*
2064 	 * We scan through all the opclasses available for the access method,
2065 	 * looking for one that is marked default and matches the target type
2066 	 * (either exactly or binary-compatibly, but prefer an exact match).
2067 	 *
2068 	 * We could find more than one binary-compatible match.  If just one is
2069 	 * for a preferred type, use that one; otherwise we fail, forcing the user
2070 	 * to specify which one he wants.  (The preferred-type special case is a
2071 	 * kluge for varchar: it's binary-compatible to both text and bpchar, so
2072 	 * we need a tiebreaker.)  If we find more than one exact match, then
2073 	 * someone put bogus entries in pg_opclass.
2074 	 */
2075 	rel = table_open(OperatorClassRelationId, AccessShareLock);
2076 
2077 	ScanKeyInit(&skey[0],
2078 				Anum_pg_opclass_opcmethod,
2079 				BTEqualStrategyNumber, F_OIDEQ,
2080 				ObjectIdGetDatum(am_id));
2081 
2082 	scan = systable_beginscan(rel, OpclassAmNameNspIndexId, true,
2083 							  NULL, 1, skey);
2084 
2085 	while (HeapTupleIsValid(tup = systable_getnext(scan)))
2086 	{
2087 		Form_pg_opclass opclass = (Form_pg_opclass) GETSTRUCT(tup);
2088 
2089 		/* ignore altogether if not a default opclass */
2090 		if (!opclass->opcdefault)
2091 			continue;
2092 		if (opclass->opcintype == type_id)
2093 		{
2094 			nexact++;
2095 			result = opclass->oid;
2096 		}
2097 		else if (nexact == 0 &&
2098 				 IsBinaryCoercible(type_id, opclass->opcintype))
2099 		{
2100 			if (IsPreferredType(tcategory, opclass->opcintype))
2101 			{
2102 				ncompatiblepreferred++;
2103 				result = opclass->oid;
2104 			}
2105 			else if (ncompatiblepreferred == 0)
2106 			{
2107 				ncompatible++;
2108 				result = opclass->oid;
2109 			}
2110 		}
2111 	}
2112 
2113 	systable_endscan(scan);
2114 
2115 	table_close(rel, AccessShareLock);
2116 
2117 	/* raise error if pg_opclass contains inconsistent data */
2118 	if (nexact > 1)
2119 		ereport(ERROR,
2120 				(errcode(ERRCODE_DUPLICATE_OBJECT),
2121 				 errmsg("there are multiple default operator classes for data type %s",
2122 						format_type_be(type_id))));
2123 
2124 	if (nexact == 1 ||
2125 		ncompatiblepreferred == 1 ||
2126 		(ncompatiblepreferred == 0 && ncompatible == 1))
2127 		return result;
2128 
2129 	return InvalidOid;
2130 }
2131 
2132 /*
2133  *	makeObjectName()
2134  *
2135  *	Create a name for an implicitly created index, sequence, constraint,
2136  *	extended statistics, etc.
2137  *
2138  *	The parameters are typically: the original table name, the original field
2139  *	name, and a "type" string (such as "seq" or "pkey").    The field name
2140  *	and/or type can be NULL if not relevant.
2141  *
2142  *	The result is a palloc'd string.
2143  *
2144  *	The basic result we want is "name1_name2_label", omitting "_name2" or
2145  *	"_label" when those parameters are NULL.  However, we must generate
2146  *	a name with less than NAMEDATALEN characters!  So, we truncate one or
2147  *	both names if necessary to make a short-enough string.  The label part
2148  *	is never truncated (so it had better be reasonably short).
2149  *
2150  *	The caller is responsible for checking uniqueness of the generated
2151  *	name and retrying as needed; retrying will be done by altering the
2152  *	"label" string (which is why we never truncate that part).
2153  */
2154 char *
makeObjectName(const char * name1,const char * name2,const char * label)2155 makeObjectName(const char *name1, const char *name2, const char *label)
2156 {
2157 	char	   *name;
2158 	int			overhead = 0;	/* chars needed for label and underscores */
2159 	int			availchars;		/* chars available for name(s) */
2160 	int			name1chars;		/* chars allocated to name1 */
2161 	int			name2chars;		/* chars allocated to name2 */
2162 	int			ndx;
2163 
2164 	name1chars = strlen(name1);
2165 	if (name2)
2166 	{
2167 		name2chars = strlen(name2);
2168 		overhead++;				/* allow for separating underscore */
2169 	}
2170 	else
2171 		name2chars = 0;
2172 	if (label)
2173 		overhead += strlen(label) + 1;
2174 
2175 	availchars = NAMEDATALEN - 1 - overhead;
2176 	Assert(availchars > 0);		/* else caller chose a bad label */
2177 
2178 	/*
2179 	 * If we must truncate,  preferentially truncate the longer name. This
2180 	 * logic could be expressed without a loop, but it's simple and obvious as
2181 	 * a loop.
2182 	 */
2183 	while (name1chars + name2chars > availchars)
2184 	{
2185 		if (name1chars > name2chars)
2186 			name1chars--;
2187 		else
2188 			name2chars--;
2189 	}
2190 
2191 	name1chars = pg_mbcliplen(name1, name1chars, name1chars);
2192 	if (name2)
2193 		name2chars = pg_mbcliplen(name2, name2chars, name2chars);
2194 
2195 	/* Now construct the string using the chosen lengths */
2196 	name = palloc(name1chars + name2chars + overhead + 1);
2197 	memcpy(name, name1, name1chars);
2198 	ndx = name1chars;
2199 	if (name2)
2200 	{
2201 		name[ndx++] = '_';
2202 		memcpy(name + ndx, name2, name2chars);
2203 		ndx += name2chars;
2204 	}
2205 	if (label)
2206 	{
2207 		name[ndx++] = '_';
2208 		strcpy(name + ndx, label);
2209 	}
2210 	else
2211 		name[ndx] = '\0';
2212 
2213 	return name;
2214 }
2215 
2216 /*
2217  * Select a nonconflicting name for a new relation.  This is ordinarily
2218  * used to choose index names (which is why it's here) but it can also
2219  * be used for sequences, or any autogenerated relation kind.
2220  *
2221  * name1, name2, and label are used the same way as for makeObjectName(),
2222  * except that the label can't be NULL; digits will be appended to the label
2223  * if needed to create a name that is unique within the specified namespace.
2224  *
2225  * If isconstraint is true, we also avoid choosing a name matching any
2226  * existing constraint in the same namespace.  (This is stricter than what
2227  * Postgres itself requires, but the SQL standard says that constraint names
2228  * should be unique within schemas, so we follow that for autogenerated
2229  * constraint names.)
2230  *
2231  * Note: it is theoretically possible to get a collision anyway, if someone
2232  * else chooses the same name concurrently.  This is fairly unlikely to be
2233  * a problem in practice, especially if one is holding an exclusive lock on
2234  * the relation identified by name1.  However, if choosing multiple names
2235  * within a single command, you'd better create the new object and do
2236  * CommandCounterIncrement before choosing the next one!
2237  *
2238  * Returns a palloc'd string.
2239  */
2240 char *
ChooseRelationName(const char * name1,const char * name2,const char * label,Oid namespaceid,bool isconstraint)2241 ChooseRelationName(const char *name1, const char *name2,
2242 				   const char *label, Oid namespaceid,
2243 				   bool isconstraint)
2244 {
2245 	int			pass = 0;
2246 	char	   *relname = NULL;
2247 	char		modlabel[NAMEDATALEN];
2248 
2249 	/* try the unmodified label first */
2250 	StrNCpy(modlabel, label, sizeof(modlabel));
2251 
2252 	for (;;)
2253 	{
2254 		relname = makeObjectName(name1, name2, modlabel);
2255 
2256 		if (!OidIsValid(get_relname_relid(relname, namespaceid)))
2257 		{
2258 			if (!isconstraint ||
2259 				!ConstraintNameExists(relname, namespaceid))
2260 				break;
2261 		}
2262 
2263 		/* found a conflict, so try a new name component */
2264 		pfree(relname);
2265 		snprintf(modlabel, sizeof(modlabel), "%s%d", label, ++pass);
2266 	}
2267 
2268 	return relname;
2269 }
2270 
2271 /*
2272  * Select the name to be used for an index.
2273  *
2274  * The argument list is pretty ad-hoc :-(
2275  */
2276 static char *
ChooseIndexName(const char * tabname,Oid namespaceId,List * colnames,List * exclusionOpNames,bool primary,bool isconstraint)2277 ChooseIndexName(const char *tabname, Oid namespaceId,
2278 				List *colnames, List *exclusionOpNames,
2279 				bool primary, bool isconstraint)
2280 {
2281 	char	   *indexname;
2282 
2283 	if (primary)
2284 	{
2285 		/* the primary key's name does not depend on the specific column(s) */
2286 		indexname = ChooseRelationName(tabname,
2287 									   NULL,
2288 									   "pkey",
2289 									   namespaceId,
2290 									   true);
2291 	}
2292 	else if (exclusionOpNames != NIL)
2293 	{
2294 		indexname = ChooseRelationName(tabname,
2295 									   ChooseIndexNameAddition(colnames),
2296 									   "excl",
2297 									   namespaceId,
2298 									   true);
2299 	}
2300 	else if (isconstraint)
2301 	{
2302 		indexname = ChooseRelationName(tabname,
2303 									   ChooseIndexNameAddition(colnames),
2304 									   "key",
2305 									   namespaceId,
2306 									   true);
2307 	}
2308 	else
2309 	{
2310 		indexname = ChooseRelationName(tabname,
2311 									   ChooseIndexNameAddition(colnames),
2312 									   "idx",
2313 									   namespaceId,
2314 									   false);
2315 	}
2316 
2317 	return indexname;
2318 }
2319 
2320 /*
2321  * Generate "name2" for a new index given the list of column names for it
2322  * (as produced by ChooseIndexColumnNames).  This will be passed to
2323  * ChooseRelationName along with the parent table name and a suitable label.
2324  *
2325  * We know that less than NAMEDATALEN characters will actually be used,
2326  * so we can truncate the result once we've generated that many.
2327  *
2328  * XXX See also ChooseForeignKeyConstraintNameAddition and
2329  * ChooseExtendedStatisticNameAddition.
2330  */
2331 static char *
ChooseIndexNameAddition(List * colnames)2332 ChooseIndexNameAddition(List *colnames)
2333 {
2334 	char		buf[NAMEDATALEN * 2];
2335 	int			buflen = 0;
2336 	ListCell   *lc;
2337 
2338 	buf[0] = '\0';
2339 	foreach(lc, colnames)
2340 	{
2341 		const char *name = (const char *) lfirst(lc);
2342 
2343 		if (buflen > 0)
2344 			buf[buflen++] = '_';	/* insert _ between names */
2345 
2346 		/*
2347 		 * At this point we have buflen <= NAMEDATALEN.  name should be less
2348 		 * than NAMEDATALEN already, but use strlcpy for paranoia.
2349 		 */
2350 		strlcpy(buf + buflen, name, NAMEDATALEN);
2351 		buflen += strlen(buf + buflen);
2352 		if (buflen >= NAMEDATALEN)
2353 			break;
2354 	}
2355 	return pstrdup(buf);
2356 }
2357 
2358 /*
2359  * Select the actual names to be used for the columns of an index, given the
2360  * list of IndexElems for the columns.  This is mostly about ensuring the
2361  * names are unique so we don't get a conflicting-attribute-names error.
2362  *
2363  * Returns a List of plain strings (char *, not String nodes).
2364  */
2365 static List *
ChooseIndexColumnNames(List * indexElems)2366 ChooseIndexColumnNames(List *indexElems)
2367 {
2368 	List	   *result = NIL;
2369 	ListCell   *lc;
2370 
2371 	foreach(lc, indexElems)
2372 	{
2373 		IndexElem  *ielem = (IndexElem *) lfirst(lc);
2374 		const char *origname;
2375 		const char *curname;
2376 		int			i;
2377 		char		buf[NAMEDATALEN];
2378 
2379 		/* Get the preliminary name from the IndexElem */
2380 		if (ielem->indexcolname)
2381 			origname = ielem->indexcolname; /* caller-specified name */
2382 		else if (ielem->name)
2383 			origname = ielem->name; /* simple column reference */
2384 		else
2385 			origname = "expr";	/* default name for expression */
2386 
2387 		/* If it conflicts with any previous column, tweak it */
2388 		curname = origname;
2389 		for (i = 1;; i++)
2390 		{
2391 			ListCell   *lc2;
2392 			char		nbuf[32];
2393 			int			nlen;
2394 
2395 			foreach(lc2, result)
2396 			{
2397 				if (strcmp(curname, (char *) lfirst(lc2)) == 0)
2398 					break;
2399 			}
2400 			if (lc2 == NULL)
2401 				break;			/* found nonconflicting name */
2402 
2403 			sprintf(nbuf, "%d", i);
2404 
2405 			/* Ensure generated names are shorter than NAMEDATALEN */
2406 			nlen = pg_mbcliplen(origname, strlen(origname),
2407 								NAMEDATALEN - 1 - strlen(nbuf));
2408 			memcpy(buf, origname, nlen);
2409 			strcpy(buf + nlen, nbuf);
2410 			curname = buf;
2411 		}
2412 
2413 		/* And attach to the result list */
2414 		result = lappend(result, pstrdup(curname));
2415 	}
2416 	return result;
2417 }
2418 
2419 /*
2420  * ReindexIndex
2421  *		Recreate a specific index.
2422  */
2423 void
ReindexIndex(RangeVar * indexRelation,int options,bool concurrent)2424 ReindexIndex(RangeVar *indexRelation, int options, bool concurrent)
2425 {
2426 	struct ReindexIndexCallbackState state;
2427 	Oid			indOid;
2428 	Relation	irel;
2429 	char		persistence;
2430 
2431 	/*
2432 	 * Find and lock index, and check permissions on table; use callback to
2433 	 * obtain lock on table first, to avoid deadlock hazard.  The lock level
2434 	 * used here must match the index lock obtained in reindex_index().
2435 	 *
2436 	 * If it's a temporary index, we will perform a non-concurrent reindex,
2437 	 * even if CONCURRENTLY was requested.  In that case, reindex_index() will
2438 	 * upgrade the lock, but that's OK, because other sessions can't hold
2439 	 * locks on our temporary table.
2440 	 */
2441 	state.concurrent = concurrent;
2442 	state.locked_table_oid = InvalidOid;
2443 	indOid = RangeVarGetRelidExtended(indexRelation,
2444 									  concurrent ? ShareUpdateExclusiveLock : AccessExclusiveLock,
2445 									  0,
2446 									  RangeVarCallbackForReindexIndex,
2447 									  &state);
2448 
2449 	/*
2450 	 * Obtain the current persistence of the existing index.  We already hold
2451 	 * lock on the index.
2452 	 */
2453 	irel = index_open(indOid, NoLock);
2454 
2455 	if (irel->rd_rel->relkind == RELKIND_PARTITIONED_INDEX)
2456 	{
2457 		ReindexPartitionedIndex(irel);
2458 		return;
2459 	}
2460 
2461 	persistence = irel->rd_rel->relpersistence;
2462 	index_close(irel, NoLock);
2463 
2464 	if (concurrent && persistence != RELPERSISTENCE_TEMP)
2465 		ReindexRelationConcurrently(indOid, options);
2466 	else
2467 		reindex_index(indOid, false, persistence,
2468 					  options | REINDEXOPT_REPORT_PROGRESS);
2469 }
2470 
2471 /*
2472  * Check permissions on table before acquiring relation lock; also lock
2473  * the heap before the RangeVarGetRelidExtended takes the index lock, to avoid
2474  * deadlocks.
2475  */
2476 static void
RangeVarCallbackForReindexIndex(const RangeVar * relation,Oid relId,Oid oldRelId,void * arg)2477 RangeVarCallbackForReindexIndex(const RangeVar *relation,
2478 								Oid relId, Oid oldRelId, void *arg)
2479 {
2480 	char		relkind;
2481 	struct ReindexIndexCallbackState *state = arg;
2482 	LOCKMODE	table_lockmode;
2483 
2484 	/*
2485 	 * Lock level here should match table lock in reindex_index() for
2486 	 * non-concurrent case and table locks used by index_concurrently_*() for
2487 	 * concurrent case.
2488 	 */
2489 	table_lockmode = state->concurrent ? ShareUpdateExclusiveLock : ShareLock;
2490 
2491 	/*
2492 	 * If we previously locked some other index's heap, and the name we're
2493 	 * looking up no longer refers to that relation, release the now-useless
2494 	 * lock.
2495 	 */
2496 	if (relId != oldRelId && OidIsValid(oldRelId))
2497 	{
2498 		UnlockRelationOid(state->locked_table_oid, table_lockmode);
2499 		state->locked_table_oid = InvalidOid;
2500 	}
2501 
2502 	/* If the relation does not exist, there's nothing more to do. */
2503 	if (!OidIsValid(relId))
2504 		return;
2505 
2506 	/*
2507 	 * If the relation does exist, check whether it's an index.  But note that
2508 	 * the relation might have been dropped between the time we did the name
2509 	 * lookup and now.  In that case, there's nothing to do.
2510 	 */
2511 	relkind = get_rel_relkind(relId);
2512 	if (!relkind)
2513 		return;
2514 	if (relkind != RELKIND_INDEX &&
2515 		relkind != RELKIND_PARTITIONED_INDEX)
2516 		ereport(ERROR,
2517 				(errcode(ERRCODE_WRONG_OBJECT_TYPE),
2518 				 errmsg("\"%s\" is not an index", relation->relname)));
2519 
2520 	/* Check permissions */
2521 	if (!pg_class_ownercheck(relId, GetUserId()))
2522 		aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_INDEX, relation->relname);
2523 
2524 	/* Lock heap before index to avoid deadlock. */
2525 	if (relId != oldRelId)
2526 	{
2527 		Oid			table_oid = IndexGetRelation(relId, true);
2528 
2529 		/*
2530 		 * If the OID isn't valid, it means the index was concurrently
2531 		 * dropped, which is not a problem for us; just return normally.
2532 		 */
2533 		if (OidIsValid(table_oid))
2534 		{
2535 			LockRelationOid(table_oid, table_lockmode);
2536 			state->locked_table_oid = table_oid;
2537 		}
2538 	}
2539 }
2540 
2541 /*
2542  * ReindexTable
2543  *		Recreate all indexes of a table (and of its toast table, if any)
2544  */
2545 Oid
ReindexTable(RangeVar * relation,int options,bool concurrent)2546 ReindexTable(RangeVar *relation, int options, bool concurrent)
2547 {
2548 	Oid			heapOid;
2549 	bool		result;
2550 
2551 	/*
2552 	 * The lock level used here should match reindex_relation().
2553 	 *
2554 	 * If it's a temporary table, we will perform a non-concurrent reindex,
2555 	 * even if CONCURRENTLY was requested.  In that case, reindex_relation()
2556 	 * will upgrade the lock, but that's OK, because other sessions can't hold
2557 	 * locks on our temporary table.
2558 	 */
2559 	heapOid = RangeVarGetRelidExtended(relation,
2560 									   concurrent ? ShareUpdateExclusiveLock : ShareLock,
2561 									   0,
2562 									   RangeVarCallbackOwnsTable, NULL);
2563 
2564 	if (concurrent && get_rel_persistence(heapOid) != RELPERSISTENCE_TEMP)
2565 	{
2566 		result = ReindexRelationConcurrently(heapOid, options);
2567 
2568 		if (!result)
2569 			ereport(NOTICE,
2570 					(errmsg("table \"%s\" has no indexes that can be reindexed concurrently",
2571 							relation->relname)));
2572 	}
2573 	else
2574 	{
2575 		result = reindex_relation(heapOid,
2576 								  REINDEX_REL_PROCESS_TOAST |
2577 								  REINDEX_REL_CHECK_CONSTRAINTS,
2578 								  options | REINDEXOPT_REPORT_PROGRESS);
2579 		if (!result)
2580 			ereport(NOTICE,
2581 					(errmsg("table \"%s\" has no indexes to reindex",
2582 							relation->relname)));
2583 	}
2584 
2585 	return heapOid;
2586 }
2587 
2588 /*
2589  * ReindexMultipleTables
2590  *		Recreate indexes of tables selected by objectName/objectKind.
2591  *
2592  * To reduce the probability of deadlocks, each table is reindexed in a
2593  * separate transaction, so we can release the lock on it right away.
2594  * That means this must not be called within a user transaction block!
2595  */
2596 void
ReindexMultipleTables(const char * objectName,ReindexObjectType objectKind,int options,bool concurrent)2597 ReindexMultipleTables(const char *objectName, ReindexObjectType objectKind,
2598 					  int options, bool concurrent)
2599 {
2600 	Oid			objectOid;
2601 	Relation	relationRelation;
2602 	TableScanDesc scan;
2603 	ScanKeyData scan_keys[1];
2604 	HeapTuple	tuple;
2605 	MemoryContext private_context;
2606 	MemoryContext old;
2607 	List	   *relids = NIL;
2608 	ListCell   *l;
2609 	int			num_keys;
2610 	bool		concurrent_warning = false;
2611 
2612 	AssertArg(objectName);
2613 	Assert(objectKind == REINDEX_OBJECT_SCHEMA ||
2614 		   objectKind == REINDEX_OBJECT_SYSTEM ||
2615 		   objectKind == REINDEX_OBJECT_DATABASE);
2616 
2617 	if (objectKind == REINDEX_OBJECT_SYSTEM && concurrent)
2618 		ereport(ERROR,
2619 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2620 				 errmsg("cannot reindex system catalogs concurrently")));
2621 
2622 	/*
2623 	 * Get OID of object to reindex, being the database currently being used
2624 	 * by session for a database or for system catalogs, or the schema defined
2625 	 * by caller. At the same time do permission checks that need different
2626 	 * processing depending on the object type.
2627 	 */
2628 	if (objectKind == REINDEX_OBJECT_SCHEMA)
2629 	{
2630 		objectOid = get_namespace_oid(objectName, false);
2631 
2632 		if (!pg_namespace_ownercheck(objectOid, GetUserId()))
2633 			aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_SCHEMA,
2634 						   objectName);
2635 	}
2636 	else
2637 	{
2638 		objectOid = MyDatabaseId;
2639 
2640 		if (strcmp(objectName, get_database_name(objectOid)) != 0)
2641 			ereport(ERROR,
2642 					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2643 					 errmsg("can only reindex the currently open database")));
2644 		if (!pg_database_ownercheck(objectOid, GetUserId()))
2645 			aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
2646 						   objectName);
2647 	}
2648 
2649 	/*
2650 	 * Create a memory context that will survive forced transaction commits we
2651 	 * do below.  Since it is a child of PortalContext, it will go away
2652 	 * eventually even if we suffer an error; there's no need for special
2653 	 * abort cleanup logic.
2654 	 */
2655 	private_context = AllocSetContextCreate(PortalContext,
2656 											"ReindexMultipleTables",
2657 											ALLOCSET_SMALL_SIZES);
2658 
2659 	/*
2660 	 * Define the search keys to find the objects to reindex. For a schema, we
2661 	 * select target relations using relnamespace, something not necessary for
2662 	 * a database-wide operation.
2663 	 */
2664 	if (objectKind == REINDEX_OBJECT_SCHEMA)
2665 	{
2666 		num_keys = 1;
2667 		ScanKeyInit(&scan_keys[0],
2668 					Anum_pg_class_relnamespace,
2669 					BTEqualStrategyNumber, F_OIDEQ,
2670 					ObjectIdGetDatum(objectOid));
2671 	}
2672 	else
2673 		num_keys = 0;
2674 
2675 	/*
2676 	 * Scan pg_class to build a list of the relations we need to reindex.
2677 	 *
2678 	 * We only consider plain relations and materialized views here (toast
2679 	 * rels will be processed indirectly by reindex_relation).
2680 	 */
2681 	relationRelation = table_open(RelationRelationId, AccessShareLock);
2682 	scan = table_beginscan_catalog(relationRelation, num_keys, scan_keys);
2683 	while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
2684 	{
2685 		Form_pg_class classtuple = (Form_pg_class) GETSTRUCT(tuple);
2686 		Oid			relid = classtuple->oid;
2687 
2688 		/*
2689 		 * Only regular tables and matviews can have indexes, so ignore any
2690 		 * other kind of relation.
2691 		 *
2692 		 * It is tempting to also consider partitioned tables here, but that
2693 		 * has the problem that if the children are in the same schema, they
2694 		 * would be processed twice.  Maybe we could have a separate list of
2695 		 * partitioned tables, and expand that afterwards into relids,
2696 		 * ignoring any duplicates.
2697 		 */
2698 		if (classtuple->relkind != RELKIND_RELATION &&
2699 			classtuple->relkind != RELKIND_MATVIEW)
2700 			continue;
2701 
2702 		/* Skip temp tables of other backends; we can't reindex them at all */
2703 		if (classtuple->relpersistence == RELPERSISTENCE_TEMP &&
2704 			!isTempNamespace(classtuple->relnamespace))
2705 			continue;
2706 
2707 		/* Check user/system classification, and optionally skip */
2708 		if (objectKind == REINDEX_OBJECT_SYSTEM &&
2709 			!IsSystemClass(relid, classtuple))
2710 			continue;
2711 
2712 		/*
2713 		 * The table can be reindexed if the user is superuser, the table
2714 		 * owner, or the database/schema owner (but in the latter case, only
2715 		 * if it's not a shared relation).  pg_class_ownercheck includes the
2716 		 * superuser case, and depending on objectKind we already know that
2717 		 * the user has permission to run REINDEX on this database or schema
2718 		 * per the permission checks at the beginning of this routine.
2719 		 */
2720 		if (classtuple->relisshared &&
2721 			!pg_class_ownercheck(relid, GetUserId()))
2722 			continue;
2723 
2724 		/*
2725 		 * Skip system tables, since index_create() would reject indexing them
2726 		 * concurrently (and it would likely fail if we tried).
2727 		 */
2728 		if (concurrent &&
2729 			IsCatalogRelationOid(relid))
2730 		{
2731 			if (!concurrent_warning)
2732 				ereport(WARNING,
2733 						(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2734 						 errmsg("cannot reindex system catalogs concurrently, skipping all")));
2735 			concurrent_warning = true;
2736 			continue;
2737 		}
2738 
2739 		/* Save the list of relation OIDs in private context */
2740 		old = MemoryContextSwitchTo(private_context);
2741 
2742 		/*
2743 		 * We always want to reindex pg_class first if it's selected to be
2744 		 * reindexed.  This ensures that if there is any corruption in
2745 		 * pg_class' indexes, they will be fixed before we process any other
2746 		 * tables.  This is critical because reindexing itself will try to
2747 		 * update pg_class.
2748 		 */
2749 		if (relid == RelationRelationId)
2750 			relids = lcons_oid(relid, relids);
2751 		else
2752 			relids = lappend_oid(relids, relid);
2753 
2754 		MemoryContextSwitchTo(old);
2755 	}
2756 	table_endscan(scan);
2757 	table_close(relationRelation, AccessShareLock);
2758 
2759 	/* Now reindex each rel in a separate transaction */
2760 	PopActiveSnapshot();
2761 	CommitTransactionCommand();
2762 	foreach(l, relids)
2763 	{
2764 		Oid			relid = lfirst_oid(l);
2765 
2766 		StartTransactionCommand();
2767 		/* functions in indexes may want a snapshot set */
2768 		PushActiveSnapshot(GetTransactionSnapshot());
2769 
2770 		if (concurrent && get_rel_persistence(relid) != RELPERSISTENCE_TEMP)
2771 		{
2772 			(void) ReindexRelationConcurrently(relid, options);
2773 			/* ReindexRelationConcurrently() does the verbose output */
2774 		}
2775 		else
2776 		{
2777 			bool		result;
2778 
2779 			result = reindex_relation(relid,
2780 									  REINDEX_REL_PROCESS_TOAST |
2781 									  REINDEX_REL_CHECK_CONSTRAINTS,
2782 									  options | REINDEXOPT_REPORT_PROGRESS);
2783 
2784 			if (result && (options & REINDEXOPT_VERBOSE))
2785 				ereport(INFO,
2786 						(errmsg("table \"%s.%s\" was reindexed",
2787 								get_namespace_name(get_rel_namespace(relid)),
2788 								get_rel_name(relid))));
2789 
2790 			PopActiveSnapshot();
2791 		}
2792 
2793 		CommitTransactionCommand();
2794 	}
2795 	StartTransactionCommand();
2796 
2797 	MemoryContextDelete(private_context);
2798 }
2799 
2800 
2801 /*
2802  * ReindexRelationConcurrently - process REINDEX CONCURRENTLY for given
2803  * relation OID
2804  *
2805  * 'relationOid' can either belong to an index, a table or a materialized
2806  * view.  For tables and materialized views, all its indexes will be rebuilt,
2807  * excluding invalid indexes and any indexes used in exclusion constraints,
2808  * but including its associated toast table indexes.  For indexes, the index
2809  * itself will be rebuilt.  If 'relationOid' belongs to a partitioned table
2810  * then we issue a warning to mention these are not yet supported.
2811  *
2812  * The locks taken on parent tables and involved indexes are kept until the
2813  * transaction is committed, at which point a session lock is taken on each
2814  * relation.  Both of these protect against concurrent schema changes.
2815  *
2816  * Returns true if any indexes have been rebuilt (including toast table's
2817  * indexes, when relevant), otherwise returns false.
2818  *
2819  * NOTE: This cannot be used on temporary relations.  A concurrent build would
2820  * cause issues with ON COMMIT actions triggered by the transactions of the
2821  * concurrent build.  Temporary relations are not subject to concurrent
2822  * concerns, so there's no need for the more complicated concurrent build,
2823  * anyway, and a non-concurrent reindex is more efficient.
2824  */
2825 static bool
ReindexRelationConcurrently(Oid relationOid,int options)2826 ReindexRelationConcurrently(Oid relationOid, int options)
2827 {
2828 	List	   *heapRelationIds = NIL;
2829 	List	   *indexIds = NIL;
2830 	List	   *newIndexIds = NIL;
2831 	List	   *relationLocks = NIL;
2832 	List	   *lockTags = NIL;
2833 	ListCell   *lc,
2834 			   *lc2;
2835 	MemoryContext private_context;
2836 	MemoryContext oldcontext;
2837 	char		relkind;
2838 	char	   *relationName = NULL;
2839 	char	   *relationNamespace = NULL;
2840 	PGRUsage	ru0;
2841 	const int	progress_index[] = {
2842 		PROGRESS_CREATEIDX_COMMAND,
2843 		PROGRESS_CREATEIDX_PHASE,
2844 		PROGRESS_CREATEIDX_INDEX_OID,
2845 		PROGRESS_CREATEIDX_ACCESS_METHOD_OID
2846 	};
2847 	int64		progress_vals[4];
2848 
2849 	/*
2850 	 * Create a memory context that will survive forced transaction commits we
2851 	 * do below.  Since it is a child of PortalContext, it will go away
2852 	 * eventually even if we suffer an error; there's no need for special
2853 	 * abort cleanup logic.
2854 	 */
2855 	private_context = AllocSetContextCreate(PortalContext,
2856 											"ReindexConcurrent",
2857 											ALLOCSET_SMALL_SIZES);
2858 
2859 	if (options & REINDEXOPT_VERBOSE)
2860 	{
2861 		/* Save data needed by REINDEX VERBOSE in private context */
2862 		oldcontext = MemoryContextSwitchTo(private_context);
2863 
2864 		relationName = get_rel_name(relationOid);
2865 		relationNamespace = get_namespace_name(get_rel_namespace(relationOid));
2866 
2867 		pg_rusage_init(&ru0);
2868 
2869 		MemoryContextSwitchTo(oldcontext);
2870 	}
2871 
2872 	relkind = get_rel_relkind(relationOid);
2873 
2874 	/*
2875 	 * Extract the list of indexes that are going to be rebuilt based on the
2876 	 * relation Oid given by caller.
2877 	 */
2878 	switch (relkind)
2879 	{
2880 		case RELKIND_RELATION:
2881 		case RELKIND_MATVIEW:
2882 		case RELKIND_TOASTVALUE:
2883 			{
2884 				/*
2885 				 * In the case of a relation, find all its indexes including
2886 				 * toast indexes.
2887 				 */
2888 				Relation	heapRelation;
2889 
2890 				/* Save the list of relation OIDs in private context */
2891 				oldcontext = MemoryContextSwitchTo(private_context);
2892 
2893 				/* Track this relation for session locks */
2894 				heapRelationIds = lappend_oid(heapRelationIds, relationOid);
2895 
2896 				MemoryContextSwitchTo(oldcontext);
2897 
2898 				if (IsCatalogRelationOid(relationOid))
2899 					ereport(ERROR,
2900 							(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2901 							 errmsg("cannot reindex system catalogs concurrently")));
2902 
2903 				/* Open relation to get its indexes */
2904 				heapRelation = table_open(relationOid, ShareUpdateExclusiveLock);
2905 
2906 				/* Add all the valid indexes of relation to list */
2907 				foreach(lc, RelationGetIndexList(heapRelation))
2908 				{
2909 					Oid			cellOid = lfirst_oid(lc);
2910 					Relation	indexRelation = index_open(cellOid,
2911 														   ShareUpdateExclusiveLock);
2912 
2913 					if (!indexRelation->rd_index->indisvalid)
2914 						ereport(WARNING,
2915 								(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2916 								 errmsg("cannot reindex invalid index \"%s.%s\" concurrently, skipping",
2917 										get_namespace_name(get_rel_namespace(cellOid)),
2918 										get_rel_name(cellOid))));
2919 					else if (indexRelation->rd_index->indisexclusion)
2920 						ereport(WARNING,
2921 								(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2922 								 errmsg("cannot reindex exclusion constraint index \"%s.%s\" concurrently, skipping",
2923 										get_namespace_name(get_rel_namespace(cellOid)),
2924 										get_rel_name(cellOid))));
2925 					else
2926 					{
2927 						/* Save the list of relation OIDs in private context */
2928 						oldcontext = MemoryContextSwitchTo(private_context);
2929 
2930 						indexIds = lappend_oid(indexIds, cellOid);
2931 
2932 						MemoryContextSwitchTo(oldcontext);
2933 					}
2934 
2935 					index_close(indexRelation, NoLock);
2936 				}
2937 
2938 				/* Also add the toast indexes */
2939 				if (OidIsValid(heapRelation->rd_rel->reltoastrelid))
2940 				{
2941 					Oid			toastOid = heapRelation->rd_rel->reltoastrelid;
2942 					Relation	toastRelation = table_open(toastOid,
2943 														   ShareUpdateExclusiveLock);
2944 
2945 					/* Save the list of relation OIDs in private context */
2946 					oldcontext = MemoryContextSwitchTo(private_context);
2947 
2948 					/* Track this relation for session locks */
2949 					heapRelationIds = lappend_oid(heapRelationIds, toastOid);
2950 
2951 					MemoryContextSwitchTo(oldcontext);
2952 
2953 					foreach(lc2, RelationGetIndexList(toastRelation))
2954 					{
2955 						Oid			cellOid = lfirst_oid(lc2);
2956 						Relation	indexRelation = index_open(cellOid,
2957 															   ShareUpdateExclusiveLock);
2958 
2959 						if (!indexRelation->rd_index->indisvalid)
2960 							ereport(WARNING,
2961 									(errcode(ERRCODE_INDEX_CORRUPTED),
2962 									 errmsg("cannot reindex invalid index \"%s.%s\" concurrently, skipping",
2963 											get_namespace_name(get_rel_namespace(cellOid)),
2964 											get_rel_name(cellOid))));
2965 						else
2966 						{
2967 							/*
2968 							 * Save the list of relation OIDs in private
2969 							 * context
2970 							 */
2971 							oldcontext = MemoryContextSwitchTo(private_context);
2972 
2973 							indexIds = lappend_oid(indexIds, cellOid);
2974 
2975 							MemoryContextSwitchTo(oldcontext);
2976 						}
2977 
2978 						index_close(indexRelation, NoLock);
2979 					}
2980 
2981 					table_close(toastRelation, NoLock);
2982 				}
2983 
2984 				table_close(heapRelation, NoLock);
2985 				break;
2986 			}
2987 		case RELKIND_INDEX:
2988 			{
2989 				Oid			heapId = IndexGetRelation(relationOid, false);
2990 
2991 				if (IsCatalogRelationOid(heapId))
2992 					ereport(ERROR,
2993 							(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2994 							 errmsg("cannot reindex system catalogs concurrently")));
2995 
2996 				/*
2997 				 * Don't allow reindex for an invalid index on TOAST table, as
2998 				 * if rebuilt it would not be possible to drop it.  Match
2999 				 * error message in reindex_index().
3000 				 */
3001 				if (IsToastNamespace(get_rel_namespace(relationOid)) &&
3002 					!get_index_isvalid(relationOid))
3003 					ereport(ERROR,
3004 							(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3005 							 errmsg("cannot reindex invalid index on TOAST table")));
3006 
3007 				/* Save the list of relation OIDs in private context */
3008 				oldcontext = MemoryContextSwitchTo(private_context);
3009 
3010 				/* Track the heap relation of this index for session locks */
3011 				heapRelationIds = list_make1_oid(heapId);
3012 
3013 				/*
3014 				 * Save the list of relation OIDs in private context.  Note
3015 				 * that invalid indexes are allowed here.
3016 				 */
3017 				indexIds = lappend_oid(indexIds, relationOid);
3018 
3019 				MemoryContextSwitchTo(oldcontext);
3020 				break;
3021 			}
3022 		case RELKIND_PARTITIONED_TABLE:
3023 			/* see reindex_relation() */
3024 			ereport(WARNING,
3025 					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3026 					 errmsg("REINDEX of partitioned tables is not yet implemented, skipping \"%s\"",
3027 							get_rel_name(relationOid))));
3028 			return false;
3029 		default:
3030 			/* Return error if type of relation is not supported */
3031 			ereport(ERROR,
3032 					(errcode(ERRCODE_WRONG_OBJECT_TYPE),
3033 					 errmsg("cannot reindex this type of relation concurrently")));
3034 			break;
3035 	}
3036 
3037 	/* Definitely no indexes, so leave */
3038 	if (indexIds == NIL)
3039 	{
3040 		PopActiveSnapshot();
3041 		return false;
3042 	}
3043 
3044 	Assert(heapRelationIds != NIL);
3045 
3046 	/*-----
3047 	 * Now we have all the indexes we want to process in indexIds.
3048 	 *
3049 	 * The phases now are:
3050 	 *
3051 	 * 1. create new indexes in the catalog
3052 	 * 2. build new indexes
3053 	 * 3. let new indexes catch up with tuples inserted in the meantime
3054 	 * 4. swap index names
3055 	 * 5. mark old indexes as dead
3056 	 * 6. drop old indexes
3057 	 *
3058 	 * We process each phase for all indexes before moving to the next phase,
3059 	 * for efficiency.
3060 	 */
3061 
3062 	/*
3063 	 * Phase 1 of REINDEX CONCURRENTLY
3064 	 *
3065 	 * Create a new index with the same properties as the old one, but it is
3066 	 * only registered in catalogs and will be built later.  Then get session
3067 	 * locks on all involved tables.  See analogous code in DefineIndex() for
3068 	 * more detailed comments.
3069 	 */
3070 
3071 	foreach(lc, indexIds)
3072 	{
3073 		char	   *concurrentName;
3074 		Oid			indexId = lfirst_oid(lc);
3075 		Oid			newIndexId;
3076 		Relation	indexRel;
3077 		Relation	heapRel;
3078 		Relation	newIndexRel;
3079 		LockRelId  *lockrelid;
3080 
3081 		indexRel = index_open(indexId, ShareUpdateExclusiveLock);
3082 		heapRel = table_open(indexRel->rd_index->indrelid,
3083 							 ShareUpdateExclusiveLock);
3084 
3085 		/* This function shouldn't be called for temporary relations. */
3086 		if (indexRel->rd_rel->relpersistence == RELPERSISTENCE_TEMP)
3087 			elog(ERROR, "cannot reindex a temporary table concurrently");
3088 
3089 		pgstat_progress_start_command(PROGRESS_COMMAND_CREATE_INDEX,
3090 									  RelationGetRelid(heapRel));
3091 		progress_vals[0] = PROGRESS_CREATEIDX_COMMAND_REINDEX_CONCURRENTLY;
3092 		progress_vals[1] = 0;	/* initializing */
3093 		progress_vals[2] = indexId;
3094 		progress_vals[3] = indexRel->rd_rel->relam;
3095 		pgstat_progress_update_multi_param(4, progress_index, progress_vals);
3096 
3097 		/* Choose a temporary relation name for the new index */
3098 		concurrentName = ChooseRelationName(get_rel_name(indexId),
3099 											NULL,
3100 											"ccnew",
3101 											get_rel_namespace(indexRel->rd_index->indrelid),
3102 											false);
3103 
3104 		/* Create new index definition based on given index */
3105 		newIndexId = index_concurrently_create_copy(heapRel,
3106 													indexId,
3107 													concurrentName);
3108 
3109 		/*
3110 		 * Now open the relation of the new index, a session-level lock is
3111 		 * also needed on it.
3112 		 */
3113 		newIndexRel = index_open(newIndexId, ShareUpdateExclusiveLock);
3114 
3115 		/*
3116 		 * Save the list of OIDs and locks in private context
3117 		 */
3118 		oldcontext = MemoryContextSwitchTo(private_context);
3119 
3120 		newIndexIds = lappend_oid(newIndexIds, newIndexId);
3121 
3122 		/*
3123 		 * Save lockrelid to protect each relation from drop then close
3124 		 * relations. The lockrelid on parent relation is not taken here to
3125 		 * avoid multiple locks taken on the same relation, instead we rely on
3126 		 * parentRelationIds built earlier.
3127 		 */
3128 		lockrelid = palloc(sizeof(*lockrelid));
3129 		*lockrelid = indexRel->rd_lockInfo.lockRelId;
3130 		relationLocks = lappend(relationLocks, lockrelid);
3131 		lockrelid = palloc(sizeof(*lockrelid));
3132 		*lockrelid = newIndexRel->rd_lockInfo.lockRelId;
3133 		relationLocks = lappend(relationLocks, lockrelid);
3134 
3135 		MemoryContextSwitchTo(oldcontext);
3136 
3137 		index_close(indexRel, NoLock);
3138 		index_close(newIndexRel, NoLock);
3139 		table_close(heapRel, NoLock);
3140 	}
3141 
3142 	/*
3143 	 * Save the heap lock for following visibility checks with other backends
3144 	 * might conflict with this session.
3145 	 */
3146 	foreach(lc, heapRelationIds)
3147 	{
3148 		Relation	heapRelation = table_open(lfirst_oid(lc), ShareUpdateExclusiveLock);
3149 		LockRelId  *lockrelid;
3150 		LOCKTAG    *heaplocktag;
3151 
3152 		/* Save the list of locks in private context */
3153 		oldcontext = MemoryContextSwitchTo(private_context);
3154 
3155 		/* Add lockrelid of heap relation to the list of locked relations */
3156 		lockrelid = palloc(sizeof(*lockrelid));
3157 		*lockrelid = heapRelation->rd_lockInfo.lockRelId;
3158 		relationLocks = lappend(relationLocks, lockrelid);
3159 
3160 		heaplocktag = (LOCKTAG *) palloc(sizeof(LOCKTAG));
3161 
3162 		/* Save the LOCKTAG for this parent relation for the wait phase */
3163 		SET_LOCKTAG_RELATION(*heaplocktag, lockrelid->dbId, lockrelid->relId);
3164 		lockTags = lappend(lockTags, heaplocktag);
3165 
3166 		MemoryContextSwitchTo(oldcontext);
3167 
3168 		/* Close heap relation */
3169 		table_close(heapRelation, NoLock);
3170 	}
3171 
3172 	/* Get a session-level lock on each table. */
3173 	foreach(lc, relationLocks)
3174 	{
3175 		LockRelId  *lockrelid = (LockRelId *) lfirst(lc);
3176 
3177 		LockRelationIdForSession(lockrelid, ShareUpdateExclusiveLock);
3178 	}
3179 
3180 	PopActiveSnapshot();
3181 	CommitTransactionCommand();
3182 	StartTransactionCommand();
3183 
3184 	/*
3185 	 * Phase 2 of REINDEX CONCURRENTLY
3186 	 *
3187 	 * Build the new indexes in a separate transaction for each index to avoid
3188 	 * having open transactions for an unnecessary long time.  But before
3189 	 * doing that, wait until no running transactions could have the table of
3190 	 * the index open with the old list of indexes.  See "phase 2" in
3191 	 * DefineIndex() for more details.
3192 	 */
3193 
3194 	pgstat_progress_update_param(PROGRESS_CREATEIDX_PHASE,
3195 								 PROGRESS_CREATEIDX_PHASE_WAIT_1);
3196 	WaitForLockersMultiple(lockTags, ShareLock, true);
3197 	CommitTransactionCommand();
3198 
3199 	foreach(lc, newIndexIds)
3200 	{
3201 		Relation	newIndexRel;
3202 		Oid			newIndexId = lfirst_oid(lc);
3203 		Oid			heapId;
3204 		Oid			indexam;
3205 
3206 		/* Start new transaction for this index's concurrent build */
3207 		StartTransactionCommand();
3208 
3209 		/*
3210 		 * Check for user-requested abort.  This is inside a transaction so as
3211 		 * xact.c does not issue a useless WARNING, and ensures that
3212 		 * session-level locks are cleaned up on abort.
3213 		 */
3214 		CHECK_FOR_INTERRUPTS();
3215 
3216 		/* Set ActiveSnapshot since functions in the indexes may need it */
3217 		PushActiveSnapshot(GetTransactionSnapshot());
3218 
3219 		/*
3220 		 * Index relation has been closed by previous commit, so reopen it to
3221 		 * get its information.
3222 		 */
3223 		newIndexRel = index_open(newIndexId, ShareUpdateExclusiveLock);
3224 		heapId = newIndexRel->rd_index->indrelid;
3225 		indexam = newIndexRel->rd_rel->relam;
3226 		index_close(newIndexRel, NoLock);
3227 
3228 		/*
3229 		 * Update progress for the index to build, with the correct parent
3230 		 * table involved.
3231 		 */
3232 		pgstat_progress_start_command(PROGRESS_COMMAND_CREATE_INDEX, heapId);
3233 		progress_vals[0] = PROGRESS_CREATEIDX_COMMAND_REINDEX_CONCURRENTLY;
3234 		progress_vals[1] = PROGRESS_CREATEIDX_PHASE_BUILD;
3235 		progress_vals[2] = newIndexId;
3236 		progress_vals[3] = indexam;
3237 		pgstat_progress_update_multi_param(4, progress_index, progress_vals);
3238 
3239 		/* Perform concurrent build of new index */
3240 		index_concurrently_build(heapId, newIndexId);
3241 
3242 		PopActiveSnapshot();
3243 		CommitTransactionCommand();
3244 	}
3245 	StartTransactionCommand();
3246 
3247 	/*
3248 	 * Phase 3 of REINDEX CONCURRENTLY
3249 	 *
3250 	 * During this phase the old indexes catch up with any new tuples that
3251 	 * were created during the previous phase.  See "phase 3" in DefineIndex()
3252 	 * for more details.
3253 	 */
3254 
3255 	pgstat_progress_update_param(PROGRESS_CREATEIDX_PHASE,
3256 								 PROGRESS_CREATEIDX_PHASE_WAIT_2);
3257 	WaitForLockersMultiple(lockTags, ShareLock, true);
3258 	CommitTransactionCommand();
3259 
3260 	foreach(lc, newIndexIds)
3261 	{
3262 		Oid			newIndexId = lfirst_oid(lc);
3263 		Oid			heapId;
3264 		TransactionId limitXmin;
3265 		Snapshot	snapshot;
3266 		Relation	newIndexRel;
3267 		Oid			indexam;
3268 
3269 		StartTransactionCommand();
3270 
3271 		/*
3272 		 * Check for user-requested abort.  This is inside a transaction so as
3273 		 * xact.c does not issue a useless WARNING, and ensures that
3274 		 * session-level locks are cleaned up on abort.
3275 		 */
3276 		CHECK_FOR_INTERRUPTS();
3277 
3278 		/*
3279 		 * Take the "reference snapshot" that will be used by validate_index()
3280 		 * to filter candidate tuples.
3281 		 */
3282 		snapshot = RegisterSnapshot(GetTransactionSnapshot());
3283 		PushActiveSnapshot(snapshot);
3284 
3285 		/*
3286 		 * Index relation has been closed by previous commit, so reopen it to
3287 		 * get its information.
3288 		 */
3289 		newIndexRel = index_open(newIndexId, ShareUpdateExclusiveLock);
3290 		heapId = newIndexRel->rd_index->indrelid;
3291 		indexam = newIndexRel->rd_rel->relam;
3292 		index_close(newIndexRel, NoLock);
3293 
3294 		/*
3295 		 * Update progress for the index to build, with the correct parent
3296 		 * table involved.
3297 		 */
3298 		pgstat_progress_start_command(PROGRESS_COMMAND_CREATE_INDEX, heapId);
3299 		progress_vals[0] = PROGRESS_CREATEIDX_COMMAND_REINDEX_CONCURRENTLY;
3300 		progress_vals[1] = PROGRESS_CREATEIDX_PHASE_VALIDATE_IDXSCAN;
3301 		progress_vals[2] = newIndexId;
3302 		progress_vals[3] = indexam;
3303 		pgstat_progress_update_multi_param(4, progress_index, progress_vals);
3304 
3305 		validate_index(heapId, newIndexId, snapshot);
3306 
3307 		/*
3308 		 * We can now do away with our active snapshot, we still need to save
3309 		 * the xmin limit to wait for older snapshots.
3310 		 */
3311 		limitXmin = snapshot->xmin;
3312 
3313 		PopActiveSnapshot();
3314 		UnregisterSnapshot(snapshot);
3315 
3316 		/*
3317 		 * To ensure no deadlocks, we must commit and start yet another
3318 		 * transaction, and do our wait before any snapshot has been taken in
3319 		 * it.
3320 		 */
3321 		CommitTransactionCommand();
3322 		StartTransactionCommand();
3323 
3324 		/*
3325 		 * The index is now valid in the sense that it contains all currently
3326 		 * interesting tuples.  But since it might not contain tuples deleted
3327 		 * just before the reference snap was taken, we have to wait out any
3328 		 * transactions that might have older snapshots.
3329 		 */
3330 		pgstat_progress_update_param(PROGRESS_CREATEIDX_PHASE,
3331 									 PROGRESS_CREATEIDX_PHASE_WAIT_3);
3332 		WaitForOlderSnapshots(limitXmin, true);
3333 
3334 		CommitTransactionCommand();
3335 	}
3336 
3337 	/*
3338 	 * Phase 4 of REINDEX CONCURRENTLY
3339 	 *
3340 	 * Now that the new indexes have been validated, swap each new index with
3341 	 * its corresponding old index.
3342 	 *
3343 	 * We mark the new indexes as valid and the old indexes as not valid at
3344 	 * the same time to make sure we only get constraint violations from the
3345 	 * indexes with the correct names.
3346 	 */
3347 
3348 	StartTransactionCommand();
3349 
3350 	forboth(lc, indexIds, lc2, newIndexIds)
3351 	{
3352 		char	   *oldName;
3353 		Oid			oldIndexId = lfirst_oid(lc);
3354 		Oid			newIndexId = lfirst_oid(lc2);
3355 		Oid			heapId;
3356 
3357 		/*
3358 		 * Check for user-requested abort.  This is inside a transaction so as
3359 		 * xact.c does not issue a useless WARNING, and ensures that
3360 		 * session-level locks are cleaned up on abort.
3361 		 */
3362 		CHECK_FOR_INTERRUPTS();
3363 
3364 		heapId = IndexGetRelation(oldIndexId, false);
3365 
3366 		/* Choose a relation name for old index */
3367 		oldName = ChooseRelationName(get_rel_name(oldIndexId),
3368 									 NULL,
3369 									 "ccold",
3370 									 get_rel_namespace(heapId),
3371 									 false);
3372 
3373 		/*
3374 		 * Swap old index with the new one.  This also marks the new one as
3375 		 * valid and the old one as not valid.
3376 		 */
3377 		index_concurrently_swap(newIndexId, oldIndexId, oldName);
3378 
3379 		/*
3380 		 * Invalidate the relcache for the table, so that after this commit
3381 		 * all sessions will refresh any cached plans that might reference the
3382 		 * index.
3383 		 */
3384 		CacheInvalidateRelcacheByRelid(heapId);
3385 
3386 		/*
3387 		 * CCI here so that subsequent iterations see the oldName in the
3388 		 * catalog and can choose a nonconflicting name for their oldName.
3389 		 * Otherwise, this could lead to conflicts if a table has two indexes
3390 		 * whose names are equal for the first NAMEDATALEN-minus-a-few
3391 		 * characters.
3392 		 */
3393 		CommandCounterIncrement();
3394 	}
3395 
3396 	/* Commit this transaction and make index swaps visible */
3397 	CommitTransactionCommand();
3398 	StartTransactionCommand();
3399 
3400 	/*
3401 	 * Phase 5 of REINDEX CONCURRENTLY
3402 	 *
3403 	 * Mark the old indexes as dead.  First we must wait until no running
3404 	 * transaction could be using the index for a query.  See also
3405 	 * index_drop() for more details.
3406 	 */
3407 
3408 	pgstat_progress_update_param(PROGRESS_CREATEIDX_PHASE,
3409 								 PROGRESS_CREATEIDX_PHASE_WAIT_4);
3410 	WaitForLockersMultiple(lockTags, AccessExclusiveLock, true);
3411 
3412 	foreach(lc, indexIds)
3413 	{
3414 		Oid			oldIndexId = lfirst_oid(lc);
3415 		Oid			heapId;
3416 
3417 		/*
3418 		 * Check for user-requested abort.  This is inside a transaction so as
3419 		 * xact.c does not issue a useless WARNING, and ensures that
3420 		 * session-level locks are cleaned up on abort.
3421 		 */
3422 		CHECK_FOR_INTERRUPTS();
3423 
3424 		heapId = IndexGetRelation(oldIndexId, false);
3425 		index_concurrently_set_dead(heapId, oldIndexId);
3426 	}
3427 
3428 	/* Commit this transaction to make the updates visible. */
3429 	CommitTransactionCommand();
3430 	StartTransactionCommand();
3431 
3432 	/*
3433 	 * Phase 6 of REINDEX CONCURRENTLY
3434 	 *
3435 	 * Drop the old indexes.
3436 	 */
3437 
3438 	pgstat_progress_update_param(PROGRESS_CREATEIDX_PHASE,
3439 								 PROGRESS_CREATEIDX_PHASE_WAIT_5);
3440 	WaitForLockersMultiple(lockTags, AccessExclusiveLock, true);
3441 
3442 	PushActiveSnapshot(GetTransactionSnapshot());
3443 
3444 	{
3445 		ObjectAddresses *objects = new_object_addresses();
3446 
3447 		foreach(lc, indexIds)
3448 		{
3449 			Oid			oldIndexId = lfirst_oid(lc);
3450 			ObjectAddress object;
3451 
3452 			object.classId = RelationRelationId;
3453 			object.objectId = oldIndexId;
3454 			object.objectSubId = 0;
3455 
3456 			add_exact_object_address(&object, objects);
3457 		}
3458 
3459 		/*
3460 		 * Use PERFORM_DELETION_CONCURRENT_LOCK so that index_drop() uses the
3461 		 * right lock level.
3462 		 */
3463 		performMultipleDeletions(objects, DROP_RESTRICT,
3464 								 PERFORM_DELETION_CONCURRENT_LOCK | PERFORM_DELETION_INTERNAL);
3465 	}
3466 
3467 	PopActiveSnapshot();
3468 	CommitTransactionCommand();
3469 
3470 	/*
3471 	 * Finally, release the session-level lock on the table.
3472 	 */
3473 	foreach(lc, relationLocks)
3474 	{
3475 		LockRelId  *lockrelid = (LockRelId *) lfirst(lc);
3476 
3477 		UnlockRelationIdForSession(lockrelid, ShareUpdateExclusiveLock);
3478 	}
3479 
3480 	/* Start a new transaction to finish process properly */
3481 	StartTransactionCommand();
3482 
3483 	/* Log what we did */
3484 	if (options & REINDEXOPT_VERBOSE)
3485 	{
3486 		if (relkind == RELKIND_INDEX)
3487 			ereport(INFO,
3488 					(errmsg("index \"%s.%s\" was reindexed",
3489 							relationNamespace, relationName),
3490 					 errdetail("%s.",
3491 							   pg_rusage_show(&ru0))));
3492 		else
3493 		{
3494 			foreach(lc, newIndexIds)
3495 			{
3496 				Oid			indOid = lfirst_oid(lc);
3497 
3498 				ereport(INFO,
3499 						(errmsg("index \"%s.%s\" was reindexed",
3500 								get_namespace_name(get_rel_namespace(indOid)),
3501 								get_rel_name(indOid))));
3502 				/* Don't show rusage here, since it's not per index. */
3503 			}
3504 
3505 			ereport(INFO,
3506 					(errmsg("table \"%s.%s\" was reindexed",
3507 							relationNamespace, relationName),
3508 					 errdetail("%s.",
3509 							   pg_rusage_show(&ru0))));
3510 		}
3511 	}
3512 
3513 	MemoryContextDelete(private_context);
3514 
3515 	pgstat_progress_end_command();
3516 
3517 	return true;
3518 }
3519 
3520 /*
3521  *	ReindexPartitionedIndex
3522  *		Reindex each child of the given partitioned index.
3523  *
3524  * Not yet implemented.
3525  */
3526 static void
ReindexPartitionedIndex(Relation parentIdx)3527 ReindexPartitionedIndex(Relation parentIdx)
3528 {
3529 	ereport(ERROR,
3530 			(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3531 			 errmsg("REINDEX is not yet implemented for partitioned indexes")));
3532 }
3533 
3534 /*
3535  * Insert or delete an appropriate pg_inherits tuple to make the given index
3536  * be a partition of the indicated parent index.
3537  *
3538  * This also corrects the pg_depend information for the affected index.
3539  */
3540 void
IndexSetParentIndex(Relation partitionIdx,Oid parentOid)3541 IndexSetParentIndex(Relation partitionIdx, Oid parentOid)
3542 {
3543 	Relation	pg_inherits;
3544 	ScanKeyData key[2];
3545 	SysScanDesc scan;
3546 	Oid			partRelid = RelationGetRelid(partitionIdx);
3547 	HeapTuple	tuple;
3548 	bool		fix_dependencies;
3549 
3550 	/* Make sure this is an index */
3551 	Assert(partitionIdx->rd_rel->relkind == RELKIND_INDEX ||
3552 		   partitionIdx->rd_rel->relkind == RELKIND_PARTITIONED_INDEX);
3553 
3554 	/*
3555 	 * Scan pg_inherits for rows linking our index to some parent.
3556 	 */
3557 	pg_inherits = relation_open(InheritsRelationId, RowExclusiveLock);
3558 	ScanKeyInit(&key[0],
3559 				Anum_pg_inherits_inhrelid,
3560 				BTEqualStrategyNumber, F_OIDEQ,
3561 				ObjectIdGetDatum(partRelid));
3562 	ScanKeyInit(&key[1],
3563 				Anum_pg_inherits_inhseqno,
3564 				BTEqualStrategyNumber, F_INT4EQ,
3565 				Int32GetDatum(1));
3566 	scan = systable_beginscan(pg_inherits, InheritsRelidSeqnoIndexId, true,
3567 							  NULL, 2, key);
3568 	tuple = systable_getnext(scan);
3569 
3570 	if (!HeapTupleIsValid(tuple))
3571 	{
3572 		if (parentOid == InvalidOid)
3573 		{
3574 			/*
3575 			 * No pg_inherits row, and no parent wanted: nothing to do in this
3576 			 * case.
3577 			 */
3578 			fix_dependencies = false;
3579 		}
3580 		else
3581 		{
3582 			StoreSingleInheritance(partRelid, parentOid, 1);
3583 			fix_dependencies = true;
3584 		}
3585 	}
3586 	else
3587 	{
3588 		Form_pg_inherits inhForm = (Form_pg_inherits) GETSTRUCT(tuple);
3589 
3590 		if (parentOid == InvalidOid)
3591 		{
3592 			/*
3593 			 * There exists a pg_inherits row, which we want to clear; do so.
3594 			 */
3595 			CatalogTupleDelete(pg_inherits, &tuple->t_self);
3596 			fix_dependencies = true;
3597 		}
3598 		else
3599 		{
3600 			/*
3601 			 * A pg_inherits row exists.  If it's the same we want, then we're
3602 			 * good; if it differs, that amounts to a corrupt catalog and
3603 			 * should not happen.
3604 			 */
3605 			if (inhForm->inhparent != parentOid)
3606 			{
3607 				/* unexpected: we should not get called in this case */
3608 				elog(ERROR, "bogus pg_inherit row: inhrelid %u inhparent %u",
3609 					 inhForm->inhrelid, inhForm->inhparent);
3610 			}
3611 
3612 			/* already in the right state */
3613 			fix_dependencies = false;
3614 		}
3615 	}
3616 
3617 	/* done with pg_inherits */
3618 	systable_endscan(scan);
3619 	relation_close(pg_inherits, RowExclusiveLock);
3620 
3621 	/* set relhassubclass if an index partition has been added to the parent */
3622 	if (OidIsValid(parentOid))
3623 		SetRelationHasSubclass(parentOid, true);
3624 
3625 	/* set relispartition correctly on the partition */
3626 	update_relispartition(partRelid, OidIsValid(parentOid));
3627 
3628 	if (fix_dependencies)
3629 	{
3630 		/*
3631 		 * Insert/delete pg_depend rows.  If setting a parent, add PARTITION
3632 		 * dependencies on the parent index and the table; if removing a
3633 		 * parent, delete PARTITION dependencies.
3634 		 */
3635 		if (OidIsValid(parentOid))
3636 		{
3637 			ObjectAddress partIdx;
3638 			ObjectAddress parentIdx;
3639 			ObjectAddress partitionTbl;
3640 
3641 			ObjectAddressSet(partIdx, RelationRelationId, partRelid);
3642 			ObjectAddressSet(parentIdx, RelationRelationId, parentOid);
3643 			ObjectAddressSet(partitionTbl, RelationRelationId,
3644 							 partitionIdx->rd_index->indrelid);
3645 			recordDependencyOn(&partIdx, &parentIdx,
3646 							   DEPENDENCY_PARTITION_PRI);
3647 			recordDependencyOn(&partIdx, &partitionTbl,
3648 							   DEPENDENCY_PARTITION_SEC);
3649 		}
3650 		else
3651 		{
3652 			deleteDependencyRecordsForClass(RelationRelationId, partRelid,
3653 											RelationRelationId,
3654 											DEPENDENCY_PARTITION_PRI);
3655 			deleteDependencyRecordsForClass(RelationRelationId, partRelid,
3656 											RelationRelationId,
3657 											DEPENDENCY_PARTITION_SEC);
3658 		}
3659 
3660 		/* make our updates visible */
3661 		CommandCounterIncrement();
3662 	}
3663 }
3664 
3665 /*
3666  * Subroutine of IndexSetParentIndex to update the relispartition flag of the
3667  * given index to the given value.
3668  */
3669 static void
update_relispartition(Oid relationId,bool newval)3670 update_relispartition(Oid relationId, bool newval)
3671 {
3672 	HeapTuple	tup;
3673 	Relation	classRel;
3674 
3675 	classRel = table_open(RelationRelationId, RowExclusiveLock);
3676 	tup = SearchSysCacheCopy1(RELOID, ObjectIdGetDatum(relationId));
3677 	if (!HeapTupleIsValid(tup))
3678 		elog(ERROR, "cache lookup failed for relation %u", relationId);
3679 	Assert(((Form_pg_class) GETSTRUCT(tup))->relispartition != newval);
3680 	((Form_pg_class) GETSTRUCT(tup))->relispartition = newval;
3681 	CatalogTupleUpdate(classRel, &tup->t_self, tup);
3682 	heap_freetuple(tup);
3683 	table_close(classRel, RowExclusiveLock);
3684 }
3685