/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.scheduler

import java.io.NotSerializableException
import java.util.Properties
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger

import scala.annotation.tailrec
import scala.collection.Map
import scala.collection.mutable.{HashMap, HashSet, Stack}
import scala.concurrent.duration._
import scala.language.existentials
import scala.language.postfixOps
import scala.util.control.NonFatal

import org.apache.commons.lang3.SerializationUtils

import org.apache.spark._
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.internal.Logging
import org.apache.spark.network.util.JavaUtils
import org.apache.spark.partial.{ApproximateActionListener, ApproximateEvaluator, PartialResult}
import org.apache.spark.rdd.RDD
import org.apache.spark.rpc.RpcTimeout
import org.apache.spark.storage._
import org.apache.spark.storage.BlockManagerMessages.BlockManagerHeartbeat
import org.apache.spark.util._

/**
 * The high-level scheduling layer that implements stage-oriented scheduling. It computes a DAG of
 * stages for each job, keeps track of which RDDs and stage outputs are materialized, and finds a
 * minimal schedule to run the job. It then submits stages as TaskSets to an underlying
 * TaskScheduler implementation that runs them on the cluster. A TaskSet contains fully independent
 * tasks that can run right away based on the data that's already on the cluster (e.g. map output
 * files from previous stages), though it may fail if this data becomes unavailable.
 *
 * Spark stages are created by breaking the RDD graph at shuffle boundaries. RDD operations with
 * "narrow" dependencies, like map() and filter(), are pipelined together into one set of tasks
 * in each stage, but operations with shuffle dependencies require multiple stages (one to write a
 * set of map output files, and another to read those files after a barrier). In the end, every
 * stage will have only shuffle dependencies on other stages, and may compute multiple operations
 * inside it. The actual pipelining of these operations happens in the RDD.compute() functions of
 * various RDDs (MappedRDD, FilteredRDD, etc).
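 *
 * For example (illustrative only; assumes an active SparkContext named `sc`):
 * {{{
 *   // reduceByKey introduces a shuffle, so this single job runs as two stages: a
 *   // ShuffleMapStage that writes map output files, followed by a ResultStage that reads
 *   // them and runs collect(). textFile/flatMap/map are pipelined into the first stage.
 *   val counts = sc.textFile("README.md")
 *     .flatMap(_.split(" "))
 *     .map(word => (word, 1))
 *     .reduceByKey(_ + _)
 *   counts.collect()
 * }}}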
 *
 * In addition to coming up with a DAG of stages, the DAGScheduler also determines the preferred
 * locations to run each task on, based on the current cache status, and passes these to the
 * low-level TaskScheduler. Furthermore, it handles failures due to shuffle output files being
 * lost, in which case old stages may need to be resubmitted. Failures *within* a stage that are
 * not caused by shuffle file loss are handled by the TaskScheduler, which will retry each task
 * a small number of times before cancelling the whole stage.
 *
 * When looking through this code, there are several key concepts:
 *
 *  - Jobs (represented by [[ActiveJob]]) are the top-level work items submitted to the scheduler.
 *    For example, when the user calls an action, like count(), a job will be submitted through
 *    submitJob. Each Job may require the execution of multiple stages to build intermediate data.
 *
 *  - Stages ([[Stage]]) are sets of tasks that compute intermediate results in jobs, where each
 *    task computes the same function on partitions of the same RDD. Stages are separated at shuffle
 *    boundaries, which introduce a barrier (where we must wait for the previous stage to finish to
 *    fetch outputs). There are two types of stages: [[ResultStage]], for the final stage that
 *    executes an action, and [[ShuffleMapStage]], which writes map output files for a shuffle.
 *    Stages are often shared across multiple jobs, if these jobs reuse the same RDDs.
 *
 *  - Tasks are individual units of work, each sent to one machine.
 *
 *  - Cache tracking: the DAGScheduler figures out which RDDs are cached to avoid recomputing them
 *    and likewise remembers which shuffle map stages have already produced output files to avoid
 *    redoing the map side of a shuffle.
 *
 *  - Preferred locations: the DAGScheduler also computes where to run each task in a stage based
 *    on the preferred locations of its underlying RDDs, or the location of cached or shuffle data.
 *
 *  - Cleanup: all data structures are cleared when the running jobs that depend on them finish,
 *    to prevent memory leaks in a long-running application.
 *
 * To recover from failures, the same stage might need to run multiple times, which are called
 * "attempts". If the TaskScheduler reports that a task failed because a map output file from a
 * previous stage was lost, the DAGScheduler resubmits that lost stage. This is detected through a
 * CompletionEvent with FetchFailed, or an ExecutorLost event. The DAGScheduler will wait a small
 * amount of time to see whether other nodes or tasks fail, then resubmit TaskSets for any lost
 * stage(s) that compute the missing tasks. As part of this process, we might also have to create
 * Stage objects for old (finished) stages where we previously cleaned up the Stage object. Since
 * tasks from the old attempt of a stage could still be running, care must be taken to map any
 * events received to the correct Stage object.
 *
 * Here's a checklist to use when making or reviewing changes to this class:
 *
 *  - All data structures should be cleared when the jobs involving them end to avoid indefinite
 *    accumulation of state in long-running programs.
 *
 *  - When adding a new data structure, update `DAGSchedulerSuite.assertDataStructuresEmpty` to
 *    include the new structure. This will help to catch memory leaks.
 */
private[spark]
class DAGScheduler(
    private[scheduler] val sc: SparkContext,
    private[scheduler] val taskScheduler: TaskScheduler,
    listenerBus: LiveListenerBus,
    mapOutputTracker: MapOutputTrackerMaster,
    blockManagerMaster: BlockManagerMaster,
    env: SparkEnv,
    clock: Clock = new SystemClock())
  extends Logging {

  def this(sc: SparkContext, taskScheduler: TaskScheduler) = {
    this(
      sc,
      taskScheduler,
      sc.listenerBus,
      sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster],
      sc.env.blockManager.master,
      sc.env)
  }

  def this(sc: SparkContext) = this(sc, sc.taskScheduler)

  private[spark] val metricsSource: DAGSchedulerSource = new DAGSchedulerSource(this)

  private[scheduler] val nextJobId = new AtomicInteger(0)
  private[scheduler] def numTotalJobs: Int = nextJobId.get()
  private val nextStageId = new AtomicInteger(0)

  private[scheduler] val jobIdToStageIds = new HashMap[Int, HashSet[Int]]
  private[scheduler] val stageIdToStage = new HashMap[Int, Stage]
  /**
   * Mapping from shuffle dependency ID to the ShuffleMapStage that will generate the data for
   * that dependency. Only includes stages that are part of a currently running job (when the job(s)
   * that require the shuffle stage complete, the mapping will be removed, and the only record of
   * the shuffle data will be in the MapOutputTracker).
   */
  private[scheduler] val shuffleIdToMapStage = new HashMap[Int, ShuffleMapStage]
  private[scheduler] val jobIdToActiveJob = new HashMap[Int, ActiveJob]

  // Stages we need to run whose parents aren't done
  private[scheduler] val waitingStages = new HashSet[Stage]

  // Stages we are running right now
  private[scheduler] val runningStages = new HashSet[Stage]

  // Stages that must be resubmitted due to fetch failures
  private[scheduler] val failedStages = new HashSet[Stage]

  private[scheduler] val activeJobs = new HashSet[ActiveJob]

  /**
   * Contains the locations that each RDD's partitions are cached on.  This map's keys are RDD ids
   * and its values are arrays indexed by partition numbers. Each array value is the set of
   * locations where that RDD partition is cached.
   *
   * All accesses to this map should be guarded by synchronizing on it (see SPARK-4454).
   */
  private val cacheLocs = new HashMap[Int, IndexedSeq[Seq[TaskLocation]]]

  // For tracking failed nodes, we use the MapOutputTracker's epoch number, which is sent with
  // every task. When we detect a node failing, we note the current epoch number and failed
  // executor, increment it for new tasks, and use this to ignore stray ShuffleMapTask results.
  //
  // TODO: Garbage collect information about failure epochs when we know there are no more
  //       stray messages to detect.
  private val failedEpoch = new HashMap[String, Long]

  private[scheduler] val outputCommitCoordinator = env.outputCommitCoordinator

  // A closure serializer that we reuse.
  // This is only safe because DAGScheduler runs in a single thread.
  private val closureSerializer = SparkEnv.get.closureSerializer.newInstance()

  /** If enabled, FetchFailed will not cause stage retry, in order to surface the problem. */
  private val disallowStageRetryForTest = sc.getConf.getBoolean("spark.test.noStageRetry", false)

  private val messageScheduler =
    ThreadUtils.newDaemonSingleThreadScheduledExecutor("dag-scheduler-message")

  private[scheduler] val eventProcessLoop = new DAGSchedulerEventProcessLoop(this)
  taskScheduler.setDAGScheduler(this)

  /**
   * Called by the TaskSetManager to report that a task has started.
   */
  def taskStarted(task: Task[_], taskInfo: TaskInfo) {
    eventProcessLoop.post(BeginEvent(task, taskInfo))
  }

  /**
   * Called by the TaskSetManager to report that a task has completed
   * and results are being fetched remotely.
   */
  def taskGettingResult(taskInfo: TaskInfo) {
    eventProcessLoop.post(GettingResultEvent(taskInfo))
  }

  /**
   * Called by the TaskSetManager to report task completions or failures.
   */
  def taskEnded(
      task: Task[_],
      reason: TaskEndReason,
      result: Any,
      accumUpdates: Seq[AccumulatorV2[_, _]],
      taskInfo: TaskInfo): Unit = {
    eventProcessLoop.post(
      CompletionEvent(task, reason, result, accumUpdates, taskInfo))
  }

  /**
   * Update metrics for in-progress tasks and let the master know that the BlockManager is still
   * alive. Return true if the driver knows about the given block manager. Otherwise, return false,
   * indicating that the block manager should re-register.
   */
  def executorHeartbeatReceived(
      execId: String,
      // (taskId, stageId, stageAttemptId, accumUpdates)
      accumUpdates: Array[(Long, Int, Int, Seq[AccumulableInfo])],
      blockManagerId: BlockManagerId): Boolean = {
    listenerBus.post(SparkListenerExecutorMetricsUpdate(execId, accumUpdates))
    blockManagerMaster.driverEndpoint.askWithRetry[Boolean](
      BlockManagerHeartbeat(blockManagerId), new RpcTimeout(600 seconds, "BlockManagerHeartbeat"))
  }

  /**
   * Called by TaskScheduler implementation when an executor fails.
   */
  def executorLost(execId: String, reason: ExecutorLossReason): Unit = {
    eventProcessLoop.post(ExecutorLost(execId, reason))
  }

  /**
   * Called by TaskScheduler implementation when a host is added.
   */
  def executorAdded(execId: String, host: String): Unit = {
    eventProcessLoop.post(ExecutorAdded(execId, host))
  }

  /**
   * Called by the TaskSetManager to cancel an entire TaskSet due to either repeated failures or
   * cancellation of the job itself.
   */
  def taskSetFailed(taskSet: TaskSet, reason: String, exception: Option[Throwable]): Unit = {
    eventProcessLoop.post(TaskSetFailed(taskSet, reason, exception))
  }

  private[scheduler]
  def getCacheLocs(rdd: RDD[_]): IndexedSeq[Seq[TaskLocation]] = cacheLocs.synchronized {
    // Note: this doesn't use `getOrElse()` because this method is called O(num tasks) times
    if (!cacheLocs.contains(rdd.id)) {
      // Note: if the storage level is NONE, we don't need to get locations from block manager.
      val locs: IndexedSeq[Seq[TaskLocation]] = if (rdd.getStorageLevel == StorageLevel.NONE) {
        IndexedSeq.fill(rdd.partitions.length)(Nil)
      } else {
        val blockIds =
          rdd.partitions.indices.map(index => RDDBlockId(rdd.id, index)).toArray[BlockId]
        blockManagerMaster.getLocations(blockIds).map { bms =>
          bms.map(bm => TaskLocation(bm.host, bm.executorId))
        }
      }
      cacheLocs(rdd.id) = locs
    }
    cacheLocs(rdd.id)
  }

  private def clearCacheLocs(): Unit = cacheLocs.synchronized {
    cacheLocs.clear()
  }

  /**
   * Gets a shuffle map stage if one exists in shuffleIdToMapStage. Otherwise, if the
   * shuffle map stage doesn't already exist, this method will create the shuffle map stage in
   * addition to any missing ancestor shuffle map stages.
   */
  private def getOrCreateShuffleMapStage(
      shuffleDep: ShuffleDependency[_, _, _],
      firstJobId: Int): ShuffleMapStage = {
    shuffleIdToMapStage.get(shuffleDep.shuffleId) match {
      case Some(stage) =>
        stage

      case None =>
        // Create stages for all missing ancestor shuffle dependencies.
        getMissingAncestorShuffleDependencies(shuffleDep.rdd).foreach { dep =>
          // Even though getMissingAncestorShuffleDependencies only returns shuffle dependencies
          // that were not already in shuffleIdToMapStage, it's possible that by the time we
          // get to a particular dependency in the foreach loop, it's been added to
          // shuffleIdToMapStage by the stage creation process for an earlier dependency. See
          // SPARK-13902 for more information.
          if (!shuffleIdToMapStage.contains(dep.shuffleId)) {
            createShuffleMapStage(dep, firstJobId)
          }
        }
        // Finally, create a stage for the given shuffle dependency.
        createShuffleMapStage(shuffleDep, firstJobId)
    }
  }

  /**
   * Creates a ShuffleMapStage that generates the given shuffle dependency's partitions. If a
   * previously run stage generated the same shuffle data, this function will copy the output
   * locations that are still available from the previous shuffle to avoid unnecessarily
   * regenerating data.
   */
  def createShuffleMapStage(shuffleDep: ShuffleDependency[_, _, _], jobId: Int): ShuffleMapStage = {
    val rdd = shuffleDep.rdd
    val numTasks = rdd.partitions.length
    val parents = getOrCreateParentStages(rdd, jobId)
    val id = nextStageId.getAndIncrement()
    val stage = new ShuffleMapStage(id, rdd, numTasks, parents, jobId, rdd.creationSite, shuffleDep)

    stageIdToStage(id) = stage
    shuffleIdToMapStage(shuffleDep.shuffleId) = stage
    updateJobIdStageIdMaps(jobId, stage)

    if (mapOutputTracker.containsShuffle(shuffleDep.shuffleId)) {
      // A previously run stage generated partitions for this shuffle, so for each output
      // that's still available, copy information about that output location to the new stage
      // (so we don't unnecessarily re-compute that data).
      val serLocs = mapOutputTracker.getSerializedMapOutputStatuses(shuffleDep.shuffleId)
      val locs = MapOutputTracker.deserializeMapStatuses(serLocs)
      (0 until locs.length).foreach { i =>
        if (locs(i) ne null) {
          // locs(i) will be null if missing
          stage.addOutputLoc(i, locs(i))
        }
      }
    } else {
      // Kind of ugly: need to register RDDs with the cache and map output tracker here
      // since we can't do it in the RDD constructor because # of partitions is unknown
      logInfo("Registering RDD " + rdd.id + " (" + rdd.getCreationSite + ")")
      mapOutputTracker.registerShuffle(shuffleDep.shuffleId, rdd.partitions.length)
    }
    stage
  }

  /**
   * Create a ResultStage associated with the provided jobId.
   */
  private def createResultStage(
      rdd: RDD[_],
      func: (TaskContext, Iterator[_]) => _,
      partitions: Array[Int],
      jobId: Int,
      callSite: CallSite): ResultStage = {
    val parents = getOrCreateParentStages(rdd, jobId)
    val id = nextStageId.getAndIncrement()
    val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite)
    stageIdToStage(id) = stage
    updateJobIdStageIdMaps(jobId, stage)
    stage
  }

  /**
   * Get or create the list of parent stages for a given RDD.  The new Stages will be created with
   * the provided firstJobId.
   */
  private def getOrCreateParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
    getShuffleDependencies(rdd).map { shuffleDep =>
      getOrCreateShuffleMapStage(shuffleDep, firstJobId)
    }.toList
  }

  /** Find ancestor shuffle dependencies that are not registered in shuffleIdToMapStage yet. */
  private def getMissingAncestorShuffleDependencies(
      rdd: RDD[_]): Stack[ShuffleDependency[_, _, _]] = {
    val ancestors = new Stack[ShuffleDependency[_, _, _]]
    val visited = new HashSet[RDD[_]]
    // We are manually maintaining a stack here to prevent StackOverflowError
    // caused by recursively visiting
    val waitingForVisit = new Stack[RDD[_]]
    waitingForVisit.push(rdd)
    while (waitingForVisit.nonEmpty) {
      val toVisit = waitingForVisit.pop()
      if (!visited(toVisit)) {
        visited += toVisit
        getShuffleDependencies(toVisit).foreach { shuffleDep =>
          if (!shuffleIdToMapStage.contains(shuffleDep.shuffleId)) {
            ancestors.push(shuffleDep)
            waitingForVisit.push(shuffleDep.rdd)
          } // Otherwise, the dependency and its ancestors have already been registered.
        }
      }
    }
    ancestors
  }

  /**
   * Returns shuffle dependencies that are immediate parents of the given RDD.
   *
   * This function will not return more distant ancestors.  For example, if C has a shuffle
   * dependency on B which has a shuffle dependency on A:
   *
   * A <-- B <-- C
   *
   * calling this function with rdd C will only return the B <-- C dependency.
   *
   * This function is scheduler-visible for the purpose of unit testing.
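   *
   * For example (illustrative; assumes an active SparkContext `sc` in scope):
   * {{{
   *   val a = sc.parallelize(1 to 10).map(x => (x, 1))
   *   val b = a.reduceByKey(_ + _)                  // shuffle boundary A <-- B
   *   val c = b.map(identity).reduceByKey(_ + _)    // shuffle boundary B <-- C
   *   // getShuffleDependencies(c) returns only the B <-- C shuffle dependency.
   * }}}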
   */
  private[scheduler] def getShuffleDependencies(
      rdd: RDD[_]): HashSet[ShuffleDependency[_, _, _]] = {
    val parents = new HashSet[ShuffleDependency[_, _, _]]
    val visited = new HashSet[RDD[_]]
    val waitingForVisit = new Stack[RDD[_]]
    waitingForVisit.push(rdd)
    while (waitingForVisit.nonEmpty) {
      val toVisit = waitingForVisit.pop()
      if (!visited(toVisit)) {
        visited += toVisit
        toVisit.dependencies.foreach {
          case shuffleDep: ShuffleDependency[_, _, _] =>
            parents += shuffleDep
          case dependency =>
            waitingForVisit.push(dependency.rdd)
        }
      }
    }
    parents
  }

  private def getMissingParentStages(stage: Stage): List[Stage] = {
    val missing = new HashSet[Stage]
    val visited = new HashSet[RDD[_]]
    // We are manually maintaining a stack here to prevent StackOverflowError
    // caused by recursively visiting
    val waitingForVisit = new Stack[RDD[_]]
    def visit(rdd: RDD[_]) {
      if (!visited(rdd)) {
        visited += rdd
        val rddHasUncachedPartitions = getCacheLocs(rdd).contains(Nil)
        if (rddHasUncachedPartitions) {
          for (dep <- rdd.dependencies) {
            dep match {
              case shufDep: ShuffleDependency[_, _, _] =>
                val mapStage = getOrCreateShuffleMapStage(shufDep, stage.firstJobId)
                if (!mapStage.isAvailable) {
                  missing += mapStage
                }
              case narrowDep: NarrowDependency[_] =>
                waitingForVisit.push(narrowDep.rdd)
            }
          }
        }
      }
    }
    waitingForVisit.push(stage.rdd)
    while (waitingForVisit.nonEmpty) {
      visit(waitingForVisit.pop())
    }
    missing.toList
  }

  /**
   * Registers the given jobId among the jobs that need the given stage and
   * all of that stage's ancestors.
   */
  private def updateJobIdStageIdMaps(jobId: Int, stage: Stage): Unit = {
    @tailrec
    def updateJobIdStageIdMapsList(stages: List[Stage]) {
      if (stages.nonEmpty) {
        val s = stages.head
        s.jobIds += jobId
        jobIdToStageIds.getOrElseUpdate(jobId, new HashSet[Int]()) += s.id
        val parentsWithoutThisJobId = s.parents.filter { ! _.jobIds.contains(jobId) }
        updateJobIdStageIdMapsList(parentsWithoutThisJobId ++ stages.tail)
      }
    }
    updateJobIdStageIdMapsList(List(stage))
  }

  /**
   * Removes state for job and any stages that are not needed by any other job.  Does not
   * handle cancelling tasks or notifying the SparkListener about finished jobs/stages/tasks.
   *
   * @param job The job whose state to cleanup.
   */
  private def cleanupStateForJobAndIndependentStages(job: ActiveJob): Unit = {
    val registeredStages = jobIdToStageIds.get(job.jobId)
    if (registeredStages.isEmpty || registeredStages.get.isEmpty) {
      logError("No stages registered for job " + job.jobId)
    } else {
      stageIdToStage.filterKeys(stageId => registeredStages.get.contains(stageId)).foreach {
        case (stageId, stage) =>
          val jobSet = stage.jobIds
          if (!jobSet.contains(job.jobId)) {
            logError(
              "Job %d not registered for stage %d even though that stage was registered for the job"
              .format(job.jobId, stageId))
          } else {
            def removeStage(stageId: Int) {
              // data structures based on Stage
              for (stage <- stageIdToStage.get(stageId)) {
                if (runningStages.contains(stage)) {
                  logDebug("Removing running stage %d".format(stageId))
                  runningStages -= stage
                }
                for ((k, v) <- shuffleIdToMapStage.find(_._2 == stage)) {
                  shuffleIdToMapStage.remove(k)
                }
                if (waitingStages.contains(stage)) {
                  logDebug("Removing stage %d from waiting set.".format(stageId))
                  waitingStages -= stage
                }
                if (failedStages.contains(stage)) {
                  logDebug("Removing stage %d from failed set.".format(stageId))
                  failedStages -= stage
                }
              }
              // data structures based on StageId
              stageIdToStage -= stageId
              logDebug("After removal of stage %d, remaining stages = %d"
                .format(stageId, stageIdToStage.size))
            }

            jobSet -= job.jobId
            if (jobSet.isEmpty) { // no other job needs this stage
              removeStage(stageId)
            }
          }
      }
    }
    jobIdToStageIds -= job.jobId
    jobIdToActiveJob -= job.jobId
    activeJobs -= job
    job.finalStage match {
      case r: ResultStage => r.removeActiveJob()
      case m: ShuffleMapStage => m.removeActiveJob(job)
    }
  }

  /**
   * Submit an action job to the scheduler.
   *
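   * For example, a minimal sketch (illustrative only; user code normally reaches this method
   * through `SparkContext.runJob` rather than by calling the DAGScheduler directly, and
   * `Utils.getCallSite()` is how SparkContext obtains a call site):
   * {{{
   *   val rdd = sc.parallelize(1 to 100, 4)
   *   val waiter = dagScheduler.submitJob(
   *     rdd,
   *     (_: TaskContext, it: Iterator[Int]) => it.sum,
   *     0 until rdd.partitions.length,
   *     Utils.getCallSite(),
   *     (index: Int, partitionSum: Int) => println(s"partition $index -> $partitionSum"),
   *     new Properties())
   *   // Block on waiter.completionFuture, or cancel with dagScheduler.cancelJob(waiter.jobId).
   * }}}
   *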
   * @param rdd target RDD to run tasks on
   * @param func a function to run on each partition of the RDD
   * @param partitions set of partitions to run on; some jobs may not want to compute on all
   *   partitions of the target RDD, e.g. for operations like first()
   * @param callSite where in the user program this job was called
   * @param resultHandler callback to pass each result to
   * @param properties scheduler properties to attach to this job, e.g. fair scheduler pool name
   *
   * @return a JobWaiter object that can be used to block until the job finishes executing
   *         or can be used to cancel the job.
   *
   * @throws IllegalArgumentException when partition ids are illegal
   */
  def submitJob[T, U](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      callSite: CallSite,
      resultHandler: (Int, U) => Unit,
      properties: Properties): JobWaiter[U] = {
    // Check to make sure we are not launching a task on a partition that does not exist.
    val maxPartitions = rdd.partitions.length
    partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>
      throw new IllegalArgumentException(
        "Attempting to access a non-existent partition: " + p + ". " +
          "Total number of partitions: " + maxPartitions)
    }

    val jobId = nextJobId.getAndIncrement()
    if (partitions.size == 0) {
      // Return immediately if the job is running 0 tasks
      return new JobWaiter[U](this, jobId, 0, resultHandler)
    }

    assert(partitions.size > 0)
    val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
    val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
    eventProcessLoop.post(JobSubmitted(
      jobId, rdd, func2, partitions.toArray, callSite, waiter,
      SerializationUtils.clone(properties)))
    waiter
  }

  /**
   * Run an action job on the given RDD and pass all the results to the resultHandler function as
   * they arrive.
   *
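   * For example (illustrative; user code reaches this indirectly: RDD actions such as
   * `rdd.collect()` call `SparkContext.runJob`, which delegates to this method and blocks until
   * the job completes or fails):
   * {{{
   *   sc.parallelize(1 to 4, 2).collect()   // runs a single ResultStage with two tasks
   * }}}
   *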
   * @param rdd target RDD to run tasks on
   * @param func a function to run on each partition of the RDD
   * @param partitions set of partitions to run on; some jobs may not want to compute on all
   *   partitions of the target RDD, e.g. for operations like first()
   * @param callSite where in the user program this job was called
   * @param resultHandler callback to pass each result to
   * @param properties scheduler properties to attach to this job, e.g. fair scheduler pool name
   *
   * @throws Exception when the job fails
   */
  def runJob[T, U](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      callSite: CallSite,
      resultHandler: (Int, U) => Unit,
      properties: Properties): Unit = {
    val start = System.nanoTime
    val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
    // Note: Do not call Await.ready(future) because that calls `scala.concurrent.blocking`,
    // which causes concurrent SQL executions to fail if a fork-join pool is used. Note that
    // due to idiosyncrasies in Scala, `awaitPermission` is not actually used anywhere so it's
    // safe to pass in null here. For more detail, see SPARK-13747.
    val awaitPermission = null.asInstanceOf[scala.concurrent.CanAwait]
    waiter.completionFuture.ready(Duration.Inf)(awaitPermission)
    waiter.completionFuture.value.get match {
      case scala.util.Success(_) =>
        logInfo("Job %d finished: %s, took %f s".format
          (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
      case scala.util.Failure(exception) =>
        logInfo("Job %d failed: %s, took %f s".format
          (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
        // SPARK-8644: Include user stack trace in exceptions coming from DAGScheduler.
        val callerStackTrace = Thread.currentThread().getStackTrace.tail
        exception.setStackTrace(exception.getStackTrace ++ callerStackTrace)
        throw exception
    }
  }

  /**
   * Run an approximate job on the given RDD and pass all the results to an ApproximateEvaluator
   * as they arrive. Returns a partial result object from the evaluator.
   *
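   * For example (illustrative; user code reaches this through approximate actions such as
   * `RDD.countApprox`, which wires up the [[ApproximateEvaluator]] internally):
   * {{{
   *   val partial = sc.parallelize(1 to 1000000, 100).countApprox(timeout = 500)
   *   // The call blocks for at most the timeout; initialValue is the estimate at that point.
   *   println(partial.initialValue)
   * }}}
   *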
   * @param rdd target RDD to run tasks on
   * @param func a function to run on each partition of the RDD
   * @param evaluator [[ApproximateEvaluator]] to receive the partial results
   * @param callSite where in the user program this job was called
   * @param timeout maximum time to wait for the job, in milliseconds
   * @param properties scheduler properties to attach to this job, e.g. fair scheduler pool name
   */
  def runApproximateJob[T, U, R](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      evaluator: ApproximateEvaluator[U, R],
      callSite: CallSite,
      timeout: Long,
      properties: Properties): PartialResult[R] = {
    val listener = new ApproximateActionListener(rdd, func, evaluator, timeout)
    val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
    val partitions = (0 until rdd.partitions.length).toArray
    val jobId = nextJobId.getAndIncrement()
    eventProcessLoop.post(JobSubmitted(
      jobId, rdd, func2, partitions, callSite, listener, SerializationUtils.clone(properties)))
    listener.awaitResult()    // Will throw an exception if the job fails
  }

  /**
   * Submit a shuffle map stage to run independently and get a JobWaiter object back. The waiter
   * can be used to block until the job finishes executing or can be used to cancel the job.
   * This method is used for adaptive query planning, to run map stages and look at statistics
   * about their outputs before submitting downstream stages.
   *
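   * A minimal usage sketch (illustrative only; `shuffleDep` is assumed to be the
   * ShuffleDependency of some shuffled pair RDD, e.g. taken from that RDD's `dependencies`):
   * {{{
   *   val waiter = dagScheduler.submitMapStage(
   *     shuffleDep,
   *     (stats: MapOutputStatistics) =>
   *       logInfo("Bytes per reduce partition: " + stats.bytesByPartitionId.mkString(", ")),
   *     Utils.getCallSite(),
   *     new Properties())
   * }}}
   *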
   * @param dependency the ShuffleDependency to run a map stage for
   * @param callback function called with the result of the job, which in this case will be a
   *   single MapOutputStatistics object showing how much data was produced for each partition
   * @param callSite where in the user program this job was submitted
   * @param properties scheduler properties to attach to this job, e.g. fair scheduler pool name
   */
  def submitMapStage[K, V, C](
      dependency: ShuffleDependency[K, V, C],
      callback: MapOutputStatistics => Unit,
      callSite: CallSite,
      properties: Properties): JobWaiter[MapOutputStatistics] = {

    val rdd = dependency.rdd
    val jobId = nextJobId.getAndIncrement()
    if (rdd.partitions.length == 0) {
      throw new SparkException("Can't run submitMapStage on RDD with 0 partitions")
    }

    // We create a JobWaiter with only one "task", which will be marked as complete when the whole
    // map stage has completed, and will be passed the MapOutputStatistics for that stage.
    // This makes it easier to avoid race conditions between the user code and the map output
    // tracker that might result if we told the user the stage had finished, but then they
    // queried the map output tracker after node failures had caused the output statistics to be
    // lost.
    val waiter = new JobWaiter(this, jobId, 1, (i: Int, r: MapOutputStatistics) => callback(r))
    eventProcessLoop.post(MapStageSubmitted(
      jobId, dependency, callSite, waiter, SerializationUtils.clone(properties)))
    waiter
  }

  /**
   * Cancel a job that is running or waiting in the queue.
   */
  def cancelJob(jobId: Int): Unit = {
    logInfo("Asked to cancel job " + jobId)
    eventProcessLoop.post(JobCancelled(jobId))
  }

  /**
   * Cancel all jobs in the given job group ID.
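   *
   * For example (illustrative; user code goes through the SparkContext wrappers rather than
   * calling this directly):
   * {{{
   *   sc.setJobGroup("nightly-etl", "nightly ETL jobs", interruptOnCancel = true)
   *   // ... actions submitted from this thread now belong to the "nightly-etl" group ...
   *   sc.cancelJobGroup("nightly-etl")   // eventually posts JobGroupCancelled to this scheduler
   * }}}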
   */
  def cancelJobGroup(groupId: String): Unit = {
    logInfo("Asked to cancel job group " + groupId)
    eventProcessLoop.post(JobGroupCancelled(groupId))
  }

  /**
   * Cancel all jobs that are running or waiting in the queue.
   */
  def cancelAllJobs(): Unit = {
    eventProcessLoop.post(AllJobsCancelled)
  }

  private[scheduler] def doCancelAllJobs() {
    // Cancel all running jobs.
    runningStages.map(_.firstJobId).foreach(handleJobCancellation(_,
      reason = "as part of cancellation of all jobs"))
    activeJobs.clear() // These should already be empty by this point,
    jobIdToActiveJob.clear() // but just in case we lost track of some jobs...
  }

  /**
   * Cancel all jobs associated with a running or scheduled stage.
   */
  def cancelStage(stageId: Int) {
    eventProcessLoop.post(StageCancelled(stageId))
  }

  /**
   * Resubmit any failed stages. Ordinarily called after a small amount of time has passed since
   * the last fetch failure.
   */
  private[scheduler] def resubmitFailedStages() {
    if (failedStages.size > 0) {
      // Failed stages may be removed by job cancellation, so failedStages might be empty even if
      // the ResubmitFailedStages event has been scheduled.
      logInfo("Resubmitting failed stages")
      clearCacheLocs()
      val failedStagesCopy = failedStages.toArray
      failedStages.clear()
      for (stage <- failedStagesCopy.sortBy(_.firstJobId)) {
        submitStage(stage)
      }
    }
  }

  /**
   * Check for waiting stages which are now eligible for resubmission.
   * Submits stages that depend on the given parent stage. Called when the parent stage completes
   * successfully.
   */
  private def submitWaitingChildStages(parent: Stage) {
    logTrace(s"Checking if any dependencies of $parent are now runnable")
    logTrace("running: " + runningStages)
    logTrace("waiting: " + waitingStages)
    logTrace("failed: " + failedStages)
    val childStages = waitingStages.filter(_.parents.contains(parent)).toArray
    waitingStages --= childStages
    for (stage <- childStages.sortBy(_.firstJobId)) {
      submitStage(stage)
    }
  }

  /** Finds the earliest-created active job that needs the stage */
  // TODO: Probably should actually find among the active jobs that need this
  // stage the one with the highest priority (highest-priority pool, earliest created).
  // That should take care of at least part of the priority inversion problem with
  // cross-job dependencies.
  private def activeJobForStage(stage: Stage): Option[Int] = {
    val jobsThatUseStage: Array[Int] = stage.jobIds.toArray.sorted
    jobsThatUseStage.find(jobIdToActiveJob.contains)
  }

  private[scheduler] def handleJobGroupCancelled(groupId: String) {
    // Cancel all jobs belonging to this job group.
    // First find all active jobs with this group id, then kill the stages for them.
    val activeInGroup = activeJobs.filter { activeJob =>
      Option(activeJob.properties).exists {
        _.getProperty(SparkContext.SPARK_JOB_GROUP_ID) == groupId
      }
    }
    val jobIds = activeInGroup.map(_.jobId)
    jobIds.foreach(handleJobCancellation(_, "part of cancelled job group %s".format(groupId)))
  }

  private[scheduler] def handleBeginEvent(task: Task[_], taskInfo: TaskInfo) {
    // Note that there is a chance that this task is launched after the stage is cancelled.
    // In that case, we wouldn't have the stage anymore in stageIdToStage.
    val stageAttemptId = stageIdToStage.get(task.stageId).map(_.latestInfo.attemptId).getOrElse(-1)
    listenerBus.post(SparkListenerTaskStart(task.stageId, stageAttemptId, taskInfo))
  }

  private[scheduler] def handleTaskSetFailed(
      taskSet: TaskSet,
      reason: String,
      exception: Option[Throwable]): Unit = {
    stageIdToStage.get(taskSet.stageId).foreach { abortStage(_, reason, exception) }
  }

  private[scheduler] def cleanUpAfterSchedulerStop() {
    for (job <- activeJobs) {
      val error =
        new SparkException(s"Job ${job.jobId} cancelled because SparkContext was shut down")
      job.listener.jobFailed(error)
      // Tell the listeners that all of the running stages have ended.  Don't bother
      // cancelling the stages because if the DAG scheduler is stopped, the entire application
      // is in the process of getting stopped.
      val stageFailedMessage = "Stage cancelled because SparkContext was shut down"
      // The `toArray` here is necessary so that we don't iterate over `runningStages` while
      // mutating it.
      runningStages.toArray.foreach { stage =>
        markStageAsFinished(stage, Some(stageFailedMessage))
      }
      listenerBus.post(SparkListenerJobEnd(job.jobId, clock.getTimeMillis(), JobFailed(error)))
    }
  }

  private[scheduler] def handleGetTaskResult(taskInfo: TaskInfo) {
    listenerBus.post(SparkListenerTaskGettingResult(taskInfo))
  }

  private[scheduler] def handleJobSubmitted(jobId: Int,
      finalRDD: RDD[_],
      func: (TaskContext, Iterator[_]) => _,
      partitions: Array[Int],
      callSite: CallSite,
      listener: JobListener,
      properties: Properties) {
    var finalStage: ResultStage = null
    try {
      // New stage creation may throw an exception if, for example, jobs are run on a
      // HadoopRDD whose underlying HDFS files have been deleted.
      finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
    } catch {
      case e: Exception =>
        logWarning("Creating new stage failed due to exception - job: " + jobId, e)
        listener.jobFailed(e)
        return
    }

    val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
    clearCacheLocs()
    logInfo("Got job %s (%s) with %d output partitions".format(
      job.jobId, callSite.shortForm, partitions.length))
    logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")")
    logInfo("Parents of final stage: " + finalStage.parents)
    logInfo("Missing parents: " + getMissingParentStages(finalStage))

    val jobSubmissionTime = clock.getTimeMillis()
    jobIdToActiveJob(jobId) = job
    activeJobs += job
    finalStage.setActiveJob(job)
    val stageIds = jobIdToStageIds(jobId).toArray
    val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
    listenerBus.post(
      SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
    submitStage(finalStage)
  }

  private[scheduler] def handleMapStageSubmitted(jobId: Int,
      dependency: ShuffleDependency[_, _, _],
      callSite: CallSite,
      listener: JobListener,
      properties: Properties) {
    // Submitting this map stage might still require the creation of some parent stages, so make
    // sure that happens.
    var finalStage: ShuffleMapStage = null
    try {
      // New stage creation may throw an exception if, for example, jobs are run on a
      // HadoopRDD whose underlying HDFS files have been deleted.
      finalStage = getOrCreateShuffleMapStage(dependency, jobId)
    } catch {
      case e: Exception =>
        logWarning("Creating new stage failed due to exception - job: " + jobId, e)
        listener.jobFailed(e)
        return
    }

    val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
    clearCacheLocs()
    logInfo("Got map stage job %s (%s) with %d output partitions".format(
      jobId, callSite.shortForm, dependency.rdd.partitions.length))
    logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")")
    logInfo("Parents of final stage: " + finalStage.parents)
    logInfo("Missing parents: " + getMissingParentStages(finalStage))

    val jobSubmissionTime = clock.getTimeMillis()
    jobIdToActiveJob(jobId) = job
    activeJobs += job
    finalStage.addActiveJob(job)
    val stageIds = jobIdToStageIds(jobId).toArray
    val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
    listenerBus.post(
      SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
    submitStage(finalStage)

    // If the whole stage has already finished, tell the listener and remove it
    if (finalStage.isAvailable) {
      markMapStageJobAsFinished(job, mapOutputTracker.getStatistics(dependency))
    }
  }

  /** Submits stage, but first recursively submits any missing parents. */
  private def submitStage(stage: Stage) {
    val jobId = activeJobForStage(stage)
    if (jobId.isDefined) {
      logDebug("submitStage(" + stage + ")")
      if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
        val missing = getMissingParentStages(stage).sortBy(_.id)
        logDebug("missing: " + missing)
        if (missing.isEmpty) {
          logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
          submitMissingTasks(stage, jobId.get)
        } else {
          for (parent <- missing) {
            submitStage(parent)
          }
          waitingStages += stage
        }
      }
    } else {
      abortStage(stage, "No active job for stage " + stage.id, None)
    }
  }

  /** Called when the stage's parents are available and we can now run its tasks. */
  private def submitMissingTasks(stage: Stage, jobId: Int) {
    logDebug("submitMissingTasks(" + stage + ")")
    // Get our pending tasks and remember them in our pendingTasks entry
    stage.pendingPartitions.clear()

    // First figure out the indexes of partition ids to compute.
    val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()

    // Use the scheduling pool, job group, description, etc. from an ActiveJob associated
    // with this Stage
    val properties = jobIdToActiveJob(jobId).properties

    runningStages += stage
    // SparkListenerStageSubmitted should be posted before testing whether tasks are
    // serializable. If tasks are not serializable, a SparkListenerStageCompleted event
    // will be posted, which should always come after a corresponding SparkListenerStageSubmitted
    // event.
    stage match {
      case s: ShuffleMapStage =>
        outputCommitCoordinator.stageStart(stage = s.id, maxPartitionId = s.numPartitions - 1)
      case s: ResultStage =>
        outputCommitCoordinator.stageStart(
          stage = s.id, maxPartitionId = s.rdd.partitions.length - 1)
    }
    val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try {
      stage match {
        case s: ShuffleMapStage =>
          partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id))}.toMap
        case s: ResultStage =>
          partitionsToCompute.map { id =>
            val p = s.partitions(id)
            (id, getPreferredLocs(stage.rdd, p))
          }.toMap
      }
    } catch {
      case NonFatal(e) =>
        stage.makeNewStageAttempt(partitionsToCompute.size)
        listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))
        abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e))
        runningStages -= stage
        return
    }

    stage.makeNewStageAttempt(partitionsToCompute.size, taskIdToLocations.values.toSeq)
    listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))

    // TODO: Maybe we can keep the taskBinary in Stage to avoid serializing it multiple times.
    // Broadcasted binary for the task, used to dispatch tasks to executors. Note that we broadcast
    // the serialized copy of the RDD and for each task we will deserialize it, which means each
    // task gets a different copy of the RDD. This provides stronger isolation between tasks that
    // might modify state of objects referenced in their closures. This is necessary in Hadoop
    // where the JobConf/Configuration object is not thread-safe.
    var taskBinary: Broadcast[Array[Byte]] = null
    try {
      // For ShuffleMapTask, serialize and broadcast (rdd, shuffleDep).
      // For ResultTask, serialize and broadcast (rdd, func).
      val taskBinaryBytes: Array[Byte] = stage match {
        case stage: ShuffleMapStage =>
          JavaUtils.bufferToArray(
            closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef))
        case stage: ResultStage =>
          JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.func): AnyRef))
      }

      taskBinary = sc.broadcast(taskBinaryBytes)
    } catch {
      // In the case of a failure during serialization, abort the stage.
      case e: NotSerializableException =>
        abortStage(stage, "Task not serializable: " + e.toString, Some(e))
        runningStages -= stage

        // Abort execution
        return
      case NonFatal(e) =>
        abortStage(stage, s"Task serialization failed: $e\n${Utils.exceptionString(e)}", Some(e))
        runningStages -= stage
        return
    }

    val tasks: Seq[Task[_]] = try {
      stage match {
        case stage: ShuffleMapStage =>
          partitionsToCompute.map { id =>
            val locs = taskIdToLocations(id)
            val part = stage.rdd.partitions(id)
            new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,
              taskBinary, part, locs, stage.latestInfo.taskMetrics, properties, Option(jobId),
              Option(sc.applicationId), sc.applicationAttemptId)
          }

        case stage: ResultStage =>
          partitionsToCompute.map { id =>
            val p: Int = stage.partitions(id)
            val part = stage.rdd.partitions(p)
            val locs = taskIdToLocations(id)
            new ResultTask(stage.id, stage.latestInfo.attemptId,
              taskBinary, part, locs, id, properties, stage.latestInfo.taskMetrics,
              Option(jobId), Option(sc.applicationId), sc.applicationAttemptId)
          }
      }
    } catch {
      case NonFatal(e) =>
        abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e))
        runningStages -= stage
        return
    }

    if (tasks.size > 0) {
      logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")
      stage.pendingPartitions ++= tasks.map(_.partitionId)
      logDebug("New pending partitions: " + stage.pendingPartitions)
      taskScheduler.submitTasks(new TaskSet(
        tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))
      stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
    } else {
      // Because we posted SparkListenerStageSubmitted earlier, we should mark
      // the stage as completed here in case there are no tasks to run
      markStageAsFinished(stage, None)

      val debugString = stage match {
        case stage: ShuffleMapStage =>
          s"Stage ${stage} is actually done; " +
            s"(available: ${stage.isAvailable}," +
            s"available outputs: ${stage.numAvailableOutputs}," +
            s"partitions: ${stage.numPartitions})"
        case stage : ResultStage =>
          s"Stage ${stage} is actually done; (partitions: ${stage.numPartitions})"
      }
      logDebug(debugString)

      submitWaitingChildStages(stage)
    }
  }

  /**
   * Merge local values from a task into the corresponding accumulators previously registered
   * here on the driver.
   *
   * Although accumulators themselves are not thread-safe, this method is called only from one
   * thread, the one that runs the scheduling loop. This means we only handle one task
   * completion event at a time so we don't need to worry about locking the accumulators.
   * This still doesn't stop the caller from updating the accumulator outside the scheduler,
   * but that's not our problem since there's nothing we can do about that.
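   *
   * For example, the per-accumulator merge this method performs is equivalent to (illustrative
   * only, using a driver-registered accumulator):
   * {{{
   *   val total = sc.longAccumulator("total")   // registered with AccumulatorContext
   *   val fromTask = total.copyAndReset()       // stands in for the value shipped back by a task
   *   fromTask.add(5L)
   *   total.merge(fromTask)
   * }}}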
   */
  private def updateAccumulators(event: CompletionEvent): Unit = {
    val task = event.task
    val stage = stageIdToStage(task.stageId)
    try {
      event.accumUpdates.foreach { updates =>
        val id = updates.id
        // Find the corresponding accumulator on the driver and update it
        val acc: AccumulatorV2[Any, Any] = AccumulatorContext.get(id) match {
          case Some(accum) => accum.asInstanceOf[AccumulatorV2[Any, Any]]
          case None =>
            throw new SparkException(s"attempted to access non-existent accumulator $id")
        }
        acc.merge(updates.asInstanceOf[AccumulatorV2[Any, Any]])
        // To avoid UI cruft, ignore cases where value wasn't updated
        if (acc.name.isDefined && !updates.isZero) {
          stage.latestInfo.accumulables(id) = acc.toInfo(None, Some(acc.value))
          event.taskInfo.accumulables += acc.toInfo(Some(updates.value), Some(acc.value))
        }
      }
    } catch {
      case NonFatal(e) =>
        logError(s"Failed to update accumulators for task ${task.partitionId}", e)
    }
  }

  /**
   * Responds to a task finishing. This is called inside the event loop so it assumes that it can
   * modify the scheduler's internal state. Use taskEnded() to post a task end event from outside.
   */
  private[scheduler] def handleTaskCompletion(event: CompletionEvent) {
    val task = event.task
    val taskId = event.taskInfo.id
    val stageId = task.stageId
    val taskType = Utils.getFormattedClassName(task)

    outputCommitCoordinator.taskCompleted(
      stageId,
      task.partitionId,
      event.taskInfo.attemptNumber, // this is a task attempt number
      event.reason)

    // Reconstruct task metrics. Note: this may be null if the task has failed.
    val taskMetrics: TaskMetrics =
      if (event.accumUpdates.nonEmpty) {
        try {
          TaskMetrics.fromAccumulators(event.accumUpdates)
        } catch {
          case NonFatal(e) =>
            logError(s"Error when attempting to reconstruct metrics for task $taskId", e)
            null
        }
      } else {
        null
      }

    // The stage may have already finished when we get this event -- e.g. maybe it was a
    // speculative task. It is important that we send the TaskEnd event in any case, so listeners
    // are properly notified and can choose to handle it. For instance, some listeners are
    // doing their own accounting and if they don't get the task end event they think
    // tasks are still running when they really aren't.
    listenerBus.post(SparkListenerTaskEnd(
       stageId, task.stageAttemptId, taskType, event.reason, event.taskInfo, taskMetrics))

    if (!stageIdToStage.contains(task.stageId)) {
      // Skip all the actions if the stage has been cancelled.
      return
    }

    val stage = stageIdToStage(task.stageId)
    event.reason match {
      case Success =>
        stage.pendingPartitions -= task.partitionId
        task match {
          case rt: ResultTask[_, _] =>
            // Cast to ResultStage here because it's part of the ResultTask
            // TODO Refactor this out to a function that accepts a ResultStage
            val resultStage = stage.asInstanceOf[ResultStage]
            resultStage.activeJob match {
              case Some(job) =>
                if (!job.finished(rt.outputId)) {
                  updateAccumulators(event)
                  job.finished(rt.outputId) = true
                  job.numFinished += 1
                  // If the whole job has finished, remove it
                  if (job.numFinished == job.numPartitions) {
                    markStageAsFinished(resultStage)
                    cleanupStateForJobAndIndependentStages(job)
                    listenerBus.post(
                      SparkListenerJobEnd(job.jobId, clock.getTimeMillis(), JobSucceeded))
                  }

                  // taskSucceeded runs some user code that might throw an exception. Make sure
                  // we are resilient against that.
                  try {
                    job.listener.taskSucceeded(rt.outputId, event.result)
                  } catch {
                    case e: Exception =>
                      // TODO: Perhaps we want to mark the resultStage as failed?
                      job.listener.jobFailed(new SparkDriverExecutionException(e))
                  }
                }
              case None =>
                logInfo("Ignoring result from " + rt + " because its job has finished")
            }

          case smt: ShuffleMapTask =>
            val shuffleStage = stage.asInstanceOf[ShuffleMapStage]
            updateAccumulators(event)
            val status = event.result.asInstanceOf[MapStatus]
            val execId = status.location.executorId
            logDebug("ShuffleMapTask finished on " + execId)
            if (failedEpoch.contains(execId) && smt.epoch <= failedEpoch(execId)) {
              logInfo(s"Ignoring possibly bogus $smt completion from executor $execId")
            } else {
              shuffleStage.addOutputLoc(smt.partitionId, status)
            }

            if (runningStages.contains(shuffleStage) && shuffleStage.pendingPartitions.isEmpty) {
              markStageAsFinished(shuffleStage)
              logInfo("looking for newly runnable stages")
              logInfo("running: " + runningStages)
              logInfo("waiting: " + waitingStages)
              logInfo("failed: " + failedStages)

              // We supply true to increment the epoch number here in case this is a
              // recomputation of the map outputs. In that case, some nodes may have cached
              // locations with holes (from when we detected the error) and will need the
              // epoch incremented to refetch them.
              // TODO: Only increment the epoch number if this is not the first time
              //       we registered these map outputs.
              mapOutputTracker.registerMapOutputs(
                shuffleStage.shuffleDep.shuffleId,
                shuffleStage.outputLocInMapOutputTrackerFormat(),
                changeEpoch = true)

              clearCacheLocs()

              if (!shuffleStage.isAvailable) {
                // Some tasks had failed; let's resubmit this shuffleStage
                // TODO: Lower-level scheduler should also deal with this
                logInfo("Resubmitting " + shuffleStage + " (" + shuffleStage.name +
                  ") because some of its tasks had failed: " +
                  shuffleStage.findMissingPartitions().mkString(", "))
                submitStage(shuffleStage)
              } else {
                // Mark any map-stage jobs waiting on this stage as finished
                if (shuffleStage.mapStageJobs.nonEmpty) {
                  val stats = mapOutputTracker.getStatistics(shuffleStage.shuffleDep)
                  for (job <- shuffleStage.mapStageJobs) {
                    markMapStageJobAsFinished(job, stats)
                  }
                }
                submitWaitingChildStages(shuffleStage)
              }
            }
        }

      case Resubmitted =>
        logInfo("Resubmitted " + task + ", so marking it as still running")
        stage.pendingPartitions += task.partitionId

      case FetchFailed(bmAddress, shuffleId, mapId, reduceId, failureMessage) =>
        val failedStage = stageIdToStage(task.stageId)
        val mapStage = shuffleIdToMapStage(shuffleId)

        if (failedStage.latestInfo.attemptId != task.stageAttemptId) {
          logInfo(s"Ignoring fetch failure from $task as it's from $failedStage attempt" +
            s" ${task.stageAttemptId} and there is a more recent attempt for that stage " +
            s"(attempt ID ${failedStage.latestInfo.attemptId}) running")
        } else {
          // It is likely that we receive multiple FetchFailed for a single stage (because we have
          // multiple tasks running concurrently on different executors). In that case, it is
          // possible the fetch failure has already been handled by the scheduler.
          if (runningStages.contains(failedStage)) {
            logInfo(s"Marking $failedStage (${failedStage.name}) as failed " +
              s"due to a fetch failure from $mapStage (${mapStage.name})")
            markStageAsFinished(failedStage, Some(failureMessage))
          } else {
            logDebug(s"Received fetch failure from $task, but it's from $failedStage which is no " +
              s"longer running")
          }

          if (disallowStageRetryForTest) {
            abortStage(failedStage, "Fetch failure will not retry stage due to testing config",
              None)
          } else if (failedStage.failedOnFetchAndShouldAbort(task.stageAttemptId)) {
            abortStage(failedStage, s"$failedStage (${failedStage.name}) " +
              s"has failed the maximum allowable number of " +
              s"times: ${Stage.MAX_CONSECUTIVE_FETCH_FAILURES}. " +
              s"Most recent failure reason: ${failureMessage}", None)
          } else {
            if (failedStages.isEmpty) {
              // Don't schedule an event to resubmit failed stages if failedStages isn't empty,
              // because in that case the event will already have been scheduled.
              // TODO: Cancel running tasks in the stage
              logInfo(s"Resubmitting $mapStage (${mapStage.name}) and " +
                s"$failedStage (${failedStage.name}) due to fetch failure")
              messageScheduler.schedule(new Runnable {
                override def run(): Unit = eventProcessLoop.post(ResubmitFailedStages)
              }, DAGScheduler.RESUBMIT_TIMEOUT, TimeUnit.MILLISECONDS)
            }
            failedStages += failedStage
            failedStages += mapStage
          }
          // Mark the map whose fetch failed as broken in the map stage
          if (mapId != -1) {
            mapStage.removeOutputLoc(mapId, bmAddress)
            mapOutputTracker.unregisterMapOutput(shuffleId, mapId, bmAddress)
          }

          // TODO: mark the executor as failed only if there were lots of fetch failures on it
          if (bmAddress != null) {
            handleExecutorLost(bmAddress.executorId, filesLost = true, Some(task.epoch))
          }
        }

      case commitDenied: TaskCommitDenied =>
        // Do nothing here, left up to the TaskScheduler to decide how to handle denied commits

      case exceptionFailure: ExceptionFailure =>
        // Tasks failed with exceptions might still have accumulator updates.
        updateAccumulators(event)

      case TaskResultLost =>
        // Do nothing here; the TaskScheduler handles these failures and resubmits the task.

      case _: ExecutorLostFailure | TaskKilled | UnknownReason =>
        // Unrecognized failure - also do nothing. If the task fails repeatedly, the TaskScheduler
        // will abort the job.
    }
  }
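
  // Illustrative sketch (not new scheduler behavior): completion events are not delivered to
  // handleTaskCompletion() by direct calls. The TaskScheduler reports task ends through
  // DAGScheduler.taskEnded(), which roughly boils down to posting an event that the event loop
  // below routes back into the handler above, e.g.
  //
  //   eventProcessLoop.post(
  //     CompletionEvent(task, reason, result, accumUpdates, taskInfo))
  //
  // The CompletionEvent fields shown here are an approximation; see DAGSchedulerEvent for the
  // authoritative definition.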

  /**
   * Responds to an executor being lost. This is called inside the event loop, so it assumes it can
   * modify the scheduler's internal state. Use executorLost() to post a loss event from outside.
   *
   * We also assume that all shuffle blocks associated with the executor are lost if the executor
   * serves its own blocks (i.e., we're not using an external shuffle service), if the entire
   * slave is lost (likely taking the shuffle service down with it), or if a FetchFailed
   * occurred; in all of these cases we presume that all shuffle data related to this executor is
   * lost.
   *
   * The epoch during which the failure was observed can optionally be passed in, to prevent
   * stray fetch failures from re-triggering the detection of a node as lost.
   */
  private[scheduler] def handleExecutorLost(
      execId: String,
      filesLost: Boolean,
      maybeEpoch: Option[Long] = None) {
    val currentEpoch = maybeEpoch.getOrElse(mapOutputTracker.getEpoch)
    if (!failedEpoch.contains(execId) || failedEpoch(execId) < currentEpoch) {
      failedEpoch(execId) = currentEpoch
      logInfo("Executor lost: %s (epoch %d)".format(execId, currentEpoch))
      blockManagerMaster.removeExecutor(execId)

      if (filesLost || !env.blockManager.externalShuffleServiceEnabled) {
        logInfo("Shuffle files lost for executor: %s (epoch %d)".format(execId, currentEpoch))
        // TODO: This will be really slow if we keep accumulating shuffle map stages
        for ((shuffleId, stage) <- shuffleIdToMapStage) {
          stage.removeOutputsOnExecutor(execId)
          mapOutputTracker.registerMapOutputs(
            shuffleId,
            stage.outputLocInMapOutputTrackerFormat(),
            changeEpoch = true)
        }
        if (shuffleIdToMapStage.isEmpty) {
          mapOutputTracker.incrementEpoch()
        }
        clearCacheLocs()
      }
    } else {
      logDebug("Additional executor lost message for " + execId +
               " (epoch " + currentEpoch + ")")
    }
  }
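
  // Rough configuration sketch (illustrative, not part of this file): whether map outputs are
  // unregistered above depends on env.blockManager.externalShuffleServiceEnabled, which in a
  // typical deployment is driven by a setting along the lines of
  //
  //   val conf = new SparkConf().set("spark.shuffle.service.enabled", "true")
  //
  // With the external shuffle service enabled and filesLost == false, the shuffle files are
  // assumed to still be reachable, so the executor's map outputs are kept registered.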

  private[scheduler] def handleExecutorAdded(execId: String, host: String) {
    // remove from failedEpoch(execId) ?
    if (failedEpoch.contains(execId)) {
      logInfo("Host added was in lost list earlier: " + host)
      failedEpoch -= execId
    }
  }

  private[scheduler] def handleStageCancellation(stageId: Int) {
    stageIdToStage.get(stageId) match {
      case Some(stage) =>
        val jobsThatUseStage: Array[Int] = stage.jobIds.toArray
        jobsThatUseStage.foreach { jobId =>
          handleJobCancellation(jobId, s"because Stage $stageId was cancelled")
        }
      case None =>
        logInfo("No active jobs to kill for Stage " + stageId)
    }
  }

  private[scheduler] def handleJobCancellation(jobId: Int, reason: String = "") {
    if (!jobIdToStageIds.contains(jobId)) {
      logDebug("Trying to cancel unregistered job " + jobId)
    } else {
      failJobAndIndependentStages(
        jobIdToActiveJob(jobId), "Job %d cancelled %s".format(jobId, reason))
    }
  }
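
  // Minimal usage sketch (hypothetical caller code): job cancellations normally originate outside
  // the event loop, e.g.
  //
  //   sc.cancelJob(jobId)   // posts a JobCancelled event that ends up in handleJobCancellation()
  //
  // rather than by invoking this handler directly.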

  /**
   * Marks a stage as finished and removes it from the list of running stages.
   */
  private def markStageAsFinished(stage: Stage, errorMessage: Option[String] = None): Unit = {
    val serviceTime = stage.latestInfo.submissionTime match {
      case Some(t) => "%.03f".format((clock.getTimeMillis() - t) / 1000.0)
      case _ => "Unknown"
    }
    if (errorMessage.isEmpty) {
      logInfo("%s (%s) finished in %s s".format(stage, stage.name, serviceTime))
      stage.latestInfo.completionTime = Some(clock.getTimeMillis())

      // Clear failure count for this stage, now that it's succeeded.
      // We only limit consecutive failures of stage attempts, so that if a stage is
      // re-used many times in a long-running job, unrelated failures don't eventually cause the
      // stage to be aborted.
      stage.clearFailures()
    } else {
      stage.latestInfo.stageFailed(errorMessage.get)
      logInfo(s"$stage (${stage.name}) failed in $serviceTime s due to ${errorMessage.get}")
    }

    outputCommitCoordinator.stageEnd(stage.id)
    listenerBus.post(SparkListenerStageCompleted(stage.latestInfo))
    runningStages -= stage
  }
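
  // Worked example for the serviceTime formatting above (illustrative timestamps): for a stage
  // submitted at t = 10000 ms and completed at t = 11234 ms,
  //
  //   "%.03f".format((11234 - 10000) / 1000.0)   // "1.234"
  //
  // so the success log line reads "... finished in 1.234 s".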

  /**
   * Aborts all jobs depending on a particular Stage. This is called in response to a task set
   * being canceled by the TaskScheduler. Use taskSetFailed() to inject this event from outside.
   */
  private[scheduler] def abortStage(
      failedStage: Stage,
      reason: String,
      exception: Option[Throwable]): Unit = {
    if (!stageIdToStage.contains(failedStage.id)) {
      // Skip all the actions if the stage has been removed.
      return
    }
    val dependentJobs: Seq[ActiveJob] =
      activeJobs.filter(job => stageDependsOn(job.finalStage, failedStage)).toSeq
    failedStage.latestInfo.completionTime = Some(clock.getTimeMillis())
    for (job <- dependentJobs) {
      failJobAndIndependentStages(job, s"Job aborted due to stage failure: $reason", exception)
    }
    if (dependentJobs.isEmpty) {
      logInfo("Ignoring failure of " + failedStage + " because all jobs depending on it are done")
    }
  }

  /** Fails a job and all stages that are only used by that job, and cleans up relevant state. */
  private def failJobAndIndependentStages(
      job: ActiveJob,
      failureReason: String,
      exception: Option[Throwable] = None): Unit = {
    val error = new SparkException(failureReason, exception.getOrElse(null))
    var ableToCancelStages = true

    val shouldInterruptThread =
      if (job.properties == null) false
      else job.properties.getProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false").toBoolean

    // Cancel all independent, running stages.
    val stages = jobIdToStageIds(job.jobId)
    if (stages.isEmpty) {
      logError("No stages registered for job " + job.jobId)
    }
    stages.foreach { stageId =>
      val jobsForStage: Option[HashSet[Int]] = stageIdToStage.get(stageId).map(_.jobIds)
      if (jobsForStage.isEmpty || !jobsForStage.get.contains(job.jobId)) {
        logError(
          "Job %d not registered for stage %d even though that stage was registered for the job"
            .format(job.jobId, stageId))
      } else if (jobsForStage.get.size == 1) {
        if (!stageIdToStage.contains(stageId)) {
          logError(s"Missing Stage for stage with id $stageId")
        } else {
          // This is the only job that uses this stage, so fail the stage if it is running.
          val stage = stageIdToStage(stageId)
          if (runningStages.contains(stage)) {
            try { // cancelTasks will fail if a SchedulerBackend does not implement killTask
              taskScheduler.cancelTasks(stageId, shouldInterruptThread)
              markStageAsFinished(stage, Some(failureReason))
            } catch {
              case e: UnsupportedOperationException =>
                logInfo(s"Could not cancel tasks for stage $stageId", e)
                ableToCancelStages = false
            }
          }
        }
      }
    }

    if (ableToCancelStages) {
      // SPARK-15783: it is important to clean up state first, just for tests where we have some
      // asserts against the state.  Otherwise we have a *little* bit of flakiness in the tests.
      cleanupStateForJobAndIndependentStages(job)
      job.listener.jobFailed(error)
      listenerBus.post(SparkListenerJobEnd(job.jobId, clock.getTimeMillis(), JobFailed(error)))
    }
  }
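
  // Illustrative sketch (hypothetical user code): the shouldInterruptThread flag above comes from
  // a job-local property that callers can set before submitting work, e.g.
  //
  //   sc.setLocalProperty("spark.job.interruptOnCancel", "true")
  //
  // in which case cancelTasks() is asked to interrupt the threads running the tasks.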

  /** Return true if one of stage's ancestors is target. */
  private def stageDependsOn(stage: Stage, target: Stage): Boolean = {
    if (stage == target) {
      return true
    }
    val visitedRdds = new HashSet[RDD[_]]
    // We are manually maintaining a stack here to prevent StackOverflowError
    // caused by recursively visiting the RDD graph
    val waitingForVisit = new Stack[RDD[_]]
    def visit(rdd: RDD[_]) {
      if (!visitedRdds(rdd)) {
        visitedRdds += rdd
        for (dep <- rdd.dependencies) {
          dep match {
            case shufDep: ShuffleDependency[_, _, _] =>
              val mapStage = getOrCreateShuffleMapStage(shufDep, stage.firstJobId)
              if (!mapStage.isAvailable) {
                waitingForVisit.push(mapStage.rdd)
              }  // Otherwise there's no need to follow the dependency back
            case narrowDep: NarrowDependency[_] =>
              waitingForVisit.push(narrowDep.rdd)
          }
        }
      }
    }
    waitingForVisit.push(stage.rdd)
    while (waitingForVisit.nonEmpty) {
      visit(waitingForVisit.pop())
    }
    visitedRdds.contains(target.rdd)
  }
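
  // Illustrative note (no new code path): if, for example, `target` is a shuffle map stage whose
  // not-yet-available output is read somewhere below `stage`'s RDD, the traversal above
  // eventually pushes target.rdd onto the stack and the method returns true; map stages whose
  // outputs are already available are deliberately not followed any further.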

  /**
   * Gets the locality information associated with a partition of a particular RDD.
   *
   * This method is thread-safe and is called from both DAGScheduler and SparkContext.
   *
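   * A rough usage sketch (the RDD, partition and returned hosts are illustrative):
   * {{{
   *   val locs: Seq[TaskLocation] = dagScheduler.getPreferredLocs(someRdd, partition = 0)
   *   // e.g. Seq(TaskLocation("host-a"), TaskLocation("host-b"))
   * }}}
   *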
   * @param rdd the RDD whose partitions are to be looked at
   * @param partition the partition to look up locality information for
   * @return list of machines that are preferred by the partition
   */
  private[spark]
  def getPreferredLocs(rdd: RDD[_], partition: Int): Seq[TaskLocation] = {
    getPreferredLocsInternal(rdd, partition, new HashSet)
  }

  /**
   * Recursive implementation for getPreferredLocs.
   *
   * This method is thread-safe because it only accesses DAGScheduler state through thread-safe
   * methods (getCacheLocs()); please be careful when modifying this method, because any new
   * DAGScheduler state accessed by it may require additional synchronization.
   */
  private def getPreferredLocsInternal(
      rdd: RDD[_],
      partition: Int,
      visited: HashSet[(RDD[_], Int)]): Seq[TaskLocation] = {
    // If the partition has already been visited, no need to re-visit.
    // This avoids exponential path exploration.  SPARK-695
    if (!visited.add((rdd, partition))) {
      // Nil has already been returned for previously visited partitions.
      return Nil
    }
    // If the partition is cached, return the cache locations
    val cached = getCacheLocs(rdd)(partition)
    if (cached.nonEmpty) {
      return cached
    }
    // If the RDD has some placement preferences (as is the case for input RDDs), get those
    val rddPrefs = rdd.preferredLocations(rdd.partitions(partition)).toList
    if (rddPrefs.nonEmpty) {
      return rddPrefs.map(TaskLocation(_))
    }

    // If the RDD has narrow dependencies, pick the first partition of the first narrow dependency
    // that has any placement preferences. Ideally we would choose based on transfer sizes,
    // but this will do for now.
    rdd.dependencies.foreach {
      case n: NarrowDependency[_] =>
        for (inPart <- n.getParents(partition)) {
          val locs = getPreferredLocsInternal(n.rdd, inPart, visited)
          if (locs != Nil) {
            return locs
          }
        }

      case _ =>
    }

    Nil
  }
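
  // Illustrative walk-through (hypothetical RDD chain, no new behavior): for
  // hadoopRdd.map(...).filter(...), asking for the preferred locations of a filtered partition
  // finds no cache locations and no direct preferences, so the narrow-dependency loop above
  // recurses through the mapped RDD down to hadoopRdd, whose input-split hosts are returned.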

  /** Mark a map stage job as finished with the given output stats, and report to its listener. */
  def markMapStageJobAsFinished(job: ActiveJob, stats: MapOutputStatistics): Unit = {
    // In map stage jobs, we only create a single "task", whose job is to compute the whole stage
    // (including reusing any previous map outputs, etc.); so we just mark task 0 as done
    job.finished(0) = true
    job.numFinished += 1
    job.listener.taskSucceeded(0, stats)
    cleanupStateForJobAndIndependentStages(job)
    listenerBus.post(SparkListenerJobEnd(job.jobId, clock.getTimeMillis(), JobSucceeded))
  }
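
  // Context sketch (an assumption about callers, not asserted here): map-stage jobs are created
  // when a shuffle map stage is submitted on its own (e.g. via SparkContext.submitMapStage), so
  // "finishing" such a job means reporting the shuffle's MapOutputStatistics rather than
  // per-partition results.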

  def stop() {
    messageScheduler.shutdownNow()
    eventProcessLoop.stop()
    taskScheduler.stop()
  }

  eventProcessLoop.start()
}

private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler)
  extends EventLoop[DAGSchedulerEvent]("dag-scheduler-event-loop") with Logging {

  private[this] val timer = dagScheduler.metricsSource.messageProcessingTimer

  /**
   * The main event loop of the DAG scheduler.
   */
  override def onReceive(event: DAGSchedulerEvent): Unit = {
    val timerContext = timer.time()
    try {
      doOnReceive(event)
    } finally {
      timerContext.stop()
    }
  }

  private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
    case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
      dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)

    case MapStageSubmitted(jobId, dependency, callSite, listener, properties) =>
      dagScheduler.handleMapStageSubmitted(jobId, dependency, callSite, listener, properties)

    case StageCancelled(stageId) =>
      dagScheduler.handleStageCancellation(stageId)

    case JobCancelled(jobId) =>
      dagScheduler.handleJobCancellation(jobId)

    case JobGroupCancelled(groupId) =>
      dagScheduler.handleJobGroupCancelled(groupId)

    case AllJobsCancelled =>
      dagScheduler.doCancelAllJobs()

    case ExecutorAdded(execId, host) =>
      dagScheduler.handleExecutorAdded(execId, host)

    case ExecutorLost(execId, reason) =>
      val filesLost = reason match {
        case SlaveLost(_, true) => true
        case _ => false
      }
      dagScheduler.handleExecutorLost(execId, filesLost)

    case BeginEvent(task, taskInfo) =>
      dagScheduler.handleBeginEvent(task, taskInfo)

    case GettingResultEvent(taskInfo) =>
      dagScheduler.handleGetTaskResult(taskInfo)

    case completion: CompletionEvent =>
      dagScheduler.handleTaskCompletion(completion)

    case TaskSetFailed(taskSet, reason, exception) =>
      dagScheduler.handleTaskSetFailed(taskSet, reason, exception)

    case ResubmitFailedStages =>
      dagScheduler.resubmitFailedStages()
  }

  override def onError(e: Throwable): Unit = {
    logError("DAGSchedulerEventProcessLoop failed; shutting down SparkContext", e)
    try {
      dagScheduler.doCancelAllJobs()
    } catch {
      case t: Throwable => logError("DAGScheduler failed to cancel all jobs.", t)
    }
    dagScheduler.sc.stopInNewThread()
  }

  override def onStop(): Unit = {
    // Cancel any active jobs in postStop hook
    dagScheduler.cleanUpAfterSchedulerStop()
  }
}

private[spark] object DAGScheduler {
  // The time, in millis, to wait for fetch failure events to stop coming in after one is detected;
  // this is a simplistic way to avoid resubmitting tasks in the non-fetchable map stage one by one
  // as more failure events come in
  val RESUBMIT_TIMEOUT = 200
}
