diff --git a/scripts/variantstore/wdl/GvsExtractCallset.wdl b/scripts/variantstore/wdl/GvsExtractCallset.wdl
index 81a3d1d2b27..ace00044315 100644
--- a/scripts/variantstore/wdl/GvsExtractCallset.wdl
+++ b/scripts/variantstore/wdl/GvsExtractCallset.wdl
@@ -24,6 +24,7 @@ workflow GvsExtractCallset {
         Int? disk_override
         Boolean bgzip_output_vcfs = false
         Boolean zero_pad_output_vcf_filenames = true
+        Boolean collect_variant_calling_metrics = false
 
         # set to "NONE" if all the reference data was loaded into GVS in GvsImportGenomes
         String drop_state = "NONE"
@@ -60,6 +61,9 @@ workflow GvsExtractCallset {
     File reference_dict = "gs://gcp-public-data--broad-references/hg38/v0/Homo_sapiens_assembly38.dict"
     File reference_index = "gs://gcp-public-data--broad-references/hg38/v0/Homo_sapiens_assembly38.fasta.fai"
 
+    File dbsnp_vcf = "gs://gcp-public-data--broad-references/hg38/v0/Homo_sapiens_assembly38.dbsnp138.vcf"
+    File dbsnp_vcf_index = "gs://gcp-public-data--broad-references/hg38/v0/Homo_sapiens_assembly38.dbsnp138.vcf.idx"
+
     String fq_gvs_dataset = "~{project_id}.~{dataset_name}"
     String fq_cohort_dataset = "~{cohort_project_id}.~{cohort_dataset_name}"
 
@@ -258,6 +262,30 @@ workflow GvsExtractCallset {
                 target_interval_list = target_interval_list,
         }
 
+        if (collect_variant_calling_metrics) {
+            call CollectVariantCallingMetrics as CollectMetricsSharded {
+                input:
+                    input_vcf = ExtractTask.output_vcf,
+                    input_vcf_index = ExtractTask.output_vcf_index,
+                    metrics_filename_prefix = call_set_identifier + "." + i,
+                    dbsnp_vcf = dbsnp_vcf,
+                    dbsnp_vcf_index = dbsnp_vcf_index,
+                    interval_list = SplitIntervals.interval_files[i],
+                    ref_dict = reference_dict,
+                    gatk_docker = effective_gatk_docker
+            }
+        }
+    }
+
+    if (collect_variant_calling_metrics) {
+        call GatherVariantCallingMetrics {
+            input:
+                input_details = select_all(CollectMetricsSharded.detail_metrics_file),
+                input_summaries = select_all(CollectMetricsSharded.summary_metrics_file),
+                output_prefix = call_set_identifier,
+                output_gcs_dir = output_gcs_dir,
+                gatk_docker = effective_gatk_docker
+        }
     }
 
     call SumBytes {
@@ -300,6 +328,8 @@ workflow GvsExtractCallset {
         Float total_vcfs_size_mb = SumBytes.total_mb
         File manifest = CreateManifestAndOptionallyCopyOutputs.manifest
         File sample_name_list = GenerateSampleListFile.sample_name_list
+        File? summary_metrics_file = GatherVariantCallingMetrics.summary_metrics_file
+        File? detail_metrics_file = GatherVariantCallingMetrics.detail_metrics_file
         String recorded_git_hash = effective_git_hash
         Boolean done = true
     }
@@ -634,3 +664,157 @@ task GenerateSampleListFile {
         cpu: 1
     }
 }
+
+task CollectVariantCallingMetrics {
+    input {
+        File input_vcf
+        File input_vcf_index
+        File dbsnp_vcf
+        File dbsnp_vcf_index
+        File interval_list
+        File ref_dict
+        String metrics_filename_prefix
+
+        Int memory_mb = 7500
+        Int disk_size_gb = ceil(2*size(input_vcf, "GiB")) + 200
+        String gatk_docker
+    }
+
+    File monitoring_script = "gs://gvs_quickstart_storage/cromwell_monitoring_script.sh"
+
+    Int command_mem = memory_mb - 1000
+    Int max_heap = memory_mb - 500
+
+    command <<<
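+        # Shard-level metrics: Picard's CollectVariantCallingMetrics (run via
+        # GATK) computes variant calling metrics for this interval shard; the
+        # dbSNP VCF supplies the known-sites set used to classify calls as
+        # known vs. novel.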
+        # Prepend date, time and pwd to xtrace log entries.
+        PS4='\D{+%F %T} \w $ '
+        set -o errexit -o nounset -o pipefail -o xtrace
+
+        bash ~{monitoring_script} > monitoring.log &
+
+        gatk --java-options "-Xms~{command_mem}m -Xmx~{max_heap}m" \
+            CollectVariantCallingMetrics \
+            --INPUT ~{input_vcf} \
+            --DBSNP ~{dbsnp_vcf} \
+            --SEQUENCE_DICTIONARY ~{ref_dict} \
+            --OUTPUT ~{metrics_filename_prefix} \
+            --THREAD_COUNT 8 \
+            --TARGET_INTERVALS ~{interval_list}
+    >>>
+
+    output {
+        File summary_metrics_file = "~{metrics_filename_prefix}.variant_calling_summary_metrics"
+        File detail_metrics_file = "~{metrics_filename_prefix}.variant_calling_detail_metrics"
+        File monitoring_log = "monitoring.log"
+    }
+
+    runtime {
+        docker: gatk_docker
+        cpu: 2
+        memory: "${memory_mb} MiB"
+        disks: "local-disk ${disk_size_gb} HDD"
+        bootDiskSizeGb: 15
+        preemptible: 2
+        noAddress: true
+    }
+}
+
+task GatherVariantCallingMetrics {
+
+    input {
+        Array[File] input_details
+        Array[File] input_summaries
+        String output_prefix
+        String? output_gcs_dir
+
+        Int memory_mb = 3000
+        Int disk_size_gb = 200
+        String gatk_docker
+    }
+
+    parameter_meta {
+        input_details: {
+            localization_optional: true
+        }
+        input_summaries: {
+            localization_optional: true
+        }
+    }
+
+    File monitoring_script = "gs://gvs_quickstart_storage/cromwell_monitoring_script.sh"
+
+    Int command_mem = memory_mb - 1000
+    Int max_heap = memory_mb - 500
+
+    command <<<
+        # Prepend date, time and pwd to xtrace log entries.
+        PS4='\D{+%F %T} \w $ '
+        set -o errexit -o nounset -o pipefail -o xtrace
+
+        # Drop trailing slash if one exists
+        OUTPUT_GCS_DIR=$(echo ~{output_gcs_dir} | sed 's/\/$//')
+
+        bash ~{monitoring_script} > monitoring.log &
+
+        input_details_fofn=~{write_lines(input_details)}
+        input_summaries_fofn=~{write_lines(input_summaries)}
+
+        # Jose says:
+        # Cromwell will fall over if we have it try to localize tens of thousands of files,
+        # so we manually localize files using gsutil.
+        # Using gsutil also lets us parallelize the localization, which (as far as we can tell)
+        # PAPI doesn't do.
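+        # The retry loops below pipe each file-of-filenames to `gsutil -m cp -I`,
+        # which reads source URIs from stdin; `-c` continues past per-file copy
+        # errors and `-L cp.log` records a manifest so re-runs skip files that
+        # already transferred successfully.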
+
+        # This is here to deal with the JES bug where commands may be run twice
+        rm -rf metrics
+        mkdir metrics
+        RETRY_LIMIT=5
+
+        count=0
+        until cat $input_details_fofn | gsutil -m cp -L cp.log -c -I metrics/; do
+            sleep 1
+            ((count++)) && ((count >= $RETRY_LIMIT)) && break
+        done
+        if [ "$count" -ge "$RETRY_LIMIT" ]; then
+            echo 'Could not copy all the metrics from the cloud' && exit 1
+        fi
+
+        count=0
+        until cat $input_summaries_fofn | gsutil -m cp -L cp.log -c -I metrics/; do
+            sleep 1
+            ((count++)) && ((count >= $RETRY_LIMIT)) && break
+        done
+        if [ "$count" -ge "$RETRY_LIMIT" ]; then
+            echo 'Could not copy all the metrics from the cloud' && exit 1
+        fi
+
+        INPUT=$(cat $input_details_fofn | rev | cut -d '/' -f 1 | rev | sed s/.variant_calling_detail_metrics//g | awk '{printf("--INPUT metrics/%s ", $1)}')
+
+        gatk --java-options "-Xms~{command_mem}m -Xmx~{max_heap}m" \
+            AccumulateVariantCallingMetrics \
+            $INPUT \
+            --OUTPUT ~{output_prefix}
+
+        if [ -n "$OUTPUT_GCS_DIR" ]; then
+            gsutil cp ~{output_prefix}.variant_calling_summary_metrics ${OUTPUT_GCS_DIR}/
+            gsutil cp ~{output_prefix}.variant_calling_detail_metrics ${OUTPUT_GCS_DIR}/
+        fi
+    >>>
+
+    runtime {
+        docker: gatk_docker
+        cpu: 1
+        memory: "${memory_mb} MiB"
+        disks: "local-disk ${disk_size_gb} HDD"
+        bootDiskSizeGb: 15
+        preemptible: 1
+        noAddress: true
+    }
+
+    output {
+        File summary_metrics_file = "~{output_prefix}.variant_calling_summary_metrics"
+        File detail_metrics_file = "~{output_prefix}.variant_calling_detail_metrics"
+        File monitoring_log = "monitoring.log"
+    }
+}
\ No newline at end of file
diff --git a/scripts/variantstore/wdl/GvsExtractCohortFromSampleNames.wdl b/scripts/variantstore/wdl/GvsExtractCohortFromSampleNames.wdl
index c2754793bdd..9fed98945a1 100644
--- a/scripts/variantstore/wdl/GvsExtractCohortFromSampleNames.wdl
+++ b/scripts/variantstore/wdl/GvsExtractCohortFromSampleNames.wdl
@@ -33,6 +33,7 @@ workflow GvsExtractCohortFromSampleNames {
         # set to "NONE" if all the reference data was loaded into GVS in GvsImportGenomes
         String drop_state = "NONE"
         Boolean bgzip_output_vcfs = false
+        Boolean collect_variant_calling_metrics = false
 
         File? interval_list
         Int? extract_preemptible_override
@@ -151,6 +152,7 @@ workflow GvsExtractCohortFromSampleNames {
             drop_state = drop_state,
             bgzip_output_vcfs = bgzip_output_vcfs,
+            collect_variant_calling_metrics = collect_variant_calling_metrics,
             extract_preemptible_override = extract_preemptible_override,
             extract_maxretries_override = extract_maxretries_override,
             split_intervals_disk_size_override = split_intervals_disk_size_override,
diff --git a/scripts/variantstore/wdl/GvsJointVariantCalling.wdl b/scripts/variantstore/wdl/GvsJointVariantCalling.wdl
index 9e740184f15..96d70936d0e 100644
--- a/scripts/variantstore/wdl/GvsJointVariantCalling.wdl
+++ b/scripts/variantstore/wdl/GvsJointVariantCalling.wdl
@@ -20,6 +20,7 @@ workflow GvsJointVariantCalling {
         String vcf_index_files_column_name
         Boolean bgzip_output_vcfs = false
+        Boolean collect_variant_calling_metrics = false
         String drop_state = "FORTY"
         Boolean use_VQSR = false
         Boolean use_compressed_references = false
@@ -228,6 +229,7 @@ workflow GvsJointVariantCalling {
            do_not_filter_override = extract_do_not_filter_override,
            drop_state = drop_state,
            bgzip_output_vcfs = bgzip_output_vcfs,
+           collect_variant_calling_metrics = collect_variant_calling_metrics,
            is_wgs = is_wgs,
            maximum_alternate_alleles = maximum_alternate_alleles,
            target_interval_list = target_interval_list,
diff --git a/scripts/variantstore/wdl/GvsQuickstartIntegration.wdl b/scripts/variantstore/wdl/GvsQuickstartIntegration.wdl
index f3491f0cabd..9515fbefe55 100644
--- a/scripts/variantstore/wdl/GvsQuickstartIntegration.wdl
+++ b/scripts/variantstore/wdl/GvsQuickstartIntegration.wdl
@@ -307,6 +307,7 @@ workflow GvsQuickstartIntegration {
     String workspace_bucket = GetToolVersions.workspace_bucket
     String submission_id = GetToolVersions.submission_id
     String extract_output_gcs_dir = "~{workspace_bucket}/output_vcfs/by_submission_id/~{submission_id}/beta"
+    Boolean collect_variant_calling_metrics = true
 
     call Utils.CreateDatasetForTest {
         input:
@@ -340,6 +341,7 @@ workflow GvsQuickstartIntegration {
                 vcf_files_column_name = vcf_files_column_name,
                 vcf_index_files_column_name = vcf_index_files_column_name,
                 extract_output_gcs_dir = extract_output_gcs_dir,
+                collect_variant_calling_metrics = collect_variant_calling_metrics,
         }
 
         if (!QuickstartBeta.used_tighter_gcp_quotas) {
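
Usage sketch: the new flag is off by default everywhere except the Quickstart integration test. Assuming the top-level workflow is run directly with a standard Cromwell inputs JSON (all other required inputs supplied as usual), metrics collection is enabled with:

    {
      "GvsJointVariantCalling.collect_variant_calling_metrics": true
    }

When enabled, the per-shard detail and summary metrics are merged by AccumulateVariantCallingMetrics in GatherVariantCallingMetrics and, if output_gcs_dir is set, the merged files are copied there as well.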