/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.scheduler

import java.io.NotSerializableException
import java.util.Properties
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger

import scala.annotation.tailrec
import scala.collection.Map
import scala.collection.mutable.{HashMap, HashSet, Stack}
import scala.concurrent.duration._
import scala.language.existentials
import scala.language.postfixOps
import scala.util.control.NonFatal

import org.apache.commons.lang3.SerializationUtils

import org.apache.spark._
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.internal.Logging
import org.apache.spark.network.util.JavaUtils
import org.apache.spark.partial.{ApproximateActionListener, ApproximateEvaluator, PartialResult}
import org.apache.spark.rdd.RDD
import org.apache.spark.rpc.RpcTimeout
import org.apache.spark.storage._
import org.apache.spark.storage.BlockManagerMessages.BlockManagerHeartbeat
import org.apache.spark.util._

/**
 * The high-level scheduling layer that implements stage-oriented scheduling. It computes a DAG of
 * stages for each job, keeps track of which RDDs and stage outputs are materialized, and finds a
 * minimal schedule to run the job. It then submits stages as TaskSets to an underlying
 * TaskScheduler implementation that runs them on the cluster. A TaskSet contains fully independent
 * tasks that can run right away based on the data that's already on the cluster (e.g. map output
 * files from previous stages), though it may fail if this data becomes unavailable.
 *
 * Spark stages are created by breaking the RDD graph at shuffle boundaries. RDD operations with
 * "narrow" dependencies, like map() and filter(), are pipelined together into one set of tasks
 * in each stage, but operations with shuffle dependencies require multiple stages (one to write a
 * set of map output files, and another to read those files after a barrier). In the end, every
 * stage will have only shuffle dependencies on other stages, and may compute multiple operations
 * inside it. The actual pipelining of these operations happens in the RDD.compute() functions of
 * various RDDs (MappedRDD, FilteredRDD, etc).
 *
 * In addition to coming up with a DAG of stages, the DAGScheduler also determines the preferred
 * locations to run each task on, based on the current cache status, and passes these to the
 * low-level TaskScheduler. Furthermore, it handles failures due to shuffle output files being
 * lost, in which case old stages may need to be resubmitted. Failures *within* a stage that are
 * not caused by shuffle file loss are handled by the TaskScheduler, which will retry each task
 * a small number of times before cancelling the whole stage.
 *
 * When looking through this code, there are several key concepts:
 *
 *  - Jobs (represented by [[ActiveJob]]) are the top-level work items submitted to the scheduler.
 *    For example, when the user calls an action, like count(), a job will be submitted through
 *    submitJob. Each Job may require the execution of multiple stages to build intermediate data.
 *
 *  - Stages ([[Stage]]) are sets of tasks that compute intermediate results in jobs, where each
 *    task computes the same function on partitions of the same RDD. Stages are separated at
 *    shuffle boundaries, which introduce a barrier (where we must wait for the previous stage to
 *    finish to fetch outputs). There are two types of stages: [[ResultStage]], for the final
 *    stage that executes an action, and [[ShuffleMapStage]], which writes map output files for a
 *    shuffle. Stages are often shared across multiple jobs, if these jobs reuse the same RDDs.
 *
 *  - Tasks are individual units of work, each sent to one machine.
 *
 *  - Cache tracking: the DAGScheduler figures out which RDDs are cached to avoid recomputing them
 *    and likewise remembers which shuffle map stages have already produced output files to avoid
 *    redoing the map side of a shuffle.
 *
 *  - Preferred locations: the DAGScheduler also computes where to run each task in a stage based
 *    on the preferred locations of its underlying RDDs, or the location of cached or shuffle data.
 *
 *  - Cleanup: all data structures are cleared when the running jobs that depend on them finish,
 *    to prevent memory leaks in a long-running application.
 *
 * To recover from failures, the same stage might need to run multiple times, which are called
 * "attempts". If the TaskScheduler reports that a task failed because a map output file from a
 * previous stage was lost, the DAGScheduler resubmits that lost stage. This is detected through a
 * CompletionEvent with FetchFailed, or an ExecutorLost event. The DAGScheduler will wait a small
 * amount of time to see whether other nodes or tasks fail, then resubmit TaskSets for any lost
 * stage(s) that compute the missing tasks. As part of this process, we might also have to create
 * Stage objects for old (finished) stages where we previously cleaned up the Stage object. Since
 * tasks from the old attempt of a stage could still be running, care must be taken to map any
 * events received to the correct Stage object.
 *
 * Here's a checklist to use when making or reviewing changes to this class:
 *
 *  - All data structures should be cleared when the jobs involving them end to avoid indefinite
 *    accumulation of state in long-running programs.
 *
 *  - When adding a new data structure, update `DAGSchedulerSuite.assertDataStructuresEmpty` to
 *    include the new structure. This will help to catch memory leaks.
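 *
 * As an illustrative (not exhaustive) example, an action over a shuffled RDD, such as the
 * hypothetical program below, produces two stages: a ShuffleMapStage that writes map output for
 * the reduceByKey() shuffle, and a ResultStage that runs count() over the shuffled data:
 *
 * {{{
 *   sc.textFile("hdfs://...")            // input path is a placeholder
 *     .flatMap(_.split(" "))
 *     .map(word => (word, 1))
 *     .reduceByKey(_ + _)                // shuffle boundary: ends the first (map) stage
 *     .count()                           // action: submits a job through submitJob()
 * }}}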
112 */ 113private[spark] 114class DAGScheduler( 115 private[scheduler] val sc: SparkContext, 116 private[scheduler] val taskScheduler: TaskScheduler, 117 listenerBus: LiveListenerBus, 118 mapOutputTracker: MapOutputTrackerMaster, 119 blockManagerMaster: BlockManagerMaster, 120 env: SparkEnv, 121 clock: Clock = new SystemClock()) 122 extends Logging { 123 124 def this(sc: SparkContext, taskScheduler: TaskScheduler) = { 125 this( 126 sc, 127 taskScheduler, 128 sc.listenerBus, 129 sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster], 130 sc.env.blockManager.master, 131 sc.env) 132 } 133 134 def this(sc: SparkContext) = this(sc, sc.taskScheduler) 135 136 private[spark] val metricsSource: DAGSchedulerSource = new DAGSchedulerSource(this) 137 138 private[scheduler] val nextJobId = new AtomicInteger(0) 139 private[scheduler] def numTotalJobs: Int = nextJobId.get() 140 private val nextStageId = new AtomicInteger(0) 141 142 private[scheduler] val jobIdToStageIds = new HashMap[Int, HashSet[Int]] 143 private[scheduler] val stageIdToStage = new HashMap[Int, Stage] 144 /** 145 * Mapping from shuffle dependency ID to the ShuffleMapStage that will generate the data for 146 * that dependency. Only includes stages that are part of currently running job (when the job(s) 147 * that require the shuffle stage complete, the mapping will be removed, and the only record of 148 * the shuffle data will be in the MapOutputTracker). 149 */ 150 private[scheduler] val shuffleIdToMapStage = new HashMap[Int, ShuffleMapStage] 151 private[scheduler] val jobIdToActiveJob = new HashMap[Int, ActiveJob] 152 153 // Stages we need to run whose parents aren't done 154 private[scheduler] val waitingStages = new HashSet[Stage] 155 156 // Stages we are running right now 157 private[scheduler] val runningStages = new HashSet[Stage] 158 159 // Stages that must be resubmitted due to fetch failures 160 private[scheduler] val failedStages = new HashSet[Stage] 161 162 private[scheduler] val activeJobs = new HashSet[ActiveJob] 163 164 /** 165 * Contains the locations that each RDD's partitions are cached on. This map's keys are RDD ids 166 * and its values are arrays indexed by partition numbers. Each array value is the set of 167 * locations where that RDD partition is cached. 168 * 169 * All accesses to this map should be guarded by synchronizing on it (see SPARK-4454). 170 */ 171 private val cacheLocs = new HashMap[Int, IndexedSeq[Seq[TaskLocation]]] 172 173 // For tracking failed nodes, we use the MapOutputTracker's epoch number, which is sent with 174 // every task. When we detect a node failing, we note the current epoch number and failed 175 // executor, increment it for new tasks, and use this to ignore stray ShuffleMapTask results. 176 // 177 // TODO: Garbage collect information about failure epochs when we know there are no more 178 // stray messages to detect. 179 private val failedEpoch = new HashMap[String, Long] 180 181 private [scheduler] val outputCommitCoordinator = env.outputCommitCoordinator 182 183 // A closure serializer that we reuse. 184 // This is only safe because DAGScheduler runs in a single thread. 185 private val closureSerializer = SparkEnv.get.closureSerializer.newInstance() 186 187 /** If enabled, FetchFailed will not cause stage retry, in order to surface the problem. 
*/ 188 private val disallowStageRetryForTest = sc.getConf.getBoolean("spark.test.noStageRetry", false) 189 190 private val messageScheduler = 191 ThreadUtils.newDaemonSingleThreadScheduledExecutor("dag-scheduler-message") 192 193 private[scheduler] val eventProcessLoop = new DAGSchedulerEventProcessLoop(this) 194 taskScheduler.setDAGScheduler(this) 195 196 /** 197 * Called by the TaskSetManager to report task's starting. 198 */ 199 def taskStarted(task: Task[_], taskInfo: TaskInfo) { 200 eventProcessLoop.post(BeginEvent(task, taskInfo)) 201 } 202 203 /** 204 * Called by the TaskSetManager to report that a task has completed 205 * and results are being fetched remotely. 206 */ 207 def taskGettingResult(taskInfo: TaskInfo) { 208 eventProcessLoop.post(GettingResultEvent(taskInfo)) 209 } 210 211 /** 212 * Called by the TaskSetManager to report task completions or failures. 213 */ 214 def taskEnded( 215 task: Task[_], 216 reason: TaskEndReason, 217 result: Any, 218 accumUpdates: Seq[AccumulatorV2[_, _]], 219 taskInfo: TaskInfo): Unit = { 220 eventProcessLoop.post( 221 CompletionEvent(task, reason, result, accumUpdates, taskInfo)) 222 } 223 224 /** 225 * Update metrics for in-progress tasks and let the master know that the BlockManager is still 226 * alive. Return true if the driver knows about the given block manager. Otherwise, return false, 227 * indicating that the block manager should re-register. 228 */ 229 def executorHeartbeatReceived( 230 execId: String, 231 // (taskId, stageId, stageAttemptId, accumUpdates) 232 accumUpdates: Array[(Long, Int, Int, Seq[AccumulableInfo])], 233 blockManagerId: BlockManagerId): Boolean = { 234 listenerBus.post(SparkListenerExecutorMetricsUpdate(execId, accumUpdates)) 235 blockManagerMaster.driverEndpoint.askWithRetry[Boolean]( 236 BlockManagerHeartbeat(blockManagerId), new RpcTimeout(600 seconds, "BlockManagerHeartbeat")) 237 } 238 239 /** 240 * Called by TaskScheduler implementation when an executor fails. 241 */ 242 def executorLost(execId: String, reason: ExecutorLossReason): Unit = { 243 eventProcessLoop.post(ExecutorLost(execId, reason)) 244 } 245 246 /** 247 * Called by TaskScheduler implementation when a host is added. 248 */ 249 def executorAdded(execId: String, host: String): Unit = { 250 eventProcessLoop.post(ExecutorAdded(execId, host)) 251 } 252 253 /** 254 * Called by the TaskSetManager to cancel an entire TaskSet due to either repeated failures or 255 * cancellation of the job itself. 256 */ 257 def taskSetFailed(taskSet: TaskSet, reason: String, exception: Option[Throwable]): Unit = { 258 eventProcessLoop.post(TaskSetFailed(taskSet, reason, exception)) 259 } 260 261 private[scheduler] 262 def getCacheLocs(rdd: RDD[_]): IndexedSeq[Seq[TaskLocation]] = cacheLocs.synchronized { 263 // Note: this doesn't use `getOrElse()` because this method is called O(num tasks) times 264 if (!cacheLocs.contains(rdd.id)) { 265 // Note: if the storage level is NONE, we don't need to get locations from block manager. 
266 val locs: IndexedSeq[Seq[TaskLocation]] = if (rdd.getStorageLevel == StorageLevel.NONE) { 267 IndexedSeq.fill(rdd.partitions.length)(Nil) 268 } else { 269 val blockIds = 270 rdd.partitions.indices.map(index => RDDBlockId(rdd.id, index)).toArray[BlockId] 271 blockManagerMaster.getLocations(blockIds).map { bms => 272 bms.map(bm => TaskLocation(bm.host, bm.executorId)) 273 } 274 } 275 cacheLocs(rdd.id) = locs 276 } 277 cacheLocs(rdd.id) 278 } 279 280 private def clearCacheLocs(): Unit = cacheLocs.synchronized { 281 cacheLocs.clear() 282 } 283 284 /** 285 * Gets a shuffle map stage if one exists in shuffleIdToMapStage. Otherwise, if the 286 * shuffle map stage doesn't already exist, this method will create the shuffle map stage in 287 * addition to any missing ancestor shuffle map stages. 288 */ 289 private def getOrCreateShuffleMapStage( 290 shuffleDep: ShuffleDependency[_, _, _], 291 firstJobId: Int): ShuffleMapStage = { 292 shuffleIdToMapStage.get(shuffleDep.shuffleId) match { 293 case Some(stage) => 294 stage 295 296 case None => 297 // Create stages for all missing ancestor shuffle dependencies. 298 getMissingAncestorShuffleDependencies(shuffleDep.rdd).foreach { dep => 299 // Even though getMissingAncestorShuffleDependencies only returns shuffle dependencies 300 // that were not already in shuffleIdToMapStage, it's possible that by the time we 301 // get to a particular dependency in the foreach loop, it's been added to 302 // shuffleIdToMapStage by the stage creation process for an earlier dependency. See 303 // SPARK-13902 for more information. 304 if (!shuffleIdToMapStage.contains(dep.shuffleId)) { 305 createShuffleMapStage(dep, firstJobId) 306 } 307 } 308 // Finally, create a stage for the given shuffle dependency. 309 createShuffleMapStage(shuffleDep, firstJobId) 310 } 311 } 312 313 /** 314 * Creates a ShuffleMapStage that generates the given shuffle dependency's partitions. If a 315 * previously run stage generated the same shuffle data, this function will copy the output 316 * locations that are still available from the previous shuffle to avoid unnecessarily 317 * regenerating data. 318 */ 319 def createShuffleMapStage(shuffleDep: ShuffleDependency[_, _, _], jobId: Int): ShuffleMapStage = { 320 val rdd = shuffleDep.rdd 321 val numTasks = rdd.partitions.length 322 val parents = getOrCreateParentStages(rdd, jobId) 323 val id = nextStageId.getAndIncrement() 324 val stage = new ShuffleMapStage(id, rdd, numTasks, parents, jobId, rdd.creationSite, shuffleDep) 325 326 stageIdToStage(id) = stage 327 shuffleIdToMapStage(shuffleDep.shuffleId) = stage 328 updateJobIdStageIdMaps(jobId, stage) 329 330 if (mapOutputTracker.containsShuffle(shuffleDep.shuffleId)) { 331 // A previously run stage generated partitions for this shuffle, so for each output 332 // that's still available, copy information about that output location to the new stage 333 // (so we don't unnecessarily re-compute that data). 
334 val serLocs = mapOutputTracker.getSerializedMapOutputStatuses(shuffleDep.shuffleId) 335 val locs = MapOutputTracker.deserializeMapStatuses(serLocs) 336 (0 until locs.length).foreach { i => 337 if (locs(i) ne null) { 338 // locs(i) will be null if missing 339 stage.addOutputLoc(i, locs(i)) 340 } 341 } 342 } else { 343 // Kind of ugly: need to register RDDs with the cache and map output tracker here 344 // since we can't do it in the RDD constructor because # of partitions is unknown 345 logInfo("Registering RDD " + rdd.id + " (" + rdd.getCreationSite + ")") 346 mapOutputTracker.registerShuffle(shuffleDep.shuffleId, rdd.partitions.length) 347 } 348 stage 349 } 350 351 /** 352 * Create a ResultStage associated with the provided jobId. 353 */ 354 private def createResultStage( 355 rdd: RDD[_], 356 func: (TaskContext, Iterator[_]) => _, 357 partitions: Array[Int], 358 jobId: Int, 359 callSite: CallSite): ResultStage = { 360 val parents = getOrCreateParentStages(rdd, jobId) 361 val id = nextStageId.getAndIncrement() 362 val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite) 363 stageIdToStage(id) = stage 364 updateJobIdStageIdMaps(jobId, stage) 365 stage 366 } 367 368 /** 369 * Get or create the list of parent stages for a given RDD. The new Stages will be created with 370 * the provided firstJobId. 371 */ 372 private def getOrCreateParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = { 373 getShuffleDependencies(rdd).map { shuffleDep => 374 getOrCreateShuffleMapStage(shuffleDep, firstJobId) 375 }.toList 376 } 377 378 /** Find ancestor shuffle dependencies that are not registered in shuffleToMapStage yet */ 379 private def getMissingAncestorShuffleDependencies( 380 rdd: RDD[_]): Stack[ShuffleDependency[_, _, _]] = { 381 val ancestors = new Stack[ShuffleDependency[_, _, _]] 382 val visited = new HashSet[RDD[_]] 383 // We are manually maintaining a stack here to prevent StackOverflowError 384 // caused by recursively visiting 385 val waitingForVisit = new Stack[RDD[_]] 386 waitingForVisit.push(rdd) 387 while (waitingForVisit.nonEmpty) { 388 val toVisit = waitingForVisit.pop() 389 if (!visited(toVisit)) { 390 visited += toVisit 391 getShuffleDependencies(toVisit).foreach { shuffleDep => 392 if (!shuffleIdToMapStage.contains(shuffleDep.shuffleId)) { 393 ancestors.push(shuffleDep) 394 waitingForVisit.push(shuffleDep.rdd) 395 } // Otherwise, the dependency and its ancestors have already been registered. 396 } 397 } 398 } 399 ancestors 400 } 401 402 /** 403 * Returns shuffle dependencies that are immediate parents of the given RDD. 404 * 405 * This function will not return more distant ancestors. For example, if C has a shuffle 406 * dependency on B which has a shuffle dependency on A: 407 * 408 * A <-- B <-- C 409 * 410 * calling this function with rdd C will only return the B <-- C dependency. 411 * 412 * This function is scheduler-visible for the purpose of unit testing. 
413 */ 414 private[scheduler] def getShuffleDependencies( 415 rdd: RDD[_]): HashSet[ShuffleDependency[_, _, _]] = { 416 val parents = new HashSet[ShuffleDependency[_, _, _]] 417 val visited = new HashSet[RDD[_]] 418 val waitingForVisit = new Stack[RDD[_]] 419 waitingForVisit.push(rdd) 420 while (waitingForVisit.nonEmpty) { 421 val toVisit = waitingForVisit.pop() 422 if (!visited(toVisit)) { 423 visited += toVisit 424 toVisit.dependencies.foreach { 425 case shuffleDep: ShuffleDependency[_, _, _] => 426 parents += shuffleDep 427 case dependency => 428 waitingForVisit.push(dependency.rdd) 429 } 430 } 431 } 432 parents 433 } 434 435 private def getMissingParentStages(stage: Stage): List[Stage] = { 436 val missing = new HashSet[Stage] 437 val visited = new HashSet[RDD[_]] 438 // We are manually maintaining a stack here to prevent StackOverflowError 439 // caused by recursively visiting 440 val waitingForVisit = new Stack[RDD[_]] 441 def visit(rdd: RDD[_]) { 442 if (!visited(rdd)) { 443 visited += rdd 444 val rddHasUncachedPartitions = getCacheLocs(rdd).contains(Nil) 445 if (rddHasUncachedPartitions) { 446 for (dep <- rdd.dependencies) { 447 dep match { 448 case shufDep: ShuffleDependency[_, _, _] => 449 val mapStage = getOrCreateShuffleMapStage(shufDep, stage.firstJobId) 450 if (!mapStage.isAvailable) { 451 missing += mapStage 452 } 453 case narrowDep: NarrowDependency[_] => 454 waitingForVisit.push(narrowDep.rdd) 455 } 456 } 457 } 458 } 459 } 460 waitingForVisit.push(stage.rdd) 461 while (waitingForVisit.nonEmpty) { 462 visit(waitingForVisit.pop()) 463 } 464 missing.toList 465 } 466 467 /** 468 * Registers the given jobId among the jobs that need the given stage and 469 * all of that stage's ancestors. 470 */ 471 private def updateJobIdStageIdMaps(jobId: Int, stage: Stage): Unit = { 472 @tailrec 473 def updateJobIdStageIdMapsList(stages: List[Stage]) { 474 if (stages.nonEmpty) { 475 val s = stages.head 476 s.jobIds += jobId 477 jobIdToStageIds.getOrElseUpdate(jobId, new HashSet[Int]()) += s.id 478 val parentsWithoutThisJobId = s.parents.filter { ! _.jobIds.contains(jobId) } 479 updateJobIdStageIdMapsList(parentsWithoutThisJobId ++ stages.tail) 480 } 481 } 482 updateJobIdStageIdMapsList(List(stage)) 483 } 484 485 /** 486 * Removes state for job and any stages that are not needed by any other job. Does not 487 * handle cancelling tasks or notifying the SparkListener about finished jobs/stages/tasks. 488 * 489 * @param job The job whose state to cleanup. 
490 */ 491 private def cleanupStateForJobAndIndependentStages(job: ActiveJob): Unit = { 492 val registeredStages = jobIdToStageIds.get(job.jobId) 493 if (registeredStages.isEmpty || registeredStages.get.isEmpty) { 494 logError("No stages registered for job " + job.jobId) 495 } else { 496 stageIdToStage.filterKeys(stageId => registeredStages.get.contains(stageId)).foreach { 497 case (stageId, stage) => 498 val jobSet = stage.jobIds 499 if (!jobSet.contains(job.jobId)) { 500 logError( 501 "Job %d not registered for stage %d even though that stage was registered for the job" 502 .format(job.jobId, stageId)) 503 } else { 504 def removeStage(stageId: Int) { 505 // data structures based on Stage 506 for (stage <- stageIdToStage.get(stageId)) { 507 if (runningStages.contains(stage)) { 508 logDebug("Removing running stage %d".format(stageId)) 509 runningStages -= stage 510 } 511 for ((k, v) <- shuffleIdToMapStage.find(_._2 == stage)) { 512 shuffleIdToMapStage.remove(k) 513 } 514 if (waitingStages.contains(stage)) { 515 logDebug("Removing stage %d from waiting set.".format(stageId)) 516 waitingStages -= stage 517 } 518 if (failedStages.contains(stage)) { 519 logDebug("Removing stage %d from failed set.".format(stageId)) 520 failedStages -= stage 521 } 522 } 523 // data structures based on StageId 524 stageIdToStage -= stageId 525 logDebug("After removal of stage %d, remaining stages = %d" 526 .format(stageId, stageIdToStage.size)) 527 } 528 529 jobSet -= job.jobId 530 if (jobSet.isEmpty) { // no other job needs this stage 531 removeStage(stageId) 532 } 533 } 534 } 535 } 536 jobIdToStageIds -= job.jobId 537 jobIdToActiveJob -= job.jobId 538 activeJobs -= job 539 job.finalStage match { 540 case r: ResultStage => r.removeActiveJob() 541 case m: ShuffleMapStage => m.removeActiveJob(job) 542 } 543 } 544 545 /** 546 * Submit an action job to the scheduler. 547 * 548 * @param rdd target RDD to run tasks on 549 * @param func a function to run on each partition of the RDD 550 * @param partitions set of partitions to run on; some jobs may not want to compute on all 551 * partitions of the target RDD, e.g. for operations like first() 552 * @param callSite where in the user program this job was called 553 * @param resultHandler callback to pass each result to 554 * @param properties scheduler properties to attach to this job, e.g. fair scheduler pool name 555 * 556 * @return a JobWaiter object that can be used to block until the job finishes executing 557 * or can be used to cancel the job. 558 * 559 * @throws IllegalArgumentException when partitions ids are illegal 560 */ 561 def submitJob[T, U]( 562 rdd: RDD[T], 563 func: (TaskContext, Iterator[T]) => U, 564 partitions: Seq[Int], 565 callSite: CallSite, 566 resultHandler: (Int, U) => Unit, 567 properties: Properties): JobWaiter[U] = { 568 // Check to make sure we are not launching a task on a partition that does not exist. 569 val maxPartitions = rdd.partitions.length 570 partitions.find(p => p >= maxPartitions || p < 0).foreach { p => 571 throw new IllegalArgumentException( 572 "Attempting to access a non-existent partition: " + p + ". 
" + 573 "Total number of partitions: " + maxPartitions) 574 } 575 576 val jobId = nextJobId.getAndIncrement() 577 if (partitions.size == 0) { 578 // Return immediately if the job is running 0 tasks 579 return new JobWaiter[U](this, jobId, 0, resultHandler) 580 } 581 582 assert(partitions.size > 0) 583 val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _] 584 val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler) 585 eventProcessLoop.post(JobSubmitted( 586 jobId, rdd, func2, partitions.toArray, callSite, waiter, 587 SerializationUtils.clone(properties))) 588 waiter 589 } 590 591 /** 592 * Run an action job on the given RDD and pass all the results to the resultHandler function as 593 * they arrive. 594 * 595 * @param rdd target RDD to run tasks on 596 * @param func a function to run on each partition of the RDD 597 * @param partitions set of partitions to run on; some jobs may not want to compute on all 598 * partitions of the target RDD, e.g. for operations like first() 599 * @param callSite where in the user program this job was called 600 * @param resultHandler callback to pass each result to 601 * @param properties scheduler properties to attach to this job, e.g. fair scheduler pool name 602 * 603 * @throws Exception when the job fails 604 */ 605 def runJob[T, U]( 606 rdd: RDD[T], 607 func: (TaskContext, Iterator[T]) => U, 608 partitions: Seq[Int], 609 callSite: CallSite, 610 resultHandler: (Int, U) => Unit, 611 properties: Properties): Unit = { 612 val start = System.nanoTime 613 val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties) 614 // Note: Do not call Await.ready(future) because that calls `scala.concurrent.blocking`, 615 // which causes concurrent SQL executions to fail if a fork-join pool is used. Note that 616 // due to idiosyncrasies in Scala, `awaitPermission` is not actually used anywhere so it's 617 // safe to pass in null here. For more detail, see SPARK-13747. 618 val awaitPermission = null.asInstanceOf[scala.concurrent.CanAwait] 619 waiter.completionFuture.ready(Duration.Inf)(awaitPermission) 620 waiter.completionFuture.value.get match { 621 case scala.util.Success(_) => 622 logInfo("Job %d finished: %s, took %f s".format 623 (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9)) 624 case scala.util.Failure(exception) => 625 logInfo("Job %d failed: %s, took %f s".format 626 (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9)) 627 // SPARK-8644: Include user stack trace in exceptions coming from DAGScheduler. 628 val callerStackTrace = Thread.currentThread().getStackTrace.tail 629 exception.setStackTrace(exception.getStackTrace ++ callerStackTrace) 630 throw exception 631 } 632 } 633 634 /** 635 * Run an approximate job on the given RDD and pass all the results to an ApproximateEvaluator 636 * as they arrive. Returns a partial result object from the evaluator. 637 * 638 * @param rdd target RDD to run tasks on 639 * @param func a function to run on each partition of the RDD 640 * @param evaluator [[ApproximateEvaluator]] to receive the partial results 641 * @param callSite where in the user program this job was called 642 * @param timeout maximum time to wait for the job, in milliseconds 643 * @param properties scheduler properties to attach to this job, e.g. 
 *   fair scheduler pool name
   */
  def runApproximateJob[T, U, R](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      evaluator: ApproximateEvaluator[U, R],
      callSite: CallSite,
      timeout: Long,
      properties: Properties): PartialResult[R] = {
    val listener = new ApproximateActionListener(rdd, func, evaluator, timeout)
    val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
    val partitions = (0 until rdd.partitions.length).toArray
    val jobId = nextJobId.getAndIncrement()
    eventProcessLoop.post(JobSubmitted(
      jobId, rdd, func2, partitions, callSite, listener, SerializationUtils.clone(properties)))
    listener.awaitResult()    // Will throw an exception if the job fails
  }

  /**
   * Submit a shuffle map stage to run independently and get a JobWaiter object back. The waiter
   * can be used to block until the job finishes executing or can be used to cancel the job.
   * This method is used for adaptive query planning, to run map stages and look at statistics
   * about their outputs before submitting downstream stages.
   *
   * @param dependency the ShuffleDependency to run a map stage for
   * @param callback function called with the result of the job, which in this case will be a
   *   single MapOutputStatistics object showing how much data was produced for each partition
   * @param callSite where in the user program this job was submitted
   * @param properties scheduler properties to attach to this job, e.g. fair scheduler pool name
   */
  def submitMapStage[K, V, C](
      dependency: ShuffleDependency[K, V, C],
      callback: MapOutputStatistics => Unit,
      callSite: CallSite,
      properties: Properties): JobWaiter[MapOutputStatistics] = {

    val rdd = dependency.rdd
    val jobId = nextJobId.getAndIncrement()
    if (rdd.partitions.length == 0) {
      throw new SparkException("Can't run submitMapStage on RDD with 0 partitions")
    }

    // We create a JobWaiter with only one "task", which will be marked as complete when the whole
    // map stage has completed, and will be passed the MapOutputStatistics for that stage.
    // This makes it easier to avoid race conditions between the user code and the map output
    // tracker that might result if we told the user the stage had finished, but then they query
    // the map output tracker and some node failures had caused the output statistics to be lost.
    val waiter = new JobWaiter(this, jobId, 1, (i: Int, r: MapOutputStatistics) => callback(r))
    eventProcessLoop.post(MapStageSubmitted(
      jobId, dependency, callSite, waiter, SerializationUtils.clone(properties)))
    waiter
  }

  /**
   * Cancel a job that is running or waiting in the queue.
   */
  def cancelJob(jobId: Int): Unit = {
    logInfo("Asked to cancel job " + jobId)
    eventProcessLoop.post(JobCancelled(jobId))
  }

  /**
   * Cancel all jobs in the given job group ID.
   */
  def cancelJobGroup(groupId: String): Unit = {
    logInfo("Asked to cancel job group " + groupId)
    eventProcessLoop.post(JobGroupCancelled(groupId))
  }

  /**
   * Cancel all jobs that are running or waiting in the queue.
   */
  def cancelAllJobs(): Unit = {
    eventProcessLoop.post(AllJobsCancelled)
  }

  private[scheduler] def doCancelAllJobs() {
    // Cancel all running jobs.
721 runningStages.map(_.firstJobId).foreach(handleJobCancellation(_, 722 reason = "as part of cancellation of all jobs")) 723 activeJobs.clear() // These should already be empty by this point, 724 jobIdToActiveJob.clear() // but just in case we lost track of some jobs... 725 } 726 727 /** 728 * Cancel all jobs associated with a running or scheduled stage. 729 */ 730 def cancelStage(stageId: Int) { 731 eventProcessLoop.post(StageCancelled(stageId)) 732 } 733 734 /** 735 * Resubmit any failed stages. Ordinarily called after a small amount of time has passed since 736 * the last fetch failure. 737 */ 738 private[scheduler] def resubmitFailedStages() { 739 if (failedStages.size > 0) { 740 // Failed stages may be removed by job cancellation, so failed might be empty even if 741 // the ResubmitFailedStages event has been scheduled. 742 logInfo("Resubmitting failed stages") 743 clearCacheLocs() 744 val failedStagesCopy = failedStages.toArray 745 failedStages.clear() 746 for (stage <- failedStagesCopy.sortBy(_.firstJobId)) { 747 submitStage(stage) 748 } 749 } 750 } 751 752 /** 753 * Check for waiting stages which are now eligible for resubmission. 754 * Submits stages that depend on the given parent stage. Called when the parent stage completes 755 * successfully. 756 */ 757 private def submitWaitingChildStages(parent: Stage) { 758 logTrace(s"Checking if any dependencies of $parent are now runnable") 759 logTrace("running: " + runningStages) 760 logTrace("waiting: " + waitingStages) 761 logTrace("failed: " + failedStages) 762 val childStages = waitingStages.filter(_.parents.contains(parent)).toArray 763 waitingStages --= childStages 764 for (stage <- childStages.sortBy(_.firstJobId)) { 765 submitStage(stage) 766 } 767 } 768 769 /** Finds the earliest-created active job that needs the stage */ 770 // TODO: Probably should actually find among the active jobs that need this 771 // stage the one with the highest priority (highest-priority pool, earliest created). 772 // That should take care of at least part of the priority inversion problem with 773 // cross-job dependencies. 774 private def activeJobForStage(stage: Stage): Option[Int] = { 775 val jobsThatUseStage: Array[Int] = stage.jobIds.toArray.sorted 776 jobsThatUseStage.find(jobIdToActiveJob.contains) 777 } 778 779 private[scheduler] def handleJobGroupCancelled(groupId: String) { 780 // Cancel all jobs belonging to this job group. 781 // First finds all active jobs with this group id, and then kill stages for them. 782 val activeInGroup = activeJobs.filter { activeJob => 783 Option(activeJob.properties).exists { 784 _.getProperty(SparkContext.SPARK_JOB_GROUP_ID) == groupId 785 } 786 } 787 val jobIds = activeInGroup.map(_.jobId) 788 jobIds.foreach(handleJobCancellation(_, "part of cancelled job group %s".format(groupId))) 789 } 790 791 private[scheduler] def handleBeginEvent(task: Task[_], taskInfo: TaskInfo) { 792 // Note that there is a chance that this task is launched after the stage is cancelled. 793 // In that case, we wouldn't have the stage anymore in stageIdToStage. 
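    // Fall back to an attempt id of -1 in that case, so the task-start event can still be posted.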
794 val stageAttemptId = stageIdToStage.get(task.stageId).map(_.latestInfo.attemptId).getOrElse(-1) 795 listenerBus.post(SparkListenerTaskStart(task.stageId, stageAttemptId, taskInfo)) 796 } 797 798 private[scheduler] def handleTaskSetFailed( 799 taskSet: TaskSet, 800 reason: String, 801 exception: Option[Throwable]): Unit = { 802 stageIdToStage.get(taskSet.stageId).foreach { abortStage(_, reason, exception) } 803 } 804 805 private[scheduler] def cleanUpAfterSchedulerStop() { 806 for (job <- activeJobs) { 807 val error = 808 new SparkException(s"Job ${job.jobId} cancelled because SparkContext was shut down") 809 job.listener.jobFailed(error) 810 // Tell the listeners that all of the running stages have ended. Don't bother 811 // cancelling the stages because if the DAG scheduler is stopped, the entire application 812 // is in the process of getting stopped. 813 val stageFailedMessage = "Stage cancelled because SparkContext was shut down" 814 // The `toArray` here is necessary so that we don't iterate over `runningStages` while 815 // mutating it. 816 runningStages.toArray.foreach { stage => 817 markStageAsFinished(stage, Some(stageFailedMessage)) 818 } 819 listenerBus.post(SparkListenerJobEnd(job.jobId, clock.getTimeMillis(), JobFailed(error))) 820 } 821 } 822 823 private[scheduler] def handleGetTaskResult(taskInfo: TaskInfo) { 824 listenerBus.post(SparkListenerTaskGettingResult(taskInfo)) 825 } 826 827 private[scheduler] def handleJobSubmitted(jobId: Int, 828 finalRDD: RDD[_], 829 func: (TaskContext, Iterator[_]) => _, 830 partitions: Array[Int], 831 callSite: CallSite, 832 listener: JobListener, 833 properties: Properties) { 834 var finalStage: ResultStage = null 835 try { 836 // New stage creation may throw an exception if, for example, jobs are run on a 837 // HadoopRDD whose underlying HDFS files have been deleted. 838 finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite) 839 } catch { 840 case e: Exception => 841 logWarning("Creating new stage failed due to exception - job: " + jobId, e) 842 listener.jobFailed(e) 843 return 844 } 845 846 val job = new ActiveJob(jobId, finalStage, callSite, listener, properties) 847 clearCacheLocs() 848 logInfo("Got job %s (%s) with %d output partitions".format( 849 job.jobId, callSite.shortForm, partitions.length)) 850 logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")") 851 logInfo("Parents of final stage: " + finalStage.parents) 852 logInfo("Missing parents: " + getMissingParentStages(finalStage)) 853 854 val jobSubmissionTime = clock.getTimeMillis() 855 jobIdToActiveJob(jobId) = job 856 activeJobs += job 857 finalStage.setActiveJob(job) 858 val stageIds = jobIdToStageIds(jobId).toArray 859 val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo)) 860 listenerBus.post( 861 SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties)) 862 submitStage(finalStage) 863 } 864 865 private[scheduler] def handleMapStageSubmitted(jobId: Int, 866 dependency: ShuffleDependency[_, _, _], 867 callSite: CallSite, 868 listener: JobListener, 869 properties: Properties) { 870 // Submitting this map stage might still require the creation of some parent stages, so make 871 // sure that happens. 872 var finalStage: ShuffleMapStage = null 873 try { 874 // New stage creation may throw an exception if, for example, jobs are run on a 875 // HadoopRDD whose underlying HDFS files have been deleted. 
876 finalStage = getOrCreateShuffleMapStage(dependency, jobId) 877 } catch { 878 case e: Exception => 879 logWarning("Creating new stage failed due to exception - job: " + jobId, e) 880 listener.jobFailed(e) 881 return 882 } 883 884 val job = new ActiveJob(jobId, finalStage, callSite, listener, properties) 885 clearCacheLocs() 886 logInfo("Got map stage job %s (%s) with %d output partitions".format( 887 jobId, callSite.shortForm, dependency.rdd.partitions.length)) 888 logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")") 889 logInfo("Parents of final stage: " + finalStage.parents) 890 logInfo("Missing parents: " + getMissingParentStages(finalStage)) 891 892 val jobSubmissionTime = clock.getTimeMillis() 893 jobIdToActiveJob(jobId) = job 894 activeJobs += job 895 finalStage.addActiveJob(job) 896 val stageIds = jobIdToStageIds(jobId).toArray 897 val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo)) 898 listenerBus.post( 899 SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties)) 900 submitStage(finalStage) 901 902 // If the whole stage has already finished, tell the listener and remove it 903 if (finalStage.isAvailable) { 904 markMapStageJobAsFinished(job, mapOutputTracker.getStatistics(dependency)) 905 } 906 } 907 908 /** Submits stage, but first recursively submits any missing parents. */ 909 private def submitStage(stage: Stage) { 910 val jobId = activeJobForStage(stage) 911 if (jobId.isDefined) { 912 logDebug("submitStage(" + stage + ")") 913 if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) { 914 val missing = getMissingParentStages(stage).sortBy(_.id) 915 logDebug("missing: " + missing) 916 if (missing.isEmpty) { 917 logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents") 918 submitMissingTasks(stage, jobId.get) 919 } else { 920 for (parent <- missing) { 921 submitStage(parent) 922 } 923 waitingStages += stage 924 } 925 } 926 } else { 927 abortStage(stage, "No active job for stage " + stage.id, None) 928 } 929 } 930 931 /** Called when stage's parents are available and we can now do its task. */ 932 private def submitMissingTasks(stage: Stage, jobId: Int) { 933 logDebug("submitMissingTasks(" + stage + ")") 934 // Get our pending tasks and remember them in our pendingTasks entry 935 stage.pendingPartitions.clear() 936 937 // First figure out the indexes of partition ids to compute. 938 val partitionsToCompute: Seq[Int] = stage.findMissingPartitions() 939 940 // Use the scheduling pool, job group, description, etc. from an ActiveJob associated 941 // with this Stage 942 val properties = jobIdToActiveJob(jobId).properties 943 944 runningStages += stage 945 // SparkListenerStageSubmitted should be posted before testing whether tasks are 946 // serializable. If tasks are not serializable, a SparkListenerStageCompleted event 947 // will be posted, which should always come after a corresponding SparkListenerStageSubmitted 948 // event. 
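    // Register the stage with the OutputCommitCoordinator so it can authorize (or deny) commit
    // attempts from this stage's tasks; the max partition id bounds the state it has to track.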
949 stage match { 950 case s: ShuffleMapStage => 951 outputCommitCoordinator.stageStart(stage = s.id, maxPartitionId = s.numPartitions - 1) 952 case s: ResultStage => 953 outputCommitCoordinator.stageStart( 954 stage = s.id, maxPartitionId = s.rdd.partitions.length - 1) 955 } 956 val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try { 957 stage match { 958 case s: ShuffleMapStage => 959 partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id))}.toMap 960 case s: ResultStage => 961 partitionsToCompute.map { id => 962 val p = s.partitions(id) 963 (id, getPreferredLocs(stage.rdd, p)) 964 }.toMap 965 } 966 } catch { 967 case NonFatal(e) => 968 stage.makeNewStageAttempt(partitionsToCompute.size) 969 listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties)) 970 abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e)) 971 runningStages -= stage 972 return 973 } 974 975 stage.makeNewStageAttempt(partitionsToCompute.size, taskIdToLocations.values.toSeq) 976 listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties)) 977 978 // TODO: Maybe we can keep the taskBinary in Stage to avoid serializing it multiple times. 979 // Broadcasted binary for the task, used to dispatch tasks to executors. Note that we broadcast 980 // the serialized copy of the RDD and for each task we will deserialize it, which means each 981 // task gets a different copy of the RDD. This provides stronger isolation between tasks that 982 // might modify state of objects referenced in their closures. This is necessary in Hadoop 983 // where the JobConf/Configuration object is not thread-safe. 984 var taskBinary: Broadcast[Array[Byte]] = null 985 try { 986 // For ShuffleMapTask, serialize and broadcast (rdd, shuffleDep). 987 // For ResultTask, serialize and broadcast (rdd, func). 988 val taskBinaryBytes: Array[Byte] = stage match { 989 case stage: ShuffleMapStage => 990 JavaUtils.bufferToArray( 991 closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef)) 992 case stage: ResultStage => 993 JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.func): AnyRef)) 994 } 995 996 taskBinary = sc.broadcast(taskBinaryBytes) 997 } catch { 998 // In the case of a failure during serialization, abort the stage. 
999 case e: NotSerializableException => 1000 abortStage(stage, "Task not serializable: " + e.toString, Some(e)) 1001 runningStages -= stage 1002 1003 // Abort execution 1004 return 1005 case NonFatal(e) => 1006 abortStage(stage, s"Task serialization failed: $e\n${Utils.exceptionString(e)}", Some(e)) 1007 runningStages -= stage 1008 return 1009 } 1010 1011 val tasks: Seq[Task[_]] = try { 1012 stage match { 1013 case stage: ShuffleMapStage => 1014 partitionsToCompute.map { id => 1015 val locs = taskIdToLocations(id) 1016 val part = stage.rdd.partitions(id) 1017 new ShuffleMapTask(stage.id, stage.latestInfo.attemptId, 1018 taskBinary, part, locs, stage.latestInfo.taskMetrics, properties, Option(jobId), 1019 Option(sc.applicationId), sc.applicationAttemptId) 1020 } 1021 1022 case stage: ResultStage => 1023 partitionsToCompute.map { id => 1024 val p: Int = stage.partitions(id) 1025 val part = stage.rdd.partitions(p) 1026 val locs = taskIdToLocations(id) 1027 new ResultTask(stage.id, stage.latestInfo.attemptId, 1028 taskBinary, part, locs, id, properties, stage.latestInfo.taskMetrics, 1029 Option(jobId), Option(sc.applicationId), sc.applicationAttemptId) 1030 } 1031 } 1032 } catch { 1033 case NonFatal(e) => 1034 abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e)) 1035 runningStages -= stage 1036 return 1037 } 1038 1039 if (tasks.size > 0) { 1040 logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")") 1041 stage.pendingPartitions ++= tasks.map(_.partitionId) 1042 logDebug("New pending partitions: " + stage.pendingPartitions) 1043 taskScheduler.submitTasks(new TaskSet( 1044 tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties)) 1045 stage.latestInfo.submissionTime = Some(clock.getTimeMillis()) 1046 } else { 1047 // Because we posted SparkListenerStageSubmitted earlier, we should mark 1048 // the stage as completed here in case there are no tasks to run 1049 markStageAsFinished(stage, None) 1050 1051 val debugString = stage match { 1052 case stage: ShuffleMapStage => 1053 s"Stage ${stage} is actually done; " + 1054 s"(available: ${stage.isAvailable}," + 1055 s"available outputs: ${stage.numAvailableOutputs}," + 1056 s"partitions: ${stage.numPartitions})" 1057 case stage : ResultStage => 1058 s"Stage ${stage} is actually done; (partitions: ${stage.numPartitions})" 1059 } 1060 logDebug(debugString) 1061 1062 submitWaitingChildStages(stage) 1063 } 1064 } 1065 1066 /** 1067 * Merge local values from a task into the corresponding accumulators previously registered 1068 * here on the driver. 1069 * 1070 * Although accumulators themselves are not thread-safe, this method is called only from one 1071 * thread, the one that runs the scheduling loop. This means we only handle one task 1072 * completion event at a time so we don't need to worry about locking the accumulators. 1073 * This still doesn't stop the caller from updating the accumulator outside the scheduler, 1074 * but that's not our problem since there's nothing we can do about that. 
   */
  private def updateAccumulators(event: CompletionEvent): Unit = {
    val task = event.task
    val stage = stageIdToStage(task.stageId)
    try {
      event.accumUpdates.foreach { updates =>
        val id = updates.id
        // Find the corresponding accumulator on the driver and update it
        val acc: AccumulatorV2[Any, Any] = AccumulatorContext.get(id) match {
          case Some(accum) => accum.asInstanceOf[AccumulatorV2[Any, Any]]
          case None =>
            throw new SparkException(s"attempted to access non-existent accumulator $id")
        }
        acc.merge(updates.asInstanceOf[AccumulatorV2[Any, Any]])
        // To avoid UI cruft, ignore cases where value wasn't updated
        if (acc.name.isDefined && !updates.isZero) {
          stage.latestInfo.accumulables(id) = acc.toInfo(None, Some(acc.value))
          event.taskInfo.accumulables += acc.toInfo(Some(updates.value), Some(acc.value))
        }
      }
    } catch {
      case NonFatal(e) =>
        logError(s"Failed to update accumulators for task ${task.partitionId}", e)
    }
  }

  /**
   * Responds to a task finishing. This is called inside the event loop so it assumes that it can
   * modify the scheduler's internal state. Use taskEnded() to post a task end event from outside.
   */
  private[scheduler] def handleTaskCompletion(event: CompletionEvent) {
    val task = event.task
    val taskId = event.taskInfo.id
    val stageId = task.stageId
    val taskType = Utils.getFormattedClassName(task)

    outputCommitCoordinator.taskCompleted(
      stageId,
      task.partitionId,
      event.taskInfo.attemptNumber, // this is a task attempt number
      event.reason)

    // Reconstruct task metrics. Note: this may be null if the task has failed.
    val taskMetrics: TaskMetrics =
      if (event.accumUpdates.nonEmpty) {
        try {
          TaskMetrics.fromAccumulators(event.accumUpdates)
        } catch {
          case NonFatal(e) =>
            logError(s"Error when attempting to reconstruct metrics for task $taskId", e)
            null
        }
      } else {
        null
      }

    // The stage may have already finished when we get this event -- e.g. maybe it was a
    // speculative task. It is important that we send the TaskEnd event in any case, so listeners
    // are properly notified and can choose to handle it. For instance, some listeners are
    // doing their own accounting and if they don't get the task end event they think
    // tasks are still running when they really aren't.
    listenerBus.post(SparkListenerTaskEnd(
      stageId, task.stageAttemptId, taskType, event.reason, event.taskInfo, taskMetrics))

    if (!stageIdToStage.contains(task.stageId)) {
      // Skip all the actions if the stage has been cancelled.
1141 return 1142 } 1143 1144 val stage = stageIdToStage(task.stageId) 1145 event.reason match { 1146 case Success => 1147 stage.pendingPartitions -= task.partitionId 1148 task match { 1149 case rt: ResultTask[_, _] => 1150 // Cast to ResultStage here because it's part of the ResultTask 1151 // TODO Refactor this out to a function that accepts a ResultStage 1152 val resultStage = stage.asInstanceOf[ResultStage] 1153 resultStage.activeJob match { 1154 case Some(job) => 1155 if (!job.finished(rt.outputId)) { 1156 updateAccumulators(event) 1157 job.finished(rt.outputId) = true 1158 job.numFinished += 1 1159 // If the whole job has finished, remove it 1160 if (job.numFinished == job.numPartitions) { 1161 markStageAsFinished(resultStage) 1162 cleanupStateForJobAndIndependentStages(job) 1163 listenerBus.post( 1164 SparkListenerJobEnd(job.jobId, clock.getTimeMillis(), JobSucceeded)) 1165 } 1166 1167 // taskSucceeded runs some user code that might throw an exception. Make sure 1168 // we are resilient against that. 1169 try { 1170 job.listener.taskSucceeded(rt.outputId, event.result) 1171 } catch { 1172 case e: Exception => 1173 // TODO: Perhaps we want to mark the resultStage as failed? 1174 job.listener.jobFailed(new SparkDriverExecutionException(e)) 1175 } 1176 } 1177 case None => 1178 logInfo("Ignoring result from " + rt + " because its job has finished") 1179 } 1180 1181 case smt: ShuffleMapTask => 1182 val shuffleStage = stage.asInstanceOf[ShuffleMapStage] 1183 updateAccumulators(event) 1184 val status = event.result.asInstanceOf[MapStatus] 1185 val execId = status.location.executorId 1186 logDebug("ShuffleMapTask finished on " + execId) 1187 if (failedEpoch.contains(execId) && smt.epoch <= failedEpoch(execId)) { 1188 logInfo(s"Ignoring possibly bogus $smt completion from executor $execId") 1189 } else { 1190 shuffleStage.addOutputLoc(smt.partitionId, status) 1191 } 1192 1193 if (runningStages.contains(shuffleStage) && shuffleStage.pendingPartitions.isEmpty) { 1194 markStageAsFinished(shuffleStage) 1195 logInfo("looking for newly runnable stages") 1196 logInfo("running: " + runningStages) 1197 logInfo("waiting: " + waitingStages) 1198 logInfo("failed: " + failedStages) 1199 1200 // We supply true to increment the epoch number here in case this is a 1201 // recomputation of the map outputs. In that case, some nodes may have cached 1202 // locations with holes (from when we detected the error) and will need the 1203 // epoch incremented to refetch them. 1204 // TODO: Only increment the epoch number if this is not the first time 1205 // we registered these map outputs. 
            mapOutputTracker.registerMapOutputs(
              shuffleStage.shuffleDep.shuffleId,
              shuffleStage.outputLocInMapOutputTrackerFormat(),
              changeEpoch = true)

            clearCacheLocs()

            if (!shuffleStage.isAvailable) {
              // Some tasks had failed; let's resubmit this shuffleStage
              // TODO: Lower-level scheduler should also deal with this
              logInfo("Resubmitting " + shuffleStage + " (" + shuffleStage.name +
                ") because some of its tasks had failed: " +
                shuffleStage.findMissingPartitions().mkString(", "))
              submitStage(shuffleStage)
            } else {
              // Mark any map-stage jobs waiting on this stage as finished
              if (shuffleStage.mapStageJobs.nonEmpty) {
                val stats = mapOutputTracker.getStatistics(shuffleStage.shuffleDep)
                for (job <- shuffleStage.mapStageJobs) {
                  markMapStageJobAsFinished(job, stats)
                }
              }
              submitWaitingChildStages(shuffleStage)
            }
          }
        }

      case Resubmitted =>
        logInfo("Resubmitted " + task + ", so marking it as still running")
        stage.pendingPartitions += task.partitionId

      case FetchFailed(bmAddress, shuffleId, mapId, reduceId, failureMessage) =>
        val failedStage = stageIdToStage(task.stageId)
        val mapStage = shuffleIdToMapStage(shuffleId)

        if (failedStage.latestInfo.attemptId != task.stageAttemptId) {
          logInfo(s"Ignoring fetch failure from $task as it's from $failedStage attempt" +
            s" ${task.stageAttemptId} and there is a more recent attempt for that stage " +
            s"(attempt ID ${failedStage.latestInfo.attemptId}) running")
        } else {
          // It is likely that we receive multiple FetchFailed for a single stage (because we have
          // multiple tasks running concurrently on different executors). In that case, it is
          // possible the fetch failure has already been handled by the scheduler.
          if (runningStages.contains(failedStage)) {
            logInfo(s"Marking $failedStage (${failedStage.name}) as failed " +
              s"due to a fetch failure from $mapStage (${mapStage.name})")
            markStageAsFinished(failedStage, Some(failureMessage))
          } else {
            logDebug(s"Received fetch failure from $task, but it's from $failedStage which is " +
              s"no longer running")
          }

          if (disallowStageRetryForTest) {
            abortStage(failedStage, "Fetch failure will not retry stage due to testing config",
              None)
          } else if (failedStage.failedOnFetchAndShouldAbort(task.stageAttemptId)) {
            abortStage(failedStage, s"$failedStage (${failedStage.name}) " +
              s"has failed the maximum allowable number of " +
              s"times: ${Stage.MAX_CONSECUTIVE_FETCH_FAILURES}. " +
              s"Most recent failure reason: ${failureMessage}", None)
          } else {
            if (failedStages.isEmpty) {
              // Don't schedule an event to resubmit failed stages if failed isn't empty, because
              // in that case the event will already have been scheduled.
1270 // TODO: Cancel running tasks in the stage 1271 logInfo(s"Resubmitting $mapStage (${mapStage.name}) and " + 1272 s"$failedStage (${failedStage.name}) due to fetch failure") 1273 messageScheduler.schedule(new Runnable { 1274 override def run(): Unit = eventProcessLoop.post(ResubmitFailedStages) 1275 }, DAGScheduler.RESUBMIT_TIMEOUT, TimeUnit.MILLISECONDS) 1276 } 1277 failedStages += failedStage 1278 failedStages += mapStage 1279 } 1280 // Mark the map whose fetch failed as broken in the map stage 1281 if (mapId != -1) { 1282 mapStage.removeOutputLoc(mapId, bmAddress) 1283 mapOutputTracker.unregisterMapOutput(shuffleId, mapId, bmAddress) 1284 } 1285 1286 // TODO: mark the executor as failed only if there were lots of fetch failures on it 1287 if (bmAddress != null) { 1288 handleExecutorLost(bmAddress.executorId, filesLost = true, Some(task.epoch)) 1289 } 1290 } 1291 1292 case commitDenied: TaskCommitDenied => 1293 // Do nothing here, left up to the TaskScheduler to decide how to handle denied commits 1294 1295 case exceptionFailure: ExceptionFailure => 1296 // Tasks failed with exceptions might still have accumulator updates. 1297 updateAccumulators(event) 1298 1299 case TaskResultLost => 1300 // Do nothing here; the TaskScheduler handles these failures and resubmits the task. 1301 1302 case _: ExecutorLostFailure | TaskKilled | UnknownReason => 1303 // Unrecognized failure - also do nothing. If the task fails repeatedly, the TaskScheduler 1304 // will abort the job. 1305 } 1306 } 1307 1308 /** 1309 * Responds to an executor being lost. This is called inside the event loop, so it assumes it can 1310 * modify the scheduler's internal state. Use executorLost() to post a loss event from outside. 1311 * 1312 * We will also assume that we've lost all shuffle blocks associated with the executor if the 1313 * executor serves its own blocks (i.e., we're not using external shuffle), the entire slave 1314 * is lost (likely including the shuffle service), or a FetchFailed occurred, in which case we 1315 * presume all shuffle data related to this executor to be lost. 1316 * 1317 * Optionally the epoch during which the failure was caught can be passed to avoid allowing 1318 * stray fetch failures from possibly retriggering the detection of a node as lost. 
   */
  private[scheduler] def handleExecutorLost(
      execId: String,
      filesLost: Boolean,
      maybeEpoch: Option[Long] = None) {
    val currentEpoch = maybeEpoch.getOrElse(mapOutputTracker.getEpoch)
    if (!failedEpoch.contains(execId) || failedEpoch(execId) < currentEpoch) {
      failedEpoch(execId) = currentEpoch
      logInfo("Executor lost: %s (epoch %d)".format(execId, currentEpoch))
      blockManagerMaster.removeExecutor(execId)

      if (filesLost || !env.blockManager.externalShuffleServiceEnabled) {
        logInfo("Shuffle files lost for executor: %s (epoch %d)".format(execId, currentEpoch))
        // TODO: This will be really slow if we keep accumulating shuffle map stages
        for ((shuffleId, stage) <- shuffleIdToMapStage) {
          stage.removeOutputsOnExecutor(execId)
          mapOutputTracker.registerMapOutputs(
            shuffleId,
            stage.outputLocInMapOutputTrackerFormat(),
            changeEpoch = true)
        }
        if (shuffleIdToMapStage.isEmpty) {
          mapOutputTracker.incrementEpoch()
        }
        clearCacheLocs()
      }
    } else {
      logDebug("Additional executor lost message for " + execId +
        " (epoch " + currentEpoch + ")")
    }
  }

  private[scheduler] def handleExecutorAdded(execId: String, host: String) {
    // remove from failedEpoch(execId) ?
    if (failedEpoch.contains(execId)) {
      logInfo("Host added was in lost list earlier: " + host)
      failedEpoch -= execId
    }
  }

  private[scheduler] def handleStageCancellation(stageId: Int) {
    stageIdToStage.get(stageId) match {
      case Some(stage) =>
        val jobsThatUseStage: Array[Int] = stage.jobIds.toArray
        jobsThatUseStage.foreach { jobId =>
          handleJobCancellation(jobId, s"because Stage $stageId was cancelled")
        }
      case None =>
        logInfo("No active jobs to kill for Stage " + stageId)
    }
  }

  private[scheduler] def handleJobCancellation(jobId: Int, reason: String = "") {
    if (!jobIdToStageIds.contains(jobId)) {
      logDebug("Trying to cancel unregistered job " + jobId)
    } else {
      failJobAndIndependentStages(
        jobIdToActiveJob(jobId), "Job %d cancelled %s".format(jobId, reason))
    }
  }

  /**
   * Marks a stage as finished and removes it from the list of running stages.
   */
  private def markStageAsFinished(stage: Stage, errorMessage: Option[String] = None): Unit = {
    val serviceTime = stage.latestInfo.submissionTime match {
      case Some(t) => "%.03f".format((clock.getTimeMillis() - t) / 1000.0)
      case _ => "Unknown"
    }
    if (errorMessage.isEmpty) {
      logInfo("%s (%s) finished in %s s".format(stage, stage.name, serviceTime))
      stage.latestInfo.completionTime = Some(clock.getTimeMillis())

      // Clear failure count for this stage, now that it's succeeded.
      // We only limit consecutive failures of stage attempts, so that if a stage is
      // re-used many times in a long-running job, unrelated failures don't eventually cause the
      // stage to be aborted.
      stage.clearFailures()
    } else {
      stage.latestInfo.stageFailed(errorMessage.get)
      logInfo(s"$stage (${stage.name}) failed in $serviceTime s due to ${errorMessage.get}")
    }

    outputCommitCoordinator.stageEnd(stage.id)
    listenerBus.post(SparkListenerStageCompleted(stage.latestInfo))
    runningStages -= stage
  }

  /**
   * Aborts all jobs depending on a particular Stage. This is called in response to a task set
   * being canceled by the TaskScheduler.

  /**
   * Aborts all jobs depending on a particular Stage. This is called in response to a task set
   * being canceled by the TaskScheduler. Use taskSetFailed() to inject this event from outside.
   */
  private[scheduler] def abortStage(
      failedStage: Stage,
      reason: String,
      exception: Option[Throwable]): Unit = {
    if (!stageIdToStage.contains(failedStage.id)) {
      // Skip all the actions if the stage has been removed.
      return
    }
    val dependentJobs: Seq[ActiveJob] =
      activeJobs.filter(job => stageDependsOn(job.finalStage, failedStage)).toSeq
    failedStage.latestInfo.completionTime = Some(clock.getTimeMillis())
    for (job <- dependentJobs) {
      failJobAndIndependentStages(job, s"Job aborted due to stage failure: $reason", exception)
    }
    if (dependentJobs.isEmpty) {
      logInfo("Ignoring failure of " + failedStage + " because all jobs depending on it are done")
    }
  }

  /** Fails a job and all stages that are only used by that job, and cleans up relevant state. */
  private def failJobAndIndependentStages(
      job: ActiveJob,
      failureReason: String,
      exception: Option[Throwable] = None): Unit = {
    val error = new SparkException(failureReason, exception.getOrElse(null))
    var ableToCancelStages = true

    val shouldInterruptThread =
      if (job.properties == null) false
      else job.properties.getProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false").toBoolean

    // Cancel all independent, running stages.
    val stages = jobIdToStageIds(job.jobId)
    if (stages.isEmpty) {
      logError("No stages registered for job " + job.jobId)
    }
    stages.foreach { stageId =>
      val jobsForStage: Option[HashSet[Int]] = stageIdToStage.get(stageId).map(_.jobIds)
      if (jobsForStage.isEmpty || !jobsForStage.get.contains(job.jobId)) {
        logError(
          "Job %d not registered for stage %d even though that stage was registered for the job"
            .format(job.jobId, stageId))
      } else if (jobsForStage.get.size == 1) {
        if (!stageIdToStage.contains(stageId)) {
          logError(s"Missing Stage for stage with id $stageId")
        } else {
          // This is the only job that uses this stage, so fail the stage if it is running.
          val stage = stageIdToStage(stageId)
          if (runningStages.contains(stage)) {
            try { // cancelTasks will fail if a SchedulerBackend does not implement killTask
              taskScheduler.cancelTasks(stageId, shouldInterruptThread)
              markStageAsFinished(stage, Some(failureReason))
            } catch {
              case e: UnsupportedOperationException =>
                logInfo(s"Could not cancel tasks for stage $stageId", e)
                ableToCancelStages = false
            }
          }
        }
      }
    }

    if (ableToCancelStages) {
      // SPARK-15783 important to cleanup state first, just for tests where we have some asserts
      // against the state. Otherwise we have a *little* bit of flakiness in the tests.
      cleanupStateForJobAndIndependentStages(job)
      job.listener.jobFailed(error)
      listenerBus.post(SparkListenerJobEnd(job.jobId, clock.getTimeMillis(), JobFailed(error)))
    }
  }
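
  // Illustrative note (hypothetical stage and job ids): failJobAndIndependentStages() only
  // cancels stages whose jobIds set contains nothing but the failing job. If jobs 1 and 2 both
  // reuse shuffle map stage 7 (stage.jobIds == Set(1, 2)) and job 1 fails, stage 7 keeps running
  // on behalf of job 2, while a stage 9 used only by job 1 (stage.jobIds == Set(1)) is cancelled
  // through taskScheduler.cancelTasks(9, shouldInterruptThread) and marked as finished with the
  // failure reason.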

  /** Return true if one of stage's ancestors is target. */
  private def stageDependsOn(stage: Stage, target: Stage): Boolean = {
    if (stage == target) {
      return true
    }
    val visitedRdds = new HashSet[RDD[_]]
    // We are manually maintaining a stack here to prevent StackOverflowError
    // caused by recursive visits
    val waitingForVisit = new Stack[RDD[_]]
    def visit(rdd: RDD[_]) {
      if (!visitedRdds(rdd)) {
        visitedRdds += rdd
        for (dep <- rdd.dependencies) {
          dep match {
            case shufDep: ShuffleDependency[_, _, _] =>
              val mapStage = getOrCreateShuffleMapStage(shufDep, stage.firstJobId)
              if (!mapStage.isAvailable) {
                waitingForVisit.push(mapStage.rdd)
              } // Otherwise there's no need to follow the dependency back
            case narrowDep: NarrowDependency[_] =>
              waitingForVisit.push(narrowDep.rdd)
          }
        }
      }
    }
    waitingForVisit.push(stage.rdd)
    while (waitingForVisit.nonEmpty) {
      visit(waitingForVisit.pop())
    }
    visitedRdds.contains(target.rdd)
  }

  /**
   * Gets the locality information associated with a partition of a particular RDD.
   *
   * This method is thread-safe and is called from both DAGScheduler and SparkContext.
   *
   * @param rdd whose partitions are to be looked at
   * @param partition to look up locality information for
   * @return list of machines that are preferred by the partition
   */
  private[spark]
  def getPreferredLocs(rdd: RDD[_], partition: Int): Seq[TaskLocation] = {
    getPreferredLocsInternal(rdd, partition, new HashSet)
  }

  /**
   * Recursive implementation for getPreferredLocs.
   *
   * This method is thread-safe because it only accesses DAGScheduler state through thread-safe
   * methods (getCacheLocs()); please be careful when modifying this method, because any new
   * DAGScheduler state accessed by it may require additional synchronization.
   */
  private def getPreferredLocsInternal(
      rdd: RDD[_],
      partition: Int,
      visited: HashSet[(RDD[_], Int)]): Seq[TaskLocation] = {
    // If the partition has already been visited, no need to re-visit.
    // This avoids exponential path exploration. SPARK-695
    if (!visited.add((rdd, partition))) {
      // Nil has already been returned for previously visited partitions.
      return Nil
    }
    // If the partition is cached, return the cache locations
    val cached = getCacheLocs(rdd)(partition)
    if (cached.nonEmpty) {
      return cached
    }
    // If the RDD has some placement preferences (as is the case for input RDDs), get those
    val rddPrefs = rdd.preferredLocations(rdd.partitions(partition)).toList
    if (rddPrefs.nonEmpty) {
      return rddPrefs.map(TaskLocation(_))
    }

    // If the RDD has narrow dependencies, pick the first partition of the first narrow dependency
    // that has any placement preferences. Ideally we would choose based on transfer sizes,
    // but this will do for now.
    rdd.dependencies.foreach {
      case n: NarrowDependency[_] =>
        for (inPart <- n.getParents(partition)) {
          val locs = getPreferredLocsInternal(n.rdd, inPart, visited)
          if (locs != Nil) {
            return locs
          }
        }

      case _ =>
    }

    Nil
  }
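
  // Illustrative example (hypothetical RDD chain): for rdd = sc.textFile("hdfs://...").map(f),
  // getPreferredLocs(rdd, 0) first checks getCacheLocs(rdd)(0) (empty if nothing is cached),
  // then the mapped RDD's own preferredLocations (typically none), and finally recurses through
  // the narrow map dependency into the underlying Hadoop RDD, returning that partition's HDFS
  // block locations as TaskLocations. Only if every step comes up empty does it return Nil.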

  /**
   * Mark a map stage job as finished with the given output stats, and report to its listener.
   */
  def markMapStageJobAsFinished(job: ActiveJob, stats: MapOutputStatistics): Unit = {
    // In map stage jobs, we only create a single "task", which is to finish all of the stage
    // (including reusing any previous map outputs, etc); so we just mark task 0 as done
    job.finished(0) = true
    job.numFinished += 1
    job.listener.taskSucceeded(0, stats)
    cleanupStateForJobAndIndependentStages(job)
    listenerBus.post(SparkListenerJobEnd(job.jobId, clock.getTimeMillis(), JobSucceeded))
  }

  def stop() {
    messageScheduler.shutdownNow()
    eventProcessLoop.stop()
    taskScheduler.stop()
  }

  eventProcessLoop.start()
}

private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler)
  extends EventLoop[DAGSchedulerEvent]("dag-scheduler-event-loop") with Logging {

  private[this] val timer = dagScheduler.metricsSource.messageProcessingTimer

  /**
   * The main event loop of the DAG scheduler.
   */
  override def onReceive(event: DAGSchedulerEvent): Unit = {
    val timerContext = timer.time()
    try {
      doOnReceive(event)
    } finally {
      timerContext.stop()
    }
  }

  private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
    case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
      dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)

    case MapStageSubmitted(jobId, dependency, callSite, listener, properties) =>
      dagScheduler.handleMapStageSubmitted(jobId, dependency, callSite, listener, properties)

    case StageCancelled(stageId) =>
      dagScheduler.handleStageCancellation(stageId)

    case JobCancelled(jobId) =>
      dagScheduler.handleJobCancellation(jobId)

    case JobGroupCancelled(groupId) =>
      dagScheduler.handleJobGroupCancelled(groupId)

    case AllJobsCancelled =>
      dagScheduler.doCancelAllJobs()

    case ExecutorAdded(execId, host) =>
      dagScheduler.handleExecutorAdded(execId, host)

    case ExecutorLost(execId, reason) =>
      val filesLost = reason match {
        case SlaveLost(_, true) => true
        case _ => false
      }
      dagScheduler.handleExecutorLost(execId, filesLost)

    case BeginEvent(task, taskInfo) =>
      dagScheduler.handleBeginEvent(task, taskInfo)

    case GettingResultEvent(taskInfo) =>
      dagScheduler.handleGetTaskResult(taskInfo)

    case completion: CompletionEvent =>
      dagScheduler.handleTaskCompletion(completion)

    case TaskSetFailed(taskSet, reason, exception) =>
      dagScheduler.handleTaskSetFailed(taskSet, reason, exception)

    case ResubmitFailedStages =>
      dagScheduler.resubmitFailedStages()
  }

  override def onError(e: Throwable): Unit = {
    logError("DAGSchedulerEventProcessLoop failed; shutting down SparkContext", e)
    try {
      dagScheduler.doCancelAllJobs()
    } catch {
      case t: Throwable => logError("DAGScheduler failed to cancel all jobs.", t)
    }
    dagScheduler.sc.stopInNewThread()
  }

  override def onStop(): Unit = {
    // Cancel any active jobs in postStop hook
    dagScheduler.cleanUpAfterSchedulerStop()
  }
}
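
// Illustrative flow, not part of the scheduler: every external request funnels through this
// single-threaded loop. Cancelling a job, for example, roughly proceeds as:
//
//   eventProcessLoop.post(JobCancelled(jobId))          // enqueue and return immediately
//     -> onReceive(event)                               // on the dag-scheduler-event-loop thread
//       -> doOnReceive(event)                           // pattern match on the event type
//         -> dagScheduler.handleJobCancellation(jobId)  // mutate scheduler state
//
// Because every handler runs on this one thread, the DAGScheduler's internal state can be
// modified without locking; only paths that are also called from other threads, such as
// getPreferredLocs(), need to stay thread-safe on their own.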

private[spark] object DAGScheduler {
  // The time, in millis, to wait for fetch failure events to stop coming in after one is
  // detected; this is a simplistic way to avoid resubmitting tasks in the non-fetchable map
  // stage one by one as more failure events come in
  val RESUBMIT_TIMEOUT = 200
}