-- Bug fix: Typos in partition_data_time() and partition_data_id() functions. Only ran into this if a lock wait was hit while trying to partition data.
2
3
/*
 * Populate the child table(s) of a time-based partition set with old data from the original parent.
 *
 * Parameters:
 *   p_parent_table   - parent table name; must match a part_config row whose type is
 *                      'time-static' or 'time-dynamic'.
 *   p_batch_count    - maximum number of batches (loop iterations) to run in this call.
 *   p_batch_interval - time range moved per batch. NULL, or a value larger than the configured
 *                      partition interval, is replaced by the partition interval.
 *   p_lock_wait      - when > 0, the rows of each batch are locked with FOR UPDATE NOWAIT before
 *                      being moved; up to 5 attempts are made per batch, sleeping p_lock_wait / 5
 *                      seconds after each failed attempt. 0 skips the explicit locking step.
 *
 * Returns the total number of rows moved by this call, or -1 if the row locks for a batch could
 * not be obtained (rows moved by earlier batches of the same call stay moved).
 * Raises an exception when no matching part_config row exists.
 */
CREATE OR REPLACE FUNCTION partition_data_time(p_parent_table text, p_batch_count int DEFAULT 1, p_batch_interval interval DEFAULT NULL, p_lock_wait numeric DEFAULT 0) RETURNS bigint
    LANGUAGE plpgsql SECURITY DEFINER
    AS $$
DECLARE

v_control                   text;
v_datetime_string           text;
v_last_partition_name       text;
v_max_partition_timestamp   timestamp;
v_min_control               timestamp;
v_min_partition_timestamp   timestamp;
v_part_interval             interval;
v_partition_timestamp       timestamp[];
v_rowcount                  bigint;
v_sql                       text;
v_total_rows                bigint := 0;
v_lock_iter                 int := 1;
v_lock_obtained             boolean := FALSE;

BEGIN

SELECT part_interval::interval, control, datetime_string
INTO v_part_interval, v_control, v_datetime_string
FROM @extschema@.part_config
WHERE parent_table = p_parent_table
AND (type = 'time-static' OR type = 'time-dynamic');
IF NOT FOUND THEN
    RAISE EXCEPTION 'ERROR: no config found for %', p_parent_table;
END IF;

-- A batch never spans more than one partition, so cap the batch size at the partition interval.
IF p_batch_interval IS NULL OR p_batch_interval > v_part_interval THEN
    p_batch_interval := v_part_interval;
END IF;

