1/* 2 * Licensed to the Apache Software Foundation (ASF) under one or more 3 * contributor license agreements. See the NOTICE file distributed with 4 * this work for additional information regarding copyright ownership. 5 * The ASF licenses this file to You under the Apache License, Version 2.0 6 * (the "License"); you may not use this file except in compliance with 7 * the License. You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 */ 17 18package org.apache.spark.storage 19 20import java.nio.ByteBuffer 21 22import scala.collection.mutable.ArrayBuffer 23import scala.concurrent.duration._ 24import scala.concurrent.Future 25import scala.language.implicitConversions 26import scala.language.postfixOps 27import scala.reflect.ClassTag 28 29import org.mockito.{Matchers => mc} 30import org.mockito.Mockito.{mock, times, verify, when} 31import org.scalatest._ 32import org.scalatest.concurrent.Eventually._ 33import org.scalatest.concurrent.Timeouts._ 34 35import org.apache.spark._ 36import org.apache.spark.broadcast.BroadcastManager 37import org.apache.spark.executor.DataReadMethod 38import org.apache.spark.memory.UnifiedMemoryManager 39import org.apache.spark.network.{BlockDataManager, BlockTransferService} 40import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer} 41import org.apache.spark.network.netty.NettyBlockTransferService 42import org.apache.spark.network.shuffle.BlockFetchingListener 43import org.apache.spark.rpc.RpcEnv 44import org.apache.spark.scheduler.LiveListenerBus 45import org.apache.spark.serializer.{JavaSerializer, KryoSerializer, SerializerManager} 46import 
org.apache.spark.shuffle.sort.SortShuffleManager 47import org.apache.spark.storage.BlockManagerMessages.BlockManagerHeartbeat 48import org.apache.spark.util._ 49import org.apache.spark.util.io.ChunkedByteBuffer 50 51class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterEach 52 with PrivateMethodTester with LocalSparkContext with ResetSystemProperties { 53 54 import BlockManagerSuite._ 55 56 var conf: SparkConf = null 57 var store: BlockManager = null 58 var store2: BlockManager = null 59 var store3: BlockManager = null 60 var rpcEnv: RpcEnv = null 61 var master: BlockManagerMaster = null 62 val securityMgr = new SecurityManager(new SparkConf(false)) 63 val bcastManager = new BroadcastManager(true, new SparkConf(false), securityMgr) 64 val mapOutputTracker = new MapOutputTrackerMaster(new SparkConf(false), bcastManager, true) 65 val shuffleManager = new SortShuffleManager(new SparkConf(false)) 66 67 // Reuse a serializer across tests to avoid creating a new thread-local buffer on each test 68 val serializer = new KryoSerializer(new SparkConf(false).set("spark.kryoserializer.buffer", "1m")) 69 70 // Implicitly convert strings to BlockIds for test clarity. 
implicit def StringToBlockId(value: String): BlockId = new TestBlockId(value)

/** Shorthand for the [[RDDBlockId]] of partition `splitId` of RDD `rddId`. */
def rdd(rddId: Int, splitId: Int): RDDBlockId = RDDBlockId(rddId, splitId)

/**
 * Creates and initializes a [[BlockManager]] for a single test.
 *
 * Mutates the suite's `conf`: both `spark.testing.memory` and `spark.memory.offHeap.size` are
 * set to `maxMem` before the memory manager is built from it.
 *
 * @param maxMem value, in bytes, written to both memory settings above
 * @param name executor id of the new manager; defaults to the driver identifier
 * @param master master the manager talks to; defaults to the suite's master
 * @param transferService transfer service to use; when empty, a new Netty service on localhost
 * @return the block manager after `initialize("app-id")` has been called on it
 */
private def makeBlockManager(
    maxMem: Long,
    name: String = SparkContext.DRIVER_IDENTIFIER,
    master: BlockManagerMaster = this.master,
    transferService: Option[BlockTransferService] = Option.empty): BlockManager = {
  conf.set("spark.testing.memory", maxMem.toString)
  conf.set("spark.memory.offHeap.size", maxMem.toString)
  // Built from the freshly mutated conf; deliberately a different instance from the suite-level
  // `serializer` field (named differently here to avoid shadowing it).
  val blockSerializer = new KryoSerializer(conf)
  val transfer = transferService
    .getOrElse(new NettyBlockTransferService(conf, securityMgr, "localhost", "localhost", 0, 1))
  val memManager = UnifiedMemoryManager(conf, numCores = 1)
  val serializerManager = new SerializerManager(blockSerializer, conf)
  val blockManager = new BlockManager(name, rpcEnv, master, serializerManager, conf,
    memManager, mapOutputTracker, shuffleManager, transfer, securityMgr, 0)
  // Hand the block manager's memory store back to the memory manager before initializing.
  memManager.setMemoryStore(blockManager.memoryStore)
  blockManager.initialize("app-id")
  blockManager
}

/**
 * Builds a fresh conf, RPC environment and [[BlockManagerMaster]] for every test.
 */
override def beforeEach(): Unit = {
  super.beforeEach()
  // Set the arch to 64-bit and compressedOops to true to get a deterministic test-case
  System.setProperty("os.arch", "amd64")
  conf = new SparkConf(false)
    .set("spark.app.id", "test")
    .set("spark.testing", "true")
    .set("spark.memory.fraction", "1")
    .set("spark.memory.storageFraction", "1")
    .set("spark.kryoserializer.buffer", "1m")
    .set("spark.test.useCompressedOops", "true")
    .set("spark.storage.unrollFraction", "0.4")
    .set("spark.storage.unrollMemoryThreshold", "512")

  rpcEnv = RpcEnv.create("test", "localhost", 0, conf, securityMgr)
  conf.set("spark.driver.port", rpcEnv.address.port.toString)

  // Mock SparkContext to reduce the memory usage of tests. It's fine since the only reason we
  // need to create a SparkContext is to initialize LiveListenerBus.
  sc = mock(classOf[SparkContext])
  when(sc.conf).thenReturn(conf)
  master = new BlockManagerMaster(rpcEnv.setupEndpoint("blockmanager",
    new BlockManagerMasterEndpoint(rpcEnv, true, conf,
      new LiveListenerBus(sc))), conf, true)

  // Re-run SizeEstimator's private initializer; presumably so it picks up the arch /
  // compressed-oops settings made above (TODO confirm against SizeEstimator).
  val initialize = PrivateMethod[Unit]('initialize)
  SizeEstimator invokePrivate initialize()
}

/**
 * Stops every block manager the test stored in `store`/`store2`/`store3`, then shuts down the
 * RPC environment. `super.afterEach()` runs even if any of this throws.
 */
override def afterEach(): Unit = {
  try {
    conf = null
    if (store != null) {
      store.stop()
      store = null
    }
    if (store2 != null) {
      store2.stop()
      store2 = null
    }
    if (store3 != null) {
      store3.stop()
      store3 = null
    }
    rpcEnv.shutdown()
    rpcEnv.awaitTermination()
    rpcEnv = null
    master = null
  } finally {
    super.afterEach()
  }
}

test("StorageLevel object caching") {
  val level1 = StorageLevel(false, false, false, 3)
  // this should return the same object as level1
  val level2 = StorageLevel(false, false, false, 3)
  // this should return a different object
  val level3 = StorageLevel(false, false, false, 2)
  assert(level2 === level1, "level2 is not same as level1")
  assert(level2.eq(level1), "level2 is not the same object as level1")
  assert(level3 != level1, "level3 is same as level1")
  val bytes1 = Utils.serialize(level1)
  val level1_ = Utils.deserialize[StorageLevel](bytes1)
  val bytes2 = Utils.serialize(level2)
  val level2_ = Utils.deserialize[StorageLevel](bytes2)
  assert(level1_ === level1, "Deserialized level1 not same as original level1")
  assert(level1_.eq(level1), "Deserialized level1 not the same object as original level1")
  assert(level2_ === level2, "Deserialized level2 not same as original level2")
  assert(level2_.eq(level1), "Deserialized level2 not the same object as original level1")
}

test("BlockManagerId object caching") {
  val id1 = BlockManagerId("e1", "XXX", 1)
  val id2 = BlockManagerId("e1", "XXX", 1) // this should return the same object as id1
  val id3 = BlockManagerId("e1", "XXX", 2) // this should return a different object
  assert(id2 === id1, "id2 is not same as id1")
  assert(id2.eq(id1), "id2 is not the same object as id1")
  assert(id3 != id1, "id3 is same as id1")
  val bytes1 = Utils.serialize(id1)
  val id1_ = Utils.deserialize[BlockManagerId](bytes1)
  val bytes2 = Utils.serialize(id2)
  val id2_ = Utils.deserialize[BlockManagerId](bytes2)
  assert(id1_ === id1, "Deserialized id1 is not same as original id1")
  assert(id1_.eq(id1), "Deserialized id1 is not the same object as original id1")
  assert(id2_ === id2, "Deserialized id2 is not same as original id2")
  assert(id2_.eq(id1), "Deserialized id2 is not the same object as original id1")
}

test("BlockManagerId.isDriver() backwards-compatibility with legacy driver ids (SPARK-6716)") {
  assert(BlockManagerId(SparkContext.DRIVER_IDENTIFIER, "XXX", 1).isDriver)
  assert(BlockManagerId(SparkContext.LEGACY_DRIVER_IDENTIFIER, "XXX", 1).isDriver)
  assert(!BlockManagerId("notADriverIdentifier", "XXX", 1).isDriver)
}

test("master + 1 manager interaction") {
  store = makeBlockManager(20000)
  val a1 = new Array[Byte](4000)
  val a2 = new Array[Byte](4000)
  val a3 = new Array[Byte](4000)

  // Putting a1, a2 and a3 in memory and telling master only about a1 and a2
  store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
  store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY)
  store.putSingle("a3", a3, StorageLevel.MEMORY_ONLY, tellMaster = false)

  // Checking whether blocks are in memory
  assert(store.getSingleAndReleaseLock("a1").isDefined, "a1 was not in store")
  assert(store.getSingleAndReleaseLock("a2").isDefined, "a2 was not in store")
  assert(store.getSingleAndReleaseLock("a3").isDefined, "a3 was not in store")

  // Checking whether master knows about the blocks or not
  assert(master.getLocations("a1").size > 0, "master was not told about a1")
  assert(master.getLocations("a2").size > 0, "master was not told about a2")
  assert(master.getLocations("a3").size === 0, "master was told about a3")

  // Drop a1 and a2 from memory; this should be reported back to the master
  store.dropFromMemoryIfExists("a1", () => null: Either[Array[Any], ChunkedByteBuffer])
  store.dropFromMemoryIfExists("a2", () => null: Either[Array[Any], ChunkedByteBuffer])
  assert(store.getSingleAndReleaseLock("a1") === None, "a1 not removed from store")
  assert(store.getSingleAndReleaseLock("a2") === None, "a2 not removed from store")
  assert(master.getLocations("a1").size === 0, "master did not remove a1")
  assert(master.getLocations("a2").size === 0, "master did not remove a2")
}

test("master + 2 managers interaction") {
  store = makeBlockManager(2000, "exec1")
  store2 = makeBlockManager(2000, "exec2")

  val peers = master.getPeers(store.blockManagerId)
  assert(peers.size === 1, "master did not return the other manager as a peer")
  assert(peers.head === store2.blockManagerId, "peer returned by master is not the other manager")

  val a1 = new Array[Byte](400)
  val a2 = new Array[Byte](400)
  store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY_2)
  store2.putSingle("a2", a2, StorageLevel.MEMORY_ONLY_2)
  assert(master.getLocations("a1").size === 2, "master did not report 2 locations for a1")
  assert(master.getLocations("a2").size === 2, "master did not report 2 locations for a2")
}

