/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.deploy.rest

import java.io.File
import javax.servlet.http.HttpServletResponse

import org.apache.spark.{SPARK_VERSION => sparkVersion, SparkConf}
import org.apache.spark.deploy.{Command, DeployMessages, DriverDescription}
import org.apache.spark.deploy.ClientArguments._
import org.apache.spark.rpc.RpcEndpointRef
import org.apache.spark.util.Utils

/**
 * A server that responds to requests submitted by the [[RestSubmissionClient]].
 * This is intended to be embedded in the standalone Master and used in cluster mode only.
 *
 * This server responds with different HTTP codes depending on the situation:
 *   200 OK - Request was processed successfully
 *   400 BAD REQUEST - Request was malformed, not successfully validated, or of unexpected type
 *   468 UNKNOWN PROTOCOL VERSION - Request specified a protocol this server does not understand
 *   500 INTERNAL SERVER ERROR - Server throws an exception internally while processing the request
 *
 * The server always includes a JSON representation of the relevant [[SubmitRestProtocolResponse]]
 * in the HTTP body. If an error occurs, however, the server will include an [[ErrorResponse]]
 * instead of the one expected by the client. If the construction of this error response itself
 * fails, the response will consist of an empty body with a response code that indicates internal
 * server error.
 *
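 * For example, a successful submission might produce a response body like the following
 * (the field values here are illustrative):
 *
 * {{{
 *   {
 *     "action" : "CreateSubmissionResponse",
 *     "serverSparkVersion" : "1.4.0",
 *     "submissionId" : "driver-20150515001234-0000",
 *     "success" : true
 *   }
 * }}}
 *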
 * @param host the address this server should bind to
 * @param requestedPort the port this server will attempt to bind to
 * @param masterConf the conf used by the Master
 * @param masterEndpoint reference to the Master endpoint to which requests can be sent
 * @param masterUrl the URL of the Master new drivers will attempt to connect to
 */
private[deploy] class StandaloneRestServer(
    host: String,
    requestedPort: Int,
    masterConf: SparkConf,
    masterEndpoint: RpcEndpointRef,
    masterUrl: String)
  extends RestSubmissionServer(host, requestedPort, masterConf) {

  protected override val submitRequestServlet =
    new StandaloneSubmitRequestServlet(masterEndpoint, masterUrl, masterConf)
  protected override val killRequestServlet =
    new StandaloneKillRequestServlet(masterEndpoint, masterConf)
  protected override val statusRequestServlet =
    new StandaloneStatusRequestServlet(masterEndpoint, masterConf)
}

/**
 * A servlet for handling kill requests passed to the [[StandaloneRestServer]].
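 *
 * A kill request reaches this servlet through the kill endpoint routed by the parent
 * [[RestSubmissionServer]], for example (host, port, and submission ID are illustrative):
 *
 * {{{
 *   curl -X POST http://master-host:6066/v1/submissions/kill/driver-20150515001234-0000
 * }}}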
 */
private[rest] class StandaloneKillRequestServlet(masterEndpoint: RpcEndpointRef, conf: SparkConf)
  extends KillRequestServlet {

  protected def handleKill(submissionId: String): KillSubmissionResponse = {
    val response = masterEndpoint.askWithRetry[DeployMessages.KillDriverResponse](
      DeployMessages.RequestKillDriver(submissionId))
    val k = new KillSubmissionResponse
    k.serverSparkVersion = sparkVersion
    k.message = response.message
    k.submissionId = submissionId
    k.success = response.success
    k
  }
}

/**
 * A servlet for handling status requests passed to the [[StandaloneRestServer]].
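 *
 * A status request reaches this servlet through the status endpoint routed by the parent
 * [[RestSubmissionServer]], for example (host, port, and submission ID are illustrative):
 *
 * {{{
 *   curl http://master-host:6066/v1/submissions/status/driver-20150515001234-0000
 * }}}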
 */
private[rest] class StandaloneStatusRequestServlet(masterEndpoint: RpcEndpointRef, conf: SparkConf)
  extends StatusRequestServlet {

  protected def handleStatus(submissionId: String): SubmissionStatusResponse = {
    val response = masterEndpoint.askWithRetry[DeployMessages.DriverStatusResponse](
      DeployMessages.RequestDriverStatus(submissionId))
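    // Prefix any exception reported by the cluster so the client can distinguish it
    // from errors raised by this server itself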
    val message = response.exception.map { s"Exception from the cluster:\n" + formatException(_) }
    val d = new SubmissionStatusResponse
    d.serverSparkVersion = sparkVersion
    d.submissionId = submissionId
    d.success = response.found
    d.driverState = response.state.map(_.toString).orNull
    d.workerId = response.workerId.orNull
    d.workerHostPort = response.workerHostPort.orNull
    d.message = message.orNull
    d
  }
}

/**
 * A servlet for handling submit requests passed to the [[StandaloneRestServer]].
 */
private[rest] class StandaloneSubmitRequestServlet(
    masterEndpoint: RpcEndpointRef,
    masterUrl: String,
    conf: SparkConf)
  extends SubmitRequestServlet {

  /**
   * Build a driver description from the fields specified in the submit request.
   *
   * This involves constructing a command that takes into account memory, java options,
   * classpath and other settings to launch the driver. This does not currently consider
   * fields used by python applications since python is not supported in standalone
   * cluster mode yet.
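   *
   * For example, a request whose `sparkProperties` include the following (values are
   * illustrative) would yield a driver launched with 2g of memory, 2 cores, and
   * supervision enabled:
   *
   * {{{
   *   spark.driver.memory      2g
   *   spark.driver.cores       2
   *   spark.driver.supervise   true
   * }}}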
   */
  private def buildDriverDescription(request: CreateSubmissionRequest): DriverDescription = {
    // Required fields, including the main class because python is not yet supported
    val appResource = Option(request.appResource).getOrElse {
      throw new SubmitRestMissingFieldException("Application jar is missing.")
    }
    val mainClass = Option(request.mainClass).getOrElse {
      throw new SubmitRestMissingFieldException("Main class is missing.")
    }

    // Optional fields
    val sparkProperties = request.sparkProperties
    val driverMemory = sparkProperties.get("spark.driver.memory")
    val driverCores = sparkProperties.get("spark.driver.cores")
    val driverExtraJavaOptions = sparkProperties.get("spark.driver.extraJavaOptions")
    val driverExtraClassPath = sparkProperties.get("spark.driver.extraClassPath")
    val driverExtraLibraryPath = sparkProperties.get("spark.driver.extraLibraryPath")
    val superviseDriver = sparkProperties.get("spark.driver.supervise")
    val appArgs = request.appArgs
    val environmentVariables = request.environmentVariables

    // Construct driver description
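    // Note that loadDefaults is false, so this conf contains only the properties
    // explicitly carried by the submit request (plus the master URL set below)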
    val conf = new SparkConf(false)
      .setAll(sparkProperties)
      .set("spark.master", masterUrl)
    val extraClassPath = driverExtraClassPath.toSeq.flatMap(_.split(File.pathSeparator))
    val extraLibraryPath = driverExtraLibraryPath.toSeq.flatMap(_.split(File.pathSeparator))
    val extraJavaOpts = driverExtraJavaOptions.map(Utils.splitCommandString).getOrElse(Seq.empty)
    val sparkJavaOpts = Utils.sparkJavaOpts(conf)
    val javaOpts = sparkJavaOpts ++ extraJavaOpts
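    // The {{WORKER_URL}} and {{USER_JAR}} tokens are placeholders that the Worker
    // substitutes with concrete values when it eventually launches the driver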
    val command = new Command(
      "org.apache.spark.deploy.worker.DriverWrapper",
      Seq("{{WORKER_URL}}", "{{USER_JAR}}", mainClass) ++ appArgs, // args to the DriverWrapper
      environmentVariables, extraClassPath, extraLibraryPath, javaOpts)
    val actualDriverMemory = driverMemory.map(Utils.memoryStringToMb).getOrElse(DEFAULT_MEMORY)
    val actualDriverCores = driverCores.map(_.toInt).getOrElse(DEFAULT_CORES)
    val actualSuperviseDriver = superviseDriver.map(_.toBoolean).getOrElse(DEFAULT_SUPERVISE)
    new DriverDescription(
      appResource, actualDriverMemory, actualDriverCores, actualSuperviseDriver, command)
  }

  /**
   * Handle the submit request and construct an appropriate response to return to the client.
   *
   * This assumes that the request message has already been successfully validated.
   * If the request message is not of the expected type, an error is returned to the client.
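   *
   * A sketch of the expected [[CreateSubmissionRequest]] JSON body (field values are
   * illustrative):
   *
   * {{{
   *   {
   *     "action" : "CreateSubmissionRequest",
   *     "clientSparkVersion" : "1.4.0",
   *     "appResource" : "hdfs://path/to/app.jar",
   *     "mainClass" : "com.example.MyApp",
   *     "appArgs" : [ "arg1" ],
   *     "sparkProperties" : { "spark.driver.memory" : "2g" },
   *     "environmentVariables" : { "SPARK_ENV_LOADED" : "1" }
   *   }
   * }}}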
   */
  protected override def handleSubmit(
      requestMessageJson: String,
      requestMessage: SubmitRestProtocolMessage,
      responseServlet: HttpServletResponse): SubmitRestProtocolResponse = {
    requestMessage match {
      case submitRequest: CreateSubmissionRequest =>
        val driverDescription = buildDriverDescription(submitRequest)
        val response = masterEndpoint.askWithRetry[DeployMessages.SubmitDriverResponse](
          DeployMessages.RequestSubmitDriver(driverDescription))
        val submitResponse = new CreateSubmissionResponse
        submitResponse.serverSparkVersion = sparkVersion
        submitResponse.message = response.message
        submitResponse.success = response.success
        submitResponse.submissionId = response.driverId.orNull
        val unknownFields = findUnknownFields(requestMessageJson, requestMessage)
        if (unknownFields.nonEmpty) {
          // If there are fields that the server does not know about, warn the client
          submitResponse.unknownFields = unknownFields
        }
        submitResponse
      case unexpected =>
        responseServlet.setStatus(HttpServletResponse.SC_BAD_REQUEST)
        handleError(s"Received message of unexpected type ${unexpected.messageType}.")
    }
  }
}