1/* 2 * Licensed to the Apache Software Foundation (ASF) under one or more 3 * contributor license agreements. See the NOTICE file distributed with 4 * this work for additional information regarding copyright ownership. 5 * The ASF licenses this file to You under the Apache License, Version 2.0 6 * (the "License"); you may not use this file except in compliance with 7 * the License. You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 */ 17 18package org.apache.spark.deploy.master 19 20import java.text.SimpleDateFormat 21import java.util.{Date, Locale} 22import java.util.concurrent.{ScheduledFuture, TimeUnit} 23 24import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} 25import scala.util.Random 26 27import org.apache.spark.{SecurityManager, SparkConf, SparkException} 28import org.apache.spark.deploy.{ApplicationDescription, DriverDescription, 29 ExecutorState, SparkHadoopUtil} 30import org.apache.spark.deploy.DeployMessages._ 31import org.apache.spark.deploy.master.DriverState.DriverState 32import org.apache.spark.deploy.master.MasterMessages._ 33import org.apache.spark.deploy.master.ui.MasterWebUI 34import org.apache.spark.deploy.rest.StandaloneRestServer 35import org.apache.spark.internal.Logging 36import org.apache.spark.metrics.MetricsSystem 37import org.apache.spark.rpc._ 38import org.apache.spark.serializer.{JavaSerializer, Serializer} 39import org.apache.spark.util.{ThreadUtils, Utils} 40 41private[deploy] class Master( 42 override val rpcEnv: RpcEnv, 43 address: RpcAddress, 44 webUiPort: Int, 45 val securityMgr: SecurityManager, 46 val conf: SparkConf) 47 extends 
ThreadSafeRpcEndpoint with Logging with LeaderElectable { 48 49 private val forwardMessageThread = 50 ThreadUtils.newDaemonSingleThreadScheduledExecutor("master-forward-message-thread") 51 52 private val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf) 53 54 // For application IDs 55 private def createDateFormat = new SimpleDateFormat("yyyyMMddHHmmss", Locale.US) 56 57 private val WORKER_TIMEOUT_MS = conf.getLong("spark.worker.timeout", 60) * 1000 58 private val RETAINED_APPLICATIONS = conf.getInt("spark.deploy.retainedApplications", 200) 59 private val RETAINED_DRIVERS = conf.getInt("spark.deploy.retainedDrivers", 200) 60 private val REAPER_ITERATIONS = conf.getInt("spark.dead.worker.persistence", 15) 61 private val RECOVERY_MODE = conf.get("spark.deploy.recoveryMode", "NONE") 62 private val MAX_EXECUTOR_RETRIES = conf.getInt("spark.deploy.maxExecutorRetries", 10) 63 64 val workers = new HashSet[WorkerInfo] 65 val idToApp = new HashMap[String, ApplicationInfo] 66 private val waitingApps = new ArrayBuffer[ApplicationInfo] 67 val apps = new HashSet[ApplicationInfo] 68 69 private val idToWorker = new HashMap[String, WorkerInfo] 70 private val addressToWorker = new HashMap[RpcAddress, WorkerInfo] 71 72 private val endpointToApp = new HashMap[RpcEndpointRef, ApplicationInfo] 73 private val addressToApp = new HashMap[RpcAddress, ApplicationInfo] 74 private val completedApps = new ArrayBuffer[ApplicationInfo] 75 private var nextAppNumber = 0 76 77 private val drivers = new HashSet[DriverInfo] 78 private val completedDrivers = new ArrayBuffer[DriverInfo] 79 // Drivers currently spooled for scheduling 80 private val waitingDrivers = new ArrayBuffer[DriverInfo] 81 private var nextDriverNumber = 0 82 83 Utils.checkHost(address.host, "Expected hostname") 84 85 private val masterMetricsSystem = MetricsSystem.createMetricsSystem("master", conf, securityMgr) 86 private val applicationMetricsSystem = MetricsSystem.createMetricsSystem("applications", conf, 87 securityMgr) 
88 private val masterSource = new MasterSource(this) 89 90 // After onStart, webUi will be set 91 private var webUi: MasterWebUI = null 92 93 private val masterPublicAddress = { 94 val envVar = conf.getenv("SPARK_PUBLIC_DNS") 95 if (envVar != null) envVar else address.host 96 } 97 98 private val masterUrl = address.toSparkURL 99 private var masterWebUiUrl: String = _ 100 101 private var state = RecoveryState.STANDBY 102 103 private var persistenceEngine: PersistenceEngine = _ 104 105 private var leaderElectionAgent: LeaderElectionAgent = _ 106 107 private var recoveryCompletionTask: ScheduledFuture[_] = _ 108 109 private var checkForWorkerTimeOutTask: ScheduledFuture[_] = _ 110 111 // As a temporary workaround before better ways of configuring memory, we allow users to set 112 // a flag that will perform round-robin scheduling across the nodes (spreading out each app 113 // among all the nodes) instead of trying to consolidate each app onto a small # of nodes. 114 private val spreadOutApps = conf.getBoolean("spark.deploy.spreadOut", true) 115 116 // Default maxCores for applications that don't specify it (i.e. 
pass Int.MaxValue) 117 private val defaultCores = conf.getInt("spark.deploy.defaultCores", Int.MaxValue) 118 val reverseProxy = conf.getBoolean("spark.ui.reverseProxy", false) 119 if (defaultCores < 1) { 120 throw new SparkException("spark.deploy.defaultCores must be positive") 121 } 122 123 // Alternative application submission gateway that is stable across Spark versions 124 private val restServerEnabled = conf.getBoolean("spark.master.rest.enabled", true) 125 private var restServer: Option[StandaloneRestServer] = None 126 private var restServerBoundPort: Option[Int] = None 127 128 override def onStart(): Unit = { 129 logInfo("Starting Spark master at " + masterUrl) 130 logInfo(s"Running Spark version ${org.apache.spark.SPARK_VERSION}") 131 webUi = new MasterWebUI(this, webUiPort) 132 webUi.bind() 133 masterWebUiUrl = "http://" + masterPublicAddress + ":" + webUi.boundPort 134 if (reverseProxy) { 135 masterWebUiUrl = conf.get("spark.ui.reverseProxyUrl", masterWebUiUrl) 136 logInfo(s"Spark Master is acting as a reverse proxy. Master, Workers and " + 137 s"Applications UIs are available at $masterWebUiUrl") 138 } 139 checkForWorkerTimeOutTask = forwardMessageThread.scheduleAtFixedRate(new Runnable { 140 override def run(): Unit = Utils.tryLogNonFatalError { 141 self.send(CheckForWorkerTimeOut) 142 } 143 }, 0, WORKER_TIMEOUT_MS, TimeUnit.MILLISECONDS) 144 145 if (restServerEnabled) { 146 val port = conf.getInt("spark.master.rest.port", 6066) 147 restServer = Some(new StandaloneRestServer(address.host, port, conf, self, masterUrl)) 148 } 149 restServerBoundPort = restServer.map(_.start()) 150 151 masterMetricsSystem.registerSource(masterSource) 152 masterMetricsSystem.start() 153 applicationMetricsSystem.start() 154 // Attach the master and app metrics servlet handler to the web ui after the metrics systems are 155 // started. 
156 masterMetricsSystem.getServletHandlers.foreach(webUi.attachHandler) 157 applicationMetricsSystem.getServletHandlers.foreach(webUi.attachHandler) 158 159 val serializer = new JavaSerializer(conf) 160 val (persistenceEngine_, leaderElectionAgent_) = RECOVERY_MODE match { 161 case "ZOOKEEPER" => 162 logInfo("Persisting recovery state to ZooKeeper") 163 val zkFactory = 164 new ZooKeeperRecoveryModeFactory(conf, serializer) 165 (zkFactory.createPersistenceEngine(), zkFactory.createLeaderElectionAgent(this)) 166 case "FILESYSTEM" => 167 val fsFactory = 168 new FileSystemRecoveryModeFactory(conf, serializer) 169 (fsFactory.createPersistenceEngine(), fsFactory.createLeaderElectionAgent(this)) 170 case "CUSTOM" => 171 val clazz = Utils.classForName(conf.get("spark.deploy.recoveryMode.factory")) 172 val factory = clazz.getConstructor(classOf[SparkConf], classOf[Serializer]) 173 .newInstance(conf, serializer) 174 .asInstanceOf[StandaloneRecoveryModeFactory] 175 (factory.createPersistenceEngine(), factory.createLeaderElectionAgent(this)) 176 case _ => 177 (new BlackHolePersistenceEngine(), new MonarchyLeaderAgent(this)) 178 } 179 persistenceEngine = persistenceEngine_ 180 leaderElectionAgent = leaderElectionAgent_ 181 } 182 183 override def onStop() { 184 masterMetricsSystem.report() 185 applicationMetricsSystem.report() 186 // prevent the CompleteRecovery message sending to restarted master 187 if (recoveryCompletionTask != null) { 188 recoveryCompletionTask.cancel(true) 189 } 190 if (checkForWorkerTimeOutTask != null) { 191 checkForWorkerTimeOutTask.cancel(true) 192 } 193 forwardMessageThread.shutdownNow() 194 webUi.stop() 195 restServer.foreach(_.stop()) 196 masterMetricsSystem.stop() 197 applicationMetricsSystem.stop() 198 persistenceEngine.close() 199 leaderElectionAgent.stop() 200 } 201 202 override def electedLeader() { 203 self.send(ElectedLeader) 204 } 205 206 override def revokedLeadership() { 207 self.send(RevokedLeadership) 208 } 209 210 override def receive: 
PartialFunction[Any, Unit] = { 211 case ElectedLeader => 212 val (storedApps, storedDrivers, storedWorkers) = persistenceEngine.readPersistedData(rpcEnv) 213 state = if (storedApps.isEmpty && storedDrivers.isEmpty && storedWorkers.isEmpty) { 214 RecoveryState.ALIVE 215 } else { 216 RecoveryState.RECOVERING 217 } 218 logInfo("I have been elected leader! New state: " + state) 219 if (state == RecoveryState.RECOVERING) { 220 beginRecovery(storedApps, storedDrivers, storedWorkers) 221 recoveryCompletionTask = forwardMessageThread.schedule(new Runnable { 222 override def run(): Unit = Utils.tryLogNonFatalError { 223 self.send(CompleteRecovery) 224 } 225 }, WORKER_TIMEOUT_MS, TimeUnit.MILLISECONDS) 226 } 227 228 case CompleteRecovery => completeRecovery() 229 230 case RevokedLeadership => 231 logError("Leadership has been revoked -- master shutting down.") 232 System.exit(0) 233 234 case RegisterApplication(description, driver) => 235 // TODO Prevent repeated registrations from some driver 236 if (state == RecoveryState.STANDBY) { 237 // ignore, don't send response 238 } else { 239 logInfo("Registering app " + description.name) 240 val app = createApplication(description, driver) 241 registerApplication(app) 242 logInfo("Registered app " + description.name + " with ID " + app.id) 243 persistenceEngine.addApplication(app) 244 driver.send(RegisteredApplication(app.id, self)) 245 schedule() 246 } 247 248 case ExecutorStateChanged(appId, execId, state, message, exitStatus) => 249 val execOption = idToApp.get(appId).flatMap(app => app.executors.get(execId)) 250 execOption match { 251 case Some(exec) => 252 val appInfo = idToApp(appId) 253 val oldState = exec.state 254 exec.state = state 255 256 if (state == ExecutorState.RUNNING) { 257 assert(oldState == ExecutorState.LAUNCHING, 258 s"executor $execId state transfer from $oldState to RUNNING is illegal") 259 appInfo.resetRetryCount() 260 } 261 262 exec.application.driver.send(ExecutorUpdated(execId, state, message, 
exitStatus, false)) 263 264 if (ExecutorState.isFinished(state)) { 265 // Remove this executor from the worker and app 266 logInfo(s"Removing executor ${exec.fullId} because it is $state") 267 // If an application has already finished, preserve its 268 // state to display its information properly on the UI 269 if (!appInfo.isFinished) { 270 appInfo.removeExecutor(exec) 271 } 272 exec.worker.removeExecutor(exec) 273 274 val normalExit = exitStatus == Some(0) 275 // Only retry certain number of times so we don't go into an infinite loop. 276 // Important note: this code path is not exercised by tests, so be very careful when 277 // changing this `if` condition. 278 if (!normalExit 279 && appInfo.incrementRetryCount() >= MAX_EXECUTOR_RETRIES 280 && MAX_EXECUTOR_RETRIES >= 0) { // < 0 disables this application-killing path 281 val execs = appInfo.executors.values 282 if (!execs.exists(_.state == ExecutorState.RUNNING)) { 283 logError(s"Application ${appInfo.desc.name} with ID ${appInfo.id} failed " + 284 s"${appInfo.retryCount} times; removing it") 285 removeApplication(appInfo, ApplicationState.FAILED) 286 } 287 } 288 } 289 schedule() 290 case None => 291 logWarning(s"Got status update for unknown executor $appId/$execId") 292 } 293 294 case DriverStateChanged(driverId, state, exception) => 295 state match { 296 case DriverState.ERROR | DriverState.FINISHED | DriverState.KILLED | DriverState.FAILED => 297 removeDriver(driverId, state, exception) 298 case _ => 299 throw new Exception(s"Received unexpected state update for driver $driverId: $state") 300 } 301 302 case Heartbeat(workerId, worker) => 303 idToWorker.get(workerId) match { 304 case Some(workerInfo) => 305 workerInfo.lastHeartbeat = System.currentTimeMillis() 306 case None => 307 if (workers.map(_.id).contains(workerId)) { 308 logWarning(s"Got heartbeat from unregistered worker $workerId." 
+ 309 " Asking it to re-register.") 310 worker.send(ReconnectWorker(masterUrl)) 311 } else { 312 logWarning(s"Got heartbeat from unregistered worker $workerId." + 313 " This worker was never registered, so ignoring the heartbeat.") 314 } 315 } 316 317 case MasterChangeAcknowledged(appId) => 318 idToApp.get(appId) match { 319 case Some(app) => 320 logInfo("Application has been re-registered: " + appId) 321 app.state = ApplicationState.WAITING 322 case None => 323 logWarning("Master change ack from unknown app: " + appId) 324 } 325 326 if (canCompleteRecovery) { completeRecovery() } 327 328 case WorkerSchedulerStateResponse(workerId, executors, driverIds) => 329 idToWorker.get(workerId) match { 330 case Some(worker) => 331 logInfo("Worker has been re-registered: " + workerId) 332 worker.state = WorkerState.ALIVE 333 334 val validExecutors = executors.filter(exec => idToApp.get(exec.appId).isDefined) 335 for (exec <- validExecutors) { 336 val app = idToApp.get(exec.appId).get 337 val execInfo = app.addExecutor(worker, exec.cores, Some(exec.execId)) 338 worker.addExecutor(execInfo) 339 execInfo.copyState(exec) 340 } 341 342 for (driverId <- driverIds) { 343 drivers.find(_.id == driverId).foreach { driver => 344 driver.worker = Some(worker) 345 driver.state = DriverState.RUNNING 346 worker.drivers(driverId) = driver 347 } 348 } 349 case None => 350 logWarning("Scheduler state from unknown worker: " + workerId) 351 } 352 353 if (canCompleteRecovery) { completeRecovery() } 354 355 case WorkerLatestState(workerId, executors, driverIds) => 356 idToWorker.get(workerId) match { 357 case Some(worker) => 358 for (exec <- executors) { 359 val executorMatches = worker.executors.exists { 360 case (_, e) => e.application.id == exec.appId && e.id == exec.execId 361 } 362 if (!executorMatches) { 363 // master doesn't recognize this executor. So just tell worker to kill it. 
364 worker.endpoint.send(KillExecutor(masterUrl, exec.appId, exec.execId)) 365 } 366 } 367 368 for (driverId <- driverIds) { 369 val driverMatches = worker.drivers.exists { case (id, _) => id == driverId } 370 if (!driverMatches) { 371 // master doesn't recognize this driver. So just tell worker to kill it. 372 worker.endpoint.send(KillDriver(driverId)) 373 } 374 } 375 case None => 376 logWarning("Worker state from unknown worker: " + workerId) 377 } 378 379 case UnregisterApplication(applicationId) => 380 logInfo(s"Received unregister request from application $applicationId") 381 idToApp.get(applicationId).foreach(finishApplication) 382 383 case CheckForWorkerTimeOut => 384 timeOutDeadWorkers() 385 386 } 387 388 override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { 389 case RegisterWorker( 390 id, workerHost, workerPort, workerRef, cores, memory, workerWebUiUrl) => 391 logInfo("Registering worker %s:%d with %d cores, %s RAM".format( 392 workerHost, workerPort, cores, Utils.megabytesToString(memory))) 393 if (state == RecoveryState.STANDBY) { 394 context.reply(MasterInStandby) 395 } else if (idToWorker.contains(id)) { 396 context.reply(RegisterWorkerFailed("Duplicate worker ID")) 397 } else { 398 val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory, 399 workerRef, workerWebUiUrl) 400 if (registerWorker(worker)) { 401 persistenceEngine.addWorker(worker) 402 context.reply(RegisteredWorker(self, masterWebUiUrl)) 403 schedule() 404 } else { 405 val workerAddress = worker.endpoint.address 406 logWarning("Worker registration failed. Attempted to re-register worker at same " + 407 "address: " + workerAddress) 408 context.reply(RegisterWorkerFailed("Attempted to re-register worker at same address: " 409 + workerAddress)) 410 } 411 } 412 413 case RequestSubmitDriver(description) => 414 if (state != RecoveryState.ALIVE) { 415 val msg = s"${Utils.BACKUP_STANDALONE_MASTER_PREFIX}: $state. 
" + 416 "Can only accept driver submissions in ALIVE state." 417 context.reply(SubmitDriverResponse(self, false, None, msg)) 418 } else { 419 logInfo("Driver submitted " + description.command.mainClass) 420 val driver = createDriver(description) 421 persistenceEngine.addDriver(driver) 422 waitingDrivers += driver 423 drivers.add(driver) 424 schedule() 425 426 // TODO: It might be good to instead have the submission client poll the master to determine 427 // the current status of the driver. For now it's simply "fire and forget". 428 429 context.reply(SubmitDriverResponse(self, true, Some(driver.id), 430 s"Driver successfully submitted as ${driver.id}")) 431 } 432 433 case RequestKillDriver(driverId) => 434 if (state != RecoveryState.ALIVE) { 435 val msg = s"${Utils.BACKUP_STANDALONE_MASTER_PREFIX}: $state. " + 436 s"Can only kill drivers in ALIVE state." 437 context.reply(KillDriverResponse(self, driverId, success = false, msg)) 438 } else { 439 logInfo("Asked to kill driver " + driverId) 440 val driver = drivers.find(_.id == driverId) 441 driver match { 442 case Some(d) => 443 if (waitingDrivers.contains(d)) { 444 waitingDrivers -= d 445 self.send(DriverStateChanged(driverId, DriverState.KILLED, None)) 446 } else { 447 // We just notify the worker to kill the driver here. The final bookkeeping occurs 448 // on the return path when the worker submits a state change back to the master 449 // to notify it that the driver was successfully killed. 
450 d.worker.foreach { w => 451 w.endpoint.send(KillDriver(driverId)) 452 } 453 } 454 // TODO: It would be nice for this to be a synchronous response 455 val msg = s"Kill request for $driverId submitted" 456 logInfo(msg) 457 context.reply(KillDriverResponse(self, driverId, success = true, msg)) 458 case None => 459 val msg = s"Driver $driverId has already finished or does not exist" 460 logWarning(msg) 461 context.reply(KillDriverResponse(self, driverId, success = false, msg)) 462 } 463 } 464 465 case RequestDriverStatus(driverId) => 466 if (state != RecoveryState.ALIVE) { 467 val msg = s"${Utils.BACKUP_STANDALONE_MASTER_PREFIX}: $state. " + 468 "Can only request driver status in ALIVE state." 469 context.reply( 470 DriverStatusResponse(found = false, None, None, None, Some(new Exception(msg)))) 471 } else { 472 (drivers ++ completedDrivers).find(_.id == driverId) match { 473 case Some(driver) => 474 context.reply(DriverStatusResponse(found = true, Some(driver.state), 475 driver.worker.map(_.id), driver.worker.map(_.hostPort), driver.exception)) 476 case None => 477 context.reply(DriverStatusResponse(found = false, None, None, None, None)) 478 } 479 } 480 481 case RequestMasterState => 482 context.reply(MasterStateResponse( 483 address.host, address.port, restServerBoundPort, 484 workers.toArray, apps.toArray, completedApps.toArray, 485 drivers.toArray, completedDrivers.toArray, state)) 486 487 case BoundPortsRequest => 488 context.reply(BoundPortsResponse(address.port, webUi.boundPort, restServerBoundPort)) 489 490 case RequestExecutors(appId, requestedTotal) => 491 context.reply(handleRequestExecutors(appId, requestedTotal)) 492 493 case KillExecutors(appId, executorIds) => 494 val formattedExecutorIds = formatExecutorIds(executorIds) 495 context.reply(handleKillExecutors(appId, formattedExecutorIds)) 496 } 497 498 override def onDisconnected(address: RpcAddress): Unit = { 499 // The disconnected client could've been either a worker or an app; remove whichever it 
was 500 logInfo(s"$address got disassociated, removing it.") 501 addressToWorker.get(address).foreach(removeWorker) 502 addressToApp.get(address).foreach(finishApplication) 503 if (state == RecoveryState.RECOVERING && canCompleteRecovery) { completeRecovery() } 504 } 505 506 private def canCompleteRecovery = 507 workers.count(_.state == WorkerState.UNKNOWN) == 0 && 508 apps.count(_.state == ApplicationState.UNKNOWN) == 0 509 510 private def beginRecovery(storedApps: Seq[ApplicationInfo], storedDrivers: Seq[DriverInfo], 511 storedWorkers: Seq[WorkerInfo]) { 512 for (app <- storedApps) { 513 logInfo("Trying to recover app: " + app.id) 514 try { 515 registerApplication(app) 516 app.state = ApplicationState.UNKNOWN 517 app.driver.send(MasterChanged(self, masterWebUiUrl)) 518 } catch { 519 case e: Exception => logInfo("App " + app.id + " had exception on reconnect") 520 } 521 } 522 523 for (driver <- storedDrivers) { 524 // Here we just read in the list of drivers. Any drivers associated with now-lost workers 525 // will be re-launched when we detect that the worker is missing. 526 drivers += driver 527 } 528 529 for (worker <- storedWorkers) { 530 logInfo("Trying to recover worker: " + worker.id) 531 try { 532 registerWorker(worker) 533 worker.state = WorkerState.UNKNOWN 534 worker.endpoint.send(MasterChanged(self, masterWebUiUrl)) 535 } catch { 536 case e: Exception => logInfo("Worker " + worker.id + " had exception on reconnect") 537 } 538 } 539 } 540 541 private def completeRecovery() { 542 // Ensure "only-once" recovery semantics using a short synchronization period. 543 if (state != RecoveryState.RECOVERING) { return } 544 state = RecoveryState.COMPLETING_RECOVERY 545 546 // Kill off any workers and apps that didn't respond to us. 
547 workers.filter(_.state == WorkerState.UNKNOWN).foreach(removeWorker) 548 apps.filter(_.state == ApplicationState.UNKNOWN).foreach(finishApplication) 549 550 // Reschedule drivers which were not claimed by any workers 551 drivers.filter(_.worker.isEmpty).foreach { d => 552 logWarning(s"Driver ${d.id} was not found after master recovery") 553 if (d.desc.supervise) { 554 logWarning(s"Re-launching ${d.id}") 555 relaunchDriver(d) 556 } else { 557 removeDriver(d.id, DriverState.ERROR, None) 558 logWarning(s"Did not re-launch ${d.id} because it was not supervised") 559 } 560 } 561 562 state = RecoveryState.ALIVE 563 schedule() 564 logInfo("Recovery complete - resuming operations!") 565 } 566 567 /** 568 * Schedule executors to be launched on the workers. 569 * Returns an array containing number of cores assigned to each worker. 570 * 571 * There are two modes of launching executors. The first attempts to spread out an application's 572 * executors on as many workers as possible, while the second does the opposite (i.e. launch them 573 * on as few workers as possible). The former is usually better for data locality purposes and is 574 * the default. 575 * 576 * The number of cores assigned to each executor is configurable. When this is explicitly set, 577 * multiple executors from the same application may be launched on the same worker if the worker 578 * has enough cores and memory. Otherwise, each executor grabs all the cores available on the 579 * worker by default, in which case only one executor may be launched on each worker. 580 * 581 * It is important to allocate coresPerExecutor on each worker at a time (instead of 1 core 582 * at a time). Consider the following example: cluster has 4 workers with 16 cores each. 583 * User requests 3 executors (spark.cores.max = 48, spark.executor.cores = 16). If 1 core is 584 * allocated at a time, 12 cores from each worker would be assigned to each executor. 585 * Since 12 < 16, no executors would launch [SPARK-8881]. 
586 */ 587 private def scheduleExecutorsOnWorkers( 588 app: ApplicationInfo, 589 usableWorkers: Array[WorkerInfo], 590 spreadOutApps: Boolean): Array[Int] = { 591 val coresPerExecutor = app.desc.coresPerExecutor 592 val minCoresPerExecutor = coresPerExecutor.getOrElse(1) 593 val oneExecutorPerWorker = coresPerExecutor.isEmpty 594 val memoryPerExecutor = app.desc.memoryPerExecutorMB 595 val numUsable = usableWorkers.length 596 val assignedCores = new Array[Int](numUsable) // Number of cores to give to each worker 597 val assignedExecutors = new Array[Int](numUsable) // Number of new executors on each worker 598 var coresToAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum) 599 600 /** Return whether the specified worker can launch an executor for this app. */ 601 def canLaunchExecutor(pos: Int): Boolean = { 602 val keepScheduling = coresToAssign >= minCoresPerExecutor 603 val enoughCores = usableWorkers(pos).coresFree - assignedCores(pos) >= minCoresPerExecutor 604 605 // If we allow multiple executors per worker, then we can always launch new executors. 606 // Otherwise, if there is already an executor on this worker, just give it more cores. 
607 val launchingNewExecutor = !oneExecutorPerWorker || assignedExecutors(pos) == 0 608 if (launchingNewExecutor) { 609 val assignedMemory = assignedExecutors(pos) * memoryPerExecutor 610 val enoughMemory = usableWorkers(pos).memoryFree - assignedMemory >= memoryPerExecutor 611 val underLimit = assignedExecutors.sum + app.executors.size < app.executorLimit 612 keepScheduling && enoughCores && enoughMemory && underLimit 613 } else { 614 // We're adding cores to an existing executor, so no need 615 // to check memory and executor limits 616 keepScheduling && enoughCores 617 } 618 } 619 620 // Keep launching executors until no more workers can accommodate any 621 // more executors, or if we have reached this application's limits 622 var freeWorkers = (0 until numUsable).filter(canLaunchExecutor) 623 while (freeWorkers.nonEmpty) { 624 freeWorkers.foreach { pos => 625 var keepScheduling = true 626 while (keepScheduling && canLaunchExecutor(pos)) { 627 coresToAssign -= minCoresPerExecutor 628 assignedCores(pos) += minCoresPerExecutor 629 630 // If we are launching one executor per worker, then every iteration assigns 1 core 631 // to the executor. Otherwise, every iteration assigns cores to a new executor. 632 if (oneExecutorPerWorker) { 633 assignedExecutors(pos) = 1 634 } else { 635 assignedExecutors(pos) += 1 636 } 637 638 // Spreading out an application means spreading out its executors across as 639 // many workers as possible. If we are not spreading out, then we should keep 640 // scheduling executors on this worker until we use all of its resources. 641 // Otherwise, just move on to the next worker. 642 if (spreadOutApps) { 643 keepScheduling = false 644 } 645 } 646 } 647 freeWorkers = freeWorkers.filter(canLaunchExecutor) 648 } 649 assignedCores 650 } 651 652 /** 653 * Schedule and launch executors on workers 654 */ 655 private def startExecutorsOnWorkers(): Unit = { 656 // Right now this is a very simple FIFO scheduler. 
We keep trying to fit in the first app 657 // in the queue, then the second app, etc. 658 for (app <- waitingApps if app.coresLeft > 0) { 659 val coresPerExecutor: Option[Int] = app.desc.coresPerExecutor 660 // Filter out workers that don't have enough resources to launch an executor 661 val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE) 662 .filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB && 663 worker.coresFree >= coresPerExecutor.getOrElse(1)) 664 .sortBy(_.coresFree).reverse 665 val assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps) 666 667 // Now that we've decided how many cores to allocate on each worker, let's allocate them 668 for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) { 669 allocateWorkerResourceToExecutors( 670 app, assignedCores(pos), coresPerExecutor, usableWorkers(pos)) 671 } 672 } 673 } 674 675 /** 676 * Allocate a worker's resources to one or more executors. 677 * @param app the info of the application which the executors belong to 678 * @param assignedCores number of cores on this worker for this application 679 * @param coresPerExecutor number of cores per executor 680 * @param worker the worker info 681 */ 682 private def allocateWorkerResourceToExecutors( 683 app: ApplicationInfo, 684 assignedCores: Int, 685 coresPerExecutor: Option[Int], 686 worker: WorkerInfo): Unit = { 687 // If the number of cores per executor is specified, we divide the cores assigned 688 // to this worker evenly among the executors with no remainder. 689 // Otherwise, we launch a single executor that grabs all the assignedCores on this worker. 
690 val numExecutors = coresPerExecutor.map { assignedCores / _ }.getOrElse(1) 691 val coresToAssign = coresPerExecutor.getOrElse(assignedCores) 692 for (i <- 1 to numExecutors) { 693 val exec = app.addExecutor(worker, coresToAssign) 694 launchExecutor(worker, exec) 695 app.state = ApplicationState.RUNNING 696 } 697 } 698 699 /** 700 * Schedule the currently available resources among waiting apps. This method will be called 701 * every time a new app joins or resource availability changes. 702 */ 703 private def schedule(): Unit = { 704 if (state != RecoveryState.ALIVE) { 705 return 706 } 707 // Drivers take strict precedence over executors 708 val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE)) 709 val numWorkersAlive = shuffledAliveWorkers.size 710 var curPos = 0 711 for (driver <- waitingDrivers.toList) { // iterate over a copy of waitingDrivers 712 // We assign workers to each waiting driver in a round-robin fashion. For each driver, we 713 // start from the last worker that was assigned a driver, and continue onwards until we have 714 // explored all alive workers. 
// Walk shuffledAliveWorkers circularly from curPos until a worker with enough free memory and
      // cores for this driver is found, or every worker has been visited once.
      var launched = false
      var numWorkersVisited = 0
      while (numWorkersVisited < numWorkersAlive && !launched) {
        val worker = shuffledAliveWorkers(curPos)
        numWorkersVisited += 1
        if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
          launchDriver(worker, driver)
          waitingDrivers -= driver
          launched = true
        }
        curPos = (curPos + 1) % numWorkersAlive
      }
    }
    startExecutorsOnWorkers()
  }

  /**
   * Record the executor on the given worker, ask that worker to launch it, and tell the
   * application's driver that the executor has been added.
   *
   * @param worker the worker that will host the executor
   * @param exec the executor to launch
   */
  private def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc): Unit = {
    logInfo(s"Launching executor ${exec.fullId} on worker ${worker.id}")
    worker.addExecutor(exec)
    worker.endpoint.send(LaunchExecutor(masterUrl,
      exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory))
    exec.application.driver.send(
      ExecutorAdded(exec.id, worker.id, worker.hostPort, exec.cores, exec.memory))
  }

  /**
   * Register a worker with this Master.
   *
   * @param worker the worker asking to register
   * @return true if the worker was accepted; false if another worker that is not in the
   *         UNKNOWN state is already registered at the same RPC address
   */
  private def registerWorker(worker: WorkerInfo): Boolean = {
    // There may be one or more refs to dead workers on this same node (w/ different ID's),
    // remove them.
    val deadOnSameNode = workers.filter { w =>
      (w.host == worker.host && w.port == worker.port) && (w.state == WorkerState.DEAD)
    }
    deadOnSameNode.foreach { w =>
      workers -= w
    }

    val workerAddress = worker.endpoint.address
    val previous = addressToWorker.get(workerAddress)
    if (previous.exists(_.state != WorkerState.UNKNOWN)) {
      logInfo(s"Attempted to re-register worker at same address: $workerAddress")
      false
    } else {
      // A worker registering from UNKNOWN implies that the worker was restarted during recovery.
      // The old worker must thus be dead, so we will remove it and accept the new worker.
      previous.foreach(removeWorker)

      workers += worker
      idToWorker(worker.id) = worker
      addressToWorker(workerAddress) = worker
      if (reverseProxy) {
        webUi.addProxyTargets(worker.id, worker.webUiAddress)
      }
      true
    }
  }

  /**
   * Mark a worker as DEAD and detach everything that was running on it.
   *
   * The worker is dropped from the ID and address indexes but is not removed from `workers`
   * here. Each of its executors is reported to the owning application as LOST. Each of its
   * drivers is re-queued if supervised, otherwise removed with state ERROR. Finally the worker
   * is removed from the persistence engine.
   *
   * @param worker the worker to remove
   */
  private def removeWorker(worker: WorkerInfo): Unit = {
    logInfo(s"Removing worker ${worker.id} on ${worker.host}:${worker.port}")
    worker.setState(WorkerState.DEAD)
    idToWorker -= worker.id
    addressToWorker -= worker.endpoint.address
    if (reverseProxy) {
      webUi.removeProxyTargets(worker.id)
    }
    for (exec <- worker.executors.values) {
      logInfo("Telling app of lost executor: " + exec.id)
      exec.application.driver.send(ExecutorUpdated(
        exec.id, ExecutorState.LOST, Some("worker lost"), None, workerLost = true))
      exec.state = ExecutorState.LOST
      exec.application.removeExecutor(exec)
    }
    // NOTE(review): removeDriver() calls removeDriver on the driver's worker; if that mutates
    // worker.drivers, this loop modifies the collection it is iterating -- confirm this is safe.
    for (driver <- worker.drivers.values) {
      if (driver.desc.supervise) {
        logInfo(s"Re-launching ${driver.id}")
        relaunchDriver(driver)
      } else {
        logInfo(s"Not re-launching ${driver.id} because it was not supervised")
        removeDriver(driver.id, DriverState.ERROR, None)
      }
    }
    persistenceEngine.removeWorker(worker)
  }

  /**
   * Detach the driver from its worker, mark it RELAUNCHING, put it back on the waiting queue
   * and trigger a scheduling pass.
   *
   * @param driver the driver to relaunch
   */
  private def relaunchDriver(driver: DriverInfo): Unit = {
    driver.worker = None
    driver.state = DriverState.RELAUNCHING
    waitingDrivers += driver
    schedule()
  }

  /**
   * Build an [[ApplicationInfo]] with a freshly generated application ID, using the current
   * time as both the start time and the submission date.
   *
   * @param desc description of the application
   * @param driver RPC endpoint of the application's driver
   * @return the new, not yet registered, application record
   */
  private def createApplication(desc: ApplicationDescription, driver: RpcEndpointRef):
      ApplicationInfo = {
    val now = System.currentTimeMillis()
    val date = new Date(now)
    val appId = newApplicationId(date)
    new ApplicationInfo(now, appId, desc, date, driver, defaultCores)
  }

  /**
   * Add an application to the Master's bookkeeping and to the queue of waiting applications.
   *
   * Does nothing (apart from logging) if an application is already registered at the same
   * driver address.
   *
   * @param app the application to register
   */
  private def registerApplication(app: ApplicationInfo): Unit = {
    val appAddress = app.driver.address
    if (addressToApp.contains(appAddress)) {
      logInfo(s"Attempted to re-register application at same address: $appAddress")
    } else {
      applicationMetricsSystem.registerSource(app.appSource)
      apps += app
      idToApp(app.id) = app
      endpointToApp(app.driver) = app
      addressToApp(appAddress) = app
      waitingApps += app
      if (reverseProxy) {
        webUi.addProxyTargets(app.id, app.desc.appUiUrl)
      }
    }
  }

  /** Remove the application, recording its final state as FINISHED. */
  private def finishApplication(app: ApplicationInfo): Unit = {
    removeApplication(app, ApplicationState.FINISHED)
  }

  /**
   * Remove a registered application and move it to the completed-applications history.
   *
   * Does nothing if the application is not currently in `apps`. When the history already
   * holds RETAINED_APPLICATIONS entries, the oldest tenth (at least one entry) is dropped and
   * its metrics sources are unregistered. All executors of the application are killed, the
   * driver is notified unless the final state is FINISHED, and every worker is told that the
   * application has finished.
   *
   * @param app the application to remove
   * @param state the final state to record for the application
   */
  def removeApplication(app: ApplicationInfo, state: ApplicationState.Value): Unit = {
    if (apps.contains(app)) {
      logInfo("Removing app " + app.id)
      apps -= app
      idToApp -= app.id
      endpointToApp -= app.driver
      addressToApp -= app.driver.address
      if (reverseProxy) {
        webUi.removeProxyTargets(app.id)
      }
      if (completedApps.size >= RETAINED_APPLICATIONS) {
        val toRemove = math.max(RETAINED_APPLICATIONS / 10, 1)
        completedApps.take(toRemove).foreach { a =>
          applicationMetricsSystem.removeSource(a.appSource)
        }
        completedApps.trimStart(toRemove)
      }
      completedApps += app // Remember it in our history
      waitingApps -= app

      for (exec <- app.executors.values) {
        killExecutor(exec)
      }
      app.markFinished(state)
      if (state != ApplicationState.FINISHED) {
        app.driver.send(ApplicationRemoved(state.toString))
      }
      persistenceEngine.removeApplication(app)
      schedule()

      // Tell all workers that the application has finished, so they can clean up any app state.
      workers.foreach { w =>
        w.endpoint.send(ApplicationFinished(app.id))
      }
    }
  }

  /**
   * Handle a request to set the target number of executors for this application.
   *
   * If the executor limit is adjusted upwards, new executors will be launched provided
   * that there are workers with sufficient resources. If it is adjusted downwards, however,
   * we do not kill existing executors until we explicitly receive a kill request.
   *
   * @return whether the application has previously registered with this Master.
   */
  private def handleRequestExecutors(appId: String, requestedTotal: Int): Boolean = {
    idToApp.get(appId) match {
      case Some(appInfo) =>
        logInfo(s"Application $appId requested to set total executors to $requestedTotal.")
        appInfo.executorLimit = requestedTotal
        schedule()
        true
      case None =>
        logWarning(s"Unknown application $appId requested $requestedTotal total executors.")
        false
    }
  }

  /**
   * Handle a kill request from the given application.
   *
   * This method assumes the executor limit has already been adjusted downwards through
   * a separate [[RequestExecutors]] message, such that we do not launch new executors
   * immediately after the old ones are removed.
   *
   * Executor IDs that the application does not own are skipped and reported in a warning.
   *
   * @return whether the application has previously registered with this Master.
   */
  private def handleKillExecutors(appId: String, executorIds: Seq[Int]): Boolean = {
    idToApp.get(appId) match {
      case Some(appInfo) =>
        logInfo(s"Application $appId requests to kill executors: " + executorIds.mkString(", "))
        val (known, unknown) = executorIds.partition(appInfo.executors.contains)
        known.foreach { executorId =>
          val desc = appInfo.executors(executorId)
          appInfo.removeExecutor(desc)
          killExecutor(desc)
        }
        if (unknown.nonEmpty) {
          logWarning(s"Application $appId attempted to kill non-existent executors: "
            + unknown.mkString(", "))
        }
        schedule()
        true
      case None =>
        logWarning(s"Unregistered application $appId requested us to kill executors!")
        false
    }
  }

  /**
   * Cast the given executor IDs to integers and filter out the ones that fail.
   *
   * All executors IDs should be integers since we launched these executors. However,
   * the kill interface on the driver side accepts arbitrary strings, so we need to
   * handle non-integer executor IDs just to be safe.
   *
   * @param executorIds executor IDs as strings
   * @return the IDs that parsed as integers, in their original order
   */
  private def formatExecutorIds(executorIds: Seq[String]): Seq[Int] = {
    executorIds.flatMap { executorId =>
      try {
        Some(executorId.toInt)
      } catch {
        case _: NumberFormatException =>
          logError(s"Encountered executor with a non-integer ID: $executorId. Ignoring")
          None
      }
    }
  }

  /**
   * Ask the worker on which the specified executor is launched to kill the executor.
   *
   * The executor is also removed from the worker's bookkeeping and marked KILLED.
   */
  private def killExecutor(exec: ExecutorDesc): Unit = {
    exec.worker.removeExecutor(exec)
    exec.worker.endpoint.send(KillExecutor(masterUrl, exec.application.id, exec.id))
    exec.state = ExecutorState.KILLED
  }

  /** Generate a new app ID given an app's submission date */
  private def newApplicationId(submitDate: Date): String = {
    val appId = "app-%s-%04d".format(createDateFormat.format(submitDate), nextAppNumber)
    nextAppNumber += 1
    appId
  }

  /**
   * Check for, and remove, any timed-out workers.
   *
   * A worker whose last heartbeat is older than WORKER_TIMEOUT_MS is removed if it is not
   * already DEAD. A worker that is already DEAD is dropped from `workers` once its last
   * heartbeat is older than (REAPER_ITERATIONS + 1) * WORKER_TIMEOUT_MS.
   */
  private def timeOutDeadWorkers(): Unit = {
    val currentTime = System.currentTimeMillis()
    // Copy the workers into an array so we don't modify the hashset while iterating through it
    val toRemove = workers.filter(_.lastHeartbeat < currentTime - WORKER_TIMEOUT_MS).toArray
    for (worker <- toRemove) {
      if (worker.state != WorkerState.DEAD) {
        logWarning("Removing %s because we got no heartbeat in %d seconds".format(
          worker.id, WORKER_TIMEOUT_MS / 1000))
        removeWorker(worker)
      } else if (worker.lastHeartbeat <
          currentTime - ((REAPER_ITERATIONS + 1) * WORKER_TIMEOUT_MS)) {
        workers -= worker // we've seen this DEAD worker in the UI, etc. for long enough; cull it
      }
    }
  }

  /** Generate a new driver ID given a driver's submission date */
  private def newDriverId(submitDate: Date): String = {
    val driverId = "driver-%s-%04d".format(createDateFormat.format(submitDate), nextDriverNumber)
    nextDriverNumber += 1
    driverId
  }

  /**
   * Build a [[DriverInfo]] with a freshly generated driver ID, using the current time as both
   * the start time and the submission date.
   *
   * @param desc description of the driver to create
   */
  private def createDriver(desc: DriverDescription): DriverInfo = {
    val now = System.currentTimeMillis()
    val date = new Date(now)
    new DriverInfo(now, newDriverId(date), desc, date)
  }

  /**
   * Assign the driver to the given worker, ask the worker to launch it and mark it RUNNING.
   *
   * @param worker the worker that will host the driver
   * @param driver the driver to launch
   */
  private def launchDriver(worker: WorkerInfo, driver: DriverInfo): Unit = {
    logInfo(s"Launching driver ${driver.id} on worker ${worker.id}")
    worker.addDriver(driver)
    driver.worker = Some(worker)
    worker.endpoint.send(LaunchDriver(driver.id, driver.desc))
    driver.state = DriverState.RUNNING
  }

  /**
   * Remove the driver with the given ID and move it to the completed-drivers history.
   *
   * When the history already holds RETAINED_DRIVERS entries, the oldest tenth (at least one
   * entry) is dropped first. Logs a warning and does nothing else if the ID is unknown.
   *
   * @param driverId ID of the driver to remove
   * @param finalState the state to record on the driver
   * @param exception the exception to record on the driver, if any
   */
  private def removeDriver(
      driverId: String,
      finalState: DriverState,
      exception: Option[Exception]): Unit = {
    drivers.find(_.id == driverId) match {
      case Some(driver) =>
        logInfo(s"Removing driver: $driverId")
        drivers -= driver
        if (completedDrivers.size >= RETAINED_DRIVERS) {
          val toRemove = math.max(RETAINED_DRIVERS / 10, 1)
          completedDrivers.trimStart(toRemove)
        }
        completedDrivers += driver
        persistenceEngine.removeDriver(driver)
        driver.state = finalState
        driver.exception = exception
        driver.worker.foreach(w => w.removeDriver(driver))
        schedule()
      case None =>
        logWarning(s"Asked to remove unknown driver: $driverId")
    }
  }
}

