/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.execution.command

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.catalyst.errors.TreeNodeException
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.debug._
import org.apache.spark.sql.execution.streaming.IncrementalExecution
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types._

/**
 * A logical command that is executed for its side effects. `RunnableCommand`s are
 * wrapped in `ExecutedCommandExec` during execution.
 */
trait RunnableCommand extends logical.Command {
  def run(sparkSession: SparkSession): Seq[Row]
}
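
// For illustration only (hypothetical, not part of the original file): a minimal
// sketch of a concrete `RunnableCommand`. A real command declares an `output`
// schema and performs its side effect inside `run`; here the "side effect" is
// simply producing a single row.
case class EchoCommand(message: String) extends RunnableCommand {
  override val output: Seq[Attribute] =
    Seq(AttributeReference("message", StringType, nullable = false)())

  override def run(sparkSession: SparkSession): Seq[Row] = Seq(Row(message))
}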

/**
 * A physical operator that executes the `run` method of a `RunnableCommand` and
 * saves the result to prevent multiple executions.
 */
case class ExecutedCommandExec(cmd: RunnableCommand) extends SparkPlan {
  /**
   * Wraps any side effects caused by the command, or any other computation that should be
   * evaluated exactly once. The value of this field can be used as the contents of the
   * corresponding RDD generated from the physical plan of this command.
   *
   * All of the `execute` variants below reference `sideEffectResult`, so the command is
   * executed eagerly, right after the command query is created, and at most once.
   */
  protected[sql] lazy val sideEffectResult: Seq[InternalRow] = {
    val converter = CatalystTypeConverters.createToCatalystConverter(schema)
    cmd.run(sqlContext.sparkSession).map(converter(_).asInstanceOf[InternalRow])
  }

  override protected def innerChildren: Seq[QueryPlan[_]] = cmd :: Nil

  override def output: Seq[Attribute] = cmd.output

  override def children: Seq[SparkPlan] = Nil

  override def executeCollect(): Array[InternalRow] = sideEffectResult.toArray

  override def executeToIterator: Iterator[InternalRow] = sideEffectResult.toIterator

  override def executeTake(limit: Int): Array[InternalRow] = sideEffectResult.take(limit).toArray

  protected override def doExecute(): RDD[InternalRow] = {
    sqlContext.sparkContext.parallelize(sideEffectResult, 1)
  }
}
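
// Illustrative sketch (hypothetical helper, not part of the original file): the
// same external-Row-to-InternalRow conversion that `sideEffectResult` performs,
// written as a standalone function over an explicit schema.
private[command] object RowConversionSketch {
  def toInternalRows(rows: Seq[Row], schema: StructType): Seq[InternalRow] = {
    // `createToCatalystConverter` builds a converter from external types
    // (e.g. java.lang.String, scala.BigDecimal) to Catalyst's internal ones.
    val converter = CatalystTypeConverters.createToCatalystConverter(schema)
    rows.map(converter(_).asInstanceOf[InternalRow])
  }
}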

/**
 * An explain command for users to see how a command will be executed.
 *
 * Note that this command takes in a logical plan and runs the optimizer on it,
 * but does NOT actually execute the plan.
 *
 * {{{
 *   EXPLAIN (EXTENDED | CODEGEN) SELECT * FROM ...
 * }}}
 *
 * @param logicalPlan plan to explain
 * @param extended whether to do extended explain or not
 * @param codegen whether to output generated code from whole-stage codegen or not
 */
case class ExplainCommand(
    logicalPlan: LogicalPlan,
    extended: Boolean = false,
    codegen: Boolean = false)
  extends RunnableCommand {

  override val output: Seq[Attribute] =
    Seq(AttributeReference("plan", StringType, nullable = true)())

  // Run the plan through the optimizer and planner to generate the physical plan,
  // then render it as a string.
  override def run(sparkSession: SparkSession): Seq[Row] = try {
    val queryExecution =
      if (logicalPlan.isStreaming) {
        // This is used only to explain a `Dataset`/`DataFrame` created by `spark.readStream`,
        // so the output mode does not matter since there is no `Sink`.
        new IncrementalExecution(sparkSession, logicalPlan, OutputMode.Append(), "<unknown>", 0, 0)
      } else {
        sparkSession.sessionState.executePlan(logicalPlan)
      }
    val outputString =
      if (codegen) {
        codegenString(queryExecution.executedPlan)
      } else if (extended) {
        queryExecution.toString
      } else {
        queryExecution.simpleString
      }
    Seq(Row(outputString))
  } catch {
    case cause: TreeNodeException[_] =>
      ("Error occurred during query planning: \n" + cause.getMessage).split("\n").map(Row(_))
  }
}
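
// Illustrative usage sketch (hypothetical, not part of the original file): driving
// `ExplainCommand` programmatically against an active `SparkSession`.
private[command] object ExplainCommandSketch {
  def printExtendedPlan(spark: SparkSession): Unit = {
    // Any logical plan works; here we take one from a simple Dataset.
    val plan = spark.range(10).queryExecution.logical
    ExplainCommand(plan, extended = true)
      .run(spark)
      .foreach(row => println(row.getString(0)))
  }
}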

/** An explain command for users to see how a streaming batch is executed. */
case class StreamingExplainCommand(
    queryExecution: IncrementalExecution,
    extended: Boolean) extends RunnableCommand {

  override val output: Seq[Attribute] =
    Seq(AttributeReference("plan", StringType, nullable = true)())

  // Render the already-planned incremental execution as a string.
  override def run(sparkSession: SparkSession): Seq[Row] = try {
    val outputString =
      if (extended) {
        queryExecution.toString
      } else {
        queryExecution.simpleString
      }
    Seq(Row(outputString))
  } catch {
    case cause: TreeNodeException[_] =>
      ("Error occurred during query planning: \n" + cause.getMessage).split("\n").map(Row(_))
  }
}
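
// Illustrative usage sketch (hypothetical, not part of the original file): explaining
// a streaming plan without running it, mirroring the `IncrementalExecution`
// construction used by `ExplainCommand` above for streaming plans.
private[command] object StreamingExplainSketch {
  def printStreamingPlan(spark: SparkSession, streamingPlan: LogicalPlan): Unit = {
    // The output mode, checkpoint location, and batch id are placeholders,
    // just as in `ExplainCommand`, since nothing is actually executed.
    val incremental =
      new IncrementalExecution(spark, streamingPlan, OutputMode.Append(), "<unknown>", 0, 0)
    StreamingExplainCommand(incremental, extended = false)
      .run(spark)
      .foreach(row => println(row.getString(0)))
  }
}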