VS-1422 Merge VAT changes in echo into ah_var_store #8977

Merged 17 commits into ah_var_store on Oct 1, 2024

Changes from 14 commits
43 changes: 19 additions & 24 deletions scripts/variantstore/wdl/GvsCreateVATfromVDS.wdl
@@ -11,7 +11,6 @@ workflow GvsCreateVATfromVDS {
String filter_set_name
File? sites_only_vcf
File? sites_only_vcf_index
String? hail_generate_sites_only_script_path
File? vds_path
String output_path

@@ -50,19 +49,17 @@ workflow GvsCreateVATfromVDS {
help: "name of the filter set used to generate the callset in GVS"
}
sites_only_vcf: {
help: "Optional sites-only VCF file. If defined, generation of a sites-only VCF from the VDS will be skipped. If defined, 'vds_path' and 'hail_generate_sites_only_script_path' must NOT be defined."
}
hail_generate_sites_only_script_path: {
help: "Optional hail_create_vat_inputs.py script in GCS that was created by the GvsExtractAvroFilesForHail WDL. If defined, 'vds_path' must also be defined and 'sites_only_vcf' must NOT be defined"
help: "Optional sites-only VCF file. If defined, generation of a sites-only VCF from the VDS will be skipped. If defined, then 'vds_path' must NOT be defined."
}
output_path: {
help: "GCS location (with a trailing '/') to put temporary and output files for the VAT pipeline"
}
vds_path: {
help: "Optional top-level directory of the GVS VDS to be used to create the VAT. If defined, 'hail_create_vat_inputs_script' must also be defined and 'sites_only_vcf' must NOT be defined"
help: "Optional top-level directory of the GVS VDS to be used to create the VAT. If defined, then 'sites_only_vcf' must NOT be defined"
}
}


File interval_list = "gs://gcp-public-data--broad-references/hg38/v0/wgs_calling_regions.hg38.noCentromeres.noTelomeres.interval_list"
File reference = "gs://gcp-public-data--broad-references/hg38/v0/Homo_sapiens_assembly38.fasta"
File reference_dict = "gs://gcp-public-data--broad-references/hg38/v0/Homo_sapiens_assembly38.dict"
@@ -93,23 +90,23 @@ workflow GvsCreateVATfromVDS {
String output_path_without_a_trailing_slash = sub(output_path, "/$", "")
String effective_output_path = if (output_path == output_path_without_a_trailing_slash) then output_path + "/" else output_path

if ((defined(sites_only_vcf)) && (defined(vds_path) || defined(hail_generate_sites_only_script_path))) {
if ((defined(sites_only_vcf)) && (defined(vds_path))) {
call Utils.TerminateWorkflow as IfSitesOnlyVcfSetDontSetCreateParameters {
input:
message = "Error: If 'sites_only_vcf' is set as an input, you may not set 'vds_path' and 'hail_generate_sites_only_script_path'",
message = "Error: If 'sites_only_vcf' is set as an input, you may not set 'vds_path'",
basic_docker = effective_basic_docker,
}
}

if (!defined(sites_only_vcf) && ((!defined(vds_path) || !defined(hail_generate_sites_only_script_path)))) {
if (!defined(sites_only_vcf) && (!defined(vds_path))) {
call Utils.TerminateWorkflow as MustSetSitesOnlyVcfCreationParameters {
input:
message = "Error: If 'sites_only_vcf' is not set as an input, you MUST set 'vds_path' and 'hail_generate_sites_only_script_path'",
message = "Error: If 'sites_only_vcf' is not set as an input, you MUST set 'vds_path'",
basic_docker = effective_basic_docker,
}
}
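
The two guard blocks above make 'sites_only_vcf' and 'vds_path' mutually exclusive: exactly one of them must be provided. A minimal Python sketch of the same rule, for illustration only (not part of the WDL):

    from typing import Optional

    def validate_vat_inputs(sites_only_vcf: Optional[str], vds_path: Optional[str]) -> None:
        # Mirrors IfSitesOnlyVcfSetDontSetCreateParameters and MustSetSitesOnlyVcfCreationParameters
        if sites_only_vcf is not None and vds_path is not None:
            raise ValueError("If 'sites_only_vcf' is set, you may not set 'vds_path'")
        if sites_only_vcf is None and vds_path is None:
            raise ValueError("If 'sites_only_vcf' is not set, you MUST set 'vds_path'")
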

if (defined(sites_only_vcf) || (defined(vds_path) && defined(hail_generate_sites_only_script_path))) {
if (defined(sites_only_vcf) || (defined(vds_path))) {
if (!defined(split_intervals_scatter_count)) {
call Utils.GetBQTableLastModifiedDatetime as SampleDateTime {
input:
@@ -146,7 +143,6 @@
workspace_project = effective_google_project,
hail_version = effective_hail_version,
hail_wheel = hail_wheel,
hail_generate_sites_only_script_path = select_first([hail_generate_sites_only_script_path]),
ancestry_file_path = MakeSubpopulationFilesAndReadSchemaFiles.ancestry_file_path,
workspace_bucket = GetToolVersions.workspace_bucket,
region = region,
@@ -315,7 +311,6 @@ task GenerateSitesOnlyVcf {
Boolean leave_cluster_running_at_end
String hail_version
File? hail_wheel
String hail_generate_sites_only_script_path
String ancestry_file_path
String? hail_temp_path
Int? cluster_max_idle_minutes
@@ -349,7 +344,7 @@ task GenerateSitesOnlyVcf {
cluster_name="~{prefix}-${hex}"
echo ${cluster_name} > cluster_name.txt

sites_only_vcf_filename="~{workspace_bucket}/~{prefix}-${hex}.sites-only.vcf"
sites_only_vcf_filename="~{workspace_bucket}/~{prefix}-${hex}.sites-only.vcf.bgz"
echo ${sites_only_vcf_filename} > sites_only_vcf_filename.txt

if [[ -z "~{hail_temp_path}" ]]
@@ -369,13 +364,10 @@
}
FIN

# Run the hail python script to make a VDS
gsutil cp ~{hail_generate_sites_only_script_path} /app/

# Run the hail python script to make a sites-only VCF from a VDS
# - The autoscaling policy gvs-autoscaling-policy will exist already from the VDS creation
python3 /app/run_in_hail_cluster.py \
--script-path /app/~{basename(hail_generate_sites_only_script_path)} \
--script-path /app/hail_create_vat_inputs.py \
--secondary-script-path-list /app/create_vat_inputs.py \
--script-arguments-json-path script-arguments.json \
--account ${account_name} \
@@ -709,10 +701,10 @@ task AnnotateVCF {

runtime {
docker: variants_nirvana_docker
memory: "64 GB"
memory: "128 GB"
cpu: "4"
preemptible: 3
maxRetries: 2
preemptible: 1
maxRetries: 1
disks: "local-disk 2000 HDD"
}

@@ -758,8 +750,8 @@ task PrepVtAnnotationJson {

runtime {
docker: variants_docker
memory: "7 GB"
preemptible: 3
memory: "16 GB"
preemptible: 2
cpu: "1"
disks: "local-disk 500 HDD"
}
@@ -896,9 +888,12 @@ task BigQueryLoadJson {
echo "Dropping and recreating the vat table ~{dataset_name}.~{vat_table_name}"
bq --apilog=false rm -t -f --project_id=~{project_id} ~{dataset_name}.~{vat_table_name}
fi
bq --apilog=false mk --expiration=$DATE --project_id=~{project_id} ~{dataset_name}.~{vat_table_name} ~{nirvana_schema}

CLUSTERING_STRING="--clustering_fields=contig"
bq --apilog=false mk ${CLUSTERING_STRING} --expiration=$DATE --project_id=~{project_id} ~{dataset_name}.~{vat_table_name} ~{nirvana_schema}
echo "Loading data into it"


# Now we run a giant query in BQ to get this all in the right table and join the genes properly
# Note the genes table join includes the group by to avoid the duplicates that get created from genes that span shards
# Commented out columns in the query are to be added in the next release
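
For readers more familiar with the Python client than the bq CLI, a rough google-cloud-bigquery equivalent of the clustered table creation above; the project, dataset, table name, schema, and expiration below are placeholders (the WDL supplies the real schema via ~{nirvana_schema}) and the call assumes working BigQuery credentials:

    from datetime import datetime, timedelta, timezone
    from google.cloud import bigquery

    client = bigquery.Client(project="my-project")                      # placeholder project
    table = bigquery.Table("my-project.my_dataset.vat_table",
                           schema=[bigquery.SchemaField("contig", "STRING")])  # real schema comes from ~{nirvana_schema}
    table.clustering_fields = ["contig"]                                 # same effect as --clustering_fields=contig
    table.expires = datetime.now(timezone.utc) + timedelta(days=14)      # analogous to --expiration=$DATE
    client.create_table(table)
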
6 changes: 3 additions & 3 deletions scripts/variantstore/wdl/GvsUtils.wdl
@@ -72,7 +72,7 @@ task GetToolVersions {
# GVS generally uses the smallest `alpine` version of the Google Cloud SDK as it suffices for most tasks, but
# there are a handful of tasks that require the larger GNU libc-based `slim`.
String cloud_sdk_slim_docker = "gcr.io/google.com/cloudsdktool/cloud-sdk:435.0.0-slim"
String variants_docker = "us-central1-docker.pkg.dev/broad-dsde-methods/gvs/variants:2024-08-15-alpine-254df9be288d"
String variants_docker = "us-central1-docker.pkg.dev/broad-dsde-methods/gvs/variants:2024-09-11-alpine-72765c88f07b"
String variants_nirvana_docker = "us.gcr.io/broad-dsde-methods/variantstore:nirvana_2022_10_19"
String gatk_docker = "us-central1-docker.pkg.dev/broad-dsde-methods/gvs/gatk:2024_08_19-gatkbase-cd5b6b7821b2"
String real_time_genomics_docker = "docker.io/realtimegenomics/rtg-tools:latest"
@@ -1059,7 +1059,7 @@ task IndexVcf {
Int max_heap = memory_mb - 500

String local_file = basename(input_vcf)
Boolean is_compressed = sub(local_file, ".*\\.", "") == "gz"
Boolean is_compressed = (sub(local_file, ".*\\.", "") == "gz") || (sub(local_file, ".*\\.", "") == "bgz")
String index_extension = if is_compressed then ".tbi" else ".idx"

command <<<
@@ -1354,7 +1354,7 @@ task CopyFile {
fi

gsutil cp ~{input_file} ${OUTPUT_GCS_DIR}/
echo ${OUTPUT_PATH} > output_file_path.txt
echo $OUTPUT_PATH > output_file_path.txt
>>>
output {
String output_file_path = read_string("output_file_path.txt")
144 changes: 114 additions & 30 deletions scripts/variantstore/wdl/extract/hail_create_vat_inputs.py
@@ -1,5 +1,6 @@
import argparse
import hail as hl
from typing import Union
from datetime import datetime

import create_vat_inputs
@@ -63,29 +64,75 @@ def failing_gts_to_no_call(vds):

def remove_too_many_alt_allele_sites(vds):
"""
Remove sites with more than 50 alternate alleles (and print how many)
Remove sites with more than 50 alternate alleles
"""
vd = vds.variant_data
print(vd.aggregate_rows(hl.agg.count_where(hl.len(vd.alleles) > 50)))
vd_50_aa_cutoff = vd.filter_rows(hl.len(vd.alleles) <= 50)

return hl.vds.VariantDataset(vds.reference_data, vd_50_aa_cutoff)



def annotate_adj(
mt: hl.MatrixTable,
adj_gq: int = 30,
adj_ab: float = 0.2,
) -> hl.MatrixTable:
"""
Annotate genotypes with adj criteria (assumes diploid).
Defaults similar to gnomAD values, but GQ >= 20 changed to GQ >= 30 to make up for lack of DP filter.
"""
if "LGT" in mt.entry and "LAD" in mt.entry:
gt_expr = mt.LGT
ad_expr = mt.LAD
else:
assert "GT" in mt.entry and "AD" in mt.entry
gt_expr = mt.GT
ad_expr = mt.AD
return mt.annotate_entries(
adj=get_adj_expr(
gt_expr, mt.GQ, ad_expr, adj_gq, adj_ab
)
)
def get_adj_expr(
gt_expr: hl.expr.CallExpression,
gq_expr: Union[hl.expr.Int32Expression, hl.expr.Int64Expression],
ad_expr: hl.expr.ArrayNumericExpression,
adj_gq: int = 30,
adj_ab: float = 0.2,
) -> hl.expr.BooleanExpression:
"""
Get adj genotype annotation.
Defaults similar to gnomAD values, but GQ >= 20 changed to GQ >= 30 to make up for lack of DP filter.
"""
total_ad = hl.sum(ad_expr)
return (
(gq_expr >= adj_gq)
& (
hl.case()
.when(~gt_expr.is_het(), True)
.when(gt_expr.is_het_ref(), ad_expr[gt_expr[1]] / total_ad >= adj_ab)
.default(
(ad_expr[gt_expr[0]] / total_ad >= adj_ab)
& (ad_expr[gt_expr[1]] / total_ad >= adj_ab)
)
)
)
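
A small sanity check of the adj logic above, with get_adj_expr as defined here and hypothetical genotype values (assumes a working local Hail installation; not part of this diff):

    import hail as hl

    # Hypothetical het-ref genotype: GT=0/1, GQ=35, AD=[6, 4]
    gt = hl.call(0, 1)
    gq = hl.int32(35)
    ad = hl.array([6, 4])

    # GQ 35 >= 30 and alt allele balance 4/10 = 0.4 >= 0.2, so this genotype is adj
    print(hl.eval(get_adj_expr(gt, gq, ad)))  # True
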
def matrix_table_ac_an_af(mt, ancestry_file):
"""
Create a DENSE MATRIX TABLE to calculate AC, AN, AF and Sample Count
TODO: test sample_count
"""

sample_id_to_sub_population = create_vat_inputs.parse_ancestry_file(ancestry_file)

mt = mt.annotate_cols(pop=hl.literal(sample_id_to_sub_population)[mt.s])
mt = annotate_adj(mt)
ac_an_af_mt = mt.select_rows(
ac_an_af=hl.agg.call_stats(mt.GT, mt.alleles),
call_stats_by_pop=hl.agg.group_by(mt.pop, hl.agg.call_stats(mt.GT, mt.alleles)),
ac_an_af_adj=hl.agg.filter(mt.adj, hl.agg.call_stats(mt.GT, mt.alleles)),
call_stats_by_pop_adj=hl.agg.filter(mt.adj, hl.agg.group_by(mt.pop, hl.agg.call_stats(mt.GT, mt.alleles))),
)

return hl.methods.split_multi(ac_an_af_mt, left_aligned=True) # split each alternate allele onto its own row. This will also remove all spanning deletions for us


@@ -110,7 +157,7 @@ def write_sites_only_vcf(ac_an_af_split, sites_only_vcf_path):
pop_info_fields[f'Hom_{pop}'] = ac_an_af_split.call_stats_by_pop.get(pop).homozygote_count[ac_an_af_split.row.a_index]


ac_an_af_rows = ac_an_af_split.annotate_rows(
ac_an_af_rows = ac_an_af_split.annotate(
info = hl.struct(
AC=ac_an_af_split.row.ac_an_af.AC[ac_an_af_split.row.a_index],
AN=ac_an_af_split.row.ac_an_af.AN,
@@ -122,32 +169,66 @@

# note that SC = AC - homozygote_count

ht = ac_an_af_rows.rows()
ht = ht.filter(ht.alleles[1] != "*") # remove spanning deletions
ht = ac_an_af_rows.filter(ac_an_af_rows.alleles[1] != "*") # remove spanning deletions
# create a filtered sites only VCF
hl.export_vcf(ht, sites_only_vcf_path)
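
A quick worked check of the "SC = AC - homozygote_count" note above, using made-up counts:

    # 4 heterozygous carriers and 3 homozygous-alt carriers (diploid)
    het, hom = 4, 3
    AC = het + 2 * hom   # 10 alternate alleles
    SC = AC - hom        # 7 carrier samples, i.e. het + hom
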


def add_variant_tracking_info(mt, sites_only_vcf_path):
# only need the table of row fields and leaves this as the only field
var_ids_path = sites_only_vcf_path.replace(r".sites-only.vcf", ".var_ids.tsv.bgz")
t = mt.rows()
t.select(var_origin_id=hl.format('%s-%s-%s-%s', t.locus.contig, t.locus.position, t.alleles[0], t.alleles[1])).export(var_ids_path, parallel='separate_header')


def main(vds, ancestry_file_location, sites_only_vcf_path, dry_run_n_parts=None):
n_parts = vds.variant_data.n_partitions()
if dry_run_n_parts:
n_rounds = 1
parts_per_round = dry_run_n_parts
ht_paths = [sites_only_vcf_path.replace(r".sites-only.vcf.bgz", f'_dryrun.ht')]
sites_only_vcf_path = sites_only_vcf_path.replace(r".vcf.bgz", f'_dryrun.vcf.bgz')
else:
n_rounds = 5
# Add in 'n_rounds - 1' to include all of the partitions in the set of groups, otherwise we would omit the final
# n_parts % n_rounds partitions.
parts_per_round = (n_parts + n_rounds - 1) // n_rounds
ht_paths = [sites_only_vcf_path.replace(r".sites-only.vcf.bgz", f'_{i}.ht') for i in range(n_rounds)]
for i in range(n_rounds):
part_range = range(i*parts_per_round, min((i+1)*parts_per_round, n_parts))
vds_part = hl.vds.VariantDataset(
vds.reference_data._filter_partitions(part_range),
vds.variant_data._filter_partitions(part_range),
)

transforms = [
remove_too_many_alt_allele_sites,
hard_filter_non_passing_sites,
failing_gts_to_no_call,
]
transformed_vds=vds_part
for transform in transforms:
transformed_vds = transform(transformed_vds)

print(f"densifying dense matrix table index {i}")
mt = hl.vds.to_dense_mt(transformed_vds)

def main(vds, ancestry_file_location, sites_only_vcf_path):
transforms = [
hard_filter_non_passing_sites,
failing_gts_to_no_call,
remove_too_many_alt_allele_sites
]
transformed_vds=vds
for transform in transforms:
transformed_vds = transform(transformed_vds)
with open(ancestry_file_location, 'r') as ancestry_file:
mt = matrix_table_ac_an_af(mt, ancestry_file) # this adds subpopulation information and splits our multi-allelic rows

mt = hl.vds.to_dense_mt(transformed_vds)
ht = mt.rows()
ht = ht.select('call_stats_by_pop', 'a_index', 'ac_an_af', 'ac_an_af_adj', 'call_stats_by_pop_adj')
ht.write(ht_paths[i])

with open(ancestry_file_location, 'r') as ancestry_file:
mt = matrix_table_ac_an_af(mt, ancestry_file) # this adds subpopulation information and splits our multi-allelic rows
# potentially in the future: merge AC, AN, AF back to the original VDS with: vds = vds_ac_an_af(mt, vds)

# potentially in the future: merge AC, AN, AF back to the original VDS with: vds = vds_ac_an_af(mt, vds)
# for debugging information -- remove for now to get us through Echo
# add_variant_tracking_info(mt, sites_only_vcf_path)

# create a sites only VCF (that is hard filtered!) and that can be made into a custom annotations TSV for Nirvana to use with AC, AN, AF, SC for all subpopulations and populations
write_sites_only_vcf(mt, sites_only_vcf_path)
ht_list = [hl.read_table(ht_path).naive_coalesce(5000) for ht_path in ht_paths] # repartition each table to 5k partitions before we union them
ht_all = ht_list[0].union(*ht_list[1:])
write_sites_only_vcf(ht_all, sites_only_vcf_path)
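
The round/partition arithmetic in main() uses ceiling division so the final n_parts % n_rounds partitions are not dropped; a small illustration with made-up numbers:

    n_parts, n_rounds = 23, 5
    parts_per_round = (n_parts + n_rounds - 1) // n_rounds   # ceil(23 / 5) = 5
    part_ranges = [range(i * parts_per_round, min((i + 1) * parts_per_round, n_parts))
                   for i in range(n_rounds)]
    # -> [range(0, 5), range(5, 10), range(10, 15), range(15, 20), range(20, 23)]
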


def annotate_entry_filter_flag(mt):
@@ -173,18 +254,21 @@ def write_tie_out_vcf(vds, vcf_output_path):
if __name__ == '__main__':
parser = argparse.ArgumentParser(allow_abbrev=False, description='Create VAT inputs TSV')
parser.add_argument('--ancestry_input_path', type=str, help='Input ancestry file path', required=True)
parser.add_argument('--vds_input_path', type=str, help='Input VDS path', default="@VDS_INPUT_PATH@")
parser.add_argument('--sites_only_output_path', type=str, help='Output sites-only VCF path',
default="@SITES_ONLY_VCF_OUTPUT_PATH@"),
parser.add_argument('--vds_input_path', type=str, help='Input VDS path')
parser.add_argument('--sites_only_output_path', type=str, help='Output sites-only VCF path'),
parser.add_argument('--temp_path', type=str, help='Path to temporary directory', required=True)

args = parser.parse_args()

temp_path = args.temp_path if not args.temp_path.endswith('/') else args.temp_path[:-1]
time_stamp = datetime.today().strftime('%Y-%m-%d_%H-%M-%S')
hl.init(tmp_dir=f'{temp_path}/hail_tmp_create_vat_inputs_{time_stamp}')
try:
temp_path = args.temp_path if not args.temp_path.endswith('/') else args.temp_path[:-1]
time_stamp = datetime.today().strftime('%Y-%m-%d_%H-%M-%S')
hl.init(tmp_dir=f'{temp_path}/hail_tmp_create_vat_inputs_{time_stamp}')

vds = hl.vds.read_vds(args.vds_input_path)
local_ancestry_file = create_vat_inputs.download_ancestry_file(args.ancestry_input_path)
vds = hl.vds.read_vds(args.vds_input_path)
local_ancestry_file = create_vat_inputs.download_ancestry_file(args.ancestry_input_path)

main(vds, local_ancestry_file, args.sites_only_output_path)
main(vds, local_ancestry_file, args.sites_only_output_path)
except Exception:
hl.copy_log(args.sites_only_output_path.replace(r".sites-only.vcf", ".log"))
raise
2 changes: 1 addition & 1 deletion scripts/variantstore/wdl/extract/run_in_hail_cluster.py
@@ -130,7 +130,7 @@ def run_in_cluster(cluster_name, account, worker_machine_type, master_machine_ty
parser.add_argument('--account', type=str, help='GCP account name')
parser.add_argument('--worker-machine-type', type=str, required=False, default="n1-highmem-8",
help='Dataproc cluster worker machine type')
parser.add_argument('--master-machine-type', type=str, required=False, default="n1-highmem-16",
parser.add_argument('--master-machine-type', type=str, required=False, default="n1-highmem-32",
help='Dataproc cluster master machine type')
parser.add_argument('--master-memory-fraction', type=float, default=0.8, help='Dataproc master memory fraction')
parser.add_argument('--region', type=str, required=True, help='GCS region')