/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.execution.streaming

import java.io._
import java.nio.charset.StandardCharsets
import java.util.{ConcurrentModificationException, EnumSet, UUID}

import scala.reflect.ClassTag

import org.apache.commons.io.IOUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._
import org.apache.hadoop.fs.permission.FsPermission
import org.json4s.NoTypeHints
import org.json4s.jackson.Serialization

import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.util.UninterruptibleThread


/**
 * A [[MetadataLog]] implementation based on HDFS. [[HDFSMetadataLog]] uses the specified `path`
 * as the metadata storage.
 *
 * When writing a new batch, [[HDFSMetadataLog]] first writes to a temp file and then renames it
 * to the final batch file. If the rename step fails, there must be multiple writers; only one of
 * them will succeed and the others will fail.
 *
 * Note: [[HDFSMetadataLog]] does not support S3-like file systems, as they do not guarantee that
 * listing files in a directory always shows the latest files.
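 *
 * A minimal usage sketch (illustrative only; `spark` and the checkpoint location below are
 * assumed, and `String` metadata is used just to keep the example short):
 * {{{
 *   val log = new HDFSMetadataLog[String](spark, "/tmp/checkpoint/offsets")
 *   log.add(0, "batch-0 metadata")   // true: batch 0 is committed
 *   log.add(0, "ignored")            // false: batch 0 already exists
 *   log.getLatest()                  // Some((0, "batch-0 metadata"))
 * }}}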
 */
class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path: String)
  extends MetadataLog[T] with Logging {

  private implicit val formats = Serialization.formats(NoTypeHints)

  /** Needed to serialize type T into JSON when using Jackson */
  private implicit val manifest = Manifest.classType[T](implicitly[ClassTag[T]].runtimeClass)

  // Avoid serializing generic sequences, see SPARK-17372
  require(implicitly[ClassTag[T]].runtimeClass != classOf[Seq[_]],
    "Should not create a log with type Seq, use Arrays instead - see SPARK-17372")

  import HDFSMetadataLog._

  val metadataPath = new Path(path)
  protected val fileManager = createFileManager()

  runUninterruptiblyIfLocal {
    if (!fileManager.exists(metadataPath)) {
      fileManager.mkdirs(metadataPath)
    }
  }

  private def runUninterruptiblyIfLocal[T](body: => T): T = {
    if (fileManager.isLocalFileSystem && Thread.currentThread.isInstanceOf[UninterruptibleThread]) {
      // When using a local file system, some file system APIs like "create" or "mkdirs" must be
      // called in [[org.apache.spark.util.UninterruptibleThread]] so that interrupts can be
      // disabled.
      //
      // This is because there is a potential deadlock in Hadoop "Shell.runCommand" before
      // 2.5.0 (HADOOP-10622). If the thread running "Shell.runCommand" is interrupted, then
      // the thread can get deadlocked. In our case, file system APIs like "create" or "mkdirs"
      // will call "Shell.runCommand" to set the file permission if using the local file system,
      // and can get deadlocked if the stream execution thread is stopped by interrupt.
      //
      // Hence, we use "runUninterruptibly" to disable interrupts here. (SPARK-14131)
      Thread.currentThread.asInstanceOf[UninterruptibleThread].runUninterruptibly {
        body
      }
    } else {
      // For a distributed file system, such as HDFS or S3, if the network is broken, write
      // operations may just hang until timeout. We should enable interrupts to allow stopping
      // the query fast.
      body
    }
  }
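
  // Illustrative note (not part of the original logic): callers such as the streaming query
  // execution thread are expected to run on an [[UninterruptibleThread]], e.g.
  //
  //   val thread = new UninterruptibleThread("stream execution thread") {
  //     override def run(): Unit = {
  //       // ... work that reaches runUninterruptiblyIfLocal, e.g. metadataLog.add(batchId, metadata)
  //     }
  //   }
  //   thread.start()
  //
  // so that local-file-system calls like "create" and "mkdirs" can temporarily disable interrupts.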

  /**
   * A `PathFilter` to filter only batch files
   */
  protected val batchFilesFilter = new PathFilter {
    override def accept(path: Path): Boolean = isBatchFile(path)
  }

  protected def batchIdToPath(batchId: Long): Path = {
    new Path(metadataPath, batchId.toString)
  }

  protected def pathToBatchId(path: Path) = {
    path.getName.toLong
  }

  protected def isBatchFile(path: Path) = {
    try {
      path.getName.toLong
      true
    } catch {
      case _: NumberFormatException => false
    }
  }

  protected def serialize(metadata: T, out: OutputStream): Unit = {
    // called inside a try-finally where the underlying stream is closed in the caller
    Serialization.write(metadata, out)
  }

  protected def deserialize(in: InputStream): T = {
    // called inside a try-finally where the underlying stream is closed in the caller
    val reader = new InputStreamReader(in, StandardCharsets.UTF_8)
    Serialization.read[T](reader)
  }

  /**
   * Store the metadata for the specified batchId and return `true` if successful. If the batchId's
   * metadata has already been stored, this method will return `false`.
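   *
   * An illustrative sketch of the intended semantics (assuming a log instance named `log`):
   * {{{
   *   log.add(0, metadata)        // true: batch 0 is written
   *   log.add(0, otherMetadata)   // false: batch 0 already exists and is left untouched
   * }}}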
   */
  override def add(batchId: Long, metadata: T): Boolean = {
    get(batchId).map(_ => false).getOrElse {
      // Only write metadata when the batch has not yet been written
      runUninterruptiblyIfLocal {
        writeBatch(batchId, metadata)
      }
      true
    }
  }

  private def writeTempBatch(metadata: T): Option[Path] = {
    while (true) {
      val tempPath = new Path(metadataPath, s".${UUID.randomUUID.toString}.tmp")
      try {
        val output = fileManager.create(tempPath)
        try {
          serialize(metadata, output)
          return Some(tempPath)
        } finally {
          IOUtils.closeQuietly(output)
        }
      } catch {
        case e: IOException if isFileAlreadyExistsException(e) =>
          // Failed to create "tempPath". There are two cases:
          // 1. Someone is creating "tempPath" too.
          // 2. This is a restart. "tempPath" has already been created but not moved to the final
          // batch file (not committed).
          //
          // For both cases, the batch has not yet been committed. So we can retry it.
          //
          // Note: there is a potential risk here: if HDFSMetadataLog A is running, someone could
          // create another "HDFSMetadataLog" on the same metadata path and cause A to fail.
          // However, this is not a big problem because it requires the attacker to have write
          // permission on the metadata path. In addition, the old Streaming API has the same
          // issue: someone could create malicious checkpoint files to crash a Streaming
          // application too.
      }
    }
    None
  }

  /**
   * Write a batch to a temp file then rename it to the batch file.
   *
   * There may be multiple [[HDFSMetadataLog]] instances using the same metadata path. Although
   * this is not valid behavior, we still need to prevent it from destroying the files.
   */
  private def writeBatch(batchId: Long, metadata: T): Unit = {
    val tempPath = writeTempBatch(metadata).getOrElse(
      throw new IllegalStateException(s"Unable to create temp batch file $batchId"))
    try {
      // Try to commit the batch
      // It will fail if there is an existing file (someone has committed the batch)
      logDebug(s"Attempting to write log #${batchIdToPath(batchId)}")
      fileManager.rename(tempPath, batchIdToPath(batchId))

      // SPARK-17475: HDFSMetadataLog should not leak CRC files
      // If the underlying filesystem didn't rename the CRC file, delete it.
      val crcPath = new Path(tempPath.getParent(), s".${tempPath.getName()}.crc")
      if (fileManager.exists(crcPath)) fileManager.delete(crcPath)
    } catch {
      case e: IOException if isFileAlreadyExistsException(e) =>
        // If "rename" fails, it means some other "HDFSMetadataLog" has committed the batch.
        // So throw an exception to tell the user this is not a valid behavior.
        throw new ConcurrentModificationException(
          s"Multiple HDFSMetadataLog are using $path", e)
    } finally {
      fileManager.delete(tempPath)
    }
  }

  private def isFileAlreadyExistsException(e: IOException): Boolean = {
    e.isInstanceOf[FileAlreadyExistsException] ||
      // Old Hadoop versions don't throw FileAlreadyExistsException. Although it's fixed in
      // HADOOP-9361 in Hadoop 2.5, we still need to support old Hadoop versions.
      (e.getMessage != null && e.getMessage.startsWith("File already exists: "))
  }

  /**
   * @return the deserialized metadata in a batch file, or None if the file does not exist.
   * @throws IllegalArgumentException when path does not point to a batch file.
   */
  def get(batchFile: Path): Option[T] = {
    if (fileManager.exists(batchFile)) {
      if (isBatchFile(batchFile)) {
        get(pathToBatchId(batchFile))
      } else {
        throw new IllegalArgumentException(s"File ${batchFile} is not a batch file!")
      }
    } else {
      None
    }
  }

  override def get(batchId: Long): Option[T] = {
    val batchMetadataFile = batchIdToPath(batchId)
    if (fileManager.exists(batchMetadataFile)) {
      val input = fileManager.open(batchMetadataFile)
      try {
        Some(deserialize(input))
      } catch {
        case ise: IllegalStateException =>
          // re-throw the exception with the log file path added
          throw new IllegalStateException(
            s"Failed to read log file $batchMetadataFile. ${ise.getMessage}", ise)
      } finally {
        IOUtils.closeQuietly(input)
      }
    } else {
      logDebug(s"Unable to find batch $batchMetadataFile")
      None
    }
  }

  override def get(startId: Option[Long], endId: Option[Long]): Array[(Long, T)] = {
    val files = fileManager.list(metadataPath, batchFilesFilter)
    val batchIds = files
      .map(f => pathToBatchId(f.getPath))
      .filter { batchId =>
        (endId.isEmpty || batchId <= endId.get) && (startId.isEmpty || batchId >= startId.get)
      }
    batchIds.sorted.map(batchId => (batchId, get(batchId))).filter(_._2.isDefined).map {
      case (batchId, metadataOption) =>
        (batchId, metadataOption.get)
    }
  }

  override def getLatest(): Option[(Long, T)] = {
    val batchIds = fileManager.list(metadataPath, batchFilesFilter)
      .map(f => pathToBatchId(f.getPath))
      .sorted
      .reverse
    for (batchId <- batchIds) {
      val batch = get(batchId)
      if (batch.isDefined) {
        return Some((batchId, batch.get))
      }
    }
    None
  }

  /**
   * Get an array of [[FileStatus]] referencing batch files.
   * The array is sorted from the most recent batch file to the oldest.
   */
  def getOrderedBatchFiles(): Array[FileStatus] = {
    fileManager.list(metadataPath, batchFilesFilter)
      .sortBy(f => pathToBatchId(f.getPath))
      .reverse
  }

  /**
   * Removes all log entries earlier than `thresholdBatchId` (exclusive).
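   *
   * An illustrative sketch of the threshold semantics:
   * {{{
   *   // with batch files 0, 1, 2 and 3 on disk:
   *   log.purge(2)   // deletes batch files "0" and "1"; "2" and "3" are kept
   * }}}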
   */
  override def purge(thresholdBatchId: Long): Unit = {
    val batchIds = fileManager.list(metadataPath, batchFilesFilter)
      .map(f => pathToBatchId(f.getPath))

    for (batchId <- batchIds if batchId < thresholdBatchId) {
      val path = batchIdToPath(batchId)
      fileManager.delete(path)
      logTrace(s"Removed metadata log file: $path")
    }
  }

  private def createFileManager(): FileManager = {
    val hadoopConf = sparkSession.sessionState.newHadoopConf()
    try {
      new FileContextManager(metadataPath, hadoopConf)
    } catch {
      case e: UnsupportedFileSystemException =>
        logWarning("Could not use FileContext API for managing metadata log files at path " +
          s"$metadataPath. Using FileSystem API instead for managing log files. The log may be " +
          s"inconsistent under failures.")
        new FileSystemManager(metadataPath, hadoopConf)
    }
  }

  /**
   * Parse the log version from the given `text` -- throws an exception when the parsed version
   * exceeds `maxSupportedVersion`, or when `text` is malformed (such as "xyz", "v", "v-1",
   * "v123xyz", etc.)
   */
  private[sql] def parseVersion(text: String, maxSupportedVersion: Int): Int = {
    if (text.length > 0 && text(0) == 'v') {
      val version =
        try {
          text.substring(1, text.length).toInt
        } catch {
          case _: NumberFormatException =>
            throw new IllegalStateException(s"Log file was malformed: failed to read correct log " +
              s"version from $text.")
        }
      if (version > 0) {
        if (version > maxSupportedVersion) {
          throw new IllegalStateException(s"UnsupportedLogVersion: maximum supported log version " +
            s"is v${maxSupportedVersion}, but encountered v$version. The log file was produced " +
            s"by a newer version of Spark and cannot be read by this version. Please upgrade.")
        } else {
          return version
        }
      }
    }

    // reaching here means we failed to read the correct log version
    throw new IllegalStateException(s"Log file was malformed: failed to read correct log " +
      s"version from $text.")
  }
}

object HDFSMetadataLog {

  /** A simple trait to abstract out the file management operations needed by HDFSMetadataLog. */
  trait FileManager {

    /** List the files in a path that match a filter. */
    def list(path: Path, filter: PathFilter): Array[FileStatus]

    /** Make a directory at the given path, and all its parent directories as needed. */
    def mkdirs(path: Path): Unit

    /** Whether path exists */
    def exists(path: Path): Boolean

    /** Open a file for reading, or throw exception if it does not exist. */
    def open(path: Path): FSDataInputStream

    /** Create path, or throw exception if it already exists */
    def create(path: Path): FSDataOutputStream

    /**
     * Atomically rename path, or throw exception if it cannot be done.
     * Should throw FileNotFoundException if srcPath does not exist.
     * Should throw FileAlreadyExistsException if destPath already exists.
     */
    def rename(srcPath: Path, destPath: Path): Unit

    /** Recursively delete a path if it exists. Should not throw exception if file doesn't exist. */
    def delete(path: Path): Unit

    /** Whether the file system is a local FS. */
    def isLocalFileSystem: Boolean
  }

  /**
   * Default implementation of FileManager using newer FileContext API.
   */
  class FileContextManager(path: Path, hadoopConf: Configuration) extends FileManager {
    private val fc = if (path.toUri.getScheme == null) {
      FileContext.getFileContext(hadoopConf)
    } else {
      FileContext.getFileContext(path.toUri, hadoopConf)
    }

    override def list(path: Path, filter: PathFilter): Array[FileStatus] = {
      fc.util.listStatus(path, filter)
    }

    override def rename(srcPath: Path, destPath: Path): Unit = {
      fc.rename(srcPath, destPath)
    }

    override def mkdirs(path: Path): Unit = {
      fc.mkdir(path, FsPermission.getDirDefault, true)
    }

    override def open(path: Path): FSDataInputStream = {
      fc.open(path)
    }

    override def create(path: Path): FSDataOutputStream = {
      fc.create(path, EnumSet.of(CreateFlag.CREATE))
    }

    override def exists(path: Path): Boolean = {
      fc.util().exists(path)
    }

    override def delete(path: Path): Unit = {
      try {
        fc.delete(path, true)
      } catch {
        case e: FileNotFoundException =>
          // ignore if file has already been deleted
      }
    }

    override def isLocalFileSystem: Boolean = fc.getDefaultFileSystem match {
      case _: local.LocalFs | _: local.RawLocalFs =>
        // LocalFs = RawLocalFs + ChecksumFs
        true
      case _ => false
    }
  }

  /**
   * Implementation of FileManager using older FileSystem API. Note that this implementation
   * cannot provide atomic renaming of paths, hence can lead to consistency issues. This
   * should be used only as a backup option, when FileContextManager cannot be used.
   */
  class FileSystemManager(path: Path, hadoopConf: Configuration) extends FileManager {
    private val fs = path.getFileSystem(hadoopConf)

    override def list(path: Path, filter: PathFilter): Array[FileStatus] = {
      fs.listStatus(path, filter)
    }

    /**
     * Rename a path. Note that this implementation is not atomic.
     * @throws FileNotFoundException if source path does not exist.
     * @throws FileAlreadyExistsException if destination path already exists.
     * @throws IOException if renaming fails for some unknown reason.
     */
    override def rename(srcPath: Path, destPath: Path): Unit = {
      if (!fs.exists(srcPath)) {
        throw new FileNotFoundException(s"Source path does not exist: $srcPath")
      }
      if (fs.exists(destPath)) {
        throw new FileAlreadyExistsException(s"Destination path already exists: $destPath")
      }
      if (!fs.rename(srcPath, destPath)) {
        throw new IOException(s"Failed to rename $srcPath to $destPath")
      }
    }

    override def mkdirs(path: Path): Unit = {
      fs.mkdirs(path, FsPermission.getDirDefault)
    }

    override def open(path: Path): FSDataInputStream = {
      fs.open(path)
    }

    override def create(path: Path): FSDataOutputStream = {
      fs.create(path, false)
    }

    override def exists(path: Path): Boolean = {
      fs.exists(path)
    }

    override def delete(path: Path): Unit = {
      try {
        fs.delete(path, true)
      } catch {
        case e: FileNotFoundException =>
          // ignore if file has already been deleted
      }
    }

    override def isLocalFileSystem: Boolean = fs match {
      case _: LocalFileSystem | _: RawLocalFileSystem =>
        // LocalFileSystem = RawLocalFileSystem + ChecksumFileSystem
        true
      case _ => false
    }
  }
}