/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.launcher;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

import static org.apache.spark.launcher.CommandBuilderUtils.*;

/**
 * Launcher for Spark applications.
 * <p>
 * Use this class to start Spark applications programmatically. The class uses a builder pattern
 * to allow clients to configure the Spark application and launch it as a child process.
 * </p>
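 * <p>
 * A minimal usage sketch; the paths, class name, and master value below are illustrative,
 * not defaults of this library (exception handling elided):
 * </p>
 * <pre>
 * {@code
 *   SparkAppHandle handle = new SparkLauncher()
 *     .setSparkHome("/path/to/spark")
 *     .setAppResource("/path/to/app.jar")
 *     .setMainClass("com.example.MyApp")
 *     .setMaster("local[*]")
 *     .startApplication();
 * }
 * </pre>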
 */
public class SparkLauncher {

  /** The Spark master. */
  public static final String SPARK_MASTER = "spark.master";

  /** The Spark deploy mode. */
  public static final String DEPLOY_MODE = "spark.submit.deployMode";

  /** Configuration key for the driver memory. */
  public static final String DRIVER_MEMORY = "spark.driver.memory";
  /** Configuration key for the driver class path. */
  public static final String DRIVER_EXTRA_CLASSPATH = "spark.driver.extraClassPath";
  /** Configuration key for the driver VM options. */
  public static final String DRIVER_EXTRA_JAVA_OPTIONS = "spark.driver.extraJavaOptions";
  /** Configuration key for the driver native library path. */
  public static final String DRIVER_EXTRA_LIBRARY_PATH = "spark.driver.extraLibraryPath";

  /** Configuration key for the executor memory. */
  public static final String EXECUTOR_MEMORY = "spark.executor.memory";
  /** Configuration key for the executor class path. */
  public static final String EXECUTOR_EXTRA_CLASSPATH = "spark.executor.extraClassPath";
  /** Configuration key for the executor VM options. */
  public static final String EXECUTOR_EXTRA_JAVA_OPTIONS = "spark.executor.extraJavaOptions";
  /** Configuration key for the executor native library path. */
  public static final String EXECUTOR_EXTRA_LIBRARY_PATH = "spark.executor.extraLibraryPath";
  /** Configuration key for the number of executor CPU cores. */
  public static final String EXECUTOR_CORES = "spark.executor.cores";

  static final String PYSPARK_DRIVER_PYTHON = "spark.pyspark.driver.python";

  static final String PYSPARK_PYTHON = "spark.pyspark.python";

  static final String SPARKR_R_SHELL = "spark.r.shell.command";

  /** Logger name to use when launching a child process. */
  public static final String CHILD_PROCESS_LOGGER_NAME = "spark.launcher.childProcLoggerName";

  /**
   * A special value for the resource that tells Spark to not try to process the app resource as a
   * file. This is useful when the class being executed is added to the application using other
   * means - for example, by adding jars using the package download feature.
   */
  public static final String NO_RESOURCE = "spark-internal";

  /**
   * Maximum time (in ms) to wait for a child process to connect back to the launcher server
   * when using {@link #startApplication(SparkAppHandle.Listener...)}. Note that the misspelled
   * key ("Conection") is kept as-is; it is the literal configuration name the launcher reads.
   */
  public static final String CHILD_CONNECTION_TIMEOUT = "spark.launcher.childConectionTimeout";

  /** Used internally to create unique logger names. */
  private static final AtomicInteger COUNTER = new AtomicInteger();

  /** Factory for creating OutputRedirector threads. */
  static final ThreadFactory REDIRECTOR_FACTORY = new NamedThreadFactory("launcher-proc-%d");

  static final Map<String, String> launcherConfig = new HashMap<>();
96 
97   /**
98    * Set a configuration value for the launcher library. These config values do not affect the
99    * launched application, but rather the behavior of the launcher library itself when managing
100    * applications.
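   * <p>
   * For example, to raise the child connection timeout (the value shown is illustrative):
   * </p>
   * <pre>
   * {@code
   * SparkLauncher.setConfig(SparkLauncher.CHILD_CONNECTION_TIMEOUT, "10000");
   * }
   * </pre>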
   *
   * @since 1.6.0
   * @param name Config name.
   * @param value Config value.
   */
  public static void setConfig(String name, String value) {
    launcherConfig.put(name, value);
  }

  // Visible for testing.
  final SparkSubmitCommandBuilder builder;
  File workingDir;
  boolean redirectToLog;
  boolean redirectErrorStream;
  ProcessBuilder.Redirect errorStream;
  ProcessBuilder.Redirect outputStream;

  public SparkLauncher() {
    this(null);
  }

  /**
   * Creates a launcher that will set the given environment variables in the child.
   *
   * @param env Environment variables to set.
   */
  public SparkLauncher(Map<String, String> env) {
    this.builder = new SparkSubmitCommandBuilder();
    if (env != null) {
      this.builder.childEnv.putAll(env);
    }
  }

  /**
   * Set a custom JAVA_HOME for launching the Spark application.
   *
   * @param javaHome Path to the JAVA_HOME to use.
   * @return This launcher.
   */
  public SparkLauncher setJavaHome(String javaHome) {
    checkNotNull(javaHome, "javaHome");
    builder.javaHome = javaHome;
    return this;
  }

  /**
   * Set a custom Spark installation location for the application.
   *
   * @param sparkHome Path to the Spark installation to use.
   * @return This launcher.
   */
  public SparkLauncher setSparkHome(String sparkHome) {
    checkNotNull(sparkHome, "sparkHome");
    builder.childEnv.put(ENV_SPARK_HOME, sparkHome);
    return this;
  }

  /**
   * Set a custom properties file with Spark configuration for the application.
   *
   * @param path Path to custom properties file to use.
   * @return This launcher.
   */
  public SparkLauncher setPropertiesFile(String path) {
    checkNotNull(path, "path");
    builder.setPropertiesFile(path);
    return this;
  }

  /**
   * Set a single configuration value for the application.
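   * <p>
   * For example (the key and value shown are illustrative):
   * </p>
   * <pre>
   * {@code
   * launcher.setConf(SparkLauncher.DRIVER_MEMORY, "2g");
   * }
   * </pre>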
   *
   * @param key Configuration key.
   * @param value The value to use.
   * @return This launcher.
   */
  public SparkLauncher setConf(String key, String value) {
    checkNotNull(key, "key");
    checkNotNull(value, "value");
    checkArgument(key.startsWith("spark."), "'key' must start with 'spark.'");
    builder.conf.put(key, value);
    return this;
  }

  /**
   * Set the application name.
   *
   * @param appName Application name.
   * @return This launcher.
   */
  public SparkLauncher setAppName(String appName) {
    checkNotNull(appName, "appName");
    builder.appName = appName;
    return this;
  }

  /**
   * Set the Spark master for the application.
   *
   * @param master Spark master.
   * @return This launcher.
   */
  public SparkLauncher setMaster(String master) {
    checkNotNull(master, "master");
    builder.master = master;
    return this;
  }

  /**
   * Set the deploy mode for the application.
   *
   * @param mode Deploy mode.
   * @return This launcher.
   */
  public SparkLauncher setDeployMode(String mode) {
    checkNotNull(mode, "mode");
    builder.deployMode = mode;
    return this;
  }

  /**
   * Set the main application resource. This should be the location of a jar file for Scala/Java
   * applications, or a python script for PySpark applications.
   *
   * @param resource Path to the main application resource.
   * @return This launcher.
   */
  public SparkLauncher setAppResource(String resource) {
    checkNotNull(resource, "resource");
    builder.appResource = resource;
    return this;
  }

  /**
   * Sets the application class name for Java/Scala applications.
   *
   * @param mainClass Application's main class.
   * @return This launcher.
   */
  public SparkLauncher setMainClass(String mainClass) {
    checkNotNull(mainClass, "mainClass");
    builder.mainClass = mainClass;
    return this;
  }

  /**
   * Adds a no-value argument to the Spark invocation. If the argument is known, this method
   * validates whether the argument is indeed a no-value argument, and throws an exception
   * otherwise.
   * <p>
   * Use this method with caution. It is possible to create an invalid Spark command by passing
   * unknown arguments to this method, since those are allowed for forward compatibility.
   *
   * @since 1.5.0
   * @param arg Argument to add.
   * @return This launcher.
   */
  public SparkLauncher addSparkArg(String arg) {
    SparkSubmitOptionParser validator = new ArgumentValidator(false);
    validator.parse(Arrays.asList(arg));
    builder.sparkArgs.add(arg);
    return this;
  }

  /**
   * Adds an argument with a value to the Spark invocation. If the argument name corresponds to
   * a known argument, the code validates that the argument actually expects a value, and throws
   * an exception otherwise.
   * <p>
   * It is safe to add arguments modified by other methods in this class (such as
   * {@link #setMaster(String)}); the last invocation will be the one to take effect.
   * <p>
   * Use this method with caution. It is possible to create an invalid Spark command by passing
   * unknown arguments to this method, since those are allowed for forward compatibility.
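   * <p>
   * For example (the option and value shown are illustrative):
   * </p>
   * <pre>
   * {@code
   * launcher.addSparkArg("--driver-memory", "2g");
   * }
   * </pre>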
   *
   * @since 1.5.0
   * @param name Name of argument to add.
   * @param value Value of the argument.
   * @return This launcher.
   */
  public SparkLauncher addSparkArg(String name, String value) {
    SparkSubmitOptionParser validator = new ArgumentValidator(true);
    if (validator.MASTER.equals(name)) {
      setMaster(value);
    } else if (validator.PROPERTIES_FILE.equals(name)) {
      setPropertiesFile(value);
    } else if (validator.CONF.equals(name)) {
      String[] vals = value.split("=", 2);
      setConf(vals[0], vals[1]);
    } else if (validator.CLASS.equals(name)) {
      setMainClass(value);
    } else if (validator.JARS.equals(name)) {
      builder.jars.clear();
      for (String jar : value.split(",")) {
        addJar(jar);
      }
    } else if (validator.FILES.equals(name)) {
      builder.files.clear();
      for (String file : value.split(",")) {
        addFile(file);
      }
    } else if (validator.PY_FILES.equals(name)) {
      builder.pyFiles.clear();
      for (String file : value.split(",")) {
        addPyFile(file);
      }
    } else {
      validator.parse(Arrays.asList(name, value));
      builder.sparkArgs.add(name);
      builder.sparkArgs.add(value);
    }
    return this;
  }

  /**
   * Adds command line arguments for the application.
   *
   * @param args Arguments to pass to the application's main class.
   * @return This launcher.
   */
  public SparkLauncher addAppArgs(String... args) {
    for (String arg : args) {
      checkNotNull(arg, "arg");
      builder.appArgs.add(arg);
    }
    return this;
  }

  /**
   * Adds a jar file to be submitted with the application.
   *
   * @param jar Path to the jar file.
   * @return This launcher.
   */
  public SparkLauncher addJar(String jar) {
    checkNotNull(jar, "jar");
    builder.jars.add(jar);
    return this;
  }

  /**
   * Adds a file to be submitted with the application.
   *
   * @param file Path to the file.
   * @return This launcher.
   */
  public SparkLauncher addFile(String file) {
    checkNotNull(file, "file");
    builder.files.add(file);
    return this;
  }

  /**
   * Adds a python file / zip / egg to be submitted with the application.
   *
   * @param file Path to the file.
   * @return This launcher.
   */
  public SparkLauncher addPyFile(String file) {
    checkNotNull(file, "file");
    builder.pyFiles.add(file);
    return this;
  }

  /**
   * Enables verbose reporting for SparkSubmit.
   *
   * @param verbose Whether to enable verbose output.
   * @return This launcher.
   */
  public SparkLauncher setVerbose(boolean verbose) {
    builder.verbose = verbose;
    return this;
  }

  /**
   * Sets the working directory of spark-submit.
   *
   * @param dir The directory to set as spark-submit's working directory.
   * @return This launcher.
   */
  public SparkLauncher directory(File dir) {
    workingDir = dir;
    return this;
  }

  /**
   * Specifies that stderr in spark-submit should be redirected to stdout.
   *
   * @return This launcher.
   */
  public SparkLauncher redirectError() {
    redirectErrorStream = true;
    return this;
  }

  /**
   * Redirects error output to the specified Redirect.
   *
   * @param to The method of redirection.
   * @return This launcher.
   */
  public SparkLauncher redirectError(ProcessBuilder.Redirect to) {
    errorStream = to;
    return this;
  }

  /**
   * Redirects standard output to the specified Redirect.
   *
   * @param to The method of redirection.
   * @return This launcher.
   */
  public SparkLauncher redirectOutput(ProcessBuilder.Redirect to) {
    outputStream = to;
    return this;
  }

  /**
   * Redirects error output to the specified File.
   *
   * @param errFile The file to which stderr is written.
   * @return This launcher.
   */
  public SparkLauncher redirectError(File errFile) {
    errorStream = ProcessBuilder.Redirect.to(errFile);
    return this;
  }

  /**
   * Redirects standard output to the specified File.
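   * <p>
   * For example, to capture the child's stdout in a file (the path is illustrative):
   * </p>
   * <pre>
   * {@code
   * launcher.redirectOutput(new File("/tmp/spark-app.out"));
   * }
   * </pre>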
   *
   * @param outFile The file to which stdout is written.
   * @return This launcher.
   */
  public SparkLauncher redirectOutput(File outFile) {
    outputStream = ProcessBuilder.Redirect.to(outFile);
    return this;
  }

  /**
   * Sets all output to be logged and redirected to a logger with the specified name.
   *
   * @param loggerName The name of the logger to log stdout and stderr.
   * @return This launcher.
   */
  public SparkLauncher redirectToLog(String loggerName) {
    setConf(CHILD_PROCESS_LOGGER_NAME, loggerName);
    redirectToLog = true;
    return this;
  }

  /**
   * Launches a sub-process that will start the configured Spark application.
   * <p>
   * The {@link #startApplication(SparkAppHandle.Listener...)} method is preferred when launching
   * Spark, since it provides better control of the child application.
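   * <p>
   * A minimal usage sketch (exception handling elided):
   * </p>
   * <pre>
   * {@code
   * Process spark = launcher.launch();
   * int exitCode = spark.waitFor();
   * }
   * </pre>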
   *
   * @return A process handle for the Spark app.
   */
  public Process launch() throws IOException {
    Process childProc = createBuilder().start();
    if (redirectToLog) {
      String loggerName = builder.getEffectiveConfig().get(CHILD_PROCESS_LOGGER_NAME);
      new OutputRedirector(childProc.getInputStream(), loggerName, REDIRECTOR_FACTORY);
    }
    return childProc;
  }

  /**
   * Starts a Spark application.
   * <p>
   * This method returns a handle that provides information about the running application and can
   * be used to do basic interaction with it.
   * <p>
   * The returned handle assumes that the application will instantiate a single SparkContext
   * during its lifetime. Once that context reports a final state (one that indicates the
   * SparkContext has stopped), the handle will not perform new state transitions, so anything
   * that happens after that cannot be monitored. If the underlying application is launched as
   * a child process, {@link SparkAppHandle#kill()} can still be used to kill the child process.
   * <p>
   * Currently, all applications are launched as child processes. The child's stdout and stderr
   * are merged and written to a logger (see <code>java.util.logging</code>) only if redirection
   * has not otherwise been configured on this <code>SparkLauncher</code>. The logger's name can be
   * defined by setting {@link #CHILD_PROCESS_LOGGER_NAME} in the app's configuration. If that
   * option is not set, the code will try to derive a name from the application's name or main
   * class / script file. If those cannot be determined, an internal, unique name will be used.
   * In all cases, the logger name will start with "org.apache.spark.launcher.app", to fit more
   * easily into the configuration of commonly-used logging systems.
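   * <p>
   * A sketch of attaching a listener; the listener body is illustrative:
   * </p>
   * <pre>
   * {@code
   * SparkAppHandle handle = launcher.startApplication(new SparkAppHandle.Listener() {
   *   public void stateChanged(SparkAppHandle h) {
   *     System.out.println("State: " + h.getState());
   *   }
   *   public void infoChanged(SparkAppHandle h) {
   *     // No op.
   *   }
   * });
   * }
   * </pre>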
   *
   * @since 1.6.0
   * @param listeners Listeners to add to the handle before the app is launched.
   * @return A handle for the launched application.
   */
  public SparkAppHandle startApplication(SparkAppHandle.Listener... listeners) throws IOException {
    ChildProcAppHandle handle = LauncherServer.newAppHandle();
    for (SparkAppHandle.Listener l : listeners) {
      handle.addListener(l);
    }

    String loggerName = builder.getEffectiveConfig().get(CHILD_PROCESS_LOGGER_NAME);
    ProcessBuilder pb = createBuilder();
    // Only set up stderr + stdout to logger redirection if the user has not otherwise configured
    // output redirection.
    if (loggerName == null) {
      // Derive a logger name from the app's name, main class, or resource, falling back to an
      // internal counter for uniqueness.
      String appName;
      if (builder.appName != null) {
        appName = builder.appName;
      } else if (builder.mainClass != null) {
        int dot = builder.mainClass.lastIndexOf(".");
        if (dot >= 0 && dot < builder.mainClass.length() - 1) {
          appName = builder.mainClass.substring(dot + 1);
        } else {
          appName = builder.mainClass;
        }
      } else if (builder.appResource != null) {
        appName = new File(builder.appResource).getName();
      } else {
        appName = String.valueOf(COUNTER.incrementAndGet());
      }
      String loggerPrefix = getClass().getPackage().getName();
      loggerName = String.format("%s.app.%s", loggerPrefix, appName);
      pb.redirectErrorStream(true);
    }

    pb.environment().put(LauncherProtocol.ENV_LAUNCHER_PORT,
      String.valueOf(LauncherServer.getServerInstance().getPort()));
    pb.environment().put(LauncherProtocol.ENV_LAUNCHER_SECRET, handle.getSecret());
    try {
      handle.setChildProc(pb.start(), loggerName);
    } catch (IOException ioe) {
      handle.kill();
      throw ioe;
    }

    return handle;
  }

  private ProcessBuilder createBuilder() {
    List<String> cmd = new ArrayList<>();
    String script = isWindows() ? "spark-submit.cmd" : "spark-submit";
    cmd.add(join(File.separator, builder.getSparkHome(), "bin", script));
    cmd.addAll(builder.buildSparkSubmitArgs());

    // Since the child process is a batch script, let's quote things so that special characters are
    // preserved, otherwise the batch interpreter will mess up the arguments. Batch scripts are
    // weird.
    if (isWindows()) {
      List<String> winCmd = new ArrayList<>();
      for (String arg : cmd) {
        winCmd.add(quoteForBatchScript(arg));
      }
      cmd = winCmd;
    }

    ProcessBuilder pb = new ProcessBuilder(cmd.toArray(new String[cmd.size()]));
    for (Map.Entry<String, String> e : builder.childEnv.entrySet()) {
      pb.environment().put(e.getKey(), e.getValue());
    }

    if (workingDir != null) {
      pb.directory(workingDir);
    }

    // Only one of redirectError() and redirectError(...) can be specified.
    // Similarly, if redirectToLog is specified, no other redirections should be specified.
    checkState(!redirectErrorStream || errorStream == null,
      "Cannot specify both redirectError() and redirectError(...).");
    checkState(!redirectToLog ||
      (!redirectErrorStream && errorStream == null && outputStream == null),
      "Cannot use redirectToLog() in conjunction with other redirection methods.");

    if (redirectErrorStream || redirectToLog) {
      pb.redirectErrorStream(true);
    }
    if (errorStream != null) {
      pb.redirectError(errorStream);
    }
    if (outputStream != null) {
      pb.redirectOutput(outputStream);
    }

    return pb;
  }

  private static class ArgumentValidator extends SparkSubmitOptionParser {

    private final boolean hasValue;

    ArgumentValidator(boolean hasValue) {
      this.hasValue = hasValue;
    }

    @Override
    protected boolean handle(String opt, String value) {
      if (value == null && hasValue) {
        throw new IllegalArgumentException(String.format("'%s' does not expect a value.", opt));
      }
      return true;
    }

    @Override
    protected boolean handleUnknown(String opt) {
      // Do not fail on unknown arguments, to support future arguments added to SparkSubmit.
      return true;
    }

    @Override
    protected void handleExtraArgs(List<String> extra) {
      // No op.
    }

  }

}