1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements.  See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership.  The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License.  You may obtain a copy of the License at
8 //
9 //   http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied.  See the License for the
15 // specific language governing permissions and limitations
16 // under the License.
17 
18 //! Physical query planner
19 
20 use std::sync::Arc;
21 
22 use super::{aggregates, empty::EmptyExec, expressions::binary, functions, udaf};
23 use crate::error::{DataFusionError, Result};
24 use crate::execution::context::ExecutionContextState;
25 use crate::logical_plan::{
26     DFSchema, Expr, LogicalPlan, Operator, Partitioning as LogicalPartitioning, PlanType,
27     StringifiedPlan, UserDefinedLogicalNode,
28 };
29 use crate::physical_plan::coalesce_batches::CoalesceBatchesExec;
30 use crate::physical_plan::explain::ExplainExec;
31 use crate::physical_plan::expressions::{CaseExpr, Column, Literal, PhysicalSortExpr};
32 use crate::physical_plan::filter::FilterExec;
33 use crate::physical_plan::hash_aggregate::{AggregateMode, HashAggregateExec};
34 use crate::physical_plan::hash_join::HashJoinExec;
35 use crate::physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
36 use crate::physical_plan::merge::MergeExec;
37 use crate::physical_plan::projection::ProjectionExec;
38 use crate::physical_plan::repartition::RepartitionExec;
39 use crate::physical_plan::sort::SortExec;
40 use crate::physical_plan::udf;
41 use crate::physical_plan::{expressions, Distribution};
42 use crate::physical_plan::{hash_utils, Partitioning};
43 use crate::physical_plan::{AggregateExpr, ExecutionPlan, PhysicalExpr, PhysicalPlanner};
44 use crate::prelude::JoinType;
45 use crate::scalar::ScalarValue;
46 use crate::variable::VarType;
47 use arrow::compute::can_cast_types;
48 
49 use arrow::compute::SortOptions;
50 use arrow::datatypes::{Schema, SchemaRef};
51 use expressions::col;
52 
53 /// This trait exposes the ability to plan an [`ExecutionPlan`] out of a [`LogicalPlan`].
54 pub trait ExtensionPlanner {
55     /// Create a physical plan for a [`UserDefinedLogicalNode`].
56     /// This errors when the planner knows how to plan the concrete implementation of `node`
57     /// but errors while doing so, and `None` when the planner does not know how to plan the `node`
58     /// and wants to delegate the planning to another [`ExtensionPlanner`].
plan_extension( &self, node: &dyn UserDefinedLogicalNode, inputs: &[Arc<dyn ExecutionPlan>], ctx_state: &ExecutionContextState, ) -> Result<Option<Arc<dyn ExecutionPlan>>>59     fn plan_extension(
60         &self,
61         node: &dyn UserDefinedLogicalNode,
62         inputs: &[Arc<dyn ExecutionPlan>],
63         ctx_state: &ExecutionContextState,
64     ) -> Result<Option<Arc<dyn ExecutionPlan>>>;
65 }
66 
67 /// Default single node physical query planner that converts a
68 /// `LogicalPlan` to an `ExecutionPlan` suitable for execution.
69 pub struct DefaultPhysicalPlanner {
70     extension_planners: Vec<Arc<dyn ExtensionPlanner + Send + Sync>>,
71 }
72 
73 impl Default for DefaultPhysicalPlanner {
default() -> Self74     fn default() -> Self {
75         Self {
76             extension_planners: vec![],
77         }
78     }
79 }
80 
81 impl PhysicalPlanner for DefaultPhysicalPlanner {
82     /// Create a physical plan from a logical plan
create_physical_plan( &self, logical_plan: &LogicalPlan, ctx_state: &ExecutionContextState, ) -> Result<Arc<dyn ExecutionPlan>>83     fn create_physical_plan(
84         &self,
85         logical_plan: &LogicalPlan,
86         ctx_state: &ExecutionContextState,
87     ) -> Result<Arc<dyn ExecutionPlan>> {
88         let plan = self.create_initial_plan(logical_plan, ctx_state)?;
89         self.optimize_plan(plan, ctx_state)
90     }
91 }
92 
93 impl DefaultPhysicalPlanner {
94     /// Create a physical planner that uses `extension_planners` to
95     /// plan user-defined logical nodes [`LogicalPlan::Extension`].
96     /// The planner uses the first [`ExtensionPlanner`] to return a non-`None`
97     /// plan.
with_extension_planners( extension_planners: Vec<Arc<dyn ExtensionPlanner + Send + Sync>>, ) -> Self98     pub fn with_extension_planners(
99         extension_planners: Vec<Arc<dyn ExtensionPlanner + Send + Sync>>,
100     ) -> Self {
101         Self { extension_planners }
102     }
103 
104     /// Create a physical plan from a logical plan
optimize_plan( &self, plan: Arc<dyn ExecutionPlan>, ctx_state: &ExecutionContextState, ) -> Result<Arc<dyn ExecutionPlan>>105     fn optimize_plan(
106         &self,
107         plan: Arc<dyn ExecutionPlan>,
108         ctx_state: &ExecutionContextState,
109     ) -> Result<Arc<dyn ExecutionPlan>> {
110         let children = plan
111             .children()
112             .iter()
113             .map(|child| self.optimize_plan(child.clone(), ctx_state))
114             .collect::<Result<Vec<_>>>()?;
115 
116         if children.is_empty() {
117             // leaf node, children cannot be replaced
118             Ok(plan.clone())
119         } else {
120             // wrap operators in CoalesceBatches to avoid lots of tiny batches when we have
121             // highly selective filters
122             let plan_any = plan.as_any();
123             //TODO we should do this in a more generic way either by wrapping all operators
124             // or having an API so that operators can declare when their inputs or outputs
125             // need to be wrapped in a coalesce batches operator.
126             // See https://issues.apache.org/jira/browse/ARROW-11068
127             let wrap_in_coalesce = plan_any.downcast_ref::<FilterExec>().is_some()
128                 || plan_any.downcast_ref::<HashJoinExec>().is_some()
129                 || plan_any.downcast_ref::<RepartitionExec>().is_some();
130 
131             //TODO we should also do this for HashAggregateExec but we need to update tests
132             // as part of this work - see https://issues.apache.org/jira/browse/ARROW-11068
133             // || plan_any.downcast_ref::<HashAggregateExec>().is_some();
134 
135             let plan = if wrap_in_coalesce {
136                 //TODO we should add specific configuration settings for coalescing batches and
137                 // we should do that once https://issues.apache.org/jira/browse/ARROW-11059 is
138                 // implemented. For now, we choose half the configured batch size to avoid copies
139                 // when a small number of rows are removed from a batch
140                 let target_batch_size = ctx_state.config.batch_size / 2;
141                 Arc::new(CoalesceBatchesExec::new(plan.clone(), target_batch_size))
142             } else {
143                 plan.clone()
144             };
145 
146             let children = plan.children().clone();
147 
148             match plan.required_child_distribution() {
149                 Distribution::UnspecifiedDistribution => plan.with_new_children(children),
150                 Distribution::SinglePartition => plan.with_new_children(
151                     children
152                         .iter()
153                         .map(|child| {
154                             if child.output_partitioning().partition_count() == 1 {
155                                 child.clone()
156                             } else {
157                                 Arc::new(MergeExec::new(child.clone()))
158                             }
159                         })
160                         .collect(),
161                 ),
162             }
163         }
164     }
165 
166     /// Create a physical plan from a logical plan
create_initial_plan( &self, logical_plan: &LogicalPlan, ctx_state: &ExecutionContextState, ) -> Result<Arc<dyn ExecutionPlan>>167     fn create_initial_plan(
168         &self,
169         logical_plan: &LogicalPlan,
170         ctx_state: &ExecutionContextState,
171     ) -> Result<Arc<dyn ExecutionPlan>> {
172         let batch_size = ctx_state.config.batch_size;
173 
174         match logical_plan {
175             LogicalPlan::TableScan {
176                 source,
177                 projection,
178                 filters,
179                 ..
180             } => source.scan(projection, batch_size, filters),
181             LogicalPlan::Aggregate {
182                 input,
183                 group_expr,
184                 aggr_expr,
185                 ..
186             } => {
187                 // Initially need to perform the aggregate and then merge the partitions
188                 let input_exec = self.create_physical_plan(input, ctx_state)?;
189                 let input_schema = input_exec.schema();
190                 let physical_input_schema = input_exec.as_ref().schema();
191                 let logical_input_schema = input.as_ref().schema();
192 
193                 let groups = group_expr
194                     .iter()
195                     .map(|e| {
196                         tuple_err((
197                             self.create_physical_expr(
198                                 e,
199                                 &physical_input_schema,
200                                 ctx_state,
201                             ),
202                             e.name(&logical_input_schema),
203                         ))
204                     })
205                     .collect::<Result<Vec<_>>>()?;
206                 let aggregates = aggr_expr
207                     .iter()
208                     .map(|e| {
209                         self.create_aggregate_expr(
210                             e,
211                             &logical_input_schema,
212                             &physical_input_schema,
213                             ctx_state,
214                         )
215                     })
216                     .collect::<Result<Vec<_>>>()?;
217 
218                 let initial_aggr = Arc::new(HashAggregateExec::try_new(
219                     AggregateMode::Partial,
220                     groups.clone(),
221                     aggregates.clone(),
222                     input_exec,
223                     input_schema.clone(),
224                 )?);
225 
226                 let final_group: Vec<Arc<dyn PhysicalExpr>> =
227                     (0..groups.len()).map(|i| col(&groups[i].1)).collect();
228 
229                 // construct a second aggregation, keeping the final column name equal to the first aggregation
230                 // and the expressions corresponding to the respective aggregate
231                 Ok(Arc::new(HashAggregateExec::try_new(
232                     AggregateMode::Final,
233                     final_group
234                         .iter()
235                         .enumerate()
236                         .map(|(i, expr)| (expr.clone(), groups[i].1.clone()))
237                         .collect(),
238                     aggregates,
239                     initial_aggr,
240                     input_schema,
241                 )?))
242             }
243             LogicalPlan::Projection { input, expr, .. } => {
244                 let input_exec = self.create_physical_plan(input, ctx_state)?;
245                 let input_schema = input.as_ref().schema();
246                 let runtime_expr = expr
247                     .iter()
248                     .map(|e| {
249                         tuple_err((
250                             self.create_physical_expr(
251                                 e,
252                                 &input_exec.schema(),
253                                 &ctx_state,
254                             ),
255                             e.name(&input_schema),
256                         ))
257                     })
258                     .collect::<Result<Vec<_>>>()?;
259                 Ok(Arc::new(ProjectionExec::try_new(runtime_expr, input_exec)?))
260             }
261             LogicalPlan::Filter {
262                 input, predicate, ..
263             } => {
264                 let input = self.create_physical_plan(input, ctx_state)?;
265                 let input_schema = input.as_ref().schema();
266                 let runtime_expr =
267                     self.create_physical_expr(predicate, &input_schema, ctx_state)?;
268                 Ok(Arc::new(FilterExec::try_new(runtime_expr, input)?))
269             }
270             LogicalPlan::Repartition {
271                 input,
272                 partitioning_scheme,
273             } => {
274                 let input = self.create_physical_plan(input, ctx_state)?;
275                 let input_schema = input.schema();
276                 let physical_partitioning = match partitioning_scheme {
277                     LogicalPartitioning::RoundRobinBatch(n) => {
278                         Partitioning::RoundRobinBatch(*n)
279                     }
280                     LogicalPartitioning::Hash(expr, n) => {
281                         let runtime_expr = expr
282                             .iter()
283                             .map(|e| {
284                                 self.create_physical_expr(e, &input_schema, &ctx_state)
285                             })
286                             .collect::<Result<Vec<_>>>()?;
287                         Partitioning::Hash(runtime_expr, *n)
288                     }
289                 };
290                 Ok(Arc::new(RepartitionExec::try_new(
291                     input,
292                     physical_partitioning,
293                 )?))
294             }
295             LogicalPlan::Sort { expr, input, .. } => {
296                 let input = self.create_physical_plan(input, ctx_state)?;
297                 let input_schema = input.as_ref().schema();
298 
299                 let sort_expr = expr
300                     .iter()
301                     .map(|e| match e {
302                         Expr::Sort {
303                             expr,
304                             asc,
305                             nulls_first,
306                         } => self.create_physical_sort_expr(
307                             expr,
308                             &input_schema,
309                             SortOptions {
310                                 descending: !*asc,
311                                 nulls_first: *nulls_first,
312                             },
313                             ctx_state,
314                         ),
315                         _ => Err(DataFusionError::Plan(
316                             "Sort only accepts sort expressions".to_string(),
317                         )),
318                     })
319                     .collect::<Result<Vec<_>>>()?;
320 
321                 Ok(Arc::new(SortExec::try_new(sort_expr, input)?))
322             }
323             LogicalPlan::Join {
324                 left,
325                 right,
326                 on: keys,
327                 join_type,
328                 ..
329             } => {
330                 let left = self.create_physical_plan(left, ctx_state)?;
331                 let right = self.create_physical_plan(right, ctx_state)?;
332                 let physical_join_type = match join_type {
333                     JoinType::Inner => hash_utils::JoinType::Inner,
334                     JoinType::Left => hash_utils::JoinType::Left,
335                     JoinType::Right => hash_utils::JoinType::Right,
336                 };
337 
338                 Ok(Arc::new(HashJoinExec::try_new(
339                     left,
340                     right,
341                     &keys,
342                     &physical_join_type,
343                 )?))
344             }
345             LogicalPlan::EmptyRelation {
346                 produce_one_row,
347                 schema,
348             } => Ok(Arc::new(EmptyExec::new(
349                 *produce_one_row,
350                 SchemaRef::new(schema.as_ref().to_owned().into()),
351             ))),
352             LogicalPlan::Limit { input, n, .. } => {
353                 let limit = *n;
354                 let input = self.create_physical_plan(input, ctx_state)?;
355 
356                 // GlobalLimitExec requires a single partition for input
357                 let input = if input.output_partitioning().partition_count() == 1 {
358                     input
359                 } else {
360                     // Apply a LocalLimitExec to each partition. The optimizer will also insert
361                     // a MergeExec between the GlobalLimitExec and LocalLimitExec
362                     Arc::new(LocalLimitExec::new(input, limit))
363                 };
364 
365                 Ok(Arc::new(GlobalLimitExec::new(input, limit)))
366             }
367             LogicalPlan::CreateExternalTable { .. } => {
368                 // There is no default plan for "CREATE EXTERNAL
369                 // TABLE" -- it must be handled at a higher level (so
370                 // that the appropriate table can be registered with
371                 // the context)
372                 Err(DataFusionError::Internal(
373                     "Unsupported logical plan: CreateExternalTable".to_string(),
374                 ))
375             }
376             LogicalPlan::Explain {
377                 verbose,
378                 plan,
379                 stringified_plans,
380                 schema,
381             } => {
382                 let input = self.create_physical_plan(plan, ctx_state)?;
383 
384                 let mut stringified_plans = stringified_plans
385                     .iter()
386                     .filter(|s| s.should_display(*verbose))
387                     .cloned()
388                     .collect::<Vec<_>>();
389 
390                 // add in the physical plan if requested
391                 if *verbose {
392                     stringified_plans.push(StringifiedPlan::new(
393                         PlanType::PhysicalPlan,
394                         format!("{:#?}", input),
395                     ));
396                 }
397                 Ok(Arc::new(ExplainExec::new(
398                     SchemaRef::new(schema.as_ref().to_owned().into()),
399                     stringified_plans,
400                 )))
401             }
402             LogicalPlan::Extension { node } => {
403                 let inputs = node
404                     .inputs()
405                     .into_iter()
406                     .map(|input_plan| self.create_physical_plan(input_plan, ctx_state))
407                     .collect::<Result<Vec<_>>>()?;
408 
409                 let maybe_plan = self.extension_planners.iter().try_fold(
410                     None,
411                     |maybe_plan, planner| {
412                         if let Some(plan) = maybe_plan {
413                             Ok(Some(plan))
414                         } else {
415                             planner.plan_extension(node.as_ref(), &inputs, ctx_state)
416                         }
417                     },
418                 )?;
419                 let plan = maybe_plan.ok_or_else(|| DataFusionError::Plan(format!(
420                     "No installed planner was able to convert the custom node to an execution plan: {:?}", node
421                 )))?;
422 
423                 // Ensure the ExecutionPlan's  schema matches the
424                 // declared logical schema to catch and warn about
425                 // logic errors when creating user defined plans.
426                 if plan.schema() != node.schema().as_ref().to_owned().into() {
427                     Err(DataFusionError::Plan(format!(
428                         "Extension planner for {:?} created an ExecutionPlan with mismatched schema. \
429                          LogicalPlan schema: {:?}, ExecutionPlan schema: {:?}",
430                         node, node.schema(), plan.schema()
431                     )))
432                 } else {
433                     Ok(plan)
434                 }
435             }
436         }
437     }
438 
439     /// Create a physical expression from a logical expression
create_physical_expr( &self, e: &Expr, input_schema: &Schema, ctx_state: &ExecutionContextState, ) -> Result<Arc<dyn PhysicalExpr>>440     pub fn create_physical_expr(
441         &self,
442         e: &Expr,
443         input_schema: &Schema,
444         ctx_state: &ExecutionContextState,
445     ) -> Result<Arc<dyn PhysicalExpr>> {
446         match e {
447             Expr::Alias(expr, ..) => {
448                 Ok(self.create_physical_expr(expr, input_schema, ctx_state)?)
449             }
450             Expr::Column(name) => {
451                 // check that name exists
452                 input_schema.field_with_name(&name)?;
453                 Ok(Arc::new(Column::new(name)))
454             }
455             Expr::Literal(value) => Ok(Arc::new(Literal::new(value.clone()))),
456             Expr::ScalarVariable(variable_names) => {
457                 if &variable_names[0][0..2] == "@@" {
458                     match ctx_state.var_provider.get(&VarType::System) {
459                         Some(provider) => {
460                             let scalar_value =
461                                 provider.get_value(variable_names.clone())?;
462                             Ok(Arc::new(Literal::new(scalar_value)))
463                         }
464                         _ => Err(DataFusionError::Plan(
465                             "No system variable provider found".to_string(),
466                         )),
467                     }
468                 } else {
469                     match ctx_state.var_provider.get(&VarType::UserDefined) {
470                         Some(provider) => {
471                             let scalar_value =
472                                 provider.get_value(variable_names.clone())?;
473                             Ok(Arc::new(Literal::new(scalar_value)))
474                         }
475                         _ => Err(DataFusionError::Plan(
476                             "No user defined variable provider found".to_string(),
477                         )),
478                     }
479                 }
480             }
481             Expr::BinaryExpr { left, op, right } => {
482                 let lhs = self.create_physical_expr(left, input_schema, ctx_state)?;
483                 let rhs = self.create_physical_expr(right, input_schema, ctx_state)?;
484                 binary(lhs, *op, rhs, input_schema)
485             }
486             Expr::Case {
487                 expr,
488                 when_then_expr,
489                 else_expr,
490                 ..
491             } => {
492                 let expr: Option<Arc<dyn PhysicalExpr>> = if let Some(e) = expr {
493                     Some(self.create_physical_expr(
494                         e.as_ref(),
495                         input_schema,
496                         ctx_state,
497                     )?)
498                 } else {
499                     None
500                 };
501                 let when_expr = when_then_expr
502                     .iter()
503                     .map(|(w, _)| {
504                         self.create_physical_expr(w.as_ref(), input_schema, ctx_state)
505                     })
506                     .collect::<Result<Vec<_>>>()?;
507                 let then_expr = when_then_expr
508                     .iter()
509                     .map(|(_, t)| {
510                         self.create_physical_expr(t.as_ref(), input_schema, ctx_state)
511                     })
512                     .collect::<Result<Vec<_>>>()?;
513                 let when_then_expr: Vec<(Arc<dyn PhysicalExpr>, Arc<dyn PhysicalExpr>)> =
514                     when_expr
515                         .iter()
516                         .zip(then_expr.iter())
517                         .map(|(w, t)| (w.clone(), t.clone()))
518                         .collect();
519                 let else_expr: Option<Arc<dyn PhysicalExpr>> = if let Some(e) = else_expr
520                 {
521                     Some(self.create_physical_expr(
522                         e.as_ref(),
523                         input_schema,
524                         ctx_state,
525                     )?)
526                 } else {
527                     None
528                 };
529                 Ok(Arc::new(CaseExpr::try_new(
530                     expr,
531                     &when_then_expr,
532                     else_expr,
533                 )?))
534             }
535             Expr::Cast { expr, data_type } => expressions::cast(
536                 self.create_physical_expr(expr, input_schema, ctx_state)?,
537                 input_schema,
538                 data_type.clone(),
539             ),
540             Expr::Not(expr) => expressions::not(
541                 self.create_physical_expr(expr, input_schema, ctx_state)?,
542                 input_schema,
543             ),
544             Expr::Negative(expr) => expressions::negative(
545                 self.create_physical_expr(expr, input_schema, ctx_state)?,
546                 input_schema,
547             ),
548             Expr::IsNull(expr) => expressions::is_null(self.create_physical_expr(
549                 expr,
550                 input_schema,
551                 ctx_state,
552             )?),
553             Expr::IsNotNull(expr) => expressions::is_not_null(
554                 self.create_physical_expr(expr, input_schema, ctx_state)?,
555             ),
556             Expr::ScalarFunction { fun, args } => {
557                 let physical_args = args
558                     .iter()
559                     .map(|e| self.create_physical_expr(e, input_schema, ctx_state))
560                     .collect::<Result<Vec<_>>>()?;
561                 functions::create_physical_expr(fun, &physical_args, input_schema)
562             }
563             Expr::ScalarUDF { fun, args } => {
564                 let mut physical_args = vec![];
565                 for e in args {
566                     physical_args.push(self.create_physical_expr(
567                         e,
568                         input_schema,
569                         ctx_state,
570                     )?);
571                 }
572 
573                 udf::create_physical_expr(
574                     fun.clone().as_ref(),
575                     &physical_args,
576                     input_schema,
577                 )
578             }
579             Expr::Between {
580                 expr,
581                 negated,
582                 low,
583                 high,
584             } => {
585                 let value_expr =
586                     self.create_physical_expr(expr, input_schema, ctx_state)?;
587                 let low_expr = self.create_physical_expr(low, input_schema, ctx_state)?;
588                 let high_expr =
589                     self.create_physical_expr(high, input_schema, ctx_state)?;
590 
591                 // rewrite the between into the two binary operators
592                 let binary_expr = binary(
593                     binary(value_expr.clone(), Operator::GtEq, low_expr, input_schema)?,
594                     Operator::And,
595                     binary(value_expr.clone(), Operator::LtEq, high_expr, input_schema)?,
596                     input_schema,
597                 );
598 
599                 if *negated {
600                     expressions::not(binary_expr?, input_schema)
601                 } else {
602                     binary_expr
603                 }
604             }
605             Expr::InList {
606                 expr,
607                 list,
608                 negated,
609             } => match expr.as_ref() {
610                 Expr::Literal(ScalarValue::Utf8(None)) => {
611                     Ok(expressions::lit(ScalarValue::Boolean(None)))
612                 }
613                 _ => {
614                     let value_expr =
615                         self.create_physical_expr(expr, input_schema, ctx_state)?;
616                     let value_expr_data_type = value_expr.data_type(input_schema)?;
617 
618                     let list_exprs =
619                         list.iter()
620                             .map(|expr| match expr {
621                                 Expr::Literal(ScalarValue::Utf8(None)) => self
622                                     .create_physical_expr(expr, input_schema, ctx_state),
623                                 _ => {
624                                     let list_expr = self.create_physical_expr(
625                                         expr,
626                                         input_schema,
627                                         ctx_state,
628                                     )?;
629                                     let list_expr_data_type =
630                                         list_expr.data_type(input_schema)?;
631 
632                                     if list_expr_data_type == value_expr_data_type {
633                                         Ok(list_expr)
634                                     } else if can_cast_types(
635                                         &list_expr_data_type,
636                                         &value_expr_data_type,
637                                     ) {
638                                         expressions::cast(
639                                             list_expr,
640                                             input_schema,
641                                             value_expr.data_type(input_schema)?,
642                                         )
643                                     } else {
644                                         Err(DataFusionError::Plan(format!(
645                                             "Unsupported CAST from {:?} to {:?}",
646                                             list_expr_data_type, value_expr_data_type
647                                         )))
648                                     }
649                                 }
650                             })
651                             .collect::<Result<Vec<_>>>()?;
652 
653                     expressions::in_list(value_expr, list_exprs, negated)
654                 }
655             },
656             other => Err(DataFusionError::NotImplemented(format!(
657                 "Physical plan does not support logical expression {:?}",
658                 other
659             ))),
660         }
661     }
662 
663     /// Create an aggregate expression from a logical expression
create_aggregate_expr( &self, e: &Expr, logical_input_schema: &DFSchema, physical_input_schema: &Schema, ctx_state: &ExecutionContextState, ) -> Result<Arc<dyn AggregateExpr>>664     pub fn create_aggregate_expr(
665         &self,
666         e: &Expr,
667         logical_input_schema: &DFSchema,
668         physical_input_schema: &Schema,
669         ctx_state: &ExecutionContextState,
670     ) -> Result<Arc<dyn AggregateExpr>> {
671         // unpack aliased logical expressions, e.g. "sum(col) as total"
672         let (name, e) = match e {
673             Expr::Alias(sub_expr, alias) => (alias.clone(), sub_expr.as_ref()),
674             _ => (e.name(logical_input_schema)?, e),
675         };
676 
677         match e {
678             Expr::AggregateFunction {
679                 fun,
680                 distinct,
681                 args,
682                 ..
683             } => {
684                 let args = args
685                     .iter()
686                     .map(|e| {
687                         self.create_physical_expr(e, physical_input_schema, ctx_state)
688                     })
689                     .collect::<Result<Vec<_>>>()?;
690                 aggregates::create_aggregate_expr(
691                     fun,
692                     *distinct,
693                     &args,
694                     physical_input_schema,
695                     name,
696                 )
697             }
698             Expr::AggregateUDF { fun, args, .. } => {
699                 let args = args
700                     .iter()
701                     .map(|e| {
702                         self.create_physical_expr(e, physical_input_schema, ctx_state)
703                     })
704                     .collect::<Result<Vec<_>>>()?;
705 
706                 udaf::create_aggregate_expr(fun, &args, physical_input_schema, name)
707             }
708             other => Err(DataFusionError::Internal(format!(
709                 "Invalid aggregate expression '{:?}'",
710                 other
711             ))),
712         }
713     }
714 
715     /// Create an aggregate expression from a logical expression
create_physical_sort_expr( &self, e: &Expr, input_schema: &Schema, options: SortOptions, ctx_state: &ExecutionContextState, ) -> Result<PhysicalSortExpr>716     pub fn create_physical_sort_expr(
717         &self,
718         e: &Expr,
719         input_schema: &Schema,
720         options: SortOptions,
721         ctx_state: &ExecutionContextState,
722     ) -> Result<PhysicalSortExpr> {
723         Ok(PhysicalSortExpr {
724             expr: self.create_physical_expr(e, input_schema, ctx_state)?,
725             options,
726         })
727     }
728 }
729 
tuple_err<T, R>(value: (Result<T>, Result<R>)) -> Result<(T, R)>730 fn tuple_err<T, R>(value: (Result<T>, Result<R>)) -> Result<(T, R)> {
731     match value {
732         (Ok(e), Ok(e1)) => Ok((e, e1)),
733         (Err(e), Ok(_)) => Err(e),
734         (Ok(_), Err(e1)) => Err(e1),
735         (Err(e), Err(_)) => Err(e),
736     }
737 }
738 
#[cfg(test)]
mod tests {
    use super::*;
    use crate::logical_plan::{DFField, DFSchema, DFSchemaRef};
    use crate::physical_plan::{csv::CsvReadOptions, expressions, Partitioning};
    use crate::prelude::ExecutionConfig;
    use crate::scalar::ScalarValue;
    use crate::{
        logical_plan::{col, lit, sum, LogicalPlanBuilder},
        physical_plan::SendableRecordBatchStream,
    };
    use arrow::datatypes::{DataType, Field, SchemaRef};
    use async_trait::async_trait;
    use fmt::Debug;
    use std::{any::Any, collections::HashMap, fmt};

