1 /* 2 * Licensed to the Apache Software Foundation (ASF) under one or more 3 * contributor license agreements. See the NOTICE file distributed with 4 * this work for additional information regarding copyright ownership. 5 * The ASF licenses this file to You under the Apache License, Version 2.0 6 * (the "License"); you may not use this file except in compliance with 7 * the License. You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 */ 17 18 package org.apache.spark.launcher; 19 20 import java.io.File; 21 import java.io.IOException; 22 import java.util.ArrayList; 23 import java.util.Arrays; 24 import java.util.HashMap; 25 import java.util.List; 26 import java.util.Map; 27 import java.util.concurrent.ThreadFactory; 28 import java.util.concurrent.atomic.AtomicInteger; 29 30 import static org.apache.spark.launcher.CommandBuilderUtils.*; 31 32 /** 33 * Launcher for Spark applications. 34 * <p> 35 * Use this class to start Spark applications programmatically. The class uses a builder pattern 36 * to allow clients to configure the Spark application and launch it as a child process. 37 * </p> 38 */ 39 public class SparkLauncher { 40 41 /** The Spark master. */ 42 public static final String SPARK_MASTER = "spark.master"; 43 44 /** The Spark deploy mode. */ 45 public static final String DEPLOY_MODE = "spark.submit.deployMode"; 46 47 /** Configuration key for the driver memory. */ 48 public static final String DRIVER_MEMORY = "spark.driver.memory"; 49 /** Configuration key for the driver class path. */ 50 public static final String DRIVER_EXTRA_CLASSPATH = "spark.driver.extraClassPath"; 51 /** Configuration key for the driver VM options. */ 52 public static final String DRIVER_EXTRA_JAVA_OPTIONS = "spark.driver.extraJavaOptions"; 53 /** Configuration key for the driver native library path. */ 54 public static final String DRIVER_EXTRA_LIBRARY_PATH = "spark.driver.extraLibraryPath"; 55 56 /** Configuration key for the executor memory. */ 57 public static final String EXECUTOR_MEMORY = "spark.executor.memory"; 58 /** Configuration key for the executor class path. */ 59 public static final String EXECUTOR_EXTRA_CLASSPATH = "spark.executor.extraClassPath"; 60 /** Configuration key for the executor VM options. */ 61 public static final String EXECUTOR_EXTRA_JAVA_OPTIONS = "spark.executor.extraJavaOptions"; 62 /** Configuration key for the executor native library path. */ 63 public static final String EXECUTOR_EXTRA_LIBRARY_PATH = "spark.executor.extraLibraryPath"; 64 /** Configuration key for the number of executor CPU cores. */ 65 public static final String EXECUTOR_CORES = "spark.executor.cores"; 66 67 static final String PYSPARK_DRIVER_PYTHON = "spark.pyspark.driver.python"; 68 69 static final String PYSPARK_PYTHON = "spark.pyspark.python"; 70 71 static final String SPARKR_R_SHELL = "spark.r.shell.command"; 72 73 /** Logger name to use when launching a child process. */ 74 public static final String CHILD_PROCESS_LOGGER_NAME = "spark.launcher.childProcLoggerName"; 75 76 /** 77 * A special value for the resource that tells Spark to not try to process the app resource as a 78 * file. This is useful when the class being executed is added to the application using other 79 * means - for example, by adding jars using the package download feature. 80 */ 81 public static final String NO_RESOURCE = "spark-internal"; 82 83 /** 84 * Maximum time (in ms) to wait for a child process to connect back to the launcher server 85 * when using @link{#start()}. 86 */ 87 public static final String CHILD_CONNECTION_TIMEOUT = "spark.launcher.childConectionTimeout"; 88 89 /** Used internally to create unique logger names. */ 90 private static final AtomicInteger COUNTER = new AtomicInteger(); 91 92 /** Factory for creating OutputRedirector threads. **/ 93 static final ThreadFactory REDIRECTOR_FACTORY = new NamedThreadFactory("launcher-proc-%d"); 94 95 static final Map<String, String> launcherConfig = new HashMap<>(); 96 97 /** 98 * Set a configuration value for the launcher library. These config values do not affect the 99 * launched application, but rather the behavior of the launcher library itself when managing 100 * applications. 101 * 102 * @since 1.6.0 103 * @param name Config name. 104 * @param value Config value. 105 */ setConfig(String name, String value)106 public static void setConfig(String name, String value) { 107 launcherConfig.put(name, value); 108 } 109 110 // Visible for testing. 111 final SparkSubmitCommandBuilder builder; 112 File workingDir; 113 boolean redirectToLog; 114 boolean redirectErrorStream; 115 ProcessBuilder.Redirect errorStream; 116 ProcessBuilder.Redirect outputStream; 117 SparkLauncher()118 public SparkLauncher() { 119 this(null); 120 } 121 122 /** 123 * Creates a launcher that will set the given environment variables in the child. 124 * 125 * @param env Environment variables to set. 126 */ SparkLauncher(Map<String, String> env)127 public SparkLauncher(Map<String, String> env) { 128 this.builder = new SparkSubmitCommandBuilder(); 129 if (env != null) { 130 this.builder.childEnv.putAll(env); 131 } 132 } 133 134 /** 135 * Set a custom JAVA_HOME for launching the Spark application. 136 * 137 * @param javaHome Path to the JAVA_HOME to use. 138 * @return This launcher. 139 */ setJavaHome(String javaHome)140 public SparkLauncher setJavaHome(String javaHome) { 141 checkNotNull(javaHome, "javaHome"); 142 builder.javaHome = javaHome; 143 return this; 144 } 145 146 /** 147 * Set a custom Spark installation location for the application. 148 * 149 * @param sparkHome Path to the Spark installation to use. 150 * @return This launcher. 151 */ setSparkHome(String sparkHome)152 public SparkLauncher setSparkHome(String sparkHome) { 153 checkNotNull(sparkHome, "sparkHome"); 154 builder.childEnv.put(ENV_SPARK_HOME, sparkHome); 155 return this; 156 } 157 158 /** 159 * Set a custom properties file with Spark configuration for the application. 160 * 161 * @param path Path to custom properties file to use. 162 * @return This launcher. 163 */ setPropertiesFile(String path)164 public SparkLauncher setPropertiesFile(String path) { 165 checkNotNull(path, "path"); 166 builder.setPropertiesFile(path); 167 return this; 168 } 169 170 /** 171 * Set a single configuration value for the application. 172 * 173 * @param key Configuration key. 174 * @param value The value to use. 175 * @return This launcher. 176 */ setConf(String key, String value)177 public SparkLauncher setConf(String key, String value) { 178 checkNotNull(key, "key"); 179 checkNotNull(value, "value"); 180 checkArgument(key.startsWith("spark."), "'key' must start with 'spark.'"); 181 builder.conf.put(key, value); 182 return this; 183 } 184 185 /** 186 * Set the application name. 187 * 188 * @param appName Application name. 189 * @return This launcher. 190 */ setAppName(String appName)191 public SparkLauncher setAppName(String appName) { 192 checkNotNull(appName, "appName"); 193 builder.appName = appName; 194 return this; 195 } 196 197 /** 198 * Set the Spark master for the application. 199 * 200 * @param master Spark master. 201 * @return This launcher. 202 */ setMaster(String master)203 public SparkLauncher setMaster(String master) { 204 checkNotNull(master, "master"); 205 builder.master = master; 206 return this; 207 } 208 209 /** 210 * Set the deploy mode for the application. 211 * 212 * @param mode Deploy mode. 213 * @return This launcher. 214 */ setDeployMode(String mode)215 public SparkLauncher setDeployMode(String mode) { 216 checkNotNull(mode, "mode"); 217 builder.deployMode = mode; 218 return this; 219 } 220 221 /** 222 * Set the main application resource. This should be the location of a jar file for Scala/Java 223 * applications, or a python script for PySpark applications. 224 * 225 * @param resource Path to the main application resource. 226 * @return This launcher. 227 */ setAppResource(String resource)228 public SparkLauncher setAppResource(String resource) { 229 checkNotNull(resource, "resource"); 230 builder.appResource = resource; 231 return this; 232 } 233 234 /** 235 * Sets the application class name for Java/Scala applications. 236 * 237 * @param mainClass Application's main class. 238 * @return This launcher. 239 */ setMainClass(String mainClass)240 public SparkLauncher setMainClass(String mainClass) { 241 checkNotNull(mainClass, "mainClass"); 242 builder.mainClass = mainClass; 243 return this; 244 } 245 246 /** 247 * Adds a no-value argument to the Spark invocation. If the argument is known, this method 248 * validates whether the argument is indeed a no-value argument, and throws an exception 249 * otherwise. 250 * <p> 251 * Use this method with caution. It is possible to create an invalid Spark command by passing 252 * unknown arguments to this method, since those are allowed for forward compatibility. 253 * 254 * @since 1.5.0 255 * @param arg Argument to add. 256 * @return This launcher. 257 */ addSparkArg(String arg)258 public SparkLauncher addSparkArg(String arg) { 259 SparkSubmitOptionParser validator = new ArgumentValidator(false); 260 validator.parse(Arrays.asList(arg)); 261 builder.sparkArgs.add(arg); 262 return this; 263 } 264 265 /** 266 * Adds an argument with a value to the Spark invocation. If the argument name corresponds to 267 * a known argument, the code validates that the argument actually expects a value, and throws 268 * an exception otherwise. 269 * <p> 270 * It is safe to add arguments modified by other methods in this class (such as 271 * {@link #setMaster(String)} - the last invocation will be the one to take effect. 272 * <p> 273 * Use this method with caution. It is possible to create an invalid Spark command by passing 274 * unknown arguments to this method, since those are allowed for forward compatibility. 275 * 276 * @since 1.5.0 277 * @param name Name of argument to add. 278 * @param value Value of the argument. 279 * @return This launcher. 280 */ addSparkArg(String name, String value)281 public SparkLauncher addSparkArg(String name, String value) { 282 SparkSubmitOptionParser validator = new ArgumentValidator(true); 283 if (validator.MASTER.equals(name)) { 284 setMaster(value); 285 } else if (validator.PROPERTIES_FILE.equals(name)) { 286 setPropertiesFile(value); 287 } else if (validator.CONF.equals(name)) { 288 String[] vals = value.split("=", 2); 289 setConf(vals[0], vals[1]); 290 } else if (validator.CLASS.equals(name)) { 291 setMainClass(value); 292 } else if (validator.JARS.equals(name)) { 293 builder.jars.clear(); 294 for (String jar : value.split(",")) { 295 addJar(jar); 296 } 297 } else if (validator.FILES.equals(name)) { 298 builder.files.clear(); 299 for (String file : value.split(",")) { 300 addFile(file); 301 } 302 } else if (validator.PY_FILES.equals(name)) { 303 builder.pyFiles.clear(); 304 for (String file : value.split(",")) { 305 addPyFile(file); 306 } 307 } else { 308 validator.parse(Arrays.asList(name, value)); 309 builder.sparkArgs.add(name); 310 builder.sparkArgs.add(value); 311 } 312 return this; 313 } 314 315 /** 316 * Adds command line arguments for the application. 317 * 318 * @param args Arguments to pass to the application's main class. 319 * @return This launcher. 320 */ addAppArgs(String... args)321 public SparkLauncher addAppArgs(String... args) { 322 for (String arg : args) { 323 checkNotNull(arg, "arg"); 324 builder.appArgs.add(arg); 325 } 326 return this; 327 } 328 329 /** 330 * Adds a jar file to be submitted with the application. 331 * 332 * @param jar Path to the jar file. 333 * @return This launcher. 334 */ addJar(String jar)335 public SparkLauncher addJar(String jar) { 336 checkNotNull(jar, "jar"); 337 builder.jars.add(jar); 338 return this; 339 } 340 341 /** 342 * Adds a file to be submitted with the application. 343 * 344 * @param file Path to the file. 345 * @return This launcher. 346 */ addFile(String file)347 public SparkLauncher addFile(String file) { 348 checkNotNull(file, "file"); 349 builder.files.add(file); 350 return this; 351 } 352 353 /** 354 * Adds a python file / zip / egg to be submitted with the application. 355 * 356 * @param file Path to the file. 357 * @return This launcher. 358 */ addPyFile(String file)359 public SparkLauncher addPyFile(String file) { 360 checkNotNull(file, "file"); 361 builder.pyFiles.add(file); 362 return this; 363 } 364 365 /** 366 * Enables verbose reporting for SparkSubmit. 367 * 368 * @param verbose Whether to enable verbose output. 369 * @return This launcher. 370 */ setVerbose(boolean verbose)371 public SparkLauncher setVerbose(boolean verbose) { 372 builder.verbose = verbose; 373 return this; 374 } 375 376 /** 377 * Sets the working directory of spark-submit. 378 * 379 * @param dir The directory to set as spark-submit's working directory. 380 * @return This launcher. 381 */ directory(File dir)382 public SparkLauncher directory(File dir) { 383 workingDir = dir; 384 return this; 385 } 386 387 /** 388 * Specifies that stderr in spark-submit should be redirected to stdout. 389 * 390 * @return This launcher. 391 */ redirectError()392 public SparkLauncher redirectError() { 393 redirectErrorStream = true; 394 return this; 395 } 396 397 /** 398 * Redirects error output to the specified Redirect. 399 * 400 * @param to The method of redirection. 401 * @return This launcher. 402 */ redirectError(ProcessBuilder.Redirect to)403 public SparkLauncher redirectError(ProcessBuilder.Redirect to) { 404 errorStream = to; 405 return this; 406 } 407 408 /** 409 * Redirects standard output to the specified Redirect. 410 * 411 * @param to The method of redirection. 412 * @return This launcher. 413 */ redirectOutput(ProcessBuilder.Redirect to)414 public SparkLauncher redirectOutput(ProcessBuilder.Redirect to) { 415 outputStream = to; 416 return this; 417 } 418 419 /** 420 * Redirects error output to the specified File. 421 * 422 * @param errFile The file to which stderr is written. 423 * @return This launcher. 424 */ redirectError(File errFile)425 public SparkLauncher redirectError(File errFile) { 426 errorStream = ProcessBuilder.Redirect.to(errFile); 427 return this; 428 } 429 430 /** 431 * Redirects error output to the specified File. 432 * 433 * @param outFile The file to which stdout is written. 434 * @return This launcher. 435 */ redirectOutput(File outFile)436 public SparkLauncher redirectOutput(File outFile) { 437 outputStream = ProcessBuilder.Redirect.to(outFile); 438 return this; 439 } 440 441 /** 442 * Sets all output to be logged and redirected to a logger with the specified name. 443 * 444 * @param loggerName The name of the logger to log stdout and stderr. 445 * @return This launcher. 446 */ redirectToLog(String loggerName)447 public SparkLauncher redirectToLog(String loggerName) { 448 setConf(CHILD_PROCESS_LOGGER_NAME, loggerName); 449 redirectToLog = true; 450 return this; 451 } 452 453 /** 454 * Launches a sub-process that will start the configured Spark application. 455 * <p> 456 * The {@link #startApplication(SparkAppHandle.Listener...)} method is preferred when launching 457 * Spark, since it provides better control of the child application. 458 * 459 * @return A process handle for the Spark app. 460 */ launch()461 public Process launch() throws IOException { 462 Process childProc = createBuilder().start(); 463 if (redirectToLog) { 464 String loggerName = builder.getEffectiveConfig().get(CHILD_PROCESS_LOGGER_NAME); 465 new OutputRedirector(childProc.getInputStream(), loggerName, REDIRECTOR_FACTORY); 466 } 467 return childProc; 468 } 469 470 /** 471 * Starts a Spark application. 472 * <p> 473 * This method returns a handle that provides information about the running application and can 474 * be used to do basic interaction with it. 475 * <p> 476 * The returned handle assumes that the application will instantiate a single SparkContext 477 * during its lifetime. Once that context reports a final state (one that indicates the 478 * SparkContext has stopped), the handle will not perform new state transitions, so anything 479 * that happens after that cannot be monitored. If the underlying application is launched as 480 * a child process, {@link SparkAppHandle#kill()} can still be used to kill the child process. 481 * <p> 482 * Currently, all applications are launched as child processes. The child's stdout and stderr 483 * are merged and written to a logger (see <code>java.util.logging</code>) only if redirection 484 * has not otherwise been configured on this <code>SparkLauncher</code>. The logger's name can be 485 * defined by setting {@link #CHILD_PROCESS_LOGGER_NAME} in the app's configuration. If that 486 * option is not set, the code will try to derive a name from the application's name or main 487 * class / script file. If those cannot be determined, an internal, unique name will be used. 488 * In all cases, the logger name will start with "org.apache.spark.launcher.app", to fit more 489 * easily into the configuration of commonly-used logging systems. 490 * 491 * @since 1.6.0 492 * @param listeners Listeners to add to the handle before the app is launched. 493 * @return A handle for the launched application. 494 */ startApplication(SparkAppHandle.Listener... listeners)495 public SparkAppHandle startApplication(SparkAppHandle.Listener... listeners) throws IOException { 496 ChildProcAppHandle handle = LauncherServer.newAppHandle(); 497 for (SparkAppHandle.Listener l : listeners) { 498 handle.addListener(l); 499 } 500 501 String loggerName = builder.getEffectiveConfig().get(CHILD_PROCESS_LOGGER_NAME); 502 ProcessBuilder pb = createBuilder(); 503 // Only setup stderr + stdout to logger redirection if user has not otherwise configured output 504 // redirection. 505 if (loggerName == null) { 506 String appName = builder.getEffectiveConfig().get(CHILD_PROCESS_LOGGER_NAME); 507 if (appName == null) { 508 if (builder.appName != null) { 509 appName = builder.appName; 510 } else if (builder.mainClass != null) { 511 int dot = builder.mainClass.lastIndexOf("."); 512 if (dot >= 0 && dot < builder.mainClass.length() - 1) { 513 appName = builder.mainClass.substring(dot + 1, builder.mainClass.length()); 514 } else { 515 appName = builder.mainClass; 516 } 517 } else if (builder.appResource != null) { 518 appName = new File(builder.appResource).getName(); 519 } else { 520 appName = String.valueOf(COUNTER.incrementAndGet()); 521 } 522 } 523 String loggerPrefix = getClass().getPackage().getName(); 524 loggerName = String.format("%s.app.%s", loggerPrefix, appName); 525 pb.redirectErrorStream(true); 526 } 527 528 pb.environment().put(LauncherProtocol.ENV_LAUNCHER_PORT, 529 String.valueOf(LauncherServer.getServerInstance().getPort())); 530 pb.environment().put(LauncherProtocol.ENV_LAUNCHER_SECRET, handle.getSecret()); 531 try { 532 handle.setChildProc(pb.start(), loggerName); 533 } catch (IOException ioe) { 534 handle.kill(); 535 throw ioe; 536 } 537 538 return handle; 539 } 540 createBuilder()541 private ProcessBuilder createBuilder() { 542 List<String> cmd = new ArrayList<>(); 543 String script = isWindows() ? "spark-submit.cmd" : "spark-submit"; 544 cmd.add(join(File.separator, builder.getSparkHome(), "bin", script)); 545 cmd.addAll(builder.buildSparkSubmitArgs()); 546 547 // Since the child process is a batch script, let's quote things so that special characters are 548 // preserved, otherwise the batch interpreter will mess up the arguments. Batch scripts are 549 // weird. 550 if (isWindows()) { 551 List<String> winCmd = new ArrayList<>(); 552 for (String arg : cmd) { 553 winCmd.add(quoteForBatchScript(arg)); 554 } 555 cmd = winCmd; 556 } 557 558 ProcessBuilder pb = new ProcessBuilder(cmd.toArray(new String[cmd.size()])); 559 for (Map.Entry<String, String> e : builder.childEnv.entrySet()) { 560 pb.environment().put(e.getKey(), e.getValue()); 561 } 562 563 if (workingDir != null) { 564 pb.directory(workingDir); 565 } 566 567 // Only one of redirectError and redirectError(...) can be specified. 568 // Similarly, if redirectToLog is specified, no other redirections should be specified. 569 checkState(!redirectErrorStream || errorStream == null, 570 "Cannot specify both redirectError() and redirectError(...) "); 571 checkState(!redirectToLog || 572 (!redirectErrorStream && errorStream == null && outputStream == null), 573 "Cannot used redirectToLog() in conjunction with other redirection methods."); 574 575 if (redirectErrorStream || redirectToLog) { 576 pb.redirectErrorStream(true); 577 } 578 if (errorStream != null) { 579 pb.redirectError(errorStream); 580 } 581 if (outputStream != null) { 582 pb.redirectOutput(outputStream); 583 } 584 585 return pb; 586 } 587 588 private static class ArgumentValidator extends SparkSubmitOptionParser { 589 590 private final boolean hasValue; 591 ArgumentValidator(boolean hasValue)592 ArgumentValidator(boolean hasValue) { 593 this.hasValue = hasValue; 594 } 595 596 @Override handle(String opt, String value)597 protected boolean handle(String opt, String value) { 598 if (value == null && hasValue) { 599 throw new IllegalArgumentException(String.format("'%s' does not expect a value.", opt)); 600 } 601 return true; 602 } 603 604 @Override handleUnknown(String opt)605 protected boolean handleUnknown(String opt) { 606 // Do not fail on unknown arguments, to support future arguments added to SparkSubmit. 607 return true; 608 } 609 handleExtraArgs(List<String> extra)610 protected void handleExtraArgs(List<String> extra) { 611 // No op. 612 } 613 614 } 615 616 } 617