/*
 * Move data into child partitions in batches, committing after each batch.
 *
 * Builds a single call to @extschema@.partition_data_<type>() (where <type> is
 * the general control type reported by check_control_type(), forced to 'time'
 * for id-typed controls that have an epoch configured) and executes it in a
 * loop until a batch moves zero rows or p_batch batches have completed.
 *
 * Parameters:
 *   p_parent_table    Schema-qualified parent table. Must have a row in
 *                     part_config and exist in the system catalogs.
 *   p_interval        When not NULL, passed through as p_batch_interval.
 *   p_batch           Maximum number of successful batches to run. NULL or a
 *                     value <= 0 means keep going until a batch moves 0 rows.
 *   p_wait            Value handed to pg_sleep() after each committed batch.
 *   p_source_table    When not NULL, must be schema-qualified and exist in the
 *                     system catalogs; passed through as p_source_table.
 *   p_order           Passed through as p_order.
 *   p_lock_wait       Passed through as p_lock_wait.
 *   p_lock_wait_tries Number of consecutive lock-wait results (-1) tolerated
 *                     before an exception is raised.
 *   p_quiet           When true, suppresses the per-batch and total-rows
 *                     NOTICEs. The closing VACUUM ANALYZE reminder is always
 *                     raised.
 *   p_ignored_columns When not NULL, passed through as p_ignored_columns.
 *
 * Raises an exception when the parent has no part_config entry, when the
 * parent or source table cannot be found in the catalogs, or when more than
 * p_lock_wait_tries consecutive batches return -1.
 */
CREATE PROCEDURE @extschema@.partition_data_proc (p_parent_table text, p_interval text DEFAULT NULL, p_batch int DEFAULT NULL, p_wait int DEFAULT 1, p_source_table text DEFAULT NULL, p_order text DEFAULT 'ASC', p_lock_wait int DEFAULT 0, p_lock_wait_tries int DEFAULT 10, p_quiet boolean DEFAULT false, p_ignored_columns text[] DEFAULT NULL)
    LANGUAGE plpgsql
    AS $$
DECLARE

v_adv_lock              boolean;
v_batch_count           int := 0;
v_control               text;
v_control_type          text;
v_epoch                 text;
v_is_autovac_off        boolean := false; -- only referenced by the disabled autovacuum blocks below
v_lockwait_count        int := 0;
v_parent_schema         text;
v_parent_tablename      text;
v_rows_moved            bigint;
v_source_schema         text;
v_source_tablename      text;
v_sql                   text;
v_total                 bigint := 0;

BEGIN

-- NOTE(review): this is a transaction-level advisory lock. Per the PostgreSQL
-- documentation such locks are released at end of transaction, so it appears to be
-- dropped at the first COMMIT in the loop below and later batches would run
-- unguarded. Confirm whether that is intended.
v_adv_lock := pg_try_advisory_xact_lock(hashtext('pg_partman partition_data_proc'), hashtext(p_parent_table));
IF NOT v_adv_lock THEN
    RAISE NOTICE 'Partman partition_data_proc already running for given parent table: %.', p_parent_table;
    RETURN;
END IF;

SELECT control, epoch
INTO v_control, v_epoch
FROM @extschema@.part_config
WHERE parent_table = p_parent_table;
IF NOT FOUND THEN
    RAISE EXCEPTION 'ERROR: No entry in part_config found for given table: %', p_parent_table;
END IF;

-- Resolve the parent against the catalogs; this also rejects names that are not schema qualified
SELECT n.nspname, c.relname INTO v_parent_schema, v_parent_tablename
FROM pg_catalog.pg_class c
JOIN pg_catalog.pg_namespace n ON c.relnamespace = n.oid
WHERE n.nspname = split_part(p_parent_table, '.', 1)::name
AND c.relname = split_part(p_parent_table, '.', 2)::name;
IF v_parent_tablename IS NULL THEN
    RAISE EXCEPTION 'Unable to find given parent table in system catalogs. Ensure it is schema qualified: %', p_parent_table;
END IF;

IF p_source_table IS NOT NULL THEN
    SELECT n.nspname, c.relname INTO v_source_schema, v_source_tablename
    FROM pg_catalog.pg_class c
    JOIN pg_catalog.pg_namespace n ON c.relnamespace = n.oid
    WHERE n.nspname = split_part(p_source_table, '.', 1)::name
    AND c.relname = split_part(p_source_table, '.', 2)::name;
    IF v_source_tablename IS NULL THEN
        RAISE EXCEPTION 'Unable to find given source table in system catalogs. Ensure it is schema qualified: %', p_source_table;
    END IF;
