-- Test setup: creates the schema and the reference, distributed, local and
-- partitioned tables used by the local COPY tests, then switches to a worker.
CREATE SCHEMA local_shard_copy;
SET search_path TO local_shard_copy;

SET client_min_messages TO DEBUG;
SET citus.next_shard_id TO 1570000;
SET citus.replicate_reference_tables_on_activate TO off;

-- add the node listening on :master_port to the metadata with group id 0
SELECT 1 FROM master_add_node('localhost', :master_port, groupid := 0);

SET citus.shard_count TO 4;
SET citus.shard_replication_factor TO 1;

CREATE TABLE reference_table (key int PRIMARY KEY);
SELECT create_reference_table('reference_table');

-- the CHECK constraint on age is exercised by the invalid-age COPY tests
CREATE TABLE distributed_table (key int PRIMARY KEY, age bigint CHECK (age >= 10));
SELECT create_distributed_table('distributed_table','key');

-- seed rows: key and age both take the generated value
INSERT INTO distributed_table (key, age) SELECT i, i FROM generate_series(20, 40) AS i;
INSERT INTO reference_table (key) SELECT i FROM generate_series(1, 10) AS i;

CREATE TABLE local_table (key int PRIMARY KEY);
INSERT INTO local_table (key) SELECT i FROM generate_series(1, 10) AS i;

-- partitioned table
CREATE TABLE collections_list (
	key bigserial,
	collection_id integer
) PARTITION BY LIST (collection_id );

SELECT create_distributed_table('collections_list', 'key');

CREATE TABLE collections_list_0
	PARTITION OF collections_list (key, collection_id)
	FOR VALUES IN ( 0 );

CREATE TABLE collections_list_1
	PARTITION OF collections_list (key, collection_id)
	FOR VALUES IN ( 1 );


-- connect to a worker and get ready for the tests
\c - - - :worker_1_port

SET search_path TO local_shard_copy;
SET citus.log_local_commands TO ON;

-- Returns true if, given a distribution key filter on distributed_table
-- (e.g., WHERE key = 1), we'd hit a shard whose placement is local to this
-- node; returns false otherwise.
--
-- dist_key: the distribution column value to look up in
--           local_shard_copy.distributed_table.
CREATE OR REPLACE FUNCTION shard_of_distribution_column_is_local(dist_key int) RETURNS bool AS $$
DECLARE
    -- shard that owns dist_key; resolved once, before probing the placements
    target_shard_id bigint :=
        get_shard_id_for_distribution_column('local_shard_copy.distributed_table', dist_key);
BEGIN
    -- the shard is local when one of its placements belongs to this node's
    -- group; EXISTS yields false (never NULL) when no such placement is found
    RETURN EXISTS (
        SELECT 1
        FROM pg_dist_placement AS p
        WHERE p.shardid = target_shard_id
          AND p.groupid IN (SELECT lg.groupid FROM pg_dist_local_group AS lg)
    );
END;
$$ LANGUAGE plpgsql;

-- pick some example values that reside on local and on remote shards

-- distribution key values of 1, 6, 500 and 701 are LOCAL to shards,
-- we'll use these values in the tests
SELECT shard_of_distribution_column_is_local(1);
SELECT shard_of_distribution_column_is_local(6);
SELECT shard_of_distribution_column_is_local(500);
SELECT shard_of_distribution_column_is_local(701);

-- distribution key values of 11 and 12 are REMOTE to shards
SELECT shard_of_distribution_column_is_local(11);
SELECT shard_of_distribution_column_is_local(12);

-- COPY into distributed_table after a SELECT on key = 1 has already run in
-- the same transaction; every transaction is rolled back so the table keeps
-- its seed rows.
BEGIN;
    -- run select with local execution
    SELECT count(*) FROM distributed_table WHERE key = 1;

    SELECT count(*) FROM distributed_table;
    -- the local placements should be executed locally
    COPY distributed_table FROM STDIN WITH delimiter ',';
1, 100
2, 200
3, 300
4, 400
5, 500
\.
    -- verify that the copy is successful.
    SELECT count(*) FROM distributed_table;

ROLLBACK;

BEGIN;
    -- run select with local execution
    SELECT count(*) FROM distributed_table WHERE key = 1;

    SELECT count(*) FROM distributed_table;
    -- the local placements should be executed locally
    COPY distributed_table FROM STDIN WITH delimiter ',';
1, 100
2, 200
3, 300
4, 400
5, 500
\.
    -- verify the copied ages.
    SELECT * FROM distributed_table ORDER BY key, age DESC;

ROLLBACK;


BEGIN;
    -- run select with local execution
    SELECT count(*) FROM distributed_table WHERE key = 1;

    SELECT count(*) FROM distributed_table;
    -- the local placements should be executed locally
    COPY distributed_table FROM STDIN WITH delimiter ',';
1, 100
2, 200
3, 300
4, 400
5, 500
\.
    -- verify that the copy is successful.
    SELECT count(*) FROM distributed_table;

ROLLBACK;

-- Same pattern as above, but the first statement selects a column instead of
-- a count, and the first COPY targets the partitioned table.
BEGIN;
    -- run select with local execution
    SELECT age FROM distributed_table WHERE key = 1;

    SELECT count(*) FROM collections_list;
    -- the local placements should be executed locally
    COPY collections_list FROM STDIN WITH delimiter ',';
