"""
Module to handle creating s5 commands. Scans the first few levels of a staging
folder and creates a list of commands for s5cmd to run in parallel. Will save
the commands to a text file. s5cmd can then be called via singularity like:
 >> singularity exec docker://peakcom/s5cmd:v2.2.2 /s5cmd --log error run
 s5_commands.txt
"""

import argparse
import logging
import os
import sys
from glob import glob
from pathlib import Path
from time import time
from typing import List, Optional

from pydantic import Field, model_validator
from pydantic_settings import BaseSettings

# Set log level from env var
LOG_LEVEL = os.getenv("LOG_LEVEL", "WARNING")
logging.basicConfig(level=LOG_LEVEL)


class JobSettings(BaseSettings):
    """Job settings for CreateS5CommandsJob"""

    s3_location: str = Field(
        ..., description="S3 bucket and prefix to upload staging directory to"
    )
    staging_directory: Path = Field(
        ..., description="Location of the staging directory"
    )
    num_of_dir_levels_to_partition: int = Field(
        default=4,
        description=(
            "Will scan the directory tree of the staging folder up to this "
            "number of levels. All of the files and directories this many "
            "levels deep will be used to generate s5cmd upload commands to "
            "run in parallel."
        ),
    )
    s5_commands_file: Optional[Path] = Field(
        default=None,
        validate_default=True,
        description=(
            "Location to save the s5cmd text file to. As default, will save "
            "to the staging folder."
        ),
    )

    @model_validator(mode="after")
    def set_default_commands_file(self):
        """If s5_commands_file is None, will default to saving the file to
        the staging directory."""
        if self.s5_commands_file is None:
            self.s5_commands_file = self.staging_directory / "s5_commands.txt"
        return self


class CreateS5CommandsJob:
    """Job to scan staging folder directory tree and output list of files
    and directories to upload to S3 using s5cmd"""

    def __init__(self, job_settings: JobSettings):
        """
        Class constructor for CreateS5CommandsJob.

        Parameters
        ----------
        job_settings: JobSettings
        """
        self.job_settings = job_settings

    def _map_file_path_to_s3_location(self, file_path: str) -> str:
        """
        Maps a local file path to a s3 object path.

        Parameters
        ----------
        file_path : str
          Example,
          '/stage/ecephys_12345...-10/abc_123'

        Returns
        -------
        str
          Example, 's3://some_bucket/ecephys_12345...-10-10/abc_123'

        """
        # Replace only the first occurrence of the staging prefix so that a
        # directory name deeper in the tree that happens to match is untouched.
        return file_path.replace(
            self.job_settings.staging_directory.as_posix().rstrip("/"),
            self.job_settings.s3_location.rstrip("/"),
            1,
        )

    def _create_file_cp_command(self, file_path: str) -> str:
        """
        Maps a file path to command for s5cmd

        Parameters
        ----------
        file_path : str
          Example,
          '/stage/ecephys_12345...-10/ophys/hello.txt'

        Returns
        -------
        str
          Example,
          'cp "/stage/ecephys_12345...-10/ophys/hello.txt"
          "s3://some_bucket/ecephys_12345...-10/ophys/hello.txt"'

        """
        # Paths are quoted so whitespace in file names survives s5cmd parsing
        return (
            f'cp "{file_path}" '
            f'"{self._map_file_path_to_s3_location(file_path)}"'
        )

    def _create_directory_cp_command(self, directory_path: str) -> str:
        """
        Maps a dir path to command for s5cmd

        Parameters
        ----------
        directory_path : str
          Example,
          '/stage/ecephys_12345...-10/ophys/sub_dir'

        Returns
        -------
        str
          Example,
          'cp "/stage/ecephys_12345...-10/ophys/sub_dir/*"
          "s3://some_bucket/ecephys_12345...-10/ophys/sub_dir/"'

        """
        # '<dir>/*' source with a trailing-slash destination makes s5cmd
        # upload the directory contents recursively under that prefix.
        local_dir = f"{directory_path.rstrip('/')}/*"
        s3_directory = (
            f"{self._map_file_path_to_s3_location(directory_path).rstrip('/')}"
            f"/"
        )
        return f'cp "{local_dir}" "{s3_directory}"'

    def _get_list_of_upload_commands(self) -> List[str]:
        """
        Scans directory tree of the staging folder to generate a list of files
        and subdirectories to upload via s5cmd.

        Returns
        -------
        List[str]
          A list of s5cmd that can be run.

        Raises
        ------
        NotADirectoryError
          If a path at the partition depth is neither a regular file nor a
          directory (e.g. a broken symlink).

        """
        # NOTE(review): glob("*") does not match hidden (dot-prefixed)
        # entries, so such files would be silently skipped — confirm the
        # staging folders never contain them.
        base_path = self.job_settings.staging_directory.as_posix().rstrip("/")
        s5_commands = []
        # Phase 1: files that sit above the partition depth get individual
        # cp commands; directories at these levels are descended into on the
        # next iteration rather than uploaded wholesale.
        for _ in range(self.job_settings.num_of_dir_levels_to_partition):
            base_path = base_path + "/*"
            for sub_path in glob(base_path):
                path = Path(sub_path)
                if os.path.isfile(path.resolve()):
                    s5_commands.append(
                        self._create_file_cp_command(path.as_posix())
                    )
        # Phase 2: at the partition depth, directories become recursive cp
        # commands so the uploads can run in parallel over these partitions.
        base_path = base_path + "/*"
        for sub_path in glob(base_path):
            path = Path(sub_path)
            resolved = path.resolve()
            if os.path.isfile(resolved):
                s5_commands.append(
                    self._create_file_cp_command(path.as_posix())
                )
            elif os.path.isdir(resolved):
                s5_commands.append(
                    self._create_directory_cp_command(path.as_posix())
                )
            else:
                raise NotADirectoryError(
                    f"Possible broken file path: {sub_path}"
                )
        return s5_commands

    def _save_s5_commands_to_file(self, s5_commands: List[str]) -> None:
        """
        Writes list of s5 commands to location defined in configs.

        Parameters
        ----------
        s5_commands : List[str]

        Returns
        -------
        None

        """
        # One command per line; s5cmd's `run` subcommand consumes this file.
        with open(self.job_settings.s5_commands_file, "w") as f:
            for line in s5_commands:
                f.write(f"{line}\n")

    def run_job(self) -> None:
        """
        Runs job.
        - Scans staging directory to generate list of s5 commands
        - Saves commands to text file that can be used by s5cmd.

        Returns
        -------
        None

        """
        job_start_time = time()
        list_of_upload_commands = self._get_list_of_upload_commands()
        self._save_s5_commands_to_file(list_of_upload_commands)
        job_end_time = time()
        execution_time = job_end_time - job_start_time
        logging.debug(f"Task took {execution_time} seconds")


