Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: trims down configs #12

Merged: 3 commits merged on Sep 14, 2024
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ dynamic = ["version"]

dependencies = [
'dask',
'aind-data-transfer-models==0.8.2'
'aind-data-transfer-models==0.8.4'
]

[project.optional-dependencies]
Expand Down
74 changes: 40 additions & 34 deletions src/aind_data_upload_utils/check_directories_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,58 +12,62 @@
from glob import glob
from pathlib import Path
from time import time
from typing import List, Union
from typing import List, Optional, Union

from aind_data_schema_models.modalities import Modality
from aind_data_schema_models.platforms import Platform
from aind_data_transfer_models.core import BasicUploadJobConfigs
from aind_data_transfer_models.core import ModalityConfigs
from dask import bag as dask_bag
from pydantic import Field, field_validator
from pydantic import BaseModel, Field, field_validator
from pydantic_settings import BaseSettings

# Set log level from env var
LOG_LEVEL = os.getenv("LOG_LEVEL", "WARNING")
logging.basicConfig(level=LOG_LEVEL)


class JobSettings(BaseSettings):
"""Job settings for CheckDirectoriesJob"""
class DirectoriesToCheckConfigs(BaseModel):
"""Basic model needed from BasicUploadConfigs"""

upload_configs: BasicUploadJobConfigs
n_partitions: int = Field(default=20)
num_of_smart_spim_levels: int = Field(default=3)
platform: Platform.ONE_OF
modalities: List[ModalityConfigs] = []
metadata_dir: Optional[Path] = None

@field_validator("upload_configs", mode="before")
@field_validator("modalities", mode="before")
def parse_json_str(
cls, upload_conf: Union[BasicUploadJobConfigs, dict]
) -> BasicUploadJobConfigs:
cls, mod_configs: Union[List[ModalityConfigs], List[dict]]
) -> List[ModalityConfigs]:
"""
Method to ignore computed fields in serialized model, which might
raise validation errors.
Parameters
----------
upload_conf : Union[BasicUploadJobConfigs, dict]
mod_configs : Union[List[ModalityConfigs], List[dict]]

Returns
-------
BasicUploadJobConfigs
List[ModalityConfigs]
"""
# TODO: This should be moved to the BasicUploadJobConfigs class itself
if isinstance(upload_conf, dict):
json_obj = deepcopy(upload_conf)
# Remove s3_prefix computed field
if json_obj.get("s3_prefix") is not None:
del json_obj["s3_prefix"]
# Remove output_folder_name from modalities
if json_obj.get("modalities") is not None:
for modality in json_obj["modalities"]:
if "output_folder_name" in modality:
del modality["output_folder_name"]
return BasicUploadJobConfigs.model_validate_json(
json.dumps(json_obj)
)
else:
return upload_conf
parsed_configs = []
for mod_conf in mod_configs:
if isinstance(mod_conf, dict):
json_obj = deepcopy(mod_conf)
if "output_folder_name" in json_obj:
del json_obj["output_folder_name"]
parsed_configs.append(
ModalityConfigs.model_validate_json(json.dumps(json_obj))
)
else:
parsed_configs.append(mod_conf)
return parsed_configs


class JobSettings(BaseSettings, extra="allow"):
"""Job settings for CheckDirectoriesJob"""

directories_to_check_configs: DirectoriesToCheckConfigs
n_partitions: int = Field(default=20)
num_of_smart_spim_levels: int = Field(default=3)


class CheckDirectoriesJob:
Expand Down Expand Up @@ -113,16 +117,18 @@ def _get_list_of_directories_to_check(self) -> List[Union[Path, str]]:
List[Union[Path, str]]

"""
upload_configs = self.job_settings.upload_configs
dirs_to_check_configs = self.job_settings.directories_to_check_configs
directories_to_check = []
platform = upload_configs.platform
platform = dirs_to_check_configs.platform
# First, check all the json files in the metadata dir
if upload_configs.metadata_dir is not None:
metadata_dir_path = str(upload_configs.metadata_dir).rstrip("/")
if dirs_to_check_configs.metadata_dir is not None:
metadata_dir_path = str(dirs_to_check_configs.metadata_dir).rstrip(
"/"
)
for json_file in glob(f"{metadata_dir_path}/*.json"):
self._check_path(Path(json_file).as_posix())
# Next add modality directories
for modality_config in upload_configs.modalities:
for modality_config in dirs_to_check_configs.modalities:
modality = modality_config.modality
source_dir = modality_config.source
# We'll handle SmartSPIM differently and partition 3 levels deep
Expand Down
20 changes: 6 additions & 14 deletions tests/test_check_directories_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,11 @@

from aind_data_schema_models.modalities import Modality
from aind_data_schema_models.platforms import Platform
from aind_data_transfer_models.core import (
BasicUploadJobConfigs,
ModalityConfigs,
)
from aind_data_transfer_models.core import ModalityConfigs

from aind_data_upload_utils.check_directories_job import (
CheckDirectoriesJob,
DirectoriesToCheckConfigs,
JobSettings,
)

Expand All @@ -33,8 +31,7 @@ class TestJobSettings(unittest.TestCase):
@classmethod
def setUpClass(cls) -> None:
"""Sets up class with example upload configs"""
example_upload_configs = BasicUploadJobConfigs(
project_name="SmartSPIM",
example_upload_configs = DirectoriesToCheckConfigs(
platform=Platform.SMARTSPIM,
modalities=[
ModalityConfigs(
Expand All @@ -52,16 +49,14 @@ def setUpClass(cls) -> None:
modality=Modality.SPIM,
),
],
subject_id="12345",
acq_datetime="2020-10-10T01:01:01",
metadata_dir=(RESOURCES_DIR / "metadata_dir").as_posix(),
)
cls.example_upload_configs = example_upload_configs

def test_class_constructor(self):
"""Tests that job settings can be constructed from serialized json."""
upload_configs = self.example_upload_configs
job_settings = JobSettings(upload_configs=upload_configs)
job_settings = JobSettings(directories_to_check_configs=upload_configs)
deserialized_settings = job_settings.model_validate_json(
job_settings.model_dump_json()
)
Expand All @@ -74,8 +69,7 @@ class TestCheckDirectoriesJob(unittest.TestCase):
@classmethod
def setUpClass(cls) -> None:
"""Sets up class with example settings"""
example_upload_configs = BasicUploadJobConfigs(
project_name="SmartSPIM",
example_upload_configs = DirectoriesToCheckConfigs(
platform=Platform.SMARTSPIM,
modalities=[
ModalityConfigs(
Expand All @@ -93,13 +87,11 @@ def setUpClass(cls) -> None:
modality=Modality.SPIM,
),
],
subject_id="12345",
acq_datetime="2020-10-10T01:01:01",
metadata_dir=(RESOURCES_DIR / "metadata_dir").as_posix(),
)
cls.example_job = CheckDirectoriesJob(
job_settings=JobSettings(
upload_configs=example_upload_configs,
directories_to_check_configs=example_upload_configs,
num_of_smart_spim_levels=2,
)
)
Expand Down
Loading