1 /*-------------------------------------------------------------------------
2  *
3  * indexcmds.c
4  *	  POSTGRES define and remove index code.
5  *
6  * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *	  src/backend/commands/indexcmds.c
12  *
13  *-------------------------------------------------------------------------
14  */
15 
16 #include "postgres.h"
17 
18 #include "access/amapi.h"
19 #include "access/heapam.h"
20 #include "access/htup_details.h"
21 #include "access/reloptions.h"
22 #include "access/sysattr.h"
23 #include "access/tableam.h"
24 #include "access/xact.h"
25 #include "catalog/catalog.h"
26 #include "catalog/index.h"
27 #include "catalog/indexing.h"
28 #include "catalog/pg_am.h"
29 #include "catalog/pg_constraint.h"
30 #include "catalog/pg_inherits.h"
31 #include "catalog/pg_opclass.h"
32 #include "catalog/pg_opfamily.h"
33 #include "catalog/pg_tablespace.h"
34 #include "catalog/pg_type.h"
35 #include "commands/comment.h"
36 #include "commands/dbcommands.h"
37 #include "commands/defrem.h"
38 #include "commands/event_trigger.h"
39 #include "commands/progress.h"
40 #include "commands/tablecmds.h"
41 #include "commands/tablespace.h"
42 #include "mb/pg_wchar.h"
43 #include "miscadmin.h"
44 #include "nodes/makefuncs.h"
45 #include "nodes/nodeFuncs.h"
46 #include "optimizer/optimizer.h"
47 #include "parser/parse_coerce.h"
48 #include "parser/parse_func.h"
49 #include "parser/parse_oper.h"
50 #include "partitioning/partdesc.h"
51 #include "pgstat.h"
52 #include "rewrite/rewriteManip.h"
53 #include "storage/lmgr.h"
54 #include "storage/proc.h"
55 #include "storage/procarray.h"
56 #include "storage/sinvaladt.h"
57 #include "utils/acl.h"
58 #include "utils/builtins.h"
59 #include "utils/fmgroids.h"
60 #include "utils/inval.h"
61 #include "utils/lsyscache.h"
62 #include "utils/memutils.h"
63 #include "utils/partcache.h"
64 #include "utils/pg_rusage.h"
65 #include "utils/regproc.h"
66 #include "utils/snapmgr.h"
67 #include "utils/syscache.h"
68 
69 
/*
 * non-export function prototypes
 *
 * Every function declared here has internal linkage ("static"), so it can
 * only be called from within this file.  The prototypes are collected here
 * so that callers may appear before the corresponding definitions.
 */
static void CheckPredicate(Expr *predicate);
static void ComputeIndexAttrs(IndexInfo *indexInfo,
							  Oid *typeOidP,
							  Oid *collationOidP,
							  Oid *classOidP,
							  int16 *colOptionP,
							  List *attList,
							  List *exclusionOpNames,
							  Oid relId,
							  const char *accessMethodName, Oid accessMethodId,
							  bool amcanorder,
							  bool isconstraint);
static char *ChooseIndexName(const char *tabname, Oid namespaceId,
							 List *colnames, List *exclusionOpNames,
							 bool primary, bool isconstraint);
static char *ChooseIndexNameAddition(List *colnames);
static List *ChooseIndexColumnNames(List *indexElems);
static void RangeVarCallbackForReindexIndex(const RangeVar *relation,
											Oid relId, Oid oldRelId, void *arg);
static bool ReindexRelationConcurrently(Oid relationOid, int options);
static void ReindexPartitionedIndex(Relation parentIdx);
static void update_relispartition(Oid relationId, bool newval);
93 
/*
 * callback argument type for RangeVarCallbackForReindexIndex()
 *
 * NOTE(review): presumably handed to that callback through its "void *arg"
 * parameter; the callback body is not visible in this part of the file, so
 * confirm how each field is consumed there.
 */