if __name__ == "__main__":
    sys_args = sys.argv[1:]
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "-j",
        "--job-settings",
        required=False,
        type=str,
        help=(
            r"""
            Instead of init args the job settings can optionally be passed in
            as a json string in the command line.
            """
        ),
    )
    cli_args = parser.parse_args(sys_args)
    main_job_settings = JobSettings.model_validate_json(cli_args.job_settings)
    main_job = CreateS5CommandsJob(job_settings=main_job_settings)
    main_job.run_job()
"""Test module for classes and methods in create_s5_commands_job"""

import os
import unittest
from pathlib import Path
from unittest.mock import MagicMock, mock_open, patch

from aind_data_upload_utils.create_s5_commands_job import (
    CreateS5CommandsJob,
    JobSettings,
)

RESOURCES_DIR = Path(os.path.dirname(os.path.realpath(__file__))) / "resources"
SMART_SPIM_DIR = (
    RESOURCES_DIR
    / "example_smartspim_data_set"
    / "SmartSPIM_695464_2023-10-18_20-30-30"
)


class TestJobSettings(unittest.TestCase):
    """
    Tests for JobSettings class
    """

    def test_class_constructor(self):
        """Tests that job settings can be constructed from serialized json."""
        job_settings = JobSettings(
            s3_location="s3://some_bucket/some_prefix",
            staging_directory="stage",
        )
        deserialized_settings = job_settings.model_validate_json(
            job_settings.model_dump_json()
        )
        self.assertEqual(job_settings, deserialized_settings)
        self.assertEqual(
            Path("stage") / "s5_commands.txt", job_settings.s5_commands_file
        )


