CREATE SCHEMA intermediate_result_pruning;
SET search_path TO intermediate_result_pruning;
SET citus.log_intermediate_results TO TRUE;

SET citus.shard_count TO 4;
SET citus.next_shard_id TO 1480000;
SET citus.shard_replication_factor = 1;

CREATE TABLE table_1 (key int, value text);
SELECT create_distributed_table('table_1', 'key');

CREATE TABLE table_2 (key int, value text);
SELECT create_distributed_table('table_2', 'key');


CREATE TABLE table_3 (key int, value text);
SELECT create_distributed_table('table_3', 'key');

CREATE TABLE ref_table (key int, value text);
SELECT create_reference_table('ref_table');


-- load some data
INSERT INTO table_1 VALUES (1, '1'), (2, '2'), (3, '3'), (4, '4');
INSERT INTO table_2 VALUES (3, '3'), (4, '4'), (5, '5'), (6, '6');
INSERT INTO table_3 VALUES (3, '3'), (4, '4'), (5, '5'), (6, '6');
INSERT INTO ref_table VALUES (1, '1'), (2, '2'), (3, '3'), (4, '4'), (5, '5'), (6, '6');

-- see which workers are hit for intermediate results
SET client_min_messages TO DEBUG1;

-- a very basic case, where the intermediate result
-- should go to both workers
WITH some_values_1 AS MATERIALIZED
	(SELECT key FROM table_1 WHERE value IN ('3', '4'))
SELECT
	count(*)
FROM
	some_values_1 JOIN table_2 USING (key);


-- a very basic case, where the intermediate result
-- should only go to one worker because the final query is a router
-- we use random() to prevent postgres from inlining the CTE(s)
WITH some_values_1 AS MATERIALIZED
	(SELECT key, random() FROM table_1 WHERE value IN ('3', '4'))
SELECT
	count(*)
FROM
	some_values_1 JOIN table_2 USING (key) WHERE table_2.key = 1;

-- a similar query, but with a reference table now
-- given that reference tables are replicated to all nodes
-- we have to broadcast to all nodes
WITH some_values_1 AS MATERIALIZED
	(SELECT key, random() FROM table_1 WHERE value IN ('3', '4'))
SELECT
	count(*)
FROM
	some_values_1 JOIN ref_table USING (key);


-- a similar query as above, but this time use the CTE inside
-- another CTE
WITH some_values_1 AS MATERIALIZED
	(SELECT key, random() FROM table_1 WHERE value IN ('3', '4')),
	some_values_2 AS MATERIALIZED
	(SELECT key, random() FROM some_values_1)
SELECT
	count(*)
FROM
	some_values_2 JOIN table_2 USING (key) WHERE table_2.key = 1;

-- the second CTE does a join with a distributed table
-- and the final query is a router query
WITH some_values_1 AS MATERIALIZED
	(SELECT key, random() FROM table_1 WHERE value IN ('3', '4')),
	some_values_2 AS MATERIALIZED
	(SELECT key, random() FROM some_values_1 JOIN table_2 USING (key))
SELECT
	count(*)
FROM
	some_values_2 JOIN table_2 USING (key) WHERE table_2.key = 3;

-- the first CTE is used both within second CTE and the final query
-- the second CTE does a join with a distributed table
-- and the final query is a router query
WITH some_values_1 AS MATERIALIZED
	(SELECT key, random() FROM table_1 WHERE value IN ('3', '4')),
	some_values_2 AS MATERIALIZED
	(SELECT key, random() FROM some_values_1 JOIN table_2 USING (key))
SELECT
	count(*)
FROM
	(some_values_2 JOIN table_2 USING (key)) JOIN some_values_1 USING (key) WHERE table_2.key = 3;

-- the first CTE is used both within second CTE and the final query
-- the second CTE does a join with a distributed table but a router query on a worker
-- and the final query is another router query on another worker
WITH some_values_1 AS MATERIALIZED
	(SELECT key, random() FROM table_1 WHERE value IN ('3', '4')),
	some_values_2 AS MATERIALIZED
	(SELECT key, random() FROM some_values_1 JOIN table_2 USING (key) WHERE table_2.key = 1)
