/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.hive

import java.io.IOException

import scala.collection.JavaConverters._

import com.google.common.base.Objects
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.hive.common.StatsSetupConst
import org.apache.hadoop.hive.metastore.{TableType => HiveTableType}
import org.apache.hadoop.hive.metastore.api.FieldSchema
import org.apache.hadoop.hive.ql.metadata.{Partition, Table => HiveTable}
import org.apache.hadoop.hive.ql.plan.TableDesc

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.expressions.{AttributeMap, AttributeReference, Expression}
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}
import org.apache.spark.sql.execution.FileRelation
import org.apache.spark.sql.types._


private[hive] case class MetastoreRelation(
    databaseName: String,
    tableName: String)
    (val catalogTable: CatalogTable,
     @transient private val sparkSession: SparkSession)
  extends LeafNode with MultiInstanceRelation with FileRelation with CatalogRelation {

  override def equals(other: Any): Boolean = other match {
    case relation: MetastoreRelation =>
      databaseName == relation.databaseName &&
        tableName == relation.tableName &&
        output == relation.output
    case _ => false
  }

  override def hashCode(): Int = {
    Objects.hashCode(databaseName, tableName, output)
  }

  override protected def otherCopyArgs: Seq[AnyRef] = catalogTable :: sparkSession :: Nil

  private def toHiveColumn(c: StructField): FieldSchema = {
    val typeString = if (c.metadata.contains(HIVE_TYPE_STRING)) {
      c.metadata.getString(HIVE_TYPE_STRING)
    } else {
      c.dataType.catalogString
    }
    new FieldSchema(c.name, typeString, c.getComment.orNull)
  }

  // TODO: merge this with HiveClientImpl#toHiveTable
  @transient val hiveQlTable: HiveTable = {
    // We start by constructing an API table as Hive performs several important transformations
    // internally when converting an API table to a QL table.
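    // (For instance, the QL `Table` wrapper is what later resolves the storage
    // descriptor's format class names via `getInputFormatClass` / `getOutputFormatClass`,
    // applying Hive's substitution rules along the way; see `tableDesc` below.)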
    val tTable = new org.apache.hadoop.hive.metastore.api.Table()
    tTable.setTableName(catalogTable.identifier.table)
    tTable.setDbName(catalogTable.database)

    val tableParameters = new java.util.HashMap[String, String]()
    tTable.setParameters(tableParameters)
    catalogTable.properties.foreach { case (k, v) => tableParameters.put(k, v) }

    tTable.setTableType(catalogTable.tableType match {
      case CatalogTableType.EXTERNAL => HiveTableType.EXTERNAL_TABLE.toString
      case CatalogTableType.MANAGED => HiveTableType.MANAGED_TABLE.toString
      case CatalogTableType.VIEW => HiveTableType.VIRTUAL_VIEW.toString
    })

    val sd = new org.apache.hadoop.hive.metastore.api.StorageDescriptor()
    tTable.setSd(sd)

    // Note: In Hive the schema and partition columns must be disjoint sets
    val (partCols, schema) = catalogTable.schema.map(toHiveColumn).partition { c =>
      catalogTable.partitionColumnNames.contains(c.getName)
    }
    sd.setCols(schema.asJava)
    tTable.setPartitionKeys(partCols.asJava)

    catalogTable.storage.locationUri.foreach(sd.setLocation)
    catalogTable.storage.inputFormat.foreach(sd.setInputFormat)
    catalogTable.storage.outputFormat.foreach(sd.setOutputFormat)

    val serdeInfo = new org.apache.hadoop.hive.metastore.api.SerDeInfo
    catalogTable.storage.serde.foreach(serdeInfo.setSerializationLib)
    sd.setSerdeInfo(serdeInfo)

    val serdeParameters = new java.util.HashMap[String, String]()
    catalogTable.storage.properties.foreach { case (k, v) => serdeParameters.put(k, v) }
    serdeInfo.setParameters(serdeParameters)

    new HiveTable(tTable)
  }

  @transient override lazy val statistics: Statistics = {
    catalogTable.stats.getOrElse(Statistics(
      sizeInBytes = {
        val totalSize = hiveQlTable.getParameters.get(StatsSetupConst.TOTAL_SIZE)
        val rawDataSize = hiveQlTable.getParameters.get(StatsSetupConst.RAW_DATA_SIZE)
        // TODO: check if this estimate is valid for tables after partition pruning.
        // NOTE: getting `totalSize` directly from params is somewhat hacky, but this should be
        // relatively cheap if parameters for the table are populated into the metastore.
        // Besides `totalSize`, there are also `numFiles`, `numRows`, `rawDataSize` keys
        // (see StatsSetupConst in Hive) that we can look at in the future.
        BigInt(
          // When the table is external, `totalSize` is always zero, which would influence the
          // join strategy. So when `totalSize` is zero, use `rawDataSize` instead; when
          // `rawDataSize` is also zero, use `HiveExternalCatalog.STATISTICS_TOTAL_SIZE`,
          // which is generated by the ANALYZE command and surfaced via `catalogTable.stats`
          // above.
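          // The HDFS fallback below is controlled by `spark.sql.statistics.fallBackToHdfs`;
          // when it is disabled, or when the filesystem call fails, we return
          // `spark.sql.defaultSizeInBytes` instead, a deliberately large default that keeps
          // the planner from choosing broadcast joins for tables of unknown size.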
          if (totalSize != null && totalSize.toLong > 0L) {
            totalSize.toLong
          } else if (rawDataSize != null && rawDataSize.toLong > 0) {
            rawDataSize.toLong
          } else if (sparkSession.sessionState.conf.fallBackToHdfsForStatsEnabled) {
            try {
              val hadoopConf = sparkSession.sessionState.newHadoopConf()
              val fs: FileSystem = hiveQlTable.getPath.getFileSystem(hadoopConf)
              fs.getContentSummary(hiveQlTable.getPath).getLength
            } catch {
              case e: IOException =>
                logWarning("Failed to get table size from HDFS.", e)
                sparkSession.sessionState.conf.defaultSizeInBytes
            }
          } else {
            sparkSession.sessionState.conf.defaultSizeInBytes
          })
      }
    ))
  }

  // When metastore partition pruning is turned off, we cache the list of all partitions to
  // mimic the behavior of Spark < 1.5
  private lazy val allPartitions: Seq[CatalogTablePartition] = {
    sparkSession.sharedState.externalCatalog.listPartitions(
      catalogTable.database,
      catalogTable.identifier.table)
  }

  def getHiveQlPartitions(predicates: Seq[Expression] = Nil): Seq[Partition] = {
    val rawPartitions = if (sparkSession.sessionState.conf.metastorePartitionPruning) {
      sparkSession.sharedState.externalCatalog.listPartitionsByFilter(
        catalogTable.database,
        catalogTable.identifier.table,
        predicates)
    } else {
      allPartitions
    }

    rawPartitions.map { p =>
      val tPartition = new org.apache.hadoop.hive.metastore.api.Partition
      tPartition.setDbName(databaseName)
      tPartition.setTableName(tableName)
      tPartition.setValues(partitionKeys.map(a => p.spec(a.name)).asJava)

      val sd = new org.apache.hadoop.hive.metastore.api.StorageDescriptor()
      tPartition.setSd(sd)

      // Note: In Hive the schema and partition columns must be disjoint sets
      val schema = catalogTable.schema.map(toHiveColumn).filter { c =>
        !catalogTable.partitionColumnNames.contains(c.getName)
      }
      sd.setCols(schema.asJava)

      p.storage.locationUri.foreach(sd.setLocation)
      p.storage.inputFormat.foreach(sd.setInputFormat)
      p.storage.outputFormat.foreach(sd.setOutputFormat)

      val serdeInfo = new org.apache.hadoop.hive.metastore.api.SerDeInfo
      sd.setSerdeInfo(serdeInfo)
      // Maps and lists should be set only after all elements are ready (see HIVE-7975).
      p.storage.serde.foreach(serdeInfo.setSerializationLib)

      val serdeParameters = new java.util.HashMap[String, String]()
      catalogTable.storage.properties.foreach { case (k, v) => serdeParameters.put(k, v) }
      p.storage.properties.foreach { case (k, v) => serdeParameters.put(k, v) }
      serdeInfo.setParameters(serdeParameters)

      new Partition(hiveQlTable, tPartition)
    }
  }

  /** Only compare database and table name, not alias. */
  override def sameResult(plan: LogicalPlan): Boolean = {
    plan.canonicalized match {
      case mr: MetastoreRelation =>
        mr.databaseName == databaseName && mr.tableName == tableName
      case _ => false
    }
  }
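  // `tableDesc` bundles the resolved input/output format classes with the table's
  // metadata properties so that scan-side code can configure Hadoop input/output
  // without further round trips to the metastore.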
  val tableDesc = new TableDesc(
    hiveQlTable.getInputFormatClass,
    // The class of table should be org.apache.hadoop.hive.ql.metadata.Table because
    // getOutputFormatClass will use HiveFileFormatUtils.getOutputFormatSubstitute to
    // substitute some output formats, e.g. replacing SequenceFileOutputFormat with
    // HiveSequenceFileOutputFormat.
    hiveQlTable.getOutputFormatClass,
    hiveQlTable.getMetadata
  )

  implicit class SchemaAttribute(f: StructField) {
    def toAttribute: AttributeReference = AttributeReference(
      f.name,
      f.dataType,
      // Since data can be dumped into a Hive table with no validation, everything is nullable.
      nullable = true
    )(qualifier = Some(tableName))
  }

  /** Partition-key attributes. */
  val partitionKeys = catalogTable.partitionSchema.map(_.toAttribute)

  /** Non-partition-key attributes. */
  // TODO: just make this hold the schema itself, not just non-partition columns
  val attributes = catalogTable.schema
    .filter { c => !catalogTable.partitionColumnNames.contains(c.name) }
    .map(_.toAttribute)

  val output = attributes ++ partitionKeys

  /** An attribute map that can be used to look up original attributes based on expression id. */
  val attributeMap = AttributeMap(output.map(o => (o, o)))

  /** An attribute map for determining the ordinal of non-partition columns. */
  val columnOrdinals = AttributeMap(attributes.zipWithIndex)

  override def inputFiles: Array[String] = {
    val partLocations = allPartitions
      .flatMap(_.storage.locationUri)
      .toArray
    if (partLocations.nonEmpty) {
      partLocations
    } else {
      Array(
        catalogTable.storage.locationUri.getOrElse(
          sys.error(s"Could not get the location of ${catalogTable.qualifiedName}.")))
    }
  }

  override def newInstance(): MetastoreRelation = {
    MetastoreRelation(databaseName, tableName)(catalogTable, sparkSession)
  }
}
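
// A minimal usage sketch (illustrative only; assumes `spark` is an active
// `SparkSession` and `resolvedTable` is a `CatalogTable` already fetched from the
// external catalog):
//
//   val relation = MetastoreRelation("db", "tbl")(resolvedTable, spark)
//   relation.statistics.sizeInBytes   // size estimate used by the planner
//   relation.getHiveQlPartitions()    // all partitions, as Hive QL Partition objects
//   relation.inputFiles               // storage locations backing the relation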