FOR i IN 1..p_batch_count LOOP

    -- Oldest control value still sitting in the parent itself (ONLY excludes the children).
    EXECUTE 'SELECT min('||v_control||') FROM ONLY '||p_parent_table INTO v_min_control;
    -- Parent is empty: stop, but fall through to the final RETURN so that rows moved by
    -- earlier batches of this call are still reported.
    IF v_min_control IS NULL THEN
        EXIT;
    END IF;

    -- Round the oldest value down to the start of the partition that contains it.
    -- There is no ELSE branch: an interval other than the ones listed raises CASE_NOT_FOUND.
    CASE
        WHEN v_part_interval = '15 mins' THEN
            v_min_partition_timestamp := date_trunc('hour', v_min_control) +
                '15min'::interval * floor(date_part('minute', v_min_control) / 15.0);
        WHEN v_part_interval = '30 mins' THEN
            v_min_partition_timestamp := date_trunc('hour', v_min_control) +
                '30min'::interval * floor(date_part('minute', v_min_control) / 30.0);
        WHEN v_part_interval = '1 hour' THEN
            v_min_partition_timestamp := date_trunc('hour', v_min_control);
        WHEN v_part_interval = '1 day' THEN
            v_min_partition_timestamp := date_trunc('day', v_min_control);
        WHEN v_part_interval = '1 week' THEN
            v_min_partition_timestamp := date_trunc('week', v_min_control);
        WHEN v_part_interval = '1 month' THEN
            v_min_partition_timestamp := date_trunc('month', v_min_control);
        WHEN v_part_interval = '3 months' THEN
            v_min_partition_timestamp := date_trunc('quarter', v_min_control);
        WHEN v_part_interval = '1 year' THEN
            v_min_partition_timestamp := date_trunc('year', v_min_control);
    END CASE;

    v_partition_timestamp := ARRAY[v_min_partition_timestamp];

    -- Exclusive upper bound of this batch: one batch interval past the oldest row, but never
    -- beyond the end of the partition that row belongs to.
    IF (v_min_control + p_batch_interval) >= (v_min_partition_timestamp + v_part_interval) THEN
        v_max_partition_timestamp := v_min_partition_timestamp + v_part_interval;
    ELSE
        v_max_partition_timestamp := v_min_control + p_batch_interval;
    END IF;

    -- do some locking with timeout, if required
    IF p_lock_wait > 0  THEN
        -- Reset the retry state for every batch. Without this, later batches would inherit the
        -- attempt counter and the "obtained" flag of earlier ones and could skip locking entirely.
        v_lock_iter := 1;
        v_lock_obtained := FALSE;
        WHILE v_lock_iter <= 5 LOOP
            v_lock_iter := v_lock_iter + 1;
            BEGIN
                v_sql := 'SELECT * FROM ONLY ' || p_parent_table ||
                ' WHERE '||v_control||' >= '||quote_literal(v_min_control)||
                ' AND '||v_control||' < '||quote_literal(v_max_partition_timestamp)
                ||' FOR UPDATE NOWAIT';
                EXECUTE v_sql;
                v_lock_obtained := TRUE;
            EXCEPTION
                WHEN lock_not_available THEN
                    -- Another session holds a conflicting lock: wait a fifth of the budget, retry.
                    PERFORM pg_sleep( p_lock_wait / 5.0 );
                    CONTINUE;
            END;
            EXIT WHEN v_lock_obtained;
        END LOOP;
        IF NOT v_lock_obtained THEN
           RETURN -1;
        END IF;
    END IF;

    -- Call create_time_partition for the target partition; its result is used below as the
    -- name of the table to insert into.
    v_sql := 'SELECT @extschema@.create_time_partition('||quote_literal(p_parent_table)||','||quote_literal(v_control)||','
    ||quote_literal(v_part_interval)||','||quote_literal(v_datetime_string)||','||quote_literal(v_partition_timestamp)||')';
    EXECUTE v_sql INTO v_last_partition_name;

    -- Move the batch in a single statement: delete from the parent only and insert the
    -- deleted rows into the child.
    v_sql := 'WITH partition_data AS (
            DELETE FROM ONLY '||p_parent_table||' WHERE '||v_control||' >= '||quote_literal(v_min_control)||
                ' AND '||v_control||' < '||quote_literal(v_max_partition_timestamp)||' RETURNING *)
            INSERT INTO '||v_last_partition_name||' SELECT * FROM partition_data';
    EXECUTE v_sql;

    GET DIAGNOSTICS v_rowcount = ROW_COUNT;
    v_total_rows := v_total_rows + v_rowcount;
    IF v_rowcount = 0 THEN
        EXIT;
    END IF;

END LOOP;

RETURN v_total_rows;

END
$$;
125
126
/*
 * Populate the child table(s) of an id-based partition set with old data from the original parent.
 *
 * Parameters:
 *   p_parent_table   - parent table name; must match a part_config row whose type is
 *                      'id-static' or 'id-dynamic'.
 *   p_batch_count    - maximum number of batches (loop iterations) to run in this call.
 *   p_batch_interval - id range moved per batch. NULL, or a value larger than the configured
 *                      partition interval, is replaced by the partition interval.
 *   p_lock_wait      - when > 0, the rows of each batch are locked with FOR UPDATE NOWAIT before
 *                      being moved; up to 5 attempts are made per batch, sleeping p_lock_wait / 5
 *                      seconds after each failed attempt. 0 skips the explicit locking step.
 *
 * Returns the total number of rows moved by this call, or -1 if the row locks for a batch could
 * not be obtained (rows moved by earlier batches of the same call stay moved).
 * Raises an exception when no matching part_config row exists.
 */
CREATE OR REPLACE FUNCTION partition_data_id(p_parent_table text, p_batch_count int DEFAULT 1, p_batch_interval int DEFAULT NULL, p_lock_wait numeric DEFAULT 0) RETURNS bigint
    LANGUAGE plpgsql SECURITY DEFINER
    AS $$
DECLARE

