/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.scheduler

import java.util.Properties

import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite}

/**
 * Tests that pools and the associated scheduling algorithms for FIFO and fair scheduling work
 * correctly.
 */
class PoolSuite extends SparkFunSuite with LocalSparkContext {

  /**
   * Builds a [[TaskSetManager]] holding `numTasks` fake tasks that all belong to `stageId`.
   *
   * @param stageId stage id assigned to every fake task and to the enclosing task set
   * @param numTasks number of fake tasks to create (task indices run from 0 to numTasks - 1)
   * @param taskScheduler scheduler the new manager is attached to
   * @return a task set manager wrapping the freshly created task set
   */
  def createTaskSetManager(stageId: Int, numTasks: Int, taskScheduler: TaskSchedulerImpl)
    : TaskSetManager = {
    val tasks = Array.tabulate[Task[_]](numTasks) { i =>
      new FakeTask(stageId, i, Nil)
    }
    new TaskSetManager(taskScheduler, new TaskSet(tasks, stageId, 0, 0, null), 0)
  }

  /**
   * Simulates scheduling one task and checks which stage it was taken from.
   *
   * The first task set in the root pool's sorted queue that still has capacity (running plus
   * successful tasks is below its task count) is selected, `taskId` is registered on it as a
   * running task, and its stage id is compared against `expectedStageId`.
   *
   * @param taskId id of the task to mark as running
   * @param rootPool pool whose sorted task set queue determines the scheduling order
   * @param expectedStageId stage id the selected task set is expected to have
   */
  def scheduleTaskAndVerifyId(taskId: Int, rootPool: Pool, expectedStageId: Int): Unit = {
    val taskSetQueue = rootPool.getSortedTaskSetQueue
    val nextTaskSetToSchedule = taskSetQueue
      .find(t => (t.runningTasks + t.tasksSuccessful) < t.numTasks)
      .getOrElse(fail(s"No task set with free capacity found when scheduling task $taskId"))
    // The task is registered before the stage id check, so the pool state advances even when the
    // assertion below fails.
    nextTaskSetToSchedule.addRunningTask(taskId)
    assert(nextTaskSetToSchedule.stageId === expectedStageId)
  }

  /**
   * Asserts that `rootPool` contains a schedulable called `poolName` with the given minimum
   * share and weight.
   */
  private def verifyPool(
      rootPool: Pool,
      poolName: String,
      expectedMinShare: Int,
      expectedWeight: Int): Unit = {
    val pool = rootPool.getSchedulableByName(poolName)
    assert(pool != null, s"pool '$poolName' was not found")
    assert(pool.minShare === expectedMinShare)
    assert(pool.weight === expectedWeight)
  }

  test("FIFO Scheduler Test") {
    sc = new SparkContext("local", "PoolSuite")
    val taskScheduler = new TaskSchedulerImpl(sc)

    val rootPool = new Pool("", SchedulingMode.FIFO, 0, 0)
    val schedulableBuilder = new FIFOSchedulableBuilder(rootPool)
    schedulableBuilder.buildPools()

    val taskSetManager0 = createTaskSetManager(0, 2, taskScheduler)
    val taskSetManager1 = createTaskSetManager(1, 2, taskScheduler)
    val taskSetManager2 = createTaskSetManager(2, 2, taskScheduler)
    schedulableBuilder.addTaskSetManager(taskSetManager0, null)
    schedulableBuilder.addTaskSetManager(taskSetManager1, null)
    schedulableBuilder.addTaskSetManager(taskSetManager2, null)

    // Each stage has two tasks; stages are expected to be drained in order 0, 1, 2.
    scheduleTaskAndVerifyId(0, rootPool, 0)
    scheduleTaskAndVerifyId(1, rootPool, 0)
    scheduleTaskAndVerifyId(2, rootPool, 1)
    scheduleTaskAndVerifyId(3, rootPool, 1)
    scheduleTaskAndVerifyId(4, rootPool, 2)
    scheduleTaskAndVerifyId(5, rootPool, 2)
  }

