/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.scheduler

import java.util.Properties
import java.util.concurrent.atomic.AtomicBoolean

import scala.annotation.meta.param
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Map}
import scala.language.reflectiveCalls
import scala.util.control.NonFatal

import org.scalatest.concurrent.Timeouts
import org.scalatest.time.SpanSugar._

import org.apache.spark._
import org.apache.spark.broadcast.BroadcastManager
import org.apache.spark.rdd.RDD
import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
import org.apache.spark.shuffle.{FetchFailedException, MetadataFetchFailedException}
import org.apache.spark.storage.{BlockId, BlockManagerId, BlockManagerMaster}
import org.apache.spark.util.{AccumulatorContext, AccumulatorV2, CallSite, LongAccumulator, Utils}

class DAGSchedulerEventProcessLoopTester(dagScheduler: DAGScheduler)
  extends DAGSchedulerEventProcessLoop(dagScheduler) {

  override def post(event: DAGSchedulerEvent): Unit = {
    try {
      // Forward event to `onReceive` directly to avoid processing event asynchronously.
      onReceive(event)
    } catch {
      case NonFatal(e) => onError(e)
    }
  }

  override def onError(e: Throwable): Unit = {
    logError("Error in DAGSchedulerEventLoop: ", e)
    dagScheduler.stop()
    throw e
  }

}
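// A minimal usage sketch (illustrative only, not one of this suite's tests): because
// `post` invokes `onReceive` synchronously, a test can post an event and assert on
// scheduler state immediately afterwards, with no event loop thread to wait on:
//
//   val loop = new DAGSchedulerEventProcessLoopTester(scheduler)
//   loop.post(JobCancelled(jobId))  // fully handled before post() returns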
/**
 * An RDD for passing to DAGScheduler. These RDDs will use the dependencies and
 * preferredLocations (if any) that are passed to them. They are deliberately not executable
 * so we can test that DAGScheduler does not try to execute RDDs locally.
 *
 * Optionally, one can pass in a list of locations to use as preferred locations for each task,
 * and a MapOutputTrackerMaster to enable reduce task locality. We pass the tracker separately
 * because, in this test suite, it won't be the same as sc.env.mapOutputTracker.
 */
class MyRDD(
    sc: SparkContext,
    numPartitions: Int,
    dependencies: List[Dependency[_]],
    locations: Seq[Seq[String]] = Nil,
    @(transient @param) tracker: MapOutputTrackerMaster = null)
  extends RDD[(Int, Int)](sc, dependencies) with Serializable {

  override def compute(split: Partition, context: TaskContext): Iterator[(Int, Int)] =
    throw new RuntimeException("should not be reached")

  override def getPartitions: Array[Partition] = (0 until numPartitions).map(i => new Partition {
    override def index: Int = i
  }).toArray

  override def getPreferredLocations(partition: Partition): Seq[String] = {
    if (locations.isDefinedAt(partition.index)) {
      locations(partition.index)
    } else if (tracker != null && dependencies.size == 1 &&
        dependencies(0).isInstanceOf[ShuffleDependency[_, _, _]]) {
      // If we have only one shuffle dependency, use the same code path as ShuffledRDD for locality
      val dep = dependencies(0).asInstanceOf[ShuffleDependency[_, _, _]]
      tracker.getPreferredLocationsForShuffle(dep, partition.index)
    } else {
      Nil
    }
  }

  override def toString: String = "DAGSchedulerSuiteRDD " + id
}

class DAGSchedulerSuiteDummyException extends Exception

class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeouts {

  import DAGSchedulerSuite._

  val conf = new SparkConf
  /** Set of TaskSets the DAGScheduler has requested executed. */
  val taskSets = scala.collection.mutable.Buffer[TaskSet]()

  /** Stages for which the DAGScheduler has called TaskScheduler.cancelTasks(). */
  val cancelledStages = new HashSet[Int]()

  val taskScheduler = new TaskScheduler() {
    override def rootPool: Pool = null
    override def schedulingMode: SchedulingMode = SchedulingMode.NONE
    override def start() = {}
    override def stop() = {}
    override def executorHeartbeatReceived(
        execId: String,
        accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])],
        blockManagerId: BlockManagerId): Boolean = true
    override def submitTasks(taskSet: TaskSet) = {
      // normally done by TaskSetManager
      taskSet.tasks.foreach(_.epoch = mapOutputTracker.getEpoch)
      taskSets += taskSet
    }
    override def cancelTasks(stageId: Int, interruptThread: Boolean) {
      cancelledStages += stageId
    }
    override def setDAGScheduler(dagScheduler: DAGScheduler) = {}
    override def defaultParallelism() = 2
    override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {}
    override def applicationAttemptId(): Option[String] = None
  }
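  // How tests drive this stub (sketch, using the `submit` and `complete` helpers
  // defined later in this suite): `submitTasks` only records the TaskSet, so nothing
  // actually executes; the test then simulates task completion by hand, e.g.:
  //
  //   submit(new MyRDD(sc, 1, Nil), Array(0))    // DAGScheduler emits taskSets(0)
  //   complete(taskSets(0), Seq((Success, 42)))  // pretend the task finished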
  /** Length of time to wait while draining listener events. */
  val WAIT_TIMEOUT_MILLIS = 10000
  val sparkListener = new SparkListener() {
    val submittedStageInfos = new HashSet[StageInfo]
    val successfulStages = new HashSet[Int]
    val failedStages = new ArrayBuffer[Int]
    val stageByOrderOfExecution = new ArrayBuffer[Int]
    val endedTasks = new HashSet[Long]

    override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) {
      submittedStageInfos += stageSubmitted.stageInfo
    }

    override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) {
      val stageInfo = stageCompleted.stageInfo
      stageByOrderOfExecution += stageInfo.stageId
      if (stageInfo.failureReason.isEmpty) {
        successfulStages += stageInfo.stageId
      } else {
        failedStages += stageInfo.stageId
      }
    }

    override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
      endedTasks += taskEnd.taskInfo.taskId
    }
  }

  var mapOutputTracker: MapOutputTrackerMaster = null
  var broadcastManager: BroadcastManager = null
  var securityMgr: SecurityManager = null
  var scheduler: DAGScheduler = null
  var dagEventProcessLoopTester: DAGSchedulerEventProcessLoop = null

  /**
   * Set of cache locations to return from our mock BlockManagerMaster.
   * Keys are (rdd ID, partition ID). Anything not present will return an empty
   * list of cache locations silently.
   */
  val cacheLocations = new HashMap[(Int, Int), Seq[BlockManagerId]]
  // stub out BlockManagerMaster.getLocations to use our cacheLocations
  val blockManagerMaster = new BlockManagerMaster(null, conf, true) {
    override def getLocations(blockIds: Array[BlockId]): IndexedSeq[Seq[BlockManagerId]] = {
      blockIds.map {
        _.asRDDId.map(id => (id.rddId -> id.splitIndex)).flatMap(key => cacheLocations.get(key)).
          getOrElse(Seq())
      }.toIndexedSeq
    }
    override def removeExecutor(execId: String) {
      // don't need to propagate to the driver, which we don't have
    }
  }
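  // Sketch of how a test seeds cache locality through this stub (the host name is an
  // arbitrary example):
  //
  //   cacheLocations(rdd.id -> 0) = Seq(makeBlockManagerId("hostA"))
  //   // getLocations now reports hostA for partition 0 of rdd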
  /** The list of results that DAGScheduler has collected. */
  val results = new HashMap[Int, Any]()
  var failure: Exception = _
  val jobListener = new JobListener() {
    override def taskSucceeded(index: Int, result: Any) = results.put(index, result)
    override def jobFailed(exception: Exception) = { failure = exception }
  }

  /** A simple helper class for creating custom JobListeners */
  class SimpleListener extends JobListener {
    val results = new HashMap[Int, Any]
    var failure: Exception = null
    override def taskSucceeded(index: Int, result: Any): Unit = results.put(index, result)
    override def jobFailed(exception: Exception): Unit = { failure = exception }
  }

  override def beforeEach(): Unit = {
    super.beforeEach()
    init(new SparkConf())
  }

  private def init(testConf: SparkConf): Unit = {
    sc = new SparkContext("local", "DAGSchedulerSuite", testConf)
    sparkListener.submittedStageInfos.clear()
    sparkListener.successfulStages.clear()
    sparkListener.failedStages.clear()
    sparkListener.endedTasks.clear()
    failure = null
    sc.addSparkListener(sparkListener)
    taskSets.clear()
    cancelledStages.clear()
    cacheLocations.clear()
    results.clear()
    securityMgr = new SecurityManager(conf)
    broadcastManager = new BroadcastManager(true, conf, securityMgr)
    mapOutputTracker = new MapOutputTrackerMaster(conf, broadcastManager, true) {
      override def sendTracker(message: Any): Unit = {
        // no-op, just so we can stop this to avoid leaking threads
      }
    }
    scheduler = new DAGScheduler(
      sc,
      taskScheduler,
      sc.listenerBus,
      mapOutputTracker,
      blockManagerMaster,
      sc.env)
    dagEventProcessLoopTester = new DAGSchedulerEventProcessLoopTester(scheduler)
  }

  override def afterEach(): Unit = {
    try {
      scheduler.stop()
      dagEventProcessLoopTester.stop()
      mapOutputTracker.stop()
      broadcastManager.stop()
    } finally {
      super.afterEach()
    }
  }

  override def afterAll() {
    super.afterAll()
  }

  /**
   * Type of RDD we use for testing. Note that we should never call the real RDD compute methods.
   * This is a pair RDD type so it can always be used in ShuffleDependencies.
   */
  type PairOfIntsRDD = RDD[(Int, Int)]

  /**
   * Process the supplied event as if it were the top of the DAGScheduler event queue, expecting
   * the scheduler not to exit.
   *
   * After processing the event, submit waiting stages as is done on most iterations of the
   * DAGScheduler event loop.
   */
  private def runEvent(event: DAGSchedulerEvent) {
    dagEventProcessLoopTester.post(event)
  }

  /**
   * When we submit dummy jobs, this is the compute function we supply. Except in a local test
   * below, we do not expect this function to ever be executed; instead, we will return results
   * directly through CompletionEvents.
   */
  private val jobComputeFunc = (context: TaskContext, it: Iterator[(_)]) =>
    it.next.asInstanceOf[Tuple2[_, _]]._1
  /** Send the given CompletionEvent messages for the tasks in the TaskSet. */
  private def complete(taskSet: TaskSet, results: Seq[(TaskEndReason, Any)]) {
    assert(taskSet.tasks.size >= results.size)
    for ((result, i) <- results.zipWithIndex) {
      if (i < taskSet.tasks.size) {
        runEvent(makeCompletionEvent(taskSet.tasks(i), result._1, result._2))
      }
    }
  }

  private def completeWithAccumulator(
      accumId: Long,
      taskSet: TaskSet,
      results: Seq[(TaskEndReason, Any)]) {
    assert(taskSet.tasks.size >= results.size)
    for ((result, i) <- results.zipWithIndex) {
      if (i < taskSet.tasks.size) {
        runEvent(makeCompletionEvent(
          taskSet.tasks(i),
          result._1,
          result._2,
          Seq(AccumulatorSuite.createLongAccum("", initValue = 1, id = accumId))))
      }
    }
  }

  /** Submits a job to the scheduler and returns the job id. */
  private def submit(
      rdd: RDD[_],
      partitions: Array[Int],
      func: (TaskContext, Iterator[_]) => _ = jobComputeFunc,
      listener: JobListener = jobListener,
      properties: Properties = null): Int = {
    val jobId = scheduler.nextJobId.getAndIncrement()
    runEvent(JobSubmitted(jobId, rdd, func, partitions, CallSite("", ""), listener, properties))
    jobId
  }

  /** Submits a map stage to the scheduler and returns the job id. */
  private def submitMapStage(
      shuffleDep: ShuffleDependency[_, _, _],
      listener: JobListener = jobListener): Int = {
    val jobId = scheduler.nextJobId.getAndIncrement()
    runEvent(MapStageSubmitted(jobId, shuffleDep, CallSite("", ""), listener))
    jobId
  }

  /** Sends TaskSetFailed to the scheduler. */
  private def failed(taskSet: TaskSet, message: String) {
    runEvent(TaskSetFailed(taskSet, message, None))
  }

  /** Sends JobCancelled to the DAG scheduler. */
  private def cancel(jobId: Int) {
    runEvent(JobCancelled(jobId))
  }

  test("[SPARK-3353] parent stage should have lower stage id") {
    sparkListener.stageByOrderOfExecution.clear()
    sc.parallelize(1 to 10).map(x => (x, x)).reduceByKey(_ + _, 4).count()
    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
    assert(sparkListener.stageByOrderOfExecution.length === 2)
    assert(sparkListener.stageByOrderOfExecution(0) < sparkListener.stageByOrderOfExecution(1))
  }
  /**
   * This test ensures that DAGScheduler builds the stage graph correctly.
   *
   * Suppose you have the following DAG:
   *
   *   [A] <--(s_A)-- [B] <--(s_B)-- [C] <--(s_C)-- [D]
   *            \                /
   *              <-------------
   *
   * Here, RDD B has a shuffle dependency on RDD A, and RDD C has a shuffle dependency on both
   * B and A. The shuffle dependency IDs are numbers in the DAGScheduler, but to make the example
   * easier to understand, let's call the shuffled data from A shuffle dependency ID s_A and the
   * shuffled data from B shuffle dependency ID s_B.
   *
   * Note: [] means an RDD, () means a shuffle dependency.
   */
  test("[SPARK-13902] Ensure no duplicate stages are created") {
    val rddA = new MyRDD(sc, 1, Nil)
    val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(1))
    val s_A = shuffleDepA.shuffleId

    val rddB = new MyRDD(sc, 1, List(shuffleDepA), tracker = mapOutputTracker)
    val shuffleDepB = new ShuffleDependency(rddB, new HashPartitioner(1))
    val s_B = shuffleDepB.shuffleId

    val rddC = new MyRDD(sc, 1, List(shuffleDepA, shuffleDepB), tracker = mapOutputTracker)
    val shuffleDepC = new ShuffleDependency(rddC, new HashPartitioner(1))
    val s_C = shuffleDepC.shuffleId

    val rddD = new MyRDD(sc, 1, List(shuffleDepC), tracker = mapOutputTracker)

    submit(rddD, Array(0))

    assert(scheduler.shuffleIdToMapStage.size === 3)
    assert(scheduler.activeJobs.size === 1)

    val mapStageA = scheduler.shuffleIdToMapStage(s_A)
    val mapStageB = scheduler.shuffleIdToMapStage(s_B)
    val mapStageC = scheduler.shuffleIdToMapStage(s_C)
    val finalStage = scheduler.activeJobs.head.finalStage

    assert(mapStageA.parents.isEmpty)
    assert(mapStageB.parents === List(mapStageA))
    assert(mapStageC.parents === List(mapStageA, mapStageB))
    assert(finalStage.parents === List(mapStageC))

    complete(taskSets(0), Seq((Success, makeMapStatus("hostA", 1))))
    complete(taskSets(1), Seq((Success, makeMapStatus("hostA", 1))))
    complete(taskSets(2), Seq((Success, makeMapStatus("hostA", 1))))
    complete(taskSets(3), Seq((Success, 42)))
    assert(results === Map(0 -> 42))
    assertDataStructuresEmpty()
  }

  test("zero split job") {
    var numResults = 0
    var failureReason: Option[Exception] = None
    val fakeListener = new JobListener() {
      override def taskSucceeded(partition: Int, value: Any): Unit = numResults += 1
      override def jobFailed(exception: Exception): Unit = {
        failureReason = Some(exception)
      }
    }
    val jobId = submit(new MyRDD(sc, 0, Nil), Array(), listener = fakeListener)
    assert(numResults === 0)
    cancel(jobId)
    assert(failureReason.isDefined)
    assert(failureReason.get.getMessage() === "Job 0 cancelled ")
  }

  test("run trivial job") {
    submit(new MyRDD(sc, 1, Nil), Array(0))
    complete(taskSets(0), List((Success, 42)))
    assert(results === Map(0 -> 42))
    assertDataStructuresEmpty()
  }

  test("run trivial job w/ dependency") {
    val baseRdd = new MyRDD(sc, 1, Nil)
    val finalRdd = new MyRDD(sc, 1, List(new OneToOneDependency(baseRdd)))
    submit(finalRdd, Array(0))
    complete(taskSets(0), Seq((Success, 42)))
    assert(results === Map(0 -> 42))
    assertDataStructuresEmpty()
  }

  test("equals and hashCode AccumulableInfo") {
    val accInfo1 = new AccumulableInfo(
      1, Some("a1"), Some("delta1"), Some("val1"), internal = true, countFailedValues = false)
    val accInfo2 = new AccumulableInfo(
      1, Some("a1"), Some("delta1"), Some("val1"), internal = false, countFailedValues = false)
    val accInfo3 = new AccumulableInfo(
      1, Some("a1"), Some("delta1"), Some("val1"), internal = false, countFailedValues = false)
    assert(accInfo1 !== accInfo2)
    assert(accInfo2 === accInfo3)
    assert(accInfo2.hashCode() === accInfo3.hashCode())
  }

  test("cache location preferences w/ dependency") {
    val baseRdd = new MyRDD(sc, 1, Nil).cache()
    val finalRdd = new MyRDD(sc, 1, List(new OneToOneDependency(baseRdd)))
    cacheLocations(baseRdd.id -> 0) =
      Seq(makeBlockManagerId("hostA"), makeBlockManagerId("hostB"))
makeBlockManagerId("hostB")) 446 submit(finalRdd, Array(0)) 447 val taskSet = taskSets(0) 448 assertLocations(taskSet, Seq(Seq("hostA", "hostB"))) 449 complete(taskSet, Seq((Success, 42))) 450 assert(results === Map(0 -> 42)) 451 assertDataStructuresEmpty() 452 } 453 454 test("regression test for getCacheLocs") { 455 val rdd = new MyRDD(sc, 3, Nil).cache() 456 cacheLocations(rdd.id -> 0) = 457 Seq(makeBlockManagerId("hostA"), makeBlockManagerId("hostB")) 458 cacheLocations(rdd.id -> 1) = 459 Seq(makeBlockManagerId("hostB"), makeBlockManagerId("hostC")) 460 cacheLocations(rdd.id -> 2) = 461 Seq(makeBlockManagerId("hostC"), makeBlockManagerId("hostD")) 462 val locs = scheduler.getCacheLocs(rdd).map(_.map(_.host)) 463 assert(locs === Seq(Seq("hostA", "hostB"), Seq("hostB", "hostC"), Seq("hostC", "hostD"))) 464 } 465 466 /** 467 * This test ensures that if a particular RDD is cached, RDDs earlier in the dependency chain 468 * are not computed. It constructs the following chain of dependencies: 469 * +---+ shuffle +---+ +---+ +---+ 470 * | A |<--------| B |<---| C |<---| D | 471 * +---+ +---+ +---+ +---+ 472 * Here, B is derived from A by performing a shuffle, C has a one-to-one dependency on B, 473 * and D similarly has a one-to-one dependency on C. If none of the RDDs were cached, this 474 * set of RDDs would result in a two stage job: one ShuffleMapStage, and a ResultStage that 475 * reads the shuffled data from RDD A. This test ensures that if C is cached, the scheduler 476 * doesn't perform a shuffle, and instead computes the result using a single ResultStage 477 * that reads C's cached data. 478 */ 479 test("getMissingParentStages should consider all ancestor RDDs' cache statuses") { 480 val rddA = new MyRDD(sc, 1, Nil) 481 val rddB = new MyRDD(sc, 1, List(new ShuffleDependency(rddA, new HashPartitioner(1))), 482 tracker = mapOutputTracker) 483 val rddC = new MyRDD(sc, 1, List(new OneToOneDependency(rddB))).cache() 484 val rddD = new MyRDD(sc, 1, List(new OneToOneDependency(rddC))) 485 cacheLocations(rddC.id -> 0) = 486 Seq(makeBlockManagerId("hostA"), makeBlockManagerId("hostB")) 487 submit(rddD, Array(0)) 488 assert(scheduler.runningStages.size === 1) 489 // Make sure that the scheduler is running the final result stage. 490 // Because C is cached, the shuffle map stage to compute A does not need to be run. 491 assert(scheduler.runningStages.head.isInstanceOf[ResultStage]) 492 } 493 494 test("avoid exponential blowup when getting preferred locs list") { 495 // Build up a complex dependency graph with repeated zip operations, without preferred locations 496 var rdd: RDD[_] = new MyRDD(sc, 1, Nil) 497 (1 to 30).foreach(_ => rdd = rdd.zip(rdd)) 498 // getPreferredLocs runs quickly, indicating that exponential graph traversal is avoided. 499 failAfter(10 seconds) { 500 val preferredLocs = scheduler.getPreferredLocs(rdd, 0) 501 // No preferred locations are returned. 
      assert(preferredLocs.length === 0)
    }
  }

  test("unserializable task") {
    val unserializableRdd = new MyRDD(sc, 1, Nil) {
      class UnserializableClass
      val unserializable = new UnserializableClass
    }
    submit(unserializableRdd, Array(0))
    assert(failure.getMessage.startsWith(
      "Job aborted due to stage failure: Task not serializable:"))
    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
    assert(sparkListener.failedStages.contains(0))
    assert(sparkListener.failedStages.size === 1)
    assertDataStructuresEmpty()
  }

  test("trivial job failure") {
    submit(new MyRDD(sc, 1, Nil), Array(0))
    failed(taskSets(0), "some failure")
    assert(failure.getMessage === "Job aborted due to stage failure: some failure")
    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
    assert(sparkListener.failedStages.contains(0))
    assert(sparkListener.failedStages.size === 1)
    assertDataStructuresEmpty()
  }

  test("trivial job cancellation") {
    val rdd = new MyRDD(sc, 1, Nil)
    val jobId = submit(rdd, Array(0))
    cancel(jobId)
    assert(failure.getMessage === s"Job $jobId cancelled ")
    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
    assert(sparkListener.failedStages.contains(0))
    assert(sparkListener.failedStages.size === 1)
    assertDataStructuresEmpty()
  }

  test("job cancellation no-kill backend") {
    // make sure that the DAGScheduler doesn't crash when the TaskScheduler
    // doesn't implement killTask()
    val noKillTaskScheduler = new TaskScheduler() {
      override def rootPool: Pool = null
      override def schedulingMode: SchedulingMode = SchedulingMode.NONE
      override def start(): Unit = {}
      override def stop(): Unit = {}
      override def submitTasks(taskSet: TaskSet): Unit = {
        taskSets += taskSet
      }
      override def cancelTasks(stageId: Int, interruptThread: Boolean) {
        throw new UnsupportedOperationException
      }
      override def setDAGScheduler(dagScheduler: DAGScheduler): Unit = {}
      override def defaultParallelism(): Int = 2
      override def executorHeartbeatReceived(
          execId: String,
          accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])],
          blockManagerId: BlockManagerId): Boolean = true
      override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {}
      override def applicationAttemptId(): Option[String] = None
    }
    val noKillScheduler = new DAGScheduler(
      sc,
      noKillTaskScheduler,
      sc.listenerBus,
      mapOutputTracker,
      blockManagerMaster,
      sc.env)
    dagEventProcessLoopTester = new DAGSchedulerEventProcessLoopTester(noKillScheduler)
    val jobId = submit(new MyRDD(sc, 1, Nil), Array(0))
    cancel(jobId)
    // Because the job wasn't actually cancelled, we shouldn't have received a failure message.
    assert(failure === null)

    // When the task set completes normally, state should be correctly updated.
    complete(taskSets(0), Seq((Success, 42)))
    assert(results === Map(0 -> 42))
    assertDataStructuresEmpty()

    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
    assert(sparkListener.failedStages.isEmpty)
    assert(sparkListener.successfulStages.contains(0))
  }

  test("run trivial shuffle") {
    val shuffleMapRdd = new MyRDD(sc, 2, Nil)
    val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(1))
    val shuffleId = shuffleDep.shuffleId
    val reduceRdd = new MyRDD(sc, 1, List(shuffleDep), tracker = mapOutputTracker)
    submit(reduceRdd, Array(0))
    complete(taskSets(0), Seq(
      (Success, makeMapStatus("hostA", 1)),
      (Success, makeMapStatus("hostB", 1))))
    assert(mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0).map(_._1).toSet ===
      HashSet(makeBlockManagerId("hostA"), makeBlockManagerId("hostB")))
    complete(taskSets(1), Seq((Success, 42)))
    assert(results === Map(0 -> 42))
    assertDataStructuresEmpty()
  }

  test("run trivial shuffle with fetch failure") {
    val shuffleMapRdd = new MyRDD(sc, 2, Nil)
    val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(2))
    val shuffleId = shuffleDep.shuffleId
    val reduceRdd = new MyRDD(sc, 2, List(shuffleDep), tracker = mapOutputTracker)
    submit(reduceRdd, Array(0, 1))
    complete(taskSets(0), Seq(
      (Success, makeMapStatus("hostA", reduceRdd.partitions.length)),
      (Success, makeMapStatus("hostB", reduceRdd.partitions.length))))
    // the 2nd ResultTask failed
    complete(taskSets(1), Seq(
      (Success, 42),
      (FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0, 0, "ignored"), null)))
    // this will get called
    // blockManagerMaster.removeExecutor("exec-hostA")
    // ask the scheduler to try it again
    scheduler.resubmitFailedStages()
    // have the 2nd attempt pass
    complete(taskSets(2), Seq((Success, makeMapStatus("hostA", reduceRdd.partitions.length))))
    // we can see both result blocks now
    assert(mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0).map(_._1.host).toSet ===
      HashSet("hostA", "hostB"))
    complete(taskSets(3), Seq((Success, 43)))
    assert(results === Map(0 -> 42, 1 -> 43))
    assertDataStructuresEmpty()
  }

  private val shuffleFileLossTests = Seq(
    ("slave lost with shuffle service", SlaveLost("", false), true, false),
    ("worker lost with shuffle service", SlaveLost("", true), true, true),
    ("worker lost without shuffle service", SlaveLost("", true), false, true),
    ("executor failure with shuffle service", ExecutorKilled, true, false),
    ("executor failure without shuffle service", ExecutorKilled, false, true))

  for ((eventDescription, event, shuffleServiceOn, expectFileLoss) <- shuffleFileLossTests) {
    val maybeLost = if (expectFileLoss) {
      "lost"
    } else {
      "not lost"
    }
    test(s"shuffle files $maybeLost when $eventDescription") {
      // reset the test context with the right shuffle service config
      afterEach()
      val conf = new SparkConf()
      conf.set("spark.shuffle.service.enabled", shuffleServiceOn.toString)
      init(conf)
      assert(sc.env.blockManager.externalShuffleServiceEnabled == shuffleServiceOn)

      val shuffleMapRdd = new MyRDD(sc, 2, Nil)
      val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(1))
      val shuffleId = shuffleDep.shuffleId
      val reduceRdd = new MyRDD(sc, 1, List(shuffleDep), tracker = mapOutputTracker)
      submit(reduceRdd, Array(0))
      complete(taskSets(0), Seq(
(Success, makeMapStatus("hostA", 1)), 658 (Success, makeMapStatus("hostB", 1)))) 659 runEvent(ExecutorLost("exec-hostA", event)) 660 if (expectFileLoss) { 661 intercept[MetadataFetchFailedException] { 662 mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0) 663 } 664 } else { 665 assert(mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0).map(_._1).toSet === 666 HashSet(makeBlockManagerId("hostA"), makeBlockManagerId("hostB"))) 667 } 668 } 669 } 670 671 // Helper function to validate state when creating tests for task failures 672 private def checkStageId(stageId: Int, attempt: Int, stageAttempt: TaskSet) { 673 assert(stageAttempt.stageId === stageId) 674 assert(stageAttempt.stageAttemptId == attempt) 675 } 676 677 // Helper functions to extract commonly used code in Fetch Failure test cases 678 private def setupStageAbortTest(sc: SparkContext) { 679 sc.listenerBus.addListener(new EndListener()) 680 ended = false 681 jobResult = null 682 } 683 684 // Create a new Listener to confirm that the listenerBus sees the JobEnd message 685 // when we abort the stage. This message will also be consumed by the EventLoggingListener 686 // so this will propagate up to the user. 687 var ended = false 688 var jobResult : JobResult = null 689 690 class EndListener extends SparkListener { 691 override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = { 692 jobResult = jobEnd.jobResult 693 ended = true 694 } 695 } 696 697 /** 698 * Common code to get the next stage attempt, confirm it's the one we expect, and complete it 699 * successfully. 700 * 701 * @param stageId - The current stageId 702 * @param attemptIdx - The current attempt count 703 * @param numShufflePartitions - The number of partitions in the next stage 704 */ 705 private def completeShuffleMapStageSuccessfully( 706 stageId: Int, 707 attemptIdx: Int, 708 numShufflePartitions: Int): Unit = { 709 val stageAttempt = taskSets.last 710 checkStageId(stageId, attemptIdx, stageAttempt) 711 complete(stageAttempt, stageAttempt.tasks.zipWithIndex.map { 712 case (task, idx) => 713 (Success, makeMapStatus("host" + ('A' + idx).toChar, numShufflePartitions)) 714 }.toSeq) 715 } 716 717 /** 718 * Common code to get the next stage attempt, confirm it's the one we expect, and complete it 719 * with all FetchFailure. 720 * 721 * @param stageId - The current stageId 722 * @param attemptIdx - The current attempt count 723 * @param shuffleDep - The shuffle dependency of the stage with a fetch failure 724 */ 725 private def completeNextStageWithFetchFailure( 726 stageId: Int, 727 attemptIdx: Int, 728 shuffleDep: ShuffleDependency[_, _, _]): Unit = { 729 val stageAttempt = taskSets.last 730 checkStageId(stageId, attemptIdx, stageAttempt) 731 complete(stageAttempt, stageAttempt.tasks.zipWithIndex.map { case (task, idx) => 732 (FetchFailed(makeBlockManagerId("hostA"), shuffleDep.shuffleId, 0, idx, "ignored"), null) 733 }.toSeq) 734 } 735 736 /** 737 * Common code to get the next result stage attempt, confirm it's the one we expect, and 738 * complete it with a success where we return 42. 
  /**
   * Common code to get the next result stage attempt, confirm it's the one we expect, and
   * complete it with a success where we return 42.
   *
   * @param stageId - The current stageId
   * @param attemptIdx - The current attempt count
   */
  private def completeNextResultStageWithSuccess(
      stageId: Int,
      attemptIdx: Int,
      partitionToResult: Int => Int = _ => 42): Unit = {
    val stageAttempt = taskSets.last
    checkStageId(stageId, attemptIdx, stageAttempt)
    assert(scheduler.stageIdToStage(stageId).isInstanceOf[ResultStage])
    val taskResults = stageAttempt.tasks.zipWithIndex.map { case (task, idx) =>
      (Success, partitionToResult(idx))
    }
    complete(stageAttempt, taskResults.toSeq)
  }

  /**
   * In this test, we simulate a job where many tasks in the same stage fail. We want to show
   * that many fetch failures inside a single stage attempt do not trigger an abort
   * on their own, but only when there are enough failing stage attempts.
   */
  test("Single stage fetch failure should not abort the stage.") {
    setupStageAbortTest(sc)

    val parts = 8
    val shuffleMapRdd = new MyRDD(sc, parts, Nil)
    val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(parts))
    val reduceRdd = new MyRDD(sc, parts, List(shuffleDep), tracker = mapOutputTracker)
    submit(reduceRdd, (0 until parts).toArray)

    completeShuffleMapStageSuccessfully(0, 0, numShufflePartitions = parts)

    completeNextStageWithFetchFailure(1, 0, shuffleDep)

    // Resubmit and confirm that now all is well
    scheduler.resubmitFailedStages()

    assert(scheduler.runningStages.nonEmpty)
    assert(!ended)

    // Complete stage 0 and then stage 1 with a "42"
    completeShuffleMapStageSuccessfully(0, 1, numShufflePartitions = parts)
    completeNextResultStageWithSuccess(1, 1)

    // Confirm job finished successfully
    sc.listenerBus.waitUntilEmpty(1000)
    assert(ended === true)
    assert(results === (0 until parts).map { idx => idx -> 42 }.toMap)
    assertDataStructuresEmpty()
  }
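  // The abort rule the next few tests exercise (sketch; the counter name is
  // hypothetical, for illustration only): a stage is aborted only after
  // Stage.MAX_CONSECUTIVE_FETCH_FAILURES *consecutive* failed attempts, and a
  // successful attempt in between resets the count:
  //
  //   if (failedAttemptsSinceLastSuccess >= Stage.MAX_CONSECUTIVE_FETCH_FAILURES) abort()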
  /**
   * In this test we simulate a job failure where the first stage completes successfully and
   * the second stage fails due to a fetch failure. Multiple successive fetch failures of a stage
   * trigger an overall job abort to avoid endless retries.
   */
  test("Multiple consecutive stage fetch failures should lead to job being aborted.") {
    setupStageAbortTest(sc)

    val shuffleMapRdd = new MyRDD(sc, 2, Nil)
    val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(2))
    val reduceRdd = new MyRDD(sc, 2, List(shuffleDep), tracker = mapOutputTracker)
    submit(reduceRdd, Array(0, 1))

    for (attempt <- 0 until Stage.MAX_CONSECUTIVE_FETCH_FAILURES) {
      // Complete all the tasks for the current attempt of stage 0 successfully
      completeShuffleMapStageSuccessfully(0, attempt, numShufflePartitions = 2)

      // Now we should have a new taskSet, for a new attempt of stage 1.
      // Fail all these tasks with FetchFailure
      completeNextStageWithFetchFailure(1, attempt, shuffleDep)

      // this will trigger a resubmission of stage 0, since we've lost some of its
      // map output, for the next iteration through the loop
      scheduler.resubmitFailedStages()

      if (attempt < Stage.MAX_CONSECUTIVE_FETCH_FAILURES - 1) {
        assert(scheduler.runningStages.nonEmpty)
        assert(!ended)
      } else {
        // Stage should have been aborted and removed from running stages
        assertDataStructuresEmpty()
        sc.listenerBus.waitUntilEmpty(1000)
        assert(ended)
        jobResult match {
          case JobFailed(reason) =>
            assert(reason.getMessage.contains("ResultStage 1 () has failed the maximum"))
          case other => fail(s"expected JobFailed, not $other")
        }
      }
    }
  }

  /**
   * In this test, we create a job with two consecutive shuffles, and simulate 2 failures for each
   * shuffle fetch. In total, the job has had four failures overall, but not four failures for a
   * particular stage, and as such should not be aborted.
   */
  test("Failures in different stages should not trigger an overall abort") {
    setupStageAbortTest(sc)

    val shuffleOneRdd = new MyRDD(sc, 2, Nil).cache()
    val shuffleDepOne = new ShuffleDependency(shuffleOneRdd, new HashPartitioner(2))
    val shuffleTwoRdd = new MyRDD(sc, 2, List(shuffleDepOne), tracker = mapOutputTracker).cache()
    val shuffleDepTwo = new ShuffleDependency(shuffleTwoRdd, new HashPartitioner(1))
    val finalRdd = new MyRDD(sc, 1, List(shuffleDepTwo), tracker = mapOutputTracker)
    submit(finalRdd, Array(0))

    // In the first two iterations, stage 0 succeeds and stage 1 fails. In the next two
    // iterations, stage 2 fails.
    for (attempt <- 0 until Stage.MAX_CONSECUTIVE_FETCH_FAILURES) {
      // Complete all the tasks for the current attempt of stage 0 successfully
      completeShuffleMapStageSuccessfully(0, attempt, numShufflePartitions = 2)

      if (attempt < Stage.MAX_CONSECUTIVE_FETCH_FAILURES / 2) {
        // Now we should have a new taskSet, for a new attempt of stage 1.
        // Fail all these tasks with FetchFailure
        completeNextStageWithFetchFailure(1, attempt, shuffleDepOne)
      } else {
        completeShuffleMapStageSuccessfully(1, attempt, numShufflePartitions = 1)

        // Fail stage 2
        completeNextStageWithFetchFailure(2, attempt - Stage.MAX_CONSECUTIVE_FETCH_FAILURES / 2,
          shuffleDepTwo)
      }

      // this will trigger a resubmission of stage 0, since we've lost some of its
      // map output, for the next iteration through the loop
      scheduler.resubmitFailedStages()
    }

    completeShuffleMapStageSuccessfully(0, 4, numShufflePartitions = 2)
    completeShuffleMapStageSuccessfully(1, 4, numShufflePartitions = 1)

    // Succeed stage 2 with a "42"
    completeNextResultStageWithSuccess(2, Stage.MAX_CONSECUTIVE_FETCH_FAILURES / 2)

    assert(results === Map(0 -> 42))
    assertDataStructuresEmpty()
  }
  /**
   * In this test we demonstrate that only consecutive failures trigger a stage abort. A stage may
   * fail multiple times, succeed, then fail a few more times (because it's run again by downstream
   * dependencies). The total number of failed attempts for one stage will go over the limit,
   * but that doesn't matter, since they have successes in the middle.
   */
  test("Non-consecutive stage failures don't trigger abort") {
    setupStageAbortTest(sc)

    val shuffleOneRdd = new MyRDD(sc, 2, Nil).cache()
    val shuffleDepOne = new ShuffleDependency(shuffleOneRdd, new HashPartitioner(2))
    val shuffleTwoRdd = new MyRDD(sc, 2, List(shuffleDepOne), tracker = mapOutputTracker).cache()
    val shuffleDepTwo = new ShuffleDependency(shuffleTwoRdd, new HashPartitioner(1))
    val finalRdd = new MyRDD(sc, 1, List(shuffleDepTwo), tracker = mapOutputTracker)
    submit(finalRdd, Array(0))

    // First, execute stages 0 and 1, failing stage 1 up to MAX-1 times.
    for (attempt <- 0 until Stage.MAX_CONSECUTIVE_FETCH_FAILURES - 1) {
      // Complete each task in stage 0 successfully
      completeShuffleMapStageSuccessfully(0, attempt, numShufflePartitions = 2)

      // Now we should have a new taskSet, for a new attempt of stage 1.
      // Fail these tasks with FetchFailure
      completeNextStageWithFetchFailure(1, attempt, shuffleDepOne)

      scheduler.resubmitFailedStages()

      // Confirm we have not yet aborted
      assert(scheduler.runningStages.nonEmpty)
      assert(!ended)
    }

    // Rerun stages 0 and 1 to step through the task set
    completeShuffleMapStageSuccessfully(0, 3, numShufflePartitions = 2)
    completeShuffleMapStageSuccessfully(1, 3, numShufflePartitions = 1)

    // Fail stage 2 so that stage 1 is resubmitted when we call scheduler.resubmitFailedStages()
    completeNextStageWithFetchFailure(2, 0, shuffleDepTwo)

    scheduler.resubmitFailedStages()

    // Rerun stage 0 to step through the task set
    completeShuffleMapStageSuccessfully(0, 4, numShufflePartitions = 2)

    // Now again, fail stage 1 (up to MAX_FAILURES) but confirm that this doesn't trigger an abort
    // since we succeeded in between.
    completeNextStageWithFetchFailure(1, 4, shuffleDepOne)

    scheduler.resubmitFailedStages()

    // Confirm we have not yet aborted
    assert(scheduler.runningStages.nonEmpty)
    assert(!ended)

    // Next, succeed all and confirm output
    // Rerun stage 0 + 1
    completeShuffleMapStageSuccessfully(0, 5, numShufflePartitions = 2)
    completeShuffleMapStageSuccessfully(1, 5, numShufflePartitions = 1)

    // Succeed stage 2 and verify results
    completeNextResultStageWithSuccess(2, 1)

    assertDataStructuresEmpty()
    sc.listenerBus.waitUntilEmpty(1000)
    assert(ended === true)
    assert(results === Map(0 -> 42))
  }

  test("trivial shuffle with multiple fetch failures") {
    val shuffleMapRdd = new MyRDD(sc, 2, Nil)
    val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(2))
    val shuffleId = shuffleDep.shuffleId
    val reduceRdd = new MyRDD(sc, 2, List(shuffleDep), tracker = mapOutputTracker)
    submit(reduceRdd, Array(0, 1))
    complete(taskSets(0), Seq(
      (Success, makeMapStatus("hostA", reduceRdd.partitions.length)),
      (Success, makeMapStatus("hostB", reduceRdd.partitions.length))))
    // The MapOutputTracker should know about both map output locations.
    assert(mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0).map(_._1.host).toSet ===
      HashSet("hostA", "hostB"))

    // The first result task fails, with a fetch failure for the output from the first mapper.
    runEvent(makeCompletionEvent(
      taskSets(1).tasks(0),
      FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0, 0, "ignored"),
      null))
    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
    assert(sparkListener.failedStages.contains(1))

    // The second ResultTask fails, with a fetch failure for the output from the second mapper.
    runEvent(makeCompletionEvent(
      taskSets(1).tasks(0),
      FetchFailed(makeBlockManagerId("hostA"), shuffleId, 1, 1, "ignored"),
      null))
    // The SparkListener should not receive redundant failure events.
    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
    assert(sparkListener.failedStages.size == 1)
  }

  /**
   * This tests the case where another FetchFailed comes in while the map stage is getting
   * re-run.
   */
  test("late fetch failures don't cause multiple concurrent attempts for the same map stage") {
    val shuffleMapRdd = new MyRDD(sc, 2, Nil)
    val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(2))
    val shuffleId = shuffleDep.shuffleId
    val reduceRdd = new MyRDD(sc, 2, List(shuffleDep), tracker = mapOutputTracker)
    submit(reduceRdd, Array(0, 1))

    val mapStageId = 0
    def countSubmittedMapStageAttempts(): Int = {
      sparkListener.submittedStageInfos.count(_.stageId == mapStageId)
    }

    // The map stage should have been submitted.
    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
    assert(countSubmittedMapStageAttempts() === 1)

    complete(taskSets(0), Seq(
      (Success, makeMapStatus("hostA", 2)),
      (Success, makeMapStatus("hostB", 2))))
    // The MapOutputTracker should know about both map output locations.
    assert(mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0).map(_._1.host).toSet ===
      HashSet("hostA", "hostB"))
    assert(mapOutputTracker.getMapSizesByExecutorId(shuffleId, 1).map(_._1.host).toSet ===
      HashSet("hostA", "hostB"))

    // The first result task fails, with a fetch failure for the output from the first mapper.
    runEvent(makeCompletionEvent(
      taskSets(1).tasks(0),
      FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0, 0, "ignored"),
      null))
    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
    assert(sparkListener.failedStages.contains(1))

    // Trigger resubmission of the failed map stage.
    runEvent(ResubmitFailedStages)
    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)

    // Another attempt for the map stage should have been submitted, resulting in 2 total attempts.
    assert(countSubmittedMapStageAttempts() === 2)

    // The second ResultTask fails, with a fetch failure for the output from the second mapper.
    runEvent(makeCompletionEvent(
      taskSets(1).tasks(1),
      FetchFailed(makeBlockManagerId("hostB"), shuffleId, 1, 1, "ignored"),
      null))

    // Another ResubmitFailedStages event should not result in another attempt for the map
    // stage being run concurrently.
    // NOTE: the actual ResubmitFailedStages may get called at any time during this, but it
    // shouldn't affect anything -- our calling it just makes *SURE* it gets called between the
    // desired event and our check.
    runEvent(ResubmitFailedStages)
    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
    assert(countSubmittedMapStageAttempts() === 2)
  }

  /**
   * This tests the case where a late FetchFailed comes in after the map stage has finished getting
   * retried and a new reduce stage starts running.
   */
  test("extremely late fetch failures don't cause multiple concurrent attempts for " +
    "the same stage") {
    val shuffleMapRdd = new MyRDD(sc, 2, Nil)
    val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(2))
    val shuffleId = shuffleDep.shuffleId
    val reduceRdd = new MyRDD(sc, 2, List(shuffleDep), tracker = mapOutputTracker)
    submit(reduceRdd, Array(0, 1))

    def countSubmittedReduceStageAttempts(): Int = {
      sparkListener.submittedStageInfos.count(_.stageId == 1)
    }
    def countSubmittedMapStageAttempts(): Int = {
      sparkListener.submittedStageInfos.count(_.stageId == 0)
    }

    // The map stage should have been submitted.
    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
    assert(countSubmittedMapStageAttempts() === 1)

    // Complete the map stage.
    complete(taskSets(0), Seq(
      (Success, makeMapStatus("hostA", 2)),
      (Success, makeMapStatus("hostB", 2))))

    // The reduce stage should have been submitted.
    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
    assert(countSubmittedReduceStageAttempts() === 1)

    // The first result task fails, with a fetch failure for the output from the first mapper.
    runEvent(makeCompletionEvent(
      taskSets(1).tasks(0),
      FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0, 0, "ignored"),
      null))

    // Trigger resubmission of the failed map stage and finish the re-started map task.
    runEvent(ResubmitFailedStages)
    complete(taskSets(2), Seq((Success, makeMapStatus("hostA", 1))))

    // Because the map stage finished, another attempt for the reduce stage should have been
    // submitted, resulting in 2 total attempts for each of the map and the reduce stages.
    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
    assert(countSubmittedMapStageAttempts() === 2)
    assert(countSubmittedReduceStageAttempts() === 2)

    // A late FetchFailed arrives from the second task in the original reduce stage.
    runEvent(makeCompletionEvent(
      taskSets(1).tasks(1),
      FetchFailed(makeBlockManagerId("hostB"), shuffleId, 1, 1, "ignored"),
      null))

    // Running ResubmitFailedStages shouldn't result in any more attempts for the map stage,
    // because the FetchFailed should have been ignored
    runEvent(ResubmitFailedStages)

    // The FetchFailed from the original reduce stage should be ignored.
    assert(countSubmittedMapStageAttempts() === 2)
  }

  test("task events always posted in speculation / when stage is killed") {
    val baseRdd = new MyRDD(sc, 4, Nil)
    val finalRdd = new MyRDD(sc, 4, List(new OneToOneDependency(baseRdd)))
    submit(finalRdd, Array(0, 1, 2, 3))

    // complete two tasks
    runEvent(makeCompletionEvent(
      taskSets(0).tasks(0), Success, 42,
      Seq.empty, createFakeTaskInfoWithId(0)))
    runEvent(makeCompletionEvent(
      taskSets(0).tasks(1), Success, 42,
      Seq.empty, createFakeTaskInfoWithId(1)))
    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
    // verify stage exists
    assert(scheduler.stageIdToStage.contains(0))
    assert(sparkListener.endedTasks.size == 2)

    // finish the other 2 tasks
    runEvent(makeCompletionEvent(
      taskSets(0).tasks(2), Success, 42,
      Seq.empty, createFakeTaskInfoWithId(2)))
    runEvent(makeCompletionEvent(
      taskSets(0).tasks(3), Success, 42,
      Seq.empty, createFakeTaskInfoWithId(3)))
    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
    assert(sparkListener.endedTasks.size == 4)

    // verify the stage is done
    assert(!scheduler.stageIdToStage.contains(0))

    // Stage should be complete. Finish one other Successful task to simulate what can happen
    // with a speculative task and make sure the event is sent out
    runEvent(makeCompletionEvent(
      taskSets(0).tasks(3), Success, 42,
      Seq.empty, createFakeTaskInfoWithId(5)))
    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
    assert(sparkListener.endedTasks.size == 5)

    // make sure non-successful tasks also send out an event
    runEvent(makeCompletionEvent(
      taskSets(0).tasks(3), UnknownReason, 42,
      Seq.empty, createFakeTaskInfoWithId(6)))
    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
    assert(sparkListener.endedTasks.size == 6)
  }
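  // Epoch gating, which the next test exercises (sketch): each task records the map
  // output tracker's epoch when it is submitted; losing an executor bumps the epoch,
  // and completions whose task epoch predates the failure are treated as stale:
  //
  //   val oldEpoch = mapOutputTracker.getEpoch
  //   runEvent(ExecutorLost("exec-hostA", ExecutorKilled))
  //   assert(mapOutputTracker.getEpoch > oldEpoch)  // older task results now ignored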
  test("ignore late map task completions") {
    val shuffleMapRdd = new MyRDD(sc, 2, Nil)
    val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(2))
    val shuffleId = shuffleDep.shuffleId
    val reduceRdd = new MyRDD(sc, 2, List(shuffleDep), tracker = mapOutputTracker)
    submit(reduceRdd, Array(0, 1))

    // pretend we were told hostA went away
    val oldEpoch = mapOutputTracker.getEpoch
    runEvent(ExecutorLost("exec-hostA", ExecutorKilled))
    val newEpoch = mapOutputTracker.getEpoch
    assert(newEpoch > oldEpoch)

    // now start completing some tasks in the shuffle map stage, under different hosts
    // and epochs, and make sure the scheduler updates its state correctly
    val taskSet = taskSets(0)
    val shuffleStage = scheduler.stageIdToStage(taskSet.stageId).asInstanceOf[ShuffleMapStage]
    assert(shuffleStage.numAvailableOutputs === 0)

    // should be ignored for being too old
    runEvent(makeCompletionEvent(
      taskSet.tasks(0),
      Success,
      makeMapStatus("hostA", reduceRdd.partitions.size)))
    assert(shuffleStage.numAvailableOutputs === 0)

    // should work because it's a non-failed host (so the available map outputs will increase)
    runEvent(makeCompletionEvent(
      taskSet.tasks(0),
      Success,
      makeMapStatus("hostB", reduceRdd.partitions.size)))
    assert(shuffleStage.numAvailableOutputs === 1)

    // should be ignored for being too old
    runEvent(makeCompletionEvent(
      taskSet.tasks(0),
      Success,
      makeMapStatus("hostA", reduceRdd.partitions.size)))
    assert(shuffleStage.numAvailableOutputs === 1)

    // should work because it's a new epoch, which will increase the number of available map
    // outputs, and also finish the stage
    taskSet.tasks(1).epoch = newEpoch
    runEvent(makeCompletionEvent(
      taskSet.tasks(1),
      Success,
      makeMapStatus("hostA", reduceRdd.partitions.size)))
    assert(shuffleStage.numAvailableOutputs === 2)
    assert(mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0).map(_._1).toSet ===
      HashSet(makeBlockManagerId("hostB"), makeBlockManagerId("hostA")))

    // finish the next stage normally, which completes the job
    complete(taskSets(1), Seq((Success, 42), (Success, 43)))
    assert(results === Map(0 -> 42, 1 -> 43))
    assertDataStructuresEmpty()
  }

  test("run shuffle with map stage failure") {
    val shuffleMapRdd = new MyRDD(sc, 2, Nil)
    val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(2))
    val reduceRdd = new MyRDD(sc, 2, List(shuffleDep), tracker = mapOutputTracker)
    submit(reduceRdd, Array(0, 1))

    // Fail the map stage. This should cause the entire job to fail.
    val stageFailureMessage = "Exception failure in map stage"
    failed(taskSets(0), stageFailureMessage)
    assert(failure.getMessage === s"Job aborted due to stage failure: $stageFailureMessage")

    // Listener bus should get told about the map stage failing, but not the reduce stage
    // (since the reduce stage hasn't been started yet).
    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
    assert(sparkListener.failedStages.toSet === Set(0))

    assertDataStructuresEmpty()
  }

  /**
   * Run two jobs with a shared dependency. We simulate a fetch failure in the second job, which
   * requires regenerating some outputs of the shared dependency. One key aspect of this test is
   * that the second job actually uses a different stage for the shared dependency (a "skipped"
   * stage).
   */
  test("shuffle fetch failure in a reused shuffle dependency") {
    // Run the first job successfully, which creates one shuffle dependency

    val shuffleMapRdd = new MyRDD(sc, 2, Nil)
    val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(2))
    val reduceRdd = new MyRDD(sc, 2, List(shuffleDep))
    submit(reduceRdd, Array(0, 1))

    completeShuffleMapStageSuccessfully(0, 0, 2)
    completeNextResultStageWithSuccess(1, 0)
    assert(results === Map(0 -> 42, 1 -> 42))
    assertDataStructuresEmpty()

    // submit another job w/ the shared dependency, and have a fetch failure
    val reduce2 = new MyRDD(sc, 2, List(shuffleDep))
    submit(reduce2, Array(0, 1))
    // Note that the stage numbering here is only b/c the shared dependency produces a new,
    // skipped stage. If instead it reused the existing stage, then this would be stage 2.
    completeNextStageWithFetchFailure(3, 0, shuffleDep)
    scheduler.resubmitFailedStages()

    // the scheduler now creates a new task set to regenerate the missing map output, but this time
    // using a different stage, the "skipped" one

    // SPARK-9809 -- this stage is submitted without a task for each partition (because some of
    // the shuffle map output is still available from stage 0); make sure we've still got internal
    // accumulators setup
    assert(scheduler.stageIdToStage(2).latestInfo.taskMetrics != null)
    completeShuffleMapStageSuccessfully(2, 0, 2)
    completeNextResultStageWithSuccess(3, 1, idx => idx + 1234)
    assert(results === Map(0 -> 1234, 1 -> 1235))

    assertDataStructuresEmpty()
  }

  /**
   * This test runs a three-stage job, with a fetch failure in stage 1. But during the retry, we
   * have completions from both the first & second attempt of stage 1. So all the map output is
   * available before we finish any task set for stage 1. We want to make sure that we don't
   * submit stage 2 until the map output for stage 1 is registered.
   */
  test("don't submit stage until its dependencies map outputs are registered (SPARK-5259)") {
    val firstRDD = new MyRDD(sc, 3, Nil)
    val firstShuffleDep = new ShuffleDependency(firstRDD, new HashPartitioner(2))
    val firstShuffleId = firstShuffleDep.shuffleId
    val shuffleMapRdd = new MyRDD(sc, 3, List(firstShuffleDep))
    val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(2))
    val reduceRdd = new MyRDD(sc, 1, List(shuffleDep))
    submit(reduceRdd, Array(0))

    // things start out smoothly, stage 0 completes with no issues
    complete(taskSets(0), Seq(
      (Success, makeMapStatus("hostB", shuffleMapRdd.partitions.length)),
      (Success, makeMapStatus("hostB", shuffleMapRdd.partitions.length)),
      (Success, makeMapStatus("hostA", shuffleMapRdd.partitions.length))
    ))

    // then one executor dies, and a task fails in stage 1
    runEvent(ExecutorLost("exec-hostA", ExecutorKilled))
    runEvent(makeCompletionEvent(
      taskSets(1).tasks(0),
      FetchFailed(null, firstShuffleId, 2, 0, "Fetch failed"),
      null))

    // so we resubmit stage 0, which completes happily
    scheduler.resubmitFailedStages()
    val stage0Resubmit = taskSets(2)
    assert(stage0Resubmit.stageId == 0)
    assert(stage0Resubmit.stageAttemptId === 1)
    val task = stage0Resubmit.tasks(0)
    assert(task.partitionId === 2)
    runEvent(makeCompletionEvent(
      task,
      Success,
      makeMapStatus("hostC", shuffleMapRdd.partitions.length)))

    // now here is where things get tricky: we will now have a task set representing
    // the second attempt for stage 1, but we *also* have some tasks for the first attempt for
    // stage 1 still going
    val stage1Resubmit = taskSets(3)
    assert(stage1Resubmit.stageId == 1)
    assert(stage1Resubmit.stageAttemptId === 1)
    assert(stage1Resubmit.tasks.length === 3)

    // we'll have some tasks finish from the first attempt, and some finish from the second
    // attempt, so that we actually have all stage outputs, though no attempt has completed all
    // its tasks
    runEvent(makeCompletionEvent(
      taskSets(3).tasks(0),
      Success,
      makeMapStatus("hostC", reduceRdd.partitions.length)))
    runEvent(makeCompletionEvent(
      taskSets(3).tasks(1),
      Success,
      makeMapStatus("hostC", reduceRdd.partitions.length)))
    // late task finish from the first attempt
    runEvent(makeCompletionEvent(
      taskSets(1).tasks(2),
      Success,
      makeMapStatus("hostB", reduceRdd.partitions.length)))

    // What should happen now is that we submit stage 2. However, we might not see an error
    // b/c of DAGScheduler's error handling (it tends to swallow errors and just log them). But
    // we can check some conditions.
    // Note that the really important thing here is not so much that we submit stage 2
    // *immediately* but that we don't end up with some error from these interleaved completions.
    // It would also be OK (though sub-optimal) if stage 2 simply waited until the resubmission of
    // stage 1 had all its tasks complete

    // check that we have all the map output for stage 0 (it should have been there even before
    // the last round of completions from stage 1, but just to double check it hasn't been messed
    // up) and also the newly available stage 1
    val stageToReduceIdxs = Seq(
      0 -> (0 until 3),
      1 -> (0 until 1)
    )
    for {
      (stage, reduceIdxs) <- stageToReduceIdxs
      reduceIdx <- reduceIdxs
    } {
      // this would throw an exception if the map status hadn't been registered
      val statuses = mapOutputTracker.getMapSizesByExecutorId(stage, reduceIdx)
      // really we should have already thrown an exception rather than fail either of these
      // asserts, but just to be extra defensive let's double check the statuses are OK
      assert(statuses != null)
      assert(statuses.nonEmpty)
    }

    // and check that stage 2 has been submitted
    assert(taskSets.size == 5)
    val stage2TaskSet = taskSets(4)
    assert(stage2TaskSet.stageId == 2)
    assert(stage2TaskSet.stageAttemptId == 0)
  }
   * Those tasks get resubmitted, and when they finish the job completes normally.
   */
  test("register map outputs correctly after ExecutorLost and task Resubmitted") {
    val firstRDD = new MyRDD(sc, 3, Nil)
    val firstShuffleDep = new ShuffleDependency(firstRDD, new HashPartitioner(2))
    val reduceRdd = new MyRDD(sc, 5, List(firstShuffleDep))
    submit(reduceRdd, Array(0))

    // complete some of the tasks from the first stage, on one host
    runEvent(makeCompletionEvent(
      taskSets(0).tasks(0),
      Success,
      makeMapStatus("hostA", reduceRdd.partitions.length)))
    runEvent(makeCompletionEvent(
      taskSets(0).tasks(1),
      Success,
      makeMapStatus("hostA", reduceRdd.partitions.length)))

    // now that host goes down
    runEvent(ExecutorLost("exec-hostA", ExecutorKilled))

    // so we resubmit those tasks
    runEvent(makeCompletionEvent(taskSets(0).tasks(0), Resubmitted, null))
    runEvent(makeCompletionEvent(taskSets(0).tasks(1), Resubmitted, null))

    // now complete everything on a different host
    complete(taskSets(0), Seq(
      (Success, makeMapStatus("hostB", reduceRdd.partitions.length)),
      (Success, makeMapStatus("hostB", reduceRdd.partitions.length)),
      (Success, makeMapStatus("hostB", reduceRdd.partitions.length))
    ))

    // now we should submit stage 1, and the map output from stage 0 should be registered

    // check that we have all the map output for stage 0
    (0 until reduceRdd.partitions.length).foreach { reduceIdx =>
      val statuses = mapOutputTracker.getMapSizesByExecutorId(0, reduceIdx)
      // really we should have already thrown an exception rather than fail either of these
      // asserts, but just to be extra defensive let's double check the statuses are OK
      assert(statuses != null)
      assert(statuses.nonEmpty)
    }

    // and check that stage 1 has been submitted
    assert(taskSets.size == 2)
    val stage1TaskSet = taskSets(1)
    assert(stage1TaskSet.stageId == 1)
    assert(stage1TaskSet.stageAttemptId == 0)
  }

  /**
   * Makes sure that failures of a stage used by multiple jobs are correctly handled.
   *
   * This test creates the following dependency graph:
   *
   *   shuffleMapRdd1     shuffleMapRDD2
   *        |     \          |
   *        |      \         |
   *        |       \        |
   *        |        \       |
   *   reduceRdd1    reduceRdd2
   *
   * We start both shuffleMapRdds and then fail shuffleMapRdd1. As a result, the job listeners for
   * reduceRdd1 and reduceRdd2 should both be informed that the job failed. shuffleMapRDD2 should
   * also be cancelled, because it is only used by reduceRdd2 and reduceRdd2 cannot complete
   * without shuffleMapRdd1.
   */
  test("failure of stage used by two jobs") {
    val shuffleMapRdd1 = new MyRDD(sc, 2, Nil)
    val shuffleDep1 = new ShuffleDependency(shuffleMapRdd1, new HashPartitioner(2))
    val shuffleMapRdd2 = new MyRDD(sc, 2, Nil)
    val shuffleDep2 = new ShuffleDependency(shuffleMapRdd2, new HashPartitioner(2))

    val reduceRdd1 = new MyRDD(sc, 2, List(shuffleDep1), tracker = mapOutputTracker)
    val reduceRdd2 = new MyRDD(sc, 2, List(shuffleDep1, shuffleDep2), tracker = mapOutputTracker)

    // We need to make our own listeners for this test, since by default submit uses the same
    // listener for all jobs, and here we want to capture the failure for each job separately.
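    // A minimal JobListener that records only the failure message; taskSucceeded is a no-op
    // because this test only inspects how each job is failed.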
    class FailureRecordingJobListener() extends JobListener {
      var failureMessage: String = _
      override def taskSucceeded(index: Int, result: Any): Unit = {}
      override def jobFailed(exception: Exception): Unit = { failureMessage = exception.getMessage }
    }
    val listener1 = new FailureRecordingJobListener()
    val listener2 = new FailureRecordingJobListener()

    submit(reduceRdd1, Array(0, 1), listener = listener1)
    submit(reduceRdd2, Array(0, 1), listener = listener2)

    val stageFailureMessage = "Exception failure in map stage"
    failed(taskSets(0), stageFailureMessage)

    assert(cancelledStages.toSet === Set(0, 2))

    // Make sure the listeners got told about both failed stages.
    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
    assert(sparkListener.successfulStages.isEmpty)
    assert(sparkListener.failedStages.toSet === Set(0, 2))

    assert(listener1.failureMessage === s"Job aborted due to stage failure: $stageFailureMessage")
    assert(listener2.failureMessage === s"Job aborted due to stage failure: $stageFailureMessage")
    assertDataStructuresEmpty()
  }

  def checkJobPropertiesAndPriority(taskSet: TaskSet, expected: String, priority: Int): Unit = {
    assert(taskSet.properties != null)
    assert(taskSet.properties.getProperty("testProperty") === expected)
    assert(taskSet.priority === priority)
  }

  def launchJobsThatShareStageAndCancelFirst(): ShuffleDependency[Int, Int, Nothing] = {
    val baseRdd = new MyRDD(sc, 1, Nil)
    val shuffleDep1 = new ShuffleDependency(baseRdd, new HashPartitioner(1))
    val intermediateRdd = new MyRDD(sc, 1, List(shuffleDep1))
    val shuffleDep2 = new ShuffleDependency(intermediateRdd, new HashPartitioner(1))
    val finalRdd1 = new MyRDD(sc, 1, List(shuffleDep2))
    val finalRdd2 = new MyRDD(sc, 1, List(shuffleDep2))
    val job1Properties = new Properties()
    val job2Properties = new Properties()
    job1Properties.setProperty("testProperty", "job1")
    job2Properties.setProperty("testProperty", "job2")

    // Run jobs 1 & 2, both referencing the same stage, then cancel job1.
    // Note that we have to submit job2 before we cancel job1 to have them actually share
    // *Stages*, and not just shuffle dependencies, due to skipped stages (at least until
    // we address SPARK-10193).
    val jobId1 = submit(finalRdd1, Array(0), properties = job1Properties)
    val jobId2 = submit(finalRdd2, Array(0), properties = job2Properties)
    assert(scheduler.activeJobs.nonEmpty)
    val testProperty1 = scheduler.jobIdToActiveJob(jobId1).properties.getProperty("testProperty")

    // remove job1 as an ActiveJob
    cancel(jobId1)

    // job2 should still be running
    assert(scheduler.activeJobs.nonEmpty)
    val testProperty2 = scheduler.jobIdToActiveJob(jobId2).properties.getProperty("testProperty")
    assert(testProperty1 != testProperty2)
    // NB: This next assert isn't necessarily the "desired" behavior; it's just to document
    // the current behavior. We've already submitted the TaskSet for stage 0 based on job1, but
    // even though we have cancelled that job and are now running it because of job2, we haven't
    // updated the TaskSet's properties. Changing the properties to "job2" is likely the more
    // correct behavior.
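    // A TaskSet's priority is the ID of the job that submitted it (used for FIFO ordering),
    // so the TaskSet submitted while job1 (ID 0) was active still carries priority 0.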
    val job1Id = 0 // TaskSet priority for Stages run with "job1" as the ActiveJob
    checkJobPropertiesAndPriority(taskSets(0), "job1", job1Id)
    complete(taskSets(0), Seq((Success, makeMapStatus("hostA", 1))))

    shuffleDep1
  }

  /**
   * Makes sure that tasks for a stage used by multiple jobs are submitted with the properties of
   * a later, active job if they were previously run under a job that is no longer active.
   */
  test("stage used by two jobs, the first no longer active (SPARK-6880)") {
    launchJobsThatShareStageAndCancelFirst()

    // The next check is the key for SPARK-6880. For the stage which was shared by both job1 and
    // job2 but never had any tasks submitted for job1, the properties of job2 are now used to run
    // the stage.
    checkJobPropertiesAndPriority(taskSets(1), "job2", 1)

    complete(taskSets(1), Seq((Success, makeMapStatus("hostA", 1))))
    assert(taskSets(2).properties != null)
    complete(taskSets(2), Seq((Success, 42)))
    assert(results === Map(0 -> 42))
    assert(scheduler.activeJobs.isEmpty)

    assertDataStructuresEmpty()
  }

  /**
   * Makes sure that tasks for a stage used by multiple jobs are submitted with the properties of
   * a later, active job if they were previously run under a job that is no longer active, even
   * when there are fetch failures.
   */
  test("stage used by two jobs, some fetch failures, and the first job no longer active " +
    "(SPARK-6880)") {
    val shuffleDep1 = launchJobsThatShareStageAndCancelFirst()
    val job2Id = 1 // TaskSet priority for Stages run with "job2" as the ActiveJob

    // let's say there is a fetch failure in this task set, which makes us go back and
    // run stage 0, attempt 1
    complete(taskSets(1), Seq(
      (FetchFailed(makeBlockManagerId("hostA"), shuffleDep1.shuffleId, 0, 0, "ignored"), null)))
    scheduler.resubmitFailedStages()

    // stage 0, attempt 1 should have the properties of job2
    assert(taskSets(2).stageId === 0)
    assert(taskSets(2).stageAttemptId === 1)
    checkJobPropertiesAndPriority(taskSets(2), "job2", job2Id)

    // run the rest of the stages normally, checking that they have the correct properties
    complete(taskSets(2), Seq((Success, makeMapStatus("hostA", 1))))
    checkJobPropertiesAndPriority(taskSets(3), "job2", job2Id)
    complete(taskSets(3), Seq((Success, makeMapStatus("hostA", 1))))
    checkJobPropertiesAndPriority(taskSets(4), "job2", job2Id)
    complete(taskSets(4), Seq((Success, 42)))
    assert(results === Map(0 -> 42))
    assert(scheduler.activeJobs.isEmpty)

    assertDataStructuresEmpty()
  }

  test("run trivial shuffle with out-of-band failure and retry") {
    val shuffleMapRdd = new MyRDD(sc, 2, Nil)
    val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(2))
    val shuffleId = shuffleDep.shuffleId
    val reduceRdd = new MyRDD(sc, 1, List(shuffleDep), tracker = mapOutputTracker)
    submit(reduceRdd, Array(0))
    // blockManagerMaster.removeExecutor("exec-hostA")
    // pretend we were told hostA went away
    runEvent(ExecutorLost("exec-hostA", ExecutorKilled))
    // DAGScheduler will immediately resubmit the stage after it appears to have no pending tasks
    // rather than marking it as failed and waiting.
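    // Both map tasks report success below, but the hostA result carries a stale epoch from
    // before the executor loss, so its output is ignored; only hostB's output registers and
    // the stage is resubmitted for the missing partition (completed on hostC below).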
    complete(taskSets(0), Seq(
      (Success, makeMapStatus("hostA", 1)),
      (Success, makeMapStatus("hostB", 1))))
    // have hostC complete the resubmitted task
    complete(taskSets(1), Seq((Success, makeMapStatus("hostC", 1))))
    assert(mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0).map(_._1).toSet ===
      HashSet(makeBlockManagerId("hostC"), makeBlockManagerId("hostB")))
    complete(taskSets(2), Seq((Success, 42)))
    assert(results === Map(0 -> 42))
    assertDataStructuresEmpty()
  }

  test("recursive shuffle failures") {
    val shuffleOneRdd = new MyRDD(sc, 2, Nil)
    val shuffleDepOne = new ShuffleDependency(shuffleOneRdd, new HashPartitioner(2))
    val shuffleTwoRdd = new MyRDD(sc, 2, List(shuffleDepOne), tracker = mapOutputTracker)
    val shuffleDepTwo = new ShuffleDependency(shuffleTwoRdd, new HashPartitioner(1))
    val finalRdd = new MyRDD(sc, 1, List(shuffleDepTwo), tracker = mapOutputTracker)
    submit(finalRdd, Array(0))
    // have the first stage complete normally
    complete(taskSets(0), Seq(
      (Success, makeMapStatus("hostA", 2)),
      (Success, makeMapStatus("hostB", 2))))
    // have the second stage complete normally
    complete(taskSets(1), Seq(
      (Success, makeMapStatus("hostA", 1)),
      (Success, makeMapStatus("hostC", 1))))
    // fail the third stage because hostA went down
    complete(taskSets(2), Seq(
      (FetchFailed(makeBlockManagerId("hostA"), shuffleDepTwo.shuffleId, 0, 0, "ignored"), null)))
    // TODO assert this:
    // blockManagerMaster.removeExecutor("exec-hostA")
    // have DAGScheduler try again
    scheduler.resubmitFailedStages()
    complete(taskSets(3), Seq((Success, makeMapStatus("hostA", 2))))
    complete(taskSets(4), Seq((Success, makeMapStatus("hostA", 1))))
    complete(taskSets(5), Seq((Success, 42)))
    assert(results === Map(0 -> 42))
    assertDataStructuresEmpty()
  }

  test("cached post-shuffle") {
    val shuffleOneRdd = new MyRDD(sc, 2, Nil).cache()
    val shuffleDepOne = new ShuffleDependency(shuffleOneRdd, new HashPartitioner(2))
    val shuffleTwoRdd = new MyRDD(sc, 2, List(shuffleDepOne), tracker = mapOutputTracker).cache()
    val shuffleDepTwo = new ShuffleDependency(shuffleTwoRdd, new HashPartitioner(1))
    val finalRdd = new MyRDD(sc, 1, List(shuffleDepTwo), tracker = mapOutputTracker)
    submit(finalRdd, Array(0))
    cacheLocations(shuffleTwoRdd.id -> 0) = Seq(makeBlockManagerId("hostD"))
    cacheLocations(shuffleTwoRdd.id -> 1) = Seq(makeBlockManagerId("hostC"))
    // complete stage 0
    complete(taskSets(0), Seq(
      (Success, makeMapStatus("hostA", 2)),
      (Success, makeMapStatus("hostB", 2))))
    // complete stage 1
    complete(taskSets(1), Seq(
      (Success, makeMapStatus("hostA", 1)),
      (Success, makeMapStatus("hostB", 1))))
    // pretend stage 2 failed because hostA went down
    complete(taskSets(2), Seq(
      (FetchFailed(makeBlockManagerId("hostA"), shuffleDepTwo.shuffleId, 0, 0, "ignored"), null)))
    // TODO assert this:
    // blockManagerMaster.removeExecutor("exec-hostA")
    // DAGScheduler should notice the cached copy of the second shuffle and try to get it rerun.
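    // Only the map partition whose output was lost needs to rerun, and its task should
    // prefer hostD, where that partition of shuffleTwoRdd is cached (see cacheLocations
    // above).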
    scheduler.resubmitFailedStages()
    assertLocations(taskSets(3), Seq(Seq("hostD")))
    // allow hostD to recover
    complete(taskSets(3), Seq((Success, makeMapStatus("hostD", 1))))
    complete(taskSets(4), Seq((Success, 42)))
    assert(results === Map(0 -> 42))
    assertDataStructuresEmpty()
  }

  test("misbehaved accumulator should not crash DAGScheduler and SparkContext") {
    val acc = new LongAccumulator {
      override def add(v: java.lang.Long): Unit = throw new DAGSchedulerSuiteDummyException
      override def add(v: Long): Unit = throw new DAGSchedulerSuiteDummyException
    }
    sc.register(acc)

    // Run this on executors
    sc.parallelize(1 to 10, 2).foreach { item => acc.add(1) }

    // Make sure we can still run commands
    assert(sc.parallelize(1 to 10, 2).count() === 10)
  }

  /**
   * The job will fail once the first task throws a DAGSchedulerSuiteDummyException. Any
   * subsequent task WILL throw a legitimate java.lang.UnsupportedOperationException. With
   * multiple tasks, there is a race between the resulting SparkDriverExecutionExceptions
   * (with their differing causes) as to which one will represent the result of the job.
   */
  test("misbehaved resultHandler should not crash DAGScheduler and SparkContext") {
    val e = intercept[SparkDriverExecutionException] {
      // Number of parallelized partitions implies number of tasks of job
      val rdd = sc.parallelize(1 to 10, 2)
      sc.runJob[Int, Int](
        rdd,
        (context: TaskContext, iter: Iterator[Int]) => iter.size,
        // For a robust test assertion, limit number of job tasks to 1; that is,
        // if multiple RDD partitions, use id of any one partition, say, first partition id=0
        Seq(0),
        (part: Int, result: Int) => throw new DAGSchedulerSuiteDummyException)
    }
    assert(e.getCause.isInstanceOf[DAGSchedulerSuiteDummyException])

    // Make sure we can still run commands on our SparkContext
    assert(sc.parallelize(1 to 10, 2).count() === 10)
  }

  test("getPartitions exceptions should not crash DAGScheduler and SparkContext (SPARK-8606)") {
    val e1 = intercept[DAGSchedulerSuiteDummyException] {
      val rdd = new MyRDD(sc, 2, Nil) {
        override def getPartitions: Array[Partition] = {
          throw new DAGSchedulerSuiteDummyException
        }
      }
      rdd.reduceByKey(_ + _, 1).count()
    }

    // Make sure we can still run commands
    assert(sc.parallelize(1 to 10, 2).count() === 10)
  }

  test("getPreferredLocations errors should not crash DAGScheduler and SparkContext (SPARK-8606)") {
    val e1 = intercept[SparkException] {
      val rdd = new MyRDD(sc, 2, Nil) {
        override def getPreferredLocations(split: Partition): Seq[String] = {
          throw new DAGSchedulerSuiteDummyException
        }
      }
      rdd.count()
    }
    assert(e1.getMessage.contains(classOf[DAGSchedulerSuiteDummyException].getName))

    // Make sure we can still run commands
    assert(sc.parallelize(1 to 10, 2).count() === 10)
  }

  test("accumulator not calculated for resubmitted result stage") {
    // just for register
    val accum = AccumulatorSuite.createLongAccum("a")
    val finalRdd = new MyRDD(sc, 1, Nil)
    submit(finalRdd, Array(0))
    completeWithAccumulator(accum.id, taskSets(0), Seq((Success, 42)))
    completeWithAccumulator(accum.id, taskSets(0), Seq((Success, 42)))
    assert(results === Map(0 -> 42))

    assert(accum.value === 1)
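    // The duplicate completion above targeted an already-finished result stage, so its
    // accumulator update was dropped; otherwise the value would be 2.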
    assertDataStructuresEmpty()
  }

  test("accumulators are updated on exception failures") {
    val acc1 = AccumulatorSuite.createLongAccum("ingenieur")
    val acc2 = AccumulatorSuite.createLongAccum("boulanger")
    val acc3 = AccumulatorSuite.createLongAccum("agriculteur")
    assert(AccumulatorContext.get(acc1.id).isDefined)
    assert(AccumulatorContext.get(acc2.id).isDefined)
    assert(AccumulatorContext.get(acc3.id).isDefined)
    val accUpdate1 = new LongAccumulator
    accUpdate1.metadata = acc1.metadata
    accUpdate1.setValue(15)
    val accUpdate2 = new LongAccumulator
    accUpdate2.metadata = acc2.metadata
    accUpdate2.setValue(13)
    val accUpdate3 = new LongAccumulator
    accUpdate3.metadata = acc3.metadata
    accUpdate3.setValue(18)
    val accumUpdates = Seq(accUpdate1, accUpdate2, accUpdate3)
    val accumInfo = accumUpdates.map(AccumulatorSuite.makeInfo)
    val exceptionFailure = new ExceptionFailure(
      new SparkException("fondue?"),
      accumInfo).copy(accums = accumUpdates)
    submit(new MyRDD(sc, 1, Nil), Array(0))
    runEvent(makeCompletionEvent(taskSets.head.tasks.head, exceptionFailure, "result"))
    assert(AccumulatorContext.get(acc1.id).get.value === 15L)
    assert(AccumulatorContext.get(acc2.id).get.value === 13L)
    assert(AccumulatorContext.get(acc3.id).get.value === 18L)
  }

  test("reduce tasks should be placed locally with map output") {
    // Create a shuffleMapRdd with 1 partition
    val shuffleMapRdd = new MyRDD(sc, 1, Nil)
    val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(2))
    val shuffleId = shuffleDep.shuffleId
    val reduceRdd = new MyRDD(sc, 1, List(shuffleDep), tracker = mapOutputTracker)
    submit(reduceRdd, Array(0))
    complete(taskSets(0), Seq(
      (Success, makeMapStatus("hostA", 1))))
    assert(mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0).map(_._1).toSet ===
      HashSet(makeBlockManagerId("hostA")))

    // Reducer should run on the same host that the map task ran on
    val reduceTaskSet = taskSets(1)
    assertLocations(reduceTaskSet, Seq(Seq("hostA")))
    complete(reduceTaskSet, Seq((Success, 42)))
    assert(results === Map(0 -> 42))
    assertDataStructuresEmpty()
  }

  test("reduce task locality preferences should only include machines with largest map outputs") {
    val numMapTasks = 4
    // Create a shuffleMapRdd with more partitions
    val shuffleMapRdd = new MyRDD(sc, numMapTasks, Nil)
    val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(1))
    val shuffleId = shuffleDep.shuffleId
    val reduceRdd = new MyRDD(sc, 1, List(shuffleDep), tracker = mapOutputTracker)
    submit(reduceRdd, Array(0))

    val statuses = (1 to numMapTasks).map { i =>
      (Success, makeMapStatus("host" + i, 1, (10 * i).toByte))
    }
    complete(taskSets(0), statuses)

    // Reducer should prefer the last 3 hosts as they have 20%, 30% and 40% of the data
    val hosts = (1 to numMapTasks).map(i => "host" + i).reverse.take(numMapTasks - 1)

    val reduceTaskSet = taskSets(1)
    assertLocations(reduceTaskSet, Seq(hosts))
    complete(reduceTaskSet, Seq((Success, 42)))
    assert(results === Map(0 -> 42))
    assertDataStructuresEmpty()
  }

  test("stages with both narrow and shuffle dependencies use narrow ones for locality") {
    // Create an RDD that has both a shuffle dependency and a narrow dependency (e.g.
    // for a join)
    val rdd1 = new MyRDD(sc, 1, Nil)
    val rdd2 = new MyRDD(sc, 1, Nil, locations = Seq(Seq("hostB")))
    val shuffleDep = new ShuffleDependency(rdd1, new HashPartitioner(1))
    val narrowDep = new OneToOneDependency(rdd2)
    val shuffleId = shuffleDep.shuffleId
    val reduceRdd = new MyRDD(sc, 1, List(shuffleDep, narrowDep), tracker = mapOutputTracker)
    submit(reduceRdd, Array(0))
    complete(taskSets(0), Seq(
      (Success, makeMapStatus("hostA", 1))))
    assert(mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0).map(_._1).toSet ===
      HashSet(makeBlockManagerId("hostA")))

    // Reducer should run where RDD 2 has preferences, even though it also has a shuffle dep
    val reduceTaskSet = taskSets(1)
    assertLocations(reduceTaskSet, Seq(Seq("hostB")))
    complete(reduceTaskSet, Seq((Success, 42)))
    assert(results === Map(0 -> 42))
    assertDataStructuresEmpty()
  }

  test("Spark exceptions should include call site in stack trace") {
    val e = intercept[SparkException] {
      sc.parallelize(1 to 10, 2).map { _ => throw new RuntimeException("uh-oh!") }.count()
    }

    // Does not include message, ONLY stack trace.
    val stackTraceString = Utils.exceptionString(e)

    // should actually include the RDD operation that invoked the method:
    assert(stackTraceString.contains("org.apache.spark.rdd.RDD.count"))

    // should include the FunSuite setup:
    assert(stackTraceString.contains("org.scalatest.FunSuite"))
  }

  test("catch errors in event loop") {
    // this is a test of our testing framework -- make sure errors in the event loop don't get
    // ignored

    // just run some bad event that will throw an exception -- we'll give a null TaskEndReason
    val rdd1 = new MyRDD(sc, 1, Nil)
    submit(rdd1, Array(0))
    intercept[Exception] {
      complete(taskSets(0), Seq(
        (null, makeMapStatus("hostA", 1))))
    }
  }

  test("simple map stage submission") {
    val shuffleMapRdd = new MyRDD(sc, 2, Nil)
    val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(1))
    val shuffleId = shuffleDep.shuffleId
    val reduceRdd = new MyRDD(sc, 1, List(shuffleDep), tracker = mapOutputTracker)

    // Submit a map stage by itself
    submitMapStage(shuffleDep)
    assert(results.size === 0) // No results yet
    completeShuffleMapStageSuccessfully(0, 0, 1)
    assert(results.size === 1)
    results.clear()
    assertDataStructuresEmpty()

    // Submit a reduce job that depends on this map stage; it should directly do the reduce
    submit(reduceRdd, Array(0))
    completeNextResultStageWithSuccess(2, 0)
    assert(results === Map(0 -> 42))
    results.clear()
    assertDataStructuresEmpty()

    // Check that if we submit the map stage again, no tasks run
    submitMapStage(shuffleDep)
    assert(results.size === 1)
    assertDataStructuresEmpty()
  }

  test("map stage submission with reduce stage also depending on the data") {
    val shuffleMapRdd = new MyRDD(sc, 2, Nil)
    val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(1))
    val shuffleId = shuffleDep.shuffleId
    val reduceRdd = new MyRDD(sc, 1, List(shuffleDep), tracker = mapOutputTracker)

    // Submit the map stage by itself
    submitMapStage(shuffleDep)

    // Submit a reduce job that depends on this map stage
    submit(reduceRdd, Array(0))

    // Complete tasks for the map stage
    completeShuffleMapStageSuccessfully(0, 0, 1)
    assert(results.size === 1)
    results.clear()

    // Complete tasks for the reduce stage
    completeNextResultStageWithSuccess(1, 0)
    assert(results === Map(0 -> 42))
    results.clear()
    assertDataStructuresEmpty()

    // Check that if we submit the map stage again, no tasks run
    submitMapStage(shuffleDep)
    assert(results.size === 1)
    assertDataStructuresEmpty()
  }

  test("map stage submission with fetch failure") {
    val shuffleMapRdd = new MyRDD(sc, 2, Nil)
    val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(2))
    val shuffleId = shuffleDep.shuffleId
    val reduceRdd = new MyRDD(sc, 2, List(shuffleDep), tracker = mapOutputTracker)

    // Submit a map stage by itself
    submitMapStage(shuffleDep)
    complete(taskSets(0), Seq(
      (Success, makeMapStatus("hostA", reduceRdd.partitions.length)),
      (Success, makeMapStatus("hostB", reduceRdd.partitions.length))))
    assert(results.size === 1)
    results.clear()
    assertDataStructuresEmpty()

    // Submit a reduce job that depends on this map stage, but where one reduce will fail a fetch
    submit(reduceRdd, Array(0, 1))
    complete(taskSets(1), Seq(
      (Success, 42),
      (FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0, 0, "ignored"), null)))
    // Ask the scheduler to try it again; TaskSet 2 will rerun the map task that we couldn't fetch
    // from, then TaskSet 3 will run the reduce stage
    scheduler.resubmitFailedStages()
    complete(taskSets(2), Seq((Success, makeMapStatus("hostA", reduceRdd.partitions.length))))
    complete(taskSets(3), Seq((Success, 43)))
    assert(results === Map(0 -> 42, 1 -> 43))
    results.clear()
    assertDataStructuresEmpty()

    // Run another reduce job without a failure; this should just work
    submit(reduceRdd, Array(0, 1))
    complete(taskSets(4), Seq(
      (Success, 44),
      (Success, 45)))
    assert(results === Map(0 -> 44, 1 -> 45))
    results.clear()
    assertDataStructuresEmpty()

    // Resubmit the map stage; this should also just work
    submitMapStage(shuffleDep)
    assert(results.size === 1)
    results.clear()
    assertDataStructuresEmpty()
  }

  /**
   * In this test, we have three RDDs with shuffle dependencies, and we submit map stage jobs
   * that are waiting on each one, as well as a reduce job on the last one. We test that all of
   * these jobs complete even if there are some fetch failures in both shuffles.
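   *
   * Concretely: dep1's map stage job finishes first; a fetch failure while computing rdd2
   * forces stage 0 to rerun before dep2's map stage job can finish; and a second fetch
   * failure in the reduce stage forces stage 1 to rerun before the reduce job finishes.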
   */
  test("map stage submission with multiple shared stages and failures") {
    val rdd1 = new MyRDD(sc, 2, Nil)
    val dep1 = new ShuffleDependency(rdd1, new HashPartitioner(2))
    val rdd2 = new MyRDD(sc, 2, List(dep1), tracker = mapOutputTracker)
    val dep2 = new ShuffleDependency(rdd2, new HashPartitioner(2))
    val rdd3 = new MyRDD(sc, 2, List(dep2), tracker = mapOutputTracker)

    val listener1 = new SimpleListener
    val listener2 = new SimpleListener
    val listener3 = new SimpleListener

    submitMapStage(dep1, listener1)
    submitMapStage(dep2, listener2)
    submit(rdd3, Array(0, 1), listener = listener3)

    // Complete the first stage
    assert(taskSets(0).stageId === 0)
    complete(taskSets(0), Seq(
      (Success, makeMapStatus("hostA", rdd1.partitions.length)),
      (Success, makeMapStatus("hostB", rdd1.partitions.length))))
    assert(mapOutputTracker.getMapSizesByExecutorId(dep1.shuffleId, 0).map(_._1).toSet ===
      HashSet(makeBlockManagerId("hostA"), makeBlockManagerId("hostB")))
    assert(listener1.results.size === 1)

    // When attempting the second stage, show a fetch failure
    assert(taskSets(1).stageId === 1)
    complete(taskSets(1), Seq(
      (Success, makeMapStatus("hostA", rdd2.partitions.length)),
      (FetchFailed(makeBlockManagerId("hostA"), dep1.shuffleId, 0, 0, "ignored"), null)))
    scheduler.resubmitFailedStages()
    assert(listener2.results.size === 0) // Second stage listener should not have a result yet

    // Stage 0 should now be running as task set 2; make its task succeed
    assert(taskSets(2).stageId === 0)
    complete(taskSets(2), Seq(
      (Success, makeMapStatus("hostC", rdd2.partitions.length))))
    assert(mapOutputTracker.getMapSizesByExecutorId(dep1.shuffleId, 0).map(_._1).toSet ===
      HashSet(makeBlockManagerId("hostC"), makeBlockManagerId("hostB")))
    assert(listener2.results.size === 0) // Second stage listener should still not have a result

    // Stage 1 should now be running as task set 3; make its first task succeed
    assert(taskSets(3).stageId === 1)
    complete(taskSets(3), Seq(
      (Success, makeMapStatus("hostB", rdd2.partitions.length)),
      (Success, makeMapStatus("hostD", rdd2.partitions.length))))
    assert(mapOutputTracker.getMapSizesByExecutorId(dep2.shuffleId, 0).map(_._1).toSet ===
      HashSet(makeBlockManagerId("hostB"), makeBlockManagerId("hostD")))
    assert(listener2.results.size === 1)

    // Finally, the reduce job should be running as task set 4; make it see a fetch failure,
    // then make it run again and succeed
    assert(taskSets(4).stageId === 2)
    complete(taskSets(4), Seq(
      (Success, 52),
      (FetchFailed(makeBlockManagerId("hostD"), dep2.shuffleId, 0, 0, "ignored"), null)))
    scheduler.resubmitFailedStages()

    // TaskSet 5 will rerun stage 1's lost task, then TaskSet 6 will rerun stage 2
    assert(taskSets(5).stageId === 1)
    complete(taskSets(5), Seq(
      (Success, makeMapStatus("hostE", rdd2.partitions.length))))
    complete(taskSets(6), Seq(
      (Success, 53)))
    assert(listener3.results === Map(0 -> 52, 1 -> 53))
    assertDataStructuresEmpty()
  }

  /**
   * In this test, we run a map stage where one of the executors fails but we still receive a
   * "zombie" complete message from that executor. We want to make sure the stage is not reported
   * as done until all tasks have completed.
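   *
   * Completions from the lost executor arrive with a stale epoch, so the scheduler must
   * ignore them rather than count them toward the stage's pending partitions.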
   */
  test("map stage submission with executor failure late map task completions") {
    val shuffleMapRdd = new MyRDD(sc, 3, Nil)
    val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(2))

    submitMapStage(shuffleDep)

    val oldTaskSet = taskSets(0)
    runEvent(makeCompletionEvent(oldTaskSet.tasks(0), Success, makeMapStatus("hostA", 2)))
    assert(results.size === 0) // Map stage job should not be complete yet

    // Pretend host A was lost
    val oldEpoch = mapOutputTracker.getEpoch
    runEvent(ExecutorLost("exec-hostA", ExecutorKilled))
    val newEpoch = mapOutputTracker.getEpoch
    assert(newEpoch > oldEpoch)

    // Suppose we also get a completed event from task 1 on the same host; this should be ignored
    runEvent(makeCompletionEvent(oldTaskSet.tasks(1), Success, makeMapStatus("hostA", 2)))
    assert(results.size === 0) // Map stage job should not be complete yet

    // A completion from another task should work because it's a non-failed host
    runEvent(makeCompletionEvent(oldTaskSet.tasks(2), Success, makeMapStatus("hostB", 2)))
    assert(results.size === 0) // Map stage job should not be complete yet

    // Now complete tasks in the second task set
    val newTaskSet = taskSets(1)
    assert(newTaskSet.tasks.size === 2) // Both tasks 0 and 1 were on hostA
    runEvent(makeCompletionEvent(newTaskSet.tasks(0), Success, makeMapStatus("hostB", 2)))
    assert(results.size === 0) // Map stage job should not be complete yet
    runEvent(makeCompletionEvent(newTaskSet.tasks(1), Success, makeMapStatus("hostB", 2)))
    assert(results.size === 1) // Map stage job should now finally be complete
    assertDataStructuresEmpty()

    // Also test that a reduce stage using this shuffled data can immediately run
    val reduceRDD = new MyRDD(sc, 2, List(shuffleDep), tracker = mapOutputTracker)
    results.clear()
    submit(reduceRDD, Array(0, 1))
    complete(taskSets(2), Seq((Success, 42), (Success, 43)))
    assert(results === Map(0 -> 42, 1 -> 43))
    results.clear()
    assertDataStructuresEmpty()
  }

  /**
   * Checks the DAGScheduler's internal logic for traversing an RDD DAG by making sure that
   * getShuffleDependencies correctly returns the direct shuffle dependencies of a particular
   * RDD. The test creates the following RDD graph (where n denotes a narrow dependency and s
   * denotes a shuffle dependency):
   *
   *   A <------------s---------,
   *                             \
   *   B <--s-- C <--s-- D <--n---`-- E
   *
   * Here, the direct shuffle dependency of C is just the shuffle dependency on B. The direct
   * shuffle dependencies of E are the shuffle dependency on A and the shuffle dependency on C.
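   *
   * In other words, the traversal stops at the first shuffle dependency on each path, so
   * transitive shuffle ancestors (such as B's dependency, when starting from D or E) are
   * not returned.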
   */
  test("getShuffleDependencies correctly returns only direct shuffle parents") {
    val rddA = new MyRDD(sc, 2, Nil)
    val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(1))
    val rddB = new MyRDD(sc, 2, Nil)
    val shuffleDepB = new ShuffleDependency(rddB, new HashPartitioner(1))
    val rddC = new MyRDD(sc, 1, List(shuffleDepB))
    val shuffleDepC = new ShuffleDependency(rddC, new HashPartitioner(1))
    val rddD = new MyRDD(sc, 1, List(shuffleDepC))
    val narrowDepD = new OneToOneDependency(rddD)
    val rddE = new MyRDD(sc, 1, List(shuffleDepA, narrowDepD), tracker = mapOutputTracker)

    assert(scheduler.getShuffleDependencies(rddA) === Set())
    assert(scheduler.getShuffleDependencies(rddB) === Set())
    assert(scheduler.getShuffleDependencies(rddC) === Set(shuffleDepB))
    assert(scheduler.getShuffleDependencies(rddD) === Set(shuffleDepC))
    assert(scheduler.getShuffleDependencies(rddE) === Set(shuffleDepA, shuffleDepC))
  }

  test("SPARK-17644: After one stage is aborted for too many failed attempts, subsequent stages " +
    "still behave correctly on fetch failures") {
    // Runs a job that always encounters a fetch failure, so should eventually be aborted
    def runJobWithPersistentFetchFailure: Unit = {
      val rdd1 = sc.makeRDD(Array(1, 2, 3, 4), 2).map(x => (x, 1)).groupByKey()
      val shuffleHandle =
        rdd1.dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]].shuffleHandle
      rdd1.map {
        case (x, _) if (x == 1) =>
          throw new FetchFailedException(
            BlockManagerId("1", "1", 1), shuffleHandle.shuffleId, 0, 0, "test")
        case (x, _) => x
      }.count()
    }

    // Runs a job that encounters a single fetch failure but succeeds on the second attempt
    def runJobWithTemporaryFetchFailure: Unit = {
      val rdd1 = sc.makeRDD(Array(1, 2, 3, 4), 2).map(x => (x, 1)).groupByKey()
      val shuffleHandle =
        rdd1.dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]].shuffleHandle
      rdd1.map {
        // FailThisAttempt lives in object DAGSchedulerSuite so that the task closure refers
        // to the JVM-wide singleton rather than capturing a serialized copy of the flag;
        // only the first attempt at the partition containing key 1 injects the failure.
        case (x, _) if (x == 1) && FailThisAttempt._fail.getAndSet(false) =>
          throw new FetchFailedException(
            BlockManagerId("1", "1", 1), shuffleHandle.shuffleId, 0, 0, "test")
        case (x, _) => x
      }.count()
    }

    failAfter(10.seconds) {
      val e = intercept[SparkException] {
        runJobWithPersistentFetchFailure
      }
      assert(e.getMessage.contains("org.apache.spark.shuffle.FetchFailedException"))
    }

    // Run a second job that will fail due to a fetch failure.
    // This job will hang without the fix for SPARK-17644.
    failAfter(10.seconds) {
      val e = intercept[SparkException] {
        runJobWithPersistentFetchFailure
      }
      assert(e.getMessage.contains("org.apache.spark.shuffle.FetchFailedException"))
    }

    failAfter(10.seconds) {
      try {
        runJobWithTemporaryFetchFailure
      } catch {
        case e: Throwable => fail("A job with one fetch failure should eventually succeed")
      }
    }
  }

  /**
   * Assert that the supplied TaskSet has exactly the given hosts as its preferred locations.
   * Note that this checks only the host and not the executor ID.
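   * Locations are compared as sets, so the ordering of the expected hosts does not matter.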
   */
  private def assertLocations(taskSet: TaskSet, hosts: Seq[Seq[String]]): Unit = {
    assert(hosts.size === taskSet.tasks.size)
    for ((taskLocs, expectedLocs) <- taskSet.tasks.map(_.preferredLocations).zip(hosts)) {
      assert(taskLocs.map(_.host).toSet === expectedLocs.toSet)
    }
  }

  private def assertDataStructuresEmpty(): Unit = {
    assert(scheduler.activeJobs.isEmpty)
    assert(scheduler.failedStages.isEmpty)
    assert(scheduler.jobIdToActiveJob.isEmpty)
    assert(scheduler.jobIdToStageIds.isEmpty)
    assert(scheduler.stageIdToStage.isEmpty)
    assert(scheduler.runningStages.isEmpty)
    assert(scheduler.shuffleIdToMapStage.isEmpty)
    assert(scheduler.waitingStages.isEmpty)
    assert(scheduler.outputCommitCoordinator.isEmpty)
  }

  // Nothing in this test should break if the task info's fields are null, but
  // OutputCommitCoordinator requires the task info itself to not be null.
  private def createFakeTaskInfo(): TaskInfo = {
    val info = new TaskInfo(0, 0, 0, 0L, "", "", TaskLocality.ANY, false)
    info.finishTime = 1 // to prevent spurious errors in JobProgressListener
    info
  }

  private def createFakeTaskInfoWithId(taskId: Long): TaskInfo = {
    val info = new TaskInfo(taskId, 0, 0, 0L, "", "", TaskLocality.ANY, false)
    info.finishTime = 1 // to prevent spurious errors in JobProgressListener
    info
  }

  private def makeCompletionEvent(
      task: Task[_],
      reason: TaskEndReason,
      result: Any,
      extraAccumUpdates: Seq[AccumulatorV2[_, _]] = Seq.empty,
      taskInfo: TaskInfo = createFakeTaskInfo()): CompletionEvent = {
    val accumUpdates = reason match {
      case Success => task.metrics.accumulators()
      case ef: ExceptionFailure => ef.accums
      case _ => Seq.empty
    }
    CompletionEvent(task, reason, result, accumUpdates ++ extraAccumUpdates, taskInfo)
  }
}

object DAGSchedulerSuite {
  // One-shot failure flag for the SPARK-17644 test above. Defined here so that task closures
  // reference this JVM-wide singleton instead of capturing (and copying) the flag.
  object FailThisAttempt {
    val _fail = new AtomicBoolean(true)
  }

  def makeMapStatus(host: String, reduces: Int, sizes: Byte = 2): MapStatus =
    MapStatus(makeBlockManagerId(host), Array.fill[Long](reduces)(sizes))

  def makeBlockManagerId(host: String): BlockManagerId =
    BlockManagerId("exec-" + host, host, 12345)
}