END IF;

SELECT general_type INTO v_control_type FROM @extschema@.check_control_type(v_parent_schema, v_parent_tablename, v_control);

-- An id control with an epoch configured is routed to the time-based function
IF v_control_type = 'id' AND v_epoch <> 'none' THEN
    v_control_type := 'time';
END IF;

/*
-- Currently no way to catch exception and reset autovac settings back to normal. Until I can do that, leaving this feature out for now
-- Leaving the functions to turn off/reset in to let people do that manually if desired
IF p_autovacuum_on = false THEN         -- Add this parameter back to definition when this is working
    -- Turn off autovac for parent, source table if set, and all child tables
    v_is_autovac_off := @extschema@.autovacuum_off(v_parent_schema, v_parent_tablename, v_source_schema, v_source_tablename);
    COMMIT;
END IF;
*/

-- Build the per-batch call once; optional arguments are appended only when supplied
v_sql := format('SELECT %I.partition_data_%s (%L, p_lock_wait := %L, p_order := %L, p_analyze := false'
        , '@extschema@', v_control_type, p_parent_table, p_lock_wait, p_order);
IF p_interval IS NOT NULL THEN
    v_sql := v_sql || format(', p_batch_interval := %L', p_interval);
END IF;
IF p_source_table IS NOT NULL THEN
    v_sql := v_sql || format(', p_source_table := %L', p_source_table);
END IF;
IF p_ignored_columns IS NOT NULL THEN
    v_sql := v_sql || format(', p_ignored_columns := %L', p_ignored_columns);
END IF;
v_sql := v_sql || ')';
RAISE DEBUG 'partition_data sql: %', v_sql;

LOOP
    EXECUTE v_sql INTO v_rows_moved;
    -- If lock wait timeout, do not increment the counter
    IF v_rows_moved != -1 THEN
        v_batch_count := v_batch_count + 1;
        v_total := v_total + v_rows_moved;
        v_lockwait_count := 0;
    ELSE
        v_lockwait_count := v_lockwait_count + 1;
        IF v_lockwait_count > p_lock_wait_tries THEN
            RAISE EXCEPTION 'Quitting due to inability to get lock on next batch of rows to be moved';
        END IF;
    END IF;
    IF NOT p_quiet THEN
        IF v_rows_moved > 0 THEN
            RAISE NOTICE 'Batch: %, Rows moved: %', v_batch_count, v_rows_moved;
        ELSIF v_rows_moved = -1 THEN
            RAISE NOTICE 'Unable to obtain row locks for data to be moved. Trying again...';
        END IF;
    END IF;
    -- If no rows left or given batch argument limit is reached
    IF v_rows_moved = 0 OR (p_batch > 0 AND v_batch_count >= p_batch) THEN
        EXIT;
    END IF;
    -- Make each batch durable on its own before pausing for the next one
    COMMIT;
    PERFORM pg_sleep(p_wait);
    -- Arguments are listed in the same order as the labels in the message
    RAISE DEBUG 'v_rows_moved: %, v_batch_count: %, v_total: %, v_lockwait_count: %, p_wait: %', v_rows_moved, v_batch_count, v_total, v_lockwait_count, p_wait;
END LOOP;

/*
IF v_is_autovac_off = true THEN
    -- Reset autovac back to default if it was turned off by this procedure
    PERFORM @extschema@.autovacuum_reset(v_parent_schema, v_parent_tablename, v_source_schema, v_source_tablename);
    COMMIT;
END IF;
*/

IF NOT p_quiet THEN
    RAISE NOTICE 'Total rows moved: %', v_total;
END IF;
RAISE NOTICE 'Ensure to VACUUM ANALYZE the parent (and source table if used) after partitioning data';

/* Leaving here until I can figure out what's wrong with procedures and exception handling
EXCEPTION
    WHEN QUERY_CANCELED THEN
        ROLLBACK;
        -- Reset autovac back to default if it was turned off by this procedure
        IF v_is_autovac_off = true THEN
            PERFORM @extschema@.autovacuum_reset(v_parent_schema, v_parent_tablename, v_source_schema, v_source_tablename);
        END IF;
        RAISE EXCEPTION '%', SQLERRM;
    WHEN OTHERS THEN
        ROLLBACK;
        -- Reset autovac back to default if it was turned off by this procedure
        IF v_is_autovac_off = true THEN
            PERFORM @extschema@.autovacuum_reset(v_parent_schema, v_parent_tablename, v_source_schema, v_source_tablename);
        END IF;
        RAISE EXCEPTION '%', SQLERRM;
*/
END;
$$;