1CREATE SCHEMA with_transactions;
2SET search_path TO 	with_transactions, public;
3
4SET citus.shard_count TO 4;
5SET citus.shard_replication_factor TO 1; -- https://github.com/citusdata/citus/issues/3061
6SET citus.next_placement_id TO 800000;
7
8CREATE TABLE with_transactions.raw_table (tenant_id int, income float, created_at timestamptz);
9SELECT create_distributed_table('raw_table', 'tenant_id');
10
11CREATE TABLE with_transactions.second_raw_table (tenant_id int, income float, created_at timestamptz);
12SELECT create_distributed_table('second_raw_table', 'tenant_id');
13
14
15INSERT INTO
16	raw_table (tenant_id, income, created_at)
17SELECT
18	i % 10, i * 10.0, timestamp '2014-01-10 20:00:00' + i * interval '1 day'
19FROM
20	generate_series (0, 100) i;
21
22INSERT INTO second_raw_table SELECT * FROM raw_table;
23
24SET client_min_messages TO DEBUG1;
25
26-- run a transaction which DELETE
27BEGIN;
28
29	WITH ids_to_delete AS
30		(
31			SELECT tenant_id FROM raw_table WHERE income < 250
32		),
33		deleted_ids AS
34		(
35			DELETE FROM raw_table WHERE created_at  < '2014-02-10 20:00:00' AND tenant_id IN (SELECT * from ids_to_delete) RETURNING tenant_id
36		)
37		UPDATE raw_table SET income = income * 2 WHERE tenant_id IN (SELECT tenant_id FROM deleted_ids);
38
39ROLLBACK;
40
41-- see that both UPDATE and DELETE commands are rollbacked
42SELECT count(*) FROM raw_table;
43SELECT max(income) FROM raw_table;
44
45-- multi-statement multi shard modifying statements should work
46BEGIN;
47	SELECT count (*) FROM second_raw_table;
48
49	WITH distinct_count AS (
50		SELECT count(DISTINCT created_at) FROM raw_table
51	),
52	ids_inserted AS
53	(
54		INSERT INTO raw_table VALUES (11, 1000, now()) RETURNING tenant_id
55	)
56	UPDATE raw_table SET created_at = '2001-02-10 20:00:00'
57	WHERE tenant_id IN (SELECT tenant_id FROM ids_inserted) AND tenant_id < (SELECT count FROM distinct_count);
58
59	TRUNCATE second_raw_table;
60COMMIT;
61
62-- sequential insert followed by parallel update works just fine
63WITH ids_inserted AS
64(
65  INSERT INTO raw_table VALUES (11, 1000, now()), (12, 1000, now()), (13, 1000, now()) RETURNING tenant_id
66)
67UPDATE raw_table SET created_at = '2001-02-10 20:00:00' WHERE tenant_id IN (SELECT tenant_id FROM ids_inserted);
68
69-- make sure that everything committed
70SELECT count(*) FROM raw_table;
71SELECT count(*) FROM raw_table WHERE created_at = '2001-02-10 20:00:00';
72SELECT count(*) FROM second_raw_table;
73
74-- sequential insert followed by a sequential real-time query should be fine
75BEGIN;
76SET LOCAL citus.multi_shard_modify_mode TO 'sequential';
77WITH ids_inserted AS
78(
79  INSERT INTO raw_table (tenant_id) VALUES (11), (12), (13), (14) RETURNING tenant_id
80)
81SELECT income FROM second_raw_table WHERE tenant_id IN (SELECT * FROM ids_inserted) ORDER BY 1 DESC LIMIT 3;
82ROLLBACK;
83
84RESET client_min_messages;
85
86CREATE OR REPLACE FUNCTION run_ctes(id int)
87RETURNS text
88LANGUAGE 'plpgsql'
89AS $BODY$
90DECLARE
91	value text;
92BEGIN
93
94	WITH
95	dist AS (SELECT tenant_id FROM raw_table WHERE tenant_id < 10 OFFSET 0)
96	SELECT count(*) INTO value FROM dist WHERE id = tenant_id;
97
98	RETURN value ;
99END;
100$BODY$;
101
102SELECT count(*) FROM (SELECT run_ctes(s) FROM generate_series(1,current_setting('max_connections')::int+2) s) a;
103
104DROP SCHEMA with_transactions CASCADE;
105