Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize ACS pipeline for runtime and memory #5832

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 32 additions & 42 deletions pulpcore/plugin/stages/artifact_stages.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,10 @@

from aiofiles import os as aos
from asgiref.sync import sync_to_async
from django.db.models import Prefetch, prefetch_related_objects, Q
from django.db.models import Prefetch, prefetch_related_objects

from pulpcore.plugin.exceptions import UnsupportedDigestValidationError
from pulpcore.plugin.models import (
AlternateContentSource,
Artifact,
ContentArtifact,
ProgressReport,
Expand Down Expand Up @@ -472,52 +471,43 @@ class ACSArtifactHandler(Stage):

async def run(self):
async for batch in self.batches():
acs_query = AlternateContentSource.objects.filter(pulp_domain=self.domain)
acs_exists = await acs_query.aexists()
if acs_exists:
# Gather batch d_artifact checksums
batch_checksums = defaultdict(list)
for d_content in batch:
for d_artifact in d_content.d_artifacts:
for cks_type in d_artifact.artifact.COMMON_DIGEST_FIELDS:
if getattr(d_artifact.artifact, cks_type):
batch_checksums[cks_type].append(
getattr(d_artifact.artifact, cks_type)
)

batch_query = Q()
for checksum_type in batch_checksums.keys():
batch_query.add(
Q(**{f"{checksum_type}__in": batch_checksums[checksum_type]}), Q.OR
)
# Gather batch d_artifact checksums
batch_checksums = defaultdict(list)
for d_content in batch:
for d_artifact in d_content.d_artifacts:
for cks_type in d_artifact.artifact.COMMON_DIGEST_FIELDS:
if getattr(d_artifact.artifact, cks_type):
batch_checksums[cks_type].append(getattr(d_artifact.artifact, cks_type))

existing_ras_dict = dict()
for checksum_type in batch_checksums.keys():
existing_ras = (
RemoteArtifact.objects.acs()
.filter(batch_query)
.only("url", "remote")
.filter(**{f"{checksum_type}__in": batch_checksums[checksum_type]})
.only("url", checksum_type, "remote")
.select_related("remote")
)
existing_ras_dict = dict()
async for ra in existing_ras:
for c_type in Artifact.COMMON_DIGEST_FIELDS:
checksum = await sync_to_async(getattr)(ra, c_type)
                    # pick the first occurrence of RA from ACS
if checksum and checksum not in existing_ras_dict:
existing_ras_dict[checksum] = {
"remote": ra.remote,
"url": ra.url,
}
# todo: we could probably get rid of this select_related by separating
# out the remote query
async for ra in existing_ras.aiterator():
checksum = getattr(ra, checksum_type)
                    # pick the first occurrence of RA from ACS
if checksum not in existing_ras_dict:
existing_ras_dict[checksum] = {
"remote": ra.remote,
"url": ra.url,
}

for d_content in batch:
for d_artifact in d_content.d_artifacts:
for checksum_type in Artifact.COMMON_DIGEST_FIELDS:
if getattr(d_artifact.artifact, checksum_type):
checksum = getattr(d_artifact.artifact, checksum_type)
if checksum in existing_ras_dict:
d_artifact.urls = [
existing_ras_dict[checksum]["url"]
] + d_artifact.urls
d_artifact.remote = existing_ras_dict[checksum]["remote"]
for d_content in batch:
for d_artifact in d_content.d_artifacts:
for checksum_type in Artifact.COMMON_DIGEST_FIELDS:
if getattr(d_artifact.artifact, checksum_type):
checksum = getattr(d_artifact.artifact, checksum_type)
if checksum in existing_ras_dict:
d_artifact.urls = [
existing_ras_dict[checksum]["url"]
] + d_artifact.urls
d_artifact.remote = existing_ras_dict[checksum]["remote"]

for d_content in batch:
await self.put(d_content)
10 changes: 6 additions & 4 deletions pulpcore/plugin/stages/declarative_version.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,22 @@
import asyncio
import tempfile

from .api import create_pipeline, EndStage
from .artifact_stages import (
from pulpcore.plugin.models import AlternateContentSource
from pulpcore.plugin.stages.api import create_pipeline, EndStage
from pulpcore.plugin.stages.artifact_stages import (
ACSArtifactHandler,
ArtifactDownloader,
ArtifactSaver,
QueryExistingArtifacts,
RemoteArtifactSaver,
)
from .content_stages import (
from pulpcore.plugin.stages.content_stages import (
ContentAssociation,
ContentSaver,
QueryExistingContents,
ResolveContentFutures,
)
from pulpcore.plugin.util import get_domain_pk


class DeclarativeVersion:
Expand Down Expand Up @@ -131,7 +133,7 @@ def pipeline_stages(self, new_version):
self.first_stage,
QueryExistingArtifacts(),
]
if self.acs:
if self.acs and AlternateContentSource.objects.filter(pulp_domain=get_domain_pk()).exists():
pipeline.append(ACSArtifactHandler())
pipeline.extend(
[
Expand Down