1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements.  See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License.  You may obtain a copy of the License at
8 *
9 *    http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18package org.apache.spark.sql.execution.command
19
20import scala.util.control.NonFatal
21
22import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
23import org.apache.spark.sql.catalyst.{SQLBuilder, TableIdentifier}
24import org.apache.spark.sql.catalyst.analysis.{UnresolvedFunction, UnresolvedRelation}
25import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType}
26import org.apache.spark.sql.catalyst.expressions.Alias
27import org.apache.spark.sql.catalyst.plans.QueryPlan
28import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
29import org.apache.spark.sql.types.MetadataBuilder
30
31
/**
 * The kind of view that [[CreateViewCommand]] is expected to create or replace.
 *
 * The string form of every view type is its object name, without the trailing `$` that the JVM
 * class name of a Scala object carries.
 */
sealed trait ViewType {
  override def toString: String = {
    val simpleName = getClass.getSimpleName
    if (simpleName.endsWith("$")) simpleName.dropRight(1) else simpleName
  }
}

/**
 * A session-scoped local temporary view. It lives as long as the session that created it and is
 * dropped automatically when that session terminates. It does not belong to any database, so it
 * cannot be referenced with a qualified name such as `db1.view1`.
 */
object LocalTempView extends ViewType

/**
 * A global temporary view shared across sessions. It lives as long as the Spark application and
 * is dropped automatically when the application terminates. It belongs to the system database
 * `_global_temp` and must be referenced by its qualified name, e.g.
 * `SELECT * FROM _global_temp.view1`.
 */
object GlobalTempView extends ViewType

/**
 * A persisted view shared across sessions. It stays until a user command explicitly drops it, and
 * it always belongs to a database (the current database when none is specified).
 *
 * Note that while a local temporary view with the same name exists, an existing persisted view is
 * not visible to the current session unless its name is qualified with the database.
 */
object PersistedView extends ViewType
64
65
/**
 * Create or replace a view with the given query plan. Temporary (local or global) views are
 * registered in the session catalog with the analyzed plan. For a permanent view, this command
 * converts the query plan to a canonicalized SQL string and stores it as the view text in the
 * metastore.
 *
 * @param name the name of this view.
 * @param userSpecifiedColumns the output column names and optional comments specified by users,
 *                             can be Nil if not specified.
 * @param comment the comment of this view.
 * @param properties the properties of this view.
 * @param originalText the original SQL text of this view, can be None if this view is created via
 *                     Dataset API. It must be defined when `viewType` is [[PersistedView]].
 * @param child the logical plan that represents the view; this is used to generate a canonicalized
 *              version of the SQL that can be saved in the catalog.
 * @param allowExisting if true, and if the view already exists, noop; if false, and if the view
 *                already exists, throws analysis exception.
 * @param replace if true, and if the view already exists, updates it; if false, and if the view
 *                already exists, throws analysis exception.
 * @param viewType the expected view type to be created with this command.
 * @throws IllegalArgumentException at construction time if a permanent view has no `originalText`.
 * @throws AnalysisException at construction time if both `allowExisting` and `replace` are set, if
 *                           a temporary view is declared with IF NOT EXISTS, or if a temporary
 *                           view name carries a database prefix.
 */