v_control                   text;
v_last_partition_name       text;
v_max_partition_id          bigint;
v_min_control               bigint;
v_min_partition_id          bigint;
v_part_interval             bigint;
v_partition_id              bigint[];
v_rowcount                  bigint;
v_sql                       text;
v_total_rows                bigint := 0;
v_lock_iter                 int := 1;
v_lock_obtained             boolean := FALSE;

BEGIN

SELECT part_interval::bigint, control
INTO v_part_interval, v_control
FROM @extschema@.part_config
WHERE parent_table = p_parent_table
AND (type = 'id-static' OR type = 'id-dynamic');
IF NOT FOUND THEN
    RAISE EXCEPTION 'ERROR: no config found for %', p_parent_table;
END IF;

-- A batch never spans more than one partition, so cap the batch size at the partition interval.
IF p_batch_interval IS NULL OR p_batch_interval > v_part_interval THEN
    p_batch_interval := v_part_interval;
END IF;

FOR i IN 1..p_batch_count LOOP

    -- Smallest control value still sitting in the parent itself (ONLY excludes the children).
    EXECUTE 'SELECT min('||v_control||') FROM ONLY '||p_parent_table INTO v_min_control;
    -- Parent is empty: stop, but fall through to the final RETURN so that rows moved by
    -- earlier batches of this call are still reported.
    IF v_min_control IS NULL THEN
        EXIT;
    END IF;

    -- Round the smallest id down to a multiple of the partition interval.
    v_min_partition_id := v_min_control - (v_min_control % v_part_interval);

    v_partition_id := ARRAY[v_min_partition_id];

    -- Exclusive upper bound of this batch: one batch interval past the smallest id, but never
    -- beyond the end of the partition that id belongs to.
    IF (v_min_control + p_batch_interval) >= (v_min_partition_id + v_part_interval) THEN
        v_max_partition_id := v_min_partition_id + v_part_interval;
    ELSE
        v_max_partition_id := v_min_control + p_batch_interval;
    END IF;

    -- do some locking with timeout, if required
    IF p_lock_wait > 0  THEN
        -- Reset the retry state for every batch. Without this, later batches would inherit the
        -- attempt counter and the "obtained" flag of earlier ones and could skip locking entirely.
        v_lock_iter := 1;
        v_lock_obtained := FALSE;
        WHILE v_lock_iter <= 5 LOOP
            v_lock_iter := v_lock_iter + 1;
            BEGIN
                v_sql := 'SELECT * FROM ONLY ' || p_parent_table ||
                ' WHERE '||v_control||' >= '||quote_literal(v_min_control)||
                ' AND '||v_control||' < '||quote_literal(v_max_partition_id)
                ||' FOR UPDATE NOWAIT';
                EXECUTE v_sql;
                v_lock_obtained := TRUE;
            EXCEPTION
                WHEN lock_not_available THEN
                    -- Another session holds a conflicting lock: wait a fifth of the budget, retry.
                    PERFORM pg_sleep( p_lock_wait / 5.0 );
                    CONTINUE;
            END;
            EXIT WHEN v_lock_obtained;
        END LOOP;
        IF NOT v_lock_obtained THEN
           RETURN -1;
        END IF;
    END IF;

    -- Call create_id_partition for the target partition; its result is used below as the
    -- name of the table to insert into.
    v_sql := 'SELECT @extschema@.create_id_partition('||quote_literal(p_parent_table)||','||quote_literal(v_control)||','
    ||v_part_interval||','||quote_literal(v_partition_id)||')';
    EXECUTE v_sql INTO v_last_partition_name;

    -- Move the batch in a single statement: delete from the parent only and insert the
    -- deleted rows into the child.
    v_sql := 'WITH partition_data AS (
        DELETE FROM ONLY '||p_parent_table||' WHERE '||v_control||' >= '||v_min_control||
            ' AND '||v_control||' < '||v_max_partition_id||' RETURNING *)
        INSERT INTO '||v_last_partition_name||' SELECT * FROM partition_data';

    EXECUTE v_sql;

    GET DIAGNOSTICS v_rowcount = ROW_COUNT;
    v_total_rows := v_total_rows + v_rowcount;
    IF v_rowcount = 0 THEN
        EXIT;
    END IF;

END LOOP;

RETURN v_total_rows;

END
$$;
229