1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied. See the License for the
15 // specific language governing permissions and limitations
16 // under the License.
17
18 //! SQL Query Planner (produces logical plan from SQL AST)
19
20 use std::sync::Arc;
21
22 use crate::error::{ExecutionError, Result};
23 use crate::logicalplan::{
24 Expr, FunctionMeta, LogicalPlan, LogicalPlanBuilder, Operator, ScalarValue,
25 };
26
27 use arrow::datatypes::*;
28
29 use crate::logicalplan::Expr::Alias;
30 use sqlparser::sqlast::*;
31
/// The SchemaProvider trait allows the query planner to obtain meta-data about tables and
/// functions referenced in SQL statements
pub trait SchemaProvider {
    /// Look up the schema of the table called `name`.
    ///
    /// The planner treats a `None` result as an unknown table and reports an error.
    fn get_table_meta(&self, name: &str) -> Option<Arc<Schema>>;
    /// Look up the metadata (argument fields and return type) of the function
    /// called `name`.
    ///
    /// The planner treats a `None` result as an invalid function and reports an error.
    fn get_function_meta(&self, name: &str) -> Option<Arc<FunctionMeta>>;
}
40
/// SQL query planner
///
/// Translates a parsed SQL AST into a `LogicalPlan`, resolving table and
/// function names through the wrapped `SchemaProvider`.
pub struct SqlToRel<S: SchemaProvider> {
    // Source of table schemas and function metadata consulted while planning
    schema_provider: S,
}
45
46 impl<S: SchemaProvider> SqlToRel<S> {
47 /// Create a new query planner
new(schema_provider: S) -> Self48 pub fn new(schema_provider: S) -> Self {
49 SqlToRel { schema_provider }
50 }
51
52 /// Generate a logic plan from a SQL AST node
sql_to_rel(&self, sql: &ASTNode) -> Result<LogicalPlan>53 pub fn sql_to_rel(&self, sql: &ASTNode) -> Result<LogicalPlan> {
54 match *sql {
55 ASTNode::SQLSelect {
56 ref projection,
57 ref relation,
58 ref selection,
59 ref order_by,
60 ref limit,
61 ref group_by,
62 ref having,
63 ..
64 } => {
65 if having.is_some() {
66 return Err(ExecutionError::NotImplemented(
67 "HAVING is not implemented yet".to_string(),
68 ));
69 }
70
71 // parse the input relation so we have access to the row type
72 let plan = match *relation {
73 Some(ref r) => self.sql_to_rel(r)?,
74 None => LogicalPlanBuilder::empty().build()?,
75 };
76
77 // selection first
78 let plan = self.filter(&plan, selection)?;
79
80 let projection_expr: Vec<Expr> = projection
81 .iter()
82 .map(|e| self.sql_to_rex(&e, &plan.schema()))
83 .collect::<Result<Vec<Expr>>>()?;
84
85 let aggr_expr: Vec<Expr> = projection_expr
86 .iter()
87 .filter(|e| is_aggregate_expr(e))
88 .map(|e| e.clone())
89 .collect();
90
91 // apply projection or aggregate
92 let plan = if group_by.is_some() || aggr_expr.len() > 0 {
93 self.aggregate(&plan, projection_expr, group_by, aggr_expr)?
94 } else {
95 self.project(&plan, projection_expr)?
96 };
97
98 // apply ORDER BY
99 let plan = self.order_by(&plan, order_by)?;
100
101 // apply LIMIT
102 self.limit(&plan, limit)
103 }
104
105 ASTNode::SQLIdentifier(ref id) => {
106 match self.schema_provider.get_table_meta(id.as_ref()) {
107 Some(schema) => Ok(LogicalPlanBuilder::scan(
108 "default",
109 id,
110 schema.as_ref(),
111 None,
112 )?
113 .build()?),
114 None => Err(ExecutionError::General(format!(
115 "no schema found for table {}",
116 id
117 ))),
118 }
119 }
120
121 _ => Err(ExecutionError::ExecutionError(format!(
122 "sql_to_rel does not support this relation: {:?}",
123 sql
124 ))),
125 }
126 }
127
128 /// Apply a filter to the plan
filter( &self, plan: &LogicalPlan, selection: &Option<Box<ASTNode>>, ) -> Result<LogicalPlan>129 fn filter(
130 &self,
131 plan: &LogicalPlan,
132 selection: &Option<Box<ASTNode>>,
133 ) -> Result<LogicalPlan> {
134 match *selection {
135 Some(ref filter_expr) => LogicalPlanBuilder::from(&plan)
136 .filter(self.sql_to_rex(filter_expr, &plan.schema())?)?
137 .build(),
138 _ => Ok(plan.clone()),
139 }
140 }
141
142 /// Wrap a plan in a projection
project(&self, input: &LogicalPlan, expr: Vec<Expr>) -> Result<LogicalPlan>143 fn project(&self, input: &LogicalPlan, expr: Vec<Expr>) -> Result<LogicalPlan> {
144 LogicalPlanBuilder::from(input).project(expr)?.build()
145 }
146
147 /// Wrap a plan in an aggregate
aggregate( &self, input: &LogicalPlan, projection_expr: Vec<Expr>, group_by: &Option<Vec<ASTNode>>, aggr_expr: Vec<Expr>, ) -> Result<LogicalPlan>148 fn aggregate(
149 &self,
150 input: &LogicalPlan,
151 projection_expr: Vec<Expr>,
152 group_by: &Option<Vec<ASTNode>>,
153 aggr_expr: Vec<Expr>,
154 ) -> Result<LogicalPlan> {
155 let group_expr: Vec<Expr> = match group_by {
156 Some(gbe) => gbe
157 .iter()
158 .map(|e| self.sql_to_rex(&e, &input.schema()))
159 .collect::<Result<Vec<Expr>>>()?,
160 None => vec![],
161 };
162
163 let group_by_count = group_expr.len();
164 let aggr_count = aggr_expr.len();
165
166 if group_by_count + aggr_count != projection_expr.len() {
167 return Err(ExecutionError::General(
168 "Projection references non-aggregate values".to_owned(),
169 ));
170 }
171
172 let plan = LogicalPlanBuilder::from(&input)
173 .aggregate(group_expr, aggr_expr)?
174 .build()?;
175
176 // wrap in projection to preserve final order of fields
177 let mut projected_fields = Vec::with_capacity(group_by_count + aggr_count);
178 let mut group_expr_index = 0;
179 let mut aggr_expr_index = 0;
180 for i in 0..projection_expr.len() {
181 if is_aggregate_expr(&projection_expr[i]) {
182 projected_fields.push(group_by_count + aggr_expr_index);
183 aggr_expr_index += 1;
184 } else {
185 projected_fields.push(group_expr_index);
186 group_expr_index += 1;
187 }
188 }
189
190 // determine if projection is needed or not
191 // NOTE this would be better done later in a query optimizer rule
192 let mut projection_needed = false;
193 for i in 0..projected_fields.len() {
194 if projected_fields[i] != i {
195 projection_needed = true;
196 break;
197 }
198 }
199
200 if projection_needed {
201 self.project(
202 &plan,
203 projected_fields.iter().map(|i| Expr::Column(*i)).collect(),
204 )
205 } else {
206 Ok(plan)
207 }
208 }
209
210 /// Wrap a plan in a limit
limit( &self, input: &LogicalPlan, limit: &Option<Box<ASTNode>>, ) -> Result<LogicalPlan>211 fn limit(
212 &self,
213 input: &LogicalPlan,
214 limit: &Option<Box<ASTNode>>,
215 ) -> Result<LogicalPlan> {
216 match *limit {
217 Some(ref limit_expr) => {
218 let limit_rex = match self.sql_to_rex(&limit_expr, &input.schema())? {
219 Expr::Literal(ScalarValue::Int64(n)) => {
220 Ok(Expr::Literal(ScalarValue::UInt32(n as u32)))
221 }
222 _ => Err(ExecutionError::General(
223 "Unexpected expression for LIMIT clause".to_string(),
224 )),
225 }?;
226
227 LogicalPlanBuilder::from(&input).limit(limit_rex)?.build()
228 }
229 _ => Ok(input.clone()),
230 }
231 }
232
233 /// Wrap the logical in a sort
order_by( &self, group_by_plan: &LogicalPlan, order_by: &Option<Vec<SQLOrderByExpr>>, ) -> Result<LogicalPlan>234 fn order_by(
235 &self,
236 group_by_plan: &LogicalPlan,
237 order_by: &Option<Vec<SQLOrderByExpr>>,
238 ) -> Result<LogicalPlan> {
239 match *order_by {
240 Some(ref order_by_expr) => {
241 let input_schema = group_by_plan.schema();
242 let order_by_rex: Result<Vec<Expr>> = order_by_expr
243 .iter()
244 .map(|e| {
245 Ok(Expr::Sort {
246 expr: Box::new(
247 self.sql_to_rex(&e.expr, &input_schema).unwrap(),
248 ),
249 asc: e.asc,
250 })
251 })
252 .collect();
253
254 LogicalPlanBuilder::from(&group_by_plan)
255 .sort(order_by_rex?)?
256 .build()
257 }
258 _ => Ok(group_by_plan.clone()),
259 }
260 }
261
262 /// Generate a relational expression from a SQL expression
sql_to_rex(&self, sql: &ASTNode, schema: &Schema) -> Result<Expr>263 pub fn sql_to_rex(&self, sql: &ASTNode, schema: &Schema) -> Result<Expr> {
264 match *sql {
265 ASTNode::SQLValue(sqlparser::sqlast::Value::Long(n)) => {
266 Ok(Expr::Literal(ScalarValue::Int64(n)))
267 }
268 ASTNode::SQLValue(sqlparser::sqlast::Value::Double(n)) => {
269 Ok(Expr::Literal(ScalarValue::Float64(n)))
270 }
271 ASTNode::SQLValue(sqlparser::sqlast::Value::SingleQuotedString(ref s)) => {
272 Ok(Expr::Literal(ScalarValue::Utf8(s.clone())))
273 }
274
275 ASTNode::SQLAliasedExpr(ref expr, ref alias) => Ok(Alias(
276 Box::new(self.sql_to_rex(&expr, schema)?),
277 alias.to_owned(),
278 )),
279
280 ASTNode::SQLIdentifier(ref id) => {
281 match schema.fields().iter().position(|c| c.name().eq(id)) {
282 Some(index) => Ok(Expr::Column(index)),
283 None => Err(ExecutionError::ExecutionError(format!(
284 "Invalid identifier '{}' for schema {}",
285 id,
286 schema.to_string()
287 ))),
288 }
289 }
290
291 ASTNode::SQLWildcard => Ok(Expr::Wildcard),
292
293 ASTNode::SQLCast {
294 ref expr,
295 ref data_type,
296 } => Ok(Expr::Cast {
297 expr: Box::new(self.sql_to_rex(&expr, schema)?),
298 data_type: convert_data_type(data_type)?,
299 }),
300
301 ASTNode::SQLIsNull(ref expr) => {
302 Ok(Expr::IsNull(Box::new(self.sql_to_rex(expr, schema)?)))
303 }
304
305 ASTNode::SQLIsNotNull(ref expr) => {
306 Ok(Expr::IsNotNull(Box::new(self.sql_to_rex(expr, schema)?)))
307 }
308
309 ASTNode::SQLUnary {
310 ref operator,
311 ref expr,
312 } => match *operator {
313 SQLOperator::Not => {
314 Ok(Expr::Not(Box::new(self.sql_to_rex(expr, schema)?)))
315 }
316 _ => Err(ExecutionError::InternalError(format!(
317 "SQL binary operator cannot be interpreted as a unary operator"
318 ))),
319 },
320
321 ASTNode::SQLBinaryExpr {
322 ref left,
323 ref op,
324 ref right,
325 } => {
326 let operator = match *op {
327 SQLOperator::Gt => Operator::Gt,
328 SQLOperator::GtEq => Operator::GtEq,
329 SQLOperator::Lt => Operator::Lt,
330 SQLOperator::LtEq => Operator::LtEq,
331 SQLOperator::Eq => Operator::Eq,
332 SQLOperator::NotEq => Operator::NotEq,
333 SQLOperator::Plus => Operator::Plus,
334 SQLOperator::Minus => Operator::Minus,
335 SQLOperator::Multiply => Operator::Multiply,
336 SQLOperator::Divide => Operator::Divide,
337 SQLOperator::Modulus => Operator::Modulus,
338 SQLOperator::And => Operator::And,
339 SQLOperator::Or => Operator::Or,
340 SQLOperator::Not => Operator::Not,
341 SQLOperator::Like => Operator::Like,
342 SQLOperator::NotLike => Operator::NotLike,
343 };
344
345 match operator {
346 Operator::Not => Err(ExecutionError::InternalError(format!(
347 "SQL unary operator \"NOT\" cannot be interpreted as a binary operator"
348 ))),
349 _ => Ok(Expr::BinaryExpr {
350 left: Box::new(self.sql_to_rex(&left, &schema)?),
351 op: operator,
352 right: Box::new(self.sql_to_rex(&right, &schema)?),
353 })
354 }
355 }
356
357 // &ASTNode::SQLOrderBy { ref expr, asc } => Ok(Expr::Sort {
358 // expr: Box::new(self.sql_to_rex(&expr, &schema)?),
359 // asc,
360 // }),
361 ASTNode::SQLFunction { ref id, ref args } => {
362 //TODO: fix this hack
363 match id.to_lowercase().as_ref() {
364 "min" | "max" | "sum" | "avg" => {
365 let rex_args = args
366 .iter()
367 .map(|a| self.sql_to_rex(a, schema))
368 .collect::<Result<Vec<Expr>>>()?;
369
370 // return type is same as the argument type for these aggregate
371 // functions
372 let return_type = rex_args[0].get_type(schema)?.clone();
373
374 Ok(Expr::AggregateFunction {
375 name: id.clone(),
376 args: rex_args,
377 return_type,
378 })
379 }
380 "count" => {
381 let rex_args = args
382 .iter()
383 .map(|a| match a {
384 ASTNode::SQLValue(sqlparser::sqlast::Value::Long(_)) => {
385 Ok(Expr::Literal(ScalarValue::UInt8(1)))
386 }
387 ASTNode::SQLWildcard => {
388 Ok(Expr::Literal(ScalarValue::UInt8(1)))
389 }
390 _ => self.sql_to_rex(a, schema),
391 })
392 .collect::<Result<Vec<Expr>>>()?;
393
394 Ok(Expr::AggregateFunction {
395 name: id.clone(),
396 args: rex_args,
397 return_type: DataType::UInt64,
398 })
399 }
400 _ => match self.schema_provider.get_function_meta(id) {
401 Some(fm) => {
402 let rex_args = args
403 .iter()
404 .map(|a| self.sql_to_rex(a, schema))
405 .collect::<Result<Vec<Expr>>>()?;
406
407 let mut safe_args: Vec<Expr> = vec![];
408 for i in 0..rex_args.len() {
409 safe_args.push(
410 rex_args[i]
411 .cast_to(fm.args()[i].data_type(), schema)?,
412 );
413 }
414
415 Ok(Expr::ScalarFunction {
416 name: id.clone(),
417 args: safe_args,
418 return_type: fm.return_type().clone(),
419 })
420 }
421 _ => Err(ExecutionError::General(format!(
422 "Invalid function '{}'",
423 id
424 ))),
425 },
426 }
427 }
428
429 _ => Err(ExecutionError::General(format!(
430 "Unsupported ast node {:?} in sqltorel",
431 sql
432 ))),
433 }
434 }
435 }
436
437 /// Determine if an expression is an aggregate expression or not
is_aggregate_expr(e: &Expr) -> bool438 fn is_aggregate_expr(e: &Expr) -> bool {
439 match e {
440 Expr::AggregateFunction { .. } => true,
441 _ => false,
442 }
443 }
444
445 /// Convert SQL data type to relational representation of data type
convert_data_type(sql: &SQLType) -> Result<DataType>446 pub fn convert_data_type(sql: &SQLType) -> Result<DataType> {
447 match sql {
448 SQLType::Boolean => Ok(DataType::Boolean),
449 SQLType::SmallInt => Ok(DataType::Int16),
450 SQLType::Int => Ok(DataType::Int32),
451 SQLType::BigInt => Ok(DataType::Int64),
452 SQLType::Float(_) | SQLType::Real => Ok(DataType::Float64),
453 SQLType::Double => Ok(DataType::Float64),
454 SQLType::Char(_) | SQLType::Varchar(_) => Ok(DataType::Utf8),
455 SQLType::Timestamp => Ok(DataType::Timestamp(TimeUnit::Nanosecond, None)),
456 other => Err(ExecutionError::NotImplemented(format!(
457 "Unsupported SQL type {:?}",
458 other
459 ))),
460 }
461 }
462
#[cfg(test)]
mod tests {

