/*
 * partition_data_id: move rows of an id-based partition set out of a source table
 * and into the matching child tables, creating each child table as needed.
 *
 * Parameters:
 *   p_parent_table    - schema-qualified parent table; must have a row in part_config.
 *   p_batch_count     - maximum number of batches (loop iterations) to run in this call.
 *   p_batch_interval  - width of the id range moved per batch. NULL, or any value larger
 *                       than the configured partition interval, means one full partition
 *                       interval per batch. A value different from the partition interval
 *                       is rejected when moving data out of a native DEFAULT partition.
 *   p_lock_wait       - when > 0, the rows of each batch are first locked with
 *                       FOR UPDATE NOWAIT; on lock_not_available the function sleeps
 *                       p_lock_wait / 5.0 seconds and retries.
 *   p_order           - 'ASC' (start from min control value) or 'DESC' (start from max).
 *   p_analyze         - passed through to create_partition_id().
 *   p_source_table    - optional schema-qualified table to move data from instead of the
 *                       parent (or, for native sets on PG11+, the DEFAULT partition).
 *   p_ignored_columns - column names left out of the generated INSERT/SELECT column list
 *                       (e.g. GENERATED columns).
 *
 * Returns:
 *   Total number of rows moved. Returns 0 immediately for a native set on PG11+ when no
 *   source table is given and no DEFAULT partition exists. Returns -1 when the row locks
 *   for a batch could not be obtained after all retries.
 *
 * Raises:
 *   An exception when there is no part_config entry, the parent or source table is not
 *   in the system catalogs, the control column is not id-based (or epoch is set), a
 *   custom batch interval is used against a native DEFAULT partition, or p_order is
 *   not ASC/DESC.
 */
CREATE FUNCTION @extschema@.partition_data_id(p_parent_table text
    , p_batch_count int DEFAULT 1
    , p_batch_interval bigint DEFAULT NULL
    , p_lock_wait numeric DEFAULT 0
    , p_order text DEFAULT 'ASC'
    , p_analyze boolean DEFAULT true
    , p_source_table text DEFAULT NULL
    , p_ignored_columns text[] DEFAULT NULL)
RETURNS bigint
LANGUAGE plpgsql
AS $$
DECLARE

v_col                       text;
v_column_list               text;
v_control                   text;
v_control_type              text;
v_current_partition_name    text;
v_default_exists            boolean := false;
v_default_schemaname        text;
v_default_tablename         text;
v_epoch                     text;
v_lock_iter                 int := 1;
v_lock_obtained             boolean := FALSE;
v_max_partition_id          bigint;
v_min_partition_id          bigint;
v_new_search_path           text := '@extschema@,pg_temp';
v_old_search_path           text;
v_parent_schemaname         text;
v_parent_tablename          text;
v_partition_interval        bigint;
v_partition_id              bigint[];
v_partition_type            text;
v_rowcount                  bigint;
v_source_schemaname         text;
v_source_tablename          text;
v_sql                       text;
v_start_control             bigint;
v_total_rows                bigint := 0;

