1 package org.broadinstitute.hellbender.engine.spark;
2 
3 import org.apache.spark.api.java.JavaSparkContext;
4 import org.broadinstitute.barclay.argparser.Argument;
5 import org.broadinstitute.barclay.argparser.ArgumentCollection;
6 import org.broadinstitute.hellbender.cmdline.CommandLineProgram;
7 
8 import java.io.Serializable;
9 
10 
11 public abstract class SparkCommandLineProgram extends CommandLineProgram implements Serializable {
12     private static final long serialVersionUID = 1L;
13     public static final String SPARK_PROGRAM_NAME_LONG_NAME = "program-name";
14 
15     @Argument(
16             doc = "Name of the program running",
17             fullName = SPARK_PROGRAM_NAME_LONG_NAME,
18             optional = true
19     )
20     public String programName;
21 
22     @ArgumentCollection
23     public SparkCommandLineArgumentCollection sparkArgs = new SparkCommandLineArgumentCollection();
24 
25 
26     @Override
doWork()27     protected Object doWork() {
28         final JavaSparkContext ctx = SparkContextFactory.getSparkContext(getProgramName(), sparkArgs.getSparkProperties(), sparkArgs.getSparkMaster());
29         setSparkVerbosity(ctx);
30         try{
31             runPipeline(ctx);
32             return null;
33         } finally {
34             afterPipeline(ctx);
35         }
36     }
37 
setSparkVerbosity(final JavaSparkContext ctx)38     private final void setSparkVerbosity(final JavaSparkContext ctx) {
39         final String sparkVerbosity = sparkArgs.getSparkVerbosity(VERBOSITY);
40         logger.info("Spark verbosity set to " + sparkVerbosity + " (see --" + SparkCommandLineArgumentCollection.SPARK_VERBOSITY_LONG_NAME + " argument)");
41         ctx.setLogLevel(sparkVerbosity);
42     }
43 
44     // ---------------------------------------------------
45     // Functions meant for overriding
46 
47     /**
48      * Runs the pipeline.
49      */
runPipeline(final JavaSparkContext ctx)50     protected abstract void runPipeline(final JavaSparkContext ctx);
51 
52     /**
53      * Extend this method to run code after the pipeline returns.
54      * This method is called whether or not the runPipeline call succeeded.
55      */
afterPipeline(final JavaSparkContext ctx)56     protected void afterPipeline(final JavaSparkContext ctx) {
57         SparkContextFactory.stopSparkContext(ctx);
58     }
59 
60     /**
61      * Returns the program's name.
62      * If {@link #programName} argument is provided, returns that. Otherwise, returns the simple name of the class.
63      *
64      * Subclasses can override if desired.
65      */
getProgramName()66     protected String getProgramName(){
67         return programName == null ? getClass().getSimpleName() : programName;
68     }
69 }
70