1#!/usr/local/bin/python3.8
2
3import argparse, psycopg2, re, sys, time
4from multiprocessing import Process
5
# Minimum pg_partman version this script targets; printed by --version.
partman_version = "4.1.0"

# Command-line interface definition. The functions in this file read the
# resulting module-level ``args`` namespace directly.
parser = argparse.ArgumentParser(
    description="Script for reapplying indexes on child tables in a partition set to match the parent table. Any indexes that currently exist on the children and match the definition on the parent will be left as is. There is an option to recreate matching as well indexes if desired, as well as the primary key. Indexes that do not exist on the parent will be dropped. Commits are done after each index is dropped/created to help prevent long running transactions & locks.",
    epilog="NOTE: New index names are made based off the child table name & columns used, so their naming may differ from the name given on the parent. This is done to allow the tool to account for long or duplicate index names. If an index name would be duplicated, an incremental counter is added on to the end of the index name to allow it to be created. Use the --dryrun option first to see what it will do and which names may cause dupes to be handled like this.",
)

# --- target and connection ---
parser.add_argument(
    '-p', '--parent',
    help="Parent table of an already created partition set. (Required)",
)
parser.add_argument(
    '-c', '--connection',
    default="host=",
    help="""Connection string for use by psycopg. Defaults to "host=" (local socket).""",
)

# --- how indexes are dropped / created ---
parser.add_argument(
    '--concurrent',
    action="store_true",
    help="Create indexes with the CONCURRENTLY option. Note this does not work on primary keys when --primary is given.",
)
parser.add_argument(
    '--drop_concurrent',
    action="store_true",
    help="If an index is dropped (because it doesn't exist on the parent or because you set them to be recreated), do it concurrently. Note this does not work on primary keys when --primary is given.",
)
parser.add_argument(
    '-R', '--recreate_all',
    action="store_true",
    help="By default, if an index exists on a child and matches the parent, it will not be touched. Setting this option will force all child indexes to be dropped & recreated. Will obey the --concurrent & --drop_concurrent options if given. Will not recreate primary keys unless --primary option is also given.",
)
parser.add_argument(
    '--primary',
    action="store_true",
    help="By default the primary key is not recreated. Set this option if that is needed. Note this will cause an exclusive lock on the child table.",
)

# --- pacing ---
parser.add_argument(
    '-j', '--jobs',
    type=int,
    default=0,
    help="Use the python multiprocessing library to recreate indexes in parallel. Value for -j is number of simultaneous jobs to run. Note that this is per table, not per index. Be very careful setting this option if load is a concern on your systems.",
)
parser.add_argument(
    '-w', '--wait',
    type=float,
    default=0,
    help="Wait the given number of seconds after indexes have finished being created on a table before moving on to the next. When used with -j, this will set the pause between the batches of parallel jobs instead.",
)

# --- output and miscellaneous ---
parser.add_argument(
    '--dryrun',
    action="store_true",
    help="Show what the script will do without actually running it against the database. Highly recommend reviewing this before running. Note that if multiple indexes would get the same default name, the duplicated name will show in the dryrun (because the index doesn't exist in the catalog to check for it). When the real thing is run, the duplicated names will be handled as stated in NOTE at the end of --help.",
)
parser.add_argument(
    '-q', '--quiet',
    action="store_true",
    help="Turn off all output.",
)
parser.add_argument(
    '--nonpartman',
    action="store_true",
    help="If the partition set you are running this on is not managed by pg_partman, set this flag otherwise this script may not work. Note that the pg_partman extension is still required to be installed for this to work since it uses certain internal functions. When this is set the order that the tables are reindexed is alphabetical instead of logical.",
)
parser.add_argument(
    '--version',
    action="store_true",
    help="Print out the minimum version of pg_partman this script is meant to work with. The version of pg_partman installed may be greater than this.",
)

