/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.launcher;

import java.io.File;
import java.io.IOException;
import java.util.*;

import static org.apache.spark.launcher.CommandBuilderUtils.*;

/**
 * Special command builder for handling a CLI invocation of SparkSubmit.
 * <p>
 * This builder adds command line parsing compatible with SparkSubmit. It handles setting
 * driver-side options and the special parsing behavior needed for special-casing certain internal
 * Spark applications.
 * <p>
 * This class also has some special features to aid launching shells (pyspark and sparkR) and
 * examples.
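 * <p>
 * Illustrative usage (a hedged sketch only; the class is package-private and is normally driven
 * by the launcher scripts with the raw spark-submit argument list, and "my.Main" / "my.jar" below
 * are placeholder values):
 * <pre>{@code
 *   List<String> args = Arrays.asList("--master", "local[*]", "--class", "my.Main", "my.jar");
 *   List<String> cmd = new SparkSubmitCommandBuilder(args).buildCommand(new HashMap<>());
 * }</pre>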
 */
class SparkSubmitCommandBuilder extends AbstractCommandBuilder {

  /**
   * Name of the app resource used to identify the PySpark shell. The command line parser expects
   * the resource name to be the very first argument to spark-submit in this case.
   *
   * NOTE: this cannot be "pyspark-shell" since that identifies the PySpark shell to SparkSubmit
   * (see java_gateway.py), and can cause this code to enter into an infinite loop.
   */
  static final String PYSPARK_SHELL = "pyspark-shell-main";

  /**
   * This is the actual resource name that identifies the PySpark shell to SparkSubmit.
   */
  static final String PYSPARK_SHELL_RESOURCE = "pyspark-shell";

  /**
   * Name of the app resource used to identify the SparkR shell. The command line parser expects
   * the resource name to be the very first argument to spark-submit in this case.
   *
   * NOTE: this cannot be "sparkr-shell" since that identifies the SparkR shell to SparkSubmit
   * (see sparkR.R), and can cause this code to enter into an infinite loop.
   */
  static final String SPARKR_SHELL = "sparkr-shell-main";

  /**
   * This is the actual resource name that identifies the SparkR shell to SparkSubmit.
   */
  static final String SPARKR_SHELL_RESOURCE = "sparkr-shell";

  /**
   * Name of app resource used to identify examples. When running examples, args[0] should be
   * this name. The app resource will identify the example class to run.
   */
  static final String RUN_EXAMPLE = "run-example";

  /**
   * Prefix for example class names.
   */
  static final String EXAMPLE_CLASS_PREFIX = "org.apache.spark.examples.";

  /**
   * This map must match the class names for available special classes, since this modifies the way
   * command line parsing works. This maps the class name to the resource to use when calling
   * spark-submit.
   */
  private static final Map<String, String> specialClasses = new HashMap<>();
  static {
    specialClasses.put("org.apache.spark.repl.Main", "spark-shell");
    specialClasses.put("org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver",
      SparkLauncher.NO_RESOURCE);
    specialClasses.put("org.apache.spark.sql.hive.thriftserver.HiveThriftServer2",
      SparkLauncher.NO_RESOURCE);
  }

  final List<String> sparkArgs;
  private final boolean isAppResourceReq;
  private final boolean isExample;

  /**
   * Controls whether mixing spark-submit arguments with app arguments is allowed. This is needed
   * to parse the command lines for things like bin/spark-shell, which allows users to mix and
   * match arguments (e.g. "bin/spark-shell SparkShellArg --master foo").
   */
  private boolean allowsMixedArguments;

  SparkSubmitCommandBuilder() {
    this.sparkArgs = new ArrayList<>();
    this.isAppResourceReq = true;
    this.isExample = false;
  }

  SparkSubmitCommandBuilder(List<String> args) {
    this.allowsMixedArguments = false;
    this.sparkArgs = new ArrayList<>();
    boolean isExample = false;
    List<String> submitArgs = args;

    if (args.size() > 0) {
      switch (args.get(0)) {
        case PYSPARK_SHELL:
          this.allowsMixedArguments = true;
          appResource = PYSPARK_SHELL;
          submitArgs = args.subList(1, args.size());
          break;

        case SPARKR_SHELL:
          this.allowsMixedArguments = true;
          appResource = SPARKR_SHELL;
          submitArgs = args.subList(1, args.size());
          break;

        case RUN_EXAMPLE:
          isExample = true;
          submitArgs = args.subList(1, args.size());
      }

      this.isExample = isExample;
      OptionParser parser = new OptionParser();
      parser.parse(submitArgs);
      this.isAppResourceReq = parser.isAppResourceReq;
    } else {
      this.isExample = isExample;
      this.isAppResourceReq = false;
    }
  }

  @Override
  public List<String> buildCommand(Map<String, String> env)
      throws IOException, IllegalArgumentException {
    if (PYSPARK_SHELL.equals(appResource) && isAppResourceReq) {
      return buildPySparkShellCommand(env);
    } else if (SPARKR_SHELL.equals(appResource) && isAppResourceReq) {
      return buildSparkRCommand(env);
    } else {
      return buildSparkSubmitCommand(env);
    }
  }

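  /**
   * Builds the list of arguments to pass to SparkSubmit: the driver-side options recorded by this
   * builder (master, deploy mode, name, conf entries, properties file, jars, files, py-files, main
   * class), followed by any other collected spark-submit arguments, the application resource, and
   * the application arguments.
   */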
  List<String> buildSparkSubmitArgs() {
    List<String> args = new ArrayList<>();
    SparkSubmitOptionParser parser = new SparkSubmitOptionParser();

    if (!allowsMixedArguments && isAppResourceReq) {
      checkArgument(appResource != null, "Missing application resource.");
    }

    if (verbose) {
      args.add(parser.VERBOSE);
    }

    if (master != null) {
      args.add(parser.MASTER);
      args.add(master);
    }

    if (deployMode != null) {
      args.add(parser.DEPLOY_MODE);
      args.add(deployMode);
    }

    if (appName != null) {
      args.add(parser.NAME);
      args.add(appName);
    }

    for (Map.Entry<String, String> e : conf.entrySet()) {
      args.add(parser.CONF);
      args.add(String.format("%s=%s", e.getKey(), e.getValue()));
    }

    if (propertiesFile != null) {
      args.add(parser.PROPERTIES_FILE);
      args.add(propertiesFile);
    }

    if (isExample) {
      jars.addAll(findExamplesJars());
    }

    if (!jars.isEmpty()) {
      args.add(parser.JARS);
      args.add(join(",", jars));
    }

    if (!files.isEmpty()) {
      args.add(parser.FILES);
      args.add(join(",", files));
    }

    if (!pyFiles.isEmpty()) {
      args.add(parser.PY_FILES);
      args.add(join(",", pyFiles));
    }

    if (isAppResourceReq) {
      checkArgument(!isExample || mainClass != null, "Missing example class name.");
    }
    if (mainClass != null) {
      args.add(parser.CLASS);
      args.add(mainClass);
    }

    args.addAll(sparkArgs);
    if (appResource != null) {
      args.add(appResource);
    }
    args.addAll(appArgs);

    return args;
  }

  private List<String> buildSparkSubmitCommand(Map<String, String> env)
      throws IOException, IllegalArgumentException {
    // Load the properties file and check whether spark-submit will be running the app's driver
    // or just launching a cluster app. When running the driver, the JVM's arguments will be
    // modified to cover the driver's configuration.
    Map<String, String> config = getEffectiveConfig();
    boolean isClientMode = isClientMode(config);
    String extraClassPath = isClientMode ? config.get(SparkLauncher.DRIVER_EXTRA_CLASSPATH) : null;

    List<String> cmd = buildJavaCommand(extraClassPath);
    // Treat the Thrift Server as a daemon.
    if (isThriftServer(mainClass)) {
      addOptionString(cmd, System.getenv("SPARK_DAEMON_JAVA_OPTS"));
    }
    addOptionString(cmd, System.getenv("SPARK_SUBMIT_OPTS"));
    addOptionString(cmd, System.getenv("SPARK_JAVA_OPTS"));

    // We don't want the client to specify Xmx. It has to be set by the corresponding memory flag
    // --driver-memory or the configuration entry spark.driver.memory.
    String driverExtraJavaOptions = config.get(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS);
    if (!isEmpty(driverExtraJavaOptions) && driverExtraJavaOptions.contains("Xmx")) {
      String msg = String.format("Not allowed to specify max heap (Xmx) memory settings through " +
                   "java options (was %s). Use the corresponding --driver-memory or " +
                   "spark.driver.memory configuration instead.", driverExtraJavaOptions);
      throw new IllegalArgumentException(msg);
    }

    if (isClientMode) {
      // Figuring out where the memory value comes from is a little tricky due to precedence.
      // Precedence is observed in the following order:
      // - explicit configuration (setConf()), which also covers the --driver-memory cli argument.
      // - properties file.
      // - SPARK_DRIVER_MEMORY env variable
      // - SPARK_MEM env variable
      // - default value (1g)
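      // For example (illustrative values): with spark.driver.memory=2g in the effective config
      // and SPARK_DRIVER_MEMORY=4g in the environment, the explicit config wins and -Xmx2g is used.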
      // Treat the Thrift Server as a daemon.
      String tsMemory =
        isThriftServer(mainClass) ? System.getenv("SPARK_DAEMON_MEMORY") : null;
      String memory = firstNonEmpty(tsMemory, config.get(SparkLauncher.DRIVER_MEMORY),
        System.getenv("SPARK_DRIVER_MEMORY"), System.getenv("SPARK_MEM"), DEFAULT_MEM);
      cmd.add("-Xmx" + memory);
      addOptionString(cmd, driverExtraJavaOptions);
      mergeEnvPathList(env, getLibPathEnvName(),
        config.get(SparkLauncher.DRIVER_EXTRA_LIBRARY_PATH));
    }

    addPermGenSizeOpt(cmd);
    cmd.add("org.apache.spark.deploy.SparkSubmit");
    cmd.addAll(buildSparkSubmitArgs());
    return cmd;
  }

  private List<String> buildPySparkShellCommand(Map<String, String> env) throws IOException {
    // Running a python script through 'pyspark' used to be supported; as of Spark 2.0 this fails
    // with an error pointing the user at spark-submit instead.
    if (!appArgs.isEmpty() && appArgs.get(0).endsWith(".py")) {
      System.err.println(
        "Running python applications through 'pyspark' is not supported as of Spark 2.0.\n" +
        "Use ./bin/spark-submit <python file>");
      System.exit(-1);
    }

    checkArgument(appArgs.isEmpty(), "pyspark does not support any application options.");

    // When launching the pyspark shell, the spark-submit arguments should be stored in the
    // PYSPARK_SUBMIT_ARGS env variable.
    appResource = PYSPARK_SHELL_RESOURCE;
    constructEnvVarArgs(env, "PYSPARK_SUBMIT_ARGS");

    // The python binary executable is picked up in the following order:
    // 1. conf spark.pyspark.driver.python
    // 2. conf spark.pyspark.python
    // 3. environment variable PYSPARK_DRIVER_PYTHON
    // 4. environment variable PYSPARK_PYTHON
    // 5. python
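    // For example (illustrative value): setting spark.pyspark.driver.python=/opt/py3/bin/python
    // overrides both PYSPARK_DRIVER_PYTHON and PYSPARK_PYTHON for the driver process.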
    List<String> pyargs = new ArrayList<>();
    pyargs.add(firstNonEmpty(conf.get(SparkLauncher.PYSPARK_DRIVER_PYTHON),
      conf.get(SparkLauncher.PYSPARK_PYTHON),
      System.getenv("PYSPARK_DRIVER_PYTHON"),
      System.getenv("PYSPARK_PYTHON"),
      "python"));
    String pyOpts = System.getenv("PYSPARK_DRIVER_PYTHON_OPTS");
    if (conf.containsKey(SparkLauncher.PYSPARK_PYTHON)) {
      // Pass conf spark.pyspark.python to python via an environment variable.
      env.put("PYSPARK_PYTHON", conf.get(SparkLauncher.PYSPARK_PYTHON));
    }
    if (!isEmpty(pyOpts)) {
      pyargs.addAll(parseOptionString(pyOpts));
    }

    return pyargs;
  }

  private List<String> buildSparkRCommand(Map<String, String> env) throws IOException {
    if (!appArgs.isEmpty() && appArgs.get(0).endsWith(".R")) {
      System.err.println(
        "Running R applications through 'sparkR' is not supported as of Spark 2.0.\n" +
        "Use ./bin/spark-submit <R file>");
      System.exit(-1);
    }
    // When launching the SparkR shell, store the spark-submit arguments in the SPARKR_SUBMIT_ARGS
    // env variable.
    appResource = SPARKR_SHELL_RESOURCE;
    constructEnvVarArgs(env, "SPARKR_SUBMIT_ARGS");

    // Set shell.R as R_PROFILE_USER to load the SparkR package when the shell comes up.
    String sparkHome = System.getenv("SPARK_HOME");
    env.put("R_PROFILE_USER",
            join(File.separator, sparkHome, "R", "lib", "SparkR", "profile", "shell.R"));

    List<String> args = new ArrayList<>();
    args.add(firstNonEmpty(conf.get(SparkLauncher.SPARKR_R_SHELL),
      System.getenv("SPARKR_DRIVER_R"), "R"));
    return args;
  }

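  /**
   * Quotes each spark-submit argument and stores the resulting string in the given environment
   * variable (e.g. PYSPARK_SUBMIT_ARGS or SPARKR_SUBMIT_ARGS). Also merges the driver's extra
   * library path into the library path environment variable.
   */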
  private void constructEnvVarArgs(
      Map<String, String> env,
      String submitArgsEnvVariable) throws IOException {
    mergeEnvPathList(env, getLibPathEnvName(),
      getEffectiveConfig().get(SparkLauncher.DRIVER_EXTRA_LIBRARY_PATH));

    StringBuilder submitArgs = new StringBuilder();
    for (String arg : buildSparkSubmitArgs()) {
      if (submitArgs.length() > 0) {
        submitArgs.append(" ");
      }
      submitArgs.append(quoteForCommandString(arg));
    }
    env.put(submitArgsEnvVariable, submitArgs.toString());
  }

  private boolean isClientMode(Map<String, String> userProps) {
    String userMaster = firstNonEmpty(master, userProps.get(SparkLauncher.SPARK_MASTER));
    String userDeployMode = firstNonEmpty(deployMode, userProps.get(SparkLauncher.DEPLOY_MODE));
    // Default master is "local[*]", so assume client mode in that case
    return userMaster == null ||
      "client".equals(userDeployMode) ||
      (!userMaster.equals("yarn-cluster") && userDeployMode == null);
  }

  /**
   * Return whether the given main class represents a thrift server.
   */
  private boolean isThriftServer(String mainClass) {
    return (mainClass != null &&
      mainClass.equals("org.apache.spark.sql.hive.thriftserver.HiveThriftServer2"));
  }

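  /**
   * Returns the absolute paths of the example jars, looked up under "examples/jars" for a release
   * layout (when a RELEASE file exists in SPARK_HOME) or under the examples build output
   * directory otherwise.
   */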
  private List<String> findExamplesJars() {
    boolean isTesting = "1".equals(getenv("SPARK_TESTING"));
    List<String> examplesJars = new ArrayList<>();
    String sparkHome = getSparkHome();

    File jarsDir;
    if (new File(sparkHome, "RELEASE").isFile()) {
      jarsDir = new File(sparkHome, "examples/jars");
    } else {
      jarsDir = new File(sparkHome,
        String.format("examples/target/scala-%s/jars", getScalaVersion()));
    }

    boolean foundDir = jarsDir.isDirectory();
    checkState(isTesting || foundDir, "Examples jars directory '%s' does not exist.",
        jarsDir.getAbsolutePath());

    if (foundDir) {
      for (File f: jarsDir.listFiles()) {
        examplesJars.add(f.getAbsolutePath());
      }
    }
    return examplesJars;
  }

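  /**
   * Option parser for the spark-submit arguments handed to this builder. Recognized driver-side
   * options are recorded in the enclosing builder's fields; remaining options are forwarded to
   * SparkSubmit via sparkArgs, and unrecognized arguments become the application resource and
   * application arguments.
   */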
  private class OptionParser extends SparkSubmitOptionParser {

    boolean isAppResourceReq = true;

    @Override
    protected boolean handle(String opt, String value) {
      if (opt.equals(MASTER)) {
        master = value;
      } else if (opt.equals(DEPLOY_MODE)) {
        deployMode = value;
      } else if (opt.equals(PROPERTIES_FILE)) {
        propertiesFile = value;
      } else if (opt.equals(DRIVER_MEMORY)) {
        conf.put(SparkLauncher.DRIVER_MEMORY, value);
      } else if (opt.equals(DRIVER_JAVA_OPTIONS)) {
        conf.put(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS, value);
      } else if (opt.equals(DRIVER_LIBRARY_PATH)) {
        conf.put(SparkLauncher.DRIVER_EXTRA_LIBRARY_PATH, value);
      } else if (opt.equals(DRIVER_CLASS_PATH)) {
        conf.put(SparkLauncher.DRIVER_EXTRA_CLASSPATH, value);
      } else if (opt.equals(CONF)) {
        String[] setConf = value.split("=", 2);
        checkArgument(setConf.length == 2, "Invalid argument to %s: %s", CONF, value);
        conf.put(setConf[0], setConf[1]);
      } else if (opt.equals(CLASS)) {
        // The special classes require some special command line handling, since they allow
        // mixing spark-submit arguments with arguments that should be propagated to the shell
        // itself. Note that for this to work, the "--class" argument must come before any
        // non-spark-submit arguments.
        mainClass = value;
        if (specialClasses.containsKey(value)) {
          allowsMixedArguments = true;
          appResource = specialClasses.get(value);
        }
      } else if (opt.equals(KILL_SUBMISSION) || opt.equals(STATUS)) {
        isAppResourceReq = false;
        sparkArgs.add(opt);
        sparkArgs.add(value);
      } else if (opt.equals(HELP) || opt.equals(USAGE_ERROR)) {
        isAppResourceReq = false;
        sparkArgs.add(opt);
      } else if (opt.equals(VERSION)) {
        isAppResourceReq = false;
        sparkArgs.add(opt);
      } else {
        sparkArgs.add(opt);
        if (value != null) {
          sparkArgs.add(value);
        }
      }
      return true;
    }

    @Override
    protected boolean handleUnknown(String opt) {
      // When mixing arguments, add unrecognized parameters directly to the user arguments list. In
      // normal mode, any unrecognized parameter triggers the end of command line parsing, and the
      // parameter itself will be interpreted by SparkSubmit as the application resource. The
      // remaining params will be appended to the list of SparkSubmit arguments.
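      // For example (illustrative): in normal mode, "my-app.jar --foo bar" makes "my-app.jar" the
      // application resource, while "--foo" and "bar" end up in appArgs via handleExtraArgs().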
      if (allowsMixedArguments) {
        appArgs.add(opt);
        return true;
      } else if (isExample) {
        String className = opt;
        if (!className.startsWith(EXAMPLE_CLASS_PREFIX)) {
          className = EXAMPLE_CLASS_PREFIX + className;
        }
        mainClass = className;
        appResource = SparkLauncher.NO_RESOURCE;
        return false;
      } else {
        checkArgument(!opt.startsWith("-"), "Unrecognized option: %s", opt);
        checkState(appResource == null, "Found unrecognized argument but resource is already set.");
        appResource = opt;
        return false;
      }
    }

    @Override
    protected void handleExtraArgs(List<String> extra) {
      appArgs.addAll(extra);
    }

  }

}