    /// Build an execution context with no registered tables, functions or
    /// variable providers, using the default execution configuration.
    fn make_ctx_state() -> ExecutionContextState {
        ExecutionContextState {
            datasources: HashMap::new(),
            scalar_functions: HashMap::new(),
            var_provider: HashMap::new(),
            aggregate_functions: HashMap::new(),
            config: ExecutionConfig::new(),
        }
    }

    /// Convert `logical_plan` into an execution plan using a default physical
    /// planner and an empty execution context.
    fn plan(logical_plan: &LogicalPlan) -> Result<Arc<dyn ExecutionPlan>> {
        let planner = DefaultPhysicalPlanner::default();
        let ctx_state = make_ctx_state();
        planner.create_physical_plan(logical_plan, &ctx_state)
    }

    /// Location of the `aggregate_test_100.csv` fixture inside the arrow test
    /// data directory.
    fn aggregate_test_csv_path() -> String {
        let testdata = arrow::util::test_util::arrow_test_data();
        format!("{}/csv/aggregate_test_100.csv", testdata)
    }

    /// Assert that planning failed with an error whose message contains
    /// `expected_error`; panics if planning unexpectedly succeeded.
    fn assert_planning_error(
        result: Result<Arc<dyn ExecutionPlan>>,
        expected_error: &str,
    ) {
        let err = match result {
            Ok(_) => panic!("Expected planning failure"),
            Err(err) => err,
        };
        let message = err.to_string();
        assert!(
            message.contains(expected_error),
            "Error '{}' did not contain expected error '{}'",
            message,
            expected_error
        );
    }

