Add parameter to give tables created by prepare step a two week TTL [VS-597] #8595

Merged · 7 commits · Dec 1, 2023 (diff shown from 5 commits)
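In outline, this PR threads a new `extract_table_ttl` boolean from the WDL workflows down to the Python script that builds the prepare tables; when the flag is true, the generated `CREATE OR REPLACE TABLE` DDL carries a BigQuery expiration option so the tables expire after 14 days. A minimal sketch of that pattern follows; the helper and table names are illustrative, and only the OPTIONS text is taken from the change itself.

```python
# Sketch only: how an expiration option is attached to the prepare-step DDL.
# `ttl_clause` and the table name are illustrative; the OPTIONS text mirrors the PR.
def ttl_clause(extract_table_ttl: bool) -> str:
    """Return the expiration OPTIONS clause for the CREATE TABLE DDL, or '' for no TTL."""
    if not extract_table_ttl:
        return ""
    return "OPTIONS( expiration_timestamp=TIMESTAMP_ADD(CURRENT_TIMESTAMP(), INTERVAL 14 DAY))"

fq_destination_table = "my-project.my_dataset.my_prefix__VET_DATA"  # placeholder table id
sql = f"""
CREATE OR REPLACE TABLE `{fq_destination_table}`
{ttl_clause(True)}
AS (
    SELECT 1 AS location  -- the real population query is elided here
)
"""
print(sql)  # inspect the DDL that would be sent to BigQuery
```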
6 changes: 2 additions & 4 deletions .dockstore.yml
@@ -165,7 +165,6 @@ workflows:
branches:
- master
- ah_var_store
- rsa_vs_1124
tags:
- /.*/
- name: GvsImportGenomes
@@ -184,7 +183,6 @@ workflows:
branches:
- master
- ah_var_store
- vs_1060_phasing_into_vds
tags:
- /.*/
- name: GvsPrepareRangesCallset
@@ -196,7 +194,7 @@ workflows:
branches:
- master
- ah_var_store
- VS-1063-bit-pack-ref-ranges-data-into-a-more-efficient-representation
- rsa_vs_597
tags:
- /.*/
- name: GvsCreateVATfromVDS
@@ -235,7 +233,7 @@ workflows:
branches:
- master
- ah_var_store
- rsa_vs_1124
- rsa_vs_597
tags:
- /.*/
- name: GvsWithdrawSamples
1 change: 1 addition & 0 deletions scripts/variantstore/docs/aou/AOU_DELIVERABLES.md
@@ -79,6 +79,7 @@
- You will need to run `GvsPrepareRangesCallset` workflow first, if it has not been run already
- This workflow transforms the data in the vet tables into a schema optimized for callset stats creation and for calculating sensitivity and precision.
- The `only_output_vet_tables` input should be set to `true` (the default value is `false`).
- The `extract_table_ttl` input should be set to `true` (the default value is `false`), which will add a two-week TTL to the tables the workflow creates (a sketch for verifying the expiration follows this list).
- See [naming conventions doc](https://docs.google.com/document/d/1pNtuv7uDoiOFPbwe4zx5sAGH7MyxwKqXkyrpNmBxeow) for guidance on what to use for `extract_table_prefix` or cohort prefix, which you will need to keep track of for the callset stats.
- This workflow does not use the Terra Data Entity Model to run, so be sure to select the `Run workflow with inputs defined by file paths` workflow submission option.
- You will need to have the "BigQuery Data Viewer" role for your @pmi-ops proxy group on the `spec-ops-aou:gvs_public_reference_data.gnomad_v3_sites` table
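After the prepare workflow finishes, one hedged way to confirm the two-week TTL actually landed on the new tables is to read their expiration timestamps with the BigQuery Python client. This sketch is not part of the runbook; the project, dataset, and prefix are placeholders to substitute with your callset's values.

```python
# Hypothetical spot check of the prepare tables' expiration; substitute your
# destination project, dataset, and extract_table_prefix before running.
from google.cloud import bigquery

client = bigquery.Client(project="my-query-project")  # placeholder
for suffix in ("__REF_DATA", "__VET_DATA"):  # table suffixes seen in the diff below
    table_id = f"my-destination-project.my_dataset.my_extract_prefix{suffix}"  # placeholder
    table = client.get_table(table_id)
    # `expires` is None when no TTL is set, otherwise the absolute expiration time
    print(table_id, "expires:", table.expires)
```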
@@ -105,6 +105,7 @@ workflow GvsExtractCohortFromSampleNames {
fq_temp_table_dataset = fq_gvs_extraction_temp_tables_dataset,
write_cost_to_db = write_cost_to_db,
cloud_sdk_docker = effective_cloud_sdk_docker,
extract_table_ttl = false,
}

call GvsExtractCallset.GvsExtractCallset {
1 change: 1 addition & 0 deletions scripts/variantstore/wdl/GvsJointVariantCalling.wdl
@@ -189,6 +189,7 @@ workflow GvsJointVariantCalling {
sample_names_to_extract = sample_names_to_extract,
variants_docker = effective_variants_docker,
cloud_sdk_docker = effective_cloud_sdk_docker,
extract_table_ttl = true,
}

String effective_output_gcs_dir = select_first([extract_output_gcs_dir, "~{effective_workspace_bucket}/output_vcfs/by_submission_id/~{effective_submission_id}"])
6 changes: 5 additions & 1 deletion scripts/variantstore/wdl/GvsPrepareRangesCallset.wdl
@@ -14,6 +14,7 @@ workflow GvsPrepareCallset {
String call_set_identifier

String extract_table_prefix = call_set_identifier
Boolean extract_table_ttl = false
String query_project = project_id
String destination_project = project_id
String destination_dataset = dataset_name
@@ -69,6 +70,7 @@ workflow GvsPrepareCallset {
write_cost_to_db = write_cost_to_db,
variants_docker = effective_variants_docker,
use_compressed_references = IsUsingCompressedReferences.is_using_compressed_references,
extract_table_ttl = extract_table_ttl
}

output {
@@ -96,6 +98,7 @@ task PrepareRangesCallsetTask {
Boolean write_cost_to_db
Boolean use_compressed_references
String variants_docker
Boolean extract_table_ttl
}
meta {
# All kinds of BQ reading happening in the referenced Python script.
@@ -136,7 +139,8 @@ task PrepareRangesCallsetTask {
--ttl ~{temp_table_ttl_in_hours} \
~{true="--only_output_vet_tables True" false='' only_output_vet_tables} \
~{true="--write_cost_to_db True" false="--write_cost_to_db ''" write_cost_to_db} \
~{true="--use_compressed_references True" false='' use_compressed_references}
~{true="--use_compressed_references True" false='' use_compressed_references} \
~{true="--extract_table_ttl True" false='' extract_table_ttl}

>>>
output {
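A note on the asymmetric flag interpolation above: the Python script parses these options with `argparse` and `type=bool` (see the argument definitions further down the diff), and `bool()` of any non-empty string, including "False", is `True`. That is why the false branch either omits the flag entirely or passes an empty string, as `--write_cost_to_db ''` does. A small standalone illustration, not taken from the PR:

```python
# Why the WDL command omits the flag (or passes '') to get False:
# argparse with type=bool simply calls bool() on the raw string value.
import argparse

parser = argparse.ArgumentParser()
parser.add_argument('--extract_table_ttl', type=bool, required=False, default=False)

print(parser.parse_args([]).extract_table_ttl)                                # False (default)
print(parser.parse_args(['--extract_table_ttl', '']).extract_table_ttl)       # False (empty string)
print(parser.parse_args(['--extract_table_ttl', 'True']).extract_table_ttl)   # True
print(parser.parse_args(['--extract_table_ttl', 'False']).extract_table_ttl)  # True -- bool('False') is truthy
```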
2 changes: 1 addition & 1 deletion scripts/variantstore/wdl/GvsUtils.wdl
@@ -72,7 +72,7 @@ task GetToolVersions {
# GVS generally uses the smallest `alpine` version of the Google Cloud SDK as it suffices for most tasks, but
# there are a handful of tasks that require the larger GNU libc-based `slim`.
String cloud_sdk_slim_docker = "gcr.io/google.com/cloudsdktool/cloud-sdk:435.0.0-slim"
String variants_docker = "us.gcr.io/broad-dsde-methods/variantstore:2023-11-21-alpine-62b8d2b70"
String variants_docker = "us.gcr.io/broad-dsde-methods/variantstore:2023-11-30-alpine-02a0c5bd7"
String gatk_docker = "us.gcr.io/broad-dsde-methods/broad-gatk-snapshots:varstore_2023_10_31_e7746ce7c38a8226bcac5b89284782de2a4cdda1"
String variants_nirvana_docker = "us.gcr.io/broad-dsde-methods/variantstore:nirvana_2022_10_19"
String real_time_genomics_docker = "docker.io/realtimegenomics/rtg-tools:latest"
@@ -18,8 +18,6 @@
VET_TABLE_PREFIX = "vet_"
SAMPLES_PER_PARTITION = 4000

FINAL_TABLE_TTL = ""
# FINAL_TABLE_TTL = " OPTIONS( expiration_timestamp=TIMESTAMP_ADD(CURRENT_TIMESTAMP(), INTERVAL 72 HOUR)) "

# temp-table-uuid
output_table_prefix = str(uuid.uuid4()).split("-")[0]
@@ -111,17 +109,20 @@ def get_all_sample_ids(fq_destination_table_samples, only_output_vet_tables, fq_


def create_extract_samples_table(control_samples, fq_destination_table_samples, fq_sample_name_table,
fq_sample_mapping_table, honor_withdrawn):
fq_sample_mapping_table, honor_withdrawn, extract_table_ttl):
ttl = ""
if extract_table_ttl:
ttl = "OPTIONS( expiration_timestamp=TIMESTAMP_ADD(CURRENT_TIMESTAMP(), INTERVAL 14 DAY))"

sql = f"""

CREATE OR REPLACE TABLE `{fq_destination_table_samples}` AS (
CREATE OR REPLACE TABLE `{fq_destination_table_samples}`
{ttl}
AS (
SELECT m.sample_id, m.sample_name, m.is_loaded, {"m.withdrawn," if honor_withdrawn else "NULL as withdrawn,"} m.is_control FROM `{fq_sample_name_table}` s JOIN
`{fq_sample_mapping_table}` m ON (s.sample_name = m.sample_name) WHERE
m.is_loaded IS TRUE AND m.is_control = {control_samples}
{"AND m.withdrawn IS NULL" if honor_withdrawn else ""}
)

"""
print(sql)

@@ -130,8 +131,10 @@ def create_extract_samples_table(control_samples, fq_destination_table_samples,
return query_return['results']


def create_final_extract_vet_table(fq_destination_table_vet_data):
# first, create the table structure
def create_final_extract_vet_table(fq_destination_table_vet_data, extract_table_ttl):
ttl = ""
if extract_table_ttl:
ttl = "OPTIONS( expiration_timestamp=TIMESTAMP_ADD(CURRENT_TIMESTAMP(), INTERVAL 14 DAY))"

sql = f"""
CREATE OR REPLACE TABLE `{fq_destination_table_vet_data}`
@@ -149,15 +152,18 @@ def create_final_extract_vet_table(fq_destination_table_vet_data):
)
PARTITION BY RANGE_BUCKET(location, GENERATE_ARRAY(0, 26000000000000, 6500000000))
CLUSTER BY location
{FINAL_TABLE_TTL}
{ttl}
"""
print(sql)
query_return = utils.execute_with_retry(client, "create final export vet table", sql)
JOBS.append({'job': query_return['job'], 'label': query_return['label']})


def create_final_extract_ref_table(fq_destination_table_ref_data):
# first, create the table structure
def create_final_extract_ref_table(fq_destination_table_ref_data, extract_table_ttl):
ttl = ""
if extract_table_ttl:
ttl = "OPTIONS( expiration_timestamp=TIMESTAMP_ADD(CURRENT_TIMESTAMP(), INTERVAL 14 DAY))"

sql = f"""
CREATE OR REPLACE TABLE `{fq_destination_table_ref_data}`
(
@@ -168,7 +174,7 @@ def create_final_extract_ref_table(fq_destination_table_ref_data):
)
PARTITION BY RANGE_BUCKET(location, GENERATE_ARRAY(0, 26000000000000, 6500000000))
CLUSTER BY location
{FINAL_TABLE_TTL}
{ttl}
"""
print(sql)
query_return = utils.execute_with_retry(client, "create final export ref table", sql)
@@ -262,7 +268,8 @@ def make_extract_table(call_set_identifier,
temp_table_ttl_hours,
only_output_vet_tables,
write_cost_to_db,
use_compressed_references):
use_compressed_references,
extract_table_ttl):
try:
fq_destination_table_ref_data = f"{fq_destination_dataset}.{destination_table_prefix}__REF_DATA"
fq_destination_table_vet_data = f"{fq_destination_dataset}.{destination_table_prefix}__VET_DATA"
@@ -323,17 +330,17 @@ def make_extract_table(call_set_identifier,
# samples with a null `withdrawn` date in the cohort.
if not only_output_vet_tables:
create_extract_samples_table(control_samples, fq_destination_table_samples, fq_sample_name_table,
fq_sample_mapping_table, honor_withdrawn=not sample_names_to_extract)
fq_sample_mapping_table, not sample_names_to_extract, extract_table_ttl)

# pull the sample ids back down
sample_ids = get_all_sample_ids(fq_destination_table_samples, only_output_vet_tables, fq_sample_mapping_table)

# create and populate the tables for extract data
if not only_output_vet_tables:
create_final_extract_ref_table(fq_destination_table_ref_data)
create_final_extract_ref_table(fq_destination_table_ref_data, extract_table_ttl)
populate_final_extract_table_with_ref(fq_ranges_dataset, fq_destination_table_ref_data, sample_ids, use_compressed_references)

create_final_extract_vet_table(fq_destination_table_vet_data)
create_final_extract_vet_table(fq_destination_table_vet_data, extract_table_ttl)
populate_final_extract_table_with_vet(fq_ranges_dataset, fq_destination_table_vet_data, sample_ids)

finally:
Expand Down Expand Up @@ -363,15 +370,16 @@ def make_extract_table(call_set_identifier,
required=False)
parser.add_argument('--fq_sample_mapping_table', type=str, help='Mapping table from sample_id to sample_name',
required=True)
parser.add_argument('--max_tables',type=int, help='Maximum number of vet/ref ranges tables to consider', required=False,
default=250)
parser.add_argument('--max_tables',type=int, help='Maximum number of vet/ref ranges tables to consider', required=False, default=250)
parser.add_argument('--ttl', type=int, help='Temp table TTL in hours', required=False, default=72)
parser.add_argument('--only_output_vet_tables', type=bool,
help='Only create __VET_DATA table, skip __REF_DATA and __SAMPLES tables', required=False, default=False)
parser.add_argument('--write_cost_to_db', type=bool,
help='Populate cost_observability table with BigQuery query bytes scanned', required=False, default=True)
parser.add_argument('--use_compressed_references', type=bool,
help='Expect compressed reference data and expand the fields', required=False, default=False)
parser.add_argument('--extract_table_ttl', type=bool,
help='Add a TTL to the extract tables', required=False, default=False)

sample_args = parser.add_mutually_exclusive_group(required=True)
sample_args.add_argument('--sample_names_to_extract', type=str,
Expand All @@ -398,4 +406,5 @@ def make_extract_table(call_set_identifier,
args.ttl,
args.only_output_vet_tables,
args.write_cost_to_db,
args.use_compressed_references)
args.use_compressed_references,
args.extract_table_ttl)