1 package org.broadinstitute.hellbender.engine.spark; 2 3 import org.apache.spark.api.java.JavaSparkContext; 4 import org.broadinstitute.barclay.argparser.Argument; 5 import org.broadinstitute.barclay.argparser.ArgumentCollection; 6 import org.broadinstitute.hellbender.cmdline.CommandLineProgram; 7 8 import java.io.Serializable; 9 10 11 public abstract class SparkCommandLineProgram extends CommandLineProgram implements Serializable { 12 private static final long serialVersionUID = 1L; 13 public static final String SPARK_PROGRAM_NAME_LONG_NAME = "program-name"; 14 15 @Argument( 16 doc = "Name of the program running", 17 fullName = SPARK_PROGRAM_NAME_LONG_NAME, 18 optional = true 19 ) 20 public String programName; 21 22 @ArgumentCollection 23 public SparkCommandLineArgumentCollection sparkArgs = new SparkCommandLineArgumentCollection(); 24 25 26 @Override doWork()27 protected Object doWork() { 28 final JavaSparkContext ctx = SparkContextFactory.getSparkContext(getProgramName(), sparkArgs.getSparkProperties(), sparkArgs.getSparkMaster()); 29 setSparkVerbosity(ctx); 30 try{ 31 runPipeline(ctx); 32 return null; 33 } finally { 34 afterPipeline(ctx); 35 } 36 } 37 setSparkVerbosity(final JavaSparkContext ctx)38 private final void setSparkVerbosity(final JavaSparkContext ctx) { 39 final String sparkVerbosity = sparkArgs.getSparkVerbosity(VERBOSITY); 40 logger.info("Spark verbosity set to " + sparkVerbosity + " (see --" + SparkCommandLineArgumentCollection.SPARK_VERBOSITY_LONG_NAME + " argument)"); 41 ctx.setLogLevel(sparkVerbosity); 42 } 43 44 // --------------------------------------------------- 45 // Functions meant for overriding 46 47 /** 48 * Runs the pipeline. 49 */ runPipeline(final JavaSparkContext ctx)50 protected abstract void runPipeline(final JavaSparkContext ctx); 51 52 /** 53 * Extend this method to run code after the pipeline returns. 54 * This method is called whether or not the runPipeline call succeeded. 55 */ afterPipeline(final JavaSparkContext ctx)56 protected void afterPipeline(final JavaSparkContext ctx) { 57 SparkContextFactory.stopSparkContext(ctx); 58 } 59 60 /** 61 * Returns the program's name. 62 * If {@link #programName} argument is provided, returns that. Otherwise, returns the simple name of the class. 63 * 64 * Subclasses can override if desired. 65 */ getProgramName()66 protected String getProgramName(){ 67 return programName == null ? getClass().getSimpleName() : programName; 68 } 69 } 70