/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.execution.datasources

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types.StructType


/**
 * A [[FileIndex]] for a metastore catalog table.
 *
 * Listing is delegated to an [[InMemoryFileIndex]] that [[filterPartitions]] builds on demand.
 * For partitioned tables the partition-pruning filters are handed to the session catalog, so
 * only the selected partitions end up in the index.
 *
 * Two instances are equal (and hash identically) when they refer to the same table identifier.
 *
 * @param sparkSession a [[SparkSession]]
 * @param table the metadata of the table; its identifier must include a database
 * @param sizeInBytes the table's data size in bytes
 */
class CatalogFileIndex(
    sparkSession: SparkSession,
    val table: CatalogTable,
    override val sizeInBytes: Long) extends FileIndex {

  protected val hadoopConf: Configuration = sparkSession.sessionState.newHadoopConf()

  /**
   * File-status cache used to speed up listing. It is globally shared rather than owned by
   * this table, so operations on it are not scoped to this table alone.
   */
  private val fileStatusCache = FileStatusCache.getOrCreate(sparkSession)

  // The catalog table must be addressed by a database-qualified identifier.
  assert(table.identifier.database.isDefined,
    "The table identifier must be qualified in CatalogFileIndex")

  /** Root location of the table as recorded in the catalog, if any. */
  private val baseLocation: Option[String] = table.storage.locationUri

  override def partitionSchema: StructType = table.partitionSchema

  /** The table location as the single root path, or no root paths when no location is set. */
  override def rootPaths: Seq[Path] = baseLocation.toList.map(location => new Path(location))

  /**
   * Lists the files of the partitions selected by `filters`. The filters are consumed by
   * [[filterPartitions]], so the resulting index is listed without any further filters.
   */
  override def listFiles(filters: Seq[Expression]): Seq[PartitionDirectory] = {
    val index = filterPartitions(filters)
    index.listFiles(Nil)
  }

  /** Invalidates the shared file-status cache; because it is global, this is not per-table. */
  override def refresh(): Unit = {
    fileStatusCache.invalidateAll()
  }

  /**
   * Returns an [[InMemoryFileIndex]] for this table restricted to the subset of partitions
   * specified by the given partition-pruning filters.
   *
   * For an unpartitioned table the filters are ignored and the index covers [[rootPaths]].
   * For a partitioned table the catalog selects the matching partitions, each partition path
   * is qualified against its own file system, and the result is wrapped in a
   * [[PrunedInMemoryFileIndex]] backed by the shared file-status cache.
   *
   * @param filters partition-pruning filters
   * @return an index over the selected partitions, or over the whole table if unpartitioned
   */
  def filterPartitions(filters: Seq[Expression]): InMemoryFileIndex = {
    if (table.partitionColumnNames.isEmpty) {
      new InMemoryFileIndex(
        sparkSession, rootPaths, table.storage.properties, partitionSchema = None)
    } else {
      val catalog = sparkSession.sessionState.catalog
      val selected = catalog.listPartitionsByFilter(table.identifier, filters)
      val prunedPaths = selected.map { partition =>
        val rawPath = new Path(partition.location)
        val fs = rawPath.getFileSystem(hadoopConf)
        val values = partition.toRow(partitionSchema)
        val qualified = rawPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
        PartitionPath(values, qualified)
      }
      val spec = PartitionSpec(partitionSchema, prunedPaths)
      // NOTE(review): `baseLocation.get` throws if a partitioned table has no recorded location.
      val tableBase = new Path(baseLocation.get)
      new PrunedInMemoryFileIndex(sparkSession, tableBase, fileStatusCache, spec)
    }
  }

  /** All input files of the table, taken from an index with no partition pruning applied. */
  override def inputFiles: Array[String] = {
    val unpruned = filterPartitions(Nil)
    unpruned.inputFiles
  }

  // A `CatalogFileIndex` can sit inside a `HadoopFsRelation`, which can sit inside a
  // `LogicalRelation`, and a `LogicalRelation` may serve as a cache key. Equality and hashing
  // are therefore defined on the table identifier so that cache lookups match.
  override def equals(o: Any): Boolean = o match {
    case that: CatalogFileIndex => table.identifier == that.table.identifier
    case _ => false
  }

  override def hashCode(): Int = table.identifier.hashCode()
}

/**
 * An [[InMemoryFileIndex]] whose partition spec comes from the metastore instead of being
 * inferred by listing. The root paths of the index are exactly the paths of the supplied
 * partitions, and the partition columns are taken from the same spec.
 *
 * @param sparkSession a [[SparkSession]]
 * @param tableBasePath the default base path of the metastore table;
 *                      NOTE(review): this parameter is not currently used by the class
 * @param fileStatusCache file-status cache forwarded to [[InMemoryFileIndex]]
 * @param partitionSpec the partition specifications from the metastore
 */
private class PrunedInMemoryFileIndex(
    sparkSession: SparkSession,
    tableBasePath: Path,
    fileStatusCache: FileStatusCache,
    override val partitionSpec: PartitionSpec)
  extends InMemoryFileIndex(
    sparkSession,
    partitionSpec.partitions.map(p => p.path),
    Map.empty,
    Some(partitionSpec.partitionColumns),
    fileStatusCache)