args = parser.parse_args()
22
def check_compatibility(conn, partman_schema):
    """Exit the script if the server or the partition set is unsupported.

    Args:
        conn: open database connection.
        partman_schema: already double-quoted name of the schema holding
            pg_partman's ``part_config`` table.

    Raises:
        SystemExit: with status 2 when the server is older than 9.4, or when
            the server is 11+ and ``part_config`` lists the parent as a
            ``native`` partition set (the connection is closed first in
            that case).
    """
    cur = conn.cursor()
    try:
        cur.execute("""SELECT current_setting('server_version_num')::int""")
        pg_version = int(cur.fetchone()[0])

        if pg_version < 90400:
            print("ERROR: This script requires PostgreSQL minimum version of 9.4.0")
            sys.exit(2)

        sql = "SELECT partition_type FROM " + partman_schema + ".part_config WHERE parent_table = %s"
        cur.execute(sql, [args.parent])
        row = cur.fetchone()
        # A parent that has no part_config row (e.g. a set run with
        # --nonpartman) has no recorded partition type; treat it as
        # "not native" instead of failing on None[0].
        partition_type = row[0] if row is not None else None
    finally:
        # Runs on the sys.exit() path as well, so the cursor never leaks.
        cur.close()

    if pg_version >= 110000 and partition_type == "native":
        print("This script cannot currently work with native partition sets in PG11+. Please use native index inheritance methods if possible.")
        close_conn(conn)
        sys.exit(2)
45
46
def create_conn():
    """Open a new psycopg2 connection using the --connection string and return it."""
    return psycopg2.connect(args.connection)
50
51
def _checked_index_name(conn, partman_schema, base_name, suffix):
    """Return the name pg_partman's check_name_length() yields for base_name + suffix."""
    # The name parts are bound as parameters instead of being spliced into the
    # SQL text, so a quote character in a table/column name cannot break it.
    sql = "SELECT " + partman_schema + ".check_name_length(%s, p_suffix := %s)"
    cur = conn.cursor()
    try:
        cur.execute(sql, [base_name, suffix])
        return cur.fetchone()[0]
    finally:
        cur.close()


def _relation_exists(conn, schemaname, relname):
    """Return True when pg_class already has a relation with this schema and name."""
    sql = "SELECT count(*) FROM pg_class c JOIN pg_namespace n ON c.relnamespace = n.oid WHERE n.nspname = %s AND c.relname = %s"
    cur = conn.cursor()
    try:
        cur.execute(sql, [schemaname, relname])
        found = cur.fetchone()[0]
    finally:
        cur.close()
    return found is not None and found > 0


def create_index(conn, partman_schema, child_schemaname, child_tablename, child_index_list, parent, parent_index_list):
    """Create on one child table the parent indexes it does not have yet.

    Reads ``args.recreate_all``, ``args.primary``, ``args.concurrent``,
    ``args.quiet`` and ``args.dryrun``.

    Args:
        conn: open database connection.
        partman_schema: already double-quoted schema containing pg_partman.
        child_schemaname: schema of the child table.
        child_tablename: name of the child table.
        child_index_list: rows as returned by get_child_index_list(), taken
            BEFORE drop_index() ran; element 0 is the index definition.
        parent: schema-qualified table whose indexes are the reference.
        parent_index_list: rows as returned by get_parent_index_list():
            (definition, is_primary, comma-joined column names, index name).

    Raises:
        TypeError: if ``parent`` is not found in pg_catalog.pg_tables.
    """
    cur = conn.cursor()
    try:
        sql = """SELECT schemaname, tablename FROM pg_catalog.pg_tables WHERE schemaname||'.'||tablename = %s"""
        cur.execute(sql, [parent])
        parent_schemaname, parent_tablename = cur.fetchone()
    finally:
        cur.close()

    # Names are escaped so that regex metacharacters in an identifier are
    # matched literally instead of corrupting (or failing to compile) the pattern.
    parent_match_regex = re.compile(
        r" ON \"?%s\"?\.\"?%s\"? | ON \"?%s\"?"
        % (re.escape(parent_schemaname), re.escape(parent_tablename), re.escape(parent_tablename))
    )
    index_match_regex = re.compile(r'(?P<index_def>USING .*)')
    create_prefix_regex = re.compile(r"^CREATE (UNIQUE )?INDEX ")
    child_target = " ON \"" + child_schemaname + "\".\"" + child_tablename + "\" "

    for i in parent_index_list:
        # Always start from "nothing to do"; otherwise a skipped primary key
        # would reuse (or find unbound) the statement of a previous iteration.
        statement = None
        recreate_primary = bool(i[1]) and args.primary

        # If there is already a child index that matches the parent index,
        # don't try to create it unless --recreate_all is set. A primary key
        # handled by --primary is exempt: drop_index() has already dropped it
        # and child_index_list predates that drop, so it would wrongly look
        # "already present" and never be recreated.
        if not args.recreate_all and not recreate_primary:
            parinddef = index_match_regex.search(i[0])
            if any(
                index_match_regex.search(c[0]).group('index_def') == parinddef.group('index_def')
                for c in child_index_list
            ):
                continue

        if recreate_primary:
            columns = i[2].split(",")
            index_name = _checked_index_name(conn, partman_schema, child_tablename + "_" + "_".join(columns), "_pk")
            quoted_column_names = "\"" + "\",\"".join(columns) + "\""
            statement = "ALTER TABLE \"" + child_schemaname + "\".\"" + child_tablename + "\" ADD CONSTRAINT \"" + index_name + "\" PRIMARY KEY (" + quoted_column_names + ")"
        elif not i[1]:
            base_name = child_tablename
            if i[2] is not None:
                base_name += "_" + "_".join(i[2].split(","))
            index_name = _checked_index_name(conn, partman_schema, base_name, "_idx")
            # On a name collision, retry with _idx1, _idx2, ... until free.
            name_counter = 1
            while _relation_exists(conn, child_schemaname, index_name):
                index_name = _checked_index_name(conn, partman_schema, base_name, "_idx" + str(name_counter))
                name_counter += 1

            # Replace the parent index name with the child index name. Only the
            # first occurrence is the index name itself; later occurrences of
            # the same text (inside table or column names) must stay untouched.
            statement = i[0].replace(i[3], index_name, 1)
            if args.concurrent:
                # Covers both CREATE INDEX and CREATE UNIQUE INDEX.
                statement = create_prefix_regex.sub(lambda m: m.group(0) + "CONCURRENTLY ", statement, count=1)
            # A callable replacement keeps backslashes in names literal.
            statement = parent_match_regex.sub(lambda m: child_target, statement)

        if statement is not None:
            cur = conn.cursor()
            try:
                if not args.quiet:
                    print(cur.mogrify(statement))
                if not args.dryrun:
                    cur.execute(statement)
            finally:
                cur.close()