    #[test]
    fn test_all_operators() -> Result<()> {
        let path = aggregate_test_csv_path();
        let options = CsvReadOptions::new().schema_infer_max_records(100);

        let logical_plan = LogicalPlanBuilder::scan_csv(&path, options, None)?
            // filter clause needs the type coercion rule applied
            .filter(col("c7").lt(lit(5_u8)))?
            .project(&[col("c1"), col("c2")])?
            .aggregate(&[col("c1")], &[sum(col("c2"))])?
            .sort(&[col("c1").sort(true, true)])?
            .limit(10)?
            .build()?;

        // the u8 literal must be cast to Int64 so it can be compared with c7
        let expected = "BinaryExpr { left: Column { name: \"c7\" }, op: Lt, right: CastExpr { expr: Literal { value: UInt8(5) }, cast_type: Int64 } }";
        let rendered = format!("{:?}", plan(&logical_plan)?);
        assert!(rendered.contains(expected));

        Ok(())
    }

    #[test]
    fn test_create_not() -> Result<()> {
        let schema = Schema::new(vec![Field::new("a", DataType::Boolean, true)]);
        let ctx_state = make_ctx_state();

        let actual = DefaultPhysicalPlanner::default().create_physical_expr(
            &col("a").not(),
            &schema,
            &ctx_state,
        )?;
        let expected = expressions::not(expressions::col("a"), &schema)?;

        assert_eq!(format!("{:?}", actual), format!("{:?}", expected));
        Ok(())
    }

