1/* 2 * Licensed to the Apache Software Foundation (ASF) under one or more 3 * contributor license agreements. See the NOTICE file distributed with 4 * this work for additional information regarding copyright ownership. 5 * The ASF licenses this file to You under the Apache License, Version 2.0 6 * (the "License"); you may not use this file except in compliance with 7 * the License. You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 */ 17package org.apache.spark.scheduler 18 19import java.util.Properties 20import java.util.concurrent.{TimeoutException, TimeUnit} 21import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference} 22 23import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} 24import scala.concurrent.{Await, Future} 25import scala.concurrent.duration.{Duration, SECONDS} 26import scala.language.existentials 27import scala.reflect.ClassTag 28 29import org.scalactic.TripleEquals 30import org.scalatest.Assertions.AssertionsHelper 31 32import org.apache.spark._ 33import org.apache.spark.TaskState._ 34import org.apache.spark.internal.Logging 35import org.apache.spark.rdd.RDD 36import org.apache.spark.util.{CallSite, ThreadUtils, Utils} 37 38/** 39 * Tests for the entire scheduler code -- DAGScheduler, TaskSchedulerImpl, TaskSets, 40 * TaskSetManagers. 41 * 42 * Test cases are configured by providing a set of jobs to submit, and then simulating interaction 43 * with spark's executors via a mocked backend (eg., task completion, task failure, executors 44 * disconnecting, etc.). 45 */ 46abstract class SchedulerIntegrationSuite[T <: MockBackend: ClassTag] extends SparkFunSuite 47 with LocalSparkContext { 48 49 var taskScheduler: TestTaskScheduler = null 50 var scheduler: DAGScheduler = null 51 var backend: T = _ 52 53 override def beforeEach(): Unit = { 54 if (taskScheduler != null) { 55 taskScheduler.runningTaskSets.clear() 56 } 57 results.clear() 58 failure = null 59 backendException.set(null) 60 super.beforeEach() 61 } 62 63 override def afterEach(): Unit = { 64 super.afterEach() 65 taskScheduler.stop() 66 backend.stop() 67 scheduler.stop() 68 } 69 70 def setupScheduler(conf: SparkConf): Unit = { 71 conf.setAppName(this.getClass().getSimpleName()) 72 val backendClassName = implicitly[ClassTag[T]].runtimeClass.getName() 73 conf.setMaster(s"mock[${backendClassName}]") 74 sc = new SparkContext(conf) 75 backend = sc.schedulerBackend.asInstanceOf[T] 76 taskScheduler = sc.taskScheduler.asInstanceOf[TestTaskScheduler] 77 taskScheduler.initialize(sc.schedulerBackend) 78 scheduler = new DAGScheduler(sc, taskScheduler) 79 taskScheduler.setDAGScheduler(scheduler) 80 } 81 82 def testScheduler(name: String)(testBody: => Unit): Unit = { 83 testScheduler(name, Seq())(testBody) 84 } 85 86 def testScheduler(name: String, extraConfs: Seq[(String, String)])(testBody: => Unit): Unit = { 87 test(name) { 88 val conf = new SparkConf() 89 extraConfs.foreach{ case (k, v) => conf.set(k, v)} 90 setupScheduler(conf) 91 testBody 92 } 93 } 94 95 /** 96 * A map from partition -> results for all tasks of a job when you call this test framework's 97 * [[submit]] method. Two important considerations: 98 * 99 * 1. If there is a job failure, results may or may not be empty. If any tasks succeed before 100 * the job has failed, they will get included in `results`. Instead, check for job failure by 101 * checking [[failure]]. (Also see [[assertDataStructuresEmpty()]]) 102 * 103 * 2. This only gets cleared between tests. So you'll need to do special handling if you submit 104 * more than one job in one test. 105 */ 106 val results = new HashMap[Int, Any]() 107 108 /** 109 * If a call to [[submit]] results in a job failure, this will hold the exception, else it will 110 * be null. 111 * 112 * As with [[results]], this only gets cleared between tests, so care must be taken if you are 113 * submitting more than one job in one test. 114 */ 115 var failure: Throwable = _ 116 117 /** 118 * When we submit dummy Jobs, this is the compute function we supply. 119 */ 120 private val jobComputeFunc: (TaskContext, scala.Iterator[_]) => Any = { 121 (context: TaskContext, it: Iterator[(_)]) => 122 throw new RuntimeException("jobComputeFunc shouldn't get called in this mock") 123 } 124 125 /** Submits a job to the scheduler, and returns a future which does a bit of error handling. */ 126 protected def submit( 127 rdd: RDD[_], 128 partitions: Array[Int], 129 func: (TaskContext, Iterator[_]) => _ = jobComputeFunc): Future[Any] = { 130 val waiter: JobWaiter[Any] = scheduler.submitJob(rdd, func, partitions.toSeq, CallSite("", ""), 131 (index, res) => results(index) = res, new Properties()) 132 import scala.concurrent.ExecutionContext.Implicits.global 133 waiter.completionFuture.recover { case ex => 134 failure = ex 135 } 136 } 137 138 /** 139 * Helper to run a few common asserts after a job has completed, in particular some internal 140 * datastructures for bookkeeping. This only does a very minimal check for whether the job 141 * failed or succeeded -- often you will want extra asserts on [[results]] or [[failure]]. 142 */ 143 protected def assertDataStructuresEmpty(noFailure: Boolean = true): Unit = { 144 if (noFailure) { 145 if (failure != null) { 146 // if there is a job failure, it can be a bit hard to tease the job failure msg apart 147 // from the test failure msg, so we do a little extra formatting 148 val msg = 149 raw""" 150 | There was a failed job. 151 | ----- Begin Job Failure Msg ----- 152 | ${Utils.exceptionString(failure)} 153 | ----- End Job Failure Msg ---- 154 """. 155 stripMargin 156 fail(msg) 157 } 158 // When a job fails, we terminate before waiting for all the task end events to come in, 159 // so there might still be a running task set. So we only check these conditions 160 // when the job succeeds 161 assert(taskScheduler.runningTaskSets.isEmpty) 162 assert(!backend.hasTasks) 163 } else { 164 assert(failure != null) 165 } 166 assert(scheduler.activeJobs.isEmpty) 167 assert(backendException.get() == null) 168 } 169 170 /** 171 * Looks at all shuffleMapOutputs that are dependencies of the given RDD, and makes sure 172 * they are all registered 173 */ 174 def assertMapOutputAvailable(targetRdd: MockRDD): Unit = { 175 val shuffleIds = targetRdd.shuffleDeps.map{_.shuffleId} 176 val nParts = targetRdd.numPartitions 177 for { 178 shuffleId <- shuffleIds 179 reduceIdx <- (0 until nParts) 180 } { 181 val statuses = taskScheduler.mapOutputTracker.getMapSizesByExecutorId(shuffleId, reduceIdx) 182 // really we should have already thrown an exception rather than fail either of these 183 // asserts, but just to be extra defensive let's double check the statuses are OK 184 assert(statuses != null) 185 assert(statuses.nonEmpty) 186 } 187 } 188 189 /** models a stage boundary with a single dependency, like a shuffle */ 190 def shuffle(nParts: Int, input: MockRDD): MockRDD = { 191 val partitioner = new HashPartitioner(nParts) 192 val shuffleDep = new ShuffleDependency[Int, Int, Nothing](input, partitioner) 193 new MockRDD(sc, nParts, List(shuffleDep)) 194 } 195 196 /** models a stage boundary with multiple dependencies, like a join */ 197 def join(nParts: Int, inputs: MockRDD*): MockRDD = { 198 val partitioner = new HashPartitioner(nParts) 199 val shuffleDeps = inputs.map { inputRDD => 200 new ShuffleDependency[Int, Int, Nothing](inputRDD, partitioner) 201 } 202 new MockRDD(sc, nParts, shuffleDeps) 203 } 204 205 val backendException = new AtomicReference[Exception](null) 206 207 /** 208 * Helper which makes it a little easier to setup a test, which starts a mock backend in another 209 * thread, responding to tasks with your custom function. You also supply the "body" of your 210 * test, where you submit jobs to your backend, wait for them to complete, then check 211 * whatever conditions you want. Note that this is *not* safe to all bad backends -- 212 * in particular, your `backendFunc` has to return quickly, it can't throw errors, (instead 213 * it should send back the right TaskEndReason) 214 */ 215 def withBackend[T](backendFunc: () => Unit)(testBody: => T): T = { 216 val backendContinue = new AtomicBoolean(true) 217 val backendThread = new Thread("mock backend thread") { 218 override def run(): Unit = { 219 while (backendContinue.get()) { 220 if (backend.hasTasksWaitingToRun) { 221 try { 222 backendFunc() 223 } catch { 224 case ex: Exception => 225 // Try to do a little error handling around exceptions that might occur here -- 226 // otherwise it can just look like a TimeoutException in the test itself. 227 logError("Exception in mock backend:", ex) 228 backendException.set(ex) 229 backendContinue.set(false) 230 throw ex 231 } 232 } else { 233 Thread.sleep(10) 234 } 235 } 236 } 237 } 238 try { 239 backendThread.start() 240 testBody 241 } finally { 242 backendContinue.set(false) 243 backendThread.join() 244 } 245 } 246 247 /** 248 * Helper to do a little extra error checking while waiting for the job to terminate. Primarily 249 * just does a little extra error handling if there is an exception from the backend. 250 */ 251 def awaitJobTermination(jobFuture: Future[_], duration: Duration): Unit = { 252 try { 253 Await.ready(jobFuture, duration) 254 } catch { 255 case te: TimeoutException if backendException.get() != null => 256 val msg = raw""" 257 | ----- Begin Backend Failure Msg ----- 258 | ${Utils.exceptionString(backendException.get())} 259 | ----- End Backend Failure Msg ---- 260 """. 261 stripMargin 262 263 fail(s"Future timed out after ${duration}, likely because of failure in backend: $msg") 264 } 265 } 266} 267 268/** 269 * Helper for running a backend in integration tests, does a bunch of the book-keeping 270 * so individual tests can focus on just responding to tasks. Individual tests will use 271 * [[beginTask]], [[taskSuccess]], and [[taskFailed]]. 272 */ 273private[spark] abstract class MockBackend( 274 conf: SparkConf, 275 val taskScheduler: TaskSchedulerImpl) extends SchedulerBackend with Logging { 276 277 // Periodically revive offers to allow delay scheduling to work 278 private val reviveThread = 279 ThreadUtils.newDaemonSingleThreadScheduledExecutor("driver-revive-thread") 280 private val reviveIntervalMs = conf.getTimeAsMs("spark.scheduler.revive.interval", "10ms") 281 282 /** 283 * Test backends should call this to get a task that has been assigned to them by the scheduler. 284 * Each task should be responded to with either [[taskSuccess]] or [[taskFailed]]. 285 */ 286 def beginTask(): (TaskDescription, Task[_]) = { 287 synchronized { 288 val toRun = assignedTasksWaitingToRun.remove(assignedTasksWaitingToRun.size - 1) 289 runningTasks += toRun._1.taskId 290 toRun 291 } 292 } 293 294 /** 295 * Tell the scheduler the task completed successfully, with the given result. Also 296 * updates some internal state for this mock. 297 */ 298 def taskSuccess(task: TaskDescription, result: Any): Unit = { 299 val ser = env.serializer.newInstance() 300 val resultBytes = ser.serialize(result) 301 val directResult = new DirectTaskResult(resultBytes, Seq()) // no accumulator updates 302 taskUpdate(task, TaskState.FINISHED, directResult) 303 } 304 305 /** 306 * Tell the scheduler the task failed, with the given state and result (probably ExceptionFailure 307 * or FetchFailed). Also updates some internal state for this mock. 308 */ 309 def taskFailed(task: TaskDescription, exc: Exception): Unit = { 310 taskUpdate(task, TaskState.FAILED, new ExceptionFailure(exc, Seq())) 311 } 312 313 def taskFailed(task: TaskDescription, reason: TaskFailedReason): Unit = { 314 taskUpdate(task, TaskState.FAILED, reason) 315 } 316 317 def taskUpdate(task: TaskDescription, state: TaskState, result: Any): Unit = { 318 val ser = env.serializer.newInstance() 319 val resultBytes = ser.serialize(result) 320 // statusUpdate is safe to call from multiple threads, its protected inside taskScheduler 321 taskScheduler.statusUpdate(task.taskId, state, resultBytes) 322 if (TaskState.isFinished(state)) { 323 synchronized { 324 runningTasks -= task.taskId 325 executorIdToExecutor(task.executorId).freeCores += taskScheduler.CPUS_PER_TASK 326 freeCores += taskScheduler.CPUS_PER_TASK 327 } 328 reviveOffers() 329 } 330 } 331 332 // protected by this 333 private val assignedTasksWaitingToRun = new ArrayBuffer[(TaskDescription, Task[_])](10000) 334 // protected by this 335 private val runningTasks = HashSet[Long]() 336 337 def hasTasks: Boolean = synchronized { 338 assignedTasksWaitingToRun.nonEmpty || runningTasks.nonEmpty 339 } 340 341 def hasTasksWaitingToRun: Boolean = { 342 assignedTasksWaitingToRun.nonEmpty 343 } 344 345 override def start(): Unit = { 346 reviveThread.scheduleAtFixedRate(new Runnable { 347 override def run(): Unit = Utils.tryLogNonFatalError { 348 reviveOffers() 349 } 350 }, 0, reviveIntervalMs, TimeUnit.MILLISECONDS) 351 } 352 353 override def stop(): Unit = { 354 reviveThread.shutdown() 355 } 356 357 val env = SparkEnv.get 358 359 /** Accessed by both scheduling and backend thread, so should be protected by this. */ 360 var freeCores: Int = _ 361 362 /** 363 * Accessed by both scheduling and backend thread, so should be protected by this. 364 * Most likely the only thing that needs to be protected are the inidividual ExecutorTaskStatus, 365 * but for simplicity in this mock just lock the whole backend. 366 */ 367 def executorIdToExecutor: Map[String, ExecutorTaskStatus] 368 369 private def generateOffers(): IndexedSeq[WorkerOffer] = { 370 executorIdToExecutor.values.filter { exec => 371 exec.freeCores > 0 372 }.map { exec => 373 WorkerOffer(executorId = exec.executorId, host = exec.host, 374 cores = exec.freeCores) 375 }.toIndexedSeq 376 } 377 378 /** 379 * This is called by the scheduler whenever it has tasks it would like to schedule, when a tasks 380 * completes (which will be in a result-getter thread), and by the reviveOffers thread for delay 381 * scheduling. 382 */ 383 override def reviveOffers(): Unit = { 384 val newTaskDescriptions = taskScheduler.resourceOffers(generateOffers()).flatten 385 // get the task now, since that requires a lock on TaskSchedulerImpl, to prevent individual 386 // tests from introducing a race if they need it 387 val newTasks = taskScheduler.synchronized { 388 newTaskDescriptions.map { taskDescription => 389 val taskSet = taskScheduler.taskIdToTaskSetManager(taskDescription.taskId).taskSet 390 val task = taskSet.tasks(taskDescription.index) 391 (taskDescription, task) 392 } 393 } 394 synchronized { 395 newTasks.foreach { case (taskDescription, _) => 396 executorIdToExecutor(taskDescription.executorId).freeCores -= taskScheduler.CPUS_PER_TASK 397 } 398 freeCores -= newTasks.size * taskScheduler.CPUS_PER_TASK 399 assignedTasksWaitingToRun ++= newTasks 400 } 401 } 402 403 override def killTask(taskId: Long, executorId: String, interruptThread: Boolean): Unit = { 404 // We have to implement this b/c of SPARK-15385. 405 // Its OK for this to be a no-op, because even if a backend does implement killTask, 406 // it really can only be "best-effort" in any case, and the scheduler should be robust to that. 407 // And in fact its reasonably simulating a case where a real backend finishes tasks in between 408 // the time when the scheduler sends the msg to kill tasks, and the backend receives the msg. 409 } 410} 411 412/** 413 * A very simple mock backend that can just run one task at a time. 414 */ 415private[spark] class SingleCoreMockBackend( 416 conf: SparkConf, 417 taskScheduler: TaskSchedulerImpl) extends MockBackend(conf, taskScheduler) { 418 419 val cores = 1 420 421 override def defaultParallelism(): Int = conf.getInt("spark.default.parallelism", cores) 422 423 freeCores = cores 424 val localExecutorId = SparkContext.DRIVER_IDENTIFIER 425 val localExecutorHostname = "localhost" 426 427 override val executorIdToExecutor: Map[String, ExecutorTaskStatus] = Map( 428 localExecutorId -> new ExecutorTaskStatus(localExecutorHostname, localExecutorId, freeCores) 429 ) 430} 431 432case class ExecutorTaskStatus(host: String, executorId: String, var freeCores: Int) 433 434class MockRDD( 435 sc: SparkContext, 436 val numPartitions: Int, 437 val shuffleDeps: Seq[ShuffleDependency[Int, Int, Nothing]] 438) extends RDD[(Int, Int)](sc, shuffleDeps) with Serializable { 439 440 MockRDD.validate(numPartitions, shuffleDeps) 441 442 override def compute(split: Partition, context: TaskContext): Iterator[(Int, Int)] = 443 throw new RuntimeException("should not be reached") 444 override def getPartitions: Array[Partition] = { 445 (0 until numPartitions).map(i => new Partition { 446 override def index: Int = i 447 }).toArray 448 } 449 override def getPreferredLocations(split: Partition): Seq[String] = Nil 450 override def toString: String = "MockRDD " + id 451} 452 453object MockRDD extends AssertionsHelper with TripleEquals { 454 /** 455 * make sure all the shuffle dependencies have a consistent number of output partitions 456 * (mostly to make sure the test setup makes sense, not that Spark itself would get this wrong) 457 */ 458 def validate(numPartitions: Int, dependencies: Seq[ShuffleDependency[_, _, _]]): Unit = { 459 dependencies.foreach { dependency => 460 val partitioner = dependency.partitioner 461 assert(partitioner != null) 462 assert(partitioner.numPartitions === numPartitions) 463 } 464 } 465} 466 467/** Simple cluster manager that wires up our mock backend. */ 468private class MockExternalClusterManager extends ExternalClusterManager { 469 470 val MOCK_REGEX = """mock\[(.*)\]""".r 471 def canCreate(masterURL: String): Boolean = MOCK_REGEX.findFirstIn(masterURL).isDefined 472 473 def createTaskScheduler( 474 sc: SparkContext, 475 masterURL: String): TaskScheduler = { 476 new TestTaskScheduler(sc) 477 } 478 479 def createSchedulerBackend( 480 sc: SparkContext, 481 masterURL: String, 482 scheduler: TaskScheduler): SchedulerBackend = { 483 masterURL match { 484 case MOCK_REGEX(backendClassName) => 485 val backendClass = Utils.classForName(backendClassName) 486 val ctor = backendClass.getConstructor(classOf[SparkConf], classOf[TaskSchedulerImpl]) 487 ctor.newInstance(sc.getConf, scheduler).asInstanceOf[SchedulerBackend] 488 } 489 } 490 491 def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit = { 492 scheduler.asInstanceOf[TaskSchedulerImpl].initialize(backend) 493 } 494} 495 496/** TaskSchedulerImpl that just tracks a tiny bit more state to enable checks in tests. */ 497class TestTaskScheduler(sc: SparkContext) extends TaskSchedulerImpl(sc) { 498 /** Set of TaskSets the DAGScheduler has requested executed. */ 499 val runningTaskSets = HashSet[TaskSet]() 500 501 override def submitTasks(taskSet: TaskSet): Unit = { 502 runningTaskSets += taskSet 503 super.submitTasks(taskSet) 504 } 505 506 override def taskSetFinished(manager: TaskSetManager): Unit = { 507 runningTaskSets -= manager.taskSet 508 super.taskSetFinished(manager) 509 } 510} 511 512/** 513 * Some very basic tests just to demonstrate the use of the test framework (and verify that it 514 * works). 515 */ 516class BasicSchedulerIntegrationSuite extends SchedulerIntegrationSuite[SingleCoreMockBackend] { 517 518 /** 519 * Very simple one stage job. Backend successfully completes each task, one by one 520 */ 521 testScheduler("super simple job") { 522 def runBackend(): Unit = { 523 val (taskDescripition, _) = backend.beginTask() 524 backend.taskSuccess(taskDescripition, 42) 525 } 526 withBackend(runBackend _) { 527 val jobFuture = submit(new MockRDD(sc, 10, Nil), (0 until 10).toArray) 528 val duration = Duration(1, SECONDS) 529 awaitJobTermination(jobFuture, duration) 530 } 531 assert(results === (0 until 10).map { _ -> 42 }.toMap) 532 assertDataStructuresEmpty() 533 } 534 535 /** 536 * 5 stage job, diamond dependencies. 537 * 538 * a ----> b ----> d --> result 539 * \--> c --/ 540 * 541 * Backend successfully completes each task 542 */ 543 testScheduler("multi-stage job") { 544 545 def stageToOutputParts(stageId: Int): Int = { 546 stageId match { 547 case 0 => 10 548 case 2 => 20 549 case _ => 30 550 } 551 } 552 553 val a = new MockRDD(sc, 2, Nil) 554 val b = shuffle(10, a) 555 val c = shuffle(20, a) 556 val d = join(30, b, c) 557 558 def runBackend(): Unit = { 559 val (taskDescription, task) = backend.beginTask() 560 561 // make sure the required map output is available 562 task.stageId match { 563 case 4 => assertMapOutputAvailable(d) 564 case _ => 565 // we can't check for the output for the two intermediate stages, unfortunately, 566 // b/c the stage numbering is non-deterministic, so stage number alone doesn't tell 567 // us what to check 568 } 569 570 (task.stageId, task.stageAttemptId, task.partitionId) match { 571 case (stage, 0, _) if stage < 4 => 572 backend.taskSuccess(taskDescription, 573 DAGSchedulerSuite.makeMapStatus("hostA", stageToOutputParts(stage))) 574 case (4, 0, partition) => 575 backend.taskSuccess(taskDescription, 4321 + partition) 576 } 577 } 578 withBackend(runBackend _) { 579 val jobFuture = submit(d, (0 until 30).toArray) 580 val duration = Duration(1, SECONDS) 581 awaitJobTermination(jobFuture, duration) 582 } 583 assert(results === (0 until 30).map { idx => idx -> (4321 + idx) }.toMap) 584 assertDataStructuresEmpty() 585 } 586 587 /** 588 * 2 stage job, with a fetch failure. Make sure that: 589 * (a) map output is available whenever we run stage 1 590 * (b) we get a second attempt for stage 0 & stage 1 591 */ 592 testScheduler("job with fetch failure") { 593 val input = new MockRDD(sc, 2, Nil) 594 val shuffledRdd = shuffle(10, input) 595 val shuffleId = shuffledRdd.shuffleDeps.head.shuffleId 596 597 val stageToAttempts = new HashMap[Int, HashSet[Int]]() 598 599 def runBackend(): Unit = { 600 val (taskDescription, task) = backend.beginTask() 601 stageToAttempts.getOrElseUpdate(task.stageId, new HashSet()) += task.stageAttemptId 602 603 // We cannot check if shuffle output is available, because the failed fetch will clear the 604 // shuffle output. Then we'd have a race, between the already-started task from the first 605 // attempt, and when the failure clears out the map output status. 606 607 (task.stageId, task.stageAttemptId, task.partitionId) match { 608 case (0, _, _) => 609 backend.taskSuccess(taskDescription, DAGSchedulerSuite.makeMapStatus("hostA", 10)) 610 case (1, 0, 0) => 611 val fetchFailed = FetchFailed( 612 DAGSchedulerSuite.makeBlockManagerId("hostA"), shuffleId, 0, 0, "ignored") 613 backend.taskFailed(taskDescription, fetchFailed) 614 case (1, _, partition) => 615 backend.taskSuccess(taskDescription, 42 + partition) 616 } 617 } 618 withBackend(runBackend _) { 619 val jobFuture = submit(shuffledRdd, (0 until 10).toArray) 620 val duration = Duration(1, SECONDS) 621 awaitJobTermination(jobFuture, duration) 622 } 623 assertDataStructuresEmpty() 624 assert(results === (0 until 10).map { idx => idx -> (42 + idx) }.toMap) 625 assert(stageToAttempts === Map(0 -> Set(0, 1), 1 -> Set(0, 1))) 626 } 627 628 testScheduler("job failure after 4 attempts") { 629 def runBackend(): Unit = { 630 val (taskDescription, _) = backend.beginTask() 631 backend.taskFailed(taskDescription, new RuntimeException("test task failure")) 632 } 633 withBackend(runBackend _) { 634 val jobFuture = submit(new MockRDD(sc, 10, Nil), (0 until 10).toArray) 635 val duration = Duration(1, SECONDS) 636 awaitJobTermination(jobFuture, duration) 637 assert(failure.getMessage.contains("test task failure")) 638 } 639 assertDataStructuresEmpty(noFailure = false) 640 } 641} 642