126
127
def close_conn(conn):
    """Close the given database connection."""
    conn.close()
130
131
def drop_index(conn, child_schemaname, child_tablename, child_index_list, parent_index_list):
    """Drop the indexes of one child table that should not be kept.

    Reads ``args.primary``, ``args.recreate_all``, ``args.drop_concurrent``,
    ``args.quiet`` and ``args.dryrun``.

    * Primary keys are dropped (via ALTER TABLE ... DROP CONSTRAINT) only when
      --primary is given.
    * Other indexes are dropped when --recreate_all is given, or when no parent
      index has the same ``USING ...`` definition.

    Args:
        conn: open database connection.
        child_schemaname: schema of the child table.
        child_tablename: name of the child table.
        child_index_list: rows as returned by get_child_index_list():
            (definition, is_primary, index schema, index name, constraint name).
        parent_index_list: rows as returned by get_parent_index_list();
            element 0 is the index definition.
    """
    # Compiled once instead of once per child index.
    index_def_regex = re.compile(r'(?P<index_def>USING .*)')
    cur = conn.cursor()
    try:
        for d in child_index_list:
            if d[1]:
                if not args.primary:
                    continue
                statement = "ALTER TABLE \"" + child_schemaname + "\".\"" + child_tablename + "\" DROP CONSTRAINT \"" + d[4] + "\""
            else:
                if not args.recreate_all:
                    # If there is a parent index that matches the child index
                    # don't drop it - we'd just have to recreate it.
                    chinddef = index_def_regex.search(d[0]).group('index_def')
                    if any(index_def_regex.search(p[0]).group('index_def') == chinddef for p in parent_index_list):
                        continue
                drop_command = "DROP INDEX CONCURRENTLY " if args.drop_concurrent else "DROP INDEX "
                statement = drop_command + "\"" + d[2] + "\".\"" + d[3] + "\""

            if not args.quiet:
                print(cur.mogrify(statement))
            if not args.dryrun:
                cur.execute(statement)
    finally:
        cur.close()