class TestCreateS5CommandsJob(unittest.TestCase):
    """Tests CreateS5CommandsJob class"""

    @classmethod
    def setUpClass(cls) -> None:
        """Sets up class with example settings and expected s5 commands."""
        cls.example_job = CreateS5CommandsJob(
            job_settings=JobSettings(
                s3_location="s3://some_bucket/some_prefix",
                staging_directory=SMART_SPIM_DIR,
                num_of_dir_levels_to_partition=2,
            )
        )
        src = SMART_SPIM_DIR.as_posix()
        dst = "s3://some_bucket/some_prefix"
        # Files at or above the partition depth (2 levels): each is expected
        # to get an individual cp command.
        file_rel_paths = [
            "instrument.json",
            "SmartSPIM/nohup.out",
            "derivatives/ASI_logging.txt",
            "derivatives/DarkMaster_cropped.tif",
            "derivatives/FinalReport.txt",
            "derivatives/Flat_488_Ch0_0.tif",
            "derivatives/Flat_488_Ch0_1.tif",
            "derivatives/Flat_561_Ch1_0.tif",
            "derivatives/Flat_561_Ch1_1.tif",
            "derivatives/Flat_639_Ch2_0.tif",
            "derivatives/Flat_639_Ch2_1.tif",
            "derivatives/TileSettings.ini",
            "derivatives/metadata.json",
            "derivatives/metadata.txt",
            "derivatives/processing_manifest.json",
            "SmartSPIM/Ex_488_Em_525/some_file_here.txt",
        ]
        # Directories at the partition depth: each is expected to get a
        # recursive '<dir>/*' -> '<prefix>/' cp command.
        tiles = ["471320", "503720", "536120", "568520"]
        dir_rel_paths = [
            f"SmartSPIM/{channel}/{tile}"
            for channel in ["Ex_488_Em_525", "Ex_561_Em_600", "Ex_639_Em_680"]
            for tile in tiles
        ] + [
            f"derivatives/{channel}/{tile}"
            for channel in ["Ex_488_Em_525_MIP", "Ex_561_Em_600_MIP"]
            for tile in tiles
        ]
        cls.expected_commands = [
            f'cp "{src}/{rel}" "{dst}/{rel}"' for rel in file_rel_paths
        ] + [
            f'cp "{src}/{rel}/*" "{dst}/{rel}/"' for rel in dir_rel_paths
        ]

    def test_map_file_path_to_s3_location(self):
        """Tests _map_file_path_to_s3_location"""
        file_path_1 = (
            self.example_job.job_settings.staging_directory / "hello.txt"
        )
        file_path_2 = (
            self.example_job.job_settings.staging_directory
            / "abc"
            / "hello.txt"
        )
        dir_path_1 = (
            self.example_job.job_settings.staging_directory / "abc" / "def"
        )
        s3_location_1 = self.example_job._map_file_path_to_s3_location(
            file_path_1.as_posix()
        )
        s3_location_2 = self.example_job._map_file_path_to_s3_location(
            file_path_2.as_posix()
        )
        s3_location_3 = self.example_job._map_file_path_to_s3_location(
            dir_path_1.as_posix()
        )
        self.assertEqual(
            "s3://some_bucket/some_prefix/hello.txt", s3_location_1
        )
        self.assertEqual(
            "s3://some_bucket/some_prefix/abc/hello.txt", s3_location_2
        )
        self.assertEqual("s3://some_bucket/some_prefix/abc/def", s3_location_3)

    def test_create_file_cp_command(self):
        """Tests _create_file_cp_command"""
        file_path_1 = (
            self.example_job.job_settings.staging_directory / "hello.txt"
        )
        file_path_2 = (
            self.example_job.job_settings.staging_directory
            / "abc"
            / "hello.txt"
        )
        command_1 = self.example_job._create_file_cp_command(
            file_path_1.as_posix()
        )
        command_2 = self.example_job._create_file_cp_command(
            file_path_2.as_posix()
        )
        expected_command_1 = (
            f'cp "{file_path_1.as_posix()}" '
            f'"s3://some_bucket/some_prefix/hello.txt"'
        )
        expected_command_2 = (
            f'cp "{file_path_2.as_posix()}" '
            f'"s3://some_bucket/some_prefix/abc/hello.txt"'
        )
        self.assertEqual(expected_command_1, command_1)
        self.assertEqual(expected_command_2, command_2)

    def test_create_directory_cp_command(self):
        """Tests _create_directory_cp_command"""
        dir_path_1 = self.example_job.job_settings.staging_directory / "abc"
        dir_path_2 = (
            self.example_job.job_settings.staging_directory / "abc" / "def"
        )
        command_1 = self.example_job._create_directory_cp_command(
            dir_path_1.as_posix()
        )
        command_2 = self.example_job._create_directory_cp_command(
            dir_path_2.as_posix()
        )

        expected_command_1 = (
            f'cp "{dir_path_1.as_posix()}/*" '
            f'"s3://some_bucket/some_prefix/abc/"'
        )
        expected_command_2 = (
            f'cp "{dir_path_2.as_posix()}/*" '
            f'"s3://some_bucket/some_prefix/abc/def/"'
        )
        self.assertEqual(expected_command_1, command_1)
        self.assertEqual(expected_command_2, command_2)

    def test_get_list_of_upload_commands(self):
        """Tests _get_list_of_upload_commands against the example smartspim
        data set in the test resources directory."""
        list_of_commands = self.example_job._get_list_of_upload_commands()
        self.assertCountEqual(self.expected_commands, list_of_commands)

    @patch("os.path.isfile")
    @patch("os.path.isdir")
    def test_get_list_of_upload_commands_error(
        self, mock_is_dir: MagicMock, mock_is_file: MagicMock
    ):
        """Tests _get_list_of_upload_commands edge case where path is
        corrupt"""
        # With both checks forced False every path looks like a broken
        # symlink, so the final scan pass must raise.
        mock_is_file.return_value = False
        mock_is_dir.return_value = False
        with self.assertRaises(NotADirectoryError):
            self.example_job._get_list_of_upload_commands()

    @patch("builtins.open", new_callable=mock_open())
    def test_save_s5_commands_to_file(self, mock_write: MagicMock):
        """Tests _save_s5_commands_to_file"""

        self.example_job._save_s5_commands_to_file(self.expected_commands)
        # 36 command writes plus the open/__enter__/__exit__ calls
        self.assertEqual(39, len(mock_write.mock_calls))

    @patch("logging.debug")
    @patch("builtins.open", new_callable=mock_open())
    def test_run_job(self, mock_write: MagicMock, mock_log_debug: MagicMock):
        """Tests run_job"""

        self.example_job.run_job()
        self.assertEqual(39, len(mock_write.mock_calls))
        mock_log_debug.assert_called()


if __name__ == "__main__":
    unittest.main()