/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.execution.datasources

import java.io.FileNotFoundException

import scala.collection.mutable

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._
import org.apache.hadoop.mapred.{FileInputFormat, JobConf}

import org.apache.spark.internal.Logging
import org.apache.spark.metrics.source.HiveCatalogMetrics
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.{expressions, InternalRow}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types.{StringType, StructType}
import org.apache.spark.util.SerializableConfiguration

/**
 * An abstract class that represents [[FileIndex]]s that are aware of partitioned tables.
 * It provides the necessary methods to parse partition data based on a set of files.
 *
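 * For example, a partitioned layout such as
 * {{{
 *   /table/year=2016/month=01/part-00000.parquet
 *   /table/year=2016/month=02/part-00000.parquet
 * }}}
 * read with `spark.read.parquet("/table")` will surface `year` and `month` as partition columns.
 *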
 * @param parameters a set of options to control partition discovery
 * @param userPartitionSchema an optional partition schema that will be used to provide types for
 *                            the discovered partitions
 */
abstract class PartitioningAwareFileIndex(
    sparkSession: SparkSession,
    parameters: Map[String, String],
    userPartitionSchema: Option[StructType],
    fileStatusCache: FileStatusCache = NoopCache) extends FileIndex with Logging {
  import PartitioningAwareFileIndex.BASE_PATH_PARAM

  /** Returns the specification of the partitions inferred from the data. */
  def partitionSpec(): PartitionSpec

  override def partitionSchema: StructType = partitionSpec().partitionColumns

  protected val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(parameters)

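  /** All leaf files discovered under the root paths, keyed by their qualified paths. */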
  protected def leafFiles: mutable.LinkedHashMap[Path, FileStatus]

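  /** Maps each leaf directory to the files directly under it. */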
  protected def leafDirToChildrenFiles: Map[Path, Array[FileStatus]]

  override def listFiles(filters: Seq[Expression]): Seq[PartitionDirectory] = {
    val selectedPartitions = if (partitionSpec().partitionColumns.isEmpty) {
      PartitionDirectory(InternalRow.empty, allFiles().filter(f => isDataPath(f.getPath))) :: Nil
    } else {
      prunePartitions(filters, partitionSpec()).map {
        case PartitionPath(values, path) =>
          val files: Seq[FileStatus] = leafDirToChildrenFiles.get(path) match {
            case Some(existingDir) =>
              // Directory has child files in it, return them
              existingDir.filter(f => isDataPath(f.getPath))

            case None =>
              // Directory does not exist, or has no child files
              Nil
          }
          PartitionDirectory(values, files)
      }
    }
    logTrace("Selected files after partition pruning:\n\t" + selectedPartitions.mkString("\n\t"))
    selectedPartitions
  }

  /** Returns the list of files that will be read when scanning this relation. */
  override def inputFiles: Array[String] =
    allFiles().map(_.getPath.toUri.toString).toArray

  override def sizeInBytes: Long = allFiles().map(_.getLen).sum

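  /** Returns all leaf files tracked by this index; callers filter out non-data files with
   * `isDataPath` where needed. */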
  def allFiles(): Seq[FileStatus] = {
    if (partitionSpec().partitionColumns.isEmpty) {
      // For each of the root input paths, get the list of files inside them
      rootPaths.flatMap { path =>
        // Make the path qualified (consistent with listLeafFiles and listLeafFilesInParallel).
        val fs = path.getFileSystem(hadoopConf)
        val qualifiedPathPre = fs.makeQualified(path)
        val qualifiedPath: Path = if (qualifiedPathPre.isRoot && !qualifiedPathPre.isAbsolute) {
          // SPARK-17613: Always append `Path.SEPARATOR` to the end of parent directories,
          // because `leafFile.getParent` would have returned an absolute path with the
          // separator at the end.
          new Path(qualifiedPathPre, Path.SEPARATOR)
        } else {
          qualifiedPathPre
        }

        // There are three possible cases for each path:
        // 1. The path is a directory and has child files in it. Then it must be present in
        //    leafDirToChildrenFiles as those child files will have been found as leaf files.
        //    Find its child files from leafDirToChildrenFiles and include them.
        // 2. The path is a file; it will then be present in leafFiles. Include this path.
        // 3. The path is a directory, but has no child files. Do not include this path.

        leafDirToChildrenFiles.get(qualifiedPath)
          .orElse { leafFiles.get(qualifiedPath).map(Array(_)) }
          .getOrElse(Array.empty)
      }
    } else {
      leafFiles.values.toSeq
    }
  }

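  /**
   * Infers the partition spec from the leaf directories that contain data files. If a
   * user-provided partition schema exists, the discovered string values are cast to its types.
   */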
  protected def inferPartitioning(): PartitionSpec = {
    // We use leaf dirs containing data files to discover the schema.
    val leafDirs = leafDirToChildrenFiles.filter { case (_, files) =>
      files.exists(f => isDataPath(f.getPath))
    }.keys.toSeq
    userPartitionSchema match {
      case Some(userProvidedSchema) if userProvidedSchema.nonEmpty =>
        val spec = PartitioningUtils.parsePartitions(
          leafDirs,
          typeInference = false,
          basePaths = basePaths)

        // Without auto inference, every value in the `row` should be null or of StringType;
        // we need to cast them into the data types that the user specified.
        def castPartitionValuesToUserSchema(row: InternalRow) = {
          InternalRow((0 until row.numFields).map { i =>
            Cast(
              Literal.create(row.getUTF8String(i), StringType),
              userProvidedSchema.fields(i).dataType).eval()
          }: _*)
        }

        PartitionSpec(userProvidedSchema, spec.partitions.map { part =>
          part.copy(values = castPartitionValuesToUserSchema(part.values))
        })
      case _ =>
        PartitioningUtils.parsePartitions(
          leafDirs,
          typeInference = sparkSession.sessionState.conf.partitionColumnTypeInferenceEnabled,
          basePaths = basePaths)
    }
  }

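  /**
   * Keeps only the partitions whose values satisfy every predicate that references
   * partition columns exclusively; other predicates are ignored here.
   */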
  private def prunePartitions(
      predicates: Seq[Expression],
      partitionSpec: PartitionSpec): Seq[PartitionPath] = {
    val PartitionSpec(partitionColumns, partitions) = partitionSpec
    val partitionColumnNames = partitionColumns.map(_.name).toSet
    val partitionPruningPredicates = predicates.filter {
      _.references.map(_.name).toSet.subsetOf(partitionColumnNames)
    }

    if (partitionPruningPredicates.nonEmpty) {
      val predicate = partitionPruningPredicates.reduce(expressions.And)

      val boundPredicate = InterpretedPredicate.create(predicate.transform {
        case a: AttributeReference =>
          val index = partitionColumns.indexWhere(a.name == _.name)
          BoundReference(index, partitionColumns(index).dataType, nullable = true)
      })

      val selected = partitions.filter {
        case PartitionPath(values, _) => boundPredicate(values)
      }
      logInfo {
        val total = partitions.length
        val selectedSize = selected.length
        val percentPruned = (1 - selectedSize.toDouble / total.toDouble) * 100
        s"Selected $selectedSize partitions out of $total, pruned $percentPruned% partitions."
      }

      selected
    } else {
      partitions
    }
  }

  /**
   * Contains a set of paths that are considered the base dirs of the input datasets.
   * The partition discovery logic will stop when it reaches any base path.
   *
   * By default, the paths of the dataset provided by users will be base paths.
   * Below are three typical examples:
   * Case 1) `spark.read.parquet("/path/something=true/")`: the base path will be
   * `/path/something=true/`, and the returned DataFrame will not contain a column `something`.
   * Case 2) `spark.read.parquet("/path/something=true/a.parquet")`: the base path will still be
   * `/path/something=true/`, and the returned DataFrame will also not contain a column
   * `something`.
   * Case 3) `spark.read.parquet("/path/")`: the base path will be `/path/`, and the returned
   * DataFrame will have the column `something`.
   *
   * Users can also override the base path by setting `basePath` in the options to pass the new
   * base path to the data source. For example, with
   * `spark.read.option("basePath", "/path/").parquet("/path/something=true/")`,
   * the returned DataFrame will have the column `something`.
   */
  private def basePaths: Set[Path] = {
    parameters.get(BASE_PATH_PARAM).map(new Path(_)) match {
      case Some(userDefinedBasePath) =>
        val fs = userDefinedBasePath.getFileSystem(hadoopConf)
        if (!fs.isDirectory(userDefinedBasePath)) {
          throw new IllegalArgumentException(s"Option '$BASE_PATH_PARAM' must be a directory")
        }
        Set(fs.makeQualified(userDefinedBasePath))

      case None =>
        rootPaths.map { path =>
          // Make the path qualified (consistent with listLeafFiles and listLeafFilesInParallel).
          val qualifiedPath = path.getFileSystem(hadoopConf).makeQualified(path)
          if (leafFiles.contains(qualifiedPath)) qualifiedPath.getParent else qualifiedPath }.toSet
    }
  }

  // SPARK-15895: Metadata files (e.g. Parquet summary files) and temporary files should not be
  // counted as data files, so they do not participate in partition discovery.
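  // For example, `_SUCCESS` and `.staging` are not data paths, while `part-00000.parquet` is.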
  private def isDataPath(path: Path): Boolean = {
    val name = path.getName
    !((name.startsWith("_") && !name.contains("=")) || name.startsWith("."))
  }

  /**
   * Lists leaf files of the given paths. This method will submit a Spark job to do parallel
   * listing whenever the number of paths to list reaches the parallel partition discovery
   * threshold.
   *
   * This is publicly visible for testing.
   */
  def listLeafFiles(paths: Seq[Path]): mutable.LinkedHashSet[FileStatus] = {
    val output = mutable.LinkedHashSet[FileStatus]()
    val pathsToFetch = mutable.ArrayBuffer[Path]()
    for (path <- paths) {
      fileStatusCache.getLeafFiles(path) match {
        case Some(files) =>
          HiveCatalogMetrics.incrementFileCacheHits(files.length)
          output ++= files
        case None =>
          pathsToFetch += path
      }
    }
    val filter = FileInputFormat.getInputPathFilter(new JobConf(hadoopConf, this.getClass))
    val discovered = PartitioningAwareFileIndex.bulkListLeafFiles(
      pathsToFetch, hadoopConf, filter, sparkSession)
    discovered.foreach { case (path, leafFiles) =>
      HiveCatalogMetrics.incrementFilesDiscovered(leafFiles.size)
      fileStatusCache.putLeafFiles(path, leafFiles.toArray)
      output ++= leafFiles
    }
    output
  }
}

object PartitioningAwareFileIndex extends Logging {
  val BASE_PATH_PARAM = "basePath"

  /** A serializable variant of HDFS's BlockLocation. */
  private case class SerializableBlockLocation(
      names: Array[String],
      hosts: Array[String],
      offset: Long,
      length: Long)

  /** A serializable variant of HDFS's FileStatus. */
  private case class SerializableFileStatus(
      path: String,
      length: Long,
      isDir: Boolean,
      blockReplication: Short,
      blockSize: Long,
      modificationTime: Long,
      accessTime: Long,
      blockLocations: Array[SerializableBlockLocation])

  /**
   * Lists a collection of paths recursively. Picks the listing strategy adaptively depending
   * on the number of paths to list.
   *
   * This may only be called on the driver.
   *
   * @return for each input path, the set of discovered files for the path
   */
  private def bulkListLeafFiles(
      paths: Seq[Path],
      hadoopConf: Configuration,
      filter: PathFilter,
      sparkSession: SparkSession): Seq[(Path, Seq[FileStatus])] = {

    // Short-circuits parallel listing when serial listing is likely to be faster.
    if (paths.size < sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) {
      return paths.map { path =>
        (path, listLeafFiles(path, hadoopConf, filter, Some(sparkSession)))
      }
    }

    logInfo(s"Listing leaf files and directories in parallel under: ${paths.mkString(", ")}")
    HiveCatalogMetrics.incrementParallelListingJobCount(1)

    val sparkContext = sparkSession.sparkContext
    val serializableConfiguration = new SerializableConfiguration(hadoopConf)
    val serializedPaths = paths.map(_.toString)
    val parallelPartitionDiscoveryParallelism =
      sparkSession.sessionState.conf.parallelPartitionDiscoveryParallelism

    // Cap the parallelism so that the following file listing does not generate too many tasks
    // when defaultParallelism is large.
    val numParallelism = Math.min(paths.size, parallelPartitionDiscoveryParallelism)

    val statusMap = sparkContext
      .parallelize(serializedPaths, numParallelism)
      .mapPartitions { pathStrings =>
        val hadoopConf = serializableConfiguration.value
        pathStrings.map(new Path(_)).toSeq.map { path =>
          (path, listLeafFiles(path, hadoopConf, filter, None))
        }.iterator
      }.map { case (path, statuses) =>
        val serializableStatuses = statuses.map { status =>
          // Turn FileStatus into SerializableFileStatus so we can send it back to the driver
          val blockLocations = status match {
            case f: LocatedFileStatus =>
              f.getBlockLocations.map { loc =>
                SerializableBlockLocation(
                  loc.getNames,
                  loc.getHosts,
                  loc.getOffset,
                  loc.getLength)
              }

            case _ =>
              Array.empty[SerializableBlockLocation]
          }

          SerializableFileStatus(
            status.getPath.toString,
            status.getLen,
            status.isDirectory,
            status.getReplication,
            status.getBlockSize,
            status.getModificationTime,
            status.getAccessTime,
            blockLocations)
        }
        (path.toString, serializableStatuses)
      }.collect()

    // Turn SerializableFileStatus back into FileStatus
    statusMap.map { case (path, serializableStatuses) =>
      val statuses = serializableStatuses.map { f =>
        val blockLocations = f.blockLocations.map { loc =>
          new BlockLocation(loc.names, loc.hosts, loc.offset, loc.length)
        }
        new LocatedFileStatus(
          new FileStatus(
            f.length, f.isDir, f.blockReplication, f.blockSize, f.modificationTime,
            new Path(f.path)),
          blockLocations)
      }
      (new Path(path), statuses)
    }
  }

  /**
   * Lists a single filesystem path recursively. If a SparkSession object is specified, this
   * function may launch Spark jobs to parallelize listing.
   *
   * If sessionOpt is None, this may be called on executors.
   *
   * @return all children of path that match the specified filter.
   */
  private def listLeafFiles(
      path: Path,
      hadoopConf: Configuration,
      filter: PathFilter,
      sessionOpt: Option[SparkSession]): Seq[FileStatus] = {
    logTrace(s"Listing $path")
    val fs = path.getFileSystem(hadoopConf)
    val name = path.getName.toLowerCase
    if (shouldFilterOut(name)) {
      Seq.empty[FileStatus]
    } else {
      // [SPARK-17599] Prevent InMemoryFileIndex from failing if path doesn't exist
      // Note that statuses only include FileStatus for the files and dirs directly under path,
      // and do not include anything else recursively.
      val statuses = try fs.listStatus(path) catch {
        case _: FileNotFoundException =>
          logWarning(s"The directory $path was not found. Was it deleted very recently?")
          Array.empty[FileStatus]
      }

      val allLeafStatuses = {
        val (dirs, topLevelFiles) = statuses.partition(_.isDirectory)
        val nestedFiles: Seq[FileStatus] = sessionOpt match {
          case Some(session) =>
            bulkListLeafFiles(dirs.map(_.getPath), hadoopConf, filter, session).flatMap(_._2)
          case _ =>
            dirs.flatMap(dir => listLeafFiles(dir.getPath, hadoopConf, filter, sessionOpt))
        }
        val allFiles = topLevelFiles ++ nestedFiles
        if (filter != null) allFiles.filter(f => filter.accept(f.getPath)) else allFiles
      }

      allLeafStatuses.filterNot(status => shouldFilterOut(status.getPath.getName)).map {
        case f: LocatedFileStatus =>
          f

        // NOTE:
        //
        // - Although the S3/S3A/S3N file systems can be quite slow for remote file metadata
        //   operations, calling `getFileBlockLocations` does no harm here since these file system
        //   implementations don't actually issue RPCs for this method.
        //
        // - Here we are calling `getFileBlockLocations` in a sequential manner, but it should not
        //   be a big deal since we always use `listLeafFilesInParallel` when the number of
        //   paths exceeds the threshold.
        case f =>
          // The other constructor of LocatedFileStatus will call FileStatus.getPermission(),
          // which is very slow on some file systems (e.g. RawLocalFileSystem, which launches a
          // subprocess and parses the stdout).
          val locations = fs.getFileBlockLocations(f, 0, f.getLen)
          val lfs = new LocatedFileStatus(f.getLen, f.isDirectory, f.getReplication, f.getBlockSize,
            f.getModificationTime, 0, null, null, null, null, f.getPath, locations)
          if (f.isSymlink) {
            lfs.setSymlink(f.getSymlink)
          }
          lfs
      }
    }
  }

  /** Checks if we should filter out this path name. */
  def shouldFilterOut(pathName: String): Boolean = {
    // We filter everything that starts with _ and ., except _common_metadata and _metadata
    // because Parquet needs to find those metadata files among the leaf files returned by
    // this method. We should refactor this logic to not mix metadata files with data files.
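    // For example, `_SUCCESS` and `.hidden` are filtered out, while `_metadata`,
    // `_common_metadata`, and names like `_col=1` are kept.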
    ((pathName.startsWith("_") && !pathName.contains("=")) || pathName.startsWith(".")) &&
      !pathName.startsWith("_common_metadata") && !pathName.startsWith("_metadata")
  }
}
