/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark

import org.scalatest.BeforeAndAfterAll

import org.apache.spark.network.TransportContext
import org.apache.spark.network.netty.SparkTransportConf
import org.apache.spark.network.server.TransportServer
import org.apache.spark.network.shuffle.{ExternalShuffleBlockHandler, ExternalShuffleClient}

/**
 * This suite creates an external shuffle server and routes all shuffle fetches through it.
 * Note that failures in this suite may arise due to changes in Spark that invalidate expectations
 * set up in [[ExternalShuffleBlockHandler]], such as changing the format of shuffle files or how
 * we hash files into folders.
 */
class ExternalShuffleServiceSuite extends ShuffleSuite with BeforeAndAfterAll {
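  // The standalone shuffle server and its RPC handler, shared by every test in this suite
  // and set up once in beforeAll().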
  var server: TransportServer = _
  var rpcHandler: ExternalShuffleBlockHandler = _

  override def beforeAll(): Unit = {
    super.beforeAll()
    val transportConf = SparkTransportConf.fromSparkConf(conf, "shuffle", numUsableCores = 2)
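    // Passing null for the registered-executors file keeps registration state in memory only,
    // with no persistence across service restarts.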
    rpcHandler = new ExternalShuffleBlockHandler(transportConf, null)
    val transportContext = new TransportContext(transportConf, rpcHandler)
    server = transportContext.createServer()

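    // Enable the external shuffle service and point executors at the server we just started.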
    conf.set("spark.shuffle.manager", "sort")
    conf.set("spark.shuffle.service.enabled", "true")
    conf.set("spark.shuffle.service.port", server.getPort.toString)
  }

  override def afterAll(): Unit = {
    try {
      server.close()
    } finally {
      super.afterAll()
    }
  }

  // This test ensures that the external shuffle service is actually in use for the other tests.
  test("using external shuffle service") {
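    // local-cluster[2,1,1024]: a standalone cluster with two workers, 1 core and 1024 MB each.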
    sc = new SparkContext("local-cluster[2,1,1024]", "test", conf)
    sc.env.blockManager.externalShuffleServiceEnabled should equal(true)
    sc.env.blockManager.shuffleClient.getClass should equal(classOf[ExternalShuffleClient])

    // On a slow machine, one slave may register hundreds of milliseconds ahead of the other.
    // If we don't wait for both slaves, it's possible that only one executor runs all the jobs.
    // All shuffle blocks would then live on that executor, and ShuffleBlockFetcherIterator would
    // fetch them directly from the local BlockManager without ever contacting the
    // ExternalShuffleService. In that case we would never receive a FetchFailed, and this test
    // would fail. Therefore, wait until both slaves are up before running any jobs.
    sc.jobProgressListener.waitUntilExecutorsUp(2, 60000)

    val rdd = sc.parallelize(0 until 1000, 10).map(i => (i, 1)).reduceByKey(_ + _)

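    // Run the job twice to confirm that shuffle fetches through the external service succeed.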
    rdd.count()
    rdd.count()

    // Invalidate the registered executors, disallowing access to their shuffle blocks via the
    // shuffle service (the actual shuffle files are not deleted, so they could still be read
    // directly, without the shuffle service).
    rpcHandler.applicationRemoved(sc.conf.getAppId, false /* cleanupLocalDirs */)

    // Now Spark will receive FetchFailed and, because "spark.test.noStageRetry" is set, will
    // not retry the stage.
    val e = intercept[SparkException] {
      rdd.count()
    }
    e.getMessage should include ("Fetch failure will not retry stage due to testing config")
  }
}