  /**
   * This test creates three scheduling pools, and creates task set managers in the first
   * two scheduling pools. The test verifies that as tasks are scheduled, the fair scheduling
   * algorithm properly orders the two scheduling pools.
   */
  test("Fair Scheduler Test") {
    // getResource returns null when the resource is missing; fail with a clear message instead
    // of a NullPointerException.
    val xmlPath = Option(getClass.getClassLoader.getResource("fairscheduler.xml"))
      .getOrElse(fail("fairscheduler.xml was not found on the test classpath"))
      .getFile()
    val conf = new SparkConf().set("spark.scheduler.allocation.file", xmlPath)
    sc = new SparkContext("local", "PoolSuite", conf)
    val taskScheduler = new TaskSchedulerImpl(sc)

    val rootPool = new Pool("", SchedulingMode.FAIR, 0, 0)
    val schedulableBuilder = new FairSchedulableBuilder(rootPool, sc.conf)
    schedulableBuilder.buildPools()

    // Ensure that the XML file was read in correctly.
    assert(rootPool.getSchedulableByName("default") != null)
    verifyPool(rootPool, "1", expectedMinShare = 2, expectedWeight = 1)
    verifyPool(rootPool, "2", expectedMinShare = 3, expectedWeight = 1)
    verifyPool(rootPool, "3", expectedMinShare = 0, expectedWeight = 1)

    val properties1 = new Properties()
    properties1.setProperty("spark.scheduler.pool", "1")
    val properties2 = new Properties()
    properties2.setProperty("spark.scheduler.pool", "2")

    val taskSetManager10 = createTaskSetManager(0, 1, taskScheduler)
    val taskSetManager11 = createTaskSetManager(1, 1, taskScheduler)
    val taskSetManager12 = createTaskSetManager(2, 2, taskScheduler)
    schedulableBuilder.addTaskSetManager(taskSetManager10, properties1)
    schedulableBuilder.addTaskSetManager(taskSetManager11, properties1)
    schedulableBuilder.addTaskSetManager(taskSetManager12, properties1)

    val taskSetManager23 = createTaskSetManager(3, 2, taskScheduler)
    val taskSetManager24 = createTaskSetManager(4, 2, taskScheduler)
    schedulableBuilder.addTaskSetManager(taskSetManager23, properties2)
    schedulableBuilder.addTaskSetManager(taskSetManager24, properties2)

    // Pool 1 share ratio: 0. Pool 2 share ratio: 0. 1 gets scheduled based on ordering of names.
    scheduleTaskAndVerifyId(0, rootPool, 0)
    // Pool 1 share ratio: 1/2. Pool 2 share ratio: 0. 2 gets scheduled because ratio is lower.
    scheduleTaskAndVerifyId(1, rootPool, 3)
    // Pool 1 share ratio: 1/2. Pool 2 share ratio: 1/3. 2 gets scheduled because ratio is lower.
    scheduleTaskAndVerifyId(2, rootPool, 3)
    // Pool 1 share ratio: 1/2. Pool 2 share ratio: 2/3. 1 gets scheduled because ratio is lower.
    scheduleTaskAndVerifyId(3, rootPool, 1)
    // Pool 1 share ratio: 1. Pool 2 share ratio: 2/3. 2 gets scheduled because ratio is lower.
    scheduleTaskAndVerifyId(4, rootPool, 4)
    // Neither pool is needy so ordering is based on number of running tasks.
    // Pool 1 running tasks: 2, Pool 2 running tasks: 3. 1 gets scheduled because fewer running
    // tasks.
    scheduleTaskAndVerifyId(5, rootPool, 2)
    // Pool 1 running tasks: 3, Pool 2 running tasks: 3. 1 gets scheduled because of naming
    // ordering.
    scheduleTaskAndVerifyId(6, rootPool, 2)
    // Pool 1 running tasks: 4, Pool 2 running tasks: 3. 2 gets scheduled because fewer running
    // tasks.
    scheduleTaskAndVerifyId(7, rootPool, 4)
  }

  /**
   * This test builds a two-level hierarchy of fair pools by hand: the root pool contains pools
   * "0" and "1", which contain pools "00"/"01" and "10"/"11" respectively. Each leaf pool holds
   * two task set managers of five tasks each. The test verifies the stage chosen for each of the
   * first four scheduled tasks.
   */
  test("Nested Pool Test") {
    sc = new SparkContext("local", "PoolSuite")
    val taskScheduler = new TaskSchedulerImpl(sc)

    // First level: two pools directly under the root.
    val rootPool = new Pool("", SchedulingMode.FAIR, 0, 0)
    val pool0 = new Pool("0", SchedulingMode.FAIR, 3, 1)
    val pool1 = new Pool("1", SchedulingMode.FAIR, 4, 1)
    rootPool.addSchedulable(pool0)
    rootPool.addSchedulable(pool1)

    // Second level: two child pools under each first-level pool.
    val pool00 = new Pool("00", SchedulingMode.FAIR, 2, 2)
    val pool01 = new Pool("01", SchedulingMode.FAIR, 1, 1)
    pool0.addSchedulable(pool00)
    pool0.addSchedulable(pool01)

    val pool10 = new Pool("10", SchedulingMode.FAIR, 2, 2)
    val pool11 = new Pool("11", SchedulingMode.FAIR, 2, 1)
    pool1.addSchedulable(pool10)
    pool1.addSchedulable(pool11)

    // Leaves: stages 0-1 live in pool "00", 2-3 in "01", 4-5 in "10" and 6-7 in "11".
    val taskSetManager000 = createTaskSetManager(0, 5, taskScheduler)
    val taskSetManager001 = createTaskSetManager(1, 5, taskScheduler)
    pool00.addSchedulable(taskSetManager000)
    pool00.addSchedulable(taskSetManager001)

    val taskSetManager010 = createTaskSetManager(2, 5, taskScheduler)
    val taskSetManager011 = createTaskSetManager(3, 5, taskScheduler)
    pool01.addSchedulable(taskSetManager010)
    pool01.addSchedulable(taskSetManager011)

    val taskSetManager100 = createTaskSetManager(4, 5, taskScheduler)
    val taskSetManager101 = createTaskSetManager(5, 5, taskScheduler)
    pool10.addSchedulable(taskSetManager100)
    pool10.addSchedulable(taskSetManager101)

    val taskSetManager110 = createTaskSetManager(6, 5, taskScheduler)
    val taskSetManager111 = createTaskSetManager(7, 5, taskScheduler)
    pool11.addSchedulable(taskSetManager110)
    pool11.addSchedulable(taskSetManager111)

    // The first four tasks are expected to come from a different leaf pool each time.
    scheduleTaskAndVerifyId(0, rootPool, 0)
    scheduleTaskAndVerifyId(1, rootPool, 4)
    scheduleTaskAndVerifyId(2, rootPool, 6)
    scheduleTaskAndVerifyId(3, rootPool, 2)
  }
}