#!/usr/local/bin/python3.8

"""Reapply a partition set parent's indexes to each of its child tables.

See --help for the option descriptions. The pg_partman extension must be
installed in the target database because its part_config table and its
check_name_length() / show_partitions() functions are used.
"""

import argparse
import re
import sys
import time
from multiprocessing import Process

import psycopg2

partman_version = "4.1.0"

parser = argparse.ArgumentParser(description="Script for reapplying indexes on child tables in a partition set to match the parent table. Any indexes that currently exist on the children and match the definition on the parent will be left as is. There is an option to recreate matching as well indexes if desired, as well as the primary key. Indexes that do not exist on the parent will be dropped. Commits are done after each index is dropped/created to help prevent long running transactions & locks.", epilog="NOTE: New index names are made based off the child table name & columns used, so their naming may differ from the name given on the parent. This is done to allow the tool to account for long or duplicate index names. If an index name would be duplicated, an incremental counter is added on to the end of the index name to allow it to be created. Use the --dryrun option first to see what it will do and which names may cause dupes to be handled like this.")
parser.add_argument('-p', '--parent', help="Parent table of an already created partition set. (Required)")
parser.add_argument('-c', '--connection', default="host=", help="""Connection string for use by psycopg. Defaults to "host=" (local socket).""")
parser.add_argument('--concurrent', action="store_true", help="Create indexes with the CONCURRENTLY option. Note this does not work on primary keys when --primary is given.")
parser.add_argument('--drop_concurrent', action="store_true", help="If an index is dropped (because it doesn't exist on the parent or because you set them to be recreated), do it concurrently. Note this does not work on primary keys when --primary is given.")
parser.add_argument('-R', '--recreate_all', action="store_true", help="By default, if an index exists on a child and matches the parent, it will not be touched. Setting this option will force all child indexes to be dropped & recreated. Will obey the --concurrent & --drop_concurrent options if given. Will not recreate primary keys unless --primary option is also given.")
parser.add_argument('--primary', action="store_true", help="By default the primary key is not recreated. Set this option if that is needed. Note this will cause an exclusive lock on the child table.")
parser.add_argument('-j', '--jobs', type=int, default=0, help="Use the python multiprocessing library to recreate indexes in parallel. Value for -j is number of simultaneous jobs to run. Note that this is per table, not per index. Be very careful setting this option if load is a concern on your systems.")
parser.add_argument('-w', '--wait', type=float, default=0, help="Wait the given number of seconds after indexes have finished being created on a table before moving on to the next. When used with -j, this will set the pause between the batches of parallel jobs instead.")
parser.add_argument('--dryrun', action="store_true", help="Show what the script will do without actually running it against the database. Highly recommend reviewing this before running. Note that if multiple indexes would get the same default name, the duplicated name will show in the dryrun (because the index doesn't exist in the catalog to check for it). When the real thing is run, the duplicated names will be handled as stated in NOTE at the end of --help.")
parser.add_argument('-q', '--quiet', action="store_true", help="Turn off all output.")
parser.add_argument('--nonpartman', action="store_true", help="If the partition set you are running this on is not managed by pg_partman, set this flag otherwise this script may not work. Note that the pg_partman extension is still required to be installed for this to work since it uses certain internal functions. When this is set the order that the tables are reindexed is alphabetical instead of logical.")
parser.add_argument('--version', action="store_true", help="Print out the minimum version of pg_partman this script is meant to work with. The version of pg_partman installed may be greater than this.")
args = parser.parse_args()

# Captures the part of a pg_get_indexdef() result from "USING" onward. Parent
# and child definitions are compared on this part only, because the index name
# and the table name in front of it always differ between parent and child.
_INDEX_DEF_REGEX = re.compile(r'(?P<index_def>USING .*)')


def _quote_ident(name):
    """Return *name* as a double-quoted SQL identifier, doubling embedded quotes."""
    return '"' + name.replace('"', '""') + '"'


