1 /*
2  * pg_repack: lib/repack.c
3  *
4  * Portions Copyright (c) 2008-2011, NIPPON TELEGRAPH AND TELEPHONE CORPORATION
5  * Portions Copyright (c) 2011, Itagaki Takahiro
6  * Portions Copyright (c) 2012-2020, The Reorg Development Team
7  */
8 
9 #include "postgres.h"
10 
11 #include <unistd.h>
12 
13 #include "access/genam.h"
14 #include "access/transam.h"
15 #include "access/xact.h"
16 #include "catalog/dependency.h"
17 #include "catalog/indexing.h"
18 #include "catalog/namespace.h"
19 
20 /*
21  * heap_open/heap_close was moved to table_open/table_close in 12.0
22  */
23 #if PG_VERSION_NUM >= 120000
24 #include "access/table.h"
25 #endif
26 
27 /*
28  * utils/rel.h no longer includes pg_am.h as of 9.6, so need to include
29  * it explicitly.
30  */
31 #if PG_VERSION_NUM >= 90600
32 #include "catalog/pg_am.h"
33 #endif
34 /*
35  * catalog/pg_foo_fn.h headers was merged back into pg_foo.h headers
36  */
37 #if PG_VERSION_NUM >= 110000
38 #include "catalog/pg_inherits.h"
39 #else
40 #include "catalog/pg_inherits_fn.h"
41 #endif
42 #include "catalog/pg_namespace.h"
43 #include "catalog/pg_opclass.h"
44 #include "catalog/pg_type.h"
45 #include "commands/tablecmds.h"
46 #include "commands/trigger.h"
47 #include "miscadmin.h"
48 #include "storage/lmgr.h"
49 #include "utils/array.h"
50 #include "utils/builtins.h"
51 #include "utils/guc.h"
52 #include "utils/lsyscache.h"
53 #include "utils/rel.h"
54 #include "utils/relcache.h"
55 #include "utils/syscache.h"
56 
57 #include "pgut/pgut-spi.h"
58 #include "pgut/pgut-be.h"
59 
60 #include "access/htup_details.h"
61 
62 /* builtins.h was reorganized for 9.5, so now we need this header */
63 #if PG_VERSION_NUM >= 90500
64 #include "utils/ruleutils.h"
65 #endif
66 
67 PG_MODULE_MAGIC;
68 
69 extern Datum PGUT_EXPORT repack_version(PG_FUNCTION_ARGS);
70 extern Datum PGUT_EXPORT repack_trigger(PG_FUNCTION_ARGS);
71 extern Datum PGUT_EXPORT repack_apply(PG_FUNCTION_ARGS);
72 extern Datum PGUT_EXPORT repack_get_order_by(PG_FUNCTION_ARGS);
73 extern Datum PGUT_EXPORT repack_indexdef(PG_FUNCTION_ARGS);
74 extern Datum PGUT_EXPORT repack_swap(PG_FUNCTION_ARGS);
75 extern Datum PGUT_EXPORT repack_drop(PG_FUNCTION_ARGS);
76 extern Datum PGUT_EXPORT repack_disable_autovacuum(PG_FUNCTION_ARGS);
77 extern Datum PGUT_EXPORT repack_index_swap(PG_FUNCTION_ARGS);
78 extern Datum PGUT_EXPORT repack_get_table_and_inheritors(PG_FUNCTION_ARGS);
79 
80 PG_FUNCTION_INFO_V1(repack_version);
81 PG_FUNCTION_INFO_V1(repack_trigger);
82 PG_FUNCTION_INFO_V1(repack_apply);
83 PG_FUNCTION_INFO_V1(repack_get_order_by);
84 PG_FUNCTION_INFO_V1(repack_indexdef);
85 PG_FUNCTION_INFO_V1(repack_swap);
86 PG_FUNCTION_INFO_V1(repack_drop);
87 PG_FUNCTION_INFO_V1(repack_disable_autovacuum);
88 PG_FUNCTION_INFO_V1(repack_index_swap);
89 PG_FUNCTION_INFO_V1(repack_get_table_and_inheritors);
90 
91 static void	repack_init(void);
92 static SPIPlanPtr repack_prepare(const char *src, int nargs, Oid *argtypes);
93 static const char *get_quoted_relname(Oid oid);
94 static const char *get_quoted_nspname(Oid oid);
95 static void swap_heap_or_index_files(Oid r1, Oid r2);
96 
97 #define copy_tuple(tuple, desc) \
98 	PointerGetDatum(SPI_returntuple((tuple), (desc)))
99 
100 #define IsToken(c) \
101 	(IS_HIGHBIT_SET((c)) || isalnum((unsigned char) (c)) || (c) == '_')
102 
103 /* check access authority */
104 static void
must_be_superuser(const char * func)105 must_be_superuser(const char *func)
106 {
107 	if (!superuser())
108 		elog(ERROR, "must be superuser to use %s function", func);
109 }
110 
111 
112 /* The API of RenameRelationInternal() was changed in 9.2.
113  * Use the RENAME_REL macro for compatibility across versions.
114  */
115 #if PG_VERSION_NUM < 120000
116 #define RENAME_REL(relid, newrelname) RenameRelationInternal(relid, newrelname, true);
117 #else
118 #define RENAME_REL(relid, newrelname) RenameRelationInternal(relid, newrelname, true, false);
119 #endif
120 /*
121  * is_index flag was added in 12.0, prefer separate macro for relation and index
122  */
123 #if PG_VERSION_NUM < 120000
124 #define RENAME_INDEX(relid, newrelname) RENAME_REL(relid, newrelname);
125 #else
126 #define RENAME_INDEX(relid, newrelname) RenameRelationInternal(relid, newrelname, true, true);
127 #endif
128 
129 #ifdef REPACK_VERSION
130 /* macro trick to stringify a macro expansion */
131 #define xstr(s) str(s)
132 #define str(s) #s
133 #define LIBRARY_VERSION xstr(REPACK_VERSION)
134 #else
135 #define LIBRARY_VERSION "unknown"
136 #endif
137 
138 Datum
repack_version(PG_FUNCTION_ARGS)139 repack_version(PG_FUNCTION_ARGS)
140 {
141 	return CStringGetTextDatum("pg_repack " LIBRARY_VERSION);
142 }
143 
144 /**
145  * @fn      Datum repack_trigger(PG_FUNCTION_ARGS)
146  * @brief   Insert a operation log into log-table.
147  *
148  * repack_trigger(sql)
149  *
150  * @param	sql	SQL to insert a operation log into log-table.
151  */
152 Datum
repack_trigger(PG_FUNCTION_ARGS)153 repack_trigger(PG_FUNCTION_ARGS)
154 {
155 	TriggerData	   *trigdata = (TriggerData *) fcinfo->context;
156 	TupleDesc		desc;
157 	HeapTuple		tuple;
158 	Datum			values[2];
159 	bool			nulls[2] = { 0, 0 };
160 	Oid				argtypes[2];
161 	const char	   *sql;
162 
163 	/* authority check */
164 	must_be_superuser("repack_trigger");
165 
166 	/* make sure it's called as a trigger at all */
167 	if (!CALLED_AS_TRIGGER(fcinfo) ||
168 		!TRIGGER_FIRED_AFTER(trigdata->tg_event) ||
169 		!TRIGGER_FIRED_FOR_ROW(trigdata->tg_event) ||
170 		trigdata->tg_trigger->tgnargs != 1)
171 		elog(ERROR, "repack_trigger: invalid trigger call");
172 
173 	/* retrieve parameters */
174 	sql = trigdata->tg_trigger->tgargs[0];
175 	desc = RelationGetDescr(trigdata->tg_relation);
176 	argtypes[0] = argtypes[1] = trigdata->tg_relation->rd_rel->reltype;
177 
178 	/* connect to SPI manager */
179 	repack_init();
180 
181 	if (TRIGGER_FIRED_BY_INSERT(trigdata->tg_event))
182 	{
183 		/* INSERT: (NULL, newtup) */
184 		tuple = trigdata->tg_trigtuple;
185 		nulls[0] = true;
186 		values[1] = copy_tuple(tuple, desc);
187 	}
188 	else if (TRIGGER_FIRED_BY_DELETE(trigdata->tg_event))
189 	{
190 		/* DELETE: (oldtup, NULL) */
191 		tuple = trigdata->tg_trigtuple;
192 		values[0] = copy_tuple(tuple, desc);
193 		nulls[1] = true;
194 	}
195 	else
196 	{
197 		/* UPDATE: (oldtup, newtup) */
198 		tuple = trigdata->tg_newtuple;
199 		values[0] = copy_tuple(trigdata->tg_trigtuple, desc);
200 		values[1] = copy_tuple(tuple, desc);
201 	}
202 
203 	/* INSERT INTO repack.log VALUES ($1, $2) */
204 	execute_with_args(SPI_OK_INSERT, sql, 2, argtypes, values, nulls);
205 
206 	SPI_finish();
207 
208 	PG_RETURN_POINTER(tuple);
209 }
210 
211 /**
212  * @fn      Datum repack_apply(PG_FUNCTION_ARGS)
213  * @brief   Apply operations in log table into temp table.
214  *
215  * repack_apply(sql_peek, sql_insert, sql_delete, sql_update, sql_pop,  count)
216  *
217  * @param	sql_peek	SQL to pop tuple from log table.
218  * @param	sql_insert	SQL to insert into temp table.
219  * @param	sql_delete	SQL to delete from temp table.
220  * @param	sql_update	SQL to update temp table.
221  * @param	sql_pop	SQL to bulk-delete tuples from log table.
222  * @param	count		Max number of operations, or no count iff <=0.
223  * @retval				Number of performed operations.
224  */
225 Datum
repack_apply(PG_FUNCTION_ARGS)226 repack_apply(PG_FUNCTION_ARGS)
227 {
228 #define DEFAULT_PEEK_COUNT	1000
229 
230 	const char *sql_peek = PG_GETARG_CSTRING(0);
231 	const char *sql_insert = PG_GETARG_CSTRING(1);
232 	const char *sql_delete = PG_GETARG_CSTRING(2);
233 	const char *sql_update = PG_GETARG_CSTRING(3);
234 	/* sql_pop, the fourth arg, will be used in the loop below */
235 	int32		count = PG_GETARG_INT32(5);
236 
237 	SPIPlanPtr		plan_peek = NULL;
238 	SPIPlanPtr		plan_insert = NULL;
239 	SPIPlanPtr		plan_delete = NULL;
240 	SPIPlanPtr		plan_update = NULL;
241 	uint32			n, i;
242 	Oid				argtypes_peek[1] = { INT4OID };
243 	Datum			values_peek[1];
244 	const char			nulls_peek[1] = { 0 };
245 	StringInfoData		sql_pop;
246 
247 	initStringInfo(&sql_pop);
248 
249 	/* authority check */
250 	must_be_superuser("repack_apply");
251 
252 	/* connect to SPI manager */
253 	repack_init();
254 
255 	/* peek tuple in log */
256 	plan_peek = repack_prepare(sql_peek, 1, argtypes_peek);
257 
258 	for (n = 0;;)
259 	{
260 		int				ntuples;
261 		SPITupleTable  *tuptable;
262 		TupleDesc		desc;
263 		Oid				argtypes[3];	/* id, pk, row */
264 		Datum			values[3];		/* id, pk, row */
265 		bool			nulls[3];		/* id, pk, row */
266 
267 		/* peek tuple in log */
268 		if (count <= 0)
269 			values_peek[0] = Int32GetDatum(DEFAULT_PEEK_COUNT);
270 		else
271 			values_peek[0] = Int32GetDatum(Min(count - n, DEFAULT_PEEK_COUNT));
272 
273 		execute_plan(SPI_OK_SELECT, plan_peek, values_peek, nulls_peek);
274 		if (SPI_processed <= 0)
275 			break;
276 
277 		/* copy tuptable because we will call other sqls. */
278 		ntuples = SPI_processed;
279 		tuptable = SPI_tuptable;
280 		desc = tuptable->tupdesc;
281 		argtypes[0] = SPI_gettypeid(desc, 1);	/* id */
282 		argtypes[1] = SPI_gettypeid(desc, 2);	/* pk */
283 		argtypes[2] = SPI_gettypeid(desc, 3);	/* row */
284 
285 		resetStringInfo(&sql_pop);
286 		appendStringInfoString(&sql_pop, PG_GETARG_CSTRING(4));
287 
288 		for (i = 0; i < ntuples; i++, n++)
289 		{
290 			HeapTuple	tuple;
291 			char *pkid;
292 
293 			tuple = tuptable->vals[i];
294 			values[0] = SPI_getbinval(tuple, desc, 1, &nulls[0]);
295 			values[1] = SPI_getbinval(tuple, desc, 2, &nulls[1]);
296 			values[2] = SPI_getbinval(tuple, desc, 3, &nulls[2]);
297 
298 			pkid = SPI_getvalue(tuple, desc, 1);
299 			Assert(pkid != NULL);
300 
301 			if (nulls[1])
302 			{
303 				/* INSERT */
304 				if (plan_insert == NULL)
305 					plan_insert = repack_prepare(sql_insert, 1, &argtypes[2]);
306 				execute_plan(SPI_OK_INSERT, plan_insert, &values[2], (nulls[2] ? "n" : " "));
307 			}
308 			else if (nulls[2])
309 			{
310 				/* DELETE */
311 				if (plan_delete == NULL)
312 					plan_delete = repack_prepare(sql_delete, 1, &argtypes[1]);
313 				execute_plan(SPI_OK_DELETE, plan_delete, &values[1], (nulls[1] ? "n" : " "));
314 			}
315 			else
316 			{
317 				/* UPDATE */
318 				if (plan_update == NULL)
319 					plan_update = repack_prepare(sql_update, 2, &argtypes[1]);
320 				execute_plan(SPI_OK_UPDATE, plan_update, &values[1], (nulls[1] ? "n" : " "));
321 			}
322 
323 			/* Add the primary key ID of each row from the log
324 			 * table we have processed so far to this
325 			 * DELETE ... IN (...) query string, so we
326 			 * can delete all the rows we have processed at-once.
327 			 */
328 			if (i == 0)
329 				appendStringInfoString(&sql_pop, pkid);
330 			else
331 				appendStringInfo(&sql_pop, ",%s", pkid);
332 			pfree(pkid);
333 		}
334 		/* i must be > 0 (and hence we must have some rows to delete)
335 		 * since SPI_processed > 0
336 		 */
337 		Assert(i > 0);
338 		appendStringInfoString(&sql_pop, ");");
339 
340 		/* Bulk delete of processed rows from the log table */
341 		execute(SPI_OK_DELETE, sql_pop.data);
342 
343 		SPI_freetuptable(tuptable);
344 	}
345 
346 	SPI_finish();
347 
348 	PG_RETURN_INT32(n);
349 }
350 
351 /*
352  * Parsed CREATE INDEX statement. You can rebuild sql using
353  * sprintf(buf, "%s %s ON %s USING %s (%s)%s",
354  *		create, index, table type, columns, options)
355  */
356 typedef struct IndexDef
357 {
358 	char *create;	/* CREATE INDEX or CREATE UNIQUE INDEX */
359 	char *index;	/* index name including schema */
360 	char *table;	/* table name including schema */
361 	char *type;		/* btree, hash, gist or gin */
362 	char *columns;	/* column definition */
363 	char *options;	/* options after columns, before TABLESPACE (e.g. COLLATE) */
364 	char *tablespace; /* tablespace if specified */
365 	char *where;	/* WHERE content if specified */
366 } IndexDef;
367 
368 static char *
get_relation_name(Oid relid)369 get_relation_name(Oid relid)
370 {
371 	Oid		nsp = get_rel_namespace(relid);
372 	char   *nspname;
373 	char   *strver;
374 	int ver;
375 
376 	/* Get the version of the running server (PG_VERSION_NUM would return
377 	 * the version we compiled the extension with) */
378 	strver = GetConfigOptionByName("server_version_num", NULL
379 #if PG_VERSION_NUM >= 90600
380 		, false	    /* missing_ok */
381 #endif
382 	);
383 
384 	ver = atoi(strver);
385 	pfree(strver);
386 
387 	/*
388 	 * Relation names given by PostgreSQL core are always
389 	 * qualified since some minor releases. Note that this change
390 	 * wasn't introduced in PostgreSQL 9.2 and 9.1 releases.
391 	 */
392 	if ((ver >= 100000 && ver < 100003) ||
393 		(ver >= 90600 && ver < 90608) ||
394 		(ver >= 90500 && ver < 90512) ||
395 		(ver >= 90400 && ver < 90417) ||
396 		(ver >= 90300 && ver < 90322) ||
397 		(ver >= 90200 && ver < 90300) ||
398 		(ver >= 90100 && ver < 90200))
399 	{
400 		/* Qualify the name if not visible in search path */
401 		if (RelationIsVisible(relid))
402 			nspname = NULL;
403 		else
404 			nspname = get_namespace_name(nsp);
405 	}
406 	else
407 	{
408 		/* Always qualify the name */
409 		if (OidIsValid(nsp))
410 			nspname = get_namespace_name(nsp);
411 		else
412 			nspname = NULL;
413 	}
414 
415 	return quote_qualified_identifier(nspname, get_rel_name(relid));
416 }
417 
418 static char *
parse_error(Oid index)419 parse_error(Oid index)
420 {
421 	elog(ERROR, "unexpected index definition: %s", pg_get_indexdef_string(index));
422 	return NULL;
423 }
424 
425 static char *
skip_const(Oid index,char * sql,const char * arg1,const char * arg2)426 skip_const(Oid index, char *sql, const char *arg1, const char *arg2)
427 {
428 	size_t	len;
429 
430 	if ((arg1 && strncmp(sql, arg1, (len = strlen(arg1))) == 0) ||
431 		(arg2 && strncmp(sql, arg2, (len = strlen(arg2))) == 0))
432 	{
433 		sql[len] = '\0';
434 		return sql + len + 1;
435 	}
436 
437 	/* error */
438 	return parse_error(index);
439 }
440 
441 static char *
skip_until_const(Oid index,char * sql,const char * what)442 skip_until_const(Oid index, char *sql, const char *what)
443 {
444 	char *pos;
445 
446 	if ((pos = strstr(sql, what)))
447 	{
448 		size_t	len;
449 
450 		len = strlen(what);
451 		pos[-1] = '\0';
452 		return pos + len + 1;
453 	}
454 
455 	/* error */
456 	return parse_error(index);
457 }
458 
459 static char *
skip_ident(Oid index,char * sql)460 skip_ident(Oid index, char *sql)
461 {
462 	while (*sql && isspace((unsigned char) *sql))
463 		sql++;
464 
465 	if (*sql == '"')
466 	{
467 		sql++;
468 		for (;;)
469 		{
470 			char *end = strchr(sql, '"');
471 			if (end == NULL)
472 				return parse_error(index);
473 			else if (end[1] != '"')
474 			{
475 				end[1] = '\0';
476 				return end + 2;
477 			}
478 			else	/* escaped quote ("") */
479 				sql = end + 2;
480 		}
481 	}
482 	else
483 	{
484 		while (*sql && IsToken(*sql))
485 			sql++;
486 		*sql = '\0';
487 		return sql + 1;
488 	}
489 
490 	/* error */
491 	return parse_error(index);
492 }
493 
494 /*
495  * Skip until 'end' character found. The 'end' character is replaced with \0.
496  * Returns the next character of the 'end', or NULL if 'end' is not found.
497  */
498 static char *
skip_until(Oid index,char * sql,char end)499 skip_until(Oid index, char *sql, char end)
500 {
501 	char	instr = 0;
502 	int		nopen = 0;
503 
504 	for (; *sql && (nopen > 0 || instr != 0 || *sql != end); sql++)
505 	{
506 		if (instr)
507 		{
508 			if (sql[0] == instr)
509 			{
510 				if (sql[1] == instr)
511 					sql++;
512 				else
513 					instr = 0;
514 			}
515 			else if (sql[0] == '\\')
516 				sql++;	/* next char is always string */
517 		}
518 		else
519 		{
520 			switch (sql[0])
521 			{
522 				case '(':
523 					nopen++;
524 					break;
525 				case ')':
526 					nopen--;
527 					break;
528 				case '\'':
529 				case '"':
530 					instr = sql[0];
531 					break;
532 			}
533 		}
534 	}
535 
536 	if (nopen == 0 && instr == 0)
537 	{
538 		if (*sql)
539 		{
540 			*sql = '\0';
541 			return sql + 1;
542 		}
543 		else
544 			return NULL;
545 	}
546 
547 	/* error */
548 	return parse_error(index);
549 }
550 
551 static void
parse_indexdef(IndexDef * stmt,Oid index,Oid table)552 parse_indexdef(IndexDef *stmt, Oid index, Oid table)
553 {
554 	char *sql = pg_get_indexdef_string(index);
555 	const char *idxname = get_quoted_relname(index);
556 	const char *tblname = get_relation_name(table);
557 	const char *limit = strchr(sql, '\0');
558 
559 	/* CREATE [UNIQUE] INDEX */
560 	stmt->create = sql;
561 	sql = skip_const(index, sql, "CREATE INDEX", "CREATE UNIQUE INDEX");
562 	/* index */
563 	stmt->index = sql;
564 	sql = skip_const(index, sql, idxname, NULL);
565 	/* ON */
566 	sql = skip_const(index, sql, "ON", NULL);
567 	/* table */
568 	stmt->table = sql;
569 	sql = skip_const(index, sql, tblname, NULL);
570 	/* USING */
571 	sql = skip_const(index, sql, "USING", NULL);
572 	/* type */
573 	stmt->type = sql;
574 	sql = skip_ident(index, sql);
575 	/* (columns) */
576 	if ((sql = strchr(sql, '(')) == NULL)
577 		parse_error(index);
578 	sql++;
579 	stmt->columns = sql;
580 	if ((sql = skip_until(index, sql, ')')) == NULL)
581 		parse_error(index);
582 
583 	/* options */
584 	stmt->options = sql;
585 	stmt->tablespace = NULL;
586 	stmt->where = NULL;
587 
588 	/* Is there a tablespace? Note that apparently there is never, but
589 	 * if there was one it would appear here. */
590 	if (sql < limit && strstr(sql, "TABLESPACE"))
591 	{
592 		sql = skip_until_const(index, sql, "TABLESPACE");
593 		stmt->tablespace = sql;
594 		sql = skip_ident(index, sql);
595 	}
596 
597 	/* Note: assuming WHERE is the only clause allowed after TABLESPACE */
598 	if (sql < limit && strstr(sql, "WHERE"))
599 	{
600 		sql = skip_until_const(index, sql, "WHERE");
601 		stmt->where = sql;
602 	}
603 
604 	elog(DEBUG2, "indexdef.create  = %s", stmt->create);
605 	elog(DEBUG2, "indexdef.index   = %s", stmt->index);
606 	elog(DEBUG2, "indexdef.table   = %s", stmt->table);
607 	elog(DEBUG2, "indexdef.type    = %s", stmt->type);
608 	elog(DEBUG2, "indexdef.columns = %s", stmt->columns);
609 	elog(DEBUG2, "indexdef.options = %s", stmt->options);
610 	elog(DEBUG2, "indexdef.tspace  = %s", stmt->tablespace);
611 	elog(DEBUG2, "indexdef.where   = %s", stmt->where);
612 }
613 
614 /*
615  * Parse the trailing ... [ COLLATE X ] [ DESC ] [ NULLS { FIRST | LAST } ] from an index
616  * definition column.
617  * Returned values point to token. \0's are inserted to separate parsed parts.
618  */
619 static void
parse_indexdef_col(char * token,char ** desc,char ** nulls,char ** collate)620 parse_indexdef_col(char *token, char **desc, char **nulls, char **collate)
621 {
622 	char *pos;
623 
624 	/* easier to walk backwards than to parse quotes and escapes... */
625 	if (NULL != (pos = strstr(token, " NULLS FIRST")))
626 	{
627 		*nulls = pos + 1;
628 		*pos = '\0';
629 	}
630 	else if (NULL != (pos = strstr(token, " NULLS LAST")))
631 	{
632 		*nulls = pos + 1;
633 		*pos = '\0';
634 	}
635 	if (NULL != (pos = strstr(token, " DESC")))
636 	{
637 		*desc = pos + 1;
638 		*pos = '\0';
639 	}
640 	if (NULL != (pos = strstr(token, " COLLATE ")))
641 	{
642 		*collate = pos + 1;
643 		*pos = '\0';
644 	}
645 }
646 
647 /**
648  * @fn      Datum repack_get_order_by(PG_FUNCTION_ARGS)
649  * @brief   Get key definition of the index.
650  *
651  * repack_get_order_by(index, table)
652  *
653  * @param	index	Oid of target index.
654  * @param	table	Oid of table of the index.
655  * @retval			Create index DDL for temp table.
656  */
657 Datum
repack_get_order_by(PG_FUNCTION_ARGS)658 repack_get_order_by(PG_FUNCTION_ARGS)
659 {
660 	Oid				index = PG_GETARG_OID(0);
661 	Oid				table = PG_GETARG_OID(1);
662 	IndexDef		stmt;
663 	char		   *token;
664 	char		   *next;
665 	StringInfoData	str;
666 	Relation		indexRel = NULL;
667 	int				nattr;
668 
669 	parse_indexdef(&stmt, index, table);
670 
671 	/*
672 	 * FIXME: this is very unreliable implementation but I don't want to
673 	 * re-implement customized versions of pg_get_indexdef_string...
674 	 */
675 
676 	initStringInfo(&str);
677 	for (nattr = 0, next = stmt.columns; next; nattr++)
678 	{
679 		char *opcname;
680 		char *coldesc = NULL;
681 		char *colnulls = NULL;
682 		char *colcollate = NULL;
683 
684 		token = next;
685 		while (isspace((unsigned char) *token))
686 			token++;
687 		next = skip_until(index, next, ',');
688 		parse_indexdef_col(token, &coldesc, &colnulls, &colcollate);
689 		opcname = skip_until(index, token, ' ');
690 		appendStringInfoString(&str, token);
691 		if (colcollate)
692 			appendStringInfo(&str, " %s", colcollate);
693 		if (coldesc)
694 			appendStringInfo(&str, " %s", coldesc);
695 		if (opcname)
696 		{
697 			/* lookup default operator name from operator class */
698 
699 			Oid				opclass;
700 			Oid				oprid;
701 			int16			strategy = BTLessStrategyNumber;
702 			Oid				opcintype;
703 			Oid				opfamily;
704 			HeapTuple		tp;
705 			Form_pg_opclass	opclassTup;
706 
707 			opclass = OpclassnameGetOpcid(BTREE_AM_OID, opcname);
708 
709 			/* Retrieve operator information. */
710 			tp = SearchSysCache(CLAOID, ObjectIdGetDatum(opclass), 0, 0, 0);
711 			if (!HeapTupleIsValid(tp))
712 				elog(ERROR, "cache lookup failed for opclass %u", opclass);
713 			opclassTup = (Form_pg_opclass) GETSTRUCT(tp);
714 			opfamily = opclassTup->opcfamily;
715 			opcintype = opclassTup->opcintype;
716 			ReleaseSysCache(tp);
717 
718 			if (!OidIsValid(opcintype))
719 			{
720 				if (indexRel == NULL)
721 					indexRel = index_open(index, NoLock);
722 
723 #if PG_VERSION_NUM >= 110000
724 				opcintype = TupleDescAttr(RelationGetDescr(indexRel), nattr)->atttypid;
725 #else
726 				opcintype = RelationGetDescr(indexRel)->attrs[nattr]->atttypid;
727 #endif
728 			}
729 
730 			oprid = get_opfamily_member(opfamily, opcintype, opcintype, strategy);
731 			if (!OidIsValid(oprid))
732 				elog(ERROR, "missing operator %d(%u,%u) in opfamily %u",
733 					 strategy, opcintype, opcintype, opfamily);
734 
735 			opcname[-1] = '\0';
736 			appendStringInfo(&str, " USING %s", get_opname(oprid));
737 		}
738 		if (colnulls)
739 			appendStringInfo(&str, " %s", colnulls);
740 		if (next)
741 			appendStringInfoString(&str, ", ");
742 	}
743 
744 	if (indexRel != NULL)
745 		index_close(indexRel, NoLock);
746 
747 	PG_RETURN_TEXT_P(cstring_to_text(str.data));
748 }
749 
750 /**
751  * @fn      Datum repack_indexdef(PG_FUNCTION_ARGS)
752  * @brief   Reproduce DDL that create index at the temp table.
753  *
754  * repack_indexdef(index, table)
755  *
756  * @param	index		Oid of target index.
757  * @param	table		Oid of table of the index.
758  * @param	tablespace	Namespace for the index. If NULL keep the original.
759  * @param   boolean		Whether to use CONCURRENTLY when creating the index.
760  * @retval			Create index DDL for temp table.
761  */
762 Datum
repack_indexdef(PG_FUNCTION_ARGS)763 repack_indexdef(PG_FUNCTION_ARGS)
764 {
765 	Oid				index;
766 	Oid				table;
767 	Name			tablespace = NULL;
768 	IndexDef		stmt;
769 	StringInfoData	str;
770 	bool			concurrent_index = PG_GETARG_BOOL(3);
771 
772 	if (PG_ARGISNULL(0) || PG_ARGISNULL(1))
773 		PG_RETURN_NULL();
774 
775 	index = PG_GETARG_OID(0);
776 	table = PG_GETARG_OID(1);
777 
778 	if (!PG_ARGISNULL(2))
779 		tablespace = PG_GETARG_NAME(2);
780 
781 	parse_indexdef(&stmt, index, table);
782 
783 	initStringInfo(&str);
784 	if (concurrent_index)
785 		appendStringInfo(&str, "%s CONCURRENTLY index_%u ON %s USING %s (%s)%s",
786 			stmt.create, index, stmt.table, stmt.type, stmt.columns, stmt.options);
787 	else
788 		appendStringInfo(&str, "%s index_%u ON repack.table_%u USING %s (%s)%s",
789 			stmt.create, index, table, stmt.type, stmt.columns, stmt.options);
790 
791 	/* specify the new tablespace or the original one if any */
792 	if (tablespace || stmt.tablespace)
793 		appendStringInfo(&str, " TABLESPACE %s",
794 			(tablespace ? NameStr(*tablespace) : stmt.tablespace));
795 
796 	if (stmt.where)
797 		appendStringInfo(&str, " WHERE %s", stmt.where);
798 
799 	PG_RETURN_TEXT_P(cstring_to_text(str.data));
800 }
801 
802 static Oid
getoid(HeapTuple tuple,TupleDesc desc,int column)803 getoid(HeapTuple tuple, TupleDesc desc, int column)
804 {
805 	bool	isnull;
806 	Datum	datum = SPI_getbinval(tuple, desc, column, &isnull);
807 	return isnull ? InvalidOid : DatumGetObjectId(datum);
808 }
809 
810 /**
811  * @fn      Datum repack_swap(PG_FUNCTION_ARGS)
812  * @brief   Swapping relfilenode of tables and relation ids of toast tables
813  *          and toast indexes.
814  *
815  * repack_swap(oid, relname)
816  *
817  * TODO: remove useless CommandCounterIncrement().
818  *
819  * @param	oid		Oid of table of target.
820  * @retval			None.
821  */
822 Datum
repack_swap(PG_FUNCTION_ARGS)823 repack_swap(PG_FUNCTION_ARGS)
824 {
825 	Oid				oid = PG_GETARG_OID(0);
826 	const char	   *relname = get_quoted_relname(oid);
827 	const char	   *nspname = get_quoted_nspname(oid);
828 	Oid 			argtypes[1] = { OIDOID };
829 	bool	 		nulls[1] = { 0 };
830 	Datum	 		values[1];
831 	SPITupleTable  *tuptable;
832 	TupleDesc		desc;
833 	HeapTuple		tuple;
834 	uint32			records;
835 	uint32			i;
836 
837 	Oid				reltoastrelid1;
838 	Oid				reltoastidxid1;
839 	Oid				oid2;
840 	Oid				reltoastrelid2;
841 	Oid				reltoastidxid2;
842 	Oid				owner1;
843 	Oid				owner2;
844 
845 	/* authority check */
846 	must_be_superuser("repack_swap");
847 
848 	/* connect to SPI manager */
849 	repack_init();
850 
851 	/* swap relfilenode and dependencies for tables. */
852 	values[0] = ObjectIdGetDatum(oid);
853 	execute_with_args(SPI_OK_SELECT,
854 		"SELECT X.reltoastrelid, TX.indexrelid, X.relowner,"
855 		"       Y.oid, Y.reltoastrelid, TY.indexrelid, Y.relowner"
856 		"  FROM pg_catalog.pg_class X LEFT JOIN pg_catalog.pg_index TX"
857 		"         ON X.reltoastrelid = TX.indrelid AND TX.indisvalid,"
858 		"       pg_catalog.pg_class Y LEFT JOIN pg_catalog.pg_index TY"
859 		"         ON Y.reltoastrelid = TY.indrelid AND TY.indisvalid"
860 		" WHERE X.oid = $1"
861 		"   AND Y.oid = ('repack.table_' || X.oid)::regclass",
862 		1, argtypes, values, nulls);
863 
864 	tuptable = SPI_tuptable;
865 	desc = tuptable->tupdesc;
866 	records = SPI_processed;
867 
868 	if (records == 0)
869 		elog(ERROR, "repack_swap : no swap target");
870 
871 	tuple = tuptable->vals[0];
872 
873 	reltoastrelid1 = getoid(tuple, desc, 1);
874 	reltoastidxid1 = getoid(tuple, desc, 2);
875 	owner1 = getoid(tuple, desc, 3);
876 	oid2 = getoid(tuple, desc, 4);
877 	reltoastrelid2 = getoid(tuple, desc, 5);
878 	reltoastidxid2 = getoid(tuple, desc, 6);
879 	owner2 = getoid(tuple, desc, 7);
880 
881 	/* change owner of new relation to original owner */
882 	if (owner1 != owner2)
883 	{
884 		ATExecChangeOwner(oid2, owner1, true, AccessExclusiveLock);
885 		CommandCounterIncrement();
886 	}
887 
888 	/* swap tables. */
889 	swap_heap_or_index_files(oid, oid2);
890 	CommandCounterIncrement();
891 
892 	/* swap indexes. */
893 	values[0] = ObjectIdGetDatum(oid);
894 	execute_with_args(SPI_OK_SELECT,
895 		"SELECT X.oid, Y.oid"
896 		"  FROM pg_catalog.pg_index I,"
897 		"       pg_catalog.pg_class X,"
898 		"       pg_catalog.pg_class Y"
899 		" WHERE I.indrelid = $1"
900 		"   AND I.indexrelid = X.oid"
901 		"   AND I.indisvalid"
902 		"   AND Y.oid = ('repack.index_' || X.oid)::regclass",
903 		1, argtypes, values, nulls);
904 
905 	tuptable = SPI_tuptable;
906 	desc = tuptable->tupdesc;
907 	records = SPI_processed;
908 
909 	for (i = 0; i < records; i++)
910 	{
911 		Oid		idx1, idx2;
912 
913 		tuple = tuptable->vals[i];
914 		idx1 = getoid(tuple, desc, 1);
915 		idx2 = getoid(tuple, desc, 2);
916 		swap_heap_or_index_files(idx1, idx2);
917 
918 		CommandCounterIncrement();
919 	}
920 
921 	/* swap names for toast tables and toast indexes */
922 	if (reltoastrelid1 == InvalidOid)
923 	{
924 		if (reltoastidxid1 != InvalidOid ||
925 			reltoastrelid2 != InvalidOid ||
926 			reltoastidxid2 != InvalidOid)
927 			elog(ERROR, "repack_swap : unexpected toast relations (T1=%u, I1=%u, T2=%u, I2=%u",
928 				reltoastrelid1, reltoastidxid1, reltoastrelid2, reltoastidxid2);
929 		/* do nothing */
930 	}
931 	else if (reltoastrelid2 == InvalidOid)
932 	{
933 		char	name[NAMEDATALEN];
934 
935 		if (reltoastidxid1 == InvalidOid ||
936 			reltoastidxid2 != InvalidOid)
937 			elog(ERROR, "repack_swap : unexpected toast relations (T1=%u, I1=%u, T2=%u, I2=%u",
938 				reltoastrelid1, reltoastidxid1, reltoastrelid2, reltoastidxid2);
939 
940 		/* rename X to Y */
941 		snprintf(name, NAMEDATALEN, "pg_toast_%u", oid2);
942 		RENAME_REL(reltoastrelid1, name);
943 		snprintf(name, NAMEDATALEN, "pg_toast_%u_index", oid2);
944 		RENAME_INDEX(reltoastidxid1, name);
945 		CommandCounterIncrement();
946 	}
947 	else if (reltoastrelid1 != InvalidOid)
948 	{
949 		char	name[NAMEDATALEN];
950 		int		pid = getpid();
951 
952 		/* rename X to TEMP */
953 		snprintf(name, NAMEDATALEN, "pg_toast_pid%d", pid);
954 		RENAME_REL(reltoastrelid1, name);
955 		snprintf(name, NAMEDATALEN, "pg_toast_pid%d_index", pid);
956 		RENAME_INDEX(reltoastidxid1, name);
957 		CommandCounterIncrement();
958 
959 		/* rename Y to X */
960 		snprintf(name, NAMEDATALEN, "pg_toast_%u", oid);
961 		RENAME_REL(reltoastrelid2, name);
962 		snprintf(name, NAMEDATALEN, "pg_toast_%u_index", oid);
963 		RENAME_INDEX(reltoastidxid2, name);
964 		CommandCounterIncrement();
965 
966 		/* rename TEMP to Y */
967 		snprintf(name, NAMEDATALEN, "pg_toast_%u", oid2);
968 		RENAME_REL(reltoastrelid1, name);
969 		snprintf(name, NAMEDATALEN, "pg_toast_%u_index", oid2);
970 		RENAME_INDEX(reltoastidxid1, name);
971 		CommandCounterIncrement();
972 	}
973 
974 	/* drop repack trigger */
975 	execute_with_format(
976 		SPI_OK_UTILITY,
977 		"DROP TRIGGER IF EXISTS repack_trigger ON %s.%s CASCADE",
978 		nspname, relname);
979 
980 	SPI_finish();
981 
982 	PG_RETURN_VOID();
983 }
984 
985 /**
986  * @fn      Datum repack_drop(PG_FUNCTION_ARGS)
987  * @brief   Delete temporarily objects.
988  *
989  * repack_drop(oid, relname)
990  *
991  * @param	oid		Oid of target table.
992  * @retval			None.
993  */
994 Datum
repack_drop(PG_FUNCTION_ARGS)995 repack_drop(PG_FUNCTION_ARGS)
996 {
997 	Oid			oid = PG_GETARG_OID(0);
998 	int			numobj = PG_GETARG_INT32(1);
999 	const char *relname = get_quoted_relname(oid);
1000 	const char *nspname = get_quoted_nspname(oid);
1001 
1002 	if (!(relname && nspname))
1003 	{
1004 		elog(ERROR, "table name not found for OID %u", oid);
1005 		PG_RETURN_VOID();
1006 	}
1007 
1008 	/* authority check */
1009 	must_be_superuser("repack_drop");
1010 
1011 	/* connect to SPI manager */
1012 	repack_init();
1013 
1014 	/*
1015 	 * To prevent concurrent lockers of the repack target table from causing
1016 	 * deadlocks, take an exclusive lock on it. Consider that the following
1017 	 * commands take exclusive lock on tables log_xxx and the target table
1018 	 * itself when deleting the repack_trigger on it, while concurrent
1019 	 * updaters require row exclusive lock on the target table and in
1020 	 * addition, on the log_xxx table, because of the trigger.
1021 	 *
1022 	 * Consider how a deadlock could occur - if the DROP TABLE repack.log_%u
1023 	 * gets a lock on log_%u table before a concurrent updater could get it
1024 	 * but after the updater has obtained a lock on the target table, the
1025 	 * subsequent DROP TRIGGER ... ON target-table would report a deadlock as
1026 	 * it finds itself waiting for a lock on target-table held by the updater,
1027 	 * which in turn, is waiting for lock on log_%u table.
1028 	 *
1029 	 * Fixes deadlock mentioned in the Github issue #55.
1030 	 *
1031 	 * Skip the lock if we are not going to do anything.
1032 	 * Otherwise, if repack gets accidentally run twice for the same table
1033 	 * at the same time, the second repack, in order to perform
1034 	 * a pointless cleanup, has to wait until the first one completes.
1035 	 * This adds an ACCESS EXCLUSIVE lock request into the queue
1036 	 * making the table effectively inaccessible for any other backend.
1037 	 */
1038 	if (numobj > 0)
1039 	{
1040 		execute_with_format(
1041 			SPI_OK_UTILITY,
1042 			"LOCK TABLE %s.%s IN ACCESS EXCLUSIVE MODE",
1043 			nspname, relname);
1044 	}
1045 
1046 	/* drop log table: must be done before dropping the pk type,
1047 	 * since the log table is dependent on the pk type. (That's
1048 	 * why we check numobj > 1 here.)
1049 	 */
1050 	if (numobj > 1)
1051 	{
1052 		execute_with_format(
1053 			SPI_OK_UTILITY,
1054 			"DROP TABLE IF EXISTS repack.log_%u CASCADE",
1055 			oid);
1056 		--numobj;
1057 	}
1058 
1059 	/* drop type for pk type */
1060 	if (numobj > 0)
1061 	{
1062 		execute_with_format(
1063 			SPI_OK_UTILITY,
1064 			"DROP TYPE IF EXISTS repack.pk_%u",
1065 			oid);
1066 		--numobj;
1067 	}
1068 
1069 	/*
1070 	 * drop repack trigger: We have already dropped the trigger in normal
1071 	 * cases, but it can be left on error.
1072 	 */
1073 	if (numobj > 0)
1074 	{
1075 		execute_with_format(
1076 			SPI_OK_UTILITY,
1077 			"DROP TRIGGER IF EXISTS repack_trigger ON %s.%s CASCADE",
1078 			nspname, relname);
1079 		--numobj;
1080 	}
1081 
1082 	/* drop temp table */
1083 	if (numobj > 0)
1084 	{
1085 		execute_with_format(
1086 			SPI_OK_UTILITY,
1087 			"DROP TABLE IF EXISTS repack.table_%u CASCADE",
1088 			oid);
1089 		--numobj;
1090 	}
1091 
1092 	SPI_finish();
1093 
1094 	PG_RETURN_VOID();
1095 }
1096 
1097 Datum
repack_disable_autovacuum(PG_FUNCTION_ARGS)1098 repack_disable_autovacuum(PG_FUNCTION_ARGS)
1099 {
1100 	Oid			oid = PG_GETARG_OID(0);
1101 
1102 	/* connect to SPI manager */
1103 	repack_init();
1104 
1105 	execute_with_format(
1106 		SPI_OK_UTILITY,
1107 		"ALTER TABLE %s SET (autovacuum_enabled = off)",
1108 		get_relation_name(oid));
1109 
1110 	SPI_finish();
1111 
1112 	PG_RETURN_VOID();
1113 }
1114 
1115 /* init SPI */
1116 static void
repack_init(void)1117 repack_init(void)
1118 {
1119 	int		ret = SPI_connect();
1120 	if (ret != SPI_OK_CONNECT)
1121 		elog(ERROR, "pg_repack: SPI_connect returned %d", ret);
1122 }
1123 
1124 /* prepare plan */
1125 static SPIPlanPtr
repack_prepare(const char * src,int nargs,Oid * argtypes)1126 repack_prepare(const char *src, int nargs, Oid *argtypes)
1127 {
1128 	SPIPlanPtr	plan = SPI_prepare(src, nargs, argtypes);
1129 	if (plan == NULL)
1130 		elog(ERROR, "pg_repack: repack_prepare failed (code=%d, query=%s)", SPI_result, src);
1131 	return plan;
1132 }
1133 
1134 static const char *
get_quoted_relname(Oid oid)1135 get_quoted_relname(Oid oid)
1136 {
1137 	const char *relname = get_rel_name(oid);
1138 	return (relname ? quote_identifier(relname) : NULL);
1139 }
1140 
1141 static const char *
get_quoted_nspname(Oid oid)1142 get_quoted_nspname(Oid oid)
1143 {
1144 	const char *nspname = get_namespace_name(get_rel_namespace(oid));
1145 	return (nspname ? quote_identifier(nspname) : NULL);
1146 }
1147 
1148 /*
1149  * This is a copy of swap_relation_files in cluster.c, but it also swaps
1150  * relfrozenxid.
1151  */
1152 static void
swap_heap_or_index_files(Oid r1,Oid r2)1153 swap_heap_or_index_files(Oid r1, Oid r2)
1154 {
1155 	Relation	relRelation;
1156 	HeapTuple	reltup1,
1157 				reltup2;
1158 	Form_pg_class relform1,
1159 				relform2;
1160 	Oid			swaptemp;
1161 	CatalogIndexState indstate;
1162 
1163 	/* We need writable copies of both pg_class tuples. */
1164 #if PG_VERSION_NUM >= 120000
1165 	relRelation = table_open(RelationRelationId, RowExclusiveLock);
1166 #else
1167 	relRelation = heap_open(RelationRelationId, RowExclusiveLock);
1168 #endif
1169 
1170 	reltup1 = SearchSysCacheCopy1(RELOID, ObjectIdGetDatum(r1));
1171 	if (!HeapTupleIsValid(reltup1))
1172 		elog(ERROR, "cache lookup failed for relation %u", r1);
1173 	relform1 = (Form_pg_class) GETSTRUCT(reltup1);
1174 
1175 	reltup2 = SearchSysCacheCopy1(RELOID, ObjectIdGetDatum(r2));
1176 	if (!HeapTupleIsValid(reltup2))
1177 		elog(ERROR, "cache lookup failed for relation %u", r2);
1178 	relform2 = (Form_pg_class) GETSTRUCT(reltup2);
1179 
1180 	Assert(relform1->relkind == relform2->relkind);
1181 
1182 	/*
1183 	 * Actually swap the fields in the two tuples
1184 	 */
1185 	swaptemp = relform1->relfilenode;
1186 	relform1->relfilenode = relform2->relfilenode;
1187 	relform2->relfilenode = swaptemp;
1188 
1189 	swaptemp = relform1->reltablespace;
1190 	relform1->reltablespace = relform2->reltablespace;
1191 	relform2->reltablespace = swaptemp;
1192 
1193 	swaptemp = relform1->reltoastrelid;
1194 	relform1->reltoastrelid = relform2->reltoastrelid;
1195 	relform2->reltoastrelid = swaptemp;
1196 
1197 	/* set rel1's frozen Xid to larger one */
1198 	if (TransactionIdIsNormal(relform1->relfrozenxid))
1199 	{
1200 		if (TransactionIdFollows(relform1->relfrozenxid,
1201 								 relform2->relfrozenxid))
1202 			relform1->relfrozenxid = relform2->relfrozenxid;
1203 		else
1204 			relform2->relfrozenxid = relform1->relfrozenxid;
1205 	}
1206 
1207 	/* swap size statistics too, since new rel has freshly-updated stats */
1208 	{
1209 		int32		swap_pages;
1210 		float4		swap_tuples;
1211 
1212 		swap_pages = relform1->relpages;
1213 		relform1->relpages = relform2->relpages;
1214 		relform2->relpages = swap_pages;
1215 
1216 		swap_tuples = relform1->reltuples;
1217 		relform1->reltuples = relform2->reltuples;
1218 		relform2->reltuples = swap_tuples;
1219 	}
1220 
1221 	indstate = CatalogOpenIndexes(relRelation);
1222 
1223 #if PG_VERSION_NUM < 100000
1224 
1225 	/* Update the tuples in pg_class */
1226 	simple_heap_update(relRelation, &reltup1->t_self, reltup1);
1227 	simple_heap_update(relRelation, &reltup2->t_self, reltup2);
1228 
1229 	/* Keep system catalogs current */
1230 	CatalogIndexInsert(indstate, reltup1);
1231 	CatalogIndexInsert(indstate, reltup2);
1232 
1233 #else
1234 
1235 	CatalogTupleUpdateWithInfo(relRelation, &reltup1->t_self, reltup1, indstate);
1236 	CatalogTupleUpdateWithInfo(relRelation, &reltup2->t_self, reltup2, indstate);
1237 
1238 #endif
1239 
1240 	CatalogCloseIndexes(indstate);
1241 
1242 	/*
1243 	 * If we have toast tables associated with the relations being swapped,
1244 	 * change their dependency links to re-associate them with their new
1245 	 * owning relations.  Otherwise the wrong one will get dropped ...
1246 	 *
1247 	 * NOTE: it is possible that only one table has a toast table; this can
1248 	 * happen in CLUSTER if there were dropped columns in the old table, and
1249 	 * in ALTER TABLE when adding or changing type of columns.
1250 	 *
1251 	 * NOTE: at present, a TOAST table's only dependency is the one on its
1252 	 * owning table.  If more are ever created, we'd need to use something
1253 	 * more selective than deleteDependencyRecordsFor() to get rid of only the
1254 	 * link we want.
1255 	 */
1256 	if (relform1->reltoastrelid || relform2->reltoastrelid)
1257 	{
1258 		ObjectAddress baseobject,
1259 					toastobject;
1260 		long		count;
1261 
1262 		/* Delete old dependencies */
1263 		if (relform1->reltoastrelid)
1264 		{
1265 			count = deleteDependencyRecordsFor(RelationRelationId,
1266 											   relform1->reltoastrelid,
1267 											   false);
1268 			if (count != 1)
1269 				elog(ERROR, "expected one dependency record for TOAST table, found %ld",
1270 					 count);
1271 		}
1272 		if (relform2->reltoastrelid)
1273 		{
1274 			count = deleteDependencyRecordsFor(RelationRelationId,
1275 											   relform2->reltoastrelid,
1276 											   false);
1277 			if (count != 1)
1278 				elog(ERROR, "expected one dependency record for TOAST table, found %ld",
1279 					 count);
1280 		}
1281 
1282 		/* Register new dependencies */
1283 		baseobject.classId = RelationRelationId;
1284 		baseobject.objectSubId = 0;
1285 		toastobject.classId = RelationRelationId;
1286 		toastobject.objectSubId = 0;
1287 
1288 		if (relform1->reltoastrelid)
1289 		{
1290 			baseobject.objectId = r1;
1291 			toastobject.objectId = relform1->reltoastrelid;
1292 			recordDependencyOn(&toastobject, &baseobject, DEPENDENCY_INTERNAL);
1293 		}
1294 
1295 		if (relform2->reltoastrelid)
1296 		{
1297 			baseobject.objectId = r2;
1298 			toastobject.objectId = relform2->reltoastrelid;
1299 			recordDependencyOn(&toastobject, &baseobject, DEPENDENCY_INTERNAL);
1300 		}
1301 	}
1302 
1303 	/*
1304 	 * Blow away the old relcache entries now.	We need this kluge because
1305 	 * relcache.c keeps a link to the smgr relation for the physical file, and
1306 	 * that will be out of date as soon as we do CommandCounterIncrement.
1307 	 * Whichever of the rels is the second to be cleared during cache
1308 	 * invalidation will have a dangling reference to an already-deleted smgr
1309 	 * relation.  Rather than trying to avoid this by ordering operations just
1310 	 * so, it's easiest to not have the relcache entries there at all.
1311 	 * (Fortunately, since one of the entries is local in our transaction,
1312 	 * it's sufficient to clear out our own relcache this way; the problem
1313 	 * cannot arise for other backends when they see our update on the
1314 	 * non-local relation.)
1315 	 */
1316 	RelationForgetRelation(r1);
1317 	RelationForgetRelation(r2);
1318 
1319 	/* Clean up. */
1320 	heap_freetuple(reltup1);
1321 	heap_freetuple(reltup2);
1322 
1323 #if PG_VERSION_NUM >= 120000
1324 	table_close(relRelation, RowExclusiveLock);
1325 #else
1326 	heap_close(relRelation, RowExclusiveLock);
1327 #endif
1328 }
1329 
1330 /**
1331  * @fn      Datum repack_index_swap(PG_FUNCTION_ARGS)
1332  * @brief   Swap out an original index on a table with the newly-created one.
1333  *
1334  * repack_index_swap(index)
1335  *
1336  * @param	index	Oid of the *original* index.
1337  * @retval	void
1338  */
1339 Datum
repack_index_swap(PG_FUNCTION_ARGS)1340 repack_index_swap(PG_FUNCTION_ARGS)
1341 {
1342 	Oid                orig_idx_oid = PG_GETARG_OID(0);
1343 	Oid                repacked_idx_oid;
1344 	StringInfoData     str;
1345 	SPITupleTable      *tuptable;
1346 	TupleDesc          desc;
1347 	HeapTuple          tuple;
1348 
1349 	/* authority check */
1350 	must_be_superuser("repack_index_swap");
1351 
1352 	/* connect to SPI manager */
1353 	repack_init();
1354 
1355 	initStringInfo(&str);
1356 
1357 	/* Find the OID of our new index. */
1358 	appendStringInfo(&str, "SELECT oid FROM pg_class "
1359 					 "WHERE relname = 'index_%u' AND relkind = 'i'",
1360 					 orig_idx_oid);
1361 	execute(SPI_OK_SELECT, str.data);
1362 	if (SPI_processed != 1)
1363 		elog(ERROR, "Could not find index 'index_%u', found " UINT64_FORMAT " matches",
1364 			 orig_idx_oid, (uint64) SPI_processed);
1365 
1366 	tuptable = SPI_tuptable;
1367 	desc = tuptable->tupdesc;
1368 	tuple = tuptable->vals[0];
1369 	repacked_idx_oid = getoid(tuple, desc, 1);
1370 	swap_heap_or_index_files(orig_idx_oid, repacked_idx_oid);
1371 	SPI_finish();
1372 	PG_RETURN_VOID();
1373 }
1374 
1375 /**
1376  * @fn      Datum get_table_and_inheritors(PG_FUNCTION_ARGS)
1377  * @brief   Return array containing Oids of parent table and its children.
1378  *          Note that this function does not release relation locks.
1379  *
1380  * get_table_and_inheritors(table)
1381  *
1382  * @param	table	parent table.
1383  * @retval	regclass[]
1384  */
1385 Datum
repack_get_table_and_inheritors(PG_FUNCTION_ARGS)1386 repack_get_table_and_inheritors(PG_FUNCTION_ARGS)
1387 {
1388 	Oid			parent = PG_GETARG_OID(0);
1389 	List	   *relations;
1390 	Datum	   *relations_array;
1391 	int			relations_array_size;
1392 	ArrayType  *result;
1393 	ListCell   *lc;
1394 	int			i;
1395 
1396 	LockRelationOid(parent, AccessShareLock);
1397 
1398 	/* Check that parent table exists */
1399 	if (!SearchSysCacheExists1(RELOID, ObjectIdGetDatum(parent)))
1400 		PG_RETURN_ARRAYTYPE_P(construct_empty_array(OIDOID));
1401 
1402 	/* Also check that children exist */
1403 	relations = find_all_inheritors(parent, AccessShareLock, NULL);
1404 
1405 	relations_array_size = list_length(relations);
1406 	if (relations_array_size == 0)
1407 		PG_RETURN_ARRAYTYPE_P(construct_empty_array(OIDOID));
1408 
1409 	relations_array = palloc(relations_array_size * sizeof(Datum));
1410 
1411 	i = 0;
1412 	foreach (lc, relations)
1413 		relations_array[i++] = ObjectIdGetDatum(lfirst_oid(lc));
1414 
1415 	result = construct_array(relations_array,
1416 							 relations_array_size,
1417 							 OIDOID, sizeof(Oid),
1418 							 true, 'i');
1419 
1420 	pfree(relations_array);
1421 
1422 	PG_RETURN_ARRAYTYPE_P(result);
1423 }
1424