test("removing block") {
  store = makeBlockManager(20000)
  val a1 = new Array[Byte](4000)
  val a2 = new Array[Byte](4000)
  val a3 = new Array[Byte](4000)

  // Putting a1, a2 and a3 in memory and telling master only about a1 and a2
  store.putSingle("a1-to-remove", a1, StorageLevel.MEMORY_ONLY)
  store.putSingle("a2-to-remove", a2, StorageLevel.MEMORY_ONLY)
  store.putSingle("a3-to-remove", a3, StorageLevel.MEMORY_ONLY, tellMaster = false)

  // Checking whether blocks are in memory and memory size
  val memStatus = master.getMemoryStatus.head._2
  assert(memStatus._1 == 40000L, "total memory " + memStatus._1 + " should equal 40000")
  assert(memStatus._2 <= 32000L, "remaining memory " + memStatus._2 + " should be <= 32000")
  assert(store.getSingleAndReleaseLock("a1-to-remove").isDefined, "a1 was not in store")
  assert(store.getSingleAndReleaseLock("a2-to-remove").isDefined, "a2 was not in store")
  assert(store.getSingleAndReleaseLock("a3-to-remove").isDefined, "a3 was not in store")

  // Checking whether master knows about the blocks or not
  assert(master.getLocations("a1-to-remove").size > 0, "master was not told about a1")
  assert(master.getLocations("a2-to-remove").size > 0, "master was not told about a2")
  assert(master.getLocations("a3-to-remove").size === 0, "master was told about a3")

  // Remove a1 and a2 and a3. Should be no-op for a3.
  master.removeBlock("a1-to-remove")
  master.removeBlock("a2-to-remove")
  master.removeBlock("a3-to-remove")

  eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
    assert(!store.hasLocalBlock("a1-to-remove"))
    master.getLocations("a1-to-remove") should have size 0
  }
  eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
    assert(!store.hasLocalBlock("a2-to-remove"))
    master.getLocations("a2-to-remove") should have size 0
  }
  eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
    assert(store.hasLocalBlock("a3-to-remove"))
    master.getLocations("a3-to-remove") should have size 0
  }
  eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
    val memStatus = master.getMemoryStatus.head._2
    memStatus._1 should equal (40000L)
    memStatus._2 should equal (40000L)
  }
}

test("removing rdd") {
  store = makeBlockManager(20000)
  val a1 = new Array[Byte](4000)
  val a2 = new Array[Byte](4000)
  val a3 = new Array[Byte](4000)
  // Putting a1, a2 and a3 in memory.
  store.putSingle(rdd(0, 0), a1, StorageLevel.MEMORY_ONLY)
  store.putSingle(rdd(0, 1), a2, StorageLevel.MEMORY_ONLY)
  store.putSingle("nonrddblock", a3, StorageLevel.MEMORY_ONLY)
  master.removeRdd(0, blocking = false)

  eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
    store.getSingleAndReleaseLock(rdd(0, 0)) should be (None)
    master.getLocations(rdd(0, 0)) should have size 0
  }
  eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
    store.getSingleAndReleaseLock(rdd(0, 1)) should be (None)
    master.getLocations(rdd(0, 1)) should have size 0
  }
  eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
    store.getSingleAndReleaseLock("nonrddblock") should not be (None)
    master.getLocations("nonrddblock") should have size (1)
  }

  store.putSingle(rdd(0, 0), a1, StorageLevel.MEMORY_ONLY)
  store.putSingle(rdd(0, 1), a2, StorageLevel.MEMORY_ONLY)
  master.removeRdd(0, blocking = true)
  store.getSingleAndReleaseLock(rdd(0, 0)) should be (None)
  master.getLocations(rdd(0, 0)) should have size 0
  store.getSingleAndReleaseLock(rdd(0, 1)) should be (None)
  master.getLocations(rdd(0, 1)) should have size 0
}