SELECT
	count(*)
FROM
	(some_values_2 JOIN table_2 USING (key)) JOIN some_values_1 USING (key) WHERE table_2.key = 3;

-- the first CTE is used both within second CTE and the final query
-- the second CTE does a join with a distributed table but a router query on a worker
-- and the final query is a router query on the same worker, so the first result is only
-- broadcasted to a single node
WITH some_values_1 AS MATERIALIZED
	(SELECT key, random() FROM table_1 WHERE value IN ('3', '4')),
	some_values_2 AS MATERIALIZED
	(SELECT key, random() FROM some_values_1 JOIN table_2 USING (key) WHERE table_2.key = 1)
SELECT
	count(*)
FROM
	(some_values_2 JOIN table_2 USING (key)) JOIN some_values_1 USING (key) WHERE table_2.key = 1;

-- similar to the query above, but the second CTE has no filter
-- and the final query is hitting all shards
WITH some_values_1 AS MATERIALIZED
	(SELECT key, random() FROM table_1 WHERE value IN ('3', '4')),
	some_values_2 AS MATERIALIZED
	(SELECT key, random() FROM some_values_1 JOIN table_2 USING (key))
SELECT
	count(*)
FROM
	(some_values_2 JOIN table_2 USING (key)) JOIN some_values_1 USING (key) WHERE table_2.key != 3;

-- even if we add a filter on the second CTE (some_values_2) and make it a router query,
-- the first intermediate result still hits all workers because the final
-- join is hitting all workers
WITH some_values_1 AS MATERIALIZED
	(SELECT key, random() FROM table_1 WHERE value IN ('3', '4')),
	some_values_2 AS MATERIALIZED
	(SELECT key, random() FROM some_values_1 JOIN table_2 USING (key) WHERE table_2.key = 3)
SELECT
	count(*)
FROM
	(some_values_2 JOIN table_2 USING (key)) JOIN some_values_1 USING (key) WHERE table_2.key != 3;

-- the reference table is joined with a distributed table and an intermediate
-- result, but the distributed table hits all shards, so the intermediate
-- result is sent to all nodes
WITH some_values_1 AS MATERIALIZED
	(SELECT key, random() FROM ref_table WHERE value IN ('3', '4'))
SELECT
	count(*)
FROM
	(some_values_1 JOIN ref_table USING (key)) JOIN table_2 USING (key);

-- similar query as above, but this time the whole query is a router
-- query, so no intermediate results
WITH some_values_1 AS MATERIALIZED
	(SELECT key, random() FROM ref_table WHERE value IN ('3', '4'))
SELECT
	count(*)
FROM
	(some_values_1 JOIN ref_table USING (key)) JOIN table_2 USING (key) WHERE table_2.key = 1;


-- now, the second CTE has a single shard join with a distributed table
-- so the first CTE should only be broadcasted to that node
-- since the final query doesn't have a join, it should simply be broadcasted
-- to one node
WITH some_values_1 AS MATERIALIZED
	(SELECT key, random() FROM table_1 WHERE value IN ('3', '4')),
	some_values_2 AS MATERIALIZED
	(SELECT key, random() FROM some_values_1 JOIN table_2 USING (key) WHERE key = 1)
SELECT
	count(*)
FROM
	some_values_2;


-- the same query inlined inside a CTE, and the final query has a
-- join with a distributed table
WITH top_cte as MATERIALIZED (
	WITH some_values_1 AS MATERIALIZED
	(SELECT key, random() FROM table_1 WHERE value IN ('3', '4')),
	some_values_2 AS MATERIALIZED
	(SELECT key, random() FROM some_values_1 JOIN table_2 USING (key) WHERE key = 1)
	SELECT
		DISTINCT key
	FROM
		some_values_2
)
SELECT
	count(*)
FROM
	top_cte JOIN table_2 USING (key);


