diff --git a/inputs/templates/test/MakeCohortVcf/CombineBatches.json.tmpl b/inputs/templates/test/MakeCohortVcf/CombineBatches.json.tmpl index 21508edce..e4019a590 100644 --- a/inputs/templates/test/MakeCohortVcf/CombineBatches.json.tmpl +++ b/inputs/templates/test/MakeCohortVcf/CombineBatches.json.tmpl @@ -4,11 +4,17 @@ "CombineBatches.depth_exclude_list": {{ reference_resources.depth_exclude_list | tojson }}, "CombineBatches.empty_file" : {{ reference_resources.empty_file | tojson }}, + "CombineBatches.reference_fasta": {{ reference_resources.reference_fasta | tojson }}, + "CombineBatches.reference_dict": {{ reference_resources.reference_dict | tojson }}, + "CombineBatches.reference_fasta_fai": {{ reference_resources.reference_index | tojson }}, + "CombineBatches.min_sr_background_fail_batches": 0.5, + "CombineBatches.gatk_docker": {{ dockers.gatk_docker | tojson }}, "CombineBatches.sv_pipeline_docker": {{ dockers.sv_pipeline_docker | tojson }}, "CombineBatches.sv_base_mini_docker":{{ dockers.sv_base_mini_docker | tojson }}, "CombineBatches.cohort_name": {{ test_batch.name | tojson }}, + "CombineBatches.ped_file": {{ test_batch.ped_file | tojson }}, "CombineBatches.batches": [ {{ test_batch.name | tojson }} ], diff --git a/inputs/values/dockers.json b/inputs/values/dockers.json index 8bc4db59d..ec10ec15a 100644 --- a/inputs/values/dockers.json +++ b/inputs/values/dockers.json @@ -1,8 +1,8 @@ { "name": "dockers", - "cnmops_docker": "us.gcr.io/broad-dsde-methods/gatk-sv/cnmops:2024-08-27-v0.29-beta-6b27c39f", + "cnmops_docker": "us.gcr.io/broad-dsde-methods/gatk-sv/cnmops:2024-06-04-v0.28.5-beta-a8dfecba", "condense_counts_docker": "us.gcr.io/broad-dsde-methods/tsharpe/gatk:4.2.6.1-57-g9e03432", - "gatk_docker": "us.gcr.io/broad-dsde-methods/eph/gatk:2024-07-02-4.6.0.0-1-g4af2b49e9-NIGHTLY-SNAPSHOT", + "gatk_docker": "us.gcr.io/broad-dsde-methods/markw/gatk:mw-sv-stratify-99422dc", "gatk_docker_pesr_override": "us.gcr.io/broad-dsde-methods/tsharpe/gatk:4.2.6.1-57-g9e03432", "genomes_in_the_cloud_docker": "us.gcr.io/broad-gotc-prod/genomes-in-the-cloud:2.3.2-1510681135", "linux_docker": "marketplace.gcr.io/google/ubuntu1804", @@ -10,10 +10,10 @@ "melt_docker": "us.gcr.io/talkowski-sv-gnomad/melt:a85c92f", "scramble_docker": "us.gcr.io/broad-dsde-methods/markw/scramble:mw-scramble-99af4c50", "samtools_cloud_docker": "us.gcr.io/broad-dsde-methods/gatk-sv/samtools-cloud:2024-01-24-v0.28.4-beta-9debd6d7", - "sv_base_docker": "us.gcr.io/broad-dsde-methods/gatk-sv/sv-base:2024-08-27-v0.29-beta-6b27c39f", + "sv_base_docker": "us.gcr.io/broad-dsde-methods/gatk-sv/sv-base:2024-01-24-v0.28.4-beta-9debd6d7", "sv_base_mini_docker": "us.gcr.io/broad-dsde-methods/gatk-sv/sv-base-mini:2024-01-24-v0.28.4-beta-9debd6d7", - "sv_pipeline_docker": "us.gcr.io/broad-dsde-methods/gatk-sv/sv-pipeline:2024-09-25-v0.29-beta-f064b2d7", - "sv_pipeline_qc_docker": "us.gcr.io/broad-dsde-methods/gatk-sv/sv-pipeline:2024-09-25-v0.29-beta-f064b2d7", + "sv_pipeline_docker": "us.gcr.io/broad-dsde-methods/markw/sv-pipeline:mw-gatk-combine-batches-7a20df41", + "sv_pipeline_qc_docker": "us.gcr.io/broad-dsde-methods/markw/sv-pipeline:mw-gatk-combine-batches-7a20df41", "wham_docker": "us.gcr.io/broad-dsde-methods/gatk-sv/wham:2024-01-24-v0.28.4-beta-9debd6d7", "igv_docker": "us.gcr.io/broad-dsde-methods/gatk-sv/igv:mw-xz-fixes-2-b1be6a9", "duphold_docker": "us.gcr.io/broad-dsde-methods/gatk-sv/duphold:mw-xz-fixes-2-b1be6a9", @@ -28,5 +28,5 @@ "sv_utils_docker": 
"us.gcr.io/broad-dsde-methods/markw/sv-utils:mw-train-genotype-filtering-a9479501", "gq_recalibrator_docker": "us.gcr.io/broad-dsde-methods/markw/gatk:mw-tb-form-sv-filter-training-data-899360a", "str": "us.gcr.io/broad-dsde-methods/gatk-sv/str:2023-05-23-v0.27.3-beta-e537bdd6", - "denovo": "us.gcr.io/broad-dsde-methods/gatk-sv/denovo:2024-09-25-v0.29-beta-f064b2d7" + "denovo": "us.gcr.io/broad-dsde-methods/markw/denovo:mw-gatk-combine-batches-7a20df41" } \ No newline at end of file diff --git a/src/sv-pipeline/04_variant_resolution/scripts/add_genotypes.py b/src/sv-pipeline/04_variant_resolution/scripts/add_genotypes.py index 4cceab746..15bb89d3d 100755 --- a/src/sv-pipeline/04_variant_resolution/scripts/add_genotypes.py +++ b/src/sv-pipeline/04_variant_resolution/scripts/add_genotypes.py @@ -12,25 +12,6 @@ import pysam -def make_multiallelic_alts(record, max_CN, is_bca=False): - """ - Add alts for CN states up to half of max observed total CN - """ - - max_haplo_CN = int(np.ceil(max_CN / 2)) - - if is_bca: - alts = tuple([''] + - [''.format(i) for i in range(2, max_haplo_CN + 1)]) - else: - alts = tuple([''] + - [''.format(i) for i in range(2, max_haplo_CN + 1)]) - - stop = record.stop - record.alts = alts - record.stop = stop - - def make_evidence_int(ev): ev = ev.split(',') @@ -59,16 +40,14 @@ def add_genotypes(record, genotypes, varGQ): del record.format[fmt] max_GT = genotypes['GT'].max() - is_bca = record.info['SVTYPE'] not in 'DEL DUP'.split() - - if is_bca: - max_CN = max_GT - else: - max_CN = max_GT + 2 if max_GT > 2: - record.info['MULTIALLELIC'] = True - make_multiallelic_alts(record, max_CN, is_bca) + record.alts = ('',) + if record.info['SVTYPE'] != 'DUP': + msg = 'Invalid SVTYPE {0} for multiallelic record {1}' + msg = msg.format(record.info['SVTYPE'], record.id) + raise Exception(msg) + record.info['SVTYPE'] = 'CNV' cols = 'name sample GT GQ RD_CN RD_GQ PE_GT PE_GQ SR_GT SR_GQ EV'.split() gt_matrix = genotypes.reset_index()[cols].to_numpy() @@ -85,27 +64,7 @@ def add_genotypes(record, genotypes, varGQ): raise Exception(msg) if max_GT > 2: - if record.info['SVTYPE'] == 'DEL': - msg = 'Invalid SVTYPE {0} for multiallelic genotype in record {1}' - msg = msg.format(record.info['SVTYPE'], record.id) - raise Exception(msg) - - if is_bca: - idx1 = int(np.floor(data[2] / 2)) - idx2 = int(np.ceil(data[2] / 2)) - else: - # split copy state roughly evenly between haplotypes - idx1 = int(np.floor((data[2] + 2) / 2)) - idx2 = int(np.ceil((data[2] + 2) / 2)) - - # if copy state is 1, assign reference genotype - if idx1 == 1: - idx1 = 0 - if idx2 == 1: - idx2 = 0 - - record.samples[sample]['GT'] = (idx1, idx2) - + record.samples[sample]['GT'] = (None, None) elif data[2] == 0: record.samples[sample]['GT'] = (0, 0) elif data[2] == 1: diff --git a/src/sv-pipeline/04_variant_resolution/scripts/clean_vcf_part1b_build_dict.py b/src/sv-pipeline/04_variant_resolution/scripts/clean_vcf_part1b_build_dict.py index b7da153cb..ed341bed2 100644 --- a/src/sv-pipeline/04_variant_resolution/scripts/clean_vcf_part1b_build_dict.py +++ b/src/sv-pipeline/04_variant_resolution/scripts/clean_vcf_part1b_build_dict.py @@ -34,7 +34,7 @@ def __init__(self): self.sample_list = [] def _update_rd_cn(self, variant, sample_indices): - self.rd_cn[variant.id] = {s: variant.samples[s][VariantFormatTypes.RD_CN] for s in sample_indices} + self.rd_cn[variant.id] = {s: variant.samples[s].get(VariantFormatTypes.RD_CN) for s in sample_indices} @staticmethod def get_wider(f): diff --git 
a/src/sv-pipeline/04_variant_resolution/scripts/clean_vcf_part1b_filter.py b/src/sv-pipeline/04_variant_resolution/scripts/clean_vcf_part1b_filter.py index e63b890cd..0da863af8 100644 --- a/src/sv-pipeline/04_variant_resolution/scripts/clean_vcf_part1b_filter.py +++ b/src/sv-pipeline/04_variant_resolution/scripts/clean_vcf_part1b_filter.py @@ -43,13 +43,13 @@ def modify_variants(dict_file_gz, vcf, multi_cnvs): for sample_id in geno_normal_revise_dict[variant.id]: o = variant.samples[sample_id] o.update({"GT": (0, 1)}) - o.update({"GQ": o["RD_GQ"]}) + o.update({"GQ": o.get("RD_GQ")}) if variant.stop - variant.start >= 1000: if variant.info[SVTYPE] in [SVType.DEL, SVType.DUP]: is_del = variant.info[SVTYPE] == SVType.DEL for k, v in variant.samples.items(): - rd_cn = v[VariantFormatTypes.RD_CN] + rd_cn = v.get(VariantFormatTypes.RD_CN) if rd_cn is None: continue if (is_del and rd_cn > 3) or \ diff --git a/src/sv-pipeline/04_variant_resolution/scripts/clean_vcf_part5_update_records.py b/src/sv-pipeline/04_variant_resolution/scripts/clean_vcf_part5_update_records.py index 5e92c2bf5..98013b07f 100755 --- a/src/sv-pipeline/04_variant_resolution/scripts/clean_vcf_part5_update_records.py +++ b/src/sv-pipeline/04_variant_resolution/scripts/clean_vcf_part5_update_records.py @@ -93,10 +93,10 @@ def main(): record = revised_lines_by_id[record.id] if record.info.get('SVTYPE', None) == 'DEL': if abs(record.stop - record.pos) >= 1000: - sample_cn_map = {s: record.samples[s]['RD_CN'] for s in non_outlier_samples} + sample_cn_map = {s: record.samples[s].get('RD_CN') for s in non_outlier_samples} if len([s for s in sample_cn_map if (sample_cn_map[s] is not None and sample_cn_map[s] > 3)]) > vf_1: multi_del = True - gts = [record.samples[s]['GT'] for s in non_outlier_samples] + gts = [record.samples[s].get('GT') for s in non_outlier_samples] if any(gt not in biallelic_gts for gt in gts): gt5kb_del = True if abs(record.stop - record.pos) >= 5000: @@ -105,7 +105,7 @@ def main(): if record.info.get('SVTYPE', None) == 'DUP': if abs(record.stop - record.pos) >= 1000: - sample_cn_map = {s: record.samples[s]['RD_CN'] for s in non_outlier_samples} + sample_cn_map = {s: record.samples[s].get('RD_CN') for s in non_outlier_samples} if sum(1 for s in sample_cn_map if sample_cn_map[s] is not None and sample_cn_map[s] > 4) > vf_1: multi_dup = True if sum(1 for x in Counter(sample_cn_map.values()) if x is not None and (x < 1 or x > 4)) > 4: @@ -114,7 +114,7 @@ def main(): (sample_cn_map[s] < 1 or sample_cn_map[s] > 4) and gt4_copystate) > vf_1: multi_dup = True - gts = [record.samples[s]['GT'] for s in non_outlier_samples] + gts = [record.samples[s].get('GT') for s in non_outlier_samples] if any(gt not in biallelic_gts for gt in gts): gt5kb_dup = True if abs(record.stop - record.pos) >= 5000: @@ -123,24 +123,27 @@ def main(): if gt5kb_del: for sample_obj in record.samples.itervalues(): - if not sample_obj['GQ'] is None and \ - (sample_obj['RD_CN'] is not None and sample_obj['RD_CN'] >= 2): + if not sample_obj.get('GQ') is None and \ + (sample_obj.get('RD_CN') is not None and sample_obj.get('RD_CN') >= 2): sample_obj['GT'] = (0, 0) - elif not sample_obj['GQ'] is None and \ - (sample_obj['RD_CN'] is not None and sample_obj['RD_CN'] == 1): + elif not sample_obj.get('GQ') is None and \ + (sample_obj.get('RD_CN') is not None and sample_obj.get('RD_CN') == 1): sample_obj['GT'] = (0, 1) - elif not sample_obj['GQ'] is None: + elif not sample_obj.get('GQ') is None: sample_obj['GT'] = (1, 1) # RD_CN 0 DEL if gt5kb_dup: + # Convert to 
bi-allelic + if '' in record.alts: + record.alts = ('',) for sample_obj in record.samples.itervalues(): - if not sample_obj['GQ'] is None and \ - (sample_obj['RD_CN'] is not None and sample_obj['RD_CN'] <= 2): + if not sample_obj.get('GQ') is None and \ + (sample_obj.get('RD_CN') is not None and sample_obj.get('RD_CN') <= 2): sample_obj['GT'] = (0, 0) - elif not sample_obj['GQ'] is None and \ - (sample_obj['RD_CN'] is not None and sample_obj['RD_CN'] == 3): + elif not sample_obj.get('GQ') is None and \ + (sample_obj.get('RD_CN') is not None and sample_obj.get('RD_CN') == 3): sample_obj['GT'] = (0, 1) - elif not sample_obj['GQ'] is None: + elif not sample_obj.get('GQ') is None: sample_obj['GT'] = (1, 1) # RD_CN > 3 DUP if record.id in multi_geno_ids: @@ -148,11 +151,11 @@ def main(): if multi_del or multi_dup: record.filter.add('MULTIALLELIC') - for j, sample in enumerate(record.samples): + for sample in record.samples: record.samples[sample]['GT'] = None record.samples[sample]['GQ'] = None - record.samples[sample]['CN'] = record.samples[sample]['RD_CN'] - record.samples[sample]['CNQ'] = record.samples[sample]['RD_GQ'] + record.samples[sample]['CN'] = record.samples[sample].get('RD_CN') + record.samples[sample]['CNQ'] = record.samples[sample].get('RD_GQ') if len(record.filter) > 1 and 'PASS' in record.filter: del record.filter['PASS'] @@ -164,7 +167,7 @@ def main(): if record.id in sexchr_revise: for sample in record.samples: if sample in male_samples: - cn = record.samples[sample]['RD_CN'] + cn = record.samples[sample].get('RD_CN') if cn is not None and int(cn) > 0: cn = int(cn) record.samples[sample]['RD_CN'] = cn - 1 diff --git a/src/sv-pipeline/04_variant_resolution/scripts/overlap_breakpoint_filter.py b/src/sv-pipeline/04_variant_resolution/scripts/overlap_breakpoint_filter.py index 099b9aafd..58cc96181 100644 --- a/src/sv-pipeline/04_variant_resolution/scripts/overlap_breakpoint_filter.py +++ b/src/sv-pipeline/04_variant_resolution/scripts/overlap_breakpoint_filter.py @@ -53,7 +53,7 @@ def __init__(self, record): else: raise ValueError("Uninterpretable evidence: {}".format(ev)) if record.id in bothside_pass: - self.both_end_support = bothside_pass[record.id] + self.both_end_support = 1 else: self.both_end_support = 0 self.sr_fail = record.id in background_fail @@ -102,7 +102,7 @@ def __str__(self): # Read bothside-pass/background-fail records sys.stderr.write("Reading SR files...\n") with open(BOTHSIDE_PASS_PATH) as f: - bothside_pass = {line.strip().split('\t')[-1]: float(line.strip().split('\t')[0]) for line in f} + bothside_pass = set([line.strip().split('\t')[-1] for line in f]) with open(BACKGROUND_FAIL_PATH) as f: background_fail = set([line.strip().split('\t')[-1] for line in f]) diff --git a/src/sv-pipeline/04_variant_resolution/scripts/reformat_genotyped_vcf.py b/src/sv-pipeline/04_variant_resolution/scripts/reformat_genotyped_vcf.py new file mode 100644 index 000000000..3c2c510bc --- /dev/null +++ b/src/sv-pipeline/04_variant_resolution/scripts/reformat_genotyped_vcf.py @@ -0,0 +1,189 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# + +""" +Replaces EV field numeric codes with strings and fixes alleles for multi-allelic CNVs. Optionally drops records with +invalid coordinates. 
+""" + +import argparse +import logging +import pysam +import sys + +EVIDENCE_LIST = [ + '', # 0 + 'RD', # 1 + 'PE', # 2 + 'RD,PE', # 3 + 'SR', # 4 + 'RD,SR', # 5 + 'PE,SR', # 6 + 'RD,PE,SR' # 7 +] + +EV_FORMAT_HEADER = '##FORMAT=' + + +def get_new_header(header): + header_list = str(header).split('\n') + new_header_lines = list() + for line in header_list: + if line.startswith('##FORMAT=',) + record.info['SVTYPE'] = 'CNV' + # Remove MULTIALLELIC field (legacy) + record.info.pop('MULTIALLELIC') + # Since pysam messes with some of the formatting (i.e. END limitation) we parse the string and replace + max_ev = len(EVIDENCE_LIST) - 1 + record_tokens = str(record).strip().split('\t') + if drop_invalid_coords: + if not valid_record_coordinates(record_tokens=record_tokens, ref_contig_length_dict=ref_contig_length_dict): + logging.warning(f"Dropping record {record_tokens[2]}") + return None + format_keys = record_tokens[8].split(':') + gt_index = format_keys.index('GT') + ev_index = format_keys.index('EV') + new_record_tokens = record_tokens[:9] + for format in record_tokens[9:]: + format_tokens = format.split(':') + # Reset multiallelic CNV genotypes to ./. + if record.info['SVTYPE'] == 'CNV': + gt_tokens = format_tokens[gt_index].split('/') + format_tokens[gt_index] = "/".join(["." for _ in gt_tokens]) + # Convert evidence as string list representation + ev = format.split(':')[ev_index] + if ev == '.' or not ev: + new_ev = '.' + else: + ev_int = int(ev) + if ev_int < 0 or ev_int > max_ev: + raise ValueError(f"Invalid EV value {ev_int} in record {record.id}") + new_ev = EVIDENCE_LIST[ev_int] + format_tokens[ev_index] = new_ev + new_record_tokens.append(':'.join(format_tokens)) + return '\t'.join(new_record_tokens) + + +def parse_reference_fai(path): + if not path: + raise ValueError("Reference index required if using --drop-invalid-coords") + with open(path) as f: + result = dict() + for line in f: + tokens = line.strip().split('\t') + chrom = tokens[0] + length = int(tokens[1]) + result[chrom] = length + return result + + +def main(): + parser = argparse.ArgumentParser( + description=__doc__, + formatter_class=argparse.RawDescriptionHelpFormatter) + parser.add_argument("--vcf", type=str, required=True, + help="Path to input VCF from GATK-SV GenotypeBatch module") + parser.add_argument("--drop-invalid-coords", action='store_true', + help="Drop records with invalid coordinates, i.e. 
POS/END/END2 greater than chromosome length") + parser.add_argument("--reference-fai", type=str, required=False, + help="Reference fasta index (.fai), required only if using --drop-invalid-coords") + args = parser.parse_args() + logging.basicConfig(format='%(asctime)s - %(message)s', level=logging.INFO) + + if args.drop_invalid_coords: + ref_contig_length_dict = parse_reference_fai(args.reference_fai) + else: + ref_contig_length_dict = None + + with pysam.VariantFile(args.vcf) as vcf: + new_header = get_new_header(vcf.header) + sys.stdout.write(str(new_header)) + for record in vcf: + new_record = process_record(record=record, + drop_invalid_coords=args.drop_invalid_coords, + ref_contig_length_dict=ref_contig_length_dict) + if new_record is not None: + sys.stdout.write(new_record + "\n") + + +if __name__ == '__main__': + main() diff --git a/src/sv-pipeline/04_variant_resolution/scripts/rename.py b/src/sv-pipeline/04_variant_resolution/scripts/rename.py index e9906ae3b..b890e0199 100755 --- a/src/sv-pipeline/04_variant_resolution/scripts/rename.py +++ b/src/sv-pipeline/04_variant_resolution/scripts/rename.py @@ -30,21 +30,7 @@ def rename(vcf, fout, chrom=None, prefix='SV_pipeline'): record.id = fmt.format(**locals()) # Clean metadata - end = record.stop record.ref = 'N' - # if not record.alts[0].startswith('<'): - # record.alts = ('<{0}>'.format(record.info['SVTYPE']), ) - record.stop = end - - # for key in record.format.keys(): - # if key != 'GT': - # for sample in record.samples.keys(): - # record.samples[sample].pop(key) - # del record.format[key] - - for info in 'CIPOS CIEND RMSSTD MULTIALLELIC_TYPE'.split(): - if info in record.info.keys(): - record.info.pop(info) # Clean metadata for CNVs sucked into complex resolution # and not appropriately cleaned diff --git a/src/sv-pipeline/java/org/broadinstitute/svpipeline/CleanVCFPart1.java b/src/sv-pipeline/java/org/broadinstitute/svpipeline/CleanVCFPart1.java index 5637154bb..7ba3b1713 100644 --- a/src/sv-pipeline/java/org/broadinstitute/svpipeline/CleanVCFPart1.java +++ b/src/sv-pipeline/java/org/broadinstitute/svpipeline/CleanVCFPart1.java @@ -116,10 +116,10 @@ public static void main( final String[] args ) { } } - // move the SVTYPE to the ALT field (except for MEs) + // move the SVTYPE to the ALT field (except for MEs and multi-allelic DUPs) final InfoField info = record.getInfo(); final ByteSequence svType = info.get(SVTYPE_KEY); - if ( !record.getAlt().contains(ME_VALUE) ) { + if ( !record.getAlt().contains(ME_VALUE) && !DUP_VALUE.equals(svType) ) { if ( svType != null ) { record.setAlt(new ByteSequence(LT_VALUE, svType, GT_VALUE)); } diff --git a/src/sv-pipeline/scripts/format_gatk_vcf_for_svtk.py b/src/sv-pipeline/scripts/format_gatk_vcf_for_svtk.py index f096c0267..f42f80035 100644 --- a/src/sv-pipeline/scripts/format_gatk_vcf_for_svtk.py +++ b/src/sv-pipeline/scripts/format_gatk_vcf_for_svtk.py @@ -5,16 +5,36 @@ import sys from typing import Optional, List, Text, Set -_gt_sum_map = dict() +_gt_map = dict() +_cnv_gt_map = dict() -def _cache_gt_sum(gt): +def _cache_gt(gt): if gt is None: - return 0 - s = _gt_sum_map.get(gt, None) + return 0, 0 + s = _gt_map.get(gt, None) if s is None: - s = sum([1 for a in gt if a is not None and a > 0]) - _gt_sum_map[gt] = s + x = sum([1 for a in gt if a is not None and a > 0]) + if x == 0: + s = (0, 0) + elif x == 1: + s = (0, 1) + else: + s = (1, 1) + _gt_map[gt] = s + return s + + +def _cache_cnv_gt(cn): + if cn is None: + return 0, 0 + s = _cnv_gt_map.get(cn, None) + if s is None: + # Split 
copies evenly between alleles, giving one more copy to the second allele if odd + x = max(0, cn - 2) + alt1 = min(x // 2, 4) + alt2 = min(x - alt1, 4) + _cnv_gt_map[cn] = s = (alt1, alt2) return s @@ -71,7 +91,8 @@ def create_header(header_in: pysam.VariantHeader, def convert(record: pysam.VariantRecord, vcf_out: pysam.VariantFile, remove_infos: Set[Text], - remove_formats: Set[Text]) -> pysam.VariantRecord: + remove_formats: Set[Text], + set_pass: bool) -> pysam.VariantRecord: """ Converts a record from gatk to svtk style. This includes updating all GT fields to diploid and reverting END tag values. @@ -86,6 +107,8 @@ def convert(record: pysam.VariantRecord, info fields to remove remove_formats: Set[Text] format fields to remove + set_pass: bool + set empty FILTER statuses to PASS Returns ------- @@ -100,15 +123,26 @@ def convert(record: pysam.VariantRecord, if svtype == 'BND': # Ensure we aren't using breakend notation here, since it isn't supported in some modules alleles = ('N', '') + elif svtype == 'CNV': + # Prior to CleanVcf, all CNVs have alleles + alleles = ('N', '', '', '', '') else: alleles = ('N', alleles[1]) contig = record.contig - new_record = vcf_out.new_record(contig=contig, start=record.start, stop=record.stop, alleles=alleles) + # Change filter to PASS if requested + if set_pass and len(record.filter) == 0: + filter = ('PASS',) + else: + filter = record.filter + new_record = vcf_out.new_record(contig=contig, start=record.start, stop=record.stop, alleles=alleles, filter=filter) new_record.id = record.id # copy INFO fields for key in record.info: if key not in remove_infos: new_record.info[key] = record.info[key] + if svtype == 'CNV': + # Prior to CleanVcf, all mCNVs are DUP type + new_record.info['SVTYPE'] = 'DUP' # svtk generally expects all records to have CHR2 assigned chr2 = record.info.get('CHR2', None) if chr2 is None: @@ -124,7 +158,7 @@ def convert(record: pysam.VariantRecord, new_record.info['SVLEN'] = record.info.get('SVLEN', -1) elif svtype == 'DEL': new_record.info['STRANDS'] = '+-' - elif svtype == 'DUP': + elif svtype == 'DUP' or svtype == 'CNV': new_record.info['STRANDS'] = '-+' elif svtype == 'INV': new_record.info['STRANDS'] = record.info.get('STRANDS', None) @@ -137,10 +171,10 @@ def convert(record: pysam.VariantRecord, if key not in remove_formats: new_genotype[key] = genotype[key] # fix GT, always assuming diploid - if _cache_gt_sum(genotype.get('GT', None)) > 0: - new_genotype['GT'] = (0, 1) + if svtype == 'CNV': + new_genotype['GT'] = _cache_cnv_gt(genotype.get('CN', genotype.get('RD_CN'))) else: - new_genotype['GT'] = (0, 0) + new_genotype['GT'] = _cache_gt(genotype.get('GT', None)) return new_record @@ -174,6 +208,8 @@ def __parse_arguments(argv: List[Text]) -> argparse.Namespace: help="Comma-delimited list of FORMAT fields to remove") parser.add_argument("--remove-infos", type=str, help="Comma-delimited list of INFO fields to remove") + parser.add_argument("--set-pass", default=False, action='store_true', + help="Set empty FILTER fields (\".\") to PASS") if len(argv) <= 1: parser.parse_args(["--help"]) sys.exit(0) @@ -207,7 +243,8 @@ def main(argv: Optional[List[Text]] = None): record=record, vcf_out=vcf_out, remove_infos=remove_infos, - remove_formats=remove_formats + remove_formats=remove_formats, + set_pass=arguments.set_pass )) diff --git a/src/sv-pipeline/scripts/format_svtk_vcf_for_gatk.py index 9b5ee31b3..0f9bfa418 100644 --- a/src/sv-pipeline/scripts/format_svtk_vcf_for_gatk.py +++
b/src/sv-pipeline/scripts/format_svtk_vcf_for_gatk.py @@ -5,7 +5,7 @@ import sys import gzip from math import floor -from typing import Any, List, Text, Dict, Optional +from typing import Any, List, Text, Dict, Optional, Set GQ_FIELDS = ["GQ", "PE_GQ", "SR_GQ", "RD_GQ"] @@ -83,6 +83,8 @@ def update_header(header: pysam.VariantHeader) -> None: # Add these just in case (no effect if they exist) header.add_line('##INFO=') header.add_line('##INFO=') + header.add_line('##INFO=') + header.add_line('##INFO=') def rescale_gq(record): @@ -95,7 +97,9 @@ def rescale_gq(record): def convert(record: pysam.VariantRecord, bnd_end_dict: Optional[Dict[Text, int]], ploidy_dict: Dict[Text, Dict[Text, int]], - scale_down_gq: bool) -> pysam.VariantRecord: + scale_down_gq: bool, + bothside_pass_vid_set: Set[Text], + background_fail_vid_set: Set[Text]) -> pysam.VariantRecord: """ Converts a record from svtk to gatk style. This includes updating END/END2 and adding necessary fields such as ECN. @@ -110,6 +114,10 @@ def convert(record: pysam.VariantRecord, map from sample to contig to ploidy scale_down_gq: bool scale GQs to 0-99 range + bothside_pass_vid_set: Set[Text] + set of variant IDs with bothside SR pass flag + background_fail_vid_set: Set[Text] + set of variant Ids with background SR fail flag Returns ------- @@ -148,6 +156,11 @@ def is_null(val): val = record.info[k] if is_null(val) or (isinstance(val, tuple) and len(val) == 1 and is_null(val[0])): del record.info[k] + # Add SR flag to INFO field + if record.id in bothside_pass_vid_set: + record.info['BOTHSIDES_SUPPORT'] = True + if record.id in background_fail_vid_set: + record.info['HIGH_SR_BACKGROUND'] = True # copy FORMAT fields for sample, genotype in record.samples.items(): genotype['ECN'] = ploidy_dict[sample][contig] @@ -156,6 +169,13 @@ def is_null(val): return record +def parse_last_column(path): + if path is None: + return set() + with open(path) as f: + return set([line.strip().split('\t')[-1] for line in f]) + + def _process(vcf_in: pysam.VariantFile, vcf_out: pysam.VariantFile, arguments: Dict[Text, Any]) -> None: @@ -181,9 +201,14 @@ def _process(vcf_in: pysam.VariantFile, bnd_end_dict = None ploidy_dict = _parse_ploidy_table(arguments.ploidy_table) + bothside_pass_vid_set = parse_last_column(arguments.bothside_pass_list) + background_fail_vid_set = parse_last_column(arguments.background_fail_list) + for record in vcf_in: out = convert(record=record, bnd_end_dict=bnd_end_dict, - ploidy_dict=ploidy_dict, scale_down_gq=arguments.scale_down_gq) + ploidy_dict=ploidy_dict, scale_down_gq=arguments.scale_down_gq, + bothside_pass_vid_set=bothside_pass_vid_set, + background_fail_vid_set=background_fail_vid_set) vcf_out.write(out) @@ -206,6 +231,10 @@ def _parse_arguments(argv: List[Text]) -> argparse.Namespace: help="Fix END tags and assign END2 to END") parser.add_argument("--scale-down-gq", action='store_true', help="Scales all GQs down from [0-999] to [0-99]") + parser.add_argument("--bothside-pass-list", type=str, + help="Path to bothside SR pass flag variant list") + parser.add_argument("--background-fail-list", type=str, + help="Path to background SR fail flag variant list") if len(argv) <= 1: parser.parse_args(["--help"]) sys.exit(0) diff --git a/src/sv-pipeline/scripts/single_sample/convert_cnvs_without_depth_support_to_bnds.py b/src/sv-pipeline/scripts/single_sample/convert_cnvs_without_depth_support_to_bnds.py index 290d2f786..f99daa6ec 100755 --- a/src/sv-pipeline/scripts/single_sample/convert_cnvs_without_depth_support_to_bnds.py +++ 
b/src/sv-pipeline/scripts/single_sample/convert_cnvs_without_depth_support_to_bnds.py @@ -14,28 +14,28 @@ def has_depth_support_autosome(record, sample): - return record.samples[sample]['RD_CN'] is not None and \ - ((record.info['SVTYPE'] == 'DUP' and record.samples[sample]['RD_CN'] > 2) or - (record.info['SVTYPE'] == 'DEL' and record.samples[sample]['RD_CN'] < 2)) + return record.samples[sample].get('RD_CN') is not None and \ + ((record.info['SVTYPE'] == 'DUP' and record.samples[sample].get('RD_CN') > 2) or + (record.info['SVTYPE'] == 'DEL' and record.samples[sample].get('RD_CN') < 2)) def has_sr_or_pe_support(record, sample): - return (record.samples[sample]['PE_GT'] is not None and record.samples[sample]['PE_GT'] > 0) \ - or (record.samples[sample]['SR_GT'] is not None and record.samples[sample]['SR_GT'] > 0) + return (record.samples[sample].get('PE_GT') is not None and record.samples[sample].get('PE_GT') > 0) \ + or (record.samples[sample].get('SR_GT') is not None and record.samples[sample].get('SR_GT') > 0) def has_depth_support_allosome(record, sample, samples_with_same_sex): - if record.samples[sample]['RD_CN'] is None: + if record.samples[sample].get('RD_CN') is None: return False - cns = [record.samples[s]['RD_CN'] for s in samples_with_same_sex] + cns = [record.samples[s].get('RD_CN') for s in samples_with_same_sex] cns = [cn for cn in cns if cn is not None] if len(cns) == 0: return False cns.sort() median_cn = cns[int((len(cns) + 1) / 2)] - return record.samples[sample]['RD_CN'] is not None and \ - ((record.info['SVTYPE'] == 'DUP' and record.samples[sample]['RD_CN'] > median_cn) or - (record.info['SVTYPE'] == 'DEL' and record.samples[sample]['RD_CN'] < median_cn)) + return record.samples[sample].get('RD_CN') is not None and \ + ((record.info['SVTYPE'] == 'DUP' and record.samples[sample].get('RD_CN') > median_cn) or + (record.info['SVTYPE'] == 'DEL' and record.samples[sample].get('RD_CN') < median_cn)) def read_contigs_list(contigs_list): @@ -92,7 +92,7 @@ def main(): svtype = record.info['SVTYPE'] if (svtype == 'DEL' or svtype == 'DUP') and record.info['SVLEN'] >= min_size: pesr_support = has_sr_or_pe_support(record, case_sample) - if record.samples[case_sample]['RD_CN'] is None: + if record.samples[case_sample].get('RD_CN') is None: if not pesr_support: sys.stderr.write("Record {} has a missing depth genotype and no PE/SR support; dropping\n".format(record.id)) continue diff --git a/src/svtk/svtk/utils/genotype_merging.py b/src/svtk/svtk/utils/genotype_merging.py index 470cf6acc..b34c80ee9 100644 --- a/src/svtk/svtk/utils/genotype_merging.py +++ b/src/svtk/svtk/utils/genotype_merging.py @@ -50,43 +50,17 @@ def check_multiallelic(records): records : list of pysam.VariantRecord """ for record in records: - if record.alts[0] == '': + if record.alts[0] in ['', '']: return True - # for sample in record.samples: - # GT = record.samples[sample]['GT'] - # if GT[0] > 1 or GT[1] > 1: - # return True - return False def make_multiallelic_alts(records): """ - Make list of alts corresponding to record with highest observed copy number - - Parameters - ---------- - records : list of pysam.VariantRecord - - Returns - ------- - alts : tuple of str + Sets simple symbolic alt for multi-allelic records """ - - max_CN = 2 - - is_bca = records[0].info['SVTYPE'] not in 'DEL DUP'.split() - for record in records: - if record.alts[0] == '': - CN = int(record.alts[-1].strip('')) - if CN > max_CN: - max_CN = CN - - if is_bca: - return tuple([''] + ['' % i for i in range(1, max_CN + 1)]) - else: - return 
tuple([''] + ['' % i for i in range(1, max_CN + 1)]) + record.alts = ('',) def update_best_genotypes(new_record, records, preserve_multiallelic=False): @@ -113,7 +87,7 @@ def _str_to_tuple(x): is_multiallelic = False if is_multiallelic: - new_record.alts = make_multiallelic_alts(records) + make_multiallelic_alts(records) new_record_formats = new_record.header.formats.keys() for sample in new_record.samples: @@ -123,17 +97,12 @@ def _str_to_tuple(x): # If any record is multiallelic, replace non-multiallelic # genotypes with multiallelic equivalents if key == 'GT': - GT = best_record.samples[sample][key] + gt = best_record.samples[sample][key] if is_multiallelic: - if GT == (0, 1): - new_record.samples[sample][key] = (0, 2) - elif GT == (1, 1): - new_record.samples[sample][key] = (2, 2) - else: - new_record.samples[sample][key] = GT + # No genotypes for multi-allelic records since they can't always be determined + new_record.samples[sample][key] = (None, None) else: - GT = tuple([x if x is None else min(x, 1) for x in GT]) - new_record.samples[sample][key] = GT + new_record.samples[sample][key] = tuple([x if x is None else min(x, 1) for x in gt]) elif key == 'EV': ev_values = [r.samples[sample][key] for r in records] ev_values = [_str_to_tuple(v) for v in ev_values] diff --git a/src/svtk/svtk/utils/utils.py b/src/svtk/svtk/utils/utils.py index 8ff807513..7921b1a9c 100644 --- a/src/svtk/svtk/utils/utils.py +++ b/src/svtk/svtk/utils/utils.py @@ -116,8 +116,10 @@ def get_called_samples(record, include_null=False): samples.append(sample) if record.info.get('SVTYPE', None) == 'CNV': - for sample in record.samples.keys(): - if record.samples[sample]['CN'] != 2: + for sample, gt in record.samples.items(): + # Get CN, or RD_CN if it doesn't exist + cn = gt.get('CN', gt.get('RD_CN', None)) + if cn is not None and cn != 2: samples.append(sample) return sorted(samples) diff --git a/wdl/CleanVcfChromosome.wdl b/wdl/CleanVcfChromosome.wdl index 29b3bcc8e..79d41734a 100644 --- a/wdl/CleanVcfChromosome.wdl +++ b/wdl/CleanVcfChromosome.wdl @@ -561,18 +561,18 @@ task CleanVcf4 { for sid in record.samples: s = record.samples[sid] # Pick best GT - if s['PE_GT'] is None: + if s.get('PE_GT') is None: continue - elif s['SR_GT'] is None: - gt = s['PE_GT'] - elif s['PE_GT'] > 0 and s['SR_GT'] == 0: - gt = s['PE_GT'] - elif s['PE_GT'] == 0: - gt = s['SR_GT'] - elif s['PE_GQ'] >= s['SR_GQ']: - gt = s['PE_GT'] + elif s.get('SR_GT') is None: + gt = s.get('PE_GT') + elif s.get('PE_GT') > 0 and s.get('SR_GT') == 0: + gt = s.get('PE_GT') + elif s.get('PE_GT') == 0: + gt = s.get('SR_GT') + elif s.get('PE_GQ') >= s.get('SR_GQ'): + gt = s.get('PE_GT') else: - gt = s['SR_GT'] + gt = s.get('SR_GT') if gt > 2: num_gt_over_2 += 1 if num_gt_over_2 > max_vf: diff --git a/wdl/ClusterSingleChromosome.wdl b/wdl/ClusterSingleChromosome.wdl deleted file mode 100644 index 9e67f9d9c..000000000 --- a/wdl/ClusterSingleChromosome.wdl +++ /dev/null @@ -1,110 +0,0 @@ -version 1.0 - -# Author: Ryan Collins - -import "TasksMakeCohortVcf.wdl" as MiniTasks -import "ShardedCluster.wdl" as ShardedCluster -import "HailMerge.wdl" as HailMerge - -# Workflow to perform sharding & clustering of a vcf for a single chromosome -workflow ClusterSingleChrom { - input { - File vcf - File vcf_index - Int num_samples - String contig - String cohort_name - String evidence_type - String prefix - Int dist - Float frac - Float sample_overlap - File? exclude_list - Int sv_size - Array[String] sv_types - File empty_file - - Boolean use_hail - String? 
gcs_project - - String sv_pipeline_docker - String sv_base_mini_docker - - # overrides for MiniTasks - RuntimeAttr? runtime_override_subset_sv_type - RuntimeAttr? runtime_override_cat_vid_lists_chrom - - # overrides for ShardedCluster - RuntimeAttr? runtime_override_shard_clusters - RuntimeAttr? runtime_override_shard_vids - RuntimeAttr? runtime_override_pull_vcf_shard - RuntimeAttr? runtime_override_svtk_vcf_cluster - RuntimeAttr? runtime_override_get_vcf_header_with_members_info_line - RuntimeAttr? runtime_override_concat_svtypes - RuntimeAttr? runtime_override_concat_sharded_cluster - RuntimeAttr? runtime_override_cat_vid_lists_sharded - RuntimeAttr? runtime_override_make_sites_only - RuntimeAttr? runtime_override_sort_merged_vcf - - RuntimeAttr? runtime_override_preconcat_sharded_cluster - RuntimeAttr? runtime_override_hail_merge_sharded_cluster - RuntimeAttr? runtime_override_fix_header_sharded_cluster - } - - #Scatter over svtypes - scatter ( sv_type in sv_types ) { - #Subset vcf to only contain records for that svtype - - call MiniTasks.FilterVcf as SubsetSvType { - input: - vcf=vcf, - vcf_index=vcf_index, - records_filter='INFO/SVTYPE="~{sv_type}"', - outfile_prefix="~{prefix}.~{sv_type}", - use_ssd=true, - sv_base_mini_docker=sv_base_mini_docker, - runtime_attr_override=runtime_override_subset_sv_type - } - - #For each svtype, intelligently shard VCF for clustering - call ShardedCluster.ShardedCluster { - input: - vcf=SubsetSvType.filtered_vcf, - num_samples=num_samples, - dist=dist, - frac=frac, - prefix="~{prefix}.~{sv_type}", - cohort_name=cohort_name, - contig=contig, - evidence_type=evidence_type, - sv_type=sv_type, - sample_overlap=sample_overlap, - exclude_list=exclude_list, - sv_size=sv_size, - sv_types=sv_types, - empty_file=empty_file, - use_hail=use_hail, - gcs_project=gcs_project, - sv_pipeline_docker=sv_pipeline_docker, - sv_base_mini_docker=sv_base_mini_docker, - runtime_override_shard_clusters=runtime_override_shard_clusters, - runtime_override_shard_vids=runtime_override_shard_vids, - runtime_override_pull_vcf_shard=runtime_override_pull_vcf_shard, - runtime_override_svtk_vcf_cluster=runtime_override_svtk_vcf_cluster, - runtime_override_get_vcf_header_with_members_info_line=runtime_override_get_vcf_header_with_members_info_line, - runtime_override_concat_sharded_cluster=runtime_override_concat_sharded_cluster, - runtime_override_cat_vid_lists_sharded=runtime_override_cat_vid_lists_sharded, - runtime_override_make_sites_only=runtime_override_make_sites_only, - runtime_override_sort_merged_vcf=runtime_override_sort_merged_vcf, - runtime_override_preconcat_sharded_cluster=runtime_override_preconcat_sharded_cluster, - runtime_override_hail_merge_sharded_cluster=runtime_override_hail_merge_sharded_cluster, - runtime_override_fix_header_sharded_cluster=runtime_override_fix_header_sharded_cluster - } - } - - #Output clustered vcf - output { - Array[File] clustered_vcfs = ShardedCluster.clustered_vcf - Array[File] clustered_vcf_indexes = ShardedCluster.clustered_vcf_idx - } -} diff --git a/wdl/CombineBatches.wdl b/wdl/CombineBatches.wdl index 624e3ed54..5c18bd5f4 100644 --- a/wdl/CombineBatches.wdl +++ b/wdl/CombineBatches.wdl @@ -1,22 +1,23 @@ version 1.0 -import "CombineSRBothsidePass.wdl" as CombineSRBothsidePass -import "VcfClusterSingleChromsome.wdl" as VcfClusterContig +import "CombineSRBothsidePass.wdl" as CombineSRBothsidePassWorkflow +import "FormatVcfForGatk.wdl" as GatkFormatting +import "TasksClusterBatch.wdl" as ClusterTasks +import "TasksGenotypeBatch.wdl" as 
GenotypeTasks import "TasksMakeCohortVcf.wdl" as MiniTasks -import "HailMerge.wdl" as HailMerge -import "HarmonizeHeaders.wdl" as HarmonizeHeaders -import "MergePesrDepth.wdl" as MergePesrDepth -import "Utils.wdl" as Utils workflow CombineBatches { input { String cohort_name Array[String] batches + File ped_file Boolean merge_vcfs = false Array[File] pesr_vcfs Array[File] depth_vcfs + # Set to true if using vcfs generated with a prior version, i.e. not ending in "_reformatted.vcf.gz" + Boolean legacy_vcfs = false Array[File] raw_sr_bothside_pass_files Array[File] raw_sr_background_fail_files @@ -27,80 +28,50 @@ workflow CombineBatches { File depth_exclude_list Float min_sr_background_fail_batches + # Reclustering parameters + File clustering_config_part1 + File stratification_config_part1 + File clustering_config_part2 + File stratification_config_part2 + # These arrays give the names and intervals for reference contexts for stratification (same lengths) + # Names must correspond to those in the stratification config files + Array[String] context_names + Array[File] context_bed_files + + File reference_fasta + File reference_fasta_fai + File reference_dict + String? chr_x + String? chr_y + File empty_file Boolean use_hail = false String? gcs_project + Float? java_mem_fraction + + String gatk_docker String sv_base_mini_docker String sv_pipeline_docker - # overrides for local tasks - RuntimeAttr? runtime_override_update_sr_list - RuntimeAttr? runtime_override_merge_pesr_depth - RuntimeAttr? runtime_override_reheader - RuntimeAttr? runtime_override_pull_header - - # overrides for mini tasks + RuntimeAttr? runtime_attr_create_ploidy + RuntimeAttr? runtime_attr_reformat_1 + RuntimeAttr? runtime_attr_reformat_2 + RuntimeAttr? runtime_attr_join_vcfs + RuntimeAttr? runtime_attr_cluster_sites + RuntimeAttr? runtime_attr_recluster_part1 + RuntimeAttr? runtime_attr_recluster_part2 RuntimeAttr? runtime_attr_get_non_ref_vids RuntimeAttr? runtime_attr_calculate_support_frac RuntimeAttr? runtime_override_clean_background_fail + RuntimeAttr? runtime_attr_gatk_to_svtk_vcf + RuntimeAttr? runtime_attr_extract_vids RuntimeAttr? runtime_override_concat - RuntimeAttr? runtime_override_concat_pesr_depth - RuntimeAttr? runtime_override_update_fix_pesr_header - RuntimeAttr? runtime_override_count_samples - RuntimeAttr? runtime_override_preconcat_pesr_depth - RuntimeAttr? runtime_override_hail_merge_pesr_depth - RuntimeAttr? runtime_override_fix_header_pesr_depth - RuntimeAttr? runtime_override_concat_large_pesr_depth - - # overrides for VcfClusterContig - RuntimeAttr? runtime_override_localize_vcfs - RuntimeAttr? runtime_override_join_vcfs - RuntimeAttr? runtime_override_fix_multiallelic - RuntimeAttr? runtime_override_fix_ev_tags - RuntimeAttr? runtime_override_subset_bothside_pass - RuntimeAttr? runtime_override_subset_background_fail - RuntimeAttr? runtime_override_subset_sv_type - RuntimeAttr? runtime_override_shard_clusters - RuntimeAttr? runtime_override_shard_vids - RuntimeAttr? runtime_override_pull_vcf_shard - RuntimeAttr? runtime_override_svtk_vcf_cluster - RuntimeAttr? runtime_override_get_vcf_header_with_members_info_line - RuntimeAttr? runtime_override_concat_vcf_cluster - RuntimeAttr? runtime_override_concat_svtypes - RuntimeAttr? runtime_override_concat_sharded_cluster - RuntimeAttr? runtime_override_make_sites_only - RuntimeAttr? runtime_override_sort_merged_vcf_cluster - RuntimeAttr? runtime_override_preconcat_sharded_cluster - RuntimeAttr? 
runtime_override_hail_merge_sharded_cluster - RuntimeAttr? runtime_override_fix_header_sharded_cluster - - # overerides for merge pesr depth - RuntimeAttr? runtime_override_shard_clusters_mpd - RuntimeAttr? runtime_override_shard_vids_mpd - RuntimeAttr? runtime_override_pull_vcf_shard_mpd - RuntimeAttr? runtime_override_merge_pesr_depth_mpd - - RuntimeAttr? runtime_override_sort_merged_vcf_mpd - RuntimeAttr? runtime_override_subset_small_mpd - RuntimeAttr? runtime_override_subset_large_mpd - RuntimeAttr? runtime_override_make_sites_only_mpd - RuntimeAttr? runtime_override_concat_large_pesr_depth_mpd - RuntimeAttr? runtime_override_concat_shards_mpd - - RuntimeAttr? runtime_override_preconcat_large_pesr_depth_mpd - RuntimeAttr? runtime_override_hail_merge_large_pesr_depth_mpd - RuntimeAttr? runtime_override_fix_header_large_pesr_depth_mpd - - RuntimeAttr? runtime_override_preconcat_pesr_depth_shards_mpd - RuntimeAttr? runtime_override_hail_merge_pesr_depth_shards_mpd - RuntimeAttr? runtime_override_fix_header_pesr_depth_shards_mpd - } # Preprocess some inputs - call CombineSRBothsidePass.CombineSRBothsidePass { + call CombineSRBothsidePassWorkflow.CombineSRBothsidePass { input: pesr_vcfs=pesr_vcfs, raw_sr_bothside_pass_files=raw_sr_bothside_pass_files, @@ -112,7 +83,7 @@ workflow CombineBatches { } Float min_background_fail_first_col = min_sr_background_fail_batches * length(raw_sr_background_fail_files) - call MiniTasks.CatUncompressedFiles as CleanBackgroundFail { + call MiniTasks.CatUncompressedFiles as CombineBackgroundFail { input: shards=raw_sr_background_fail_files, filter_command="sort | uniq -c | awk -v OFS='\\t' '{if($1 >= ~{min_background_fail_first_col}) print $2}'", @@ -121,270 +92,169 @@ workflow CombineBatches { runtime_attr_override=runtime_override_clean_background_fail } - call Utils.CountSamples { + call ClusterTasks.CreatePloidyTableFromPed { input: - vcf=depth_vcfs[0], - sv_base_mini_docker=sv_base_mini_docker, - runtime_attr_override=runtime_override_count_samples + ped_file=ped_file, + contig_list=contig_list, + retain_female_chr_y=true, + chr_x=chr_x, + chr_y=chr_y, + output_prefix="~{cohort_name}.ploidy", + sv_pipeline_docker=sv_pipeline_docker, + runtime_attr_override=runtime_attr_create_ploidy } - #Scatter per chromosome - Array[String] contigs = transpose(read_tsv(contig_list))[0] - scatter ( contig in contigs ) { - - #Subset PESR VCFs to single chromosome & cluster - #Note: also subsets bothside_pass and background_fail files to variants - #present on chromosome of interest - call VcfClusterContig.VcfClusterSingleChrom as ClusterPesr { + Array[File] all_vcfs = flatten([pesr_vcfs, depth_vcfs]) + scatter (vcf in all_vcfs) { + if (legacy_vcfs) { + call GenotypeTasks.ReformatGenotypedVcf { + input: + vcf = vcf, + output_prefix = basename(vcf, ".vcf.gz") + ".reformat_legacy", + sv_pipeline_docker = sv_pipeline_docker, + runtime_attr_override = runtime_attr_reformat_1 + } + } + File reformatted_vcf = select_first([ReformatGenotypedVcf.out, vcf]) + call GatkFormatting.FormatVcf { input: - vcfs=pesr_vcfs, - num_samples=CountSamples.num_samples, - batches=batches, - prefix="~{cohort_name}.~{contig}.pesr", - dist=300, - frac=0.1, - sample_overlap=0.5, - exclude_list=pe_exclude_list, - sv_size=50, - sv_types=["DEL","DUP","INV","BND","INS"], - contig=contig, - evidence_type="pesr", - cohort_name=cohort_name, - localize_shard_size=localize_shard_size, - subset_sr_lists=true, - bothside_pass=CombineSRBothsidePass.out, - background_fail=CleanBackgroundFail.outfile, - 
empty_file=empty_file, - use_hail=use_hail, - gcs_project=gcs_project, + vcf=reformatted_vcf, + ploidy_table=CreatePloidyTableFromPed.out, + args="--fix-end", + output_prefix=basename(vcf, ".vcf.gz") + ".reformat_gatk", + bothside_pass_list=CombineSRBothsidePass.out, + background_fail_list=CombineBackgroundFail.outfile, sv_pipeline_docker=sv_pipeline_docker, - sv_base_mini_docker=sv_base_mini_docker, - runtime_override_localize_vcfs = runtime_override_localize_vcfs, - runtime_override_join_vcfs = runtime_override_join_vcfs, - runtime_override_fix_multiallelic = runtime_override_fix_multiallelic, - runtime_override_fix_ev_tags = runtime_override_fix_ev_tags, - runtime_override_subset_bothside_pass=runtime_override_subset_bothside_pass, - runtime_override_subset_background_fail=runtime_override_subset_background_fail, - runtime_override_subset_sv_type=runtime_override_subset_sv_type, - runtime_override_shard_clusters=runtime_override_shard_clusters, - runtime_override_shard_vids=runtime_override_shard_vids, - runtime_override_pull_vcf_shard=runtime_override_pull_vcf_shard, - runtime_override_svtk_vcf_cluster=runtime_override_svtk_vcf_cluster, - runtime_override_get_vcf_header_with_members_info_line=runtime_override_get_vcf_header_with_members_info_line, - runtime_override_concat_vcf_cluster=runtime_override_concat_vcf_cluster, - runtime_override_concat_svtypes=runtime_override_concat_svtypes, - runtime_override_concat_sharded_cluster=runtime_override_concat_sharded_cluster, - runtime_override_make_sites_only=runtime_override_make_sites_only, - runtime_override_sort_merged_vcf=runtime_override_sort_merged_vcf_cluster, - runtime_override_preconcat_sharded_cluster=runtime_override_preconcat_sharded_cluster, - runtime_override_hail_merge_sharded_cluster=runtime_override_hail_merge_sharded_cluster, - runtime_override_fix_header_sharded_cluster=runtime_override_fix_header_sharded_cluster + runtime_attr_override=runtime_attr_reformat_2 } + } + + #Scatter per chromosome + Array[String] contigs = transpose(read_tsv(contig_list))[0] + scatter ( contig in contigs ) { - #Subset RD VCFs to single chromosome & cluster - call VcfClusterContig.VcfClusterSingleChrom as ClusterDepth { + # First round of clustering + call ClusterTasks.SVCluster as JoinVcfs { input: - vcfs=depth_vcfs, - num_samples=CountSamples.num_samples, - batches=batches, - prefix="~{cohort_name}.~{contig}.depth", - dist=500000, - frac=0.5, - sample_overlap=0.5, - exclude_list=depth_exclude_list, - sv_size=5000, - sv_types=["DEL","DUP"], + vcfs=FormatVcf.out, + ploidy_table=CreatePloidyTableFromPed.out, + output_prefix="~{cohort_name}.combine_batches.~{contig}.join_vcfs", contig=contig, - evidence_type="depth", - cohort_name=cohort_name, - localize_shard_size=localize_shard_size, - subset_sr_lists=false, - bothside_pass=CombineSRBothsidePass.out, - background_fail=CleanBackgroundFail.outfile, - empty_file=empty_file, - use_hail=use_hail, - gcs_project=gcs_project, - sv_pipeline_docker=sv_pipeline_docker, - sv_base_mini_docker=sv_base_mini_docker, - runtime_override_localize_vcfs = runtime_override_localize_vcfs, - runtime_override_join_vcfs = runtime_override_join_vcfs, - runtime_override_fix_multiallelic = runtime_override_fix_multiallelic, - runtime_override_fix_ev_tags = runtime_override_fix_ev_tags, - runtime_override_subset_bothside_pass=runtime_override_subset_bothside_pass, - runtime_override_subset_background_fail=runtime_override_subset_background_fail, - runtime_override_subset_sv_type=runtime_override_subset_sv_type, - 
runtime_override_shard_clusters=runtime_override_shard_clusters, - runtime_override_shard_vids=runtime_override_shard_vids, - runtime_override_svtk_vcf_cluster=runtime_override_svtk_vcf_cluster, - runtime_override_get_vcf_header_with_members_info_line=runtime_override_get_vcf_header_with_members_info_line, - runtime_override_concat_vcf_cluster=runtime_override_concat_vcf_cluster, - runtime_override_concat_svtypes=runtime_override_concat_svtypes, - runtime_override_concat_sharded_cluster=runtime_override_concat_sharded_cluster, - runtime_override_make_sites_only=runtime_override_make_sites_only, - runtime_override_sort_merged_vcf=runtime_override_sort_merged_vcf_cluster, - runtime_override_preconcat_sharded_cluster=runtime_override_preconcat_sharded_cluster, - runtime_override_hail_merge_sharded_cluster=runtime_override_hail_merge_sharded_cluster, - runtime_override_fix_header_sharded_cluster=runtime_override_fix_header_sharded_cluster + fast_mode=false, + pesr_sample_overlap=0, + pesr_interval_overlap=1, + pesr_breakend_window=0, + depth_sample_overlap=0, + depth_interval_overlap=1, + depth_breakend_window=0, + mixed_sample_overlap=0, + mixed_interval_overlap=1, + mixed_breakend_window=0, + reference_fasta=reference_fasta, + reference_fasta_fai=reference_fasta_fai, + reference_dict=reference_dict, + java_mem_fraction=java_mem_fraction, + gatk_docker=gatk_docker, + runtime_attr_override=runtime_attr_join_vcfs } - call MiniTasks.ConcatVcfs as ConcatPesrSitesOnly { + # First round of clustering + call ClusterTasks.SVCluster as ClusterSites { input: - vcfs=ClusterPesr.clustered_vcfs, - vcfs_idx=ClusterPesr.clustered_vcf_indexes, - naive=true, - generate_index=false, - sites_only=true, - outfile_prefix="~{cohort_name}.clustered_pesr.sites_only", - sv_base_mini_docker=sv_base_mini_docker, - runtime_attr_override=runtime_override_concat + vcfs=[JoinVcfs.out], + ploidy_table=CreatePloidyTableFromPed.out, + output_prefix="~{cohort_name}.combine_batches.~{contig}.cluster_sites", + fast_mode=false, + breakpoint_summary_strategy="REPRESENTATIVE", + pesr_sample_overlap=0.5, + pesr_interval_overlap=0.1, + pesr_breakend_window=300, + depth_sample_overlap=0.5, + depth_interval_overlap=0.5, + depth_breakend_window=500000, + mixed_sample_overlap=0.5, + mixed_interval_overlap=0.5, + mixed_breakend_window=1000000, + reference_fasta=reference_fasta, + reference_fasta_fai=reference_fasta_fai, + reference_dict=reference_dict, + java_mem_fraction=java_mem_fraction, + variant_prefix="~{cohort_name}_~{contig}_", + gatk_docker=gatk_docker, + runtime_attr_override=runtime_attr_cluster_sites } - #Update SR background fail & bothside pass files (1) - call MiniTasks.UpdateSrList as UpdateBackgroundFailFirst { - input: - vcf=ConcatPesrSitesOnly.concat_vcf, - original_list=ClusterPesr.filtered_background_fail, - outfile="~{cohort_name}.~{contig}.sr_background_fail.updated.txt", - sv_pipeline_docker=sv_pipeline_docker, - runtime_attr_override=runtime_override_update_sr_list - } - call MiniTasks.UpdateSrList as UpdateBothsidePassFirst { + # Second round of clustering + call GroupedSVClusterTask as GroupedSVClusterPart1 { input: - vcf=ConcatPesrSitesOnly.concat_vcf, - original_list=ClusterPesr.filtered_bothside_pass, - outfile="~{cohort_name}.~{contig}.sr_bothside_pass.updated.txt", - sv_pipeline_docker=sv_pipeline_docker, - runtime_attr_override=runtime_override_update_sr_list + vcf=ClusterSites.out, + ploidy_table=CreatePloidyTableFromPed.out, + output_prefix="~{cohort_name}.combine_batches.~{contig}.recluster_part_1", + 
reference_fasta=reference_fasta, + reference_fasta_fai=reference_fasta_fai, + reference_dict=reference_dict, + clustering_config=clustering_config_part1, + stratification_config=stratification_config_part1, + context_bed_files=context_bed_files, + context_names=context_names, + breakpoint_summary_strategy="REPRESENTATIVE", + java_mem_fraction=java_mem_fraction, + gatk_docker=gatk_docker, + runtime_attr_override=runtime_attr_recluster_part1 } - call HarmonizeHeaders.HarmonizeHeaders { + # Third round of clustering + call GroupedSVClusterTask as GroupedSVClusterPart2 { input: - header_vcf=ClusterDepth.clustered_vcfs[0], - vcfs=ClusterPesr.clustered_vcfs, - prefix="~{cohort_name}.~{contig}.harmonize_headers", - sv_base_mini_docker=sv_base_mini_docker, - runtime_override_reheader=runtime_override_reheader, - runtime_override_pull_header=runtime_override_pull_header + vcf=GroupedSVClusterPart1.out, + ploidy_table=CreatePloidyTableFromPed.out, + output_prefix="~{cohort_name}.combine_batches.~{contig}.recluster_part_2", + reference_fasta=reference_fasta, + reference_fasta_fai=reference_fasta_fai, + reference_dict=reference_dict, + clustering_config=clustering_config_part2, + stratification_config=stratification_config_part2, + context_bed_files=context_bed_files, + context_names=context_names, + breakpoint_summary_strategy="REPRESENTATIVE", + java_mem_fraction=java_mem_fraction, + gatk_docker=gatk_docker, + runtime_attr_override=runtime_attr_recluster_part2 } - call MergePesrDepth.MergePesrDepth as MergeDeletions { + # Use "depth" as source to match legacy headers + # AC/AF cause errors due to being lists instead of single values + call ClusterTasks.GatkToSvtkVcf { input: - subtyped_pesr_vcf=HarmonizeHeaders.out[0], - subtyped_depth_vcf=ClusterDepth.clustered_vcfs[0], - svtype="DEL", - num_samples=CountSamples.num_samples, - prefix="~{cohort_name}.~{contig}.merge_del", - cohort_name=cohort_name, - contig=contig, - use_hail=use_hail, - gcs_project=gcs_project, - sv_base_mini_docker=sv_base_mini_docker, + vcf=GroupedSVClusterPart2.out, + output_prefix="~{cohort_name}.combine_batches.~{contig}.svtk_formatted", + source="depth", + contig_list=contig_list, + remove_formats="CN", + remove_infos="END2,AC,AF,AN,HIGH_SR_BACKGROUND,BOTHSIDES_SUPPORT", + set_pass=true, sv_pipeline_docker=sv_pipeline_docker, - runtime_override_shard_clusters=runtime_override_shard_clusters_mpd, - runtime_override_shard_vids=runtime_override_shard_vids_mpd, - runtime_override_pull_vcf_shard=runtime_override_pull_vcf_shard_mpd, - runtime_override_merge_pesr_depth=runtime_override_merge_pesr_depth_mpd, - runtime_override_sort_merged_vcf=runtime_override_sort_merged_vcf_mpd, - runtime_override_subset_small=runtime_override_subset_small_mpd, - runtime_override_subset_large=runtime_override_subset_large_mpd, - runtime_override_make_sites_only=runtime_override_make_sites_only_mpd, - runtime_override_concat_large_pesr_depth=runtime_override_concat_large_pesr_depth_mpd, - runtime_override_concat_shards=runtime_override_concat_shards_mpd, - runtime_override_preconcat_large_pesr_depth=runtime_override_preconcat_large_pesr_depth_mpd, - runtime_override_hail_merge_large_pesr_depth=runtime_override_hail_merge_large_pesr_depth_mpd, - runtime_override_fix_header_large_pesr_depth=runtime_override_fix_header_large_pesr_depth_mpd, - runtime_override_preconcat_pesr_depth_shards=runtime_override_preconcat_pesr_depth_shards_mpd, - runtime_override_hail_merge_pesr_depth_shards=runtime_override_hail_merge_pesr_depth_shards_mpd, - 
runtime_override_fix_header_pesr_depth_shards=runtime_override_fix_header_pesr_depth_shards_mpd + runtime_attr_override=runtime_attr_gatk_to_svtk_vcf } - call MergePesrDepth.MergePesrDepth as MergeDuplications { + call ExtractSRVariantLists { input: - subtyped_pesr_vcf=HarmonizeHeaders.out[1], - subtyped_depth_vcf=ClusterDepth.clustered_vcfs[1], - svtype="DUP", - num_samples=CountSamples.num_samples, - prefix="~{cohort_name}.~{contig}.merge_dup", - cohort_name=cohort_name, - contig=contig, - use_hail=use_hail, - gcs_project=gcs_project, + vcf=GroupedSVClusterPart2.out, + vcf_index=GroupedSVClusterPart2.out_index, + output_prefix="~{cohort_name}.combine_batches.~{contig}", sv_base_mini_docker=sv_base_mini_docker, - sv_pipeline_docker=sv_pipeline_docker, - runtime_override_shard_clusters=runtime_override_shard_clusters_mpd, - runtime_override_shard_vids=runtime_override_shard_vids_mpd, - runtime_override_pull_vcf_shard=runtime_override_pull_vcf_shard_mpd, - runtime_override_merge_pesr_depth=runtime_override_merge_pesr_depth_mpd, - runtime_override_sort_merged_vcf=runtime_override_sort_merged_vcf_mpd, - runtime_override_subset_small=runtime_override_subset_small_mpd, - runtime_override_subset_large=runtime_override_subset_large_mpd, - runtime_override_make_sites_only=runtime_override_make_sites_only_mpd, - runtime_override_concat_large_pesr_depth=runtime_override_concat_large_pesr_depth_mpd, - runtime_override_concat_shards=runtime_override_concat_shards_mpd, - runtime_override_preconcat_large_pesr_depth=runtime_override_preconcat_large_pesr_depth_mpd, - runtime_override_hail_merge_large_pesr_depth=runtime_override_hail_merge_large_pesr_depth_mpd, - runtime_override_fix_header_large_pesr_depth=runtime_override_fix_header_large_pesr_depth_mpd, - runtime_override_preconcat_pesr_depth_shards=runtime_override_preconcat_pesr_depth_shards_mpd, - runtime_override_hail_merge_pesr_depth_shards=runtime_override_hail_merge_pesr_depth_shards_mpd, - runtime_override_fix_header_pesr_depth_shards=runtime_override_fix_header_pesr_depth_shards_mpd - } - - #Merge PESR & RD VCFs - if (use_hail) { - call HailMerge.HailMerge as ConcatPesrDepthHail { - input: - vcfs=[MergeDeletions.out, MergeDuplications.out, HarmonizeHeaders.out[2], HarmonizeHeaders.out[3], HarmonizeHeaders.out[4]], - prefix="~{cohort_name}.~{contig}.concat_pesr_depth", - gcs_project=gcs_project, - sv_base_mini_docker=sv_base_mini_docker, - sv_pipeline_docker=sv_pipeline_docker, - runtime_override_preconcat=runtime_override_preconcat_pesr_depth, - runtime_override_hail_merge=runtime_override_hail_merge_pesr_depth, - runtime_override_fix_header=runtime_override_fix_header_pesr_depth - } - } - if (!use_hail) { - call MiniTasks.ConcatVcfs as ConcatPesrDepth { - input: - vcfs=[MergeDeletions.out, MergeDuplications.out, HarmonizeHeaders.out[2], HarmonizeHeaders.out[3], HarmonizeHeaders.out[4]], - vcfs_idx=[MergeDeletions.out+".tbi", MergeDuplications.out+".tbi", HarmonizeHeaders.out[2]+".tbi", HarmonizeHeaders.out[3]+".tbi", HarmonizeHeaders.out[4]+".tbi"], - allow_overlaps=true, - outfile_prefix="~{cohort_name}.~{contig}.concat_pesr_depth", - sv_base_mini_docker=sv_base_mini_docker, - runtime_attr_override=runtime_override_concat_large_pesr_depth - } - } - - #Update SR background fail & bothside pass files (2) - call MiniTasks.UpdateSrList as UpdateBackgroundFailSecond { - input: - vcf=select_first([ConcatPesrDepth.concat_vcf, ConcatPesrDepthHail.merged_vcf]), - original_list=UpdateBackgroundFailFirst.updated_list, - 
outfile="~{cohort_name}.~{contig}.sr_background_fail.updated2.txt", - sv_pipeline_docker=sv_pipeline_docker, - runtime_attr_override=runtime_override_update_sr_list + runtime_attr_override=runtime_attr_extract_vids } - call MiniTasks.UpdateSrList as UpdateBothsidePassSecond { - input: - vcf=select_first([ConcatPesrDepth.concat_vcf, ConcatPesrDepthHail.merged_vcf]), - original_list=UpdateBothsidePassFirst.updated_list, - outfile="~{cohort_name}.~{contig}.sr_bothside_pass.updated2.txt", - sv_pipeline_docker=sv_pipeline_docker, - runtime_attr_override=runtime_override_update_sr_list - } - - File vcfs_out_ = select_first([ConcatPesrDepth.concat_vcf, ConcatPesrDepthHail.merged_vcf]) - File vcf_indexes_out_ = select_first([ConcatPesrDepth.concat_vcf_idx, ConcatPesrDepthHail.merged_vcf_index]) } - #Merge resolved vcfs for QC + # Merge resolved vcfs for QC if (merge_vcfs) { call MiniTasks.ConcatVcfs { input: - vcfs=vcfs_out_, - vcfs_idx=vcf_indexes_out_, + vcfs=GatkToSvtkVcf.out, + vcfs_idx=GatkToSvtkVcf.out_index, naive=true, - outfile_prefix="~{cohort_name}.combine_batches", + outfile_prefix="~{cohort_name}.combine_batches.concat_all_contigs", sv_base_mini_docker=sv_base_mini_docker, runtime_attr_override=runtime_override_concat } @@ -392,11 +262,147 @@ workflow CombineBatches { #Final outputs output { - Array[File] combined_vcfs = vcfs_out_ - Array[File] combined_vcf_indexes = vcf_indexes_out_ - Array[File] cluster_bothside_pass_lists = UpdateBothsidePassSecond.updated_list - Array[File] cluster_background_fail_lists = UpdateBackgroundFailSecond.updated_list + Array[File] combined_vcfs = GatkToSvtkVcf.out + Array[File] combined_vcf_indexes = GatkToSvtkVcf.out_index + Array[File] cluster_background_fail_lists = ExtractSRVariantLists.high_sr_background_list + Array[File] cluster_bothside_pass_lists = ExtractSRVariantLists.bothsides_sr_support File? combine_batches_merged_vcf = ConcatVcfs.concat_vcf File? combine_batches_merged_vcf_index = ConcatVcfs.concat_vcf_idx } } + + +# Find intersection of Variant IDs from vid_list with those present in vcf, return as filtered_vid_list +task ExtractSRVariantLists { + input { + File vcf + File vcf_index + String output_prefix + String sv_base_mini_docker + RuntimeAttr? 
runtime_attr_override + } + + # when filtering/sorting/etc, memory usage will likely go up (much of the data will have to + # be held in memory or disk while working, potentially in a form that takes up more space) + RuntimeAttr runtime_default = object { + mem_gb: 3.75, + disk_gb: ceil(10.0 + size(vcf, "GB")), + cpu_cores: 1, + preemptible_tries: 3, + max_retries: 1, + boot_disk_gb: 10 + } + RuntimeAttr runtime_override = select_first([runtime_attr_override, runtime_default]) + runtime { + memory: select_first([runtime_override.mem_gb, runtime_default.mem_gb]) + " GB" + disks: "local-disk " + select_first([runtime_override.disk_gb, runtime_default.disk_gb]) + " HDD" + cpu: select_first([runtime_override.cpu_cores, runtime_default.cpu_cores]) + preemptible: select_first([runtime_override.preemptible_tries, runtime_default.preemptible_tries]) + maxRetries: select_first([runtime_override.max_retries, runtime_default.max_retries]) + docker: sv_base_mini_docker + bootDiskSizeGb: select_first([runtime_override.boot_disk_gb, runtime_default.boot_disk_gb]) + } + + command <<< + set -euo pipefail + bcftools query -f '%ID\t%HIGH_SR_BACKGROUND\t%BOTHSIDES_SUPPORT\n' ~{vcf} > flags.txt + awk -F'\t' '($2 != "."){print $1}' flags.txt > ~{output_prefix}.high_sr_background.txt + awk -F'\t' '($3 != "."){print $1}' flags.txt > ~{output_prefix}.bothsides_sr_support.txt + >>> + + output { + File high_sr_background_list = "~{output_prefix}.high_sr_background.txt" + File bothsides_sr_support = "~{output_prefix}.bothsides_sr_support.txt" + } +} + +task GroupedSVClusterTask { + input { + File vcf + File ploidy_table + String output_prefix + + File reference_fasta + File reference_fasta_fai + File reference_dict + + File clustering_config + File stratification_config + Array[File] context_bed_files + Array[String] context_names + + Float context_overlap_frac = 0 + Int context_num_breakpoint_overlaps = 1 + Int context_interchrom_num_breakpoint_overlaps = 1 + + String? breakpoint_summary_strategy + String? contig + String? additional_args + + Float? java_mem_fraction + String gatk_docker + RuntimeAttr? 
runtime_attr_override + } + + parameter_meta { + vcf: { + localization_optional: true + } + } + + RuntimeAttr default_attr = object { + cpu_cores: 1, + mem_gb: 3.75, + disk_gb: ceil(10 + size(vcf, "GB") * 2), + boot_disk_gb: 10, + preemptible_tries: 3, + max_retries: 1 + } + RuntimeAttr runtime_attr = select_first([runtime_attr_override, default_attr]) + + output { + File out = "~{output_prefix}.vcf.gz" + File out_index = "~{output_prefix}.vcf.gz.tbi" + } + command <<< + set -euo pipefail + + function getJavaMem() { + # get JVM memory in MiB by getting total memory from /proc/meminfo + # and multiplying by java_mem_fraction + cat /proc/meminfo \ + | awk -v MEM_FIELD="$1" '{ + f[substr($1, 1, length($1)-1)] = $2 + } END { + printf "%dM", f[MEM_FIELD] * ~{default="0.85" java_mem_fraction} / 1024 + }' + } + JVM_MAX_MEM=$(getJavaMem MemTotal) + echo "JVM memory: $JVM_MAX_MEM" + + gatk --java-options "-Xmx${JVM_MAX_MEM}" GroupedSVCluster \ + ~{"-L " + contig} \ + --reference ~{reference_fasta} \ + --ploidy-table ~{ploidy_table} \ + -V ~{vcf} \ + -O ~{output_prefix}.vcf.gz \ + --clustering-config ~{clustering_config} \ + --stratify-config ~{stratification_config} \ + --context-intervals ~{sep=" --context-intervals " context_bed_files} \ + --context-name ~{sep=" --context-name " context_names} \ + --stratify-overlap-fraction ~{context_overlap_frac} \ + --stratify-num-breakpoint-overlaps ~{context_num_breakpoint_overlaps} \ + --stratify-num-breakpoint-overlaps-interchromosomal ~{context_interchrom_num_breakpoint_overlaps} \ + ~{"--breakpoint-summary-strategy " + breakpoint_summary_strategy} \ + ~{additional_args} + >>> + runtime { + cpu: select_first([runtime_attr.cpu_cores, default_attr.cpu_cores]) + memory: select_first([runtime_attr.mem_gb, default_attr.mem_gb]) + " GiB" + disks: "local-disk " + select_first([runtime_attr.disk_gb, default_attr.disk_gb]) + " HDD" + bootDiskSizeGb: select_first([runtime_attr.boot_disk_gb, default_attr.boot_disk_gb]) + docker: gatk_docker + preemptible: select_first([runtime_attr.preemptible_tries, default_attr.preemptible_tries]) + maxRetries: select_first([runtime_attr.max_retries, default_attr.max_retries]) + } +} \ No newline at end of file diff --git a/wdl/DepthClustering.wdl b/wdl/DepthClustering.wdl index 6176c417b..20d94c5b7 100644 --- a/wdl/DepthClustering.wdl +++ b/wdl/DepthClustering.wdl @@ -105,7 +105,7 @@ workflow ClusterDepth { algorithm=clustering_algorithm, depth_sample_overlap=0, depth_interval_overlap=depth_interval_overlap, - depth_breakend_window=0, + depth_breakend_window=10000000, reference_fasta=reference_fasta, reference_fasta_fai=reference_fasta_fai, reference_dict=reference_dict, diff --git a/wdl/FormatVcfForGatk.wdl b/wdl/FormatVcfForGatk.wdl index e6461b841..703395a57 100644 --- a/wdl/FormatVcfForGatk.wdl +++ b/wdl/FormatVcfForGatk.wdl @@ -14,6 +14,9 @@ workflow FormatVcfForGatk { File contig_list File? contigs_header # Replaces vcf contig dictionary if provided String? formatter_args + # Variant ID lists for SR evidence flags. Adds as INFO field flags if provided + File? bothside_pass_list + File? background_fail_list String? chr_x String? 
chr_y @@ -61,6 +64,8 @@ workflow FormatVcfForGatk { args=formatter_args, output_prefix="~{prefix}.format.shard_~{i}", contigs_header=contigs_header, + bothside_pass_list=bothside_pass_list, + background_fail_list=background_fail_list, script=svtk_to_gatk_script, sv_pipeline_docker=sv_pipeline_docker, runtime_attr_override=runtime_attr_format @@ -89,6 +94,8 @@ task FormatVcf { input { File vcf File ploidy_table + File? bothside_pass_list + File? background_fail_list File? script String? args File? contigs_header # Overwrites contig dictionary, in case they are out of order @@ -119,6 +126,8 @@ task FormatVcf { --vcf ~{vcf} \ --out tmp.vcf.gz \ --ploidy-table ~{ploidy_table} \ + ~{"--bothside-pass-list " + bothside_pass_list} \ + ~{"--background-fail-list " + background_fail_list} \ ~{args} if ~{defined(contigs_header)}; then diff --git a/wdl/GenotypeBatch.wdl b/wdl/GenotypeBatch.wdl index 39855a81e..8f05dbec6 100644 --- a/wdl/GenotypeBatch.wdl +++ b/wdl/GenotypeBatch.wdl @@ -50,6 +50,8 @@ workflow GenotypeBatch { Int? sr_median_hom_ins Float? sr_hom_cutoff_multiplier + File? reformat_script + String sv_base_mini_docker String sv_pipeline_docker String linux_docker @@ -190,6 +192,7 @@ workflow GenotypeBatch { ref_dict = ref_dict, sr_hom_cutoff_multiplier = sr_hom_cutoff_multiplier, sr_median_hom_ins = sr_median_hom_ins, + reformat_script = reformat_script, sv_base_mini_docker = sv_base_mini_docker, sv_pipeline_docker = sv_pipeline_docker, linux_docker = linux_docker, @@ -248,6 +251,7 @@ workflow GenotypeBatch { coveragefile_index = coveragefile_index, n_per_split = n_per_split, ref_dict = ref_dict, + reformat_script = reformat_script, sv_base_mini_docker = sv_base_mini_docker, sv_pipeline_docker = sv_pipeline_docker, runtime_attr_split_variants = runtime_attr_split_variants, diff --git a/wdl/GenotypeBatchMetrics.wdl b/wdl/GenotypeBatchMetrics.wdl index 49f85cf87..7e216f417 100644 --- a/wdl/GenotypeBatchMetrics.wdl +++ b/wdl/GenotypeBatchMetrics.wdl @@ -48,7 +48,7 @@ workflow GenotypeBatchMetrics { baseline_vcf = baseline_genotyped_depth_vcf, samples = samples, prefix = "genotyped_depth", - types = "DEL,DUP", + types = "DEL,DUP,CNV", contig_list = contig_list, sv_pipeline_docker = sv_pipeline_docker, runtime_attr_override = runtime_attr_depth_metrics diff --git a/wdl/GenotypeDepthPart2.wdl b/wdl/GenotypeDepthPart2.wdl index 983eb0f8c..8415008ef 100644 --- a/wdl/GenotypeDepthPart2.wdl +++ b/wdl/GenotypeDepthPart2.wdl @@ -20,6 +20,8 @@ workflow GenotypeDepthPart2 { File coveragefile File? coveragefile_index + File? reformat_script + String sv_pipeline_docker String sv_base_mini_docker RuntimeAttr? runtime_attr_split_variants @@ -156,6 +158,7 @@ workflow GenotypeDepthPart2 { sv_base_mini_docker=sv_base_mini_docker, runtime_attr_override=runtime_attr_concat_vcfs } + output { File genotyped_vcf = ConcatVcfs.concat_vcf File genotyped_vcf_index = ConcatVcfs.concat_vcf_idx diff --git a/wdl/GenotypePESRPart2.wdl b/wdl/GenotypePESRPart2.wdl index 071312205..a3a652971 100644 --- a/wdl/GenotypePESRPart2.wdl +++ b/wdl/GenotypePESRPart2.wdl @@ -31,6 +31,8 @@ workflow GenotypePESRPart2 { Int? sr_median_hom_ins Float? sr_hom_cutoff_multiplier + File? 
reformat_script + String sv_pipeline_docker String sv_base_mini_docker String linux_docker @@ -661,4 +663,4 @@ task GenotypeSRPart2 { preemptible: select_first([runtime_attr.preemptible_tries, default_attr.preemptible_tries]) maxRetries: select_first([runtime_attr.max_retries, default_attr.max_retries]) } -} +} \ No newline at end of file diff --git a/wdl/HarmonizeHeaders.wdl b/wdl/HarmonizeHeaders.wdl deleted file mode 100644 index fe3746d96..000000000 --- a/wdl/HarmonizeHeaders.wdl +++ /dev/null @@ -1,79 +0,0 @@ -version 1.0 - -import "Structs.wdl" -import "TasksMakeCohortVcf.wdl" as MiniTasks - -# Reheader a list of vcfs with the header from another vcf - -workflow HarmonizeHeaders { - input { - File header_vcf # Vcf containing desired header - Array[File] vcfs # Vcfs to replace headers of - String prefix - String sv_base_mini_docker - RuntimeAttr? runtime_override_reheader - RuntimeAttr? runtime_override_pull_header - } - - call PullHeader { - input: - vcf=header_vcf, - prefix=prefix, - sv_base_mini_docker=sv_base_mini_docker, - runtime_attr_override=runtime_override_pull_header - } - - scatter (i in range(length(vcfs))) { - call MiniTasks.ReheaderVcf { - input: - vcf=vcfs[i], - vcf_index=vcfs[i] + ".tbi", - header=PullHeader.out, - prefix="~{prefix}.reheadered", - sv_base_mini_docker=sv_base_mini_docker, - runtime_attr_override=runtime_override_reheader - } - } - - output { - Array[File] out = ReheaderVcf.out - Array[File] out_index = ReheaderVcf.out_index - } -} - -task PullHeader { - input { - File vcf - String prefix - String sv_base_mini_docker - RuntimeAttr? runtime_attr_override - } - - RuntimeAttr runtime_default = object { - mem_gb: 2.0, - disk_gb: ceil(10.0 + size(vcf, "GiB") ), - cpu_cores: 1, - preemptible_tries: 3, - max_retries: 1, - boot_disk_gb: 10 - } - RuntimeAttr runtime_override = select_first([runtime_attr_override, runtime_default]) - runtime { - memory: "~{select_first([runtime_override.mem_gb, runtime_default.mem_gb])} GiB" - disks: "local-disk ~{select_first([runtime_override.disk_gb, runtime_default.disk_gb])} HDD" - cpu: select_first([runtime_override.cpu_cores, runtime_default.cpu_cores]) - preemptible: select_first([runtime_override.preemptible_tries, runtime_default.preemptible_tries]) - maxRetries: select_first([runtime_override.max_retries, runtime_default.max_retries]) - docker: sv_base_mini_docker - bootDiskSizeGb: select_first([runtime_override.boot_disk_gb, runtime_default.boot_disk_gb]) - } - - command <<< - set -euo pipefail - bcftools view --header-only ~{vcf} > ~{prefix}.header - >>> - - output { - File out = "~{prefix}.header" - } -} \ No newline at end of file diff --git a/wdl/MergePesrDepth.wdl b/wdl/MergePesrDepth.wdl deleted file mode 100644 index 65a2b139e..000000000 --- a/wdl/MergePesrDepth.wdl +++ /dev/null @@ -1,234 +0,0 @@ -version 1.0 - -import "Structs.wdl" -import "TasksMakeCohortVcf.wdl" as MiniTasks -import "HailMerge.wdl" as HailMerge -import "ShardedCluster.wdl" as ShardedCluster -import "Utils.wdl" as utils - -workflow MergePesrDepth { - input { - File subtyped_pesr_vcf - File subtyped_depth_vcf - Int num_samples - - String prefix - String cohort_name - String svtype - String contig - Float merging_shard_scale_factor = 30000000 - - Boolean use_hail = false - String? gcs_project - - String sv_pipeline_docker - String sv_base_mini_docker - - # overrides for local tasks - RuntimeAttr? runtime_override_shard_clusters - RuntimeAttr? runtime_override_shard_vids - RuntimeAttr? runtime_override_pull_vcf_shard - RuntimeAttr? 
runtime_override_merge_pesr_depth - - # overrides for MiniTasks - RuntimeAttr? runtime_override_sort_merged_vcf - RuntimeAttr? runtime_override_subset_small - RuntimeAttr? runtime_override_subset_large - RuntimeAttr? runtime_override_make_sites_only - RuntimeAttr? runtime_override_concat_large_pesr_depth - RuntimeAttr? runtime_override_concat_shards - - RuntimeAttr? runtime_override_preconcat_large_pesr_depth - RuntimeAttr? runtime_override_hail_merge_large_pesr_depth - RuntimeAttr? runtime_override_fix_header_large_pesr_depth - - RuntimeAttr? runtime_override_preconcat_pesr_depth_shards - RuntimeAttr? runtime_override_hail_merge_pesr_depth_shards - RuntimeAttr? runtime_override_fix_header_pesr_depth_shards - } - - # Pull out CNVs too small to cluster (less than reciprocal_overlap_fraction * min_depth_only_length) - call MiniTasks.FilterVcf as SubsetSmall { - input: - vcf=subtyped_pesr_vcf, - vcf_index=subtyped_pesr_vcf + ".tbi", - outfile_prefix="~{prefix}.subset_small", - records_filter='INFO/SVLEN<2500', - use_ssd=true, - sv_base_mini_docker=sv_base_mini_docker, - runtime_attr_override=runtime_override_subset_small - } - - call MiniTasks.FilterVcf as SubsetLarge { - input: - vcf=subtyped_pesr_vcf, - vcf_index=subtyped_pesr_vcf + ".tbi", - outfile_prefix="~{prefix}.subset_large", - records_filter='INFO/SVLEN>=2500', - use_ssd=true, - sv_base_mini_docker=sv_base_mini_docker, - runtime_attr_override=runtime_override_subset_large - } - - if (use_hail) { - call HailMerge.HailMerge as ConcatLargePesrDepthHail { - input: - vcfs=[SubsetLarge.filtered_vcf, subtyped_depth_vcf], - prefix="~{prefix}.large_pesr_depth", - gcs_project=gcs_project, - sv_base_mini_docker=sv_base_mini_docker, - sv_pipeline_docker=sv_pipeline_docker, - runtime_override_preconcat=runtime_override_preconcat_large_pesr_depth, - runtime_override_hail_merge=runtime_override_hail_merge_large_pesr_depth, - runtime_override_fix_header=runtime_override_fix_header_large_pesr_depth - } - } - if (!use_hail) { - call MiniTasks.ConcatVcfs as ConcatLargePesrDepth { - input: - vcfs=[SubsetLarge.filtered_vcf, subtyped_depth_vcf], - vcfs_idx=[SubsetLarge.filtered_vcf + ".tbi", subtyped_depth_vcf + ".tbi"], - allow_overlaps=true, - outfile_prefix="~{prefix}.large_pesr_depth", - sv_base_mini_docker=sv_base_mini_docker, - runtime_attr_override=runtime_override_concat_large_pesr_depth - } - } - - call MiniTasks.MakeSitesOnlyVcf { - input: - vcf=select_first([ConcatLargePesrDepth.concat_vcf, ConcatLargePesrDepthHail.merged_vcf]), - vcf_index=select_first([ConcatLargePesrDepth.concat_vcf_idx, ConcatLargePesrDepthHail.merged_vcf_index]), - prefix="~{prefix}.large_pesr_depth.sites_only", - sv_base_mini_docker=sv_base_mini_docker, - runtime_attr_override=runtime_override_make_sites_only - } - - # Fast cluster without sample overlap linkage for sharding - Int merge_shard_size = ceil(merging_shard_scale_factor / num_samples) - call ShardedCluster.ShardClusters { - input: - vcf=MakeSitesOnlyVcf.out, - prefix="~{prefix}.shard_clusters", - dist=1000000000, - frac=0.5, - svsize=0, - sv_types=[svtype], - sv_pipeline_docker=sv_pipeline_docker, - runtime_attr_override=runtime_override_shard_clusters - } - - call MiniTasks.ShardVidsForClustering { - input: - clustered_vcf=ShardClusters.out, - prefix=prefix, - records_per_shard=merge_shard_size, - sv_pipeline_docker=sv_pipeline_docker, - runtime_attr_override=runtime_override_shard_vids - } - - scatter (i in range(length(ShardVidsForClustering.out))) { - call MiniTasks.PullVcfShard { - input: - 
vcf=select_first([ConcatLargePesrDepth.concat_vcf, ConcatLargePesrDepthHail.merged_vcf]), - vids=ShardVidsForClustering.out[i], - prefix="~{prefix}.unclustered.shard_${i}", - sv_base_mini_docker=sv_base_mini_docker, - runtime_attr_override=runtime_override_pull_vcf_shard - } - call MergePesrDepthShard { - input: - vcf=PullVcfShard.out, - vcf_index=PullVcfShard.out_index, - prefix="~{prefix}.merge_pesr_depth.shard_~{i}", - vid_prefix="~{cohort_name}_~{contig}_mpd~{i}", - sv_pipeline_docker=sv_pipeline_docker, - runtime_attr_override=runtime_override_merge_pesr_depth - } - call MiniTasks.SortVcf { - input: - vcf = MergePesrDepthShard.out, - outfile_prefix = "~{prefix}.sorted.shard_${i}", - sv_base_mini_docker = sv_base_mini_docker, - runtime_attr_override = runtime_override_sort_merged_vcf - } - } - - if (use_hail) { - call HailMerge.HailMerge as ConcatShardsHail { - input: - vcfs=flatten([[SubsetSmall.filtered_vcf], SortVcf.out]), - prefix="~{prefix}.concat_shards", - gcs_project=gcs_project, - sv_base_mini_docker=sv_base_mini_docker, - sv_pipeline_docker=sv_pipeline_docker, - runtime_override_preconcat=runtime_override_preconcat_pesr_depth_shards, - runtime_override_hail_merge=runtime_override_hail_merge_pesr_depth_shards, - runtime_override_fix_header=runtime_override_fix_header_pesr_depth_shards - } - } - if (!use_hail) { - call MiniTasks.ConcatVcfs as ConcatShards { - input: - vcfs=flatten([[SubsetSmall.filtered_vcf], SortVcf.out]), - vcfs_idx=flatten([[SubsetSmall.filtered_vcf_idx], SortVcf.out_index]), - allow_overlaps=true, - outfile_prefix="~{prefix}.concat_shards", - sv_base_mini_docker=sv_base_mini_docker, - runtime_attr_override=runtime_override_concat_shards - } - } - - output { - File out = select_first([ConcatShards.concat_vcf, ConcatShardsHail.merged_vcf]) - File out_index = select_first([ConcatShards.concat_vcf_idx, ConcatShardsHail.merged_vcf_index]) - } -} - - -task MergePesrDepthShard { - input { - File vcf - File vcf_index - String prefix - String vid_prefix - String sv_pipeline_docker - RuntimeAttr? 
runtime_attr_override - } - - String output_file = prefix + ".vcf.gz" - - # when filtering/sorting/etc, memory usage will likely go up (much of the data will have to - # be held in memory or disk while working, potentially in a form that takes up more space) - Float input_size = size(vcf, "GiB") - RuntimeAttr runtime_default = object { - mem_gb: 2.0 + 0.6 * input_size, - disk_gb: ceil(10.0 + 6 * input_size), - cpu_cores: 1, - preemptible_tries: 3, - max_retries: 1, - boot_disk_gb: 10 - } - RuntimeAttr runtime_override = select_first([runtime_attr_override, runtime_default]) - runtime { - memory: "~{select_first([runtime_override.mem_gb, runtime_default.mem_gb])} GiB" - disks: "local-disk ~{select_first([runtime_override.disk_gb, runtime_default.disk_gb])} SSD" - cpu: select_first([runtime_override.cpu_cores, runtime_default.cpu_cores]) - preemptible: select_first([runtime_override.preemptible_tries, runtime_default.preemptible_tries]) - maxRetries: select_first([runtime_override.max_retries, runtime_default.max_retries]) - docker: sv_pipeline_docker - bootDiskSizeGb: select_first([runtime_override.boot_disk_gb, runtime_default.boot_disk_gb]) - } - - command <<< - set -euo pipefail - /opt/sv-pipeline/04_variant_resolution/scripts/merge_pesr_depth.py \ - --prefix ~{vid_prefix} \ - ~{vcf} \ - ~{output_file} - >>> - - output { - File out = output_file - } -} diff --git a/wdl/ResolveComplexVariants.wdl b/wdl/ResolveComplexVariants.wdl index f712537b6..92ed26476 100644 --- a/wdl/ResolveComplexVariants.wdl +++ b/wdl/ResolveComplexVariants.wdl @@ -318,6 +318,7 @@ task BreakpointOverlap { File bothside_pass_list File background_fail_list String prefix + File? script String sv_pipeline_docker RuntimeAttr? runtime_attr_override } @@ -346,7 +347,7 @@ task BreakpointOverlap { command <<< set -euo pipefail - python /opt/sv-pipeline/04_variant_resolution/scripts/overlap_breakpoint_filter.py \ + python ~{default="/opt/sv-pipeline/04_variant_resolution/scripts/overlap_breakpoint_filter.py" script} \ ~{vcf} \ ~{bothside_pass_list} \ ~{background_fail_list} \ diff --git a/wdl/TasksClusterBatch.wdl b/wdl/TasksClusterBatch.wdl index cab695ca9..099c54adb 100644 --- a/wdl/TasksClusterBatch.wdl +++ b/wdl/TasksClusterBatch.wdl @@ -11,6 +11,7 @@ task SVCluster { File ploidy_table String output_prefix + String? vcf_grep_expression String? contig Boolean? fast_mode @@ -44,6 +45,7 @@ task SVCluster { File reference_dict Float? java_mem_fraction + String? additional_args String? 
variant_prefix String gatk_docker @@ -71,7 +73,7 @@ task SVCluster { File out_index = "~{output_prefix}.vcf.gz.tbi" } command <<< - set -euo pipefail + set -euxo pipefail function getJavaMem() { # get JVM memory in MiB by getting total memory from /proc/meminfo @@ -97,8 +99,14 @@ task SVCluster { exit 1 fi + if ~{defined(vcf_grep_expression)}; then + fgrep ~{vcf_grep_expression} arguments.txt + else + cat arguments.txt + fi > arguments.grep.txt + gatk --java-options "-Xmx${JVM_MAX_MEM}" SVCluster \ - --arguments_file arguments.txt \ + --arguments_file arguments.grep.txt \ --output ~{output_prefix}.vcf.gz \ --ploidy-table ~{ploidy_table} \ --reference ~{reference_fasta} \ @@ -125,7 +133,8 @@ task SVCluster { ~{"--pesr-breakend-window " + pesr_breakend_window} \ ~{"--insertion-length-summary-strategy " + insertion_length_summary_strategy} \ ~{"--breakpoint-summary-strategy " + breakpoint_summary_strategy} \ - ~{"--alt-allele-summary-strategy " + alt_allele_summary_strategy} + ~{"--alt-allele-summary-strategy " + alt_allele_summary_strategy} \ + ~{additional_args} >>> runtime { cpu: select_first([runtime_attr.cpu_cores, default_attr.cpu_cores]) @@ -242,6 +251,7 @@ task GatkToSvtkVcf { File contig_list String? remove_infos String? remove_formats + Boolean set_pass = false String output_prefix String sv_pipeline_docker RuntimeAttr? runtime_attr_override @@ -269,7 +279,8 @@ task GatkToSvtkVcf { --source ~{source} \ --contigs ~{contig_list} \ ~{"--remove-infos " + remove_infos} \ - ~{"--remove-formats " + remove_formats} + ~{"--remove-formats " + remove_formats} \ + ~{if set_pass then "--set-pass" else ""} tabix ~{output_prefix}.vcf.gz >>> runtime { diff --git a/wdl/TasksGenotypeBatch.wdl b/wdl/TasksGenotypeBatch.wdl index aa1221b3e..151391489 100644 --- a/wdl/TasksGenotypeBatch.wdl +++ b/wdl/TasksGenotypeBatch.wdl @@ -608,3 +608,45 @@ task IntegrateDepthGq { maxRetries: select_first([runtime_attr.max_retries, default_attr.max_retries]) } } + +task ReformatGenotypedVcf { + input { + File vcf + File? script + String output_prefix + String sv_pipeline_docker + RuntimeAttr? 
runtime_attr_override + } + + RuntimeAttr default_attr = object { + cpu_cores: 1, + mem_gb: 3.75, + disk_gb: ceil(10 + size(vcf, "GB") * 2), + boot_disk_gb: 10, + preemptible_tries: 3, + max_retries: 1 + } + RuntimeAttr runtime_attr = select_first([runtime_attr_override, default_attr]) + + output { + File out = "~{output_prefix}.vcf.gz" + File out_index = "~{output_prefix}.vcf.gz.tbi" + } + command <<< + set -euo pipefail + python ~{default="/opt/sv-pipeline/04_variant_resolution/scripts/reformat_genotyped_vcf.py" script} --vcf ~{vcf} \ + | bgzip \ + > ~{output_prefix}.vcf.gz + tabix ~{output_prefix}.vcf.gz + + >>> + runtime { + cpu: select_first([runtime_attr.cpu_cores, default_attr.cpu_cores]) + memory: select_first([runtime_attr.mem_gb, default_attr.mem_gb]) + " GiB" + disks: "local-disk " + select_first([runtime_attr.disk_gb, default_attr.disk_gb]) + " HDD" + bootDiskSizeGb: select_first([runtime_attr.boot_disk_gb, default_attr.boot_disk_gb]) + docker: sv_pipeline_docker + preemptible: select_first([runtime_attr.preemptible_tries, default_attr.preemptible_tries]) + maxRetries: select_first([runtime_attr.max_retries, default_attr.max_retries]) + } +} \ No newline at end of file diff --git a/wdl/VcfClusterSingleChromsome.wdl b/wdl/VcfClusterSingleChromsome.wdl deleted file mode 100644 index 4476ce40a..000000000 --- a/wdl/VcfClusterSingleChromsome.wdl +++ /dev/null @@ -1,467 +0,0 @@ -version 1.0 - -# Author: Ryan Collins - -import "Structs.wdl" -import "TasksMakeCohortVcf.wdl" as MiniTasks -import "ClusterSingleChromosome.wdl" as VcfClusterTasks - -# Workflow to run parallelized vcf clustering for a single chromosome -workflow VcfClusterSingleChrom { - input { - Array[File] vcfs - Int num_samples - String prefix - String evidence_type - String cohort_name - Int dist - Float frac - Float sample_overlap - File? exclude_list - Array[String] batches - Int sv_size - Array[String] sv_types - String contig - Int localize_shard_size - Boolean subset_sr_lists - File bothside_pass - File background_fail - File empty_file - - Boolean use_hail - String? gcs_project - - String sv_pipeline_docker - String sv_base_mini_docker - - # overrides for local tasks - RuntimeAttr? runtime_override_localize_vcfs - RuntimeAttr? runtime_override_join_vcfs - RuntimeAttr? runtime_override_fix_multiallelic - RuntimeAttr? runtime_override_fix_ev_tags - - # overrides for MiniTasks - RuntimeAttr? runtime_override_subset_bothside_pass - RuntimeAttr? runtime_override_subset_background_fail - - # overrides for VcfClusterTasks - RuntimeAttr? runtime_override_shard_clusters - RuntimeAttr? runtime_override_shard_vids - RuntimeAttr? runtime_override_subset_sv_type - RuntimeAttr? runtime_override_shard_vcf_precluster - RuntimeAttr? runtime_override_pull_vcf_shard - RuntimeAttr? runtime_override_svtk_vcf_cluster - RuntimeAttr? runtime_override_get_vcf_header_with_members_info_line - RuntimeAttr? runtime_override_concat_vcf_cluster - RuntimeAttr? runtime_override_concat_svtypes - RuntimeAttr? runtime_override_concat_sharded_cluster - RuntimeAttr? runtime_override_make_sites_only - RuntimeAttr? runtime_override_sort_merged_vcf - - RuntimeAttr? runtime_override_preconcat_sharded_cluster - RuntimeAttr? runtime_override_hail_merge_sharded_cluster - RuntimeAttr? runtime_override_fix_header_sharded_cluster - } - - scatter (i in range(length(vcfs))) { - call LocalizeContigVcfs { - input: - vcf=vcfs[i], - vcf_index = vcfs[i] + ".tbi", - shard_size = localize_shard_size, - contig=contig, - prefix=prefix + "." + contig + "." 
+ batches[i], - sv_pipeline_docker=sv_pipeline_docker, - runtime_attr_override=runtime_override_localize_vcfs - } - } - Array[Array[File]] sharded_vcfs_ = transpose(LocalizeContigVcfs.out) - - scatter (i in range(length(sharded_vcfs_))) { - call JoinVcfs { - input: - vcfs=sharded_vcfs_[i], - contig=contig, - prefix=prefix + "." + i, - sv_pipeline_docker=sv_pipeline_docker, - runtime_attr_override=runtime_override_join_vcfs - } - call FixMultiallelicRecords { - input: - joined_vcf=JoinVcfs.out, - batch_contig_vcfs=sharded_vcfs_[i], - contig=contig, - prefix=prefix + "." + i, - sv_pipeline_docker=sv_pipeline_docker, - runtime_attr_override=runtime_override_fix_multiallelic - } - call FixEvidenceTags { - input: - vcf=FixMultiallelicRecords.out, - contig=contig, - prefix=prefix + "." + i, - sv_base_mini_docker=sv_base_mini_docker, - runtime_attr_override=runtime_override_fix_ev_tags - } - } - - call MiniTasks.ConcatVcfs { - input: - vcfs=FixEvidenceTags.out, - vcfs_idx=FixEvidenceTags.out_index, - naive=true, - outfile_prefix="~{prefix}.precluster_concat", - sv_base_mini_docker=sv_base_mini_docker, - runtime_attr_override=runtime_override_concat_vcf_cluster - } - - #Run vcfcluster per chromosome - call VcfClusterTasks.ClusterSingleChrom { - input: - vcf=ConcatVcfs.concat_vcf, - vcf_index=ConcatVcfs.concat_vcf_idx, - num_samples=num_samples, - contig=contig, - cohort_name=cohort_name, - evidence_type=evidence_type, - prefix=prefix, - dist=dist, - frac=frac, - sample_overlap=sample_overlap, - exclude_list=exclude_list, - sv_size=sv_size, - sv_types=sv_types, - empty_file=empty_file, - use_hail=use_hail, - gcs_project=gcs_project, - sv_pipeline_docker=sv_pipeline_docker, - sv_base_mini_docker=sv_base_mini_docker, - runtime_override_subset_sv_type=runtime_override_subset_sv_type, - runtime_override_shard_clusters=runtime_override_shard_clusters, - runtime_override_shard_vids=runtime_override_shard_vids, - runtime_override_pull_vcf_shard=runtime_override_pull_vcf_shard, - runtime_override_svtk_vcf_cluster=runtime_override_svtk_vcf_cluster, - runtime_override_get_vcf_header_with_members_info_line=runtime_override_get_vcf_header_with_members_info_line, - runtime_override_concat_svtypes=runtime_override_concat_svtypes, - runtime_override_concat_sharded_cluster=runtime_override_concat_sharded_cluster, - runtime_override_make_sites_only=runtime_override_make_sites_only, - runtime_override_sort_merged_vcf=runtime_override_sort_merged_vcf, - runtime_override_preconcat_sharded_cluster=runtime_override_preconcat_sharded_cluster, - runtime_override_hail_merge_sharded_cluster=runtime_override_hail_merge_sharded_cluster, - runtime_override_fix_header_sharded_cluster=runtime_override_fix_header_sharded_cluster - } - - if(subset_sr_lists) { - #Subset bothside_pass & background_fail to chromosome of interest - call SubsetVariantList as SubsetBothsidePass { - input: - vid_list=bothside_pass, - vid_col=2, - vcf=ConcatVcfs.concat_vcf, - outfile_name="~{prefix}.pass.VIDs.list", - sv_base_mini_docker=sv_base_mini_docker, - runtime_attr_override=runtime_override_subset_bothside_pass - } - call SubsetVariantList as SubsetBackgroundFail { - input: - vid_list=background_fail, - vid_col=1, - vcf=ConcatVcfs.concat_vcf, - outfile_name="~{prefix}.fail.VIDs.list", - sv_base_mini_docker=sv_base_mini_docker, - runtime_attr_override=runtime_override_subset_background_fail - } - } - - output { - Array[File] clustered_vcfs = ClusterSingleChrom.clustered_vcfs - Array[File] clustered_vcf_indexes = 
ClusterSingleChrom.clustered_vcf_indexes - File filtered_bothside_pass = select_first([SubsetBothsidePass.filtered_vid_list, empty_file]) - File filtered_background_fail = select_first([SubsetBackgroundFail.filtered_vid_list, empty_file]) - } -} - -task LocalizeContigVcfs { - input { - File vcf - File vcf_index - Int shard_size - String contig - String prefix - String sv_pipeline_docker - RuntimeAttr? runtime_attr_override - } - - RuntimeAttr runtime_default = object { - mem_gb: 3.75, - disk_gb: ceil(10 + size(vcf, "GiB") * 1.5), - cpu_cores: 1, - preemptible_tries: 1, - max_retries: 1, - boot_disk_gb: 10 - } - RuntimeAttr runtime_override = select_first([runtime_attr_override, runtime_default]) - - runtime { - memory: select_first([runtime_override.mem_gb, runtime_default.mem_gb]) + " GiB" - disks: "local-disk " + select_first([runtime_override.disk_gb, runtime_default.disk_gb]) + " HDD" - cpu: select_first([runtime_override.cpu_cores, runtime_default.cpu_cores]) - preemptible: select_first([runtime_override.preemptible_tries, runtime_default.preemptible_tries]) - maxRetries: select_first([runtime_override.max_retries, runtime_default.max_retries]) - docker: sv_pipeline_docker - bootDiskSizeGb: select_first([runtime_override.boot_disk_gb, runtime_default.boot_disk_gb]) - } - - command <<< - set -euxo pipefail - - # See Issue #52 "Use GATK to retrieve VCF records in JoinContigFromRemoteVcfs" - # https://github.com/broadinstitute/gatk-sv/issues/52 - - tabix -h "~{vcf}" "~{contig}" \ - | sed "s/AN=[0-9]*;//g" \ - | sed "s/AC=[0-9]*;//g" \ - | bgzip \ - > contig.vcf.gz - tabix contig.vcf.gz - - python3 <>> - - output { - Array[File] out = glob("~{prefix}.*.vcf.gz") - } -} - -# Merge contig vcfs across batches -task JoinVcfs { - input { - Array[File] vcfs - String contig - String prefix - String sv_pipeline_docker - RuntimeAttr? 
runtime_attr_override - } - - Float input_size = size(vcfs, "GiB") - Float input_size_ratio = 3.0 - Float base_disk_gb = 10.0 - RuntimeAttr runtime_default = object { - mem_gb: 1.0, - disk_gb: ceil(base_disk_gb + input_size * input_size_ratio), - cpu_cores: 1, - preemptible_tries: 1, - max_retries: 1, - boot_disk_gb: 10 - } - RuntimeAttr runtime_override = select_first([runtime_attr_override, runtime_default]) - - runtime { - memory: select_first([runtime_override.mem_gb, runtime_default.mem_gb]) + " GiB" - disks: "local-disk " + select_first([runtime_override.disk_gb, runtime_default.disk_gb]) + " HDD" - cpu: select_first([runtime_override.cpu_cores, runtime_default.cpu_cores]) - preemptible: select_first([runtime_override.preemptible_tries, runtime_default.preemptible_tries]) - maxRetries: select_first([runtime_override.max_retries, runtime_default.max_retries]) - docker: sv_pipeline_docker - bootDiskSizeGb: select_first([runtime_override.boot_disk_gb, runtime_default.boot_disk_gb]) - } - - command <<< - set -euo pipefail - - python3 < ~{prefix}.~{contig}.joined.vcf.gz - import sys - import gzip - - fl = open("~{write_lines(vcfs)}") - files = [gzip.open(f.strip(), 'rb') for f in fl.readlines()] - lines_zip = zip(*files) - - for linesb in lines_zip: - lines = [l.decode('utf-8') for l in linesb] - ex = lines[0] - if ex.startswith('##'): - sys.stdout.write(ex) - else: - sys.stdout.write(ex.strip()) - if len(lines) > 1: - sys.stdout.write('\t') - out_lines = [l.strip().split('\t', 9)[-1] for l in lines[1:]] - sys.stdout.write("\t".join(out_lines)) - sys.stdout.write('\n') - CODE - tabix ~{prefix}.~{contig}.joined.vcf.gz - >>> - - output { - File out = "~{prefix}.~{contig}.joined.vcf.gz" - File out_index = "~{prefix}.~{contig}.joined.vcf.gz.tbi" - } -} - -# Add in max CN state to multiallelics -task FixMultiallelicRecords { - input { - File joined_vcf - Array[File] batch_contig_vcfs - String contig - String prefix - String sv_pipeline_docker - RuntimeAttr? 
runtime_attr_override - } - - Float input_size = size(joined_vcf, "GiB") * 2 + size(batch_contig_vcfs, "GiB") - Float input_size_fraction = 2.0 - Float base_disk_gb = 10.0 - RuntimeAttr runtime_default = object { - mem_gb: 3.75, - disk_gb: ceil(base_disk_gb + input_size * input_size_fraction), - cpu_cores: 1, - preemptible_tries: 1, - max_retries: 1, - boot_disk_gb: 10 - } - RuntimeAttr runtime_override = select_first([runtime_attr_override, runtime_default]) - - runtime { - memory: select_first([runtime_override.mem_gb, runtime_default.mem_gb]) + " GiB" - disks: "local-disk " + select_first([runtime_override.disk_gb, runtime_default.disk_gb]) + " HDD" - cpu: select_first([runtime_override.cpu_cores, runtime_default.cpu_cores]) - preemptible: select_first([runtime_override.preemptible_tries, runtime_default.preemptible_tries]) - maxRetries: select_first([runtime_override.max_retries, runtime_default.max_retries]) - docker: sv_pipeline_docker - bootDiskSizeGb: select_first([runtime_override.boot_disk_gb, runtime_default.boot_disk_gb]) - } - - command <<< - set -euxo pipefail - /opt/sv-pipeline/04_variant_resolution/scripts/make_concordant_multiallelic_alts.py \ - ~{joined_vcf} \ - ~{write_lines(batch_contig_vcfs)} \ - ~{prefix}.~{contig}.fixed_multiallelics.vcf.gz - tabix ~{prefix}.~{contig}.fixed_multiallelics.vcf.gz - >>> - - output { - File out = "~{prefix}.~{contig}.fixed_multiallelics.vcf.gz" - File out_index = "~{prefix}.~{contig}.fixed_multiallelics.vcf.gz.tbi" - } -} - -# Convert EV field from String to Integer -task FixEvidenceTags { - input { - File vcf - String contig - String prefix - String sv_base_mini_docker - RuntimeAttr? runtime_attr_override - } - - Float input_size = size(vcf, "GiB") - Float input_size_ratio = 2.0 - Float base_disk_gb = 10.0 - RuntimeAttr runtime_default = object { - mem_gb: 3.75, - disk_gb: ceil(base_disk_gb + input_size * input_size_ratio), - cpu_cores: 1, - preemptible_tries: 1, - max_retries: 1, - boot_disk_gb: 10 - } - RuntimeAttr runtime_override = select_first([runtime_attr_override, runtime_default]) - - runtime { - memory: select_first([runtime_override.mem_gb, runtime_default.mem_gb]) + " GiB" - disks: "local-disk " + select_first([runtime_override.disk_gb, runtime_default.disk_gb]) + " HDD" - cpu: select_first([runtime_override.cpu_cores, runtime_default.cpu_cores]) - preemptible: select_first([runtime_override.preemptible_tries, runtime_default.preemptible_tries]) - maxRetries: select_first([runtime_override.max_retries, runtime_default.max_retries]) - docker: sv_base_mini_docker - bootDiskSizeGb: select_first([runtime_override.boot_disk_gb, runtime_default.boot_disk_gb]) - } - - command <<< - set -euxo pipefail - zcat ~{vcf} \ - | sed -e 's/:RD,PE,SR/:7/g' \ - | sed -e 's/:PE,SR/:6/g' \ - | sed -e 's/:RD,SR/:5/g' \ - | sed -e 's/:RD,PE/:3/g' \ - | sed -e 's/:PE\t/:2\t/g' -e 's/:SR\t/:4\t/g' -e 's/:RD\t/:1\t/g' \ - | sed -e 's/ID=EV,Number=.,Type=String/ID=EV,Number=1,Type=Integer/g' \ - | bgzip \ - > ~{prefix}.~{contig}.unclustered.vcf.gz - tabix ~{prefix}.~{contig}.unclustered.vcf.gz - >>> - - output { - File out = "~{prefix}.~{contig}.unclustered.vcf.gz" - File out_index = "~{prefix}.~{contig}.unclustered.vcf.gz.tbi" - } -} - -# Find intersection of Variant IDs from vid_list with those present in vcf, return as filtered_vid_list -task SubsetVariantList { - input { - File vid_list - Int vid_col - File vcf - String outfile_name - String sv_base_mini_docker - RuntimeAttr? 
runtime_attr_override - } - - # when filtering/sorting/etc, memory usage will likely go up (much of the data will have to - # be held in memory or disk while working, potentially in a form that takes up more space) - RuntimeAttr runtime_default = object { - mem_gb: 3.75, - disk_gb: ceil(10.0 + size(vid_list, "GB") * 2.0 + size(vcf, "GB")), - cpu_cores: 1, - preemptible_tries: 3, - max_retries: 1, - boot_disk_gb: 10 - } - RuntimeAttr runtime_override = select_first([runtime_attr_override, runtime_default]) - runtime { - memory: select_first([runtime_override.mem_gb, runtime_default.mem_gb]) + " GB" - disks: "local-disk " + select_first([runtime_override.disk_gb, runtime_default.disk_gb]) + " HDD" - cpu: select_first([runtime_override.cpu_cores, runtime_default.cpu_cores]) - preemptible: select_first([runtime_override.preemptible_tries, runtime_default.preemptible_tries]) - maxRetries: select_first([runtime_override.max_retries, runtime_default.max_retries]) - docker: sv_base_mini_docker - bootDiskSizeGb: select_first([runtime_override.boot_disk_gb, runtime_default.boot_disk_gb]) - } - - command <<< - set -euo pipefail - zgrep -v "^#" ~{vcf} | cut -f3 > valid_vids.list - awk -F'\t' -v OFS='\t' 'ARGIND==1{inFileA[$1]; next} {if ($~{vid_col} in inFileA) print }' valid_vids.list ~{vid_list} \ - > ~{outfile_name} - >>> - - output { - File filtered_vid_list = outfile_name - } -} \ No newline at end of file
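
For reference, the new ExtractSRVariantLists task replaces the legacy UpdateSrList bookkeeping by reading the HIGH_SR_BACKGROUND and BOTHSIDES_SUPPORT INFO flags directly out of the clustered VCF with bcftools and awk. The sketch below expresses the same extraction with pysam; it is illustrative only (the script name, function name, and command line are assumptions, not files or interfaces in this repo) and assumes both fields are Flag-type INFO annotations on the records, as the bcftools/awk pipeline above also assumes.

    #!/usr/bin/env python3
    # extract_sr_lists_sketch.py -- illustrative sketch, not a file in this repo.
    # Writes one variant ID per line for records carrying each SR evidence flag,
    # mirroring the bcftools query / awk pipeline in ExtractSRVariantLists.
    import sys
    import pysam

    def extract_sr_lists(vcf_path, output_prefix):
        high_path = f"{output_prefix}.high_sr_background.txt"
        both_path = f"{output_prefix}.bothsides_sr_support.txt"
        with pysam.VariantFile(vcf_path) as vcf, \
                open(high_path, "w") as high_sr, \
                open(both_path, "w") as bothsides:
            for record in vcf:
                # Flag-type INFO fields are present only when set on the record
                if record.info.get("HIGH_SR_BACKGROUND", False):
                    high_sr.write(record.id + "\n")
                if record.info.get("BOTHSIDES_SUPPORT", False):
                    bothsides.write(record.id + "\n")

    if __name__ == "__main__":
        # usage: python extract_sr_lists_sketch.py <vcf> <output_prefix>
        extract_sr_lists(sys.argv[1], sys.argv[2])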