/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark

import org.scalatest.concurrent.Timeouts._
import org.scalatest.Matchers
import org.scalatest.time.{Millis, Span}

import org.apache.spark.storage.{RDDBlockId, StorageLevel}
import org.apache.spark.util.io.ChunkedByteBuffer

class NotSerializableClass
class NotSerializableExn(val notSer: NotSerializableClass) extends Throwable() {}

class DistributedSuite extends SparkFunSuite with Matchers with LocalSparkContext {

  val clusterUrl = "local-cluster[2,1,1024]"

  test("task throws not serializable exception") {
    // Ensures that executors do not crash when an exception is not serializable. If executors
    // crash, this test will hang. Correct behavior is that executors don't crash but fail tasks
    // and the scheduler throws a SparkException.

    // numSlaves must be less than numPartitions
    val numSlaves = 3
    val numPartitions = 10

    sc = new SparkContext("local-cluster[%s,1,1024]".format(numSlaves), "test")
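    // Each element's map function throws an exception that is itself not serializable.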
    val data = sc.parallelize(1 to 100, numPartitions)
      .map(x => throw new NotSerializableExn(new NotSerializableClass))
    intercept[SparkException] {
      data.count()
    }
    resetSparkContext()
  }

  test("local-cluster format") {
    import SparkMasterRegex._

    val masterStrings = Seq(
      "local-cluster[2,1,1024]",
      "local-cluster[2 , 1 , 1024]",
      "local-cluster[2, 1, 1024]",
      "local-cluster[ 2, 1, 1024 ]"
    )

    masterStrings.foreach {
      case LOCAL_CLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerSlave) =>
        assert(numSlaves.toInt == 2)
        assert(coresPerSlave.toInt == 1)
        assert(memoryPerSlave.toInt == 1024)
    }
  }

  test("simple groupByKey") {
    sc = new SparkContext(clusterUrl, "test")
    val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (2, 1)), 5)
    val groups = pairs.groupByKey(5).collect()
    assert(groups.size === 2)
    val valuesFor1 = groups.find(_._1 == 1).get._2
    assert(valuesFor1.toList.sorted === List(1, 2, 3))
    val valuesFor2 = groups.find(_._1 == 2).get._2
    assert(valuesFor2.toList.sorted === List(1))
  }

  test("groupByKey where map output sizes exceed maxMbInFlight") {
    val conf = new SparkConf().set("spark.reducer.maxSizeInFlight", "1m")
    sc = new SparkContext(clusterUrl, "test", conf)
    // This data should be around 20 MB, so even with 4 mappers and 2 reducers, each map
    // output block should be about 2.5 MB
    val pairs = sc.parallelize(1 to 2000, 4).map(x => (x % 16, new Array[Byte](10000)))
    val groups = pairs.groupByKey(2).map(x => (x._1, x._2.size)).collect()
    assert(groups.length === 16)
    assert(groups.map(_._2).sum === 2000)
  }

  test("accumulators") {
    sc = new SparkContext(clusterUrl, "test")
    val accum = sc.longAccumulator
    sc.parallelize(1 to 10, 10).foreach(x => accum.add(x))
    assert(accum.value === 55)
  }

  test("broadcast variables") {
    sc = new SparkContext(clusterUrl, "test")
    val array = new Array[Int](100)
    val bv = sc.broadcast(array)
    array(2) = 3 // Change the array -- this should not be seen on workers
    val rdd = sc.parallelize(1 to 10, 10)
    val sum = rdd.map(x => bv.value.sum).reduce(_ + _)
    assert(sum === 0)
  }

  test("repeatedly failing task") {
    sc = new SparkContext(clusterUrl, "test")
    val thrown = intercept[SparkException] {
      // scalastyle:off println
      sc.parallelize(1 to 10, 10).foreach(x => println(x / 0))
      // scalastyle:on println
    }
    assert(thrown.getClass === classOf[SparkException])
    assert(thrown.getMessage.contains("failed 4 times"))
  }

  test("repeatedly failing task that crashes JVM") {
    // Ensures that if a task fails in a way that crashes the JVM, the job eventually fails
    // rather than hanging due to retrying the failed task infinitely many times (eventually
    // the standalone scheduler will remove the application, causing the job to hang waiting
    // to reconnect to the master).
    sc = new SparkContext(clusterUrl, "test")
    failAfter(Span(100000, Millis)) {
      val thrown = intercept[SparkException] {
        // One of the tasks always fails.
        sc.parallelize(1 to 10, 2).foreach { x => if (x == 1) System.exit(42) }
      }
      assert(thrown.getClass === classOf[SparkException])
      assert(thrown.getMessage.contains("failed 4 times"))
    }
  }

  test("repeatedly failing task that crashes JVM with a zero exit code (SPARK-16925)") {
    // Ensures that a task which causes the JVM to exit with a zero exit code will still cause
    // the Spark job to eventually fail.
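    // Regression test: the executor's "clean" exit must be counted as a task failure rather
    // than a normal shutdown, as the "failed 4 times" assertion below verifies.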
    sc = new SparkContext(clusterUrl, "test")
    failAfter(Span(100000, Millis)) {
      val thrown = intercept[SparkException] {
        sc.parallelize(1 to 1, 1).foreachPartition { _ => System.exit(0) }
      }
      assert(thrown.getClass === classOf[SparkException])
      assert(thrown.getMessage.contains("failed 4 times"))
    }
    // Check that the cluster is still usable:
    sc.parallelize(1 to 10).count()
  }

  private def testCaching(storageLevel: StorageLevel): Unit = {
    sc = new SparkContext(clusterUrl, "test")
    sc.jobProgressListener.waitUntilExecutorsUp(2, 30000)
    val data = sc.parallelize(1 to 1000, 10)
    val cachedData = data.persist(storageLevel)
    assert(cachedData.count === 1000)
    // Each partition should be stored `replication` times across the cluster.
    assert(sc.getExecutorStorageStatus.map(_.rddBlocksById(cachedData.id).size).sum ===
      storageLevel.replication * data.getNumPartitions)
    assert(cachedData.count === 1000)
    assert(cachedData.count === 1000)

    // Get all the locations of the first partition and try to fetch the partitions
    // from those locations.
    val blockIds = data.partitions.indices.map(index => RDDBlockId(data.id, index)).toArray
    val blockId = blockIds(0)
    val blockManager = SparkEnv.get.blockManager
    val blockTransfer = blockManager.blockTransferService
    val serializerManager = SparkEnv.get.serializerManager
    blockManager.master.getLocations(blockId).foreach { cmId =>
      val bytes = blockTransfer.fetchBlockSync(cmId.host, cmId.port, cmId.executorId,
        blockId.toString)
      val deserialized = serializerManager.dataDeserializeStream(blockId,
        new ChunkedByteBuffer(bytes.nioByteBuffer()).toInputStream())(data.elementClassTag).toList
      // The first of the ten partitions holds elements 1 to 100.
      assert(deserialized === (1 to 100).toList)
    }
    // This will exercise the getRemoteBytes / getRemoteValues code paths:
    assert(blockIds.flatMap(id => blockManager.get[Int](id).get.data).toSet === (1 to 1000).toSet)
  }

  Seq(
    "caching" -> StorageLevel.MEMORY_ONLY,
    "caching on disk" -> StorageLevel.DISK_ONLY,
    "caching in memory, replicated" -> StorageLevel.MEMORY_ONLY_2,
    "caching in memory, serialized, replicated" -> StorageLevel.MEMORY_ONLY_SER_2,
    "caching on disk, replicated" -> StorageLevel.DISK_ONLY_2,
    "caching in memory and disk, replicated" -> StorageLevel.MEMORY_AND_DISK_2,
    "caching in memory and disk, serialized, replicated" -> StorageLevel.MEMORY_AND_DISK_SER_2
  ).foreach { case (testName, storageLevel) =>
    test(testName) {
      testCaching(storageLevel)
    }
  }

  test("compute without caching when no partitions fit in memory") {
    val size = 10000
    val conf = new SparkConf()
      .set("spark.storage.unrollMemoryThreshold", "1024")
      .set("spark.testing.memory", (size / 2).toString)
    sc = new SparkContext(clusterUrl, "test", conf)
    val data = sc.parallelize(1 to size, 2).persist(StorageLevel.MEMORY_ONLY)
    assert(data.count() === size)
    assert(data.count() === size)
    assert(data.count() === size)
    // ensure no partitions were cached, since none of them fit in memory
    val rddBlocks = sc.env.blockManager.master.getMatchingBlockIds(_.isRDD, askSlaves = true)
    assert(rddBlocks.size === 0, s"expected no RDD blocks, found ${rddBlocks.size}")
  }

  test("compute when only some partitions fit in memory") {
    val size = 10000
    val numPartitions = 20
    val conf = new SparkConf()
      .set("spark.storage.unrollMemoryThreshold", "1024")
      .set("spark.testing.memory", size.toString)
    sc = new SparkContext(clusterUrl, "test", conf)
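    // With spark.testing.memory equal to the dataset size, only some of the 20 partitions
    // can be unrolled and kept in memory at once.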
    val data = sc.parallelize(1 to size, numPartitions).persist(StorageLevel.MEMORY_ONLY)
    assert(data.count() === size)
    assert(data.count() === size)
    assert(data.count() === size)
    // ensure only a subset of partitions were cached
    val rddBlocks = sc.env.blockManager.master.getMatchingBlockIds(_.isRDD, askSlaves = true)
    assert(rddBlocks.size > 0, "no RDD blocks found")
    assert(rddBlocks.size < numPartitions, s"too many RDD blocks found, expected <$numPartitions")
  }

  test("passing environment variables to cluster") {
    sc = new SparkContext(clusterUrl, "test", null, Nil, Map("TEST_VAR" -> "TEST_VALUE"))
    val values = sc.parallelize(1 to 2, 2).map(x => System.getenv("TEST_VAR")).collect()
    assert(values.toSeq === Seq("TEST_VALUE", "TEST_VALUE"))
  }

  test("recover from node failures") {
    import DistributedSuite.{markNodeIfIdentity, failOnMarkedIdentity}
    DistributedSuite.amMaster = true
    sc = new SparkContext(clusterUrl, "test")
    val data = sc.parallelize(Seq(true, true), 2)
    assert(data.count === 2) // force executors to start
    assert(data.map(markNodeIfIdentity).collect.size === 2)
    assert(data.map(failOnMarkedIdentity).collect.size === 2)
  }

  test("recover from repeated node failures during shuffle-map") {
    import DistributedSuite.{markNodeIfIdentity, failOnMarkedIdentity}
    DistributedSuite.amMaster = true
    sc = new SparkContext(clusterUrl, "test")
    for (i <- 1 to 3) {
      val data = sc.parallelize(Seq(true, false), 2)
      assert(data.count === 2)
      assert(data.map(markNodeIfIdentity).collect.size === 2)
      assert(data.map(failOnMarkedIdentity).map(x => x -> x).groupByKey.count === 2)
    }
  }

  test("recover from repeated node failures during shuffle-reduce") {
    import DistributedSuite.{markNodeIfIdentity, failOnMarkedIdentity}
    DistributedSuite.amMaster = true
    sc = new SparkContext(clusterUrl, "test")
    for (i <- 1 to 3) {
      val data = sc.parallelize(Seq(true, true), 2)
      assert(data.count === 2)
      assert(data.map(markNodeIfIdentity).collect.size === 2)
      // This relies on mergeCombiners being used to perform the actual reduce for this
      // test to actually be testing what it claims.
      val grouped = data.map(x => x -> x).combineByKey(
        x => x,
        (x: Boolean, y: Boolean) => x,
        (x: Boolean, y: Boolean) => failOnMarkedIdentity(x)
      )
      assert(grouped.collect.size === 1)
    }
  }

  test("recover from node failures with replication") {
    import DistributedSuite.{markNodeIfIdentity, failOnMarkedIdentity}
    DistributedSuite.amMaster = true
    // Use more than two nodes so that we don't have a symmetric communication pattern and
    // risk accidentally caching a partially correct list of peers.
    sc = new SparkContext("local-cluster[3,1,1024]", "test")
    for (i <- 1 to 3) {
      val data = sc.parallelize(Seq(true, false, false, false), 4)
      data.persist(StorageLevel.MEMORY_ONLY_2)

      assert(data.count === 4)
      assert(data.map(markNodeIfIdentity).collect.size === 4)
      assert(data.map(failOnMarkedIdentity).collect.size === 4)

      // Create a new replicated RDD to make sure that cached peer information doesn't cause
      // problems.
      val data2 = sc.parallelize(Seq(true, true), 2).persist(StorageLevel.MEMORY_ONLY_2)
      assert(data2.count === 2)
    }
  }

  test("unpersist RDDs") {
    DistributedSuite.amMaster = true
    sc = new SparkContext("local-cluster[3,1,1024]", "test")
    val data = sc.parallelize(Seq(true, false, false, false), 4)
    data.persist(StorageLevel.MEMORY_ONLY_2)
    data.count
    assert(sc.persistentRdds.isEmpty === false)
    data.unpersist()
    assert(sc.persistentRdds.isEmpty === true)

    failAfter(Span(3000, Millis)) {
      try {
        while (!sc.getRDDStorageInfo.isEmpty) {
          Thread.sleep(200)
        }
      } catch {
        case _: Throwable => Thread.sleep(10)
          // Do nothing. We might see exceptions because block manager
          // is racing this thread to remove entries from the driver.
      }
    }
  }

}

object DistributedSuite {
  // Indicates whether this JVM is marked for failure.
  var mark = false

  // Set by the test to remember if we are in the driver program, so we can assert
  // that we are not.
  var amMaster = false

  // Act like an identity function, but if the argument is true, set mark to true.
  def markNodeIfIdentity(item: Boolean): Boolean = {
    if (item) {
      assert(!amMaster)
      mark = true
    }
    item
  }

  // Act like an identity function, but if mark was set to true previously, fail,
  // crashing the entire JVM.
  def failOnMarkedIdentity(item: Boolean): Boolean = {
    if (mark) {
      System.exit(42)
    }
    item
  }
}