-- Setup for the local shard copy tests: a dedicated schema, a reference
-- table, a hash-distributed table with a CHECK constraint, a plain local
-- table and a distributed table that is partitioned by list.
CREATE SCHEMA local_shard_copy;
SET search_path TO local_shard_copy;

SET client_min_messages TO DEBUG;
SET citus.next_shard_id TO 1570000;
SET citus.replicate_reference_tables_on_activate TO off;

-- register the node listening on :master_port with groupid 0
SELECT 1 FROM master_add_node('localhost', :master_port, groupid := 0);

SET citus.shard_count TO 4;
SET citus.shard_replication_factor TO 1;

CREATE TABLE reference_table (key int PRIMARY KEY);
SELECT create_reference_table('reference_table');

CREATE TABLE distributed_table (key int PRIMARY KEY, age bigint CHECK (age >= 10));
SELECT create_distributed_table('distributed_table','key');

INSERT INTO distributed_table SELECT *,* FROM generate_series(20, 40);
INSERT INTO reference_table SELECT * FROM generate_series(1, 10);

CREATE TABLE local_table (key int PRIMARY KEY);
INSERT INTO local_table SELECT * FROM generate_series(1, 10);

-- partitioned table
CREATE TABLE collections_list (
    key bigserial,
    collection_id integer
) PARTITION BY LIST (collection_id );

SELECT create_distributed_table('collections_list', 'key');

CREATE TABLE collections_list_0
    PARTITION OF collections_list (key, collection_id)
    FOR VALUES IN ( 0 );

CREATE TABLE collections_list_1
    PARTITION OF collections_list (key, collection_id)
    FOR VALUES IN ( 1 );


-- connect to the first worker and get ready for the tests
\c - - - :worker_1_port

SET search_path TO local_shard_copy;
SET citus.log_local_commands TO ON;

-- Returns true if the shard of local_shard_copy.distributed_table that holds
-- dist_key has a placement in this node's group, i.e. a distribution key
-- filter on the distributed table (e.g., WHERE key = 1) would hit a shard
-- placement which is local to this node. Returns false otherwise.
CREATE OR REPLACE FUNCTION shard_of_distribution_column_is_local(dist_key int) RETURNS bool AS $$
BEGIN
    -- EXISTS never yields NULL, so no NULL-to-FALSE fixup is required
    RETURN EXISTS (
        SELECT 1
        FROM pg_dist_placement AS placement
        WHERE placement.shardid = get_shard_id_for_distribution_column('local_shard_copy.distributed_table', dist_key)
            AND placement.groupid IN (SELECT local_group.groupid FROM pg_dist_local_group AS local_group)
    );
END;
$$ LANGUAGE plpgsql;

-- pick some example values that reside on the shards locally and remote

-- distribution key values of 1, 6, 500 and 701 are expected to be LOCAL to
-- shards, we'll use these values in the tests
SELECT shard_of_distribution_column_is_local(1);
SELECT shard_of_distribution_column_is_local(6);
SELECT shard_of_distribution_column_is_local(500);
SELECT shard_of_distribution_column_is_local(701);

-- distribution key values of 11 and 12 are expected to be REMOTE to shards
SELECT shard_of_distribution_column_is_local(11);
SELECT shard_of_distribution_column_is_local(12);

BEGIN;
-- run select with local execution
SELECT count(*) FROM distributed_table WHERE key = 1;

SELECT count(*) FROM distributed_table;
-- the local placements should be executed locally
COPY distributed_table FROM STDIN WITH (DELIMITER ',');
1, 100
2, 200
3, 300
4, 400
5, 500
\.
-- verify that the copy is successful.
SELECT count(*) FROM distributed_table;

ROLLBACK;

BEGIN;
-- run select with local execution
SELECT count(*) FROM distributed_table WHERE key = 1;

SELECT count(*) FROM distributed_table;
-- the local placements should be executed locally
COPY distributed_table FROM STDIN WITH (DELIMITER ',');
1, 100
2, 200
3, 300
4, 400
5, 500
\.
-- verify the copied ages.
-- columns are named explicitly so the ordering does not depend on positions
SELECT key, age FROM distributed_table ORDER BY key, age DESC;

ROLLBACK;


BEGIN;
-- run select with local execution
SELECT count(*) FROM distributed_table WHERE key = 1;

SELECT count(*) FROM distributed_table;
-- the local placements should be executed locally
COPY distributed_table FROM STDIN WITH (DELIMITER ',');
1, 100
2, 200
3, 300
4, 400
5, 500
\.
-- verify that the copy is successful.
SELECT count(*) FROM distributed_table;

ROLLBACK;

BEGIN;
-- run select with local execution
SELECT age FROM distributed_table WHERE key = 1;

SELECT count(*) FROM collections_list;
-- the local placements should be executed locally,
-- this time the target is the partitioned table
COPY collections_list FROM STDIN WITH (DELIMITER ',');
1, 0
2, 0
3, 0
4, 1
5, 1
\.
-- verify that the copy is successful.
SELECT count(*) FROM collections_list;

ROLLBACK;

