1 /*
2 * pg_repack.c: bin/pg_repack.c
3 *
4 * Portions Copyright (c) 2008-2011, NIPPON TELEGRAPH AND TELEPHONE CORPORATION
5 * Portions Copyright (c) 2011, Itagaki Takahiro
6 * Portions Copyright (c) 2012-2020, The Reorg Development Team
7 */
8
9 /**
10 * @brief Client Modules
11 */
12
13 const char *PROGRAM_URL = "https://reorg.github.io/pg_repack/";
14 const char *PROGRAM_ISSUES = "https://github.com/reorg/pg_repack/issues";
15
16 #ifdef REPACK_VERSION
17 /* macro trick to stringify a macro expansion */
18 #define xstr(s) str(s)
19 #define str(s) #s
20 const char *PROGRAM_VERSION = xstr(REPACK_VERSION);
21 #else
22 const char *PROGRAM_VERSION = "unknown";
23 #endif
24
25 #include "pgut/pgut-fe.h"
26
27 #include <errno.h>
28 #include <string.h>
29 #include <stdlib.h>
30 #include <unistd.h>
31 #include <time.h>
32
33
34 #ifdef HAVE_POLL_H
35 #include <poll.h>
36 #endif
37 #ifdef HAVE_SYS_POLL_H
38 #include <sys/poll.h>
39 #endif
40 #ifdef HAVE_SYS_SELECT_H
41 #include <sys/select.h>
42 #endif
43
44
45 /*
 * APPLY_COUNT: Number of log rows applied per transaction.  Larger values
 * may be faster, but produce longer transactions in the REDO phase.
48 */
49 #define APPLY_COUNT 1000
50
51 /* Once we get down to seeing fewer than this many tuples in the
52 * log table, we'll say that we're ready to perform the switch.
53 */
54 #define MIN_TUPLES_BEFORE_SWITCH 20
55
56 /* poll() or select() timeout, in seconds */
57 #define POLL_TIMEOUT 3
58
59 /* Compile an array of existing transactions which are active during
60 * pg_repack's setup. Some transactions we can safely ignore:
61 * a. The '1/1, -1/0' lock skipped is from the bgwriter on newly promoted
62 * servers. See https://github.com/reorg/pg_reorg/issues/1
63 * b. Our own database connections
64 * c. Other pg_repack clients, as distinguished by application_name, which
65 * may be operating on other tables at the same time. See
66 * https://github.com/reorg/pg_repack/issues/1
67 * d. open transactions/locks existing on other databases than the actual
68 * processing relation (except for locks on shared objects)
69 * e. VACUUMs which are always executed outside transaction blocks.
70 *
71 * Note, there is some redundancy in how the filtering is done (e.g. excluding
72 * based on pg_backend_pid() and application_name), but that shouldn't hurt
73 * anything. Also, the test of application_name is not bulletproof -- for
74 * instance, the application name when running installcheck will be
75 * pg_regress.
76 */
77 #define SQL_XID_SNAPSHOT_90200 \
78 "SELECT coalesce(array_agg(l.virtualtransaction), '{}') " \
79 " FROM pg_locks AS l " \
80 " LEFT JOIN pg_stat_activity AS a " \
81 " ON l.pid = a.pid " \
82 " LEFT JOIN pg_database AS d " \
83 " ON a.datid = d.oid " \
84 " WHERE l.locktype = 'virtualxid' " \
85 " AND l.pid NOT IN (pg_backend_pid(), $1) " \
86 " AND (l.virtualxid, l.virtualtransaction) <> ('1/1', '-1/0') " \
87 " AND (a.application_name IS NULL OR a.application_name <> $2)" \
88 " AND a.query !~* E'^\\\\s*vacuum\\\\s+' " \
89 " AND a.query !~ E'^autovacuum: ' " \
90 " AND ((d.datname IS NULL OR d.datname = current_database()) OR l.database = 0)"
91
92 #define SQL_XID_SNAPSHOT_90000 \
93 "SELECT coalesce(array_agg(l.virtualtransaction), '{}') " \
94 " FROM pg_locks AS l " \
95 " LEFT JOIN pg_stat_activity AS a " \
96 " ON l.pid = a.procpid " \
97 " LEFT JOIN pg_database AS d " \
98 " ON a.datid = d.oid " \
99 " WHERE l.locktype = 'virtualxid' " \
100 " AND l.pid NOT IN (pg_backend_pid(), $1) " \
101 " AND (l.virtualxid, l.virtualtransaction) <> ('1/1', '-1/0') " \
102 " AND (a.application_name IS NULL OR a.application_name <> $2)" \
103 " AND a.current_query !~* E'^\\\\s*vacuum\\\\s+' " \
104 " AND a.current_query !~ E'^autovacuum: ' " \
105 " AND ((d.datname IS NULL OR d.datname = current_database()) OR l.database = 0)"
106
107 /* application_name is not available before 9.0. The last clause of
108 * the WHERE clause is just to eat the $2 parameter (application name).
109 */
110 #define SQL_XID_SNAPSHOT_80300 \
111 "SELECT coalesce(array_agg(l.virtualtransaction), '{}') " \
112 " FROM pg_locks AS l" \
113 " LEFT JOIN pg_stat_activity AS a " \
114 " ON l.pid = a.procpid " \
115 " LEFT JOIN pg_database AS d " \
116 " ON a.datid = d.oid " \
117 " WHERE l.locktype = 'virtualxid' AND l.pid NOT IN (pg_backend_pid(), $1)" \
118 " AND (l.virtualxid, l.virtualtransaction) <> ('1/1', '-1/0') " \
119 " AND a.current_query !~* E'^\\\\s*vacuum\\\\s+' " \
120 " AND a.current_query !~ E'^autovacuum: ' " \
121 " AND ((d.datname IS NULL OR d.datname = current_database()) OR l.database = 0)" \
122 " AND ($2::text IS NOT NULL)"
123
124 #define SQL_XID_SNAPSHOT \
125 (PQserverVersion(connection) >= 90200 ? SQL_XID_SNAPSHOT_90200 : \
126 (PQserverVersion(connection) >= 90000 ? SQL_XID_SNAPSHOT_90000 : \
127 SQL_XID_SNAPSHOT_80300))
128
129
130 /* Later, check whether any of the transactions we saw before are still
131 * alive, and wait for them to go away.
132 */
133 #define SQL_XID_ALIVE \
134 "SELECT pid FROM pg_locks WHERE locktype = 'virtualxid'"\
135 " AND pid <> pg_backend_pid() AND virtualtransaction = ANY($1)"
136
137 /* To be run while our main connection holds an AccessExclusive lock on the
138 * target table, and our secondary conn is attempting to grab an AccessShare
139 * lock. We know that "granted" must be false for these queries because
140 * we already hold the AccessExclusive lock. Also, we only care about other
141 * transactions trying to grab an ACCESS EXCLUSIVE lock, because we are only
142 * trying to kill off disallowed DDL commands, e.g. ALTER TABLE or TRUNCATE.
143 */
144 #define CANCEL_COMPETING_LOCKS \
145 "SELECT pg_cancel_backend(pid) FROM pg_locks WHERE locktype = 'relation'"\
146 " AND granted = false AND relation = %u"\
147 " AND mode = 'AccessExclusiveLock' AND pid <> pg_backend_pid()"
148
149 #define KILL_COMPETING_LOCKS \
150 "SELECT pg_terminate_backend(pid) "\
151 "FROM pg_locks WHERE locktype = 'relation'"\
152 " AND granted = false AND relation = %u"\
153 " AND mode = 'AccessExclusiveLock' AND pid <> pg_backend_pid()"
154
155 #define COUNT_COMPETING_LOCKS \
156 "SELECT pid FROM pg_locks WHERE locktype = 'relation'" \
157 " AND granted = false AND relation = %u" \
158 " AND mode = 'AccessExclusiveLock' AND pid <> pg_backend_pid()"
159
160 /* Will be used as a unique prefix for advisory locks. */
161 #define REPACK_LOCK_PREFIX_STR "16185446"
162
/* Progress of one index build when rebuilding indexes in parallel. */
typedef enum
{
	UNPROCESSED,	/* not yet assigned to any worker connection */
	INPROGRESS,		/* build has been sent to a worker and is running */
	FINISHED		/* build completed */
} index_status_t;
169
/*
 * per-index information
 */
typedef struct repack_index
{
	Oid				target_oid;		/* target: OID of the index */
	const char	   *create_index;	/* CREATE INDEX statement to rebuild it */
	index_status_t	status;			/* Track parallel build statuses. */
	int				worker_idx;		/* which worker conn is handling it */
} repack_index;
180
/*
 * per-table information
 *
 * All the SQL strings are precomputed server-side (see repack.tables) and
 * fetched in repack_one_database(); this struct just carries them around.
 */
typedef struct repack_table
{
	const char	   *target_name;	/* target: relname */
	Oid				target_oid;		/* target: OID */
	Oid				target_toast;	/* target: toast OID */
	Oid				target_tidx;	/* target: toast index OID */
	Oid				pkid;			/* target: PK OID */
	Oid				ckid;			/* target: CK OID */
	const char	   *create_pktype;	/* CREATE TYPE pk */
	const char	   *create_log;		/* CREATE TABLE log */
	const char	   *create_trigger;	/* CREATE TRIGGER repack_trigger */
	const char	   *enable_trigger;	/* ALTER TABLE ENABLE ALWAYS TRIGGER repack_trigger */
	const char	   *create_table;	/* CREATE TABLE table AS SELECT WITH NO DATA*/
	const char	   *copy_data;		/* INSERT INTO (possibly with ORDER BY) */
	const char	   *alter_col_storage;	/* ALTER TABLE ALTER COLUMN SET STORAGE */
	const char	   *drop_columns;	/* ALTER TABLE DROP COLUMNs */
	const char	   *delete_log;		/* DELETE FROM log */
	const char	   *lock_table;		/* LOCK TABLE table */
	const char	   *sql_peek;		/* SQL used in flush */
	const char	   *sql_insert;		/* SQL used in flush */
	const char	   *sql_delete;		/* SQL used in flush */
	const char	   *sql_update;		/* SQL used in flush */
	const char	   *sql_pop;		/* SQL used in flush */
	int				n_indexes;		/* number of indexes */
	repack_index   *indexes;		/* info on each index */
} repack_table;
210
211
212 static bool is_superuser(void);
213 static void check_tablespace(void);
214 static bool preliminary_checks(char *errbuf, size_t errsize);
215 static bool is_requested_relation_exists(char *errbuf, size_t errsize);
216 static void repack_all_databases(const char *order_by);
217 static bool repack_one_database(const char *order_by, char *errbuf, size_t errsize);
218 static void repack_one_table(repack_table *table, const char *order_by);
219 static bool repack_table_indexes(PGresult *index_details);
220 static bool repack_all_indexes(char *errbuf, size_t errsize);
221 static void repack_cleanup(bool fatal, const repack_table *table);
222 static void repack_cleanup_callback(bool fatal, void *userdata);
223 static bool rebuild_indexes(const repack_table *table);
224
225 static char *getstr(PGresult *res, int row, int col);
226 static Oid getoid(PGresult *res, int row, int col);
227 static bool advisory_lock(PGconn *conn, const char *relid);
228 static bool lock_exclusive(PGconn *conn, const char *relid, const char *lock_query, bool start_xact);
229 static bool kill_ddl(PGconn *conn, Oid relid, bool terminate);
230 static bool lock_access_share(PGconn *conn, Oid relid, const char *target_name);
231
232 #define SQLSTATE_INVALID_SCHEMA_NAME "3F000"
233 #define SQLSTATE_UNDEFINED_FUNCTION "42883"
234 #define SQLSTATE_QUERY_CANCELED "57014"
235
/*
 * Return true iff the SQLSTATE of a failed query result equals "state".
 *
 * PQresultErrorField() may return NULL when the result carries no error
 * fields (e.g. a connection-level failure); the previous code passed that
 * straight to strcmp(), which is undefined behavior.  Treat a missing
 * SQLSTATE as "not equal".
 */
