/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.spark.scheduler import java.nio.ByteBuffer import java.util.{Timer, TimerTask} import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicLong import scala.collection.Set import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} import scala.util.Random import org.apache.spark._ import org.apache.spark.TaskState.TaskState import org.apache.spark.internal.Logging import org.apache.spark.internal.config import org.apache.spark.scheduler.SchedulingMode.SchedulingMode import org.apache.spark.scheduler.TaskLocality.TaskLocality import org.apache.spark.scheduler.local.LocalSchedulerBackend import org.apache.spark.storage.BlockManagerId import org.apache.spark.util.{AccumulatorV2, ThreadUtils, Utils} /** * Schedules tasks for multiple types of clusters by acting through a SchedulerBackend. * It can also work with a local setup by using a [[LocalSchedulerBackend]] and setting * isLocal to true. It handles common logic, like determining a scheduling order across jobs, waking * up to launch speculative tasks, etc. * * Clients should first call initialize() and start(), then submit task sets through the * runTasks method. * * THREADING: [[SchedulerBackend]]s and task-submitting clients can call this class from multiple * threads, so it needs locks in public API methods to maintain its state. In addition, some * [[SchedulerBackend]]s synchronize on themselves when they want to send events here, and then * acquire a lock on us, so we need to make sure that we don't try to lock the backend while * we are holding a lock on ourselves. */ private[spark] class TaskSchedulerImpl( val sc: SparkContext, val maxTaskFailures: Int, isLocal: Boolean = false) extends TaskScheduler with Logging { def this(sc: SparkContext) = this(sc, sc.conf.get(config.MAX_TASK_FAILURES)) val conf = sc.conf // How often to check for speculative tasks val SPECULATION_INTERVAL_MS = conf.getTimeAsMs("spark.speculation.interval", "100ms") // Duplicate copies of a task will only be launched if the original copy has been running for // at least this amount of time. This is to avoid the overhead of launching speculative copies // of tasks that are very short. val MIN_TIME_TO_SPECULATION = 100 private val speculationScheduler = ThreadUtils.newDaemonSingleThreadScheduledExecutor("task-scheduler-speculation") // Threshold above which we warn user initial TaskSet may be starved val STARVATION_TIMEOUT_MS = conf.getTimeAsMs("spark.starvation.timeout", "15s") // CPUs to request per task val CPUS_PER_TASK = conf.getInt("spark.task.cpus", 1) // TaskSetManagers are not thread safe, so any access to one should be synchronized // on this class. private val taskSetsByStageIdAndAttempt = new HashMap[Int, HashMap[Int, TaskSetManager]] // Protected by `this` private[scheduler] val taskIdToTaskSetManager = new HashMap[Long, TaskSetManager] val taskIdToExecutorId = new HashMap[Long, String] @volatile private var hasReceivedTask = false @volatile private var hasLaunchedTask = false private val starvationTimer = new Timer(true) // Incrementing task IDs val nextTaskId = new AtomicLong(0) // IDs of the tasks running on each executor private val executorIdToRunningTaskIds = new HashMap[String, HashSet[Long]] def runningTasksByExecutors: Map[String, Int] = synchronized { executorIdToRunningTaskIds.toMap.mapValues(_.size) } // The set of executors we have on each host; this is used to compute hostsAlive, which // in turn is used to decide when we can attain data locality on a given host protected val hostToExecutors = new HashMap[String, HashSet[String]] protected val hostsByRack = new HashMap[String, HashSet[String]] protected val executorIdToHost = new HashMap[String, String] // Listener object to pass upcalls into var dagScheduler: DAGScheduler = null var backend: SchedulerBackend = null val mapOutputTracker = SparkEnv.get.mapOutputTracker var schedulableBuilder: SchedulableBuilder = null var rootPool: Pool = null // default scheduler is FIFO private val schedulingModeConf = conf.get("spark.scheduler.mode", "FIFO") val schedulingMode: SchedulingMode = try { SchedulingMode.withName(schedulingModeConf.toUpperCase) } catch { case e: java.util.NoSuchElementException => throw new SparkException(s"Unrecognized spark.scheduler.mode: $schedulingModeConf") } // This is a var so that we can reset it for testing purposes. private[spark] var taskResultGetter = new TaskResultGetter(sc.env, this) override def setDAGScheduler(dagScheduler: DAGScheduler) { this.dagScheduler = dagScheduler } def initialize(backend: SchedulerBackend) { this.backend = backend // temporarily set rootPool name to empty rootPool = new Pool("", schedulingMode, 0, 0) schedulableBuilder = { schedulingMode match { case SchedulingMode.FIFO => new FIFOSchedulableBuilder(rootPool) case SchedulingMode.FAIR => new FairSchedulableBuilder(rootPool, conf) case _ => throw new IllegalArgumentException(s"Unsupported spark.scheduler.mode: $schedulingMode") } } schedulableBuilder.buildPools() } def newTaskId(): Long = nextTaskId.getAndIncrement() override def start() { backend.start() if (!isLocal && conf.getBoolean("spark.speculation", false)) { logInfo("Starting speculative execution thread") speculationScheduler.scheduleAtFixedRate(new Runnable { override def run(): Unit = Utils.tryOrStopSparkContext(sc) { checkSpeculatableTasks() } }, SPECULATION_INTERVAL_MS, SPECULATION_INTERVAL_MS, TimeUnit.MILLISECONDS) } } override def postStartHook() { waitBackendReady() } override def submitTasks(taskSet: TaskSet) { val tasks = taskSet.tasks logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks") this.synchronized { val manager = createTaskSetManager(taskSet, maxTaskFailures) val stage = taskSet.stageId val stageTaskSets = taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager]) stageTaskSets(taskSet.stageAttemptId) = manager val conflictingTaskSet = stageTaskSets.exists { case (_, ts) => ts.taskSet != taskSet && !ts.isZombie } if (conflictingTaskSet) { throw new IllegalStateException(s"more than one active taskSet for stage $stage:" + s" ${stageTaskSets.toSeq.map{_._2.taskSet.id}.mkString(",")}") } schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties) if (!isLocal && !hasReceivedTask) { starvationTimer.scheduleAtFixedRate(new TimerTask() { override def run() { if (!hasLaunchedTask) { logWarning("Initial job has not accepted any resources; " + "check your cluster UI to ensure that workers are registered " + "and have sufficient resources") } else { this.cancel() } } }, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS) } hasReceivedTask = true } backend.reviveOffers() } // Label as private[scheduler] to allow tests to swap in different task set managers if necessary private[scheduler] def createTaskSetManager( taskSet: TaskSet, maxTaskFailures: Int): TaskSetManager = { new TaskSetManager(this, taskSet, maxTaskFailures) } override def cancelTasks(stageId: Int, interruptThread: Boolean): Unit = synchronized { logInfo("Cancelling stage " + stageId) taskSetsByStageIdAndAttempt.get(stageId).foreach { attempts => attempts.foreach { case (_, tsm) => // There are two possible cases here: // 1. The task set manager has been created and some tasks have been scheduled. // In this case, send a kill signal to the executors to kill the task and then abort // the stage. // 2. The task set manager has been created but no tasks has been scheduled. In this case, // simply abort the stage. tsm.runningTasksSet.foreach { tid => val execId = taskIdToExecutorId(tid) backend.killTask(tid, execId, interruptThread) } tsm.abort("Stage %s cancelled".format(stageId)) logInfo("Stage %d was cancelled".format(stageId)) } } } /** * Called to indicate that all task attempts (including speculated tasks) associated with the * given TaskSetManager have completed, so state associated with the TaskSetManager should be * cleaned up. */ def taskSetFinished(manager: TaskSetManager): Unit = synchronized { taskSetsByStageIdAndAttempt.get(manager.taskSet.stageId).foreach { taskSetsForStage => taskSetsForStage -= manager.taskSet.stageAttemptId if (taskSetsForStage.isEmpty) { taskSetsByStageIdAndAttempt -= manager.taskSet.stageId } } manager.parent.removeSchedulable(manager) logInfo(s"Removed TaskSet ${manager.taskSet.id}, whose tasks have all completed, from pool" + s" ${manager.parent.name}") } private def resourceOfferSingleTaskSet( taskSet: TaskSetManager, maxLocality: TaskLocality, shuffledOffers: Seq[WorkerOffer], availableCpus: Array[Int], tasks: IndexedSeq[ArrayBuffer[TaskDescription]]) : Boolean = { var launchedTask = false for (i <- 0 until shuffledOffers.size) { val execId = shuffledOffers(i).executorId val host = shuffledOffers(i).host if (availableCpus(i) >= CPUS_PER_TASK) { try { for (task <- taskSet.resourceOffer(execId, host, maxLocality)) { tasks(i) += task val tid = task.taskId taskIdToTaskSetManager(tid) = taskSet taskIdToExecutorId(tid) = execId executorIdToRunningTaskIds(execId).add(tid) availableCpus(i) -= CPUS_PER_TASK assert(availableCpus(i) >= 0) launchedTask = true } } catch { case e: TaskNotSerializableException => logError(s"Resource offer failed, task set ${taskSet.name} was not serializable") // Do not offer resources for this task, but don't throw an error to allow other // task sets to be submitted. return launchedTask } } } return launchedTask } /** * Called by cluster manager to offer resources on slaves. We respond by asking our active task * sets for tasks in order of priority. We fill each node with tasks in a round-robin manner so * that tasks are balanced across the cluster. */ def resourceOffers(offers: IndexedSeq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized { // Mark each slave as alive and remember its hostname // Also track if new executor is added var newExecAvail = false for (o <- offers) { if (!hostToExecutors.contains(o.host)) { hostToExecutors(o.host) = new HashSet[String]() } if (!executorIdToRunningTaskIds.contains(o.executorId)) { hostToExecutors(o.host) += o.executorId executorAdded(o.executorId, o.host) executorIdToHost(o.executorId) = o.host executorIdToRunningTaskIds(o.executorId) = HashSet[Long]() newExecAvail = true } for (rack <- getRackForHost(o.host)) { hostsByRack.getOrElseUpdate(rack, new HashSet[String]()) += o.host } } // Randomly shuffle offers to avoid always placing tasks on the same set of workers. val shuffledOffers = Random.shuffle(offers) // Build a list of tasks to assign to each worker. val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores)) val availableCpus = shuffledOffers.map(o => o.cores).toArray val sortedTaskSets = rootPool.getSortedTaskSetQueue for (taskSet <- sortedTaskSets) { logDebug("parentName: %s, name: %s, runningTasks: %s".format( taskSet.parent.name, taskSet.name, taskSet.runningTasks)) if (newExecAvail) { taskSet.executorAdded() } } // Take each TaskSet in our scheduling order, and then offer it each node in increasing order // of locality levels so that it gets a chance to launch local tasks on all of them. // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY for (taskSet <- sortedTaskSets) { var launchedAnyTask = false var launchedTaskAtCurrentMaxLocality = false for (currentMaxLocality <- taskSet.myLocalityLevels) { do { launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet( taskSet, currentMaxLocality, shuffledOffers, availableCpus, tasks) launchedAnyTask |= launchedTaskAtCurrentMaxLocality } while (launchedTaskAtCurrentMaxLocality) } if (!launchedAnyTask) { taskSet.abortIfCompletelyBlacklisted(hostToExecutors) } } if (tasks.size > 0) { hasLaunchedTask = true } return tasks } def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) { var failedExecutor: Option[String] = None var reason: Option[ExecutorLossReason] = None synchronized { try { taskIdToTaskSetManager.get(tid) match { case Some(taskSet) => if (state == TaskState.LOST) { // TaskState.LOST is only used by the deprecated Mesos fine-grained scheduling mode, // where each executor corresponds to a single task, so mark the executor as failed. val execId = taskIdToExecutorId.getOrElse(tid, throw new IllegalStateException( "taskIdToTaskSetManager.contains(tid) <=> taskIdToExecutorId.contains(tid)")) if (executorIdToRunningTaskIds.contains(execId)) { reason = Some( SlaveLost(s"Task $tid was lost, so marking the executor as lost as well.")) removeExecutor(execId, reason.get) failedExecutor = Some(execId) } } if (TaskState.isFinished(state)) { cleanupTaskState(tid) taskSet.removeRunningTask(tid) if (state == TaskState.FINISHED) { taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData) } else if (Set(TaskState.FAILED, TaskState.KILLED, TaskState.LOST).contains(state)) { taskResultGetter.enqueueFailedTask(taskSet, tid, state, serializedData) } } case None => logError( ("Ignoring update with state %s for TID %s because its task set is gone (this is " + "likely the result of receiving duplicate task finished status updates) or its " + "executor has been marked as failed.") .format(state, tid)) } } catch { case e: Exception => logError("Exception in statusUpdate", e) } } // Update the DAGScheduler without holding a lock on this, since that can deadlock if (failedExecutor.isDefined) { assert(reason.isDefined) dagScheduler.executorLost(failedExecutor.get, reason.get) backend.reviveOffers() } } /** * Update metrics for in-progress tasks and let the master know that the BlockManager is still * alive. Return true if the driver knows about the given block manager. Otherwise, return false, * indicating that the block manager should re-register. */ override def executorHeartbeatReceived( execId: String, accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])], blockManagerId: BlockManagerId): Boolean = { // (taskId, stageId, stageAttemptId, accumUpdates) val accumUpdatesWithTaskIds: Array[(Long, Int, Int, Seq[AccumulableInfo])] = synchronized { accumUpdates.flatMap { case (id, updates) => val accInfos = updates.map(acc => acc.toInfo(Some(acc.value), None)) taskIdToTaskSetManager.get(id).map { taskSetMgr => (id, taskSetMgr.stageId, taskSetMgr.taskSet.stageAttemptId, accInfos) } } } dagScheduler.executorHeartbeatReceived(execId, accumUpdatesWithTaskIds, blockManagerId) } def handleTaskGettingResult(taskSetManager: TaskSetManager, tid: Long): Unit = synchronized { taskSetManager.handleTaskGettingResult(tid) } def handleSuccessfulTask( taskSetManager: TaskSetManager, tid: Long, taskResult: DirectTaskResult[_]): Unit = synchronized { taskSetManager.handleSuccessfulTask(tid, taskResult) } def handleFailedTask( taskSetManager: TaskSetManager, tid: Long, taskState: TaskState, reason: TaskFailedReason): Unit = synchronized { taskSetManager.handleFailedTask(tid, taskState, reason) if (!taskSetManager.isZombie && taskState != TaskState.KILLED) { // Need to revive offers again now that the task set manager state has been updated to // reflect failed tasks that need to be re-run. backend.reviveOffers() } } def error(message: String) { synchronized { if (taskSetsByStageIdAndAttempt.nonEmpty) { // Have each task set throw a SparkException with the error for { attempts <- taskSetsByStageIdAndAttempt.values manager <- attempts.values } { try { manager.abort(message) } catch { case e: Exception => logError("Exception in error callback", e) } } } else { // No task sets are active but we still got an error. Just exit since this // must mean the error is during registration. // It might be good to do something smarter here in the future. throw new SparkException(s"Exiting due to error from cluster scheduler: $message") } } } override def stop() { speculationScheduler.shutdown() if (backend != null) { backend.stop() } if (taskResultGetter != null) { taskResultGetter.stop() } starvationTimer.cancel() } override def defaultParallelism(): Int = backend.defaultParallelism() // Check for speculatable tasks in all our active jobs. def checkSpeculatableTasks() { var shouldRevive = false synchronized { shouldRevive = rootPool.checkSpeculatableTasks(MIN_TIME_TO_SPECULATION) } if (shouldRevive) { backend.reviveOffers() } } override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = { var failedExecutor: Option[String] = None synchronized { if (executorIdToRunningTaskIds.contains(executorId)) { val hostPort = executorIdToHost(executorId) logExecutorLoss(executorId, hostPort, reason) removeExecutor(executorId, reason) failedExecutor = Some(executorId) } else { executorIdToHost.get(executorId) match { case Some(hostPort) => // If the host mapping still exists, it means we don't know the loss reason for the // executor. So call removeExecutor() to update tasks running on that executor when // the real loss reason is finally known. logExecutorLoss(executorId, hostPort, reason) removeExecutor(executorId, reason) case None => // We may get multiple executorLost() calls with different loss reasons. For example, // one may be triggered by a dropped connection from the slave while another may be a // report of executor termination from Mesos. We produce log messages for both so we // eventually report the termination reason. logError(s"Lost an executor $executorId (already removed): $reason") } } } // Call dagScheduler.executorLost without holding the lock on this to prevent deadlock if (failedExecutor.isDefined) { dagScheduler.executorLost(failedExecutor.get, reason) backend.reviveOffers() } } private def logExecutorLoss( executorId: String, hostPort: String, reason: ExecutorLossReason): Unit = reason match { case LossReasonPending => logDebug(s"Executor $executorId on $hostPort lost, but reason not yet known.") case ExecutorKilled => logInfo(s"Executor $executorId on $hostPort killed by driver.") case _ => logError(s"Lost executor $executorId on $hostPort: $reason") } /** * Cleans up the TaskScheduler's state for tracking the given task. */ private def cleanupTaskState(tid: Long): Unit = { taskIdToTaskSetManager.remove(tid) taskIdToExecutorId.remove(tid).foreach { executorId => executorIdToRunningTaskIds.get(executorId).foreach { _.remove(tid) } } } /** * Remove an executor from all our data structures and mark it as lost. If the executor's loss * reason is not yet known, do not yet remove its association with its host nor update the status * of any running tasks, since the loss reason defines whether we'll fail those tasks. */ private def removeExecutor(executorId: String, reason: ExecutorLossReason) { // The tasks on the lost executor may not send any more status updates (because the executor // has been lost), so they should be cleaned up here. executorIdToRunningTaskIds.remove(executorId).foreach { taskIds => logDebug("Cleaning up TaskScheduler state for tasks " + s"${taskIds.mkString("[", ",", "]")} on failed executor $executorId") // We do not notify the TaskSetManager of the task failures because that will // happen below in the rootPool.executorLost() call. taskIds.foreach(cleanupTaskState) } val host = executorIdToHost(executorId) val execs = hostToExecutors.getOrElse(host, new HashSet) execs -= executorId if (execs.isEmpty) { hostToExecutors -= host for (rack <- getRackForHost(host); hosts <- hostsByRack.get(rack)) { hosts -= host if (hosts.isEmpty) { hostsByRack -= rack } } } if (reason != LossReasonPending) { executorIdToHost -= executorId rootPool.executorLost(executorId, host, reason) } } def executorAdded(execId: String, host: String) { dagScheduler.executorAdded(execId, host) } def getExecutorsAliveOnHost(host: String): Option[Set[String]] = synchronized { hostToExecutors.get(host).map(_.toSet) } def hasExecutorsAliveOnHost(host: String): Boolean = synchronized { hostToExecutors.contains(host) } def hasHostAliveOnRack(rack: String): Boolean = synchronized { hostsByRack.contains(rack) } def isExecutorAlive(execId: String): Boolean = synchronized { executorIdToRunningTaskIds.contains(execId) } def isExecutorBusy(execId: String): Boolean = synchronized { executorIdToRunningTaskIds.get(execId).exists(_.nonEmpty) } // By default, rack is unknown def getRackForHost(value: String): Option[String] = None private def waitBackendReady(): Unit = { if (backend.isReady) { return } while (!backend.isReady) { // Might take a while for backend to be ready if it is waiting on resources. if (sc.stopped.get) { // For example: the master removes the application for some reason throw new IllegalStateException("Spark context stopped while waiting for backend") } synchronized { this.wait(100) } } } override def applicationId(): String = backend.applicationId() override def applicationAttemptId(): Option[String] = backend.applicationAttemptId() private[scheduler] def taskSetManagerForAttempt( stageId: Int, stageAttemptId: Int): Option[TaskSetManager] = { for { attempts <- taskSetsByStageIdAndAttempt.get(stageId) manager <- attempts.get(stageAttemptId) } yield { manager } } } private[spark] object TaskSchedulerImpl { /** * Used to balance containers across hosts. * * Accepts a map of hosts to resource offers for that host, and returns a prioritized list of * resource offers representing the order in which the offers should be used. The resource * offers are ordered such that we'll allocate one container on each host before allocating a * second container on any host, and so on, in order to reduce the damage if a host fails. * * For example, given , , , returns * [o1, o5, o4, 02, o6, o3] */ def prioritizeContainers[K, T] (map: HashMap[K, ArrayBuffer[T]]): List[T] = { val _keyList = new ArrayBuffer[K](map.size) _keyList ++= map.keys // order keyList based on population of value in map val keyList = _keyList.sortWith( (left, right) => map(left).size > map(right).size ) val retval = new ArrayBuffer[T](keyList.size * 2) var index = 0 var found = true while (found) { found = false for (key <- keyList) { val containerList: ArrayBuffer[T] = map.getOrElse(key, null) assert(containerList != null) // Get the index'th entry for this host - if present if (index < containerList.size) { retval += containerList.apply(index) found = true } } index += 1 } retval.toList } }