    #[test]
    fn test_with_csv_plan() -> Result<()> {
        let path = aggregate_test_csv_path();
        let options = CsvReadOptions::new().schema_infer_max_records(100);

        let logical_plan = LogicalPlanBuilder::scan_csv(&path, options, None)?
            .filter(col("c7").lt(col("c12")))?
            .build()?;

        // c12 is Float64 while c7 is an integer column, so c7 is cast to Float64
        let expected = "predicate: BinaryExpr { left: CastExpr { expr: Column { name: \"c7\" }, cast_type: Float64 }, op: Lt, right: Column { name: \"c12\" } }";
        let rendered = format!("{:?}", plan(&logical_plan)?);
        assert!(rendered.contains(expected));
        Ok(())
    }

    #[test]
    fn errors() -> Result<()> {
        let path = aggregate_test_csv_path();
        let options = CsvReadOptions::new().schema_infer_max_records(100);

        let bool_expr = col("c1").eq(col("c1"));
        let impossible_coercions = vec![
            // utf8 < int
            col("c1").lt(col("c2")),
            // utf8 AND utf8
            col("c1").and(col("c1")),
            // int AND int
            col("c3").and(col("c3")),
            // utf8 = int
            col("c1").eq(col("c2")),
            // utf8 = bool
            col("c1").eq(bool_expr.clone()),
            // int AND bool
            col("c2").and(bool_expr),
            // utf8 LIKE int
            col("c1").like(col("c2")),
        ];

        for expr in impossible_coercions {
            let projected = LogicalPlanBuilder::scan_csv(&path, options, None)?
                .project(&[expr.clone()]);
            assert!(
                projected.is_err(),
                "Expression {:?} expected to error due to impossible coercion",
                expr
            );
        }
        Ok(())
    }

