/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.scheduler

import java.nio.ByteBuffer
import java.util.{Timer, TimerTask}
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicLong

import scala.collection.Set
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
import scala.util.Random

import org.apache.spark._
import org.apache.spark.TaskState.TaskState
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config
import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
import org.apache.spark.scheduler.TaskLocality.TaskLocality
import org.apache.spark.scheduler.local.LocalSchedulerBackend
import org.apache.spark.storage.BlockManagerId
import org.apache.spark.util.{AccumulatorV2, ThreadUtils, Utils}

/**
 * Schedules tasks for multiple types of clusters by acting through a SchedulerBackend.
 * It can also work with a local setup by using a [[LocalSchedulerBackend]] and setting
 * isLocal to true. It handles common logic, like determining a scheduling order across jobs,
 * waking up to launch speculative tasks, etc.
 *
 * Clients should first call initialize() and start(), then submit task sets through the
 * submitTasks method.
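 *
 * A minimal usage sketch (illustrative only: `sc`, `myBackend`, and `taskSet` are assumed to be
 * an existing SparkContext, SchedulerBackend, and TaskSet, respectively):
 * {{{
 *   val scheduler = new TaskSchedulerImpl(sc)
 *   scheduler.initialize(myBackend)
 *   scheduler.start()
 *   scheduler.submitTasks(taskSet)
 * }}}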
 *
 * THREADING: [[SchedulerBackend]]s and task-submitting clients can call this class from multiple
 * threads, so it needs locks in public API methods to maintain its state. In addition, some
 * [[SchedulerBackend]]s synchronize on themselves when they want to send events here, and then
 * acquire a lock on us, so we need to make sure that we don't try to lock the backend while
 * we are holding a lock on ourselves.
 */
private[spark] class TaskSchedulerImpl(
    val sc: SparkContext,
    val maxTaskFailures: Int,
    isLocal: Boolean = false)
  extends TaskScheduler with Logging
{
  def this(sc: SparkContext) = this(sc, sc.conf.get(config.MAX_TASK_FAILURES))

  val conf = sc.conf

  // How often to check for speculative tasks
  val SPECULATION_INTERVAL_MS = conf.getTimeAsMs("spark.speculation.interval", "100ms")

  // Duplicate copies of a task will only be launched if the original copy has been running for
  // at least this amount of time. This is to avoid the overhead of launching speculative copies
  // of tasks that are very short.
  val MIN_TIME_TO_SPECULATION = 100

  private val speculationScheduler =
    ThreadUtils.newDaemonSingleThreadScheduledExecutor("task-scheduler-speculation")

  // Threshold above which we warn user initial TaskSet may be starved
  val STARVATION_TIMEOUT_MS = conf.getTimeAsMs("spark.starvation.timeout", "15s")

  // CPUs to request per task
  val CPUS_PER_TASK = conf.getInt("spark.task.cpus", 1)

  // TaskSetManagers are not thread safe, so any access to one should be synchronized
  // on this class.
  private val taskSetsByStageIdAndAttempt = new HashMap[Int, HashMap[Int, TaskSetManager]]

  // Protected by `this`
  private[scheduler] val taskIdToTaskSetManager = new HashMap[Long, TaskSetManager]
  val taskIdToExecutorId = new HashMap[Long, String]

  @volatile private var hasReceivedTask = false
  @volatile private var hasLaunchedTask = false
  private val starvationTimer = new Timer(true)

  // Incrementing task IDs
  val nextTaskId = new AtomicLong(0)

  // IDs of the tasks running on each executor
  private val executorIdToRunningTaskIds = new HashMap[String, HashSet[Long]]

  def runningTasksByExecutors: Map[String, Int] = synchronized {
    executorIdToRunningTaskIds.toMap.mapValues(_.size)
  }

  // The set of executors we have on each host; this is used to compute hostsAlive, which
  // in turn is used to decide when we can attain data locality on a given host
  protected val hostToExecutors = new HashMap[String, HashSet[String]]

  protected val hostsByRack = new HashMap[String, HashSet[String]]

  protected val executorIdToHost = new HashMap[String, String]

  // Listener object to pass upcalls into
  var dagScheduler: DAGScheduler = null

  var backend: SchedulerBackend = null

  val mapOutputTracker = SparkEnv.get.mapOutputTracker

  var schedulableBuilder: SchedulableBuilder = null
  var rootPool: Pool = null
  // default scheduler is FIFO
  private val schedulingModeConf = conf.get("spark.scheduler.mode", "FIFO")
  val schedulingMode: SchedulingMode = try {
    SchedulingMode.withName(schedulingModeConf.toUpperCase)
  } catch {
    case e: java.util.NoSuchElementException =>
      throw new SparkException(s"Unrecognized spark.scheduler.mode: $schedulingModeConf")
  }

  // This is a var so that we can reset it for testing purposes.
  private[spark] var taskResultGetter = new TaskResultGetter(sc.env, this)

  override def setDAGScheduler(dagScheduler: DAGScheduler) {
    this.dagScheduler = dagScheduler
  }

  def initialize(backend: SchedulerBackend) {
    this.backend = backend
    // temporarily set rootPool name to empty
    rootPool = new Pool("", schedulingMode, 0, 0)
    schedulableBuilder = {
      schedulingMode match {
        case SchedulingMode.FIFO =>
          new FIFOSchedulableBuilder(rootPool)
        case SchedulingMode.FAIR =>
          new FairSchedulableBuilder(rootPool, conf)
        case _ =>
          throw new IllegalArgumentException(s"Unsupported spark.scheduler.mode: $schedulingMode")
      }
    }
    schedulableBuilder.buildPools()
  }

  def newTaskId(): Long = nextTaskId.getAndIncrement()

  override def start() {
    backend.start()

    if (!isLocal && conf.getBoolean("spark.speculation", false)) {
      logInfo("Starting speculative execution thread")
      speculationScheduler.scheduleAtFixedRate(new Runnable {
        override def run(): Unit = Utils.tryOrStopSparkContext(sc) {
          checkSpeculatableTasks()
        }
      }, SPECULATION_INTERVAL_MS, SPECULATION_INTERVAL_MS, TimeUnit.MILLISECONDS)
    }
  }

  override def postStartHook() {
    waitBackendReady()
  }

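  /**
   * Submits a TaskSet of tasks to be scheduled. Wraps the TaskSet in a TaskSetManager, registers
   * it with the configured scheduling pool, arms (the first time tasks arrive on a non-local
   * cluster) a timer that periodically warns if no task has been launched yet, and finally asks
   * the backend to revive offers.
   */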
  override def submitTasks(taskSet: TaskSet) {
    val tasks = taskSet.tasks
    logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
    this.synchronized {
      val manager = createTaskSetManager(taskSet, maxTaskFailures)
      val stage = taskSet.stageId
      val stageTaskSets =
        taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
      stageTaskSets(taskSet.stageAttemptId) = manager
      val conflictingTaskSet = stageTaskSets.exists { case (_, ts) =>
        ts.taskSet != taskSet && !ts.isZombie
      }
      if (conflictingTaskSet) {
        throw new IllegalStateException(s"more than one active taskSet for stage $stage:" +
          s" ${stageTaskSets.toSeq.map{_._2.taskSet.id}.mkString(",")}")
      }
      schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)

      if (!isLocal && !hasReceivedTask) {
        starvationTimer.scheduleAtFixedRate(new TimerTask() {
          override def run() {
            if (!hasLaunchedTask) {
              logWarning("Initial job has not accepted any resources; " +
                "check your cluster UI to ensure that workers are registered " +
                "and have sufficient resources")
            } else {
              this.cancel()
            }
          }
        }, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS)
      }
      hasReceivedTask = true
    }
    backend.reviveOffers()
  }

  // Label as private[scheduler] to allow tests to swap in different task set managers if necessary
  private[scheduler] def createTaskSetManager(
      taskSet: TaskSet,
      maxTaskFailures: Int): TaskSetManager = {
    new TaskSetManager(this, taskSet, maxTaskFailures)
  }

  override def cancelTasks(stageId: Int, interruptThread: Boolean): Unit = synchronized {
    logInfo("Cancelling stage " + stageId)
    taskSetsByStageIdAndAttempt.get(stageId).foreach { attempts =>
      attempts.foreach { case (_, tsm) =>
        // There are two possible cases here:
        // 1. The task set manager has been created and some tasks have been scheduled.
        //    In this case, send a kill signal to the executors to kill the task and then abort
        //    the stage.
        // 2. The task set manager has been created but no tasks have been scheduled. In this
        //    case, simply abort the stage.
        tsm.runningTasksSet.foreach { tid =>
          val execId = taskIdToExecutorId(tid)
          backend.killTask(tid, execId, interruptThread)
        }
        tsm.abort("Stage %s cancelled".format(stageId))
        logInfo("Stage %d was cancelled".format(stageId))
      }
    }
  }

  /**
   * Called to indicate that all task attempts (including speculated tasks) associated with the
   * given TaskSetManager have completed, so state associated with the TaskSetManager should be
   * cleaned up.
   */
  def taskSetFinished(manager: TaskSetManager): Unit = synchronized {
    taskSetsByStageIdAndAttempt.get(manager.taskSet.stageId).foreach { taskSetsForStage =>
      taskSetsForStage -= manager.taskSet.stageAttemptId
      if (taskSetsForStage.isEmpty) {
        taskSetsByStageIdAndAttempt -= manager.taskSet.stageId
      }
    }
    manager.parent.removeSchedulable(manager)
    logInfo(s"Removed TaskSet ${manager.taskSet.id}, whose tasks have all completed, from pool" +
      s" ${manager.parent.name}")
  }

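  /**
   * Offers resources from the given (already shuffled) worker offers to a single TaskSetManager
   * at the given maximum locality level, recording for every launched task which executor it runs
   * on and how many CPUs remain on each offer. Returns true if at least one task was launched.
   */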
  private def resourceOfferSingleTaskSet(
      taskSet: TaskSetManager,
      maxLocality: TaskLocality,
      shuffledOffers: Seq[WorkerOffer],
      availableCpus: Array[Int],
      tasks: IndexedSeq[ArrayBuffer[TaskDescription]]) : Boolean = {
    var launchedTask = false
    for (i <- 0 until shuffledOffers.size) {
      val execId = shuffledOffers(i).executorId
      val host = shuffledOffers(i).host
      if (availableCpus(i) >= CPUS_PER_TASK) {
        try {
          for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
            tasks(i) += task
            val tid = task.taskId
            taskIdToTaskSetManager(tid) = taskSet
            taskIdToExecutorId(tid) = execId
            executorIdToRunningTaskIds(execId).add(tid)
            availableCpus(i) -= CPUS_PER_TASK
            assert(availableCpus(i) >= 0)
            launchedTask = true
          }
        } catch {
          case e: TaskNotSerializableException =>
            logError(s"Resource offer failed, task set ${taskSet.name} was not serializable")
            // Do not offer resources for this task, but don't throw an error to allow other
            // task sets to be submitted.
            return launchedTask
        }
      }
    }
    return launchedTask
  }

  /**
   * Called by cluster manager to offer resources on slaves. We respond by asking our active task
   * sets for tasks in order of priority. We fill each node with tasks in a round-robin manner so
   * that tasks are balanced across the cluster.
   */
  def resourceOffers(offers: IndexedSeq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
    // Mark each slave as alive and remember its hostname
    // Also track if new executor is added
    var newExecAvail = false
    for (o <- offers) {
      if (!hostToExecutors.contains(o.host)) {
        hostToExecutors(o.host) = new HashSet[String]()
      }
      if (!executorIdToRunningTaskIds.contains(o.executorId)) {
        hostToExecutors(o.host) += o.executorId
        executorAdded(o.executorId, o.host)
        executorIdToHost(o.executorId) = o.host
        executorIdToRunningTaskIds(o.executorId) = HashSet[Long]()
        newExecAvail = true
      }
      for (rack <- getRackForHost(o.host)) {
        hostsByRack.getOrElseUpdate(rack, new HashSet[String]()) += o.host
      }
    }

    // Randomly shuffle offers to avoid always placing tasks on the same set of workers.
    val shuffledOffers = Random.shuffle(offers)
    // Build a list of tasks to assign to each worker.
    val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores))
    val availableCpus = shuffledOffers.map(o => o.cores).toArray
    val sortedTaskSets = rootPool.getSortedTaskSetQueue
    for (taskSet <- sortedTaskSets) {
      logDebug("parentName: %s, name: %s, runningTasks: %s".format(
        taskSet.parent.name, taskSet.name, taskSet.runningTasks))
      if (newExecAvail) {
        taskSet.executorAdded()
      }
    }

    // Take each TaskSet in our scheduling order, and then offer it each node in increasing order
    // of locality levels so that it gets a chance to launch local tasks on all of them.
    // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY
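    // For example (illustrative), a TaskSet whose locality levels are
    // [PROCESS_LOCAL, NODE_LOCAL, ANY] is first drained of all PROCESS_LOCAL tasks it can place
    // on the current offers, then of NODE_LOCAL tasks, and only then of ANY tasks; each inner
    // do/while keeps re-offering until no task launches at the current level.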
    for (taskSet <- sortedTaskSets) {
      var launchedAnyTask = false
      var launchedTaskAtCurrentMaxLocality = false
      for (currentMaxLocality <- taskSet.myLocalityLevels) {
        do {
          launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet(
            taskSet, currentMaxLocality, shuffledOffers, availableCpus, tasks)
          launchedAnyTask |= launchedTaskAtCurrentMaxLocality
        } while (launchedTaskAtCurrentMaxLocality)
      }
      if (!launchedAnyTask) {
        taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
      }
    }

    if (tasks.size > 0) {
      hasLaunchedTask = true
    }
    return tasks
  }

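  /**
   * Called (for example by a SchedulerBackend) with a state update for a single task. Hands
   * successful and failed results to the TaskResultGetter, and for the deprecated Mesos
   * fine-grained mode treats a LOST task as the loss of its executor.
   */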
  def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) {
    var failedExecutor: Option[String] = None
    var reason: Option[ExecutorLossReason] = None
    synchronized {
      try {
        taskIdToTaskSetManager.get(tid) match {
          case Some(taskSet) =>
            if (state == TaskState.LOST) {
              // TaskState.LOST is only used by the deprecated Mesos fine-grained scheduling mode,
              // where each executor corresponds to a single task, so mark the executor as failed.
              val execId = taskIdToExecutorId.getOrElse(tid, throw new IllegalStateException(
                "taskIdToTaskSetManager.contains(tid) <=> taskIdToExecutorId.contains(tid)"))
              if (executorIdToRunningTaskIds.contains(execId)) {
                reason = Some(
                  SlaveLost(s"Task $tid was lost, so marking the executor as lost as well."))
                removeExecutor(execId, reason.get)
                failedExecutor = Some(execId)
              }
            }
            if (TaskState.isFinished(state)) {
              cleanupTaskState(tid)
              taskSet.removeRunningTask(tid)
              if (state == TaskState.FINISHED) {
                taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData)
              } else if (Set(TaskState.FAILED, TaskState.KILLED, TaskState.LOST).contains(state)) {
                taskResultGetter.enqueueFailedTask(taskSet, tid, state, serializedData)
              }
            }
          case None =>
            logError(
              ("Ignoring update with state %s for TID %s because its task set is gone (this is " +
                "likely the result of receiving duplicate task finished status updates) or its " +
                "executor has been marked as failed.")
                .format(state, tid))
        }
      } catch {
        case e: Exception => logError("Exception in statusUpdate", e)
      }
    }
    // Update the DAGScheduler without holding a lock on this, since that can deadlock
    if (failedExecutor.isDefined) {
      assert(reason.isDefined)
      dagScheduler.executorLost(failedExecutor.get, reason.get)
      backend.reviveOffers()
    }
  }

  /**
   * Update metrics for in-progress tasks and let the master know that the BlockManager is still
   * alive. Return true if the driver knows about the given block manager. Otherwise, return false,
   * indicating that the block manager should re-register.
   */
  override def executorHeartbeatReceived(
      execId: String,
      accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])],
      blockManagerId: BlockManagerId): Boolean = {
    // (taskId, stageId, stageAttemptId, accumUpdates)
    val accumUpdatesWithTaskIds: Array[(Long, Int, Int, Seq[AccumulableInfo])] = synchronized {
      accumUpdates.flatMap { case (id, updates) =>
        val accInfos = updates.map(acc => acc.toInfo(Some(acc.value), None))
        taskIdToTaskSetManager.get(id).map { taskSetMgr =>
          (id, taskSetMgr.stageId, taskSetMgr.taskSet.stageAttemptId, accInfos)
        }
      }
    }
    dagScheduler.executorHeartbeatReceived(execId, accumUpdatesWithTaskIds, blockManagerId)
  }

  def handleTaskGettingResult(taskSetManager: TaskSetManager, tid: Long): Unit = synchronized {
    taskSetManager.handleTaskGettingResult(tid)
  }

  def handleSuccessfulTask(
      taskSetManager: TaskSetManager,
      tid: Long,
      taskResult: DirectTaskResult[_]): Unit = synchronized {
    taskSetManager.handleSuccessfulTask(tid, taskResult)
  }

  def handleFailedTask(
      taskSetManager: TaskSetManager,
      tid: Long,
      taskState: TaskState,
      reason: TaskFailedReason): Unit = synchronized {
    taskSetManager.handleFailedTask(tid, taskState, reason)
    if (!taskSetManager.isZombie && taskState != TaskState.KILLED) {
      // Need to revive offers again now that the task set manager state has been updated to
      // reflect failed tasks that need to be re-run.
      backend.reviveOffers()
    }
  }

  def error(message: String) {
    synchronized {
      if (taskSetsByStageIdAndAttempt.nonEmpty) {
        // Have each task set throw a SparkException with the error
        for {
          attempts <- taskSetsByStageIdAndAttempt.values
          manager <- attempts.values
        } {
          try {
            manager.abort(message)
          } catch {
            case e: Exception => logError("Exception in error callback", e)
          }
        }
      } else {
        // No task sets are active but we still got an error. Just exit since this
        // must mean the error is during registration.
        // It might be good to do something smarter here in the future.
        throw new SparkException(s"Exiting due to error from cluster scheduler: $message")
      }
    }
  }

  override def stop() {
    speculationScheduler.shutdown()
    if (backend != null) {
      backend.stop()
    }
    if (taskResultGetter != null) {
      taskResultGetter.stop()
    }
    starvationTimer.cancel()
  }

  override def defaultParallelism(): Int = backend.defaultParallelism()

  // Check for speculatable tasks in all our active jobs.
  def checkSpeculatableTasks() {
    var shouldRevive = false
    synchronized {
      shouldRevive = rootPool.checkSpeculatableTasks(MIN_TIME_TO_SPECULATION)
    }
    if (shouldRevive) {
      backend.reviveOffers()
    }
  }

  override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {
    var failedExecutor: Option[String] = None

    synchronized {
      if (executorIdToRunningTaskIds.contains(executorId)) {
        val hostPort = executorIdToHost(executorId)
        logExecutorLoss(executorId, hostPort, reason)
        removeExecutor(executorId, reason)
        failedExecutor = Some(executorId)
      } else {
        executorIdToHost.get(executorId) match {
          case Some(hostPort) =>
            // If the host mapping still exists, it means we don't know the loss reason for the
            // executor. So call removeExecutor() to update tasks running on that executor when
            // the real loss reason is finally known.
            logExecutorLoss(executorId, hostPort, reason)
            removeExecutor(executorId, reason)

          case None =>
            // We may get multiple executorLost() calls with different loss reasons. For example,
            // one may be triggered by a dropped connection from the slave while another may be a
            // report of executor termination from Mesos. We produce log messages for both so we
            // eventually report the termination reason.
            logError(s"Lost an executor $executorId (already removed): $reason")
        }
      }
    }
    // Call dagScheduler.executorLost without holding the lock on this to prevent deadlock
    if (failedExecutor.isDefined) {
      dagScheduler.executorLost(failedExecutor.get, reason)
      backend.reviveOffers()
    }
  }

  private def logExecutorLoss(
      executorId: String,
      hostPort: String,
      reason: ExecutorLossReason): Unit = reason match {
    case LossReasonPending =>
      logDebug(s"Executor $executorId on $hostPort lost, but reason not yet known.")
    case ExecutorKilled =>
      logInfo(s"Executor $executorId on $hostPort killed by driver.")
    case _ =>
      logError(s"Lost executor $executorId on $hostPort: $reason")
  }

  /**
   * Cleans up the TaskScheduler's state for tracking the given task.
   */
  private def cleanupTaskState(tid: Long): Unit = {
    taskIdToTaskSetManager.remove(tid)
    taskIdToExecutorId.remove(tid).foreach { executorId =>
      executorIdToRunningTaskIds.get(executorId).foreach { _.remove(tid) }
    }
  }

  /**
   * Remove an executor from all our data structures and mark it as lost. If the executor's loss
   * reason is not yet known, do not yet remove its association with its host nor update the status
   * of any running tasks, since the loss reason defines whether we'll fail those tasks.
   */
  private def removeExecutor(executorId: String, reason: ExecutorLossReason) {
    // The tasks on the lost executor may not send any more status updates (because the executor
    // has been lost), so they should be cleaned up here.
    executorIdToRunningTaskIds.remove(executorId).foreach { taskIds =>
      logDebug("Cleaning up TaskScheduler state for tasks " +
        s"${taskIds.mkString("[", ",", "]")} on failed executor $executorId")
      // We do not notify the TaskSetManager of the task failures because that will
      // happen below in the rootPool.executorLost() call.
      taskIds.foreach(cleanupTaskState)
    }

    val host = executorIdToHost(executorId)
    val execs = hostToExecutors.getOrElse(host, new HashSet)
    execs -= executorId
    if (execs.isEmpty) {
      hostToExecutors -= host
      for (rack <- getRackForHost(host); hosts <- hostsByRack.get(rack)) {
        hosts -= host
        if (hosts.isEmpty) {
          hostsByRack -= rack
        }
      }
    }

    if (reason != LossReasonPending) {
      executorIdToHost -= executorId
      rootPool.executorLost(executorId, host, reason)
    }
  }

  def executorAdded(execId: String, host: String) {
    dagScheduler.executorAdded(execId, host)
  }

  def getExecutorsAliveOnHost(host: String): Option[Set[String]] = synchronized {
    hostToExecutors.get(host).map(_.toSet)
  }

  def hasExecutorsAliveOnHost(host: String): Boolean = synchronized {
    hostToExecutors.contains(host)
  }

  def hasHostAliveOnRack(rack: String): Boolean = synchronized {
    hostsByRack.contains(rack)
  }

  def isExecutorAlive(execId: String): Boolean = synchronized {
    executorIdToRunningTaskIds.contains(execId)
  }

  def isExecutorBusy(execId: String): Boolean = synchronized {
    executorIdToRunningTaskIds.get(execId).exists(_.nonEmpty)
  }

  // By default, rack is unknown
  def getRackForHost(value: String): Option[String] = None

  private def waitBackendReady(): Unit = {
    if (backend.isReady) {
      return
    }
    while (!backend.isReady) {
      // Might take a while for backend to be ready if it is waiting on resources.
      if (sc.stopped.get) {
        // For example: the master removes the application for some reason
        throw new IllegalStateException("Spark context stopped while waiting for backend")
      }
      synchronized {
        this.wait(100)
      }
    }
  }

  override def applicationId(): String = backend.applicationId()

  override def applicationAttemptId(): Option[String] = backend.applicationAttemptId()

  private[scheduler] def taskSetManagerForAttempt(
      stageId: Int,
      stageAttemptId: Int): Option[TaskSetManager] = {
    for {
      attempts <- taskSetsByStageIdAndAttempt.get(stageId)
      manager <- attempts.get(stageAttemptId)
    } yield {
      manager
    }
  }

}


private[spark] object TaskSchedulerImpl {
  /**
   * Used to balance containers across hosts.
   *
   * Accepts a map of hosts to resource offers for that host, and returns a prioritized list of
   * resource offers representing the order in which the offers should be used.  The resource
   * offers are ordered such that we'll allocate one container on each host before allocating a
   * second container on any host, and so on, in order to reduce the damage if a host fails.
   *
   * For example, given <h1, [o1, o2, o3]>, <h2, [o4]>, <h3, [o5, o6]>, returns
   * [o1, o5, o4, o2, o6, o3]
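   *
   * Illustratively (o1 ... o6 stand for arbitrary offer values supplied by the caller):
   * {{{
   *   val offers = HashMap(
   *     "h1" -> ArrayBuffer(o1, o2, o3),
   *     "h2" -> ArrayBuffer(o4),
   *     "h3" -> ArrayBuffer(o5, o6))
   *   TaskSchedulerImpl.prioritizeContainers(offers)  // List(o1, o5, o4, o2, o6, o3)
   * }}}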
   */
  def prioritizeContainers[K, T] (map: HashMap[K, ArrayBuffer[T]]): List[T] = {
    val _keyList = new ArrayBuffer[K](map.size)
    _keyList ++= map.keys

    // order keyList based on population of value in map
    val keyList = _keyList.sortWith(
      (left, right) => map(left).size > map(right).size
    )

    val retval = new ArrayBuffer[T](keyList.size * 2)
    var index = 0
    var found = true

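    // Round-robin over the hosts (hosts with the most offers first): on each pass, take the
    // index'th container from every host that still has one, until no host yields a container.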
    while (found) {
      found = false
      for (key <- keyList) {
        val containerList: ArrayBuffer[T] = map.getOrElse(key, null)
        assert(containerList != null)
        // Get the index'th entry for this host - if present
        if (index < containerList.size) {
          retval += containerList.apply(index)
          found = true
        }
      }
      index += 1
    }

    retval.toList
  }
}