1 /*
2  * pg_repack.c: bin/pg_repack.c
3  *
4  * Portions Copyright (c) 2008-2011, NIPPON TELEGRAPH AND TELEPHONE CORPORATION
5  * Portions Copyright (c) 2011, Itagaki Takahiro
6  * Portions Copyright (c) 2012-2020, The Reorg Development Team
7  */
8 
9 /**
10  * @brief Client Modules
11  */
12 
13 const char *PROGRAM_URL		= "https://reorg.github.io/pg_repack/";
14 const char *PROGRAM_ISSUES	= "https://github.com/reorg/pg_repack/issues";
15 
16 #ifdef REPACK_VERSION
17 /* macro trick to stringify a macro expansion */
18 #define xstr(s) str(s)
19 #define str(s) #s
20 const char *PROGRAM_VERSION = xstr(REPACK_VERSION);
21 #else
22 const char *PROGRAM_VERSION = "unknown";
23 #endif
24 
25 #include "pgut/pgut-fe.h"
26 
27 #include <errno.h>
28 #include <string.h>
29 #include <stdlib.h>
30 #include <unistd.h>
31 #include <time.h>
32 
33 
34 #ifdef HAVE_POLL_H
35 #include <poll.h>
36 #endif
37 #ifdef HAVE_SYS_POLL_H
38 #include <sys/poll.h>
39 #endif
40 #ifdef HAVE_SYS_SELECT_H
41 #include <sys/select.h>
42 #endif
43 
44 
45 /*
46  * APPLY_COUNT: Number of applied logs per transaction. Larger values
47  * could be faster, but will be long transactions in the REDO phase.
48  */
49 #define APPLY_COUNT		1000
50 
51 /* Once we get down to seeing fewer than this many tuples in the
52  * log table, we'll say that we're ready to perform the switch.
53  */
54 #define MIN_TUPLES_BEFORE_SWITCH	20
55 
56 /* poll() or select() timeout, in seconds */
57 #define POLL_TIMEOUT    3
58 
59 /* Compile an array of existing transactions which are active during
60  * pg_repack's setup. Some transactions we can safely ignore:
61  *  a. The '1/1, -1/0' lock skipped is from the bgwriter on newly promoted
62  *     servers. See https://github.com/reorg/pg_reorg/issues/1
63  *  b. Our own database connections
64  *  c. Other pg_repack clients, as distinguished by application_name, which
65  *     may be operating on other tables at the same time. See
66  *     https://github.com/reorg/pg_repack/issues/1
67  *  d. open transactions/locks existing on other databases than the actual
68  *     processing relation (except for locks on shared objects)
69  *  e. VACUUMs which are always executed outside transaction blocks.
70  *
71  * Note, there is some redundancy in how the filtering is done (e.g. excluding
72  * based on pg_backend_pid() and application_name), but that shouldn't hurt
73  * anything. Also, the test of application_name is not bulletproof -- for
74  * instance, the application name when running installcheck will be
75  * pg_regress.
76  */
77 #define SQL_XID_SNAPSHOT_90200 \
78 	"SELECT coalesce(array_agg(l.virtualtransaction), '{}') " \
79 	"  FROM pg_locks AS l " \
80 	"  LEFT JOIN pg_stat_activity AS a " \
81 	"    ON l.pid = a.pid " \
82 	"  LEFT JOIN pg_database AS d " \
83 	"    ON a.datid = d.oid " \
84 	"  WHERE l.locktype = 'virtualxid' " \
85 	"  AND l.pid NOT IN (pg_backend_pid(), $1) " \
86 	"  AND (l.virtualxid, l.virtualtransaction) <> ('1/1', '-1/0') " \
87 	"  AND (a.application_name IS NULL OR a.application_name <> $2)" \
88 	"  AND a.query !~* E'^\\\\s*vacuum\\\\s+' " \
89 	"  AND a.query !~ E'^autovacuum: ' " \
90 	"  AND ((d.datname IS NULL OR d.datname = current_database()) OR l.database = 0)"
91 
92 #define SQL_XID_SNAPSHOT_90000 \
93 	"SELECT coalesce(array_agg(l.virtualtransaction), '{}') " \
94 	"  FROM pg_locks AS l " \
95 	"  LEFT JOIN pg_stat_activity AS a " \
96 	"    ON l.pid = a.procpid " \
97 	"  LEFT JOIN pg_database AS d " \
98 	"    ON a.datid = d.oid " \
99 	"  WHERE l.locktype = 'virtualxid' " \
100 	"  AND l.pid NOT IN (pg_backend_pid(), $1) " \
101 	"  AND (l.virtualxid, l.virtualtransaction) <> ('1/1', '-1/0') " \
102 	"  AND (a.application_name IS NULL OR a.application_name <> $2)" \
103 	"  AND a.current_query !~* E'^\\\\s*vacuum\\\\s+' " \
104 	"  AND a.current_query !~ E'^autovacuum: ' " \
105 	"  AND ((d.datname IS NULL OR d.datname = current_database()) OR l.database = 0)"
106 
107 /* application_name is not available before 9.0. The last clause of
108  * the WHERE clause is just to eat the $2 parameter (application name).
109  */
110 #define SQL_XID_SNAPSHOT_80300 \
111 	"SELECT coalesce(array_agg(l.virtualtransaction), '{}') " \
112 	"  FROM pg_locks AS l" \
113 	"  LEFT JOIN pg_stat_activity AS a " \
114 	"    ON l.pid = a.procpid " \
115 	"  LEFT JOIN pg_database AS d " \
116 	"    ON a.datid = d.oid " \
117 	" WHERE l.locktype = 'virtualxid' AND l.pid NOT IN (pg_backend_pid(), $1)" \
118 	" AND (l.virtualxid, l.virtualtransaction) <> ('1/1', '-1/0') " \
119 	" AND a.current_query !~* E'^\\\\s*vacuum\\\\s+' " \
120 	" AND a.current_query !~ E'^autovacuum: ' " \
121 	" AND ((d.datname IS NULL OR d.datname = current_database()) OR l.database = 0)" \
122 	" AND ($2::text IS NOT NULL)"
123 
124 #define SQL_XID_SNAPSHOT \
125 	(PQserverVersion(connection) >= 90200 ? SQL_XID_SNAPSHOT_90200 : \
126 	 (PQserverVersion(connection) >= 90000 ? SQL_XID_SNAPSHOT_90000 : \
127 	  SQL_XID_SNAPSHOT_80300))
128 
129 
130 /* Later, check whether any of the transactions we saw before are still
131  * alive, and wait for them to go away.
132  */
133 #define SQL_XID_ALIVE \
134 	"SELECT pid FROM pg_locks WHERE locktype = 'virtualxid'"\
135 	" AND pid <> pg_backend_pid() AND virtualtransaction = ANY($1)"
136 
137 /* To be run while our main connection holds an AccessExclusive lock on the
138  * target table, and our secondary conn is attempting to grab an AccessShare
139  * lock. We know that "granted" must be false for these queries because
140  * we already hold the AccessExclusive lock. Also, we only care about other
141  * transactions trying to grab an ACCESS EXCLUSIVE lock, because we are only
142  * trying to kill off disallowed DDL commands, e.g. ALTER TABLE or TRUNCATE.
143  */
144 #define CANCEL_COMPETING_LOCKS \
145 	"SELECT pg_cancel_backend(pid) FROM pg_locks WHERE locktype = 'relation'"\
146 	" AND granted = false AND relation = %u"\
147 	" AND mode = 'AccessExclusiveLock' AND pid <> pg_backend_pid()"
148 
149 #define KILL_COMPETING_LOCKS \
150 	"SELECT pg_terminate_backend(pid) "\
151 	"FROM pg_locks WHERE locktype = 'relation'"\
152 	" AND granted = false AND relation = %u"\
153 	" AND mode = 'AccessExclusiveLock' AND pid <> pg_backend_pid()"
154 
155 #define COUNT_COMPETING_LOCKS \
156 	"SELECT pid FROM pg_locks WHERE locktype = 'relation'" \
157 	" AND granted = false AND relation = %u" \
158 	" AND mode = 'AccessExclusiveLock' AND pid <> pg_backend_pid()"
159 
160 /* Will be used as a unique prefix for advisory locks. */
161 #define REPACK_LOCK_PREFIX_STR "16185446"
162 
/* Lifecycle of one index rebuild when builds are distributed across
 * worker connections (see rebuild_indexes()). */
typedef enum
{
	UNPROCESSED,	/* not yet handed to any worker */
	INPROGRESS,		/* a worker connection is currently building it */
	FINISHED		/* build completed */
} index_status_t;
169 
/*
 * per-index information, used to track (possibly parallel) index
 * rebuilds on the temp table.
 */
typedef struct repack_index
{
	Oid				target_oid;		/* target: OID of the original index */
	const char	   *create_index;	/* CREATE INDEX statement to rebuild it */
	index_status_t  status; 		/* Track parallel build statuses. */
	int             worker_idx;		/* which worker conn is handling this build */
} repack_index;
180 
/*
 * per-table information.  Most fields are pre-generated SQL statements
 * fetched from the repack.tables view by repack_one_database().
 */
typedef struct repack_table
{
	const char	   *target_name;	/* target: relname */
	Oid				target_oid;		/* target: OID */
	Oid				target_toast;	/* target: toast OID */
	Oid				target_tidx;	/* target: toast index OID */
	Oid				pkid;			/* target: PK OID (InvalidOid = not repackable) */
	Oid				ckid;			/* target: CK OID */
	const char	   *create_pktype;	/* CREATE TYPE pk */
	const char	   *create_log;		/* CREATE TABLE log */
	const char	   *create_trigger;	/* CREATE TRIGGER repack_trigger */
	const char	   *enable_trigger;	/* ALTER TABLE ENABLE ALWAYS TRIGGER repack_trigger */
	const char	   *create_table;	/* CREATE TABLE table AS SELECT WITH NO DATA*/
	const char	   *copy_data;		/* INSERT INTO (ORDER BY is appended later) */
	const char	   *alter_col_storage;	/* ALTER TABLE ALTER COLUMN SET STORAGE */
	const char	   *drop_columns;	/* ALTER TABLE DROP COLUMNs */
	const char	   *delete_log;		/* DELETE FROM log */
	const char	   *lock_table;		/* LOCK TABLE table */
	const char	   *sql_peek;		/* SQL used in flush */
	const char	   *sql_insert;		/* SQL used in flush */
	const char	   *sql_delete;		/* SQL used in flush */
	const char	   *sql_update;		/* SQL used in flush */
	const char	   *sql_pop;		/* SQL used in flush */
	int             n_indexes;      /* number of indexes */
	repack_index   *indexes;        /* info on each index */
} repack_table;
210 
211 
212 static bool is_superuser(void);
213 static void check_tablespace(void);
214 static bool preliminary_checks(char *errbuf, size_t errsize);
215 static bool is_requested_relation_exists(char *errbuf, size_t errsize);
216 static void repack_all_databases(const char *order_by);
217 static bool repack_one_database(const char *order_by, char *errbuf, size_t errsize);
218 static void repack_one_table(repack_table *table, const char *order_by);
219 static bool repack_table_indexes(PGresult *index_details);
220 static bool repack_all_indexes(char *errbuf, size_t errsize);
221 static void repack_cleanup(bool fatal, const repack_table *table);
222 static void repack_cleanup_callback(bool fatal, void *userdata);
223 static bool rebuild_indexes(const repack_table *table);
224 
225 static char *getstr(PGresult *res, int row, int col);
226 static Oid getoid(PGresult *res, int row, int col);
227 static bool advisory_lock(PGconn *conn, const char *relid);
228 static bool lock_exclusive(PGconn *conn, const char *relid, const char *lock_query, bool start_xact);
229 static bool kill_ddl(PGconn *conn, Oid relid, bool terminate);
230 static bool lock_access_share(PGconn *conn, Oid relid, const char *target_name);
231 
/* SQLSTATE codes tested explicitly via sqlstate_equals(). */
#define SQLSTATE_INVALID_SCHEMA_NAME	"3F000"
#define SQLSTATE_UNDEFINED_FUNCTION		"42883"
#define SQLSTATE_QUERY_CANCELED			"57014"
235 
sqlstate_equals(PGresult * res,const char * state)236 static bool sqlstate_equals(PGresult *res, const char *state)
237 {
238 	return strcmp(PQresultErrorField(res, PG_DIAG_SQLSTATE), state) == 0;
239 }
240 
/* Command-line option state, filled in by pgut_getopt() through the
 * options[] table below. */