    #[test]
    fn default_extension_planner() {
        let ctx_state = make_ctx_state();
        let logical_plan = LogicalPlan::Extension {
            node: Arc::new(NoOpExtensionNode::default()),
        };

        let result = DefaultPhysicalPlanner::default()
            .create_physical_plan(&logical_plan, &ctx_state);

        assert_planning_error(
            result,
            "No installed planner was able to convert the custom node to an execution plan: NoOp",
        );
    }

    #[test]
    fn bad_extension_planner() {
        // An extension planner whose ExecutionPlan schema disagrees with the
        // logical node's schema must be rejected during planning.
        let planner = DefaultPhysicalPlanner::with_extension_planners(vec![Arc::new(
            BadExtensionPlanner {},
        )]);
        let logical_plan = LogicalPlan::Extension {
            node: Arc::new(NoOpExtensionNode::default()),
        };
        let ctx_state = make_ctx_state();

        let result = planner.create_physical_plan(&logical_plan, &ctx_state);

        let expected_error: &str = "Error during planning: \
        Extension planner for NoOp created an ExecutionPlan with mismatched schema. \
        LogicalPlan schema: DFSchema { fields: [\
            DFField { qualifier: None, field: Field { \
                name: \"a\", \
                data_type: Int32, \
                nullable: false, \
                dict_id: 0, \
                dict_is_ordered: false, \
                metadata: None } }\
        ] }, \
        ExecutionPlan schema: Schema { fields: [\
            Field { \
                name: \"b\", \
                data_type: Int32, \
                nullable: false, \
                dict_id: 0, \
                dict_is_ordered: false, \
                metadata: None }\
        ], metadata: {} }";
        assert_planning_error(result, expected_error);
    }

