1/* 2 * Licensed to the Apache Software Foundation (ASF) under one or more 3 * contributor license agreements. See the NOTICE file distributed with 4 * this work for additional information regarding copyright ownership. 5 * The ASF licenses this file to You under the Apache License, Version 2.0 6 * (the "License"); you may not use this file except in compliance with 7 * the License. You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14* See the License for the specific language governing permissions and 15 * limitations under the License. 16 */ 17 18package org.apache.spark.deploy.rest 19 20import java.io.File 21import javax.servlet.http.HttpServletResponse 22 23import org.apache.spark.{SPARK_VERSION => sparkVersion, SparkConf} 24import org.apache.spark.deploy.{Command, DeployMessages, DriverDescription} 25import org.apache.spark.deploy.ClientArguments._ 26import org.apache.spark.rpc.RpcEndpointRef 27import org.apache.spark.util.Utils 28 29/** 30 * A server that responds to requests submitted by the [[RestSubmissionClient]]. 31 * This is intended to be embedded in the standalone Master and used in cluster mode only. 32 * 33 * This server responds with different HTTP codes depending on the situation: 34 * 200 OK - Request was processed successfully 35 * 400 BAD REQUEST - Request was malformed, not successfully validated, or of unexpected type 36 * 468 UNKNOWN PROTOCOL VERSION - Request specified a protocol this server does not understand 37 * 500 INTERNAL SERVER ERROR - Server throws an exception internally while processing the request 38 * 39 * The server always includes a JSON representation of the relevant [[SubmitRestProtocolResponse]] 40 * in the HTTP body. If an error occurs, however, the server will include an [[ErrorResponse]] 41 * instead of the one expected by the client. If the construction of this error response itself 42 * fails, the response will consist of an empty body with a response code that indicates internal 43 * server error. 44 * 45 * @param host the address this server should bind to 46 * @param requestedPort the port this server will attempt to bind to 47 * @param masterConf the conf used by the Master 48 * @param masterEndpoint reference to the Master endpoint to which requests can be sent 49 * @param masterUrl the URL of the Master new drivers will attempt to connect to 50 */ 51private[deploy] class StandaloneRestServer( 52 host: String, 53 requestedPort: Int, 54 masterConf: SparkConf, 55 masterEndpoint: RpcEndpointRef, 56 masterUrl: String) 57 extends RestSubmissionServer(host, requestedPort, masterConf) { 58 59 protected override val submitRequestServlet = 60 new StandaloneSubmitRequestServlet(masterEndpoint, masterUrl, masterConf) 61 protected override val killRequestServlet = 62 new StandaloneKillRequestServlet(masterEndpoint, masterConf) 63 protected override val statusRequestServlet = 64 new StandaloneStatusRequestServlet(masterEndpoint, masterConf) 65} 66 67/** 68 * A servlet for handling kill requests passed to the [[StandaloneRestServer]]. 69 */ 70private[rest] class StandaloneKillRequestServlet(masterEndpoint: RpcEndpointRef, conf: SparkConf) 71 extends KillRequestServlet { 72 73 protected def handleKill(submissionId: String): KillSubmissionResponse = { 74 val response = masterEndpoint.askWithRetry[DeployMessages.KillDriverResponse]( 75 DeployMessages.RequestKillDriver(submissionId)) 76 val k = new KillSubmissionResponse 77 k.serverSparkVersion = sparkVersion 78 k.message = response.message 79 k.submissionId = submissionId 80 k.success = response.success 81 k 82 } 83} 84 85/** 86 * A servlet for handling status requests passed to the [[StandaloneRestServer]]. 87 */ 88private[rest] class StandaloneStatusRequestServlet(masterEndpoint: RpcEndpointRef, conf: SparkConf) 89 extends StatusRequestServlet { 90 91 protected def handleStatus(submissionId: String): SubmissionStatusResponse = { 92 val response = masterEndpoint.askWithRetry[DeployMessages.DriverStatusResponse]( 93 DeployMessages.RequestDriverStatus(submissionId)) 94 val message = response.exception.map { s"Exception from the cluster:\n" + formatException(_) } 95 val d = new SubmissionStatusResponse 96 d.serverSparkVersion = sparkVersion 97 d.submissionId = submissionId 98 d.success = response.found 99 d.driverState = response.state.map(_.toString).orNull 100 d.workerId = response.workerId.orNull 101 d.workerHostPort = response.workerHostPort.orNull 102 d.message = message.orNull 103 d 104 } 105} 106 107/** 108 * A servlet for handling submit requests passed to the [[StandaloneRestServer]]. 109 */ 110private[rest] class StandaloneSubmitRequestServlet( 111 masterEndpoint: RpcEndpointRef, 112 masterUrl: String, 113 conf: SparkConf) 114 extends SubmitRequestServlet { 115 116 /** 117 * Build a driver description from the fields specified in the submit request. 118 * 119 * This involves constructing a command that takes into account memory, java options, 120 * classpath and other settings to launch the driver. This does not currently consider 121 * fields used by python applications since python is not supported in standalone 122 * cluster mode yet. 123 */ 124 private def buildDriverDescription(request: CreateSubmissionRequest): DriverDescription = { 125 // Required fields, including the main class because python is not yet supported 126 val appResource = Option(request.appResource).getOrElse { 127 throw new SubmitRestMissingFieldException("Application jar is missing.") 128 } 129 val mainClass = Option(request.mainClass).getOrElse { 130 throw new SubmitRestMissingFieldException("Main class is missing.") 131 } 132 133 // Optional fields 134 val sparkProperties = request.sparkProperties 135 val driverMemory = sparkProperties.get("spark.driver.memory") 136 val driverCores = sparkProperties.get("spark.driver.cores") 137 val driverExtraJavaOptions = sparkProperties.get("spark.driver.extraJavaOptions") 138 val driverExtraClassPath = sparkProperties.get("spark.driver.extraClassPath") 139 val driverExtraLibraryPath = sparkProperties.get("spark.driver.extraLibraryPath") 140 val superviseDriver = sparkProperties.get("spark.driver.supervise") 141 val appArgs = request.appArgs 142 val environmentVariables = request.environmentVariables 143 144 // Construct driver description 145 val conf = new SparkConf(false) 146 .setAll(sparkProperties) 147 .set("spark.master", masterUrl) 148 val extraClassPath = driverExtraClassPath.toSeq.flatMap(_.split(File.pathSeparator)) 149 val extraLibraryPath = driverExtraLibraryPath.toSeq.flatMap(_.split(File.pathSeparator)) 150 val extraJavaOpts = driverExtraJavaOptions.map(Utils.splitCommandString).getOrElse(Seq.empty) 151 val sparkJavaOpts = Utils.sparkJavaOpts(conf) 152 val javaOpts = sparkJavaOpts ++ extraJavaOpts 153 val command = new Command( 154 "org.apache.spark.deploy.worker.DriverWrapper", 155 Seq("{{WORKER_URL}}", "{{USER_JAR}}", mainClass) ++ appArgs, // args to the DriverWrapper 156 environmentVariables, extraClassPath, extraLibraryPath, javaOpts) 157 val actualDriverMemory = driverMemory.map(Utils.memoryStringToMb).getOrElse(DEFAULT_MEMORY) 158 val actualDriverCores = driverCores.map(_.toInt).getOrElse(DEFAULT_CORES) 159 val actualSuperviseDriver = superviseDriver.map(_.toBoolean).getOrElse(DEFAULT_SUPERVISE) 160 new DriverDescription( 161 appResource, actualDriverMemory, actualDriverCores, actualSuperviseDriver, command) 162 } 163 164 /** 165 * Handle the submit request and construct an appropriate response to return to the client. 166 * 167 * This assumes that the request message is already successfully validated. 168 * If the request message is not of the expected type, return error to the client. 169 */ 170 protected override def handleSubmit( 171 requestMessageJson: String, 172 requestMessage: SubmitRestProtocolMessage, 173 responseServlet: HttpServletResponse): SubmitRestProtocolResponse = { 174 requestMessage match { 175 case submitRequest: CreateSubmissionRequest => 176 val driverDescription = buildDriverDescription(submitRequest) 177 val response = masterEndpoint.askWithRetry[DeployMessages.SubmitDriverResponse]( 178 DeployMessages.RequestSubmitDriver(driverDescription)) 179 val submitResponse = new CreateSubmissionResponse 180 submitResponse.serverSparkVersion = sparkVersion 181 submitResponse.message = response.message 182 submitResponse.success = response.success 183 submitResponse.submissionId = response.driverId.orNull 184 val unknownFields = findUnknownFields(requestMessageJson, requestMessage) 185 if (unknownFields.nonEmpty) { 186 // If there are fields that the server does not know about, warn the client 187 submitResponse.unknownFields = unknownFields 188 } 189 submitResponse 190 case unexpected => 191 responseServlet.setStatus(HttpServletResponse.SC_BAD_REQUEST) 192 handleError(s"Received message of unexpected type ${unexpected.messageType}.") 193 } 194 } 195} 196