1-- Confirm we can use local, and adaptive execution
2
3CREATE SCHEMA with_executors;
4SET search_path TO with_executors, public;
5SET citus.enable_repartition_joins TO on;
6
7CREATE TABLE with_executors.local_table (id int);
8INSERT INTO local_table VALUES (0), (1), (2), (3), (4), (5), (6), (7), (8), (9), (10);
9
10CREATE TABLE ref_table (id int);
11SELECT create_reference_table('ref_table');
12INSERT INTO ref_table VALUES (0), (1), (2), (3), (4), (5), (6), (7), (8), (9), (10);
13
14-- CTEs should be able to use local queries
15WITH cte AS (
16	WITH local_cte AS (
17		SELECT * FROM local_table
18	),
19	dist_cte AS (
20		SELECT user_id FROM events_table
21	)
22	SELECT * FROM local_cte join dist_cte on dist_cte.user_id=local_cte.id
23)
24SELECT count(*) FROM cte;
25
26
27WITH cte AS (
28	WITH local_cte AS (
29		SELECT * FROM local_table
30	),
31	dist_cte AS (
32		SELECT user_id FROM events_table
33	),
34	merger_cte AS (
35		SELECT id as user_id FROM local_cte UNION (SELECT * FROM dist_cte)
36	)
37	SELECT * FROM merger_cte WHERE user_id IN (1, 2, 3)
38)
39SELECT * FROM cte ORDER BY 1;
40
41
42WITH cte AS (
43	WITH local_cte AS (
44		SELECT * FROM local_table WHERE id < 5
45	),
46	local_cte_2 AS (
47		SELECT * FROM local_table WHERE id > 5
48	)
49	SELECT local_cte.id as id_1, local_cte_2.id as id_2 FROM local_cte,local_cte_2
50)
51SELECT
52	*
53FROM
54	cte
55join
56	users_table
57on
58	cte.id_1 = users_table.user_id
59WHERE
60	cte.id_1 IN (3, 4, 5)
61ORDER BY
62	1,2,3,4,5,6,7
63LIMIT
64	10;
65
66
67-- CTEs should be able to use router queries
68WITH cte AS (
69	WITH router_cte AS (
70		SELECT user_id, value_2 FROM users_table WHERE user_id = 1
71	),
72	router_cte_2 AS (
73		SELECT user_id, event_type, value_2 FROM events_table WHERE user_id = 1
74	)
75	SELECT
76		router_cte.user_id as uid, event_type
77	FROM
78		router_cte, router_cte_2
79)
80SELECT * FROM cte ORDER BY 2 LIMIT 5;
81
82
83-- CTEs should be able to use real-time queries
84WITH real_time_cte AS (
85	SELECT * FROM users_table WHERE value_2 IN (1, 2, 3)
86)
87SELECT * FROM real_time_cte ORDER BY 1, 2, 3, 4, 5, 6 LIMIT 10;
88
89
90-- router & real-time together
91WITH cte AS (
92	WITH router_cte AS (
93		SELECT user_id, value_2 FROM users_table WHERE user_id = 1
94	),
95	real_time AS (
96		SELECT user_id, event_type, value_2 FROM events_table
97	)
98	SELECT
99		router_cte.user_id as uid, event_type
100	FROM
101		router_cte, real_time
102	WHERE
103		router_cte.user_id=real_time.user_id
104)
105SELECT * FROM cte WHERE uid=1 ORDER BY 2 LIMIT 5;
106
107
108-- CTEs should be able to use adaptive executor
109WITH cte AS (
110	WITH task_tracker_1 AS (
111		SELECT
112			users_table.user_id as uid_1, users_table.value_2
113		FROM
114			users_table
115		JOIN
116			events_table
117		ON
118			users_table.value_2=events_table.value_2
119	),
120	task_tracker_2 AS (
121		SELECT
122			users_table.user_id as uid_2, users_table.value_3
123		FROM
124			users_table
125		JOIN
126			events_table
127		ON
128			users_table.value_3=events_table.value_3
129	)
130	SELECT
131		uid_1, uid_2, value_2, value_3
132	FROM
133		task_tracker_1
134	JOIN
135		task_tracker_2
136	ON
137		value_2 = value_3
138)
139SELECT
140	uid_1, uid_2, cte.value_2, cte.value_3
141FROM
142	cte
143JOIN
144	events_table
145ON
146	cte.value_2 = events_table.event_type
147ORDER BY
148	1, 2, 3, 4
149LIMIT 10;
150
151-- All combined
152WITH cte AS (
153	WITH task_tracker AS (
154		SELECT
155			users_table.user_id as uid_1, users_table.value_2 as val_2
156		FROM
157			users_table
158		JOIN
159			events_table
160		ON
161			users_table.value_2=events_table.value_2
162	),
163	real_time AS (
164		SELECT * FROM users_table
165	),
166	router_exec AS (
167		SELECT * FROM events_table WHERE user_id = 1
168	),
169	local_table AS (
170		SELECT * FROM local_table
171	),
172	join_first_two AS (
173		SELECT uid_1, time, value_3 FROM task_tracker JOIN real_time ON val_2=value_3
174	),
175	join_last_two AS (
176		SELECT
177			router_exec.user_id, local_table.id
178		FROM
179			router_exec
180		JOIN
181			local_table
182		ON
183			router_exec.user_id=local_table.id
184	)
185	SELECT * FROM join_first_two JOIN join_last_two ON id = value_3 ORDER BY 1,2,3,4,5 LIMIT 10
186)
187SELECT DISTINCT uid_1, time, value_3 FROM cte ORDER BY 1, 2, 3 LIMIT 20;
188
189-- All combined with outer join
190WITH cte AS (
191	WITH task_tracker AS (
192		SELECT
193			users_table.user_id as uid_1, users_table.value_2 as val_2
194		FROM
195			users_table
196		JOIN
197			events_table
198		ON
199			users_table.value_2=events_table.value_2
200	),
201	real_time AS (
202		SELECT * FROM users_table
203	),
204	router_exec AS (
205		SELECT * FROM events_table WHERE user_id = 1
206	),
207	local_table AS (
208		SELECT * FROM local_table
209	),
210	join_first_two AS (
211		SELECT uid_1, time, value_3 FROM task_tracker JOIN real_time ON val_2=value_3
212	),
213	join_last_two AS (
214		SELECT
215			router_exec.user_id, local_table.id
216		FROM
217			router_exec
218		JOIN
219			local_table
220		ON
221			router_exec.user_id=local_table.id
222	)
223	SELECT uid_1, value_3 as val_3 FROM join_first_two JOIN join_last_two ON id = value_3 ORDER BY 1,2 LIMIT 10
224)
225SELECT DISTINCT uid_1, val_3 FROM cte join events_table on cte.val_3=events_table.event_type ORDER BY 1, 2;
226
227
228-- CTEs should be able to terminate (the last SELECT) in a local query
229WITH cte AS (
230	SELECT user_id FROM users_table
231)
232SELECT min(user_id) FROM cte JOIN local_table ON (user_id = id);
233
234-- even if there are no distributed tables
235WITH cte AS (
236	SELECT user_id FROM users_table
237)
238SELECT min(user_id) FROM cte JOIN local_table ON (user_id = id) JOIN events_table USING (user_id);
239
240-- unless the distributed table is part of a recursively planned subquery
241WITH cte AS (
242	SELECT user_id FROM users_table
243)
244SELECT min(user_id) FROM cte JOIN local_table ON (user_id = id) JOIN (SELECT * FROM events_table OFFSET 0) e USING (user_id);
245
246-- joins between local and reference tables are allowed
247-- even when the coordinator is not in the metadata at this stage
248WITH cte AS (
249	SELECT user_id FROM users_table
250)
251SELECT count(*) FROM local_table JOIN ref_table USING (id)
252WHERE id IN (SELECT * FROM cte);
253
254-- CTEs should be able to terminate a router query
255WITH cte AS (
256	WITH cte_1 AS (
257		SELECT * FROM local_table WHERE id < 7
258	),
259	cte_2 AS (
260		SELECT * FROM local_table WHERE id > 3
261	),
262	cte_dist AS (
263		SELECT count(*) as u_id FROM users_table
264	),
265	cte_merge AS (
266		SELECT cte_1.id as id FROM cte_1 join cte_2 on TRUE
267	)
268	SELECT count(*) FROM users_table join cte_merge on id=user_id
269)
270SELECT
271  row_number() OVER (), count(*)
272FROM
273  cte, users_table
274WHERE
275  cte.count=user_id and user_id=5;
276
277
278-- CTEs should be able to terminate a real-time query
279WITH cte AS (
280	WITH cte_1 AS (
281		SELECT * FROM local_table WHERE id < 7
282	),
283	cte_2 AS (
284		SELECT * FROM local_table WHERE id > 3
285	),
286	cte_dist AS (
287		SELECT count(*) as u_id FROM users_table
288	),
289	cte_merge AS (
290		SELECT cte_1.id as id FROM cte_1 join cte_2 on TRUE
291	)
292	SELECT count(*) FROM users_table join cte_merge on id=user_id
293)
294SELECT count(*) FROM cte, users_table where cte.count=user_id;
295
296
297WITH cte_1 AS (
298	SELECT
299		u_table.user_id as u_id, e_table.event_type
300	FROM
301		users_table as u_table
302	join
303		events_table as e_table
304	on
305		u_table.value_2=e_table.event_type
306	WHERE
307		u_table.user_id < 7
308),
309cte_2 AS (
310	SELECT
311		u_table.user_id as u_id, e_table.event_type
312	FROM
313		users_table as u_table
314	join
315		events_table as e_table
316	on
317		u_table.value_2=e_table.event_type
318	WHERE
319		u_table.user_id > 3
320),
321cte_merge AS (
322	SELECT
323		cte_1.u_id, cte_2.event_type
324	FROM
325		cte_1
326	join
327		cte_2
328	on cte_1.event_type=cte_2.u_id
329)
330SELECT
331	count(*)
332FROM
333	users_table, cte_merge
334WHERE
335	users_table.user_id = cte_merge.u_id;
336
337DROP SCHEMA with_executors CASCADE;
338