1-- File to create functions and helpers needed for subsequent tests
2
3-- create a helper function to create objects on each node
CREATE OR REPLACE FUNCTION run_command_on_master_and_workers(p_sql text)
RETURNS void
LANGUAGE plpgsql
AS $$
BEGIN
    -- Run the statement in the current session first ...
    EXECUTE p_sql;

    -- ... then pass the very same statement to run_command_on_workers().
    PERFORM run_command_on_workers(p_sql);
END;
$$;
10
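-- Create a function that runs the given query and raises 'Task failed to execute'
-- when the query fails with a 'failed to execute task...' error (other errors are ignored)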
CREATE OR REPLACE FUNCTION raise_failed_execution(query text)
RETURNS void
LANGUAGE plpgsql
AS $$
BEGIN
    -- Run the caller-supplied statement as-is.
    EXECUTE query;
EXCEPTION
    WHEN OTHERS THEN
        -- Only errors whose message starts with 'failed to execute task' are
        -- re-raised, under a fixed message; every other error is swallowed
        -- and the function returns normally.
        IF SQLERRM LIKE 'failed to execute task%' THEN
            RAISE EXCEPTION 'Task failed to execute';
        END IF;
END;
$$;
21
22-- Create a function to ignore worker plans in explain output
CREATE OR REPLACE FUNCTION coordinator_plan(explain_command text, OUT query_plan text)
RETURNS SETOF text
LANGUAGE plpgsql
AS $$
BEGIN
    -- Emit the rows produced by explain_command one by one, up to and
    -- including the first row that contains 'Task Count:'; later rows are dropped.
    FOR query_plan IN EXECUTE explain_command
    LOOP
        RETURN NEXT;
        EXIT WHEN query_plan LIKE '%Task Count:%';
    END LOOP;

    RETURN;
END;
$$;
35
-- helper function that returns true if output of given explain has "is not null" (case-insensitive)
CREATE OR REPLACE FUNCTION explain_has_is_not_null(explain_command text)
RETURNS boolean
LANGUAGE plpgsql
AS $$
DECLARE
    plan_line text;
BEGIN
    -- Scan the rows produced by explain_command and stop at the first one
    -- that contains "is not null" in any letter case.
    FOR plan_line IN EXECUTE explain_command
    LOOP
        IF plan_line ILIKE '%is not null%' THEN
            RETURN true;
        END IF;
    END LOOP;

    -- No row matched.
    RETURN false;
END;
$$;
50
-- helper function that returns true if output of given explain has a "Distributed Subplan" line (case-insensitive)
CREATE OR REPLACE FUNCTION explain_has_distributed_subplan(explain_command text)
RETURNS BOOLEAN AS $$
DECLARE
  query_plan text;
BEGIN
  -- Returns true as soon as one row produced by explain_command contains
  -- "Distributed Subplan " followed, later on the same row, by a literal
  -- underscore (case-insensitive); returns false when no row matches.
  FOR query_plan IN EXECUTE explain_command LOOP
    -- "_" is a single-character wildcard in LIKE/ILIKE, so it is escaped here
    -- to match a real underscore. "!" is used as the escape character so the
    -- pattern does not depend on how backslashes are treated in string literals.
    -- NOTE(review): assumes subplan lines look like "Distributed Subplan <id>_<id>" -- confirm.
    IF query_plan ILIKE '%Distributed Subplan %!_%' ESCAPE '!'
    THEN
        RETURN true;
    END IF;
  END LOOP;
  RETURN false;
END; $$ language plpgsql;
65
66--helper function to check there is a single task
CREATE OR REPLACE FUNCTION explain_has_single_task(explain_command text)
RETURNS BOOLEAN AS $$
DECLARE
  query_plan text;
BEGIN
  -- Returns true as soon as one row produced by explain_command reports a
  -- task count of exactly 1 (case-insensitive); returns false otherwise.
  FOR query_plan IN EXECUTE explain_command LOOP
    -- The "1" must be followed by a non-digit or the end of the row, so that
    -- "Task Count: 10", "Task Count: 16", ... are not mistaken for a single task.
    IF query_plan ~* 'Task Count: 1([^0-9]|$)'
    THEN
        RETURN true;
    END IF;
  END LOOP;
  RETURN false;
END; $$ language plpgsql;
80
81-- helper function to quickly run SQL on the whole cluster
CREATE OR REPLACE FUNCTION run_command_on_coordinator_and_workers(p_sql text)
RETURNS void
LANGUAGE plpgsql
AS $$
BEGIN
    -- Execute the statement in the current session ...
    EXECUTE p_sql;

    -- ... and then forward the identical statement via run_command_on_workers().
    PERFORM run_command_on_workers(p_sql);
END;
$$;
88
89-- 1. Marks the given procedure as colocated with the given table.
90-- 2. Marks the argument index with which we route the procedure.
CREATE OR REPLACE FUNCTION colocate_proc_with_table(procname text, tablerelid regclass, argument_index int)
RETURNS void
LANGUAGE plpgsql
AS $$
BEGIN
    -- For every citus.pg_dist_object row whose objid is the oid of a pg_proc
    -- entry named procname, store argument_index and copy the colocationid of
    -- the pg_dist_partition row belonging to tablerelid.
    UPDATE citus.pg_dist_object AS dist_obj
    SET
        distribution_argument_index = argument_index,
        colocationid = dist_part.colocationid
    FROM pg_proc AS proc
    CROSS JOIN pg_dist_partition AS dist_part
    WHERE proc.proname = procname
      AND proc.oid = dist_obj.objid
      AND dist_part.logicalrelid = tablerelid;
END;
$$;
99
100-- helper function to verify the function of a coordinator is the same on all workers
CREATE OR REPLACE FUNCTION verify_function_is_same_on_workers(funcname text)
    RETURNS bool
    LANGUAGE plpgsql
AS $func$
DECLARE
    coordinator_def text;
    worker_def text;
    worker_query text;
BEGIN
    -- Definition of the function as seen by the current session.
    coordinator_def := pg_get_functiondef(funcname::regprocedure);

    -- The same lookup, as a statement to be run through run_command_on_workers().
    worker_query := 'SELECT pg_get_functiondef(' || quote_literal(funcname) || '::regprocedure)';

    -- Compare each returned result with the local definition; report and stop
    -- at the first one that differs.
    FOR worker_def IN
        SELECT result FROM run_command_on_workers(worker_query)
    LOOP
        IF worker_def <> coordinator_def THEN
            RAISE INFO 'functions are different, coordinator:% worker:%', coordinator_def, worker_def;
            RETURN false;
        END IF;
    END LOOP;

    RETURN true;
END;
$func$;
120
121--
122-- Procedure for creating shards for range partitioned distributed table.
123--
CREATE OR REPLACE PROCEDURE create_range_partitioned_shards(rel regclass, minvalues text[], maxvalues text[])
LANGUAGE plpgsql
AS $$
DECLARE
    created_shardid bigint;
BEGIN
    -- One iteration per element of minvalues; array_length() yields NULL for
    -- an empty array, which coalesce turns into an empty 1..0 range.
    FOR pos IN 1..coalesce(array_length(minvalues, 1), 0)
    LOOP
        -- Create an empty shard for rel, then record its value range.
        created_shardid := master_create_empty_shard(rel::text);

        UPDATE pg_dist_shard
        SET shardminvalue = minvalues[pos],
            shardmaxvalue = maxvalues[pos]
        WHERE shardid = created_shardid;
    END LOOP;
END;
$$;
137