Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ReshardVcf workflow #613

Merged
merged 2 commits into from
Nov 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions inputs/templates/test/ReshardVcf/ReshardVcf.json.tmpl
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"ReshardVcf.vcfs": {{ test_batch.complex_resolve_vcfs | tojson }},
"ReshardVcf.contig_list": {{ reference_resources.primary_contigs_list | tojson }},
"ReshardVcf.prefix": {{ test_batch.name | tojson }},
"ReshardVcf.sv_base_mini_docker": {{ dockers.sv_base_mini_docker | tojson }}
}
87 changes: 87 additions & 0 deletions wdl/ReshardVcf.wdl
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
version 1.0

import "Structs.wdl"
import "TasksMakeCohortVcf.wdl" as MiniTasks

# Consumes a contig-sharded vcf where some shards may contain a small proportion of records from other
# contigs and moves them to their correct shards.

workflow ReshardVcf {
input {
Array[File] vcfs # Must be sorted and indexed
File contig_list
String prefix
Boolean? use_ssd
String sv_base_mini_docker
RuntimeAttr? runtime_override_reshard
}

Array[String] contigs = read_lines(contig_list)

scatter (i in range(length(vcfs))) {
File vcf_indexes = vcfs[i] + ".tbi"
}
Copy link
Member

@VJalili VJalili Oct 24, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be useful to add an optional input Array[File]? vcf_indexes and fall back to constructing the indexes based on the input VCFs if the optional input was not provided? I think it can be helpful to hint to a user if indexes are needed and provide them with an option of explicitly providing them (e.g., if they are on a different path).

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As a convention we always require input VCFs to have indexes for top-level workflows. These are already generated by the pipeline so shouldn't cause any problems as long as the VCFs were not manually moved without their indexes.


scatter (contig in contigs) {
call ReshardContig {
input:
vcfs=vcfs,
vcf_indexes=vcf_indexes,
contig=contig,
prefix="~{prefix}.~{contig}.resharded",
use_ssd=use_ssd,
sv_base_mini_docker=sv_base_mini_docker,
runtime_attr_override=runtime_override_reshard
}
}
output {
Array[File] resharded_vcfs = ReshardContig.out
Array[File] resharded_vcf_indexes = ReshardContig.out_index
}
}

task ReshardContig {
input {
Array[File] vcfs
Array[File] vcf_indexes
String contig
String prefix
String sv_base_mini_docker
Boolean use_ssd = false
RuntimeAttr? runtime_attr_override
}

String disk_type = if use_ssd then "SSD" else "HDD"

RuntimeAttr runtime_default = object {
mem_gb: 8,
disk_gb: ceil(10 + size(vcfs, "GB") * 2),
cpu_cores: 4,
preemptible_tries: 3,
max_retries: 1,
boot_disk_gb: 10
}
RuntimeAttr runtime_attr = select_first([runtime_attr_override, runtime_default])
Int n_cpu = select_first([runtime_attr.cpu_cores, runtime_default.cpu_cores])
String threads_arg = if n_cpu > 1 then "--threads " + n_cpu else ""
runtime {
memory: select_first([runtime_attr.mem_gb, runtime_default.mem_gb]) + " GB"
disks: "local-disk " + select_first([runtime_attr.disk_gb, runtime_default.disk_gb]) + " " + disk_type
cpu: n_cpu
preemptible: select_first([runtime_attr.preemptible_tries, runtime_default.preemptible_tries])
maxRetries: select_first([runtime_attr.max_retries, runtime_default.max_retries])
docker: sv_base_mini_docker
bootDiskSizeGb: select_first([runtime_attr.boot_disk_gb, runtime_default.boot_disk_gb])
}

command <<<
set -euo pipefail
bcftools concat ~{threads_arg} --allow-overlaps --regions "~{contig}" -Oz -o ~{prefix}.vcf.gz ~{sep=" " vcfs}
tabix ~{prefix}.vcf.gz
>>>

output {
File out = "~{prefix}.vcf.gz"
File out_index = "~{prefix}.vcf.gz.tbi"
}
}