Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AWS ECR Remote Hashing #7444

Open
wants to merge 4 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ The `IX_WORKFLOW_STORE_ENTRY_WS` index is removed from `WORKFLOW_STORE_ENTRY`.

The index had low cardinality and workflow pickup is faster without it. Migration time depends on workflow store size, but should be very fast for most installations. Terminal workflows are removed from the workflow store, so only running workflows contribute to the cost.

### AWS ECR Remote Hashing Support

Cromwell now supports remote hashing for both public and private AWS ECR containers.

## 87 Release Notes

### GCP Batch
Expand Down
7 changes: 7 additions & 0 deletions core/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,13 @@ docker {
max-retries = 3

// Supported registries (Docker Hub, Google, Quay) can have additional configuration set separately
aws {
throttle {
number-of-requests = 1000
per = 100 seconds
}
num-threads = 10
}
azure {
// Worst case `ReadOps per minute` value from official docs
// https://github.com/MicrosoftDocs/azure-docs/blob/main/includes/container-registry-limits.md
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package cromwell.docker

import cromwell.docker.registryv2.flows.aws.AwsElasticContainerRegistry.isEcr
import cromwell.docker.registryv2.flows.azure.AzureContainerRegistry

import scala.util.{Failure, Success, Try}
Expand All @@ -19,7 +20,9 @@ sealed trait DockerImageIdentifier {
lazy val nameWithDefaultRepository =
// In ACR, the repository is part of the registry domain instead of the path
// e.g. `terrabatchdev.azurecr.io`
if (host.exists(_.contains(AzureContainerRegistry.domain)))
// In ECR, an image with no repository is supported
// e.g. 123456790.dkr.ecr.eu-west-2.amazonaws.com/example-tool
if (host.exists(_.contains(AzureContainerRegistry.domain)) || host.exists(isEcr))
image
else
repository.getOrElse("library") + s"/$image"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import cromwell.core.actor.StreamIntegration.{BackPressure, StreamContext}
import cromwell.core.{Dispatcher, DockerConfiguration}
import cromwell.docker.DockerInfoActor._
import cromwell.docker.registryv2.DockerRegistryV2Abstract
import cromwell.docker.registryv2.flows.aws.AwsElasticContainerRegistry
import cromwell.docker.registryv2.flows.azure.AzureContainerRegistry
import cromwell.docker.registryv2.flows.dockerhub.DockerHubRegistry
import cromwell.docker.registryv2.flows.google.GoogleRegistry
Expand Down Expand Up @@ -239,6 +240,7 @@ object DockerInfoActor {

// To add a new registry, simply add it to that list
List(
("aws", { c: DockerRegistryConfig => new AwsElasticContainerRegistry(c) }),
("azure", { c: DockerRegistryConfig => new AzureContainerRegistry(c) }),
("dockerhub", { c: DockerRegistryConfig => new DockerHubRegistry(c) }),
("google", { c: DockerRegistryConfig => new GoogleRegistry(c) }),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import cromwell.docker._
import cromwell.docker.registryv2.DockerRegistryV2Abstract._
import io.circe.Decoder
import io.circe.generic.auto._
import org.apache.commons.codec.digest.DigestUtils
import org.http4s.Uri.{Authority, Scheme}
import org.http4s._
import org.http4s.circe._
Expand Down Expand Up @@ -156,6 +157,11 @@ abstract class DockerRegistryV2Abstract(override val config: DockerRegistryConfi
.handleErrorWith(tryOCIManifest)
}

/**
* Gets the authorization scheme to use for the token request.
*/
protected def getAuthorizationScheme(dockerImageIdentifier: DockerImageIdentifier): AuthScheme = AuthScheme.Bearer

/**
* Returns true if this flow is able to process this docker image,
* false otherwise
Expand Down Expand Up @@ -240,7 +246,7 @@ abstract class DockerRegistryV2Abstract(override val config: DockerRegistryConfi
manifestHeader: Accept
): IO[Request[IO]] = {
val authorizationHeader: Option[Authorization] =
token.map(t => Authorization(Credentials.Token(AuthScheme.Bearer, t)))
token.map(t => Authorization(Credentials.Token(getAuthorizationScheme(imageId), t)))
val request = Method.GET(
buildManifestUri(imageId),
List(
Expand Down Expand Up @@ -316,7 +322,8 @@ abstract class DockerRegistryV2Abstract(override val config: DockerRegistryConfi
}

private def getDigestFromResponse(response: Response[IO]): IO[DockerHashResult] = response match {
case Status.Successful(r) => extractDigestFromHeaders(r.headers)
case Status.Successful(r) =>
extractDigestFromHeaders(r.headers).handleErrorWith(_ => calculateDigestFromBody(r.bodyText))
case Status.Unauthorized(r) =>
r.as[String].flatMap(body => IO.raiseError(new Unauthorized(r.status.toString + " " + body)))
case Status.NotFound(r) => r.as[String].flatMap(body => IO.raiseError(new NotFound(r.status.toString + " " + body)))
Expand All @@ -329,4 +336,10 @@ abstract class DockerRegistryV2Abstract(override val config: DockerRegistryConfi
case Some(digest) => IO.fromEither(DockerHashResult.fromString(digest.value).toEither)
case None => IO.raiseError(new Exception(s"Manifest response did not have a digest header"))
}

private def calculateDigestFromBody(body: fs2.Stream[IO, String]): IO[DockerHashResult] =
body.compile.string
.map(s => "sha256:" + DigestUtils.sha256Hex(s))
.map(DockerHashResult.fromString)
.flatMap(IO.fromTry)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package cromwell.docker.registryv2.flows.aws

import cats.effect.IO
import cromwell.docker.{DockerImageIdentifier, DockerInfoActor, DockerRegistryConfig}
import cromwell.docker.registryv2.DockerRegistryV2Abstract
import cromwell.docker.registryv2.flows.aws.AwsElasticContainerRegistry.{isEcr, isPublicEcr}
import org.http4s.{AuthScheme, Header}
import org.http4s.client.Client
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.services.ecr.EcrClient
import software.amazon.awssdk.services.ecrpublic.EcrPublicClient
import software.amazon.awssdk.services.ecrpublic.model.GetAuthorizationTokenRequest

import scala.compat.java8.OptionConverters.RichOptionalGeneric

class AwsElasticContainerRegistry(config: DockerRegistryConfig) extends DockerRegistryV2Abstract(config) {

private lazy val ecrClient = EcrClient.create()
private lazy val ecrPublicClient = EcrPublicClient.builder().region(Region.US_EAST_1).build()

override def getAuthorizationScheme(dockerImageIdentifier: DockerImageIdentifier): AuthScheme =
if (isPublicEcr(dockerImageIdentifier.hostAsString)) AuthScheme.Bearer else AuthScheme.Basic

override def accepts(dockerImageIdentifier: DockerImageIdentifier): Boolean =
isEcr(dockerImageIdentifier.hostAsString)

/**
* (e.g registry-1.docker.io)
*/
override def registryHostName(dockerImageIdentifier: DockerImageIdentifier): String =
dockerImageIdentifier.host.getOrElse("")

/**
* (e.g auth.docker.io)
*/
override def authorizationServerHostName(dockerImageIdentifier: DockerImageIdentifier): String =
dockerImageIdentifier.host.getOrElse("")

override def getToken(
dockerInfoContext: DockerInfoActor.DockerInfoContext
)(implicit client: Client[IO]): IO[Option[String]] =
if (isPublicEcr(dockerInfoContext.dockerImageID.hostAsString)) getPublicEcrToken
else getPrivateEcrToken

/**
* Builds the list of headers for the token request
*/
override def buildTokenRequestHeaders(dockerInfoContext: DockerInfoActor.DockerInfoContext): List[Header] =
List.empty

private def getPublicEcrToken: IO[Option[String]] =
IO(
Option(
ecrPublicClient
.getAuthorizationToken(GetAuthorizationTokenRequest.builder().build())
.authorizationData()
.authorizationToken()
)
)

private def getPrivateEcrToken: IO[Option[String]] =
IO(
ecrClient
.getAuthorizationToken()
.authorizationData()
.stream()
.findFirst()
.asScala
.map(_.authorizationToken())
)

}

object AwsElasticContainerRegistry {
def isEcr(host: String): Boolean = isPublicEcr(host) || isPrivateEcr(host)
def isPublicEcr(host: String): Boolean = host.contains("public.ecr.aws")
def isPrivateEcr(host: String): Boolean = host.contains("amazonaws.com")
}
2 changes: 2 additions & 0 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,8 @@ object Dependencies {
"batch",
"core",
"cloudwatchlogs",
"ecr",
"ecrpublic",
"s3",
"sts",
).map(artifactName => "software.amazon.awssdk" % artifactName % awsSdkV)
Expand Down
Loading