def _index_def(statement):
    """Return the "USING ..." tail of an index definition, used for comparisons.

    Falls back to the whole statement when no "USING" clause is found, so the
    comparison degrades to a full-text match instead of raising.
    """
    match = _INDEX_DEF_REGEX.search(statement)
    return match.group('index_def') if match else statement


def _index_def_matches_any(statement, index_list):
    """Return True if any row of *index_list* (definition in column 0) matches *statement*."""
    target = _index_def(statement)
    return any(_index_def(row[0]) == target for row in index_list)


def check_compatibility(conn, partman_schema):
    """Exit with status 2 if the server or the partition set is unsupported.

    Requires PostgreSQL 9.4+. Native partition sets on PG11+ are rejected
    because native index inheritance should be used there instead.
    """
    cur = conn.cursor()

    sql = """SELECT current_setting('server_version_num')::int"""
    cur.execute(sql)
    pg_version = int(cur.fetchone()[0])

    if pg_version < 90400:
        print("ERROR: This script requires PostgreSQL minimum version of 9.4.0")
        sys.exit(2)

    sql = "SELECT partition_type FROM " + partman_schema + ".part_config WHERE parent_table = %s"
    cur.execute(sql, [args.parent])
    result = cur.fetchone()
    # A set not managed by pg_partman (--nonpartman) has no part_config row.
    partition_type = result[0] if result is not None else None

    if pg_version >= 110000 and partition_type == "native":
        print("This script cannot currently work with native partition sets in PG11+. Please use native index inheritance methods if possible.")
        cur.close()
        close_conn(conn)
        sys.exit(2)

    cur.close()


def create_conn():
    """Open a new psycopg2 connection using the --connection string."""
    conn = psycopg2.connect(args.connection)
    return conn


def _check_name_length(conn, partman_schema, object_name, suffix):
    """Return pg_partman's check_name_length() result for *object_name* with *suffix*.

    Both values are sent as query parameters rather than spliced into the SQL
    text, so names containing quotes cannot break the statement.
    """
    cur = conn.cursor()
    sql = "SELECT " + partman_schema + ".check_name_length(%s, p_suffix := %s)"
    cur.execute(sql, [object_name, suffix])
    result = cur.fetchone()[0]
    cur.close()
    return result


def _relation_exists(conn, schemaname, relname):
    """Return True if a relation named *relname* already exists in *schemaname*."""
    cur = conn.cursor()
    sql = "SELECT count(*) FROM pg_class c JOIN pg_namespace n ON c.relnamespace = n.oid WHERE n.nspname = %s AND c.relname = %s"
    cur.execute(sql, [schemaname, relname])
    count = cur.fetchone()[0]
    cur.close()
    return count is not None and count > 0