-- very much the same query, but this time the top query is also a router query
-- on a single worker, so all intermediate results only hit a single node
WITH top_cte as MATERIALIZED (
	WITH some_values_1 AS MATERIALIZED
	(SELECT key, random() FROM table_1 WHERE value IN ('3', '4')),
	some_values_2 AS MATERIALIZED
	(SELECT key, random() FROM some_values_1 JOIN table_2 USING (key) WHERE key = 1)
	SELECT
		DISTINCT key
	FROM
		some_values_2
)
SELECT
	count(*)
FROM
	top_cte JOIN table_2 USING (key) WHERE table_2.key = 2;


-- some_values_1 is first used by a single shard-query, and then with a multi-shard
-- CTE, finally a cartesian product join
WITH some_values_1 AS MATERIALIZED
	(SELECT key, random() FROM table_1 WHERE value IN ('3', '4')),
	some_values_2 AS MATERIALIZED
	(SELECT key, random() FROM some_values_1 JOIN table_2 USING (key) WHERE key = 1),
	some_values_3 AS MATERIALIZED
	(SELECT key FROM (some_values_2 JOIN table_2 USING (key)) JOIN some_values_1 USING (key))
SELECT * FROM some_values_3 JOIN ref_table ON (true);



-- join on intermediate results, so should only
-- go to a single node
WITH some_values_1 AS MATERIALIZED
	(SELECT key, random() FROM table_1 WHERE value IN ('3', '4')),
	some_values_2 AS MATERIALIZED
	(SELECT key, random() FROM table_2 WHERE value IN ('3', '4'))
SELECT count(*) FROM some_values_2 JOIN some_values_1 USING (key);

-- same query with WHERE false, to make sure that we're not broken
-- for such edge cases
WITH some_values_1 AS MATERIALIZED
	(SELECT key, random() FROM table_1 WHERE value IN ('3', '4')),
	some_values_2 AS MATERIALIZED
	(SELECT key, random() FROM table_2 WHERE value IN ('3', '4'))
SELECT count(*) FROM some_values_2 JOIN some_values_1 USING (key) WHERE false;


-- do not use some_values_2 at all, so only 2 intermediate results are
-- broadcasted
WITH some_values_1 AS MATERIALIZED
	(SELECT key, random() FROM table_1 WHERE value IN ('3', '4')),
	some_values_2 AS MATERIALIZED
	(SELECT key, random() FROM some_values_1),
	some_values_3 AS MATERIALIZED
	(SELECT key, random() FROM some_values_1)
SELECT
	count(*)
FROM
	some_values_3;

-- let's have some deeper intermediate results
-- the innermost two results and the final query (which contains only intermediate results)
-- hit a single worker, others hit all workers
-- (see below query where all intermediate results hit a single node)
SELECT count(*) FROM
(
	SELECT avg(min::int) FROM
	(
		SELECT min(table_1.value) FROM
		(
			SELECT avg(value::int) as avg_ev_type FROM
			(
				SELECT max(value) as mx_val_1 FROM
				(
					SELECT avg(value::int) as avg FROM
					(
						SELECT cnt FROM
						(
							SELECT count(*) as cnt, value
							FROM table_1
							WHERE key = 1
							GROUP BY value
						) as level_1, table_1
						WHERE table_1.key = level_1.cnt AND key = 3
					) as level_2, table_2
					WHERE table_2.key = level_2.cnt AND key = 5
					GROUP BY level_2.cnt
				) as level_3, table_1
				WHERE value::numeric = level_3.avg AND key = 6
				GROUP BY level_3.avg
			) as level_4, table_2
			WHERE level_4.mx_val_1::int = table_2.key
			GROUP BY level_4.mx_val_1
		) as level_5, table_1
		WHERE level_5.avg_ev_type = table_1.key AND key > 111
		GROUP BY level_5.avg_ev_type
	) as level_6, table_1 WHERE table_1.key::int = level_6.min::int
	GROUP BY table_1.value
) as bar;
-- the same query where all intermediate results hit one
-- worker because each and every query is a router query -- but on different nodes
SELECT count(*) FROM
(
	SELECT avg(min::int) FROM
	(
		SELECT min(table_1.value) FROM
		(
			SELECT avg(value::int) as avg_ev_type FROM
			(
				SELECT max(value) as mx_val_1 FROM
				(
					SELECT avg(value::int) as avg FROM
					(
						SELECT cnt FROM
						(
							SELECT count(*) as cnt, value
							FROM table_1
							WHERE key = 1
							GROUP BY value
						) as level_1, table_1
						WHERE table_1.key = level_1.cnt AND key = 3
					) as level_2, table_2
					WHERE table_2.key = level_2.cnt AND key = 5
					GROUP BY level_2.cnt
				) as level_3, table_1
				WHERE value::numeric = level_3.avg AND key = 6
				GROUP BY level_3.avg
			) as level_4, table_2
			WHERE level_4.mx_val_1::int = table_2.key AND table_2.key = 1
			GROUP BY level_4.mx_val_1
		) as level_5, table_1
		WHERE level_5.avg_ev_type = table_1.key AND key = 111
		GROUP BY level_5.avg_ev_type
	) as level_6, table_1
	WHERE table_1.key::int = level_6.min::int AND table_1.key = 4
	GROUP BY table_1.value
) as bar;


