VS-1422 Merge VAT changes in echo into ah_var_store (#8977)
* VS-1422. Update ah_var_store with VAT-related changes from EchoCallset
* Update GvsCreateVATfromVDS.wdl with changes from EchoCallset.
* Moved changes for hail_create_vat_inputs.py from EchoCallset into this branch.
* Update run_in_hail_cluster.py
* Rebuilt the Docker image with the changes merged in from EchoCallset.
gbggrant authored Oct 1, 2024
1 parent 4966cff commit 7590b7b
Showing 6 changed files with 139 additions and 60 deletions.
scripts/variantstore/wdl/GvsCreateVATfromVDS.wdl
@@ -11,7 +11,6 @@ workflow GvsCreateVATfromVDS {
String filter_set_name
File? sites_only_vcf
File? sites_only_vcf_index
String? hail_generate_sites_only_script_path
File? vds_path
String output_path

@@ -50,19 +49,17 @@ workflow GvsCreateVATfromVDS {
help: "name of the filter set used to generate the callset in GVS"
}
sites_only_vcf: {
help: "Optional sites-only VCF file. If defined, generation of a sites-only VCF from the VDS will be skipped. If defined, 'vds_path' and 'hail_generate_sites_only_script_path' must NOT be defined."
}
hail_generate_sites_only_script_path: {
help: "Optional hail_create_vat_inputs.py script in GCS that was created by the GvsExtractAvroFilesForHail WDL. If defined, 'vds_path' must also be defined and 'sites_only_vcf' must NOT be defined"
help: "Optional sites-only VCF file. If defined, generation of a sites-only VCF from the VDS will be skipped. If defined, then 'vds_path' must NOT be defined."
}
output_path: {
help: "GCS location (with a trailing '/') to put temporary and output files for the VAT pipeline"
}
vds_path: {
help: "Optional top-level directory of the GVS VDS to be used to create the VAT. If defined, 'hail_create_vat_inputs_script' must also be defined and 'sites_only_vcf' must NOT be defined"
help: "Optional top-level directory of the GVS VDS to be used to create the VAT. If defined, then 'sites_only_vcf' must NOT be defined"
}
}
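# Illustrative note (hypothetical paths): supply exactly one of
#   sites_only_vcf = "gs://my-bucket/my-callset.sites-only.vcf.bgz"
# or
#   vds_path = "gs://my-bucket/my-callset.vds"
# Supplying both (or neither) is caught by the TerminateWorkflow calls below.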


File interval_list = "gs://gcp-public-data--broad-references/hg38/v0/wgs_calling_regions.hg38.noCentromeres.noTelomeres.interval_list"
File reference = "gs://gcp-public-data--broad-references/hg38/v0/Homo_sapiens_assembly38.fasta"
File reference_dict = "gs://gcp-public-data--broad-references/hg38/v0/Homo_sapiens_assembly38.dict"
@@ -93,23 +90,23 @@ workflow GvsCreateVATfromVDS {
String output_path_without_a_trailing_slash = sub(output_path, "/$", "")
String effective_output_path = if (output_path == output_path_without_a_trailing_slash) then output_path + "/" else output_path
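# e.g. (illustrative) an output_path of "gs://bucket/vat" becomes "gs://bucket/vat/", while "gs://bucket/vat/" is passed through unchanged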

if ((defined(sites_only_vcf)) && (defined(vds_path) || defined(hail_generate_sites_only_script_path))) {
if ((defined(sites_only_vcf)) && (defined(vds_path))) {
call Utils.TerminateWorkflow as IfSitesOnlyVcfSetDontSetCreateParameters {
input:
message = "Error: If 'sites_only_vcf' is set as an input, you may not set 'vds_path' and 'hail_generate_sites_only_script_path'",
message = "Error: If 'sites_only_vcf' is set as an input, you may not set 'vds_path'",
basic_docker = effective_basic_docker,
}
}

if (!defined(sites_only_vcf) && ((!defined(vds_path) || !defined(hail_generate_sites_only_script_path)))) {
if (!defined(sites_only_vcf) && (!defined(vds_path))) {
call Utils.TerminateWorkflow as MustSetSitesOnlyVcfCreationParameters {
input:
message = "Error: If 'sites_only_vcf' is not set as an input, you MUST set 'vds_path' and 'hail_generate_sites_only_script_path'",
message = "Error: If 'sites_only_vcf' is not set as an input, you MUST set 'vds_path'",
basic_docker = effective_basic_docker,
}
}

if (defined(sites_only_vcf) || (defined(vds_path) && defined(hail_generate_sites_only_script_path))) {
if (defined(sites_only_vcf) || (defined(vds_path))) {
if (!defined(split_intervals_scatter_count)) {
call Utils.GetBQTableLastModifiedDatetime as SampleDateTime {
input:
@@ -146,7 +143,6 @@
workspace_project = effective_google_project,
hail_version = effective_hail_version,
hail_wheel = hail_wheel,
hail_generate_sites_only_script_path = select_first([hail_generate_sites_only_script_path]),
ancestry_file_path = MakeSubpopulationFilesAndReadSchemaFiles.ancestry_file_path,
workspace_bucket = GetToolVersions.workspace_bucket,
region = region,
@@ -315,7 +311,6 @@ task GenerateSitesOnlyVcf {
Boolean leave_cluster_running_at_end
String hail_version
File? hail_wheel
String hail_generate_sites_only_script_path
String ancestry_file_path
String? hail_temp_path
Int? cluster_max_idle_minutes
@@ -349,7 +344,7 @@ task GenerateSitesOnlyVcf {
cluster_name="~{prefix}-${hex}"
echo ${cluster_name} > cluster_name.txt

sites_only_vcf_filename="~{workspace_bucket}/~{prefix}-${hex}.sites-only.vcf"
sites_only_vcf_filename="~{workspace_bucket}/~{prefix}-${hex}.sites-only.vcf.bgz"
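# Note: Hail's export_vcf chooses compression from the output extension, so the .bgz suffix
# here produces a block-gzipped sites-only VCF rather than a plain-text one.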
echo ${sites_only_vcf_filename} > sites_only_vcf_filename.txt

if [[ -z "~{hail_temp_path}" ]]
@@ -369,13 +364,10 @@
}
FIN
# Run the hail python script to make a VDS
gsutil cp ~{hail_generate_sites_only_script_path} /app/
# Run the hail python script to make a sites-only VCF from a VDS
# - The autoscaling policy gvs-autoscaling-policy will exist already from the VDS creation
python3 /app/run_in_hail_cluster.py \
--script-path /app/~{basename(hail_generate_sites_only_script_path)} \
--script-path /app/hail_create_vat_inputs.py \
--secondary-script-path-list /app/create_vat_inputs.py \
--script-arguments-json-path script-arguments.json \
--account ${account_name} \
@@ -709,10 +701,10 @@ task AnnotateVCF {
runtime {
docker: variants_nirvana_docker
memory: "64 GB"
memory: "128 GB"
cpu: "4"
preemptible: 3
maxRetries: 2
preemptible: 1
maxRetries: 1
disks: "local-disk 2000 HDD"
}
@@ -758,8 +750,8 @@ task PrepVtAnnotationJson {
runtime {
docker: variants_docker
memory: "7 GB"
preemptible: 3
memory: "16 GB"
preemptible: 2
cpu: "1"
disks: "local-disk 500 HDD"
}
@@ -896,9 +888,12 @@ task BigQueryLoadJson {
echo "Dropping and recreating the vat table ~{dataset_name}.~{vat_table_name}"
bq --apilog=false rm -t -f --project_id=~{project_id} ~{dataset_name}.~{vat_table_name}
fi
bq --apilog=false mk --expiration=$DATE --project_id=~{project_id} ~{dataset_name}.~{vat_table_name} ~{nirvana_schema}
CLUSTERING_STRING="--clustering_fields=contig"
bq --apilog=false mk ${CLUSTERING_STRING} --expiration=$DATE --project_id=~{project_id} ~{dataset_name}.~{vat_table_name} ~{nirvana_schema}
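# Clustering on contig lets BigQuery prune storage blocks for contig-restricted queries against the VAT table (standard BigQuery clustering behavior, noted here for context).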
echo "Loading data into it"
# Now we run a giant query in BQ to get this all in the right table and join the genes properly
# Note the genes table join includes the group by to avoid the duplicates that get created from genes that span shards
# Commented out columns in the query are to be added in the next release
6 changes: 3 additions & 3 deletions scripts/variantstore/wdl/GvsUtils.wdl
@@ -72,7 +72,7 @@ task GetToolVersions {
# GVS generally uses the smallest `alpine` version of the Google Cloud SDK as it suffices for most tasks, but
# there are a handful of tasks that require the larger GNU libc-based `slim`.
String cloud_sdk_slim_docker = "gcr.io/google.com/cloudsdktool/cloud-sdk:435.0.0-slim"
String variants_docker = "us-central1-docker.pkg.dev/broad-dsde-methods/gvs/variants:2024-08-15-alpine-254df9be288d"
String variants_docker = "us-central1-docker.pkg.dev/broad-dsde-methods/gvs/variants:2024-09-30-alpine-27b8708f5808"
String variants_nirvana_docker = "us.gcr.io/broad-dsde-methods/variantstore:nirvana_2022_10_19"
String gatk_docker = "us-central1-docker.pkg.dev/broad-dsde-methods/gvs/gatk:2024_08_19-gatkbase-cd5b6b7821b2"
String real_time_genomics_docker = "docker.io/realtimegenomics/rtg-tools:latest"
@@ -1059,7 +1059,7 @@ task IndexVcf {
Int max_heap = memory_mb - 500
String local_file = basename(input_vcf)
Boolean is_compressed = sub(local_file, ".*\\.", "") == "gz"
Boolean is_compressed = (sub(local_file, ".*\\.", "") == "gz") || (sub(local_file, ".*\\.", "") == "bgz")
String index_extension = if is_compressed then ".tbi" else ".idx"
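# e.g. "sample.vcf.gz" or "sample.vcf.bgz" gets a tabix ".tbi" index, while an uncompressed "sample.vcf" gets an ".idx" (illustrative file names)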
command <<<
@@ -1354,7 +1354,7 @@ task CopyFile {
fi
gsutil cp ~{input_file} ${OUTPUT_GCS_DIR}/
echo ${OUTPUT_PATH} > output_file_path.txt
echo $OUTPUT_PATH > output_file_path.txt
>>>
output {
String output_file_path = read_string("output_file_path.txt")
2 changes: 1 addition & 1 deletion scripts/variantstore/wdl/extract/Dockerfile
@@ -8,7 +8,7 @@
# expensive to create and isn't expected to change often, while the steps in this Dockerfile are much less expensive and
# more likely to change. Using a build-base image essentially allows the expensive layers to be globally cached which
# should make building the final image much faster in most cases.
FROM us.gcr.io/broad-dsde-methods/variantstore:2024-07-01-alpine-build-base as build
FROM us.gcr.io/broad-dsde-methods/variantstore:2024-09-30-alpine-build-base as build

# Install all of our variantstore Python requirements.
COPY requirements.txt requirements.txt
2 changes: 1 addition & 1 deletion scripts/variantstore/wdl/extract/build_base.Dockerfile
@@ -36,7 +36,7 @@ RUN apk add autoconf bash cmake g++ gcc make ninja python3-dev git openssl-dev z

ARG ARROW_VERSION=16.1.0
RUN cd / && \
curl -O https://dlcdn.apache.org/arrow/arrow-$ARROW_VERSION/apache-arrow-$ARROW_VERSION.tar.gz && \
curl -O https://archive.apache.org/dist/arrow/arrow-$ARROW_VERSION/apache-arrow-$ARROW_VERSION.tar.gz && \
tar xfz apache-arrow-$ARROW_VERSION.tar.gz

# Pyarrow build instructions from https://arrow.apache.org/docs/developers/python.html#python-development
144 changes: 114 additions & 30 deletions scripts/variantstore/wdl/extract/hail_create_vat_inputs.py
@@ -1,5 +1,6 @@
import argparse
import hail as hl
from typing import Union
from datetime import datetime

import create_vat_inputs
@@ -63,29 +64,75 @@ def failing_gts_to_no_call(vds):

def remove_too_many_alt_allele_sites(vds):
"""
Remove sites with more than 50 alternate alleles (and print how many)
Remove sites with more than 50 alternate alleles
"""
vd = vds.variant_data
print(vd.aggregate_rows(hl.agg.count_where(hl.len(vd.alleles) > 50)))
vd_50_aa_cutoff = vd.filter_rows(hl.len(vd.alleles) <= 50)

return hl.vds.VariantDataset(vds.reference_data, vd_50_aa_cutoff)



def annotate_adj(
mt: hl.MatrixTable,
adj_gq: int = 30,
adj_ab: float = 0.2,
) -> hl.MatrixTable:
"""
Annotate genotypes with adj criteria (assumes diploid).
Defaults similar to gnomAD values, but GQ >= 20 changed to GQ >= 30 to make up for lack of DP filter.
"""
if "LGT" in mt.entry and "LAD" in mt.entry:
gt_expr = mt.LGT
ad_expr = mt.LAD
else:
assert "GT" in mt.entry and "AD" in mt.entry
gt_expr = mt.GT
ad_expr = mt.AD
return mt.annotate_entries(
adj=get_adj_expr(
gt_expr, mt.GQ, ad_expr, adj_gq, adj_ab
)
)
def get_adj_expr(
gt_expr: hl.expr.CallExpression,
gq_expr: Union[hl.expr.Int32Expression, hl.expr.Int64Expression],
ad_expr: hl.expr.ArrayNumericExpression,
adj_gq: int = 30,
adj_ab: float = 0.2,
) -> hl.expr.BooleanExpression:
"""
Get adj genotype annotation.
Defaults similar to gnomAD values, but GQ >= 20 changed to GQ >= 30 to make up for lack of DP filter.
"""
total_ad = hl.sum(ad_expr)
return (
(gq_expr >= adj_gq)
& (
hl.case()
.when(~gt_expr.is_het(), True)
.when(gt_expr.is_het_ref(), ad_expr[gt_expr[1]] / total_ad >= adj_ab)
.default(
(ad_expr[gt_expr[0]] / total_ad >= adj_ab)
& (ad_expr[gt_expr[1]] / total_ad >= adj_ab)
)
)
)
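# Worked example with hypothetical values: a 0/1 het call with GQ=45 and AD=[12, 10] has an alt
# allele balance of 10/22 ~= 0.45 >= 0.2, so adj is True; with AD=[20, 3] the alt allele balance
# is 3/23 ~= 0.13 < 0.2, so adj is False even though GQ passes the >= 30 threshold.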
def matrix_table_ac_an_af(mt, ancestry_file):
"""
Create a DENSE MATRIX TABLE to calculate AC, AN, AF and Sample Count
TODO: test sample_count
"""

sample_id_to_sub_population = create_vat_inputs.parse_ancestry_file(ancestry_file)

mt = mt.annotate_cols(pop=hl.literal(sample_id_to_sub_population)[mt.s])
mt = annotate_adj(mt)
ac_an_af_mt = mt.select_rows(
ac_an_af=hl.agg.call_stats(mt.GT, mt.alleles),
call_stats_by_pop=hl.agg.group_by(mt.pop, hl.agg.call_stats(mt.GT, mt.alleles)),
ac_an_af_adj=hl.agg.filter(mt.adj, hl.agg.call_stats(mt.GT, mt.alleles)),
call_stats_by_pop_adj=hl.agg.filter(mt.adj, hl.agg.group_by(mt.pop, hl.agg.call_stats(mt.GT, mt.alleles))),
)

return hl.methods.split_multi(ac_an_af_mt, left_aligned=True) # split each alternate allele onto its own row. This will also remove all spanning deletions for us


@@ -110,7 +157,7 @@ def write_sites_only_vcf(ac_an_af_split, sites_only_vcf_path):
pop_info_fields[f'Hom_{pop}'] = ac_an_af_split.call_stats_by_pop.get(pop).homozygote_count[ac_an_af_split.row.a_index]


ac_an_af_rows = ac_an_af_split.annotate_rows(
ac_an_af_rows = ac_an_af_split.annotate(
info = hl.struct(
AC=ac_an_af_split.row.ac_an_af.AC[ac_an_af_split.row.a_index],
AN=ac_an_af_split.row.ac_an_af.AN,
@@ -122,32 +169,66 @@

# note that SC = AC - homozygote_count
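# e.g. 4 het carriers and 3 hom-alt carriers give AC = 4 + 2*3 = 10, homozygote_count = 3, so SC = 10 - 3 = 7 carrier samples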

ht = ac_an_af_rows.rows()
ht = ht.filter(ht.alleles[1] != "*") # remove spanning deletions
ht = ac_an_af_rows.filter(ac_an_af_rows.alleles[1] != "*") # remove spanning deletions
# create a filtered sites only VCF
hl.export_vcf(ht, sites_only_vcf_path)


def add_variant_tracking_info(mt, sites_only_vcf_path):
# we only need the table of row fields; the select below leaves var_origin_id as the only field
var_ids_path = sites_only_vcf_path.replace(r".sites-only.vcf", ".var_ids.tsv.bgz")
t = mt.rows()
t.select(var_origin_id=hl.format('%s-%s-%s-%s', t.locus.contig, t.locus.position, t.alleles[0], t.alleles[1])).export(var_ids_path, parallel='separate_header')


def main(vds, ancestry_file_location, sites_only_vcf_path, dry_run_n_parts=None):
n_parts = vds.variant_data.n_partitions()
if dry_run_n_parts:
n_rounds = 1
parts_per_round = dry_run_n_parts
ht_paths = [sites_only_vcf_path.replace(r".sites-only.vcf.bgz", f'_dryrun.ht')]
sites_only_vcf_path = sites_only_vcf_path.replace(r".vcf.bgz", f'_dryrun.vcf.bgz')
else:
n_rounds = 5
# Add in 'n_rounds - 1' to include all of the partitions in the set of groups, otherwise we would omit the final
# n_parts % n_rounds partitions.
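# e.g. with 23 partitions and 5 rounds, parts_per_round = (23 + 4) // 5 = 5, so the rounds cover partitions 0-4, 5-9, 10-14, 15-19, and 20-22.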
parts_per_round = (n_parts + n_rounds - 1) // n_rounds
ht_paths = [sites_only_vcf_path.replace(r".sites-only.vcf.bgz", f'_{i}.ht') for i in range(n_rounds)]
for i in range(n_rounds):
part_range = range(i*parts_per_round, min((i+1)*parts_per_round, n_parts))
vds_part = hl.vds.VariantDataset(
vds.reference_data._filter_partitions(part_range),
vds.variant_data._filter_partitions(part_range),
)

transforms = [
remove_too_many_alt_allele_sites,
hard_filter_non_passing_sites,
failing_gts_to_no_call,
]
transformed_vds=vds_part
for transform in transforms:
transformed_vds = transform(transformed_vds)

print(f"densifying dense matrix table index {i}")
mt = hl.vds.to_dense_mt(transformed_vds)

def main(vds, ancestry_file_location, sites_only_vcf_path):
transforms = [
hard_filter_non_passing_sites,
failing_gts_to_no_call,
remove_too_many_alt_allele_sites
]
transformed_vds=vds
for transform in transforms:
transformed_vds = transform(transformed_vds)
with open(ancestry_file_location, 'r') as ancestry_file:
mt = matrix_table_ac_an_af(mt, ancestry_file) # this adds subpopulation information and splits our multi-allelic rows

mt = hl.vds.to_dense_mt(transformed_vds)
ht = mt.rows()
ht = ht.select('call_stats_by_pop', 'a_index', 'ac_an_af', 'ac_an_af_adj', 'call_stats_by_pop_adj')
ht.write(ht_paths[i])

with open(ancestry_file_location, 'r') as ancestry_file:
mt = matrix_table_ac_an_af(mt, ancestry_file) # this adds subpopulation information and splits our multi-allelic rows
# potentially in the future: merge AC, AN, AF back to the original VDS with: vds = vds_ac_an_af(mt, vds)

# potentially in the future: merge AC, AN, AF back to the original VDS with: vds = vds_ac_an_af(mt, vds)
# for debugging information -- remove for now to get us through Echo
# add_variant_tracking_info(mt, sites_only_vcf_path)

# create a sites only VCF (that is hard filtered!) and that can be made into a custom annotations TSV for Nirvana to use with AC, AN, AF, SC for all subpopulations and populations
write_sites_only_vcf(mt, sites_only_vcf_path)
ht_list = [hl.read_table(ht_path).naive_coalesce(5000) for ht_path in ht_paths] # repartition each table to 5k partitions before we union them
ht_all = ht_list[0].union(*ht_list[1:])
write_sites_only_vcf(ht_all, sites_only_vcf_path)


def annotate_entry_filter_flag(mt):
@@ -173,18 +254,21 @@ def write_tie_out_vcf(vds, vcf_output_path):
if __name__ == '__main__':
parser = argparse.ArgumentParser(allow_abbrev=False, description='Create VAT inputs TSV')
parser.add_argument('--ancestry_input_path', type=str, help='Input ancestry file path', required=True)
parser.add_argument('--vds_input_path', type=str, help='Input VDS path', default="@VDS_INPUT_PATH@")
parser.add_argument('--sites_only_output_path', type=str, help='Output sites-only VCF path',
default="@SITES_ONLY_VCF_OUTPUT_PATH@"),
parser.add_argument('--vds_input_path', type=str, help='Input VDS path')
parser.add_argument('--sites_only_output_path', type=str, help='Output sites-only VCF path'),
parser.add_argument('--temp_path', type=str, help='Path to temporary directory', required=True)

args = parser.parse_args()

temp_path = args.temp_path if not args.temp_path.endswith('/') else args.temp_path[:-1]
time_stamp = datetime.today().strftime('%Y-%m-%d_%H-%M-%S')
hl.init(tmp_dir=f'{temp_path}/hail_tmp_create_vat_inputs_{time_stamp}')
try:
temp_path = args.temp_path if not args.temp_path.endswith('/') else args.temp_path[:-1]
time_stamp = datetime.today().strftime('%Y-%m-%d_%H-%M-%S')
hl.init(tmp_dir=f'{temp_path}/hail_tmp_create_vat_inputs_{time_stamp}')

vds = hl.vds.read_vds(args.vds_input_path)
local_ancestry_file = create_vat_inputs.download_ancestry_file(args.ancestry_input_path)
vds = hl.vds.read_vds(args.vds_input_path)
local_ancestry_file = create_vat_inputs.download_ancestry_file(args.ancestry_input_path)

main(vds, local_ancestry_file, args.sites_only_output_path)
main(vds, local_ancestry_file, args.sites_only_output_path)
except Exception:
hl.copy_log(args.sites_only_output_path.replace(r".sites-only.vcf", ".log"))
raise