/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.execution.command

import java.io.File
import java.net.URI
import java.nio.file.FileSystems
import java.util.Date

import scala.collection.mutable.ArrayBuffer
import scala.util.control.NonFatal
import scala.util.Try

import org.apache.hadoop.fs.Path

import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.NoSuchPartitionException
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.catalog.CatalogTableType._
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.catalyst.util.quoteIdentifier
import org.apache.spark.sql.execution.datasources.PartitioningUtils
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils

/**
 * A command to create a MANAGED table with the same definition as the given existing table.
 * In the target table definition, the table comment is always empty but the column comments
 * are identical to the ones defined in the source table.
 *
 * The CatalogTable attributes copied from the source table are storage (inputFormat,
 * outputFormat, serde, compressed, properties), schema, provider, partitionColumnNames,
 * and bucketSpec.
 *
 * The syntax of using this command in SQL is:
 * {{{
 *   CREATE TABLE [IF NOT EXISTS] [db_name.]table_name
 *   LIKE [other_db_name.]existing_table_name
 * }}}
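 *
 * For example, a possible invocation (the database and table names are illustrative):
 * {{{
 *   CREATE TABLE IF NOT EXISTS db2.sales_copy LIKE db1.sales
 * }}}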
 */
case class CreateTableLikeCommand(
    targetTable: TableIdentifier,
    sourceTable: TableIdentifier,
    ifNotExists: Boolean) extends RunnableCommand {

  override def run(sparkSession: SparkSession): Seq[Row] = {
    val catalog = sparkSession.sessionState.catalog
    val sourceTableDesc = catalog.getTempViewOrPermanentTableMetadata(sourceTable)

    val newProvider = if (sourceTableDesc.tableType == CatalogTableType.VIEW) {
      Some(sparkSession.sessionState.conf.defaultDataSourceName)
    } else {
      sourceTableDesc.provider
    }

    val newTableDesc =
      CatalogTable(
        identifier = targetTable,
        tableType = CatalogTableType.MANAGED,
        // We are creating a new managed table, which should not have a custom table location.
        storage = sourceTableDesc.storage.copy(locationUri = None),
        schema = sourceTableDesc.schema,
        provider = newProvider,
        partitionColumnNames = sourceTableDesc.partitionColumnNames,
        bucketSpec = sourceTableDesc.bucketSpec)

    catalog.createTable(newTableDesc, ifNotExists)
    Seq.empty[Row]
  }
}


// TODO: move the rest of the table commands from ddl.scala to this file

/**
 * A command to create a table.
 *
 * Note: This is currently used only for creating Hive tables.
 * This is not intended for temporary tables.
 *
 * The syntax of using this command in SQL is:
 * {{{
 *   CREATE [EXTERNAL] TABLE [IF NOT EXISTS] [db_name.]table_name
 *   [(col1 data_type [COMMENT col_comment], ...)]
 *   [COMMENT table_comment]
 *   [PARTITIONED BY (col3 data_type [COMMENT col_comment], ...)]
 *   [CLUSTERED BY (col1, ...) [SORTED BY (col1 [ASC|DESC], ...)] INTO num_buckets BUCKETS]
 *   [SKEWED BY (col1, col2, ...) ON ((col_value, col_value, ...), ...)
 *    [STORED AS DIRECTORIES]]
 *   [ROW FORMAT row_format]
 *   [STORED AS file_format | STORED BY storage_handler_class [WITH SERDEPROPERTIES (...)]]
 *   [LOCATION path]
 *   [TBLPROPERTIES (property_name=property_value, ...)]
 *   [AS select_statement];
 * }}}
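 *
 * For example, a plausible Hive-style statement (all names, comments, formats, and the
 * location below are illustrative):
 * {{{
 *   CREATE EXTERNAL TABLE IF NOT EXISTS mydb.page_view (
 *     viewTime INT,
 *     userid BIGINT COMMENT 'id of the user')
 *   COMMENT 'This is the page view table'
 *   PARTITIONED BY (dt STRING)
 *   STORED AS SEQUENCEFILE
 *   LOCATION '/user/hive/warehouse/page_view'
 * }}}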
 */
case class CreateTableCommand(table: CatalogTable, ifNotExists: Boolean) extends RunnableCommand {

  override def run(sparkSession: SparkSession): Seq[Row] = {
    sparkSession.sessionState.catalog.createTable(table, ifNotExists)
    Seq.empty[Row]
  }
}


/**
 * A command that renames a table/view.
 *
 * The syntax of this command is:
 * {{{
 *   ALTER TABLE table1 RENAME TO table2;
 *   ALTER VIEW view1 RENAME TO view2;
 * }}}
 */
case class AlterTableRenameCommand(
    oldName: TableIdentifier,
    newName: TableIdentifier,
    isView: Boolean)
  extends RunnableCommand {

  override def run(sparkSession: SparkSession): Seq[Row] = {
    val catalog = sparkSession.sessionState.catalog
    // If this is a temp view, just rename the view.
    // Otherwise, if this is a real table, we also need to uncache and invalidate the table.
    if (catalog.isTemporaryTable(oldName)) {
      catalog.renameTable(oldName, newName)
    } else {
      val table = catalog.getTableMetadata(oldName)
      DDLUtils.verifyAlterTableType(catalog, table, isView)
      // If an exception is thrown here we can just assume the table is uncached;
      // this can happen with Hive tables when the underlying catalog is in-memory.
      val wasCached = Try(sparkSession.catalog.isCached(oldName.unquotedString)).getOrElse(false)
      if (wasCached) {
        try {
          sparkSession.catalog.uncacheTable(oldName.unquotedString)
        } catch {
          case NonFatal(e) => log.warn(e.toString, e)
        }
      }
      // Invalidate the table last, otherwise uncaching the table would load the logical plan
      // back into the Hive metastore cache.
      catalog.refreshTable(oldName)
      catalog.renameTable(oldName, newName)
      if (wasCached) {
        sparkSession.catalog.cacheTable(newName.unquotedString)
      }
    }
    Seq.empty[Row]
  }

}

/**
 * A command that loads data into a Hive table.
 *
 * The syntax of this command is:
 * {{{
 *   LOAD DATA [LOCAL] INPATH 'filepath' [OVERWRITE] INTO TABLE tablename
 *   [PARTITION (partcol1=val1, partcol2=val2 ...)]
 * }}}
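 *
 * For example (the path, table name, and partition value are illustrative):
 * {{{
 *   LOAD DATA LOCAL INPATH '/tmp/logs-2016-01-01' OVERWRITE INTO TABLE logs
 *   PARTITION (ds='2016-01-01')
 * }}}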
 */
case class LoadDataCommand(
    table: TableIdentifier,
    path: String,
    isLocal: Boolean,
    isOverwrite: Boolean,
    partition: Option[TablePartitionSpec]) extends RunnableCommand {

  override def run(sparkSession: SparkSession): Seq[Row] = {
    val catalog = sparkSession.sessionState.catalog
    val targetTable = catalog.getTableMetadata(table)
    val tableIdentWithDB = targetTable.identifier.quotedString

    if (targetTable.tableType == CatalogTableType.VIEW) {
      throw new AnalysisException(s"Target table in LOAD DATA cannot be a view: $tableIdentWithDB")
    }
    if (DDLUtils.isDatasourceTable(targetTable)) {
      throw new AnalysisException(
        s"LOAD DATA is not supported for datasource tables: $tableIdentWithDB")
    }
    if (targetTable.partitionColumnNames.nonEmpty) {
      if (partition.isEmpty) {
        throw new AnalysisException(s"LOAD DATA target table $tableIdentWithDB is partitioned, " +
          s"but no partition spec is provided")
      }
      if (targetTable.partitionColumnNames.size != partition.get.size) {
        throw new AnalysisException(s"LOAD DATA target table $tableIdentWithDB is partitioned, " +
          s"but the number of columns in the provided partition spec (${partition.get.size}) " +
          s"does not match the number of partitioned columns in the table " +
          s"(${targetTable.partitionColumnNames.size})")
      }
      partition.get.keys.foreach { colName =>
        if (!targetTable.partitionColumnNames.contains(colName)) {
          throw new AnalysisException(s"LOAD DATA target table $tableIdentWithDB is " +
            s"partitioned, but the specified partition spec refers to a column that is not " +
            s"partitioned: '$colName'")
        }
      }
    } else {
      if (partition.nonEmpty) {
        throw new AnalysisException(s"LOAD DATA target table $tableIdentWithDB is not " +
          s"partitioned, but a partition spec was provided.")
      }
    }

    val loadPath =
      if (isLocal) {
        val uri = Utils.resolveURI(path)
        val filePath = uri.getPath()
        val exists = if (filePath.contains("*")) {
          val fileSystem = FileSystems.getDefault
          val pathPattern = fileSystem.getPath(filePath)
          val dir = pathPattern.getParent.toString
          if (dir.contains("*")) {
            throw new AnalysisException(
              s"LOAD DATA input path allows only filename wildcard: $path")
          }

          val files = new File(dir).listFiles()
          if (files == null) {
            false
          } else {
            val matcher = fileSystem.getPathMatcher("glob:" + pathPattern.toAbsolutePath)
            files.exists(f => matcher.matches(fileSystem.getPath(f.getAbsolutePath)))
          }
        } else {
          new File(filePath).exists()
        }
        if (!exists) {
          throw new AnalysisException(s"LOAD DATA input path does not exist: $path")
        }
        uri
      } else {
        val uri = new URI(path)
        if (uri.getScheme() != null && uri.getAuthority() != null) {
          uri
        } else {
          // Follow Hive's behavior:
          // If no scheme or authority is provided with a non-local inpath,
          // we will use the hadoop configuration "fs.default.name".
          val defaultFSConf = sparkSession.sessionState.newHadoopConf().get("fs.default.name")
          val defaultFS = if (defaultFSConf == null) {
            new URI("")
          } else {
            new URI(defaultFSConf)
          }

          val scheme = if (uri.getScheme() != null) {
            uri.getScheme()
          } else {
            defaultFS.getScheme()
          }
          val authority = if (uri.getAuthority() != null) {
            uri.getAuthority()
          } else {
            defaultFS.getAuthority()
          }

          if (scheme == null) {
            throw new AnalysisException(
              s"LOAD DATA: URI scheme is required for non-local input paths: '$path'")
          }

          // Follow Hive's behavior:
          // If LOCAL is not specified, and the path is relative,
          // then the path is interpreted relative to "/user/<username>".
          val uriPath = uri.getPath()
          val absolutePath = if (uriPath != null && uriPath.startsWith("/")) {
            uriPath
          } else {
            s"/user/${System.getProperty("user.name")}/$uriPath"
          }
          new URI(scheme, authority, absolutePath, uri.getQuery(), uri.getFragment())
        }
      }

    if (partition.nonEmpty) {
      catalog.loadPartition(
        targetTable.identifier,
        loadPath.toString,
        partition.get,
        isOverwrite,
        holdDDLTime = false,
        inheritTableSpecs = true)
    } else {
      catalog.loadTable(
        targetTable.identifier,
        loadPath.toString,
        isOverwrite,
        holdDDLTime = false)
    }

    // Refresh the metadata cache to ensure the loaded data is visible to the users.
    catalog.refreshTable(targetTable.identifier)

    Seq.empty[Row]
  }
}

/**
 * A command to truncate a table.
 *
 * The syntax of this command is:
 * {{{
 *   TRUNCATE TABLE tablename [PARTITION (partcol1=val1, partcol2=val2 ...)]
 * }}}
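 *
 * For example (the table name and partition value are illustrative):
 * {{{
 *   TRUNCATE TABLE sales PARTITION (ds='2016-01-01')
 * }}}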
 */
case class TruncateTableCommand(
    tableName: TableIdentifier,
    partitionSpec: Option[TablePartitionSpec]) extends RunnableCommand {

  override def run(spark: SparkSession): Seq[Row] = {
    val catalog = spark.sessionState.catalog
    val table = catalog.getTableMetadata(tableName)
    val tableIdentWithDB = table.identifier.quotedString

    if (table.tableType == CatalogTableType.EXTERNAL) {
      throw new AnalysisException(
        s"Operation not allowed: TRUNCATE TABLE on external tables: $tableIdentWithDB")
    }
    if (table.tableType == CatalogTableType.VIEW) {
      throw new AnalysisException(
        s"Operation not allowed: TRUNCATE TABLE on views: $tableIdentWithDB")
    }
    if (table.partitionColumnNames.isEmpty && partitionSpec.isDefined) {
      throw new AnalysisException(
        s"Operation not allowed: TRUNCATE TABLE ... PARTITION is not supported " +
          s"for tables that are not partitioned: $tableIdentWithDB")
    }
    if (partitionSpec.isDefined) {
      DDLUtils.verifyPartitionProviderIsHive(spark, table, "TRUNCATE TABLE ... PARTITION")
    }

    val partCols = table.partitionColumnNames
    val locations =
      if (partCols.isEmpty) {
        Seq(table.storage.locationUri)
      } else {
        val normalizedSpec = partitionSpec.map { spec =>
          PartitioningUtils.normalizePartitionSpec(
            spec,
            partCols,
            table.identifier.quotedString,
            spark.sessionState.conf.resolver)
        }
        val partLocations =
          catalog.listPartitions(table.identifier, normalizedSpec).map(_.storage.locationUri)

        // Fail if the partition spec is fully specified (not partial) and the partition does not
        // exist.
        for (spec <- partitionSpec if partLocations.isEmpty && spec.size == partCols.length) {
          throw new NoSuchPartitionException(table.database, table.identifier.table, spec)
        }

        partLocations
      }
    val hadoopConf = spark.sessionState.newHadoopConf()
    locations.foreach { location =>
      if (location.isDefined) {
        val path = new Path(location.get)
        try {
          val fs = path.getFileSystem(hadoopConf)
          fs.delete(path, true)
          fs.mkdirs(path)
        } catch {
          case NonFatal(e) =>
            throw new AnalysisException(
              s"Failed to truncate table $tableIdentWithDB when removing data of the path: " +
                s"$path because of ${e.toString}")
        }
      }
    }
    // After deleting the data, invalidate the table to make sure we don't keep around a stale
    // file relation in the metastore cache.
    spark.sessionState.refreshTable(tableName.unquotedString)
    // Also try to drop the contents of the table from the columnar cache.
    try {
      spark.sharedState.cacheManager.uncacheQuery(spark.table(table.identifier))
    } catch {
      case NonFatal(e) =>
        log.warn(s"Exception when attempting to uncache table $tableIdentWithDB", e)
    }
    Seq.empty[Row]
  }
}

/**
 * Command that looks like
 * {{{
 *   DESCRIBE [EXTENDED|FORMATTED] table_name partitionSpec?;
 * }}}
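 *
 * For example (names and the partition value are illustrative):
 * {{{
 *   DESCRIBE FORMATTED db1.sales PARTITION (ds='2016-01-01')
 * }}}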
 */
case class DescribeTableCommand(
    table: TableIdentifier,
    partitionSpec: TablePartitionSpec,
    isExtended: Boolean,
    isFormatted: Boolean)
  extends RunnableCommand {

  override val output: Seq[Attribute] = Seq(
    // Column names are based on Hive.
    AttributeReference("col_name", StringType, nullable = false,
      new MetadataBuilder().putString("comment", "name of the column").build())(),
    AttributeReference("data_type", StringType, nullable = false,
      new MetadataBuilder().putString("comment", "data type of the column").build())(),
    AttributeReference("comment", StringType, nullable = true,
      new MetadataBuilder().putString("comment", "comment of the column").build())()
  )

  override def run(sparkSession: SparkSession): Seq[Row] = {
    val result = new ArrayBuffer[Row]
    val catalog = sparkSession.sessionState.catalog

    if (catalog.isTemporaryTable(table)) {
      if (partitionSpec.nonEmpty) {
        throw new AnalysisException(
          s"DESC PARTITION is not allowed on a temporary view: ${table.identifier}")
      }
      describeSchema(catalog.lookupRelation(table).schema, result)
    } else {
      val metadata = catalog.getTableMetadata(table)
      if (metadata.schema.isEmpty) {
        // In older versions of Spark (prior to 2.1), the table schema can be empty and should
        // be inferred at runtime. We should still support it.
        describeSchema(catalog.lookupRelation(metadata.identifier).schema, result)
      } else {
        describeSchema(metadata.schema, result)
      }

      describePartitionInfo(metadata, result)

      if (partitionSpec.isEmpty) {
        if (isExtended) {
          describeExtendedTableInfo(metadata, result)
        } else if (isFormatted) {
          describeFormattedTableInfo(metadata, result)
        }
      } else {
        describeDetailedPartitionInfo(sparkSession, catalog, metadata, result)
      }
    }

    result
  }

  private def describePartitionInfo(table: CatalogTable, buffer: ArrayBuffer[Row]): Unit = {
    if (table.partitionColumnNames.nonEmpty) {
      append(buffer, "# Partition Information", "", "")
      append(buffer, s"# ${output.head.name}", output(1).name, output(2).name)
      describeSchema(table.partitionSchema, buffer)
    }
  }

  private def describeExtendedTableInfo(table: CatalogTable, buffer: ArrayBuffer[Row]): Unit = {
    append(buffer, "", "", "")
    append(buffer, "# Detailed Table Information", table.toString, "")
  }

  private def describeFormattedTableInfo(table: CatalogTable, buffer: ArrayBuffer[Row]): Unit = {
    append(buffer, "", "", "")
    append(buffer, "# Detailed Table Information", "", "")
    append(buffer, "Database:", table.database, "")
    append(buffer, "Owner:", table.owner, "")
    append(buffer, "Create Time:", new Date(table.createTime).toString, "")
    append(buffer, "Last Access Time:", new Date(table.lastAccessTime).toString, "")
    append(buffer, "Location:", table.storage.locationUri.getOrElse(""), "")
    append(buffer, "Table Type:", table.tableType.name, "")
    table.stats.foreach(s => append(buffer, "Statistics:", s.simpleString, ""))

    append(buffer, "Table Parameters:", "", "")
    table.properties.foreach { case (key, value) =>
      append(buffer, s"  $key", value, "")
    }

    describeStorageInfo(table, buffer)

    if (table.tableType == CatalogTableType.VIEW) describeViewInfo(table, buffer)

    if (DDLUtils.isDatasourceTable(table) && table.tracksPartitionsInCatalog) {
      append(buffer, "Partition Provider:", "Catalog", "")
    }
  }

  private def describeStorageInfo(metadata: CatalogTable, buffer: ArrayBuffer[Row]): Unit = {
    append(buffer, "", "", "")
    append(buffer, "# Storage Information", "", "")
    metadata.storage.serde.foreach(serdeLib => append(buffer, "SerDe Library:", serdeLib, ""))
    metadata.storage.inputFormat.foreach(format => append(buffer, "InputFormat:", format, ""))
    metadata.storage.outputFormat.foreach(format => append(buffer, "OutputFormat:", format, ""))
    append(buffer, "Compressed:", if (metadata.storage.compressed) "Yes" else "No", "")
    describeBucketingInfo(metadata, buffer)

    append(buffer, "Storage Desc Parameters:", "", "")
    val maskedProperties = CatalogUtils.maskCredentials(metadata.storage.properties)
    maskedProperties.foreach { case (key, value) =>
      append(buffer, s"  $key", value, "")
    }
  }

  private def describeViewInfo(metadata: CatalogTable, buffer: ArrayBuffer[Row]): Unit = {
    append(buffer, "", "", "")
    append(buffer, "# View Information", "", "")
    append(buffer, "View Original Text:", metadata.viewOriginalText.getOrElse(""), "")
    append(buffer, "View Expanded Text:", metadata.viewText.getOrElse(""), "")
  }

  private def describeBucketingInfo(metadata: CatalogTable, buffer: ArrayBuffer[Row]): Unit = {
    metadata.bucketSpec match {
      case Some(BucketSpec(numBuckets, bucketColumnNames, sortColumnNames)) =>
        append(buffer, "Num Buckets:", numBuckets.toString, "")
        append(buffer, "Bucket Columns:", bucketColumnNames.mkString("[", ", ", "]"), "")
        append(buffer, "Sort Columns:", sortColumnNames.mkString("[", ", ", "]"), "")

      case _ =>
    }
  }

  private def describeDetailedPartitionInfo(
      spark: SparkSession,
      catalog: SessionCatalog,
      metadata: CatalogTable,
      result: ArrayBuffer[Row]): Unit = {
    if (metadata.tableType == CatalogTableType.VIEW) {
      throw new AnalysisException(
        s"DESC PARTITION is not allowed on a view: ${table.identifier}")
    }
    DDLUtils.verifyPartitionProviderIsHive(spark, metadata, "DESC PARTITION")
    val partition = catalog.getPartition(table, partitionSpec)
    if (isExtended) {
      describeExtendedDetailedPartitionInfo(table, metadata, partition, result)
    } else if (isFormatted) {
      describeFormattedDetailedPartitionInfo(table, metadata, partition, result)
      describeStorageInfo(metadata, result)
    }
  }

  private def describeExtendedDetailedPartitionInfo(
      tableIdentifier: TableIdentifier,
      table: CatalogTable,
      partition: CatalogTablePartition,
      buffer: ArrayBuffer[Row]): Unit = {
    append(buffer, "", "", "")
    append(buffer, "Detailed Partition Information " + partition.toString, "", "")
  }

  private def describeFormattedDetailedPartitionInfo(
      tableIdentifier: TableIdentifier,
      table: CatalogTable,
      partition: CatalogTablePartition,
      buffer: ArrayBuffer[Row]): Unit = {
    append(buffer, "", "", "")
    append(buffer, "# Detailed Partition Information", "", "")
    append(buffer, "Partition Value:", s"[${partition.spec.values.mkString(", ")}]", "")
    append(buffer, "Database:", table.database, "")
    append(buffer, "Table:", tableIdentifier.table, "")
    append(buffer, "Location:", partition.storage.locationUri.getOrElse(""), "")
    append(buffer, "Partition Parameters:", "", "")
    partition.parameters.foreach { case (key, value) =>
      append(buffer, s"  $key", value, "")
    }
  }

  private def describeSchema(schema: StructType, buffer: ArrayBuffer[Row]): Unit = {
    schema.foreach { column =>
      append(buffer, column.name, column.dataType.simpleString, column.getComment().orNull)
    }
  }

  private def append(
      buffer: ArrayBuffer[Row], column: String, dataType: String, comment: String): Unit = {
    buffer += Row(column, dataType, comment)
  }
}


/**
 * A command for users to get tables in the given database.
 * If a databaseName is not given, the current database will be used.
 * The syntax of using this command in SQL is:
 * {{{
 *   SHOW TABLES [(IN|FROM) database_name] [[LIKE] 'identifier_with_wildcards'];
 * }}}
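 *
 * For example (the database name and pattern are illustrative):
 * {{{
 *   SHOW TABLES IN db1 LIKE 'sales*'
 * }}}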
 */
case class ShowTablesCommand(
    databaseName: Option[String],
    tableIdentifierPattern: Option[String]) extends RunnableCommand {

  // The result of SHOW TABLES has three columns: database, tableName and isTemporary.
  override val output: Seq[Attribute] = {
    AttributeReference("database", StringType, nullable = false)() ::
      AttributeReference("tableName", StringType, nullable = false)() ::
      AttributeReference("isTemporary", BooleanType, nullable = false)() :: Nil
  }

  override def run(sparkSession: SparkSession): Seq[Row] = {
    // Since we need to return a Seq of rows, we will call getTables directly
    // instead of calling tables in sparkSession.
    val catalog = sparkSession.sessionState.catalog
    val db = databaseName.getOrElse(catalog.getCurrentDatabase)
    val tables =
      tableIdentifierPattern.map(catalog.listTables(db, _)).getOrElse(catalog.listTables(db))
    tables.map { tableIdent =>
      val isTemp = catalog.isTemporaryTable(tableIdent)
      Row(tableIdent.database.getOrElse(""), tableIdent.table, isTemp)
    }
  }
}


/**
 * A command for users to list the properties for a table. If propertyKey is specified, the value
 * for the propertyKey is returned. If propertyKey is not specified, all the keys and their
 * corresponding values are returned.
 * The syntax of using this command in SQL is:
 * {{{
 *   SHOW TBLPROPERTIES table_name[('propertyKey')];
 * }}}
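 *
 * For example (the table name and property key are illustrative):
 * {{{
 *   SHOW TBLPROPERTIES db1.sales('created.date')
 * }}}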
 */
case class ShowTablePropertiesCommand(table: TableIdentifier, propertyKey: Option[String])
  extends RunnableCommand {

  override val output: Seq[Attribute] = {
    val schema = AttributeReference("value", StringType, nullable = false)() :: Nil
    propertyKey match {
      case None => AttributeReference("key", StringType, nullable = false)() :: schema
      case _ => schema
    }
  }

  override def run(sparkSession: SparkSession): Seq[Row] = {
    val catalog = sparkSession.sessionState.catalog

    if (catalog.isTemporaryTable(table)) {
      Seq.empty[Row]
    } else {
      val catalogTable = sparkSession.sessionState.catalog.getTableMetadata(table)

      propertyKey match {
        case Some(p) =>
          val propValue = catalogTable
            .properties
            .getOrElse(p, s"Table ${catalogTable.qualifiedName} does not have property: $p")
          Seq(Row(propValue))
        case None =>
          catalogTable.properties.map(p => Row(p._1, p._2)).toSeq
      }
    }
  }
}

/**
 * A command to list the column names for a table.
 *
 * The syntax of using this command in SQL is:
 * {{{
 *   SHOW COLUMNS (FROM | IN) table_identifier [(FROM | IN) database];
 * }}}
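 *
 * For example (names are illustrative):
 * {{{
 *   SHOW COLUMNS FROM sales IN db1
 * }}}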
 */
case class ShowColumnsCommand(
    databaseName: Option[String],
    tableName: TableIdentifier) extends RunnableCommand {
  override val output: Seq[Attribute] = {
    AttributeReference("col_name", StringType, nullable = false)() :: Nil
  }

  override def run(sparkSession: SparkSession): Seq[Row] = {
    val catalog = sparkSession.sessionState.catalog
    val resolver = sparkSession.sessionState.conf.resolver
    val lookupTable = databaseName match {
      case None => tableName
      case Some(db) if tableName.database.exists(!resolver(_, db)) =>
        throw new AnalysisException(
          s"SHOW COLUMNS with conflicting databases: '$db' != '${tableName.database.get}'")
      case Some(db) => TableIdentifier(tableName.identifier, Some(db))
    }
    val table = catalog.getTempViewOrPermanentTableMetadata(lookupTable)
    table.schema.map { c =>
      Row(c.name)
    }
  }
}

/**
 * A command to list the partition names of a table. If a partition spec is specified,
 * partitions that match the spec are returned. An [[AnalysisException]] is thrown under
 * the following conditions:
 *
 * 1. If the command is called for a non-partitioned table.
 * 2. If the partition spec refers to columns that are not defined as partitioning columns.
 *
 * The syntax of using this command in SQL is:
 * {{{
 *   SHOW PARTITIONS [db_name.]table_name [PARTITION(partition_spec)]
 * }}}
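 *
 * For example (names and the partition value are illustrative):
 * {{{
 *   SHOW PARTITIONS db1.sales PARTITION (ds='2016-01-01')
 * }}}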
 */
case class ShowPartitionsCommand(
    tableName: TableIdentifier,
    spec: Option[TablePartitionSpec]) extends RunnableCommand {
  override val output: Seq[Attribute] = {
    AttributeReference("partition", StringType, nullable = false)() :: Nil
  }

  override def run(sparkSession: SparkSession): Seq[Row] = {
    val catalog = sparkSession.sessionState.catalog
    val table = catalog.getTableMetadata(tableName)
    val tableIdentWithDB = table.identifier.quotedString

    // Validate and throw an AnalysisException under the following conditions:
    // 1. If the table is a view.
    // 2. If the table is not partitioned.
    // 3. If it is a datasource table.
    if (table.tableType == VIEW) {
      throw new AnalysisException(s"SHOW PARTITIONS is not allowed on a view: $tableIdentWithDB")
    }

    if (table.partitionColumnNames.isEmpty) {
      throw new AnalysisException(
        s"SHOW PARTITIONS is not allowed on a table that is not partitioned: $tableIdentWithDB")
    }

    DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "SHOW PARTITIONS")

    // Validate the partitioning spec by making sure all the referenced columns are
    // defined as partitioning columns in the table definition. An AnalysisException is
    // thrown if the partitioning spec is invalid.
    if (spec.isDefined) {
      val badColumns = spec.get.keySet.filterNot(table.partitionColumnNames.contains)
      if (badColumns.nonEmpty) {
        val badCols = badColumns.mkString("[", ", ", "]")
        throw new AnalysisException(
          s"Non-partitioning column(s) $badCols are specified for SHOW PARTITIONS")
      }
    }

    val partNames = catalog.listPartitionNames(tableName, spec)
    partNames.map(Row(_))
  }
}

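/**
 * A command that shows the `CREATE TABLE` statement (or `CREATE VIEW` statement for views)
 * that can be used to re-create the given table, for both Hive serde tables and data source
 * tables.
 *
 * The syntax of using this command in SQL is (illustrative):
 * {{{
 *   SHOW CREATE TABLE [db_name.]table_name
 * }}}
 */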
case class ShowCreateTableCommand(table: TableIdentifier) extends RunnableCommand {
  override val output: Seq[Attribute] = Seq(
    AttributeReference("createtab_stmt", StringType, nullable = false)()
  )

  override def run(sparkSession: SparkSession): Seq[Row] = {
    val catalog = sparkSession.sessionState.catalog
    val tableMetadata = catalog.getTableMetadata(table)

    // TODO: unify this after we unify the CREATE TABLE syntax for hive serde and data source table.
    val stmt = if (DDLUtils.isDatasourceTable(tableMetadata)) {
      showCreateDataSourceTable(tableMetadata)
    } else {
      showCreateHiveTable(tableMetadata)
    }

    Seq(Row(stmt))
  }

  private def showCreateHiveTable(metadata: CatalogTable): String = {
    def reportUnsupportedError(features: Seq[String]): Unit = {
      throw new AnalysisException(
        s"Failed to execute SHOW CREATE TABLE against table/view ${metadata.identifier}, " +
          "which is created by Hive and uses the following unsupported feature(s)\n" +
          features.map(" - " + _).mkString("\n")
      )
    }

    if (metadata.unsupportedFeatures.nonEmpty) {
      reportUnsupportedError(metadata.unsupportedFeatures)
    }

    val builder = StringBuilder.newBuilder

    val tableTypeString = metadata.tableType match {
      case EXTERNAL => " EXTERNAL TABLE"
      case VIEW => " VIEW"
      case MANAGED => " TABLE"
    }

    builder ++= s"CREATE$tableTypeString ${table.quotedString}"

    if (metadata.tableType == VIEW) {
      if (metadata.schema.nonEmpty) {
        builder ++= metadata.schema.map(_.name).mkString("(", ", ", ")")
      }
      builder ++= metadata.viewText.mkString(" AS\n", "", "\n")
    } else {
      showHiveTableHeader(metadata, builder)
      showHiveTableNonDataColumns(metadata, builder)
      showHiveTableStorageInfo(metadata, builder)
      showHiveTableProperties(metadata, builder)
    }

    builder.toString()
  }

  private def showHiveTableHeader(metadata: CatalogTable, builder: StringBuilder): Unit = {
    val columns = metadata.schema.filterNot { column =>
      metadata.partitionColumnNames.contains(column.name)
    }.map(columnToDDLFragment)

    if (columns.nonEmpty) {
      builder ++= columns.mkString("(", ", ", ")\n")
    }

    metadata
      .comment
      .map("COMMENT '" + escapeSingleQuotedString(_) + "'\n")
      .foreach(builder.append)
  }

  private def columnToDDLFragment(column: StructField): String = {
    val comment = column.getComment().map(escapeSingleQuotedString).map(" COMMENT '" + _ + "'")
    s"${quoteIdentifier(column.name)} ${column.dataType.catalogString}${comment.getOrElse("")}"
  }

  private def showHiveTableNonDataColumns(metadata: CatalogTable, builder: StringBuilder): Unit = {
    if (metadata.partitionColumnNames.nonEmpty) {
      val partCols = metadata.partitionSchema.map(columnToDDLFragment)
      builder ++= partCols.mkString("PARTITIONED BY (", ", ", ")\n")
    }

    if (metadata.bucketSpec.isDefined) {
      throw new UnsupportedOperationException(
        "Creating Hive table with bucket spec is not supported yet.")
    }
  }

  private def showHiveTableStorageInfo(metadata: CatalogTable, builder: StringBuilder): Unit = {
    val storage = metadata.storage

    storage.serde.foreach { serde =>
      builder ++= s"ROW FORMAT SERDE '$serde'\n"

      val serdeProps = metadata.storage.properties.map {
        case (key, value) =>
          s"'${escapeSingleQuotedString(key)}' = '${escapeSingleQuotedString(value)}'"
      }

      builder ++= serdeProps.mkString("WITH SERDEPROPERTIES (\n  ", ",\n  ", "\n)\n")
    }

    if (storage.inputFormat.isDefined || storage.outputFormat.isDefined) {
      builder ++= "STORED AS\n"

      storage.inputFormat.foreach { format =>
        builder ++= s"  INPUTFORMAT '${escapeSingleQuotedString(format)}'\n"
      }

      storage.outputFormat.foreach { format =>
        builder ++= s"  OUTPUTFORMAT '${escapeSingleQuotedString(format)}'\n"
      }
    }

    if (metadata.tableType == EXTERNAL) {
      storage.locationUri.foreach { uri =>
        builder ++= s"LOCATION '$uri'\n"
      }
    }
  }

  private def showHiveTableProperties(metadata: CatalogTable, builder: StringBuilder): Unit = {
    if (metadata.properties.nonEmpty) {
      val props = metadata.properties.map { case (key, value) =>
        s"'${escapeSingleQuotedString(key)}' = '${escapeSingleQuotedString(value)}'"
      }

      builder ++= props.mkString("TBLPROPERTIES (\n  ", ",\n  ", "\n)\n")
    }
  }

  private def showCreateDataSourceTable(metadata: CatalogTable): String = {
    val builder = StringBuilder.newBuilder

    builder ++= s"CREATE TABLE ${table.quotedString} "
    showDataSourceTableDataColumns(metadata, builder)
    showDataSourceTableOptions(metadata, builder)
    showDataSourceTableNonDataColumns(metadata, builder)

    builder.toString()
  }

  private def showDataSourceTableDataColumns(
      metadata: CatalogTable, builder: StringBuilder): Unit = {
    val columns = metadata.schema.fields.map(f => s"${quoteIdentifier(f.name)} ${f.dataType.sql}")
    builder ++= columns.mkString("(", ", ", ")\n")
  }

  private def showDataSourceTableOptions(metadata: CatalogTable, builder: StringBuilder): Unit = {
    builder ++= s"USING ${metadata.provider.get}\n"

    val dataSourceOptions = metadata.storage.properties.map {
      case (key, value) => s"${quoteIdentifier(key)} '${escapeSingleQuotedString(value)}'"
    } ++ metadata.storage.locationUri.flatMap { location =>
      if (metadata.tableType == MANAGED) {
        // If it's a managed table, omit the PATH option. Spark SQL always creates an external
        // table when the table creation DDL contains the PATH option.
        None
      } else {
        Some(s"path '${escapeSingleQuotedString(location)}'")
      }
    }

    if (dataSourceOptions.nonEmpty) {
      builder ++= "OPTIONS (\n"
      builder ++= dataSourceOptions.mkString("  ", ",\n  ", "\n")
      builder ++= ")\n"
    }
  }

  private def showDataSourceTableNonDataColumns(
      metadata: CatalogTable, builder: StringBuilder): Unit = {
    val partCols = metadata.partitionColumnNames
    if (partCols.nonEmpty) {
      builder ++= s"PARTITIONED BY ${partCols.mkString("(", ", ", ")")}\n"
    }

    metadata.bucketSpec.foreach { spec =>
      if (spec.bucketColumnNames.nonEmpty) {
        builder ++= s"CLUSTERED BY ${spec.bucketColumnNames.mkString("(", ", ", ")")}\n"

        if (spec.sortColumnNames.nonEmpty) {
          builder ++= s"SORTED BY ${spec.sortColumnNames.mkString("(", ", ", ")")}\n"
        }

        builder ++= s"INTO ${spec.numBuckets} BUCKETS\n"
      }
    }
  }

  private def escapeSingleQuotedString(str: String): String = {
    val builder = StringBuilder.newBuilder

    str.foreach {
      case '\'' => builder ++= "\\'"
      case ch => builder += ch
    }

    builder.toString()
  }
}