    use super::*;
    use crate::logicalplan::FunctionType;
    use sqlparser::sqlparser::*;

    #[test]
    fn select_no_relation() {
        let sql = "SELECT 1";
        let expected = "Projection: Int64(1)\
                        \n EmptyRelation";
        quick_test(sql, expected);
    }

    #[test]
    fn select_scalar_func_with_literal_no_relation() {
        let sql = "SELECT sqrt(9)";
        let expected = "Projection: sqrt(CAST(Int64(9) AS Float64))\
                        \n EmptyRelation";
        quick_test(sql, expected);
    }

    #[test]
    fn select_simple_selection() {
        quick_test(
            "SELECT id, first_name, last_name \
             FROM person WHERE state = 'CO'",
            "Projection: #0, #1, #2\
             \n Selection: #4 Eq Utf8(\"CO\")\
             \n TableScan: person projection=None",
        );
    }

    #[test]
    fn select_neg_selection() {
        quick_test(
            "SELECT id, first_name, last_name \
             FROM person WHERE NOT state",
            "Projection: #0, #1, #2\
             \n Selection: NOT #4\
             \n TableScan: person projection=None",
        );
    }

    #[test]
    fn select_compound_selection() {
        quick_test(
            "SELECT id, first_name, last_name \
             FROM person WHERE state = 'CO' AND age >= 21 AND age <= 65",
            "Projection: #0, #1, #2\
             \n Selection: #4 Eq Utf8(\"CO\") And #3 GtEq Int64(21) And #3 LtEq Int64(65)\
             \n TableScan: person projection=None",
        );
    }

