/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.execution.datasources

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types.StructType


/**
 * A [[FileIndex]] for a metastore catalog table.
 *
 * @param sparkSession a [[SparkSession]]
 * @param table the metadata of the table
 * @param sizeInBytes the table's data size in bytes
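 *
 * A minimal usage sketch (the `default.events` table name is hypothetical; Spark
 * normally constructs this class itself while resolving a catalog table):
 * {{{
 *   import org.apache.spark.sql.catalyst.TableIdentifier
 *
 *   val catalog = sparkSession.sessionState.catalog
 *   val metadata = catalog.getTableMetadata(TableIdentifier("events", Some("default")))
 *   val fileIndex = new CatalogFileIndex(
 *     sparkSession, metadata, sparkSession.sessionState.conf.defaultSizeInBytes)
 *   fileIndex.listFiles(Nil)  // no filters: lists files across all partitions
 * }}}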
 */
class CatalogFileIndex(
    sparkSession: SparkSession,
    val table: CatalogTable,
    override val sizeInBytes: Long) extends FileIndex {

  protected val hadoopConf: Configuration = sparkSession.sessionState.newHadoopConf()

  /** Globally shared (not exclusive to this table) cache for file statuses to speed up listing. */
  private val fileStatusCache = FileStatusCache.getOrCreate(sparkSession)

  assert(table.identifier.database.isDefined,
    "The table identifier must be qualified in CatalogFileIndex")

  private val baseLocation: Option[String] = table.storage.locationUri

  override def partitionSchema: StructType = table.partitionSchema

  override def rootPaths: Seq[Path] = baseLocation.map(new Path(_)).toSeq

  override def listFiles(filters: Seq[Expression]): Seq[PartitionDirectory] = {
    filterPartitions(filters).listFiles(Nil)
  }

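  // Drop cached file statuses so subsequent listings re-scan the file system.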
  override def refresh(): Unit = fileStatusCache.invalidateAll()

  /**
   * Returns an [[InMemoryFileIndex]] for this table restricted to the subset of partitions
   * specified by the given partition-pruning filters.
   *
   * @param filters partition-pruning filters
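   *
   * A sketch of pruning by a partition column (the column name `ds` and its type
   * are hypothetical; any catalyst predicate over partition columns works):
   * {{{
   *   import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal}
   *   import org.apache.spark.sql.types.StringType
   *
   *   val ds = AttributeReference("ds", StringType)()
   *   val pruned = fileIndex.filterPartitions(Seq(EqualTo(ds, Literal("2024-01-01"))))
   * }}}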
   */
  def filterPartitions(filters: Seq[Expression]): InMemoryFileIndex = {
    if (table.partitionColumnNames.nonEmpty) {
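      // Ask the metastore for only the partitions whose values satisfy the
      // filters, so files in pruned-out partitions are never listed.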
      val selectedPartitions = sparkSession.sessionState.catalog.listPartitionsByFilter(
        table.identifier, filters)
      val partitions = selectedPartitions.map { p =>
        val path = new Path(p.location)
        val fs = path.getFileSystem(hadoopConf)
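        // Qualify the metastore-provided location (which may be relative or
        // scheme-less) into a full URI on its file system.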
        PartitionPath(
          p.toRow(partitionSchema), path.makeQualified(fs.getUri, fs.getWorkingDirectory))
      }
      val partitionSpec = PartitionSpec(partitionSchema, partitions)
      new PrunedInMemoryFileIndex(
        sparkSession, new Path(baseLocation.get), fileStatusCache, partitionSpec)
    } else {
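      // Unpartitioned table: there is nothing to prune, so list every file
      // under the table's root paths.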
      new InMemoryFileIndex(
        sparkSession, rootPaths, table.storage.properties, partitionSchema = None)
    }
  }

  override def inputFiles: Array[String] = filterPartitions(Nil).inputFiles

  // A `CatalogFileIndex` may be a member of a `HadoopFsRelation`, which in turn may be a member
  // of a `LogicalRelation` used as a cache key. We therefore implement `equals` and `hashCode`
  // here so that cache lookups work correctly.
  override def equals(o: Any): Boolean = o match {
    case other: CatalogFileIndex => this.table.identifier == other.table.identifier
    case _ => false
  }

  override def hashCode(): Int = table.identifier.hashCode()
}

/**
 * An [[InMemoryFileIndex]] that lists files under the given partition paths, but overrides the
 * partition spec inferred from the file system with the one obtained from the Hive metastore.
 *
 * @param tableBasePath The default base path of the Hive metastore table
 * @param partitionSpec The partition specifications from Hive metastore
 */
private class PrunedInMemoryFileIndex(
    sparkSession: SparkSession,
    tableBasePath: Path,
    fileStatusCache: FileStatusCache,
    override val partitionSpec: PartitionSpec)
  extends InMemoryFileIndex(
    sparkSession,
    partitionSpec.partitions.map(_.path),
    Map.empty,
    Some(partitionSpec.partitionColumns),
    fileStatusCache)