1-- tests behaviour of INSERT INTO ... SELECT with repartitioning
2CREATE SCHEMA insert_select_repartition;
3SET search_path TO 'insert_select_repartition';
4SET citus.next_shard_id TO 4213581;
5SET citus.shard_replication_factor TO 1;
6-- 4 shards, hash distributed.
7-- Negate distribution column value.
8SET citus.shard_count TO 4;
9CREATE TABLE source_table(a int);
10SELECT create_distributed_table('source_table', 'a');
11 create_distributed_table
12---------------------------------------------------------------------
13
14(1 row)
15
16INSERT INTO source_table SELECT * FROM generate_series(1, 10);
17CREATE TABLE target_table(a int);
18SELECT create_distributed_table('target_table', 'a');
19 create_distributed_table
20---------------------------------------------------------------------
21
22(1 row)
23
24SET client_min_messages TO DEBUG2;
25INSERT INTO target_table SELECT -a FROM source_table;
26DEBUG:  cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match
27DETAIL:  Subquery contains an operator in the same position as the target table's partition column.
28HINT:  Ensure the target table's partition column has a corresponding simple column reference to a distributed table's partition column in the subquery.
29DEBUG:  Router planner cannot handle multi-shard select queries
30DEBUG:  performing repartitioned INSERT ... SELECT
31DEBUG:  partitioning SELECT query by column index 0 with name 'a'
32DEBUG:  distributed statement: INSERT INTO insert_select_repartition.target_table_4213585 AS citus_table_alias (a) SELECT a FROM read_intermediate_results('{repartitioned_results_xxxxx_from_4213583_to_0,repartitioned_results_xxxxx_from_4213584_to_0}'::text[], 'binary'::citus_copy_format) intermediate_result(a integer)
33DEBUG:  distributed statement: INSERT INTO insert_select_repartition.target_table_4213586 AS citus_table_alias (a) SELECT a FROM read_intermediate_results('{repartitioned_results_xxxxx_from_4213582_to_1}'::text[], 'binary'::citus_copy_format) intermediate_result(a integer)
34DEBUG:  distributed statement: INSERT INTO insert_select_repartition.target_table_4213587 AS citus_table_alias (a) SELECT a FROM read_intermediate_results('{repartitioned_results_xxxxx_from_4213581_to_2,repartitioned_results_xxxxx_from_4213582_to_2,repartitioned_results_xxxxx_from_4213584_to_2}'::text[], 'binary'::citus_copy_format) intermediate_result(a integer)
35DEBUG:  distributed statement: INSERT INTO insert_select_repartition.target_table_4213588 AS citus_table_alias (a) SELECT a FROM read_intermediate_results('{repartitioned_results_xxxxx_from_4213581_to_3}'::text[], 'binary'::citus_copy_format) intermediate_result(a integer)
36RESET client_min_messages;
37SELECT * FROM target_table WHERE a=-1 OR a=-3 OR a=-7 ORDER BY a;
38 a
39---------------------------------------------------------------------
40 -7
41 -3
42 -1
43(3 rows)
44
45DROP TABLE source_table, target_table;
46--
47-- range partitioning, composite distribution column
48--
49CREATE TYPE composite_key_type AS (f1 int, f2 text);
50-- source
51CREATE TABLE source_table(f1 int, key composite_key_type, value int, mapped_key composite_key_type);
52SELECT create_distributed_table('source_table', 'key', 'range');
53 create_distributed_table
54---------------------------------------------------------------------
55
56(1 row)
57
58CALL public.create_range_partitioned_shards('source_table', '{"(0,a)","(25,a)"}','{"(24,z)","(49,z)"}');
59INSERT INTO source_table VALUES (0, (0, 'a'), 1, (0, 'a'));
60INSERT INTO source_table VALUES (1, (1, 'b'), 2, (26, 'b'));
61INSERT INTO source_table VALUES (2, (2, 'c'), 3, (3, 'c'));
62INSERT INTO source_table VALUES (3, (4, 'd'), 4, (27, 'd'));
63INSERT INTO source_table VALUES (4, (30, 'e'), 5, (30, 'e'));
64INSERT INTO source_table VALUES (5, (31, 'f'), 6, (31, 'f'));
65INSERT INTO source_table VALUES (6, (32, 'g'), 50, (8, 'g'));
66-- target
67CREATE TABLE target_table(f1 int DEFAULT 0, value int, key composite_key_type PRIMARY KEY);
68SELECT create_distributed_table('target_table', 'key', 'range');
69 create_distributed_table
70---------------------------------------------------------------------
71
72(1 row)
73
74CALL public.create_range_partitioned_shards('target_table', '{"(0,a)","(25,a)"}','{"(24,z)","(49,z)"}');
75SET client_min_messages TO DEBUG2;
76INSERT INTO target_table SELECT f1, value, mapped_key FROM source_table;
77DEBUG:  cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match
78DETAIL:  The target table's partition column should correspond to a partition column in the subquery.
79DEBUG:  Router planner cannot handle multi-shard select queries
80DEBUG:  performing repartitioned INSERT ... SELECT
81DEBUG:  partitioning SELECT query by column index 2 with name 'key'
82DEBUG:  distributed statement: INSERT INTO insert_select_repartition.target_table_4213591 AS citus_table_alias (f1, value, key) SELECT f1, value, key FROM read_intermediate_results('{repartitioned_results_xxxxx_from_4213589_to_0,repartitioned_results_xxxxx_from_4213590_to_0}'::text[], 'text'::citus_copy_format) intermediate_result(f1 integer, value integer, key insert_select_repartition.composite_key_type)
83DEBUG:  distributed statement: INSERT INTO insert_select_repartition.target_table_4213592 AS citus_table_alias (f1, value, key) SELECT f1, value, key FROM read_intermediate_results('{repartitioned_results_xxxxx_from_4213589_to_1,repartitioned_results_xxxxx_from_4213590_to_1}'::text[], 'text'::citus_copy_format) intermediate_result(f1 integer, value integer, key insert_select_repartition.composite_key_type)
84RESET client_min_messages;
85SELECT * FROM target_table ORDER BY key;
86 f1 | value |  key
87---------------------------------------------------------------------
88  0 |     1 | (0,a)
89  2 |     3 | (3,c)
90  6 |    50 | (8,g)
91  1 |     2 | (26,b)
92  3 |     4 | (27,d)
93  4 |     5 | (30,e)
94  5 |     6 | (31,f)
95(7 rows)
96
97SELECT * FROM target_table WHERE key = (26, 'b')::composite_key_type;
98 f1 | value |  key
99---------------------------------------------------------------------
100  1 |     2 | (26,b)
101(1 row)
102
103-- with explicit column names
104TRUNCATE target_table;
105SET client_min_messages TO DEBUG2;
106INSERT INTO target_table(value, key) SELECT value, mapped_key FROM source_table;
107DEBUG:  cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match
108DETAIL:  The target table's partition column should correspond to a partition column in the subquery.
109DEBUG:  Router planner cannot handle multi-shard select queries
110DEBUG:  performing repartitioned INSERT ... SELECT
111DEBUG:  partitioning SELECT query by column index 2 with name 'key'
112DEBUG:  distributed statement: INSERT INTO insert_select_repartition.target_table_4213591 AS citus_table_alias (f1, value, key) SELECT f1, value, key FROM read_intermediate_results('{repartitioned_results_xxxxx_from_4213589_to_0,repartitioned_results_xxxxx_from_4213590_to_0}'::text[], 'text'::citus_copy_format) intermediate_result(f1 integer, value integer, key insert_select_repartition.composite_key_type)
113DEBUG:  distributed statement: INSERT INTO insert_select_repartition.target_table_4213592 AS citus_table_alias (f1, value, key) SELECT f1, value, key FROM read_intermediate_results('{repartitioned_results_xxxxx_from_4213589_to_1,repartitioned_results_xxxxx_from_4213590_to_1}'::text[], 'text'::citus_copy_format) intermediate_result(f1 integer, value integer, key insert_select_repartition.composite_key_type)
114RESET client_min_messages;
115SELECT * FROM target_table ORDER BY key;
116 f1 | value |  key
117---------------------------------------------------------------------
118  0 |     1 | (0,a)
119  0 |     3 | (3,c)
120  0 |    50 | (8,g)
121  0 |     2 | (26,b)
122  0 |     4 | (27,d)
123  0 |     5 | (30,e)
124  0 |     6 | (31,f)
125(7 rows)
126
127-- missing value for a column
128TRUNCATE target_table;
129SET client_min_messages TO DEBUG2;
130INSERT INTO target_table(key) SELECT mapped_key AS key_renamed FROM source_table;
131DEBUG:  cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match
132DETAIL:  The target table's partition column should correspond to a partition column in the subquery.
133DEBUG:  Router planner cannot handle multi-shard select queries
134DEBUG:  performing repartitioned INSERT ... SELECT
135DEBUG:  partitioning SELECT query by column index 1 with name 'key'
136DEBUG:  distributed statement: INSERT INTO insert_select_repartition.target_table_4213591 AS citus_table_alias (f1, key) SELECT f1, key FROM read_intermediate_results('{repartitioned_results_xxxxx_from_4213589_to_0,repartitioned_results_xxxxx_from_4213590_to_0}'::text[], 'text'::citus_copy_format) intermediate_result(f1 integer, key insert_select_repartition.composite_key_type)
137DEBUG:  distributed statement: INSERT INTO insert_select_repartition.target_table_4213592 AS citus_table_alias (f1, key) SELECT f1, key FROM read_intermediate_results('{repartitioned_results_xxxxx_from_4213589_to_1,repartitioned_results_xxxxx_from_4213590_to_1}'::text[], 'text'::citus_copy_format) intermediate_result(f1 integer, key insert_select_repartition.composite_key_type)
138RESET client_min_messages;
139SELECT * FROM target_table ORDER BY key;
140 f1 | value |  key
141---------------------------------------------------------------------
142  0 |       | (0,a)
143  0 |       | (3,c)
144  0 |       | (8,g)
145  0 |       | (26,b)
146  0 |       | (27,d)
147  0 |       | (30,e)
148  0 |       | (31,f)
149(7 rows)
150
151-- ON CONFLICT
152SET client_min_messages TO DEBUG2;
153INSERT INTO target_table(key)
154SELECT mapped_key AS key_renamed FROM source_table
155WHERE (mapped_key).f1 % 2 = 1
156ON CONFLICT (key) DO UPDATE SET f1=1;
157DEBUG:  cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match
158DETAIL:  The target table's partition column should correspond to a partition column in the subquery.
159DEBUG:  Router planner cannot handle multi-shard select queries
160DEBUG:  performing repartitioned INSERT ... SELECT
161DEBUG:  partitioning SELECT query by column index 1 with name 'key'
162DEBUG:  distributed statement: INSERT INTO insert_select_repartition.target_table_4213591 AS citus_table_alias (f1, key) SELECT f1, key FROM read_intermediate_results('{repartitioned_results_xxxxx_from_4213589_to_0}'::text[], 'text'::citus_copy_format) intermediate_result(f1 integer, key insert_select_repartition.composite_key_type) ON CONFLICT(key) DO UPDATE SET f1 = 1
163DEBUG:  distributed statement: INSERT INTO insert_select_repartition.target_table_4213592 AS citus_table_alias (f1, key) SELECT f1, key FROM read_intermediate_results('{repartitioned_results_xxxxx_from_4213589_to_1,repartitioned_results_xxxxx_from_4213590_to_1}'::text[], 'text'::citus_copy_format) intermediate_result(f1 integer, key insert_select_repartition.composite_key_type) ON CONFLICT(key) DO UPDATE SET f1 = 1
164RESET client_min_messages;
165SELECT * FROM target_table ORDER BY key;
166 f1 | value |  key
167---------------------------------------------------------------------
168  0 |       | (0,a)
169  1 |       | (3,c)
170  0 |       | (8,g)
171  0 |       | (26,b)
172  1 |       | (27,d)
173  0 |       | (30,e)
174  1 |       | (31,f)
175(7 rows)
176
177-- missing value for distribution column
178INSERT INTO target_table(value) SELECT value FROM source_table;
179ERROR:  the partition column of table insert_select_repartition.target_table should have a value
180DROP TABLE source_table, target_table;
181-- different column types
182-- verifies that we add necessary casts, otherwise even shard routing won't
183-- work correctly and we will see 2 values for the same primary key.
184CREATE TABLE target_table(col_1 int primary key, col_2 int);
185SELECT create_distributed_table('target_table','col_1');
186 create_distributed_table
187---------------------------------------------------------------------
188
189(1 row)
190
191INSERT INTO target_table VALUES (1,2), (2,3), (3,4), (4,5), (5,6);
192CREATE TABLE source_table(col_1 numeric, col_2 numeric, col_3 numeric);
193SELECT create_distributed_table('source_table','col_1');
194 create_distributed_table
195---------------------------------------------------------------------
196
197(1 row)
198
199INSERT INTO source_table VALUES (1,1,1), (3,3,3), (5,5,5);
200SET client_min_messages TO DEBUG2;
201INSERT INTO target_table
202SELECT
203	col_1, col_2
204FROM
205	source_table
206ON CONFLICT(col_1) DO UPDATE SET col_2 = EXCLUDED.col_2;
207DEBUG:  cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match
208DETAIL:  The data type of the target table's partition column should exactly match the data type of the corresponding simple column reference in the subquery.
209DEBUG:  Router planner cannot handle multi-shard select queries
210DEBUG:  performing repartitioned INSERT ... SELECT
211DEBUG:  partitioning SELECT query by column index 0 with name 'col_1'
212DEBUG:  distributed statement: INSERT INTO insert_select_repartition.target_table_4213593 AS citus_table_alias (col_1, col_2) SELECT col_1, col_2 FROM read_intermediate_results('{repartitioned_results_xxxxx_from_4213597_to_0,repartitioned_results_xxxxx_from_4213600_to_0}'::text[], 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer) ON CONFLICT(col_1) DO UPDATE SET col_2 = excluded.col_2
213DEBUG:  distributed statement: INSERT INTO insert_select_repartition.target_table_4213594 AS citus_table_alias (col_1, col_2) SELECT col_1, col_2 FROM read_intermediate_results('{repartitioned_results_xxxxx_from_4213599_to_1}'::text[], 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer) ON CONFLICT(col_1) DO UPDATE SET col_2 = excluded.col_2
214RESET client_min_messages;
215SELECT * FROM target_table ORDER BY 1;
216 col_1 | col_2
217---------------------------------------------------------------------
218     1 |     1
219     2 |     3
220     3 |     3
221     4 |     5
222     5 |     5
223(5 rows)
224
225DROP TABLE source_table, target_table;
226--
227-- array coercion
228--
229SET citus.shard_count TO 3;
230CREATE TABLE source_table(a int, mapped_key int, c float[]);
231SELECT create_distributed_table('source_table', 'a');
232 create_distributed_table
233---------------------------------------------------------------------
234
235(1 row)
236
237INSERT INTO source_table VALUES (1, -1, ARRAY[1.1, 2.2, 3.3]), (2, -2, ARRAY[4.5, 5.8]),
238                                (3, -3, ARRAY[]::float[]), (4, -4, ARRAY[3.3]);
239SET citus.shard_count TO 2;
240CREATE TABLE target_table(a int, b int[]);
241SELECT create_distributed_table('target_table', 'a');
242 create_distributed_table
243---------------------------------------------------------------------
244
245(1 row)
246
247SET client_min_messages TO DEBUG1;
248INSERT INTO target_table SELECT mapped_key, c FROM source_table;
249DEBUG:  cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match
250DETAIL:  The target table's partition column should correspond to a partition column in the subquery.
251DEBUG:  performing repartitioned INSERT ... SELECT
252RESET client_min_messages;
253SELECT * FROM target_table ORDER BY a;
254 a  |    b
255---------------------------------------------------------------------
256 -4 | {3}
257 -3 | {}
258 -2 | {4,6}
259 -1 | {1,2,3}
260(4 rows)
261
262--
263-- worker queries can have more columns than necessary. ExpandWorkerTargetEntry()
264-- might add additional columns to the target list.
265--
266TRUNCATE target_table;
267\set VERBOSITY TERSE
268-- first verify that the SELECT query below fetches 3 projected columns from workers
269SET citus.log_remote_commands TO true; SET client_min_messages TO DEBUG;
270   CREATE TABLE results AS SELECT max(-a), array_agg(mapped_key) FROM source_table GROUP BY a;
271DEBUG:  Router planner cannot handle multi-shard select queries
272NOTICE:  issuing SELECT max((OPERATOR(pg_catalog.-) a)) AS max, array_agg(mapped_key) AS array_agg, a AS worker_column_3 FROM insert_select_repartition.source_table_4213601 source_table WHERE true GROUP BY a
273NOTICE:  issuing SELECT max((OPERATOR(pg_catalog.-) a)) AS max, array_agg(mapped_key) AS array_agg, a AS worker_column_3 FROM insert_select_repartition.source_table_4213602 source_table WHERE true GROUP BY a
274NOTICE:  issuing SELECT max((OPERATOR(pg_catalog.-) a)) AS max, array_agg(mapped_key) AS array_agg, a AS worker_column_3 FROM insert_select_repartition.source_table_4213603 source_table WHERE true GROUP BY a
275RESET citus.log_remote_commands; RESET client_min_messages;
276DROP TABLE results;
277-- now verify that we don't write the extra columns to the intermediate result files and
278-- insertion to the target works fine.
279SET client_min_messages TO DEBUG1;
280INSERT INTO target_table SELECT max(-a), array_agg(mapped_key) FROM source_table GROUP BY a;
281DEBUG:  cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match
282DEBUG:  performing repartitioned INSERT ... SELECT
283RESET client_min_messages;
284SELECT * FROM target_table ORDER BY a;
285 a  |  b
286---------------------------------------------------------------------
287 -4 | {-4}
288 -3 | {-3}
289 -2 | {-2}
290 -1 | {-1}
291(4 rows)
292
293--
294-- repartitioned INSERT/SELECT followed/preceded by other DML in same transaction
295--
296-- case 1. followed by DELETE
297TRUNCATE target_table;
298BEGIN;
299INSERT INTO target_table SELECT mapped_key, c FROM source_table;
300SELECT * FROM target_table ORDER BY a;
301 a  |    b
302---------------------------------------------------------------------
303 -4 | {3}
304 -3 | {}
305 -2 | {4,6}
306 -1 | {1,2,3}
307(4 rows)
308
309DELETE FROM target_table;
310END;
311SELECT * FROM target_table ORDER BY a;
312 a | b
313---------------------------------------------------------------------
314(0 rows)
315
316-- case 2. followed by UPDATE
317TRUNCATE target_table;
318BEGIN;
319INSERT INTO target_table SELECT mapped_key, c FROM source_table;
320SELECT * FROM target_table ORDER BY a;
321 a  |    b
322---------------------------------------------------------------------
323 -4 | {3}
324 -3 | {}
325 -2 | {4,6}
326 -1 | {1,2,3}
327(4 rows)
328
329UPDATE target_table SET b=array_append(b, a);
330END;
331SELECT * FROM target_table ORDER BY a;
332 a  |     b
333---------------------------------------------------------------------
334 -4 | {3,-4}
335 -3 | {-3}
336 -2 | {4,6,-2}
337 -1 | {1,2,3,-1}
338(4 rows)
339
340-- case 3. followed by multi-row INSERT
341TRUNCATE target_table;
342BEGIN;
343INSERT INTO target_table SELECT mapped_key, c FROM source_table;
344SELECT * FROM target_table ORDER BY a;
345 a  |    b
346---------------------------------------------------------------------
347 -4 | {3}
348 -3 | {}
349 -2 | {4,6}
350 -1 | {1,2,3}
351(4 rows)
352
353INSERT INTO target_table VALUES (-5, ARRAY[10,11]), (-6, ARRAY[11,12]), (-7, ARRAY[999]);
354END;
355SELECT * FROM target_table ORDER BY a;
356 a  |    b
357---------------------------------------------------------------------
358 -7 | {999}
359 -6 | {11,12}
360 -5 | {10,11}
361 -4 | {3}
362 -3 | {}
363 -2 | {4,6}
364 -1 | {1,2,3}
365(7 rows)
366
367-- case 4. followed by distributed INSERT/SELECT
368TRUNCATE target_table;
369BEGIN;
370INSERT INTO target_table SELECT mapped_key, c FROM source_table;
371SELECT * FROM target_table ORDER BY a;
372 a  |    b
373---------------------------------------------------------------------
374 -4 | {3}
375 -3 | {}
376 -2 | {4,6}
377 -1 | {1,2,3}
378(4 rows)
379
380INSERT INTO target_table SELECT * FROM target_table;
381END;
382SELECT * FROM target_table ORDER BY a;
383 a  |    b
384---------------------------------------------------------------------
385 -4 | {3}
386 -4 | {3}
387 -3 | {}
388 -3 | {}
389 -2 | {4,6}
390 -2 | {4,6}
391 -1 | {1,2,3}
392 -1 | {1,2,3}
393(8 rows)
394
395-- case 5. preceded by DELETE
396TRUNCATE target_table;
397BEGIN;
398DELETE FROM target_table;
399INSERT INTO target_table SELECT mapped_key, c FROM source_table;
400END;
401SELECT * FROM target_table ORDER BY a;
402 a  |    b
403---------------------------------------------------------------------
404 -4 | {3}
405 -3 | {}
406 -2 | {4,6}
407 -1 | {1,2,3}
408(4 rows)
409
410-- case 6. preceded by UPDATE
411TRUNCATE target_table;
412BEGIN;
413UPDATE target_table SET b=array_append(b, a);
414INSERT INTO target_table SELECT mapped_key, c FROM source_table;
415END;
416SELECT * FROM target_table ORDER BY a;
417 a  |    b
418---------------------------------------------------------------------
419 -4 | {3}
420 -3 | {}
421 -2 | {4,6}
422 -1 | {1,2,3}
423(4 rows)
424
425-- case 7. preceded by multi-row INSERT
426TRUNCATE target_table;
427BEGIN;
428INSERT INTO target_table VALUES (-5, ARRAY[10,11]), (-6, ARRAY[11,12]), (-7, ARRAY[999]);
429INSERT INTO target_table SELECT mapped_key, c FROM source_table;
430END;
431SELECT * FROM target_table ORDER BY a;
432 a  |    b
433---------------------------------------------------------------------
434 -7 | {999}
435 -6 | {11,12}
436 -5 | {10,11}
437 -4 | {3}
438 -3 | {}
439 -2 | {4,6}
440 -1 | {1,2,3}
441(7 rows)
442
443-- case 8. preceded by distributed INSERT/SELECT
444TRUNCATE target_table;
445INSERT INTO target_table SELECT mapped_key, c FROM source_table;
446BEGIN;
447INSERT INTO target_table SELECT * FROM target_table;
448INSERT INTO target_table SELECT mapped_key, c FROM source_table;
449END;
450SELECT * FROM target_table ORDER BY a;
451 a  |    b
452---------------------------------------------------------------------
453 -4 | {3}
454 -4 | {3}
455 -4 | {3}
456 -3 | {}
457 -3 | {}
458 -3 | {}
459 -2 | {4,6}
460 -2 | {4,6}
461 -2 | {4,6}
462 -1 | {1,2,3}
463 -1 | {1,2,3}
464 -1 | {1,2,3}
465(12 rows)
466
467--
468-- repartitioned INSERT/SELECT with RETURNING
469--
470TRUNCATE target_table;
471SET client_min_messages TO DEBUG1;
472WITH c AS (
473    INSERT INTO target_table
474    SELECT mapped_key, c FROM source_table
475    RETURNING *)
476SELECT * FROM c ORDER by a;
477DEBUG:  generating subplan XXX_1 for CTE c: INSERT INTO insert_select_repartition.target_table (a, b) SELECT mapped_key, c FROM insert_select_repartition.source_table RETURNING target_table.a, target_table.b
478DEBUG:  cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match
479DEBUG:  Plan XXX query after replacing subqueries and CTEs: SELECT a, b FROM (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer[])) c ORDER BY a
480DEBUG:  performing repartitioned INSERT ... SELECT
481 a  |    b
482---------------------------------------------------------------------
483 -4 | {3}
484 -3 | {}
485 -2 | {4,6}
486 -1 | {1,2,3}
487(4 rows)
488
489RESET client_min_messages;
490--
491-- in combination with CTEs
492--
493TRUNCATE target_table;
494SET client_min_messages TO DEBUG1;
495WITH t AS (
496    SELECT mapped_key, a, c FROM source_table
497    WHERE a > floor(random())
498)
499INSERT INTO target_table
500SELECT mapped_key, c FROM t NATURAL JOIN source_table;
501DEBUG:  volatile functions are not allowed in distributed INSERT ... SELECT queries
502DEBUG:  generating subplan XXX_1 for CTE t: SELECT mapped_key, a, c FROM insert_select_repartition.source_table WHERE ((a)::double precision OPERATOR(pg_catalog.>) floor(random()))
503DEBUG:  Plan XXX query after replacing subqueries and CTEs: SELECT mapped_key AS a, (c)::integer[] AS b FROM (SELECT t.mapped_key, t.c FROM ((SELECT intermediate_result.mapped_key, intermediate_result.a, intermediate_result.c FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(mapped_key integer, a integer, c double precision[])) t JOIN insert_select_repartition.source_table USING (mapped_key, a, c))) citus_insert_select_subquery
504DEBUG:  performing repartitioned INSERT ... SELECT
505RESET client_min_messages;
506SELECT * FROM target_table ORDER BY a;
507 a  |    b
508---------------------------------------------------------------------
509 -4 | {3}
510 -3 | {}
511 -2 | {4,6}
512 -1 | {1,2,3}
513(4 rows)
514
515DROP TABLE source_table, target_table;
516--
517-- The case where select query has a GROUP BY ...
518--
519SET citus.shard_count TO 4;
520CREATE TABLE source_table(a int, b int);
521SELECT create_distributed_table('source_table', 'a');
522 create_distributed_table
523---------------------------------------------------------------------
524
525(1 row)
526
527SET citus.shard_count TO 3;
528CREATE TABLE target_table(a int, b int);
529SELECT create_distributed_table('target_table', 'a');
530 create_distributed_table
531---------------------------------------------------------------------
532
533(1 row)
534
535INSERT INTO source_table SELECT floor(i/4), i*i FROM generate_series(1, 20) i;
536SET client_min_messages TO DEBUG1;
537INSERT INTO target_table SELECT a, max(b) FROM source_table GROUP BY a;
538DEBUG:  INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT
539DEBUG:  performing repartitioned INSERT ... SELECT
540RESET client_min_messages;
541SELECT * FROM target_table ORDER BY a;
542 a |  b
543---------------------------------------------------------------------
544 0 |   9
545 1 |  49
546 2 | 121
547 3 | 225
548 4 | 361
549 5 | 400
550(6 rows)
551
552--
553-- EXPLAIN output should specify repartitioned INSERT/SELECT
554--
555EXPLAIN INSERT INTO target_table SELECT a, max(b) FROM source_table GROUP BY a;
556                                                 QUERY PLAN
557---------------------------------------------------------------------
558 Custom Scan (Citus INSERT ... SELECT)  (cost=0.00..0.00 rows=0 width=0)
559   INSERT/SELECT method: repartition
560   ->  Custom Scan (Citus Adaptive)  (cost=0.00..0.00 rows=100000 width=8)
561         Task Count: 4
562         Tasks Shown: One of 4
563         ->  Task
564               Node: host=localhost port=xxxxx dbname=regression
565               ->  HashAggregate  (cost=43.90..45.90 rows=200 width=8)
566                     Group Key: a
567                     ->  Seq Scan on source_table_4213606 source_table  (cost=0.00..32.60 rows=2260 width=8)
568(10 rows)
569
570--
571-- EXPLAIN ANALYZE is currently not supported
572--
573EXPLAIN ANALYZE INSERT INTO target_table SELECT a, max(b) FROM source_table GROUP BY a;
574ERROR:  EXPLAIN ANALYZE is currently not supported for INSERT ... SELECT commands with repartitioning
575--
576-- Duplicate names in target list
577--
578TRUNCATE target_table;
579SET client_min_messages TO DEBUG2;
580INSERT INTO target_table
581 SELECT max(b), max(b) FROM source_table GROUP BY a;
582DEBUG:  cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match
583DEBUG:  Router planner cannot handle multi-shard select queries
584DEBUG:  performing repartitioned INSERT ... SELECT
585DEBUG:  partitioning SELECT query by column index 0 with name 'a'
586DEBUG:  distributed statement: INSERT INTO insert_select_repartition.target_table_4213610 AS citus_table_alias (a, b) SELECT a, b FROM read_intermediate_results('{repartitioned_results_xxxxx_from_4213606_to_0,repartitioned_results_xxxxx_from_4213607_to_0}'::text[], 'binary'::citus_copy_format) intermediate_result(a integer, b integer)
587DEBUG:  distributed statement: INSERT INTO insert_select_repartition.target_table_4213611 AS citus_table_alias (a, b) SELECT a, b FROM read_intermediate_results('{repartitioned_results_xxxxx_from_4213607_to_1,repartitioned_results_xxxxx_from_4213609_to_1}'::text[], 'binary'::citus_copy_format) intermediate_result(a integer, b integer)
588DEBUG:  distributed statement: INSERT INTO insert_select_repartition.target_table_4213612 AS citus_table_alias (a, b) SELECT a, b FROM read_intermediate_results('{repartitioned_results_xxxxx_from_4213606_to_2,repartitioned_results_xxxxx_from_4213607_to_2}'::text[], 'binary'::citus_copy_format) intermediate_result(a integer, b integer)
589RESET client_min_messages;
590SELECT * FROM target_table ORDER BY a;
591  a  |  b
592---------------------------------------------------------------------
593   9 |   9
594  49 |  49
595 121 | 121
596 225 | 225
597 361 | 361
598 400 | 400
599(6 rows)
600
601--
602-- Prepared INSERT/SELECT
603--
604TRUNCATE target_table;
605PREPARE insert_plan(int, int) AS
606INSERT INTO target_table
607  SELECT a, max(b) FROM source_table
608  WHERE a BETWEEN $1 AND $2 GROUP BY a;
609SET client_min_messages TO DEBUG1;
610EXECUTE insert_plan(0, 2);
611DEBUG:  INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT
612DEBUG:  performing repartitioned INSERT ... SELECT
613EXECUTE insert_plan(0, 2);
614DEBUG:  INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT
615DEBUG:  performing repartitioned INSERT ... SELECT
616EXECUTE insert_plan(0, 2);
617DEBUG:  INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT
618DEBUG:  performing repartitioned INSERT ... SELECT
619EXECUTE insert_plan(0, 2);
620DEBUG:  INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT
621DEBUG:  performing repartitioned INSERT ... SELECT
622EXECUTE insert_plan(0, 2);
623DEBUG:  INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT
624DEBUG:  performing repartitioned INSERT ... SELECT
625EXECUTE insert_plan(0, 2);
626DEBUG:  INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT
627DEBUG:  performing repartitioned INSERT ... SELECT
628EXECUTE insert_plan(2, 4);
629DEBUG:  INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT
630DEBUG:  performing repartitioned INSERT ... SELECT
631EXECUTE insert_plan(2, 4);
632DEBUG:  INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT
633DEBUG:  performing repartitioned INSERT ... SELECT
634EXECUTE insert_plan(2, 4);
635DEBUG:  INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT
636DEBUG:  performing repartitioned INSERT ... SELECT
637EXECUTE insert_plan(2, 4);
638DEBUG:  INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT
639DEBUG:  performing repartitioned INSERT ... SELECT
640EXECUTE insert_plan(2, 4);
641DEBUG:  INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT
642DEBUG:  performing repartitioned INSERT ... SELECT
643EXECUTE insert_plan(2, 4);
644DEBUG:  INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT
645DEBUG:  performing repartitioned INSERT ... SELECT
646RESET client_min_messages;
647SELECT a, count(*), count(distinct b) distinct_values FROM target_table GROUP BY a ORDER BY a;
648 a | count | distinct_values
649---------------------------------------------------------------------
650 0 |     6 |               1
651 1 |     6 |               1
652 2 |    12 |               1
653 3 |     6 |               1
654 4 |     6 |               1
655(5 rows)
656
657DEALLOCATE insert_plan;
658--
659-- Prepared router INSERT/SELECT. We currently use pull to coordinator when the
660-- distributed query has a single task.
661--
662TRUNCATE target_table;
663PREPARE insert_plan(int) AS
664INSERT INTO target_table
665  SELECT a, max(b) FROM source_table
666  WHERE a=$1 GROUP BY a;
667SET client_min_messages TO DEBUG1;
668EXECUTE insert_plan(0);
669DEBUG:  INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT
670DEBUG:  Collecting INSERT ... SELECT results on coordinator
671EXECUTE insert_plan(0);
672DEBUG:  INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT
673DEBUG:  Collecting INSERT ... SELECT results on coordinator
674EXECUTE insert_plan(0);
675DEBUG:  INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT
676DEBUG:  Collecting INSERT ... SELECT results on coordinator
677EXECUTE insert_plan(0);
678DEBUG:  INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT
679DEBUG:  Collecting INSERT ... SELECT results on coordinator
680EXECUTE insert_plan(0);
681DEBUG:  INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT
682DEBUG:  Collecting INSERT ... SELECT results on coordinator
683EXECUTE insert_plan(0);
684DEBUG:  INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT
685DEBUG:  Collecting INSERT ... SELECT results on coordinator
686EXECUTE insert_plan(0);
687DEBUG:  INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT
688DEBUG:  Collecting INSERT ... SELECT results on coordinator
689RESET client_min_messages;
690SELECT a, count(*), count(distinct b) distinct_values FROM target_table GROUP BY a ORDER BY a;
691 a | count | distinct_values
692---------------------------------------------------------------------
693 0 |     7 |               1
694(1 row)
695
696DEALLOCATE insert_plan;
697--
698-- Prepared INSERT/SELECT with no parameters.
699--
700TRUNCATE target_table;
701PREPARE insert_plan AS
702INSERT INTO target_table
703  SELECT a, max(b) FROM source_table
704  WHERE a BETWEEN 1 AND 2 GROUP BY a;
705EXPLAIN EXECUTE insert_plan;
706                                                   QUERY PLAN
707---------------------------------------------------------------------
708 Custom Scan (Citus INSERT ... SELECT)  (cost=0.00..0.00 rows=0 width=0)
709   INSERT/SELECT method: repartition
710   ->  Custom Scan (Citus Adaptive)  (cost=0.00..0.00 rows=100000 width=8)
711         Task Count: 4
712         Tasks Shown: One of 4
713         ->  Task
714               Node: host=localhost port=xxxxx dbname=regression
715               ->  GroupAggregate  (cost=44.09..44.28 rows=11 width=8)
716                     Group Key: a
717                     ->  Sort  (cost=44.09..44.12 rows=11 width=8)
718                           Sort Key: a
719                           ->  Seq Scan on source_table_4213606 source_table  (cost=0.00..43.90 rows=11 width=8)
720                                 Filter: ((a >= 1) AND (a <= 2))
721(13 rows)
722
723SET client_min_messages TO DEBUG1;
724EXECUTE insert_plan;
725DEBUG:  performing repartitioned INSERT ... SELECT
726EXECUTE insert_plan;
727DEBUG:  performing repartitioned INSERT ... SELECT
728EXECUTE insert_plan;
729DEBUG:  performing repartitioned INSERT ... SELECT
730EXECUTE insert_plan;
731DEBUG:  performing repartitioned INSERT ... SELECT
732EXECUTE insert_plan;
733DEBUG:  performing repartitioned INSERT ... SELECT
734EXECUTE insert_plan;
735DEBUG:  performing repartitioned INSERT ... SELECT
736EXECUTE insert_plan;
737DEBUG:  performing repartitioned INSERT ... SELECT
738RESET client_min_messages;
739SELECT a, count(*), count(distinct b) distinct_values FROM target_table GROUP BY a ORDER BY a;
740 a | count | distinct_values
741---------------------------------------------------------------------
742 1 |     7 |               1
743 2 |     7 |               1
744(2 rows)
745
746DEALLOCATE insert_plan;
747--
748-- INSERT/SELECT in CTE
749--
750TRUNCATE target_table;
751SET client_min_messages TO DEBUG2;
752WITH r AS (
753  INSERT INTO target_table SELECT * FROM source_table RETURNING *
754)
755INSERT INTO target_table SELECT source_table.a, max(source_table.b) FROM source_table NATURAL JOIN r GROUP BY source_table.a;
756DEBUG:  INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT
757DEBUG:  only SELECT, UPDATE, or DELETE common table expressions may be router planned
758DEBUG:  generating subplan XXX_1 for CTE r: INSERT INTO insert_select_repartition.target_table (a, b) SELECT a, b FROM insert_select_repartition.source_table RETURNING target_table.a, target_table.b
759DEBUG:  INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT
760DEBUG:  Router planner cannot handle multi-shard select queries
761DEBUG:  Plan XXX query after replacing subqueries and CTEs: SELECT a, max AS b FROM (SELECT source_table.a, max(source_table.b) AS max FROM (insert_select_repartition.source_table JOIN (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) r USING (a, b)) GROUP BY source_table.a) citus_insert_select_subquery
762DEBUG:  Router planner cannot handle multi-shard select queries
763DEBUG:  performing repartitioned INSERT ... SELECT
764DEBUG:  performing repartitioned INSERT ... SELECT
765DEBUG:  partitioning SELECT query by column index 0 with name 'a'
766DEBUG:  distributed statement: INSERT INTO insert_select_repartition.target_table_4213610 AS citus_table_alias (a, b) SELECT a, b FROM read_intermediate_results('{repartitioned_results_xxxxx_from_4213606_to_0,repartitioned_results_xxxxx_from_4213607_to_0}'::text[], 'binary'::citus_copy_format) intermediate_result(a integer, b integer) RETURNING citus_table_alias.a, citus_table_alias.b
767DEBUG:  distributed statement: INSERT INTO insert_select_repartition.target_table_4213611 AS citus_table_alias (a, b) SELECT a, b FROM read_intermediate_results('{repartitioned_results_xxxxx_from_4213607_to_1}'::text[], 'binary'::citus_copy_format) intermediate_result(a integer, b integer) RETURNING citus_table_alias.a, citus_table_alias.b
768DEBUG:  distributed statement: INSERT INTO insert_select_repartition.target_table_4213612 AS citus_table_alias (a, b) SELECT a, b FROM read_intermediate_results('{repartitioned_results_xxxxx_from_4213609_to_2}'::text[], 'binary'::citus_copy_format) intermediate_result(a integer, b integer) RETURNING citus_table_alias.a, citus_table_alias.b
769DEBUG:  partitioning SELECT query by column index 0 with name 'a'
770DEBUG:  distributed statement: INSERT INTO insert_select_repartition.target_table_4213610 AS citus_table_alias (a, b) SELECT a, b FROM read_intermediate_results('{repartitioned_results_xxxxx_from_4213606_to_0,repartitioned_results_xxxxx_from_4213607_to_0}'::text[], 'binary'::citus_copy_format) intermediate_result(a integer, b integer)
771DEBUG:  distributed statement: INSERT INTO insert_select_repartition.target_table_4213611 AS citus_table_alias (a, b) SELECT a, b FROM read_intermediate_results('{repartitioned_results_xxxxx_from_4213607_to_1}'::text[], 'binary'::citus_copy_format) intermediate_result(a integer, b integer)
772DEBUG:  distributed statement: INSERT INTO insert_select_repartition.target_table_4213612 AS citus_table_alias (a, b) SELECT a, b FROM read_intermediate_results('{repartitioned_results_xxxxx_from_4213609_to_2}'::text[], 'binary'::citus_copy_format) intermediate_result(a integer, b integer)
773RESET client_min_messages;
774SELECT * FROM target_table ORDER BY a, b;
775 a |  b
776---------------------------------------------------------------------
777 0 |   1
778 0 |   4
779 0 |   9
780 0 |   9
781 1 |  16
782 1 |  25
783 1 |  36
784 1 |  49
785 1 |  49
786 2 |  64
787 2 |  81
788 2 | 100
789 2 | 121
790 2 | 121
791 3 | 144
792 3 | 169
793 3 | 196
794 3 | 225
795 3 | 225
796 4 | 256
797 4 | 289
798 4 | 324
799 4 | 361
800 4 | 361
801 5 | 400
802 5 | 400
803(26 rows)
804
805DROP TABLE source_table, target_table;
806--
807-- Constraint failure and rollback
808--
809SET citus.shard_count TO 4;
810CREATE TABLE source_table(a int, b int);
811SELECT create_distributed_table('source_table', 'a');
812 create_distributed_table
813---------------------------------------------------------------------
814
815(1 row)
816
817INSERT INTO source_table SELECT i, i * i FROM generate_series(1, 10) i;
818UPDATE source_table SET b = NULL where b IN (9, 4);
819SET citus.shard_replication_factor TO 2;
820CREATE TABLE target_table(a int, b int not null);
821SELECT create_distributed_table('target_table', 'a', 'range');
822 create_distributed_table
823---------------------------------------------------------------------
824
825(1 row)
826
827CALL public.create_range_partitioned_shards('target_table', '{0,3,6,9}','{2,5,8,50}');
828INSERT INTO target_table VALUES (11,9), (22,4);
829EXPLAIN (costs off) INSERT INTO target_table SELECT * FROM source_table;
830                           QUERY PLAN
831---------------------------------------------------------------------
832 Custom Scan (Citus INSERT ... SELECT)
833   INSERT/SELECT method: repartition
834   ->  Custom Scan (Citus Adaptive)
835         Task Count: 4
836         Tasks Shown: One of 4
837         ->  Task
838               Node: host=localhost port=xxxxx dbname=regression
839               ->  Seq Scan on source_table_4213613 source_table
840(8 rows)
841
842EXPLAIN (costs off) INSERT INTO target_table SELECT * FROM source_table WHERE b IS NOT NULL;
843                           QUERY PLAN
844---------------------------------------------------------------------
845 Custom Scan (Citus INSERT ... SELECT)
846   INSERT/SELECT method: repartition
847   ->  Custom Scan (Citus Adaptive)
848         Task Count: 4
849         Tasks Shown: One of 4
850         ->  Task
851               Node: host=localhost port=xxxxx dbname=regression
852               ->  Seq Scan on source_table_4213613 source_table
853                     Filter: (b IS NOT NULL)
854(9 rows)
855
856BEGIN;
857SAVEPOINT s1;
858INSERT INTO target_table SELECT * FROM source_table;
859ERROR:  null value in column "b" violates not-null constraint
860ROLLBACK TO SAVEPOINT s1;
861INSERT INTO target_table SELECT * FROM source_table WHERE b IS NOT NULL;
862END;
863SELECT * FROM target_table ORDER BY b;
864 a  |  b
865---------------------------------------------------------------------
866  1 |   1
867 22 |   4
868 11 |   9
869  4 |  16
870  5 |  25
871  6 |  36
872  7 |  49
873  8 |  64
874  9 |  81
875 10 | 100
876(10 rows)
877
878-- verify that values have been replicated to both replicas
879SELECT * FROM run_command_on_placements('target_table', 'select count(*) from %s') ORDER BY shardid, nodeport;
880 nodename  | nodeport | shardid | success | result
881---------------------------------------------------------------------
882 localhost |    57637 | 4213617 | t       | 1
883 localhost |    57638 | 4213617 | t       | 1
884 localhost |    57637 | 4213618 | t       | 2
885 localhost |    57638 | 4213618 | t       | 2
886 localhost |    57637 | 4213619 | t       | 3
887 localhost |    57638 | 4213619 | t       | 3
888 localhost |    57637 | 4213620 | t       | 4
889 localhost |    57638 | 4213620 | t       | 4
890(8 rows)
891
892--
893-- Multiple casts in the SELECT query
894--
895TRUNCATE target_table;
896SET client_min_messages TO DEBUG2;
897INSERT INTO target_table SELECT 1.12, b::bigint FROM source_table WHERE b IS NOT NULL;
898DEBUG:  cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match
899DEBUG:  Router planner cannot handle multi-shard select queries
900DEBUG:  performing repartitioned INSERT ... SELECT
901DEBUG:  partitioning SELECT query by column index 0 with name 'a'
902DEBUG:  distributed statement: INSERT INTO insert_select_repartition.target_table_4213617 AS citus_table_alias (a, b) SELECT a, b FROM read_intermediate_results('{repartitioned_results_xxxxx_from_4213613_to_0,repartitioned_results_xxxxx_from_4213614_to_0,repartitioned_results_xxxxx_from_4213615_to_0,repartitioned_results_xxxxx_from_4213616_to_0}'::text[], 'binary'::citus_copy_format) intermediate_result(a integer, b integer)
903RESET client_min_messages;
904SELECT * FROM target_table ORDER BY a, b;
905 a |  b
906---------------------------------------------------------------------
907 1 |   1
908 1 |  16
909 1 |  25
910 1 |  36
911 1 |  49
912 1 |  64
913 1 |  81
914 1 | 100
915(8 rows)
916
917--
918-- ROLLBACK after out of range error
919--
920TRUNCATE target_table;
921BEGIN;
922INSERT INTO target_table SELECT a * 10, b FROM source_table WHERE b IS NOT NULL;
923ERROR:  could not find shard for partition column value
924END;
925SELECT max(result) FROM run_command_on_placements('target_table', 'select count(*) from %s');
926 max
927---------------------------------------------------------------------
928 0
929(1 row)
930
931DROP TABLE source_table, target_table;
932--
933-- Range partitioned target's ranges doesn't cover the whole range
934--
935SET citus.shard_replication_factor TO 2;
936SET citus.shard_count TO 4;
937CREATE TABLE source_table(a int, b int);
938SELECT create_distributed_table('source_table', 'a');
939 create_distributed_table
940---------------------------------------------------------------------
941
942(1 row)
943
944INSERT INTO source_table SELECT i, i * i FROM generate_series(1, 10) i;
945SET citus.shard_replication_factor TO 2;
946CREATE TABLE target_table(b int not null, a float);
947SELECT create_distributed_table('target_table', 'a', 'range');
948 create_distributed_table
949---------------------------------------------------------------------
950
951(1 row)
952
953CALL public.create_range_partitioned_shards('target_table', '{0.0,3.5,6.5,9.5}','{2.9,5.9,8.9,50.0}');
954INSERT INTO target_table SELECT b, a+0.6 FROM source_table;
955SELECT * FROM target_table ORDER BY a;
956  b  |  a
957---------------------------------------------------------------------
958   1 |  1.6
959   4 |  2.6
960   9 |  3.6
961  16 |  4.6
962  25 |  5.6
963  36 |  6.6
964  49 |  7.6
965  64 |  8.6
966  81 |  9.6
967 100 | 10.6
968(10 rows)
969
970-- verify that values have been replicated to both replicas, and that each
971-- replica has received correct number of rows
972SELECT * FROM run_command_on_placements('target_table', 'select count(*) from %s') ORDER BY shardid, nodeport;
973 nodename  | nodeport | shardid | success | result
974---------------------------------------------------------------------
975 localhost |    57637 | 4213625 | t       | 2
976 localhost |    57638 | 4213625 | t       | 2
977 localhost |    57637 | 4213626 | t       | 3
978 localhost |    57638 | 4213626 | t       | 3
979 localhost |    57637 | 4213627 | t       | 3
980 localhost |    57638 | 4213627 | t       | 3
981 localhost |    57637 | 4213628 | t       | 2
982 localhost |    57638 | 4213628 | t       | 2
983(8 rows)
984
985DROP TABLE source_table, target_table;
986--
987-- Select column names should be unique
988--
989SET citus.shard_replication_factor TO 1;
990SET citus.shard_count TO 4;
991CREATE TABLE source_table(a int, b int);
992SELECT create_distributed_table('source_table', 'a');
993 create_distributed_table
994---------------------------------------------------------------------
995
996(1 row)
997
998SET citus.shard_count TO 3;
999CREATE TABLE target_table(a int, b int, c int, d int, e int, f int);
1000SELECT create_distributed_table('target_table', 'a');
1001 create_distributed_table
1002---------------------------------------------------------------------
1003
1004(1 row)
1005
1006INSERT INTO source_table SELECT i, i * i FROM generate_series(1, 10) i;
1007SET client_min_messages TO DEBUG2;
1008INSERT INTO target_table SELECT a AS aa, b AS aa, 1 AS aa, 2 AS aa FROM source_table;
1009DEBUG:  INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT
1010DEBUG:  Router planner cannot handle multi-shard select queries
1011DEBUG:  performing repartitioned INSERT ... SELECT
1012DEBUG:  partitioning SELECT query by column index 0 with name 'a'
1013DEBUG:  distributed statement: INSERT INTO insert_select_repartition.target_table_4213633 AS citus_table_alias (a, b, c, d) SELECT a, b, c, d FROM read_intermediate_results('{repartitioned_results_xxxxx_from_4213629_to_0,repartitioned_results_xxxxx_from_4213630_to_0}'::text[], 'binary'::citus_copy_format) intermediate_result(a integer, b integer, c integer, d integer)
1014DEBUG:  distributed statement: INSERT INTO insert_select_repartition.target_table_4213634 AS citus_table_alias (a, b, c, d) SELECT a, b, c, d FROM read_intermediate_results('{repartitioned_results_xxxxx_from_4213630_to_1,repartitioned_results_xxxxx_from_4213631_to_1}'::text[], 'binary'::citus_copy_format) intermediate_result(a integer, b integer, c integer, d integer)
1015DEBUG:  distributed statement: INSERT INTO insert_select_repartition.target_table_4213635 AS citus_table_alias (a, b, c, d) SELECT a, b, c, d FROM read_intermediate_results('{repartitioned_results_xxxxx_from_4213632_to_2}'::text[], 'binary'::citus_copy_format) intermediate_result(a integer, b integer, c integer, d integer)
1016RESET client_min_messages;
1017SELECT count(*) FROM target_table;
1018 count
1019---------------------------------------------------------------------
1020    10
1021(1 row)
1022
1023--
1024-- Disable repartitioned insert/select
1025--
1026TRUNCATE target_table;
1027SET citus.enable_repartitioned_insert_select TO OFF;
1028EXPLAIN (costs off) INSERT INTO target_table SELECT a AS aa, b AS aa, 1 AS aa, 2 AS aa FROM source_table;
1029                           QUERY PLAN
1030---------------------------------------------------------------------
1031 Custom Scan (Citus INSERT ... SELECT)
1032   INSERT/SELECT method: pull to coordinator
1033   ->  Custom Scan (Citus Adaptive)
1034         Task Count: 4
1035         Tasks Shown: One of 4
1036         ->  Task
1037               Node: host=localhost port=xxxxx dbname=regression
1038               ->  Seq Scan on source_table_4213629 source_table
1039(8 rows)
1040
1041SET client_min_messages TO DEBUG2;
1042INSERT INTO target_table SELECT a AS aa, b AS aa, 1 AS aa, 2 AS aa FROM source_table;
1043DEBUG:  INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT
1044DEBUG:  Router planner cannot handle multi-shard select queries
1045DEBUG:  Collecting INSERT ... SELECT results on coordinator
1046RESET client_min_messages;
1047SELECT count(*) FROM target_table;
1048 count
1049---------------------------------------------------------------------
1050    10
1051(1 row)
1052
1053SET citus.enable_repartitioned_insert_select TO ON;
1054EXPLAIN (costs off) INSERT INTO target_table SELECT a AS aa, b AS aa, 1 AS aa, 2 AS aa FROM source_table;
1055                           QUERY PLAN
1056---------------------------------------------------------------------
1057 Custom Scan (Citus INSERT ... SELECT)
1058   INSERT/SELECT method: repartition
1059   ->  Custom Scan (Citus Adaptive)
1060         Task Count: 4
1061         Tasks Shown: One of 4
1062         ->  Task
1063               Node: host=localhost port=xxxxx dbname=regression
1064               ->  Seq Scan on source_table_4213629 source_table
1065(8 rows)
1066
1067DROP TABLE source_table, target_table;
1068--
1069-- Don't use INSERT/SELECT repartition with repartition joins
1070--
1071create table test(x int, y int);
1072select create_distributed_table('test', 'x');
1073 create_distributed_table
1074---------------------------------------------------------------------
1075
1076(1 row)
1077
1078set citus.enable_repartition_joins to true;
1079INSERT INTO test SELECT i, i FROM generate_series(1, 10) i;
1080EXPLAIN (costs off) INSERT INTO test(y, x) SELECT a.x, b.y FROM test a JOIN test b USING (y);
1081                            QUERY PLAN
1082---------------------------------------------------------------------
1083 Custom Scan (Citus INSERT ... SELECT)
1084   INSERT/SELECT method: pull to coordinator
1085   ->  Custom Scan (Citus Adaptive)
1086         Task Count: 4
1087         Tasks Shown: None, not supported for re-partition queries
1088         ->  MapMergeJob
1089               Map Task Count: 3
1090               Merge Task Count: 4
1091         ->  MapMergeJob
1092               Map Task Count: 3
1093               Merge Task Count: 4
1094(11 rows)
1095
1096SET client_min_messages TO DEBUG1;
1097INSERT INTO test(y, x) SELECT a.x, b.y FROM test a JOIN test b USING (y);
1098DEBUG:  cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match
1099DEBUG:  Collecting INSERT ... SELECT results on coordinator
1100RESET client_min_messages;
1101SELECT count(*) FROM test;
1102 count
1103---------------------------------------------------------------------
1104    20
1105(1 row)
1106
1107TRUNCATE test;
1108INSERT INTO test SELECT i, i FROM generate_series(1, 10) i;
1109EXPLAIN (costs off) INSERT INTO test SELECT a.* FROM test a JOIN test b USING (y);
1110                            QUERY PLAN
1111---------------------------------------------------------------------
1112 Custom Scan (Citus INSERT ... SELECT)
1113   INSERT/SELECT method: pull to coordinator
1114   ->  Custom Scan (Citus Adaptive)
1115         Task Count: 4
1116         Tasks Shown: None, not supported for re-partition queries
1117         ->  MapMergeJob
1118               Map Task Count: 3
1119               Merge Task Count: 4
1120         ->  MapMergeJob
1121               Map Task Count: 3
1122               Merge Task Count: 4
1123(11 rows)
1124
1125SET client_min_messages TO DEBUG1;
1126INSERT INTO test SELECT a.* FROM test a JOIN test b USING (y);
1127DEBUG:  Router planner cannot handle multi-shard select queries
1128DEBUG:  Collecting INSERT ... SELECT results on coordinator
1129RESET client_min_messages;
1130SELECT count(*) FROM test;
1131 count
1132---------------------------------------------------------------------
1133    20
1134(1 row)
1135
1136--
1137-- In the following case we coerce some columns and move uncoerced versions to the
1138-- end of SELECT list. The following case verifies that we rename those columns so
1139-- we don't get "column reference is ambiguous" errors.
1140--
1141CREATE TABLE target_table(
1142    c1 int,
1143    c2 int,
1144    c3 timestamp,
1145    a int,
1146    b int,
1147    c int,
1148    c4 int,
1149    c5 int,
1150    c6 int[],
1151    cardinality int,
1152    sum int,
1153    PRIMARY KEY (c1, c2, c3, c4, c5, c6)
1154);
1155SET citus.shard_count TO 5;
1156SELECT create_distributed_table('target_table', 'c1');
1157 create_distributed_table
1158---------------------------------------------------------------------
1159
1160(1 row)
1161
1162CREATE TABLE source_table(
1163    c1 int,
1164    c2 int,
1165    c3 date,
1166    c4 int,
1167    cardinality int,
1168    sum int
1169);
1170SET citus.shard_count TO 4;
1171SELECT create_distributed_table('source_table', 'c1');
1172 create_distributed_table
1173---------------------------------------------------------------------
1174
1175(1 row)
1176
1177CREATE OR REPLACE FUNCTION dist_func(a int, b int) RETURNS int[]
1178AS $$
1179BEGIN
1180 RETURN array_fill(a, ARRAY[b]);
1181END;
1182$$
1183LANGUAGE plpgsql STABLE;
1184SELECT create_distributed_function('dist_func(int, int)');
1185 create_distributed_function
1186---------------------------------------------------------------------
1187
1188(1 row)
1189
1190SET client_min_messages TO DEBUG;
1191SET citus.enable_unique_job_ids TO off;
1192INSERT INTO source_table VALUES (1,2, '2020-02-02', 3, 4, 5);
1193DEBUG:  Creating router plan
1194DEBUG:  query has a single distribution column value: 1
1195INSERT INTO source_table VALUES (1,2, '2020-02-02', 3, 4, 5);
1196DEBUG:  Creating router plan
1197DEBUG:  query has a single distribution column value: 1
1198INSERT INTO source_table VALUES (3,4, '2020-02-02', 3, 4, 5);
1199DEBUG:  Creating router plan
1200DEBUG:  query has a single distribution column value: 3
1201INSERT INTO target_table AS enriched(c1, c2, c3, c4, c5, c6, cardinality, sum)
1202SELECT c1, c2, c3, c4, -1::float AS c5,
1203       dist_func(c1, 4) c6,
1204       sum(cardinality),
1205       sum(sum)
1206FROM source_table
1207GROUP BY c1, c2, c3, c4, c5, c6
1208ON CONFLICT(c1, c2, c3, c4, c5, c6)
1209DO UPDATE SET
1210 cardinality = enriched.cardinality + excluded.cardinality,
1211 sum = enriched.sum + excluded.sum;
1212DEBUG:  INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT
1213DEBUG:  Router planner cannot handle multi-shard select queries
1214DEBUG:  performing repartitioned INSERT ... SELECT
1215DEBUG:  partitioning SELECT query by column index 0 with name 'c1'
1216DEBUG:  distributed statement: INSERT INTO insert_select_repartition.target_table_4213639 AS enriched (c1, c2, c3, c4, c5, c6, cardinality, sum) SELECT c1, c2, c3, c4, c5, c6, cardinality, sum FROM read_intermediate_results('{repartitioned_results_xxxxx_from_4213644_to_0}'::text[], 'binary'::citus_copy_format) intermediate_result(c1 integer, c2 integer, c3 timestamp without time zone, c4 integer, c5 integer, c6 integer[], cardinality integer, sum integer) ON CONFLICT(c1, c2, c3, c4, c5, c6) DO UPDATE SET cardinality = (enriched.cardinality OPERATOR(pg_catalog.+) excluded.cardinality), sum = (enriched.sum OPERATOR(pg_catalog.+) excluded.sum)
1217DEBUG:  distributed statement: INSERT INTO insert_select_repartition.target_table_4213641 AS enriched (c1, c2, c3, c4, c5, c6, cardinality, sum) SELECT c1, c2, c3, c4, c5, c6, cardinality, sum FROM read_intermediate_results('{repartitioned_results_xxxxx_from_4213645_to_2}'::text[], 'binary'::citus_copy_format) intermediate_result(c1 integer, c2 integer, c3 timestamp without time zone, c4 integer, c5 integer, c6 integer[], cardinality integer, sum integer) ON CONFLICT(c1, c2, c3, c4, c5, c6) DO UPDATE SET cardinality = (enriched.cardinality OPERATOR(pg_catalog.+) excluded.cardinality), sum = (enriched.sum OPERATOR(pg_catalog.+) excluded.sum)
1218RESET client_min_messages;
1219EXPLAIN (COSTS OFF) INSERT INTO target_table AS enriched(c1, c2, c3, c4, c5, c6, cardinality, sum)
1220SELECT c1, c2, c3, c4, -1::float AS c5,
1221       dist_func(c1, 4) c6,
1222       sum(cardinality),
1223       sum(sum)
1224FROM source_table
1225GROUP BY c1, c2, c3, c4, c5, c6
1226ON CONFLICT(c1, c2, c3, c4, c5, c6)
1227DO UPDATE SET
1228 cardinality = enriched.cardinality + excluded.cardinality,
1229 sum = enriched.sum + excluded.sum;
1230                                                    QUERY PLAN
1231---------------------------------------------------------------------
1232 Custom Scan (Citus INSERT ... SELECT)
1233   INSERT/SELECT method: repartition
1234   ->  Custom Scan (Citus Adaptive)
1235         Task Count: 4
1236         Tasks Shown: One of 4
1237         ->  Task
1238               Node: host=localhost port=xxxxx dbname=regression
1239               ->  HashAggregate
1240                     Group Key: c1, c2, c3, c4, '-1'::double precision, insert_select_repartition.dist_func(c1, 4)
1241                     ->  Seq Scan on source_table_4213644 source_table
1242(10 rows)
1243
1244-- verify that we don't report repartitioned insert/select for tables
1245-- with sequences. See https://github.com/citusdata/citus/issues/3936
1246create table table_with_sequences (x int, y int, z bigserial);
1247insert into table_with_sequences values (1,1);
1248select create_distributed_table('table_with_sequences','x');
1249NOTICE:  Copying data from local table...
1250NOTICE:  copying the data has completed
1251 create_distributed_table
1252---------------------------------------------------------------------
1253
1254(1 row)
1255
1256explain (costs off) insert into table_with_sequences select y, x from table_with_sequences;
1257                                   QUERY PLAN
1258---------------------------------------------------------------------
1259 Custom Scan (Citus INSERT ... SELECT)
1260   INSERT/SELECT method: pull to coordinator
1261   ->  Custom Scan (Citus Adaptive)
1262         Task Count: 4
1263         Tasks Shown: One of 4
1264         ->  Task
1265               Node: host=localhost port=xxxxx dbname=regression
1266               ->  Seq Scan on table_with_sequences_4213648 table_with_sequences
1267(8 rows)
1268
1269-- verify that we don't report repartitioned insert/select for tables
1270-- with user-defined sequences.
1271CREATE SEQUENCE user_defined_sequence;
1272create table table_with_user_sequences (x int, y int, z bigint default nextval('user_defined_sequence'));
1273insert into table_with_user_sequences values (1,1);
1274select create_distributed_table('table_with_user_sequences','x');
1275NOTICE:  Copying data from local table...
1276NOTICE:  copying the data has completed
1277 create_distributed_table
1278---------------------------------------------------------------------
1279
1280(1 row)
1281
1282explain (costs off) insert into table_with_user_sequences select y, x from table_with_user_sequences;
1283                                        QUERY PLAN
1284---------------------------------------------------------------------
1285 Custom Scan (Citus INSERT ... SELECT)
1286   INSERT/SELECT method: pull to coordinator
1287   ->  Custom Scan (Citus Adaptive)
1288         Task Count: 4
1289         Tasks Shown: One of 4
1290         ->  Task
1291               Node: host=localhost port=xxxxx dbname=regression
1292               ->  Seq Scan on table_with_user_sequences_4213652 table_with_user_sequences
1293(8 rows)
1294
1295-- clean-up
1296SET client_min_messages TO WARNING;
1297DROP SCHEMA insert_select_repartition CASCADE;
1298