/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.execution.command

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.catalyst.errors.TreeNodeException
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.debug._
import org.apache.spark.sql.execution.streaming.IncrementalExecution
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types._

/**
 * A logical command that is executed for its side effects. `RunnableCommand`s are
 * wrapped in `ExecutedCommandExec` during execution.
 */
trait RunnableCommand extends logical.Command {
  def run(sparkSession: SparkSession): Seq[Row]
}

/**
 * A physical operator that executes the run method of a `RunnableCommand` and
 * saves the result to prevent multiple executions.
 */
case class ExecutedCommandExec(cmd: RunnableCommand) extends SparkPlan {
  /**
   * Wraps any side effects caused by the command, plus any other computation that should be
   * evaluated exactly once. The value of this field is used as the contents of the RDD
   * generated from the physical plan of this command.
   *
   * All of the `execute` methods of this operator reference `sideEffectResult` so that the
   * command is executed eagerly right after the command query is created.
   */
  protected[sql] lazy val sideEffectResult: Seq[InternalRow] = {
    val converter = CatalystTypeConverters.createToCatalystConverter(schema)
    cmd.run(sqlContext.sparkSession).map(converter(_).asInstanceOf[InternalRow])
  }

  override protected def innerChildren: Seq[QueryPlan[_]] = cmd :: Nil

  override def output: Seq[Attribute] = cmd.output

  override def children: Seq[SparkPlan] = Nil

  override def executeCollect(): Array[InternalRow] = sideEffectResult.toArray

  override def executeToIterator: Iterator[InternalRow] = sideEffectResult.toIterator

  override def executeTake(limit: Int): Array[InternalRow] = sideEffectResult.take(limit).toArray

  protected override def doExecute(): RDD[InternalRow] = {
    sqlContext.sparkContext.parallelize(sideEffectResult, 1)
  }
}
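
// A minimal sketch (not part of Spark) of a concrete `RunnableCommand`; the class name and
// behavior are hypothetical, chosen only to illustrate the contract: the work happens in
// `run`, and the returned rows must line up with `output`. When planned, the command is
// wrapped in `ExecutedCommandExec`, whose `sideEffectResult` evaluates `run` exactly once.
case class ExampleCurrentDatabaseCommand() extends RunnableCommand {

  // A single string column; mirrors how `ExplainCommand` declares its output below.
  override val output: Seq[Attribute] =
    Seq(AttributeReference("database", StringType, nullable = false)())

  override def run(sparkSession: SparkSession): Seq[Row] = {
    // One external `Row` per result row; `ExecutedCommandExec` converts these to
    // `InternalRow`s via `sideEffectResult`.
    Seq(Row(sparkSession.catalog.currentDatabase))
  }
}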

/**
 * An explain command for users to see how a command will be executed.
 *
 * Note that this command takes in a logical plan and runs the optimizer on it,
 * but does NOT actually execute the plan.
 *
 * {{{
 *   EXPLAIN (EXTENDED | CODEGEN) SELECT * FROM ...
 * }}}
 *
 * @param logicalPlan plan to explain
 * @param extended whether to do extended explain or not
 * @param codegen whether to output generated code from whole-stage codegen or not
 */
case class ExplainCommand(
    logicalPlan: LogicalPlan,
    extended: Boolean = false,
    codegen: Boolean = false)
  extends RunnableCommand {

  override val output: Seq[Attribute] =
    Seq(AttributeReference("plan", StringType, nullable = true)())

  // Run through the optimizer to generate the physical plan.
  override def run(sparkSession: SparkSession): Seq[Row] = try {
    val queryExecution =
      if (logicalPlan.isStreaming) {
        // This is used only by explaining `Dataset/DataFrame` created by `spark.readStream`,
        // so the output mode does not matter since there is no `Sink`.
        new IncrementalExecution(sparkSession, logicalPlan, OutputMode.Append(), "<unknown>", 0, 0)
      } else {
        sparkSession.sessionState.executePlan(logicalPlan)
      }
    val outputString =
      if (codegen) {
        codegenString(queryExecution.executedPlan)
      } else if (extended) {
        queryExecution.toString
      } else {
        queryExecution.simpleString
      }
    Seq(Row(outputString))
  } catch { case cause: TreeNodeException[_] =>
    ("Error occurred during query planning: \n" + cause.getMessage).split("\n").map(Row(_))
  }
}

/** An explain command for users to see how a streaming batch is executed. */
case class StreamingExplainCommand(
    queryExecution: IncrementalExecution,
    extended: Boolean) extends RunnableCommand {

  override val output: Seq[Attribute] =
    Seq(AttributeReference("plan", StringType, nullable = true)())

  // Format the already-planned streaming `queryExecution`; no new planning happens here.
  override def run(sparkSession: SparkSession): Seq[Row] = try {
    val outputString =
      if (extended) {
        queryExecution.toString
      } else {
        queryExecution.simpleString
      }
    Seq(Row(outputString))
  } catch { case cause: TreeNodeException[_] =>
    ("Error occurred during query planning: \n" + cause.getMessage).split("\n").map(Row(_))
  }
}
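
// A minimal usage sketch (not part of Spark): building an `ExplainCommand` by hand and
// printing its single-column result, roughly what `Dataset.explain()` does internally in
// this version of Spark. The helper object and method names are hypothetical.
object ExplainCommandExample {
  def printPlan(sparkSession: SparkSession, plan: LogicalPlan): Unit = {
    // `extended = true` yields the full parsed/analyzed/optimized/physical plan dump;
    // the default (`false`) prints only the physical plan summary.
    ExplainCommand(plan, extended = true)
      .run(sparkSession)
      .foreach(row => println(row.getString(0)))
  }
}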