1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements.  See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License.  You may obtain a copy of the License at
8 *
9 *    http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18package org.apache.spark.storage
19
20import java.nio.ByteBuffer
21
22import scala.collection.mutable.ArrayBuffer
23import scala.concurrent.duration._
24import scala.concurrent.Future
25import scala.language.implicitConversions
26import scala.language.postfixOps
27import scala.reflect.ClassTag
28
29import org.mockito.{Matchers => mc}
30import org.mockito.Mockito.{mock, times, verify, when}
31import org.scalatest._
32import org.scalatest.concurrent.Eventually._
33import org.scalatest.concurrent.Timeouts._
34
35import org.apache.spark._
36import org.apache.spark.broadcast.BroadcastManager
37import org.apache.spark.executor.DataReadMethod
38import org.apache.spark.memory.UnifiedMemoryManager
39import org.apache.spark.network.{BlockDataManager, BlockTransferService}
40import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer}
41import org.apache.spark.network.netty.NettyBlockTransferService
42import org.apache.spark.network.shuffle.BlockFetchingListener
43import org.apache.spark.rpc.RpcEnv
44import org.apache.spark.scheduler.LiveListenerBus
45import org.apache.spark.serializer.{JavaSerializer, KryoSerializer, SerializerManager}
46import org.apache.spark.shuffle.sort.SortShuffleManager
47import org.apache.spark.storage.BlockManagerMessages.BlockManagerHeartbeat
48import org.apache.spark.util._
49import org.apache.spark.util.io.ChunkedByteBuffer
50
51class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterEach
52  with PrivateMethodTester with LocalSparkContext with ResetSystemProperties {
53
54  import BlockManagerSuite._
55
56  var conf: SparkConf = null
57  var store: BlockManager = null
58  var store2: BlockManager = null
59  var store3: BlockManager = null
60  var rpcEnv: RpcEnv = null
61  var master: BlockManagerMaster = null
62  val securityMgr = new SecurityManager(new SparkConf(false))
63  val bcastManager = new BroadcastManager(true, new SparkConf(false), securityMgr)
64  val mapOutputTracker = new MapOutputTrackerMaster(new SparkConf(false), bcastManager, true)
65  val shuffleManager = new SortShuffleManager(new SparkConf(false))
66
67  // Reuse a serializer across tests to avoid creating a new thread-local buffer on each test
68  val serializer = new KryoSerializer(new SparkConf(false).set("spark.kryoserializer.buffer", "1m"))
69
70  // Implicitly convert strings to BlockIds for test clarity.
71  implicit def StringToBlockId(value: String): BlockId = new TestBlockId(value)
72  def rdd(rddId: Int, splitId: Int): RDDBlockId = RDDBlockId(rddId, splitId)
73
74  private def makeBlockManager(
75      maxMem: Long,
76      name: String = SparkContext.DRIVER_IDENTIFIER,
77      master: BlockManagerMaster = this.master,
78      transferService: Option[BlockTransferService] = Option.empty): BlockManager = {
79    conf.set("spark.testing.memory", maxMem.toString)
80    conf.set("spark.memory.offHeap.size", maxMem.toString)
81    val serializer = new KryoSerializer(conf)
82    val transfer = transferService
83      .getOrElse(new NettyBlockTransferService(conf, securityMgr, "localhost", "localhost", 0, 1))
84    val memManager = UnifiedMemoryManager(conf, numCores = 1)
85    val serializerManager = new SerializerManager(serializer, conf)
86    val blockManager = new BlockManager(name, rpcEnv, master, serializerManager, conf,
87      memManager, mapOutputTracker, shuffleManager, transfer, securityMgr, 0)
88    memManager.setMemoryStore(blockManager.memoryStore)
89    blockManager.initialize("app-id")
90    blockManager
91  }
92
93  override def beforeEach(): Unit = {
94    super.beforeEach()
95    // Set the arch to 64-bit and compressedOops to true to get a deterministic test-case
96    System.setProperty("os.arch", "amd64")
97    conf = new SparkConf(false)
98      .set("spark.app.id", "test")
99      .set("spark.testing", "true")
100      .set("spark.memory.fraction", "1")
101      .set("spark.memory.storageFraction", "1")
102      .set("spark.kryoserializer.buffer", "1m")
103      .set("spark.test.useCompressedOops", "true")
104      .set("spark.storage.unrollFraction", "0.4")
105      .set("spark.storage.unrollMemoryThreshold", "512")
106
107    rpcEnv = RpcEnv.create("test", "localhost", 0, conf, securityMgr)
108    conf.set("spark.driver.port", rpcEnv.address.port.toString)
109
110    // Mock SparkContext to reduce the memory usage of tests. It's fine since the only reason we
111    // need to create a SparkContext is to initialize LiveListenerBus.
112    sc = mock(classOf[SparkContext])
113    when(sc.conf).thenReturn(conf)
114    master = new BlockManagerMaster(rpcEnv.setupEndpoint("blockmanager",
115      new BlockManagerMasterEndpoint(rpcEnv, true, conf,
116        new LiveListenerBus(sc))), conf, true)
117
118    val initialize = PrivateMethod[Unit]('initialize)
119    SizeEstimator invokePrivate initialize()
120  }
121
122  override def afterEach(): Unit = {
123    try {
124      conf = null
125      if (store != null) {
126        store.stop()
127        store = null
128      }
129      if (store2 != null) {
130        store2.stop()
131        store2 = null
132      }
133      if (store3 != null) {
134        store3.stop()
135        store3 = null
136      }
137      rpcEnv.shutdown()
138      rpcEnv.awaitTermination()
139      rpcEnv = null
140      master = null
141    } finally {
142      super.afterEach()
143    }
144  }
145
146  test("StorageLevel object caching") {
147    val level1 = StorageLevel(false, false, false, 3)
148    // this should return the same object as level1
149    val level2 = StorageLevel(false, false, false, 3)
150    // this should return a different object
151    val level3 = StorageLevel(false, false, false, 2)
152    assert(level2 === level1, "level2 is not same as level1")
153    assert(level2.eq(level1), "level2 is not the same object as level1")
154    assert(level3 != level1, "level3 is same as level1")
155    val bytes1 = Utils.serialize(level1)
156    val level1_ = Utils.deserialize[StorageLevel](bytes1)
157    val bytes2 = Utils.serialize(level2)
158    val level2_ = Utils.deserialize[StorageLevel](bytes2)
159    assert(level1_ === level1, "Deserialized level1 not same as original level1")
160    assert(level1_.eq(level1), "Deserialized level1 not the same object as original level2")
161    assert(level2_ === level2, "Deserialized level2 not same as original level2")
162    assert(level2_.eq(level1), "Deserialized level2 not the same object as original level1")
163  }
164
165  test("BlockManagerId object caching") {
166    val id1 = BlockManagerId("e1", "XXX", 1)
167    val id2 = BlockManagerId("e1", "XXX", 1) // this should return the same object as id1
168    val id3 = BlockManagerId("e1", "XXX", 2) // this should return a different object
169    assert(id2 === id1, "id2 is not same as id1")
170    assert(id2.eq(id1), "id2 is not the same object as id1")
171    assert(id3 != id1, "id3 is same as id1")
172    val bytes1 = Utils.serialize(id1)
173    val id1_ = Utils.deserialize[BlockManagerId](bytes1)
174    val bytes2 = Utils.serialize(id2)
175    val id2_ = Utils.deserialize[BlockManagerId](bytes2)
176    assert(id1_ === id1, "Deserialized id1 is not same as original id1")
177    assert(id1_.eq(id1), "Deserialized id1 is not the same object as original id1")
178    assert(id2_ === id2, "Deserialized id2 is not same as original id2")
179    assert(id2_.eq(id1), "Deserialized id2 is not the same object as original id1")
180  }
181
182  test("BlockManagerId.isDriver() backwards-compatibility with legacy driver ids (SPARK-6716)") {
183    assert(BlockManagerId(SparkContext.DRIVER_IDENTIFIER, "XXX", 1).isDriver)
184    assert(BlockManagerId(SparkContext.LEGACY_DRIVER_IDENTIFIER, "XXX", 1).isDriver)
185    assert(!BlockManagerId("notADriverIdentifier", "XXX", 1).isDriver)
186  }
187
188  test("master + 1 manager interaction") {
189    store = makeBlockManager(20000)
190    val a1 = new Array[Byte](4000)
191    val a2 = new Array[Byte](4000)
192    val a3 = new Array[Byte](4000)
193
194    // Putting a1, a2  and a3 in memory and telling master only about a1 and a2
195    store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
196    store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY)
197    store.putSingle("a3", a3, StorageLevel.MEMORY_ONLY, tellMaster = false)
198
199    // Checking whether blocks are in memory
200    assert(store.getSingleAndReleaseLock("a1").isDefined, "a1 was not in store")
201    assert(store.getSingleAndReleaseLock("a2").isDefined, "a2 was not in store")
202    assert(store.getSingleAndReleaseLock("a3").isDefined, "a3 was not in store")
203
204    // Checking whether master knows about the blocks or not
205    assert(master.getLocations("a1").size > 0, "master was not told about a1")
206    assert(master.getLocations("a2").size > 0, "master was not told about a2")
207    assert(master.getLocations("a3").size === 0, "master was told about a3")
208
209    // Drop a1 and a2 from memory; this should be reported back to the master
210    store.dropFromMemoryIfExists("a1", () => null: Either[Array[Any], ChunkedByteBuffer])
211    store.dropFromMemoryIfExists("a2", () => null: Either[Array[Any], ChunkedByteBuffer])
212    assert(store.getSingleAndReleaseLock("a1") === None, "a1 not removed from store")
213    assert(store.getSingleAndReleaseLock("a2") === None, "a2 not removed from store")
214    assert(master.getLocations("a1").size === 0, "master did not remove a1")
215    assert(master.getLocations("a2").size === 0, "master did not remove a2")
216  }
217
218  test("master + 2 managers interaction") {
219    store = makeBlockManager(2000, "exec1")
220    store2 = makeBlockManager(2000, "exec2")
221
222    val peers = master.getPeers(store.blockManagerId)
223    assert(peers.size === 1, "master did not return the other manager as a peer")
224    assert(peers.head === store2.blockManagerId, "peer returned by master is not the other manager")
225
226    val a1 = new Array[Byte](400)
227    val a2 = new Array[Byte](400)
228    store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY_2)
229    store2.putSingle("a2", a2, StorageLevel.MEMORY_ONLY_2)
230    assert(master.getLocations("a1").size === 2, "master did not report 2 locations for a1")
231    assert(master.getLocations("a2").size === 2, "master did not report 2 locations for a2")
232  }
233
234  test("removing block") {
235    store = makeBlockManager(20000)
236    val a1 = new Array[Byte](4000)
237    val a2 = new Array[Byte](4000)
238    val a3 = new Array[Byte](4000)
239
240    // Putting a1, a2 and a3 in memory and telling master only about a1 and a2
241    store.putSingle("a1-to-remove", a1, StorageLevel.MEMORY_ONLY)
242    store.putSingle("a2-to-remove", a2, StorageLevel.MEMORY_ONLY)
243    store.putSingle("a3-to-remove", a3, StorageLevel.MEMORY_ONLY, tellMaster = false)
244
245    // Checking whether blocks are in memory and memory size
246    val memStatus = master.getMemoryStatus.head._2
247    assert(memStatus._1 == 40000L, "total memory " + memStatus._1 + " should equal 40000")
248    assert(memStatus._2 <= 32000L, "remaining memory " + memStatus._2 + " should <= 12000")
249    assert(store.getSingleAndReleaseLock("a1-to-remove").isDefined, "a1 was not in store")
250    assert(store.getSingleAndReleaseLock("a2-to-remove").isDefined, "a2 was not in store")
251    assert(store.getSingleAndReleaseLock("a3-to-remove").isDefined, "a3 was not in store")
252
253    // Checking whether master knows about the blocks or not
254    assert(master.getLocations("a1-to-remove").size > 0, "master was not told about a1")
255    assert(master.getLocations("a2-to-remove").size > 0, "master was not told about a2")
256    assert(master.getLocations("a3-to-remove").size === 0, "master was told about a3")
257
258    // Remove a1 and a2 and a3. Should be no-op for a3.
259    master.removeBlock("a1-to-remove")
260    master.removeBlock("a2-to-remove")
261    master.removeBlock("a3-to-remove")
262
263    eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
264      assert(!store.hasLocalBlock("a1-to-remove"))
265      master.getLocations("a1-to-remove") should have size 0
266    }
267    eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
268      assert(!store.hasLocalBlock("a2-to-remove"))
269      master.getLocations("a2-to-remove") should have size 0
270    }
271    eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
272      assert(store.hasLocalBlock("a3-to-remove"))
273      master.getLocations("a3-to-remove") should have size 0
274    }
275    eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
276      val memStatus = master.getMemoryStatus.head._2
277      memStatus._1 should equal (40000L)
278      memStatus._2 should equal (40000L)
279    }
280  }
281
282  test("removing rdd") {
283    store = makeBlockManager(20000)
284    val a1 = new Array[Byte](4000)
285    val a2 = new Array[Byte](4000)
286    val a3 = new Array[Byte](4000)
287    // Putting a1, a2 and a3 in memory.
288    store.putSingle(rdd(0, 0), a1, StorageLevel.MEMORY_ONLY)
289    store.putSingle(rdd(0, 1), a2, StorageLevel.MEMORY_ONLY)
290    store.putSingle("nonrddblock", a3, StorageLevel.MEMORY_ONLY)
291    master.removeRdd(0, blocking = false)
292
293    eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
294      store.getSingleAndReleaseLock(rdd(0, 0)) should be (None)
295      master.getLocations(rdd(0, 0)) should have size 0
296    }
297    eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
298      store.getSingleAndReleaseLock(rdd(0, 1)) should be (None)
299      master.getLocations(rdd(0, 1)) should have size 0
300    }
301    eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
302      store.getSingleAndReleaseLock("nonrddblock") should not be (None)
303      master.getLocations("nonrddblock") should have size (1)
304    }
305
306    store.putSingle(rdd(0, 0), a1, StorageLevel.MEMORY_ONLY)
307    store.putSingle(rdd(0, 1), a2, StorageLevel.MEMORY_ONLY)
308    master.removeRdd(0, blocking = true)
309    store.getSingleAndReleaseLock(rdd(0, 0)) should be (None)
310    master.getLocations(rdd(0, 0)) should have size 0
311    store.getSingleAndReleaseLock(rdd(0, 1)) should be (None)
312    master.getLocations(rdd(0, 1)) should have size 0
313  }
314
315  test("removing broadcast") {
316    store = makeBlockManager(2000)
317    val driverStore = store
318    val executorStore = makeBlockManager(2000, "executor")
319    val a1 = new Array[Byte](400)
320    val a2 = new Array[Byte](400)
321    val a3 = new Array[Byte](400)
322    val a4 = new Array[Byte](400)
323
324    val broadcast0BlockId = BroadcastBlockId(0)
325    val broadcast1BlockId = BroadcastBlockId(1)
326    val broadcast2BlockId = BroadcastBlockId(2)
327    val broadcast2BlockId2 = BroadcastBlockId(2, "_")
328
329    // insert broadcast blocks in both the stores
330    Seq(driverStore, executorStore).foreach { case s =>
331      s.putSingle(broadcast0BlockId, a1, StorageLevel.DISK_ONLY)
332      s.putSingle(broadcast1BlockId, a2, StorageLevel.DISK_ONLY)
333      s.putSingle(broadcast2BlockId, a3, StorageLevel.DISK_ONLY)
334      s.putSingle(broadcast2BlockId2, a4, StorageLevel.DISK_ONLY)
335    }
336
337    // verify whether the blocks exist in both the stores
338    Seq(driverStore, executorStore).foreach { case s =>
339      assert(s.hasLocalBlock(broadcast0BlockId))
340      assert(s.hasLocalBlock(broadcast1BlockId))
341      assert(s.hasLocalBlock(broadcast2BlockId))
342      assert(s.hasLocalBlock(broadcast2BlockId2))
343    }
344
345    // remove broadcast 0 block only from executors
346    master.removeBroadcast(0, removeFromMaster = false, blocking = true)
347
348    // only broadcast 0 block should be removed from the executor store
349    assert(!executorStore.hasLocalBlock(broadcast0BlockId))
350    assert(executorStore.hasLocalBlock(broadcast1BlockId))
351    assert(executorStore.hasLocalBlock(broadcast2BlockId))
352
353    // nothing should be removed from the driver store
354    assert(driverStore.hasLocalBlock(broadcast0BlockId))
355    assert(driverStore.hasLocalBlock(broadcast1BlockId))
356    assert(driverStore.hasLocalBlock(broadcast2BlockId))
357
358    // remove broadcast 0 block from the driver as well
359    master.removeBroadcast(0, removeFromMaster = true, blocking = true)
360    assert(!driverStore.hasLocalBlock(broadcast0BlockId))
361    assert(driverStore.hasLocalBlock(broadcast1BlockId))
362
363    // remove broadcast 1 block from both the stores asynchronously
364    // and verify all broadcast 1 blocks have been removed
365    master.removeBroadcast(1, removeFromMaster = true, blocking = false)
366    eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
367      assert(!driverStore.hasLocalBlock(broadcast1BlockId))
368      assert(!executorStore.hasLocalBlock(broadcast1BlockId))
369    }
370
371    // remove broadcast 2 from both the stores asynchronously
372    // and verify all broadcast 2 blocks have been removed
373    master.removeBroadcast(2, removeFromMaster = true, blocking = false)
374    eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
375      assert(!driverStore.hasLocalBlock(broadcast2BlockId))
376      assert(!driverStore.hasLocalBlock(broadcast2BlockId2))
377      assert(!executorStore.hasLocalBlock(broadcast2BlockId))
378      assert(!executorStore.hasLocalBlock(broadcast2BlockId2))
379    }
380    executorStore.stop()
381    driverStore.stop()
382    store = null
383  }
384
385  test("reregistration on heart beat") {
386    store = makeBlockManager(2000)
387    val a1 = new Array[Byte](400)
388
389    store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
390
391    assert(store.getSingleAndReleaseLock("a1").isDefined, "a1 was not in store")
392    assert(master.getLocations("a1").size > 0, "master was not told about a1")
393
394    master.removeExecutor(store.blockManagerId.executorId)
395    assert(master.getLocations("a1").size == 0, "a1 was not removed from master")
396
397    val reregister = !master.driverEndpoint.askWithRetry[Boolean](
398      BlockManagerHeartbeat(store.blockManagerId))
399    assert(reregister == true)
400  }
401
402  test("reregistration on block update") {
403    store = makeBlockManager(2000)
404    val a1 = new Array[Byte](400)
405    val a2 = new Array[Byte](400)
406
407    store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
408    assert(master.getLocations("a1").size > 0, "master was not told about a1")
409
410    master.removeExecutor(store.blockManagerId.executorId)
411    assert(master.getLocations("a1").size == 0, "a1 was not removed from master")
412
413    store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY)
414    store.waitForAsyncReregister()
415
416    assert(master.getLocations("a1").size > 0, "a1 was not reregistered with master")
417    assert(master.getLocations("a2").size > 0, "master was not told about a2")
418  }
419
420  test("reregistration doesn't dead lock") {
421    store = makeBlockManager(2000)
422    val a1 = new Array[Byte](400)
423    val a2 = List(new Array[Byte](400))
424
425    // try many times to trigger any deadlocks
426    for (i <- 1 to 100) {
427      master.removeExecutor(store.blockManagerId.executorId)
428      val t1 = new Thread {
429        override def run() {
430          store.putIterator(
431            "a2", a2.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
432        }
433      }
434      val t2 = new Thread {
435        override def run() {
436          store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
437        }
438      }
439      val t3 = new Thread {
440        override def run() {
441          store.reregister()
442        }
443      }
444
445      t1.start()
446      t2.start()
447      t3.start()
448      t1.join()
449      t2.join()
450      t3.join()
451
452      store.dropFromMemoryIfExists("a1", () => null: Either[Array[Any], ChunkedByteBuffer])
453      store.dropFromMemoryIfExists("a2", () => null: Either[Array[Any], ChunkedByteBuffer])
454      store.waitForAsyncReregister()
455    }
456  }
457
458  test("correct BlockResult returned from get() calls") {
459    store = makeBlockManager(12000)
460    val list1 = List(new Array[Byte](2000), new Array[Byte](2000))
461    val list2 = List(new Array[Byte](500), new Array[Byte](1000), new Array[Byte](1500))
462    val list1SizeEstimate = SizeEstimator.estimate(list1.iterator.toArray)
463    val list2SizeEstimate = SizeEstimator.estimate(list2.iterator.toArray)
464    store.putIterator(
465      "list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
466    store.putIterator(
467      "list2memory", list2.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
468    store.putIterator(
469      "list2disk", list2.iterator, StorageLevel.DISK_ONLY, tellMaster = true)
470    val list1Get = store.get("list1")
471    assert(list1Get.isDefined, "list1 expected to be in store")
472    assert(list1Get.get.data.size === 2)
473    assert(list1Get.get.bytes === list1SizeEstimate)
474    assert(list1Get.get.readMethod === DataReadMethod.Memory)
475    val list2MemoryGet = store.get("list2memory")
476    assert(list2MemoryGet.isDefined, "list2memory expected to be in store")
477    assert(list2MemoryGet.get.data.size === 3)
478    assert(list2MemoryGet.get.bytes === list2SizeEstimate)
479    assert(list2MemoryGet.get.readMethod === DataReadMethod.Memory)
480    val list2DiskGet = store.get("list2disk")
481    assert(list2DiskGet.isDefined, "list2memory expected to be in store")
482    assert(list2DiskGet.get.data.size === 3)
483    // We don't know the exact size of the data on disk, but it should certainly be > 0.
484    assert(list2DiskGet.get.bytes > 0)
485    assert(list2DiskGet.get.readMethod === DataReadMethod.Disk)
486  }
487
488  test("optimize a location order of blocks") {
489    val localHost = Utils.localHostName()
490    val otherHost = "otherHost"
491    val bmMaster = mock(classOf[BlockManagerMaster])
492    val bmId1 = BlockManagerId("id1", localHost, 1)
493    val bmId2 = BlockManagerId("id2", localHost, 2)
494    val bmId3 = BlockManagerId("id3", otherHost, 3)
495    when(bmMaster.getLocations(mc.any[BlockId])).thenReturn(Seq(bmId1, bmId2, bmId3))
496
497    val blockManager = makeBlockManager(128, "exec", bmMaster)
498    val getLocations = PrivateMethod[Seq[BlockManagerId]]('getLocations)
499    val locations = blockManager invokePrivate getLocations(BroadcastBlockId(0))
500    assert(locations.map(_.host).toSet === Set(localHost, localHost, otherHost))
501  }
502
503  test("SPARK-9591: getRemoteBytes from another location when Exception throw") {
504    conf.set("spark.shuffle.io.maxRetries", "0")
505    store = makeBlockManager(8000, "executor1")
506    store2 = makeBlockManager(8000, "executor2")
507    store3 = makeBlockManager(8000, "executor3")
508    val list1 = List(new Array[Byte](4000))
509    store2.putIterator(
510      "list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
511    store3.putIterator(
512      "list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
513    assert(store.getRemoteBytes("list1").isDefined, "list1Get expected to be fetched")
514    store2.stop()
515    store2 = null
516    assert(store.getRemoteBytes("list1").isDefined, "list1Get expected to be fetched")
517    store3.stop()
518    store3 = null
519    // Should return None instead of throwing an exception:
520    assert(store.getRemoteBytes("list1").isEmpty)
521  }
522
523  test("SPARK-14252: getOrElseUpdate should still read from remote storage") {
524    store = makeBlockManager(8000, "executor1")
525    store2 = makeBlockManager(8000, "executor2")
526    val list1 = List(new Array[Byte](4000))
527    store2.putIterator(
528      "list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
529    assert(store.getOrElseUpdate(
530      "list1",
531      StorageLevel.MEMORY_ONLY,
532      ClassTag.Any,
533      () => throw new AssertionError("attempted to compute locally")).isLeft)
534  }
535
536  test("in-memory LRU storage") {
537    testInMemoryLRUStorage(StorageLevel.MEMORY_ONLY)
538  }
539
540  test("in-memory LRU storage with serialization") {
541    testInMemoryLRUStorage(StorageLevel.MEMORY_ONLY_SER)
542  }
543
544  test("in-memory LRU storage with off-heap") {
545    testInMemoryLRUStorage(StorageLevel(
546      useDisk = false,
547      useMemory = true,
548      useOffHeap = true,
549      deserialized = false, replication = 1))
550  }
551
552  private def testInMemoryLRUStorage(storageLevel: StorageLevel): Unit = {
553    store = makeBlockManager(12000)
554    val a1 = new Array[Byte](4000)
555    val a2 = new Array[Byte](4000)
556    val a3 = new Array[Byte](4000)
557    store.putSingle("a1", a1, storageLevel)
558    store.putSingle("a2", a2, storageLevel)
559    store.putSingle("a3", a3, storageLevel)
560    assert(store.getSingleAndReleaseLock("a2").isDefined, "a2 was not in store")
561    assert(store.getSingleAndReleaseLock("a3").isDefined, "a3 was not in store")
562    assert(store.getSingleAndReleaseLock("a1") === None, "a1 was in store")
563    assert(store.getSingleAndReleaseLock("a2").isDefined, "a2 was not in store")
564    // At this point a2 was gotten last, so LRU will getSingle rid of a3
565    store.putSingle("a1", a1, storageLevel)
566    assert(store.getSingleAndReleaseLock("a1").isDefined, "a1 was not in store")
567    assert(store.getSingleAndReleaseLock("a2").isDefined, "a2 was not in store")
568    assert(store.getSingleAndReleaseLock("a3") === None, "a3 was in store")
569  }
570
571  test("in-memory LRU for partitions of same RDD") {
572    store = makeBlockManager(12000)
573    val a1 = new Array[Byte](4000)
574    val a2 = new Array[Byte](4000)
575    val a3 = new Array[Byte](4000)
576    store.putSingle(rdd(0, 1), a1, StorageLevel.MEMORY_ONLY)
577    store.putSingle(rdd(0, 2), a2, StorageLevel.MEMORY_ONLY)
578    store.putSingle(rdd(0, 3), a3, StorageLevel.MEMORY_ONLY)
579    // Even though we accessed rdd_0_3 last, it should not have replaced partitions 1 and 2
580    // from the same RDD
581    assert(store.getSingleAndReleaseLock(rdd(0, 3)) === None, "rdd_0_3 was in store")
582    assert(store.getSingleAndReleaseLock(rdd(0, 2)).isDefined, "rdd_0_2 was not in store")
583    assert(store.getSingleAndReleaseLock(rdd(0, 1)).isDefined, "rdd_0_1 was not in store")
584    // Check that rdd_0_3 doesn't replace them even after further accesses
585    assert(store.getSingleAndReleaseLock(rdd(0, 3)) === None, "rdd_0_3 was in store")
586    assert(store.getSingleAndReleaseLock(rdd(0, 3)) === None, "rdd_0_3 was in store")
587    assert(store.getSingleAndReleaseLock(rdd(0, 3)) === None, "rdd_0_3 was in store")
588  }
589
590  test("in-memory LRU for partitions of multiple RDDs") {
591    store = makeBlockManager(12000)
592    store.putSingle(rdd(0, 1), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
593    store.putSingle(rdd(0, 2), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
594    store.putSingle(rdd(1, 1), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
595    // At this point rdd_1_1 should've replaced rdd_0_1
596    assert(store.memoryStore.contains(rdd(1, 1)), "rdd_1_1 was not in store")
597    assert(!store.memoryStore.contains(rdd(0, 1)), "rdd_0_1 was in store")
598    assert(store.memoryStore.contains(rdd(0, 2)), "rdd_0_2 was not in store")
599    // Do a get() on rdd_0_2 so that it is the most recently used item
600    assert(store.getSingleAndReleaseLock(rdd(0, 2)).isDefined, "rdd_0_2 was not in store")
601    // Put in more partitions from RDD 0; they should replace rdd_1_1
602    store.putSingle(rdd(0, 3), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
603    store.putSingle(rdd(0, 4), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
604    // Now rdd_1_1 should be dropped to add rdd_0_3, but then rdd_0_2 should *not* be dropped
605    // when we try to add rdd_0_4.
606    assert(!store.memoryStore.contains(rdd(1, 1)), "rdd_1_1 was in store")
607    assert(!store.memoryStore.contains(rdd(0, 1)), "rdd_0_1 was in store")
608    assert(!store.memoryStore.contains(rdd(0, 4)), "rdd_0_4 was in store")
609    assert(store.memoryStore.contains(rdd(0, 2)), "rdd_0_2 was not in store")
610    assert(store.memoryStore.contains(rdd(0, 3)), "rdd_0_3 was not in store")
611  }
612
613  test("on-disk storage") {
614    store = makeBlockManager(1200)
615    val a1 = new Array[Byte](400)
616    val a2 = new Array[Byte](400)
617    val a3 = new Array[Byte](400)
618    store.putSingle("a1", a1, StorageLevel.DISK_ONLY)
619    store.putSingle("a2", a2, StorageLevel.DISK_ONLY)
620    store.putSingle("a3", a3, StorageLevel.DISK_ONLY)
621    assert(store.getSingleAndReleaseLock("a2").isDefined, "a2 was in store")
622    assert(store.getSingleAndReleaseLock("a3").isDefined, "a3 was in store")
623    assert(store.getSingleAndReleaseLock("a1").isDefined, "a1 was in store")
624  }
625
626  test("disk and memory storage") {
627    testDiskAndMemoryStorage(StorageLevel.MEMORY_AND_DISK, getAsBytes = false)
628  }
629
630  test("disk and memory storage with getLocalBytes") {
631    testDiskAndMemoryStorage(StorageLevel.MEMORY_AND_DISK, getAsBytes = true)
632  }
633
634  test("disk and memory storage with serialization") {
635    testDiskAndMemoryStorage(StorageLevel.MEMORY_AND_DISK_SER, getAsBytes = false)
636  }
637
638  test("disk and memory storage with serialization and getLocalBytes") {
639    testDiskAndMemoryStorage(StorageLevel.MEMORY_AND_DISK_SER, getAsBytes = true)
640  }
641
642  test("disk and off-heap memory storage") {
643    testDiskAndMemoryStorage(StorageLevel.OFF_HEAP, getAsBytes = false)
644  }
645
646  test("disk and off-heap memory storage with getLocalBytes") {
647    testDiskAndMemoryStorage(StorageLevel.OFF_HEAP, getAsBytes = true)
648  }
649
650  def testDiskAndMemoryStorage(
651      storageLevel: StorageLevel,
652      getAsBytes: Boolean): Unit = {
653    store = makeBlockManager(12000)
654    val accessMethod =
655      if (getAsBytes) store.getLocalBytesAndReleaseLock else store.getSingleAndReleaseLock
656    val a1 = new Array[Byte](4000)
657    val a2 = new Array[Byte](4000)
658    val a3 = new Array[Byte](4000)
659    store.putSingle("a1", a1, storageLevel)
660    store.putSingle("a2", a2, storageLevel)
661    store.putSingle("a3", a3, storageLevel)
662    assert(accessMethod("a2").isDefined, "a2 was not in store")
663    assert(accessMethod("a3").isDefined, "a3 was not in store")
664    assert(accessMethod("a1").isDefined, "a1 was not in store")
665    val dataShouldHaveBeenCachedBackIntoMemory = {
666      if (storageLevel.deserialized) {
667        !getAsBytes
668      } else {
669        // If the block's storage level is serialized, then always cache the bytes in memory, even
670        // if the caller requested values.
671        true
672      }
673    }
674    if (dataShouldHaveBeenCachedBackIntoMemory) {
675      assert(store.memoryStore.contains("a1"), "a1 was not in memory store")
676    } else {
677      assert(!store.memoryStore.contains("a1"), "a1 was in memory store")
678    }
679  }
680
  // Checks which blocks survive when memory-only, disk-only and memory-and-disk blocks of 4000
  // bytes each are put into a store created with makeBlockManager(12000).
  test("LRU with mixed storage levels") {
    store = makeBlockManager(12000)
    val a1 = new Array[Byte](4000)
    val a2 = new Array[Byte](4000)
    val a3 = new Array[Byte](4000)
    val a4 = new Array[Byte](4000)
    // First store a1 and a2, both in memory, and a3, on disk only
    store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY_SER)
    store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY_SER)
    store.putSingle("a3", a3, StorageLevel.DISK_ONLY)
    // At this point LRU should not kick in because a3 is only on disk
    assert(store.getSingleAndReleaseLock("a1").isDefined, "a1 was not in store")
    assert(store.getSingleAndReleaseLock("a2").isDefined, "a2 was not in store")
    assert(store.getSingleAndReleaseLock("a3").isDefined, "a3 was not in store")
    // Now let's add in a4, which uses both disk and memory; a1 should drop out
    store.putSingle("a4", a4, StorageLevel.MEMORY_AND_DISK_SER)
    // a1 was memory-only, so once dropped it is expected to be gone from the store entirely.
    assert(store.getSingleAndReleaseLock("a1") == None, "a1 was in store")
    assert(store.getSingleAndReleaseLock("a2").isDefined, "a2 was not in store")
    assert(store.getSingleAndReleaseLock("a3").isDefined, "a3 was not in store")
    assert(store.getSingleAndReleaseLock("a4").isDefined, "a4 was not in store")
  }
702
703  test("in-memory LRU with streams") {
704    store = makeBlockManager(12000)
705    val list1 = List(new Array[Byte](2000), new Array[Byte](2000))
706    val list2 = List(new Array[Byte](2000), new Array[Byte](2000))
707    val list3 = List(new Array[Byte](2000), new Array[Byte](2000))
708    store.putIterator(
709      "list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
710    store.putIterator(
711      "list2", list2.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
712    store.putIterator(
713      "list3", list3.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
714    assert(store.getAndReleaseLock("list2").isDefined, "list2 was not in store")
715    assert(store.get("list2").get.data.size === 2)
716    assert(store.getAndReleaseLock("list3").isDefined, "list3 was not in store")
717    assert(store.get("list3").get.data.size === 2)
718    assert(store.getAndReleaseLock("list1") === None, "list1 was in store")
719    assert(store.getAndReleaseLock("list2").isDefined, "list2 was not in store")
720    assert(store.get("list2").get.data.size === 2)
721    // At this point list2 was gotten last, so LRU will getSingle rid of list3
722    store.putIterator(
723      "list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
724    assert(store.getAndReleaseLock("list1").isDefined, "list1 was not in store")
725    assert(store.get("list1").get.data.size === 2)
726    assert(store.getAndReleaseLock("list2").isDefined, "list2 was not in store")
727    assert(store.get("list2").get.data.size === 2)
728    assert(store.getAndReleaseLock("list3") === None, "list1 was in store")
729  }
730
731  test("LRU with mixed storage levels and streams") {
732    store = makeBlockManager(12000)
733    val list1 = List(new Array[Byte](2000), new Array[Byte](2000))
734    val list2 = List(new Array[Byte](2000), new Array[Byte](2000))
735    val list3 = List(new Array[Byte](2000), new Array[Byte](2000))
736    val list4 = List(new Array[Byte](2000), new Array[Byte](2000))
737    // First store list1 and list2, both in memory, and list3, on disk only
738    store.putIterator(
739      "list1", list1.iterator, StorageLevel.MEMORY_ONLY_SER, tellMaster = true)
740    store.putIterator(
741      "list2", list2.iterator, StorageLevel.MEMORY_ONLY_SER, tellMaster = true)
742    store.putIterator(
743      "list3", list3.iterator, StorageLevel.DISK_ONLY, tellMaster = true)
744    val listForSizeEstimate = new ArrayBuffer[Any]
745    listForSizeEstimate ++= list1.iterator
746    val listSize = SizeEstimator.estimate(listForSizeEstimate)
747    // At this point LRU should not kick in because list3 is only on disk
748    assert(store.getAndReleaseLock("list1").isDefined, "list1 was not in store")
749    assert(store.get("list1").get.data.size === 2)
750    assert(store.getAndReleaseLock("list2").isDefined, "list2 was not in store")
751    assert(store.get("list2").get.data.size === 2)
752    assert(store.getAndReleaseLock("list3").isDefined, "list3 was not in store")
753    assert(store.get("list3").get.data.size === 2)
754    assert(store.getAndReleaseLock("list1").isDefined, "list1 was not in store")
755    assert(store.get("list1").get.data.size === 2)
756    assert(store.getAndReleaseLock("list2").isDefined, "list2 was not in store")
757    assert(store.get("list2").get.data.size === 2)
758    assert(store.getAndReleaseLock("list3").isDefined, "list3 was not in store")
759    assert(store.get("list3").get.data.size === 2)
760    // Now let's add in list4, which uses both disk and memory; list1 should drop out
761    store.putIterator(
762      "list4", list4.iterator, StorageLevel.MEMORY_AND_DISK_SER, tellMaster = true)
763    assert(store.getAndReleaseLock("list1") === None, "list1 was in store")
764    assert(store.getAndReleaseLock("list2").isDefined, "list2 was not in store")
765    assert(store.get("list2").get.data.size === 2)
766    assert(store.getAndReleaseLock("list3").isDefined, "list3 was not in store")
767    assert(store.get("list3").get.data.size === 2)
768    assert(store.getAndReleaseLock("list4").isDefined, "list4 was not in store")
769    assert(store.get("list4").get.data.size === 2)
770  }
771
772  test("negative byte values in ByteBufferInputStream") {
773    val buffer = ByteBuffer.wrap(Array[Int](254, 255, 0, 1, 2).map(_.toByte).toArray)
774    val stream = new ByteBufferInputStream(buffer)
775    val temp = new Array[Byte](10)
776    assert(stream.read() === 254, "unexpected byte read")
777    assert(stream.read() === 255, "unexpected byte read")
778    assert(stream.read() === 0, "unexpected byte read")
779    assert(stream.read(temp, 0, temp.length) === 2, "unexpected number of bytes read")
780    assert(stream.read() === -1, "end of stream not signalled")
781    assert(stream.read(temp, 0, temp.length) === -1, "end of stream not signalled")
782  }
783
784  test("overly large block") {
785    store = makeBlockManager(5000)
786    store.putSingle("a1", new Array[Byte](10000), StorageLevel.MEMORY_ONLY)
787    assert(store.getSingleAndReleaseLock("a1") === None, "a1 was in store")
788    store.putSingle("a2", new Array[Byte](10000), StorageLevel.MEMORY_AND_DISK)
789    assert(!store.memoryStore.contains("a2"), "a2 was in memory store")
790    assert(store.getSingleAndReleaseLock("a2").isDefined, "a2 was not in store")
791  }
792
793  test("block compression") {
794    try {
795      conf.set("spark.shuffle.compress", "true")
796      store = makeBlockManager(20000, "exec1")
797      store.putSingle(
798        ShuffleBlockId(0, 0, 0), new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
799      assert(store.memoryStore.getSize(ShuffleBlockId(0, 0, 0)) <= 100,
800        "shuffle_0_0_0 was not compressed")
801      store.stop()
802      store = null
803
804      conf.set("spark.shuffle.compress", "false")
805      store = makeBlockManager(20000, "exec2")
806      store.putSingle(
807        ShuffleBlockId(0, 0, 0), new Array[Byte](10000), StorageLevel.MEMORY_ONLY_SER)
808      assert(store.memoryStore.getSize(ShuffleBlockId(0, 0, 0)) >= 10000,
809        "shuffle_0_0_0 was compressed")
810      store.stop()
811      store = null
812
813      conf.set("spark.broadcast.compress", "true")
814      store = makeBlockManager(20000, "exec3")
815      store.putSingle(
816        BroadcastBlockId(0), new Array[Byte](10000), StorageLevel.MEMORY_ONLY_SER)
817      assert(store.memoryStore.getSize(BroadcastBlockId(0)) <= 1000,
818        "broadcast_0 was not compressed")
819      store.stop()
820      store = null
821
822      conf.set("spark.broadcast.compress", "false")
823      store = makeBlockManager(20000, "exec4")
824      store.putSingle(
825        BroadcastBlockId(0), new Array[Byte](10000), StorageLevel.MEMORY_ONLY_SER)
826      assert(store.memoryStore.getSize(BroadcastBlockId(0)) >= 10000, "broadcast_0 was compressed")
827      store.stop()
828      store = null
829
830      conf.set("spark.rdd.compress", "true")
831      store = makeBlockManager(20000, "exec5")
832      store.putSingle(rdd(0, 0), new Array[Byte](10000), StorageLevel.MEMORY_ONLY_SER)
833      assert(store.memoryStore.getSize(rdd(0, 0)) <= 1000, "rdd_0_0 was not compressed")
834      store.stop()
835      store = null
836
837      conf.set("spark.rdd.compress", "false")
838      store = makeBlockManager(20000, "exec6")
839      store.putSingle(rdd(0, 0), new Array[Byte](10000), StorageLevel.MEMORY_ONLY_SER)
840      assert(store.memoryStore.getSize(rdd(0, 0)) >= 10000, "rdd_0_0 was compressed")
841      store.stop()
842      store = null
843
844      // Check that any other block types are also kept uncompressed
845      store = makeBlockManager(20000, "exec7")
846      store.putSingle("other_block", new Array[Byte](10000), StorageLevel.MEMORY_ONLY)
847      assert(store.memoryStore.getSize("other_block") >= 10000, "other_block was compressed")
848      store.stop()
849      store = null
850    } finally {
851      System.clearProperty("spark.shuffle.compress")
852      System.clearProperty("spark.broadcast.compress")
853      System.clearProperty("spark.rdd.compress")
854    }
855  }
856
  // A put that throws during serialization must not leave the block in a state where a later
  // read blocks: the follow-up get has to return within one second and find nothing.
  test("block store put failure") {
    // Use Java serializer so we can create an unserializable error.
    conf.set("spark.testing.memory", "1200")
    // The BlockManager is assembled by hand here so that its SerializerManager wraps a
    // JavaSerializer.
    val transfer = new NettyBlockTransferService(conf, securityMgr, "localhost", "localhost", 0, 1)
    val memoryManager = UnifiedMemoryManager(conf, numCores = 1)
    val serializerManager = new SerializerManager(new JavaSerializer(conf), conf)
    store = new BlockManager(SparkContext.DRIVER_IDENTIFIER, rpcEnv, master,
      serializerManager, conf, memoryManager, mapOutputTracker,
      shuffleManager, transfer, securityMgr, 0)
    memoryManager.setMemoryStore(store.memoryStore)
    store.initialize("app-id")

    // The put should fail since a1 is not serializable.
    class UnserializableClass
    val a1 = new UnserializableClass
    intercept[java.io.NotSerializableException] {
      store.putSingle("a1", a1, StorageLevel.DISK_ONLY)
    }

    // Make sure get a1 doesn't hang and returns None.
    failAfter(1 second) {
      assert(store.getSingleAndReleaseLock("a1").isEmpty, "a1 should not be in store")
    }
  }
881
  // Checks the block status updates recorded in a task's metrics for puts, evictions and removals.
  test("updated block statuses") {
    store = makeBlockManager(12000)
    store.registerTask(0)
    val list = List.fill(2)(new Array[Byte](2000))
    val bigList = List.fill(8)(new Array[Byte](2000))

    // Runs `task` with a fresh, empty TaskContext installed and returns the block status updates
    // found in that context's task metrics afterwards. The context is always unset again.
    def getUpdatedBlocks(task: => Unit): Seq[(BlockId, BlockStatus)] = {
      val context = TaskContext.empty()
      try {
        TaskContext.setTaskContext(context)
        task
      } finally {
        TaskContext.unset()
      }
      context.taskMetrics.updatedBlockStatuses
    }

    // 1 updated block (i.e. list1)
    val updatedBlocks1 = getUpdatedBlocks {
      store.putIterator(
        "list1", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
    }
    assert(updatedBlocks1.size === 1)
    assert(updatedBlocks1.head._1 === TestBlockId("list1"))
    assert(updatedBlocks1.head._2.storageLevel === StorageLevel.MEMORY_ONLY)

    // 1 updated block (i.e. list2)
    val updatedBlocks2 = getUpdatedBlocks {
      store.putIterator(
        "list2", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
    }
    assert(updatedBlocks2.size === 1)
    assert(updatedBlocks2.head._1 === TestBlockId("list2"))
    // NOTE(review): list2 was put as MEMORY_AND_DISK but the status is expected to be
    // MEMORY_ONLY - presumably the status reports where the block currently lives; confirm.
    assert(updatedBlocks2.head._2.storageLevel === StorageLevel.MEMORY_ONLY)

    // 2 updated blocks - list1 is kicked out of memory while list3 is added
    val updatedBlocks3 = getUpdatedBlocks {
      store.putIterator(
        "list3", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
    }
    assert(updatedBlocks3.size === 2)
    updatedBlocks3.foreach { case (id, status) =>
      id match {
        case TestBlockId("list1") => assert(status.storageLevel === StorageLevel.NONE)
        case TestBlockId("list3") => assert(status.storageLevel === StorageLevel.MEMORY_ONLY)
        case _ => fail("Updated block is neither list1 nor list3")
      }
    }
    assert(store.memoryStore.contains("list3"), "list3 was not in memory store")

    // 2 updated blocks - list2 is kicked out of memory (but put on disk) while list4 is added
    val updatedBlocks4 = getUpdatedBlocks {
      store.putIterator(
        "list4", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
    }
    assert(updatedBlocks4.size === 2)
    updatedBlocks4.foreach { case (id, status) =>
      id match {
        case TestBlockId("list2") => assert(status.storageLevel === StorageLevel.DISK_ONLY)
        case TestBlockId("list4") => assert(status.storageLevel === StorageLevel.MEMORY_ONLY)
        case _ => fail("Updated block is neither list2 nor list4")
      }
    }
    assert(store.diskStore.contains("list2"), "list2 was not in disk store")
    assert(store.memoryStore.contains("list4"), "list4 was not in memory store")

    // No updated blocks - list5 is too big to fit in store and nothing is kicked out
    val updatedBlocks5 = getUpdatedBlocks {
      store.putIterator(
        "list5", bigList.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
    }
    assert(updatedBlocks5.size === 0)

    // memory store contains only list3 and list4
    assert(!store.memoryStore.contains("list1"), "list1 was in memory store")
    assert(!store.memoryStore.contains("list2"), "list2 was in memory store")
    assert(store.memoryStore.contains("list3"), "list3 was not in memory store")
    assert(store.memoryStore.contains("list4"), "list4 was not in memory store")
    assert(!store.memoryStore.contains("list5"), "list5 was in memory store")

    // disk store contains only list2
    assert(!store.diskStore.contains("list1"), "list1 was in disk store")
    assert(store.diskStore.contains("list2"), "list2 was not in disk store")
    assert(!store.diskStore.contains("list3"), "list3 was in disk store")
    assert(!store.diskStore.contains("list4"), "list4 was in disk store")
    assert(!store.diskStore.contains("list5"), "list5 was in disk store")

    // remove block - list2 should be removed from disk
    val updatedBlocks6 = getUpdatedBlocks {
      store.removeBlock(
        "list2", tellMaster = true)
    }
    assert(updatedBlocks6.size === 1)
    assert(updatedBlocks6.head._1 === TestBlockId("list2"))
    assert(updatedBlocks6.head._2.storageLevel == StorageLevel.NONE)
    assert(!store.diskStore.contains("list2"), "list2 was in disk store")
  }
979
  // Compares what the master reports through getLocations and getBlockStatus for blocks that
  // were put with tellMaster = true against blocks that were put with tellMaster = false.
  test("query block statuses") {
    store = makeBlockManager(12000)
    val list = List.fill(2)(new Array[Byte](2000))

    // Tell master. By LRU, only list2 and list3 remain.
    store.putIterator(
      "list1", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
    store.putIterator(
      "list2", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
    store.putIterator(
      "list3", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)

    // getLocations and getBlockStatus should yield the same locations
    assert(store.master.getLocations("list1").size === 0)
    assert(store.master.getLocations("list2").size === 1)
    assert(store.master.getLocations("list3").size === 1)
    assert(store.master.getBlockStatus("list1", askSlaves = false).size === 0)
    assert(store.master.getBlockStatus("list2", askSlaves = false).size === 1)
    assert(store.master.getBlockStatus("list3", askSlaves = false).size === 1)
    assert(store.master.getBlockStatus("list1", askSlaves = true).size === 0)
    assert(store.master.getBlockStatus("list2", askSlaves = true).size === 1)
    assert(store.master.getBlockStatus("list3", askSlaves = true).size === 1)

    // This time don't tell master and see what happens. By LRU, only list5 and list6 remain.
    store.putIterator(
      "list4", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = false)
    store.putIterator(
      "list5", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
    store.putIterator(
      "list6", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = false)

    // getLocations should return nothing because the master is not informed
    // getBlockStatus without asking slaves should have the same result
    // getBlockStatus with asking slaves, however, should return the actual block statuses
    assert(store.master.getLocations("list4").size === 0)
    assert(store.master.getLocations("list5").size === 0)
    assert(store.master.getLocations("list6").size === 0)
    assert(store.master.getBlockStatus("list4", askSlaves = false).size === 0)
    assert(store.master.getBlockStatus("list5", askSlaves = false).size === 0)
    assert(store.master.getBlockStatus("list6", askSlaves = false).size === 0)
    assert(store.master.getBlockStatus("list4", askSlaves = true).size === 0)
    assert(store.master.getBlockStatus("list5", askSlaves = true).size === 1)
    assert(store.master.getBlockStatus("list6", askSlaves = true).size === 1)
  }
1024
  // Exercises master.getMatchingBlockIds with string-based and pattern-based filters, with and
  // without asking the slaves.
  test("get matching blocks") {
    store = makeBlockManager(12000)
    val list = List.fill(2)(new Array[Byte](100))

    // insert some blocks
    store.putIterator(
      "list1", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
    store.putIterator(
      "list2", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
    store.putIterator(
      "list3", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)

    // All three blocks were reported to the master, so it should match all of them on "list"
    // and exactly one of them on "list1", without having to ask the slaves
    assert(store.master.getMatchingBlockIds(_.toString.contains("list"), askSlaves = false).size
      === 3)
    assert(store.master.getMatchingBlockIds(_.toString.contains("list1"), askSlaves = false).size
      === 1)

    // insert some more blocks
    store.putIterator(
      "newlist1", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
    store.putIterator(
      "newlist2", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
    store.putIterator(
      "newlist3", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = false)

    // Only newlist1 was reported to the master; newlist2 and newlist3 should show up only when
    // the slaves are asked
    assert(store.master.getMatchingBlockIds(_.toString.contains("newlist"), askSlaves = false).size
      === 1)
    assert(store.master.getMatchingBlockIds(_.toString.contains("newlist"), askSlaves = true).size
      === 3)

    // Filter on the structure of the block id rather than on its string form: only blocks of
    // RDD 1 should match
    val blockIds = Seq(RDDBlockId(1, 0), RDDBlockId(1, 1), RDDBlockId(2, 0))
    blockIds.foreach { blockId =>
      store.putIterator(
        blockId, list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
    }
    val matchedBlockIds = store.master.getMatchingBlockIds(_ match {
      case RDDBlockId(1, _) => true
      case _ => false
    }, askSlaves = true)
    assert(matchedBlockIds.toSet === Set(RDDBlockId(1, 0), RDDBlockId(1, 1)))
  }
1068
  // Putting a new partition of RDD 0 must not push out the existing partition of RDD 0; the
  // block of RDD 1 is expected to go instead, although it was accessed more recently.
  test("SPARK-1194 regression: fix the same-RDD rule for cache replacement") {
    store = makeBlockManager(12000)
    store.putSingle(rdd(0, 0), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
    store.putSingle(rdd(1, 0), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
    // Access rdd_1_0 to ensure it's not least recently used.
    assert(store.getSingleAndReleaseLock(rdd(1, 0)).isDefined, "rdd_1_0 was not in store")
    // According to the same-RDD rule, rdd_1_0 should be replaced here.
    store.putSingle(rdd(0, 1), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
    // rdd_1_0 should have been replaced, even though it is not the least recently used block.
    assert(store.memoryStore.contains(rdd(0, 0)), "rdd_0_0 was not in store")
    assert(store.memoryStore.contains(rdd(0, 1)), "rdd_0_1 was not in store")
    assert(!store.memoryStore.contains(rdd(1, 0)), "rdd_1_0 was in store")
  }
1082
  // Unrolls iterators straight into the memory store while MEMORY_AND_DISK blocks are already
  // present, and checks which blocks end up in the memory store and which in the disk store.
  test("safely unroll blocks through putIterator (disk)") {
    store = makeBlockManager(12000)
    val memoryStore = store.memoryStore
    val diskStore = store.diskStore
    val smallList = List.fill(40)(new Array[Byte](100))
    val bigList = List.fill(40)(new Array[Byte](1000))
    // Defined as methods so that every use gets a fresh iterator over the same list.
    def smallIterator: Iterator[Any] = smallList.iterator.asInstanceOf[Iterator[Any]]
    def bigIterator: Iterator[Any] = bigList.iterator.asInstanceOf[Iterator[Any]]
    assert(memoryStore.currentUnrollMemoryForThisTask === 0)

    store.putIterator("b1", smallIterator, StorageLevel.MEMORY_AND_DISK)
    store.putIterator("b2", smallIterator, StorageLevel.MEMORY_AND_DISK)

    // Unroll with not enough space. This should succeed but kick out b1 in the process.
    // Memory store should contain b2 and b3, while disk store should contain only b1
    val result3 = memoryStore.putIteratorAsValues("b3", smallIterator, ClassTag.Any)
    assert(result3.isRight)
    assert(!memoryStore.contains("b1"))
    assert(memoryStore.contains("b2"))
    assert(memoryStore.contains("b3"))
    assert(diskStore.contains("b1"))
    assert(!diskStore.contains("b2"))
    assert(!diskStore.contains("b3"))
    // b3 was written directly to the memory store above; take it out again and re-insert it
    // through the BlockManager before continuing.
    memoryStore.remove("b3")
    store.putIterator("b3", smallIterator, StorageLevel.MEMORY_ONLY)
    assert(memoryStore.currentUnrollMemoryForThisTask === 0)

    // Unroll huge block with not enough space. This should fail and return an iterator so that
    // the block may be stored to disk. During the unrolling process, block "b2" should be kicked
    // out, so the memory store should contain only b3, while the disk store should contain
    // b1, b2 and b4.
    // NOTE(review): only the memory-store half of that expectation is asserted below; the
    // disk-store contents are not checked here.
    val result4 = memoryStore.putIteratorAsValues("b4", bigIterator, ClassTag.Any)
    assert(result4.isLeft)
    assert(!memoryStore.contains("b1"))
    assert(!memoryStore.contains("b2"))
    assert(memoryStore.contains("b3"))
    assert(!memoryStore.contains("b4"))
  }
1121
  // Uses getSingle (not the ...AndReleaseLock helper) so that every successful read leaves the
  // block pinned, then checks that pinned blocks are not chosen for eviction.
  test("read-locked blocks cannot be evicted from memory") {
    store = makeBlockManager(12000)
    val arr = new Array[Byte](4000)
    // First store a1 and a2, both in memory
    store.putSingle("a1", arr, StorageLevel.MEMORY_ONLY_SER)
    store.putSingle("a2", arr, StorageLevel.MEMORY_ONLY_SER)
    assert(store.getSingle("a1").isDefined, "a1 was not in store")
    assert(store.getSingle("a2").isDefined, "a2 was not in store")
    // This put should fail because both a1 and a2 should be read-locked:
    store.putSingle("a3", arr, StorageLevel.MEMORY_ONLY_SER)
    assert(store.getSingle("a3").isEmpty, "a3 was in store")
    assert(store.getSingle("a1").isDefined, "a1 was not in store")
    assert(store.getSingle("a2").isDefined, "a2 was not in store")
    // Release both pins of block a2:
    store.releaseLock("a2")
    store.releaseLock("a2")
    // Block a1 is the least-recently accessed, so an LRU eviction policy would evict it before
    // block a2. However, a1 is still pinned so this put of a3 should evict a2 instead:
    store.putSingle("a3", arr, StorageLevel.MEMORY_ONLY_SER)
    assert(store.getSingle("a2").isEmpty, "a2 was in store")
    assert(store.getSingle("a1").isDefined, "a1 was not in store")
    assert(store.getSingle("a3").isDefined, "a3 was not in store")
  }
1145
  /**
   * Stores a 4000-byte block under the id "blockId", deletes every file of the disk block
   * manager behind the BlockManager's back, and then checks how reads behave: the first read
   * must throw a SparkException, the second must return an empty result, and the block's status
   * must be gone afterwards.
   *
   * @param storageLevel level used to store the block
   * @param readMethod the read operation under test; it is invoked twice on the store
   */
  private def testReadWithLossOfOnDiskFiles(
      storageLevel: StorageLevel,
      readMethod: BlockManager => Option[_]): Unit = {
    store = makeBlockManager(12000)
    assert(store.putSingle("blockId", new Array[Byte](4000), storageLevel))
    assert(store.getStatus("blockId").isDefined)
    // Directly delete all files from the disk store, triggering failures when reading blocks:
    store.diskBlockManager.getAllFiles().foreach(_.delete())
    // The BlockManager still thinks that these blocks exist:
    assert(store.getStatus("blockId").isDefined)
    // Because the BlockManager's metadata claims that the block exists (i.e. that it's present
    // in at least one store), the read attempts to read it and fails when the on-disk file is
    // missing.
    intercept[SparkException] {
      readMethod(store)
    }
    // Subsequent read attempts will succeed; the block isn't present but we return an expected
    // "block not found" response rather than a fatal error:
    assert(readMethod(store).isEmpty)
    // The reason why this second read succeeded is because the metadata entry for the missing
    // block was removed as a result of the read failure:
    assert(store.getStatus("blockId").isEmpty)
  }
1169
1170  test("remove block if a read fails due to missing DiskStore files (SPARK-15736)") {
1171    val storageLevels = Seq(
1172      StorageLevel(useDisk = true, useMemory = false, deserialized = false),
1173      StorageLevel(useDisk = true, useMemory = false, deserialized = true))
1174    val readMethods = Map[String, BlockManager => Option[_]](
1175      "getLocalBytes" -> ((m: BlockManager) => m.getLocalBytes("blockId")),
1176      "getLocalValues" -> ((m: BlockManager) => m.getLocalValues("blockId"))
1177    )
1178    testReadWithLossOfOnDiskFiles(StorageLevel.DISK_ONLY, _.getLocalBytes("blockId"))
1179    for ((readMethodName, readMethod) <- readMethods; storageLevel <- storageLevels) {
1180      withClue(s"$readMethodName $storageLevel") {
1181        testReadWithLossOfOnDiskFiles(storageLevel, readMethod)
1182      }
1183    }
1184  }
1185
  test("SPARK-13328: refresh block locations (fetch should fail after hitting a threshold)") {
    // The mock transfer service throws on its first `maxFailures` fetchBlockSync calls; here that
    // count is the configured refresh threshold (5 unless overridden in conf).
    val mockBlockTransferService =
      new MockBlockTransferService(conf.getInt("spark.block.failures.beforeLocationRefresh", 5))
    store = makeBlockManager(8000, "executor1", transferService = Option(mockBlockTransferService))
    store.putSingle("item", 999L, StorageLevel.MEMORY_ONLY, tellMaster = true)
    // The remote fetch is expected to give up and report the block as missing.
    assert(store.getRemoteBytes("item").isEmpty)
  }
1193
1194  test("SPARK-13328: refresh block locations (fetch should succeed after location refresh)") {
1195    val maxFailuresBeforeLocationRefresh =
1196      conf.getInt("spark.block.failures.beforeLocationRefresh", 5)
1197    val mockBlockManagerMaster = mock(classOf[BlockManagerMaster])
1198    val mockBlockTransferService =
1199      new MockBlockTransferService(maxFailuresBeforeLocationRefresh)
1200    // make sure we have more than maxFailuresBeforeLocationRefresh locations
1201    // so that we have a chance to do location refresh
1202    val blockManagerIds = (0 to maxFailuresBeforeLocationRefresh)
1203      .map { i => BlockManagerId(s"id-$i", s"host-$i", i + 1) }
1204    when(mockBlockManagerMaster.getLocations(mc.any[BlockId])).thenReturn(blockManagerIds)
1205    store = makeBlockManager(8000, "executor1", mockBlockManagerMaster,
1206      transferService = Option(mockBlockTransferService))
1207    val block = store.getRemoteBytes("item")
1208      .asInstanceOf[Option[ByteBuffer]]
1209    assert(block.isDefined)
1210    verify(mockBlockManagerMaster, times(2)).getLocations("item")
1211  }
1212
  // A put whose upload step throws must leave no trace of the block: not locally, not in the
  // master's location list, and not reachable from another block manager.
  test("SPARK-17484: block status is properly updated following an exception in put()") {
    // Transfer service whose uploadBlock always throws.
    val mockBlockTransferService = new MockBlockTransferService(maxFailures = 10) {
      override def uploadBlock(
          hostname: String,
          port: Int, execId: String,
          blockId: BlockId,
          blockData: ManagedBuffer,
          level: StorageLevel,
          classTag: ClassTag[_]): Future[Unit] = {
        throw new InterruptedException("Intentional interrupt")
      }
    }
    store = makeBlockManager(8000, "executor1", transferService = Option(mockBlockTransferService))
    store2 = makeBlockManager(8000, "executor2", transferService = Option(mockBlockTransferService))
    // MEMORY_ONLY_2 is used so that the put reaches uploadBlock, which throws.
    intercept[InterruptedException] {
      store.putSingle("item", "value", StorageLevel.MEMORY_ONLY_2, tellMaster = true)
    }
    assert(store.getLocalBytes("item").isEmpty)
    assert(master.getLocations("item").isEmpty)
    assert(store2.getRemoteBytes("item").isEmpty)
  }
1234
  // Makes the master's location list stale on purpose and checks that it is corrected once a
  // remote fetch for the block comes back empty.
  test("SPARK-17484: master block locations are updated following an invalid remote block fetch") {
    store = makeBlockManager(8000, "executor1")
    store2 = makeBlockManager(8000, "executor2")
    store.putSingle("item", "value", StorageLevel.MEMORY_ONLY, tellMaster = true)
    assert(master.getLocations("item").nonEmpty)
    // Remove the block without telling the master, so the master still lists a location for it.
    store.removeBlock("item", tellMaster = false)
    assert(master.getLocations("item").nonEmpty)
    // After store2's remote fetch finds nothing, the master should no longer list any location.
    assert(store2.getRemoteBytes("item").isEmpty)
    assert(master.getLocations("item").isEmpty)
  }
1245
1246  class MockBlockTransferService(val maxFailures: Int) extends BlockTransferService {
1247    var numCalls = 0
1248
1249    override def init(blockDataManager: BlockDataManager): Unit = {}
1250
1251    override def fetchBlocks(
1252        host: String,
1253        port: Int,
1254        execId: String,
1255        blockIds: Array[String],
1256        listener: BlockFetchingListener): Unit = {
1257      listener.onBlockFetchSuccess("mockBlockId", new NioManagedBuffer(ByteBuffer.allocate(1)))
1258    }
1259
1260    override def close(): Unit = {}
1261
1262    override def hostName: String = { "MockBlockTransferServiceHost" }
1263
1264    override def port: Int = { 63332 }
1265
1266    override def uploadBlock(
1267        hostname: String,
1268        port: Int, execId: String,
1269        blockId: BlockId,
1270        blockData: ManagedBuffer,
1271        level: StorageLevel,
1272        classTag: ClassTag[_]): Future[Unit] = {
1273      import scala.concurrent.ExecutionContext.Implicits.global
1274      Future {}
1275    }
1276
1277    override def fetchBlockSync(
1278        host: String,
1279        port: Int,
1280        execId: String,
1281        blockId: String): ManagedBuffer = {
1282      numCalls += 1
1283      if (numCalls <= maxFailures) {
1284        throw new RuntimeException("Failing block fetch in the mock block transfer service")
1285      }
1286      super.fetchBlockSync(host, port, execId, blockId)
1287    }
1288  }
1289}
1290
private object BlockManagerSuite {

  /**
   * Test-only extension methods for [[BlockManager]]: a drop-from-memory helper and variants of
   * the read methods that release the lock again whenever the read found something.
   */
  private implicit class BlockManagerTestUtils(store: BlockManager) {

    /**
     * Takes the write lock on `blockId` (doing nothing if it cannot be obtained), drops the block
     * from memory, and then either releases the lock or removes the block info, depending on
     * whether the resulting storage level is still valid.
     */
    def dropFromMemoryIfExists(
        blockId: BlockId,
        data: () => Either[Array[Any], ChunkedByteBuffer]): Unit = {
      store.blockInfoManager.lockForWriting(blockId).foreach { _ =>
        val levelAfterDrop = store.dropFromMemory(blockId, data)
        if (!levelAfterDrop.isValid) {
          // Not present in any store any more: delete the block info so that the block can be
          // stored again.
          store.blockInfoManager.removeBlock(blockId)
        } else {
          // Still present in at least one store: keep the block info and just release the lock.
          store.releaseLock(blockId)
        }
      }
    }

    /** Wraps a read function so that a successful read is followed by `releaseLock`. */
    private def wrapGet[T](f: BlockId => Option[T]): BlockId => Option[T] = { blockId =>
      val found = f(blockId)
      found.foreach(_ => store.releaseLock(blockId))
      found
    }

    val getLocalAndReleaseLock: (BlockId) => Option[BlockResult] = wrapGet(store.getLocalValues)
    val getAndReleaseLock: (BlockId) => Option[BlockResult] = wrapGet(store.get)
    val getSingleAndReleaseLock: (BlockId) => Option[Any] = wrapGet(store.getSingle)
    val getLocalBytesAndReleaseLock: (BlockId) => Option[ChunkedByteBuffer] = {
      wrapGet(store.getLocalBytes)
    }

    /** True if a local read of `blockId` finds the block; the lock is released afterwards. */
    def hasLocalBlock(blockId: BlockId): Boolean = {
      getLocalAndReleaseLock(blockId).nonEmpty
    }
  }

}
1333