/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.storage

import scala.collection.mutable
import scala.language.implicitConversions

import org.scalatest.{BeforeAndAfter, Matchers}

import org.apache.spark.{LocalSparkContext, SparkFunSuite}

/**
 * Tests for [[RandomBlockReplicationPolicy]]: checks that `prioritize` hands back the
 * requested number of distinct peers for every replica count from 1 up to the number of
 * available block managers.
 */
class BlockReplicationPolicySuite extends SparkFunSuite
  with Matchers
  with BeforeAndAfter
  with LocalSparkContext {

  // Implicitly convert strings to BlockIds for test clarity.
  private implicit def StringToBlockId(value: String): BlockId = new TestBlockId(value)

  /**
   * Test if we get the required number of peers when using random sampling from
   * RandomBlockReplicationPolicy
   */
  test("block replication - random block replication policy") {
    val numBlockManagers = 10
    // Candidate peers: store-1 .. store-N on localhost, ports 1001 .. 1000 + N.
    val blockManagers = (1 to numBlockManagers).map { i =>
      BlockManagerId(s"store-$i", "localhost", 1000 + i, None)
    }
    // The block manager on whose behalf peers are prioritized; it uses port 1000 and is
    // therefore not one of the candidate peers above.
    val candidateBlockManager = BlockManagerId("test-store", "localhost", 1000, None)
    val replicationPolicy = new RandomBlockReplicationPolicy
    // Kept as a String here; presumably converted through StringToBlockId at the
    // `prioritize` call sites (the signature of `prioritize` is not visible in this file).
    val blockId = "test-block"

    // Exercise every replica count up to (and including) the number of available peers.
    (1 to numBlockManagers).foreach { numReplicas =>
      logDebug(s"Num replicas : $numReplicas")
      val randomPeers = replicationPolicy.prioritize(
        candidateBlockManager,
        blockManagers,
        mutable.HashSet.empty[BlockManagerId],
        blockId,
        numReplicas
      )
      logDebug(s"Random peers : ${randomPeers.mkString(", ")}")
      // Going through a Set makes the check fail if the same peer is returned twice.
      assert(randomPeers.toSet.size === numReplicas)

      // choosing n peers out of n
      val secondPass = replicationPolicy.prioritize(
        candidateBlockManager,
        randomPeers,
        mutable.HashSet.empty[BlockManagerId],
        blockId,
        numReplicas
      )
      logDebug(s"Random peers : ${secondPass.mkString(", ")}")
      assert(secondPass.toSet.size === numReplicas)
    }

  }

}