private[deploy] object Master extends Logging {
  val SYSTEM_NAME = "sparkMaster"
  val ENDPOINT_NAME = "Master"

  /**
   * Entry point: parse the arguments, start the Master's RpcEnv and endpoint, and block until
   * the RpcEnv terminates.
   */
  def main(argStrings: Array[String]): Unit = {
    Utils.initDaemon(log)
    val conf = new SparkConf
    val args = new MasterArguments(argStrings, conf)
    val (rpcEnv, _, _) = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, conf)
    rpcEnv.awaitTermination()
  }

  /**
   * Start the Master and return a three tuple of:
   *   (1) The Master RpcEnv
   *   (2) The web UI bound port
   *   (3) The REST server bound port, if any
   */
  def startRpcEnvAndEndpoint(
      host: String,
      port: Int,
      webUiPort: Int,
      conf: SparkConf): (RpcEnv, Int, Option[Int]) = {
    val securityMgr = new SecurityManager(conf)
    val rpcEnv = RpcEnv.create(SYSTEM_NAME, host, port, conf, securityMgr)
    val masterEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME,
      new Master(rpcEnv, rpcEnv.address, webUiPort, securityMgr, conf))
    val portsResponse = masterEndpoint.askWithRetry[BoundPortsResponse](BoundPortsRequest)
    (rpcEnv, portsResponse.webUIPort, portsResponse.restPort)
  }
}