169
170
def get_children(conn, partman_schema):
    """Return (schema, table) rows for every child of ``args.parent``.

    With --nonpartman the children are read from pg_inherits, ordered by
    schema and table name; otherwise pg_partman's show_partitions() is asked.
    """
    if args.nonpartman:
        query = """
            WITH parent_info AS (
                SELECT c1.oid FROM pg_catalog.pg_class c1
                JOIN pg_catalog.pg_namespace n1 ON c1.relnamespace = n1.oid
                WHERE n1.nspname ||'.'|| c1.relname = %s
            )
            SELECT n.nspname::text, c.relname::text AS partition_name FROM
            pg_catalog.pg_inherits h
            JOIN pg_catalog.pg_class c ON c.oid = h.inhrelid
            JOIN pg_catalog.pg_namespace n ON c.relnamespace = n.oid
            JOIN parent_info pi ON h.inhparent = pi.oid
            ORDER BY 1,2"""
    else:
        query = "SELECT partition_schemaname, partition_tablename FROM " + partman_schema + ".show_partitions(%s)"

    cursor = conn.cursor()
    cursor.execute(query, [args.parent])
    children = cursor.fetchall()
    cursor.close()
    return children
192
193
def get_child_index_list(conn, child_schemaname, child_tablename):
    """Fetch the index rows of one child table.

    Each returned row is (index definition, is_primary, index schema,
    index name, constraint name or None).
    """
    query = """
            WITH child_info AS (
                SELECT c1.oid FROM pg_catalog.pg_class c1
                JOIN pg_catalog.pg_namespace n1 ON c1.relnamespace = n1.oid
                WHERE n1.nspname = %s
                AND c1.relname = %s
            )
            SELECT pg_get_indexdef(indexrelid) AS statement
            , i.indisprimary
            , n.nspname AS index_schemaname
            , c.relname AS index_name
            , t.conname
            FROM pg_catalog.pg_index i
            JOIN pg_catalog.pg_class c ON i.indexrelid = c.oid
            JOIN pg_catalog.pg_namespace n ON c.relnamespace = n.oid
            LEFT JOIN pg_catalog.pg_constraint t ON c.oid = t.conindid
            JOIN child_info ci ON i.indrelid = ci.oid """
    cursor = conn.cursor()
    cursor.execute(query, [child_schemaname, child_tablename])
    rows = cursor.fetchall()
    cursor.close()
    return rows
217
218
def get_parent_index_list(conn, parent):
    """Fetch the valid indexes of the reference (parent or template) table.

    Args:
        conn: open database connection.
        parent: schema-qualified table name.

    Returns:
        Rows of (index definition, is_primary, comma-joined indexed column
        names or None, index name), ordered by definition.
    """
    sql = """
            WITH parent_info AS (
                SELECT c1.oid FROM pg_catalog.pg_class c1
                JOIN pg_catalog.pg_namespace n1 ON c1.relnamespace = n1.oid
                WHERE n1.nspname||'.'||c1.relname = %s
            )
            SELECT
            pg_get_indexdef(indexrelid) AS statement
            , i.indisprimary
            , ( SELECT array_to_string(array_agg( a.attname ORDER by x.r ), ',')
                FROM pg_catalog.pg_attribute a
                JOIN ( SELECT k, row_number() over () as r
                        FROM unnest(i.indkey) k ) as x
                ON a.attnum = x.k AND a.attrelid = i.indrelid
            ) AS indkey_names
            , c.relname AS index_name
            FROM pg_catalog.pg_index i
            JOIN pg_catalog.pg_class c ON i.indexrelid = c.oid
            JOIN parent_info pi ON i.indrelid = pi.oid
            WHERE i.indisvalid
            ORDER BY 1"""
    cur = conn.cursor()
    try:
        cur.execute(sql, [parent])
        return cur.fetchall()
    finally:
        # The cursor was previously left open; close it like the sibling
        # get_child_index_list() does.
        cur.close()
246
247
def get_parent(conn, partman_schema):
    """Return the table whose indexes serve as the reference definition.

    If ``part_config`` has a row for ``args.parent`` with partition_type
    'native' and a non-NULL template_table, that template table is returned;
    in every other case ``args.parent`` itself is returned.

    Args:
        conn: open database connection.
        partman_schema: already double-quoted schema containing pg_partman.
    """
    sql = "SELECT template_table FROM " + partman_schema + ".part_config WHERE parent_table = %s AND partition_type = 'native'"
    cur = conn.cursor()
    try:
        cur.execute(sql, [args.parent])
        row = cur.fetchone()
    finally:
        cur.close()

    # A NULL template_table must not be handed on as None: the index lookup
    # for None finds nothing and create_index() fails on the missing table.
    if row is None or row[0] is None:
        return args.parent
    return row[0]