    #[test]
    fn test_timestamp_selection() {
        quick_test(
            "SELECT state FROM person WHERE birth_date < CAST (158412331400600000 as timestamp)",
            "Projection: #4\
             \n Selection: #6 Lt CAST(Int64(158412331400600000) AS Timestamp(Nanosecond, None))\
             \n TableScan: person projection=None",
        );
    }

    #[test]
    fn select_all_boolean_operators() {
        quick_test(
            "SELECT age, first_name, last_name \
             FROM person \
             WHERE age = 21 \
             AND age != 21 \
             AND age > 21 \
             AND age >= 21 \
             AND age < 65 \
             AND age <= 65",
            "Projection: #3, #1, #2\
             \n Selection: #3 Eq Int64(21) \
             And #3 NotEq Int64(21) \
             And #3 Gt Int64(21) \
             And #3 GtEq Int64(21) \
             And #3 Lt Int64(65) \
             And #3 LtEq Int64(65)\
             \n TableScan: person projection=None",
        );
    }

    #[test]
    fn select_simple_aggregate() {
        let sql = "SELECT MIN(age) FROM person";
        let expected = "Aggregate: groupBy=[[]], aggr=[[MIN(#3)]]\
                        \n TableScan: person projection=None";
        quick_test(sql, expected);
    }

