/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark

import org.scalatest.BeforeAndAfterAll

import org.apache.spark.network.TransportContext
import org.apache.spark.network.netty.SparkTransportConf
import org.apache.spark.network.server.TransportServer
import org.apache.spark.network.shuffle.{ExternalShuffleBlockHandler, ExternalShuffleClient}

/**
 * This suite creates an external shuffle server and routes all shuffle fetches through it.
 * Note that failures in this suite may arise due to changes in Spark that invalidate expectations
 * set up in [[ExternalShuffleBlockHandler]], such as changing the format of shuffle files or how
 * we hash files into folders.
 */
class ExternalShuffleServiceSuite extends ShuffleSuite with BeforeAndAfterAll {
  var server: TransportServer = _
  var rpcHandler: ExternalShuffleBlockHandler = _

  override def beforeAll() {
    super.beforeAll()
    // Start a standalone external shuffle server and point the SparkConf at it so that all
    // shuffle fetches in this suite go through the external shuffle service.
    val transportConf = SparkTransportConf.fromSparkConf(conf, "shuffle", numUsableCores = 2)
    rpcHandler = new ExternalShuffleBlockHandler(transportConf, null)
    val transportContext = new TransportContext(transportConf, rpcHandler)
    server = transportContext.createServer()

    conf.set("spark.shuffle.manager", "sort")
    conf.set("spark.shuffle.service.enabled", "true")
    conf.set("spark.shuffle.service.port", server.getPort.toString)
  }

  override def afterAll() {
    try {
      server.close()
    } finally {
      super.afterAll()
    }
  }

  // This test ensures that the external shuffle service is actually in use for the other tests.
  test("using external shuffle service") {
    sc = new SparkContext("local-cluster[2,1,1024]", "test", conf)
    sc.env.blockManager.externalShuffleServiceEnabled should equal(true)
    sc.env.blockManager.shuffleClient.getClass should equal(classOf[ExternalShuffleClient])

    // On a slow machine, one slave may register hundreds of milliseconds ahead of the other one.
    // If we don't wait for all slaves, it's possible that only one executor runs all jobs. Then
    // all shuffle blocks will be in this executor, ShuffleBlockFetcherIterator will directly fetch
    // local blocks from the local BlockManager and won't send requests to the
    // ExternalShuffleService. In that case we would never see a FetchFailed, which would make this
    // test fail. Therefore, wait until all slaves are up.
    sc.jobProgressListener.waitUntilExecutorsUp(2, 60000)

    val rdd = sc.parallelize(0 until 1000, 10).map(i => (i, 1)).reduceByKey(_ + _)

    rdd.count()
    rdd.count()

    // Invalidate the registered executors, disallowing access to their shuffle blocks (without
    // deleting the actual shuffle files, so we could access them without the shuffle service).
    rpcHandler.applicationRemoved(sc.conf.getAppId, false /* cleanupLocalDirs */)

    // Now Spark will receive FetchFailed, and not retry the stage due to "spark.test.noStageRetry"
    // being set.
    val e = intercept[SparkException] {
      rdd.count()
    }
    e.getMessage should include ("Fetch failure will not retry stage due to testing config")
  }
}