/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.deploy.master

import java.text.SimpleDateFormat
import java.util.{Date, Locale}
import java.util.concurrent.{ScheduledFuture, TimeUnit}

import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
import scala.util.Random

import org.apache.spark.{SecurityManager, SparkConf, SparkException}
import org.apache.spark.deploy.{ApplicationDescription, DriverDescription,
  ExecutorState, SparkHadoopUtil}
import org.apache.spark.deploy.DeployMessages._
import org.apache.spark.deploy.master.DriverState.DriverState
import org.apache.spark.deploy.master.MasterMessages._
import org.apache.spark.deploy.master.ui.MasterWebUI
import org.apache.spark.deploy.rest.StandaloneRestServer
import org.apache.spark.internal.Logging
import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.rpc._
import org.apache.spark.serializer.{JavaSerializer, Serializer}
import org.apache.spark.util.{ThreadUtils, Utils}

private[deploy] class Master(
    override val rpcEnv: RpcEnv,
    address: RpcAddress,
    webUiPort: Int,
    val securityMgr: SecurityManager,
    val conf: SparkConf)
  extends ThreadSafeRpcEndpoint with Logging with LeaderElectable {

  private val forwardMessageThread =
    ThreadUtils.newDaemonSingleThreadScheduledExecutor("master-forward-message-thread")

  private val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)

  // For application IDs
  private def createDateFormat = new SimpleDateFormat("yyyyMMddHHmmss", Locale.US)

  private val WORKER_TIMEOUT_MS = conf.getLong("spark.worker.timeout", 60) * 1000
  private val RETAINED_APPLICATIONS = conf.getInt("spark.deploy.retainedApplications", 200)
  private val RETAINED_DRIVERS = conf.getInt("spark.deploy.retainedDrivers", 200)
  private val REAPER_ITERATIONS = conf.getInt("spark.dead.worker.persistence", 15)
  private val RECOVERY_MODE = conf.get("spark.deploy.recoveryMode", "NONE")
  private val MAX_EXECUTOR_RETRIES = conf.getInt("spark.deploy.maxExecutorRetries", 10)

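  // These values are read straight from SparkConf, so a deployment can typically tune them via
  // spark-defaults.conf or SparkConf.set. Illustrative example values only:
  //   spark.worker.timeout               120
  //   spark.deploy.retainedApplications  500
  //   spark.deploy.retainedDrivers       500
  //   spark.deploy.maxExecutorRetries    -1   (a negative value disables the retry-based
  //                                            app-killing check further below)
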
  val workers = new HashSet[WorkerInfo]
  val idToApp = new HashMap[String, ApplicationInfo]
  private val waitingApps = new ArrayBuffer[ApplicationInfo]
  val apps = new HashSet[ApplicationInfo]

  private val idToWorker = new HashMap[String, WorkerInfo]
  private val addressToWorker = new HashMap[RpcAddress, WorkerInfo]

  private val endpointToApp = new HashMap[RpcEndpointRef, ApplicationInfo]
  private val addressToApp = new HashMap[RpcAddress, ApplicationInfo]
  private val completedApps = new ArrayBuffer[ApplicationInfo]
  private var nextAppNumber = 0

  private val drivers = new HashSet[DriverInfo]
  private val completedDrivers = new ArrayBuffer[DriverInfo]
  // Drivers currently spooled for scheduling
  private val waitingDrivers = new ArrayBuffer[DriverInfo]
  private var nextDriverNumber = 0

  Utils.checkHost(address.host, "Expected hostname")

  private val masterMetricsSystem = MetricsSystem.createMetricsSystem("master", conf, securityMgr)
  private val applicationMetricsSystem = MetricsSystem.createMetricsSystem("applications", conf,
    securityMgr)
  private val masterSource = new MasterSource(this)

  // After onStart, webUi will be set
  private var webUi: MasterWebUI = null

  private val masterPublicAddress = {
    val envVar = conf.getenv("SPARK_PUBLIC_DNS")
    if (envVar != null) envVar else address.host
  }

  private val masterUrl = address.toSparkURL
  private var masterWebUiUrl: String = _

  private var state = RecoveryState.STANDBY

  private var persistenceEngine: PersistenceEngine = _

  private var leaderElectionAgent: LeaderElectionAgent = _

  private var recoveryCompletionTask: ScheduledFuture[_] = _

  private var checkForWorkerTimeOutTask: ScheduledFuture[_] = _

  // As a temporary workaround before better ways of configuring memory, we allow users to set
  // a flag that will perform round-robin scheduling across the nodes (spreading out each app
  // among all the nodes) instead of trying to consolidate each app onto a small # of nodes.
  private val spreadOutApps = conf.getBoolean("spark.deploy.spreadOut", true)

  // Default maxCores for applications that don't specify it (i.e. pass Int.MaxValue)
  private val defaultCores = conf.getInt("spark.deploy.defaultCores", Int.MaxValue)
  val reverseProxy = conf.getBoolean("spark.ui.reverseProxy", false)
  if (defaultCores < 1) {
    throw new SparkException("spark.deploy.defaultCores must be positive")
  }

  // Alternative application submission gateway that is stable across Spark versions
  private val restServerEnabled = conf.getBoolean("spark.master.rest.enabled", true)
  private var restServer: Option[StandaloneRestServer] = None
  private var restServerBoundPort: Option[Int] = None
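
  // Sketch of how this gateway is typically used (illustrative, assuming the defaults set in
  // onStart below): a cluster-mode submission can target the REST port directly instead of the
  // legacy RPC endpoint, e.g.
  //   spark-submit --master spark://<master-host>:6066 --deploy-mode cluster ... app.jar
  // where 6066 is the default spark.master.rest.port and the RPC port is typically 7077.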

  override def onStart(): Unit = {
    logInfo("Starting Spark master at " + masterUrl)
    logInfo(s"Running Spark version ${org.apache.spark.SPARK_VERSION}")
    webUi = new MasterWebUI(this, webUiPort)
    webUi.bind()
    masterWebUiUrl = "http://" + masterPublicAddress + ":" + webUi.boundPort
    if (reverseProxy) {
      masterWebUiUrl = conf.get("spark.ui.reverseProxyUrl", masterWebUiUrl)
      logInfo(s"Spark Master is acting as a reverse proxy. Master, Workers and " +
       s"Applications UIs are available at $masterWebUiUrl")
    }
    checkForWorkerTimeOutTask = forwardMessageThread.scheduleAtFixedRate(new Runnable {
      override def run(): Unit = Utils.tryLogNonFatalError {
        self.send(CheckForWorkerTimeOut)
      }
    }, 0, WORKER_TIMEOUT_MS, TimeUnit.MILLISECONDS)

    if (restServerEnabled) {
      val port = conf.getInt("spark.master.rest.port", 6066)
      restServer = Some(new StandaloneRestServer(address.host, port, conf, self, masterUrl))
    }
    restServerBoundPort = restServer.map(_.start())

    masterMetricsSystem.registerSource(masterSource)
    masterMetricsSystem.start()
    applicationMetricsSystem.start()
    // Attach the master and app metrics servlet handler to the web ui after the metrics systems are
    // started.
    masterMetricsSystem.getServletHandlers.foreach(webUi.attachHandler)
    applicationMetricsSystem.getServletHandlers.foreach(webUi.attachHandler)

    val serializer = new JavaSerializer(conf)
    val (persistenceEngine_, leaderElectionAgent_) = RECOVERY_MODE match {
      case "ZOOKEEPER" =>
        logInfo("Persisting recovery state to ZooKeeper")
        val zkFactory =
          new ZooKeeperRecoveryModeFactory(conf, serializer)
        (zkFactory.createPersistenceEngine(), zkFactory.createLeaderElectionAgent(this))
      case "FILESYSTEM" =>
        val fsFactory =
          new FileSystemRecoveryModeFactory(conf, serializer)
        (fsFactory.createPersistenceEngine(), fsFactory.createLeaderElectionAgent(this))
      case "CUSTOM" =>
        val clazz = Utils.classForName(conf.get("spark.deploy.recoveryMode.factory"))
        val factory = clazz.getConstructor(classOf[SparkConf], classOf[Serializer])
          .newInstance(conf, serializer)
          .asInstanceOf[StandaloneRecoveryModeFactory]
        (factory.createPersistenceEngine(), factory.createLeaderElectionAgent(this))
      case _ =>
        (new BlackHolePersistenceEngine(), new MonarchyLeaderAgent(this))
    }
    persistenceEngine = persistenceEngine_
    leaderElectionAgent = leaderElectionAgent_
  }
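
  // Example configuration for the recovery modes handled above (illustrative placeholder values;
  // the exact keys are consumed by the corresponding recovery-mode factories):
  //   spark.deploy.recoveryMode      ZOOKEEPER
  //   spark.deploy.zookeeper.url     zk1:2181,zk2:2181,zk3:2181
  //   spark.deploy.zookeeper.dir     /spark        (optional)
  // With the default mode NONE, nothing is persisted and a restarted master starts empty.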

  override def onStop() {
    masterMetricsSystem.report()
    applicationMetricsSystem.report()
    // prevent the CompleteRecovery message from being sent to a restarted master
    if (recoveryCompletionTask != null) {
      recoveryCompletionTask.cancel(true)
    }
    if (checkForWorkerTimeOutTask != null) {
      checkForWorkerTimeOutTask.cancel(true)
    }
    forwardMessageThread.shutdownNow()
    webUi.stop()
    restServer.foreach(_.stop())
    masterMetricsSystem.stop()
    applicationMetricsSystem.stop()
    persistenceEngine.close()
    leaderElectionAgent.stop()
  }

  override def electedLeader() {
    self.send(ElectedLeader)
  }

  override def revokedLeadership() {
    self.send(RevokedLeadership)
  }

  override def receive: PartialFunction[Any, Unit] = {
    case ElectedLeader =>
      val (storedApps, storedDrivers, storedWorkers) = persistenceEngine.readPersistedData(rpcEnv)
      state = if (storedApps.isEmpty && storedDrivers.isEmpty && storedWorkers.isEmpty) {
        RecoveryState.ALIVE
      } else {
        RecoveryState.RECOVERING
      }
      logInfo("I have been elected leader! New state: " + state)
      if (state == RecoveryState.RECOVERING) {
        beginRecovery(storedApps, storedDrivers, storedWorkers)
        recoveryCompletionTask = forwardMessageThread.schedule(new Runnable {
          override def run(): Unit = Utils.tryLogNonFatalError {
            self.send(CompleteRecovery)
          }
        }, WORKER_TIMEOUT_MS, TimeUnit.MILLISECONDS)
      }

    case CompleteRecovery => completeRecovery()

    case RevokedLeadership =>
      logError("Leadership has been revoked -- master shutting down.")
      System.exit(0)

    case RegisterApplication(description, driver) =>
      // TODO Prevent repeated registrations from some driver
      if (state == RecoveryState.STANDBY) {
        // ignore, don't send response
      } else {
        logInfo("Registering app " + description.name)
        val app = createApplication(description, driver)
        registerApplication(app)
        logInfo("Registered app " + description.name + " with ID " + app.id)
        persistenceEngine.addApplication(app)
        driver.send(RegisteredApplication(app.id, self))
        schedule()
      }

    case ExecutorStateChanged(appId, execId, state, message, exitStatus) =>
      val execOption = idToApp.get(appId).flatMap(app => app.executors.get(execId))
      execOption match {
        case Some(exec) =>
          val appInfo = idToApp(appId)
          val oldState = exec.state
          exec.state = state

          if (state == ExecutorState.RUNNING) {
            assert(oldState == ExecutorState.LAUNCHING,
              s"executor $execId state transition from $oldState to RUNNING is illegal")
            appInfo.resetRetryCount()
          }

          exec.application.driver.send(ExecutorUpdated(execId, state, message, exitStatus, false))

          if (ExecutorState.isFinished(state)) {
            // Remove this executor from the worker and app
            logInfo(s"Removing executor ${exec.fullId} because it is $state")
            // If an application has already finished, preserve its
            // state to display its information properly on the UI
            if (!appInfo.isFinished) {
              appInfo.removeExecutor(exec)
            }
            exec.worker.removeExecutor(exec)

            val normalExit = exitStatus == Some(0)
            // Only retry certain number of times so we don't go into an infinite loop.
            // Important note: this code path is not exercised by tests, so be very careful when
            // changing this `if` condition.
            if (!normalExit
                && appInfo.incrementRetryCount() >= MAX_EXECUTOR_RETRIES
                && MAX_EXECUTOR_RETRIES >= 0) { // < 0 disables this application-killing path
              val execs = appInfo.executors.values
              if (!execs.exists(_.state == ExecutorState.RUNNING)) {
                logError(s"Application ${appInfo.desc.name} with ID ${appInfo.id} failed " +
                  s"${appInfo.retryCount} times; removing it")
                removeApplication(appInfo, ApplicationState.FAILED)
              }
            }
          }
          schedule()
        case None =>
          logWarning(s"Got status update for unknown executor $appId/$execId")
      }

    case DriverStateChanged(driverId, state, exception) =>
      state match {
        case DriverState.ERROR | DriverState.FINISHED | DriverState.KILLED | DriverState.FAILED =>
          removeDriver(driverId, state, exception)
        case _ =>
          throw new Exception(s"Received unexpected state update for driver $driverId: $state")
      }

    case Heartbeat(workerId, worker) =>
      idToWorker.get(workerId) match {
        case Some(workerInfo) =>
          workerInfo.lastHeartbeat = System.currentTimeMillis()
        case None =>
          if (workers.map(_.id).contains(workerId)) {
            logWarning(s"Got heartbeat from unregistered worker $workerId." +
              " Asking it to re-register.")
            worker.send(ReconnectWorker(masterUrl))
          } else {
            logWarning(s"Got heartbeat from unregistered worker $workerId." +
              " This worker was never registered, so ignoring the heartbeat.")
          }
      }

    case MasterChangeAcknowledged(appId) =>
      idToApp.get(appId) match {
        case Some(app) =>
          logInfo("Application has been re-registered: " + appId)
          app.state = ApplicationState.WAITING
        case None =>
          logWarning("Master change ack from unknown app: " + appId)
      }

      if (canCompleteRecovery) { completeRecovery() }

    case WorkerSchedulerStateResponse(workerId, executors, driverIds) =>
      idToWorker.get(workerId) match {
        case Some(worker) =>
          logInfo("Worker has been re-registered: " + workerId)
          worker.state = WorkerState.ALIVE

          val validExecutors = executors.filter(exec => idToApp.get(exec.appId).isDefined)
          for (exec <- validExecutors) {
            val app = idToApp.get(exec.appId).get
            val execInfo = app.addExecutor(worker, exec.cores, Some(exec.execId))
            worker.addExecutor(execInfo)
            execInfo.copyState(exec)
          }

          for (driverId <- driverIds) {
            drivers.find(_.id == driverId).foreach { driver =>
              driver.worker = Some(worker)
              driver.state = DriverState.RUNNING
              worker.drivers(driverId) = driver
            }
          }
        case None =>
          logWarning("Scheduler state from unknown worker: " + workerId)
      }

      if (canCompleteRecovery) { completeRecovery() }

    case WorkerLatestState(workerId, executors, driverIds) =>
      idToWorker.get(workerId) match {
        case Some(worker) =>
          for (exec <- executors) {
            val executorMatches = worker.executors.exists {
              case (_, e) => e.application.id == exec.appId && e.id == exec.execId
            }
            if (!executorMatches) {
              // master doesn't recognize this executor. So just tell worker to kill it.
              worker.endpoint.send(KillExecutor(masterUrl, exec.appId, exec.execId))
            }
          }

          for (driverId <- driverIds) {
            val driverMatches = worker.drivers.exists { case (id, _) => id == driverId }
            if (!driverMatches) {
              // master doesn't recognize this driver. So just tell worker to kill it.
              worker.endpoint.send(KillDriver(driverId))
            }
          }
        case None =>
          logWarning("Worker state from unknown worker: " + workerId)
      }

    case UnregisterApplication(applicationId) =>
      logInfo(s"Received unregister request from application $applicationId")
      idToApp.get(applicationId).foreach(finishApplication)

    case CheckForWorkerTimeOut =>
      timeOutDeadWorkers()

  }

  override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
    case RegisterWorker(
        id, workerHost, workerPort, workerRef, cores, memory, workerWebUiUrl) =>
      logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
        workerHost, workerPort, cores, Utils.megabytesToString(memory)))
      if (state == RecoveryState.STANDBY) {
        context.reply(MasterInStandby)
      } else if (idToWorker.contains(id)) {
        context.reply(RegisterWorkerFailed("Duplicate worker ID"))
      } else {
        val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
          workerRef, workerWebUiUrl)
        if (registerWorker(worker)) {
          persistenceEngine.addWorker(worker)
          context.reply(RegisteredWorker(self, masterWebUiUrl))
          schedule()
        } else {
          val workerAddress = worker.endpoint.address
          logWarning("Worker registration failed. Attempted to re-register worker at same " +
            "address: " + workerAddress)
          context.reply(RegisterWorkerFailed("Attempted to re-register worker at same address: "
            + workerAddress))
        }
      }

    case RequestSubmitDriver(description) =>
      if (state != RecoveryState.ALIVE) {
        val msg = s"${Utils.BACKUP_STANDALONE_MASTER_PREFIX}: $state. " +
          "Can only accept driver submissions in ALIVE state."
        context.reply(SubmitDriverResponse(self, false, None, msg))
      } else {
        logInfo("Driver submitted " + description.command.mainClass)
        val driver = createDriver(description)
        persistenceEngine.addDriver(driver)
        waitingDrivers += driver
        drivers.add(driver)
        schedule()

        // TODO: It might be good to instead have the submission client poll the master to determine
        //       the current status of the driver. For now it's simply "fire and forget".

        context.reply(SubmitDriverResponse(self, true, Some(driver.id),
          s"Driver successfully submitted as ${driver.id}"))
      }

    case RequestKillDriver(driverId) =>
      if (state != RecoveryState.ALIVE) {
        val msg = s"${Utils.BACKUP_STANDALONE_MASTER_PREFIX}: $state. " +
          s"Can only kill drivers in ALIVE state."
        context.reply(KillDriverResponse(self, driverId, success = false, msg))
      } else {
        logInfo("Asked to kill driver " + driverId)
        val driver = drivers.find(_.id == driverId)
        driver match {
          case Some(d) =>
            if (waitingDrivers.contains(d)) {
              waitingDrivers -= d
              self.send(DriverStateChanged(driverId, DriverState.KILLED, None))
            } else {
              // We just notify the worker to kill the driver here. The final bookkeeping occurs
              // on the return path when the worker submits a state change back to the master
              // to notify it that the driver was successfully killed.
              d.worker.foreach { w =>
                w.endpoint.send(KillDriver(driverId))
              }
            }
            // TODO: It would be nice for this to be a synchronous response
            val msg = s"Kill request for $driverId submitted"
            logInfo(msg)
            context.reply(KillDriverResponse(self, driverId, success = true, msg))
          case None =>
            val msg = s"Driver $driverId has already finished or does not exist"
            logWarning(msg)
            context.reply(KillDriverResponse(self, driverId, success = false, msg))
        }
      }

    case RequestDriverStatus(driverId) =>
      if (state != RecoveryState.ALIVE) {
        val msg = s"${Utils.BACKUP_STANDALONE_MASTER_PREFIX}: $state. " +
          "Can only request driver status in ALIVE state."
        context.reply(
          DriverStatusResponse(found = false, None, None, None, Some(new Exception(msg))))
      } else {
        (drivers ++ completedDrivers).find(_.id == driverId) match {
          case Some(driver) =>
            context.reply(DriverStatusResponse(found = true, Some(driver.state),
              driver.worker.map(_.id), driver.worker.map(_.hostPort), driver.exception))
          case None =>
            context.reply(DriverStatusResponse(found = false, None, None, None, None))
        }
      }

    case RequestMasterState =>
      context.reply(MasterStateResponse(
        address.host, address.port, restServerBoundPort,
        workers.toArray, apps.toArray, completedApps.toArray,
        drivers.toArray, completedDrivers.toArray, state))

    case BoundPortsRequest =>
      context.reply(BoundPortsResponse(address.port, webUi.boundPort, restServerBoundPort))

    case RequestExecutors(appId, requestedTotal) =>
      context.reply(handleRequestExecutors(appId, requestedTotal))

    case KillExecutors(appId, executorIds) =>
      val formattedExecutorIds = formatExecutorIds(executorIds)
      context.reply(handleKillExecutors(appId, formattedExecutorIds))
  }

  override def onDisconnected(address: RpcAddress): Unit = {
    // The disconnected client could've been either a worker or an app; remove whichever it was
    logInfo(s"$address got disassociated, removing it.")
    addressToWorker.get(address).foreach(removeWorker)
    addressToApp.get(address).foreach(finishApplication)
    if (state == RecoveryState.RECOVERING && canCompleteRecovery) { completeRecovery() }
  }

  private def canCompleteRecovery =
    workers.count(_.state == WorkerState.UNKNOWN) == 0 &&
      apps.count(_.state == ApplicationState.UNKNOWN) == 0

  private def beginRecovery(storedApps: Seq[ApplicationInfo], storedDrivers: Seq[DriverInfo],
      storedWorkers: Seq[WorkerInfo]) {
    for (app <- storedApps) {
      logInfo("Trying to recover app: " + app.id)
      try {
        registerApplication(app)
        app.state = ApplicationState.UNKNOWN
        app.driver.send(MasterChanged(self, masterWebUiUrl))
      } catch {
        case e: Exception => logInfo("App " + app.id + " had exception on reconnect")
      }
    }

    for (driver <- storedDrivers) {
      // Here we just read in the list of drivers. Any drivers associated with now-lost workers
      // will be re-launched when we detect that the worker is missing.
      drivers += driver
    }

    for (worker <- storedWorkers) {
      logInfo("Trying to recover worker: " + worker.id)
      try {
        registerWorker(worker)
        worker.state = WorkerState.UNKNOWN
        worker.endpoint.send(MasterChanged(self, masterWebUiUrl))
      } catch {
        case e: Exception => logInfo("Worker " + worker.id + " had exception on reconnect")
      }
    }
  }

  private def completeRecovery() {
    // Ensure "only-once" recovery semantics using a short synchronization period.
    if (state != RecoveryState.RECOVERING) { return }
    state = RecoveryState.COMPLETING_RECOVERY

    // Kill off any workers and apps that didn't respond to us.
    workers.filter(_.state == WorkerState.UNKNOWN).foreach(removeWorker)
    apps.filter(_.state == ApplicationState.UNKNOWN).foreach(finishApplication)

    // Reschedule drivers which were not claimed by any workers
    drivers.filter(_.worker.isEmpty).foreach { d =>
      logWarning(s"Driver ${d.id} was not found after master recovery")
      if (d.desc.supervise) {
        logWarning(s"Re-launching ${d.id}")
        relaunchDriver(d)
      } else {
        removeDriver(d.id, DriverState.ERROR, None)
        logWarning(s"Did not re-launch ${d.id} because it was not supervised")
      }
    }

    state = RecoveryState.ALIVE
    schedule()
    logInfo("Recovery complete - resuming operations!")
  }

  /**
   * Schedule executors to be launched on the workers.
   * Returns an array containing the number of cores assigned to each worker.
   *
   * There are two modes of launching executors. The first attempts to spread out an application's
   * executors on as many workers as possible, while the second does the opposite (i.e. launch them
   * on as few workers as possible). The former is usually better for data locality purposes and is
   * the default.
   *
   * The number of cores assigned to each executor is configurable. When this is explicitly set,
   * multiple executors from the same application may be launched on the same worker if the worker
   * has enough cores and memory. Otherwise, each executor grabs all the cores available on the
   * worker by default, in which case only one executor may be launched on each worker.
   *
   * It is important to allocate cores in increments of coresPerExecutor on each worker (instead
   * of 1 core at a time). Consider the following example: a cluster has 4 workers with 16 cores
   * each. The user requests 3 executors (spark.cores.max = 48, spark.executor.cores = 16). If 1
   * core were allocated at a time, 12 cores from each worker would be assigned to each executor.
   * Since 12 < 16, no executors would launch [SPARK-8881].
   */
  private def scheduleExecutorsOnWorkers(
      app: ApplicationInfo,
      usableWorkers: Array[WorkerInfo],
      spreadOutApps: Boolean): Array[Int] = {
    val coresPerExecutor = app.desc.coresPerExecutor
    val minCoresPerExecutor = coresPerExecutor.getOrElse(1)
    val oneExecutorPerWorker = coresPerExecutor.isEmpty
    val memoryPerExecutor = app.desc.memoryPerExecutorMB
    val numUsable = usableWorkers.length
    val assignedCores = new Array[Int](numUsable) // Number of cores to give to each worker
    val assignedExecutors = new Array[Int](numUsable) // Number of new executors on each worker
    var coresToAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)

    /** Return whether the specified worker can launch an executor for this app. */
    def canLaunchExecutor(pos: Int): Boolean = {
      val keepScheduling = coresToAssign >= minCoresPerExecutor
      val enoughCores = usableWorkers(pos).coresFree - assignedCores(pos) >= minCoresPerExecutor

      // If we allow multiple executors per worker, then we can always launch new executors.
      // Otherwise, if there is already an executor on this worker, just give it more cores.
      val launchingNewExecutor = !oneExecutorPerWorker || assignedExecutors(pos) == 0
      if (launchingNewExecutor) {
        val assignedMemory = assignedExecutors(pos) * memoryPerExecutor
        val enoughMemory = usableWorkers(pos).memoryFree - assignedMemory >= memoryPerExecutor
        val underLimit = assignedExecutors.sum + app.executors.size < app.executorLimit
        keepScheduling && enoughCores && enoughMemory && underLimit
      } else {
        // We're adding cores to an existing executor, so no need
        // to check memory and executor limits
        keepScheduling && enoughCores
      }
    }

    // Keep launching executors until no more workers can accommodate any
    // more executors, or if we have reached this application's limits
    var freeWorkers = (0 until numUsable).filter(canLaunchExecutor)
    while (freeWorkers.nonEmpty) {
      freeWorkers.foreach { pos =>
        var keepScheduling = true
        while (keepScheduling && canLaunchExecutor(pos)) {
          coresToAssign -= minCoresPerExecutor
          assignedCores(pos) += minCoresPerExecutor

          // If we are launching one executor per worker, then every iteration assigns 1 core
          // to the executor. Otherwise, every iteration assigns cores to a new executor.
          if (oneExecutorPerWorker) {
            assignedExecutors(pos) = 1
          } else {
            assignedExecutors(pos) += 1
          }

          // Spreading out an application means spreading out its executors across as
          // many workers as possible. If we are not spreading out, then we should keep
          // scheduling executors on this worker until we use all of its resources.
          // Otherwise, just move on to the next worker.
          if (spreadOutApps) {
            keepScheduling = false
          }
        }
      }
      freeWorkers = freeWorkers.filter(canLaunchExecutor)
    }
    assignedCores
  }
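
  // Worked example (illustrative): 3 usable workers with 8 free cores and ample memory each,
  // and an app with coresLeft = 6 and no spark.executor.cores set (oneExecutorPerWorker).
  // With spreadOutApps = true, one core is assigned per worker per pass, ending with
  // assignedCores = Array(2, 2, 2), i.e. one 2-core executor per worker. With
  // spreadOutApps = false, the first worker is saturated first: assignedCores = Array(6, 0, 0).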

  /**
   * Schedule and launch executors on workers
   */
  private def startExecutorsOnWorkers(): Unit = {
    // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
    // in the queue, then the second app, etc.
    for (app <- waitingApps if app.coresLeft > 0) {
      val coresPerExecutor: Option[Int] = app.desc.coresPerExecutor
      // Filter out workers that don't have enough resources to launch an executor
      val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
        .filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB &&
          worker.coresFree >= coresPerExecutor.getOrElse(1))
        .sortBy(_.coresFree).reverse
      val assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps)

      // Now that we've decided how many cores to allocate on each worker, let's allocate them
      for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) {
        allocateWorkerResourceToExecutors(
          app, assignedCores(pos), coresPerExecutor, usableWorkers(pos))
      }
    }
  }

  /**
   * Allocate a worker's resources to one or more executors.
   * @param app the info of the application which the executors belong to
   * @param assignedCores number of cores on this worker for this application
   * @param coresPerExecutor number of cores per executor
   * @param worker the worker info
   */
  private def allocateWorkerResourceToExecutors(
      app: ApplicationInfo,
      assignedCores: Int,
      coresPerExecutor: Option[Int],
      worker: WorkerInfo): Unit = {
    // If the number of cores per executor is specified, we divide the cores assigned
    // to this worker evenly among the executors with no remainder.
    // Otherwise, we launch a single executor that grabs all the assignedCores on this worker.
    val numExecutors = coresPerExecutor.map { assignedCores / _ }.getOrElse(1)
    val coresToAssign = coresPerExecutor.getOrElse(assignedCores)
    for (i <- 1 to numExecutors) {
      val exec = app.addExecutor(worker, coresToAssign)
      launchExecutor(worker, exec)
      app.state = ApplicationState.RUNNING
    }
  }
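
  // For example (illustrative): with assignedCores = 12 and coresPerExecutor = Some(4), the
  // method above launches 3 executors of 4 cores each on the worker; with coresPerExecutor =
  // None it launches a single executor that takes all 12 cores.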

  /**
   * Schedule the currently available resources among waiting apps. This method will be called
   * every time a new app joins or resource availability changes.
   */
  private def schedule(): Unit = {
    if (state != RecoveryState.ALIVE) {
      return
    }
    // Drivers take strict precedence over executors
    val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
    val numWorkersAlive = shuffledAliveWorkers.size
    var curPos = 0
    for (driver <- waitingDrivers.toList) { // iterate over a copy of waitingDrivers
      // We assign workers to each waiting driver in a round-robin fashion. For each driver, we
      // start from the last worker that was assigned a driver, and continue onwards until we have
      // explored all alive workers.
      var launched = false
      var numWorkersVisited = 0
      while (numWorkersVisited < numWorkersAlive && !launched) {
        val worker = shuffledAliveWorkers(curPos)
        numWorkersVisited += 1
        if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
          launchDriver(worker, driver)
          waitingDrivers -= driver
          launched = true
        }
        curPos = (curPos + 1) % numWorkersAlive
      }
    }
    startExecutorsOnWorkers()
  }
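
  // Example of the driver round-robin above (illustrative): with shuffled alive workers
  // [w1, w2, w3] and two waiting drivers, the scan for the second driver resumes at curPos,
  // the slot after the worker that took the first driver, rather than restarting at w1, so
  // drivers tend to spread across workers instead of piling onto the first one with capacity.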

  private def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc): Unit = {
    logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
    worker.addExecutor(exec)
    worker.endpoint.send(LaunchExecutor(masterUrl,
      exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory))
    exec.application.driver.send(
      ExecutorAdded(exec.id, worker.id, worker.hostPort, exec.cores, exec.memory))
  }

  private def registerWorker(worker: WorkerInfo): Boolean = {
    // There may be one or more refs to dead workers on this same node (with different IDs);
    // remove them.
    workers.filter { w =>
      (w.host == worker.host && w.port == worker.port) && (w.state == WorkerState.DEAD)
    }.foreach { w =>
      workers -= w
    }

    val workerAddress = worker.endpoint.address
    if (addressToWorker.contains(workerAddress)) {
      val oldWorker = addressToWorker(workerAddress)
      if (oldWorker.state == WorkerState.UNKNOWN) {
        // A worker registering from UNKNOWN implies that the worker was restarted during recovery.
        // The old worker must thus be dead, so we will remove it and accept the new worker.
        removeWorker(oldWorker)
      } else {
        logInfo("Attempted to re-register worker at same address: " + workerAddress)
        return false
      }
    }

    workers += worker
    idToWorker(worker.id) = worker
    addressToWorker(workerAddress) = worker
    if (reverseProxy) {
      webUi.addProxyTargets(worker.id, worker.webUiAddress)
    }
    true
  }

  private def removeWorker(worker: WorkerInfo) {
    logInfo("Removing worker " + worker.id + " on " + worker.host + ":" + worker.port)
    worker.setState(WorkerState.DEAD)
    idToWorker -= worker.id
    addressToWorker -= worker.endpoint.address
    if (reverseProxy) {
      webUi.removeProxyTargets(worker.id)
    }
    for (exec <- worker.executors.values) {
      logInfo("Telling app of lost executor: " + exec.id)
      exec.application.driver.send(ExecutorUpdated(
        exec.id, ExecutorState.LOST, Some("worker lost"), None, workerLost = true))
      exec.state = ExecutorState.LOST
      exec.application.removeExecutor(exec)
    }
    for (driver <- worker.drivers.values) {
      if (driver.desc.supervise) {
        logInfo(s"Re-launching ${driver.id}")
        relaunchDriver(driver)
      } else {
        logInfo(s"Not re-launching ${driver.id} because it was not supervised")
        removeDriver(driver.id, DriverState.ERROR, None)
      }
    }
    persistenceEngine.removeWorker(worker)
  }

  private def relaunchDriver(driver: DriverInfo) {
    driver.worker = None
    driver.state = DriverState.RELAUNCHING
    waitingDrivers += driver
    schedule()
  }

  private def createApplication(desc: ApplicationDescription, driver: RpcEndpointRef):
      ApplicationInfo = {
    val now = System.currentTimeMillis()
    val date = new Date(now)
    val appId = newApplicationId(date)
    new ApplicationInfo(now, appId, desc, date, driver, defaultCores)
  }

  private def registerApplication(app: ApplicationInfo): Unit = {
    val appAddress = app.driver.address
    if (addressToApp.contains(appAddress)) {
      logInfo("Attempted to re-register application at same address: " + appAddress)
      return
    }

    applicationMetricsSystem.registerSource(app.appSource)
    apps += app
    idToApp(app.id) = app
    endpointToApp(app.driver) = app
    addressToApp(appAddress) = app
    waitingApps += app
    if (reverseProxy) {
      webUi.addProxyTargets(app.id, app.desc.appUiUrl)
    }
  }

  private def finishApplication(app: ApplicationInfo) {
    removeApplication(app, ApplicationState.FINISHED)
  }

  def removeApplication(app: ApplicationInfo, state: ApplicationState.Value) {
    if (apps.contains(app)) {
      logInfo("Removing app " + app.id)
      apps -= app
      idToApp -= app.id
      endpointToApp -= app.driver
      addressToApp -= app.driver.address
      if (reverseProxy) {
        webUi.removeProxyTargets(app.id)
      }
      if (completedApps.size >= RETAINED_APPLICATIONS) {
        val toRemove = math.max(RETAINED_APPLICATIONS / 10, 1)
        completedApps.take(toRemove).foreach { a =>
          applicationMetricsSystem.removeSource(a.appSource)
        }
        completedApps.trimStart(toRemove)
      }
      completedApps += app // Remember it in our history
      waitingApps -= app

      for (exec <- app.executors.values) {
        killExecutor(exec)
      }
      app.markFinished(state)
      if (state != ApplicationState.FINISHED) {
        app.driver.send(ApplicationRemoved(state.toString))
      }
      persistenceEngine.removeApplication(app)
      schedule()

      // Tell all workers that the application has finished, so they can clean up any app state.
      workers.foreach { w =>
        w.endpoint.send(ApplicationFinished(app.id))
      }
    }
  }

  /**
   * Handle a request to set the target number of executors for this application.
   *
   * If the executor limit is adjusted upwards, new executors will be launched provided
   * that there are workers with sufficient resources. If it is adjusted downwards, however,
   * we do not kill existing executors until we explicitly receive a kill request.
   *
   * @return whether the application has previously registered with this Master.
   */
  private def handleRequestExecutors(appId: String, requestedTotal: Int): Boolean = {
    idToApp.get(appId) match {
      case Some(appInfo) =>
        logInfo(s"Application $appId requested to set total executors to $requestedTotal.")
        appInfo.executorLimit = requestedTotal
        schedule()
        true
      case None =>
        logWarning(s"Unknown application $appId requested $requestedTotal total executors.")
        false
    }
  }

  /**
   * Handle a kill request from the given application.
   *
   * This method assumes the executor limit has already been adjusted downwards through
   * a separate [[RequestExecutors]] message, such that we do not launch new executors
   * immediately after the old ones are removed.
   *
   * @return whether the application has previously registered with this Master.
   */
  private def handleKillExecutors(appId: String, executorIds: Seq[Int]): Boolean = {
    idToApp.get(appId) match {
      case Some(appInfo) =>
        logInfo(s"Application $appId requests to kill executors: " + executorIds.mkString(", "))
        val (known, unknown) = executorIds.partition(appInfo.executors.contains)
        known.foreach { executorId =>
          val desc = appInfo.executors(executorId)
          appInfo.removeExecutor(desc)
          killExecutor(desc)
        }
        if (unknown.nonEmpty) {
          logWarning(s"Application $appId attempted to kill non-existent executors: "
            + unknown.mkString(", "))
        }
        schedule()
        true
      case None =>
        logWarning(s"Unregistered application $appId requested us to kill executors!")
        false
    }
  }

  /**
   * Cast the given executor IDs to integers and filter out the ones that fail.
   *
   * All executor IDs should be integers since we launched these executors. However,
   * the kill interface on the driver side accepts arbitrary strings, so we need to
   * handle non-integer executor IDs just to be safe.
   */
  private def formatExecutorIds(executorIds: Seq[String]): Seq[Int] = {
    executorIds.flatMap { executorId =>
      try {
        Some(executorId.toInt)
      } catch {
        case e: NumberFormatException =>
          logError(s"Encountered executor with a non-integer ID: $executorId. Ignoring")
          None
      }
    }
  }
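
  // For example (illustrative): formatExecutorIds(Seq("0", "1", "two")) returns Seq(0, 1) and
  // logs an error for the non-integer ID "two".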

  /**
   * Ask the worker on which the specified executor is launched to kill the executor.
   */
  private def killExecutor(exec: ExecutorDesc): Unit = {
    exec.worker.removeExecutor(exec)
    exec.worker.endpoint.send(KillExecutor(masterUrl, exec.application.id, exec.id))
    exec.state = ExecutorState.KILLED
  }

  /** Generate a new app ID given an app's submission date */
  private def newApplicationId(submitDate: Date): String = {
    val appId = "app-%s-%04d".format(createDateFormat.format(submitDate), nextAppNumber)
    nextAppNumber += 1
    appId
  }
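
  // For example (illustrative): the first application submitted at 2017-04-15 10:30:30 gets the
  // ID "app-20170415103030-0000"; later apps increment the trailing counter.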

  /** Check for, and remove, any timed-out workers */
  private def timeOutDeadWorkers() {
    // Copy the workers into an array so we don't modify the hashset while iterating through it
    val currentTime = System.currentTimeMillis()
    val toRemove = workers.filter(_.lastHeartbeat < currentTime - WORKER_TIMEOUT_MS).toArray
    for (worker <- toRemove) {
      if (worker.state != WorkerState.DEAD) {
        logWarning("Removing %s because we got no heartbeat in %d seconds".format(
          worker.id, WORKER_TIMEOUT_MS / 1000))
        removeWorker(worker)
      } else {
        if (worker.lastHeartbeat < currentTime - ((REAPER_ITERATIONS + 1) * WORKER_TIMEOUT_MS)) {
          workers -= worker // we've seen this DEAD worker in the UI, etc. for long enough; cull it
        }
      }
    }
  }
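
  // Timing sketch (illustrative, using the defaults above): a worker silent for more than 60
  // seconds is marked DEAD and removed from scheduling, but it stays visible in the UI until its
  // last heartbeat is older than (15 + 1) * 60s = 16 minutes, at which point it is culled.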

  private def newDriverId(submitDate: Date): String = {
    val appId = "driver-%s-%04d".format(createDateFormat.format(submitDate), nextDriverNumber)
    nextDriverNumber += 1
    appId
  }

  private def createDriver(desc: DriverDescription): DriverInfo = {
    val now = System.currentTimeMillis()
    val date = new Date(now)
    new DriverInfo(now, newDriverId(date), desc, date)
  }

  private def launchDriver(worker: WorkerInfo, driver: DriverInfo) {
    logInfo("Launching driver " + driver.id + " on worker " + worker.id)
    worker.addDriver(driver)
    driver.worker = Some(worker)
    worker.endpoint.send(LaunchDriver(driver.id, driver.desc))
    driver.state = DriverState.RUNNING
  }

  private def removeDriver(
      driverId: String,
      finalState: DriverState,
      exception: Option[Exception]) {
    drivers.find(d => d.id == driverId) match {
      case Some(driver) =>
        logInfo(s"Removing driver: $driverId")
        drivers -= driver
        if (completedDrivers.size >= RETAINED_DRIVERS) {
          val toRemove = math.max(RETAINED_DRIVERS / 10, 1)
          completedDrivers.trimStart(toRemove)
        }
        completedDrivers += driver
        persistenceEngine.removeDriver(driver)
        driver.state = finalState
        driver.exception = exception
        driver.worker.foreach(w => w.removeDriver(driver))
        schedule()
      case None =>
        logWarning(s"Asked to remove unknown driver: $driverId")
    }
  }
}

private[deploy] object Master extends Logging {
  val SYSTEM_NAME = "sparkMaster"
  val ENDPOINT_NAME = "Master"

  def main(argStrings: Array[String]) {
    Utils.initDaemon(log)
    val conf = new SparkConf
    val args = new MasterArguments(argStrings, conf)
    val (rpcEnv, _, _) = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, conf)
    rpcEnv.awaitTermination()
  }

  /**
   * Start the Master and return a three tuple of:
   *   (1) The Master RpcEnv
   *   (2) The web UI bound port
   *   (3) The REST server bound port, if any
   */
  def startRpcEnvAndEndpoint(
      host: String,
      port: Int,
      webUiPort: Int,
      conf: SparkConf): (RpcEnv, Int, Option[Int]) = {
    val securityMgr = new SecurityManager(conf)
    val rpcEnv = RpcEnv.create(SYSTEM_NAME, host, port, conf, securityMgr)
    val masterEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME,
      new Master(rpcEnv, rpcEnv.address, webUiPort, securityMgr, conf))
    val portsResponse = masterEndpoint.askWithRetry[BoundPortsResponse](BoundPortsRequest)
    (rpcEnv, portsResponse.webUIPort, portsResponse.restPort)
  }
}
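
// Illustrative usage (a sketch, not part of the production code path): a Master can also be
// started programmatically, roughly as local-cluster mode does:
//
//   val conf = new SparkConf()
//   val (rpcEnv, webUiPort, restPortOpt) =
//     Master.startRpcEnvAndEndpoint("localhost", 7077, 8080, conf)
//   // ... interact with the master over rpcEnv, then shut it down:
//   rpcEnv.shutdown()
//   rpcEnv.awaitTermination()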