/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.execution.streaming

import java.io._
import java.nio.charset.StandardCharsets
import java.util.{ConcurrentModificationException, EnumSet, UUID}

import scala.reflect.ClassTag

import org.apache.commons.io.IOUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._
import org.apache.hadoop.fs.permission.FsPermission
import org.json4s.NoTypeHints
import org.json4s.jackson.Serialization

import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.util.UninterruptibleThread


/**
 * A [[MetadataLog]] implementation based on HDFS. [[HDFSMetadataLog]] uses the specified `path`
 * as the metadata storage.
 *
 * When writing a new batch, [[HDFSMetadataLog]] first writes to a temp file and then renames it
 * to the final batch file. If the rename step fails, multiple writers must be racing on the same
 * batch; exactly one of them will succeed and the others will fail.
 *
 * Note: [[HDFSMetadataLog]] doesn't support S3-like file systems as they don't guarantee that
 * listing files in a directory always shows the latest files.
 */
class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path: String)
  extends MetadataLog[T] with Logging {

  private implicit val formats = Serialization.formats(NoTypeHints)

  /** Needed to serialize type T into JSON when using Jackson */
  private implicit val manifest = Manifest.classType[T](implicitly[ClassTag[T]].runtimeClass)

  // Avoid serializing generic sequences, see SPARK-17372
  require(implicitly[ClassTag[T]].runtimeClass != classOf[Seq[_]],
    "Should not create a log with type Seq, use Arrays instead - see SPARK-17372")

  import HDFSMetadataLog._

  val metadataPath = new Path(path)
  protected val fileManager = createFileManager()

  runUninterruptiblyIfLocal {
    if (!fileManager.exists(metadataPath)) {
      fileManager.mkdirs(metadataPath)
    }
  }

  private def runUninterruptiblyIfLocal[T](body: => T): T = {
    if (fileManager.isLocalFileSystem && Thread.currentThread.isInstanceOf[UninterruptibleThread]) {
      // When using a local file system, some file system APIs like "create" or "mkdirs" must be
      // called in [[org.apache.spark.util.UninterruptibleThread]] so that interrupts can be
      // disabled.
      //
      // This is because of a potential deadlock in Hadoop "Shell.runCommand" before 2.5.0
      // (HADOOP-10622): if the thread running "Shell.runCommand" is interrupted, it can get
      // deadlocked. In our case, file system APIs like "create" or "mkdirs" call
      // "Shell.runCommand" to set the file permission when using the local file system, and so
      // can deadlock if the stream execution thread is stopped by an interrupt.
      //
      // Hence, we use "runUninterruptibly" here to disable interrupts. (SPARK-14131)
      Thread.currentThread.asInstanceOf[UninterruptibleThread].runUninterruptibly {
        body
      }
    } else {
      // For a distributed file system, such as HDFS or S3, if the network is broken, write
      // operations may just hang until timeout. We should enable interrupts to allow stopping
      // the query fast.
      body
    }
  }
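
  // Illustrative usage sketch (hypothetical path and payload; callers such as the streaming
  // offset log use this class roughly as follows):
  //   val log = new HDFSMetadataLog[String](sparkSession, "/tmp/checkpoint/offsets")
  //   log.add(0, "batch-0-metadata")      // returns true: first writer of batch 0 wins
  //   log.add(0, "conflicting-metadata")  // returns false: batch 0 is already committed
  //   log.getLatest()                     // returns Some((0, "batch-0-metadata"))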

  /**
   * A `PathFilter` to filter only batch files
   */
  protected val batchFilesFilter = new PathFilter {
    override def accept(path: Path): Boolean = isBatchFile(path)
  }

  protected def batchIdToPath(batchId: Long): Path = {
    new Path(metadataPath, batchId.toString)
  }

  protected def pathToBatchId(path: Path) = {
    path.getName.toLong
  }

  protected def isBatchFile(path: Path) = {
    try {
      path.getName.toLong
      true
    } catch {
      case _: NumberFormatException => false
    }
  }

  protected def serialize(metadata: T, out: OutputStream): Unit = {
    // called inside a try-finally where the underlying stream is closed in the caller
    Serialization.write(metadata, out)
  }

  protected def deserialize(in: InputStream): T = {
    // called inside a try-finally where the underlying stream is closed in the caller
    val reader = new InputStreamReader(in, StandardCharsets.UTF_8)
    Serialization.read[T](reader)
  }

  /**
   * Store the metadata for the specified batchId and return `true` if successful. If the batchId's
   * metadata has already been stored, this method will return `false`.
   */
  override def add(batchId: Long, metadata: T): Boolean = {
    get(batchId).map(_ => false).getOrElse {
      // Only write metadata when the batch has not yet been written
      runUninterruptiblyIfLocal {
        writeBatch(batchId, metadata)
      }
      true
    }
  }

  private def writeTempBatch(metadata: T): Option[Path] = {
    while (true) {
      val tempPath = new Path(metadataPath, s".${UUID.randomUUID.toString}.tmp")
      try {
        val output = fileManager.create(tempPath)
        try {
          serialize(metadata, output)
          return Some(tempPath)
        } finally {
          IOUtils.closeQuietly(output)
        }
      } catch {
        case e: IOException if isFileAlreadyExistsException(e) =>
          // Failed to create "tempPath". There are two cases:
          // 1. Someone else is also creating "tempPath".
          // 2. This is a restart. "tempPath" has already been created but not moved to the final
          //    batch file (not committed).
          //
          // In both cases, the batch has not yet been committed, so we can retry.
          //
          // Note: there is a potential risk here: while HDFSMetadataLog A is running, someone can
          // use the same metadata path to create another "HDFSMetadataLog" and make A fail.
          // However, this is not a big problem because it requires that the attacker have
          // permission to write to the metadata path. In addition, the old Spark Streaming has
          // the same issue: one can create malicious checkpoint files to crash a Streaming
          // application, too.
      }
    }
    None
  }
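
  // Commit sketch (illustrative, hypothetical temp file names): two writers racing on the same
  // batch each serialize to a distinct temp file, then race on the rename; since the rename is
  // the commit point, exactly one temp file becomes the batch file:
  //   writer A: .<uuid-a>.tmp --rename--> 5   (succeeds: batch 5 is committed)
  //   writer B: .<uuid-b>.tmp --rename--> 5   (fails with FileAlreadyExistsException)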

  /**
   * Write a batch to a temp file, then rename it to the batch file.
   *
   * There may be multiple [[HDFSMetadataLog]] instances using the same metadata path. Although
   * this is not valid behavior, we still need to prevent them from destroying each other's files.
   */
  private def writeBatch(batchId: Long, metadata: T): Unit = {
    val tempPath = writeTempBatch(metadata).getOrElse(
      throw new IllegalStateException(s"Unable to create temp batch file $batchId"))
    try {
      // Try to commit the batch.
      // It will fail if there is an existing file (someone has committed the batch).
      logDebug(s"Attempting to write log #${batchIdToPath(batchId)}")
      fileManager.rename(tempPath, batchIdToPath(batchId))

      // SPARK-17475: HDFSMetadataLog should not leak CRC files.
      // If the underlying filesystem didn't rename the CRC file, delete it.
      val crcPath = new Path(tempPath.getParent(), s".${tempPath.getName()}.crc")
      if (fileManager.exists(crcPath)) fileManager.delete(crcPath)
    } catch {
      case e: IOException if isFileAlreadyExistsException(e) =>
        // If "rename" fails, it means some other "HDFSMetadataLog" has committed the batch.
        // So throw an exception to tell the user this is not valid behavior.
        throw new ConcurrentModificationException(
          s"Multiple HDFSMetadataLog are using $path", e)
    } finally {
      fileManager.delete(tempPath)
    }
  }

  private def isFileAlreadyExistsException(e: IOException): Boolean = {
    e.isInstanceOf[FileAlreadyExistsException] ||
      // Old Hadoop versions don't throw FileAlreadyExistsException. Although it's fixed in
      // HADOOP-9361 in Hadoop 2.5, we still need to support old Hadoop versions.
      (e.getMessage != null && e.getMessage.startsWith("File already exists: "))
  }

  /**
   * @return the deserialized metadata in a batch file, or None if the file does not exist.
   * @throws IllegalArgumentException when the path does not point to a batch file.
   */
  def get(batchFile: Path): Option[T] = {
    if (fileManager.exists(batchFile)) {
      if (isBatchFile(batchFile)) {
        get(pathToBatchId(batchFile))
      } else {
        throw new IllegalArgumentException(s"File ${batchFile} is not a batch file!")
      }
    } else {
      None
    }
  }
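
  // Illustrative contract of get(batchFile) above, following its branches (hypothetical log
  // root "/ckpt/offsets"):
  //   get(new Path("/ckpt/offsets/2"))     => Some(metadata), if batch file "2" exists
  //   get(new Path("/ckpt/offsets/9"))     => None, if no batch file "9" was ever written
  //   get(new Path("/ckpt/offsets/x.tmp")) => IllegalArgumentException (exists, not a batch file)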

  override def get(batchId: Long): Option[T] = {
    val batchMetadataFile = batchIdToPath(batchId)
    if (fileManager.exists(batchMetadataFile)) {
      val input = fileManager.open(batchMetadataFile)
      try {
        Some(deserialize(input))
      } catch {
        case ise: IllegalStateException =>
          // re-throw the exception with the log file path added
          throw new IllegalStateException(
            s"Failed to read log file $batchMetadataFile. ${ise.getMessage}", ise)
      } finally {
        IOUtils.closeQuietly(input)
      }
    } else {
      logDebug(s"Unable to find batch $batchMetadataFile")
      None
    }
  }

  override def get(startId: Option[Long], endId: Option[Long]): Array[(Long, T)] = {
    val files = fileManager.list(metadataPath, batchFilesFilter)
    val batchIds = files
      .map(f => pathToBatchId(f.getPath))
      .filter { batchId =>
        (endId.isEmpty || batchId <= endId.get) && (startId.isEmpty || batchId >= startId.get)
      }
    batchIds.sorted.map(batchId => (batchId, get(batchId))).filter(_._2.isDefined).map {
      case (batchId, metadataOption) =>
        (batchId, metadataOption.get)
    }
  }

  override def getLatest(): Option[(Long, T)] = {
    val batchIds = fileManager.list(metadataPath, batchFilesFilter)
      .map(f => pathToBatchId(f.getPath))
      .sorted
      .reverse
    for (batchId <- batchIds) {
      val batch = get(batchId)
      if (batch.isDefined) {
        return Some((batchId, batch.get))
      }
    }
    None
  }

  /**
   * Get an array of [[FileStatus]] referencing batch files, sorted from the most recent batch
   * file to the oldest.
   */
  def getOrderedBatchFiles(): Array[FileStatus] = {
    fileManager.list(metadataPath, batchFilesFilter)
      .sortBy(f => pathToBatchId(f.getPath))
      .reverse
  }

  /**
   * Removes all log entries earlier than thresholdBatchId (exclusive).
   */
  override def purge(thresholdBatchId: Long): Unit = {
    val batchIds = fileManager.list(metadataPath, batchFilesFilter)
      .map(f => pathToBatchId(f.getPath))

    for (batchId <- batchIds if batchId < thresholdBatchId) {
      val path = batchIdToPath(batchId)
      fileManager.delete(path)
      logTrace(s"Removed metadata log file: $path")
    }
  }

  private def createFileManager(): FileManager = {
    val hadoopConf = sparkSession.sessionState.newHadoopConf()
    try {
      new FileContextManager(metadataPath, hadoopConf)
    } catch {
      case e: UnsupportedFileSystemException =>
        logWarning("Could not use FileContext API for managing metadata log files at path " +
          s"$metadataPath. Using FileSystem API instead for managing log files. The log may be " +
          "inconsistent under failures.")
        new FileSystemManager(metadataPath, hadoopConf)
    }
  }

  /**
   * Parse the log version from the given `text` -- will throw exception when the parsed version
   * exceeds `maxSupportedVersion`, or when `text` is malformed (such as "xyz", "v", "v-1",
   * "v123xyz" etc.)
   */
  private[sql] def parseVersion(text: String, maxSupportedVersion: Int): Int = {
    if (text.length > 0 && text(0) == 'v') {
      val version =
        try {
          text.substring(1, text.length).toInt
        } catch {
          case _: NumberFormatException =>
            throw new IllegalStateException(s"Log file was malformed: failed to read correct log " +
              s"version from $text.")
        }
      if (version > 0) {
        if (version > maxSupportedVersion) {
          throw new IllegalStateException(s"UnsupportedLogVersion: maximum supported log version " +
            s"is v${maxSupportedVersion}, but encountered v$version. The log file was produced " +
            s"by a newer version of Spark and cannot be read by this version. Please upgrade.")
        } else {
          return version
        }
      }
    }

    // reaching here means we failed to read the correct log version
    throw new IllegalStateException(s"Log file was malformed: failed to read correct log " +
      s"version from $text.")
  }
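
  // Illustrative behavior of parseVersion, per the contract documented above:
  //   parseVersion("v1", 1)  == 1
  //   parseVersion("v2", 1)  => IllegalStateException (newer than maxSupportedVersion)
  //   parseVersion("v-1", 1) => IllegalStateException (malformed: non-positive version)
  //   parseVersion("xyz", 1) => IllegalStateException (malformed: missing "v" prefix)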
}

object HDFSMetadataLog {

  /** A simple trait to abstract out the file management operations needed by HDFSMetadataLog. */
  trait FileManager {

    /** List the files in a path that match a filter. */
    def list(path: Path, filter: PathFilter): Array[FileStatus]

    /** Make a directory at the given path, creating parent directories as needed. */
    def mkdirs(path: Path): Unit

    /** Whether the path exists. */
    def exists(path: Path): Boolean

    /** Open a file for reading, or throw an exception if it does not exist. */
    def open(path: Path): FSDataInputStream

    /** Create a path, or throw an exception if it already exists. */
    def create(path: Path): FSDataOutputStream

    /**
     * Atomically rename a path, or throw an exception if it cannot be done.
     * Should throw FileNotFoundException if srcPath does not exist.
     * Should throw FileAlreadyExistsException if destPath already exists.
     */
    def rename(srcPath: Path, destPath: Path): Unit

    /** Recursively delete a path if it exists. Should not throw an exception if it doesn't. */
    def delete(path: Path): Unit

    /** Whether the file system is a local FS. */
    def isLocalFileSystem: Boolean
  }

  /**
   * Default implementation of FileManager using the newer FileContext API.
   */
  class FileContextManager(path: Path, hadoopConf: Configuration) extends FileManager {
    private val fc = if (path.toUri.getScheme == null) {
      FileContext.getFileContext(hadoopConf)
    } else {
      FileContext.getFileContext(path.toUri, hadoopConf)
    }

    override def list(path: Path, filter: PathFilter): Array[FileStatus] = {
      fc.util.listStatus(path, filter)
    }

    override def rename(srcPath: Path, destPath: Path): Unit = {
      fc.rename(srcPath, destPath)
    }

    override def mkdirs(path: Path): Unit = {
      fc.mkdir(path, FsPermission.getDirDefault, true)
    }

    override def open(path: Path): FSDataInputStream = {
      fc.open(path)
    }

    override def create(path: Path): FSDataOutputStream = {
      fc.create(path, EnumSet.of(CreateFlag.CREATE))
    }

    override def exists(path: Path): Boolean = {
      fc.util().exists(path)
    }

    override def delete(path: Path): Unit = {
      try {
        fc.delete(path, true)
      } catch {
        case e: FileNotFoundException =>
          // ignore if the file has already been deleted
      }
    }

    override def isLocalFileSystem: Boolean = fc.getDefaultFileSystem match {
      case _: local.LocalFs | _: local.RawLocalFs =>
        // LocalFs = RawLocalFs + ChecksumFs
        true
      case _ => false
    }
  }
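
  // Illustrative note on the two Hadoop APIs: FileContext.rename above signals conflicts by
  // throwing, whereas FileSystem.rename returns false on failure, so the implementation below
  // has to check the source and destination itself; the exists-check-plus-rename sequence is
  // therefore not atomic.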

  /**
   * Implementation of FileManager using the older FileSystem API. Note that this implementation
   * cannot provide atomic renaming of paths, hence it can lead to consistency issues. It should
   * be used only as a backup option, when FileContextManager cannot be used.
   */
  class FileSystemManager(path: Path, hadoopConf: Configuration) extends FileManager {
    private val fs = path.getFileSystem(hadoopConf)

    override def list(path: Path, filter: PathFilter): Array[FileStatus] = {
      fs.listStatus(path, filter)
    }

    /**
     * Rename a path. Note that this implementation is not atomic.
     * @throws FileNotFoundException if the source path does not exist.
     * @throws FileAlreadyExistsException if the destination path already exists.
     * @throws IOException if renaming fails for some unknown reason.
     */
    override def rename(srcPath: Path, destPath: Path): Unit = {
      if (!fs.exists(srcPath)) {
        throw new FileNotFoundException(s"Source path does not exist: $srcPath")
      }
      if (fs.exists(destPath)) {
        throw new FileAlreadyExistsException(s"Destination path already exists: $destPath")
      }
      if (!fs.rename(srcPath, destPath)) {
        throw new IOException(s"Failed to rename $srcPath to $destPath")
      }
    }

    override def mkdirs(path: Path): Unit = {
      fs.mkdirs(path, FsPermission.getDirDefault)
    }

    override def open(path: Path): FSDataInputStream = {
      fs.open(path)
    }

    override def create(path: Path): FSDataOutputStream = {
      fs.create(path, false)
    }

    override def exists(path: Path): Boolean = {
      fs.exists(path)
    }

    override def delete(path: Path): Unit = {
      try {
        fs.delete(path, true)
      } catch {
        case e: FileNotFoundException =>
          // ignore if the file has already been deleted
      }
    }

    override def isLocalFileSystem: Boolean = fs match {
      case _: LocalFileSystem | _: RawLocalFileSystem =>
        // LocalFileSystem = RawLocalFileSystem + ChecksumFileSystem
        true
      case _ => false
    }
  }
}