-- Test creation of mx tables and metadata syncing

SELECT nextval('pg_catalog.pg_dist_placement_placementid_seq') AS last_placement_id
\gset
SELECT nextval('pg_catalog.pg_dist_groupid_seq') AS last_group_id \gset
SELECT nextval('pg_catalog.pg_dist_node_nodeid_seq') AS last_node_id \gset
SELECT nextval('pg_catalog.pg_dist_colocationid_seq') AS last_colocation_id \gset
SELECT nextval('pg_catalog.pg_dist_shardid_seq') AS last_shard_id \gset


SET citus.shard_count TO 8;
SET citus.shard_replication_factor TO 1;
SET citus.replicate_reference_tables_on_activate TO off;

\set VERBOSITY terse

-- Simulates a readonly node by setting default_transaction_read_only.
CREATE FUNCTION mark_node_readonly(hostname TEXT, port INTEGER, isreadonly BOOLEAN)
    RETURNS TEXT
    LANGUAGE sql
    AS $$
        SELECT master_run_on_worker(ARRAY[hostname], ARRAY[port],
               ARRAY['ALTER SYSTEM SET default_transaction_read_only TO ' || isreadonly::TEXT], false);
        SELECT result FROM
            master_run_on_worker(ARRAY[hostname], ARRAY[port],
                                 ARRAY['SELECT pg_reload_conf()'], false);
    $$;

CREATE OR REPLACE FUNCTION trigger_metadata_sync()
    RETURNS void
    LANGUAGE C STRICT
    AS 'citus';

CREATE OR REPLACE FUNCTION raise_error_in_metadata_sync()
    RETURNS void
    LANGUAGE C STRICT
    AS 'citus';

CREATE PROCEDURE wait_until_process_count(appname text, target_count int) AS $$
declare
    counter integer := -1;
begin
    while counter != target_count loop
        -- pg_stat_activity is cached at xact level and there is no easy way to clear it.
        -- Look it up in a new connection to get latest updates.
        SELECT result::int INTO counter FROM
            master_run_on_worker(ARRAY['localhost'], ARRAY[57636], ARRAY[
                'SELECT count(*) FROM pg_stat_activity WHERE application_name = ' || quote_literal(appname) || ';'], false);
        PERFORM pg_sleep(0.1);
    end loop;
end$$ LANGUAGE plpgsql;

-- add a node to the cluster
SELECT master_add_node('localhost', :worker_1_port) AS nodeid_1 \gset
SELECT nodeid, nodename, nodeport, hasmetadata, metadatasynced FROM pg_dist_node;

-- create a couple of tables
CREATE TABLE ref_table(a int primary key);
SELECT create_reference_table('ref_table');

CREATE TABLE dist_table_1(a int primary key, b int references ref_table(a));
SELECT create_distributed_table('dist_table_1', 'a');

CREATE SEQUENCE sequence;
CREATE TABLE reference_table (a int default nextval('sequence'));
SELECT create_reference_table('reference_table');

-- update the node
SELECT 1 FROM master_update_node((SELECT nodeid FROM pg_dist_node),
                                 'localhost', :worker_2_port);
SELECT nodeid, nodename, nodeport, hasmetadata, metadatasynced FROM pg_dist_node;

-- start syncing metadata to the node
SELECT 1 FROM start_metadata_sync_to_node('localhost', :worker_2_port);
SELECT nodeid, nodename, nodeport, hasmetadata, metadatasynced FROM pg_dist_node;
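-- (Editor's sketch, not part of the original test flow:) a minimal way to
-- peek at what the worker just received is to run a query on it through
-- master_run_on_worker, the same helper the functions above use; the
-- verify_metadata() calls later in the test remain the authoritative check.
SELECT result AS worker_pg_dist_node_count FROM
    master_run_on_worker(ARRAY['localhost'], ARRAY[:worker_2_port],
                         ARRAY['SELECT count(*) FROM pg_dist_node'], false);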
--------------------------------------------------------------------------
-- Test that maintenance daemon syncs after master_update_node
--------------------------------------------------------------------------

-- Update the node again. We do this as repeatable read, so we just see the
-- changes by master_update_node(). This is to avoid inconsistent results
-- if the maintenance daemon does the metadata sync too fast.
BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ;
SELECT nodeid, nodename, nodeport, hasmetadata, metadatasynced FROM pg_dist_node;
SELECT 1 FROM master_update_node(:nodeid_1, 'localhost', :worker_1_port);
SELECT nodeid, nodename, nodeport, hasmetadata, metadatasynced FROM pg_dist_node;
END;

-- wait until the maintenance daemon does the next metadata sync, and then
-- check if metadata is synced again
SELECT wait_until_metadata_sync(30000);
SELECT nodeid, hasmetadata, metadatasynced FROM pg_dist_node;

SELECT verify_metadata('localhost', :worker_1_port);

-- Update the node to a non-existent node. This is to simulate updating to
-- an unwriteable node.
BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ;
SELECT nodeid, nodename, nodeport, hasmetadata, metadatasynced FROM pg_dist_node;
SELECT 1 FROM master_update_node(:nodeid_1, 'localhost', 12345);
SELECT nodeid, nodename, nodeport, hasmetadata, metadatasynced FROM pg_dist_node;
END;

-- The maintenance daemon's metadata sync should fail, because the node is still unwriteable.
SELECT wait_until_metadata_sync(30000);
SELECT nodeid, hasmetadata, metadatasynced FROM pg_dist_node;

-- verify that the metadata sync daemon has started
SELECT count(*) FROM pg_stat_activity WHERE application_name = 'Citus Metadata Sync Daemon';

--
-- terminate the maintenance daemon, and verify that we don't spawn multiple
-- metadata sync daemons
--
SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE application_name = 'Citus Maintenance Daemon';
CALL wait_until_process_count('Citus Maintenance Daemon', 1);
select trigger_metadata_sync();
select wait_until_metadata_sync();
SELECT count(*) FROM pg_stat_activity WHERE application_name = 'Citus Metadata Sync Daemon';

--
-- cancel the metadata sync daemon, and verify that it exits and restarts.
--
select pid as pid_before_cancel from pg_stat_activity where application_name like 'Citus Met%' \gset
select pg_cancel_backend(pid) from pg_stat_activity where application_name = 'Citus Metadata Sync Daemon';
select wait_until_metadata_sync();
select pid as pid_after_cancel from pg_stat_activity where application_name like 'Citus Met%' \gset
select :pid_before_cancel != :pid_after_cancel AS metadata_sync_restarted;

--
-- cancel the metadata sync daemon so it exits and restarts, but at the
-- same time tell maintenanced to trigger a new metadata sync. One
-- of these should exit to avoid multiple metadata syncs.
--
select pg_cancel_backend(pid) from pg_stat_activity where application_name = 'Citus Metadata Sync Daemon';
select trigger_metadata_sync();
select wait_until_metadata_sync();
-- We assume citus.metadata_sync_retry_interval is 500ms; if that changes,
-- adjust the sleep below to the new value's ceiling + 0.2 seconds.
select pg_sleep(1.2);
SELECT count(*) FROM pg_stat_activity WHERE application_name = 'Citus Metadata Sync Daemon';
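-- (Editor's sketch, not in the original test:) the 500ms figure above is an
-- assumption about the configured default; the current value can be checked
-- directly before choosing the sleep duration:
SHOW citus.metadata_sync_retry_interval;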
--
-- error in metadata sync daemon, and verify it exits and restarts.
--
select pid as pid_before_error from pg_stat_activity where application_name like 'Citus Met%' \gset
select raise_error_in_metadata_sync();
select wait_until_metadata_sync(30000);
select pid as pid_after_error from pg_stat_activity where application_name like 'Citus Met%' \gset
select :pid_before_error != :pid_after_error AS metadata_sync_restarted;


SELECT trigger_metadata_sync();
SELECT wait_until_metadata_sync(30000);
SELECT count(*) FROM pg_stat_activity WHERE application_name = 'Citus Metadata Sync Daemon';

-- update it back to :worker_1_port, now metadata should be synced
SELECT 1 FROM master_update_node(:nodeid_1, 'localhost', :worker_1_port);
SELECT wait_until_metadata_sync(30000);
SELECT nodeid, hasmetadata, metadatasynced FROM pg_dist_node;

--------------------------------------------------------------------------
-- Test updating a node when another node is in readonly-mode
--------------------------------------------------------------------------

-- first, add node and sync metadata in the same transaction
CREATE TYPE some_type AS (a int, b int);
CREATE TABLE some_ref_table (a int, b some_type);
SELECT create_reference_table('some_ref_table');
INSERT INTO some_ref_table (a) SELECT i FROM generate_series(0,10)i;

BEGIN;
  SELECT master_add_node('localhost', :worker_2_port) AS nodeid_2 \gset
  SELECT 1 FROM start_metadata_sync_to_node('localhost', :worker_2_port);

  -- and modifications can be read from any worker in the same transaction
  INSERT INTO some_ref_table (a) SELECT i FROM generate_series(0,10)i;
  SET LOCAL citus.task_assignment_policy TO "round-robin";
  SELECT count(*) FROM some_ref_table;
  SELECT count(*) FROM some_ref_table;
COMMIT;

DROP TABLE some_ref_table;
DROP TYPE some_type;

-- Create a table with shards on both nodes
CREATE TABLE dist_table_2(a int);
SELECT create_distributed_table('dist_table_2', 'a');
INSERT INTO dist_table_2 SELECT i FROM generate_series(1, 100) i;

SELECT mark_node_readonly('localhost', :worker_2_port, TRUE);

-- Now updating the other node will mark worker 2 as not synced.
BEGIN;
SELECT 1 FROM master_update_node(:nodeid_1, 'localhost', 12345);
SELECT nodeid, hasmetadata, metadatasynced FROM pg_dist_node ORDER BY nodeid;
COMMIT;

-- worker_2 is out of sync, so further updates aren't sent to it and
-- we shouldn't see the warnings.
SELECT 1 FROM master_update_node(:nodeid_1, 'localhost', 23456);
SELECT nodeid, hasmetadata, metadatasynced FROM pg_dist_node ORDER BY nodeid;

-- Make the node writeable.
SELECT mark_node_readonly('localhost', :worker_2_port, FALSE);
SELECT wait_until_metadata_sync(30000);

-- Mark the node readonly again, so the following master_update_node warns
SELECT mark_node_readonly('localhost', :worker_2_port, TRUE);

-- Revert the nodeport of worker 1.
BEGIN;
SELECT 1 FROM master_update_node(:nodeid_1, 'localhost', :worker_1_port);
SELECT count(*) FROM dist_table_2;
END;

SELECT wait_until_metadata_sync(30000);
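-- (Editor's sketch, not in the original test:) mark_node_readonly() defined
-- above only flips default_transaction_read_only on the worker and reloads its
-- config, so its effect can be inspected the same way it was set, e.g.:
SELECT result AS worker_2_read_only FROM
    master_run_on_worker(ARRAY['localhost'], ARRAY[:worker_2_port],
                         ARRAY['SELECT current_setting(''default_transaction_read_only'')'], false);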
-- Make the node writeable.
SELECT mark_node_readonly('localhost', :worker_2_port, FALSE);
SELECT wait_until_metadata_sync(30000);

SELECT 1 FROM master_update_node(:nodeid_1, 'localhost', :worker_1_port);
SELECT verify_metadata('localhost', :worker_1_port),
       verify_metadata('localhost', :worker_2_port);

--------------------------------------------------------------------------
-- Test that master_update_node rolls back properly
--------------------------------------------------------------------------
BEGIN;
SELECT 1 FROM master_update_node(:nodeid_1, 'localhost', 12345);
ROLLBACK;

SELECT verify_metadata('localhost', :worker_1_port),
       verify_metadata('localhost', :worker_2_port);

--------------------------------------------------------------------------
-- Test that master_update_node invalidates the plan cache
--------------------------------------------------------------------------

PREPARE foo AS SELECT COUNT(*) FROM dist_table_1 WHERE a = 1;

SET citus.log_remote_commands = ON;
-- trigger caching for prepared statements
EXECUTE foo;
EXECUTE foo;
EXECUTE foo;
EXECUTE foo;
EXECUTE foo;
EXECUTE foo;
EXECUTE foo;

SELECT master_update_node(:nodeid_1, '127.0.0.1', :worker_1_port);
SELECT wait_until_metadata_sync(30000);

-- make sure the nodename changed.
EXECUTE foo;

SET citus.log_remote_commands TO OFF;

--------------------------------------------------------------------------
-- Test that master_update_node can appear in a prepared transaction.
--------------------------------------------------------------------------
BEGIN;
SELECT 1 FROM master_update_node(:nodeid_1, 'localhost', 12345);
PREPARE TRANSACTION 'tx01';
COMMIT PREPARED 'tx01';

SELECT wait_until_metadata_sync(30000);
SELECT nodeid, hasmetadata, metadatasynced FROM pg_dist_node ORDER BY nodeid;

BEGIN;
SELECT 1 FROM master_update_node(:nodeid_1, 'localhost', :worker_1_port);
PREPARE TRANSACTION 'tx01';
COMMIT PREPARED 'tx01';

SELECT wait_until_metadata_sync(30000);
SELECT nodeid, hasmetadata, metadatasynced FROM pg_dist_node ORDER BY nodeid;

SELECT verify_metadata('localhost', :worker_1_port),
       verify_metadata('localhost', :worker_2_port);

--------------------------------------------------------------------------
-- Test that changes in isactive are propagated to the metadata nodes
--------------------------------------------------------------------------
-- Don't drop the reference table, so it still has shards on the nodes being disabled
DROP TABLE dist_table_1, dist_table_2;

SELECT 1 FROM master_disable_node('localhost', :worker_2_port);
SELECT verify_metadata('localhost', :worker_1_port);

SELECT 1 FROM master_activate_node('localhost', :worker_2_port);
SELECT verify_metadata('localhost', :worker_1_port);

------------------------------------------------------------------------------------
-- Test master_disable_node() when the node that is being disabled is actually down
------------------------------------------------------------------------------------
SELECT master_update_node(:nodeid_2, 'localhost', 1);
SELECT wait_until_metadata_sync(30000);

-- set metadatasynced so we try propagating metadata changes
UPDATE pg_dist_node SET metadatasynced = TRUE WHERE nodeid IN (:nodeid_1, :nodeid_2);

-- should error out
SELECT 1 FROM master_disable_node('localhost', 1);
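-- (Editor's sketch, not in the original test:) since the disable above is
-- expected to error out, the node rows should be left unchanged; a quick way
-- to see the current state from the coordinator:
SELECT nodeid, isactive, hasmetadata, metadatasynced FROM pg_dist_node ORDER BY nodeid;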
-- try again after stopping metadata sync
SELECT stop_metadata_sync_to_node('localhost', 1);
SELECT 1 FROM master_disable_node('localhost', 1);

SELECT verify_metadata('localhost', :worker_1_port);

SELECT master_update_node(:nodeid_2, 'localhost', :worker_2_port);
SELECT wait_until_metadata_sync(30000);

SELECT 1 FROM master_activate_node('localhost', :worker_2_port);
SELECT verify_metadata('localhost', :worker_1_port);


------------------------------------------------------------------------------------
-- Test master_disable_node() when the other node is down
------------------------------------------------------------------------------------
-- node 1 is down.
SELECT master_update_node(:nodeid_1, 'localhost', 1);
SELECT wait_until_metadata_sync(30000);

-- set metadatasynced so we try propagating metadata changes
UPDATE pg_dist_node SET metadatasynced = TRUE WHERE nodeid IN (:nodeid_1, :nodeid_2);

-- should error out
SELECT 1 FROM master_disable_node('localhost', :worker_2_port);

-- try again after stopping metadata sync
SELECT stop_metadata_sync_to_node('localhost', 1);
SELECT 1 FROM master_disable_node('localhost', :worker_2_port);

-- bring up node 1
SELECT master_update_node(:nodeid_1, 'localhost', :worker_1_port);
SELECT wait_until_metadata_sync(30000);

SELECT 1 FROM master_activate_node('localhost', :worker_2_port);

SELECT verify_metadata('localhost', :worker_1_port);

-- verify that the metadata sync daemon exits
call wait_until_process_count('Citus Metadata Sync Daemon', 0);

-- verify that DROP DATABASE terminates metadata sync
SELECT current_database() datname \gset
CREATE DATABASE db_to_drop;
SELECT run_command_on_workers('CREATE DATABASE db_to_drop');

\c db_to_drop - - :worker_1_port
CREATE EXTENSION citus;

\c db_to_drop - - :master_port
CREATE EXTENSION citus;
SELECT master_add_node('localhost', :worker_1_port);
UPDATE pg_dist_node SET hasmetadata = true;

SELECT master_update_node(nodeid, 'localhost', 12345) FROM pg_dist_node;

CREATE OR REPLACE FUNCTION trigger_metadata_sync()
    RETURNS void
    LANGUAGE C STRICT
    AS 'citus';

SELECT trigger_metadata_sync();

\c :datname - - :master_port

SELECT datname FROM pg_stat_activity WHERE application_name LIKE 'Citus Met%';

DROP DATABASE db_to_drop;

SELECT datname FROM pg_stat_activity WHERE application_name LIKE 'Citus Met%';

-- cleanup
DROP SEQUENCE sequence CASCADE;
DROP TABLE ref_table;
DROP TABLE reference_table;
TRUNCATE pg_dist_colocation;
SELECT count(*) FROM (SELECT master_remove_node(nodename, nodeport) FROM pg_dist_node) t;
ALTER SEQUENCE pg_catalog.pg_dist_groupid_seq RESTART :last_group_id;
ALTER SEQUENCE pg_catalog.pg_dist_node_nodeid_seq RESTART :last_node_id;
ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART :last_colocation_id;
ALTER SEQUENCE pg_catalog.pg_dist_placement_placementid_seq RESTART :last_placement_id;
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART :last_shard_id;

RESET citus.shard_count;
RESET citus.shard_replication_factor;