test("removing broadcast") {
  store = makeBlockManager(2000)
  val driverStore = store
  // Track the executor manager in a suite field as well, so afterEach() stops it even when an
  // assertion below fails before the explicit stop at the end of the test.
  store2 = makeBlockManager(2000, "executor")
  val executorStore = store2
  val a1 = new Array[Byte](400)
  val a2 = new Array[Byte](400)
  val a3 = new Array[Byte](400)
  val a4 = new Array[Byte](400)

  val broadcast0BlockId = BroadcastBlockId(0)
  val broadcast1BlockId = BroadcastBlockId(1)
  val broadcast2BlockId = BroadcastBlockId(2)
  val broadcast2BlockId2 = BroadcastBlockId(2, "_")

  // insert broadcast blocks in both the stores
  Seq(driverStore, executorStore).foreach { s =>
    s.putSingle(broadcast0BlockId, a1, StorageLevel.DISK_ONLY)
    s.putSingle(broadcast1BlockId, a2, StorageLevel.DISK_ONLY)
    s.putSingle(broadcast2BlockId, a3, StorageLevel.DISK_ONLY)
    s.putSingle(broadcast2BlockId2, a4, StorageLevel.DISK_ONLY)
  }

  // verify whether the blocks exist in both the stores
  Seq(driverStore, executorStore).foreach { s =>
    assert(s.hasLocalBlock(broadcast0BlockId))
    assert(s.hasLocalBlock(broadcast1BlockId))
    assert(s.hasLocalBlock(broadcast2BlockId))
    assert(s.hasLocalBlock(broadcast2BlockId2))
  }

  // remove broadcast 0 block only from executors
  master.removeBroadcast(0, removeFromMaster = false, blocking = true)

  // only broadcast 0 block should be removed from the executor store
  assert(!executorStore.hasLocalBlock(broadcast0BlockId))
  assert(executorStore.hasLocalBlock(broadcast1BlockId))
  assert(executorStore.hasLocalBlock(broadcast2BlockId))

  // nothing should be removed from the driver store
  assert(driverStore.hasLocalBlock(broadcast0BlockId))
  assert(driverStore.hasLocalBlock(broadcast1BlockId))
  assert(driverStore.hasLocalBlock(broadcast2BlockId))

  // remove broadcast 0 block from the driver as well
  master.removeBroadcast(0, removeFromMaster = true, blocking = true)
  assert(!driverStore.hasLocalBlock(broadcast0BlockId))
  assert(driverStore.hasLocalBlock(broadcast1BlockId))

  // remove broadcast 1 block from both the stores asynchronously
  // and verify all broadcast 1 blocks have been removed
  master.removeBroadcast(1, removeFromMaster = true, blocking = false)
  eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
    assert(!driverStore.hasLocalBlock(broadcast1BlockId))
    assert(!executorStore.hasLocalBlock(broadcast1BlockId))
  }

  // remove broadcast 2 from both the stores asynchronously
  // and verify all broadcast 2 blocks have been removed
  master.removeBroadcast(2, removeFromMaster = true, blocking = false)
  eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
    assert(!driverStore.hasLocalBlock(broadcast2BlockId))
    assert(!driverStore.hasLocalBlock(broadcast2BlockId2))
    assert(!executorStore.hasLocalBlock(broadcast2BlockId))
    assert(!executorStore.hasLocalBlock(broadcast2BlockId2))
  }
  executorStore.stop()
  store2 = null
  driverStore.stop()
  store = null
}

test("reregistration on heart beat") {
  store = makeBlockManager(2000)
  val a1 = new Array[Byte](400)

  store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)

  assert(store.getSingleAndReleaseLock("a1").isDefined, "a1 was not in store")
  assert(master.getLocations("a1").size > 0, "master was not told about a1")

  master.removeExecutor(store.blockManagerId.executorId)
  assert(master.getLocations("a1").size == 0, "a1 was not removed from master")

  // The heartbeat reply is negated: `reregister` is true when the master answered false.
  val reregister = !master.driverEndpoint.askWithRetry[Boolean](
    BlockManagerHeartbeat(store.blockManagerId))
  assert(reregister)
}

test("reregistration on block update") {
  store = makeBlockManager(2000)
  val a1 = new Array[Byte](400)
  val a2 = new Array[Byte](400)

  store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
  assert(master.getLocations("a1").size > 0, "master was not told about a1")

  master.removeExecutor(store.blockManagerId.executorId)
  assert(master.getLocations("a1").size == 0, "a1 was not removed from master")

  store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY)
  store.waitForAsyncReregister()

  assert(master.getLocations("a1").size > 0, "a1 was not reregistered with master")
  assert(master.getLocations("a2").size > 0, "master was not told about a2")
}

