/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.execution.command

import scala.util.control.NonFatal

import org.apache.hadoop.fs.{FileSystem, Path}

import org.apache.spark.internal.Logging
import org.apache.spark.sql.{AnalysisException, Dataset, Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTable}
import org.apache.spark.sql.catalyst.plans.logical.Statistics
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.internal.SessionState


/**
 * Analyzes the given table to generate statistics, which will be used in query optimizations.
 */
case class AnalyzeTableCommand(
    tableIdent: TableIdentifier,
    noscan: Boolean = true) extends RunnableCommand {

  override def run(sparkSession: SparkSession): Seq[Row] = {
    val sessionState = sparkSession.sessionState
    val db = tableIdent.database.getOrElse(sessionState.catalog.getCurrentDatabase)
    val tableIdentWithDB = TableIdentifier(tableIdent.table, Some(db))
    val relation = EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdentWithDB))

    relation match {
      case relation: CatalogRelation =>
        updateTableStats(relation.catalogTable,
          AnalyzeTableCommand.calculateTotalSize(sessionState, relation.catalogTable))

      // Data source tables have been converted into LogicalRelations.
      case logicalRel: LogicalRelation if logicalRel.catalogTable.isDefined =>
        updateTableStats(logicalRel.catalogTable.get,
          AnalyzeTableCommand.calculateTotalSize(sessionState, logicalRel.catalogTable.get))

      case otherRelation =>
        throw new AnalysisException("ANALYZE TABLE is not supported for " +
          s"${otherRelation.nodeName}.")
    }

    def updateTableStats(catalogTable: CatalogTable, newTotalSize: Long): Unit = {
      val oldTotalSize = catalogTable.stats.map(_.sizeInBytes.toLong).getOrElse(0L)
      val oldRowCount = catalogTable.stats.flatMap(_.rowCount.map(_.toLong)).getOrElse(-1L)
      var newStats: Option[Statistics] = None
      if (newTotalSize > 0 && newTotalSize != oldTotalSize) {
        newStats = Some(Statistics(sizeInBytes = newTotalSize))
      }
      // We only set rowCount when noscan is false, because otherwise:
      // 1. when the total size is unchanged, we don't need to alter the table;
      // 2. when the total size has changed, `oldRowCount` becomes invalid.
      // This is to make sure that we only record accurate statistics.
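      // Note: when noscan is false, computing the row count below triggers a full
      // scan of the table (a `count()` over the relation), which can be expensive
      // for large tables.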
      if (!noscan) {
        val newRowCount = Dataset.ofRows(sparkSession, relation).count()
        if (newRowCount >= 0 && newRowCount != oldRowCount) {
          newStats = if (newStats.isDefined) {
            newStats.map(_.copy(rowCount = Some(BigInt(newRowCount))))
          } else {
            Some(Statistics(sizeInBytes = oldTotalSize, rowCount = Some(BigInt(newRowCount))))
          }
        }
      }
      // Update the metastore if the statistics computed above differ from those
      // currently recorded in the metastore.
      if (newStats.isDefined) {
        sessionState.catalog.alterTable(catalogTable.copy(stats = newStats))
        // Refresh the cached data source table in the catalog.
        sessionState.catalog.refreshTable(tableIdentWithDB)
      }
    }

    Seq.empty[Row]
  }
}

object AnalyzeTableCommand extends Logging {

  def calculateTotalSize(sessionState: SessionState, catalogTable: CatalogTable): Long = {
    // This method is mainly based on
    // org.apache.hadoop.hive.ql.stats.StatsUtils.getFileSizeForTable(HiveConf, Table)
    // in Hive 0.13 (except that we do not use fs.getContentSummary).
    // TODO: Generalize statistics collection.
    // TODO: Why does fs.getContentSummary return the wrong size on Jenkins?
    // Can we use fs.getContentSummary in the future?
    // fs.getContentSummary seems to return the wrong table size on Jenkins, so we use
    // the recursive calculateTableSize below to compute the table size instead.
    val stagingDir = sessionState.conf.getConfString("hive.exec.stagingdir", ".hive-staging")

    def calculateTableSize(fs: FileSystem, path: Path): Long = {
      val fileStatus = fs.getFileStatus(path)
      val size = if (fileStatus.isDirectory) {
        // Sum the sizes of all children, skipping Hive staging directories.
        fs.listStatus(path)
          .map { status =>
            if (!status.getPath.getName.startsWith(stagingDir)) {
              calculateTableSize(fs, status.getPath)
            } else {
              0L
            }
          }.sum
      } else {
        fileStatus.getLen
      }

      size
    }

    catalogTable.storage.locationUri.map { p =>
      val path = new Path(p)
      try {
        val fs = path.getFileSystem(sessionState.newHadoopConf())
        calculateTableSize(fs, path)
      } catch {
        case NonFatal(e) =>
          logWarning(
            s"Failed to get the size of table ${catalogTable.identifier.table} in the " +
            s"database ${catalogTable.identifier.database} because of ${e.toString}", e)
          0L
      }
    }.getOrElse(0L)
  }
}
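
// ---------------------------------------------------------------------------
// Usage sketch (illustrative only, not part of the command implementation).
// Assumes a SparkSession named `spark` and an existing table `db.events`;
// both names are hypothetical. The command is normally reached through SQL:
//
//   spark.sql("ANALYZE TABLE db.events COMPUTE STATISTICS")         // noscan = false: size + row count
//   spark.sql("ANALYZE TABLE db.events COMPUTE STATISTICS NOSCAN")  // noscan = true: size only
//
// It can also be constructed and run directly:
//
//   AnalyzeTableCommand(TableIdentifier("events", Some("db")), noscan = false)
//     .run(spark)
// ---------------------------------------------------------------------------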