1, 0
2, 0
3, 0
4, 1
5, 1
\.
    -- verify that the copy is successful.
    SELECT count(*) FROM collections_list;

ROLLBACK;

BEGIN;
    -- run select with local execution
    SELECT age FROM distributed_table WHERE key = 1;

    SELECT count(*) FROM distributed_table;
    -- the local placements should be executed locally
    COPY distributed_table FROM STDIN WITH delimiter ',';
1, 100
2, 200
3, 300
4, 400
5, 500
\.


    -- verify that the copy is successful.
    SELECT count(*) FROM distributed_table;

ROLLBACK;

BEGIN;
-- Since we are in a transaction, the copy should be locally executed.
COPY distributed_table FROM STDIN WITH delimiter ',';
1, 100
2, 200
3, 300
4, 400
5, 500
\.
ROLLBACK;

-- Since we are not in a transaction, the copy should not be locally executed.
-- This copy is not rolled back, so keys 1-5 remain in the table afterwards.
COPY distributed_table FROM STDIN WITH delimiter ',';
1, 100
2, 200
3, 300
4, 400
5, 500
\.

BEGIN;
-- Since we are in a transaction, the copy should be locally executed. But
-- we are inserting keys that already exist, so it should error.
COPY distributed_table FROM STDIN WITH delimiter ',';
1, 100
2, 200
3, 300
4, 400
5, 500
\.
ROLLBACK;

TRUNCATE distributed_table;

BEGIN;

-- insert a lot of data (around 8MB),
-- this should use local copy and it will exceed the LOCAL_COPY_FLUSH_THRESHOLD (512KB)
INSERT INTO distributed_table (key, age) SELECT i, i FROM generate_series(20, 1000000) AS i;

ROLLBACK;

-- Outside a transaction: the age of 9 is below the CHECK (age >= 10) limit,
-- so the constraint should reject the row.
COPY distributed_table FROM STDIN WITH delimiter ',';
1, 9
\.

BEGIN;
-- Since we are in a transaction, the execution will be local, however we are putting invalid age.
-- The constraints should give an error
COPY distributed_table FROM STDIN WITH delimiter ',';
1,9
\.
ROLLBACK;

TRUNCATE distributed_table;


-- different delimiters
BEGIN;
-- run select with local execution
SELECT count(*) FROM distributed_table WHERE key = 1;
-- initial size
SELECT count(*) FROM distributed_table;
COPY distributed_table FROM STDIN WITH delimiter '|';
1|10
2|30
3|40
\.
-- new size
SELECT count(*) FROM distributed_table;
ROLLBACK;

BEGIN;
-- run select with local execution
SELECT count(*) FROM distributed_table WHERE key = 1;
-- initial size
SELECT count(*) FROM distributed_table;
COPY distributed_table FROM STDIN WITH delimiter '[';
1[10
2[30
3[40
\.
-- new size
SELECT count(*) FROM distributed_table;
ROLLBACK;


-- multiple local copies
BEGIN;
COPY distributed_table FROM STDIN WITH delimiter ',';
1,15
2,20
3,30
\.
COPY distributed_table FROM STDIN WITH delimiter ',';
10,15
20,20
30,30
\.
COPY distributed_table FROM STDIN WITH delimiter ',';
100,15
200,20
300,30
\.
ROLLBACK;

-- local copy followed by local copy should see the changes
-- and error since it is a duplicate primary key.
BEGIN;
COPY distributed_table FROM STDIN WITH delimiter ',';
1,15
\.
COPY distributed_table FROM STDIN WITH delimiter ',';
1,16
\.
ROLLBACK;


-- local copy followed by local select should see the changes
BEGIN;
COPY distributed_table FROM STDIN WITH delimiter ',';
1,15
\.
-- select should see the change
SELECT key FROM distributed_table WHERE key = 1;
ROLLBACK;

-- switch to the node listening on :master_port for the reference table tests
\c - - - :master_port

SET search_path TO local_shard_copy;
SET citus.log_local_commands TO ON;

TRUNCATE TABLE reference_table;
TRUNCATE TABLE local_table;

-- both tables were just truncated, so the join should count zero rows
SELECT count(*) FROM reference_table, local_table WHERE reference_table.key = local_table.key;

SET citus.enable_local_execution = 'on';

BEGIN;
-- copy should be executed locally
COPY reference_table FROM STDIN;
1
2
3
4
\.
ROLLBACK;

SET citus.enable_local_execution = 'off';

BEGIN;
-- copy should not be executed locally as citus.enable_local_execution = off
COPY reference_table FROM STDIN;
1
2
3
4
\.
ROLLBACK;

SET citus.enable_local_execution = 'on';

-- plain table with one row; it is converted to a reference table below
CREATE TABLE ref_table(a int);
INSERT INTO ref_table VALUES(1);

BEGIN;
-- trigger local execution
SELECT COUNT(*) FROM reference_table;
-- shard creation should be done locally
SELECT create_reference_table('ref_table');
INSERT INTO ref_table VALUES(2);

-- verify that it worked.
SELECT COUNT(*) FROM ref_table;
ROLLBACK;

-- cleanup: silence the cascade notices and drop everything the test created
SET client_min_messages TO ERROR;
SET search_path TO public;
DROP SCHEMA local_shard_copy CASCADE;