struct ReindexIndexCallbackState
{
	bool		concurrent;		/* flag from statement */
	Oid			locked_table_oid;	/* tracks previously locked table */
};
102 
103 /*
104  * CheckIndexCompatible
105  *		Determine whether an existing index definition is compatible with a
106  *		prospective index definition, such that the existing index storage
107  *		could become the storage of the new index, avoiding a rebuild.
108  *
109  * 'heapRelation': the relation the index would apply to.
110  * 'accessMethodName': name of the AM to use.
111  * 'attributeList': a list of IndexElem specifying columns and expressions
112  *		to index on.
113  * 'exclusionOpNames': list of names of exclusion-constraint operators,
114  *		or NIL if not an exclusion constraint.
115  *
116  * This is tailored to the needs of ALTER TABLE ALTER TYPE, which recreates
117  * any indexes that depended on a changing column from their pg_get_indexdef
118  * or pg_get_constraintdef definitions.  We omit some of the sanity checks of
119  * DefineIndex.  We assume that the old and new indexes have the same number
120  * of columns and that if one has an expression column or predicate, both do.
121  * Errors arising from the attribute list still apply.
122  *
123  * Most column type changes that can skip a table rewrite do not invalidate
124  * indexes.  We acknowledge this when all operator classes, collations and
125  * exclusion operators match.  Though we could further permit intra-opfamily
126  * changes for btree and hash indexes, that adds subtle complexity with no
127  * concrete benefit for core types. Note, that INCLUDE columns aren't
128  * checked by this function, for them it's enough that table rewrite is
129  * skipped.
130  *
131  * When a comparison or exclusion operator has a polymorphic input type, the
132  * actual input types must also match.  This defends against the possibility
133  * that operators could vary behavior in response to get_fn_expr_argtype().
134  * At present, this hazard is theoretical: check_exclusion_constraint() and
135  * all core index access methods decline to set fn_expr for such calls.
136  *
137  * We do not yet implement a test to verify compatibility of expression
138  * columns or predicates, so assume any such index is incompatible.
139  */
140 bool
141 CheckIndexCompatible(Oid oldId,
142 					 const char *accessMethodName,
143 					 List *attributeList,
144 					 List *exclusionOpNames)
145 {
146 	bool		isconstraint;
147 	Oid		   *typeObjectId;
148 	Oid		   *collationObjectId;
149 	Oid		   *classObjectId;
150 	Oid			accessMethodId;
151 	Oid			relationId;
152 	HeapTuple	tuple;
153 	Form_pg_index indexForm;
154 	Form_pg_am	accessMethodForm;
155 	IndexAmRoutine *amRoutine;
156 	bool		amcanorder;
157 	int16	   *coloptions;
158 	IndexInfo  *indexInfo;
159 	int			numberOfAttributes;
160 	int			old_natts;
161 	bool		isnull;
162 	bool		ret = true;
163 	oidvector  *old_indclass;
164 	oidvector  *old_indcollation;
165 	Relation	irel;
166 	int			i;
167 	Datum		d;
168 
169 	/* Caller should already have the relation locked in some way. */
170 	relationId = IndexGetRelation(oldId, false);
171 
172 	/*
173 	 * We can pretend isconstraint = false unconditionally.  It only serves to
174 	 * decide the text of an error message that should never happen for us.
175 	 */
176 	isconstraint = false;
177 
178 	numberOfAttributes = list_length(attributeList);
179 	Assert(numberOfAttributes > 0);
180 	Assert(numberOfAttributes <= INDEX_MAX_KEYS);
181 
182 	/* look up the access method */
183 	tuple = SearchSysCache1(AMNAME, PointerGetDatum(accessMethodName));
184 	if (!HeapTupleIsValid(tuple))
185 		ereport(ERROR,
186 				(errcode(ERRCODE_UNDEFINED_OBJECT),
187 				 errmsg("access method \"%s\" does not exist",
188 						accessMethodName)));
189 	accessMethodForm = (Form_pg_am) GETSTRUCT(tuple);
190 	accessMethodId = accessMethodForm->oid;
191 	amRoutine = GetIndexAmRoutine(accessMethodForm->amhandler);
192 	ReleaseSysCache(tuple);
193 
194 	amcanorder = amRoutine->amcanorder;
195 
196 	/*
197 	 * Compute the operator classes, collations, and exclusion operators for
198 	 * the new index, so we can test whether it's compatible with the existing
199 	 * one.  Note that ComputeIndexAttrs might fail here, but that's OK:
200 	 * DefineIndex would have called this function with the same arguments
201 	 * later on, and it would have failed then anyway.  Our attributeList
202 	 * contains only key attributes, thus we're filling ii_NumIndexAttrs and
203 	 * ii_NumIndexKeyAttrs with same value.
204 	 */
205 	indexInfo = makeIndexInfo(numberOfAttributes, numberOfAttributes,
206 							  accessMethodId, NIL, NIL, false, false, false);
207 	typeObjectId = (Oid *) palloc(numberOfAttributes * sizeof(Oid));
208 	collationObjectId = (Oid *) palloc(numberOfAttributes * sizeof(Oid));
209 	classObjectId = (Oid *) palloc(numberOfAttributes * sizeof(Oid));
210 	coloptions = (int16 *) palloc(numberOfAttributes * sizeof(int16));
211 	ComputeIndexAttrs(indexInfo,
212 					  typeObjectId, collationObjectId, classObjectId,
213 					  coloptions, attributeList,
214 					  exclusionOpNames, relationId,
215 					  accessMethodName, accessMethodId,
216 					  amcanorder, isconstraint);
217 
218 
219 	/* Get the soon-obsolete pg_index tuple. */
220 	tuple = SearchSysCache1(INDEXRELID, ObjectIdGetDatum(oldId));
221 	if (!HeapTupleIsValid(tuple))
222 		elog(ERROR, "cache lookup failed for index %u", oldId);
223 	indexForm = (Form_pg_index) GETSTRUCT(tuple);
224 
225 	/*
226 	 * We don't assess expressions or predicates; assume incompatibility.
227 	 * Also, if the index is invalid for any reason, treat it as incompatible.
228 	 */
229 	if (!(heap_attisnull(tuple, Anum_pg_index_indpred, NULL) &&
230 		  heap_attisnull(tuple, Anum_pg_index_indexprs, NULL) &&
231 		  indexForm->indisvalid))
232 	{
233 		ReleaseSysCache(tuple);
234 		return false;
235 	}
236 
237 	/* Any change in operator class or collation breaks compatibility. */
238 	old_natts = indexForm->indnkeyatts;
239 	Assert(old_natts == numberOfAttributes);
240 
241 	d = SysCacheGetAttr(INDEXRELID, tuple, Anum_pg_index_indcollation, &isnull);
242 	Assert(!isnull);
243 	old_indcollation = (oidvector *) DatumGetPointer(d);
244 
245 	d = SysCacheGetAttr(INDEXRELID, tuple, Anum_pg_index_indclass, &isnull);
246 	Assert(!isnull);
247 	old_indclass = (oidvector *) DatumGetPointer(d);
248 
249 	ret = (memcmp(old_indclass->values, classObjectId,
250 				  old_natts * sizeof(Oid)) == 0 &&
251 		   memcmp(old_indcollation->values, collationObjectId,
252 				  old_natts * sizeof(Oid)) == 0);
253 
254 	ReleaseSysCache(tuple);
255 
256 	if (!ret)
257 		return false;
258 
259 	/* For polymorphic opcintype, column type changes break compatibility. */
260 	irel = index_open(oldId, AccessShareLock);	/* caller probably has a lock */
261 	for (i = 0; i < old_natts; i++)
262 	{
263 		if (IsPolymorphicType(get_opclass_input_type(classObjectId[i])) &&
264 			TupleDescAttr(irel->rd_att, i)->atttypid != typeObjectId[i])
265 		{
266 			ret = false;
267 			break;
268 		}
269 	}
270 
271 	/* Any change in exclusion operator selections breaks compatibility. */
272 	if (ret && indexInfo->ii_ExclusionOps != NULL)
273 	{
274 		Oid		   *old_operators,
275 				   *old_procs;
276 		uint16	   *old_strats;
277 
278 		RelationGetExclusionInfo(irel, &old_operators, &old_procs, &old_strats);
279 		ret = memcmp(old_operators, indexInfo->ii_ExclusionOps,
280 					 old_natts * sizeof(Oid)) == 0;
281 
282 		/* Require an exact input type match for polymorphic operators. */
283 		if (ret)
284 		{
285 			for (i = 0; i < old_natts && ret; i++)
286 			{
287 				Oid			left,
288 							right;
289 
290 				op_input_types(indexInfo->ii_ExclusionOps[i], &left, &right);
291 				if ((IsPolymorphicType(left) || IsPolymorphicType(right)) &&
292 					TupleDescAttr(irel->rd_att, i)->atttypid != typeObjectId[i])
293 				{
294 					ret = false;
295 					break;
296 				}
297 			}
298 		}
299 	}
300 
301 	index_close(irel, NoLock);
302 	return ret;
303 }
304 
305 
306 /*
307  * WaitForOlderSnapshots
308  *
309  * Wait for transactions that might have an older snapshot than the given xmin
310  * limit, because it might not contain tuples deleted just before it has
311  * been taken. Obtain a list of VXIDs of such transactions, and wait for them
312  * individually. This is used when building an index concurrently.
313  *
314  * We can exclude any running transactions that have xmin > the xmin given;
315  * their oldest snapshot must be newer than our xmin limit.
316  * We can also exclude any transactions that have xmin = zero, since they
317  * evidently have no live snapshot at all (and any one they might be in
318  * process of taking is certainly newer than ours).  Transactions in other
319  * DBs can be ignored too, since they'll never even be able to see the
320  * index being worked on.
321  *
322  * We can also exclude autovacuum processes and processes running manual
323  * lazy VACUUMs, because they won't be fazed by missing index entries
324  * either.  (Manual ANALYZEs, however, can't be excluded because they
325  * might be within transactions that are going to do arbitrary operations
326  * later.)
327  *
328  * Also, GetCurrentVirtualXIDs never reports our own vxid, so we need not
329  * check for that.
330  *
331  * If a process goes idle-in-transaction with xmin zero, we do not need to
332  * wait for it anymore, per the above argument.  We do not have the
333  * infrastructure right now to stop waiting if that happens, but we can at
334  * least avoid the folly of waiting when it is idle at the time we would
335  * begin to wait.  We do this by repeatedly rechecking the output of
336  * GetCurrentVirtualXIDs.  If, during any iteration, a particular vxid
337  * doesn't show up in the output, we know we can forget about it.
338  */
339 static void
340 WaitForOlderSnapshots(TransactionId limitXmin, bool progress)
341 {
342 	int			n_old_snapshots;
343 	int			i;
344 	VirtualTransactionId *old_snapshots;
345 
346 	old_snapshots = GetCurrentVirtualXIDs(limitXmin, true, false,
347 										  PROC_IS_AUTOVACUUM | PROC_IN_VACUUM,
348 										  &n_old_snapshots);
349 	if (progress)
350 		pgstat_progress_update_param(PROGRESS_WAITFOR_TOTAL, n_old_snapshots);
351 
352 	for (i = 0; i < n_old_snapshots; i++)
353 	{
354 		if (!VirtualTransactionIdIsValid(old_snapshots[i]))
355 			continue;			/* found uninteresting in previous cycle */
356 
357 		if (i > 0)
358 		{
359 			/* see if anything's changed ... */
360 			VirtualTransactionId *newer_snapshots;
361 			int			n_newer_snapshots;
362 			int			j;
363 			int			k;
364 
365 			newer_snapshots = GetCurrentVirtualXIDs(limitXmin,
366 													true, false,
367 													PROC_IS_AUTOVACUUM | PROC_IN_VACUUM,
368 													&n_newer_snapshots);
369 			for (j = i; j < n_old_snapshots; j++)
370 			{
371 				if (!VirtualTransactionIdIsValid(old_snapshots[j]))
372 					continue;	/* found uninteresting in previous cycle */
373 				for (k = 0; k < n_newer_snapshots; k++)
374 				{
375 					if (VirtualTransactionIdEquals(old_snapshots[j],
376 												   newer_snapshots[k]))
377 						break;
378 				}
379 				if (k >= n_newer_snapshots) /* not there anymore */
380 					SetInvalidVirtualTransactionId(old_snapshots[j]);
381 			}
382 			pfree(newer_snapshots);
383 		}
384 
385 		if (VirtualTransactionIdIsValid(old_snapshots[i]))
386 		{
387 			/* If requested, publish who we're going to wait for. */
388 			if (progress)
389 			{
390 				PGPROC	   *holder = BackendIdGetProc(old_snapshots[i].backendId);
391 
392 				if (holder)
393 					pgstat_progress_update_param(PROGRESS_WAITFOR_CURRENT_PID,
394 												 holder->pid);
395 			}
396 			VirtualXactLock(old_snapshots[i], true);
397 		}
398 
399 		if (progress)
400 			pgstat_progress_update_param(PROGRESS_WAITFOR_DONE, i + 1);
401 	}
402 }
403 
404 
405 /*
406  * DefineIndex
407  *		Creates a new index.
408  *
409  * 'relationId': the OID of the heap relation on which the index is to be
410  *		created
411  * 'stmt': IndexStmt describing the properties of the new index.
412  * 'indexRelationId': normally InvalidOid, but during bootstrap can be
413  *		nonzero to specify a preselected OID for the index.
414  * 'parentIndexId': the OID of the parent index; InvalidOid if not the child
415  *		of a partitioned index.
416  * 'parentConstraintId': the OID of the parent constraint; InvalidOid if not
417  *		the child of a constraint (only used when recursing)
418  * 'is_alter_table': this is due to an ALTER rather than a CREATE operation.
419  * 'check_rights': check for CREATE rights in namespace and tablespace.  (This
420  *		should be true except when ALTER is deleting/recreating an index.)
421  * 'check_not_in_use': check for table not already in use in current session.
422  *		This should be true unless caller is holding the table open, in which
423  *		case the caller had better have checked it earlier.
424  * 'skip_build': make the catalog entries but don't create the index files
425  * 'quiet': suppress the NOTICE chatter ordinarily provided for constraints.
426  *
427  * Returns the object address of the created index.
428  */
429 ObjectAddress
430 DefineIndex(Oid relationId,
431 			IndexStmt *stmt,
432 			Oid indexRelationId,
433 			Oid parentIndexId,
434 			Oid parentConstraintId,
435 			bool is_alter_table,
436 			bool check_rights,
437 			bool check_not_in_use,
438 			bool skip_build,
439 			bool quiet)
440 {
441 	bool		concurrent;
442 	char	   *indexRelationName;
443 	char	   *accessMethodName;
444 	Oid		   *typeObjectId;
445 	Oid		   *collationObjectId;
446 	Oid		   *classObjectId;
447 	Oid			accessMethodId;
448 	Oid			namespaceId;
449 	Oid			tablespaceId;
450 	Oid			createdConstraintId = InvalidOid;
451 	List	   *indexColNames;
452 	List	   *allIndexParams;
453 	Relation	rel;
454 	HeapTuple	tuple;
455 	Form_pg_am	accessMethodForm;
456 	IndexAmRoutine *amRoutine;
457 	bool		amcanorder;
458 	amoptions_function amoptions;
459 	bool		partitioned;
460 	Datum		reloptions;
461 	int16	   *coloptions;
462 	IndexInfo  *indexInfo;
463 	bits16		flags;
464 	bits16		constr_flags;
465 	int			numberOfAttributes;
466 	int			numberOfKeyAttributes;
467 	TransactionId limitXmin;
468 	ObjectAddress address;
469 	LockRelId	heaprelid;
470 	LOCKTAG		heaplocktag;
471 	LOCKMODE	lockmode;
472 	Snapshot	snapshot;
473 	int			save_nestlevel = -1;
474 	int			i;
475 
476 	/*
477 	 * Some callers need us to run with an empty default_tablespace; this is a
478 	 * necessary hack to be able to reproduce catalog state accurately when
479 	 * recreating indexes after table-rewriting ALTER TABLE.
480 	 */
481 	if (stmt->reset_default_tblspc)
482 	{
483 		save_nestlevel = NewGUCNestLevel();
484 		(void) set_config_option("default_tablespace", "",
485 								 PGC_USERSET, PGC_S_SESSION,
486 								 GUC_ACTION_SAVE, true, 0, false);
487 	}
488 
489 	/*
490 	 * Force non-concurrent build on temporary relations, even if CONCURRENTLY
491 	 * was requested.  Other backends can't access a temporary relation, so
492 	 * there's no harm in grabbing a stronger lock, and a non-concurrent DROP
493 	 * is more efficient.  Do this before any use of the concurrent option is
494 	 * done.
495 	 */
496 	if (stmt->concurrent && get_rel_persistence(relationId) != RELPERSISTENCE_TEMP)
497 		concurrent = true;
498 	else
499 		concurrent = false;
500 
501 	/*
502 	 * Start progress report.  If we're building a partition, this was already
503 	 * done.
504 	 */
505 	if (!OidIsValid(parentIndexId))
506 	{
507 		pgstat_progress_start_command(PROGRESS_COMMAND_CREATE_INDEX,
508 									  relationId);
509 		pgstat_progress_update_param(PROGRESS_CREATEIDX_COMMAND,
510 									 concurrent ?
511 									 PROGRESS_CREATEIDX_COMMAND_CREATE_CONCURRENTLY :
512 									 PROGRESS_CREATEIDX_COMMAND_CREATE);
513 	}
514 
515 	/*
516 	 * No index OID to report yet
517 	 */
518 	pgstat_progress_update_param(PROGRESS_CREATEIDX_INDEX_OID,
519 								 InvalidOid);
520 
521 	/*
522 	 * count key attributes in index
523 	 */
524 	numberOfKeyAttributes = list_length(stmt->indexParams);
525 
526 	/*
527 	 * Calculate the new list of index columns including both key columns and
528 	 * INCLUDE columns.  Later we can determine which of these are key
529 	 * columns, and which are just part of the INCLUDE list by checking the
530 	 * list position.  A list item in a position less than ii_NumIndexKeyAttrs
531 	 * is part of the key columns, and anything equal to and over is part of
532 	 * the INCLUDE columns.
533 	 */
534 	allIndexParams = list_concat(list_copy(stmt->indexParams),
535 								 list_copy(stmt->indexIncludingParams));
536 	numberOfAttributes = list_length(allIndexParams);
537 
538 	if (numberOfKeyAttributes <= 0)
539 		ereport(ERROR,
540 				(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
541 				 errmsg("must specify at least one column")));
542 	if (numberOfAttributes > INDEX_MAX_KEYS)
543 		ereport(ERROR,
544 				(errcode(ERRCODE_TOO_MANY_COLUMNS),
545 				 errmsg("cannot use more than %d columns in an index",
546 						INDEX_MAX_KEYS)));
547 
548 	/*
549 	 * Only SELECT ... FOR UPDATE/SHARE are allowed while doing a standard
550 	 * index build; but for concurrent builds we allow INSERT/UPDATE/DELETE
551 	 * (but not VACUUM).
552 	 *
553 	 * NB: Caller is responsible for making sure that relationId refers to the
554 	 * relation on which the index should be built; except in bootstrap mode,
555 	 * this will typically require the caller to have already locked the
556 	 * relation.  To avoid lock upgrade hazards, that lock should be at least
557 	 * as strong as the one we take here.
558 	 *
559 	 * NB: If the lock strength here ever changes, code that is run by
560 	 * parallel workers under the control of certain particular ambuild
561 	 * functions will need to be updated, too.
562 	 */
563 	lockmode = concurrent ? ShareUpdateExclusiveLock : ShareLock;
564 	rel = table_open(relationId, lockmode);
565 
566 	namespaceId = RelationGetNamespace(rel);
567 
568 	/* Ensure that it makes sense to index this kind of relation */
569 	switch (rel->rd_rel->relkind)
570 	{
571 		case RELKIND_RELATION:
572 		case RELKIND_MATVIEW:
573 		case RELKIND_PARTITIONED_TABLE:
574 			/* OK */
575 			break;
576 		case RELKIND_FOREIGN_TABLE:
577 
578 			/*
579 			 * Custom error message for FOREIGN TABLE since the term is close
580 			 * to a regular table and can confuse the user.
581 			 */
582 			ereport(ERROR,
583 					(errcode(ERRCODE_WRONG_OBJECT_TYPE),
584 					 errmsg("cannot create index on foreign table \"%s\"",
585 							RelationGetRelationName(rel))));
586 			break;
587 		default:
588 			ereport(ERROR,
589 					(errcode(ERRCODE_WRONG_OBJECT_TYPE),
590 					 errmsg("\"%s\" is not a table or materialized view",
591 							RelationGetRelationName(rel))));
592 			break;
593 	}
594 
595 	/*
596 	 * Establish behavior for partitioned tables, and verify sanity of
597 	 * parameters.
598 	 *
599 	 * We do not build an actual index in this case; we only create a few
600 	 * catalog entries.  The actual indexes are built by recursing for each
601 	 * partition.
602 	 */
603 	partitioned = rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE;
604 	if (partitioned)
605 	{
606 		/*
607 		 * Note: we check 'stmt->concurrent' rather than 'concurrent', so that
608 		 * the error is thrown also for temporary tables.  Seems better to be
609 		 * consistent, even though we could do it on temporary table because
610 		 * we're not actually doing it concurrently.
611 		 */
612 		if (stmt->concurrent)
613 			ereport(ERROR,
614 					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
615 					 errmsg("cannot create index on partitioned table \"%s\" concurrently",
616 							RelationGetRelationName(rel))));
617 		if (stmt->excludeOpNames)
618 			ereport(ERROR,
619 					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
620 					 errmsg("cannot create exclusion constraints on partitioned table \"%s\"",
621 							RelationGetRelationName(rel))));
622 	}
623 
624 	/*
625 	 * Don't try to CREATE INDEX on temp tables of other backends.
626 	 */
627 	if (RELATION_IS_OTHER_TEMP(rel))
628 		ereport(ERROR,
629 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
630 				 errmsg("cannot create indexes on temporary tables of other sessions")));
631 
632 	/*
633 	 * Unless our caller vouches for having checked this already, insist that
634 	 * the table not be in use by our own session, either.  Otherwise we might
635 	 * fail to make entries in the new index (for instance, if an INSERT or
636 	 * UPDATE is in progress and has already made its list of target indexes).
637 	 */
638 	if (check_not_in_use)
639 		CheckTableNotInUse(rel, "CREATE INDEX");
640 
641 	/*
642 	 * Verify we (still) have CREATE rights in the rel's namespace.
643 	 * (Presumably we did when the rel was created, but maybe not anymore.)
644 	 * Skip check if caller doesn't want it.  Also skip check if
645 	 * bootstrapping, since permissions machinery may not be working yet.
646 	 */
647 	if (check_rights && !IsBootstrapProcessingMode())
648 	{
649 		AclResult	aclresult;
650 
651 		aclresult = pg_namespace_aclcheck(namespaceId, GetUserId(),
652 										  ACL_CREATE);
653 		if (aclresult != ACLCHECK_OK)
654 			aclcheck_error(aclresult, OBJECT_SCHEMA,
655 						   get_namespace_name(namespaceId));
656 	}
657 
658 	/*
659 	 * Select tablespace to use.  If not specified, use default tablespace
660 	 * (which may in turn default to database's default).
661 	 */
662 	if (stmt->tableSpace)
663 	{
664 		tablespaceId = get_tablespace_oid(stmt->tableSpace, false);
665 		if (partitioned && tablespaceId == MyDatabaseTableSpace)
666 			ereport(ERROR,
667 					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
668 					 errmsg("cannot specify default tablespace for partitioned relations")));
669 	}
670 	else
671 	{
672 		tablespaceId = GetDefaultTablespace(rel->rd_rel->relpersistence,
673 											partitioned);
674 		/* note InvalidOid is OK in this case */
675 	}
676 
677 	/* Check tablespace permissions */
678 	if (check_rights &&
679 		OidIsValid(tablespaceId) && tablespaceId != MyDatabaseTableSpace)
680 	{
681 		AclResult	aclresult;
682 
683 		aclresult = pg_tablespace_aclcheck(tablespaceId, GetUserId(),
684 										   ACL_CREATE);
685 		if (aclresult != ACLCHECK_OK)
686 			aclcheck_error(aclresult, OBJECT_TABLESPACE,
687 						   get_tablespace_name(tablespaceId));
688 	}
689 
690 	/*
691 	 * Force shared indexes into the pg_global tablespace.  This is a bit of a
692 	 * hack but seems simpler than marking them in the BKI commands.  On the
693 	 * other hand, if it's not shared, don't allow it to be placed there.
694 	 */
695 	if (rel->rd_rel->relisshared)
696 		tablespaceId = GLOBALTABLESPACE_OID;
697 	else if (tablespaceId == GLOBALTABLESPACE_OID)
698 		ereport(ERROR,
699 				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
700 				 errmsg("only shared relations can be placed in pg_global tablespace")));
701 
702 	/*
703 	 * Choose the index column names.
704 	 */
705 	indexColNames = ChooseIndexColumnNames(allIndexParams);
706 
707 	/*
708 	 * Select name for index if caller didn't specify
709 	 */
710 	indexRelationName = stmt->idxname;
711 	if (indexRelationName == NULL)
712 		indexRelationName = ChooseIndexName(RelationGetRelationName(rel),
713 											namespaceId,
714 											indexColNames,
715 											stmt->excludeOpNames,
716 											stmt->primary,
717 											stmt->isconstraint);
718 
719 	/*
720 	 * look up the access method, verify it can handle the requested features
721 	 */
722 	accessMethodName = stmt->accessMethod;
723 	tuple = SearchSysCache1(AMNAME, PointerGetDatum(accessMethodName));
724 	if (!HeapTupleIsValid(tuple))
725 	{
726 		/*
727 		 * Hack to provide more-or-less-transparent updating of old RTREE
728 		 * indexes to GiST: if RTREE is requested and not found, use GIST.
729 		 */
730 		if (strcmp(accessMethodName, "rtree") == 0)
731 		{
732 			ereport(NOTICE,
733 					(errmsg("substituting access method \"gist\" for obsolete method \"rtree\"")));
734 			accessMethodName = "gist";
735 			tuple = SearchSysCache1(AMNAME, PointerGetDatum(accessMethodName));
736 		}
737 
738 		if (!HeapTupleIsValid(tuple))
739 			ereport(ERROR,
740 					(errcode(ERRCODE_UNDEFINED_OBJECT),
741 					 errmsg("access method \"%s\" does not exist",
742 							accessMethodName)));
743 	}
744 	accessMethodForm = (Form_pg_am) GETSTRUCT(tuple);
745 	accessMethodId = accessMethodForm->oid;
746 	amRoutine = GetIndexAmRoutine(accessMethodForm->amhandler);
747 
748 	pgstat_progress_update_param(PROGRESS_CREATEIDX_ACCESS_METHOD_OID,
749 								 accessMethodId);
750 
751 	if (stmt->unique && !amRoutine->amcanunique)
752 		ereport(ERROR,
753 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
754 				 errmsg("access method \"%s\" does not support unique indexes",
755 						accessMethodName)));
756 	if (stmt->indexIncludingParams != NIL && !amRoutine->amcaninclude)
757 		ereport(ERROR,
758 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
759 				 errmsg("access method \"%s\" does not support included columns",
760 						accessMethodName)));
761 	if (numberOfKeyAttributes > 1 && !amRoutine->amcanmulticol)
762 		ereport(ERROR,
763 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
764 				 errmsg("access method \"%s\" does not support multicolumn indexes",
765 						accessMethodName)));
766 	if (stmt->excludeOpNames && amRoutine->amgettuple == NULL)
767 		ereport(ERROR,
768 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
769 				 errmsg("access method \"%s\" does not support exclusion constraints",
770 						accessMethodName)));
771 
772 	amcanorder = amRoutine->amcanorder;
773 	amoptions = amRoutine->amoptions;
774 
775 	pfree(amRoutine);
776 	ReleaseSysCache(tuple);
777 
778 	/*
779 	 * Validate predicate, if given
780 	 */
781 	if (stmt->whereClause)
782 		CheckPredicate((Expr *) stmt->whereClause);
783 
784 	/*
785 	 * Parse AM-specific options, convert to text array form, validate.
786 	 */
787 	reloptions = transformRelOptions((Datum) 0, stmt->options,
788 									 NULL, NULL, false, false);
789 
790 	(void) index_reloptions(amoptions, reloptions, true);
791 
792 	/*
793 	 * Prepare arguments for index_create, primarily an IndexInfo structure.
794 	 * Note that predicates must be in implicit-AND format.  In a concurrent
795 	 * build, mark it not-ready-for-inserts.
796 	 */
797 	indexInfo = makeIndexInfo(numberOfAttributes,
798 							  numberOfKeyAttributes,
799 							  accessMethodId,
800 							  NIL,	/* expressions, NIL for now */
801 							  make_ands_implicit((Expr *) stmt->whereClause),
802 							  stmt->unique,
803 							  !concurrent,
804 							  concurrent);
805 
806 	typeObjectId = (Oid *) palloc(numberOfAttributes * sizeof(Oid));
807 	collationObjectId = (Oid *) palloc(numberOfAttributes * sizeof(Oid));
808 	classObjectId = (Oid *) palloc(numberOfAttributes * sizeof(Oid));
809 	coloptions = (int16 *) palloc(numberOfAttributes * sizeof(int16));
810 	ComputeIndexAttrs(indexInfo,
811 					  typeObjectId, collationObjectId, classObjectId,
812 					  coloptions, allIndexParams,
813 					  stmt->excludeOpNames, relationId,
814 					  accessMethodName, accessMethodId,
815 					  amcanorder, stmt->isconstraint);
816 
817 	/*
818 	 * Extra checks when creating a PRIMARY KEY index.
819 	 */
820 	if (stmt->primary)
821 		index_check_primary_key(rel, indexInfo, is_alter_table, stmt);
822 
823 	/*
824 	 * If this table is partitioned and we're creating a unique index or a
825 	 * primary key, make sure that the partition key is a subset of the
826 	 * index's columns.  Otherwise it would be possible to violate uniqueness
827 	 * by putting values that ought to be unique in different partitions.
828 	 *
829 	 * We could lift this limitation if we had global indexes, but those have
830 	 * their own problems, so this is a useful feature combination.
831 	 */
832 	if (partitioned && (stmt->unique || stmt->primary))
833 	{
834 		PartitionKey key = RelationGetPartitionKey(rel);
835 		const char *constraint_type;
836 		int			i;
837 
838 		if (stmt->primary)
839 			constraint_type = "PRIMARY KEY";
840 		else if (stmt->unique)
841 			constraint_type = "UNIQUE";
842 		else if (stmt->excludeOpNames != NIL)
843 			constraint_type = "EXCLUDE";
844 		else
845 		{
846 			elog(ERROR, "unknown constraint type");
847 			constraint_type = NULL; /* keep compiler quiet */
848 		}
849 
850 		/*
851 		 * Verify that all the columns in the partition key appear in the
852 		 * unique key definition, with the same notion of equality.
853 		 */
854 		for (i = 0; i < key->partnatts; i++)
855 		{
856 			bool		found = false;
857 			int			eq_strategy;
858 			Oid			ptkey_eqop;
859 			int			j;
860 
861 			/*
862 			 * Identify the equality operator associated with this partkey
863 			 * column.  For list and range partitioning, partkeys use btree
864 			 * operator classes; hash partitioning uses hash operator classes.
865 			 * (Keep this in sync with ComputePartitionAttrs!)
866 			 */
867 			if (key->strategy == PARTITION_STRATEGY_HASH)
868 				eq_strategy = HTEqualStrategyNumber;
869 			else
870 				eq_strategy = BTEqualStrategyNumber;
871 
872 			ptkey_eqop = get_opfamily_member(key->partopfamily[i],
873 											 key->partopcintype[i],
874 											 key->partopcintype[i],
875 											 eq_strategy);
876 			if (!OidIsValid(ptkey_eqop))
877 				elog(ERROR, "missing operator %d(%u,%u) in partition opfamily %u",
878 					 eq_strategy, key->partopcintype[i], key->partopcintype[i],
879 					 key->partopfamily[i]);
880 
881 			/*
882 			 * We'll need to be able to identify the equality operators
883 			 * associated with index columns, too.  We know what to do with
884 			 * btree opclasses; if there are ever any other index types that
885 			 * support unique indexes, this logic will need extension.
886 			 */
887 			if (accessMethodId == BTREE_AM_OID)
888 				eq_strategy = BTEqualStrategyNumber;
889 			else
890 				ereport(ERROR,
891 						(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
892 						 errmsg("cannot match partition key to an index using access method \"%s\"",
893 								accessMethodName)));
894 
895 			/*
896 			 * It may be possible to support UNIQUE constraints when partition
897 			 * keys are expressions, but is it worth it?  Give up for now.
898 			 */
899 			if (key->partattrs[i] == 0)
900 				ereport(ERROR,
901 						(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
902 						 errmsg("unsupported %s constraint with partition key definition",
903 								constraint_type),
904 						 errdetail("%s constraints cannot be used when partition keys include expressions.",
905 								   constraint_type)));
906 
907 			/* Search the index column(s) for a match */
908 			for (j = 0; j < indexInfo->ii_NumIndexKeyAttrs; j++)
909 			{
910 				if (key->partattrs[i] == indexInfo->ii_IndexAttrNumbers[j])
911 				{
912 					/* Matched the column, now what about the equality op? */
913 					Oid			idx_opfamily;
914 					Oid			idx_opcintype;
915 
916 					if (get_opclass_opfamily_and_input_type(classObjectId[j],
917 															&idx_opfamily,
918 															&idx_opcintype))
919 					{
920 						Oid			idx_eqop;
921 
922 						idx_eqop = get_opfamily_member(idx_opfamily,
923 													   idx_opcintype,
924 													   idx_opcintype,
925 													   eq_strategy);
926 						if (ptkey_eqop == idx_eqop)
927 						{
928 							found = true;
929 							break;
930 						}
931 					}
932 				}
933 			}
934 
935 			if (!found)
936 			{
937 				Form_pg_attribute att;
938 
939 				att = TupleDescAttr(RelationGetDescr(rel),
940 									key->partattrs[i] - 1);
941 				ereport(ERROR,
942 						(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
943 						 errmsg("unique constraint on partitioned table must include all partitioning columns"),
944 						 errdetail("%s constraint on table \"%s\" lacks column \"%s\" which is part of the partition key.",
945 								   constraint_type, RelationGetRelationName(rel),
946 								   NameStr(att->attname))));
947 			}
948 		}
949 	}
950 
951 
952 	/*
953 	 * We disallow indexes on system columns.  They would not necessarily get
954 	 * updated correctly, and they don't seem useful anyway.
955 	 */
956 	for (i = 0; i < indexInfo->ii_NumIndexAttrs; i++)
957 	{
958 		AttrNumber	attno = indexInfo->ii_IndexAttrNumbers[i];
959 
960 		if (attno < 0)
961 			ereport(ERROR,
962 					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
963 					 errmsg("index creation on system columns is not supported")));
964 	}
965 
966 	/*
967 	 * Also check for system columns used in expressions or predicates.
968 	 */
969 	if (indexInfo->ii_Expressions || indexInfo->ii_Predicate)
970 	{
971 		Bitmapset  *indexattrs = NULL;
972 
973 		pull_varattnos((Node *) indexInfo->ii_Expressions, 1, &indexattrs);
974 		pull_varattnos((Node *) indexInfo->ii_Predicate, 1, &indexattrs);
975 
976 		for (i = FirstLowInvalidHeapAttributeNumber + 1; i < 0; i++)
977 		{
978 			if (bms_is_member(i - FirstLowInvalidHeapAttributeNumber,
979 							  indexattrs))
980 				ereport(ERROR,
981 						(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
982 						 errmsg("index creation on system columns is not supported")));
983 		}
984 	}
985 
986 	/*
987 	 * Report index creation if appropriate (delay this till after most of the
988 	 * error checks)
989 	 */
990 	if (stmt->isconstraint && !quiet)
991 	{
992 		const char *constraint_type;
993 
994 		if (stmt->primary)
995 			constraint_type = "PRIMARY KEY";
996 		else if (stmt->unique)
997 			constraint_type = "UNIQUE";
998 		else if (stmt->excludeOpNames != NIL)
999 			constraint_type = "EXCLUDE";
1000 		else
1001 		{
1002 			elog(ERROR, "unknown constraint type");
1003 			constraint_type = NULL; /* keep compiler quiet */
1004 		}
1005 
1006 		ereport(DEBUG1,
1007 				(errmsg("%s %s will create implicit index \"%s\" for table \"%s\"",
1008 						is_alter_table ? "ALTER TABLE / ADD" : "CREATE TABLE /",
1009 						constraint_type,
1010 						indexRelationName, RelationGetRelationName(rel))));
1011 	}
1012 
1013 	/*
1014 	 * A valid stmt->oldNode implies that we already have a built form of the
1015 	 * index.  The caller should also decline any index build.
1016 	 */
1017 	Assert(!OidIsValid(stmt->oldNode) || (skip_build && !concurrent));
1018 
1019 	/*
1020 	 * Make the catalog entries for the index, including constraints. This
1021 	 * step also actually builds the index, except if caller requested not to
1022 	 * or in concurrent mode, in which case it'll be done later, or doing a
1023 	 * partitioned index (because those don't have storage).
1024 	 */
1025 	flags = constr_flags = 0;
1026 	if (stmt->isconstraint)
1027 		flags |= INDEX_CREATE_ADD_CONSTRAINT;
1028 	if (skip_build || concurrent || partitioned)
1029 		flags |= INDEX_CREATE_SKIP_BUILD;
1030 	if (stmt->if_not_exists)
1031 		flags |= INDEX_CREATE_IF_NOT_EXISTS;
1032 	if (concurrent)
1033 		flags |= INDEX_CREATE_CONCURRENT;
1034 	if (partitioned)
1035 		flags |= INDEX_CREATE_PARTITIONED;
1036 	if (stmt->primary)
1037 		flags |= INDEX_CREATE_IS_PRIMARY;
1038 
1039 	/*
1040 	 * If the table is partitioned, and recursion was declined but partitions
1041 	 * exist, mark the index as invalid.
1042 	 */
1043 	if (partitioned && stmt->relation && !stmt->relation->inh)
1044 	{
1045 		PartitionDesc pd = RelationGetPartitionDesc(rel);
1046 
1047 		if (pd->nparts != 0)
1048 			flags |= INDEX_CREATE_INVALID;
1049 	}
1050 
1051 	if (stmt->deferrable)
1052 		constr_flags |= INDEX_CONSTR_CREATE_DEFERRABLE;
1053 	if (stmt->initdeferred)
1054 		constr_flags |= INDEX_CONSTR_CREATE_INIT_DEFERRED;
1055 
1056 	indexRelationId =
1057 		index_create(rel, indexRelationName, indexRelationId, parentIndexId,
1058 					 parentConstraintId,
1059 					 stmt->oldNode, indexInfo, indexColNames,
1060 					 accessMethodId, tablespaceId,
1061 					 collationObjectId, classObjectId,
1062 					 coloptions, reloptions,
1063 					 flags, constr_flags,
1064 					 allowSystemTableMods, !check_rights,
1065 					 &createdConstraintId);
1066 
1067 	ObjectAddressSet(address, RelationRelationId, indexRelationId);
1068 
1069 	/*
1070 	 * Revert to original default_tablespace.  Must do this before any return
1071 	 * from this function, but after index_create, so this is a good time.
1072 	 */
1073 	if (save_nestlevel >= 0)
1074 		AtEOXact_GUC(true, save_nestlevel);
1075 
1076 	if (!OidIsValid(indexRelationId))
1077 	{
1078 		table_close(rel, NoLock);
1079 
1080 		/* If this is the top-level index, we're done */
1081 		if (!OidIsValid(parentIndexId))
1082 			pgstat_progress_end_command();
1083 
1084 		return address;
1085 	}
1086 
1087 	/* Add any requested comment */
1088 	if (stmt->idxcomment != NULL)
1089 		CreateComments(indexRelationId, RelationRelationId, 0,
1090 					   stmt->idxcomment);
1091 
1092 	if (partitioned)
1093 	{
1094 		PartitionDesc partdesc;
1095 
1096 		/*
1097 		 * Unless caller specified to skip this step (via ONLY), process each
1098 		 * partition to make sure they all contain a corresponding index.
1099 		 *
1100 		 * If we're called internally (no stmt->relation), recurse always.
1101 		 */
1102 		partdesc = RelationGetPartitionDesc(rel);
1103 		if ((!stmt->relation || stmt->relation->inh) && partdesc->nparts > 0)
1104 		{
1105 			int			nparts = partdesc->nparts;
1106 			Oid		   *part_oids = palloc(sizeof(Oid) * nparts);
1107 			bool		invalidate_parent = false;
1108 			TupleDesc	parentDesc;
1109 			Oid		   *opfamOids;
1110 
1111 			pgstat_progress_update_param(PROGRESS_CREATEIDX_PARTITIONS_TOTAL,
1112 										 nparts);
1113 
1114 			memcpy(part_oids, partdesc->oids, sizeof(Oid) * nparts);
1115 
1116 			parentDesc = RelationGetDescr(rel);
1117 			opfamOids = palloc(sizeof(Oid) * numberOfKeyAttributes);
1118 			for (i = 0; i < numberOfKeyAttributes; i++)
1119 				opfamOids[i] = get_opclass_family(classObjectId[i]);
1120 
1121 			/*
1122 			 * For each partition, scan all existing indexes; if one matches
1123 			 * our index definition and is not already attached to some other
1124 			 * parent index, attach it to the one we just created.
1125 			 *
1126 			 * If none matches, build a new index by calling ourselves
1127 			 * recursively with the same options (except for the index name).
1128 			 */
1129 			for (i = 0; i < nparts; i++)
1130 			{
1131 				Oid			childRelid = part_oids[i];
1132 				Relation	childrel;
1133 				List	   *childidxs;
1134 				ListCell   *cell;
1135 				AttrNumber *attmap;
1136 				bool		found = false;
1137 				int			maplen;
1138 
1139 				childrel = table_open(childRelid, lockmode);
1140 
1141 				/*
1142 				 * Don't try to create indexes on foreign tables, though. Skip
1143 				 * those if a regular index, or fail if trying to create a
1144 				 * constraint index.
1145 				 */
1146 				if (childrel->rd_rel->relkind == RELKIND_FOREIGN_TABLE)
1147 				{
1148 					if (stmt->unique || stmt->primary)
1149 						ereport(ERROR,
1150 								(errcode(ERRCODE_WRONG_OBJECT_TYPE),
1151 								 errmsg("cannot create unique index on partitioned table \"%s\"",
1152 										RelationGetRelationName(rel)),
1153 								 errdetail("Table \"%s\" contains partitions that are foreign tables.",
1154 										   RelationGetRelationName(rel))));
1155 
1156 					table_close(childrel, lockmode);
1157 					continue;
1158 				}
1159 
1160 				childidxs = RelationGetIndexList(childrel);
1161 				attmap =
1162 					convert_tuples_by_name_map(RelationGetDescr(childrel),
1163 											   parentDesc,
1164 											   gettext_noop("could not convert row type"));
1165 				maplen = parentDesc->natts;
1166 
1167 				foreach(cell, childidxs)
1168 				{
1169 					Oid			cldidxid = lfirst_oid(cell);
1170 					Relation	cldidx;
1171 					IndexInfo  *cldIdxInfo;
1172 
					/* this index is already a partition of another one */
1174 					if (has_superclass(cldidxid))
1175 						continue;
1176 
1177 					cldidx = index_open(cldidxid, lockmode);
1178 					cldIdxInfo = BuildIndexInfo(cldidx);
1179 					if (CompareIndexInfo(cldIdxInfo, indexInfo,
1180 										 cldidx->rd_indcollation,
1181 										 collationObjectId,
1182 										 cldidx->rd_opfamily,
1183 										 opfamOids,
1184 										 attmap, maplen))
1185 					{
1186 						Oid			cldConstrOid = InvalidOid;
1187 
1188 						/*
1189 						 * Found a match.
1190 						 *
1191 						 * If this index is being created in the parent
1192 						 * because of a constraint, then the child needs to
1193 						 * have a constraint also, so look for one.  If there
1194 						 * is no such constraint, this index is no good, so
1195 						 * keep looking.
1196 						 */
1197 						if (createdConstraintId != InvalidOid)
1198 						{
1199 							cldConstrOid =
1200 								get_relation_idx_constraint_oid(childRelid,
1201 																cldidxid);
1202 							if (cldConstrOid == InvalidOid)
1203 							{
1204 								index_close(cldidx, lockmode);
1205 								continue;
1206 							}
1207 						}
1208 
1209 						/* Attach index to parent and we're done. */
1210 						IndexSetParentIndex(cldidx, indexRelationId);
1211 						if (createdConstraintId != InvalidOid)
1212 							ConstraintSetParentConstraint(cldConstrOid,
1213 														  createdConstraintId,
1214 														  childRelid);
1215 
1216 						if (!cldidx->rd_index->indisvalid)
1217 							invalidate_parent = true;
1218 
1219 						found = true;
1220 						/* keep lock till commit */
1221 						index_close(cldidx, NoLock);
1222 						break;
1223 					}
1224 
1225 					index_close(cldidx, lockmode);
1226 				}
1227 
1228 				list_free(childidxs);
1229 				table_close(childrel, NoLock);
1230 
1231 				/*
1232 				 * If no matching index was found, create our own.
1233 				 */
1234 				if (!found)
1235 				{
1236 					IndexStmt  *childStmt = copyObject(stmt);
1237 					bool		found_whole_row;
1238 					ListCell   *lc;
1239 
1240 					/*
1241 					 * We can't use the same index name for the child index,
1242 					 * so clear idxname to let the recursive invocation choose
1243 					 * a new name.  Likewise, the existing target relation
1244 					 * field is wrong, and if indexOid or oldNode are set,
1245 					 * they mustn't be applied to the child either.
1246 					 */
1247 					childStmt->idxname = NULL;
1248 					childStmt->relation = NULL;
1249 					childStmt->indexOid = InvalidOid;
1250 					childStmt->oldNode = InvalidOid;
1251 
1252 					/*
1253 					 * Adjust any Vars (both in expressions and in the index's
1254 					 * WHERE clause) to match the partition's column numbering
1255 					 * in case it's different from the parent's.
1256 					 */
1257 					foreach(lc, childStmt->indexParams)
1258 					{
1259 						IndexElem  *ielem = lfirst(lc);
1260 
1261 						/*
1262 						 * If the index parameter is an expression, we must
1263 						 * translate it to contain child Vars.
1264 						 */
1265 						if (ielem->expr)
1266 						{
1267 							ielem->expr =
1268 								map_variable_attnos((Node *) ielem->expr,
1269 													1, 0, attmap, maplen,
1270 													InvalidOid,
1271 													&found_whole_row);
1272 							if (found_whole_row)
1273 								elog(ERROR, "cannot convert whole-row table reference");
1274 						}
1275 					}
1276 					childStmt->whereClause =
1277 						map_variable_attnos(stmt->whereClause, 1, 0,
1278 											attmap, maplen,
1279 											InvalidOid, &found_whole_row);
1280 					if (found_whole_row)
1281 						elog(ERROR, "cannot convert whole-row table reference");
1282 
1283 					DefineIndex(childRelid, childStmt,
1284 								InvalidOid, /* no predefined OID */
1285 								indexRelationId,	/* this is our child */
1286 								createdConstraintId,
1287 								is_alter_table, check_rights, check_not_in_use,
1288 								skip_build, quiet);
1289 				}
1290 
1291 				pgstat_progress_update_param(PROGRESS_CREATEIDX_PARTITIONS_DONE,
1292 											 i + 1);
1293 				pfree(attmap);
1294 			}
1295 
1296 			/*
1297 			 * The pg_index row we inserted for this index was marked
1298 			 * indisvalid=true.  But if we attached an existing index that is
1299 			 * invalid, this is incorrect, so update our row to invalid too.
1300 			 */
1301 			if (invalidate_parent)
1302 			{
1303 				Relation	pg_index = table_open(IndexRelationId, RowExclusiveLock);
1304 				HeapTuple	tup,
1305 							newtup;
1306 
1307 				tup = SearchSysCache1(INDEXRELID,
1308 									  ObjectIdGetDatum(indexRelationId));
1309 				if (!HeapTupleIsValid(tup))
1310 					elog(ERROR, "cache lookup failed for index %u",
1311 						 indexRelationId);
1312 				newtup = heap_copytuple(tup);
1313 				((Form_pg_index) GETSTRUCT(newtup))->indisvalid = false;
1314 				CatalogTupleUpdate(pg_index, &tup->t_self, newtup);
1315 				ReleaseSysCache(tup);
1316 				table_close(pg_index, RowExclusiveLock);
1317 				heap_freetuple(newtup);
1318 			}
1319 		}
1320 
1321 		/*
1322 		 * Indexes on partitioned tables are not themselves built, so we're
1323 		 * done here.
1324 		 */
1325 		table_close(rel, NoLock);
1326 		if (!OidIsValid(parentIndexId))
1327 			pgstat_progress_end_command();
1328 		return address;
1329 	}
1330 
1331 	if (!concurrent)
1332 	{
1333 		/* Close the heap and we're done, in the non-concurrent case */
1334 		table_close(rel, NoLock);
1335 
1336 		/* If this is the top-level index, we're done. */
1337 		if (!OidIsValid(parentIndexId))
1338 			pgstat_progress_end_command();
1339 
1340 		return address;
1341 	}
1342 
1343 	/* save lockrelid and locktag for below, then close rel */
1344 	heaprelid = rel->rd_lockInfo.lockRelId;
1345 	SET_LOCKTAG_RELATION(heaplocktag, heaprelid.dbId, heaprelid.relId);
1346 	table_close(rel, NoLock);
1347 
1348 	/*
1349 	 * For a concurrent build, it's important to make the catalog entries
1350 	 * visible to other transactions before we start to build the index. That
1351 	 * will prevent them from making incompatible HOT updates.  The new index
1352 	 * will be marked not indisready and not indisvalid, so that no one else
1353 	 * tries to either insert into it or use it for queries.
1354 	 *
1355 	 * We must commit our current transaction so that the index becomes
1356 	 * visible; then start another.  Note that all the data structures we just
1357 	 * built are lost in the commit.  The only data we keep past here are the
1358 	 * relation IDs.
1359 	 *
1360 	 * Before committing, get a session-level lock on the table, to ensure
1361 	 * that neither it nor the index can be dropped before we finish. This
1362 	 * cannot block, even if someone else is waiting for access, because we
1363 	 * already have the same lock within our transaction.
1364 	 *
1365 	 * Note: we don't currently bother with a session lock on the index,
1366 	 * because there are no operations that could change its state while we
1367 	 * hold lock on the parent table.  This might need to change later.
1368 	 */
1369 	LockRelationIdForSession(&heaprelid, ShareUpdateExclusiveLock);
1370 
1371 	PopActiveSnapshot();
1372 	CommitTransactionCommand();
1373 	StartTransactionCommand();
1374 
1375 	/*
1376 	 * The index is now visible, so we can report the OID.
1377 	 */
1378 	pgstat_progress_update_param(PROGRESS_CREATEIDX_INDEX_OID,
1379 								 indexRelationId);
1380 
1381 	/*
1382 	 * Phase 2 of concurrent index build (see comments for validate_index()
1383 	 * for an overview of how this works)
1384 	 *
1385 	 * Now we must wait until no running transaction could have the table open
1386 	 * with the old list of indexes.  Use ShareLock to consider running
1387 	 * transactions that hold locks that permit writing to the table.  Note we
1388 	 * do not need to worry about xacts that open the table for writing after
1389 	 * this point; they will see the new index when they open it.
1390 	 *
1391 	 * Note: the reason we use actual lock acquisition here, rather than just
1392 	 * checking the ProcArray and sleeping, is that deadlock is possible if
1393 	 * one of the transactions in question is blocked trying to acquire an
1394 	 * exclusive lock on our table.  The lock code will detect deadlock and
1395 	 * error out properly.
1396 	 */
1397 	pgstat_progress_update_param(PROGRESS_CREATEIDX_PHASE,
1398 								 PROGRESS_CREATEIDX_PHASE_WAIT_1);
1399 	WaitForLockers(heaplocktag, ShareLock, true);
1400 
1401 	/*
1402 	 * At this moment we are sure that there are no transactions with the
1403 	 * table open for write that don't have this new index in their list of
1404 	 * indexes.  We have waited out all the existing transactions and any new
1405 	 * transaction will have the new index in its list, but the index is still
1406 	 * marked as "not-ready-for-inserts".  The index is consulted while
1407 	 * deciding HOT-safety though.  This arrangement ensures that no new HOT
1408 	 * chains can be created where the new tuple and the old tuple in the
1409 	 * chain have different index keys.
1410 	 *
1411 	 * We now take a new snapshot, and build the index using all tuples that
1412 	 * are visible in this snapshot.  We can be sure that any HOT updates to
1413 	 * these tuples will be compatible with the index, since any updates made
1414 	 * by transactions that didn't know about the index are now committed or
1415 	 * rolled back.  Thus, each visible tuple is either the end of its
1416 	 * HOT-chain or the extension of the chain is HOT-safe for this index.
1417 	 */
1418 
1419 	/* Set ActiveSnapshot since functions in the indexes may need it */
1420 	PushActiveSnapshot(GetTransactionSnapshot());
1421 
1422 	/* Perform concurrent build of index */
1423 	index_concurrently_build(relationId, indexRelationId);
1424 
1425 	/* we can do away with our snapshot */
1426 	PopActiveSnapshot();
1427 
1428 	/*
1429 	 * Commit this transaction to make the indisready update visible.
1430 	 */
1431 	CommitTransactionCommand();
1432 	StartTransactionCommand();
1433 
1434 	/*
1435 	 * Phase 3 of concurrent index build
1436 	 *
1437 	 * We once again wait until no transaction can have the table open with
1438 	 * the index marked as read-only for updates.
1439 	 */
1440 	pgstat_progress_update_param(PROGRESS_CREATEIDX_PHASE,
1441 								 PROGRESS_CREATEIDX_PHASE_WAIT_2);
1442 	WaitForLockers(heaplocktag, ShareLock, true);
1443 
1444 	/*
1445 	 * Now take the "reference snapshot" that will be used by validate_index()
1446 	 * to filter candidate tuples.  Beware!  There might still be snapshots in
1447 	 * use that treat some transaction as in-progress that our reference
1448 	 * snapshot treats as committed.  If such a recently-committed transaction
1449 	 * deleted tuples in the table, we will not include them in the index; yet
1450 	 * those transactions which see the deleting one as still-in-progress will
1451 	 * expect such tuples to be there once we mark the index as valid.
1452 	 *
1453 	 * We solve this by waiting for all endangered transactions to exit before
1454 	 * we mark the index as valid.
1455 	 *
1456 	 * We also set ActiveSnapshot to this snap, since functions in indexes may
1457 	 * need a snapshot.
1458 	 */
1459 	snapshot = RegisterSnapshot(GetTransactionSnapshot());
1460 	PushActiveSnapshot(snapshot);
1461 
1462 	/*
1463 	 * Scan the index and the heap, insert any missing index entries.
1464 	 */
1465 	validate_index(relationId, indexRelationId, snapshot);
1466 
1467 	/*
1468 	 * Drop the reference snapshot.  We must do this before waiting out other
1469 	 * snapshot holders, else we will deadlock against other processes also
1470 	 * doing CREATE INDEX CONCURRENTLY, which would see our snapshot as one
1471 	 * they must wait for.  But first, save the snapshot's xmin to use as
1472 	 * limitXmin for GetCurrentVirtualXIDs().
1473 	 */
1474 	limitXmin = snapshot->xmin;
1475 
1476 	PopActiveSnapshot();
1477 	UnregisterSnapshot(snapshot);
1478 
1479 	/*
1480 	 * The snapshot subsystem could still contain registered snapshots that
1481 	 * are holding back our process's advertised xmin; in particular, if
1482 	 * default_transaction_isolation = serializable, there is a transaction
1483 	 * snapshot that is still active.  The CatalogSnapshot is likewise a
1484 	 * hazard.  To ensure no deadlocks, we must commit and start yet another
1485 	 * transaction, and do our wait before any snapshot has been taken in it.
1486 	 */
1487 	CommitTransactionCommand();
1488 	StartTransactionCommand();
1489 
1490 	/* We should now definitely not be advertising any xmin. */
1491 	Assert(MyPgXact->xmin == InvalidTransactionId);
1492 
1493 	/*
1494 	 * The index is now valid in the sense that it contains all currently
1495 	 * interesting tuples.  But since it might not contain tuples deleted just
1496 	 * before the reference snap was taken, we have to wait out any
1497 	 * transactions that might have older snapshots.
1498 	 */
1499 	pgstat_progress_update_param(PROGRESS_CREATEIDX_PHASE,
1500 								 PROGRESS_CREATEIDX_PHASE_WAIT_3);
1501 	WaitForOlderSnapshots(limitXmin, true);
1502 
1503 	/*
1504 	 * Index can now be marked valid -- update its pg_index entry
1505 	 */
1506 	index_set_state_flags(indexRelationId, INDEX_CREATE_SET_VALID);
1507 
1508 	/*
1509 	 * The pg_index update will cause backends (including this one) to update
1510 	 * relcache entries for the index itself, but we should also send a
1511 	 * relcache inval on the parent table to force replanning of cached plans.
1512 	 * Otherwise existing sessions might fail to use the new index where it
1513 	 * would be useful.  (Note that our earlier commits did not create reasons
1514 	 * to replan; so relcache flush on the index itself was sufficient.)
1515 	 */
1516 	CacheInvalidateRelcacheByRelid(heaprelid.relId);
1517 
1518 	/*
1519 	 * Last thing to do is release the session-level lock on the parent table.
1520 	 */
1521 	UnlockRelationIdForSession(&heaprelid, ShareUpdateExclusiveLock);
1522 
1523 	pgstat_progress_end_command();
1524 
1525 	return address;
1526 }
1527 
1528 
1529 /*
1530  * CheckMutability
1531  *		Test whether given expression is mutable
1532  */
1533 static bool
1534 CheckMutability(Expr *expr)
1535 {
1536 	/*
1537 	 * First run the expression through the planner.  This has a couple of
1538 	 * important consequences.  First, function default arguments will get
1539 	 * inserted, which may affect volatility (consider "default now()").
1540 	 * Second, inline-able functions will get inlined, which may allow us to
1541 	 * conclude that the function is really less volatile than it's marked. As
1542 	 * an example, polymorphic functions must be marked with the most volatile
1543 	 * behavior that they have for any input type, but once we inline the
1544 	 * function we may be able to conclude that it's not so volatile for the
1545 	 * particular input type we're dealing with.
1546 	 *
1547 	 * We assume here that expression_planner() won't scribble on its input.
1548 	 */
1549 	expr = expression_planner(expr);
1550 
1551 	/* Now we can search for non-immutable functions */
1552 	return contain_mutable_functions((Node *) expr);
1553 }
1554 
1555 
1556 /*
1557  * CheckPredicate
1558  *		Checks that the given partial-index predicate is valid.
1559  *
1560  * This used to also constrain the form of the predicate to forms that
1561  * indxpath.c could do something with.  However, that seems overly
1562  * restrictive.  One useful application of partial indexes is to apply
1563  * a UNIQUE constraint across a subset of a table, and in that scenario
1564  * any evaluable predicate will work.  So accept any predicate here
1565  * (except ones requiring a plan), and let indxpath.c fend for itself.
1566  */
1567 static void
1568 CheckPredicate(Expr *predicate)
1569 {
1570 	/*
1571 	 * transformExpr() should have already rejected subqueries, aggregates,
1572 	 * and window functions, based on the EXPR_KIND_ for a predicate.
1573 	 */
1574 
1575 	/*
1576 	 * A predicate using mutable functions is probably wrong, for the same
1577 	 * reasons that we don't allow an index expression to use one.
1578 	 */
1579 	if (CheckMutability(predicate))
1580 		ereport(ERROR,
1581 				(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
1582 				 errmsg("functions in index predicate must be marked IMMUTABLE")));
1583 }
1584 
1585 /*
1586  * Compute per-index-column information, including indexed column numbers
1587  * or index expressions, opclasses, and indoptions. Note, all output vectors
1588  * should be allocated for all columns, including "including" ones.
1589  */
1590 static void
1591 ComputeIndexAttrs(IndexInfo *indexInfo,
1592 				  Oid *typeOidP,
1593 				  Oid *collationOidP,
1594 				  Oid *classOidP,
1595 				  int16 *colOptionP,
1596 				  List *attList,	/* list of IndexElem's */
1597 				  List *exclusionOpNames,
1598 				  Oid relId,
1599 				  const char *accessMethodName,
1600 				  Oid accessMethodId,
1601 				  bool amcanorder,
1602 				  bool isconstraint)
1603 {
1604 	ListCell   *nextExclOp;
1605 	ListCell   *lc;
1606 	int			attn;
1607 	int			nkeycols = indexInfo->ii_NumIndexKeyAttrs;
1608 
1609 	/* Allocate space for exclusion operator info, if needed */
1610 	if (exclusionOpNames)
1611 	{
1612 		Assert(list_length(exclusionOpNames) == nkeycols);
1613 		indexInfo->ii_ExclusionOps = (Oid *) palloc(sizeof(Oid) * nkeycols);
1614 		indexInfo->ii_ExclusionProcs = (Oid *) palloc(sizeof(Oid) * nkeycols);
1615 		indexInfo->ii_ExclusionStrats = (uint16 *) palloc(sizeof(uint16) * nkeycols);
1616 		nextExclOp = list_head(exclusionOpNames);
1617 	}
1618 	else
1619 		nextExclOp = NULL;
1620 
1621 	/*
1622 	 * process attributeList
1623 	 */
1624 	attn = 0;
1625 	foreach(lc, attList)
1626 	{
1627 		IndexElem  *attribute = (IndexElem *) lfirst(lc);
1628 		Oid			atttype;
1629 		Oid			attcollation;
1630 
1631 		/*
1632 		 * Process the column-or-expression to be indexed.
1633 		 */
1634 		if (attribute->name != NULL)
1635 		{
1636 			/* Simple index attribute */
1637 			HeapTuple	atttuple;
1638 			Form_pg_attribute attform;
1639 
1640 			Assert(attribute->expr == NULL);
1641 			atttuple = SearchSysCacheAttName(relId, attribute->name);
1642 			if (!HeapTupleIsValid(atttuple))
1643 			{
1644 				/* difference in error message spellings is historical */
1645 				if (isconstraint)
1646 					ereport(ERROR,
1647 							(errcode(ERRCODE_UNDEFINED_COLUMN),
1648 							 errmsg("column \"%s\" named in key does not exist",
1649 									attribute->name)));
1650 				else
1651 					ereport(ERROR,
1652 							(errcode(ERRCODE_UNDEFINED_COLUMN),
1653 							 errmsg("column \"%s\" does not exist",
1654 									attribute->name)));
1655 			}
1656 			attform = (Form_pg_attribute) GETSTRUCT(atttuple);
1657 			indexInfo->ii_IndexAttrNumbers[attn] = attform->attnum;
1658 			atttype = attform->atttypid;
1659 			attcollation = attform->attcollation;
1660 			ReleaseSysCache(atttuple);
1661 		}
1662 		else
1663 		{
1664 			/* Index expression */
1665 			Node	   *expr = attribute->expr;
1666 
1667 			Assert(expr != NULL);
1668 
1669 			if (attn >= nkeycols)
1670 				ereport(ERROR,
1671 						(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1672 						 errmsg("expressions are not supported in included columns")));
1673 			atttype = exprType(expr);
1674 			attcollation = exprCollation(expr);
1675 
1676 			/*
1677 			 * Strip any top-level COLLATE clause.  This ensures that we treat
1678 			 * "x COLLATE y" and "(x COLLATE y)" alike.
1679 			 */
1680 			while (IsA(expr, CollateExpr))
1681 				expr = (Node *) ((CollateExpr *) expr)->arg;
1682 
1683 			if (IsA(expr, Var) &&
1684 				((Var *) expr)->varattno != InvalidAttrNumber)
1685 			{
1686 				/*
1687 				 * User wrote "(column)" or "(column COLLATE something)".
1688 				 * Treat it like simple attribute anyway.
1689 				 */
1690 				indexInfo->ii_IndexAttrNumbers[attn] = ((Var *) expr)->varattno;
1691 			}
1692 			else
1693 			{
1694 				indexInfo->ii_IndexAttrNumbers[attn] = 0;	/* marks expression */
1695 				indexInfo->ii_Expressions = lappend(indexInfo->ii_Expressions,
1696 													expr);
1697 
1698 				/*
1699 				 * transformExpr() should have already rejected subqueries,
1700 				 * aggregates, and window functions, based on the EXPR_KIND_
1701 				 * for an index expression.
1702 				 */
1703 
1704 				/*
1705 				 * An expression using mutable functions is probably wrong,
1706 				 * since if you aren't going to get the same result for the
1707 				 * same data every time, it's not clear what the index entries
1708 				 * mean at all.
1709 				 */
1710 				if (CheckMutability((Expr *) expr))
1711 					ereport(ERROR,
1712 							(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
1713 							 errmsg("functions in index expression must be marked IMMUTABLE")));
1714 			}
1715 		}
1716 
1717 		typeOidP[attn] = atttype;
1718 
1719 		/*
1720 		 * Included columns have no collation, no opclass and no ordering
1721 		 * options.
1722 		 */
1723 		if (attn >= nkeycols)
1724 		{
1725 			if (attribute->collation)
1726 				ereport(ERROR,
1727 						(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
1728 						 errmsg("including column does not support a collation")));
1729 			if (attribute->opclass)
1730 				ereport(ERROR,
1731 						(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
1732 						 errmsg("including column does not support an operator class")));
1733 			if (attribute->ordering != SORTBY_DEFAULT)
1734 				ereport(ERROR,
1735 						(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
1736 						 errmsg("including column does not support ASC/DESC options")));
1737 			if (attribute->nulls_ordering != SORTBY_NULLS_DEFAULT)
1738 				ereport(ERROR,
1739 						(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
1740 						 errmsg("including column does not support NULLS FIRST/LAST options")));
1741 
1742 			classOidP[attn] = InvalidOid;
1743 			colOptionP[attn] = 0;
1744 			collationOidP[attn] = InvalidOid;
1745 			attn++;
1746 
1747 			continue;
1748 		}
1749 
1750 		/*
1751 		 * Apply collation override if any
1752 		 */
1753 		if (attribute->collation)
1754 			attcollation = get_collation_oid(attribute->collation, false);
1755 
1756 		/*
1757 		 * Check we have a collation iff it's a collatable type.  The only
1758 		 * expected failures here are (1) COLLATE applied to a noncollatable
1759 		 * type, or (2) index expression had an unresolved collation.  But we
1760 		 * might as well code this to be a complete consistency check.
1761 		 */
1762 		if (type_is_collatable(atttype))
1763 		{
1764 			if (!OidIsValid(attcollation))
1765 				ereport(ERROR,
1766 						(errcode(ERRCODE_INDETERMINATE_COLLATION),
1767 						 errmsg("could not determine which collation to use for index expression"),
1768 						 errhint("Use the COLLATE clause to set the collation explicitly.")));
1769 		}
1770 		else
1771 		{
1772 			if (OidIsValid(attcollation))
1773 				ereport(ERROR,
1774 						(errcode(ERRCODE_DATATYPE_MISMATCH),
1775 						 errmsg("collations are not supported by type %s",
1776 								format_type_be(atttype))));
1777 		}
1778 
1779 		collationOidP[attn] = attcollation;
1780 
1781 		/*
1782 		 * Identify the opclass to use.
1783 		 */
1784 		classOidP[attn] = ResolveOpClass(attribute->opclass,
1785 										 atttype,
1786 										 accessMethodName,
1787 										 accessMethodId);
1788 
1789 		/*
1790 		 * Identify the exclusion operator, if any.
1791 		 */
1792 		if (nextExclOp)
1793 		{
1794 			List	   *opname = (List *) lfirst(nextExclOp);
1795 			Oid			opid;
1796 			Oid			opfamily;
1797 			int			strat;
1798 
1799 			/*
1800 			 * Find the operator --- it must accept the column datatype
1801 			 * without runtime coercion (but binary compatibility is OK)
1802 			 */
1803 			opid = compatible_oper_opid(opname, atttype, atttype, false);
1804 
1805 			/*
1806 			 * Only allow commutative operators to be used in exclusion
1807 			 * constraints. If X conflicts with Y, but Y does not conflict
1808 			 * with X, bad things will happen.
1809 			 */
1810 			if (get_commutator(opid) != opid)
1811 				ereport(ERROR,
1812 						(errcode(ERRCODE_WRONG_OBJECT_TYPE),
1813 						 errmsg("operator %s is not commutative",
1814 								format_operator(opid)),
1815 						 errdetail("Only commutative operators can be used in exclusion constraints.")));
1816 
1817 			/*
1818 			 * Operator must be a member of the right opfamily, too
1819 			 */
1820 			opfamily = get_opclass_family(classOidP[attn]);
1821 			strat = get_op_opfamily_strategy(opid, opfamily);
1822 			if (strat == 0)
1823 			{
1824 				HeapTuple	opftuple;
1825 				Form_pg_opfamily opfform;
1826 
1827 				/*
1828 				 * attribute->opclass might not explicitly name the opfamily,
1829 				 * so fetch the name of the selected opfamily for use in the
1830 				 * error message.
1831 				 */
1832 				opftuple = SearchSysCache1(OPFAMILYOID,
1833 										   ObjectIdGetDatum(opfamily));
1834 				if (!HeapTupleIsValid(opftuple))
1835 					elog(ERROR, "cache lookup failed for opfamily %u",
1836 						 opfamily);
1837 				opfform = (Form_pg_opfamily) GETSTRUCT(opftuple);
1838 
1839 				ereport(ERROR,
1840 						(errcode(ERRCODE_WRONG_OBJECT_TYPE),
1841 						 errmsg("operator %s is not a member of operator family \"%s\"",
1842 								format_operator(opid),
1843 								NameStr(opfform->opfname)),
1844 						 errdetail("The exclusion operator must be related to the index operator class for the constraint.")));
1845 			}
1846 
1847 			indexInfo->ii_ExclusionOps[attn] = opid;
1848 			indexInfo->ii_ExclusionProcs[attn] = get_opcode(opid);
1849 			indexInfo->ii_ExclusionStrats[attn] = strat;
1850 			nextExclOp = lnext(nextExclOp);
1851 		}
1852 
1853 		/*
1854 		 * Set up the per-column options (indoption field).  For now, this is
1855 		 * zero for any un-ordered index, while ordered indexes have DESC and
1856 		 * NULLS FIRST/LAST options.
1857 		 */
1858 		colOptionP[attn] = 0;
1859 		if (amcanorder)
1860 		{
1861 			/* default ordering is ASC */
1862 			if (attribute->ordering == SORTBY_DESC)
1863 				colOptionP[attn] |= INDOPTION_DESC;
1864 			/* default null ordering is LAST for ASC, FIRST for DESC */
1865 			if (attribute->nulls_ordering == SORTBY_NULLS_DEFAULT)
1866 			{
1867 				if (attribute->ordering == SORTBY_DESC)
1868 					colOptionP[attn] |= INDOPTION_NULLS_FIRST;
1869 			}
1870 			else if (attribute->nulls_ordering == SORTBY_NULLS_FIRST)
1871 				colOptionP[attn] |= INDOPTION_NULLS_FIRST;
1872 		}
1873 		else
1874 		{
1875 			/* index AM does not support ordering */
1876 			if (attribute->ordering != SORTBY_DEFAULT)
1877 				ereport(ERROR,
1878 						(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1879 						 errmsg("access method \"%s\" does not support ASC/DESC options",
1880 								accessMethodName)));
1881 			if (attribute->nulls_ordering != SORTBY_NULLS_DEFAULT)
1882 				ereport(ERROR,
1883 						(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1884 						 errmsg("access method \"%s\" does not support NULLS FIRST/LAST options",
1885 								accessMethodName)));
1886 		}
1887 
1888 		attn++;
1889 	}
1890 }
1891 
1892 /*
1893  * Resolve possibly-defaulted operator class specification
1894  *
1895  * Note: This is used to resolve operator class specification in index and
1896  * partition key definitions.
1897  */
1898 Oid
1899 ResolveOpClass(List *opclass, Oid attrType,
1900 			   const char *accessMethodName, Oid accessMethodId)
1901 {
1902 	char	   *schemaname;
1903 	char	   *opcname;
1904 	HeapTuple	tuple;
1905 	Form_pg_opclass opform;
1906 	Oid			opClassId,
1907 				opInputType;
1908 
1909 	/*
1910 	 * Release 7.0 removed network_ops, timespan_ops, and datetime_ops, so we
1911 	 * ignore those opclass names so the default *_ops is used.  This can be
1912 	 * removed in some later release.  bjm 2000/02/07
1913 	 *
1914 	 * Release 7.1 removes lztext_ops, so suppress that too for a while.  tgl
1915 	 * 2000/07/30
1916 	 *
1917 	 * Release 7.2 renames timestamp_ops to timestamptz_ops, so suppress that
1918 	 * too for awhile.  I'm starting to think we need a better approach. tgl
1919 	 * 2000/10/01
1920 	 *
1921 	 * Release 8.0 removes bigbox_ops (which was dead code for a long while
1922 	 * anyway).  tgl 2003/11/11
1923 	 */
1924 	if (list_length(opclass) == 1)
1925 	{
1926 		char	   *claname = strVal(linitial(opclass));
1927 
1928 		if (strcmp(claname, "network_ops") == 0 ||
1929 			strcmp(claname, "timespan_ops") == 0 ||
1930 			strcmp(claname, "datetime_ops") == 0 ||
1931 			strcmp(claname, "lztext_ops") == 0 ||
1932 			strcmp(claname, "timestamp_ops") == 0 ||
1933 			strcmp(claname, "bigbox_ops") == 0)
1934 			opclass = NIL;
1935 	}
1936 
1937 	if (opclass == NIL)
1938 	{
1939 		/* no operator class specified, so find the default */
1940 		opClassId = GetDefaultOpClass(attrType, accessMethodId);
1941 		if (!OidIsValid(opClassId))
1942 			ereport(ERROR,
1943 					(errcode(ERRCODE_UNDEFINED_OBJECT),
1944 					 errmsg("data type %s has no default operator class for access method \"%s\"",
1945 							format_type_be(attrType), accessMethodName),
1946 					 errhint("You must specify an operator class for the index or define a default operator class for the data type.")));
1947 		return opClassId;
1948 	}
1949 
1950 	/*
1951 	 * Specific opclass name given, so look up the opclass.
1952 	 */
1953 
1954 	/* deconstruct the name list */
1955 	DeconstructQualifiedName(opclass, &schemaname, &opcname);
1956 
1957 	if (schemaname)
1958 	{
1959 		/* Look in specific schema only */
1960 		Oid			namespaceId;
1961 
1962 		namespaceId = LookupExplicitNamespace(schemaname, false);
1963 		tuple = SearchSysCache3(CLAAMNAMENSP,
1964 								ObjectIdGetDatum(accessMethodId),
1965 								PointerGetDatum(opcname),
1966 								ObjectIdGetDatum(namespaceId));
1967 	}
1968 	else
1969 	{
1970 		/* Unqualified opclass name, so search the search path */
1971 		opClassId = OpclassnameGetOpcid(accessMethodId, opcname);
1972 		if (!OidIsValid(opClassId))
1973 			ereport(ERROR,
1974 					(errcode(ERRCODE_UNDEFINED_OBJECT),
1975 					 errmsg("operator class \"%s\" does not exist for access method \"%s\"",
1976 							opcname, accessMethodName)));
1977 		tuple = SearchSysCache1(CLAOID, ObjectIdGetDatum(opClassId));
1978 	}
1979 
1980 	if (!HeapTupleIsValid(tuple))
1981 		ereport(ERROR,
1982 				(errcode(ERRCODE_UNDEFINED_OBJECT),
1983 				 errmsg("operator class \"%s\" does not exist for access method \"%s\"",
1984 						NameListToString(opclass), accessMethodName)));
1985 
1986 	/*
1987 	 * Verify that the index operator class accepts this datatype.  Note we
1988 	 * will accept binary compatibility.
1989 	 */
1990 	opform = (Form_pg_opclass) GETSTRUCT(tuple);
1991 	opClassId = opform->oid;
1992 	opInputType = opform->opcintype;
1993 
1994 	if (!IsBinaryCoercible(attrType, opInputType))
1995 		ereport(ERROR,
1996 				(errcode(ERRCODE_DATATYPE_MISMATCH),
1997 				 errmsg("operator class \"%s\" does not accept data type %s",
1998 						NameListToString(opclass), format_type_be(attrType))));
1999 
2000 	ReleaseSysCache(tuple);
2001 
2002 	return opClassId;
2003 }
2004 
2005 /*
2006  * GetDefaultOpClass
2007  *
2008  * Given the OIDs of a datatype and an access method, find the default
2009  * operator class, if any.  Returns InvalidOid if there is none.
2010  */
2011 Oid
2012 GetDefaultOpClass(Oid type_id, Oid am_id)
2013 {
2014 	Oid			result = InvalidOid;
2015 	int			nexact = 0;
2016 	int			ncompatible = 0;
2017 	int			ncompatiblepreferred = 0;
2018 	Relation	rel;
2019 	ScanKeyData skey[1];
2020 	SysScanDesc scan;
2021 	HeapTuple	tup;
2022 	TYPCATEGORY tcategory;
2023 
2024 	/* If it's a domain, look at the base type instead */
2025 	type_id = getBaseType(type_id);
2026 
2027 	tcategory = TypeCategory(type_id);
2028 
2029 	/*
2030 	 * We scan through all the opclasses available for the access method,
2031 	 * looking for one that is marked default and matches the target type
2032 	 * (either exactly or binary-compatibly, but prefer an exact match).
2033 	 *
2034 	 * We could find more than one binary-compatible match.  If just one is
2035 	 * for a preferred type, use that one; otherwise we fail, forcing the user
2036 	 * to specify which one he wants.  (The preferred-type special case is a
2037 	 * kluge for varchar: it's binary-compatible to both text and bpchar, so
2038 	 * we need a tiebreaker.)  If we find more than one exact match, then
2039 	 * someone put bogus entries in pg_opclass.
2040 	 */
2041 	rel = table_open(OperatorClassRelationId, AccessShareLock);
2042 
2043 	ScanKeyInit(&skey[0],
2044 				Anum_pg_opclass_opcmethod,
2045 				BTEqualStrategyNumber, F_OIDEQ,
2046 				ObjectIdGetDatum(am_id));
2047 
2048 	scan = systable_beginscan(rel, OpclassAmNameNspIndexId, true,
2049 							  NULL, 1, skey);
2050 
2051 	while (HeapTupleIsValid(tup = systable_getnext(scan)))
2052 	{
2053 		Form_pg_opclass opclass = (Form_pg_opclass) GETSTRUCT(tup);
2054 
2055 		/* ignore altogether if not a default opclass */
2056 		if (!opclass->opcdefault)
2057 			continue;
2058 		if (opclass->opcintype == type_id)
2059 		{
2060 			nexact++;
2061 			result = opclass->oid;
2062 		}
2063 		else if (nexact == 0 &&
2064 				 IsBinaryCoercible(type_id, opclass->opcintype))
2065 		{
2066 			if (IsPreferredType(tcategory, opclass->opcintype))
2067 			{
2068 				ncompatiblepreferred++;
2069 				result = opclass->oid;
2070 			}
2071 			else if (ncompatiblepreferred == 0)
2072 			{
2073 				ncompatible++;
2074 				result = opclass->oid;
2075 			}
2076 		}
2077 	}
2078 
2079 	systable_endscan(scan);
2080 
2081 	table_close(rel, AccessShareLock);
2082 
2083 	/* raise error if pg_opclass contains inconsistent data */
2084 	if (nexact > 1)
2085 		ereport(ERROR,
2086 				(errcode(ERRCODE_DUPLICATE_OBJECT),
2087 				 errmsg("there are multiple default operator classes for data type %s",
2088 						format_type_be(type_id))));
2089 
2090 	if (nexact == 1 ||
2091 		ncompatiblepreferred == 1 ||
2092 		(ncompatiblepreferred == 0 && ncompatible == 1))
2093 		return result;
2094 
2095 	return InvalidOid;
2096 }
2097 
2098 /*
2099  *	makeObjectName()
2100  *
2101  *	Create a name for an implicitly created index, sequence, constraint,
2102  *	extended statistics, etc.
2103  *
2104  *	The parameters are typically: the original table name, the original field
2105  *	name, and a "type" string (such as "seq" or "pkey").    The field name
2106  *	and/or type can be NULL if not relevant.
2107  *
2108  *	The result is a palloc'd string.
2109  *
2110  *	The basic result we want is "name1_name2_label", omitting "_name2" or
2111  *	"_label" when those parameters are NULL.  However, we must generate
2112  *	a name with less than NAMEDATALEN characters!  So, we truncate one or
2113  *	both names if necessary to make a short-enough string.  The label part
2114  *	is never truncated (so it had better be reasonably short).
2115  *
2116  *	The caller is responsible for checking uniqueness of the generated
2117  *	name and retrying as needed; retrying will be done by altering the
2118  *	"label" string (which is why we never truncate that part).
2119  */
2120 char *
2121 makeObjectName(const char *name1, const char *name2, const char *label)
2122 {
2123 	char	   *name;
2124 	int			overhead = 0;	/* chars needed for label and underscores */
2125 	int			availchars;		/* chars available for name(s) */
2126 	int			name1chars;		/* chars allocated to name1 */
2127 	int			name2chars;		/* chars allocated to name2 */
2128 	int			ndx;
2129 
2130 	name1chars = strlen(name1);
2131 	if (name2)
2132 	{
2133 		name2chars = strlen(name2);
2134 		overhead++;				/* allow for separating underscore */
2135 	}
2136 	else
2137 		name2chars = 0;
2138 	if (label)
2139 		overhead += strlen(label) + 1;
2140 
2141 	availchars = NAMEDATALEN - 1 - overhead;
2142 	Assert(availchars > 0);		/* else caller chose a bad label */
2143 
2144 	/*
2145 	 * If we must truncate,  preferentially truncate the longer name. This
2146 	 * logic could be expressed without a loop, but it's simple and obvious as
2147 	 * a loop.
2148 	 */
2149 	while (name1chars + name2chars > availchars)
2150 	{
2151 		if (name1chars > name2chars)
2152 			name1chars--;
2153 		else
2154 			name2chars--;
2155 	}
2156 
2157 	name1chars = pg_mbcliplen(name1, name1chars, name1chars);
2158 	if (name2)
2159 		name2chars = pg_mbcliplen(name2, name2chars, name2chars);
2160 
2161 	/* Now construct the string using the chosen lengths */
2162 	name = palloc(name1chars + name2chars + overhead + 1);
2163 	memcpy(name, name1, name1chars);
2164 	ndx = name1chars;
2165 	if (name2)
2166 	{
2167 		name[ndx++] = '_';
2168 		memcpy(name + ndx, name2, name2chars);
2169 		ndx += name2chars;
2170 	}
2171 	if (label)
2172 	{
2173 		name[ndx++] = '_';
2174 		strcpy(name + ndx, label);
2175 	}
2176 	else
2177 		name[ndx] = '\0';
2178 
2179 	return name;
2180 }
2181 
2182 /*
2183  * Select a nonconflicting name for a new relation.  This is ordinarily
2184  * used to choose index names (which is why it's here) but it can also
2185  * be used for sequences, or any autogenerated relation kind.
2186  *
2187  * name1, name2, and label are used the same way as for makeObjectName(),
2188  * except that the label can't be NULL; digits will be appended to the label
2189  * if needed to create a name that is unique within the specified namespace.
2190  *
2191  * If isconstraint is true, we also avoid choosing a name matching any
2192  * existing constraint in the same namespace.  (This is stricter than what
2193  * Postgres itself requires, but the SQL standard says that constraint names
2194  * should be unique within schemas, so we follow that for autogenerated
2195  * constraint names.)
2196  *
2197  * Note: it is theoretically possible to get a collision anyway, if someone
2198  * else chooses the same name concurrently.  This is fairly unlikely to be
2199  * a problem in practice, especially if one is holding an exclusive lock on
2200  * the relation identified by name1.  However, if choosing multiple names
2201  * within a single command, you'd better create the new object and do
2202  * CommandCounterIncrement before choosing the next one!
2203  *
2204  * Returns a palloc'd string.
2205  */
2206 char *
2207 ChooseRelationName(const char *name1, const char *name2,
2208 				   const char *label, Oid namespaceid,
2209 				   bool isconstraint)
2210 {
2211 	int			pass = 0;
2212 	char	   *relname = NULL;
2213 	char		modlabel[NAMEDATALEN];
2214 
2215 	/* try the unmodified label first */
2216 	StrNCpy(modlabel, label, sizeof(modlabel));
2217 
2218 	for (;;)
2219 	{
2220 		relname = makeObjectName(name1, name2, modlabel);
2221 
2222 		if (!OidIsValid(get_relname_relid(relname, namespaceid)))
2223 		{
2224 			if (!isconstraint ||
2225 				!ConstraintNameExists(relname, namespaceid))
2226 				break;
2227 		}
2228 
2229 		/* found a conflict, so try a new name component */
2230 		pfree(relname);
2231 		snprintf(modlabel, sizeof(modlabel), "%s%d", label, ++pass);
2232 	}
2233 
2234 	return relname;
2235 }
2236 
2237 /*
2238  * Select the name to be used for an index.
2239  *
2240  * The argument list is pretty ad-hoc :-(
2241  */
2242 static char *
2243 ChooseIndexName(const char *tabname, Oid namespaceId,
2244 				List *colnames, List *exclusionOpNames,
2245 				bool primary, bool isconstraint)
2246 {
2247 	char	   *indexname;
2248 
2249 	if (primary)
2250 	{
2251 		/* the primary key's name does not depend on the specific column(s) */
2252 		indexname = ChooseRelationName(tabname,
2253 									   NULL,
2254 									   "pkey",
2255 									   namespaceId,
2256 									   true);
2257 	}
2258 	else if (exclusionOpNames != NIL)
2259 	{
2260 		indexname = ChooseRelationName(tabname,
2261 									   ChooseIndexNameAddition(colnames),
2262 									   "excl",
2263 									   namespaceId,
2264 									   true);
2265 	}
2266 	else if (isconstraint)
2267 	{
2268 		indexname = ChooseRelationName(tabname,
2269 									   ChooseIndexNameAddition(colnames),
2270 									   "key",
2271 									   namespaceId,
2272 									   true);
2273 	}
2274 	else
2275 	{
2276 		indexname = ChooseRelationName(tabname,
2277 									   ChooseIndexNameAddition(colnames),
2278 									   "idx",
2279 									   namespaceId,
2280 									   false);
2281 	}
2282 
2283 	return indexname;
2284 }
2285 
2286 /*
2287  * Generate "name2" for a new index given the list of column names for it
2288  * (as produced by ChooseIndexColumnNames).  This will be passed to
2289  * ChooseRelationName along with the parent table name and a suitable label.
2290  *
2291  * We know that less than NAMEDATALEN characters will actually be used,
2292  * so we can truncate the result once we've generated that many.
2293  *
2294  * XXX See also ChooseForeignKeyConstraintNameAddition and
2295  * ChooseExtendedStatisticNameAddition.
2296  */
2297 static char *
2298 ChooseIndexNameAddition(List *colnames)
2299 {
2300 	char		buf[NAMEDATALEN * 2];
2301 	int			buflen = 0;
2302 	ListCell   *lc;
2303 
2304 	buf[0] = '\0';
2305 	foreach(lc, colnames)
2306 	{
2307 		const char *name = (const char *) lfirst(lc);
2308 
2309 		if (buflen > 0)
2310 			buf[buflen++] = '_';	/* insert _ between names */
2311 
2312 		/*
2313 		 * At this point we have buflen <= NAMEDATALEN.  name should be less
2314 		 * than NAMEDATALEN already, but use strlcpy for paranoia.
2315 		 */
2316 		strlcpy(buf + buflen, name, NAMEDATALEN);
2317 		buflen += strlen(buf + buflen);
2318 		if (buflen >= NAMEDATALEN)
2319 			break;
2320 	}
2321 	return pstrdup(buf);
2322 }
2323 
2324 /*
2325  * Select the actual names to be used for the columns of an index, given the
2326  * list of IndexElems for the columns.  This is mostly about ensuring the
2327  * names are unique so we don't get a conflicting-attribute-names error.
2328  *
2329  * Returns a List of plain strings (char *, not String nodes).
2330  */
2331 static List *
2332 ChooseIndexColumnNames(List *indexElems)
2333 {
2334 	List	   *result = NIL;
2335 	ListCell   *lc;
2336 
2337 	foreach(lc, indexElems)
2338 	{
2339 		IndexElem  *ielem = (IndexElem *) lfirst(lc);
2340 		const char *origname;
2341 		const char *curname;
2342 		int			i;
2343 		char		buf[NAMEDATALEN];
2344 
2345 		/* Get the preliminary name from the IndexElem */
2346 		if (ielem->indexcolname)
2347 			origname = ielem->indexcolname; /* caller-specified name */
2348 		else if (ielem->name)
2349 			origname = ielem->name; /* simple column reference */
2350 		else
2351 			origname = "expr";	/* default name for expression */
2352 
2353 		/* If it conflicts with any previous column, tweak it */
2354 		curname = origname;
2355 		for (i = 1;; i++)
2356 		{
2357 			ListCell   *lc2;
2358 			char		nbuf[32];
2359 			int			nlen;
2360 
2361 			foreach(lc2, result)
2362 			{
2363 				if (strcmp(curname, (char *) lfirst(lc2)) == 0)
2364 					break;
2365 			}
2366 			if (lc2 == NULL)
2367 				break;			/* found nonconflicting name */
2368 
2369 			sprintf(nbuf, "%d", i);
2370 
2371 			/* Ensure generated names are shorter than NAMEDATALEN */
2372 			nlen = pg_mbcliplen(origname, strlen(origname),
2373 								NAMEDATALEN - 1 - strlen(nbuf));
2374 			memcpy(buf, origname, nlen);
2375 			strcpy(buf + nlen, nbuf);
2376 			curname = buf;
2377 		}
2378 
2379 		/* And attach to the result list */
2380 		result = lappend(result, pstrdup(curname));
2381 	}
2382 	return result;
2383 }
2384 
2385 /*
2386  * ReindexIndex
2387  *		Recreate a specific index.
2388  */
2389 void
2390 ReindexIndex(RangeVar *indexRelation, int options, bool concurrent)
2391 {
2392 	struct ReindexIndexCallbackState state;
2393 	Oid			indOid;
2394 	Relation	irel;
2395 	char		persistence;
2396 
2397 	/*
2398 	 * Find and lock index, and check permissions on table; use callback to
2399 	 * obtain lock on table first, to avoid deadlock hazard.  The lock level
2400 	 * used here must match the index lock obtained in reindex_index().
2401 	 *
2402 	 * If it's a temporary index, we will perform a non-concurrent reindex,
2403 	 * even if CONCURRENTLY was requested.  In that case, reindex_index() will
2404 	 * upgrade the lock, but that's OK, because other sessions can't hold
2405 	 * locks on our temporary table.
2406 	 */
2407 	state.concurrent = concurrent;
2408 	state.locked_table_oid = InvalidOid;
2409 	indOid = RangeVarGetRelidExtended(indexRelation,
2410 									  concurrent ? ShareUpdateExclusiveLock : AccessExclusiveLock,
2411 									  0,
2412 									  RangeVarCallbackForReindexIndex,
2413 									  &state);
2414 
2415 	/*
2416 	 * Obtain the current persistence of the existing index.  We already hold
2417 	 * lock on the index.
2418 	 */
2419 	irel = index_open(indOid, NoLock);
2420 
2421 	if (irel->rd_rel->relkind == RELKIND_PARTITIONED_INDEX)
2422 	{
2423 		ReindexPartitionedIndex(irel);
2424 		return;
2425 	}
2426 
2427 	persistence = irel->rd_rel->relpersistence;
2428 	index_close(irel, NoLock);
2429 
2430 	if (concurrent && persistence != RELPERSISTENCE_TEMP)
2431 		ReindexRelationConcurrently(indOid, options);
2432 	else
2433 		reindex_index(indOid, false, persistence,
2434 					  options | REINDEXOPT_REPORT_PROGRESS);
2435 }
2436 
2437 /*
2438  * Check permissions on table before acquiring relation lock; also lock
2439  * the heap before the RangeVarGetRelidExtended takes the index lock, to avoid
2440  * deadlocks.
2441  */
2442 static void
2443 RangeVarCallbackForReindexIndex(const RangeVar *relation,
2444 								Oid relId, Oid oldRelId, void *arg)
2445 {
2446 	char		relkind;
2447 	struct ReindexIndexCallbackState *state = arg;
2448 	LOCKMODE	table_lockmode;
2449 
2450 	/*
2451 	 * Lock level here should match table lock in reindex_index() for
2452 	 * non-concurrent case and table locks used by index_concurrently_*() for
2453 	 * concurrent case.
2454 	 */
2455 	table_lockmode = state->concurrent ? ShareUpdateExclusiveLock : ShareLock;
2456 
2457 	/*
2458 	 * If we previously locked some other index's heap, and the name we're
2459 	 * looking up no longer refers to that relation, release the now-useless
2460 	 * lock.
2461 	 */
2462 	if (relId != oldRelId && OidIsValid(oldRelId))
2463 	{
2464 		UnlockRelationOid(state->locked_table_oid, table_lockmode);
2465 		state->locked_table_oid = InvalidOid;
2466 	}
2467 
2468 	/* If the relation does not exist, there's nothing more to do. */
2469 	if (!OidIsValid(relId))
2470 		return;
2471 
2472 	/*
2473 	 * If the relation does exist, check whether it's an index.  But note that
2474 	 * the relation might have been dropped between the time we did the name
2475 	 * lookup and now.  In that case, there's nothing to do.
2476 	 */
2477 	relkind = get_rel_relkind(relId);
2478 	if (!relkind)
2479 		return;
2480 	if (relkind != RELKIND_INDEX &&
2481 		relkind != RELKIND_PARTITIONED_INDEX)
2482 		ereport(ERROR,
2483 				(errcode(ERRCODE_WRONG_OBJECT_TYPE),
2484 				 errmsg("\"%s\" is not an index", relation->relname)));
2485 
2486 	/* Check permissions */
2487 	if (!pg_class_ownercheck(relId, GetUserId()))
2488 		aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_INDEX, relation->relname);
2489 
2490 	/* Lock heap before index to avoid deadlock. */
2491 	if (relId != oldRelId)
2492 	{
2493 		Oid			table_oid = IndexGetRelation(relId, true);
2494 
2495 		/*
2496 		 * If the OID isn't valid, it means the index was concurrently
2497 		 * dropped, which is not a problem for us; just return normally.
2498 		 */
2499 		if (OidIsValid(table_oid))
2500 		{
2501 			LockRelationOid(table_oid, table_lockmode);
2502 			state->locked_table_oid = table_oid;
2503 		}
2504 	}
2505 }
2506 
2507 /*
2508  * ReindexTable
2509  *		Recreate all indexes of a table (and of its toast table, if any)
2510  */
2511 Oid
2512 ReindexTable(RangeVar *relation, int options, bool concurrent)
2513 {
2514 	Oid			heapOid;
2515 	bool		result;
2516 
2517 	/*
2518 	 * The lock level used here should match reindex_relation().
2519 	 *
2520 	 * If it's a temporary table, we will perform a non-concurrent reindex,
2521 	 * even if CONCURRENTLY was requested.  In that case, reindex_relation()
2522 	 * will upgrade the lock, but that's OK, because other sessions can't hold
2523 	 * locks on our temporary table.
2524 	 */
2525 	heapOid = RangeVarGetRelidExtended(relation,
2526 									   concurrent ? ShareUpdateExclusiveLock : ShareLock,
2527 									   0,
2528 									   RangeVarCallbackOwnsTable, NULL);
2529 
2530 	if (concurrent && get_rel_persistence(heapOid) != RELPERSISTENCE_TEMP)
2531 	{
2532 		result = ReindexRelationConcurrently(heapOid, options);
2533 
2534 		if (!result)
2535 			ereport(NOTICE,
2536 					(errmsg("table \"%s\" has no indexes that can be reindexed concurrently",
2537 							relation->relname)));
2538 	}
2539 	else
2540 	{
2541 		result = reindex_relation(heapOid,
2542 								  REINDEX_REL_PROCESS_TOAST |
2543 								  REINDEX_REL_CHECK_CONSTRAINTS,
2544 								  options | REINDEXOPT_REPORT_PROGRESS);
2545 		if (!result)
2546 			ereport(NOTICE,
2547 					(errmsg("table \"%s\" has no indexes to reindex",
2548 							relation->relname)));
2549 	}
2550 
2551 	return heapOid;
2552 }
2553 
2554 /*
2555  * ReindexMultipleTables
2556  *		Recreate indexes of tables selected by objectName/objectKind.
2557  *
2558  * To reduce the probability of deadlocks, each table is reindexed in a
2559  * separate transaction, so we can release the lock on it right away.
2560  * That means this must not be called within a user transaction block!
2561  */
2562 void
2563 ReindexMultipleTables(const char *objectName, ReindexObjectType objectKind,
2564 					  int options, bool concurrent)
2565 {
2566 	Oid			objectOid;
2567 	Relation	relationRelation;
2568 	TableScanDesc scan;
2569 	ScanKeyData scan_keys[1];
2570 	HeapTuple	tuple;
2571 	MemoryContext private_context;
2572 	MemoryContext old;
2573 	List	   *relids = NIL;
2574 	ListCell   *l;
2575 	int			num_keys;
2576 	bool		concurrent_warning = false;
2577 
2578 	AssertArg(objectName);
2579 	Assert(objectKind == REINDEX_OBJECT_SCHEMA ||
2580 		   objectKind == REINDEX_OBJECT_SYSTEM ||
2581 		   objectKind == REINDEX_OBJECT_DATABASE);
2582 
2583 	if (objectKind == REINDEX_OBJECT_SYSTEM && concurrent)
2584 		ereport(ERROR,
2585 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2586 				 errmsg("cannot reindex system catalogs concurrently")));
2587 
2588 	/*
2589 	 * Get OID of object to reindex, being the database currently being used
2590 	 * by session for a database or for system catalogs, or the schema defined
2591 	 * by caller. At the same time do permission checks that need different
2592 	 * processing depending on the object type.
2593 	 */
2594 	if (objectKind == REINDEX_OBJECT_SCHEMA)
2595 	{
2596 		objectOid = get_namespace_oid(objectName, false);
2597 
2598 		if (!pg_namespace_ownercheck(objectOid, GetUserId()))
2599 			aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_SCHEMA,
2600 						   objectName);
2601 	}
2602 	else
2603 	{
2604 		objectOid = MyDatabaseId;
2605 
2606 		if (strcmp(objectName, get_database_name(objectOid)) != 0)
2607 			ereport(ERROR,
2608 					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2609 					 errmsg("can only reindex the currently open database")));
2610 		if (!pg_database_ownercheck(objectOid, GetUserId()))
2611 			aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
2612 						   objectName);
2613 	}
2614 
2615 	/*
2616 	 * Create a memory context that will survive forced transaction commits we
2617 	 * do below.  Since it is a child of PortalContext, it will go away
2618 	 * eventually even if we suffer an error; there's no need for special
2619 	 * abort cleanup logic.
2620 	 */
2621 	private_context = AllocSetContextCreate(PortalContext,
2622 											"ReindexMultipleTables",
2623 											ALLOCSET_SMALL_SIZES);
2624 
2625 	/*
2626 	 * Define the search keys to find the objects to reindex. For a schema, we
2627 	 * select target relations using relnamespace, something not necessary for
2628 	 * a database-wide operation.
2629 	 */
2630 	if (objectKind == REINDEX_OBJECT_SCHEMA)
2631 	{
2632 		num_keys = 1;
2633 		ScanKeyInit(&scan_keys[0],
2634 					Anum_pg_class_relnamespace,
2635 					BTEqualStrategyNumber, F_OIDEQ,
2636 					ObjectIdGetDatum(objectOid));
2637 	}
2638 	else
2639 		num_keys = 0;
2640 
2641 	/*
2642 	 * Scan pg_class to build a list of the relations we need to reindex.
2643 	 *
2644 	 * We only consider plain relations and materialized views here (toast
2645 	 * rels will be processed indirectly by reindex_relation).
2646 	 */
2647 	relationRelation = table_open(RelationRelationId, AccessShareLock);
2648 	scan = table_beginscan_catalog(relationRelation, num_keys, scan_keys);
2649 	while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
2650 	{
2651 		Form_pg_class classtuple = (Form_pg_class) GETSTRUCT(tuple);
2652 		Oid			relid = classtuple->oid;
2653 
2654 		/*
2655 		 * Only regular tables and matviews can have indexes, so ignore any
2656 		 * other kind of relation.
2657 		 *
2658 		 * It is tempting to also consider partitioned tables here, but that
2659 		 * has the problem that if the children are in the same schema, they
2660 		 * would be processed twice.  Maybe we could have a separate list of
2661 		 * partitioned tables, and expand that afterwards into relids,
2662 		 * ignoring any duplicates.
2663 		 */
2664 		if (classtuple->relkind != RELKIND_RELATION &&
2665 			classtuple->relkind != RELKIND_MATVIEW)
2666 			continue;
2667 
2668 		/* Skip temp tables of other backends; we can't reindex them at all */
2669 		if (classtuple->relpersistence == RELPERSISTENCE_TEMP &&
2670 			!isTempNamespace(classtuple->relnamespace))
2671 			continue;
2672 
2673 		/* Check user/system classification, and optionally skip */
2674 		if (objectKind == REINDEX_OBJECT_SYSTEM &&
2675 			!IsSystemClass(relid, classtuple))
2676 			continue;
2677 
2678 		/*
2679 		 * The table can be reindexed if the user is superuser, the table
2680 		 * owner, or the database/schema owner (but in the latter case, only
2681 		 * if it's not a shared relation).  pg_class_ownercheck includes the
2682 		 * superuser case, and depending on objectKind we already know that
2683 		 * the user has permission to run REINDEX on this database or schema
2684 		 * per the permission checks at the beginning of this routine.
2685 		 */
2686 		if (classtuple->relisshared &&
2687 			!pg_class_ownercheck(relid, GetUserId()))
2688 			continue;
2689 
2690 		/*
2691 		 * Skip system tables, since index_create() would reject indexing them
2692 		 * concurrently (and it would likely fail if we tried).
2693 		 */
2694 		if (concurrent &&
2695 			IsCatalogRelationOid(relid))
2696 		{
2697 			if (!concurrent_warning)
2698 				ereport(WARNING,
2699 						(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2700 						 errmsg("cannot reindex system catalogs concurrently, skipping all")));
2701 			concurrent_warning = true;
2702 			continue;
2703 		}
2704 
2705 		/* Save the list of relation OIDs in private context */
2706 		old = MemoryContextSwitchTo(private_context);
2707 
2708 		/*
2709 		 * We always want to reindex pg_class first if it's selected to be
2710 		 * reindexed.  This ensures that if there is any corruption in
2711 		 * pg_class' indexes, they will be fixed before we process any other
2712 		 * tables.  This is critical because reindexing itself will try to
2713 		 * update pg_class.
2714 		 */
2715 		if (relid == RelationRelationId)
2716 			relids = lcons_oid(relid, relids);
2717 		else
2718 			relids = lappend_oid(relids, relid);
2719 
2720 		MemoryContextSwitchTo(old);
2721 	}
2722 	table_endscan(scan);
2723 	table_close(relationRelation, AccessShareLock);
2724 
2725 	/* Now reindex each rel in a separate transaction */
2726 	PopActiveSnapshot();
2727 	CommitTransactionCommand();
2728 	foreach(l, relids)
2729 	{
2730 		Oid			relid = lfirst_oid(l);
2731 
2732 		StartTransactionCommand();
2733 		/* functions in indexes may want a snapshot set */
2734 		PushActiveSnapshot(GetTransactionSnapshot());
2735 
2736 		if (concurrent && get_rel_persistence(relid) != RELPERSISTENCE_TEMP)
2737 		{
2738 			(void) ReindexRelationConcurrently(relid, options);
2739 			/* ReindexRelationConcurrently() does the verbose output */
2740 		}
2741 		else
2742 		{
2743 			bool		result;
2744 
2745 			result = reindex_relation(relid,
2746 									  REINDEX_REL_PROCESS_TOAST |
2747 									  REINDEX_REL_CHECK_CONSTRAINTS,
2748 									  options | REINDEXOPT_REPORT_PROGRESS);
2749 
2750 			if (result && (options & REINDEXOPT_VERBOSE))
2751 				ereport(INFO,
2752 						(errmsg("table \"%s.%s\" was reindexed",
2753 								get_namespace_name(get_rel_namespace(relid)),
2754 								get_rel_name(relid))));
2755 
2756 			PopActiveSnapshot();
2757 		}
2758 
2759 		CommitTransactionCommand();
2760 	}
2761 	StartTransactionCommand();
2762 
2763 	MemoryContextDelete(private_context);
2764 }
2765 
2766 
2767 /*
2768  * ReindexRelationConcurrently - process REINDEX CONCURRENTLY for given
2769  * relation OID
2770  *
2771  * 'relationOid' can either belong to an index, a table or a materialized
2772  * view.  For tables and materialized views, all its indexes will be rebuilt,
2773  * excluding invalid indexes and any indexes used in exclusion constraints,
2774  * but including its associated toast table indexes.  For indexes, the index
2775  * itself will be rebuilt.  If 'relationOid' belongs to a partitioned table
2776  * then we issue a warning to mention these are not yet supported.
2777  *
2778  * The locks taken on parent tables and involved indexes are kept until the
2779  * transaction is committed, at which point a session lock is taken on each
2780  * relation.  Both of these protect against concurrent schema changes.
2781  *
2782  * Returns true if any indexes have been rebuilt (including toast table's
2783  * indexes, when relevant), otherwise returns false.
2784  *
2785  * NOTE: This cannot be used on temporary relations.  A concurrent build would
2786  * cause issues with ON COMMIT actions triggered by the transactions of the
2787  * concurrent build.  Temporary relations are not subject to concurrent
2788  * concerns, so there's no need for the more complicated concurrent build,
2789  * anyway, and a non-concurrent reindex is more efficient.
2790  */
2791 static bool
2792 ReindexRelationConcurrently(Oid relationOid, int options)
2793 {
2794 	List	   *heapRelationIds = NIL;
2795 	List	   *indexIds = NIL;
2796 	List	   *newIndexIds = NIL;
2797 	List	   *relationLocks = NIL;
2798 	List	   *lockTags = NIL;
2799 	ListCell   *lc,
2800 			   *lc2;
2801 	MemoryContext private_context;
2802 	MemoryContext oldcontext;
2803 	char		relkind;
2804 	char	   *relationName = NULL;
2805 	char	   *relationNamespace = NULL;
2806 	PGRUsage	ru0;
2807 	const int	progress_index[] = {
2808 		PROGRESS_CREATEIDX_COMMAND,
2809 		PROGRESS_CREATEIDX_PHASE,
2810 		PROGRESS_CREATEIDX_INDEX_OID,
2811 		PROGRESS_CREATEIDX_ACCESS_METHOD_OID
2812 	};
2813 	int64		progress_vals[4];
2814 
2815 	/*
2816 	 * Create a memory context that will survive forced transaction commits we
2817 	 * do below.  Since it is a child of PortalContext, it will go away
2818 	 * eventually even if we suffer an error; there's no need for special
2819 	 * abort cleanup logic.
2820 	 */
2821 	private_context = AllocSetContextCreate(PortalContext,
2822 											"ReindexConcurrent",
2823 											ALLOCSET_SMALL_SIZES);
2824 
2825 	if (options & REINDEXOPT_VERBOSE)
2826 	{
2827 		/* Save data needed by REINDEX VERBOSE in private context */
2828 		oldcontext = MemoryContextSwitchTo(private_context);
2829 
2830 		relationName = get_rel_name(relationOid);
2831 		relationNamespace = get_namespace_name(get_rel_namespace(relationOid));
2832 
2833 		pg_rusage_init(&ru0);
2834 
2835 		MemoryContextSwitchTo(oldcontext);
2836 	}
2837 
2838 	relkind = get_rel_relkind(relationOid);
2839 
2840 	/*
2841 	 * Extract the list of indexes that are going to be rebuilt based on the
2842 	 * relation Oid given by caller.
2843 	 */
2844 	switch (relkind)
2845 	{
2846 		case RELKIND_RELATION:
2847 		case RELKIND_MATVIEW:
2848 		case RELKIND_TOASTVALUE:
2849 			{
2850 				/*
2851 				 * In the case of a relation, find all its indexes including
2852 				 * toast indexes.
2853 				 */
2854 				Relation	heapRelation;
2855 
2856 				/* Save the list of relation OIDs in private context */
2857 				oldcontext = MemoryContextSwitchTo(private_context);
2858 
2859 				/* Track this relation for session locks */
2860 				heapRelationIds = lappend_oid(heapRelationIds, relationOid);
2861 
2862 				MemoryContextSwitchTo(oldcontext);
2863 
2864 				if (IsCatalogRelationOid(relationOid))
2865 					ereport(ERROR,
2866 							(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2867 							 errmsg("cannot reindex system catalogs concurrently")));
2868 
2869 				/* Open relation to get its indexes */
2870 				heapRelation = table_open(relationOid, ShareUpdateExclusiveLock);
2871 
2872 				/* Add all the valid indexes of relation to list */
2873 				foreach(lc, RelationGetIndexList(heapRelation))
2874 				{
2875 					Oid			cellOid = lfirst_oid(lc);
2876 					Relation	indexRelation = index_open(cellOid,
2877 														   ShareUpdateExclusiveLock);
2878 
2879 					if (!indexRelation->rd_index->indisvalid)
2880 						ereport(WARNING,
2881 								(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2882 								 errmsg("cannot reindex invalid index \"%s.%s\" concurrently, skipping",
2883 										get_namespace_name(get_rel_namespace(cellOid)),
2884 										get_rel_name(cellOid))));
2885 					else if (indexRelation->rd_index->indisexclusion)
2886 						ereport(WARNING,
2887 								(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2888 								 errmsg("cannot reindex exclusion constraint index \"%s.%s\" concurrently, skipping",
2889 										get_namespace_name(get_rel_namespace(cellOid)),
2890 										get_rel_name(cellOid))));
2891 					else
2892 					{
2893 						/* Save the list of relation OIDs in private context */
2894 						oldcontext = MemoryContextSwitchTo(private_context);
2895 
2896 						indexIds = lappend_oid(indexIds, cellOid);
2897 
2898 						MemoryContextSwitchTo(oldcontext);
2899 					}
2900 
2901 					index_close(indexRelation, NoLock);
2902 				}
2903 
2904 				/* Also add the toast indexes */
2905 				if (OidIsValid(heapRelation->rd_rel->reltoastrelid))
2906 				{
2907 					Oid			toastOid = heapRelation->rd_rel->reltoastrelid;
2908 					Relation	toastRelation = table_open(toastOid,
2909 														   ShareUpdateExclusiveLock);
2910 
2911 					/* Save the list of relation OIDs in private context */
2912 					oldcontext = MemoryContextSwitchTo(private_context);
2913 
2914 					/* Track this relation for session locks */
2915 					heapRelationIds = lappend_oid(heapRelationIds, toastOid);
2916 
2917 					MemoryContextSwitchTo(oldcontext);
2918 
2919 					foreach(lc2, RelationGetIndexList(toastRelation))
2920 					{
2921 						Oid			cellOid = lfirst_oid(lc2);
2922 						Relation	indexRelation = index_open(cellOid,
2923 															   ShareUpdateExclusiveLock);
2924 
2925 						if (!indexRelation->rd_index->indisvalid)
2926 							ereport(WARNING,
2927 									(errcode(ERRCODE_INDEX_CORRUPTED),
2928 									 errmsg("cannot reindex invalid index \"%s.%s\" concurrently, skipping",
2929 											get_namespace_name(get_rel_namespace(cellOid)),
2930 											get_rel_name(cellOid))));
2931 						else
2932 						{
2933 							/*
2934 							 * Save the list of relation OIDs in private
2935 							 * context
2936 							 */
2937 							oldcontext = MemoryContextSwitchTo(private_context);
2938 
2939 							indexIds = lappend_oid(indexIds, cellOid);
2940 
2941 							MemoryContextSwitchTo(oldcontext);
2942 						}
2943 
2944 						index_close(indexRelation, NoLock);
2945 					}
2946 
2947 					table_close(toastRelation, NoLock);
2948 				}
2949 
2950 				table_close(heapRelation, NoLock);
2951 				break;
2952 			}
2953 		case RELKIND_INDEX:
2954 			{
2955 				Oid			heapId = IndexGetRelation(relationOid, false);
2956 
2957 				if (IsCatalogRelationOid(heapId))
2958 					ereport(ERROR,
2959 							(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2960 							 errmsg("cannot reindex system catalogs concurrently")));
2961 
2962 				/*
2963 				 * Don't allow reindex for an invalid index on TOAST table, as
2964 				 * if rebuilt it would not be possible to drop it.
2965 				 */
2966 				if (IsToastNamespace(get_rel_namespace(relationOid)) &&
2967 					!get_index_isvalid(relationOid))
2968 					ereport(ERROR,
2969 							(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2970 							 errmsg("cannot reindex invalid index on TOAST table concurrently")));
2971 
2972 				/* Save the list of relation OIDs in private context */
2973 				oldcontext = MemoryContextSwitchTo(private_context);
2974 
2975 				/* Track the heap relation of this index for session locks */
2976 				heapRelationIds = list_make1_oid(heapId);
2977 
2978 				/*
2979 				 * Save the list of relation OIDs in private context.  Note
2980 				 * that invalid indexes are allowed here.
2981 				 */
2982 				indexIds = lappend_oid(indexIds, relationOid);
2983 
2984 				MemoryContextSwitchTo(oldcontext);
2985 				break;
2986 			}
2987 		case RELKIND_PARTITIONED_TABLE:
2988 			/* see reindex_relation() */
2989 			ereport(WARNING,
2990 					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2991 					 errmsg("REINDEX of partitioned tables is not yet implemented, skipping \"%s\"",
2992 							get_rel_name(relationOid))));
2993 			return false;
2994 		default:
2995 			/* Return error if type of relation is not supported */
2996 			ereport(ERROR,
2997 					(errcode(ERRCODE_WRONG_OBJECT_TYPE),
2998 					 errmsg("cannot reindex this type of relation concurrently")));
2999 			break;
3000 	}
3001 
3002 	/* Definitely no indexes, so leave */
3003 	if (indexIds == NIL)
3004 	{
3005 		PopActiveSnapshot();
3006 		return false;
3007 	}
3008 
3009 	Assert(heapRelationIds != NIL);
3010 
3011 	/*-----
3012 	 * Now we have all the indexes we want to process in indexIds.
3013 	 *
3014 	 * The phases now are:
3015 	 *
3016 	 * 1. create new indexes in the catalog
3017 	 * 2. build new indexes
3018 	 * 3. let new indexes catch up with tuples inserted in the meantime
3019 	 * 4. swap index names
3020 	 * 5. mark old indexes as dead
3021 	 * 6. drop old indexes
3022 	 *
3023 	 * We process each phase for all indexes before moving to the next phase,
3024 	 * for efficiency.
3025 	 */
3026 
3027 	/*
3028 	 * Phase 1 of REINDEX CONCURRENTLY
3029 	 *
3030 	 * Create a new index with the same properties as the old one, but it is
3031 	 * only registered in catalogs and will be built later.  Then get session
3032 	 * locks on all involved tables.  See analogous code in DefineIndex() for
3033 	 * more detailed comments.
3034 	 */
3035 
3036 	foreach(lc, indexIds)
3037 	{
3038 		char	   *concurrentName;
3039 		Oid			indexId = lfirst_oid(lc);
3040 		Oid			newIndexId;
3041 		Relation	indexRel;
3042 		Relation	heapRel;
3043 		Relation	newIndexRel;
3044 		LockRelId  *lockrelid;
3045 
3046 		indexRel = index_open(indexId, ShareUpdateExclusiveLock);
3047 		heapRel = table_open(indexRel->rd_index->indrelid,
3048 							 ShareUpdateExclusiveLock);
3049 
3050 		/* This function shouldn't be called for temporary relations. */
3051 		if (indexRel->rd_rel->relpersistence == RELPERSISTENCE_TEMP)
3052 			elog(ERROR, "cannot reindex a temporary table concurrently");
3053 
3054 		pgstat_progress_start_command(PROGRESS_COMMAND_CREATE_INDEX,
3055 									  RelationGetRelid(heapRel));
3056 		progress_vals[0] = PROGRESS_CREATEIDX_COMMAND_REINDEX_CONCURRENTLY;
3057 		progress_vals[1] = 0;	/* initializing */
3058 		progress_vals[2] = indexId;
3059 		progress_vals[3] = indexRel->rd_rel->relam;
3060 		pgstat_progress_update_multi_param(4, progress_index, progress_vals);
3061 
3062 		/* Choose a temporary relation name for the new index */
3063 		concurrentName = ChooseRelationName(get_rel_name(indexId),
3064 											NULL,
3065 											"ccnew",
3066 											get_rel_namespace(indexRel->rd_index->indrelid),
3067 											false);
3068 
3069 		/* Create new index definition based on given index */
3070 		newIndexId = index_concurrently_create_copy(heapRel,
3071 													indexId,
3072 													concurrentName);
3073 
3074 		/*
3075 		 * Now open the relation of the new index, a session-level lock is
3076 		 * also needed on it.
3077 		 */
3078 		newIndexRel = index_open(newIndexId, ShareUpdateExclusiveLock);
3079 
3080 		/*
3081 		 * Save the list of OIDs and locks in private context
3082 		 */
3083 		oldcontext = MemoryContextSwitchTo(private_context);
3084 
3085 		newIndexIds = lappend_oid(newIndexIds, newIndexId);
3086 
3087 		/*
3088 		 * Save lockrelid to protect each relation from drop then close
3089 		 * relations. The lockrelid on parent relation is not taken here to
3090 		 * avoid multiple locks taken on the same relation, instead we rely on
3091 		 * parentRelationIds built earlier.
3092 		 */
3093 		lockrelid = palloc(sizeof(*lockrelid));
3094 		*lockrelid = indexRel->rd_lockInfo.lockRelId;
3095 		relationLocks = lappend(relationLocks, lockrelid);
3096 		lockrelid = palloc(sizeof(*lockrelid));
3097 		*lockrelid = newIndexRel->rd_lockInfo.lockRelId;
3098 		relationLocks = lappend(relationLocks, lockrelid);
3099 
3100 		MemoryContextSwitchTo(oldcontext);
3101 
3102 		index_close(indexRel, NoLock);
3103 		index_close(newIndexRel, NoLock);
3104 		table_close(heapRel, NoLock);
3105 	}
3106 
3107 	/*
3108 	 * Save the heap lock for following visibility checks with other backends
3109 	 * might conflict with this session.
3110 	 */
3111 	foreach(lc, heapRelationIds)
3112 	{
3113 		Relation	heapRelation = table_open(lfirst_oid(lc), ShareUpdateExclusiveLock);
3114 		LockRelId  *lockrelid;
3115 		LOCKTAG    *heaplocktag;
3116 
3117 		/* Save the list of locks in private context */
3118 		oldcontext = MemoryContextSwitchTo(private_context);
3119 
3120 		/* Add lockrelid of heap relation to the list of locked relations */
3121 		lockrelid = palloc(sizeof(*lockrelid));
3122 		*lockrelid = heapRelation->rd_lockInfo.lockRelId;
3123 		relationLocks = lappend(relationLocks, lockrelid);
3124 
3125 		heaplocktag = (LOCKTAG *) palloc(sizeof(LOCKTAG));
3126 
3127 		/* Save the LOCKTAG for this parent relation for the wait phase */
3128 		SET_LOCKTAG_RELATION(*heaplocktag, lockrelid->dbId, lockrelid->relId);
3129 		lockTags = lappend(lockTags, heaplocktag);
3130 
3131 		MemoryContextSwitchTo(oldcontext);
3132 
3133 		/* Close heap relation */
3134 		table_close(heapRelation, NoLock);
3135 	}
3136 
3137 	/* Get a session-level lock on each table. */
3138 	foreach(lc, relationLocks)
3139 	{
3140 		LockRelId  *lockrelid = (LockRelId *) lfirst(lc);
3141 
3142 		LockRelationIdForSession(lockrelid, ShareUpdateExclusiveLock);
3143 	}
3144 
3145 	PopActiveSnapshot();
3146 	CommitTransactionCommand();
3147 	StartTransactionCommand();
3148 
3149 	/*
3150 	 * Phase 2 of REINDEX CONCURRENTLY
3151 	 *
3152 	 * Build the new indexes in a separate transaction for each index to avoid
3153 	 * having open transactions for an unnecessary long time.  But before
3154 	 * doing that, wait until no running transactions could have the table of
3155 	 * the index open with the old list of indexes.  See "phase 2" in
3156 	 * DefineIndex() for more details.
3157 	 */
3158 
3159 	pgstat_progress_update_param(PROGRESS_CREATEIDX_PHASE,
3160 								 PROGRESS_CREATEIDX_PHASE_WAIT_1);
3161 	WaitForLockersMultiple(lockTags, ShareLock, true);
3162 	CommitTransactionCommand();
3163 
3164 	foreach(lc, newIndexIds)
3165 	{
3166 		Relation	newIndexRel;
3167 		Oid			newIndexId = lfirst_oid(lc);
3168 		Oid			heapId;
3169 		Oid			indexam;
3170 
3171 		/* Start new transaction for this index's concurrent build */
3172 		StartTransactionCommand();
3173 
3174 		/*
3175 		 * Check for user-requested abort.  This is inside a transaction so as
3176 		 * xact.c does not issue a useless WARNING, and ensures that
3177 		 * session-level locks are cleaned up on abort.
3178 		 */
3179 		CHECK_FOR_INTERRUPTS();
3180 
3181 		/* Set ActiveSnapshot since functions in the indexes may need it */
3182 		PushActiveSnapshot(GetTransactionSnapshot());
3183 
3184 		/*
3185 		 * Index relation has been closed by previous commit, so reopen it to
3186 		 * get its information.
3187 		 */
3188 		newIndexRel = index_open(newIndexId, ShareUpdateExclusiveLock);
3189 		heapId = newIndexRel->rd_index->indrelid;
3190 		indexam = newIndexRel->rd_rel->relam;
3191 		index_close(newIndexRel, NoLock);
3192 
3193 		/*
3194 		 * Update progress for the index to build, with the correct parent
3195 		 * table involved.
3196 		 */
3197 		pgstat_progress_start_command(PROGRESS_COMMAND_CREATE_INDEX, heapId);
3198 		progress_vals[0] = PROGRESS_CREATEIDX_COMMAND_REINDEX_CONCURRENTLY;
3199 		progress_vals[1] = PROGRESS_CREATEIDX_PHASE_BUILD;
3200 		progress_vals[2] = newIndexId;
3201 		progress_vals[3] = indexam;
3202 		pgstat_progress_update_multi_param(4, progress_index, progress_vals);
3203 
3204 		/* Perform concurrent build of new index */
3205 		index_concurrently_build(heapId, newIndexId);
3206 
3207 		PopActiveSnapshot();
3208 		CommitTransactionCommand();
3209 	}
3210 	StartTransactionCommand();
3211 
3212 	/*
3213 	 * Phase 3 of REINDEX CONCURRENTLY
3214 	 *
3215 	 * During this phase the old indexes catch up with any new tuples that
3216 	 * were created during the previous phase.  See "phase 3" in DefineIndex()
3217 	 * for more details.
3218 	 */
3219 
3220 	pgstat_progress_update_param(PROGRESS_CREATEIDX_PHASE,
3221 								 PROGRESS_CREATEIDX_PHASE_WAIT_2);
3222 	WaitForLockersMultiple(lockTags, ShareLock, true);
3223 	CommitTransactionCommand();
3224 
3225 	foreach(lc, newIndexIds)
3226 	{
3227 		Oid			newIndexId = lfirst_oid(lc);
3228 		Oid			heapId;
3229 		TransactionId limitXmin;
3230 		Snapshot	snapshot;
3231 		Relation	newIndexRel;
3232 		Oid			indexam;
3233 
3234 		StartTransactionCommand();
3235 
3236 		/*
3237 		 * Check for user-requested abort.  This is inside a transaction so as
3238 		 * xact.c does not issue a useless WARNING, and ensures that
3239 		 * session-level locks are cleaned up on abort.
3240 		 */
3241 		CHECK_FOR_INTERRUPTS();
3242 
3243 		/*
3244 		 * Take the "reference snapshot" that will be used by validate_index()
3245 		 * to filter candidate tuples.
3246 		 */
3247 		snapshot = RegisterSnapshot(GetTransactionSnapshot());
3248 		PushActiveSnapshot(snapshot);
3249 
3250 		/*
3251 		 * Index relation has been closed by previous commit, so reopen it to
3252 		 * get its information.
3253 		 */
3254 		newIndexRel = index_open(newIndexId, ShareUpdateExclusiveLock);
3255 		heapId = newIndexRel->rd_index->indrelid;
3256 		indexam = newIndexRel->rd_rel->relam;
3257 		index_close(newIndexRel, NoLock);
3258 
3259 		/*
3260 		 * Update progress for the index to build, with the correct parent
3261 		 * table involved.
3262 		 */
3263 		pgstat_progress_start_command(PROGRESS_COMMAND_CREATE_INDEX, heapId);
3264 		progress_vals[0] = PROGRESS_CREATEIDX_COMMAND_REINDEX_CONCURRENTLY;
3265 		progress_vals[1] = PROGRESS_CREATEIDX_PHASE_VALIDATE_IDXSCAN;
3266 		progress_vals[2] = newIndexId;
3267 		progress_vals[3] = indexam;
3268 		pgstat_progress_update_multi_param(4, progress_index, progress_vals);
3269 
3270 		validate_index(heapId, newIndexId, snapshot);
3271 
3272 		/*
3273 		 * We can now do away with our active snapshot, we still need to save
3274 		 * the xmin limit to wait for older snapshots.
3275 		 */
3276 		limitXmin = snapshot->xmin;
3277 
3278 		PopActiveSnapshot();
3279 		UnregisterSnapshot(snapshot);
3280 
3281 		/*
3282 		 * To ensure no deadlocks, we must commit and start yet another
3283 		 * transaction, and do our wait before any snapshot has been taken in
3284 		 * it.
3285 		 */
3286 		CommitTransactionCommand();
3287 		StartTransactionCommand();
3288 
3289 		/*
3290 		 * The index is now valid in the sense that it contains all currently
3291 		 * interesting tuples.  But since it might not contain tuples deleted
3292 		 * just before the reference snap was taken, we have to wait out any
3293 		 * transactions that might have older snapshots.
3294 		 */
3295 		pgstat_progress_update_param(PROGRESS_CREATEIDX_PHASE,
3296 									 PROGRESS_CREATEIDX_PHASE_WAIT_3);
3297 		WaitForOlderSnapshots(limitXmin, true);
3298 
3299 		CommitTransactionCommand();
3300 	}
3301 
3302 	/*
3303 	 * Phase 4 of REINDEX CONCURRENTLY
3304 	 *
3305 	 * Now that the new indexes have been validated, swap each new index with
3306 	 * its corresponding old index.
3307 	 *
3308 	 * We mark the new indexes as valid and the old indexes as not valid at
3309 	 * the same time to make sure we only get constraint violations from the
3310 	 * indexes with the correct names.
3311 	 */
3312 
3313 	StartTransactionCommand();
3314 
3315 	forboth(lc, indexIds, lc2, newIndexIds)
3316 	{
3317 		char	   *oldName;
3318 		Oid			oldIndexId = lfirst_oid(lc);
3319 		Oid			newIndexId = lfirst_oid(lc2);
3320 		Oid			heapId;
3321 
3322 		/*
3323 		 * Check for user-requested abort.  This is inside a transaction so as
3324 		 * xact.c does not issue a useless WARNING, and ensures that
3325 		 * session-level locks are cleaned up on abort.
3326 		 */
3327 		CHECK_FOR_INTERRUPTS();
3328 
3329 		heapId = IndexGetRelation(oldIndexId, false);
3330 
3331 		/* Choose a relation name for old index */
3332 		oldName = ChooseRelationName(get_rel_name(oldIndexId),
3333 									 NULL,
3334 									 "ccold",
3335 									 get_rel_namespace(heapId),
3336 									 false);
3337 
3338 		/*
3339 		 * Swap old index with the new one.  This also marks the new one as
3340 		 * valid and the old one as not valid.
3341 		 */
3342 		index_concurrently_swap(newIndexId, oldIndexId, oldName);
3343 
3344 		/*
3345 		 * Invalidate the relcache for the table, so that after this commit
3346 		 * all sessions will refresh any cached plans that might reference the
3347 		 * index.
3348 		 */
3349 		CacheInvalidateRelcacheByRelid(heapId);
3350 
3351 		/*
3352 		 * CCI here so that subsequent iterations see the oldName in the
3353 		 * catalog and can choose a nonconflicting name for their oldName.
3354 		 * Otherwise, this could lead to conflicts if a table has two indexes
3355 		 * whose names are equal for the first NAMEDATALEN-minus-a-few
3356 		 * characters.
3357 		 */
3358 		CommandCounterIncrement();
3359 	}
3360 
3361 	/* Commit this transaction and make index swaps visible */
3362 	CommitTransactionCommand();
3363 	StartTransactionCommand();
3364 
3365 	/*
3366 	 * Phase 5 of REINDEX CONCURRENTLY
3367 	 *
3368 	 * Mark the old indexes as dead.  First we must wait until no running
3369 	 * transaction could be using the index for a query.  See also
3370 	 * index_drop() for more details.
3371 	 */
3372 
3373 	pgstat_progress_update_param(PROGRESS_CREATEIDX_PHASE,
3374 								 PROGRESS_CREATEIDX_PHASE_WAIT_4);
3375 	WaitForLockersMultiple(lockTags, AccessExclusiveLock, true);
3376 
3377 	foreach(lc, indexIds)
3378 	{
3379 		Oid			oldIndexId = lfirst_oid(lc);
3380 		Oid			heapId;
3381 
3382 		/*
3383 		 * Check for user-requested abort.  This is inside a transaction so as
3384 		 * xact.c does not issue a useless WARNING, and ensures that
3385 		 * session-level locks are cleaned up on abort.
3386 		 */
3387 		CHECK_FOR_INTERRUPTS();
3388 
3389 		heapId = IndexGetRelation(oldIndexId, false);
3390 		index_concurrently_set_dead(heapId, oldIndexId);
3391 	}
3392 
3393 	/* Commit this transaction to make the updates visible. */
3394 	CommitTransactionCommand();
3395 	StartTransactionCommand();
3396 
3397 	/*
3398 	 * Phase 6 of REINDEX CONCURRENTLY
3399 	 *
3400 	 * Drop the old indexes.
3401 	 */
3402 
3403 	pgstat_progress_update_param(PROGRESS_CREATEIDX_PHASE,
3404 								 PROGRESS_CREATEIDX_PHASE_WAIT_5);
3405 	WaitForLockersMultiple(lockTags, AccessExclusiveLock, true);
3406 
3407 	PushActiveSnapshot(GetTransactionSnapshot());
3408 
3409 	{
3410 		ObjectAddresses *objects = new_object_addresses();
3411 
3412 		foreach(lc, indexIds)
3413 		{
3414 			Oid			oldIndexId = lfirst_oid(lc);
3415 			ObjectAddress object;
3416 
3417 			object.classId = RelationRelationId;
3418 			object.objectId = oldIndexId;
3419 			object.objectSubId = 0;
3420 
3421 			add_exact_object_address(&object, objects);
3422 		}
3423 
3424 		/*
3425 		 * Use PERFORM_DELETION_CONCURRENT_LOCK so that index_drop() uses the
3426 		 * right lock level.
3427 		 */
3428 		performMultipleDeletions(objects, DROP_RESTRICT,
3429 								 PERFORM_DELETION_CONCURRENT_LOCK | PERFORM_DELETION_INTERNAL);
3430 	}
3431 
3432 	PopActiveSnapshot();
3433 	CommitTransactionCommand();
3434 
3435 	/*
3436 	 * Finally, release the session-level lock on the table.
3437 	 */
3438 	foreach(lc, relationLocks)
3439 	{
3440 		LockRelId  *lockrelid = (LockRelId *) lfirst(lc);
3441 
3442 		UnlockRelationIdForSession(lockrelid, ShareUpdateExclusiveLock);
3443 	}
3444 
3445 	/* Start a new transaction to finish process properly */
3446 	StartTransactionCommand();
3447 
3448 	/* Log what we did */
3449 	if (options & REINDEXOPT_VERBOSE)
3450 	{
3451 		if (relkind == RELKIND_INDEX)
3452 			ereport(INFO,
3453 					(errmsg("index \"%s.%s\" was reindexed",
3454 							relationNamespace, relationName),
3455 					 errdetail("%s.",
3456 							   pg_rusage_show(&ru0))));
3457 		else
3458 		{
3459 			foreach(lc, newIndexIds)
3460 			{
3461 				Oid			indOid = lfirst_oid(lc);
3462 
3463 				ereport(INFO,
3464 						(errmsg("index \"%s.%s\" was reindexed",
3465 								get_namespace_name(get_rel_namespace(indOid)),
3466 								get_rel_name(indOid))));
3467 				/* Don't show rusage here, since it's not per index. */
3468 			}
3469 
3470 			ereport(INFO,
3471 					(errmsg("table \"%s.%s\" was reindexed",
3472 							relationNamespace, relationName),
3473 					 errdetail("%s.",
3474 							   pg_rusage_show(&ru0))));
3475 		}
3476 	}
3477 
3478 	MemoryContextDelete(private_context);
3479 
3480 	pgstat_progress_end_command();
3481 
3482 	return true;
3483 }
3484 
3485 /*
3486  *	ReindexPartitionedIndex
3487  *		Reindex each child of the given partitioned index.
3488  *
3489  * Not yet implemented.
3490  */
3491 static void
3492 ReindexPartitionedIndex(Relation parentIdx)
3493 {
3494 	ereport(ERROR,
3495 			(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3496 			 errmsg("REINDEX is not yet implemented for partitioned indexes")));
3497 }
3498 
3499 /*
3500  * Insert or delete an appropriate pg_inherits tuple to make the given index
3501  * be a partition of the indicated parent index.
3502  *
3503  * This also corrects the pg_depend information for the affected index.
3504  */
3505 void
3506 IndexSetParentIndex(Relation partitionIdx, Oid parentOid)
3507 {
3508 	Relation	pg_inherits;
3509 	ScanKeyData key[2];
3510 	SysScanDesc scan;
3511 	Oid			partRelid = RelationGetRelid(partitionIdx);
3512 	HeapTuple	tuple;
3513 	bool		fix_dependencies;
3514 
3515 	/* Make sure this is an index */
3516 	Assert(partitionIdx->rd_rel->relkind == RELKIND_INDEX ||
3517 		   partitionIdx->rd_rel->relkind == RELKIND_PARTITIONED_INDEX);
3518 
3519 	/*
3520 	 * Scan pg_inherits for rows linking our index to some parent.
3521 	 */
3522 	pg_inherits = relation_open(InheritsRelationId, RowExclusiveLock);
3523 	ScanKeyInit(&key[0],
3524 				Anum_pg_inherits_inhrelid,
3525 				BTEqualStrategyNumber, F_OIDEQ,
3526 				ObjectIdGetDatum(partRelid));
3527 	ScanKeyInit(&key[1],
3528 				Anum_pg_inherits_inhseqno,
3529 				BTEqualStrategyNumber, F_INT4EQ,
3530 				Int32GetDatum(1));
3531 	scan = systable_beginscan(pg_inherits, InheritsRelidSeqnoIndexId, true,
3532 							  NULL, 2, key);
3533 	tuple = systable_getnext(scan);
3534 
3535 	if (!HeapTupleIsValid(tuple))
3536 	{
3537 		if (parentOid == InvalidOid)
3538 		{
3539 			/*
3540 			 * No pg_inherits row, and no parent wanted: nothing to do in this
3541 			 * case.
3542 			 */
3543 			fix_dependencies = false;
3544 		}
3545 		else
3546 		{
3547 			StoreSingleInheritance(partRelid, parentOid, 1);
3548 			fix_dependencies = true;
3549 		}
3550 	}
3551 	else
3552 	{
3553 		Form_pg_inherits inhForm = (Form_pg_inherits) GETSTRUCT(tuple);
3554 
3555 		if (parentOid == InvalidOid)
3556 		{
3557 			/*
3558 			 * There exists a pg_inherits row, which we want to clear; do so.
3559 			 */
3560 			CatalogTupleDelete(pg_inherits, &tuple->t_self);
3561 			fix_dependencies = true;
3562 		}
3563 		else
3564 		{
3565 			/*
3566 			 * A pg_inherits row exists.  If it's the same we want, then we're
3567 			 * good; if it differs, that amounts to a corrupt catalog and
3568 			 * should not happen.
3569 			 */
3570 			if (inhForm->inhparent != parentOid)
3571 			{
3572 				/* unexpected: we should not get called in this case */
3573 				elog(ERROR, "bogus pg_inherit row: inhrelid %u inhparent %u",
3574 					 inhForm->inhrelid, inhForm->inhparent);
3575 			}
3576 
3577 			/* already in the right state */
3578 			fix_dependencies = false;
3579 		}
3580 	}
3581 
3582 	/* done with pg_inherits */
3583 	systable_endscan(scan);
3584 	relation_close(pg_inherits, RowExclusiveLock);
3585 
3586 	/* set relhassubclass if an index partition has been added to the parent */
3587 	if (OidIsValid(parentOid))
3588 		SetRelationHasSubclass(parentOid, true);
3589 
3590 	/* set relispartition correctly on the partition */
3591 	update_relispartition(partRelid, OidIsValid(parentOid));
3592 
3593 	if (fix_dependencies)
3594 	{
3595 		/*
3596 		 * Insert/delete pg_depend rows.  If setting a parent, add PARTITION
3597 		 * dependencies on the parent index and the table; if removing a
3598 		 * parent, delete PARTITION dependencies.
3599 		 */
3600 		if (OidIsValid(parentOid))
3601 		{
3602 			ObjectAddress partIdx;
3603 			ObjectAddress parentIdx;
3604 			ObjectAddress partitionTbl;
3605 
3606 			ObjectAddressSet(partIdx, RelationRelationId, partRelid);
3607 			ObjectAddressSet(parentIdx, RelationRelationId, parentOid);
3608 			ObjectAddressSet(partitionTbl, RelationRelationId,
3609 							 partitionIdx->rd_index->indrelid);
3610 			recordDependencyOn(&partIdx, &parentIdx,
3611 							   DEPENDENCY_PARTITION_PRI);
3612 			recordDependencyOn(&partIdx, &partitionTbl,
3613 							   DEPENDENCY_PARTITION_SEC);
3614 		}
3615 		else
3616 		{
3617 			deleteDependencyRecordsForClass(RelationRelationId, partRelid,
3618 											RelationRelationId,
3619 											DEPENDENCY_PARTITION_PRI);
3620 			deleteDependencyRecordsForClass(RelationRelationId, partRelid,
3621 											RelationRelationId,
3622 											DEPENDENCY_PARTITION_SEC);
3623 		}
3624 
3625 		/* make our updates visible */
3626 		CommandCounterIncrement();
3627 	}
3628 }
3629 
3630 /*
3631  * Subroutine of IndexSetParentIndex to update the relispartition flag of the
3632  * given index to the given value.
3633  */
3634 static void
3635 update_relispartition(Oid relationId, bool newval)
3636 {
3637 	HeapTuple	tup;
3638 	Relation	classRel;
3639 
3640 	classRel = table_open(RelationRelationId, RowExclusiveLock);
3641 	tup = SearchSysCacheCopy1(RELOID, ObjectIdGetDatum(relationId));
3642 	if (!HeapTupleIsValid(tup))
3643 		elog(ERROR, "cache lookup failed for relation %u", relationId);
3644 	Assert(((Form_pg_class) GETSTRUCT(tup))->relispartition != newval);
3645 	((Form_pg_class) GETSTRUCT(tup))->relispartition = newval;
3646 	CatalogTupleUpdate(classRel, &tup->t_self, tup);
3647 	heap_freetuple(tup);
3648 	table_close(classRel, RowExclusiveLock);
3649 }
3650