-- Regression test: data-modifying CTEs (WITH ... DELETE/INSERT/UPDATE) on
-- distributed tables, executed inside and outside explicit transactions.
-- Everything lives in the with_transactions schema.
CREATE SCHEMA with_transactions;
SET search_path TO with_transactions, public;

SET citus.shard_count TO 4;
SET citus.shard_replication_factor TO 1; -- https://github.com/citusdata/citus/issues/3061
SET citus.next_placement_id TO 800000;

-- Two identically shaped tables, both distributed on tenant_id.
CREATE TABLE with_transactions.raw_table (tenant_id int, income float, created_at timestamptz);
SELECT create_distributed_table('raw_table', 'tenant_id');

CREATE TABLE with_transactions.second_raw_table (tenant_id int, income float, created_at timestamptz);
SELECT create_distributed_table('second_raw_table', 'tenant_id');


-- Load 101 rows: tenant_id cycles through 0..9, income runs 0..1000 in steps
-- of 10, and created_at advances one day per row from 2014-01-10 20:00:00.
INSERT INTO
    raw_table (tenant_id, income, created_at)
SELECT
    i % 10, i * 10.0, timestamp '2014-01-10 20:00:00' + i * interval '1 day'
FROM
    generate_series (0, 100) i;

-- Mirror the same rows into the second table.
INSERT INTO second_raw_table SELECT * FROM raw_table;

-- NOTE(review): presumably raised so that planner/executor DEBUG messages for
-- the statements below are emitted into the test output -- confirm.
SET client_min_messages TO DEBUG1;

-- run a transaction which DELETEs and UPDATEs through CTEs, then roll it back
BEGIN;

    WITH ids_to_delete AS
    (
        SELECT tenant_id FROM raw_table WHERE income < 250
    ),
    deleted_ids AS
    (
        DELETE FROM raw_table WHERE created_at < '2014-02-10 20:00:00' AND tenant_id IN (SELECT * from ids_to_delete) RETURNING tenant_id
    )
    UPDATE raw_table SET income = income * 2 WHERE tenant_id IN (SELECT tenant_id FROM deleted_ids);

ROLLBACK;

-- see that both UPDATE and DELETE commands are rolled back:
-- the row count and the maximum income should match the initial load
SELECT count(*) FROM raw_table;
SELECT max(income) FROM raw_table;

-- multi-statement multi shard modifying statements should work
BEGIN;
    SELECT count (*) FROM second_raw_table;

    -- A read-only CTE and an inserting CTE both feed the final UPDATE.
    WITH distinct_count AS (
        SELECT count(DISTINCT created_at) FROM raw_table
    ),
    ids_inserted AS
    (
        INSERT INTO raw_table VALUES (11, 1000, now()) RETURNING tenant_id
    )
    UPDATE raw_table SET created_at = '2001-02-10 20:00:00'
    WHERE tenant_id IN (SELECT tenant_id FROM ids_inserted) AND tenant_id < (SELECT count FROM distinct_count);

    TRUNCATE second_raw_table;
COMMIT;

-- sequential insert followed by parallel update works
-- just fine
WITH ids_inserted AS
(
    INSERT INTO raw_table VALUES (11, 1000, now()), (12, 1000, now()), (13, 1000, now()) RETURNING tenant_id
)
UPDATE raw_table SET created_at = '2001-02-10 20:00:00' WHERE tenant_id IN (SELECT tenant_id FROM ids_inserted);

-- make sure that everything committed
SELECT count(*) FROM raw_table;
SELECT count(*) FROM raw_table WHERE created_at = '2001-02-10 20:00:00';
SELECT count(*) FROM second_raw_table;

-- sequential insert followed by a sequential real-time query should be fine
BEGIN;
-- SET LOCAL limits the sequential mode to this transaction only.
SET LOCAL citus.multi_shard_modify_mode TO 'sequential';
WITH ids_inserted AS
(
    INSERT INTO raw_table (tenant_id) VALUES (11), (12), (13), (14) RETURNING tenant_id
)
SELECT income FROM second_raw_table WHERE tenant_id IN (SELECT * FROM ids_inserted) ORDER BY 1 DESC LIMIT 3;
ROLLBACK;

RESET client_min_messages;

-- run_ctes(id): returns, as text, the number of raw_table rows that have
-- tenant_id < 10 and whose tenant_id equals the given id.
CREATE OR REPLACE FUNCTION run_ctes(id int)
RETURNS text
LANGUAGE 'plpgsql'
AS $BODY$
DECLARE
    value text;
BEGIN

    -- NOTE(review): OFFSET 0 presumably keeps the CTE from being inlined or
    -- pushed down, so it is planned as a separate step -- confirm.
    WITH
        dist AS (SELECT tenant_id FROM raw_table WHERE tenant_id < 10 OFFSET 0)
    SELECT count(*) INTO value FROM dist WHERE id = tenant_id;

    RETURN value ;
END;
$BODY$;

-- Call run_ctes two more times than max_connections within a single query.
-- NOTE(review): presumably guards against each call holding on to a
-- connection and exhausting the connection limit -- confirm.
SELECT count(*) FROM (SELECT run_ctes(s) FROM generate_series(1,current_setting('max_connections')::int+2) s) a;

-- Clean up every object created by this test.
DROP SCHEMA with_transactions CASCADE;