1 package org.broadinstitute.hellbender.engine;
2 
3 import htsjdk.samtools.SAMSequenceDictionary;
4 import htsjdk.variant.variantcontext.VariantContext;
5 import htsjdk.variant.vcf.VCFHeader;
6 import org.broadinstitute.barclay.argparser.Argument;
7 import org.broadinstitute.hellbender.cmdline.StandardArgumentDefinitions;
8 import org.broadinstitute.hellbender.engine.filters.CountingReadFilter;
9 import org.broadinstitute.hellbender.exceptions.GATKException;
10 import org.broadinstitute.hellbender.utils.SimpleInterval;
11 
12 import java.util.Spliterator;
13 
14 /**
15  * A VariantWalker is a tool that processes a variant at a time from a source of variants, with
16  * optional contextual information from a reference, sets of reads, and/or supplementary sources
17  * of Features.
18  *
19  * VariantWalker authors must implement the {@link #apply} method to process each variant, and may optionally implement
20  * {@link #onTraversalStart}, {@link #onTraversalSuccess} and/or {@link #closeTool}.
21  */
22 public abstract class VariantWalker extends VariantWalkerBase {
23 
24     // NOTE: using GATKPath rather than FeatureInput<VariantContext> here so that we can keep this driving source
25     //       of variants separate from any other potential sources of Features
26     @Argument(fullName = StandardArgumentDefinitions.VARIANT_LONG_NAME, shortName = StandardArgumentDefinitions.VARIANT_SHORT_NAME, doc = "A VCF file containing variants", common = false, optional = false)
27     public GATKPath drivingVariantFile;
28 
29     // NOTE: keeping the driving source of variants separate from other, supplementary FeatureInputs in our FeatureManager in GATKTool
30     // we do add the driving source to the Feature manager but we do need to treat it differently and thus this field.
31     private FeatureDataSource<VariantContext> drivingVariants;
32     private FeatureInput<VariantContext> drivingVariantsFeatureInput;
33 
34     @Override
getSequenceDictionaryForDrivingVariants()35     protected SAMSequenceDictionary getSequenceDictionaryForDrivingVariants() { return drivingVariants.getSequenceDictionary(); }
36 
37     @Override
getSpliteratorForDrivingVariants()38     protected Spliterator<VariantContext> getSpliteratorForDrivingVariants() { return drivingVariants.spliterator(); }
39 
40     /**
41      * Marked final so that tool authors don't override it. Tool authors should override {@link #onTraversalStart} instead.
42      */
43     @Override
onStartup()44     protected final void onStartup() {
45         super.onStartup();
46         if ( hasUserSuppliedIntervals() ) {
47             drivingVariants.setIntervalsForTraversal(userIntervals);
48         }
49     }
50 
51     @Override
initializeDrivingVariants()52     protected void initializeDrivingVariants() {
53         drivingVariantsFeatureInput = new FeatureInput<>(drivingVariantFile, "drivingVariantFile");
54 
55         // Create a FeatureDataSource for the driving variants FeatureInput, using the
56         // cache lookahead value from getDrivingVariantCacheLookAheadBases()
57         drivingVariants = new FeatureDataSource<>(drivingVariantsFeatureInput, getDrivingVariantCacheLookAheadBases(), VariantContext.class, cloudPrefetchBuffer, cloudIndexPrefetchBuffer,
58                                                   getGenomicsDBOptions());
59 
60         // Also add the driving variants FeatureInput to FeatureManager as well so that it can be queried,
61         // but use a lookahead value of 0 to avoid caching because of windowed queries that need to "look behind" as well.
62         features.addToFeatureSources(0, drivingVariantsFeatureInput, VariantContext.class, cloudPrefetchBuffer, cloudIndexPrefetchBuffer,
63                                      getGenomicsDBOptions());
64 
65         // Note: the intervals for the driving variants are set in onStartup()
66     }
67 
68     /**
69      * Returns the feature input for the driving variants file.
70      */
getDrivingVariantsFeatureInput()71     protected final FeatureInput<VariantContext> getDrivingVariantsFeatureInput() {
72         return drivingVariantsFeatureInput;
73     }
74 
75     /**
76      * Gets the header associated with our driving source of variants as a VCFHeader.
77      *
78      * @return VCFHeader for our driving source of variants
79      */
getHeaderForVariants()80     public final VCFHeader getHeaderForVariants() {
81         final Object header = drivingVariants.getHeader();
82 
83         if ( ! (header instanceof VCFHeader) ) {
84             throw new GATKException("Header for " + drivingVariantFile + " is not in VCF header format");
85         }
86 
87         return (VCFHeader)header;
88     }
89 
90     /**
91      * Implementation of variant-based traversal.
92      *
93      * NOTE: You should only override {@link #traverse()} if you are writing a new walker base class in the
94      * engine package that extends this class. It is not meant to be overridden by tools outside of the engine
95      * package.
96      */
97     @Override
traverse()98     public void traverse() {
99         final CountingReadFilter readFilter = makeReadFilter();
100         // Process each variant in the input stream.
101         getTransformedVariantStream( makeVariantFilter() )
102                 .forEach(variant -> {
103                     final SimpleInterval variantInterval = new SimpleInterval(variant);
104                     apply(variant,
105                             new ReadsContext(reads, variantInterval, readFilter),
106                             new ReferenceContext(reference, variantInterval),
107                             new FeatureContext(features, variantInterval));
108 
109                     progressMeter.update(variantInterval);
110                 });
111     }
112 
113     /**
114      * Process an individual variant. Must be implemented by tool authors.
115      * In general, tool authors should simply stream their output from apply(), and maintain as little internal state
116      * as possible.
117      *
118      * @param variant Current variant being processed.
119      * @param readsContext Reads overlapping the current variant. Will be an empty, but non-null, context object
120      *                     if there is no backing source of reads data (in which case all queries on it will return
121      *                     an empty array/iterator)
122      * @param referenceContext Reference bases spanning the current variant. Will be an empty, but non-null, context object
123      *                         if there is no backing source of reference data (in which case all queries on it will return
124      *                         an empty array/iterator). Can request extra bases of context around the current variant's interval
125      *                         by invoking {@link ReferenceContext#setWindow}
126      *                         on this object before calling {@link ReferenceContext#getBases}
127      * @param featureContext Features spanning the current variant. Will be an empty, but non-null, context object
128      *                       if there is no backing source of Feature data (in which case all queries on it will return an
129      *                       empty List).
130      */
apply( VariantContext variant, ReadsContext readsContext, ReferenceContext referenceContext, FeatureContext featureContext )131     public abstract void apply( VariantContext variant, ReadsContext readsContext, ReferenceContext referenceContext, FeatureContext featureContext );
132 
133     /**
134      * Close all data sources.
135      *
136      * Marked final so that tool authors don't override it. Tool authors should override {@link #onTraversalSuccess} and/or
137      * {@link #closeTool} instead.
138      */
139     @Override
onShutdown()140     protected final void onShutdown() {
141         super.onShutdown();
142 
143         if ( drivingVariants != null )
144             drivingVariants.close();
145     }
146 }
147