VS-1485 - Optionally call CollectVariantCallingMetrics from ExtractVcfs (#8968)

* Optionally collect variant calling metrics.
* Output monitoring logs from tasks.
* Output metrics files from workflow
gbggrant authored Sep 9, 2024
1 parent 09bf732 · commit ae8b601
Showing 4 changed files with 190 additions and 0 deletions.
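
The new behavior is opt-in for callers. As a minimal sketch, a Cromwell inputs file that turns it on might look like this (the bucket path is hypothetical, and the workflow's other required inputs are omitted):

{
  "GvsExtractCallset.collect_variant_calling_metrics": true,
  "GvsExtractCallset.output_gcs_dir": "gs://my-bucket/extract-output"
}

With the flag left at its default of false, neither new task runs and the two new File? workflow outputs remain undefined.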
scripts/variantstore/wdl/GvsExtractCallset.wdl (184 additions, 0 deletions)
@@ -24,6 +24,7 @@ workflow GvsExtractCallset {
Int? disk_override
Boolean bgzip_output_vcfs = false
Boolean zero_pad_output_vcf_filenames = true
Boolean collect_variant_calling_metrics = false

# set to "NONE" if all the reference data was loaded into GVS in GvsImportGenomes
String drop_state = "NONE"
@@ -60,6 +61,9 @@ workflow GvsExtractCallset {
File reference_dict = "gs://gcp-public-data--broad-references/hg38/v0/Homo_sapiens_assembly38.dict"
File reference_index = "gs://gcp-public-data--broad-references/hg38/v0/Homo_sapiens_assembly38.fasta.fai"

File dbsnp_vcf = "gs://gcp-public-data--broad-references/hg38/v0/Homo_sapiens_assembly38.dbsnp138.vcf"
File dbsnp_vcf_index = "gs://gcp-public-data--broad-references/hg38/v0/Homo_sapiens_assembly38.dbsnp138.vcf.idx"

String fq_gvs_dataset = "~{project_id}.~{dataset_name}"
String fq_cohort_dataset = "~{cohort_project_id}.~{cohort_dataset_name}"

@@ -258,6 +262,30 @@ workflow GvsExtractCallset {
target_interval_list = target_interval_list,
}

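# Collect metrics for this shard; the call is conditional, so its outputs are File? and are
# gathered with select_all after the scatter completes.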
if (collect_variant_calling_metrics) {
call CollectVariantCallingMetrics as CollectMetricsSharded {
input:
input_vcf = ExtractTask.output_vcf,
input_vcf_index = ExtractTask.output_vcf_index,
metrics_filename_prefix = call_set_identifier + "." + i,
dbsnp_vcf = dbsnp_vcf,
dbsnp_vcf_index = dbsnp_vcf_index,
interval_list = SplitIntervals.interval_files[i],
ref_dict = reference_dict,
gatk_docker = effective_gatk_docker
}
}
}

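# Combine the per-shard metrics into a single summary/detail pair; select_all unwraps the
# optional outputs produced by the conditional call inside the scatter.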
if (collect_variant_calling_metrics) {
call GatherVariantCallingMetrics {
input:
input_details = select_all(CollectMetricsSharded.detail_metrics_file),
input_summaries = select_all(CollectMetricsSharded.summary_metrics_file),
output_prefix = call_set_identifier,
output_gcs_dir = output_gcs_dir,
gatk_docker = effective_gatk_docker
}
}

call SumBytes {
@@ -300,6 +328,8 @@
Float total_vcfs_size_mb = SumBytes.total_mb
File manifest = CreateManifestAndOptionallyCopyOutputs.manifest
File sample_name_list = GenerateSampleListFile.sample_name_list
File? summary_metrics_file = GatherVariantCallingMetrics.summary_metrics_file
File? detail_metrics_file = GatherVariantCallingMetrics.detail_metrics_file
String recorded_git_hash = effective_git_hash
Boolean done = true
}
@@ -634,3 +664,157 @@ task GenerateSampleListFile {
cpu: 1
}
}

task CollectVariantCallingMetrics {
input {
File input_vcf
File input_vcf_index
File dbsnp_vcf
File dbsnp_vcf_index
File interval_list
File ref_dict
String metrics_filename_prefix

Int memory_mb = 7500
Int disk_size_gb = ceil(2*size(input_vcf, "GiB")) + 200
String gatk_docker
}

File monitoring_script = "gs://gvs_quickstart_storage/cromwell_monitoring_script.sh"

Int command_mem = memory_mb - 1000
Int max_heap = memory_mb - 500

command <<<
# Prepend date, time and pwd to xtrace log entries.
PS4='\D{+%F %T} \w $ '
set -o errexit -o nounset -o pipefail -o xtrace

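# Run the monitoring script in the background so resource usage is captured in monitoring.log.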
bash ~{monitoring_script} > monitoring.log &

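# Size the JVM below the container allocation: initial heap is memory_mb - 1000, max heap is memory_mb - 500.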
gatk --java-options "-Xms~{command_mem}m -Xmx~{max_heap}m" \
CollectVariantCallingMetrics \
--INPUT ~{input_vcf} \
--DBSNP ~{dbsnp_vcf} \
--SEQUENCE_DICTIONARY ~{ref_dict} \
--OUTPUT ~{metrics_filename_prefix} \
--THREAD_COUNT 8 \
--TARGET_INTERVALS ~{interval_list}
>>>

output {
File summary_metrics_file = "~{metrics_filename_prefix}.variant_calling_summary_metrics"
File detail_metrics_file = "~{metrics_filename_prefix}.variant_calling_detail_metrics"
File monitoring_log = "monitoring.log"
}

runtime {
docker: gatk_docker
cpu: 2
memory: "${memory_mb} MiB"
disks: "local-disk ${disk_size_gb} HDD"
bootDiskSizeGb: 15
preemptible: 2
noAddress: true
}
}

task GatherVariantCallingMetrics {

input {
Array[File] input_details
Array[File] input_summaries
String output_prefix
String? output_gcs_dir

Int memory_mb = 3000
Int disk_size_gb = 200
String gatk_docker
}

parameter_meta {
input_details: {
localization_optional: true
}
input_summaries: {
localization_optional: true
}
}

File monitoring_script = "gs://gvs_quickstart_storage/cromwell_monitoring_script.sh"

Int command_mem = memory_mb - 1000
Int max_heap = memory_mb - 500

command <<<
# Prepend date, time and pwd to xtrace log entries.
PS4='\D{+%F %T} \w $ '
set -o errexit -o nounset -o pipefail -o xtrace

# Drop trailing slash if one exists
OUTPUT_GCS_DIR=$(echo ~{output_gcs_dir} | sed 's/\/$//')

bash ~{monitoring_script} > monitoring.log &

input_details_fofn=~{write_lines(input_details)}
input_summaries_fofn=~{write_lines(input_summaries)}

# Jose says:
# Cromwell will fall over if we have it try to localize tens of thousands of files,
# so we manually localize files using gsutil.
# Using gsutil also lets us parallelize the localization, which (as far as we can tell)
# PAPI doesn't do.

# This is here to deal with the JES bug where commands may be run twice
rm -rf metrics

mkdir metrics
RETRY_LIMIT=5

count=0
until cat $input_details_fofn | gsutil -m cp -L cp.log -c -I metrics/; do
sleep 1
((count++)) && ((count >= $RETRY_LIMIT)) && break
done
if [ "$count" -ge "$RETRY_LIMIT" ]; then
echo 'Could not copy all the metrics from the cloud' && exit 1
fi

count=0
until cat $input_summaries_fofn | gsutil -m cp -L cp.log -c -I metrics/; do
sleep 1
((count++)) && ((count >= $RETRY_LIMIT)) && break
done
if [ "$count" -ge "$RETRY_LIMIT" ]; then
echo 'Could not copy all the metrics from the cloud' && exit 1
fi

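# Build one "--INPUT metrics/<prefix>" argument per shard by taking each detail-metrics file's
# basename and stripping the .variant_calling_detail_metrics suffix; AccumulateVariantCallingMetrics
# expects the prefix rather than the full metrics file name.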
INPUT=$(cat $input_details_fofn | rev | cut -d '/' -f 1 | rev | sed s/.variant_calling_detail_metrics//g | awk '{printf("--INPUT metrics/%s ", $1)}')

gatk --java-options "-Xms~{command_mem}m -Xmx~{max_heap}m" \
AccumulateVariantCallingMetrics \
$INPUT \
--OUTPUT ~{output_prefix}

if [ -n "$OUTPUT_GCS_DIR" ]; then
gsutil cp ~{output_prefix}.variant_calling_summary_metrics ${OUTPUT_GCS_DIR}/
gsutil cp ~{output_prefix}.variant_calling_detail_metrics ${OUTPUT_GCS_DIR}/
fi
>>>

runtime {
docker: gatk_docker
cpu: 1
memory: "${memory_mb} MiB"
disks: "local-disk ${disk_size_gb} HDD"
bootDiskSizeGb: 15
preemptible: 1
noAddress: true
}

output {
File summary_metrics_file = "~{output_prefix}.variant_calling_summary_metrics"
File detail_metrics_file = "~{output_prefix}.variant_calling_detail_metrics"
File monitoring_log = "monitoring.log"
}
}
scripts/variantstore/wdl/GvsExtractCohortFromSampleNames.wdl (2 additions, 0 deletions)
@@ -33,6 +33,7 @@ workflow GvsExtractCohortFromSampleNames {
# set to "NONE" if all the reference data was loaded into GVS in GvsImportGenomes
String drop_state = "NONE"
Boolean bgzip_output_vcfs = false
Boolean collect_variant_calling_metrics = false

File? interval_list
Int? extract_preemptible_override
@@ -151,6 +152,7 @@ workflow GvsExtractCohortFromSampleNames {

drop_state = drop_state,
bgzip_output_vcfs = bgzip_output_vcfs,
collect_variant_calling_metrics = collect_variant_calling_metrics,
extract_preemptible_override = extract_preemptible_override,
extract_maxretries_override = extract_maxretries_override,
split_intervals_disk_size_override = split_intervals_disk_size_override,
scripts/variantstore/wdl/GvsJointVariantCalling.wdl (2 additions, 0 deletions)
@@ -20,6 +20,7 @@ workflow GvsJointVariantCalling {
String vcf_index_files_column_name

Boolean bgzip_output_vcfs = false
Boolean collect_variant_calling_metrics = false
String drop_state = "FORTY"
Boolean use_VQSR = false
Boolean use_compressed_references = false
@@ -228,6 +229,7 @@ workflow GvsJointVariantCalling {
do_not_filter_override = extract_do_not_filter_override,
drop_state = drop_state,
bgzip_output_vcfs = bgzip_output_vcfs,
collect_variant_calling_metrics = collect_variant_calling_metrics,
is_wgs = is_wgs,
maximum_alternate_alleles = maximum_alternate_alleles,
target_interval_list = target_interval_list,
scripts/variantstore/wdl/GvsQuickstartIntegration.wdl (2 additions, 0 deletions)
@@ -307,6 +307,7 @@ workflow GvsQuickstartIntegration {
String workspace_bucket = GetToolVersions.workspace_bucket
String submission_id = GetToolVersions.submission_id
String extract_output_gcs_dir = "~{workspace_bucket}/output_vcfs/by_submission_id/~{submission_id}/beta"
Boolean collect_variant_calling_metrics = true

call Utils.CreateDatasetForTest {
input:
@@ -340,6 +341,7 @@ workflow GvsQuickstartIntegration {
vcf_files_column_name = vcf_files_column_name,
vcf_index_files_column_name = vcf_index_files_column_name,
extract_output_gcs_dir = extract_output_gcs_dir,
collect_variant_calling_metrics = collect_variant_calling_metrics,
}

if (!QuickstartBeta.used_tighter_gcp_quotas) {
