Update Script to Handle Both Files and FOFN [VS-1441] (#8954)
rsasch authored Aug 14, 2024
1 parent a20c360 commit 3a3b90a
Showing 7 changed files with 78 additions and 250 deletions.
4 changes: 2 additions & 2 deletions .dockstore.yml
@@ -126,6 +126,7 @@ workflows:
       branches:
         - master
         - ah_var_store
+        - rsa_vs_1441
       tags:
         - /.*/
   - name: GvsPopulateAltAllele
@@ -428,7 +429,6 @@ workflows:
         - master
         - ah_var_store
         - EchoCallset
-        - rsa_vs_1110
   - name: GvsExtractCallsetPgenMerged
     subclass: WDL
     primaryDescriptorPath: /scripts/variantstore/wdl/GvsExtractCallsetPgenMerged.wdl
@@ -437,7 +437,7 @@ workflows:
         - master
         - ah_var_store
         - EchoCallset
-        - rsa_vs_1110
+        - rsa_vs_1441
   - name: MergePgenHierarchicalWdl
     subclass: WDL
     primaryDescriptorPath: /scripts/variantstore/wdl/MergePgenHierarchical.wdl
1 change: 1 addition & 0 deletions scripts/variantstore/wdl/GvsExtractCallsetPgen.wdl
@@ -298,6 +298,7 @@ workflow GvsExtractCallsetPgen {
         Array[File] output_pgens = PgenExtractTask.output_pgen
         Array[File] output_pvars = PgenExtractTask.output_pvar
         Array[File] output_psams = PgenExtractTask.output_psam
+        Array[File] monitoring_logs = PgenExtractTask.monitoring_log
         File output_pgen_interval_files = SplitIntervalsTarred.interval_files_tar
         Array[String] output_pgen_interval_filenames = SplitIntervalsTarred.interval_filenames
         Float total_pgens_size_mb = SumBytes.total_mb
7 changes: 7 additions & 0 deletions scripts/variantstore/wdl/GvsExtractCallsetPgenMerged.wdl
@@ -150,6 +150,12 @@ workflow GvsExtractCallsetPgenMerged {
         }
     }

+    call Utils.SummarizeTaskMonitorLogs as SummarizeExtractCallsetPgen {
+        input:
+            variants_docker = effective_variants_docker,
+            inputs = GvsExtractCallsetPgen.monitoring_logs,
+    }
+
     output {
         Array[File] merged_pgens = MergePgenWorkflow.pgen_file
         Array[File] merged_pvars = MergePgenWorkflow.pvar_file
@@ -161,6 +167,7 @@ workflow GvsExtractCallsetPgenMerged {
         Float total_pgens_size_mb = GvsExtractCallsetPgen.total_pgens_size_mb
         File manifest = GvsExtractCallsetPgen.manifest
         File? sample_name_list = GvsExtractCallsetPgen.sample_name_list
+        File monitoring_summary = SummarizeExtractCallsetPgen.monitoring_summary
     }

 }
59 changes: 28 additions & 31 deletions scripts/variantstore/wdl/GvsUtils.wdl
@@ -72,7 +72,7 @@ task GetToolVersions {
     # GVS generally uses the smallest `alpine` version of the Google Cloud SDK as it suffices for most tasks, but
     # there are a handful of tasks that require the larger GNU libc-based `slim`.
     String cloud_sdk_slim_docker = "gcr.io/google.com/cloudsdktool/cloud-sdk:435.0.0-slim"
-    String variants_docker = "us-central1-docker.pkg.dev/broad-dsde-methods/gvs/variants:2024-07-16-alpine-0b965b8d2679"
+    String variants_docker = "us-central1-docker.pkg.dev/broad-dsde-methods/gvs/variants:2024-08-13-alpine-fe21da898f54"
     String variants_nirvana_docker = "us.gcr.io/broad-dsde-methods/variantstore:nirvana_2022_10_19"
     String gatk_docker = "us-central1-docker.pkg.dev/broad-dsde-methods/gvs/gatk:2024-07-23-gatkbase-abbe96265d5f"
     String real_time_genomics_docker = "docker.io/realtimegenomics/rtg-tools:latest"
@@ -1190,39 +1190,36 @@ task MergeTsvs {
 }

 task SummarizeTaskMonitorLogs {
-    input {
-        Array[File] inputs
-        String variants_docker
-    }
-    command <<<
-        # Prepend date, time and pwd to xtrace log entries.
-        PS4='\D{+%F %T} \w $ '
-        set -o errexit -o nounset -o pipefail -o xtrace
+    input {
+        Array[File] inputs
+        String variants_docker
+        File log_fofn = write_lines(inputs)
+    }

-        INPUTS="~{sep=" " inputs}"
-        if [[ -z "$INPUTS" ]]; then
-            echo "No monitoring log files found" > monitoring_summary.txt
-        else
-            python3 /app/summarize_task_monitor_logs.py \
-                --input $INPUTS \
-                --output monitoring_summary.txt
-        fi
-    >>>
+    parameter_meta {
+        inputs: {
+            localization_optional: true
+        }
+    }

-    # ------------------------------------------------
-    # Runtime settings:
+    command <<<
+        # Prepend date, time and pwd to xtrace log entries.
+        PS4='\D{+%F %T} \w $ '
+        set -o errexit -o nounset -o pipefail -o xtrace
+
+        python3 /app/summarize_task_monitor_logs.py --fofn_input ~{log_fofn} \
+            --output monitoring_summary.txt
+    >>>
     runtime {
         docker: variants_docker
         memory: "1 GB"
         preemptible: 3
         cpu: "1"
         disks: "local-disk 100 HDD"
     }
     output {
         File monitoring_summary = "monitoring_summary.txt"
     }
 }

 # Note - this task should probably live in GvsCreateFilterSet, but I moved it here when I was refactoring VQSR Classic out of
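Why the FOFN: the task now materializes its `Array[File] inputs` as a file of file names via WDL's built-in `write_lines()`, and `localization_optional: true` keeps the logs themselves from being localized. Below is a minimal Python sketch of the idea — not part of the commit, with hypothetical paths — showing why the handoff stays small no matter how many shards ran:

# Sketch of what write_lines() effectively produces: a small FOFN listing
# one GCS log path per line. Only this file is passed on the command line,
# instead of interpolating every path into argv as the old
# `--input $INPUTS` call did, which could exceed argument-length limits
# at large shard counts.
monitoring_logs = [  # hypothetical paths
    f"gs://workspace-bucket/submission/call-PgenExtractTask/shard-{i}/monitoring.log"
    for i in range(2000)
]
with open("log_fofn.txt", "w") as fofn:
    fofn.write("\n".join(monitoring_logs) + "\n")
print("FOFN lists", len(monitoring_logs), "logs in one small file")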
48 changes: 39 additions & 9 deletions scripts/variantstore/wdl/extract/summarize_task_monitor_logs.py
@@ -2,6 +2,7 @@
 import sys
 import re
 import os
+from google.cloud import storage

 MaxCpu = -100.0
 MaxMem = -100.0
@@ -10,16 +11,41 @@
 MaxDiskPct = -100.0


-def parse_monitoring_log_files(mlog_files, output_file):
+def parse_monitoring_log_files(file_input, fofn_input, output_file):
     with open(output_file, 'w') as output:
         header = f"Total Mem\tMax Mem Used\tMax Mem Used (%)\tTotal Disk\tMax Disk Used\tMax Disk Used (" \
                  f"%)\tTask\tShard\tFile\n"
         output.write(header)

-        for mlog_file in mlog_files:
-            if not os.path.exists(mlog_file):
-                eprint(f"ERROR: File {mlog_file} does not exist")
-            parse_monitoring_log_file(mlog_file, output)
+        if file_input:
+            for mlog_file in file_input:
+                if not os.path.exists(mlog_file):
+                    eprint(f"ERROR: Log file {mlog_file} does not exist.")
+                parse_monitoring_log_file(mlog_file, output)
+        else:
+            if not os.path.exists(fofn_input):
+                eprint(f"ERROR: FOFN file {fofn_input} does not exist.")
+            client = storage.Client()
+            with open(fofn_input) as input:
+                for path in input:
+                    gcs_re = re.compile("^gs://(?P<bucket_name>[^/]+)/(?P<blob_name>.*)$")
+                    match = gcs_re.match(path)
+                    if not match:
+                        raise ValueError(f"'{path}' does not look like a GCS path")
+
+                    if not os.path.exists("temp"):
+                        os.makedirs("temp")
+                    bucket_name, blob_name = match.groups()
+                    bucket = client.get_bucket(bucket_name)
+                    blob = bucket.get_blob(blob_name)
+                    converted_path = "temp/" + blob_name.replace('/', '_')[-100:]
+                    blob.download_to_filename(converted_path)
+                    parse_monitoring_log_file(converted_path, output)
+                    # os.remove(converted_path)
+            input.close()


 def parse_monitoring_log_file(mlog_file, output):
@@ -182,9 +208,13 @@ def eprint(*the_args, **kwargs):
 if __name__ == '__main__':
     parser = argparse.ArgumentParser(allow_abbrev=False,
                                      description='A tool to summarize the output of multiple monitoring logs')

-    parser.add_argument('--input', nargs='+', help='Monitoring log file(s)', required=True)
     parser.add_argument('--output', type=str, help='Output Monitoring log summary file', required=True)
-    args = parser.parse_args()

-    parse_monitoring_log_files(args.input, args.output)
+    file_args = parser.add_mutually_exclusive_group(required=True)
+    file_args.add_argument('--file_input', nargs='+',
+                           help='Monitoring log file(s); script will fail if you pass too many.')
+    file_args.add_argument('--fofn_input', type=str,
+                           help='GCS path to a monitoring log FOFN, 1 GCS log path per line.')
+
+    args = parser.parse_args()
+    parse_monitoring_log_files(args.file_input, args.fofn_input, args.output)
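For reference, a hedged usage sketch of the two resulting modes, using the updated signature from this diff; the file paths are hypothetical, and the FOFN mode assumes google-cloud-storage credentials are available in the environment:

# Direct mode (the pre-existing behavior, now behind --file_input):
# local log files are summarized in place.
parse_monitoring_log_files(
    file_input=["shard-0/monitoring.log", "shard-1/monitoring.log"],
    fofn_input=None,
    output_file="monitoring_summary.txt",
)

# FOFN mode (the new --fofn_input path): each GCS path listed in the FOFN
# is downloaded under temp/ and then summarized.
parse_monitoring_log_files(
    file_input=None,
    fofn_input="log_fofn.txt",
    output_file="monitoring_summary.txt",
)

# Equivalent CLI invocations:
#   python3 summarize_task_monitor_logs.py --file_input a.log b.log --output monitoring_summary.txt
#   python3 summarize_task_monitor_logs.py --fofn_input log_fofn.txt --output monitoring_summary.txt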