static bool				analyze = true;		/* run ANALYZE afterwards; cleared by --no-analyze */
static bool				alldb = false;		/* --all: repack every database */
static bool				noorder = false;	/* --no-order: VACUUM FULL mode (empty ORDER BY) */
static SimpleStringList	parent_table_list = {NULL, NULL};	/* --parent-table (-I) */
static SimpleStringList	table_list = {NULL, NULL};			/* --table (-t) */
static SimpleStringList	schema_list = {NULL, NULL};			/* --schema (-c) */
static char				*orderby = NULL;	/* --order-by (-o) clause text */
static char				*tablespace = NULL;	/* --tablespace (-s) destination */
static bool				moveidx = false;	/* --moveidx (-S): move indexes too */
static SimpleStringList	r_index = {NULL, NULL};	/* --index (-i): individual indexes */
static bool				only_indexes = false;	/* --only-indexes (-x) */
static int				wait_timeout = 60;	/* in seconds */
static int				jobs = 0;	/* number of concurrent worker conns. */
static bool				dryrun = false;		/* --dry-run (-N) */
static unsigned int		temp_obj_num = 0; /* temporary objects counter */
static bool				no_kill_backend = false; /* abandon when timed-out */
static bool				no_superuser_check = false;	/* --no-superuser-check (-k) */
static SimpleStringList	exclude_extension_list = {NULL, NULL}; /* don't repack tables of these extensions */
259 
/*
 * Render an unsigned int as decimal text into caller-supplied storage
 * and return the buffer for call-site convenience.  The buffer must
 * hold at least 11 bytes (10 digits for UINT_MAX plus the NUL).
 */
static char *
utoa(unsigned int value, char *buffer)
{
	char	digits[11];
	int		n = 0;
	char   *out = buffer;

	/* collect digits least-significant first ... */
	do
	{
		digits[n++] = (char) ('0' + value % 10);
		value /= 10;
	} while (value > 0);

	/* ... then copy them out in display order */
	while (n > 0)
		*out++ = digits[--n];
	*out = '\0';

	return buffer;
}
268 
/* Option descriptors consumed by pgut_getopt(): {type code, short
 * letter, long name, destination}.  Type codes are defined by pgut-fe:
 * 'b' boolean, 'i' integer, 's' string, 'l' string list; 'B' appears to
 * be a negated boolean ('Z' clears the 'analyze' default of true) --
 * confirm in pgut/pgut-fe.h. */
static pgut_option options[] =
{
	{ 'b', 'a', "all", &alldb },
	{ 'l', 't', "table", &table_list },
	{ 'l', 'I', "parent-table", &parent_table_list },
	{ 'l', 'c', "schema", &schema_list },
	{ 'b', 'n', "no-order", &noorder },
	{ 'b', 'N', "dry-run", &dryrun },
	{ 's', 'o', "order-by", &orderby },
	{ 's', 's', "tablespace", &tablespace },
	{ 'b', 'S', "moveidx", &moveidx },
	{ 'l', 'i', "index", &r_index },
	{ 'b', 'x', "only-indexes", &only_indexes },
	{ 'i', 'T', "wait-timeout", &wait_timeout },
	{ 'B', 'Z', "no-analyze", &analyze },
	{ 'i', 'j', "jobs", &jobs },
	{ 'b', 'D', "no-kill-backend", &no_kill_backend },
	{ 'b', 'k', "no-superuser-check", &no_superuser_check },
	{ 'l', 'C', "exclude-extension", &exclude_extension_list },
	{ 0 },
};
290 
/*
 * Entry point: parse options, validate option combinations, then
 * dispatch to index-only repacking, all-databases repacking, or
 * single-database repacking.
 */
int
main(int argc, char *argv[])
{
	int						i;
	char						errbuf[256];

	/* parse options; i is the index of the first non-option argument */
	i = pgut_getopt(argc, argv, options);

	/* at most one positional argument: the database name */
	if (i == argc - 1)
		dbname = argv[i];
	else if (i < argc)
		ereport(ERROR,
			(errcode(EINVAL),
			 errmsg("too many arguments")));

	/* validates --tablespace/--moveidx; raises ERROR on bad input */
	check_tablespace();

	if (dryrun)
		elog(INFO, "Dry run enabled, not executing repack");

	if (r_index.head || only_indexes)
	{
		/* Index repack mode: reject incompatible option combinations.
		 * NOTE(review): the first two checks are independent 'if's while
		 * the rest chain with 'else if'; order determines which error
		 * wins when several conflicting options are given. */
		if (r_index.head && table_list.head)
			ereport(ERROR, (errcode(EINVAL),
				errmsg("cannot specify --index (-i) and --table (-t)")));
		if (r_index.head && parent_table_list.head)
			ereport(ERROR, (errcode(EINVAL),
				errmsg("cannot specify --index (-i) and --parent-table (-I)")));
		else if (r_index.head && only_indexes)
			ereport(ERROR, (errcode(EINVAL),
				errmsg("cannot specify --index (-i) and --only-indexes (-x)")));
		else if (r_index.head && exclude_extension_list.head)
			ereport(ERROR, (errcode(EINVAL),
				errmsg("cannot specify --index (-i) and --exclude-extension (-C)")));
		else if (only_indexes && !(table_list.head || parent_table_list.head))
			ereport(ERROR, (errcode(EINVAL),
				errmsg("cannot repack all indexes of database, specify the table(s)"
					   "via --table (-t) or --parent-table (-I)")));
		else if (only_indexes && exclude_extension_list.head)
			ereport(ERROR, (errcode(EINVAL),
				errmsg("cannot specify --only-indexes (-x) and --exclude-extension (-C)")));
		else if (alldb)
			ereport(ERROR, (errcode(EINVAL),
				errmsg("cannot repack specific index(es) in all databases")));
		else
		{
			/* Warn about options that have no effect in this mode.
			 * NOTE(review): the else-if chain reports at most one warning
			 * per run even when several such options were given. */
			if (orderby)
				ereport(WARNING, (errcode(EINVAL),
					errmsg("option -o (--order-by) has no effect while repacking indexes")));
			else if (noorder)
				ereport(WARNING, (errcode(EINVAL),
					errmsg("option -n (--no-order) has no effect while repacking indexes")));
			else if (!analyze)
				ereport(WARNING, (errcode(EINVAL),
					errmsg("ANALYZE is not performed after repacking indexes, -z (--no-analyze) has no effect")));
			else if (jobs)
				ereport(WARNING, (errcode(EINVAL),
					errmsg("option -j (--jobs) has no effect, repacking indexes does not use parallel jobs")));
			if (!repack_all_indexes(errbuf, sizeof(errbuf)))
				ereport(ERROR,
					(errcode(ERROR), errmsg("%s", errbuf)));
		}
	}
	else
	{
		/* table repack mode */
		if (schema_list.head && (table_list.head || parent_table_list.head))
			ereport(ERROR,
				(errcode(EINVAL),
				 errmsg("cannot repack specific table(s) in schema, use schema.table notation instead")));

		if (exclude_extension_list.head && table_list.head)
			ereport(ERROR,
				(errcode(EINVAL),
				 errmsg("cannot specify --table (-t) and --exclude-extension (-C)")));

		if (exclude_extension_list.head && parent_table_list.head)
			ereport(ERROR,
				(errcode(EINVAL),
				 errmsg("cannot specify --parent-table (-I) and --exclude-extension (-C)")));

		/* an empty order-by string selects VACUUM FULL mode downstream */
		if (noorder)
			orderby = "";

		if (alldb)
		{
			if (table_list.head || parent_table_list.head)
				ereport(ERROR,
					(errcode(EINVAL),
					 errmsg("cannot repack specific table(s) in all databases")));
			if (schema_list.head)
				ereport(ERROR,
					(errcode(EINVAL),
					 errmsg("cannot repack specific schema(s) in all databases")));
			repack_all_databases(orderby);
		}
		else
		{
			if (!repack_one_database(orderby, errbuf, sizeof(errbuf)))
				ereport(ERROR,
					(errcode(ERROR), errmsg("%s failed with error: %s", PROGRAM_NAME, errbuf)));
		}
	}

	return 0;
}
396 
397 
398 /*
399  * Test if the current user is a database superuser.
400  * Borrowed from psql/common.c
401  *
402  * Note: this will correctly detect superuserness only with a protocol-3.0
403  * or newer backend; otherwise it will always say "false".
404  */
405 bool
is_superuser(void)406 is_superuser(void)
407 {
408 	const char *val;
409 
410 	if (no_superuser_check)
411 		return true;
412 
413 	if (!connection)
414 		return false;
415 
416 	val = PQparameterStatus(connection, "is_superuser");
417 
418 	if (val && strcmp(val, "on") == 0)
419 		return true;
420 
421 	return false;
422 }
423 
424 /*
425  * Check if the tablespace requested exists.
426  *
427  * Raise an exception on error.
428  */
429 void
check_tablespace()430 check_tablespace()
431 {
432 	PGresult		*res = NULL;
433 	const char *params[1];
434 
435 	if (tablespace == NULL)
436 	{
437 		/* nothing to check, but let's see the options */
438 		if (moveidx)
439 		{
440 			ereport(ERROR,
441 				(errcode(EINVAL),
442 				 errmsg("cannot specify --moveidx (-S) without --tablespace (-s)")));
443 		}
444 		return;
445 	}
446 
447 	/* check if the tablespace exists */
448 	reconnect(ERROR);
449 	params[0] = tablespace;
450 	res = execute_elevel(
451 		"select spcname from pg_tablespace where spcname = $1",
452 		1, params, DEBUG2);
453 
454 	if (PQresultStatus(res) == PGRES_TUPLES_OK)
455 	{
456 		if (PQntuples(res) == 0)
457 		{
458 			ereport(ERROR,
459 				(errcode(EINVAL),
460 				 errmsg("the tablespace \"%s\" doesn't exist", tablespace)));
461 		}
462 	}
463 	else
464 	{
465 		ereport(ERROR,
466 			(errcode(EINVAL),
467 			 errmsg("error checking the namespace: %s",
468 				 PQerrorMessage(connection))));
469 	}
470 
471 	CLEARPGRES(res);
472 }
473 
/*
 * Perform sanity checks before beginning work. Make sure pg_repack is
 * installed in the database, the user is a superuser, etc.
 *
 * Returns true when it is safe to proceed.  On failure writes a
 * human-readable reason into errbuf (when non-NULL) and returns false
 * so the caller can skip this database.
 */
static bool
preliminary_checks(char *errbuf, size_t errsize){
	bool			ret = false;
	PGresult		*res = NULL;

	if (!is_superuser()) {
		if (errbuf)
			snprintf(errbuf, errsize, "You must be a superuser to use %s",
					 PROGRAM_NAME);
		goto cleanup;
	}

	/* Query the extension version. Exit if no match */
	res = execute_elevel("select repack.version(), repack.version_sql()",
		0, NULL, DEBUG2);
	if (PQresultStatus(res) == PGRES_TUPLES_OK)
	{
		const char	   *libver;
		char			buf[64];

		/* the string is something like "pg_repack 1.1.7" */
		snprintf(buf, sizeof(buf), "%s %s", PROGRAM_NAME, PROGRAM_VERSION);

		/* check the version of the C library */
		libver = getstr(res, 0, 0);
		if (0 != strcmp(buf, libver))
		{
			if (errbuf)
				snprintf(errbuf, errsize,
					"program '%s' does not match database library '%s'",
					buf, libver);
			goto cleanup;
		}

		/* check the version of the SQL extension */
		libver = getstr(res, 0, 1);
		if (0 != strcmp(buf, libver))
		{
			if (errbuf)
				snprintf(errbuf, errsize,
					"extension '%s' required, found extension '%s'",
					buf, libver);
			goto cleanup;
		}
	}
	else
	{
		if (sqlstate_equals(res, SQLSTATE_INVALID_SCHEMA_NAME)
			|| sqlstate_equals(res, SQLSTATE_UNDEFINED_FUNCTION))
		{
			/* Schema repack does not exist, or version too old (version
			 * functions not found). Skip the database.
			 */
			if (errbuf)
				snprintf(errbuf, errsize,
					"%s %s is not installed in the database",
					PROGRAM_NAME, PROGRAM_VERSION);
		}
		else
		{
			/* Return the error message otherwise */
			if (errbuf)
				snprintf(errbuf, errsize, "%s", PQerrorMessage(connection));
		}
		goto cleanup;
	}
	/* freed here and again (harmlessly) at cleanup: */
	CLEARPGRES(res);

	/* Disable statement timeout. */
	command("SET statement_timeout = 0", 0, NULL);

	/* Restrict search_path to system catalog. */
	command("SET search_path = pg_catalog, pg_temp, public", 0, NULL);

	/* To avoid annoying "create implicit ..." messages. */
	command("SET client_min_messages = warning", 0, NULL);

	ret = true;

cleanup:
	CLEARPGRES(res);
	return ret;
}
561 
/*
 * Check the presence of tables specified by --parent-table and --table
 * otherwise format user-friendly message
 *
 * Returns true when every requested relation exists (or when the check
 * cannot be performed: nothing requested, or server too old for a
 * usable to_regclass(text)); returns false and fills errbuf otherwise.
 */