case class CreateViewCommand(
    name: TableIdentifier,
    userSpecifiedColumns: Seq[(String, Option[String])],
    comment: Option[String],
    properties: Map[String, String],
    originalText: Option[String],
    child: LogicalPlan,
    allowExisting: Boolean,
    replace: Boolean,
    viewType: ViewType)
  extends RunnableCommand {

  override protected def innerChildren: Seq[QueryPlan[_]] = Seq(child)

  if (viewType == PersistedView) {
    require(originalText.isDefined, "'originalText' must be provided to create permanent view")
  }

  if (allowExisting && replace) {
    throw new AnalysisException("CREATE VIEW with both IF NOT EXISTS and REPLACE is not allowed.")
  }

  private def isTemporary: Boolean = viewType == LocalTempView || viewType == GlobalTempView

  // Disallows 'CREATE TEMPORARY VIEW IF NOT EXISTS' to be consistent with 'CREATE TEMPORARY TABLE'
  if (allowExisting && isTemporary) {
    throw new AnalysisException(
      "It is not allowed to define a TEMPORARY view with IF NOT EXISTS.")
  }

  // Temporary view names should NOT contain database prefix like "database.table"
  if (isTemporary) {
    name.database.foreach { database =>
      throw new AnalysisException(
        s"It is not allowed to add database prefix `$database` for the TEMPORARY view name.")
    }
  }

  override def run(sparkSession: SparkSession): Seq[Row] = {
    // If the plan cannot be analyzed, throw an exception and don't proceed.
    val qe = sparkSession.sessionState.executePlan(child)
    qe.assertAnalyzed()
    val analyzedPlan = qe.analyzed

    if (userSpecifiedColumns.nonEmpty &&
        userSpecifiedColumns.length != analyzedPlan.output.length) {
      throw new AnalysisException(s"The number of columns produced by the SELECT clause " +
        s"(num: `${analyzedPlan.output.length}`) does not match the number of column names " +
        s"specified by CREATE VIEW (num: `${userSpecifiedColumns.length}`).")
    }

    // When creating a permanent view, not allowed to reference temporary objects.
    // This should be called after `qe.assertAnalyzed()` (i.e., `child` can be resolved)
    verifyTemporaryObjectsNotExists(sparkSession)

    // Rename (and optionally comment) the output columns when the user listed them explicitly.
    val aliasedPlan = if (userSpecifiedColumns.isEmpty) {
      analyzedPlan
    } else {
      val projectList = analyzedPlan.output.zip(userSpecifiedColumns).map {
        case (attr, (colName, None)) => Alias(attr, colName)()
        case (attr, (colName, Some(colComment))) =>
          val meta = new MetadataBuilder().putString("comment", colComment).build()
          Alias(attr, colName)(explicitMetadata = Some(meta))
      }
      sparkSession.sessionState.executePlan(Project(projectList, analyzedPlan)).analyzed
    }

    val catalog = sparkSession.sessionState.catalog
    if (viewType == LocalTempView) {
      catalog.createTempView(name.table, aliasedPlan, overrideIfExists = replace)
    } else if (viewType == GlobalTempView) {
      catalog.createGlobalTempView(name.table, aliasedPlan, overrideIfExists = replace)
    } else if (catalog.tableExists(name)) {
      if (allowExisting) {
        // Handles `CREATE VIEW IF NOT EXISTS v0 AS SELECT ...`. Does nothing when the target view
        // already exists; the table metadata is not needed, so it is not fetched.
      } else if (catalog.getTableMetadata(name).tableType != CatalogTableType.VIEW) {
        throw new AnalysisException(s"$name is not a view")
      } else if (replace) {
        // Handles `CREATE OR REPLACE VIEW v0 AS SELECT ...`
        catalog.alterTable(prepareTable(sparkSession, aliasedPlan))
      } else {
        // Handles `CREATE VIEW v0 AS SELECT ...`. Throws exception when the target view already
        // exists.
        throw new AnalysisException(
          s"View $name already exists. If you want to update the view definition, " +
            "please use ALTER VIEW AS or CREATE OR REPLACE VIEW AS")
      }
    } else {
      // Create the view if it doesn't exist.
      catalog.createTable(prepareTable(sparkSession, aliasedPlan), ignoreIfExists = false)
    }
    Seq.empty[Row]
  }

  /**
   * Permanent views are not allowed to reference temp objects, including temp functions and views.
   * Throws an [[AnalysisException]] naming the first offending temporary view or function found
   * in a pre-order traversal of `child`. Does nothing for temporary views.
   */
  private def verifyTemporaryObjectsNotExists(sparkSession: SparkSession): Unit = {
    if (!isTemporary) {
      val catalog = sparkSession.sessionState.catalog
      // This func traverses the unresolved plan `child`. Below are the reasons:
      // 1) Analyzer replaces unresolved temporary views by a SubqueryAlias with the corresponding
      // logical plan. After replacement, it is impossible to detect whether the SubqueryAlias is
      // added/generated from a temporary view.
      // 2) The temp functions are represented by multiple classes. Most are inaccessible from this
      // package (e.g., HiveGenericUDF).
      // The traversal only exists to throw, so `foreach` is used rather than building and
      // discarding a collected sequence.
      child.foreach {
        // Disallow creating permanent views based on temporary views.
        case s: UnresolvedRelation if catalog.isTemporaryTable(s.tableIdentifier) =>
          throw new AnalysisException(s"Not allowed to create a permanent view $name by " +
            s"referencing a temporary view ${s.tableIdentifier}")
        case other if !other.resolved =>
          other.expressions.foreach(_.foreach {
            // Disallow creating permanent views based on temporary UDFs.
            case e: UnresolvedFunction if catalog.isTemporaryFunction(e.name) =>
              throw new AnalysisException(s"Not allowed to create a permanent view $name by " +
                s"referencing a temporary function `${e.name}`")
            case _ =>
          })
        case _ =>
      }
    }
  }

  /**
   * Returns a [[CatalogTable]] that can be saved in the catalog. This method canonicalizes the SQL
   * of the given analyzed plan, verifies that the generated SQL can be parsed and analyzed again,
   * and uses the plan's schema as the view schema.
   *
   * @throws RuntimeException if the canonicalized SQL fails to analyze; this most likely indicates
   *                          a bug in SQL generation.
   */
  private def prepareTable(sparkSession: SparkSession, aliasedPlan: LogicalPlan): CatalogTable = {
    val viewSQL: String = new SQLBuilder(aliasedPlan).toSQL

    // Validate the view SQL - make sure we can parse it and analyze it.
    // If we cannot analyze the generated query, there is probably a bug in SQL generation.
    try {
      sparkSession.sql(viewSQL).queryExecution.assertAnalyzed()
    } catch {
      case NonFatal(e) =>
        throw new RuntimeException(s"Failed to analyze the canonicalized SQL: $viewSQL", e)
    }

    CatalogTable(
      identifier = name,
      tableType = CatalogTableType.VIEW,
      storage = CatalogStorageFormat.empty,
      schema = aliasedPlan.schema,
      properties = properties,
      viewOriginalText = originalText,
      viewText = Some(viewSQL),
      comment = comment
    )
  }
}
237
/**
 * Replaces the definition of an existing view with the given query plan.
 *
 * The session catalog's `alterTempViewDefinition` is tried first; it may alter a local or global
 * temporary view with this name. Only when it reports that nothing was altered is the permanent
 * view with this name updated. In that case an exception is thrown if the view does not exist,
 * and an [[AnalysisException]] is thrown if the existing table is not a view.
 *
 * @param name the name of this view.
 * @param originalText the original SQL text of this view. A view can only be altered through the
 *                     SQL API, so the original text is always available.
 * @param query the logical plan that represents the view; this is used to generate a canonicalized
 *              version of the SQL that can be saved in the catalog.
 */
