/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark

import org.scalatest.concurrent.Timeouts._
import org.scalatest.Matchers
import org.scalatest.time.{Millis, Span}

import org.apache.spark.storage.{RDDBlockId, StorageLevel}
import org.apache.spark.util.io.ChunkedByteBuffer

class NotSerializableClass
class NotSerializableExn(val notSer: NotSerializableClass) extends Throwable() {}


class DistributedSuite extends SparkFunSuite with Matchers with LocalSparkContext {

  val clusterUrl = "local-cluster[2,1,1024]"

  test("task throws not serializable exception") {
    // Ensures that executors do not crash when an exception is not serializable. If executors
    // crash, this test will hang. Correct behavior is that executors don't crash but fail tasks
    // and the scheduler throws a SparkException.

    // numSlaves must be less than numPartitions
    val numSlaves = 3
    val numPartitions = 10

    sc = new SparkContext("local-cluster[%s,1,1024]".format(numSlaves), "test")
    val data = sc.parallelize(1 to 100, numPartitions).
      map(x => throw new NotSerializableExn(new NotSerializableClass))
    intercept[SparkException] {
      data.count()
    }
    resetSparkContext()
  }

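  // All of these master strings should parse to two workers with one core and 1024 MB each;
  // the regex is expected to tolerate the varying whitespace.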
  test("local-cluster format") {
    import SparkMasterRegex._

    val masterStrings = Seq(
      "local-cluster[2,1,1024]",
      "local-cluster[2 , 1 , 1024]",
      "local-cluster[2, 1, 1024]",
      "local-cluster[ 2, 1, 1024 ]"
    )

    masterStrings.foreach {
      case LOCAL_CLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerSlave) =>
        assert(numSlaves.toInt == 2)
        assert(coresPerSlave.toInt == 1)
        assert(memoryPerSlave.toInt == 1024)
    }
  }

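  // Runs a shuffle across the two-executor cluster and checks that every value for each key
  // survives the groupByKey.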
  test("simple groupByKey") {
    sc = new SparkContext(clusterUrl, "test")
    val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (2, 1)), 5)
    val groups = pairs.groupByKey(5).collect()
    assert(groups.size === 2)
    val valuesFor1 = groups.find(_._1 == 1).get._2
    assert(valuesFor1.toList.sorted === List(1, 2, 3))
    val valuesFor2 = groups.find(_._1 == 2).get._2
    assert(valuesFor2.toList.sorted === List(1))
  }

  test("groupByKey where map output sizes exceed maxMbInFlight") {
    val conf = new SparkConf().set("spark.reducer.maxSizeInFlight", "1m")
    sc = new SparkContext(clusterUrl, "test", conf)
    // This data should be around 20 MB, so even with 4 mappers and 2 reducers, each map output
    // block should be about 2.5 MB, which exceeds the 1m maxSizeInFlight setting above.
    val pairs = sc.parallelize(1 to 2000, 4).map(x => (x % 16, new Array[Byte](10000)))
    val groups = pairs.groupByKey(2).map(x => (x._1, x._2.size)).collect()
    assert(groups.length === 16)
    assert(groups.map(_._2).sum === 2000)
  }

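  // Accumulator updates made on executors should be merged back on the driver:
  // 1 + 2 + ... + 10 = 55.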
  test("accumulators") {
    sc = new SparkContext(clusterUrl, "test")
    val accum = sc.longAccumulator
    sc.parallelize(1 to 10, 10).foreach(x => accum.add(x))
    assert(accum.value === 55)
  }

  test("broadcast variables") {
    sc = new SparkContext(clusterUrl, "test")
    val array = new Array[Int](100)
    val bv = sc.broadcast(array)
    array(2) = 3     // Change the array -- this should not be seen on workers
    val rdd = sc.parallelize(1 to 10, 10)
    val sum = rdd.map(x => bv.value.sum).reduce(_ + _)
    assert(sum === 0)
  }

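  // Every attempt of every task throws an ArithmeticException (division by zero), so the job
  // should be aborted after the default maximum of four attempts per task.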
  test("repeatedly failing task") {
    sc = new SparkContext(clusterUrl, "test")
    val thrown = intercept[SparkException] {
      // scalastyle:off println
      sc.parallelize(1 to 10, 10).foreach(x => println(x / 0))
      // scalastyle:on println
    }
    assert(thrown.getClass === classOf[SparkException])
    assert(thrown.getMessage.contains("failed 4 times"))
  }

  test("repeatedly failing task that crashes JVM") {
    // Ensures that if a task fails in a way that crashes the JVM, the job eventually fails rather
    // than hanging due to retrying the failed task infinitely many times (eventually the
    // standalone scheduler will remove the application, causing the job to hang waiting to
    // reconnect to the master).
    sc = new SparkContext(clusterUrl, "test")
    failAfter(Span(100000, Millis)) {
      val thrown = intercept[SparkException] {
        // One of the tasks always fails.
        sc.parallelize(1 to 10, 2).foreach { x => if (x == 1) System.exit(42) }
      }
      assert(thrown.getClass === classOf[SparkException])
      assert(thrown.getMessage.contains("failed 4 times"))
    }
  }

  test("repeatedly failing task that crashes JVM with a zero exit code (SPARK-16925)") {
    // Ensures that a task which causes the JVM to exit with a zero exit code will still cause
    // the Spark job to eventually fail.
    sc = new SparkContext(clusterUrl, "test")
    failAfter(Span(100000, Millis)) {
      val thrown = intercept[SparkException] {
        sc.parallelize(1 to 1, 1).foreachPartition { _ => System.exit(0) }
      }
      assert(thrown.getClass === classOf[SparkException])
      assert(thrown.getMessage.contains("failed 4 times"))
    }
    // Check that the cluster is still usable:
    sc.parallelize(1 to 10).count()
  }

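  // Shared body for the caching tests below: persists an RDD at the given storage level, checks
  // that the expected number of blocks (partitions * replication) is reported, and then reads
  // the blocks back both over the block transfer service and through BlockManager.get.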
  private def testCaching(storageLevel: StorageLevel): Unit = {
    sc = new SparkContext(clusterUrl, "test")
    sc.jobProgressListener.waitUntilExecutorsUp(2, 30000)
    val data = sc.parallelize(1 to 1000, 10)
    val cachedData = data.persist(storageLevel)
    assert(cachedData.count === 1000)
    assert(sc.getExecutorStorageStatus.map(_.rddBlocksById(cachedData.id).size).sum ===
      storageLevel.replication * data.getNumPartitions)
    assert(cachedData.count === 1000)
    assert(cachedData.count === 1000)

    // Get all the locations of the first partition and try to fetch the partitions
    // from those locations.
    val blockIds = data.partitions.indices.map(index => RDDBlockId(data.id, index)).toArray
    val blockId = blockIds(0)
    val blockManager = SparkEnv.get.blockManager
    val blockTransfer = blockManager.blockTransferService
    val serializerManager = SparkEnv.get.serializerManager
    blockManager.master.getLocations(blockId).foreach { cmId =>
      val bytes = blockTransfer.fetchBlockSync(cmId.host, cmId.port, cmId.executorId,
        blockId.toString)
      val deserialized = serializerManager.dataDeserializeStream(blockId,
        new ChunkedByteBuffer(bytes.nioByteBuffer()).toInputStream())(data.elementClassTag).toList
      assert(deserialized === (1 to 100).toList)
    }
    // This will exercise the getRemoteBytes / getRemoteValues code paths:
    assert(blockIds.flatMap(id => blockManager.get[Int](id).get.data).toSet === (1 to 1000).toSet)
  }

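  // Generate one caching test per storage level, all sharing the testCaching body above.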
  Seq(
    "caching" -> StorageLevel.MEMORY_ONLY,
    "caching on disk" -> StorageLevel.DISK_ONLY,
    "caching in memory, replicated" -> StorageLevel.MEMORY_ONLY_2,
    "caching in memory, serialized, replicated" -> StorageLevel.MEMORY_ONLY_SER_2,
    "caching on disk, replicated" -> StorageLevel.DISK_ONLY_2,
    "caching in memory and disk, replicated" -> StorageLevel.MEMORY_AND_DISK_2,
    "caching in memory and disk, serialized, replicated" -> StorageLevel.MEMORY_AND_DISK_SER_2
  ).foreach { case (testName, storageLevel) =>
    test(testName) {
      testCaching(storageLevel)
    }
  }

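  // spark.testing.memory is set well below the size of a single partition, so nothing can be
  // unrolled into memory and every count() must recompute the RDD from scratch.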
  test("compute without caching when no partitions fit in memory") {
    val size = 10000
    val conf = new SparkConf()
      .set("spark.storage.unrollMemoryThreshold", "1024")
      .set("spark.testing.memory", (size / 2).toString)
    sc = new SparkContext(clusterUrl, "test", conf)
    val data = sc.parallelize(1 to size, 2).persist(StorageLevel.MEMORY_ONLY)
    assert(data.count() === size)
    assert(data.count() === size)
    assert(data.count() === size)
    // ensure that no partitions were cached, since none of them fit in memory
    val rddBlocks = sc.env.blockManager.master.getMatchingBlockIds(_.isRDD, askSlaves = true)
    assert(rddBlocks.size === 0, s"expected no RDD blocks, found ${rddBlocks.size}")
  }

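  // Here spark.testing.memory is large enough for some, but not all, of the 20 partitions, so
  // only part of the RDD should end up cached.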
  test("compute when only some partitions fit in memory") {
    val size = 10000
    val numPartitions = 20
    val conf = new SparkConf()
      .set("spark.storage.unrollMemoryThreshold", "1024")
      .set("spark.testing.memory", size.toString)
    sc = new SparkContext(clusterUrl, "test", conf)
    val data = sc.parallelize(1 to size, numPartitions).persist(StorageLevel.MEMORY_ONLY)
    assert(data.count() === size)
    assert(data.count() === size)
    assert(data.count() === size)
    // ensure only a subset of partitions were cached
    val rddBlocks = sc.env.blockManager.master.getMatchingBlockIds(_.isRDD, askSlaves = true)
    assert(rddBlocks.size > 0, "no RDD blocks found")
    assert(rddBlocks.size < numPartitions, s"too many RDD blocks found, expected <$numPartitions")
  }

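  // The executor environment map passed to the SparkContext constructor should be visible to
  // tasks through System.getenv.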
  test("passing environment variables to cluster") {
    sc = new SparkContext(clusterUrl, "test", null, Nil, Map("TEST_VAR" -> "TEST_VALUE"))
    val values = sc.parallelize(1 to 2, 2).map(x => System.getenv("TEST_VAR")).collect()
    assert(values.toSeq === Seq("TEST_VALUE", "TEST_VALUE"))
  }

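  // markNodeIfIdentity flags the executor JVMs in one pass and failOnMarkedIdentity then crashes
  // the flagged JVMs; the job should still succeed once the lost tasks are rerun on freshly
  // launched executors.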
  test("recover from node failures") {
    import DistributedSuite.{markNodeIfIdentity, failOnMarkedIdentity}
    DistributedSuite.amMaster = true
    sc = new SparkContext(clusterUrl, "test")
    val data = sc.parallelize(Seq(true, true), 2)
    assert(data.count === 2) // force executors to start
    assert(data.map(markNodeIfIdentity).collect.size === 2)
    assert(data.map(failOnMarkedIdentity).collect.size === 2)
  }

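  // Same pattern as above, but the crash happens on the map side of a shuffle, so the lost map
  // output has to be regenerated before the reduce can complete.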
  test("recover from repeated node failures during shuffle-map") {
    import DistributedSuite.{markNodeIfIdentity, failOnMarkedIdentity}
    DistributedSuite.amMaster = true
    sc = new SparkContext(clusterUrl, "test")
    for (i <- 1 to 3) {
      val data = sc.parallelize(Seq(true, false), 2)
      assert(data.count === 2)
      assert(data.map(markNodeIfIdentity).collect.size === 2)
      assert(data.map(failOnMarkedIdentity).map(x => x -> x).groupByKey.count === 2)
    }
  }

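  // Here the crash is triggered in mergeCombiners, on the reduce side of the shuffle, so the
  // reduce tasks themselves are retried after the executor dies.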
  test("recover from repeated node failures during shuffle-reduce") {
    import DistributedSuite.{markNodeIfIdentity, failOnMarkedIdentity}
    DistributedSuite.amMaster = true
    sc = new SparkContext(clusterUrl, "test")
    for (i <- 1 to 3) {
      val data = sc.parallelize(Seq(true, true), 2)
      assert(data.count === 2)
      assert(data.map(markNodeIfIdentity).collect.size === 2)
      // This relies on mergeCombiners being used to perform the actual reduce for this
      // test to actually be testing what it claims.
      val grouped = data.map(x => x -> x).combineByKey(
                      x => x,
                      (x: Boolean, y: Boolean) => x,
                      (x: Boolean, y: Boolean) => failOnMarkedIdentity(x)
                    )
      assert(grouped.collect.size === 1)
    }
  }

  test("recover from node failures with replication") {
    import DistributedSuite.{markNodeIfIdentity, failOnMarkedIdentity}
    DistributedSuite.amMaster = true
    // Use more than two nodes so the communication pattern is not symmetric and a node can
    // end up caching a partially correct list of peers.
    sc = new SparkContext("local-cluster[3,1,1024]", "test")
    for (i <- 1 to 3) {
      val data = sc.parallelize(Seq(true, false, false, false), 4)
      data.persist(StorageLevel.MEMORY_ONLY_2)

      assert(data.count === 4)
      assert(data.map(markNodeIfIdentity).collect.size === 4)
      assert(data.map(failOnMarkedIdentity).collect.size === 4)

      // Create a new replicated RDD to make sure that cached peer information doesn't cause
      // problems.
      val data2 = sc.parallelize(Seq(true, true), 2).persist(StorageLevel.MEMORY_ONLY_2)
      assert(data2.count === 2)
    }
  }

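  // unpersist() should remove the RDD from the driver's persistentRdds map immediately, and its
  // blocks should eventually be dropped from the executors' block managers.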
  test("unpersist RDDs") {
    DistributedSuite.amMaster = true
    sc = new SparkContext("local-cluster[3,1,1024]", "test")
    val data = sc.parallelize(Seq(true, false, false, false), 4)
    data.persist(StorageLevel.MEMORY_ONLY_2)
    data.count
    assert(sc.persistentRdds.isEmpty === false)
    data.unpersist()
    assert(sc.persistentRdds.isEmpty === true)

    failAfter(Span(3000, Millis)) {
      try {
        while (!sc.getRDDStorageInfo.isEmpty) {
          Thread.sleep(200)
        }
      } catch {
        case _: Throwable => Thread.sleep(10)
          // Do nothing. We might see exceptions because block manager
          // is racing this thread to remove entries from the driver.
      }
    }
  }

}

object DistributedSuite {
  // Indicates whether this JVM is marked for failure.
  var mark = false

  // Set to true in the driver program so that tasks can assert they are not running
  // in the driver JVM.
  var amMaster = false

  // Act like an identity function, but if the argument is true, set mark to true.
  def markNodeIfIdentity(item: Boolean): Boolean = {
    if (item) {
      assert(!amMaster)
      mark = true
    }
    item
  }

  // Act like an identity function, but if mark was set to true previously, fail,
  // crashing the entire JVM.
  def failOnMarkedIdentity(item: Boolean): Boolean = {
    if (mark) {
      System.exit(42)
    }
    item
  }
}