static bool
sqlstate_equals(PGresult *res, const char *state)
{
	const char *sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE);

	return sqlstate != NULL && strcmp(sqlstate, state) == 0;
}
240
/* Global option state, filled in by pgut option parsing (see options[]). */
static bool analyze = true;				/* run ANALYZE afterwards; -Z disables */
static bool alldb = false;				/* --all: repack every database */
static bool noorder = false;			/* --no-order: VACUUM FULL mode */
static SimpleStringList parent_table_list = {NULL, NULL};	/* --parent-table */
static SimpleStringList table_list = {NULL, NULL};			/* --table */
static SimpleStringList schema_list = {NULL, NULL};			/* --schema */
static char *orderby = NULL;			/* --order-by clause; "" means no order */
static char *tablespace = NULL;			/* --tablespace destination */
static bool moveidx = false;			/* --moveidx: move indexes too */
static SimpleStringList r_index = {NULL, NULL};				/* --index */
static bool only_indexes = false;		/* --only-indexes */
static int wait_timeout = 60;	/* in seconds */
static int jobs = 0;	/* number of concurrent worker conns. */
static bool dryrun = false;				/* --dry-run: report only */
static unsigned int temp_obj_num = 0; /* temporary objects counter */
static bool no_kill_backend = false; /* abandon when timed-out */
static bool no_superuser_check = false;	/* skip the superuser test */
static SimpleStringList exclude_extension_list = {NULL, NULL}; /* don't repack tables of these extensions */
259
/*
 * Render an unsigned int as decimal text into "buffer" and return it.
 * The caller's buffer must hold at least 11 bytes (10 digits + NUL).
 */
static char *
utoa(unsigned int value, char *buffer)
{
	char *out = buffer;

	sprintf(out, "%u", value);
	return out;
}
268
/*
 * Command-line option table consumed by pgut_getopt().
 *
 * NOTE(review): the first field appears to encode the option value type
 * ('b' boolean, 'B' inverted boolean, 'l' string list, 's' string,
 * 'i' integer) — confirm against pgut-fe.h before relying on this.
 */
static pgut_option options[] =
{
	{ 'b', 'a', "all", &alldb },
	{ 'l', 't', "table", &table_list },
	{ 'l', 'I', "parent-table", &parent_table_list },
	{ 'l', 'c', "schema", &schema_list },
	{ 'b', 'n', "no-order", &noorder },
	{ 'b', 'N', "dry-run", &dryrun },
	{ 's', 'o', "order-by", &orderby },
	{ 's', 's', "tablespace", &tablespace },
	{ 'b', 'S', "moveidx", &moveidx },
	{ 'l', 'i', "index", &r_index },
	{ 'b', 'x', "only-indexes", &only_indexes },
	{ 'i', 'T', "wait-timeout", &wait_timeout },
	{ 'B', 'Z', "no-analyze", &analyze },
	{ 'i', 'j', "jobs", &jobs },
	{ 'b', 'D', "no-kill-backend", &no_kill_backend },
	{ 'b', 'k', "no-superuser-check", &no_superuser_check },
	{ 'l', 'C', "exclude-extension", &exclude_extension_list },
	{ 0 },	/* terminator */
};
290
291 int
main(int argc,char * argv[])292 main(int argc, char *argv[])
293 {
294 int i;
295 char errbuf[256];
296
297 i = pgut_getopt(argc, argv, options);
298
299 if (i == argc - 1)
300 dbname = argv[i];
301 else if (i < argc)
302 ereport(ERROR,
303 (errcode(EINVAL),
304 errmsg("too many arguments")));
305
306 check_tablespace();
307
308 if (dryrun)
309 elog(INFO, "Dry run enabled, not executing repack");
310
311 if (r_index.head || only_indexes)
312 {
313 if (r_index.head && table_list.head)
314 ereport(ERROR, (errcode(EINVAL),
315 errmsg("cannot specify --index (-i) and --table (-t)")));
316 if (r_index.head && parent_table_list.head)
317 ereport(ERROR, (errcode(EINVAL),
318 errmsg("cannot specify --index (-i) and --parent-table (-I)")));
319 else if (r_index.head && only_indexes)
320 ereport(ERROR, (errcode(EINVAL),
321 errmsg("cannot specify --index (-i) and --only-indexes (-x)")));
322 else if (r_index.head && exclude_extension_list.head)
323 ereport(ERROR, (errcode(EINVAL),
324 errmsg("cannot specify --index (-i) and --exclude-extension (-C)")));
325 else if (only_indexes && !(table_list.head || parent_table_list.head))
326 ereport(ERROR, (errcode(EINVAL),
327 errmsg("cannot repack all indexes of database, specify the table(s)"
328 "via --table (-t) or --parent-table (-I)")));
329 else if (only_indexes && exclude_extension_list.head)
330 ereport(ERROR, (errcode(EINVAL),
331 errmsg("cannot specify --only-indexes (-x) and --exclude-extension (-C)")));
332 else if (alldb)
333 ereport(ERROR, (errcode(EINVAL),
334 errmsg("cannot repack specific index(es) in all databases")));
335 else
336 {
337 if (orderby)
338 ereport(WARNING, (errcode(EINVAL),
339 errmsg("option -o (--order-by) has no effect while repacking indexes")));
340 else if (noorder)
341 ereport(WARNING, (errcode(EINVAL),
342 errmsg("option -n (--no-order) has no effect while repacking indexes")));
343 else if (!analyze)
344 ereport(WARNING, (errcode(EINVAL),
345 errmsg("ANALYZE is not performed after repacking indexes, -z (--no-analyze) has no effect")));
346 else if (jobs)
347 ereport(WARNING, (errcode(EINVAL),
348 errmsg("option -j (--jobs) has no effect, repacking indexes does not use parallel jobs")));
349 if (!repack_all_indexes(errbuf, sizeof(errbuf)))
350 ereport(ERROR,
351 (errcode(ERROR), errmsg("%s", errbuf)));
352 }
353 }
354 else
355 {
356 if (schema_list.head && (table_list.head || parent_table_list.head))
357 ereport(ERROR,
358 (errcode(EINVAL),
359 errmsg("cannot repack specific table(s) in schema, use schema.table notation instead")));
360
361 if (exclude_extension_list.head && table_list.head)
362 ereport(ERROR,
363 (errcode(EINVAL),
364 errmsg("cannot specify --table (-t) and --exclude-extension (-C)")));
365
366 if (exclude_extension_list.head && parent_table_list.head)
367 ereport(ERROR,
368 (errcode(EINVAL),
369 errmsg("cannot specify --parent-table (-I) and --exclude-extension (-C)")));
370
371 if (noorder)
372 orderby = "";
373
374 if (alldb)
375 {
376 if (table_list.head || parent_table_list.head)
377 ereport(ERROR,
378 (errcode(EINVAL),
379 errmsg("cannot repack specific table(s) in all databases")));
380 if (schema_list.head)
381 ereport(ERROR,
382 (errcode(EINVAL),
383 errmsg("cannot repack specific schema(s) in all databases")));
384 repack_all_databases(orderby);
385 }
386 else
387 {
388 if (!repack_one_database(orderby, errbuf, sizeof(errbuf)))
389 ereport(ERROR,
390 (errcode(ERROR), errmsg("%s failed with error: %s", PROGRAM_NAME, errbuf)));
391 }
392 }
393
394 return 0;
395 }
396
397
398 /*
399 * Test if the current user is a database superuser.
400 * Borrowed from psql/common.c
401 *
402 * Note: this will correctly detect superuserness only with a protocol-3.0
403 * or newer backend; otherwise it will always say "false".
404 */
405 bool
is_superuser(void)406 is_superuser(void)
407 {
408 const char *val;
409
410 if (no_superuser_check)
411 return true;
412
413 if (!connection)
414 return false;
415
416 val = PQparameterStatus(connection, "is_superuser");
417
418 if (val && strcmp(val, "on") == 0)
419 return true;
420
421 return false;
422 }
423
424 /*
425 * Check if the tablespace requested exists.
426 *
427 * Raise an exception on error.
428 */
429 void
check_tablespace()430 check_tablespace()
431 {
432 PGresult *res = NULL;
433 const char *params[1];
434
435 if (tablespace == NULL)
436 {
437 /* nothing to check, but let's see the options */
438 if (moveidx)
439 {
440 ereport(ERROR,
441 (errcode(EINVAL),
442 errmsg("cannot specify --moveidx (-S) without --tablespace (-s)")));
443 }
444 return;
445 }
446
447 /* check if the tablespace exists */
448 reconnect(ERROR);
449 params[0] = tablespace;
450 res = execute_elevel(
451 "select spcname from pg_tablespace where spcname = $1",
452 1, params, DEBUG2);
453
454 if (PQresultStatus(res) == PGRES_TUPLES_OK)
455 {
456 if (PQntuples(res) == 0)
457 {
458 ereport(ERROR,
459 (errcode(EINVAL),
460 errmsg("the tablespace \"%s\" doesn't exist", tablespace)));
461 }
462 }
463 else
464 {
465 ereport(ERROR,
466 (errcode(EINVAL),
467 errmsg("error checking the namespace: %s",
468 PQerrorMessage(connection))));
469 }
470
471 CLEARPGRES(res);
472 }
473
/*
 * Perform sanity checks before beginning work.  Make sure pg_repack is
 * installed in the database, the user is a superuser, etc.
 *
 * On failure, writes a human-readable reason into errbuf (if non-NULL,
 * truncated to errsize) and returns false.  On success also applies a few
 * session settings (statement_timeout, search_path, client_min_messages).
 */
