/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.execution.command

import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.expressions.NamedExpression
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.sources.BaseRelation

/**
 * A command used to create a data source table.
 *
 * Note: This is different from [[CreateTableCommand]]. Please check the syntax for the
 * difference. This is not intended for temporary tables.
 *
 * The syntax of using this command in SQL is:
 * {{{
 *   CREATE TABLE [IF NOT EXISTS] [db_name.]table_name
 *   [(col1 data_type [COMMENT col_comment], ...)]
 *   USING format OPTIONS ([option1_name "option1_value", option2_name "option2_value", ...])
 * }}}
 */
case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boolean)
  extends RunnableCommand {

  override def run(sparkSession: SparkSession): Seq[Row] = {
    assert(table.tableType != CatalogTableType.VIEW)
    assert(table.provider.isDefined)

    val sessionState = sparkSession.sessionState
    if (sessionState.catalog.tableExists(table.identifier)) {
      if (ignoreIfExists) {
        return Seq.empty[Row]
      } else {
        throw new AnalysisException(s"Table ${table.identifier.unquotedString} already exists.")
      }
    }

    // Create the relation to validate the arguments before writing the metadata to the
    // metastore, and infer the table schema and partition columns if the user didn't specify
    // a schema in CREATE TABLE.
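    // If `resolveRelation()` below throws (e.g. for an unknown provider, bad options, or a
    // schema that cannot be inferred), the metastore has not been touched yet, so a failing
    // CREATE TABLE leaves no half-created table behind.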
    val pathOption = table.storage.locationUri.map("path" -> _)
    // Fill in some default table options from the session conf
    val tableWithDefaultOptions = table.copy(
      identifier = table.identifier.copy(
        database = Some(
          table.identifier.database.getOrElse(sessionState.catalog.getCurrentDatabase))),
      tracksPartitionsInCatalog = sparkSession.sessionState.conf.manageFilesourcePartitions)
    val dataSource: BaseRelation =
      DataSource(
        sparkSession = sparkSession,
        userSpecifiedSchema = if (table.schema.isEmpty) None else Some(table.schema),
        partitionColumns = table.partitionColumnNames,
        className = table.provider.get,
        bucketSpec = table.bucketSpec,
        options = table.storage.properties ++ pathOption,
        catalogTable = Some(tableWithDefaultOptions)).resolveRelation()

    dataSource match {
      case fs: HadoopFsRelation =>
        if (table.tableType == CatalogTableType.EXTERNAL && fs.location.rootPaths.isEmpty) {
          throw new AnalysisException(
            "Cannot create a file-based external data source table without path")
        }
      case _ =>
    }

    val partitionColumnNames = if (table.schema.nonEmpty) {
      table.partitionColumnNames
    } else {
      // This is guaranteed in `PreprocessDDL`.
      assert(table.partitionColumnNames.isEmpty)
      dataSource match {
        case r: HadoopFsRelation => r.partitionSchema.fieldNames.toSeq
        case _ => Nil
      }
    }

    val newTable = table.copy(
      schema = dataSource.schema,
      partitionColumnNames = partitionColumnNames,
      // If metastore partition management for file source tables is enabled, we start off with
      // partition provider hive, but no partitions in the metastore. The user has to call
      // `msck repair table` to populate the table partitions.
      tracksPartitionsInCatalog = partitionColumnNames.nonEmpty &&
        sparkSession.sessionState.conf.manageFilesourcePartitions)
    // We will return Nil or throw an exception at the beginning if the table already exists,
    // so when we reach here, the table should not exist and we should set `ignoreIfExists`
    // to false.
    sessionState.catalog.createTable(newTable, ignoreIfExists = false)

    Seq.empty[Row]
  }
}

/**
 * A command used to create a data source table using the result of a query.
 *
 * Note: This is different from `CreateHiveTableAsSelectCommand`. Please check the syntax for
 * the difference. This is not intended for temporary tables.
 *
 * The syntax of using this command in SQL is:
 * {{{
 *   CREATE TABLE [IF NOT EXISTS] [db_name.]table_name
 *   USING format OPTIONS ([option1_name "option1_value", option2_name "option2_value", ...])
 *   AS SELECT ...
 * }}}
 */
case class CreateDataSourceTableAsSelectCommand(
    table: CatalogTable,
    mode: SaveMode,
    query: LogicalPlan)
  extends RunnableCommand {

  override protected def innerChildren: Seq[LogicalPlan] = Seq(query)

  override def run(sparkSession: SparkSession): Seq[Row] = {
    assert(table.tableType != CatalogTableType.VIEW)
    assert(table.provider.isDefined)
    assert(table.schema.isEmpty)

    val provider = table.provider.get
    val sessionState = sparkSession.sessionState
    val db = table.identifier.database.getOrElse(sessionState.catalog.getCurrentDatabase)
    val tableIdentWithDB = table.identifier.copy(database = Some(db))
    val tableName = tableIdentWithDB.unquotedString

    var createMetastoreTable = false
    // We may need to reorder the columns of the query to match the existing table.
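    // For example, when appending to an existing table with schema (a INT, b STRING) from a
    // query that produces (b, a), each existing column name is resolved against the query
    // output by name, so the data is written in the table's column order (a, b).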
    var reorderedColumns = Option.empty[Seq[NamedExpression]]
    if (sessionState.catalog.tableExists(tableIdentWithDB)) {
      // Check if we need to throw an exception or just return.
      mode match {
        case SaveMode.ErrorIfExists =>
          throw new AnalysisException(s"Table $tableName already exists. " +
            "If you are using saveAsTable, you can set SaveMode to SaveMode.Append to " +
            "insert data into the table or set SaveMode to SaveMode.Overwrite to overwrite " +
            "the existing data. " +
            s"Or, if you are using SQL CREATE TABLE, you need to drop $tableName first.")
        case SaveMode.Ignore =>
          // Since the table already exists and the save mode is Ignore, we will just return.
          return Seq.empty[Row]
        case SaveMode.Append =>
          val existingTable = sessionState.catalog.getTableMetadata(tableIdentWithDB)

          if (existingTable.provider.get == DDLUtils.HIVE_PROVIDER) {
            throw new AnalysisException(s"Saving data in the Hive serde table $tableName is " +
              "not supported yet. Please use the insertInto() API as an alternative.")
          }

          // Check if the specified data source matches the data source of the existing table.
          val existingProvider = DataSource.lookupDataSource(existingTable.provider.get)
          val specifiedProvider = DataSource.lookupDataSource(table.provider.get)
          // TODO: Check that options from the resolved relation match the relation that we are
          // inserting into (i.e. using the same compression).
          if (existingProvider != specifiedProvider) {
            throw new AnalysisException(s"The format of the existing table $tableName is " +
              s"`${existingProvider.getSimpleName}`. It doesn't match the specified format " +
              s"`${specifiedProvider.getSimpleName}`.")
          }

          if (query.schema.length != existingTable.schema.length) {
            throw new AnalysisException(
              s"The column number of the existing table $tableName " +
                s"(${existingTable.schema.catalogString}) doesn't match the data schema " +
                s"(${query.schema.catalogString})")
          }

          val resolver = sessionState.conf.resolver
          val tableCols = existingTable.schema.map(_.name)

          reorderedColumns = Some(existingTable.schema.map { f =>
            query.resolve(Seq(f.name), resolver).getOrElse {
              val inputColumns = query.schema.map(_.name).mkString(", ")
              throw new AnalysisException(
                s"cannot resolve '${f.name}' given input columns: [$inputColumns]")
            }
          })

          // In `AnalyzeCreateTable`, we verified the consistency between the user-specified
          // table definition (partition columns, bucketing) and the SELECT query; here we
          // also need to verify the consistency between the user-specified table definition
          // and the existing table definition.

          // Check if the specified partition columns match the existing table.
          val specifiedPartCols = CatalogUtils.normalizePartCols(
            tableName, tableCols, table.partitionColumnNames, resolver)
          if (specifiedPartCols != existingTable.partitionColumnNames) {
            throw new AnalysisException(
              s"""
                 |Specified partitioning does not match that of the existing table $tableName.
                 |Specified partition columns: [${specifiedPartCols.mkString(", ")}]
                 |Existing partition columns: [${existingTable.partitionColumnNames.mkString(", ")}]
               """.stripMargin)
          }

          // Check if the specified bucketing matches the existing table.
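          // `CatalogUtils.normalizeBucketSpec` resolves the bucket and sort column names of
          // the user-specified spec against the table's columns using the session resolver,
          // so the equality check below is not tripped up by case-only name differences.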
          val specifiedBucketSpec = table.bucketSpec.map { bucketSpec =>
            CatalogUtils.normalizeBucketSpec(tableName, tableCols, bucketSpec, resolver)
          }
          if (specifiedBucketSpec != existingTable.bucketSpec) {
            val specifiedBucketString =
              specifiedBucketSpec.map(_.toString).getOrElse("not bucketed")
            val existingBucketString =
              existingTable.bucketSpec.map(_.toString).getOrElse("not bucketed")
            throw new AnalysisException(
              s"""
                 |Specified bucketing does not match that of the existing table $tableName.
                 |Specified bucketing: $specifiedBucketString
                 |Existing bucketing: $existingBucketString
               """.stripMargin)
          }

        case SaveMode.Overwrite =>
          sessionState.catalog.dropTable(tableIdentWithDB, ignoreIfNotExists = true,
            purge = false)
          // Need to create the table again.
          createMetastoreTable = true
      }
    } else {
      // The table does not exist. We need to create it in metastore.
      createMetastoreTable = true
    }

    val data = Dataset.ofRows(sparkSession, query)
    val df = reorderedColumns match {
      // Reorder the columns of the query to match the existing table.
      case Some(cols) => data.select(cols.map(Column(_)): _*)
      case None => data
    }

    val tableLocation = if (table.tableType == CatalogTableType.MANAGED) {
      Some(sessionState.catalog.defaultTablePath(table.identifier))
    } else {
      table.storage.locationUri
    }

    // Create the relation based on the data of df.
    val pathOption = tableLocation.map("path" -> _)
    val dataSource = DataSource(
      sparkSession,
      className = provider,
      partitionColumns = table.partitionColumnNames,
      bucketSpec = table.bucketSpec,
      options = table.storage.properties ++ pathOption,
      catalogTable = Some(table))

    val result = try {
      dataSource.writeAndRead(mode, df)
    } catch {
      case ex: AnalysisException =>
        logError(s"Failed to write to table $tableName in $mode mode", ex)
        throw ex
    }
    if (createMetastoreTable) {
      val newTable = table.copy(
        storage = table.storage.copy(locationUri = tableLocation),
        // We will use the schema of the resolved relation as the schema of the table
        // (instead of the schema of df). This is important since the nullability may be
        // changed by the relation provider (for example, see
        // org.apache.spark.sql.parquet.DefaultSource).
        schema = result.schema)
      sessionState.catalog.createTable(newTable, ignoreIfExists = false)
    }

    result match {
      case fs: HadoopFsRelation if table.partitionColumnNames.nonEmpty &&
          sparkSession.sqlContext.conf.manageFilesourcePartitions =>
        // Need to recover partitions into the metastore so our saved data is visible.
        sparkSession.sessionState.executePlan(
          AlterTableRecoverPartitionsCommand(table.identifier)).toRdd
      case _ =>
    }

    // Refresh the cache of the table in the catalog.
    sessionState.catalog.refreshTable(tableIdentWithDB)
    Seq.empty[Row]
  }
}
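
// Illustrative usage (a sketch, not part of the production code path): assuming a running
// SparkSession `spark` and hypothetical table names `db1.src` and `db1.dst`, the first
// statement below should be planned as CreateDataSourceTableCommand and the second as
// CreateDataSourceTableAsSelectCommand:
//
//   spark.sql("CREATE TABLE IF NOT EXISTS db1.src (id INT, name STRING) USING parquet")
//   spark.sql("CREATE TABLE db1.dst USING parquet AS SELECT id, name FROM db1.src")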