-- File to create functions and helpers needed for subsequent tests

-- Helper function to create objects on each node: executes p_sql locally
-- first and then on every worker through run_command_on_workers().
CREATE OR REPLACE FUNCTION run_command_on_master_and_workers(p_sql text)
RETURNS void LANGUAGE plpgsql AS $$
BEGIN
    EXECUTE p_sql;
    PERFORM run_command_on_workers(p_sql);
END;$$;

-- Runs the given query and normalizes task-failure errors so test output is
-- stable: when the error message starts with 'failed to execute task', a
-- fixed 'Task failed to execute' error is raised instead.
-- Any other error caught by WHEN OTHERS is deliberately swallowed and the
-- function returns normally.
CREATE OR REPLACE FUNCTION raise_failed_execution(query text) RETURNS void AS $$
BEGIN
    EXECUTE query;
EXCEPTION WHEN OTHERS THEN
    IF SQLERRM LIKE 'failed to execute task%' THEN
        RAISE 'Task failed to execute';
    END IF;
END;
$$ LANGUAGE plpgsql;

-- Ignores worker plans in explain output: returns the lines produced by
-- explain_command up to and including the first line that contains
-- 'Task Count:'. If no such line exists, every line is returned.
CREATE OR REPLACE FUNCTION coordinator_plan(explain_command text, out query_plan text)
RETURNS SETOF TEXT AS $$
BEGIN
    FOR query_plan IN EXECUTE explain_command LOOP
        -- emit the current line first so the 'Task Count:' line itself is kept
        RETURN NEXT;
        IF query_plan LIKE '%Task Count:%' THEN
            RETURN;
        END IF;
    END LOOP;
    RETURN;
END; $$ LANGUAGE plpgsql;

-- Helper function that returns true if the output of the given explain
-- command has "is not null" (case insensitive) on any line.
CREATE OR REPLACE FUNCTION explain_has_is_not_null(explain_command text)
RETURNS BOOLEAN AS $$
DECLARE
    query_plan text;
BEGIN
    FOR query_plan IN EXECUTE explain_command LOOP
        IF query_plan ILIKE '%is not null%' THEN
            RETURN true;
        END IF;
    END LOOP;
    RETURN false;
END; $$ LANGUAGE plpgsql;

-- Helper function that returns true if the output of the given explain
-- command has a "Distributed Subplan <x>_<y>" line (case insensitive).
CREATE OR REPLACE FUNCTION explain_has_distributed_subplan(explain_command text)
RETURNS BOOLEAN AS $$
DECLARE
    query_plan text;
BEGIN
    FOR query_plan IN EXECUTE explain_command LOOP
        -- '_' is a single-character wildcard in (I)LIKE; escape it so that a
        -- literal underscore is required after "Distributed Subplan ".
        IF query_plan ILIKE '%Distributed Subplan %!_%' ESCAPE '!' THEN
            RETURN true;
        END IF;
    END LOOP;
    RETURN false;
END; $$ LANGUAGE plpgsql;

-- Helper function to check there is a single task: returns true if any line
-- of the given explain output reports "Task Count: 1" (case insensitive).
CREATE OR REPLACE FUNCTION explain_has_single_task(explain_command text)
RETURNS BOOLEAN AS $$
DECLARE
    query_plan text;
BEGIN
    FOR query_plan IN EXECUTE explain_command LOOP
        -- require a non-digit (or end of line) after the 1 so that counts
        -- such as 10 or 100 are not mistaken for a single task
        IF query_plan ~* 'Task Count: 1([^0-9]|$)' THEN
            RETURN true;
        END IF;
    END LOOP;
    RETURN false;
END; $$ LANGUAGE plpgsql;

-- Helper function to quickly run SQL on the whole cluster: executes p_sql
-- locally first and then on every worker through run_command_on_workers().
CREATE OR REPLACE FUNCTION run_command_on_coordinator_and_workers(p_sql text)
RETURNS void LANGUAGE plpgsql AS $$
BEGIN
    EXECUTE p_sql;
    PERFORM run_command_on_workers(p_sql);
END;$$;

-- 1. Marks the given procedure as colocated with the given table.
-- 2. Marks the argument index with which we route the procedure.
--
-- Every citus.pg_dist_object row whose objid equals the oid of a pg_proc
-- entry named procname is updated; the colocation id is copied from the
-- pg_dist_partition row of tablerelid.
-- NOTE(review): the match is by procedure name only, so overloaded
-- procedures sharing procname are all updated -- confirm this is intended.
CREATE OR REPLACE FUNCTION colocate_proc_with_table(procname text, tablerelid regclass, argument_index int)
RETURNS void LANGUAGE plpgsql AS $$
BEGIN
    UPDATE citus.pg_dist_object AS dist_obj
    SET distribution_argument_index = argument_index,
        colocationid = part.colocationid
    FROM pg_proc AS proc
    CROSS JOIN pg_dist_partition AS part
    WHERE proc.proname = procname
      AND proc.oid = dist_obj.objid
      AND part.logicalrelid = tablerelid;
END;$$;

-- Helper function to verify the function of a coordinator is the same on all
-- workers. funcname must be castable to regprocedure, i.e. a function name
-- including its argument types. Returns false (after raising an INFO message
-- with both definitions) for the first worker whose result differs from the
-- coordinator's pg_get_functiondef() output, true otherwise.
CREATE OR REPLACE FUNCTION verify_function_is_same_on_workers(funcname text)
    RETURNS bool
    LANGUAGE plpgsql
AS $func$
DECLARE
    coordinator_sql text;
    worker_sql text;
BEGIN
    SELECT pg_get_functiondef(funcname::regprocedure) INTO coordinator_sql;
    FOR worker_sql IN
        SELECT result
        FROM run_command_on_workers('SELECT pg_get_functiondef(' || quote_literal(funcname) || '::regprocedure)')
    LOOP
        -- IS DISTINCT FROM instead of != so that a NULL on either side is
        -- reported as a difference rather than silently skipped
        IF worker_sql IS DISTINCT FROM coordinator_sql THEN
            RAISE INFO 'functions are different, coordinator:% worker:%', coordinator_sql, worker_sql;
            RETURN false;
        END IF;
    END LOOP;

    RETURN true;
END;
$func$;

--
-- Procedure for creating shards for range partitioned distributed table.
--
-- For each index i of minvalues, creates one empty shard for rel via
-- master_create_empty_shard() and stores minvalues[i] / maxvalues[i] as that
-- shard's shardminvalue / shardmaxvalue in pg_dist_shard.
-- The number of shards is driven by minvalues alone; an empty or NULL
-- minvalues array creates no shards.
--
CREATE OR REPLACE PROCEDURE create_range_partitioned_shards(rel regclass, minvalues text[], maxvalues text[])
AS $$
DECLARE
    -- array_length() is NULL for empty/NULL arrays; treat that as zero shards
    shard_count int := COALESCE(array_length(minvalues, 1), 0);
    created_shardid bigint;
BEGIN
    FOR shard_idx IN 1..shard_count
    LOOP
        created_shardid := master_create_empty_shard(rel::text);

        UPDATE pg_dist_shard
        SET shardminvalue = minvalues[shard_idx],
            shardmaxvalue = maxvalues[shard_idx]
        WHERE shardid = created_shardid;
    END LOOP;
END;
$$ LANGUAGE plpgsql;