    #[test]
    fn test_sum_aggregate() {
        let sql = "SELECT SUM(age) from person";
        let expected = "Aggregate: groupBy=[[]], aggr=[[SUM(#3)]]\
                        \n TableScan: person projection=None";
        quick_test(sql, expected);
    }

    #[test]
    fn select_simple_aggregate_with_groupby() {
        let sql = "SELECT state, MIN(age), MAX(age) FROM person GROUP BY state";
        let expected = "Aggregate: groupBy=[[#4]], aggr=[[MIN(#3), MAX(#3)]]\
                        \n TableScan: person projection=None";
        quick_test(sql, expected);
    }

    #[test]
    fn test_wildcard() {
        let sql = "SELECT * from person";
        let expected = "Projection: #0, #1, #2, #3, #4, #5, #6\
                        \n TableScan: person projection=None";
        quick_test(sql, expected);
    }

    #[test]
    fn select_count_one() {
        quick_test(
            "SELECT COUNT(1) FROM person",
            "Aggregate: groupBy=[[]], aggr=[[COUNT(UInt8(1))]]\
             \n TableScan: person projection=None",
        );
    }

    #[test]
    fn select_count_column() {
        quick_test(
            "SELECT COUNT(id) FROM person",
            "Aggregate: groupBy=[[]], aggr=[[COUNT(#0)]]\
             \n TableScan: person projection=None",
        );
    }

