1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements.  See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership.  The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License.  You may obtain a copy of the License at
8 //
9 //   http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied.  See the License for the
15 // specific language governing permissions and limitations
16 // under the License.
17 
18 //! Defines the EXPLAIN operator
19 
20 use std::any::Any;
21 use std::sync::Arc;
22 
23 use crate::error::{DataFusionError, Result};
24 use crate::{
25     logical_plan::StringifiedPlan,
26     physical_plan::{common::SizedRecordBatchStream, ExecutionPlan},
27 };
28 use arrow::{array::StringBuilder, datatypes::SchemaRef, record_batch::RecordBatch};
29 
30 use crate::physical_plan::Partitioning;
31 
32 use super::SendableRecordBatchStream;
33 use async_trait::async_trait;
34 
35 /// Explain execution plan operator. This operator contains the string
36 /// values of the various plans it has when it is created, and passes
37 /// them to its output.
38 #[derive(Debug, Clone)]
39 pub struct ExplainExec {
40     /// The schema that this exec plan node outputs
41     schema: SchemaRef,
42     /// The strings to be printed
43     stringified_plans: Vec<StringifiedPlan>,
44 }
45 
46 impl ExplainExec {
47     /// Create a new ExplainExec
new(schema: SchemaRef, stringified_plans: Vec<StringifiedPlan>) -> Self48     pub fn new(schema: SchemaRef, stringified_plans: Vec<StringifiedPlan>) -> Self {
49         ExplainExec {
50             schema,
51             stringified_plans,
52         }
53     }
54 
55     /// The strings to be printed
stringified_plans(&self) -> &[StringifiedPlan]56     pub fn stringified_plans(&self) -> &[StringifiedPlan] {
57         &self.stringified_plans
58     }
59 }
60 
61 #[async_trait]
62 impl ExecutionPlan for ExplainExec {
63     /// Return a reference to Any that can be used for downcasting
as_any(&self) -> &dyn Any64     fn as_any(&self) -> &dyn Any {
65         self
66     }
67 
schema(&self) -> SchemaRef68     fn schema(&self) -> SchemaRef {
69         self.schema.clone()
70     }
71 
children(&self) -> Vec<Arc<dyn ExecutionPlan>>72     fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
73         // this is a leaf node and has no children
74         vec![]
75     }
76 
77     /// Get the output partitioning of this plan
output_partitioning(&self) -> Partitioning78     fn output_partitioning(&self) -> Partitioning {
79         Partitioning::UnknownPartitioning(1)
80     }
81 
with_new_children( &self, children: Vec<Arc<dyn ExecutionPlan>>, ) -> Result<Arc<dyn ExecutionPlan>>82     fn with_new_children(
83         &self,
84         children: Vec<Arc<dyn ExecutionPlan>>,
85     ) -> Result<Arc<dyn ExecutionPlan>> {
86         if children.is_empty() {
87             Ok(Arc::new(self.clone()))
88         } else {
89             Err(DataFusionError::Internal(format!(
90                 "Children cannot be replaced in {:?}",
91                 self
92             )))
93         }
94     }
95 
execute(&self, partition: usize) -> Result<SendableRecordBatchStream>96     async fn execute(&self, partition: usize) -> Result<SendableRecordBatchStream> {
97         if 0 != partition {
98             return Err(DataFusionError::Internal(format!(
99                 "ExplainExec invalid partition {}",
100                 partition
101             )));
102         }
103 
104         let mut type_builder = StringBuilder::new(self.stringified_plans.len());
105         let mut plan_builder = StringBuilder::new(self.stringified_plans.len());
106 
107         for p in &self.stringified_plans {
108             type_builder.append_value(&String::from(&p.plan_type))?;
109             plan_builder.append_value(&p.plan)?;
110         }
111 
112         let record_batch = RecordBatch::try_new(
113             self.schema.clone(),
114             vec![
115                 Arc::new(type_builder.finish()),
116                 Arc::new(plan_builder.finish()),
117             ],
118         )?;
119 
120         Ok(Box::pin(SizedRecordBatchStream::new(
121             self.schema.clone(),
122             vec![Arc::new(record_batch)],
123         )))
124     }
125 }
126