BEGIN
    /*
     * Populate the child table(s) of an id-based partition set with old data from the original parent
     */

    SELECT partition_interval::bigint
        , partition_type
        , control
        , epoch
    INTO v_partition_interval
        , v_partition_type
        , v_control
        , v_epoch
    FROM @extschema@.part_config
    WHERE parent_table = p_parent_table;
    IF NOT FOUND THEN
        RAISE EXCEPTION 'ERROR: No entry in part_config found for given table:  %', p_parent_table;
    END IF;

    SELECT schemaname, tablename INTO v_source_schemaname, v_source_tablename
    FROM pg_catalog.pg_tables
    WHERE schemaname = split_part(p_parent_table, '.', 1)::name
    AND tablename = split_part(p_parent_table, '.', 2)::name;

    -- Fail early with a clear message instead of letting NULL names reach format() below
    IF v_source_tablename IS NULL THEN
        RAISE EXCEPTION 'Unable to find given parent table in system catalogs. Please create parent table first: %', p_parent_table;
    END IF;

    -- Preserve the real parent schema and table name: the source variables may be
    -- repointed below, but child tables are named after (and addressed via) the parent.
    v_parent_schemaname := v_source_schemaname;
    v_parent_tablename := v_source_tablename;

    SELECT general_type INTO v_control_type
    FROM @extschema@.check_control_type(v_source_schemaname, v_source_tablename, v_control);

    IF v_control_type <> 'id' OR (v_control_type = 'id' AND v_epoch <> 'none') THEN
        RAISE EXCEPTION 'Control column for given partition set is not id/serial based or epoch flag is set for time-based partitioning.';
    END IF;

    IF p_source_table IS NOT NULL THEN
        -- Set source table to user given source table instead of parent table
        v_source_schemaname := NULL;
        v_source_tablename := NULL;

        SELECT schemaname, tablename INTO v_source_schemaname, v_source_tablename
        FROM pg_catalog.pg_tables
        WHERE schemaname = split_part(p_source_table, '.', 1)::name
        AND tablename = split_part(p_source_table, '.', 2)::name;

        IF v_source_tablename IS NULL THEN
            RAISE EXCEPTION 'Given source table does not exist in system catalogs: %', p_source_table;
        END IF;
    ELSIF v_partition_type = 'native' AND current_setting('server_version_num')::int >= 110000 THEN

        IF p_batch_interval IS NOT NULL AND p_batch_interval != v_partition_interval THEN
            -- This is true because all data for a given child table must be moved out of the default partition before the child table can be created.
            -- So cannot create the child table when only some of the data has been moved out of the default partition.
            RAISE EXCEPTION 'Custom intervals are not allowed when moving data out of the DEFAULT partition in a native set. Please leave p_interval/p_batch_interval parameters unset or NULL to allow use of partition set''s default partitioning interval.';
        END IF;

        -- Set source table to default table if PG11+, p_source_table is not set, and it exists
        -- Otherwise just return with a DEBUG that no data source exists
        -- The qualified name is quoted as identifiers first (%I.%I) and then embedded as a
        -- properly escaped literal (%L) so names containing quotes cannot break the statement.
        v_sql := format('SELECT n.nspname::text, c.relname::text FROM
            pg_catalog.pg_inherits h
            JOIN pg_catalog.pg_class c ON c.oid = h.inhrelid
            JOIN pg_catalog.pg_namespace n ON c.relnamespace = n.oid
            WHERE h.inhparent = %L::regclass
            AND pg_get_expr(relpartbound, c.oid) = ''DEFAULT'''
            , format('%I.%I', v_source_schemaname, v_source_tablename));

        EXECUTE v_sql INTO v_default_schemaname, v_default_tablename;

        IF v_default_tablename IS NOT NULL THEN
            v_source_schemaname := v_default_schemaname;
            v_source_tablename := v_default_tablename;

            v_default_exists := true;
            -- Created once here, outside the batch loop, and reused by every batch
            EXECUTE format ('CREATE TEMP TABLE IF NOT EXISTS partman_temp_data_storage (LIKE %I.%I INCLUDING INDEXES) ON COMMIT DROP', v_source_schemaname, v_source_tablename);
        ELSE
            RAISE DEBUG 'No default table found when partition_data_id() was called';
            RETURN v_total_rows;
        END IF;

    END IF;

    -- From here on the session search_path is switched; every normal exit below must restore it.
    SELECT current_setting('search_path') INTO v_old_search_path;
    EXECUTE format('SELECT set_config(%L, %L, %L)', 'search_path', v_new_search_path, 'false');

    IF p_batch_interval IS NULL OR p_batch_interval > v_partition_interval THEN
        p_batch_interval := v_partition_interval;
    END IF;

    -- Generate column list to use in SELECT/INSERT statements below. Allows for exclusion of GENERATED (or any other desired) columns.
    -- quote_ident() is used so that column names containing double quotes are escaped correctly.
    v_sql := format ('SELECT string_agg(quote_ident(attname), '','') FROM pg_catalog.pg_attribute a
                        JOIN pg_catalog.pg_class c ON a.attrelid = c.oid
                        JOIN pg_catalog.pg_namespace n ON c.relnamespace = n.oid
                        WHERE n.nspname = %L
                        AND c.relname = %L
                        AND a.attnum > 0
                        AND a.attisdropped = false'
                      , v_source_schemaname
                      , v_source_tablename);

    IF p_ignored_columns IS NOT NULL THEN
        FOREACH v_col IN ARRAY p_ignored_columns LOOP
            v_sql := v_sql || format(' AND attname != %L ', v_col);
        END LOOP;
    END IF;

    EXECUTE v_sql INTO v_column_list;

    FOR i IN 1..p_batch_count LOOP

        IF p_order = 'ASC' THEN
            EXECUTE format('SELECT min(%I) FROM ONLY %I.%I', v_control, v_source_schemaname, v_source_tablename) INTO v_start_control;
            IF v_start_control IS NULL THEN
                -- Source table is empty; nothing left to move
                EXIT;
            END IF;
            -- Lower boundary of the partition that contains the starting value
            v_min_partition_id := v_start_control - (v_start_control % v_partition_interval);
            v_partition_id := ARRAY[v_min_partition_id];
            -- Check if custom batch interval overflows current partition maximum
            IF (v_start_control + p_batch_interval) >= (v_min_partition_id + v_partition_interval) THEN
                v_max_partition_id := v_min_partition_id + v_partition_interval;
            ELSE
                v_max_partition_id := v_start_control + p_batch_interval;
            END IF;

        ELSIF p_order = 'DESC' THEN
            EXECUTE format('SELECT max(%I) FROM ONLY %I.%I', v_control, v_source_schemaname, v_source_tablename) INTO v_start_control;
            IF v_start_control IS NULL THEN
                -- Source table is empty; nothing left to move
                EXIT;
            END IF;
            v_min_partition_id := v_start_control - (v_start_control % v_partition_interval);
            -- Must be greater than max value still in parent table since query below grabs < max
            v_max_partition_id := v_min_partition_id + v_partition_interval;
            -- Child table is identified by the partition lower boundary, captured before
            -- the batch minimum is narrowed below
            v_partition_id := ARRAY[v_min_partition_id];
            -- Make sure minimum doesn't underflow current partition minimum
            IF (v_start_control - p_batch_interval) >= v_min_partition_id THEN
                v_min_partition_id := v_start_control - p_batch_interval;
            END IF;
        ELSE
            RAISE EXCEPTION 'Invalid value for p_order. Must be ASC or DESC';
        END IF;

        -- do some locking with timeout, if required
        IF p_lock_wait > 0  THEN
            v_lock_iter := 0;
            -- Reset per batch: a lock obtained for an earlier batch says nothing about this one
            v_lock_obtained := FALSE;
            WHILE v_lock_iter <= 5 LOOP
                v_lock_iter := v_lock_iter + 1;
                BEGIN
                    v_sql := format('SELECT %s FROM ONLY %I.%I WHERE %I >= %s AND %I < %s FOR UPDATE NOWAIT'
                        , v_column_list
                        , v_source_schemaname
                        , v_source_tablename
                        , v_control
                        , v_min_partition_id
                        , v_control
                        , v_max_partition_id);
                    EXECUTE v_sql;
                    v_lock_obtained := TRUE;
                    EXCEPTION
                    WHEN lock_not_available THEN
                        PERFORM pg_sleep( p_lock_wait / 5.0 );
                        CONTINUE;
                END;
                EXIT WHEN v_lock_obtained;
            END LOOP;
            IF NOT v_lock_obtained THEN
                -- Restore the caller's search_path before bailing out; set_config() above was
                -- session-level, so it would otherwise leak past this function call.
                EXECUTE format('SELECT set_config(%L, %L, %L)', 'search_path', v_old_search_path, 'false');
                RETURN -1;
            END IF;
        END IF;

        v_current_partition_name := @extschema@.check_name_length(v_parent_tablename, v_min_partition_id::text, TRUE);

        IF v_default_exists THEN

            -- Child tables cannot be created in native partitioning if data that belongs to it exists in the default
            -- Have to move data out to temporary location, create child table, then move it back

            -- Temp table created above to avoid excessive temp creation in loop
            EXECUTE format('WITH partition_data AS (
                    DELETE FROM %1$I.%2$I WHERE %3$I >= %4$s AND %3$I < %5$s RETURNING *)
                INSERT INTO partman_temp_data_storage (%6$s) SELECT %6$s FROM partition_data'
                , v_source_schemaname
                , v_source_tablename
                , v_control
                , v_min_partition_id
                , v_max_partition_id
                , v_column_list);

            PERFORM @extschema@.create_partition_id(p_parent_table, v_partition_id, p_analyze);

            -- Target the child via the parent's schema, not the source (default) table's schema.
            -- NOTE(review): assumes create_partition_id() creates children in the parent's schema - confirm.
            EXECUTE format('WITH partition_data AS (
                    DELETE FROM partman_temp_data_storage RETURNING *)
                INSERT INTO %1$I.%2$I (%3$s) SELECT %3$s FROM partition_data'
                , v_parent_schemaname
                , v_current_partition_name
                , v_column_list);

        ELSE

            PERFORM @extschema@.create_partition_id(p_parent_table, v_partition_id, p_analyze);

            -- Source and target are addressed separately: the source may be a user-given table
            -- in another schema, while the child is addressed via the parent's schema.
            -- NOTE(review): assumes create_partition_id() creates children in the parent's schema - confirm.
            EXECUTE format('WITH partition_data AS (
                    DELETE FROM ONLY %1$I.%2$I WHERE %3$I >= %4$s AND %3$I < %5$s RETURNING *)
                INSERT INTO %6$I.%7$I (%8$s) SELECT %8$s FROM partition_data'
                , v_source_schemaname
                , v_source_tablename
                , v_control
                , v_min_partition_id
                , v_max_partition_id
                , v_parent_schemaname
                , v_current_partition_name
                , v_column_list);

        END IF;

        -- Row count of the final INSERT executed in whichever branch ran above
        GET DIAGNOSTICS v_rowcount = ROW_COUNT;
        v_total_rows := v_total_rows + v_rowcount;
        IF v_rowcount = 0 THEN
            EXIT;
        END IF;

    END LOOP;

    IF v_partition_type = 'partman' THEN
        PERFORM @extschema@.create_function_id(p_parent_table);
    END IF;

    EXECUTE format('SELECT set_config(%L, %L, %L)', 'search_path', v_old_search_path, 'false');

    RETURN v_total_rows;

END
$$;
269
270