1-- tests behaviour of INSERT INTO ... SELECT with repartitioning 2CREATE SCHEMA insert_select_repartition; 3SET search_path TO 'insert_select_repartition'; 4SET citus.next_shard_id TO 4213581; 5SET citus.shard_replication_factor TO 1; 6-- 4 shards, hash distributed. 7-- Negate distribution column value. 8SET citus.shard_count TO 4; 9CREATE TABLE source_table(a int); 10SELECT create_distributed_table('source_table', 'a'); 11 create_distributed_table 12--------------------------------------------------------------------- 13 14(1 row) 15 16INSERT INTO source_table SELECT * FROM generate_series(1, 10); 17CREATE TABLE target_table(a int); 18SELECT create_distributed_table('target_table', 'a'); 19 create_distributed_table 20--------------------------------------------------------------------- 21 22(1 row) 23 24SET client_min_messages TO DEBUG2; 25INSERT INTO target_table SELECT -a FROM source_table; 26DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match 27DETAIL: Subquery contains an operator in the same position as the target table's partition column. 28HINT: Ensure the target table's partition column has a corresponding simple column reference to a distributed table's partition column in the subquery. 29DEBUG: Router planner cannot handle multi-shard select queries 30DEBUG: performing repartitioned INSERT ... SELECT 31DEBUG: partitioning SELECT query by column index 0 with name 'a' 32DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213585 AS citus_table_alias (a) SELECT a FROM read_intermediate_results('{repartitioned_results_xxxxx_from_4213583_to_0,repartitioned_results_xxxxx_from_4213584_to_0}'::text[], 'binary'::citus_copy_format) intermediate_result(a integer) 33DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213586 AS citus_table_alias (a) SELECT a FROM read_intermediate_results('{repartitioned_results_xxxxx_from_4213582_to_1}'::text[], 'binary'::citus_copy_format) intermediate_result(a integer) 34DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213587 AS citus_table_alias (a) SELECT a FROM read_intermediate_results('{repartitioned_results_xxxxx_from_4213581_to_2,repartitioned_results_xxxxx_from_4213582_to_2,repartitioned_results_xxxxx_from_4213584_to_2}'::text[], 'binary'::citus_copy_format) intermediate_result(a integer) 35DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213588 AS citus_table_alias (a) SELECT a FROM read_intermediate_results('{repartitioned_results_xxxxx_from_4213581_to_3}'::text[], 'binary'::citus_copy_format) intermediate_result(a integer) 36RESET client_min_messages; 37SELECT * FROM target_table WHERE a=-1 OR a=-3 OR a=-7 ORDER BY a; 38 a 39--------------------------------------------------------------------- 40 -7 41 -3 42 -1 43(3 rows) 44 45DROP TABLE source_table, target_table; 46-- 47-- range partitioning, composite distribution column 48-- 49CREATE TYPE composite_key_type AS (f1 int, f2 text); 50-- source 51CREATE TABLE source_table(f1 int, key composite_key_type, value int, mapped_key composite_key_type); 52SELECT create_distributed_table('source_table', 'key', 'range'); 53 create_distributed_table 54--------------------------------------------------------------------- 55 56(1 row) 57 58CALL public.create_range_partitioned_shards('source_table', '{"(0,a)","(25,a)"}','{"(24,z)","(49,z)"}'); 59INSERT INTO source_table VALUES (0, (0, 'a'), 1, (0, 'a')); 60INSERT INTO source_table VALUES (1, (1, 'b'), 2, (26, 'b')); 61INSERT INTO source_table VALUES (2, (2, 'c'), 3, (3, 'c')); 62INSERT INTO source_table VALUES (3, (4, 'd'), 4, (27, 'd')); 63INSERT INTO source_table VALUES (4, (30, 'e'), 5, (30, 'e')); 64INSERT INTO source_table VALUES (5, (31, 'f'), 6, (31, 'f')); 65INSERT INTO source_table VALUES (6, (32, 'g'), 50, (8, 'g')); 66-- target 67CREATE TABLE target_table(f1 int DEFAULT 0, value int, key composite_key_type PRIMARY KEY); 68SELECT create_distributed_table('target_table', 'key', 'range'); 69 create_distributed_table 70--------------------------------------------------------------------- 71 72(1 row) 73 74CALL public.create_range_partitioned_shards('target_table', '{"(0,a)","(25,a)"}','{"(24,z)","(49,z)"}'); 75SET client_min_messages TO DEBUG2; 76INSERT INTO target_table SELECT f1, value, mapped_key FROM source_table; 77DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match 78DETAIL: The target table's partition column should correspond to a partition column in the subquery. 79DEBUG: Router planner cannot handle multi-shard select queries 80DEBUG: performing repartitioned INSERT ... SELECT 81DEBUG: partitioning SELECT query by column index 2 with name 'key' 82DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213591 AS citus_table_alias (f1, value, key) SELECT f1, value, key FROM read_intermediate_results('{repartitioned_results_xxxxx_from_4213589_to_0,repartitioned_results_xxxxx_from_4213590_to_0}'::text[], 'text'::citus_copy_format) intermediate_result(f1 integer, value integer, key insert_select_repartition.composite_key_type) 83DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213592 AS citus_table_alias (f1, value, key) SELECT f1, value, key FROM read_intermediate_results('{repartitioned_results_xxxxx_from_4213589_to_1,repartitioned_results_xxxxx_from_4213590_to_1}'::text[], 'text'::citus_copy_format) intermediate_result(f1 integer, value integer, key insert_select_repartition.composite_key_type) 84RESET client_min_messages; 85SELECT * FROM target_table ORDER BY key; 86 f1 | value | key 87--------------------------------------------------------------------- 88 0 | 1 | (0,a) 89 2 | 3 | (3,c) 90 6 | 50 | (8,g) 91 1 | 2 | (26,b) 92 3 | 4 | (27,d) 93 4 | 5 | (30,e) 94 5 | 6 | (31,f) 95(7 rows) 96 97SELECT * FROM target_table WHERE key = (26, 'b')::composite_key_type; 98 f1 | value | key 99--------------------------------------------------------------------- 100 1 | 2 | (26,b) 101(1 row) 102 103-- with explicit column names 104TRUNCATE target_table; 105SET client_min_messages TO DEBUG2; 106INSERT INTO target_table(value, key) SELECT value, mapped_key FROM source_table; 107DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match 108DETAIL: The target table's partition column should correspond to a partition column in the subquery. 109DEBUG: Router planner cannot handle multi-shard select queries 110DEBUG: performing repartitioned INSERT ... SELECT 111DEBUG: partitioning SELECT query by column index 2 with name 'key' 112DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213591 AS citus_table_alias (f1, value, key) SELECT f1, value, key FROM read_intermediate_results('{repartitioned_results_xxxxx_from_4213589_to_0,repartitioned_results_xxxxx_from_4213590_to_0}'::text[], 'text'::citus_copy_format) intermediate_result(f1 integer, value integer, key insert_select_repartition.composite_key_type) 113DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213592 AS citus_table_alias (f1, value, key) SELECT f1, value, key FROM read_intermediate_results('{repartitioned_results_xxxxx_from_4213589_to_1,repartitioned_results_xxxxx_from_4213590_to_1}'::text[], 'text'::citus_copy_format) intermediate_result(f1 integer, value integer, key insert_select_repartition.composite_key_type) 114RESET client_min_messages; 115SELECT * FROM target_table ORDER BY key; 116 f1 | value | key 117--------------------------------------------------------------------- 118 0 | 1 | (0,a) 119 0 | 3 | (3,c) 120 0 | 50 | (8,g) 121 0 | 2 | (26,b) 122 0 | 4 | (27,d) 123 0 | 5 | (30,e) 124 0 | 6 | (31,f) 125(7 rows) 126 127-- missing value for a column 128TRUNCATE target_table; 129SET client_min_messages TO DEBUG2; 130INSERT INTO target_table(key) SELECT mapped_key AS key_renamed FROM source_table; 131DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match 132DETAIL: The target table's partition column should correspond to a partition column in the subquery. 133DEBUG: Router planner cannot handle multi-shard select queries 134DEBUG: performing repartitioned INSERT ... SELECT 135DEBUG: partitioning SELECT query by column index 1 with name 'key' 136DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213591 AS citus_table_alias (f1, key) SELECT f1, key FROM read_intermediate_results('{repartitioned_results_xxxxx_from_4213589_to_0,repartitioned_results_xxxxx_from_4213590_to_0}'::text[], 'text'::citus_copy_format) intermediate_result(f1 integer, key insert_select_repartition.composite_key_type) 137DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213592 AS citus_table_alias (f1, key) SELECT f1, key FROM read_intermediate_results('{repartitioned_results_xxxxx_from_4213589_to_1,repartitioned_results_xxxxx_from_4213590_to_1}'::text[], 'text'::citus_copy_format) intermediate_result(f1 integer, key insert_select_repartition.composite_key_type) 138RESET client_min_messages; 139SELECT * FROM target_table ORDER BY key; 140 f1 | value | key 141--------------------------------------------------------------------- 142 0 | | (0,a) 143 0 | | (3,c) 144 0 | | (8,g) 145 0 | | (26,b) 146 0 | | (27,d) 147 0 | | (30,e) 148 0 | | (31,f) 149(7 rows) 150 151-- ON CONFLICT 152SET client_min_messages TO DEBUG2; 153INSERT INTO target_table(key) 154SELECT mapped_key AS key_renamed FROM source_table 155WHERE (mapped_key).f1 % 2 = 1 156ON CONFLICT (key) DO UPDATE SET f1=1; 157DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match 158DETAIL: The target table's partition column should correspond to a partition column in the subquery. 159DEBUG: Router planner cannot handle multi-shard select queries 160DEBUG: performing repartitioned INSERT ... SELECT 161DEBUG: partitioning SELECT query by column index 1 with name 'key' 162DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213591 AS citus_table_alias (f1, key) SELECT f1, key FROM read_intermediate_results('{repartitioned_results_xxxxx_from_4213589_to_0}'::text[], 'text'::citus_copy_format) intermediate_result(f1 integer, key insert_select_repartition.composite_key_type) ON CONFLICT(key) DO UPDATE SET f1 = 1 163DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213592 AS citus_table_alias (f1, key) SELECT f1, key FROM read_intermediate_results('{repartitioned_results_xxxxx_from_4213589_to_1,repartitioned_results_xxxxx_from_4213590_to_1}'::text[], 'text'::citus_copy_format) intermediate_result(f1 integer, key insert_select_repartition.composite_key_type) ON CONFLICT(key) DO UPDATE SET f1 = 1 164RESET client_min_messages; 165SELECT * FROM target_table ORDER BY key; 166 f1 | value | key 167--------------------------------------------------------------------- 168 0 | | (0,a) 169 1 | | (3,c) 170 0 | | (8,g) 171 0 | | (26,b) 172 1 | | (27,d) 173 0 | | (30,e) 174 1 | | (31,f) 175(7 rows) 176 177-- missing value for distribution column 178INSERT INTO target_table(value) SELECT value FROM source_table; 179ERROR: the partition column of table insert_select_repartition.target_table should have a value 180DROP TABLE source_table, target_table; 181-- different column types 182-- verifies that we add necessary casts, otherwise even shard routing won't 183-- work correctly and we will see 2 values for the same primary key. 184CREATE TABLE target_table(col_1 int primary key, col_2 int); 185SELECT create_distributed_table('target_table','col_1'); 186 create_distributed_table 187--------------------------------------------------------------------- 188 189(1 row) 190 191INSERT INTO target_table VALUES (1,2), (2,3), (3,4), (4,5), (5,6); 192CREATE TABLE source_table(col_1 numeric, col_2 numeric, col_3 numeric); 193SELECT create_distributed_table('source_table','col_1'); 194 create_distributed_table 195--------------------------------------------------------------------- 196 197(1 row) 198 199INSERT INTO source_table VALUES (1,1,1), (3,3,3), (5,5,5); 200SET client_min_messages TO DEBUG2; 201INSERT INTO target_table 202SELECT 203 col_1, col_2 204FROM 205 source_table 206ON CONFLICT(col_1) DO UPDATE SET col_2 = EXCLUDED.col_2; 207DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match 208DETAIL: The data type of the target table's partition column should exactly match the data type of the corresponding simple column reference in the subquery. 209DEBUG: Router planner cannot handle multi-shard select queries 210DEBUG: performing repartitioned INSERT ... SELECT 211DEBUG: partitioning SELECT query by column index 0 with name 'col_1' 212DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213593 AS citus_table_alias (col_1, col_2) SELECT col_1, col_2 FROM read_intermediate_results('{repartitioned_results_xxxxx_from_4213597_to_0,repartitioned_results_xxxxx_from_4213600_to_0}'::text[], 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer) ON CONFLICT(col_1) DO UPDATE SET col_2 = excluded.col_2 213DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213594 AS citus_table_alias (col_1, col_2) SELECT col_1, col_2 FROM read_intermediate_results('{repartitioned_results_xxxxx_from_4213599_to_1}'::text[], 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer) ON CONFLICT(col_1) DO UPDATE SET col_2 = excluded.col_2 214RESET client_min_messages; 215SELECT * FROM target_table ORDER BY 1; 216 col_1 | col_2 217--------------------------------------------------------------------- 218 1 | 1 219 2 | 3 220 3 | 3 221 4 | 5 222 5 | 5 223(5 rows) 224 225DROP TABLE source_table, target_table; 226-- 227-- array coercion 228-- 229SET citus.shard_count TO 3; 230CREATE TABLE source_table(a int, mapped_key int, c float[]); 231SELECT create_distributed_table('source_table', 'a'); 232 create_distributed_table 233--------------------------------------------------------------------- 234 235(1 row) 236 237INSERT INTO source_table VALUES (1, -1, ARRAY[1.1, 2.2, 3.3]), (2, -2, ARRAY[4.5, 5.8]), 238 (3, -3, ARRAY[]::float[]), (4, -4, ARRAY[3.3]); 239SET citus.shard_count TO 2; 240CREATE TABLE target_table(a int, b int[]); 241SELECT create_distributed_table('target_table', 'a'); 242 create_distributed_table 243--------------------------------------------------------------------- 244 245(1 row) 246 247SET client_min_messages TO DEBUG1; 248INSERT INTO target_table SELECT mapped_key, c FROM source_table; 249DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match 250DETAIL: The target table's partition column should correspond to a partition column in the subquery. 251DEBUG: performing repartitioned INSERT ... SELECT 252RESET client_min_messages; 253SELECT * FROM target_table ORDER BY a; 254 a | b 255--------------------------------------------------------------------- 256 -4 | {3} 257 -3 | {} 258 -2 | {4,6} 259 -1 | {1,2,3} 260(4 rows) 261 262-- 263-- worker queries can have more columns than necessary. ExpandWorkerTargetEntry() 264-- might add additional columns to the target list. 265-- 266TRUNCATE target_table; 267\set VERBOSITY TERSE 268-- first verify that the SELECT query below fetches 3 projected columns from workers 269SET citus.log_remote_commands TO true; SET client_min_messages TO DEBUG; 270 CREATE TABLE results AS SELECT max(-a), array_agg(mapped_key) FROM source_table GROUP BY a; 271DEBUG: Router planner cannot handle multi-shard select queries 272NOTICE: issuing SELECT max((OPERATOR(pg_catalog.-) a)) AS max, array_agg(mapped_key) AS array_agg, a AS worker_column_3 FROM insert_select_repartition.source_table_4213601 source_table WHERE true GROUP BY a 273NOTICE: issuing SELECT max((OPERATOR(pg_catalog.-) a)) AS max, array_agg(mapped_key) AS array_agg, a AS worker_column_3 FROM insert_select_repartition.source_table_4213602 source_table WHERE true GROUP BY a 274NOTICE: issuing SELECT max((OPERATOR(pg_catalog.-) a)) AS max, array_agg(mapped_key) AS array_agg, a AS worker_column_3 FROM insert_select_repartition.source_table_4213603 source_table WHERE true GROUP BY a 275RESET citus.log_remote_commands; RESET client_min_messages; 276DROP TABLE results; 277-- now verify that we don't write the extra columns to the intermediate result files and 278-- insertion to the target works fine. 279SET client_min_messages TO DEBUG1; 280INSERT INTO target_table SELECT max(-a), array_agg(mapped_key) FROM source_table GROUP BY a; 281DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match 282DEBUG: performing repartitioned INSERT ... SELECT 283RESET client_min_messages; 284SELECT * FROM target_table ORDER BY a; 285 a | b 286--------------------------------------------------------------------- 287 -4 | {-4} 288 -3 | {-3} 289 -2 | {-2} 290 -1 | {-1} 291(4 rows) 292 293-- 294-- repartitioned INSERT/SELECT followed/preceded by other DML in same transaction 295-- 296-- case 1. followed by DELETE 297TRUNCATE target_table; 298BEGIN; 299INSERT INTO target_table SELECT mapped_key, c FROM source_table; 300SELECT * FROM target_table ORDER BY a; 301 a | b 302--------------------------------------------------------------------- 303 -4 | {3} 304 -3 | {} 305 -2 | {4,6} 306 -1 | {1,2,3} 307(4 rows) 308 309DELETE FROM target_table; 310END; 311SELECT * FROM target_table ORDER BY a; 312 a | b 313--------------------------------------------------------------------- 314(0 rows) 315 316-- case 2. followed by UPDATE 317TRUNCATE target_table; 318BEGIN; 319INSERT INTO target_table SELECT mapped_key, c FROM source_table; 320SELECT * FROM target_table ORDER BY a; 321 a | b 322--------------------------------------------------------------------- 323 -4 | {3} 324 -3 | {} 325 -2 | {4,6} 326 -1 | {1,2,3} 327(4 rows) 328 329UPDATE target_table SET b=array_append(b, a); 330END; 331SELECT * FROM target_table ORDER BY a; 332 a | b 333--------------------------------------------------------------------- 334 -4 | {3,-4} 335 -3 | {-3} 336 -2 | {4,6,-2} 337 -1 | {1,2,3,-1} 338(4 rows) 339 340-- case 3. followed by multi-row INSERT 341TRUNCATE target_table; 342BEGIN; 343INSERT INTO target_table SELECT mapped_key, c FROM source_table; 344SELECT * FROM target_table ORDER BY a; 345 a | b 346--------------------------------------------------------------------- 347 -4 | {3} 348 -3 | {} 349 -2 | {4,6} 350 -1 | {1,2,3} 351(4 rows) 352 353INSERT INTO target_table VALUES (-5, ARRAY[10,11]), (-6, ARRAY[11,12]), (-7, ARRAY[999]); 354END; 355SELECT * FROM target_table ORDER BY a; 356 a | b 357--------------------------------------------------------------------- 358 -7 | {999} 359 -6 | {11,12} 360 -5 | {10,11} 361 -4 | {3} 362 -3 | {} 363 -2 | {4,6} 364 -1 | {1,2,3} 365(7 rows) 366 367-- case 4. followed by distributed INSERT/SELECT 368TRUNCATE target_table; 369BEGIN; 370INSERT INTO target_table SELECT mapped_key, c FROM source_table; 371SELECT * FROM target_table ORDER BY a; 372 a | b 373--------------------------------------------------------------------- 374 -4 | {3} 375 -3 | {} 376 -2 | {4,6} 377 -1 | {1,2,3} 378(4 rows) 379 380INSERT INTO target_table SELECT * FROM target_table; 381END; 382SELECT * FROM target_table ORDER BY a; 383 a | b 384--------------------------------------------------------------------- 385 -4 | {3} 386 -4 | {3} 387 -3 | {} 388 -3 | {} 389 -2 | {4,6} 390 -2 | {4,6} 391 -1 | {1,2,3} 392 -1 | {1,2,3} 393(8 rows) 394 395-- case 5. preceded by DELETE 396TRUNCATE target_table; 397BEGIN; 398DELETE FROM target_table; 399INSERT INTO target_table SELECT mapped_key, c FROM source_table; 400END; 401SELECT * FROM target_table ORDER BY a; 402 a | b 403--------------------------------------------------------------------- 404 -4 | {3} 405 -3 | {} 406 -2 | {4,6} 407 -1 | {1,2,3} 408(4 rows) 409 410-- case 6. preceded by UPDATE 411TRUNCATE target_table; 412BEGIN; 413UPDATE target_table SET b=array_append(b, a); 414INSERT INTO target_table SELECT mapped_key, c FROM source_table; 415END; 416SELECT * FROM target_table ORDER BY a; 417 a | b 418--------------------------------------------------------------------- 419 -4 | {3} 420 -3 | {} 421 -2 | {4,6} 422 -1 | {1,2,3} 423(4 rows) 424 425-- case 7. preceded by multi-row INSERT 426TRUNCATE target_table; 427BEGIN; 428INSERT INTO target_table VALUES (-5, ARRAY[10,11]), (-6, ARRAY[11,12]), (-7, ARRAY[999]); 429INSERT INTO target_table SELECT mapped_key, c FROM source_table; 430END; 431SELECT * FROM target_table ORDER BY a; 432 a | b 433--------------------------------------------------------------------- 434 -7 | {999} 435 -6 | {11,12} 436 -5 | {10,11} 437 -4 | {3} 438 -3 | {} 439 -2 | {4,6} 440 -1 | {1,2,3} 441(7 rows) 442 443-- case 8. preceded by distributed INSERT/SELECT 444TRUNCATE target_table; 445INSERT INTO target_table SELECT mapped_key, c FROM source_table; 446BEGIN; 447INSERT INTO target_table SELECT * FROM target_table; 448INSERT INTO target_table SELECT mapped_key, c FROM source_table; 449END; 450SELECT * FROM target_table ORDER BY a; 451 a | b 452--------------------------------------------------------------------- 453 -4 | {3} 454 -4 | {3} 455 -4 | {3} 456 -3 | {} 457 -3 | {} 458 -3 | {} 459 -2 | {4,6} 460 -2 | {4,6} 461 -2 | {4,6} 462 -1 | {1,2,3} 463 -1 | {1,2,3} 464 -1 | {1,2,3} 465(12 rows) 466 467-- 468-- repartitioned INSERT/SELECT with RETURNING 469-- 470TRUNCATE target_table; 471SET client_min_messages TO DEBUG1; 472WITH c AS ( 473 INSERT INTO target_table 474 SELECT mapped_key, c FROM source_table 475 RETURNING *) 476SELECT * FROM c ORDER by a; 477DEBUG: generating subplan XXX_1 for CTE c: INSERT INTO insert_select_repartition.target_table (a, b) SELECT mapped_key, c FROM insert_select_repartition.source_table RETURNING target_table.a, target_table.b 478DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match 479DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT a, b FROM (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer[])) c ORDER BY a 480DEBUG: performing repartitioned INSERT ... SELECT 481 a | b 482--------------------------------------------------------------------- 483 -4 | {3} 484 -3 | {} 485 -2 | {4,6} 486 -1 | {1,2,3} 487(4 rows) 488 489RESET client_min_messages; 490-- 491-- in combination with CTEs 492-- 493TRUNCATE target_table; 494SET client_min_messages TO DEBUG1; 495WITH t AS ( 496 SELECT mapped_key, a, c FROM source_table 497 WHERE a > floor(random()) 498) 499INSERT INTO target_table 500SELECT mapped_key, c FROM t NATURAL JOIN source_table; 501DEBUG: volatile functions are not allowed in distributed INSERT ... SELECT queries 502DEBUG: generating subplan XXX_1 for CTE t: SELECT mapped_key, a, c FROM insert_select_repartition.source_table WHERE ((a)::double precision OPERATOR(pg_catalog.>) floor(random())) 503DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT mapped_key AS a, (c)::integer[] AS b FROM (SELECT t.mapped_key, t.c FROM ((SELECT intermediate_result.mapped_key, intermediate_result.a, intermediate_result.c FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(mapped_key integer, a integer, c double precision[])) t JOIN insert_select_repartition.source_table USING (mapped_key, a, c))) citus_insert_select_subquery 504DEBUG: performing repartitioned INSERT ... SELECT 505RESET client_min_messages; 506SELECT * FROM target_table ORDER BY a; 507 a | b 508--------------------------------------------------------------------- 509 -4 | {3} 510 -3 | {} 511 -2 | {4,6} 512 -1 | {1,2,3} 513(4 rows) 514 515DROP TABLE source_table, target_table; 516-- 517-- The case where select query has a GROUP BY ... 518-- 519SET citus.shard_count TO 4; 520CREATE TABLE source_table(a int, b int); 521SELECT create_distributed_table('source_table', 'a'); 522 create_distributed_table 523--------------------------------------------------------------------- 524 525(1 row) 526 527SET citus.shard_count TO 3; 528CREATE TABLE target_table(a int, b int); 529SELECT create_distributed_table('target_table', 'a'); 530 create_distributed_table 531--------------------------------------------------------------------- 532 533(1 row) 534 535INSERT INTO source_table SELECT floor(i/4), i*i FROM generate_series(1, 20) i; 536SET client_min_messages TO DEBUG1; 537INSERT INTO target_table SELECT a, max(b) FROM source_table GROUP BY a; 538DEBUG: INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT 539DEBUG: performing repartitioned INSERT ... SELECT 540RESET client_min_messages; 541SELECT * FROM target_table ORDER BY a; 542 a | b 543--------------------------------------------------------------------- 544 0 | 9 545 1 | 49 546 2 | 121 547 3 | 225 548 4 | 361 549 5 | 400 550(6 rows) 551 552-- 553-- EXPLAIN output should specify repartitioned INSERT/SELECT 554-- 555EXPLAIN INSERT INTO target_table SELECT a, max(b) FROM source_table GROUP BY a; 556 QUERY PLAN 557--------------------------------------------------------------------- 558 Custom Scan (Citus INSERT ... SELECT) (cost=0.00..0.00 rows=0 width=0) 559 INSERT/SELECT method: repartition 560 -> Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=100000 width=8) 561 Task Count: 4 562 Tasks Shown: One of 4 563 -> Task 564 Node: host=localhost port=xxxxx dbname=regression 565 -> HashAggregate (cost=43.90..45.90 rows=200 width=8) 566 Group Key: a 567 -> Seq Scan on source_table_4213606 source_table (cost=0.00..32.60 rows=2260 width=8) 568(10 rows) 569 570-- 571-- EXPLAIN ANALYZE is currently not supported 572-- 573EXPLAIN ANALYZE INSERT INTO target_table SELECT a, max(b) FROM source_table GROUP BY a; 574ERROR: EXPLAIN ANALYZE is currently not supported for INSERT ... SELECT commands with repartitioning 575-- 576-- Duplicate names in target list 577-- 578TRUNCATE target_table; 579SET client_min_messages TO DEBUG2; 580INSERT INTO target_table 581 SELECT max(b), max(b) FROM source_table GROUP BY a; 582DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match 583DEBUG: Router planner cannot handle multi-shard select queries 584DEBUG: performing repartitioned INSERT ... SELECT 585DEBUG: partitioning SELECT query by column index 0 with name 'a' 586DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213610 AS citus_table_alias (a, b) SELECT a, b FROM read_intermediate_results('{repartitioned_results_xxxxx_from_4213606_to_0,repartitioned_results_xxxxx_from_4213607_to_0}'::text[], 'binary'::citus_copy_format) intermediate_result(a integer, b integer) 587DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213611 AS citus_table_alias (a, b) SELECT a, b FROM read_intermediate_results('{repartitioned_results_xxxxx_from_4213607_to_1,repartitioned_results_xxxxx_from_4213609_to_1}'::text[], 'binary'::citus_copy_format) intermediate_result(a integer, b integer) 588DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213612 AS citus_table_alias (a, b) SELECT a, b FROM read_intermediate_results('{repartitioned_results_xxxxx_from_4213606_to_2,repartitioned_results_xxxxx_from_4213607_to_2}'::text[], 'binary'::citus_copy_format) intermediate_result(a integer, b integer) 589RESET client_min_messages; 590SELECT * FROM target_table ORDER BY a; 591 a | b 592--------------------------------------------------------------------- 593 9 | 9 594 49 | 49 595 121 | 121 596 225 | 225 597 361 | 361 598 400 | 400 599(6 rows) 600 601-- 602-- Prepared INSERT/SELECT 603-- 604TRUNCATE target_table; 605PREPARE insert_plan(int, int) AS 606INSERT INTO target_table 607 SELECT a, max(b) FROM source_table 608 WHERE a BETWEEN $1 AND $2 GROUP BY a; 609SET client_min_messages TO DEBUG1; 610EXECUTE insert_plan(0, 2); 611DEBUG: INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT 612DEBUG: performing repartitioned INSERT ... SELECT 613EXECUTE insert_plan(0, 2); 614DEBUG: INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT 615DEBUG: performing repartitioned INSERT ... SELECT 616EXECUTE insert_plan(0, 2); 617DEBUG: INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT 618DEBUG: performing repartitioned INSERT ... SELECT 619EXECUTE insert_plan(0, 2); 620DEBUG: INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT 621DEBUG: performing repartitioned INSERT ... SELECT 622EXECUTE insert_plan(0, 2); 623DEBUG: INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT 624DEBUG: performing repartitioned INSERT ... SELECT 625EXECUTE insert_plan(0, 2); 626DEBUG: INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT 627DEBUG: performing repartitioned INSERT ... SELECT 628EXECUTE insert_plan(2, 4); 629DEBUG: INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT 630DEBUG: performing repartitioned INSERT ... SELECT 631EXECUTE insert_plan(2, 4); 632DEBUG: INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT 633DEBUG: performing repartitioned INSERT ... SELECT 634EXECUTE insert_plan(2, 4); 635DEBUG: INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT 636DEBUG: performing repartitioned INSERT ... SELECT 637EXECUTE insert_plan(2, 4); 638DEBUG: INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT 639DEBUG: performing repartitioned INSERT ... SELECT 640EXECUTE insert_plan(2, 4); 641DEBUG: INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT 642DEBUG: performing repartitioned INSERT ... SELECT 643EXECUTE insert_plan(2, 4); 644DEBUG: INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT 645DEBUG: performing repartitioned INSERT ... SELECT 646RESET client_min_messages; 647SELECT a, count(*), count(distinct b) distinct_values FROM target_table GROUP BY a ORDER BY a; 648 a | count | distinct_values 649--------------------------------------------------------------------- 650 0 | 6 | 1 651 1 | 6 | 1 652 2 | 12 | 1 653 3 | 6 | 1 654 4 | 6 | 1 655(5 rows) 656 657DEALLOCATE insert_plan; 658-- 659-- Prepared router INSERT/SELECT. We currently use pull to coordinator when the 660-- distributed query has a single task. 661-- 662TRUNCATE target_table; 663PREPARE insert_plan(int) AS 664INSERT INTO target_table 665 SELECT a, max(b) FROM source_table 666 WHERE a=$1 GROUP BY a; 667SET client_min_messages TO DEBUG1; 668EXECUTE insert_plan(0); 669DEBUG: INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT 670DEBUG: Collecting INSERT ... SELECT results on coordinator 671EXECUTE insert_plan(0); 672DEBUG: INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT 673DEBUG: Collecting INSERT ... SELECT results on coordinator 674EXECUTE insert_plan(0); 675DEBUG: INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT 676DEBUG: Collecting INSERT ... SELECT results on coordinator 677EXECUTE insert_plan(0); 678DEBUG: INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT 679DEBUG: Collecting INSERT ... SELECT results on coordinator 680EXECUTE insert_plan(0); 681DEBUG: INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT 682DEBUG: Collecting INSERT ... SELECT results on coordinator 683EXECUTE insert_plan(0); 684DEBUG: INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT 685DEBUG: Collecting INSERT ... SELECT results on coordinator 686EXECUTE insert_plan(0); 687DEBUG: INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT 688DEBUG: Collecting INSERT ... SELECT results on coordinator 689RESET client_min_messages; 690SELECT a, count(*), count(distinct b) distinct_values FROM target_table GROUP BY a ORDER BY a; 691 a | count | distinct_values 692--------------------------------------------------------------------- 693 0 | 7 | 1 694(1 row) 695 696DEALLOCATE insert_plan; 697-- 698-- Prepared INSERT/SELECT with no parameters. 699-- 700TRUNCATE target_table; 701PREPARE insert_plan AS 702INSERT INTO target_table 703 SELECT a, max(b) FROM source_table 704 WHERE a BETWEEN 1 AND 2 GROUP BY a; 705EXPLAIN EXECUTE insert_plan; 706 QUERY PLAN 707--------------------------------------------------------------------- 708 Custom Scan (Citus INSERT ... SELECT) (cost=0.00..0.00 rows=0 width=0) 709 INSERT/SELECT method: repartition 710 -> Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=100000 width=8) 711 Task Count: 4 712 Tasks Shown: One of 4 713 -> Task 714 Node: host=localhost port=xxxxx dbname=regression 715 -> GroupAggregate (cost=44.09..44.28 rows=11 width=8) 716 Group Key: a 717 -> Sort (cost=44.09..44.12 rows=11 width=8) 718 Sort Key: a 719 -> Seq Scan on source_table_4213606 source_table (cost=0.00..43.90 rows=11 width=8) 720 Filter: ((a >= 1) AND (a <= 2)) 721(13 rows) 722 723SET client_min_messages TO DEBUG1; 724EXECUTE insert_plan; 725DEBUG: performing repartitioned INSERT ... SELECT 726EXECUTE insert_plan; 727DEBUG: performing repartitioned INSERT ... SELECT 728EXECUTE insert_plan; 729DEBUG: performing repartitioned INSERT ... SELECT 730EXECUTE insert_plan; 731DEBUG: performing repartitioned INSERT ... SELECT 732EXECUTE insert_plan; 733DEBUG: performing repartitioned INSERT ... SELECT 734EXECUTE insert_plan; 735DEBUG: performing repartitioned INSERT ... SELECT 736EXECUTE insert_plan; 737DEBUG: performing repartitioned INSERT ... SELECT 738RESET client_min_messages; 739SELECT a, count(*), count(distinct b) distinct_values FROM target_table GROUP BY a ORDER BY a; 740 a | count | distinct_values 741--------------------------------------------------------------------- 742 1 | 7 | 1 743 2 | 7 | 1 744(2 rows) 745 746DEALLOCATE insert_plan; 747-- 748-- INSERT/SELECT in CTE 749-- 750TRUNCATE target_table; 751SET client_min_messages TO DEBUG2; 752WITH r AS ( 753 INSERT INTO target_table SELECT * FROM source_table RETURNING * 754) 755INSERT INTO target_table SELECT source_table.a, max(source_table.b) FROM source_table NATURAL JOIN r GROUP BY source_table.a; 756DEBUG: INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT 757DEBUG: only SELECT, UPDATE, or DELETE common table expressions may be router planned 758DEBUG: generating subplan XXX_1 for CTE r: INSERT INTO insert_select_repartition.target_table (a, b) SELECT a, b FROM insert_select_repartition.source_table RETURNING target_table.a, target_table.b 759DEBUG: INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT 760DEBUG: Router planner cannot handle multi-shard select queries 761DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT a, max AS b FROM (SELECT source_table.a, max(source_table.b) AS max FROM (insert_select_repartition.source_table JOIN (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) r USING (a, b)) GROUP BY source_table.a) citus_insert_select_subquery 762DEBUG: Router planner cannot handle multi-shard select queries 763DEBUG: performing repartitioned INSERT ... SELECT 764DEBUG: performing repartitioned INSERT ... SELECT 765DEBUG: partitioning SELECT query by column index 0 with name 'a' 766DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213610 AS citus_table_alias (a, b) SELECT a, b FROM read_intermediate_results('{repartitioned_results_xxxxx_from_4213606_to_0,repartitioned_results_xxxxx_from_4213607_to_0}'::text[], 'binary'::citus_copy_format) intermediate_result(a integer, b integer) RETURNING citus_table_alias.a, citus_table_alias.b 767DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213611 AS citus_table_alias (a, b) SELECT a, b FROM read_intermediate_results('{repartitioned_results_xxxxx_from_4213607_to_1}'::text[], 'binary'::citus_copy_format) intermediate_result(a integer, b integer) RETURNING citus_table_alias.a, citus_table_alias.b 768DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213612 AS citus_table_alias (a, b) SELECT a, b FROM read_intermediate_results('{repartitioned_results_xxxxx_from_4213609_to_2}'::text[], 'binary'::citus_copy_format) intermediate_result(a integer, b integer) RETURNING citus_table_alias.a, citus_table_alias.b 769DEBUG: partitioning SELECT query by column index 0 with name 'a' 770DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213610 AS citus_table_alias (a, b) SELECT a, b FROM read_intermediate_results('{repartitioned_results_xxxxx_from_4213606_to_0,repartitioned_results_xxxxx_from_4213607_to_0}'::text[], 'binary'::citus_copy_format) intermediate_result(a integer, b integer) 771DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213611 AS citus_table_alias (a, b) SELECT a, b FROM read_intermediate_results('{repartitioned_results_xxxxx_from_4213607_to_1}'::text[], 'binary'::citus_copy_format) intermediate_result(a integer, b integer) 772DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213612 AS citus_table_alias (a, b) SELECT a, b FROM read_intermediate_results('{repartitioned_results_xxxxx_from_4213609_to_2}'::text[], 'binary'::citus_copy_format) intermediate_result(a integer, b integer) 773RESET client_min_messages; 774SELECT * FROM target_table ORDER BY a, b; 775 a | b 776--------------------------------------------------------------------- 777 0 | 1 778 0 | 4 779 0 | 9 780 0 | 9 781 1 | 16 782 1 | 25 783 1 | 36 784 1 | 49 785 1 | 49 786 2 | 64 787 2 | 81 788 2 | 100 789 2 | 121 790 2 | 121 791 3 | 144 792 3 | 169 793 3 | 196 794 3 | 225 795 3 | 225 796 4 | 256 797 4 | 289 798 4 | 324 799 4 | 361 800 4 | 361 801 5 | 400 802 5 | 400 803(26 rows) 804 805DROP TABLE source_table, target_table; 806-- 807-- Constraint failure and rollback 808-- 809SET citus.shard_count TO 4; 810CREATE TABLE source_table(a int, b int); 811SELECT create_distributed_table('source_table', 'a'); 812 create_distributed_table 813--------------------------------------------------------------------- 814 815(1 row) 816 817INSERT INTO source_table SELECT i, i * i FROM generate_series(1, 10) i; 818UPDATE source_table SET b = NULL where b IN (9, 4); 819SET citus.shard_replication_factor TO 2; 820CREATE TABLE target_table(a int, b int not null); 821SELECT create_distributed_table('target_table', 'a', 'range'); 822 create_distributed_table 823--------------------------------------------------------------------- 824 825(1 row) 826 827CALL public.create_range_partitioned_shards('target_table', '{0,3,6,9}','{2,5,8,50}'); 828INSERT INTO target_table VALUES (11,9), (22,4); 829EXPLAIN (costs off) INSERT INTO target_table SELECT * FROM source_table; 830 QUERY PLAN 831--------------------------------------------------------------------- 832 Custom Scan (Citus INSERT ... SELECT) 833 INSERT/SELECT method: repartition 834 -> Custom Scan (Citus Adaptive) 835 Task Count: 4 836 Tasks Shown: One of 4 837 -> Task 838 Node: host=localhost port=xxxxx dbname=regression 839 -> Seq Scan on source_table_4213613 source_table 840(8 rows) 841 842EXPLAIN (costs off) INSERT INTO target_table SELECT * FROM source_table WHERE b IS NOT NULL; 843 QUERY PLAN 844--------------------------------------------------------------------- 845 Custom Scan (Citus INSERT ... SELECT) 846 INSERT/SELECT method: repartition 847 -> Custom Scan (Citus Adaptive) 848 Task Count: 4 849 Tasks Shown: One of 4 850 -> Task 851 Node: host=localhost port=xxxxx dbname=regression 852 -> Seq Scan on source_table_4213613 source_table 853 Filter: (b IS NOT NULL) 854(9 rows) 855 856BEGIN; 857SAVEPOINT s1; 858INSERT INTO target_table SELECT * FROM source_table; 859ERROR: null value in column "b" violates not-null constraint 860ROLLBACK TO SAVEPOINT s1; 861INSERT INTO target_table SELECT * FROM source_table WHERE b IS NOT NULL; 862END; 863SELECT * FROM target_table ORDER BY b; 864 a | b 865--------------------------------------------------------------------- 866 1 | 1 867 22 | 4 868 11 | 9 869 4 | 16 870 5 | 25 871 6 | 36 872 7 | 49 873 8 | 64 874 9 | 81 875 10 | 100 876(10 rows) 877 878-- verify that values have been replicated to both replicas 879SELECT * FROM run_command_on_placements('target_table', 'select count(*) from %s') ORDER BY shardid, nodeport; 880 nodename | nodeport | shardid | success | result 881--------------------------------------------------------------------- 882 localhost | 57637 | 4213617 | t | 1 883 localhost | 57638 | 4213617 | t | 1 884 localhost | 57637 | 4213618 | t | 2 885 localhost | 57638 | 4213618 | t | 2 886 localhost | 57637 | 4213619 | t | 3 887 localhost | 57638 | 4213619 | t | 3 888 localhost | 57637 | 4213620 | t | 4 889 localhost | 57638 | 4213620 | t | 4 890(8 rows) 891 892-- 893-- Multiple casts in the SELECT query 894-- 895TRUNCATE target_table; 896SET client_min_messages TO DEBUG2; 897INSERT INTO target_table SELECT 1.12, b::bigint FROM source_table WHERE b IS NOT NULL; 898DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match 899DEBUG: Router planner cannot handle multi-shard select queries 900DEBUG: performing repartitioned INSERT ... SELECT 901DEBUG: partitioning SELECT query by column index 0 with name 'a' 902DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213617 AS citus_table_alias (a, b) SELECT a, b FROM read_intermediate_results('{repartitioned_results_xxxxx_from_4213613_to_0,repartitioned_results_xxxxx_from_4213614_to_0,repartitioned_results_xxxxx_from_4213615_to_0,repartitioned_results_xxxxx_from_4213616_to_0}'::text[], 'binary'::citus_copy_format) intermediate_result(a integer, b integer) 903RESET client_min_messages; 904SELECT * FROM target_table ORDER BY a, b; 905 a | b 906--------------------------------------------------------------------- 907 1 | 1 908 1 | 16 909 1 | 25 910 1 | 36 911 1 | 49 912 1 | 64 913 1 | 81 914 1 | 100 915(8 rows) 916 917-- 918-- ROLLBACK after out of range error 919-- 920TRUNCATE target_table; 921BEGIN; 922INSERT INTO target_table SELECT a * 10, b FROM source_table WHERE b IS NOT NULL; 923ERROR: could not find shard for partition column value 924END; 925SELECT max(result) FROM run_command_on_placements('target_table', 'select count(*) from %s'); 926 max 927--------------------------------------------------------------------- 928 0 929(1 row) 930 931DROP TABLE source_table, target_table; 932-- 933-- Range partitioned target's ranges doesn't cover the whole range 934-- 935SET citus.shard_replication_factor TO 2; 936SET citus.shard_count TO 4; 937CREATE TABLE source_table(a int, b int); 938SELECT create_distributed_table('source_table', 'a'); 939 create_distributed_table 940--------------------------------------------------------------------- 941 942(1 row) 943 944INSERT INTO source_table SELECT i, i * i FROM generate_series(1, 10) i; 945SET citus.shard_replication_factor TO 2; 946CREATE TABLE target_table(b int not null, a float); 947SELECT create_distributed_table('target_table', 'a', 'range'); 948 create_distributed_table 949--------------------------------------------------------------------- 950 951(1 row) 952 953CALL public.create_range_partitioned_shards('target_table', '{0.0,3.5,6.5,9.5}','{2.9,5.9,8.9,50.0}'); 954INSERT INTO target_table SELECT b, a+0.6 FROM source_table; 955SELECT * FROM target_table ORDER BY a; 956 b | a 957--------------------------------------------------------------------- 958 1 | 1.6 959 4 | 2.6 960 9 | 3.6 961 16 | 4.6 962 25 | 5.6 963 36 | 6.6 964 49 | 7.6 965 64 | 8.6 966 81 | 9.6 967 100 | 10.6 968(10 rows) 969 970-- verify that values have been replicated to both replicas, and that each 971-- replica has received correct number of rows 972SELECT * FROM run_command_on_placements('target_table', 'select count(*) from %s') ORDER BY shardid, nodeport; 973 nodename | nodeport | shardid | success | result 974--------------------------------------------------------------------- 975 localhost | 57637 | 4213625 | t | 2 976 localhost | 57638 | 4213625 | t | 2 977 localhost | 57637 | 4213626 | t | 3 978 localhost | 57638 | 4213626 | t | 3 979 localhost | 57637 | 4213627 | t | 3 980 localhost | 57638 | 4213627 | t | 3 981 localhost | 57637 | 4213628 | t | 2 982 localhost | 57638 | 4213628 | t | 2 983(8 rows) 984 985DROP TABLE source_table, target_table; 986-- 987-- Select column names should be unique 988-- 989SET citus.shard_replication_factor TO 1; 990SET citus.shard_count TO 4; 991CREATE TABLE source_table(a int, b int); 992SELECT create_distributed_table('source_table', 'a'); 993 create_distributed_table 994--------------------------------------------------------------------- 995 996(1 row) 997 998SET citus.shard_count TO 3; 999CREATE TABLE target_table(a int, b int, c int, d int, e int, f int); 1000SELECT create_distributed_table('target_table', 'a'); 1001 create_distributed_table 1002--------------------------------------------------------------------- 1003 1004(1 row) 1005 1006INSERT INTO source_table SELECT i, i * i FROM generate_series(1, 10) i; 1007SET client_min_messages TO DEBUG2; 1008INSERT INTO target_table SELECT a AS aa, b AS aa, 1 AS aa, 2 AS aa FROM source_table; 1009DEBUG: INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT 1010DEBUG: Router planner cannot handle multi-shard select queries 1011DEBUG: performing repartitioned INSERT ... SELECT 1012DEBUG: partitioning SELECT query by column index 0 with name 'a' 1013DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213633 AS citus_table_alias (a, b, c, d) SELECT a, b, c, d FROM read_intermediate_results('{repartitioned_results_xxxxx_from_4213629_to_0,repartitioned_results_xxxxx_from_4213630_to_0}'::text[], 'binary'::citus_copy_format) intermediate_result(a integer, b integer, c integer, d integer) 1014DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213634 AS citus_table_alias (a, b, c, d) SELECT a, b, c, d FROM read_intermediate_results('{repartitioned_results_xxxxx_from_4213630_to_1,repartitioned_results_xxxxx_from_4213631_to_1}'::text[], 'binary'::citus_copy_format) intermediate_result(a integer, b integer, c integer, d integer) 1015DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213635 AS citus_table_alias (a, b, c, d) SELECT a, b, c, d FROM read_intermediate_results('{repartitioned_results_xxxxx_from_4213632_to_2}'::text[], 'binary'::citus_copy_format) intermediate_result(a integer, b integer, c integer, d integer) 1016RESET client_min_messages; 1017SELECT count(*) FROM target_table; 1018 count 1019--------------------------------------------------------------------- 1020 10 1021(1 row) 1022 1023-- 1024-- Disable repartitioned insert/select 1025-- 1026TRUNCATE target_table; 1027SET citus.enable_repartitioned_insert_select TO OFF; 1028EXPLAIN (costs off) INSERT INTO target_table SELECT a AS aa, b AS aa, 1 AS aa, 2 AS aa FROM source_table; 1029 QUERY PLAN 1030--------------------------------------------------------------------- 1031 Custom Scan (Citus INSERT ... SELECT) 1032 INSERT/SELECT method: pull to coordinator 1033 -> Custom Scan (Citus Adaptive) 1034 Task Count: 4 1035 Tasks Shown: One of 4 1036 -> Task 1037 Node: host=localhost port=xxxxx dbname=regression 1038 -> Seq Scan on source_table_4213629 source_table 1039(8 rows) 1040 1041SET client_min_messages TO DEBUG2; 1042INSERT INTO target_table SELECT a AS aa, b AS aa, 1 AS aa, 2 AS aa FROM source_table; 1043DEBUG: INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT 1044DEBUG: Router planner cannot handle multi-shard select queries 1045DEBUG: Collecting INSERT ... SELECT results on coordinator 1046RESET client_min_messages; 1047SELECT count(*) FROM target_table; 1048 count 1049--------------------------------------------------------------------- 1050 10 1051(1 row) 1052 1053SET citus.enable_repartitioned_insert_select TO ON; 1054EXPLAIN (costs off) INSERT INTO target_table SELECT a AS aa, b AS aa, 1 AS aa, 2 AS aa FROM source_table; 1055 QUERY PLAN 1056--------------------------------------------------------------------- 1057 Custom Scan (Citus INSERT ... SELECT) 1058 INSERT/SELECT method: repartition 1059 -> Custom Scan (Citus Adaptive) 1060 Task Count: 4 1061 Tasks Shown: One of 4 1062 -> Task 1063 Node: host=localhost port=xxxxx dbname=regression 1064 -> Seq Scan on source_table_4213629 source_table 1065(8 rows) 1066 1067DROP TABLE source_table, target_table; 1068-- 1069-- Don't use INSERT/SELECT repartition with repartition joins 1070-- 1071create table test(x int, y int); 1072select create_distributed_table('test', 'x'); 1073 create_distributed_table 1074--------------------------------------------------------------------- 1075 1076(1 row) 1077 1078set citus.enable_repartition_joins to true; 1079INSERT INTO test SELECT i, i FROM generate_series(1, 10) i; 1080EXPLAIN (costs off) INSERT INTO test(y, x) SELECT a.x, b.y FROM test a JOIN test b USING (y); 1081 QUERY PLAN 1082--------------------------------------------------------------------- 1083 Custom Scan (Citus INSERT ... SELECT) 1084 INSERT/SELECT method: pull to coordinator 1085 -> Custom Scan (Citus Adaptive) 1086 Task Count: 4 1087 Tasks Shown: None, not supported for re-partition queries 1088 -> MapMergeJob 1089 Map Task Count: 3 1090 Merge Task Count: 4 1091 -> MapMergeJob 1092 Map Task Count: 3 1093 Merge Task Count: 4 1094(11 rows) 1095 1096SET client_min_messages TO DEBUG1; 1097INSERT INTO test(y, x) SELECT a.x, b.y FROM test a JOIN test b USING (y); 1098DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match 1099DEBUG: Collecting INSERT ... SELECT results on coordinator 1100RESET client_min_messages; 1101SELECT count(*) FROM test; 1102 count 1103--------------------------------------------------------------------- 1104 20 1105(1 row) 1106 1107TRUNCATE test; 1108INSERT INTO test SELECT i, i FROM generate_series(1, 10) i; 1109EXPLAIN (costs off) INSERT INTO test SELECT a.* FROM test a JOIN test b USING (y); 1110 QUERY PLAN 1111--------------------------------------------------------------------- 1112 Custom Scan (Citus INSERT ... SELECT) 1113 INSERT/SELECT method: pull to coordinator 1114 -> Custom Scan (Citus Adaptive) 1115 Task Count: 4 1116 Tasks Shown: None, not supported for re-partition queries 1117 -> MapMergeJob 1118 Map Task Count: 3 1119 Merge Task Count: 4 1120 -> MapMergeJob 1121 Map Task Count: 3 1122 Merge Task Count: 4 1123(11 rows) 1124 1125SET client_min_messages TO DEBUG1; 1126INSERT INTO test SELECT a.* FROM test a JOIN test b USING (y); 1127DEBUG: Router planner cannot handle multi-shard select queries 1128DEBUG: Collecting INSERT ... SELECT results on coordinator 1129RESET client_min_messages; 1130SELECT count(*) FROM test; 1131 count 1132--------------------------------------------------------------------- 1133 20 1134(1 row) 1135 1136-- 1137-- In the following case we coerce some columns and move uncoerced versions to the 1138-- end of SELECT list. The following case verifies that we rename those columns so 1139-- we don't get "column reference is ambiguous" errors. 1140-- 1141CREATE TABLE target_table( 1142 c1 int, 1143 c2 int, 1144 c3 timestamp, 1145 a int, 1146 b int, 1147 c int, 1148 c4 int, 1149 c5 int, 1150 c6 int[], 1151 cardinality int, 1152 sum int, 1153 PRIMARY KEY (c1, c2, c3, c4, c5, c6) 1154); 1155SET citus.shard_count TO 5; 1156SELECT create_distributed_table('target_table', 'c1'); 1157 create_distributed_table 1158--------------------------------------------------------------------- 1159 1160(1 row) 1161 1162CREATE TABLE source_table( 1163 c1 int, 1164 c2 int, 1165 c3 date, 1166 c4 int, 1167 cardinality int, 1168 sum int 1169); 1170SET citus.shard_count TO 4; 1171SELECT create_distributed_table('source_table', 'c1'); 1172 create_distributed_table 1173--------------------------------------------------------------------- 1174 1175(1 row) 1176 1177CREATE OR REPLACE FUNCTION dist_func(a int, b int) RETURNS int[] 1178AS $$ 1179BEGIN 1180 RETURN array_fill(a, ARRAY[b]); 1181END; 1182$$ 1183LANGUAGE plpgsql STABLE; 1184SELECT create_distributed_function('dist_func(int, int)'); 1185 create_distributed_function 1186--------------------------------------------------------------------- 1187 1188(1 row) 1189 1190SET client_min_messages TO DEBUG; 1191SET citus.enable_unique_job_ids TO off; 1192INSERT INTO source_table VALUES (1,2, '2020-02-02', 3, 4, 5); 1193DEBUG: Creating router plan 1194DEBUG: query has a single distribution column value: 1 1195INSERT INTO source_table VALUES (1,2, '2020-02-02', 3, 4, 5); 1196DEBUG: Creating router plan 1197DEBUG: query has a single distribution column value: 1 1198INSERT INTO source_table VALUES (3,4, '2020-02-02', 3, 4, 5); 1199DEBUG: Creating router plan 1200DEBUG: query has a single distribution column value: 3 1201INSERT INTO target_table AS enriched(c1, c2, c3, c4, c5, c6, cardinality, sum) 1202SELECT c1, c2, c3, c4, -1::float AS c5, 1203 dist_func(c1, 4) c6, 1204 sum(cardinality), 1205 sum(sum) 1206FROM source_table 1207GROUP BY c1, c2, c3, c4, c5, c6 1208ON CONFLICT(c1, c2, c3, c4, c5, c6) 1209DO UPDATE SET 1210 cardinality = enriched.cardinality + excluded.cardinality, 1211 sum = enriched.sum + excluded.sum; 1212DEBUG: INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT 1213DEBUG: Router planner cannot handle multi-shard select queries 1214DEBUG: performing repartitioned INSERT ... SELECT 1215DEBUG: partitioning SELECT query by column index 0 with name 'c1' 1216DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213639 AS enriched (c1, c2, c3, c4, c5, c6, cardinality, sum) SELECT c1, c2, c3, c4, c5, c6, cardinality, sum FROM read_intermediate_results('{repartitioned_results_xxxxx_from_4213644_to_0}'::text[], 'binary'::citus_copy_format) intermediate_result(c1 integer, c2 integer, c3 timestamp without time zone, c4 integer, c5 integer, c6 integer[], cardinality integer, sum integer) ON CONFLICT(c1, c2, c3, c4, c5, c6) DO UPDATE SET cardinality = (enriched.cardinality OPERATOR(pg_catalog.+) excluded.cardinality), sum = (enriched.sum OPERATOR(pg_catalog.+) excluded.sum) 1217DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213641 AS enriched (c1, c2, c3, c4, c5, c6, cardinality, sum) SELECT c1, c2, c3, c4, c5, c6, cardinality, sum FROM read_intermediate_results('{repartitioned_results_xxxxx_from_4213645_to_2}'::text[], 'binary'::citus_copy_format) intermediate_result(c1 integer, c2 integer, c3 timestamp without time zone, c4 integer, c5 integer, c6 integer[], cardinality integer, sum integer) ON CONFLICT(c1, c2, c3, c4, c5, c6) DO UPDATE SET cardinality = (enriched.cardinality OPERATOR(pg_catalog.+) excluded.cardinality), sum = (enriched.sum OPERATOR(pg_catalog.+) excluded.sum) 1218RESET client_min_messages; 1219EXPLAIN (COSTS OFF) INSERT INTO target_table AS enriched(c1, c2, c3, c4, c5, c6, cardinality, sum) 1220SELECT c1, c2, c3, c4, -1::float AS c5, 1221 dist_func(c1, 4) c6, 1222 sum(cardinality), 1223 sum(sum) 1224FROM source_table 1225GROUP BY c1, c2, c3, c4, c5, c6 1226ON CONFLICT(c1, c2, c3, c4, c5, c6) 1227DO UPDATE SET 1228 cardinality = enriched.cardinality + excluded.cardinality, 1229 sum = enriched.sum + excluded.sum; 1230 QUERY PLAN 1231--------------------------------------------------------------------- 1232 Custom Scan (Citus INSERT ... SELECT) 1233 INSERT/SELECT method: repartition 1234 -> Custom Scan (Citus Adaptive) 1235 Task Count: 4 1236 Tasks Shown: One of 4 1237 -> Task 1238 Node: host=localhost port=xxxxx dbname=regression 1239 -> HashAggregate 1240 Group Key: c1, c2, c3, c4, '-1'::double precision, insert_select_repartition.dist_func(c1, 4) 1241 -> Seq Scan on source_table_4213644 source_table 1242(10 rows) 1243 1244-- verify that we don't report repartitioned insert/select for tables 1245-- with sequences. See https://github.com/citusdata/citus/issues/3936 1246create table table_with_sequences (x int, y int, z bigserial); 1247insert into table_with_sequences values (1,1); 1248select create_distributed_table('table_with_sequences','x'); 1249NOTICE: Copying data from local table... 1250NOTICE: copying the data has completed 1251 create_distributed_table 1252--------------------------------------------------------------------- 1253 1254(1 row) 1255 1256explain (costs off) insert into table_with_sequences select y, x from table_with_sequences; 1257 QUERY PLAN 1258--------------------------------------------------------------------- 1259 Custom Scan (Citus INSERT ... SELECT) 1260 INSERT/SELECT method: pull to coordinator 1261 -> Custom Scan (Citus Adaptive) 1262 Task Count: 4 1263 Tasks Shown: One of 4 1264 -> Task 1265 Node: host=localhost port=xxxxx dbname=regression 1266 -> Seq Scan on table_with_sequences_4213648 table_with_sequences 1267(8 rows) 1268 1269-- verify that we don't report repartitioned insert/select for tables 1270-- with user-defined sequences. 1271CREATE SEQUENCE user_defined_sequence; 1272create table table_with_user_sequences (x int, y int, z bigint default nextval('user_defined_sequence')); 1273insert into table_with_user_sequences values (1,1); 1274select create_distributed_table('table_with_user_sequences','x'); 1275NOTICE: Copying data from local table... 1276NOTICE: copying the data has completed 1277 create_distributed_table 1278--------------------------------------------------------------------- 1279 1280(1 row) 1281 1282explain (costs off) insert into table_with_user_sequences select y, x from table_with_user_sequences; 1283 QUERY PLAN 1284--------------------------------------------------------------------- 1285 Custom Scan (Citus INSERT ... SELECT) 1286 INSERT/SELECT method: pull to coordinator 1287 -> Custom Scan (Citus Adaptive) 1288 Task Count: 4 1289 Tasks Shown: One of 4 1290 -> Task 1291 Node: host=localhost port=xxxxx dbname=regression 1292 -> Seq Scan on table_with_user_sequences_4213652 table_with_user_sequences 1293(8 rows) 1294 1295-- clean-up 1296SET client_min_messages TO WARNING; 1297DROP SCHEMA insert_select_repartition CASCADE; 1298