#!/usr/local/bin/python3.8
"""Drop or re-apply the additional constraints pg_partman manages on child tables.

The script runs in one of two modes, selected on the command line:

* ``--drop_constraints``: call ``drop_constraints()`` for every child table.
* ``--add_constraints``: call ``apply_constraints()`` for every child table
  except the newest ``optimize_constraint + premake + 1`` children.

Each child table is handled on its own autocommit connection, either one
after another or in batches of ``--jobs`` parallel processes. The parent
table is analyzed once all children have been processed.
"""

import argparse
import sys
import time
from multiprocessing import Process

import psycopg2

partman_version = "2.2.0"

parser = argparse.ArgumentParser(description="Script for reapplying additional constraints managed by pg_partman on child tables. See docs for additional info on this special constraint management. Script runs in two distinct modes: 1) Drop all constraints 2) Apply all constraints. Typical usage would be to run the drop mode, edit the data, then run apply mode to re-create all constraints on a partition set.")
parser.add_argument('-p', '--parent', help="Parent table of an already created partition set. (Required)")
parser.add_argument('-c', '--connection', default="host=", help="""Connection string for use by psycopg. Defaults to "host=" (local socket).""")
parser.add_argument('-d', '--drop_constraints', action="store_true", help="Drop all constraints managed by pg_partman. Drops constraints on all child tables including current & future.")
parser.add_argument('-a', '--add_constraints', action="store_true", help="Apply configured constraints to all child tables older than the optimize_constraint value.")
parser.add_argument('-j', '--jobs', type=int, default=0, help="Use the python multiprocessing library to drop or apply constraints in parallel. Value for -j is number of simultaneous jobs to run. Note that this is per table, not per constraint. Be very careful setting this option if load is a concern on your systems.")
parser.add_argument('-w', '--wait', type=float, default=0, help="Wait the given number of seconds after a table has had its constraints dropped or applied before moving on to the next. When used with -j, this will set the pause between the batches of parallel jobs instead.")
parser.add_argument('--dryrun', action="store_true", help="Show what the script will do without actually running it against the database. Highly recommend reviewing this before running.")
parser.add_argument('-q', '--quiet', action="store_true", help="Turn off all output.")
parser.add_argument('--version', action="store_true", help="Print out the minimum version of pg_partman this script is meant to work with. The version of pg_partman installed may be greater than this.")
# Parsed at module level on purpose: apply_proc/drop_proc read this global, and
# when multiprocessing uses the "spawn" start method the worker processes
# re-import this module instead of inheriting the parent's globals.
args = parser.parse_args()


def _quote_ident(name):
    """Return ``name`` as a double-quoted SQL identifier.

    Embedded double quotes are doubled so that a catalog name containing
    ``"`` still yields a valid identifier.
    """
    return '"' + name.replace('"', '""') + '"'


def _print_sql(cur, sql, params=None):
    """Print the statement exactly as it would be sent to the server.

    ``cursor.mogrify`` returns ``bytes`` on Python 3; it is decoded here so
    the output is readable SQL rather than a ``b'...'`` repr.
    """
    rendered = cur.mogrify(sql, params)
    if isinstance(rendered, bytes):
        rendered = rendered.decode("utf-8", "replace")
    print(rendered)


def _run_on_new_connection(sql, params):
    """Print and/or execute one statement on a dedicated autocommit connection.

    The statement is printed unless ``--quiet`` is set and executed unless
    ``--dryrun`` is set. The connection is always closed, even on error.
    """
    conn = create_conn()
    try:
        conn.autocommit = True
        with conn.cursor() as cur:
            if not args.quiet:
                _print_sql(cur, sql, params)
            if not args.dryrun:
                cur.execute(sql, params)
    finally:
        close_conn(conn)


def apply_proc(child_table, partman_schema):
    """Call pg_partman's ``apply_constraints()`` for one child table.

    Args:
        child_table: schema-qualified name of the child table.
        partman_schema: already-quoted schema pg_partman is installed in.
    """
    sql = "SELECT " + partman_schema + ".apply_constraints(%s, %s, %s, %s, %s)"
    # The last argument turns on pg_partman's debug output unless --quiet.
    # NOTE(review): the literal False/None presumably map to the function's
    # analyze and job-id parameters -- confirm against the installed version.
    debug = not args.quiet
    _run_on_new_connection(sql, [args.parent, child_table, False, None, debug])


def create_conn():
    """Open a new database connection using the ``--connection`` string."""
    return psycopg2.connect(args.connection)


def close_conn(conn):
    """Close the given database connection."""
    conn.close()


def drop_proc(child_table, partman_schema):
    """Call pg_partman's ``drop_constraints()`` for one child table.

    Args:
        child_table: schema-qualified name of the child table.
        partman_schema: already-quoted schema pg_partman is installed in.
    """
    sql = "SELECT " + partman_schema + ".drop_constraints(%s, %s, %s)"
    debug = not args.quiet
    _run_on_new_connection(sql, [args.parent, child_table, debug])


def get_children(conn, partman_schema):
    """Return the child tables of ``--parent`` in ascending partition order.

    Returns:
        A list of one-element rows, each holding a ``schema.table`` string.
    """
    sql = "SELECT partition_schemaname||'.'||partition_tablename FROM " + partman_schema + ".show_partitions(%s, %s)"
    with conn.cursor() as cur:
        cur.execute(sql, [args.parent, 'ASC'])
        return cur.fetchall()


