/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.execution.command

import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.expressions.NamedExpression
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.sources.BaseRelation

/**
 * A command used to create a data source table.
 *
 * Note: This is different from [[CreateTableCommand]]. Please check the syntax for the difference.
 * This is not intended for temporary tables.
 *
 * The syntax for using this command in SQL is:
 * {{{
 *   CREATE TABLE [IF NOT EXISTS] [db_name.]table_name
 *   [(col1 data_type [COMMENT col_comment], ...)]
 *   USING format OPTIONS ([option1_name "option1_value", option2_name "option2_value", ...])
 * }}}
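 *
 * For example, an illustrative (hypothetical) statement handled by this command:
 * {{{
 *   CREATE TABLE IF NOT EXISTS mydb.events (id INT, ts TIMESTAMP)
 *   USING parquet OPTIONS (path "/tmp/events")
 * }}}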
 */
case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boolean)
  extends RunnableCommand {

  override def run(sparkSession: SparkSession): Seq[Row] = {
    assert(table.tableType != CatalogTableType.VIEW)
    assert(table.provider.isDefined)

    val sessionState = sparkSession.sessionState
    if (sessionState.catalog.tableExists(table.identifier)) {
      if (ignoreIfExists) {
        return Seq.empty[Row]
      } else {
        throw new AnalysisException(s"Table ${table.identifier.unquotedString} already exists.")
      }
    }

    // Create the relation to validate the arguments before writing the metadata to the metastore,
    // and to infer the table schema and partition columns if the user didn't specify a schema in
    // CREATE TABLE.
    val pathOption = table.storage.locationUri.map("path" -> _)
    // Fill in some default table options from the session conf
    val tableWithDefaultOptions = table.copy(
      identifier = table.identifier.copy(
        database = Some(
          table.identifier.database.getOrElse(sessionState.catalog.getCurrentDatabase))),
      tracksPartitionsInCatalog = sparkSession.sessionState.conf.manageFilesourcePartitions)
    val dataSource: BaseRelation =
      DataSource(
        sparkSession = sparkSession,
        userSpecifiedSchema = if (table.schema.isEmpty) None else Some(table.schema),
        partitionColumns = table.partitionColumnNames,
        className = table.provider.get,
        bucketSpec = table.bucketSpec,
        options = table.storage.properties ++ pathOption,
        catalogTable = Some(tableWithDefaultOptions)).resolveRelation()

    dataSource match {
      case fs: HadoopFsRelation =>
        if (table.tableType == CatalogTableType.EXTERNAL && fs.location.rootPaths.isEmpty) {
          throw new AnalysisException(
            "Cannot create a file-based external data source table without path")
        }
      case _ =>
    }

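    // If the user specified a schema, keep the user-specified partition columns; otherwise use
    // the partition columns inferred by the file source relation (if any).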
    val partitionColumnNames = if (table.schema.nonEmpty) {
      table.partitionColumnNames
    } else {
      // This is guaranteed in `PreprocessDDL`.
      assert(table.partitionColumnNames.isEmpty)
      dataSource match {
        case r: HadoopFsRelation => r.partitionSchema.fieldNames.toSeq
        case _ => Nil
      }
    }

    val newTable = table.copy(
      schema = dataSource.schema,
      partitionColumnNames = partitionColumnNames,
      // If metastore partition management for file source tables is enabled, we start off with
      // partition provider hive, but no partitions in the metastore. The user has to call
      // `msck repair table` to populate the table partitions.
      tracksPartitionsInCatalog = partitionColumnNames.nonEmpty &&
        sparkSession.sessionState.conf.manageFilesourcePartitions)
    // We will return Nil or throw an exception at the beginning if the table already exists, so
    // when we reach here, the table should not exist and we should set `ignoreIfExists` to false.
    sessionState.catalog.createTable(newTable, ignoreIfExists = false)

    Seq.empty[Row]
  }
}

/**
 * A command used to create a data source table using the result of a query.
 *
 * Note: This is different from `CreateHiveTableAsSelectCommand`. Please check the syntax for the
 * difference. This is not intended for temporary tables.
 *
 * The syntax for using this command in SQL is:
 * {{{
 *   CREATE TABLE [IF NOT EXISTS] [db_name.]table_name
 *   USING format OPTIONS ([option1_name "option1_value", option2_name "option2_value", ...])
 *   AS SELECT ...
 * }}}
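 *
 * For example, an illustrative (hypothetical) statement handled by this command:
 * {{{
 *   CREATE TABLE IF NOT EXISTS mydb.events_copy
 *   USING parquet OPTIONS (path "/tmp/events_copy")
 *   AS SELECT id, ts FROM mydb.events
 * }}}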
 */
case class CreateDataSourceTableAsSelectCommand(
    table: CatalogTable,
    mode: SaveMode,
    query: LogicalPlan)
  extends RunnableCommand {

  override protected def innerChildren: Seq[LogicalPlan] = Seq(query)

  override def run(sparkSession: SparkSession): Seq[Row] = {
    assert(table.tableType != CatalogTableType.VIEW)
    assert(table.provider.isDefined)
    assert(table.schema.isEmpty)

    val provider = table.provider.get
    val sessionState = sparkSession.sessionState
    val db = table.identifier.database.getOrElse(sessionState.catalog.getCurrentDatabase)
    val tableIdentWithDB = table.identifier.copy(database = Some(db))
    val tableName = tableIdentWithDB.unquotedString

    var createMetastoreTable = false
    // We may need to reorder the columns of the query to match the existing table.
    var reorderedColumns = Option.empty[Seq[NamedExpression]]
    if (sessionState.catalog.tableExists(tableIdentWithDB)) {
      // Check if we need to throw an exception or just return.
      mode match {
        case SaveMode.ErrorIfExists =>
          throw new AnalysisException(s"Table $tableName already exists. " +
            s"If you are using saveAsTable, you can set SaveMode to SaveMode.Append to " +
            s"insert data into the table or set SaveMode to SaveMode.Overwrite to overwrite " +
            s"the existing data. " +
            s"Or, if you are using SQL CREATE TABLE, you need to drop $tableName first.")
        case SaveMode.Ignore =>
          // Since the table already exists and the save mode is Ignore, we will just return.
          return Seq.empty[Row]
        case SaveMode.Append =>
          val existingTable = sessionState.catalog.getTableMetadata(tableIdentWithDB)

          if (existingTable.provider.get == DDLUtils.HIVE_PROVIDER) {
            throw new AnalysisException(s"Saving data in the Hive serde table $tableName is " +
              "not supported yet. Please use the insertInto() API as an alternative.")
          }

          // Check if the specified data source matches the data source of the existing table.
          val existingProvider = DataSource.lookupDataSource(existingTable.provider.get)
          val specifiedProvider = DataSource.lookupDataSource(table.provider.get)
          // TODO: Check that options from the resolved relation match the relation that we are
          // inserting into (i.e. using the same compression).
          if (existingProvider != specifiedProvider) {
            throw new AnalysisException(s"The format of the existing table $tableName is " +
              s"`${existingProvider.getSimpleName}`. It doesn't match the specified format " +
              s"`${specifiedProvider.getSimpleName}`.")
          }

          if (query.schema.length != existingTable.schema.length) {
            throw new AnalysisException(
              s"The column number of the existing table $tableName " +
                s"(${existingTable.schema.catalogString}) doesn't match the data schema " +
                s"(${query.schema.catalogString})")
          }

          val resolver = sessionState.conf.resolver
          val tableCols = existingTable.schema.map(_.name)

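          // Resolve every column of the existing table against the query output, so that the
          // data is written in the existing table's column order.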
          reorderedColumns = Some(existingTable.schema.map { f =>
            query.resolve(Seq(f.name), resolver).getOrElse {
              val inputColumns = query.schema.map(_.name).mkString(", ")
              throw new AnalysisException(
                s"cannot resolve '${f.name}' given input columns: [$inputColumns]")
            }
          })

          // In `AnalyzeCreateTable`, we verified the consistency between the user-specified table
          // definition (partition columns, bucketing) and the SELECT query; here we also need to
          // verify the consistency between the user-specified table definition and the existing
          // table definition.

          // Check if the specified partition columns match the existing table.
          val specifiedPartCols = CatalogUtils.normalizePartCols(
            tableName, tableCols, table.partitionColumnNames, resolver)
          if (specifiedPartCols != existingTable.partitionColumnNames) {
            throw new AnalysisException(
              s"""
                |Specified partitioning does not match that of the existing table $tableName.
                |Specified partition columns: [${specifiedPartCols.mkString(", ")}]
                |Existing partition columns: [${existingTable.partitionColumnNames.mkString(", ")}]
              """.stripMargin)
          }

          // Check if the specified bucketing matches the existing table.
          val specifiedBucketSpec = table.bucketSpec.map { bucketSpec =>
            CatalogUtils.normalizeBucketSpec(tableName, tableCols, bucketSpec, resolver)
          }
          if (specifiedBucketSpec != existingTable.bucketSpec) {
            val specifiedBucketString =
              specifiedBucketSpec.map(_.toString).getOrElse("not bucketed")
            val existingBucketString =
              existingTable.bucketSpec.map(_.toString).getOrElse("not bucketed")
            throw new AnalysisException(
              s"""
                |Specified bucketing does not match that of the existing table $tableName.
                |Specified bucketing: $specifiedBucketString
                |Existing bucketing: $existingBucketString
              """.stripMargin)
          }

        case SaveMode.Overwrite =>
          sessionState.catalog.dropTable(tableIdentWithDB, ignoreIfNotExists = true, purge = false)
          // Need to create the table again.
          createMetastoreTable = true
      }
    } else {
      // The table does not exist. We need to create it in the metastore.
      createMetastoreTable = true
    }

    val data = Dataset.ofRows(sparkSession, query)
    val df = reorderedColumns match {
      // Reorder the columns of the query to match the existing table.
      case Some(cols) => data.select(cols.map(Column(_)): _*)
      case None => data
    }

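    // Managed tables are written to the default location for this table identifier; external
    // tables keep the location specified by the user (if any).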
    val tableLocation = if (table.tableType == CatalogTableType.MANAGED) {
      Some(sessionState.catalog.defaultTablePath(table.identifier))
    } else {
      table.storage.locationUri
    }

    // Create the relation based on the data of df.
    val pathOption = tableLocation.map("path" -> _)
    val dataSource = DataSource(
      sparkSession,
      className = provider,
      partitionColumns = table.partitionColumnNames,
      bucketSpec = table.bucketSpec,
      options = table.storage.properties ++ pathOption,
      catalogTable = Some(table))

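    // Write the query result out through the data source; `writeAndRead` returns the relation
    // backed by the written data, whose schema is used when creating the catalog table below.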
    val result = try {
      dataSource.writeAndRead(mode, df)
    } catch {
      case ex: AnalysisException =>
        logError(s"Failed to write to table $tableName in $mode mode", ex)
        throw ex
    }
    if (createMetastoreTable) {
      val newTable = table.copy(
        storage = table.storage.copy(locationUri = tableLocation),
        // We will use the schema of the resolved relation as the schema of the table (instead of
        // the schema of df). This is important since the nullability may be changed by the
        // relation provider (for example, see org.apache.spark.sql.parquet.DefaultSource).
        schema = result.schema)
      sessionState.catalog.createTable(newTable, ignoreIfExists = false)
    }

    result match {
      case fs: HadoopFsRelation if table.partitionColumnNames.nonEmpty &&
          sparkSession.sqlContext.conf.manageFilesourcePartitions =>
        // Need to recover partitions into the metastore so our saved data is visible.
        sparkSession.sessionState.executePlan(
          AlterTableRecoverPartitionsCommand(table.identifier)).toRdd
      case _ =>
    }

    // Refresh the cache of the table in the catalog.
    sessionState.catalog.refreshTable(tableIdentWithDB)
    Seq.empty[Row]
  }
}