1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied. See the License for the
15 // specific language governing permissions and limitations
16 // under the License.
17
18 //! Physical query planner
19
20 use std::sync::Arc;
21
22 use super::{aggregates, empty::EmptyExec, expressions::binary, functions, udaf};
23 use crate::error::{DataFusionError, Result};
24 use crate::execution::context::ExecutionContextState;
25 use crate::logical_plan::{
26 DFSchema, Expr, LogicalPlan, Operator, Partitioning as LogicalPartitioning, PlanType,
27 StringifiedPlan, UserDefinedLogicalNode,
28 };
29 use crate::physical_plan::coalesce_batches::CoalesceBatchesExec;
30 use crate::physical_plan::explain::ExplainExec;
31 use crate::physical_plan::expressions::{CaseExpr, Column, Literal, PhysicalSortExpr};
32 use crate::physical_plan::filter::FilterExec;
33 use crate::physical_plan::hash_aggregate::{AggregateMode, HashAggregateExec};
34 use crate::physical_plan::hash_join::HashJoinExec;
35 use crate::physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
36 use crate::physical_plan::merge::MergeExec;
37 use crate::physical_plan::projection::ProjectionExec;
38 use crate::physical_plan::repartition::RepartitionExec;
39 use crate::physical_plan::sort::SortExec;
40 use crate::physical_plan::udf;
41 use crate::physical_plan::{expressions, Distribution};
42 use crate::physical_plan::{hash_utils, Partitioning};
43 use crate::physical_plan::{AggregateExpr, ExecutionPlan, PhysicalExpr, PhysicalPlanner};
44 use crate::prelude::JoinType;
45 use crate::scalar::ScalarValue;
46 use crate::variable::VarType;
47 use arrow::compute::can_cast_types;
48
49 use arrow::compute::SortOptions;
50 use arrow::datatypes::{Schema, SchemaRef};
51 use expressions::col;
52
/// This trait exposes the ability to plan an [`ExecutionPlan`] out of a [`LogicalPlan`].
///
/// [`DefaultPhysicalPlanner`] consults its registered `ExtensionPlanner`s, in order,
/// whenever it has to plan a [`LogicalPlan::Extension`] node.
pub trait ExtensionPlanner {
    /// Create a physical plan for a [`UserDefinedLogicalNode`].
    ///
    /// `inputs` are the physical plans the caller has already created for the inputs
    /// of `node`.
    ///
    /// The return value distinguishes three outcomes:
    /// * `Ok(Some(plan))`: the planner knows how to plan `node` and succeeded.
    /// * `Ok(None)`: the planner does not know how to plan `node` and wants to
    ///   delegate the planning to another [`ExtensionPlanner`].
    /// * `Err(_)`: the planner knows how to plan the concrete implementation of
    ///   `node` but failed while doing so.
    fn plan_extension(
        &self,
        node: &dyn UserDefinedLogicalNode,
        inputs: &[Arc<dyn ExecutionPlan>],
        ctx_state: &ExecutionContextState,
    ) -> Result<Option<Arc<dyn ExecutionPlan>>>;
}
66
/// Default single node physical query planner that converts a
/// `LogicalPlan` to an `ExecutionPlan` suitable for execution.
pub struct DefaultPhysicalPlanner {
    /// Planners tried, in order, for [`LogicalPlan::Extension`] nodes; the first
    /// one that returns a plan is used.
    extension_planners: Vec<Arc<dyn ExtensionPlanner + Send + Sync>>,
}
72
73 impl Default for DefaultPhysicalPlanner {
default() -> Self74 fn default() -> Self {
75 Self {
76 extension_planners: vec![],
77 }
78 }
79 }
80
81 impl PhysicalPlanner for DefaultPhysicalPlanner {
82 /// Create a physical plan from a logical plan
create_physical_plan( &self, logical_plan: &LogicalPlan, ctx_state: &ExecutionContextState, ) -> Result<Arc<dyn ExecutionPlan>>83 fn create_physical_plan(
84 &self,
85 logical_plan: &LogicalPlan,
86 ctx_state: &ExecutionContextState,
87 ) -> Result<Arc<dyn ExecutionPlan>> {
88 let plan = self.create_initial_plan(logical_plan, ctx_state)?;
89 self.optimize_plan(plan, ctx_state)
90 }
91 }
92
93 impl DefaultPhysicalPlanner {
94 /// Create a physical planner that uses `extension_planners` to
95 /// plan user-defined logical nodes [`LogicalPlan::Extension`].
96 /// The planner uses the first [`ExtensionPlanner`] to return a non-`None`
97 /// plan.
with_extension_planners( extension_planners: Vec<Arc<dyn ExtensionPlanner + Send + Sync>>, ) -> Self98 pub fn with_extension_planners(
99 extension_planners: Vec<Arc<dyn ExtensionPlanner + Send + Sync>>,
100 ) -> Self {
101 Self { extension_planners }
102 }
103
/// Post-process an already created physical plan.
///
/// Leaf nodes (no children) are returned unchanged. For any other node:
/// * `FilterExec`, `HashJoinExec` and `RepartitionExec` are wrapped in a
///   `CoalesceBatchesExec` whose target batch size is half the configured batch size;
/// * when the resulting node requires `Distribution::SinglePartition`, every child
///   whose output partition count is not 1 is wrapped in a `MergeExec`.
///
/// Errors from optimizing a child or from `with_new_children` are propagated.
fn optimize_plan(
    &self,
    plan: Arc<dyn ExecutionPlan>,
    ctx_state: &ExecutionContextState,
) -> Result<Arc<dyn ExecutionPlan>> {
    // Recursively optimize every child first; any failure aborts the whole call.
    let children = plan
        .children()
        .iter()
        .map(|child| self.optimize_plan(child.clone(), ctx_state))
        .collect::<Result<Vec<_>>>()?;

    if children.is_empty() {
        // leaf node, children cannot be replaced
        Ok(plan.clone())
    } else {
        // wrap operators in CoalesceBatches to avoid lots of tiny batches when we have
        // highly selective filters
        let plan_any = plan.as_any();
        //TODO we should do this in a more generic way either by wrapping all operators
        // or having an API so that operators can declare when their inputs or outputs
        // need to be wrapped in a coalesce batches operator.
        // See https://issues.apache.org/jira/browse/ARROW-11068
        let wrap_in_coalesce = plan_any.downcast_ref::<FilterExec>().is_some()
            || plan_any.downcast_ref::<HashJoinExec>().is_some()
            || plan_any.downcast_ref::<RepartitionExec>().is_some();

        //TODO we should also do this for HashAggregateExec but we need to update tests
        // as part of this work - see https://issues.apache.org/jira/browse/ARROW-11068
        // || plan_any.downcast_ref::<HashAggregateExec>().is_some();

        let plan = if wrap_in_coalesce {
            //TODO we should add specific configuration settings for coalescing batches and
            // we should do that once https://issues.apache.org/jira/browse/ARROW-11059 is
            // implemented. For now, we choose half the configured batch size to avoid copies
            // when a small number of rows are removed from a batch
            let target_batch_size = ctx_state.config.batch_size / 2;
            Arc::new(CoalesceBatchesExec::new(plan.clone(), target_batch_size))
        } else {
            plan.clone()
        };

        // NOTE(review): this binding shadows the recursively optimized `children`
        // computed at the top of the function, which are therefore only used for the
        // emptiness check. What is passed to `with_new_children` below is whatever the
        // (possibly coalesce-wrapped) `plan` currently reports as its children.
        // Confirm this is intended.
        let children = plan.children().clone();

        match plan.required_child_distribution() {
            Distribution::UnspecifiedDistribution => plan.with_new_children(children),
            // Children that already produce exactly one partition are kept as-is;
            // all others are merged into a single partition first.
            Distribution::SinglePartition => plan.with_new_children(
                children
                    .iter()
                    .map(|child| {
                        if child.output_partitioning().partition_count() == 1 {
                            child.clone()
                        } else {
                            Arc::new(MergeExec::new(child.clone()))
                        }
                    })
                    .collect(),
            ),
        }
    }
}
165
166 /// Create a physical plan from a logical plan
create_initial_plan( &self, logical_plan: &LogicalPlan, ctx_state: &ExecutionContextState, ) -> Result<Arc<dyn ExecutionPlan>>167 fn create_initial_plan(
168 &self,
169 logical_plan: &LogicalPlan,
170 ctx_state: &ExecutionContextState,
171 ) -> Result<Arc<dyn ExecutionPlan>> {
172 let batch_size = ctx_state.config.batch_size;
173
174 match logical_plan {
175 LogicalPlan::TableScan {
176 source,
177 projection,
178 filters,
179 ..
180 } => source.scan(projection, batch_size, filters),
181 LogicalPlan::Aggregate {
182 input,
183 group_expr,
184 aggr_expr,
185 ..
186 } => {
187 // Initially need to perform the aggregate and then merge the partitions
188 let input_exec = self.create_physical_plan(input, ctx_state)?;
189 let input_schema = input_exec.schema();
190 let physical_input_schema = input_exec.as_ref().schema();
191 let logical_input_schema = input.as_ref().schema();
192
193 let groups = group_expr
194 .iter()
195 .map(|e| {
196 tuple_err((
197 self.create_physical_expr(
198 e,
199 &physical_input_schema,
200 ctx_state,
201 ),
202 e.name(&logical_input_schema),
203 ))
204 })
205 .collect::<Result<Vec<_>>>()?;
206 let aggregates = aggr_expr
207 .iter()
208 .map(|e| {
209 self.create_aggregate_expr(
210 e,
211 &logical_input_schema,
212 &physical_input_schema,
213 ctx_state,
214 )
215 })
216 .collect::<Result<Vec<_>>>()?;
217
218 let initial_aggr = Arc::new(HashAggregateExec::try_new(
219 AggregateMode::Partial,
220 groups.clone(),
221 aggregates.clone(),
222 input_exec,
223 input_schema.clone(),
224 )?);
225
226 let final_group: Vec<Arc<dyn PhysicalExpr>> =
227 (0..groups.len()).map(|i| col(&groups[i].1)).collect();
228
229 // construct a second aggregation, keeping the final column name equal to the first aggregation
230 // and the expressions corresponding to the respective aggregate
231 Ok(Arc::new(HashAggregateExec::try_new(
232 AggregateMode::Final,
233 final_group
234 .iter()
235 .enumerate()
236 .map(|(i, expr)| (expr.clone(), groups[i].1.clone()))
237 .collect(),
238 aggregates,
239 initial_aggr,
240 input_schema,
241 )?))
242 }
243 LogicalPlan::Projection { input, expr, .. } => {
244 let input_exec = self.create_physical_plan(input, ctx_state)?;
245 let input_schema = input.as_ref().schema();
246 let runtime_expr = expr
247 .iter()
248 .map(|e| {
249 tuple_err((
250 self.create_physical_expr(
251 e,
252 &input_exec.schema(),
253 &ctx_state,
254 ),
255 e.name(&input_schema),
256 ))
257 })
258 .collect::<Result<Vec<_>>>()?;
259 Ok(Arc::new(ProjectionExec::try_new(runtime_expr, input_exec)?))
260 }
261 LogicalPlan::Filter {
262 input, predicate, ..
263 } => {
264 let input = self.create_physical_plan(input, ctx_state)?;
265 let input_schema = input.as_ref().schema();
266 let runtime_expr =
267 self.create_physical_expr(predicate, &input_schema, ctx_state)?;
268 Ok(Arc::new(FilterExec::try_new(runtime_expr, input)?))
269 }
270 LogicalPlan::Repartition {
271 input,
272 partitioning_scheme,
273 } => {
274 let input = self.create_physical_plan(input, ctx_state)?;
275 let input_schema = input.schema();
276 let physical_partitioning = match partitioning_scheme {
277 LogicalPartitioning::RoundRobinBatch(n) => {
278 Partitioning::RoundRobinBatch(*n)
279 }
280 LogicalPartitioning::Hash(expr, n) => {
281 let runtime_expr = expr
282 .iter()
283 .map(|e| {
284 self.create_physical_expr(e, &input_schema, &ctx_state)
285 })
286 .collect::<Result<Vec<_>>>()?;
287 Partitioning::Hash(runtime_expr, *n)
288 }
289 };
290 Ok(Arc::new(RepartitionExec::try_new(
291 input,
292 physical_partitioning,
293 )?))
294 }
295 LogicalPlan::Sort { expr, input, .. } => {
296 let input = self.create_physical_plan(input, ctx_state)?;
297 let input_schema = input.as_ref().schema();
298
299 let sort_expr = expr
300 .iter()
301 .map(|e| match e {
302 Expr::Sort {
303 expr,
304 asc,
305 nulls_first,
306 } => self.create_physical_sort_expr(
307 expr,
308 &input_schema,
309 SortOptions {
310 descending: !*asc,
311 nulls_first: *nulls_first,
312 },
313 ctx_state,
314 ),
315 _ => Err(DataFusionError::Plan(
316 "Sort only accepts sort expressions".to_string(),
317 )),
318 })
319 .collect::<Result<Vec<_>>>()?;
320
321 Ok(Arc::new(SortExec::try_new(sort_expr, input)?))
322 }
323 LogicalPlan::Join {
324 left,
325 right,
326 on: keys,
327 join_type,
328 ..
329 } => {
330 let left = self.create_physical_plan(left, ctx_state)?;
331 let right = self.create_physical_plan(right, ctx_state)?;
332 let physical_join_type = match join_type {
333 JoinType::Inner => hash_utils::JoinType::Inner,
334 JoinType::Left => hash_utils::JoinType::Left,
335 JoinType::Right => hash_utils::JoinType::Right,
336 };
337
338 Ok(Arc::new(HashJoinExec::try_new(
339 left,
340 right,
341 &keys,
342 &physical_join_type,
343 )?))
344 }
345 LogicalPlan::EmptyRelation {
346 produce_one_row,
347 schema,
348 } => Ok(Arc::new(EmptyExec::new(
349 *produce_one_row,
350 SchemaRef::new(schema.as_ref().to_owned().into()),
351 ))),
352 LogicalPlan::Limit { input, n, .. } => {
353 let limit = *n;
354 let input = self.create_physical_plan(input, ctx_state)?;
355
356 // GlobalLimitExec requires a single partition for input
357 let input = if input.output_partitioning().partition_count() == 1 {
358 input
359 } else {
360 // Apply a LocalLimitExec to each partition. The optimizer will also insert
361 // a MergeExec between the GlobalLimitExec and LocalLimitExec
362 Arc::new(LocalLimitExec::new(input, limit))
363 };
364
365 Ok(Arc::new(GlobalLimitExec::new(input, limit)))
366 }
367 LogicalPlan::CreateExternalTable { .. } => {
368 // There is no default plan for "CREATE EXTERNAL
369 // TABLE" -- it must be handled at a higher level (so
370 // that the appropriate table can be registered with
371 // the context)
372 Err(DataFusionError::Internal(
373 "Unsupported logical plan: CreateExternalTable".to_string(),
374 ))
375 }
376 LogicalPlan::Explain {
377 verbose,
378 plan,
379 stringified_plans,
380 schema,
381 } => {
382 let input = self.create_physical_plan(plan, ctx_state)?;
383
384 let mut stringified_plans = stringified_plans
385 .iter()
386 .filter(|s| s.should_display(*verbose))
387 .cloned()
388 .collect::<Vec<_>>();
389
390 // add in the physical plan if requested
391 if *verbose {
392 stringified_plans.push(StringifiedPlan::new(
393 PlanType::PhysicalPlan,
394 format!("{:#?}", input),
395 ));
396 }
397 Ok(Arc::new(ExplainExec::new(
398 SchemaRef::new(schema.as_ref().to_owned().into()),
399 stringified_plans,
400 )))
401 }
402 LogicalPlan::Extension { node } => {
403 let inputs = node
404 .inputs()
405 .into_iter()
406 .map(|input_plan| self.create_physical_plan(input_plan, ctx_state))
407 .collect::<Result<Vec<_>>>()?;
408
409 let maybe_plan = self.extension_planners.iter().try_fold(
410 None,
411 |maybe_plan, planner| {
412 if let Some(plan) = maybe_plan {
413 Ok(Some(plan))
414 } else {
415 planner.plan_extension(node.as_ref(), &inputs, ctx_state)
416 }
417 },
418 )?;
419 let plan = maybe_plan.ok_or_else(|| DataFusionError::Plan(format!(
420 "No installed planner was able to convert the custom node to an execution plan: {:?}", node
421 )))?;
422
423 // Ensure the ExecutionPlan's schema matches the
424 // declared logical schema to catch and warn about
425 // logic errors when creating user defined plans.
426 if plan.schema() != node.schema().as_ref().to_owned().into() {
427 Err(DataFusionError::Plan(format!(
428 "Extension planner for {:?} created an ExecutionPlan with mismatched schema. \
429 LogicalPlan schema: {:?}, ExecutionPlan schema: {:?}",
430 node, node.schema(), plan.schema()
431 )))
432 } else {
433 Ok(plan)
434 }
435 }
436 }
437 }
438
439 /// Create a physical expression from a logical expression
create_physical_expr( &self, e: &Expr, input_schema: &Schema, ctx_state: &ExecutionContextState, ) -> Result<Arc<dyn PhysicalExpr>>440 pub fn create_physical_expr(
441 &self,
442 e: &Expr,
443 input_schema: &Schema,
444 ctx_state: &ExecutionContextState,
445 ) -> Result<Arc<dyn PhysicalExpr>> {
446 match e {
447 Expr::Alias(expr, ..) => {
448 Ok(self.create_physical_expr(expr, input_schema, ctx_state)?)
449 }
450 Expr::Column(name) => {
451 // check that name exists
452 input_schema.field_with_name(&name)?;
453 Ok(Arc::new(Column::new(name)))
454 }
455 Expr::Literal(value) => Ok(Arc::new(Literal::new(value.clone()))),
456 Expr::ScalarVariable(variable_names) => {
457 if &variable_names[0][0..2] == "@@" {
458 match ctx_state.var_provider.get(&VarType::System) {
459 Some(provider) => {
460 let scalar_value =
461 provider.get_value(variable_names.clone())?;
462 Ok(Arc::new(Literal::new(scalar_value)))
463 }
464 _ => Err(DataFusionError::Plan(
465 "No system variable provider found".to_string(),
466 )),
467 }
468 } else {
469 match ctx_state.var_provider.get(&VarType::UserDefined) {
470 Some(provider) => {
471 let scalar_value =
472 provider.get_value(variable_names.clone())?;
473 Ok(Arc::new(Literal::new(scalar_value)))
474 }
475 _ => Err(DataFusionError::Plan(
476 "No user defined variable provider found".to_string(),
477 )),
478 }
479 }
480 }
481 Expr::BinaryExpr { left, op, right } => {
482 let lhs = self.create_physical_expr(left, input_schema, ctx_state)?;
483 let rhs = self.create_physical_expr(right, input_schema, ctx_state)?;
484 binary(lhs, *op, rhs, input_schema)
485 }
486 Expr::Case {
487 expr,
488 when_then_expr,
489 else_expr,
490 ..
491 } => {
492 let expr: Option<Arc<dyn PhysicalExpr>> = if let Some(e) = expr {
493 Some(self.create_physical_expr(
494 e.as_ref(),
495 input_schema,
496 ctx_state,
497 )?)
498 } else {
499 None
500 };
501 let when_expr = when_then_expr
502 .iter()
503 .map(|(w, _)| {
504 self.create_physical_expr(w.as_ref(), input_schema, ctx_state)
505 })
506 .collect::<Result<Vec<_>>>()?;
507 let then_expr = when_then_expr
508 .iter()
509 .map(|(_, t)| {
510 self.create_physical_expr(t.as_ref(), input_schema, ctx_state)
511 })
512 .collect::<Result<Vec<_>>>()?;
513 let when_then_expr: Vec<(Arc<dyn PhysicalExpr>, Arc<dyn PhysicalExpr>)> =
514 when_expr
515 .iter()
516 .zip(then_expr.iter())
517 .map(|(w, t)| (w.clone(), t.clone()))
518 .collect();
519 let else_expr: Option<Arc<dyn PhysicalExpr>> = if let Some(e) = else_expr
520 {
521 Some(self.create_physical_expr(
522 e.as_ref(),
523 input_schema,
524 ctx_state,
525 )?)
526 } else {
527 None
528 };
529 Ok(Arc::new(CaseExpr::try_new(
530 expr,
531 &when_then_expr,
532 else_expr,
533 )?))
534 }
535 Expr::Cast { expr, data_type } => expressions::cast(
536 self.create_physical_expr(expr, input_schema, ctx_state)?,
537 input_schema,
538 data_type.clone(),
539 ),
540 Expr::Not(expr) => expressions::not(
541 self.create_physical_expr(expr, input_schema, ctx_state)?,
542 input_schema,
543 ),
544 Expr::Negative(expr) => expressions::negative(
545 self.create_physical_expr(expr, input_schema, ctx_state)?,
546 input_schema,
547 ),
548 Expr::IsNull(expr) => expressions::is_null(self.create_physical_expr(
549 expr,
550 input_schema,
551 ctx_state,
552 )?),
553 Expr::IsNotNull(expr) => expressions::is_not_null(
554 self.create_physical_expr(expr, input_schema, ctx_state)?,
555 ),
556 Expr::ScalarFunction { fun, args } => {
557 let physical_args = args
558 .iter()
559 .map(|e| self.create_physical_expr(e, input_schema, ctx_state))
560 .collect::<Result<Vec<_>>>()?;
561 functions::create_physical_expr(fun, &physical_args, input_schema)
562 }
563 Expr::ScalarUDF { fun, args } => {
564 let mut physical_args = vec![];
565 for e in args {
566 physical_args.push(self.create_physical_expr(
567 e,
568 input_schema,
569 ctx_state,
570 )?);
571 }
572
573 udf::create_physical_expr(
574 fun.clone().as_ref(),
575 &physical_args,
576 input_schema,
577 )
578 }
579 Expr::Between {
580 expr,
581 negated,
582 low,
583 high,
584 } => {
585 let value_expr =
586 self.create_physical_expr(expr, input_schema, ctx_state)?;
587 let low_expr = self.create_physical_expr(low, input_schema, ctx_state)?;
588 let high_expr =
589 self.create_physical_expr(high, input_schema, ctx_state)?;
590
591 // rewrite the between into the two binary operators
592 let binary_expr = binary(
593 binary(value_expr.clone(), Operator::GtEq, low_expr, input_schema)?,
594 Operator::And,
595 binary(value_expr.clone(), Operator::LtEq, high_expr, input_schema)?,
596 input_schema,
597 );
598
599 if *negated {
600 expressions::not(binary_expr?, input_schema)
601 } else {
602 binary_expr
603 }
604 }
605 Expr::InList {
606 expr,
607 list,
608 negated,
609 } => match expr.as_ref() {
610 Expr::Literal(ScalarValue::Utf8(None)) => {
611 Ok(expressions::lit(ScalarValue::Boolean(None)))
612 }
613 _ => {
614 let value_expr =
615 self.create_physical_expr(expr, input_schema, ctx_state)?;
616 let value_expr_data_type = value_expr.data_type(input_schema)?;
617
618 let list_exprs =
619 list.iter()
620 .map(|expr| match expr {
621 Expr::Literal(ScalarValue::Utf8(None)) => self
622 .create_physical_expr(expr, input_schema, ctx_state),
623 _ => {
624 let list_expr = self.create_physical_expr(
625 expr,
626 input_schema,
627 ctx_state,
628 )?;
629 let list_expr_data_type =
630 list_expr.data_type(input_schema)?;
631
632 if list_expr_data_type == value_expr_data_type {
633 Ok(list_expr)
634 } else if can_cast_types(
635 &list_expr_data_type,
636 &value_expr_data_type,
637 ) {
638 expressions::cast(
639 list_expr,
640 input_schema,
641 value_expr.data_type(input_schema)?,
642 )
643 } else {
644 Err(DataFusionError::Plan(format!(
645 "Unsupported CAST from {:?} to {:?}",
646 list_expr_data_type, value_expr_data_type
647 )))
648 }
649 }
650 })
651 .collect::<Result<Vec<_>>>()?;
652
653 expressions::in_list(value_expr, list_exprs, negated)
654 }
655 },
656 other => Err(DataFusionError::NotImplemented(format!(
657 "Physical plan does not support logical expression {:?}",
658 other
659 ))),
660 }
661 }
662
663 /// Create an aggregate expression from a logical expression
create_aggregate_expr( &self, e: &Expr, logical_input_schema: &DFSchema, physical_input_schema: &Schema, ctx_state: &ExecutionContextState, ) -> Result<Arc<dyn AggregateExpr>>664 pub fn create_aggregate_expr(
665 &self,
666 e: &Expr,
667 logical_input_schema: &DFSchema,
668 physical_input_schema: &Schema,
669 ctx_state: &ExecutionContextState,
670 ) -> Result<Arc<dyn AggregateExpr>> {
671 // unpack aliased logical expressions, e.g. "sum(col) as total"
672 let (name, e) = match e {
673 Expr::Alias(sub_expr, alias) => (alias.clone(), sub_expr.as_ref()),
674 _ => (e.name(logical_input_schema)?, e),
675 };
676
677 match e {
678 Expr::AggregateFunction {
679 fun,
680 distinct,
681 args,
682 ..
683 } => {
684 let args = args
685 .iter()
686 .map(|e| {
687 self.create_physical_expr(e, physical_input_schema, ctx_state)
688 })
689 .collect::<Result<Vec<_>>>()?;
690 aggregates::create_aggregate_expr(
691 fun,
692 *distinct,
693 &args,
694 physical_input_schema,
695 name,
696 )
697 }
698 Expr::AggregateUDF { fun, args, .. } => {
699 let args = args
700 .iter()
701 .map(|e| {
702 self.create_physical_expr(e, physical_input_schema, ctx_state)
703 })
704 .collect::<Result<Vec<_>>>()?;
705
706 udaf::create_aggregate_expr(fun, &args, physical_input_schema, name)
707 }
708 other => Err(DataFusionError::Internal(format!(
709 "Invalid aggregate expression '{:?}'",
710 other
711 ))),
712 }
713 }
714
715 /// Create an aggregate expression from a logical expression
create_physical_sort_expr( &self, e: &Expr, input_schema: &Schema, options: SortOptions, ctx_state: &ExecutionContextState, ) -> Result<PhysicalSortExpr>716 pub fn create_physical_sort_expr(
717 &self,
718 e: &Expr,
719 input_schema: &Schema,
720 options: SortOptions,
721 ctx_state: &ExecutionContextState,
722 ) -> Result<PhysicalSortExpr> {
723 Ok(PhysicalSortExpr {
724 expr: self.create_physical_expr(e, input_schema, ctx_state)?,
725 options,
726 })
727 }
728 }
729
tuple_err<T, R>(value: (Result<T>, Result<R>)) -> Result<(T, R)>730 fn tuple_err<T, R>(value: (Result<T>, Result<R>)) -> Result<(T, R)> {
731 match value {
732 (Ok(e), Ok(e1)) => Ok((e, e1)),
733 (Err(e), Ok(_)) => Err(e),
734 (Ok(_), Err(e1)) => Err(e1),
735 (Err(e), Err(_)) => Err(e),
736 }
737 }
738
739 #[cfg(test)]
740 mod tests {
741 use super::*;
742 use crate::logical_plan::{DFField, DFSchema, DFSchemaRef};
743 use crate::physical_plan::{csv::CsvReadOptions, expressions, Partitioning};
744 use crate::prelude::ExecutionConfig;
745 use crate::scalar::ScalarValue;
746 use crate::{
747 logical_plan::{col, lit, sum, LogicalPlanBuilder},
748 physical_plan::SendableRecordBatchStream,
749 };
750 use arrow::datatypes::{DataType, Field, SchemaRef};
751 use async_trait::async_trait;
752 use fmt::Debug;
753 use std::{any::Any, collections::HashMap, fmt};
754
make_ctx_state() -> ExecutionContextState755 fn make_ctx_state() -> ExecutionContextState {
756 ExecutionContextState {
757 datasources: HashMap::new(),
758 scalar_functions: HashMap::new(),
759 var_provider: HashMap::new(),
760 aggregate_functions: HashMap::new(),
761 config: ExecutionConfig::new(),
762 }
763 }
764
plan(logical_plan: &LogicalPlan) -> Result<Arc<dyn ExecutionPlan>>765 fn plan(logical_plan: &LogicalPlan) -> Result<Arc<dyn ExecutionPlan>> {
766 let ctx_state = make_ctx_state();
767 let planner = DefaultPhysicalPlanner::default();
768 planner.create_physical_plan(logical_plan, &ctx_state)
769 }
770
#[test]
fn test_all_operators() -> Result<()> {
    let csv_path = format!(
        "{}/csv/aggregate_test_100.csv",
        arrow::util::test_util::arrow_test_data()
    );
    let read_options = CsvReadOptions::new().schema_infer_max_records(100);

    let builder = LogicalPlanBuilder::scan_csv(&csv_path, read_options, None)?
        // filter clause needs the type coercion rule applied
        .filter(col("c7").lt(lit(5_u8)))?
        .project(&[col("c1"), col("c2")])?
        .aggregate(&[col("c1")], &[sum(col("c2"))])?
        .sort(&[col("c1").sort(true, true)])?
        .limit(10)?;
    let logical_plan = builder.build()?;

    let physical_plan = plan(&logical_plan)?;
    let rendered = format!("{:?}", physical_plan);

    // verify that the plan correctly casts u8 to i64
    let expected = "BinaryExpr { left: Column { name: \"c7\" }, op: Lt, right: CastExpr { expr: Literal { value: UInt8(5) }, cast_type: Int64 } }";
    assert!(rendered.contains(expected));

    Ok(())
}
794
#[test]
fn test_create_not() -> Result<()> {
    // A single nullable boolean column named "a".
    let schema = Schema::new(vec![Field::new("a", DataType::Boolean, true)]);
    let ctx_state = make_ctx_state();

    let actual = DefaultPhysicalPlanner::default().create_physical_expr(
        &col("a").not(),
        &schema,
        &ctx_state,
    )?;
    let expected = expressions::not(expressions::col("a"), &schema)?;

    // The planned expression must render exactly like the hand-built one.
    let actual_repr = format!("{:?}", actual);
    let expected_repr = format!("{:?}", expected);
    assert_eq!(actual_repr, expected_repr);

    Ok(())
}
809
#[test]
fn test_with_csv_plan() -> Result<()> {
    let csv_path = format!(
        "{}/csv/aggregate_test_100.csv",
        arrow::util::test_util::arrow_test_data()
    );
    let read_options = CsvReadOptions::new().schema_infer_max_records(100);

    let logical_plan = LogicalPlanBuilder::scan_csv(&csv_path, read_options, None)?
        .filter(col("c7").lt(col("c12")))?
        .build()?;

    let physical_plan = plan(&logical_plan)?;
    let rendered = format!("{:?}", physical_plan);

    // c12 is f64, c7 is u8 -> cast c7 to f64
    let expected = "predicate: BinaryExpr { left: CastExpr { expr: Column { name: \"c7\" }, cast_type: Float64 }, op: Lt, right: Column { name: \"c12\" } }";
    assert!(rendered.contains(expected));
    Ok(())
}
827
#[test]
fn errors() -> Result<()> {
    let path = format!(
        "{}/csv/aggregate_test_100.csv",
        arrow::util::test_util::arrow_test_data()
    );
    let options = CsvReadOptions::new().schema_infer_max_records(100);

    let bool_expr = col("c1").eq(col("c1"));
    // Every expression below combines operand types that cannot be coerced.
    let impossible_coercions = vec![
        // utf8 < u32
        col("c1").lt(col("c2")),
        // utf8 AND utf8
        col("c1").and(col("c1")),
        // u8 AND u8
        col("c3").and(col("c3")),
        // utf8 = u32
        col("c1").eq(col("c2")),
        // utf8 = bool
        col("c1").eq(bool_expr.clone()),
        // u32 AND bool
        col("c2").and(bool_expr),
        // utf8 LIKE u32
        col("c1").like(col("c2")),
    ];

    for case in impossible_coercions {
        let projected = LogicalPlanBuilder::scan_csv(&path, options, None)?
            .project(&[case.clone()]);
        assert!(
            projected.is_err(),
            "Expression {:?} expected to error due to impossible coercion",
            case
        );
    }
    Ok(())
}
862
#[test]
fn default_extension_planner() {
    // Without any extension planner installed, planning an extension node must fail.
    let logical_plan = LogicalPlan::Extension {
        node: Arc::new(NoOpExtensionNode::default()),
    };
    let ctx_state = make_ctx_state();
    let outcome = DefaultPhysicalPlanner::default()
        .create_physical_plan(&logical_plan, &ctx_state);

    let expected_error = "No installed planner was able to convert the custom node to an execution plan: NoOp";
    let error = match outcome {
        Ok(_) => panic!("Expected planning failure"),
        Err(e) => e,
    };
    let message = error.to_string();
    assert!(
        message.contains(expected_error),
        "Error '{}' did not contain expected error '{}'",
        message,
        expected_error
    );
}
883
#[test]
fn bad_extension_planner() {
    // Test that creating an execution plan whose schema doesn't
    // match the logical plan's schema generates an error.
    let planner = DefaultPhysicalPlanner::with_extension_planners(vec![Arc::new(
        BadExtensionPlanner {},
    )]);
    let logical_plan = LogicalPlan::Extension {
        node: Arc::new(NoOpExtensionNode::default()),
    };
    let ctx_state = make_ctx_state();
    let outcome = planner.create_physical_plan(&logical_plan, &ctx_state);

    let expected_error: &str = "Error during planning: \
    Extension planner for NoOp created an ExecutionPlan with mismatched schema. \
    LogicalPlan schema: DFSchema { fields: [\
    DFField { qualifier: None, field: Field { \
    name: \"a\", \
    data_type: Int32, \
    nullable: false, \
    dict_id: 0, \
    dict_is_ordered: false, \
    metadata: None } }\
    ] }, \
    ExecutionPlan schema: Schema { fields: [\
    Field { \
    name: \"b\", \
    data_type: Int32, \
    nullable: false, \
    dict_id: 0, \
    dict_is_ordered: false, \
    metadata: None }\
    ], metadata: {} }";

    let error = match outcome {
        Ok(_) => panic!("Expected planning failure"),
        Err(e) => e,
    };
    let message = error.to_string();
    assert!(
        message.contains(expected_error),
        "Error '{}' did not contain expected error '{}'",
        message,
        expected_error
    );
}
928
#[test]
fn in_list_types() -> Result<()> {
    let path = format!(
        "{}/csv/aggregate_test_100.csv",
        arrow::util::test_util::arrow_test_data()
    );
    let options = CsvReadOptions::new().schema_infer_max_records(100);

    // expression: "a in ('a', 1)"
    let castable_list = vec![
        Expr::Literal(ScalarValue::Utf8(Some("a".to_string()))),
        Expr::Literal(ScalarValue::Int64(Some(1))),
    ];
    let logical_plan = LogicalPlanBuilder::scan_csv(&path, options, None)?
        // filter clause needs the type coercion rule applied
        .filter(col("c12").lt(lit(0.05)))?
        .project(&[col("c1").in_list(castable_list, false)])?
        .build()?;
    let rendered = format!("{:?}", plan(&logical_plan)?);
    // verify that the plan correctly adds cast from Int64(1) to Utf8
    let expected = "InListExpr { expr: Column { name: \"c1\" }, list: [Literal { value: Utf8(\"a\") }, CastExpr { expr: Literal { value: Int64(1) }, cast_type: Utf8 }], negated: false }";
    assert!(rendered.contains(expected));

    // expression: "a in (true, 'a')"
    let uncastable_list = vec![
        Expr::Literal(ScalarValue::Boolean(Some(true))),
        Expr::Literal(ScalarValue::Utf8(Some("a".to_string()))),
    ];
    let logical_plan = LogicalPlanBuilder::scan_csv(&path, options, None)?
        // filter clause needs the type coercion rule applied
        .filter(col("c12").lt(lit(0.05)))?
        .project(&[col("c12").lt_eq(lit(0.025)).in_list(uncastable_list, false)])?
        .build()?;
    let outcome = plan(&logical_plan);

    let expected_error = "Unsupported CAST from Utf8 to Boolean";
    let error = match outcome {
        Ok(_) => panic!("Expected planning failure"),
        Err(e) => e,
    };
    let message = error.to_string();
    assert!(
        message.contains(expected_error),
        "Error '{}' did not contain expected error '{}'",
        message,
        expected_error
    );

    Ok(())
}
975
#[test]
fn hash_agg_input_schema() -> Result<()> {
    let csv_path = format!(
        "{}/csv/aggregate_test_100.csv",
        arrow::util::test_util::arrow_test_data()
    );
    let read_options = CsvReadOptions::new().schema_infer_max_records(100);

    let logical_plan = LogicalPlanBuilder::scan_csv(&csv_path, read_options, None)?
        .aggregate(&[col("c1")], &[sum(col("c2"))])?
        .build()?;
    let execution_plan = plan(&logical_plan)?;

    // The root of the physical plan is expected to be the final hash aggregate.
    let final_hash_agg = execution_plan
        .as_any()
        .downcast_ref::<HashAggregateExec>()
        .expect("hash aggregate");

    let output_schema = final_hash_agg.schema();
    assert_eq!("SUM(c2)", output_schema.field(1).name());

    // we need access to the input to the partial aggregate so that other projects can
    // implement serde
    let input_schema = final_hash_agg.input_schema();
    assert_eq!("c2", input_schema.field(1).name());

    Ok(())
}
998
999 /// An example extension node that doesn't do anything
1000 struct NoOpExtensionNode {
1001 schema: DFSchemaRef,
1002 }
1003
1004 impl Default for NoOpExtensionNode {
default() -> Self1005 fn default() -> Self {
1006 Self {
1007 schema: DFSchemaRef::new(
1008 DFSchema::new(vec![DFField::new(None, "a", DataType::Int32, false)])
1009 .unwrap(),
1010 ),
1011 }
1012 }
1013 }
1014
1015 impl Debug for NoOpExtensionNode {
fmt(&self, f: &mut fmt::Formatter) -> fmt::Result1016 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1017 write!(f, "NoOp")
1018 }
1019 }
1020
1021 impl UserDefinedLogicalNode for NoOpExtensionNode {
as_any(&self) -> &dyn Any1022 fn as_any(&self) -> &dyn Any {
1023 self
1024 }
1025
inputs(&self) -> Vec<&LogicalPlan>1026 fn inputs(&self) -> Vec<&LogicalPlan> {
1027 vec![]
1028 }
1029
schema(&self) -> &DFSchemaRef1030 fn schema(&self) -> &DFSchemaRef {
1031 &self.schema
1032 }
1033
expressions(&self) -> Vec<Expr>1034 fn expressions(&self) -> Vec<Expr> {
1035 vec![]
1036 }
1037
fmt_for_explain(&self, f: &mut fmt::Formatter) -> fmt::Result1038 fn fmt_for_explain(&self, f: &mut fmt::Formatter) -> fmt::Result {
1039 write!(f, "NoOp")
1040 }
1041
from_template( &self, _exprs: &[Expr], _inputs: &[LogicalPlan], ) -> Arc<dyn UserDefinedLogicalNode + Send + Sync>1042 fn from_template(
1043 &self,
1044 _exprs: &[Expr],
1045 _inputs: &[LogicalPlan],
1046 ) -> Arc<dyn UserDefinedLogicalNode + Send + Sync> {
1047 unimplemented!("NoOp");
1048 }
1049 }
1050
1051 #[derive(Debug)]
1052 struct NoOpExecutionPlan {
1053 schema: SchemaRef,
1054 }
1055
1056 #[async_trait]
1057 impl ExecutionPlan for NoOpExecutionPlan {
1058 /// Return a reference to Any that can be used for downcasting
as_any(&self) -> &dyn Any1059 fn as_any(&self) -> &dyn Any {
1060 self
1061 }
1062
schema(&self) -> SchemaRef1063 fn schema(&self) -> SchemaRef {
1064 self.schema.clone()
1065 }
1066
output_partitioning(&self) -> Partitioning1067 fn output_partitioning(&self) -> Partitioning {
1068 Partitioning::UnknownPartitioning(1)
1069 }
1070
children(&self) -> Vec<Arc<dyn ExecutionPlan>>1071 fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
1072 vec![]
1073 }
1074
with_new_children( &self, _children: Vec<Arc<dyn ExecutionPlan>>, ) -> Result<Arc<dyn ExecutionPlan>>1075 fn with_new_children(
1076 &self,
1077 _children: Vec<Arc<dyn ExecutionPlan>>,
1078 ) -> Result<Arc<dyn ExecutionPlan>> {
1079 unimplemented!("NoOpExecutionPlan::with_new_children");
1080 }
1081
execute(&self, _partition: usize) -> Result<SendableRecordBatchStream>1082 async fn execute(&self, _partition: usize) -> Result<SendableRecordBatchStream> {
1083 unimplemented!("NoOpExecutionPlan::execute");
1084 }
1085 }
1086
1087 // Produces an execution plan where the schema is mismatched from
1088 // the logical plan node.
1089 struct BadExtensionPlanner {}
1090
1091 impl ExtensionPlanner for BadExtensionPlanner {
1092 /// Create a physical plan for an extension node
plan_extension( &self, _node: &dyn UserDefinedLogicalNode, _inputs: &[Arc<dyn ExecutionPlan>], _ctx_state: &ExecutionContextState, ) -> Result<Option<Arc<dyn ExecutionPlan>>>1093 fn plan_extension(
1094 &self,
1095 _node: &dyn UserDefinedLogicalNode,
1096 _inputs: &[Arc<dyn ExecutionPlan>],
1097 _ctx_state: &ExecutionContextState,
1098 ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
1099 Ok(Some(Arc::new(NoOpExecutionPlan {
1100 schema: SchemaRef::new(Schema::new(vec![Field::new(
1101 "b",
1102 DataType::Int32,
1103 false,
1104 )])),
1105 })))
1106 }
1107 }
1108 }
1109