1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements.  See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License.  You may obtain a copy of the License at
8 *
9 *    http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18package org.apache.spark.scheduler
19
20import java.util.Properties
21import java.util.concurrent.atomic.AtomicBoolean
22
23import scala.annotation.meta.param
24import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Map}
25import scala.language.reflectiveCalls
26import scala.util.control.NonFatal
27
28import org.scalatest.concurrent.Timeouts
29import org.scalatest.time.SpanSugar._
30
31import org.apache.spark._
32import org.apache.spark.broadcast.BroadcastManager
33import org.apache.spark.rdd.RDD
34import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
35import org.apache.spark.shuffle.{FetchFailedException, MetadataFetchFailedException}
36import org.apache.spark.storage.{BlockId, BlockManagerId, BlockManagerMaster}
37import org.apache.spark.util.{AccumulatorContext, AccumulatorV2, CallSite, LongAccumulator, Utils}
38
39class DAGSchedulerEventProcessLoopTester(dagScheduler: DAGScheduler)
40  extends DAGSchedulerEventProcessLoop(dagScheduler) {
41
42  override def post(event: DAGSchedulerEvent): Unit = {
43    try {
44      // Forward event to `onReceive` directly to avoid processing event asynchronously.
45      onReceive(event)
46    } catch {
47      case NonFatal(e) => onError(e)
48    }
49  }
50
51  override def onError(e: Throwable): Unit = {
52    logError("Error in DAGSchedulerEventLoop: ", e)
53    dagScheduler.stop()
54    throw e
55  }
56
57}
58
59/**
60 * An RDD for passing to DAGScheduler. These RDDs will use the dependencies and
61 * preferredLocations (if any) that are passed to them. They are deliberately not executable
62 * so we can test that DAGScheduler does not try to execute RDDs locally.
63 *
64 * Optionally, one can pass in a list of locations to use as preferred locations for each task,
65 * and a MapOutputTrackerMaster to enable reduce task locality. We pass the tracker separately
66 * because, in this test suite, it won't be the same as sc.env.mapOutputTracker.
67 */
68class MyRDD(
69    sc: SparkContext,
70    numPartitions: Int,
71    dependencies: List[Dependency[_]],
72    locations: Seq[Seq[String]] = Nil,
73    @(transient @param) tracker: MapOutputTrackerMaster = null)
74  extends RDD[(Int, Int)](sc, dependencies) with Serializable {
75
76  override def compute(split: Partition, context: TaskContext): Iterator[(Int, Int)] =
77    throw new RuntimeException("should not be reached")
78
79  override def getPartitions: Array[Partition] = (0 until numPartitions).map(i => new Partition {
80    override def index: Int = i
81  }).toArray
82
83  override def getPreferredLocations(partition: Partition): Seq[String] = {
84    if (locations.isDefinedAt(partition.index)) {
85      locations(partition.index)
86    } else if (tracker != null && dependencies.size == 1 &&
87        dependencies(0).isInstanceOf[ShuffleDependency[_, _, _]]) {
88      // If we have only one shuffle dependency, use the same code path as ShuffledRDD for locality
89      val dep = dependencies(0).asInstanceOf[ShuffleDependency[_, _, _]]
90      tracker.getPreferredLocationsForShuffle(dep, partition.index)
91    } else {
92      Nil
93    }
94  }
95
96  override def toString: String = "DAGSchedulerSuiteRDD " + id
97}
98
99class DAGSchedulerSuiteDummyException extends Exception
100
101class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeouts {
102
103  import DAGSchedulerSuite._
104
105  val conf = new SparkConf
106  /** Set of TaskSets the DAGScheduler has requested executed. */
107  val taskSets = scala.collection.mutable.Buffer[TaskSet]()
108
109  /** Stages for which the DAGScheduler has called TaskScheduler.cancelTasks(). */
110  val cancelledStages = new HashSet[Int]()
111
112  val taskScheduler = new TaskScheduler() {
113    override def rootPool: Pool = null
114    override def schedulingMode: SchedulingMode = SchedulingMode.NONE
115    override def start() = {}
116    override def stop() = {}
117    override def executorHeartbeatReceived(
118        execId: String,
119        accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])],
120        blockManagerId: BlockManagerId): Boolean = true
121    override def submitTasks(taskSet: TaskSet) = {
122      // normally done by TaskSetManager
123      taskSet.tasks.foreach(_.epoch = mapOutputTracker.getEpoch)
124      taskSets += taskSet
125    }
126    override def cancelTasks(stageId: Int, interruptThread: Boolean) {
127      cancelledStages += stageId
128    }
129    override def setDAGScheduler(dagScheduler: DAGScheduler) = {}
130    override def defaultParallelism() = 2
131    override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {}
132    override def applicationAttemptId(): Option[String] = None
133  }
134
135  /** Length of time to wait while draining listener events. */
136  val WAIT_TIMEOUT_MILLIS = 10000
137  val sparkListener = new SparkListener() {
138    val submittedStageInfos = new HashSet[StageInfo]
139    val successfulStages = new HashSet[Int]
140    val failedStages = new ArrayBuffer[Int]
141    val stageByOrderOfExecution = new ArrayBuffer[Int]
142    val endedTasks = new HashSet[Long]
143
144    override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) {
145      submittedStageInfos += stageSubmitted.stageInfo
146    }
147
148    override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) {
149      val stageInfo = stageCompleted.stageInfo
150      stageByOrderOfExecution += stageInfo.stageId
151      if (stageInfo.failureReason.isEmpty) {
152        successfulStages += stageInfo.stageId
153      } else {
154        failedStages += stageInfo.stageId
155      }
156    }
157
158    override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
159      endedTasks += taskEnd.taskInfo.taskId
160    }
161  }
162
163  var mapOutputTracker: MapOutputTrackerMaster = null
164  var broadcastManager: BroadcastManager = null
165  var securityMgr: SecurityManager = null
166  var scheduler: DAGScheduler = null
167  var dagEventProcessLoopTester: DAGSchedulerEventProcessLoop = null
168
169  /**
170   * Set of cache locations to return from our mock BlockManagerMaster.
171   * Keys are (rdd ID, partition ID). Anything not present will return an empty
172   * list of cache locations silently.
173   */
174  val cacheLocations = new HashMap[(Int, Int), Seq[BlockManagerId]]
175  // stub out BlockManagerMaster.getLocations to use our cacheLocations
176  val blockManagerMaster = new BlockManagerMaster(null, conf, true) {
177      override def getLocations(blockIds: Array[BlockId]): IndexedSeq[Seq[BlockManagerId]] = {
178        blockIds.map {
179          _.asRDDId.map(id => (id.rddId -> id.splitIndex)).flatMap(key => cacheLocations.get(key)).
180            getOrElse(Seq())
181        }.toIndexedSeq
182      }
183      override def removeExecutor(execId: String) {
184        // don't need to propagate to the driver, which we don't have
185      }
186    }
187
188  /** The list of results that DAGScheduler has collected. */
189  val results = new HashMap[Int, Any]()
190  var failure: Exception = _
191  val jobListener = new JobListener() {
192    override def taskSucceeded(index: Int, result: Any) = results.put(index, result)
193    override def jobFailed(exception: Exception) = { failure = exception }
194  }
195
196  /** A simple helper class for creating custom JobListeners */
197  class SimpleListener extends JobListener {
198    val results = new HashMap[Int, Any]
199    var failure: Exception = null
200    override def taskSucceeded(index: Int, result: Any): Unit = results.put(index, result)
201    override def jobFailed(exception: Exception): Unit = { failure = exception }
202  }
203
204  override def beforeEach(): Unit = {
205    super.beforeEach()
206    init(new SparkConf())
207  }
208
209  private def init(testConf: SparkConf): Unit = {
210    sc = new SparkContext("local", "DAGSchedulerSuite", testConf)
211    sparkListener.submittedStageInfos.clear()
212    sparkListener.successfulStages.clear()
213    sparkListener.failedStages.clear()
214    sparkListener.endedTasks.clear()
215    failure = null
216    sc.addSparkListener(sparkListener)
217    taskSets.clear()
218    cancelledStages.clear()
219    cacheLocations.clear()
220    results.clear()
221    securityMgr = new SecurityManager(conf)
222    broadcastManager = new BroadcastManager(true, conf, securityMgr)
223    mapOutputTracker = new MapOutputTrackerMaster(conf, broadcastManager, true) {
224      override def sendTracker(message: Any): Unit = {
225        // no-op, just so we can stop this to avoid leaking threads
226      }
227    }
228    scheduler = new DAGScheduler(
229      sc,
230      taskScheduler,
231      sc.listenerBus,
232      mapOutputTracker,
233      blockManagerMaster,
234      sc.env)
235    dagEventProcessLoopTester = new DAGSchedulerEventProcessLoopTester(scheduler)
236  }
237
238  override def afterEach(): Unit = {
239    try {
240      scheduler.stop()
241      dagEventProcessLoopTester.stop()
242      mapOutputTracker.stop()
243      broadcastManager.stop()
244    } finally {
245      super.afterEach()
246    }
247  }
248
249  override def afterAll() {
250    super.afterAll()
251  }
252
253  /**
254   * Type of RDD we use for testing. Note that we should never call the real RDD compute methods.
255   * This is a pair RDD type so it can always be used in ShuffleDependencies.
256   */
257  type PairOfIntsRDD = RDD[(Int, Int)]
258
259  /**
260   * Process the supplied event as if it were the top of the DAGScheduler event queue, expecting
261   * the scheduler not to exit.
262   *
263   * After processing the event, submit waiting stages as is done on most iterations of the
264   * DAGScheduler event loop.
265   */
266  private def runEvent(event: DAGSchedulerEvent) {
267    dagEventProcessLoopTester.post(event)
268  }
269
270  /**
271   * When we submit dummy Jobs, this is the compute function we supply. Except in a local test
272   * below, we do not expect this function to ever be executed; instead, we will return results
273   * directly through CompletionEvents.
274   */
275  private val jobComputeFunc = (context: TaskContext, it: Iterator[(_)]) =>
276    it.next.asInstanceOf[Tuple2[_, _]]._1
277
278  /** Send the given CompletionEvent messages for the tasks in the TaskSet. */
279  private def complete(taskSet: TaskSet, results: Seq[(TaskEndReason, Any)]) {
280    assert(taskSet.tasks.size >= results.size)
281    for ((result, i) <- results.zipWithIndex) {
282      if (i < taskSet.tasks.size) {
283        runEvent(makeCompletionEvent(taskSet.tasks(i), result._1, result._2))
284      }
285    }
286  }
287
288  private def completeWithAccumulator(
289      accumId: Long,
290      taskSet: TaskSet,
291      results: Seq[(TaskEndReason, Any)]) {
292    assert(taskSet.tasks.size >= results.size)
293    for ((result, i) <- results.zipWithIndex) {
294      if (i < taskSet.tasks.size) {
295        runEvent(makeCompletionEvent(
296          taskSet.tasks(i),
297          result._1,
298          result._2,
299          Seq(AccumulatorSuite.createLongAccum("", initValue = 1, id = accumId))))
300      }
301    }
302  }
303
304  /** Submits a job to the scheduler and returns the job id. */
305  private def submit(
306      rdd: RDD[_],
307      partitions: Array[Int],
308      func: (TaskContext, Iterator[_]) => _ = jobComputeFunc,
309      listener: JobListener = jobListener,
310      properties: Properties = null): Int = {
311    val jobId = scheduler.nextJobId.getAndIncrement()
312    runEvent(JobSubmitted(jobId, rdd, func, partitions, CallSite("", ""), listener, properties))
313    jobId
314  }
315
316  /** Submits a map stage to the scheduler and returns the job id. */
317  private def submitMapStage(
318      shuffleDep: ShuffleDependency[_, _, _],
319      listener: JobListener = jobListener): Int = {
320    val jobId = scheduler.nextJobId.getAndIncrement()
321    runEvent(MapStageSubmitted(jobId, shuffleDep, CallSite("", ""), listener))
322    jobId
323  }
324
325  /** Sends TaskSetFailed to the scheduler. */
326  private def failed(taskSet: TaskSet, message: String) {
327    runEvent(TaskSetFailed(taskSet, message, None))
328  }
329
330  /** Sends JobCancelled to the DAG scheduler. */
331  private def cancel(jobId: Int) {
332    runEvent(JobCancelled(jobId))
333  }
334
335  test("[SPARK-3353] parent stage should have lower stage id") {
336    sparkListener.stageByOrderOfExecution.clear()
337    sc.parallelize(1 to 10).map(x => (x, x)).reduceByKey(_ + _, 4).count()
338    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
339    assert(sparkListener.stageByOrderOfExecution.length === 2)
340    assert(sparkListener.stageByOrderOfExecution(0) < sparkListener.stageByOrderOfExecution(1))
341  }
342
343  /**
344   * This test ensures that DAGScheduler build stage graph correctly.
345   *
346   * Suppose you have the following DAG:
347   *
348   * [A] <--(s_A)-- [B] <--(s_B)-- [C] <--(s_C)-- [D]
349   *             \                /
350   *               <-------------
351   *
352   * Here, RDD B has a shuffle dependency on RDD A, and RDD C has shuffle dependency on both
353   * B and A. The shuffle dependency IDs are numbers in the DAGScheduler, but to make the example
354   * easier to understand, let's call the shuffled data from A shuffle dependency ID s_A and the
355   * shuffled data from B shuffle dependency ID s_B.
356   *
357   * Note: [] means an RDD, () means a shuffle dependency.
358   */
359  test("[SPARK-13902] Ensure no duplicate stages are created") {
360    val rddA = new MyRDD(sc, 1, Nil)
361    val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(1))
362    val s_A = shuffleDepA.shuffleId
363
364    val rddB = new MyRDD(sc, 1, List(shuffleDepA), tracker = mapOutputTracker)
365    val shuffleDepB = new ShuffleDependency(rddB, new HashPartitioner(1))
366    val s_B = shuffleDepB.shuffleId
367
368    val rddC = new MyRDD(sc, 1, List(shuffleDepA, shuffleDepB), tracker = mapOutputTracker)
369    val shuffleDepC = new ShuffleDependency(rddC, new HashPartitioner(1))
370    val s_C = shuffleDepC.shuffleId
371
372    val rddD = new MyRDD(sc, 1, List(shuffleDepC), tracker = mapOutputTracker)
373
374    submit(rddD, Array(0))
375
376    assert(scheduler.shuffleIdToMapStage.size === 3)
377    assert(scheduler.activeJobs.size === 1)
378
379    val mapStageA = scheduler.shuffleIdToMapStage(s_A)
380    val mapStageB = scheduler.shuffleIdToMapStage(s_B)
381    val mapStageC = scheduler.shuffleIdToMapStage(s_C)
382    val finalStage = scheduler.activeJobs.head.finalStage
383
384    assert(mapStageA.parents.isEmpty)
385    assert(mapStageB.parents === List(mapStageA))
386    assert(mapStageC.parents === List(mapStageA, mapStageB))
387    assert(finalStage.parents === List(mapStageC))
388
389    complete(taskSets(0), Seq((Success, makeMapStatus("hostA", 1))))
390    complete(taskSets(1), Seq((Success, makeMapStatus("hostA", 1))))
391    complete(taskSets(2), Seq((Success, makeMapStatus("hostA", 1))))
392    complete(taskSets(3), Seq((Success, 42)))
393    assert(results === Map(0 -> 42))
394    assertDataStructuresEmpty()
395  }
396
397  test("zero split job") {
398    var numResults = 0
399    var failureReason: Option[Exception] = None
400    val fakeListener = new JobListener() {
401      override def taskSucceeded(partition: Int, value: Any): Unit = numResults += 1
402      override def jobFailed(exception: Exception): Unit = {
403        failureReason = Some(exception)
404      }
405    }
406    val jobId = submit(new MyRDD(sc, 0, Nil), Array(), listener = fakeListener)
407    assert(numResults === 0)
408    cancel(jobId)
409    assert(failureReason.isDefined)
410    assert(failureReason.get.getMessage() === "Job 0 cancelled ")
411  }
412
413  test("run trivial job") {
414    submit(new MyRDD(sc, 1, Nil), Array(0))
415    complete(taskSets(0), List((Success, 42)))
416    assert(results === Map(0 -> 42))
417    assertDataStructuresEmpty()
418  }
419
420  test("run trivial job w/ dependency") {
421    val baseRdd = new MyRDD(sc, 1, Nil)
422    val finalRdd = new MyRDD(sc, 1, List(new OneToOneDependency(baseRdd)))
423    submit(finalRdd, Array(0))
424    complete(taskSets(0), Seq((Success, 42)))
425    assert(results === Map(0 -> 42))
426    assertDataStructuresEmpty()
427  }
428
429  test("equals and hashCode AccumulableInfo") {
430    val accInfo1 = new AccumulableInfo(
431      1, Some("a1"), Some("delta1"), Some("val1"), internal = true, countFailedValues = false)
432    val accInfo2 = new AccumulableInfo(
433      1, Some("a1"), Some("delta1"), Some("val1"), internal = false, countFailedValues = false)
434    val accInfo3 = new AccumulableInfo(
435      1, Some("a1"), Some("delta1"), Some("val1"), internal = false, countFailedValues = false)
436    assert(accInfo1 !== accInfo2)
437    assert(accInfo2 === accInfo3)
438    assert(accInfo2.hashCode() === accInfo3.hashCode())
439  }
440
441  test("cache location preferences w/ dependency") {
442    val baseRdd = new MyRDD(sc, 1, Nil).cache()
443    val finalRdd = new MyRDD(sc, 1, List(new OneToOneDependency(baseRdd)))
444    cacheLocations(baseRdd.id -> 0) =
445      Seq(makeBlockManagerId("hostA"), makeBlockManagerId("hostB"))
446    submit(finalRdd, Array(0))
447    val taskSet = taskSets(0)
448    assertLocations(taskSet, Seq(Seq("hostA", "hostB")))
449    complete(taskSet, Seq((Success, 42)))
450    assert(results === Map(0 -> 42))
451    assertDataStructuresEmpty()
452  }
453
454  test("regression test for getCacheLocs") {
455    val rdd = new MyRDD(sc, 3, Nil).cache()
456    cacheLocations(rdd.id -> 0) =
457      Seq(makeBlockManagerId("hostA"), makeBlockManagerId("hostB"))
458    cacheLocations(rdd.id -> 1) =
459      Seq(makeBlockManagerId("hostB"), makeBlockManagerId("hostC"))
460    cacheLocations(rdd.id -> 2) =
461      Seq(makeBlockManagerId("hostC"), makeBlockManagerId("hostD"))
462    val locs = scheduler.getCacheLocs(rdd).map(_.map(_.host))
463    assert(locs === Seq(Seq("hostA", "hostB"), Seq("hostB", "hostC"), Seq("hostC", "hostD")))
464  }
465
466  /**
467   * This test ensures that if a particular RDD is cached, RDDs earlier in the dependency chain
468   * are not computed. It constructs the following chain of dependencies:
469   * +---+ shuffle +---+    +---+    +---+
470   * | A |<--------| B |<---| C |<---| D |
471   * +---+         +---+    +---+    +---+
472   * Here, B is derived from A by performing a shuffle, C has a one-to-one dependency on B,
473   * and D similarly has a one-to-one dependency on C. If none of the RDDs were cached, this
474   * set of RDDs would result in a two stage job: one ShuffleMapStage, and a ResultStage that
475   * reads the shuffled data from RDD A. This test ensures that if C is cached, the scheduler
476   * doesn't perform a shuffle, and instead computes the result using a single ResultStage
477   * that reads C's cached data.
478   */
479  test("getMissingParentStages should consider all ancestor RDDs' cache statuses") {
480    val rddA = new MyRDD(sc, 1, Nil)
481    val rddB = new MyRDD(sc, 1, List(new ShuffleDependency(rddA, new HashPartitioner(1))),
482      tracker = mapOutputTracker)
483    val rddC = new MyRDD(sc, 1, List(new OneToOneDependency(rddB))).cache()
484    val rddD = new MyRDD(sc, 1, List(new OneToOneDependency(rddC)))
485    cacheLocations(rddC.id -> 0) =
486      Seq(makeBlockManagerId("hostA"), makeBlockManagerId("hostB"))
487    submit(rddD, Array(0))
488    assert(scheduler.runningStages.size === 1)
489    // Make sure that the scheduler is running the final result stage.
490    // Because C is cached, the shuffle map stage to compute A does not need to be run.
491    assert(scheduler.runningStages.head.isInstanceOf[ResultStage])
492  }
493
494  test("avoid exponential blowup when getting preferred locs list") {
495    // Build up a complex dependency graph with repeated zip operations, without preferred locations
496    var rdd: RDD[_] = new MyRDD(sc, 1, Nil)
497    (1 to 30).foreach(_ => rdd = rdd.zip(rdd))
498    // getPreferredLocs runs quickly, indicating that exponential graph traversal is avoided.
499    failAfter(10 seconds) {
500      val preferredLocs = scheduler.getPreferredLocs(rdd, 0)
501      // No preferred locations are returned.
502      assert(preferredLocs.length === 0)
503    }
504  }
505
506  test("unserializable task") {
507    val unserializableRdd = new MyRDD(sc, 1, Nil) {
508      class UnserializableClass
509      val unserializable = new UnserializableClass
510    }
511    submit(unserializableRdd, Array(0))
512    assert(failure.getMessage.startsWith(
513      "Job aborted due to stage failure: Task not serializable:"))
514    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
515    assert(sparkListener.failedStages.contains(0))
516    assert(sparkListener.failedStages.size === 1)
517    assertDataStructuresEmpty()
518  }
519
520  test("trivial job failure") {
521    submit(new MyRDD(sc, 1, Nil), Array(0))
522    failed(taskSets(0), "some failure")
523    assert(failure.getMessage === "Job aborted due to stage failure: some failure")
524    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
525    assert(sparkListener.failedStages.contains(0))
526    assert(sparkListener.failedStages.size === 1)
527    assertDataStructuresEmpty()
528  }
529
530  test("trivial job cancellation") {
531    val rdd = new MyRDD(sc, 1, Nil)
532    val jobId = submit(rdd, Array(0))
533    cancel(jobId)
534    assert(failure.getMessage === s"Job $jobId cancelled ")
535    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
536    assert(sparkListener.failedStages.contains(0))
537    assert(sparkListener.failedStages.size === 1)
538    assertDataStructuresEmpty()
539  }
540
541  test("job cancellation no-kill backend") {
542    // make sure that the DAGScheduler doesn't crash when the TaskScheduler
543    // doesn't implement killTask()
544    val noKillTaskScheduler = new TaskScheduler() {
545      override def rootPool: Pool = null
546      override def schedulingMode: SchedulingMode = SchedulingMode.NONE
547      override def start(): Unit = {}
548      override def stop(): Unit = {}
549      override def submitTasks(taskSet: TaskSet): Unit = {
550        taskSets += taskSet
551      }
552      override def cancelTasks(stageId: Int, interruptThread: Boolean) {
553        throw new UnsupportedOperationException
554      }
555      override def setDAGScheduler(dagScheduler: DAGScheduler): Unit = {}
556      override def defaultParallelism(): Int = 2
557      override def executorHeartbeatReceived(
558          execId: String,
559          accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])],
560          blockManagerId: BlockManagerId): Boolean = true
561      override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {}
562      override def applicationAttemptId(): Option[String] = None
563    }
564    val noKillScheduler = new DAGScheduler(
565      sc,
566      noKillTaskScheduler,
567      sc.listenerBus,
568      mapOutputTracker,
569      blockManagerMaster,
570      sc.env)
571    dagEventProcessLoopTester = new DAGSchedulerEventProcessLoopTester(noKillScheduler)
572    val jobId = submit(new MyRDD(sc, 1, Nil), Array(0))
573    cancel(jobId)
574    // Because the job wasn't actually cancelled, we shouldn't have received a failure message.
575    assert(failure === null)
576
577    // When the task set completes normally, state should be correctly updated.
578    complete(taskSets(0), Seq((Success, 42)))
579    assert(results === Map(0 -> 42))
580    assertDataStructuresEmpty()
581
582    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
583    assert(sparkListener.failedStages.isEmpty)
584    assert(sparkListener.successfulStages.contains(0))
585  }
586
587  test("run trivial shuffle") {
588    val shuffleMapRdd = new MyRDD(sc, 2, Nil)
589    val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(1))
590    val shuffleId = shuffleDep.shuffleId
591    val reduceRdd = new MyRDD(sc, 1, List(shuffleDep), tracker = mapOutputTracker)
592    submit(reduceRdd, Array(0))
593    complete(taskSets(0), Seq(
594      (Success, makeMapStatus("hostA", 1)),
595      (Success, makeMapStatus("hostB", 1))))
596    assert(mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0).map(_._1).toSet ===
597      HashSet(makeBlockManagerId("hostA"), makeBlockManagerId("hostB")))
598    complete(taskSets(1), Seq((Success, 42)))
599    assert(results === Map(0 -> 42))
600    assertDataStructuresEmpty()
601  }
602
603  test("run trivial shuffle with fetch failure") {
604    val shuffleMapRdd = new MyRDD(sc, 2, Nil)
605    val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(2))
606    val shuffleId = shuffleDep.shuffleId
607    val reduceRdd = new MyRDD(sc, 2, List(shuffleDep), tracker = mapOutputTracker)
608    submit(reduceRdd, Array(0, 1))
609    complete(taskSets(0), Seq(
610      (Success, makeMapStatus("hostA", reduceRdd.partitions.length)),
611      (Success, makeMapStatus("hostB", reduceRdd.partitions.length))))
612    // the 2nd ResultTask failed
613    complete(taskSets(1), Seq(
614      (Success, 42),
615      (FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0, 0, "ignored"), null)))
616    // this will get called
617    // blockManagerMaster.removeExecutor("exec-hostA")
618    // ask the scheduler to try it again
619    scheduler.resubmitFailedStages()
620    // have the 2nd attempt pass
621    complete(taskSets(2), Seq((Success, makeMapStatus("hostA", reduceRdd.partitions.length))))
622    // we can see both result blocks now
623    assert(mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0).map(_._1.host).toSet ===
624      HashSet("hostA", "hostB"))
625    complete(taskSets(3), Seq((Success, 43)))
626    assert(results === Map(0 -> 42, 1 -> 43))
627    assertDataStructuresEmpty()
628  }
629
630  private val shuffleFileLossTests = Seq(
631    ("slave lost with shuffle service", SlaveLost("", false), true, false),
632    ("worker lost with shuffle service", SlaveLost("", true), true, true),
633    ("worker lost without shuffle service", SlaveLost("", true), false, true),
634    ("executor failure with shuffle service", ExecutorKilled, true, false),
635    ("executor failure without shuffle service", ExecutorKilled, false, true))
636
637  for ((eventDescription, event, shuffleServiceOn, expectFileLoss) <- shuffleFileLossTests) {
638    val maybeLost = if (expectFileLoss) {
639      "lost"
640    } else {
641      "not lost"
642    }
643    test(s"shuffle files $maybeLost when $eventDescription") {
644      // reset the test context with the right shuffle service config
645      afterEach()
646      val conf = new SparkConf()
647      conf.set("spark.shuffle.service.enabled", shuffleServiceOn.toString)
648      init(conf)
649      assert(sc.env.blockManager.externalShuffleServiceEnabled == shuffleServiceOn)
650
651      val shuffleMapRdd = new MyRDD(sc, 2, Nil)
652      val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(1))
653      val shuffleId = shuffleDep.shuffleId
654      val reduceRdd = new MyRDD(sc, 1, List(shuffleDep), tracker = mapOutputTracker)
655      submit(reduceRdd, Array(0))
656      complete(taskSets(0), Seq(
657        (Success, makeMapStatus("hostA", 1)),
658        (Success, makeMapStatus("hostB", 1))))
659      runEvent(ExecutorLost("exec-hostA", event))
660      if (expectFileLoss) {
661        intercept[MetadataFetchFailedException] {
662          mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0)
663        }
664      } else {
665        assert(mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0).map(_._1).toSet ===
666          HashSet(makeBlockManagerId("hostA"), makeBlockManagerId("hostB")))
667      }
668    }
669  }
670
671  // Helper function to validate state when creating tests for task failures
672  private def checkStageId(stageId: Int, attempt: Int, stageAttempt: TaskSet) {
673    assert(stageAttempt.stageId === stageId)
674    assert(stageAttempt.stageAttemptId == attempt)
675  }
676
677  // Helper functions to extract commonly used code in Fetch Failure test cases
678  private def setupStageAbortTest(sc: SparkContext) {
679    sc.listenerBus.addListener(new EndListener())
680    ended = false
681    jobResult = null
682  }
683
684  // Create a new Listener to confirm that the listenerBus sees the JobEnd message
685  // when we abort the stage. This message will also be consumed by the EventLoggingListener
686  // so this will propagate up to the user.
687  var ended = false
688  var jobResult : JobResult = null
689
690  class EndListener extends SparkListener {
691    override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
692      jobResult = jobEnd.jobResult
693      ended = true
694    }
695  }
696
697  /**
698   * Common code to get the next stage attempt, confirm it's the one we expect, and complete it
699   * successfully.
700   *
701   * @param stageId - The current stageId
702   * @param attemptIdx - The current attempt count
703   * @param numShufflePartitions - The number of partitions in the next stage
704   */
705  private def completeShuffleMapStageSuccessfully(
706      stageId: Int,
707      attemptIdx: Int,
708      numShufflePartitions: Int): Unit = {
709    val stageAttempt = taskSets.last
710    checkStageId(stageId, attemptIdx, stageAttempt)
711    complete(stageAttempt, stageAttempt.tasks.zipWithIndex.map {
712      case (task, idx) =>
713        (Success, makeMapStatus("host" + ('A' + idx).toChar, numShufflePartitions))
714    }.toSeq)
715  }
716
717  /**
718   * Common code to get the next stage attempt, confirm it's the one we expect, and complete it
719   * with all FetchFailure.
720   *
721   * @param stageId - The current stageId
722   * @param attemptIdx - The current attempt count
723   * @param shuffleDep - The shuffle dependency of the stage with a fetch failure
724   */
725  private def completeNextStageWithFetchFailure(
726      stageId: Int,
727      attemptIdx: Int,
728      shuffleDep: ShuffleDependency[_, _, _]): Unit = {
729    val stageAttempt = taskSets.last
730    checkStageId(stageId, attemptIdx, stageAttempt)
731    complete(stageAttempt, stageAttempt.tasks.zipWithIndex.map { case (task, idx) =>
732      (FetchFailed(makeBlockManagerId("hostA"), shuffleDep.shuffleId, 0, idx, "ignored"), null)
733    }.toSeq)
734  }
735
736  /**
737   * Common code to get the next result stage attempt, confirm it's the one we expect, and
738   * complete it with a success where we return 42.
739   *
740   * @param stageId - The current stageId
741   * @param attemptIdx - The current attempt count
742   */
743  private def completeNextResultStageWithSuccess(
744      stageId: Int,
745      attemptIdx: Int,
746      partitionToResult: Int => Int = _ => 42): Unit = {
747    val stageAttempt = taskSets.last
748    checkStageId(stageId, attemptIdx, stageAttempt)
749    assert(scheduler.stageIdToStage(stageId).isInstanceOf[ResultStage])
750    val taskResults = stageAttempt.tasks.zipWithIndex.map { case (task, idx) =>
751      (Success, partitionToResult(idx))
752    }
753    complete(stageAttempt, taskResults.toSeq)
754  }
755
756  /**
757   * In this test, we simulate a job where many tasks in the same stage fail. We want to show
758   * that many fetch failures inside a single stage attempt do not trigger an abort
759   * on their own, but only when there are enough failing stage attempts.
760   */
761  test("Single stage fetch failure should not abort the stage.") {
762    setupStageAbortTest(sc)
763
764    val parts = 8
765    val shuffleMapRdd = new MyRDD(sc, parts, Nil)
766    val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(parts))
767    val reduceRdd = new MyRDD(sc, parts, List(shuffleDep), tracker = mapOutputTracker)
768    submit(reduceRdd, (0 until parts).toArray)
769
770    completeShuffleMapStageSuccessfully(0, 0, numShufflePartitions = parts)
771
772    completeNextStageWithFetchFailure(1, 0, shuffleDep)
773
774    // Resubmit and confirm that now all is well
775    scheduler.resubmitFailedStages()
776
777    assert(scheduler.runningStages.nonEmpty)
778    assert(!ended)
779
780    // Complete stage 0 and then stage 1 with a "42"
781    completeShuffleMapStageSuccessfully(0, 1, numShufflePartitions = parts)
782    completeNextResultStageWithSuccess(1, 1)
783
784    // Confirm job finished successfully
785    sc.listenerBus.waitUntilEmpty(1000)
786    assert(ended === true)
787    assert(results === (0 until parts).map { idx => idx -> 42 }.toMap)
788    assertDataStructuresEmpty()
789  }
790
791  /**
792   * In this test we simulate a job failure where the first stage completes successfully and
793   * the second stage fails due to a fetch failure. Multiple successive fetch failures of a stage
794   * trigger an overall job abort to avoid endless retries.
795   */
796  test("Multiple consecutive stage fetch failures should lead to job being aborted.") {
797    setupStageAbortTest(sc)
798
799    val shuffleMapRdd = new MyRDD(sc, 2, Nil)
800    val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(2))
801    val reduceRdd = new MyRDD(sc, 2, List(shuffleDep), tracker = mapOutputTracker)
802    submit(reduceRdd, Array(0, 1))
803
804    for (attempt <- 0 until Stage.MAX_CONSECUTIVE_FETCH_FAILURES) {
805      // Complete all the tasks for the current attempt of stage 0 successfully
806      completeShuffleMapStageSuccessfully(0, attempt, numShufflePartitions = 2)
807
808      // Now we should have a new taskSet, for a new attempt of stage 1.
809      // Fail all these tasks with FetchFailure
810      completeNextStageWithFetchFailure(1, attempt, shuffleDep)
811
812      // this will trigger a resubmission of stage 0, since we've lost some of its
813      // map output, for the next iteration through the loop
814      scheduler.resubmitFailedStages()
815
816      if (attempt < Stage.MAX_CONSECUTIVE_FETCH_FAILURES - 1) {
817        assert(scheduler.runningStages.nonEmpty)
818        assert(!ended)
819      } else {
820        // Stage should have been aborted and removed from running stages
821        assertDataStructuresEmpty()
822        sc.listenerBus.waitUntilEmpty(1000)
823        assert(ended)
824        jobResult match {
825          case JobFailed(reason) =>
826            assert(reason.getMessage.contains("ResultStage 1 () has failed the maximum"))
827          case other => fail(s"expected JobFailed, not $other")
828        }
829      }
830    }
831  }
832
833  /**
834   * In this test, we create a job with two consecutive shuffles, and simulate 2 failures for each
835   * shuffle fetch. In total In total, the job has had four failures overall but not four failures
836   * for a particular stage, and as such should not be aborted.
837   */
838  test("Failures in different stages should not trigger an overall abort") {
839    setupStageAbortTest(sc)
840
841    val shuffleOneRdd = new MyRDD(sc, 2, Nil).cache()
842    val shuffleDepOne = new ShuffleDependency(shuffleOneRdd, new HashPartitioner(2))
843    val shuffleTwoRdd = new MyRDD(sc, 2, List(shuffleDepOne), tracker = mapOutputTracker).cache()
844    val shuffleDepTwo = new ShuffleDependency(shuffleTwoRdd, new HashPartitioner(1))
845    val finalRdd = new MyRDD(sc, 1, List(shuffleDepTwo), tracker = mapOutputTracker)
846    submit(finalRdd, Array(0))
847
848    // In the first two iterations, Stage 0 succeeds and stage 1 fails. In the next two iterations,
849    // stage 2 fails.
850    for (attempt <- 0 until Stage.MAX_CONSECUTIVE_FETCH_FAILURES) {
851      // Complete all the tasks for the current attempt of stage 0 successfully
852      completeShuffleMapStageSuccessfully(0, attempt, numShufflePartitions = 2)
853
854      if (attempt < Stage.MAX_CONSECUTIVE_FETCH_FAILURES / 2) {
855        // Now we should have a new taskSet, for a new attempt of stage 1.
856        // Fail all these tasks with FetchFailure
857        completeNextStageWithFetchFailure(1, attempt, shuffleDepOne)
858      } else {
859        completeShuffleMapStageSuccessfully(1, attempt, numShufflePartitions = 1)
860
861        // Fail stage 2
862        completeNextStageWithFetchFailure(2, attempt - Stage.MAX_CONSECUTIVE_FETCH_FAILURES / 2,
863          shuffleDepTwo)
864      }
865
866      // this will trigger a resubmission of stage 0, since we've lost some of its
867      // map output, for the next iteration through the loop
868      scheduler.resubmitFailedStages()
869    }
870
871    completeShuffleMapStageSuccessfully(0, 4, numShufflePartitions = 2)
872    completeShuffleMapStageSuccessfully(1, 4, numShufflePartitions = 1)
873
874    // Succeed stage2 with a "42"
875    completeNextResultStageWithSuccess(2, Stage.MAX_CONSECUTIVE_FETCH_FAILURES/2)
876
877    assert(results === Map(0 -> 42))
878    assertDataStructuresEmpty()
879  }
880
881  /**
882   * In this test we demonstrate that only consecutive failures trigger a stage abort. A stage may
883   * fail multiple times, succeed, then fail a few more times (because its run again by downstream
884   * dependencies). The total number of failed attempts for one stage will go over the limit,
885   * but that doesn't matter, since they have successes in the middle.
886   */
887  test("Non-consecutive stage failures don't trigger abort") {
888    setupStageAbortTest(sc)
889
890    val shuffleOneRdd = new MyRDD(sc, 2, Nil).cache()
891    val shuffleDepOne = new ShuffleDependency(shuffleOneRdd, new HashPartitioner(2))
892    val shuffleTwoRdd = new MyRDD(sc, 2, List(shuffleDepOne), tracker = mapOutputTracker).cache()
893    val shuffleDepTwo = new ShuffleDependency(shuffleTwoRdd, new HashPartitioner(1))
894    val finalRdd = new MyRDD(sc, 1, List(shuffleDepTwo), tracker = mapOutputTracker)
895    submit(finalRdd, Array(0))
896
897    // First, execute stages 0 and 1, failing stage 1 up to MAX-1 times.
898    for (attempt <- 0 until Stage.MAX_CONSECUTIVE_FETCH_FAILURES - 1) {
899      // Make each task in stage 0 success
900      completeShuffleMapStageSuccessfully(0, attempt, numShufflePartitions = 2)
901
902      // Now we should have a new taskSet, for a new attempt of stage 1.
903      // Fail these tasks with FetchFailure
904      completeNextStageWithFetchFailure(1, attempt, shuffleDepOne)
905
906      scheduler.resubmitFailedStages()
907
908      // Confirm we have not yet aborted
909      assert(scheduler.runningStages.nonEmpty)
910      assert(!ended)
911    }
912
913    // Rerun stage 0 and 1 to step through the task set
914    completeShuffleMapStageSuccessfully(0, 3, numShufflePartitions = 2)
915    completeShuffleMapStageSuccessfully(1, 3, numShufflePartitions = 1)
916
917    // Fail stage 2 so that stage 1 is resubmitted when we call scheduler.resubmitFailedStages()
918    completeNextStageWithFetchFailure(2, 0, shuffleDepTwo)
919
920    scheduler.resubmitFailedStages()
921
922    // Rerun stage 0 to step through the task set
923    completeShuffleMapStageSuccessfully(0, 4, numShufflePartitions = 2)
924
925    // Now again, fail stage 1 (up to MAX_FAILURES) but confirm that this doesn't trigger an abort
926    // since we succeeded in between.
927    completeNextStageWithFetchFailure(1, 4, shuffleDepOne)
928
929    scheduler.resubmitFailedStages()
930
931    // Confirm we have not yet aborted
932    assert(scheduler.runningStages.nonEmpty)
933    assert(!ended)
934
935    // Next, succeed all and confirm output
936    // Rerun stage 0 + 1
937    completeShuffleMapStageSuccessfully(0, 5, numShufflePartitions = 2)
938    completeShuffleMapStageSuccessfully(1, 5, numShufflePartitions = 1)
939
940    // Succeed stage 2 and verify results
941    completeNextResultStageWithSuccess(2, 1)
942
943    assertDataStructuresEmpty()
944    sc.listenerBus.waitUntilEmpty(1000)
945    assert(ended === true)
946    assert(results === Map(0 -> 42))
947  }
948
949  test("trivial shuffle with multiple fetch failures") {
950    val shuffleMapRdd = new MyRDD(sc, 2, Nil)
951    val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(2))
952    val shuffleId = shuffleDep.shuffleId
953    val reduceRdd = new MyRDD(sc, 2, List(shuffleDep), tracker = mapOutputTracker)
954    submit(reduceRdd, Array(0, 1))
955    complete(taskSets(0), Seq(
956      (Success, makeMapStatus("hostA", reduceRdd.partitions.length)),
957      (Success, makeMapStatus("hostB", reduceRdd.partitions.length))))
958    // The MapOutputTracker should know about both map output locations.
959    assert(mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0).map(_._1.host).toSet ===
960      HashSet("hostA", "hostB"))
961
962    // The first result task fails, with a fetch failure for the output from the first mapper.
963    runEvent(makeCompletionEvent(
964      taskSets(1).tasks(0),
965      FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0, 0, "ignored"),
966      null))
967    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
968    assert(sparkListener.failedStages.contains(1))
969
970    // The second ResultTask fails, with a fetch failure for the output from the second mapper.
971    runEvent(makeCompletionEvent(
972      taskSets(1).tasks(0),
973      FetchFailed(makeBlockManagerId("hostA"), shuffleId, 1, 1, "ignored"),
974      null))
975    // The SparkListener should not receive redundant failure events.
976    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
977    assert(sparkListener.failedStages.size == 1)
978  }
979
980  /**
981   * This tests the case where another FetchFailed comes in while the map stage is getting
982   * re-run.
983   */
984  test("late fetch failures don't cause multiple concurrent attempts for the same map stage") {
985    val shuffleMapRdd = new MyRDD(sc, 2, Nil)
986    val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(2))
987    val shuffleId = shuffleDep.shuffleId
988    val reduceRdd = new MyRDD(sc, 2, List(shuffleDep), tracker = mapOutputTracker)
989    submit(reduceRdd, Array(0, 1))
990
991    val mapStageId = 0
992    def countSubmittedMapStageAttempts(): Int = {
993      sparkListener.submittedStageInfos.count(_.stageId == mapStageId)
994    }
995
996    // The map stage should have been submitted.
997    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
998    assert(countSubmittedMapStageAttempts() === 1)
999
1000    complete(taskSets(0), Seq(
1001      (Success, makeMapStatus("hostA", 2)),
1002      (Success, makeMapStatus("hostB", 2))))
1003    // The MapOutputTracker should know about both map output locations.
1004    assert(mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0).map(_._1.host).toSet ===
1005      HashSet("hostA", "hostB"))
1006    assert(mapOutputTracker.getMapSizesByExecutorId(shuffleId, 1).map(_._1.host).toSet ===
1007      HashSet("hostA", "hostB"))
1008
1009    // The first result task fails, with a fetch failure for the output from the first mapper.
1010    runEvent(makeCompletionEvent(
1011      taskSets(1).tasks(0),
1012      FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0, 0, "ignored"),
1013      null))
1014    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
1015    assert(sparkListener.failedStages.contains(1))
1016
1017    // Trigger resubmission of the failed map stage.
1018    runEvent(ResubmitFailedStages)
1019    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
1020
1021    // Another attempt for the map stage should have been submitted, resulting in 2 total attempts.
1022    assert(countSubmittedMapStageAttempts() === 2)
1023
1024    // The second ResultTask fails, with a fetch failure for the output from the second mapper.
1025    runEvent(makeCompletionEvent(
1026      taskSets(1).tasks(1),
1027      FetchFailed(makeBlockManagerId("hostB"), shuffleId, 1, 1, "ignored"),
1028      null))
1029
1030    // Another ResubmitFailedStages event should not result in another attempt for the map
1031    // stage being run concurrently.
1032    // NOTE: the actual ResubmitFailedStages may get called at any time during this, but it
1033    // shouldn't effect anything -- our calling it just makes *SURE* it gets called between the
1034    // desired event and our check.
1035    runEvent(ResubmitFailedStages)
1036    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
1037    assert(countSubmittedMapStageAttempts() === 2)
1038
1039  }
1040
1041  /**
1042   * This tests the case where a late FetchFailed comes in after the map stage has finished getting
1043   * retried and a new reduce stage starts running.
1044   */
1045  test("extremely late fetch failures don't cause multiple concurrent attempts for " +
1046    "the same stage") {
1047    val shuffleMapRdd = new MyRDD(sc, 2, Nil)
1048    val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(2))
1049    val shuffleId = shuffleDep.shuffleId
1050    val reduceRdd = new MyRDD(sc, 2, List(shuffleDep), tracker = mapOutputTracker)
1051    submit(reduceRdd, Array(0, 1))
1052
1053    def countSubmittedReduceStageAttempts(): Int = {
1054      sparkListener.submittedStageInfos.count(_.stageId == 1)
1055    }
1056    def countSubmittedMapStageAttempts(): Int = {
1057      sparkListener.submittedStageInfos.count(_.stageId == 0)
1058    }
1059
1060    // The map stage should have been submitted.
1061    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
1062    assert(countSubmittedMapStageAttempts() === 1)
1063
1064    // Complete the map stage.
1065    complete(taskSets(0), Seq(
1066      (Success, makeMapStatus("hostA", 2)),
1067      (Success, makeMapStatus("hostB", 2))))
1068
1069    // The reduce stage should have been submitted.
1070    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
1071    assert(countSubmittedReduceStageAttempts() === 1)
1072
1073    // The first result task fails, with a fetch failure for the output from the first mapper.
1074    runEvent(makeCompletionEvent(
1075      taskSets(1).tasks(0),
1076      FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0, 0, "ignored"),
1077      null))
1078
1079    // Trigger resubmission of the failed map stage and finish the re-started map task.
1080    runEvent(ResubmitFailedStages)
1081    complete(taskSets(2), Seq((Success, makeMapStatus("hostA", 1))))
1082
1083    // Because the map stage finished, another attempt for the reduce stage should have been
1084    // submitted, resulting in 2 total attempts for each the map and the reduce stage.
1085    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
1086    assert(countSubmittedMapStageAttempts() === 2)
1087    assert(countSubmittedReduceStageAttempts() === 2)
1088
1089    // A late FetchFailed arrives from the second task in the original reduce stage.
1090    runEvent(makeCompletionEvent(
1091      taskSets(1).tasks(1),
1092      FetchFailed(makeBlockManagerId("hostB"), shuffleId, 1, 1, "ignored"),
1093      null))
1094
1095    // Running ResubmitFailedStages shouldn't result in any more attempts for the map stage, because
1096    // the FetchFailed should have been ignored
1097    runEvent(ResubmitFailedStages)
1098
1099    // The FetchFailed from the original reduce stage should be ignored.
1100    assert(countSubmittedMapStageAttempts() === 2)
1101  }
1102
1103  test("task events always posted in speculation / when stage is killed") {
1104    val baseRdd = new MyRDD(sc, 4, Nil)
1105    val finalRdd = new MyRDD(sc, 4, List(new OneToOneDependency(baseRdd)))
1106    submit(finalRdd, Array(0, 1, 2, 3))
1107
1108    // complete two tasks
1109    runEvent(makeCompletionEvent(
1110      taskSets(0).tasks(0), Success, 42,
1111      Seq.empty, createFakeTaskInfoWithId(0)))
1112    runEvent(makeCompletionEvent(
1113      taskSets(0).tasks(1), Success, 42,
1114      Seq.empty, createFakeTaskInfoWithId(1)))
1115    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
1116    // verify stage exists
1117    assert(scheduler.stageIdToStage.contains(0))
1118    assert(sparkListener.endedTasks.size == 2)
1119
1120    // finish other 2 tasks
1121    runEvent(makeCompletionEvent(
1122      taskSets(0).tasks(2), Success, 42,
1123      Seq.empty, createFakeTaskInfoWithId(2)))
1124    runEvent(makeCompletionEvent(
1125      taskSets(0).tasks(3), Success, 42,
1126      Seq.empty, createFakeTaskInfoWithId(3)))
1127    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
1128    assert(sparkListener.endedTasks.size == 4)
1129
1130    // verify the stage is done
1131    assert(!scheduler.stageIdToStage.contains(0))
1132
1133    // Stage should be complete. Finish one other Successful task to simulate what can happen
1134    // with a speculative task and make sure the event is sent out
1135    runEvent(makeCompletionEvent(
1136      taskSets(0).tasks(3), Success, 42,
1137      Seq.empty, createFakeTaskInfoWithId(5)))
1138    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
1139    assert(sparkListener.endedTasks.size == 5)
1140
1141    // make sure non successful tasks also send out event
1142    runEvent(makeCompletionEvent(
1143      taskSets(0).tasks(3), UnknownReason, 42,
1144      Seq.empty, createFakeTaskInfoWithId(6)))
1145    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
1146    assert(sparkListener.endedTasks.size == 6)
1147  }
1148
1149  test("ignore late map task completions") {
1150    val shuffleMapRdd = new MyRDD(sc, 2, Nil)
1151    val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(2))
1152    val shuffleId = shuffleDep.shuffleId
1153    val reduceRdd = new MyRDD(sc, 2, List(shuffleDep), tracker = mapOutputTracker)
1154    submit(reduceRdd, Array(0, 1))
1155
1156    // pretend we were told hostA went away
1157    val oldEpoch = mapOutputTracker.getEpoch
1158    runEvent(ExecutorLost("exec-hostA", ExecutorKilled))
1159    val newEpoch = mapOutputTracker.getEpoch
1160    assert(newEpoch > oldEpoch)
1161
1162    // now start completing some tasks in the shuffle map stage, under different hosts
1163    // and epochs, and make sure scheduler updates its state correctly
1164    val taskSet = taskSets(0)
1165    val shuffleStage = scheduler.stageIdToStage(taskSet.stageId).asInstanceOf[ShuffleMapStage]
1166    assert(shuffleStage.numAvailableOutputs === 0)
1167
1168    // should be ignored for being too old
1169    runEvent(makeCompletionEvent(
1170      taskSet.tasks(0),
1171      Success,
1172      makeMapStatus("hostA", reduceRdd.partitions.size)))
1173    assert(shuffleStage.numAvailableOutputs === 0)
1174
1175    // should work because it's a non-failed host (so the available map outputs will increase)
1176    runEvent(makeCompletionEvent(
1177      taskSet.tasks(0),
1178      Success,
1179      makeMapStatus("hostB", reduceRdd.partitions.size)))
1180    assert(shuffleStage.numAvailableOutputs === 1)
1181
1182    // should be ignored for being too old
1183    runEvent(makeCompletionEvent(
1184      taskSet.tasks(0),
1185      Success,
1186      makeMapStatus("hostA", reduceRdd.partitions.size)))
1187    assert(shuffleStage.numAvailableOutputs === 1)
1188
1189    // should work because it's a new epoch, which will increase the number of available map
1190    // outputs, and also finish the stage
1191    taskSet.tasks(1).epoch = newEpoch
1192    runEvent(makeCompletionEvent(
1193      taskSet.tasks(1),
1194      Success,
1195      makeMapStatus("hostA", reduceRdd.partitions.size)))
1196    assert(shuffleStage.numAvailableOutputs === 2)
1197    assert(mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0).map(_._1).toSet ===
1198      HashSet(makeBlockManagerId("hostB"), makeBlockManagerId("hostA")))
1199
1200    // finish the next stage normally, which completes the job
1201    complete(taskSets(1), Seq((Success, 42), (Success, 43)))
1202    assert(results === Map(0 -> 42, 1 -> 43))
1203    assertDataStructuresEmpty()
1204  }
1205
1206  test("run shuffle with map stage failure") {
1207    val shuffleMapRdd = new MyRDD(sc, 2, Nil)
1208    val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(2))
1209    val reduceRdd = new MyRDD(sc, 2, List(shuffleDep), tracker = mapOutputTracker)
1210    submit(reduceRdd, Array(0, 1))
1211
1212    // Fail the map stage.  This should cause the entire job to fail.
1213    val stageFailureMessage = "Exception failure in map stage"
1214    failed(taskSets(0), stageFailureMessage)
1215    assert(failure.getMessage === s"Job aborted due to stage failure: $stageFailureMessage")
1216
1217    // Listener bus should get told about the map stage failing, but not the reduce stage
1218    // (since the reduce stage hasn't been started yet).
1219    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
1220    assert(sparkListener.failedStages.toSet === Set(0))
1221
1222    assertDataStructuresEmpty()
1223  }
1224
1225  /**
1226   * Run two jobs, with a shared dependency.  We simulate a fetch failure in the second job, which
1227   * requires regenerating some outputs of the shared dependency.  One key aspect of this test is
1228   * that the second job actually uses a different stage for the shared dependency (a "skipped"
1229   * stage).
1230   */
1231  test("shuffle fetch failure in a reused shuffle dependency") {
1232    // Run the first job successfully, which creates one shuffle dependency
1233
1234    val shuffleMapRdd = new MyRDD(sc, 2, Nil)
1235    val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(2))
1236    val reduceRdd = new MyRDD(sc, 2, List(shuffleDep))
1237    submit(reduceRdd, Array(0, 1))
1238
1239    completeShuffleMapStageSuccessfully(0, 0, 2)
1240    completeNextResultStageWithSuccess(1, 0)
1241    assert(results === Map(0 -> 42, 1 -> 42))
1242    assertDataStructuresEmpty()
1243
1244    // submit another job w/ the shared dependency, and have a fetch failure
1245    val reduce2 = new MyRDD(sc, 2, List(shuffleDep))
1246    submit(reduce2, Array(0, 1))
1247    // Note that the stage numbering here is only b/c the shared dependency produces a new, skipped
1248    // stage.  If instead it reused the existing stage, then this would be stage 2
1249    completeNextStageWithFetchFailure(3, 0, shuffleDep)
1250    scheduler.resubmitFailedStages()
1251
1252    // the scheduler now creates a new task set to regenerate the missing map output, but this time
1253    // using a different stage, the "skipped" one
1254
1255    // SPARK-9809 -- this stage is submitted without a task for each partition (because some of
1256    // the shuffle map output is still available from stage 0); make sure we've still got internal
1257    // accumulators setup
1258    assert(scheduler.stageIdToStage(2).latestInfo.taskMetrics != null)
1259    completeShuffleMapStageSuccessfully(2, 0, 2)
1260    completeNextResultStageWithSuccess(3, 1, idx => idx + 1234)
1261    assert(results === Map(0 -> 1234, 1 -> 1235))
1262
1263    assertDataStructuresEmpty()
1264  }
1265
1266  /**
1267   * This test runs a three stage job, with a fetch failure in stage 1.  but during the retry, we
1268   * have completions from both the first & second attempt of stage 1.  So all the map output is
1269   * available before we finish any task set for stage 1.  We want to make sure that we don't
1270   * submit stage 2 until the map output for stage 1 is registered
1271   */
1272  test("don't submit stage until its dependencies map outputs are registered (SPARK-5259)") {
1273    val firstRDD = new MyRDD(sc, 3, Nil)
1274    val firstShuffleDep = new ShuffleDependency(firstRDD, new HashPartitioner(2))
1275    val firstShuffleId = firstShuffleDep.shuffleId
1276    val shuffleMapRdd = new MyRDD(sc, 3, List(firstShuffleDep))
1277    val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(2))
1278    val reduceRdd = new MyRDD(sc, 1, List(shuffleDep))
1279    submit(reduceRdd, Array(0))
1280
1281    // things start out smoothly, stage 0 completes with no issues
1282    complete(taskSets(0), Seq(
1283      (Success, makeMapStatus("hostB", shuffleMapRdd.partitions.length)),
1284      (Success, makeMapStatus("hostB", shuffleMapRdd.partitions.length)),
1285      (Success, makeMapStatus("hostA", shuffleMapRdd.partitions.length))
1286    ))
1287
1288    // then one executor dies, and a task fails in stage 1
1289    runEvent(ExecutorLost("exec-hostA", ExecutorKilled))
1290    runEvent(makeCompletionEvent(
1291      taskSets(1).tasks(0),
1292      FetchFailed(null, firstShuffleId, 2, 0, "Fetch failed"),
1293      null))
1294
1295    // so we resubmit stage 0, which completes happily
1296    scheduler.resubmitFailedStages()
1297    val stage0Resubmit = taskSets(2)
1298    assert(stage0Resubmit.stageId == 0)
1299    assert(stage0Resubmit.stageAttemptId === 1)
1300    val task = stage0Resubmit.tasks(0)
1301    assert(task.partitionId === 2)
1302    runEvent(makeCompletionEvent(
1303      task,
1304      Success,
1305      makeMapStatus("hostC", shuffleMapRdd.partitions.length)))
1306
1307    // now here is where things get tricky : we will now have a task set representing
1308    // the second attempt for stage 1, but we *also* have some tasks for the first attempt for
1309    // stage 1 still going
1310    val stage1Resubmit = taskSets(3)
1311    assert(stage1Resubmit.stageId == 1)
1312    assert(stage1Resubmit.stageAttemptId === 1)
1313    assert(stage1Resubmit.tasks.length === 3)
1314
1315    // we'll have some tasks finish from the first attempt, and some finish from the second attempt,
1316    // so that we actually have all stage outputs, though no attempt has completed all its
1317    // tasks
1318    runEvent(makeCompletionEvent(
1319      taskSets(3).tasks(0),
1320      Success,
1321      makeMapStatus("hostC", reduceRdd.partitions.length)))
1322    runEvent(makeCompletionEvent(
1323      taskSets(3).tasks(1),
1324      Success,
1325      makeMapStatus("hostC", reduceRdd.partitions.length)))
1326    // late task finish from the first attempt
1327    runEvent(makeCompletionEvent(
1328      taskSets(1).tasks(2),
1329      Success,
1330      makeMapStatus("hostB", reduceRdd.partitions.length)))
1331
1332    // What should happen now is that we submit stage 2.  However, we might not see an error
1333    // b/c of DAGScheduler's error handling (it tends to swallow errors and just log them).  But
1334    // we can check some conditions.
1335    // Note that the really important thing here is not so much that we submit stage 2 *immediately*
1336    // but that we don't end up with some error from these interleaved completions.  It would also
1337    // be OK (though sub-optimal) if stage 2 simply waited until the resubmission of stage 1 had
1338    // all its tasks complete
1339
1340    // check that we have all the map output for stage 0 (it should have been there even before
1341    // the last round of completions from stage 1, but just to double check it hasn't been messed
1342    // up) and also the newly available stage 1
1343    val stageToReduceIdxs = Seq(
1344      0 -> (0 until 3),
1345      1 -> (0 until 1)
1346    )
1347    for {
1348      (stage, reduceIdxs) <- stageToReduceIdxs
1349      reduceIdx <- reduceIdxs
1350    } {
1351      // this would throw an exception if the map status hadn't been registered
1352      val statuses = mapOutputTracker.getMapSizesByExecutorId(stage, reduceIdx)
1353      // really we should have already thrown an exception rather than fail either of these
1354      // asserts, but just to be extra defensive let's double check the statuses are OK
1355      assert(statuses != null)
1356      assert(statuses.nonEmpty)
1357    }
1358
1359    // and check that stage 2 has been submitted
1360    assert(taskSets.size == 5)
1361    val stage2TaskSet = taskSets(4)
1362    assert(stage2TaskSet.stageId == 2)
1363    assert(stage2TaskSet.stageAttemptId == 0)
1364  }
1365
1366  /**
1367   * We lose an executor after completing some shuffle map tasks on it.  Those tasks get
1368   * resubmitted, and when they finish the job completes normally
1369   */
1370  test("register map outputs correctly after ExecutorLost and task Resubmitted") {
1371    val firstRDD = new MyRDD(sc, 3, Nil)
1372    val firstShuffleDep = new ShuffleDependency(firstRDD, new HashPartitioner(2))
1373    val reduceRdd = new MyRDD(sc, 5, List(firstShuffleDep))
1374    submit(reduceRdd, Array(0))
1375
1376    // complete some of the tasks from the first stage, on one host
1377    runEvent(makeCompletionEvent(
1378      taskSets(0).tasks(0),
1379      Success,
1380      makeMapStatus("hostA", reduceRdd.partitions.length)))
1381    runEvent(makeCompletionEvent(
1382      taskSets(0).tasks(1),
1383      Success,
1384      makeMapStatus("hostA", reduceRdd.partitions.length)))
1385
1386    // now that host goes down
1387    runEvent(ExecutorLost("exec-hostA", ExecutorKilled))
1388
1389    // so we resubmit those tasks
1390    runEvent(makeCompletionEvent(taskSets(0).tasks(0), Resubmitted, null))
1391    runEvent(makeCompletionEvent(taskSets(0).tasks(1), Resubmitted, null))
1392
1393    // now complete everything on a different host
1394    complete(taskSets(0), Seq(
1395      (Success, makeMapStatus("hostB", reduceRdd.partitions.length)),
1396      (Success, makeMapStatus("hostB", reduceRdd.partitions.length)),
1397      (Success, makeMapStatus("hostB", reduceRdd.partitions.length))
1398    ))
1399
1400    // now we should submit stage 1, and the map output from stage 0 should be registered
1401
1402    // check that we have all the map output for stage 0
1403    (0 until reduceRdd.partitions.length).foreach { reduceIdx =>
1404      val statuses = mapOutputTracker.getMapSizesByExecutorId(0, reduceIdx)
1405      // really we should have already thrown an exception rather than fail either of these
1406      // asserts, but just to be extra defensive let's double check the statuses are OK
1407      assert(statuses != null)
1408      assert(statuses.nonEmpty)
1409    }
1410
1411    // and check that stage 1 has been submitted
1412    assert(taskSets.size == 2)
1413    val stage1TaskSet = taskSets(1)
1414    assert(stage1TaskSet.stageId == 1)
1415    assert(stage1TaskSet.stageAttemptId == 0)
1416  }
1417
1418  /**
1419   * Makes sure that failures of stage used by multiple jobs are correctly handled.
1420   *
1421   * This test creates the following dependency graph:
1422   *
1423   * shuffleMapRdd1     shuffleMapRDD2
1424   *        |     \        |
1425   *        |      \       |
1426   *        |       \      |
1427   *        |        \     |
1428   *   reduceRdd1    reduceRdd2
1429   *
1430   * We start both shuffleMapRdds and then fail shuffleMapRdd1.  As a result, the job listeners for
1431   * reduceRdd1 and reduceRdd2 should both be informed that the job failed.  shuffleMapRDD2 should
1432   * also be cancelled, because it is only used by reduceRdd2 and reduceRdd2 cannot complete
1433   * without shuffleMapRdd1.
1434   */
1435  test("failure of stage used by two jobs") {
1436    val shuffleMapRdd1 = new MyRDD(sc, 2, Nil)
1437    val shuffleDep1 = new ShuffleDependency(shuffleMapRdd1, new HashPartitioner(2))
1438    val shuffleMapRdd2 = new MyRDD(sc, 2, Nil)
1439    val shuffleDep2 = new ShuffleDependency(shuffleMapRdd2, new HashPartitioner(2))
1440
1441    val reduceRdd1 = new MyRDD(sc, 2, List(shuffleDep1), tracker = mapOutputTracker)
1442    val reduceRdd2 = new MyRDD(sc, 2, List(shuffleDep1, shuffleDep2), tracker = mapOutputTracker)
1443
1444    // We need to make our own listeners for this test, since by default submit uses the same
1445    // listener for all jobs, and here we want to capture the failure for each job separately.
1446    class FailureRecordingJobListener() extends JobListener {
1447      var failureMessage: String = _
1448      override def taskSucceeded(index: Int, result: Any) {}
1449      override def jobFailed(exception: Exception): Unit = { failureMessage = exception.getMessage }
1450    }
1451    val listener1 = new FailureRecordingJobListener()
1452    val listener2 = new FailureRecordingJobListener()
1453
1454    submit(reduceRdd1, Array(0, 1), listener = listener1)
1455    submit(reduceRdd2, Array(0, 1), listener = listener2)
1456
1457    val stageFailureMessage = "Exception failure in map stage"
1458    failed(taskSets(0), stageFailureMessage)
1459
1460    assert(cancelledStages.toSet === Set(0, 2))
1461
1462    // Make sure the listeners got told about both failed stages.
1463    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
1464    assert(sparkListener.successfulStages.isEmpty)
1465    assert(sparkListener.failedStages.toSet === Set(0, 2))
1466
1467    assert(listener1.failureMessage === s"Job aborted due to stage failure: $stageFailureMessage")
1468    assert(listener2.failureMessage === s"Job aborted due to stage failure: $stageFailureMessage")
1469    assertDataStructuresEmpty()
1470  }
1471
1472  def checkJobPropertiesAndPriority(taskSet: TaskSet, expected: String, priority: Int): Unit = {
1473    assert(taskSet.properties != null)
1474    assert(taskSet.properties.getProperty("testProperty") === expected)
1475    assert(taskSet.priority === priority)
1476  }
1477
1478  def launchJobsThatShareStageAndCancelFirst(): ShuffleDependency[Int, Int, Nothing] = {
1479    val baseRdd = new MyRDD(sc, 1, Nil)
1480    val shuffleDep1 = new ShuffleDependency(baseRdd, new HashPartitioner(1))
1481    val intermediateRdd = new MyRDD(sc, 1, List(shuffleDep1))
1482    val shuffleDep2 = new ShuffleDependency(intermediateRdd, new HashPartitioner(1))
1483    val finalRdd1 = new MyRDD(sc, 1, List(shuffleDep2))
1484    val finalRdd2 = new MyRDD(sc, 1, List(shuffleDep2))
1485    val job1Properties = new Properties()
1486    val job2Properties = new Properties()
1487    job1Properties.setProperty("testProperty", "job1")
1488    job2Properties.setProperty("testProperty", "job2")
1489
1490    // Run jobs 1 & 2, both referencing the same stage, then cancel job1.
1491    // Note that we have to submit job2 before we cancel job1 to have them actually share
1492    // *Stages*, and not just shuffle dependencies, due to skipped stages (at least until
1493    // we address SPARK-10193.)
1494    val jobId1 = submit(finalRdd1, Array(0), properties = job1Properties)
1495    val jobId2 = submit(finalRdd2, Array(0), properties = job2Properties)
1496    assert(scheduler.activeJobs.nonEmpty)
1497    val testProperty1 = scheduler.jobIdToActiveJob(jobId1).properties.getProperty("testProperty")
1498
1499    // remove job1 as an ActiveJob
1500    cancel(jobId1)
1501
1502    // job2 should still be running
1503    assert(scheduler.activeJobs.nonEmpty)
1504    val testProperty2 = scheduler.jobIdToActiveJob(jobId2).properties.getProperty("testProperty")
1505    assert(testProperty1 != testProperty2)
1506    // NB: This next assert isn't necessarily the "desired" behavior; it's just to document
1507    // the current behavior.  We've already submitted the TaskSet for stage 0 based on job1, but
1508    // even though we have cancelled that job and are now running it because of job2, we haven't
1509    // updated the TaskSet's properties.  Changing the properties to "job2" is likely the more
1510    // correct behavior.
1511    val job1Id = 0  // TaskSet priority for Stages run with "job1" as the ActiveJob
1512    checkJobPropertiesAndPriority(taskSets(0), "job1", job1Id)
1513    complete(taskSets(0), Seq((Success, makeMapStatus("hostA", 1))))
1514
1515    shuffleDep1
1516  }
1517
1518  /**
1519   * Makes sure that tasks for a stage used by multiple jobs are submitted with the properties of a
1520   * later, active job if they were previously run under a job that is no longer active
1521   */
1522  test("stage used by two jobs, the first no longer active (SPARK-6880)") {
1523    launchJobsThatShareStageAndCancelFirst()
1524
1525    // The next check is the key for SPARK-6880.  For the stage which was shared by both job1 and
1526    // job2 but never had any tasks submitted for job1, the properties of job2 are now used to run
1527    // the stage.
1528    checkJobPropertiesAndPriority(taskSets(1), "job2", 1)
1529
1530    complete(taskSets(1), Seq((Success, makeMapStatus("hostA", 1))))
1531    assert(taskSets(2).properties != null)
1532    complete(taskSets(2), Seq((Success, 42)))
1533    assert(results === Map(0 -> 42))
1534    assert(scheduler.activeJobs.isEmpty)
1535
1536    assertDataStructuresEmpty()
1537  }
1538
1539  /**
1540   * Makes sure that tasks for a stage used by multiple jobs are submitted with the properties of a
1541   * later, active job if they were previously run under a job that is no longer active, even when
1542   * there are fetch failures
1543   */
1544  test("stage used by two jobs, some fetch failures, and the first job no longer active " +
1545    "(SPARK-6880)") {
1546    val shuffleDep1 = launchJobsThatShareStageAndCancelFirst()
1547    val job2Id = 1  // TaskSet priority for Stages run with "job2" as the ActiveJob
1548
1549    // lets say there is a fetch failure in this task set, which makes us go back and
1550    // run stage 0, attempt 1
1551    complete(taskSets(1), Seq(
1552      (FetchFailed(makeBlockManagerId("hostA"), shuffleDep1.shuffleId, 0, 0, "ignored"), null)))
1553    scheduler.resubmitFailedStages()
1554
1555    // stage 0, attempt 1 should have the properties of job2
1556    assert(taskSets(2).stageId === 0)
1557    assert(taskSets(2).stageAttemptId === 1)
1558    checkJobPropertiesAndPriority(taskSets(2), "job2", job2Id)
1559
1560    // run the rest of the stages normally, checking that they have the correct properties
1561    complete(taskSets(2), Seq((Success, makeMapStatus("hostA", 1))))
1562    checkJobPropertiesAndPriority(taskSets(3), "job2", job2Id)
1563    complete(taskSets(3), Seq((Success, makeMapStatus("hostA", 1))))
1564    checkJobPropertiesAndPriority(taskSets(4), "job2", job2Id)
1565    complete(taskSets(4), Seq((Success, 42)))
1566    assert(results === Map(0 -> 42))
1567    assert(scheduler.activeJobs.isEmpty)
1568
1569    assertDataStructuresEmpty()
1570  }
1571
1572  test("run trivial shuffle with out-of-band failure and retry") {
1573    val shuffleMapRdd = new MyRDD(sc, 2, Nil)
1574    val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(2))
1575    val shuffleId = shuffleDep.shuffleId
1576    val reduceRdd = new MyRDD(sc, 1, List(shuffleDep), tracker = mapOutputTracker)
1577    submit(reduceRdd, Array(0))
1578    // blockManagerMaster.removeExecutor("exec-hostA")
1579    // pretend we were told hostA went away
1580    runEvent(ExecutorLost("exec-hostA", ExecutorKilled))
1581    // DAGScheduler will immediately resubmit the stage after it appears to have no pending tasks
1582    // rather than marking it is as failed and waiting.
1583    complete(taskSets(0), Seq(
1584      (Success, makeMapStatus("hostA", 1)),
1585      (Success, makeMapStatus("hostB", 1))))
1586    // have hostC complete the resubmitted task
1587    complete(taskSets(1), Seq((Success, makeMapStatus("hostC", 1))))
1588    assert(mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0).map(_._1).toSet ===
1589      HashSet(makeBlockManagerId("hostC"), makeBlockManagerId("hostB")))
1590    complete(taskSets(2), Seq((Success, 42)))
1591    assert(results === Map(0 -> 42))
1592    assertDataStructuresEmpty()
1593  }
1594
1595  test("recursive shuffle failures") {
1596    val shuffleOneRdd = new MyRDD(sc, 2, Nil)
1597    val shuffleDepOne = new ShuffleDependency(shuffleOneRdd, new HashPartitioner(2))
1598    val shuffleTwoRdd = new MyRDD(sc, 2, List(shuffleDepOne), tracker = mapOutputTracker)
1599    val shuffleDepTwo = new ShuffleDependency(shuffleTwoRdd, new HashPartitioner(1))
1600    val finalRdd = new MyRDD(sc, 1, List(shuffleDepTwo), tracker = mapOutputTracker)
1601    submit(finalRdd, Array(0))
1602    // have the first stage complete normally
1603    complete(taskSets(0), Seq(
1604      (Success, makeMapStatus("hostA", 2)),
1605      (Success, makeMapStatus("hostB", 2))))
1606    // have the second stage complete normally
1607    complete(taskSets(1), Seq(
1608      (Success, makeMapStatus("hostA", 1)),
1609      (Success, makeMapStatus("hostC", 1))))
1610    // fail the third stage because hostA went down
1611    complete(taskSets(2), Seq(
1612      (FetchFailed(makeBlockManagerId("hostA"), shuffleDepTwo.shuffleId, 0, 0, "ignored"), null)))
1613    // TODO assert this:
1614    // blockManagerMaster.removeExecutor("exec-hostA")
1615    // have DAGScheduler try again
1616    scheduler.resubmitFailedStages()
1617    complete(taskSets(3), Seq((Success, makeMapStatus("hostA", 2))))
1618    complete(taskSets(4), Seq((Success, makeMapStatus("hostA", 1))))
1619    complete(taskSets(5), Seq((Success, 42)))
1620    assert(results === Map(0 -> 42))
1621    assertDataStructuresEmpty()
1622  }
1623
1624  test("cached post-shuffle") {
1625    val shuffleOneRdd = new MyRDD(sc, 2, Nil).cache()
1626    val shuffleDepOne = new ShuffleDependency(shuffleOneRdd, new HashPartitioner(2))
1627    val shuffleTwoRdd = new MyRDD(sc, 2, List(shuffleDepOne), tracker = mapOutputTracker).cache()
1628    val shuffleDepTwo = new ShuffleDependency(shuffleTwoRdd, new HashPartitioner(1))
1629    val finalRdd = new MyRDD(sc, 1, List(shuffleDepTwo), tracker = mapOutputTracker)
1630    submit(finalRdd, Array(0))
1631    cacheLocations(shuffleTwoRdd.id -> 0) = Seq(makeBlockManagerId("hostD"))
1632    cacheLocations(shuffleTwoRdd.id -> 1) = Seq(makeBlockManagerId("hostC"))
1633    // complete stage 0
1634    complete(taskSets(0), Seq(
1635      (Success, makeMapStatus("hostA", 2)),
1636      (Success, makeMapStatus("hostB", 2))))
1637    // complete stage 1
1638    complete(taskSets(1), Seq(
1639      (Success, makeMapStatus("hostA", 1)),
1640      (Success, makeMapStatus("hostB", 1))))
1641    // pretend stage 2 failed because hostA went down
1642    complete(taskSets(2), Seq(
1643      (FetchFailed(makeBlockManagerId("hostA"), shuffleDepTwo.shuffleId, 0, 0, "ignored"), null)))
1644    // TODO assert this:
1645    // blockManagerMaster.removeExecutor("exec-hostA")
1646    // DAGScheduler should notice the cached copy of the second shuffle and try to get it rerun.
1647    scheduler.resubmitFailedStages()
1648    assertLocations(taskSets(3), Seq(Seq("hostD")))
1649    // allow hostD to recover
1650    complete(taskSets(3), Seq((Success, makeMapStatus("hostD", 1))))
1651    complete(taskSets(4), Seq((Success, 42)))
1652    assert(results === Map(0 -> 42))
1653    assertDataStructuresEmpty()
1654  }
1655
1656  test("misbehaved accumulator should not crash DAGScheduler and SparkContext") {
1657    val acc = new LongAccumulator {
1658      override def add(v: java.lang.Long): Unit = throw new DAGSchedulerSuiteDummyException
1659      override def add(v: Long): Unit = throw new DAGSchedulerSuiteDummyException
1660    }
1661    sc.register(acc)
1662
1663    // Run this on executors
1664    sc.parallelize(1 to 10, 2).foreach { item => acc.add(1) }
1665
1666    // Make sure we can still run commands
1667    assert(sc.parallelize(1 to 10, 2).count() === 10)
1668  }
1669
1670  /**
1671   * The job will be failed on first task throwing a DAGSchedulerSuiteDummyException.
1672   *  Any subsequent task WILL throw a legitimate java.lang.UnsupportedOperationException.
1673   *  If multiple tasks, there exists a race condition between the SparkDriverExecutionExceptions
1674   *  and their differing causes as to which will represent result for job...
1675   */
1676  test("misbehaved resultHandler should not crash DAGScheduler and SparkContext") {
1677    val e = intercept[SparkDriverExecutionException] {
1678      // Number of parallelized partitions implies number of tasks of job
1679      val rdd = sc.parallelize(1 to 10, 2)
1680      sc.runJob[Int, Int](
1681        rdd,
1682        (context: TaskContext, iter: Iterator[Int]) => iter.size,
1683        // For a robust test assertion, limit number of job tasks to 1; that is,
1684        // if multiple RDD partitions, use id of any one partition, say, first partition id=0
1685        Seq(0),
1686        (part: Int, result: Int) => throw new DAGSchedulerSuiteDummyException)
1687    }
1688    assert(e.getCause.isInstanceOf[DAGSchedulerSuiteDummyException])
1689
1690    // Make sure we can still run commands on our SparkContext
1691    assert(sc.parallelize(1 to 10, 2).count() === 10)
1692  }
1693
1694  test("getPartitions exceptions should not crash DAGScheduler and SparkContext (SPARK-8606)") {
1695    val e1 = intercept[DAGSchedulerSuiteDummyException] {
1696      val rdd = new MyRDD(sc, 2, Nil) {
1697        override def getPartitions: Array[Partition] = {
1698          throw new DAGSchedulerSuiteDummyException
1699        }
1700      }
1701      rdd.reduceByKey(_ + _, 1).count()
1702    }
1703
1704    // Make sure we can still run commands
1705    assert(sc.parallelize(1 to 10, 2).count() === 10)
1706  }
1707
1708  test("getPreferredLocations errors should not crash DAGScheduler and SparkContext (SPARK-8606)") {
1709    val e1 = intercept[SparkException] {
1710      val rdd = new MyRDD(sc, 2, Nil) {
1711        override def getPreferredLocations(split: Partition): Seq[String] = {
1712          throw new DAGSchedulerSuiteDummyException
1713        }
1714      }
1715      rdd.count()
1716    }
1717    assert(e1.getMessage.contains(classOf[DAGSchedulerSuiteDummyException].getName))
1718
1719    // Make sure we can still run commands
1720    assert(sc.parallelize(1 to 10, 2).count() === 10)
1721  }
1722
1723  test("accumulator not calculated for resubmitted result stage") {
1724    // just for register
1725    val accum = AccumulatorSuite.createLongAccum("a")
1726    val finalRdd = new MyRDD(sc, 1, Nil)
1727    submit(finalRdd, Array(0))
1728    completeWithAccumulator(accum.id, taskSets(0), Seq((Success, 42)))
1729    completeWithAccumulator(accum.id, taskSets(0), Seq((Success, 42)))
1730    assert(results === Map(0 -> 42))
1731
1732    assert(accum.value === 1)
1733    assertDataStructuresEmpty()
1734  }
1735
1736  test("accumulators are updated on exception failures") {
1737    val acc1 = AccumulatorSuite.createLongAccum("ingenieur")
1738    val acc2 = AccumulatorSuite.createLongAccum("boulanger")
1739    val acc3 = AccumulatorSuite.createLongAccum("agriculteur")
1740    assert(AccumulatorContext.get(acc1.id).isDefined)
1741    assert(AccumulatorContext.get(acc2.id).isDefined)
1742    assert(AccumulatorContext.get(acc3.id).isDefined)
1743    val accUpdate1 = new LongAccumulator
1744    accUpdate1.metadata = acc1.metadata
1745    accUpdate1.setValue(15)
1746    val accUpdate2 = new LongAccumulator
1747    accUpdate2.metadata = acc2.metadata
1748    accUpdate2.setValue(13)
1749    val accUpdate3 = new LongAccumulator
1750    accUpdate3.metadata = acc3.metadata
1751    accUpdate3.setValue(18)
1752    val accumUpdates = Seq(accUpdate1, accUpdate2, accUpdate3)
1753    val accumInfo = accumUpdates.map(AccumulatorSuite.makeInfo)
1754    val exceptionFailure = new ExceptionFailure(
1755      new SparkException("fondue?"),
1756      accumInfo).copy(accums = accumUpdates)
1757    submit(new MyRDD(sc, 1, Nil), Array(0))
1758    runEvent(makeCompletionEvent(taskSets.head.tasks.head, exceptionFailure, "result"))
1759    assert(AccumulatorContext.get(acc1.id).get.value === 15L)
1760    assert(AccumulatorContext.get(acc2.id).get.value === 13L)
1761    assert(AccumulatorContext.get(acc3.id).get.value === 18L)
1762  }
1763
1764  test("reduce tasks should be placed locally with map output") {
1765    // Create a shuffleMapRdd with 1 partition
1766    val shuffleMapRdd = new MyRDD(sc, 1, Nil)
1767    val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(2))
1768    val shuffleId = shuffleDep.shuffleId
1769    val reduceRdd = new MyRDD(sc, 1, List(shuffleDep), tracker = mapOutputTracker)
1770    submit(reduceRdd, Array(0))
1771    complete(taskSets(0), Seq(
1772      (Success, makeMapStatus("hostA", 1))))
1773    assert(mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0).map(_._1).toSet ===
1774      HashSet(makeBlockManagerId("hostA")))
1775
1776    // Reducer should run on the same host that map task ran
1777    val reduceTaskSet = taskSets(1)
1778    assertLocations(reduceTaskSet, Seq(Seq("hostA")))
1779    complete(reduceTaskSet, Seq((Success, 42)))
1780    assert(results === Map(0 -> 42))
1781    assertDataStructuresEmpty()
1782  }
1783
1784  test("reduce task locality preferences should only include machines with largest map outputs") {
1785    val numMapTasks = 4
1786    // Create a shuffleMapRdd with more partitions
1787    val shuffleMapRdd = new MyRDD(sc, numMapTasks, Nil)
1788    val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(1))
1789    val shuffleId = shuffleDep.shuffleId
1790    val reduceRdd = new MyRDD(sc, 1, List(shuffleDep), tracker = mapOutputTracker)
1791    submit(reduceRdd, Array(0))
1792
1793    val statuses = (1 to numMapTasks).map { i =>
1794      (Success, makeMapStatus("host" + i, 1, (10*i).toByte))
1795    }
1796    complete(taskSets(0), statuses)
1797
1798    // Reducer should prefer the last 3 hosts as they have 20%, 30% and 40% of data
1799    val hosts = (1 to numMapTasks).map(i => "host" + i).reverse.take(numMapTasks - 1)
1800
1801    val reduceTaskSet = taskSets(1)
1802    assertLocations(reduceTaskSet, Seq(hosts))
1803    complete(reduceTaskSet, Seq((Success, 42)))
1804    assert(results === Map(0 -> 42))
1805    assertDataStructuresEmpty()
1806  }
1807
1808  test("stages with both narrow and shuffle dependencies use narrow ones for locality") {
1809    // Create an RDD that has both a shuffle dependency and a narrow dependency (e.g. for a join)
1810    val rdd1 = new MyRDD(sc, 1, Nil)
1811    val rdd2 = new MyRDD(sc, 1, Nil, locations = Seq(Seq("hostB")))
1812    val shuffleDep = new ShuffleDependency(rdd1, new HashPartitioner(1))
1813    val narrowDep = new OneToOneDependency(rdd2)
1814    val shuffleId = shuffleDep.shuffleId
1815    val reduceRdd = new MyRDD(sc, 1, List(shuffleDep, narrowDep), tracker = mapOutputTracker)
1816    submit(reduceRdd, Array(0))
1817    complete(taskSets(0), Seq(
1818      (Success, makeMapStatus("hostA", 1))))
1819    assert(mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0).map(_._1).toSet ===
1820      HashSet(makeBlockManagerId("hostA")))
1821
1822    // Reducer should run where RDD 2 has preferences, even though though it also has a shuffle dep
1823    val reduceTaskSet = taskSets(1)
1824    assertLocations(reduceTaskSet, Seq(Seq("hostB")))
1825    complete(reduceTaskSet, Seq((Success, 42)))
1826    assert(results === Map(0 -> 42))
1827    assertDataStructuresEmpty()
1828  }
1829
1830  test("Spark exceptions should include call site in stack trace") {
1831    val e = intercept[SparkException] {
1832      sc.parallelize(1 to 10, 2).map { _ => throw new RuntimeException("uh-oh!") }.count()
1833    }
1834
1835    // Does not include message, ONLY stack trace.
1836    val stackTraceString = Utils.exceptionString(e)
1837
1838    // should actually include the RDD operation that invoked the method:
1839    assert(stackTraceString.contains("org.apache.spark.rdd.RDD.count"))
1840
1841    // should include the FunSuite setup:
1842    assert(stackTraceString.contains("org.scalatest.FunSuite"))
1843  }
1844
1845  test("catch errors in event loop") {
1846    // this is a test of our testing framework -- make sure errors in event loop don't get ignored
1847
1848    // just run some bad event that will throw an exception -- we'll give a null TaskEndReason
1849    val rdd1 = new MyRDD(sc, 1, Nil)
1850    submit(rdd1, Array(0))
1851    intercept[Exception] {
1852      complete(taskSets(0), Seq(
1853        (null, makeMapStatus("hostA", 1))))
1854    }
1855  }
1856
1857  test("simple map stage submission") {
1858    val shuffleMapRdd = new MyRDD(sc, 2, Nil)
1859    val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(1))
1860    val shuffleId = shuffleDep.shuffleId
1861    val reduceRdd = new MyRDD(sc, 1, List(shuffleDep), tracker = mapOutputTracker)
1862
1863    // Submit a map stage by itself
1864    submitMapStage(shuffleDep)
1865    assert(results.size === 0)  // No results yet
1866    completeShuffleMapStageSuccessfully(0, 0, 1)
1867    assert(results.size === 1)
1868    results.clear()
1869    assertDataStructuresEmpty()
1870
1871    // Submit a reduce job that depends on this map stage; it should directly do the reduce
1872    submit(reduceRdd, Array(0))
1873    completeNextResultStageWithSuccess(2, 0)
1874    assert(results === Map(0 -> 42))
1875    results.clear()
1876    assertDataStructuresEmpty()
1877
1878    // Check that if we submit the map stage again, no tasks run
1879    submitMapStage(shuffleDep)
1880    assert(results.size === 1)
1881    assertDataStructuresEmpty()
1882  }
1883
1884  test("map stage submission with reduce stage also depending on the data") {
1885    val shuffleMapRdd = new MyRDD(sc, 2, Nil)
1886    val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(1))
1887    val shuffleId = shuffleDep.shuffleId
1888    val reduceRdd = new MyRDD(sc, 1, List(shuffleDep), tracker = mapOutputTracker)
1889
1890    // Submit the map stage by itself
1891    submitMapStage(shuffleDep)
1892
1893    // Submit a reduce job that depends on this map stage
1894    submit(reduceRdd, Array(0))
1895
1896    // Complete tasks for the map stage
1897    completeShuffleMapStageSuccessfully(0, 0, 1)
1898    assert(results.size === 1)
1899    results.clear()
1900
1901    // Complete tasks for the reduce stage
1902    completeNextResultStageWithSuccess(1, 0)
1903    assert(results === Map(0 -> 42))
1904    results.clear()
1905    assertDataStructuresEmpty()
1906
1907    // Check that if we submit the map stage again, no tasks run
1908    submitMapStage(shuffleDep)
1909    assert(results.size === 1)
1910    assertDataStructuresEmpty()
1911  }
1912
1913  test("map stage submission with fetch failure") {
1914    val shuffleMapRdd = new MyRDD(sc, 2, Nil)
1915    val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(2))
1916    val shuffleId = shuffleDep.shuffleId
1917    val reduceRdd = new MyRDD(sc, 2, List(shuffleDep), tracker = mapOutputTracker)
1918
1919    // Submit a map stage by itself
1920    submitMapStage(shuffleDep)
1921    complete(taskSets(0), Seq(
1922      (Success, makeMapStatus("hostA", reduceRdd.partitions.length)),
1923      (Success, makeMapStatus("hostB", reduceRdd.partitions.length))))
1924    assert(results.size === 1)
1925    results.clear()
1926    assertDataStructuresEmpty()
1927
1928    // Submit a reduce job that depends on this map stage, but where one reduce will fail a fetch
1929    submit(reduceRdd, Array(0, 1))
1930    complete(taskSets(1), Seq(
1931      (Success, 42),
1932      (FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0, 0, "ignored"), null)))
1933    // Ask the scheduler to try it again; TaskSet 2 will rerun the map task that we couldn't fetch
1934    // from, then TaskSet 3 will run the reduce stage
1935    scheduler.resubmitFailedStages()
1936    complete(taskSets(2), Seq((Success, makeMapStatus("hostA", reduceRdd.partitions.length))))
1937    complete(taskSets(3), Seq((Success, 43)))
1938    assert(results === Map(0 -> 42, 1 -> 43))
1939    results.clear()
1940    assertDataStructuresEmpty()
1941
1942    // Run another reduce job without a failure; this should just work
1943    submit(reduceRdd, Array(0, 1))
1944    complete(taskSets(4), Seq(
1945      (Success, 44),
1946      (Success, 45)))
1947    assert(results === Map(0 -> 44, 1 -> 45))
1948    results.clear()
1949    assertDataStructuresEmpty()
1950
1951    // Resubmit the map stage; this should also just work
1952    submitMapStage(shuffleDep)
1953    assert(results.size === 1)
1954    results.clear()
1955    assertDataStructuresEmpty()
1956  }
1957
1958  /**
1959   * In this test, we have three RDDs with shuffle dependencies, and we submit map stage jobs
1960   * that are waiting on each one, as well as a reduce job on the last one. We test that all of
1961   * these jobs complete even if there are some fetch failures in both shuffles.
1962   */
1963  test("map stage submission with multiple shared stages and failures") {
1964    val rdd1 = new MyRDD(sc, 2, Nil)
1965    val dep1 = new ShuffleDependency(rdd1, new HashPartitioner(2))
1966    val rdd2 = new MyRDD(sc, 2, List(dep1), tracker = mapOutputTracker)
1967    val dep2 = new ShuffleDependency(rdd2, new HashPartitioner(2))
1968    val rdd3 = new MyRDD(sc, 2, List(dep2), tracker = mapOutputTracker)
1969
1970    val listener1 = new SimpleListener
1971    val listener2 = new SimpleListener
1972    val listener3 = new SimpleListener
1973
1974    submitMapStage(dep1, listener1)
1975    submitMapStage(dep2, listener2)
1976    submit(rdd3, Array(0, 1), listener = listener3)
1977
1978    // Complete the first stage
1979    assert(taskSets(0).stageId === 0)
1980    complete(taskSets(0), Seq(
1981      (Success, makeMapStatus("hostA", rdd1.partitions.length)),
1982      (Success, makeMapStatus("hostB", rdd1.partitions.length))))
1983    assert(mapOutputTracker.getMapSizesByExecutorId(dep1.shuffleId, 0).map(_._1).toSet ===
1984      HashSet(makeBlockManagerId("hostA"), makeBlockManagerId("hostB")))
1985    assert(listener1.results.size === 1)
1986
1987    // When attempting the second stage, show a fetch failure
1988    assert(taskSets(1).stageId === 1)
1989    complete(taskSets(1), Seq(
1990      (Success, makeMapStatus("hostA", rdd2.partitions.length)),
1991      (FetchFailed(makeBlockManagerId("hostA"), dep1.shuffleId, 0, 0, "ignored"), null)))
1992    scheduler.resubmitFailedStages()
1993    assert(listener2.results.size === 0)    // Second stage listener should not have a result yet
1994
1995    // Stage 0 should now be running as task set 2; make its task succeed
1996    assert(taskSets(2).stageId === 0)
1997    complete(taskSets(2), Seq(
1998      (Success, makeMapStatus("hostC", rdd2.partitions.length))))
1999    assert(mapOutputTracker.getMapSizesByExecutorId(dep1.shuffleId, 0).map(_._1).toSet ===
2000      HashSet(makeBlockManagerId("hostC"), makeBlockManagerId("hostB")))
2001    assert(listener2.results.size === 0)    // Second stage listener should still not have a result
2002
2003    // Stage 1 should now be running as task set 3; make its first task succeed
2004    assert(taskSets(3).stageId === 1)
2005    complete(taskSets(3), Seq(
2006      (Success, makeMapStatus("hostB", rdd2.partitions.length)),
2007      (Success, makeMapStatus("hostD", rdd2.partitions.length))))
2008    assert(mapOutputTracker.getMapSizesByExecutorId(dep2.shuffleId, 0).map(_._1).toSet ===
2009      HashSet(makeBlockManagerId("hostB"), makeBlockManagerId("hostD")))
2010    assert(listener2.results.size === 1)
2011
2012    // Finally, the reduce job should be running as task set 4; make it see a fetch failure,
2013    // then make it run again and succeed
2014    assert(taskSets(4).stageId === 2)
2015    complete(taskSets(4), Seq(
2016      (Success, 52),
2017      (FetchFailed(makeBlockManagerId("hostD"), dep2.shuffleId, 0, 0, "ignored"), null)))
2018    scheduler.resubmitFailedStages()
2019
2020    // TaskSet 5 will rerun stage 1's lost task, then TaskSet 6 will rerun stage 2
2021    assert(taskSets(5).stageId === 1)
2022    complete(taskSets(5), Seq(
2023      (Success, makeMapStatus("hostE", rdd2.partitions.length))))
2024    complete(taskSets(6), Seq(
2025      (Success, 53)))
2026    assert(listener3.results === Map(0 -> 52, 1 -> 53))
2027    assertDataStructuresEmpty()
2028  }
2029
2030  /**
2031   * In this test, we run a map stage where one of the executors fails but we still receive a
2032   * "zombie" complete message from that executor. We want to make sure the stage is not reported
2033   * as done until all tasks have completed.
2034   */
2035  test("map stage submission with executor failure late map task completions") {
2036    val shuffleMapRdd = new MyRDD(sc, 3, Nil)
2037    val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(2))
2038
2039    submitMapStage(shuffleDep)
2040
2041    val oldTaskSet = taskSets(0)
2042    runEvent(makeCompletionEvent(oldTaskSet.tasks(0), Success, makeMapStatus("hostA", 2)))
2043    assert(results.size === 0)    // Map stage job should not be complete yet
2044
2045    // Pretend host A was lost
2046    val oldEpoch = mapOutputTracker.getEpoch
2047    runEvent(ExecutorLost("exec-hostA", ExecutorKilled))
2048    val newEpoch = mapOutputTracker.getEpoch
2049    assert(newEpoch > oldEpoch)
2050
2051    // Suppose we also get a completed event from task 1 on the same host; this should be ignored
2052    runEvent(makeCompletionEvent(oldTaskSet.tasks(1), Success, makeMapStatus("hostA", 2)))
2053    assert(results.size === 0)    // Map stage job should not be complete yet
2054
2055    // A completion from another task should work because it's a non-failed host
2056    runEvent(makeCompletionEvent(oldTaskSet.tasks(2), Success, makeMapStatus("hostB", 2)))
2057    assert(results.size === 0)    // Map stage job should not be complete yet
2058
2059    // Now complete tasks in the second task set
2060    val newTaskSet = taskSets(1)
2061    assert(newTaskSet.tasks.size === 2)     // Both tasks 0 and 1 were on on hostA
2062    runEvent(makeCompletionEvent(newTaskSet.tasks(0), Success, makeMapStatus("hostB", 2)))
2063    assert(results.size === 0)    // Map stage job should not be complete yet
2064    runEvent(makeCompletionEvent(newTaskSet.tasks(1), Success, makeMapStatus("hostB", 2)))
2065    assert(results.size === 1)    // Map stage job should now finally be complete
2066    assertDataStructuresEmpty()
2067
2068    // Also test that a reduce stage using this shuffled data can immediately run
2069    val reduceRDD = new MyRDD(sc, 2, List(shuffleDep), tracker = mapOutputTracker)
2070    results.clear()
2071    submit(reduceRDD, Array(0, 1))
2072    complete(taskSets(2), Seq((Success, 42), (Success, 43)))
2073    assert(results === Map(0 -> 42, 1 -> 43))
2074    results.clear()
2075    assertDataStructuresEmpty()
2076  }
2077
2078  /**
2079   * Checks the DAGScheduler's internal logic for traversing an RDD DAG by making sure that
2080   * getShuffleDependencies correctly returns the direct shuffle dependencies of a particular
2081   * RDD. The test creates the following RDD graph (where n denotes a narrow dependency and s
2082   * denotes a shuffle dependency):
2083   *
2084   * A <------------s---------,
2085   *                           \
2086   * B <--s-- C <--s-- D <--n---`-- E
2087   *
2088   * Here, the direct shuffle dependency of C is just the shuffle dependency on B. The direct
2089   * shuffle dependencies of E are the shuffle dependency on A and the shuffle dependency on C.
2090   */
2091  test("getShuffleDependencies correctly returns only direct shuffle parents") {
2092    val rddA = new MyRDD(sc, 2, Nil)
2093    val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(1))
2094    val rddB = new MyRDD(sc, 2, Nil)
2095    val shuffleDepB = new ShuffleDependency(rddB, new HashPartitioner(1))
2096    val rddC = new MyRDD(sc, 1, List(shuffleDepB))
2097    val shuffleDepC = new ShuffleDependency(rddC, new HashPartitioner(1))
2098    val rddD = new MyRDD(sc, 1, List(shuffleDepC))
2099    val narrowDepD = new OneToOneDependency(rddD)
2100    val rddE = new MyRDD(sc, 1, List(shuffleDepA, narrowDepD), tracker = mapOutputTracker)
2101
2102    assert(scheduler.getShuffleDependencies(rddA) === Set())
2103    assert(scheduler.getShuffleDependencies(rddB) === Set())
2104    assert(scheduler.getShuffleDependencies(rddC) === Set(shuffleDepB))
2105    assert(scheduler.getShuffleDependencies(rddD) === Set(shuffleDepC))
2106    assert(scheduler.getShuffleDependencies(rddE) === Set(shuffleDepA, shuffleDepC))
2107  }
2108
2109  test("SPARK-17644: After one stage is aborted for too many failed attempts, subsequent stages" +
2110    "still behave correctly on fetch failures") {
2111    // Runs a job that always encounters a fetch failure, so should eventually be aborted
2112    def runJobWithPersistentFetchFailure: Unit = {
2113      val rdd1 = sc.makeRDD(Array(1, 2, 3, 4), 2).map(x => (x, 1)).groupByKey()
2114      val shuffleHandle =
2115        rdd1.dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]].shuffleHandle
2116      rdd1.map {
2117        case (x, _) if (x == 1) =>
2118          throw new FetchFailedException(
2119            BlockManagerId("1", "1", 1), shuffleHandle.shuffleId, 0, 0, "test")
2120        case (x, _) => x
2121      }.count()
2122    }
2123
2124    // Runs a job that encounters a single fetch failure but succeeds on the second attempt
2125    def runJobWithTemporaryFetchFailure: Unit = {
2126      object FailThisAttempt {
2127        val _fail = new AtomicBoolean(true)
2128      }
2129      val rdd1 = sc.makeRDD(Array(1, 2, 3, 4), 2).map(x => (x, 1)).groupByKey()
2130      val shuffleHandle =
2131        rdd1.dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]].shuffleHandle
2132      rdd1.map {
2133        case (x, _) if (x == 1) && FailThisAttempt._fail.getAndSet(false) =>
2134          throw new FetchFailedException(
2135            BlockManagerId("1", "1", 1), shuffleHandle.shuffleId, 0, 0, "test")
2136      }
2137    }
2138
2139    failAfter(10.seconds) {
2140      val e = intercept[SparkException] {
2141        runJobWithPersistentFetchFailure
2142      }
2143      assert(e.getMessage.contains("org.apache.spark.shuffle.FetchFailedException"))
2144    }
2145
2146    // Run a second job that will fail due to a fetch failure.
2147    // This job will hang without the fix for SPARK-17644.
2148    failAfter(10.seconds) {
2149      val e = intercept[SparkException] {
2150        runJobWithPersistentFetchFailure
2151      }
2152      assert(e.getMessage.contains("org.apache.spark.shuffle.FetchFailedException"))
2153    }
2154
2155    failAfter(10.seconds) {
2156      try {
2157        runJobWithTemporaryFetchFailure
2158      } catch {
2159        case e: Throwable => fail("A job with one fetch failure should eventually succeed")
2160      }
2161    }
2162  }
2163
2164  /**
2165   * Assert that the supplied TaskSet has exactly the given hosts as its preferred locations.
2166   * Note that this checks only the host and not the executor ID.
2167   */
2168  private def assertLocations(taskSet: TaskSet, hosts: Seq[Seq[String]]) {
2169    assert(hosts.size === taskSet.tasks.size)
2170    for ((taskLocs, expectedLocs) <- taskSet.tasks.map(_.preferredLocations).zip(hosts)) {
2171      assert(taskLocs.map(_.host).toSet === expectedLocs.toSet)
2172    }
2173  }
2174
2175  private def assertDataStructuresEmpty(): Unit = {
2176    assert(scheduler.activeJobs.isEmpty)
2177    assert(scheduler.failedStages.isEmpty)
2178    assert(scheduler.jobIdToActiveJob.isEmpty)
2179    assert(scheduler.jobIdToStageIds.isEmpty)
2180    assert(scheduler.stageIdToStage.isEmpty)
2181    assert(scheduler.runningStages.isEmpty)
2182    assert(scheduler.shuffleIdToMapStage.isEmpty)
2183    assert(scheduler.waitingStages.isEmpty)
2184    assert(scheduler.outputCommitCoordinator.isEmpty)
2185  }
2186
2187  // Nothing in this test should break if the task info's fields are null, but
2188  // OutputCommitCoordinator requires the task info itself to not be null.
2189  private def createFakeTaskInfo(): TaskInfo = {
2190    val info = new TaskInfo(0, 0, 0, 0L, "", "", TaskLocality.ANY, false)
2191    info.finishTime = 1  // to prevent spurious errors in JobProgressListener
2192    info
2193  }
2194
2195  private def createFakeTaskInfoWithId(taskId: Long): TaskInfo = {
2196    val info = new TaskInfo(taskId, 0, 0, 0L, "", "", TaskLocality.ANY, false)
2197    info.finishTime = 1  // to prevent spurious errors in JobProgressListener
2198    info
2199  }
2200
2201  private def makeCompletionEvent(
2202      task: Task[_],
2203      reason: TaskEndReason,
2204      result: Any,
2205      extraAccumUpdates: Seq[AccumulatorV2[_, _]] = Seq.empty,
2206      taskInfo: TaskInfo = createFakeTaskInfo()): CompletionEvent = {
2207    val accumUpdates = reason match {
2208      case Success => task.metrics.accumulators()
2209      case ef: ExceptionFailure => ef.accums
2210      case _ => Seq.empty
2211    }
2212    CompletionEvent(task, reason, result, accumUpdates ++ extraAccumUpdates, taskInfo)
2213  }
2214}
2215
2216object DAGSchedulerSuite {
2217  def makeMapStatus(host: String, reduces: Int, sizes: Byte = 2): MapStatus =
2218    MapStatus(makeBlockManagerId(host), Array.fill[Long](reduces)(sizes))
2219
2220  def makeBlockManagerId(host: String): BlockManagerId =
2221    BlockManagerId("exec-" + host, host, 12345)
2222}
2223