    #[test]
    fn select_scalar_func() {
        quick_test(
            "SELECT sqrt(age) FROM person",
            "Projection: sqrt(CAST(#3 AS Float64))\
             \n TableScan: person projection=None",
        );
    }

    #[test]
    fn select_aliased_scalar_func() {
        quick_test(
            "SELECT sqrt(age) AS square_people FROM person",
            "Projection: sqrt(CAST(#3 AS Float64)) AS square_people\
             \n TableScan: person projection=None",
        );
    }

    #[test]
    fn select_order_by() {
        quick_test(
            "SELECT id FROM person ORDER BY id",
            "Sort: #0 ASC\
             \n Projection: #0\
             \n TableScan: person projection=None",
        );
    }

    #[test]
    fn select_order_by_desc() {
        quick_test(
            "SELECT id FROM person ORDER BY id DESC",
            "Sort: #0 DESC\
             \n Projection: #0\
             \n TableScan: person projection=None",
        );
    }

    #[test]
    fn select_group_by() {
        quick_test(
            "SELECT state FROM person GROUP BY state",
            "Aggregate: groupBy=[[#4]], aggr=[[]]\
             \n TableScan: person projection=None",
        );
    }

    #[test]
    fn select_7480_1() {
        // c13 is grouped on but never projected
        let err = logical_plan("SELECT c1, MIN(c12) FROM aggregate_test_100 GROUP BY c1, c13")
            .expect_err("query should have failed");
        let actual = format!("{:?}", err);
        assert_eq!(
            "General(\"Projection references non-aggregate values\")",
            actual
        );
    }

