Merge pull request #165 from AllenNeuralDynamics/release-v1.5.0
Release v1.5.0
jtyoung84 authored Sep 28, 2024
2 parents 6ac5d08 + b180603 commit 6c1a082
Showing 5 changed files with 279 additions and 4 deletions.
3 changes: 3 additions & 0 deletions Dockerfile
@@ -4,6 +4,9 @@ ADD src ./src
ADD pyproject.toml .
ADD setup.py .

# Add git in case we need to install from branches
RUN apt-get update && apt-get install -y git

# Pip command. Without '-e' flag, index.html isn't found. There's probably a
# better way to add the static html files to the site-packages.
RUN pip install -e .[server] --no-cache-dir
227 changes: 226 additions & 1 deletion docs/source/UserGuide.rst
@@ -8,7 +8,7 @@ from our shared network drives (e.g., VAST) to the cloud.
Prerequisites
-------------

- Its assumed that raw data is already stored and organized on a
- It's assumed that raw data is already stored and organized on a
shared network drive such as VAST.
- The raw data should be organized by modality.

@@ -181,6 +181,231 @@ errors.
Please reach out to Scientific Computing if you think you may need to
customize the Slurm settings.

Session settings for aind-metadata-mapper
-----------------------------------------

There are two methods for adding settings to process session.json files automatically during upload.

1) Creating JobSettings directly and attaching them to the BasicUploadJobConfigs

.. code-block:: python

    import json
    import requests
    from aind_data_transfer_models.core import (
        ModalityConfigs,
        BasicUploadJobConfigs,
        SubmitJobRequest,
    )
    from aind_metadata_mapper.models import SessionSettings, JobSettings as GatherMetadataJobSettings
    from aind_metadata_mapper.bergamo.models import JobSettings as BergamoSessionSettings
    from aind_data_schema_models.modalities import Modality
    from aind_data_schema_models.platforms import Platform
    from datetime import datetime

    acq_datetime = datetime.fromisoformat("2000-01-01T01:11:41")
    bergamo_session_settings = BergamoSessionSettings(
        input_source="/allen/aind/scratch/svc_aind_upload/test_data_sets/bci/061022",
        experimenter_full_name=["John Apple"],
        subject_id="655019",
        imaging_laser_wavelength=920,
        fov_imaging_depth=200,
        fov_targeted_structure="Primary Motor Cortex",
        notes="test upload",
    )
    session_settings = SessionSettings(job_settings=bergamo_session_settings)
    # directory_to_write_to is required, but will be set later by the service.
    # We can set it to "stage" for now.
    metadata_job_settings = GatherMetadataJobSettings(directory_to_write_to="stage", session_settings=session_settings)
    ephys_config = ModalityConfigs(
        modality=Modality.ECEPHYS,
        source=(
            "/allen/aind/scratch/svc_aind_upload/test_data_sets/ecephys/655019_2023-04-03_18-17-07"
        ),
    )
    project_name = "Ephys Platform"
    subject_id = "655019"
    platform = Platform.ECEPHYS
    s3_bucket = "private"
    upload_job_configs = BasicUploadJobConfigs(
        project_name=project_name,
        s3_bucket=s3_bucket,
        platform=platform,
        subject_id=subject_id,
        acq_datetime=acq_datetime,
        modalities=[ephys_config],
        metadata_configs=metadata_job_settings,
    )
    upload_jobs = [upload_job_configs]
    submit_request = SubmitJobRequest(
        upload_jobs=upload_jobs
    )
    post_request_content = json.loads(submit_request.model_dump_json(round_trip=True, exclude_none=True))
    # Uncomment the following to submit the request
    # submit_job_response = requests.post(url="http://aind-data-transfer-service/api/v1/submit_jobs", json=post_request_content)
    # print(submit_job_response.status_code)
    # print(submit_job_response.json())
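
A quick sanity check before submitting (a minimal sketch; it simply pretty-prints the payload built above):

.. code-block:: python

    # Inspect the serialized request body before posting it to the service.
    print(json.dumps(post_request_content, indent=2))
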

2) Using a pre-built settings.json file. You can construct the JobSettings object, save it to a JSON file, and point to that file (a sketch of how such a file might be generated follows the example below).

.. code-block:: python

    import json
    import requests
    import warnings
    from aind_data_transfer_models.core import (
        ModalityConfigs,
        BasicUploadJobConfigs,
        SubmitJobRequest,
    )
    from aind_metadata_mapper.models import SessionSettings, JobSettings as GatherMetadataJobSettings
    from aind_metadata_mapper.bergamo.models import JobSettings as BergamoSessionSettings
    from aind_data_schema_models.modalities import Modality
    from aind_data_schema_models.platforms import Platform
    from datetime import datetime

    acq_datetime = datetime.fromisoformat("2000-01-01T01:11:41")
    metadata_configs_from_file = {
        "session_settings": {
            "job_settings": {
                "user_settings_config_file": "/allen/aind/scratch/svc_aind_upload/test_data_sets/bci/test_bergamo_settings.json",
                "job_settings_name": "Bergamo"
            }
        }
    }
    ephys_config = ModalityConfigs(
        modality=Modality.ECEPHYS,
        source=(
            "/allen/aind/scratch/svc_aind_upload/test_data_sets/ecephys/655019_2023-04-03_18-17-07"
        ),
    )
    project_name = "Ephys Platform"
    subject_id = "655019"
    platform = Platform.ECEPHYS
    s3_bucket = "private"
    upload_job_configs = BasicUploadJobConfigs(
        project_name=project_name,
        s3_bucket=s3_bucket,
        platform=platform,
        subject_id=subject_id,
        acq_datetime=acq_datetime,
        modalities=[ephys_config],
        metadata_configs=metadata_configs_from_file,
    )
    upload_jobs = [upload_job_configs]
    # Because we use a dict, this may raise a pydantic serializer warning.
    # The warning can be suppressed, but doing so isn't required.
    with warnings.catch_warnings():
        warnings.simplefilter("ignore", UserWarning)
        submit_request = SubmitJobRequest(
            upload_jobs=upload_jobs
        )
    post_request_content = json.loads(submit_request.model_dump_json(round_trip=True, exclude_none=True, warnings=False))
    # Uncomment the following to submit the request
    # submit_job_response = requests.post(url="http://aind-data-transfer-service/api/v1/submit_jobs", json=post_request_content)
    # print(submit_job_response.status_code)
    # print(submit_job_response.json())
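
The settings file referenced above can be generated ahead of time. A minimal sketch, assuming the same Bergamo fields as in the first example and a hypothetical output path (write it wherever user_settings_config_file will point):

.. code-block:: python

    from aind_metadata_mapper.bergamo.models import JobSettings as BergamoSessionSettings

    bergamo_session_settings = BergamoSessionSettings(
        input_source="/allen/aind/scratch/svc_aind_upload/test_data_sets/bci/061022",
        experimenter_full_name=["John Apple"],
        subject_id="655019",
        imaging_laser_wavelength=920,
        fov_imaging_depth=200,
        fov_targeted_structure="Primary Motor Cortex",
        notes="test upload",
    )
    # Hypothetical output location; point user_settings_config_file at this path.
    with open("test_bergamo_settings.json", "w") as f:
        f.write(bergamo_session_settings.model_dump_json())
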

Submitting SmartSPIM jobs
-------------------------

SmartSPIM jobs are unique in that the compression step will be performed as a job array. If the directory structure looks like:

.. code-block:: text

    SmartSPIM/
    - Ex_488_Em_525/
      - 471320/
        - 471320_701490
        ...
        - 471320_831090
        ...
      - 568520/
        ...
      ...
    - Ex_639_Em_680/
      ...

Then each "stack" (e.g., 471320_701490) will be processed individually.
If there are 60 stacks, then a good number_of_partitions will be 20.
In this case, the total time for the job will be around 3 times it takes to process one stack.
As a default, the SmartSPIM job will use a number_of_partitions of 10 and distribute the stacks evenly across 10 slurm jobs.
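
As a rough guide, the expected wall time scales with the number of stacks assigned to each Slurm task. A minimal sketch of that arithmetic (illustrative only; the service handles the actual partitioning):

.. code-block:: python

    import math

    n_stacks = 60
    number_of_partitions = 20
    # Stacks are split across the job array, so each Slurm task handles ~3 stacks here,
    # and the overall job takes roughly 3x the time of a single stack.
    stacks_per_task = math.ceil(n_stacks / number_of_partitions)
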
It's possible to customize the number_of_partitions as in the following example:

.. code-block:: python

    import json
    import requests
    from aind_data_transfer_models.core import (
        ModalityConfigs,
        BasicUploadJobConfigs,
        SubmitJobRequest,
    )
    from aind_data_schema_models.modalities import Modality
    from aind_data_schema_models.platforms import Platform
    from aind_slurm_rest.models import V0036JobProperties
    from datetime import datetime

    # Optional settings. The default partition size is 10, but a custom value
    # can be provided as shown here. If partition_size is larger than the
    # number of stacks, this may lead to inefficiencies and errors.
    partition_size: int = 20
    job_props = V0036JobProperties(
        environment=dict(),
        array=f"0-{partition_size-1}"
    )
    acq_datetime = datetime.fromisoformat("2023-10-18T20:30:30")
    spim_config = ModalityConfigs(
        modality=Modality.SPIM,
        slurm_settings=job_props,
        compress_raw_data=True,
        source=(
            "/allen/aind/scratch/svc_aind_upload/test_data_sets/smartspim/"
            "SmartSPIM_695464_2023-10-18_20-30-30"
        ),
    )
    project_name = "MSMA Platform"
    subject_id = "695464"
    platform = Platform.SMARTSPIM
    # can also be set to "open" if writing to the open bucket.
    s3_bucket = "private"
    upload_job_configs = BasicUploadJobConfigs(
        project_name=project_name,
        s3_bucket=s3_bucket,
        platform=platform,
        subject_id=subject_id,
        acq_datetime=acq_datetime,
        modalities=[spim_config],
    )
    # Add more to the list if needed
    upload_jobs = [upload_job_configs]
    # Optional email address and notification types if desired
    submit_request = SubmitJobRequest(
        upload_jobs=upload_jobs,
    )
    post_request_content = json.loads(
        submit_request.model_dump_json(round_trip=True, exclude_none=True)
    )
    # Uncomment the following to submit the request
    # submit_job_response = requests.post(url="http://aind-data-transfer-service/api/v1/submit_jobs", json=post_request_content)
    # print(submit_job_response.status_code)
    # print(submit_job_response.json())

Viewing the status of submitted jobs
------------------------------------

2 changes: 1 addition & 1 deletion pyproject.toml
@@ -21,7 +21,7 @@ dependencies = [
'pydantic>=2.7,<2.9',
'pydantic-settings>=2.0',
'aind-data-schema>=1.0.0',
'aind-data-transfer-models==0.8.4'
'aind_data_transfer_models==0.9.0'
]

[project.optional-dependencies]
2 changes: 1 addition & 1 deletion src/aind_data_transfer_service/__init__.py
@@ -1,7 +1,7 @@
"""Init package"""
import os

__version__ = "1.4.1"
__version__ = "1.5.0"

# Global constants
OPEN_DATA_BUCKET_NAME = os.getenv("OPEN_DATA_BUCKET_NAME", "open")
49 changes: 48 additions & 1 deletion tests/test_server.py
@@ -1611,7 +1611,7 @@ def test_submit_v1_jobs_200_trigger_capsule_configs(
self,
mock_post: MagicMock,
):
"""Tests suubmission when user adds trigger_capsule_configs"""
"""Tests submission when user adds trigger_capsule_configs"""

mock_response = Response()
mock_response.status_code = 200
@@ -1658,6 +1658,53 @@ )
)
self.assertEqual(200, submit_job_response.status_code)

    @patch.dict(os.environ, EXAMPLE_ENV_VAR1, clear=True)
    @patch("requests.post")
    def test_submit_v1_jobs_200_basic_serialization(
        self,
        mock_post: MagicMock,
    ):
        """Tests submission when user posts standard pydantic json"""

        mock_response = Response()
        mock_response.status_code = 200
        mock_response._content = json.dumps({"message": "sent"}).encode(
            "utf-8"
        )
        mock_post.return_value = mock_response
        ephys_source_dir = PurePosixPath("shared_drive/ephys_data/690165")

        s3_bucket = "private"
        subject_id = "690165"
        acq_datetime = datetime(2024, 2, 19, 11, 25, 17)
        platform = Platform.ECEPHYS

        ephys_config = ModalityConfigs(
            modality=Modality.ECEPHYS,
            source=ephys_source_dir,
        )
        project_name = "Ephys Platform"

        upload_job_configs = BasicUploadJobConfigs(
            project_name=project_name,
            s3_bucket=s3_bucket,
            platform=platform,
            subject_id=subject_id,
            acq_datetime=acq_datetime,
            modalities=[ephys_config],
        )

        upload_jobs = [upload_job_configs]
        submit_request = SubmitJobRequest(upload_jobs=upload_jobs)

        post_request_content = json.loads(submit_request.model_dump_json())

        with TestClient(app) as client:
            submit_job_response = client.post(
                url="/api/v1/submit_jobs", json=post_request_content
            )
        self.assertEqual(200, submit_job_response.status_code)


if __name__ == "__main__":
unittest.main()
