-- partition_data_proc
--
-- Moves data for a partition set in batches, committing after every batch.
-- Each batch is a single call to @extschema@.partition_data_<control type>(), where the control
-- type comes from check_control_type() for the parent's control column (an 'id' control with an
-- epoch setting other than 'none' is treated as 'time').
--
-- Parameters:
--   p_parent_table    Schema-qualified parent table. Must have a row in part_config and exist in
--                     the system catalogs, otherwise an exception is raised.
--   p_interval        When not NULL, passed to the batch function as p_batch_interval.
--   p_batch           Maximum number of batches to run. When NULL or not greater than zero, the
--                     loop runs until a batch reports 0 rows moved.
--   p_wait            Value handed to pg_sleep() between batches.
--   p_source_table    When not NULL, must be schema-qualified and exist in the system catalogs;
--                     passed to the batch function as p_source_table.
--   p_order           Passed to the batch function as p_order.
--   p_lock_wait       Passed to the batch function as p_lock_wait.
--   p_lock_wait_tries An exception is raised once the number of consecutive batches returning -1
--                     (lock wait timeout) exceeds this value.
--   p_quiet           When true, suppresses the per-batch and total-rows notices. The final
--                     VACUUM ANALYZE reminder is always emitted.
--   p_ignored_columns When not NULL, passed to the batch function as p_ignored_columns.
--
-- Concurrency: a transaction-level advisory lock keyed on p_parent_table prevents two sessions from
-- running this procedure for the same parent at once. Because every COMMIT releases that lock, it
-- is re-acquired immediately after each COMMIT.
CREATE PROCEDURE @extschema@.partition_data_proc (p_parent_table text, p_interval text DEFAULT NULL, p_batch int DEFAULT NULL, p_wait int DEFAULT 1, p_source_table text DEFAULT NULL, p_order text DEFAULT 'ASC', p_lock_wait int DEFAULT 0, p_lock_wait_tries int DEFAULT 10, p_quiet boolean DEFAULT false, p_ignored_columns text[] DEFAULT NULL)
    LANGUAGE plpgsql
    AS $$
DECLARE

v_adv_lock          boolean;
v_batch_count       int := 0;
v_control           text;
v_control_type      text;
v_epoch             text;
v_is_autovac_off    boolean := false;   -- only referenced by the disabled autovacuum code below
v_lockwait_count    int := 0;
v_parent_schema     text;
v_parent_tablename  text;
v_rows_moved        bigint;
v_source_schema     text;
v_source_tablename  text;
v_sql               text;
v_total             bigint := 0;

BEGIN

-- Only one session at a time may run this procedure for a given parent table.
v_adv_lock := pg_try_advisory_xact_lock(hashtext('pg_partman partition_data_proc'), hashtext(p_parent_table));
IF NOT v_adv_lock THEN
    RAISE NOTICE 'Partman partition_data_proc already running for given parent table: %.', p_parent_table;
    RETURN;
END IF;

SELECT control, epoch
INTO v_control, v_epoch
FROM @extschema@.part_config
WHERE parent_table = p_parent_table;
IF NOT FOUND THEN
    RAISE EXCEPTION 'ERROR: No entry in part_config found for given table: %', p_parent_table;
END IF;

-- Resolve the parent table through the catalogs; this also verifies it is schema qualified.
SELECT n.nspname, c.relname INTO v_parent_schema, v_parent_tablename
FROM pg_catalog.pg_class c
JOIN pg_catalog.pg_namespace n ON c.relnamespace = n.oid
WHERE n.nspname = split_part(p_parent_table, '.', 1)::name
AND c.relname = split_part(p_parent_table, '.', 2)::name;
    IF v_parent_tablename IS NULL THEN
        RAISE EXCEPTION 'Unable to find given parent table in system catalogs. Ensure it is schema qualified: %', p_parent_table;
    END IF;

IF p_source_table IS NOT NULL THEN
    SELECT n.nspname, c.relname INTO v_source_schema, v_source_tablename
    FROM pg_catalog.pg_class c
    JOIN pg_catalog.pg_namespace n ON c.relnamespace = n.oid
    WHERE n.nspname = split_part(p_source_table, '.', 1)::name
    AND c.relname = split_part(p_source_table, '.', 2)::name;
        IF v_source_tablename IS NULL THEN
            RAISE EXCEPTION 'Unable to find given source table in system catalogs. Ensure it is schema qualified: %', p_source_table;
        END IF;
END IF;

SELECT general_type INTO v_control_type FROM @extschema@.check_control_type(v_parent_schema, v_parent_tablename, v_control);

-- An id control column with an epoch setting is handled by the time-based batch function.
IF v_control_type = 'id' AND v_epoch <> 'none' THEN
        v_control_type := 'time';
END IF;