static bool
preliminary_checks(char *errbuf, size_t errsize)
{
	bool		ret = false;
	PGresult   *res = NULL;

	if (!is_superuser()) {
		if (errbuf)
			snprintf(errbuf, errsize, "You must be a superuser to use %s",
					 PROGRAM_NAME);
		goto cleanup;
	}

	/* Query the extension version. Exit if no match */
	res = execute_elevel("select repack.version(), repack.version_sql()",
		0, NULL, DEBUG2);
	if (PQresultStatus(res) == PGRES_TUPLES_OK)
	{
		const char	   *libver;
		char			buf[64];

		/* the string is something like "pg_repack 1.1.7" */
		snprintf(buf, sizeof(buf), "%s %s", PROGRAM_NAME, PROGRAM_VERSION);

		/* check the version of the C library */
		libver = getstr(res, 0, 0);
		if (0 != strcmp(buf, libver))
		{
			if (errbuf)
				snprintf(errbuf, errsize,
					"program '%s' does not match database library '%s'",
					buf, libver);
			goto cleanup;
		}

		/* check the version of the SQL extension */
		libver = getstr(res, 0, 1);
		if (0 != strcmp(buf, libver))
		{
			if (errbuf)
				snprintf(errbuf, errsize,
					"extension '%s' required, found extension '%s'",
					buf, libver);
			goto cleanup;
		}
	}
	else
	{
		if (sqlstate_equals(res, SQLSTATE_INVALID_SCHEMA_NAME)
			|| sqlstate_equals(res, SQLSTATE_UNDEFINED_FUNCTION))
		{
			/* Schema repack does not exist, or version too old (version
			 * functions not found). Skip the database.
			 */
			if (errbuf)
				snprintf(errbuf, errsize,
					"%s %s is not installed in the database",
					PROGRAM_NAME, PROGRAM_VERSION);
		}
		else
		{
			/* Return the error message otherwise */
			if (errbuf)
				snprintf(errbuf, errsize, "%s", PQerrorMessage(connection));
		}
		goto cleanup;
	}
	CLEARPGRES(res);

	/* Disable statement timeout. */
	command("SET statement_timeout = 0", 0, NULL);

	/* Restrict search_path to system catalog. */
	command("SET search_path = pg_catalog, pg_temp, public", 0, NULL);

	/* To avoid annoying "create implicit ..." messages. */
	command("SET client_min_messages = warning", 0, NULL);

	ret = true;

cleanup:
	CLEARPGRES(res);
	return ret;
}
561
/*
 * Check the presence of tables specified by --parent-table and --table,
 * otherwise format user-friendly message.
 *
 * Builds a single VALUES list of all requested relation names, passed as
 * query parameters, and asks the server which of them do not resolve via
 * to_regclass() against repack.tables.  Returns true when everything
 * exists (or there is nothing to check); on false, errbuf (if non-NULL)
 * receives the list of missing relations or the server error.
 */
static bool
is_requested_relation_exists(char *errbuf, size_t errsize)
{
	bool			ret = false;
	PGresult	   *res = NULL;
	const char	  **params = NULL;
	int				iparam = 0;
	StringInfoData	sql;
	int				num_relations;
	SimpleStringListCell *cell;

	num_relations = simple_string_list_size(parent_table_list) +
		simple_string_list_size(table_list);

	/* nothing was explicitly requested, so nothing to do here */
	if (num_relations == 0)
		return true;

	/* has no suitable to_regclass(text) */
	if (PQserverVersion(connection) < 90600)
		return true;

	params = pgut_malloc(num_relations * sizeof(char *));
	initStringInfo(&sql);
	appendStringInfoString(&sql, "SELECT r FROM (VALUES ");

	/* one ($n) placeholder per requested table... */
	for (cell = table_list.head; cell; cell = cell->next)
	{
		appendStringInfo(&sql, "($%d)", iparam + 1);
		params[iparam++] = cell->val;
		if (iparam < num_relations)
			appendStringInfoChar(&sql, ',');
	}
	/* ...and per requested parent table */
	for (cell = parent_table_list.head; cell; cell = cell->next)
	{
		appendStringInfo(&sql, "($%d)", iparam + 1);
		params[iparam++] = cell->val;
		if (iparam < num_relations)
			appendStringInfoChar(&sql, ',');
	}
	appendStringInfoString(&sql,
		") AS given_t(r)"
		" WHERE NOT EXISTS("
		"  SELECT FROM repack.tables WHERE relid=to_regclass(given_t.r) )"
		);

	/* double check the parameters array is sane */
	if (iparam != num_relations)
	{
		if (errbuf)
			snprintf(errbuf, errsize,
				"internal error: bad parameters count: %i instead of %i",
				iparam, num_relations);
		goto cleanup;
	}

	res = execute_elevel(sql.data, iparam, params, DEBUG2);
	if (PQresultStatus(res) == PGRES_TUPLES_OK)
	{
		int	num;

		num = PQntuples(res);

		if (num != 0)
		{
			/* some requested relations are missing: list them */
			int				i;
			StringInfoData	rel_names;
			initStringInfo(&rel_names);

			for (i = 0; i < num; i++)
			{
				appendStringInfo(&rel_names, "\"%s\"", getstr(res, i, 0));
				if ((i + 1) != num)
					appendStringInfoString(&rel_names, ", ");
			}

			if (errbuf)
			{
				if (num > 1)
					snprintf(errbuf, errsize,
						"relations do not exist: %s", rel_names.data);
				else
					snprintf(errbuf, errsize,
						"ERROR:  relation %s does not exist", rel_names.data);
			}
			termStringInfo(&rel_names);
		}
		else
			ret = true;
	}
	else
	{
		/* pass the server error through to the caller */
		if (errbuf)
			snprintf(errbuf, errsize, "%s", PQerrorMessage(connection));
	}
	CLEARPGRES(res);

cleanup:
	CLEARPGRES(res);
	termStringInfo(&sql);
	free(params);
	return ret;
}
668
/*
 * Call repack_one_database for each database.
 *
 * Connects first to "postgres" to enumerate connectable databases, then
 * reconnects to each in turn (repack_one_database reconnects using the
 * global dbname).  Skipped databases are reported at INFO level rather
 * than aborting the whole run.  NOTE: the parameter shadows the global
 * "orderby" on purpose; the caller passes the global in.
 */
static void
repack_all_databases(const char *orderby)
{
	PGresult   *result;
	int			i;

	dbname = "postgres";
	reconnect(ERROR);

	if (!is_superuser())
		elog(ERROR, "You must be a superuser to use %s", PROGRAM_NAME);

	result = execute("SELECT datname FROM pg_database WHERE datallowconn ORDER BY 1;", 0, NULL);
	disconnect();

	for (i = 0; i < PQntuples(result); i++)
	{
		bool	ret;
		char	errbuf[256];

		dbname = PQgetvalue(result, i, 0);

		elog(INFO, "repacking database \"%s\"", dbname);
		if (!dryrun)
		{
			ret = repack_one_database(orderby, errbuf, sizeof(errbuf));
			if (!ret)
				elog(INFO, "database \"%s\" skipped: %s", dbname, errbuf);
		}
	}

	CLEARPGRES(result);
}
705
706 /* result is not copied */
707 static char *
getstr(PGresult * res,int row,int col)708 getstr(PGresult *res, int row, int col)
709 {
710 if (PQgetisnull(res, row, col))
711 return NULL;
712 else
713 return PQgetvalue(res, row, col);
714 }
715
716 static Oid
getoid(PGresult * res,int row,int col)717 getoid(PGresult *res, int row, int col)
718 {
719 if (PQgetisnull(res, row, col))
720 return InvalidOid;
721 else
722 return (Oid)strtoul(PQgetvalue(res, row, col), NULL, 10);
723 }
724
/*
 * Call repack_one_table for the target tables or each table in a database.
 *
 * Builds one parameterized query over repack.tables, selecting by --table,
 * --parent-table, --schema, or (default) every table with a primary key,
 * optionally excluding extension-owned tables.  For each returned row the
 * precomputed DDL/DML strings are assembled into a repack_table and passed
 * to repack_one_table().
 *
 * Returns false (with a message in errbuf, if non-NULL) when preliminary
 * checks fail or the table query errors; true otherwise.
 */