-- sanity checks for set operations

-- the intermediate results should just hit a single worker
(SELECT key FROM table_1 WHERE key = 1)
INTERSECT
(SELECT key FROM table_1 WHERE key = 2);

-- the intermediate results should just hit a single worker
WITH cte_1 AS MATERIALIZED
(
	(SELECT key FROM table_1 WHERE key = 1)
	INTERSECT
	(SELECT key FROM table_1 WHERE key = 2)
),
cte_2 AS MATERIALIZED
(
	(SELECT key FROM table_1 WHERE key = 3)
	INTERSECT
	(SELECT key FROM table_1 WHERE key = 4)
)
SELECT * FROM cte_1
	UNION
SELECT * FROM cte_2;

-- one final test with SET operations, where
-- we join the results with distributed tables
-- so cte_1 should hit all workers, but still the
-- others should hit single worker each
WITH cte_1 AS MATERIALIZED
(
	(SELECT key FROM table_1 WHERE key = 1)
	INTERSECT
	(SELECT key FROM table_1 WHERE key = 2)
),
cte_2 AS MATERIALIZED
(
	SELECT count(*) FROM table_1 JOIN cte_1 USING (key)
)
SELECT * FROM cte_2;


-- sanity checks for non-colocated subquery joins
-- the recursively planned subquery (bar) should hit all
-- nodes
SELECT
	count(*)
FROM
	(SELECT key, random() FROM table_1) as foo,
	(SELECT key, random() FROM table_2) as bar
WHERE
	foo.key != bar.key;

-- the recursively planned subquery (bar) should hit one
-- node because foo goes to a single node
SELECT
	count(*)
FROM
	(SELECT key, random() FROM table_1 WHERE key = 1) as foo,
	(SELECT key, random() FROM table_2) as bar
WHERE
	foo.key != bar.key;


-- sanity checks for modification queries

-- select_data goes to a single node, because it is used in another subquery
-- raw_data is also the final router query, so hits a single shard
-- however, the subquery in WHERE clause of the DELETE query is broadcasted to all
-- nodes
BEGIN;
WITH select_data AS MATERIALIZED (
	SELECT * FROM table_1
),
raw_data AS MATERIALIZED (
	DELETE FROM table_2 WHERE key >= (SELECT min(key) FROM select_data WHERE key > 1) RETURNING *
)
SELECT * FROM raw_data;
ROLLBACK;

