1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements.  See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License.  You may obtain a copy of the License at
8 *
9 *    http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18package org.apache.spark.scheduler
19
20import java.util.Properties
21
22import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite}
23
24/**
25 * Tests that pools and the associated scheduling algorithms for FIFO and fair scheduling work
26 * correctly.
27 */
class PoolSuite extends SparkFunSuite with LocalSparkContext {

  /**
   * Builds a [[TaskSetManager]] around a new task set made of `numTasks` [[FakeTask]]s.
   *
   * @param stageId stage id passed to every fake task and to the enclosing task set
   * @param numTasks number of fake tasks to create
   * @param taskScheduler scheduler handed to the new manager
   * @return a manager wrapping the freshly created task set
   */
  def createTaskSetManager(stageId: Int, numTasks: Int, taskScheduler: TaskSchedulerImpl)
    : TaskSetManager = {
    val tasks = Array.tabulate[Task[_]](numTasks) { i =>
      new FakeTask(stageId, i, Nil)
    }
    new TaskSetManager(taskScheduler, new TaskSet(tasks, stageId, 0, 0, null), 0)
  }

  /**
   * Simulates scheduling a single task and checks which stage received it.
   *
   * Takes the first task set in `rootPool`'s sorted queue whose running plus successful tasks
   * are still fewer than its total task count, registers `taskId` as running on it, and then
   * asserts that the chosen task set belongs to `expectedStageId`. Fails the test if no task set
   * has room left.
   *
   * @param taskId id of the task to mark as running
   * @param rootPool pool whose sorted task set queue is consulted
   * @param expectedStageId stage id the selected task set is expected to have
   */
  def scheduleTaskAndVerifyId(taskId: Int, rootPool: Pool, expectedStageId: Int): Unit = {
    val nextTaskSetToSchedule = rootPool.getSortedTaskSetQueue
      .find(t => (t.runningTasks + t.tasksSuccessful) < t.numTasks)
      .getOrElse(fail(s"No task set has room left to schedule task $taskId"))
    nextTaskSetToSchedule.addRunningTask(taskId)
    assert(nextTaskSetToSchedule.stageId === expectedStageId)
  }

  /**
   * Asserts that `rootPool` contains a schedulable called `poolName` with the given minimum
   * share and weight.
   */
  private def verifyPool(
      rootPool: Pool,
      poolName: String,
      expectedMinShare: Int,
      expectedWeight: Int): Unit = {
    val pool = rootPool.getSchedulableByName(poolName)
    assert(pool != null, s"Pool '$poolName' was not found under the root pool")
    assert(pool.minShare === expectedMinShare)
    assert(pool.weight === expectedWeight)
  }

  /** Creates job properties that select the scheduling pool called `poolName`. */
  private def poolProperties(poolName: String): Properties = {
    val properties = new Properties()
    properties.setProperty("spark.scheduler.pool", poolName)
    properties
  }

  /**
   * Adds three task sets of two tasks each to a FIFO root pool and verifies that tasks fill
   * stage 0 first, then stage 1, then stage 2.
   */
  test("FIFO Scheduler Test") {
    sc = new SparkContext("local", "TaskSchedulerImplSuite")
    val taskScheduler = new TaskSchedulerImpl(sc)

    val rootPool = new Pool("", SchedulingMode.FIFO, 0, 0)
    val schedulableBuilder = new FIFOSchedulableBuilder(rootPool)
    schedulableBuilder.buildPools()

    val taskSetManager0 = createTaskSetManager(0, 2, taskScheduler)
    val taskSetManager1 = createTaskSetManager(1, 2, taskScheduler)
    val taskSetManager2 = createTaskSetManager(2, 2, taskScheduler)
    schedulableBuilder.addTaskSetManager(taskSetManager0, null)
    schedulableBuilder.addTaskSetManager(taskSetManager1, null)
    schedulableBuilder.addTaskSetManager(taskSetManager2, null)

    scheduleTaskAndVerifyId(0, rootPool, 0)
    scheduleTaskAndVerifyId(1, rootPool, 0)
    scheduleTaskAndVerifyId(2, rootPool, 1)
    scheduleTaskAndVerifyId(3, rootPool, 1)
    scheduleTaskAndVerifyId(4, rootPool, 2)
    scheduleTaskAndVerifyId(5, rootPool, 2)
  }

