1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements.  See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership.  The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License.  You may obtain a copy of the License at
8 //
9 //   http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied.  See the License for the
15 // specific language governing permissions and limitations
16 // under the License.
17 
18 use std::convert::TryFrom;
19 use std::sync::Arc;
20 
21 extern crate arrow;
22 extern crate datafusion;
23 
24 use arrow::{array::*, datatypes::TimeUnit};
25 use arrow::{datatypes::Int32Type, datatypes::Int64Type, record_batch::RecordBatch};
26 use arrow::{
27     datatypes::{DataType, Field, Schema, SchemaRef},
28     util::display::array_value_to_string,
29 };
30 
31 use datafusion::execution::context::ExecutionContext;
32 use datafusion::logical_plan::{LogicalPlan, ToDFSchema};
33 use datafusion::prelude::create_udf;
34 use datafusion::{
35     datasource::{csv::CsvReadOptions, MemTable},
36     physical_plan::collect,
37 };
38 use datafusion::{error::Result, physical_plan::ColumnarValue};
39 
40 #[tokio::test]
nyc() -> Result<()>41 async fn nyc() -> Result<()> {
42     // schema for nyxtaxi csv files
43     let schema = Schema::new(vec![
44         Field::new("VendorID", DataType::Utf8, true),
45         Field::new("tpep_pickup_datetime", DataType::Utf8, true),
46         Field::new("tpep_dropoff_datetime", DataType::Utf8, true),
47         Field::new("passenger_count", DataType::Utf8, true),
48         Field::new("trip_distance", DataType::Float64, true),
49         Field::new("RatecodeID", DataType::Utf8, true),
50         Field::new("store_and_fwd_flag", DataType::Utf8, true),
51         Field::new("PULocationID", DataType::Utf8, true),
52         Field::new("DOLocationID", DataType::Utf8, true),
53         Field::new("payment_type", DataType::Utf8, true),
54         Field::new("fare_amount", DataType::Float64, true),
55         Field::new("extra", DataType::Float64, true),
56         Field::new("mta_tax", DataType::Float64, true),
57         Field::new("tip_amount", DataType::Float64, true),
58         Field::new("tolls_amount", DataType::Float64, true),
59         Field::new("improvement_surcharge", DataType::Float64, true),
60         Field::new("total_amount", DataType::Float64, true),
61     ]);
62 
63     let mut ctx = ExecutionContext::new();
64     ctx.register_csv(
65         "tripdata",
66         "file.csv",
67         CsvReadOptions::new().schema(&schema),
68     )?;
69 
70     let logical_plan = ctx.create_logical_plan(
71         "SELECT passenger_count, MIN(fare_amount), MAX(fare_amount) \
72          FROM tripdata GROUP BY passenger_count",
73     )?;
74 
75     let optimized_plan = ctx.optimize(&logical_plan)?;
76 
77     match &optimized_plan {
78         LogicalPlan::Aggregate { input, .. } => match input.as_ref() {
79             LogicalPlan::TableScan {
80                 ref projected_schema,
81                 ..
82             } => {
83                 assert_eq!(2, projected_schema.fields().len());
84                 assert_eq!(projected_schema.field(0).name(), "passenger_count");
85                 assert_eq!(projected_schema.field(1).name(), "fare_amount");
86             }
87             _ => unreachable!(),
88         },
89         _ => unreachable!(false),
90     }
91 
92     Ok(())
93 }
94 
95 #[tokio::test]
parquet_query()96 async fn parquet_query() {
97     let mut ctx = ExecutionContext::new();
98     register_alltypes_parquet(&mut ctx);
99     // NOTE that string_col is actually a binary column and does not have the UTF8 logical type
100     // so we need an explicit cast
101     let sql = "SELECT id, CAST(string_col AS varchar) FROM alltypes_plain";
102     let actual = execute(&mut ctx, sql).await;
103     let expected = vec![
104         vec!["4", "0"],
105         vec!["5", "1"],
106         vec!["6", "0"],
107         vec!["7", "1"],
108         vec!["2", "0"],
109         vec!["3", "1"],
110         vec!["0", "0"],
111         vec!["1", "1"],
112     ];
113 
114     assert_eq!(expected, actual);
115 }
116 
117 #[tokio::test]
parquet_single_nan_schema()118 async fn parquet_single_nan_schema() {
119     let mut ctx = ExecutionContext::new();
120     let testdata = arrow::util::test_util::parquet_test_data();
121     ctx.register_parquet("single_nan", &format!("{}/single_nan.parquet", testdata))
122         .unwrap();
123     let sql = "SELECT mycol FROM single_nan";
124     let plan = ctx.create_logical_plan(&sql).unwrap();
125     let plan = ctx.optimize(&plan).unwrap();
126     let plan = ctx.create_physical_plan(&plan).unwrap();
127     let results = collect(plan).await.unwrap();
128     for batch in results {
129         assert_eq!(1, batch.num_rows());
130         assert_eq!(1, batch.num_columns());
131     }
132 }
133 
134 #[tokio::test]
135 #[ignore = "Test ignored, will be enabled as part of the nested Parquet reader"]
parquet_list_columns()136 async fn parquet_list_columns() {
137     let mut ctx = ExecutionContext::new();
138     let testdata = arrow::util::test_util::parquet_test_data();
139     ctx.register_parquet(
140         "list_columns",
141         &format!("{}/list_columns.parquet", testdata),
142     )
143     .unwrap();
144 
145     let schema = Arc::new(Schema::new(vec![
146         Field::new(
147             "int64_list",
148             DataType::List(Box::new(Field::new("item", DataType::Int64, true))),
149             true,
150         ),
151         Field::new(
152             "utf8_list",
153             DataType::List(Box::new(Field::new("item", DataType::Utf8, true))),
154             true,
155         ),
156     ]));
157 
158     let sql = "SELECT int64_list, utf8_list FROM list_columns";
159     let plan = ctx.create_logical_plan(&sql).unwrap();
160     let plan = ctx.optimize(&plan).unwrap();
161     let plan = ctx.create_physical_plan(&plan).unwrap();
162     let results = collect(plan).await.unwrap();
163 
164     //   int64_list              utf8_list
165     // 0  [1, 2, 3]        [abc, efg, hij]
166     // 1  [None, 1]                   None
167     // 2        [4]  [efg, None, hij, xyz]
168 
169     assert_eq!(1, results.len());
170     let batch = &results[0];
171     assert_eq!(3, batch.num_rows());
172     assert_eq!(2, batch.num_columns());
173     assert_eq!(schema, batch.schema());
174 
175     let int_list_array = batch
176         .column(0)
177         .as_any()
178         .downcast_ref::<ListArray>()
179         .unwrap();
180     let utf8_list_array = batch
181         .column(1)
182         .as_any()
183         .downcast_ref::<ListArray>()
184         .unwrap();
185 
186     assert_eq!(
187         int_list_array
188             .value(0)
189             .as_any()
190             .downcast_ref::<PrimitiveArray<Int64Type>>()
191             .unwrap(),
192         &PrimitiveArray::<Int64Type>::from(vec![Some(1), Some(2), Some(3),])
193     );
194 
195     assert_eq!(
196         utf8_list_array
197             .value(0)
198             .as_any()
199             .downcast_ref::<StringArray>()
200             .unwrap(),
201         &StringArray::try_from(vec![Some("abc"), Some("efg"), Some("hij"),]).unwrap()
202     );
203 
204     assert_eq!(
205         int_list_array
206             .value(1)
207             .as_any()
208             .downcast_ref::<PrimitiveArray<Int64Type>>()
209             .unwrap(),
210         &PrimitiveArray::<Int64Type>::from(vec![None, Some(1),])
211     );
212 
213     assert!(utf8_list_array.is_null(1));
214 
215     assert_eq!(
216         int_list_array
217             .value(2)
218             .as_any()
219             .downcast_ref::<PrimitiveArray<Int64Type>>()
220             .unwrap(),
221         &PrimitiveArray::<Int64Type>::from(vec![Some(4),])
222     );
223 
224     let result = utf8_list_array.value(2);
225     let result = result.as_any().downcast_ref::<StringArray>().unwrap();
226 
227     assert_eq!(result.value(0), "efg");
228     assert!(result.is_null(1));
229     assert_eq!(result.value(2), "hij");
230     assert_eq!(result.value(3), "xyz");
231 }
232 
233 #[tokio::test]
csv_select_nested() -> Result<()>234 async fn csv_select_nested() -> Result<()> {
235     let mut ctx = ExecutionContext::new();
236     register_aggregate_csv(&mut ctx)?;
237     let sql = "SELECT o1, o2, c3
238                FROM (
239                  SELECT c1 AS o1, c2 + 1 AS o2, c3
240                  FROM (
241                    SELECT c1, c2, c3, c4
242                    FROM aggregate_test_100
243                    WHERE c1 = 'a' AND c2 >= 4
244                    ORDER BY c2 ASC, c3 ASC
245                  )
246                )";
247     let actual = execute(&mut ctx, sql).await;
248     let expected = vec![
249         vec!["a", "5", "-101"],
250         vec!["a", "5", "-54"],
251         vec!["a", "5", "-38"],
252         vec!["a", "5", "65"],
253         vec!["a", "6", "-101"],
254         vec!["a", "6", "-31"],
255         vec!["a", "6", "36"],
256     ];
257     assert_eq!(expected, actual);
258     Ok(())
259 }
260 
261 #[tokio::test]
csv_count_star() -> Result<()>262 async fn csv_count_star() -> Result<()> {
263     let mut ctx = ExecutionContext::new();
264     register_aggregate_csv(&mut ctx)?;
265     let sql = "SELECT COUNT(*), COUNT(1) AS c, COUNT(c1) FROM aggregate_test_100";
266     let actual = execute(&mut ctx, sql).await;
267     let expected = vec![vec!["100", "100", "100"]];
268     assert_eq!(expected, actual);
269     Ok(())
270 }
271 
272 #[tokio::test]
csv_query_with_predicate() -> Result<()>273 async fn csv_query_with_predicate() -> Result<()> {
274     let mut ctx = ExecutionContext::new();
275     register_aggregate_csv(&mut ctx)?;
276     let sql = "SELECT c1, c12 FROM aggregate_test_100 WHERE c12 > 0.376 AND c12 < 0.4";
277     let actual = execute(&mut ctx, sql).await;
278     let expected = vec![
279         vec!["e", "0.39144436569161134"],
280         vec!["d", "0.38870280983958583"],
281     ];
282     assert_eq!(expected, actual);
283     Ok(())
284 }
285 
286 #[tokio::test]
csv_query_with_negative_predicate() -> Result<()>287 async fn csv_query_with_negative_predicate() -> Result<()> {
288     let mut ctx = ExecutionContext::new();
289     register_aggregate_csv(&mut ctx)?;
290     let sql = "SELECT c1, c4 FROM aggregate_test_100 WHERE c3 < -55 AND -c4 > 30000";
291     let actual = execute(&mut ctx, sql).await;
292     let expected = vec![vec!["e", "-31500"], vec!["c", "-30187"]];
293     assert_eq!(expected, actual);
294     Ok(())
295 }
296 
297 #[tokio::test]
csv_query_with_negated_predicate() -> Result<()>298 async fn csv_query_with_negated_predicate() -> Result<()> {
299     let mut ctx = ExecutionContext::new();
300     register_aggregate_csv(&mut ctx)?;
301     let sql = "SELECT COUNT(1) FROM aggregate_test_100 WHERE NOT(c1 != 'a')";
302     let actual = execute(&mut ctx, sql).await;
303     let expected = vec![vec!["21"]];
304     assert_eq!(expected, actual);
305     Ok(())
306 }
307 
308 #[tokio::test]
csv_query_with_is_not_null_predicate() -> Result<()>309 async fn csv_query_with_is_not_null_predicate() -> Result<()> {
310     let mut ctx = ExecutionContext::new();
311     register_aggregate_csv(&mut ctx)?;
312     let sql = "SELECT COUNT(1) FROM aggregate_test_100 WHERE c1 IS NOT NULL";
313     let actual = execute(&mut ctx, sql).await;
314     let expected = vec![vec!["100"]];
315     assert_eq!(expected, actual);
316     Ok(())
317 }
318 
319 #[tokio::test]
csv_query_with_is_null_predicate() -> Result<()>320 async fn csv_query_with_is_null_predicate() -> Result<()> {
321     let mut ctx = ExecutionContext::new();
322     register_aggregate_csv(&mut ctx)?;
323     let sql = "SELECT COUNT(1) FROM aggregate_test_100 WHERE c1 IS NULL";
324     let actual = execute(&mut ctx, sql).await;
325     let expected = vec![vec!["0"]];
326     assert_eq!(expected, actual);
327     Ok(())
328 }
329 
330 #[tokio::test]
csv_query_group_by_int_min_max() -> Result<()>331 async fn csv_query_group_by_int_min_max() -> Result<()> {
332     let mut ctx = ExecutionContext::new();
333     register_aggregate_csv(&mut ctx)?;
334     let sql = "SELECT c2, MIN(c12), MAX(c12) FROM aggregate_test_100 GROUP BY c2";
335     let mut actual = execute(&mut ctx, sql).await;
336     actual.sort();
337     let expected = vec![
338         vec!["1", "0.05636955101974106", "0.9965400387585364"],
339         vec!["2", "0.16301110515739792", "0.991517828651004"],
340         vec!["3", "0.047343434291126085", "0.9293883502480845"],
341         vec!["4", "0.02182578039211991", "0.9237877978193884"],
342         vec!["5", "0.01479305307777301", "0.9723580396501548"],
343     ];
344     assert_eq!(expected, actual);
345     Ok(())
346 }
347 
348 #[tokio::test]
csv_query_group_by_float32() -> Result<()>349 async fn csv_query_group_by_float32() -> Result<()> {
350     let mut ctx = ExecutionContext::new();
351     register_aggregate_simple_csv(&mut ctx)?;
352 
353     let sql =
354         "SELECT COUNT(*) as cnt, c1 FROM aggregate_simple GROUP BY c1 ORDER BY cnt DESC";
355     let actual = execute(&mut ctx, sql).await;
356 
357     let expected = vec![
358         vec!["5", "0.00005"],
359         vec!["4", "0.00004"],
360         vec!["3", "0.00003"],
361         vec!["2", "0.00002"],
362         vec!["1", "0.00001"],
363     ];
364     assert_eq!(expected, actual);
365 
366     Ok(())
367 }
368 
369 #[tokio::test]
csv_query_group_by_float64() -> Result<()>370 async fn csv_query_group_by_float64() -> Result<()> {
371     let mut ctx = ExecutionContext::new();
372     register_aggregate_simple_csv(&mut ctx)?;
373 
374     let sql =
375         "SELECT COUNT(*) as cnt, c2 FROM aggregate_simple GROUP BY c2 ORDER BY cnt DESC";
376     let actual = execute(&mut ctx, sql).await;
377 
378     let expected = vec![
379         vec!["5", "0.000000000005"],
380         vec!["4", "0.000000000004"],
381         vec!["3", "0.000000000003"],
382         vec!["2", "0.000000000002"],
383         vec!["1", "0.000000000001"],
384     ];
385     assert_eq!(expected, actual);
386 
387     Ok(())
388 }
389 
390 #[tokio::test]
csv_query_group_by_boolean() -> Result<()>391 async fn csv_query_group_by_boolean() -> Result<()> {
392     let mut ctx = ExecutionContext::new();
393     register_aggregate_simple_csv(&mut ctx)?;
394 
395     let sql =
396         "SELECT COUNT(*) as cnt, c3 FROM aggregate_simple GROUP BY c3 ORDER BY cnt DESC";
397     let actual = execute(&mut ctx, sql).await;
398 
399     let expected = vec![vec!["9", "true"], vec!["6", "false"]];
400     assert_eq!(expected, actual);
401 
402     Ok(())
403 }
404 
405 #[tokio::test]
csv_query_group_by_two_columns() -> Result<()>406 async fn csv_query_group_by_two_columns() -> Result<()> {
407     let mut ctx = ExecutionContext::new();
408     register_aggregate_csv(&mut ctx)?;
409     let sql = "SELECT c1, c2, MIN(c3) FROM aggregate_test_100 GROUP BY c1, c2";
410     let mut actual = execute(&mut ctx, sql).await;
411     actual.sort();
412     let expected = vec![
413         vec!["a", "1", "-85"],
414         vec!["a", "2", "-48"],
415         vec!["a", "3", "-72"],
416         vec!["a", "4", "-101"],
417         vec!["a", "5", "-101"],
418         vec!["b", "1", "12"],
419         vec!["b", "2", "-60"],
420         vec!["b", "3", "-101"],
421         vec!["b", "4", "-117"],
422         vec!["b", "5", "-82"],
423         vec!["c", "1", "-24"],
424         vec!["c", "2", "-117"],
425         vec!["c", "3", "-2"],
426         vec!["c", "4", "-90"],
427         vec!["c", "5", "-94"],
428         vec!["d", "1", "-99"],
429         vec!["d", "2", "93"],
430         vec!["d", "3", "-76"],
431         vec!["d", "4", "5"],
432         vec!["d", "5", "-59"],
433         vec!["e", "1", "36"],
434         vec!["e", "2", "-61"],
435         vec!["e", "3", "-95"],
436         vec!["e", "4", "-56"],
437         vec!["e", "5", "-86"],
438     ];
439     assert_eq!(expected, actual);
440     Ok(())
441 }
442 
443 #[tokio::test]
csv_query_group_by_and_having() -> Result<()>444 async fn csv_query_group_by_and_having() -> Result<()> {
445     let mut ctx = ExecutionContext::new();
446     register_aggregate_csv(&mut ctx)?;
447     let sql = "SELECT c1, MIN(c3) AS m FROM aggregate_test_100 GROUP BY c1 HAVING m < -100 AND MAX(c3) > 70";
448     let mut actual = execute(&mut ctx, sql).await;
449     actual.sort();
450     let expected = vec![vec!["a", "-101"], vec!["c", "-117"]];
451     assert_eq!(expected, actual);
452     Ok(())
453 }
454 
455 #[tokio::test]
csv_query_group_by_and_having_and_where() -> Result<()>456 async fn csv_query_group_by_and_having_and_where() -> Result<()> {
457     let mut ctx = ExecutionContext::new();
458     register_aggregate_csv(&mut ctx)?;
459     let sql = "SELECT c1, MIN(c3) AS m
460                FROM aggregate_test_100
461                WHERE c1 IN ('a', 'b')
462                GROUP BY c1
463                HAVING m < -100 AND MAX(c3) > 70";
464     let mut actual = execute(&mut ctx, sql).await;
465     actual.sort();
466     let expected = vec![vec!["a", "-101"]];
467     assert_eq!(expected, actual);
468     Ok(())
469 }
470 
471 #[tokio::test]
csv_query_having_without_group_by() -> Result<()>472 async fn csv_query_having_without_group_by() -> Result<()> {
473     let mut ctx = ExecutionContext::new();
474     register_aggregate_csv(&mut ctx)?;
475     let sql = "SELECT c1, c2, c3 FROM aggregate_test_100 HAVING c2 >= 4 AND c3 > 90";
476     let mut actual = execute(&mut ctx, sql).await;
477     actual.sort();
478     let expected = vec![
479         vec!["c", "4", "123"],
480         vec!["c", "5", "118"],
481         vec!["d", "4", "102"],
482         vec!["e", "4", "96"],
483         vec!["e", "4", "97"],
484     ];
485     assert_eq!(expected, actual);
486     Ok(())
487 }
488 
489 #[tokio::test]
csv_query_avg_sqrt() -> Result<()>490 async fn csv_query_avg_sqrt() -> Result<()> {
491     let mut ctx = create_ctx()?;
492     register_aggregate_csv(&mut ctx)?;
493     let sql = "SELECT avg(custom_sqrt(c12)) FROM aggregate_test_100";
494     let mut actual = execute(&mut ctx, sql).await;
495     actual.sort();
496     let expected = vec![vec!["0.6706002946036462"]];
497     assert_float_eq(&expected, &actual);
498     Ok(())
499 }
500 
501 /// test that casting happens on udfs.
502 /// c11 is f32, but `custom_sqrt` requires f64. Casting happens but the logical plan and
503 /// physical plan have the same schema.
504 #[tokio::test]
csv_query_custom_udf_with_cast() -> Result<()>505 async fn csv_query_custom_udf_with_cast() -> Result<()> {
506     let mut ctx = create_ctx()?;
507     register_aggregate_csv(&mut ctx)?;
508     let sql = "SELECT avg(custom_sqrt(c11)) FROM aggregate_test_100";
509     let actual = execute(&mut ctx, sql).await;
510     let expected = vec![vec!["0.6584408483418833"]];
511     assert_float_eq(&expected, &actual);
512     Ok(())
513 }
514 
515 /// sqrt(f32) is slightly different than sqrt(CAST(f32 AS double)))
516 #[tokio::test]
sqrt_f32_vs_f64() -> Result<()>517 async fn sqrt_f32_vs_f64() -> Result<()> {
518     let mut ctx = create_ctx()?;
519     register_aggregate_csv(&mut ctx)?;
520     // sqrt(f32)'s plan passes
521     let sql = "SELECT avg(sqrt(c11)) FROM aggregate_test_100";
522     let actual = execute(&mut ctx, sql).await;
523     let expected = vec![vec!["0.6584408485889435"]];
524 
525     assert_eq!(actual, expected);
526     let sql = "SELECT avg(sqrt(CAST(c11 AS double))) FROM aggregate_test_100";
527     let actual = execute(&mut ctx, sql).await;
528     let expected = vec![vec!["0.6584408483418833"]];
529     assert_float_eq(&expected, &actual);
530     Ok(())
531 }
532 
533 #[tokio::test]
csv_query_error() -> Result<()>534 async fn csv_query_error() -> Result<()> {
535     // sin(utf8) should error
536     let mut ctx = create_ctx()?;
537     register_aggregate_csv(&mut ctx)?;
538     let sql = "SELECT sin(c1) FROM aggregate_test_100";
539     let plan = ctx.create_logical_plan(&sql);
540     assert!(plan.is_err());
541     Ok(())
542 }
543 
544 // this query used to deadlock due to the call udf(udf())
545 #[tokio::test]
csv_query_sqrt_sqrt() -> Result<()>546 async fn csv_query_sqrt_sqrt() -> Result<()> {
547     let mut ctx = create_ctx()?;
548     register_aggregate_csv(&mut ctx)?;
549     let sql = "SELECT sqrt(sqrt(c12)) FROM aggregate_test_100 LIMIT 1";
550     let actual = execute(&mut ctx, sql).await;
551     // sqrt(sqrt(c12=0.9294097332465232)) = 0.9818650561397431
552     let expected = vec![vec!["0.9818650561397431"]];
553     assert_float_eq(&expected, &actual);
554     Ok(())
555 }
556 
557 #[allow(clippy::unnecessary_wraps)]
create_ctx() -> Result<ExecutionContext>558 fn create_ctx() -> Result<ExecutionContext> {
559     let mut ctx = ExecutionContext::new();
560 
561     // register a custom UDF
562     ctx.register_udf(create_udf(
563         "custom_sqrt",
564         vec![DataType::Float64],
565         Arc::new(DataType::Float64),
566         Arc::new(custom_sqrt),
567     ));
568 
569     Ok(ctx)
570 }
571 
custom_sqrt(args: &[ColumnarValue]) -> Result<ColumnarValue>572 fn custom_sqrt(args: &[ColumnarValue]) -> Result<ColumnarValue> {
573     let arg = &args[0];
574     if let ColumnarValue::Array(v) = arg {
575         let input = v
576             .as_any()
577             .downcast_ref::<Float64Array>()
578             .expect("cast failed");
579 
580         let array: Float64Array = input.iter().map(|v| v.map(|x| x.sqrt())).collect();
581         Ok(ColumnarValue::Array(Arc::new(array)))
582     } else {
583         unimplemented!()
584     }
585 }
586 
587 #[tokio::test]
csv_query_avg() -> Result<()>588 async fn csv_query_avg() -> Result<()> {
589     let mut ctx = ExecutionContext::new();
590     register_aggregate_csv(&mut ctx)?;
591     let sql = "SELECT avg(c12) FROM aggregate_test_100";
592     let mut actual = execute(&mut ctx, sql).await;
593     actual.sort();
594     let expected = vec![vec!["0.5089725099127211"]];
595     assert_float_eq(&expected, &actual);
596     Ok(())
597 }
598 
599 #[tokio::test]
csv_query_group_by_avg() -> Result<()>600 async fn csv_query_group_by_avg() -> Result<()> {
601     let mut ctx = ExecutionContext::new();
602     register_aggregate_csv(&mut ctx)?;
603     let sql = "SELECT c1, avg(c12) FROM aggregate_test_100 GROUP BY c1";
604     let mut actual = execute(&mut ctx, sql).await;
605     actual.sort();
606     let expected = vec![
607         vec!["a", "0.48754517466109415"],
608         vec!["b", "0.41040709263815384"],
609         vec!["c", "0.6600456536439784"],
610         vec!["d", "0.48855379387549824"],
611         vec!["e", "0.48600669271341534"],
612     ];
613     assert_eq!(expected, actual);
614     Ok(())
615 }
616 
617 #[tokio::test]
csv_query_group_by_avg_with_projection() -> Result<()>618 async fn csv_query_group_by_avg_with_projection() -> Result<()> {
619     let mut ctx = ExecutionContext::new();
620     register_aggregate_csv(&mut ctx)?;
621     let sql = "SELECT avg(c12), c1 FROM aggregate_test_100 GROUP BY c1";
622     let mut actual = execute(&mut ctx, sql).await;
623     actual.sort();
624     let expected = vec![
625         vec!["0.41040709263815384", "b"],
626         vec!["0.48600669271341534", "e"],
627         vec!["0.48754517466109415", "a"],
628         vec!["0.48855379387549824", "d"],
629         vec!["0.6600456536439784", "c"],
630     ];
631     assert_eq!(expected, actual);
632     Ok(())
633 }
634 
635 #[tokio::test]
csv_query_avg_multi_batch() -> Result<()>636 async fn csv_query_avg_multi_batch() -> Result<()> {
637     let mut ctx = ExecutionContext::new();
638     register_aggregate_csv(&mut ctx)?;
639     let sql = "SELECT avg(c12) FROM aggregate_test_100";
640     let plan = ctx.create_logical_plan(&sql).unwrap();
641     let plan = ctx.optimize(&plan).unwrap();
642     let plan = ctx.create_physical_plan(&plan).unwrap();
643     let results = collect(plan).await.unwrap();
644     let batch = &results[0];
645     let column = batch.column(0);
646     let array = column.as_any().downcast_ref::<Float64Array>().unwrap();
647     let actual = array.value(0);
648     let expected = 0.5089725;
649     // Due to float number's accuracy, different batch size will lead to different
650     // answers.
651     assert!((expected - actual).abs() < 0.01);
652     Ok(())
653 }
654 
655 #[tokio::test]
csv_query_nullif_divide_by_0() -> Result<()>656 async fn csv_query_nullif_divide_by_0() -> Result<()> {
657     let mut ctx = ExecutionContext::new();
658     register_aggregate_csv(&mut ctx)?;
659     let sql = "SELECT c8/nullif(c7, 0) FROM aggregate_test_100";
660     let actual = execute(&mut ctx, sql).await;
661     let actual = &actual[80..90]; // We just want to compare rows 80-89
662     let expected = vec![
663         vec!["258"],
664         vec!["664"],
665         vec!["NULL"],
666         vec!["22"],
667         vec!["164"],
668         vec!["448"],
669         vec!["365"],
670         vec!["1640"],
671         vec!["671"],
672         vec!["203"],
673     ];
674     assert_eq!(expected, actual);
675     Ok(())
676 }
677 
678 #[tokio::test]
csv_query_count() -> Result<()>679 async fn csv_query_count() -> Result<()> {
680     let mut ctx = ExecutionContext::new();
681     register_aggregate_csv(&mut ctx)?;
682     let sql = "SELECT count(c12) FROM aggregate_test_100";
683     let actual = execute(&mut ctx, sql).await;
684     let expected = vec![vec!["100"]];
685     assert_eq!(expected, actual);
686     Ok(())
687 }
688 
689 #[tokio::test]
csv_query_group_by_int_count() -> Result<()>690 async fn csv_query_group_by_int_count() -> Result<()> {
691     let mut ctx = ExecutionContext::new();
692     register_aggregate_csv(&mut ctx)?;
693     let sql = "SELECT c1, count(c12) FROM aggregate_test_100 GROUP BY c1";
694     let mut actual = execute(&mut ctx, sql).await;
695     actual.sort();
696     let expected = vec![
697         vec!["a", "21"],
698         vec!["b", "19"],
699         vec!["c", "21"],
700         vec!["d", "18"],
701         vec!["e", "21"],
702     ];
703     assert_eq!(expected, actual);
704     Ok(())
705 }
706 
707 #[tokio::test]
csv_query_group_with_aliased_aggregate() -> Result<()>708 async fn csv_query_group_with_aliased_aggregate() -> Result<()> {
709     let mut ctx = ExecutionContext::new();
710     register_aggregate_csv(&mut ctx)?;
711     let sql = "SELECT c1, count(c12) AS count FROM aggregate_test_100 GROUP BY c1";
712     let mut actual = execute(&mut ctx, sql).await;
713     actual.sort();
714     let expected = vec![
715         vec!["a", "21"],
716         vec!["b", "19"],
717         vec!["c", "21"],
718         vec!["d", "18"],
719         vec!["e", "21"],
720     ];
721     assert_eq!(expected, actual);
722     Ok(())
723 }
724 
725 #[tokio::test]
csv_query_group_by_string_min_max() -> Result<()>726 async fn csv_query_group_by_string_min_max() -> Result<()> {
727     let mut ctx = ExecutionContext::new();
728     register_aggregate_csv(&mut ctx)?;
729     let sql = "SELECT c1, MIN(c12), MAX(c12) FROM aggregate_test_100 GROUP BY c1";
730     let mut actual = execute(&mut ctx, sql).await;
731     actual.sort();
732     let expected = vec![
733         vec!["a", "0.02182578039211991", "0.9800193410444061"],
734         vec!["b", "0.04893135681998029", "0.9185813970744787"],
735         vec!["c", "0.0494924465469434", "0.991517828651004"],
736         vec!["d", "0.061029375346466685", "0.9748360509016578"],
737         vec!["e", "0.01479305307777301", "0.9965400387585364"],
738     ];
739     assert_eq!(expected, actual);
740     Ok(())
741 }
742 
743 #[tokio::test]
csv_query_cast() -> Result<()>744 async fn csv_query_cast() -> Result<()> {
745     let mut ctx = ExecutionContext::new();
746     register_aggregate_csv(&mut ctx)?;
747     let sql = "SELECT CAST(c12 AS float) FROM aggregate_test_100 WHERE c12 > 0.376 AND c12 < 0.4";
748     let actual = execute(&mut ctx, sql).await;
749     let expected = vec![vec!["0.39144436569161134"], vec!["0.38870280983958583"]];
750     assert_eq!(expected, actual);
751     Ok(())
752 }
753 
754 #[tokio::test]
csv_query_cast_literal() -> Result<()>755 async fn csv_query_cast_literal() -> Result<()> {
756     let mut ctx = ExecutionContext::new();
757     register_aggregate_csv(&mut ctx)?;
758     let sql =
759         "SELECT c12, CAST(1 AS float) FROM aggregate_test_100 WHERE c12 > CAST(0 AS float) LIMIT 2";
760     let actual = execute(&mut ctx, sql).await;
761     let expected = vec![
762         vec!["0.9294097332465232", "1"],
763         vec!["0.3114712539863804", "1"],
764     ];
765     assert_eq!(expected, actual);
766     Ok(())
767 }
768 
769 #[tokio::test]
csv_query_limit() -> Result<()>770 async fn csv_query_limit() -> Result<()> {
771     let mut ctx = ExecutionContext::new();
772     register_aggregate_csv(&mut ctx)?;
773     let sql = "SELECT c1 FROM aggregate_test_100 LIMIT 2";
774     let actual = execute(&mut ctx, sql).await;
775     let expected = vec![vec!["c"], vec!["d"]];
776     assert_eq!(expected, actual);
777     Ok(())
778 }
779 
780 #[tokio::test]
csv_query_limit_bigger_than_nbr_of_rows() -> Result<()>781 async fn csv_query_limit_bigger_than_nbr_of_rows() -> Result<()> {
782     let mut ctx = ExecutionContext::new();
783     register_aggregate_csv(&mut ctx)?;
784     let sql = "SELECT c2 FROM aggregate_test_100 LIMIT 200";
785     let actual = execute(&mut ctx, sql).await;
786     let expected = vec![
787         vec!["2"],
788         vec!["5"],
789         vec!["1"],
790         vec!["1"],
791         vec!["5"],
792         vec!["4"],
793         vec!["3"],
794         vec!["3"],
795         vec!["1"],
796         vec!["4"],
797         vec!["1"],
798         vec!["4"],
799         vec!["3"],
800         vec!["2"],
801         vec!["1"],
802         vec!["1"],
803         vec!["2"],
804         vec!["1"],
805         vec!["3"],
806         vec!["2"],
807         vec!["4"],
808         vec!["1"],
809         vec!["5"],
810         vec!["4"],
811         vec!["2"],
812         vec!["1"],
813         vec!["4"],
814         vec!["5"],
815         vec!["2"],
816         vec!["3"],
817         vec!["4"],
818         vec!["2"],
819         vec!["1"],
820         vec!["5"],
821         vec!["3"],
822         vec!["1"],
823         vec!["2"],
824         vec!["3"],
825         vec!["3"],
826         vec!["3"],
827         vec!["2"],
828         vec!["4"],
829         vec!["1"],
830         vec!["3"],
831         vec!["2"],
832         vec!["5"],
833         vec!["2"],
834         vec!["1"],
835         vec!["4"],
836         vec!["1"],
837         vec!["4"],
838         vec!["2"],
839         vec!["5"],
840         vec!["4"],
841         vec!["2"],
842         vec!["3"],
843         vec!["4"],
844         vec!["4"],
845         vec!["4"],
846         vec!["5"],
847         vec!["4"],
848         vec!["2"],
849         vec!["1"],
850         vec!["2"],
851         vec!["4"],
852         vec!["2"],
853         vec!["3"],
854         vec!["5"],
855         vec!["1"],
856         vec!["1"],
857         vec!["4"],
858         vec!["2"],
859         vec!["1"],
860         vec!["2"],
861         vec!["1"],
862         vec!["1"],
863         vec!["5"],
864         vec!["4"],
865         vec!["5"],
866         vec!["2"],
867         vec!["3"],
868         vec!["2"],
869         vec!["4"],
870         vec!["1"],
871         vec!["3"],
872         vec!["4"],
873         vec!["3"],
874         vec!["2"],
875         vec!["5"],
876         vec!["3"],
877         vec!["3"],
878         vec!["2"],
879         vec!["5"],
880         vec!["5"],
881         vec!["4"],
882         vec!["1"],
883         vec!["3"],
884         vec!["3"],
885         vec!["4"],
886         vec!["4"],
887     ];
888     assert_eq!(expected, actual);
889     Ok(())
890 }
891 
892 #[tokio::test]
csv_query_limit_with_same_nbr_of_rows() -> Result<()>893 async fn csv_query_limit_with_same_nbr_of_rows() -> Result<()> {
894     let mut ctx = ExecutionContext::new();
895     register_aggregate_csv(&mut ctx)?;
896     let sql = "SELECT c2 FROM aggregate_test_100 LIMIT 100";
897     let actual = execute(&mut ctx, sql).await;
898     let expected = vec![
899         vec!["2"],
900         vec!["5"],
901         vec!["1"],
902         vec!["1"],
903         vec!["5"],
904         vec!["4"],
905         vec!["3"],
906         vec!["3"],
907         vec!["1"],
908         vec!["4"],
909         vec!["1"],
910         vec!["4"],
911         vec!["3"],
912         vec!["2"],
913         vec!["1"],
914         vec!["1"],
915         vec!["2"],
916         vec!["1"],
917         vec!["3"],
918         vec!["2"],
919         vec!["4"],
920         vec!["1"],
921         vec!["5"],
922         vec!["4"],
923         vec!["2"],
924         vec!["1"],
925         vec!["4"],
926         vec!["5"],
927         vec!["2"],
928         vec!["3"],
929         vec!["4"],
930         vec!["2"],
931         vec!["1"],
932         vec!["5"],
933         vec!["3"],
934         vec!["1"],
935         vec!["2"],
936         vec!["3"],
937         vec!["3"],
938         vec!["3"],
939         vec!["2"],
940         vec!["4"],
941         vec!["1"],
942         vec!["3"],
943         vec!["2"],
944         vec!["5"],
945         vec!["2"],
946         vec!["1"],
947         vec!["4"],
948         vec!["1"],
949         vec!["4"],
950         vec!["2"],
951         vec!["5"],
952         vec!["4"],
953         vec!["2"],
954         vec!["3"],
955         vec!["4"],
956         vec!["4"],
957         vec!["4"],
958         vec!["5"],
959         vec!["4"],
960         vec!["2"],
961         vec!["1"],
962         vec!["2"],
963         vec!["4"],
964         vec!["2"],
965         vec!["3"],
966         vec!["5"],
967         vec!["1"],
968         vec!["1"],
969         vec!["4"],
970         vec!["2"],
971         vec!["1"],
972         vec!["2"],
973         vec!["1"],
974         vec!["1"],
975         vec!["5"],
976         vec!["4"],
977         vec!["5"],
978         vec!["2"],
979         vec!["3"],
980         vec!["2"],
981         vec!["4"],
982         vec!["1"],
983         vec!["3"],
984         vec!["4"],
985         vec!["3"],
986         vec!["2"],
987         vec!["5"],
988         vec!["3"],
989         vec!["3"],
990         vec!["2"],
991         vec!["5"],
992         vec!["5"],
993         vec!["4"],
994         vec!["1"],
995         vec!["3"],
996         vec!["3"],
997         vec!["4"],
998         vec!["4"],
999     ];
1000     assert_eq!(expected, actual);
1001     Ok(())
1002 }
1003 
1004 #[tokio::test]
csv_query_limit_zero() -> Result<()>1005 async fn csv_query_limit_zero() -> Result<()> {
1006     let mut ctx = ExecutionContext::new();
1007     register_aggregate_csv(&mut ctx)?;
1008     let sql = "SELECT c1 FROM aggregate_test_100 LIMIT 0";
1009     let actual = execute(&mut ctx, sql).await;
1010     let expected: Vec<Vec<String>> = vec![];
1011     assert_eq!(expected, actual);
1012     Ok(())
1013 }
1014 
1015 #[tokio::test]
csv_query_create_external_table()1016 async fn csv_query_create_external_table() {
1017     let mut ctx = ExecutionContext::new();
1018     register_aggregate_csv_by_sql(&mut ctx).await;
1019     let sql = "SELECT c1, c2, c3, c4, c5, c6, c7, c8, c9, 10, c11, c12, c13 FROM aggregate_test_100 LIMIT 1";
1020     let actual = execute(&mut ctx, sql).await;
1021     let expected = vec![vec![
1022         "c",
1023         "2",
1024         "1",
1025         "18109",
1026         "2033001162",
1027         "-6513304855495910254",
1028         "25",
1029         "43062",
1030         "1491205016",
1031         "10",
1032         "0.110830784",
1033         "0.9294097332465232",
1034         "6WfVFBVGJSQb7FhA7E0lBwdvjfZnSW",
1035     ]];
1036     assert_eq!(expected, actual);
1037 }
1038 
1039 #[tokio::test]
csv_query_external_table_count()1040 async fn csv_query_external_table_count() {
1041     let mut ctx = ExecutionContext::new();
1042     register_aggregate_csv_by_sql(&mut ctx).await;
1043     let sql = "SELECT COUNT(c12) FROM aggregate_test_100";
1044     let actual = execute(&mut ctx, sql).await;
1045     let expected = vec![vec!["100"]];
1046     assert_eq!(expected, actual);
1047 }
1048 
1049 #[tokio::test]
csv_query_external_table_sum()1050 async fn csv_query_external_table_sum() {
1051     let mut ctx = ExecutionContext::new();
1052     // cast smallint and int to bigint to avoid overflow during calculation
1053     register_aggregate_csv_by_sql(&mut ctx).await;
1054     let sql =
1055         "SELECT SUM(CAST(c7 AS BIGINT)), SUM(CAST(c8 AS BIGINT)) FROM aggregate_test_100";
1056     let actual = execute(&mut ctx, sql).await;
1057     let expected = vec![vec!["13060", "3017641"]];
1058     assert_eq!(expected, actual);
1059 }
1060 
1061 #[tokio::test]
csv_query_count_star()1062 async fn csv_query_count_star() {
1063     let mut ctx = ExecutionContext::new();
1064     register_aggregate_csv_by_sql(&mut ctx).await;
1065     let sql = "SELECT COUNT(*) FROM aggregate_test_100";
1066     let actual = execute(&mut ctx, sql).await;
1067     let expected = vec![vec!["100"]];
1068     assert_eq!(expected, actual);
1069 }
1070 
1071 #[tokio::test]
csv_query_count_one()1072 async fn csv_query_count_one() {
1073     let mut ctx = ExecutionContext::new();
1074     register_aggregate_csv_by_sql(&mut ctx).await;
1075     let sql = "SELECT COUNT(1) FROM aggregate_test_100";
1076     let actual = execute(&mut ctx, sql).await;
1077     let expected = vec![vec!["100"]];
1078     assert_eq!(expected, actual);
1079 }
1080 
1081 #[tokio::test]
case_when() -> Result<()>1082 async fn case_when() -> Result<()> {
1083     let mut ctx = create_case_context()?;
1084     let sql = "SELECT \
1085         CASE WHEN c1 = 'a' THEN 1 \
1086              WHEN c1 = 'b' THEN 2 \
1087              END \
1088         FROM t1";
1089     let actual = execute(&mut ctx, sql).await;
1090     let expected = vec![vec!["1"], vec!["2"], vec!["NULL"], vec!["NULL"]];
1091     assert_eq!(expected, actual);
1092     Ok(())
1093 }
1094 
1095 #[tokio::test]
case_when_else() -> Result<()>1096 async fn case_when_else() -> Result<()> {
1097     let mut ctx = create_case_context()?;
1098     let sql = "SELECT \
1099         CASE WHEN c1 = 'a' THEN 1 \
1100              WHEN c1 = 'b' THEN 2 \
1101              ELSE 999 END \
1102         FROM t1";
1103     let actual = execute(&mut ctx, sql).await;
1104     let expected = vec![vec!["1"], vec!["2"], vec!["999"], vec!["999"]];
1105     assert_eq!(expected, actual);
1106     Ok(())
1107 }
1108 
1109 #[tokio::test]
case_when_with_base_expr() -> Result<()>1110 async fn case_when_with_base_expr() -> Result<()> {
1111     let mut ctx = create_case_context()?;
1112     let sql = "SELECT \
1113         CASE c1 WHEN 'a' THEN 1 \
1114              WHEN 'b' THEN 2 \
1115              END \
1116         FROM t1";
1117     let actual = execute(&mut ctx, sql).await;
1118     let expected = vec![vec!["1"], vec!["2"], vec!["NULL"], vec!["NULL"]];
1119     assert_eq!(expected, actual);
1120     Ok(())
1121 }
1122 
1123 #[tokio::test]
case_when_else_with_base_expr() -> Result<()>1124 async fn case_when_else_with_base_expr() -> Result<()> {
1125     let mut ctx = create_case_context()?;
1126     let sql = "SELECT \
1127         CASE c1 WHEN 'a' THEN 1 \
1128              WHEN 'b' THEN 2 \
1129              ELSE 999 END \
1130         FROM t1";
1131     let actual = execute(&mut ctx, sql).await;
1132     let expected = vec![vec!["1"], vec!["2"], vec!["999"], vec!["999"]];
1133     assert_eq!(expected, actual);
1134     Ok(())
1135 }
1136 
create_case_context() -> Result<ExecutionContext>1137 fn create_case_context() -> Result<ExecutionContext> {
1138     let mut ctx = ExecutionContext::new();
1139     let schema = Arc::new(Schema::new(vec![Field::new("c1", DataType::Utf8, true)]));
1140     let data = RecordBatch::try_new(
1141         schema.clone(),
1142         vec![Arc::new(StringArray::from(vec![
1143             Some("a"),
1144             Some("b"),
1145             Some("c"),
1146             None,
1147         ]))],
1148     )?;
1149     let table = MemTable::try_new(schema, vec![vec![data]])?;
1150     ctx.register_table("t1", Arc::new(table));
1151     Ok(ctx)
1152 }
1153 
1154 #[tokio::test]
equijoin() -> Result<()>1155 async fn equijoin() -> Result<()> {
1156     let mut ctx = create_join_context("t1_id", "t2_id")?;
1157     let sql =
1158         "SELECT t1_id, t1_name, t2_name FROM t1 JOIN t2 ON t1_id = t2_id ORDER BY t1_id";
1159     let actual = execute(&mut ctx, sql).await;
1160     let expected = vec![
1161         vec!["11", "a", "z"],
1162         vec!["22", "b", "y"],
1163         vec!["44", "d", "x"],
1164     ];
1165     assert_eq!(expected, actual);
1166     Ok(())
1167 }
1168 
1169 #[tokio::test]
left_join() -> Result<()>1170 async fn left_join() -> Result<()> {
1171     let mut ctx = create_join_context("t1_id", "t2_id")?;
1172     let sql = "SELECT t1_id, t1_name, t2_name FROM t1 LEFT JOIN t2 ON t1_id = t2_id ORDER BY t1_id";
1173     let actual = execute(&mut ctx, sql).await;
1174     let expected = vec![
1175         vec!["11", "a", "z"],
1176         vec!["22", "b", "y"],
1177         vec!["33", "c", "NULL"],
1178         vec!["44", "d", "x"],
1179     ];
1180     assert_eq!(expected, actual);
1181     Ok(())
1182 }
1183 
1184 #[tokio::test]
right_join() -> Result<()>1185 async fn right_join() -> Result<()> {
1186     let mut ctx = create_join_context("t1_id", "t2_id")?;
1187     let sql =
1188         "SELECT t1_id, t1_name, t2_name FROM t1 RIGHT JOIN t2 ON t1_id = t2_id ORDER BY t1_id";
1189     let actual = execute(&mut ctx, sql).await;
1190     let expected = vec![
1191         vec!["NULL", "NULL", "w"],
1192         vec!["11", "a", "z"],
1193         vec!["22", "b", "y"],
1194         vec!["44", "d", "x"],
1195     ];
1196     assert_eq!(expected, actual);
1197     Ok(())
1198 }
1199 
1200 #[tokio::test]
left_join_using() -> Result<()>1201 async fn left_join_using() -> Result<()> {
1202     let mut ctx = create_join_context("id", "id")?;
1203     let sql = "SELECT id, t1_name, t2_name FROM t1 LEFT JOIN t2 USING (id) ORDER BY id";
1204     let actual = execute(&mut ctx, sql).await;
1205     let expected = vec![
1206         vec!["11", "a", "z"],
1207         vec!["22", "b", "y"],
1208         vec!["33", "c", "NULL"],
1209         vec!["44", "d", "x"],
1210     ];
1211     assert_eq!(expected, actual);
1212     Ok(())
1213 }
1214 
1215 #[tokio::test]
equijoin_implicit_syntax() -> Result<()>1216 async fn equijoin_implicit_syntax() -> Result<()> {
1217     let mut ctx = create_join_context("t1_id", "t2_id")?;
1218     let sql =
1219         "SELECT t1_id, t1_name, t2_name FROM t1, t2 WHERE t1_id = t2_id ORDER BY t1_id";
1220     let actual = execute(&mut ctx, sql).await;
1221     let expected = vec![
1222         vec!["11", "a", "z"],
1223         vec!["22", "b", "y"],
1224         vec!["44", "d", "x"],
1225     ];
1226     assert_eq!(expected, actual);
1227     Ok(())
1228 }
1229 
1230 #[tokio::test]
equijoin_implicit_syntax_with_filter() -> Result<()>1231 async fn equijoin_implicit_syntax_with_filter() -> Result<()> {
1232     let mut ctx = create_join_context("t1_id", "t2_id")?;
1233     let sql = "SELECT t1_id, t1_name, t2_name \
1234         FROM t1, t2 \
1235         WHERE t1_id > 0 \
1236         AND t1_id = t2_id \
1237         AND t2_id < 99 \
1238         ORDER BY t1_id";
1239     let actual = execute(&mut ctx, sql).await;
1240     let expected = vec![
1241         vec!["11", "a", "z"],
1242         vec!["22", "b", "y"],
1243         vec!["44", "d", "x"],
1244     ];
1245     assert_eq!(expected, actual);
1246     Ok(())
1247 }
1248 
1249 #[tokio::test]
equijoin_implicit_syntax_reversed() -> Result<()>1250 async fn equijoin_implicit_syntax_reversed() -> Result<()> {
1251     let mut ctx = create_join_context("t1_id", "t2_id")?;
1252     let sql =
1253         "SELECT t1_id, t1_name, t2_name FROM t1, t2 WHERE t2_id = t1_id ORDER BY t1_id";
1254     let actual = execute(&mut ctx, sql).await;
1255     let expected = vec![
1256         vec!["11", "a", "z"],
1257         vec!["22", "b", "y"],
1258         vec!["44", "d", "x"],
1259     ];
1260     assert_eq!(expected, actual);
1261     Ok(())
1262 }
1263 
1264 #[tokio::test]
cartesian_join() -> Result<()>1265 async fn cartesian_join() -> Result<()> {
1266     let ctx = create_join_context("t1_id", "t2_id")?;
1267     let sql = "SELECT t1_id, t1_name, t2_name FROM t1, t2 ORDER BY t1_id";
1268     let maybe_plan = ctx.create_logical_plan(&sql);
1269     assert_eq!(
1270         "This feature is not implemented: Cartesian joins are not supported",
1271         &format!("{}", maybe_plan.err().unwrap())
1272     );
1273     Ok(())
1274 }
1275 
create_join_context( column_left: &str, column_right: &str, ) -> Result<ExecutionContext>1276 fn create_join_context(
1277     column_left: &str,
1278     column_right: &str,
1279 ) -> Result<ExecutionContext> {
1280     let mut ctx = ExecutionContext::new();
1281 
1282     let t1_schema = Arc::new(Schema::new(vec![
1283         Field::new(column_left, DataType::UInt32, true),
1284         Field::new("t1_name", DataType::Utf8, true),
1285     ]));
1286     let t1_data = RecordBatch::try_new(
1287         t1_schema.clone(),
1288         vec![
1289             Arc::new(UInt32Array::from(vec![11, 22, 33, 44])),
1290             Arc::new(StringArray::from(vec![
1291                 Some("a"),
1292                 Some("b"),
1293                 Some("c"),
1294                 Some("d"),
1295             ])),
1296         ],
1297     )?;
1298     let t1_table = MemTable::try_new(t1_schema, vec![vec![t1_data]])?;
1299     ctx.register_table("t1", Arc::new(t1_table));
1300 
1301     let t2_schema = Arc::new(Schema::new(vec![
1302         Field::new(column_right, DataType::UInt32, true),
1303         Field::new("t2_name", DataType::Utf8, true),
1304     ]));
1305     let t2_data = RecordBatch::try_new(
1306         t2_schema.clone(),
1307         vec![
1308             Arc::new(UInt32Array::from(vec![11, 22, 44, 55])),
1309             Arc::new(StringArray::from(vec![
1310                 Some("z"),
1311                 Some("y"),
1312                 Some("x"),
1313                 Some("w"),
1314             ])),
1315         ],
1316     )?;
1317     let t2_table = MemTable::try_new(t2_schema, vec![vec![t2_data]])?;
1318     ctx.register_table("t2", Arc::new(t2_table));
1319 
1320     Ok(ctx)
1321 }
1322 
create_join_context_qualified() -> Result<ExecutionContext>1323 fn create_join_context_qualified() -> Result<ExecutionContext> {
1324     let mut ctx = ExecutionContext::new();
1325 
1326     let t1_schema = Arc::new(Schema::new(vec![
1327         Field::new("a", DataType::UInt32, true),
1328         Field::new("b", DataType::UInt32, true),
1329         Field::new("c", DataType::UInt32, true),
1330     ]));
1331     let t1_data = RecordBatch::try_new(
1332         t1_schema.clone(),
1333         vec![
1334             Arc::new(UInt32Array::from(vec![1, 2, 3, 4])),
1335             Arc::new(UInt32Array::from(vec![10, 20, 30, 40])),
1336             Arc::new(UInt32Array::from(vec![50, 60, 70, 80])),
1337         ],
1338     )?;
1339     let t1_table = MemTable::try_new(t1_schema, vec![vec![t1_data]])?;
1340     ctx.register_table("t1", Arc::new(t1_table));
1341 
1342     let t2_schema = Arc::new(Schema::new(vec![
1343         Field::new("a", DataType::UInt32, true),
1344         Field::new("b", DataType::UInt32, true),
1345         Field::new("c", DataType::UInt32, true),
1346     ]));
1347     let t2_data = RecordBatch::try_new(
1348         t2_schema.clone(),
1349         vec![
1350             Arc::new(UInt32Array::from(vec![1, 2, 9, 4])),
1351             Arc::new(UInt32Array::from(vec![100, 200, 300, 400])),
1352             Arc::new(UInt32Array::from(vec![500, 600, 700, 800])),
1353         ],
1354     )?;
1355     let t2_table = MemTable::try_new(t2_schema, vec![vec![t2_data]])?;
1356     ctx.register_table("t2", Arc::new(t2_table));
1357 
1358     Ok(ctx)
1359 }
1360 
1361 #[tokio::test]
csv_explain()1362 async fn csv_explain() {
1363     let mut ctx = ExecutionContext::new();
1364     register_aggregate_csv_by_sql(&mut ctx).await;
1365     let sql = "EXPLAIN SELECT c1 FROM aggregate_test_100 where c2 > 10";
1366     let actual = execute(&mut ctx, sql).await;
1367     let expected = vec![
1368         vec![
1369             "logical_plan",
1370             "Projection: #c1\n  Filter: #c2 Gt Int64(10)\n    TableScan: aggregate_test_100 projection=None"
1371         ]
1372     ];
1373     assert_eq!(expected, actual);
1374 
1375     // Also, expect same result with lowercase explain
1376     let sql = "explain SELECT c1 FROM aggregate_test_100 where c2 > 10";
1377     let actual = execute(&mut ctx, sql).await;
1378     assert_eq!(expected, actual);
1379 }
1380 
1381 #[tokio::test]
csv_explain_verbose()1382 async fn csv_explain_verbose() {
1383     let mut ctx = ExecutionContext::new();
1384     register_aggregate_csv_by_sql(&mut ctx).await;
1385     let sql = "EXPLAIN VERBOSE SELECT c1 FROM aggregate_test_100 where c2 > 10";
1386     let actual = execute(&mut ctx, sql).await;
1387 
1388     // flatten to a single string
1389     let actual = actual.into_iter().map(|r| r.join("\t")).collect::<String>();
1390 
1391     // Don't actually test the contents of the debuging output (as
1392     // that may change and keeping this test updated will be a
1393     // pain). Instead just check for a few key pieces.
1394     assert!(actual.contains("logical_plan"), "Actual: '{}'", actual);
1395     assert!(actual.contains("physical_plan"), "Actual: '{}'", actual);
1396     assert!(actual.contains("#c2 Gt Int64(10)"), "Actual: '{}'", actual);
1397 }
1398 
aggr_test_schema() -> SchemaRef1399 fn aggr_test_schema() -> SchemaRef {
1400     Arc::new(Schema::new(vec![
1401         Field::new("c1", DataType::Utf8, false),
1402         Field::new("c2", DataType::UInt32, false),
1403         Field::new("c3", DataType::Int8, false),
1404         Field::new("c4", DataType::Int16, false),
1405         Field::new("c5", DataType::Int32, false),
1406         Field::new("c6", DataType::Int64, false),
1407         Field::new("c7", DataType::UInt8, false),
1408         Field::new("c8", DataType::UInt16, false),
1409         Field::new("c9", DataType::UInt32, false),
1410         Field::new("c10", DataType::UInt64, false),
1411         Field::new("c11", DataType::Float32, false),
1412         Field::new("c12", DataType::Float64, false),
1413         Field::new("c13", DataType::Utf8, false),
1414     ]))
1415 }
1416 
register_aggregate_csv_by_sql(ctx: &mut ExecutionContext)1417 async fn register_aggregate_csv_by_sql(ctx: &mut ExecutionContext) {
1418     let testdata = arrow::util::test_util::arrow_test_data();
1419 
1420     // TODO: The following c9 should be migrated to UInt32 and c10 should be UInt64 once
1421     // unsigned is supported.
1422     let df = ctx
1423         .sql(&format!(
1424             "
1425     CREATE EXTERNAL TABLE aggregate_test_100 (
1426         c1  VARCHAR NOT NULL,
1427         c2  INT NOT NULL,
1428         c3  SMALLINT NOT NULL,
1429         c4  SMALLINT NOT NULL,
1430         c5  INT NOT NULL,
1431         c6  BIGINT NOT NULL,
1432         c7  SMALLINT NOT NULL,
1433         c8  INT NOT NULL,
1434         c9  BIGINT NOT NULL,
1435         c10 VARCHAR NOT NULL,
1436         c11 FLOAT NOT NULL,
1437         c12 DOUBLE NOT NULL,
1438         c13 VARCHAR NOT NULL
1439     )
1440     STORED AS CSV
1441     WITH HEADER ROW
1442     LOCATION '{}/csv/aggregate_test_100.csv'
1443     ",
1444             testdata
1445         ))
1446         .expect("Creating dataframe for CREATE EXTERNAL TABLE");
1447 
1448     // Mimic the CLI and execute the resulting plan -- even though it
1449     // is effectively a no-op (returns zero rows)
1450     let results = df.collect().await.expect("Executing CREATE EXTERNAL TABLE");
1451     assert!(
1452         results.is_empty(),
1453         "Expected no rows from executing CREATE EXTERNAL TABLE"
1454     );
1455 }
1456 
register_aggregate_csv(ctx: &mut ExecutionContext) -> Result<()>1457 fn register_aggregate_csv(ctx: &mut ExecutionContext) -> Result<()> {
1458     let testdata = arrow::util::test_util::arrow_test_data();
1459     let schema = aggr_test_schema();
1460     ctx.register_csv(
1461         "aggregate_test_100",
1462         &format!("{}/csv/aggregate_test_100.csv", testdata),
1463         CsvReadOptions::new().schema(&schema),
1464     )?;
1465     Ok(())
1466 }
1467 
register_aggregate_simple_csv(ctx: &mut ExecutionContext) -> Result<()>1468 fn register_aggregate_simple_csv(ctx: &mut ExecutionContext) -> Result<()> {
1469     // It's not possible to use aggregate_test_100, not enought similar values to test grouping on floats
1470     let schema = Arc::new(Schema::new(vec![
1471         Field::new("c1", DataType::Float32, false),
1472         Field::new("c2", DataType::Float64, false),
1473         Field::new("c3", DataType::Boolean, false),
1474     ]));
1475 
1476     ctx.register_csv(
1477         "aggregate_simple",
1478         "tests/aggregate_simple.csv",
1479         CsvReadOptions::new().schema(&schema),
1480     )?;
1481     Ok(())
1482 }
1483 
register_alltypes_parquet(ctx: &mut ExecutionContext)1484 fn register_alltypes_parquet(ctx: &mut ExecutionContext) {
1485     let testdata = arrow::util::test_util::parquet_test_data();
1486     ctx.register_parquet(
1487         "alltypes_plain",
1488         &format!("{}/alltypes_plain.parquet", testdata),
1489     )
1490     .unwrap();
1491 }
1492 
1493 /// Execute query and return result set as 2-d table of Vecs
1494 /// `result[row][column]`
execute(ctx: &mut ExecutionContext, sql: &str) -> Vec<Vec<String>>1495 async fn execute(ctx: &mut ExecutionContext, sql: &str) -> Vec<Vec<String>> {
1496     let msg = format!("Creating logical plan for '{}'", sql);
1497     let plan = ctx.create_logical_plan(&sql).expect(&msg);
1498     let logical_schema = plan.schema();
1499 
1500     let msg = format!("Optimizing logical plan for '{}': {:?}", sql, plan);
1501     let plan = ctx.optimize(&plan).expect(&msg);
1502     let optimized_logical_schema = plan.schema();
1503 
1504     let msg = format!("Creating physical plan for '{}': {:?}", sql, plan);
1505     let plan = ctx.create_physical_plan(&plan).expect(&msg);
1506     let physical_schema = plan.schema();
1507 
1508     let msg = format!("Executing physical plan for '{}': {:?}", sql, plan);
1509     let results = collect(plan).await.expect(&msg);
1510 
1511     assert_eq!(logical_schema.as_ref(), optimized_logical_schema.as_ref());
1512     assert_eq!(
1513         logical_schema.as_ref(),
1514         &physical_schema.to_dfschema().unwrap()
1515     );
1516 
1517     result_vec(&results)
1518 }
1519 
1520 /// Specialised String representation
col_str(column: &ArrayRef, row_index: usize) -> String1521 fn col_str(column: &ArrayRef, row_index: usize) -> String {
1522     if column.is_null(row_index) {
1523         return "NULL".to_string();
1524     }
1525 
1526     // Special case ListArray as there is no pretty print support for it yet
1527     if let DataType::FixedSizeList(_, n) = column.data_type() {
1528         let array = column
1529             .as_any()
1530             .downcast_ref::<FixedSizeListArray>()
1531             .unwrap()
1532             .value(row_index);
1533 
1534         let mut r = Vec::with_capacity(*n as usize);
1535         for i in 0..*n {
1536             r.push(col_str(&array, i as usize));
1537         }
1538         return format!("[{}]", r.join(","));
1539     }
1540 
1541     array_value_to_string(column, row_index)
1542         .ok()
1543         .unwrap_or_else(|| "???".to_string())
1544 }
1545 
1546 /// Converts the results into a 2d array of strings, `result[row][column]`
1547 /// Special cases nulls to NULL for testing
result_vec(results: &[RecordBatch]) -> Vec<Vec<String>>1548 fn result_vec(results: &[RecordBatch]) -> Vec<Vec<String>> {
1549     let mut result = vec![];
1550     for batch in results {
1551         for row_index in 0..batch.num_rows() {
1552             let row_vec = batch
1553                 .columns()
1554                 .iter()
1555                 .map(|column| col_str(column, row_index))
1556                 .collect();
1557             result.push(row_vec);
1558         }
1559     }
1560     result
1561 }
1562 
generic_query_length<T: 'static + Array + From<Vec<&'static str>>>( datatype: DataType, ) -> Result<()>1563 async fn generic_query_length<T: 'static + Array + From<Vec<&'static str>>>(
1564     datatype: DataType,
1565 ) -> Result<()> {
1566     let schema = Arc::new(Schema::new(vec![Field::new("c1", datatype, false)]));
1567 
1568     let data = RecordBatch::try_new(
1569         schema.clone(),
1570         vec![Arc::new(T::from(vec!["", "a", "aa", "aaa"]))],
1571     )?;
1572 
1573     let table = MemTable::try_new(schema, vec![vec![data]])?;
1574 
1575     let mut ctx = ExecutionContext::new();
1576     ctx.register_table("test", Arc::new(table));
1577     let sql = "SELECT length(c1) FROM test";
1578     let actual = execute(&mut ctx, sql).await;
1579     let expected = vec![vec!["0"], vec!["1"], vec!["2"], vec!["3"]];
1580     assert_eq!(expected, actual);
1581     Ok(())
1582 }
1583 
1584 #[tokio::test]
query_length() -> Result<()>1585 async fn query_length() -> Result<()> {
1586     generic_query_length::<StringArray>(DataType::Utf8).await
1587 }
1588 
1589 #[tokio::test]
query_large_length() -> Result<()>1590 async fn query_large_length() -> Result<()> {
1591     generic_query_length::<LargeStringArray>(DataType::LargeUtf8).await
1592 }
1593 
1594 #[tokio::test]
query_not() -> Result<()>1595 async fn query_not() -> Result<()> {
1596     let schema = Arc::new(Schema::new(vec![Field::new("c1", DataType::Boolean, true)]));
1597 
1598     let data = RecordBatch::try_new(
1599         schema.clone(),
1600         vec![Arc::new(BooleanArray::from(vec![
1601             Some(false),
1602             None,
1603             Some(true),
1604         ]))],
1605     )?;
1606 
1607     let table = MemTable::try_new(schema, vec![vec![data]])?;
1608 
1609     let mut ctx = ExecutionContext::new();
1610     ctx.register_table("test", Arc::new(table));
1611     let sql = "SELECT NOT c1 FROM test";
1612     let actual = execute(&mut ctx, sql).await;
1613     let expected = vec![vec!["true"], vec!["NULL"], vec!["false"]];
1614     assert_eq!(expected, actual);
1615     Ok(())
1616 }
1617 
1618 #[tokio::test]
query_concat() -> Result<()>1619 async fn query_concat() -> Result<()> {
1620     let schema = Arc::new(Schema::new(vec![
1621         Field::new("c1", DataType::Utf8, false),
1622         Field::new("c2", DataType::Int32, true),
1623     ]));
1624 
1625     let data = RecordBatch::try_new(
1626         schema.clone(),
1627         vec![
1628             Arc::new(StringArray::from(vec!["", "a", "aa", "aaa"])),
1629             Arc::new(Int32Array::from(vec![Some(0), Some(1), None, Some(3)])),
1630         ],
1631     )?;
1632 
1633     let table = MemTable::try_new(schema, vec![vec![data]])?;
1634 
1635     let mut ctx = ExecutionContext::new();
1636     ctx.register_table("test", Arc::new(table));
1637     let sql = "SELECT concat(c1, '-hi-', cast(c2 as varchar)) FROM test";
1638     let actual = execute(&mut ctx, sql).await;
1639     let expected = vec![
1640         vec!["-hi-0"],
1641         vec!["a-hi-1"],
1642         vec!["NULL"],
1643         vec!["aaa-hi-3"],
1644     ];
1645     assert_eq!(expected, actual);
1646     Ok(())
1647 }
1648 
1649 #[tokio::test]
query_array() -> Result<()>1650 async fn query_array() -> Result<()> {
1651     let schema = Arc::new(Schema::new(vec![
1652         Field::new("c1", DataType::Utf8, false),
1653         Field::new("c2", DataType::Int32, true),
1654     ]));
1655 
1656     let data = RecordBatch::try_new(
1657         schema.clone(),
1658         vec![
1659             Arc::new(StringArray::from(vec!["", "a", "aa", "aaa"])),
1660             Arc::new(Int32Array::from(vec![Some(0), Some(1), None, Some(3)])),
1661         ],
1662     )?;
1663 
1664     let table = MemTable::try_new(schema, vec![vec![data]])?;
1665 
1666     let mut ctx = ExecutionContext::new();
1667     ctx.register_table("test", Arc::new(table));
1668     let sql = "SELECT array(c1, cast(c2 as varchar)) FROM test";
1669     let actual = execute(&mut ctx, sql).await;
1670     let expected = vec![
1671         vec!["[,0]"],
1672         vec!["[a,1]"],
1673         vec!["[aa,NULL]"],
1674         vec!["[aaa,3]"],
1675     ];
1676     assert_eq!(expected, actual);
1677     Ok(())
1678 }
1679 
1680 #[tokio::test]
csv_query_sum_cast()1681 async fn csv_query_sum_cast() {
1682     let mut ctx = ExecutionContext::new();
1683     register_aggregate_csv_by_sql(&mut ctx).await;
1684     // c8 = i32; c9 = i64
1685     let sql = "SELECT c8 + c9 FROM aggregate_test_100";
1686     // check that the physical and logical schemas are equal
1687     execute(&mut ctx, sql).await;
1688 }
1689 
1690 #[tokio::test]
query_where_neg_num() -> Result<()>1691 async fn query_where_neg_num() -> Result<()> {
1692     let mut ctx = ExecutionContext::new();
1693     register_aggregate_csv_by_sql(&mut ctx).await;
1694 
1695     // Negative numbers do not parse correctly as of Arrow 2.0.0
1696     let sql = "select c7, c8 from aggregate_test_100 where c7 >= -2 and c7 < 10";
1697     let actual = execute(&mut ctx, sql).await;
1698     let expected = vec![
1699         vec!["7", "45465"],
1700         vec!["5", "40622"],
1701         vec!["0", "61069"],
1702         vec!["2", "20120"],
1703         vec!["4", "39363"],
1704     ];
1705     assert_eq!(expected, actual);
1706 
1707     // Also check floating point neg numbers
1708     let sql = "select c7, c8 from aggregate_test_100 where c7 >= -2.9 and c7 < 10";
1709     let actual = execute(&mut ctx, sql).await;
1710     let expected = vec![
1711         vec!["7", "45465"],
1712         vec!["5", "40622"],
1713         vec!["0", "61069"],
1714         vec!["2", "20120"],
1715         vec!["4", "39363"],
1716     ];
1717     assert_eq!(expected, actual);
1718     Ok(())
1719 }
1720 
1721 #[tokio::test]
like() -> Result<()>1722 async fn like() -> Result<()> {
1723     let mut ctx = ExecutionContext::new();
1724     register_aggregate_csv_by_sql(&mut ctx).await;
1725     let sql = "SELECT COUNT(c1) FROM aggregate_test_100 WHERE c13 LIKE '%FB%'";
1726     // check that the physical and logical schemas are equal
1727     let actual = execute(&mut ctx, sql).await;
1728 
1729     let expected = vec![vec!["1"]];
1730     assert_eq!(expected, actual);
1731     Ok(())
1732 }
1733 
make_timestamp_nano_table() -> Result<Arc<MemTable>>1734 fn make_timestamp_nano_table() -> Result<Arc<MemTable>> {
1735     let schema = Arc::new(Schema::new(vec![
1736         Field::new("ts", DataType::Timestamp(TimeUnit::Nanosecond, None), false),
1737         Field::new("value", DataType::Int32, true),
1738     ]));
1739 
1740     let mut builder = TimestampNanosecondArray::builder(3);
1741 
1742     builder.append_value(1599572549190855000)?; // 2020-09-08T13:42:29.190855+00:00
1743     builder.append_value(1599568949190855000)?; // 2020-09-08T12:42:29.190855+00:00
1744     builder.append_value(1599565349190855000)?; // 2020-09-08T11:42:29.190855+00:00
1745 
1746     let data = RecordBatch::try_new(
1747         schema.clone(),
1748         vec![
1749             Arc::new(builder.finish()),
1750             Arc::new(Int32Array::from(vec![Some(1), Some(2), Some(3)])),
1751         ],
1752     )?;
1753     let table = MemTable::try_new(schema, vec![vec![data]])?;
1754     Ok(Arc::new(table))
1755 }
1756 
1757 #[tokio::test]
to_timestamp() -> Result<()>1758 async fn to_timestamp() -> Result<()> {
1759     let mut ctx = ExecutionContext::new();
1760     ctx.register_table("ts_data", make_timestamp_nano_table()?);
1761 
1762     let sql = "SELECT COUNT(*) FROM ts_data where ts > to_timestamp('2020-09-08T12:00:00+00:00')";
1763     let actual = execute(&mut ctx, sql).await;
1764 
1765     let expected = vec![vec!["2"]];
1766     assert_eq!(expected, actual);
1767     Ok(())
1768 }
1769 
1770 #[tokio::test]
query_is_null() -> Result<()>1771 async fn query_is_null() -> Result<()> {
1772     let schema = Arc::new(Schema::new(vec![Field::new("c1", DataType::Float64, true)]));
1773 
1774     let data = RecordBatch::try_new(
1775         schema.clone(),
1776         vec![Arc::new(Float64Array::from(vec![
1777             Some(1.0),
1778             None,
1779             Some(f64::NAN),
1780         ]))],
1781     )?;
1782 
1783     let table = MemTable::try_new(schema, vec![vec![data]])?;
1784 
1785     let mut ctx = ExecutionContext::new();
1786     ctx.register_table("test", Arc::new(table));
1787     let sql = "SELECT c1 IS NULL FROM test";
1788     let actual = execute(&mut ctx, sql).await;
1789     let expected = vec![vec!["false"], vec!["true"], vec!["false"]];
1790     assert_eq!(expected, actual);
1791     Ok(())
1792 }
1793 
1794 #[tokio::test]
query_is_not_null() -> Result<()>1795 async fn query_is_not_null() -> Result<()> {
1796     let schema = Arc::new(Schema::new(vec![Field::new("c1", DataType::Float64, true)]));
1797 
1798     let data = RecordBatch::try_new(
1799         schema.clone(),
1800         vec![Arc::new(Float64Array::from(vec![
1801             Some(1.0),
1802             None,
1803             Some(f64::NAN),
1804         ]))],
1805     )?;
1806 
1807     let table = MemTable::try_new(schema, vec![vec![data]])?;
1808 
1809     let mut ctx = ExecutionContext::new();
1810     ctx.register_table("test", Arc::new(table));
1811     let sql = "SELECT c1 IS NOT NULL FROM test";
1812     let actual = execute(&mut ctx, sql).await;
1813     let expected = vec![vec!["true"], vec!["false"], vec!["true"]];
1814 
1815     assert_eq!(expected, actual);
1816     Ok(())
1817 }
1818 
1819 #[tokio::test]
query_count_distinct() -> Result<()>1820 async fn query_count_distinct() -> Result<()> {
1821     let schema = Arc::new(Schema::new(vec![Field::new("c1", DataType::Int32, true)]));
1822 
1823     let data = RecordBatch::try_new(
1824         schema.clone(),
1825         vec![Arc::new(Int32Array::from(vec![
1826             Some(0),
1827             Some(1),
1828             None,
1829             Some(3),
1830             Some(3),
1831         ]))],
1832     )?;
1833 
1834     let table = MemTable::try_new(schema, vec![vec![data]])?;
1835 
1836     let mut ctx = ExecutionContext::new();
1837     ctx.register_table("test", Arc::new(table));
1838     let sql = "SELECT COUNT(DISTINCT c1) FROM test";
1839     let actual = execute(&mut ctx, sql).await;
1840     let expected = vec![vec!["3".to_string()]];
1841     assert_eq!(expected, actual);
1842     Ok(())
1843 }
1844 
1845 #[tokio::test]
query_on_string_dictionary() -> Result<()>1846 async fn query_on_string_dictionary() -> Result<()> {
1847     // Test to ensure DataFusion can operate on dictionary types
1848     // Use StringDictionary (32 bit indexes = keys)
1849     let field_type =
1850         DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));
1851     let schema = Arc::new(Schema::new(vec![Field::new("d1", field_type, true)]));
1852 
1853     let keys_builder = PrimitiveBuilder::<Int32Type>::new(10);
1854     let values_builder = StringBuilder::new(10);
1855     let mut builder = StringDictionaryBuilder::new(keys_builder, values_builder);
1856 
1857     builder.append("one")?;
1858     builder.append_null()?;
1859     builder.append("three")?;
1860     let array = Arc::new(builder.finish());
1861 
1862     let data = RecordBatch::try_new(schema.clone(), vec![array])?;
1863 
1864     let table = MemTable::try_new(schema, vec![vec![data]])?;
1865     let mut ctx = ExecutionContext::new();
1866     ctx.register_table("test", Arc::new(table));
1867 
1868     // Basic SELECT
1869     let sql = "SELECT * FROM test";
1870     let actual = execute(&mut ctx, sql).await;
1871     let expected = vec![vec!["one"], vec!["NULL"], vec!["three"]];
1872     assert_eq!(expected, actual);
1873 
1874     // basic filtering
1875     let sql = "SELECT * FROM test WHERE d1 IS NOT NULL";
1876     let actual = execute(&mut ctx, sql).await;
1877     let expected = vec![vec!["one"], vec!["three"]];
1878     assert_eq!(expected, actual);
1879 
1880     // filtering with constant
1881     let sql = "SELECT * FROM test WHERE d1 = 'three'";
1882     let actual = execute(&mut ctx, sql).await;
1883     let expected = vec![vec!["three"]];
1884     assert_eq!(expected, actual);
1885 
1886     // Expression evaluation
1887     let sql = "SELECT concat(d1, '-foo') FROM test";
1888     let actual = execute(&mut ctx, sql).await;
1889     let expected = vec![vec!["one-foo"], vec!["NULL"], vec!["three-foo"]];
1890     assert_eq!(expected, actual);
1891 
1892     // aggregation
1893     let sql = "SELECT COUNT(d1) FROM test";
1894     let actual = execute(&mut ctx, sql).await;
1895     let expected = vec![vec!["2"]];
1896     assert_eq!(expected, actual);
1897 
1898     Ok(())
1899 }
1900 
1901 #[tokio::test]
query_without_from() -> Result<()>1902 async fn query_without_from() -> Result<()> {
1903     // Test for SELECT <expression> without FROM.
1904     // Should evaluate expressions in project position.
1905     let mut ctx = ExecutionContext::new();
1906 
1907     let sql = "SELECT 1";
1908     let actual = execute(&mut ctx, sql).await;
1909     let expected = vec![vec!["1"]];
1910     assert_eq!(expected, actual);
1911 
1912     let sql = "SELECT 1+2, 3/4, cos(0)";
1913     let actual = execute(&mut ctx, sql).await;
1914     let expected = vec![vec!["3", "0", "1"]];
1915     assert_eq!(expected, actual);
1916 
1917     Ok(())
1918 }
1919 
1920 #[tokio::test]
query_scalar_minus_array() -> Result<()>1921 async fn query_scalar_minus_array() -> Result<()> {
1922     let schema = Arc::new(Schema::new(vec![Field::new("c1", DataType::Int32, true)]));
1923 
1924     let data = RecordBatch::try_new(
1925         schema.clone(),
1926         vec![Arc::new(Int32Array::from(vec![
1927             Some(0),
1928             Some(1),
1929             None,
1930             Some(3),
1931         ]))],
1932     )?;
1933 
1934     let table = MemTable::try_new(schema, vec![vec![data]])?;
1935 
1936     let mut ctx = ExecutionContext::new();
1937     ctx.register_table("test", Arc::new(table));
1938     let sql = "SELECT 4 - c1 FROM test";
1939     let actual = execute(&mut ctx, sql).await;
1940     let expected = vec![vec!["4"], vec!["3"], vec!["NULL"], vec!["1"]];
1941     assert_eq!(expected, actual);
1942     Ok(())
1943 }
1944 
/// Asserts that two result sets contain the same floating point values.
///
/// Every cell on both sides is parsed as `f64`, and corresponding cells must
/// agree within an absolute tolerance of `2.0 * f64::EPSILON`.
///
/// # Panics
///
/// Panics if the two sides differ in row count or in the length of any row,
/// if a cell cannot be parsed as `f64`, or if a pair of cells differs by more
/// than the tolerance.
fn assert_float_eq<T>(expected: &[Vec<T>], received: &[Vec<String>])
where
    T: AsRef<str>,
{
    // `zip` stops at the shorter input, so without explicit shape checks a
    // result with missing (or extra) rows or cells would pass unnoticed.
    assert_eq!(
        expected.len(),
        received.len(),
        "row count mismatch: expected {} rows, received {}",
        expected.len(),
        received.len()
    );

    for (row, (expected_row, received_row)) in expected.iter().zip(received).enumerate()
    {
        assert_eq!(
            expected_row.len(),
            received_row.len(),
            "column count mismatch in row {}: expected {} cells, received {}",
            row,
            expected_row.len(),
            received_row.len()
        );

        for (col, (l, r)) in expected_row.iter().zip(received_row).enumerate() {
            let l_text = l.as_ref();
            let l_val = l_text.parse::<f64>().unwrap_or_else(|e| {
                panic!(
                    "expected value {:?} at row {}, column {} is not a float: {}",
                    l_text, row, col, e
                )
            });
            let r_val = r.parse::<f64>().unwrap_or_else(|e| {
                panic!(
                    "received value {:?} at row {}, column {} is not a float: {}",
                    r, row, col, e
                )
            });
            assert!(
                (l_val - r_val).abs() <= 2.0 * f64::EPSILON,
                "float mismatch at row {}, column {}: expected {}, received {}",
                row,
                col,
                l_val,
                r_val
            );
        }
    }
}
1961 
1962 #[tokio::test]
csv_between_expr() -> Result<()>1963 async fn csv_between_expr() -> Result<()> {
1964     let mut ctx = ExecutionContext::new();
1965     register_aggregate_csv(&mut ctx)?;
1966     let sql = "SELECT c4 FROM aggregate_test_100 WHERE c12 BETWEEN 0.995 AND 1.0";
1967     let mut actual = execute(&mut ctx, sql).await;
1968     actual.sort();
1969     let expected = vec![vec!["10837"]];
1970     assert_eq!(expected, actual);
1971     Ok(())
1972 }
1973 
1974 #[tokio::test]
csv_between_expr_negated() -> Result<()>1975 async fn csv_between_expr_negated() -> Result<()> {
1976     let mut ctx = ExecutionContext::new();
1977     register_aggregate_csv(&mut ctx)?;
1978     let sql = "SELECT c4 FROM aggregate_test_100 WHERE c12 NOT BETWEEN 0 AND 0.995";
1979     let mut actual = execute(&mut ctx, sql).await;
1980     actual.sort();
1981     let expected = vec![vec!["10837"]];
1982     assert_eq!(expected, actual);
1983     Ok(())
1984 }
1985 
1986 #[tokio::test]
csv_group_by_date() -> Result<()>1987 async fn csv_group_by_date() -> Result<()> {
1988     let mut ctx = ExecutionContext::new();
1989     let schema = Arc::new(Schema::new(vec![
1990         Field::new("date", DataType::Date32, false),
1991         Field::new("cnt", DataType::Int32, false),
1992     ]));
1993     let data = RecordBatch::try_new(
1994         schema.clone(),
1995         vec![
1996             Arc::new(Date32Array::from(vec![
1997                 Some(100),
1998                 Some(100),
1999                 Some(100),
2000                 Some(101),
2001                 Some(101),
2002                 Some(101),
2003             ])),
2004             Arc::new(Int32Array::from(vec![
2005                 Some(1),
2006                 Some(2),
2007                 Some(3),
2008                 Some(3),
2009                 Some(3),
2010                 Some(3),
2011             ])),
2012         ],
2013     )?;
2014     let table = MemTable::try_new(schema, vec![vec![data]])?;
2015 
2016     ctx.register_table("dates", Arc::new(table));
2017     let sql = "SELECT SUM(cnt) FROM dates GROUP BY date";
2018     let actual = execute(&mut ctx, sql).await;
2019     let mut actual: Vec<String> = actual.iter().flatten().cloned().collect();
2020     actual.sort();
2021     let expected = vec!["6", "9"];
2022     assert_eq!(expected, actual);
2023     Ok(())
2024 }
2025 
2026 #[tokio::test]
string_expressions() -> Result<()>2027 async fn string_expressions() -> Result<()> {
2028     let mut ctx = ExecutionContext::new();
2029     let sql = "SELECT
2030         char_length('tom') AS char_length
2031         ,char_length(NULL) AS char_length_null
2032         ,character_length('tom') AS character_length
2033         ,character_length(NULL) AS character_length_null
2034         ,lower('TOM') AS lower
2035         ,lower(NULL) AS lower_null
2036         ,upper('tom') AS upper
2037         ,upper(NULL) AS upper_null
2038         ,trim(' tom ') AS trim
2039         ,trim(NULL) AS trim_null
2040         ,ltrim(' tom ') AS trim_left
2041         ,rtrim(' tom ') AS trim_right
2042     ";
2043     let actual = execute(&mut ctx, sql).await;
2044 
2045     let expected = vec![vec![
2046         "3", "NULL", "3", "NULL", "tom", "NULL", "TOM", "NULL", "tom", "NULL", "tom ",
2047         " tom",
2048     ]];
2049     assert_eq!(expected, actual);
2050     Ok(())
2051 }
2052 
2053 #[tokio::test]
boolean_expressions() -> Result<()>2054 async fn boolean_expressions() -> Result<()> {
2055     let mut ctx = ExecutionContext::new();
2056     let sql = "SELECT
2057         true AS val_1,
2058         false AS val_2
2059     ";
2060     let actual = execute(&mut ctx, sql).await;
2061 
2062     let expected = vec![vec!["true", "false"]];
2063     assert_eq!(expected, actual);
2064     Ok(())
2065 }
2066 
2067 #[tokio::test]
interval_expressions() -> Result<()>2068 async fn interval_expressions() -> Result<()> {
2069     let mut ctx = ExecutionContext::new();
2070     let sql = "SELECT
2071         (interval '1') as interval_1,
2072         (interval '1 second') as interval_2,
2073         (interval '500 milliseconds') as interval_3,
2074         (interval '5 second') as interval_4,
2075         (interval '1 minute') as interval_5,
2076         (interval '0.5 minute') as interval_6,
2077         (interval '.5 minute') as interval_7,
2078         (interval '5 minute') as interval_8,
2079         (interval '5 minute 1 second') as interval_9,
2080         (interval '1 hour') as interval_10,
2081         (interval '5 hour') as interval_11,
2082         (interval '1 day') as interval_12,
2083         (interval '1 day 1') as interval_13,
2084         (interval '0.5') as interval_14,
2085         (interval '0.5 day 1') as interval_15,
2086         (interval '0.49 day') as interval_16,
2087         (interval '0.499 day') as interval_17,
2088         (interval '0.4999 day') as interval_18,
2089         (interval '0.49999 day') as interval_19,
2090         (interval '0.49999999999 day') as interval_20,
2091         (interval '5 day') as interval_21,
2092         (interval '5 day 4 hours 3 minutes 2 seconds 100 milliseconds') as interval_22,
2093         (interval '0.5 month') as interval_23,
2094         (interval '1 month') as interval_24,
2095         (interval '5 month') as interval_25,
2096         (interval '13 month') as interval_26,
2097         (interval '0.5 year') as interval_27,
2098         (interval '1 year') as interval_28,
2099         (interval '2 year') as interval_29
2100     ";
2101     let actual = execute(&mut ctx, sql).await;
2102 
2103     let expected = vec![vec![
2104         "0 years 0 mons 0 days 0 hours 0 mins 1.00 secs",
2105         "0 years 0 mons 0 days 0 hours 0 mins 1.00 secs",
2106         "0 years 0 mons 0 days 0 hours 0 mins 0.500 secs",
2107         "0 years 0 mons 0 days 0 hours 0 mins 5.00 secs",
2108         "0 years 0 mons 0 days 0 hours 1 mins 0.00 secs",
2109         "0 years 0 mons 0 days 0 hours 0 mins 30.00 secs",
2110         "0 years 0 mons 0 days 0 hours 0 mins 30.00 secs",
2111         "0 years 0 mons 0 days 0 hours 5 mins 0.00 secs",
2112         "0 years 0 mons 0 days 0 hours 5 mins 1.00 secs",
2113         "0 years 0 mons 0 days 1 hours 0 mins 0.00 secs",
2114         "0 years 0 mons 0 days 5 hours 0 mins 0.00 secs",
2115         "0 years 0 mons 1 days 0 hours 0 mins 0.00 secs",
2116         "0 years 0 mons 1 days 0 hours 0 mins 1.00 secs",
2117         "0 years 0 mons 0 days 0 hours 0 mins 0.500 secs",
2118         "0 years 0 mons 0 days 12 hours 0 mins 1.00 secs",
2119         "0 years 0 mons 0 days 11 hours 45 mins 36.00 secs",
2120         "0 years 0 mons 0 days 11 hours 58 mins 33.596 secs",
2121         "0 years 0 mons 0 days 11 hours 59 mins 51.364 secs",
2122         "0 years 0 mons 0 days 11 hours 59 mins 59.136 secs",
2123         "0 years 0 mons 0 days 12 hours 0 mins 0.00 secs",
2124         "0 years 0 mons 5 days 0 hours 0 mins 0.00 secs",
2125         "0 years 0 mons 5 days 4 hours 3 mins 2.100 secs",
2126         "0 years 0 mons 15 days 0 hours 0 mins 0.00 secs",
2127         "0 years 1 mons 0 days 0 hours 0 mins 0.00 secs",
2128         "0 years 5 mons 0 days 0 hours 0 mins 0.00 secs",
2129         "1 years 1 mons 0 days 0 hours 0 mins 0.00 secs",
2130         "0 years 6 mons 0 days 0 hours 0 mins 0.00 secs",
2131         "1 years 0 mons 0 days 0 hours 0 mins 0.00 secs",
2132         "2 years 0 mons 0 days 0 hours 0 mins 0.00 secs",
2133     ]];
2134     assert_eq!(expected, actual);
2135     Ok(())
2136 }
2137 
2138 #[tokio::test]
crypto_expressions() -> Result<()>2139 async fn crypto_expressions() -> Result<()> {
2140     let mut ctx = ExecutionContext::new();
2141     let sql = "SELECT
2142         md5('tom') AS md5_tom,
2143         md5('') AS md5_empty_str,
2144         md5(null) AS md5_null,
2145         sha224('tom') AS sha224_tom,
2146         sha224('') AS sha224_empty_str,
2147         sha224(null) AS sha224_null,
2148         sha256('tom') AS sha256_tom,
2149         sha256('') AS sha256_empty_str,
2150         sha384('tom') AS sha348_tom,
2151         sha384('') AS sha384_empty_str,
2152         sha512('tom') AS sha512_tom,
2153         sha512('') AS sha512_empty_str
2154     ";
2155     let actual = execute(&mut ctx, sql).await;
2156 
2157     let expected = vec![vec![
2158         "34b7da764b21d298ef307d04d8152dc5",
2159         "d41d8cd98f00b204e9800998ecf8427e",
2160         "NULL",
2161         "0bf6cb62649c42a9ae3876ab6f6d92ad36cb5414e495f8873292be4d",
2162         "d14a028c2a3a2bc9476102bb288234c415a2b01f828ea62ac5b3e42f",
2163         "NULL",
2164         "e1608f75c5d7813f3d4031cb30bfb786507d98137538ff8e128a6ff74e84e643",
2165         "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855",
2166         "096f5b68aa77848e4fdf5c1c0b350de2dbfad60ffd7c25d9ea07c6c19b8a4d55a9187eb117c557883f58c16dfac3e343",
2167         "38b060a751ac96384cd9327eb1b1e36a21fdb71114be07434c0cc7bf63f6e1da274edebfe76f65fbd51ad2f14898b95b",
2168         "6e1b9b3fe840680e37051f7ad5e959d6f39ad0f8885d855166f55c659469d3c8b78118c44a2a49c72ddb481cd6d8731034e11cc030070ba843a90b3495cb8d3e",
2169         "cf83e1357eefb8bdf1542850d66d8007d620e4050b5715dc83f4a921d36ce9ce47d0d13c5d85f2b0ff8318d2877eec2f63b931bd47417a81a538327af927da3e"
2170     ]];
2171     assert_eq!(expected, actual);
2172     Ok(())
2173 }
2174 
2175 #[tokio::test]
extract_date_part() -> Result<()>2176 async fn extract_date_part() -> Result<()> {
2177     let mut ctx = ExecutionContext::new();
2178     let sql = "SELECT
2179         date_part('hour', CAST('2020-01-01' AS DATE)) AS hr1,
2180         EXTRACT(HOUR FROM CAST('2020-01-01' AS DATE)) AS hr2,
2181         EXTRACT(HOUR FROM to_timestamp('2020-09-08T12:00:00+00:00')) AS hr3,
2182         date_part('YEAR', CAST('2000-01-01' AS DATE)) AS year1,
2183         EXTRACT(year FROM to_timestamp('2020-09-08T12:00:00+00:00')) AS year2
2184     ";
2185 
2186     let actual = execute(&mut ctx, sql).await;
2187 
2188     let expected = vec![vec!["0", "0", "12", "2000", "2020"]];
2189     assert_eq!(expected, actual);
2190     Ok(())
2191 }
2192 
2193 #[tokio::test]
in_list_array() -> Result<()>2194 async fn in_list_array() -> Result<()> {
2195     let mut ctx = ExecutionContext::new();
2196     register_aggregate_csv_by_sql(&mut ctx).await;
2197     let sql = "SELECT
2198             c1 IN ('a', 'c') AS utf8_in_true
2199             ,c1 IN ('x', 'y') AS utf8_in_false
2200             ,c1 NOT IN ('x', 'y') AS utf8_not_in_true
2201             ,c1 NOT IN ('a', 'c') AS utf8_not_in_false
2202             ,CAST(CAST(c1 AS int) AS varchar) IN ('a', 'c') AS utf8_in_null
2203         FROM aggregate_test_100 WHERE c12 < 0.05";
2204     let actual = execute(&mut ctx, sql).await;
2205     let expected = vec![
2206         vec!["true", "false", "true", "false", "NULL"],
2207         vec!["true", "false", "true", "false", "NULL"],
2208         vec!["true", "false", "true", "false", "NULL"],
2209         vec!["false", "false", "true", "true", "NULL"],
2210         vec!["false", "false", "true", "true", "NULL"],
2211         vec!["false", "false", "true", "true", "NULL"],
2212         vec!["false", "false", "true", "true", "NULL"],
2213     ];
2214     assert_eq!(expected, actual);
2215     Ok(())
2216 }
2217 
2218 #[tokio::test]
in_list_scalar() -> Result<()>2219 async fn in_list_scalar() -> Result<()> {
2220     let mut ctx = ExecutionContext::new();
2221     let sql = "SELECT
2222         'a' IN ('a','b') AS utf8_in_true
2223         ,'c' IN ('a','b') AS utf8_in_false
2224         ,'c' NOT IN ('a','b') AS utf8_not_in_true
2225         ,'a' NOT IN ('a','b') AS utf8_not_in_false
2226         ,NULL IN ('a','b') AS utf8_in_null
2227         ,NULL NOT IN ('a','b') AS utf8_not_in_null
2228         ,'a' IN ('a','b',NULL) AS utf8_in_null_true
2229         ,'c' IN ('a','b',NULL) AS utf8_in_null_null
2230         ,'a' NOT IN ('a','b',NULL) AS utf8_not_in_null_false
2231         ,'c' NOT IN ('a','b',NULL) AS utf8_not_in_null_null
2232 
2233         ,0 IN (0,1,2) AS int64_in_true
2234         ,3 IN (0,1,2) AS int64_in_false
2235         ,3 NOT IN (0,1,2) AS int64_not_in_true
2236         ,0 NOT IN (0,1,2) AS int64_not_in_false
2237         ,NULL IN (0,1,2) AS int64_in_null
2238         ,NULL NOT IN (0,1,2) AS int64_not_in_null
2239         ,0 IN (0,1,2,NULL) AS int64_in_null_true
2240         ,3 IN (0,1,2,NULL) AS int64_in_null_null
2241         ,0 NOT IN (0,1,2,NULL) AS int64_not_in_null_false
2242         ,3 NOT IN (0,1,2,NULL) AS int64_not_in_null_null
2243 
2244         ,0.0 IN (0.0,0.1,0.2) AS float64_in_true
2245         ,0.3 IN (0.0,0.1,0.2) AS float64_in_false
2246         ,0.3 NOT IN (0.0,0.1,0.2) AS float64_not_in_true
2247         ,0.0 NOT IN (0.0,0.1,0.2) AS float64_not_in_false
2248         ,NULL IN (0.0,0.1,0.2) AS float64_in_null
2249         ,NULL NOT IN (0.0,0.1,0.2) AS float64_not_in_null
2250         ,0.0 IN (0.0,0.1,0.2,NULL) AS float64_in_null_true
2251         ,0.3 IN (0.0,0.1,0.2,NULL) AS float64_in_null_null
2252         ,0.0 NOT IN (0.0,0.1,0.2,NULL) AS float64_not_in_null_false
2253         ,0.3 NOT IN (0.0,0.1,0.2,NULL) AS float64_not_in_null_null
2254 
2255         ,'1' IN ('a','b',1) AS utf8_cast_in_true
2256         ,'2' IN ('a','b',1) AS utf8_cast_in_false
2257         ,'2' NOT IN ('a','b',1) AS utf8_cast_not_in_true
2258         ,'1' NOT IN ('a','b',1) AS utf8_cast_not_in_false
2259         ,NULL IN ('a','b',1) AS utf8_cast_in_null
2260         ,NULL NOT IN ('a','b',1) AS utf8_cast_not_in_null
2261         ,'1' IN ('a','b',NULL,1) AS utf8_cast_in_null_true
2262         ,'2' IN ('a','b',NULL,1) AS utf8_cast_in_null_null
2263         ,'1' NOT IN ('a','b',NULL,1) AS utf8_cast_not_in_null_false
2264         ,'2' NOT IN ('a','b',NULL,1) AS utf8_cast_not_in_null_null
2265     ";
2266     let actual = execute(&mut ctx, sql).await;
2267 
2268     let expected = vec![vec![
2269         "true", "false", "true", "false", "NULL", "NULL", "true", "NULL", "false",
2270         "NULL", "true", "false", "true", "false", "NULL", "NULL", "true", "NULL",
2271         "false", "NULL", "true", "false", "true", "false", "NULL", "NULL", "true",
2272         "NULL", "false", "NULL", "true", "false", "true", "false", "NULL", "NULL",
2273         "true", "NULL", "false", "NULL",
2274     ]];
2275     assert_eq!(expected, actual);
2276     Ok(())
2277 }
2278 
2279 // TODO Tests to prove correct implementation of INNER JOIN's with qualified names.
2280 //  https://issues.apache.org/jira/projects/ARROW/issues/ARROW-11432.
2281 #[tokio::test]
2282 #[ignore]
inner_join_qualified_names() -> Result<()>2283 async fn inner_join_qualified_names() -> Result<()> {
2284     // Setup the statements that test qualified names function correctly.
2285     let equivalent_sql = [
2286         "SELECT t1.a, t1.b, t1.c, t2.a, t2.b, t2.c
2287             FROM t1
2288             INNER JOIN t2 ON t1.a = t2.a
2289             ORDER BY t1.a",
2290         "SELECT t1.a, t1.b, t1.c, t2.a, t2.b, t2.c
2291             FROM t1
2292             INNER JOIN t2 ON t2.a = t1.a
2293             ORDER BY t1.a",
2294     ];
2295 
2296     let expected = vec![
2297         vec!["1", "10", "50", "1", "100", "500"],
2298         vec!["2", "20", "60", "2", "20", "600"],
2299         vec!["4", "40", "80", "4", "400", "800"],
2300     ];
2301 
2302     for sql in equivalent_sql.iter() {
2303         let mut ctx = create_join_context_qualified()?;
2304         let actual = execute(&mut ctx, sql).await;
2305         assert_eq!(expected, actual);
2306     }
2307     Ok(())
2308 }
2309