/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.streaming.rdd

import java.io.File

import scala.util.Random

import org.apache.hadoop.conf.Configuration
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}

import org.apache.spark.{SparkConf, SparkContext, SparkException, SparkFunSuite}
import org.apache.spark.internal.config._
import org.apache.spark.serializer.SerializerManager
import org.apache.spark.storage.{BlockId, BlockManager, StorageLevel, StreamBlockId}
import org.apache.spark.streaming.util.{FileBasedWriteAheadLogSegment, FileBasedWriteAheadLogWriter}
import org.apache.spark.util.Utils
class WriteAheadLogBackedBlockRDDSuite
  extends SparkFunSuite with BeforeAndAfterAll with BeforeAndAfterEach {

  val conf = new SparkConf()
    .setMaster("local[2]")
    .setAppName(this.getClass.getSimpleName)

  val hadoopConf = new Configuration()

  var sparkContext: SparkContext = null
  var blockManager: BlockManager = null
  var serializerManager: SerializerManager = null
  var dir: File = null

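  // The SparkContext is created lazily and shared across tests (it is stopped in afterAll),
  // whereas the temporary directory backing the write ahead log is recreated for every test.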
  override def beforeEach(): Unit = {
    super.beforeEach()
    initSparkContext()
    dir = Utils.createTempDir()
  }

  override def afterEach(): Unit = {
    try {
      Utils.deleteRecursively(dir)
    } finally {
      super.afterEach()
    }
  }

  override def afterAll(): Unit = {
    try {
      stopSparkContext()
    } finally {
      super.afterAll()
    }
  }

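  /**
   * Creates the SparkContext if one does not already exist, using the given SparkConf if
   * provided, and caches references to its BlockManager and SerializerManager.
   */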
  private def initSparkContext(_conf: Option[SparkConf] = None): Unit = {
    if (sparkContext == null) {
      sparkContext = new SparkContext(_conf.getOrElse(conf))
      blockManager = sparkContext.env.blockManager
      serializerManager = sparkContext.env.serializerManager
    }
  }

  private def stopSparkContext(): Unit = {
    // Copied from LocalSparkContext; simpler than introducing a test dependency on core tests.
    try {
      if (sparkContext != null) {
        sparkContext.stop()
      }
      System.clearProperty("spark.driver.port")
      blockManager = null
      serializerManager = null
    } finally {
      sparkContext = null
    }
  }

91  test("Read data available in both block manager and write ahead log") {
92    testRDD(numPartitions = 5, numPartitionsInBM = 5, numPartitionsInWAL = 5)
93  }
94
95  test("Read data available only in block manager, not in write ahead log") {
96    testRDD(numPartitions = 5, numPartitionsInBM = 5, numPartitionsInWAL = 0)
97  }
98
99  test("Read data available only in write ahead log, not in block manager") {
100    testRDD(numPartitions = 5, numPartitionsInBM = 0, numPartitionsInWAL = 5)
101  }
102
103  test("Read data with partially available in block manager, and rest in write ahead log") {
104    testRDD(numPartitions = 5, numPartitionsInBM = 3, numPartitionsInWAL = 2)
105  }

  test("Test isBlockValid skips block fetching from BlockManager") {
    testRDD(
      numPartitions = 5, numPartitionsInBM = 5, numPartitionsInWAL = 0, testIsBlockValid = true)
  }

  test("Test whether RDD is valid after removing blocks from block manager") {
    testRDD(
      numPartitions = 5, numPartitionsInBM = 5, numPartitionsInWAL = 5, testBlockRemove = true)
  }

  test("Test storing of blocks recovered from write ahead log back into block manager") {
    testRDD(
      numPartitions = 5, numPartitionsInBM = 0, numPartitionsInWAL = 5, testStoreInBM = true)
  }

  test("read data in block manager and WAL with encryption on") {
    stopSparkContext()
    try {
      val testConf = conf.clone().set(IO_ENCRYPTION_ENABLED, true)
      initSparkContext(Some(testConf))
      testRDD(numPartitions = 5, numPartitionsInBM = 3, numPartitionsInWAL = 2)
    } finally {
      stopSparkContext()
    }
  }

  /**
   * Test the WriteAheadLogBackedBlockRDD by writing some partitions of the data to the block
   * manager and the rest to a write ahead log, and then reading it all back through the RDD.
   * It can also test whether the partitions that were read from the log are stored back into
   * the block manager.
   *
   * @param numPartitions Number of partitions in RDD
   * @param numPartitionsInBM Number of partitions to write to the BlockManager.
   *                          Partitions 0 to (numPartitionsInBM - 1) will be written to the
   *                          BlockManager
   * @param numPartitionsInWAL Number of partitions to write to the Write Ahead Log.
   *                           Partitions (numPartitions - numPartitionsInWAL) to
   *                           (numPartitions - 1) will be written to the WAL
   * @param testIsBlockValid Test whether setting isBlockValid to false skips block fetching
   * @param testBlockRemove Test whether calling rdd.removeBlocks() keeps the RDD usable, with
   *                        reads falling back to the WAL
   * @param testStoreInBM   Test whether blocks read from the log are stored back into the
   *                        block manager
   *
   * Example with numPartitions = 5, numPartitionsInBM = 3, and numPartitionsInWAL = 4
   *
   *   numPartitionsInBM = 3
   *   |------------------|
   *   |                  |
   *    0       1       2       3       4
   *           |                         |
   *           |-------------------------|
   *              numPartitionsInWAL = 4
   */
  private def testRDD(
      numPartitions: Int,
      numPartitionsInBM: Int,
      numPartitionsInWAL: Int,
      testIsBlockValid: Boolean = false,
      testBlockRemove: Boolean = false,
      testStoreInBM: Boolean = false): Unit = {
    require(numPartitionsInBM <= numPartitions,
      "Cannot put more partitions in the BlockManager than there are in the RDD")
    require(numPartitionsInWAL <= numPartitions,
      "Cannot put more partitions in the write ahead log than there are in the RDD")
    val data = Seq.fill(numPartitions, 10)(scala.util.Random.nextString(50))

    // Put the necessary blocks in the block manager
    val blockIds = Array.fill(numPartitions)(StreamBlockId(Random.nextInt(), Random.nextInt()))
    data.zip(blockIds).take(numPartitionsInBM).foreach { case (block, blockId) =>
      blockManager.putIterator(blockId, block.iterator, StorageLevel.MEMORY_ONLY_SER)
    }

    // Generate write ahead log record handles
    val recordHandles = generateFakeRecordHandles(numPartitions - numPartitionsInWAL) ++
      generateWALRecordHandles(data.takeRight(numPartitionsInWAL),
        blockIds.takeRight(numPartitionsInWAL))

    // Make sure that the first `numPartitionsInBM` blocks are in the block manager, and the
    // others are not
    require(
      blockIds.take(numPartitionsInBM).forall(blockManager.get(_).nonEmpty),
      "Expected blocks not in BlockManager"
    )
    require(
      blockIds.takeRight(numPartitions - numPartitionsInBM).forall(blockManager.get(_).isEmpty),
      "Unexpected blocks in BlockManager"
    )

    // Make sure that the last `numPartitionsInWAL` blocks are in the WAL, and the others are not
    require(
      recordHandles.takeRight(numPartitionsInWAL).forall(s =>
        new File(s.path.stripPrefix("file://")).exists()),
      "Expected blocks not in write ahead log"
    )
    require(
      recordHandles.take(numPartitions - numPartitionsInWAL).forall(s =>
        !new File(s.path.stripPrefix("file://")).exists()),
      "Unexpected blocks in write ahead log"
    )

    // Create the RDD and verify whether the returned data is correct
    val rdd = new WriteAheadLogBackedBlockRDD[String](sparkContext, blockIds.toArray,
      recordHandles.toArray, storeInBlockManager = false)
    assert(rdd.collect() === data.flatten)

    // Verify that block fetching is skipped when isBlockValid is set to false.
    // This is done by using an RDD whose data is only in memory but which is set to skip block
    // fetching. Using that RDD will throw an exception, as it skips block fetching even though
    // the blocks are in the BlockManager.
    if (testIsBlockValid) {
      require(numPartitionsInBM === numPartitions, "All partitions must be in BlockManager")
      require(numPartitionsInWAL === 0, "No partitions must be in WAL")
      val rdd2 = new WriteAheadLogBackedBlockRDD[String](sparkContext, blockIds.toArray,
        recordHandles.toArray, isBlockIdValid = Array.fill(blockIds.length)(false))
      intercept[SparkException] {
        rdd2.collect()
      }
    }

    // Verify that the RDD is still valid after the blocks are removed, and that it can still
    // read the data from the write ahead log
    if (testBlockRemove) {
      require(numPartitions === numPartitionsInWAL, "All partitions must be in WAL for this test")
      require(numPartitionsInBM > 0, "Some partitions must be in BlockManager for this test")
      rdd.removeBlocks()
      assert(rdd.collect() === data.flatten)
    }

    if (testStoreInBM) {
      val rdd2 = new WriteAheadLogBackedBlockRDD[String](sparkContext, blockIds.toArray,
        recordHandles.toArray, storeInBlockManager = true, storageLevel = StorageLevel.MEMORY_ONLY)
      assert(rdd2.collect() === data.flatten)
      assert(
        blockIds.forall(blockManager.get(_).nonEmpty),
        "Not all blocks were found in the block manager"
      )
    }
  }

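  /**
   * Writes each block's serialized data to a single write ahead log file and returns the
   * resulting file segments (record handles), one per block.
   */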
  private def generateWALRecordHandles(
      blockData: Seq[Seq[String]],
      blockIds: Seq[BlockId]): Seq[FileBasedWriteAheadLogSegment] = {
    require(blockData.size === blockIds.size)
    val writer = new FileBasedWriteAheadLogWriter(new File(dir, "logFile").toString, hadoopConf)
    val segments = blockData.zip(blockIds).map { case (data, id) =>
      writer.write(serializerManager.dataSerialize(id, data.iterator, allowEncryption = false)
        .toByteBuffer)
    }
    writer.close()
    segments
  }

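  /**
   * Generates record handles that point to a nonexistent file, representing blocks that were
   * never written to any write ahead log.
   */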
  private def generateFakeRecordHandles(count: Int): Seq[FileBasedWriteAheadLogSegment] = {
    Array.fill(count)(new FileBasedWriteAheadLogSegment("random", 0L, 0))
  }
}