Reduce executor code for docker (#4438)
* Reduce executor code for docker

* Fix pylint errors and move import/export image

* Fix test and a couple other risky executor calls

* Fix dataclass and return

* Fix test case and add one for corrupt docker

* Add some coverage

* Undo changes to docker manager startup
mdegat01 authored Jul 18, 2023
1 parent 1f940a0 commit 1f92ab4
Showing 34 changed files with 970 additions and 848 deletions.
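
The recurring pattern across these files: methods that used to be synchronous helpers marked "Need run inside executor" become coroutines, and only the genuinely blocking Docker SDK call is dispatched to the executor. A schematic, self-contained sketch of that shape; blocking_docker_call and the class are hypothetical stand-ins, not code from this commit:

import asyncio

def blocking_docker_call() -> None:
    """Hypothetical stand-in for a blocking Docker SDK call."""

class DockerInterfaceSketch:
    """Schematic only; not the real DockerInterface."""

    async def sys_run_in_executor(self, funct, *args):
        return await asyncio.get_running_loop().run_in_executor(None, funct, *args)

    # Before: the whole method was sync and every caller had to wrap it in the executor.
    def run_old(self) -> None:
        blocking_docker_call()

    # After: the method is a coroutine; only the blocking SDK call leaves the event loop.
    async def run(self) -> None:
        await self.sys_run_in_executor(blocking_docker_call)
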
9 changes: 7 additions & 2 deletions supervisor/addons/__init__.py
@@ -434,6 +434,7 @@ async def repair(self) -> None:
async def sync_dns(self) -> None:
"""Sync add-ons DNS names."""
# Update hosts
add_host_coros: list[Awaitable[None]] = []
for addon in self.installed:
try:
if not await addon.instance.is_running():
@@ -448,10 +449,14 @@ async def sync_dns(self) -> None:
)
capture_exception(err)
else:
self.sys_plugins.dns.add_host(
ipv4=addon.ip_address, names=[addon.hostname], write=False
add_host_coros.append(
self.sys_plugins.dns.add_host(
ipv4=addon.ip_address, names=[addon.hostname], write=False
)
)

await asyncio.gather(*add_host_coros)

# Write hosts files
with suppress(CoreDNSError):
self.sys_plugins.dns.write_hosts()
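
For reference, a self-contained sketch of the collect-then-gather pattern adopted above; the add-on and DNS objects are stand-in parameters, and only add_host's keyword names come from the diff:

import asyncio

async def sync_dns_hosts(addons, dns) -> None:
    """Register each running add-on's hostname, then await all registrations together."""
    add_host_coros = [
        dns.add_host(ipv4=addon.ip_address, names=[addon.hostname], write=False)
        for addon in addons
    ]
    await asyncio.gather(*add_host_coros)
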
41 changes: 22 additions & 19 deletions supervisor/backups/backup.py
@@ -421,7 +421,7 @@ async def _addon_restore(addon_slug: str) -> Awaitable[None] | None:
async def store_folders(self, folder_list: list[str]):
"""Backup Supervisor data into backup."""

def _folder_save(name: str):
async def _folder_save(name: str):
"""Take backup of a folder."""
slug_name = name.replace("/", "_")
tar_name = Path(
@@ -434,30 +434,33 @@ def _folder_save(name: str):
_LOGGER.warning("Can't find backup folder %s", name)
return

# Take backup
_LOGGER.info("Backing up folder %s", name)
with SecureTarFile(
tar_name, "w", key=self._key, gzip=self.compressed, bufsize=BUF_SIZE
) as tar_file:
atomic_contents_add(
tar_file,
origin_dir,
excludes=[
bound.bind_mount.local_where.as_posix()
for bound in self.sys_mounts.bound_mounts
if bound.bind_mount.local_where
],
arcname=".",
)

_LOGGER.info("Backup folder %s done", name)
def _save() -> None:
# Take backup
_LOGGER.info("Backing up folder %s", name)
with SecureTarFile(
tar_name, "w", key=self._key, gzip=self.compressed, bufsize=BUF_SIZE
) as tar_file:
atomic_contents_add(
tar_file,
origin_dir,
excludes=[
bound.bind_mount.local_where.as_posix()
for bound in self.sys_mounts.bound_mounts
if bound.bind_mount.local_where
],
arcname=".",
)

_LOGGER.info("Backup folder %s done", name)

await self.sys_run_in_executor(_save)
self._data[ATTR_FOLDERS].append(name)

# Save folder sequential
# avoid issue on slow IO
for folder in folder_list:
try:
await self.sys_run_in_executor(_folder_save, folder)
await _folder_save(folder)
except (tarfile.TarError, OSError) as err:
raise BackupError(
f"Can't backup folder {folder}: {str(err)}", _LOGGER.error
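
A condensed, self-contained sketch of the split introduced above: the coroutine stays on the event loop and only the blocking tar write runs in a worker thread (plain tarfile stands in for SecureTarFile here):

import asyncio
import tarfile
from pathlib import Path

async def folder_save(origin_dir: Path, tar_name: Path) -> None:
    def _save() -> None:
        # Blocking tar I/O only; safe to hand to the executor.
        with tarfile.open(tar_name, "w:gz") as tar_file:
            tar_file.add(origin_dir, arcname=".")

    await asyncio.get_running_loop().run_in_executor(None, _save)
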
2 changes: 1 addition & 1 deletion supervisor/core.py
@@ -130,7 +130,7 @@ async def setup(self):
self._adjust_system_datetime(),
# Load mounts
self.sys_mounts.load(),
# Start docker monitoring
# Load Docker manager
self.sys_docker.load(),
# Load Plugins container
self.sys_plugins.load(),
10 changes: 7 additions & 3 deletions supervisor/coresys.py
@@ -4,6 +4,7 @@
import asyncio
from collections.abc import Callable, Coroutine
from datetime import datetime
from functools import partial
import logging
import os
from types import MappingProxyType
@@ -520,9 +521,12 @@ def now(self) -> datetime:
return datetime.now(get_time_zone(self.timezone) or UTC)

def run_in_executor(
self, funct: Callable[..., T], *args: Any
self, funct: Callable[..., T], *args: tuple[Any], **kwargs: dict[str, Any]
) -> Coroutine[Any, Any, T]:
"""Add an job to the executor pool."""
if kwargs:
funct = partial(funct, **kwargs)

return self.loop.run_in_executor(None, funct, *args)

def create_task(self, coroutine: Coroutine) -> asyncio.Task:
@@ -700,10 +704,10 @@ def now(self) -> datetime:
return self.coresys.now()

def sys_run_in_executor(
self, funct: Callable[..., T], *args: Any
self, funct: Callable[..., T], *args: tuple[Any], **kwargs: dict[str, Any]
) -> Coroutine[Any, Any, T]:
"""Add an job to the executor pool."""
return self.coresys.run_in_executor(funct, *args)
return self.coresys.run_in_executor(funct, *args, **kwargs)

def sys_create_task(self, coroutine: Coroutine) -> asyncio.Task:
"""Create an async task."""
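
The coresys change above lets callers pass keyword arguments through to the executor. Because loop.run_in_executor only takes positional arguments, keywords are bound with functools.partial first; a standalone sketch of the same idea, with a usage line that is illustrative only:

import asyncio
from collections.abc import Callable
from functools import partial
from pathlib import Path
from typing import Any, TypeVar

T = TypeVar("T")

async def run_in_executor(funct: Callable[..., T], *args: Any, **kwargs: Any) -> T:
    """Run a blocking callable in the default executor, forwarding kwargs via partial."""
    if kwargs:
        funct = partial(funct, **kwargs)
    return await asyncio.get_running_loop().run_in_executor(None, funct, *args)

# Usage sketch: await run_in_executor(Path("/etc/hostname").read_text, encoding="utf-8")
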
143 changes: 44 additions & 99 deletions supervisor/docker/addon.py
@@ -1,7 +1,6 @@
"""Init file for Supervisor add-on Docker object."""
from __future__ import annotations

import asyncio
from collections.abc import Awaitable
from contextlib import suppress
from ipaddress import IPv4Address, ip_address
@@ -494,27 +493,25 @@ def mounts(self) -> list[Mount]:

return mounts

def _run(self) -> None:
"""Run Docker image.
Need run inside executor.
"""
if self._is_running():
async def _run(self) -> None:
"""Run Docker image."""
if await self.is_running():
return

# Security check
if not self.addon.protected:
_LOGGER.warning("%s running with disabled protected mode!", self.addon.name)

# Cleanup
self._stop()
await self._stop()

# Don't set a hostname if no separate UTS namespace is used
hostname = None if self.uts_mode else self.addon.hostname

# Create & Run container
try:
docker_container = self.sys_docker.run(
docker_container = await self.sys_run_in_executor(
self.sys_docker.run,
self.image,
tag=str(self.addon.version),
name=self.name,
@@ -553,7 +550,7 @@ def _run(self) -> None:

# Write data to DNS server
try:
self.sys_plugins.dns.add_host(
await self.sys_plugins.dns.add_host(
ipv4=self.ip_address, names=[self.addon.hostname]
)
except CoreDNSError as err:
@@ -566,29 +563,26 @@ def _run(self) -> None:
BusEvent.HARDWARE_NEW_DEVICE, self._hardware_events
)

def _update(
async def _update(
self, version: AwesomeVersion, image: str | None = None, latest: bool = False
) -> None:
"""Update a docker image.
Need run inside executor.
"""
"""Update a docker image."""
image = image or self.image

_LOGGER.info(
"Updating image %s:%s to %s:%s", self.image, self.version, image, version
)

# Update docker image
self._install(
await self._install(
version, image=image, latest=latest, need_build=self.addon.latest_need_build
)

# Stop container & cleanup
with suppress(DockerError):
self._stop()
await self._stop()

def _install(
async def _install(
self,
version: AwesomeVersion,
image: str | None = None,
@@ -597,29 +591,25 @@ def _install(
*,
need_build: bool | None = None,
) -> None:
"""Pull Docker image or build it.
Need run inside executor.
"""
"""Pull Docker image or build it."""
if need_build is None and self.addon.need_build or need_build:
self._build(version)
await self._build(version)
else:
super()._install(version, image, latest, arch)

def _build(self, version: AwesomeVersion) -> None:
"""Build a Docker container.
await super()._install(version, image, latest, arch)

Need run inside executor.
"""
async def _build(self, version: AwesomeVersion) -> None:
"""Build a Docker container."""
build_env = AddonBuild(self.coresys, self.addon)
if not build_env.is_valid:
_LOGGER.error("Invalid build environment, can't build this add-on!")
raise DockerError()

_LOGGER.info("Starting build for %s:%s", self.image, version)
try:
image, log = self.sys_docker.images.build(
use_config_proxy=False, **build_env.get_docker_args(version)
image, log = await self.sys_run_in_executor(
self.sys_docker.images.build,
use_config_proxy=False,
**build_env.get_docker_args(version),
)

_LOGGER.debug("Build %s:%s done: %s", self.image, version, log)
@@ -645,74 +635,36 @@ def _build(self, version: AwesomeVersion) -> None:
@process_lock
def export_image(self, tar_file: Path) -> Awaitable[None]:
"""Export current images into a tar file."""
return self.sys_run_in_executor(self._export_image, tar_file)

def _export_image(self, tar_file: Path) -> None:
"""Export current images into a tar file.
Need run inside executor.
"""
try:
image = self.sys_docker.api.get_image(f"{self.image}:{self.version}")
except (docker.errors.DockerException, requests.RequestException) as err:
_LOGGER.error("Can't fetch image %s: %s", self.image, err)
raise DockerError() from err

_LOGGER.info("Export image %s to %s", self.image, tar_file)
try:
with tar_file.open("wb") as write_tar:
for chunk in image:
write_tar.write(chunk)
except (OSError, requests.RequestException) as err:
_LOGGER.error("Can't write tar file %s: %s", tar_file, err)
raise DockerError() from err

_LOGGER.info("Export image %s done", self.image)
return self.sys_run_in_executor(
self.sys_docker.export_image, self.image, self.version, tar_file
)

@process_lock
def import_image(self, tar_file: Path) -> Awaitable[None]:
async def import_image(self, tar_file: Path) -> None:
"""Import a tar file as image."""
return self.sys_run_in_executor(self._import_image, tar_file)

def _import_image(self, tar_file: Path) -> None:
"""Import a tar file as image.
Need run inside executor.
"""
try:
with tar_file.open("rb") as read_tar:
docker_image_list = self.sys_docker.images.load(read_tar)

if len(docker_image_list) != 1:
_LOGGER.warning(
"Unexpected image count %d while importing image from tar",
len(docker_image_list),
)
return
docker_image = docker_image_list[0]
except (docker.errors.DockerException, OSError) as err:
_LOGGER.error("Can't import image %s: %s", self.image, err)
raise DockerError() from err

self._meta = docker_image.attrs
_LOGGER.info("Importing image %s and version %s", tar_file, self.version)
docker_image = await self.sys_run_in_executor(
self.sys_docker.import_image, tar_file
)
if docker_image:
self._meta = docker_image.attrs
_LOGGER.info("Importing image %s and version %s", tar_file, self.version)

with suppress(DockerError):
self._cleanup()
with suppress(DockerError):
await self._cleanup()

@process_lock
def write_stdin(self, data: bytes) -> Awaitable[None]:
async def write_stdin(self, data: bytes) -> None:
"""Write to add-on stdin."""
return self.sys_run_in_executor(self._write_stdin, data)
if not await self.is_running():
raise DockerError()

await self.sys_run_in_executor(self._write_stdin, data)

def _write_stdin(self, data: bytes) -> None:
"""Write to add-on stdin.
Need run inside executor.
"""
if not self._is_running():
raise DockerError()

try:
# Load needed docker objects
container = self.sys_docker.containers.get(self.name)
@@ -730,15 +682,12 @@ def _write_stdin(self, data: bytes) -> None:
_LOGGER.error("Can't write to %s stdin: %s", self.name, err)
raise DockerError() from err

def _stop(self, remove_container=True) -> None:
"""Stop/remove Docker container.
Need run inside executor.
"""
async def _stop(self, remove_container: bool = True) -> None:
"""Stop/remove Docker container."""
# DNS
if self.ip_address != NO_ADDDRESS:
try:
self.sys_plugins.dns.delete_host(self.addon.hostname)
await self.sys_plugins.dns.delete_host(self.addon.hostname)
except CoreDNSError as err:
_LOGGER.warning("Can't update DNS for %s", self.name)
capture_exception(err)
@@ -748,21 +697,17 @@ def _stop(self, remove_container=True) -> None:
self.sys_bus.remove_listener(self._hw_listener)
self._hw_listener = None

super()._stop(remove_container)
await super()._stop(remove_container)

def _validate_trust(
async def _validate_trust(
self, image_id: str, image: str, version: AwesomeVersion
) -> None:
"""Validate trust of content."""
if not self.addon.signed:
return

checksum = image_id.partition(":")[2]
job = asyncio.run_coroutine_threadsafe(
self.sys_security.verify_content(self.addon.codenotary, checksum),
self.sys_loop,
)
job.result()
return await self.sys_security.verify_content(self.addon.codenotary, checksum)

@Job(conditions=[JobCondition.OS_AGENT], limit=JobExecutionLimit.SINGLE_WAIT)
async def _hardware_events(self, device: Device) -> None:
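
The final hunk shown replaces the thread-to-loop bridge (asyncio.run_coroutine_threadsafe(...).result()) with a plain await, which works once _validate_trust is itself a coroutine on the event loop. A minimal illustration of the two shapes, with verify standing in for sys_security.verify_content:

import asyncio

async def verify(checksum: str) -> bool:
    """Stand-in for sys_security.verify_content."""
    await asyncio.sleep(0)
    return True

# Before: sync code on a worker thread had to bounce the coroutine back to the loop.
def validate_trust_from_thread(loop: asyncio.AbstractEventLoop, checksum: str) -> bool:
    return asyncio.run_coroutine_threadsafe(verify(checksum), loop).result()

# After: the caller is already a coroutine, so it awaits directly.
async def validate_trust(checksum: str) -> bool:
    return await verify(checksum)
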