/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.deploy

import scala.collection.immutable.List

import org.apache.spark.deploy.ExecutorState.ExecutorState
import org.apache.spark.deploy.master.{ApplicationInfo, DriverInfo, WorkerInfo}
import org.apache.spark.deploy.master.DriverState.DriverState
import org.apache.spark.deploy.master.RecoveryState.MasterState
import org.apache.spark.deploy.worker.{DriverRunner, ExecutorRunner}
import org.apache.spark.rpc.RpcEndpointRef
import org.apache.spark.util.Utils

/** Sealed, serializable base trait mixed into the deploy messages that opt into it. */
private[deploy] sealed trait DeployMessage extends Serializable

/** Contains messages sent between Scheduler endpoint nodes. */
private[deploy] object DeployMessages {

  // ---------------------------------------------------------------------------
  // Worker to Master
  // ---------------------------------------------------------------------------

  /**
   * Constructing this message runs `Utils.checkHost` on `host` and then asserts that
   * `port` is strictly positive.
   */
  case class RegisterWorker(
      id: String,
      host: String,
      port: Int,
      worker: RpcEndpointRef,
      cores: Int,
      memory: Int,
      workerWebUiUrl: String) extends DeployMessage {
    Utils.checkHost(host, "Required hostname")
    assert(port > 0)
  }

  case class ExecutorStateChanged(
      appId: String, execId: Int, state: ExecutorState,
      message: Option[String], exitStatus: Option[Int]) extends DeployMessage

  case class DriverStateChanged(driverId: String, state: DriverState, exception: Option[Exception])
    extends DeployMessage

  case class WorkerSchedulerStateResponse(
      id: String,
      executors: List[ExecutorDescription],
      driverIds: Seq[String])

  /**
   * Sent by a worker to the master when the worker registers with it. The master then
   * compares the reported executors and drivers with the ones it knows about and tells
   * the worker to kill any executors and drivers that are unknown to the master.
   */
  case class WorkerLatestState(id: String, executors: Seq[ExecutorDescription],
      driverIds: Seq[String])
    extends DeployMessage

  case class Heartbeat(workerId: String, worker: RpcEndpointRef)
    extends DeployMessage

  // ---------------------------------------------------------------------------
  // Master to Worker
  // ---------------------------------------------------------------------------

  /** Common type of the replies that mix it in below. */
  sealed trait RegisterWorkerResponse

  case class RegisteredWorker(master: RpcEndpointRef, masterWebUiUrl: String)
    extends DeployMessage with RegisterWorkerResponse

  case class RegisterWorkerFailed(message: String)
    extends DeployMessage with RegisterWorkerResponse

  case object MasterInStandby
    extends DeployMessage with RegisterWorkerResponse

  case class ReconnectWorker(masterUrl: String)
    extends DeployMessage

  case class KillExecutor(masterUrl: String, appId: String, execId: Int)
    extends DeployMessage

  case class LaunchExecutor(
      masterUrl: String, appId: String, execId: Int,
      appDesc: ApplicationDescription,
      cores: Int, memory: Int) extends DeployMessage

  case class LaunchDriver(driverId: String, driverDesc: DriverDescription)
    extends DeployMessage

  case class KillDriver(driverId: String)
    extends DeployMessage

  case class ApplicationFinished(id: String)

  // ---------------------------------------------------------------------------
  // Worker internal
  // ---------------------------------------------------------------------------

  /** Sent to Worker endpoint periodically for cleaning up app folders. */
  case object WorkDirCleanup

  /** Used when a worker attempts to reconnect to a master. */
  case object ReregisterWithMaster

  // ---------------------------------------------------------------------------
  // AppClient to Master
  // ---------------------------------------------------------------------------

  case class RegisterApplication(
      appDescription: ApplicationDescription,
      driver: RpcEndpointRef) extends DeployMessage

  case class UnregisterApplication(appId: String)

  case class MasterChangeAcknowledged(appId: String)

  case class RequestExecutors(appId: String, requestedTotal: Int)

  case class KillExecutors(appId: String, executorIds: Seq[String])

  // ---------------------------------------------------------------------------
  // Master to AppClient
  // ---------------------------------------------------------------------------

  case class RegisteredApplication(appId: String, master: RpcEndpointRef)
    extends DeployMessage

  // TODO(matei): replace hostPort with host
  /** Constructing this message runs `Utils.checkHostPort` on `hostPort`. */
  case class ExecutorAdded(
      id: Int,
      workerId: String,
      hostPort: String,
      cores: Int,
      memory: Int) {
    Utils.checkHostPort(hostPort, "Required hostport")
  }

  case class ExecutorUpdated(
      id: Int,
      state: ExecutorState,
      message: Option[String],
      exitStatus: Option[Int],
      workerLost: Boolean)

  case class ApplicationRemoved(message: String)

  // ---------------------------------------------------------------------------
  // DriverClient <-> Master
  // ---------------------------------------------------------------------------

  case class RequestSubmitDriver(driverDescription: DriverDescription)
    extends DeployMessage

  case class SubmitDriverResponse(
      master: RpcEndpointRef,
      success: Boolean,
      driverId: Option[String],
      message: String) extends DeployMessage

  case class RequestKillDriver(driverId: String)
    extends DeployMessage

  case class KillDriverResponse(
      master: RpcEndpointRef,
      driverId: String,
      success: Boolean,
      message: String) extends DeployMessage

  case class RequestDriverStatus(driverId: String)
    extends DeployMessage

  case class DriverStatusResponse(
      found: Boolean,
      state: Option[DriverState],
      workerId: Option[String],
      workerHostPort: Option[String],
      exception: Option[Exception])

  // ---------------------------------------------------------------------------
  // Internal message in AppClient
  // ---------------------------------------------------------------------------

  case object StopAppClient

  // ---------------------------------------------------------------------------
  // Master to Worker & AppClient
  // ---------------------------------------------------------------------------

  case class MasterChanged(master: RpcEndpointRef, masterWebUiUrl: String)

  // ---------------------------------------------------------------------------
  // MasterWebUI To Master
  // ---------------------------------------------------------------------------

  case object RequestMasterState

  // ---------------------------------------------------------------------------
  // Master to MasterWebUI
  // ---------------------------------------------------------------------------

  /**
   * Constructing this message runs `Utils.checkHost` on `host` and then asserts that
   * `port` is strictly positive.
   */
  case class MasterStateResponse(
      host: String, port: Int, restPort: Option[Int],
      workers: Array[WorkerInfo],
      activeApps: Array[ApplicationInfo], completedApps: Array[ApplicationInfo],
      activeDrivers: Array[DriverInfo], completedDrivers: Array[DriverInfo],
      status: MasterState) {

    Utils.checkHost(host, "Required hostname")
    assert(port > 0)

    /** The string `spark://<host>:<port>` built from this response's `host` and `port`. */
    def uri: String = s"spark://$host:$port"

    /** Same shape as [[uri]] but using `restPort`; `None` when `restPort` is empty. */
    def restUri: Option[String] = restPort.map(p => s"spark://$host:$p")
  }

  // ---------------------------------------------------------------------------
  // WorkerWebUI to Worker
  // ---------------------------------------------------------------------------

  case object RequestWorkerState

  // ---------------------------------------------------------------------------
  // Worker to WorkerWebUI
  // ---------------------------------------------------------------------------

  /**
   * Constructing this message runs `Utils.checkHost` on `host` and then asserts that
   * `port` is strictly positive.
   */
  case class WorkerStateResponse(
      host: String,
      port: Int,
      workerId: String,
      executors: List[ExecutorRunner],
      finishedExecutors: List[ExecutorRunner],
      drivers: List[DriverRunner],
      finishedDrivers: List[DriverRunner],
      masterUrl: String,
      cores: Int,
      memory: Int,
      coresUsed: Int,
      memoryUsed: Int,
      masterWebUiUrl: String) {

    Utils.checkHost(host, "Required hostname")
    assert(port > 0)
  }

  // ---------------------------------------------------------------------------
  // Liveness checks in various places
  // ---------------------------------------------------------------------------

  case object SendHeartbeat

}