/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.launcher;

import java.io.File;
import java.io.IOException;
import java.util.*;

import static org.apache.spark.launcher.CommandBuilderUtils.*;

/**
 * Special command builder for handling a CLI invocation of SparkSubmit.
 * <p>
 * This builder adds command line parsing compatible with SparkSubmit. It handles setting
 * driver-side options and the special parsing behavior needed for special-casing certain
 * internal Spark applications.
 * <p>
 * This class also has some special features to aid launching shells (pyspark and sparkR)
 * and examples.
 */
class SparkSubmitCommandBuilder extends AbstractCommandBuilder {
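
  // Illustrative sketch of how this builder is typically reached (the plumbing lives in the
  // bin scripts and the launcher's entry point, not in this class; the argument values are
  // hypothetical). The shells put a marker such as PYSPARK_SHELL first on the command line,
  // and the hand-off is conceptually:
  //
  //   List<String> args = Arrays.asList("pyspark-shell-main", "--master", "local[2]");
  //   SparkSubmitCommandBuilder builder = new SparkSubmitCommandBuilder(args);
  //   List<String> cmd = builder.buildCommand(new HashMap<String, String>());
  //
  // The marker selects the shell-specific build path below; everything after it is parsed
  // as spark-submit arguments.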

  /**
   * Name of the app resource used to identify the PySpark shell. The command line parser expects
   * the resource name to be the very first argument to spark-submit in this case.
   *
   * NOTE: this cannot be "pyspark-shell" since that identifies the PySpark shell to SparkSubmit
   * (see java_gateway.py), and can cause this code to enter into an infinite loop.
   */
  static final String PYSPARK_SHELL = "pyspark-shell-main";

  /**
   * This is the actual resource name that identifies the PySpark shell to SparkSubmit.
   */
  static final String PYSPARK_SHELL_RESOURCE = "pyspark-shell";

  /**
   * Name of the app resource used to identify the SparkR shell. The command line parser expects
   * the resource name to be the very first argument to spark-submit in this case.
   *
   * NOTE: this cannot be "sparkr-shell" since that identifies the SparkR shell to SparkSubmit
   * (see sparkR.R), and can cause this code to enter into an infinite loop.
   */
  static final String SPARKR_SHELL = "sparkr-shell-main";

  /**
   * This is the actual resource name that identifies the SparkR shell to SparkSubmit.
   */
  static final String SPARKR_SHELL_RESOURCE = "sparkr-shell";

  /**
   * Name of the app resource used to identify examples. When running examples, args[0] should
   * be this name. The app resource will identify the example class to run.
   */
  static final String RUN_EXAMPLE = "run-example";

  /**
   * Prefix for example class names.
   */
  static final String EXAMPLE_CLASS_PREFIX = "org.apache.spark.examples.";

  /**
   * This map must match the class names of the available special classes, since those classes
   * modify the way command line parsing works. It maps each class name to the resource to use
   * when calling spark-submit.
   */
  private static final Map<String, String> specialClasses = new HashMap<>();
  static {
    specialClasses.put("org.apache.spark.repl.Main", "spark-shell");
    specialClasses.put("org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver",
      SparkLauncher.NO_RESOURCE);
    specialClasses.put("org.apache.spark.sql.hive.thriftserver.HiveThriftServer2",
      SparkLauncher.NO_RESOURCE);
  }

  final List<String> sparkArgs;
  private final boolean isAppResourceReq;
  private final boolean isExample;

  /**
   * Controls whether mixing spark-submit arguments with app arguments is allowed. This is needed
   * to parse the command lines of things like bin/spark-shell, which allows users to mix and
   * match arguments (e.g. "bin/spark-shell SparkShellArg --master foo").
   */
  private boolean allowsMixedArguments;

  SparkSubmitCommandBuilder() {
    this.sparkArgs = new ArrayList<>();
    this.isAppResourceReq = true;
    this.isExample = false;
  }

  SparkSubmitCommandBuilder(List<String> args) {
    this.allowsMixedArguments = false;
    this.sparkArgs = new ArrayList<>();
    boolean isExample = false;
    List<String> submitArgs = args;

    if (args.size() > 0) {
      switch (args.get(0)) {
        case PYSPARK_SHELL:
          this.allowsMixedArguments = true;
          appResource = PYSPARK_SHELL;
          submitArgs = args.subList(1, args.size());
          break;

        case SPARKR_SHELL:
          this.allowsMixedArguments = true;
          appResource = SPARKR_SHELL;
          submitArgs = args.subList(1, args.size());
          break;

        case RUN_EXAMPLE:
          isExample = true;
          submitArgs = args.subList(1, args.size());
      }

      this.isExample = isExample;
      OptionParser parser = new OptionParser();
      parser.parse(submitArgs);
      this.isAppResourceReq = parser.isAppResourceReq;
    } else {
      this.isExample = isExample;
      this.isAppResourceReq = false;
    }
  }

  @Override
  public List<String> buildCommand(Map<String, String> env)
      throws IOException, IllegalArgumentException {
    if (PYSPARK_SHELL.equals(appResource) && isAppResourceReq) {
      return buildPySparkShellCommand(env);
    } else if (SPARKR_SHELL.equals(appResource) && isAppResourceReq) {
      return buildSparkRCommand(env);
    } else {
      return buildSparkSubmitCommand(env);
    }
  }
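
  // Illustrative example of the ordering produced by buildSparkSubmitArgs() below (all values
  // hypothetical): with master = "yarn", a single conf entry spark.executor.cores=2,
  // mainClass = "com.example.MyApp", appResource = "/tmp/my-app.jar", and
  // appArgs = ["input.txt"], the returned list is equivalent to the command line
  //
  //   --master yarn --conf spark.executor.cores=2 --class com.example.MyApp \
  //     /tmp/my-app.jar input.txt
  //
  // i.e. spark-submit options first, then the app resource, then the application arguments.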

  List<String> buildSparkSubmitArgs() {
    List<String> args = new ArrayList<>();
    SparkSubmitOptionParser parser = new SparkSubmitOptionParser();

    if (!allowsMixedArguments && isAppResourceReq) {
      checkArgument(appResource != null, "Missing application resource.");
    }

    if (verbose) {
      args.add(parser.VERBOSE);
    }

    if (master != null) {
      args.add(parser.MASTER);
      args.add(master);
    }

    if (deployMode != null) {
      args.add(parser.DEPLOY_MODE);
      args.add(deployMode);
    }

    if (appName != null) {
      args.add(parser.NAME);
      args.add(appName);
    }

    for (Map.Entry<String, String> e : conf.entrySet()) {
      args.add(parser.CONF);
      args.add(String.format("%s=%s", e.getKey(), e.getValue()));
    }

    if (propertiesFile != null) {
      args.add(parser.PROPERTIES_FILE);
      args.add(propertiesFile);
    }

    if (isExample) {
      jars.addAll(findExamplesJars());
    }

    if (!jars.isEmpty()) {
      args.add(parser.JARS);
      args.add(join(",", jars));
    }

    if (!files.isEmpty()) {
      args.add(parser.FILES);
      args.add(join(",", files));
    }

    if (!pyFiles.isEmpty()) {
      args.add(parser.PY_FILES);
      args.add(join(",", pyFiles));
    }

    if (isAppResourceReq) {
      checkArgument(!isExample || mainClass != null, "Missing example class name.");
    }
    if (mainClass != null) {
      args.add(parser.CLASS);
      args.add(mainClass);
    }

    args.addAll(sparkArgs);
    if (appResource != null) {
      args.add(appResource);
    }
    args.addAll(appArgs);

    return args;
  }

  private List<String> buildSparkSubmitCommand(Map<String, String> env)
      throws IOException, IllegalArgumentException {
    // Load the properties file and check whether spark-submit will be running the app's driver
    // or just launching a cluster app. When running the driver, the JVM arguments will be
    // modified to cover the driver's configuration.
    Map<String, String> config = getEffectiveConfig();
    boolean isClientMode = isClientMode(config);
    String extraClassPath = isClientMode ? config.get(SparkLauncher.DRIVER_EXTRA_CLASSPATH) : null;

    List<String> cmd = buildJavaCommand(extraClassPath);
    // The Thrift Server runs as a daemon, so apply the daemon JVM options to it.
    if (isThriftServer(mainClass)) {
      addOptionString(cmd, System.getenv("SPARK_DAEMON_JAVA_OPTS"));
    }
    addOptionString(cmd, System.getenv("SPARK_SUBMIT_OPTS"));
    addOptionString(cmd, System.getenv("SPARK_JAVA_OPTS"));

    // We don't want the client to specify Xmx. The driver heap size has to be set via the
    // corresponding memory flag --driver-memory or the configuration entry spark.driver.memory.
    String driverExtraJavaOptions = config.get(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS);
    if (!isEmpty(driverExtraJavaOptions) && driverExtraJavaOptions.contains("Xmx")) {
      String msg = String.format("Not allowed to specify max heap (Xmx) memory settings " +
        "through java options (was %s). Use the corresponding --driver-memory or " +
        "spark.driver.memory configuration instead.", driverExtraJavaOptions);
      throw new IllegalArgumentException(msg);
    }

    if (isClientMode) {
      // Figuring out where the memory value comes from is a little tricky due to precedence.
      // Precedence is observed in the following order:
      // - explicit configuration (setConf()), which also covers the --driver-memory cli argument.
      // - properties file.
      // - SPARK_DRIVER_MEMORY env variable
      // - SPARK_MEM env variable
      // - default value (1g)
      // The Thrift Server runs as a daemon, so use the daemon memory setting for it.
      String tsMemory =
        isThriftServer(mainClass) ? System.getenv("SPARK_DAEMON_MEMORY") : null;
      String memory = firstNonEmpty(tsMemory, config.get(SparkLauncher.DRIVER_MEMORY),
        System.getenv("SPARK_DRIVER_MEMORY"), System.getenv("SPARK_MEM"), DEFAULT_MEM);
      cmd.add("-Xmx" + memory);
      addOptionString(cmd, driverExtraJavaOptions);
      mergeEnvPathList(env, getLibPathEnvName(),
        config.get(SparkLauncher.DRIVER_EXTRA_LIBRARY_PATH));
    }

    addPermGenSizeOpt(cmd);
    cmd.add("org.apache.spark.deploy.SparkSubmit");
    cmd.addAll(buildSparkSubmitArgs());
    return cmd;
  }
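
  // A worked example of the driver memory precedence above (all values hypothetical): if the
  // properties file sets spark.driver.memory=2g while SPARK_DRIVER_MEMORY=4g and SPARK_MEM=512m
  // are exported, and neither setConf() nor --driver-memory was used, the effective config wins
  // and the resolution is
  //
  //   firstNonEmpty(null, "2g", "4g", "512m", DEFAULT_MEM)  // -> "2g", i.e. -Xmx2g
  //
  // Only when neither setConf() nor the properties file defines spark.driver.memory do the
  // environment variables (and finally the 1g default) apply.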

  private List<String> buildPySparkShellCommand(Map<String, String> env) throws IOException {
    // For backwards compatibility, if a script is specified in
    // the pyspark command line, then run it using spark-submit.
    if (!appArgs.isEmpty() && appArgs.get(0).endsWith(".py")) {
      System.err.println(
        "Running python applications through 'pyspark' is not supported as of Spark 2.0.\n" +
        "Use ./bin/spark-submit <python file>");
      System.exit(-1);
    }

    checkArgument(appArgs.isEmpty(), "pyspark does not support any application options.");

    // When launching the pyspark shell, the spark-submit arguments should be stored in the
    // PYSPARK_SUBMIT_ARGS env variable.
    appResource = PYSPARK_SHELL_RESOURCE;
    constructEnvVarArgs(env, "PYSPARK_SUBMIT_ARGS");

    // The python binary is picked up in the following order:
    // 1. conf spark.pyspark.driver.python
    // 2. conf spark.pyspark.python
    // 3. environment variable PYSPARK_DRIVER_PYTHON
    // 4. environment variable PYSPARK_PYTHON
    // 5. python
    List<String> pyargs = new ArrayList<>();
    pyargs.add(firstNonEmpty(conf.get(SparkLauncher.PYSPARK_DRIVER_PYTHON),
      conf.get(SparkLauncher.PYSPARK_PYTHON),
      System.getenv("PYSPARK_DRIVER_PYTHON"),
      System.getenv("PYSPARK_PYTHON"),
      "python"));
    String pyOpts = System.getenv("PYSPARK_DRIVER_PYTHON_OPTS");
    if (conf.containsKey(SparkLauncher.PYSPARK_PYTHON)) {
      // Pass conf spark.pyspark.python to python via an environment variable.
      env.put("PYSPARK_PYTHON", conf.get(SparkLauncher.PYSPARK_PYTHON));
    }
    if (!isEmpty(pyOpts)) {
      pyargs.addAll(parseOptionString(pyOpts));
    }

    return pyargs;
  }
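
  // Illustrative sketch of the PySpark hand-off above (all values hypothetical): for an
  // invocation along the lines of `pyspark-shell-main --master yarn`, the env map ends up
  // holding the quoted spark-submit arguments, e.g.
  //
  //   PYSPARK_SUBMIT_ARGS: "--master" "yarn" "pyspark-shell"
  //
  // and, with PYSPARK_DRIVER_PYTHON=ipython exported and no spark.pyspark.* conf set, the
  // returned command is just [ipython]; the python side (see java_gateway.py) reads
  // PYSPARK_SUBMIT_ARGS to launch the JVM.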

  private List<String> buildSparkRCommand(Map<String, String> env) throws IOException {
    if (!appArgs.isEmpty() && appArgs.get(0).endsWith(".R")) {
      System.err.println(
        "Running R applications through 'sparkR' is not supported as of Spark 2.0.\n" +
        "Use ./bin/spark-submit <R file>");
      System.exit(-1);
    }
    // When launching the SparkR shell, store the spark-submit arguments in the
    // SPARKR_SUBMIT_ARGS env variable.
    appResource = SPARKR_SHELL_RESOURCE;
    constructEnvVarArgs(env, "SPARKR_SUBMIT_ARGS");

    // Set shell.R as R_PROFILE_USER to load the SparkR package when the shell comes up.
    String sparkHome = System.getenv("SPARK_HOME");
    env.put("R_PROFILE_USER",
      join(File.separator, sparkHome, "R", "lib", "SparkR", "profile", "shell.R"));

    List<String> args = new ArrayList<>();
    args.add(firstNonEmpty(conf.get(SparkLauncher.SPARKR_R_SHELL),
      System.getenv("SPARKR_DRIVER_R"), "R"));
    return args;
  }

  private void constructEnvVarArgs(
      Map<String, String> env,
      String submitArgsEnvVariable) throws IOException {
    mergeEnvPathList(env, getLibPathEnvName(),
      getEffectiveConfig().get(SparkLauncher.DRIVER_EXTRA_LIBRARY_PATH));

    StringBuilder submitArgs = new StringBuilder();
    for (String arg : buildSparkSubmitArgs()) {
      if (submitArgs.length() > 0) {
        submitArgs.append(" ");
      }
      submitArgs.append(quoteForCommandString(arg));
    }
    env.put(submitArgsEnvVariable, submitArgs.toString());
  }

  private boolean isClientMode(Map<String, String> userProps) {
    String userMaster = firstNonEmpty(master, userProps.get(SparkLauncher.SPARK_MASTER));
    String userDeployMode = firstNonEmpty(deployMode, userProps.get(SparkLauncher.DEPLOY_MODE));
    // Default master is "local[*]", so assume client mode in that case.
    return userMaster == null ||
      "client".equals(userDeployMode) ||
      (!userMaster.equals("yarn-cluster") && userDeployMode == null);
  }

  /**
   * Return whether the given main class represents a thrift server.
   */
  private boolean isThriftServer(String mainClass) {
    return (mainClass != null &&
      mainClass.equals("org.apache.spark.sql.hive.thriftserver.HiveThriftServer2"));
  }

  private List<String> findExamplesJars() {
    boolean isTesting = "1".equals(getenv("SPARK_TESTING"));
    List<String> examplesJars = new ArrayList<>();
    String sparkHome = getSparkHome();

    File jarsDir;
    if (new File(sparkHome, "RELEASE").isFile()) {
      jarsDir = new File(sparkHome, "examples/jars");
    } else {
      jarsDir = new File(sparkHome,
        String.format("examples/target/scala-%s/jars", getScalaVersion()));
    }

    boolean foundDir = jarsDir.isDirectory();
    checkState(isTesting || foundDir, "Examples jars directory '%s' does not exist.",
      jarsDir.getAbsolutePath());

    if (foundDir) {
      for (File f: jarsDir.listFiles()) {
        examplesJars.add(f.getAbsolutePath());
      }
    }
    return examplesJars;
  }
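
  // Example of the two parsing modes implemented by the OptionParser below (command lines are
  // hypothetical). In mixed mode (enabled for the shells and for the special classes, e.g.
  // --class org.apache.spark.repl.Main), a line such as
  //
  //   --master yarn myShellArg --name demo
  //
  // keeps "myShellArg" as an app argument while --master and --name remain spark-submit
  // options. In normal mode, the first unrecognized bare argument becomes the app resource,
  // parsing stops, and everything after it is forwarded to the application untouched.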

  private class OptionParser extends SparkSubmitOptionParser {

    boolean isAppResourceReq = true;

    @Override
    protected boolean handle(String opt, String value) {
      if (opt.equals(MASTER)) {
        master = value;
      } else if (opt.equals(DEPLOY_MODE)) {
        deployMode = value;
      } else if (opt.equals(PROPERTIES_FILE)) {
        propertiesFile = value;
      } else if (opt.equals(DRIVER_MEMORY)) {
        conf.put(SparkLauncher.DRIVER_MEMORY, value);
      } else if (opt.equals(DRIVER_JAVA_OPTIONS)) {
        conf.put(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS, value);
      } else if (opt.equals(DRIVER_LIBRARY_PATH)) {
        conf.put(SparkLauncher.DRIVER_EXTRA_LIBRARY_PATH, value);
      } else if (opt.equals(DRIVER_CLASS_PATH)) {
        conf.put(SparkLauncher.DRIVER_EXTRA_CLASSPATH, value);
      } else if (opt.equals(CONF)) {
        String[] setConf = value.split("=", 2);
        checkArgument(setConf.length == 2, "Invalid argument to %s: %s", CONF, value);
        conf.put(setConf[0], setConf[1]);
      } else if (opt.equals(CLASS)) {
        // The special classes require some special command line handling, since they allow
        // mixing spark-submit arguments with arguments that should be propagated to the shell
        // itself. Note that for this to work, the "--class" argument must come before any
        // non-spark-submit arguments.
        mainClass = value;
        if (specialClasses.containsKey(value)) {
          allowsMixedArguments = true;
          appResource = specialClasses.get(value);
        }
      } else if (opt.equals(KILL_SUBMISSION) || opt.equals(STATUS)) {
        isAppResourceReq = false;
        sparkArgs.add(opt);
        sparkArgs.add(value);
      } else if (opt.equals(HELP) || opt.equals(USAGE_ERROR)) {
        isAppResourceReq = false;
        sparkArgs.add(opt);
      } else if (opt.equals(VERSION)) {
        isAppResourceReq = false;
        sparkArgs.add(opt);
      } else {
        sparkArgs.add(opt);
        if (value != null) {
          sparkArgs.add(value);
        }
      }
      return true;
    }

    @Override
    protected boolean handleUnknown(String opt) {
      // When mixing arguments, add unrecognized parameters directly to the user arguments list.
      // In normal mode, any unrecognized parameter triggers the end of command line parsing, and
      // the parameter itself will be interpreted by SparkSubmit as the application resource. The
      // remaining params will be passed through to the application as its arguments.
      if (allowsMixedArguments) {
        appArgs.add(opt);
        return true;
      } else if (isExample) {
        String className = opt;
        if (!className.startsWith(EXAMPLE_CLASS_PREFIX)) {
          className = EXAMPLE_CLASS_PREFIX + className;
        }
        mainClass = className;
        appResource = SparkLauncher.NO_RESOURCE;
        return false;
      } else {
        checkArgument(!opt.startsWith("-"), "Unrecognized option: %s", opt);
        checkState(appResource == null, "Found unrecognized argument but resource is already set.");
        appResource = opt;
        return false;
      }
    }

    @Override
    protected void handleExtraArgs(List<String> extra) {
      appArgs.addAll(extra);
    }

  }

}