/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.execution.command

import scala.collection.{GenMap, GenSeq}
import scala.collection.parallel.ForkJoinTaskSupport
import scala.concurrent.forkjoin.ForkJoinPool
import scala.util.control.NonFatal

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._
import org.apache.hadoop.mapred.{FileInputFormat, JobConf}

import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, Resolver}
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.execution.datasources.PartitioningUtils
import org.apache.spark.sql.types._
import org.apache.spark.util.SerializableConfiguration

// Note: The definitions of these commands are based on the ones described in
// https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL

/**
 * A command for users to create a new database.
 *
 * It will issue an error message when a database with the same name already exists,
 * unless 'ifNotExists' is true.
 * The syntax of using this command in SQL is:
 * {{{
 *   CREATE (DATABASE|SCHEMA) [IF NOT EXISTS] database_name
 *     [COMMENT database_comment]
 *     [LOCATION database_directory]
 *     [WITH DBPROPERTIES (property_name=property_value, ...)];
 * }}}
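 *
 * For example (illustrative; the names and values are hypothetical):
 * {{{
 *   CREATE DATABASE IF NOT EXISTS sales_db
 *     COMMENT 'sales data'
 *     LOCATION '/warehouse/sales_db'
 *     WITH DBPROPERTIES ('owner'='etl');
 * }}}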
 */
case class CreateDatabaseCommand(
    databaseName: String,
    ifNotExists: Boolean,
    path: Option[String],
    comment: Option[String],
    props: Map[String, String])
  extends RunnableCommand {

  override def run(sparkSession: SparkSession): Seq[Row] = {
    val catalog = sparkSession.sessionState.catalog
    catalog.createDatabase(
      CatalogDatabase(
        databaseName,
        comment.getOrElse(""),
        path.getOrElse(catalog.getDefaultDBPath(databaseName)),
        props),
      ifNotExists)
    Seq.empty[Row]
  }
}


/**
 * A command for users to remove a database from the system.
 *
 * 'ifExists':
 * - true, if database_name doesn't exist, no action is taken
 * - false (default), if database_name doesn't exist, an error will be issued
 * 'cascade':
 * - true, the dependent objects are automatically dropped before dropping the database.
 * - false (default), the command runs in Restrict mode. The database cannot be dropped if
 * it is not empty; its tables must be dropped first.
 *
 * The syntax of using this command in SQL is:
 * {{{
 *    DROP DATABASE [IF EXISTS] database_name [RESTRICT|CASCADE];
 * }}}
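 *
 * For example (illustrative; the database name is hypothetical):
 * {{{
 *    DROP DATABASE IF EXISTS sales_db CASCADE;
 * }}}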
 */
case class DropDatabaseCommand(
    databaseName: String,
    ifExists: Boolean,
    cascade: Boolean)
  extends RunnableCommand {

  override def run(sparkSession: SparkSession): Seq[Row] = {
    sparkSession.sessionState.catalog.dropDatabase(databaseName, ifExists, cascade)
    Seq.empty[Row]
  }
}

/**
 * A command for users to add new (key, value) pairs into DBPROPERTIES.
 * If the database does not exist, an error message will be issued to indicate that the database
 * does not exist.
 * The syntax of using this command in SQL is:
 * {{{
 *    ALTER (DATABASE|SCHEMA) database_name SET DBPROPERTIES (property_name=property_value, ...)
 * }}}
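 *
 * For example (illustrative; the database name and properties are hypothetical):
 * {{{
 *    ALTER DATABASE sales_db SET DBPROPERTIES ('owner'='etl', 'team'='data')
 * }}}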
 */
case class AlterDatabasePropertiesCommand(
    databaseName: String,
    props: Map[String, String])
  extends RunnableCommand {

  override def run(sparkSession: SparkSession): Seq[Row] = {
    val catalog = sparkSession.sessionState.catalog
    val db: CatalogDatabase = catalog.getDatabaseMetadata(databaseName)
    catalog.alterDatabase(db.copy(properties = db.properties ++ props))

    Seq.empty[Row]
  }
}

/**
 * A command for users to show the name of the database, its comment (if one has been set), and its
 * root location on the filesystem. When extended is true, it also shows the database's properties.
 * If the database does not exist, an error message will be issued to indicate that the database
 * does not exist.
 * The syntax of using this command in SQL is:
 * {{{
 *    DESCRIBE DATABASE [EXTENDED] db_name
 * }}}
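 *
 * For example (illustrative; the database name is hypothetical):
 * {{{
 *    DESCRIBE DATABASE EXTENDED sales_db
 * }}}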
 */
case class DescribeDatabaseCommand(
    databaseName: String,
    extended: Boolean)
  extends RunnableCommand {

  override def run(sparkSession: SparkSession): Seq[Row] = {
    val dbMetadata: CatalogDatabase =
      sparkSession.sessionState.catalog.getDatabaseMetadata(databaseName)
    val result =
      Row("Database Name", dbMetadata.name) ::
        Row("Description", dbMetadata.description) ::
        Row("Location", dbMetadata.locationUri) :: Nil

    if (extended) {
      val properties =
        if (dbMetadata.properties.isEmpty) {
          ""
        } else {
          dbMetadata.properties.toSeq.mkString("(", ", ", ")")
        }
      result :+ Row("Properties", properties)
    } else {
      result
    }
  }

  override val output: Seq[Attribute] = {
    AttributeReference("database_description_item", StringType, nullable = false)() ::
      AttributeReference("database_description_value", StringType, nullable = false)() :: Nil
  }
}

/**
 * Drops a table/view from the metastore and uncaches it if it is cached.
 *
 * The syntax of this command is:
 * {{{
 *   DROP TABLE [IF EXISTS] table_name;
 *   DROP VIEW [IF EXISTS] [db_name.]view_name;
 * }}}
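 *
 * For example (illustrative; the table and view names are hypothetical):
 * {{{
 *   DROP TABLE IF EXISTS sales_db.events;
 *   DROP VIEW IF EXISTS sales_db.events_view;
 * }}}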
 */
case class DropTableCommand(
    tableName: TableIdentifier,
    ifExists: Boolean,
    isView: Boolean,
    purge: Boolean) extends RunnableCommand {

  override def run(sparkSession: SparkSession): Seq[Row] = {
    val catalog = sparkSession.sessionState.catalog

    if (!catalog.isTemporaryTable(tableName) && catalog.tableExists(tableName)) {
      // If the command DROP VIEW is used to drop a table, or DROP TABLE is used to drop a view,
      // issue an exception.
      catalog.getTableMetadata(tableName).tableType match {
        case CatalogTableType.VIEW if !isView =>
          throw new AnalysisException(
            "Cannot drop a view with DROP TABLE. Please use DROP VIEW instead")
        case o if o != CatalogTableType.VIEW && isView =>
          throw new AnalysisException(
            s"Cannot drop a table with DROP VIEW. Please use DROP TABLE instead")
        case _ =>
      }
    }
    try {
      sparkSession.sharedState.cacheManager.uncacheQuery(sparkSession.table(tableName))
    } catch {
      case _: NoSuchTableException if ifExists =>
      case NonFatal(e) => log.warn(e.toString, e)
    }
    catalog.refreshTable(tableName)
    catalog.dropTable(tableName, ifExists, purge)
    Seq.empty[Row]
  }
}

/**
 * A command that sets table/view properties.
 *
 * The syntax of this command is:
 * {{{
 *   ALTER TABLE table1 SET TBLPROPERTIES ('key1' = 'val1', 'key2' = 'val2', ...);
 *   ALTER VIEW view1 SET TBLPROPERTIES ('key1' = 'val1', 'key2' = 'val2', ...);
 * }}}
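 *
 * For example (illustrative; the table name and property are hypothetical):
 * {{{
 *   ALTER TABLE events SET TBLPROPERTIES ('comment' = 'click events');
 * }}}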
 */
case class AlterTableSetPropertiesCommand(
    tableName: TableIdentifier,
    properties: Map[String, String],
    isView: Boolean)
  extends RunnableCommand {

  override def run(sparkSession: SparkSession): Seq[Row] = {
    val catalog = sparkSession.sessionState.catalog
    val table = catalog.getTableMetadata(tableName)
    DDLUtils.verifyAlterTableType(catalog, table, isView)
    // This overrides old properties
    val newTable = table.copy(properties = table.properties ++ properties)
    catalog.alterTable(newTable)
    Seq.empty[Row]
  }

}

/**
 * A command that unsets table/view properties.
 *
 * The syntax of this command is:
 * {{{
 *   ALTER TABLE table1 UNSET TBLPROPERTIES [IF EXISTS] ('key1', 'key2', ...);
 *   ALTER VIEW view1 UNSET TBLPROPERTIES [IF EXISTS] ('key1', 'key2', ...);
 * }}}
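 *
 * For example (illustrative; the table name and property key are hypothetical):
 * {{{
 *   ALTER TABLE events UNSET TBLPROPERTIES IF EXISTS ('comment');
 * }}}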
 */
case class AlterTableUnsetPropertiesCommand(
    tableName: TableIdentifier,
    propKeys: Seq[String],
    ifExists: Boolean,
    isView: Boolean)
  extends RunnableCommand {

  override def run(sparkSession: SparkSession): Seq[Row] = {
    val catalog = sparkSession.sessionState.catalog
    val table = catalog.getTableMetadata(tableName)
    DDLUtils.verifyAlterTableType(catalog, table, isView)
    if (!ifExists) {
      propKeys.foreach { k =>
        if (!table.properties.contains(k)) {
          throw new AnalysisException(
            s"Attempted to unset non-existent property '$k' in table '${table.identifier}'")
        }
      }
    }
    val newProperties = table.properties.filter { case (k, _) => !propKeys.contains(k) }
    val newTable = table.copy(properties = newProperties)
    catalog.alterTable(newTable)
    Seq.empty[Row]
  }

}

/**
 * A command that sets the serde class and/or serde properties of a table/view.
 *
 * The syntax of this command is:
 * {{{
 *   ALTER TABLE table [PARTITION spec] SET SERDE serde_name [WITH SERDEPROPERTIES props];
 *   ALTER TABLE table [PARTITION spec] SET SERDEPROPERTIES serde_properties;
 * }}}
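 *
 * For example, on a Hive table (illustrative; the table, partition and property are hypothetical):
 * {{{
 *   ALTER TABLE events PARTITION (ds='2017-01-01') SET SERDEPROPERTIES ('field.delim'=',');
 * }}}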
 */
case class AlterTableSerDePropertiesCommand(
    tableName: TableIdentifier,
    serdeClassName: Option[String],
    serdeProperties: Option[Map[String, String]],
    partSpec: Option[TablePartitionSpec])
  extends RunnableCommand {

  // should never happen if we parsed things correctly
  require(serdeClassName.isDefined || serdeProperties.isDefined,
    "ALTER TABLE attempted to set neither serde class name nor serde properties")

  override def run(sparkSession: SparkSession): Seq[Row] = {
    val catalog = sparkSession.sessionState.catalog
    val table = catalog.getTableMetadata(tableName)
    DDLUtils.verifyAlterTableType(catalog, table, isView = false)
    // For datasource tables, disallow setting serde or specifying partition
    if (partSpec.isDefined && DDLUtils.isDatasourceTable(table)) {
      throw new AnalysisException("Operation not allowed: ALTER TABLE SET " +
        "[SERDE | SERDEPROPERTIES] for a specific partition is not supported " +
        "for tables created with the datasource API")
    }
    if (serdeClassName.isDefined && DDLUtils.isDatasourceTable(table)) {
      throw new AnalysisException("Operation not allowed: ALTER TABLE SET SERDE is " +
        "not supported for tables created with the datasource API")
    }
    if (partSpec.isEmpty) {
      val newTable = table.withNewStorage(
        serde = serdeClassName.orElse(table.storage.serde),
        properties = table.storage.properties ++ serdeProperties.getOrElse(Map()))
      catalog.alterTable(newTable)
    } else {
      val spec = partSpec.get
      val part = catalog.getPartition(table.identifier, spec)
      val newPart = part.copy(storage = part.storage.copy(
        serde = serdeClassName.orElse(part.storage.serde),
        properties = part.storage.properties ++ serdeProperties.getOrElse(Map())))
      catalog.alterPartitions(table.identifier, Seq(newPart))
    }
    Seq.empty[Row]
  }

}

/**
 * Add Partition in ALTER TABLE: adds new partitions to a table.
 *
 * An error message will be issued if a partition already exists, unless 'ifNotExists' is true.
 *
 * The syntax of this command is:
 * {{{
 *   ALTER TABLE table ADD [IF NOT EXISTS] PARTITION spec1 [LOCATION 'loc1']
 *                                         PARTITION spec2 [LOCATION 'loc2']
 * }}}
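 *
 * For example (illustrative; the table name, partition values and location are hypothetical):
 * {{{
 *   ALTER TABLE events ADD IF NOT EXISTS PARTITION (ds='2017-01-01') LOCATION '/data/d1'
 *                                        PARTITION (ds='2017-01-02')
 * }}}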
 */
case class AlterTableAddPartitionCommand(
    tableName: TableIdentifier,
    partitionSpecsAndLocs: Seq[(TablePartitionSpec, Option[String])],
    ifNotExists: Boolean)
  extends RunnableCommand {

  override def run(sparkSession: SparkSession): Seq[Row] = {
    val catalog = sparkSession.sessionState.catalog
    val table = catalog.getTableMetadata(tableName)
    DDLUtils.verifyAlterTableType(catalog, table, isView = false)
    DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER TABLE ADD PARTITION")
    val parts = partitionSpecsAndLocs.map { case (spec, location) =>
      val normalizedSpec = PartitioningUtils.normalizePartitionSpec(
        spec,
        table.partitionColumnNames,
        table.identifier.quotedString,
        sparkSession.sessionState.conf.resolver)
      // inherit table storage format (possibly except for location)
      CatalogTablePartition(normalizedSpec, table.storage.copy(locationUri = location))
    }
    catalog.createPartitions(table.identifier, parts, ignoreIfExists = ifNotExists)
    Seq.empty[Row]
  }

}

/**
 * Alter a table partition's spec.
 *
 * The syntax of this command is:
 * {{{
 *   ALTER TABLE table PARTITION spec1 RENAME TO PARTITION spec2;
 * }}}
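 *
 * For example (illustrative; the table name and partition values are hypothetical):
 * {{{
 *   ALTER TABLE events PARTITION (ds='2017-01-01') RENAME TO PARTITION (ds='2017-01-02');
 * }}}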
 */
case class AlterTableRenamePartitionCommand(
    tableName: TableIdentifier,
    oldPartition: TablePartitionSpec,
    newPartition: TablePartitionSpec)
  extends RunnableCommand {

  override def run(sparkSession: SparkSession): Seq[Row] = {
    val catalog = sparkSession.sessionState.catalog
    val table = catalog.getTableMetadata(tableName)
    DDLUtils.verifyAlterTableType(catalog, table, isView = false)
    DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER TABLE RENAME PARTITION")

    val normalizedOldPartition = PartitioningUtils.normalizePartitionSpec(
      oldPartition,
      table.partitionColumnNames,
      table.identifier.quotedString,
      sparkSession.sessionState.conf.resolver)

    val normalizedNewPartition = PartitioningUtils.normalizePartitionSpec(
      newPartition,
      table.partitionColumnNames,
      table.identifier.quotedString,
      sparkSession.sessionState.conf.resolver)

    catalog.renamePartitions(
      tableName, Seq(normalizedOldPartition), Seq(normalizedNewPartition))
    Seq.empty[Row]
  }

}

/**
 * Drop Partition in ALTER TABLE: drops particular partitions of a table.
 *
 * This removes the data and metadata for the partitions.
 * The data is moved to the .Trash/Current directory if Trash is configured, unless 'purge' is
 * true, in which case it is deleted immediately; the metadata is removed in either case.
 * An error message will be issued if the partition does not exist, unless 'ifExists' is true.
 * Note: purge is always false when the target is a view.
 *
 * The syntax of this command is:
 * {{{
 *   ALTER TABLE table DROP [IF EXISTS] PARTITION spec1[, PARTITION spec2, ...] [PURGE];
 * }}}
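 *
 * For example (illustrative; the table name and partition values are hypothetical):
 * {{{
 *   ALTER TABLE events DROP IF EXISTS PARTITION (ds='2017-01-01'), PARTITION (ds='2017-01-02');
 * }}}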
 */
case class AlterTableDropPartitionCommand(
    tableName: TableIdentifier,
    specs: Seq[TablePartitionSpec],
    ifExists: Boolean,
    purge: Boolean,
    retainData: Boolean)
  extends RunnableCommand {

  override def run(sparkSession: SparkSession): Seq[Row] = {
    val catalog = sparkSession.sessionState.catalog
    val table = catalog.getTableMetadata(tableName)
    DDLUtils.verifyAlterTableType(catalog, table, isView = false)
    DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER TABLE DROP PARTITION")

    val normalizedSpecs = specs.map { spec =>
      PartitioningUtils.normalizePartitionSpec(
        spec,
        table.partitionColumnNames,
        table.identifier.quotedString,
        sparkSession.sessionState.conf.resolver)
    }

    catalog.dropPartitions(
      table.identifier, normalizedSpecs, ignoreIfNotExists = ifExists, purge = purge,
      retainData = retainData)
    Seq.empty[Row]
  }

}


case class PartitionStatistics(numFiles: Int, totalSize: Long)

/**
 * Recover Partitions in ALTER TABLE: recovers all the partitions in the directory of a table and
 * updates the catalog.
 *
 * The syntax of this command is:
 * {{{
 *   ALTER TABLE table RECOVER PARTITIONS;
 *   MSCK REPAIR TABLE table;
 * }}}
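 *
 * For example (illustrative; the table name is hypothetical):
 * {{{
 *   ALTER TABLE events RECOVER PARTITIONS;
 * }}}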
 */
case class AlterTableRecoverPartitionsCommand(
    tableName: TableIdentifier,
    cmd: String = "ALTER TABLE RECOVER PARTITIONS") extends RunnableCommand {

  // These are statistics that can be collected quickly without requiring a scan of the data;
  // see https://github.com/apache/hive/blob/master/
  //   common/src/java/org/apache/hadoop/hive/common/StatsSetupConst.java
  val NUM_FILES = "numFiles"
  val TOTAL_SIZE = "totalSize"
  val DDL_TIME = "transient_lastDdlTime"

  private def getPathFilter(hadoopConf: Configuration): PathFilter = {
    // Dummy JobConf to get to the path filter defined in the configuration.
    // It's very expensive to create a JobConf (ClassUtil.findContainingJar() is slow).
    val jobConf = new JobConf(hadoopConf, this.getClass)
    val pathFilter = FileInputFormat.getInputPathFilter(jobConf)
    new PathFilter {
      override def accept(path: Path): Boolean = {
        val name = path.getName
        if (name != "_SUCCESS" && name != "_temporary" && !name.startsWith(".")) {
          pathFilter == null || pathFilter.accept(path)
        } else {
          false
        }
      }
    }
  }

  override def run(spark: SparkSession): Seq[Row] = {
    val catalog = spark.sessionState.catalog
    val table = catalog.getTableMetadata(tableName)
    val tableIdentWithDB = table.identifier.quotedString
    DDLUtils.verifyAlterTableType(catalog, table, isView = false)
    if (table.partitionColumnNames.isEmpty) {
      throw new AnalysisException(
        s"Operation not allowed: $cmd only works on partitioned tables: $tableIdentWithDB")
    }

    if (table.storage.locationUri.isEmpty) {
      throw new AnalysisException(s"Operation not allowed: $cmd only works on table with " +
        s"location provided: $tableIdentWithDB")
    }

    val root = new Path(table.location)
    logInfo(s"Recover all the partitions in $root")
    val fs = root.getFileSystem(spark.sparkContext.hadoopConfiguration)

    val threshold = spark.conf.get("spark.rdd.parallelListingThreshold", "10").toInt
    val hadoopConf = spark.sparkContext.hadoopConfiguration
    val pathFilter = getPathFilter(hadoopConf)
    val partitionSpecsAndLocs = scanPartitions(spark, fs, pathFilter, root, Map(),
      table.partitionColumnNames, threshold, spark.sessionState.conf.resolver)
    val total = partitionSpecsAndLocs.length
    logInfo(s"Found $total partitions in $root")

    val partitionStats = if (spark.sqlContext.conf.gatherFastStats) {
      gatherPartitionStats(spark, partitionSpecsAndLocs, fs, pathFilter, threshold)
    } else {
      GenMap.empty[String, PartitionStatistics]
    }
    logInfo(s"Finished gathering the fast stats for all $total partitions.")

    addPartitions(spark, table, partitionSpecsAndLocs, partitionStats)
    // Updates the table to indicate that its partition metadata is stored in the Hive metastore.
    // This is always the case for Hive format tables, but is not true for Datasource tables
    // created before Spark 2.1 unless they are converted via `msck repair table`.
    spark.sessionState.catalog.alterTable(table.copy(tracksPartitionsInCatalog = true))
    catalog.refreshTable(tableName)
    logInfo(s"Recovered all partitions ($total).")
    Seq.empty[Row]
  }


  @transient private lazy val evalTaskSupport = new ForkJoinTaskSupport(new ForkJoinPool(8))

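  /**
   * Recursively lists `path`, descending one partition-column level per call and building up the
   * partition spec from directory names of the form `column=value`. The listing switches to a
   * parallel collection (backed by `evalTaskSupport`) when more than one partition column remains
   * and the number of entries exceeds `threshold`, or when more than two partition columns
   * remain, so deep partition trees are scanned concurrently.
   */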
  private def scanPartitions(
      spark: SparkSession,
      fs: FileSystem,
      filter: PathFilter,
      path: Path,
      spec: TablePartitionSpec,
      partitionNames: Seq[String],
      threshold: Int,
      resolver: Resolver): GenSeq[(TablePartitionSpec, Path)] = {
    if (partitionNames.isEmpty) {
      return Seq(spec -> path)
    }

    val statuses = fs.listStatus(path, filter)
    val statusPar: GenSeq[FileStatus] =
      if (partitionNames.length > 1 && statuses.length > threshold || partitionNames.length > 2) {
        // parallelize the list of partitions here, then we can have better parallelism later.
        val parArray = statuses.par
        parArray.tasksupport = evalTaskSupport
        parArray
      } else {
        statuses
      }
    statusPar.flatMap { st =>
      val name = st.getPath.getName
      if (st.isDirectory && name.contains("=")) {
        val ps = name.split("=", 2)
        val columnName = ExternalCatalogUtils.unescapePathName(ps(0))
        // TODO: Validate the value
        val value = ExternalCatalogUtils.unescapePathName(ps(1))
        if (resolver(columnName, partitionNames.head)) {
          scanPartitions(spark, fs, filter, st.getPath, spec ++ Map(partitionNames.head -> value),
            partitionNames.drop(1), threshold, resolver)
        } else {
          logWarning(
            s"expected partition column ${partitionNames.head}, but got ${ps(0)}, ignoring it")
          Seq()
        }
      } else {
        logWarning(s"ignore ${new Path(path, name)}")
        Seq()
      }
    }
  }

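  /**
   * Collects the number of files and the total size for each discovered partition location. When
   * there are more partitions than `threshold`, the listing is distributed over the cluster as a
   * Spark job; otherwise the driver lists the locations directly.
   */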
  private def gatherPartitionStats(
      spark: SparkSession,
      partitionSpecsAndLocs: GenSeq[(TablePartitionSpec, Path)],
      fs: FileSystem,
      pathFilter: PathFilter,
      threshold: Int): GenMap[String, PartitionStatistics] = {
    if (partitionSpecsAndLocs.length > threshold) {
      val hadoopConf = spark.sparkContext.hadoopConfiguration
      val serializableConfiguration = new SerializableConfiguration(hadoopConf)
      val serializedPaths = partitionSpecsAndLocs.map(_._2.toString).toArray

      // Cap the parallelism to prevent the following file listing from generating too many
      // tasks when defaultParallelism is large.
      val numParallelism = Math.min(serializedPaths.length,
        Math.min(spark.sparkContext.defaultParallelism, 10000))
      // Gather the fast stats for all the partitions, otherwise the Hive metastore will list
      // all the files for all the new partitions sequentially, which is very slow.
      logInfo(s"Gather the fast stats in parallel using $numParallelism tasks.")
      spark.sparkContext.parallelize(serializedPaths, numParallelism)
        .mapPartitions { paths =>
          val pathFilter = getPathFilter(serializableConfiguration.value)
          paths.map(new Path(_)).map { path =>
            val fs = path.getFileSystem(serializableConfiguration.value)
            val statuses = fs.listStatus(path, pathFilter)
            (path.toString, PartitionStatistics(statuses.length, statuses.map(_.getLen).sum))
          }
        }.collectAsMap()
    } else {
      partitionSpecsAndLocs.map { case (_, location) =>
        val statuses = fs.listStatus(location, pathFilter)
        (location.toString, PartitionStatistics(statuses.length, statuses.map(_.getLen).sum))
      }.toMap
    }
  }

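  /**
   * Registers the discovered partitions in the catalog in batches of 100, attaching the fast
   * stats (numFiles, totalSize) and a DDL time when available, so that the Hive metastore does
   * not have to list the partition files again.
   */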
  private def addPartitions(
      spark: SparkSession,
      table: CatalogTable,
      partitionSpecsAndLocs: GenSeq[(TablePartitionSpec, Path)],
      partitionStats: GenMap[String, PartitionStatistics]): Unit = {
    val total = partitionSpecsAndLocs.length
    var done = 0L
    // The Hive metastore may not have enough memory to handle millions of partitions in a single
    // RPC, so we split them into smaller batches. Since the Hive client is not thread-safe, we
    // cannot do this in parallel.
    val batchSize = 100
    partitionSpecsAndLocs.toIterator.grouped(batchSize).foreach { batch =>
      val now = System.currentTimeMillis() / 1000
      val parts = batch.map { case (spec, location) =>
        val params = partitionStats.get(location.toString).map {
          case PartitionStatistics(numFiles, totalSize) =>
            // These two fast stats prevent the Hive metastore from listing the files again.
            Map(NUM_FILES -> numFiles.toString,
              TOTAL_SIZE -> totalSize.toString,
              // Work around a bug in the Hive metastore that mutates read-only parameters.
              // see metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
              DDL_TIME -> now.toString)
        }.getOrElse(Map.empty)
        // inherit table storage format (possibly except for location)
        CatalogTablePartition(
          spec,
          table.storage.copy(locationUri = Some(location.toUri.toString)),
          params)
      }
      spark.sessionState.catalog.createPartitions(tableName, parts, ignoreIfExists = true)
      done += parts.length
      logDebug(s"Recovered ${parts.length} partitions ($done/$total so far)")
    }
  }
}



/**
 * A command that sets the location of a table or a partition.
 *
 * For normal tables, this just sets the location URI in the table/partition's storage format.
 * For datasource tables, this sets a "path" parameter in the table/partition's serde properties.
 *
 * The syntax of this command is:
 * {{{
 *    ALTER TABLE table_name [PARTITION partition_spec] SET LOCATION "loc";
 * }}}
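 *
 * For example (illustrative; the table name, partition value and location are hypothetical):
 * {{{
 *    ALTER TABLE events PARTITION (ds='2017-01-01') SET LOCATION '/data/new/ds=2017-01-01';
 * }}}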
 */
case class AlterTableSetLocationCommand(
    tableName: TableIdentifier,
    partitionSpec: Option[TablePartitionSpec],
    location: String)
  extends RunnableCommand {

  override def run(sparkSession: SparkSession): Seq[Row] = {
    val catalog = sparkSession.sessionState.catalog
    val table = catalog.getTableMetadata(tableName)
    DDLUtils.verifyAlterTableType(catalog, table, isView = false)
    partitionSpec match {
      case Some(spec) =>
        DDLUtils.verifyPartitionProviderIsHive(
          sparkSession, table, "ALTER TABLE ... SET LOCATION")
        // Partition spec is specified, so we set the location only for this partition
        val part = catalog.getPartition(table.identifier, spec)
        val newPart = part.copy(storage = part.storage.copy(locationUri = Some(location)))
        catalog.alterPartitions(table.identifier, Seq(newPart))
      case None =>
        // No partition spec is specified, so we set the location for the table itself
        catalog.alterTable(table.withNewStorage(locationUri = Some(location)))
    }
    Seq.empty[Row]
  }
}


object DDLUtils {
  val HIVE_PROVIDER = "hive"

  def isDatasourceTable(table: CatalogTable): Boolean = {
    table.provider.isDefined && table.provider.get != HIVE_PROVIDER
  }

  /**
   * Throws a standard error for actions that require partitionProvider = hive.
   */
  def verifyPartitionProviderIsHive(
      spark: SparkSession, table: CatalogTable, action: String): Unit = {
    val tableName = table.identifier.table
    if (!spark.sqlContext.conf.manageFilesourcePartitions && isDatasourceTable(table)) {
      throw new AnalysisException(
        s"$action is not allowed on $tableName since filesource partition management is " +
          "disabled (spark.sql.hive.manageFilesourcePartitions = false).")
    }
    if (!table.tracksPartitionsInCatalog && isDatasourceTable(table)) {
      throw new AnalysisException(
        s"$action is not allowed on $tableName since its partition metadata is not stored in " +
          "the Hive metastore. To import this information into the metastore, run " +
          s"`msck repair table $tableName`")
    }
  }


  /**
   * If the command ALTER VIEW is used to alter a table, or ALTER TABLE is used to alter a view,
   * issue an [[AnalysisException]].
   *
   * Note: temporary views can be altered by both ALTER VIEW and ALTER TABLE commands,
   * since temporary views can also be created by CREATE TEMPORARY TABLE. In the future,
   * when we decide to drop that support, we should disallow altering temporary views
   * with ALTER TABLE.
   */
  def verifyAlterTableType(
      catalog: SessionCatalog,
      tableMetadata: CatalogTable,
      isView: Boolean): Unit = {
    if (!catalog.isTemporaryTable(tableMetadata.identifier)) {
      tableMetadata.tableType match {
        case CatalogTableType.VIEW if !isView =>
          throw new AnalysisException(
            "Cannot alter a view with ALTER TABLE. Please use ALTER VIEW instead")
        case o if o != CatalogTableType.VIEW && isView =>
          throw new AnalysisException(
            s"Cannot alter a table with ALTER VIEW. Please use ALTER TABLE instead")
        case _ =>
      }
    }
  }
}