/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.scheduler

import java.nio.ByteBuffer
import java.util.{Timer, TimerTask}
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicLong

import scala.collection.Set
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
import scala.util.Random

import org.apache.spark._
import org.apache.spark.TaskState.TaskState
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config
import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
import org.apache.spark.scheduler.TaskLocality.TaskLocality
import org.apache.spark.scheduler.local.LocalSchedulerBackend
import org.apache.spark.storage.BlockManagerId
import org.apache.spark.util.{AccumulatorV2, ThreadUtils, Utils}

/**
 * Schedules tasks for multiple types of clusters by acting through a SchedulerBackend.
 * It can also work with a local setup by using a [[LocalSchedulerBackend]] and setting
 * isLocal to true. It handles common logic, like determining a scheduling order across jobs,
 * waking up to launch speculative tasks, etc.
 *
 * Clients should first call initialize() and start(), then submit task sets through the
 * submitTasks method.
 *
 * THREADING: [[SchedulerBackend]]s and task-submitting clients can call this class from multiple
 * threads, so it needs locks in public API methods to maintain its state. In addition, some
 * [[SchedulerBackend]]s synchronize on themselves when they want to send events here, and then
 * acquire a lock on us, so we need to make sure that we don't try to lock the backend while
 * we are holding a lock on ourselves.
 */
private[spark] class TaskSchedulerImpl(
    val sc: SparkContext,
    val maxTaskFailures: Int,
    isLocal: Boolean = false)
  extends TaskScheduler with Logging
{
  def this(sc: SparkContext) = this(sc, sc.conf.get(config.MAX_TASK_FAILURES))

  val conf = sc.conf

  // How often to check for speculative tasks
  val SPECULATION_INTERVAL_MS = conf.getTimeAsMs("spark.speculation.interval", "100ms")

  // Duplicate copies of a task will only be launched if the original copy has been running for
  // at least this amount of time. This is to avoid the overhead of launching speculative copies
  // of tasks that are very short.
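  // The minimum is expressed in milliseconds.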
  val MIN_TIME_TO_SPECULATION = 100

  private val speculationScheduler =
    ThreadUtils.newDaemonSingleThreadScheduledExecutor("task-scheduler-speculation")

  // Threshold above which we warn the user that the initial TaskSet may be starved
  val STARVATION_TIMEOUT_MS = conf.getTimeAsMs("spark.starvation.timeout", "15s")

  // CPUs to request per task
  val CPUS_PER_TASK = conf.getInt("spark.task.cpus", 1)

  // TaskSetManagers are not thread safe, so any access to one should be synchronized
  // on this class.
  private val taskSetsByStageIdAndAttempt = new HashMap[Int, HashMap[Int, TaskSetManager]]

  // Protected by `this`
  private[scheduler] val taskIdToTaskSetManager = new HashMap[Long, TaskSetManager]
  val taskIdToExecutorId = new HashMap[Long, String]

  @volatile private var hasReceivedTask = false
  @volatile private var hasLaunchedTask = false
  private val starvationTimer = new Timer(true)

  // Incrementing task IDs
  val nextTaskId = new AtomicLong(0)

  // IDs of the tasks running on each executor
  private val executorIdToRunningTaskIds = new HashMap[String, HashSet[Long]]

  def runningTasksByExecutors: Map[String, Int] = synchronized {
    executorIdToRunningTaskIds.toMap.mapValues(_.size)
  }

  // The set of executors we have on each host; this is used to compute hostsAlive, which
  // in turn is used to decide when we can attain data locality on a given host
  protected val hostToExecutors = new HashMap[String, HashSet[String]]

  protected val hostsByRack = new HashMap[String, HashSet[String]]

  protected val executorIdToHost = new HashMap[String, String]

  // Listener object to pass upcalls into
  var dagScheduler: DAGScheduler = null

  var backend: SchedulerBackend = null

  val mapOutputTracker = SparkEnv.get.mapOutputTracker

  var schedulableBuilder: SchedulableBuilder = null
  var rootPool: Pool = null
  // default scheduler is FIFO
  private val schedulingModeConf = conf.get("spark.scheduler.mode", "FIFO")
  val schedulingMode: SchedulingMode = try {
    SchedulingMode.withName(schedulingModeConf.toUpperCase)
  } catch {
    case e: java.util.NoSuchElementException =>
      throw new SparkException(s"Unrecognized spark.scheduler.mode: $schedulingModeConf")
  }

  // This is a var so that we can reset it for testing purposes.
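  // The getter fetches and deserializes task results on its own thread pool.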
  private[spark] var taskResultGetter = new TaskResultGetter(sc.env, this)

  override def setDAGScheduler(dagScheduler: DAGScheduler) {
    this.dagScheduler = dagScheduler
  }

  def initialize(backend: SchedulerBackend) {
    this.backend = backend
    // temporarily set rootPool name to empty
    rootPool = new Pool("", schedulingMode, 0, 0)
    schedulableBuilder = {
      schedulingMode match {
        case SchedulingMode.FIFO =>
          new FIFOSchedulableBuilder(rootPool)
        case SchedulingMode.FAIR =>
          new FairSchedulableBuilder(rootPool, conf)
        case _ =>
          throw new IllegalArgumentException(s"Unsupported spark.scheduler.mode: $schedulingMode")
      }
    }
    schedulableBuilder.buildPools()
  }

  def newTaskId(): Long = nextTaskId.getAndIncrement()

  override def start() {
    backend.start()

    if (!isLocal && conf.getBoolean("spark.speculation", false)) {
      logInfo("Starting speculative execution thread")
      speculationScheduler.scheduleAtFixedRate(new Runnable {
        override def run(): Unit = Utils.tryOrStopSparkContext(sc) {
          checkSpeculatableTasks()
        }
      }, SPECULATION_INTERVAL_MS, SPECULATION_INTERVAL_MS, TimeUnit.MILLISECONDS)
    }
  }

  override def postStartHook() {
    waitBackendReady()
  }

  override def submitTasks(taskSet: TaskSet) {
    val tasks = taskSet.tasks
    logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
    this.synchronized {
      val manager = createTaskSetManager(taskSet, maxTaskFailures)
      val stage = taskSet.stageId
      val stageTaskSets =
        taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
      stageTaskSets(taskSet.stageAttemptId) = manager
      val conflictingTaskSet = stageTaskSets.exists { case (_, ts) =>
        ts.taskSet != taskSet && !ts.isZombie
      }
      if (conflictingTaskSet) {
        throw new IllegalStateException(s"more than one active taskSet for stage $stage:" +
          s" ${stageTaskSets.toSeq.map{_._2.taskSet.id}.mkString(",")}")
      }
      schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)

      if (!isLocal && !hasReceivedTask) {
        starvationTimer.scheduleAtFixedRate(new TimerTask() {
          override def run() {
            if (!hasLaunchedTask) {
              logWarning("Initial job has not accepted any resources; " +
                "check your cluster UI to ensure that workers are registered " +
                "and have sufficient resources")
            } else {
              this.cancel()
            }
          }
        }, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS)
      }
      hasReceivedTask = true
    }
    backend.reviveOffers()
  }

  // Label as private[scheduler] to allow tests to swap in different task set managers if necessary
  private[scheduler] def createTaskSetManager(
      taskSet: TaskSet,
      maxTaskFailures: Int): TaskSetManager = {
    new TaskSetManager(this, taskSet, maxTaskFailures)
  }

  override def cancelTasks(stageId: Int, interruptThread: Boolean): Unit = synchronized {
    logInfo("Cancelling stage " + stageId)
    taskSetsByStageIdAndAttempt.get(stageId).foreach { attempts =>
      attempts.foreach { case (_, tsm) =>
        // There are two possible cases here:
        // 1. The task set manager has been created and some tasks have been scheduled.
        //    In this case, send a kill signal to the executors to kill the task and then abort
        //    the stage.
        // 2. The task set manager has been created but no tasks have been scheduled. In this
        //    case, simply abort the stage.
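        // Kill any running task attempts first (case 1); in case 2 the running set below is
        // empty, so this loop is a no-op.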
        tsm.runningTasksSet.foreach { tid =>
          val execId = taskIdToExecutorId(tid)
          backend.killTask(tid, execId, interruptThread)
        }
        tsm.abort("Stage %s cancelled".format(stageId))
        logInfo("Stage %d was cancelled".format(stageId))
      }
    }
  }

  /**
   * Called to indicate that all task attempts (including speculated tasks) associated with the
   * given TaskSetManager have completed, so state associated with the TaskSetManager should be
   * cleaned up.
   */
  def taskSetFinished(manager: TaskSetManager): Unit = synchronized {
    taskSetsByStageIdAndAttempt.get(manager.taskSet.stageId).foreach { taskSetsForStage =>
      taskSetsForStage -= manager.taskSet.stageAttemptId
      if (taskSetsForStage.isEmpty) {
        taskSetsByStageIdAndAttempt -= manager.taskSet.stageId
      }
    }
    manager.parent.removeSchedulable(manager)
    logInfo(s"Removed TaskSet ${manager.taskSet.id}, whose tasks have all completed, from pool" +
      s" ${manager.parent.name}")
  }

  private def resourceOfferSingleTaskSet(
      taskSet: TaskSetManager,
      maxLocality: TaskLocality,
      shuffledOffers: Seq[WorkerOffer],
      availableCpus: Array[Int],
      tasks: IndexedSeq[ArrayBuffer[TaskDescription]]) : Boolean = {
    var launchedTask = false
    for (i <- 0 until shuffledOffers.size) {
      val execId = shuffledOffers(i).executorId
      val host = shuffledOffers(i).host
      if (availableCpus(i) >= CPUS_PER_TASK) {
        try {
          for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
            tasks(i) += task
            val tid = task.taskId
            taskIdToTaskSetManager(tid) = taskSet
            taskIdToExecutorId(tid) = execId
            executorIdToRunningTaskIds(execId).add(tid)
            availableCpus(i) -= CPUS_PER_TASK
            assert(availableCpus(i) >= 0)
            launchedTask = true
          }
        } catch {
          case e: TaskNotSerializableException =>
            logError(s"Resource offer failed, task set ${taskSet.name} was not serializable")
            // Do not offer resources for this task, but don't throw an error to allow other
            // task sets to be submitted.
            return launchedTask
        }
      }
    }
    return launchedTask
  }

  /**
   * Called by cluster manager to offer resources on slaves. We respond by asking our active task
   * sets for tasks in order of priority. We fill each node with tasks in a round-robin manner so
   * that tasks are balanced across the cluster.
   */
  def resourceOffers(offers: IndexedSeq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
    // Mark each slave as alive and remember its hostname
    // Also track if new executor is added
    var newExecAvail = false
    for (o <- offers) {
      if (!hostToExecutors.contains(o.host)) {
        hostToExecutors(o.host) = new HashSet[String]()
      }
      if (!executorIdToRunningTaskIds.contains(o.executorId)) {
        hostToExecutors(o.host) += o.executorId
        executorAdded(o.executorId, o.host)
        executorIdToHost(o.executorId) = o.host
        executorIdToRunningTaskIds(o.executorId) = HashSet[Long]()
        newExecAvail = true
      }
      for (rack <- getRackForHost(o.host)) {
        hostsByRack.getOrElseUpdate(rack, new HashSet[String]()) += o.host
      }
    }

    // Randomly shuffle offers to avoid always placing tasks on the same set of workers.
    val shuffledOffers = Random.shuffle(offers)
    // Build a list of tasks to assign to each worker.
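    // tasks(i) and availableCpus(i) are indexed in lockstep with shuffledOffers(i).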
    val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores))
    val availableCpus = shuffledOffers.map(o => o.cores).toArray
    val sortedTaskSets = rootPool.getSortedTaskSetQueue
    for (taskSet <- sortedTaskSets) {
      logDebug("parentName: %s, name: %s, runningTasks: %s".format(
        taskSet.parent.name, taskSet.name, taskSet.runningTasks))
      if (newExecAvail) {
        taskSet.executorAdded()
      }
    }

    // Take each TaskSet in our scheduling order, and then offer it to each node in increasing
    // order of locality levels so that it gets a chance to launch local tasks on all of them.
    // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY
    for (taskSet <- sortedTaskSets) {
      var launchedAnyTask = false
      var launchedTaskAtCurrentMaxLocality = false
      for (currentMaxLocality <- taskSet.myLocalityLevels) {
        do {
          launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet(
            taskSet, currentMaxLocality, shuffledOffers, availableCpus, tasks)
          launchedAnyTask |= launchedTaskAtCurrentMaxLocality
        } while (launchedTaskAtCurrentMaxLocality)
      }
      if (!launchedAnyTask) {
        taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
      }
    }

    if (tasks.size > 0) {
      hasLaunchedTask = true
    }
    return tasks
  }

  def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) {
    var failedExecutor: Option[String] = None
    var reason: Option[ExecutorLossReason] = None
    synchronized {
      try {
        taskIdToTaskSetManager.get(tid) match {
          case Some(taskSet) =>
            if (state == TaskState.LOST) {
              // TaskState.LOST is only used by the deprecated Mesos fine-grained scheduling mode,
              // where each executor corresponds to a single task, so mark the executor as failed.
              val execId = taskIdToExecutorId.getOrElse(tid, throw new IllegalStateException(
                "taskIdToTaskSetManager.contains(tid) <=> taskIdToExecutorId.contains(tid)"))
              if (executorIdToRunningTaskIds.contains(execId)) {
                reason = Some(
                  SlaveLost(s"Task $tid was lost, so marking the executor as lost as well."))
                removeExecutor(execId, reason.get)
                failedExecutor = Some(execId)
              }
            }
            if (TaskState.isFinished(state)) {
              cleanupTaskState(tid)
              taskSet.removeRunningTask(tid)
              if (state == TaskState.FINISHED) {
                taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData)
              } else if (Set(TaskState.FAILED, TaskState.KILLED, TaskState.LOST).contains(state)) {
                taskResultGetter.enqueueFailedTask(taskSet, tid, state, serializedData)
              }
            }
          case None =>
            logError(
              ("Ignoring update with state %s for TID %s because its task set is gone (this is " +
                "likely the result of receiving duplicate task finished status updates) or its " +
                "executor has been marked as failed.")
                .format(state, tid))
        }
      } catch {
        case e: Exception => logError("Exception in statusUpdate", e)
      }
    }
    // Update the DAGScheduler without holding a lock on this, since that can deadlock
    if (failedExecutor.isDefined) {
      assert(reason.isDefined)
      dagScheduler.executorLost(failedExecutor.get, reason.get)
      backend.reviveOffers()
    }
  }

  /**
   * Update metrics for in-progress tasks and let the master know that the BlockManager is still
   * alive. Return true if the driver knows about the given block manager. Otherwise, return false,
   * indicating that the block manager should re-register.
   */
  override def executorHeartbeatReceived(
      execId: String,
      accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])],
      blockManagerId: BlockManagerId): Boolean = {
    // (taskId, stageId, stageAttemptId, accumUpdates)
    val accumUpdatesWithTaskIds: Array[(Long, Int, Int, Seq[AccumulableInfo])] = synchronized {
      accumUpdates.flatMap { case (id, updates) =>
        val accInfos = updates.map(acc => acc.toInfo(Some(acc.value), None))
        taskIdToTaskSetManager.get(id).map { taskSetMgr =>
          (id, taskSetMgr.stageId, taskSetMgr.taskSet.stageAttemptId, accInfos)
        }
      }
    }
    dagScheduler.executorHeartbeatReceived(execId, accumUpdatesWithTaskIds, blockManagerId)
  }

  def handleTaskGettingResult(taskSetManager: TaskSetManager, tid: Long): Unit = synchronized {
    taskSetManager.handleTaskGettingResult(tid)
  }

  def handleSuccessfulTask(
      taskSetManager: TaskSetManager,
      tid: Long,
      taskResult: DirectTaskResult[_]): Unit = synchronized {
    taskSetManager.handleSuccessfulTask(tid, taskResult)
  }

  def handleFailedTask(
      taskSetManager: TaskSetManager,
      tid: Long,
      taskState: TaskState,
      reason: TaskFailedReason): Unit = synchronized {
    taskSetManager.handleFailedTask(tid, taskState, reason)
    if (!taskSetManager.isZombie && taskState != TaskState.KILLED) {
      // Need to revive offers again now that the task set manager state has been updated to
      // reflect failed tasks that need to be re-run.
      backend.reviveOffers()
    }
  }

  def error(message: String) {
    synchronized {
      if (taskSetsByStageIdAndAttempt.nonEmpty) {
        // Have each task set throw a SparkException with the error
        for {
          attempts <- taskSetsByStageIdAndAttempt.values
          manager <- attempts.values
        } {
          try {
            manager.abort(message)
          } catch {
            case e: Exception => logError("Exception in error callback", e)
          }
        }
      } else {
        // No task sets are active but we still got an error. Just exit since this
        // must mean the error is during registration.
        // It might be good to do something smarter here in the future.
        throw new SparkException(s"Exiting due to error from cluster scheduler: $message")
      }
    }
  }

  override def stop() {
    speculationScheduler.shutdown()
    if (backend != null) {
      backend.stop()
    }
    if (taskResultGetter != null) {
      taskResultGetter.stop()
    }
    starvationTimer.cancel()
  }

  override def defaultParallelism(): Int = backend.defaultParallelism()

  // Check for speculatable tasks in all our active jobs.
  def checkSpeculatableTasks() {
    var shouldRevive = false
    synchronized {
      shouldRevive = rootPool.checkSpeculatableTasks(MIN_TIME_TO_SPECULATION)
    }
    if (shouldRevive) {
      backend.reviveOffers()
    }
  }

  override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {
    var failedExecutor: Option[String] = None

    synchronized {
      if (executorIdToRunningTaskIds.contains(executorId)) {
        val hostPort = executorIdToHost(executorId)
        logExecutorLoss(executorId, hostPort, reason)
        removeExecutor(executorId, reason)
        failedExecutor = Some(executorId)
      } else {
        executorIdToHost.get(executorId) match {
          case Some(hostPort) =>
            // If the host mapping still exists, it means we don't know the loss reason for the
            // executor. So call removeExecutor() to update tasks running on that executor when
            // the real loss reason is finally known.
            logExecutorLoss(executorId, hostPort, reason)
            removeExecutor(executorId, reason)

          case None =>
            // We may get multiple executorLost() calls with different loss reasons. For example,
            // one may be triggered by a dropped connection from the slave while another may be a
            // report of executor termination from Mesos. We produce log messages for both so we
            // eventually report the termination reason.
            logError(s"Lost an executor $executorId (already removed): $reason")
        }
      }
    }
    // Call dagScheduler.executorLost without holding the lock on this to prevent deadlock
    if (failedExecutor.isDefined) {
      dagScheduler.executorLost(failedExecutor.get, reason)
      backend.reviveOffers()
    }
  }

  private def logExecutorLoss(
      executorId: String,
      hostPort: String,
      reason: ExecutorLossReason): Unit = reason match {
    case LossReasonPending =>
      logDebug(s"Executor $executorId on $hostPort lost, but reason not yet known.")
    case ExecutorKilled =>
      logInfo(s"Executor $executorId on $hostPort killed by driver.")
    case _ =>
      logError(s"Lost executor $executorId on $hostPort: $reason")
  }

  /**
   * Cleans up the TaskScheduler's state for tracking the given task.
   */
  private def cleanupTaskState(tid: Long): Unit = {
    taskIdToTaskSetManager.remove(tid)
    taskIdToExecutorId.remove(tid).foreach { executorId =>
      executorIdToRunningTaskIds.get(executorId).foreach { _.remove(tid) }
    }
  }

  /**
   * Remove an executor from all our data structures and mark it as lost. If the executor's loss
   * reason is not yet known, do not yet remove its association with its host nor update the status
   * of any running tasks, since the loss reason defines whether we'll fail those tasks.
   */
  private def removeExecutor(executorId: String, reason: ExecutorLossReason) {
    // The tasks on the lost executor may not send any more status updates (because the executor
    // has been lost), so they should be cleaned up here.
    executorIdToRunningTaskIds.remove(executorId).foreach { taskIds =>
      logDebug("Cleaning up TaskScheduler state for tasks " +
        s"${taskIds.mkString("[", ",", "]")} on failed executor $executorId")
      // We do not notify the TaskSetManager of the task failures because that will
      // happen below in the rootPool.executorLost() call.
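      // Drop each task from taskIdToTaskSetManager, taskIdToExecutorId, and the per-executor
      // running-task set.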
      taskIds.foreach(cleanupTaskState)
    }

    val host = executorIdToHost(executorId)
    val execs = hostToExecutors.getOrElse(host, new HashSet)
    execs -= executorId
    if (execs.isEmpty) {
      hostToExecutors -= host
      for (rack <- getRackForHost(host); hosts <- hostsByRack.get(rack)) {
        hosts -= host
        if (hosts.isEmpty) {
          hostsByRack -= rack
        }
      }
    }

    if (reason != LossReasonPending) {
      executorIdToHost -= executorId
      rootPool.executorLost(executorId, host, reason)
    }
  }

  def executorAdded(execId: String, host: String) {
    dagScheduler.executorAdded(execId, host)
  }

  def getExecutorsAliveOnHost(host: String): Option[Set[String]] = synchronized {
    hostToExecutors.get(host).map(_.toSet)
  }

  def hasExecutorsAliveOnHost(host: String): Boolean = synchronized {
    hostToExecutors.contains(host)
  }

  def hasHostAliveOnRack(rack: String): Boolean = synchronized {
    hostsByRack.contains(rack)
  }

  def isExecutorAlive(execId: String): Boolean = synchronized {
    executorIdToRunningTaskIds.contains(execId)
  }

  def isExecutorBusy(execId: String): Boolean = synchronized {
    executorIdToRunningTaskIds.get(execId).exists(_.nonEmpty)
  }

  // By default, rack is unknown
  def getRackForHost(value: String): Option[String] = None

  private def waitBackendReady(): Unit = {
    if (backend.isReady) {
      return
    }
    while (!backend.isReady) {
      // Might take a while for backend to be ready if it is waiting on resources.
      if (sc.stopped.get) {
        // For example: the master removes the application for some reason
        throw new IllegalStateException("Spark context stopped while waiting for backend")
      }
      synchronized {
        this.wait(100)
      }
    }
  }

  override def applicationId(): String = backend.applicationId()

  override def applicationAttemptId(): Option[String] = backend.applicationAttemptId()

  private[scheduler] def taskSetManagerForAttempt(
      stageId: Int,
      stageAttemptId: Int): Option[TaskSetManager] = {
    for {
      attempts <- taskSetsByStageIdAndAttempt.get(stageId)
      manager <- attempts.get(stageAttemptId)
    } yield {
      manager
    }
  }

}


private[spark] object TaskSchedulerImpl {
  /**
   * Used to balance containers across hosts.
   *
   * Accepts a map of hosts to resource offers for that host, and returns a prioritized list of
   * resource offers representing the order in which the offers should be used. The resource
   * offers are ordered such that we'll allocate one container on each host before allocating a
   * second container on any host, and so on, in order to reduce the damage if a host fails.
   *
   * For example, given <h1, [o1, o2, o3]>, <h2, [o4]>, <h3, [o5, o6]>, returns
   * [o1, o5, o4, o2, o6, o3]
   */
  def prioritizeContainers[K, T] (map: HashMap[K, ArrayBuffer[T]]): List[T] = {
    val _keyList = new ArrayBuffer[K](map.size)
    _keyList ++= map.keys

    // order keyList based on population of value in map
    val keyList = _keyList.sortWith(
      (left, right) => map(left).size > map(right).size
    )

    val retval = new ArrayBuffer[T](keyList.size * 2)
    var index = 0
    var found = true

    while (found) {
      found = false
      for (key <- keyList) {
        val containerList: ArrayBuffer[T] = map.getOrElse(key, null)
        assert(containerList != null)
        // Get the index'th entry for this host - if present
        if (index < containerList.size) {
          retval += containerList.apply(index)
          found = true
        }
      }
      index += 1
    }

    retval.toList
  }
}
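
// A minimal usage sketch of TaskSchedulerImpl.prioritizeContainers, using plain Strings in place
// of real offer objects purely for illustration:
//
//   import scala.collection.mutable.{ArrayBuffer, HashMap}
//   val byHost = HashMap(
//     "h1" -> ArrayBuffer("o1", "o2", "o3"),
//     "h2" -> ArrayBuffer("o4"),
//     "h3" -> ArrayBuffer("o5", "o6"))
//   TaskSchedulerImpl.prioritizeContainers(byHost)
//   // => List("o1", "o5", "o4", "o2", "o6", "o3"): one offer per host per round, starting with
//   //    the hosts that have the most offers.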