-- select_data goes to a single node, because it is used in another subquery
-- raw_data is also the final router query, so hits a single shard
-- however, the subquery in WHERE clause of the DELETE query is broadcasted to all
-- nodes
-- (same scenario as above, but the DELETE filters on value::int and the
-- subquery filter uses random())
BEGIN;
WITH select_data AS MATERIALIZED (
	SELECT * FROM table_1
),
raw_data AS MATERIALIZED (
	DELETE FROM table_2 WHERE value::int >= (SELECT min(key) FROM select_data WHERE key > 1 + random()) RETURNING *
)
SELECT * FROM raw_data;
ROLLBACK;

-- now, we need only two intermediate results as the subquery in WHERE clause is
-- router plannable
BEGIN;
WITH select_data AS MATERIALIZED (
	SELECT * FROM table_1
),
raw_data AS MATERIALIZED (
	DELETE FROM table_2 WHERE value::int >= (SELECT min(key) FROM table_1 WHERE key > random()) AND key = 6 RETURNING *
)
SELECT * FROM raw_data;
ROLLBACK;

-- test with INSERT SELECT via coordinator

-- INSERT .. SELECT via coordinator that doesn't have any intermediate results
-- We use offset 1 to make sure the result needs to be pulled to the coordinator, offset 0 would be optimized away
INSERT INTO table_1
	SELECT * FROM table_2 OFFSET 1;

-- INSERT .. SELECT via coordinator which has intermediate result,
-- and can be pruned to a single worker because the final query is on
-- single shard via filter in key
INSERT INTO table_1
	SELECT * FROM table_2 where value IN (SELECT value FROM table_1 WHERE random() > 1) AND key = 1;

-- a similar query, with more complex subquery
INSERT INTO table_1
	SELECT * FROM table_2 where key = 1 AND
	value::int IN
		(WITH cte_1 AS MATERIALIZED
		(
			(SELECT key FROM table_1 WHERE key = 1)
			INTERSECT
			(SELECT key FROM table_1 WHERE key = 2)
		),
		cte_2 AS MATERIALIZED
		(
			(SELECT key FROM table_1 WHERE key = 3)
			INTERSECT
			(SELECT key FROM table_1 WHERE key = 4)
		)
		SELECT * FROM cte_1
		UNION
		SELECT * FROM cte_2);

-- same query, cte is on the FROM clause
-- and this time the final query (and top-level intermediate result)
-- hits all the shards because table_2.key != 1
INSERT INTO table_1
	SELECT table_2.* FROM table_2,
		(WITH cte_1 AS MATERIALIZED
		(
			(SELECT key FROM table_1 WHERE key = 1)
			INTERSECT
			(SELECT key FROM table_1 WHERE key = 2)
		),
		cte_2 AS MATERIALIZED
		(
			(SELECT key FROM table_1 WHERE key = 3)
			INTERSECT
			(SELECT key FROM table_1 WHERE key = 4)
		)
		SELECT * FROM cte_1
		UNION
		SELECT * FROM cte_2
		) foo
		where table_2.key != 1 AND
		foo.key = table_2.value::int;



-- range-distributed table
-- do not print out 'building index pg_toast_xxxxx_index' messages
SET client_min_messages TO DEFAULT;
CREATE TABLE range_partitioned(range_column text, data int);
SET client_min_messages TO DEBUG1;

SELECT create_distributed_table('range_partitioned', 'range_column', 'range');
SELECT master_create_empty_shard('range_partitioned');
SELECT master_create_empty_shard('range_partitioned');
SELECT master_create_empty_shard('range_partitioned');
SELECT master_create_empty_shard('range_partitioned');
SELECT master_create_empty_shard('range_partitioned');


-- assign a text range to each of the five empty shards created above; the
-- hard-coded shard ids rely on citus.next_shard_id being set to 1480000 at the
-- top of this file (and on the number of shards created before this point)
UPDATE pg_dist_shard SET shardminvalue = 'A', shardmaxvalue = 'D' WHERE shardid = 1480013;
UPDATE pg_dist_shard SET shardminvalue = 'D', shardmaxvalue = 'G' WHERE shardid = 1480014;
UPDATE pg_dist_shard SET shardminvalue = 'G', shardmaxvalue = 'K' WHERE shardid = 1480015;
UPDATE pg_dist_shard SET shardminvalue = 'K', shardmaxvalue = 'O' WHERE shardid = 1480016;
UPDATE pg_dist_shard SET shardminvalue = 'O', shardmaxvalue = 'Z' WHERE shardid = 1480017;

