diff --git a/.dockstore.yml b/.dockstore.yml
index 17f2582676d..91a9e10cf6d 100644
--- a/.dockstore.yml
+++ b/.dockstore.yml
@@ -126,6 +126,7 @@ workflows:
       branches:
         - master
         - ah_var_store
+        - rsa_vs_1441
       tags:
         - /.*/
   - name: GvsPopulateAltAllele
@@ -428,7 +429,6 @@ workflows:
         - master
         - ah_var_store
         - EchoCallset
-        - rsa_vs_1110
   - name: GvsExtractCallsetPgenMerged
     subclass: WDL
     primaryDescriptorPath: /scripts/variantstore/wdl/GvsExtractCallsetPgenMerged.wdl
@@ -437,7 +437,7 @@ workflows:
         - master
         - ah_var_store
         - EchoCallset
-        - rsa_vs_1110
+        - rsa_vs_1441
   - name: MergePgenHierarchicalWdl
     subclass: WDL
     primaryDescriptorPath: /scripts/variantstore/wdl/MergePgenHierarchical.wdl
diff --git a/scripts/variantstore/wdl/GvsExtractCallsetPgen.wdl b/scripts/variantstore/wdl/GvsExtractCallsetPgen.wdl
index c9d6e34f08b..be223d201a3 100644
--- a/scripts/variantstore/wdl/GvsExtractCallsetPgen.wdl
+++ b/scripts/variantstore/wdl/GvsExtractCallsetPgen.wdl
@@ -298,6 +298,7 @@ workflow GvsExtractCallsetPgen {
         Array[File] output_pgens = PgenExtractTask.output_pgen
         Array[File] output_pvars = PgenExtractTask.output_pvar
         Array[File] output_psams = PgenExtractTask.output_psam
+        Array[File] monitoring_logs = PgenExtractTask.monitoring_log
         File output_pgen_interval_files = SplitIntervalsTarred.interval_files_tar
         Array[String] output_pgen_interval_filenames = SplitIntervalsTarred.interval_filenames
         Float total_pgens_size_mb = SumBytes.total_mb
diff --git a/scripts/variantstore/wdl/GvsExtractCallsetPgenMerged.wdl b/scripts/variantstore/wdl/GvsExtractCallsetPgenMerged.wdl
index 2d477466e84..2e6ffbc8e74 100644
--- a/scripts/variantstore/wdl/GvsExtractCallsetPgenMerged.wdl
+++ b/scripts/variantstore/wdl/GvsExtractCallsetPgenMerged.wdl
@@ -150,6 +150,12 @@ workflow GvsExtractCallsetPgenMerged {
         }
     }
 
+    call Utils.SummarizeTaskMonitorLogs as SummarizeExtractCallsetPgen {
+        input:
+            variants_docker = effective_variants_docker,
+            inputs = GvsExtractCallsetPgen.monitoring_logs,
+    }
+
     output {
         Array[File] merged_pgens = MergePgenWorkflow.pgen_file
         Array[File] merged_pvars = MergePgenWorkflow.pvar_file
@@ -161,6 +167,7 @@ workflow GvsExtractCallsetPgenMerged {
         Float total_pgens_size_mb = GvsExtractCallsetPgen.total_pgens_size_mb
        File manifest = GvsExtractCallsetPgen.manifest
        File? sample_name_list = GvsExtractCallsetPgen.sample_name_list
+        File monitoring_summary = SummarizeExtractCallsetPgen.monitoring_summary
     }
 }
diff --git a/scripts/variantstore/wdl/GvsUtils.wdl b/scripts/variantstore/wdl/GvsUtils.wdl
index f8ad63038a0..cf46eb54962 100644
--- a/scripts/variantstore/wdl/GvsUtils.wdl
+++ b/scripts/variantstore/wdl/GvsUtils.wdl
@@ -72,7 +72,7 @@ task GetToolVersions {
     # GVS generally uses the smallest `alpine` version of the Google Cloud SDK as it suffices for most tasks, but
     # there are a handlful of tasks that require the larger GNU libc-based `slim`.
    String cloud_sdk_slim_docker = "gcr.io/google.com/cloudsdktool/cloud-sdk:435.0.0-slim"
-    String variants_docker = "us-central1-docker.pkg.dev/broad-dsde-methods/gvs/variants:2024-07-16-alpine-0b965b8d2679"
+    String variants_docker = "us-central1-docker.pkg.dev/broad-dsde-methods/gvs/variants:2024-08-13-alpine-fe21da898f54"
     String variants_nirvana_docker = "us.gcr.io/broad-dsde-methods/variantstore:nirvana_2022_10_19"
     String gatk_docker = "us-central1-docker.pkg.dev/broad-dsde-methods/gvs/gatk:2024-07-23-gatkbase-abbe96265d5f"
     String real_time_genomics_docker = "docker.io/realtimegenomics/rtg-tools:latest"
@@ -1190,39 +1190,36 @@ task MergeTsvs {
 }
 
 task SummarizeTaskMonitorLogs {
-  input {
-    Array[File] inputs
-    String variants_docker
-  }
-
-  command <<<
-    # Prepend date, time and pwd to xtrace log entries.
-    PS4='\D{+%F %T} \w $ '
-    set -o errexit -o nounset -o pipefail -o xtrace
+    input {
+        Array[File] inputs
+        String variants_docker
+        File log_fofn = write_lines(inputs)
+    }
+    parameter_meta {
+        inputs: {
+            localization_optional: true
+        }
+    }
 
-    INPUTS="~{sep=" " inputs}"
-    if [[ -z "$INPUTS" ]]; then
-      echo "No monitoring log files found" > monitoring_summary.txt
-    else
-      python3 /app/summarize_task_monitor_logs.py \
-        --input $INPUTS \
-        --output monitoring_summary.txt
-    fi
+    command <<<
+        # Prepend date, time and pwd to xtrace log entries.
+        PS4='\D{+%F %T} \w $ '
+        set -o errexit -o nounset -o pipefail -o xtrace
 
-  >>>
+        python3 /app/summarize_task_monitor_logs.py --fofn_input ~{log_fofn} \
+            --output monitoring_summary.txt
+    >>>
 
-  # ------------------------------------------------
-  # Runtime settings:
-  runtime {
-    docker: variants_docker
-    memory: "1 GB"
-    preemptible: 3
-    cpu: "1"
-    disks: "local-disk 100 HDD"
-  }
-  output {
-    File monitoring_summary = "monitoring_summary.txt"
-  }
+    runtime {
+        docker: variants_docker
+        memory: "1 GB"
+        preemptible: 3
+        cpu: "1"
+        disks: "local-disk 100 HDD"
+    }
+    output {
+        File monitoring_summary = "monitoring_summary.txt"
+    }
 }
 
 # Note - this task should probably live in GvsCreateFilterSet, but I moved it here when I was refactoring VQSR Classic out of
diff --git a/scripts/variantstore/wdl/extract/summarize_task_monitor_logs.py b/scripts/variantstore/wdl/extract/summarize_task_monitor_logs.py
index 784427a4979..64b3fe21bbe 100644
--- a/scripts/variantstore/wdl/extract/summarize_task_monitor_logs.py
+++ b/scripts/variantstore/wdl/extract/summarize_task_monitor_logs.py
@@ -2,6 +2,7 @@
 import sys
 import re
 import os
+from google.cloud import storage
 
 MaxCpu = -100.0
 MaxMem = -100.0
@@ -10,16 +11,41 @@
 MaxDiskPct = -100.0
 
 
-def parse_monitoring_log_files(mlog_files, output_file):
+def parse_monitoring_log_files(file_input, fofn_input, output_file):
     with open(output_file, 'w') as output:
         header = f"Total Mem\tMax Mem Used\tMax Mem Used (%)\tTotal Disk\tMax Disk Used\tMax Disk Used (" \
                  f"%)\tTask\tShard\tFile\n"
         output.write(header)
 
-        for mlog_file in mlog_files:
-            if not os.path.exists(mlog_file):
-                eprint(f"ERROR: File {mlog_file} does not exist")
-            parse_monitoring_log_file(mlog_file, output)
+        if file_input:
+            for mlog_file in file_input:
+                if not os.path.exists(mlog_file):
+                    eprint(f"ERROR: Log file {mlog_file} does not exist.")
+                parse_monitoring_log_file(mlog_file, output)
+        else:
+            if not os.path.exists(fofn_input):
+                eprint(f"ERROR: FOFN file {fofn_input} does not exist.")
+            client = storage.Client()
+            with open(fofn_input) as input:
+                for path in input:
+                    gcs_re = re.compile("^gs://(?P<bucket>[^/]+)/(?P<object>.*)$")
+                    match = gcs_re.match(path)
+                    if not match:
+                        raise ValueError(f"'{path}' does not look like a GCS path")
+
+                    if not os.path.exists("temp"):
+                        os.makedirs("temp")
+                    bucket_name, blob_name = match.groups()
+                    bucket = client.get_bucket(bucket_name)
+                    blob = bucket.get_blob(blob_name)
+                    converted_path = "temp/" + blob_name.replace('/', '_')[-100:]
+                    blob.download_to_filename(converted_path)
+                    parse_monitoring_log_file(converted_path, output)
+#                    os.remove(converted_path)
+                input.close()
+
+
+
 
 def parse_monitoring_log_file(mlog_file, output):
@@ -182,9 +208,13 @@ def eprint(*the_args, **kwargs):
 
 if __name__ == '__main__':
     parser = argparse.ArgumentParser(allow_abbrev=False,
                                      description='A tool to summarize the output of multiple monitoring logs')
-
-    parser.add_argument('--input', nargs='+', help='Monitoring log file(s)', required=True)
     parser.add_argument('--output', type=str, help='Output Monitoring log summary file', required=True)
-    args = parser.parse_args()
-    parse_monitoring_log_files(args.input, args.output)
+    file_args = parser.add_mutually_exclusive_group(required=True)
+    file_args.add_argument('--file_input', nargs='+',
+                           help='Monitoring log file(s); script will fail if you pass too many.')
+    file_args.add_argument('--fofn_input', type=str,
+                           help='GCS path to a monitoring log FOFN, 1 GCS log path per line.')
+
+    args = parser.parse_args()
+    parse_monitoring_log_files(args.file_input, args.fofn_input, args.output)
diff --git a/scripts/variantstore/wdl/extract/summarize_task_monitor_logs_from_file.py b/scripts/variantstore/wdl/extract/summarize_task_monitor_logs_from_file.py
deleted file mode 100755
index 6b6f368344b..00000000000
--- a/scripts/variantstore/wdl/extract/summarize_task_monitor_logs_from_file.py
+++ /dev/null
@@ -1,207 +0,0 @@
-import argparse
-import sys
-import re
-import os
-from google.cloud import storage
-
-
-MaxCpu = -100.0
-MaxMem = -100.0
-MaxMemPct = -100.0
-MaxDisk = -100.0
-MaxDiskPct = -100.0
-
-
-def parse_monitoring_log_files(mlog_file, output_file):
-    with open(output_file, 'w') as output, open(mlog_file) as input:
-        header = f"Total Mem\tMax Mem Used\tMax Mem Used (%)\tTotal Disk\tMax Disk Used\tMax Disk Used (" \
-                 f"%)\tTask\tShard\tFile\n"
-        output.write(header)
-
-        client = storage.Client()
-        for path in input:
-            gcs_re = re.compile("^gs://(?P<bucket>[^/]+)/(?P<object>.*)$")
-            match = gcs_re.match(path)
-
-            if not match:
-                raise ValueError(f"'{path}' does not look like a GCS path")
-
-            bucket_name, blob_name = match.groups()
-            bucket = client.get_bucket(bucket_name)
-            blob = bucket.get_blob(blob_name)
-            converted_path = "temp/" + blob_name.replace('/', '_')[-100:]
-            blob.download_to_filename(converted_path)
-            parse_monitoring_log_file(converted_path, output)
-            os.remove(converted_path)
-        input.close()
-
-
-def parse_monitoring_log_file(mlog_file, output):
-    eprint(f"Parsing: {mlog_file}")
-
-    if os.stat(mlog_file).st_size == 0:
-        eprint(f"Skipping zero length file")
-        return
-
-    with open(mlog_file) as ml:
-        advance_to_section(ml, "--- General Information")
-
-        line = ml.readline().rstrip()  # #CPU: 16
-        tokens = line.split(": ")
-        if tokens[0] != '#CPU':
-            eprint(f"ERROR: Line '{line}' does not look as expected. Is this a monitoring_log file?")
-            sys.exit(1)
-        num_cpus = tokens[1]
-        eprint(f"Num CPUs: {num_cpus}")
-
-        line = ml.readline().rstrip()  # Total Memory: 98.25 GiB
-        tokens = line.split(": ")
-        if tokens[0] != 'Total Memory':
-            eprint(f"ERROR: Line '{line}' does not look as expected. Is this a monitoring_log file?")
-            sys.exit(1)
-        subtokens = tokens[1].split()
-        if len(subtokens) != 2:
-            eprint(f"ERROR: Line '{line}' does not look as expected. Is this a monitoring_log file?")
-            sys.exit(1)
-        total_memory = float(subtokens[0])
-        total_memory_units = subtokens[1]
-        eprint(f"Total Memory: {total_memory} {total_memory_units}")
-
-        line = ml.readline().rstrip()  # Total Disk space: 985.000 GiB
-        tokens = line.split(": ")
-        if tokens[0] != 'Total Disk space':
-            eprint(f"ERROR: Line '{line}' does not look as expected. Is this a monitoring_log file?")
-            sys.exit(1)
-        subtokens = tokens[1].split()
-        if len(subtokens) != 2:
-            eprint(f"ERROR: Line '{line}' does not look as expected. Is this a monitoring_log file?")
-            sys.exit(1)
-        total_disk = float(subtokens[0])
-        total_disk_units = subtokens[1]
-        eprint(f"Total Disk space: {total_disk} {total_disk_units}")
-
-        advance_to_section(ml, "--- Runtime Information")
-
-        global MaxCpu
-        MaxCpu = -100.0
-        global MaxMem
-        MaxMem = -100.0
-        global MaxMemPct
-        MaxMemPct = -100.0
-        global MaxDisk
-        MaxDisk = -100.0
-        global MaxDiskPct
-        MaxDiskPct = -100.0
-
-        field_type = 0
-        while line := ml.readline().rstrip():
-            if field_type == 1:
-                parse_cpu_usage_line(line)
-            elif field_type == 2:
-                parse_memory_usage_line(line)
-            elif field_type == 3:
-                parse_disk_usage_line(line)
-            field_type += 1
-            if field_type > 4:
-                field_type = 0
-
-    file_path = os.path.abspath(mlog_file)
-    tokens = file_path.split("/")
-    # Looks like '*/call-IndelsVariantRecalibrator/monitoring.log' for non-sharded
-    # Looks like '*/call-ScoreVariantAnnotationsINDELs/shard-35/monitoring.log' for sharded
-    # Looks like '*/call-MergeVCFs/cacheCopy/monitoring.log' for cached, non-sharded
-    # Looks like '*/call-ExtractFilterTask/shard-0/cacheCopy/monitoring.log' for cached sharded
-    shard = ""
-    index = -2
-    if tokens[index] == "cacheCopy":
-        index = -3
-    if tokens[index].startswith("shard-"):
-        shard = tokens[index][6:]
-        task = tokens[index - 1][5:]  # Strip off the 'call-' prefix
-    else:
-        task = tokens[index][5:]
-
-    summary = f"{total_memory}\t{MaxMem}\t{MaxMemPct}\t{total_disk}\t{MaxDisk}\t{MaxDiskPct}\t{task}\t{shard}\t" \
-              f"{os.path.abspath(mlog_file)}\n"
-    output.write(summary)
-
-
-def parse_cpu_usage_line(line):
-    p = "^\\* CPU usage\\: (\\d+\\.\\d+)%$"  # Looks Like: * CPU usage: 17.6%
-    m = re.match(p, line)
-    if m is not None:
-        cpu = float(m.group(1))
-        global MaxCpu
-        if cpu > MaxCpu:
-            MaxCpu = cpu
-    else:
-        # Check if it's a nan (we see this sometimes at startup)
-        p2 = "^\\* CPU usage\\: -nan\\%$"  # * CPU usage: -nan%
-        m2 = re.match(p2, line)
-        if m2 is None:
-            # Check if it's just empty
-            p3 = "^\\* CPU usage\\: *$"  # * CPU usage:
-            m3 = re.match(p3, line)
-            if m3 is None:
-                eprint(f"ERROR: Line '{line}' does not look like a CPU usage line. Is this a monitoring_log file?")
-                sys.exit(1)
-
-
-def parse_memory_usage_line(line):
-    p = "^\\* Memory usage\\: (\\d+\\.\\d+) \\S+ (\\d+(\\.\\d*)?)\\%$"  # Looks Like: * Memory usage: 1.79 GiB 1.8%
-    m = re.match(p, line)
-    if m is None:
-        eprint(f"ERROR: Line '{line}' does not look like a Memory usage line. Is this a monitoring_log file?")
-        sys.exit(1)
-    mem = float(m.group(1))
-    mem_pct = float(m.group(2))
-    global MaxMem
-    global MaxMemPct
-    if mem > MaxMem:
-        MaxMem = mem
-        MaxMemPct = mem_pct
-
-
-def parse_disk_usage_line(line):
-    p = "^\\* Disk usage\\: (\\d+\\.\\d+) \\S+ (\\d+(\\.\\d*)?)\\%$"  # Looks Like: * Disk usage: 22.000 GiB 3%
-    m = re.match(p, line)
-    if m is None:
-        eprint(f"ERROR: Line '{line}' does not look like a Disk usage line. Is this a monitoring_log file?")
-        sys.exit(1)
-    disk = float(m.group(1))
-    disk_pct = float(m.group(2))
-    global MaxDisk
-    global MaxDiskPct
-    if disk > MaxDisk:
-        MaxDisk = disk
-        MaxDiskPct = disk_pct
-
-
-def advance_to_section(fp, section_header_start):
-    """
-    A method to advance to the runtime block in the file
-    :param fp: The file's file pointer
-    :param section_header_start: The (starting part) of the section header in the file.
-    :return: The nextmost line that is not a header line
-    """
-    while line := fp.readline():
-        if line.startswith(section_header_start):
-            return line
-
-
-def eprint(*the_args, **kwargs):
-    print(*the_args, file=sys.stderr, **kwargs)
-
-
-if __name__ == '__main__':
-    parser = argparse.ArgumentParser(allow_abbrev=False,
-                                     description='A tool to summarize the output of multiple monitoring logs')
-
-    parser.add_argument('--input', type=str, help='Monitoring log FOFN', required=True)
-    parser.add_argument('--output', type=str, help='Output Monitoring log summary file', required=True)
-    args = parser.parse_args()
-
-    if not os.path.exists("temp"):
-        os.makedirs("temp")
-
-    parse_monitoring_log_files(args.input, args.output)
diff --git a/scripts/variantstore/wdl/extract/test_summarize_task_monitor_logs.py b/scripts/variantstore/wdl/extract/test_summarize_task_monitor_logs.py
index 66e033cd2c3..c2e26164ecb 100644
--- a/scripts/variantstore/wdl/extract/test_summarize_task_monitor_logs.py
+++ b/scripts/variantstore/wdl/extract/test_summarize_task_monitor_logs.py
@@ -16,7 +16,7 @@ def test_parse_monitoring_log_files(self):
                        'summarize_task_monitor_logs_test_files/call-MergeVCFs/cacheCopy/monitoring.log',
                        'summarize_task_monitor_logs_test_files/call-SamplesTableDatetimeCheck/monitoring.log']
         with tempfile.NamedTemporaryFile() as actual_output_file:
-            parse_monitoring_log_files(input_files, actual_output_file.name)
+            parse_monitoring_log_files(input_files, '', actual_output_file.name)
             expected_output_file = 'summarize_task_monitor_logs_test_files/expected_monitoring_summary_file.txt'
             with open(actual_output_file.name, 'r') as actual, open(expected_output_file, 'r') as expected:
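
Usage sketch (not part of the diff above; file names and paths below are placeholders, only the flags come from this change): the mutually exclusive --file_input / --fofn_input group means the summarizer can still take logs directly on the command line, or read a FOFN the way the revised SummarizeTaskMonitorLogs task does.

    # Explicit list of local monitoring logs (the old --input style, now --file_input).
    python3 scripts/variantstore/wdl/extract/summarize_task_monitor_logs.py \
        --file_input shard-0/monitoring.log shard-1/monitoring.log \
        --output monitoring_summary.txt

    # FOFN mode: the FOFN lists one gs:// log path per line, and each log is
    # downloaded with google-cloud-storage into temp/ before being parsed.
    python3 scripts/variantstore/wdl/extract/summarize_task_monitor_logs.py \
        --fofn_input monitoring_log_fofn.txt \
        --output monitoring_summary.txt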