1 package org.broadinstitute.hellbender.tools.spark.sv.evidence; 2 3 import com.esotericsoftware.kryo.DefaultSerializer; 4 import com.esotericsoftware.kryo.Kryo; 5 import com.esotericsoftware.kryo.io.Input; 6 import com.esotericsoftware.kryo.io.Output; 7 import htsjdk.samtools.*; 8 import org.broadinstitute.hellbender.exceptions.UserException; 9 import org.broadinstitute.hellbender.tools.spark.sv.utils.SVFileUtils; 10 import org.broadinstitute.hellbender.tools.spark.sv.utils.SVInterval; 11 import org.broadinstitute.hellbender.tools.spark.sv.utils.SVUtils; 12 import org.broadinstitute.hellbender.utils.SequenceDictionaryUtils; 13 import org.broadinstitute.hellbender.utils.Utils; 14 import org.broadinstitute.hellbender.utils.bwa.BwaMemAlignment; 15 import org.broadinstitute.hellbender.utils.bwa.BwaMemAlignmentUtils; 16 import org.broadinstitute.hellbender.utils.fermi.FermiLiteAssembly; 17 import org.broadinstitute.hellbender.utils.fermi.FermiLiteAssembly.Connection; 18 import org.broadinstitute.hellbender.utils.fermi.FermiLiteAssembly.Contig; 19 import org.broadinstitute.hellbender.utils.gcs.BucketUtils; 20 21 import java.io.BufferedOutputStream; 22 import java.io.IOException; 23 import java.io.OutputStreamWriter; 24 import java.util.ArrayList; 25 import java.util.HashMap; 26 import java.util.List; 27 import java.util.Map; 28 import java.util.stream.IntStream; 29 import java.util.stream.Stream; 30 31 /** 32 * An assembly with its contigs aligned to reference, or a reason that there isn't an assembly. 33 */ 34 @DefaultSerializer(AlignedAssemblyOrExcuse.Serializer.class) 35 public final class AlignedAssemblyOrExcuse { 36 private final int assemblyId; 37 private final String errorMessage; 38 private final FermiLiteAssembly assembly; 39 private final List<List<BwaMemAlignment>> contigAlignments; 40 private final int secondsInAssembly; 41 getSecondsInAssembly()42 public int getSecondsInAssembly() { return secondsInAssembly; } 43 getAssemblyId()44 public int getAssemblyId() { 45 return assemblyId; 46 } 47 48 /** 49 * Either this is null, or the assembly and list of alignments is null. 50 */ getErrorMessage()51 public String getErrorMessage() { 52 return errorMessage; 53 } 54 isNotFailure()55 public boolean isNotFailure() { 56 return errorMessage==null; 57 } 58 getAssembly()59 public FermiLiteAssembly getAssembly() { 60 return assembly; 61 } 62 63 /** 64 * List is equal in length to the number of contigs in the assembly. 65 */ getContigAlignments()66 public List<List<BwaMemAlignment>> getContigAlignments() { 67 return contigAlignments; 68 } 69 AlignedAssemblyOrExcuse( final int assemblyId, final String errorMessage )70 public AlignedAssemblyOrExcuse( final int assemblyId, final String errorMessage ) { 71 this.assemblyId = assemblyId; 72 this.errorMessage = errorMessage; 73 this.assembly = null; 74 this.contigAlignments = null; 75 this.secondsInAssembly = 0; 76 } 77 AlignedAssemblyOrExcuse( final int assemblyId, final FermiLiteAssembly assembly, final int secondsInAssembly, final List<List<BwaMemAlignment>> contigAlignments )78 public AlignedAssemblyOrExcuse( final int assemblyId, final FermiLiteAssembly assembly, final int secondsInAssembly, 79 final List<List<BwaMemAlignment>> contigAlignments ) { 80 Utils.validate(assembly.getNContigs()==contigAlignments.size(), 81 "Number of contigs in assembly doesn't match length of list of alignments."); 82 Utils.validateArg(assembly.getContigs().stream().noneMatch(contig -> contig.getConnections()==null), 83 "Some assembly has contigs that have null connections"); 84 this.assemblyId = assemblyId; 85 this.errorMessage = null; 86 this.assembly = assembly; 87 this.contigAlignments = contigAlignments; 88 this.secondsInAssembly = secondsInAssembly; 89 } 90 AlignedAssemblyOrExcuse( final Kryo kryo, final Input input )91 private AlignedAssemblyOrExcuse( final Kryo kryo, final Input input ) { 92 this.assemblyId = input.readInt(); 93 this.errorMessage = input.readString(); 94 this.secondsInAssembly = input.readInt(); 95 if ( errorMessage != null ) { 96 this.assembly = null; 97 this.contigAlignments = null; 98 } else { 99 final int nContigs = input.readInt(); 100 final List<Contig> contigs = new ArrayList<>(nContigs); 101 for ( int idx = 0; idx != nContigs; ++idx ) { 102 contigs.add(readContig(input)); 103 } 104 for ( int idx = 0; idx != nContigs; ++idx ) { 105 final int nConnections = input.readInt(); 106 if ( nConnections == 0 ) continue; 107 final List<Connection> connections = new ArrayList<>(nConnections); 108 for ( int connIdx = 0; connIdx != nConnections; ++connIdx ) { 109 connections.add(readConnection(input, contigs)); 110 } 111 contigs.get(idx).setConnections(connections); 112 } 113 this.assembly = new FermiLiteAssembly(contigs); 114 final List<List<BwaMemAlignment>> contigAlignments = new ArrayList<>(nContigs); 115 for ( int idx = 0; idx != nContigs; ++idx ) { 116 final int nAlignments = input.readInt(); 117 final List<BwaMemAlignment> alignments = new ArrayList<>(nAlignments); 118 for ( int alnIdx = 0; alnIdx != nAlignments; ++alnIdx ) { 119 alignments.add(readAlignment(input)); 120 } 121 contigAlignments.add(alignments); 122 } 123 this.contigAlignments = contigAlignments; 124 } 125 } 126 writeContig( final Contig contig, final Output output )127 private static void writeContig( final Contig contig, final Output output ) { 128 output.writeInt(contig.getSequence().length); 129 output.writeBytes(contig.getSequence()); 130 final boolean hasCoverage = contig.getPerBaseCoverage() != null; 131 output.writeBoolean(hasCoverage); 132 if ( hasCoverage ) output.writeBytes(contig.getPerBaseCoverage()); 133 output.writeInt(contig.getNSupportingReads()); 134 } 135 readContig( final Input input )136 private static Contig readContig( final Input input ) { 137 final int sequenceLen = input.readInt(); 138 final byte[] sequence = new byte[sequenceLen]; 139 input.readBytes(sequence); 140 byte[] perBaseCoverage = null; 141 if ( input.readBoolean() ) { 142 perBaseCoverage = new byte[sequenceLen]; 143 input.readBytes(perBaseCoverage); 144 } 145 final int nSupportingReads = input.readInt(); 146 return new Contig(sequence, perBaseCoverage, nSupportingReads); 147 } 148 writeConnection( final Connection connection, final Map<Contig, Integer> contigMap, final Output output )149 private static void writeConnection( final Connection connection, 150 final Map<Contig, Integer> contigMap, 151 final Output output ) { 152 output.writeInt(contigMap.get(connection.getTarget())); 153 output.writeInt(connection.getOverlapLen()); 154 output.writeBoolean(connection.isRC()); 155 output.writeBoolean(connection.isTargetRC()); 156 } 157 readConnection( final Input input, final List<Contig> contigs )158 private static Connection readConnection( final Input input, final List<Contig> contigs ) { 159 final Contig target = contigs.get(input.readInt()); 160 final int overlapLen = input.readInt(); 161 final boolean isRC = input.readBoolean(); 162 final boolean isTargetRC = input.readBoolean(); 163 return new Connection(target, overlapLen, isRC, isTargetRC); 164 } 165 writeAlignment(final BwaMemAlignment alignment, final Output output)166 private static void writeAlignment(final BwaMemAlignment alignment, final Output output) { 167 output.writeInt(alignment.getSamFlag()); 168 output.writeInt(alignment.getRefId()); 169 output.writeInt(alignment.getRefStart()); 170 output.writeInt(alignment.getRefEnd()); 171 output.writeInt(alignment.getSeqStart()); 172 output.writeInt(alignment.getSeqEnd()); 173 output.writeInt(alignment.getMapQual()); 174 output.writeInt(alignment.getNMismatches()); 175 output.writeInt(alignment.getAlignerScore()); 176 output.writeInt(alignment.getSuboptimalScore()); 177 output.writeString(alignment.getCigar()); 178 output.writeString(alignment.getMDTag()); 179 output.writeString(alignment.getXATag()); 180 output.writeInt(alignment.getMateRefId()); 181 output.writeInt(alignment.getMateRefStart()); 182 output.writeInt(alignment.getTemplateLen()); 183 } 184 readAlignment(final Input input)185 private static BwaMemAlignment readAlignment(final Input input) { 186 final int samFlag = input.readInt(); 187 final int refId = input.readInt(); 188 final int refStart = input.readInt(); 189 final int refEnd = input.readInt(); 190 final int seqStart = input.readInt(); 191 final int seqEnd = input.readInt(); 192 final int mapQual = input.readInt(); 193 final int nMismatches = input.readInt(); 194 final int alignerScore = input.readInt(); 195 final int suboptimalScore = input.readInt(); 196 final String cigar = input.readString(); 197 final String mdTag = input.readString(); 198 final String xaTag = input.readString(); 199 final int mateRefId = input.readInt(); 200 final int mateRefStart = input.readInt(); 201 final int templateLen = input.readInt(); 202 return new BwaMemAlignment(samFlag, refId, refStart, refEnd, seqStart, seqEnd, 203 mapQual, nMismatches, alignerScore, suboptimalScore, 204 cigar, mdTag, xaTag, 205 mateRefId, mateRefStart, templateLen); 206 } 207 serialize( final Kryo kryo, final Output output )208 private void serialize( final Kryo kryo, final Output output ) { 209 output.writeInt(assemblyId); 210 output.writeString(errorMessage); 211 output.writeInt(secondsInAssembly); 212 if ( errorMessage == null ) { 213 final int nContigs = assembly.getNContigs(); 214 final Map<Contig, Integer> contigMap = new HashMap<>(); 215 output.writeInt(nContigs); 216 for ( int idx = 0; idx != nContigs; ++idx ) { 217 final Contig contig = assembly.getContig(idx); 218 writeContig(contig, output); 219 contigMap.put(contig, idx); 220 } 221 for ( final Contig contig : assembly.getContigs() ) { 222 final List<Connection> connections = contig.getConnections(); 223 output.writeInt(connections.size()); 224 for ( final Connection connection : connections ) { 225 writeConnection(connection, contigMap, output); 226 } 227 } 228 for ( final List<BwaMemAlignment> alignments : contigAlignments ) { 229 output.writeInt(alignments.size()); 230 for ( final BwaMemAlignment alignment : alignments ) { 231 writeAlignment(alignment, output); 232 } 233 } 234 } 235 } 236 237 public static final class Serializer extends com.esotericsoftware.kryo.Serializer<AlignedAssemblyOrExcuse> { 238 @Override write( final Kryo kryo, final Output output, final AlignedAssemblyOrExcuse alignedAssemblyOrExcuse )239 public void write( final Kryo kryo, final Output output, final AlignedAssemblyOrExcuse alignedAssemblyOrExcuse ) { 240 alignedAssemblyOrExcuse.serialize(kryo, output); 241 } 242 243 @Override read( final Kryo kryo, final Input input, final Class<AlignedAssemblyOrExcuse> klass )244 public AlignedAssemblyOrExcuse read( final Kryo kryo, final Input input, final Class<AlignedAssemblyOrExcuse> klass ) { 245 return new AlignedAssemblyOrExcuse(kryo, input); 246 } 247 } 248 249 // ================================================================================================================= 250 toSAMStreamForAlignmentsOfThisAssembly(final SAMFileHeader header, final List<String> refNames, final SAMReadGroupRecord contigAlignmentsReadGroup)251 public Stream<SAMRecord> toSAMStreamForAlignmentsOfThisAssembly(final SAMFileHeader header, final List<String> refNames, 252 final SAMReadGroupRecord contigAlignmentsReadGroup) { 253 return IntStream.range(0, contigAlignments.size()).boxed() 254 .flatMap(contigIdx -> 255 BwaMemAlignmentUtils.toSAMStreamForRead(formatContigName(assemblyId, contigIdx), 256 assembly.getContig(contigIdx).getSequence(), contigAlignments.get(contigIdx), 257 header, refNames, contigAlignmentsReadGroup) 258 ); 259 } 260 formatContigName( final int assemblyId, final int contigIdx )261 public static String formatContigName( final int assemblyId, final int contigIdx ) { 262 return formatAssemblyID(assemblyId) + ":" + formatContigID(contigIdx); 263 } 264 formatAssemblyID( final int assemblyId )265 public static String formatAssemblyID( final int assemblyId ) { 266 return String.format("asm%06d", assemblyId); 267 } 268 formatContigID( final int contigIdx )269 private static String formatContigID( final int contigIdx ) { 270 return String.format("tig%05d", contigIdx); 271 } 272 273 /** 274 * write a file describing each interval 275 */ writeIntervalFile( final String intervalFile, final SAMFileHeader header, final List<SVInterval> intervals, final List<AlignedAssemblyOrExcuse> intervalDispositions )276 public static void writeIntervalFile( final String intervalFile, 277 final SAMFileHeader header, 278 final List<SVInterval> intervals, 279 final List<AlignedAssemblyOrExcuse> intervalDispositions ) { 280 Utils.validate(intervalFile != null && header != null && intervals != null && intervalDispositions != null, 281 "At least one of the arguments is null."); 282 283 final Map<Integer, AlignedAssemblyOrExcuse> resultsMap = new HashMap<>(); 284 intervalDispositions.forEach(alignedAssemblyOrExcuse -> 285 resultsMap.put(alignedAssemblyOrExcuse.getAssemblyId(), alignedAssemblyOrExcuse)); 286 287 try ( final OutputStreamWriter writer = 288 new OutputStreamWriter(new BufferedOutputStream(BucketUtils.createFile(intervalFile))) ) { 289 final List<SAMSequenceRecord> contigs = header.getSequenceDictionary().getSequences(); 290 final int nIntervals = intervals.size(); 291 for ( int intervalIdx = 0; intervalIdx != nIntervals; ++intervalIdx ) { 292 final SVInterval interval = intervals.get(intervalIdx); 293 Utils.nonNull(interval, "interval is null for " + intervalIdx); 294 final String seqName = contigs.get(interval.getContig()).getSequenceName(); 295 final AlignedAssemblyOrExcuse alignedAssemblyOrExcuse = resultsMap.get(intervalIdx); 296 final String disposition; 297 if ( alignedAssemblyOrExcuse == null ) { 298 disposition = "unknown"; 299 } else if ( alignedAssemblyOrExcuse.getErrorMessage() != null ) { 300 disposition = alignedAssemblyOrExcuse.getErrorMessage(); 301 } else { 302 disposition = "produced " + alignedAssemblyOrExcuse.getAssembly().getNContigs() + 303 " contigs in " + alignedAssemblyOrExcuse.getSecondsInAssembly() + " secs."; 304 } 305 writer.write(intervalIdx + "\t" + 306 seqName + ":" + interval.getStart() + "-" + interval.getEnd() + "\t" + 307 disposition + "\n"); 308 } 309 } catch ( final IOException ioe ) { 310 throw new UserException.CouldNotCreateOutputFile("Can't write intervals file " + intervalFile, ioe); 311 } 312 } 313 314 /** 315 * write a SAM file containing records for each aligned contig 316 */ writeAssemblySAMFile(final String outputAssembliesFile, final List<AlignedAssemblyOrExcuse> alignedAssemblyOrExcuseList, final SAMFileHeader header, final SAMFileHeader.SortOrder assemblyAlnSortOrder)317 static void writeAssemblySAMFile(final String outputAssembliesFile, 318 final List<AlignedAssemblyOrExcuse> alignedAssemblyOrExcuseList, 319 final SAMFileHeader header, 320 final SAMFileHeader.SortOrder assemblyAlnSortOrder) { 321 322 final String sampleId = SVUtils.getSampleId(header); 323 final SAMReadGroupRecord contigAlignmentsReadGroup = new SAMReadGroupRecord(SVUtils.GATKSV_CONTIG_ALIGNMENTS_READ_GROUP_ID); 324 contigAlignmentsReadGroup.setSample(sampleId); 325 326 final SAMFileHeader cleanHeader = new SAMFileHeader(header.getSequenceDictionary()); 327 cleanHeader.addReadGroup(contigAlignmentsReadGroup); 328 cleanHeader.setSortOrder(assemblyAlnSortOrder); 329 final SAMRecordComparator samRecordComparator = SVUtils.getSamRecordComparator(assemblyAlnSortOrder); 330 331 final List<String> refNames = SequenceDictionaryUtils.getContigNamesList(cleanHeader.getSequenceDictionary()); 332 final Stream<SAMRecord> samRecordStream = 333 alignedAssemblyOrExcuseList.stream().filter(AlignedAssemblyOrExcuse::isNotFailure) 334 .flatMap(aa -> aa.toSAMStreamForAlignmentsOfThisAssembly(cleanHeader, refNames, contigAlignmentsReadGroup)) 335 .sorted(samRecordComparator); 336 SVFileUtils.writeSAMFile(outputAssembliesFile, samRecordStream.iterator(), cleanHeader, true); 337 } 338 } 339