1#!/usr/local/bin/python3.8
2
3import argparse, psycopg2, sys, time
4from multiprocessing import Process
5
6partman_version = "2.2.0"
7
8parser = argparse.ArgumentParser(description="Script for reapplying additional constraints managed by pg_partman on child tables. See docs for additional info on this special constraint management. Script runs in two distinct modes: 1) Drop all constraints  2) Apply all constraints. Typical usage would be to run the drop mode, edit the data, then run apply mode to re-create all constraints on a partition set.")
9parser.add_argument('-p', '--parent', help="Parent table of an already created partition set. (Required)")
10parser.add_argument('-c','--connection', default="host=", help="""Connection string for use by psycopg. Defaults to "host=" (local socket).""")
11parser.add_argument('-d', '--drop_constraints', action="store_true", help="Drop all constraints managed by pg_partman. Drops constraints on all child tables including current & future.")
12parser.add_argument('-a', '--add_constraints', action="store_true", help="Apply configured constraints to all child tables older than the optimize_constraint value.")
13parser.add_argument('-j', '--jobs', type=int, default=0, help="Use the python multiprocessing library to recreate indexes in parallel. Value for -j is number of simultaneous jobs to run. Note that this is per table, not per index. Be very careful setting this option if load is a concern on your systems.")
14parser.add_argument('-w', '--wait', type=float, default=0, help="Wait the given number of seconds after a table has had its constraints dropped or applied before moving on to the next. When used with -j, this will set the pause between the batches of parallel jobs instead.")
15parser.add_argument('--dryrun', action="store_true", help="Show what the script will do without actually running it against the database. Highly recommend reviewing this before running.")
16parser.add_argument('-q', '--quiet', action="store_true", help="Turn off all output.")
17parser.add_argument('--version', action="store_true", help="Print out the minimum version of pg_partman this script is meant to work with. The version of pg_partman installed may be greater than this.")
18args = parser.parse_args()
19
20
21def apply_proc(child_table, partman_schema):
22    conn = create_conn()
23    conn.autocommit = True
24    cur = conn.cursor()
25    sql = "SELECT " + partman_schema + ".apply_constraints(%s, %s, %s, %s, %s)"
26    debug = False;
27    if not args.quiet:
28        debug = True
29        print(cur.mogrify(sql, [args.parent, child_table, False, None, debug]))
30    if not args.dryrun:
31        cur.execute(sql, [args.parent, child_table, False, None, debug])
32    cur.close()
33    close_conn(conn)
34
35
36def create_conn():
37    conn = psycopg2.connect(args.connection)
38    return conn
39
40
41def close_conn(conn):
42    conn.close()
43
44
45def drop_proc(child_table, partman_schema):
46    conn = create_conn()
47    conn.autocommit = True
48    cur = conn.cursor()
49    sql = "SELECT " + partman_schema + ".drop_constraints(%s, %s, %s)"
50    debug = False;
51    if not args.quiet:
52        debug = True
53        print(cur.mogrify(sql, [args.parent, child_table, debug]))
54    if not args.dryrun:
55        cur.execute(sql, [args.parent, child_table, debug])
56    cur.close()
57    close_conn(conn)
58
59
60def get_children(conn, partman_schema):
61    cur = conn.cursor()
62    sql = "SELECT partition_schemaname||'.'||partition_tablename FROM " + partman_schema + ".show_partitions(%s, %s)"
63    cur.execute(sql, [args.parent, 'ASC'])
64    child_list = cur.fetchall()
65    cur.close()
66    return child_list
67
68
69def get_partman_schema(conn):
70    cur = conn.cursor()
71    sql = "SELECT nspname FROM pg_catalog.pg_namespace n, pg_catalog.pg_extension e WHERE e.extname = 'pg_partman' AND e.extnamespace = n.oid"
72    cur.execute(sql)
73    partman_schema = "\"" + cur.fetchone()[0] + "\""
74    cur.close()
75    return partman_schema
76
77
78def get_config_values(conn, partman_schema):
79    # [0] = premake, [1] = optimize_constraint
80    cur = conn.cursor()
81    sql = "SELECT premake, optimize_constraint FROM " + partman_schema + ".part_config WHERE parent_table = %s"
82    cur.execute(sql, [args.parent])
83    config_values = cur.fetchone()
84    cur.close()
85    return config_values
86
87
88def get_quoted_parent_table(conn):
89    cur = conn.cursor()
90    sql = "SELECT schemaname, tablename FROM pg_catalog.pg_tables WHERE schemaname||'.'||tablename = %s"
91    cur.execute(sql, [args.parent])
92    result = cur.fetchone()
93    if result == None:
94        print("Given parent table ("+args.parent+") does not exist")
95        sys.exit(2)
96    quoted_parent_table = "\"" + result[0] + "\".\"" + result[1] + "\""
97    cur.close()
98    return quoted_parent_table
99
100
101def print_version():
102    print(partman_version)
103    sys.exit()
104
105
106if __name__ == "__main__":
107
108    if args.version:
109        print_version()
110
111    if args.parent == None:
112        print("-p/--parent option is required")
113        sys.exit(2)
114
115    if args.parent.find(".") < 0:
116        print("Parent table must be schema qualified")
117        sys.exit(2)
118
119    if args.drop_constraints and args.add_constraints:
120        print("Can only set one or the other of --drop_constraints (-d) and --add_constraints (-a)")
121        sys.exit(2)
122
123    if (args.drop_constraints == False) and (args.add_constraints == False):
124        print("Must set one of --drop_constraints (-d) or --add_constraints (-a)")
125        sys.exit(2)
126
127    main_conn = create_conn()
128    partman_schema = get_partman_schema(main_conn)
129    quoted_parent_table = get_quoted_parent_table(main_conn)
130    child_list = get_children(main_conn, partman_schema)
131    config_values = get_config_values(main_conn, partman_schema)
132    premake = int(config_values[0])
133    optimize_constraint = int(config_values[1])
134    if args.add_constraints:
135        # Remove tables from the list of child tables that shouldn't have constraints yet
136        for x in range(optimize_constraint + premake + 1):
137            child_list.pop()
138
139    if args.jobs == 0:
140        for c in child_list:
141            if args.drop_constraints:
142               drop_proc(c[0], partman_schema)
143            if args.add_constraints:
144               apply_proc(c[0], partman_schema)
145            if args.wait > 0:
146                time.sleep(args.wait)
147    else:
148        child_list.reverse()
149        while len(child_list) > 0:
150            if not args.quiet:
151                print("Jobs left in queue: " + str(len(child_list)))
152            if len(child_list) < args.jobs:
153                args.jobs = len(child_list)
154            processlist = []
155            for num in range(0, args.jobs):
156                c = child_list.pop()
157                if args.drop_constraints:
158                    p = Process(target=drop_proc, args=(c[0], partman_schema))
159                if args.add_constraints:
160                    p = Process(target=apply_proc, args=(c[0], partman_schema))
161                p.start()
162                processlist.append(p)
163            for j in processlist:
164                j.join()
165            if args.wait > 0:
166                time.sleep(args.wait)
167
168    sql = 'ANALYZE ' + quoted_parent_table
169    main_cur = main_conn.cursor()
170    if not args.quiet:
171        print(main_cur.mogrify(sql))
172    if not args.dryrun:
173        main_cur.execute(sql)
174
175    close_conn(main_conn)
176
177