1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements.  See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License.  You may obtain a copy of the License at
8 *
9 *    http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18package org.apache.spark.sql.execution.command
19
20import scala.util.control.NonFatal
21
22import org.apache.hadoop.fs.{FileSystem, Path}
23
24import org.apache.spark.internal.Logging
25import org.apache.spark.sql.{AnalysisException, Dataset, Row, SparkSession}
26import org.apache.spark.sql.catalyst.TableIdentifier
27import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
28import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTable}
29import org.apache.spark.sql.catalyst.plans.logical.Statistics
30import org.apache.spark.sql.execution.datasources.LogicalRelation
31import org.apache.spark.sql.internal.SessionState
32
33
/**
 * Analyzes the given table to generate statistics, which will be used in query optimizations.
 *
 * The total size in bytes of the table's files is always computed. The row count is only
 * computed, by running a count over the table, when `noscan` is false. The metastore is
 * updated only when the resulting statistics differ from the ones already recorded.
 *
 * @param tableIdent the table to analyze; the session's current database is used when the
 *                   identifier does not name one
 * @param noscan when true, only the size in bytes is collected and the table is not scanned
 */
case class AnalyzeTableCommand(
    tableIdent: TableIdentifier,
    noscan: Boolean = true) extends RunnableCommand {

  /**
   * Computes the statistics of the table and stores them in the catalog if they changed.
   *
   * @return always an empty result set
   * @throws AnalysisException if the table resolves to a relation that is neither a
   *                           `CatalogRelation` nor a `LogicalRelation` with a catalog table
   */
  override def run(sparkSession: SparkSession): Seq[Row] = {
    val sessionState = sparkSession.sessionState
    val db = tableIdent.database.getOrElse(sessionState.catalog.getCurrentDatabase)
    val tableIdentWithDB = TableIdentifier(tableIdent.table, Some(db))
    val relation = EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdentWithDB))

    // Resolve the catalog entry backing the relation; anything else cannot be analyzed.
    val analyzedTable: CatalogTable = relation match {
      case catalogRel: CatalogRelation =>
        catalogRel.catalogTable

      // data source tables have been converted into LogicalRelations
      case logicalRel: LogicalRelation if logicalRel.catalogTable.isDefined =>
        // Safe: the guard above guarantees the option is defined.
        logicalRel.catalogTable.get

      case otherRelation =>
        throw new AnalysisException("ANALYZE TABLE is not supported for " +
          s"${otherRelation.nodeName}.")
    }

    updateTableStats(
      analyzedTable,
      AnalyzeTableCommand.calculateTotalSize(sessionState, analyzedTable))

    /**
     * Compares the freshly computed statistics with the ones stored in `catalogTable` and
     * alters the table in the catalog when they differ.
     */
    def updateTableStats(catalogTable: CatalogTable, newTotalSize: Long): Unit = {
      val oldTotalSize = catalogTable.stats.map(_.sizeInBytes.toLong).getOrElse(0L)
      val oldRowCount = catalogTable.stats.flatMap(_.rowCount.map(_.toLong)).getOrElse(-1L)
      // A non-positive size is treated as "unknown" and never overwrites the recorded size.
      val sizeChanged = newTotalSize > 0 && newTotalSize != oldTotalSize

      val newStats: Option[Statistics] = if (noscan) {
        // Without a scan the row count is unknown. When the size changed, the old row count
        // is no longer trustworthy, so only the new size is recorded; when the size did not
        // change there is nothing to update.
        if (sizeChanged) Some(Statistics(sizeInBytes = newTotalSize)) else None
      } else {
        val newRowCount = Dataset.ofRows(sparkSession, relation).count()
        val countedRows = if (newRowCount >= 0) Some(BigInt(newRowCount)) else None
        if (sizeChanged) {
          // The row count was just computed, so it is valid for the new size even when it
          // happens to equal the previously recorded count. Always store it together with
          // the new size instead of dropping it.
          Some(Statistics(sizeInBytes = newTotalSize, rowCount = countedRows))
        } else if (countedRows.isDefined && newRowCount != oldRowCount) {
          Some(Statistics(sizeInBytes = oldTotalSize, rowCount = countedRows))
        } else {
          None
        }
      }

      // Update the metastore if the above statistics of the table are different from those
      // recorded in the metastore.
      newStats.foreach { _ =>
        sessionState.catalog.alterTable(catalogTable.copy(stats = newStats))
        // Refresh the cached data source table in the catalog.
        sessionState.catalog.refreshTable(tableIdentWithDB)
      }
    }

    Seq.empty[Row]
  }
}
95
object AnalyzeTableCommand extends Logging {

  /**
   * Computes the total size in bytes of the files stored under the table's location.
   *
   * The location is walked recursively. Entries whose name starts with the configured Hive
   * staging directory (`hive.exec.stagingdir`, defaulting to `.hive-staging`) are skipped.
   *
   * @param sessionState supplies the SQL configuration and the Hadoop configuration
   * @param catalogTable the table whose `storage.locationUri` is measured
   * @return the summed length of all files, or 0 when the table has no location or when the
   *         size could not be computed (the failure is logged as a warning)
   */
  def calculateTotalSize(sessionState: SessionState, catalogTable: CatalogTable): Long = {
    import org.apache.hadoop.fs.FileStatus

    // This method is mainly based on
    // org.apache.hadoop.hive.ql.stats.StatsUtils.getFileSizeForTable(HiveConf, Table)
    // in Hive 0.13 (except that we do not use fs.getContentSummary).
    // TODO: Generalize statistics collection.
    // TODO: Find out why fs.getContentSummary returns a wrong table size on Jenkins, and
    // whether it can be used in the future. Until then the files are summed up manually.
    val stagingDir = sessionState.conf.getConfString("hive.exec.stagingdir", ".hive-staging")

    // Sums the length of everything below `status`. The statuses returned by `listStatus`
    // are reused for the recursion, so no additional `getFileStatus` call is issued per
    // file or sub-directory.
    def sizeOf(fs: FileSystem, status: FileStatus): Long = {
      if (status.isDirectory) {
        fs.listStatus(status.getPath)
          .filterNot(_.getPath.getName.startsWith(stagingDir))
          .map(child => sizeOf(fs, child))
          .sum
      } else {
        status.getLen
      }
    }

    catalogTable.storage.locationUri.map { p =>
      val path = new Path(p)
      try {
        val fs = path.getFileSystem(sessionState.newHadoopConf())
        sizeOf(fs, fs.getFileStatus(path))
      } catch {
        // Best effort: a failure to measure the table must not fail the command.
        case NonFatal(e) =>
          logWarning(
            s"Failed to get the size of table ${catalogTable.identifier.table} in the " +
              s"database ${catalogTable.identifier.database} because of ${e.toString}", e)
          0L
      }
    }.getOrElse(0L)
  }
}
142