1/* 2 * Licensed to the Apache Software Foundation (ASF) under one or more 3 * contributor license agreements. See the NOTICE file distributed with 4 * this work for additional information regarding copyright ownership. 5 * The ASF licenses this file to You under the Apache License, Version 2.0 6 * (the "License"); you may not use this file except in compliance with 7 * the License. You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 */ 17 18package org.apache.spark 19 20import org.scalatest.PrivateMethodTester 21 22import org.apache.spark.internal.Logging 23import org.apache.spark.scheduler.{SchedulerBackend, TaskScheduler, TaskSchedulerImpl} 24import org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend 25import org.apache.spark.scheduler.local.LocalSchedulerBackend 26 27 28class SparkContextSchedulerCreationSuite 29 extends SparkFunSuite with LocalSparkContext with PrivateMethodTester with Logging { 30 31 def createTaskScheduler(master: String): TaskSchedulerImpl = 32 createTaskScheduler(master, "client") 33 34 def createTaskScheduler(master: String, deployMode: String): TaskSchedulerImpl = 35 createTaskScheduler(master, deployMode, new SparkConf()) 36 37 def createTaskScheduler( 38 master: String, 39 deployMode: String, 40 conf: SparkConf): TaskSchedulerImpl = { 41 // Create local SparkContext to setup a SparkEnv. We don't actually want to start() the 42 // real schedulers, so we don't want to create a full SparkContext with the desired scheduler. 43 sc = new SparkContext("local", "test", conf) 44 val createTaskSchedulerMethod = 45 PrivateMethod[Tuple2[SchedulerBackend, TaskScheduler]]('createTaskScheduler) 46 val (_, sched) = SparkContext invokePrivate createTaskSchedulerMethod(sc, master, deployMode) 47 sched.asInstanceOf[TaskSchedulerImpl] 48 } 49 50 test("bad-master") { 51 val e = intercept[SparkException] { 52 createTaskScheduler("localhost:1234") 53 } 54 assert(e.getMessage.contains("Could not parse Master URL")) 55 } 56 57 test("local") { 58 val sched = createTaskScheduler("local") 59 sched.backend match { 60 case s: LocalSchedulerBackend => assert(s.totalCores === 1) 61 case _ => fail() 62 } 63 } 64 65 test("local-*") { 66 val sched = createTaskScheduler("local[*]") 67 sched.backend match { 68 case s: LocalSchedulerBackend => 69 assert(s.totalCores === Runtime.getRuntime.availableProcessors()) 70 case _ => fail() 71 } 72 } 73 74 test("local-n") { 75 val sched = createTaskScheduler("local[5]") 76 assert(sched.maxTaskFailures === 1) 77 sched.backend match { 78 case s: LocalSchedulerBackend => assert(s.totalCores === 5) 79 case _ => fail() 80 } 81 } 82 83 test("local-*-n-failures") { 84 val sched = createTaskScheduler("local[* ,2]") 85 assert(sched.maxTaskFailures === 2) 86 sched.backend match { 87 case s: LocalSchedulerBackend => 88 assert(s.totalCores === Runtime.getRuntime.availableProcessors()) 89 case _ => fail() 90 } 91 } 92 93 test("local-n-failures") { 94 val sched = createTaskScheduler("local[4, 2]") 95 assert(sched.maxTaskFailures === 2) 96 sched.backend match { 97 case s: LocalSchedulerBackend => assert(s.totalCores === 4) 98 case _ => fail() 99 } 100 } 101 102 test("bad-local-n") { 103 val e = intercept[SparkException] { 104 createTaskScheduler("local[2*]") 105 } 106 assert(e.getMessage.contains("Could not parse Master URL")) 107 } 108 109 test("bad-local-n-failures") { 110 val e = intercept[SparkException] { 111 createTaskScheduler("local[2*,4]") 112 } 113 assert(e.getMessage.contains("Could not parse Master URL")) 114 } 115 116 test("local-default-parallelism") { 117 val conf = new SparkConf().set("spark.default.parallelism", "16") 118 val sched = createTaskScheduler("local", "client", conf) 119 120 sched.backend match { 121 case s: LocalSchedulerBackend => assert(s.defaultParallelism() === 16) 122 case _ => fail() 123 } 124 } 125 126 test("local-cluster") { 127 createTaskScheduler("local-cluster[3, 14, 1024]").backend match { 128 case s: StandaloneSchedulerBackend => // OK 129 case _ => fail() 130 } 131 } 132} 133