/*
-- Currently no way to catch exception and reset autovac settings back to normal. Until I can do that, leaving this feature out for now
-- Leaving the functions to turn off/reset in to let people do that manually if desired
IF p_autovacuum_on = false THEN         -- Add this parameter back to definition when this is working
    -- Turn off autovac for parent, source table if set, and all child tables
    v_is_autovac_off := @extschema@.autovacuum_off(v_parent_schema, v_parent_tablename, v_source_schema, v_source_tablename);
    COMMIT;
END IF;
*/

-- Build the per-batch statement once; optional arguments are appended only when supplied so the
-- batch function's own defaults apply otherwise.
v_sql := format('SELECT %I.partition_data_%s (%L, p_lock_wait := %L, p_order := %L, p_analyze := false'
        , '@extschema@', v_control_type, p_parent_table, p_lock_wait, p_order);
IF p_interval IS NOT NULL THEN
    v_sql := v_sql || format(', p_batch_interval := %L', p_interval);
END IF;
IF p_source_table IS NOT NULL THEN
    v_sql := v_sql || format(', p_source_table := %L', p_source_table);
END IF;
IF p_ignored_columns IS NOT NULL THEN
    v_sql := v_sql || format(', p_ignored_columns := %L', p_ignored_columns);
END IF;
v_sql := v_sql || ')';
RAISE DEBUG 'partition_data sql: %', v_sql;

LOOP
    EXECUTE v_sql INTO v_rows_moved;
    -- If lock wait timeout, do not increment the counter
    IF v_rows_moved != -1 THEN
        v_batch_count := v_batch_count + 1;
        v_total := v_total + v_rows_moved;
        v_lockwait_count := 0;
    ELSE
        v_lockwait_count := v_lockwait_count + 1;
        IF v_lockwait_count > p_lock_wait_tries THEN
            RAISE EXCEPTION 'Quitting due to inability to get lock on next batch of rows to be moved';
        END IF;
    END IF;
    IF p_quiet = false THEN
        IF v_rows_moved > 0 THEN
            RAISE NOTICE 'Batch: %, Rows moved: %', v_batch_count, v_rows_moved;
        ELSIF v_rows_moved = -1 THEN
            RAISE NOTICE 'Unable to obtain row locks for data to be moved. Trying again...';
        END IF;
    END IF;
    -- If no rows left or given batch argument limit is reached
    IF v_rows_moved = 0 OR (p_batch > 0 AND v_batch_count >= p_batch) THEN
        EXIT;
    END IF;
    COMMIT;
    -- COMMIT ended the transaction and with it the transaction-level advisory lock. Take it again
    -- right away so the single-runner guarantee holds for the sleep and the next batch too. It is
    -- kept transaction-level on purpose: it is released automatically if a later batch errors out.
    v_adv_lock := pg_try_advisory_xact_lock(hashtext('pg_partman partition_data_proc'), hashtext(p_parent_table));
    IF NOT v_adv_lock THEN
        -- Another session took the lock in the gap after COMMIT; let it carry on and stop here.
        RAISE NOTICE 'Partman partition_data_proc already running for given parent table: %.', p_parent_table;
        EXIT;
    END IF;
    PERFORM pg_sleep(p_wait);
    RAISE DEBUG 'v_rows_moved: %, v_batch_count: %, v_total: %, v_lockwait_count: %, p_wait: %', v_rows_moved, v_batch_count, v_total, v_lockwait_count, p_wait;
END LOOP;

/*
IF v_is_autovac_off = true THEN
    -- Reset autovac back to default if it was turned off by this procedure
    PERFORM @extschema@.autovacuum_reset(v_parent_schema, v_parent_tablename, v_source_schema, v_source_tablename);
    COMMIT;
END IF;
*/

IF p_quiet = false THEN
    RAISE NOTICE 'Total rows moved: %', v_total;
END IF;
RAISE NOTICE 'Ensure to VACUUM ANALYZE the parent (and source table if used) after partitioning data';

/* Leaving here until I can figure out what's wrong with procedures and exception handling
EXCEPTION
    WHEN QUERY_CANCELED THEN
        ROLLBACK;
        -- Reset autovac back to default if it was turned off by this procedure
        IF v_is_autovac_off = true THEN
            PERFORM @extschema@.autovacuum_reset(v_parent_schema, v_parent_tablename, v_source_schema, v_source_tablename);
        END IF;
        RAISE EXCEPTION '%', SQLERRM;
    WHEN OTHERS THEN
        ROLLBACK;
        -- Reset autovac back to default if it was turned off by this procedure
        IF v_is_autovac_off = true THEN
            PERFORM @extschema@.autovacuum_reset(v_parent_schema, v_parent_tablename, v_source_schema, v_source_tablename);
        END IF;
        RAISE EXCEPTION '%', SQLERRM;
*/
END;
$$;
149
150