  /**
   * This test creates three scheduling pools, and creates task set managers in the first
   * two scheduling pools. The test verifies that as tasks are scheduled, the fair scheduling
   * algorithm properly orders the two scheduling pools.
   */
  test("Fair Scheduler Test") {
    // Fail with a readable message instead of a NullPointerException when the allocation file
    // is missing from the test classpath.
    val xmlUrl = getClass.getClassLoader.getResource("fairscheduler.xml")
    assert(xmlUrl != null, "fairscheduler.xml was not found on the test classpath")
    val xmlPath = xmlUrl.getFile()
    val conf = new SparkConf().set("spark.scheduler.allocation.file", xmlPath)
    sc = new SparkContext("local", "TaskSchedulerImplSuite", conf)
    val taskScheduler = new TaskSchedulerImpl(sc)

    val rootPool = new Pool("", SchedulingMode.FAIR, 0, 0)
    val schedulableBuilder = new FairSchedulableBuilder(rootPool, sc.conf)
    schedulableBuilder.buildPools()

    // Ensure that the XML file was read in correctly.
    assert(rootPool.getSchedulableByName("default") != null)
    verifyPool(rootPool, "1", expectedMinShare = 2, expectedWeight = 1)
    verifyPool(rootPool, "2", expectedMinShare = 3, expectedWeight = 1)
    verifyPool(rootPool, "3", expectedMinShare = 0, expectedWeight = 1)

    val properties1 = poolProperties("1")
    val properties2 = poolProperties("2")

    // Pool 1 holds stages 0, 1 and 2.
    val taskSetManager10 = createTaskSetManager(0, 1, taskScheduler)
    val taskSetManager11 = createTaskSetManager(1, 1, taskScheduler)
    val taskSetManager12 = createTaskSetManager(2, 2, taskScheduler)
    schedulableBuilder.addTaskSetManager(taskSetManager10, properties1)
    schedulableBuilder.addTaskSetManager(taskSetManager11, properties1)
    schedulableBuilder.addTaskSetManager(taskSetManager12, properties1)

    // Pool 2 holds stages 3 and 4.
    val taskSetManager23 = createTaskSetManager(3, 2, taskScheduler)
    val taskSetManager24 = createTaskSetManager(4, 2, taskScheduler)
    schedulableBuilder.addTaskSetManager(taskSetManager23, properties2)
    schedulableBuilder.addTaskSetManager(taskSetManager24, properties2)

    // Pool 1 share ratio: 0. Pool 2 share ratio: 0. 1 gets scheduled based on ordering of names.
    scheduleTaskAndVerifyId(0, rootPool, 0)
    // Pool 1 share ratio: 1/2. Pool 2 share ratio: 0. 2 gets scheduled because ratio is lower.
    scheduleTaskAndVerifyId(1, rootPool, 3)
    // Pool 1 share ratio: 1/2. Pool 2 share ratio: 1/3. 2 gets scheduled because ratio is lower.
    scheduleTaskAndVerifyId(2, rootPool, 3)
    // Pool 1 share ratio: 1/2. Pool 2 share ratio: 2/3. 1 gets scheduled because ratio is lower.
    scheduleTaskAndVerifyId(3, rootPool, 1)
    // Pool 1 share ratio: 1. Pool 2 share ratio: 2/3. 2 gets scheduled because ratio is lower.
    scheduleTaskAndVerifyId(4, rootPool, 4)
    // Neither pool is needy so ordering is based on number of running tasks.
    // Pool 1 running tasks: 2, Pool 2 running tasks: 3. 1 gets scheduled because fewer running
    // tasks.
    scheduleTaskAndVerifyId(5, rootPool, 2)
    // Pool 1 running tasks: 3, Pool 2 running tasks: 3. 1 gets scheduled because of naming
    // ordering.
    scheduleTaskAndVerifyId(6, rootPool, 2)
    // Pool 1 running tasks: 4, Pool 2 running tasks: 3. 2 gets scheduled because fewer running
    // tasks.
    scheduleTaskAndVerifyId(7, rootPool, 4)
  }

  /**
   * Builds a two-level hierarchy of FAIR pools under the root pool ("0" and "1", each with two
   * child pools), puts two five-task task sets into every leaf pool, and verifies which stage
   * each of the first four scheduled tasks lands in.
   */
  test("Nested Pool Test") {
    sc = new SparkContext("local", "TaskSchedulerImplSuite")
    val taskScheduler = new TaskSchedulerImpl(sc)

    // NOTE(review): the two trailing Pool arguments are presumably (minShare, weight); confirm
    // against the Pool constructor.
    val rootPool = new Pool("", SchedulingMode.FAIR, 0, 0)
    val pool0 = new Pool("0", SchedulingMode.FAIR, 3, 1)
    val pool1 = new Pool("1", SchedulingMode.FAIR, 4, 1)
    rootPool.addSchedulable(pool0)
    rootPool.addSchedulable(pool1)

    val pool00 = new Pool("00", SchedulingMode.FAIR, 2, 2)
    val pool01 = new Pool("01", SchedulingMode.FAIR, 1, 1)
    pool0.addSchedulable(pool00)
    pool0.addSchedulable(pool01)

    val pool10 = new Pool("10", SchedulingMode.FAIR, 2, 2)
    val pool11 = new Pool("11", SchedulingMode.FAIR, 2, 1)
    pool1.addSchedulable(pool10)
    pool1.addSchedulable(pool11)

    // Leaf pool "00": stages 0 and 1.
    val taskSetManager000 = createTaskSetManager(0, 5, taskScheduler)
    val taskSetManager001 = createTaskSetManager(1, 5, taskScheduler)
    pool00.addSchedulable(taskSetManager000)
    pool00.addSchedulable(taskSetManager001)

    // Leaf pool "01": stages 2 and 3.
    val taskSetManager010 = createTaskSetManager(2, 5, taskScheduler)
    val taskSetManager011 = createTaskSetManager(3, 5, taskScheduler)
    pool01.addSchedulable(taskSetManager010)
    pool01.addSchedulable(taskSetManager011)

    // Leaf pool "10": stages 4 and 5.
    val taskSetManager100 = createTaskSetManager(4, 5, taskScheduler)
    val taskSetManager101 = createTaskSetManager(5, 5, taskScheduler)
    pool10.addSchedulable(taskSetManager100)
    pool10.addSchedulable(taskSetManager101)

    // Leaf pool "11": stages 6 and 7.
    val taskSetManager110 = createTaskSetManager(6, 5, taskScheduler)
    val taskSetManager111 = createTaskSetManager(7, 5, taskScheduler)
    pool11.addSchedulable(taskSetManager110)
    pool11.addSchedulable(taskSetManager111)

    // Expected landing spots, in order: pool "00", pool "10", pool "11", pool "01".
    scheduleTaskAndVerifyId(0, rootPool, 0)
    scheduleTaskAndVerifyId(1, rootPool, 4)
    scheduleTaskAndVerifyId(2, rootPool, 6)
    scheduleTaskAndVerifyId(3, rootPool, 2)
  }
}
182