1--
2-- REPLICATE_REF_TABLES_ON_COORDINATOR
3--
4CREATE SCHEMA replicate_ref_to_coordinator;
5SET search_path TO 'replicate_ref_to_coordinator';
6SET citus.shard_replication_factor TO 1;
7SET citus.shard_count TO 4;
8SET citus.next_shard_id TO 8000000;
9SET citus.next_placement_id TO 8000000;
10--- enable logging to see which tasks are executed locally
11SET citus.log_local_commands TO ON;
12CREATE TABLE squares(a int, b int);
13SELECT create_reference_table('squares');
14 create_reference_table
15---------------------------------------------------------------------
16
17(1 row)
18
19INSERT INTO squares SELECT i, i * i FROM generate_series(1, 10) i;
20NOTICE:  executing the copy locally for shard xxxxx
21CREATE INDEX CONCURRENTLY squares_a_idx ON squares (a);
22SELECT substring(current_Setting('server_version'), '\d+')::int > 11 AS server_version_above_eleven
23\gset
24\if :server_version_above_eleven
25REINDEX INDEX CONCURRENTLY squares_a_idx;
26\endif
27DROP INDEX CONCURRENTLY squares_a_idx;
28-- should be executed locally
29SELECT count(*) FROM squares;
30NOTICE:  executing the command locally: SELECT count(*) AS count FROM replicate_ref_to_coordinator.squares_8000000 squares
31 count
32---------------------------------------------------------------------
33    10
34(1 row)
35
36-- create a second reference table
37CREATE TABLE numbers(a int);
38SELECT create_reference_table('numbers');
39 create_reference_table
40---------------------------------------------------------------------
41
42(1 row)
43
44INSERT INTO numbers VALUES (20), (21);
45NOTICE:  executing the command locally: INSERT INTO replicate_ref_to_coordinator.numbers_8000001 AS citus_table_alias (a) VALUES (20), (21)
46CREATE OR REPLACE FUNCTION my_volatile_fn()
47RETURNS INT AS $$
48BEGIN
49  RETURN 1;
50END; $$ language plpgsql VOLATILE;
51-- INSERT ... SELECT between reference tables
52BEGIN;
53EXPLAIN (COSTS OFF) INSERT INTO squares SELECT a, a*a FROM numbers;
54                        QUERY PLAN
55---------------------------------------------------------------------
56 Custom Scan (Citus Adaptive)
57   Task Count: 1
58   Tasks Shown: All
59   ->  Task
60         Node: host=localhost port=xxxxx dbname=regression
61         ->  Insert on squares_8000000 citus_table_alias
62               ->  Seq Scan on numbers_8000001 numbers
63(7 rows)
64
65INSERT INTO squares SELECT a, a*a FROM numbers;
66SELECT * FROM squares WHERE a >= 20 ORDER BY a;
67 a  |  b
68---------------------------------------------------------------------
69 20 | 400
70 21 | 441
71(2 rows)
72
73ROLLBACK;
74BEGIN;
75EXPLAIN (COSTS OFF) INSERT INTO numbers SELECT a FROM squares WHERE a < 3;
76                        QUERY PLAN
77---------------------------------------------------------------------
78 Custom Scan (Citus Adaptive)
79   Task Count: 1
80   Tasks Shown: All
81   ->  Task
82         Node: host=localhost port=xxxxx dbname=regression
83         ->  Insert on numbers_8000001 citus_table_alias
84               ->  Seq Scan on squares_8000000 squares
85                     Filter: (a < 3)
86(8 rows)
87
88INSERT INTO numbers SELECT a FROM squares WHERE a < 3;
89SELECT * FROM numbers ORDER BY a;
90 a
91---------------------------------------------------------------------
92  1
93  2
94 20
95 21
96(4 rows)
97
98ROLLBACK;
99-- Make sure we hide shard tables ...
100SELECT citus_table_is_visible('numbers_8000001'::regclass::oid);
101 citus_table_is_visible
102---------------------------------------------------------------------
103 f
104(1 row)
105
106-- Join between reference tables and local tables
107CREATE TABLE local_table(a int);
108INSERT INTO local_table VALUES (2), (4), (7), (20);
109EXPLAIN (COSTS OFF) SELECT local_table.a, numbers.a FROM local_table NATURAL JOIN numbers;
110                         QUERY PLAN
111---------------------------------------------------------------------
112 Custom Scan (Citus Adaptive)
113   Task Count: 1
114   Tasks Shown: All
115   ->  Task
116         Node: host=localhost port=xxxxx dbname=regression
117         ->  Merge Join
118               Merge Cond: (local_table.a = numbers.a)
119               ->  Sort
120                     Sort Key: local_table.a
121                     ->  Seq Scan on local_table
122               ->  Sort
123                     Sort Key: numbers.a
124                     ->  Seq Scan on numbers_8000001 numbers
125(13 rows)
126
127SELECT local_table.a, numbers.a FROM local_table NATURAL JOIN numbers ORDER BY 1;
128NOTICE:  executing the command locally: SELECT local_table.a, numbers.a FROM (replicate_ref_to_coordinator.local_table JOIN replicate_ref_to_coordinator.numbers_8000001 numbers(a) USING (a)) ORDER BY local_table.a
129 a  | a
130---------------------------------------------------------------------
131 20 | 20
132(1 row)
133
134-- test non equijoin
135SELECT lt.a, sq.a, sq.b
136FROM local_table lt
137JOIN squares sq ON sq.a > lt.a and sq.b > 90
138ORDER BY 1,2,3;
139NOTICE:  executing the command locally: SELECT lt.a, sq.a, sq.b FROM (replicate_ref_to_coordinator.local_table lt JOIN replicate_ref_to_coordinator.squares_8000000 sq ON (((sq.a OPERATOR(pg_catalog.>) lt.a) AND (sq.b OPERATOR(pg_catalog.>) 90)))) ORDER BY lt.a, sq.a, sq.b
140 a | a  |  b
141---------------------------------------------------------------------
142 2 | 10 | 100
143 4 | 10 | 100
144 7 | 10 | 100
145(3 rows)
146
147-- should work if in transaction block
148BEGIN;
149SELECT local_table.a, numbers.a FROM local_table NATURAL JOIN numbers ORDER BY 1;
150NOTICE:  executing the command locally: SELECT local_table.a, numbers.a FROM (replicate_ref_to_coordinator.local_table JOIN replicate_ref_to_coordinator.numbers_8000001 numbers(a) USING (a)) ORDER BY local_table.a
151 a  | a
152---------------------------------------------------------------------
153 20 | 20
154(1 row)
155
156ROLLBACK;
157-- should work if in a DO block
158DO $$
159BEGIN
160	PERFORM local_table.a, numbers.a FROM local_table NATURAL JOIN numbers;
161END;
162$$;
163NOTICE:  executing the command locally: SELECT local_table.a, numbers.a FROM (replicate_ref_to_coordinator.local_table JOIN replicate_ref_to_coordinator.numbers_8000001 numbers(a) USING (a))
164CONTEXT:  SQL statement "SELECT local_table.a, numbers.a FROM local_table NATURAL JOIN numbers"
165PL/pgSQL function inline_code_block line XX at PERFORM
166-- test plpgsql function
167CREATE FUNCTION test_reference_local_join_plpgsql_func()
168RETURNS void AS $$
169BEGIN
170	INSERT INTO local_table VALUES (21);
171	INSERT INTO numbers VALUES (4);
172	PERFORM local_table.a, numbers.a FROM local_table NATURAL JOIN numbers ORDER BY 1;
173	RAISE EXCEPTION '';
174	PERFORM local_table.a, numbers.a FROM local_table NATURAL JOIN numbers ORDER BY 1;
175END;
176$$ LANGUAGE plpgsql;
177SELECT test_reference_local_join_plpgsql_func();
178NOTICE:  executing the command locally: INSERT INTO replicate_ref_to_coordinator.numbers_8000001 (a) VALUES (4)
179CONTEXT:  SQL statement "INSERT INTO numbers VALUES (4)"
180PL/pgSQL function test_reference_local_join_plpgsql_func() line XX at SQL statement
181NOTICE:  executing the command locally: SELECT local_table.a, numbers.a FROM (replicate_ref_to_coordinator.local_table JOIN replicate_ref_to_coordinator.numbers_8000001 numbers(a) USING (a)) ORDER BY local_table.a
182CONTEXT:  SQL statement "SELECT local_table.a, numbers.a FROM local_table NATURAL JOIN numbers ORDER BY 1"
183PL/pgSQL function test_reference_local_join_plpgsql_func() line XX at PERFORM
184ERROR:
185CONTEXT:  PL/pgSQL function test_reference_local_join_plpgsql_func() line XX at RAISE
186SELECT sum(a) FROM local_table;
187 sum
188---------------------------------------------------------------------
189  33
190(1 row)
191
192SELECT sum(a) FROM numbers;
193NOTICE:  executing the command locally: SELECT sum(a) AS sum FROM replicate_ref_to_coordinator.numbers_8000001 numbers
194 sum
195---------------------------------------------------------------------
196  41
197(1 row)
198
199-- error if in procedure's subtransaction
200CREATE PROCEDURE test_reference_local_join_proc() AS $$
201SELECT local_table.a, numbers.a FROM local_table NATURAL JOIN numbers ORDER BY 1;
202$$ LANGUAGE sql;
203CALL test_reference_local_join_proc();
204NOTICE:  executing the command locally: SELECT local_table.a, numbers.a FROM (replicate_ref_to_coordinator.local_table JOIN replicate_ref_to_coordinator.numbers_8000001 numbers(a) USING (a)) ORDER BY local_table.a
205CONTEXT:  SQL function "test_reference_local_join_proc" statement 1
206CREATE SCHEMA s1;
207CREATE TABLE s1.ref(a int);
208SELECT create_reference_table('s1.ref');
209 create_reference_table
210---------------------------------------------------------------------
211
212(1 row)
213
214BEGIN;
215SELECT local_table.a, r.a FROM local_table NATURAL JOIN s1.ref r ORDER BY 1;
216NOTICE:  executing the command locally: SELECT local_table.a, r.a FROM (replicate_ref_to_coordinator.local_table JOIN s1.ref_8000002 r(a) USING (a)) ORDER BY local_table.a
217 a | a
218---------------------------------------------------------------------
219(0 rows)
220
221ROLLBACK;
222BEGIN;
223WITH t1 AS (
224	SELECT my_volatile_fn() r, a FROM local_table
225) SELECT count(*) FROM t1, numbers WHERE t1.a = numbers.a AND r < 0.5;
226NOTICE:  executing the command locally: WITH t1 AS (SELECT replicate_ref_to_coordinator.my_volatile_fn() AS r, local_table.a FROM replicate_ref_to_coordinator.local_table) SELECT count(*) AS count FROM t1, replicate_ref_to_coordinator.numbers_8000001 numbers WHERE ((t1.a OPERATOR(pg_catalog.=) numbers.a) AND ((t1.r)::numeric OPERATOR(pg_catalog.<) 0.5))
227 count
228---------------------------------------------------------------------
229     0
230(1 row)
231
232END;
233BEGIN;
234WITH t1 AS (
235	SELECT my_volatile_fn() r, a FROM numbers
236) SELECT count(*) FROM t1, local_table WHERE t1.a = local_table.a AND r < 0.5;
237NOTICE:  executing the command locally: WITH t1 AS (SELECT replicate_ref_to_coordinator.my_volatile_fn() AS r, numbers.a FROM replicate_ref_to_coordinator.numbers_8000001 numbers) SELECT count(*) AS count FROM t1, replicate_ref_to_coordinator.local_table WHERE ((t1.a OPERATOR(pg_catalog.=) local_table.a) AND ((t1.r)::numeric OPERATOR(pg_catalog.<) 0.5))
238 count
239---------------------------------------------------------------------
240     0
241(1 row)
242
243END;
244BEGIN;
245SELECT count(*) FROM local_table
246WHERE EXISTS(SELECT my_volatile_fn() FROM numbers WHERE local_table.a = numbers.a);
247NOTICE:  executing the command locally: SELECT count(*) AS count FROM replicate_ref_to_coordinator.local_table WHERE (EXISTS (SELECT replicate_ref_to_coordinator.my_volatile_fn() AS my_volatile_fn FROM replicate_ref_to_coordinator.numbers_8000001 numbers WHERE (local_table.a OPERATOR(pg_catalog.=) numbers.a)))
248 count
249---------------------------------------------------------------------
250     1
251(1 row)
252
253END;
254BEGIN;
255SELECT count(*) FROM numbers
256WHERE EXISTS(SELECT my_volatile_fn() FROM local_table WHERE local_table.a = numbers.a);
257NOTICE:  executing the command locally: SELECT count(*) AS count FROM replicate_ref_to_coordinator.numbers_8000001 numbers WHERE (EXISTS (SELECT replicate_ref_to_coordinator.my_volatile_fn() AS my_volatile_fn FROM replicate_ref_to_coordinator.local_table WHERE (local_table.a OPERATOR(pg_catalog.=) numbers.a)))
258 count
259---------------------------------------------------------------------
260     1
261(1 row)
262
263END;
264DROP SCHEMA s1 CASCADE;
265NOTICE:  drop cascades to 2 other objects
266DETAIL:  drop cascades to table s1.ref
267drop cascades to table s1.ref_8000002
268-- not error if inside a SQL UDF call
269CREATE or replace FUNCTION test_reference_local_join_func()
270RETURNS SETOF RECORD AS $$
271SET LOCAL citus.enable_local_execution to false;
272INSERT INTO numbers VALUES (2);
273SELECT local_table.a, numbers.a FROM local_table NATURAL JOIN numbers ORDER BY 1;
274$$ LANGUAGE sql;
275SELECT test_reference_local_join_func();
276 test_reference_local_join_func
277---------------------------------------------------------------------
278 (2,2)
279 (20,20)
280(2 rows)
281
282-- CTEs are allowed
283WITH ins AS (INSERT INTO numbers VALUES (1) RETURNING *)
284SELECT * FROM numbers, local_table ORDER BY 1,2;
285NOTICE:  executing the command locally: INSERT INTO replicate_ref_to_coordinator.numbers_8000001 (a) VALUES (1) RETURNING a
286NOTICE:  executing the command locally: SELECT numbers.a, local_table.a FROM replicate_ref_to_coordinator.numbers_8000001 numbers, (SELECT local_table_1.a FROM (SELECT intermediate_result.a FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(a integer)) local_table_1) local_table ORDER BY numbers.a, local_table.a
287 a  | a
288---------------------------------------------------------------------
289  2 |  2
290  2 |  4
291  2 |  7
292  2 | 20
293 20 |  2
294 20 |  4
295 20 |  7
296 20 | 20
297 21 |  2
298 21 |  4
299 21 |  7
300 21 | 20
301(12 rows)
302
303WITH t AS (SELECT *, my_volatile_fn() x FROM numbers FOR UPDATE)
304SELECT * FROM numbers, local_table
305WHERE EXISTS (SELECT * FROM t WHERE t.x = numbers.a);
306NOTICE:  executing the command locally: WITH t AS (SELECT numbers_1.a, replicate_ref_to_coordinator.my_volatile_fn() AS x FROM replicate_ref_to_coordinator.numbers_8000001 numbers_1 FOR UPDATE OF numbers_1) SELECT numbers.a, local_table.a FROM replicate_ref_to_coordinator.numbers_8000001 numbers, replicate_ref_to_coordinator.local_table WHERE (EXISTS (SELECT t.a, t.x FROM t WHERE (t.x OPERATOR(pg_catalog.=) numbers.a)))
307 a | a
308---------------------------------------------------------------------
309 1 |  2
310 1 |  4
311 1 |  7
312 1 | 20
313(4 rows)
314
315WITH t AS (SELECT *, my_volatile_fn() x FROM numbers)
316SELECT * FROM numbers, local_table
317WHERE EXISTS (SELECT * FROM t WHERE t.x = numbers.a);
318NOTICE:  executing the command locally: WITH t AS (SELECT numbers_1.a, replicate_ref_to_coordinator.my_volatile_fn() AS x FROM replicate_ref_to_coordinator.numbers_8000001 numbers_1) SELECT numbers.a, local_table.a FROM replicate_ref_to_coordinator.numbers_8000001 numbers, replicate_ref_to_coordinator.local_table WHERE (EXISTS (SELECT t.a, t.x FROM t WHERE (t.x OPERATOR(pg_catalog.=) numbers.a)))
319 a | a
320---------------------------------------------------------------------
321 1 |  2
322 1 |  4
323 1 |  7
324 1 | 20
325(4 rows)
326
327-- shouldn't plan locally even if distributed table is in CTE or subquery
328CREATE TABLE dist(a int);
329SELECT create_distributed_table('dist', 'a');
330 create_distributed_table
331---------------------------------------------------------------------
332
333(1 row)
334
335INSERT INTO dist VALUES (20),(30);
336WITH t AS (SELECT *, my_volatile_fn() x FROM dist)
337SELECT * FROM numbers, local_table
338WHERE EXISTS (SELECT * FROM t WHERE t.x = numbers.a);
339ERROR:  function replicate_ref_to_coordinator.my_volatile_fn() does not exist
340HINT:  No function matches the given name and argument types. You might need to add explicit type casts.
341CONTEXT:  while executing command on localhost:xxxxx
342-- test CTE being reference/local join for distributed query
343WITH t as (SELECT n.a, my_volatile_fn() x FROM numbers n NATURAL JOIN local_table l)
344SELECT a FROM t NATURAL JOIN dist;
345NOTICE:  executing the command locally: SELECT n.a, replicate_ref_to_coordinator.my_volatile_fn() AS x FROM (replicate_ref_to_coordinator.numbers_8000001 n(a) JOIN replicate_ref_to_coordinator.local_table l USING (a))
346 a
347---------------------------------------------------------------------
348 20
349(1 row)
350
351 -- shouldn't error if FOR UPDATE/FOR SHARE
352SELECT local_table.a, numbers.a FROM local_table NATURAL JOIN numbers FOR SHARE;
353NOTICE:  executing the command locally: SELECT local_table.a, numbers.a FROM (replicate_ref_to_coordinator.local_table JOIN replicate_ref_to_coordinator.numbers_8000001 numbers(a) USING (a)) FOR SHARE OF local_table FOR SHARE OF numbers
354 a  | a
355---------------------------------------------------------------------
356  2 |  2
357 20 | 20
358(2 rows)
359
360SELECT local_table.a, numbers.a FROM local_table NATURAL JOIN numbers FOR UPDATE;
361NOTICE:  executing the command locally: SELECT local_table.a, numbers.a FROM (replicate_ref_to_coordinator.local_table JOIN replicate_ref_to_coordinator.numbers_8000001 numbers(a) USING (a)) FOR UPDATE OF local_table FOR UPDATE OF numbers
362 a  | a
363---------------------------------------------------------------------
364  2 |  2
365 20 | 20
366(2 rows)
367
368--
369-- Joins between reference tables and views shouldn't be planned locally.
370--
371CREATE VIEW numbers_v AS SELECT * FROM numbers WHERE a=1;
372SELECT public.coordinator_plan($Q$
373EXPLAIN (COSTS FALSE)
374	SELECT * FROM squares JOIN numbers_v ON squares.a = numbers_v.a;
375$Q$);
376       coordinator_plan
377---------------------------------------------------------------------
378 Custom Scan (Citus Adaptive)
379   Task Count: 1
380(2 rows)
381
382CREATE VIEW local_table_v AS SELECT * FROM local_table WHERE a BETWEEN 1 AND 10;
383SELECT public.coordinator_plan($Q$
384EXPLAIN (COSTS FALSE)
385	SELECT * FROM squares JOIN local_table_v ON squares.a = local_table_v.a;
386$Q$);
387       coordinator_plan
388---------------------------------------------------------------------
389 Custom Scan (Citus Adaptive)
390   Task Count: 1
391(2 rows)
392
393DROP VIEW numbers_v, local_table_v;
394--
395-- Joins between reference tables and materialized views are allowed to
396-- be planned to be executed locally.
397--
398CREATE MATERIALIZED VIEW numbers_v AS SELECT * FROM numbers WHERE a BETWEEN 1 AND 10;
399NOTICE:  executing the command locally: SELECT a FROM replicate_ref_to_coordinator.numbers_8000001 numbers WHERE ((a OPERATOR(pg_catalog.>=) 1) AND (a OPERATOR(pg_catalog.<=) 10))
400REFRESH MATERIALIZED VIEW numbers_v;
401NOTICE:  executing the command locally: SELECT numbers.a FROM replicate_ref_to_coordinator.numbers_8000001 numbers WHERE ((numbers.a OPERATOR(pg_catalog.>=) 1) AND (numbers.a OPERATOR(pg_catalog.<=) 10))
402SELECT * FROM squares JOIN numbers_v ON squares.a = numbers_v.a ORDER BY 1;
403NOTICE:  executing the command locally: SELECT squares.a, squares.b, numbers_v.a FROM (replicate_ref_to_coordinator.squares_8000000 squares JOIN replicate_ref_to_coordinator.numbers_v ON ((squares.a OPERATOR(pg_catalog.=) numbers_v.a))) ORDER BY squares.a
404 a | b | a
405---------------------------------------------------------------------
406 1 | 1 | 1
407 2 | 4 | 2
408(2 rows)
409
410--
411-- Joins between reference tables, local tables, and function calls
412-- are allowed
413--
414SELECT count(*)
415FROM local_table a, numbers b, generate_series(1, 10) c
416WHERE a.a = b.a AND a.a = c;
417NOTICE:  executing the command locally: SELECT count(*) AS count FROM replicate_ref_to_coordinator.local_table a, replicate_ref_to_coordinator.numbers_8000001 b, generate_series(1, 10) c(c) WHERE ((a.a OPERATOR(pg_catalog.=) b.a) AND (a.a OPERATOR(pg_catalog.=) c.c))
418 count
419---------------------------------------------------------------------
420     1
421(1 row)
422
423-- and it should be okay if the function call is not a data source
424SELECT abs(a.a) FROM local_table a, numbers b WHERE a.a = b.a;
425NOTICE:  executing the command locally: SELECT abs(a.a) AS abs FROM replicate_ref_to_coordinator.local_table a, replicate_ref_to_coordinator.numbers_8000001 b WHERE (a.a OPERATOR(pg_catalog.=) b.a)
426 abs
427---------------------------------------------------------------------
428   2
429  20
430(2 rows)
431
432SELECT a.a FROM local_table a, numbers b WHERE a.a = b.a ORDER BY abs(a.a);
433NOTICE:  executing the command locally: SELECT a.a FROM replicate_ref_to_coordinator.local_table a, replicate_ref_to_coordinator.numbers_8000001 b WHERE (a.a OPERATOR(pg_catalog.=) b.a) ORDER BY (abs(a.a))
434 a
435---------------------------------------------------------------------
436  2
437 20
438(2 rows)
439
440TRUNCATE local_table;
441TRUNCATE numbers;
442NOTICE:  executing the command locally: TRUNCATE TABLE replicate_ref_to_coordinator.numbers_xxxxx CASCADE
443BEGIN;
444INSERT INTO local_table VALUES (1), (2), (3), (4);
445INSERT INTO numbers VALUES (1), (2), (3), (4);
446NOTICE:  executing the command locally: INSERT INTO replicate_ref_to_coordinator.numbers_8000001 AS citus_table_alias (a) VALUES (1), (2), (3), (4)
447ALTER TABLE numbers ADD COLUMN d int;
448NOTICE:  executing the command locally: SELECT worker_apply_shard_ddl_command (8000001, 'replicate_ref_to_coordinator', 'ALTER TABLE numbers ADD COLUMN d int;')
449SELECT * FROM local_table JOIN numbers USING(a) ORDER BY a;
450NOTICE:  executing the command locally: SELECT local_table.a, numbers.d FROM (replicate_ref_to_coordinator.local_table JOIN replicate_ref_to_coordinator.numbers_8000001 numbers(a, d) USING (a)) ORDER BY local_table.a
451 a | d
452---------------------------------------------------------------------
453 1 |
454 2 |
455 3 |
456 4 |
457(4 rows)
458
459ROLLBACK;
460BEGIN;
461INSERT INTO local_table VALUES (1), (2), (3);
462WITH t as (SELECT n.a, my_volatile_fn() x FROM numbers n NATURAL JOIN local_table l ORDER BY n.a, x)
463SELECT a FROM t NATURAL JOIN dist ORDER BY a;
464NOTICE:  executing the command locally: SELECT n.a, replicate_ref_to_coordinator.my_volatile_fn() AS x FROM (replicate_ref_to_coordinator.numbers_8000001 n(a) JOIN replicate_ref_to_coordinator.local_table l USING (a)) ORDER BY n.a, (replicate_ref_to_coordinator.my_volatile_fn())
465 a
466---------------------------------------------------------------------
467(0 rows)
468
469ROLLBACK;
470BEGIN;
471INSERT INTO local_table VALUES (1), (2), (3);
472INSERT INTO numbers SELECT * FROM generate_series(1, 100);
473NOTICE:  executing the copy locally for shard xxxxx
474INSERT INTO numbers SELECT * FROM numbers;
475NOTICE:  executing the command locally: INSERT INTO replicate_ref_to_coordinator.numbers_8000001 AS citus_table_alias (a) SELECT a FROM replicate_ref_to_coordinator.numbers_8000001 numbers
476SELECT COUNT(*) FROM local_table JOIN numbers using (a);
477NOTICE:  executing the command locally: SELECT count(*) AS count FROM (replicate_ref_to_coordinator.local_table JOIN replicate_ref_to_coordinator.numbers_8000001 numbers(a) USING (a))
478 count
479---------------------------------------------------------------------
480     6
481(1 row)
482
483UPDATE numbers SET a = a + 1;
484NOTICE:  executing the command locally: UPDATE replicate_ref_to_coordinator.numbers_8000001 numbers SET a = (a OPERATOR(pg_catalog.+) 1)
485SELECT COUNT(*) FROM local_table JOIN numbers using (a);
486NOTICE:  executing the command locally: SELECT count(*) AS count FROM (replicate_ref_to_coordinator.local_table JOIN replicate_ref_to_coordinator.numbers_8000001 numbers(a) USING (a))
487 count
488---------------------------------------------------------------------
489     4
490(1 row)
491
492ROLLBACK;
493-- verify that we can drop columns from reference tables replicated to the coordinator
494-- see https://github.com/citusdata/citus/issues/3279
495ALTER TABLE squares DROP COLUMN b;
496NOTICE:  executing the command locally: SELECT worker_apply_shard_ddl_command (8000000, 'replicate_ref_to_coordinator', 'ALTER TABLE squares DROP COLUMN b;')
497-- verify that we replicate the reference tables that are distributed before
498-- adding the coordinator as a worker.
499SELECT master_remove_node('localhost', :master_port);
500 master_remove_node
501---------------------------------------------------------------------
502
503(1 row)
504
505-- add the coordinator as a worker node and verify that the reference tables are replicated
506SELECT master_add_node('localhost', :master_port, groupid => 0) AS master_nodeid \gset
507NOTICE:  Replicating reference table "squares" to the node localhost:xxxxx
508NOTICE:  Replicating reference table "numbers" to the node localhost:xxxxx
509-- clean-up
510SET client_min_messages TO ERROR;
511DROP SCHEMA replicate_ref_to_coordinator CASCADE;
512-- Make sure the shard was dropped
513SELECT 'numbers_8000001'::regclass::oid;
514ERROR:  relation "numbers_8000001" does not exist
515SET search_path TO DEFAULT;
516RESET client_min_messages;
517