static bool
repack_one_database(const char *orderby, char *errbuf, size_t errsize)
{
	bool			ret = false;
	PGresult	   *res = NULL;
	int				i;
	int				num;
	StringInfoData	sql;
	SimpleStringListCell *cell;
	const char	  **params = NULL;
	int				iparam = 0;
	size_t			num_parent_tables,
					num_tables,
					num_schemas,
					num_params,
					num_excluded_extensions;

	num_parent_tables = simple_string_list_size(parent_table_list);
	num_tables = simple_string_list_size(table_list);
	num_schemas = simple_string_list_size(schema_list);
	num_excluded_extensions = simple_string_list_size(exclude_extension_list);

	/* 1st param is the user-specified tablespace */
	num_params = num_excluded_extensions +
				 num_parent_tables +
				 num_tables +
				 num_schemas + 1;
	params = pgut_malloc(num_params * sizeof(char *));

	initStringInfo(&sql);

	reconnect(ERROR);

	/* No sense in setting up concurrent workers if --jobs=1 */
	if (jobs > 1)
		setup_workers(jobs);

	if (!preliminary_checks(errbuf, errsize))
		goto cleanup;

	if (!is_requested_relation_exists(errbuf, errsize))
		goto cleanup;

	/* acquire target tables */
	appendStringInfoString(&sql,
		"SELECT t.*,"
		" coalesce(v.tablespace, t.tablespace_orig) as tablespace_dest"
		" FROM repack.tables t, "
		" (VALUES (quote_ident($1::text))) as v (tablespace)"
		" WHERE ");

	params[iparam++] = tablespace;
	if (num_tables || num_parent_tables)
	{
		/* standalone tables */
		if (num_tables)
		{
			appendStringInfoString(&sql, "(");
			for (cell = table_list.head; cell; cell = cell->next)
			{
				/* Construct table name placeholders to be used by PQexecParams */
				appendStringInfo(&sql, "relid = $%d::regclass", iparam + 1);
				params[iparam++] = cell->val;
				if (cell->next)
					appendStringInfoString(&sql, " OR ");
			}
			appendStringInfoString(&sql, ")");
		}

		if (num_tables && num_parent_tables)
			appendStringInfoString(&sql, " OR ");

		/* parent tables + inherited children */
		if (num_parent_tables)
		{
			appendStringInfoString(&sql, "(");
			for (cell = parent_table_list.head; cell; cell = cell->next)
			{
				/* Construct table name placeholders to be used by PQexecParams */
				appendStringInfo(&sql,
								 "relid = ANY(repack.get_table_and_inheritors($%d::regclass))",
								 iparam + 1);
				params[iparam++] = cell->val;
				if (cell->next)
					appendStringInfoString(&sql, " OR ");
			}
			appendStringInfoString(&sql, ")");
		}
	}
	else if (num_schemas)
	{
		appendStringInfoString(&sql, "schemaname IN (");
		for (cell = schema_list.head; cell; cell = cell->next)
		{
			/* Construct schema name placeholders to be used by PQexecParams */
			appendStringInfo(&sql, "$%d", iparam + 1);
			params[iparam++] = cell->val;
			if (cell->next)
				appendStringInfoString(&sql, ", ");
		}
		appendStringInfoString(&sql, ")");
	}
	else
	{
		/* default: all tables with a primary key */
		appendStringInfoString(&sql, "pkid IS NOT NULL");
	}

	/* Exclude tables which belong to extensions */
	if (exclude_extension_list.head)
	{
		appendStringInfoString(&sql, " AND t.relid NOT IN"
									 "  (SELECT d.objid::regclass"
									 "   FROM pg_depend d JOIN pg_extension e"
									 "   ON d.refobjid = e.oid"
									 "   WHERE d.classid = 'pg_class'::regclass AND (");

		/* List all excluded extensions */
		for (cell = exclude_extension_list.head; cell; cell = cell->next)
		{
			appendStringInfo(&sql, "e.extname = $%d", iparam + 1);
			params[iparam++] = cell->val;

			appendStringInfoString(&sql, cell->next ? " OR " : ")");
		}

		/* Close subquery */
		appendStringInfoString(&sql, ")");
	}

	/* Ensure the regression tests get a consistent ordering of tables */
	appendStringInfoString(&sql, " ORDER BY t.relname, t.schemaname");

	/* double check the parameters array is sane */
	if (iparam != num_params)
	{
		if (errbuf)
			snprintf(errbuf, errsize,
				"internal error: bad parameters count: %i instead of %zi",
				iparam, num_params);
		goto cleanup;
	}

	res = execute_elevel(sql.data, (int) num_params, params, DEBUG2);

	/* on error skip the database */
	if (PQresultStatus(res) != PGRES_TUPLES_OK)
	{
		/* Return the error message otherwise */
		if (errbuf)
			snprintf(errbuf, errsize, "%s", PQerrorMessage(connection));
		goto cleanup;
	}

	num = PQntuples(res);

	for (i = 0; i < num; i++)
	{
		repack_table	table;
		StringInfoData	copy_sql;
		const char	   *create_table_1;
		const char	   *create_table_2;
		const char	   *tablespace;
		const char	   *ckey;
		int				c = 0;

		/*
		 * NOTE(review): the column positions below must match the column
		 * order of the repack.tables view (plus the tablespace_dest column
		 * appended by the query above) — keep in sync with the extension.
		 */
		table.target_name = getstr(res, i, c++);
		table.target_oid = getoid(res, i, c++);
		table.target_toast = getoid(res, i, c++);
		table.target_tidx = getoid(res, i, c++);
		c++;	// Skip schemaname
		table.pkid = getoid(res, i, c++);
		table.ckid = getoid(res, i, c++);

		if (table.pkid == 0) {
			ereport(WARNING,
					(errcode(E_PG_COMMAND),
					 errmsg("relation \"%s\" must have a primary key or not-null unique keys", table.target_name)));
			continue;
		}

		table.create_pktype = getstr(res, i, c++);
		table.create_log = getstr(res, i, c++);
		table.create_trigger = getstr(res, i, c++);
		table.enable_trigger = getstr(res, i, c++);

		create_table_1 = getstr(res, i, c++);
		tablespace = getstr(res, i, c++);	/* to be clobbered */
		create_table_2 = getstr(res, i, c++);
		table.copy_data = getstr(res, i , c++);
		table.alter_col_storage = getstr(res, i, c++);
		table.drop_columns = getstr(res, i, c++);
		table.delete_log = getstr(res, i, c++);
		table.lock_table = getstr(res, i, c++);
		ckey = getstr(res, i, c++);
		table.sql_peek = getstr(res, i, c++);
		table.sql_insert = getstr(res, i, c++);
		table.sql_delete = getstr(res, i, c++);
		table.sql_update = getstr(res, i, c++);
		table.sql_pop = getstr(res, i, c++);
		tablespace = getstr(res, i, c++);	/* tablespace_dest */

		/* Craft CREATE TABLE SQL */
		resetStringInfo(&sql);
		appendStringInfoString(&sql, create_table_1);
		appendStringInfoString(&sql, tablespace);
		appendStringInfoString(&sql, create_table_2);

		/* Always append WITH NO DATA to CREATE TABLE SQL*/
		appendStringInfoString(&sql, " WITH NO DATA");
		table.create_table = sql.data;

		/* Craft Copy SQL */
		initStringInfo(&copy_sql);
		appendStringInfoString(&copy_sql, table.copy_data);
		if (!orderby)
		{
			if (ckey != NULL)
			{
				/* CLUSTER mode */
				appendStringInfoString(&copy_sql, " ORDER BY ");
				appendStringInfoString(&copy_sql, ckey);
			}

			/* else, VACUUM FULL mode (non-clustered tables) */
		}
		else if (!orderby[0])
		{
			/* VACUUM FULL mode (for clustered tables too), do nothing */
		}
		else
		{
			/* User specified ORDER BY */
			appendStringInfoString(&copy_sql, " ORDER BY ");
			appendStringInfoString(&copy_sql, orderby);
		}
		table.copy_data = copy_sql.data;

		repack_one_table(&table, orderby);
	}
	ret = true;

cleanup:
	CLEARPGRES(res);
	disconnect();
	termStringInfo(&sql);
	free(params);
	return ret;
}
977
978 static int
apply_log(PGconn * conn,const repack_table * table,int count)979 apply_log(PGconn *conn, const repack_table *table, int count)
980 {
981 int result;
982 PGresult *res;
983 const char *params[6];
984 char buffer[12];
985
986 params[0] = table->sql_peek;
987 params[1] = table->sql_insert;
988 params[2] = table->sql_delete;
989 params[3] = table->sql_update;
990 params[4] = table->sql_pop;
991 params[5] = utoa(count, buffer);
992
993 res = pgut_execute(conn,
994 "SELECT repack.repack_apply($1, $2, $3, $4, $5, $6)",
995 6, params);
996 result = atoi(PQgetvalue(res, 0, 0));
997 CLEARPGRES(res);
998
999 return result;
1000 }
1001
1002 /*
1003 * Create indexes on temp table, possibly using multiple worker connections
1004 * concurrently if the user asked for --jobs=...
1005 */
1006 static bool
rebuild_indexes(const repack_table * table)1007 rebuild_indexes(const repack_table *table)
1008 {
1009 PGresult *res = NULL;
1010 int num_indexes;
1011 int i;
1012 int num_active_workers;
1013 int num_workers;
1014 repack_index *index_jobs;
1015 bool have_error = false;
1016
1017 elog(DEBUG2, "---- create indexes ----");
1018
1019 num_indexes = table->n_indexes;
1020
1021 /* We might have more actual worker connections than we need,
1022 * if the number of workers exceeds the number of indexes to be
1023 * built. In that case, ignore the extra workers.
1024 */
1025 num_workers = num_indexes > workers.num_workers ? workers.num_workers : num_indexes;
1026 num_active_workers = num_workers;
1027
1028 elog(DEBUG2, "Have %d indexes and num_workers=%d", num_indexes,
1029 num_workers);
1030
1031 index_jobs = table->indexes;
1032
1033 for (i = 0; i < num_indexes; i++)
1034 {
1035
1036 elog(DEBUG2, "set up index_jobs [%d]", i);
1037 elog(DEBUG2, "target_oid : %u", index_jobs[i].target_oid);
1038 elog(DEBUG2, "create_index : %s", index_jobs[i].create_index);
1039
1040 if (num_workers <= 1) {
1041 /* Use primary connection if we are not setting up parallel
1042 * index building, or if we only have one worker.
1043 */
1044 command(index_jobs[i].create_index, 0, NULL);
1045
1046 /* This bookkeeping isn't actually important in this no-workers
1047 * case, but just for clarity.
1048 */
1049 index_jobs[i].status = FINISHED;
1050 }
1051 else if (i < num_workers) {
1052 /* Assign available worker to build an index. */
1053 index_jobs[i].status = INPROGRESS;
1054 index_jobs[i].worker_idx = i;
1055 elog(LOG, "Initial worker %d to build index: %s",
1056 i, index_jobs[i].create_index);
1057
1058 if (!(PQsendQuery(workers.conns[i], index_jobs[i].create_index)))
1059 {
1060 elog(WARNING, "Error sending async query: %s\n%s",
1061 index_jobs[i].create_index,
1062 PQerrorMessage(workers.conns[i]));
1063 have_error = true;
1064 goto cleanup;
1065 }
1066 }
1067 /* Else we have more indexes to be built than workers
1068 * available. That's OK, we'll get to them later.
1069 */
1070 }
1071
1072 if (num_workers > 1)
1073 {
1074 int freed_worker = -1;
1075 int ret;
1076
1077 /* Prefer poll() over select(), following PostgreSQL custom. */
1078 #ifdef HAVE_POLL
1079 struct pollfd *input_fds;
1080
1081 input_fds = pgut_malloc(sizeof(struct pollfd) * num_workers);
1082 for (i = 0; i < num_workers; i++)
1083 {
1084 input_fds[i].fd = PQsocket(workers.conns[i]);
1085 input_fds[i].events = POLLIN | POLLERR;
1086 input_fds[i].revents = 0;
1087 }
1088 #else
1089 fd_set input_mask;
1090 struct timeval timeout;
1091 /* select() needs the highest-numbered socket descriptor */
1092 int max_fd;
1093 #endif
1094
1095 /* Now go through our index builds, and look for any which is
1096 * reported complete. Reassign that worker to the next index to
1097 * be built, if any.
1098 */
1099 while (num_active_workers > 0)
1100 {
1101 elog(DEBUG2, "polling %d active workers", num_active_workers);
1102
1103 #ifdef HAVE_POLL
1104 ret = poll(input_fds, num_workers, POLL_TIMEOUT * 1000);
1105 #else
1106 /* re-initialize timeout and input_mask before each
1107 * invocation of select(). I think this isn't
1108 * necessary on many Unixen, but just in case.
1109 */
1110 timeout.tv_sec = POLL_TIMEOUT;
1111 timeout.tv_usec = 0;
1112
1113 FD_ZERO(&input_mask);
1114 for (i = 0, max_fd = 0; i < num_workers; i++)
1115 {
1116 FD_SET(PQsocket(workers.conns[i]), &input_mask);
1117 if (PQsocket(workers.conns[i]) > max_fd)
1118 max_fd = PQsocket(workers.conns[i]);
1119 }
1120
1121 ret = select(max_fd + 1, &input_mask, NULL, NULL, &timeout);
1122 #endif
1123 /* XXX: the errno != EINTR check means we won't bail
1124 * out on SIGINT. We should probably just remove this
1125 * check, though it seems we also need to fix up
1126 * the on_interrupt handling for workers' index
1127 * builds (those PGconns don't seem to have c->cancel
1128 * set, so we don't cancel the in-progress builds).
1129 */
1130 if (ret < 0 && errno != EINTR)
1131 elog(ERROR, "poll() failed: %d, %d", ret, errno);
1132
1133 elog(DEBUG2, "Poll returned: %d", ret);
1134
1135 for (i = 0; i < num_indexes; i++)
1136 {
1137 if (index_jobs[i].status == INPROGRESS)
1138 {
1139 Assert(index_jobs[i].worker_idx >= 0);
1140 /* Must call PQconsumeInput before we can check PQisBusy */
1141 if (PQconsumeInput(workers.conns[index_jobs[i].worker_idx]) != 1)
1142 {
1143 elog(WARNING, "Error fetching async query status: %s",
1144 PQerrorMessage(workers.conns[index_jobs[i].worker_idx]));
1145 have_error = true;
1146 goto cleanup;
1147 }
1148 if (!PQisBusy(workers.conns[index_jobs[i].worker_idx]))
1149 {
1150 elog(LOG, "Command finished in worker %d: %s",
1151 index_jobs[i].worker_idx,
1152 index_jobs[i].create_index);
1153
1154 while ((res = PQgetResult(workers.conns[index_jobs[i].worker_idx])))
1155 {
1156 if (PQresultStatus(res) != PGRES_COMMAND_OK)
1157 {
1158 elog(WARNING, "Error with create index: %s",
1159 PQerrorMessage(workers.conns[index_jobs[i].worker_idx]));
1160 have_error = true;
1161 goto cleanup;
1162 }
1163 CLEARPGRES(res);
1164 }
1165
1166 /* We are only going to re-queue one worker, even
1167 * though more than one index build might be finished.
1168 * Any other jobs which may be finished will
1169 * just have to wait for the next pass through the
1170 * poll()/select() loop.
1171 */
1172 freed_worker = index_jobs[i].worker_idx;
1173 index_jobs[i].status = FINISHED;
1174 num_active_workers--;
1175 break;
1176 }
1177 }
1178 }
1179 if (freed_worker > -1)
1180 {
1181 for (i = 0; i < num_indexes; i++)
1182 {
1183 if (index_jobs[i].status == UNPROCESSED)
1184 {
1185 index_jobs[i].status = INPROGRESS;
1186 index_jobs[i].worker_idx = freed_worker;
1187 elog(LOG, "Assigning worker %d to build index #%d: "
1188 "%s", freed_worker, i,
1189 index_jobs[i].create_index);
1190
1191 if (!(PQsendQuery(workers.conns[freed_worker],
1192 index_jobs[i].create_index))) {
1193 elog(WARNING, "Error sending async query: %s\n%s",
1194 index_jobs[i].create_index,
1195 PQerrorMessage(workers.conns[freed_worker]));
1196 have_error = true;
1197 goto cleanup;
1198 }
1199 num_active_workers++;
1200 break;
1201 }
1202 }
1203 freed_worker = -1;
1204 }
1205 }
1206
1207 }
1208
1209 cleanup:
1210 CLEARPGRES(res);
1211 return (!have_error);
1212 }
1213
1214
1215 /*
1216 * Re-organize one table.
1217 */
static void
repack_one_table(repack_table *table, const char *orderby)
{
	PGresult   *res = NULL;
	const char *params[3];
	int			num;
	char	   *vxid = NULL;
	char		buffer[12];		/* holds the table OID, later conn2's PID */
	StringInfoData sql;
	bool		ret = false;	/* set true only after the whole protocol succeeds */
	PGresult   *indexres = NULL;
	const char *indexparams[2];
	char		indexbuffer[12];
	int			j;

	/* appname will be "pg_repack" in normal use on 9.0+, or
	 * "pg_regress" when run under `make installcheck`
	 */
	const char *appname = getenv("PGAPPNAME");

	/* Keep track of whether we have gotten through setup to install
	 * the repack_trigger, log table, etc. ourselves. We don't want to
	 * go through repack_cleanup() if we didn't actually set up the
	 * trigger ourselves, lest we be cleaning up another pg_repack's mess,
	 * or worse, interfering with a still-running pg_repack.
	 */
	bool		table_init = false;

	initStringInfo(&sql);

	elog(INFO, "repacking table \"%s\"", table->target_name);

	elog(DEBUG2, "---- repack_one_table ----");
	elog(DEBUG2, "target_name : %s", table->target_name);
	elog(DEBUG2, "target_oid : %u", table->target_oid);
	elog(DEBUG2, "target_toast : %u", table->target_toast);
	elog(DEBUG2, "target_tidx : %u", table->target_tidx);
	elog(DEBUG2, "pkid : %u", table->pkid);
	elog(DEBUG2, "ckid : %u", table->ckid);
	elog(DEBUG2, "create_pktype : %s", table->create_pktype);
	elog(DEBUG2, "create_log : %s", table->create_log);
	elog(DEBUG2, "create_trigger : %s", table->create_trigger);
	elog(DEBUG2, "enable_trigger : %s", table->enable_trigger);
	elog(DEBUG2, "create_table : %s", table->create_table);
	elog(DEBUG2, "copy_data : %s", table->copy_data);
	elog(DEBUG2, "alter_col_storage : %s", table->alter_col_storage ?
		 table->alter_col_storage : "(skipped)");
	elog(DEBUG2, "drop_columns : %s", table->drop_columns ? table->drop_columns : "(skipped)");
	elog(DEBUG2, "delete_log : %s", table->delete_log);
	elog(DEBUG2, "lock_table : %s", table->lock_table);
	elog(DEBUG2, "sql_peek : %s", table->sql_peek);
	elog(DEBUG2, "sql_insert : %s", table->sql_insert);
	elog(DEBUG2, "sql_delete : %s", table->sql_delete);
	elog(DEBUG2, "sql_update : %s", table->sql_update);
	elog(DEBUG2, "sql_pop : %s", table->sql_pop);

	/* In --dry-run mode we stop after logging what we would do. */
	if (dryrun)
		return;

	/* push repack_cleanup_callback() on stack to clean temporary objects */
	pgut_atexit_push(repack_cleanup_callback, &table->target_oid);

	/*
	 * 1. Setup advisory lock and trigger on main table.
	 */
	elog(DEBUG2, "---- setup ----");

	params[0] = utoa(table->target_oid, buffer);

	if (!advisory_lock(connection, buffer))
		goto cleanup;

	if (!(lock_exclusive(connection, buffer, table->lock_table, true)))
	{
		if (no_kill_backend)
			elog(INFO, "Skipping repack %s due to timeout", table->target_name);
		else
			elog(WARNING, "lock_exclusive() failed for %s", table->target_name);
		goto cleanup;
	}

	/*
	 * pg_get_indexdef requires an access share lock, so do those calls while
	 * we have an access exclusive lock anyway, so we know they won't block.
	 */

	indexparams[0] = utoa(table->target_oid, indexbuffer);
	indexparams[1] = moveidx ? tablespace : NULL;

	/* First, just display a warning message for any invalid indexes
	 * which may be on the table (mostly to match the behavior of 1.1.8).
	 */
	indexres = execute(
		"SELECT pg_get_indexdef(indexrelid)"
		" FROM pg_index WHERE indrelid = $1 AND NOT indisvalid",
		1, indexparams);

	for (j = 0; j < PQntuples(indexres); j++)
	{
		const char *indexdef;
		indexdef = getstr(indexres, j, 0);
		elog(WARNING, "skipping invalid index: %s", indexdef);
	}

	/* NOTE(review): this overwrites the PGresult from the invalid-index
	 * query above without a CLEARPGRES, leaking it -- minor, but worth
	 * fixing at some point.
	 */
	indexres = execute(
		"SELECT indexrelid,"
		" repack.repack_indexdef(indexrelid, indrelid, $2, FALSE) "
		" FROM pg_index WHERE indrelid = $1 AND indisvalid",
		2, indexparams);

	table->n_indexes = PQntuples(indexres);
	table->indexes = pgut_malloc(table->n_indexes * sizeof(repack_index));

	for (j = 0; j < table->n_indexes; j++)
	{
		table->indexes[j].target_oid = getoid(indexres, j, 0);
		table->indexes[j].create_index = getstr(indexres, j, 1);
		table->indexes[j].status = UNPROCESSED;
		table->indexes[j].worker_idx = -1; /* Unassigned */
	}

	for (j = 0; j < table->n_indexes; j++)
	{
		elog(DEBUG2, "index[%d].target_oid : %u", j, table->indexes[j].target_oid);
		elog(DEBUG2, "index[%d].create_index : %s", j, table->indexes[j].create_index);
	}


	/*
	 * Check if repack_trigger is not conflict with existing trigger. We can
	 * find it out later but we check it in advance and go to cleanup if needed.
	 * In AFTER trigger context, since triggered tuple is not changed by other
	 * trigger we don't care about the fire order.
	 */
	res = execute("SELECT repack.conflicted_triggers($1)", 1, params);
	if (PQntuples(res) > 0)
	{
		ereport(WARNING,
				(errcode(E_PG_COMMAND),
				 errmsg("the table \"%s\" already has a trigger called \"%s\"",
						table->target_name, "repack_trigger"),
				 errdetail(
					 "The trigger was probably installed during a previous"
					 " attempt to run pg_repack on the table which was"
					 " interrupted and for some reason failed to clean up"
					 " the temporary objects. Please drop the trigger or drop"
					 " and recreate the pg_repack extension altogether"
					 " to remove all the temporary objects left over.")));
		goto cleanup;
	}

	CLEARPGRES(res);

	/* Create the temporary objects; bump temp_obj_num after each so that
	 * the cleanup path knows how many repack.* objects to drop.
	 */
	command(table->create_pktype, 0, NULL);
	temp_obj_num++;
	command(table->create_log, 0, NULL);
	temp_obj_num++;
	command(table->create_trigger, 0, NULL);
	temp_obj_num++;
	command(table->enable_trigger, 0, NULL);
	printfStringInfo(&sql, "SELECT repack.disable_autovacuum('repack.log_%u')", table->target_oid);
	command(sql.data, 0, NULL);

	/* While we are still holding an AccessExclusive lock on the table, submit
	 * the request for an AccessShare lock asynchronously from conn2.
	 * We want to submit this query in conn2 while connection's
	 * transaction still holds its lock, so that no DDL may sneak in
	 * between the time that connection commits and conn2 gets its lock.
	 */
	pgut_command(conn2, "BEGIN ISOLATION LEVEL READ COMMITTED", 0, NULL);

	/* grab the backend PID of conn2; we'll need this when querying
	 * pg_locks momentarily.
	 */
	res = pgut_execute(conn2, "SELECT pg_backend_pid()", 0, NULL);
	/* buffer[12] is large enough for any int PID plus the terminator */
	buffer[0] = '\0';
	strncat(buffer, PQgetvalue(res, 0, 0), sizeof(buffer) - 1);
	CLEARPGRES(res);

	/*
	 * Not using lock_access_share() here since we know that
	 * it's not possible to obtain the ACCESS SHARE lock right now
	 * in conn2, since the primary connection holds ACCESS EXCLUSIVE.
	 */
	printfStringInfo(&sql, "LOCK TABLE %s IN ACCESS SHARE MODE",
					 table->target_name);
	elog(DEBUG2, "LOCK TABLE %s IN ACCESS SHARE MODE", table->target_name);
	if (PQsetnonblocking(conn2, 1))
	{
		elog(WARNING, "Unable to set conn2 nonblocking.");
		goto cleanup;
	}
	if (!(PQsendQuery(conn2, sql.data)))
	{
		elog(WARNING, "Error sending async query: %s\n%s", sql.data,
			 PQerrorMessage(conn2));
		goto cleanup;
	}

	/* Now that we've submitted the LOCK TABLE request through conn2,
	 * look for and cancel any (potentially dangerous) DDL commands which
	 * might also be waiting on our table lock at this point --
	 * it's not safe to let them wait, because they may grab their
	 * AccessExclusive lock before conn2 gets its AccessShare lock,
	 * and perform unsafe DDL on the table.
	 *
	 * Normally, lock_access_share() would take care of this for us,
	 * but we're not able to use it here.
	 */
	if (!(kill_ddl(connection, table->target_oid, true)))
	{
		if (no_kill_backend)
			elog(INFO, "Skipping repack %s due to timeout.", table->target_name);
		else
			elog(WARNING, "kill_ddl() failed.");
		goto cleanup;
	}

	/* We're finished killing off any unsafe DDL. COMMIT in our main
	 * connection, so that conn2 may get its AccessShare lock.
	 */
	command("COMMIT", 0, NULL);

	/* The main connection has now committed its repack_trigger,
	 * log table, and temp. table. If any error occurs from this point
	 * on and we bail out, we should try to clean those up.
	 */
	table_init = true;

	/* Keep looping PQgetResult() calls until it returns NULL, indicating the
	 * command is done and we have obtained our lock.
	 */
	while ((res = PQgetResult(conn2)))
	{
		elog(DEBUG2, "Waiting on ACCESS SHARE lock...");
		if (PQresultStatus(res) != PGRES_COMMAND_OK)
		{
			elog(WARNING, "Error with LOCK TABLE: %s", PQerrorMessage(conn2));
			goto cleanup;
		}
		CLEARPGRES(res);
	}

	/* Turn conn2 back into blocking mode for further non-async use. */
	if (PQsetnonblocking(conn2, 0))
	{
		elog(WARNING, "Unable to set conn2 blocking.");
		goto cleanup;
	}

	/*
	 * 2. Copy tuples into temp table.
	 */
	elog(DEBUG2, "---- copy tuples ----");

	/* Must use SERIALIZABLE (or at least not READ COMMITTED) to avoid race
	 * condition between the create_table statement and rows subsequently
	 * being added to the log.
	 */
	command("BEGIN ISOLATION LEVEL SERIALIZABLE", 0, NULL);
	/* SET work_mem = maintenance_work_mem */
	command("SELECT set_config('work_mem', current_setting('maintenance_work_mem'), true)", 0, NULL);
	/* orderby == "" means VACUUM FULL mode: plain seqscan copy, so make
	 * the scan start at block 0 for reproducible physical order.
	 */
	if (orderby && !orderby[0])
		command("SET LOCAL synchronize_seqscans = off", 0, NULL);

	/* Fetch an array of Virtual IDs of all transactions active right now.
	 */
	params[0] = buffer; /* backend PID of conn2 */
	params[1] = PROGRAM_NAME;
	res = execute(SQL_XID_SNAPSHOT, 2, params);
	vxid = pgut_strdup(PQgetvalue(res, 0, 0));

	CLEARPGRES(res);

	/* Delete any existing entries in the log table now, since we have not
	 * yet run the CREATE TABLE ... AS SELECT, which will take in all existing
	 * rows from the target table; if we also included prior rows from the
	 * log we could wind up with duplicates.
	 */
	command(table->delete_log, 0, NULL);

	/* We need to be able to obtain an AccessShare lock on the target table
	 * for the create_table command to go through, so go ahead and obtain
	 * the lock explicitly.
	 *
	 * Since conn2 has been diligently holding its AccessShare lock, it
	 * is possible that another transaction has been waiting to acquire
	 * an AccessExclusive lock on the table (e.g. a concurrent ALTER TABLE
	 * or TRUNCATE which we must not allow). If there are any such
	 * transactions, lock_access_share() will kill them so that our
	 * CREATE TABLE ... AS SELECT does not deadlock waiting for an
	 * AccessShare lock.
	 */
	if (!(lock_access_share(connection, table->target_oid, table->target_name)))
		goto cleanup;

	/*
	 * Before copying data to the target table, we need to set the column storage
	 * type if its storage type has been changed from the type default.
	 */
	command(table->create_table, 0, NULL);
	if (table->alter_col_storage)
		command(table->alter_col_storage, 0, NULL);
	command(table->copy_data, 0, NULL);
	temp_obj_num++;
	printfStringInfo(&sql, "SELECT repack.disable_autovacuum('repack.table_%u')", table->target_oid);
	if (table->drop_columns)
		command(table->drop_columns, 0, NULL);
	command(sql.data, 0, NULL);
	command("COMMIT", 0, NULL);

	/*
	 * 3. Create indexes on temp table.
	 */
	if (!rebuild_indexes(table))
		goto cleanup;

	/* don't clear indexres until after rebuild_indexes or bad things happen */
	CLEARPGRES(indexres);
	CLEARPGRES(res);

	/*
	 * 4. Apply log to temp table until no tuples are left in the log
	 * and all of the old transactions are finished.
	 */
	for (;;)
	{
		num = apply_log(connection, table, APPLY_COUNT);

		/* We'll keep applying tuples from the log table in batches
		 * of APPLY_COUNT, until applying a batch of tuples
		 * (via LIMIT) results in our having applied
		 * MIN_TUPLES_BEFORE_SWITCH or fewer tuples. We don't want to
		 * get stuck repetitively applying some small number of tuples
		 * from the log table as inserts/updates/deletes may be
		 * constantly coming into the original table.
		 */
		if (num > MIN_TUPLES_BEFORE_SWITCH)
			continue;			/* there might be still some tuples, repeat. */

		/* old transactions still alive ? */
		params[0] = vxid;
		res = execute(SQL_XID_ALIVE, 1, params);
		num = PQntuples(res);

		if (num > 0)
		{
			/* Wait for old transactions.
			 * Only display this message if we are NOT
			 * running under pg_regress, so as not to cause
			 * noise which would trip up pg_regress.
			 */

			if (!appname || strcmp(appname, "pg_regress") != 0)
			{
				elog(NOTICE, "Waiting for %d transactions to finish. First PID: %s", num, PQgetvalue(res, 0, 0));
			}

			CLEARPGRES(res);
			sleep(1);
			continue;
		}
		else
		{
			/* All old transactions are finished;
			 * go to next step. */
			CLEARPGRES(res);
			break;
		}
	}

	/*
	 * 5. Swap: will be done with conn2, since it already holds an
	 *    AccessShare lock.
	 */
	elog(DEBUG2, "---- swap ----");
	/* Bump our existing AccessShare lock to AccessExclusive */

	if (!(lock_exclusive(conn2, utoa(table->target_oid, buffer),
						 table->lock_table, false)))
	{
		elog(WARNING, "lock_exclusive() failed in conn2 for %s",
			 table->target_name);
		goto cleanup;
	}

	/* Final, unbounded log replay (count = 0) while everyone is locked out */
	apply_log(conn2, table, 0);
	params[0] = utoa(table->target_oid, buffer);
	pgut_command(conn2, "SELECT repack.repack_swap($1)", 1, params);
	pgut_command(conn2, "COMMIT", 0, NULL);

	/*
	 * 6. Drop.
	 */
	elog(DEBUG2, "---- drop ----");

	command("BEGIN ISOLATION LEVEL READ COMMITTED", 0, NULL);
	if (!(lock_exclusive(connection, utoa(table->target_oid, buffer),
						 table->lock_table, false)))
	{
		elog(WARNING, "lock_exclusive() failed in connection for %s",
			 table->target_name);
		goto cleanup;
	}

	/* params[0] still holds the table OID set just before repack_swap() */
	params[1] = utoa(temp_obj_num, indexbuffer);
	command("SELECT repack.repack_drop($1, $2)", 2, params);
	command("COMMIT", 0, NULL);
	temp_obj_num = 0; /* reset temporary object counter after cleanup */

	/*
	 * 7. Analyze.
	 * Note that cleanup hook has been already uninstalled here because analyze
	 * is not an important operation; No clean up even if failed.
	 */
	if (analyze)
	{
		elog(DEBUG2, "---- analyze ----");

		command("BEGIN ISOLATION LEVEL READ COMMITTED", 0, NULL);
		printfStringInfo(&sql, "ANALYZE %s", table->target_name);
		command(sql.data, 0, NULL);
		command("COMMIT", 0, NULL);
	}

	/* Release advisory lock on table. */
	params[0] = REPACK_LOCK_PREFIX_STR;
	params[1] = utoa(table->target_oid, buffer);

	res = pgut_execute(connection, "SELECT pg_advisory_unlock($1, CAST(-2147483648 + $2::bigint AS integer))",
					   2, params);
	ret = true;

cleanup:
	CLEARPGRES(res);
	termStringInfo(&sql);
	if (vxid)
		free(vxid);

	/* Rollback current transactions */
	pgut_rollback(connection);
	pgut_rollback(conn2);

	/* XXX: distinguish between fatal and non-fatal errors via the first
	 * arg to repack_cleanup().
	 */
	if ((!ret) && table_init)
		repack_cleanup(false, table);
}
1667
/* Kill off any concurrent DDL (or any transaction attempting to take
 * an AccessExclusive lock) running against our table, when we are
 * permitted to do so. Note, we're killing these queries off *before*
 * they are granted an AccessExclusive lock on our table.
 *
 * Returns true if no problems encountered, false otherwise.
 */
