/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.streaming

import java.io.File
import java.nio.ByteBuffer

import scala.collection.mutable.ArrayBuffer
import scala.concurrent.duration._
import scala.language.{implicitConversions, postfixOps}
import scala.util.Random

import org.apache.hadoop.conf.Configuration
import org.scalatest.{BeforeAndAfter, Matchers}
import org.scalatest.concurrent.Eventually._

import org.apache.spark.{SparkConf, SparkException, SparkFunSuite}
import org.apache.spark.internal.Logging
import org.apache.spark.storage.StreamBlockId
import org.apache.spark.streaming.receiver.BlockManagerBasedStoreResult
import org.apache.spark.streaming.scheduler._
import org.apache.spark.streaming.util._
import org.apache.spark.streaming.util.WriteAheadLogSuite._
import org.apache.spark.util.{Clock, ManualClock, SystemClock, Utils}

class ReceivedBlockTrackerSuite
  extends SparkFunSuite with BeforeAndAfter with Matchers with Logging {

  val hadoopConf = new Configuration()
  val streamId = 1

  var allReceivedBlockTrackers = new ArrayBuffer[ReceivedBlockTracker]()
  var checkpointDirectory: File = null
  var conf: SparkConf = null

  before {
    conf = new SparkConf().setMaster("local[2]").setAppName("ReceivedBlockTrackerSuite")
    checkpointDirectory = Utils.createTempDir()
  }

  after {
    allReceivedBlockTrackers.foreach { _.stop() }
    Utils.deleteRecursively(checkpointDirectory)
  }

  test("block addition, and block to batch allocation") {
    val receivedBlockTracker = createTracker(setCheckpointDir = false)
    receivedBlockTracker.isWriteAheadLogEnabled should be (false)  // should be disabled by default
    receivedBlockTracker.getUnallocatedBlocks(streamId) shouldEqual Seq.empty

    val blockInfos = generateBlockInfos()
    blockInfos.map(receivedBlockTracker.addBlock)

    // Verify that the added blocks are unallocated
    receivedBlockTracker.getUnallocatedBlocks(streamId) shouldEqual blockInfos
    receivedBlockTracker.hasUnallocatedReceivedBlocks should be (true)

    // Allocate the blocks to a batch and verify that all of them have been allocated
    receivedBlockTracker.allocateBlocksToBatch(1)
    receivedBlockTracker.getBlocksOfBatchAndStream(1, streamId) shouldEqual blockInfos
    receivedBlockTracker.getBlocksOfBatch(1) shouldEqual Map(streamId -> blockInfos)
    receivedBlockTracker.getUnallocatedBlocks(streamId) shouldBe empty
    receivedBlockTracker.hasUnallocatedReceivedBlocks should be (false)

    // Allocate no blocks to another batch
    receivedBlockTracker.allocateBlocksToBatch(2)
    receivedBlockTracker.getBlocksOfBatchAndStream(2, streamId) shouldBe empty
    receivedBlockTracker.getBlocksOfBatch(2) shouldEqual Map(streamId -> Seq.empty)

    // Verify that allocating blocks to an already-allocated batch is a no-op: it returns
    // the same blocks as previously allocated and does not consume newly added blocks
    receivedBlockTracker.allocateBlocksToBatch(1)
    receivedBlockTracker.getBlocksOfBatchAndStream(1, streamId) shouldEqual blockInfos

    blockInfos.map(receivedBlockTracker.addBlock)
    receivedBlockTracker.allocateBlocksToBatch(2)
    receivedBlockTracker.getBlocksOfBatchAndStream(2, streamId) shouldBe empty
    receivedBlockTracker.getUnallocatedBlocks(streamId) shouldEqual blockInfos
  }

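  // The next test exercises the full write ahead log lifecycle: block additions and batch
  // allocations are appended to rolling log files, a restarted tracker replays those events
  // to rebuild its state, and cleaning up old batches both logs a cleanup event and
  // eventually deletes the obsolete log files.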
  test("recovery and cleanup with write ahead logs") {
    val manualClock = new ManualClock

    // Advance the clock by twice the rolling interval (set to 1 second below) so that every
    // increment creates a new log file
    def incrementTime() {
      val timeIncrementMillis = 2000L
      manualClock.advance(timeIncrementMillis)
    }

    // Generate and add blocks to the given tracker
    def addBlockInfos(tracker: ReceivedBlockTracker): Seq[ReceivedBlockInfo] = {
      val blockInfos = generateBlockInfos()
      blockInfos.map(tracker.addBlock)
      blockInfos
    }

    // Print the data present in the write ahead log files in the log directory
    def printLogFiles(message: String) {
      val fileContents = getWriteAheadLogFiles().map { file =>
        (s"\n>>>>> $file: <<<<<\n${getWrittenLogData(file).mkString("\n")}")
      }.mkString("\n")
      logInfo(s"\n\n=====================\n$message\n$fileContents\n=====================\n")
    }

    // Set WAL configuration
    conf.set("spark.streaming.driver.writeAheadLog.rollingIntervalSecs", "1")
    require(WriteAheadLogUtils.getRollingIntervalSecs(conf, isDriver = true) === 1)

    // Start tracker and add blocks
    val tracker1 = createTracker(clock = manualClock)
    tracker1.isWriteAheadLogEnabled should be (true)

    val blockInfos1 = addBlockInfos(tracker1)
    tracker1.getUnallocatedBlocks(streamId).toList shouldEqual blockInfos1

    // Verify that the write ahead log has the correct contents
    val expectedWrittenData1 = blockInfos1.map(BlockAdditionEvent)
    getWrittenLogData() shouldEqual expectedWrittenData1
    getWriteAheadLogFiles() should have size 1

    incrementTime()

    // Restart the tracker without recovering from the WAL, and verify that the list of
    // unallocated blocks is empty
    val tracker1_ = createTracker(clock = manualClock, recoverFromWriteAheadLog = false)
    tracker1_.getUnallocatedBlocks(streamId) shouldBe empty
    tracker1_.hasUnallocatedReceivedBlocks should be (false)

    // Restart the tracker with recovery and verify the recovered list of unallocated blocks
    val tracker2 = createTracker(clock = manualClock, recoverFromWriteAheadLog = true)
    val unallocatedBlocks = tracker2.getUnallocatedBlocks(streamId).toList
    unallocatedBlocks shouldEqual blockInfos1
    unallocatedBlocks.foreach { block =>
      block.isBlockIdValid() should be (false)
    }

    // Allocate blocks to a batch and verify that the unallocated blocks got allocated
    val batchTime1 = manualClock.getTimeMillis()
    tracker2.allocateBlocksToBatch(batchTime1)
    tracker2.getBlocksOfBatchAndStream(batchTime1, streamId) shouldEqual blockInfos1
    tracker2.getBlocksOfBatch(batchTime1) shouldEqual Map(streamId -> blockInfos1)

    // Add more blocks and allocate to another batch
    incrementTime()
    val batchTime2 = manualClock.getTimeMillis()
    val blockInfos2 = addBlockInfos(tracker2)
    tracker2.allocateBlocksToBatch(batchTime2)
    tracker2.getBlocksOfBatchAndStream(batchTime2, streamId) shouldEqual blockInfos2

    // Verify that the log has the correct contents
    val expectedWrittenData2 = expectedWrittenData1 ++
      Seq(createBatchAllocation(batchTime1, blockInfos1)) ++
      blockInfos2.map(BlockAdditionEvent) ++
      Seq(createBatchAllocation(batchTime2, blockInfos2))
    getWrittenLogData() shouldEqual expectedWrittenData2

    // Restart tracker and verify recovered state
    incrementTime()
    val tracker3 = createTracker(clock = manualClock, recoverFromWriteAheadLog = true)
    tracker3.getBlocksOfBatchAndStream(batchTime1, streamId) shouldEqual blockInfos1
    tracker3.getBlocksOfBatchAndStream(batchTime2, streamId) shouldEqual blockInfos2
    tracker3.getUnallocatedBlocks(streamId) shouldBe empty

    // Clean up the first batch but not the second batch
    val oldestLogFile = getWriteAheadLogFiles().head
    incrementTime()
    tracker3.cleanupOldBatches(batchTime2, waitForCompletion = true)

    // Verify that the batch allocations have been cleaned, and that the cleanup event has
    // been written to the log
    tracker3.getBlocksOfBatchAndStream(batchTime1, streamId) shouldEqual Seq.empty
    getWrittenLogData(getWriteAheadLogFiles().last) should contain(createBatchCleanup(batchTime1))

    // Verify that at least one log file gets deleted
    eventually(timeout(10 seconds), interval(10 millisecond)) {
      getWriteAheadLogFiles() should not contain oldestLogFile
    }
    printLogFiles("After clean")

    // Restart tracker and verify recovered state, specifically whether info about the first
    // batch has been removed, but not the second batch
    incrementTime()
    val tracker4 = createTracker(clock = manualClock, recoverFromWriteAheadLog = true)
    tracker4.getUnallocatedBlocks(streamId) shouldBe empty
    tracker4.getBlocksOfBatchAndStream(batchTime1, streamId) shouldBe empty  // should be cleaned
    tracker4.getBlocksOfBatchAndStream(batchTime2, streamId) shouldEqual blockInfos2
  }

  test("disable write ahead log when checkpoint directory is not set") {
    // When checkpointing is disabled, the write ahead log is disabled as well
    val tracker1 = createTracker(setCheckpointDir = false)
    tracker1.isWriteAheadLogEnabled should be (false)
  }

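  // The next test manually writes log files that should already have been deleted, simulating
  // a driver that failed before its fire-and-forget parallel deletes completed, and verifies
  // that recovery still produces consistent tracker state.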
  test("parallel file deletion in FileBasedWriteAheadLog is robust to deletion error") {
    conf.set("spark.streaming.driver.writeAheadLog.rollingIntervalSecs", "1")
    require(WriteAheadLogUtils.getRollingIntervalSecs(conf, isDriver = true) === 1)

    val addBlocks = generateBlockInfos()
    val batch1 = addBlocks.slice(0, 1)
    val batch2 = addBlocks.slice(1, 3)
    val batch3 = addBlocks.slice(3, addBlocks.length)

    assert(getWriteAheadLogFiles().length === 0)

    // List of timestamps for the log files
    val t = Seq.tabulate(5)(i => i * 1000)

    writeEventsManually(getLogFileName(t(0)), Seq(createBatchCleanup(t(0))))
    assert(getWriteAheadLogFiles().length === 1)

    // The goal is to create several log files which should have been cleaned up. If we face
    // any issue during recovery because these old files exist, then we need to make deletion
    // more robust, rather than a parallelized fire-and-forget operation
    val batch1Allocation = createBatchAllocation(t(1), batch1)
    writeEventsManually(getLogFileName(t(1)), batch1.map(BlockAdditionEvent) :+ batch1Allocation)

    writeEventsManually(getLogFileName(t(2)), Seq(createBatchCleanup(t(1))))

    val batch2Allocation = createBatchAllocation(t(3), batch2)
    writeEventsManually(getLogFileName(t(3)), batch2.map(BlockAdditionEvent) :+ batch2Allocation)

    writeEventsManually(getLogFileName(t(4)), batch3.map(BlockAdditionEvent))

    // We should have 5 different log files, as we called `writeEventsManually` with 5
    // different timestamps
    assert(getWriteAheadLogFiles().length === 5)

    // Create the tracker to recover from the log files. We're going to ask the tracker to clean
    // things up, and then we're going to rewrite that data, and recover using a different
    // tracker. They should have identical data no matter what.
    val tracker = createTracker(recoverFromWriteAheadLog = true, clock = new ManualClock(t(4)))

    def compareTrackers(base: ReceivedBlockTracker, subject: ReceivedBlockTracker): Unit = {
      subject.getBlocksOfBatchAndStream(t(3), streamId) should be(
        base.getBlocksOfBatchAndStream(t(3), streamId))
      subject.getBlocksOfBatchAndStream(t(1), streamId) should be(
        base.getBlocksOfBatchAndStream(t(1), streamId))
      subject.getBlocksOfBatchAndStream(t(0), streamId) should be(Nil)
    }

    // Ask the tracker to clean up some old files
    tracker.cleanupOldBatches(t(3), waitForCompletion = true)
    assert(getWriteAheadLogFiles().length === 3)

    val tracker2 = createTracker(recoverFromWriteAheadLog = true, clock = new ManualClock(t(4)))
    compareTrackers(tracker, tracker2)

    // Rewrite the first file
    writeEventsManually(getLogFileName(t(0)), Seq(createBatchCleanup(t(0))))
    assert(getWriteAheadLogFiles().length === 4)
    // Make sure the trackers are consistent
    val tracker3 = createTracker(recoverFromWriteAheadLog = true, clock = new ManualClock(t(4)))
    compareTrackers(tracker, tracker3)

    // Rewrite the second file
    writeEventsManually(getLogFileName(t(1)), batch1.map(BlockAdditionEvent) :+ batch1Allocation)
    assert(getWriteAheadLogFiles().length === 5)
    // Make sure the trackers are consistent
    val tracker4 = createTracker(recoverFromWriteAheadLog = true, clock = new ManualClock(t(4)))
    compareTrackers(tracker, tracker4)
  }

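  // The helpers below create trackers (registering them for cleanup in after()), generate
  // block infos, and read and write write ahead log files directly, so that tests can drive
  // the tracker API while also inspecting its on-disk state.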
  /**
   * Create a tracker object with the optionally provided clock. Use a manual clock if you
   * want to control time by advancing it explicitly, e.g. to test log cleanup.
   */
  def createTracker(
      setCheckpointDir: Boolean = true,
      recoverFromWriteAheadLog: Boolean = false,
      clock: Clock = new SystemClock): ReceivedBlockTracker = {
    val cpDirOption = if (setCheckpointDir) Some(checkpointDirectory.toString) else None
    val tracker = new ReceivedBlockTracker(
      conf, hadoopConf, Seq(streamId), clock, recoverFromWriteAheadLog, cpDirOption)
    allReceivedBlockTrackers += tracker
    tracker
  }

  /** Generate block infos using random ids */
  def generateBlockInfos(): Seq[ReceivedBlockInfo] = {
    List.fill(5)(ReceivedBlockInfo(streamId, Some(0L), None,
      BlockManagerBasedStoreResult(StreamBlockId(streamId, math.abs(Random.nextInt)), Some(0L))))
  }

  /**
   * Write received block tracker events to a file manually.
   */
  def writeEventsManually(filePath: String, events: Seq[ReceivedBlockTrackerLogEvent]): Unit = {
    val writer = HdfsUtils.getOutputStream(filePath, hadoopConf)
    events.foreach { event =>
      val bytes = Utils.serialize(event)
      writer.writeInt(bytes.size)
      writer.write(bytes)
    }
    writer.close()
  }

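  // Note: writeEventsManually frames each event as a 4-byte length followed by the serialized
  // event bytes, which matches the record format that FileBasedWriteAheadLogReader, used by
  // the readers below, expects.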
  /** Get all the data written in the given write ahead log file. */
  def getWrittenLogData(logFile: String): Seq[ReceivedBlockTrackerLogEvent] = {
    getWrittenLogData(Seq(logFile))
  }

  /** Get the log file name for the given log start time. */
  def getLogFileName(time: Long, rollingIntervalSecs: Int = 1): String = {
    checkpointDirectory.toString + File.separator + "receivedBlockMetadata" +
      File.separator + s"log-$time-${time + rollingIntervalSecs * 1000}"
  }

  /**
   * Get all the data written in the given write ahead log files. By default, it will read all
   * files in the test log directory.
   */
  def getWrittenLogData(logFiles: Seq[String] = getWriteAheadLogFiles())
    : Seq[ReceivedBlockTrackerLogEvent] = {
    logFiles.flatMap { file =>
      new FileBasedWriteAheadLogReader(file, hadoopConf).toSeq
    }.flatMap { byteBuffer =>
      // When batching is enabled, each WAL record is itself a serialized batch of records
      val validBuffer = if (WriteAheadLogUtils.isBatchingEnabled(conf, isDriver = true)) {
        Utils.deserialize[Array[Array[Byte]]](byteBuffer.array()).map(ByteBuffer.wrap)
      } else {
        Array(byteBuffer)
      }
      validBuffer.map(b => Utils.deserialize[ReceivedBlockTrackerLogEvent](b.array()))
    }.toList
  }

  /** Get all the write ahead log files in the test directory */
  def getWriteAheadLogFiles(): Seq[String] = {
    import ReceivedBlockTracker._
    val logDir = checkpointDirToLogDir(checkpointDirectory.toString)
    getLogFilesInDirectory(logDir).map { _.toString }
  }

  /** Create a batch allocation event from the given info */
  def createBatchAllocation(time: Long, blockInfos: Seq[ReceivedBlockInfo])
    : BatchAllocationEvent = {
    BatchAllocationEvent(time, AllocatedBlocks(Map(streamId -> blockInfos)))
  }

  /** Create a batch cleanup event from the given info */
  def createBatchCleanup(time: Long, moreTimes: Long*): BatchCleanupEvent = {
    BatchCleanupEvent((Seq(time) ++ moreTimes).map(Time.apply))
  }

  implicit def millisToTime(milliseconds: Long): Time = Time(milliseconds)

  implicit def timeToMillis(time: Time): Long = time.milliseconds
}