    #[test]
    fn in_list_types() -> Result<()> {
        let path = aggregate_test_csv_path();
        let options = CsvReadOptions::new().schema_infer_max_records(100);

        // expression: "a in ('a', 1)"
        let castable_list = vec![
            Expr::Literal(ScalarValue::Utf8(Some("a".to_string()))),
            Expr::Literal(ScalarValue::Int64(Some(1))),
        ];
        let logical_plan = LogicalPlanBuilder::scan_csv(&path, options, None)?
            // filter clause needs the type coercion rule applied
            .filter(col("c12").lt(lit(0.05)))?
            .project(&[col("c1").in_list(castable_list, false)])?
            .build()?;
        // the Int64(1) list element must be cast to Utf8 to match c1
        let expected = "InListExpr { expr: Column { name: \"c1\" }, list: [Literal { value: Utf8(\"a\") }, CastExpr { expr: Literal { value: Int64(1) }, cast_type: Utf8 }], negated: false }";
        let rendered = format!("{:?}", plan(&logical_plan)?);
        assert!(rendered.contains(expected));

        // expression: "(c12 <= 0.025) in (true, 'a')"
        let uncastable_list = vec![
            Expr::Literal(ScalarValue::Boolean(Some(true))),
            Expr::Literal(ScalarValue::Utf8(Some("a".to_string()))),
        ];
        let logical_plan = LogicalPlanBuilder::scan_csv(&path, options, None)?
            // filter clause needs the type coercion rule applied
            .filter(col("c12").lt(lit(0.05)))?
            .project(&[col("c12").lt_eq(lit(0.025)).in_list(uncastable_list, false)])?
            .build()?;

        assert_planning_error(
            plan(&logical_plan),
            "Unsupported CAST from Utf8 to Boolean",
        );

        Ok(())
    }

