diff --git a/src/aind_behavior_services/launcher/_service.py b/src/aind_behavior_services/launcher/_service.py index 6808624..dd38164 100644 --- a/src/aind_behavior_services/launcher/_service.py +++ b/src/aind_behavior_services/launcher/_service.py @@ -1,8 +1,15 @@ import logging -from typing import Protocol +from typing import Optional, Protocol -class Service(Protocol): - def validate(self, logger: logging.Logger, *args, **kwargs) -> bool: ... +class IService(Protocol): + """A minimal interface to ensure that services have expected functionality.""" + + def __init__(self, logger: Optional[logging.Logger], *args, **kwargs) -> None: ... + + def validate(self, *args, **kwargs) -> bool: ... def register(self, *args, **kwargs) -> None: ... + + @property + def logger(self) -> Optional[logging.Logger]: ... diff --git a/src/aind_behavior_services/launcher/resource_manager_service.py b/src/aind_behavior_services/launcher/resource_manager_service.py new file mode 100644 index 0000000..536cf90 --- /dev/null +++ b/src/aind_behavior_services/launcher/resource_manager_service.py @@ -0,0 +1,81 @@ +from __future__ import annotations + +import logging +import shutil +from dataclasses import dataclass, field +from typing import Callable, List, Optional +import os + +from aind_behavior_services.launcher._service import IService + + +class ResourceManager(IService): + def __init__( + self, + logger: logging.Logger, + constrains: Optional[List[Constraint]] = None, + ) -> None: + self.constraints = constrains or [] + self._logger = logger + + @property + def logger(self) -> logging.Logger: + return self._logger + + def validate(self, *args, **kwargs) -> bool: + return True + + def register(self, *args, **kwargs) -> None: + pass + + def add_constraint(self, constraint: Constraint) -> None: + self.constraints.append(constraint) + + def remove_constraint(self, constraint: Constraint) -> None: + self.constraints.remove(constraint) + + def evaluate_constraints(self) -> bool: + for constraint in self.constraints: + if not constraint(): + self.logger.error(constraint.on_fail()) + return False + return True + + +@dataclass(frozen=True) +class Constraint: + name: str + constraint: Callable[..., bool] + args: List = field(default_factory=list) + kwargs: dict = field(default_factory=dict) + fail_msg_handler: Optional[Callable[..., str]] = field + + def __call__(self) -> bool | Exception: + return self.constraint(*self.args, **self.kwargs) + + def on_fail(self) -> str: + if self.fail_msg_handler: + return self.fail_msg_handler(*self.args, **self.kwargs) + return f"Constraint {self.name} failed." + + +def available_storage_constraint_factory(drive: str = "C:\\", min_bytes: float = 2e11) -> Constraint: + return Constraint( + name="available_storage", + constraint=lambda drive, min_bytes: shutil.disk_usage(drive).free >= min_bytes, + args=[], + kwargs={"drive": drive, "min_bytes": min_bytes}, + fail_msg_handler=lambda drive, + min_bytes: f"Drive {drive} does not have enough space. Minimum required: {min_bytes} bytes.", + ) + + +def remote_dir_exists_constraint_factory(dir_path: os.PathLike) -> Constraint: + return Constraint( + name="remote_dir_exists", + constraint=lambda dir_path: os.path.exists(dir_path), + args=[], + kwargs={"dir_path": dir_path}, + fail_msg_handler=lambda dir_path: f"Directory {dir_path} does not exist.", + ) + diff --git a/src/aind_behavior_services/launcher/services.py b/src/aind_behavior_services/launcher/services.py index 39780b6..8c739c6 100644 --- a/src/aind_behavior_services/launcher/services.py +++ b/src/aind_behavior_services/launcher/services.py @@ -1,19 +1,22 @@ import logging from typing import Optional, Self, TypeVar, Union +import aind_behavior_services.launcher.resource_manager_service as resource_manager_service import aind_behavior_services.launcher.watchdog_service as watchdog_service -SupportedServices = Union[watchdog_service.WatchdogService] +SupportedServices = Union[watchdog_service.WatchdogService, resource_manager_service.ResourceManager] TService = TypeVar("TService", bound=SupportedServices) class Services: _watchdog: Optional[watchdog_service.WatchdogService] _logger: Optional[logging.Logger] + _resource_manager: Optional[resource_manager_service.ResourceManager] def __init__(self, logger: Optional[logging.Logger] = None): self._watchdog = None self._logger = logger + self._resource_manager = None @property def logger(self) -> logging.Logger: @@ -38,12 +41,25 @@ def register_watchdog(self, watchdog: watchdog_service.WatchdogService) -> Self: self._watchdog = watchdog return self + @property + def resource_manager(self) -> Optional[resource_manager_service.ResourceManager]: + return self._resource_manager + + def register_resource_manager(self, resource_manager: resource_manager_service.ResourceManager) -> Self: + if self._resource_manager is not None: + raise ValueError("Resource manager already registered") + self._resource_manager = resource_manager + return self + def register(self, service: TService) -> Self: if isinstance(service, watchdog_service.WatchdogService): return self.register_watchdog(service) - raise ValueError(f"Unsupported service: {service}") + elif isinstance(service, resource_manager_service.ResourceManager): + return self.register_resource_manager(service) + else: + raise ValueError(f"Unsupported service: {service}") def validate_service(self, obj: Optional[TService]) -> bool: if obj is None: raise ValueError("Service not set") - return obj.validate(self.logger) + return obj.validate() diff --git a/src/aind_behavior_services/launcher/watchdog_service.py b/src/aind_behavior_services/launcher/watchdog_service.py index 4f15497..db6a677 100644 --- a/src/aind_behavior_services/launcher/watchdog_service.py +++ b/src/aind_behavior_services/launcher/watchdog_service.py @@ -9,22 +9,30 @@ import aind_behavior_services.aind_services.watchdog as watchdog from aind_behavior_services.aind_services.data_mapper import AdsSession -from aind_behavior_services.launcher._service import Service +from aind_behavior_services.launcher._service import IService from aind_behavior_services.session import AindBehaviorSessionModel from aind_behavior_services.utils import format_datetime TSession = TypeVar("TSession", bound=AindBehaviorSessionModel) -class WatchdogService(watchdog.Watchdog, Service): +class WatchdogService(watchdog.Watchdog, IService): + def __init__(self, logger: Logger, *args, **kwargs): + super().__init__(*args, **kwargs) + self._logger = logger + + @property + def logger(self) -> Logger: + return self._logger + def post_run_hook_routine( self, - logger: Logger, session_schema: TSession, ads_session: AdsSession, remote_path: PathLike, session_directory: PathLike, ): + logger = self.logger try: if not self.is_running(): logger.warning("Watchdog service is not running. Attempting to start it.") @@ -62,7 +70,8 @@ def post_run_hook_routine( except (pydantic.ValidationError, ValueError, IOError) as e: logger.error("Failed to create watchdog manifest config. %s", e) - def validate(self, logger: Logger) -> bool: + def validate(self, *args, **kwargs) -> bool: + logger = self.logger logger.info("Watchdog service is enabled.") is_running = True if not self.is_running(): diff --git a/tests/test_launcher_services.py b/tests/test_launcher_services.py new file mode 100644 index 0000000..ad78b63 --- /dev/null +++ b/tests/test_launcher_services.py @@ -0,0 +1,59 @@ +import logging +import unittest + +from aind_behavior_services.launcher import resource_manager_service, watchdog_service + + +class LauncherServicesTests(unittest.TestCase): + + def test_resource_manager_service(self): + + logger = logging.getLogger(__name__) + resource_manager = resource_manager_service.ResourceManager(logger) + + resource_manager.add_constraint( + resource_manager_service.Constraint( + name="test_constraint", + constraint=lambda: True, + fail_msg_handler=lambda: "Constraint failed." + ) + ) + + self.assertTrue(resource_manager.evaluate_constraints()) + + resource_manager.add_constraint( + resource_manager_service.Constraint( + name="test_constraint", + constraint=lambda: False, + fail_msg_handler=lambda: "Constraint failed." + ) + ) + + resource_manager.add_constraint(resource_manager) + self.assertFalse(resource_manager.evaluate_constraints()) + + def test_resource_manager_service_constraint(self): + + constraint = resource_manager_service.Constraint( + name="test_constraint", + constraint=lambda x: x, + fail_msg_handler=lambda: "Constraint failed.", + args=[True] + ) + + self.assertTrue(constraint(), True) + + constraint = resource_manager_service.Constraint( + name="test_constraint", + constraint=lambda x: x, + fail_msg_handler=lambda: "Constraint failed.", + args=[False] + ) + self.assertFalse(constraint(), False) + + def test_watchdog_service(self): + _ = watchdog_service.WatchdogService(logger=logging.getLogger(__name__)) + + +if __name__ == "__main__": + unittest.main()