diff --git a/pom.xml b/pom.xml index b14c15c72..d925f88f3 100644 --- a/pom.xml +++ b/pom.xml @@ -4,7 +4,7 @@ au.edu.wehi gridss jar - 1.0.4.8-SNAPSHOT + 1.1.0 gridss https://github.com/PapenfussLab/gridss diff --git a/src/main/java/au/edu/wehi/idsv/AggregateReferenceCoverageLookup.java b/src/main/java/au/edu/wehi/idsv/AggregateReferenceCoverageLookup.java deleted file mode 100644 index 678952e15..000000000 --- a/src/main/java/au/edu/wehi/idsv/AggregateReferenceCoverageLookup.java +++ /dev/null @@ -1,30 +0,0 @@ -package au.edu.wehi.idsv; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; - -public class AggregateReferenceCoverageLookup implements ReferenceCoverageLookup { - private List underlying; - public AggregateReferenceCoverageLookup(Collection underlying) { - this.underlying = new ArrayList(underlying); - } - @Override - public int readsSupportingNoBreakendAfter(int referenceIndex, int position) { - int count = 0; - for (ReferenceCoverageLookup lookup : underlying) { - count += lookup.readsSupportingNoBreakendAfter(referenceIndex, position); - } - return count; - } - - @Override - public int readPairsSupportingNoBreakendAfter(int referenceIndex, int position) { - int count = 0; - for (ReferenceCoverageLookup lookup : underlying) { - count += lookup.readPairsSupportingNoBreakendAfter(referenceIndex, position); - } - return count; - } - -} diff --git a/src/main/java/au/edu/wehi/idsv/ReferenceCoverageLookup.java b/src/main/java/au/edu/wehi/idsv/ReferenceCoverageLookup.java index fcf0b2f5e..1b35dbd63 100644 --- a/src/main/java/au/edu/wehi/idsv/ReferenceCoverageLookup.java +++ b/src/main/java/au/edu/wehi/idsv/ReferenceCoverageLookup.java @@ -8,16 +8,13 @@ public interface ReferenceCoverageLookup { * @param position position immediate before putative breakend * @return number of reads spanning the putative breakend */ - public abstract int readsSupportingNoBreakendAfter(int referenceIndex, - int position); - + int readsSupportingNoBreakendAfter(int referenceIndex, int position); /** * Number of read pairs providing evidence against a breakend immediately after the given base * @param referenceIndex contig * @param position position immediate before putative breakend * @return number of read pairs spanning the putative breakend */ - public abstract int readPairsSupportingNoBreakendAfter(int referenceIndex, - int position); - + int readPairsSupportingNoBreakendAfter(int referenceIndex, int position); + int getCategory(); } \ No newline at end of file diff --git a/src/main/java/au/edu/wehi/idsv/SAMEvidenceSource.java b/src/main/java/au/edu/wehi/idsv/SAMEvidenceSource.java index e9f4ac123..e70fe660d 100644 --- a/src/main/java/au/edu/wehi/idsv/SAMEvidenceSource.java +++ b/src/main/java/au/edu/wehi/idsv/SAMEvidenceSource.java @@ -367,7 +367,7 @@ public static CloseableIterator mergedIterator(final List sources, AssemblyEvidenceSource assembly) { int maxSize = 0; diff --git a/src/main/java/au/edu/wehi/idsv/SequentialCoverageAnnotator.java b/src/main/java/au/edu/wehi/idsv/SequentialCoverageAnnotator.java index 855322431..0c5e5e589 100644 --- a/src/main/java/au/edu/wehi/idsv/SequentialCoverageAnnotator.java +++ b/src/main/java/au/edu/wehi/idsv/SequentialCoverageAnnotator.java @@ -4,10 +4,11 @@ import java.util.ArrayList; import java.util.Iterator; import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; import java.util.stream.IntStream; -import com.google.common.collect.Lists; - import au.edu.wehi.idsv.util.AsyncBufferedIterator; import htsjdk.samtools.SAMFileHeader.SortOrder; import htsjdk.samtools.SAMRecord; @@ -23,6 +24,7 @@ /** * Annotates breakends with reference allele coverage information * + * * @author Daniel Cameron * */ @@ -32,69 +34,87 @@ public class SequentialCoverageAnnotator reference; private final Iterator it; private final List toclose = new ArrayList<>(); - public SequentialCoverageAnnotator(ProcessingContext context, List sources, Iterator it) { - int windowSize = SAMEvidenceSource.maximumWindowSize(context, sources, null); - List> byCategory = Lists.newArrayList(); - while (byCategory.size() < context.getCategoryCount()) { - byCategory.add(new ArrayList()); - } - for (SAMEvidenceSource s : sources) { - byCategory.get(s.getSourceCategory()).add(s); - } - List coverage = Lists.newArrayList(); - for (int i = 0; i < byCategory.size(); i++) { - List lookup = Lists.newArrayList(); - for (SAMEvidenceSource s : byCategory.get(i)) { - // one read-ahead thread per input file - SamReader reader = SamReaderFactory.makeDefault().open(s.getFile()); - SAMRecordIterator rawIterator = reader.iterator(); - rawIterator.assertSorted(SortOrder.coordinate); - CloseableIterator sit = new AsyncBufferedIterator(rawIterator, s.getFile().getName() + "-Coverage"); - toclose.add(sit); // close the async iterator first to prevent aysnc reading from a closed stream - toclose.add(rawIterator); - toclose.add(reader); - sit = new ProgressLoggingSAMRecordIterator(sit, new ProgressLogger(log)); - SequentialReferenceCoverageLookup sourceLookup = new SequentialReferenceCoverageLookup(sit, s.getMetrics().getIdsvMetrics(), s.getReadPairConcordanceCalculator(), windowSize); - context.registerBuffer(s.getFile().getName(), sourceLookup); - lookup.add(sourceLookup); - } - coverage.add(new AggregateReferenceCoverageLookup(lookup)); - } + private final ExecutorService threadpool; + public SequentialCoverageAnnotator(ProcessingContext context, List sources, Iterator it, ExecutorService threadpool) { this.context = context; - this.reference = coverage; + this.reference = createLookup(context, sources); this.it = it; - + this.threadpool = threadpool; + } + private List createLookup(ProcessingContext context, List sources) { + int windowSize = SAMEvidenceSource.maximumWindowSize(context, sources, null); + List result = new ArrayList<>(); + for (SAMEvidenceSource ses : sources) { + assert(ses.getSourceCategory() >= 0); + assert(ses.getSourceCategory() < context.getCategoryCount()); + // one read-ahead thread per input file + SamReader reader = SamReaderFactory.makeDefault().open(ses.getFile()); + SAMRecordIterator rawIterator = reader.iterator(); + rawIterator.assertSorted(SortOrder.coordinate); + CloseableIterator sit = new AsyncBufferedIterator(rawIterator, ses.getFile().getName() + "-Coverage"); + toclose.add(sit); // close the async iterator first to prevent aysnc reading from a closed stream + toclose.add(rawIterator); + toclose.add(reader); + sit = new ProgressLoggingSAMRecordIterator(sit, new ProgressLogger(log)); + SequentialReferenceCoverageLookup sourceLookup = new SequentialReferenceCoverageLookup(sit, ses.getMetrics().getIdsvMetrics(), ses.getReadPairConcordanceCalculator(), windowSize, ses.getSourceCategory()); + context.registerBuffer(ses.getFile().getName(), sourceLookup); + result.add(sourceLookup); + } + return result; } public SequentialCoverageAnnotator( ProcessingContext context, Iterator it, - List reference) { + List reference, + ExecutorService threadpool) { this.it = it; this.context = context; this.reference = reference; + this.threadpool = threadpool; + } + private static class CoverageResult { + public CoverageResult(int reads, int spans) { + this.readsSupportingNoBreakendAfter = reads; + this.readPairsSupportingNoBreakendAfter = spans; + } + public final int readsSupportingNoBreakendAfter; + public final int readPairsSupportingNoBreakendAfter; + } + private static CoverageResult calculateCoverage(ReferenceCoverageLookup lookup, int referenceIndex, int start, int end) { + int reads = IntStream.range(start, end) + .map(p -> lookup.readsSupportingNoBreakendAfter(referenceIndex, p)) + .min().getAsInt(); + int spans = IntStream.range(start, end) + .map(p -> lookup.readPairsSupportingNoBreakendAfter(referenceIndex, p)) + .min().getAsInt(); + return new CoverageResult(reads, spans); } @SuppressWarnings("unchecked") public T annotate(T variant) { BreakendSummary loc = variant.getBreakendSummary(); int referenceIndex = loc.referenceIndex; int offset = loc.direction == BreakendDirection.Forward ? 0 : -1; - int[] reads = new int[reference.size()]; - int[] spans = new int[reference.size()]; - for (int i = 0; i < reference.size(); i++) { - ReferenceCoverageLookup r = reference.get(i); - if (r != null) { - reads[i] = IntStream.range(loc.start + offset, loc.end + 1 + offset) - .map(p -> r.readsSupportingNoBreakendAfter(referenceIndex, p)) - .min().getAsInt(); - spans[i] = IntStream.range(loc.start + offset, loc.end + 1 + offset) - .map(p -> r.readPairsSupportingNoBreakendAfter(referenceIndex, p)) - .min().getAsInt(); + int start = loc.start + offset; + int end = loc.end + 1 + offset; + List> tasks = new ArrayList<>(); + for (ReferenceCoverageLookup rcl : reference) { + tasks.add(threadpool.submit(() -> calculateCoverage(rcl, referenceIndex, start, end))); + } + try { + int[] reads = new int[reference.size()]; + int[] spans = new int[reference.size()]; + for (int i = 0; i < reference.size(); i++) { + ReferenceCoverageLookup rcl = reference.get(i); + reads[rcl.getCategory()] += tasks.get(i).get().readsSupportingNoBreakendAfter; + spans[rcl.getCategory()] += tasks.get(i).get().readPairsSupportingNoBreakendAfter; } + IdsvVariantContextBuilder builder = new IdsvVariantContextBuilder(context, variant); + builder.referenceReads(reads); + builder.referenceSpanningPairs(spans); + return (T)builder.make(); + } catch (ExecutionException | InterruptedException e) { + throw new RuntimeException(e); } - IdsvVariantContextBuilder builder = new IdsvVariantContextBuilder(context, variant); - builder.referenceReads(reads); - builder.referenceSpanningPairs(spans); - return (T)builder.make(); } @Override public boolean hasNext() { diff --git a/src/main/java/au/edu/wehi/idsv/SequentialReferenceCoverageLookup.java b/src/main/java/au/edu/wehi/idsv/SequentialReferenceCoverageLookup.java index 61c2e9334..907e03bd6 100644 --- a/src/main/java/au/edu/wehi/idsv/SequentialReferenceCoverageLookup.java +++ b/src/main/java/au/edu/wehi/idsv/SequentialReferenceCoverageLookup.java @@ -18,7 +18,7 @@ import htsjdk.samtools.filter.AggregateFilter; import htsjdk.samtools.filter.AlignedFilter; import htsjdk.samtools.filter.DuplicateReadFilter; -import htsjdk.samtools.filter.FilteringIterator; +import htsjdk.samtools.filter.FilteringSamIterator; /** * Counts the number of reads and read pairs providing support for the @@ -28,6 +28,7 @@ * */ public class SequentialReferenceCoverageLookup implements Closeable, ReferenceCoverageLookup, TrackedBuffer { + private final int category; private final List toClose = Lists.newArrayList(); private final PeekingIterator reads; private final ReadPairConcordanceCalculator pairing; @@ -54,12 +55,13 @@ public class SequentialReferenceCoverageLookup implements Closeable, ReferenceCo * @param maxFragmentSize maximum fragment * @param reads reads to process. Must be coordinate sorted */ - public SequentialReferenceCoverageLookup(Iterator it, IdsvMetrics metrics, ReadPairConcordanceCalculator pairing, int windowSize) { + public SequentialReferenceCoverageLookup(Iterator it, IdsvMetrics metrics, ReadPairConcordanceCalculator pairing, int windowSize, int category) { this.pairing = pairing; if (it instanceof Closeable) toClose.add((Closeable)it); - this.reads = Iterators.peekingIterator(new FilteringIterator(it, new AggregateFilter(ImmutableList.of(new AlignedFilter(true), new DuplicateReadFilter())))); + this.reads = Iterators.peekingIterator(new FilteringSamIterator(it, new AggregateFilter(ImmutableList.of(new AlignedFilter(true), new DuplicateReadFilter())))); this.largestWindow = windowSize; this.maxEvidenceWindow = Math.max(metrics.MAX_READ_LENGTH, Math.max(metrics.MAX_READ_MAPPED_LENGTH, pairing != null ? pairing.maxConcordantFragmentSize() : 0)); + this.category = category; } public void close() { for (Closeable c : toClose) { @@ -202,4 +204,8 @@ public List currentTrackedBufferSizes() { new NamedTrackedBuffer(trackedBufferName_currentEndReferencePairs, currentEndReferencePairs.size()) ); } + @Override + public int getCategory() { + return category; + } } diff --git a/src/main/java/gridss/AnnotateReferenceCoverage.java b/src/main/java/gridss/AnnotateReferenceCoverage.java index 6aad7607e..c1d63176f 100644 --- a/src/main/java/gridss/AnnotateReferenceCoverage.java +++ b/src/main/java/gridss/AnnotateReferenceCoverage.java @@ -10,6 +10,6 @@ public class AnnotateReferenceCoverage extends VcfTransformCommandLineProgram { @Override public CloseableIterator iterator(CloseableIterator calls, ExecutorService threadpool) { - return new SequentialCoverageAnnotator(getContext(), getSamEvidenceSources(), calls); + return new SequentialCoverageAnnotator(getContext(), getSamEvidenceSources(), calls, threadpool); } } diff --git a/src/test/java/au/edu/wehi/idsv/SequentialCoverageAnnotatorTest.java b/src/test/java/au/edu/wehi/idsv/SequentialCoverageAnnotatorTest.java index 38704fcfd..fe081d72b 100644 --- a/src/test/java/au/edu/wehi/idsv/SequentialCoverageAnnotatorTest.java +++ b/src/test/java/au/edu/wehi/idsv/SequentialCoverageAnnotatorTest.java @@ -11,6 +11,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; +import com.google.common.util.concurrent.MoreExecutors; import htsjdk.samtools.SAMRecord; import htsjdk.samtools.SAMRecordCoordinateComparator; @@ -18,12 +19,14 @@ public class SequentialCoverageAnnotatorTest extends TestHelper { + @SuppressWarnings("resource") public VariantContextDirectedEvidence go(List ref, VariantContextDirectedEvidence toAnnotate) { Collections.sort(ref, new SAMRecordCoordinateComparator()); - return new SequentialCoverageAnnotator( + return new SequentialCoverageAnnotator( getContext(), ImmutableList.of(toAnnotate).iterator(), - Lists.newArrayList(new SequentialReferenceCoverageLookup(ref.iterator(), IDSV(ref), new SAMFlagReadPairConcordanceCalculator(IDSV(ref)), 1024))) + Lists.newArrayList(new SequentialReferenceCoverageLookup(ref.iterator(), IDSV(ref), new SAMFlagReadPairConcordanceCalculator(IDSV(ref)), 1024, 0)), + MoreExecutors.newDirectExecutorService()) .annotate(toAnnotate); } @Test diff --git a/src/test/java/au/edu/wehi/idsv/SequentialReferenceCoverageLookupTest.java b/src/test/java/au/edu/wehi/idsv/SequentialReferenceCoverageLookupTest.java index 08a481b26..3a578a0a2 100644 --- a/src/test/java/au/edu/wehi/idsv/SequentialReferenceCoverageLookupTest.java +++ b/src/test/java/au/edu/wehi/idsv/SequentialReferenceCoverageLookupTest.java @@ -2,6 +2,7 @@ import static org.junit.Assert.assertEquals; +import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -14,7 +15,12 @@ public class SequentialReferenceCoverageLookupTest extends TestHelper { public ReferenceCoverageLookup init(List reads, int windowSize) { Collections.sort(reads, new SAMRecordCoordinateComparator()); - return new SequentialReferenceCoverageLookup(reads.iterator(), IDSV(reads), new SAMFlagReadPairConcordanceCalculator(IDSV(reads)), windowSize); + return new SequentialReferenceCoverageLookup(reads.iterator(), IDSV(reads), new SAMFlagReadPairConcordanceCalculator(IDSV(reads)), windowSize, 5); + } + @Test + public void should_report_correct_category() { + ReferenceCoverageLookup lookup = init(new ArrayList(), 1); + assertEquals(5, lookup.getCategory()); } @Test public void readsPairsSupportingNoBreakendAfter_should_return_first_read_in_pair() {