1675 static bool
kill_ddl(PGconn * conn,Oid relid,bool terminate)1676 kill_ddl(PGconn *conn, Oid relid, bool terminate)
1677 {
1678 bool ret = true;
1679 PGresult *res;
1680 StringInfoData sql;
1681 int n_tuples;
1682
1683 initStringInfo(&sql);
1684
1685 /* Check the number of backends competing AccessExclusiveLock */
1686 printfStringInfo(&sql, COUNT_COMPETING_LOCKS, relid);
1687 res = pgut_execute(conn, sql.data, 0, NULL);
1688 n_tuples = PQntuples(res);
1689
1690 if (n_tuples != 0)
1691 {
1692 /* Competing backend is exsits, but if we do not want to calcel/terminate
1693 * any backend, do nothing.
1694 */
1695 if (no_kill_backend)
1696 {
1697 elog(WARNING, "%d unsafe queries remain but do not cancel them and skip to repack it",
1698 n_tuples);
1699 ret = false;
1700 }
1701 else
1702 {
1703 resetStringInfo(&sql);
1704 printfStringInfo(&sql, CANCEL_COMPETING_LOCKS, relid);
1705 res = pgut_execute(conn, sql.data, 0, NULL);
1706 if (PQresultStatus(res) != PGRES_TUPLES_OK)
1707 {
1708 elog(WARNING, "Error canceling unsafe queries: %s",
1709 PQerrorMessage(conn));
1710 ret = false;
1711 }
1712 else if (PQntuples(res) > 0 && terminate && PQserverVersion(conn) >= 80400)
1713 {
1714 elog(WARNING,
1715 "Canceled %d unsafe queries. Terminating any remaining PIDs.",
1716 PQntuples(res));
1717
1718 CLEARPGRES(res);
1719 printfStringInfo(&sql, KILL_COMPETING_LOCKS, relid);
1720 res = pgut_execute(conn, sql.data, 0, NULL);
1721 if (PQresultStatus(res) != PGRES_TUPLES_OK)
1722 {
1723 elog(WARNING, "Error killing unsafe queries: %s",
1724 PQerrorMessage(conn));
1725 ret = false;
1726 }
1727 }
1728 else if (PQntuples(res) > 0)
1729 elog(NOTICE, "Canceled %d unsafe queries", PQntuples(res));
1730 }
1731 }
1732 else
1733 elog(DEBUG2, "No competing DDL to cancel.");
1734
1735 CLEARPGRES(res);
1736 termStringInfo(&sql);
1737
1738 return ret;
1739 }
1740
1741
1742 /*
1743 * Try to acquire an ACCESS SHARE table lock, avoiding deadlocks and long
1744 * waits by killing off other sessions which may be stuck trying to obtain
1745 * an ACCESS EXCLUSIVE lock.
1746 *
1747 * Arguments:
1748 *
1749 * conn: connection to use
1750 * relid: OID of relation
1751 * target_name: name of table
1752 */
1753 static bool
lock_access_share(PGconn * conn,Oid relid,const char * target_name)1754 lock_access_share(PGconn *conn, Oid relid, const char *target_name)
1755 {
1756 StringInfoData sql;
1757 time_t start = time(NULL);
1758 int i;
1759 bool ret = true;
1760
1761 initStringInfo(&sql);
1762
1763 for (i = 1; ; i++)
1764 {
1765 time_t duration;
1766 PGresult *res;
1767 int wait_msec;
1768
1769 duration = time(NULL) - start;
1770
1771 /* Cancel queries unconditionally, i.e. don't bother waiting
1772 * wait_timeout as lock_exclusive() does -- the only queries we
1773 * should be killing are disallowed DDL commands hanging around
1774 * for an AccessExclusive lock, which must be deadlocked at
1775 * this point anyway since conn2 holds its AccessShare lock
1776 * already.
1777 */
1778 if (duration > (wait_timeout * 2))
1779 ret = kill_ddl(conn, relid, true);
1780 else
1781 ret = kill_ddl(conn, relid, false);
1782
1783 if (!ret)
1784 break;
1785
1786 /* wait for a while to lock the table. */
1787 wait_msec = Min(1000, i * 100);
1788 printfStringInfo(&sql, "SET LOCAL statement_timeout = %d", wait_msec);
1789 pgut_command(conn, sql.data, 0, NULL);
1790
1791 printfStringInfo(&sql, "LOCK TABLE %s IN ACCESS SHARE MODE", target_name);
1792 res = pgut_execute_elevel(conn, sql.data, 0, NULL, DEBUG2);
1793 if (PQresultStatus(res) == PGRES_COMMAND_OK)
1794 {
1795 CLEARPGRES(res);
1796 break;
1797 }
1798 else if (sqlstate_equals(res, SQLSTATE_QUERY_CANCELED))
1799 {
1800 /* retry if lock conflicted */
1801 CLEARPGRES(res);
1802 pgut_rollback(conn);
1803 continue;
1804 }
1805 else
1806 {
1807 /* exit otherwise */
1808 elog(WARNING, "%s", PQerrorMessage(connection));
1809 CLEARPGRES(res);
1810 ret = false;
1811 break;
1812 }
1813 }
1814
1815 termStringInfo(&sql);
1816 pgut_command(conn, "RESET statement_timeout", 0, NULL);
1817 return ret;
1818 }
1819
1820
1821 /* Obtain an advisory lock on the table's OID, to make sure no other
1822 * pg_repack is working on the table. This is not so much a concern with
1823 * full-table repacks, but mainly so that index-only repacks don't interfere
1824 * with each other or a full-table repack.
1825 */
advisory_lock(PGconn * conn,const char * relid)1826 static bool advisory_lock(PGconn *conn, const char *relid)
1827 {
1828 PGresult *res = NULL;
1829 bool ret = false;
1830 const char *params[2];
1831
1832 params[0] = REPACK_LOCK_PREFIX_STR;
1833 params[1] = relid;
1834
1835 /* For the 2-argument form of pg_try_advisory_lock, we need to
1836 * pass in two signed 4-byte integers. But a table OID is an
1837 * *unsigned* 4-byte integer. Add -2147483648 to that OID to make
1838 * it fit reliably into signed int space.
1839 */
1840 res = pgut_execute(conn, "SELECT pg_try_advisory_lock($1, CAST(-2147483648 + $2::bigint AS integer))",
1841 2, params);
1842
1843 if (PQresultStatus(res) != PGRES_TUPLES_OK) {
1844 elog(ERROR, "%s", PQerrorMessage(connection));
1845 }
1846 else if (strcmp(getstr(res, 0, 0), "t") != 0) {
1847 elog(ERROR, "Another pg_repack command may be running on the table. Please try again later.");
1848 }
1849 else {
1850 ret = true;
1851 }
1852 CLEARPGRES(res);
1853 return ret;
1854 }
1855
1856 /*
1857 * Try acquire an ACCESS EXCLUSIVE table lock, avoiding deadlocks and long
1858 * waits by killing off other sessions.
1859 * Arguments:
1860 *
1861 * conn: connection to use
1862 * relid: OID of relation
1863 * lock_query: LOCK TABLE ... IN ACCESS EXCLUSIVE query to be executed
1864 * start_xact: whether we will issue a BEGIN ourselves. If not, we will
1865 * use a SAVEPOINT and ROLLBACK TO SAVEPOINT if our query
1866 * times out, to avoid leaving the transaction in error state.
1867 */
1868 static bool
lock_exclusive(PGconn * conn,const char * relid,const char * lock_query,bool start_xact)1869 lock_exclusive(PGconn *conn, const char *relid, const char *lock_query, bool start_xact)
1870 {
1871 time_t start = time(NULL);
1872 int i;
1873 bool ret = true;
1874
1875 for (i = 1; ; i++)
1876 {
1877 time_t duration;
1878 char sql[1024];
1879 PGresult *res;
1880 int wait_msec;
1881
1882 if (start_xact)
1883 pgut_command(conn, "BEGIN ISOLATION LEVEL READ COMMITTED", 0, NULL);
1884 else
1885 pgut_command(conn, "SAVEPOINT repack_sp1", 0, NULL);
1886
1887 duration = time(NULL) - start;
1888 if (duration > wait_timeout)
1889 {
1890 if (no_kill_backend)
1891 {
1892 elog(WARNING, "timed out, do not cancel conflicting backends");
1893 ret = false;
1894
1895 /* Before exit the loop reset the transaction */
1896 if (start_xact)
1897 pgut_rollback(conn);
1898 else
1899 pgut_command(conn, "ROLLBACK TO SAVEPOINT repack_sp1", 0, NULL);
1900 break;
1901 }
1902 else
1903 {
1904 const char *cancel_query;
1905 if (PQserverVersion(conn) >= 80400 &&
1906 duration > wait_timeout * 2)
1907 {
1908 elog(WARNING, "terminating conflicted backends");
1909 cancel_query =
1910 "SELECT pg_terminate_backend(pid) FROM pg_locks"
1911 " WHERE locktype = 'relation'"
1912 " AND relation = $1 AND pid <> pg_backend_pid()";
1913 }
1914 else
1915 {
1916 elog(WARNING, "canceling conflicted backends");
1917 cancel_query =
1918 "SELECT pg_cancel_backend(pid) FROM pg_locks"
1919 " WHERE locktype = 'relation'"
1920 " AND relation = $1 AND pid <> pg_backend_pid()";
1921 }
1922
1923 pgut_command(conn, cancel_query, 1, &relid);
1924 }
1925 }
1926
1927 /* wait for a while to lock the table. */
1928 wait_msec = Min(1000, i * 100);
1929 snprintf(sql, lengthof(sql), "SET LOCAL statement_timeout = %d", wait_msec);
1930 pgut_command(conn, sql, 0, NULL);
1931
1932 res = pgut_execute_elevel(conn, lock_query, 0, NULL, DEBUG2);
1933 if (PQresultStatus(res) == PGRES_COMMAND_OK)
1934 {
1935 CLEARPGRES(res);
1936 break;
1937 }
1938 else if (sqlstate_equals(res, SQLSTATE_QUERY_CANCELED))
1939 {
1940 /* retry if lock conflicted */
1941 CLEARPGRES(res);
1942 if (start_xact)
1943 pgut_rollback(conn);
1944 else
1945 pgut_command(conn, "ROLLBACK TO SAVEPOINT repack_sp1", 0, NULL);
1946 continue;
1947 }
1948 else
1949 {
1950 /* exit otherwise */
1951 printf("%s", PQerrorMessage(connection));
1952 CLEARPGRES(res);
1953 ret = false;
1954 break;
1955 }
1956 }
1957
1958 pgut_command(conn, "RESET statement_timeout", 0, NULL);
1959 return ret;
1960 }
1961
/* This callback invokes repack_drop() to clean up temporary objects when
 * an error occurs while they are being created.
 */
