/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.streaming.rdd

import java.io.File

import scala.util.Random

import org.apache.hadoop.conf.Configuration
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}

import org.apache.spark.{SparkConf, SparkContext, SparkException, SparkFunSuite}
import org.apache.spark.internal.config._
import org.apache.spark.serializer.SerializerManager
import org.apache.spark.storage.{BlockId, BlockManager, StorageLevel, StreamBlockId}
import org.apache.spark.streaming.util.{FileBasedWriteAheadLogSegment, FileBasedWriteAheadLogWriter}
import org.apache.spark.util.Utils

/**
 * Tests for [[WriteAheadLogBackedBlockRDD]]: reading partitions from the BlockManager, falling
 * back to the write ahead log when a block is missing or marked invalid, and optionally storing
 * blocks recovered from the log back into the BlockManager.
 */
class WriteAheadLogBackedBlockRDDSuite
  extends SparkFunSuite with BeforeAndAfterAll with BeforeAndAfterEach {

  val conf = new SparkConf()
    .setMaster("local[2]")
    .setAppName(this.getClass.getSimpleName)

  val hadoopConf = new Configuration()

  // The SparkContext is shared across tests: it is created on demand in beforeEach (only if none
  // is running) and stopped in afterAll, or explicitly by tests that need a custom configuration.
  var sparkContext: SparkContext = null
  var blockManager: BlockManager = null
  var serializerManager: SerializerManager = null
  // A fresh temporary directory per test; holds the write ahead log file.
  var dir: File = null

  override def beforeEach(): Unit = {
    super.beforeEach()
    initSparkContext()
    dir = Utils.createTempDir()
  }

  override def afterEach(): Unit = {
    try {
      Utils.deleteRecursively(dir)
    } finally {
      super.afterEach()
    }
  }

  override def afterAll(): Unit = {
    try {
      stopSparkContext()
    } finally {
      super.afterAll()
    }
  }

  /**
   * Creates the shared SparkContext, and caches its BlockManager and SerializerManager, unless a
   * context is already running (in which case this is a no-op).
   *
   * @param _conf configuration to use instead of the suite-wide `conf`
   */
  private def initSparkContext(_conf: Option[SparkConf] = None): Unit = {
    if (sparkContext == null) {
      sparkContext = new SparkContext(_conf.getOrElse(conf))
      blockManager = sparkContext.env.blockManager
      serializerManager = sparkContext.env.serializerManager
    }
  }

  /** Stops the shared SparkContext, if any, and clears every reference derived from it. */
  private def stopSparkContext(): Unit = {
    // Copied from LocalSparkContext; simpler than introducing a test dependency on core's tests.
    try {
      if (sparkContext != null) {
        sparkContext.stop()
      }
      System.clearProperty("spark.driver.port")
    } finally {
      // Clear all references even if stop() failed, so later tests never see stale managers.
      sparkContext = null
      blockManager = null
      serializerManager = null
    }
  }

  test("Read data available in both block manager and write ahead log") {
    testRDD(numPartitions = 5, numPartitionsInBM = 5, numPartitionsInWAL = 5)
  }

  test("Read data available only in block manager, not in write ahead log") {
    testRDD(numPartitions = 5, numPartitionsInBM = 5, numPartitionsInWAL = 0)
  }

  test("Read data available only in write ahead log, not in block manager") {
    testRDD(numPartitions = 5, numPartitionsInBM = 0, numPartitionsInWAL = 5)
  }

  test("Read data with partially available in block manager, and rest in write ahead log") {
    testRDD(numPartitions = 5, numPartitionsInBM = 3, numPartitionsInWAL = 2)
  }

  test("Test isBlockValid skips block fetching from BlockManager") {
    testRDD(
      numPartitions = 5, numPartitionsInBM = 5, numPartitionsInWAL = 0, testIsBlockValid = true)
  }

  test("Test whether RDD is valid after removing blocks from block manager") {
    testRDD(
      numPartitions = 5, numPartitionsInBM = 5, numPartitionsInWAL = 5, testBlockRemove = true)
  }

  test("Test storing of blocks recovered from write ahead log back into block manager") {
    testRDD(
      numPartitions = 5, numPartitionsInBM = 0, numPartitionsInWAL = 5, testStoreInBM = true)
  }

  test("read data in block manager and WAL with encryption on") {
    // Replace the shared context with one that has IO encryption enabled; stopping it again
    // afterwards lets the next test's beforeEach recreate a default context.
    stopSparkContext()
    try {
      val testConf = conf.clone().set(IO_ENCRYPTION_ENABLED, true)
      initSparkContext(Some(testConf))
      testRDD(numPartitions = 5, numPartitionsInBM = 3, numPartitionsInWAL = 2)
    } finally {
      stopSparkContext()
    }
  }

  /**
   * Test the WriteAheadLogBackedBlockRDD, by writing some partitions of the data to block manager
   * and the rest to a write ahead log, and then reading it all back using the RDD.
   * It can also test if the partitions that were read from the log were again stored in
   * block manager.
   *
   * @param numPartitions Number of partitions in RDD
   * @param numPartitionsInBM Number of partitions to write to the BlockManager.
   *                          Partitions 0 to (numPartitionsInBM - 1) will be written to
   *                          BlockManager
   * @param numPartitionsInWAL Number of partitions to write to the Write Ahead Log.
   *                           Partitions (numPartitions - numPartitionsInWAL) to
   *                           (numPartitions - 1) will be written to WAL
   * @param testIsBlockValid Test whether setting isBlockValid to false skips block fetching
   * @param testBlockRemove Test whether calling rdd.removeBlocks() makes the RDD still usable with
   *                        reads falling back to the WAL
   * @param testStoreInBM Test whether blocks read from log are stored back into block manager
   *
   * Example with numPartitions = 5, numPartitionsInBM = 3, and numPartitionsInWAL = 4
   *
   *     numPartitionsInBM = 3
   *     |-----------|
   *     |           |
   *     0     1     2     3     4
   *           |                 |
   *           |-----------------|
   *           numPartitionsInWAL = 4
   */
  private def testRDD(
      numPartitions: Int,
      numPartitionsInBM: Int,
      numPartitionsInWAL: Int,
      testIsBlockValid: Boolean = false,
      testBlockRemove: Boolean = false,
      testStoreInBM: Boolean = false): Unit = {
    require(numPartitionsInBM <= numPartitions,
      "Can't put more partitions in BlockManager than that in RDD")
    require(numPartitionsInWAL <= numPartitions,
      "Can't put more partitions in write ahead log than that in RDD")
    val data = Seq.fill(numPartitions, 10)(Random.nextString(50))

    // Put the necessary blocks in the block manager
    val blockIds = Array.fill(numPartitions)(StreamBlockId(Random.nextInt(), Random.nextInt()))
    data.zip(blockIds).take(numPartitionsInBM).foreach { case (block, blockId) =>
      blockManager.putIterator(blockId, block.iterator, StorageLevel.MEMORY_ONLY_SER)
    }

    // Generate write ahead log record handles: fake segments (pointing at a file that does not
    // exist) for the left partitions, and real WAL segments for the right `numPartitionsInWAL`
    val recordHandles = generateFakeRecordHandles(numPartitions - numPartitionsInWAL) ++
      generateWALRecordHandles(data.takeRight(numPartitionsInWAL),
        blockIds.takeRight(numPartitionsInWAL))

    // Make sure that the left `numPartitionsInBM` blocks are in block manager, and others are not
    require(
      blockIds.take(numPartitionsInBM).forall(blockManager.get(_).nonEmpty),
      "Expected blocks not in BlockManager"
    )
    require(
      blockIds.takeRight(numPartitions - numPartitionsInBM).forall(blockManager.get(_).isEmpty),
      "Unexpected blocks in BlockManager"
    )

    // Make sure that the right `numPartitionsInWAL` blocks are in WALs, and others are not
    require(
      recordHandles.takeRight(numPartitionsInWAL).forall(s =>
        new File(s.path.stripPrefix("file://")).exists()),
      "Expected blocks not in write ahead log"
    )
    require(
      recordHandles.take(numPartitions - numPartitionsInWAL).forall(s =>
        !new File(s.path.stripPrefix("file://")).exists()),
      "Unexpected blocks in write ahead log"
    )

    // Create the RDD and verify whether the returned data is correct.
    // `toArray` is needed because Array is invariant: it widens Array[StreamBlockId] to
    // Array[BlockId].
    val rdd = new WriteAheadLogBackedBlockRDD[String](sparkContext, blockIds.toArray,
      recordHandles.toArray, storeInBlockManager = false)
    assert(rdd.collect() === data.flatten)

    // Verify that the block fetching is skipped when isBlockValid is set to false.
    // This is done by using an RDD whose data is only in memory but is set to skip block fetching.
    // Using that RDD will throw an exception, as it skips block fetching even if the blocks are
    // in BlockManager.
    if (testIsBlockValid) {
      require(numPartitionsInBM === numPartitions, "All partitions must be in BlockManager")
      require(numPartitionsInWAL === 0, "No partitions must be in WAL")
      val rdd2 = new WriteAheadLogBackedBlockRDD[String](sparkContext, blockIds.toArray,
        recordHandles.toArray, isBlockIdValid = Array.fill(blockIds.length)(false))
      intercept[SparkException] {
        rdd2.collect()
      }
    }

    // Verify that the RDD remains valid after the blocks are removed and can still read data
    // from write ahead log
    if (testBlockRemove) {
      require(numPartitions === numPartitionsInWAL, "All partitions must be in WAL for this test")
      require(numPartitionsInBM > 0, "Some partitions must be in BlockManager for this test")
      rdd.removeBlocks()
      assert(rdd.collect() === data.flatten)
    }

    // Verify that blocks recovered from the log are written back into the block manager
    if (testStoreInBM) {
      val rdd2 = new WriteAheadLogBackedBlockRDD[String](sparkContext, blockIds.toArray,
        recordHandles.toArray, storeInBlockManager = true, storageLevel = StorageLevel.MEMORY_ONLY)
      assert(rdd2.collect() === data.flatten)
      assert(
        blockIds.forall(blockManager.get(_).nonEmpty),
        "All blocks not found in block manager"
      )
    }
  }

  /**
   * Serializes each block's data and appends it to a single write ahead log file in `dir`.
   *
   * @param blockData data of each block, in order
   * @param blockIds id of each block; must be the same length as `blockData`
   * @return the log segment written for each block, in the same order as the input
   */
  private def generateWALRecordHandles(
      blockData: Seq[Seq[String]],
      blockIds: Seq[BlockId]
    ): Seq[FileBasedWriteAheadLogSegment] = {
    require(blockData.size === blockIds.size)
    val writer = new FileBasedWriteAheadLogWriter(new File(dir, "logFile").toString, hadoopConf)
    // Close the writer even if serialization or a write fails, so the log file is not leaked.
    try {
      blockData.zip(blockIds).map { case (data, id) =>
        writer.write(serializerManager.dataSerialize(id, data.iterator, allowEncryption = false)
          .toByteBuffer)
      }
    } finally {
      writer.close()
    }
  }

  /**
   * Creates `count` placeholder segments whose path ("random") is not expected to exist, so any
   * read of them from the log would fail.
   */
  private def generateFakeRecordHandles(count: Int): Seq[FileBasedWriteAheadLogSegment] = {
    Array.fill(count)(new FileBasedWriteAheadLogSegment("random", 0L, 0))
  }
}