Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add resource manager service #93

Merged
merged 3 commits into from
Sep 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 10 additions & 3 deletions src/aind_behavior_services/launcher/_service.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,15 @@
import logging
from typing import Protocol
from typing import Optional, Protocol


class Service(Protocol):
def validate(self, logger: logging.Logger, *args, **kwargs) -> bool: ...
class IService(Protocol):
"""A minimal interface to ensure that services have expected functionality."""

def __init__(self, logger: Optional[logging.Logger], *args, **kwargs) -> None: ...

def validate(self, *args, **kwargs) -> bool: ...

def register(self, *args, **kwargs) -> None: ...

@property
def logger(self) -> Optional[logging.Logger]: ...
81 changes: 81 additions & 0 deletions src/aind_behavior_services/launcher/resource_manager_service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
from __future__ import annotations

import logging
import shutil
from dataclasses import dataclass, field
from typing import Callable, List, Optional
import os

from aind_behavior_services.launcher._service import IService


class ResourceManager(IService):
def __init__(
self,
logger: logging.Logger,
constrains: Optional[List[Constraint]] = None,
) -> None:
self.constraints = constrains or []
self._logger = logger

@property
def logger(self) -> logging.Logger:
return self._logger

def validate(self, *args, **kwargs) -> bool:
return True

def register(self, *args, **kwargs) -> None:
pass

def add_constraint(self, constraint: Constraint) -> None:
self.constraints.append(constraint)

def remove_constraint(self, constraint: Constraint) -> None:
self.constraints.remove(constraint)

def evaluate_constraints(self) -> bool:
for constraint in self.constraints:
if not constraint():
self.logger.error(constraint.on_fail())
return False
return True


@dataclass(frozen=True)
class Constraint:
name: str
constraint: Callable[..., bool]
args: List = field(default_factory=list)
kwargs: dict = field(default_factory=dict)
fail_msg_handler: Optional[Callable[..., str]] = field

def __call__(self) -> bool | Exception:
return self.constraint(*self.args, **self.kwargs)

def on_fail(self) -> str:
if self.fail_msg_handler:
return self.fail_msg_handler(*self.args, **self.kwargs)
return f"Constraint {self.name} failed."


def available_storage_constraint_factory(drive: str = "C:\\", min_bytes: float = 2e11) -> Constraint:
return Constraint(
name="available_storage",
constraint=lambda drive, min_bytes: shutil.disk_usage(drive).free >= min_bytes,
args=[],
kwargs={"drive": drive, "min_bytes": min_bytes},
fail_msg_handler=lambda drive,
min_bytes: f"Drive {drive} does not have enough space. Minimum required: {min_bytes} bytes.",
)


def remote_dir_exists_constraint_factory(dir_path: os.PathLike) -> Constraint:
return Constraint(
name="remote_dir_exists",
constraint=lambda dir_path: os.path.exists(dir_path),
args=[],
kwargs={"dir_path": dir_path},
fail_msg_handler=lambda dir_path: f"Directory {dir_path} does not exist.",
)

22 changes: 19 additions & 3 deletions src/aind_behavior_services/launcher/services.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,22 @@
import logging
from typing import Optional, Self, TypeVar, Union

import aind_behavior_services.launcher.resource_manager_service as resource_manager_service
import aind_behavior_services.launcher.watchdog_service as watchdog_service

SupportedServices = Union[watchdog_service.WatchdogService]
SupportedServices = Union[watchdog_service.WatchdogService, resource_manager_service.ResourceManager]
TService = TypeVar("TService", bound=SupportedServices)


class Services:
_watchdog: Optional[watchdog_service.WatchdogService]
_logger: Optional[logging.Logger]
_resource_manager: Optional[resource_manager_service.ResourceManager]

def __init__(self, logger: Optional[logging.Logger] = None):
self._watchdog = None
self._logger = logger
self._resource_manager = None

@property
def logger(self) -> logging.Logger:
Expand All @@ -38,12 +41,25 @@ def register_watchdog(self, watchdog: watchdog_service.WatchdogService) -> Self:
self._watchdog = watchdog
return self

