Engine read caching.
cmnbroad committed Jul 30, 2018
1 parent b6a630a commit 9addf1b
Showing 14 changed files with 1,235 additions and 561 deletions.
src/main/java/org/broadinstitute/hellbender/cmdline/argumentcollections/ReadInputArgumentCollection.java
@@ -1,6 +1,7 @@
package org.broadinstitute.hellbender.cmdline.argumentcollections;

import htsjdk.samtools.ValidationStringency;
import org.broadinstitute.barclay.argparser.Advanced;
import org.broadinstitute.barclay.argparser.Argument;
import org.broadinstitute.hellbender.cmdline.StandardArgumentDefinitions;
import org.broadinstitute.hellbender.utils.io.IOUtils;
@@ -37,6 +38,11 @@ public abstract class ReadInputArgumentCollection implements Serializable {
optional = true)
protected List<String> readIndices;

@Advanced
@Argument(fullName = "reads-look-ahead-window-size", shortName="reads-look-ahead-window-size",
doc = "Window size for reads look-ahead query caching")
protected int readsLookaheadWindowSize = 100 * 1024;

/**
* Get the list of BAM/SAM/CRAM files specified at the command line.
* Paths are the preferred format, as this can handle both local disk and NIO direct access to cloud storage.
@@ -73,4 +79,9 @@ public List<Path> getReadIndexPaths() {
* at the command line.
*/
public ValidationStringency getReadValidationStringency() { return readValidationStringency; };

/**
* Get the look ahead buffer size to be used for read queries
*/
public int getReadsLookaheadWindowSize() { return readsLookaheadWindowSize; };
}
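A note on what the new argument means in practice: by analogy with the Feature-side look-ahead caching changed later in this commit, the window is presumably the number of extra bases a reads query prefetches past the requested end, so the default of 100 * 1024 corresponds to 102,400 bases of look-ahead. The helper below is an illustrative sketch only, not part of this commit or of GATK:

// Illustrative sketch: shows the assumed meaning of reads-look-ahead-window-size,
// mirroring the Feature-side queryLookaheadBases handling elsewhere in this commit.
final class ReadQueryWindow {
    /** End coordinate to prefetch through: the query end plus the configured look-ahead window. */
    static int prefetchEnd(final int queryEnd, final int lookaheadWindowSize) {
        // addExact fails fast on overflow instead of producing a negative coordinate
        return Math.addExact(queryEnd, lookaheadWindowSize);
    }

    public static void main(final String[] args) {
        final int defaultWindow = 100 * 1024;                        // the argument's default: 102,400 bases
        System.out.println(prefetchEnd(1_000_000, defaultWindow));   // prints 1102400
    }
}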
248 changes: 0 additions & 248 deletions src/main/java/org/broadinstitute/hellbender/engine/FeatureCache.java

This file was deleted.

src/main/java/org/broadinstitute/hellbender/engine/FeatureDataSource.java
@@ -11,6 +11,8 @@
import htsjdk.variant.vcf.VCFHeader;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.broadinstitute.hellbender.engine.cache.DrivingFeatureInputCacheStrategy;
import org.broadinstitute.hellbender.engine.cache.LocatableCache;
import org.broadinstitute.hellbender.exceptions.GATKException;
import org.broadinstitute.hellbender.exceptions.UserException;
import org.broadinstitute.hellbender.tools.IndexFeatureFile;
@@ -94,16 +96,7 @@ public final class FeatureDataSource<T extends Feature> implements GATKDataSource
* improve performance of the common access pattern involving multiple queries across nearby intervals
* with gradually increasing start positions.
*/
private final FeatureCache<T> queryCache;

/**
* When we experience a cache miss (ie., a query interval not fully contained within our cache) and need
* to re-populate the Feature cache from disk to satisfy a query, this controls the number of extra bases
* AFTER the end of our interval to fetch. Should be sufficiently large so that typically a significant number
* of subsequent queries will be cache hits (ie., query intervals fully contained within our cache) before
* we have another cache miss and need to go to disk again.
*/
private final int queryLookaheadBases;
private final LocatableCache<T> queryCache;

/**
* Holds information about the path this datasource reads from.
@@ -278,8 +271,7 @@ public FeatureDataSource(final FeatureInput<T> featureInput, final int queryLook

this.currentIterator = null;
this.intervalsForTraversal = null;
this.queryCache = new FeatureCache<>();
this.queryLookaheadBases = queryLookaheadBases;
this.queryCache = new LocatableCache<>(getName(), new DrivingFeatureInputCacheStrategy<>(queryLookaheadBases, this::refillQueryCache));
}

/**
@@ -553,50 +545,28 @@ public List<T> queryAndPrefetch( final SimpleInterval interval ) {
"If it's a file, please index it using the bundled tool " + IndexFeatureFile.class.getSimpleName());
}

// If the query can be satisfied using existing cache contents, prepare for retrieval
// by discarding all Features at the beginning of the cache that end before the start
// of our query interval.
if ( queryCache.cacheHit(interval) ) {
queryCache.trimToNewStartPosition(interval.getStart());
}
// Otherwise, we have a cache miss, so go to disk to refill our cache.
else {
refillQueryCache(interval);
}

// Return the subset of our cache that overlaps our query interval
return queryCache.getCachedFeaturesUpToStopPosition(interval.getEnd());
return queryCache.queryAndPrefetch(interval);
}

/**
* Refill our cache from disk after a cache miss. Will prefetch Features overlapping an additional
* queryLookaheadBases bases after the end of the provided interval, in addition to those overlapping
* Called by the cache strategy to refill our cache from disk after a cache miss. Will prefetch Features overlapping
* an additional queryLookaheadBases bases after the end of the provided interval, in addition to those overlapping
* the interval itself.
*
* Calling this has the side effect of invalidating (closing) any currently-open iteration over
* This has the side effect of invalidating (closing) any currently-open iteration over
* this data source.
*
* @param interval the query interval that produced a cache miss
* @param cacheInterval the query interval to be cached
*/
private void refillQueryCache( final SimpleInterval interval ) {
private Iterator<T> refillQueryCache(final SimpleInterval cacheInterval) {
// Tribble documentation states that having multiple iterators open simultaneously over the same FeatureReader
// results in undefined behavior
closeOpenIterationIfNecessary();

// Expand the end of our query by the configured number of bases, in anticipation of probable future
// queries with slightly larger start/stop positions.
//
// Note that it doesn't matter if we go off the end of the contig in the process, since
// our reader's query operation is not aware of (and does not care about) contig boundaries.
// Note: we use addExact to blow up on overflow rather than propagate negative results downstream
final SimpleInterval queryInterval = new SimpleInterval(interval.getContig(), interval.getStart(), Math.addExact(interval.getEnd(), queryLookaheadBases));

// Query iterator over our reader will be immediately closed after re-populating our cache
try ( CloseableTribbleIterator<T> queryIter = featureReader.query(queryInterval.getContig(), queryInterval.getStart(), queryInterval.getEnd()) ) {
queryCache.fill(queryIter, queryInterval);
try {
return featureReader.query(cacheInterval.getContig(), cacheInterval.getStart(), cacheInterval.getEnd());
}
catch ( IOException e ) {
throw new GATKException("Error querying file " + featureInput + " over interval " + interval, e);
throw new GATKException("Error querying file " + featureInput + " over interval " + cacheInterval, e);
}
}
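Taken together, this hunk moves the hit/trim/miss/refill bookkeeping out of FeatureDataSource: on a cache hit the cache only drops leading entries that end before the new query start, and on a miss it expands the query end by the look-ahead window and repopulates itself through the supplied callback, which is now simply this::refillQueryCache returning the reader's iterator. Below is a self-contained sketch of that pattern with assumed names, since the actual LocatableCache and DrivingFeatureInputCacheStrategy sources are not part of this hunk:

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

// Illustrative sketch only: this is not the new LocatableCache API, just the hit/trim/miss/refill
// pattern that this commit moves out of FeatureDataSource and into the engine.cache package.
final class LookaheadCacheSketch<T extends LookaheadCacheSketch.Locatable> {

    public interface Locatable { String getContig(); int getStart(); int getEnd(); }

    public static final class Interval implements Locatable {
        private final String contig; private final int start; private final int end;
        public Interval(final String contig, final int start, final int end) {
            this.contig = contig; this.start = start; this.end = end;
        }
        @Override public String getContig() { return contig; }
        @Override public int getStart() { return start; }
        @Override public int getEnd() { return end; }
    }

    private final int lookaheadBases;
    private final Function<Interval, Iterator<T>> refiller;   // plays the role of this::refillQueryCache
    private final Deque<T> cache = new ArrayDeque<>();
    private String cachedContig = null;
    private int cacheStart = -1;
    private int cacheEnd = -1;

    LookaheadCacheSketch(final int lookaheadBases, final Function<Interval, Iterator<T>> refiller) {
        this.lookaheadBases = lookaheadBases;
        this.refiller = refiller;
    }

    public List<T> queryAndPrefetch(final Interval query) {
        final boolean cacheHit = query.getContig().equals(cachedContig)
                && cacheStart <= query.getStart() && query.getEnd() <= cacheEnd;
        if (cacheHit) {
            // Hit: discard leading entries that end before the new query start.
            while (!cache.isEmpty() && cache.peekFirst().getEnd() < query.getStart()) {
                cache.removeFirst();
            }
            cacheStart = Math.max(cacheStart, query.getStart());
        } else {
            // Miss: expand the end of the query by the look-ahead window and refill via the callback.
            // addExact fails fast on overflow rather than propagating a bogus coordinate.
            final Interval cacheInterval = new Interval(query.getContig(), query.getStart(),
                    Math.addExact(query.getEnd(), lookaheadBases));
            cache.clear();
            refiller.apply(cacheInterval).forEachRemaining(cache::add);
            cachedContig = cacheInterval.getContig();
            cacheStart = cacheInterval.getStart();
            cacheEnd = cacheInterval.getEnd();
        }
        // Return the cached entries overlapping the original query (entries assumed sorted by start).
        final List<T> overlapping = new ArrayList<>();
        for (final T entry : cache) {
            if (entry.getStart() > query.getEnd()) {
                break;
            }
            overlapping.add(entry);
        }
        return overlapping;
    }
}

Wiring such a cache with new LookaheadCacheSketch<>(queryLookaheadBases, this::refillQueryCache) and calling queryAndPrefetch(interval) mirrors the single delegating line that the new queryAndPrefetch body reduces to above.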

@@ -626,8 +596,7 @@ public Object getHeader() {
public void close() {
closeOpenIterationIfNecessary();

logger.debug(String.format("Cache statistics for FeatureInput %s:", featureInput));
queryCache.printCacheStatistics();
logger.debug(String.format("Cache statistics for FeatureInput %s: %s", featureInput, queryCache.getCacheStatistics()));

try {
if ( featureReader != null ) {
