1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements.  See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License.  You may obtain a copy of the License at
8 *
9 *    http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18package org.apache.spark.deploy
19
20import scala.collection.immutable.List
21
22import org.apache.spark.deploy.ExecutorState.ExecutorState
23import org.apache.spark.deploy.master.{ApplicationInfo, DriverInfo, WorkerInfo}
24import org.apache.spark.deploy.master.DriverState.DriverState
25import org.apache.spark.deploy.master.RecoveryState.MasterState
26import org.apache.spark.deploy.worker.{DriverRunner, ExecutorRunner}
27import org.apache.spark.rpc.RpcEndpointRef
28import org.apache.spark.util.Utils
29
/**
 * Marker supertype for deploy messages. It declares no members of its own; it only adds
 * `Serializable` to whatever extends it. Because the trait is sealed, every direct subtype has
 * to be declared in this source file.
 */
private[deploy] sealed trait DeployMessage extends Serializable
31
/**
 * Holds the messages sent between Scheduler endpoint nodes. The section comments inside the
 * object group the messages by sender and receiver.
 */
private[deploy] object DeployMessages {

  // Worker to Master

  /**
   * Constructing this message runs `Utils.checkHost` on `host` and then asserts that `port` is
   * strictly positive.
   */
  case class RegisterWorker(
      id: String,
      host: String,
      port: Int,
      worker: RpcEndpointRef,
      cores: Int,
      memory: Int,
      workerWebUiUrl: String)
    extends DeployMessage {
    Utils.checkHost(host, "Required hostname")
    assert(port > 0)
  }

  /** Carries an executor's new `state` together with an optional message and exit status. */
  case class ExecutorStateChanged(
      appId: String,
      execId: Int,
      state: ExecutorState,
      message: Option[String],
      exitStatus: Option[Int])
    extends DeployMessage

  /** Carries a driver's new `state` together with an optional exception. */
  case class DriverStateChanged(driverId: String, state: DriverState, exception: Option[Exception])
    extends DeployMessage

  // Note: unlike its neighbours, this message does not extend DeployMessage.
  case class WorkerSchedulerStateResponse(
      id: String,
      executors: List[ExecutorDescription],
      driverIds: Seq[String])

  /**
   * Sent by a worker to the master when the worker registers with that master. The master then
   * compares the reported executors and drivers with the ones it holds itself and tells the
   * worker to kill the executors and drivers it does not know about.
   */
  case class WorkerLatestState(
      id: String,
      executors: Seq[ExecutorDescription],
      driverIds: Seq[String])
    extends DeployMessage

  case class Heartbeat(workerId: String, worker: RpcEndpointRef) extends DeployMessage

  // Master to Worker

  /** Supertype shared by [[RegisteredWorker]], [[RegisterWorkerFailed]], [[MasterInStandby]]. */
  sealed trait RegisterWorkerResponse

  case class RegisteredWorker(master: RpcEndpointRef, masterWebUiUrl: String)
    extends DeployMessage
    with RegisterWorkerResponse

  case class RegisterWorkerFailed(message: String)
    extends DeployMessage
    with RegisterWorkerResponse

  case object MasterInStandby
    extends DeployMessage
    with RegisterWorkerResponse

  case class ReconnectWorker(masterUrl: String)
    extends DeployMessage

  case class KillExecutor(masterUrl: String, appId: String, execId: Int)
    extends DeployMessage

  case class LaunchExecutor(
      masterUrl: String,
      appId: String,
      execId: Int,
      appDesc: ApplicationDescription,
      cores: Int,
      memory: Int)
    extends DeployMessage

  case class LaunchDriver(driverId: String, driverDesc: DriverDescription)
    extends DeployMessage

  case class KillDriver(driverId: String)
    extends DeployMessage

  case class ApplicationFinished(id: String)

  // Worker internal

  /** Sent to Worker endpoint periodically for cleaning up app folders. */
  case object WorkDirCleanup

  /** Used when a worker attempts to reconnect to a master. */
  case object ReregisterWithMaster

  // AppClient to Master

  case class RegisterApplication(
      appDescription: ApplicationDescription,
      driver: RpcEndpointRef)
    extends DeployMessage

  case class UnregisterApplication(appId: String)

  case class MasterChangeAcknowledged(appId: String)

  case class RequestExecutors(appId: String, requestedTotal: Int)

  case class KillExecutors(appId: String, executorIds: Seq[String])

  // Master to AppClient

  case class RegisteredApplication(appId: String, master: RpcEndpointRef)
    extends DeployMessage

  /**
   * Constructing this message runs `Utils.checkHostPort` on `hostPort`.
   */
  // TODO(matei): replace hostPort with host
  case class ExecutorAdded(
      id: Int,
      workerId: String,
      hostPort: String,
      cores: Int,
      memory: Int) {
    Utils.checkHostPort(hostPort, "Required hostport")
  }

  case class ExecutorUpdated(
      id: Int,
      state: ExecutorState,
      message: Option[String],
      exitStatus: Option[Int],
      workerLost: Boolean)

  case class ApplicationRemoved(message: String)

  // DriverClient <-> Master

  case class RequestSubmitDriver(driverDescription: DriverDescription)
    extends DeployMessage

  case class SubmitDriverResponse(
      master: RpcEndpointRef,
      success: Boolean,
      driverId: Option[String],
      message: String)
    extends DeployMessage

  case class RequestKillDriver(driverId: String)
    extends DeployMessage

  case class KillDriverResponse(
      master: RpcEndpointRef,
      driverId: String,
      success: Boolean,
      message: String)
    extends DeployMessage

  case class RequestDriverStatus(driverId: String)
    extends DeployMessage

  case class DriverStatusResponse(
      found: Boolean,
      state: Option[DriverState],
      workerId: Option[String],
      workerHostPort: Option[String],
      exception: Option[Exception])

  // Internal message in AppClient

  case object StopAppClient

  // Master to Worker & AppClient

  case class MasterChanged(master: RpcEndpointRef, masterWebUiUrl: String)

  // MasterWebUI To Master

  case object RequestMasterState

  // Master to MasterWebUI

  /**
   * Constructing this message runs `Utils.checkHost` on `host` and then asserts that `port` is
   * strictly positive.
   */
  case class MasterStateResponse(
      host: String,
      port: Int,
      restPort: Option[Int],
      workers: Array[WorkerInfo],
      activeApps: Array[ApplicationInfo],
      completedApps: Array[ApplicationInfo],
      activeDrivers: Array[DriverInfo],
      completedDrivers: Array[DriverInfo],
      status: MasterState) {

    Utils.checkHost(host, "Required hostname")
    assert(port > 0)

    /** `spark://host:port`, built from this message's `host` and `port`. */
    def uri: String = s"spark://$host:$port"

    /** `spark://host:restPort` when `restPort` is defined, `None` otherwise. */
    def restUri: Option[String] = restPort.map(p => s"spark://$host:$p")
  }

  //  WorkerWebUI to Worker

  case object RequestWorkerState

  // Worker to WorkerWebUI

  /**
   * Constructing this message runs `Utils.checkHost` on `host` and then asserts that `port` is
   * strictly positive.
   */
  case class WorkerStateResponse(
      host: String,
      port: Int,
      workerId: String,
      executors: List[ExecutorRunner],
      finishedExecutors: List[ExecutorRunner],
      drivers: List[DriverRunner],
      finishedDrivers: List[DriverRunner],
      masterUrl: String,
      cores: Int,
      memory: Int,
      coresUsed: Int,
      memoryUsed: Int,
      masterWebUiUrl: String) {

    Utils.checkHost(host, "Required hostname")
    assert(port > 0)
  }

  // Liveness checks in various places

  case object SendHeartbeat

}
213