case class AlterViewAsCommand(
    name: TableIdentifier,
    originalText: String,
    query: LogicalPlan) extends RunnableCommand {

  override protected def innerChildren: Seq[QueryPlan[_]] = Seq(query)

  override def run(session: SparkSession): Seq[Row] = {
    // Stop right here if the new definition does not analyze.
    val execution = session.sessionState.executePlan(query)
    execution.assertAnalyzed()
    val resolvedPlan = execution.analyzed

    // A `true` result means a local/global temp view was altered and there is nothing left to do.
    val tempViewAltered = session.sessionState.catalog.alterTempViewDefinition(name, resolvedPlan)
    if (!tempViewAltered) {
      alterPermanentView(session, resolvedPlan)
    }

    Seq.empty[Row]
  }

  /** Rewrites the metastore entry of the permanent view `name` to describe `analyzedPlan`. */
  private def alterPermanentView(session: SparkSession, analyzedPlan: LogicalPlan): Unit = {
    val catalog = session.sessionState.catalog
    val existing = catalog.getTableMetadata(name)
    if (existing.tableType != CatalogTableType.VIEW) {
      throw new AnalysisException(s"${existing.identifier} is not a view.")
    }

    val canonicalSQL = canonicalizedSQL(session, analyzedPlan)

    catalog.alterTable(
      existing.copy(
        schema = analyzedPlan.schema,
        viewOriginalText = Some(originalText),
        viewText = Some(canonicalSQL)))
  }

  /**
   * Generates the canonical SQL text for `plan` and checks that it parses and analyzes again.
   * A failure here most likely points to a bug in SQL generation, so it is rethrown as a
   * RuntimeException that carries the generated text.
   */
  private def canonicalizedSQL(session: SparkSession, plan: LogicalPlan): String = {
    val generated: String = new SQLBuilder(plan).toSQL
    try {
      session.sql(generated).queryExecution.assertAnalyzed()
    } catch {
      case NonFatal(cause) =>
        throw new RuntimeException(s"Failed to analyze the canonicalized SQL: $generated", cause)
    }
    generated
  }
}
296