1/* 2 * Licensed to the Apache Software Foundation (ASF) under one or more 3 * contributor license agreements. See the NOTICE file distributed with 4 * this work for additional information regarding copyright ownership. 5 * The ASF licenses this file to You under the Apache License, Version 2.0 6 * (the "License"); you may not use this file except in compliance with 7 * the License. You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 */ 17 18package org.apache.spark.sql.execution.command 19 20import scala.collection.{GenMap, GenSeq} 21import scala.collection.parallel.ForkJoinTaskSupport 22import scala.concurrent.forkjoin.ForkJoinPool 23import scala.util.control.NonFatal 24 25import org.apache.hadoop.conf.Configuration 26import org.apache.hadoop.fs._ 27import org.apache.hadoop.mapred.{FileInputFormat, JobConf} 28 29import org.apache.spark.sql.{AnalysisException, Row, SparkSession} 30import org.apache.spark.sql.catalyst.TableIdentifier 31import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, Resolver} 32import org.apache.spark.sql.catalyst.catalog._ 33import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec 34import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} 35import org.apache.spark.sql.execution.datasources.PartitioningUtils 36import org.apache.spark.sql.types._ 37import org.apache.spark.util.SerializableConfiguration 38 39// Note: The definition of these commands are based on the ones described in 40// https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL 41 42/** 43 * A command for users to create a new database. 44 * 45 * It will issue an error message when the database with the same name already exists, 46 * unless 'ifNotExists' is true. 47 * The syntax of using this command in SQL is: 48 * {{{ 49 * CREATE (DATABASE|SCHEMA) [IF NOT EXISTS] database_name 50 * [COMMENT database_comment] 51 * [LOCATION database_directory] 52 * [WITH DBPROPERTIES (property_name=property_value, ...)]; 53 * }}} 54 */ 55case class CreateDatabaseCommand( 56 databaseName: String, 57 ifNotExists: Boolean, 58 path: Option[String], 59 comment: Option[String], 60 props: Map[String, String]) 61 extends RunnableCommand { 62 63 override def run(sparkSession: SparkSession): Seq[Row] = { 64 val catalog = sparkSession.sessionState.catalog 65 catalog.createDatabase( 66 CatalogDatabase( 67 databaseName, 68 comment.getOrElse(""), 69 path.getOrElse(catalog.getDefaultDBPath(databaseName)), 70 props), 71 ifNotExists) 72 Seq.empty[Row] 73 } 74} 75 76 77/** 78 * A command for users to remove a database from the system. 79 * 80 * 'ifExists': 81 * - true, if database_name does't exist, no action 82 * - false (default), if database_name does't exist, a warning message will be issued 83 * 'cascade': 84 * - true, the dependent objects are automatically dropped before dropping database. 85 * - false (default), it is in the Restrict mode. The database cannot be dropped if 86 * it is not empty. The inclusive tables must be dropped at first. 87 * 88 * The syntax of using this command in SQL is: 89 * {{{ 90 * DROP DATABASE [IF EXISTS] database_name [RESTRICT|CASCADE]; 91 * }}} 92 */ 93case class DropDatabaseCommand( 94 databaseName: String, 95 ifExists: Boolean, 96 cascade: Boolean) 97 extends RunnableCommand { 98 99 override def run(sparkSession: SparkSession): Seq[Row] = { 100 sparkSession.sessionState.catalog.dropDatabase(databaseName, ifExists, cascade) 101 Seq.empty[Row] 102 } 103} 104 105/** 106 * A command for users to add new (key, value) pairs into DBPROPERTIES 107 * If the database does not exist, an error message will be issued to indicate the database 108 * does not exist. 109 * The syntax of using this command in SQL is: 110 * {{{ 111 * ALTER (DATABASE|SCHEMA) database_name SET DBPROPERTIES (property_name=property_value, ...) 112 * }}} 113 */ 114case class AlterDatabasePropertiesCommand( 115 databaseName: String, 116 props: Map[String, String]) 117 extends RunnableCommand { 118 119 override def run(sparkSession: SparkSession): Seq[Row] = { 120 val catalog = sparkSession.sessionState.catalog 121 val db: CatalogDatabase = catalog.getDatabaseMetadata(databaseName) 122 catalog.alterDatabase(db.copy(properties = db.properties ++ props)) 123 124 Seq.empty[Row] 125 } 126} 127 128/** 129 * A command for users to show the name of the database, its comment (if one has been set), and its 130 * root location on the filesystem. When extended is true, it also shows the database's properties 131 * If the database does not exist, an error message will be issued to indicate the database 132 * does not exist. 133 * The syntax of using this command in SQL is 134 * {{{ 135 * DESCRIBE DATABASE [EXTENDED] db_name 136 * }}} 137 */ 138case class DescribeDatabaseCommand( 139 databaseName: String, 140 extended: Boolean) 141 extends RunnableCommand { 142 143 override def run(sparkSession: SparkSession): Seq[Row] = { 144 val dbMetadata: CatalogDatabase = 145 sparkSession.sessionState.catalog.getDatabaseMetadata(databaseName) 146 val result = 147 Row("Database Name", dbMetadata.name) :: 148 Row("Description", dbMetadata.description) :: 149 Row("Location", dbMetadata.locationUri) :: Nil 150 151 if (extended) { 152 val properties = 153 if (dbMetadata.properties.isEmpty) { 154 "" 155 } else { 156 dbMetadata.properties.toSeq.mkString("(", ", ", ")") 157 } 158 result :+ Row("Properties", properties) 159 } else { 160 result 161 } 162 } 163 164 override val output: Seq[Attribute] = { 165 AttributeReference("database_description_item", StringType, nullable = false)() :: 166 AttributeReference("database_description_value", StringType, nullable = false)() :: Nil 167 } 168} 169 170/** 171 * Drops a table/view from the metastore and removes it if it is cached. 172 * 173 * The syntax of this command is: 174 * {{{ 175 * DROP TABLE [IF EXISTS] table_name; 176 * DROP VIEW [IF EXISTS] [db_name.]view_name; 177 * }}} 178 */ 179case class DropTableCommand( 180 tableName: TableIdentifier, 181 ifExists: Boolean, 182 isView: Boolean, 183 purge: Boolean) extends RunnableCommand { 184 185 override def run(sparkSession: SparkSession): Seq[Row] = { 186 val catalog = sparkSession.sessionState.catalog 187 188 if (!catalog.isTemporaryTable(tableName) && catalog.tableExists(tableName)) { 189 // If the command DROP VIEW is to drop a table or DROP TABLE is to drop a view 190 // issue an exception. 191 catalog.getTableMetadata(tableName).tableType match { 192 case CatalogTableType.VIEW if !isView => 193 throw new AnalysisException( 194 "Cannot drop a view with DROP TABLE. Please use DROP VIEW instead") 195 case o if o != CatalogTableType.VIEW && isView => 196 throw new AnalysisException( 197 s"Cannot drop a table with DROP VIEW. Please use DROP TABLE instead") 198 case _ => 199 } 200 } 201 try { 202 sparkSession.sharedState.cacheManager.uncacheQuery(sparkSession.table(tableName)) 203 } catch { 204 case _: NoSuchTableException if ifExists => 205 case NonFatal(e) => log.warn(e.toString, e) 206 } 207 catalog.refreshTable(tableName) 208 catalog.dropTable(tableName, ifExists, purge) 209 Seq.empty[Row] 210 } 211} 212 213/** 214 * A command that sets table/view properties. 215 * 216 * The syntax of this command is: 217 * {{{ 218 * ALTER TABLE table1 SET TBLPROPERTIES ('key1' = 'val1', 'key2' = 'val2', ...); 219 * ALTER VIEW view1 SET TBLPROPERTIES ('key1' = 'val1', 'key2' = 'val2', ...); 220 * }}} 221 */ 222case class AlterTableSetPropertiesCommand( 223 tableName: TableIdentifier, 224 properties: Map[String, String], 225 isView: Boolean) 226 extends RunnableCommand { 227 228 override def run(sparkSession: SparkSession): Seq[Row] = { 229 val catalog = sparkSession.sessionState.catalog 230 val table = catalog.getTableMetadata(tableName) 231 DDLUtils.verifyAlterTableType(catalog, table, isView) 232 // This overrides old properties 233 val newTable = table.copy(properties = table.properties ++ properties) 234 catalog.alterTable(newTable) 235 Seq.empty[Row] 236 } 237 238} 239 240/** 241 * A command that unsets table/view properties. 242 * 243 * The syntax of this command is: 244 * {{{ 245 * ALTER TABLE table1 UNSET TBLPROPERTIES [IF EXISTS] ('key1', 'key2', ...); 246 * ALTER VIEW view1 UNSET TBLPROPERTIES [IF EXISTS] ('key1', 'key2', ...); 247 * }}} 248 */ 249case class AlterTableUnsetPropertiesCommand( 250 tableName: TableIdentifier, 251 propKeys: Seq[String], 252 ifExists: Boolean, 253 isView: Boolean) 254 extends RunnableCommand { 255 256 override def run(sparkSession: SparkSession): Seq[Row] = { 257 val catalog = sparkSession.sessionState.catalog 258 val table = catalog.getTableMetadata(tableName) 259 DDLUtils.verifyAlterTableType(catalog, table, isView) 260 if (!ifExists) { 261 propKeys.foreach { k => 262 if (!table.properties.contains(k)) { 263 throw new AnalysisException( 264 s"Attempted to unset non-existent property '$k' in table '${table.identifier}'") 265 } 266 } 267 } 268 val newProperties = table.properties.filter { case (k, _) => !propKeys.contains(k) } 269 val newTable = table.copy(properties = newProperties) 270 catalog.alterTable(newTable) 271 Seq.empty[Row] 272 } 273 274} 275 276/** 277 * A command that sets the serde class and/or serde properties of a table/view. 278 * 279 * The syntax of this command is: 280 * {{{ 281 * ALTER TABLE table [PARTITION spec] SET SERDE serde_name [WITH SERDEPROPERTIES props]; 282 * ALTER TABLE table [PARTITION spec] SET SERDEPROPERTIES serde_properties; 283 * }}} 284 */ 285case class AlterTableSerDePropertiesCommand( 286 tableName: TableIdentifier, 287 serdeClassName: Option[String], 288 serdeProperties: Option[Map[String, String]], 289 partSpec: Option[TablePartitionSpec]) 290 extends RunnableCommand { 291 292 // should never happen if we parsed things correctly 293 require(serdeClassName.isDefined || serdeProperties.isDefined, 294 "ALTER TABLE attempted to set neither serde class name nor serde properties") 295 296 override def run(sparkSession: SparkSession): Seq[Row] = { 297 val catalog = sparkSession.sessionState.catalog 298 val table = catalog.getTableMetadata(tableName) 299 DDLUtils.verifyAlterTableType(catalog, table, isView = false) 300 // For datasource tables, disallow setting serde or specifying partition 301 if (partSpec.isDefined && DDLUtils.isDatasourceTable(table)) { 302 throw new AnalysisException("Operation not allowed: ALTER TABLE SET " + 303 "[SERDE | SERDEPROPERTIES] for a specific partition is not supported " + 304 "for tables created with the datasource API") 305 } 306 if (serdeClassName.isDefined && DDLUtils.isDatasourceTable(table)) { 307 throw new AnalysisException("Operation not allowed: ALTER TABLE SET SERDE is " + 308 "not supported for tables created with the datasource API") 309 } 310 if (partSpec.isEmpty) { 311 val newTable = table.withNewStorage( 312 serde = serdeClassName.orElse(table.storage.serde), 313 properties = table.storage.properties ++ serdeProperties.getOrElse(Map())) 314 catalog.alterTable(newTable) 315 } else { 316 val spec = partSpec.get 317 val part = catalog.getPartition(table.identifier, spec) 318 val newPart = part.copy(storage = part.storage.copy( 319 serde = serdeClassName.orElse(part.storage.serde), 320 properties = part.storage.properties ++ serdeProperties.getOrElse(Map()))) 321 catalog.alterPartitions(table.identifier, Seq(newPart)) 322 } 323 Seq.empty[Row] 324 } 325 326} 327 328/** 329 * Add Partition in ALTER TABLE: add the table partitions. 330 * 331 * An error message will be issued if the partition exists, unless 'ifNotExists' is true. 332 * 333 * The syntax of this command is: 334 * {{{ 335 * ALTER TABLE table ADD [IF NOT EXISTS] PARTITION spec1 [LOCATION 'loc1'] 336 * PARTITION spec2 [LOCATION 'loc2'] 337 * }}} 338 */ 339case class AlterTableAddPartitionCommand( 340 tableName: TableIdentifier, 341 partitionSpecsAndLocs: Seq[(TablePartitionSpec, Option[String])], 342 ifNotExists: Boolean) 343 extends RunnableCommand { 344 345 override def run(sparkSession: SparkSession): Seq[Row] = { 346 val catalog = sparkSession.sessionState.catalog 347 val table = catalog.getTableMetadata(tableName) 348 DDLUtils.verifyAlterTableType(catalog, table, isView = false) 349 DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER TABLE ADD PARTITION") 350 val parts = partitionSpecsAndLocs.map { case (spec, location) => 351 val normalizedSpec = PartitioningUtils.normalizePartitionSpec( 352 spec, 353 table.partitionColumnNames, 354 table.identifier.quotedString, 355 sparkSession.sessionState.conf.resolver) 356 // inherit table storage format (possibly except for location) 357 CatalogTablePartition(normalizedSpec, table.storage.copy(locationUri = location)) 358 } 359 catalog.createPartitions(table.identifier, parts, ignoreIfExists = ifNotExists) 360 Seq.empty[Row] 361 } 362 363} 364 365/** 366 * Alter a table partition's spec. 367 * 368 * The syntax of this command is: 369 * {{{ 370 * ALTER TABLE table PARTITION spec1 RENAME TO PARTITION spec2; 371 * }}} 372 */ 373case class AlterTableRenamePartitionCommand( 374 tableName: TableIdentifier, 375 oldPartition: TablePartitionSpec, 376 newPartition: TablePartitionSpec) 377 extends RunnableCommand { 378 379 override def run(sparkSession: SparkSession): Seq[Row] = { 380 val catalog = sparkSession.sessionState.catalog 381 val table = catalog.getTableMetadata(tableName) 382 DDLUtils.verifyAlterTableType(catalog, table, isView = false) 383 DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER TABLE RENAME PARTITION") 384 385 val normalizedOldPartition = PartitioningUtils.normalizePartitionSpec( 386 oldPartition, 387 table.partitionColumnNames, 388 table.identifier.quotedString, 389 sparkSession.sessionState.conf.resolver) 390 391 val normalizedNewPartition = PartitioningUtils.normalizePartitionSpec( 392 newPartition, 393 table.partitionColumnNames, 394 table.identifier.quotedString, 395 sparkSession.sessionState.conf.resolver) 396 397 catalog.renamePartitions( 398 tableName, Seq(normalizedOldPartition), Seq(normalizedNewPartition)) 399 Seq.empty[Row] 400 } 401 402} 403 404/** 405 * Drop Partition in ALTER TABLE: to drop a particular partition for a table. 406 * 407 * This removes the data and metadata for this partition. 408 * The data is actually moved to the .Trash/Current directory if Trash is configured, 409 * unless 'purge' is true, but the metadata is completely lost. 410 * An error message will be issued if the partition does not exist, unless 'ifExists' is true. 411 * Note: purge is always false when the target is a view. 412 * 413 * The syntax of this command is: 414 * {{{ 415 * ALTER TABLE table DROP [IF EXISTS] PARTITION spec1[, PARTITION spec2, ...] [PURGE]; 416 * }}} 417 */ 418case class AlterTableDropPartitionCommand( 419 tableName: TableIdentifier, 420 specs: Seq[TablePartitionSpec], 421 ifExists: Boolean, 422 purge: Boolean, 423 retainData: Boolean) 424 extends RunnableCommand { 425 426 override def run(sparkSession: SparkSession): Seq[Row] = { 427 val catalog = sparkSession.sessionState.catalog 428 val table = catalog.getTableMetadata(tableName) 429 DDLUtils.verifyAlterTableType(catalog, table, isView = false) 430 DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER TABLE DROP PARTITION") 431 432 val normalizedSpecs = specs.map { spec => 433 PartitioningUtils.normalizePartitionSpec( 434 spec, 435 table.partitionColumnNames, 436 table.identifier.quotedString, 437 sparkSession.sessionState.conf.resolver) 438 } 439 440 catalog.dropPartitions( 441 table.identifier, normalizedSpecs, ignoreIfNotExists = ifExists, purge = purge, 442 retainData = retainData) 443 Seq.empty[Row] 444 } 445 446} 447 448 449case class PartitionStatistics(numFiles: Int, totalSize: Long) 450 451/** 452 * Recover Partitions in ALTER TABLE: recover all the partition in the directory of a table and 453 * update the catalog. 454 * 455 * The syntax of this command is: 456 * {{{ 457 * ALTER TABLE table RECOVER PARTITIONS; 458 * MSCK REPAIR TABLE table; 459 * }}} 460 */ 461case class AlterTableRecoverPartitionsCommand( 462 tableName: TableIdentifier, 463 cmd: String = "ALTER TABLE RECOVER PARTITIONS") extends RunnableCommand { 464 465 // These are list of statistics that can be collected quickly without requiring a scan of the data 466 // see https://github.com/apache/hive/blob/master/ 467 // common/src/java/org/apache/hadoop/hive/common/StatsSetupConst.java 468 val NUM_FILES = "numFiles" 469 val TOTAL_SIZE = "totalSize" 470 val DDL_TIME = "transient_lastDdlTime" 471 472 private def getPathFilter(hadoopConf: Configuration): PathFilter = { 473 // Dummy jobconf to get to the pathFilter defined in configuration 474 // It's very expensive to create a JobConf(ClassUtil.findContainingJar() is slow) 475 val jobConf = new JobConf(hadoopConf, this.getClass) 476 val pathFilter = FileInputFormat.getInputPathFilter(jobConf) 477 new PathFilter { 478 override def accept(path: Path): Boolean = { 479 val name = path.getName 480 if (name != "_SUCCESS" && name != "_temporary" && !name.startsWith(".")) { 481 pathFilter == null || pathFilter.accept(path) 482 } else { 483 false 484 } 485 } 486 } 487 } 488 489 override def run(spark: SparkSession): Seq[Row] = { 490 val catalog = spark.sessionState.catalog 491 val table = catalog.getTableMetadata(tableName) 492 val tableIdentWithDB = table.identifier.quotedString 493 DDLUtils.verifyAlterTableType(catalog, table, isView = false) 494 if (table.partitionColumnNames.isEmpty) { 495 throw new AnalysisException( 496 s"Operation not allowed: $cmd only works on partitioned tables: $tableIdentWithDB") 497 } 498 499 if (table.storage.locationUri.isEmpty) { 500 throw new AnalysisException(s"Operation not allowed: $cmd only works on table with " + 501 s"location provided: $tableIdentWithDB") 502 } 503 504 val root = new Path(table.location) 505 logInfo(s"Recover all the partitions in $root") 506 val fs = root.getFileSystem(spark.sparkContext.hadoopConfiguration) 507 508 val threshold = spark.conf.get("spark.rdd.parallelListingThreshold", "10").toInt 509 val hadoopConf = spark.sparkContext.hadoopConfiguration 510 val pathFilter = getPathFilter(hadoopConf) 511 val partitionSpecsAndLocs = scanPartitions(spark, fs, pathFilter, root, Map(), 512 table.partitionColumnNames, threshold, spark.sessionState.conf.resolver) 513 val total = partitionSpecsAndLocs.length 514 logInfo(s"Found $total partitions in $root") 515 516 val partitionStats = if (spark.sqlContext.conf.gatherFastStats) { 517 gatherPartitionStats(spark, partitionSpecsAndLocs, fs, pathFilter, threshold) 518 } else { 519 GenMap.empty[String, PartitionStatistics] 520 } 521 logInfo(s"Finished to gather the fast stats for all $total partitions.") 522 523 addPartitions(spark, table, partitionSpecsAndLocs, partitionStats) 524 // Updates the table to indicate that its partition metadata is stored in the Hive metastore. 525 // This is always the case for Hive format tables, but is not true for Datasource tables created 526 // before Spark 2.1 unless they are converted via `msck repair table`. 527 spark.sessionState.catalog.alterTable(table.copy(tracksPartitionsInCatalog = true)) 528 catalog.refreshTable(tableName) 529 logInfo(s"Recovered all partitions ($total).") 530 Seq.empty[Row] 531 } 532 533 @transient private lazy val evalTaskSupport = new ForkJoinTaskSupport(new ForkJoinPool(8)) 534 535 private def scanPartitions( 536 spark: SparkSession, 537 fs: FileSystem, 538 filter: PathFilter, 539 path: Path, 540 spec: TablePartitionSpec, 541 partitionNames: Seq[String], 542 threshold: Int, 543 resolver: Resolver): GenSeq[(TablePartitionSpec, Path)] = { 544 if (partitionNames.isEmpty) { 545 return Seq(spec -> path) 546 } 547 548 val statuses = fs.listStatus(path, filter) 549 val statusPar: GenSeq[FileStatus] = 550 if (partitionNames.length > 1 && statuses.length > threshold || partitionNames.length > 2) { 551 // parallelize the list of partitions here, then we can have better parallelism later. 552 val parArray = statuses.par 553 parArray.tasksupport = evalTaskSupport 554 parArray 555 } else { 556 statuses 557 } 558 statusPar.flatMap { st => 559 val name = st.getPath.getName 560 if (st.isDirectory && name.contains("=")) { 561 val ps = name.split("=", 2) 562 val columnName = ExternalCatalogUtils.unescapePathName(ps(0)) 563 // TODO: Validate the value 564 val value = ExternalCatalogUtils.unescapePathName(ps(1)) 565 if (resolver(columnName, partitionNames.head)) { 566 scanPartitions(spark, fs, filter, st.getPath, spec ++ Map(partitionNames.head -> value), 567 partitionNames.drop(1), threshold, resolver) 568 } else { 569 logWarning( 570 s"expected partition column ${partitionNames.head}, but got ${ps(0)}, ignoring it") 571 Seq() 572 } 573 } else { 574 logWarning(s"ignore ${new Path(path, name)}") 575 Seq() 576 } 577 } 578 } 579 580 private def gatherPartitionStats( 581 spark: SparkSession, 582 partitionSpecsAndLocs: GenSeq[(TablePartitionSpec, Path)], 583 fs: FileSystem, 584 pathFilter: PathFilter, 585 threshold: Int): GenMap[String, PartitionStatistics] = { 586 if (partitionSpecsAndLocs.length > threshold) { 587 val hadoopConf = spark.sparkContext.hadoopConfiguration 588 val serializableConfiguration = new SerializableConfiguration(hadoopConf) 589 val serializedPaths = partitionSpecsAndLocs.map(_._2.toString).toArray 590 591 // Set the number of parallelism to prevent following file listing from generating many tasks 592 // in case of large #defaultParallelism. 593 val numParallelism = Math.min(serializedPaths.length, 594 Math.min(spark.sparkContext.defaultParallelism, 10000)) 595 // gather the fast stats for all the partitions otherwise Hive metastore will list all the 596 // files for all the new partitions in sequential way, which is super slow. 597 logInfo(s"Gather the fast stats in parallel using $numParallelism tasks.") 598 spark.sparkContext.parallelize(serializedPaths, numParallelism) 599 .mapPartitions { paths => 600 val pathFilter = getPathFilter(serializableConfiguration.value) 601 paths.map(new Path(_)).map{ path => 602 val fs = path.getFileSystem(serializableConfiguration.value) 603 val statuses = fs.listStatus(path, pathFilter) 604 (path.toString, PartitionStatistics(statuses.length, statuses.map(_.getLen).sum)) 605 } 606 }.collectAsMap() 607 } else { 608 partitionSpecsAndLocs.map { case (_, location) => 609 val statuses = fs.listStatus(location, pathFilter) 610 (location.toString, PartitionStatistics(statuses.length, statuses.map(_.getLen).sum)) 611 }.toMap 612 } 613 } 614 615 private def addPartitions( 616 spark: SparkSession, 617 table: CatalogTable, 618 partitionSpecsAndLocs: GenSeq[(TablePartitionSpec, Path)], 619 partitionStats: GenMap[String, PartitionStatistics]): Unit = { 620 val total = partitionSpecsAndLocs.length 621 var done = 0L 622 // Hive metastore may not have enough memory to handle millions of partitions in single RPC, 623 // we should split them into smaller batches. Since Hive client is not thread safe, we cannot 624 // do this in parallel. 625 val batchSize = 100 626 partitionSpecsAndLocs.toIterator.grouped(batchSize).foreach { batch => 627 val now = System.currentTimeMillis() / 1000 628 val parts = batch.map { case (spec, location) => 629 val params = partitionStats.get(location.toString).map { 630 case PartitionStatistics(numFiles, totalSize) => 631 // This two fast stat could prevent Hive metastore to list the files again. 632 Map(NUM_FILES -> numFiles.toString, 633 TOTAL_SIZE -> totalSize.toString, 634 // Workaround a bug in HiveMetastore that try to mutate a read-only parameters. 635 // see metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java 636 DDL_TIME -> now.toString) 637 }.getOrElse(Map.empty) 638 // inherit table storage format (possibly except for location) 639 CatalogTablePartition( 640 spec, 641 table.storage.copy(locationUri = Some(location.toUri.toString)), 642 params) 643 } 644 spark.sessionState.catalog.createPartitions(tableName, parts, ignoreIfExists = true) 645 done += parts.length 646 logDebug(s"Recovered ${parts.length} partitions ($done/$total so far)") 647 } 648 } 649} 650 651 652/** 653 * A command that sets the location of a table or a partition. 654 * 655 * For normal tables, this just sets the location URI in the table/partition's storage format. 656 * For datasource tables, this sets a "path" parameter in the table/partition's serde properties. 657 * 658 * The syntax of this command is: 659 * {{{ 660 * ALTER TABLE table_name [PARTITION partition_spec] SET LOCATION "loc"; 661 * }}} 662 */ 663case class AlterTableSetLocationCommand( 664 tableName: TableIdentifier, 665 partitionSpec: Option[TablePartitionSpec], 666 location: String) 667 extends RunnableCommand { 668 669 override def run(sparkSession: SparkSession): Seq[Row] = { 670 val catalog = sparkSession.sessionState.catalog 671 val table = catalog.getTableMetadata(tableName) 672 DDLUtils.verifyAlterTableType(catalog, table, isView = false) 673 partitionSpec match { 674 case Some(spec) => 675 DDLUtils.verifyPartitionProviderIsHive( 676 sparkSession, table, "ALTER TABLE ... SET LOCATION") 677 // Partition spec is specified, so we set the location only for this partition 678 val part = catalog.getPartition(table.identifier, spec) 679 val newPart = part.copy(storage = part.storage.copy(locationUri = Some(location))) 680 catalog.alterPartitions(table.identifier, Seq(newPart)) 681 case None => 682 // No partition spec is specified, so we set the location for the table itself 683 catalog.alterTable(table.withNewStorage(locationUri = Some(location))) 684 } 685 Seq.empty[Row] 686 } 687} 688 689 690object DDLUtils { 691 val HIVE_PROVIDER = "hive" 692 693 def isDatasourceTable(table: CatalogTable): Boolean = { 694 table.provider.isDefined && table.provider.get != HIVE_PROVIDER 695 } 696 697 /** 698 * Throws a standard error for actions that require partitionProvider = hive. 699 */ 700 def verifyPartitionProviderIsHive( 701 spark: SparkSession, table: CatalogTable, action: String): Unit = { 702 val tableName = table.identifier.table 703 if (!spark.sqlContext.conf.manageFilesourcePartitions && isDatasourceTable(table)) { 704 throw new AnalysisException( 705 s"$action is not allowed on $tableName since filesource partition management is " + 706 "disabled (spark.sql.hive.manageFilesourcePartitions = false).") 707 } 708 if (!table.tracksPartitionsInCatalog && isDatasourceTable(table)) { 709 throw new AnalysisException( 710 s"$action is not allowed on $tableName since its partition metadata is not stored in " + 711 "the Hive metastore. To import this information into the metastore, run " + 712 s"`msck repair table $tableName`") 713 } 714 } 715 716 /** 717 * If the command ALTER VIEW is to alter a table or ALTER TABLE is to alter a view, 718 * issue an exception [[AnalysisException]]. 719 * 720 * Note: temporary views can be altered by both ALTER VIEW and ALTER TABLE commands, 721 * since temporary views can be also created by CREATE TEMPORARY TABLE. In the future, 722 * when we decided to drop the support, we should disallow users to alter temporary views 723 * by ALTER TABLE. 724 */ 725 def verifyAlterTableType( 726 catalog: SessionCatalog, 727 tableMetadata: CatalogTable, 728 isView: Boolean): Unit = { 729 if (!catalog.isTemporaryTable(tableMetadata.identifier)) { 730 tableMetadata.tableType match { 731 case CatalogTableType.VIEW if !isView => 732 throw new AnalysisException( 733 "Cannot alter a view with ALTER TABLE. Please use ALTER VIEW instead") 734 case o if o != CatalogTableType.VIEW && isView => 735 throw new AnalysisException( 736 s"Cannot alter a table with ALTER VIEW. Please use ALTER TABLE instead") 737 case _ => 738 } 739 } 740 } 741} 742