@property
def resource_manager(self) -> Optional[resource_manager_service.ResourceManager]:
return self._resource_manager

def register_resource_manager(self, resource_manager: resource_manager_service.ResourceManager) -> Self:
if self._resource_manager is not None:
raise ValueError("Resource manager already registered")
self._resource_manager = resource_manager
return self

def register(self, service: TService) -> Self:
if isinstance(service, watchdog_service.WatchdogService):
return self.register_watchdog(service)
raise ValueError(f"Unsupported service: {service}")
elif isinstance(service, resource_manager_service.ResourceManager):
return self.register_resource_manager(service)
else:
raise ValueError(f"Unsupported service: {service}")

def validate_service(self, obj: Optional[TService]) -> bool:
if obj is None:
raise ValueError("Service not set")
return obj.validate(self.logger)
return obj.validate()
17 changes: 13 additions & 4 deletions src/aind_behavior_services/launcher/watchdog_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,22 +9,30 @@

import aind_behavior_services.aind_services.watchdog as watchdog
from aind_behavior_services.aind_services.data_mapper import AdsSession
from aind_behavior_services.launcher._service import Service
from aind_behavior_services.launcher._service import IService
from aind_behavior_services.session import AindBehaviorSessionModel
from aind_behavior_services.utils import format_datetime

TSession = TypeVar("TSession", bound=AindBehaviorSessionModel)


class WatchdogService(watchdog.Watchdog, Service):
class WatchdogService(watchdog.Watchdog, IService):
def __init__(self, logger: Logger, *args, **kwargs):
super().__init__(*args, **kwargs)
self._logger = logger

@property
def logger(self) -> Logger:
return self._logger

def post_run_hook_routine(
self,
logger: Logger,
session_schema: TSession,
ads_session: AdsSession,
remote_path: PathLike,
session_directory: PathLike,
):
logger = self.logger
try:
if not self.is_running():
logger.warning("Watchdog service is not running. Attempting to start it.")
Expand Down Expand Up @@ -62,7 +70,8 @@ def post_run_hook_routine(
except (pydantic.ValidationError, ValueError, IOError) as e:
logger.error("Failed to create watchdog manifest config. %s", e)

def validate(self, logger: Logger) -> bool:
def validate(self, *args, **kwargs) -> bool:
logger = self.logger
logger.info("Watchdog service is enabled.")
is_running = True
if not self.is_running():
Expand Down
59 changes: 59 additions & 0 deletions tests/test_launcher_services.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
import logging
import unittest

from aind_behavior_services.launcher import resource_manager_service, watchdog_service


class LauncherServicesTests(unittest.TestCase):

def test_resource_manager_service(self):

logger = logging.getLogger(__name__)
resource_manager = resource_manager_service.ResourceManager(logger)

resource_manager.add_constraint(
resource_manager_service.Constraint(
name="test_constraint",
constraint=lambda: True,
fail_msg_handler=lambda: "Constraint failed."
)
)

self.assertTrue(resource_manager.evaluate_constraints())

resource_manager.add_constraint(
resource_manager_service.Constraint(
name="test_constraint",
constraint=lambda: False,
fail_msg_handler=lambda: "Constraint failed."
)
)

resource_manager.add_constraint(resource_manager)
self.assertFalse(resource_manager.evaluate_constraints())

def test_resource_manager_service_constraint(self):

constraint = resource_manager_service.Constraint(
name="test_constraint",
constraint=lambda x: x,
fail_msg_handler=lambda: "Constraint failed.",
args=[True]
)

self.assertTrue(constraint(), True)

constraint = resource_manager_service.Constraint(
name="test_constraint",
constraint=lambda x: x,
fail_msg_handler=lambda: "Constraint failed.",
args=[False]
)
self.assertFalse(constraint(), False)

def test_watchdog_service(self):
_ = watchdog_service.WatchdogService(logger=logging.getLogger(__name__))


if __name__ == "__main__":
unittest.main()
Loading