static bool
is_requested_relation_exists(char *errbuf, size_t errsize){
	bool			ret = false;
	PGresult		*res = NULL;
	const char	    **params = NULL;
	int				iparam = 0;
	StringInfoData	sql;
	int				num_relations;
	SimpleStringListCell   *cell;

	num_relations = simple_string_list_size(parent_table_list) +
					simple_string_list_size(table_list);

	/* nothing was implicitly requested, so nothing to do here */
	if (num_relations == 0)
		return true;

	/* has no suitable to_regclass(text) */
	if (PQserverVersion(connection)<90600)
		return true;

	params = pgut_malloc(num_relations * sizeof(char *));
	initStringInfo(&sql);
	/* build: SELECT r FROM (VALUES ($1),($2),...) of all requested names */
	appendStringInfoString(&sql, "SELECT r FROM (VALUES ");

	for (cell = table_list.head; cell; cell = cell->next)
	{
		appendStringInfo(&sql, "($%d)", iparam + 1);
		params[iparam++] = cell->val;
		if (iparam < num_relations)
			appendStringInfoChar(&sql, ',');
	}
	for (cell = parent_table_list.head; cell; cell = cell->next)
	{
		appendStringInfo(&sql, "($%d)", iparam + 1);
		params[iparam++] = cell->val;
		if (iparam < num_relations)
			appendStringInfoChar(&sql, ',');
	}
	/* keep only the names that do not resolve to a repackable table */
	appendStringInfoString(&sql,
		") AS given_t(r)"
		" WHERE NOT EXISTS("
		"  SELECT FROM repack.tables WHERE relid=to_regclass(given_t.r) )"
	);

	/* double check the parameters array is sane */
	if (iparam != num_relations)
	{
		if (errbuf)
			snprintf(errbuf, errsize,
				"internal error: bad parameters count: %i instead of %i",
				 iparam, num_relations);
		goto cleanup;
	}

	res = execute_elevel(sql.data, iparam, params, DEBUG2);
	if (PQresultStatus(res) == PGRES_TUPLES_OK)
	{
		int 	num;

		num = PQntuples(res);

		if (num != 0)
		{
			int i;
			StringInfoData	rel_names;
			initStringInfo(&rel_names);

			/* comma-separated, double-quoted list of the missing names */
			for (i = 0; i < num; i++)
			{
				appendStringInfo(&rel_names, "\"%s\"", getstr(res, i, 0));
				if ((i + 1) != num)
					appendStringInfoString(&rel_names, ", ");
			}

			if (errbuf)
			{
				if (num > 1)
					snprintf(errbuf, errsize,
							"relations do not exist: %s", rel_names.data);
				else
					/* single-relation wording mimics the server's own error */
					snprintf(errbuf, errsize,
							"ERROR:  relation %s does not exist", rel_names.data);
			}
			termStringInfo(&rel_names);
		}
		else
			ret = true;
	}
	else
	{
		if (errbuf)
			snprintf(errbuf, errsize, "%s", PQerrorMessage(connection));
	}
	/* cleared here and again (harmlessly) below */
	CLEARPGRES(res);

cleanup:
	CLEARPGRES(res);
	termStringInfo(&sql);
	free(params);
	return ret;
}
668 
669 /*
670  * Call repack_one_database for each database.
671  */
672 static void
repack_all_databases(const char * orderby)673 repack_all_databases(const char *orderby)
674 {
675 	PGresult   *result;
676 	int			i;
677 
678 	dbname = "postgres";
679 	reconnect(ERROR);
680 
681 	if (!is_superuser())
682 		elog(ERROR, "You must be a superuser to use %s", PROGRAM_NAME);
683 
684 	result = execute("SELECT datname FROM pg_database WHERE datallowconn ORDER BY 1;", 0, NULL);
685 	disconnect();
686 
687 	for (i = 0; i < PQntuples(result); i++)
688 	{
689 		bool	ret;
690 		char	errbuf[256];
691 
692 		dbname = PQgetvalue(result, i, 0);
693 
694 		elog(INFO, "repacking database \"%s\"", dbname);
695 		if (!dryrun)
696 		{
697 			ret = repack_one_database(orderby, errbuf, sizeof(errbuf));
698 			if (!ret)
699 				elog(INFO, "database \"%s\" skipped: %s", dbname, errbuf);
700 		}
701 	}
702 
703 	CLEARPGRES(result);
704 }
705 
706 /* result is not copied */
707 static char *
getstr(PGresult * res,int row,int col)708 getstr(PGresult *res, int row, int col)
709 {
710 	if (PQgetisnull(res, row, col))
711 		return NULL;
712 	else
713 		return PQgetvalue(res, row, col);
714 }
715 
716 static Oid
getoid(PGresult * res,int row,int col)717 getoid(PGresult *res, int row, int col)
718 {
719 	if (PQgetisnull(res, row, col))
720 		return InvalidOid;
721 	else
722 		return (Oid)strtoul(PQgetvalue(res, row, col), NULL, 10);
723 }
724 
725 /*
726  * Call repack_one_table for the target tables or each table in a database.
727  */
728 static bool
repack_one_database(const char * orderby,char * errbuf,size_t errsize)729 repack_one_database(const char *orderby, char *errbuf, size_t errsize)
730 {
731 	bool					ret = false;
732 	PGresult			   *res = NULL;
733 	int						i;
734 	int						num;
735 	StringInfoData			sql;
736 	SimpleStringListCell   *cell;
737 	const char			  **params = NULL;
738 	int						iparam = 0;
739 	size_t					num_parent_tables,
740 							num_tables,
741 							num_schemas,
742 							num_params,
743 							num_excluded_extensions;
744 
745 	num_parent_tables = simple_string_list_size(parent_table_list);
746 	num_tables = simple_string_list_size(table_list);
747 	num_schemas = simple_string_list_size(schema_list);
748 	num_excluded_extensions = simple_string_list_size(exclude_extension_list);
749 
750 	/* 1st param is the user-specified tablespace */
751 	num_params = num_excluded_extensions +
752 				 num_parent_tables +
753 				 num_tables +
754 				 num_schemas + 1;
755 	params = pgut_malloc(num_params * sizeof(char *));
756 
757 	initStringInfo(&sql);
758 
759 	reconnect(ERROR);
760 
761 	/* No sense in setting up concurrent workers if --jobs=1 */
762 	if (jobs > 1)
763 		setup_workers(jobs);
764 
765 	if (!preliminary_checks(errbuf, errsize))
766 		goto cleanup;
767 
768 	if (!is_requested_relation_exists(errbuf, errsize))
769 		goto cleanup;
770 
771 	/* acquire target tables */
772 	appendStringInfoString(&sql,
773 		"SELECT t.*,"
774 		" coalesce(v.tablespace, t.tablespace_orig) as tablespace_dest"
775 		" FROM repack.tables t, "
776 		" (VALUES (quote_ident($1::text))) as v (tablespace)"
777 		" WHERE ");
778 
779 	params[iparam++] = tablespace;
780 	if (num_tables || num_parent_tables)
781 	{
782 		/* standalone tables */
783 		if (num_tables)
784 		{
785 			appendStringInfoString(&sql, "(");
786 			for (cell = table_list.head; cell; cell = cell->next)
787 			{
788 				/* Construct table name placeholders to be used by PQexecParams */
789 				appendStringInfo(&sql, "relid = $%d::regclass", iparam + 1);
790 				params[iparam++] = cell->val;
791 				if (cell->next)
792 					appendStringInfoString(&sql, " OR ");
793 			}
794 			appendStringInfoString(&sql, ")");
795 		}
796 
797 		if (num_tables && num_parent_tables)
798 			appendStringInfoString(&sql, " OR ");
799 
800 		/* parent tables + inherited children */
801 		if (num_parent_tables)
802 		{
803 			appendStringInfoString(&sql, "(");
804 			for (cell = parent_table_list.head; cell; cell = cell->next)
805 			{
806 				/* Construct table name placeholders to be used by PQexecParams */
807 				appendStringInfo(&sql,
808 								 "relid = ANY(repack.get_table_and_inheritors($%d::regclass))",
809 								 iparam + 1);
810 				params[iparam++] = cell->val;
811 				if (cell->next)
812 					appendStringInfoString(&sql, " OR ");
813 			}
814 			appendStringInfoString(&sql, ")");
815 		}
816 	}
817 	else if (num_schemas)
818 	{
819 		appendStringInfoString(&sql, "schemaname IN (");
820 		for (cell = schema_list.head; cell; cell = cell->next)
821 		{
822 			/* Construct schema name placeholders to be used by PQexecParams */
823 			appendStringInfo(&sql, "$%d", iparam + 1);
824 			params[iparam++] = cell->val;
825 			if (cell->next)
826 				appendStringInfoString(&sql, ", ");
827 		}
828 		appendStringInfoString(&sql, ")");
829 	}
830 	else
831 	{
832 		appendStringInfoString(&sql, "pkid IS NOT NULL");
833 	}
834 
835 	/* Exclude tables which belong to extensions */
836 	if (exclude_extension_list.head)
837 	{
838 		appendStringInfoString(&sql, " AND t.relid NOT IN"
839 									 "  (SELECT d.objid::regclass"
840 									 "   FROM pg_depend d JOIN pg_extension e"
841 									 "   ON d.refobjid = e.oid"
842 									 "   WHERE d.classid = 'pg_class'::regclass AND (");
843 
844 		/* List all excluded extensions */
845 		for (cell = exclude_extension_list.head; cell; cell = cell->next)
846 		{
847 			appendStringInfo(&sql, "e.extname = $%d", iparam + 1);
848 			params[iparam++] = cell->val;
849 
850 			appendStringInfoString(&sql, cell->next ? " OR " : ")");
851 		}
852 
853 		/* Close subquery */
854 		appendStringInfoString(&sql, ")");
855 	}
856 
857 	/* Ensure the regression tests get a consistent ordering of tables */
858 	appendStringInfoString(&sql, " ORDER BY t.relname, t.schemaname");
859 
860 	/* double check the parameters array is sane */
861 	if (iparam != num_params)
862 	{
863 		if (errbuf)
864 			snprintf(errbuf, errsize,
865 				"internal error: bad parameters count: %i instead of %zi",
866 				 iparam, num_params);
867 		goto cleanup;
868 	}
869 
870 	res = execute_elevel(sql.data, (int) num_params, params, DEBUG2);
871 
872 	/* on error skip the database */
873 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
874 	{
875 		/* Return the error message otherwise */
876 		if (errbuf)
877 			snprintf(errbuf, errsize, "%s", PQerrorMessage(connection));
878 		goto cleanup;
879 	}
880 
881 	num = PQntuples(res);
882 
883 	for (i = 0; i < num; i++)
884 	{
885 		repack_table	table;
886 		StringInfoData	copy_sql;
887 		const char *create_table_1;
888 		const char *create_table_2;
889 		const char *tablespace;
890 		const char *ckey;
891 		int			c = 0;
892 
893 		table.target_name = getstr(res, i, c++);
894 		table.target_oid = getoid(res, i, c++);
895 		table.target_toast = getoid(res, i, c++);
896 		table.target_tidx = getoid(res, i, c++);
897 		c++; // Skip schemaname
898 		table.pkid = getoid(res, i, c++);
899 		table.ckid = getoid(res, i, c++);
900 
901 		if (table.pkid == 0) {
902 			ereport(WARNING,
903 					(errcode(E_PG_COMMAND),
904 					 errmsg("relation \"%s\" must have a primary key or not-null unique keys", table.target_name)));
905 			continue;
906 		}
907 
908 		table.create_pktype = getstr(res, i, c++);
909 		table.create_log = getstr(res, i, c++);
910 		table.create_trigger = getstr(res, i, c++);
911 		table.enable_trigger = getstr(res, i, c++);
912 
913 		create_table_1 = getstr(res, i, c++);
914 		tablespace = getstr(res, i, c++);	/* to be clobbered */
915 		create_table_2 = getstr(res, i, c++);
916 		table.copy_data = getstr(res, i , c++);
917 		table.alter_col_storage = getstr(res, i, c++);
918 		table.drop_columns = getstr(res, i, c++);
919 		table.delete_log = getstr(res, i, c++);
920 		table.lock_table = getstr(res, i, c++);
921 		ckey = getstr(res, i, c++);
922 		table.sql_peek = getstr(res, i, c++);
923 		table.sql_insert = getstr(res, i, c++);
924 		table.sql_delete = getstr(res, i, c++);
925 		table.sql_update = getstr(res, i, c++);
926 		table.sql_pop = getstr(res, i, c++);
927 		tablespace = getstr(res, i, c++);
928 
929 		/* Craft CREATE TABLE SQL */
930 		resetStringInfo(&sql);
931 		appendStringInfoString(&sql, create_table_1);
932 		appendStringInfoString(&sql, tablespace);
933 		appendStringInfoString(&sql, create_table_2);
934 
935 		/* Always append WITH NO DATA to CREATE TABLE SQL*/
936 		appendStringInfoString(&sql, " WITH NO DATA");
937 		table.create_table = sql.data;
938 
939 		/* Craft Copy SQL */
940 		initStringInfo(&copy_sql);
941 		appendStringInfoString(&copy_sql, table.copy_data);
942 		if (!orderby)
943 
944 		{
945 			if (ckey != NULL)
946 			{
947 				/* CLUSTER mode */
948 				appendStringInfoString(&copy_sql, " ORDER BY ");
949 				appendStringInfoString(&copy_sql, ckey);
950 			}
951 
952 			/* else, VACUUM FULL mode (non-clustered tables) */
953 		}
954 		else if (!orderby[0])
955 		{
956 			/* VACUUM FULL mode (for clustered tables too), do nothing */
957 		}
958 		else
959 		{
960 			/* User specified ORDER BY */
961 			appendStringInfoString(&copy_sql, " ORDER BY ");
962 			appendStringInfoString(&copy_sql, orderby);
963 		}
964 		table.copy_data = copy_sql.data;
965 
966 		repack_one_table(&table, orderby);
967 	}
968 	ret = true;
969 
970 cleanup:
971 	CLEARPGRES(res);
972 	disconnect();
973 	termStringInfo(&sql);
974 	free(params);
975 	return ret;
976 }
977 
978 static int
apply_log(PGconn * conn,const repack_table * table,int count)979 apply_log(PGconn *conn, const repack_table *table, int count)
980 {
981 	int			result;
982 	PGresult   *res;
983 	const char *params[6];
984 	char		buffer[12];
985 
986 	params[0] = table->sql_peek;
987 	params[1] = table->sql_insert;
988 	params[2] = table->sql_delete;
989 	params[3] = table->sql_update;
990 	params[4] = table->sql_pop;
991 	params[5] = utoa(count, buffer);
992 
993 	res = pgut_execute(conn,
994 					   "SELECT repack.repack_apply($1, $2, $3, $4, $5, $6)",
995 					   6, params);
996 	result = atoi(PQgetvalue(res, 0, 0));
997 	CLEARPGRES(res);
998 
999 	return result;
1000 }
1001 
1002 /*
1003  * Create indexes on temp table, possibly using multiple worker connections
1004  * concurrently if the user asked for --jobs=...
1005  */
1006 static bool
rebuild_indexes(const repack_table * table)1007 rebuild_indexes(const repack_table *table)
1008 {
1009 	PGresult	   *res = NULL;
1010 	int			    num_indexes;
1011 	int				i;
1012 	int				num_active_workers;
1013 	int				num_workers;
1014 	repack_index   *index_jobs;
1015 	bool            have_error = false;
1016 
1017 	elog(DEBUG2, "---- create indexes ----");
1018 
1019 	num_indexes = table->n_indexes;
1020 
1021 	/* We might have more actual worker connections than we need,
1022 	 * if the number of workers exceeds the number of indexes to be
1023 	 * built. In that case, ignore the extra workers.
1024 	 */
1025 	num_workers = num_indexes > workers.num_workers ? workers.num_workers : num_indexes;
1026 	num_active_workers = num_workers;
1027 
1028 	elog(DEBUG2, "Have %d indexes and num_workers=%d", num_indexes,
1029 		 num_workers);
1030 
1031 	index_jobs = table->indexes;
1032 
1033 	for (i = 0; i < num_indexes; i++)
1034 	{
1035 
1036 		elog(DEBUG2, "set up index_jobs [%d]", i);
1037 		elog(DEBUG2, "target_oid   : %u", index_jobs[i].target_oid);
1038 		elog(DEBUG2, "create_index : %s", index_jobs[i].create_index);
1039 
1040 		if (num_workers <= 1) {
1041 			/* Use primary connection if we are not setting up parallel
1042 			 * index building, or if we only have one worker.
1043 			 */
1044 			command(index_jobs[i].create_index, 0, NULL);
1045 
1046 			/* This bookkeeping isn't actually important in this no-workers
1047 			 * case, but just for clarity.
1048 			 */
1049 			index_jobs[i].status = FINISHED;
1050 		}
1051 		else if (i < num_workers) {
1052 			/* Assign available worker to build an index. */
1053 			index_jobs[i].status = INPROGRESS;
1054 			index_jobs[i].worker_idx = i;
1055 			elog(LOG, "Initial worker %d to build index: %s",
1056 				 i, index_jobs[i].create_index);
1057 
1058 			if (!(PQsendQuery(workers.conns[i], index_jobs[i].create_index)))
1059 			{
1060 				elog(WARNING, "Error sending async query: %s\n%s",
1061 					 index_jobs[i].create_index,
1062 					 PQerrorMessage(workers.conns[i]));
1063 				have_error = true;
1064 				goto cleanup;
1065 			}
1066 		}
1067 		/* Else we have more indexes to be built than workers
1068 		 * available. That's OK, we'll get to them later.
1069 		 */
1070 	}
1071 
1072 	if (num_workers > 1)
1073 	{
1074 		int freed_worker = -1;
1075 		int ret;
1076 
1077 /* Prefer poll() over select(), following PostgreSQL custom. */
1078 #ifdef HAVE_POLL
1079 		struct pollfd *input_fds;
1080 
1081 		input_fds = pgut_malloc(sizeof(struct pollfd) * num_workers);
1082 		for (i = 0; i < num_workers; i++)
1083 		{
1084 			input_fds[i].fd = PQsocket(workers.conns[i]);
1085 			input_fds[i].events = POLLIN | POLLERR;
1086 			input_fds[i].revents = 0;
1087 		}
1088 #else
1089 		fd_set input_mask;
1090 		struct timeval timeout;
1091 		/* select() needs the highest-numbered socket descriptor */
1092 		int max_fd;
1093 #endif
1094 
1095 		/* Now go through our index builds, and look for any which is
1096 		 * reported complete. Reassign that worker to the next index to
1097 		 * be built, if any.
1098 		 */
1099 		while (num_active_workers > 0)
1100 		{
1101 			elog(DEBUG2, "polling %d active workers", num_active_workers);
1102 
1103 #ifdef HAVE_POLL
1104 			ret = poll(input_fds, num_workers, POLL_TIMEOUT * 1000);
1105 #else
1106 			/* re-initialize timeout and input_mask before each
1107 			 * invocation of select(). I think this isn't
1108 			 * necessary on many Unixen, but just in case.
1109 			 */
1110 			timeout.tv_sec = POLL_TIMEOUT;
1111 			timeout.tv_usec = 0;
1112 
1113 			FD_ZERO(&input_mask);
1114 			for (i = 0, max_fd = 0; i < num_workers; i++)
1115 			{
1116 				FD_SET(PQsocket(workers.conns[i]), &input_mask);
1117 				if (PQsocket(workers.conns[i]) > max_fd)
1118 					max_fd = PQsocket(workers.conns[i]);
1119 			}
1120 
1121 			ret = select(max_fd + 1, &input_mask, NULL, NULL, &timeout);
1122 #endif
1123 			/* XXX: the errno != EINTR check means we won't bail
1124 			 * out on SIGINT. We should probably just remove this
1125 			 * check, though it seems we also need to fix up
1126 			 * the on_interrupt handling for workers' index
1127 			 * builds (those PGconns don't seem to have c->cancel
1128 			 * set, so we don't cancel the in-progress builds).
1129 			 */
1130 			if (ret < 0 && errno != EINTR)
1131 				elog(ERROR, "poll() failed: %d, %d", ret, errno);
1132 
1133 			elog(DEBUG2, "Poll returned: %d", ret);
1134 
1135 			for (i = 0; i < num_indexes; i++)
1136 			{
1137 				if (index_jobs[i].status == INPROGRESS)
1138 				{
1139 					Assert(index_jobs[i].worker_idx >= 0);
1140 					/* Must call PQconsumeInput before we can check PQisBusy */
1141 					if (PQconsumeInput(workers.conns[index_jobs[i].worker_idx]) != 1)
1142 					{
1143 						elog(WARNING, "Error fetching async query status: %s",
1144 							 PQerrorMessage(workers.conns[index_jobs[i].worker_idx]));
1145 						have_error = true;
1146 						goto cleanup;
1147 					}
1148 					if (!PQisBusy(workers.conns[index_jobs[i].worker_idx]))
1149 					{
1150 						elog(LOG, "Command finished in worker %d: %s",
1151 							 index_jobs[i].worker_idx,
1152 							 index_jobs[i].create_index);
1153 
1154 						while ((res = PQgetResult(workers.conns[index_jobs[i].worker_idx])))
1155 						{
1156 							if (PQresultStatus(res) != PGRES_COMMAND_OK)
1157 							{
1158 								elog(WARNING, "Error with create index: %s",
1159 									 PQerrorMessage(workers.conns[index_jobs[i].worker_idx]));
1160 								have_error = true;
1161 								goto cleanup;
1162 							}
1163 							CLEARPGRES(res);
1164 						}
1165 
1166 						/* We are only going to re-queue one worker, even
1167 						 * though more than one index build might be finished.
1168 						 * Any other jobs which may be finished will
1169 						 * just have to wait for the next pass through the
1170 						 * poll()/select() loop.
1171 						 */
1172 						freed_worker = index_jobs[i].worker_idx;
1173 						index_jobs[i].status = FINISHED;
1174 						num_active_workers--;
1175 						break;
1176 					}
1177 				}
1178 			}
1179 			if (freed_worker > -1)
1180 			{
1181 				for (i = 0; i < num_indexes; i++)
1182 				{
1183 					if (index_jobs[i].status == UNPROCESSED)
1184 					{
1185 						index_jobs[i].status = INPROGRESS;
1186 						index_jobs[i].worker_idx = freed_worker;
1187 						elog(LOG, "Assigning worker %d to build index #%d: "
1188 							 "%s", freed_worker, i,
1189 							 index_jobs[i].create_index);
1190 
1191 						if (!(PQsendQuery(workers.conns[freed_worker],
1192 										  index_jobs[i].create_index))) {
1193 							elog(WARNING, "Error sending async query: %s\n%s",
1194 								 index_jobs[i].create_index,
1195 								 PQerrorMessage(workers.conns[freed_worker]));
1196 							have_error = true;
1197 							goto cleanup;
1198 						}
1199 						num_active_workers++;
1200 						break;
1201 					}
1202 				}
1203 				freed_worker = -1;
1204 			}
1205 		}
1206 
1207 	}
1208 
1209 cleanup:
1210 	CLEARPGRES(res);
1211 	return (!have_error);
1212 }
1213 
1214 
1215 /*
1216  * Re-organize one table.
1217  */
1218 static void
repack_one_table(repack_table * table,const char * orderby)1219 repack_one_table(repack_table *table, const char *orderby)
1220 {
1221 	PGresult	   *res = NULL;
1222 	const char	   *params[3];
1223 	int				num;
1224 	char		   *vxid = NULL;
1225 	char			buffer[12];
1226 	StringInfoData	sql;
1227 	bool            ret = false;
1228 	PGresult       *indexres = NULL;
1229 	const char     *indexparams[2];
1230 	char		    indexbuffer[12];
1231 	int             j;
1232 
1233 	/* appname will be "pg_repack" in normal use on 9.0+, or
1234 	 * "pg_regress" when run under `make installcheck`
1235 	 */
1236 	const char     *appname = getenv("PGAPPNAME");
1237 
1238 	/* Keep track of whether we have gotten through setup to install
1239 	 * the repack_trigger, log table, etc. ourselves. We don't want to
1240 	 * go through repack_cleanup() if we didn't actually set up the
1241 	 * trigger ourselves, lest we be cleaning up another pg_repack's mess,
1242 	 * or worse, interfering with a still-running pg_repack.
1243 	 */
1244 	bool            table_init = false;
1245 
1246 	initStringInfo(&sql);
1247 
1248 	elog(INFO, "repacking table \"%s\"", table->target_name);
1249 
1250 	elog(DEBUG2, "---- repack_one_table ----");
1251 	elog(DEBUG2, "target_name       : %s", table->target_name);
1252 	elog(DEBUG2, "target_oid        : %u", table->target_oid);
1253 	elog(DEBUG2, "target_toast      : %u", table->target_toast);
1254 	elog(DEBUG2, "target_tidx       : %u", table->target_tidx);
1255 	elog(DEBUG2, "pkid              : %u", table->pkid);
1256 	elog(DEBUG2, "ckid              : %u", table->ckid);
1257 	elog(DEBUG2, "create_pktype     : %s", table->create_pktype);
1258 	elog(DEBUG2, "create_log        : %s", table->create_log);
1259 	elog(DEBUG2, "create_trigger    : %s", table->create_trigger);
1260 	elog(DEBUG2, "enable_trigger    : %s", table->enable_trigger);
1261 	elog(DEBUG2, "create_table      : %s", table->create_table);
1262 	elog(DEBUG2, "copy_data         : %s", table->copy_data);
1263 	elog(DEBUG2, "alter_col_storage : %s", table->alter_col_storage ?
1264 		 table->alter_col_storage : "(skipped)");
1265 	elog(DEBUG2, "drop_columns      : %s", table->drop_columns ? table->drop_columns : "(skipped)");
1266 	elog(DEBUG2, "delete_log        : %s", table->delete_log);
1267 	elog(DEBUG2, "lock_table        : %s", table->lock_table);
1268 	elog(DEBUG2, "sql_peek          : %s", table->sql_peek);
1269 	elog(DEBUG2, "sql_insert        : %s", table->sql_insert);
1270 	elog(DEBUG2, "sql_delete        : %s", table->sql_delete);
1271 	elog(DEBUG2, "sql_update        : %s", table->sql_update);
1272 	elog(DEBUG2, "sql_pop           : %s", table->sql_pop);
1273 
1274 	if (dryrun)
1275 		return;
1276 
1277 	/* push repack_cleanup_callback() on stack to clean temporary objects */
1278 	pgut_atexit_push(repack_cleanup_callback, &table->target_oid);
1279 
1280 	/*
1281 	 * 1. Setup advisory lock and trigger on main table.
1282 	 */
1283 	elog(DEBUG2, "---- setup ----");
1284 
1285 	params[0] = utoa(table->target_oid, buffer);
1286 
1287 	if (!advisory_lock(connection, buffer))
1288 		goto cleanup;
1289 
1290 	if (!(lock_exclusive(connection, buffer, table->lock_table, true)))
1291 	{
1292 		if (no_kill_backend)
1293 			elog(INFO, "Skipping repack %s due to timeout", table->target_name);
1294 		else
1295 			elog(WARNING, "lock_exclusive() failed for %s", table->target_name);
1296 		goto cleanup;
1297 	}
1298 
1299 	/*
1300 	 * pg_get_indexdef requires an access share lock, so do those calls while
1301 	 * we have an access exclusive lock anyway, so we know they won't block.
1302 	 */
1303 
1304 	indexparams[0] = utoa(table->target_oid, indexbuffer);
1305 	indexparams[1] = moveidx ? tablespace : NULL;
1306 
1307 	/* First, just display a warning message for any invalid indexes
1308 	 * which may be on the table (mostly to match the behavior of 1.1.8).
1309 	 */
1310 	indexres = execute(
1311 		"SELECT pg_get_indexdef(indexrelid)"
1312 		" FROM pg_index WHERE indrelid = $1 AND NOT indisvalid",
1313 		1, indexparams);
1314 
1315 	for (j = 0; j < PQntuples(indexres); j++)
1316 	{
1317 		const char *indexdef;
1318 		indexdef = getstr(indexres, j, 0);
1319 		elog(WARNING, "skipping invalid index: %s", indexdef);
1320 	}
1321 
1322 	indexres = execute(
1323 		"SELECT indexrelid,"
1324 		" repack.repack_indexdef(indexrelid, indrelid, $2, FALSE) "
1325 		" FROM pg_index WHERE indrelid = $1 AND indisvalid",
1326 		2, indexparams);
1327 
1328 	table->n_indexes = PQntuples(indexres);
1329 	table->indexes = pgut_malloc(table->n_indexes * sizeof(repack_index));
1330 
1331 	for (j = 0; j < table->n_indexes; j++)
1332 	{
1333 		table->indexes[j].target_oid = getoid(indexres, j, 0);
1334 		table->indexes[j].create_index = getstr(indexres, j, 1);
1335 		table->indexes[j].status = UNPROCESSED;
1336 		table->indexes[j].worker_idx = -1; /* Unassigned */
1337 	}
1338 
1339 	for (j = 0; j < table->n_indexes; j++)
1340 	{
1341 		elog(DEBUG2, "index[%d].target_oid      : %u", j, table->indexes[j].target_oid);
1342 		elog(DEBUG2, "index[%d].create_index    : %s", j, table->indexes[j].create_index);
1343 	}
1344 
1345 
1346 	/*
1347 	 * Check if repack_trigger is not conflict with existing trigger. We can
1348 	 * find it out later but we check it in advance and go to cleanup if needed.
1349 	 * In AFTER trigger context, since triggered tuple is not changed by other
1350 	 * trigger we don't care about the fire order.
1351 	 */
1352 	res = execute("SELECT repack.conflicted_triggers($1)", 1, params);
1353 	if (PQntuples(res) > 0)
1354 	{
1355 		ereport(WARNING,
1356 				(errcode(E_PG_COMMAND),
1357 				 errmsg("the table \"%s\" already has a trigger called \"%s\"",
1358 						table->target_name, "repack_trigger"),
1359 				 errdetail(
1360 					 "The trigger was probably installed during a previous"
1361 					 " attempt to run pg_repack on the table which was"
1362 					 " interrupted and for some reason failed to clean up"
1363 					 " the temporary objects.  Please drop the trigger or drop"
1364 					" and recreate the pg_repack extension altogether"
1365 					 " to remove all the temporary objects left over.")));
1366 		goto cleanup;
1367 	}
1368 
1369 	CLEARPGRES(res);
1370 
1371 	command(table->create_pktype, 0, NULL);
1372 	temp_obj_num++;
1373 	command(table->create_log, 0, NULL);
1374 	temp_obj_num++;
1375 	command(table->create_trigger, 0, NULL);
1376 	temp_obj_num++;
1377 	command(table->enable_trigger, 0, NULL);
1378 	printfStringInfo(&sql, "SELECT repack.disable_autovacuum('repack.log_%u')", table->target_oid);
1379 	command(sql.data, 0, NULL);
1380 
1381 	/* While we are still holding an AccessExclusive lock on the table, submit
1382 	 * the request for an AccessShare lock asynchronously from conn2.
1383 	 * We want to submit this query in conn2 while connection's
1384 	 * transaction still holds its lock, so that no DDL may sneak in
1385 	 * between the time that connection commits and conn2 gets its lock.
1386 	 */
1387 	pgut_command(conn2, "BEGIN ISOLATION LEVEL READ COMMITTED", 0, NULL);
1388 
1389 	/* grab the backend PID of conn2; we'll need this when querying
1390 	 * pg_locks momentarily.
1391 	 */
1392 	res = pgut_execute(conn2, "SELECT pg_backend_pid()", 0, NULL);
1393 	buffer[0] = '\0';
1394 	strncat(buffer, PQgetvalue(res, 0, 0), sizeof(buffer) - 1);
1395 	CLEARPGRES(res);
1396 
1397 	/*
1398 	 * Not using lock_access_share() here since we know that
1399 	 * it's not possible to obtain the ACCESS SHARE lock right now
1400 	 * in conn2, since the primary connection holds ACCESS EXCLUSIVE.
1401 	 */
1402 	printfStringInfo(&sql, "LOCK TABLE %s IN ACCESS SHARE MODE",
1403 					 table->target_name);
1404 	elog(DEBUG2, "LOCK TABLE %s IN ACCESS SHARE MODE", table->target_name);
1405 	if (PQsetnonblocking(conn2, 1))
1406 	{
1407 		elog(WARNING, "Unable to set conn2 nonblocking.");
1408 		goto cleanup;
1409 	}
1410 	if (!(PQsendQuery(conn2, sql.data)))
1411 	{
1412 		elog(WARNING, "Error sending async query: %s\n%s", sql.data,
1413 			 PQerrorMessage(conn2));
1414 		goto cleanup;
1415 	}
1416 
1417 	/* Now that we've submitted the LOCK TABLE request through conn2,
1418 	 * look for and cancel any (potentially dangerous) DDL commands which
1419 	 * might also be waiting on our table lock at this point --
1420 	 * it's not safe to let them wait, because they may grab their
1421 	 * AccessExclusive lock before conn2 gets its AccessShare lock,
1422 	 * and perform unsafe DDL on the table.
1423 	 *
1424 	 * Normally, lock_access_share() would take care of this for us,
1425 	 * but we're not able to use it here.
1426 	 */
1427 	if (!(kill_ddl(connection, table->target_oid, true)))
1428 	{
1429 		if (no_kill_backend)
1430 			elog(INFO, "Skipping repack %s due to timeout.", table->target_name);
1431 		else
1432 			elog(WARNING, "kill_ddl() failed.");
1433 		goto cleanup;
1434 	}
1435 
1436 	/* We're finished killing off any unsafe DDL. COMMIT in our main
1437 	 * connection, so that conn2 may get its AccessShare lock.
1438 	 */
1439 	command("COMMIT", 0, NULL);
1440 
1441 	/* The main connection has now committed its repack_trigger,
1442 	 * log table, and temp. table. If any error occurs from this point
1443 	 * on and we bail out, we should try to clean those up.
1444 	 */
1445 	table_init = true;
1446 
1447 	/* Keep looping PQgetResult() calls until it returns NULL, indicating the
1448 	 * command is done and we have obtained our lock.
1449 	 */
1450 	while ((res = PQgetResult(conn2)))
1451 	{
1452 		elog(DEBUG2, "Waiting on ACCESS SHARE lock...");
1453 		if (PQresultStatus(res) != PGRES_COMMAND_OK)
1454 		{
1455 			elog(WARNING, "Error with LOCK TABLE: %s", PQerrorMessage(conn2));
1456 			goto cleanup;
1457 		}
1458 		CLEARPGRES(res);
1459 	}
1460 
1461 	/* Turn conn2 back into blocking mode for further non-async use. */
1462 	if (PQsetnonblocking(conn2, 0))
1463 	{
1464 		elog(WARNING, "Unable to set conn2 blocking.");
1465 		goto cleanup;
1466 	}
1467 
1468 	/*
1469 	 * 2. Copy tuples into temp table.
1470 	 */
1471 	elog(DEBUG2, "---- copy tuples ----");
1472 
1473 	/* Must use SERIALIZABLE (or at least not READ COMMITTED) to avoid race
1474 	 * condition between the create_table statement and rows subsequently
1475 	 * being added to the log.
1476 	 */
1477 	command("BEGIN ISOLATION LEVEL SERIALIZABLE", 0, NULL);
1478 	/* SET work_mem = maintenance_work_mem */
1479 	command("SELECT set_config('work_mem', current_setting('maintenance_work_mem'), true)", 0, NULL);
1480 	if (orderby && !orderby[0])
1481 		command("SET LOCAL synchronize_seqscans = off", 0, NULL);
1482 
1483 	/* Fetch an array of Virtual IDs of all transactions active right now.
1484 	 */
1485 	params[0] = buffer; /* backend PID of conn2 */
1486 	params[1] = PROGRAM_NAME;
1487 	res = execute(SQL_XID_SNAPSHOT, 2, params);
1488 	vxid = pgut_strdup(PQgetvalue(res, 0, 0));
1489 
1490 	CLEARPGRES(res);
1491 
1492 	/* Delete any existing entries in the log table now, since we have not
1493 	 * yet run the CREATE TABLE ... AS SELECT, which will take in all existing
1494 	 * rows from the target table; if we also included prior rows from the
1495 	 * log we could wind up with duplicates.
1496 	 */
1497 	command(table->delete_log, 0, NULL);
1498 
1499 	/* We need to be able to obtain an AccessShare lock on the target table
1500 	 * for the create_table command to go through, so go ahead and obtain
1501 	 * the lock explicitly.
1502 	 *
1503 	 * Since conn2 has been diligently holding its AccessShare lock, it
1504 	 * is possible that another transaction has been waiting to acquire
1505 	 * an AccessExclusive lock on the table (e.g. a concurrent ALTER TABLE
1506 	 * or TRUNCATE which we must not allow). If there are any such
1507 	 * transactions, lock_access_share() will kill them so that our
1508 	 * CREATE TABLE ... AS SELECT does not deadlock waiting for an
1509 	 * AccessShare lock.
1510 	 */
1511 	if (!(lock_access_share(connection, table->target_oid, table->target_name)))
1512 		goto cleanup;
1513 
1514 	/*
1515 	 * Before copying data to the target table, we need to set the column storage
1516 	 * type if its storage type has been changed from the type default.
1517 	 */
1518 	command(table->create_table, 0, NULL);
1519 	if (table->alter_col_storage)
1520 		command(table->alter_col_storage, 0, NULL);
1521 	command(table->copy_data, 0, NULL);
1522 	temp_obj_num++;
1523 	printfStringInfo(&sql, "SELECT repack.disable_autovacuum('repack.table_%u')", table->target_oid);
1524 	if (table->drop_columns)
1525 		command(table->drop_columns, 0, NULL);
1526 	command(sql.data, 0, NULL);
1527 	command("COMMIT", 0, NULL);
1528 
1529 	/*
1530 	 * 3. Create indexes on temp table.
1531 	 */
1532 	if (!rebuild_indexes(table))
1533 		goto cleanup;
1534 
1535 	/* don't clear indexres until after rebuild_indexes or bad things happen */
1536 	CLEARPGRES(indexres);
1537 	CLEARPGRES(res);
1538 
1539 	/*
1540 	 * 4. Apply log to temp table until no tuples are left in the log
1541 	 * and all of the old transactions are finished.
1542 	 */
1543 	for (;;)
1544 	{
1545 		num = apply_log(connection, table, APPLY_COUNT);
1546 
1547 		/* We'll keep applying tuples from the log table in batches
1548 		 * of APPLY_COUNT, until applying a batch of tuples
1549 		 * (via LIMIT) results in our having applied
1550 		 * MIN_TUPLES_BEFORE_SWITCH or fewer tuples. We don't want to
1551 		 * get stuck repetitively applying some small number of tuples
1552 		 * from the log table as inserts/updates/deletes may be
1553 		 * constantly coming into the original table.
1554 		 */
1555 		if (num > MIN_TUPLES_BEFORE_SWITCH)
1556 			continue;	/* there might be still some tuples, repeat. */
1557 
1558 		/* old transactions still alive ? */
1559 		params[0] = vxid;
1560 		res = execute(SQL_XID_ALIVE, 1, params);
1561 		num = PQntuples(res);
1562 
1563 		if (num > 0)
1564 		{
1565 			/* Wait for old transactions.
1566 			 * Only display this message if we are NOT
1567 			 * running under pg_regress, so as not to cause
1568 			 * noise which would trip up pg_regress.
1569 			 */
1570 
1571 			if (!appname || strcmp(appname, "pg_regress") != 0)
1572 			{
1573 				elog(NOTICE, "Waiting for %d transactions to finish. First PID: %s", num, PQgetvalue(res, 0, 0));
1574 			}
1575 
1576 			CLEARPGRES(res);
1577 			sleep(1);
1578 			continue;
1579 		}
1580 		else
1581 		{
1582 			/* All old transactions are finished;
1583 			 * go to next step. */
1584 			CLEARPGRES(res);
1585 			break;
1586 		}
1587 	}
1588 
1589 	/*
1590 	 * 5. Swap: will be done with conn2, since it already holds an
1591 	 *    AccessShare lock.
1592 	 */
1593 	elog(DEBUG2, "---- swap ----");
1594 	/* Bump our existing AccessShare lock to AccessExclusive */
1595 
1596 	if (!(lock_exclusive(conn2, utoa(table->target_oid, buffer),
1597 						 table->lock_table, false)))
1598 	{
1599 		elog(WARNING, "lock_exclusive() failed in conn2 for %s",
1600 			 table->target_name);
1601 		goto cleanup;
1602 	}
1603 
1604 	apply_log(conn2, table, 0);
1605 	params[0] = utoa(table->target_oid, buffer);
1606 	pgut_command(conn2, "SELECT repack.repack_swap($1)", 1, params);
1607 	pgut_command(conn2, "COMMIT", 0, NULL);
1608 
1609 	/*
1610 	 * 6. Drop.
1611 	 */
1612 	elog(DEBUG2, "---- drop ----");
1613 
1614 	command("BEGIN ISOLATION LEVEL READ COMMITTED", 0, NULL);
1615 	if (!(lock_exclusive(connection, utoa(table->target_oid, buffer),
1616 						 table->lock_table, false)))
1617 	{
1618 		elog(WARNING, "lock_exclusive() failed in connection for %s",
1619 			 table->target_name);
1620 		goto cleanup;
1621 	}
1622 
1623 	params[1] = utoa(temp_obj_num, indexbuffer);
1624 	command("SELECT repack.repack_drop($1, $2)", 2, params);
1625 	command("COMMIT", 0, NULL);
1626 	temp_obj_num = 0; /* reset temporary object counter after cleanup */
1627 
1628 	/*
1629 	 * 7. Analyze.
1630 	 * Note that cleanup hook has been already uninstalled here because analyze
1631 	 * is not an important operation; No clean up even if failed.
1632 	 */
1633 	if (analyze)
1634 	{
1635 		elog(DEBUG2, "---- analyze ----");
1636 
1637 		command("BEGIN ISOLATION LEVEL READ COMMITTED", 0, NULL);
1638 		printfStringInfo(&sql, "ANALYZE %s", table->target_name);
1639 		command(sql.data, 0, NULL);
1640 		command("COMMIT", 0, NULL);
1641 	}
1642 
1643 	/* Release advisory lock on table. */
1644 	params[0] = REPACK_LOCK_PREFIX_STR;
1645 	params[1] = utoa(table->target_oid, buffer);
1646 
1647 	res = pgut_execute(connection, "SELECT pg_advisory_unlock($1, CAST(-2147483648 + $2::bigint AS integer))",
1648 			   2, params);
1649 	ret = true;
1650 
1651 cleanup:
1652 	CLEARPGRES(res);
1653 	termStringInfo(&sql);
1654 	if (vxid)
1655 		free(vxid);
1656 
1657 	/* Rollback current transactions */
1658 	pgut_rollback(connection);
1659 	pgut_rollback(conn2);
1660 
1661 	/* XXX: distinguish between fatal and non-fatal errors via the first
1662 	 * arg to repack_cleanup().
1663 	 */
1664 	if ((!ret) && table_init)
1665 		repack_cleanup(false, table);
1666 }
1667 
1668 /* Kill off any concurrent DDL (or any transaction attempting to take
1669  * an AccessExclusive lock) trying to run against our table if we want to
1670  * do. Note, we're killing these queries off *before* they are granted
1671  * an AccessExclusive lock on our table.
1672  *
1673  * Returns true if no problems encountered, false otherwise.
1674  */
1675 static bool
kill_ddl(PGconn * conn,Oid relid,bool terminate)1676 kill_ddl(PGconn *conn, Oid relid, bool terminate)
1677 {
1678 	bool			ret = true;
1679 	PGresult	   *res;
1680 	StringInfoData	sql;
1681 	int				n_tuples;
1682 
1683 	initStringInfo(&sql);
1684 
1685 	/* Check the number of backends competing AccessExclusiveLock */
1686 	printfStringInfo(&sql, COUNT_COMPETING_LOCKS, relid);
1687 	res = pgut_execute(conn, sql.data, 0, NULL);
1688 	n_tuples = PQntuples(res);
1689 
1690 	if (n_tuples != 0)
1691 	{
1692 		/* Competing backend is exsits, but if we do not want to calcel/terminate
1693 		 * any backend, do nothing.
1694 		 */
1695 		if (no_kill_backend)
1696 		{
1697 			elog(WARNING, "%d unsafe queries remain but do not cancel them and skip to repack it",
1698 				 n_tuples);
1699 			ret = false;
1700 		}
1701 		else
1702 		{
1703 			resetStringInfo(&sql);
1704 			printfStringInfo(&sql, CANCEL_COMPETING_LOCKS, relid);
1705 			res = pgut_execute(conn, sql.data, 0, NULL);
1706 			if (PQresultStatus(res) != PGRES_TUPLES_OK)
1707 			{
1708 				elog(WARNING, "Error canceling unsafe queries: %s",
1709 					 PQerrorMessage(conn));
1710 				ret = false;
1711 			}
1712 			else if (PQntuples(res) > 0 && terminate && PQserverVersion(conn) >= 80400)
1713 			{
1714 				elog(WARNING,
1715 					 "Canceled %d unsafe queries. Terminating any remaining PIDs.",
1716 					 PQntuples(res));
1717 
1718 				CLEARPGRES(res);
1719 				printfStringInfo(&sql, KILL_COMPETING_LOCKS, relid);
1720 				res = pgut_execute(conn, sql.data, 0, NULL);
1721 				if (PQresultStatus(res) != PGRES_TUPLES_OK)
1722 				{
1723 					elog(WARNING, "Error killing unsafe queries: %s",
1724 						 PQerrorMessage(conn));
1725 					ret = false;
1726 				}
1727 			}
1728 			else if (PQntuples(res) > 0)
1729 				elog(NOTICE, "Canceled %d unsafe queries", PQntuples(res));
1730 		}
1731 	}
1732 	else
1733 		elog(DEBUG2, "No competing DDL to cancel.");
1734 
1735 	CLEARPGRES(res);
1736 	termStringInfo(&sql);
1737 
1738 	return ret;
1739 }
1740 
1741 
1742 /*
1743  * Try to acquire an ACCESS SHARE table lock, avoiding deadlocks and long
1744  * waits by killing off other sessions which may be stuck trying to obtain
1745  * an ACCESS EXCLUSIVE lock.
1746  *
1747  * Arguments:
1748  *
1749  *  conn: connection to use
1750  *  relid: OID of relation
1751  *  target_name: name of table
1752  */
1753 static bool
lock_access_share(PGconn * conn,Oid relid,const char * target_name)1754 lock_access_share(PGconn *conn, Oid relid, const char *target_name)
1755 {
1756 	StringInfoData	sql;
1757 	time_t			start = time(NULL);
1758 	int				i;
1759 	bool			ret = true;
1760 
1761 	initStringInfo(&sql);
1762 
1763 	for (i = 1; ; i++)
1764 	{
1765 		time_t		duration;
1766 		PGresult   *res;
1767 		int			wait_msec;
1768 
1769 		duration = time(NULL) - start;
1770 
1771 		/* Cancel queries unconditionally, i.e. don't bother waiting
1772 		 * wait_timeout as lock_exclusive() does -- the only queries we
1773 		 * should be killing are disallowed DDL commands hanging around
1774 		 * for an AccessExclusive lock, which must be deadlocked at
1775 		 * this point anyway since conn2 holds its AccessShare lock
1776 		 * already.
1777 		 */
1778 		if (duration > (wait_timeout * 2))
1779 			ret = kill_ddl(conn, relid, true);
1780 		else
1781 			ret = kill_ddl(conn, relid, false);
1782 
1783 		if (!ret)
1784 			break;
1785 
1786 		/* wait for a while to lock the table. */
1787 		wait_msec = Min(1000, i * 100);
1788 		printfStringInfo(&sql, "SET LOCAL statement_timeout = %d", wait_msec);
1789 		pgut_command(conn, sql.data, 0, NULL);
1790 
1791 		printfStringInfo(&sql, "LOCK TABLE %s IN ACCESS SHARE MODE", target_name);
1792 		res = pgut_execute_elevel(conn, sql.data, 0, NULL, DEBUG2);
1793 		if (PQresultStatus(res) == PGRES_COMMAND_OK)
1794 		{
1795 			CLEARPGRES(res);
1796 			break;
1797 		}
1798 		else if (sqlstate_equals(res, SQLSTATE_QUERY_CANCELED))
1799 		{
1800 			/* retry if lock conflicted */
1801 			CLEARPGRES(res);
1802 			pgut_rollback(conn);
1803 			continue;
1804 		}
1805 		else
1806 		{
1807 			/* exit otherwise */
1808 			elog(WARNING, "%s", PQerrorMessage(connection));
1809 			CLEARPGRES(res);
1810 			ret = false;
1811 			break;
1812 		}
1813 	}
1814 
1815 	termStringInfo(&sql);
1816 	pgut_command(conn, "RESET statement_timeout", 0, NULL);
1817 	return ret;
1818 }
1819 
1820 
1821 /* Obtain an advisory lock on the table's OID, to make sure no other
1822  * pg_repack is working on the table. This is not so much a concern with
1823  * full-table repacks, but mainly so that index-only repacks don't interfere
1824  * with each other or a full-table repack.
1825  */
advisory_lock(PGconn * conn,const char * relid)1826 static bool advisory_lock(PGconn *conn, const char *relid)
1827 {
1828 	PGresult	   *res = NULL;
1829 	bool			ret = false;
1830 	const char	   *params[2];
1831 
1832 	params[0] = REPACK_LOCK_PREFIX_STR;
1833 	params[1] = relid;
1834 
1835 	/* For the 2-argument form of pg_try_advisory_lock, we need to
1836 	 * pass in two signed 4-byte integers. But a table OID is an
1837 	 * *unsigned* 4-byte integer. Add -2147483648 to that OID to make
1838 	 * it fit reliably into signed int space.
1839 	 */
1840 	res = pgut_execute(conn, "SELECT pg_try_advisory_lock($1, CAST(-2147483648 + $2::bigint AS integer))",
1841 			   2, params);
1842 
1843 	if (PQresultStatus(res) != PGRES_TUPLES_OK) {
1844 		elog(ERROR, "%s",  PQerrorMessage(connection));
1845 	}
1846 	else if (strcmp(getstr(res, 0, 0), "t") != 0) {
1847 		elog(ERROR, "Another pg_repack command may be running on the table. Please try again later.");
1848 	}
1849 	else {
1850 		ret = true;
1851 	}
1852 	CLEARPGRES(res);
1853 	return ret;
1854 }
1855 
1856 /*
1857  * Try acquire an ACCESS EXCLUSIVE table lock, avoiding deadlocks and long
1858  * waits by killing off other sessions.
1859  * Arguments:
1860  *
1861  *  conn: connection to use
1862  *  relid: OID of relation
1863  *  lock_query: LOCK TABLE ... IN ACCESS EXCLUSIVE query to be executed
1864  *  start_xact: whether we will issue a BEGIN ourselves. If not, we will
1865  *              use a SAVEPOINT and ROLLBACK TO SAVEPOINT if our query
1866  *              times out, to avoid leaving the transaction in error state.
1867  */
1868 static bool
lock_exclusive(PGconn * conn,const char * relid,const char * lock_query,bool start_xact)1869 lock_exclusive(PGconn *conn, const char *relid, const char *lock_query, bool start_xact)
1870 {
1871 	time_t		start = time(NULL);
1872 	int			i;
1873 	bool		ret = true;
1874 
1875 	for (i = 1; ; i++)
1876 	{
1877 		time_t		duration;
1878 		char		sql[1024];
1879 		PGresult   *res;
1880 		int			wait_msec;
1881 
1882 		if (start_xact)
1883 			pgut_command(conn, "BEGIN ISOLATION LEVEL READ COMMITTED", 0, NULL);
1884 		else
1885 			pgut_command(conn, "SAVEPOINT repack_sp1", 0, NULL);
1886 
1887 		duration = time(NULL) - start;
1888 		if (duration > wait_timeout)
1889 		{
1890 			if (no_kill_backend)
1891 			{
1892 				elog(WARNING, "timed out, do not cancel conflicting backends");
1893 				ret = false;
1894 
1895 				/* Before exit the loop reset the transaction */
1896 				if (start_xact)
1897 					pgut_rollback(conn);
1898 				else
1899 					pgut_command(conn, "ROLLBACK TO SAVEPOINT repack_sp1", 0, NULL);
1900 				break;
1901 			}
1902 			else
1903 			{
1904 				const char *cancel_query;
1905 				if (PQserverVersion(conn) >= 80400 &&
1906 					duration > wait_timeout * 2)
1907 				{
1908 					elog(WARNING, "terminating conflicted backends");
1909 					cancel_query =
1910 						"SELECT pg_terminate_backend(pid) FROM pg_locks"
1911 						" WHERE locktype = 'relation'"
1912 						"   AND relation = $1 AND pid <> pg_backend_pid()";
1913 				}
1914 				else
1915 				{
1916 					elog(WARNING, "canceling conflicted backends");
1917 					cancel_query =
1918 						"SELECT pg_cancel_backend(pid) FROM pg_locks"
1919 						" WHERE locktype = 'relation'"
1920 						"   AND relation = $1 AND pid <> pg_backend_pid()";
1921 				}
1922 
1923 				pgut_command(conn, cancel_query, 1, &relid);
1924 			}
1925 		}
1926 
1927 		/* wait for a while to lock the table. */
1928 		wait_msec = Min(1000, i * 100);
1929 		snprintf(sql, lengthof(sql), "SET LOCAL statement_timeout = %d", wait_msec);
1930 		pgut_command(conn, sql, 0, NULL);
1931 
1932 		res = pgut_execute_elevel(conn, lock_query, 0, NULL, DEBUG2);
1933 		if (PQresultStatus(res) == PGRES_COMMAND_OK)
1934 		{
1935 			CLEARPGRES(res);
1936 			break;
1937 		}
1938 		else if (sqlstate_equals(res, SQLSTATE_QUERY_CANCELED))
1939 		{
1940 			/* retry if lock conflicted */
1941 			CLEARPGRES(res);
1942 			if (start_xact)
1943 				pgut_rollback(conn);
1944 			else
1945 				pgut_command(conn, "ROLLBACK TO SAVEPOINT repack_sp1", 0, NULL);
1946 			continue;
1947 		}
1948 		else
1949 		{
1950 			/* exit otherwise */
1951 			printf("%s", PQerrorMessage(connection));
1952 			CLEARPGRES(res);
1953 			ret = false;
1954 			break;
1955 		}
1956 	}
1957 
1958 	pgut_command(conn, "RESET statement_timeout", 0, NULL);
1959 	return ret;
1960 }
1961 
1962 /* This function calls to repack_drop() to clean temporary objects on error
1963  * in creation of temporary objects.
1964  */
1965 void
repack_cleanup_callback(bool fatal,void * userdata)1966 repack_cleanup_callback(bool fatal, void *userdata)
1967 {
1968 	Oid			target_table = *(Oid *) userdata;
1969 	const char *params[2];
1970 	char		buffer[12];
1971 	char		num_buff[12];
1972 
1973 	if(fatal)
1974 	{
1975 		params[0] = utoa(target_table, buffer);
1976 		params[1] = utoa(temp_obj_num, num_buff);
1977 
1978 		/* testing PQstatus() of connection and conn2, as we do
1979 		 * in repack_cleanup(), doesn't seem to work here,
1980 		 * so just use an unconditional reconnect().
1981 		 */
1982 		reconnect(ERROR);
1983 		command("SELECT repack.repack_drop($1, $2)", 2, params);
1984 		temp_obj_num = 0; /* reset temporary object counter after cleanup */
1985 	}
1986 }
1987 
1988 /*
1989  * The userdata pointing a table being re-organized. We need to cleanup temp
1990  * objects before the program exits.
1991  */
1992 static void
repack_cleanup(bool fatal,const repack_table * table)1993 repack_cleanup(bool fatal, const repack_table *table)
1994 {
1995 	if (fatal)
1996 	{
1997 		fprintf(stderr, "!!!FATAL ERROR!!! Please refer to the manual.\n\n");
1998 	}
1999 	else
2000 	{
2001 		char		buffer[12];
2002 		char		num_buff[12];
2003 		const char *params[2];
2004 
2005 		/* Try reconnection if not available. */
2006 		if (PQstatus(connection) != CONNECTION_OK ||
2007 			PQstatus(conn2) != CONNECTION_OK)
2008 			reconnect(ERROR);
2009 
2010 		/* do cleanup */
2011 		params[0] = utoa(table->target_oid, buffer);
2012 		params[1] =  utoa(temp_obj_num, num_buff);
2013 		command("SELECT repack.repack_drop($1, $2)", 2, params);
2014 		temp_obj_num = 0; /* reset temporary object counter after cleanup */
2015 	}
2016 }
2017 
2018 /*
2019  * Indexes of a table are repacked.
2020  */
2021 static bool
repack_table_indexes(PGresult * index_details)2022 repack_table_indexes(PGresult *index_details)
2023 {
2024 	bool				ret = false;
2025 	PGresult			*res = NULL, *res2 = NULL;
2026 	StringInfoData		sql, sql_drop;
2027 	char				buffer[2][12];
2028 	const char			*create_idx, *schema_name, *table_name, *params[3];
2029 	Oid					table, index;
2030 	int					i, num, num_repacked = 0;
2031 	bool                *repacked_indexes;
2032 
2033 	initStringInfo(&sql);
2034 
2035 	num = PQntuples(index_details);
2036 	table = getoid(index_details, 0, 3);
2037 	params[1] = utoa(table, buffer[1]);
2038 	params[2] = tablespace;
2039 	schema_name = getstr(index_details, 0, 5);
2040 	/* table_name is schema-qualified */
2041 	table_name = getstr(index_details, 0, 4);
2042 
2043 	/* Keep track of which of the table's indexes we have successfully
2044 	 * repacked, so that we may DROP only those indexes.
2045 	 */
2046 	if (!(repacked_indexes = calloc(num, sizeof(bool))))
2047 		ereport(ERROR, (errcode(ENOMEM),
2048 						errmsg("Unable to calloc repacked_indexes")));
2049 
2050 	/* Check if any concurrent pg_repack command is being run on the same
2051 	 * table.
2052 	 */
2053 	if (!advisory_lock(connection, params[1]))
2054 		ereport(ERROR, (errcode(EINVAL),
2055 			errmsg("Unable to obtain advisory lock on \"%s\"", table_name)));
2056 
2057 	for (i = 0; i < num; i++)
2058 	{
2059 		char *isvalid = getstr(index_details, i, 2);
2060 		char *idx_name = getstr(index_details, i, 0);
2061 
2062 		if (isvalid[0] == 't')
2063 		{
2064 			index = getoid(index_details, i, 1);
2065 
2066 			resetStringInfo(&sql);
2067 			appendStringInfo(&sql, "SELECT pgc.relname, nsp.nspname "
2068 							 "FROM pg_class pgc INNER JOIN pg_namespace nsp "
2069 							 "ON nsp.oid = pgc.relnamespace "
2070 							 "WHERE pgc.relname = 'index_%u' "
2071 							 "AND nsp.nspname = $1", index);
2072 			params[0] = schema_name;
2073 			elog(INFO, "repacking index \"%s\"", idx_name);
2074 			res = execute(sql.data, 1, params);
2075 			if (PQresultStatus(res) != PGRES_TUPLES_OK)
2076 			{
2077 				elog(WARNING, "%s", PQerrorMessage(connection));
2078 				continue;
2079 			}
2080 			if (PQntuples(res) > 0)
2081 			{
2082 				ereport(WARNING,
2083 						(errcode(E_PG_COMMAND),
2084 						 errmsg("Cannot create index \"%s\".\"index_%u\", "
2085 								"already exists", schema_name, index),
2086 						 errdetail("An invalid index may have been left behind"
2087 								   " by a previous pg_repack on the table"
2088 								   " which was interrupted. Please use DROP "
2089 								   "INDEX \"%s\".\"index_%u\""
2090 								   " to remove this index and try again.",
2091 								   schema_name, index)));
2092 				continue;
2093 			}
2094 
2095 			if (dryrun)
2096 				continue;
2097 
2098 			params[0] = utoa(index, buffer[0]);
2099 			res = execute("SELECT repack.repack_indexdef($1, $2, $3, true)", 3,
2100 						  params);
2101 
2102 			if (PQntuples(res) < 1)
2103 			{
2104 				elog(WARNING,
2105 					"unable to generate SQL to CREATE work index for %s",
2106 					getstr(index_details, i, 0));
2107 				continue;
2108 			}
2109 
2110 			create_idx = getstr(res, 0, 0);
2111 			/* Use a separate PGresult to avoid stomping on create_idx */
2112 			res2 = execute_elevel(create_idx, 0, NULL, DEBUG2);
2113 
2114 			if (PQresultStatus(res2) != PGRES_COMMAND_OK)
2115 			{
2116 				ereport(WARNING,
2117 						(errcode(E_PG_COMMAND),
2118 						 errmsg("Error creating index \"%s\".\"index_%u\": %s",
2119 								schema_name, index, PQerrorMessage(connection)
2120 							 ) ));
2121 			}
2122 			else
2123 			{
2124 				repacked_indexes[i] = true;
2125 				num_repacked++;
2126 			}
2127 
2128 			CLEARPGRES(res);
2129 			CLEARPGRES(res2);
2130 		}
2131 		else
2132 			elog(WARNING, "skipping invalid index: %s.%s", schema_name,
2133 				 getstr(index_details, i, 0));
2134 	}
2135 
2136 	if (dryrun) {
2137 		ret = true;
2138 		goto done;
2139 	}
2140 
2141 	/* If we did not successfully repack any indexes, e.g. because of some
2142 	 * error affecting every CREATE INDEX attempt, don't waste time with
2143 	 * the ACCESS EXCLUSIVE lock on the table, and return false.
2144 	 * N.B. none of the DROP INDEXes should be performed since
2145 	 * repacked_indexes[] flags should all be false.
2146 	 */
2147 	if (!num_repacked)
2148 	{
2149 		elog(WARNING,
2150 			 "Skipping index swapping for \"%s\", since no new indexes built",
2151 			 table_name);
2152 		goto drop_idx;
2153 	}
2154 
2155 	/* take an exclusive lock on table before calling repack_index_swap() */
2156 	resetStringInfo(&sql);
2157 	appendStringInfo(&sql, "LOCK TABLE %s IN ACCESS EXCLUSIVE MODE",
2158 					 table_name);
2159 	if (!(lock_exclusive(connection, params[1], sql.data, true)))
2160 	{
2161 		elog(WARNING, "lock_exclusive() failed in connection for %s",
2162 			 table_name);
2163 		goto drop_idx;
2164 	}
2165 
2166 	for (i = 0; i < num; i++)
2167 	{
2168 		index = getoid(index_details, i, 1);
2169 		if (repacked_indexes[i])
2170 		{
2171 			params[0] = utoa(index, buffer[0]);
2172 			pgut_command(connection, "SELECT repack.repack_index_swap($1)", 1,
2173 						 params);
2174 		}
2175 		else
2176 			elog(INFO, "Skipping index swap for index_%u", index);
2177 	}
2178 	pgut_command(connection, "COMMIT", 0, NULL);
2179 	ret = true;
2180 
2181 drop_idx:
2182 	resetStringInfo(&sql);
2183 	initStringInfo(&sql_drop);
2184 	appendStringInfoString(&sql, "DROP INDEX CONCURRENTLY ");
2185 	appendStringInfo(&sql, "\"%s\".",  schema_name);
2186 
2187 	for (i = 0; i < num; i++)
2188 	{
2189 		index = getoid(index_details, i, 1);
2190 		if (repacked_indexes[i])
2191 		{
2192 			initStringInfo(&sql_drop);
2193 			appendStringInfo(&sql_drop, "%s\"index_%u\"", sql.data, index);
2194 			command(sql_drop.data, 0, NULL);
2195 		}
2196 		else
2197 			elog(INFO, "Skipping drop of index_%u", index);
2198 	}
2199 	termStringInfo(&sql_drop);
2200 	termStringInfo(&sql);
2201 
2202 done:
2203 	CLEARPGRES(res);
2204 	free(repacked_indexes);
2205 
2206 	return ret;
2207 }
2208 
2209 /*
2210  * Call repack_table_indexes for each of the tables
2211  */
2212 static bool
repack_all_indexes(char * errbuf,size_t errsize)2213 repack_all_indexes(char *errbuf, size_t errsize)
2214 {
2215 	bool					ret = false;
2216 	PGresult				*res = NULL;
2217 	StringInfoData			sql;
2218 	SimpleStringListCell	*cell = NULL;
2219 	const char				*params[1];
2220 
2221 	initStringInfo(&sql);
2222 	reconnect(ERROR);
2223 
2224 	assert(r_index.head || table_list.head || parent_table_list.head);
2225 
2226 	if (!preliminary_checks(errbuf, errsize))
2227 		goto cleanup;
2228 
2229 	if (!is_requested_relation_exists(errbuf, errsize))
2230 		goto cleanup;
2231 
2232 	if (r_index.head)
2233 	{
2234 		appendStringInfoString(&sql,
2235 			"SELECT repack.oid2text(i.oid), idx.indexrelid, idx.indisvalid, idx.indrelid, repack.oid2text(idx.indrelid), n.nspname"
2236 			" FROM pg_index idx JOIN pg_class i ON i.oid = idx.indexrelid"
2237 			" JOIN pg_namespace n ON n.oid = i.relnamespace"
2238 			" WHERE idx.indexrelid = $1::regclass ORDER BY indisvalid DESC, i.relname, n.nspname");
2239 
2240 		cell = r_index.head;
2241 	}
2242 	else if (table_list.head || parent_table_list.head)
2243 	{
2244 		appendStringInfoString(&sql,
2245 			"SELECT repack.oid2text(i.oid), idx.indexrelid, idx.indisvalid, idx.indrelid, $1::text, n.nspname"
2246 			" FROM pg_index idx JOIN pg_class i ON i.oid = idx.indexrelid"
2247 			" JOIN pg_namespace n ON n.oid = i.relnamespace"
2248 			" WHERE idx.indrelid = $1::regclass ORDER BY indisvalid DESC, i.relname, n.nspname");
2249 
2250 		for (cell = parent_table_list.head; cell; cell = cell->next)
2251 		{
2252 			int nchildren, i;
2253 
2254 			params[0] = cell->val;
2255 
2256 			/* find children of this parent table */
2257 			res = execute_elevel("SELECT quote_ident(n.nspname) || '.' || quote_ident(c.relname)"
2258 								 " FROM pg_class c JOIN pg_namespace n on n.oid = c.relnamespace"
2259 								 " WHERE c.oid = ANY (repack.get_table_and_inheritors($1::regclass))"
2260 								 " ORDER BY n.nspname, c.relname", 1, params, DEBUG2);
2261 
2262 			if (PQresultStatus(res) != PGRES_TUPLES_OK)
2263 			{
2264 				elog(WARNING, "%s", PQerrorMessage(connection));
2265 				continue;
2266 			}
2267 
2268 			nchildren = PQntuples(res);
2269 
2270 			if (nchildren == 0)
2271 			{
2272 				elog(WARNING, "relation \"%s\" does not exist", cell->val);
2273 				continue;
2274 			}
2275 
2276 			/* append new tables to 'table_list' */
2277 			for (i = 0; i < nchildren; i++)
2278 				simple_string_list_append(&table_list, getstr(res, i, 0));
2279 		}
2280 
2281 		CLEARPGRES(res);
2282 
2283 		cell = table_list.head;
2284 	}
2285 
2286 	for (; cell; cell = cell->next)
2287 	{
2288 		params[0] = cell->val;
2289 		res = execute_elevel(sql.data, 1, params, DEBUG2);
2290 
2291 		if (PQresultStatus(res) != PGRES_TUPLES_OK)
2292 		{
2293 			elog(WARNING, "%s", PQerrorMessage(connection));
2294 			continue;
2295 		}
2296 
2297 		if (PQntuples(res) == 0)
2298 		{
2299 			if(table_list.head)
2300 				elog(WARNING, "\"%s\" does not have any indexes",
2301 					cell->val);
2302 			else if(r_index.head)
2303 				elog(WARNING, "\"%s\" is not a valid index",
2304 					cell->val);
2305 
2306 			continue;
2307 		}
2308 
2309 		if(table_list.head)
2310 			elog(INFO, "repacking indexes of \"%s\"", cell->val);
2311 
2312 		if (!repack_table_indexes(res))
2313 			elog(WARNING, "repack failed for \"%s\"", cell->val);
2314 
2315 		CLEARPGRES(res);
2316 	}
2317 	ret = true;
2318 
2319 cleanup:
2320 	disconnect();
2321 	termStringInfo(&sql);
2322 	return ret;
2323 }
2324 
2325 void
pgut_help(bool details)2326 pgut_help(bool details)
2327 {
2328 	printf("%s re-organizes a PostgreSQL database.\n\n", PROGRAM_NAME);
2329 	printf("Usage:\n");
2330 	printf("  %s [OPTION]... [DBNAME]\n", PROGRAM_NAME);
2331 
2332 	if (!details)
2333 		return;
2334 
2335 	printf("Options:\n");
2336 	printf("  -a, --all                 repack all databases\n");
2337 	printf("  -t, --table=TABLE         repack specific table only\n");
2338 	printf("  -I, --parent-table=TABLE  repack specific parent table and its inheritors\n");
2339 	printf("  -c, --schema=SCHEMA       repack tables in specific schema only\n");
2340 	printf("  -s, --tablespace=TBLSPC   move repacked tables to a new tablespace\n");
2341 	printf("  -S, --moveidx             move repacked indexes to TBLSPC too\n");
2342 	printf("  -o, --order-by=COLUMNS    order by columns instead of cluster keys\n");
2343 	printf("  -n, --no-order            do vacuum full instead of cluster\n");
2344 	printf("  -N, --dry-run             print what would have been repacked\n");
2345 	printf("  -j, --jobs=NUM            Use this many parallel jobs for each table\n");
2346 	printf("  -i, --index=INDEX         move only the specified index\n");
2347 	printf("  -x, --only-indexes        move only indexes of the specified table\n");
2348 	printf("  -T, --wait-timeout=SECS   timeout to cancel other backends on conflict\n");
2349 	printf("  -D, --no-kill-backend     don't kill other backends when timed out\n");
2350 	printf("  -Z, --no-analyze          don't analyze at end\n");
2351 	printf("  -k, --no-superuser-check  skip superuser checks in client\n");
2352 	printf("  -C, --exclude-extension   don't repack tables which belong to specific extension\n");
2353 }
2354