/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.scheduler

import java.nio.ByteBuffer

import scala.collection.mutable.HashMap

import org.mockito.Matchers.{anyInt, anyString, eq => meq}
import org.mockito.Mockito.{atLeast, atMost, never, spy, verify, when}
import org.scalatest.BeforeAndAfterEach

import org.apache.spark._
import org.apache.spark.internal.config
import org.apache.spark.internal.Logging

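// A no-op SchedulerBackend: it lets these tests drive TaskSchedulerImpl.resourceOffers()
// directly, without launching any real executors.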
class FakeSchedulerBackend extends SchedulerBackend {
  def start(): Unit = {}
  def stop(): Unit = {}
  def reviveOffers(): Unit = {}
  def defaultParallelism(): Int = 1
}

class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with BeforeAndAfterEach
    with Logging {

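  // Set by the stubbed taskSetFailed() callback in setupScheduler() whenever a task set is
  // aborted; reset in beforeEach() so each test starts from a clean slate.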
  var failedTaskSetException: Option[Throwable] = None
  var failedTaskSetReason: String = null
  var failedTaskSet = false

  var taskScheduler: TaskSchedulerImpl = null
  var dagScheduler: DAGScheduler = null

  override def beforeEach(): Unit = {
    super.beforeEach()
    failedTaskSet = false
    failedTaskSetException = None
    failedTaskSetReason = null
  }

  override def afterEach(): Unit = {
    super.afterEach()
    if (taskScheduler != null) {
      taskScheduler.stop()
      taskScheduler = null
    }
    if (dagScheduler != null) {
      dagScheduler.stop()
      dagScheduler = null
    }
  }

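  // Creates a TaskSchedulerImpl backed by FakeSchedulerBackend, together with a DAGScheduler
  // stub whose taskSetFailed() callback records failures in the vars above instead of going
  // through the real event loop.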
  def setupScheduler(confs: (String, String)*): TaskSchedulerImpl = {
    val conf = new SparkConf().setMaster("local").setAppName("TaskSchedulerImplSuite")
    confs.foreach { case (k, v) =>
      conf.set(k, v)
    }
    sc = new SparkContext(conf)
    taskScheduler = new TaskSchedulerImpl(sc)
    taskScheduler.initialize(new FakeSchedulerBackend)
    // Need to initialize a DAGScheduler for the taskScheduler to use for callbacks.
    dagScheduler = new DAGScheduler(sc, taskScheduler) {
      override def taskStarted(task: Task[_], taskInfo: TaskInfo): Unit = {}
      override def executorAdded(execId: String, host: String): Unit = {}
      override def taskSetFailed(
          taskSet: TaskSet,
          reason: String,
          exception: Option[Throwable]): Unit = {
        // Normally the DAGScheduler puts this in the event loop, which will eventually fail
        // dependent jobs
        failedTaskSet = true
        failedTaskSetReason = reason
        failedTaskSetException = exception
      }
    }
    taskScheduler
  }

94  test("Scheduler does not always schedule tasks on the same workers") {
95    val taskScheduler = setupScheduler()
96    val numFreeCores = 1
97    val workerOffers = IndexedSeq(new WorkerOffer("executor0", "host0", numFreeCores),
98      new WorkerOffer("executor1", "host1", numFreeCores))
99    // Repeatedly try to schedule a 1-task job, and make sure that it doesn't always
100    // get scheduled on the same executor. While there is a chance this test will fail
101    // because the task randomly gets placed on the first executor all 1000 times, the
102    // probability of that happening is 2^-1000 (so sufficiently small to be considered
103    // negligible).
104    val numTrials = 1000
105    val selectedExecutorIds = 1.to(numTrials).map { _ =>
106      val taskSet = FakeTask.createTaskSet(1)
107      taskScheduler.submitTasks(taskSet)
108      val taskDescriptions = taskScheduler.resourceOffers(workerOffers).flatten
109      assert(1 === taskDescriptions.length)
110      taskDescriptions(0).executorId
111    }
112    val count = selectedExecutorIds.count(_ == workerOffers(0).executorId)
113    assert(count > 0)
114    assert(count < numTrials)
115    assert(!failedTaskSet)
116  }

  test("Scheduler correctly accounts for multiple CPUs per task") {
    val taskCpus = 2
    val taskScheduler = setupScheduler("spark.task.cpus" -> taskCpus.toString)
    // Give zero core offers. Should not generate any tasks.
    val zeroCoreWorkerOffers = IndexedSeq(new WorkerOffer("executor0", "host0", 0),
      new WorkerOffer("executor1", "host1", 0))
    val taskSet = FakeTask.createTaskSet(1)
    taskScheduler.submitTasks(taskSet)
    var taskDescriptions = taskScheduler.resourceOffers(zeroCoreWorkerOffers).flatten
    assert(0 === taskDescriptions.length)

    // No tasks should run, because each executor has only 1 free core but each task needs 2.
    val numFreeCores = 1
    val singleCoreWorkerOffers = IndexedSeq(new WorkerOffer("executor0", "host0", numFreeCores),
      new WorkerOffer("executor1", "host1", numFreeCores))
    taskScheduler.submitTasks(taskSet)
    taskDescriptions = taskScheduler.resourceOffers(singleCoreWorkerOffers).flatten
    assert(0 === taskDescriptions.length)

    // Now change the offers to have 2 cores in one executor and verify that it
    // is chosen.
    val multiCoreWorkerOffers = IndexedSeq(new WorkerOffer("executor0", "host0", taskCpus),
      new WorkerOffer("executor1", "host1", numFreeCores))
    taskScheduler.submitTasks(taskSet)
    taskDescriptions = taskScheduler.resourceOffers(multiCoreWorkerOffers).flatten
    assert(1 === taskDescriptions.length)
    assert("executor0" === taskDescriptions(0).executorId)
    assert(!failedTaskSet)
  }

  test("Scheduler does not crash when tasks are not serializable") {
    val taskCpus = 2
    val taskScheduler = setupScheduler("spark.task.cpus" -> taskCpus.toString)
    val numFreeCores = 1
    val taskSet = new TaskSet(
      Array(new NotSerializableFakeTask(1, 0), new NotSerializableFakeTask(0, 1)), 0, 0, 0, null)
    val multiCoreWorkerOffers = IndexedSeq(new WorkerOffer("executor0", "host0", taskCpus),
      new WorkerOffer("executor1", "host1", numFreeCores))
    taskScheduler.submitTasks(taskSet)
    var taskDescriptions = taskScheduler.resourceOffers(multiCoreWorkerOffers).flatten
    assert(0 === taskDescriptions.length)
    assert(failedTaskSet)
    assert(failedTaskSetReason.contains("Failed to serialize task"))

    // Now check that we can still submit tasks. Even if one of the task sets has
    // non-serializable tasks, the other task set should still be processed without error.
    taskScheduler.submitTasks(FakeTask.createTaskSet(1))
    taskScheduler.submitTasks(taskSet)
    taskDescriptions = taskScheduler.resourceOffers(multiCoreWorkerOffers).flatten
    assert(taskDescriptions.map(_.executorId) === Seq("executor0"))
  }

  test("refuse to schedule concurrent attempts for the same stage (SPARK-8103)") {
    val taskScheduler = setupScheduler()
    val attempt1 = FakeTask.createTaskSet(1, 0)
    val attempt2 = FakeTask.createTaskSet(1, 1)
    taskScheduler.submitTasks(attempt1)
    intercept[IllegalStateException] { taskScheduler.submitTasks(attempt2) }

    // OK to submit multiple if previous attempts are all zombie
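    // (a zombie TaskSetManager stops launching new tasks but stays registered until its
    // already-running tasks finish)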
    taskScheduler.taskSetManagerForAttempt(attempt1.stageId, attempt1.stageAttemptId)
      .get.isZombie = true
    taskScheduler.submitTasks(attempt2)
    val attempt3 = FakeTask.createTaskSet(1, 2)
    intercept[IllegalStateException] { taskScheduler.submitTasks(attempt3) }
    taskScheduler.taskSetManagerForAttempt(attempt2.stageId, attempt2.stageAttemptId)
      .get.isZombie = true
    taskScheduler.submitTasks(attempt3)
    assert(!failedTaskSet)
  }

  test("don't schedule more tasks after a taskset is zombie") {
    val taskScheduler = setupScheduler()

    val numFreeCores = 1
    val workerOffers = IndexedSeq(new WorkerOffer("executor0", "host0", numFreeCores))
    val attempt1 = FakeTask.createTaskSet(10)

    // submit attempt 1, offer some resources, some tasks get scheduled
    taskScheduler.submitTasks(attempt1)
    val taskDescriptions = taskScheduler.resourceOffers(workerOffers).flatten
    assert(1 === taskDescriptions.length)

    // now mark attempt 1 as a zombie
    taskScheduler.taskSetManagerForAttempt(attempt1.stageId, attempt1.stageAttemptId)
      .get.isZombie = true

    // don't schedule anything on another resource offer
    val taskDescriptions2 = taskScheduler.resourceOffers(workerOffers).flatten
    assert(0 === taskDescriptions2.length)

    // if we schedule another attempt for the same stage, it should get scheduled
    val attempt2 = FakeTask.createTaskSet(10, 1)

    // submit attempt 2, offer some resources, some tasks get scheduled
    taskScheduler.submitTasks(attempt2)
    val taskDescriptions3 = taskScheduler.resourceOffers(workerOffers).flatten
    assert(1 === taskDescriptions3.length)
    val mgr = taskScheduler.taskIdToTaskSetManager.get(taskDescriptions3(0).taskId).get
    assert(mgr.taskSet.stageAttemptId === 1)
    assert(!failedTaskSet)
  }

  test("if a zombie attempt finishes, continue scheduling tasks for non-zombie attempts") {
    val taskScheduler = setupScheduler()

    val numFreeCores = 10
    val workerOffers = IndexedSeq(new WorkerOffer("executor0", "host0", numFreeCores))
    val attempt1 = FakeTask.createTaskSet(10)

    // submit attempt 1, offer some resources, some tasks get scheduled
    taskScheduler.submitTasks(attempt1)
    val taskDescriptions = taskScheduler.resourceOffers(workerOffers).flatten
    assert(10 === taskDescriptions.length)

    // now mark attempt 1 as a zombie
    val mgr1 = taskScheduler.taskSetManagerForAttempt(attempt1.stageId, attempt1.stageAttemptId).get
    mgr1.isZombie = true

    // don't schedule anything on another resource offer
    val taskDescriptions2 = taskScheduler.resourceOffers(workerOffers).flatten
    assert(0 === taskDescriptions2.length)

    // submit attempt 2
    val attempt2 = FakeTask.createTaskSet(10, 1)
    taskScheduler.submitTasks(attempt2)

    // attempt 1 finished (this can happen even if it was marked zombie earlier -- all tasks were
    // already submitted, and then they finish)
    taskScheduler.taskSetFinished(mgr1)

    // now with another resource offer, we should still schedule all the tasks in attempt2
    val taskDescriptions3 = taskScheduler.resourceOffers(workerOffers).flatten
    assert(10 === taskDescriptions3.length)

    taskDescriptions3.foreach { task =>
      val mgr = taskScheduler.taskIdToTaskSetManager.get(task.taskId).get
      assert(mgr.taskSet.stageAttemptId === 1)
    }
    assert(!failedTaskSet)
  }

  test("tasks are not re-scheduled while executor loss reason is pending") {
    val taskScheduler = setupScheduler()

    val e0Offers = IndexedSeq(new WorkerOffer("executor0", "host0", 1))
    val e1Offers = IndexedSeq(new WorkerOffer("executor1", "host0", 1))
    val attempt1 = FakeTask.createTaskSet(1)

    // submit attempt 1, offer resources, task gets scheduled
    taskScheduler.submitTasks(attempt1)
    val taskDescriptions = taskScheduler.resourceOffers(e0Offers).flatten
    assert(1 === taskDescriptions.length)

    // mark executor0 as dead but pending fail reason
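    // (with LossReasonPending the scheduler knows the executor was lost, but waits for the
    // real reason before failing its tasks, so nothing should be re-scheduled yet)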
    taskScheduler.executorLost("executor0", LossReasonPending)

    // offer some more resources on a different executor, nothing should change
    val taskDescriptions2 = taskScheduler.resourceOffers(e1Offers).flatten
    assert(0 === taskDescriptions2.length)

    // provide the actual loss reason for executor0
    taskScheduler.executorLost("executor0", SlaveLost("oops"))

    // executor0's tasks should have failed now that the loss reason is known, so offering more
    // resources should make them be scheduled on the new executor.
    val taskDescriptions3 = taskScheduler.resourceOffers(e1Offers).flatten
    assert(1 === taskDescriptions3.length)
    assert("executor1" === taskDescriptions3(0).executorId)
    assert(!failedTaskSet)
  }

  test("abort stage if executor loss results in unschedulability from previously failed tasks") {
    // Make sure we can detect when a taskset becomes unschedulable from blacklisting.  This
    // test explores a particular corner case -- you may have one task fail, but still be
    // schedulable on another executor.  However, that executor may fail later on, leaving the
    // first task with no place to run.
    val taskScheduler = setupScheduler(
      config.BLACKLIST_ENABLED.key -> "true"
    )

    val taskSet = FakeTask.createTaskSet(2)
    taskScheduler.submitTasks(taskSet)
    val tsm = taskScheduler.taskSetManagerForAttempt(taskSet.stageId, taskSet.stageAttemptId).get

    val firstTaskAttempts = taskScheduler.resourceOffers(IndexedSeq(
      new WorkerOffer("executor0", "host0", 1),
      new WorkerOffer("executor1", "host1", 1)
    )).flatten
    assert(Set("executor0", "executor1") === firstTaskAttempts.map(_.executorId).toSet)

    // fail one of the tasks, but leave the other running
    val failedTask = firstTaskAttempts.find(_.executorId == "executor0").get
    taskScheduler.handleFailedTask(tsm, failedTask.taskId, TaskState.FAILED, TaskResultLost)
    // at this point, our failed task could run on the other executor, so don't give up the task
    // set yet.
    assert(!failedTaskSet)

    // Now we fail our second executor.  The other task can still run on executor0, so make an
    // offer on that executor, and make sure that the other task (not the failed one) is
    // assigned there.
    taskScheduler.executorLost("executor1", SlaveLost("oops"))
    val nextTaskAttempts =
      taskScheduler.resourceOffers(IndexedSeq(new WorkerOffer("executor0", "host0", 1))).flatten
    // Note: It's OK if some future change makes the scheduler realize at this point that the
    // taskset has become unschedulable (though in the current implementation, we're sure it
    // will not).
    assert(nextTaskAttempts.size === 1)
    assert(nextTaskAttempts.head.executorId === "executor0")
    assert(nextTaskAttempts.head.attemptNumber === 1)
    assert(nextTaskAttempts.head.index != failedTask.index)

    // now we should definitely realize that our task set is unschedulable, because the only
    // task left can't be scheduled on any executors due to the blacklist
    taskScheduler.resourceOffers(IndexedSeq(new WorkerOffer("executor0", "host0", 1)))
    sc.listenerBus.waitUntilEmpty(100000)
    assert(tsm.isZombie)
    assert(failedTaskSet)
    val idx = failedTask.index
    assert(failedTaskSetReason === s"Aborting TaskSet 0.0 because task $idx (partition $idx) " +
      s"cannot run anywhere due to node and executor blacklist.  Blacklisting behavior can be " +
      s"configured via spark.blacklist.*.")
  }

  test("don't abort if there is an executor available, though it hasn't had scheduled tasks yet") {
    // interaction of SPARK-15865 & SPARK-16106
    // if we have a small number of tasks, we might be able to schedule them all on the first
    // executor.  But if those tasks fail, we should still realize there is another executor
    // available and not bail on the job

    val taskScheduler = setupScheduler(
      config.BLACKLIST_ENABLED.key -> "true"
    )

    val taskSet = FakeTask.createTaskSet(2, (0 until 2).map { _ => Seq(TaskLocation("host0")) }: _*)
    taskScheduler.submitTasks(taskSet)
    val tsm = taskScheduler.taskSetManagerForAttempt(taskSet.stageId, taskSet.stageAttemptId).get

    val offers = IndexedSeq(
      // each offer has more than enough free cores for the entire task set, so when combined
      // with the locality preferences, we schedule all tasks on one executor
      new WorkerOffer("executor0", "host0", 4),
      new WorkerOffer("executor1", "host1", 4)
    )
    val firstTaskAttempts = taskScheduler.resourceOffers(offers).flatten
    assert(firstTaskAttempts.size == 2)
    firstTaskAttempts.foreach { taskAttempt => assert("executor0" === taskAttempt.executorId) }

    // fail all the tasks on the bad executor
    firstTaskAttempts.foreach { taskAttempt =>
      taskScheduler.handleFailedTask(tsm, taskAttempt.taskId, TaskState.FAILED, TaskResultLost)
    }

    // Here is the main check of this test -- we make the same offers again, and the tasks are
    // scheduled successfully.  Because the scheduler first tries to schedule with locality in
    // mind, at first it won't schedule anything on executor1.  But despite that, we don't abort
    // the job.  Then the scheduler tries for ANY locality, and successfully schedules tasks on
    // executor1.
    val secondTaskAttempts = taskScheduler.resourceOffers(offers).flatten
    assert(secondTaskAttempts.size == 2)
    secondTaskAttempts.foreach { taskAttempt => assert("executor1" === taskAttempt.executorId) }
    assert(!failedTaskSet)
  }

  test("SPARK-16106 locality levels updated if executor added to existing host") {
    val taskScheduler = setupScheduler()

    taskScheduler.submitTasks(FakeTask.createTaskSet(2, 0,
      (0 until 2).map { _ => Seq(TaskLocation("host0", "executor2")) }: _*
    ))

    val taskDescs = taskScheduler.resourceOffers(IndexedSeq(
      new WorkerOffer("executor0", "host0", 1),
      new WorkerOffer("executor1", "host1", 1)
    )).flatten
    // only schedule one task because of locality
    assert(taskDescs.size === 1)

    val mgr = taskScheduler.taskIdToTaskSetManager.get(taskDescs(0).taskId).get
    assert(mgr.myLocalityLevels.toSet === Set(TaskLocality.NODE_LOCAL, TaskLocality.ANY))
    // we should know about both executors, even though we only scheduled tasks on one of them
    assert(taskScheduler.getExecutorsAliveOnHost("host0") === Some(Set("executor0")))
    assert(taskScheduler.getExecutorsAliveOnHost("host1") === Some(Set("executor1")))

    // when executor2 is added, we should realize that we can run process-local tasks.
    // And we should know it's alive on the host.
    val secondTaskDescs = taskScheduler.resourceOffers(
      IndexedSeq(new WorkerOffer("executor2", "host0", 1))).flatten
    assert(secondTaskDescs.size === 1)
    assert(mgr.myLocalityLevels.toSet ===
      Set(TaskLocality.PROCESS_LOCAL, TaskLocality.NODE_LOCAL, TaskLocality.ANY))
    assert(taskScheduler.getExecutorsAliveOnHost("host0") === Some(Set("executor0", "executor2")))
    assert(taskScheduler.getExecutorsAliveOnHost("host1") === Some(Set("executor1")))

    // And even if we don't have anything left to schedule, another resource offer on yet another
    // executor should also update the set of live executors
    val thirdTaskDescs = taskScheduler.resourceOffers(
      IndexedSeq(new WorkerOffer("executor3", "host1", 1))).flatten
    assert(thirdTaskDescs.size === 0)
    assert(taskScheduler.getExecutorsAliveOnHost("host1") === Some(Set("executor1", "executor3")))
  }

417  test("if an executor is lost then the state for its running tasks is cleaned up (SPARK-18553)") {
418    sc = new SparkContext("local", "TaskSchedulerImplSuite")
419    val taskScheduler = new TaskSchedulerImpl(sc)
420    taskScheduler.initialize(new FakeSchedulerBackend)
421    // Need to initialize a DAGScheduler for the taskScheduler to use for callbacks.
422    new DAGScheduler(sc, taskScheduler) {
423      override def taskStarted(task: Task[_], taskInfo: TaskInfo) {}
424      override def executorAdded(execId: String, host: String) {}
425    }
426
427    val e0Offers = IndexedSeq(WorkerOffer("executor0", "host0", 1))
428    val attempt1 = FakeTask.createTaskSet(1)
429
430    // submit attempt 1, offer resources, task gets scheduled
431    taskScheduler.submitTasks(attempt1)
432    val taskDescriptions = taskScheduler.resourceOffers(e0Offers).flatten
433    assert(1 === taskDescriptions.length)
434
435    // mark executor0 as dead
436    taskScheduler.executorLost("executor0", SlaveLost())
437    assert(!taskScheduler.isExecutorAlive("executor0"))
438    assert(!taskScheduler.hasExecutorsAliveOnHost("host0"))
439    assert(taskScheduler.getExecutorsAliveOnHost("host0").isEmpty)
440
441
442    // Check that state associated with the lost task attempt is cleaned up:
443    assert(taskScheduler.taskIdToExecutorId.isEmpty)
444    assert(taskScheduler.taskIdToTaskSetManager.isEmpty)
445    assert(taskScheduler.runningTasksByExecutors.get("executor0").isEmpty)
446  }

  test("if a task finishes with TaskState.LOST its executor is marked as dead") {
    sc = new SparkContext("local", "TaskSchedulerImplSuite")
    val taskScheduler = new TaskSchedulerImpl(sc)
    taskScheduler.initialize(new FakeSchedulerBackend)
    // Need to initialize a DAGScheduler for the taskScheduler to use for callbacks.
    new DAGScheduler(sc, taskScheduler) {
      override def taskStarted(task: Task[_], taskInfo: TaskInfo): Unit = {}
      override def executorAdded(execId: String, host: String): Unit = {}
    }

    val e0Offers = IndexedSeq(WorkerOffer("executor0", "host0", 1))
    val attempt1 = FakeTask.createTaskSet(1)

    // submit attempt 1, offer resources, task gets scheduled
    taskScheduler.submitTasks(attempt1)
    val taskDescriptions = taskScheduler.resourceOffers(e0Offers).flatten
    assert(1 === taskDescriptions.length)

    // Report the task as failed with TaskState.LOST
    taskScheduler.statusUpdate(
      tid = taskDescriptions.head.taskId,
      state = TaskState.LOST,
      serializedData = ByteBuffer.allocate(0)
    )

    // Check that state associated with the lost task attempt is cleaned up:
    assert(taskScheduler.taskIdToExecutorId.isEmpty)
    assert(taskScheduler.taskIdToTaskSetManager.isEmpty)
    assert(taskScheduler.runningTasksByExecutors.get("executor0").isEmpty)

    // Check that the executor has been marked as dead
    assert(!taskScheduler.isExecutorAlive("executor0"))
    assert(!taskScheduler.hasExecutorsAliveOnHost("host0"))
    assert(taskScheduler.getExecutorsAliveOnHost("host0").isEmpty)
  }
}