test("reregistration doesn't dead lock") {
  store = makeBlockManager(2000)
  val a1 = new Array[Byte](400)
  val a2 = List(new Array[Byte](400))

  // try many times to trigger any deadlocks
  for (_ <- 1 to 100) {
    master.removeExecutor(store.blockManagerId.executorId)
    val t1 = new Thread {
      override def run() {
        store.putIterator(
          "a2", a2.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
      }
    }
    val t2 = new Thread {
      override def run() {
        store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
      }
    }
    val t3 = new Thread {
      override def run() {
        store.reregister()
      }
    }

    t1.start()
    t2.start()
    t3.start()
    t1.join()
    t2.join()
    t3.join()

    store.dropFromMemoryIfExists("a1", () => null: Either[Array[Any], ChunkedByteBuffer])
    store.dropFromMemoryIfExists("a2", () => null: Either[Array[Any], ChunkedByteBuffer])
    store.waitForAsyncReregister()
  }
}

test("correct BlockResult returned from get() calls") {
  store = makeBlockManager(12000)
  val list1 = List(new Array[Byte](2000), new Array[Byte](2000))
  val list2 = List(new Array[Byte](500), new Array[Byte](1000), new Array[Byte](1500))
  val list1SizeEstimate = SizeEstimator.estimate(list1.iterator.toArray)
  val list2SizeEstimate = SizeEstimator.estimate(list2.iterator.toArray)
  store.putIterator(
    "list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
  store.putIterator(
    "list2memory", list2.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
  store.putIterator(
    "list2disk", list2.iterator, StorageLevel.DISK_ONLY, tellMaster = true)
  val list1Get = store.get("list1")
  assert(list1Get.isDefined, "list1 expected to be in store")
  assert(list1Get.get.data.size === 2)
  assert(list1Get.get.bytes === list1SizeEstimate)
  assert(list1Get.get.readMethod === DataReadMethod.Memory)
  val list2MemoryGet = store.get("list2memory")
  assert(list2MemoryGet.isDefined, "list2memory expected to be in store")
  assert(list2MemoryGet.get.data.size === 3)
  assert(list2MemoryGet.get.bytes === list2SizeEstimate)
  assert(list2MemoryGet.get.readMethod === DataReadMethod.Memory)
  val list2DiskGet = store.get("list2disk")
  assert(list2DiskGet.isDefined, "list2disk expected to be in store")
  assert(list2DiskGet.get.data.size === 3)
  // We don't know the exact size of the data on disk, but it should certainly be > 0.
  assert(list2DiskGet.get.bytes > 0)
  assert(list2DiskGet.get.readMethod === DataReadMethod.Disk)
}

test("optimize a location order of blocks") {
  val localHost = Utils.localHostName()
  val otherHost = "otherHost"
  val bmMaster = mock(classOf[BlockManagerMaster])
  val bmId1 = BlockManagerId("id1", localHost, 1)
  val bmId2 = BlockManagerId("id2", localHost, 2)
  val bmId3 = BlockManagerId("id3", otherHost, 3)
  when(bmMaster.getLocations(mc.any[BlockId])).thenReturn(Seq(bmId1, bmId2, bmId3))

  // Stored in `store` so that afterEach() stops this manager once the test finishes.
  store = makeBlockManager(128, "exec", bmMaster)
  val getLocations = PrivateMethod[Seq[BlockManagerId]]('getLocations)
  val locations = store invokePrivate getLocations(BroadcastBlockId(0))
  // NOTE(review): a Set comparison checks neither the ordering nor the duplicate localHost entry;
  // consider asserting on the sequence once the expected host ordering is confirmed.
  assert(locations.map(_.host).toSet === Set(localHost, localHost, otherHost))
}

test("SPARK-9591: getRemoteBytes from another location when Exception throw") {
  conf.set("spark.shuffle.io.maxRetries", "0")
  store = makeBlockManager(8000, "executor1")
  store2 = makeBlockManager(8000, "executor2")
  store3 = makeBlockManager(8000, "executor3")
  val list1 = List(new Array[Byte](4000))
  store2.putIterator(
    "list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
  store3.putIterator(
    "list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
  assert(store.getRemoteBytes("list1").isDefined, "list1Get expected to be fetched")
  store2.stop()
  store2 = null
  assert(store.getRemoteBytes("list1").isDefined, "list1Get expected to be fetched")
  store3.stop()
  store3 = null
  // Should return None instead of throwing an exception:
  assert(store.getRemoteBytes("list1").isEmpty)
}

test("SPARK-14252: getOrElseUpdate should still read from remote storage") {
  store = makeBlockManager(8000, "executor1")
  store2 = makeBlockManager(8000, "executor2")
  val list1 = List(new Array[Byte](4000))
  store2.putIterator(
    "list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
  assert(store.getOrElseUpdate(
    "list1",
    StorageLevel.MEMORY_ONLY,
    ClassTag.Any,
    () => throw new AssertionError("attempted to compute locally")).isLeft)
}

test("in-memory LRU storage") {
  testInMemoryLRUStorage(StorageLevel.MEMORY_ONLY)
}

test("in-memory LRU storage with serialization") {
  testInMemoryLRUStorage(StorageLevel.MEMORY_ONLY_SER)
}

test("in-memory LRU storage with off-heap") {
  testInMemoryLRUStorage(StorageLevel(
    useDisk = false,
    useMemory = true,
    useOffHeap = true,
    deserialized = false, replication = 1))
}

/**
 * Puts three 4000-byte arrays at `storageLevel` into a 12000-byte store and checks which blocks
 * survive as later reads and puts change recency.
 *
 * @param storageLevel level used for every put in the scenario
 */
private def testInMemoryLRUStorage(storageLevel: StorageLevel): Unit = {
  store = makeBlockManager(12000)
  val a1 = new Array[Byte](4000)
  val a2 = new Array[Byte](4000)
  val a3 = new Array[Byte](4000)
  store.putSingle("a1", a1, storageLevel)
  store.putSingle("a2", a2, storageLevel)
  store.putSingle("a3", a3, storageLevel)
  assert(store.getSingleAndReleaseLock("a2").isDefined, "a2 was not in store")
  assert(store.getSingleAndReleaseLock("a3").isDefined, "a3 was not in store")
  assert(store.getSingleAndReleaseLock("a1") === None, "a1 was in store")
  assert(store.getSingleAndReleaseLock("a2").isDefined, "a2 was not in store")
  // At this point a2 was gotten last, so LRU will get rid of a3
  store.putSingle("a1", a1, storageLevel)
  assert(store.getSingleAndReleaseLock("a1").isDefined, "a1 was not in store")
  assert(store.getSingleAndReleaseLock("a2").isDefined, "a2 was not in store")
  assert(store.getSingleAndReleaseLock("a3") === None, "a3 was in store")
}

test("in-memory LRU for partitions of same RDD") {
  store = makeBlockManager(12000)
  val a1 = new Array[Byte](4000)
  val a2 = new Array[Byte](4000)
  val a3 = new Array[Byte](4000)
  store.putSingle(rdd(0, 1), a1, StorageLevel.MEMORY_ONLY)
  store.putSingle(rdd(0, 2), a2, StorageLevel.MEMORY_ONLY)
  store.putSingle(rdd(0, 3), a3, StorageLevel.MEMORY_ONLY)
  // Even though we accessed rdd_0_3 last, it should not have replaced partitions 1 and 2
  // from the same RDD
  assert(store.getSingleAndReleaseLock(rdd(0, 3)) === None, "rdd_0_3 was in store")
  assert(store.getSingleAndReleaseLock(rdd(0, 2)).isDefined, "rdd_0_2 was not in store")
  assert(store.getSingleAndReleaseLock(rdd(0, 1)).isDefined, "rdd_0_1 was not in store")
  // Check that rdd_0_3 doesn't replace them even after further accesses
  assert(store.getSingleAndReleaseLock(rdd(0, 3)) === None, "rdd_0_3 was in store")
  assert(store.getSingleAndReleaseLock(rdd(0, 3)) === None, "rdd_0_3 was in store")
  assert(store.getSingleAndReleaseLock(rdd(0, 3)) === None, "rdd_0_3 was in store")
}

test("in-memory LRU for partitions of multiple RDDs") {
  store = makeBlockManager(12000)
  store.putSingle(rdd(0, 1), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
  store.putSingle(rdd(0, 2), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
  store.putSingle(rdd(1, 1), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
  // At this point rdd_1_1 should've replaced rdd_0_1
  assert(store.memoryStore.contains(rdd(1, 1)), "rdd_1_1 was not in store")
  assert(!store.memoryStore.contains(rdd(0, 1)), "rdd_0_1 was in store")
  assert(store.memoryStore.contains(rdd(0, 2)), "rdd_0_2 was not in store")
  // Do a get() on rdd_0_2 so that it is the most recently used item
  assert(store.getSingleAndReleaseLock(rdd(0, 2)).isDefined, "rdd_0_2 was not in store")
  // Put in more partitions from RDD 0; they should replace rdd_1_1
  store.putSingle(rdd(0, 3), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
  store.putSingle(rdd(0, 4), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
  // Now rdd_1_1 should be dropped to add rdd_0_3, but then rdd_0_2 should *not* be dropped
  // when we try to add rdd_0_4.
  assert(!store.memoryStore.contains(rdd(1, 1)), "rdd_1_1 was in store")
  assert(!store.memoryStore.contains(rdd(0, 1)), "rdd_0_1 was in store")
  assert(!store.memoryStore.contains(rdd(0, 4)), "rdd_0_4 was in store")
  assert(store.memoryStore.contains(rdd(0, 2)), "rdd_0_2 was not in store")
  assert(store.memoryStore.contains(rdd(0, 3)), "rdd_0_3 was not in store")
}

test("on-disk storage") {
  store = makeBlockManager(1200)
  val a1 = new Array[Byte](400)
  val a2 = new Array[Byte](400)
  val a3 = new Array[Byte](400)
  store.putSingle("a1", a1, StorageLevel.DISK_ONLY)
  store.putSingle("a2", a2, StorageLevel.DISK_ONLY)
  store.putSingle("a3", a3, StorageLevel.DISK_ONLY)
  assert(store.getSingleAndReleaseLock("a2").isDefined, "a2 was not in store")
  assert(store.getSingleAndReleaseLock("a3").isDefined, "a3 was not in store")
  assert(store.getSingleAndReleaseLock("a1").isDefined, "a1 was not in store")
}

test("disk and memory storage") {
  testDiskAndMemoryStorage(StorageLevel.MEMORY_AND_DISK, getAsBytes = false)
}

test("disk and memory storage with getLocalBytes") {
  testDiskAndMemoryStorage(StorageLevel.MEMORY_AND_DISK, getAsBytes = true)
}

test("disk and memory storage with serialization") {
  testDiskAndMemoryStorage(StorageLevel.MEMORY_AND_DISK_SER, getAsBytes = false)
}

test("disk and memory storage with serialization and getLocalBytes") {
  testDiskAndMemoryStorage(StorageLevel.MEMORY_AND_DISK_SER, getAsBytes = true)
}

test("disk and off-heap memory storage") {
  testDiskAndMemoryStorage(StorageLevel.OFF_HEAP, getAsBytes = false)
}

test("disk and off-heap memory storage with getLocalBytes") {
  testDiskAndMemoryStorage(StorageLevel.OFF_HEAP, getAsBytes = true)
}

/**
 * Puts three 4000-byte arrays at `storageLevel` into a 12000-byte store and reads each back.
 * Afterwards it checks whether "a1" is present in the memory store.
 *
 * @param storageLevel level used for all three puts
 * @param getAsBytes when true, reads go through `getLocalBytesAndReleaseLock`; otherwise through
 *                   `getSingleAndReleaseLock`
 */
def testDiskAndMemoryStorage(
    storageLevel: StorageLevel,
    getAsBytes: Boolean): Unit = {
  store = makeBlockManager(12000)
  val accessMethod =
    if (getAsBytes) store.getLocalBytesAndReleaseLock else store.getSingleAndReleaseLock
  val a1 = new Array[Byte](4000)
  val a2 = new Array[Byte](4000)
  val a3 = new Array[Byte](4000)
  store.putSingle("a1", a1, storageLevel)
  store.putSingle("a2", a2, storageLevel)
  store.putSingle("a3", a3, storageLevel)
  assert(accessMethod("a2").isDefined, "a2 was not in store")
  assert(accessMethod("a3").isDefined, "a3 was not in store")
  assert(accessMethod("a1").isDefined, "a1 was not in store")
  val dataShouldHaveBeenCachedBackIntoMemory = {
    if (storageLevel.deserialized) {
      !getAsBytes
    } else {
      // If the block's storage level is serialized, then always cache the bytes in memory, even
      // if the caller requested values.
      true
    }
  }
  if (dataShouldHaveBeenCachedBackIntoMemory) {
    assert(store.memoryStore.contains("a1"), "a1 was not in memory store")
  } else {
    assert(!store.memoryStore.contains("a1"), "a1 was in memory store")
  }
}

test("LRU with mixed storage levels") {
  store = makeBlockManager(12000)
  val a1 = new Array[Byte](4000)
  val a2 = new Array[Byte](4000)
  val a3 = new Array[Byte](4000)
  val a4 = new Array[Byte](4000)
  // First store a1 and a2, both in memory, and a3, on disk only
  store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY_SER)
  store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY_SER)
  store.putSingle("a3", a3, StorageLevel.DISK_ONLY)
  // At this point LRU should not kick in because a3 is only on disk
  assert(store.getSingleAndReleaseLock("a1").isDefined, "a1 was not in store")
  assert(store.getSingleAndReleaseLock("a2").isDefined, "a2 was not in store")
  assert(store.getSingleAndReleaseLock("a3").isDefined, "a3 was not in store")
  // Now let's add in a4, which uses both disk and memory; a1 should drop out
  store.putSingle("a4", a4, StorageLevel.MEMORY_AND_DISK_SER)
  assert(store.getSingleAndReleaseLock("a1") === None, "a1 was in store")
  assert(store.getSingleAndReleaseLock("a2").isDefined, "a2 was not in store")
  assert(store.getSingleAndReleaseLock("a3").isDefined, "a3 was not in store")
  assert(store.getSingleAndReleaseLock("a4").isDefined, "a4 was not in store")
}

test("in-memory LRU with streams") {
  store = makeBlockManager(12000)
  val list1 = List(new Array[Byte](2000), new Array[Byte](2000))
  val list2 = List(new Array[Byte](2000), new Array[Byte](2000))
  val list3 = List(new Array[Byte](2000), new Array[Byte](2000))
  store.putIterator(
    "list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
  store.putIterator(
    "list2", list2.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
  store.putIterator(
    "list3", list3.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
  assert(store.getAndReleaseLock("list2").isDefined, "list2 was not in store")
  assert(store.get("list2").get.data.size === 2)
  assert(store.getAndReleaseLock("list3").isDefined, "list3 was not in store")
  assert(store.get("list3").get.data.size === 2)
  assert(store.getAndReleaseLock("list1") === None, "list1 was in store")
  assert(store.getAndReleaseLock("list2").isDefined, "list2 was not in store")
  assert(store.get("list2").get.data.size === 2)
  // At this point list2 was gotten last, so LRU will get rid of list3
  store.putIterator(
    "list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
  assert(store.getAndReleaseLock("list1").isDefined, "list1 was not in store")
  assert(store.get("list1").get.data.size === 2)
  assert(store.getAndReleaseLock("list2").isDefined, "list2 was not in store")
  assert(store.get("list2").get.data.size === 2)
  assert(store.getAndReleaseLock("list3") === None, "list3 was in store")
}

// ----- Mixed-level LRU, compression, block-status and remote-fetch tests, plus test helpers -----

  test("LRU with mixed storage levels and streams") {
    store = makeBlockManager(12000)
    val list1 = List(new Array[Byte](2000), new Array[Byte](2000))
    val list2 = List(new Array[Byte](2000), new Array[Byte](2000))
    val list3 = List(new Array[Byte](2000), new Array[Byte](2000))
    val list4 = List(new Array[Byte](2000), new Array[Byte](2000))
    // First store list1 and list2, both in memory, and list3, on disk only
    store.putIterator(
      "list1", list1.iterator, StorageLevel.MEMORY_ONLY_SER, tellMaster = true)
    store.putIterator(
      "list2", list2.iterator, StorageLevel.MEMORY_ONLY_SER, tellMaster = true)
    store.putIterator(
      "list3", list3.iterator, StorageLevel.DISK_ONLY, tellMaster = true)
    // At this point LRU should not kick in because list3 is only on disk
    assert(store.getAndReleaseLock("list1").isDefined, "list1 was not in store")
    assert(store.get("list1").get.data.size === 2)
    assert(store.getAndReleaseLock("list2").isDefined, "list2 was not in store")
    assert(store.get("list2").get.data.size === 2)
    assert(store.getAndReleaseLock("list3").isDefined, "list3 was not in store")
    assert(store.get("list3").get.data.size === 2)
    assert(store.getAndReleaseLock("list1").isDefined, "list1 was not in store")
    assert(store.get("list1").get.data.size === 2)
    assert(store.getAndReleaseLock("list2").isDefined, "list2 was not in store")
    assert(store.get("list2").get.data.size === 2)
    assert(store.getAndReleaseLock("list3").isDefined, "list3 was not in store")
    assert(store.get("list3").get.data.size === 2)
    // Now let's add in list4, which uses both disk and memory; list1 should drop out
    store.putIterator(
      "list4", list4.iterator, StorageLevel.MEMORY_AND_DISK_SER, tellMaster = true)
    assert(store.getAndReleaseLock("list1") === None, "list1 was in store")
    assert(store.getAndReleaseLock("list2").isDefined, "list2 was not in store")
    assert(store.get("list2").get.data.size === 2)
    assert(store.getAndReleaseLock("list3").isDefined, "list3 was not in store")
    assert(store.get("list3").get.data.size === 2)
    assert(store.getAndReleaseLock("list4").isDefined, "list4 was not in store")
    assert(store.get("list4").get.data.size === 2)
  }

  test("negative byte values in ByteBufferInputStream") {
    // Array.map with a Byte-valued function already yields an Array[Byte].
    val buffer = ByteBuffer.wrap(Array[Int](254, 255, 0, 1, 2).map(_.toByte))
    val stream = new ByteBufferInputStream(buffer)
    val temp = new Array[Byte](10)
    assert(stream.read() === 254, "unexpected byte read")
    assert(stream.read() === 255, "unexpected byte read")
    assert(stream.read() === 0, "unexpected byte read")
    assert(stream.read(temp, 0, temp.length) === 2, "unexpected number of bytes read")
    assert(stream.read() === -1, "end of stream not signalled")
    assert(stream.read(temp, 0, temp.length) === -1, "end of stream not signalled")
  }

  test("overly large block") {
    store = makeBlockManager(5000)
    store.putSingle("a1", new Array[Byte](10000), StorageLevel.MEMORY_ONLY)
    assert(store.getSingleAndReleaseLock("a1") === None, "a1 was in store")
    store.putSingle("a2", new Array[Byte](10000), StorageLevel.MEMORY_AND_DISK)
    assert(!store.memoryStore.contains("a2"), "a2 was in memory store")
    assert(store.getSingleAndReleaseLock("a2").isDefined, "a2 was not in store")
  }

  test("block compression") {
    try {
      conf.set("spark.shuffle.compress", "true")
      store = makeBlockManager(20000, "exec1")
      store.putSingle(
        ShuffleBlockId(0, 0, 0), new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
      assert(store.memoryStore.getSize(ShuffleBlockId(0, 0, 0)) <= 100,
        "shuffle_0_0_0 was not compressed")
      store.stop()
      store = null

      conf.set("spark.shuffle.compress", "false")
      store = makeBlockManager(20000, "exec2")
      store.putSingle(
        ShuffleBlockId(0, 0, 0), new Array[Byte](10000), StorageLevel.MEMORY_ONLY_SER)
      assert(store.memoryStore.getSize(ShuffleBlockId(0, 0, 0)) >= 10000,
        "shuffle_0_0_0 was compressed")
      store.stop()
      store = null

      conf.set("spark.broadcast.compress", "true")
      store = makeBlockManager(20000, "exec3")
      store.putSingle(
        BroadcastBlockId(0), new Array[Byte](10000), StorageLevel.MEMORY_ONLY_SER)
      assert(store.memoryStore.getSize(BroadcastBlockId(0)) <= 1000,
        "broadcast_0 was not compressed")
      store.stop()
      store = null

      conf.set("spark.broadcast.compress", "false")
      store = makeBlockManager(20000, "exec4")
      store.putSingle(
        BroadcastBlockId(0), new Array[Byte](10000), StorageLevel.MEMORY_ONLY_SER)
      assert(store.memoryStore.getSize(BroadcastBlockId(0)) >= 10000, "broadcast_0 was compressed")
      store.stop()
      store = null

      conf.set("spark.rdd.compress", "true")
      store = makeBlockManager(20000, "exec5")
      store.putSingle(rdd(0, 0), new Array[Byte](10000), StorageLevel.MEMORY_ONLY_SER)
      assert(store.memoryStore.getSize(rdd(0, 0)) <= 1000, "rdd_0_0 was not compressed")
      store.stop()
      store = null

      conf.set("spark.rdd.compress", "false")
      store = makeBlockManager(20000, "exec6")
      store.putSingle(rdd(0, 0), new Array[Byte](10000), StorageLevel.MEMORY_ONLY_SER)
      assert(store.memoryStore.getSize(rdd(0, 0)) >= 10000, "rdd_0_0 was compressed")
      store.stop()
      store = null

      // Check that any other block types are also kept uncompressed
      store = makeBlockManager(20000, "exec7")
      store.putSingle("other_block", new Array[Byte](10000), StorageLevel.MEMORY_ONLY)
      assert(store.memoryStore.getSize("other_block") >= 10000, "other_block was compressed")
      store.stop()
      store = null
    } finally {
      // NOTE(review): the settings above are applied with conf.set, not as system properties,
      // so these clearProperty calls look like a leftover safety net — confirm before removing.
      System.clearProperty("spark.shuffle.compress")
      System.clearProperty("spark.broadcast.compress")
      System.clearProperty("spark.rdd.compress")
    }
  }

  test("block store put failure") {
    // Use Java serializer so we can create an unserializable error.
    conf.set("spark.testing.memory", "1200")
    val transfer = new NettyBlockTransferService(conf, securityMgr, "localhost", "localhost", 0, 1)
    val memoryManager = UnifiedMemoryManager(conf, numCores = 1)
    val serializerManager = new SerializerManager(new JavaSerializer(conf), conf)
    store = new BlockManager(SparkContext.DRIVER_IDENTIFIER, rpcEnv, master,
      serializerManager, conf, memoryManager, mapOutputTracker,
      shuffleManager, transfer, securityMgr, 0)
    memoryManager.setMemoryStore(store.memoryStore)
    store.initialize("app-id")

    // The put should fail since a1 is not serializable.
    class UnserializableClass
    val a1 = new UnserializableClass
    intercept[java.io.NotSerializableException] {
      store.putSingle("a1", a1, StorageLevel.DISK_ONLY)
    }

    // Make sure get a1 doesn't hang and returns None.
    failAfter(1.second) {
      assert(store.getSingleAndReleaseLock("a1").isEmpty, "a1 should not be in store")
    }
  }

  test("updated block statuses") {
    store = makeBlockManager(12000)
    store.registerTask(0)
    val list = List.fill(2)(new Array[Byte](2000))
    val bigList = List.fill(8)(new Array[Byte](2000))

    // Runs `task` under a fresh TaskContext and returns the block statuses it recorded.
    def getUpdatedBlocks(task: => Unit): Seq[(BlockId, BlockStatus)] = {
      val context = TaskContext.empty()
      try {
        TaskContext.setTaskContext(context)
        task
      } finally {
        TaskContext.unset()
      }
      context.taskMetrics.updatedBlockStatuses
    }

    // 1 updated block (i.e. list1)
    val updatedBlocks1 = getUpdatedBlocks {
      store.putIterator(
        "list1", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
    }
    assert(updatedBlocks1.size === 1)
    assert(updatedBlocks1.head._1 === TestBlockId("list1"))
    assert(updatedBlocks1.head._2.storageLevel === StorageLevel.MEMORY_ONLY)

    // 1 updated block (i.e. list2)
    val updatedBlocks2 = getUpdatedBlocks {
      store.putIterator(
        "list2", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
    }
    assert(updatedBlocks2.size === 1)
    assert(updatedBlocks2.head._1 === TestBlockId("list2"))
    assert(updatedBlocks2.head._2.storageLevel === StorageLevel.MEMORY_ONLY)

    // 2 updated blocks - list1 is kicked out of memory while list3 is added
    val updatedBlocks3 = getUpdatedBlocks {
      store.putIterator(
        "list3", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
    }
    assert(updatedBlocks3.size === 2)
    updatedBlocks3.foreach { case (id, status) =>
      id match {
        case TestBlockId("list1") => assert(status.storageLevel === StorageLevel.NONE)
        case TestBlockId("list3") => assert(status.storageLevel === StorageLevel.MEMORY_ONLY)
        case _ => fail("Updated block is neither list1 nor list3")
      }
    }
    assert(store.memoryStore.contains("list3"), "list3 was not in memory store")

    // 2 updated blocks - list2 is kicked out of memory (but put on disk) while list4 is added
    val updatedBlocks4 = getUpdatedBlocks {
      store.putIterator(
        "list4", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
    }
    assert(updatedBlocks4.size === 2)
    updatedBlocks4.foreach { case (id, status) =>
      id match {
        case TestBlockId("list2") => assert(status.storageLevel === StorageLevel.DISK_ONLY)
        case TestBlockId("list4") => assert(status.storageLevel === StorageLevel.MEMORY_ONLY)
        case _ => fail("Updated block is neither list2 nor list4")
      }
    }
    assert(store.diskStore.contains("list2"), "list2 was not in disk store")
    assert(store.memoryStore.contains("list4"), "list4 was not in memory store")

    // No updated blocks - list5 is too big to fit in store and nothing is kicked out
    val updatedBlocks5 = getUpdatedBlocks {
      store.putIterator(
        "list5", bigList.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
    }
    assert(updatedBlocks5.size === 0)

    // memory store contains only list3 and list4
    assert(!store.memoryStore.contains("list1"), "list1 was in memory store")
    assert(!store.memoryStore.contains("list2"), "list2 was in memory store")
    assert(store.memoryStore.contains("list3"), "list3 was not in memory store")
    assert(store.memoryStore.contains("list4"), "list4 was not in memory store")
    assert(!store.memoryStore.contains("list5"), "list5 was in memory store")

    // disk store contains only list2
    assert(!store.diskStore.contains("list1"), "list1 was in disk store")
    assert(store.diskStore.contains("list2"), "list2 was not in disk store")
    assert(!store.diskStore.contains("list3"), "list3 was in disk store")
    assert(!store.diskStore.contains("list4"), "list4 was in disk store")
    assert(!store.diskStore.contains("list5"), "list5 was in disk store")

    // remove block - list2 should be removed from disk
    val updatedBlocks6 = getUpdatedBlocks {
      store.removeBlock(
        "list2", tellMaster = true)
    }
    assert(updatedBlocks6.size === 1)
    assert(updatedBlocks6.head._1 === TestBlockId("list2"))
    assert(updatedBlocks6.head._2.storageLevel === StorageLevel.NONE)
    assert(!store.diskStore.contains("list2"), "list2 was in disk store")
  }

  test("query block statuses") {
    store = makeBlockManager(12000)
    val list = List.fill(2)(new Array[Byte](2000))

    // Tell master. By LRU, only list2 and list3 remain.
    store.putIterator(
      "list1", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
    store.putIterator(
      "list2", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
    store.putIterator(
      "list3", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)

    // getLocations and getBlockStatus should yield the same locations
    assert(store.master.getLocations("list1").size === 0)
    assert(store.master.getLocations("list2").size === 1)
    assert(store.master.getLocations("list3").size === 1)
    assert(store.master.getBlockStatus("list1", askSlaves = false).size === 0)
    assert(store.master.getBlockStatus("list2", askSlaves = false).size === 1)
    assert(store.master.getBlockStatus("list3", askSlaves = false).size === 1)
    assert(store.master.getBlockStatus("list1", askSlaves = true).size === 0)
    assert(store.master.getBlockStatus("list2", askSlaves = true).size === 1)
    assert(store.master.getBlockStatus("list3", askSlaves = true).size === 1)

    // This time don't tell master and see what happens. By LRU, only list5 and list6 remain.
    store.putIterator(
      "list4", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = false)
    store.putIterator(
      "list5", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
    store.putIterator(
      "list6", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = false)

    // getLocations should return nothing because the master is not informed
    // getBlockStatus without asking slaves should have the same result
    // getBlockStatus with asking slaves, however, should return the actual block statuses
    assert(store.master.getLocations("list4").size === 0)
    assert(store.master.getLocations("list5").size === 0)
    assert(store.master.getLocations("list6").size === 0)
    assert(store.master.getBlockStatus("list4", askSlaves = false).size === 0)
    assert(store.master.getBlockStatus("list5", askSlaves = false).size === 0)
    assert(store.master.getBlockStatus("list6", askSlaves = false).size === 0)
    assert(store.master.getBlockStatus("list4", askSlaves = true).size === 0)
    assert(store.master.getBlockStatus("list5", askSlaves = true).size === 1)
    assert(store.master.getBlockStatus("list6", askSlaves = true).size === 1)
  }

  test("get matching blocks") {
    store = makeBlockManager(12000)
    val list = List.fill(2)(new Array[Byte](100))

    // insert some blocks
    store.putIterator(
      "list1", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
    store.putIterator(
      "list2", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
    store.putIterator(
      "list3", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)

    // getLocations and getBlockStatus should yield the same locations
    assert(store.master.getMatchingBlockIds(_.toString.contains("list"), askSlaves = false).size
      === 3)
    assert(store.master.getMatchingBlockIds(_.toString.contains("list1"), askSlaves = false).size
      === 1)

    // insert some more blocks
    store.putIterator(
      "newlist1", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
    store.putIterator(
      "newlist2", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
    store.putIterator(
      "newlist3", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = false)

    // getLocations and getBlockStatus should yield the same locations
    assert(store.master.getMatchingBlockIds(_.toString.contains("newlist"), askSlaves = false).size
      === 1)
    assert(store.master.getMatchingBlockIds(_.toString.contains("newlist"), askSlaves = true).size
      === 3)

    val blockIds = Seq(RDDBlockId(1, 0), RDDBlockId(1, 1), RDDBlockId(2, 0))
    blockIds.foreach { blockId =>
      store.putIterator(
        blockId, list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
    }
    val matchedBlockIds = store.master.getMatchingBlockIds({
      case RDDBlockId(1, _) => true
      case _ => false
    }, askSlaves = true)
    assert(matchedBlockIds.toSet === Set(RDDBlockId(1, 0), RDDBlockId(1, 1)))
  }

  test("SPARK-1194 regression: fix the same-RDD rule for cache replacement") {
    store = makeBlockManager(12000)
    store.putSingle(rdd(0, 0), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
    store.putSingle(rdd(1, 0), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
    // Access rdd_1_0 to ensure it's not least recently used.
    assert(store.getSingleAndReleaseLock(rdd(1, 0)).isDefined, "rdd_1_0 was not in store")
    // According to the same-RDD rule, rdd_1_0 should be replaced here.
    store.putSingle(rdd(0, 1), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
    // rdd_1_0 should have been replaced, even though it's not least recently used.
    assert(store.memoryStore.contains(rdd(0, 0)), "rdd_0_0 was not in store")
    assert(store.memoryStore.contains(rdd(0, 1)), "rdd_0_1 was not in store")
    assert(!store.memoryStore.contains(rdd(1, 0)), "rdd_1_0 was in store")
  }

  test("safely unroll blocks through putIterator (disk)") {
    store = makeBlockManager(12000)
    val memoryStore = store.memoryStore
    val diskStore = store.diskStore
    val smallList = List.fill(40)(new Array[Byte](100))
    val bigList = List.fill(40)(new Array[Byte](1000))
    def smallIterator: Iterator[Any] = smallList.iterator.asInstanceOf[Iterator[Any]]
    def bigIterator: Iterator[Any] = bigList.iterator.asInstanceOf[Iterator[Any]]
    assert(memoryStore.currentUnrollMemoryForThisTask === 0)

    store.putIterator("b1", smallIterator, StorageLevel.MEMORY_AND_DISK)
    store.putIterator("b2", smallIterator, StorageLevel.MEMORY_AND_DISK)

    // Unroll with not enough space. This should succeed but kick out b1 in the process.
    // Memory store should contain b2 and b3, while disk store should contain only b1
    val result3 = memoryStore.putIteratorAsValues("b3", smallIterator, ClassTag.Any)
    assert(result3.isRight)
    assert(!memoryStore.contains("b1"))
    assert(memoryStore.contains("b2"))
    assert(memoryStore.contains("b3"))
    assert(diskStore.contains("b1"))
    assert(!diskStore.contains("b2"))
    assert(!diskStore.contains("b3"))
    memoryStore.remove("b3")
    store.putIterator("b3", smallIterator, StorageLevel.MEMORY_ONLY)
    assert(memoryStore.currentUnrollMemoryForThisTask === 0)

    // Unroll huge block with not enough space. This should fail and return an iterator so that
    // the block may be stored to disk. During the unrolling process, block "b2" should be kicked
    // out, so the memory store should contain only b3. Because b4 is handed straight to the
    // memory store, writing it to disk would be the caller's job; only memory contents are
    // checked here.
    val result4 = memoryStore.putIteratorAsValues("b4", bigIterator, ClassTag.Any)
    assert(result4.isLeft)
    assert(!memoryStore.contains("b1"))
    assert(!memoryStore.contains("b2"))
    assert(memoryStore.contains("b3"))
    assert(!memoryStore.contains("b4"))
  }

  test("read-locked blocks cannot be evicted from memory") {
    store = makeBlockManager(12000)
    val arr = new Array[Byte](4000)
    // First store a1 and a2, both in memory
    store.putSingle("a1", arr, StorageLevel.MEMORY_ONLY_SER)
    store.putSingle("a2", arr, StorageLevel.MEMORY_ONLY_SER)
    // getSingle (unlike getSingleAndReleaseLock) keeps a read lock on each block it returns.
    assert(store.getSingle("a1").isDefined, "a1 was not in store")
    assert(store.getSingle("a2").isDefined, "a2 was not in store")
    // This put should fail because both a1 and a2 should be read-locked:
    store.putSingle("a3", arr, StorageLevel.MEMORY_ONLY_SER)
    assert(store.getSingle("a3").isEmpty, "a3 was in store")
    assert(store.getSingle("a1").isDefined, "a1 was not in store")
    assert(store.getSingle("a2").isDefined, "a2 was not in store")
    // Release both pins of block a2:
    store.releaseLock("a2")
    store.releaseLock("a2")
    // Block a1 is the least-recently accessed, so an LRU eviction policy would evict it before
    // block a2. However, a1 is still pinned so this put of a3 should evict a2 instead:
    store.putSingle("a3", arr, StorageLevel.MEMORY_ONLY_SER)
    assert(store.getSingle("a2").isEmpty, "a2 was in store")
    assert(store.getSingle("a1").isDefined, "a1 was not in store")
    assert(store.getSingle("a3").isDefined, "a3 was not in store")
  }

  /**
   * Puts a 4000-byte block at `storageLevel`, deletes every file owned by the disk block manager,
   * and checks that the first read fails with a SparkException while a second read returns an
   * empty result because the block's metadata was dropped after the failure.
   *
   * @param storageLevel level used for the put (expected to involve disk)
   * @param readMethod   the read under test, applied to the block manager
   */
  private def testReadWithLossOfOnDiskFiles(
      storageLevel: StorageLevel,
      readMethod: BlockManager => Option[_]): Unit = {
    // This helper is called several times per test; stop the manager left by a previous call
    // instead of silently overwriting (and leaking) it. NOTE(review): presumably the suite's
    // cleanup only stops whatever is in `store` at the end — confirm.
    if (store != null) {
      store.stop()
      store = null
    }
    store = makeBlockManager(12000)
    assert(store.putSingle("blockId", new Array[Byte](4000), storageLevel))
    assert(store.getStatus("blockId").isDefined)
    // Directly delete all files from the disk store, triggering failures when reading blocks:
    store.diskBlockManager.getAllFiles().foreach(_.delete())
    // The BlockManager still thinks that these blocks exist:
    assert(store.getStatus("blockId").isDefined)
    // Because the BlockManager's metadata claims that the block exists (i.e. that it's present
    // in at least one store), the read attempts to read it and fails when the on-disk file is
    // missing.
    intercept[SparkException] {
      readMethod(store)
    }
    // Subsequent read attempts will succeed; the block isn't present but we return an expected
    // "block not found" response rather than a fatal error:
    assert(readMethod(store).isEmpty)
    // The reason why this second read succeeded is because the metadata entry for the missing
    // block was removed as a result of the read failure:
    assert(store.getStatus("blockId").isEmpty)
  }

  test("remove block if a read fails due to missing DiskStore files (SPARK-15736)") {
    val storageLevels = Seq(
      StorageLevel(useDisk = true, useMemory = false, deserialized = false),
      StorageLevel(useDisk = true, useMemory = false, deserialized = true))
    val readMethods = Map[String, BlockManager => Option[_]](
      "getLocalBytes" -> ((m: BlockManager) => m.getLocalBytes("blockId")),
      "getLocalValues" -> ((m: BlockManager) => m.getLocalValues("blockId"))
    )
    testReadWithLossOfOnDiskFiles(StorageLevel.DISK_ONLY, _.getLocalBytes("blockId"))
    for ((readMethodName, readMethod) <- readMethods; storageLevel <- storageLevels) {
      withClue(s"$readMethodName $storageLevel") {
        testReadWithLossOfOnDiskFiles(storageLevel, readMethod)
      }
    }
  }

  test("SPARK-13328: refresh block locations (fetch should fail after hitting a threshold)") {
    val mockBlockTransferService =
      new MockBlockTransferService(conf.getInt("spark.block.failures.beforeLocationRefresh", 5))
    store = makeBlockManager(8000, "executor1", transferService = Option(mockBlockTransferService))
    store.putSingle("item", 999L, StorageLevel.MEMORY_ONLY, tellMaster = true)
    assert(store.getRemoteBytes("item").isEmpty)
  }

  test("SPARK-13328: refresh block locations (fetch should succeed after location refresh)") {
    val maxFailuresBeforeLocationRefresh =
      conf.getInt("spark.block.failures.beforeLocationRefresh", 5)
    val mockBlockManagerMaster = mock(classOf[BlockManagerMaster])
    val mockBlockTransferService =
      new MockBlockTransferService(maxFailuresBeforeLocationRefresh)
    // make sure we have more than maxFailuresBeforeLocationRefresh locations
    // so that we have a chance to do location refresh
    val blockManagerIds = (0 to maxFailuresBeforeLocationRefresh)
      .map { i => BlockManagerId(s"id-$i", s"host-$i", i + 1) }
    when(mockBlockManagerMaster.getLocations(mc.any[BlockId])).thenReturn(blockManagerIds)
    store = makeBlockManager(8000, "executor1", mockBlockManagerMaster,
      transferService = Option(mockBlockTransferService))
    // Only presence is checked, so no cast of the returned buffer type is needed.
    val block = store.getRemoteBytes("item")
    assert(block.isDefined)
    verify(mockBlockManagerMaster, times(2)).getLocations("item")
  }

  test("SPARK-17484: block status is properly updated following an exception in put()") {
    val mockBlockTransferService = new MockBlockTransferService(maxFailures = 10) {
      override def uploadBlock(
          hostname: String,
          port: Int, execId: String,
          blockId: BlockId,
          blockData: ManagedBuffer,
          level: StorageLevel,
          classTag: ClassTag[_]): Future[Unit] = {
        throw new InterruptedException("Intentional interrupt")
      }
    }
    store = makeBlockManager(8000, "executor1", transferService = Option(mockBlockTransferService))
    store2 = makeBlockManager(8000, "executor2", transferService = Option(mockBlockTransferService))
    intercept[InterruptedException] {
      store.putSingle("item", "value", StorageLevel.MEMORY_ONLY_2, tellMaster = true)
    }
    assert(store.getLocalBytes("item").isEmpty)
    assert(master.getLocations("item").isEmpty)
    assert(store2.getRemoteBytes("item").isEmpty)
  }

  test("SPARK-17484: master block locations are updated following an invalid remote block fetch") {
    store = makeBlockManager(8000, "executor1")
    store2 = makeBlockManager(8000, "executor2")
    store.putSingle("item", "value", StorageLevel.MEMORY_ONLY, tellMaster = true)
    assert(master.getLocations("item").nonEmpty)
    store.removeBlock("item", tellMaster = false)
    assert(master.getLocations("item").nonEmpty)
    assert(store2.getRemoteBytes("item").isEmpty)
    assert(master.getLocations("item").isEmpty)
  }

  /**
   * Test double for [[BlockTransferService]].
   *
   * `fetchBlocks` always reports success with a 1-byte buffer, and `uploadBlock` completes
   * immediately. `fetchBlockSync` throws for its first `maxFailures` calls and then defers to the
   * base implementation (presumably routed through the `fetchBlocks` override — confirm).
   *
   * @param maxFailures number of initial `fetchBlockSync` calls that fail
   */
  class MockBlockTransferService(val maxFailures: Int) extends BlockTransferService {
    // Counts fetchBlockSync invocations; not synchronized, as the tests call it synchronously.
    var numCalls = 0

    override def init(blockDataManager: BlockDataManager): Unit = {}

    override def fetchBlocks(
        host: String,
        port: Int,
        execId: String,
        blockIds: Array[String],
        listener: BlockFetchingListener): Unit = {
      listener.onBlockFetchSuccess("mockBlockId", new NioManagedBuffer(ByteBuffer.allocate(1)))
    }

    override def close(): Unit = {}

    override def hostName: String = "MockBlockTransferServiceHost"

    override def port: Int = 63332

    override def uploadBlock(
        hostname: String,
        port: Int, execId: String,
        blockId: BlockId,
        blockData: ManagedBuffer,
        level: StorageLevel,
        classTag: ClassTag[_]): Future[Unit] = {
      // Nothing to compute, so return an already-completed future instead of scheduling an
      // empty task on the global execution context.
      Future.successful(())
    }

    override def fetchBlockSync(
        host: String,
        port: Int,
        execId: String,
        blockId: String): ManagedBuffer = {
      numCalls += 1
      if (numCalls <= maxFailures) {
        throw new RuntimeException("Failing block fetch in the mock block transfer service")
      }
      super.fetchBlockSync(host, port, execId, blockId)
    }
  }
}

private object BlockManagerSuite {

  /** Test-only helpers on [[BlockManager]] that release the read lock taken by a successful get. */
  private implicit class BlockManagerTestUtils(store: BlockManager) {

    /**
     * Write-locks `blockId` if it exists and drops it from memory. If the block survives in
     * another store only the lock is released; otherwise its block info is removed so that the
     * block can be stored again.
     */
    def dropFromMemoryIfExists(
        blockId: BlockId,
        data: () => Either[Array[Any], ChunkedByteBuffer]): Unit = {
      store.blockInfoManager.lockForWriting(blockId).foreach { info =>
        val newEffectiveStorageLevel = store.dropFromMemory(blockId, data)
        if (newEffectiveStorageLevel.isValid) {
          // The block is still present in at least one store, so release the lock
          // but don't delete the block info
          store.releaseLock(blockId)
        } else {
          // The block isn't present in any store, so delete the block info so that the
          // block can be stored again
          store.blockInfoManager.removeBlock(blockId)
        }
      }
    }

    /** Wraps a lock-acquiring getter so that the read lock is released whenever it succeeds. */
    private def wrapGet[T](f: BlockId => Option[T]): BlockId => Option[T] = (blockId: BlockId) => {
      val result = f(blockId)
      if (result.isDefined) {
        store.releaseLock(blockId)
      }
      result
    }

    def hasLocalBlock(blockId: BlockId): Boolean = {
      getLocalAndReleaseLock(blockId).isDefined
    }

    val getLocalAndReleaseLock: (BlockId) => Option[BlockResult] = wrapGet(store.getLocalValues)
    val getAndReleaseLock: (BlockId) => Option[BlockResult] = wrapGet(store.get)
    val getSingleAndReleaseLock: (BlockId) => Option[Any] = wrapGet(store.getSingle)
    val getLocalBytesAndReleaseLock: (BlockId) => Option[ChunkedByteBuffer] = {
      wrapGet(store.getLocalBytes)
    }
  }

}