-- final query goes to a single shard
SELECT
	count(*)
FROM
	range_partitioned
WHERE
	range_column = 'A' AND
	data IN (SELECT data FROM range_partitioned);


-- final query goes to three shards, so multiple workers
SELECT
	count(*)
FROM
	range_partitioned
WHERE
	range_column >= 'A' AND range_column <= 'K' AND
	data IN (SELECT data FROM range_partitioned);

-- two shards, both of which are on the first node
WITH some_data AS (
	SELECT data FROM range_partitioned
)
SELECT
	count(*)
FROM
	range_partitioned
WHERE
	range_column IN ('A', 'E') AND
	range_partitioned.data IN (SELECT data FROM some_data);


-- test case for issue #3556
CREATE TABLE accounts (id text PRIMARY KEY);
CREATE TABLE stats (account_id text PRIMARY KEY, spent int);

SELECT create_distributed_table('accounts', 'id', colocate_with => 'none');
SELECT create_distributed_table('stats', 'account_id', colocate_with => 'accounts');

INSERT INTO accounts (id) VALUES ('foo');
INSERT INTO stats (account_id, spent) VALUES ('foo', 100);

SELECT *
FROM
(
	WITH accounts_cte AS MATERIALIZED (
		SELECT id AS account_id
		FROM accounts
	),
	joined_stats_cte_1 AS MATERIALIZED (
		SELECT spent, account_id
		FROM stats
		INNER JOIN accounts_cte USING (account_id)
	),
	joined_stats_cte_2 AS MATERIALIZED (
		SELECT spent, account_id
		FROM joined_stats_cte_1
		INNER JOIN accounts_cte USING (account_id)
	)
	SELECT SUM(spent) OVER (PARTITION BY coalesce(account_id, NULL))
	FROM accounts_cte
	INNER JOIN joined_stats_cte_2 USING (account_id)
) inner_query;

-- confirm that the pruning works well when using round-robin as well
SET citus.task_assignment_policy to 'round-robin';
SELECT *
FROM
(
	WITH accounts_cte AS MATERIALIZED (
		SELECT id AS account_id
		FROM accounts
	),
	joined_stats_cte_1 AS MATERIALIZED (
		SELECT spent, account_id
		FROM stats
		INNER JOIN accounts_cte USING (account_id)
	),
	joined_stats_cte_2 AS MATERIALIZED (
		SELECT spent, account_id
		FROM joined_stats_cte_1
		INNER JOIN accounts_cte USING (account_id)
	)
	SELECT SUM(spent) OVER (PARTITION BY coalesce(account_id, NULL))
	FROM accounts_cte
	INNER JOIN joined_stats_cte_2 USING (account_id)
) inner_query;
RESET citus.task_assignment_policy;

-- Insert..select is planned differently, make sure we have results everywhere.
-- We put the insert..select in a CTE here to prevent the CTE from being moved
-- into the select, which would follow the regular code path for select.
-- NOTE: the CTE named "stats" shadows the distributed table of the same name
-- created above; within this statement "stats" refers to the CTE.
WITH stats AS MATERIALIZED (
	SELECT count(key) m FROM table_3
),
inserts AS MATERIALIZED (
	INSERT INTO table_2
	SELECT key, count(*)
	FROM table_1
	WHERE key > (SELECT m FROM stats)
	GROUP BY key
	HAVING count(*) < (SELECT m FROM stats)
	LIMIT 1
	RETURNING *
) SELECT count(*) FROM inserts;

SET citus.task_assignment_policy to DEFAULT;
SET client_min_messages TO DEFAULT;
DROP TABLE table_1, table_2, table_3, ref_table, accounts, stats, range_partitioned;
DROP SCHEMA intermediate_result_pruning;