1CREATE SCHEMA intermediate_result_pruning;
2SET search_path TO intermediate_result_pruning;
3SET citus.log_intermediate_results TO TRUE;
4
5SET citus.shard_count TO 4;
6SET citus.next_shard_id TO 1480000;
7SET citus.shard_replication_factor = 1;
8
9CREATE TABLE table_1 (key int, value text);
10SELECT create_distributed_table('table_1', 'key');
11
12CREATE TABLE table_2 (key int, value text);
13SELECT create_distributed_table('table_2', 'key');
14
15
16CREATE TABLE table_3 (key int, value text);
17SELECT create_distributed_table('table_3', 'key');
18
19CREATE TABLE ref_table (key int, value text);
20SELECT create_reference_table('ref_table');
21
22
23-- load some data
24INSERT INTO table_1    VALUES (1, '1'), (2, '2'), (3, '3'), (4, '4');
25INSERT INTO table_2    VALUES                     (3, '3'), (4, '4'), (5, '5'), (6, '6');
26INSERT INTO table_3    VALUES                     (3, '3'), (4, '4'), (5, '5'), (6, '6');
27INSERT INTO ref_table  VALUES (1, '1'), (2, '2'), (3, '3'), (4, '4'), (5, '5'), (6, '6');
28
29-- see which workers are hit for intermediate results
30SET client_min_messages TO DEBUG1;
31
32-- a very basic case, where the intermediate result
33-- should go to both workers
34WITH some_values_1 AS MATERIALIZED
35	(SELECT key FROM table_1 WHERE value IN ('3', '4'))
36SELECT
37	count(*)
38FROM
39	some_values_1 JOIN table_2 USING (key);
40
41
42-- a very basic case, where the intermediate result
43-- should only go to one worker because the final query is a router
44-- we use random() to prevent postgres inline the CTE(s)
45WITH some_values_1 AS MATERIALIZED
46	(SELECT key, random() FROM table_1 WHERE value IN ('3', '4'))
47SELECT
48	count(*)
49FROM
50	some_values_1 JOIN table_2 USING (key) WHERE table_2.key = 1;
51
52-- a similar query, but with a reference table now
53-- given that reference tables are replicated to all nodes
54-- we have to broadcast to all nodes
55WITH some_values_1 AS MATERIALIZED
56	(SELECT key, random() FROM table_1 WHERE value IN ('3', '4'))
57SELECT
58	count(*)
59FROM
60	some_values_1 JOIN ref_table USING (key);
61
62
63-- a similar query as above, but this time use the CTE inside
64-- another CTE
65WITH some_values_1 AS MATERIALIZED
66	(SELECT key, random() FROM table_1 WHERE value IN ('3', '4')),
67	some_values_2 AS MATERIALIZED
68	(SELECT key, random() FROM some_values_1)
69SELECT
70	count(*)
71FROM
72	some_values_2 JOIN table_2 USING (key) WHERE table_2.key = 1;
73
74-- the second CTE does a join with a distributed table
75-- and the final query is a router query
76WITH some_values_1 AS MATERIALIZED
77	(SELECT key, random() FROM table_1 WHERE value IN ('3', '4')),
78	some_values_2 AS MATERIALIZED
79	(SELECT key, random() FROM some_values_1 JOIN table_2 USING (key))
80SELECT
81	count(*)
82FROM
83	some_values_2 JOIN table_2 USING (key) WHERE table_2.key = 3;
84
85-- the first CTE is used both within second CTE and the final query
86-- the second CTE does a join with a distributed table
87-- and the final query is a router query
88WITH some_values_1 AS MATERIALIZED
89	(SELECT key, random() FROM table_1 WHERE value IN ('3', '4')),
90	some_values_2 AS MATERIALIZED
91	(SELECT key, random() FROM some_values_1 JOIN table_2 USING (key))
92SELECT
93	count(*)
94FROM
95	(some_values_2 JOIN table_2 USING (key)) JOIN some_values_1 USING (key) WHERE table_2.key = 3;
96
97-- the first CTE is used both within second CTE and the final query
98-- the second CTE does a join with a distributed table but a router query on a worker
99-- and the final query is another router query on another worker
100WITH some_values_1 AS MATERIALIZED
101	(SELECT key, random() FROM table_1 WHERE value IN ('3', '4')),
102	some_values_2 AS MATERIALIZED
103	(SELECT key, random() FROM some_values_1 JOIN table_2 USING (key) WHERE table_2.key = 1)
104SELECT
105	count(*)
106FROM
107	(some_values_2 JOIN table_2 USING (key)) JOIN some_values_1 USING (key) WHERE table_2.key = 3;
108
109-- the first CTE is used both within second CTE and the final query
110-- the second CTE does a join with a distributed table but a router query on a worker
111-- and the final query is a router query on the same worker, so the first result is only
112-- broadcasted to a single node
113WITH some_values_1 AS MATERIALIZED
114	(SELECT key, random() FROM table_1 WHERE value IN ('3', '4')),
115	some_values_2 AS MATERIALIZED
116	(SELECT key, random() FROM some_values_1 JOIN table_2 USING (key) WHERE table_2.key = 1)
117SELECT
118	count(*)
119FROM
120	(some_values_2 JOIN table_2 USING (key)) JOIN some_values_1 USING (key) WHERE table_2.key = 1;
121
122-- the same query with the above, but the final query is hitting all shards
123WITH some_values_1 AS MATERIALIZED
124	(SELECT key, random() FROM table_1 WHERE value IN ('3', '4')),
125	some_values_2 AS MATERIALIZED
126	(SELECT key, random() FROM some_values_1 JOIN table_2 USING (key))
127SELECT
128	count(*)
129FROM
130	(some_values_2 JOIN table_2 USING (key)) JOIN some_values_1 USING (key) WHERE table_2.key != 3;
131
132-- even if we add a filter on the first query and make it a router query,
133-- the first intermediate result still hits all workers because of the final
134-- join is hitting all workers
135WITH some_values_1 AS MATERIALIZED
136	(SELECT key, random() FROM table_1 WHERE value IN ('3', '4')),
137	some_values_2 AS MATERIALIZED
138	(SELECT key, random() FROM some_values_1 JOIN table_2 USING (key) WHERE table_2.key = 3)
139SELECT
140	count(*)
141FROM
142	(some_values_2 JOIN table_2 USING (key)) JOIN some_values_1 USING (key) WHERE table_2.key != 3;
143
144-- the reference table is joined with a distributed table and an intermediate
145-- result, but the distributed table hits all shards, so the intermediate
146-- result is sent to all nodes
147WITH some_values_1 AS MATERIALIZED
148	(SELECT key, random() FROM ref_table WHERE value IN ('3', '4'))
149SELECT
150	count(*)
151FROM
152	(some_values_1 JOIN ref_table USING (key)) JOIN table_2 USING (key);
153
154-- similar query as above, but this time the whole query is a router
155-- query, so no intermediate results
156WITH some_values_1 AS MATERIALIZED
157	(SELECT key, random() FROM ref_table WHERE value IN ('3', '4'))
158SELECT
159	count(*)
160FROM
161	(some_values_1 JOIN ref_table USING (key)) JOIN table_2 USING (key) WHERE table_2.key = 1;
162
163
164-- now, the second CTE has a single shard join with a distributed table
165-- so the first CTE should only be broadcasted to that node
166-- since the final query doesn't have a join, it should simply be broadcasted
167-- to one node
168WITH some_values_1 AS MATERIALIZED
169	(SELECT key, random() FROM table_1 WHERE value IN ('3', '4')),
170	some_values_2 AS MATERIALIZED
171	(SELECT key, random() FROM some_values_1 JOIN table_2 USING (key) WHERE key = 1)
172SELECT
173	count(*)
174FROM
175	some_values_2;
176
177
178-- the same query inlined inside a CTE, and the final query has a
179-- join with a distributed table
180WITH top_cte as MATERIALIZED (
181		WITH some_values_1 AS MATERIALIZED
182		(SELECT key, random() FROM table_1 WHERE value IN ('3', '4')),
183		some_values_2 AS MATERIALIZED
184		(SELECT key, random() FROM some_values_1 JOIN table_2 USING (key) WHERE key = 1)
185	SELECT
186		DISTINCT key
187	FROM
188		some_values_2
189)
190SELECT
191	count(*)
192FROM
193	top_cte JOIN table_2 USING (key);
194
195
196-- very much the same query, but this time the top query is also a router query
197-- on a single worker, so all intermediate results only hit a single node
198WITH top_cte as MATERIALIZED (
199		WITH some_values_1 AS MATERIALIZED
200		(SELECT key, random() FROM table_1 WHERE value IN ('3', '4')),
201		some_values_2 AS MATERIALIZED
202		(SELECT key, random() FROM some_values_1 JOIN table_2 USING (key) WHERE key = 1)
203	SELECT
204		DISTINCT key
205	FROM
206		some_values_2
207)
208SELECT
209	count(*)
210FROM
211	top_cte JOIN table_2 USING (key) WHERE table_2.key = 2;
212
213
214-- some_values_1 is first used by a single shard-query, and than with a multi-shard
215-- CTE, finally a cartesian product join
216WITH some_values_1 AS MATERIALIZED
217	(SELECT key, random() FROM table_1 WHERE value IN ('3', '4')),
218	some_values_2 AS MATERIALIZED
219	(SELECT key, random() FROM some_values_1 JOIN table_2 USING (key) WHERE key = 1),
220	some_values_3 AS MATERIALIZED
221	(SELECT key FROM (some_values_2 JOIN table_2 USING (key)) JOIN some_values_1 USING (key))
222SELECT * FROM some_values_3 JOIN ref_table ON (true);
223
224
225
226-- join on intermediate results, so should only
227-- go to a single node
228WITH some_values_1 AS MATERIALIZED
229	(SELECT key, random() FROM table_1 WHERE value IN ('3', '4')),
230	some_values_2 AS MATERIALIZED
231	(SELECT key, random() FROM table_2 WHERE value IN ('3', '4'))
232SELECT count(*) FROM some_values_2 JOIN some_values_1 USING (key);
233
234-- same query with WHERE false make sure that we're not broken
235-- for such edge cases
236WITH some_values_1 AS MATERIALIZED
237	(SELECT key, random() FROM table_1 WHERE value IN ('3', '4')),
238	some_values_2 AS MATERIALIZED
239	(SELECT key, random() FROM table_2 WHERE value IN ('3', '4'))
240SELECT count(*) FROM some_values_2 JOIN some_values_1 USING (key) WHERE false;
241
242
243-- do not use some_values_2 at all, so only 2 intermediate results are
244-- broadcasted
245WITH some_values_1 AS MATERIALIZED
246	(SELECT key, random() FROM table_1 WHERE value IN ('3', '4')),
247	some_values_2 AS MATERIALIZED
248	(SELECT key, random() FROM some_values_1),
249	some_values_3 AS MATERIALIZED
250	(SELECT key, random() FROM some_values_1)
251SELECT
252	count(*)
253FROM
254	some_values_3;
255
256-- lets have some deeper intermediate results
257-- the inner most two results and the final query (which contains only intermediate results)
258-- hitting single worker, others hitting all workers
259-- (see below query where all intermediate results hit a single node)
260SELECT count(*) FROM
261(
262	SELECT avg(min::int) FROM
263	(
264		SELECT min(table_1.value) FROM
265		(
266			SELECT avg(value::int) as avg_ev_type FROM
267			(
268				SELECT max(value) as mx_val_1 FROM
269				(
270					SELECT avg(value::int) as avg FROM
271					(
272						SELECT cnt FROM
273						(
274							SELECT count(*) as cnt, value
275							FROM table_1
276							WHERE key = 1
277							GROUP BY value
278						) as level_1, table_1
279						WHERE table_1.key = level_1.cnt AND key = 3
280					) as level_2, table_2
281					WHERE table_2.key = level_2.cnt AND key = 5
282					GROUP BY level_2.cnt
283				) as level_3, table_1
284				WHERE value::numeric = level_3.avg AND key = 6
285				GROUP BY level_3.avg
286			) as level_4, table_2
287			WHERE level_4.mx_val_1::int = table_2.key
288			GROUP BY level_4.mx_val_1
289		) as level_5, table_1
290		WHERE level_5.avg_ev_type = table_1.key AND key > 111
291		GROUP BY level_5.avg_ev_type
292	) as level_6, table_1 WHERE table_1.key::int = level_6.min::int
293	GROUP BY table_1.value
294) as bar;
295-- the same query where all intermediate results hits one
296-- worker because each and every query is a router query -- but on different nodes
297SELECT count(*) FROM
298(
299	SELECT avg(min::int) FROM
300	(
301		SELECT min(table_1.value) FROM
302		(
303			SELECT avg(value::int) as avg_ev_type FROM
304			(
305				SELECT max(value) as mx_val_1 FROM
306				(
307					SELECT avg(value::int) as avg FROM
308					(
309						SELECT cnt FROM
310						(
311							SELECT count(*) as cnt, value
312							FROM table_1
313							WHERE key = 1
314							GROUP BY value
315						) as level_1, table_1
316						WHERE table_1.key = level_1.cnt AND key = 3
317					) as level_2, table_2
318					WHERE table_2.key = level_2.cnt AND key = 5
319					GROUP BY level_2.cnt
320				) as level_3, table_1
321				WHERE value::numeric = level_3.avg AND key = 6
322				GROUP BY level_3.avg
323			) as level_4, table_2
324			WHERE level_4.mx_val_1::int = table_2.key AND table_2.key = 1
325			GROUP BY level_4.mx_val_1
326		) as level_5, table_1
327		WHERE level_5.avg_ev_type = table_1.key AND key = 111
328		GROUP BY level_5.avg_ev_type
329	) as level_6, table_1
330	WHERE table_1.key::int = level_6.min::int AND table_1.key = 4
331	GROUP BY table_1.value
332) as bar;
333
334
335-- sanity checks for set operations
336
337-- the intermediate results should just hit a single worker
338(SELECT key FROM table_1 WHERE key = 1)
339INTERSECT
340(SELECT key FROM table_1 WHERE key = 2);
341
342-- the intermediate results should just hit a single worker
343WITH cte_1 AS MATERIALIZED
344(
345	(SELECT key FROM table_1 WHERE key = 1)
346	INTERSECT
347	(SELECT key FROM table_1 WHERE key = 2)
348),
349cte_2 AS MATERIALIZED
350(
351	(SELECT key FROM table_1 WHERE key = 3)
352	INTERSECT
353	(SELECT key FROM table_1 WHERE key = 4)
354)
355SELECT * FROM cte_1
356	UNION
357SELECT * FROM cte_2;
358
359-- one final test with SET operations, where
360-- we join the results with distributed tables
361-- so cte_1 should hit all workers, but still the
362-- others should hit single worker each
363WITH cte_1 AS MATERIALIZED
364(
365	(SELECT key FROM table_1 WHERE key = 1)
366	INTERSECT
367	(SELECT key FROM table_1 WHERE key = 2)
368),
369cte_2 AS MATERIALIZED
370(
371	SELECT count(*) FROM table_1 JOIN cte_1 USING (key)
372)
373SELECT * FROM cte_2;
374
375
376-- sanity checks for non-colocated subquery joins
377-- the recursively planned subquery (bar) should hit all
378-- nodes
379SELECT
380	count(*)
381FROM
382	(SELECT key, random() FROM table_1) as foo,
383	(SELECT key, random() FROM table_2) as bar
384WHERE
385	foo.key != bar.key;
386
387-- the recursively planned subquery (bar) should hit one
388-- node because foo goes to a single node
389SELECT
390	count(*)
391FROM
392	(SELECT key, random() FROM table_1 WHERE key = 1) as foo,
393	(SELECT key, random() FROM table_2) as bar
394WHERE
395	foo.key != bar.key;
396
397
398-- sanity checks for modification queries
399
400-- select_data goes to a single node, because it is used in another subquery
401-- raw_data is also the final router query, so hits a single shard
402-- however, the subquery in WHERE clause of the DELETE query is broadcasted to all
403-- nodes
404BEGIN;
405WITH select_data AS MATERIALIZED (
406	SELECT * FROM table_1
407),
408raw_data AS MATERIALIZED (
409	DELETE FROM table_2 WHERE key >= (SELECT min(key) FROM select_data WHERE key > 1) RETURNING *
410)
411SELECT * FROM raw_data;
412ROLLBACK;
413
414-- select_data goes to a single node, because it is used in another subquery
415-- raw_data is also the final router query, so hits a single shard
416-- however, the subquery in WHERE clause of the DELETE query is broadcasted to all
417-- nodes
418BEGIN;
419WITH select_data AS MATERIALIZED (
420	SELECT * FROM table_1
421),
422raw_data AS MATERIALIZED (
423	DELETE FROM table_2 WHERE value::int >= (SELECT min(key) FROM select_data WHERE key > 1 + random()) RETURNING *
424)
425SELECT * FROM raw_data;
426ROLLBACK;
427
428-- now, we need only two intermediate results as the subquery in WHERE clause is
429-- router plannable
430BEGIN;
431WITH select_data AS MATERIALIZED (
432	SELECT * FROM table_1
433),
434raw_data AS MATERIALIZED (
435	DELETE FROM table_2 WHERE value::int >= (SELECT min(key) FROM table_1 WHERE key > random()) AND key = 6 RETURNING *
436)
437SELECT * FROM raw_data;
438ROLLBACK;
439
440-- test with INSERT SELECT via coordinator
441
442-- INSERT .. SELECT via coordinator that doesn't have any intermediate results
443-- We use offset 1 to make sure the result needs to be pulled to the coordinator, offset 0 would be optimized away
444INSERT INTO table_1
445	SELECT * FROM table_2 OFFSET 1;
446
447-- INSERT .. SELECT via coordinator which has intermediate result,
448-- and can be pruned to a single worker because the final query is on
449-- single shard via filter in key
450INSERT INTO table_1
451	SELECT * FROM table_2 where value IN (SELECT value FROM table_1 WHERE random() > 1) AND key = 1;
452
453-- a similar query, with more complex subquery
454INSERT INTO table_1
455	SELECT * FROM table_2 where key = 1 AND
456 value::int IN
457		(WITH cte_1 AS MATERIALIZED
458		(
459			(SELECT key FROM table_1 WHERE key = 1)
460			INTERSECT
461			(SELECT key FROM table_1 WHERE key = 2)
462		),
463		cte_2 AS MATERIALIZED
464		(
465			(SELECT key FROM table_1 WHERE key = 3)
466			INTERSECT
467			(SELECT key FROM table_1 WHERE key = 4)
468		)
469		SELECT * FROM cte_1
470			UNION
471		SELECT * FROM cte_2);
472
473-- same query, cte is on the FROM clause
474-- and this time the final query (and top-level intermediate result)
475-- hits all the shards because table_2.key != 1
476INSERT INTO table_1
477	SELECT table_2.* FROM table_2,
478	(WITH cte_1 AS MATERIALIZED
479		(
480			(SELECT key FROM table_1 WHERE key = 1)
481			INTERSECT
482			(SELECT key FROM table_1 WHERE key = 2)
483		),
484		cte_2 AS MATERIALIZED
485		(
486			(SELECT key FROM table_1 WHERE key = 3)
487			INTERSECT
488			(SELECT key FROM table_1 WHERE key = 4)
489		)
490		SELECT * FROM cte_1
491			UNION
492		SELECT * FROM cte_2
493	 ) foo
494	 where table_2.key != 1 AND
495 	foo.key = table_2.value::int;
496
497
498
499-- append partitioned/heap-type
500-- do not print out 'building index pg_toast_xxxxx_index' messages
501SET client_min_messages TO DEFAULT;
502CREATE TABLE range_partitioned(range_column text, data int);
503SET client_min_messages TO DEBUG1;
504
505SELECT create_distributed_table('range_partitioned', 'range_column', 'range');
506SELECT master_create_empty_shard('range_partitioned');
507SELECT master_create_empty_shard('range_partitioned');
508SELECT master_create_empty_shard('range_partitioned');
509SELECT master_create_empty_shard('range_partitioned');
510SELECT master_create_empty_shard('range_partitioned');
511
512
513UPDATE pg_dist_shard SET shardminvalue = 'A', shardmaxvalue = 'D' WHERE shardid = 1480013;
514UPDATE pg_dist_shard SET shardminvalue = 'D', shardmaxvalue = 'G' WHERE shardid = 1480014;
515UPDATE pg_dist_shard SET shardminvalue = 'G', shardmaxvalue = 'K' WHERE shardid = 1480015;
516UPDATE pg_dist_shard SET shardminvalue = 'K', shardmaxvalue = 'O' WHERE shardid = 1480016;
517UPDATE pg_dist_shard SET shardminvalue = 'O', shardmaxvalue = 'Z' WHERE shardid = 1480017;
518
519-- final query goes to a single shard
520SELECT
521	count(*)
522FROM
523	range_partitioned
524WHERE
525	range_column = 'A' AND
526	data IN (SELECT data FROM range_partitioned);
527
528
529-- final query goes to three shards, so multiple workers
530SELECT
531	count(*)
532FROM
533	range_partitioned
534WHERE
535	range_column >= 'A' AND range_column <= 'K' AND
536	data IN (SELECT data FROM range_partitioned);
537
538-- two shards, both of which are on the first node
539WITH some_data AS (
540	SELECT data FROM range_partitioned
541)
542SELECT
543	count(*)
544FROM
545	range_partitioned
546WHERE
547	range_column IN ('A', 'E') AND
548	range_partitioned.data IN (SELECT data FROM some_data);
549
550
551-- test case for issue #3556
552CREATE TABLE accounts (id text PRIMARY KEY);
553CREATE TABLE stats (account_id text PRIMARY KEY, spent int);
554
555SELECT create_distributed_table('accounts', 'id', colocate_with => 'none');
556SELECT create_distributed_table('stats', 'account_id', colocate_with => 'accounts');
557
558INSERT INTO accounts (id) VALUES ('foo');
559INSERT INTO stats (account_id, spent) VALUES ('foo', 100);
560
561SELECT *
562FROM
563(
564    WITH accounts_cte AS MATERIALIZED (
565        SELECT id AS account_id
566        FROM accounts
567    ),
568    joined_stats_cte_1 AS MATERIALIZED (
569        SELECT spent, account_id
570        FROM stats
571        INNER JOIN accounts_cte USING (account_id)
572    ),
573    joined_stats_cte_2 AS MATERIALIZED (
574        SELECT spent, account_id
575        FROM joined_stats_cte_1
576        INNER JOIN accounts_cte USING (account_id)
577    )
578    SELECT SUM(spent) OVER (PARTITION BY coalesce(account_id, NULL))
579    FROM accounts_cte
580    INNER JOIN joined_stats_cte_2 USING (account_id)
581) inner_query;
582
583-- confirm that the pruning works well when using round-robin as well
584SET citus.task_assignment_policy to 'round-robin';
585SELECT *
586FROM
587(
588    WITH accounts_cte AS MATERIALIZED (
589        SELECT id AS account_id
590        FROM accounts
591    ),
592    joined_stats_cte_1 AS MATERIALIZED (
593        SELECT spent, account_id
594        FROM stats
595        INNER JOIN accounts_cte USING (account_id)
596    ),
597    joined_stats_cte_2 AS MATERIALIZED (
598        SELECT spent, account_id
599        FROM joined_stats_cte_1
600        INNER JOIN accounts_cte USING (account_id)
601    )
602    SELECT SUM(spent) OVER (PARTITION BY coalesce(account_id, NULL))
603    FROM accounts_cte
604    INNER JOIN joined_stats_cte_2 USING (account_id)
605) inner_query;
606RESET citus.task_assignment_policy;
607
608-- Insert..select is planned differently, make sure we have results everywhere.
609-- We put the insert..select in a CTE here to prevent the CTE from being moved
610-- into the select, which would follow the regular code path for select.
611WITH stats AS MATERIALIZED (
612  SELECT count(key) m FROM table_3
613),
614inserts AS MATERIALIZED (
615  INSERT INTO table_2
616  SELECT key, count(*)
617  FROM table_1
618  WHERE key > (SELECT m FROM stats)
619  GROUP BY key
620  HAVING count(*) < (SELECT m FROM stats)
621  LIMIT 1
622  RETURNING *
623) SELECT count(*) FROM inserts;
624
625SET citus.task_assignment_policy to DEFAULT;
626SET client_min_messages TO DEFAULT;
627DROP TABLE table_1, table_2, table_3, ref_table, accounts, stats, range_partitioned;
628DROP SCHEMA intermediate_result_pruning;
629
630