-- Test creation of mx tables and metadata syncing

SELECT nextval('pg_catalog.pg_dist_placement_placementid_seq') AS last_placement_id
\gset
SELECT nextval('pg_catalog.pg_dist_groupid_seq') AS last_group_id \gset
SELECT nextval('pg_catalog.pg_dist_node_nodeid_seq') AS last_node_id \gset
SELECT nextval('pg_catalog.pg_dist_colocationid_seq') AS last_colocation_id \gset
SELECT nextval('pg_catalog.pg_dist_shardid_seq') AS last_shard_id \gset


SET citus.shard_count TO 8;
SET citus.shard_replication_factor TO 1;
SET citus.replicate_reference_tables_on_activate TO off;

\set VERBOSITY terse

-- Simulates a readonly node by setting default_transaction_read_only.
CREATE FUNCTION mark_node_readonly(hostname TEXT, port INTEGER, isreadonly BOOLEAN)
    RETURNS TEXT
    LANGUAGE sql
    AS $$
    SELECT master_run_on_worker(ARRAY[hostname], ARRAY[port],
           ARRAY['ALTER SYSTEM SET default_transaction_read_only TO ' || isreadonly::TEXT], false);
    SELECT result FROM
        master_run_on_worker(ARRAY[hostname], ARRAY[port],
                             ARRAY['SELECT pg_reload_conf()'], false);
$$;
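-- A usage sketch (illustrative only; the real calls appear further down in this file):
--   SELECT mark_node_readonly('localhost', :worker_2_port, TRUE);   -- make the worker read-only
--   SELECT mark_node_readonly('localhost', :worker_2_port, FALSE);  -- make it writable again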

CREATE OR REPLACE FUNCTION trigger_metadata_sync()
    RETURNS void
    LANGUAGE C STRICT
    AS 'citus';

CREATE OR REPLACE FUNCTION raise_error_in_metadata_sync()
    RETURNS void
    LANGUAGE C STRICT
    AS 'citus';

CREATE PROCEDURE wait_until_process_count(appname text, target_count int) AS $$
declare
   counter integer := -1;
begin
 while counter != target_count loop
  -- pg_stat_activity is cached at xact level and there is no easy way to clear it.
  -- Look it up in a new connection to get latest updates.
  SELECT result::int into counter FROM
   master_run_on_worker(ARRAY['localhost'], ARRAY[57636], ARRAY[
      'SELECT count(*) FROM pg_stat_activity WHERE application_name = ' || quote_literal(appname) || ';'], false);
  PERFORM pg_sleep(0.1);
 end loop;
end$$ LANGUAGE plpgsql;
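-- A usage sketch (illustrative only; assumes the coordinator listens on port 57636,
-- as hard-coded above):
--   CALL wait_until_process_count('Citus Maintenance Daemon', 1);
-- blocks until exactly one backend with that application_name is reported.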

-- add a node to the cluster
SELECT master_add_node('localhost', :worker_1_port) AS nodeid_1 \gset
SELECT nodeid, nodename, nodeport, hasmetadata, metadatasynced FROM pg_dist_node;

-- create a couple of tables
CREATE TABLE ref_table(a int primary key);
SELECT create_reference_table('ref_table');

CREATE TABLE dist_table_1(a int primary key, b int references ref_table(a));
SELECT create_distributed_table('dist_table_1', 'a');

CREATE SEQUENCE sequence;
CREATE TABLE reference_table (a int default nextval('sequence'));
SELECT create_reference_table('reference_table');

-- update the node
SELECT 1 FROM master_update_node((SELECT nodeid FROM pg_dist_node),
                                 'localhost', :worker_2_port);
SELECT nodeid, nodename, nodeport, hasmetadata, metadatasynced FROM pg_dist_node;

-- start syncing metadata to the node
SELECT 1 FROM start_metadata_sync_to_node('localhost', :worker_2_port);
SELECT nodeid, nodename, nodeport, hasmetadata, metadatasynced FROM pg_dist_node;

--------------------------------------------------------------------------
-- Test that maintenance daemon syncs after master_update_node
--------------------------------------------------------------------------

-- Update the node again. We do this in a REPEATABLE READ transaction, so we only
-- see the changes made by master_update_node(). This avoids inconsistent results
-- if the maintenance daemon does the metadata sync too fast.
BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ;
SELECT nodeid, nodename, nodeport, hasmetadata, metadatasynced FROM pg_dist_node;
SELECT 1 FROM master_update_node(:nodeid_1, 'localhost', :worker_1_port);
SELECT nodeid, nodename, nodeport, hasmetadata, metadatasynced FROM pg_dist_node;
END;

-- wait until the maintenance daemon does the next metadata sync, and then
-- check that metadata is synced again
SELECT wait_until_metadata_sync(30000);
SELECT nodeid, hasmetadata, metadatasynced FROM pg_dist_node;

SELECT verify_metadata('localhost', :worker_1_port);

-- Update the node to a non-existent node. This simulates updating to
-- an unwritable node.
BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ;
SELECT nodeid, nodename, nodeport, hasmetadata, metadatasynced FROM pg_dist_node;
SELECT 1 FROM master_update_node(:nodeid_1, 'localhost', 12345);
SELECT nodeid, nodename, nodeport, hasmetadata, metadatasynced FROM pg_dist_node;
END;

-- The maintenance daemon's metadata sync should fail, because the node is still unwritable.
SELECT wait_until_metadata_sync(30000);
SELECT nodeid, hasmetadata, metadatasynced FROM pg_dist_node;

-- verify that metadata sync daemon has started
SELECT count(*) FROM pg_stat_activity WHERE application_name = 'Citus Metadata Sync Daemon';

--
-- terminate maintenance daemon, and verify that we don't spawn multiple
-- metadata sync daemons
--
SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE application_name = 'Citus Maintenance Daemon';
CALL wait_until_process_count('Citus Maintenance Daemon', 1);
select trigger_metadata_sync();
select wait_until_metadata_sync();
SELECT count(*) FROM pg_stat_activity WHERE application_name = 'Citus Metadata Sync Daemon';

--
-- cancel metadata sync daemon, and verify that it exits and restarts.
--
select pid as pid_before_cancel from pg_stat_activity where application_name like 'Citus Met%' \gset
select pg_cancel_backend(pid) from pg_stat_activity where application_name = 'Citus Metadata Sync Daemon';
select wait_until_metadata_sync();
select pid as pid_after_cancel from pg_stat_activity where application_name like 'Citus Met%' \gset
select :pid_before_cancel != :pid_after_cancel AS metadata_sync_restarted;

--
-- cancel metadata sync daemon so it exits and restarts, but at the
-- same time tell maintenanced to trigger a new metadata sync. One
-- of these should exit to avoid multiple metadata syncs.
--
select pg_cancel_backend(pid) from pg_stat_activity where application_name = 'Citus Metadata Sync Daemon';
select trigger_metadata_sync();
select wait_until_metadata_sync();
-- We assume citus.metadata_sync_retry_interval is 500ms. If that changes, adjust
-- the sleep below to the new interval's ceiling + 0.2s.
select pg_sleep(1.2);
SELECT count(*) FROM pg_stat_activity WHERE application_name = 'Citus Metadata Sync Daemon';
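-- Illustrative only: the configured retry interval can be inspected with
--   SHOW citus.metadata_sync_retry_interval;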

--
-- raise an error in the metadata sync daemon, and verify that it exits and restarts.
--
select pid as pid_before_error from pg_stat_activity where application_name like 'Citus Met%' \gset
select raise_error_in_metadata_sync();
select wait_until_metadata_sync(30000);
select pid as pid_after_error from pg_stat_activity where application_name like 'Citus Met%' \gset
select :pid_before_error != :pid_after_error AS metadata_sync_restarted;


SELECT trigger_metadata_sync();
SELECT wait_until_metadata_sync(30000);
SELECT count(*) FROM pg_stat_activity WHERE application_name = 'Citus Metadata Sync Daemon';

-- update it back to :worker_1_port; now metadata should be synced
SELECT 1 FROM master_update_node(:nodeid_1, 'localhost', :worker_1_port);
SELECT wait_until_metadata_sync(30000);
SELECT nodeid, hasmetadata, metadatasynced FROM pg_dist_node;

--------------------------------------------------------------------------
-- Test updating a node when another node is in read-only mode
--------------------------------------------------------------------------

-- first, add node and sync metadata in the same transaction
CREATE TYPE some_type AS (a int, b int);
CREATE TABLE some_ref_table (a int, b some_type);
SELECT create_reference_table('some_ref_table');
INSERT INTO some_ref_table (a) SELECT i FROM generate_series(0,10)i;

BEGIN;
	SELECT master_add_node('localhost', :worker_2_port) AS nodeid_2 \gset
	SELECT 1 FROM start_metadata_sync_to_node('localhost', :worker_2_port);

  -- ... and modifications can be read from any worker in the same transaction.
  -- With round-robin task assignment, the two counts below go to different workers.
  INSERT INTO some_ref_table (a) SELECT i FROM generate_series(0,10)i;
  SET LOCAL citus.task_assignment_policy TO "round-robin";
  SELECT count(*) FROM some_ref_table;
  SELECT count(*) FROM some_ref_table;
COMMIT;

DROP TABLE some_ref_table;
DROP TYPE some_type;

-- Create a table with shards on both nodes
CREATE TABLE dist_table_2(a int);
SELECT create_distributed_table('dist_table_2', 'a');
INSERT INTO dist_table_2 SELECT i FROM generate_series(1, 100) i;

SELECT mark_node_readonly('localhost', :worker_2_port, TRUE);

-- Now updating the other node will mark worker 2 as not synced.
BEGIN;
SELECT 1 FROM master_update_node(:nodeid_1, 'localhost', 12345);
SELECT nodeid, hasmetadata, metadatasynced FROM pg_dist_node ORDER BY nodeid;
COMMIT;

-- worker_2 is out of sync, so further updates aren't sent to it and
-- we shouldn't see any warnings.
SELECT 1 FROM master_update_node(:nodeid_1, 'localhost', 23456);
SELECT nodeid, hasmetadata, metadatasynced FROM pg_dist_node ORDER BY nodeid;

-- Make the node writable.
SELECT mark_node_readonly('localhost', :worker_2_port, FALSE);
SELECT wait_until_metadata_sync(30000);

-- Mark the node readonly again, so the following master_update_node warns
SELECT mark_node_readonly('localhost', :worker_2_port, TRUE);

-- Revert the nodeport of worker 1.
BEGIN;
SELECT 1 FROM master_update_node(:nodeid_1, 'localhost', :worker_1_port);
SELECT count(*) FROM dist_table_2;
END;

SELECT wait_until_metadata_sync(30000);

-- Make the node writable.
SELECT mark_node_readonly('localhost', :worker_2_port, FALSE);
SELECT wait_until_metadata_sync(30000);

SELECT 1 FROM master_update_node(:nodeid_1, 'localhost', :worker_1_port);
SELECT verify_metadata('localhost', :worker_1_port),
       verify_metadata('localhost', :worker_2_port);

--------------------------------------------------------------------------
-- Test that master_update_node rolls back properly
--------------------------------------------------------------------------
BEGIN;
SELECT 1 FROM master_update_node(:nodeid_1, 'localhost', 12345);
ROLLBACK;

SELECT verify_metadata('localhost', :worker_1_port),
       verify_metadata('localhost', :worker_2_port);

--------------------------------------------------------------------------
-- Test that master_update_node invalidates the plan cache
--------------------------------------------------------------------------

PREPARE foo AS SELECT COUNT(*) FROM dist_table_1 WHERE a = 1;

SET citus.log_remote_commands = ON;
-- trigger caching for prepared statements
EXECUTE foo;
EXECUTE foo;
EXECUTE foo;
EXECUTE foo;
EXECUTE foo;
EXECUTE foo;
EXECUTE foo;

SELECT master_update_node(:nodeid_1, '127.0.0.1', :worker_1_port);
SELECT wait_until_metadata_sync(30000);

-- make sure the cached plan is invalidated and the new nodename is used
EXECUTE foo;

SET citus.log_remote_commands TO OFF;

--------------------------------------------------------------------------
-- Test that master_update_node can appear in a prepared transaction.
--------------------------------------------------------------------------
BEGIN;
SELECT 1 FROM master_update_node(:nodeid_1, 'localhost', 12345);
PREPARE TRANSACTION 'tx01';
COMMIT PREPARED 'tx01';

SELECT wait_until_metadata_sync(30000);
SELECT nodeid, hasmetadata, metadatasynced FROM pg_dist_node ORDER BY nodeid;

BEGIN;
SELECT 1 FROM master_update_node(:nodeid_1, 'localhost', :worker_1_port);
PREPARE TRANSACTION 'tx01';
COMMIT PREPARED 'tx01';

SELECT wait_until_metadata_sync(30000);
SELECT nodeid, hasmetadata, metadatasynced FROM pg_dist_node ORDER BY nodeid;

SELECT verify_metadata('localhost', :worker_1_port),
       verify_metadata('localhost', :worker_2_port);

--------------------------------------------------------------------------
-- Test that changes to isactive are propagated to the metadata nodes
--------------------------------------------------------------------------
-- Don't drop the reference tables, so the nodes being disabled still have shards
DROP TABLE dist_table_1, dist_table_2;

SELECT 1 FROM master_disable_node('localhost', :worker_2_port);
SELECT verify_metadata('localhost', :worker_1_port);

SELECT 1 FROM master_activate_node('localhost', :worker_2_port);
SELECT verify_metadata('localhost', :worker_1_port);

------------------------------------------------------------------------------------
-- Test master_disable_node() when the node that is being disabled is actually down
------------------------------------------------------------------------------------
SELECT master_update_node(:nodeid_2, 'localhost', 1);
SELECT wait_until_metadata_sync(30000);

-- set metadatasynced so we try propagating metadata changes
UPDATE pg_dist_node SET metadatasynced = TRUE WHERE nodeid IN (:nodeid_1, :nodeid_2);

-- should error out
SELECT 1 FROM master_disable_node('localhost', 1);

-- try again after stopping metadata sync
SELECT stop_metadata_sync_to_node('localhost', 1);
SELECT 1 FROM master_disable_node('localhost', 1);

SELECT verify_metadata('localhost', :worker_1_port);

SELECT master_update_node(:nodeid_2, 'localhost', :worker_2_port);
SELECT wait_until_metadata_sync(30000);

SELECT 1 FROM master_activate_node('localhost', :worker_2_port);
SELECT verify_metadata('localhost', :worker_1_port);


------------------------------------------------------------------------------------
-- Test master_disable_node() when the other node is down
------------------------------------------------------------------------------------
-- node 1 is down.
SELECT master_update_node(:nodeid_1, 'localhost', 1);
SELECT wait_until_metadata_sync(30000);

-- set metadatasynced so we try propagating metadata changes
UPDATE pg_dist_node SET metadatasynced = TRUE WHERE nodeid IN (:nodeid_1, :nodeid_2);

-- should error out
SELECT 1 FROM master_disable_node('localhost', :worker_2_port);

-- try again after stopping metadata sync
SELECT stop_metadata_sync_to_node('localhost', 1);
SELECT 1 FROM master_disable_node('localhost', :worker_2_port);

-- bring up node 1
SELECT master_update_node(:nodeid_1, 'localhost', :worker_1_port);
SELECT wait_until_metadata_sync(30000);

SELECT 1 FROM master_activate_node('localhost', :worker_2_port);

SELECT verify_metadata('localhost', :worker_1_port);

-- verify that metadata sync daemon exits
call wait_until_process_count('Citus Metadata Sync Daemon', 0);

-- verify that DROP DATABASE terminates metadata sync
SELECT current_database() datname \gset
CREATE DATABASE db_to_drop;
SELECT run_command_on_workers('CREATE DATABASE db_to_drop');

\c db_to_drop - - :worker_1_port
CREATE EXTENSION citus;

\c db_to_drop - - :master_port
CREATE EXTENSION citus;
SELECT master_add_node('localhost', :worker_1_port);
UPDATE pg_dist_node SET hasmetadata = true;

SELECT master_update_node(nodeid, 'localhost', 12345) FROM pg_dist_node;

CREATE OR REPLACE FUNCTION trigger_metadata_sync()
    RETURNS void
    LANGUAGE C STRICT
    AS 'citus';

SELECT trigger_metadata_sync();

\c :datname - - :master_port

SELECT datname FROM pg_stat_activity WHERE application_name LIKE 'Citus Met%';

DROP DATABASE db_to_drop;

SELECT datname FROM pg_stat_activity WHERE application_name LIKE 'Citus Met%';

-- cleanup
DROP SEQUENCE sequence CASCADE;
DROP TABLE ref_table;
DROP TABLE reference_table;
TRUNCATE pg_dist_colocation;
SELECT count(*) FROM (SELECT master_remove_node(nodename, nodeport) FROM pg_dist_node) t;
ALTER SEQUENCE pg_catalog.pg_dist_groupid_seq RESTART :last_group_id;
ALTER SEQUENCE pg_catalog.pg_dist_node_nodeid_seq RESTART :last_node_id;
ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART :last_colocation_id;
ALTER SEQUENCE pg_catalog.pg_dist_placement_placementid_seq RESTART :last_placement_id;
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART :last_shard_id;

RESET citus.shard_count;
RESET citus.shard_replication_factor;