From 7590b7bfa5aa87ccf8af4951aad047ceef509d63 Mon Sep 17 00:00:00 2001
From: George Grant
Date: Tue, 1 Oct 2024 15:43:53 -0400
Subject: [PATCH] VS-1422 Merge VAT changes in echo into ah_var_store (#8977)

* VS-1422. Update ah_var_store with VAT-related changes from EchoCallset

* Update GvsCreateVATfromVDS.wdl with changes from EchoCallset.

* Moved changes for hail_create_vat_inputs.py from EchoCallset into branch.

* Update run_in_hail_cluster.py

* Rebuilt docker with changes merged in from EchoCallset.
---
 .../GvsCreateVATfromVDS.wdl                   |  43 +++---
 scripts/variantstore/wdl/GvsUtils.wdl         |   6 +-
 scripts/variantstore/wdl/extract/Dockerfile   |   2 +-
 .../wdl/extract/build_base.Dockerfile         |   2 +-
 .../wdl/extract/hail_create_vat_inputs.py     | 144 ++++++++++++++----
 .../wdl/extract/run_in_hail_cluster.py        |   2 +-
 6 files changed, 139 insertions(+), 60 deletions(-)

diff --git a/scripts/variantstore/variant-annotations-table/GvsCreateVATfromVDS.wdl b/scripts/variantstore/variant-annotations-table/GvsCreateVATfromVDS.wdl
index 805d324d861..2d3120093de 100644
--- a/scripts/variantstore/variant-annotations-table/GvsCreateVATfromVDS.wdl
+++ b/scripts/variantstore/variant-annotations-table/GvsCreateVATfromVDS.wdl
@@ -11,7 +11,6 @@ workflow GvsCreateVATfromVDS {
         String filter_set_name
         File? sites_only_vcf
         File? sites_only_vcf_index
-        String? hail_generate_sites_only_script_path
         File? vds_path
         String output_path
@@ -50,19 +49,17 @@ workflow GvsCreateVATfromVDS {
            help: "name of the filter set used to generate the callset in GVS"
        }
        sites_only_vcf: {
-            help: "Optional sites-only VCF file. If defined, generation of a sites-only VCF from the VDS will be skipped. If defined, 'vds_path' and 'hail_generate_sites_only_script_path' must NOT be defined."
-        }
-        hail_generate_sites_only_script_path: {
-            help: "Optional hail_create_vat_inputs.py script in GCS that was created by the GvsExtractAvroFilesForHail WDL. If defined, 'vds_path' must also be defined and 'sites_only_vcf' must NOT be defined"
+            help: "Optional sites-only VCF file. If defined, generation of a sites-only VCF from the VDS will be skipped. If defined, then 'vds_path' must NOT be defined."
        }
        output_path: {
            help: "GCS location (with a trailing '/') to put temporary and output files for the VAT pipeline"
        }
        vds_path: {
            help: "Optional top-level directory of the GVS VDS to be used to create the VAT. If defined, then 'sites_only_vcf' must NOT be defined"
        }
    }

+    File interval_list = "gs://gcp-public-data--broad-references/hg38/v0/wgs_calling_regions.hg38.noCentromeres.noTelomeres.interval_list"
     File reference = "gs://gcp-public-data--broad-references/hg38/v0/Homo_sapiens_assembly38.fasta"
     File reference_dict = "gs://gcp-public-data--broad-references/hg38/v0/Homo_sapiens_assembly38.dict"
@@ -93,23 +90,23 @@ workflow GvsCreateVATfromVDS {
     String output_path_without_a_trailing_slash = sub(output_path, "/$", "")
     String effective_output_path = if (output_path == output_path_without_a_trailing_slash) then output_path + "/" else output_path

-    if ((defined(sites_only_vcf)) && (defined(vds_path) || defined(hail_generate_sites_only_script_path))) {
+    if ((defined(sites_only_vcf)) && (defined(vds_path))) {
         call Utils.TerminateWorkflow as IfSitesOnlyVcfSetDontSetCreateParameters {
             input:
-                message = "Error: If 'sites_only_vcf' is set as an input, you may not set 'vds_path' and 'hail_generate_sites_only_script_path'",
+                message = "Error: If 'sites_only_vcf' is set as an input, you may not set 'vds_path'",
                 basic_docker = effective_basic_docker,
         }
     }

-    if (!defined(sites_only_vcf) && ((!defined(vds_path) || !defined(hail_generate_sites_only_script_path)))) {
+    if (!defined(sites_only_vcf) && (!defined(vds_path))) {
         call Utils.TerminateWorkflow as MustSetSitesOnlyVcfCreationParameters {
             input:
-                message = "Error: If 'sites_only_vcf' is not set as an input, you MUST set 'vds_path' and 'hail_generate_sites_only_script_path'",
+                message = "Error: If 'sites_only_vcf' is not set as an input, you MUST set 'vds_path'",
                 basic_docker = effective_basic_docker,
         }
     }

-    if (defined(sites_only_vcf) || (defined(vds_path) && defined(hail_generate_sites_only_script_path))) {
+    if (defined(sites_only_vcf) || (defined(vds_path))) {
         if (!defined(split_intervals_scatter_count)) {
             call Utils.GetBQTableLastModifiedDatetime as SampleDateTime {
                 input:
@@ -146,7 +143,6 @@ workflow GvsCreateVATfromVDS {
                 workspace_project = effective_google_project,
                 hail_version = effective_hail_version,
                 hail_wheel = hail_wheel,
-                hail_generate_sites_only_script_path = select_first([hail_generate_sites_only_script_path]),
                 ancestry_file_path = MakeSubpopulationFilesAndReadSchemaFiles.ancestry_file_path,
                 workspace_bucket = GetToolVersions.workspace_bucket,
                 region = region,
@@ -315,7 +311,6 @@ task GenerateSitesOnlyVcf {
         Boolean leave_cluster_running_at_end
         String hail_version
         File? hail_wheel
-        String hail_generate_sites_only_script_path
         String ancestry_file_path
         String? hail_temp_path
         Int? cluster_max_idle_minutes
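With the script-path input gone, the workflow guards above boil down to requiring exactly one of 'sites_only_vcf' or 'vds_path'. As a reading aid only (this function and its names are illustrative, not part of the workflow), the same exclusive-or check in Python:

    def validate_inputs(sites_only_vcf=None, vds_path=None):
        # Exactly one of the two inputs may be set: a sites-only VCF skips
        # extraction from the VDS, while a VDS path triggers it.
        if (sites_only_vcf is None) == (vds_path is None):
            raise ValueError("Set exactly one of 'sites_only_vcf' or 'vds_path'")

    validate_inputs(sites_only_vcf="gs://bucket/my.sites-only.vcf.bgz")  # passes
    # validate_inputs() or validate_inputs("a", "b") would raise, matching
    # the two TerminateWorkflow branches above.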
@@ -349,7 +344,7 @@ task GenerateSitesOnlyVcf {
         cluster_name="~{prefix}-${hex}"
         echo ${cluster_name} > cluster_name.txt

-        sites_only_vcf_filename="~{workspace_bucket}/~{prefix}-${hex}.sites-only.vcf"
+        sites_only_vcf_filename="~{workspace_bucket}/~{prefix}-${hex}.sites-only.vcf.bgz"
         echo ${sites_only_vcf_filename} > sites_only_vcf_filename.txt

         if [[ -z "~{hail_temp_path}" ]]
@@ -369,13 +364,10 @@ task GenerateSitesOnlyVcf {
         }
         FIN

-        # Run the hail python script to make a VDS
-        gsutil cp ~{hail_generate_sites_only_script_path} /app/
-
         # Run the hail python script to make a sites-only VCF from a VDS
         # - The autoscaling policy gvs-autoscaling-policy will exist already from the VDS creation
         python3 /app/run_in_hail_cluster.py \
-            --script-path /app/~{basename(hail_generate_sites_only_script_path)} \
+            --script-path /app/hail_create_vat_inputs.py \
             --secondary-script-path-list /app/create_vat_inputs.py \
             --script-arguments-json-path script-arguments.json \
             --account ${account_name} \
@@ -709,10 +701,10 @@ task AnnotateVCF {

     runtime {
         docker: variants_nirvana_docker
-        memory: "64 GB"
+        memory: "128 GB"
         cpu: "4"
-        preemptible: 3
-        maxRetries: 2
+        preemptible: 1
+        maxRetries: 1
         disks: "local-disk 2000 HDD"
     }

@@ -758,8 +750,8 @@ task PrepVtAnnotationJson {

     runtime {
         docker: variants_docker
-        memory: "7 GB"
-        preemptible: 3
+        memory: "16 GB"
+        preemptible: 2
         cpu: "1"
         disks: "local-disk 500 HDD"
     }

@@ -896,9 +888,12 @@ task BigQueryLoadJson {
             echo "Dropping and recreating the vat table ~{dataset_name}.~{vat_table_name}"
             bq --apilog=false rm -t -f --project_id=~{project_id} ~{dataset_name}.~{vat_table_name}
         fi
-        bq --apilog=false mk --expiration=$DATE --project_id=~{project_id} ~{dataset_name}.~{vat_table_name} ~{nirvana_schema}
+
+        CLUSTERING_STRING="--clustering_fields=contig"
+        bq --apilog=false mk ${CLUSTERING_STRING} --expiration=$DATE --project_id=~{project_id} ~{dataset_name}.~{vat_table_name} ~{nirvana_schema}
         echo "Loading data into it"

+        # Now we run a giant query in BQ to get this all in the right table and join the genes properly
         # Note the genes table join includes the group by to avoid the duplicates that get created from genes that span shards
         # Commented out columns in the query are to be added in the next release
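The new CLUSTERING_STRING makes the recreated VAT table clustered on contig, which prunes the bytes scanned by contig-filtered queries downstream. For reference, a rough Python equivalent of that `bq mk` invocation sketched with the google-cloud-bigquery client; the project, dataset, schema, and 14-day expiration here are placeholders, not values taken from this pipeline:

    from datetime import datetime, timedelta, timezone
    from google.cloud import bigquery

    client = bigquery.Client(project="example-project")  # placeholder project
    table = bigquery.Table(
        "example-project.example_dataset.vat_v1",        # placeholder table id
        schema=[
            bigquery.SchemaField("contig", "STRING"),
            bigquery.SchemaField("position", "INTEGER"),
        ],
    )
    table.clustering_fields = ["contig"]  # mirrors --clustering_fields=contig
    table.expires = datetime.now(timezone.utc) + timedelta(days=14)  # mirrors --expiration=$DATE
    client.create_table(table)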
diff --git a/scripts/variantstore/wdl/GvsUtils.wdl b/scripts/variantstore/wdl/GvsUtils.wdl
index 180fb772110..bb405972a75 100644
--- a/scripts/variantstore/wdl/GvsUtils.wdl
+++ b/scripts/variantstore/wdl/GvsUtils.wdl
@@ -72,7 +72,7 @@ task GetToolVersions {
     # GVS generally uses the smallest `alpine` version of the Google Cloud SDK as it suffices for most tasks, but
     # there are a handful of tasks that require the larger GNU libc-based `slim`.
     String cloud_sdk_slim_docker = "gcr.io/google.com/cloudsdktool/cloud-sdk:435.0.0-slim"
-    String variants_docker = "us-central1-docker.pkg.dev/broad-dsde-methods/gvs/variants:2024-08-15-alpine-254df9be288d"
+    String variants_docker = "us-central1-docker.pkg.dev/broad-dsde-methods/gvs/variants:2024-09-30-alpine-27b8708f5808"
     String variants_nirvana_docker = "us.gcr.io/broad-dsde-methods/variantstore:nirvana_2022_10_19"
     String gatk_docker = "us-central1-docker.pkg.dev/broad-dsde-methods/gvs/gatk:2024_08_19-gatkbase-cd5b6b7821b2"
     String real_time_genomics_docker = "docker.io/realtimegenomics/rtg-tools:latest"
@@ -1059,7 +1059,7 @@ task IndexVcf {
     Int max_heap = memory_mb - 500

     String local_file = basename(input_vcf)
-    Boolean is_compressed = sub(local_file, ".*\\.", "") == "gz"
+    Boolean is_compressed = (sub(local_file, ".*\\.", "") == "gz") || (sub(local_file, ".*\\.", "") == "bgz")
     String index_extension = if is_compressed then ".tbi" else ".idx"

     command <<<
@@ -1354,7 +1354,7 @@ task CopyFile {
         fi

         gsutil cp ~{input_file} ${OUTPUT_GCS_DIR}/
-        echo ${OUTPUT_PATH} > output_file_path.txt
+        echo $OUTPUT_PATH > output_file_path.txt
     >>>
     output {
         String output_file_path = read_string("output_file_path.txt")
diff --git a/scripts/variantstore/wdl/extract/Dockerfile b/scripts/variantstore/wdl/extract/Dockerfile
index f2192e2868b..229ba2f6c42 100644
--- a/scripts/variantstore/wdl/extract/Dockerfile
+++ b/scripts/variantstore/wdl/extract/Dockerfile
@@ -8,7 +8,7 @@
 # expensive to create and isn't expected to change often, while the steps in this Dockerfile are much less expensive and
 # more likely to change. Using a build-base image essentially allows the expensive layers to be globally cached which
 # should make building the final image much faster in most cases.
-FROM us.gcr.io/broad-dsde-methods/variantstore:2024-07-01-alpine-build-base as build
+FROM us.gcr.io/broad-dsde-methods/variantstore:2024-09-30-alpine-build-base as build

 # Install all of our variantstore Python requirements.
 COPY requirements.txt requirements.txt
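The IndexVcf change above makes '.bgz' count as compressed alongside '.gz' when choosing between a '.tbi' and an '.idx' index name, which matters now that GenerateSitesOnlyVcf emits '.sites-only.vcf.bgz'. The WDL sub() call keeps only the text after the last dot; a sketch of the same logic in Python, purely for illustration:

    import re

    def index_extension(local_file: str) -> str:
        # sub(local_file, ".*\\.", "") keeps everything after the last '.'
        extension = re.sub(r".*\.", "", local_file)
        is_compressed = extension in ("gz", "bgz")
        return ".tbi" if is_compressed else ".idx"

    assert index_extension("sample.sites-only.vcf.bgz") == ".tbi"
    assert index_extension("sample.vcf") == ".idx"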
diff --git a/scripts/variantstore/wdl/extract/build_base.Dockerfile b/scripts/variantstore/wdl/extract/build_base.Dockerfile
index 9324b405723..6cb9c96e828 100644
--- a/scripts/variantstore/wdl/extract/build_base.Dockerfile
+++ b/scripts/variantstore/wdl/extract/build_base.Dockerfile
@@ -36,7 +36,7 @@ RUN apk add autoconf bash cmake g++ gcc make ninja python3-dev git openssl-dev z
 ARG ARROW_VERSION=16.1.0

 RUN cd / && \
-    curl -O https://dlcdn.apache.org/arrow/arrow-$ARROW_VERSION/apache-arrow-$ARROW_VERSION.tar.gz && \
+    curl -O https://archive.apache.org/dist/arrow/arrow-$ARROW_VERSION/apache-arrow-$ARROW_VERSION.tar.gz && \
     tar xfz apache-arrow-$ARROW_VERSION.tar.gz

 # Pyarrow build instructions from https://arrow.apache.org/docs/developers/python.html#python-development
diff --git a/scripts/variantstore/wdl/extract/hail_create_vat_inputs.py b/scripts/variantstore/wdl/extract/hail_create_vat_inputs.py
index 1c737a881a7..4925d1fafda 100644
--- a/scripts/variantstore/wdl/extract/hail_create_vat_inputs.py
+++ b/scripts/variantstore/wdl/extract/hail_create_vat_inputs.py
@@ -1,5 +1,6 @@
 import argparse
 import hail as hl
+from typing import Union
 from datetime import datetime

 import create_vat_inputs
@@ -63,29 +64,75 @@ def failing_gts_to_no_call(vds):

 def remove_too_many_alt_allele_sites(vds):
     """
-    Remove sites with more than 50 alternate alleles (and print how many)
+    Remove sites with more than 50 alternate alleles
     """
     vd = vds.variant_data
-    print(vd.aggregate_rows(hl.agg.count_where(hl.len(vd.alleles) > 50)))
     vd_50_aa_cutoff = vd.filter_rows(hl.len(vd.alleles) <= 50)

     return hl.vds.VariantDataset(vds.reference_data, vd_50_aa_cutoff)


+def annotate_adj(
+    mt: hl.MatrixTable,
+    adj_gq: int = 30,
+    adj_ab: float = 0.2,
+) -> hl.MatrixTable:
+    """
+    Annotate genotypes with adj criteria (assumes diploid).
+    Defaults similar to gnomAD values, but GQ >= 20 changed to GQ >= 30 to make up for lack of DP filter.
+    """
+    if "LGT" in mt.entry and "LAD" in mt.entry:
+        gt_expr = mt.LGT
+        ad_expr = mt.LAD
+    else:
+        assert "GT" in mt.entry and "AD" in mt.entry
+        gt_expr = mt.GT
+        ad_expr = mt.AD
+
+    return mt.annotate_entries(
+        adj=get_adj_expr(
+            gt_expr, mt.GQ, ad_expr, adj_gq, adj_ab
+        )
+    )
+
+
+def get_adj_expr(
+    gt_expr: hl.expr.CallExpression,
+    gq_expr: Union[hl.expr.Int32Expression, hl.expr.Int64Expression],
+    ad_expr: hl.expr.ArrayNumericExpression,
+    adj_gq: int = 30,
+    adj_ab: float = 0.2,
+) -> hl.expr.BooleanExpression:
+    """
+    Get adj genotype annotation.
+    Defaults similar to gnomAD values, but GQ >= 20 changed to GQ >= 30 to make up for lack of DP filter.
+    """
+    total_ad = hl.sum(ad_expr)
+    return (
+        (gq_expr >= adj_gq)
+        & (
+            hl.case()
+            .when(~gt_expr.is_het(), True)
+            .when(gt_expr.is_het_ref(), ad_expr[gt_expr[1]] / total_ad >= adj_ab)
+            .default(
+                (ad_expr[gt_expr[0]] / total_ad >= adj_ab)
+                & (ad_expr[gt_expr[1]] / total_ad >= adj_ab)
+            )
+        )
+    )
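A quick way to sanity-check the diploid adj logic above is to evaluate get_adj_expr on literal values. This assumes hail_create_vat_inputs.py (and its create_vat_inputs dependency) is importable and that a local Hail backend can start; the numbers are made up:

    import hail as hl
    from hail_create_vat_inputs import get_adj_expr

    # Het ref/alt call, GQ 35, AD [12, 8]: GQ 35 >= 30 and the alt allele
    # balance is 8 / 20 = 0.4 >= 0.2, so this genotype should be adj.
    print(hl.eval(get_adj_expr(hl.call(0, 1), hl.int32(35), hl.literal([12, 8]))))  # True

    # Same call with AD [19, 1]: allele balance 1 / 20 = 0.05 < 0.2, so not adj.
    print(hl.eval(get_adj_expr(hl.call(0, 1), hl.int32(35), hl.literal([19, 1]))))  # False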
 def matrix_table_ac_an_af(mt, ancestry_file):
     """
     Create a DENSE MATRIX TABLE to calculate AC, AN, AF and Sample Count
     TODO: test sample_count
     """
     sample_id_to_sub_population = create_vat_inputs.parse_ancestry_file(ancestry_file)
     mt = mt.annotate_cols(pop=hl.literal(sample_id_to_sub_population)[mt.s])
+    mt = annotate_adj(mt)
     ac_an_af_mt = mt.select_rows(
         ac_an_af=hl.agg.call_stats(mt.GT, mt.alleles),
         call_stats_by_pop=hl.agg.group_by(mt.pop, hl.agg.call_stats(mt.GT, mt.alleles)),
+        ac_an_af_adj=hl.agg.filter(mt.adj, hl.agg.call_stats(mt.GT, mt.alleles)),
+        call_stats_by_pop_adj=hl.agg.filter(mt.adj, hl.agg.group_by(mt.pop, hl.agg.call_stats(mt.GT, mt.alleles))),
     )
+
     return hl.methods.split_multi(ac_an_af_mt, left_aligned=True) # split each alternate allele onto its own row. This will also remove all spanning deletions for us
@@ -110,7 +157,7 @@ def write_sites_only_vcf(ac_an_af_split, sites_only_vcf_path):
         pop_info_fields[f'Hom_{pop}'] = ac_an_af_split.call_stats_by_pop.get(pop).homozygote_count[ac_an_af_split.row.a_index]

-    ac_an_af_rows = ac_an_af_split.annotate_rows(
+    ac_an_af_rows = ac_an_af_split.annotate(
         info = hl.struct(
             AC=ac_an_af_split.row.ac_an_af.AC[ac_an_af_split.row.a_index],
             AN=ac_an_af_split.row.ac_an_af.AN,
@@ -122,32 +169,66 @@ def write_sites_only_vcf(ac_an_af_split, sites_only_vcf_path):

     # note that SC = AC - homozygote_count

-    ht = ac_an_af_rows.rows()
-    ht = ht.filter(ht.alleles[1] != "*") # remove spanning deletions
+    ht = ac_an_af_rows.filter(ac_an_af_rows.alleles[1] != "*") # remove spanning deletions

     # create a filtered sites only VCF
     hl.export_vcf(ht, sites_only_vcf_path)


+def add_variant_tracking_info(mt, sites_only_vcf_path):
+    # only need the table of row fields and leaves this as the only field
+    var_ids_path = sites_only_vcf_path.replace(r".sites-only.vcf", ".var_ids.tsv.bgz")
+    t = mt.rows()
+    t.select(var_origin_id=hl.format('%s-%s-%s-%s', t.locus.contig, t.locus.position, t.alleles[0], t.alleles[1])).export(var_ids_path, parallel='separate_header')
+
+
+def main(vds, ancestry_file_location, sites_only_vcf_path, dry_run_n_parts=None):
+    n_parts = vds.variant_data.n_partitions()
+    if dry_run_n_parts:
+        n_rounds = 1
+        parts_per_round = dry_run_n_parts
+        ht_paths = [sites_only_vcf_path.replace(r".sites-only.vcf.bgz", f'_dryrun.ht')]
+        sites_only_vcf_path = sites_only_vcf_path.replace(r".vcf.bgz", f'_dryrun.vcf.bgz')
+    else:
+        n_rounds = 5
+        # Add in 'n_rounds - 1' to include all of the partitions in the set of groups, otherwise we would omit the final
+        # n_parts % n_rounds partitions.
+        parts_per_round = (n_parts + n_rounds - 1) // n_rounds
+        ht_paths = [sites_only_vcf_path.replace(r".sites-only.vcf.bgz", f'_{i}.ht') for i in range(n_rounds)]
+    for i in range(n_rounds):
+        part_range = range(i*parts_per_round, min((i+1)*parts_per_round, n_parts))
+        vds_part = hl.vds.VariantDataset(
+            vds.reference_data._filter_partitions(part_range),
+            vds.variant_data._filter_partitions(part_range),
+        )
+
+        transforms = [
+            remove_too_many_alt_allele_sites,
+            hard_filter_non_passing_sites,
+            failing_gts_to_no_call,
+        ]
+        transformed_vds = vds_part
+        for transform in transforms:
+            transformed_vds = transform(transformed_vds)
+
+        print(f"densifying dense matrix table index {i}")
+        mt = hl.vds.to_dense_mt(transformed_vds)

-def main(vds, ancestry_file_location, sites_only_vcf_path):
-    transforms = [
-        hard_filter_non_passing_sites,
-        failing_gts_to_no_call,
-        remove_too_many_alt_allele_sites
-    ]
-    transformed_vds = vds
-    for transform in transforms:
-        transformed_vds = transform(transformed_vds)
+        with open(ancestry_file_location, 'r') as ancestry_file:
+            mt = matrix_table_ac_an_af(mt, ancestry_file) # this adds subpopulation information and splits our multi-allelic rows

-    mt = hl.vds.to_dense_mt(transformed_vds)
+        ht = mt.rows()
+        ht = ht.select('call_stats_by_pop', 'a_index', 'ac_an_af', 'ac_an_af_adj', 'call_stats_by_pop_adj')
+        ht.write(ht_paths[i])

-    with open(ancestry_file_location, 'r') as ancestry_file:
-        mt = matrix_table_ac_an_af(mt, ancestry_file) # this adds subpopulation information and splits our multi-allelic rows
+    # potentially in the future: merge AC, AN, AF back to the original VDS with: vds = vds_ac_an_af(mt, vds)

-    # potentially in the future: merge AC, AN, AF back to the original VDS with: vds = vds_ac_an_af(mt, vds)
+    # for debugging information -- remove for now to get us through Echo
+    # add_variant_tracking_info(mt, sites_only_vcf_path)

     # create a sites only VCF (that is hard filtered!) and that can be made into a custom annotations TSV for Nirvana to use with AC, AN, AF, SC for all subpopulations and populations
-    write_sites_only_vcf(mt, sites_only_vcf_path)
+    ht_list = [hl.read_table(ht_path).naive_coalesce(5000) for ht_path in ht_paths] # repartition each table to 5k partitions before we union them
+    ht_all = ht_list[0].union(*ht_list[1:])
+    write_sites_only_vcf(ht_all, sites_only_vcf_path)
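The parts_per_round arithmetic in the new main() is ceiling division: (n_parts + n_rounds - 1) // n_rounds rounds up so that the n_rounds part_range slices cover every partition even when n_parts is not a multiple of n_rounds. A standalone sketch of the same grouping, runnable without Hail:

    def round_ranges(n_parts: int, n_rounds: int):
        # Ceiling division, as in main(): round up so no partition is dropped.
        parts_per_round = (n_parts + n_rounds - 1) // n_rounds
        return [range(i * parts_per_round, min((i + 1) * parts_per_round, n_parts))
                for i in range(n_rounds)]

    # 17 partitions over 5 rounds come out as slices of sizes 4, 4, 4, 4, 1.
    assert sum(len(r) for r in round_ranges(17, 5)) == 17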


 def annotate_entry_filter_flag(mt):
@@ -173,18 +254,21 @@ def write_tie_out_vcf(vds, vcf_output_path):
 if __name__ == '__main__':
     parser = argparse.ArgumentParser(allow_abbrev=False, description='Create VAT inputs TSV')
     parser.add_argument('--ancestry_input_path', type=str, help='Input ancestry file path', required=True)
-    parser.add_argument('--vds_input_path', type=str, help='Input VDS path', default="@VDS_INPUT_PATH@")
-    parser.add_argument('--sites_only_output_path', type=str, help='Output sites-only VCF path',
-                        default="@SITES_ONLY_VCF_OUTPUT_PATH@"),
+    parser.add_argument('--vds_input_path', type=str, help='Input VDS path')
+    parser.add_argument('--sites_only_output_path', type=str, help='Output sites-only VCF path'),
     parser.add_argument('--temp_path', type=str, help='Path to temporary directory', required=True)

     args = parser.parse_args()

-    temp_path = args.temp_path if not args.temp_path.endswith('/') else args.temp_path[:-1]
-    time_stamp = datetime.today().strftime('%Y-%m-%d_%H-%M-%S')
-    hl.init(tmp_dir=f'{temp_path}/hail_tmp_create_vat_inputs_{time_stamp}')
+    try:
+        temp_path = args.temp_path if not args.temp_path.endswith('/') else args.temp_path[:-1]
+        time_stamp = datetime.today().strftime('%Y-%m-%d_%H-%M-%S')
+        hl.init(tmp_dir=f'{temp_path}/hail_tmp_create_vat_inputs_{time_stamp}')

-    vds = hl.vds.read_vds(args.vds_input_path)
-    local_ancestry_file = create_vat_inputs.download_ancestry_file(args.ancestry_input_path)
+        vds = hl.vds.read_vds(args.vds_input_path)
+        local_ancestry_file = create_vat_inputs.download_ancestry_file(args.ancestry_input_path)

-    main(vds, local_ancestry_file, args.sites_only_output_path)
+        main(vds, local_ancestry_file, args.sites_only_output_path)
+    except Exception:
+        hl.copy_log(args.sites_only_output_path.replace(r".sites-only.vcf", ".log"))
+        raise
\ No newline at end of file
diff --git a/scripts/variantstore/wdl/extract/run_in_hail_cluster.py b/scripts/variantstore/wdl/extract/run_in_hail_cluster.py
index bc6bfbf1a70..dc856b8a6a3 100644
--- a/scripts/variantstore/wdl/extract/run_in_hail_cluster.py
+++ b/scripts/variantstore/wdl/extract/run_in_hail_cluster.py
@@ -130,7 +130,7 @@ def run_in_cluster(cluster_name, account, worker_machine_type, master_machine_ty
     parser.add_argument('--account', type=str, help='GCP account name')
     parser.add_argument('--worker-machine-type', type=str, required=False, default="n1-highmem-8",
                         help='Dataproc cluster worker machine type')
-    parser.add_argument('--master-machine-type', type=str, required=False, default="n1-highmem-16",
+    parser.add_argument('--master-machine-type', type=str, required=False, default="n1-highmem-32",
                         help='Dataproc cluster master machine type')
     parser.add_argument('--master-memory-fraction', type=float, default=0.8, help='Dataproc master memory fraction')
    parser.add_argument('--region', type=str, required=True, help='GCS region')
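The try/except added to __main__ above exists to save the Hail driver log next to the intended output before the failure propagates and the Dataproc cluster's local disk disappears. The same pattern in isolation, with illustrative paths and a stand-in for the real work:

    import hail as hl

    output_path = "gs://example-bucket/run.sites-only.vcf.bgz"  # illustrative path

    def run_pipeline():
        # Stand-in for the real read_vds/main work in hail_create_vat_inputs.py.
        raise RuntimeError("simulated pipeline failure")

    try:
        hl.init(tmp_dir="gs://example-bucket/hail_tmp")  # illustrative tmp dir
        run_pipeline()
    except Exception:
        # Copy the driver log to durable storage, then re-raise so the job
        # still registers as failed.
        hl.copy_log(output_path.replace(".sites-only.vcf", ".log"))
        raise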