/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.hive

import java.io.IOException

import scala.collection.JavaConverters._

import com.google.common.base.Objects
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.hive.common.StatsSetupConst
import org.apache.hadoop.hive.metastore.{TableType => HiveTableType}
import org.apache.hadoop.hive.metastore.api.FieldSchema
import org.apache.hadoop.hive.ql.metadata.{Partition, Table => HiveTable}
import org.apache.hadoop.hive.ql.plan.TableDesc

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.expressions.{AttributeMap, AttributeReference, Expression}
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}
import org.apache.spark.sql.execution.FileRelation
import org.apache.spark.sql.types._

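/**
 * A logical plan node representing a table backed by the Hive metastore. The `catalogTable` and
 * `sparkSession` arguments live in a second (curried) parameter list, so they do not participate
 * in the case class's generated extractor or in the equality defined below.
 */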
private[hive] case class MetastoreRelation(
    databaseName: String,
    tableName: String)
    (val catalogTable: CatalogTable,
     @transient private val sparkSession: SparkSession)
  extends LeafNode with MultiInstanceRelation with FileRelation with CatalogRelation {

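  // Equality and hashing are based only on the database name, table name and output attributes;
  // the curried `catalogTable` and `sparkSession` do not participate.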
  override def equals(other: Any): Boolean = other match {
    case relation: MetastoreRelation =>
      databaseName == relation.databaseName &&
        tableName == relation.tableName &&
        output == relation.output
    case _ => false
  }

  override def hashCode(): Int = {
    Objects.hashCode(databaseName, tableName, output)
  }

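  // Carries the curried constructor arguments along when Catalyst copies this plan node, so the
  // rebuilt relation still has its CatalogTable and SparkSession.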
  override protected def otherCopyArgs: Seq[AnyRef] = catalogTable :: sparkSession :: Nil

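  // Converts a Spark StructField into a Hive FieldSchema, preferring the original Hive type
  // string preserved in the column metadata (if any) over Spark's own catalog string.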
  private def toHiveColumn(c: StructField): FieldSchema = {
    val typeString = if (c.metadata.contains(HIVE_TYPE_STRING)) {
      c.metadata.getString(HIVE_TYPE_STRING)
    } else {
      c.dataType.catalogString
    }
    new FieldSchema(c.name, typeString, c.getComment.orNull)
  }

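  /**
   * The Hive QL [[HiveTable]] corresponding to `catalogTable`. It backs `tableDesc` below and the
   * partitions built in `getHiveQlPartitions`.
   */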
  // TODO: merge this with HiveClientImpl#toHiveTable
  @transient val hiveQlTable: HiveTable = {
    // We start by constructing an API table as Hive performs several important transformations
    // internally when converting an API table to a QL table.
    val tTable = new org.apache.hadoop.hive.metastore.api.Table()
    tTable.setTableName(catalogTable.identifier.table)
    tTable.setDbName(catalogTable.database)

    val tableParameters = new java.util.HashMap[String, String]()
    tTable.setParameters(tableParameters)
    catalogTable.properties.foreach { case (k, v) => tableParameters.put(k, v) }

    tTable.setTableType(catalogTable.tableType match {
      case CatalogTableType.EXTERNAL => HiveTableType.EXTERNAL_TABLE.toString
      case CatalogTableType.MANAGED => HiveTableType.MANAGED_TABLE.toString
      case CatalogTableType.VIEW => HiveTableType.VIRTUAL_VIEW.toString
    })

    val sd = new org.apache.hadoop.hive.metastore.api.StorageDescriptor()
    tTable.setSd(sd)

    // Note: In Hive the schema and partition columns must be disjoint sets
    val (partCols, schema) = catalogTable.schema.map(toHiveColumn).partition { c =>
      catalogTable.partitionColumnNames.contains(c.getName)
    }
    sd.setCols(schema.asJava)
    tTable.setPartitionKeys(partCols.asJava)

    catalogTable.storage.locationUri.foreach(sd.setLocation)
    catalogTable.storage.inputFormat.foreach(sd.setInputFormat)
    catalogTable.storage.outputFormat.foreach(sd.setOutputFormat)

    val serdeInfo = new org.apache.hadoop.hive.metastore.api.SerDeInfo
    catalogTable.storage.serde.foreach(serdeInfo.setSerializationLib)
    sd.setSerdeInfo(serdeInfo)

    val serdeParameters = new java.util.HashMap[String, String]()
    catalogTable.storage.properties.foreach { case (k, v) => serdeParameters.put(k, v) }
    serdeInfo.setParameters(serdeParameters)

    new HiveTable(tTable)
  }

  @transient override lazy val statistics: Statistics = {
    catalogTable.stats.getOrElse(Statistics(
      sizeInBytes = {
        val totalSize = hiveQlTable.getParameters.get(StatsSetupConst.TOTAL_SIZE)
        val rawDataSize = hiveQlTable.getParameters.get(StatsSetupConst.RAW_DATA_SIZE)
        // TODO: check if this estimate is valid for tables after partition pruning.
        // NOTE: getting `totalSize` directly from params is kind of hacky, but this should be
        // relatively cheap if parameters for the table are populated into the metastore.
        // Besides `totalSize`, there are also `numFiles`, `numRows`, `rawDataSize` keys
        // (see StatsSetupConst in Hive) that we can look at in the future.
        BigInt(
          // When the table is external, `totalSize` is always zero, which would mislead the join
          // strategy. So when `totalSize` is zero, use `rawDataSize` instead. When `rawDataSize`
          // is also zero, fall back to the HDFS-reported size (if enabled) or the session's
          // default size; statistics generated by the ANALYZE command are picked up from
          // `catalogTable.stats` above.
          if (totalSize != null && totalSize.toLong > 0L) {
            totalSize.toLong
          } else if (rawDataSize != null && rawDataSize.toLong > 0) {
            rawDataSize.toLong
          } else if (sparkSession.sessionState.conf.fallBackToHdfsForStatsEnabled) {
            try {
              val hadoopConf = sparkSession.sessionState.newHadoopConf()
              val fs: FileSystem = hiveQlTable.getPath.getFileSystem(hadoopConf)
              fs.getContentSummary(hiveQlTable.getPath).getLength
            } catch {
              case e: IOException =>
                logWarning("Failed to get table size from HDFS.", e)
                sparkSession.sessionState.conf.defaultSizeInBytes
            }
          } else {
            sparkSession.sessionState.conf.defaultSizeInBytes
          })
      }
    ))
  }

  // When metastore partition pruning is turned off, we cache the list of all partitions to
  // mimic the behavior of Spark < 1.5
  private lazy val allPartitions: Seq[CatalogTablePartition] = {
    sparkSession.sharedState.externalCatalog.listPartitions(
      catalogTable.database,
      catalogTable.identifier.table)
  }

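  /**
   * Returns the Hive [[Partition]]s of this table. When metastore partition pruning is enabled,
   * the given partition-pruning `predicates` are pushed down to the metastore; otherwise the
   * cached full partition list is used.
   */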
  def getHiveQlPartitions(predicates: Seq[Expression] = Nil): Seq[Partition] = {
    val rawPartitions = if (sparkSession.sessionState.conf.metastorePartitionPruning) {
      sparkSession.sharedState.externalCatalog.listPartitionsByFilter(
        catalogTable.database,
        catalogTable.identifier.table,
        predicates)
    } else {
      allPartitions
    }

    rawPartitions.map { p =>
      val tPartition = new org.apache.hadoop.hive.metastore.api.Partition
      tPartition.setDbName(databaseName)
      tPartition.setTableName(tableName)
      tPartition.setValues(partitionKeys.map(a => p.spec(a.name)).asJava)

      val sd = new org.apache.hadoop.hive.metastore.api.StorageDescriptor()
      tPartition.setSd(sd)

      // Note: In Hive the schema and partition columns must be disjoint sets
      val schema = catalogTable.schema.map(toHiveColumn).filter { c =>
        !catalogTable.partitionColumnNames.contains(c.getName)
      }
      sd.setCols(schema.asJava)

      p.storage.locationUri.foreach(sd.setLocation)
      p.storage.inputFormat.foreach(sd.setInputFormat)
      p.storage.outputFormat.foreach(sd.setOutputFormat)

      val serdeInfo = new org.apache.hadoop.hive.metastore.api.SerDeInfo
      sd.setSerdeInfo(serdeInfo)
      // maps and lists should be set only after all elements are ready (see HIVE-7975)
      p.storage.serde.foreach(serdeInfo.setSerializationLib)

      val serdeParameters = new java.util.HashMap[String, String]()
      catalogTable.storage.properties.foreach { case (k, v) => serdeParameters.put(k, v) }
      p.storage.properties.foreach { case (k, v) => serdeParameters.put(k, v) }
      serdeInfo.setParameters(serdeParameters)

      new Partition(hiveQlTable, tPartition)
    }
  }

  /** Only compare database and table name, not alias. */
  override def sameResult(plan: LogicalPlan): Boolean = {
    plan.canonicalized match {
      case mr: MetastoreRelation =>
        mr.databaseName == databaseName && mr.tableName == tableName
      case _ => false
    }
  }

  val tableDesc = new TableDesc(
    hiveQlTable.getInputFormatClass,
    // The class of the table should be org.apache.hadoop.hive.ql.metadata.Table because
    // getOutputFormatClass will use HiveFileFormatUtils.getOutputFormatSubstitute to
    // substitute some output formats, e.g. substituting SequenceFileOutputFormat with
    // HiveSequenceFileOutputFormat.
    hiveQlTable.getOutputFormatClass,
    hiveQlTable.getMetadata
  )

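  /**
   * Enriches a catalog [[StructField]] with a `toAttribute` conversion that yields an
   * [[AttributeReference]] qualified by this table's name.
   */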
  implicit class SchemaAttribute(f: StructField) {
    def toAttribute: AttributeReference = AttributeReference(
      f.name,
      f.dataType,
      // Since data can be dumped into the table at random with no validation, everything is
      // nullable.
      nullable = true
    )(qualifier = Some(tableName))
  }

  /** PartitionKey attributes */
  val partitionKeys = catalogTable.partitionSchema.map(_.toAttribute)

  /** Non-partitionKey attributes */
  // TODO: just make this hold the schema itself, not just non-partition columns
  val attributes = catalogTable.schema
    .filter { c => !catalogTable.partitionColumnNames.contains(c.name) }
    .map(_.toAttribute)

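  /** The output of this relation: data columns followed by partition columns. */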
  val output = attributes ++ partitionKeys

  /** An attribute map that can be used to look up original attributes based on expression id. */
  val attributeMap = AttributeMap(output.map(o => (o, o)))

  /** An attribute map for determining the ordinal for non-partition columns. */
  val columnOrdinals = AttributeMap(attributes.zipWithIndex)

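  /**
   * The files backing this relation: every partition's location for partitioned tables,
   * otherwise the table's own location.
   */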
  override def inputFiles: Array[String] = {
    val partLocations = allPartitions
      .flatMap(_.storage.locationUri)
      .toArray
    if (partLocations.nonEmpty) {
      partLocations
    } else {
      Array(
        catalogTable.storage.locationUri.getOrElse(
          sys.error(s"Could not get the location of ${catalogTable.qualifiedName}.")))
    }
  }

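  /**
   * Returns a copy of this relation. The output attributes are rebuilt in the new instance's
   * constructor and therefore get fresh expression ids, as [[MultiInstanceRelation]] requires
   * so that the same table can appear more than once in a plan (e.g. a self-join).
   */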
  override def newInstance(): MetastoreRelation = {
    MetastoreRelation(databaseName, tableName)(catalogTable, sparkSession)
  }
}