/*
 * partition_data_id()
 *
 * Moves rows, in batches, from a source table into the child tables of an
 * id-based partition set, creating each child table as needed.
 *
 * Parameters:
 *   p_parent_table    - parent table as 'schema.table'; must have a row in part_config.
 *   p_batch_count     - maximum number of batches (loop iterations) to run in this call.
 *   p_batch_interval  - width of the id range moved per batch. NULL, or a value larger
 *                       than the partition interval, falls back to the partition interval.
 *   p_lock_wait       - when > 0, the source rows are first locked with FOR UPDATE NOWAIT;
 *                       each failed attempt sleeps p_lock_wait / 5.0 seconds before retrying.
 *   p_order           - 'ASC' (start from the lowest id) or 'DESC' (start from the highest id).
 *   p_analyze         - passed through to create_partition_id().
 *   p_source_table    - optional 'schema.table' to pull data from instead of the parent/default.
 *   p_ignored_columns - column names to leave out of the SELECT/INSERT column list.
 *
 * Returns:
 *   Total number of rows moved; 0 when a native set has no default table to pull from;
 *   -1 when the row locks could not be obtained for a batch.
 *
 * Raises an exception when: no part_config row exists, the control column is not
 * id-based (or epoch is set), p_source_table does not exist, a custom batch interval is
 * used while draining a native DEFAULT partition, or p_order is not ASC/DESC.
 */
CREATE FUNCTION @extschema@.partition_data_id(p_parent_table text
        , p_batch_count int DEFAULT 1
        , p_batch_interval bigint DEFAULT NULL
        , p_lock_wait numeric DEFAULT 0
        , p_order text DEFAULT 'ASC'
        , p_analyze boolean DEFAULT true
        , p_source_table text DEFAULT NULL
        , p_ignored_columns text[] DEFAULT NULL)
    RETURNS bigint
    LANGUAGE plpgsql
    AS $$
DECLARE

v_col                       text;
v_column_list               text;
v_control                   text;
v_control_type              text;
v_current_partition_name    text;
v_default_exists            boolean := FALSE;
v_default_schemaname        text;
v_default_tablename         text;
v_epoch                     text;
v_lock_iter                 int := 1;
v_lock_obtained             boolean := FALSE;
v_max_partition_id          bigint;
v_min_partition_id          bigint;
v_new_search_path           text := '@extschema@,pg_temp';
v_old_search_path           text;
v_parent_tablename          text;
v_partition_interval        bigint;
v_partition_id              bigint[];
v_partition_type            text;
v_rowcount                  bigint;
v_source_schemaname         text;
v_source_tablename          text;
v_sql                       text;
v_start_control             bigint;
v_total_rows                bigint := 0;