    #[test]
    fn hash_agg_input_schema() -> Result<()> {
        let path = aggregate_test_csv_path();
        let options = CsvReadOptions::new().schema_infer_max_records(100);

        let logical_plan = LogicalPlanBuilder::scan_csv(&path, options, None)?
            .aggregate(&[col("c1")], &[sum(col("c2"))])?
            .build()?;

        let physical = plan(&logical_plan)?;
        let final_hash_agg = physical
            .as_any()
            .downcast_ref::<HashAggregateExec>()
            .expect("hash aggregate");
        assert_eq!("SUM(c2)", final_hash_agg.schema().field(1).name());
        // the input of the partial aggregate must stay reachable so that other
        // projects can implement serde
        assert_eq!("c2", final_hash_agg.input_schema().field(1).name());

        Ok(())
    }

    /// A logical extension node with a fixed schema (`a: Int32`, non-nullable),
    /// no inputs and no expressions; it only exercises extension planning.
    struct NoOpExtensionNode {
        schema: DFSchemaRef,
    }

    impl Default for NoOpExtensionNode {
        fn default() -> Self {
            let fields = vec![DFField::new(None, "a", DataType::Int32, false)];
            let schema = DFSchema::new(fields).unwrap();
            Self {
                schema: DFSchemaRef::new(schema),
            }
        }
    }

