From 30bffb9433838d35214bd8052c1e22757f14e46e Mon Sep 17 00:00:00 2001 From: Sav <74550527+sberss@users.noreply.github.com> Date: Tue, 21 May 2024 17:34:01 +0100 Subject: [PATCH 1/4] feat: Add fallback to calculate digest if no digest header present --- .../docker/registryv2/DockerRegistryV2Abstract.scala | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/dockerHashing/src/main/scala/cromwell/docker/registryv2/DockerRegistryV2Abstract.scala b/dockerHashing/src/main/scala/cromwell/docker/registryv2/DockerRegistryV2Abstract.scala index 234b762429d..4c246ed19fe 100644 --- a/dockerHashing/src/main/scala/cromwell/docker/registryv2/DockerRegistryV2Abstract.scala +++ b/dockerHashing/src/main/scala/cromwell/docker/registryv2/DockerRegistryV2Abstract.scala @@ -9,6 +9,7 @@ import cromwell.docker._ import cromwell.docker.registryv2.DockerRegistryV2Abstract._ import io.circe.Decoder import io.circe.generic.auto._ +import org.apache.commons.codec.digest.DigestUtils import org.http4s.Uri.{Authority, Scheme} import org.http4s._ import org.http4s.circe._ @@ -316,7 +317,8 @@ abstract class DockerRegistryV2Abstract(override val config: DockerRegistryConfi } private def getDigestFromResponse(response: Response[IO]): IO[DockerHashResult] = response match { - case Status.Successful(r) => extractDigestFromHeaders(r.headers) + case Status.Successful(r) => + extractDigestFromHeaders(r.headers).handleErrorWith(_ => calculateDigestFromBody(r.bodyText)) case Status.Unauthorized(r) => r.as[String].flatMap(body => IO.raiseError(new Unauthorized(r.status.toString + " " + body))) case Status.NotFound(r) => r.as[String].flatMap(body => IO.raiseError(new NotFound(r.status.toString + " " + body))) @@ -329,4 +331,10 @@ abstract class DockerRegistryV2Abstract(override val config: DockerRegistryConfi case Some(digest) => IO.fromEither(DockerHashResult.fromString(digest.value).toEither) case None => IO.raiseError(new Exception(s"Manifest response did not have a digest header")) 
} + + private def calculateDigestFromBody(body: fs2.Stream[IO, String]): IO[DockerHashResult] = + body.compile.string + .map(s => "sha256:" + DigestUtils.sha256Hex(s)) + .map(DockerHashResult.fromString) + .flatMap(IO.fromTry) } From c089e4ee194c5eb946b2d3f5dd8d8f4cbe575b87 Mon Sep 17 00:00:00 2001 From: Sav <74550527+sberss@users.noreply.github.com> Date: Mon, 3 Jun 2024 11:10:07 +0100 Subject: [PATCH 2/4] feat: Allow overriding authentication scheme --- .../docker/registryv2/DockerRegistryV2Abstract.scala | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/dockerHashing/src/main/scala/cromwell/docker/registryv2/DockerRegistryV2Abstract.scala b/dockerHashing/src/main/scala/cromwell/docker/registryv2/DockerRegistryV2Abstract.scala index 4c246ed19fe..bb1b717209f 100644 --- a/dockerHashing/src/main/scala/cromwell/docker/registryv2/DockerRegistryV2Abstract.scala +++ b/dockerHashing/src/main/scala/cromwell/docker/registryv2/DockerRegistryV2Abstract.scala @@ -157,6 +157,11 @@ abstract class DockerRegistryV2Abstract(override val config: DockerRegistryConfi .handleErrorWith(tryOCIManifest) } + /** + * Gets the authorization scheme used to present the token in the manifest request (Bearer unless a registry overrides it). 
+ */ + protected def getAuthorizationScheme(dockerImageIdentifier: DockerImageIdentifier): AuthScheme = AuthScheme.Bearer + /** * Returns true if this flow is able to process this docker image, * false otherwise @@ -241,7 +246,7 @@ abstract class DockerRegistryV2Abstract(override val config: DockerRegistryConfi manifestHeader: Accept ): IO[Request[IO]] = { val authorizationHeader: Option[Authorization] = - token.map(t => Authorization(Credentials.Token(AuthScheme.Bearer, t))) + token.map(t => Authorization(Credentials.Token(getAuthorizationScheme(imageId), t))) val request = Method.GET( buildManifestUri(imageId), List( From 9ebc305eddaa1fb0744dcf889123e4680f1a0a78 Mon Sep 17 00:00:00 2001 From: Sav <74550527+sberss@users.noreply.github.com> Date: Mon, 3 Jun 2024 11:10:46 +0100 Subject: [PATCH 3/4] feat: Add remote hashing for AWS ECR --- core/src/main/resources/reference.conf | 7 ++ .../docker/DockerImageIdentifier.scala | 5 +- .../cromwell/docker/DockerInfoActor.scala | 2 + .../aws/AwsElasticContainerRegistry.scala | 78 +++++++++++++++++++ project/Dependencies.scala | 2 + 5 files changed, 93 insertions(+), 1 deletion(-) create mode 100644 dockerHashing/src/main/scala/cromwell/docker/registryv2/flows/aws/AwsElasticContainerRegistry.scala diff --git a/core/src/main/resources/reference.conf b/core/src/main/resources/reference.conf index 4d5f280079a..c1bfdb168aa 100644 --- a/core/src/main/resources/reference.conf +++ b/core/src/main/resources/reference.conf @@ -416,6 +416,13 @@ docker { max-retries = 3 // Supported registries (Docker Hub, Google, Quay) can have additional configuration set separately + aws { + throttle { + number-of-requests = 1000 + per = 100 seconds + } + num-threads = 10 + } azure { // Worst case `ReadOps per minute` value from official docs // https://github.com/MicrosoftDocs/azure-docs/blob/main/includes/container-registry-limits.md diff --git a/dockerHashing/src/main/scala/cromwell/docker/DockerImageIdentifier.scala 
b/dockerHashing/src/main/scala/cromwell/docker/DockerImageIdentifier.scala index 7c20760b644..8432fb85e56 100644 --- a/dockerHashing/src/main/scala/cromwell/docker/DockerImageIdentifier.scala +++ b/dockerHashing/src/main/scala/cromwell/docker/DockerImageIdentifier.scala @@ -1,5 +1,6 @@ package cromwell.docker +import cromwell.docker.registryv2.flows.aws.AwsElasticContainerRegistry.isEcr import cromwell.docker.registryv2.flows.azure.AzureContainerRegistry import scala.util.{Failure, Success, Try} @@ -19,7 +20,9 @@ sealed trait DockerImageIdentifier { lazy val nameWithDefaultRepository = // In ACR, the repository is part of the registry domain instead of the path // e.g. `terrabatchdev.azurecr.io` - if (host.exists(_.contains(AzureContainerRegistry.domain))) + // In ECR, an image with no repository is supported + // e.g. 123456790.dkr.ecr.eu-west-2.amazonaws.com/example-tool + if (host.exists(_.contains(AzureContainerRegistry.domain)) || host.exists(isEcr)) image else repository.getOrElse("library") + s"/$image" diff --git a/dockerHashing/src/main/scala/cromwell/docker/DockerInfoActor.scala b/dockerHashing/src/main/scala/cromwell/docker/DockerInfoActor.scala index 3b3c0e5d7c5..e7bb5e7777b 100644 --- a/dockerHashing/src/main/scala/cromwell/docker/DockerInfoActor.scala +++ b/dockerHashing/src/main/scala/cromwell/docker/DockerInfoActor.scala @@ -14,6 +14,7 @@ import cromwell.core.actor.StreamIntegration.{BackPressure, StreamContext} import cromwell.core.{Dispatcher, DockerConfiguration} import cromwell.docker.DockerInfoActor._ import cromwell.docker.registryv2.DockerRegistryV2Abstract +import cromwell.docker.registryv2.flows.aws.AwsElasticContainerRegistry import cromwell.docker.registryv2.flows.azure.AzureContainerRegistry import cromwell.docker.registryv2.flows.dockerhub.DockerHubRegistry import cromwell.docker.registryv2.flows.google.GoogleRegistry @@ -239,6 +240,7 @@ object DockerInfoActor { // To add a new registry, simply add it to that list List( + ("aws", { c: 
DockerRegistryConfig => new AwsElasticContainerRegistry(c) }), ("azure", { c: DockerRegistryConfig => new AzureContainerRegistry(c) }), ("dockerhub", { c: DockerRegistryConfig => new DockerHubRegistry(c) }), ("google", { c: DockerRegistryConfig => new GoogleRegistry(c) }), diff --git a/dockerHashing/src/main/scala/cromwell/docker/registryv2/flows/aws/AwsElasticContainerRegistry.scala b/dockerHashing/src/main/scala/cromwell/docker/registryv2/flows/aws/AwsElasticContainerRegistry.scala new file mode 100644 index 00000000000..c2aeda60b33 --- /dev/null +++ b/dockerHashing/src/main/scala/cromwell/docker/registryv2/flows/aws/AwsElasticContainerRegistry.scala @@ -0,0 +1,78 @@ +package cromwell.docker.registryv2.flows.aws + +import cats.effect.IO +import cromwell.docker.{DockerImageIdentifier, DockerInfoActor, DockerRegistryConfig} +import cromwell.docker.registryv2.DockerRegistryV2Abstract +import cromwell.docker.registryv2.flows.aws.AwsElasticContainerRegistry.{isEcr, isPublicEcr} +import org.http4s.{AuthScheme, Header} +import org.http4s.client.Client +import software.amazon.awssdk.regions.Region +import software.amazon.awssdk.services.ecr.EcrClient +import software.amazon.awssdk.services.ecrpublic.EcrPublicClient +import software.amazon.awssdk.services.ecrpublic.model.GetAuthorizationTokenRequest + +import scala.compat.java8.OptionConverters.RichOptionalGeneric + +class AwsElasticContainerRegistry(config: DockerRegistryConfig) extends DockerRegistryV2Abstract(config) { + + private lazy val ecrClient = EcrClient.create() + private lazy val ecrPublicClient = EcrPublicClient.builder().region(Region.US_EAST_1).build() + + override def getAuthorizationScheme(dockerImageIdentifier: DockerImageIdentifier): AuthScheme = + if (isPublicEcr(dockerImageIdentifier.hostAsString)) AuthScheme.Bearer else AuthScheme.Basic + + override def accepts(dockerImageIdentifier: DockerImageIdentifier): Boolean = + isEcr(dockerImageIdentifier.hostAsString) + + /** + * Host of the image itself, or "" when it has none (e.g 123456790.dkr.ecr.eu-west-2.amazonaws.com or public.ecr.aws) + 
*/ + override def registryHostName(dockerImageIdentifier: DockerImageIdentifier): String = + dockerImageIdentifier.host.getOrElse("") + + /** + * Same host as the registry. NOTE(review): getToken below asks the AWS SDK for the token, so this is presumably unused for ECR - confirm + */ + override def authorizationServerHostName(dockerImageIdentifier: DockerImageIdentifier): String = + dockerImageIdentifier.host.getOrElse("") + + override def getToken( + dockerInfoContext: DockerInfoActor.DockerInfoContext + )(implicit client: Client[IO]): IO[Option[String]] = + if (isPublicEcr(dockerInfoContext.dockerImageID.hostAsString)) getPublicEcrToken + else getPrivateEcrToken + + /** + * Builds the list of headers for the token request + */ + override def buildTokenRequestHeaders(dockerInfoContext: DockerInfoActor.DockerInfoContext): List[Header] = + List.empty + + private def getPublicEcrToken: IO[Option[String]] = + IO( + Option( + ecrPublicClient + .getAuthorizationToken(GetAuthorizationTokenRequest.builder().build()) + .authorizationData() + .authorizationToken() + ) + ) + + private def getPrivateEcrToken: IO[Option[String]] = + IO( + ecrClient + .getAuthorizationToken() + .authorizationData() + .stream() + .findFirst() + .asScala + .map(_.authorizationToken()) + ) + +} + +object AwsElasticContainerRegistry { + def isEcr(host: String): Boolean = isPublicEcr(host) || isPrivateEcr(host) + def isPublicEcr(host: String): Boolean = host.contains("public.ecr.aws") + def isPrivateEcr(host: String): Boolean = host.contains(".dkr.ecr") && host.contains("amazonaws.com") +} diff --git a/project/Dependencies.scala b/project/Dependencies.scala index b1acd1e30aa..e68021d7dac 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -402,6 +402,8 @@ object Dependencies { "batch", "core", "cloudwatchlogs", + "ecr", + "ecrpublic", "s3", "sts", ).map(artifactName => "software.amazon.awssdk" % artifactName % awsSdkV) From 45dea2731c5ca1d34cad565f1f016b1745e0e2c9 Mon Sep 17 00:00:00 2001 From: Sav <74550527+sberss@users.noreply.github.com> Date: Tue, 21 May 2024 17:54:38 +0100 Subject: [PATCH 4/4] docs: update 
changelog --- CHANGELOG.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3b2e78fac2c..a7acfa87513 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -31,6 +31,10 @@ The `IX_WORKFLOW_STORE_ENTRY_WS` index is removed from `WORKFLOW_STORE_ENTRY`. The index had low cardinality and workflow pickup is faster without it. Migration time depends on workflow store size, but should be very fast for most installations. Terminal workflows are removed from the workflow store, so only running workflows contribute to the cost. +### AWS ECR Remote Hashing Support + +Cromwell now supports remote hashing of Docker images hosted in both public and private AWS ECR registries. + ## 87 Release Notes ### GCP Batch