def create_index(conn, partman_schema, child_schemaname, child_tablename, child_index_list, parent, parent_index_list):
    """Create on one child table the parent's indexes it does not already have.

    Args:
        conn: connection to run statements on (autocommit for CONCURRENTLY).
        partman_schema: quoted schema of the pg_partman extension.
        child_schemaname, child_tablename: the child table to index.
        child_index_list: rows from get_child_index_list(); column 0 is the
            index definition.
        parent: schema-qualified table the index definitions come from.
        parent_index_list: rows from get_parent_index_list():
            (definition, indisprimary, comma-separated key columns, index name).

    A parent index whose definition already exists on the child is skipped
    unless --recreate_all is set. The primary key is only created with
    --primary. New index names are built from the child table and key
    columns, with a numeric suffix added if the name is already taken.
    """
    cur = conn.cursor()
    sql = """SELECT schemaname, tablename FROM pg_catalog.pg_tables WHERE schemaname||'.'||tablename = %s"""
    cur.execute(sql, [parent])
    result = cur.fetchone()
    parent_schemaname = result[0]
    parent_tablename = result[1]
    cur.close()

    # Matches the "ON <parent table>" clause of a parent index definition.
    # Names are escaped because identifiers may contain regex metacharacters.
    parent_match_regex = re.compile(r" ON \"?%s\"?\.\"?%s\"? | ON \"?%s\"?"
                                    % (re.escape(parent_schemaname), re.escape(parent_tablename), re.escape(parent_tablename)))
    child_on_clause = " ON " + _quote_ident(child_schemaname) + "." + _quote_ident(child_tablename) + " "

    for i in parent_index_list:
        # Reset on every pass so a skipped parent index (e.g. the primary key
        # without --primary) never re-runs the previous index's statement.
        statement = None

        if not args.recreate_all and _index_def_matches_any(i[0], child_index_list):
            continue

        if i[1] and args.primary:
            column_names = i[2].split(",")
            index_name = _check_name_length(conn, partman_schema, child_tablename + "_" + "_".join(column_names), "_pk")
            quoted_column_names = ",".join(_quote_ident(col) for col in column_names)
            statement = ("ALTER TABLE " + _quote_ident(child_schemaname) + "." + _quote_ident(child_tablename)
                         + " ADD CONSTRAINT " + _quote_ident(index_name) + " PRIMARY KEY (" + quoted_column_names + ")")
        elif not i[1]:
            base_name = child_tablename
            # i[2] is NULL when no plain columns are indexed (expression-only index).
            if i[2] is not None:
                base_name += "_" + "_".join(i[2].split(","))
            index_name = _check_name_length(conn, partman_schema, base_name, "_idx")
            name_counter = 1
            while _relation_exists(conn, child_schemaname, index_name):
                index_name = _check_name_length(conn, partman_schema, base_name, "_idx" + str(name_counter))
                name_counter += 1
            # Split "CREATE [UNIQUE] INDEX <name> ON ..." at the INDEX keyword.
            # Only the first occurrence of the parent index name after it is
            # replaced, so a table or column containing that name is left intact.
            create_prefix, _, index_body = i[0].partition(" INDEX ")
            index_body = index_body.replace(i[3], index_name, 1)
            # Applies to both plain and UNIQUE indexes.
            keyword = " INDEX CONCURRENTLY " if args.concurrent else " INDEX "
            statement = create_prefix + keyword + index_body
            # Function replacement: child names must not be parsed as regex escapes.
            statement = parent_match_regex.sub(lambda _match: child_on_clause, statement)

        if statement is not None:
            cur = conn.cursor()
            if not args.quiet:
                print(cur.mogrify(statement))
            if not args.dryrun:
                cur.execute(statement)
            cur.close()


def close_conn(conn):
    """Close the given connection."""
    conn.close()


def drop_index(conn, child_schemaname, child_tablename, child_index_list, parent_index_list):
    """Drop child indexes that have no matching definition on the parent.

    With --recreate_all every child index is dropped. The primary key is only
    considered when --primary is given, and follows the same matching rule,
    so a matching primary key is kept rather than dropped and never rebuilt.
    Child rows come from get_child_index_list():
    (definition, indisprimary, index schema, index name, constraint name).
    """
    cur = conn.cursor()
    for d in child_index_list:
        if d[1]:
            if not args.primary:
                continue
            statement = ("ALTER TABLE " + _quote_ident(child_schemaname) + "." + _quote_ident(child_tablename)
                         + " DROP CONSTRAINT " + _quote_ident(d[4]))
        elif args.drop_concurrent:
            statement = "DROP INDEX CONCURRENTLY " + _quote_ident(d[2]) + "." + _quote_ident(d[3])
        else:
            statement = "DROP INDEX " + _quote_ident(d[2]) + "." + _quote_ident(d[3])

        # A child index matching a parent index is kept; dropping it would only
        # force it to be rebuilt.
        if not args.recreate_all and _index_def_matches_any(d[0], parent_index_list):
            continue

        if not args.quiet:
            print(cur.mogrify(statement))
        if not args.dryrun:
            cur.execute(statement)
    cur.close()


