1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements.  See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License.  You may obtain a copy of the License at
8 *
9 *    http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18package org.apache.spark.deploy
19
20import java.net.{URI, URISyntaxException}
21
22import scala.annotation.tailrec
23import scala.collection.mutable.ListBuffer
24
25import org.apache.log4j.Level
26
27import org.apache.spark.util.{IntParam, MemoryParam, Utils}
28
29/**
30 * Command-line parser for the driver client.
31 */
32private[deploy] class ClientArguments(args: Array[String]) {
33  import ClientArguments._
34
35  var cmd: String = "" // 'launch' or 'kill'
36  var logLevel = Level.WARN
37
38  // launch parameters
39  var masters: Array[String] = null
40  var jarUrl: String = ""
41  var mainClass: String = ""
42  var supervise: Boolean = DEFAULT_SUPERVISE
43  var memory: Int = DEFAULT_MEMORY
44  var cores: Int = DEFAULT_CORES
45  private var _driverOptions = ListBuffer[String]()
46  def driverOptions: Seq[String] = _driverOptions.toSeq
47
48  // kill parameters
49  var driverId: String = ""
50
51  parse(args.toList)
52
53  @tailrec
54  private def parse(args: List[String]): Unit = args match {
55    case ("--cores" | "-c") :: IntParam(value) :: tail =>
56      cores = value
57      parse(tail)
58
59    case ("--memory" | "-m") :: MemoryParam(value) :: tail =>
60      memory = value
61      parse(tail)
62
63    case ("--supervise" | "-s") :: tail =>
64      supervise = true
65      parse(tail)
66
67    case ("--help" | "-h") :: tail =>
68      printUsageAndExit(0)
69
70    case ("--verbose" | "-v") :: tail =>
71      logLevel = Level.INFO
72      parse(tail)
73
74    case "launch" :: _master :: _jarUrl :: _mainClass :: tail =>
75      cmd = "launch"
76
77      if (!ClientArguments.isValidJarUrl(_jarUrl)) {
78        // scalastyle:off println
79        println(s"Jar url '${_jarUrl}' is not in valid format.")
80        println(s"Must be a jar file path in URL format " +
81          "(e.g. hdfs://host:port/XX.jar, file:///XX.jar)")
82        // scalastyle:on println
83        printUsageAndExit(-1)
84      }
85
86      jarUrl = _jarUrl
87      masters = Utils.parseStandaloneMasterUrls(_master)
88      mainClass = _mainClass
89      _driverOptions ++= tail
90
91    case "kill" :: _master :: _driverId :: tail =>
92      cmd = "kill"
93      masters = Utils.parseStandaloneMasterUrls(_master)
94      driverId = _driverId
95
96    case _ =>
97      printUsageAndExit(1)
98  }
99
100  /**
101   * Print usage and exit JVM with the given exit code.
102   */
103  private def printUsageAndExit(exitCode: Int) {
104    // TODO: It wouldn't be too hard to allow users to submit their app and dependency jars
105    //       separately similar to in the YARN client.
106    val usage =
107     s"""
108      |Usage: DriverClient [options] launch <active-master> <jar-url> <main-class> [driver options]
109      |Usage: DriverClient kill <active-master> <driver-id>
110      |
111      |Options:
112      |   -c CORES, --cores CORES        Number of cores to request (default: $DEFAULT_CORES)
113      |   -m MEMORY, --memory MEMORY     Megabytes of memory to request (default: $DEFAULT_MEMORY)
114      |   -s, --supervise                Whether to restart the driver on failure
115      |                                  (default: $DEFAULT_SUPERVISE)
116      |   -v, --verbose                  Print more debugging output
117     """.stripMargin
118    // scalastyle:off println
119    System.err.println(usage)
120    // scalastyle:on println
121    System.exit(exitCode)
122  }
123}
124
125private[deploy] object ClientArguments {
126  val DEFAULT_CORES = 1
127  val DEFAULT_MEMORY = Utils.DEFAULT_DRIVER_MEM_MB // MB
128  val DEFAULT_SUPERVISE = false
129
130  def isValidJarUrl(s: String): Boolean = {
131    try {
132      val uri = new URI(s)
133      uri.getScheme != null && uri.getPath != null && uri.getPath.endsWith(".jar")
134    } catch {
135      case _: URISyntaxException => false
136    }
137  }
138}
139