1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements.  See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership.  The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License.  You may obtain a copy of the License at
8 //
9 //   http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied.  See the License for the
15 // specific language governing permissions and limitations
16 // under the License.
17 
18 //! SQL Query Planner (produces logical plan from SQL AST)
19 
20 use std::str::FromStr;
21 use std::sync::Arc;
22 
23 use crate::datasource::TableProvider;
24 use crate::logical_plan::Expr::Alias;
25 use crate::logical_plan::{
26     and, lit, DFSchema, Expr, LogicalPlan, LogicalPlanBuilder, Operator, PlanType,
27     StringifiedPlan, ToDFSchema,
28 };
29 use crate::scalar::ScalarValue;
30 use crate::{
31     error::{DataFusionError, Result},
32     physical_plan::udaf::AggregateUDF,
33 };
34 use crate::{
35     physical_plan::udf::ScalarUDF,
36     physical_plan::{aggregates, functions},
37     sql::parser::{CreateExternalTable, FileType, Statement as DFStatement},
38 };
39 
40 use arrow::datatypes::*;
41 
42 use crate::prelude::JoinType;
43 use sqlparser::ast::{
44     BinaryOperator, DataType as SQLDataType, DateTimeField, Expr as SQLExpr, FunctionArg,
45     Join, JoinConstraint, JoinOperator, Query, Select, SelectItem, SetExpr, TableFactor,
46     TableWithJoins, UnaryOperator, Value,
47 };
48 use sqlparser::ast::{ColumnDef as SQLColumnDef, ColumnOption};
49 use sqlparser::ast::{OrderByExpr, Statement};
50 use sqlparser::parser::ParserError::ParserError;
51 
52 use super::utils::{
53     can_columns_satisfy_exprs, expand_wildcard, expr_as_column_expr, extract_aliases,
54     find_aggregate_exprs, find_column_exprs, rebase_expr, resolve_aliases_to_exprs,
55 };
56 
57 /// The ContextProvider trait allows the query planner to obtain meta-data about tables and
58 /// functions referenced in SQL statements
59 pub trait ContextProvider {
60     /// Getter for a datasource
get_table_provider( &self, name: &str, ) -> Option<Arc<dyn TableProvider + Send + Sync>>61     fn get_table_provider(
62         &self,
63         name: &str,
64     ) -> Option<Arc<dyn TableProvider + Send + Sync>>;
65     /// Getter for a UDF description
get_function_meta(&self, name: &str) -> Option<Arc<ScalarUDF>>66     fn get_function_meta(&self, name: &str) -> Option<Arc<ScalarUDF>>;
67     /// Getter for a UDAF description
get_aggregate_meta(&self, name: &str) -> Option<Arc<AggregateUDF>>68     fn get_aggregate_meta(&self, name: &str) -> Option<Arc<AggregateUDF>>;
69 }
70 
71 /// SQL query planner
72 pub struct SqlToRel<'a, S: ContextProvider> {
73     schema_provider: &'a S,
74 }
75 
76 impl<'a, S: ContextProvider> SqlToRel<'a, S> {
77     /// Create a new query planner
new(schema_provider: &'a S) -> Self78     pub fn new(schema_provider: &'a S) -> Self {
79         SqlToRel { schema_provider }
80     }
81 
82     /// Generate a logical plan from an DataFusion SQL statement
statement_to_plan(&self, statement: &DFStatement) -> Result<LogicalPlan>83     pub fn statement_to_plan(&self, statement: &DFStatement) -> Result<LogicalPlan> {
84         match statement {
85             DFStatement::CreateExternalTable(s) => self.external_table_to_plan(&s),
86             DFStatement::Statement(s) => self.sql_statement_to_plan(&s),
87         }
88     }
89 
90     /// Generate a logical plan from an SQL statement
sql_statement_to_plan(&self, sql: &Statement) -> Result<LogicalPlan>91     pub fn sql_statement_to_plan(&self, sql: &Statement) -> Result<LogicalPlan> {
92         match sql {
93             Statement::Explain {
94                 verbose,
95                 statement,
96                 analyze: _,
97             } => self.explain_statement_to_plan(*verbose, &statement),
98             Statement::Query(query) => self.query_to_plan(&query),
99             _ => Err(DataFusionError::NotImplemented(
100                 "Only SELECT statements are implemented".to_string(),
101             )),
102         }
103     }
104 
105     /// Generate a logic plan from an SQL query
query_to_plan(&self, query: &Query) -> Result<LogicalPlan>106     pub fn query_to_plan(&self, query: &Query) -> Result<LogicalPlan> {
107         let plan = match &query.body {
108             SetExpr::Select(s) => self.select_to_plan(s.as_ref()),
109             _ => Err(DataFusionError::NotImplemented(format!(
110                 "Query {} not implemented yet",
111                 query.body
112             ))),
113         }?;
114 
115         let plan = self.order_by(&plan, &query.order_by)?;
116 
117         self.limit(&plan, &query.limit)
118     }
119 
120     /// Generate a logical plan from a CREATE EXTERNAL TABLE statement
external_table_to_plan( &self, statement: &CreateExternalTable, ) -> Result<LogicalPlan>121     pub fn external_table_to_plan(
122         &self,
123         statement: &CreateExternalTable,
124     ) -> Result<LogicalPlan> {
125         let CreateExternalTable {
126             name,
127             columns,
128             file_type,
129             has_header,
130             location,
131         } = statement;
132 
133         // semantic checks
134         match *file_type {
135             FileType::CSV => {
136                 if columns.is_empty() {
137                     return Err(DataFusionError::Plan(
138                         "Column definitions required for CSV files. None found".into(),
139                     ));
140                 }
141             }
142             FileType::Parquet => {
143                 if !columns.is_empty() {
144                     return Err(DataFusionError::Plan(
145                         "Column definitions can not be specified for PARQUET files."
146                             .into(),
147                     ));
148                 }
149             }
150             FileType::NdJson => {}
151         };
152 
153         let schema = self.build_schema(&columns)?;
154 
155         Ok(LogicalPlan::CreateExternalTable {
156             schema: schema.to_dfschema_ref()?,
157             name: name.clone(),
158             location: location.clone(),
159             file_type: *file_type,
160             has_header: *has_header,
161         })
162     }
163 
164     /// Generate a plan for EXPLAIN ... that will print out a plan
165     ///
explain_statement_to_plan( &self, verbose: bool, statement: &Statement, ) -> Result<LogicalPlan>166     pub fn explain_statement_to_plan(
167         &self,
168         verbose: bool,
169         statement: &Statement,
170     ) -> Result<LogicalPlan> {
171         let plan = self.sql_statement_to_plan(&statement)?;
172 
173         let stringified_plans = vec![StringifiedPlan::new(
174             PlanType::LogicalPlan,
175             format!("{:#?}", plan),
176         )];
177 
178         let schema = LogicalPlan::explain_schema();
179         let plan = Arc::new(plan);
180 
181         Ok(LogicalPlan::Explain {
182             verbose,
183             plan,
184             stringified_plans,
185             schema: schema.to_dfschema_ref()?,
186         })
187     }
188 
build_schema(&self, columns: &[SQLColumnDef]) -> Result<Schema>189     fn build_schema(&self, columns: &[SQLColumnDef]) -> Result<Schema> {
190         let mut fields = Vec::new();
191 
192         for column in columns {
193             let data_type = self.make_data_type(&column.data_type)?;
194             let allow_null = column
195                 .options
196                 .iter()
197                 .any(|x| x.option == ColumnOption::Null);
198             fields.push(Field::new(&column.name.value, data_type, allow_null));
199         }
200 
201         Ok(Schema::new(fields))
202     }
203 
204     /// Maps the SQL type to the corresponding Arrow `DataType`
make_data_type(&self, sql_type: &SQLDataType) -> Result<DataType>205     fn make_data_type(&self, sql_type: &SQLDataType) -> Result<DataType> {
206         match sql_type {
207             SQLDataType::BigInt => Ok(DataType::Int64),
208             SQLDataType::Int => Ok(DataType::Int32),
209             SQLDataType::SmallInt => Ok(DataType::Int16),
210             SQLDataType::Char(_) | SQLDataType::Varchar(_) | SQLDataType::Text => {
211                 Ok(DataType::Utf8)
212             }
213             SQLDataType::Decimal(_, _) => Ok(DataType::Float64),
214             SQLDataType::Float(_) => Ok(DataType::Float32),
215             SQLDataType::Real | SQLDataType::Double => Ok(DataType::Float64),
216             SQLDataType::Boolean => Ok(DataType::Boolean),
217             SQLDataType::Date => Ok(DataType::Date32),
218             SQLDataType::Time => Ok(DataType::Time64(TimeUnit::Millisecond)),
219             SQLDataType::Timestamp => Ok(DataType::Date64),
220             _ => Err(DataFusionError::NotImplemented(format!(
221                 "The SQL data type {:?} is not implemented",
222                 sql_type
223             ))),
224         }
225     }
226 
plan_from_tables(&self, from: &[TableWithJoins]) -> Result<Vec<LogicalPlan>>227     fn plan_from_tables(&self, from: &[TableWithJoins]) -> Result<Vec<LogicalPlan>> {
228         match from.len() {
229             0 => Ok(vec![LogicalPlanBuilder::empty(true).build()?]),
230             _ => from
231                 .iter()
232                 .map(|t| self.plan_table_with_joins(t))
233                 .collect::<Result<Vec<_>>>(),
234         }
235     }
236 
plan_table_with_joins(&self, t: &TableWithJoins) -> Result<LogicalPlan>237     fn plan_table_with_joins(&self, t: &TableWithJoins) -> Result<LogicalPlan> {
238         let left = self.create_relation(&t.relation)?;
239         match t.joins.len() {
240             0 => Ok(left),
241             n => {
242                 let mut left = self.parse_relation_join(&left, &t.joins[0])?;
243                 for i in 1..n {
244                     left = self.parse_relation_join(&left, &t.joins[i])?;
245                 }
246                 Ok(left)
247             }
248         }
249     }
250 
parse_relation_join( &self, left: &LogicalPlan, join: &Join, ) -> Result<LogicalPlan>251     fn parse_relation_join(
252         &self,
253         left: &LogicalPlan,
254         join: &Join,
255     ) -> Result<LogicalPlan> {
256         let right = self.create_relation(&join.relation)?;
257         match &join.join_operator {
258             JoinOperator::LeftOuter(constraint) => {
259                 self.parse_join(left, &right, constraint, JoinType::Left)
260             }
261             JoinOperator::RightOuter(constraint) => {
262                 self.parse_join(left, &right, constraint, JoinType::Right)
263             }
264             JoinOperator::Inner(constraint) => {
265                 self.parse_join(left, &right, constraint, JoinType::Inner)
266             }
267             other => Err(DataFusionError::NotImplemented(format!(
268                 "Unsupported JOIN operator {:?}",
269                 other
270             ))),
271         }
272     }
273 
parse_join( &self, left: &LogicalPlan, right: &LogicalPlan, constraint: &JoinConstraint, join_type: JoinType, ) -> Result<LogicalPlan>274     fn parse_join(
275         &self,
276         left: &LogicalPlan,
277         right: &LogicalPlan,
278         constraint: &JoinConstraint,
279         join_type: JoinType,
280     ) -> Result<LogicalPlan> {
281         match constraint {
282             JoinConstraint::On(sql_expr) => {
283                 let mut keys: Vec<(String, String)> = vec![];
284                 let join_schema = left.schema().join(&right.schema())?;
285 
286                 // parse ON expression
287                 let expr = self.sql_to_rex(sql_expr, &join_schema)?;
288 
289                 // extract join keys
290                 extract_join_keys(&expr, &mut keys)?;
291                 let left_keys: Vec<&str> =
292                     keys.iter().map(|pair| pair.0.as_str()).collect();
293                 let right_keys: Vec<&str> =
294                     keys.iter().map(|pair| pair.1.as_str()).collect();
295 
296                 // return the logical plan representing the join
297                 LogicalPlanBuilder::from(&left)
298                     .join(&right, join_type, &left_keys, &right_keys)?
299                     .build()
300             }
301             JoinConstraint::Using(idents) => {
302                 let keys: Vec<&str> = idents.iter().map(|x| x.value.as_str()).collect();
303                 LogicalPlanBuilder::from(&left)
304                     .join(&right, join_type, &keys, &keys)?
305                     .build()
306             }
307             JoinConstraint::Natural => {
308                 // https://issues.apache.org/jira/browse/ARROW-10727
309                 Err(DataFusionError::NotImplemented(
310                     "NATURAL JOIN is not supported (https://issues.apache.org/jira/browse/ARROW-10727)".to_string(),
311                 ))
312             }
313             JoinConstraint::None => Err(DataFusionError::NotImplemented(
314                 "NONE contraint is not supported".to_string(),
315             )),
316         }
317     }
318 
create_relation(&self, relation: &TableFactor) -> Result<LogicalPlan>319     fn create_relation(&self, relation: &TableFactor) -> Result<LogicalPlan> {
320         match relation {
321             TableFactor::Table { name, .. } => {
322                 let table_name = name.to_string();
323                 match self.schema_provider.get_table_provider(&table_name) {
324                     Some(provider) => {
325                         LogicalPlanBuilder::scan(&table_name, provider, None)?.build()
326                     }
327                     None => Err(DataFusionError::Plan(format!(
328                         "no provider found for table {}",
329                         name
330                     ))),
331                 }
332             }
333             TableFactor::Derived { subquery, .. } => self.query_to_plan(subquery),
334             TableFactor::NestedJoin(table_with_joins) => {
335                 self.plan_table_with_joins(table_with_joins)
336             }
337             // @todo Support TableFactory::TableFunction?
338             _ => Err(DataFusionError::NotImplemented(format!(
339                 "Unsupported ast node {:?} in create_relation",
340                 relation
341             ))),
342         }
343     }
344 
345     /// Generate a logic plan from an SQL select
select_to_plan(&self, select: &Select) -> Result<LogicalPlan>346     fn select_to_plan(&self, select: &Select) -> Result<LogicalPlan> {
347         let plans = self.plan_from_tables(&select.from)?;
348 
349         let plan = match &select.selection {
350             Some(predicate_expr) => {
351                 // build join schema
352                 let mut fields = vec![];
353                 for plan in &plans {
354                     fields.extend_from_slice(&plan.schema().fields());
355                 }
356                 let join_schema = DFSchema::new(fields)?;
357 
358                 let filter_expr = self.sql_to_rex(predicate_expr, &join_schema)?;
359 
360                 // look for expressions of the form `<column> = <column>`
361                 let mut possible_join_keys = vec![];
362                 extract_possible_join_keys(&filter_expr, &mut possible_join_keys)?;
363 
364                 let mut all_join_keys = vec![];
365                 let mut left = plans[0].clone();
366                 for right in plans.iter().skip(1) {
367                     let left_schema = left.schema();
368                     let right_schema = right.schema();
369                     let mut join_keys = vec![];
370                     for (l, r) in &possible_join_keys {
371                         if left_schema.field_with_unqualified_name(l).is_ok()
372                             && right_schema.field_with_unqualified_name(r).is_ok()
373                         {
374                             join_keys.push((l.as_str(), r.as_str()));
375                         } else if left_schema.field_with_unqualified_name(r).is_ok()
376                             && right_schema.field_with_unqualified_name(l).is_ok()
377                         {
378                             join_keys.push((r.as_str(), l.as_str()));
379                         }
380                     }
381                     if join_keys.is_empty() {
382                         return Err(DataFusionError::NotImplemented(
383                             "Cartesian joins are not supported".to_string(),
384                         ));
385                     } else {
386                         let left_keys: Vec<_> =
387                             join_keys.iter().map(|(l, _)| *l).collect();
388                         let right_keys: Vec<_> =
389                             join_keys.iter().map(|(_, r)| *r).collect();
390                         let builder = LogicalPlanBuilder::from(&left);
391                         left = builder
392                             .join(right, JoinType::Inner, &left_keys, &right_keys)?
393                             .build()?;
394                     }
395                     all_join_keys.extend_from_slice(&join_keys);
396                 }
397 
398                 // remove join expressions from filter
399                 match remove_join_expressions(&filter_expr, &all_join_keys)? {
400                     Some(filter_expr) => {
401                         LogicalPlanBuilder::from(&left).filter(filter_expr)?.build()
402                     }
403                     _ => Ok(left),
404                 }
405             }
406             None => {
407                 if plans.len() == 1 {
408                     Ok(plans[0].clone())
409                 } else {
410                     Err(DataFusionError::NotImplemented(
411                         "Cartesian joins are not supported".to_string(),
412                     ))
413                 }
414             }
415         };
416         let plan = plan?;
417 
418         // The SELECT expressions, with wildcards expanded.
419         let select_exprs = self.prepare_select_exprs(&plan, &select.projection)?;
420 
421         // Optionally the HAVING expression.
422         let having_expr_opt = select
423             .having
424             .as_ref()
425             .map::<Result<Expr>, _>(|having_expr| {
426                 let having_expr = self.sql_expr_to_logical_expr(having_expr)?;
427 
428                 // This step "dereferences" any aliases in the HAVING clause.
429                 //
430                 // This is how we support queries with HAVING expressions that
431                 // refer to aliased columns.
432                 //
433                 // For example:
434                 //
435                 //   SELECT c1 AS m FROM t HAVING m > 10;
436                 //   SELECT c1, MAX(c2) AS m FROM t GROUP BY c1 HAVING m > 10;
437                 //
438                 // are rewritten as, respectively:
439                 //
440                 //   SELECT c1 AS m FROM t HAVING c1 > 10;
441                 //   SELECT c1, MAX(c2) AS m FROM t GROUP BY c1 HAVING MAX(c2) > 10;
442                 //
443                 let having_expr = resolve_aliases_to_exprs(
444                     &having_expr,
445                     &extract_aliases(&select_exprs),
446                 )?;
447 
448                 Ok(having_expr)
449             })
450             .transpose()?;
451 
452         // The outer expressions we will search through for
453         // aggregates. Aggregates may be sourced from the SELECT...
454         let mut aggr_expr_haystack = select_exprs.clone();
455 
456         // ... or from the HAVING.
457         if let Some(having_expr) = &having_expr_opt {
458             aggr_expr_haystack.push(having_expr.clone());
459         }
460 
461         // All of the aggregate expressions (deduplicated).
462         let aggr_exprs = find_aggregate_exprs(&aggr_expr_haystack);
463 
464         let (plan, select_exprs_post_aggr, having_expr_post_aggr_opt) =
465             if !select.group_by.is_empty() || !aggr_exprs.is_empty() {
466                 self.aggregate(
467                     &plan,
468                     &select_exprs,
469                     &having_expr_opt,
470                     &select.group_by,
471                     &aggr_exprs,
472                 )?
473             } else {
474                 if let Some(having_expr) = &having_expr_opt {
475                     let available_columns = select_exprs
476                         .iter()
477                         .map(|expr| expr_as_column_expr(expr, &plan))
478                         .collect::<Result<Vec<Expr>>>()?;
479 
480                     // Ensure the HAVING expression is using only columns
481                     // provided by the SELECT.
482                     if !can_columns_satisfy_exprs(
483                         &available_columns,
484                         &[having_expr.clone()],
485                     )? {
486                         return Err(DataFusionError::Plan(
487                             "Having references column(s) not provided by the select"
488                                 .to_owned(),
489                         ));
490                     }
491                 }
492 
493                 (plan, select_exprs, having_expr_opt)
494             };
495 
496         let plan = if let Some(having_expr_post_aggr) = having_expr_post_aggr_opt {
497             LogicalPlanBuilder::from(&plan)
498                 .filter(having_expr_post_aggr)?
499                 .build()?
500         } else {
501             plan
502         };
503 
504         self.project(&plan, &select_exprs_post_aggr, false)
505     }
506 
507     /// Returns the `Expr`'s corresponding to a SQL query's SELECT expressions.
508     ///
509     /// Wildcards are expanded into the concrete list of columns.
prepare_select_exprs( &self, plan: &LogicalPlan, projection: &[SelectItem], ) -> Result<Vec<Expr>>510     fn prepare_select_exprs(
511         &self,
512         plan: &LogicalPlan,
513         projection: &[SelectItem],
514     ) -> Result<Vec<Expr>> {
515         let input_schema = plan.schema();
516 
517         Ok(projection
518             .iter()
519             .map(|expr| self.sql_select_to_rex(&expr, &input_schema))
520             .collect::<Result<Vec<Expr>>>()?
521             .iter()
522             .flat_map(|expr| expand_wildcard(&expr, &input_schema))
523             .collect::<Vec<Expr>>())
524     }
525 
526     /// Wrap a plan in a projection
527     ///
528     /// If the `force` argument is `false`, the projection is applied only when
529     /// necessary, i.e., when the input fields are different than the
530     /// projection. Note that if the input fields are the same, but out of
531     /// order, the projection will be applied.
project( &self, input: &LogicalPlan, expr: &[Expr], force: bool, ) -> Result<LogicalPlan>532     fn project(
533         &self,
534         input: &LogicalPlan,
535         expr: &[Expr],
536         force: bool,
537     ) -> Result<LogicalPlan> {
538         self.validate_schema_satisfies_exprs(&input.schema(), &expr)?;
539         let plan = LogicalPlanBuilder::from(input).project(expr)?.build()?;
540 
541         let project = force
542             || match input {
543                 LogicalPlan::TableScan { .. } => true,
544                 _ => plan.schema().fields() != input.schema().fields(),
545             };
546 
547         if project {
548             Ok(plan)
549         } else {
550             Ok(input.clone())
551         }
552     }
553 
aggregate( &self, input: &LogicalPlan, select_exprs: &[Expr], having_expr_opt: &Option<Expr>, group_by: &[SQLExpr], aggr_exprs: &[Expr], ) -> Result<(LogicalPlan, Vec<Expr>, Option<Expr>)>554     fn aggregate(
555         &self,
556         input: &LogicalPlan,
557         select_exprs: &[Expr],
558         having_expr_opt: &Option<Expr>,
559         group_by: &[SQLExpr],
560         aggr_exprs: &[Expr],
561     ) -> Result<(LogicalPlan, Vec<Expr>, Option<Expr>)> {
562         let group_by_exprs = group_by
563             .iter()
564             .map(|e| self.sql_to_rex(e, &input.schema()))
565             .collect::<Result<Vec<Expr>>>()?;
566 
567         let aggr_projection_exprs = group_by_exprs
568             .iter()
569             .chain(aggr_exprs.iter())
570             .cloned()
571             .collect::<Vec<Expr>>();
572 
573         let plan = LogicalPlanBuilder::from(&input)
574             .aggregate(&group_by_exprs, aggr_exprs)?
575             .build()?;
576 
577         // After aggregation, these are all of the columns that will be
578         // available to next phases of planning.
579         let column_exprs_post_aggr = aggr_projection_exprs
580             .iter()
581             .map(|expr| expr_as_column_expr(expr, input))
582             .collect::<Result<Vec<Expr>>>()?;
583 
584         // Rewrite the SELECT expression to use the columns produced by the
585         // aggregation.
586         let select_exprs_post_aggr = select_exprs
587             .iter()
588             .map(|expr| rebase_expr(expr, &aggr_projection_exprs, input))
589             .collect::<Result<Vec<Expr>>>()?;
590 
591         if !can_columns_satisfy_exprs(&column_exprs_post_aggr, &select_exprs_post_aggr)? {
592             return Err(DataFusionError::Plan(
593                 "Projection references non-aggregate values".to_owned(),
594             ));
595         }
596 
597         // Rewrite the HAVING expression to use the columns produced by the
598         // aggregation.
599         let having_expr_post_aggr_opt = if let Some(having_expr) = having_expr_opt {
600             let having_expr_post_aggr =
601                 rebase_expr(having_expr, &aggr_projection_exprs, input)?;
602 
603             if !can_columns_satisfy_exprs(
604                 &column_exprs_post_aggr,
605                 &[having_expr_post_aggr.clone()],
606             )? {
607                 return Err(DataFusionError::Plan(
608                     "Having references non-aggregate values".to_owned(),
609                 ));
610             }
611 
612             Some(having_expr_post_aggr)
613         } else {
614             None
615         };
616 
617         Ok((plan, select_exprs_post_aggr, having_expr_post_aggr_opt))
618     }
619 
620     /// Wrap a plan in a limit
limit(&self, input: &LogicalPlan, limit: &Option<SQLExpr>) -> Result<LogicalPlan>621     fn limit(&self, input: &LogicalPlan, limit: &Option<SQLExpr>) -> Result<LogicalPlan> {
622         match *limit {
623             Some(ref limit_expr) => {
624                 let n = match self.sql_to_rex(&limit_expr, &input.schema())? {
625                     Expr::Literal(ScalarValue::Int64(Some(n))) => Ok(n as usize),
626                     _ => Err(DataFusionError::Plan(
627                         "Unexpected expression for LIMIT clause".to_string(),
628                     )),
629                 }?;
630 
631                 LogicalPlanBuilder::from(&input).limit(n)?.build()
632             }
633             _ => Ok(input.clone()),
634         }
635     }
636 
637     /// Wrap the logical in a sort
order_by( &self, plan: &LogicalPlan, order_by: &[OrderByExpr], ) -> Result<LogicalPlan>638     fn order_by(
639         &self,
640         plan: &LogicalPlan,
641         order_by: &[OrderByExpr],
642     ) -> Result<LogicalPlan> {
643         if order_by.is_empty() {
644             return Ok(plan.clone());
645         }
646 
647         let input_schema = plan.schema();
648         let order_by_rex: Result<Vec<Expr>> = order_by
649             .iter()
650             .map(|e| {
651                 Ok(Expr::Sort {
652                     expr: Box::new(self.sql_to_rex(&e.expr, &input_schema)?),
653                     // by default asc
654                     asc: e.asc.unwrap_or(true),
655                     // by default nulls first to be consistent with spark
656                     nulls_first: e.nulls_first.unwrap_or(true),
657                 })
658             })
659             .collect();
660 
661         LogicalPlanBuilder::from(&plan)
662             .sort(&order_by_rex?)?
663             .build()
664     }
665 
666     /// Validate the schema provides all of the columns referenced in the expressions.
validate_schema_satisfies_exprs( &self, schema: &DFSchema, exprs: &[Expr], ) -> Result<()>667     fn validate_schema_satisfies_exprs(
668         &self,
669         schema: &DFSchema,
670         exprs: &[Expr],
671     ) -> Result<()> {
672         find_column_exprs(exprs)
673             .iter()
674             .try_for_each(|col| match col {
675                 Expr::Column(name) => {
676                     schema.field_with_unqualified_name(&name).map_err(|_| {
677                         DataFusionError::Plan(format!(
678                             "Invalid identifier '{}' for schema {}",
679                             name,
680                             schema.to_string()
681                         ))
682                     })?;
683                     Ok(())
684                 }
685                 _ => Err(DataFusionError::Internal("Not a column".to_string())),
686             })
687     }
688 
689     /// Generate a relational expression from a select SQL expression
sql_select_to_rex(&self, sql: &SelectItem, schema: &DFSchema) -> Result<Expr>690     fn sql_select_to_rex(&self, sql: &SelectItem, schema: &DFSchema) -> Result<Expr> {
691         match sql {
692             SelectItem::UnnamedExpr(expr) => self.sql_to_rex(expr, schema),
693             SelectItem::ExprWithAlias { expr, alias } => Ok(Alias(
694                 Box::new(self.sql_to_rex(&expr, schema)?),
695                 alias.value.clone(),
696             )),
697             SelectItem::Wildcard => Ok(Expr::Wildcard),
698             SelectItem::QualifiedWildcard(_) => Err(DataFusionError::NotImplemented(
699                 "Qualified wildcards are not supported".to_string(),
700             )),
701         }
702     }
703 
704     /// Generate a relational expression from a SQL expression
sql_to_rex(&self, sql: &SQLExpr, schema: &DFSchema) -> Result<Expr>705     pub fn sql_to_rex(&self, sql: &SQLExpr, schema: &DFSchema) -> Result<Expr> {
706         let expr = self.sql_expr_to_logical_expr(sql)?;
707         self.validate_schema_satisfies_exprs(schema, &[expr.clone()])?;
708         Ok(expr)
709     }
710 
sql_fn_arg_to_logical_expr(&self, sql: &FunctionArg) -> Result<Expr>711     fn sql_fn_arg_to_logical_expr(&self, sql: &FunctionArg) -> Result<Expr> {
712         match sql {
713             FunctionArg::Named { name: _, arg } => self.sql_expr_to_logical_expr(arg),
714             FunctionArg::Unnamed(value) => self.sql_expr_to_logical_expr(value),
715         }
716     }
717 
sql_expr_to_logical_expr(&self, sql: &SQLExpr) -> Result<Expr>718     fn sql_expr_to_logical_expr(&self, sql: &SQLExpr) -> Result<Expr> {
719         match sql {
720             SQLExpr::Value(Value::Number(n, _)) => match n.parse::<i64>() {
721                 Ok(n) => Ok(lit(n)),
722                 Err(_) => Ok(lit(n.parse::<f64>().unwrap())),
723             },
724             SQLExpr::Value(Value::SingleQuotedString(ref s)) => Ok(lit(s.clone())),
725 
726             SQLExpr::Value(Value::Boolean(n)) => Ok(lit(*n)),
727 
728             SQLExpr::Value(Value::Null) => Ok(Expr::Literal(ScalarValue::Utf8(None))),
729             SQLExpr::Extract { field, expr } => Ok(Expr::ScalarFunction {
730                 fun: functions::BuiltinScalarFunction::DatePart,
731                 args: vec![
732                     Expr::Literal(ScalarValue::Utf8(Some(format!("{}", field)))),
733                     self.sql_expr_to_logical_expr(expr)?,
734                 ],
735             }),
736 
737             SQLExpr::Value(Value::Interval {
738                 value,
739                 leading_field,
740                 leading_precision,
741                 last_field,
742                 fractional_seconds_precision,
743             }) => self.sql_interval_to_literal(
744                 value,
745                 leading_field,
746                 leading_precision,
747                 last_field,
748                 fractional_seconds_precision,
749             ),
750 
751             SQLExpr::Identifier(ref id) => {
752                 if &id.value[0..1] == "@" {
753                     let var_names = vec![id.value.clone()];
754                     Ok(Expr::ScalarVariable(var_names))
755                 } else {
756                     Ok(Expr::Column(id.value.to_string()))
757                 }
758             }
759 
760             SQLExpr::CompoundIdentifier(ids) => {
761                 let mut var_names = vec![];
762                 for id in ids {
763                     var_names.push(id.value.clone());
764                 }
765                 if &var_names[0][0..1] == "@" {
766                     Ok(Expr::ScalarVariable(var_names))
767                 } else {
768                     Err(DataFusionError::NotImplemented(format!(
769                         "Unsupported compound identifier '{:?}'",
770                         var_names,
771                     )))
772                 }
773             }
774 
775             SQLExpr::Wildcard => Ok(Expr::Wildcard),
776 
777             SQLExpr::Case {
778                 operand,
779                 conditions,
780                 results,
781                 else_result,
782             } => {
783                 let expr = if let Some(e) = operand {
784                     Some(Box::new(self.sql_expr_to_logical_expr(e)?))
785                 } else {
786                     None
787                 };
788                 let when_expr = conditions
789                     .iter()
790                     .map(|e| self.sql_expr_to_logical_expr(e))
791                     .collect::<Result<Vec<_>>>()?;
792                 let then_expr = results
793                     .iter()
794                     .map(|e| self.sql_expr_to_logical_expr(e))
795                     .collect::<Result<Vec<_>>>()?;
796                 let else_expr = if let Some(e) = else_result {
797                     Some(Box::new(self.sql_expr_to_logical_expr(e)?))
798                 } else {
799                     None
800                 };
801 
802                 Ok(Expr::Case {
803                     expr,
804                     when_then_expr: when_expr
805                         .iter()
806                         .zip(then_expr.iter())
807                         .map(|(w, t)| (Box::new(w.to_owned()), Box::new(t.to_owned())))
808                         .collect(),
809                     else_expr,
810                 })
811             }
812 
813             SQLExpr::Cast {
814                 ref expr,
815                 ref data_type,
816             } => Ok(Expr::Cast {
817                 expr: Box::new(self.sql_expr_to_logical_expr(&expr)?),
818                 data_type: convert_data_type(data_type)?,
819             }),
820 
821             SQLExpr::TypedString {
822                 ref data_type,
823                 ref value,
824             } => Ok(Expr::Cast {
825                 expr: Box::new(lit(&**value)),
826                 data_type: convert_data_type(data_type)?,
827             }),
828 
829             SQLExpr::IsNull(ref expr) => {
830                 Ok(Expr::IsNull(Box::new(self.sql_expr_to_logical_expr(expr)?)))
831             }
832 
833             SQLExpr::IsNotNull(ref expr) => Ok(Expr::IsNotNull(Box::new(
834                 self.sql_expr_to_logical_expr(expr)?,
835             ))),
836 
837             SQLExpr::UnaryOp { ref op, ref expr } => match op {
838                 UnaryOperator::Not => {
839                     Ok(Expr::Not(Box::new(self.sql_expr_to_logical_expr(expr)?)))
840                 }
841                 UnaryOperator::Plus => Ok(self.sql_expr_to_logical_expr(expr)?),
842                 UnaryOperator::Minus => {
843                     match expr.as_ref() {
844                         // optimization: if it's a number literal, we applly the negative operator
845                         // here directly to calculate the new literal.
846                         SQLExpr::Value(Value::Number(n,_)) => match n.parse::<i64>() {
847                             Ok(n) => Ok(lit(-n)),
848                             Err(_) => Ok(lit(-n
849                                 .parse::<f64>()
850                                 .map_err(|_e| {
851                                     DataFusionError::Internal(format!(
852                                         "negative operator can be only applied to integer and float operands, got: {}",
853                                     n))
854                                 })?)),
855                         },
856                         // not a literal, apply negative operator on expression
857                         _ => Ok(Expr::Negative(Box::new(self.sql_expr_to_logical_expr(expr)?))),
858                     }
859                 }
860                 _ => Err(DataFusionError::NotImplemented(format!(
861                     "Unsupported SQL unary operator {:?}",
862                     op
863                 ))),
864             },
865 
866             SQLExpr::Between {
867                 ref expr,
868                 ref negated,
869                 ref low,
870                 ref high,
871             } => Ok(Expr::Between {
872                 expr: Box::new(self.sql_expr_to_logical_expr(&expr)?),
873                 negated: *negated,
874                 low: Box::new(self.sql_expr_to_logical_expr(&low)?),
875                 high: Box::new(self.sql_expr_to_logical_expr(&high)?),
876             }),
877 
878             SQLExpr::InList {
879                 ref expr,
880                 ref list,
881                 ref negated,
882             } => {
883                 let list_expr = list
884                     .iter()
885                     .map(|e| self.sql_expr_to_logical_expr(e))
886                     .collect::<Result<Vec<_>>>()?;
887 
888                 Ok(Expr::InList {
889                     expr: Box::new(self.sql_expr_to_logical_expr(&expr)?),
890                     list: list_expr,
891                     negated: *negated,
892                 })
893             }
894 
895             SQLExpr::BinaryOp {
896                 ref left,
897                 ref op,
898                 ref right,
899             } => {
900                 let operator = match *op {
901                     BinaryOperator::Gt => Ok(Operator::Gt),
902                     BinaryOperator::GtEq => Ok(Operator::GtEq),
903                     BinaryOperator::Lt => Ok(Operator::Lt),
904                     BinaryOperator::LtEq => Ok(Operator::LtEq),
905                     BinaryOperator::Eq => Ok(Operator::Eq),
906                     BinaryOperator::NotEq => Ok(Operator::NotEq),
907                     BinaryOperator::Plus => Ok(Operator::Plus),
908                     BinaryOperator::Minus => Ok(Operator::Minus),
909                     BinaryOperator::Multiply => Ok(Operator::Multiply),
910                     BinaryOperator::Divide => Ok(Operator::Divide),
911                     BinaryOperator::Modulus => Ok(Operator::Modulus),
912                     BinaryOperator::And => Ok(Operator::And),
913                     BinaryOperator::Or => Ok(Operator::Or),
914                     BinaryOperator::Like => Ok(Operator::Like),
915                     BinaryOperator::NotLike => Ok(Operator::NotLike),
916                     _ => Err(DataFusionError::NotImplemented(format!(
917                         "Unsupported SQL binary operator {:?}",
918                         op
919                     ))),
920                 }?;
921 
922                 Ok(Expr::BinaryExpr {
923                     left: Box::new(self.sql_expr_to_logical_expr(&left)?),
924                     op: operator,
925                     right: Box::new(self.sql_expr_to_logical_expr(&right)?),
926                 })
927             }
928 
929             SQLExpr::Function(function) => {
930                 let name: String = function.name.to_string();
931 
932                 // first, scalar built-in
933                 if let Ok(fun) = functions::BuiltinScalarFunction::from_str(&name) {
934                     let args = function
935                         .args
936                         .iter()
937                         .map(|a| self.sql_fn_arg_to_logical_expr(a))
938                         .collect::<Result<Vec<Expr>>>()?;
939 
940                     return Ok(Expr::ScalarFunction { fun, args });
941                 };
942 
943                 // next, aggregate built-ins
944                 if let Ok(fun) = aggregates::AggregateFunction::from_str(&name) {
945                     let args = if fun == aggregates::AggregateFunction::Count {
946                         function
947                             .args
948                             .iter()
949                             .map(|a| match a {
950                                 FunctionArg::Unnamed(SQLExpr::Value(Value::Number(
951                                     _,
952                                     _,
953                                 ))) => Ok(lit(1_u8)),
954                                 FunctionArg::Unnamed(SQLExpr::Wildcard) => Ok(lit(1_u8)),
955                                 _ => self.sql_fn_arg_to_logical_expr(a),
956                             })
957                             .collect::<Result<Vec<Expr>>>()?
958                     } else {
959                         function
960                             .args
961                             .iter()
962                             .map(|a| self.sql_fn_arg_to_logical_expr(a))
963                             .collect::<Result<Vec<Expr>>>()?
964                     };
965 
966                     return Ok(Expr::AggregateFunction {
967                         fun,
968                         distinct: function.distinct,
969                         args,
970                     });
971                 };
972 
973                 // finally, user-defined functions (UDF) and UDAF
974                 match self.schema_provider.get_function_meta(&name) {
975                     Some(fm) => {
976                         let args = function
977                             .args
978                             .iter()
979                             .map(|a| self.sql_fn_arg_to_logical_expr(a))
980                             .collect::<Result<Vec<Expr>>>()?;
981 
982                         Ok(Expr::ScalarUDF { fun: fm, args })
983                     }
984                     None => match self.schema_provider.get_aggregate_meta(&name) {
985                         Some(fm) => {
986                             let args = function
987                                 .args
988                                 .iter()
989                                 .map(|a| self.sql_fn_arg_to_logical_expr(a))
990                                 .collect::<Result<Vec<Expr>>>()?;
991 
992                             Ok(Expr::AggregateUDF { fun: fm, args })
993                         }
994                         _ => Err(DataFusionError::Plan(format!(
995                             "Invalid function '{}'",
996                             name
997                         ))),
998                     },
999                 }
1000             }
1001 
1002             SQLExpr::Nested(e) => self.sql_expr_to_logical_expr(&e),
1003 
1004             _ => Err(DataFusionError::NotImplemented(format!(
1005                 "Unsupported ast node {:?} in sqltorel",
1006                 sql
1007             ))),
1008         }
1009     }
1010 
sql_interval_to_literal( &self, value: &str, leading_field: &Option<DateTimeField>, leading_precision: &Option<u64>, last_field: &Option<DateTimeField>, fractional_seconds_precision: &Option<u64>, ) -> Result<Expr>1011     fn sql_interval_to_literal(
1012         &self,
1013         value: &str,
1014         leading_field: &Option<DateTimeField>,
1015         leading_precision: &Option<u64>,
1016         last_field: &Option<DateTimeField>,
1017         fractional_seconds_precision: &Option<u64>,
1018     ) -> Result<Expr> {
1019         if leading_field.is_some() {
1020             return Err(DataFusionError::NotImplemented(format!(
1021                 "Unsupported Interval Expression with leading_field {:?}",
1022                 leading_field
1023             )));
1024         }
1025 
1026         if leading_precision.is_some() {
1027             return Err(DataFusionError::NotImplemented(format!(
1028                 "Unsupported Interval Expression with leading_precision {:?}",
1029                 leading_precision
1030             )));
1031         }
1032 
1033         if last_field.is_some() {
1034             return Err(DataFusionError::NotImplemented(format!(
1035                 "Unsupported Interval Expression with last_field {:?}",
1036                 last_field
1037             )));
1038         }
1039 
1040         if fractional_seconds_precision.is_some() {
1041             return Err(DataFusionError::NotImplemented(format!(
1042                 "Unsupported Interval Expression with fractional_seconds_precision {:?}",
1043                 fractional_seconds_precision
1044             )));
1045         }
1046 
1047         const SECONDS_PER_HOUR: f32 = 3_600_f32;
1048         const MILLIS_PER_SECOND: f32 = 1_000_f32;
1049 
1050         // We are storing parts as integers, it's why we need to align parts fractional
1051         // INTERVAL '0.5 MONTH' = 15 days, INTERVAL '1.5 MONTH' = 1 month 15 days
1052         // INTERVAL '0.5 DAY' = 12 hours, INTERVAL '1.5 DAY' = 1 day 12 hours
1053         let align_interval_parts = |month_part: f32,
1054                                     mut day_part: f32,
1055                                     mut milles_part: f32|
1056          -> (i32, i32, f32) {
1057             // Convert fractional month to days, It's not supported by Arrow types, but anyway
1058             day_part += (month_part - (month_part as i32) as f32) * 30_f32;
1059 
1060             // Convert fractional days to hours
1061             milles_part += (day_part - ((day_part as i32) as f32))
1062                 * 24_f32
1063                 * SECONDS_PER_HOUR
1064                 * MILLIS_PER_SECOND;
1065 
1066             (month_part as i32, day_part as i32, milles_part)
1067         };
1068 
1069         let calculate_from_part = |interval_period_str: &str,
1070                                    interval_type: &str|
1071          -> Result<(i32, i32, f32)> {
1072             // @todo It's better to use Decimal in order to protect rounding errors
1073             // Wait https://github.com/apache/arrow/pull/9232
1074             let interval_period = match f32::from_str(interval_period_str) {
1075                 Ok(n) => n,
1076                 Err(_) => {
1077                     return Err(DataFusionError::SQL(ParserError(format!(
1078                         "Unsupported Interval Expression with value {:?}",
1079                         value
1080                     ))))
1081                 }
1082             };
1083 
1084             if interval_period > (i32::MAX as f32) {
1085                 return Err(DataFusionError::NotImplemented(format!(
1086                     "Interval field value out of range: {:?}",
1087                     value
1088                 )));
1089             }
1090 
1091             match interval_type.to_lowercase().as_str() {
1092                 "year" => Ok(align_interval_parts(interval_period * 12_f32, 0.0, 0.0)),
1093                 "month" => Ok(align_interval_parts(interval_period, 0.0, 0.0)),
1094                 "day" | "days" => Ok(align_interval_parts(0.0, interval_period, 0.0)),
1095                 "hour" | "hours" => {
1096                     Ok((0, 0, interval_period * SECONDS_PER_HOUR * MILLIS_PER_SECOND))
1097                 }
1098                 "minutes" | "minute" => {
1099                     Ok((0, 0, interval_period * 60_f32 * MILLIS_PER_SECOND))
1100                 }
1101                 "seconds" | "second" => Ok((0, 0, interval_period * MILLIS_PER_SECOND)),
1102                 "milliseconds" | "millisecond" => Ok((0, 0, interval_period)),
1103                 _ => Err(DataFusionError::NotImplemented(format!(
1104                     "Invalid input syntax for type interval: {:?}",
1105                     value
1106                 ))),
1107             }
1108         };
1109 
1110         let mut result_month: i64 = 0;
1111         let mut result_days: i64 = 0;
1112         let mut result_millis: i64 = 0;
1113 
1114         let mut parts = value.split_whitespace();
1115 
1116         loop {
1117             let interval_period_str = parts.next();
1118             if interval_period_str.is_none() {
1119                 break;
1120             }
1121 
1122             let (diff_month, diff_days, diff_millis) = calculate_from_part(
1123                 interval_period_str.unwrap(),
1124                 parts.next().unwrap_or("second"),
1125             )?;
1126 
1127             result_month += diff_month as i64;
1128 
1129             if result_month > (i32::MAX as i64) {
1130                 return Err(DataFusionError::NotImplemented(format!(
1131                     "Interval field value out of range: {:?}",
1132                     value
1133                 )));
1134             }
1135 
1136             result_days += diff_days as i64;
1137 
1138             if result_days > (i32::MAX as i64) {
1139                 return Err(DataFusionError::NotImplemented(format!(
1140                     "Interval field value out of range: {:?}",
1141                     value
1142                 )));
1143             }
1144 
1145             result_millis += diff_millis as i64;
1146 
1147             if result_millis > (i32::MAX as i64) {
1148                 return Err(DataFusionError::NotImplemented(format!(
1149                     "Interval field value out of range: {:?}",
1150                     value
1151                 )));
1152             }
1153         }
1154 
1155         // Interval is tricky thing
1156         // 1 day is not 24 hours because timezones, 1 year != 365/364! 30 days != 1 month
1157         // The true way to store and calculate intervals is to store it as it defined
1158         // Due the fact that Arrow supports only two types YearMonth (month) and DayTime (day, time)
1159         // It's not possible to store complex intervals
1160         // It's possible to do select (NOW() + INTERVAL '1 year') + INTERVAL '1 day'; as workaround
1161         if result_month != 0 && (result_days != 0 || result_millis != 0) {
1162             return Err(DataFusionError::NotImplemented(format!(
1163                 "DF does not support intervals that have both a Year/Month part as well as Days/Hours/Mins/Seconds: {:?}. Hint: try breaking the interval into two parts, one with Year/Month and the other with Days/Hours/Mins/Seconds - e.g. (NOW() + INTERVAL '1 year') + INTERVAL '1 day'",
1164                 value
1165             )));
1166         }
1167 
1168         if result_month != 0 {
1169             return Ok(Expr::Literal(ScalarValue::IntervalYearMonth(Some(
1170                 result_month as i32,
1171             ))));
1172         }
1173 
1174         let result: i64 = (result_days << 32) | result_millis;
1175         Ok(Expr::Literal(ScalarValue::IntervalDayTime(Some(result))))
1176     }
1177 }
1178 
1179 /// Remove join expressions from a filter expression
remove_join_expressions( expr: &Expr, join_columns: &[(&str, &str)], ) -> Result<Option<Expr>>1180 fn remove_join_expressions(
1181     expr: &Expr,
1182     join_columns: &[(&str, &str)],
1183 ) -> Result<Option<Expr>> {
1184     match expr {
1185         Expr::BinaryExpr { left, op, right } => match op {
1186             Operator::Eq => match (left.as_ref(), right.as_ref()) {
1187                 (Expr::Column(l), Expr::Column(r)) => {
1188                     if join_columns.contains(&(l, r)) || join_columns.contains(&(r, l)) {
1189                         Ok(None)
1190                     } else {
1191                         Ok(Some(expr.clone()))
1192                     }
1193                 }
1194                 _ => Ok(Some(expr.clone())),
1195             },
1196             Operator::And => {
1197                 let l = remove_join_expressions(left, join_columns)?;
1198                 let r = remove_join_expressions(right, join_columns)?;
1199                 match (l, r) {
1200                     (Some(ll), Some(rr)) => Ok(Some(and(ll, rr))),
1201                     (Some(ll), _) => Ok(Some(ll)),
1202                     (_, Some(rr)) => Ok(Some(rr)),
1203                     _ => Ok(None),
1204                 }
1205             }
1206             _ => Ok(Some(expr.clone())),
1207         },
1208         _ => Ok(Some(expr.clone())),
1209     }
1210 }
1211 
1212 /// Parse equijoin ON condition which could be a single Eq or multiple conjunctive Eqs
1213 ///
1214 /// Examples
1215 ///
1216 /// foo = bar
1217 /// foo = bar AND bar = baz AND ...
1218 ///
extract_join_keys(expr: &Expr, accum: &mut Vec<(String, String)>) -> Result<()>1219 fn extract_join_keys(expr: &Expr, accum: &mut Vec<(String, String)>) -> Result<()> {
1220     match expr {
1221         Expr::BinaryExpr { left, op, right } => match op {
1222             Operator::Eq => match (left.as_ref(), right.as_ref()) {
1223                 (Expr::Column(l), Expr::Column(r)) => {
1224                     accum.push((l.to_owned(), r.to_owned()));
1225                     Ok(())
1226                 }
1227                 other => Err(DataFusionError::SQL(ParserError(format!(
1228                     "Unsupported expression '{:?}' in JOIN condition",
1229                     other
1230                 )))),
1231             },
1232             Operator::And => {
1233                 extract_join_keys(left, accum)?;
1234                 extract_join_keys(right, accum)
1235             }
1236             other => Err(DataFusionError::SQL(ParserError(format!(
1237                 "Unsupported expression '{:?}' in JOIN condition",
1238                 other
1239             )))),
1240         },
1241         other => Err(DataFusionError::SQL(ParserError(format!(
1242             "Unsupported expression '{:?}' in JOIN condition",
1243             other
1244         )))),
1245     }
1246 }
1247 
1248 /// Extract join keys from a WHERE clause
extract_possible_join_keys( expr: &Expr, accum: &mut Vec<(String, String)>, ) -> Result<()>1249 fn extract_possible_join_keys(
1250     expr: &Expr,
1251     accum: &mut Vec<(String, String)>,
1252 ) -> Result<()> {
1253     match expr {
1254         Expr::BinaryExpr { left, op, right } => match op {
1255             Operator::Eq => match (left.as_ref(), right.as_ref()) {
1256                 (Expr::Column(l), Expr::Column(r)) => {
1257                     accum.push((l.to_owned(), r.to_owned()));
1258                     Ok(())
1259                 }
1260                 _ => Ok(()),
1261             },
1262             Operator::And => {
1263                 extract_possible_join_keys(left, accum)?;
1264                 extract_possible_join_keys(right, accum)
1265             }
1266             _ => Ok(()),
1267         },
1268         _ => Ok(()),
1269     }
1270 }
1271 
1272 /// Convert SQL data type to relational representation of data type
convert_data_type(sql: &SQLDataType) -> Result<DataType>1273 pub fn convert_data_type(sql: &SQLDataType) -> Result<DataType> {
1274     match sql {
1275         SQLDataType::Boolean => Ok(DataType::Boolean),
1276         SQLDataType::SmallInt => Ok(DataType::Int16),
1277         SQLDataType::Int => Ok(DataType::Int32),
1278         SQLDataType::BigInt => Ok(DataType::Int64),
1279         SQLDataType::Float(_) | SQLDataType::Real => Ok(DataType::Float64),
1280         SQLDataType::Double => Ok(DataType::Float64),
1281         SQLDataType::Char(_) | SQLDataType::Varchar(_) => Ok(DataType::Utf8),
1282         SQLDataType::Timestamp => Ok(DataType::Timestamp(TimeUnit::Nanosecond, None)),
1283         SQLDataType::Date => Ok(DataType::Date32),
1284         other => Err(DataFusionError::NotImplemented(format!(
1285             "Unsupported SQL type {:?}",
1286             other
1287         ))),
1288     }
1289 }
1290 
1291 #[cfg(test)]
1292 mod tests {
1293     use super::*;
1294     use crate::datasource::empty::EmptyTable;
1295     use crate::{logical_plan::create_udf, sql::parser::DFParser};
1296     use functions::ScalarFunctionImplementation;
1297 
1298     const PERSON_COLUMN_NAMES: &str =
1299         "id, first_name, last_name, age, state, salary, birth_date";
1300 
1301     #[test]
select_no_relation()1302     fn select_no_relation() {
1303         quick_test(
1304             "SELECT 1",
1305             "Projection: Int64(1)\
1306              \n  EmptyRelation",
1307         );
1308     }
1309 
1310     #[test]
select_column_does_not_exist()1311     fn select_column_does_not_exist() {
1312         let sql = "SELECT doesnotexist FROM person";
1313         let err = logical_plan(sql).expect_err("query should have failed");
1314         assert_eq!(
1315             format!(
1316                 "Plan(\"Invalid identifier \\\'doesnotexist\\\' for schema {}\")",
1317                 PERSON_COLUMN_NAMES
1318             ),
1319             format!("{:?}", err)
1320         );
1321     }
1322 
1323     #[test]
select_repeated_column()1324     fn select_repeated_column() {
1325         let sql = "SELECT age, age FROM person";
1326         let err = logical_plan(sql).expect_err("query should have failed");
1327         assert_eq!(
1328             "Plan(\"Projections require unique expression names but the expression \\\"#age\\\" at position 0 and \\\"#age\\\" at position 1 have the same name. Consider aliasing (\\\"AS\\\") one of them.\")",
1329             format!("{:?}", err)
1330         );
1331     }
1332 
1333     #[test]
select_wildcard_with_repeated_column()1334     fn select_wildcard_with_repeated_column() {
1335         let sql = "SELECT *, age FROM person";
1336         let err = logical_plan(sql).expect_err("query should have failed");
1337         assert_eq!(
1338             "Plan(\"Projections require unique expression names but the expression \\\"#age\\\" at position 3 and \\\"#age\\\" at position 7 have the same name. Consider aliasing (\\\"AS\\\") one of them.\")",
1339             format!("{:?}", err)
1340         );
1341     }
1342 
1343     #[test]
select_wildcard_with_repeated_column_but_is_aliased()1344     fn select_wildcard_with_repeated_column_but_is_aliased() {
1345         quick_test(
1346             "SELECT *, first_name AS fn from person",
1347             "Projection: #id, #first_name, #last_name, #age, #state, #salary, #birth_date, #first_name AS fn\
1348             \n  TableScan: person projection=None",
1349         );
1350     }
1351 
1352     #[test]
select_scalar_func_with_literal_no_relation()1353     fn select_scalar_func_with_literal_no_relation() {
1354         quick_test(
1355             "SELECT sqrt(9)",
1356             "Projection: sqrt(Int64(9))\
1357              \n  EmptyRelation",
1358         );
1359     }
1360 
1361     #[test]
select_simple_filter()1362     fn select_simple_filter() {
1363         let sql = "SELECT id, first_name, last_name \
1364                    FROM person WHERE state = 'CO'";
1365         let expected = "Projection: #id, #first_name, #last_name\
1366                         \n  Filter: #state Eq Utf8(\"CO\")\
1367                         \n    TableScan: person projection=None";
1368         quick_test(sql, expected);
1369     }
1370 
1371     #[test]
select_filter_column_does_not_exist()1372     fn select_filter_column_does_not_exist() {
1373         let sql = "SELECT first_name FROM person WHERE doesnotexist = 'A'";
1374         let err = logical_plan(sql).expect_err("query should have failed");
1375         assert_eq!(
1376             format!(
1377                 "Plan(\"Invalid identifier \\\'doesnotexist\\\' for schema {}\")",
1378                 PERSON_COLUMN_NAMES
1379             ),
1380             format!("{:?}", err)
1381         );
1382     }
1383 
1384     #[test]
select_filter_cannot_use_alias()1385     fn select_filter_cannot_use_alias() {
1386         let sql = "SELECT first_name AS x FROM person WHERE x = 'A'";
1387         let err = logical_plan(sql).expect_err("query should have failed");
1388         assert_eq!(
1389             format!(
1390                 "Plan(\"Invalid identifier \\\'x\\\' for schema {}\")",
1391                 PERSON_COLUMN_NAMES
1392             ),
1393             format!("{:?}", err)
1394         );
1395     }
1396 
1397     #[test]
select_neg_filter()1398     fn select_neg_filter() {
1399         let sql = "SELECT id, first_name, last_name \
1400                    FROM person WHERE NOT state";
1401         let expected = "Projection: #id, #first_name, #last_name\
1402                         \n  Filter: NOT #state\
1403                         \n    TableScan: person projection=None";
1404         quick_test(sql, expected);
1405     }
1406 
1407     #[test]
select_compound_filter()1408     fn select_compound_filter() {
1409         let sql = "SELECT id, first_name, last_name \
1410                    FROM person WHERE state = 'CO' AND age >= 21 AND age <= 65";
1411         let expected = "Projection: #id, #first_name, #last_name\
1412             \n  Filter: #state Eq Utf8(\"CO\") And #age GtEq Int64(21) And #age LtEq Int64(65)\
1413             \n    TableScan: person projection=None";
1414         quick_test(sql, expected);
1415     }
1416 
1417     #[test]
test_timestamp_filter()1418     fn test_timestamp_filter() {
1419         let sql =
1420             "SELECT state FROM person WHERE birth_date < CAST (158412331400600000 as timestamp)";
1421 
1422         let expected = "Projection: #state\
1423             \n  Filter: #birth_date Lt CAST(Int64(158412331400600000) AS Timestamp(Nanosecond, None))\
1424             \n    TableScan: person projection=None";
1425 
1426         quick_test(sql, expected);
1427     }
1428 
1429     #[test]
test_date_filter()1430     fn test_date_filter() {
1431         let sql =
1432             "SELECT state FROM person WHERE birth_date < CAST ('2020-01-01' as date)";
1433 
1434         let expected = "Projection: #state\
1435             \n  Filter: #birth_date Lt CAST(Utf8(\"2020-01-01\") AS Date32)\
1436             \n    TableScan: person projection=None";
1437 
1438         quick_test(sql, expected);
1439     }
1440 
1441     #[test]
select_all_boolean_operators()1442     fn select_all_boolean_operators() {
1443         let sql = "SELECT age, first_name, last_name \
1444                    FROM person \
1445                    WHERE age = 21 \
1446                    AND age != 21 \
1447                    AND age > 21 \
1448                    AND age >= 21 \
1449                    AND age < 65 \
1450                    AND age <= 65";
1451         let expected = "Projection: #age, #first_name, #last_name\
1452                         \n  Filter: #age Eq Int64(21) \
1453                         And #age NotEq Int64(21) \
1454                         And #age Gt Int64(21) \
1455                         And #age GtEq Int64(21) \
1456                         And #age Lt Int64(65) \
1457                         And #age LtEq Int64(65)\
1458                         \n    TableScan: person projection=None";
1459         quick_test(sql, expected);
1460     }
1461 
1462     #[test]
select_between()1463     fn select_between() {
1464         let sql = "SELECT state FROM person WHERE age BETWEEN 21 AND 65";
1465         let expected = "Projection: #state\
1466             \n  Filter: #age BETWEEN Int64(21) AND Int64(65)\
1467             \n    TableScan: person projection=None";
1468 
1469         quick_test(sql, expected);
1470     }
1471 
1472     #[test]
select_between_negated()1473     fn select_between_negated() {
1474         let sql = "SELECT state FROM person WHERE age NOT BETWEEN 21 AND 65";
1475         let expected = "Projection: #state\
1476             \n  Filter: #age NOT BETWEEN Int64(21) AND Int64(65)\
1477             \n    TableScan: person projection=None";
1478 
1479         quick_test(sql, expected);
1480     }
1481 
1482     #[test]
select_nested()1483     fn select_nested() {
1484         let sql = "SELECT fn2, last_name
1485                    FROM (
1486                      SELECT fn1 as fn2, last_name, birth_date
1487                      FROM (
1488                        SELECT first_name AS fn1, last_name, birth_date, age
1489                        FROM person
1490                      )
1491                    )";
1492         let expected = "Projection: #fn2, #last_name\
1493                         \n  Projection: #fn1 AS fn2, #last_name, #birth_date\
1494                         \n    Projection: #first_name AS fn1, #last_name, #birth_date, #age\
1495                         \n      TableScan: person projection=None";
1496         quick_test(sql, expected);
1497     }
1498 
1499     #[test]
select_nested_with_filters()1500     fn select_nested_with_filters() {
1501         let sql = "SELECT fn1, age
1502                    FROM (
1503                      SELECT first_name AS fn1, age
1504                      FROM person
1505                      WHERE age > 20
1506                    )
1507                    WHERE fn1 = 'X' AND age < 30";
1508 
1509         let expected = "Filter: #fn1 Eq Utf8(\"X\") And #age Lt Int64(30)\
1510                         \n  Projection: #first_name AS fn1, #age\
1511                         \n    Filter: #age Gt Int64(20)\
1512                         \n      TableScan: person projection=None";
1513 
1514         quick_test(sql, expected);
1515     }
1516 
1517     #[test]
select_with_having()1518     fn select_with_having() {
1519         let sql = "SELECT id, age
1520                    FROM person
1521                    HAVING age > 100 AND age < 200";
1522         let expected = "Projection: #id, #age\
1523                         \n  Filter: #age Gt Int64(100) And #age Lt Int64(200)\
1524                         \n    TableScan: person projection=None";
1525         quick_test(sql, expected);
1526     }
1527 
1528     #[test]
select_with_having_referencing_column_not_in_select()1529     fn select_with_having_referencing_column_not_in_select() {
1530         let sql = "SELECT id, age
1531                    FROM person
1532                    HAVING first_name = 'M'";
1533         let err = logical_plan(sql).expect_err("query should have failed");
1534         assert_eq!(
1535             "Plan(\"Having references column(s) not provided by the select\")",
1536             format!("{:?}", err)
1537         );
1538     }
1539 
1540     #[test]
select_with_having_referencing_column_nested_in_select_expression()1541     fn select_with_having_referencing_column_nested_in_select_expression() {
1542         let sql = "SELECT id, age + 1
1543                    FROM person
1544                    HAVING age > 100";
1545         let err = logical_plan(sql).expect_err("query should have failed");
1546         assert_eq!(
1547             "Plan(\"Having references column(s) not provided by the select\")",
1548             format!("{:?}", err)
1549         );
1550     }
1551 
1552     #[test]
select_with_having_with_aggregate_not_in_select()1553     fn select_with_having_with_aggregate_not_in_select() {
1554         let sql = "SELECT first_name
1555                    FROM person
1556                    HAVING MAX(age) > 100";
1557         let err = logical_plan(sql).expect_err("query should have failed");
1558         assert_eq!(
1559             "Plan(\"Projection references non-aggregate values\")",
1560             format!("{:?}", err)
1561         );
1562     }
1563 
1564     #[test]
select_aggregate_with_having_that_reuses_aggregate()1565     fn select_aggregate_with_having_that_reuses_aggregate() {
1566         let sql = "SELECT MAX(age)
1567                    FROM person
1568                    HAVING MAX(age) < 30";
1569         let expected = "Filter: #MAX(age) Lt Int64(30)\
1570                         \n  Aggregate: groupBy=[[]], aggr=[[MAX(#age)]]\
1571                         \n    TableScan: person projection=None";
1572         quick_test(sql, expected);
1573     }
1574 
1575     #[test]
select_aggregate_with_having_with_aggregate_not_in_select()1576     fn select_aggregate_with_having_with_aggregate_not_in_select() {
1577         let sql = "SELECT MAX(age)
1578                    FROM person
1579                    HAVING MAX(first_name) > 'M'";
1580         let expected = "Projection: #MAX(age)\
1581                         \n  Filter: #MAX(first_name) Gt Utf8(\"M\")\
1582                         \n    Aggregate: groupBy=[[]], aggr=[[MAX(#age), MAX(#first_name)]]\
1583                         \n      TableScan: person projection=None";
1584         quick_test(sql, expected);
1585     }
1586 
1587     #[test]
select_aggregate_with_having_referencing_column_not_in_select()1588     fn select_aggregate_with_having_referencing_column_not_in_select() {
1589         let sql = "SELECT COUNT(*)
1590                    FROM person
1591                    HAVING first_name = 'M'";
1592         let err = logical_plan(sql).expect_err("query should have failed");
1593         assert_eq!(
1594             "Plan(\"Having references non-aggregate values\")",
1595             format!("{:?}", err)
1596         );
1597     }
1598 
1599     #[test]
select_aggregate_aliased_with_having_referencing_aggregate_by_its_alias()1600     fn select_aggregate_aliased_with_having_referencing_aggregate_by_its_alias() {
1601         let sql = "SELECT MAX(age) as max_age
1602                    FROM person
1603                    HAVING max_age < 30";
1604         let expected = "Projection: #MAX(age) AS max_age\
1605                         \n  Filter: #MAX(age) Lt Int64(30)\
1606                         \n    Aggregate: groupBy=[[]], aggr=[[MAX(#age)]]\
1607                         \n      TableScan: person projection=None";
1608         quick_test(sql, expected);
1609     }
1610 
1611     #[test]
select_aggregate_aliased_with_having_that_reuses_aggregate_but_not_by_its_alias()1612     fn select_aggregate_aliased_with_having_that_reuses_aggregate_but_not_by_its_alias() {
1613         let sql = "SELECT MAX(age) as max_age
1614                    FROM person
1615                    HAVING MAX(age) < 30";
1616         let expected = "Projection: #MAX(age) AS max_age\
1617                         \n  Filter: #MAX(age) Lt Int64(30)\
1618                         \n    Aggregate: groupBy=[[]], aggr=[[MAX(#age)]]\
1619                         \n      TableScan: person projection=None";
1620         quick_test(sql, expected);
1621     }
1622 
1623     #[test]
select_aggregate_with_group_by_with_having()1624     fn select_aggregate_with_group_by_with_having() {
1625         let sql = "SELECT first_name, MAX(age)
1626                    FROM person
1627                    GROUP BY first_name
1628                    HAVING first_name = 'M'";
1629         let expected = "Filter: #first_name Eq Utf8(\"M\")\
1630                         \n  Aggregate: groupBy=[[#first_name]], aggr=[[MAX(#age)]]\
1631                         \n    TableScan: person projection=None";
1632         quick_test(sql, expected);
1633     }
1634 
1635     #[test]
select_aggregate_with_group_by_with_having_and_where()1636     fn select_aggregate_with_group_by_with_having_and_where() {
1637         let sql = "SELECT first_name, MAX(age)
1638                    FROM person
1639                    WHERE id > 5
1640                    GROUP BY first_name
1641                    HAVING MAX(age) < 100";
1642         let expected = "Filter: #MAX(age) Lt Int64(100)\
1643                         \n  Aggregate: groupBy=[[#first_name]], aggr=[[MAX(#age)]]\
1644                         \n    Filter: #id Gt Int64(5)\
1645                         \n      TableScan: person projection=None";
1646         quick_test(sql, expected);
1647     }
1648 
1649     #[test]
select_aggregate_with_group_by_with_having_and_where_filtering_on_aggregate_column( )1650     fn select_aggregate_with_group_by_with_having_and_where_filtering_on_aggregate_column(
1651     ) {
1652         let sql = "SELECT first_name, MAX(age)
1653                    FROM person
1654                    WHERE id > 5 AND age > 18
1655                    GROUP BY first_name
1656                    HAVING MAX(age) < 100";
1657         let expected = "Filter: #MAX(age) Lt Int64(100)\
1658                         \n  Aggregate: groupBy=[[#first_name]], aggr=[[MAX(#age)]]\
1659                         \n    Filter: #id Gt Int64(5) And #age Gt Int64(18)\
1660                         \n      TableScan: person projection=None";
1661         quick_test(sql, expected);
1662     }
1663 
1664     #[test]
select_aggregate_with_group_by_with_having_using_column_by_alias()1665     fn select_aggregate_with_group_by_with_having_using_column_by_alias() {
1666         let sql = "SELECT first_name AS fn, MAX(age)
1667                    FROM person
1668                    GROUP BY first_name
1669                    HAVING MAX(age) > 2 AND fn = 'M'";
1670         let expected = "Projection: #first_name AS fn, #MAX(age)\
1671                         \n  Filter: #MAX(age) Gt Int64(2) And #first_name Eq Utf8(\"M\")\
1672                         \n    Aggregate: groupBy=[[#first_name]], aggr=[[MAX(#age)]]\
1673                         \n      TableScan: person projection=None";
1674         quick_test(sql, expected);
1675     }
1676 
1677     #[test]
select_aggregate_with_group_by_with_having_using_columns_with_and_without_their_aliases( )1678     fn select_aggregate_with_group_by_with_having_using_columns_with_and_without_their_aliases(
1679     ) {
1680         let sql = "SELECT first_name AS fn, MAX(age) AS max_age
1681                    FROM person
1682                    GROUP BY first_name
1683                    HAVING MAX(age) > 2 AND max_age < 5 AND first_name = 'M' AND fn = 'N'";
1684         let expected = "Projection: #first_name AS fn, #MAX(age) AS max_age\
1685                         \n  Filter: #MAX(age) Gt Int64(2) And #MAX(age) Lt Int64(5) And #first_name Eq Utf8(\"M\") And #first_name Eq Utf8(\"N\")\
1686                         \n    Aggregate: groupBy=[[#first_name]], aggr=[[MAX(#age)]]\
1687                         \n      TableScan: person projection=None";
1688         quick_test(sql, expected);
1689     }
1690 
1691     #[test]
select_aggregate_with_group_by_with_having_that_reuses_aggregate()1692     fn select_aggregate_with_group_by_with_having_that_reuses_aggregate() {
1693         let sql = "SELECT first_name, MAX(age)
1694                    FROM person
1695                    GROUP BY first_name
1696                    HAVING MAX(age) > 100";
1697         let expected = "Filter: #MAX(age) Gt Int64(100)\
1698                         \n  Aggregate: groupBy=[[#first_name]], aggr=[[MAX(#age)]]\
1699                         \n    TableScan: person projection=None";
1700         quick_test(sql, expected);
1701     }
1702 
1703     #[test]
select_aggregate_with_group_by_with_having_referencing_column_not_in_group_by()1704     fn select_aggregate_with_group_by_with_having_referencing_column_not_in_group_by() {
1705         let sql = "SELECT first_name, MAX(age)
1706                    FROM person
1707                    GROUP BY first_name
1708                    HAVING MAX(age) > 10 AND last_name = 'M'";
1709         let err = logical_plan(sql).expect_err("query should have failed");
1710         assert_eq!(
1711             "Plan(\"Having references non-aggregate values\")",
1712             format!("{:?}", err)
1713         );
1714     }
1715 
1716     #[test]
select_aggregate_with_group_by_with_having_that_reuses_aggregate_multiple_times()1717     fn select_aggregate_with_group_by_with_having_that_reuses_aggregate_multiple_times() {
1718         let sql = "SELECT first_name, MAX(age)
1719                    FROM person
1720                    GROUP BY first_name
1721                    HAVING MAX(age) > 100 AND MAX(age) < 200";
1722         let expected = "Filter: #MAX(age) Gt Int64(100) And #MAX(age) Lt Int64(200)\
1723                         \n  Aggregate: groupBy=[[#first_name]], aggr=[[MAX(#age)]]\
1724                         \n    TableScan: person projection=None";
1725         quick_test(sql, expected);
1726     }
1727 
1728     #[test]
select_aggregate_with_group_by_with_having_using_aggreagate_not_in_select()1729     fn select_aggregate_with_group_by_with_having_using_aggreagate_not_in_select() {
1730         let sql = "SELECT first_name, MAX(age)
1731                    FROM person
1732                    GROUP BY first_name
1733                    HAVING MAX(age) > 100 AND MIN(id) < 50";
1734         let expected = "Projection: #first_name, #MAX(age)\
1735                         \n  Filter: #MAX(age) Gt Int64(100) And #MIN(id) Lt Int64(50)\
1736                         \n    Aggregate: groupBy=[[#first_name]], aggr=[[MAX(#age), MIN(#id)]]\
1737                         \n      TableScan: person projection=None";
1738         quick_test(sql, expected);
1739     }
1740 
1741     #[test]
select_aggregate_aliased_with_group_by_with_having_referencing_aggregate_by_its_alias( )1742     fn select_aggregate_aliased_with_group_by_with_having_referencing_aggregate_by_its_alias(
1743     ) {
1744         let sql = "SELECT first_name, MAX(age) AS max_age
1745                    FROM person
1746                    GROUP BY first_name
1747                    HAVING max_age > 100";
1748         let expected = "Projection: #first_name, #MAX(age) AS max_age\
1749                         \n  Filter: #MAX(age) Gt Int64(100)\
1750                         \n    Aggregate: groupBy=[[#first_name]], aggr=[[MAX(#age)]]\
1751                         \n      TableScan: person projection=None";
1752         quick_test(sql, expected);
1753     }
1754 
1755     #[test]
select_aggregate_compound_aliased_with_group_by_with_having_referencing_compound_aggregate_by_its_alias( )1756     fn select_aggregate_compound_aliased_with_group_by_with_having_referencing_compound_aggregate_by_its_alias(
1757     ) {
1758         let sql = "SELECT first_name, MAX(age) + 1 AS max_age_plus_one
1759                    FROM person
1760                    GROUP BY first_name
1761                    HAVING max_age_plus_one > 100";
1762         let expected =
1763             "Projection: #first_name, #MAX(age) Plus Int64(1) AS max_age_plus_one\
1764                         \n  Filter: #MAX(age) Plus Int64(1) Gt Int64(100)\
1765                         \n    Aggregate: groupBy=[[#first_name]], aggr=[[MAX(#age)]]\
1766                         \n      TableScan: person projection=None";
1767         quick_test(sql, expected);
1768     }
1769 
1770     #[test]
select_aggregate_with_group_by_with_having_using_derived_column_aggreagate_not_in_select( )1771     fn select_aggregate_with_group_by_with_having_using_derived_column_aggreagate_not_in_select(
1772     ) {
1773         let sql = "SELECT first_name, MAX(age)
1774                    FROM person
1775                    GROUP BY first_name
1776                    HAVING MAX(age) > 100 AND MIN(id - 2) < 50";
1777         let expected = "Projection: #first_name, #MAX(age)\
1778                         \n  Filter: #MAX(age) Gt Int64(100) And #MIN(id Minus Int64(2)) Lt Int64(50)\
1779                         \n    Aggregate: groupBy=[[#first_name]], aggr=[[MAX(#age), MIN(#id Minus Int64(2))]]\
1780                         \n      TableScan: person projection=None";
1781         quick_test(sql, expected);
1782     }
1783 
1784     #[test]
select_aggregate_with_group_by_with_having_using_count_star_not_in_select()1785     fn select_aggregate_with_group_by_with_having_using_count_star_not_in_select() {
1786         let sql = "SELECT first_name, MAX(age)
1787                    FROM person
1788                    GROUP BY first_name
1789                    HAVING MAX(age) > 100 AND COUNT(*) < 50";
1790         let expected = "Projection: #first_name, #MAX(age)\
1791                         \n  Filter: #MAX(age) Gt Int64(100) And #COUNT(UInt8(1)) Lt Int64(50)\
1792                         \n    Aggregate: groupBy=[[#first_name]], aggr=[[MAX(#age), COUNT(UInt8(1))]]\
1793                         \n      TableScan: person projection=None";
1794         quick_test(sql, expected);
1795     }
1796 
1797     #[test]
select_binary_expr()1798     fn select_binary_expr() {
1799         let sql = "SELECT age + salary from person";
1800         let expected = "Projection: #age Plus #salary\
1801                         \n  TableScan: person projection=None";
1802         quick_test(sql, expected);
1803     }
1804 
1805     #[test]
select_binary_expr_nested()1806     fn select_binary_expr_nested() {
1807         let sql = "SELECT (age + salary)/2 from person";
1808         let expected = "Projection: #age Plus #salary Divide Int64(2)\
1809                         \n  TableScan: person projection=None";
1810         quick_test(sql, expected);
1811     }
1812 
1813     #[test]
select_wildcard_with_groupby()1814     fn select_wildcard_with_groupby() {
1815         quick_test(
1816             "SELECT * FROM person GROUP BY id, first_name, last_name, age, state, salary, birth_date",
1817             "Aggregate: groupBy=[[#id, #first_name, #last_name, #age, #state, #salary, #birth_date]], aggr=[[]]\
1818              \n  TableScan: person projection=None",
1819         );
1820         quick_test(
1821             "SELECT * FROM (SELECT first_name, last_name FROM person) GROUP BY first_name, last_name",
1822             "Aggregate: groupBy=[[#first_name, #last_name]], aggr=[[]]\
1823              \n  Projection: #first_name, #last_name\
1824              \n    TableScan: person projection=None",
1825         );
1826     }
1827 
1828     #[test]
select_simple_aggregate()1829     fn select_simple_aggregate() {
1830         quick_test(
1831             "SELECT MIN(age) FROM person",
1832             "Aggregate: groupBy=[[]], aggr=[[MIN(#age)]]\
1833              \n  TableScan: person projection=None",
1834         );
1835     }
1836 
1837     #[test]
test_sum_aggregate()1838     fn test_sum_aggregate() {
1839         quick_test(
1840             "SELECT SUM(age) from person",
1841             "Aggregate: groupBy=[[]], aggr=[[SUM(#age)]]\
1842              \n  TableScan: person projection=None",
1843         );
1844     }
1845 
1846     #[test]
select_simple_aggregate_column_does_not_exist()1847     fn select_simple_aggregate_column_does_not_exist() {
1848         let sql = "SELECT MIN(doesnotexist) FROM person";
1849         let err = logical_plan(sql).expect_err("query should have failed");
1850         assert_eq!(
1851             format!(
1852                 "Plan(\"Invalid identifier \\\'doesnotexist\\\' for schema {}\")",
1853                 PERSON_COLUMN_NAMES
1854             ),
1855             format!("{:?}", err)
1856         );
1857     }
1858 
1859     #[test]
select_simple_aggregate_repeated_aggregate()1860     fn select_simple_aggregate_repeated_aggregate() {
1861         let sql = "SELECT MIN(age), MIN(age) FROM person";
1862         let err = logical_plan(sql).expect_err("query should have failed");
1863         assert_eq!(
1864             "Plan(\"Projections require unique expression names but the expression \\\"#MIN(age)\\\" at position 0 and \\\"#MIN(age)\\\" at position 1 have the same name. Consider aliasing (\\\"AS\\\") one of them.\")",
1865             format!("{:?}", err)
1866         );
1867     }
1868 
1869     #[test]
select_simple_aggregate_repeated_aggregate_with_single_alias()1870     fn select_simple_aggregate_repeated_aggregate_with_single_alias() {
1871         quick_test(
1872             "SELECT MIN(age), MIN(age) AS a FROM person",
1873             "Projection: #MIN(age), #MIN(age) AS a\
1874              \n  Aggregate: groupBy=[[]], aggr=[[MIN(#age)]]\
1875              \n    TableScan: person projection=None",
1876         );
1877     }
1878 
1879     #[test]
select_simple_aggregate_repeated_aggregate_with_unique_aliases()1880     fn select_simple_aggregate_repeated_aggregate_with_unique_aliases() {
1881         quick_test(
1882             "SELECT MIN(age) AS a, MIN(age) AS b FROM person",
1883             "Projection: #MIN(age) AS a, #MIN(age) AS b\
1884              \n  Aggregate: groupBy=[[]], aggr=[[MIN(#age)]]\
1885              \n    TableScan: person projection=None",
1886         );
1887     }
1888 
1889     #[test]
select_simple_aggregate_repeated_aggregate_with_repeated_aliases()1890     fn select_simple_aggregate_repeated_aggregate_with_repeated_aliases() {
1891         let sql = "SELECT MIN(age) AS a, MIN(age) AS a FROM person";
1892         let err = logical_plan(sql).expect_err("query should have failed");
1893         assert_eq!(
1894             "Plan(\"Projections require unique expression names but the expression \\\"#MIN(age) AS a\\\" at position 0 and \\\"#MIN(age) AS a\\\" at position 1 have the same name. Consider aliasing (\\\"AS\\\") one of them.\")",
1895             format!("{:?}", err)
1896         );
1897     }
1898 
1899     #[test]
select_simple_aggregate_with_groupby()1900     fn select_simple_aggregate_with_groupby() {
1901         quick_test(
1902             "SELECT state, MIN(age), MAX(age) FROM person GROUP BY state",
1903             "Aggregate: groupBy=[[#state]], aggr=[[MIN(#age), MAX(#age)]]\
1904              \n  TableScan: person projection=None",
1905         );
1906     }
1907 
1908     #[test]
select_simple_aggregate_with_groupby_with_aliases()1909     fn select_simple_aggregate_with_groupby_with_aliases() {
1910         quick_test(
1911             "SELECT state AS a, MIN(age) AS b FROM person GROUP BY state",
1912             "Projection: #state AS a, #MIN(age) AS b\
1913              \n  Aggregate: groupBy=[[#state]], aggr=[[MIN(#age)]]\
1914              \n    TableScan: person projection=None",
1915         );
1916     }
1917 
1918     #[test]
select_simple_aggregate_with_groupby_with_aliases_repeated()1919     fn select_simple_aggregate_with_groupby_with_aliases_repeated() {
1920         let sql = "SELECT state AS a, MIN(age) AS a FROM person GROUP BY state";
1921         let err = logical_plan(sql).expect_err("query should have failed");
1922         assert_eq!(
1923             "Plan(\"Projections require unique expression names but the expression \\\"#state AS a\\\" at position 0 and \\\"#MIN(age) AS a\\\" at position 1 have the same name. Consider aliasing (\\\"AS\\\") one of them.\")",
1924             format!("{:?}", err)
1925         );
1926     }
1927 
1928     #[test]
select_simple_aggregate_with_groupby_column_unselected()1929     fn select_simple_aggregate_with_groupby_column_unselected() {
1930         quick_test(
1931             "SELECT MIN(age), MAX(age) FROM person GROUP BY state",
1932             "Projection: #MIN(age), #MAX(age)\
1933              \n  Aggregate: groupBy=[[#state]], aggr=[[MIN(#age), MAX(#age)]]\
1934              \n    TableScan: person projection=None",
1935         );
1936     }
1937 
1938     #[test]
select_simple_aggregate_with_groupby_and_column_in_group_by_does_not_exist()1939     fn select_simple_aggregate_with_groupby_and_column_in_group_by_does_not_exist() {
1940         let sql = "SELECT SUM(age) FROM person GROUP BY doesnotexist";
1941         let err = logical_plan(sql).expect_err("query should have failed");
1942         assert_eq!(
1943             format!(
1944                 "Plan(\"Invalid identifier \\\'doesnotexist\\\' for schema {}\")",
1945                 PERSON_COLUMN_NAMES
1946             ),
1947             format!("{:?}", err)
1948         );
1949     }
1950 
1951     #[test]
select_simple_aggregate_with_groupby_and_column_in_aggregate_does_not_exist()1952     fn select_simple_aggregate_with_groupby_and_column_in_aggregate_does_not_exist() {
1953         let sql = "SELECT SUM(doesnotexist) FROM person GROUP BY first_name";
1954         let err = logical_plan(sql).expect_err("query should have failed");
1955         assert_eq!(
1956             format!(
1957                 "Plan(\"Invalid identifier \\\'doesnotexist\\\' for schema {}\")",
1958                 PERSON_COLUMN_NAMES
1959             ),
1960             format!("{:?}", err)
1961         );
1962     }
1963 
1964     #[test]
select_interval_out_of_range()1965     fn select_interval_out_of_range() {
1966         let sql = "SELECT INTERVAL '100000000000000000 day'";
1967         let err = logical_plan(sql).expect_err("query should have failed");
1968         assert_eq!(
1969             "NotImplemented(\"Interval field value out of range: \\\"100000000000000000 day\\\"\")",
1970             format!("{:?}", err)
1971         );
1972     }
1973 
1974     #[test]
select_unsupported_complex_interval()1975     fn select_unsupported_complex_interval() {
1976         let sql = "SELECT INTERVAL '1 year 1 day'";
1977         let err = logical_plan(sql).expect_err("query should have failed");
1978         assert_eq!(
1979             "NotImplemented(\"DF does not support intervals that have both a Year/Month part as well as Days/Hours/Mins/Seconds: \\\"1 year 1 day\\\". Hint: try breaking the interval into two parts, one with Year/Month and the other with Days/Hours/Mins/Seconds - e.g. (NOW() + INTERVAL \\\'1 year\\\') + INTERVAL \\\'1 day\\\'\")",
1980             format!("{:?}", err)
1981         );
1982     }
1983 
1984     #[test]
select_simple_aggregate_with_groupby_and_column_is_in_aggregate_and_groupby()1985     fn select_simple_aggregate_with_groupby_and_column_is_in_aggregate_and_groupby() {
1986         quick_test(
1987             "SELECT MAX(first_name) FROM person GROUP BY first_name",
1988             "Projection: #MAX(first_name)\
1989              \n  Aggregate: groupBy=[[#first_name]], aggr=[[MAX(#first_name)]]\
1990              \n    TableScan: person projection=None",
1991         );
1992     }
1993 
1994     #[test]
select_simple_aggregate_with_groupby_cannot_use_alias()1995     fn select_simple_aggregate_with_groupby_cannot_use_alias() {
1996         let sql = "SELECT state AS x, MAX(age) FROM person GROUP BY x";
1997         let err = logical_plan(sql).expect_err("query should have failed");
1998         assert_eq!(
1999             format!(
2000                 "Plan(\"Invalid identifier \\\'x\\\' for schema {}\")",
2001                 PERSON_COLUMN_NAMES
2002             ),
2003             format!("{:?}", err)
2004         );
2005     }
2006 
2007     #[test]
select_simple_aggregate_with_groupby_aggregate_repeated()2008     fn select_simple_aggregate_with_groupby_aggregate_repeated() {
2009         let sql = "SELECT state, MIN(age), MIN(age) FROM person GROUP BY state";
2010         let err = logical_plan(sql).expect_err("query should have failed");
2011         assert_eq!(
2012             "Plan(\"Projections require unique expression names but the expression \\\"#MIN(age)\\\" at position 1 and \\\"#MIN(age)\\\" at position 2 have the same name. Consider aliasing (\\\"AS\\\") one of them.\")",
2013             format!("{:?}", err)
2014         );
2015     }
2016 
2017     #[test]
select_simple_aggregate_with_groupby_aggregate_repeated_and_one_has_alias()2018     fn select_simple_aggregate_with_groupby_aggregate_repeated_and_one_has_alias() {
2019         quick_test(
2020             "SELECT state, MIN(age), MIN(age) AS ma FROM person GROUP BY state",
2021             "Projection: #state, #MIN(age), #MIN(age) AS ma\
2022              \n  Aggregate: groupBy=[[#state]], aggr=[[MIN(#age)]]\
2023              \n    TableScan: person projection=None",
2024         )
2025     }
2026     #[test]
select_simple_aggregate_with_groupby_non_column_expression_unselected()2027     fn select_simple_aggregate_with_groupby_non_column_expression_unselected() {
2028         quick_test(
2029             "SELECT MIN(first_name) FROM person GROUP BY age + 1",
2030             "Projection: #MIN(first_name)\
2031              \n  Aggregate: groupBy=[[#age Plus Int64(1)]], aggr=[[MIN(#first_name)]]\
2032              \n    TableScan: person projection=None",
2033         );
2034     }
2035 
2036     #[test]
select_simple_aggregate_with_groupby_non_column_expression_selected_and_resolvable( )2037     fn select_simple_aggregate_with_groupby_non_column_expression_selected_and_resolvable(
2038     ) {
2039         quick_test(
2040             "SELECT age + 1, MIN(first_name) FROM person GROUP BY age + 1",
2041             "Aggregate: groupBy=[[#age Plus Int64(1)]], aggr=[[MIN(#first_name)]]\
2042              \n  TableScan: person projection=None",
2043         );
2044         quick_test(
2045             "SELECT MIN(first_name), age + 1 FROM person GROUP BY age + 1",
2046             "Projection: #MIN(first_name), #age Plus Int64(1)\
2047              \n  Aggregate: groupBy=[[#age Plus Int64(1)]], aggr=[[MIN(#first_name)]]\
2048              \n    TableScan: person projection=None",
2049         );
2050     }
2051 
2052     #[test]
select_simple_aggregate_with_groupby_non_column_expression_nested_and_resolvable()2053     fn select_simple_aggregate_with_groupby_non_column_expression_nested_and_resolvable()
2054     {
2055         quick_test(
2056             "SELECT ((age + 1) / 2) * (age + 1), MIN(first_name) FROM person GROUP BY age + 1",
2057             "Projection: #age Plus Int64(1) Divide Int64(2) Multiply #age Plus Int64(1), #MIN(first_name)\
2058              \n  Aggregate: groupBy=[[#age Plus Int64(1)]], aggr=[[MIN(#first_name)]]\
2059              \n    TableScan: person projection=None",
2060         );
2061     }
2062 
2063     #[test]
select_simple_aggregate_with_groupby_non_column_expression_nested_and_not_resolvable( )2064     fn select_simple_aggregate_with_groupby_non_column_expression_nested_and_not_resolvable(
2065     ) {
2066         // The query should fail, because age + 9 is not in the group by.
2067         let sql =
2068             "SELECT ((age + 1) / 2) * (age + 9), MIN(first_name) FROM person GROUP BY age + 1";
2069         let err = logical_plan(sql).expect_err("query should have failed");
2070         assert_eq!(
2071             "Plan(\"Projection references non-aggregate values\")",
2072             format!("{:?}", err)
2073         );
2074     }
2075 
2076     #[test]
select_simple_aggregate_with_groupby_non_column_expression_and_its_column_selected( )2077     fn select_simple_aggregate_with_groupby_non_column_expression_and_its_column_selected(
2078     ) {
2079         let sql = "SELECT age, MIN(first_name) FROM person GROUP BY age + 1";
2080         let err = logical_plan(sql).expect_err("query should have failed");
2081         assert_eq!(
2082             "Plan(\"Projection references non-aggregate values\")",
2083             format!("{:?}", err)
2084         );
2085     }
2086 
2087     #[test]
select_simple_aggregate_nested_in_binary_expr_with_groupby()2088     fn select_simple_aggregate_nested_in_binary_expr_with_groupby() {
2089         quick_test(
2090             "SELECT state, MIN(age) < 10 FROM person GROUP BY state",
2091             "Projection: #state, #MIN(age) Lt Int64(10)\
2092              \n  Aggregate: groupBy=[[#state]], aggr=[[MIN(#age)]]\
2093              \n    TableScan: person projection=None",
2094         );
2095     }
2096 
2097     #[test]
select_simple_aggregate_and_nested_groupby_column()2098     fn select_simple_aggregate_and_nested_groupby_column() {
2099         quick_test(
2100             "SELECT age + 1, MAX(first_name) FROM person GROUP BY age",
2101             "Projection: #age Plus Int64(1), #MAX(first_name)\
2102              \n  Aggregate: groupBy=[[#age]], aggr=[[MAX(#first_name)]]\
2103              \n    TableScan: person projection=None",
2104         );
2105     }
2106 
2107     #[test]
select_aggregate_compounded_with_groupby_column()2108     fn select_aggregate_compounded_with_groupby_column() {
2109         quick_test(
2110             "SELECT age + MIN(salary) FROM person GROUP BY age",
2111             "Projection: #age Plus #MIN(salary)\
2112              \n  Aggregate: groupBy=[[#age]], aggr=[[MIN(#salary)]]\
2113              \n    TableScan: person projection=None",
2114         );
2115     }
2116 
2117     #[test]
select_aggregate_with_non_column_inner_expression_with_groupby()2118     fn select_aggregate_with_non_column_inner_expression_with_groupby() {
2119         quick_test(
2120             "SELECT state, MIN(age + 1) FROM person GROUP BY state",
2121             "Aggregate: groupBy=[[#state]], aggr=[[MIN(#age Plus Int64(1))]]\
2122              \n  TableScan: person projection=None",
2123         );
2124     }
2125 
2126     #[test]
test_wildcard()2127     fn test_wildcard() {
2128         quick_test(
2129             "SELECT * from person",
2130             "Projection: #id, #first_name, #last_name, #age, #state, #salary, #birth_date\
2131             \n  TableScan: person projection=None",
2132         );
2133     }
2134 
2135     #[test]
select_count_one()2136     fn select_count_one() {
2137         let sql = "SELECT COUNT(1) FROM person";
2138         let expected = "Aggregate: groupBy=[[]], aggr=[[COUNT(UInt8(1))]]\
2139                         \n  TableScan: person projection=None";
2140         quick_test(sql, expected);
2141     }
2142 
2143     #[test]
select_count_column()2144     fn select_count_column() {
2145         let sql = "SELECT COUNT(id) FROM person";
2146         let expected = "Aggregate: groupBy=[[]], aggr=[[COUNT(#id)]]\
2147                         \n  TableScan: person projection=None";
2148         quick_test(sql, expected);
2149     }
2150 
2151     #[test]
select_scalar_func()2152     fn select_scalar_func() {
2153         let sql = "SELECT sqrt(age) FROM person";
2154         let expected = "Projection: sqrt(#age)\
2155                         \n  TableScan: person projection=None";
2156         quick_test(sql, expected);
2157     }
2158 
2159     #[test]
select_aliased_scalar_func()2160     fn select_aliased_scalar_func() {
2161         let sql = "SELECT sqrt(age) AS square_people FROM person";
2162         let expected = "Projection: sqrt(#age) AS square_people\
2163                         \n  TableScan: person projection=None";
2164         quick_test(sql, expected);
2165     }
2166 
2167     #[test]
select_where_nullif_division()2168     fn select_where_nullif_division() {
2169         let sql = "SELECT c3/(c4+c5) \
2170                    FROM aggregate_test_100 WHERE c3/nullif(c4+c5, 0) > 0.1";
2171         let expected = "Projection: #c3 Divide #c4 Plus #c5\
2172             \n  Filter: #c3 Divide nullif(#c4 Plus #c5, Int64(0)) Gt Float64(0.1)\
2173             \n    TableScan: aggregate_test_100 projection=None";
2174         quick_test(sql, expected);
2175     }
2176 
2177     #[test]
select_where_with_negative_operator()2178     fn select_where_with_negative_operator() {
2179         let sql = "SELECT c3 FROM aggregate_test_100 WHERE c3 > -0.1 AND -c4 > 0";
2180         let expected = "Projection: #c3\
2181             \n  Filter: #c3 Gt Float64(-0.1) And (- #c4) Gt Int64(0)\
2182             \n    TableScan: aggregate_test_100 projection=None";
2183         quick_test(sql, expected);
2184     }
2185 
2186     #[test]
select_where_with_positive_operator()2187     fn select_where_with_positive_operator() {
2188         let sql = "SELECT c3 FROM aggregate_test_100 WHERE c3 > +0.1 AND +c4 > 0";
2189         let expected = "Projection: #c3\
2190             \n  Filter: #c3 Gt Float64(0.1) And #c4 Gt Int64(0)\
2191             \n    TableScan: aggregate_test_100 projection=None";
2192         quick_test(sql, expected);
2193     }
2194 
2195     #[test]
select_order_by()2196     fn select_order_by() {
2197         let sql = "SELECT id FROM person ORDER BY id";
2198         let expected = "Sort: #id ASC NULLS FIRST\
2199                         \n  Projection: #id\
2200                         \n    TableScan: person projection=None";
2201         quick_test(sql, expected);
2202     }
2203 
2204     #[test]
select_order_by_desc()2205     fn select_order_by_desc() {
2206         let sql = "SELECT id FROM person ORDER BY id DESC";
2207         let expected = "Sort: #id DESC NULLS FIRST\
2208                         \n  Projection: #id\
2209                         \n    TableScan: person projection=None";
2210         quick_test(sql, expected);
2211     }
2212 
2213     #[test]
select_order_by_nulls_last()2214     fn select_order_by_nulls_last() {
2215         quick_test(
2216             "SELECT id FROM person ORDER BY id DESC NULLS LAST",
2217             "Sort: #id DESC NULLS LAST\
2218             \n  Projection: #id\
2219             \n    TableScan: person projection=None",
2220         );
2221 
2222         quick_test(
2223             "SELECT id FROM person ORDER BY id NULLS LAST",
2224             "Sort: #id ASC NULLS LAST\
2225             \n  Projection: #id\
2226             \n    TableScan: person projection=None",
2227         );
2228     }
2229 
2230     #[test]
select_group_by()2231     fn select_group_by() {
2232         let sql = "SELECT state FROM person GROUP BY state";
2233         let expected = "Aggregate: groupBy=[[#state]], aggr=[[]]\
2234                         \n  TableScan: person projection=None";
2235 
2236         quick_test(sql, expected);
2237     }
2238 
2239     #[test]
select_group_by_columns_not_in_select()2240     fn select_group_by_columns_not_in_select() {
2241         let sql = "SELECT MAX(age) FROM person GROUP BY state";
2242         let expected = "Projection: #MAX(age)\
2243                         \n  Aggregate: groupBy=[[#state]], aggr=[[MAX(#age)]]\
2244                         \n    TableScan: person projection=None";
2245 
2246         quick_test(sql, expected);
2247     }
2248 
2249     #[test]
select_group_by_count_star()2250     fn select_group_by_count_star() {
2251         let sql = "SELECT state, COUNT(*) FROM person GROUP BY state";
2252         let expected = "Aggregate: groupBy=[[#state]], aggr=[[COUNT(UInt8(1))]]\
2253                         \n  TableScan: person projection=None";
2254 
2255         quick_test(sql, expected);
2256     }
2257 
2258     #[test]
select_group_by_needs_projection()2259     fn select_group_by_needs_projection() {
2260         let sql = "SELECT COUNT(state), state FROM person GROUP BY state";
2261         let expected = "\
2262         Projection: #COUNT(state), #state\
2263         \n  Aggregate: groupBy=[[#state]], aggr=[[COUNT(#state)]]\
2264         \n    TableScan: person projection=None";
2265 
2266         quick_test(sql, expected);
2267     }
2268 
2269     #[test]
select_7480_1()2270     fn select_7480_1() {
2271         let sql = "SELECT c1, MIN(c12) FROM aggregate_test_100 GROUP BY c1, c13";
2272         let expected = "Projection: #c1, #MIN(c12)\
2273                        \n  Aggregate: groupBy=[[#c1, #c13]], aggr=[[MIN(#c12)]]\
2274                        \n    TableScan: aggregate_test_100 projection=None";
2275         quick_test(sql, expected);
2276     }
2277 
2278     #[test]
select_7480_2()2279     fn select_7480_2() {
2280         let sql = "SELECT c1, c13, MIN(c12) FROM aggregate_test_100 GROUP BY c1";
2281         let err = logical_plan(sql).expect_err("query should have failed");
2282         assert_eq!(
2283             "Plan(\"Projection references non-aggregate values\")",
2284             format!("{:?}", err)
2285         );
2286     }
2287 
2288     #[test]
create_external_table_csv()2289     fn create_external_table_csv() {
2290         let sql = "CREATE EXTERNAL TABLE t(c1 int) STORED AS CSV LOCATION 'foo.csv'";
2291         let expected = "CreateExternalTable: \"t\"";
2292         quick_test(sql, expected);
2293     }
2294 
2295     #[test]
create_external_table_csv_no_schema()2296     fn create_external_table_csv_no_schema() {
2297         let sql = "CREATE EXTERNAL TABLE t STORED AS CSV LOCATION 'foo.csv'";
2298         let err = logical_plan(sql).expect_err("query should have failed");
2299         assert_eq!(
2300             "Plan(\"Column definitions required for CSV files. None found\")",
2301             format!("{:?}", err)
2302         );
2303     }
2304 
2305     #[test]
create_external_table_parquet()2306     fn create_external_table_parquet() {
2307         let sql =
2308             "CREATE EXTERNAL TABLE t(c1 int) STORED AS PARQUET LOCATION 'foo.parquet'";
2309         let err = logical_plan(sql).expect_err("query should have failed");
2310         assert_eq!(
2311             "Plan(\"Column definitions can not be specified for PARQUET files.\")",
2312             format!("{:?}", err)
2313         );
2314     }
2315 
2316     #[test]
create_external_table_parquet_no_schema()2317     fn create_external_table_parquet_no_schema() {
2318         let sql = "CREATE EXTERNAL TABLE t STORED AS PARQUET LOCATION 'foo.parquet'";
2319         let expected = "CreateExternalTable: \"t\"";
2320         quick_test(sql, expected);
2321     }
2322 
2323     #[test]
equijoin_explicit_syntax()2324     fn equijoin_explicit_syntax() {
2325         let sql = "SELECT id, order_id \
2326             FROM person \
2327             JOIN orders \
2328             ON id = customer_id";
2329         let expected = "Projection: #id, #order_id\
2330         \n  Join: id = customer_id\
2331         \n    TableScan: person projection=None\
2332         \n    TableScan: orders projection=None";
2333         quick_test(sql, expected);
2334     }
2335 
2336     #[test]
equijoin_explicit_syntax_3_tables()2337     fn equijoin_explicit_syntax_3_tables() {
2338         let sql = "SELECT id, order_id, l_description \
2339             FROM person \
2340             JOIN orders ON id = customer_id \
2341             JOIN lineitem ON o_item_id = l_item_id";
2342         let expected = "Projection: #id, #order_id, #l_description\
2343             \n  Join: o_item_id = l_item_id\
2344             \n    Join: id = customer_id\
2345             \n      TableScan: person projection=None\
2346             \n      TableScan: orders projection=None\
2347             \n    TableScan: lineitem projection=None";
2348         quick_test(sql, expected);
2349     }
2350 
2351     #[test]
boolean_literal_in_condition_expression()2352     fn boolean_literal_in_condition_expression() {
2353         let sql = "SELECT order_id \
2354         FROM orders \
2355         WHERE delivered = false OR delivered = true";
2356         let expected = "Projection: #order_id\
2357             \n  Filter: #delivered Eq Boolean(false) Or #delivered Eq Boolean(true)\
2358             \n    TableScan: orders projection=None";
2359         quick_test(sql, expected);
2360     }
2361 
2362     #[test]
select_typedstring()2363     fn select_typedstring() {
2364         let sql = "SELECT date '2020-12-10' AS date FROM person";
2365         let expected = "Projection: CAST(Utf8(\"2020-12-10\") AS Date32) AS date\
2366             \n  TableScan: person projection=None";
2367         quick_test(sql, expected);
2368     }
2369 
logical_plan(sql: &str) -> Result<LogicalPlan>2370     fn logical_plan(sql: &str) -> Result<LogicalPlan> {
2371         let planner = SqlToRel::new(&MockContextProvider {});
2372         let result = DFParser::parse_sql(&sql);
2373         let ast = result.unwrap();
2374         planner.statement_to_plan(&ast[0])
2375     }
2376 
2377     /// Create logical plan, write with formatter, compare to expected output
quick_test(sql: &str, expected: &str)2378     fn quick_test(sql: &str, expected: &str) {
2379         let plan = logical_plan(sql).unwrap();
2380         assert_eq!(expected, format!("{:?}", plan));
2381     }
2382 
2383     struct MockContextProvider {}
2384 
2385     impl ContextProvider for MockContextProvider {
get_table_provider( &self, name: &str, ) -> Option<Arc<dyn TableProvider + Send + Sync>>2386         fn get_table_provider(
2387             &self,
2388             name: &str,
2389         ) -> Option<Arc<dyn TableProvider + Send + Sync>> {
2390             let schema = match name {
2391                 "person" => Some(Schema::new(vec![
2392                     Field::new("id", DataType::UInt32, false),
2393                     Field::new("first_name", DataType::Utf8, false),
2394                     Field::new("last_name", DataType::Utf8, false),
2395                     Field::new("age", DataType::Int32, false),
2396                     Field::new("state", DataType::Utf8, false),
2397                     Field::new("salary", DataType::Float64, false),
2398                     Field::new(
2399                         "birth_date",
2400                         DataType::Timestamp(TimeUnit::Nanosecond, None),
2401                         false,
2402                     ),
2403                 ])),
2404                 "orders" => Some(Schema::new(vec![
2405                     Field::new("order_id", DataType::UInt32, false),
2406                     Field::new("customer_id", DataType::UInt32, false),
2407                     Field::new("o_item_id", DataType::Utf8, false),
2408                     Field::new("qty", DataType::Int32, false),
2409                     Field::new("price", DataType::Float64, false),
2410                     Field::new("delivered", DataType::Boolean, false),
2411                 ])),
2412                 "lineitem" => Some(Schema::new(vec![
2413                     Field::new("l_item_id", DataType::UInt32, false),
2414                     Field::new("l_description", DataType::Utf8, false),
2415                 ])),
2416                 "aggregate_test_100" => Some(Schema::new(vec![
2417                     Field::new("c1", DataType::Utf8, false),
2418                     Field::new("c2", DataType::UInt32, false),
2419                     Field::new("c3", DataType::Int8, false),
2420                     Field::new("c4", DataType::Int16, false),
2421                     Field::new("c5", DataType::Int32, false),
2422                     Field::new("c6", DataType::Int64, false),
2423                     Field::new("c7", DataType::UInt8, false),
2424                     Field::new("c8", DataType::UInt16, false),
2425                     Field::new("c9", DataType::UInt32, false),
2426                     Field::new("c10", DataType::UInt64, false),
2427                     Field::new("c11", DataType::Float32, false),
2428                     Field::new("c12", DataType::Float64, false),
2429                     Field::new("c13", DataType::Utf8, false),
2430                 ])),
2431                 _ => None,
2432             };
2433             schema.map(|s| -> Arc<dyn TableProvider + Send + Sync> {
2434                 Arc::new(EmptyTable::new(Arc::new(s)))
2435             })
2436         }
2437 
get_function_meta(&self, name: &str) -> Option<Arc<ScalarUDF>>2438         fn get_function_meta(&self, name: &str) -> Option<Arc<ScalarUDF>> {
2439             let f: ScalarFunctionImplementation =
2440                 Arc::new(|_| Err(DataFusionError::NotImplemented("".to_string())));
2441             match name {
2442                 "my_sqrt" => Some(Arc::new(create_udf(
2443                     "my_sqrt",
2444                     vec![DataType::Float64],
2445                     Arc::new(DataType::Float64),
2446                     f,
2447                 ))),
2448                 _ => None,
2449             }
2450         }
2451 
get_aggregate_meta(&self, _name: &str) -> Option<Arc<AggregateUDF>>2452         fn get_aggregate_meta(&self, _name: &str) -> Option<Arc<AggregateUDF>> {
2453             unimplemented!()
2454         }
2455     }
2456 }
2457