def get_partman_schema(conn):
    """Return the quoted name of the schema the pg_partman extension lives in.

    Exits with status 2 when the extension is not installed in the database.
    """
    sql = "SELECT nspname FROM pg_catalog.pg_namespace n, pg_catalog.pg_extension e WHERE e.extname = 'pg_partman' AND e.extnamespace = n.oid"
    with conn.cursor() as cur:
        cur.execute(sql)
        row = cur.fetchone()
    if row is None:
        print("pg_partman extension is not installed in this database")
        sys.exit(2)
    return _quote_ident(row[0])


def get_config_values(conn, partman_schema):
    """Return ``(premake, optimize_constraint)`` for ``--parent``.

    Returns:
        The matching ``part_config`` row, or ``None`` when the parent table
        has no configuration row.
    """
    # [0] = premake, [1] = optimize_constraint
    sql = "SELECT premake, optimize_constraint FROM " + partman_schema + ".part_config WHERE parent_table = %s"
    with conn.cursor() as cur:
        cur.execute(sql, [args.parent])
        return cur.fetchone()


def get_quoted_parent_table(conn):
    """Return ``--parent`` as a quoted ``"schema"."table"`` identifier.

    Exits with status 2 when no such table is listed in ``pg_tables``.
    """
    sql = "SELECT schemaname, tablename FROM pg_catalog.pg_tables WHERE schemaname||'.'||tablename = %s"
    with conn.cursor() as cur:
        cur.execute(sql, [args.parent])
        result = cur.fetchone()
    if result is None:
        print("Given parent table ("+args.parent+") does not exist")
        sys.exit(2)
    return _quote_ident(result[0]) + "." + _quote_ident(result[1])


def print_version():
    """Print the minimum supported pg_partman version and exit."""
    print(partman_version)
    sys.exit()


def _run_serial(worker, child_list, partman_schema):
    """Process every child table one after another in this process."""
    for child in child_list:
        worker(child[0], partman_schema)
        if args.wait > 0:
            time.sleep(args.wait)


def _run_parallel(worker, child_list, partman_schema):
    """Process child tables in batches of ``--jobs`` worker processes.

    Returns:
        The names of child tables whose worker process exited non-zero.
    """
    failed_tables = []
    total = len(child_list)
    for start in range(0, total, args.jobs):
        if not args.quiet:
            print("Jobs left in queue: " + str(total - start))
        running = []
        for child in child_list[start:start + args.jobs]:
            proc = Process(target=worker, args=(child[0], partman_schema))
            proc.start()
            running.append((child[0], proc))
        for table_name, proc in running:
            proc.join()
            # A worker that raised exits non-zero; remember it so the failure
            # is not lost once the batch is done.
            if proc.exitcode != 0:
                failed_tables.append(table_name)
        if args.wait > 0:
            time.sleep(args.wait)
    return failed_tables


def main():
    """Validate the options, process every child table, then ANALYZE the parent."""
    if args.version:
        print_version()

    if args.parent is None:
        print("-p/--parent option is required")
        sys.exit(2)

    if "." not in args.parent:
        print("Parent table must be schema qualified")
        sys.exit(2)

    if args.drop_constraints and args.add_constraints:
        print("Can only set one or the other of --drop_constraints (-d) and --add_constraints (-a)")
        sys.exit(2)

    if not args.drop_constraints and not args.add_constraints:
        print("Must set one of --drop_constraints (-d) or --add_constraints (-a)")
        sys.exit(2)

    # A negative job count would leave the batching loop with nothing to do
    # per pass, so it could never finish.
    if args.jobs < 0:
        print("-j/--jobs must be zero or a positive number")
        sys.exit(2)

    worker = drop_proc if args.drop_constraints else apply_proc
    failed_tables = []

    main_conn = create_conn()
    try:
        # Autocommit keeps this connection from sitting in an open transaction
        # while the workers run, and makes the final ANALYZE take effect:
        # without it, closing the connection would roll the ANALYZE back.
        main_conn.autocommit = True

        partman_schema = get_partman_schema(main_conn)
        quoted_parent_table = get_quoted_parent_table(main_conn)
        child_list = get_children(main_conn, partman_schema)
        config_values = get_config_values(main_conn, partman_schema)
        if config_values is None:
            print("Given parent table ("+args.parent+") is not configured in pg_partman")
            sys.exit(2)
        premake = int(config_values[0])
        optimize_constraint = int(config_values[1])

        if args.add_constraints:
            # Remove tables from the list of child tables that shouldn't have
            # constraints yet. Slice deletion copes with partition sets that
            # have fewer children than the number to skip.
            skip = optimize_constraint + premake + 1
            if skip > 0:
                del child_list[-skip:]

        if args.jobs == 0:
            _run_serial(worker, child_list, partman_schema)
        else:
            failed_tables = _run_parallel(worker, child_list, partman_schema)

        sql = 'ANALYZE ' + quoted_parent_table
        with main_conn.cursor() as main_cur:
            if not args.quiet:
                _print_sql(main_cur, sql)
            if not args.dryrun:
                main_cur.execute(sql)
    finally:
        close_conn(main_conn)

    if failed_tables:
        print("Constraint maintenance failed for: " + ", ".join(failed_tables), file=sys.stderr)
        sys.exit(1)


if __name__ == "__main__":
    main()