/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.scheduler

import java.util.Properties
import java.util.concurrent.{TimeoutException, TimeUnit}
import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}

import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration.{Duration, SECONDS}
import scala.language.existentials
import scala.reflect.ClassTag

import org.scalactic.TripleEquals
import org.scalatest.Assertions.AssertionsHelper

import org.apache.spark._
import org.apache.spark.TaskState._
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.util.{CallSite, ThreadUtils, Utils}

/**
 * Tests for the entire scheduler code -- DAGScheduler, TaskSchedulerImpl, TaskSets,
 * TaskSetManagers.
 *
 * Test cases are configured by providing a set of jobs to submit, and then simulating interaction
 * with Spark's executors via a mocked backend (e.g., task completion, task failure, executors
 * disconnecting, etc.).
 */
abstract class SchedulerIntegrationSuite[T <: MockBackend: ClassTag] extends SparkFunSuite
    with LocalSparkContext {

  var taskScheduler: TestTaskScheduler = null
  var scheduler: DAGScheduler = null
  var backend: T = _

  override def beforeEach(): Unit = {
    if (taskScheduler != null) {
      taskScheduler.runningTaskSets.clear()
    }
    results.clear()
    failure = null
    backendException.set(null)
    super.beforeEach()
  }

  override def afterEach(): Unit = {
    super.afterEach()
    taskScheduler.stop()
    backend.stop()
    scheduler.stop()
  }

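  /**
   * Creates a SparkContext with a `mock[...]` master URL so that [[MockExternalClusterManager]]
   * wires up the mock backend of type `T` and a [[TestTaskScheduler]], then builds the
   * DAGScheduler on top of them.
   */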
  def setupScheduler(conf: SparkConf): Unit = {
    conf.setAppName(this.getClass().getSimpleName())
    val backendClassName = implicitly[ClassTag[T]].runtimeClass.getName()
    conf.setMaster(s"mock[${backendClassName}]")
    sc = new SparkContext(conf)
    backend = sc.schedulerBackend.asInstanceOf[T]
    taskScheduler = sc.taskScheduler.asInstanceOf[TestTaskScheduler]
    taskScheduler.initialize(sc.schedulerBackend)
    scheduler = new DAGScheduler(sc, taskScheduler)
    taskScheduler.setDAGScheduler(scheduler)
  }

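  /**
   * Defines a test which first sets up the scheduler via [[setupScheduler]] and then runs the
   * given test body.
   */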
  def testScheduler(name: String)(testBody: => Unit): Unit = {
    testScheduler(name, Seq())(testBody)
  }

  def testScheduler(name: String, extraConfs: Seq[(String, String)])(testBody: => Unit): Unit = {
    test(name) {
      val conf = new SparkConf()
      extraConfs.foreach { case (k, v) => conf.set(k, v) }
      setupScheduler(conf)
      testBody
    }
  }

  /**
   * A map from partition to result for all tasks of a job submitted via this test framework's
   * [[submit]] method.  Two important considerations:
   *
   * 1. If there is a job failure, results may or may not be empty.  If any tasks succeed before
   * the job has failed, they will get included in `results`.  So check for job failure via
   * [[failure]], not by inspecting `results`.  (Also see [[assertDataStructuresEmpty()]])
   *
   * 2. This only gets cleared between tests.  So you'll need to do special handling if you submit
   * more than one job in one test.
   */
  val results = new HashMap[Int, Any]()

  /**
   * If a call to [[submit]] results in a job failure, this will hold the exception, else it will
   * be null.
   *
   * As with [[results]], this only gets cleared between tests, so care must be taken if you are
   * submitting more than one job in one test.
   */
  var failure: Throwable = _

  /**
   * When we submit dummy jobs, this is the compute function we supply.
   */
  private val jobComputeFunc: (TaskContext, scala.Iterator[_]) => Any = {
    (context: TaskContext, it: Iterator[_]) =>
      throw new RuntimeException("jobComputeFunc shouldn't get called in this mock")
  }

  /** Submits a job to the scheduler, and returns a future which does a bit of error handling. */
  protected def submit(
      rdd: RDD[_],
      partitions: Array[Int],
      func: (TaskContext, Iterator[_]) => _ = jobComputeFunc): Future[Any] = {
    val waiter: JobWaiter[Any] = scheduler.submitJob(rdd, func, partitions.toSeq, CallSite("", ""),
      (index, res) => results(index) = res, new Properties())
    import scala.concurrent.ExecutionContext.Implicits.global
    waiter.completionFuture.recover { case ex =>
      failure = ex
    }
  }

  /**
   * Helper to run a few common asserts after a job has completed, in particular checking that
   * some internal bookkeeping data structures are empty.  This only does a very minimal check
   * for whether the job failed or succeeded -- often you will want extra asserts on [[results]]
   * or [[failure]].
   */
  protected def assertDataStructuresEmpty(noFailure: Boolean = true): Unit = {
    if (noFailure) {
      if (failure != null) {
        // if there is a job failure, it can be a bit hard to tease the job failure msg apart
        // from the test failure msg, so we do a little extra formatting
        val msg =
          raw"""
            | There was a failed job.
            | ----- Begin Job Failure Msg -----
            | ${Utils.exceptionString(failure)}
            | ----- End Job Failure Msg ----
          """.stripMargin
        fail(msg)
      }
      // When a job fails, we terminate before waiting for all the task end events to come in,
      // so there might still be a running task set.  So we only check these conditions
      // when the job succeeds
      assert(taskScheduler.runningTaskSets.isEmpty)
      assert(!backend.hasTasks)
    } else {
      assert(failure != null)
    }
    assert(scheduler.activeJobs.isEmpty)
    assert(backendException.get() == null)
  }

  /**
   * Looks at all shuffleMapOutputs that are dependencies of the given RDD, and makes sure
   * they are all registered.
   */
  def assertMapOutputAvailable(targetRdd: MockRDD): Unit = {
    val shuffleIds = targetRdd.shuffleDeps.map(_.shuffleId)
    val nParts = targetRdd.numPartitions
    for {
      shuffleId <- shuffleIds
      reduceIdx <- (0 until nParts)
    } {
      val statuses = taskScheduler.mapOutputTracker.getMapSizesByExecutorId(shuffleId, reduceIdx)
      // really we should have already thrown an exception rather than fail either of these
      // asserts, but just to be extra defensive let's double-check the statuses are OK
      assert(statuses != null)
      assert(statuses.nonEmpty)
    }
  }

  /** Models a stage boundary with a single dependency, like a shuffle. */
  def shuffle(nParts: Int, input: MockRDD): MockRDD = {
    val partitioner = new HashPartitioner(nParts)
    val shuffleDep = new ShuffleDependency[Int, Int, Nothing](input, partitioner)
    new MockRDD(sc, nParts, List(shuffleDep))
  }

  /** Models a stage boundary with multiple dependencies, like a join. */
  def join(nParts: Int, inputs: MockRDD*): MockRDD = {
    val partitioner = new HashPartitioner(nParts)
    val shuffleDeps = inputs.map { inputRDD =>
      new ShuffleDependency[Int, Int, Nothing](inputRDD, partitioner)
    }
    new MockRDD(sc, nParts, shuffleDeps)
  }

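  /**
   * Holds any exception thrown by the `backendFunc` passed to [[withBackend]].  Checked in
   * [[awaitJobTermination]] and [[assertDataStructuresEmpty]] so that backend errors surface as
   * test failures rather than opaque timeouts.
   */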
  val backendException = new AtomicReference[Exception](null)

  /**
   * Helper which makes it a little easier to set up a test: it starts a mock backend in another
   * thread, responding to tasks with your custom function.  You also supply the "body" of your
   * test, where you submit jobs to your backend, wait for them to complete, and then check
   * whatever conditions you want.  Note that this is *not* robust to a badly behaved
   * `backendFunc` -- in particular, it has to return quickly and must not throw errors (instead
   * it should send back the right TaskEndReason).
   */
  def withBackend[T](backendFunc: () => Unit)(testBody: => T): T = {
    val backendContinue = new AtomicBoolean(true)
    val backendThread = new Thread("mock backend thread") {
      override def run(): Unit = {
        while (backendContinue.get()) {
          if (backend.hasTasksWaitingToRun) {
            try {
              backendFunc()
            } catch {
              case ex: Exception =>
                // Try to do a little error handling around exceptions that might occur here --
                // otherwise it can just look like a TimeoutException in the test itself.
                logError("Exception in mock backend:", ex)
                backendException.set(ex)
                backendContinue.set(false)
                throw ex
            }
          } else {
            Thread.sleep(10)
          }
        }
      }
    }
    try {
      backendThread.start()
      testBody
    } finally {
      backendContinue.set(false)
      backendThread.join()
    }
  }

  /**
   * Helper to wait for the job to terminate, with a little extra error handling: if the backend
   * hit an exception, report that rather than an opaque TimeoutException.
   */
  def awaitJobTermination(jobFuture: Future[_], duration: Duration): Unit = {
    try {
      Await.ready(jobFuture, duration)
    } catch {
      case te: TimeoutException if backendException.get() != null =>
        val msg = raw"""
           | ----- Begin Backend Failure Msg -----
           | ${Utils.exceptionString(backendException.get())}
           | ----- End Backend Failure Msg ----
        """.stripMargin

        fail(s"Future timed out after ${duration}, likely because of failure in backend: $msg")
    }
  }
}

/**
 * Helper for running a backend in integration tests; it does a bunch of the bookkeeping
 * so individual tests can focus on just responding to tasks.  Individual tests will use
 * [[beginTask]], [[taskSuccess]], and [[taskFailed]].
 */
private[spark] abstract class MockBackend(
    conf: SparkConf,
    val taskScheduler: TaskSchedulerImpl) extends SchedulerBackend with Logging {

  // Periodically revive offers to allow delay scheduling to work
  private val reviveThread =
    ThreadUtils.newDaemonSingleThreadScheduledExecutor("driver-revive-thread")
  private val reviveIntervalMs = conf.getTimeAsMs("spark.scheduler.revive.interval", "10ms")

  /**
   * Test backends should call this to get a task that has been assigned to them by the scheduler.
   * Each task should be responded to with either [[taskSuccess]] or [[taskFailed]].
   */
  def beginTask(): (TaskDescription, Task[_]) = {
    synchronized {
      val toRun = assignedTasksWaitingToRun.remove(assignedTasksWaitingToRun.size - 1)
      runningTasks += toRun._1.taskId
      toRun
    }
  }

  /**
   * Tell the scheduler the task completed successfully, with the given result.  Also
   * updates some internal state for this mock.
   */
  def taskSuccess(task: TaskDescription, result: Any): Unit = {
    val ser = env.serializer.newInstance()
    val resultBytes = ser.serialize(result)
    val directResult = new DirectTaskResult(resultBytes, Seq()) // no accumulator updates
    taskUpdate(task, TaskState.FINISHED, directResult)
  }

  /**
   * Tell the scheduler the task failed, with the given state and result (probably ExceptionFailure
   * or FetchFailed).  Also updates some internal state for this mock.
   */
  def taskFailed(task: TaskDescription, exc: Exception): Unit = {
    taskUpdate(task, TaskState.FAILED, new ExceptionFailure(exc, Seq()))
  }

  def taskFailed(task: TaskDescription, reason: TaskFailedReason): Unit = {
    taskUpdate(task, TaskState.FAILED, reason)
  }

  def taskUpdate(task: TaskDescription, state: TaskState, result: Any): Unit = {
    val ser = env.serializer.newInstance()
    val resultBytes = ser.serialize(result)
    // statusUpdate is safe to call from multiple threads, it's protected inside the taskScheduler
    taskScheduler.statusUpdate(task.taskId, state, resultBytes)
    if (TaskState.isFinished(state)) {
      synchronized {
        runningTasks -= task.taskId
        executorIdToExecutor(task.executorId).freeCores += taskScheduler.CPUS_PER_TASK
        freeCores += taskScheduler.CPUS_PER_TASK
      }
      reviveOffers()
    }
  }

  // protected by this
  private val assignedTasksWaitingToRun = new ArrayBuffer[(TaskDescription, Task[_])](10000)
  // protected by this
  private val runningTasks = HashSet[Long]()

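  /** True if this backend still holds any tasks, either assigned-but-not-started or running. */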
  def hasTasks: Boolean = synchronized {
    assignedTasksWaitingToRun.nonEmpty || runningTasks.nonEmpty
  }

  def hasTasksWaitingToRun: Boolean = {
    assignedTasksWaitingToRun.nonEmpty
  }

  override def start(): Unit = {
    reviveThread.scheduleAtFixedRate(new Runnable {
      override def run(): Unit = Utils.tryLogNonFatalError {
        reviveOffers()
      }
    }, 0, reviveIntervalMs, TimeUnit.MILLISECONDS)
  }

  override def stop(): Unit = {
    reviveThread.shutdown()
  }

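  // Used to serialize task results and failure reasons before handing them to the task scheduler.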
  val env = SparkEnv.get

  /** Accessed by both the scheduling and backend threads, so should be protected by this. */
  var freeCores: Int = _

  /**
   * Accessed by both the scheduling and backend threads, so should be protected by this.
   * Most likely the only things that need to be protected are the individual ExecutorTaskStatus
   * instances, but for simplicity in this mock just lock the whole backend.
   */
  def executorIdToExecutor: Map[String, ExecutorTaskStatus]

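  /** Builds a WorkerOffer for every executor that currently has free cores. */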
  private def generateOffers(): IndexedSeq[WorkerOffer] = {
    executorIdToExecutor.values.filter { exec =>
      exec.freeCores > 0
    }.map { exec =>
      WorkerOffer(executorId = exec.executorId, host = exec.host,
        cores = exec.freeCores)
    }.toIndexedSeq
  }

  /**
   * This is called by the scheduler whenever it has tasks it would like to schedule, when a task
   * completes (which will be in a result-getter thread), and by the reviveOffers thread for delay
   * scheduling.
   */
  override def reviveOffers(): Unit = {
    val newTaskDescriptions = taskScheduler.resourceOffers(generateOffers()).flatten
    // get the tasks now, since that requires a lock on TaskSchedulerImpl, to prevent individual
    // tests from introducing a race if they need it
    val newTasks = taskScheduler.synchronized {
      newTaskDescriptions.map { taskDescription =>
        val taskSet = taskScheduler.taskIdToTaskSetManager(taskDescription.taskId).taskSet
        val task = taskSet.tasks(taskDescription.index)
        (taskDescription, task)
      }
    }
    synchronized {
      newTasks.foreach { case (taskDescription, _) =>
        executorIdToExecutor(taskDescription.executorId).freeCores -= taskScheduler.CPUS_PER_TASK
      }
      freeCores -= newTasks.size * taskScheduler.CPUS_PER_TASK
      assignedTasksWaitingToRun ++= newTasks
    }
  }

  override def killTask(taskId: Long, executorId: String, interruptThread: Boolean): Unit = {
    // We have to implement this b/c of SPARK-15385.
    // It's OK for this to be a no-op, because even if a backend does implement killTask,
    // it really can only be "best-effort" in any case, and the scheduler should be robust to that.
    // And in fact it reasonably simulates a case where a real backend finishes tasks in between
    // the time when the scheduler sends the msg to kill tasks, and the backend receives the msg.
  }
}

/**
 * A very simple mock backend that can just run one task at a time.
 */
private[spark] class SingleCoreMockBackend(
  conf: SparkConf,
  taskScheduler: TaskSchedulerImpl) extends MockBackend(conf, taskScheduler) {

  val cores = 1

  override def defaultParallelism(): Int = conf.getInt("spark.default.parallelism", cores)

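  // Model a single executor running on the driver ("localhost") with exactly one free core.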
  freeCores = cores
  val localExecutorId = SparkContext.DRIVER_IDENTIFIER
  val localExecutorHostname = "localhost"

  override val executorIdToExecutor: Map[String, ExecutorTaskStatus] = Map(
    localExecutorId -> new ExecutorTaskStatus(localExecutorHostname, localExecutorId, freeCores)
  )
}

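/** Per-executor state for the mock backends; freeCores is mutated as tasks start and finish. */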
case class ExecutorTaskStatus(host: String, executorId: String, var freeCores: Int)

class MockRDD(
  sc: SparkContext,
  val numPartitions: Int,
  val shuffleDeps: Seq[ShuffleDependency[Int, Int, Nothing]]
) extends RDD[(Int, Int)](sc, shuffleDeps) with Serializable {

  MockRDD.validate(numPartitions, shuffleDeps)

  override def compute(split: Partition, context: TaskContext): Iterator[(Int, Int)] =
    throw new RuntimeException("should not be reached")
  override def getPartitions: Array[Partition] = {
    (0 until numPartitions).map(i => new Partition {
      override def index: Int = i
    }).toArray
  }
  override def getPreferredLocations(split: Partition): Seq[String] = Nil
  override def toString: String = "MockRDD " + id
}

object MockRDD extends AssertionsHelper with TripleEquals {
  /**
   * Make sure all the shuffle dependencies have a consistent number of output partitions
   * (mostly to make sure the test setup makes sense, not that Spark itself would get this wrong).
   */
  def validate(numPartitions: Int, dependencies: Seq[ShuffleDependency[_, _, _]]): Unit = {
    dependencies.foreach { dependency =>
      val partitioner = dependency.partitioner
      assert(partitioner != null)
      assert(partitioner.numPartitions === numPartitions)
    }
  }
}

/** Simple cluster manager that wires up our mock backend. */
private class MockExternalClusterManager extends ExternalClusterManager {

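  // Handles master URLs of the form "mock[<backend class name>]", as built by setupScheduler().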
  val MOCK_REGEX = """mock\[(.*)\]""".r
  def canCreate(masterURL: String): Boolean = MOCK_REGEX.findFirstIn(masterURL).isDefined

  def createTaskScheduler(
      sc: SparkContext,
      masterURL: String): TaskScheduler = {
    new TestTaskScheduler(sc)
  }

  def createSchedulerBackend(
      sc: SparkContext,
      masterURL: String,
      scheduler: TaskScheduler): SchedulerBackend = {
    masterURL match {
      case MOCK_REGEX(backendClassName) =>
        val backendClass = Utils.classForName(backendClassName)
        val ctor = backendClass.getConstructor(classOf[SparkConf], classOf[TaskSchedulerImpl])
        ctor.newInstance(sc.getConf, scheduler).asInstanceOf[SchedulerBackend]
    }
  }

  def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit = {
    scheduler.asInstanceOf[TaskSchedulerImpl].initialize(backend)
  }
}

/** TaskSchedulerImpl that just tracks a tiny bit more state to enable checks in tests. */
class TestTaskScheduler(sc: SparkContext) extends TaskSchedulerImpl(sc) {
  /** Set of TaskSets that the DAGScheduler has requested be executed. */
  val runningTaskSets = HashSet[TaskSet]()

  override def submitTasks(taskSet: TaskSet): Unit = {
    runningTaskSets += taskSet
    super.submitTasks(taskSet)
  }

  override def taskSetFinished(manager: TaskSetManager): Unit = {
    runningTaskSets -= manager.taskSet
    super.taskSetFinished(manager)
  }
}

/**
 * Some very basic tests just to demonstrate the use of the test framework (and verify that it
 * works).
 */
class BasicSchedulerIntegrationSuite extends SchedulerIntegrationSuite[SingleCoreMockBackend] {

  /**
   * Very simple one-stage job.  The backend successfully completes each task, one by one.
   */
  testScheduler("super simple job") {
    def runBackend(): Unit = {
      val (taskDescription, _) = backend.beginTask()
      backend.taskSuccess(taskDescription, 42)
    }
    withBackend(runBackend _) {
      val jobFuture = submit(new MockRDD(sc, 10, Nil), (0 until 10).toArray)
      val duration = Duration(1, SECONDS)
      awaitJobTermination(jobFuture, duration)
    }
    assert(results === (0 until 10).map { _ -> 42 }.toMap)
    assertDataStructuresEmpty()
  }

  /**
   * 5-stage job with diamond dependencies.
   *
   * a ----> b ----> d --> result
   *    \--> c --/
   *
   * The backend successfully completes each task.
   */
  testScheduler("multi-stage job") {

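    // Number of output partitions for the shuffle that each map stage writes to; used to build a
    // MapStatus with the right number of reduce partitions.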
    def stageToOutputParts(stageId: Int): Int = {
      stageId match {
        case 0 => 10
        case 2 => 20
        case _ => 30
      }
    }

    val a = new MockRDD(sc, 2, Nil)
    val b = shuffle(10, a)
    val c = shuffle(20, a)
    val d = join(30, b, c)

    def runBackend(): Unit = {
      val (taskDescription, task) = backend.beginTask()

      // make sure the required map output is available
      task.stageId match {
        case 4 => assertMapOutputAvailable(d)
        case _ =>
          // we can't check for the output for the two intermediate stages, unfortunately,
          // b/c the stage numbering is non-deterministic, so stage number alone doesn't tell
          // us what to check
      }

      (task.stageId, task.stageAttemptId, task.partitionId) match {
        case (stage, 0, _) if stage < 4 =>
          backend.taskSuccess(taskDescription,
            DAGSchedulerSuite.makeMapStatus("hostA", stageToOutputParts(stage)))
        case (4, 0, partition) =>
          backend.taskSuccess(taskDescription, 4321 + partition)
      }
    }
    withBackend(runBackend _) {
      val jobFuture = submit(d, (0 until 30).toArray)
      val duration = Duration(1, SECONDS)
      awaitJobTermination(jobFuture, duration)
    }
    assert(results === (0 until 30).map { idx => idx -> (4321 + idx) }.toMap)
    assertDataStructuresEmpty()
  }

  /**
   * 2-stage job with a fetch failure.  Make sure that:
   * (a) map output is available whenever we run stage 1
   * (b) we get a second attempt for stage 0 & stage 1
   */
  testScheduler("job with fetch failure") {
    val input = new MockRDD(sc, 2, Nil)
    val shuffledRdd = shuffle(10, input)
    val shuffleId = shuffledRdd.shuffleDeps.head.shuffleId

    val stageToAttempts = new HashMap[Int, HashSet[Int]]()

    def runBackend(): Unit = {
      val (taskDescription, task) = backend.beginTask()
      stageToAttempts.getOrElseUpdate(task.stageId, new HashSet()) += task.stageAttemptId

      // We cannot check if shuffle output is available, because the failed fetch will clear the
      // shuffle output.  Then we'd have a race between the already-started task from the first
      // attempt and the failure clearing out the map output status.

      (task.stageId, task.stageAttemptId, task.partitionId) match {
        case (0, _, _) =>
          backend.taskSuccess(taskDescription, DAGSchedulerSuite.makeMapStatus("hostA", 10))
        case (1, 0, 0) =>
          val fetchFailed = FetchFailed(
            DAGSchedulerSuite.makeBlockManagerId("hostA"), shuffleId, 0, 0, "ignored")
          backend.taskFailed(taskDescription, fetchFailed)
        case (1, _, partition) =>
          backend.taskSuccess(taskDescription, 42 + partition)
      }
    }
    withBackend(runBackend _) {
      val jobFuture = submit(shuffledRdd, (0 until 10).toArray)
      val duration = Duration(1, SECONDS)
      awaitJobTermination(jobFuture, duration)
    }
    assertDataStructuresEmpty()
    assert(results === (0 until 10).map { idx => idx -> (42 + idx) }.toMap)
    assert(stageToAttempts === Map(0 -> Set(0, 1), 1 -> Set(0, 1)))
  }

  testScheduler("job failure after 4 attempts") {
    def runBackend(): Unit = {
      val (taskDescription, _) = backend.beginTask()
      backend.taskFailed(taskDescription, new RuntimeException("test task failure"))
    }
    withBackend(runBackend _) {
      val jobFuture = submit(new MockRDD(sc, 10, Nil), (0 until 10).toArray)
      val duration = Duration(1, SECONDS)
      awaitJobTermination(jobFuture, duration)
      assert(failure.getMessage.contains("test task failure"))
    }
    assertDataStructuresEmpty(noFailure = false)
  }
}