1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied. See the License for the
15 // specific language governing permissions and limitations
16 // under the License.
17
18 //! SQL Query Planner (produces logical plan from SQL AST)
19
20 use std::str::FromStr;
21 use std::sync::Arc;
22
23 use crate::datasource::TableProvider;
24 use crate::logical_plan::Expr::Alias;
25 use crate::logical_plan::{
26 and, lit, DFSchema, Expr, LogicalPlan, LogicalPlanBuilder, Operator, PlanType,
27 StringifiedPlan, ToDFSchema,
28 };
29 use crate::scalar::ScalarValue;
30 use crate::{
31 error::{DataFusionError, Result},
32 physical_plan::udaf::AggregateUDF,
33 };
34 use crate::{
35 physical_plan::udf::ScalarUDF,
36 physical_plan::{aggregates, functions},
37 sql::parser::{CreateExternalTable, FileType, Statement as DFStatement},
38 };
39
40 use arrow::datatypes::*;
41
42 use crate::prelude::JoinType;
43 use sqlparser::ast::{
44 BinaryOperator, DataType as SQLDataType, DateTimeField, Expr as SQLExpr, FunctionArg,
45 Join, JoinConstraint, JoinOperator, Query, Select, SelectItem, SetExpr, TableFactor,
46 TableWithJoins, UnaryOperator, Value,
47 };
48 use sqlparser::ast::{ColumnDef as SQLColumnDef, ColumnOption};
49 use sqlparser::ast::{OrderByExpr, Statement};
50 use sqlparser::parser::ParserError::ParserError;
51
52 use super::utils::{
53 can_columns_satisfy_exprs, expand_wildcard, expr_as_column_expr, extract_aliases,
54 find_aggregate_exprs, find_column_exprs, rebase_expr, resolve_aliases_to_exprs,
55 };
56
/// The ContextProvider trait allows the query planner to obtain meta-data about tables and
/// functions referenced in SQL statements.
///
/// Every getter takes a name exactly as it appears in the SQL text and returns `None`
/// when the name is unknown; the planner turns that `None` into a planning error.
pub trait ContextProvider {
    /// Getter for a datasource: the table provider for table `name`, if any
    fn get_table_provider(
        &self,
        name: &str,
    ) -> Option<Arc<dyn TableProvider + Send + Sync>>;
    /// Getter for a UDF description: the scalar user-defined function `name`, if any
    fn get_function_meta(&self, name: &str) -> Option<Arc<ScalarUDF>>;
    /// Getter for a UDAF description: the aggregate user-defined function `name`, if any
    fn get_aggregate_meta(&self, name: &str) -> Option<Arc<AggregateUDF>>;
}
70
/// SQL query planner: translates SQL ASTs into DataFusion logical plans.
///
/// Table and function names are resolved through the borrowed [`ContextProvider`].
pub struct SqlToRel<'a, S: ContextProvider> {
    // Source of table and function metadata used while planning
    schema_provider: &'a S,
}
75
76 impl<'a, S: ContextProvider> SqlToRel<'a, S> {
77 /// Create a new query planner
new(schema_provider: &'a S) -> Self78 pub fn new(schema_provider: &'a S) -> Self {
79 SqlToRel { schema_provider }
80 }
81
82 /// Generate a logical plan from an DataFusion SQL statement
statement_to_plan(&self, statement: &DFStatement) -> Result<LogicalPlan>83 pub fn statement_to_plan(&self, statement: &DFStatement) -> Result<LogicalPlan> {
84 match statement {
85 DFStatement::CreateExternalTable(s) => self.external_table_to_plan(&s),
86 DFStatement::Statement(s) => self.sql_statement_to_plan(&s),
87 }
88 }
89
90 /// Generate a logical plan from an SQL statement
sql_statement_to_plan(&self, sql: &Statement) -> Result<LogicalPlan>91 pub fn sql_statement_to_plan(&self, sql: &Statement) -> Result<LogicalPlan> {
92 match sql {
93 Statement::Explain {
94 verbose,
95 statement,
96 analyze: _,
97 } => self.explain_statement_to_plan(*verbose, &statement),
98 Statement::Query(query) => self.query_to_plan(&query),
99 _ => Err(DataFusionError::NotImplemented(
100 "Only SELECT statements are implemented".to_string(),
101 )),
102 }
103 }
104
105 /// Generate a logic plan from an SQL query
query_to_plan(&self, query: &Query) -> Result<LogicalPlan>106 pub fn query_to_plan(&self, query: &Query) -> Result<LogicalPlan> {
107 let plan = match &query.body {
108 SetExpr::Select(s) => self.select_to_plan(s.as_ref()),
109 _ => Err(DataFusionError::NotImplemented(format!(
110 "Query {} not implemented yet",
111 query.body
112 ))),
113 }?;
114
115 let plan = self.order_by(&plan, &query.order_by)?;
116
117 self.limit(&plan, &query.limit)
118 }
119
120 /// Generate a logical plan from a CREATE EXTERNAL TABLE statement
external_table_to_plan( &self, statement: &CreateExternalTable, ) -> Result<LogicalPlan>121 pub fn external_table_to_plan(
122 &self,
123 statement: &CreateExternalTable,
124 ) -> Result<LogicalPlan> {
125 let CreateExternalTable {
126 name,
127 columns,
128 file_type,
129 has_header,
130 location,
131 } = statement;
132
133 // semantic checks
134 match *file_type {
135 FileType::CSV => {
136 if columns.is_empty() {
137 return Err(DataFusionError::Plan(
138 "Column definitions required for CSV files. None found".into(),
139 ));
140 }
141 }
142 FileType::Parquet => {
143 if !columns.is_empty() {
144 return Err(DataFusionError::Plan(
145 "Column definitions can not be specified for PARQUET files."
146 .into(),
147 ));
148 }
149 }
150 FileType::NdJson => {}
151 };
152
153 let schema = self.build_schema(&columns)?;
154
155 Ok(LogicalPlan::CreateExternalTable {
156 schema: schema.to_dfschema_ref()?,
157 name: name.clone(),
158 location: location.clone(),
159 file_type: *file_type,
160 has_header: *has_header,
161 })
162 }
163
164 /// Generate a plan for EXPLAIN ... that will print out a plan
165 ///
explain_statement_to_plan( &self, verbose: bool, statement: &Statement, ) -> Result<LogicalPlan>166 pub fn explain_statement_to_plan(
167 &self,
168 verbose: bool,
169 statement: &Statement,
170 ) -> Result<LogicalPlan> {
171 let plan = self.sql_statement_to_plan(&statement)?;
172
173 let stringified_plans = vec![StringifiedPlan::new(
174 PlanType::LogicalPlan,
175 format!("{:#?}", plan),
176 )];
177
178 let schema = LogicalPlan::explain_schema();
179 let plan = Arc::new(plan);
180
181 Ok(LogicalPlan::Explain {
182 verbose,
183 plan,
184 stringified_plans,
185 schema: schema.to_dfschema_ref()?,
186 })
187 }
188
build_schema(&self, columns: &[SQLColumnDef]) -> Result<Schema>189 fn build_schema(&self, columns: &[SQLColumnDef]) -> Result<Schema> {
190 let mut fields = Vec::new();
191
192 for column in columns {
193 let data_type = self.make_data_type(&column.data_type)?;
194 let allow_null = column
195 .options
196 .iter()
197 .any(|x| x.option == ColumnOption::Null);
198 fields.push(Field::new(&column.name.value, data_type, allow_null));
199 }
200
201 Ok(Schema::new(fields))
202 }
203
204 /// Maps the SQL type to the corresponding Arrow `DataType`
make_data_type(&self, sql_type: &SQLDataType) -> Result<DataType>205 fn make_data_type(&self, sql_type: &SQLDataType) -> Result<DataType> {
206 match sql_type {
207 SQLDataType::BigInt => Ok(DataType::Int64),
208 SQLDataType::Int => Ok(DataType::Int32),
209 SQLDataType::SmallInt => Ok(DataType::Int16),
210 SQLDataType::Char(_) | SQLDataType::Varchar(_) | SQLDataType::Text => {
211 Ok(DataType::Utf8)
212 }
213 SQLDataType::Decimal(_, _) => Ok(DataType::Float64),
214 SQLDataType::Float(_) => Ok(DataType::Float32),
215 SQLDataType::Real | SQLDataType::Double => Ok(DataType::Float64),
216 SQLDataType::Boolean => Ok(DataType::Boolean),
217 SQLDataType::Date => Ok(DataType::Date32),
218 SQLDataType::Time => Ok(DataType::Time64(TimeUnit::Millisecond)),
219 SQLDataType::Timestamp => Ok(DataType::Date64),
220 _ => Err(DataFusionError::NotImplemented(format!(
221 "The SQL data type {:?} is not implemented",
222 sql_type
223 ))),
224 }
225 }
226
plan_from_tables(&self, from: &[TableWithJoins]) -> Result<Vec<LogicalPlan>>227 fn plan_from_tables(&self, from: &[TableWithJoins]) -> Result<Vec<LogicalPlan>> {
228 match from.len() {
229 0 => Ok(vec![LogicalPlanBuilder::empty(true).build()?]),
230 _ => from
231 .iter()
232 .map(|t| self.plan_table_with_joins(t))
233 .collect::<Result<Vec<_>>>(),
234 }
235 }
236
plan_table_with_joins(&self, t: &TableWithJoins) -> Result<LogicalPlan>237 fn plan_table_with_joins(&self, t: &TableWithJoins) -> Result<LogicalPlan> {
238 let left = self.create_relation(&t.relation)?;
239 match t.joins.len() {
240 0 => Ok(left),
241 n => {
242 let mut left = self.parse_relation_join(&left, &t.joins[0])?;
243 for i in 1..n {
244 left = self.parse_relation_join(&left, &t.joins[i])?;
245 }
246 Ok(left)
247 }
248 }
249 }
250
parse_relation_join( &self, left: &LogicalPlan, join: &Join, ) -> Result<LogicalPlan>251 fn parse_relation_join(
252 &self,
253 left: &LogicalPlan,
254 join: &Join,
255 ) -> Result<LogicalPlan> {
256 let right = self.create_relation(&join.relation)?;
257 match &join.join_operator {
258 JoinOperator::LeftOuter(constraint) => {
259 self.parse_join(left, &right, constraint, JoinType::Left)
260 }
261 JoinOperator::RightOuter(constraint) => {
262 self.parse_join(left, &right, constraint, JoinType::Right)
263 }
264 JoinOperator::Inner(constraint) => {
265 self.parse_join(left, &right, constraint, JoinType::Inner)
266 }
267 other => Err(DataFusionError::NotImplemented(format!(
268 "Unsupported JOIN operator {:?}",
269 other
270 ))),
271 }
272 }
273
parse_join( &self, left: &LogicalPlan, right: &LogicalPlan, constraint: &JoinConstraint, join_type: JoinType, ) -> Result<LogicalPlan>274 fn parse_join(
275 &self,
276 left: &LogicalPlan,
277 right: &LogicalPlan,
278 constraint: &JoinConstraint,
279 join_type: JoinType,
280 ) -> Result<LogicalPlan> {
281 match constraint {
282 JoinConstraint::On(sql_expr) => {
283 let mut keys: Vec<(String, String)> = vec![];
284 let join_schema = left.schema().join(&right.schema())?;
285
286 // parse ON expression
287 let expr = self.sql_to_rex(sql_expr, &join_schema)?;
288
289 // extract join keys
290 extract_join_keys(&expr, &mut keys)?;
291 let left_keys: Vec<&str> =
292 keys.iter().map(|pair| pair.0.as_str()).collect();
293 let right_keys: Vec<&str> =
294 keys.iter().map(|pair| pair.1.as_str()).collect();
295
296 // return the logical plan representing the join
297 LogicalPlanBuilder::from(&left)
298 .join(&right, join_type, &left_keys, &right_keys)?
299 .build()
300 }
301 JoinConstraint::Using(idents) => {
302 let keys: Vec<&str> = idents.iter().map(|x| x.value.as_str()).collect();
303 LogicalPlanBuilder::from(&left)
304 .join(&right, join_type, &keys, &keys)?
305 .build()
306 }
307 JoinConstraint::Natural => {
308 // https://issues.apache.org/jira/browse/ARROW-10727
309 Err(DataFusionError::NotImplemented(
310 "NATURAL JOIN is not supported (https://issues.apache.org/jira/browse/ARROW-10727)".to_string(),
311 ))
312 }
313 JoinConstraint::None => Err(DataFusionError::NotImplemented(
314 "NONE contraint is not supported".to_string(),
315 )),
316 }
317 }
318
create_relation(&self, relation: &TableFactor) -> Result<LogicalPlan>319 fn create_relation(&self, relation: &TableFactor) -> Result<LogicalPlan> {
320 match relation {
321 TableFactor::Table { name, .. } => {
322 let table_name = name.to_string();
323 match self.schema_provider.get_table_provider(&table_name) {
324 Some(provider) => {
325 LogicalPlanBuilder::scan(&table_name, provider, None)?.build()
326 }
327 None => Err(DataFusionError::Plan(format!(
328 "no provider found for table {}",
329 name
330 ))),
331 }
332 }
333 TableFactor::Derived { subquery, .. } => self.query_to_plan(subquery),
334 TableFactor::NestedJoin(table_with_joins) => {
335 self.plan_table_with_joins(table_with_joins)
336 }
337 // @todo Support TableFactory::TableFunction?
338 _ => Err(DataFusionError::NotImplemented(format!(
339 "Unsupported ast node {:?} in create_relation",
340 relation
341 ))),
342 }
343 }
344
/// Generate a logic plan from an SQL select.
///
/// Planning proceeds in stages:
/// 1. Plan every `FROM` item (`plan_from_tables`).
/// 2. If there is a `WHERE` clause, use its `<column> = <column>` terms as
///    inner-join keys to combine the `FROM` items left to right. Then filter on
///    the predicate with those join terms removed (`remove_join_expressions`).
/// 3. Plan the SELECT list (wildcards expanded) and the optional HAVING
///    expression. Aliases used in HAVING are resolved to their SELECT expressions.
/// 4. Aggregate when there is a GROUP BY or any aggregate expression. Otherwise,
///    check that HAVING only uses columns the SELECT provides.
/// 5. Apply HAVING as a filter and finish with a projection of the SELECT list.
///
/// Multiple `FROM` items without a usable join key ("Cartesian joins") are
/// rejected with `NotImplemented`.
fn select_to_plan(&self, select: &Select) -> Result<LogicalPlan> {
    let plans = self.plan_from_tables(&select.from)?;

    let plan = match &select.selection {
        Some(predicate_expr) => {
            // build join schema
            let mut fields = vec![];
            for plan in &plans {
                fields.extend_from_slice(&plan.schema().fields());
            }
            let join_schema = DFSchema::new(fields)?;

            let filter_expr = self.sql_to_rex(predicate_expr, &join_schema)?;

            // look for expressions of the form `<column> = <column>`
            let mut possible_join_keys = vec![];
            extract_possible_join_keys(&filter_expr, &mut possible_join_keys)?;

            let mut all_join_keys = vec![];
            // `plans` is never empty: `plan_from_tables` returns an empty
            // relation when there is no FROM clause.
            let mut left = plans[0].clone();
            for right in plans.iter().skip(1) {
                let left_schema = left.schema();
                let right_schema = right.schema();
                let mut join_keys = vec![];
                // The equality may be written in either orientation
                // (`l = r` or `r = l`), so try both.
                for (l, r) in &possible_join_keys {
                    if left_schema.field_with_unqualified_name(l).is_ok()
                        && right_schema.field_with_unqualified_name(r).is_ok()
                    {
                        join_keys.push((l.as_str(), r.as_str()));
                    } else if left_schema.field_with_unqualified_name(r).is_ok()
                        && right_schema.field_with_unqualified_name(l).is_ok()
                    {
                        join_keys.push((r.as_str(), l.as_str()));
                    }
                }
                if join_keys.is_empty() {
                    return Err(DataFusionError::NotImplemented(
                        "Cartesian joins are not supported".to_string(),
                    ));
                } else {
                    let left_keys: Vec<_> =
                        join_keys.iter().map(|(l, _)| *l).collect();
                    let right_keys: Vec<_> =
                        join_keys.iter().map(|(_, r)| *r).collect();
                    let builder = LogicalPlanBuilder::from(&left);
                    left = builder
                        .join(right, JoinType::Inner, &left_keys, &right_keys)?
                        .build()?;
                }
                // Remember every key used so it can be stripped from the filter.
                all_join_keys.extend_from_slice(&join_keys);
            }

            // remove join expressions from filter
            match remove_join_expressions(&filter_expr, &all_join_keys)? {
                Some(filter_expr) => {
                    LogicalPlanBuilder::from(&left).filter(filter_expr)?.build()
                }
                _ => Ok(left),
            }
        }
        None => {
            if plans.len() == 1 {
                Ok(plans[0].clone())
            } else {
                Err(DataFusionError::NotImplemented(
                    "Cartesian joins are not supported".to_string(),
                ))
            }
        }
    };
    let plan = plan?;

    // The SELECT expressions, with wildcards expanded.
    let select_exprs = self.prepare_select_exprs(&plan, &select.projection)?;

    // Optionally the HAVING expression.
    let having_expr_opt = select
        .having
        .as_ref()
        .map::<Result<Expr>, _>(|having_expr| {
            // Not validated against a schema here; column checks happen later,
            // in `aggregate` or in the non-aggregate branch below.
            let having_expr = self.sql_expr_to_logical_expr(having_expr)?;

            // This step "dereferences" any aliases in the HAVING clause.
            //
            // This is how we support queries with HAVING expressions that
            // refer to aliased columns.
            //
            // For example:
            //
            // SELECT c1 AS m FROM t HAVING m > 10;
            // SELECT c1, MAX(c2) AS m FROM t GROUP BY c1 HAVING m > 10;
            //
            // are rewritten as, respectively:
            //
            // SELECT c1 AS m FROM t HAVING c1 > 10;
            // SELECT c1, MAX(c2) AS m FROM t GROUP BY c1 HAVING MAX(c2) > 10;
            //
            let having_expr = resolve_aliases_to_exprs(
                &having_expr,
                &extract_aliases(&select_exprs),
            )?;

            Ok(having_expr)
        })
        .transpose()?;

    // The outer expressions we will search through for
    // aggregates. Aggregates may be sourced from the SELECT...
    let mut aggr_expr_haystack = select_exprs.clone();

    // ... or from the HAVING.
    if let Some(having_expr) = &having_expr_opt {
        aggr_expr_haystack.push(having_expr.clone());
    }

    // All of the aggregate expressions (deduplicated).
    let aggr_exprs = find_aggregate_exprs(&aggr_expr_haystack);

    let (plan, select_exprs_post_aggr, having_expr_post_aggr_opt) =
        if !select.group_by.is_empty() || !aggr_exprs.is_empty() {
            self.aggregate(
                &plan,
                &select_exprs,
                &having_expr_opt,
                &select.group_by,
                &aggr_exprs,
            )?
        } else {
            if let Some(having_expr) = &having_expr_opt {
                let available_columns = select_exprs
                    .iter()
                    .map(|expr| expr_as_column_expr(expr, &plan))
                    .collect::<Result<Vec<Expr>>>()?;

                // Ensure the HAVING expression is using only columns
                // provided by the SELECT.
                if !can_columns_satisfy_exprs(
                    &available_columns,
                    &[having_expr.clone()],
                )? {
                    return Err(DataFusionError::Plan(
                        "Having references column(s) not provided by the select"
                            .to_owned(),
                    ));
                }
            }

            (plan, select_exprs, having_expr_opt)
        };

    // HAVING is applied as a filter on top of the (possibly aggregated) plan.
    let plan = if let Some(having_expr_post_aggr) = having_expr_post_aggr_opt {
        LogicalPlanBuilder::from(&plan)
            .filter(having_expr_post_aggr)?
            .build()?
    } else {
        plan
    };

    self.project(&plan, &select_exprs_post_aggr, false)
}
506
507 /// Returns the `Expr`'s corresponding to a SQL query's SELECT expressions.
508 ///
509 /// Wildcards are expanded into the concrete list of columns.
prepare_select_exprs( &self, plan: &LogicalPlan, projection: &[SelectItem], ) -> Result<Vec<Expr>>510 fn prepare_select_exprs(
511 &self,
512 plan: &LogicalPlan,
513 projection: &[SelectItem],
514 ) -> Result<Vec<Expr>> {
515 let input_schema = plan.schema();
516
517 Ok(projection
518 .iter()
519 .map(|expr| self.sql_select_to_rex(&expr, &input_schema))
520 .collect::<Result<Vec<Expr>>>()?
521 .iter()
522 .flat_map(|expr| expand_wildcard(&expr, &input_schema))
523 .collect::<Vec<Expr>>())
524 }
525
526 /// Wrap a plan in a projection
527 ///
528 /// If the `force` argument is `false`, the projection is applied only when
529 /// necessary, i.e., when the input fields are different than the
530 /// projection. Note that if the input fields are the same, but out of
531 /// order, the projection will be applied.
project( &self, input: &LogicalPlan, expr: &[Expr], force: bool, ) -> Result<LogicalPlan>532 fn project(
533 &self,
534 input: &LogicalPlan,
535 expr: &[Expr],
536 force: bool,
537 ) -> Result<LogicalPlan> {
538 self.validate_schema_satisfies_exprs(&input.schema(), &expr)?;
539 let plan = LogicalPlanBuilder::from(input).project(expr)?.build()?;
540
541 let project = force
542 || match input {
543 LogicalPlan::TableScan { .. } => true,
544 _ => plan.schema().fields() != input.schema().fields(),
545 };
546
547 if project {
548 Ok(plan)
549 } else {
550 Ok(input.clone())
551 }
552 }
553
aggregate( &self, input: &LogicalPlan, select_exprs: &[Expr], having_expr_opt: &Option<Expr>, group_by: &[SQLExpr], aggr_exprs: &[Expr], ) -> Result<(LogicalPlan, Vec<Expr>, Option<Expr>)>554 fn aggregate(
555 &self,
556 input: &LogicalPlan,
557 select_exprs: &[Expr],
558 having_expr_opt: &Option<Expr>,
559 group_by: &[SQLExpr],
560 aggr_exprs: &[Expr],
561 ) -> Result<(LogicalPlan, Vec<Expr>, Option<Expr>)> {
562 let group_by_exprs = group_by
563 .iter()
564 .map(|e| self.sql_to_rex(e, &input.schema()))
565 .collect::<Result<Vec<Expr>>>()?;
566
567 let aggr_projection_exprs = group_by_exprs
568 .iter()
569 .chain(aggr_exprs.iter())
570 .cloned()
571 .collect::<Vec<Expr>>();
572
573 let plan = LogicalPlanBuilder::from(&input)
574 .aggregate(&group_by_exprs, aggr_exprs)?
575 .build()?;
576
577 // After aggregation, these are all of the columns that will be
578 // available to next phases of planning.
579 let column_exprs_post_aggr = aggr_projection_exprs
580 .iter()
581 .map(|expr| expr_as_column_expr(expr, input))
582 .collect::<Result<Vec<Expr>>>()?;
583
584 // Rewrite the SELECT expression to use the columns produced by the
585 // aggregation.
586 let select_exprs_post_aggr = select_exprs
587 .iter()
588 .map(|expr| rebase_expr(expr, &aggr_projection_exprs, input))
589 .collect::<Result<Vec<Expr>>>()?;
590
591 if !can_columns_satisfy_exprs(&column_exprs_post_aggr, &select_exprs_post_aggr)? {
592 return Err(DataFusionError::Plan(
593 "Projection references non-aggregate values".to_owned(),
594 ));
595 }
596
597 // Rewrite the HAVING expression to use the columns produced by the
598 // aggregation.
599 let having_expr_post_aggr_opt = if let Some(having_expr) = having_expr_opt {
600 let having_expr_post_aggr =
601 rebase_expr(having_expr, &aggr_projection_exprs, input)?;
602
603 if !can_columns_satisfy_exprs(
604 &column_exprs_post_aggr,
605 &[having_expr_post_aggr.clone()],
606 )? {
607 return Err(DataFusionError::Plan(
608 "Having references non-aggregate values".to_owned(),
609 ));
610 }
611
612 Some(having_expr_post_aggr)
613 } else {
614 None
615 };
616
617 Ok((plan, select_exprs_post_aggr, having_expr_post_aggr_opt))
618 }
619
620 /// Wrap a plan in a limit
limit(&self, input: &LogicalPlan, limit: &Option<SQLExpr>) -> Result<LogicalPlan>621 fn limit(&self, input: &LogicalPlan, limit: &Option<SQLExpr>) -> Result<LogicalPlan> {
622 match *limit {
623 Some(ref limit_expr) => {
624 let n = match self.sql_to_rex(&limit_expr, &input.schema())? {
625 Expr::Literal(ScalarValue::Int64(Some(n))) => Ok(n as usize),
626 _ => Err(DataFusionError::Plan(
627 "Unexpected expression for LIMIT clause".to_string(),
628 )),
629 }?;
630
631 LogicalPlanBuilder::from(&input).limit(n)?.build()
632 }
633 _ => Ok(input.clone()),
634 }
635 }
636
637 /// Wrap the logical in a sort
order_by( &self, plan: &LogicalPlan, order_by: &[OrderByExpr], ) -> Result<LogicalPlan>638 fn order_by(
639 &self,
640 plan: &LogicalPlan,
641 order_by: &[OrderByExpr],
642 ) -> Result<LogicalPlan> {
643 if order_by.is_empty() {
644 return Ok(plan.clone());
645 }
646
647 let input_schema = plan.schema();
648 let order_by_rex: Result<Vec<Expr>> = order_by
649 .iter()
650 .map(|e| {
651 Ok(Expr::Sort {
652 expr: Box::new(self.sql_to_rex(&e.expr, &input_schema)?),
653 // by default asc
654 asc: e.asc.unwrap_or(true),
655 // by default nulls first to be consistent with spark
656 nulls_first: e.nulls_first.unwrap_or(true),
657 })
658 })
659 .collect();
660
661 LogicalPlanBuilder::from(&plan)
662 .sort(&order_by_rex?)?
663 .build()
664 }
665
666 /// Validate the schema provides all of the columns referenced in the expressions.
validate_schema_satisfies_exprs( &self, schema: &DFSchema, exprs: &[Expr], ) -> Result<()>667 fn validate_schema_satisfies_exprs(
668 &self,
669 schema: &DFSchema,
670 exprs: &[Expr],
671 ) -> Result<()> {
672 find_column_exprs(exprs)
673 .iter()
674 .try_for_each(|col| match col {
675 Expr::Column(name) => {
676 schema.field_with_unqualified_name(&name).map_err(|_| {
677 DataFusionError::Plan(format!(
678 "Invalid identifier '{}' for schema {}",
679 name,
680 schema.to_string()
681 ))
682 })?;
683 Ok(())
684 }
685 _ => Err(DataFusionError::Internal("Not a column".to_string())),
686 })
687 }
688
689 /// Generate a relational expression from a select SQL expression
sql_select_to_rex(&self, sql: &SelectItem, schema: &DFSchema) -> Result<Expr>690 fn sql_select_to_rex(&self, sql: &SelectItem, schema: &DFSchema) -> Result<Expr> {
691 match sql {
692 SelectItem::UnnamedExpr(expr) => self.sql_to_rex(expr, schema),
693 SelectItem::ExprWithAlias { expr, alias } => Ok(Alias(
694 Box::new(self.sql_to_rex(&expr, schema)?),
695 alias.value.clone(),
696 )),
697 SelectItem::Wildcard => Ok(Expr::Wildcard),
698 SelectItem::QualifiedWildcard(_) => Err(DataFusionError::NotImplemented(
699 "Qualified wildcards are not supported".to_string(),
700 )),
701 }
702 }
703
704 /// Generate a relational expression from a SQL expression
sql_to_rex(&self, sql: &SQLExpr, schema: &DFSchema) -> Result<Expr>705 pub fn sql_to_rex(&self, sql: &SQLExpr, schema: &DFSchema) -> Result<Expr> {
706 let expr = self.sql_expr_to_logical_expr(sql)?;
707 self.validate_schema_satisfies_exprs(schema, &[expr.clone()])?;
708 Ok(expr)
709 }
710
sql_fn_arg_to_logical_expr(&self, sql: &FunctionArg) -> Result<Expr>711 fn sql_fn_arg_to_logical_expr(&self, sql: &FunctionArg) -> Result<Expr> {
712 match sql {
713 FunctionArg::Named { name: _, arg } => self.sql_expr_to_logical_expr(arg),
714 FunctionArg::Unnamed(value) => self.sql_expr_to_logical_expr(value),
715 }
716 }
717
sql_expr_to_logical_expr(&self, sql: &SQLExpr) -> Result<Expr>718 fn sql_expr_to_logical_expr(&self, sql: &SQLExpr) -> Result<Expr> {
719 match sql {
720 SQLExpr::Value(Value::Number(n, _)) => match n.parse::<i64>() {
721 Ok(n) => Ok(lit(n)),
722 Err(_) => Ok(lit(n.parse::<f64>().unwrap())),
723 },
724 SQLExpr::Value(Value::SingleQuotedString(ref s)) => Ok(lit(s.clone())),
725
726 SQLExpr::Value(Value::Boolean(n)) => Ok(lit(*n)),
727
728 SQLExpr::Value(Value::Null) => Ok(Expr::Literal(ScalarValue::Utf8(None))),
729 SQLExpr::Extract { field, expr } => Ok(Expr::ScalarFunction {
730 fun: functions::BuiltinScalarFunction::DatePart,
731 args: vec![
732 Expr::Literal(ScalarValue::Utf8(Some(format!("{}", field)))),
733 self.sql_expr_to_logical_expr(expr)?,
734 ],
735 }),
736
737 SQLExpr::Value(Value::Interval {
738 value,
739 leading_field,
740 leading_precision,
741 last_field,
742 fractional_seconds_precision,
743 }) => self.sql_interval_to_literal(
744 value,
745 leading_field,
746 leading_precision,
747 last_field,
748 fractional_seconds_precision,
749 ),
750
751 SQLExpr::Identifier(ref id) => {
752 if &id.value[0..1] == "@" {
753 let var_names = vec![id.value.clone()];
754 Ok(Expr::ScalarVariable(var_names))
755 } else {
756 Ok(Expr::Column(id.value.to_string()))
757 }
758 }
759
760 SQLExpr::CompoundIdentifier(ids) => {
761 let mut var_names = vec![];
762 for id in ids {
763 var_names.push(id.value.clone());
764 }
765 if &var_names[0][0..1] == "@" {
766 Ok(Expr::ScalarVariable(var_names))
767 } else {
768 Err(DataFusionError::NotImplemented(format!(
769 "Unsupported compound identifier '{:?}'",
770 var_names,
771 )))
772 }
773 }
774
775 SQLExpr::Wildcard => Ok(Expr::Wildcard),
776
777 SQLExpr::Case {
778 operand,
779 conditions,
780 results,
781 else_result,
782 } => {
783 let expr = if let Some(e) = operand {
784 Some(Box::new(self.sql_expr_to_logical_expr(e)?))
785 } else {
786 None
787 };
788 let when_expr = conditions
789 .iter()
790 .map(|e| self.sql_expr_to_logical_expr(e))
791 .collect::<Result<Vec<_>>>()?;
792 let then_expr = results
793 .iter()
794 .map(|e| self.sql_expr_to_logical_expr(e))
795 .collect::<Result<Vec<_>>>()?;
796 let else_expr = if let Some(e) = else_result {
797 Some(Box::new(self.sql_expr_to_logical_expr(e)?))
798 } else {
799 None
800 };
801
802 Ok(Expr::Case {
803 expr,
804 when_then_expr: when_expr
805 .iter()
806 .zip(then_expr.iter())
807 .map(|(w, t)| (Box::new(w.to_owned()), Box::new(t.to_owned())))
808 .collect(),
809 else_expr,
810 })
811 }
812
813 SQLExpr::Cast {
814 ref expr,
815 ref data_type,
816 } => Ok(Expr::Cast {
817 expr: Box::new(self.sql_expr_to_logical_expr(&expr)?),
818 data_type: convert_data_type(data_type)?,
819 }),
820
821 SQLExpr::TypedString {
822 ref data_type,
823 ref value,
824 } => Ok(Expr::Cast {
825 expr: Box::new(lit(&**value)),
826 data_type: convert_data_type(data_type)?,
827 }),
828
829 SQLExpr::IsNull(ref expr) => {
830 Ok(Expr::IsNull(Box::new(self.sql_expr_to_logical_expr(expr)?)))
831 }
832
833 SQLExpr::IsNotNull(ref expr) => Ok(Expr::IsNotNull(Box::new(
834 self.sql_expr_to_logical_expr(expr)?,
835 ))),
836
837 SQLExpr::UnaryOp { ref op, ref expr } => match op {
838 UnaryOperator::Not => {
839 Ok(Expr::Not(Box::new(self.sql_expr_to_logical_expr(expr)?)))
840 }
841 UnaryOperator::Plus => Ok(self.sql_expr_to_logical_expr(expr)?),
842 UnaryOperator::Minus => {
843 match expr.as_ref() {
844 // optimization: if it's a number literal, we applly the negative operator
845 // here directly to calculate the new literal.
846 SQLExpr::Value(Value::Number(n,_)) => match n.parse::<i64>() {
847 Ok(n) => Ok(lit(-n)),
848 Err(_) => Ok(lit(-n
849 .parse::<f64>()
850 .map_err(|_e| {
851 DataFusionError::Internal(format!(
852 "negative operator can be only applied to integer and float operands, got: {}",
853 n))
854 })?)),
855 },
856 // not a literal, apply negative operator on expression
857 _ => Ok(Expr::Negative(Box::new(self.sql_expr_to_logical_expr(expr)?))),
858 }
859 }
860 _ => Err(DataFusionError::NotImplemented(format!(
861 "Unsupported SQL unary operator {:?}",
862 op
863 ))),
864 },
865
866 SQLExpr::Between {
867 ref expr,
868 ref negated,
869 ref low,
870 ref high,
871 } => Ok(Expr::Between {
872 expr: Box::new(self.sql_expr_to_logical_expr(&expr)?),
873 negated: *negated,
874 low: Box::new(self.sql_expr_to_logical_expr(&low)?),
875 high: Box::new(self.sql_expr_to_logical_expr(&high)?),
876 }),
877
878 SQLExpr::InList {
879 ref expr,
880 ref list,
881 ref negated,
882 } => {
883 let list_expr = list
884 .iter()
885 .map(|e| self.sql_expr_to_logical_expr(e))
886 .collect::<Result<Vec<_>>>()?;
887
888 Ok(Expr::InList {
889 expr: Box::new(self.sql_expr_to_logical_expr(&expr)?),
890 list: list_expr,
891 negated: *negated,
892 })
893 }
894
895 SQLExpr::BinaryOp {
896 ref left,
897 ref op,
898 ref right,
899 } => {
900 let operator = match *op {
901 BinaryOperator::Gt => Ok(Operator::Gt),
902 BinaryOperator::GtEq => Ok(Operator::GtEq),
903 BinaryOperator::Lt => Ok(Operator::Lt),
904 BinaryOperator::LtEq => Ok(Operator::LtEq),
905 BinaryOperator::Eq => Ok(Operator::Eq),
906 BinaryOperator::NotEq => Ok(Operator::NotEq),
907 BinaryOperator::Plus => Ok(Operator::Plus),
908 BinaryOperator::Minus => Ok(Operator::Minus),
909 BinaryOperator::Multiply => Ok(Operator::Multiply),
910 BinaryOperator::Divide => Ok(Operator::Divide),
911 BinaryOperator::Modulus => Ok(Operator::Modulus),
912 BinaryOperator::And => Ok(Operator::And),
913 BinaryOperator::Or => Ok(Operator::Or),
914 BinaryOperator::Like => Ok(Operator::Like),
915 BinaryOperator::NotLike => Ok(Operator::NotLike),
916 _ => Err(DataFusionError::NotImplemented(format!(
917 "Unsupported SQL binary operator {:?}",
918 op
919 ))),
920 }?;
921
922 Ok(Expr::BinaryExpr {
923 left: Box::new(self.sql_expr_to_logical_expr(&left)?),
924 op: operator,
925 right: Box::new(self.sql_expr_to_logical_expr(&right)?),
926 })
927 }
928
929 SQLExpr::Function(function) => {
930 let name: String = function.name.to_string();
931
932 // first, scalar built-in
933 if let Ok(fun) = functions::BuiltinScalarFunction::from_str(&name) {
934 let args = function
935 .args
936 .iter()
937 .map(|a| self.sql_fn_arg_to_logical_expr(a))
938 .collect::<Result<Vec<Expr>>>()?;
939
940 return Ok(Expr::ScalarFunction { fun, args });
941 };
942
943 // next, aggregate built-ins
944 if let Ok(fun) = aggregates::AggregateFunction::from_str(&name) {
945 let args = if fun == aggregates::AggregateFunction::Count {
946 function
947 .args
948 .iter()
949 .map(|a| match a {
950 FunctionArg::Unnamed(SQLExpr::Value(Value::Number(
951 _,
952 _,
953 ))) => Ok(lit(1_u8)),
954 FunctionArg::Unnamed(SQLExpr::Wildcard) => Ok(lit(1_u8)),
955 _ => self.sql_fn_arg_to_logical_expr(a),
956 })
957 .collect::<Result<Vec<Expr>>>()?
958 } else {
959 function
960 .args
961 .iter()
962 .map(|a| self.sql_fn_arg_to_logical_expr(a))
963 .collect::<Result<Vec<Expr>>>()?
964 };
965
966 return Ok(Expr::AggregateFunction {
967 fun,
968 distinct: function.distinct,
969 args,
970 });
971 };
972
973 // finally, user-defined functions (UDF) and UDAF
974 match self.schema_provider.get_function_meta(&name) {
975 Some(fm) => {
976 let args = function
977 .args
978 .iter()
979 .map(|a| self.sql_fn_arg_to_logical_expr(a))
980 .collect::<Result<Vec<Expr>>>()?;
981
982 Ok(Expr::ScalarUDF { fun: fm, args })
983 }
984 None => match self.schema_provider.get_aggregate_meta(&name) {
985 Some(fm) => {
986 let args = function
987 .args
988 .iter()
989 .map(|a| self.sql_fn_arg_to_logical_expr(a))
990 .collect::<Result<Vec<Expr>>>()?;
991
992 Ok(Expr::AggregateUDF { fun: fm, args })
993 }
994 _ => Err(DataFusionError::Plan(format!(
995 "Invalid function '{}'",
996 name
997 ))),
998 },
999 }
1000 }
1001
1002 SQLExpr::Nested(e) => self.sql_expr_to_logical_expr(&e),
1003
1004 _ => Err(DataFusionError::NotImplemented(format!(
1005 "Unsupported ast node {:?} in sqltorel",
1006 sql
1007 ))),
1008 }
1009 }
1010
sql_interval_to_literal( &self, value: &str, leading_field: &Option<DateTimeField>, leading_precision: &Option<u64>, last_field: &Option<DateTimeField>, fractional_seconds_precision: &Option<u64>, ) -> Result<Expr>1011 fn sql_interval_to_literal(
1012 &self,
1013 value: &str,
1014 leading_field: &Option<DateTimeField>,
1015 leading_precision: &Option<u64>,
1016 last_field: &Option<DateTimeField>,
1017 fractional_seconds_precision: &Option<u64>,
1018 ) -> Result<Expr> {
1019 if leading_field.is_some() {
1020 return Err(DataFusionError::NotImplemented(format!(
1021 "Unsupported Interval Expression with leading_field {:?}",
1022 leading_field
1023 )));
1024 }
1025
1026 if leading_precision.is_some() {
1027 return Err(DataFusionError::NotImplemented(format!(
1028 "Unsupported Interval Expression with leading_precision {:?}",
1029 leading_precision
1030 )));
1031 }
1032
1033 if last_field.is_some() {
1034 return Err(DataFusionError::NotImplemented(format!(
1035 "Unsupported Interval Expression with last_field {:?}",
1036 last_field
1037 )));
1038 }
1039
1040 if fractional_seconds_precision.is_some() {
1041 return Err(DataFusionError::NotImplemented(format!(
1042 "Unsupported Interval Expression with fractional_seconds_precision {:?}",
1043 fractional_seconds_precision
1044 )));
1045 }
1046
1047 const SECONDS_PER_HOUR: f32 = 3_600_f32;
1048 const MILLIS_PER_SECOND: f32 = 1_000_f32;
1049
1050 // We are storing parts as integers, it's why we need to align parts fractional
1051 // INTERVAL '0.5 MONTH' = 15 days, INTERVAL '1.5 MONTH' = 1 month 15 days
1052 // INTERVAL '0.5 DAY' = 12 hours, INTERVAL '1.5 DAY' = 1 day 12 hours
1053 let align_interval_parts = |month_part: f32,
1054 mut day_part: f32,
1055 mut milles_part: f32|
1056 -> (i32, i32, f32) {
1057 // Convert fractional month to days, It's not supported by Arrow types, but anyway
1058 day_part += (month_part - (month_part as i32) as f32) * 30_f32;
1059
1060 // Convert fractional days to hours
1061 milles_part += (day_part - ((day_part as i32) as f32))
1062 * 24_f32
1063 * SECONDS_PER_HOUR
1064 * MILLIS_PER_SECOND;
1065
1066 (month_part as i32, day_part as i32, milles_part)
1067 };
1068
1069 let calculate_from_part = |interval_period_str: &str,
1070 interval_type: &str|
1071 -> Result<(i32, i32, f32)> {
1072 // @todo It's better to use Decimal in order to protect rounding errors
1073 // Wait https://github.com/apache/arrow/pull/9232
1074 let interval_period = match f32::from_str(interval_period_str) {
1075 Ok(n) => n,
1076 Err(_) => {
1077 return Err(DataFusionError::SQL(ParserError(format!(
1078 "Unsupported Interval Expression with value {:?}",
1079 value
1080 ))))
1081 }
1082 };
1083
1084 if interval_period > (i32::MAX as f32) {
1085 return Err(DataFusionError::NotImplemented(format!(
1086 "Interval field value out of range: {:?}",
1087 value
1088 )));
1089 }
1090
1091 match interval_type.to_lowercase().as_str() {
1092 "year" => Ok(align_interval_parts(interval_period * 12_f32, 0.0, 0.0)),
1093 "month" => Ok(align_interval_parts(interval_period, 0.0, 0.0)),
1094 "day" | "days" => Ok(align_interval_parts(0.0, interval_period, 0.0)),
1095 "hour" | "hours" => {
1096 Ok((0, 0, interval_period * SECONDS_PER_HOUR * MILLIS_PER_SECOND))
1097 }
1098 "minutes" | "minute" => {
1099 Ok((0, 0, interval_period * 60_f32 * MILLIS_PER_SECOND))
1100 }
1101 "seconds" | "second" => Ok((0, 0, interval_period * MILLIS_PER_SECOND)),
1102 "milliseconds" | "millisecond" => Ok((0, 0, interval_period)),
1103 _ => Err(DataFusionError::NotImplemented(format!(
1104 "Invalid input syntax for type interval: {:?}",
1105 value
1106 ))),
1107 }
1108 };
1109
1110 let mut result_month: i64 = 0;
1111 let mut result_days: i64 = 0;
1112 let mut result_millis: i64 = 0;
1113
1114 let mut parts = value.split_whitespace();
1115
1116 loop {
1117 let interval_period_str = parts.next();
1118 if interval_period_str.is_none() {
1119 break;
1120 }
1121
1122 let (diff_month, diff_days, diff_millis) = calculate_from_part(
1123 interval_period_str.unwrap(),
1124 parts.next().unwrap_or("second"),
1125 )?;
1126
1127 result_month += diff_month as i64;
1128
1129 if result_month > (i32::MAX as i64) {
1130 return Err(DataFusionError::NotImplemented(format!(
1131 "Interval field value out of range: {:?}",
1132 value
1133 )));
1134 }
1135
1136 result_days += diff_days as i64;
1137
1138 if result_days > (i32::MAX as i64) {
1139 return Err(DataFusionError::NotImplemented(format!(
1140 "Interval field value out of range: {:?}",
1141 value
1142 )));
1143 }
1144
1145 result_millis += diff_millis as i64;
1146
1147 if result_millis > (i32::MAX as i64) {
1148 return Err(DataFusionError::NotImplemented(format!(
1149 "Interval field value out of range: {:?}",
1150 value
1151 )));
1152 }
1153 }
1154
1155 // Interval is tricky thing
1156 // 1 day is not 24 hours because timezones, 1 year != 365/364! 30 days != 1 month
1157 // The true way to store and calculate intervals is to store it as it defined
1158 // Due the fact that Arrow supports only two types YearMonth (month) and DayTime (day, time)
1159 // It's not possible to store complex intervals
1160 // It's possible to do select (NOW() + INTERVAL '1 year') + INTERVAL '1 day'; as workaround
1161 if result_month != 0 && (result_days != 0 || result_millis != 0) {
1162 return Err(DataFusionError::NotImplemented(format!(
1163 "DF does not support intervals that have both a Year/Month part as well as Days/Hours/Mins/Seconds: {:?}. Hint: try breaking the interval into two parts, one with Year/Month and the other with Days/Hours/Mins/Seconds - e.g. (NOW() + INTERVAL '1 year') + INTERVAL '1 day'",
1164 value
1165 )));
1166 }
1167
1168 if result_month != 0 {
1169 return Ok(Expr::Literal(ScalarValue::IntervalYearMonth(Some(
1170 result_month as i32,
1171 ))));
1172 }
1173
1174 let result: i64 = (result_days << 32) | result_millis;
1175 Ok(Expr::Literal(ScalarValue::IntervalDayTime(Some(result))))
1176 }
1177 }
1178
1179 /// Remove join expressions from a filter expression
remove_join_expressions( expr: &Expr, join_columns: &[(&str, &str)], ) -> Result<Option<Expr>>1180 fn remove_join_expressions(
1181 expr: &Expr,
1182 join_columns: &[(&str, &str)],
1183 ) -> Result<Option<Expr>> {
1184 match expr {
1185 Expr::BinaryExpr { left, op, right } => match op {
1186 Operator::Eq => match (left.as_ref(), right.as_ref()) {
1187 (Expr::Column(l), Expr::Column(r)) => {
1188 if join_columns.contains(&(l, r)) || join_columns.contains(&(r, l)) {
1189 Ok(None)
1190 } else {
1191 Ok(Some(expr.clone()))
1192 }
1193 }
1194 _ => Ok(Some(expr.clone())),
1195 },
1196 Operator::And => {
1197 let l = remove_join_expressions(left, join_columns)?;
1198 let r = remove_join_expressions(right, join_columns)?;
1199 match (l, r) {
1200 (Some(ll), Some(rr)) => Ok(Some(and(ll, rr))),
1201 (Some(ll), _) => Ok(Some(ll)),
1202 (_, Some(rr)) => Ok(Some(rr)),
1203 _ => Ok(None),
1204 }
1205 }
1206 _ => Ok(Some(expr.clone())),
1207 },
1208 _ => Ok(Some(expr.clone())),
1209 }
1210 }
1211
1212 /// Parse equijoin ON condition which could be a single Eq or multiple conjunctive Eqs
1213 ///
1214 /// Examples
1215 ///
1216 /// foo = bar
1217 /// foo = bar AND bar = baz AND ...
1218 ///
extract_join_keys(expr: &Expr, accum: &mut Vec<(String, String)>) -> Result<()>1219 fn extract_join_keys(expr: &Expr, accum: &mut Vec<(String, String)>) -> Result<()> {
1220 match expr {
1221 Expr::BinaryExpr { left, op, right } => match op {
1222 Operator::Eq => match (left.as_ref(), right.as_ref()) {
1223 (Expr::Column(l), Expr::Column(r)) => {
1224 accum.push((l.to_owned(), r.to_owned()));
1225 Ok(())
1226 }
1227 other => Err(DataFusionError::SQL(ParserError(format!(
1228 "Unsupported expression '{:?}' in JOIN condition",
1229 other
1230 )))),
1231 },
1232 Operator::And => {
1233 extract_join_keys(left, accum)?;
1234 extract_join_keys(right, accum)
1235 }
1236 other => Err(DataFusionError::SQL(ParserError(format!(
1237 "Unsupported expression '{:?}' in JOIN condition",
1238 other
1239 )))),
1240 },
1241 other => Err(DataFusionError::SQL(ParserError(format!(
1242 "Unsupported expression '{:?}' in JOIN condition",
1243 other
1244 )))),
1245 }
1246 }
1247
1248 /// Extract join keys from a WHERE clause
extract_possible_join_keys( expr: &Expr, accum: &mut Vec<(String, String)>, ) -> Result<()>1249 fn extract_possible_join_keys(
1250 expr: &Expr,
1251 accum: &mut Vec<(String, String)>,
1252 ) -> Result<()> {
1253 match expr {
1254 Expr::BinaryExpr { left, op, right } => match op {
1255 Operator::Eq => match (left.as_ref(), right.as_ref()) {
1256 (Expr::Column(l), Expr::Column(r)) => {
1257 accum.push((l.to_owned(), r.to_owned()));
1258 Ok(())
1259 }
1260 _ => Ok(()),
1261 },
1262 Operator::And => {
1263 extract_possible_join_keys(left, accum)?;
1264 extract_possible_join_keys(right, accum)
1265 }
1266 _ => Ok(()),
1267 },
1268 _ => Ok(()),
1269 }
1270 }
1271
1272 /// Convert SQL data type to relational representation of data type
convert_data_type(sql: &SQLDataType) -> Result<DataType>1273 pub fn convert_data_type(sql: &SQLDataType) -> Result<DataType> {
1274 match sql {
1275 SQLDataType::Boolean => Ok(DataType::Boolean),
1276 SQLDataType::SmallInt => Ok(DataType::Int16),
1277 SQLDataType::Int => Ok(DataType::Int32),
1278 SQLDataType::BigInt => Ok(DataType::Int64),
1279 SQLDataType::Float(_) | SQLDataType::Real => Ok(DataType::Float64),
1280 SQLDataType::Double => Ok(DataType::Float64),
1281 SQLDataType::Char(_) | SQLDataType::Varchar(_) => Ok(DataType::Utf8),
1282 SQLDataType::Timestamp => Ok(DataType::Timestamp(TimeUnit::Nanosecond, None)),
1283 SQLDataType::Date => Ok(DataType::Date32),
1284 other => Err(DataFusionError::NotImplemented(format!(
1285 "Unsupported SQL type {:?}",
1286 other
1287 ))),
1288 }
1289 }
1290
1291 #[cfg(test)]
1292 mod tests {
1293 use super::*;
1294 use crate::datasource::empty::EmptyTable;
1295 use crate::{logical_plan::create_udf, sql::parser::DFParser};
1296 use functions::ScalarFunctionImplementation;
1297
1298 const PERSON_COLUMN_NAMES: &str =
1299 "id, first_name, last_name, age, state, salary, birth_date";
1300
1301 #[test]
select_no_relation()1302 fn select_no_relation() {
1303 quick_test(
1304 "SELECT 1",
1305 "Projection: Int64(1)\
1306 \n EmptyRelation",
1307 );
1308 }
1309
1310 #[test]
select_column_does_not_exist()1311 fn select_column_does_not_exist() {
1312 let sql = "SELECT doesnotexist FROM person";
1313 let err = logical_plan(sql).expect_err("query should have failed");
1314 assert_eq!(
1315 format!(
1316 "Plan(\"Invalid identifier \\\'doesnotexist\\\' for schema {}\")",
1317 PERSON_COLUMN_NAMES
1318 ),
1319 format!("{:?}", err)
1320 );
1321 }
1322
1323 #[test]
select_repeated_column()1324 fn select_repeated_column() {
1325 let sql = "SELECT age, age FROM person";
1326 let err = logical_plan(sql).expect_err("query should have failed");
1327 assert_eq!(
1328 "Plan(\"Projections require unique expression names but the expression \\\"#age\\\" at position 0 and \\\"#age\\\" at position 1 have the same name. Consider aliasing (\\\"AS\\\") one of them.\")",
1329 format!("{:?}", err)
1330 );
1331 }
1332
1333 #[test]
select_wildcard_with_repeated_column()1334 fn select_wildcard_with_repeated_column() {
1335 let sql = "SELECT *, age FROM person";
1336 let err = logical_plan(sql).expect_err("query should have failed");
1337 assert_eq!(
1338 "Plan(\"Projections require unique expression names but the expression \\\"#age\\\" at position 3 and \\\"#age\\\" at position 7 have the same name. Consider aliasing (\\\"AS\\\") one of them.\")",
1339 format!("{:?}", err)
1340 );
1341 }
1342
1343 #[test]
select_wildcard_with_repeated_column_but_is_aliased()1344 fn select_wildcard_with_repeated_column_but_is_aliased() {
1345 quick_test(
1346 "SELECT *, first_name AS fn from person",
1347 "Projection: #id, #first_name, #last_name, #age, #state, #salary, #birth_date, #first_name AS fn\
1348 \n TableScan: person projection=None",
1349 );
1350 }
1351
1352 #[test]
select_scalar_func_with_literal_no_relation()1353 fn select_scalar_func_with_literal_no_relation() {
1354 quick_test(
1355 "SELECT sqrt(9)",
1356 "Projection: sqrt(Int64(9))\
1357 \n EmptyRelation",
1358 );
1359 }
1360
1361 #[test]
select_simple_filter()1362 fn select_simple_filter() {
1363 let sql = "SELECT id, first_name, last_name \
1364 FROM person WHERE state = 'CO'";
1365 let expected = "Projection: #id, #first_name, #last_name\
1366 \n Filter: #state Eq Utf8(\"CO\")\
1367 \n TableScan: person projection=None";
1368 quick_test(sql, expected);
1369 }
1370
1371 #[test]
select_filter_column_does_not_exist()1372 fn select_filter_column_does_not_exist() {
1373 let sql = "SELECT first_name FROM person WHERE doesnotexist = 'A'";
1374 let err = logical_plan(sql).expect_err("query should have failed");
1375 assert_eq!(
1376 format!(
1377 "Plan(\"Invalid identifier \\\'doesnotexist\\\' for schema {}\")",
1378 PERSON_COLUMN_NAMES
1379 ),
1380 format!("{:?}", err)
1381 );
1382 }
1383
1384 #[test]
select_filter_cannot_use_alias()1385 fn select_filter_cannot_use_alias() {
1386 let sql = "SELECT first_name AS x FROM person WHERE x = 'A'";
1387 let err = logical_plan(sql).expect_err("query should have failed");
1388 assert_eq!(
1389 format!(
1390 "Plan(\"Invalid identifier \\\'x\\\' for schema {}\")",
1391 PERSON_COLUMN_NAMES
1392 ),
1393 format!("{:?}", err)
1394 );
1395 }
1396
1397 #[test]
select_neg_filter()1398 fn select_neg_filter() {
1399 let sql = "SELECT id, first_name, last_name \
1400 FROM person WHERE NOT state";
1401 let expected = "Projection: #id, #first_name, #last_name\
1402 \n Filter: NOT #state\
1403 \n TableScan: person projection=None";
1404 quick_test(sql, expected);
1405 }
1406
1407 #[test]
select_compound_filter()1408 fn select_compound_filter() {
1409 let sql = "SELECT id, first_name, last_name \
1410 FROM person WHERE state = 'CO' AND age >= 21 AND age <= 65";
1411 let expected = "Projection: #id, #first_name, #last_name\
1412 \n Filter: #state Eq Utf8(\"CO\") And #age GtEq Int64(21) And #age LtEq Int64(65)\
1413 \n TableScan: person projection=None";
1414 quick_test(sql, expected);
1415 }
1416
1417 #[test]
test_timestamp_filter()1418 fn test_timestamp_filter() {
1419 let sql =
1420 "SELECT state FROM person WHERE birth_date < CAST (158412331400600000 as timestamp)";
1421
1422 let expected = "Projection: #state\
1423 \n Filter: #birth_date Lt CAST(Int64(158412331400600000) AS Timestamp(Nanosecond, None))\
1424 \n TableScan: person projection=None";
1425
1426 quick_test(sql, expected);
1427 }
1428
1429 #[test]
test_date_filter()1430 fn test_date_filter() {
1431 let sql =
1432 "SELECT state FROM person WHERE birth_date < CAST ('2020-01-01' as date)";
1433
1434 let expected = "Projection: #state\
1435 \n Filter: #birth_date Lt CAST(Utf8(\"2020-01-01\") AS Date32)\
1436 \n TableScan: person projection=None";
1437
1438 quick_test(sql, expected);
1439 }
1440
1441 #[test]
select_all_boolean_operators()1442 fn select_all_boolean_operators() {
1443 let sql = "SELECT age, first_name, last_name \
1444 FROM person \
1445 WHERE age = 21 \
1446 AND age != 21 \
1447 AND age > 21 \
1448 AND age >= 21 \
1449 AND age < 65 \
1450 AND age <= 65";
1451 let expected = "Projection: #age, #first_name, #last_name\
1452 \n Filter: #age Eq Int64(21) \
1453 And #age NotEq Int64(21) \
1454 And #age Gt Int64(21) \
1455 And #age GtEq Int64(21) \
1456 And #age Lt Int64(65) \
1457 And #age LtEq Int64(65)\
1458 \n TableScan: person projection=None";
1459 quick_test(sql, expected);
1460 }
1461
1462 #[test]
select_between()1463 fn select_between() {
1464 let sql = "SELECT state FROM person WHERE age BETWEEN 21 AND 65";
1465 let expected = "Projection: #state\
1466 \n Filter: #age BETWEEN Int64(21) AND Int64(65)\
1467 \n TableScan: person projection=None";
1468
1469 quick_test(sql, expected);
1470 }
1471
1472 #[test]
select_between_negated()1473 fn select_between_negated() {
1474 let sql = "SELECT state FROM person WHERE age NOT BETWEEN 21 AND 65";
1475 let expected = "Projection: #state\
1476 \n Filter: #age NOT BETWEEN Int64(21) AND Int64(65)\
1477 \n TableScan: person projection=None";
1478
1479 quick_test(sql, expected);
1480 }
1481
1482 #[test]
select_nested()1483 fn select_nested() {
1484 let sql = "SELECT fn2, last_name
1485 FROM (
1486 SELECT fn1 as fn2, last_name, birth_date
1487 FROM (
1488 SELECT first_name AS fn1, last_name, birth_date, age
1489 FROM person
1490 )
1491 )";
1492 let expected = "Projection: #fn2, #last_name\
1493 \n Projection: #fn1 AS fn2, #last_name, #birth_date\
1494 \n Projection: #first_name AS fn1, #last_name, #birth_date, #age\
1495 \n TableScan: person projection=None";
1496 quick_test(sql, expected);
1497 }
1498
1499 #[test]
select_nested_with_filters()1500 fn select_nested_with_filters() {
1501 let sql = "SELECT fn1, age
1502 FROM (
1503 SELECT first_name AS fn1, age
1504 FROM person
1505 WHERE age > 20
1506 )
1507 WHERE fn1 = 'X' AND age < 30";
1508
1509 let expected = "Filter: #fn1 Eq Utf8(\"X\") And #age Lt Int64(30)\
1510 \n Projection: #first_name AS fn1, #age\
1511 \n Filter: #age Gt Int64(20)\
1512 \n TableScan: person projection=None";
1513
1514 quick_test(sql, expected);
1515 }
1516
1517 #[test]
select_with_having()1518 fn select_with_having() {
1519 let sql = "SELECT id, age
1520 FROM person
1521 HAVING age > 100 AND age < 200";
1522 let expected = "Projection: #id, #age\
1523 \n Filter: #age Gt Int64(100) And #age Lt Int64(200)\
1524 \n TableScan: person projection=None";
1525 quick_test(sql, expected);
1526 }
1527
1528 #[test]
select_with_having_referencing_column_not_in_select()1529 fn select_with_having_referencing_column_not_in_select() {
1530 let sql = "SELECT id, age
1531 FROM person
1532 HAVING first_name = 'M'";
1533 let err = logical_plan(sql).expect_err("query should have failed");
1534 assert_eq!(
1535 "Plan(\"Having references column(s) not provided by the select\")",
1536 format!("{:?}", err)
1537 );
1538 }
1539
1540 #[test]
select_with_having_referencing_column_nested_in_select_expression()1541 fn select_with_having_referencing_column_nested_in_select_expression() {
1542 let sql = "SELECT id, age + 1
1543 FROM person
1544 HAVING age > 100";
1545 let err = logical_plan(sql).expect_err("query should have failed");
1546 assert_eq!(
1547 "Plan(\"Having references column(s) not provided by the select\")",
1548 format!("{:?}", err)
1549 );
1550 }
1551
1552 #[test]
select_with_having_with_aggregate_not_in_select()1553 fn select_with_having_with_aggregate_not_in_select() {
1554 let sql = "SELECT first_name
1555 FROM person
1556 HAVING MAX(age) > 100";
1557 let err = logical_plan(sql).expect_err("query should have failed");
1558 assert_eq!(
1559 "Plan(\"Projection references non-aggregate values\")",
1560 format!("{:?}", err)
1561 );
1562 }
1563
1564 #[test]
select_aggregate_with_having_that_reuses_aggregate()1565 fn select_aggregate_with_having_that_reuses_aggregate() {
1566 let sql = "SELECT MAX(age)
1567 FROM person
1568 HAVING MAX(age) < 30";
1569 let expected = "Filter: #MAX(age) Lt Int64(30)\
1570 \n Aggregate: groupBy=[[]], aggr=[[MAX(#age)]]\
1571 \n TableScan: person projection=None";
1572 quick_test(sql, expected);
1573 }
1574
1575 #[test]
select_aggregate_with_having_with_aggregate_not_in_select()1576 fn select_aggregate_with_having_with_aggregate_not_in_select() {
1577 let sql = "SELECT MAX(age)
1578 FROM person
1579 HAVING MAX(first_name) > 'M'";
1580 let expected = "Projection: #MAX(age)\
1581 \n Filter: #MAX(first_name) Gt Utf8(\"M\")\
1582 \n Aggregate: groupBy=[[]], aggr=[[MAX(#age), MAX(#first_name)]]\
1583 \n TableScan: person projection=None";
1584 quick_test(sql, expected);
1585 }
1586
1587 #[test]
select_aggregate_with_having_referencing_column_not_in_select()1588 fn select_aggregate_with_having_referencing_column_not_in_select() {
1589 let sql = "SELECT COUNT(*)
1590 FROM person
1591 HAVING first_name = 'M'";
1592 let err = logical_plan(sql).expect_err("query should have failed");
1593 assert_eq!(
1594 "Plan(\"Having references non-aggregate values\")",
1595 format!("{:?}", err)
1596 );
1597 }
1598
1599 #[test]
select_aggregate_aliased_with_having_referencing_aggregate_by_its_alias()1600 fn select_aggregate_aliased_with_having_referencing_aggregate_by_its_alias() {
1601 let sql = "SELECT MAX(age) as max_age
1602 FROM person
1603 HAVING max_age < 30";
1604 let expected = "Projection: #MAX(age) AS max_age\
1605 \n Filter: #MAX(age) Lt Int64(30)\
1606 \n Aggregate: groupBy=[[]], aggr=[[MAX(#age)]]\
1607 \n TableScan: person projection=None";
1608 quick_test(sql, expected);
1609 }
1610
1611 #[test]
select_aggregate_aliased_with_having_that_reuses_aggregate_but_not_by_its_alias()1612 fn select_aggregate_aliased_with_having_that_reuses_aggregate_but_not_by_its_alias() {
1613 let sql = "SELECT MAX(age) as max_age
1614 FROM person
1615 HAVING MAX(age) < 30";
1616 let expected = "Projection: #MAX(age) AS max_age\
1617 \n Filter: #MAX(age) Lt Int64(30)\
1618 \n Aggregate: groupBy=[[]], aggr=[[MAX(#age)]]\
1619 \n TableScan: person projection=None";
1620 quick_test(sql, expected);
1621 }
1622
1623 #[test]
select_aggregate_with_group_by_with_having()1624 fn select_aggregate_with_group_by_with_having() {
1625 let sql = "SELECT first_name, MAX(age)
1626 FROM person
1627 GROUP BY first_name
1628 HAVING first_name = 'M'";
1629 let expected = "Filter: #first_name Eq Utf8(\"M\")\
1630 \n Aggregate: groupBy=[[#first_name]], aggr=[[MAX(#age)]]\
1631 \n TableScan: person projection=None";
1632 quick_test(sql, expected);
1633 }
1634
1635 #[test]
select_aggregate_with_group_by_with_having_and_where()1636 fn select_aggregate_with_group_by_with_having_and_where() {
1637 let sql = "SELECT first_name, MAX(age)
1638 FROM person
1639 WHERE id > 5
1640 GROUP BY first_name
1641 HAVING MAX(age) < 100";
1642 let expected = "Filter: #MAX(age) Lt Int64(100)\
1643 \n Aggregate: groupBy=[[#first_name]], aggr=[[MAX(#age)]]\
1644 \n Filter: #id Gt Int64(5)\
1645 \n TableScan: person projection=None";
1646 quick_test(sql, expected);
1647 }
1648
1649 #[test]
select_aggregate_with_group_by_with_having_and_where_filtering_on_aggregate_column( )1650 fn select_aggregate_with_group_by_with_having_and_where_filtering_on_aggregate_column(
1651 ) {
1652 let sql = "SELECT first_name, MAX(age)
1653 FROM person
1654 WHERE id > 5 AND age > 18
1655 GROUP BY first_name
1656 HAVING MAX(age) < 100";
1657 let expected = "Filter: #MAX(age) Lt Int64(100)\
1658 \n Aggregate: groupBy=[[#first_name]], aggr=[[MAX(#age)]]\
1659 \n Filter: #id Gt Int64(5) And #age Gt Int64(18)\
1660 \n TableScan: person projection=None";
1661 quick_test(sql, expected);
1662 }
1663
1664 #[test]
select_aggregate_with_group_by_with_having_using_column_by_alias()1665 fn select_aggregate_with_group_by_with_having_using_column_by_alias() {
1666 let sql = "SELECT first_name AS fn, MAX(age)
1667 FROM person
1668 GROUP BY first_name
1669 HAVING MAX(age) > 2 AND fn = 'M'";
1670 let expected = "Projection: #first_name AS fn, #MAX(age)\
1671 \n Filter: #MAX(age) Gt Int64(2) And #first_name Eq Utf8(\"M\")\
1672 \n Aggregate: groupBy=[[#first_name]], aggr=[[MAX(#age)]]\
1673 \n TableScan: person projection=None";
1674 quick_test(sql, expected);
1675 }
1676
1677 #[test]
select_aggregate_with_group_by_with_having_using_columns_with_and_without_their_aliases( )1678 fn select_aggregate_with_group_by_with_having_using_columns_with_and_without_their_aliases(
1679 ) {
1680 let sql = "SELECT first_name AS fn, MAX(age) AS max_age
1681 FROM person
1682 GROUP BY first_name
1683 HAVING MAX(age) > 2 AND max_age < 5 AND first_name = 'M' AND fn = 'N'";
1684 let expected = "Projection: #first_name AS fn, #MAX(age) AS max_age\
1685 \n Filter: #MAX(age) Gt Int64(2) And #MAX(age) Lt Int64(5) And #first_name Eq Utf8(\"M\") And #first_name Eq Utf8(\"N\")\
1686 \n Aggregate: groupBy=[[#first_name]], aggr=[[MAX(#age)]]\
1687 \n TableScan: person projection=None";
1688 quick_test(sql, expected);
1689 }
1690
1691 #[test]
select_aggregate_with_group_by_with_having_that_reuses_aggregate()1692 fn select_aggregate_with_group_by_with_having_that_reuses_aggregate() {
1693 let sql = "SELECT first_name, MAX(age)
1694 FROM person
1695 GROUP BY first_name
1696 HAVING MAX(age) > 100";
1697 let expected = "Filter: #MAX(age) Gt Int64(100)\
1698 \n Aggregate: groupBy=[[#first_name]], aggr=[[MAX(#age)]]\
1699 \n TableScan: person projection=None";
1700 quick_test(sql, expected);
1701 }
1702
1703 #[test]
select_aggregate_with_group_by_with_having_referencing_column_not_in_group_by()1704 fn select_aggregate_with_group_by_with_having_referencing_column_not_in_group_by() {
1705 let sql = "SELECT first_name, MAX(age)
1706 FROM person
1707 GROUP BY first_name
1708 HAVING MAX(age) > 10 AND last_name = 'M'";
1709 let err = logical_plan(sql).expect_err("query should have failed");
1710 assert_eq!(
1711 "Plan(\"Having references non-aggregate values\")",
1712 format!("{:?}", err)
1713 );
1714 }
1715
1716 #[test]
select_aggregate_with_group_by_with_having_that_reuses_aggregate_multiple_times()1717 fn select_aggregate_with_group_by_with_having_that_reuses_aggregate_multiple_times() {
1718 let sql = "SELECT first_name, MAX(age)
1719 FROM person
1720 GROUP BY first_name
1721 HAVING MAX(age) > 100 AND MAX(age) < 200";
1722 let expected = "Filter: #MAX(age) Gt Int64(100) And #MAX(age) Lt Int64(200)\
1723 \n Aggregate: groupBy=[[#first_name]], aggr=[[MAX(#age)]]\
1724 \n TableScan: person projection=None";
1725 quick_test(sql, expected);
1726 }
1727
1728 #[test]
select_aggregate_with_group_by_with_having_using_aggreagate_not_in_select()1729 fn select_aggregate_with_group_by_with_having_using_aggreagate_not_in_select() {
1730 let sql = "SELECT first_name, MAX(age)
1731 FROM person
1732 GROUP BY first_name
1733 HAVING MAX(age) > 100 AND MIN(id) < 50";
1734 let expected = "Projection: #first_name, #MAX(age)\
1735 \n Filter: #MAX(age) Gt Int64(100) And #MIN(id) Lt Int64(50)\
1736 \n Aggregate: groupBy=[[#first_name]], aggr=[[MAX(#age), MIN(#id)]]\
1737 \n TableScan: person projection=None";
1738 quick_test(sql, expected);
1739 }
1740
1741 #[test]
select_aggregate_aliased_with_group_by_with_having_referencing_aggregate_by_its_alias( )1742 fn select_aggregate_aliased_with_group_by_with_having_referencing_aggregate_by_its_alias(
1743 ) {
1744 let sql = "SELECT first_name, MAX(age) AS max_age
1745 FROM person
1746 GROUP BY first_name
1747 HAVING max_age > 100";
1748 let expected = "Projection: #first_name, #MAX(age) AS max_age\
1749 \n Filter: #MAX(age) Gt Int64(100)\
1750 \n Aggregate: groupBy=[[#first_name]], aggr=[[MAX(#age)]]\
1751 \n TableScan: person projection=None";
1752 quick_test(sql, expected);
1753 }
1754
1755 #[test]
select_aggregate_compound_aliased_with_group_by_with_having_referencing_compound_aggregate_by_its_alias( )1756 fn select_aggregate_compound_aliased_with_group_by_with_having_referencing_compound_aggregate_by_its_alias(
1757 ) {
1758 let sql = "SELECT first_name, MAX(age) + 1 AS max_age_plus_one
1759 FROM person
1760 GROUP BY first_name
1761 HAVING max_age_plus_one > 100";
1762 let expected =
1763 "Projection: #first_name, #MAX(age) Plus Int64(1) AS max_age_plus_one\
1764 \n Filter: #MAX(age) Plus Int64(1) Gt Int64(100)\
1765 \n Aggregate: groupBy=[[#first_name]], aggr=[[MAX(#age)]]\
1766 \n TableScan: person projection=None";
1767 quick_test(sql, expected);
1768 }
1769
1770 #[test]
select_aggregate_with_group_by_with_having_using_derived_column_aggreagate_not_in_select( )1771 fn select_aggregate_with_group_by_with_having_using_derived_column_aggreagate_not_in_select(
1772 ) {
1773 let sql = "SELECT first_name, MAX(age)
1774 FROM person
1775 GROUP BY first_name
1776 HAVING MAX(age) > 100 AND MIN(id - 2) < 50";
1777 let expected = "Projection: #first_name, #MAX(age)\
1778 \n Filter: #MAX(age) Gt Int64(100) And #MIN(id Minus Int64(2)) Lt Int64(50)\
1779 \n Aggregate: groupBy=[[#first_name]], aggr=[[MAX(#age), MIN(#id Minus Int64(2))]]\
1780 \n TableScan: person projection=None";
1781 quick_test(sql, expected);
1782 }
1783
1784 #[test]
select_aggregate_with_group_by_with_having_using_count_star_not_in_select()1785 fn select_aggregate_with_group_by_with_having_using_count_star_not_in_select() {
1786 let sql = "SELECT first_name, MAX(age)
1787 FROM person
1788 GROUP BY first_name
1789 HAVING MAX(age) > 100 AND COUNT(*) < 50";
1790 let expected = "Projection: #first_name, #MAX(age)\
1791 \n Filter: #MAX(age) Gt Int64(100) And #COUNT(UInt8(1)) Lt Int64(50)\
1792 \n Aggregate: groupBy=[[#first_name]], aggr=[[MAX(#age), COUNT(UInt8(1))]]\
1793 \n TableScan: person projection=None";
1794 quick_test(sql, expected);
1795 }
1796
1797 #[test]
select_binary_expr()1798 fn select_binary_expr() {
1799 let sql = "SELECT age + salary from person";
1800 let expected = "Projection: #age Plus #salary\
1801 \n TableScan: person projection=None";
1802 quick_test(sql, expected);
1803 }
1804
1805 #[test]
select_binary_expr_nested()1806 fn select_binary_expr_nested() {
1807 let sql = "SELECT (age + salary)/2 from person";
1808 let expected = "Projection: #age Plus #salary Divide Int64(2)\
1809 \n TableScan: person projection=None";
1810 quick_test(sql, expected);
1811 }
1812
1813 #[test]
select_wildcard_with_groupby()1814 fn select_wildcard_with_groupby() {
1815 quick_test(
1816 "SELECT * FROM person GROUP BY id, first_name, last_name, age, state, salary, birth_date",
1817 "Aggregate: groupBy=[[#id, #first_name, #last_name, #age, #state, #salary, #birth_date]], aggr=[[]]\
1818 \n TableScan: person projection=None",
1819 );
1820 quick_test(
1821 "SELECT * FROM (SELECT first_name, last_name FROM person) GROUP BY first_name, last_name",
1822 "Aggregate: groupBy=[[#first_name, #last_name]], aggr=[[]]\
1823 \n Projection: #first_name, #last_name\
1824 \n TableScan: person projection=None",
1825 );
1826 }
1827
1828 #[test]
select_simple_aggregate()1829 fn select_simple_aggregate() {
1830 quick_test(
1831 "SELECT MIN(age) FROM person",
1832 "Aggregate: groupBy=[[]], aggr=[[MIN(#age)]]\
1833 \n TableScan: person projection=None",
1834 );
1835 }
1836
1837 #[test]
test_sum_aggregate()1838 fn test_sum_aggregate() {
1839 quick_test(
1840 "SELECT SUM(age) from person",
1841 "Aggregate: groupBy=[[]], aggr=[[SUM(#age)]]\
1842 \n TableScan: person projection=None",
1843 );
1844 }
1845
1846 #[test]
select_simple_aggregate_column_does_not_exist()1847 fn select_simple_aggregate_column_does_not_exist() {
1848 let sql = "SELECT MIN(doesnotexist) FROM person";
1849 let err = logical_plan(sql).expect_err("query should have failed");
1850 assert_eq!(
1851 format!(
1852 "Plan(\"Invalid identifier \\\'doesnotexist\\\' for schema {}\")",
1853 PERSON_COLUMN_NAMES
1854 ),
1855 format!("{:?}", err)
1856 );
1857 }
1858
1859 #[test]
select_simple_aggregate_repeated_aggregate()1860 fn select_simple_aggregate_repeated_aggregate() {
1861 let sql = "SELECT MIN(age), MIN(age) FROM person";
1862 let err = logical_plan(sql).expect_err("query should have failed");
1863 assert_eq!(
1864 "Plan(\"Projections require unique expression names but the expression \\\"#MIN(age)\\\" at position 0 and \\\"#MIN(age)\\\" at position 1 have the same name. Consider aliasing (\\\"AS\\\") one of them.\")",
1865 format!("{:?}", err)
1866 );
1867 }
1868
1869 #[test]
select_simple_aggregate_repeated_aggregate_with_single_alias()1870 fn select_simple_aggregate_repeated_aggregate_with_single_alias() {
1871 quick_test(
1872 "SELECT MIN(age), MIN(age) AS a FROM person",
1873 "Projection: #MIN(age), #MIN(age) AS a\
1874 \n Aggregate: groupBy=[[]], aggr=[[MIN(#age)]]\
1875 \n TableScan: person projection=None",
1876 );
1877 }
1878
1879 #[test]
select_simple_aggregate_repeated_aggregate_with_unique_aliases()1880 fn select_simple_aggregate_repeated_aggregate_with_unique_aliases() {
1881 quick_test(
1882 "SELECT MIN(age) AS a, MIN(age) AS b FROM person",
1883 "Projection: #MIN(age) AS a, #MIN(age) AS b\
1884 \n Aggregate: groupBy=[[]], aggr=[[MIN(#age)]]\
1885 \n TableScan: person projection=None",
1886 );
1887 }
1888
1889 #[test]
select_simple_aggregate_repeated_aggregate_with_repeated_aliases()1890 fn select_simple_aggregate_repeated_aggregate_with_repeated_aliases() {
1891 let sql = "SELECT MIN(age) AS a, MIN(age) AS a FROM person";
1892 let err = logical_plan(sql).expect_err("query should have failed");
1893 assert_eq!(
1894 "Plan(\"Projections require unique expression names but the expression \\\"#MIN(age) AS a\\\" at position 0 and \\\"#MIN(age) AS a\\\" at position 1 have the same name. Consider aliasing (\\\"AS\\\") one of them.\")",
1895 format!("{:?}", err)
1896 );
1897 }
1898
1899 #[test]
select_simple_aggregate_with_groupby()1900 fn select_simple_aggregate_with_groupby() {
1901 quick_test(
1902 "SELECT state, MIN(age), MAX(age) FROM person GROUP BY state",
1903 "Aggregate: groupBy=[[#state]], aggr=[[MIN(#age), MAX(#age)]]\
1904 \n TableScan: person projection=None",
1905 );
1906 }
1907
1908 #[test]
select_simple_aggregate_with_groupby_with_aliases()1909 fn select_simple_aggregate_with_groupby_with_aliases() {
1910 quick_test(
1911 "SELECT state AS a, MIN(age) AS b FROM person GROUP BY state",
1912 "Projection: #state AS a, #MIN(age) AS b\
1913 \n Aggregate: groupBy=[[#state]], aggr=[[MIN(#age)]]\
1914 \n TableScan: person projection=None",
1915 );
1916 }
1917
1918 #[test]
select_simple_aggregate_with_groupby_with_aliases_repeated()1919 fn select_simple_aggregate_with_groupby_with_aliases_repeated() {
1920 let sql = "SELECT state AS a, MIN(age) AS a FROM person GROUP BY state";
1921 let err = logical_plan(sql).expect_err("query should have failed");
1922 assert_eq!(
1923 "Plan(\"Projections require unique expression names but the expression \\\"#state AS a\\\" at position 0 and \\\"#MIN(age) AS a\\\" at position 1 have the same name. Consider aliasing (\\\"AS\\\") one of them.\")",
1924 format!("{:?}", err)
1925 );
1926 }
1927
1928 #[test]
select_simple_aggregate_with_groupby_column_unselected()1929 fn select_simple_aggregate_with_groupby_column_unselected() {
1930 quick_test(
1931 "SELECT MIN(age), MAX(age) FROM person GROUP BY state",
1932 "Projection: #MIN(age), #MAX(age)\
1933 \n Aggregate: groupBy=[[#state]], aggr=[[MIN(#age), MAX(#age)]]\
1934 \n TableScan: person projection=None",
1935 );
1936 }
1937
1938 #[test]
select_simple_aggregate_with_groupby_and_column_in_group_by_does_not_exist()1939 fn select_simple_aggregate_with_groupby_and_column_in_group_by_does_not_exist() {
1940 let sql = "SELECT SUM(age) FROM person GROUP BY doesnotexist";
1941 let err = logical_plan(sql).expect_err("query should have failed");
1942 assert_eq!(
1943 format!(
1944 "Plan(\"Invalid identifier \\\'doesnotexist\\\' for schema {}\")",
1945 PERSON_COLUMN_NAMES
1946 ),
1947 format!("{:?}", err)
1948 );
1949 }
1950
1951 #[test]
select_simple_aggregate_with_groupby_and_column_in_aggregate_does_not_exist()1952 fn select_simple_aggregate_with_groupby_and_column_in_aggregate_does_not_exist() {
1953 let sql = "SELECT SUM(doesnotexist) FROM person GROUP BY first_name";
1954 let err = logical_plan(sql).expect_err("query should have failed");
1955 assert_eq!(
1956 format!(
1957 "Plan(\"Invalid identifier \\\'doesnotexist\\\' for schema {}\")",
1958 PERSON_COLUMN_NAMES
1959 ),
1960 format!("{:?}", err)
1961 );
1962 }
1963
1964 #[test]
select_interval_out_of_range()1965 fn select_interval_out_of_range() {
1966 let sql = "SELECT INTERVAL '100000000000000000 day'";
1967 let err = logical_plan(sql).expect_err("query should have failed");
1968 assert_eq!(
1969 "NotImplemented(\"Interval field value out of range: \\\"100000000000000000 day\\\"\")",
1970 format!("{:?}", err)
1971 );
1972 }
1973
1974 #[test]
select_unsupported_complex_interval()1975 fn select_unsupported_complex_interval() {
1976 let sql = "SELECT INTERVAL '1 year 1 day'";
1977 let err = logical_plan(sql).expect_err("query should have failed");
1978 assert_eq!(
1979 "NotImplemented(\"DF does not support intervals that have both a Year/Month part as well as Days/Hours/Mins/Seconds: \\\"1 year 1 day\\\". Hint: try breaking the interval into two parts, one with Year/Month and the other with Days/Hours/Mins/Seconds - e.g. (NOW() + INTERVAL \\\'1 year\\\') + INTERVAL \\\'1 day\\\'\")",
1980 format!("{:?}", err)
1981 );
1982 }
1983
1984 #[test]
select_simple_aggregate_with_groupby_and_column_is_in_aggregate_and_groupby()1985 fn select_simple_aggregate_with_groupby_and_column_is_in_aggregate_and_groupby() {
1986 quick_test(
1987 "SELECT MAX(first_name) FROM person GROUP BY first_name",
1988 "Projection: #MAX(first_name)\
1989 \n Aggregate: groupBy=[[#first_name]], aggr=[[MAX(#first_name)]]\
1990 \n TableScan: person projection=None",
1991 );
1992 }
1993
1994 #[test]
select_simple_aggregate_with_groupby_cannot_use_alias()1995 fn select_simple_aggregate_with_groupby_cannot_use_alias() {
1996 let sql = "SELECT state AS x, MAX(age) FROM person GROUP BY x";
1997 let err = logical_plan(sql).expect_err("query should have failed");
1998 assert_eq!(
1999 format!(
2000 "Plan(\"Invalid identifier \\\'x\\\' for schema {}\")",
2001 PERSON_COLUMN_NAMES
2002 ),
2003 format!("{:?}", err)
2004 );
2005 }
2006
2007 #[test]
select_simple_aggregate_with_groupby_aggregate_repeated()2008 fn select_simple_aggregate_with_groupby_aggregate_repeated() {
2009 let sql = "SELECT state, MIN(age), MIN(age) FROM person GROUP BY state";
2010 let err = logical_plan(sql).expect_err("query should have failed");
2011 assert_eq!(
2012 "Plan(\"Projections require unique expression names but the expression \\\"#MIN(age)\\\" at position 1 and \\\"#MIN(age)\\\" at position 2 have the same name. Consider aliasing (\\\"AS\\\") one of them.\")",
2013 format!("{:?}", err)
2014 );
2015 }
2016
2017 #[test]
select_simple_aggregate_with_groupby_aggregate_repeated_and_one_has_alias()2018 fn select_simple_aggregate_with_groupby_aggregate_repeated_and_one_has_alias() {
2019 quick_test(
2020 "SELECT state, MIN(age), MIN(age) AS ma FROM person GROUP BY state",
2021 "Projection: #state, #MIN(age), #MIN(age) AS ma\
2022 \n Aggregate: groupBy=[[#state]], aggr=[[MIN(#age)]]\
2023 \n TableScan: person projection=None",
2024 )
2025 }
2026 #[test]
select_simple_aggregate_with_groupby_non_column_expression_unselected()2027 fn select_simple_aggregate_with_groupby_non_column_expression_unselected() {
2028 quick_test(
2029 "SELECT MIN(first_name) FROM person GROUP BY age + 1",
2030 "Projection: #MIN(first_name)\
2031 \n Aggregate: groupBy=[[#age Plus Int64(1)]], aggr=[[MIN(#first_name)]]\
2032 \n TableScan: person projection=None",
2033 );
2034 }
2035
2036 #[test]
select_simple_aggregate_with_groupby_non_column_expression_selected_and_resolvable( )2037 fn select_simple_aggregate_with_groupby_non_column_expression_selected_and_resolvable(
2038 ) {
2039 quick_test(
2040 "SELECT age + 1, MIN(first_name) FROM person GROUP BY age + 1",
2041 "Aggregate: groupBy=[[#age Plus Int64(1)]], aggr=[[MIN(#first_name)]]\
2042 \n TableScan: person projection=None",
2043 );
2044 quick_test(
2045 "SELECT MIN(first_name), age + 1 FROM person GROUP BY age + 1",
2046 "Projection: #MIN(first_name), #age Plus Int64(1)\
2047 \n Aggregate: groupBy=[[#age Plus Int64(1)]], aggr=[[MIN(#first_name)]]\
2048 \n TableScan: person projection=None",
2049 );
2050 }
2051
2052 #[test]
select_simple_aggregate_with_groupby_non_column_expression_nested_and_resolvable()2053 fn select_simple_aggregate_with_groupby_non_column_expression_nested_and_resolvable()
2054 {
2055 quick_test(
2056 "SELECT ((age + 1) / 2) * (age + 1), MIN(first_name) FROM person GROUP BY age + 1",
2057 "Projection: #age Plus Int64(1) Divide Int64(2) Multiply #age Plus Int64(1), #MIN(first_name)\
2058 \n Aggregate: groupBy=[[#age Plus Int64(1)]], aggr=[[MIN(#first_name)]]\
2059 \n TableScan: person projection=None",
2060 );
2061 }
2062
2063 #[test]
select_simple_aggregate_with_groupby_non_column_expression_nested_and_not_resolvable( )2064 fn select_simple_aggregate_with_groupby_non_column_expression_nested_and_not_resolvable(
2065 ) {
2066 // The query should fail, because age + 9 is not in the group by.
2067 let sql =
2068 "SELECT ((age + 1) / 2) * (age + 9), MIN(first_name) FROM person GROUP BY age + 1";
2069 let err = logical_plan(sql).expect_err("query should have failed");
2070 assert_eq!(
2071 "Plan(\"Projection references non-aggregate values\")",
2072 format!("{:?}", err)
2073 );
2074 }
2075
2076 #[test]
select_simple_aggregate_with_groupby_non_column_expression_and_its_column_selected( )2077 fn select_simple_aggregate_with_groupby_non_column_expression_and_its_column_selected(
2078 ) {
2079 let sql = "SELECT age, MIN(first_name) FROM person GROUP BY age + 1";
2080 let err = logical_plan(sql).expect_err("query should have failed");
2081 assert_eq!(
2082 "Plan(\"Projection references non-aggregate values\")",
2083 format!("{:?}", err)
2084 );
2085 }
2086
2087 #[test]
select_simple_aggregate_nested_in_binary_expr_with_groupby()2088 fn select_simple_aggregate_nested_in_binary_expr_with_groupby() {
2089 quick_test(
2090 "SELECT state, MIN(age) < 10 FROM person GROUP BY state",
2091 "Projection: #state, #MIN(age) Lt Int64(10)\
2092 \n Aggregate: groupBy=[[#state]], aggr=[[MIN(#age)]]\
2093 \n TableScan: person projection=None",
2094 );
2095 }
2096
2097 #[test]
select_simple_aggregate_and_nested_groupby_column()2098 fn select_simple_aggregate_and_nested_groupby_column() {
2099 quick_test(
2100 "SELECT age + 1, MAX(first_name) FROM person GROUP BY age",
2101 "Projection: #age Plus Int64(1), #MAX(first_name)\
2102 \n Aggregate: groupBy=[[#age]], aggr=[[MAX(#first_name)]]\
2103 \n TableScan: person projection=None",
2104 );
2105 }
2106
2107 #[test]
select_aggregate_compounded_with_groupby_column()2108 fn select_aggregate_compounded_with_groupby_column() {
2109 quick_test(
2110 "SELECT age + MIN(salary) FROM person GROUP BY age",
2111 "Projection: #age Plus #MIN(salary)\
2112 \n Aggregate: groupBy=[[#age]], aggr=[[MIN(#salary)]]\
2113 \n TableScan: person projection=None",
2114 );
2115 }
2116
2117 #[test]
select_aggregate_with_non_column_inner_expression_with_groupby()2118 fn select_aggregate_with_non_column_inner_expression_with_groupby() {
2119 quick_test(
2120 "SELECT state, MIN(age + 1) FROM person GROUP BY state",
2121 "Aggregate: groupBy=[[#state]], aggr=[[MIN(#age Plus Int64(1))]]\
2122 \n TableScan: person projection=None",
2123 );
2124 }
2125
2126 #[test]
test_wildcard()2127 fn test_wildcard() {
2128 quick_test(
2129 "SELECT * from person",
2130 "Projection: #id, #first_name, #last_name, #age, #state, #salary, #birth_date\
2131 \n TableScan: person projection=None",
2132 );
2133 }
2134
2135 #[test]
select_count_one()2136 fn select_count_one() {
2137 let sql = "SELECT COUNT(1) FROM person";
2138 let expected = "Aggregate: groupBy=[[]], aggr=[[COUNT(UInt8(1))]]\
2139 \n TableScan: person projection=None";
2140 quick_test(sql, expected);
2141 }
2142
2143 #[test]
select_count_column()2144 fn select_count_column() {
2145 let sql = "SELECT COUNT(id) FROM person";
2146 let expected = "Aggregate: groupBy=[[]], aggr=[[COUNT(#id)]]\
2147 \n TableScan: person projection=None";
2148 quick_test(sql, expected);
2149 }
2150
2151 #[test]
select_scalar_func()2152 fn select_scalar_func() {
2153 let sql = "SELECT sqrt(age) FROM person";
2154 let expected = "Projection: sqrt(#age)\
2155 \n TableScan: person projection=None";
2156 quick_test(sql, expected);
2157 }
2158
2159 #[test]
select_aliased_scalar_func()2160 fn select_aliased_scalar_func() {
2161 let sql = "SELECT sqrt(age) AS square_people FROM person";
2162 let expected = "Projection: sqrt(#age) AS square_people\
2163 \n TableScan: person projection=None";
2164 quick_test(sql, expected);
2165 }
2166
2167 #[test]
select_where_nullif_division()2168 fn select_where_nullif_division() {
2169 let sql = "SELECT c3/(c4+c5) \
2170 FROM aggregate_test_100 WHERE c3/nullif(c4+c5, 0) > 0.1";
2171 let expected = "Projection: #c3 Divide #c4 Plus #c5\
2172 \n Filter: #c3 Divide nullif(#c4 Plus #c5, Int64(0)) Gt Float64(0.1)\
2173 \n TableScan: aggregate_test_100 projection=None";
2174 quick_test(sql, expected);
2175 }
2176
2177 #[test]
select_where_with_negative_operator()2178 fn select_where_with_negative_operator() {
2179 let sql = "SELECT c3 FROM aggregate_test_100 WHERE c3 > -0.1 AND -c4 > 0";
2180 let expected = "Projection: #c3\
2181 \n Filter: #c3 Gt Float64(-0.1) And (- #c4) Gt Int64(0)\
2182 \n TableScan: aggregate_test_100 projection=None";
2183 quick_test(sql, expected);
2184 }
2185
2186 #[test]
select_where_with_positive_operator()2187 fn select_where_with_positive_operator() {
2188 let sql = "SELECT c3 FROM aggregate_test_100 WHERE c3 > +0.1 AND +c4 > 0";
2189 let expected = "Projection: #c3\
2190 \n Filter: #c3 Gt Float64(0.1) And #c4 Gt Int64(0)\
2191 \n TableScan: aggregate_test_100 projection=None";
2192 quick_test(sql, expected);
2193 }
2194
2195 #[test]
select_order_by()2196 fn select_order_by() {
2197 let sql = "SELECT id FROM person ORDER BY id";
2198 let expected = "Sort: #id ASC NULLS FIRST\
2199 \n Projection: #id\
2200 \n TableScan: person projection=None";
2201 quick_test(sql, expected);
2202 }
2203
2204 #[test]
select_order_by_desc()2205 fn select_order_by_desc() {
2206 let sql = "SELECT id FROM person ORDER BY id DESC";
2207 let expected = "Sort: #id DESC NULLS FIRST\
2208 \n Projection: #id\
2209 \n TableScan: person projection=None";
2210 quick_test(sql, expected);
2211 }
2212
2213 #[test]
select_order_by_nulls_last()2214 fn select_order_by_nulls_last() {
2215 quick_test(
2216 "SELECT id FROM person ORDER BY id DESC NULLS LAST",
2217 "Sort: #id DESC NULLS LAST\
2218 \n Projection: #id\
2219 \n TableScan: person projection=None",
2220 );
2221
2222 quick_test(
2223 "SELECT id FROM person ORDER BY id NULLS LAST",
2224 "Sort: #id ASC NULLS LAST\
2225 \n Projection: #id\
2226 \n TableScan: person projection=None",
2227 );
2228 }
2229
2230 #[test]
select_group_by()2231 fn select_group_by() {
2232 let sql = "SELECT state FROM person GROUP BY state";
2233 let expected = "Aggregate: groupBy=[[#state]], aggr=[[]]\
2234 \n TableScan: person projection=None";
2235
2236 quick_test(sql, expected);
2237 }
2238
2239 #[test]
select_group_by_columns_not_in_select()2240 fn select_group_by_columns_not_in_select() {
2241 let sql = "SELECT MAX(age) FROM person GROUP BY state";
2242 let expected = "Projection: #MAX(age)\
2243 \n Aggregate: groupBy=[[#state]], aggr=[[MAX(#age)]]\
2244 \n TableScan: person projection=None";
2245
2246 quick_test(sql, expected);
2247 }
2248
2249 #[test]
select_group_by_count_star()2250 fn select_group_by_count_star() {
2251 let sql = "SELECT state, COUNT(*) FROM person GROUP BY state";
2252 let expected = "Aggregate: groupBy=[[#state]], aggr=[[COUNT(UInt8(1))]]\
2253 \n TableScan: person projection=None";
2254
2255 quick_test(sql, expected);
2256 }
2257
2258 #[test]
select_group_by_needs_projection()2259 fn select_group_by_needs_projection() {
2260 let sql = "SELECT COUNT(state), state FROM person GROUP BY state";
2261 let expected = "\
2262 Projection: #COUNT(state), #state\
2263 \n Aggregate: groupBy=[[#state]], aggr=[[COUNT(#state)]]\
2264 \n TableScan: person projection=None";
2265
2266 quick_test(sql, expected);
2267 }
2268
2269 #[test]
select_7480_1()2270 fn select_7480_1() {
2271 let sql = "SELECT c1, MIN(c12) FROM aggregate_test_100 GROUP BY c1, c13";
2272 let expected = "Projection: #c1, #MIN(c12)\
2273 \n Aggregate: groupBy=[[#c1, #c13]], aggr=[[MIN(#c12)]]\
2274 \n TableScan: aggregate_test_100 projection=None";
2275 quick_test(sql, expected);
2276 }
2277
2278 #[test]
select_7480_2()2279 fn select_7480_2() {
2280 let sql = "SELECT c1, c13, MIN(c12) FROM aggregate_test_100 GROUP BY c1";
2281 let err = logical_plan(sql).expect_err("query should have failed");
2282 assert_eq!(
2283 "Plan(\"Projection references non-aggregate values\")",
2284 format!("{:?}", err)
2285 );
2286 }
2287
2288 #[test]
create_external_table_csv()2289 fn create_external_table_csv() {
2290 let sql = "CREATE EXTERNAL TABLE t(c1 int) STORED AS CSV LOCATION 'foo.csv'";
2291 let expected = "CreateExternalTable: \"t\"";
2292 quick_test(sql, expected);
2293 }
2294
2295 #[test]
create_external_table_csv_no_schema()2296 fn create_external_table_csv_no_schema() {
2297 let sql = "CREATE EXTERNAL TABLE t STORED AS CSV LOCATION 'foo.csv'";
2298 let err = logical_plan(sql).expect_err("query should have failed");
2299 assert_eq!(
2300 "Plan(\"Column definitions required for CSV files. None found\")",
2301 format!("{:?}", err)
2302 );
2303 }
2304
2305 #[test]
create_external_table_parquet()2306 fn create_external_table_parquet() {
2307 let sql =
2308 "CREATE EXTERNAL TABLE t(c1 int) STORED AS PARQUET LOCATION 'foo.parquet'";
2309 let err = logical_plan(sql).expect_err("query should have failed");
2310 assert_eq!(
2311 "Plan(\"Column definitions can not be specified for PARQUET files.\")",
2312 format!("{:?}", err)
2313 );
2314 }
2315
2316 #[test]
create_external_table_parquet_no_schema()2317 fn create_external_table_parquet_no_schema() {
2318 let sql = "CREATE EXTERNAL TABLE t STORED AS PARQUET LOCATION 'foo.parquet'";
2319 let expected = "CreateExternalTable: \"t\"";
2320 quick_test(sql, expected);
2321 }
2322
2323 #[test]
equijoin_explicit_syntax()2324 fn equijoin_explicit_syntax() {
2325 let sql = "SELECT id, order_id \
2326 FROM person \
2327 JOIN orders \
2328 ON id = customer_id";
2329 let expected = "Projection: #id, #order_id\
2330 \n Join: id = customer_id\
2331 \n TableScan: person projection=None\
2332 \n TableScan: orders projection=None";
2333 quick_test(sql, expected);
2334 }
2335
2336 #[test]
equijoin_explicit_syntax_3_tables()2337 fn equijoin_explicit_syntax_3_tables() {
2338 let sql = "SELECT id, order_id, l_description \
2339 FROM person \
2340 JOIN orders ON id = customer_id \
2341 JOIN lineitem ON o_item_id = l_item_id";
2342 let expected = "Projection: #id, #order_id, #l_description\
2343 \n Join: o_item_id = l_item_id\
2344 \n Join: id = customer_id\
2345 \n TableScan: person projection=None\
2346 \n TableScan: orders projection=None\
2347 \n TableScan: lineitem projection=None";
2348 quick_test(sql, expected);
2349 }
2350
2351 #[test]
boolean_literal_in_condition_expression()2352 fn boolean_literal_in_condition_expression() {
2353 let sql = "SELECT order_id \
2354 FROM orders \
2355 WHERE delivered = false OR delivered = true";
2356 let expected = "Projection: #order_id\
2357 \n Filter: #delivered Eq Boolean(false) Or #delivered Eq Boolean(true)\
2358 \n TableScan: orders projection=None";
2359 quick_test(sql, expected);
2360 }
2361
2362 #[test]
select_typedstring()2363 fn select_typedstring() {
2364 let sql = "SELECT date '2020-12-10' AS date FROM person";
2365 let expected = "Projection: CAST(Utf8(\"2020-12-10\") AS Date32) AS date\
2366 \n TableScan: person projection=None";
2367 quick_test(sql, expected);
2368 }
2369
logical_plan(sql: &str) -> Result<LogicalPlan>2370 fn logical_plan(sql: &str) -> Result<LogicalPlan> {
2371 let planner = SqlToRel::new(&MockContextProvider {});
2372 let result = DFParser::parse_sql(&sql);
2373 let ast = result.unwrap();
2374 planner.statement_to_plan(&ast[0])
2375 }
2376
2377 /// Create logical plan, write with formatter, compare to expected output
quick_test(sql: &str, expected: &str)2378 fn quick_test(sql: &str, expected: &str) {
2379 let plan = logical_plan(sql).unwrap();
2380 assert_eq!(expected, format!("{:?}", plan));
2381 }
2382
2383 struct MockContextProvider {}
2384
2385 impl ContextProvider for MockContextProvider {
get_table_provider( &self, name: &str, ) -> Option<Arc<dyn TableProvider + Send + Sync>>2386 fn get_table_provider(
2387 &self,
2388 name: &str,
2389 ) -> Option<Arc<dyn TableProvider + Send + Sync>> {
2390 let schema = match name {
2391 "person" => Some(Schema::new(vec![
2392 Field::new("id", DataType::UInt32, false),
2393 Field::new("first_name", DataType::Utf8, false),
2394 Field::new("last_name", DataType::Utf8, false),
2395 Field::new("age", DataType::Int32, false),
2396 Field::new("state", DataType::Utf8, false),
2397 Field::new("salary", DataType::Float64, false),
2398 Field::new(
2399 "birth_date",
2400 DataType::Timestamp(TimeUnit::Nanosecond, None),
2401 false,
2402 ),
2403 ])),
2404 "orders" => Some(Schema::new(vec![
2405 Field::new("order_id", DataType::UInt32, false),
2406 Field::new("customer_id", DataType::UInt32, false),
2407 Field::new("o_item_id", DataType::Utf8, false),
2408 Field::new("qty", DataType::Int32, false),
2409 Field::new("price", DataType::Float64, false),
2410 Field::new("delivered", DataType::Boolean, false),
2411 ])),
2412 "lineitem" => Some(Schema::new(vec![
2413 Field::new("l_item_id", DataType::UInt32, false),
2414 Field::new("l_description", DataType::Utf8, false),
2415 ])),
2416 "aggregate_test_100" => Some(Schema::new(vec![
2417 Field::new("c1", DataType::Utf8, false),
2418 Field::new("c2", DataType::UInt32, false),
2419 Field::new("c3", DataType::Int8, false),
2420 Field::new("c4", DataType::Int16, false),
2421 Field::new("c5", DataType::Int32, false),
2422 Field::new("c6", DataType::Int64, false),
2423 Field::new("c7", DataType::UInt8, false),
2424 Field::new("c8", DataType::UInt16, false),
2425 Field::new("c9", DataType::UInt32, false),
2426 Field::new("c10", DataType::UInt64, false),
2427 Field::new("c11", DataType::Float32, false),
2428 Field::new("c12", DataType::Float64, false),
2429 Field::new("c13", DataType::Utf8, false),
2430 ])),
2431 _ => None,
2432 };
2433 schema.map(|s| -> Arc<dyn TableProvider + Send + Sync> {
2434 Arc::new(EmptyTable::new(Arc::new(s)))
2435 })
2436 }
2437
get_function_meta(&self, name: &str) -> Option<Arc<ScalarUDF>>2438 fn get_function_meta(&self, name: &str) -> Option<Arc<ScalarUDF>> {
2439 let f: ScalarFunctionImplementation =
2440 Arc::new(|_| Err(DataFusionError::NotImplemented("".to_string())));
2441 match name {
2442 "my_sqrt" => Some(Arc::new(create_udf(
2443 "my_sqrt",
2444 vec![DataType::Float64],
2445 Arc::new(DataType::Float64),
2446 f,
2447 ))),
2448 _ => None,
2449 }
2450 }
2451
get_aggregate_meta(&self, _name: &str) -> Option<Arc<AggregateUDF>>2452 fn get_aggregate_meta(&self, _name: &str) -> Option<Arc<AggregateUDF>> {
2453 unimplemented!()
2454 }
2455 }
2456 }
2457