1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied. See the License for the
15 // specific language governing permissions and limitations
16 // under the License.
17
18 use std::convert::TryFrom;
19 use std::sync::Arc;
20
21 extern crate arrow;
22 extern crate datafusion;
23
24 use arrow::{array::*, datatypes::TimeUnit};
25 use arrow::{datatypes::Int32Type, datatypes::Int64Type, record_batch::RecordBatch};
26 use arrow::{
27 datatypes::{DataType, Field, Schema, SchemaRef},
28 util::display::array_value_to_string,
29 };
30
31 use datafusion::execution::context::ExecutionContext;
32 use datafusion::logical_plan::{LogicalPlan, ToDFSchema};
33 use datafusion::prelude::create_udf;
34 use datafusion::{
35 datasource::{csv::CsvReadOptions, MemTable},
36 physical_plan::collect,
37 };
38 use datafusion::{error::Result, physical_plan::ColumnarValue};
39
40 #[tokio::test]
nyc() -> Result<()>41 async fn nyc() -> Result<()> {
42 // schema for nyxtaxi csv files
43 let schema = Schema::new(vec![
44 Field::new("VendorID", DataType::Utf8, true),
45 Field::new("tpep_pickup_datetime", DataType::Utf8, true),
46 Field::new("tpep_dropoff_datetime", DataType::Utf8, true),
47 Field::new("passenger_count", DataType::Utf8, true),
48 Field::new("trip_distance", DataType::Float64, true),
49 Field::new("RatecodeID", DataType::Utf8, true),
50 Field::new("store_and_fwd_flag", DataType::Utf8, true),
51 Field::new("PULocationID", DataType::Utf8, true),
52 Field::new("DOLocationID", DataType::Utf8, true),
53 Field::new("payment_type", DataType::Utf8, true),
54 Field::new("fare_amount", DataType::Float64, true),
55 Field::new("extra", DataType::Float64, true),
56 Field::new("mta_tax", DataType::Float64, true),
57 Field::new("tip_amount", DataType::Float64, true),
58 Field::new("tolls_amount", DataType::Float64, true),
59 Field::new("improvement_surcharge", DataType::Float64, true),
60 Field::new("total_amount", DataType::Float64, true),
61 ]);
62
63 let mut ctx = ExecutionContext::new();
64 ctx.register_csv(
65 "tripdata",
66 "file.csv",
67 CsvReadOptions::new().schema(&schema),
68 )?;
69
70 let logical_plan = ctx.create_logical_plan(
71 "SELECT passenger_count, MIN(fare_amount), MAX(fare_amount) \
72 FROM tripdata GROUP BY passenger_count",
73 )?;
74
75 let optimized_plan = ctx.optimize(&logical_plan)?;
76
77 match &optimized_plan {
78 LogicalPlan::Aggregate { input, .. } => match input.as_ref() {
79 LogicalPlan::TableScan {
80 ref projected_schema,
81 ..
82 } => {
83 assert_eq!(2, projected_schema.fields().len());
84 assert_eq!(projected_schema.field(0).name(), "passenger_count");
85 assert_eq!(projected_schema.field(1).name(), "fare_amount");
86 }
87 _ => unreachable!(),
88 },
89 _ => unreachable!(false),
90 }
91
92 Ok(())
93 }
94
95 #[tokio::test]
parquet_query()96 async fn parquet_query() {
97 let mut ctx = ExecutionContext::new();
98 register_alltypes_parquet(&mut ctx);
99 // NOTE that string_col is actually a binary column and does not have the UTF8 logical type
100 // so we need an explicit cast
101 let sql = "SELECT id, CAST(string_col AS varchar) FROM alltypes_plain";
102 let actual = execute(&mut ctx, sql).await;
103 let expected = vec![
104 vec!["4", "0"],
105 vec!["5", "1"],
106 vec!["6", "0"],
107 vec!["7", "1"],
108 vec!["2", "0"],
109 vec!["3", "1"],
110 vec!["0", "0"],
111 vec!["1", "1"],
112 ];
113
114 assert_eq!(expected, actual);
115 }
116
117 #[tokio::test]
parquet_single_nan_schema()118 async fn parquet_single_nan_schema() {
119 let mut ctx = ExecutionContext::new();
120 let testdata = arrow::util::test_util::parquet_test_data();
121 ctx.register_parquet("single_nan", &format!("{}/single_nan.parquet", testdata))
122 .unwrap();
123 let sql = "SELECT mycol FROM single_nan";
124 let plan = ctx.create_logical_plan(&sql).unwrap();
125 let plan = ctx.optimize(&plan).unwrap();
126 let plan = ctx.create_physical_plan(&plan).unwrap();
127 let results = collect(plan).await.unwrap();
128 for batch in results {
129 assert_eq!(1, batch.num_rows());
130 assert_eq!(1, batch.num_columns());
131 }
132 }
133
134 #[tokio::test]
135 #[ignore = "Test ignored, will be enabled as part of the nested Parquet reader"]
parquet_list_columns()136 async fn parquet_list_columns() {
137 let mut ctx = ExecutionContext::new();
138 let testdata = arrow::util::test_util::parquet_test_data();
139 ctx.register_parquet(
140 "list_columns",
141 &format!("{}/list_columns.parquet", testdata),
142 )
143 .unwrap();
144
145 let schema = Arc::new(Schema::new(vec![
146 Field::new(
147 "int64_list",
148 DataType::List(Box::new(Field::new("item", DataType::Int64, true))),
149 true,
150 ),
151 Field::new(
152 "utf8_list",
153 DataType::List(Box::new(Field::new("item", DataType::Utf8, true))),
154 true,
155 ),
156 ]));
157
158 let sql = "SELECT int64_list, utf8_list FROM list_columns";
159 let plan = ctx.create_logical_plan(&sql).unwrap();
160 let plan = ctx.optimize(&plan).unwrap();
161 let plan = ctx.create_physical_plan(&plan).unwrap();
162 let results = collect(plan).await.unwrap();
163
164 // int64_list utf8_list
165 // 0 [1, 2, 3] [abc, efg, hij]
166 // 1 [None, 1] None
167 // 2 [4] [efg, None, hij, xyz]
168
169 assert_eq!(1, results.len());
170 let batch = &results[0];
171 assert_eq!(3, batch.num_rows());
172 assert_eq!(2, batch.num_columns());
173 assert_eq!(schema, batch.schema());
174
175 let int_list_array = batch
176 .column(0)
177 .as_any()
178 .downcast_ref::<ListArray>()
179 .unwrap();
180 let utf8_list_array = batch
181 .column(1)
182 .as_any()
183 .downcast_ref::<ListArray>()
184 .unwrap();
185
186 assert_eq!(
187 int_list_array
188 .value(0)
189 .as_any()
190 .downcast_ref::<PrimitiveArray<Int64Type>>()
191 .unwrap(),
192 &PrimitiveArray::<Int64Type>::from(vec![Some(1), Some(2), Some(3),])
193 );
194
195 assert_eq!(
196 utf8_list_array
197 .value(0)
198 .as_any()
199 .downcast_ref::<StringArray>()
200 .unwrap(),
201 &StringArray::try_from(vec![Some("abc"), Some("efg"), Some("hij"),]).unwrap()
202 );
203
204 assert_eq!(
205 int_list_array
206 .value(1)
207 .as_any()
208 .downcast_ref::<PrimitiveArray<Int64Type>>()
209 .unwrap(),
210 &PrimitiveArray::<Int64Type>::from(vec![None, Some(1),])
211 );
212
213 assert!(utf8_list_array.is_null(1));
214
215 assert_eq!(
216 int_list_array
217 .value(2)
218 .as_any()
219 .downcast_ref::<PrimitiveArray<Int64Type>>()
220 .unwrap(),
221 &PrimitiveArray::<Int64Type>::from(vec![Some(4),])
222 );
223
224 let result = utf8_list_array.value(2);
225 let result = result.as_any().downcast_ref::<StringArray>().unwrap();
226
227 assert_eq!(result.value(0), "efg");
228 assert!(result.is_null(1));
229 assert_eq!(result.value(2), "hij");
230 assert_eq!(result.value(3), "xyz");
231 }
232
233 #[tokio::test]
csv_select_nested() -> Result<()>234 async fn csv_select_nested() -> Result<()> {
235 let mut ctx = ExecutionContext::new();
236 register_aggregate_csv(&mut ctx)?;
237 let sql = "SELECT o1, o2, c3
238 FROM (
239 SELECT c1 AS o1, c2 + 1 AS o2, c3
240 FROM (
241 SELECT c1, c2, c3, c4
242 FROM aggregate_test_100
243 WHERE c1 = 'a' AND c2 >= 4
244 ORDER BY c2 ASC, c3 ASC
245 )
246 )";
247 let actual = execute(&mut ctx, sql).await;
248 let expected = vec![
249 vec!["a", "5", "-101"],
250 vec!["a", "5", "-54"],
251 vec!["a", "5", "-38"],
252 vec!["a", "5", "65"],
253 vec!["a", "6", "-101"],
254 vec!["a", "6", "-31"],
255 vec!["a", "6", "36"],
256 ];
257 assert_eq!(expected, actual);
258 Ok(())
259 }
260
261 #[tokio::test]
csv_count_star() -> Result<()>262 async fn csv_count_star() -> Result<()> {
263 let mut ctx = ExecutionContext::new();
264 register_aggregate_csv(&mut ctx)?;
265 let sql = "SELECT COUNT(*), COUNT(1) AS c, COUNT(c1) FROM aggregate_test_100";
266 let actual = execute(&mut ctx, sql).await;
267 let expected = vec![vec!["100", "100", "100"]];
268 assert_eq!(expected, actual);
269 Ok(())
270 }
271
272 #[tokio::test]
csv_query_with_predicate() -> Result<()>273 async fn csv_query_with_predicate() -> Result<()> {
274 let mut ctx = ExecutionContext::new();
275 register_aggregate_csv(&mut ctx)?;
276 let sql = "SELECT c1, c12 FROM aggregate_test_100 WHERE c12 > 0.376 AND c12 < 0.4";
277 let actual = execute(&mut ctx, sql).await;
278 let expected = vec![
279 vec!["e", "0.39144436569161134"],
280 vec!["d", "0.38870280983958583"],
281 ];
282 assert_eq!(expected, actual);
283 Ok(())
284 }
285
286 #[tokio::test]
csv_query_with_negative_predicate() -> Result<()>287 async fn csv_query_with_negative_predicate() -> Result<()> {
288 let mut ctx = ExecutionContext::new();
289 register_aggregate_csv(&mut ctx)?;
290 let sql = "SELECT c1, c4 FROM aggregate_test_100 WHERE c3 < -55 AND -c4 > 30000";
291 let actual = execute(&mut ctx, sql).await;
292 let expected = vec![vec!["e", "-31500"], vec!["c", "-30187"]];
293 assert_eq!(expected, actual);
294 Ok(())
295 }
296
297 #[tokio::test]
csv_query_with_negated_predicate() -> Result<()>298 async fn csv_query_with_negated_predicate() -> Result<()> {
299 let mut ctx = ExecutionContext::new();
300 register_aggregate_csv(&mut ctx)?;
301 let sql = "SELECT COUNT(1) FROM aggregate_test_100 WHERE NOT(c1 != 'a')";
302 let actual = execute(&mut ctx, sql).await;
303 let expected = vec![vec!["21"]];
304 assert_eq!(expected, actual);
305 Ok(())
306 }
307
308 #[tokio::test]
csv_query_with_is_not_null_predicate() -> Result<()>309 async fn csv_query_with_is_not_null_predicate() -> Result<()> {
310 let mut ctx = ExecutionContext::new();
311 register_aggregate_csv(&mut ctx)?;
312 let sql = "SELECT COUNT(1) FROM aggregate_test_100 WHERE c1 IS NOT NULL";
313 let actual = execute(&mut ctx, sql).await;
314 let expected = vec![vec!["100"]];
315 assert_eq!(expected, actual);
316 Ok(())
317 }
318
319 #[tokio::test]
csv_query_with_is_null_predicate() -> Result<()>320 async fn csv_query_with_is_null_predicate() -> Result<()> {
321 let mut ctx = ExecutionContext::new();
322 register_aggregate_csv(&mut ctx)?;
323 let sql = "SELECT COUNT(1) FROM aggregate_test_100 WHERE c1 IS NULL";
324 let actual = execute(&mut ctx, sql).await;
325 let expected = vec![vec!["0"]];
326 assert_eq!(expected, actual);
327 Ok(())
328 }
329
330 #[tokio::test]
csv_query_group_by_int_min_max() -> Result<()>331 async fn csv_query_group_by_int_min_max() -> Result<()> {
332 let mut ctx = ExecutionContext::new();
333 register_aggregate_csv(&mut ctx)?;
334 let sql = "SELECT c2, MIN(c12), MAX(c12) FROM aggregate_test_100 GROUP BY c2";
335 let mut actual = execute(&mut ctx, sql).await;
336 actual.sort();
337 let expected = vec![
338 vec!["1", "0.05636955101974106", "0.9965400387585364"],
339 vec!["2", "0.16301110515739792", "0.991517828651004"],
340 vec!["3", "0.047343434291126085", "0.9293883502480845"],
341 vec!["4", "0.02182578039211991", "0.9237877978193884"],
342 vec!["5", "0.01479305307777301", "0.9723580396501548"],
343 ];
344 assert_eq!(expected, actual);
345 Ok(())
346 }
347
348 #[tokio::test]
csv_query_group_by_float32() -> Result<()>349 async fn csv_query_group_by_float32() -> Result<()> {
350 let mut ctx = ExecutionContext::new();
351 register_aggregate_simple_csv(&mut ctx)?;
352
353 let sql =
354 "SELECT COUNT(*) as cnt, c1 FROM aggregate_simple GROUP BY c1 ORDER BY cnt DESC";
355 let actual = execute(&mut ctx, sql).await;
356
357 let expected = vec![
358 vec!["5", "0.00005"],
359 vec!["4", "0.00004"],
360 vec!["3", "0.00003"],
361 vec!["2", "0.00002"],
362 vec!["1", "0.00001"],
363 ];
364 assert_eq!(expected, actual);
365
366 Ok(())
367 }
368
369 #[tokio::test]
csv_query_group_by_float64() -> Result<()>370 async fn csv_query_group_by_float64() -> Result<()> {
371 let mut ctx = ExecutionContext::new();
372 register_aggregate_simple_csv(&mut ctx)?;
373
374 let sql =
375 "SELECT COUNT(*) as cnt, c2 FROM aggregate_simple GROUP BY c2 ORDER BY cnt DESC";
376 let actual = execute(&mut ctx, sql).await;
377
378 let expected = vec![
379 vec!["5", "0.000000000005"],
380 vec!["4", "0.000000000004"],
381 vec!["3", "0.000000000003"],
382 vec!["2", "0.000000000002"],
383 vec!["1", "0.000000000001"],
384 ];
385 assert_eq!(expected, actual);
386
387 Ok(())
388 }
389
390 #[tokio::test]
csv_query_group_by_boolean() -> Result<()>391 async fn csv_query_group_by_boolean() -> Result<()> {
392 let mut ctx = ExecutionContext::new();
393 register_aggregate_simple_csv(&mut ctx)?;
394
395 let sql =
396 "SELECT COUNT(*) as cnt, c3 FROM aggregate_simple GROUP BY c3 ORDER BY cnt DESC";
397 let actual = execute(&mut ctx, sql).await;
398
399 let expected = vec![vec!["9", "true"], vec!["6", "false"]];
400 assert_eq!(expected, actual);
401
402 Ok(())
403 }
404
405 #[tokio::test]
csv_query_group_by_two_columns() -> Result<()>406 async fn csv_query_group_by_two_columns() -> Result<()> {
407 let mut ctx = ExecutionContext::new();
408 register_aggregate_csv(&mut ctx)?;
409 let sql = "SELECT c1, c2, MIN(c3) FROM aggregate_test_100 GROUP BY c1, c2";
410 let mut actual = execute(&mut ctx, sql).await;
411 actual.sort();
412 let expected = vec![
413 vec!["a", "1", "-85"],
414 vec!["a", "2", "-48"],
415 vec!["a", "3", "-72"],
416 vec!["a", "4", "-101"],
417 vec!["a", "5", "-101"],
418 vec!["b", "1", "12"],
419 vec!["b", "2", "-60"],
420 vec!["b", "3", "-101"],
421 vec!["b", "4", "-117"],
422 vec!["b", "5", "-82"],
423 vec!["c", "1", "-24"],
424 vec!["c", "2", "-117"],
425 vec!["c", "3", "-2"],
426 vec!["c", "4", "-90"],
427 vec!["c", "5", "-94"],
428 vec!["d", "1", "-99"],
429 vec!["d", "2", "93"],
430 vec!["d", "3", "-76"],
431 vec!["d", "4", "5"],
432 vec!["d", "5", "-59"],
433 vec!["e", "1", "36"],
434 vec!["e", "2", "-61"],
435 vec!["e", "3", "-95"],
436 vec!["e", "4", "-56"],
437 vec!["e", "5", "-86"],
438 ];
439 assert_eq!(expected, actual);
440 Ok(())
441 }
442
443 #[tokio::test]
csv_query_group_by_and_having() -> Result<()>444 async fn csv_query_group_by_and_having() -> Result<()> {
445 let mut ctx = ExecutionContext::new();
446 register_aggregate_csv(&mut ctx)?;
447 let sql = "SELECT c1, MIN(c3) AS m FROM aggregate_test_100 GROUP BY c1 HAVING m < -100 AND MAX(c3) > 70";
448 let mut actual = execute(&mut ctx, sql).await;
449 actual.sort();
450 let expected = vec![vec!["a", "-101"], vec!["c", "-117"]];
451 assert_eq!(expected, actual);
452 Ok(())
453 }
454
455 #[tokio::test]
csv_query_group_by_and_having_and_where() -> Result<()>456 async fn csv_query_group_by_and_having_and_where() -> Result<()> {
457 let mut ctx = ExecutionContext::new();
458 register_aggregate_csv(&mut ctx)?;
459 let sql = "SELECT c1, MIN(c3) AS m
460 FROM aggregate_test_100
461 WHERE c1 IN ('a', 'b')
462 GROUP BY c1
463 HAVING m < -100 AND MAX(c3) > 70";
464 let mut actual = execute(&mut ctx, sql).await;
465 actual.sort();
466 let expected = vec![vec!["a", "-101"]];
467 assert_eq!(expected, actual);
468 Ok(())
469 }
470
471 #[tokio::test]
csv_query_having_without_group_by() -> Result<()>472 async fn csv_query_having_without_group_by() -> Result<()> {
473 let mut ctx = ExecutionContext::new();
474 register_aggregate_csv(&mut ctx)?;
475 let sql = "SELECT c1, c2, c3 FROM aggregate_test_100 HAVING c2 >= 4 AND c3 > 90";
476 let mut actual = execute(&mut ctx, sql).await;
477 actual.sort();
478 let expected = vec![
479 vec!["c", "4", "123"],
480 vec!["c", "5", "118"],
481 vec!["d", "4", "102"],
482 vec!["e", "4", "96"],
483 vec!["e", "4", "97"],
484 ];
485 assert_eq!(expected, actual);
486 Ok(())
487 }
488
489 #[tokio::test]
csv_query_avg_sqrt() -> Result<()>490 async fn csv_query_avg_sqrt() -> Result<()> {
491 let mut ctx = create_ctx()?;
492 register_aggregate_csv(&mut ctx)?;
493 let sql = "SELECT avg(custom_sqrt(c12)) FROM aggregate_test_100";
494 let mut actual = execute(&mut ctx, sql).await;
495 actual.sort();
496 let expected = vec![vec!["0.6706002946036462"]];
497 assert_float_eq(&expected, &actual);
498 Ok(())
499 }
500
501 /// test that casting happens on udfs.
502 /// c11 is f32, but `custom_sqrt` requires f64. Casting happens but the logical plan and
503 /// physical plan have the same schema.
504 #[tokio::test]
csv_query_custom_udf_with_cast() -> Result<()>505 async fn csv_query_custom_udf_with_cast() -> Result<()> {
506 let mut ctx = create_ctx()?;
507 register_aggregate_csv(&mut ctx)?;
508 let sql = "SELECT avg(custom_sqrt(c11)) FROM aggregate_test_100";
509 let actual = execute(&mut ctx, sql).await;
510 let expected = vec![vec!["0.6584408483418833"]];
511 assert_float_eq(&expected, &actual);
512 Ok(())
513 }
514
515 /// sqrt(f32) is slightly different than sqrt(CAST(f32 AS double)))
516 #[tokio::test]
sqrt_f32_vs_f64() -> Result<()>517 async fn sqrt_f32_vs_f64() -> Result<()> {
518 let mut ctx = create_ctx()?;
519 register_aggregate_csv(&mut ctx)?;
520 // sqrt(f32)'s plan passes
521 let sql = "SELECT avg(sqrt(c11)) FROM aggregate_test_100";
522 let actual = execute(&mut ctx, sql).await;
523 let expected = vec![vec!["0.6584408485889435"]];
524
525 assert_eq!(actual, expected);
526 let sql = "SELECT avg(sqrt(CAST(c11 AS double))) FROM aggregate_test_100";
527 let actual = execute(&mut ctx, sql).await;
528 let expected = vec![vec!["0.6584408483418833"]];
529 assert_float_eq(&expected, &actual);
530 Ok(())
531 }
532
533 #[tokio::test]
csv_query_error() -> Result<()>534 async fn csv_query_error() -> Result<()> {
535 // sin(utf8) should error
536 let mut ctx = create_ctx()?;
537 register_aggregate_csv(&mut ctx)?;
538 let sql = "SELECT sin(c1) FROM aggregate_test_100";
539 let plan = ctx.create_logical_plan(&sql);
540 assert!(plan.is_err());
541 Ok(())
542 }
543
544 // this query used to deadlock due to the call udf(udf())
545 #[tokio::test]
csv_query_sqrt_sqrt() -> Result<()>546 async fn csv_query_sqrt_sqrt() -> Result<()> {
547 let mut ctx = create_ctx()?;
548 register_aggregate_csv(&mut ctx)?;
549 let sql = "SELECT sqrt(sqrt(c12)) FROM aggregate_test_100 LIMIT 1";
550 let actual = execute(&mut ctx, sql).await;
551 // sqrt(sqrt(c12=0.9294097332465232)) = 0.9818650561397431
552 let expected = vec![vec!["0.9818650561397431"]];
553 assert_float_eq(&expected, &actual);
554 Ok(())
555 }
556
557 #[allow(clippy::unnecessary_wraps)]
create_ctx() -> Result<ExecutionContext>558 fn create_ctx() -> Result<ExecutionContext> {
559 let mut ctx = ExecutionContext::new();
560
561 // register a custom UDF
562 ctx.register_udf(create_udf(
563 "custom_sqrt",
564 vec![DataType::Float64],
565 Arc::new(DataType::Float64),
566 Arc::new(custom_sqrt),
567 ));
568
569 Ok(ctx)
570 }
571
custom_sqrt(args: &[ColumnarValue]) -> Result<ColumnarValue>572 fn custom_sqrt(args: &[ColumnarValue]) -> Result<ColumnarValue> {
573 let arg = &args[0];
574 if let ColumnarValue::Array(v) = arg {
575 let input = v
576 .as_any()
577 .downcast_ref::<Float64Array>()
578 .expect("cast failed");
579
580 let array: Float64Array = input.iter().map(|v| v.map(|x| x.sqrt())).collect();
581 Ok(ColumnarValue::Array(Arc::new(array)))
582 } else {
583 unimplemented!()
584 }
585 }
586
587 #[tokio::test]
csv_query_avg() -> Result<()>588 async fn csv_query_avg() -> Result<()> {
589 let mut ctx = ExecutionContext::new();
590 register_aggregate_csv(&mut ctx)?;
591 let sql = "SELECT avg(c12) FROM aggregate_test_100";
592 let mut actual = execute(&mut ctx, sql).await;
593 actual.sort();
594 let expected = vec![vec!["0.5089725099127211"]];
595 assert_float_eq(&expected, &actual);
596 Ok(())
597 }
598
599 #[tokio::test]
csv_query_group_by_avg() -> Result<()>600 async fn csv_query_group_by_avg() -> Result<()> {
601 let mut ctx = ExecutionContext::new();
602 register_aggregate_csv(&mut ctx)?;
603 let sql = "SELECT c1, avg(c12) FROM aggregate_test_100 GROUP BY c1";
604 let mut actual = execute(&mut ctx, sql).await;
605 actual.sort();
606 let expected = vec![
607 vec!["a", "0.48754517466109415"],
608 vec!["b", "0.41040709263815384"],
609 vec!["c", "0.6600456536439784"],
610 vec!["d", "0.48855379387549824"],
611 vec!["e", "0.48600669271341534"],
612 ];
613 assert_eq!(expected, actual);
614 Ok(())
615 }
616
617 #[tokio::test]
csv_query_group_by_avg_with_projection() -> Result<()>618 async fn csv_query_group_by_avg_with_projection() -> Result<()> {
619 let mut ctx = ExecutionContext::new();
620 register_aggregate_csv(&mut ctx)?;
621 let sql = "SELECT avg(c12), c1 FROM aggregate_test_100 GROUP BY c1";
622 let mut actual = execute(&mut ctx, sql).await;
623 actual.sort();
624 let expected = vec![
625 vec!["0.41040709263815384", "b"],
626 vec!["0.48600669271341534", "e"],
627 vec!["0.48754517466109415", "a"],
628 vec!["0.48855379387549824", "d"],
629 vec!["0.6600456536439784", "c"],
630 ];
631 assert_eq!(expected, actual);
632 Ok(())
633 }
634
635 #[tokio::test]
csv_query_avg_multi_batch() -> Result<()>636 async fn csv_query_avg_multi_batch() -> Result<()> {
637 let mut ctx = ExecutionContext::new();
638 register_aggregate_csv(&mut ctx)?;
639 let sql = "SELECT avg(c12) FROM aggregate_test_100";
640 let plan = ctx.create_logical_plan(&sql).unwrap();
641 let plan = ctx.optimize(&plan).unwrap();
642 let plan = ctx.create_physical_plan(&plan).unwrap();
643 let results = collect(plan).await.unwrap();
644 let batch = &results[0];
645 let column = batch.column(0);
646 let array = column.as_any().downcast_ref::<Float64Array>().unwrap();
647 let actual = array.value(0);
648 let expected = 0.5089725;
649 // Due to float number's accuracy, different batch size will lead to different
650 // answers.
651 assert!((expected - actual).abs() < 0.01);
652 Ok(())
653 }
654
655 #[tokio::test]
csv_query_nullif_divide_by_0() -> Result<()>656 async fn csv_query_nullif_divide_by_0() -> Result<()> {
657 let mut ctx = ExecutionContext::new();
658 register_aggregate_csv(&mut ctx)?;
659 let sql = "SELECT c8/nullif(c7, 0) FROM aggregate_test_100";
660 let actual = execute(&mut ctx, sql).await;
661 let actual = &actual[80..90]; // We just want to compare rows 80-89
662 let expected = vec![
663 vec!["258"],
664 vec!["664"],
665 vec!["NULL"],
666 vec!["22"],
667 vec!["164"],
668 vec!["448"],
669 vec!["365"],
670 vec!["1640"],
671 vec!["671"],
672 vec!["203"],
673 ];
674 assert_eq!(expected, actual);
675 Ok(())
676 }
677
678 #[tokio::test]
csv_query_count() -> Result<()>679 async fn csv_query_count() -> Result<()> {
680 let mut ctx = ExecutionContext::new();
681 register_aggregate_csv(&mut ctx)?;
682 let sql = "SELECT count(c12) FROM aggregate_test_100";
683 let actual = execute(&mut ctx, sql).await;
684 let expected = vec![vec!["100"]];
685 assert_eq!(expected, actual);
686 Ok(())
687 }
688
689 #[tokio::test]
csv_query_group_by_int_count() -> Result<()>690 async fn csv_query_group_by_int_count() -> Result<()> {
691 let mut ctx = ExecutionContext::new();
692 register_aggregate_csv(&mut ctx)?;
693 let sql = "SELECT c1, count(c12) FROM aggregate_test_100 GROUP BY c1";
694 let mut actual = execute(&mut ctx, sql).await;
695 actual.sort();
696 let expected = vec![
697 vec!["a", "21"],
698 vec!["b", "19"],
699 vec!["c", "21"],
700 vec!["d", "18"],
701 vec!["e", "21"],
702 ];
703 assert_eq!(expected, actual);
704 Ok(())
705 }
706
707 #[tokio::test]
csv_query_group_with_aliased_aggregate() -> Result<()>708 async fn csv_query_group_with_aliased_aggregate() -> Result<()> {
709 let mut ctx = ExecutionContext::new();
710 register_aggregate_csv(&mut ctx)?;
711 let sql = "SELECT c1, count(c12) AS count FROM aggregate_test_100 GROUP BY c1";
712 let mut actual = execute(&mut ctx, sql).await;
713 actual.sort();
714 let expected = vec![
715 vec!["a", "21"],
716 vec!["b", "19"],
717 vec!["c", "21"],
718 vec!["d", "18"],
719 vec!["e", "21"],
720 ];
721 assert_eq!(expected, actual);
722 Ok(())
723 }
724
725 #[tokio::test]
csv_query_group_by_string_min_max() -> Result<()>726 async fn csv_query_group_by_string_min_max() -> Result<()> {
727 let mut ctx = ExecutionContext::new();
728 register_aggregate_csv(&mut ctx)?;
729 let sql = "SELECT c1, MIN(c12), MAX(c12) FROM aggregate_test_100 GROUP BY c1";
730 let mut actual = execute(&mut ctx, sql).await;
731 actual.sort();
732 let expected = vec![
733 vec!["a", "0.02182578039211991", "0.9800193410444061"],
734 vec!["b", "0.04893135681998029", "0.9185813970744787"],
735 vec!["c", "0.0494924465469434", "0.991517828651004"],
736 vec!["d", "0.061029375346466685", "0.9748360509016578"],
737 vec!["e", "0.01479305307777301", "0.9965400387585364"],
738 ];
739 assert_eq!(expected, actual);
740 Ok(())
741 }
742
743 #[tokio::test]
csv_query_cast() -> Result<()>744 async fn csv_query_cast() -> Result<()> {
745 let mut ctx = ExecutionContext::new();
746 register_aggregate_csv(&mut ctx)?;
747 let sql = "SELECT CAST(c12 AS float) FROM aggregate_test_100 WHERE c12 > 0.376 AND c12 < 0.4";
748 let actual = execute(&mut ctx, sql).await;
749 let expected = vec![vec!["0.39144436569161134"], vec!["0.38870280983958583"]];
750 assert_eq!(expected, actual);
751 Ok(())
752 }
753
754 #[tokio::test]
csv_query_cast_literal() -> Result<()>755 async fn csv_query_cast_literal() -> Result<()> {
756 let mut ctx = ExecutionContext::new();
757 register_aggregate_csv(&mut ctx)?;
758 let sql =
759 "SELECT c12, CAST(1 AS float) FROM aggregate_test_100 WHERE c12 > CAST(0 AS float) LIMIT 2";
760 let actual = execute(&mut ctx, sql).await;
761 let expected = vec![
762 vec!["0.9294097332465232", "1"],
763 vec!["0.3114712539863804", "1"],
764 ];
765 assert_eq!(expected, actual);
766 Ok(())
767 }
768
769 #[tokio::test]
csv_query_limit() -> Result<()>770 async fn csv_query_limit() -> Result<()> {
771 let mut ctx = ExecutionContext::new();
772 register_aggregate_csv(&mut ctx)?;
773 let sql = "SELECT c1 FROM aggregate_test_100 LIMIT 2";
774 let actual = execute(&mut ctx, sql).await;
775 let expected = vec![vec!["c"], vec!["d"]];
776 assert_eq!(expected, actual);
777 Ok(())
778 }
779
780 #[tokio::test]
csv_query_limit_bigger_than_nbr_of_rows() -> Result<()>781 async fn csv_query_limit_bigger_than_nbr_of_rows() -> Result<()> {
782 let mut ctx = ExecutionContext::new();
783 register_aggregate_csv(&mut ctx)?;
784 let sql = "SELECT c2 FROM aggregate_test_100 LIMIT 200";
785 let actual = execute(&mut ctx, sql).await;
786 let expected = vec![
787 vec!["2"],
788 vec!["5"],
789 vec!["1"],
790 vec!["1"],
791 vec!["5"],
792 vec!["4"],
793 vec!["3"],
794 vec!["3"],
795 vec!["1"],
796 vec!["4"],
797 vec!["1"],
798 vec!["4"],
799 vec!["3"],
800 vec!["2"],
801 vec!["1"],
802 vec!["1"],
803 vec!["2"],
804 vec!["1"],
805 vec!["3"],
806 vec!["2"],
807 vec!["4"],
808 vec!["1"],
809 vec!["5"],
810 vec!["4"],
811 vec!["2"],
812 vec!["1"],
813 vec!["4"],
814 vec!["5"],
815 vec!["2"],
816 vec!["3"],
817 vec!["4"],
818 vec!["2"],
819 vec!["1"],
820 vec!["5"],
821 vec!["3"],
822 vec!["1"],
823 vec!["2"],
824 vec!["3"],
825 vec!["3"],
826 vec!["3"],
827 vec!["2"],
828 vec!["4"],
829 vec!["1"],
830 vec!["3"],
831 vec!["2"],
832 vec!["5"],
833 vec!["2"],
834 vec!["1"],
835 vec!["4"],
836 vec!["1"],
837 vec!["4"],
838 vec!["2"],
839 vec!["5"],
840 vec!["4"],
841 vec!["2"],
842 vec!["3"],
843 vec!["4"],
844 vec!["4"],
845 vec!["4"],
846 vec!["5"],
847 vec!["4"],
848 vec!["2"],
849 vec!["1"],
850 vec!["2"],
851 vec!["4"],
852 vec!["2"],
853 vec!["3"],
854 vec!["5"],
855 vec!["1"],
856 vec!["1"],
857 vec!["4"],
858 vec!["2"],
859 vec!["1"],
860 vec!["2"],
861 vec!["1"],
862 vec!["1"],
863 vec!["5"],
864 vec!["4"],
865 vec!["5"],
866 vec!["2"],
867 vec!["3"],
868 vec!["2"],
869 vec!["4"],
870 vec!["1"],
871 vec!["3"],
872 vec!["4"],
873 vec!["3"],
874 vec!["2"],
875 vec!["5"],
876 vec!["3"],
877 vec!["3"],
878 vec!["2"],
879 vec!["5"],
880 vec!["5"],
881 vec!["4"],
882 vec!["1"],
883 vec!["3"],
884 vec!["3"],
885 vec!["4"],
886 vec!["4"],
887 ];
888 assert_eq!(expected, actual);
889 Ok(())
890 }
891
892 #[tokio::test]
csv_query_limit_with_same_nbr_of_rows() -> Result<()>893 async fn csv_query_limit_with_same_nbr_of_rows() -> Result<()> {
894 let mut ctx = ExecutionContext::new();
895 register_aggregate_csv(&mut ctx)?;
896 let sql = "SELECT c2 FROM aggregate_test_100 LIMIT 100";
897 let actual = execute(&mut ctx, sql).await;
898 let expected = vec![
899 vec!["2"],
900 vec!["5"],
901 vec!["1"],
902 vec!["1"],
903 vec!["5"],
904 vec!["4"],
905 vec!["3"],
906 vec!["3"],
907 vec!["1"],
908 vec!["4"],
909 vec!["1"],
910 vec!["4"],
911 vec!["3"],
912 vec!["2"],
913 vec!["1"],
914 vec!["1"],
915 vec!["2"],
916 vec!["1"],
917 vec!["3"],
918 vec!["2"],
919 vec!["4"],
920 vec!["1"],
921 vec!["5"],
922 vec!["4"],
923 vec!["2"],
924 vec!["1"],
925 vec!["4"],
926 vec!["5"],
927 vec!["2"],
928 vec!["3"],
929 vec!["4"],
930 vec!["2"],
931 vec!["1"],
932 vec!["5"],
933 vec!["3"],
934 vec!["1"],
935 vec!["2"],
936 vec!["3"],
937 vec!["3"],
938 vec!["3"],
939 vec!["2"],
940 vec!["4"],
941 vec!["1"],
942 vec!["3"],
943 vec!["2"],
944 vec!["5"],
945 vec!["2"],
946 vec!["1"],
947 vec!["4"],
948 vec!["1"],
949 vec!["4"],
950 vec!["2"],
951 vec!["5"],
952 vec!["4"],
953 vec!["2"],
954 vec!["3"],
955 vec!["4"],
956 vec!["4"],
957 vec!["4"],
958 vec!["5"],
959 vec!["4"],
960 vec!["2"],
961 vec!["1"],
962 vec!["2"],
963 vec!["4"],
964 vec!["2"],
965 vec!["3"],
966 vec!["5"],
967 vec!["1"],
968 vec!["1"],
969 vec!["4"],
970 vec!["2"],
971 vec!["1"],
972 vec!["2"],
973 vec!["1"],
974 vec!["1"],
975 vec!["5"],
976 vec!["4"],
977 vec!["5"],
978 vec!["2"],
979 vec!["3"],
980 vec!["2"],
981 vec!["4"],
982 vec!["1"],
983 vec!["3"],
984 vec!["4"],
985 vec!["3"],
986 vec!["2"],
987 vec!["5"],
988 vec!["3"],
989 vec!["3"],
990 vec!["2"],
991 vec!["5"],
992 vec!["5"],
993 vec!["4"],
994 vec!["1"],
995 vec!["3"],
996 vec!["3"],
997 vec!["4"],
998 vec!["4"],
999 ];
1000 assert_eq!(expected, actual);
1001 Ok(())
1002 }
1003
1004 #[tokio::test]
csv_query_limit_zero() -> Result<()>1005 async fn csv_query_limit_zero() -> Result<()> {
1006 let mut ctx = ExecutionContext::new();
1007 register_aggregate_csv(&mut ctx)?;
1008 let sql = "SELECT c1 FROM aggregate_test_100 LIMIT 0";
1009 let actual = execute(&mut ctx, sql).await;
1010 let expected: Vec<Vec<String>> = vec![];
1011 assert_eq!(expected, actual);
1012 Ok(())
1013 }
1014
1015 #[tokio::test]
csv_query_create_external_table()1016 async fn csv_query_create_external_table() {
1017 let mut ctx = ExecutionContext::new();
1018 register_aggregate_csv_by_sql(&mut ctx).await;
1019 let sql = "SELECT c1, c2, c3, c4, c5, c6, c7, c8, c9, 10, c11, c12, c13 FROM aggregate_test_100 LIMIT 1";
1020 let actual = execute(&mut ctx, sql).await;
1021 let expected = vec![vec![
1022 "c",
1023 "2",
1024 "1",
1025 "18109",
1026 "2033001162",
1027 "-6513304855495910254",
1028 "25",
1029 "43062",
1030 "1491205016",
1031 "10",
1032 "0.110830784",
1033 "0.9294097332465232",
1034 "6WfVFBVGJSQb7FhA7E0lBwdvjfZnSW",
1035 ]];
1036 assert_eq!(expected, actual);
1037 }
1038
1039 #[tokio::test]
csv_query_external_table_count()1040 async fn csv_query_external_table_count() {
1041 let mut ctx = ExecutionContext::new();
1042 register_aggregate_csv_by_sql(&mut ctx).await;
1043 let sql = "SELECT COUNT(c12) FROM aggregate_test_100";
1044 let actual = execute(&mut ctx, sql).await;
1045 let expected = vec![vec!["100"]];
1046 assert_eq!(expected, actual);
1047 }
1048
1049 #[tokio::test]
csv_query_external_table_sum()1050 async fn csv_query_external_table_sum() {
1051 let mut ctx = ExecutionContext::new();
1052 // cast smallint and int to bigint to avoid overflow during calculation
1053 register_aggregate_csv_by_sql(&mut ctx).await;
1054 let sql =
1055 "SELECT SUM(CAST(c7 AS BIGINT)), SUM(CAST(c8 AS BIGINT)) FROM aggregate_test_100";
1056 let actual = execute(&mut ctx, sql).await;
1057 let expected = vec![vec!["13060", "3017641"]];
1058 assert_eq!(expected, actual);
1059 }
1060
1061 #[tokio::test]
csv_query_count_star()1062 async fn csv_query_count_star() {
1063 let mut ctx = ExecutionContext::new();
1064 register_aggregate_csv_by_sql(&mut ctx).await;
1065 let sql = "SELECT COUNT(*) FROM aggregate_test_100";
1066 let actual = execute(&mut ctx, sql).await;
1067 let expected = vec![vec!["100"]];
1068 assert_eq!(expected, actual);
1069 }
1070
1071 #[tokio::test]
csv_query_count_one()1072 async fn csv_query_count_one() {
1073 let mut ctx = ExecutionContext::new();
1074 register_aggregate_csv_by_sql(&mut ctx).await;
1075 let sql = "SELECT COUNT(1) FROM aggregate_test_100";
1076 let actual = execute(&mut ctx, sql).await;
1077 let expected = vec![vec!["100"]];
1078 assert_eq!(expected, actual);
1079 }
1080
1081 #[tokio::test]
case_when() -> Result<()>1082 async fn case_when() -> Result<()> {
1083 let mut ctx = create_case_context()?;
1084 let sql = "SELECT \
1085 CASE WHEN c1 = 'a' THEN 1 \
1086 WHEN c1 = 'b' THEN 2 \
1087 END \
1088 FROM t1";
1089 let actual = execute(&mut ctx, sql).await;
1090 let expected = vec![vec!["1"], vec!["2"], vec!["NULL"], vec!["NULL"]];
1091 assert_eq!(expected, actual);
1092 Ok(())
1093 }
1094
1095 #[tokio::test]
case_when_else() -> Result<()>1096 async fn case_when_else() -> Result<()> {
1097 let mut ctx = create_case_context()?;
1098 let sql = "SELECT \
1099 CASE WHEN c1 = 'a' THEN 1 \
1100 WHEN c1 = 'b' THEN 2 \
1101 ELSE 999 END \
1102 FROM t1";
1103 let actual = execute(&mut ctx, sql).await;
1104 let expected = vec![vec!["1"], vec!["2"], vec!["999"], vec!["999"]];
1105 assert_eq!(expected, actual);
1106 Ok(())
1107 }
1108
1109 #[tokio::test]
case_when_with_base_expr() -> Result<()>1110 async fn case_when_with_base_expr() -> Result<()> {
1111 let mut ctx = create_case_context()?;
1112 let sql = "SELECT \
1113 CASE c1 WHEN 'a' THEN 1 \
1114 WHEN 'b' THEN 2 \
1115 END \
1116 FROM t1";
1117 let actual = execute(&mut ctx, sql).await;
1118 let expected = vec![vec!["1"], vec!["2"], vec!["NULL"], vec!["NULL"]];
1119 assert_eq!(expected, actual);
1120 Ok(())
1121 }
1122
1123 #[tokio::test]
case_when_else_with_base_expr() -> Result<()>1124 async fn case_when_else_with_base_expr() -> Result<()> {
1125 let mut ctx = create_case_context()?;
1126 let sql = "SELECT \
1127 CASE c1 WHEN 'a' THEN 1 \
1128 WHEN 'b' THEN 2 \
1129 ELSE 999 END \
1130 FROM t1";
1131 let actual = execute(&mut ctx, sql).await;
1132 let expected = vec![vec!["1"], vec!["2"], vec!["999"], vec!["999"]];
1133 assert_eq!(expected, actual);
1134 Ok(())
1135 }
1136
create_case_context() -> Result<ExecutionContext>1137 fn create_case_context() -> Result<ExecutionContext> {
1138 let mut ctx = ExecutionContext::new();
1139 let schema = Arc::new(Schema::new(vec![Field::new("c1", DataType::Utf8, true)]));
1140 let data = RecordBatch::try_new(
1141 schema.clone(),
1142 vec![Arc::new(StringArray::from(vec![
1143 Some("a"),
1144 Some("b"),
1145 Some("c"),
1146 None,
1147 ]))],
1148 )?;
1149 let table = MemTable::try_new(schema, vec![vec![data]])?;
1150 ctx.register_table("t1", Arc::new(table));
1151 Ok(ctx)
1152 }
1153
1154 #[tokio::test]
equijoin() -> Result<()>1155 async fn equijoin() -> Result<()> {
1156 let mut ctx = create_join_context("t1_id", "t2_id")?;
1157 let sql =
1158 "SELECT t1_id, t1_name, t2_name FROM t1 JOIN t2 ON t1_id = t2_id ORDER BY t1_id";
1159 let actual = execute(&mut ctx, sql).await;
1160 let expected = vec![
1161 vec!["11", "a", "z"],
1162 vec!["22", "b", "y"],
1163 vec!["44", "d", "x"],
1164 ];
1165 assert_eq!(expected, actual);
1166 Ok(())
1167 }
1168
1169 #[tokio::test]
left_join() -> Result<()>1170 async fn left_join() -> Result<()> {
1171 let mut ctx = create_join_context("t1_id", "t2_id")?;
1172 let sql = "SELECT t1_id, t1_name, t2_name FROM t1 LEFT JOIN t2 ON t1_id = t2_id ORDER BY t1_id";
1173 let actual = execute(&mut ctx, sql).await;
1174 let expected = vec![
1175 vec!["11", "a", "z"],
1176 vec!["22", "b", "y"],
1177 vec!["33", "c", "NULL"],
1178 vec!["44", "d", "x"],
1179 ];
1180 assert_eq!(expected, actual);
1181 Ok(())
1182 }
1183
1184 #[tokio::test]
right_join() -> Result<()>1185 async fn right_join() -> Result<()> {
1186 let mut ctx = create_join_context("t1_id", "t2_id")?;
1187 let sql =
1188 "SELECT t1_id, t1_name, t2_name FROM t1 RIGHT JOIN t2 ON t1_id = t2_id ORDER BY t1_id";
1189 let actual = execute(&mut ctx, sql).await;
1190 let expected = vec![
1191 vec!["NULL", "NULL", "w"],
1192 vec!["11", "a", "z"],
1193 vec!["22", "b", "y"],
1194 vec!["44", "d", "x"],
1195 ];
1196 assert_eq!(expected, actual);
1197 Ok(())
1198 }
1199
1200 #[tokio::test]
left_join_using() -> Result<()>1201 async fn left_join_using() -> Result<()> {
1202 let mut ctx = create_join_context("id", "id")?;
1203 let sql = "SELECT id, t1_name, t2_name FROM t1 LEFT JOIN t2 USING (id) ORDER BY id";
1204 let actual = execute(&mut ctx, sql).await;
1205 let expected = vec![
1206 vec!["11", "a", "z"],
1207 vec!["22", "b", "y"],
1208 vec!["33", "c", "NULL"],
1209 vec!["44", "d", "x"],
1210 ];
1211 assert_eq!(expected, actual);
1212 Ok(())
1213 }
1214
1215 #[tokio::test]
equijoin_implicit_syntax() -> Result<()>1216 async fn equijoin_implicit_syntax() -> Result<()> {
1217 let mut ctx = create_join_context("t1_id", "t2_id")?;
1218 let sql =
1219 "SELECT t1_id, t1_name, t2_name FROM t1, t2 WHERE t1_id = t2_id ORDER BY t1_id";
1220 let actual = execute(&mut ctx, sql).await;
1221 let expected = vec![
1222 vec!["11", "a", "z"],
1223 vec!["22", "b", "y"],
1224 vec!["44", "d", "x"],
1225 ];
1226 assert_eq!(expected, actual);
1227 Ok(())
1228 }
1229
1230 #[tokio::test]
equijoin_implicit_syntax_with_filter() -> Result<()>1231 async fn equijoin_implicit_syntax_with_filter() -> Result<()> {
1232 let mut ctx = create_join_context("t1_id", "t2_id")?;
1233 let sql = "SELECT t1_id, t1_name, t2_name \
1234 FROM t1, t2 \
1235 WHERE t1_id > 0 \
1236 AND t1_id = t2_id \
1237 AND t2_id < 99 \
1238 ORDER BY t1_id";
1239 let actual = execute(&mut ctx, sql).await;
1240 let expected = vec![
1241 vec!["11", "a", "z"],
1242 vec!["22", "b", "y"],
1243 vec!["44", "d", "x"],
1244 ];
1245 assert_eq!(expected, actual);
1246 Ok(())
1247 }
1248
1249 #[tokio::test]
equijoin_implicit_syntax_reversed() -> Result<()>1250 async fn equijoin_implicit_syntax_reversed() -> Result<()> {
1251 let mut ctx = create_join_context("t1_id", "t2_id")?;
1252 let sql =
1253 "SELECT t1_id, t1_name, t2_name FROM t1, t2 WHERE t2_id = t1_id ORDER BY t1_id";
1254 let actual = execute(&mut ctx, sql).await;
1255 let expected = vec![
1256 vec!["11", "a", "z"],
1257 vec!["22", "b", "y"],
1258 vec!["44", "d", "x"],
1259 ];
1260 assert_eq!(expected, actual);
1261 Ok(())
1262 }
1263
1264 #[tokio::test]
cartesian_join() -> Result<()>1265 async fn cartesian_join() -> Result<()> {
1266 let ctx = create_join_context("t1_id", "t2_id")?;
1267 let sql = "SELECT t1_id, t1_name, t2_name FROM t1, t2 ORDER BY t1_id";
1268 let maybe_plan = ctx.create_logical_plan(&sql);
1269 assert_eq!(
1270 "This feature is not implemented: Cartesian joins are not supported",
1271 &format!("{}", maybe_plan.err().unwrap())
1272 );
1273 Ok(())
1274 }
1275
create_join_context( column_left: &str, column_right: &str, ) -> Result<ExecutionContext>1276 fn create_join_context(
1277 column_left: &str,
1278 column_right: &str,
1279 ) -> Result<ExecutionContext> {
1280 let mut ctx = ExecutionContext::new();
1281
1282 let t1_schema = Arc::new(Schema::new(vec![
1283 Field::new(column_left, DataType::UInt32, true),
1284 Field::new("t1_name", DataType::Utf8, true),
1285 ]));
1286 let t1_data = RecordBatch::try_new(
1287 t1_schema.clone(),
1288 vec![
1289 Arc::new(UInt32Array::from(vec![11, 22, 33, 44])),
1290 Arc::new(StringArray::from(vec![
1291 Some("a"),
1292 Some("b"),
1293 Some("c"),
1294 Some("d"),
1295 ])),
1296 ],
1297 )?;
1298 let t1_table = MemTable::try_new(t1_schema, vec![vec![t1_data]])?;
1299 ctx.register_table("t1", Arc::new(t1_table));
1300
1301 let t2_schema = Arc::new(Schema::new(vec![
1302 Field::new(column_right, DataType::UInt32, true),
1303 Field::new("t2_name", DataType::Utf8, true),
1304 ]));
1305 let t2_data = RecordBatch::try_new(
1306 t2_schema.clone(),
1307 vec![
1308 Arc::new(UInt32Array::from(vec![11, 22, 44, 55])),
1309 Arc::new(StringArray::from(vec![
1310 Some("z"),
1311 Some("y"),
1312 Some("x"),
1313 Some("w"),
1314 ])),
1315 ],
1316 )?;
1317 let t2_table = MemTable::try_new(t2_schema, vec![vec![t2_data]])?;
1318 ctx.register_table("t2", Arc::new(t2_table));
1319
1320 Ok(ctx)
1321 }
1322
create_join_context_qualified() -> Result<ExecutionContext>1323 fn create_join_context_qualified() -> Result<ExecutionContext> {
1324 let mut ctx = ExecutionContext::new();
1325
1326 let t1_schema = Arc::new(Schema::new(vec![
1327 Field::new("a", DataType::UInt32, true),
1328 Field::new("b", DataType::UInt32, true),
1329 Field::new("c", DataType::UInt32, true),
1330 ]));
1331 let t1_data = RecordBatch::try_new(
1332 t1_schema.clone(),
1333 vec![
1334 Arc::new(UInt32Array::from(vec![1, 2, 3, 4])),
1335 Arc::new(UInt32Array::from(vec![10, 20, 30, 40])),
1336 Arc::new(UInt32Array::from(vec![50, 60, 70, 80])),
1337 ],
1338 )?;
1339 let t1_table = MemTable::try_new(t1_schema, vec![vec![t1_data]])?;
1340 ctx.register_table("t1", Arc::new(t1_table));
1341
1342 let t2_schema = Arc::new(Schema::new(vec![
1343 Field::new("a", DataType::UInt32, true),
1344 Field::new("b", DataType::UInt32, true),
1345 Field::new("c", DataType::UInt32, true),
1346 ]));
1347 let t2_data = RecordBatch::try_new(
1348 t2_schema.clone(),
1349 vec![
1350 Arc::new(UInt32Array::from(vec![1, 2, 9, 4])),
1351 Arc::new(UInt32Array::from(vec![100, 200, 300, 400])),
1352 Arc::new(UInt32Array::from(vec![500, 600, 700, 800])),
1353 ],
1354 )?;
1355 let t2_table = MemTable::try_new(t2_schema, vec![vec![t2_data]])?;
1356 ctx.register_table("t2", Arc::new(t2_table));
1357
1358 Ok(ctx)
1359 }
1360
1361 #[tokio::test]
csv_explain()1362 async fn csv_explain() {
1363 let mut ctx = ExecutionContext::new();
1364 register_aggregate_csv_by_sql(&mut ctx).await;
1365 let sql = "EXPLAIN SELECT c1 FROM aggregate_test_100 where c2 > 10";
1366 let actual = execute(&mut ctx, sql).await;
1367 let expected = vec![
1368 vec![
1369 "logical_plan",
1370 "Projection: #c1\n Filter: #c2 Gt Int64(10)\n TableScan: aggregate_test_100 projection=None"
1371 ]
1372 ];
1373 assert_eq!(expected, actual);
1374
1375 // Also, expect same result with lowercase explain
1376 let sql = "explain SELECT c1 FROM aggregate_test_100 where c2 > 10";
1377 let actual = execute(&mut ctx, sql).await;
1378 assert_eq!(expected, actual);
1379 }
1380
1381 #[tokio::test]
csv_explain_verbose()1382 async fn csv_explain_verbose() {
1383 let mut ctx = ExecutionContext::new();
1384 register_aggregate_csv_by_sql(&mut ctx).await;
1385 let sql = "EXPLAIN VERBOSE SELECT c1 FROM aggregate_test_100 where c2 > 10";
1386 let actual = execute(&mut ctx, sql).await;
1387
1388 // flatten to a single string
1389 let actual = actual.into_iter().map(|r| r.join("\t")).collect::<String>();
1390
1391 // Don't actually test the contents of the debuging output (as
1392 // that may change and keeping this test updated will be a
1393 // pain). Instead just check for a few key pieces.
1394 assert!(actual.contains("logical_plan"), "Actual: '{}'", actual);
1395 assert!(actual.contains("physical_plan"), "Actual: '{}'", actual);
1396 assert!(actual.contains("#c2 Gt Int64(10)"), "Actual: '{}'", actual);
1397 }
1398
aggr_test_schema() -> SchemaRef1399 fn aggr_test_schema() -> SchemaRef {
1400 Arc::new(Schema::new(vec![
1401 Field::new("c1", DataType::Utf8, false),
1402 Field::new("c2", DataType::UInt32, false),
1403 Field::new("c3", DataType::Int8, false),
1404 Field::new("c4", DataType::Int16, false),
1405 Field::new("c5", DataType::Int32, false),
1406 Field::new("c6", DataType::Int64, false),
1407 Field::new("c7", DataType::UInt8, false),
1408 Field::new("c8", DataType::UInt16, false),
1409 Field::new("c9", DataType::UInt32, false),
1410 Field::new("c10", DataType::UInt64, false),
1411 Field::new("c11", DataType::Float32, false),
1412 Field::new("c12", DataType::Float64, false),
1413 Field::new("c13", DataType::Utf8, false),
1414 ]))
1415 }
1416
register_aggregate_csv_by_sql(ctx: &mut ExecutionContext)1417 async fn register_aggregate_csv_by_sql(ctx: &mut ExecutionContext) {
1418 let testdata = arrow::util::test_util::arrow_test_data();
1419
1420 // TODO: The following c9 should be migrated to UInt32 and c10 should be UInt64 once
1421 // unsigned is supported.
1422 let df = ctx
1423 .sql(&format!(
1424 "
1425 CREATE EXTERNAL TABLE aggregate_test_100 (
1426 c1 VARCHAR NOT NULL,
1427 c2 INT NOT NULL,
1428 c3 SMALLINT NOT NULL,
1429 c4 SMALLINT NOT NULL,
1430 c5 INT NOT NULL,
1431 c6 BIGINT NOT NULL,
1432 c7 SMALLINT NOT NULL,
1433 c8 INT NOT NULL,
1434 c9 BIGINT NOT NULL,
1435 c10 VARCHAR NOT NULL,
1436 c11 FLOAT NOT NULL,
1437 c12 DOUBLE NOT NULL,
1438 c13 VARCHAR NOT NULL
1439 )
1440 STORED AS CSV
1441 WITH HEADER ROW
1442 LOCATION '{}/csv/aggregate_test_100.csv'
1443 ",
1444 testdata
1445 ))
1446 .expect("Creating dataframe for CREATE EXTERNAL TABLE");
1447
1448 // Mimic the CLI and execute the resulting plan -- even though it
1449 // is effectively a no-op (returns zero rows)
1450 let results = df.collect().await.expect("Executing CREATE EXTERNAL TABLE");
1451 assert!(
1452 results.is_empty(),
1453 "Expected no rows from executing CREATE EXTERNAL TABLE"
1454 );
1455 }
1456
register_aggregate_csv(ctx: &mut ExecutionContext) -> Result<()>1457 fn register_aggregate_csv(ctx: &mut ExecutionContext) -> Result<()> {
1458 let testdata = arrow::util::test_util::arrow_test_data();
1459 let schema = aggr_test_schema();
1460 ctx.register_csv(
1461 "aggregate_test_100",
1462 &format!("{}/csv/aggregate_test_100.csv", testdata),
1463 CsvReadOptions::new().schema(&schema),
1464 )?;
1465 Ok(())
1466 }
1467
register_aggregate_simple_csv(ctx: &mut ExecutionContext) -> Result<()>1468 fn register_aggregate_simple_csv(ctx: &mut ExecutionContext) -> Result<()> {
1469 // It's not possible to use aggregate_test_100, not enought similar values to test grouping on floats
1470 let schema = Arc::new(Schema::new(vec![
1471 Field::new("c1", DataType::Float32, false),
1472 Field::new("c2", DataType::Float64, false),
1473 Field::new("c3", DataType::Boolean, false),
1474 ]));
1475
1476 ctx.register_csv(
1477 "aggregate_simple",
1478 "tests/aggregate_simple.csv",
1479 CsvReadOptions::new().schema(&schema),
1480 )?;
1481 Ok(())
1482 }
1483
register_alltypes_parquet(ctx: &mut ExecutionContext)1484 fn register_alltypes_parquet(ctx: &mut ExecutionContext) {
1485 let testdata = arrow::util::test_util::parquet_test_data();
1486 ctx.register_parquet(
1487 "alltypes_plain",
1488 &format!("{}/alltypes_plain.parquet", testdata),
1489 )
1490 .unwrap();
1491 }
1492
1493 /// Execute query and return result set as 2-d table of Vecs
1494 /// `result[row][column]`
execute(ctx: &mut ExecutionContext, sql: &str) -> Vec<Vec<String>>1495 async fn execute(ctx: &mut ExecutionContext, sql: &str) -> Vec<Vec<String>> {
1496 let msg = format!("Creating logical plan for '{}'", sql);
1497 let plan = ctx.create_logical_plan(&sql).expect(&msg);
1498 let logical_schema = plan.schema();
1499
1500 let msg = format!("Optimizing logical plan for '{}': {:?}", sql, plan);
1501 let plan = ctx.optimize(&plan).expect(&msg);
1502 let optimized_logical_schema = plan.schema();
1503
1504 let msg = format!("Creating physical plan for '{}': {:?}", sql, plan);
1505 let plan = ctx.create_physical_plan(&plan).expect(&msg);
1506 let physical_schema = plan.schema();
1507
1508 let msg = format!("Executing physical plan for '{}': {:?}", sql, plan);
1509 let results = collect(plan).await.expect(&msg);
1510
1511 assert_eq!(logical_schema.as_ref(), optimized_logical_schema.as_ref());
1512 assert_eq!(
1513 logical_schema.as_ref(),
1514 &physical_schema.to_dfschema().unwrap()
1515 );
1516
1517 result_vec(&results)
1518 }
1519
1520 /// Specialised String representation
col_str(column: &ArrayRef, row_index: usize) -> String1521 fn col_str(column: &ArrayRef, row_index: usize) -> String {
1522 if column.is_null(row_index) {
1523 return "NULL".to_string();
1524 }
1525
1526 // Special case ListArray as there is no pretty print support for it yet
1527 if let DataType::FixedSizeList(_, n) = column.data_type() {
1528 let array = column
1529 .as_any()
1530 .downcast_ref::<FixedSizeListArray>()
1531 .unwrap()
1532 .value(row_index);
1533
1534 let mut r = Vec::with_capacity(*n as usize);
1535 for i in 0..*n {
1536 r.push(col_str(&array, i as usize));
1537 }
1538 return format!("[{}]", r.join(","));
1539 }
1540
1541 array_value_to_string(column, row_index)
1542 .ok()
1543 .unwrap_or_else(|| "???".to_string())
1544 }
1545
1546 /// Converts the results into a 2d array of strings, `result[row][column]`
1547 /// Special cases nulls to NULL for testing
result_vec(results: &[RecordBatch]) -> Vec<Vec<String>>1548 fn result_vec(results: &[RecordBatch]) -> Vec<Vec<String>> {
1549 let mut result = vec![];
1550 for batch in results {
1551 for row_index in 0..batch.num_rows() {
1552 let row_vec = batch
1553 .columns()
1554 .iter()
1555 .map(|column| col_str(column, row_index))
1556 .collect();
1557 result.push(row_vec);
1558 }
1559 }
1560 result
1561 }
1562
generic_query_length<T: 'static + Array + From<Vec<&'static str>>>( datatype: DataType, ) -> Result<()>1563 async fn generic_query_length<T: 'static + Array + From<Vec<&'static str>>>(
1564 datatype: DataType,
1565 ) -> Result<()> {
1566 let schema = Arc::new(Schema::new(vec![Field::new("c1", datatype, false)]));
1567
1568 let data = RecordBatch::try_new(
1569 schema.clone(),
1570 vec![Arc::new(T::from(vec!["", "a", "aa", "aaa"]))],
1571 )?;
1572
1573 let table = MemTable::try_new(schema, vec![vec![data]])?;
1574
1575 let mut ctx = ExecutionContext::new();
1576 ctx.register_table("test", Arc::new(table));
1577 let sql = "SELECT length(c1) FROM test";
1578 let actual = execute(&mut ctx, sql).await;
1579 let expected = vec![vec!["0"], vec!["1"], vec!["2"], vec!["3"]];
1580 assert_eq!(expected, actual);
1581 Ok(())
1582 }
1583
1584 #[tokio::test]
query_length() -> Result<()>1585 async fn query_length() -> Result<()> {
1586 generic_query_length::<StringArray>(DataType::Utf8).await
1587 }
1588
1589 #[tokio::test]
query_large_length() -> Result<()>1590 async fn query_large_length() -> Result<()> {
1591 generic_query_length::<LargeStringArray>(DataType::LargeUtf8).await
1592 }
1593
1594 #[tokio::test]
query_not() -> Result<()>1595 async fn query_not() -> Result<()> {
1596 let schema = Arc::new(Schema::new(vec![Field::new("c1", DataType::Boolean, true)]));
1597
1598 let data = RecordBatch::try_new(
1599 schema.clone(),
1600 vec![Arc::new(BooleanArray::from(vec![
1601 Some(false),
1602 None,
1603 Some(true),
1604 ]))],
1605 )?;
1606
1607 let table = MemTable::try_new(schema, vec![vec![data]])?;
1608
1609 let mut ctx = ExecutionContext::new();
1610 ctx.register_table("test", Arc::new(table));
1611 let sql = "SELECT NOT c1 FROM test";
1612 let actual = execute(&mut ctx, sql).await;
1613 let expected = vec![vec!["true"], vec!["NULL"], vec!["false"]];
1614 assert_eq!(expected, actual);
1615 Ok(())
1616 }
1617
1618 #[tokio::test]
query_concat() -> Result<()>1619 async fn query_concat() -> Result<()> {
1620 let schema = Arc::new(Schema::new(vec![
1621 Field::new("c1", DataType::Utf8, false),
1622 Field::new("c2", DataType::Int32, true),
1623 ]));
1624
1625 let data = RecordBatch::try_new(
1626 schema.clone(),
1627 vec![
1628 Arc::new(StringArray::from(vec!["", "a", "aa", "aaa"])),
1629 Arc::new(Int32Array::from(vec![Some(0), Some(1), None, Some(3)])),
1630 ],
1631 )?;
1632
1633 let table = MemTable::try_new(schema, vec![vec![data]])?;
1634
1635 let mut ctx = ExecutionContext::new();
1636 ctx.register_table("test", Arc::new(table));
1637 let sql = "SELECT concat(c1, '-hi-', cast(c2 as varchar)) FROM test";
1638 let actual = execute(&mut ctx, sql).await;
1639 let expected = vec![
1640 vec!["-hi-0"],
1641 vec!["a-hi-1"],
1642 vec!["NULL"],
1643 vec!["aaa-hi-3"],
1644 ];
1645 assert_eq!(expected, actual);
1646 Ok(())
1647 }
1648
1649 #[tokio::test]
query_array() -> Result<()>1650 async fn query_array() -> Result<()> {
1651 let schema = Arc::new(Schema::new(vec![
1652 Field::new("c1", DataType::Utf8, false),
1653 Field::new("c2", DataType::Int32, true),
1654 ]));
1655
1656 let data = RecordBatch::try_new(
1657 schema.clone(),
1658 vec![
1659 Arc::new(StringArray::from(vec!["", "a", "aa", "aaa"])),
1660 Arc::new(Int32Array::from(vec![Some(0), Some(1), None, Some(3)])),
1661 ],
1662 )?;
1663
1664 let table = MemTable::try_new(schema, vec![vec![data]])?;
1665
1666 let mut ctx = ExecutionContext::new();
1667 ctx.register_table("test", Arc::new(table));
1668 let sql = "SELECT array(c1, cast(c2 as varchar)) FROM test";
1669 let actual = execute(&mut ctx, sql).await;
1670 let expected = vec![
1671 vec!["[,0]"],
1672 vec!["[a,1]"],
1673 vec!["[aa,NULL]"],
1674 vec!["[aaa,3]"],
1675 ];
1676 assert_eq!(expected, actual);
1677 Ok(())
1678 }
1679
1680 #[tokio::test]
csv_query_sum_cast()1681 async fn csv_query_sum_cast() {
1682 let mut ctx = ExecutionContext::new();
1683 register_aggregate_csv_by_sql(&mut ctx).await;
1684 // c8 = i32; c9 = i64
1685 let sql = "SELECT c8 + c9 FROM aggregate_test_100";
1686 // check that the physical and logical schemas are equal
1687 execute(&mut ctx, sql).await;
1688 }
1689
1690 #[tokio::test]
query_where_neg_num() -> Result<()>1691 async fn query_where_neg_num() -> Result<()> {
1692 let mut ctx = ExecutionContext::new();
1693 register_aggregate_csv_by_sql(&mut ctx).await;
1694
1695 // Negative numbers do not parse correctly as of Arrow 2.0.0
1696 let sql = "select c7, c8 from aggregate_test_100 where c7 >= -2 and c7 < 10";
1697 let actual = execute(&mut ctx, sql).await;
1698 let expected = vec![
1699 vec!["7", "45465"],
1700 vec!["5", "40622"],
1701 vec!["0", "61069"],
1702 vec!["2", "20120"],
1703 vec!["4", "39363"],
1704 ];
1705 assert_eq!(expected, actual);
1706
1707 // Also check floating point neg numbers
1708 let sql = "select c7, c8 from aggregate_test_100 where c7 >= -2.9 and c7 < 10";
1709 let actual = execute(&mut ctx, sql).await;
1710 let expected = vec![
1711 vec!["7", "45465"],
1712 vec!["5", "40622"],
1713 vec!["0", "61069"],
1714 vec!["2", "20120"],
1715 vec!["4", "39363"],
1716 ];
1717 assert_eq!(expected, actual);
1718 Ok(())
1719 }
1720
1721 #[tokio::test]
like() -> Result<()>1722 async fn like() -> Result<()> {
1723 let mut ctx = ExecutionContext::new();
1724 register_aggregate_csv_by_sql(&mut ctx).await;
1725 let sql = "SELECT COUNT(c1) FROM aggregate_test_100 WHERE c13 LIKE '%FB%'";
1726 // check that the physical and logical schemas are equal
1727 let actual = execute(&mut ctx, sql).await;
1728
1729 let expected = vec![vec!["1"]];
1730 assert_eq!(expected, actual);
1731 Ok(())
1732 }
1733
make_timestamp_nano_table() -> Result<Arc<MemTable>>1734 fn make_timestamp_nano_table() -> Result<Arc<MemTable>> {
1735 let schema = Arc::new(Schema::new(vec![
1736 Field::new("ts", DataType::Timestamp(TimeUnit::Nanosecond, None), false),
1737 Field::new("value", DataType::Int32, true),
1738 ]));
1739
1740 let mut builder = TimestampNanosecondArray::builder(3);
1741
1742 builder.append_value(1599572549190855000)?; // 2020-09-08T13:42:29.190855+00:00
1743 builder.append_value(1599568949190855000)?; // 2020-09-08T12:42:29.190855+00:00
1744 builder.append_value(1599565349190855000)?; // 2020-09-08T11:42:29.190855+00:00
1745
1746 let data = RecordBatch::try_new(
1747 schema.clone(),
1748 vec![
1749 Arc::new(builder.finish()),
1750 Arc::new(Int32Array::from(vec![Some(1), Some(2), Some(3)])),
1751 ],
1752 )?;
1753 let table = MemTable::try_new(schema, vec![vec![data]])?;
1754 Ok(Arc::new(table))
1755 }
1756
1757 #[tokio::test]
to_timestamp() -> Result<()>1758 async fn to_timestamp() -> Result<()> {
1759 let mut ctx = ExecutionContext::new();
1760 ctx.register_table("ts_data", make_timestamp_nano_table()?);
1761
1762 let sql = "SELECT COUNT(*) FROM ts_data where ts > to_timestamp('2020-09-08T12:00:00+00:00')";
1763 let actual = execute(&mut ctx, sql).await;
1764
1765 let expected = vec![vec!["2"]];
1766 assert_eq!(expected, actual);
1767 Ok(())
1768 }
1769
1770 #[tokio::test]
query_is_null() -> Result<()>1771 async fn query_is_null() -> Result<()> {
1772 let schema = Arc::new(Schema::new(vec![Field::new("c1", DataType::Float64, true)]));
1773
1774 let data = RecordBatch::try_new(
1775 schema.clone(),
1776 vec![Arc::new(Float64Array::from(vec![
1777 Some(1.0),
1778 None,
1779 Some(f64::NAN),
1780 ]))],
1781 )?;
1782
1783 let table = MemTable::try_new(schema, vec![vec![data]])?;
1784
1785 let mut ctx = ExecutionContext::new();
1786 ctx.register_table("test", Arc::new(table));
1787 let sql = "SELECT c1 IS NULL FROM test";
1788 let actual = execute(&mut ctx, sql).await;
1789 let expected = vec![vec!["false"], vec!["true"], vec!["false"]];
1790 assert_eq!(expected, actual);
1791 Ok(())
1792 }
1793
1794 #[tokio::test]
query_is_not_null() -> Result<()>1795 async fn query_is_not_null() -> Result<()> {
1796 let schema = Arc::new(Schema::new(vec![Field::new("c1", DataType::Float64, true)]));
1797
1798 let data = RecordBatch::try_new(
1799 schema.clone(),
1800 vec![Arc::new(Float64Array::from(vec![
1801 Some(1.0),
1802 None,
1803 Some(f64::NAN),
1804 ]))],
1805 )?;
1806
1807 let table = MemTable::try_new(schema, vec![vec![data]])?;
1808
1809 let mut ctx = ExecutionContext::new();
1810 ctx.register_table("test", Arc::new(table));
1811 let sql = "SELECT c1 IS NOT NULL FROM test";
1812 let actual = execute(&mut ctx, sql).await;
1813 let expected = vec![vec!["true"], vec!["false"], vec!["true"]];
1814
1815 assert_eq!(expected, actual);
1816 Ok(())
1817 }
1818
1819 #[tokio::test]
query_count_distinct() -> Result<()>1820 async fn query_count_distinct() -> Result<()> {
1821 let schema = Arc::new(Schema::new(vec![Field::new("c1", DataType::Int32, true)]));
1822
1823 let data = RecordBatch::try_new(
1824 schema.clone(),
1825 vec![Arc::new(Int32Array::from(vec![
1826 Some(0),
1827 Some(1),
1828 None,
1829 Some(3),
1830 Some(3),
1831 ]))],
1832 )?;
1833
1834 let table = MemTable::try_new(schema, vec![vec![data]])?;
1835
1836 let mut ctx = ExecutionContext::new();
1837 ctx.register_table("test", Arc::new(table));
1838 let sql = "SELECT COUNT(DISTINCT c1) FROM test";
1839 let actual = execute(&mut ctx, sql).await;
1840 let expected = vec![vec!["3".to_string()]];
1841 assert_eq!(expected, actual);
1842 Ok(())
1843 }
1844
1845 #[tokio::test]
query_on_string_dictionary() -> Result<()>1846 async fn query_on_string_dictionary() -> Result<()> {
1847 // Test to ensure DataFusion can operate on dictionary types
1848 // Use StringDictionary (32 bit indexes = keys)
1849 let field_type =
1850 DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));
1851 let schema = Arc::new(Schema::new(vec![Field::new("d1", field_type, true)]));
1852
1853 let keys_builder = PrimitiveBuilder::<Int32Type>::new(10);
1854 let values_builder = StringBuilder::new(10);
1855 let mut builder = StringDictionaryBuilder::new(keys_builder, values_builder);
1856
1857 builder.append("one")?;
1858 builder.append_null()?;
1859 builder.append("three")?;
1860 let array = Arc::new(builder.finish());
1861
1862 let data = RecordBatch::try_new(schema.clone(), vec![array])?;
1863
1864 let table = MemTable::try_new(schema, vec![vec![data]])?;
1865 let mut ctx = ExecutionContext::new();
1866 ctx.register_table("test", Arc::new(table));
1867
1868 // Basic SELECT
1869 let sql = "SELECT * FROM test";
1870 let actual = execute(&mut ctx, sql).await;
1871 let expected = vec![vec!["one"], vec!["NULL"], vec!["three"]];
1872 assert_eq!(expected, actual);
1873
1874 // basic filtering
1875 let sql = "SELECT * FROM test WHERE d1 IS NOT NULL";
1876 let actual = execute(&mut ctx, sql).await;
1877 let expected = vec![vec!["one"], vec!["three"]];
1878 assert_eq!(expected, actual);
1879
1880 // filtering with constant
1881 let sql = "SELECT * FROM test WHERE d1 = 'three'";
1882 let actual = execute(&mut ctx, sql).await;
1883 let expected = vec![vec!["three"]];
1884 assert_eq!(expected, actual);
1885
1886 // Expression evaluation
1887 let sql = "SELECT concat(d1, '-foo') FROM test";
1888 let actual = execute(&mut ctx, sql).await;
1889 let expected = vec![vec!["one-foo"], vec!["NULL"], vec!["three-foo"]];
1890 assert_eq!(expected, actual);
1891
1892 // aggregation
1893 let sql = "SELECT COUNT(d1) FROM test";
1894 let actual = execute(&mut ctx, sql).await;
1895 let expected = vec![vec!["2"]];
1896 assert_eq!(expected, actual);
1897
1898 Ok(())
1899 }
1900
1901 #[tokio::test]
query_without_from() -> Result<()>1902 async fn query_without_from() -> Result<()> {
1903 // Test for SELECT <expression> without FROM.
1904 // Should evaluate expressions in project position.
1905 let mut ctx = ExecutionContext::new();
1906
1907 let sql = "SELECT 1";
1908 let actual = execute(&mut ctx, sql).await;
1909 let expected = vec![vec!["1"]];
1910 assert_eq!(expected, actual);
1911
1912 let sql = "SELECT 1+2, 3/4, cos(0)";
1913 let actual = execute(&mut ctx, sql).await;
1914 let expected = vec![vec!["3", "0", "1"]];
1915 assert_eq!(expected, actual);
1916
1917 Ok(())
1918 }
1919
1920 #[tokio::test]
query_scalar_minus_array() -> Result<()>1921 async fn query_scalar_minus_array() -> Result<()> {
1922 let schema = Arc::new(Schema::new(vec![Field::new("c1", DataType::Int32, true)]));
1923
1924 let data = RecordBatch::try_new(
1925 schema.clone(),
1926 vec![Arc::new(Int32Array::from(vec![
1927 Some(0),
1928 Some(1),
1929 None,
1930 Some(3),
1931 ]))],
1932 )?;
1933
1934 let table = MemTable::try_new(schema, vec![vec![data]])?;
1935
1936 let mut ctx = ExecutionContext::new();
1937 ctx.register_table("test", Arc::new(table));
1938 let sql = "SELECT 4 - c1 FROM test";
1939 let actual = execute(&mut ctx, sql).await;
1940 let expected = vec![vec!["4"], vec!["3"], vec!["NULL"], vec!["1"]];
1941 assert_eq!(expected, actual);
1942 Ok(())
1943 }
1944
/// Asserts that two string tables hold the same floating-point values.
///
/// Every cell of `expected` and `received` is parsed as `f64`, and cells at
/// the same `[row][column]` position must differ by at most
/// `2.0 * f64::EPSILON` (an absolute tolerance).
///
/// # Panics
///
/// Panics if the tables have a different number of rows, if a pair of rows
/// has a different number of columns, if any cell does not parse as `f64`, or
/// if a pair of values differs by more than the tolerance. A NaN on either
/// side always fails, because the difference is never within the tolerance.
fn assert_float_eq<T>(expected: &[Vec<T>], received: &[Vec<String>])
where
    T: AsRef<str>,
{
    // Check the shape first: zipping alone would silently ignore any rows or
    // values that one side has and the other lacks.
    assert_eq!(
        expected.len(),
        received.len(),
        "row count mismatch: expected {} rows, received {}",
        expected.len(),
        received.len()
    );

    for (row_index, (want_row, got_row)) in expected.iter().zip(received).enumerate() {
        assert_eq!(
            want_row.len(),
            got_row.len(),
            "column count mismatch in row {}: expected {}, received {}",
            row_index,
            want_row.len(),
            got_row.len()
        );

        for (col_index, (want, got)) in want_row.iter().zip(got_row).enumerate() {
            let want_value = want.as_ref().parse::<f64>().unwrap_or_else(|e| {
                panic!(
                    "expected[{}][{}] = '{}' is not a float: {}",
                    row_index,
                    col_index,
                    want.as_ref(),
                    e
                )
            });
            let got_value = got.as_str().parse::<f64>().unwrap_or_else(|e| {
                panic!(
                    "received[{}][{}] = '{}' is not a float: {}",
                    row_index, col_index, got, e
                )
            });

            assert!(
                (want_value - got_value).abs() <= 2.0 * f64::EPSILON,
                "float mismatch at [{}][{}]: expected {}, received {}",
                row_index,
                col_index,
                want_value,
                got_value
            );
        }
    }
}
1961
1962 #[tokio::test]
csv_between_expr() -> Result<()>1963 async fn csv_between_expr() -> Result<()> {
1964 let mut ctx = ExecutionContext::new();
1965 register_aggregate_csv(&mut ctx)?;
1966 let sql = "SELECT c4 FROM aggregate_test_100 WHERE c12 BETWEEN 0.995 AND 1.0";
1967 let mut actual = execute(&mut ctx, sql).await;
1968 actual.sort();
1969 let expected = vec![vec!["10837"]];
1970 assert_eq!(expected, actual);
1971 Ok(())
1972 }
1973
1974 #[tokio::test]
csv_between_expr_negated() -> Result<()>1975 async fn csv_between_expr_negated() -> Result<()> {
1976 let mut ctx = ExecutionContext::new();
1977 register_aggregate_csv(&mut ctx)?;
1978 let sql = "SELECT c4 FROM aggregate_test_100 WHERE c12 NOT BETWEEN 0 AND 0.995";
1979 let mut actual = execute(&mut ctx, sql).await;
1980 actual.sort();
1981 let expected = vec![vec!["10837"]];
1982 assert_eq!(expected, actual);
1983 Ok(())
1984 }
1985
1986 #[tokio::test]
csv_group_by_date() -> Result<()>1987 async fn csv_group_by_date() -> Result<()> {
1988 let mut ctx = ExecutionContext::new();
1989 let schema = Arc::new(Schema::new(vec![
1990 Field::new("date", DataType::Date32, false),
1991 Field::new("cnt", DataType::Int32, false),
1992 ]));
1993 let data = RecordBatch::try_new(
1994 schema.clone(),
1995 vec![
1996 Arc::new(Date32Array::from(vec![
1997 Some(100),
1998 Some(100),
1999 Some(100),
2000 Some(101),
2001 Some(101),
2002 Some(101),
2003 ])),
2004 Arc::new(Int32Array::from(vec![
2005 Some(1),
2006 Some(2),
2007 Some(3),
2008 Some(3),
2009 Some(3),
2010 Some(3),
2011 ])),
2012 ],
2013 )?;
2014 let table = MemTable::try_new(schema, vec![vec![data]])?;
2015
2016 ctx.register_table("dates", Arc::new(table));
2017 let sql = "SELECT SUM(cnt) FROM dates GROUP BY date";
2018 let actual = execute(&mut ctx, sql).await;
2019 let mut actual: Vec<String> = actual.iter().flatten().cloned().collect();
2020 actual.sort();
2021 let expected = vec!["6", "9"];
2022 assert_eq!(expected, actual);
2023 Ok(())
2024 }
2025
2026 #[tokio::test]
string_expressions() -> Result<()>2027 async fn string_expressions() -> Result<()> {
2028 let mut ctx = ExecutionContext::new();
2029 let sql = "SELECT
2030 char_length('tom') AS char_length
2031 ,char_length(NULL) AS char_length_null
2032 ,character_length('tom') AS character_length
2033 ,character_length(NULL) AS character_length_null
2034 ,lower('TOM') AS lower
2035 ,lower(NULL) AS lower_null
2036 ,upper('tom') AS upper
2037 ,upper(NULL) AS upper_null
2038 ,trim(' tom ') AS trim
2039 ,trim(NULL) AS trim_null
2040 ,ltrim(' tom ') AS trim_left
2041 ,rtrim(' tom ') AS trim_right
2042 ";
2043 let actual = execute(&mut ctx, sql).await;
2044
2045 let expected = vec![vec![
2046 "3", "NULL", "3", "NULL", "tom", "NULL", "TOM", "NULL", "tom", "NULL", "tom ",
2047 " tom",
2048 ]];
2049 assert_eq!(expected, actual);
2050 Ok(())
2051 }
2052
2053 #[tokio::test]
boolean_expressions() -> Result<()>2054 async fn boolean_expressions() -> Result<()> {
2055 let mut ctx = ExecutionContext::new();
2056 let sql = "SELECT
2057 true AS val_1,
2058 false AS val_2
2059 ";
2060 let actual = execute(&mut ctx, sql).await;
2061
2062 let expected = vec![vec!["true", "false"]];
2063 assert_eq!(expected, actual);
2064 Ok(())
2065 }
2066
2067 #[tokio::test]
interval_expressions() -> Result<()>2068 async fn interval_expressions() -> Result<()> {
2069 let mut ctx = ExecutionContext::new();
2070 let sql = "SELECT
2071 (interval '1') as interval_1,
2072 (interval '1 second') as interval_2,
2073 (interval '500 milliseconds') as interval_3,
2074 (interval '5 second') as interval_4,
2075 (interval '1 minute') as interval_5,
2076 (interval '0.5 minute') as interval_6,
2077 (interval '.5 minute') as interval_7,
2078 (interval '5 minute') as interval_8,
2079 (interval '5 minute 1 second') as interval_9,
2080 (interval '1 hour') as interval_10,
2081 (interval '5 hour') as interval_11,
2082 (interval '1 day') as interval_12,
2083 (interval '1 day 1') as interval_13,
2084 (interval '0.5') as interval_14,
2085 (interval '0.5 day 1') as interval_15,
2086 (interval '0.49 day') as interval_16,
2087 (interval '0.499 day') as interval_17,
2088 (interval '0.4999 day') as interval_18,
2089 (interval '0.49999 day') as interval_19,
2090 (interval '0.49999999999 day') as interval_20,
2091 (interval '5 day') as interval_21,
2092 (interval '5 day 4 hours 3 minutes 2 seconds 100 milliseconds') as interval_22,
2093 (interval '0.5 month') as interval_23,
2094 (interval '1 month') as interval_24,
2095 (interval '5 month') as interval_25,
2096 (interval '13 month') as interval_26,
2097 (interval '0.5 year') as interval_27,
2098 (interval '1 year') as interval_28,
2099 (interval '2 year') as interval_29
2100 ";
2101 let actual = execute(&mut ctx, sql).await;
2102
2103 let expected = vec![vec![
2104 "0 years 0 mons 0 days 0 hours 0 mins 1.00 secs",
2105 "0 years 0 mons 0 days 0 hours 0 mins 1.00 secs",
2106 "0 years 0 mons 0 days 0 hours 0 mins 0.500 secs",
2107 "0 years 0 mons 0 days 0 hours 0 mins 5.00 secs",
2108 "0 years 0 mons 0 days 0 hours 1 mins 0.00 secs",
2109 "0 years 0 mons 0 days 0 hours 0 mins 30.00 secs",
2110 "0 years 0 mons 0 days 0 hours 0 mins 30.00 secs",
2111 "0 years 0 mons 0 days 0 hours 5 mins 0.00 secs",
2112 "0 years 0 mons 0 days 0 hours 5 mins 1.00 secs",
2113 "0 years 0 mons 0 days 1 hours 0 mins 0.00 secs",
2114 "0 years 0 mons 0 days 5 hours 0 mins 0.00 secs",
2115 "0 years 0 mons 1 days 0 hours 0 mins 0.00 secs",
2116 "0 years 0 mons 1 days 0 hours 0 mins 1.00 secs",
2117 "0 years 0 mons 0 days 0 hours 0 mins 0.500 secs",
2118 "0 years 0 mons 0 days 12 hours 0 mins 1.00 secs",
2119 "0 years 0 mons 0 days 11 hours 45 mins 36.00 secs",
2120 "0 years 0 mons 0 days 11 hours 58 mins 33.596 secs",
2121 "0 years 0 mons 0 days 11 hours 59 mins 51.364 secs",
2122 "0 years 0 mons 0 days 11 hours 59 mins 59.136 secs",
2123 "0 years 0 mons 0 days 12 hours 0 mins 0.00 secs",
2124 "0 years 0 mons 5 days 0 hours 0 mins 0.00 secs",
2125 "0 years 0 mons 5 days 4 hours 3 mins 2.100 secs",
2126 "0 years 0 mons 15 days 0 hours 0 mins 0.00 secs",
2127 "0 years 1 mons 0 days 0 hours 0 mins 0.00 secs",
2128 "0 years 5 mons 0 days 0 hours 0 mins 0.00 secs",
2129 "1 years 1 mons 0 days 0 hours 0 mins 0.00 secs",
2130 "0 years 6 mons 0 days 0 hours 0 mins 0.00 secs",
2131 "1 years 0 mons 0 days 0 hours 0 mins 0.00 secs",
2132 "2 years 0 mons 0 days 0 hours 0 mins 0.00 secs",
2133 ]];
2134 assert_eq!(expected, actual);
2135 Ok(())
2136 }
2137
2138 #[tokio::test]
crypto_expressions() -> Result<()>2139 async fn crypto_expressions() -> Result<()> {
2140 let mut ctx = ExecutionContext::new();
2141 let sql = "SELECT
2142 md5('tom') AS md5_tom,
2143 md5('') AS md5_empty_str,
2144 md5(null) AS md5_null,
2145 sha224('tom') AS sha224_tom,
2146 sha224('') AS sha224_empty_str,
2147 sha224(null) AS sha224_null,
2148 sha256('tom') AS sha256_tom,
2149 sha256('') AS sha256_empty_str,
2150 sha384('tom') AS sha348_tom,
2151 sha384('') AS sha384_empty_str,
2152 sha512('tom') AS sha512_tom,
2153 sha512('') AS sha512_empty_str
2154 ";
2155 let actual = execute(&mut ctx, sql).await;
2156
2157 let expected = vec![vec![
2158 "34b7da764b21d298ef307d04d8152dc5",
2159 "d41d8cd98f00b204e9800998ecf8427e",
2160 "NULL",
2161 "0bf6cb62649c42a9ae3876ab6f6d92ad36cb5414e495f8873292be4d",
2162 "d14a028c2a3a2bc9476102bb288234c415a2b01f828ea62ac5b3e42f",
2163 "NULL",
2164 "e1608f75c5d7813f3d4031cb30bfb786507d98137538ff8e128a6ff74e84e643",
2165 "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855",
2166 "096f5b68aa77848e4fdf5c1c0b350de2dbfad60ffd7c25d9ea07c6c19b8a4d55a9187eb117c557883f58c16dfac3e343",
2167 "38b060a751ac96384cd9327eb1b1e36a21fdb71114be07434c0cc7bf63f6e1da274edebfe76f65fbd51ad2f14898b95b",
2168 "6e1b9b3fe840680e37051f7ad5e959d6f39ad0f8885d855166f55c659469d3c8b78118c44a2a49c72ddb481cd6d8731034e11cc030070ba843a90b3495cb8d3e",
2169 "cf83e1357eefb8bdf1542850d66d8007d620e4050b5715dc83f4a921d36ce9ce47d0d13c5d85f2b0ff8318d2877eec2f63b931bd47417a81a538327af927da3e"
2170 ]];
2171 assert_eq!(expected, actual);
2172 Ok(())
2173 }
2174
2175 #[tokio::test]
extract_date_part() -> Result<()>2176 async fn extract_date_part() -> Result<()> {
2177 let mut ctx = ExecutionContext::new();
2178 let sql = "SELECT
2179 date_part('hour', CAST('2020-01-01' AS DATE)) AS hr1,
2180 EXTRACT(HOUR FROM CAST('2020-01-01' AS DATE)) AS hr2,
2181 EXTRACT(HOUR FROM to_timestamp('2020-09-08T12:00:00+00:00')) AS hr3,
2182 date_part('YEAR', CAST('2000-01-01' AS DATE)) AS year1,
2183 EXTRACT(year FROM to_timestamp('2020-09-08T12:00:00+00:00')) AS year2
2184 ";
2185
2186 let actual = execute(&mut ctx, sql).await;
2187
2188 let expected = vec![vec!["0", "0", "12", "2000", "2020"]];
2189 assert_eq!(expected, actual);
2190 Ok(())
2191 }
2192
2193 #[tokio::test]
in_list_array() -> Result<()>2194 async fn in_list_array() -> Result<()> {
2195 let mut ctx = ExecutionContext::new();
2196 register_aggregate_csv_by_sql(&mut ctx).await;
2197 let sql = "SELECT
2198 c1 IN ('a', 'c') AS utf8_in_true
2199 ,c1 IN ('x', 'y') AS utf8_in_false
2200 ,c1 NOT IN ('x', 'y') AS utf8_not_in_true
2201 ,c1 NOT IN ('a', 'c') AS utf8_not_in_false
2202 ,CAST(CAST(c1 AS int) AS varchar) IN ('a', 'c') AS utf8_in_null
2203 FROM aggregate_test_100 WHERE c12 < 0.05";
2204 let actual = execute(&mut ctx, sql).await;
2205 let expected = vec![
2206 vec!["true", "false", "true", "false", "NULL"],
2207 vec!["true", "false", "true", "false", "NULL"],
2208 vec!["true", "false", "true", "false", "NULL"],
2209 vec!["false", "false", "true", "true", "NULL"],
2210 vec!["false", "false", "true", "true", "NULL"],
2211 vec!["false", "false", "true", "true", "NULL"],
2212 vec!["false", "false", "true", "true", "NULL"],
2213 ];
2214 assert_eq!(expected, actual);
2215 Ok(())
2216 }
2217
2218 #[tokio::test]
in_list_scalar() -> Result<()>2219 async fn in_list_scalar() -> Result<()> {
2220 let mut ctx = ExecutionContext::new();
2221 let sql = "SELECT
2222 'a' IN ('a','b') AS utf8_in_true
2223 ,'c' IN ('a','b') AS utf8_in_false
2224 ,'c' NOT IN ('a','b') AS utf8_not_in_true
2225 ,'a' NOT IN ('a','b') AS utf8_not_in_false
2226 ,NULL IN ('a','b') AS utf8_in_null
2227 ,NULL NOT IN ('a','b') AS utf8_not_in_null
2228 ,'a' IN ('a','b',NULL) AS utf8_in_null_true
2229 ,'c' IN ('a','b',NULL) AS utf8_in_null_null
2230 ,'a' NOT IN ('a','b',NULL) AS utf8_not_in_null_false
2231 ,'c' NOT IN ('a','b',NULL) AS utf8_not_in_null_null
2232
2233 ,0 IN (0,1,2) AS int64_in_true
2234 ,3 IN (0,1,2) AS int64_in_false
2235 ,3 NOT IN (0,1,2) AS int64_not_in_true
2236 ,0 NOT IN (0,1,2) AS int64_not_in_false
2237 ,NULL IN (0,1,2) AS int64_in_null
2238 ,NULL NOT IN (0,1,2) AS int64_not_in_null
2239 ,0 IN (0,1,2,NULL) AS int64_in_null_true
2240 ,3 IN (0,1,2,NULL) AS int64_in_null_null
2241 ,0 NOT IN (0,1,2,NULL) AS int64_not_in_null_false
2242 ,3 NOT IN (0,1,2,NULL) AS int64_not_in_null_null
2243
2244 ,0.0 IN (0.0,0.1,0.2) AS float64_in_true
2245 ,0.3 IN (0.0,0.1,0.2) AS float64_in_false
2246 ,0.3 NOT IN (0.0,0.1,0.2) AS float64_not_in_true
2247 ,0.0 NOT IN (0.0,0.1,0.2) AS float64_not_in_false
2248 ,NULL IN (0.0,0.1,0.2) AS float64_in_null
2249 ,NULL NOT IN (0.0,0.1,0.2) AS float64_not_in_null
2250 ,0.0 IN (0.0,0.1,0.2,NULL) AS float64_in_null_true
2251 ,0.3 IN (0.0,0.1,0.2,NULL) AS float64_in_null_null
2252 ,0.0 NOT IN (0.0,0.1,0.2,NULL) AS float64_not_in_null_false
2253 ,0.3 NOT IN (0.0,0.1,0.2,NULL) AS float64_not_in_null_null
2254
2255 ,'1' IN ('a','b',1) AS utf8_cast_in_true
2256 ,'2' IN ('a','b',1) AS utf8_cast_in_false
2257 ,'2' NOT IN ('a','b',1) AS utf8_cast_not_in_true
2258 ,'1' NOT IN ('a','b',1) AS utf8_cast_not_in_false
2259 ,NULL IN ('a','b',1) AS utf8_cast_in_null
2260 ,NULL NOT IN ('a','b',1) AS utf8_cast_not_in_null
2261 ,'1' IN ('a','b',NULL,1) AS utf8_cast_in_null_true
2262 ,'2' IN ('a','b',NULL,1) AS utf8_cast_in_null_null
2263 ,'1' NOT IN ('a','b',NULL,1) AS utf8_cast_not_in_null_false
2264 ,'2' NOT IN ('a','b',NULL,1) AS utf8_cast_not_in_null_null
2265 ";
2266 let actual = execute(&mut ctx, sql).await;
2267
2268 let expected = vec![vec![
2269 "true", "false", "true", "false", "NULL", "NULL", "true", "NULL", "false",
2270 "NULL", "true", "false", "true", "false", "NULL", "NULL", "true", "NULL",
2271 "false", "NULL", "true", "false", "true", "false", "NULL", "NULL", "true",
2272 "NULL", "false", "NULL", "true", "false", "true", "false", "NULL", "NULL",
2273 "true", "NULL", "false", "NULL",
2274 ]];
2275 assert_eq!(expected, actual);
2276 Ok(())
2277 }
2278
// TODO: Tests to prove the correct implementation of INNER JOINs with qualified names.
// See https://issues.apache.org/jira/projects/ARROW/issues/ARROW-11432
2281 #[tokio::test]
2282 #[ignore]
inner_join_qualified_names() -> Result<()>2283 async fn inner_join_qualified_names() -> Result<()> {
2284 // Setup the statements that test qualified names function correctly.
2285 let equivalent_sql = [
2286 "SELECT t1.a, t1.b, t1.c, t2.a, t2.b, t2.c
2287 FROM t1
2288 INNER JOIN t2 ON t1.a = t2.a
2289 ORDER BY t1.a",
2290 "SELECT t1.a, t1.b, t1.c, t2.a, t2.b, t2.c
2291 FROM t1
2292 INNER JOIN t2 ON t2.a = t1.a
2293 ORDER BY t1.a",
2294 ];
2295
2296 let expected = vec![
2297 vec!["1", "10", "50", "1", "100", "500"],
2298 vec!["2", "20", "60", "2", "20", "600"],
2299 vec!["4", "40", "80", "4", "400", "800"],
2300 ];
2301
2302 for sql in equivalent_sql.iter() {
2303 let mut ctx = create_join_context_qualified()?;
2304 let actual = execute(&mut ctx, sql).await;
2305 assert_eq!(expected, actual);
2306 }
2307 Ok(())
2308 }
2309