BEGIN
    /*
     * Populate the child table(s) of an id-based partition set with old data from the original parent
     */

    SELECT partition_interval::bigint
        , partition_type
        , control
        , epoch
    INTO v_partition_interval
        , v_partition_type
        , v_control
        , v_epoch
    FROM @extschema@.part_config
    WHERE parent_table = p_parent_table;
    IF NOT FOUND THEN
        RAISE EXCEPTION 'ERROR: No entry in part_config found for given table: %', p_parent_table;
    END IF;

    SELECT schemaname, tablename INTO v_source_schemaname, v_source_tablename
    FROM pg_catalog.pg_tables
    WHERE schemaname = split_part(p_parent_table, '.', 1)::name
    AND tablename = split_part(p_parent_table, '.', 2)::name;

    -- Preserve real parent tablename for use below
    v_parent_tablename := v_source_tablename;

    SELECT general_type INTO v_control_type FROM @extschema@.check_control_type(v_source_schemaname, v_source_tablename, v_control);

    IF v_control_type <> 'id' OR (v_control_type = 'id' AND v_epoch <> 'none') THEN
        RAISE EXCEPTION 'Control column for given partition set is not id/serial based or epoch flag is set for time-based partitioning.';
    END IF;

    IF p_source_table IS NOT NULL THEN
        -- Set source table to user given source table instead of parent table
        v_source_schemaname := NULL;
        v_source_tablename := NULL;

        SELECT schemaname, tablename INTO v_source_schemaname, v_source_tablename
        FROM pg_catalog.pg_tables
        WHERE schemaname = split_part(p_source_table, '.', 1)::name
        AND tablename = split_part(p_source_table, '.', 2)::name;

        IF v_source_tablename IS NULL THEN
            RAISE EXCEPTION 'Given source table does not exist in system catalogs: %', p_source_table;
        END IF;
    ELSIF v_partition_type = 'native' AND current_setting('server_version_num')::int >= 110000 THEN

        IF p_batch_interval IS NOT NULL AND p_batch_interval != v_partition_interval THEN
            -- This is true because all data for a given child table must be moved out of the default partition before the child table can be created.
            -- So cannot create the child table when only some of the data has been moved out of the default partition.
            RAISE EXCEPTION 'Custom intervals are not allowed when moving data out of the DEFAULT partition in a native set. Please leave p_interval/p_batch_interval parameters unset or NULL to allow use of partition set''s default partitioning interval.';
        END IF;
        -- Set source table to default table if PG11+, p_source_table is not set, and it exists
        -- Otherwise just return with a DEBUG that no data source exists
        v_sql := format('SELECT n.nspname::text, c.relname::text FROM
            pg_catalog.pg_inherits h
            JOIN pg_catalog.pg_class c ON c.oid = h.inhrelid
            JOIN pg_catalog.pg_namespace n ON c.relnamespace = n.oid
            WHERE h.inhparent = ''%I.%I''::regclass
            AND pg_get_expr(relpartbound, c.oid) = ''DEFAULT'''
            , v_source_schemaname
            , v_source_tablename);

        EXECUTE v_sql INTO v_default_schemaname, v_default_tablename;

        IF v_default_tablename IS NOT NULL THEN
            v_source_schemaname := v_default_schemaname;
            v_source_tablename := v_default_tablename;

            v_default_exists := true;
            -- Created once here (not per batch) so the loop below can reuse it as a staging area
            EXECUTE format ('CREATE TEMP TABLE IF NOT EXISTS partman_temp_data_storage (LIKE %I.%I INCLUDING INDEXES) ON COMMIT DROP', v_source_schemaname, v_source_tablename);
        ELSE
            -- search_path has not been changed yet at this point, so nothing to restore
            RAISE DEBUG 'No default table found when partition_data_id() was called';
            RETURN v_total_rows;
        END IF;

    END IF;

    -- The search_path change is NOT transaction-local (third argument 'false'), so every
    -- non-error exit after this point must restore v_old_search_path.
    SELECT current_setting('search_path') INTO v_old_search_path;
    EXECUTE format('SELECT set_config(%L, %L, %L)', 'search_path', v_new_search_path, 'false');

    IF p_batch_interval IS NULL OR p_batch_interval > v_partition_interval THEN
        p_batch_interval := v_partition_interval;
    END IF;

    -- Generate column list to use in SELECT/INSERT statements below. Allows for exclusion of GENERATED (or any other desired) columns.
    -- quote_ident() is used so that column names containing double quotes or other special
    -- characters are escaped correctly before being spliced into dynamic SQL via %s.
    v_sql := format ('SELECT string_agg(quote_ident(a.attname::text), '','') FROM pg_catalog.pg_attribute a
                JOIN pg_catalog.pg_class c ON a.attrelid = c.oid
                JOIN pg_catalog.pg_namespace n ON c.relnamespace = n.oid
                WHERE n.nspname = %L
                AND c.relname = %L
                AND a.attnum > 0
                AND a.attisdropped = false'
        , v_source_schemaname
        , v_source_tablename);

    IF p_ignored_columns IS NOT NULL THEN
        FOREACH v_col IN ARRAY p_ignored_columns LOOP
            v_sql := v_sql || format(' AND attname != %L ', v_col);
        END LOOP;
    END IF;

    EXECUTE v_sql INTO v_column_list;

    FOR i IN 1..p_batch_count LOOP

        IF p_order = 'ASC' THEN
            EXECUTE format('SELECT min(%I) FROM ONLY %I.%I', v_control, v_source_schemaname, v_source_tablename) INTO v_start_control;
            IF v_start_control IS NULL THEN
                -- Source table is empty; nothing left to move
                EXIT;
            END IF;
            -- Round the starting id down to the lower boundary of the partition that contains it
            v_min_partition_id = v_start_control - (v_start_control % v_partition_interval);
            v_partition_id := ARRAY[v_min_partition_id];
            -- Check if custom batch interval overflows current partition maximum
            IF (v_start_control + p_batch_interval) >= (v_min_partition_id + v_partition_interval) THEN
                v_max_partition_id := v_min_partition_id + v_partition_interval;
            ELSE
                v_max_partition_id := v_start_control + p_batch_interval;
            END IF;

        ELSIF p_order = 'DESC' THEN
            EXECUTE format('SELECT max(%I) FROM ONLY %I.%I', v_control, v_source_schemaname, v_source_tablename) INTO v_start_control;
            IF v_start_control IS NULL THEN
                -- Source table is empty; nothing left to move
                EXIT;
            END IF;
            v_min_partition_id = v_start_control - (v_start_control % v_partition_interval);
            -- Must be greater than max value still in parent table since query below grabs < max
            v_max_partition_id := v_min_partition_id + v_partition_interval;
            v_partition_id := ARRAY[v_min_partition_id];
            -- Make sure minimum doesn't underflow current partition minimum
            IF (v_start_control - p_batch_interval) >= v_min_partition_id THEN
                v_min_partition_id = v_start_control - p_batch_interval;
            END IF;
        ELSE
            RAISE EXCEPTION 'Invalid value for p_order. Must be ASC or DESC';
        END IF;

        -- do some locking with timeout, if required
        IF p_lock_wait > 0 THEN
            -- Reset per batch: a lock obtained for an earlier batch says nothing about this
            -- batch's rows, and a stale TRUE would let the move proceed without the lock.
            v_lock_obtained := FALSE;
            v_lock_iter := 0;
            -- v_lock_iter runs 0..5 at the loop test, so up to 6 attempts are made
            WHILE v_lock_iter <= 5 LOOP
                v_lock_iter := v_lock_iter + 1;
                BEGIN
                    v_sql := format('SELECT %s FROM ONLY %I.%I WHERE %I >= %s AND %I < %s FOR UPDATE NOWAIT'
                        , v_column_list
                        , v_source_schemaname
                        , v_source_tablename
                        , v_control
                        , v_min_partition_id
                        , v_control
                        , v_max_partition_id);
                    EXECUTE v_sql;
                    v_lock_obtained := TRUE;
                EXCEPTION
                    WHEN lock_not_available THEN
                        PERFORM pg_sleep( p_lock_wait / 5.0 );
                        CONTINUE;
                END;
                EXIT WHEN v_lock_obtained;
            END LOOP;
            IF NOT v_lock_obtained THEN
                -- Put the caller's search_path back before bailing out; the earlier
                -- set_config() was session-level and would otherwise leak to the caller.
                EXECUTE format('SELECT set_config(%L, %L, %L)', 'search_path', v_old_search_path, 'false');
                RETURN -1;
            END IF;
        END IF;

        v_current_partition_name := @extschema@.check_name_length(v_parent_tablename, v_min_partition_id::text, TRUE);

        IF v_default_exists THEN

            -- Child tables cannot be created in native partitioning if data that belongs to it exists in the default
            -- Have to move data out to temporary location, create child table, then move it back

            -- Temp table created above to avoid excessive temp creation in loop
            EXECUTE format('WITH partition_data AS (
                    DELETE FROM %1$I.%2$I WHERE %3$I >= %4$s AND %3$I < %5$s RETURNING *)
                INSERT INTO partman_temp_data_storage (%6$s) SELECT %6$s FROM partition_data'
                , v_source_schemaname
                , v_source_tablename
                , v_control
                , v_min_partition_id
                , v_max_partition_id
                , v_column_list);

            PERFORM @extschema@.create_partition_id(p_parent_table, v_partition_id, p_analyze);

            -- Empties the staging table into the new child; this statement's row count is
            -- what GET DIAGNOSTICS reads below.
            EXECUTE format('WITH partition_data AS (
                    DELETE FROM partman_temp_data_storage RETURNING *)
                INSERT INTO %1$I.%2$I (%3$s) SELECT %3$s FROM partition_data'
                , v_source_schemaname
                , v_current_partition_name
                , v_column_list);

        ELSE

            PERFORM @extschema@.create_partition_id(p_parent_table, v_partition_id, p_analyze);

            -- Move rows in the half-open range [v_min_partition_id, v_max_partition_id)
            EXECUTE format('WITH partition_data AS (
                    DELETE FROM ONLY %1$I.%2$I WHERE %3$I >= %4$s AND %3$I < %5$s RETURNING *)
                INSERT INTO %1$I.%6$I (%7$s) SELECT %7$s FROM partition_data'
                , v_source_schemaname
                , v_source_tablename
                , v_control
                , v_min_partition_id
                , v_max_partition_id
                , v_current_partition_name
                , v_column_list);

        END IF;

        GET DIAGNOSTICS v_rowcount = ROW_COUNT;
        v_total_rows := v_total_rows + v_rowcount;
        IF v_rowcount = 0 THEN
            EXIT;
        END IF;

    END LOOP;

    IF v_partition_type = 'partman' THEN
        PERFORM @extschema@.create_function_id(p_parent_table);
    END IF;

    EXECUTE format('SELECT set_config(%L, %L, %L)', 'search_path', v_old_search_path, 'false');

    RETURN v_total_rows;

END
$$;
