Update Script to Handle Both Files and FOFN [VS-1441] #8954

Merged · 8 commits · Aug 14, 2024
4 changes: 2 additions & 2 deletions .dockstore.yml

@@ -126,6 +126,7 @@ workflows:
       branches:
         - master
         - ah_var_store
+        - rsa_vs_1441
       tags:
         - /.*/
   - name: GvsPopulateAltAllele
@@ -428,7 +429,6 @@ workflows:
         - master
         - ah_var_store
         - EchoCallset
-        - rsa_vs_1110
   - name: GvsExtractCallsetPgenMerged
     subclass: WDL
     primaryDescriptorPath: /scripts/variantstore/wdl/GvsExtractCallsetPgenMerged.wdl
@@ -437,7 +437,7 @@ workflows:
         - master
         - ah_var_store
         - EchoCallset
-        - rsa_vs_1110
+        - rsa_vs_1441
   - name: MergePgenHierarchicalWdl
     subclass: WDL
     primaryDescriptorPath: /scripts/variantstore/wdl/MergePgenHierarchical.wdl
1 change: 1 addition & 0 deletions scripts/variantstore/wdl/GvsExtractCallsetPgen.wdl

@@ -298,6 +298,7 @@ workflow GvsExtractCallsetPgen {
         Array[File] output_pgens = PgenExtractTask.output_pgen
         Array[File] output_pvars = PgenExtractTask.output_pvar
         Array[File] output_psams = PgenExtractTask.output_psam
+        Array[File] monitoring_logs = PgenExtractTask.monitoring_log
         File output_pgen_interval_files = SplitIntervalsTarred.interval_files_tar
         Array[String] output_pgen_interval_filenames = SplitIntervalsTarred.interval_filenames
         Float total_pgens_size_mb = SumBytes.total_mb
7 changes: 7 additions & 0 deletions scripts/variantstore/wdl/GvsExtractCallsetPgenMerged.wdl

@@ -150,6 +150,12 @@ workflow GvsExtractCallsetPgenMerged {
         }
     }

+    call Utils.SummarizeTaskMonitorLogs as SummarizeExtractCallsetPgen {
+        input:
+            variants_docker = effective_variants_docker,
+            inputs = GvsExtractCallsetPgen.monitoring_logs,
+    }
+
     output {
         Array[File] merged_pgens = MergePgenWorkflow.pgen_file
         Array[File] merged_pvars = MergePgenWorkflow.pvar_file
@@ -161,6 +167,7 @@ workflow GvsExtractCallsetPgenMerged {
         Float total_pgens_size_mb = GvsExtractCallsetPgen.total_pgens_size_mb
         File manifest = GvsExtractCallsetPgen.manifest
         File? sample_name_list = GvsExtractCallsetPgen.sample_name_list
+        File monitoring_summary = SummarizeExtractCallsetPgen.monitoring_summary
     }

 }
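The merged workflow now surfaces one monitoring summary for the whole PGEN extract. That summary is a tab-separated file whose header is defined in summarize_task_monitor_logs.py (shown later in this diff). A minimal sketch of consuming it downstream, assuming a local copy named monitoring_summary.txt and assuming the percentage column holds a bare number; the 90% threshold is an arbitrary example:

```python
import csv

# Column names match the header written by summarize_task_monitor_logs.py.
with open("monitoring_summary.txt", newline="") as fh:
    for row in csv.DictReader(fh, delimiter="\t"):
        # Hypothetical check: flag shards that came close to their memory ceiling.
        if float(row["Max Mem Used (%)"]) > 90.0:
            print(f"Shard {row['Shard']} of task {row['Task']} is memory-bound")
```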
59 changes: 28 additions & 31 deletions scripts/variantstore/wdl/GvsUtils.wdl

@@ -72,7 +72,7 @@ task GetToolVersions {
     # GVS generally uses the smallest `alpine` version of the Google Cloud SDK as it suffices for most tasks, but
     # there are a handful of tasks that require the larger GNU libc-based `slim`.
     String cloud_sdk_slim_docker = "gcr.io/google.com/cloudsdktool/cloud-sdk:435.0.0-slim"
-    String variants_docker = "us-central1-docker.pkg.dev/broad-dsde-methods/gvs/variants:2024-07-16-alpine-0b965b8d2679"
+    String variants_docker = "us-central1-docker.pkg.dev/broad-dsde-methods/gvs/variants:2024-08-13-alpine-fe21da898f54"
     String variants_nirvana_docker = "us.gcr.io/broad-dsde-methods/variantstore:nirvana_2022_10_19"
     String gatk_docker = "us-central1-docker.pkg.dev/broad-dsde-methods/gvs/gatk:2024-07-23-gatkbase-abbe96265d5f"
     String real_time_genomics_docker = "docker.io/realtimegenomics/rtg-tools:latest"

@@ -1190,39 +1190,36 @@ task MergeTsvs {
 }

 task SummarizeTaskMonitorLogs {
-  input {
-    Array[File] inputs
-    String variants_docker
-  }
-  command <<<
-    # Prepend date, time and pwd to xtrace log entries.
-    PS4='\D{+%F %T} \w $ '
-    set -o errexit -o nounset -o pipefail -o xtrace
-
-    INPUTS="~{sep=" " inputs}"
-    if [[ -z "$INPUTS" ]]; then
-      echo "No monitoring log files found" > monitoring_summary.txt
-    else
-      python3 /app/summarize_task_monitor_logs.py \
-        --input $INPUTS \
-        --output monitoring_summary.txt
-    fi
-  >>>
-  # ------------------------------------------------
-  # Runtime settings:
-  runtime {
-    docker: variants_docker
-    memory: "1 GB"
-    preemptible: 3
-    cpu: "1"
-    disks: "local-disk 100 HDD"
-  }
-  output {
-    File monitoring_summary = "monitoring_summary.txt"
-  }
+    input {
+        Array[File] inputs
+        String variants_docker
+        File log_fofn = write_lines(inputs)
+    }
+
+    parameter_meta {
+        inputs: {
+            localization_optional: true
+        }
+    }
+
+    command <<<
+        # Prepend date, time and pwd to xtrace log entries.
+        PS4='\D{+%F %T} \w $ '
+        set -o errexit -o nounset -o pipefail -o xtrace
+
+        python3 /app/summarize_task_monitor_logs.py --fofn_input ~{log_fofn} \
+            --output monitoring_summary.txt
+    >>>
+
+    runtime {
+        docker: variants_docker
+        memory: "1 GB"
+        preemptible: 3
+        cpu: "1"
+        disks: "local-disk 100 HDD"
+    }
+
+    output {
+        File monitoring_summary = "monitoring_summary.txt"
+    }
 }

 # Note - this task should probably live in GvsCreateFilterSet, but I moved it here when I was refactoring VQSR Classic out of
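The reworked task sidesteps command-line length limits by never localizing the logs at all: `localization_optional: true` keeps Cromwell from downloading the inputs, and WDL's `write_lines(inputs)` materializes their GCS paths into a small FOFN, which is the only file the script receives. A minimal sketch of the path handling the script must then do itself; the regex mirrors the one in the Python diff below, and the sample paths are made up:

```python
import re

# Same bucket/object pattern used in summarize_task_monitor_logs.py.
GCS_RE = re.compile(r"^gs://(?P<bucket_name>[^/]+)/(?P<blob_name>.*)$")

# Hypothetical FOFN contents: one GCS path per line, as write_lines() produces.
fofn_lines = [
    "gs://my-bucket/workflows/shard-0/monitoring.log\n",
    "gs://my-bucket/workflows/shard-1/monitoring.log\n",
]

for line in fofn_lines:
    match = GCS_RE.match(line.strip())  # strip the newline each FOFN line carries
    if not match:
        raise ValueError(f"{line!r} does not look like a GCS path")
    bucket_name, blob_name = match.group("bucket_name"), match.group("blob_name")
    # A real run would download gs://{bucket_name}/{blob_name} via google.cloud.storage here.
    print(bucket_name, blob_name)
```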
48 changes: 39 additions & 9 deletions scripts/variantstore/wdl/extract/summarize_task_monitor_logs.py

@@ -2,6 +2,7 @@
 import sys
 import re
 import os
+from google.cloud import storage

 MaxCpu = -100.0
 MaxMem = -100.0
@@ -10,16 +11,41 @@
 MaxDiskPct = -100.0


-def parse_monitoring_log_files(mlog_files, output_file):
+def parse_monitoring_log_files(file_input, fofn_input, output_file):
     with open(output_file, 'w') as output:
         header = f"Total Mem\tMax Mem Used\tMax Mem Used (%)\tTotal Disk\tMax Disk Used\tMax Disk Used (" \
                  f"%)\tTask\tShard\tFile\n"
         output.write(header)

-        for mlog_file in mlog_files:
-            if not os.path.exists(mlog_file):
-                eprint(f"ERROR: File {mlog_file} does not exist")
-            parse_monitoring_log_file(mlog_file, output)
+        if file_input:
+            for mlog_file in file_input:
+                if not os.path.exists(mlog_file):
+                    eprint(f"ERROR: Log file {mlog_file} does not exist.")
+                parse_monitoring_log_file(mlog_file, output)
+        else:
+            if not os.path.exists(fofn_input):
+                eprint(f"ERROR: FOFN file {fofn_input} does not exist.")
+            client = storage.Client()
+            with open(fofn_input) as input:
+                for path in input:
+                    gcs_re = re.compile("^gs://(?P<bucket_name>[^/]+)/(?P<blob_name>.*)$")
+                    match = gcs_re.match(path)
+                    if not match:
+                        raise ValueError(f"'{path}' does not look like a GCS path")
+
+                    if not os.path.exists("temp"):
+                        os.makedirs("temp")
+                    bucket_name, blob_name = match.groups()
+                    bucket = client.get_bucket(bucket_name)
+                    blob = bucket.get_blob(blob_name)
+                    converted_path = "temp/" + blob_name.replace('/', '_')[-100:]
+                    blob.download_to_filename(converted_path)
+                    parse_monitoring_log_file(converted_path, output)
+                    # os.remove(converted_path)
+            input.close()


 def parse_monitoring_log_file(mlog_file, output):
@@ -182,9 +208,13 @@ def eprint(*the_args, **kwargs):
 if __name__ == '__main__':
     parser = argparse.ArgumentParser(allow_abbrev=False,
                                      description='A tool to summarize the output of multiple monitoring logs')

-    parser.add_argument('--input', nargs='+', help='Monitoring log file(s)', required=True)
     parser.add_argument('--output', type=str, help='Output Monitoring log summary file', required=True)
-    args = parser.parse_args()
-
-    parse_monitoring_log_files(args.input, args.output)
+    file_args = parser.add_mutually_exclusive_group(required=True)
+    file_args.add_argument('--file_input', nargs='+',
+                           help='Monitoring log file(s); script will fail if you pass too many.')
+    file_args.add_argument('--fofn_input', type=str,
+                           help='GCS path to a monitoring log FOFN, 1 GCS log path per line.')
+
+    args = parser.parse_args()
+    parse_monitoring_log_files(args.file_input, args.fofn_input, args.output)
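The mutually exclusive argparse group is what enforces the either/or contract between the two input modes. A quick sketch of the behavior; the argument wiring copies the diff above, and the example paths are invented:

```python
import argparse

parser = argparse.ArgumentParser(allow_abbrev=False)
file_args = parser.add_mutually_exclusive_group(required=True)
file_args.add_argument('--file_input', nargs='+')
file_args.add_argument('--fofn_input', type=str)
parser.add_argument('--output', type=str, required=True)

# FOFN mode: --file_input is left as None, which routes into the GCS branch above.
args = parser.parse_args(['--fofn_input', 'gs://bucket/logs.fofn', '--output', 'summary.txt'])
print(args.file_input, args.fofn_input)  # -> None gs://bucket/logs.fofn

# Passing both flags (or neither) makes argparse exit with a usage error, e.g.:
# error: argument --fofn_input: not allowed with argument --file_input
```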