259
260
def get_partman_schema(conn):
    """Return the double-quoted name of the schema pg_partman is installed in.

    Args:
        conn: open database connection.

    Raises:
        SystemExit: with status 2 when the pg_partman extension is not found
            in pg_extension.
    """
    sql = "SELECT nspname FROM pg_catalog.pg_namespace n, pg_catalog.pg_extension e WHERE e.extname = 'pg_partman' AND e.extnamespace = n.oid"
    cur = conn.cursor()
    try:
        cur.execute(sql)
        row = cur.fetchone()
    finally:
        cur.close()

    if row is None:
        # Report the real problem instead of failing on None[0].
        print("ERROR: pg_partman extension not found in this database")
        sys.exit(2)

    # Double any embedded quote so the result stays a valid quoted identifier.
    return "\"" + row[0].replace("\"", "\"\"") + "\""
268
269
def print_version():
    """Print the minimum supported pg_partman version, then end the script."""
    print(partman_version)
    raise SystemExit
273
274
def reindex_proc(child_schemaname, child_tablename, parent, parent_index_list, partman_schema):
    """Re-apply the parent's indexes to one child table, then ANALYZE it.

    Opens its own connection, so it can be used as a multiprocessing target.
    Reads ``args.quiet`` and ``args.dryrun`` for the ANALYZE step.

    Args:
        child_schemaname: schema of the child table.
        child_tablename: name of the child table.
        parent: schema-qualified reference table (see get_parent()).
        parent_index_list: rows as returned by get_parent_index_list().
        partman_schema: already double-quoted schema containing pg_partman.
    """
    conn = create_conn()
    try:
        conn.autocommit = True  # must be turned on to support CONCURRENTLY
        child_index_list = get_child_index_list(conn, child_schemaname, child_tablename)
        drop_index(conn, child_schemaname, child_tablename, child_index_list, parent_index_list)
        create_index(conn, partman_schema, child_schemaname, child_tablename, child_index_list, parent, parent_index_list)

        sql = "ANALYZE \"" + child_schemaname + "\".\"" + child_tablename + "\""
        cur = conn.cursor()
        try:
            if not args.quiet:
                print(cur.mogrify(sql))
            if not args.dryrun:
                cur.execute(sql)
        finally:
            cur.close()
    finally:
        # Release the connection even when one of the steps above raises.
        close_conn(conn)
290
291
if __name__ == "__main__":
    # Script entry point: validate options, collect the reference indexes and
    # the child tables once, then process the children serially or in batches
    # of parallel worker processes.

    if args.version:
        print_version()

    if args.parent is None:
        print("-p/--parent option is required")
        sys.exit(2)

    if "." not in args.parent:
        print("ERROR: Parent table must be schema qualified")
        sys.exit(2)

    if args.jobs < 0:
        # A negative value used to start no workers and never shrink the
        # queue, so the batch loop below would spin forever.
        print("ERROR: -j/--jobs cannot be negative")
        sys.exit(2)

    conn = create_conn()

    partman_schema = get_partman_schema(conn)
    check_compatibility(conn, partman_schema)
    parent = get_parent(conn, partman_schema)
    parent_index_list = get_parent_index_list(conn, parent)
    child_list = get_children(conn, partman_schema)
    close_conn(conn)

    if args.jobs == 0:
        for c in child_list:
            reindex_proc(c[0], c[1], parent, parent_index_list, partman_schema)
            if args.wait > 0:
                time.sleep(args.wait)
    else:
        # Walk the children in order, args.jobs tables per batch; the last
        # batch is simply shorter (args.jobs itself is left untouched).
        for start in range(0, len(child_list), args.jobs):
            if not args.quiet:
                print("Jobs left in queue: " + str(len(child_list) - start))
            processlist = []
            for c in child_list[start:start + args.jobs]:
                p = Process(target=reindex_proc, args=(c[0], c[1], parent, parent_index_list, partman_schema))
                p.start()
                processlist.append(p)
            # Wait for the whole batch before pausing / starting the next one.
            for j in processlist:
                j.join()
            if args.wait > 0:
                time.sleep(args.wait)
336