1965 void
repack_cleanup_callback(bool fatal,void * userdata)1966 repack_cleanup_callback(bool fatal, void *userdata)
1967 {
1968 Oid target_table = *(Oid *) userdata;
1969 const char *params[2];
1970 char buffer[12];
1971 char num_buff[12];
1972
1973 if(fatal)
1974 {
1975 params[0] = utoa(target_table, buffer);
1976 params[1] = utoa(temp_obj_num, num_buff);
1977
1978 /* testing PQstatus() of connection and conn2, as we do
1979 * in repack_cleanup(), doesn't seem to work here,
1980 * so just use an unconditional reconnect().
1981 */
1982 reconnect(ERROR);
1983 command("SELECT repack.repack_drop($1, $2)", 2, params);
1984 temp_obj_num = 0; /* reset temporary object counter after cleanup */
1985 }
1986 }
1987
1988 /*
1989 * The userdata pointing a table being re-organized. We need to cleanup temp
1990 * objects before the program exits.
1991 */
1992 static void
repack_cleanup(bool fatal,const repack_table * table)1993 repack_cleanup(bool fatal, const repack_table *table)
1994 {
1995 if (fatal)
1996 {
1997 fprintf(stderr, "!!!FATAL ERROR!!! Please refer to the manual.\n\n");
1998 }
1999 else
2000 {
2001 char buffer[12];
2002 char num_buff[12];
2003 const char *params[2];
2004
2005 /* Try reconnection if not available. */
2006 if (PQstatus(connection) != CONNECTION_OK ||
2007 PQstatus(conn2) != CONNECTION_OK)
2008 reconnect(ERROR);
2009
2010 /* do cleanup */
2011 params[0] = utoa(table->target_oid, buffer);
2012 params[1] = utoa(temp_obj_num, num_buff);
2013 command("SELECT repack.repack_drop($1, $2)", 2, params);
2014 temp_obj_num = 0; /* reset temporary object counter after cleanup */
2015 }
2016 }
2017
2018 /*
2019 * Indexes of a table are repacked.
2020 */
2021 static bool
repack_table_indexes(PGresult * index_details)2022 repack_table_indexes(PGresult *index_details)
2023 {
2024 bool ret = false;
2025 PGresult *res = NULL, *res2 = NULL;
2026 StringInfoData sql, sql_drop;
2027 char buffer[2][12];
2028 const char *create_idx, *schema_name, *table_name, *params[3];
2029 Oid table, index;
2030 int i, num, num_repacked = 0;
2031 bool *repacked_indexes;
2032
2033 initStringInfo(&sql);
2034
2035 num = PQntuples(index_details);
2036 table = getoid(index_details, 0, 3);
2037 params[1] = utoa(table, buffer[1]);
2038 params[2] = tablespace;
2039 schema_name = getstr(index_details, 0, 5);
2040 /* table_name is schema-qualified */
2041 table_name = getstr(index_details, 0, 4);
2042
2043 /* Keep track of which of the table's indexes we have successfully
2044 * repacked, so that we may DROP only those indexes.
2045 */
2046 if (!(repacked_indexes = calloc(num, sizeof(bool))))
2047 ereport(ERROR, (errcode(ENOMEM),
2048 errmsg("Unable to calloc repacked_indexes")));
2049
2050 /* Check if any concurrent pg_repack command is being run on the same
2051 * table.
2052 */
2053 if (!advisory_lock(connection, params[1]))
2054 ereport(ERROR, (errcode(EINVAL),
2055 errmsg("Unable to obtain advisory lock on \"%s\"", table_name)));
2056
2057 for (i = 0; i < num; i++)
2058 {
2059 char *isvalid = getstr(index_details, i, 2);
2060 char *idx_name = getstr(index_details, i, 0);
2061
2062 if (isvalid[0] == 't')
2063 {
2064 index = getoid(index_details, i, 1);
2065
2066 resetStringInfo(&sql);
2067 appendStringInfo(&sql, "SELECT pgc.relname, nsp.nspname "
2068 "FROM pg_class pgc INNER JOIN pg_namespace nsp "
2069 "ON nsp.oid = pgc.relnamespace "
2070 "WHERE pgc.relname = 'index_%u' "
2071 "AND nsp.nspname = $1", index);
2072 params[0] = schema_name;
2073 elog(INFO, "repacking index \"%s\"", idx_name);
2074 res = execute(sql.data, 1, params);
2075 if (PQresultStatus(res) != PGRES_TUPLES_OK)
2076 {
2077 elog(WARNING, "%s", PQerrorMessage(connection));
2078 continue;
2079 }
2080 if (PQntuples(res) > 0)
2081 {
2082 ereport(WARNING,
2083 (errcode(E_PG_COMMAND),
2084 errmsg("Cannot create index \"%s\".\"index_%u\", "
2085 "already exists", schema_name, index),
2086 errdetail("An invalid index may have been left behind"
2087 " by a previous pg_repack on the table"
2088 " which was interrupted. Please use DROP "
2089 "INDEX \"%s\".\"index_%u\""
2090 " to remove this index and try again.",
2091 schema_name, index)));
2092 continue;
2093 }
2094
2095 if (dryrun)
2096 continue;
2097
2098 params[0] = utoa(index, buffer[0]);
2099 res = execute("SELECT repack.repack_indexdef($1, $2, $3, true)", 3,
2100 params);
2101
2102 if (PQntuples(res) < 1)
2103 {
2104 elog(WARNING,
2105 "unable to generate SQL to CREATE work index for %s",
2106 getstr(index_details, i, 0));
2107 continue;
2108 }
2109
2110 create_idx = getstr(res, 0, 0);
2111 /* Use a separate PGresult to avoid stomping on create_idx */
2112 res2 = execute_elevel(create_idx, 0, NULL, DEBUG2);
2113
2114 if (PQresultStatus(res2) != PGRES_COMMAND_OK)
2115 {
2116 ereport(WARNING,
2117 (errcode(E_PG_COMMAND),
2118 errmsg("Error creating index \"%s\".\"index_%u\": %s",
2119 schema_name, index, PQerrorMessage(connection)
2120 ) ));
2121 }
2122 else
2123 {
2124 repacked_indexes[i] = true;
2125 num_repacked++;
2126 }
2127
2128 CLEARPGRES(res);
2129 CLEARPGRES(res2);
2130 }
2131 else
2132 elog(WARNING, "skipping invalid index: %s.%s", schema_name,
2133 getstr(index_details, i, 0));
2134 }
2135
2136 if (dryrun) {
2137 ret = true;
2138 goto done;
2139 }
2140
2141 /* If we did not successfully repack any indexes, e.g. because of some
2142 * error affecting every CREATE INDEX attempt, don't waste time with
2143 * the ACCESS EXCLUSIVE lock on the table, and return false.
2144 * N.B. none of the DROP INDEXes should be performed since
2145 * repacked_indexes[] flags should all be false.
2146 */
2147 if (!num_repacked)
2148 {
2149 elog(WARNING,
2150 "Skipping index swapping for \"%s\", since no new indexes built",
2151 table_name);
2152 goto drop_idx;
2153 }
2154
2155 /* take an exclusive lock on table before calling repack_index_swap() */
2156 resetStringInfo(&sql);
2157 appendStringInfo(&sql, "LOCK TABLE %s IN ACCESS EXCLUSIVE MODE",
2158 table_name);
2159 if (!(lock_exclusive(connection, params[1], sql.data, true)))
2160 {
2161 elog(WARNING, "lock_exclusive() failed in connection for %s",
2162 table_name);
2163 goto drop_idx;
2164 }
2165
2166 for (i = 0; i < num; i++)
2167 {
2168 index = getoid(index_details, i, 1);
2169 if (repacked_indexes[i])
2170 {
2171 params[0] = utoa(index, buffer[0]);
2172 pgut_command(connection, "SELECT repack.repack_index_swap($1)", 1,
2173 params);
2174 }
2175 else
2176 elog(INFO, "Skipping index swap for index_%u", index);
2177 }
2178 pgut_command(connection, "COMMIT", 0, NULL);
2179 ret = true;
2180
2181 drop_idx:
2182 resetStringInfo(&sql);
2183 initStringInfo(&sql_drop);
2184 appendStringInfoString(&sql, "DROP INDEX CONCURRENTLY ");
2185 appendStringInfo(&sql, "\"%s\".", schema_name);
2186
2187 for (i = 0; i < num; i++)
2188 {
2189 index = getoid(index_details, i, 1);
2190 if (repacked_indexes[i])
2191 {
2192 initStringInfo(&sql_drop);
2193 appendStringInfo(&sql_drop, "%s\"index_%u\"", sql.data, index);
2194 command(sql_drop.data, 0, NULL);
2195 }
2196 else
2197 elog(INFO, "Skipping drop of index_%u", index);
2198 }
2199 termStringInfo(&sql_drop);
2200 termStringInfo(&sql);
2201
2202 done:
2203 CLEARPGRES(res);
2204 free(repacked_indexes);
2205
2206 return ret;
2207 }
2208
2209 /*
2210 * Call repack_table_indexes for each of the tables
2211 */
2212 static bool
repack_all_indexes(char * errbuf,size_t errsize)2213 repack_all_indexes(char *errbuf, size_t errsize)
2214 {
2215 bool ret = false;
2216 PGresult *res = NULL;
2217 StringInfoData sql;
2218 SimpleStringListCell *cell = NULL;
2219 const char *params[1];
2220
2221 initStringInfo(&sql);
2222 reconnect(ERROR);
2223
2224 assert(r_index.head || table_list.head || parent_table_list.head);
2225
2226 if (!preliminary_checks(errbuf, errsize))
2227 goto cleanup;
2228
2229 if (!is_requested_relation_exists(errbuf, errsize))
2230 goto cleanup;
2231
2232 if (r_index.head)
2233 {
2234 appendStringInfoString(&sql,
2235 "SELECT repack.oid2text(i.oid), idx.indexrelid, idx.indisvalid, idx.indrelid, repack.oid2text(idx.indrelid), n.nspname"
2236 " FROM pg_index idx JOIN pg_class i ON i.oid = idx.indexrelid"
2237 " JOIN pg_namespace n ON n.oid = i.relnamespace"
2238 " WHERE idx.indexrelid = $1::regclass ORDER BY indisvalid DESC, i.relname, n.nspname");
2239
2240 cell = r_index.head;
2241 }
2242 else if (table_list.head || parent_table_list.head)
2243 {
2244 appendStringInfoString(&sql,
2245 "SELECT repack.oid2text(i.oid), idx.indexrelid, idx.indisvalid, idx.indrelid, $1::text, n.nspname"
2246 " FROM pg_index idx JOIN pg_class i ON i.oid = idx.indexrelid"
2247 " JOIN pg_namespace n ON n.oid = i.relnamespace"
2248 " WHERE idx.indrelid = $1::regclass ORDER BY indisvalid DESC, i.relname, n.nspname");
2249
2250 for (cell = parent_table_list.head; cell; cell = cell->next)
2251 {
2252 int nchildren, i;
2253
2254 params[0] = cell->val;
2255
2256 /* find children of this parent table */
2257 res = execute_elevel("SELECT quote_ident(n.nspname) || '.' || quote_ident(c.relname)"
2258 " FROM pg_class c JOIN pg_namespace n on n.oid = c.relnamespace"
2259 " WHERE c.oid = ANY (repack.get_table_and_inheritors($1::regclass))"
2260 " ORDER BY n.nspname, c.relname", 1, params, DEBUG2);
2261
2262 if (PQresultStatus(res) != PGRES_TUPLES_OK)
2263 {
2264 elog(WARNING, "%s", PQerrorMessage(connection));
2265 continue;
2266 }
2267
2268 nchildren = PQntuples(res);
2269
2270 if (nchildren == 0)
2271 {
2272 elog(WARNING, "relation \"%s\" does not exist", cell->val);
2273 continue;
2274 }
2275
2276 /* append new tables to 'table_list' */
2277 for (i = 0; i < nchildren; i++)
2278 simple_string_list_append(&table_list, getstr(res, i, 0));
2279 }
2280
2281 CLEARPGRES(res);
2282
2283 cell = table_list.head;
2284 }
2285
2286 for (; cell; cell = cell->next)
2287 {
2288 params[0] = cell->val;
2289 res = execute_elevel(sql.data, 1, params, DEBUG2);
2290
2291 if (PQresultStatus(res) != PGRES_TUPLES_OK)
2292 {
2293 elog(WARNING, "%s", PQerrorMessage(connection));
2294 continue;
2295 }
2296
2297 if (PQntuples(res) == 0)
2298 {
2299 if(table_list.head)
2300 elog(WARNING, "\"%s\" does not have any indexes",
2301 cell->val);
2302 else if(r_index.head)
2303 elog(WARNING, "\"%s\" is not a valid index",
2304 cell->val);
2305
2306 continue;
2307 }
2308
2309 if(table_list.head)
2310 elog(INFO, "repacking indexes of \"%s\"", cell->val);
2311
2312 if (!repack_table_indexes(res))
2313 elog(WARNING, "repack failed for \"%s\"", cell->val);
2314
2315 CLEARPGRES(res);
2316 }
2317 ret = true;
2318
2319 cleanup:
2320 disconnect();
2321 termStringInfo(&sql);
2322 return ret;
2323 }
2324
2325 void
pgut_help(bool details)2326 pgut_help(bool details)
2327 {
2328 printf("%s re-organizes a PostgreSQL database.\n\n", PROGRAM_NAME);
2329 printf("Usage:\n");
2330 printf(" %s [OPTION]... [DBNAME]\n", PROGRAM_NAME);
2331
2332 if (!details)
2333 return;
2334
2335 printf("Options:\n");
2336 printf(" -a, --all repack all databases\n");
2337 printf(" -t, --table=TABLE repack specific table only\n");
2338 printf(" -I, --parent-table=TABLE repack specific parent table and its inheritors\n");
2339 printf(" -c, --schema=SCHEMA repack tables in specific schema only\n");
2340 printf(" -s, --tablespace=TBLSPC move repacked tables to a new tablespace\n");
2341 printf(" -S, --moveidx move repacked indexes to TBLSPC too\n");
2342 printf(" -o, --order-by=COLUMNS order by columns instead of cluster keys\n");
2343 printf(" -n, --no-order do vacuum full instead of cluster\n");
2344 printf(" -N, --dry-run print what would have been repacked\n");
2345 printf(" -j, --jobs=NUM Use this many parallel jobs for each table\n");
2346 printf(" -i, --index=INDEX move only the specified index\n");
2347 printf(" -x, --only-indexes move only indexes of the specified table\n");
2348 printf(" -T, --wait-timeout=SECS timeout to cancel other backends on conflict\n");
2349 printf(" -D, --no-kill-backend don't kill other backends when timed out\n");
2350 printf(" -Z, --no-analyze don't analyze at end\n");
2351 printf(" -k, --no-superuser-check skip superuser checks in client\n");
2352 printf(" -C, --exclude-extension don't repack tables which belong to specific extension\n");
2353 }
2354