/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.execution.command

import scala.util.control.NonFatal

import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
import org.apache.spark.sql.catalyst.{SQLBuilder, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.{UnresolvedFunction, UnresolvedRelation}
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType}
import org.apache.spark.sql.catalyst.expressions.Alias
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
import org.apache.spark.sql.types.MetadataBuilder


/**
 * The kind of view that [[CreateViewCommand]] is expected to create or replace.
 */
sealed trait ViewType {
  /**
   * Renders the view kind by its simple class name. The trailing `$` that the Scala compiler
   * appends to the runtime class of an `object` is stripped, so e.g. `LocalTempView` prints as
   * "LocalTempView".
   */
  override def toString: String = {
    val runtimeName = getClass.getSimpleName
    runtimeName.stripSuffix("$")
  }
}

/**
 * A session-scoped local temporary view. It lives as long as the session that created it, i.e.
 * it is automatically dropped when that session terminates. It is not tied to any database, so a
 * qualified name such as `db1.view1` cannot be used to reference a local temporary view.
 */
object LocalTempView extends ViewType

/**
 * A cross-session global temporary view. It lives as long as the Spark application, i.e. it is
 * automatically dropped when the application terminates. It is tied to the system-reserved
 * database `_global_temp`, and must always be referred to by its qualified name, e.g.
 * SELECT * FROM _global_temp.view1.
 */
object GlobalTempView extends ViewType

/**
 * A cross-session persisted view. Persisted views remain until they are explicitly dropped by a
 * user command. A persisted view always belongs to a database, defaulting to the current database
 * when none is specified.
 *
 * Note that an existing persisted view is not visible to the current session while a local
 * temporary view with the same name exists, unless the view name is qualified by its database.
 */
object PersistedView extends ViewType


