/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.execution.command

import java.io.File
import java.net.URI
import java.nio.file.FileSystems
import java.util.Date

import scala.collection.mutable.ArrayBuffer
import scala.util.control.NonFatal
import scala.util.Try

import org.apache.hadoop.fs.Path

import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.NoSuchPartitionException
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.catalog.CatalogTableType._
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.catalyst.util.quoteIdentifier
import org.apache.spark.sql.execution.datasources.PartitioningUtils
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils

/**
 * A command to create a MANAGED table with the same definition as the given existing table.
 * In the target table definition, the table comment is always empty but the column comments
 * are identical to the ones defined in the source table.
 *
 * The CatalogTable attributes copied from the source table are storage (inputFormat, outputFormat,
 * serde, compressed, properties), schema, provider, partitionColumnNames and bucketSpec.
 *
 * The syntax of using this command in SQL is:
 * {{{
 *   CREATE TABLE [IF NOT EXISTS] [db_name.]table_name
 *   LIKE [other_db_name.]existing_table_name
 * }}}
 */
case class CreateTableLikeCommand(
    targetTable: TableIdentifier,
    sourceTable: TableIdentifier,
    ifNotExists: Boolean) extends RunnableCommand {

  override def run(sparkSession: SparkSession): Seq[Row] = {
    val catalog = sparkSession.sessionState.catalog
    val sourceTableDesc = catalog.getTempViewOrPermanentTableMetadata(sourceTable)

    // If the source is a view, back the new table by the default data source;
    // otherwise keep the source table's provider.
    val newProvider = if (sourceTableDesc.tableType == CatalogTableType.VIEW) {
      Some(sparkSession.sessionState.conf.defaultDataSourceName)
    } else {
      sourceTableDesc.provider
    }

    val newTableDesc =
      CatalogTable(
        identifier = targetTable,
        tableType = CatalogTableType.MANAGED,
        // We are creating a new managed table, which should not have a custom table location.
        storage = sourceTableDesc.storage.copy(locationUri = None),
        schema = sourceTableDesc.schema,
        provider = newProvider,
        partitionColumnNames = sourceTableDesc.partitionColumnNames,
        bucketSpec = sourceTableDesc.bucketSpec)

    catalog.createTable(newTableDesc, ifNotExists)
    Seq.empty[Row]
  }
}


// TODO: move the rest of the table commands from ddl.scala to this file

/**
 * A command to create a table.
 *
 * Note: This is currently used only for creating Hive tables.
 * This is not intended for temporary tables.
 *
 * The syntax of using this command in SQL is:
 * {{{
 *   CREATE [EXTERNAL] TABLE [IF NOT EXISTS] [db_name.]table_name
 *   [(col1 data_type [COMMENT col_comment], ...)]
 *   [COMMENT table_comment]
 *   [PARTITIONED BY (col3 data_type [COMMENT col_comment], ...)]
 *   [CLUSTERED BY (col1, ...) [SORTED BY (col1 [ASC|DESC], ...)] INTO num_buckets BUCKETS]
 *   [SKEWED BY (col1, col2, ...) ON ((col_value, col_value, ...), ...)
 *     [STORED AS DIRECTORIES]]
 *   [ROW FORMAT row_format]
 *   [STORED AS file_format | STORED BY storage_handler_class [WITH SERDEPROPERTIES (...)]]
 *   [LOCATION path]
 *   [TBLPROPERTIES (property_name=property_value, ...)]
 *   [AS select_statement];
 * }}}
 */
case class CreateTableCommand(table: CatalogTable, ifNotExists: Boolean) extends RunnableCommand {

  override def run(sparkSession: SparkSession): Seq[Row] = {
    sparkSession.sessionState.catalog.createTable(table, ifNotExists)
    Seq.empty[Row]
  }
}


/**
 * A command that renames a table/view.
 *
 * The syntax of this command is:
 * {{{
 *    ALTER TABLE table1 RENAME TO table2;
 *    ALTER VIEW view1 RENAME TO view2;
 * }}}
 */
case class AlterTableRenameCommand(
    oldName: TableIdentifier,
    newName: TableIdentifier,
    isView: Boolean)
  extends RunnableCommand {

  override def run(sparkSession: SparkSession): Seq[Row] = {
    val catalog = sparkSession.sessionState.catalog
    // If this is a temp view, just rename the view.
    // Otherwise, if this is a real table, we also need to uncache and invalidate the table.
    if (catalog.isTemporaryTable(oldName)) {
      catalog.renameTable(oldName, newName)
    } else {
      val table = catalog.getTableMetadata(oldName)
      DDLUtils.verifyAlterTableType(catalog, table, isView)
      // If an exception is thrown here we can just assume the table is uncached;
      // this can happen with Hive tables when the underlying catalog is in-memory.
      val wasCached = Try(sparkSession.catalog.isCached(oldName.unquotedString)).getOrElse(false)
      if (wasCached) {
        try {
          sparkSession.catalog.uncacheTable(oldName.unquotedString)
        } catch {
          case NonFatal(e) => log.warn(e.toString, e)
        }
      }
      // Invalidate the table last; otherwise uncaching the table would load the logical plan
      // back into the Hive metastore cache.
      catalog.refreshTable(oldName)
      catalog.renameTable(oldName, newName)
      if (wasCached) {
        sparkSession.catalog.cacheTable(newName.unquotedString)
      }
    }
    Seq.empty[Row]
  }

}

/**
 * A command that loads data into a Hive table.
 *
 * The syntax of this command is:
 * {{{
 *  LOAD DATA [LOCAL] INPATH 'filepath' [OVERWRITE] INTO TABLE tablename
 *  [PARTITION (partcol1=val1, partcol2=val2 ...)]
 * }}}
 */
case class LoadDataCommand(
    table: TableIdentifier,
    path: String,
    isLocal: Boolean,
    isOverwrite: Boolean,
    partition: Option[TablePartitionSpec]) extends RunnableCommand {

  override def run(sparkSession: SparkSession): Seq[Row] = {
    val catalog = sparkSession.sessionState.catalog
    val targetTable = catalog.getTableMetadata(table)
    val tableIdentWithDB = targetTable.identifier.quotedString

    if (targetTable.tableType == CatalogTableType.VIEW) {
      throw new AnalysisException(s"Target table in LOAD DATA cannot be a view: $tableIdentWithDB")
    }
    if (DDLUtils.isDatasourceTable(targetTable)) {
      throw new AnalysisException(
        s"LOAD DATA is not supported for datasource tables: $tableIdentWithDB")
    }
    if (targetTable.partitionColumnNames.nonEmpty) {
      if (partition.isEmpty) {
        throw new AnalysisException(s"LOAD DATA target table $tableIdentWithDB is partitioned, " +
          s"but no partition spec is provided")
      }
      if (targetTable.partitionColumnNames.size != partition.get.size) {
        throw new AnalysisException(s"LOAD DATA target table $tableIdentWithDB is partitioned, " +
          s"but the number of columns in the provided partition spec (${partition.get.size}) " +
          s"does not match the number of partitioned columns in the table " +
          s"(${targetTable.partitionColumnNames.size})")
      }
      partition.get.keys.foreach { colName =>
        if (!targetTable.partitionColumnNames.contains(colName)) {
          throw new AnalysisException(s"LOAD DATA target table $tableIdentWithDB is partitioned, " +
            s"but the specified partition spec refers to a column that is not partitioned: " +
            s"'$colName'")
        }
      }
    } else {
      if (partition.nonEmpty) {
        throw new AnalysisException(s"LOAD DATA target table $tableIdentWithDB is not " +
          s"partitioned, but a partition spec was provided.")
      }
    }

    val loadPath =
      if (isLocal) {
        val uri = Utils.resolveURI(path)
        val filePath = uri.getPath()
        // Only a wildcard in the file name is allowed; expand it against the parent directory
        // to check that at least one matching file exists.
        val exists = if (filePath.contains("*")) {
          val fileSystem = FileSystems.getDefault
          val pathPattern = fileSystem.getPath(filePath)
          val dir = pathPattern.getParent.toString
          if (dir.contains("*")) {
            throw new AnalysisException(
              s"LOAD DATA input path allows only filename wildcard: $path")
          }

          val files = new File(dir).listFiles()
          if (files == null) {
            false
          } else {
            val matcher = fileSystem.getPathMatcher("glob:" + pathPattern.toAbsolutePath)
            files.exists(f => matcher.matches(fileSystem.getPath(f.getAbsolutePath)))
          }
        } else {
          new File(filePath).exists()
        }
        if (!exists) {
          throw new AnalysisException(s"LOAD DATA input path does not exist: $path")
        }
        uri
      } else {
        val uri = new URI(path)
        if (uri.getScheme() != null && uri.getAuthority() != null) {
          uri
        } else {
          // Follow Hive's behavior:
          // If no scheme or authority is provided with a non-local inpath,
          // we will use the Hadoop configuration "fs.default.name".
          val defaultFSConf = sparkSession.sessionState.newHadoopConf().get("fs.default.name")
          val defaultFS = if (defaultFSConf == null) {
            new URI("")
          } else {
            new URI(defaultFSConf)
          }

          val scheme = if (uri.getScheme() != null) {
            uri.getScheme()
          } else {
            defaultFS.getScheme()
          }
          val authority = if (uri.getAuthority() != null) {
            uri.getAuthority()
          } else {
            defaultFS.getAuthority()
          }

          if (scheme == null) {
            throw new AnalysisException(
              s"LOAD DATA: URI scheme is required for non-local input paths: '$path'")
          }

          // Follow Hive's behavior:
          // If LOCAL is not specified, and the path is relative,
          // then the path is interpreted relative to "/user/<username>"
          val uriPath = uri.getPath()
          val absolutePath = if (uriPath != null && uriPath.startsWith("/")) {
            uriPath
          } else {
            s"/user/${System.getProperty("user.name")}/$uriPath"
          }
          new URI(scheme, authority, absolutePath, uri.getQuery(), uri.getFragment())
        }
      }

    if (partition.nonEmpty) {
      catalog.loadPartition(
        targetTable.identifier,
        loadPath.toString,
        partition.get,
        isOverwrite,
        holdDDLTime = false,
        inheritTableSpecs = true)
    } else {
      catalog.loadTable(
        targetTable.identifier,
        loadPath.toString,
        isOverwrite,
        holdDDLTime = false)
    }

    // Refresh the metadata cache to ensure the loaded data is visible to users.
    catalog.refreshTable(targetTable.identifier)

    Seq.empty[Row]
  }
}

/**
 * A command to truncate a table.
 *
 * The syntax of this command is:
 * {{{
 *   TRUNCATE TABLE tablename [PARTITION (partcol1=val1, partcol2=val2 ...)]
 * }}}
 */
case class TruncateTableCommand(
    tableName: TableIdentifier,
    partitionSpec: Option[TablePartitionSpec]) extends RunnableCommand {

  override def run(spark: SparkSession): Seq[Row] = {
    val catalog = spark.sessionState.catalog
    val table = catalog.getTableMetadata(tableName)
    val tableIdentWithDB = table.identifier.quotedString

    if (table.tableType == CatalogTableType.EXTERNAL) {
      throw new AnalysisException(
        s"Operation not allowed: TRUNCATE TABLE on external tables: $tableIdentWithDB")
    }
    if (table.tableType == CatalogTableType.VIEW) {
      throw new AnalysisException(
        s"Operation not allowed: TRUNCATE TABLE on views: $tableIdentWithDB")
    }
    if (table.partitionColumnNames.isEmpty && partitionSpec.isDefined) {
      throw new AnalysisException(
        s"Operation not allowed: TRUNCATE TABLE ... PARTITION is not supported " +
        s"for tables that are not partitioned: $tableIdentWithDB")
    }
    if (partitionSpec.isDefined) {
      DDLUtils.verifyPartitionProviderIsHive(spark, table, "TRUNCATE TABLE ... PARTITION")
    }

    val partCols = table.partitionColumnNames
    val locations =
      if (partCols.isEmpty) {
        Seq(table.storage.locationUri)
      } else {
        val normalizedSpec = partitionSpec.map { spec =>
          PartitioningUtils.normalizePartitionSpec(
            spec,
            partCols,
            table.identifier.quotedString,
            spark.sessionState.conf.resolver)
        }
        val partLocations =
          catalog.listPartitions(table.identifier, normalizedSpec).map(_.storage.locationUri)

        // Fail if the partition spec is fully specified (not partial) and the partition does not
        // exist.
        for (spec <- partitionSpec if partLocations.isEmpty && spec.size == partCols.length) {
          throw new NoSuchPartitionException(table.database, table.identifier.table, spec)
        }

        partLocations
      }
    val hadoopConf = spark.sessionState.newHadoopConf()
    locations.foreach { location =>
      if (location.isDefined) {
        val path = new Path(location.get)
        try {
          val fs = path.getFileSystem(hadoopConf)
          fs.delete(path, true)
          fs.mkdirs(path)
        } catch {
          case NonFatal(e) =>
            throw new AnalysisException(
              s"Failed to truncate table $tableIdentWithDB when removing data of the path: $path " +
                s"because of ${e.toString}")
        }
      }
    }
    // After deleting the data, invalidate the table to make sure we don't keep around a stale
    // file relation in the metastore cache.
    spark.sessionState.refreshTable(tableName.unquotedString)
    // Also try to drop the contents of the table from the columnar cache
    try {
      spark.sharedState.cacheManager.uncacheQuery(spark.table(table.identifier))
    } catch {
      case NonFatal(e) =>
        log.warn(s"Exception when attempting to uncache table $tableIdentWithDB", e)
    }
    Seq.empty[Row]
  }
}

/**
 * Command that looks like
 * {{{
 *   DESCRIBE [EXTENDED|FORMATTED] table_name partitionSpec?;
 * }}}
 */
case class DescribeTableCommand(
    table: TableIdentifier,
    partitionSpec: TablePartitionSpec,
    isExtended: Boolean,
    isFormatted: Boolean)
  extends RunnableCommand {

  override val output: Seq[Attribute] = Seq(
    // Column names are based on Hive.
    AttributeReference("col_name", StringType, nullable = false,
      new MetadataBuilder().putString("comment", "name of the column").build())(),
    AttributeReference("data_type", StringType, nullable = false,
      new MetadataBuilder().putString("comment", "data type of the column").build())(),
    AttributeReference("comment", StringType, nullable = true,
      new MetadataBuilder().putString("comment", "comment of the column").build())()
  )

  override def run(sparkSession: SparkSession): Seq[Row] = {
    val result = new ArrayBuffer[Row]
    val catalog = sparkSession.sessionState.catalog

    if (catalog.isTemporaryTable(table)) {
      if (partitionSpec.nonEmpty) {
        throw new AnalysisException(
          s"DESC PARTITION is not allowed on a temporary view: ${table.identifier}")
      }
      describeSchema(catalog.lookupRelation(table).schema, result)
    } else {
      val metadata = catalog.getTableMetadata(table)
      if (metadata.schema.isEmpty) {
        // In older versions of Spark (prior to 2.1), the table schema can be empty and needs to
        // be inferred at runtime. We should still support it.
        describeSchema(catalog.lookupRelation(metadata.identifier).schema, result)
      } else {
        describeSchema(metadata.schema, result)
      }

      describePartitionInfo(metadata, result)

      if (partitionSpec.isEmpty) {
        if (isExtended) {
          describeExtendedTableInfo(metadata, result)
        } else if (isFormatted) {
          describeFormattedTableInfo(metadata, result)
        }
      } else {
        describeDetailedPartitionInfo(sparkSession, catalog, metadata, result)
      }
    }

    result
  }

  private def describePartitionInfo(table: CatalogTable, buffer: ArrayBuffer[Row]): Unit = {
    if (table.partitionColumnNames.nonEmpty) {
      append(buffer, "# Partition Information", "", "")
      append(buffer, s"# ${output.head.name}", output(1).name, output(2).name)
      describeSchema(table.partitionSchema, buffer)
    }
  }

  private def describeExtendedTableInfo(table: CatalogTable, buffer: ArrayBuffer[Row]): Unit = {
    append(buffer, "", "", "")
    append(buffer, "# Detailed Table Information", table.toString, "")
  }

  private def describeFormattedTableInfo(table: CatalogTable, buffer: ArrayBuffer[Row]): Unit = {
    append(buffer, "", "", "")
    append(buffer, "# Detailed Table Information", "", "")
    append(buffer, "Database:", table.database, "")
    append(buffer, "Owner:", table.owner, "")
    append(buffer, "Create Time:", new Date(table.createTime).toString, "")
    append(buffer, "Last Access Time:", new Date(table.lastAccessTime).toString, "")
    append(buffer, "Location:", table.storage.locationUri.getOrElse(""), "")
    append(buffer, "Table Type:", table.tableType.name, "")
    table.stats.foreach(s => append(buffer, "Statistics:", s.simpleString, ""))

    append(buffer, "Table Parameters:", "", "")
    table.properties.foreach { case (key, value) =>
      append(buffer, s"  $key", value, "")
    }

    describeStorageInfo(table, buffer)

    if (table.tableType == CatalogTableType.VIEW) describeViewInfo(table, buffer)

    if (DDLUtils.isDatasourceTable(table) && table.tracksPartitionsInCatalog) {
      append(buffer, "Partition Provider:", "Catalog", "")
    }
  }

  private def describeStorageInfo(metadata: CatalogTable, buffer: ArrayBuffer[Row]): Unit = {
    append(buffer, "", "", "")
    append(buffer, "# Storage Information", "", "")
    metadata.storage.serde.foreach(serdeLib => append(buffer, "SerDe Library:", serdeLib, ""))
    metadata.storage.inputFormat.foreach(format => append(buffer, "InputFormat:", format, ""))
    metadata.storage.outputFormat.foreach(format => append(buffer, "OutputFormat:", format, ""))
    append(buffer, "Compressed:", if (metadata.storage.compressed) "Yes" else "No", "")
    describeBucketingInfo(metadata, buffer)

    append(buffer, "Storage Desc Parameters:", "", "")
    val maskedProperties = CatalogUtils.maskCredentials(metadata.storage.properties)
    maskedProperties.foreach { case (key, value) =>
      append(buffer, s"  $key", value, "")
    }
  }

  private def describeViewInfo(metadata: CatalogTable, buffer: ArrayBuffer[Row]): Unit = {
    append(buffer, "", "", "")
    append(buffer, "# View Information", "", "")
    append(buffer, "View Original Text:", metadata.viewOriginalText.getOrElse(""), "")
    append(buffer, "View Expanded Text:", metadata.viewText.getOrElse(""), "")
  }

  private def describeBucketingInfo(metadata: CatalogTable, buffer: ArrayBuffer[Row]): Unit = {
    metadata.bucketSpec match {
      case Some(BucketSpec(numBuckets, bucketColumnNames, sortColumnNames)) =>
        append(buffer, "Num Buckets:", numBuckets.toString, "")
        append(buffer, "Bucket Columns:", bucketColumnNames.mkString("[", ", ", "]"), "")
        append(buffer, "Sort Columns:", sortColumnNames.mkString("[", ", ", "]"), "")

      case _ =>
    }
  }

  private def describeDetailedPartitionInfo(
      spark: SparkSession,
      catalog: SessionCatalog,
      metadata: CatalogTable,
      result: ArrayBuffer[Row]): Unit = {
    if (metadata.tableType == CatalogTableType.VIEW) {
      throw new AnalysisException(
        s"DESC PARTITION is not allowed on a view: ${table.identifier}")
    }
    DDLUtils.verifyPartitionProviderIsHive(spark, metadata, "DESC PARTITION")
    val partition = catalog.getPartition(table, partitionSpec)
    if (isExtended) {
      describeExtendedDetailedPartitionInfo(table, metadata, partition, result)
    } else if (isFormatted) {
      describeFormattedDetailedPartitionInfo(table, metadata, partition, result)
      describeStorageInfo(metadata, result)
    }
  }

  private def describeExtendedDetailedPartitionInfo(
      tableIdentifier: TableIdentifier,
      table: CatalogTable,
      partition: CatalogTablePartition,
      buffer: ArrayBuffer[Row]): Unit = {
    append(buffer, "", "", "")
    append(buffer, "Detailed Partition Information " + partition.toString, "", "")
  }

  private def describeFormattedDetailedPartitionInfo(
      tableIdentifier: TableIdentifier,
      table: CatalogTable,
      partition: CatalogTablePartition,
      buffer: ArrayBuffer[Row]): Unit = {
    append(buffer, "", "", "")
    append(buffer, "# Detailed Partition Information", "", "")
    append(buffer, "Partition Value:", s"[${partition.spec.values.mkString(", ")}]", "")
    append(buffer, "Database:", table.database, "")
    append(buffer, "Table:", tableIdentifier.table, "")
    append(buffer, "Location:", partition.storage.locationUri.getOrElse(""), "")
    append(buffer, "Partition Parameters:", "", "")
    partition.parameters.foreach { case (key, value) =>
      append(buffer, s"  $key", value, "")
    }
  }

  private def describeSchema(schema: StructType, buffer: ArrayBuffer[Row]): Unit = {
    schema.foreach { column =>
      append(buffer, column.name, column.dataType.simpleString, column.getComment().orNull)
    }
  }

  private def append(
      buffer: ArrayBuffer[Row], column: String, dataType: String, comment: String): Unit = {
    buffer += Row(column, dataType, comment)
  }
}


/**
 * A command for users to get tables in the given database.
 * If a databaseName is not given, the current database will be used.
 * The syntax of using this command in SQL is:
 * {{{
 *   SHOW TABLES [(IN|FROM) database_name] [[LIKE] 'identifier_with_wildcards'];
 * }}}
 */
case class ShowTablesCommand(
    databaseName: Option[String],
    tableIdentifierPattern: Option[String]) extends RunnableCommand {

  // The result of SHOW TABLES has three columns: database, tableName and isTemporary.
  override val output: Seq[Attribute] = {
    AttributeReference("database", StringType, nullable = false)() ::
      AttributeReference("tableName", StringType, nullable = false)() ::
      AttributeReference("isTemporary", BooleanType, nullable = false)() :: Nil
  }

  override def run(sparkSession: SparkSession): Seq[Row] = {
    // Since we need to return a Seq of rows, we will call getTables directly
    // instead of calling tables in sparkSession.
    val catalog = sparkSession.sessionState.catalog
    val db = databaseName.getOrElse(catalog.getCurrentDatabase)
    val tables =
      tableIdentifierPattern.map(catalog.listTables(db, _)).getOrElse(catalog.listTables(db))
    tables.map { tableIdent =>
      val isTemp = catalog.isTemporaryTable(tableIdent)
      Row(tableIdent.database.getOrElse(""), tableIdent.table, isTemp)
    }
  }
}


/**
 * A command for users to list the properties for a table. If propertyKey is specified, the value
 * for the propertyKey is returned. If propertyKey is not specified, all the keys and their
 * corresponding values are returned.
 * The syntax of using this command in SQL is:
 * {{{
 *   SHOW TBLPROPERTIES table_name[('propertyKey')];
 * }}}
 */
case class ShowTablePropertiesCommand(table: TableIdentifier, propertyKey: Option[String])
  extends RunnableCommand {

  override val output: Seq[Attribute] = {
    val schema = AttributeReference("value", StringType, nullable = false)() :: Nil
    propertyKey match {
      case None => AttributeReference("key", StringType, nullable = false)() :: schema
      case _ => schema
    }
  }

  override def run(sparkSession: SparkSession): Seq[Row] = {
    val catalog = sparkSession.sessionState.catalog

    if (catalog.isTemporaryTable(table)) {
      Seq.empty[Row]
    } else {
      val catalogTable = catalog.getTableMetadata(table)

      propertyKey match {
        case Some(p) =>
          val propValue = catalogTable
            .properties
            .getOrElse(p, s"Table ${catalogTable.qualifiedName} does not have property: $p")
          Seq(Row(propValue))
        case None =>
          catalogTable.properties.map(p => Row(p._1, p._2)).toSeq
      }
    }
  }
}

/**
 * A command to list the column names for a table.
 *
 * The syntax of using this command in SQL is:
 * {{{
 *   SHOW COLUMNS (FROM | IN) table_identifier [(FROM | IN) database];
 * }}}
 */
case class ShowColumnsCommand(
    databaseName: Option[String],
    tableName: TableIdentifier) extends RunnableCommand {
  override val output: Seq[Attribute] = {
    AttributeReference("col_name", StringType, nullable = false)() :: Nil
  }

  override def run(sparkSession: SparkSession): Seq[Row] = {
    val catalog = sparkSession.sessionState.catalog
    val resolver = sparkSession.sessionState.conf.resolver
    val lookupTable = databaseName match {
      case None => tableName
      case Some(db) if tableName.database.exists(!resolver(_, db)) =>
        throw new AnalysisException(
          s"SHOW COLUMNS with conflicting databases: '$db' != '${tableName.database.get}'")
      case Some(db) => TableIdentifier(tableName.identifier, Some(db))
    }
    val table = catalog.getTempViewOrPermanentTableMetadata(lookupTable)
    table.schema.map { c =>
      Row(c.name)
    }
  }
}

/**
 * A command to list the partition names of a table. If a partition spec is specified,
 * partitions that match the spec are returned. An [[AnalysisException]] is thrown under
 * the following conditions:
 *
 * 1. If the command is called for a non-partitioned table.
 * 2. If the partition spec refers to columns that are not defined as partitioning columns.
 *
 * The syntax of using this command in SQL is:
 * {{{
 *   SHOW PARTITIONS [db_name.]table_name [PARTITION(partition_spec)]
 * }}}
 */
case class ShowPartitionsCommand(
    tableName: TableIdentifier,
    spec: Option[TablePartitionSpec]) extends RunnableCommand {
  override val output: Seq[Attribute] = {
    AttributeReference("partition", StringType, nullable = false)() :: Nil
  }

  override def run(sparkSession: SparkSession): Seq[Row] = {
    val catalog = sparkSession.sessionState.catalog
    val table = catalog.getTableMetadata(tableName)
    val tableIdentWithDB = table.identifier.quotedString

    /**
     * Validate and throw an [[AnalysisException]] under the following conditions:
     * 1. If the table is not partitioned.
     * 2. If it is a datasource table.
     * 3. If it is a view.
     */
    if (table.tableType == VIEW) {
      throw new AnalysisException(s"SHOW PARTITIONS is not allowed on a view: $tableIdentWithDB")
    }

    if (table.partitionColumnNames.isEmpty) {
      throw new AnalysisException(
        s"SHOW PARTITIONS is not allowed on a table that is not partitioned: $tableIdentWithDB")
    }

    DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "SHOW PARTITIONS")

    /**
     * Validate the partitioning spec by making sure all the referenced columns are
     * defined as partitioning columns in the table definition. An [[AnalysisException]] is
     * thrown if the partitioning spec is invalid.
     */
    if (spec.isDefined) {
      val badColumns = spec.get.keySet.filterNot(table.partitionColumnNames.contains)
      if (badColumns.nonEmpty) {
        val badCols = badColumns.mkString("[", ", ", "]")
        throw new AnalysisException(
          s"Non-partitioning column(s) $badCols are specified for SHOW PARTITIONS")
      }
    }

    val partNames = catalog.listPartitionNames(tableName, spec)
    partNames.map(Row(_))
  }
}

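/**
 * A command that shows the DDL statement that can be used to re-create the given table.
 * For tables created by Hive that use features Spark SQL does not support, an
 * [[AnalysisException]] is thrown instead.
 *
 * The syntax of using this command in SQL is:
 * {{{
 *   SHOW CREATE TABLE [db_name.]table_name
 * }}}
 */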
case class ShowCreateTableCommand(table: TableIdentifier) extends RunnableCommand {
  override val output: Seq[Attribute] = Seq(
    AttributeReference("createtab_stmt", StringType, nullable = false)()
  )

  override def run(sparkSession: SparkSession): Seq[Row] = {
    val catalog = sparkSession.sessionState.catalog
    val tableMetadata = catalog.getTableMetadata(table)

    // TODO: unify this after we unify the CREATE TABLE syntax for hive serde and data source table.
    val stmt = if (DDLUtils.isDatasourceTable(tableMetadata)) {
      showCreateDataSourceTable(tableMetadata)
    } else {
      showCreateHiveTable(tableMetadata)
    }

    Seq(Row(stmt))
  }

  private def showCreateHiveTable(metadata: CatalogTable): String = {
    def reportUnsupportedError(features: Seq[String]): Unit = {
      throw new AnalysisException(
        s"Failed to execute SHOW CREATE TABLE against table/view ${metadata.identifier}, " +
          "which is created by Hive and uses the following unsupported feature(s)\n" +
          features.map(" - " + _).mkString("\n")
      )
    }

    if (metadata.unsupportedFeatures.nonEmpty) {
      reportUnsupportedError(metadata.unsupportedFeatures)
    }

    val builder = StringBuilder.newBuilder

    val tableTypeString = metadata.tableType match {
      case EXTERNAL => " EXTERNAL TABLE"
      case VIEW => " VIEW"
      case MANAGED => " TABLE"
    }

    builder ++= s"CREATE$tableTypeString ${table.quotedString}"

    if (metadata.tableType == VIEW) {
      if (metadata.schema.nonEmpty) {
        builder ++= metadata.schema.map(_.name).mkString("(", ", ", ")")
      }
      builder ++= metadata.viewText.mkString(" AS\n", "", "\n")
    } else {
      showHiveTableHeader(metadata, builder)
      showHiveTableNonDataColumns(metadata, builder)
      showHiveTableStorageInfo(metadata, builder)
      showHiveTableProperties(metadata, builder)
    }

    builder.toString()
  }

  private def showHiveTableHeader(metadata: CatalogTable, builder: StringBuilder): Unit = {
    val columns = metadata.schema.filterNot { column =>
      metadata.partitionColumnNames.contains(column.name)
    }.map(columnToDDLFragment)

    if (columns.nonEmpty) {
      builder ++= columns.mkString("(", ", ", ")\n")
    }

    metadata
      .comment
      .map("COMMENT '" + escapeSingleQuotedString(_) + "'\n")
      .foreach(builder.append)
  }

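  /**
   * Renders a single column as a DDL fragment: the quoted column name, its catalog data type,
   * and, when present, its COMMENT clause.
   */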
  private def columnToDDLFragment(column: StructField): String = {
    val comment = column.getComment().map(escapeSingleQuotedString).map(" COMMENT '" + _ + "'")
    s"${quoteIdentifier(column.name)} ${column.dataType.catalogString}${comment.getOrElse("")}"
  }

  private def showHiveTableNonDataColumns(metadata: CatalogTable, builder: StringBuilder): Unit = {
    if (metadata.partitionColumnNames.nonEmpty) {
      val partCols = metadata.partitionSchema.map(columnToDDLFragment)
      builder ++= partCols.mkString("PARTITIONED BY (", ", ", ")\n")
    }

    if (metadata.bucketSpec.isDefined) {
      throw new UnsupportedOperationException(
        "Creating Hive table with bucket spec is not supported yet.")
    }
  }

  private def showHiveTableStorageInfo(metadata: CatalogTable, builder: StringBuilder): Unit = {
    val storage = metadata.storage

    storage.serde.foreach { serde =>
      builder ++= s"ROW FORMAT SERDE '$serde'\n"

      val serdeProps = metadata.storage.properties.map {
        case (key, value) =>
          s"'${escapeSingleQuotedString(key)}' = '${escapeSingleQuotedString(value)}'"
      }

      builder ++= serdeProps.mkString("WITH SERDEPROPERTIES (\n  ", ",\n  ", "\n)\n")
    }

    if (storage.inputFormat.isDefined || storage.outputFormat.isDefined) {
      builder ++= "STORED AS\n"

      storage.inputFormat.foreach { format =>
        builder ++= s"  INPUTFORMAT '${escapeSingleQuotedString(format)}'\n"
      }

      storage.outputFormat.foreach { format =>
        builder ++= s"  OUTPUTFORMAT '${escapeSingleQuotedString(format)}'\n"
      }
    }

    if (metadata.tableType == EXTERNAL) {
      storage.locationUri.foreach { uri =>
        builder ++= s"LOCATION '$uri'\n"
      }
    }
  }

  private def showHiveTableProperties(metadata: CatalogTable, builder: StringBuilder): Unit = {
    if (metadata.properties.nonEmpty) {
      val props = metadata.properties.map { case (key, value) =>
        s"'${escapeSingleQuotedString(key)}' = '${escapeSingleQuotedString(value)}'"
      }

      builder ++= props.mkString("TBLPROPERTIES (\n  ", ",\n  ", "\n)\n")
    }
  }

  private def showCreateDataSourceTable(metadata: CatalogTable): String = {
    val builder = StringBuilder.newBuilder

    builder ++= s"CREATE TABLE ${table.quotedString} "
    showDataSourceTableDataColumns(metadata, builder)
    showDataSourceTableOptions(metadata, builder)
    showDataSourceTableNonDataColumns(metadata, builder)

    builder.toString()
  }

  private def showDataSourceTableDataColumns(
      metadata: CatalogTable, builder: StringBuilder): Unit = {
    val columns = metadata.schema.fields.map(f => s"${quoteIdentifier(f.name)} ${f.dataType.sql}")
    builder ++= columns.mkString("(", ", ", ")\n")
  }

  private def showDataSourceTableOptions(metadata: CatalogTable, builder: StringBuilder): Unit = {
    builder ++= s"USING ${metadata.provider.get}\n"

    val dataSourceOptions = metadata.storage.properties.map {
      case (key, value) => s"${quoteIdentifier(key)} '${escapeSingleQuotedString(value)}'"
    } ++ metadata.storage.locationUri.flatMap { location =>
      if (metadata.tableType == MANAGED) {
        // If it's a managed table, omit the PATH option. Spark SQL always creates an external
        // table when the table creation DDL contains the PATH option.
        None
      } else {
        Some(s"path '${escapeSingleQuotedString(location)}'")
      }
    }

    if (dataSourceOptions.nonEmpty) {
      builder ++= "OPTIONS (\n"
      builder ++= dataSourceOptions.mkString("  ", ",\n  ", "\n")
      builder ++= ")\n"
    }
  }

  private def showDataSourceTableNonDataColumns(
      metadata: CatalogTable, builder: StringBuilder): Unit = {
    val partCols = metadata.partitionColumnNames
    if (partCols.nonEmpty) {
      builder ++= s"PARTITIONED BY ${partCols.mkString("(", ", ", ")")}\n"
    }

    metadata.bucketSpec.foreach { spec =>
      if (spec.bucketColumnNames.nonEmpty) {
        builder ++= s"CLUSTERED BY ${spec.bucketColumnNames.mkString("(", ", ", ")")}\n"

        if (spec.sortColumnNames.nonEmpty) {
          builder ++= s"SORTED BY ${spec.sortColumnNames.mkString("(", ", ", ")")}\n"
        }

        builder ++= s"INTO ${spec.numBuckets} BUCKETS\n"
      }
    }
  }

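  /** Escapes single quotes so the string can be embedded in a single-quoted SQL literal. */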
  private def escapeSingleQuotedString(str: String): String = {
    val builder = StringBuilder.newBuilder

    str.foreach {
      case '\'' => builder ++= s"\\\'"
      case ch => builder += ch
    }

    builder.toString()
  }
}