/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.execution.datasources

import java.io.FileNotFoundException

import scala.collection.mutable

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._
import org.apache.hadoop.mapred.{FileInputFormat, JobConf}

import org.apache.spark.internal.Logging
import org.apache.spark.metrics.source.HiveCatalogMetrics
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.{expressions, InternalRow}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types.{StringType, StructType}
import org.apache.spark.util.SerializableConfiguration

/**
 * An abstract class that represents [[FileIndex]]s that are aware of partitioned tables.
 * It provides the necessary methods to parse partition data based on a set of files.
 *
 * @param parameters a set of options to control partition discovery
 * @param userPartitionSchema an optional partition schema that will be used to provide types for
 *                            the discovered partitions
 */
abstract class PartitioningAwareFileIndex(
    sparkSession: SparkSession,
    parameters: Map[String, String],
    userPartitionSchema: Option[StructType],
    fileStatusCache: FileStatusCache = NoopCache) extends FileIndex with Logging {
  import PartitioningAwareFileIndex.BASE_PATH_PARAM

  /** Returns the specification of the partitions inferred from the data. */
  def partitionSpec(): PartitionSpec

  override def partitionSchema: StructType = partitionSpec().partitionColumns

  protected val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(parameters)

  protected def leafFiles: mutable.LinkedHashMap[Path, FileStatus]

  protected def leafDirToChildrenFiles: Map[Path, Array[FileStatus]]

  override def listFiles(filters: Seq[Expression]): Seq[PartitionDirectory] = {
    val selectedPartitions = if (partitionSpec().partitionColumns.isEmpty) {
      PartitionDirectory(InternalRow.empty, allFiles().filter(f => isDataPath(f.getPath))) :: Nil
    } else {
      prunePartitions(filters, partitionSpec()).map {
        case PartitionPath(values, path) =>
          val files: Seq[FileStatus] = leafDirToChildrenFiles.get(path) match {
            case Some(existingDir) =>
              // Directory has children files in it, return them
              existingDir.filter(f => isDataPath(f.getPath))

            case None =>
              // Directory does not exist, or has no children files
              Nil
          }
          PartitionDirectory(values, files)
      }
    }
    logTrace("Selected files after partition pruning:\n\t" + selectedPartitions.mkString("\n\t"))
    selectedPartitions
  }
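
  // Illustrative example of the listFiles contract above: given a partitioned layout such as
  //
  //   /table/a=1/part-00000.parquet
  //   /table/a=2/part-00000.parquet
  //
  // a pushed-down filter equivalent to `a = 1` makes prunePartitions keep only the `a=1`
  // PartitionPath, so listFiles returns a single PartitionDirectory whose files all live under
  // /table/a=1/. When there are no partition columns, all data files are returned in one
  // PartitionDirectory with an empty partition row.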

  /** Returns the list of files that will be read when scanning this relation. */
  override def inputFiles: Array[String] =
    allFiles().map(_.getPath.toUri.toString).toArray

  override def sizeInBytes: Long = allFiles().map(_.getLen).sum

  def allFiles(): Seq[FileStatus] = {
    if (partitionSpec().partitionColumns.isEmpty) {
      // For each of the root input paths, get the list of files inside them
      rootPaths.flatMap { path =>
        // Make the path qualified (consistent with listLeafFiles and listLeafFilesInParallel).
        val fs = path.getFileSystem(hadoopConf)
        val qualifiedPathPre = fs.makeQualified(path)
        val qualifiedPath: Path = if (qualifiedPathPre.isRoot && !qualifiedPathPre.isAbsolute) {
          // SPARK-17613: Always append `Path.SEPARATOR` to the end of parent directories,
          // because `leafFile.getParent` would have returned an absolute path with the
          // separator at the end.
          new Path(qualifiedPathPre, Path.SEPARATOR)
        } else {
          qualifiedPathPre
        }

        // There are three possible cases for each path:
        // 1. The path is a directory and has children files in it. Then it must be present in
        //    leafDirToChildrenFiles as those children files will have been found as leaf files.
        //    Find its children files from leafDirToChildrenFiles and include them.
        // 2. The path is a file; then it will be present in leafFiles. Include this path.
        // 3. The path is a directory, but has no children files. Do not include this path.

        leafDirToChildrenFiles.get(qualifiedPath)
          .orElse { leafFiles.get(qualifiedPath).map(Array(_)) }
          .getOrElse(Array.empty)
      }
    } else {
      leafFiles.values.toSeq
    }
  }
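
  // SPARK-17613, illustrative: when a root path is the bare root of a filesystem (e.g.
  // `hdfs://host:port`), the qualified path above gets a trailing separator appended so that it
  // matches the directory keys derived from `leafFile.getParent`, which always carry the
  // separator for root directories.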

  protected def inferPartitioning(): PartitionSpec = {
    // We use leaf dirs containing data files to discover the schema.
    val leafDirs = leafDirToChildrenFiles.filter { case (_, files) =>
      files.exists(f => isDataPath(f.getPath))
    }.keys.toSeq
    userPartitionSchema match {
      case Some(userProvidedSchema) if userProvidedSchema.nonEmpty =>
        val spec = PartitioningUtils.parsePartitions(
          leafDirs,
          typeInference = false,
          basePaths = basePaths)

        // Without type inference, all values in `row` are null or of StringType, so
        // we need to cast them to the data types that the user specified.
        def castPartitionValuesToUserSchema(row: InternalRow) = {
          InternalRow((0 until row.numFields).map { i =>
            Cast(
              Literal.create(row.getUTF8String(i), StringType),
              userProvidedSchema.fields(i).dataType).eval()
          }: _*)
        }

        PartitionSpec(userProvidedSchema, spec.partitions.map { part =>
          part.copy(values = castPartitionValuesToUserSchema(part.values))
        })
      case _ =>
        PartitioningUtils.parsePartitions(
          leafDirs,
          typeInference = sparkSession.sessionState.conf.partitionColumnTypeInferenceEnabled,
          basePaths = basePaths)
    }
  }

  private def prunePartitions(
      predicates: Seq[Expression],
      partitionSpec: PartitionSpec): Seq[PartitionPath] = {
    val PartitionSpec(partitionColumns, partitions) = partitionSpec
    val partitionColumnNames = partitionColumns.map(_.name).toSet
    val partitionPruningPredicates = predicates.filter {
      _.references.map(_.name).toSet.subsetOf(partitionColumnNames)
    }

    if (partitionPruningPredicates.nonEmpty) {
      val predicate = partitionPruningPredicates.reduce(expressions.And)

      val boundPredicate = InterpretedPredicate.create(predicate.transform {
        case a: AttributeReference =>
          val index = partitionColumns.indexWhere(a.name == _.name)
          BoundReference(index, partitionColumns(index).dataType, nullable = true)
      })

      val selected = partitions.filter {
        case PartitionPath(values, _) => boundPredicate(values)
      }
      logInfo {
        val total = partitions.length
        val selectedSize = selected.length
        val percentPruned = (1 - selectedSize.toDouble / total.toDouble) * 100
        s"Selected $selectedSize partitions out of $total, pruned $percentPruned% partitions."
      }

      selected
    } else {
      partitions
    }
  }

  /**
   * Contains a set of paths that are considered as the base dirs of the input datasets.
   * The partitioning discovery logic will make sure it stops when it reaches any
   * base path.
   *
   * By default, the paths of the dataset provided by users will be base paths.
   * Below are three typical examples,
   * Case 1) `spark.read.parquet("/path/something=true/")`: the base path will be
   * `/path/something=true/`, and the returned DataFrame will not contain a column of `something`.
   * Case 2) `spark.read.parquet("/path/something=true/a.parquet")`: the base path will still be
   * `/path/something=true/`, and the returned DataFrame will also not contain a column of
   * `something`.
   * Case 3) `spark.read.parquet("/path/")`: the base path will be `/path/`, and the returned
   * DataFrame will have the column of `something`.
   *
   * Users can also override the basePath by setting `basePath` in the options to pass the new
   * base path to the data source.
   * For example, `spark.read.option("basePath", "/path/").parquet("/path/something=true/")`,
   * and the returned DataFrame will have the column of `something`.
   */
  private def basePaths: Set[Path] = {
    parameters.get(BASE_PATH_PARAM).map(new Path(_)) match {
      case Some(userDefinedBasePath) =>
        val fs = userDefinedBasePath.getFileSystem(hadoopConf)
        if (!fs.isDirectory(userDefinedBasePath)) {
          throw new IllegalArgumentException(s"Option '$BASE_PATH_PARAM' must be a directory")
        }
        Set(fs.makeQualified(userDefinedBasePath))

      case None =>
        rootPaths.map { path =>
          // Make the path qualified (consistent with listLeafFiles and listLeafFilesInParallel).
          val qualifiedPath = path.getFileSystem(hadoopConf).makeQualified(path)
          if (leafFiles.contains(qualifiedPath)) qualifiedPath.getParent else qualifiedPath
        }.toSet
    }
  }
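
  // A usage sketch of the `basePath` option described above (mirroring the ScalaDoc examples):
  //
  //   // Partition column `something` is dropped: the base path defaults to the input directory.
  //   spark.read.parquet("/path/something=true/")
  //
  //   // Partition column `something` is kept: discovery stops at the overridden base path.
  //   spark.read.option("basePath", "/path/").parquet("/path/something=true/")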

  // SPARK-15895: Metadata files (e.g. Parquet summary files) and temporary files should not be
  // counted as data files, so that they do not participate in partition discovery.
  private def isDataPath(path: Path): Boolean = {
    val name = path.getName
    !((name.startsWith("_") && !name.contains("=")) || name.startsWith("."))
  }

  /**
   * List leaf files of given paths. This method will submit a Spark job to do parallel
   * listing whenever there is a path having more files than the parallel partition discovery
   * threshold.
   *
   * This is publicly visible for testing.
   */
  def listLeafFiles(paths: Seq[Path]): mutable.LinkedHashSet[FileStatus] = {
    val output = mutable.LinkedHashSet[FileStatus]()
    val pathsToFetch = mutable.ArrayBuffer[Path]()
    for (path <- paths) {
      fileStatusCache.getLeafFiles(path) match {
        case Some(files) =>
          HiveCatalogMetrics.incrementFileCacheHits(files.length)
          output ++= files
        case None =>
          pathsToFetch += path
      }
    }
    val filter = FileInputFormat.getInputPathFilter(new JobConf(hadoopConf, this.getClass))
    val discovered = PartitioningAwareFileIndex.bulkListLeafFiles(
      pathsToFetch, hadoopConf, filter, sparkSession)
    discovered.foreach { case (path, leafFiles) =>
      HiveCatalogMetrics.incrementFilesDiscovered(leafFiles.size)
      fileStatusCache.putLeafFiles(path, leafFiles.toArray)
      output ++= leafFiles
    }
    output
  }
}
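
// The helpers below live in the companion object because bulkListLeafFiles may run its listing
// inside Spark tasks; file statuses are round-tripped through the SerializableFileStatus /
// SerializableBlockLocation wrappers so they can be sent back to the driver (see the comments
// in bulkListLeafFiles).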

object PartitioningAwareFileIndex extends Logging {
  val BASE_PATH_PARAM = "basePath"

  /** A serializable variant of HDFS's BlockLocation. */
  private case class SerializableBlockLocation(
      names: Array[String],
      hosts: Array[String],
      offset: Long,
      length: Long)

  /** A serializable variant of HDFS's FileStatus. */
  private case class SerializableFileStatus(
      path: String,
      length: Long,
      isDir: Boolean,
      blockReplication: Short,
      blockSize: Long,
      modificationTime: Long,
      accessTime: Long,
      blockLocations: Array[SerializableBlockLocation])

  /**
   * Lists a collection of paths recursively. Picks the listing strategy adaptively depending
   * on the number of paths to list.
   *
   * This may only be called on the driver.
   *
   * @return for each input path, the set of discovered files for the path
   */
  private def bulkListLeafFiles(
      paths: Seq[Path],
      hadoopConf: Configuration,
      filter: PathFilter,
      sparkSession: SparkSession): Seq[(Path, Seq[FileStatus])] = {

    // Short-circuits parallel listing when serial listing is likely to be faster.
    if (paths.size < sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) {
      return paths.map { path =>
        (path, listLeafFiles(path, hadoopConf, filter, Some(sparkSession)))
      }
    }

    logInfo(s"Listing leaf files and directories in parallel under: ${paths.mkString(", ")}")
    HiveCatalogMetrics.incrementParallelListingJobCount(1)

    val sparkContext = sparkSession.sparkContext
    val serializableConfiguration = new SerializableConfiguration(hadoopConf)
    val serializedPaths = paths.map(_.toString)
    val parallelPartitionDiscoveryParallelism =
      sparkSession.sessionState.conf.parallelPartitionDiscoveryParallelism

    // Set the degree of parallelism explicitly to prevent the file listing below from generating
    // too many tasks when defaultParallelism is large.
    val numParallelism = Math.min(paths.size, parallelPartitionDiscoveryParallelism)

    val statusMap = sparkContext
      .parallelize(serializedPaths, numParallelism)
      .mapPartitions { pathStrings =>
        val hadoopConf = serializableConfiguration.value
        pathStrings.map(new Path(_)).toSeq.map { path =>
          (path, listLeafFiles(path, hadoopConf, filter, None))
        }.iterator
      }.map { case (path, statuses) =>
        val serializableStatuses = statuses.map { status =>
          // Turn FileStatus into SerializableFileStatus so we can send it back to the driver
          val blockLocations = status match {
            case f: LocatedFileStatus =>
              f.getBlockLocations.map { loc =>
                SerializableBlockLocation(
                  loc.getNames,
                  loc.getHosts,
                  loc.getOffset,
                  loc.getLength)
              }

            case _ =>
              Array.empty[SerializableBlockLocation]
          }

          SerializableFileStatus(
            status.getPath.toString,
            status.getLen,
            status.isDirectory,
            status.getReplication,
            status.getBlockSize,
            status.getModificationTime,
            status.getAccessTime,
            blockLocations)
        }
        (path.toString, serializableStatuses)
      }.collect()

    // Turn SerializableFileStatus back into FileStatus
    statusMap.map { case (path, serializableStatuses) =>
      val statuses = serializableStatuses.map { f =>
        val blockLocations = f.blockLocations.map { loc =>
          new BlockLocation(loc.names, loc.hosts, loc.offset, loc.length)
        }
        new LocatedFileStatus(
          new FileStatus(
            f.length, f.isDir, f.blockReplication, f.blockSize, f.modificationTime,
            new Path(f.path)),
          blockLocations)
      }
      (new Path(path), statuses)
    }
  }
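
  // A configuration sketch (the string keys are assumptions; the accessors appear in the code
  // above): the serial/parallel switch is controlled by parallelPartitionDiscoveryThreshold,
  // typically exposed as spark.sql.sources.parallelPartitionDiscovery.threshold, and the number
  // of listing tasks is capped by parallelPartitionDiscoveryParallelism
  // (spark.sql.sources.parallelPartitionDiscovery.parallelism).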

  /**
   * Lists a single filesystem path recursively. If a SparkSession object is specified, this
   * function may launch Spark jobs to parallelize listing.
   *
   * If sessionOpt is None, this may be called on executors.
   *
   * @return all children of path that match the specified filter.
   */
  private def listLeafFiles(
      path: Path,
      hadoopConf: Configuration,
      filter: PathFilter,
      sessionOpt: Option[SparkSession]): Seq[FileStatus] = {
    logTrace(s"Listing $path")
    val fs = path.getFileSystem(hadoopConf)
    val name = path.getName.toLowerCase
    if (shouldFilterOut(name)) {
      Seq.empty[FileStatus]
    } else {
      // [SPARK-17599] Prevent InMemoryFileIndex from failing if path doesn't exist
      // Note that statuses only include FileStatus for the files and dirs directly under path,
      // and does not include anything else recursively.
      val statuses = try fs.listStatus(path) catch {
        case _: FileNotFoundException =>
          logWarning(s"The directory $path was not found. Was it deleted very recently?")
          Array.empty[FileStatus]
      }

      val allLeafStatuses = {
        val (dirs, topLevelFiles) = statuses.partition(_.isDirectory)
        val nestedFiles: Seq[FileStatus] = sessionOpt match {
          case Some(session) =>
            bulkListLeafFiles(dirs.map(_.getPath), hadoopConf, filter, session).flatMap(_._2)
          case _ =>
            dirs.flatMap(dir => listLeafFiles(dir.getPath, hadoopConf, filter, sessionOpt))
        }
        val allFiles = topLevelFiles ++ nestedFiles
        if (filter != null) allFiles.filter(f => filter.accept(f.getPath)) else allFiles
      }

      allLeafStatuses.filterNot(status => shouldFilterOut(status.getPath.getName)).map {
        case f: LocatedFileStatus =>
          f

        // NOTE:
        //
        // - Although S3/S3A/S3N file systems can be quite slow for remote file metadata
        //   operations, calling `getFileBlockLocations` does no harm here since these file system
        //   implementations don't actually issue RPCs for this method.
        //
        // - Here we are calling `getFileBlockLocations` in a sequential manner, but it should not
        //   be a big deal since we always use `listLeafFilesInParallel` when the number of
        //   paths exceeds the threshold.
        case f =>
          // The other constructor of LocatedFileStatus will call FileStatus.getPermission(),
          // which is very slow on some file systems (e.g. RawLocalFileSystem, which launches a
          // subprocess and parses the stdout).
          val locations = fs.getFileBlockLocations(f, 0, f.getLen)
          val lfs = new LocatedFileStatus(f.getLen, f.isDirectory, f.getReplication, f.getBlockSize,
            f.getModificationTime, 0, null, null, null, null, f.getPath, locations)
          if (f.isSymlink) {
            lfs.setSymlink(f.getSymlink)
          }
          lfs
      }
    }
  }

  /** Checks if we should filter out this path name. */
  def shouldFilterOut(pathName: String): Boolean = {
    // We filter everything that starts with _ and ., except _common_metadata and _metadata
    // because Parquet needs to find those metadata files from leaf files returned by this method.
    // We should refactor this logic to not mix metadata files with data files.
    ((pathName.startsWith("_") && !pathName.contains("=")) || pathName.startsWith(".")) &&
      !pathName.startsWith("_common_metadata") && !pathName.startsWith("_metadata")
  }
}
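
// A few illustrative cases for shouldFilterOut, following the rules documented in the method:
//   shouldFilterOut("_SUCCESS")         // true: starts with "_" and is not a Parquet summary file
//   shouldFilterOut(".DS_Store")        // true: hidden file
//   shouldFilterOut("_metadata")        // false: Parquet summary file, kept for leaf-file listing
//   shouldFilterOut("_common_metadata") // false: kept as well
//   shouldFilterOut("part-00000")       // false: ordinary data file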