/**
 * Create or replace a view with given query plan. This command will convert the query plan to
 * canonicalized SQL string, and store it as view text in metastore, if we need to create a
 * permanent view.
 *
 * @param name the name of this view.
 * @param userSpecifiedColumns the output column names and optional comments specified by users,
 *                             can be Nil if not specified.
 * @param comment the comment of this view.
 * @param properties the properties of this view.
 * @param originalText the original SQL text of this view, can be None if this view is created via
 *                     Dataset API.
 * @param child the logical plan that represents the view; this is used to generate a canonicalized
 *              version of the SQL that can be saved in the catalog.
 * @param allowExisting if true, and if the view already exists, noop; if false, and if the view
 *                      already exists, throws analysis exception unless `replace` is true.
82 * @param replace if true, and if the view already exists, updates it; if false, and if the view 83 * already exists, throws analysis exception. 84 * @param viewType the expected view type to be created with this command. 85 */ 86case class CreateViewCommand( 87 name: TableIdentifier, 88 userSpecifiedColumns: Seq[(String, Option[String])], 89 comment: Option[String], 90 properties: Map[String, String], 91 originalText: Option[String], 92 child: LogicalPlan, 93 allowExisting: Boolean, 94 replace: Boolean, 95 viewType: ViewType) 96 extends RunnableCommand { 97 98 override protected def innerChildren: Seq[QueryPlan[_]] = Seq(child) 99 100 if (viewType == PersistedView) { 101 require(originalText.isDefined, "'originalText' must be provided to create permanent view") 102 } 103 104 if (allowExisting && replace) { 105 throw new AnalysisException("CREATE VIEW with both IF NOT EXISTS and REPLACE is not allowed.") 106 } 107 108 private def isTemporary = viewType == LocalTempView || viewType == GlobalTempView 109 110 // Disallows 'CREATE TEMPORARY VIEW IF NOT EXISTS' to be consistent with 'CREATE TEMPORARY TABLE' 111 if (allowExisting && isTemporary) { 112 throw new AnalysisException( 113 "It is not allowed to define a TEMPORARY view with IF NOT EXISTS.") 114 } 115 116 // Temporary view names should NOT contain database prefix like "database.table" 117 if (isTemporary && name.database.isDefined) { 118 val database = name.database.get 119 throw new AnalysisException( 120 s"It is not allowed to add database prefix `$database` for the TEMPORARY view name.") 121 } 122 123 override def run(sparkSession: SparkSession): Seq[Row] = { 124 // If the plan cannot be analyzed, throw an exception and don't proceed. 
125 val qe = sparkSession.sessionState.executePlan(child) 126 qe.assertAnalyzed() 127 val analyzedPlan = qe.analyzed 128 129 if (userSpecifiedColumns.nonEmpty && 130 userSpecifiedColumns.length != analyzedPlan.output.length) { 131 throw new AnalysisException(s"The number of columns produced by the SELECT clause " + 132 s"(num: `${analyzedPlan.output.length}`) does not match the number of column names " + 133 s"specified by CREATE VIEW (num: `${userSpecifiedColumns.length}`).") 134 } 135 136 // When creating a permanent view, not allowed to reference temporary objects. 137 // This should be called after `qe.assertAnalyzed()` (i.e., `child` can be resolved) 138 verifyTemporaryObjectsNotExists(sparkSession) 139 140 val aliasedPlan = if (userSpecifiedColumns.isEmpty) { 141 analyzedPlan 142 } else { 143 val projectList = analyzedPlan.output.zip(userSpecifiedColumns).map { 144 case (attr, (colName, None)) => Alias(attr, colName)() 145 case (attr, (colName, Some(colComment))) => 146 val meta = new MetadataBuilder().putString("comment", colComment).build() 147 Alias(attr, colName)(explicitMetadata = Some(meta)) 148 } 149 sparkSession.sessionState.executePlan(Project(projectList, analyzedPlan)).analyzed 150 } 151 152 val catalog = sparkSession.sessionState.catalog 153 if (viewType == LocalTempView) { 154 catalog.createTempView(name.table, aliasedPlan, overrideIfExists = replace) 155 } else if (viewType == GlobalTempView) { 156 catalog.createGlobalTempView(name.table, aliasedPlan, overrideIfExists = replace) 157 } else if (catalog.tableExists(name)) { 158 val tableMetadata = catalog.getTableMetadata(name) 159 if (allowExisting) { 160 // Handles `CREATE VIEW IF NOT EXISTS v0 AS SELECT ...`. Does nothing when the target view 161 // already exists. 
162 } else if (tableMetadata.tableType != CatalogTableType.VIEW) { 163 throw new AnalysisException(s"$name is not a view") 164 } else if (replace) { 165 // Handles `CREATE OR REPLACE VIEW v0 AS SELECT ...` 166 catalog.alterTable(prepareTable(sparkSession, aliasedPlan)) 167 } else { 168 // Handles `CREATE VIEW v0 AS SELECT ...`. Throws exception when the target view already 169 // exists. 170 throw new AnalysisException( 171 s"View $name already exists. If you want to update the view definition, " + 172 "please use ALTER VIEW AS or CREATE OR REPLACE VIEW AS") 173 } 174 } else { 175 // Create the view if it doesn't exist. 176 catalog.createTable(prepareTable(sparkSession, aliasedPlan), ignoreIfExists = false) 177 } 178 Seq.empty[Row] 179 } 180 181 /** 182 * Permanent views are not allowed to reference temp objects, including temp function and views 183 */ 184 private def verifyTemporaryObjectsNotExists(sparkSession: SparkSession): Unit = { 185 if (!isTemporary) { 186 // This func traverses the unresolved plan `child`. Below are the reasons: 187 // 1) Analyzer replaces unresolved temporary views by a SubqueryAlias with the corresponding 188 // logical plan. After replacement, it is impossible to detect whether the SubqueryAlias is 189 // added/generated from a temporary view. 190 // 2) The temp functions are represented by multiple classes. Most are inaccessible from this 191 // package (e.g., HiveGenericUDF). 192 child.collect { 193 // Disallow creating permanent views based on temporary views. 194 case s: UnresolvedRelation 195 if sparkSession.sessionState.catalog.isTemporaryTable(s.tableIdentifier) => 196 throw new AnalysisException(s"Not allowed to create a permanent view $name by " + 197 s"referencing a temporary view ${s.tableIdentifier}") 198 case other if !other.resolved => other.expressions.flatMap(_.collect { 199 // Disallow creating permanent views based on temporary UDFs. 
200 case e: UnresolvedFunction 201 if sparkSession.sessionState.catalog.isTemporaryFunction(e.name) => 202 throw new AnalysisException(s"Not allowed to create a permanent view $name by " + 203 s"referencing a temporary function `${e.name}`") 204 }) 205 } 206 } 207 } 208 209 /** 210 * Returns a [[CatalogTable]] that can be used to save in the catalog. This comment canonicalize 211 * SQL based on the analyzed plan, and also creates the proper schema for the view. 212 */ 213 private def prepareTable(sparkSession: SparkSession, aliasedPlan: LogicalPlan): CatalogTable = { 214 val viewSQL: String = new SQLBuilder(aliasedPlan).toSQL 215 216 // Validate the view SQL - make sure we can parse it and analyze it. 217 // If we cannot analyze the generated query, there is probably a bug in SQL generation. 218 try { 219 sparkSession.sql(viewSQL).queryExecution.assertAnalyzed() 220 } catch { 221 case NonFatal(e) => 222 throw new RuntimeException(s"Failed to analyze the canonicalized SQL: $viewSQL", e) 223 } 224 225 CatalogTable( 226 identifier = name, 227 tableType = CatalogTableType.VIEW, 228 storage = CatalogStorageFormat.empty, 229 schema = aliasedPlan.schema, 230 properties = properties, 231 viewOriginalText = originalText, 232 viewText = Some(viewSQL), 233 comment = comment 234 ) 235 } 236} 237 238/** 239 * Alter a view with given query plan. If the view name contains database prefix, this command will 240 * alter a permanent view matching the given name, or throw an exception if view not exist. Else, 241 * this command will try to alter a temporary view first, if view not exist, try permanent view 242 * next, if still not exist, throw an exception. 243 * 244 * @param name the name of this view. 245 * @param originalText the original SQL text of this view. Note that we can only alter a view by 246 * SQL API, which means we always have originalText. 
247 * @param query the logical plan that represents the view; this is used to generate a canonicalized 248 * version of the SQL that can be saved in the catalog. 249 */ 250case class AlterViewAsCommand( 251 name: TableIdentifier, 252 originalText: String, 253 query: LogicalPlan) extends RunnableCommand { 254 255 override protected def innerChildren: Seq[QueryPlan[_]] = Seq(query) 256 257 override def run(session: SparkSession): Seq[Row] = { 258 // If the plan cannot be analyzed, throw an exception and don't proceed. 259 val qe = session.sessionState.executePlan(query) 260 qe.assertAnalyzed() 261 val analyzedPlan = qe.analyzed 262 263 if (session.sessionState.catalog.alterTempViewDefinition(name, analyzedPlan)) { 264 // a local/global temp view has been altered, we are done. 265 } else { 266 alterPermanentView(session, analyzedPlan) 267 } 268 269 Seq.empty[Row] 270 } 271 272 private def alterPermanentView(session: SparkSession, analyzedPlan: LogicalPlan): Unit = { 273 val viewMeta = session.sessionState.catalog.getTableMetadata(name) 274 if (viewMeta.tableType != CatalogTableType.VIEW) { 275 throw new AnalysisException(s"${viewMeta.identifier} is not a view.") 276 } 277 278 val viewSQL: String = new SQLBuilder(analyzedPlan).toSQL 279 // Validate the view SQL - make sure we can parse it and analyze it. 280 // If we cannot analyze the generated query, there is probably a bug in SQL generation. 281 try { 282 session.sql(viewSQL).queryExecution.assertAnalyzed() 283 } catch { 284 case NonFatal(e) => 285 throw new RuntimeException(s"Failed to analyze the canonicalized SQL: $viewSQL", e) 286 } 287 288 val updatedViewMeta = viewMeta.copy( 289 schema = analyzedPlan.schema, 290 viewOriginalText = Some(originalText), 291 viewText = Some(viewSQL)) 292 293 session.sessionState.catalog.alterTable(updatedViewMeta) 294 } 295} 296