def get_children(conn, partman_schema):
    """Return (schema, table) rows for every child of --parent.

    Uses pg_partman's show_partitions() unless --nonpartman is set. In that
    case pg_inherits is read and rows are ordered alphabetically.
    """
    cur = conn.cursor()
    if not args.nonpartman:
        sql = "SELECT partition_schemaname, partition_tablename FROM " + partman_schema + ".show_partitions(%s)"
    else:
        sql = """
            WITH parent_info AS (
                SELECT c1.oid FROM pg_catalog.pg_class c1
                JOIN pg_catalog.pg_namespace n1 ON c1.relnamespace = n1.oid
                WHERE n1.nspname ||'.'|| c1.relname = %s
            )
            SELECT n.nspname::text, c.relname::text AS partition_name FROM
            pg_catalog.pg_inherits h
            JOIN pg_catalog.pg_class c ON c.oid = h.inhrelid
            JOIN pg_catalog.pg_namespace n ON c.relnamespace = n.oid
            JOIN parent_info pi ON h.inhparent = pi.oid
            ORDER BY 1,2"""
    cur.execute(sql, [args.parent])
    child_list = cur.fetchall()
    cur.close()
    return child_list


def get_child_index_list(conn, child_schemaname, child_tablename):
    """Return the indexes of one child table.

    Each row is (definition, indisprimary, index schema, index name,
    owning constraint name or NULL). The constraint join is limited to
    constraints of this table that own the index (primary/unique/exclusion).
    Without that, foreign keys elsewhere that reference the index would
    produce duplicate rows with the wrong constraint name.
    """
    cur = conn.cursor()
    sql = """
        WITH child_info AS (
            SELECT c1.oid FROM pg_catalog.pg_class c1
            JOIN pg_catalog.pg_namespace n1 ON c1.relnamespace = n1.oid
            WHERE n1.nspname = %s
            AND c1.relname = %s
        )
        SELECT pg_get_indexdef(indexrelid) AS statement
        , i.indisprimary
        , n.nspname AS index_schemaname
        , c.relname AS index_name
        , t.conname
        FROM pg_catalog.pg_index i
        JOIN pg_catalog.pg_class c ON i.indexrelid = c.oid
        JOIN pg_catalog.pg_namespace n ON c.relnamespace = n.oid
        LEFT JOIN pg_catalog.pg_constraint t ON c.oid = t.conindid
            AND t.conrelid = i.indrelid
            AND t.contype IN ('p', 'u', 'x')
        JOIN child_info ci ON i.indrelid = ci.oid """
    cur.execute(sql, [child_schemaname, child_tablename])
    child_index_list = cur.fetchall()
    cur.close()
    return child_index_list


def get_parent_index_list(conn, parent):
    """Return the valid indexes of the (schema-qualified) *parent* table.

    Each row is (definition, indisprimary, comma-separated key column names
    in index order, index name), ordered by definition. Expression columns
    have no attribute name and are omitted from the column list, which is
    NULL if the index has only expressions. WITH ORDINALITY (PG 9.4+)
    guarantees the key-column order, which the primary key column list
    depends on.
    """
    cur = conn.cursor()

    sql = """
        WITH parent_info AS (
            SELECT c1.oid FROM pg_catalog.pg_class c1
            JOIN pg_catalog.pg_namespace n1 ON c1.relnamespace = n1.oid
            WHERE n1.nspname||'.'||c1.relname = %s
        )
        SELECT
        pg_get_indexdef(indexrelid) AS statement
        , i.indisprimary
        , ( SELECT array_to_string(array_agg( a.attname ORDER by x.r ), ',')
            FROM pg_catalog.pg_attribute a
            JOIN unnest(i.indkey) WITH ORDINALITY AS x(k, r)
              ON a.attnum = x.k AND a.attrelid = i.indrelid
        ) AS indkey_names
        , c.relname AS index_name
        FROM pg_catalog.pg_index i
        JOIN pg_catalog.pg_class c ON i.indexrelid = c.oid
        JOIN parent_info pi ON i.indrelid = pi.oid
        WHERE i.indisvalid
        ORDER BY 1"""
    cur.execute(sql, [parent])
    parent_index_list = cur.fetchall()
    cur.close()
    return parent_index_list


