Skip to content

Commit

Permalink
fix typing, add lock for updating repository on creation
Browse files Browse the repository at this point in the history
  • Loading branch information
dfangl committed Feb 29, 2024
1 parent 0428444 commit 194db31
Showing 1 changed file with 19 additions and 11 deletions.
30 changes: 19 additions & 11 deletions moto/ecr/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,10 @@ def __init__(self, region_name: str, account_id: str):
self.registry_policy: Optional[str] = None
self.replication_config: Dict[str, Any] = {"rules": []}
self.repositories: Dict[str, Repository] = {}
self.registry_scanning_configuration = {"scanType": "BASIC", "rules": []}
self.registry_scanning_configuration: Dict[str, Any] = {
"scanType": "BASIC",
"rules": [],
}
self.registry_scanning_configuration_update_lock = threading.RLock()
self.tagger = TaggingService(tag_name="tags")

Expand Down Expand Up @@ -499,14 +502,17 @@ def create_repository(
self.tagger.tag_resource(repository.arn, tags)

# check if any of the registry scanning policies applies to the repository
for rule in self.registry_scanning_configuration["rules"]:
for repo_filter in rule["repositoryFilters"]:
if self._match_repository_filter(
repo_filter["filter"], repository_name
):
repository.scanning_config["scanFrequency"] = rule["scanFrequency"]
# AWS testing seems to indicate that this is always overwritten
repository.scanning_config["appliedScanFilters"] = [repo_filter]
with self.registry_scanning_configuration_update_lock:
for rule in self.registry_scanning_configuration["rules"]:
for repo_filter in rule["repositoryFilters"]:
if self._match_repository_filter(
repo_filter["filter"], repository_name
):
repository.scanning_config["scanFrequency"] = rule[
"scanFrequency"
]
# AWS testing seems to indicate that this is always overwritten
repository.scanning_config["appliedScanFilters"] = [repo_filter]

return repository

Expand Down Expand Up @@ -1130,9 +1136,11 @@ def put_replication_configuration(

return {"replicationConfiguration": replication_config}

def _match_repository_filter(self, filter: str, repository_name: str):
def _match_repository_filter(self, filter: str, repository_name: str) -> bool:
filter_regex = filter.replace("*", ".*")
return filter in repository_name or re.match(filter_regex, repository_name)
return filter in repository_name or bool(
re.match(filter_regex, repository_name)
)

def get_registry_scanning_configuration(self) -> Dict[str, Any]:
return self.registry_scanning_configuration
Expand Down

0 comments on commit 194db31

Please sign in to comment.