1 package org.broadinstitute.hellbender.tools.spark.sv.evidence;
2 
3 import com.esotericsoftware.kryo.DefaultSerializer;
4 import com.esotericsoftware.kryo.Kryo;
5 import com.esotericsoftware.kryo.io.Input;
6 import com.esotericsoftware.kryo.io.Output;
7 import htsjdk.samtools.*;
8 import org.broadinstitute.hellbender.exceptions.UserException;
9 import org.broadinstitute.hellbender.tools.spark.sv.utils.SVFileUtils;
10 import org.broadinstitute.hellbender.tools.spark.sv.utils.SVInterval;
11 import org.broadinstitute.hellbender.tools.spark.sv.utils.SVUtils;
12 import org.broadinstitute.hellbender.utils.SequenceDictionaryUtils;
13 import org.broadinstitute.hellbender.utils.Utils;
14 import org.broadinstitute.hellbender.utils.bwa.BwaMemAlignment;
15 import org.broadinstitute.hellbender.utils.bwa.BwaMemAlignmentUtils;
16 import org.broadinstitute.hellbender.utils.fermi.FermiLiteAssembly;
17 import org.broadinstitute.hellbender.utils.fermi.FermiLiteAssembly.Connection;
18 import org.broadinstitute.hellbender.utils.fermi.FermiLiteAssembly.Contig;
19 import org.broadinstitute.hellbender.utils.gcs.BucketUtils;
20 
21 import java.io.BufferedOutputStream;
22 import java.io.IOException;
23 import java.io.OutputStreamWriter;
24 import java.util.ArrayList;
25 import java.util.HashMap;
26 import java.util.List;
27 import java.util.Map;
28 import java.util.stream.IntStream;
29 import java.util.stream.Stream;
30 
31 /**
32  * An assembly with its contigs aligned to reference, or a reason that there isn't an assembly.
33  */
34 @DefaultSerializer(AlignedAssemblyOrExcuse.Serializer.class)
35 public final class AlignedAssemblyOrExcuse {
36     private final int assemblyId;
37     private final String errorMessage;
38     private final FermiLiteAssembly assembly;
39     private final List<List<BwaMemAlignment>> contigAlignments;
40     private final int secondsInAssembly;
41 
getSecondsInAssembly()42     public int getSecondsInAssembly() { return secondsInAssembly; }
43 
getAssemblyId()44     public int getAssemblyId() {
45         return assemblyId;
46     }
47 
48     /**
49      * Either this is null, or the assembly and list of alignments is null.
50      */
getErrorMessage()51     public String getErrorMessage() {
52         return errorMessage;
53     }
54 
isNotFailure()55     public boolean isNotFailure() {
56         return errorMessage==null;
57     }
58 
getAssembly()59     public FermiLiteAssembly getAssembly() {
60         return assembly;
61     }
62 
63     /**
64      * List is equal in length to the number of contigs in the assembly.
65      */
getContigAlignments()66     public List<List<BwaMemAlignment>> getContigAlignments() {
67         return contigAlignments;
68     }
69 
AlignedAssemblyOrExcuse( final int assemblyId, final String errorMessage )70     public AlignedAssemblyOrExcuse( final int assemblyId, final String errorMessage ) {
71         this.assemblyId = assemblyId;
72         this.errorMessage = errorMessage;
73         this.assembly = null;
74         this.contigAlignments = null;
75         this.secondsInAssembly = 0;
76     }
77 
AlignedAssemblyOrExcuse( final int assemblyId, final FermiLiteAssembly assembly, final int secondsInAssembly, final List<List<BwaMemAlignment>> contigAlignments )78     public AlignedAssemblyOrExcuse( final int assemblyId, final FermiLiteAssembly assembly, final int secondsInAssembly,
79                                     final List<List<BwaMemAlignment>> contigAlignments ) {
80         Utils.validate(assembly.getNContigs()==contigAlignments.size(),
81                 "Number of contigs in assembly doesn't match length of list of alignments.");
82         Utils.validateArg(assembly.getContigs().stream().noneMatch(contig -> contig.getConnections()==null),
83                 "Some assembly has contigs that have null connections");
84         this.assemblyId = assemblyId;
85         this.errorMessage = null;
86         this.assembly = assembly;
87         this.contigAlignments = contigAlignments;
88         this.secondsInAssembly = secondsInAssembly;
89     }
90 
AlignedAssemblyOrExcuse( final Kryo kryo, final Input input )91     private AlignedAssemblyOrExcuse( final Kryo kryo, final Input input ) {
92         this.assemblyId = input.readInt();
93         this.errorMessage = input.readString();
94         this.secondsInAssembly = input.readInt();
95         if ( errorMessage != null ) {
96             this.assembly = null;
97             this.contigAlignments = null;
98         } else {
99             final int nContigs = input.readInt();
100             final List<Contig> contigs = new ArrayList<>(nContigs);
101             for ( int idx = 0; idx != nContigs; ++idx ) {
102                 contigs.add(readContig(input));
103             }
104             for ( int idx = 0; idx != nContigs; ++idx ) {
105                 final int nConnections = input.readInt();
106                 if ( nConnections == 0 ) continue;
107                 final List<Connection> connections = new ArrayList<>(nConnections);
108                 for ( int connIdx = 0; connIdx != nConnections; ++connIdx ) {
109                     connections.add(readConnection(input, contigs));
110                 }
111                 contigs.get(idx).setConnections(connections);
112             }
113             this.assembly = new FermiLiteAssembly(contigs);
114             final List<List<BwaMemAlignment>> contigAlignments = new ArrayList<>(nContigs);
115             for ( int idx = 0; idx != nContigs; ++idx ) {
116                 final int nAlignments = input.readInt();
117                 final List<BwaMemAlignment> alignments = new ArrayList<>(nAlignments);
118                 for ( int alnIdx = 0; alnIdx != nAlignments; ++alnIdx ) {
119                     alignments.add(readAlignment(input));
120                 }
121                 contigAlignments.add(alignments);
122             }
123             this.contigAlignments = contigAlignments;
124         }
125     }
126 
writeContig( final Contig contig, final Output output )127     private static void writeContig( final Contig contig, final Output output ) {
128         output.writeInt(contig.getSequence().length);
129         output.writeBytes(contig.getSequence());
130         final boolean hasCoverage = contig.getPerBaseCoverage() != null;
131         output.writeBoolean(hasCoverage);
132         if ( hasCoverage ) output.writeBytes(contig.getPerBaseCoverage());
133         output.writeInt(contig.getNSupportingReads());
134     }
135 
readContig( final Input input )136     private static Contig readContig( final Input input ) {
137         final int sequenceLen = input.readInt();
138         final byte[] sequence = new byte[sequenceLen];
139         input.readBytes(sequence);
140         byte[] perBaseCoverage = null;
141         if ( input.readBoolean() ) {
142             perBaseCoverage = new byte[sequenceLen];
143             input.readBytes(perBaseCoverage);
144         }
145         final int nSupportingReads = input.readInt();
146         return new Contig(sequence, perBaseCoverage, nSupportingReads);
147     }
148 
writeConnection( final Connection connection, final Map<Contig, Integer> contigMap, final Output output )149     private static void writeConnection( final Connection connection,
150                                          final Map<Contig, Integer> contigMap,
151                                          final Output output ) {
152         output.writeInt(contigMap.get(connection.getTarget()));
153         output.writeInt(connection.getOverlapLen());
154         output.writeBoolean(connection.isRC());
155         output.writeBoolean(connection.isTargetRC());
156     }
157 
readConnection( final Input input, final List<Contig> contigs )158     private static Connection readConnection( final Input input, final List<Contig> contigs ) {
159         final Contig target = contigs.get(input.readInt());
160         final int overlapLen = input.readInt();
161         final boolean isRC = input.readBoolean();
162         final boolean isTargetRC = input.readBoolean();
163         return new Connection(target, overlapLen, isRC, isTargetRC);
164     }
165 
writeAlignment(final BwaMemAlignment alignment, final Output output)166     private static void writeAlignment(final BwaMemAlignment alignment, final Output output) {
167         output.writeInt(alignment.getSamFlag());
168         output.writeInt(alignment.getRefId());
169         output.writeInt(alignment.getRefStart());
170         output.writeInt(alignment.getRefEnd());
171         output.writeInt(alignment.getSeqStart());
172         output.writeInt(alignment.getSeqEnd());
173         output.writeInt(alignment.getMapQual());
174         output.writeInt(alignment.getNMismatches());
175         output.writeInt(alignment.getAlignerScore());
176         output.writeInt(alignment.getSuboptimalScore());
177         output.writeString(alignment.getCigar());
178         output.writeString(alignment.getMDTag());
179         output.writeString(alignment.getXATag());
180         output.writeInt(alignment.getMateRefId());
181         output.writeInt(alignment.getMateRefStart());
182         output.writeInt(alignment.getTemplateLen());
183     }
184 
readAlignment(final Input input)185     private static BwaMemAlignment readAlignment(final Input input) {
186         final int samFlag = input.readInt();
187         final int refId = input.readInt();
188         final int refStart = input.readInt();
189         final int refEnd = input.readInt();
190         final int seqStart = input.readInt();
191         final int seqEnd = input.readInt();
192         final int mapQual = input.readInt();
193         final int nMismatches = input.readInt();
194         final int alignerScore = input.readInt();
195         final int suboptimalScore = input.readInt();
196         final String cigar = input.readString();
197         final String mdTag = input.readString();
198         final String xaTag = input.readString();
199         final int mateRefId = input.readInt();
200         final int mateRefStart = input.readInt();
201         final int templateLen = input.readInt();
202         return new BwaMemAlignment(samFlag, refId, refStart, refEnd, seqStart, seqEnd,
203                 mapQual, nMismatches, alignerScore, suboptimalScore,
204                 cigar, mdTag, xaTag,
205                 mateRefId, mateRefStart, templateLen);
206     }
207 
serialize( final Kryo kryo, final Output output )208     private void serialize( final Kryo kryo, final Output output ) {
209         output.writeInt(assemblyId);
210         output.writeString(errorMessage);
211         output.writeInt(secondsInAssembly);
212         if ( errorMessage == null ) {
213             final int nContigs = assembly.getNContigs();
214             final Map<Contig, Integer> contigMap = new HashMap<>();
215             output.writeInt(nContigs);
216             for ( int idx = 0; idx != nContigs; ++idx ) {
217                 final Contig contig = assembly.getContig(idx);
218                 writeContig(contig, output);
219                 contigMap.put(contig, idx);
220             }
221             for ( final Contig contig : assembly.getContigs() ) {
222                 final List<Connection> connections = contig.getConnections();
223                 output.writeInt(connections.size());
224                 for ( final Connection connection : connections ) {
225                     writeConnection(connection, contigMap, output);
226                 }
227             }
228             for ( final List<BwaMemAlignment> alignments : contigAlignments ) {
229                 output.writeInt(alignments.size());
230                 for ( final BwaMemAlignment alignment : alignments ) {
231                     writeAlignment(alignment, output);
232                 }
233             }
234         }
235     }
236 
237     public static final class Serializer extends com.esotericsoftware.kryo.Serializer<AlignedAssemblyOrExcuse> {
238         @Override
write( final Kryo kryo, final Output output, final AlignedAssemblyOrExcuse alignedAssemblyOrExcuse )239         public void write( final Kryo kryo, final Output output, final AlignedAssemblyOrExcuse alignedAssemblyOrExcuse ) {
240             alignedAssemblyOrExcuse.serialize(kryo, output);
241         }
242 
243         @Override
read( final Kryo kryo, final Input input, final Class<AlignedAssemblyOrExcuse> klass )244         public AlignedAssemblyOrExcuse read( final Kryo kryo, final Input input, final Class<AlignedAssemblyOrExcuse> klass ) {
245             return new AlignedAssemblyOrExcuse(kryo, input);
246         }
247     }
248 
249     // =================================================================================================================
250 
toSAMStreamForAlignmentsOfThisAssembly(final SAMFileHeader header, final List<String> refNames, final SAMReadGroupRecord contigAlignmentsReadGroup)251     public Stream<SAMRecord> toSAMStreamForAlignmentsOfThisAssembly(final SAMFileHeader header, final List<String> refNames,
252                                                                     final SAMReadGroupRecord contigAlignmentsReadGroup) {
253         return IntStream.range(0, contigAlignments.size()).boxed()
254                 .flatMap(contigIdx ->
255                         BwaMemAlignmentUtils.toSAMStreamForRead(formatContigName(assemblyId, contigIdx),
256                                 assembly.getContig(contigIdx).getSequence(), contigAlignments.get(contigIdx),
257                                 header, refNames, contigAlignmentsReadGroup)
258                         );
259     }
260 
formatContigName( final int assemblyId, final int contigIdx )261     public static String formatContigName( final int assemblyId, final int contigIdx ) {
262         return formatAssemblyID(assemblyId) + ":" + formatContigID(contigIdx);
263     }
264 
formatAssemblyID( final int assemblyId )265     public static String formatAssemblyID( final int assemblyId ) {
266         return String.format("asm%06d", assemblyId);
267     }
268 
formatContigID( final int contigIdx )269     private static String formatContigID( final int contigIdx ) {
270         return String.format("tig%05d", contigIdx);
271     }
272 
273     /**
274      * write a file describing each interval
275      */
writeIntervalFile( final String intervalFile, final SAMFileHeader header, final List<SVInterval> intervals, final List<AlignedAssemblyOrExcuse> intervalDispositions )276     public static void writeIntervalFile( final String intervalFile,
277                                           final SAMFileHeader header,
278                                           final List<SVInterval> intervals,
279                                           final List<AlignedAssemblyOrExcuse> intervalDispositions ) {
280         Utils.validate(intervalFile != null && header != null && intervals != null && intervalDispositions != null,
281                 "At least one of the arguments is null.");
282 
283         final Map<Integer, AlignedAssemblyOrExcuse> resultsMap = new HashMap<>();
284         intervalDispositions.forEach(alignedAssemblyOrExcuse ->
285                 resultsMap.put(alignedAssemblyOrExcuse.getAssemblyId(), alignedAssemblyOrExcuse));
286 
287         try ( final OutputStreamWriter writer =
288                       new OutputStreamWriter(new BufferedOutputStream(BucketUtils.createFile(intervalFile))) ) {
289             final List<SAMSequenceRecord> contigs = header.getSequenceDictionary().getSequences();
290             final int nIntervals = intervals.size();
291             for ( int intervalIdx = 0; intervalIdx != nIntervals; ++intervalIdx ) {
292                 final SVInterval interval = intervals.get(intervalIdx);
293                 Utils.nonNull(interval, "interval is null for " + intervalIdx);
294                 final String seqName = contigs.get(interval.getContig()).getSequenceName();
295                 final AlignedAssemblyOrExcuse alignedAssemblyOrExcuse = resultsMap.get(intervalIdx);
296                 final String disposition;
297                 if ( alignedAssemblyOrExcuse == null ) {
298                     disposition = "unknown";
299                 } else if ( alignedAssemblyOrExcuse.getErrorMessage() != null ) {
300                     disposition = alignedAssemblyOrExcuse.getErrorMessage();
301                 } else {
302                     disposition = "produced " + alignedAssemblyOrExcuse.getAssembly().getNContigs() +
303                             " contigs in " + alignedAssemblyOrExcuse.getSecondsInAssembly() + " secs.";
304                 }
305                 writer.write(intervalIdx + "\t" +
306                         seqName + ":" + interval.getStart() + "-" + interval.getEnd() + "\t" +
307                         disposition + "\n");
308             }
309         } catch ( final IOException ioe ) {
310             throw new UserException.CouldNotCreateOutputFile("Can't write intervals file " + intervalFile, ioe);
311         }
312     }
313 
314     /**
315      * write a SAM file containing records for each aligned contig
316      */
writeAssemblySAMFile(final String outputAssembliesFile, final List<AlignedAssemblyOrExcuse> alignedAssemblyOrExcuseList, final SAMFileHeader header, final SAMFileHeader.SortOrder assemblyAlnSortOrder)317     static void writeAssemblySAMFile(final String outputAssembliesFile,
318                                      final List<AlignedAssemblyOrExcuse> alignedAssemblyOrExcuseList,
319                                      final SAMFileHeader header,
320                                      final SAMFileHeader.SortOrder assemblyAlnSortOrder) {
321 
322         final String sampleId = SVUtils.getSampleId(header);
323         final SAMReadGroupRecord contigAlignmentsReadGroup = new SAMReadGroupRecord(SVUtils.GATKSV_CONTIG_ALIGNMENTS_READ_GROUP_ID);
324         contigAlignmentsReadGroup.setSample(sampleId);
325 
326         final SAMFileHeader cleanHeader = new SAMFileHeader(header.getSequenceDictionary());
327         cleanHeader.addReadGroup(contigAlignmentsReadGroup);
328         cleanHeader.setSortOrder(assemblyAlnSortOrder);
329         final SAMRecordComparator samRecordComparator = SVUtils.getSamRecordComparator(assemblyAlnSortOrder);
330 
331         final List<String> refNames = SequenceDictionaryUtils.getContigNamesList(cleanHeader.getSequenceDictionary());
332         final Stream<SAMRecord> samRecordStream =
333                 alignedAssemblyOrExcuseList.stream().filter(AlignedAssemblyOrExcuse::isNotFailure)
334                         .flatMap(aa -> aa.toSAMStreamForAlignmentsOfThisAssembly(cleanHeader, refNames, contigAlignmentsReadGroup))
335                 .sorted(samRecordComparator);
336         SVFileUtils.writeSAMFile(outputAssembliesFile, samRecordStream.iterator(), cleanHeader, true);
337     }
338 }
339