/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.streaming

import java.io.File
import java.nio.ByteBuffer

import scala.collection.mutable.ArrayBuffer
import scala.concurrent.duration._
import scala.language.{implicitConversions, postfixOps}
import scala.util.Random

import org.apache.hadoop.conf.Configuration
import org.scalatest.{BeforeAndAfter, Matchers}
import org.scalatest.concurrent.Eventually._

import org.apache.spark.{SparkConf, SparkException, SparkFunSuite}
import org.apache.spark.internal.Logging
import org.apache.spark.storage.StreamBlockId
import org.apache.spark.streaming.receiver.BlockManagerBasedStoreResult
import org.apache.spark.streaming.scheduler._
import org.apache.spark.streaming.util._
import org.apache.spark.streaming.util.WriteAheadLogSuite._
import org.apache.spark.util.{Clock, ManualClock, SystemClock, Utils}

class ReceivedBlockTrackerSuite
  extends SparkFunSuite with BeforeAndAfter with Matchers with Logging {

  val hadoopConf = new Configuration()
  val streamId = 1

  var allReceivedBlockTrackers = new ArrayBuffer[ReceivedBlockTracker]()
  var checkpointDirectory: File = null
  var conf: SparkConf = null

  before {
    conf = new SparkConf().setMaster("local[2]").setAppName("ReceivedBlockTrackerSuite")
    checkpointDirectory = Utils.createTempDir()
  }

  after {
    allReceivedBlockTrackers.foreach { _.stop() }
    Utils.deleteRecursively(checkpointDirectory)
  }

  test("block addition, and block to batch allocation") {
    val receivedBlockTracker = createTracker(setCheckpointDir = false)
    receivedBlockTracker.isWriteAheadLogEnabled should be (false)  // disabled by default
    receivedBlockTracker.getUnallocatedBlocks(streamId) shouldEqual Seq.empty

    val blockInfos = generateBlockInfos()
    blockInfos.map(receivedBlockTracker.addBlock)

    // Verify added blocks are unallocated blocks
    receivedBlockTracker.getUnallocatedBlocks(streamId) shouldEqual blockInfos
    receivedBlockTracker.hasUnallocatedReceivedBlocks should be (true)

    // Allocate the blocks to a batch and verify that all of them have been allocated
    receivedBlockTracker.allocateBlocksToBatch(1)
    receivedBlockTracker.getBlocksOfBatchAndStream(1, streamId) shouldEqual blockInfos
    receivedBlockTracker.getBlocksOfBatch(1) shouldEqual Map(streamId -> blockInfos)
    receivedBlockTracker.getUnallocatedBlocks(streamId) shouldBe empty
    receivedBlockTracker.hasUnallocatedReceivedBlocks should be (false)

    // Allocate no blocks to another batch
    receivedBlockTracker.allocateBlocksToBatch(2)
    receivedBlockTracker.getBlocksOfBatchAndStream(2, streamId) shouldBe empty
    receivedBlockTracker.getBlocksOfBatch(2) shouldEqual Map(streamId -> Seq.empty)

    // Verify that re-allocating an already allocated batch is a no-op and returns
    // the same blocks as previously allocated.
    receivedBlockTracker.allocateBlocksToBatch(1)
    receivedBlockTracker.getBlocksOfBatchAndStream(1, streamId) shouldEqual blockInfos

    blockInfos.map(receivedBlockTracker.addBlock)
    receivedBlockTracker.allocateBlocksToBatch(2)
    receivedBlockTracker.getBlocksOfBatchAndStream(2, streamId) shouldBe empty
    receivedBlockTracker.getUnallocatedBlocks(streamId) shouldEqual blockInfos
  }

  test("recovery and cleanup with write ahead logs") {
    val manualClock = new ManualClock
    // Set the time increment to twice the rolling interval so that every increment creates
    // a new log file

    def incrementTime() {
      val timeIncrementMillis = 2000L
      manualClock.advance(timeIncrementMillis)
    }

    // Generate and add blocks to the given tracker
    def addBlockInfos(tracker: ReceivedBlockTracker): Seq[ReceivedBlockInfo] = {
      val blockInfos = generateBlockInfos()
      blockInfos.map(tracker.addBlock)
      blockInfos
    }

    // Print the data present in the write ahead log files in the log directory
    def printLogFiles(message: String) {
      val fileContents = getWriteAheadLogFiles().map { file =>
        (s"\n>>>>> $file: <<<<<\n${getWrittenLogData(file).mkString("\n")}")
      }.mkString("\n")
      logInfo(s"\n\n=====================\n$message\n$fileContents\n=====================\n")
    }

    // Set WAL configuration
    conf.set("spark.streaming.driver.writeAheadLog.rollingIntervalSecs", "1")
    require(WriteAheadLogUtils.getRollingIntervalSecs(conf, isDriver = true) === 1)

    // Start tracker and add blocks
    val tracker1 = createTracker(clock = manualClock)
    tracker1.isWriteAheadLogEnabled should be (true)

    val blockInfos1 = addBlockInfos(tracker1)
    tracker1.getUnallocatedBlocks(streamId).toList shouldEqual blockInfos1

    // Verify whether write ahead log has correct contents
    val expectedWrittenData1 = blockInfos1.map(BlockAdditionEvent)
    getWrittenLogData() shouldEqual expectedWrittenData1
    getWriteAheadLogFiles() should have size 1

    incrementTime()

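    // With a 1-second rolling interval and 2000 ms manual-clock increments, every call to
    // incrementTime() moves the clock past the rolling interval, so subsequent writes should
    // land in a fresh log file, which is what the file counts below rely on.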
    // Restart without recovering from the WAL and verify the list of unallocated blocks is empty
    val tracker1_ = createTracker(clock = manualClock, recoverFromWriteAheadLog = false)
    tracker1_.getUnallocatedBlocks(streamId) shouldBe empty
    tracker1_.hasUnallocatedReceivedBlocks should be (false)

    // Restart tracker and verify recovered list of unallocated blocks
    val tracker2 = createTracker(clock = manualClock, recoverFromWriteAheadLog = true)
    val unallocatedBlocks = tracker2.getUnallocatedBlocks(streamId).toList
    unallocatedBlocks shouldEqual blockInfos1
    unallocatedBlocks.foreach { block =>
      block.isBlockIdValid() should be (false)
    }

    // Allocate blocks to batch and verify whether the unallocated blocks got allocated
    val batchTime1 = manualClock.getTimeMillis()
    tracker2.allocateBlocksToBatch(batchTime1)
    tracker2.getBlocksOfBatchAndStream(batchTime1, streamId) shouldEqual blockInfos1
    tracker2.getBlocksOfBatch(batchTime1) shouldEqual Map(streamId -> blockInfos1)

    // Add more blocks and allocate to another batch
    incrementTime()
    val batchTime2 = manualClock.getTimeMillis()
    val blockInfos2 = addBlockInfos(tracker2)
    tracker2.allocateBlocksToBatch(batchTime2)
    tracker2.getBlocksOfBatchAndStream(batchTime2, streamId) shouldEqual blockInfos2

    // Verify whether log has correct contents
    val expectedWrittenData2 = expectedWrittenData1 ++
      Seq(createBatchAllocation(batchTime1, blockInfos1)) ++
      blockInfos2.map(BlockAdditionEvent) ++
      Seq(createBatchAllocation(batchTime2, blockInfos2))
    getWrittenLogData() shouldEqual expectedWrittenData2

    // Restart tracker and verify recovered state
    incrementTime()
    val tracker3 = createTracker(clock = manualClock, recoverFromWriteAheadLog = true)
    tracker3.getBlocksOfBatchAndStream(batchTime1, streamId) shouldEqual blockInfos1
    tracker3.getBlocksOfBatchAndStream(batchTime2, streamId) shouldEqual blockInfos2
    tracker3.getUnallocatedBlocks(streamId) shouldBe empty

    // Cleanup first batch but not second batch
    val oldestLogFile = getWriteAheadLogFiles().head
    incrementTime()
    tracker3.cleanupOldBatches(batchTime2, waitForCompletion = true)

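    // After recovery from the WAL the tracker cannot assume the blocks still exist in the
    // executors' block managers, so the recovered block infos carry invalidated block ids.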
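    // cleanupOldBatches removes allocations for batches strictly before batchTime2, so the first
    // batch should be dropped while the second one is retained.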
    // Verify that the batch allocations have been cleaned and the cleanup was written to the log
    tracker3.getBlocksOfBatchAndStream(batchTime1, streamId) shouldEqual Seq.empty
    getWrittenLogData(getWriteAheadLogFiles().last) should contain(createBatchCleanup(batchTime1))

    // Verify that at least one log file gets deleted
    eventually(timeout(10 seconds), interval(10 millisecond)) {
      getWriteAheadLogFiles() should not contain oldestLogFile
    }
    printLogFiles("After clean")

    // Restart tracker and verify recovered state, specifically whether info about the first
    // batch has been removed, but not the second batch
    incrementTime()
    val tracker4 = createTracker(clock = manualClock, recoverFromWriteAheadLog = true)
    tracker4.getUnallocatedBlocks(streamId) shouldBe empty
    tracker4.getBlocksOfBatchAndStream(batchTime1, streamId) shouldBe empty  // should be cleaned
    tracker4.getBlocksOfBatchAndStream(batchTime2, streamId) shouldEqual blockInfos2
  }

  test("disable write ahead log when checkpoint directory is not set") {
    // When the checkpoint directory is not set, the write ahead log is disabled
    val tracker1 = createTracker(setCheckpointDir = false)
    tracker1.isWriteAheadLogEnabled should be (false)
  }

  test("parallel file deletion in FileBasedWriteAheadLog is robust to deletion error") {
    conf.set("spark.streaming.driver.writeAheadLog.rollingIntervalSecs", "1")
    require(WriteAheadLogUtils.getRollingIntervalSecs(conf, isDriver = true) === 1)

    val addBlocks = generateBlockInfos()
    val batch1 = addBlocks.slice(0, 1)
    val batch2 = addBlocks.slice(1, 3)
    val batch3 = addBlocks.slice(3, addBlocks.length)

    assert(getWriteAheadLogFiles().length === 0)

    // list of timestamps for files
    val t = Seq.tabulate(5)(i => i * 1000)

    writeEventsManually(getLogFileName(t(0)), Seq(createBatchCleanup(t(0))))
    assert(getWriteAheadLogFiles().length === 1)

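    // Each timestamp doubles as a log file start time (via getLogFileName) and as the batch time
    // used in the allocation/cleanup events written into that file.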
    // The goal is to create several log files that should already have been cleaned up. If we run
    // into any issue during recovery because these old files exist, then deletion needs to be
    // made more robust rather than a parallelized fire-and-forget operation
    val batch1Allocation = createBatchAllocation(t(1), batch1)
    writeEventsManually(getLogFileName(t(1)), batch1.map(BlockAdditionEvent) :+ batch1Allocation)

    writeEventsManually(getLogFileName(t(2)), Seq(createBatchCleanup(t(1))))

    val batch2Allocation = createBatchAllocation(t(3), batch2)
    writeEventsManually(getLogFileName(t(3)), batch2.map(BlockAdditionEvent) :+ batch2Allocation)

    writeEventsManually(getLogFileName(t(4)), batch3.map(BlockAdditionEvent))

    // We should have 5 different log files as we called `writeEventsManually` with 5 different
    // timestamps
    assert(getWriteAheadLogFiles().length === 5)

    // Create the tracker to recover from the log files. We're going to ask the tracker to clean
    // things up, and then we're going to rewrite that data, and recover using a different tracker.
    // They should have identical data no matter what
    val tracker = createTracker(recoverFromWriteAheadLog = true, clock = new ManualClock(t(4)))

    def compareTrackers(base: ReceivedBlockTracker, subject: ReceivedBlockTracker): Unit = {
      subject.getBlocksOfBatchAndStream(t(3), streamId) should be(
        base.getBlocksOfBatchAndStream(t(3), streamId))
      subject.getBlocksOfBatchAndStream(t(1), streamId) should be(
        base.getBlocksOfBatchAndStream(t(1), streamId))
      subject.getBlocksOfBatchAndStream(t(0), streamId) should be(Nil)
    }

    // ask the tracker to clean up some old files
    tracker.cleanupOldBatches(t(3), waitForCompletion = true)
    assert(getWriteAheadLogFiles().length === 3)

    val tracker2 = createTracker(recoverFromWriteAheadLog = true, clock = new ManualClock(t(4)))
    compareTrackers(tracker, tracker2)

    // rewrite first file
    writeEventsManually(getLogFileName(t(0)), Seq(createBatchCleanup(t(0))))
    assert(getWriteAheadLogFiles().length === 4)
    // make sure trackers are consistent
    val tracker3 = createTracker(recoverFromWriteAheadLog = true, clock = new ManualClock(t(4)))
    compareTrackers(tracker, tracker3)

    // rewrite second file
    writeEventsManually(getLogFileName(t(1)), batch1.map(BlockAdditionEvent) :+ batch1Allocation)
    assert(getWriteAheadLogFiles().length === 5)
    // make sure trackers are consistent
    val tracker4 = createTracker(recoverFromWriteAheadLog = true, clock = new ManualClock(t(4)))
    compareTrackers(tracker, tracker4)
  }

  /**
   * Create a tracker object with the optionally provided clock. Use a manual clock if you
   * want to control time by incrementing it manually to test log cleanup.
   */
  def createTracker(
      setCheckpointDir: Boolean = true,
      recoverFromWriteAheadLog: Boolean = false,
      clock: Clock = new SystemClock): ReceivedBlockTracker = {
    val cpDirOption = if (setCheckpointDir) Some(checkpointDirectory.toString) else None
    val tracker = new ReceivedBlockTracker(
      conf, hadoopConf, Seq(streamId), clock, recoverFromWriteAheadLog, cpDirOption)
    allReceivedBlockTrackers += tracker
    tracker
  }

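    // Keep a handle on every tracker so that the after {} block can stop it during cleanup.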
  /** Generate block infos using random ids */
  def generateBlockInfos(): Seq[ReceivedBlockInfo] = {
    List.fill(5)(ReceivedBlockInfo(streamId, Some(0L), None,
      BlockManagerBasedStoreResult(StreamBlockId(streamId, math.abs(Random.nextInt)), Some(0L))))
  }

  /**
   * Write received block tracker events to a file manually.
   */
  def writeEventsManually(filePath: String, events: Seq[ReceivedBlockTrackerLogEvent]): Unit = {
    val writer = HdfsUtils.getOutputStream(filePath, hadoopConf)
    events.foreach { event =>
      val bytes = Utils.serialize(event)
      writer.writeInt(bytes.size)
      writer.write(bytes)
    }
    writer.close()
  }

  /** Get all the data written in the given write ahead log file. */
  def getWrittenLogData(logFile: String): Seq[ReceivedBlockTrackerLogEvent] = {
    getWrittenLogData(Seq(logFile))
  }

  /** Get the log file name for the given log start time. */
  def getLogFileName(time: Long, rollingIntervalSecs: Int = 1): String = {
    checkpointDirectory.toString + File.separator + "receivedBlockMetadata" +
      File.separator + s"log-$time-${time + rollingIntervalSecs * 1000}"
  }

  /**
   * Get all the data written in the given write ahead log files. By default, it will read all
   * files in the test log directory.
   */
  def getWrittenLogData(logFiles: Seq[String] = getWriteAheadLogFiles)
    : Seq[ReceivedBlockTrackerLogEvent] = {
    logFiles.flatMap {
      file => new FileBasedWriteAheadLogReader(file, hadoopConf).toSeq
    }.flatMap { byteBuffer =>
      val validBuffer = if (WriteAheadLogUtils.isBatchingEnabled(conf, isDriver = true)) {
        Utils.deserialize[Array[Array[Byte]]](byteBuffer.array()).map(ByteBuffer.wrap)
      } else {
        Array(byteBuffer)
      }
      validBuffer.map(b => Utils.deserialize[ReceivedBlockTrackerLogEvent](b.array()))
    }.toList
  }

  /** Get all the write ahead log files in the test directory */
  def getWriteAheadLogFiles(): Seq[String] = {
    import ReceivedBlockTracker._
    val logDir = checkpointDirToLogDir(checkpointDirectory.toString)
    getLogFilesInDirectory(logDir).map { _.toString }
  }

  /** Create batch allocation object from the given info */
  def createBatchAllocation(time: Long, blockInfos: Seq[ReceivedBlockInfo])
    : BatchAllocationEvent = {
    BatchAllocationEvent(time, AllocatedBlocks(Map((streamId -> blockInfos))))
  }

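    // Each info gets a random StreamBlockId; the Some(0L) arguments are the optional record
    // counts carried by the block info and by its BlockManagerBasedStoreResult.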
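    // Write each event in the length-prefixed form used by the file-based WAL writer
    // (an Int record length followed by the serialized event) so that
    // FileBasedWriteAheadLogReader can read these hand-written files back.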
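    // Mirrors the "log-<startTime>-<endTime>" naming convention of the file-based WAL,
    // deriving the end time from the rolling interval.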
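      // When batching is enabled, each WAL record holds a serialized array of individually
      // serialized events (as written by the batching WAL), so unwrap it before deserializing.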
  /** Create batch cleanup object from the given info */
  def createBatchCleanup(time: Long, moreTimes: Long*): BatchCleanupEvent = {
    BatchCleanupEvent((Seq(time) ++ moreTimes).map(Time.apply))
  }

  implicit def millisToTime(milliseconds: Long): Time = Time(milliseconds)

  implicit def timeToMillis(time: Time): Long = time.milliseconds
}