1 package org.broadinstitute.hellbender.engine; 2 3 import com.google.common.annotations.VisibleForTesting; 4 import htsjdk.samtools.MergingSamRecordIterator; 5 import htsjdk.samtools.SAMException; 6 import htsjdk.samtools.SAMFileHeader; 7 import htsjdk.samtools.SAMRecord; 8 import htsjdk.samtools.SAMSequenceDictionary; 9 import htsjdk.samtools.SamFileHeaderMerger; 10 import htsjdk.samtools.SamInputResource; 11 import htsjdk.samtools.SamReader; 12 import htsjdk.samtools.SamReaderFactory; 13 import htsjdk.samtools.util.CloseableIterator; 14 import htsjdk.samtools.util.IOUtil; 15 import org.apache.logging.log4j.LogManager; 16 import org.apache.logging.log4j.Logger; 17 import org.broadinstitute.hellbender.exceptions.GATKException; 18 import org.broadinstitute.hellbender.exceptions.UserException; 19 import org.broadinstitute.hellbender.utils.IntervalUtils; 20 import org.broadinstitute.hellbender.utils.SimpleInterval; 21 import org.broadinstitute.hellbender.utils.Utils; 22 import org.broadinstitute.hellbender.utils.gcs.BucketUtils; 23 import org.broadinstitute.hellbender.utils.iterators.SAMRecordToReadIterator; 24 import org.broadinstitute.hellbender.utils.iterators.SamReaderQueryingIterator; 25 import org.broadinstitute.hellbender.utils.read.GATKRead; 26 import org.broadinstitute.hellbender.utils.read.ReadConstants; 27 28 import java.io.IOException; 29 import java.nio.channels.SeekableByteChannel; 30 import java.nio.file.Path; 31 import java.util.ArrayList; 32 import java.util.Arrays; 33 import java.util.Iterator; 34 import java.util.LinkedHashMap; 35 import java.util.List; 36 import java.util.Map; 37 import java.util.Set; 38 import java.util.function.Function; 39 import java.util.stream.Collectors; 40 41 /** 42 * Manages traversals and queries over sources of reads which are accessible via {@link Path}s 43 * (for now, SAM/BAM/CRAM files only). 44 * 45 * Two basic operations are available: 46 * 47 * -Iteration over all reads, optionally restricted to reads that overlap a set of intervals 48 * -Targeted queries by one interval at a time 49 */ 50 public final class ReadsPathDataSource implements ReadsDataSource { 51 private static final Logger logger = LogManager.getLogger(ReadsPathDataSource.class); 52 53 /** 54 * Mapping from SamReaders to iterators over the reads from each reader. Only one 55 * iterator can be open from a given reader at a time (this is a restriction 56 * in htsjdk). Iterator is set to null for a reader if no iteration is currently 57 * active on that reader. 58 */ 59 private final Map<SamReader, CloseableIterator<SAMRecord>> readers; 60 61 /** 62 * Hang onto the input files so that we can print useful errors about them 63 */ 64 private final Map<SamReader, Path> backingPaths; 65 66 /** 67 * Only reads that overlap these intervals (and unmapped reads, if {@link #traverseUnmapped} is set) will be returned 68 * during a full iteration. Null if iteration is unbounded. 69 * 70 * Individual queries are unaffected by these intervals -- only traversals initiated via {@link #iterator} are affected. 71 */ 72 private List<SimpleInterval> intervalsForTraversal; 73 74 /** 75 * If true, restrict traversals to unmapped reads (and reads overlapping any {@link #intervalsForTraversal}, if set). 76 * False if iteration is unbounded or bounded only by our {@link #intervalsForTraversal}. 77 * 78 * Note that this setting covers only unmapped reads that have no position -- unmapped reads that are assigned the 79 * position of their mates will be returned by queries overlapping that position. 80 * 81 * Individual queries are unaffected by this setting -- only traversals initiated via {@link #iterator} are affected. 82 */ 83 private boolean traverseUnmapped; 84 85 /** 86 * Used to create a merged Sam header when we're dealing with multiple readers. Null if we only have a single reader. 87 */ 88 private final SamFileHeaderMerger headerMerger; 89 90 /** 91 * Are indices available for all files? 92 */ 93 private boolean indicesAvailable; 94 95 /** 96 * Has it been closed already. 97 */ 98 private boolean isClosed; 99 100 /** 101 * Initialize this data source with a single SAM/BAM file and validation stringency SILENT. 102 * 103 * @param samFile SAM/BAM file, not null. 104 */ ReadsPathDataSource( final Path samFile )105 public ReadsPathDataSource( final Path samFile ) { 106 this(samFile != null ? Arrays.asList(samFile) : null, (SamReaderFactory)null); 107 } 108 109 /** 110 * Initialize this data source with multiple SAM/BAM files and validation stringency SILENT. 111 * 112 * @param samFiles SAM/BAM files, not null. 113 */ ReadsPathDataSource( final List<Path> samFiles )114 public ReadsPathDataSource( final List<Path> samFiles ) { 115 this(samFiles, (SamReaderFactory)null); 116 } 117 118 /** 119 * Initialize this data source with a single SAM/BAM file and a custom SamReaderFactory 120 * 121 * @param samPath path to SAM/BAM file, not null. 122 * @param customSamReaderFactory SamReaderFactory to use, if null a default factory with no reference and validation 123 * stringency SILENT is used. 124 */ ReadsPathDataSource( final Path samPath, SamReaderFactory customSamReaderFactory )125 public ReadsPathDataSource( final Path samPath, SamReaderFactory customSamReaderFactory ) { 126 this(samPath != null ? Arrays.asList(samPath) : null, customSamReaderFactory); 127 } 128 129 /** 130 * Initialize this data source with multiple SAM/BAM files and a custom SamReaderFactory 131 * 132 * @param samPaths path to SAM/BAM file, not null. 133 * @param customSamReaderFactory SamReaderFactory to use, if null a default factory with no reference and validation 134 * stringency SILENT is used. 135 */ ReadsPathDataSource( final List<Path> samPaths, SamReaderFactory customSamReaderFactory )136 public ReadsPathDataSource( final List<Path> samPaths, SamReaderFactory customSamReaderFactory ) { 137 this(samPaths, null, customSamReaderFactory, 0, 0); 138 } 139 140 /** 141 * Initialize this data source with multiple SAM/BAM/CRAM files, and explicit indices for those files. 142 * 143 * @param samPaths paths to SAM/BAM/CRAM files, not null 144 * @param samIndices indices for all of the SAM/BAM/CRAM files, in the same order as samPaths. May be null, 145 * in which case index paths are inferred automatically. 146 */ ReadsPathDataSource( final List<Path> samPaths, final List<Path> samIndices )147 public ReadsPathDataSource( final List<Path> samPaths, final List<Path> samIndices ) { 148 this(samPaths, samIndices, null, 0, 0); 149 } 150 151 /** 152 * Initialize this data source with multiple SAM/BAM/CRAM files, explicit indices for those files, 153 * and a custom SamReaderFactory. 154 * 155 * @param samPaths paths to SAM/BAM/CRAM files, not null 156 * @param samIndices indices for all of the SAM/BAM/CRAM files, in the same order as samPaths. May be null, 157 * in which case index paths are inferred automatically. 158 * @param customSamReaderFactory SamReaderFactory to use, if null a default factory with no reference and validation 159 * stringency SILENT is used. 160 */ ReadsPathDataSource( final List<Path> samPaths, final List<Path> samIndices, SamReaderFactory customSamReaderFactory )161 public ReadsPathDataSource( final List<Path> samPaths, final List<Path> samIndices, 162 SamReaderFactory customSamReaderFactory ) { 163 this(samPaths, samIndices, customSamReaderFactory, 0, 0); 164 } 165 166 /** 167 * Initialize this data source with multiple SAM/BAM/CRAM files, explicit indices for those files, 168 * and a custom SamReaderFactory. 169 * 170 * @param samPaths paths to SAM/BAM/CRAM files, not null 171 * @param samIndices indices for all of the SAM/BAM/CRAM files, in the same order as samPaths. May be null, 172 * in which case index paths are inferred automatically. 173 * @param customSamReaderFactory SamReaderFactory to use, if null a default factory with no reference and validation 174 * stringency SILENT is used. 175 * @param cloudPrefetchBuffer MB size of caching/prefetching wrapper for the data, if on Google Cloud (0 to disable). 176 * @param cloudIndexPrefetchBuffer MB size of caching/prefetching wrapper for the index, if on Google Cloud (0 to disable). 177 */ ReadsPathDataSource( final List<Path> samPaths, final List<Path> samIndices, SamReaderFactory customSamReaderFactory, int cloudPrefetchBuffer, int cloudIndexPrefetchBuffer)178 public ReadsPathDataSource( final List<Path> samPaths, final List<Path> samIndices, 179 SamReaderFactory customSamReaderFactory, 180 int cloudPrefetchBuffer, int cloudIndexPrefetchBuffer) { 181 this(samPaths, samIndices, customSamReaderFactory, 182 BucketUtils.getPrefetchingWrapper(cloudPrefetchBuffer), 183 BucketUtils.getPrefetchingWrapper(cloudIndexPrefetchBuffer) ); 184 } 185 186 187 /** 188 * Initialize this data source with multiple SAM/BAM/CRAM files, explicit indices for those files, 189 * and a custom SamReaderFactory. 190 * 191 * @param samPaths paths to SAM/BAM/CRAM files, not null 192 * @param samIndices indices for all of the SAM/BAM/CRAM files, in the same order as samPaths. May be null, 193 * in which case index paths are inferred automatically. 194 * @param customSamReaderFactory SamReaderFactory to use, if null a default factory with no reference and validation 195 * stringency SILENT is used. 196 * @param cloudWrapper caching/prefetching wrapper for the data, if on Google Cloud. 197 * @param cloudIndexWrapper caching/prefetching wrapper for the index, if on Google Cloud. 198 */ ReadsPathDataSource( final List<Path> samPaths, final List<Path> samIndices, SamReaderFactory customSamReaderFactory, Function<SeekableByteChannel, SeekableByteChannel> cloudWrapper, Function<SeekableByteChannel, SeekableByteChannel> cloudIndexWrapper )199 public ReadsPathDataSource( final List<Path> samPaths, final List<Path> samIndices, 200 SamReaderFactory customSamReaderFactory, 201 Function<SeekableByteChannel, SeekableByteChannel> cloudWrapper, 202 Function<SeekableByteChannel, SeekableByteChannel> cloudIndexWrapper ) { 203 Utils.nonNull(samPaths); 204 Utils.nonEmpty(samPaths, "ReadsPathDataSource cannot be created from empty file list"); 205 206 if ( samIndices != null && samPaths.size() != samIndices.size() ) { 207 throw new UserException(String.format("Must have the same number of BAM/CRAM/SAM paths and indices. Saw %d BAM/CRAM/SAMs but %d indices", 208 samPaths.size(), samIndices.size())); 209 } 210 211 readers = new LinkedHashMap<>(samPaths.size() * 2); 212 backingPaths = new LinkedHashMap<>(samPaths.size() * 2); 213 indicesAvailable = true; 214 215 final SamReaderFactory samReaderFactory = 216 customSamReaderFactory == null ? 217 SamReaderFactory.makeDefault().validationStringency(ReadConstants.DEFAULT_READ_VALIDATION_STRINGENCY) : 218 customSamReaderFactory; 219 220 int samCount = 0; 221 for ( final Path samPath : samPaths ) { 222 // Ensure each file can be read 223 try { 224 IOUtil.assertFileIsReadable(samPath); 225 } 226 catch ( SAMException|IllegalArgumentException e ) { 227 throw new UserException.CouldNotReadInputFile(samPath.toString(), e); 228 } 229 230 Function<SeekableByteChannel, SeekableByteChannel> wrapper = 231 (BucketUtils.isEligibleForPrefetching(samPath) 232 ? cloudWrapper 233 : Function.identity()); 234 // if samIndices==null then we'll guess the index name from the file name. 235 // If the file's on the cloud, then the search will only consider locations that are also 236 // in the cloud. 237 Function<SeekableByteChannel, SeekableByteChannel> indexWrapper = 238 ((samIndices != null && BucketUtils.isEligibleForPrefetching(samIndices.get(samCount)) 239 || (samIndices == null && BucketUtils.isEligibleForPrefetching(samPath))) 240 ? cloudIndexWrapper 241 : Function.identity()); 242 243 SamReader reader; 244 if ( samIndices == null ) { 245 reader = samReaderFactory.open(samPath, wrapper, indexWrapper); 246 } 247 else { 248 final SamInputResource samResource = SamInputResource.of(samPath, wrapper); 249 Path indexPath = samIndices.get(samCount); 250 samResource.index(indexPath, indexWrapper); 251 reader = samReaderFactory.open(samResource); 252 } 253 254 // Ensure that each file has an index 255 if ( ! reader.hasIndex() ) { 256 indicesAvailable = false; 257 } 258 259 readers.put(reader, null); 260 backingPaths.put(reader, samPath); 261 ++samCount; 262 } 263 264 // Prepare a header merger only if we have multiple readers 265 headerMerger = samPaths.size() > 1 ? createHeaderMerger() : null; 266 } 267 268 /** 269 * Are indices available for all files? 270 */ indicesAvailable()271 public boolean indicesAvailable() { 272 return indicesAvailable; 273 } 274 275 /** 276 * @return true if indices are available for all inputs. 277 * This is identical to {@link #indicesAvailable} 278 */ 279 @Override isQueryableByInterval()280 public boolean isQueryableByInterval() { 281 return indicesAvailable(); 282 } 283 284 /** 285 * Restricts a traversal of this data source via {@link #iterator} to only return reads that overlap the given intervals, 286 * and to unmapped reads if specified. 287 * 288 * Calls to {@link #query} are not affected by this method. 289 * 290 * @param intervals Our next full traversal will return reads overlapping these intervals 291 * @param traverseUnmapped Our next full traversal will return unmapped reads (this affects only unmapped reads that 292 * have no position -- unmapped reads that have the position of their mapped mates will be 293 * included if the interval overlapping that position is included). 294 */ 295 @Override setTraversalBounds( final List<SimpleInterval> intervals, final boolean traverseUnmapped )296 public void setTraversalBounds( final List<SimpleInterval> intervals, final boolean traverseUnmapped ) { 297 // Set intervalsForTraversal to null if intervals is either null or empty 298 this.intervalsForTraversal = intervals != null && ! intervals.isEmpty() ? intervals : null; 299 this.traverseUnmapped = traverseUnmapped; 300 301 if ( traversalIsBounded() && ! indicesAvailable ) { 302 raiseExceptionForMissingIndex("Traversal by intervals was requested but some input files are not indexed."); 303 } 304 } 305 306 /** 307 * @return True if traversals initiated via {@link #iterator} will be restricted to reads that overlap intervals 308 * as configured via {@link #setTraversalBounds}, otherwise false 309 */ 310 @Override traversalIsBounded()311 public boolean traversalIsBounded() { 312 return intervalsForTraversal != null || traverseUnmapped; 313 } 314 raiseExceptionForMissingIndex( String reason )315 private void raiseExceptionForMissingIndex( String reason ) { 316 String commandsToIndex = backingPaths.entrySet().stream() 317 .filter(f -> !f.getKey().hasIndex()) 318 .map(Map.Entry::getValue) 319 .map(Path::toAbsolutePath) 320 .map(f -> "samtools index " + f) 321 .collect(Collectors.joining("\n","\n","\n")); 322 323 throw new UserException(reason + "\nPlease index all input files:\n" + commandsToIndex); 324 } 325 326 /** 327 * Iterate over all reads in this data source. If intervals were provided via {@link #setTraversalBounds}, 328 * iteration is limited to reads that overlap that set of intervals. 329 * 330 * @return An iterator over the reads in this data source, limited to reads that overlap the intervals supplied 331 * via {@link #setTraversalBounds} (if intervals were provided) 332 */ 333 @Override iterator()334 public Iterator<GATKRead> iterator() { 335 logger.debug("Preparing readers for traversal"); 336 return prepareIteratorsForTraversal(intervalsForTraversal, traverseUnmapped); 337 } 338 339 /** 340 * Query reads over a specific interval. This operation is not affected by prior calls to 341 * {@link #setTraversalBounds} 342 * 343 * @param interval The interval over which to query 344 * @return Iterator over reads overlapping the query interval 345 */ 346 @Override query( final SimpleInterval interval )347 public Iterator<GATKRead> query( final SimpleInterval interval ) { 348 if ( ! indicesAvailable ) { 349 raiseExceptionForMissingIndex("Cannot query reads data source by interval unless all files are indexed"); 350 } 351 352 return prepareIteratorsForTraversal(Arrays.asList(interval)); 353 } 354 355 /** 356 * @return An iterator over just the unmapped reads with no assigned position. This operation is not affected 357 * by prior calls to {@link #setTraversalBounds}. The underlying file must be indexed. 358 */ 359 @Override queryUnmapped()360 public Iterator<GATKRead> queryUnmapped() { 361 if ( ! indicesAvailable ) { 362 raiseExceptionForMissingIndex("Cannot query reads data source by interval unless all files are indexed"); 363 } 364 365 return prepareIteratorsForTraversal(null, true); 366 } 367 368 /** 369 * Returns the SAM header for this data source. Will be a merged header if there are multiple readers. 370 * If there is only a single reader, returns its header directly. 371 * 372 * @return SAM header for this data source 373 */ 374 @Override getHeader()375 public SAMFileHeader getHeader() { 376 return headerMerger != null ? headerMerger.getMergedHeader() : readers.entrySet().iterator().next().getKey().getFileHeader(); 377 } 378 379 /** 380 * Prepare iterators over all readers in response to a request for a complete iteration or query 381 * 382 * If there are multiple intervals, they must have been optimized using QueryInterval.optimizeIntervals() 383 * before calling this method. 384 * 385 * @param queryIntervals Intervals to bound the iteration (reads must overlap one of these intervals). If null, iteration is unbounded. 386 * @return Iterator over all reads in this data source, limited to overlap with the supplied intervals 387 */ prepareIteratorsForTraversal( final List<SimpleInterval> queryIntervals )388 private Iterator<GATKRead> prepareIteratorsForTraversal( final List<SimpleInterval> queryIntervals ) { 389 return prepareIteratorsForTraversal(queryIntervals, false); 390 } 391 392 /** 393 * Prepare iterators over all readers in response to a request for a complete iteration or query 394 * 395 * @param queryIntervals Intervals to bound the iteration (reads must overlap one of these intervals). If null, iteration is unbounded. 396 * @return Iterator over all reads in this data source, limited to overlap with the supplied intervals 397 */ prepareIteratorsForTraversal( final List<SimpleInterval> queryIntervals, final boolean queryUnmapped )398 private Iterator<GATKRead> prepareIteratorsForTraversal( final List<SimpleInterval> queryIntervals, final boolean queryUnmapped ) { 399 // htsjdk requires that only one iterator be open at a time per reader, so close out 400 // any previous iterations 401 closePreviousIterationsIfNecessary(); 402 403 final boolean traversalIsBounded = (queryIntervals != null && ! queryIntervals.isEmpty()) || queryUnmapped; 404 405 // Set up an iterator for each reader, bounded to overlap with the supplied intervals if there are any 406 for ( Map.Entry<SamReader, CloseableIterator<SAMRecord>> readerEntry : readers.entrySet() ) { 407 if (traversalIsBounded) { 408 readerEntry.setValue( 409 new SamReaderQueryingIterator( 410 readerEntry.getKey(), 411 readers.size() > 1 ? 412 getIntervalsOverlappingReader(readerEntry.getKey(), queryIntervals) : 413 queryIntervals, 414 queryUnmapped 415 ) 416 ); 417 } else { 418 readerEntry.setValue(readerEntry.getKey().iterator()); 419 } 420 } 421 422 // Create a merging iterator over all readers if necessary. In the case where there's only a single reader, 423 // return its iterator directly to avoid the overhead of the merging iterator. 424 Iterator<SAMRecord> startingIterator = null; 425 if ( readers.size() == 1 ) { 426 startingIterator = readers.entrySet().iterator().next().getValue(); 427 } 428 else { 429 startingIterator = new MergingSamRecordIterator(headerMerger, readers, true); 430 } 431 432 return new SAMRecordToReadIterator(startingIterator); 433 } 434 435 /** 436 * Reduce the intervals down to only include ones that can actually intersect with this reader 437 */ getIntervalsOverlappingReader( final SamReader samReader, final List<SimpleInterval> queryIntervals )438 private List<SimpleInterval> getIntervalsOverlappingReader( 439 final SamReader samReader, 440 final List<SimpleInterval> queryIntervals ) 441 { 442 final SAMSequenceDictionary sequenceDictionary = samReader.getFileHeader().getSequenceDictionary(); 443 return queryIntervals.stream() 444 .filter(interval -> IntervalUtils.intervalIsOnDictionaryContig(interval, sequenceDictionary)) 445 .collect(Collectors.toList()); 446 } 447 448 /** 449 * Create a header merger from the individual SAM/BAM headers in our readers 450 * 451 * @return a header merger containing all individual headers in this data source 452 */ createHeaderMerger()453 private SamFileHeaderMerger createHeaderMerger() { 454 List<SAMFileHeader> headers = new ArrayList<>(readers.size()); 455 for ( Map.Entry<SamReader, CloseableIterator<SAMRecord>> readerEntry : readers.entrySet() ) { 456 headers.add(readerEntry.getKey().getFileHeader()); 457 } 458 459 SamFileHeaderMerger headerMerger = new SamFileHeaderMerger(identifySortOrder(headers), headers, true); 460 return headerMerger; 461 } 462 463 @VisibleForTesting identifySortOrder( final List<SAMFileHeader> headers )464 static SAMFileHeader.SortOrder identifySortOrder( final List<SAMFileHeader> headers ){ 465 final Set<SAMFileHeader.SortOrder> sortOrders = headers.stream().map(SAMFileHeader::getSortOrder).collect(Collectors.toSet()); 466 final SAMFileHeader.SortOrder order; 467 if (sortOrders.size() == 1) { 468 order = sortOrders.iterator().next(); 469 } else { 470 order = SAMFileHeader.SortOrder.unsorted; 471 logger.warn("Inputs have different sort orders. Assuming {} sorted reads for all of them.", order); 472 } 473 return order; 474 } 475 476 /** 477 * @return true if this {@code ReadsPathDataSource} supports serial iteration (has only non-SAM inputs). If any 478 * input has type==SAM_TYPE (is backed by a SamFileReader) this will return false, since SamFileReader 479 * doesn't support serial iterators, and can't be serially re-traversed without re-initialization of the 480 * underlying reader (and {@code ReadsPathDataSource}. 481 */ supportsSerialIteration()482 public boolean supportsSerialIteration() { 483 return !hasSAMInputs(); 484 } 485 486 /** 487 * Shut down this data source permanently, closing all iterations and readers. 488 */ 489 @Override close()490 public void close() { 491 if (isClosed) { 492 return; 493 } 494 isClosed = true; 495 closePreviousIterationsIfNecessary(); 496 497 try { 498 for ( Map.Entry<SamReader, CloseableIterator<SAMRecord>> readerEntry : readers.entrySet() ) { 499 readerEntry.getKey().close(); 500 } 501 } 502 catch ( IOException e ) { 503 throw new GATKException("Error closing SAMReader"); 504 } 505 } 506 isClosed()507 boolean isClosed() { 508 return isClosed; 509 } 510 511 /** 512 * Close any previously-opened iterations over our readers (htsjdk allows only one open iteration per reader). 513 */ closePreviousIterationsIfNecessary()514 private void closePreviousIterationsIfNecessary() { 515 for ( Map.Entry<SamReader, CloseableIterator<SAMRecord>> readerEntry : readers.entrySet() ) { 516 CloseableIterator<SAMRecord> readerIterator = readerEntry.getValue(); 517 if ( readerIterator != null ) { 518 readerIterator.close(); 519 readerEntry.setValue(null); 520 } 521 } 522 } 523 524 // Return true if any input is has type==SAM_TYPE (is backed by a SamFileReader) since SamFileReader 525 // doesn't support serial iterators and can't be serially re-traversed without re-initialization of the 526 // reader hasSAMInputs()527 private boolean hasSAMInputs() { 528 return readers.keySet().stream().anyMatch(r -> r.type().equals(SamReader.Type.SAM_TYPE)); 529 } 530 531 /** 532 * Get the sequence dictionary for this ReadsPathDataSource 533 * 534 * @return SAMSequenceDictionary from the SAMReader backing this if there is only 1 input file, otherwise the merged SAMSequenceDictionary from the merged header 535 */ 536 @Override getSequenceDictionary()537 public SAMSequenceDictionary getSequenceDictionary() { 538 return getHeader().getSequenceDictionary(); 539 } 540 541 542 } 543