def get_parent(conn, partman_schema):
    """Return the table whose indexes should be copied to the children.

    For a native set this is the template table recorded in part_config.
    Otherwise it is --parent itself.
    """
    cur = conn.cursor()

    sql = "SELECT template_table FROM " + partman_schema + ".part_config WHERE parent_table = %s AND partition_type = 'native'"
    cur.execute(sql, [args.parent])
    template_table = cur.fetchone()
    cur.close()

    if template_table is None:
        return args.parent
    return template_table[0]


def get_partman_schema(conn):
    """Return the quoted name of the schema pg_partman is installed in.

    Exits with status 2 if the extension is not installed.
    """
    cur = conn.cursor()
    sql = "SELECT nspname FROM pg_catalog.pg_namespace n, pg_catalog.pg_extension e WHERE e.extname = 'pg_partman' AND e.extnamespace = n.oid"
    cur.execute(sql)
    result = cur.fetchone()
    cur.close()
    if result is None:
        print("ERROR: The pg_partman extension does not appear to be installed in this database")
        close_conn(conn)
        sys.exit(2)
    return _quote_ident(result[0])


def print_version():
    """Print the minimum supported pg_partman version and exit."""
    print(partman_version)
    sys.exit()


def reindex_proc(child_schemaname, child_tablename, parent, parent_index_list, partman_schema):
    """Drop, recreate, and ANALYZE the indexes of one child table.

    Opens its own connection (it may run in a separate process) and always
    closes it, even if a statement fails.
    """
    conn = create_conn()
    try:
        conn.autocommit = True  # must be turned on to support CONCURRENTLY
        child_index_list = get_child_index_list(conn, child_schemaname, child_tablename)
        drop_index(conn, child_schemaname, child_tablename, child_index_list, parent_index_list)
        create_index(conn, partman_schema, child_schemaname, child_tablename, child_index_list, parent, parent_index_list)

        cur = conn.cursor()
        sql = "ANALYZE " + _quote_ident(child_schemaname) + "." + _quote_ident(child_tablename)
        if not args.quiet:
            print(cur.mogrify(sql))
        if not args.dryrun:
            cur.execute(sql)
        cur.close()
    finally:
        close_conn(conn)


if __name__ == "__main__":

    if args.version:
        print_version()

    if args.parent is None:
        print("-p/--parent option is required")
        sys.exit(2)

    if "." not in args.parent:
        print("ERROR: Parent table must be schema qualified")
        sys.exit(2)

    conn = create_conn()

    partman_schema = get_partman_schema(conn)
    check_compatibility(conn, partman_schema)
    parent = get_parent(conn, partman_schema)
    parent_index_list = get_parent_index_list(conn, parent)
    child_list = get_children(conn, partman_schema)
    close_conn(conn)

    # Anything below 1 job runs serially; a negative value used to loop forever.
    if args.jobs <= 0:
        for c in child_list:
            reindex_proc(c[0], c[1], parent, parent_index_list, partman_schema)
            if args.wait > 0:
                time.sleep(args.wait)
    else:
        failed_tables = []
        # Reverse so pop() hands out children in their original order.
        child_list.reverse()
        while child_list:
            if not args.quiet:
                print("Jobs left in queue: " + str(len(child_list)))
            processlist = []
            for _ in range(min(args.jobs, len(child_list))):
                c = child_list.pop()
                p = Process(target=reindex_proc, args=(c[0], c[1], parent, parent_index_list, partman_schema))
                p.start()
                processlist.append((c, p))
            for c, j in processlist:
                j.join()
                # A worker that raised exits non-zero; remember it instead of
                # silently reporting success.
                if j.exitcode != 0:
                    failed_tables.append(c[0] + "." + c[1])
            if args.wait > 0:
                time.sleep(args.wait)
        if failed_tables:
            print("ERROR: Reapplying indexes failed for: " + ", ".join(failed_tables))
            sys.exit(2)