    impl Debug for NoOpExtensionNode {
        fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
            f.write_str("NoOp")
        }
    }

    impl UserDefinedLogicalNode for NoOpExtensionNode {
        fn as_any(&self) -> &dyn Any {
            self
        }

        fn inputs(&self) -> Vec<&LogicalPlan> {
            Vec::new()
        }

        fn schema(&self) -> &DFSchemaRef {
            &self.schema
        }

        fn expressions(&self) -> Vec<Expr> {
            Vec::new()
        }

        fn fmt_for_explain(&self, f: &mut fmt::Formatter) -> fmt::Result {
            f.write_str("NoOp")
        }

        fn from_template(
            &self,
            _exprs: &[Expr],
            _inputs: &[LogicalPlan],
        ) -> Arc<dyn UserDefinedLogicalNode + Send + Sync> {
            unimplemented!("NoOp");
        }
    }

    /// An execution plan with a caller-chosen schema that cannot actually run.
    #[derive(Debug)]
    struct NoOpExecutionPlan {
        schema: SchemaRef,
    }

    #[async_trait]
    impl ExecutionPlan for NoOpExecutionPlan {
        /// Return a reference to Any that can be used for downcasting
        fn as_any(&self) -> &dyn Any {
            self
        }

        fn schema(&self) -> SchemaRef {
            Arc::clone(&self.schema)
        }

        fn output_partitioning(&self) -> Partitioning {
            Partitioning::UnknownPartitioning(1)
        }

        fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
            Vec::new()
        }

        fn with_new_children(
            &self,
            _children: Vec<Arc<dyn ExecutionPlan>>,
        ) -> Result<Arc<dyn ExecutionPlan>> {
            unimplemented!("NoOpExecutionPlan::with_new_children");
        }

        async fn execute(&self, _partition: usize) -> Result<SendableRecordBatchStream> {
            unimplemented!("NoOpExecutionPlan::execute");
        }
    }

    /// An extension planner that always produces a `NoOpExecutionPlan` whose
    /// schema (`b: Int32`) deliberately mismatches the logical node's schema.
    struct BadExtensionPlanner {}

    impl ExtensionPlanner for BadExtensionPlanner {
        /// Create a physical plan for an extension node
        fn plan_extension(
            &self,
            _node: &dyn UserDefinedLogicalNode,
            _inputs: &[Arc<dyn ExecutionPlan>],
            _ctx_state: &ExecutionContextState,
        ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
            let mismatched = Schema::new(vec![Field::new("b", DataType::Int32, false)]);
            Ok(Some(Arc::new(NoOpExecutionPlan {
                schema: SchemaRef::new(mismatched),
            })))
        }
    }
}
1109