/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.scheduler

import java.util.Random

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

import org.mockito.Mockito.{mock, verify}

import org.apache.spark._
import org.apache.spark.internal.config
import org.apache.spark.internal.Logging
import org.apache.spark.util.{AccumulatorV2, ManualClock}

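/**
 * A DAGScheduler stub whose lifecycle callbacks simply record what happened in the enclosing
 * FakeTaskScheduler, so tests can assert on started tasks, ended tasks and failed task sets.
 */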
class FakeDAGScheduler(sc: SparkContext, taskScheduler: FakeTaskScheduler)
  extends DAGScheduler(sc) {

  override def taskStarted(task: Task[_], taskInfo: TaskInfo): Unit = {
    taskScheduler.startedTasks += taskInfo.index
  }

  override def taskEnded(
      task: Task[_],
      reason: TaskEndReason,
      result: Any,
      accumUpdates: Seq[AccumulatorV2[_, _]],
      taskInfo: TaskInfo): Unit = {
    taskScheduler.endedTasks(taskInfo.index) = reason
  }

  override def executorAdded(execId: String, host: String): Unit = {}

  override def executorLost(execId: String, reason: ExecutorLossReason): Unit = {}

  override def taskSetFailed(
      taskSet: TaskSet,
      reason: String,
      exception: Option[Throwable]): Unit = {
    taskScheduler.taskSetsFailed += taskSet.id
  }
}

// Maps hosts to racks so the tests can simulate rack-aware scheduling
object FakeRackUtil {
  private val hostToRack = new mutable.HashMap[String, String]()

  def cleanUp(): Unit = {
    hostToRack.clear()
  }

  def assignHostToRack(host: String, rack: String): Unit = {
    hostToRack(host) = rack
  }

  def getRackForHost(host: String): Option[String] = {
    hostToRack.get(host)
  }
}

/**
 * A mock TaskSchedulerImpl implementation that just remembers information about tasks started and
 * feedback received from the TaskSetManagers. Note that it's important to initialize this with
 * a list of "live" executors and their hostnames for isExecutorAlive and hasExecutorsAliveOnHost
 * to work, and these are required for locality in TaskSetManager.
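 * For example, the tests below construct it as
 * `new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))`.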
 */
class FakeTaskScheduler(sc: SparkContext, liveExecutors: (String, String)* /* execId, host */)
  extends TaskSchedulerImpl(sc)
{
  val startedTasks = new ArrayBuffer[Long]
  val endedTasks = new mutable.HashMap[Long, TaskEndReason]
  val finishedManagers = new ArrayBuffer[TaskSetManager]
  val taskSetsFailed = new ArrayBuffer[String]

  val executors = new mutable.HashMap[String, String]
  for ((execId, host) <- liveExecutors) {
    addExecutor(execId, host)
  }

  for ((execId, host) <- liveExecutors; rack <- getRackForHost(host)) {
    hostsByRack.getOrElseUpdate(rack, new mutable.HashSet[String]()) += host
  }

  dagScheduler = new FakeDAGScheduler(sc, this)

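  /** Remove an executor, dropping its host from the rack map and the rack itself if it empties. */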
  def removeExecutor(execId: String): Unit = {
    executors -= execId
    val host = executorIdToHost.get(execId)
    assert(host.isDefined)
    val hostId = host.get
    val executorsOnHost = hostToExecutors(hostId)
    executorsOnHost -= execId
    for (rack <- getRackForHost(hostId); hosts <- hostsByRack.get(rack)) {
      hosts -= hostId
      if (hosts.isEmpty) {
        hostsByRack -= rack
      }
    }
  }

  override def taskSetFinished(manager: TaskSetManager): Unit = finishedManagers += manager

  override def isExecutorAlive(execId: String): Boolean = executors.contains(execId)

  override def hasExecutorsAliveOnHost(host: String): Boolean = executors.values.exists(_ == host)

  override def hasHostAliveOnRack(rack: String): Boolean = {
    hostsByRack.contains(rack)
  }

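  /** Register an executor as alive on the given host and, if the host has a rack, in that rack. */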
  def addExecutor(execId: String, host: String): Unit = {
    executors.put(execId, host)
    val executorsOnHost = hostToExecutors.getOrElseUpdate(host, new mutable.HashSet[String])
    executorsOnHost += execId
    executorIdToHost += execId -> host
    for (rack <- getRackForHost(host)) {
      hostsByRack.getOrElseUpdate(rack, new mutable.HashSet[String]()) += host
    }
  }

  override def getRackForHost(value: String): Option[String] = FakeRackUtil.getRackForHost(value)
}

/**
 * A Task implementation that results in a large serialized task.
 */
class LargeTask(stageId: Int) extends Task[Array[Byte]](stageId, 0, 0) {

  val randomBuffer = new Array[Byte](TaskSetManager.TASK_SIZE_TO_WARN_KB * 1024)
  val random = new Random(0)
  random.nextBytes(randomBuffer)

  override def runTask(context: TaskContext): Array[Byte] = randomBuffer
  override def preferredLocations: Seq[TaskLocation] = Seq[TaskLocation]()
}

class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logging {
  import TaskLocality.{ANY, PROCESS_LOCAL, NO_PREF, NODE_LOCAL, RACK_LOCAL}

  private val conf = new SparkConf

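  // Delay-scheduling wait used throughout the tests below (spark.locality.wait, default 3s)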
  val LOCALITY_WAIT_MS = conf.getTimeAsMs("spark.locality.wait", "3s")
  val MAX_TASK_FAILURES = 4

  var sched: FakeTaskScheduler = null

  override def beforeEach(): Unit = {
    super.beforeEach()
    FakeRackUtil.cleanUp()
    sched = null
  }

  override def afterEach(): Unit = {
    super.afterEach()
    if (sched != null) {
      sched.dagScheduler.stop()
      sched.stop()
      sched = null
    }
  }

  test("TaskSet with no preferences") {
    sc = new SparkContext("local", "test")
    sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
    val taskSet = FakeTask.createTaskSet(1)
    val clock = new ManualClock
    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
    val accumUpdates = taskSet.tasks.head.metrics.internalAccums
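    // internalAccums are the task's built-in metric accumulators; they are passed back through
    // createTaskResult so handleSuccessfulTask behaves like a real task completion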

    // Offer a host with NO_PREF as the constraint; we should get the noPref task immediately,
    // since that is the only kind of task in this set
    val taskOption = manager.resourceOffer("exec1", "host1", NO_PREF)
    assert(taskOption.isDefined)

    // Tell it the task has finished
    manager.handleSuccessfulTask(0, createTaskResult(0, accumUpdates))
    assert(sched.endedTasks(0) === Success)
    assert(sched.finishedManagers.contains(manager))
  }

  test("multiple offers with no preferences") {
    sc = new SparkContext("local", "test")
    sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
    val taskSet = FakeTask.createTaskSet(3)
    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES)
    val accumUpdatesByTask: Array[Seq[AccumulatorV2[_, _]]] = taskSet.tasks.map { task =>
      task.metrics.internalAccums
    }

    // First three offers should all find tasks
    for (i <- 0 until 3) {
      val taskOption = manager.resourceOffer("exec1", "host1", NO_PREF)
      assert(taskOption.isDefined)
      val task = taskOption.get
      assert(task.executorId === "exec1")
    }
    assert(sched.startedTasks.toSet === Set(0, 1, 2))

    // Re-offer the host -- now we should get no more tasks
    assert(manager.resourceOffer("exec1", "host1", NO_PREF) === None)

    // Finish the first two tasks
    manager.handleSuccessfulTask(0, createTaskResult(0, accumUpdatesByTask(0)))
    manager.handleSuccessfulTask(1, createTaskResult(1, accumUpdatesByTask(1)))
    assert(sched.endedTasks(0) === Success)
    assert(sched.endedTasks(1) === Success)
    assert(!sched.finishedManagers.contains(manager))

    // Finish the last task
    manager.handleSuccessfulTask(2, createTaskResult(2, accumUpdatesByTask(2)))
    assert(sched.endedTasks(2) === Success)
    assert(sched.finishedManagers.contains(manager))
  }

  test("skip unsatisfiable locality levels") {
    sc = new SparkContext("local", "test")
    sched = new FakeTaskScheduler(sc, ("execA", "host1"), ("execC", "host2"))
    val taskSet = FakeTask.createTaskSet(1, Seq(TaskLocation("host1", "execB")))
    val clock = new ManualClock
    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)

    // An executor that is not NODE_LOCAL should be rejected.
    assert(manager.resourceOffer("execC", "host2", ANY) === None)

    // Because there are no alive PROCESS_LOCAL executors, the base locality level should be
    // NODE_LOCAL. So, we should schedule the task on this offered NODE_LOCAL executor before
    // any of the locality wait timers expire.
    assert(manager.resourceOffer("execA", "host1", ANY).get.index === 0)
  }

  test("basic delay scheduling") {
    sc = new SparkContext("local", "test")
    sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
    val taskSet = FakeTask.createTaskSet(4,
      Seq(TaskLocation("host1", "exec1")),
      Seq(TaskLocation("host2", "exec2")),
      Seq(TaskLocation("host1"), TaskLocation("host2", "exec2")),
      Seq()   // Last task has no locality prefs
    )
    val clock = new ManualClock
    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
    // First offer host1, exec1: first task should be chosen
    assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 0)
    assert(manager.resourceOffer("exec1", "host1", PROCESS_LOCAL) == None)

    clock.advance(LOCALITY_WAIT_MS)
    // Offer host1, exec1 again at NODE_LOCAL level: the node-local task (index 2) should
    // get chosen before the noPref task
    assert(manager.resourceOffer("exec1", "host1", NODE_LOCAL).get.index == 2)

    // Offer host2, exec2 at NODE_LOCAL level: we should choose the task with index 1
    assert(manager.resourceOffer("exec2", "host2", NODE_LOCAL).get.index == 1)

    // Offer host2, exec2 again at NODE_LOCAL level: there is no node-local task left, so
    // nothing is scheduled; after another locality wait the noPref task can be scheduled
    assert(manager.resourceOffer("exec2", "host2", NODE_LOCAL) == None)
    clock.advance(LOCALITY_WAIT_MS)
    assert(manager.resourceOffer("exec2", "host2", NO_PREF).get.index == 3)
  }

  test("we do not need to delay scheduling when we only have noPref tasks in the queue") {
    sc = new SparkContext("local", "test")
    sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec3", "host2"))
    val taskSet = FakeTask.createTaskSet(3,
      Seq(TaskLocation("host1", "exec1")),
      Seq(TaskLocation("host2", "exec3")),
      Seq()   // Last task has no locality prefs
    )
    val clock = new ManualClock
    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
    // First offer host1, exec1: first task should be chosen
    assert(manager.resourceOffer("exec1", "host1", PROCESS_LOCAL).get.index === 0)
    assert(manager.resourceOffer("exec3", "host2", PROCESS_LOCAL).get.index === 1)
    assert(manager.resourceOffer("exec3", "host2", NODE_LOCAL) == None)
    assert(manager.resourceOffer("exec3", "host2", NO_PREF).get.index === 2)
  }

  test("delay scheduling with fallback") {
    sc = new SparkContext("local", "test")
    sched = new FakeTaskScheduler(sc,
      ("exec1", "host1"), ("exec2", "host2"), ("exec3", "host3"))
    val taskSet = FakeTask.createTaskSet(5,
      Seq(TaskLocation("host1")),
      Seq(TaskLocation("host2")),
      Seq(TaskLocation("host2")),
      Seq(TaskLocation("host3")),
      Seq(TaskLocation("host2"))
    )
    val clock = new ManualClock
    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)

    // First offer host1: first task should be chosen
    assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 0)

    // Offer host1 again: nothing should get chosen
    assert(manager.resourceOffer("exec1", "host1", ANY) === None)

    clock.advance(LOCALITY_WAIT_MS)

    // Offer host1 again: second task (on host2) should get chosen
    assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 1)

    // Offer host1 again: third task (on host2) should get chosen
    assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 2)

    // Offer host2: fifth task (also on host2) should get chosen
    assert(manager.resourceOffer("exec2", "host2", ANY).get.index === 4)

    // Now that we've launched a local task, we should no longer launch the task for host3
    assert(manager.resourceOffer("exec2", "host2", ANY) === None)

    clock.advance(LOCALITY_WAIT_MS)

    // After another delay, we can go ahead and launch that task non-locally
    assert(manager.resourceOffer("exec2", "host2", ANY).get.index === 3)
  }

  test("delay scheduling with failed hosts") {
    sc = new SparkContext("local", "test")
    sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"),
      ("exec3", "host3"))
    val taskSet = FakeTask.createTaskSet(3,
      Seq(TaskLocation("host1")),
      Seq(TaskLocation("host2")),
      Seq(TaskLocation("host3"))
    )
    val clock = new ManualClock
    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)

    // First offer host1: first task should be chosen
    assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 0)

    // After this, nothing should get chosen, because tasks whose preferred locations are
    // unavailable are kept separate from the noPrefPendingTasks
    assert(manager.resourceOffer("exec1", "host1", ANY) === None)

    // Now mark host2 as dead
    sched.removeExecutor("exec2")
    manager.executorLost("exec2", "host2", SlaveLost())

    // Nothing should be chosen yet
    assert(manager.resourceOffer("exec1", "host1", ANY) === None)

    clock.advance(LOCALITY_WAIT_MS * 2)

    // Tasks 1 and 2 should now be scheduled as non-local tasks
    assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 1)
    assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 2)

    // All tasks are finished
    assert(manager.resourceOffer("exec1", "host1", ANY) === None)
    assert(manager.resourceOffer("exec2", "host2", ANY) === None)
  }

  test("task result lost") {
    sc = new SparkContext("local", "test")
    sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
    val taskSet = FakeTask.createTaskSet(1)
    val clock = new ManualClock
    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)

    assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 0)

    // Tell it the task has finished but the result was lost.
    manager.handleFailedTask(0, TaskState.FINISHED, TaskResultLost)
    assert(sched.endedTasks(0) === TaskResultLost)

    // Re-offer the host -- now we should get task 0 again.
    assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 0)
  }

  test("repeated failures lead to task set abortion") {
    sc = new SparkContext("local", "test")
    sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
    val taskSet = FakeTask.createTaskSet(1)
    val clock = new ManualClock
    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)

    // Fail the task MAX_TASK_FAILURES times, and check that the task set is aborted
    // after the last failure.
    (1 to manager.maxTaskFailures).foreach { index =>
      val offerResult = manager.resourceOffer("exec1", "host1", ANY)
      assert(offerResult.isDefined,
        "Expect resource offer on iteration %s to return a task".format(index))
      assert(offerResult.get.index === 0)
      manager.handleFailedTask(offerResult.get.taskId, TaskState.FINISHED, TaskResultLost)
      if (index < MAX_TASK_FAILURES) {
        assert(!sched.taskSetsFailed.contains(taskSet.id))
      } else {
        assert(sched.taskSetsFailed.contains(taskSet.id))
      }
    }
  }

  test("executors should be blacklisted after task failure, in spite of locality preferences") {
    val rescheduleDelay = 300L
    val conf = new SparkConf().
      set(config.BLACKLIST_ENABLED, true).
      set(config.BLACKLIST_TIMEOUT_CONF, rescheduleDelay).
      // don't wait to jump locality levels in this test
      set("spark.locality.wait", "0")

    sc = new SparkContext("local", "test", conf)
    // Two executors on the same host, one on a different host.
    sched = new FakeTaskScheduler(sc, ("exec1", "host1"),
      ("exec1.1", "host1"), ("exec2", "host2"))
    // Affinity to exec1 on host1, which we will fail.
    val taskSet = FakeTask.createTaskSet(1, Seq(TaskLocation("host1", "exec1")))
    val clock = new ManualClock
    val manager = new TaskSetManager(sched, taskSet, 4, clock)

    {
      val offerResult = manager.resourceOffer("exec1", "host1", PROCESS_LOCAL)
      assert(offerResult.isDefined, "Expect resource offer to return a task")

      assert(offerResult.get.index === 0)
      assert(offerResult.get.executorId === "exec1")

      // Cause exec1 to fail: failure 1
      manager.handleFailedTask(offerResult.get.taskId, TaskState.FINISHED, TaskResultLost)
      assert(!sched.taskSetsFailed.contains(taskSet.id))

      // Ensure scheduling on exec1 fails after failure 1 due to blacklist
      assert(manager.resourceOffer("exec1", "host1", PROCESS_LOCAL).isEmpty)
      assert(manager.resourceOffer("exec1", "host1", NODE_LOCAL).isEmpty)
      assert(manager.resourceOffer("exec1", "host1", RACK_LOCAL).isEmpty)
      assert(manager.resourceOffer("exec1", "host1", ANY).isEmpty)
    }

    // Run the task on exec1.1: it should work, and then fail it on exec1.1
    {
      val offerResult = manager.resourceOffer("exec1.1", "host1", NODE_LOCAL)
      assert(offerResult.isDefined,
        "Expect resource offer to return a task for exec1.1, offerResult = " + offerResult)

      assert(offerResult.get.index === 0)
      assert(offerResult.get.executorId === "exec1.1")

      // Cause exec1.1 to fail: failure 2
      manager.handleFailedTask(offerResult.get.taskId, TaskState.FINISHED, TaskResultLost)
      assert(!sched.taskSetsFailed.contains(taskSet.id))

      // Ensure scheduling on exec1.1 fails after failure 2 due to blacklist
      assert(manager.resourceOffer("exec1.1", "host1", NODE_LOCAL).isEmpty)
    }

    // Run the task on exec2: it should work, and then fail it on exec2
    {
      val offerResult = manager.resourceOffer("exec2", "host2", ANY)
      assert(offerResult.isDefined, "Expect resource offer to return a task")

      assert(offerResult.get.index === 0)
      assert(offerResult.get.executorId === "exec2")

      // Cause exec2 to fail: failure 3
      manager.handleFailedTask(offerResult.get.taskId, TaskState.FINISHED, TaskResultLost)
      assert(!sched.taskSetsFailed.contains(taskSet.id))

      // Ensure scheduling on exec2 fails after failure 3 due to blacklist
      assert(manager.resourceOffer("exec2", "host2", ANY).isEmpty)
    }

    // Even after advancing past the executor-blacklist expiry time, entries in the per-stage
    // blacklist are *never* expired
    clock.advance(rescheduleDelay)

    {
      val offerResult = manager.resourceOffer("exec1", "host1", PROCESS_LOCAL)
      assert(offerResult.isEmpty)
    }

    {
      val offerResult = manager.resourceOffer("exec3", "host3", ANY)
      assert(offerResult.isDefined)
      assert(offerResult.get.index === 0)
      assert(offerResult.get.executorId === "exec3")

      assert(manager.resourceOffer("exec3", "host3", ANY).isEmpty)

      // Cause exec3 to fail: failure 4
      manager.handleFailedTask(offerResult.get.taskId, TaskState.FINISHED, TaskResultLost)
    }

    // We have now failed the same task 4 times: the task set id should be in taskSetsFailed
    assert(sched.taskSetsFailed.contains(taskSet.id))
  }

  test("new executors get added and lost") {
    // Assign host2 to rack2
    FakeRackUtil.assignHostToRack("host2", "rack2")
    sc = new SparkContext("local", "test")
    sched = new FakeTaskScheduler(sc)
    val taskSet = FakeTask.createTaskSet(4,
      Seq(TaskLocation("host1", "execA")),
      Seq(TaskLocation("host1", "execB")),
      Seq(TaskLocation("host2", "execC")),
      Seq())
    val clock = new ManualClock
    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
    // With no executors registered, only NO_PREF and ANY are valid
    assert(manager.myLocalityLevels.sameElements(Array(NO_PREF, ANY)))
    // Add a new executor
    sched.addExecutor("execD", "host1")
    manager.executorAdded()
    // Valid locality should now contain NODE_LOCAL, NO_PREF and ANY
    assert(manager.myLocalityLevels.sameElements(Array(NODE_LOCAL, NO_PREF, ANY)))
    // Add another executor
    sched.addExecutor("execC", "host2")
    manager.executorAdded()
    // Valid locality should now contain PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL and ANY
    assert(manager.myLocalityLevels.sameElements(
      Array(PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY)))
    // Check that the valid locality levels are recomputed when an executor is lost
    sched.removeExecutor("execC")
    manager.executorLost("execC", "host2", SlaveLost())
    assert(manager.myLocalityLevels.sameElements(Array(NODE_LOCAL, NO_PREF, ANY)))
    sched.removeExecutor("execD")
    manager.executorLost("execD", "host1", SlaveLost())
    assert(manager.myLocalityLevels.sameElements(Array(NO_PREF, ANY)))
  }

  test("Executors exit for reason unrelated to currently running tasks") {
    sc = new SparkContext("local", "test")
    sched = new FakeTaskScheduler(sc)
    val taskSet = FakeTask.createTaskSet(4,
      Seq(TaskLocation("host1", "execA")),
      Seq(TaskLocation("host1", "execB")),
      Seq(TaskLocation("host2", "execC")),
      Seq())
    val manager = new TaskSetManager(sched, taskSet, 1, new ManualClock)
    sched.addExecutor("execA", "host1")
    manager.executorAdded()
    sched.addExecutor("execC", "host2")
    manager.executorAdded()
    assert(manager.resourceOffer("exec1", "host1", ANY).isDefined)
    sched.removeExecutor("execA")
    manager.executorLost(
      "execA",
      "host1",
      ExecutorExited(143, false, "Terminated for reason unrelated to running tasks"))
    assert(!sched.taskSetsFailed.contains(taskSet.id))
    assert(manager.resourceOffer("execC", "host2", ANY).isDefined)
    sched.removeExecutor("execC")
    manager.executorLost(
      "execC", "host2", ExecutorExited(1, true, "Terminated due to issue with running tasks"))
    assert(sched.taskSetsFailed.contains(taskSet.id))
  }

  test("test RACK_LOCAL tasks") {
    // Assign host1 to rack1
    FakeRackUtil.assignHostToRack("host1", "rack1")
    // Assign host2 to rack1
    FakeRackUtil.assignHostToRack("host2", "rack1")
    // Assign host3 to rack2
    FakeRackUtil.assignHostToRack("host3", "rack2")
    sc = new SparkContext("local", "test")
    sched = new FakeTaskScheduler(sc,
      ("execA", "host1"), ("execB", "host2"), ("execC", "host3"))
    val taskSet = FakeTask.createTaskSet(2,
      Seq(TaskLocation("host1", "execA")),
      Seq(TaskLocation("host1", "execA")))
    val clock = new ManualClock
    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)

    assert(manager.myLocalityLevels.sameElements(Array(PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL, ANY)))
    // Advance the clock so that the allowed locality level reaches ANY
    clock.advance(LOCALITY_WAIT_MS * 3)
    // Offer host3
    // No task is scheduled if we restrict locality to RACK_LOCAL
    assert(manager.resourceOffer("execC", "host3", RACK_LOCAL) === None)
    // Task 0 can be scheduled with ANY
    assert(manager.resourceOffer("execC", "host3", ANY).get.index === 0)
    // Offer host2
    // Task 1 can be scheduled with RACK_LOCAL
    assert(manager.resourceOffer("execB", "host2", RACK_LOCAL).get.index === 1)
  }

  test("do not emit warning when serialized task is small") {
    sc = new SparkContext("local", "test")
    sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
    val taskSet = FakeTask.createTaskSet(1)
    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES)

    assert(!manager.emittedTaskSizeWarning)

    assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 0)

    assert(!manager.emittedTaskSizeWarning)
  }

  test("emit warning when serialized task is large") {
    sc = new SparkContext("local", "test")
    sched = new FakeTaskScheduler(sc, ("exec1", "host1"))

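    // TaskSet arguments: tasks, stageId, stageAttemptId, priority, properties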
    val taskSet = new TaskSet(Array(new LargeTask(0)), 0, 0, 0, null)
    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES)

    assert(!manager.emittedTaskSizeWarning)

    assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 0)

    assert(manager.emittedTaskSizeWarning)
  }

  test("Not serializable exception thrown if the task cannot be serialized") {
    sc = new SparkContext("local", "test")
    sched = new FakeTaskScheduler(sc, ("exec1", "host1"))

    val taskSet = new TaskSet(
      Array(new NotSerializableFakeTask(1, 0), new NotSerializableFakeTask(0, 1)), 0, 0, 0, null)
    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES)

    intercept[TaskNotSerializableException] {
      manager.resourceOffer("exec1", "host1", ANY)
    }
    assert(manager.isZombie)
  }

  test("abort the job if total size of results is too large") {
    val conf = new SparkConf().set("spark.driver.maxResultSize", "2m")
    sc = new SparkContext("local", "test", conf)

    def genBytes(size: Int): (Int) => Array[Byte] = { (x: Int) =>
      val bytes = Array.ofDim[Byte](size)
      scala.util.Random.nextBytes(bytes)
      bytes
    }

    // multiple 1 KB results: well under the limit
    val r = sc.makeRDD(0 until 10, 10).map(genBytes(1024)).collect()
    assert(10 === r.size)

    // a single 10 MB result: over the 2 MB limit
    val thrown = intercept[SparkException] {sc.makeRDD(genBytes(10 << 20)(0), 1).collect()}
    assert(thrown.getMessage().contains("bigger than spark.driver.maxResultSize"))

    // multiple 1 MB results: collectively over the 2 MB limit
    val thrown2 = intercept[SparkException] {
      sc.makeRDD(0 until 10, 10).map(genBytes(1 << 20)).collect()
    }
    assert(thrown2.getMessage().contains("bigger than spark.driver.maxResultSize"))
  }

  test("speculative and noPref task should be scheduled after node-local") {
    sc = new SparkContext("local", "test")
    sched = new FakeTaskScheduler(
      sc, ("execA", "host1"), ("execB", "host2"), ("execC", "host3"))
    val taskSet = FakeTask.createTaskSet(4,
      Seq(TaskLocation("host1", "execA")),
      Seq(TaskLocation("host2"), TaskLocation("host1")),
      Seq(),
      Seq(TaskLocation("host3", "execC")))
    val clock = new ManualClock
    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)

    assert(manager.resourceOffer("execA", "host1", PROCESS_LOCAL).get.index === 0)
    assert(manager.resourceOffer("execA", "host1", NODE_LOCAL) == None)
    assert(manager.resourceOffer("execA", "host1", NO_PREF).get.index == 1)

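    // Mark the task at index 1 as speculatable so a second attempt of it can be offered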
    manager.speculatableTasks += 1
    clock.advance(LOCALITY_WAIT_MS)
    // schedule the noPref task
    assert(manager.resourceOffer("execA", "host1", NO_PREF).get.index === 2)
    // schedule the speculative task
    assert(manager.resourceOffer("execB", "host2", NO_PREF).get.index === 1)
    clock.advance(LOCALITY_WAIT_MS * 3)
    // schedule the remaining task non-locally
    assert(manager.resourceOffer("execB", "host2", ANY).get.index === 3)
  }

  test("node-local tasks should be scheduled right away " +
    "when there are only node-local and no-preference tasks") {
    sc = new SparkContext("local", "test")
    sched = new FakeTaskScheduler(
      sc, ("execA", "host1"), ("execB", "host2"), ("execC", "host3"))
    val taskSet = FakeTask.createTaskSet(4,
      Seq(TaskLocation("host1")),
      Seq(TaskLocation("host2")),
      Seq(),
      Seq(TaskLocation("host3")))
    val clock = new ManualClock
    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)

    // node-local tasks are scheduled without delay
    assert(manager.resourceOffer("execA", "host1", NODE_LOCAL).get.index === 0)
    assert(manager.resourceOffer("execA", "host2", NODE_LOCAL).get.index === 1)
    assert(manager.resourceOffer("execA", "host3", NODE_LOCAL).get.index === 3)
    assert(manager.resourceOffer("execA", "host3", NODE_LOCAL) === None)

    // schedule no-preference after node local ones
    assert(manager.resourceOffer("execA", "host3", NO_PREF).get.index === 2)
  }

  test("SPARK-4939: node-local tasks should be scheduled right after process-local tasks finished")
  {
    sc = new SparkContext("local", "test")
    sched = new FakeTaskScheduler(sc, ("execA", "host1"), ("execB", "host2"))
    val taskSet = FakeTask.createTaskSet(4,
      Seq(TaskLocation("host1")),
      Seq(TaskLocation("host2")),
      Seq(ExecutorCacheTaskLocation("host1", "execA")),
      Seq(ExecutorCacheTaskLocation("host2", "execB")))
    val clock = new ManualClock
    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)

    // process-local tasks are scheduled first
    assert(manager.resourceOffer("execA", "host1", NODE_LOCAL).get.index === 2)
    assert(manager.resourceOffer("execB", "host2", NODE_LOCAL).get.index === 3)
    // node-local tasks are scheduled without delay
    assert(manager.resourceOffer("execA", "host1", NODE_LOCAL).get.index === 0)
    assert(manager.resourceOffer("execB", "host2", NODE_LOCAL).get.index === 1)
    assert(manager.resourceOffer("execA", "host1", NODE_LOCAL) == None)
    assert(manager.resourceOffer("execB", "host2", NODE_LOCAL) == None)
  }

  test("SPARK-4939: no-pref tasks should be scheduled after process-local tasks finished") {
    sc = new SparkContext("local", "test")
    sched = new FakeTaskScheduler(sc, ("execA", "host1"), ("execB", "host2"))
    val taskSet = FakeTask.createTaskSet(3,
      Seq(),
      Seq(ExecutorCacheTaskLocation("host1", "execA")),
      Seq(ExecutorCacheTaskLocation("host2", "execB")))
    val clock = new ManualClock
    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)

    // process-local tasks are scheduled first
    assert(manager.resourceOffer("execA", "host1", PROCESS_LOCAL).get.index === 1)
    assert(manager.resourceOffer("execB", "host2", PROCESS_LOCAL).get.index === 2)
    // no-pref tasks are scheduled without delay
    assert(manager.resourceOffer("execA", "host1", PROCESS_LOCAL) == None)
    assert(manager.resourceOffer("execA", "host1", NODE_LOCAL) == None)
    assert(manager.resourceOffer("execA", "host1", NO_PREF).get.index === 0)
    assert(manager.resourceOffer("execA", "host1", ANY) == None)
  }

  test("Ensure TaskSetManager is usable after addition of levels") {
    // Regression test for SPARK-2931
    sc = new SparkContext("local", "test")
    sched = new FakeTaskScheduler(sc)
    val taskSet = FakeTask.createTaskSet(2,
      Seq(TaskLocation("host1", "execA")),
      Seq(TaskLocation("host2", "execB.1")))
    val clock = new ManualClock
    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
    // Only ANY is valid
    assert(manager.myLocalityLevels.sameElements(Array(ANY)))
    // Add a new executor
    sched.addExecutor("execA", "host1")
    sched.addExecutor("execB.2", "host2")
    manager.executorAdded()
    assert(manager.pendingTasksWithNoPrefs.size === 0)
    // Valid locality should contain PROCESS_LOCAL, NODE_LOCAL and ANY
    assert(manager.myLocalityLevels.sameElements(Array(PROCESS_LOCAL, NODE_LOCAL, ANY)))
    assert(manager.resourceOffer("execA", "host1", ANY) !== None)
    clock.advance(LOCALITY_WAIT_MS * 4)
    assert(manager.resourceOffer("execB.2", "host2", ANY) !== None)
    sched.removeExecutor("execA")
    sched.removeExecutor("execB.2")
    manager.executorLost("execA", "host1", SlaveLost())
    manager.executorLost("execB.2", "host2", SlaveLost())
    clock.advance(LOCALITY_WAIT_MS * 4)
    sched.addExecutor("execC", "host3")
    manager.executorAdded()
    // Prior to the fix, this line resulted in an ArrayIndexOutOfBoundsException:
    assert(manager.resourceOffer("execC", "host3", ANY) !== None)
  }

  test("Test that locations with HDFSCacheTaskLocation are treated as PROCESS_LOCAL.") {
    // Regression test for SPARK-2931
    sc = new SparkContext("local", "test")
    sched = new FakeTaskScheduler(sc,
      ("execA", "host1"), ("execB", "host2"), ("execC", "host3"))
    val taskSet = FakeTask.createTaskSet(3,
      Seq(TaskLocation("host1")),
      Seq(TaskLocation("host2")),
      Seq(TaskLocation("hdfs_cache_host3")))
    val clock = new ManualClock
    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
    assert(manager.myLocalityLevels.sameElements(Array(PROCESS_LOCAL, NODE_LOCAL, ANY)))
    sched.removeExecutor("execA")
    manager.executorAdded()
    assert(manager.myLocalityLevels.sameElements(Array(PROCESS_LOCAL, NODE_LOCAL, ANY)))
    sched.removeExecutor("execB")
    manager.executorAdded()
    assert(manager.myLocalityLevels.sameElements(Array(PROCESS_LOCAL, NODE_LOCAL, ANY)))
    sched.removeExecutor("execC")
    manager.executorAdded()
    assert(manager.myLocalityLevels.sameElements(Array(ANY)))
  }

  test("Test TaskLocation for different host type.") {
    assert(TaskLocation("host1") === HostTaskLocation("host1"))
    assert(TaskLocation("hdfs_cache_host1") === HDFSCacheTaskLocation("host1"))
    assert(TaskLocation("executor_host1_3") === ExecutorCacheTaskLocation("host1", "3"))
    assert(TaskLocation("executor_some.host1_executor_task_3") ===
      ExecutorCacheTaskLocation("some.host1", "executor_task_3"))
  }

  test("Kill other task attempts when one attempt belonging to the same task succeeds") {
    sc = new SparkContext("local", "test")
    sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
    val taskSet = FakeTask.createTaskSet(4)
    // Set the speculation multiplier to be 0 so speculative tasks are launched immediately
    sc.conf.set("spark.speculation.multiplier", "0.0")
    val clock = new ManualClock()
    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
    val accumUpdatesByTask: Array[Seq[AccumulatorV2[_, _]]] = taskSet.tasks.map { task =>
      task.metrics.internalAccums
    }
    // Offer resources for 4 tasks to start
    for ((k, v) <- List(
        "exec1" -> "host1",
        "exec1" -> "host1",
        "exec2" -> "host2",
        "exec2" -> "host2")) {
      val taskOption = manager.resourceOffer(k, v, NO_PREF)
      assert(taskOption.isDefined)
      val task = taskOption.get
      assert(task.executorId === k)
    }
    assert(sched.startedTasks.toSet === Set(0, 1, 2, 3))
    // Complete 3 of the tasks and leave 1 task running
    for (id <- Set(0, 1, 2)) {
      manager.handleSuccessfulTask(id, createTaskResult(id, accumUpdatesByTask(id)))
      assert(sched.endedTasks(id) === Success)
    }

    // checkSpeculatableTasks checks that the task runtime is greater than the threshold for
    // speculating. Since we use a threshold of 0 for speculation, tasks need to be running for
    // > 0ms, so advance the clock by 1ms here.
    clock.advance(1)
    assert(manager.checkSpeculatableTasks(0))
    // Offer resource to start the speculative attempt for the running task
    val taskOption5 = manager.resourceOffer("exec1", "host1", NO_PREF)
    assert(taskOption5.isDefined)
    val task5 = taskOption5.get
    assert(task5.index === 3)
    assert(task5.taskId === 4)
    assert(task5.executorId === "exec1")
    assert(task5.attemptNumber === 1)
    sched.backend = mock(classOf[SchedulerBackend])
    // Complete the speculative attempt for the running task
    manager.handleSuccessfulTask(4, createTaskResult(3, accumUpdatesByTask(3)))
    // Verify that it kills the other running attempt
    verify(sched.backend).killTask(3, "exec2", true)
    // Because the SchedulerBackend was a mock, the 2nd copy of the task won't actually be
    // killed, so the FakeTaskScheduler is only told about the successful completion
    // of the speculated task.
    assert(sched.endedTasks(3) === Success)
  }

  test("Killing speculative tasks does not count towards aborting the taskset") {
    sc = new SparkContext("local", "test")
    sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
    val taskSet = FakeTask.createTaskSet(5)
    // Set the speculation multiplier to be 0 so speculative tasks are launched immediately
    sc.conf.set("spark.speculation.multiplier", "0.0")
    sc.conf.set("spark.speculation.quantile", "0.6")
    val clock = new ManualClock()
    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
    val accumUpdatesByTask: Array[Seq[AccumulatorV2[_, _]]] = taskSet.tasks.map { task =>
      task.metrics.internalAccums
    }
    // Offer resources for 5 tasks to start
    val tasks = new ArrayBuffer[TaskDescription]()
    for ((k, v) <- List(
      "exec1" -> "host1",
      "exec1" -> "host1",
      "exec1" -> "host1",
      "exec2" -> "host2",
      "exec2" -> "host2")) {
      val taskOption = manager.resourceOffer(k, v, NO_PREF)
      assert(taskOption.isDefined)
      val task = taskOption.get
      assert(task.executorId === k)
      tasks += task
    }
    assert(sched.startedTasks.toSet === (0 until 5).toSet)
    // Complete 3 tasks and leave 2 tasks running
    for (id <- Set(0, 1, 2)) {
      manager.handleSuccessfulTask(id, createTaskResult(id, accumUpdatesByTask(id)))
      assert(sched.endedTasks(id) === Success)
    }

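    // Returns the currently running (not yet ended) attempt for the given task index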
    def runningTaskForIndex(index: Int): TaskDescription = {
      tasks.find { task =>
        task.index == index && !sched.endedTasks.contains(task.taskId)
      }.getOrElse {
        throw new RuntimeException(s"couldn't find index $index in " +
          s"tasks: ${tasks.map { t => t.index -> t.taskId }} with endedTasks:" +
          s" ${sched.endedTasks.keys}")
      }
    }

    // Have each of the running tasks fail 3 times (not enough to abort the stage)
    (0 until 3).foreach { attempt =>
      Seq(3, 4).foreach { index =>
        val task = runningTaskForIndex(index)
        logInfo(s"failing task $task")
        val endReason = ExceptionFailure("a", "b", Array(), "c", None)
        manager.handleFailedTask(task.taskId, TaskState.FAILED, endReason)
        sched.endedTasks(task.taskId) = endReason
        assert(!manager.isZombie)
        val nextTask = manager.resourceOffer("exec2", "host2", NO_PREF)
        assert(nextTask.isDefined, s"no offer for attempt $attempt of $index")
        tasks += nextTask.get
      }
    }

    // We can't be sure which of our running tasks will get another speculative copy
    val originalTasks = Seq(3, 4).map { index => index -> runningTaskForIndex(index) }.toMap

    // checkSpeculatableTasks checks that the task runtime is greater than the threshold for
    // speculating. Since we use a threshold of 0 for speculation, tasks need to be running for
    // > 0ms, so advance the clock by 1ms here.
    clock.advance(1)
    assert(manager.checkSpeculatableTasks(0))
    // Offer resource to start the speculative attempt for the running task
    val taskOption5 = manager.resourceOffer("exec1", "host1", NO_PREF)
    assert(taskOption5.isDefined)
    val speculativeTask = taskOption5.get
    assert(speculativeTask.index === 3 || speculativeTask.index === 4)
    assert(speculativeTask.taskId === 11)
    assert(speculativeTask.executorId === "exec1")
    assert(speculativeTask.attemptNumber === 4)
    sched.backend = mock(classOf[SchedulerBackend])
    // Complete the speculative attempt for the running task
    manager.handleSuccessfulTask(speculativeTask.taskId,
      createTaskResult(3, accumUpdatesByTask(3)))
    // Verify that it kills the other running attempt
    val origTask = originalTasks(speculativeTask.index)
    verify(sched.backend).killTask(origTask.taskId, "exec2", true)
    // Because the SchedulerBackend was a mock, the 2nd copy of the task won't actually be
    // killed, so the FakeTaskScheduler is only told about the successful completion
    // of the speculated task.
    assert(sched.endedTasks(3) === Success)
    // Also because the scheduler is a mock, our manager isn't notified about the task killed
    // event, so we do that manually
    manager.handleFailedTask(origTask.taskId, TaskState.KILLED, TaskKilled)
    // This task has "failed" 4 times, but one of them doesn't count, so keep running the stage
    assert(manager.tasksSuccessful === 4)
    assert(!manager.isZombie)

    // Now run another speculative task
    val taskOpt6 = manager.resourceOffer("exec1", "host1", NO_PREF)
    assert(taskOpt6.isDefined)
    val speculativeTask2 = taskOpt6.get
    assert(speculativeTask2.index === 3 || speculativeTask2.index === 4)
    assert(speculativeTask2.index !== speculativeTask.index)
    assert(speculativeTask2.attemptNumber === 4)
    // Complete the speculative attempt for the running task
    manager.handleSuccessfulTask(speculativeTask2.taskId,
      createTaskResult(3, accumUpdatesByTask(3)))
    // Verify that it kills the other running attempt
    val origTask2 = originalTasks(speculativeTask2.index)
    verify(sched.backend).killTask(origTask2.taskId, "exec2", true)
    assert(manager.tasksSuccessful === 5)
    assert(manager.isZombie)
  }

  test("SPARK-17894: Verify TaskSetManagers for different stage attempts have unique names") {
    sc = new SparkContext("local", "test")
    sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
    val taskSet = FakeTask.createTaskSet(numTasks = 1, stageId = 0, stageAttemptId = 0)
    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, new ManualClock)
    assert(manager.name === "TaskSet_0.0")

    // Make sure a task set with the same stage ID but different attempt ID has a unique name
    val taskSet2 = FakeTask.createTaskSet(numTasks = 1, stageId = 0, stageAttemptId = 1)
    val manager2 = new TaskSetManager(sched, taskSet2, MAX_TASK_FAILURES, new ManualClock)
    assert(manager2.name === "TaskSet_0.1")

    // Make sure a task set with the same attempt ID but different stage ID also has a unique name
    val taskSet3 = FakeTask.createTaskSet(numTasks = 1, stageId = 1, stageAttemptId = 1)
    val manager3 = new TaskSetManager(sched, taskSet3, MAX_TASK_FAILURES, new ManualClock)
    assert(manager3.name === "TaskSet_1.1")
  }

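  // Builds the DirectTaskResult a successful task would produce, so the tests above can drive
  // handleSuccessfulTask directly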
  private def createTaskResult(
      id: Int,
      accumUpdates: Seq[AccumulatorV2[_, _]] = Seq.empty): DirectTaskResult[Int] = {
    val valueSer = SparkEnv.get.serializer.newInstance()
    new DirectTaskResult[Int](valueSer.serialize(id), accumUpdates)
  }
}