    #[test]
    fn select_7480_2() {
        // c13 is projected but neither grouped on nor aggregated
        let err = logical_plan("SELECT c1, c13, MIN(c12) FROM aggregate_test_100 GROUP BY c1")
            .expect_err("query should have failed");
        let actual = format!("{:?}", err);
        assert_eq!(
            "General(\"Projection references non-aggregate values\")",
            actual
        );
    }

    /// Parse `sql` with the generic dialect and plan it against the mock schemas
    fn logical_plan(sql: &str) -> Result<LogicalPlan> {
        use sqlparser::dialect::*;
        let ast = Parser::parse_sql(&GenericSqlDialect {}, sql.to_string()).unwrap();
        SqlToRel::new(MockSchemaProvider {}).sql_to_rel(&ast)
    }

    /// Plan `sql`, render the plan with its `Debug` formatter and compare the
    /// text with `expected`
    fn quick_test(sql: &str, expected: &str) {
        let plan = logical_plan(sql).unwrap();
        let actual = format!("{:?}", plan);
        assert_eq!(expected, actual);
    }

    /// Schema provider that knows two tables (`person`, `aggregate_test_100`)
    /// and one scalar function (`sqrt`)
    struct MockSchemaProvider {}

    impl SchemaProvider for MockSchemaProvider {
        fn get_table_meta(&self, name: &str) -> Option<Arc<Schema>> {
            let fields = match name {
                "person" => vec![
                    Field::new("id", DataType::UInt32, false),
                    Field::new("first_name", DataType::Utf8, false),
                    Field::new("last_name", DataType::Utf8, false),
                    Field::new("age", DataType::Int32, false),
                    Field::new("state", DataType::Utf8, false),
                    Field::new("salary", DataType::Float64, false),
                    Field::new(
                        "birth_date",
                        DataType::Timestamp(TimeUnit::Nanosecond, None),
                        false,
                    ),
                ],
                "aggregate_test_100" => vec![
                    Field::new("c1", DataType::Utf8, false),
                    Field::new("c2", DataType::UInt32, false),
                    Field::new("c3", DataType::Int8, false),
                    Field::new("c4", DataType::Int16, false),
                    Field::new("c5", DataType::Int32, false),
                    Field::new("c6", DataType::Int64, false),
                    Field::new("c7", DataType::UInt8, false),
                    Field::new("c8", DataType::UInt16, false),
                    Field::new("c9", DataType::UInt32, false),
                    Field::new("c10", DataType::UInt64, false),
                    Field::new("c11", DataType::Float32, false),
                    Field::new("c12", DataType::Float64, false),
                    Field::new("c13", DataType::Utf8, false),
                ],
                _ => return None,
            };

            Some(Arc::new(Schema::new(fields)))
        }

        fn get_function_meta(&self, name: &str) -> Option<Arc<FunctionMeta>> {
            if name != "sqrt" {
                return None;
            }

            let argument_fields = vec![Field::new("n", DataType::Float64, false)];
            Some(Arc::new(FunctionMeta::new(
                "sqrt".to_string(),
                argument_fields,
                DataType::Float64,
                FunctionType::Scalar,
            )))
        }
    }
}
729