Merge pull request #227 from hotosm/drone_imagery_processing
Drone Image Processing using Open Drone Map (ODM)
nrjadkry authored Sep 23, 2024
2 parents 32f3bff + 9e81978 commit 5733871
Showing 9 changed files with 355 additions and 11 deletions.
2 changes: 2 additions & 0 deletions src/backend/app/config.py
@@ -105,6 +105,8 @@ def assemble_db_connection(cls, v: Optional[str], info: ValidationInfo) -> Any:
EMAILS_FROM_EMAIL: Optional[EmailStr] = None
EMAILS_FROM_NAME: Optional[str] = "Drone Tasking Manager"

NODE_ODM_URL: Optional[str] = "http://odm-api:3000"

@computed_field
@property
def emails_enabled(self) -> bool:
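The new NODE_ODM_URL setting defaults to the NodeODM service address on the Docker network. A minimal sketch of a local override — assuming the Settings class is backed by pydantic-settings and reads a .env file, which is outside this diff:

# .env — hypothetical local override; NodeODM's default port is 3000
NODE_ODM_URL=http://localhost:3000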
155 changes: 155 additions & 0 deletions src/backend/app/projects/image_processing.py
@@ -0,0 +1,155 @@
import uuid
import tempfile
import shutil
from pathlib import Path
from pyodm import Node
from app.s3 import get_file_from_bucket, list_objects_from_bucket, add_file_to_bucket
from loguru import logger as log
from concurrent.futures import ThreadPoolExecutor


class DroneImageProcessor:
def __init__(
self,
node_odm_url: str,
project_id: uuid.UUID,
task_id: uuid.UUID,
):
"""
Initializes the connection to the ODM node.
"""
# self.node = Node(node_odm_host, node_odm_port)
self.node = Node.from_url(node_odm_url)
self.project_id = project_id
self.task_id = task_id

    def options_list_to_dict(self, options=None):
"""
Converts options formatted as a list ([{'name': optionName, 'value': optionValue}, ...])
to a dictionary {optionName: optionValue, ...}
"""
opts = {}
if options is not None:
for o in options:
opts[o["name"]] = o["value"]
return opts

def download_object(self, bucket_name: str, obj, images_folder: str):
        if obj.object_name.lower().endswith((".jpg", ".jpeg", ".png")):
local_path = Path(images_folder) / Path(obj.object_name).name
local_path.parent.mkdir(parents=True, exist_ok=True)
get_file_from_bucket(bucket_name, obj.object_name, local_path)

def download_images_from_s3(self, bucket_name, local_dir):
"""
Downloads images from MinIO under the specified path.
:param bucket_name: Name of the MinIO bucket.
:param project_id: The project UUID.
:param task_id: The task UUID.
:param local_dir: Local directory to save the images.
:return: List of local image file paths.
"""
prefix = f"projects/{self.project_id}/{self.task_id}"

objects = list_objects_from_bucket(bucket_name, prefix)

# Process images concurrently
with ThreadPoolExecutor() as executor:
executor.map(
lambda obj: self.download_object(bucket_name, obj, local_dir),
objects,
)

def list_images(self, directory):
"""
Lists all images in the specified directory.
:param directory: The directory containing the images.
:return: List of image file paths.
"""
images = []
path = Path(directory)

for file in path.rglob("*"):
if file.suffix.lower() in {".jpg", ".jpeg", ".png"}:
images.append(str(file))
return images

    def process_new_task(self, images, name=None, options=None, progress_callback=None):
"""
Sends a set of images via the API to start processing.
:param images: List of image file paths.
:param name: Name of the task.
:param options: Processing options ([{'name': optionName, 'value': optionValue}, ...]).
:param progress_callback: Callback function to report upload progress.
:return: The created task object.
"""
        opts = self.options_list_to_dict(options)

        # Always generate a DSM unless the caller explicitly disables it
        opts.setdefault("dsm", True)

task = self.node.create_task(images, opts, name, progress_callback)
return task

def monitor_task(self, task):
"""
Monitors the task progress until completion.
:param task: The task object.
"""
log.info(f"Monitoring task {task.uuid}...")
task.wait_for_completion(interval=5)
log.info("Task completed.")
return task

def download_results(self, task, output_path):
"""
Downloads all results of the task to the specified output path.
:param task: The task object.
:param output_path: The directory where results will be saved.
"""
log.info(f"Downloading results to {output_path}...")
path = task.download_zip(output_path)
log.info("Download completed.")
return path

    def process_images_from_s3(self, bucket_name, name=None, options=None):
        """
        Processes all images under this instance's project/task prefix in MinIO.

        :param bucket_name: Name of the MinIO bucket.
        :param name: Name of the task.
        :param options: Processing options ([{'name': optionName, 'value': optionValue}, ...]).
        :return: The task object.
        """
# Create a temporary directory to store downloaded images
temp_dir = tempfile.mkdtemp()
try:
self.download_images_from_s3(bucket_name, temp_dir)

images_list = self.list_images(temp_dir)

# Start a new processing task
task = self.process_new_task(images_list, name=name, options=options)
# Monitor task progress
self.monitor_task(task)

# Optionally, download results
output_file_path = f"/tmp/{self.project_id}"
path_to_download = self.download_results(task, output_path=output_file_path)

# Upload the results into s3
s3_path = f"projects/{self.project_id}/{self.task_id}/assets.zip"
add_file_to_bucket(bucket_name, path_to_download, s3_path)
return task

        finally:
            # Clean up the temporary directory
            shutil.rmtree(temp_dir)
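For reference, a usage sketch of DroneImageProcessor outside the API, e.g. from a one-off script. The NodeODM URL, bucket name, and IDs are illustrative assumptions:

import uuid

from app.projects.image_processing import DroneImageProcessor

# Hypothetical IDs and bucket name — substitute real values
project_id = uuid.UUID("11111111-1111-1111-1111-111111111111")
task_id = uuid.UUID("22222222-2222-2222-2222-222222222222")

processor = DroneImageProcessor("http://localhost:3000", project_id, task_id)
task = processor.process_images_from_s3(
    "dtm-bucket",
    name=f"DTM-Task-{task_id}",
    options=[{"name": "orthophoto-resolution", "value": 5}],
)
print(task.info().status)  # pyodm task status, e.g. COMPLETED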
75 changes: 74 additions & 1 deletion src/backend/app/projects/project_logic.py
@@ -9,8 +9,16 @@
import shapely.wkb as wkblib
from shapely.geometry import shape
from io import BytesIO
from app.s3 import add_obj_to_bucket
from app.s3 import (
add_obj_to_bucket,
list_objects_from_bucket,
get_presigned_url,
get_object_metadata,
)
from app.config import settings
from app.projects.image_processing import DroneImageProcessor
from app.projects import project_schemas
from minio import S3Error


async def upload_file_to_s3(
@@ -155,3 +163,68 @@ async def preview_split_by_square(boundary: str, meters: int):
meters=meters,
)
)


def process_drone_images(project_id: uuid.UUID, task_id: uuid.UUID):
# Initialize the processor
processor = DroneImageProcessor(settings.NODE_ODM_URL, project_id, task_id)

# Define processing options
options = [
{"name": "dsm", "value": True},
{"name": "orthophoto-resolution", "value": 5},
]

processor.process_images_from_s3(
settings.S3_BUCKET_NAME, name=f"DTM-Task-{task_id}", options=options
)


async def get_project_info_from_s3(project_id: uuid.UUID, task_id: uuid.UUID):
"""
Helper function to get the number of images and the URL to download the assets.
"""
try:
# Prefix for the images
images_prefix = f"projects/{project_id}/{task_id}/images/"

# List and count the images
objects = list_objects_from_bucket(
settings.S3_BUCKET_NAME, prefix=images_prefix
)
image_extensions = (".jpg", ".jpeg", ".png", ".tif", ".tiff")
image_count = sum(
1 for obj in objects if obj.object_name.lower().endswith(image_extensions)
)

# Generate a presigned URL for the assets ZIP file
try:
            # Check if the object exists (the processor uploads results under this key)
            assets_path = f"projects/{project_id}/{task_id}/assets.zip"
get_object_metadata(settings.S3_BUCKET_NAME, assets_path)

# If it exists, generate the presigned URL
presigned_url = get_presigned_url(
settings.S3_BUCKET_NAME, assets_path, expires=3600
)
except S3Error as e:
if e.code == "NoSuchKey":
# The object does not exist
log.info(
f"Assets ZIP file not found for project {project_id}, task {task_id}."
)
presigned_url = None
else:
# An unexpected error occurred
log.error(f"An error occurred while accessing assets file: {e}")
raise HTTPException(status_code=500, detail=str(e))

return project_schemas.AssetsInfo(
project_id=str(project_id),
task_id=str(task_id),
image_count=image_count,
assets_url=presigned_url,
)
except Exception as e:
log.exception(f"An error occurred while retrieving assets info: {e}")
raise HTTPException(status_code=500, detail=str(e))
33 changes: 33 additions & 0 deletions src/backend/app/projects/project_routes.py
@@ -1,4 +1,5 @@
import os
import uuid
from typing import Annotated, Optional
from uuid import UUID
import geojson
@@ -13,6 +14,7 @@
File,
Form,
Response,
BackgroundTasks,
)
from geojson_pydantic import FeatureCollection
from loguru import logger as log
@@ -28,6 +30,7 @@
from app.users.user_schemas import AuthUser
from app.tasks import task_schemas


router = APIRouter(
prefix=f"{settings.API_PREFIX}/projects",
responses={404: {"description": "Not found"}},
@@ -295,3 +298,33 @@ async def read_project(
):
"""Get a specific project and all associated tasks by ID."""
return project


@router.post("/process_imagery/{project_id}/{task_id}/", tags=["Image Processing"])
async def process_imagery(
task_id: uuid.UUID,
project: Annotated[
project_schemas.DbProject, Depends(project_deps.get_project_by_id)
],
user_data: Annotated[AuthUser, Depends(login_required)],
background_tasks: BackgroundTasks,
):
    """Start ODM processing of a task's drone images as a background job."""
    background_tasks.add_task(project_logic.process_drone_images, project.id, task_id)
    return {"message": "Processing started"}


@router.get(
"/assets/{project_id}/{task_id}/",
tags=["Image Processing"],
response_model=project_schemas.AssetsInfo,
)
async def get_assets_info(
project: Annotated[
project_schemas.DbProject, Depends(project_deps.get_project_by_id)
],
task_id: uuid.UUID,
):
"""
Endpoint to get the number of images and the URL to download the assets for a given project and task.
"""
return await project_logic.get_project_info_from_s3(project.id, task_id)
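Together, the two routes give a fire-and-poll workflow. A hedged client sketch using requests — the base URL assumes API_PREFIX is "/api", the IDs are placeholders, and the authentication that login_required enforces is omitted:

import requests

BASE = "http://localhost:8000/api/projects"  # assumes API_PREFIX="/api"
project_id = "11111111-1111-1111-1111-111111111111"
task_id = "22222222-2222-2222-2222-222222222222"

# Queue ODM processing as a FastAPI background task
resp = requests.post(f"{BASE}/process_imagery/{project_id}/{task_id}/")
print(resp.json())  # {"message": "Processing started"}

# Later: fetch the image count and, once ready, a presigned assets URL
resp = requests.get(f"{BASE}/assets/{project_id}/{task_id}/")
print(resp.json())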
7 changes: 7 additions & 0 deletions src/backend/app/projects/project_schemas.py
@@ -457,3 +457,10 @@ class PresignedUrlRequest(BaseModel):
task_id: uuid.UUID
image_name: List[str]
expiry: int # Expiry time in hours


class AssetsInfo(BaseModel):
project_id: str
task_id: str
image_count: int
assets_url: Optional[str]
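A response validated by AssetsInfo might look like the following once processing has finished; until the assets ZIP exists, assets_url is null (all values here are illustrative):

{
  "project_id": "11111111-1111-1111-1111-111111111111",
  "task_id": "22222222-2222-2222-2222-222222222222",
  "image_count": 142,
  "assets_url": "https://s3.example.com/presigned-url-for-assets.zip"
}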
45 changes: 44 additions & 1 deletion src/backend/app/s3.py
@@ -52,7 +52,7 @@ def add_file_to_bucket(bucket_name: str, file_path: str, s3_path: str):
s3_path = f"/{s3_path}"

    client = s3_client()
    # Argument order is (bucket_name, object_name, file_path)
    client.fput_object(bucket_name, s3_path, file_path)


def add_obj_to_bucket(
@@ -167,3 +167,46 @@ def get_image_dir_url(bucket_name: str, image_dir: str):

except Exception as e:
log.error(f"Error checking directory existence: {str(e)}")


def list_objects_from_bucket(bucket_name: str, prefix: str):
"""List all objects in a bucket with a specified prefix.
Args:
bucket_name (str): The name of the S3 bucket.
prefix (str): The prefix to filter objects by.
    Returns:
        Iterator over the objects in the bucket with the specified prefix.
"""
client = s3_client()
objects = client.list_objects(bucket_name, prefix=prefix, recursive=True)
return objects


def get_presigned_url(bucket_name: str, object_name: str, expires: int = 3600):
"""Generate a presigned URL for an object in an S3 bucket.
Args:
bucket_name (str): The name of the S3 bucket.
object_name (str): The name of the object in the bucket.
expires (int, optional): The time in seconds until the URL expires.
Defaults to 3600.
Returns:
str: The presigned URL to access the object.
"""
    from datetime import timedelta

    client = s3_client()
    # minio-py 7.x expects `expires` as a timedelta rather than an int
    return client.presigned_get_object(
        bucket_name, object_name, expires=timedelta(seconds=expires)
    )


def get_object_metadata(bucket_name: str, object_name: str):
"""Get object metadata from an S3 bucket.
Args:
bucket_name (str): The name of the S3 bucket.
object_name (str): The name of the object in the bucket.
    Returns:
        The stat result (size, etag, last-modified, etc.) for the object.
"""
client = s3_client()
return client.stat_object(bucket_name, object_name)
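Taken together, the three new helpers cover the list/stat/presign flow used by get_project_info_from_s3 above. An illustrative sketch, with the bucket name and prefix as assumptions:

from app.s3 import get_object_metadata, get_presigned_url, list_objects_from_bucket

bucket = "dtm-bucket"  # hypothetical
for obj in list_objects_from_bucket(bucket, "projects/"):
    stat = get_object_metadata(bucket, obj.object_name)
    url = get_presigned_url(bucket, obj.object_name, expires=600)
    print(obj.object_name, stat.size, url)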