BEGIN;
-- run select with local execution
SELECT age FROM distributed_table WHERE key = 1;

SELECT count(*) FROM distributed_table;
-- the local placements should be executed locally
COPY distributed_table FROM STDIN WITH (DELIMITER ',');
1, 100
2, 200
3, 300
4, 400
5, 500
\.


-- verify that the copy is successful.
SELECT count(*) FROM distributed_table;

ROLLBACK;

BEGIN;
-- Since we are in a transaction, the copy should be locally executed.
COPY distributed_table FROM STDIN WITH (DELIMITER ',');
1, 100
2, 200
3, 300
4, 400
5, 500
\.
ROLLBACK;

-- Since we are not in a transaction, the copy should not be locally executed.
-- Nothing rolls this copy back, so keys 1-5 stay in the table afterwards.
COPY distributed_table FROM STDIN WITH (DELIMITER ',');
1, 100
2, 200
3, 300
4, 400
5, 500
\.

BEGIN;
-- Since we are in a transaction, the copy should be locally executed. But
-- we are putting duplicate keys, so it should error.
COPY distributed_table FROM STDIN WITH (DELIMITER ',');
1, 100
2, 200
3, 300
4, 400
5, 500
\.
ROLLBACK;

TRUNCATE distributed_table;

BEGIN;

-- load a large amount of data (around 8MB); this should go through local
-- copy and exceed the LOCAL_COPY_FLUSH_THRESHOLD (512KB)
INSERT INTO distributed_table
SELECT g, g
FROM generate_series(20, 1000000) AS g;

ROLLBACK;

-- outside of a transaction: age 9 does not satisfy CHECK (age >= 10) on
-- distributed_table, so the row is expected to be rejected
COPY distributed_table FROM STDIN WITH (DELIMITER ',');
1, 9
\.

BEGIN;
-- Inside a transaction the execution will be local, however the age is
-- invalid, so the constraint should give an error
COPY distributed_table FROM STDIN WITH (DELIMITER ',');
1,9
\.
ROLLBACK;

TRUNCATE distributed_table;


-- different delimiters
BEGIN;
-- run select with local execution
SELECT count(*) FROM distributed_table WHERE key = 1;
-- initial size
SELECT count(*) FROM distributed_table;
COPY distributed_table FROM STDIN WITH (DELIMITER '|');
1|10
2|30
3|40
\.
-- new size
SELECT count(*) FROM distributed_table;
ROLLBACK;

BEGIN;
-- run select with local execution
SELECT count(*) FROM distributed_table WHERE key = 1;
-- initial size
SELECT count(*) FROM distributed_table;
COPY distributed_table FROM STDIN WITH (DELIMITER '[');
1[10
2[30
3[40
\.
-- new size
SELECT count(*) FROM distributed_table;
ROLLBACK;


-- multiple local copies within a single transaction
BEGIN;
COPY distributed_table FROM STDIN WITH (DELIMITER ',');
1,15
2,20
3,30
\.
COPY distributed_table FROM STDIN WITH (DELIMITER ',');
10,15
20,20
30,30
\.
COPY distributed_table FROM STDIN WITH (DELIMITER ',');
100,15
200,20
300,30
\.
ROLLBACK;

-- a local copy followed by another local copy should see the earlier rows
-- and error out since the second one repeats the primary key.
BEGIN;
COPY distributed_table FROM STDIN WITH (DELIMITER ',');
1,15
\.
COPY distributed_table FROM STDIN WITH (DELIMITER ',');
1,16
\.
ROLLBACK;


-- a select that follows a local copy should see the copied row
BEGIN;
COPY distributed_table FROM STDIN WITH (DELIMITER ',');
1,15
\.
-- select should see the change
SELECT key FROM distributed_table WHERE key = 1;
ROLLBACK;

-- continue on the node listening on :master_port
\c - - - :master_port

SET search_path TO local_shard_copy;
SET citus.log_local_commands TO ON;

TRUNCATE TABLE reference_table;
TRUNCATE TABLE local_table;

-- join of the reference table with the plain local table
SELECT count(*) FROM reference_table, local_table WHERE reference_table.key = local_table.key;

SET citus.enable_local_execution TO 'on';

BEGIN;
-- copy should be executed locally
COPY reference_table FROM STDIN;
1
2
3
4
\.
ROLLBACK;

SET citus.enable_local_execution TO 'off';

BEGIN;
-- copy should not be executed locally as citus.enable_local_execution = off
COPY reference_table FROM STDIN;
1
2
3
4
\.
ROLLBACK;

SET citus.enable_local_execution TO 'on';

CREATE TABLE ref_table (a int);
INSERT INTO ref_table VALUES (1);

BEGIN;
-- trigger local execution
SELECT count(*) FROM reference_table;
-- shard creation should be done locally
SELECT create_reference_table('ref_table');
INSERT INTO ref_table VALUES (2);

-- verify that it worked.
SELECT count(*) FROM ref_table;
ROLLBACK;

-- clean up; silence the cascade notices of the schema drop
SET client_min_messages TO ERROR;
SET search_path TO public;
DROP SCHEMA local_shard_copy CASCADE;