Skip to content

Commit

Permalink
Update some functions and improve testing (#3277)
Browse files Browse the repository at this point in the history
  • Loading branch information
kddejong authored Jun 6, 2024
1 parent bc1b5cd commit 3bc5d8a
Show file tree
Hide file tree
Showing 8 changed files with 125 additions and 136 deletions.
2 changes: 1 addition & 1 deletion src/cfnlint/rules/functions/GetAtt.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ def _resolve_getatt(
) in PROVIDER_SCHEMA_MANAGER.get_resource_schemas_by_regions(
t, validator.context.regions
):
_, getatt_schema = schema.resolver.resolve_cfn_pointer(pointer)
getatt_schema = schema.resolver.resolve_cfn_pointer(pointer)

if not getatt_schema.get("type") or not s.get("type"):
continue
Expand Down
2 changes: 1 addition & 1 deletion src/cfnlint/rules/functions/GetAttFormat.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ def validate(
) in PROVIDER_SCHEMA_MANAGER.get_resource_schemas_by_regions(
t, validator.context.regions
):
_, getatt_schema = resource_schema.resolver.resolve_cfn_pointer(getatt_ptr)
getatt_schema = resource_schema.resolver.resolve_cfn_pointer(getatt_ptr)
getatt_fmt = getatt_schema.get("format")
if getatt_fmt != fmt:
yield ValidationError(
Expand Down
44 changes: 7 additions & 37 deletions src/cfnlint/schema/resolver/_resolver.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ def resolve(self, ref):

return url, self._cache(url)

def resolve_cfn_pointer(self, pointer: str) -> tuple[str, dict]:
def resolve_cfn_pointer(self, pointer: str) -> dict[str, Any]:
"""
Resolve the given cfn pointer.
Expand All @@ -174,7 +174,7 @@ def resolve_cfn_pointer(self, pointer: str) -> tuple[str, dict]:

# to handle custom resources
if pointer == "/properties/CfnLintAllTypes":
return pointer, {
return {
"type": [
"string",
"integer",
Expand All @@ -185,37 +185,15 @@ def resolve_cfn_pointer(self, pointer: str) -> tuple[str, dict]:
]
}
elif pointer == "/properties/CfnLintStringType":
return pointer, {
return {
"type": [
"string",
]
}

ref = pointer.lstrip("/").split("/") if pointer else []

return pointer, self._walk_cfn_pointer(schema, ref[1:])

def flatten_schema(self, schema: dict[str, Any]) -> dict[str, Any]:
"""
Flatten the schema used by the cloudformation-cli.
Ref takes precedence and will override
anything in the parent schema
"""
schema = schema.copy()
return self._flatten_schema(schema)

def _flatten_schema(self, schema: dict[str, Any]) -> dict[str, Any]:
r_schema = schema.copy()
for k, v in schema.items():
if k == "$ref":
r_schema.pop(k)
_, ref_schema = self.resolve(v)
r_schema = {**r_schema, **ref_schema}
elif k == "items":
r_schema[k] = self._flatten_schema(v)
return r_schema
return self._walk_cfn_pointer(schema, ref[1:])

@property
def resolution_scope(self):
Expand All @@ -224,7 +202,9 @@ def resolution_scope(self):
"""
return self._scopes_stack[-1]

def _walk_cfn_pointer(self, document, pointer):
def _walk_cfn_pointer(
self, document: dict[str, Any], pointer: Sequence[str]
) -> dict[str, Any]:
"""
Resolve a cfn ``pointer`` within the referenced ``document``.
Expand Down Expand Up @@ -253,12 +233,6 @@ def _walk_cfn_pointer(self, document, pointer):
if point == "*":
if "items" in document:
return self._walk_cfn_pointer(document["items"], pointer[1:])
if "additionalProperties" in document and isinstance(
document["additionalProperties"], dict
):
return self._walk_cfn_pointer(
document["additionalProperties"], pointer[1:]
)
return self._walk_cfn_pointer(document["properties"][point], pointer[1:])
except (TypeError, LookupError, KeyError, RefResolutionError) as e:
for c in ["anyOf", "allOf", "oneOf"]:
Expand Down Expand Up @@ -296,10 +270,6 @@ def resolve_fragment(self, document, fragment):
def find(key):
yield from _search_schema(document, _match_keyword(key))

for keyword in ["$anchor", "$dynamicAnchor"]:
for subschema in find(keyword):
if fragment == subschema[keyword]:
return subschema
for keyword in ["id", "$id"]:
for subschema in find(keyword):
if "#" + fragment == subschema[keyword]:
Expand Down
92 changes: 0 additions & 92 deletions test/unit/module/schema/resolver/test_flatten_schema.py

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -58,29 +58,29 @@ def test_pointer(self):
resolver = RefResolver(self.schema)
self.assertEqual(
resolver.resolve_cfn_pointer("/properties/foo"),
("/properties/foo", {"type": "string"}),
{"type": "string"},
)
self.assertEqual(
resolver.resolve_cfn_pointer("/properties/fooBars/*/a"),
("/properties/fooBars/*/a", {"type": "string"}),
{"type": "string"},
)

# first one found is string (not boolean)
self.assertEqual(
resolver.resolve_cfn_pointer("/properties/anyOf/a"),
("/properties/anyOf/a", {"type": "string"}),
{"type": "string"},
)

# second option in anyOf has the property we are looking for
self.assertEqual(
resolver.resolve_cfn_pointer("/properties/anyOf/c"),
("/properties/anyOf/c", {"type": "boolean"}),
{"type": "boolean"},
)

# refs are handled first
self.assertEqual(
resolver.resolve_cfn_pointer("/properties/refFirst/a"),
("/properties/refFirst/a", {"type": "string"}),
{"type": "string"},
)

with self.assertRaises(RefResolutionError):
Expand Down
102 changes: 102 additions & 0 deletions test/unit/module/schema/resolver/test_resolve_from_url.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
"""
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
SPDX-License-Identifier: MIT-0
"""

import logging

import pytest

from cfnlint.schema._schema import Schema
from cfnlint.schema.resolver._exceptions import RefResolutionError

LOGGER = logging.getLogger("cfnlint.schema.manager")
LOGGER.disabled = True


@pytest.fixture
def resource_schema():
return {
"additionalProperties": False,
"definitions": {
"ListItem": {"description": "A a list Id.", "type": "string"},
"ResourceArn": {
"description": "A resource ARN.",
"maxLength": 256,
"minLength": 1,
"pattern": "^arn:aws.*$",
"type": "string",
},
"AList": {
"items": [
{"type": "string"},
{"type": "boolean"},
],
"type": "array",
},
},
"properties": {
"Name": {
"maxLength": 128,
"minLength": 1,
"pattern": "^[a-zA-Z0-9-]+$",
"type": "string",
},
"List": {
"insertionOrder": False,
"items": {"$ref": "#/definitions/ListItem"},
"type": "array",
},
"Arn": {"$ref": "#/definitions/ResourceArn"},
"Id": {
"maxLength": 36,
"minLength": 36,
"pattern": "^([0-9a-f]{8})-([0-9a-f]{4}-){3}([0-9a-f]{12})$",
"type": "string",
},
},
"readOnlyProperties": [
"/properties/Arn",
"/properties/Id",
"/properties/List",
],
"typeName": "AWS::NetworkFirewall::Firewall",
}


def test_schema(resource_schema):

schema = Schema(schema=resource_schema)

assert schema.resolver.resolve_from_url("#/") == resource_schema

assert schema.resolver.resolve_from_url("#/properties/Name") == {
"maxLength": 128,
"minLength": 1,
"pattern": "^[a-zA-Z0-9-]+$",
"type": "string",
}

assert schema.resolver.resolve_from_url("#/definitions/ListItem") == {
"description": "A a list Id.",
"type": "string",
}
assert schema.resolver.resolve_from_url("#/definitions/AList/items/0") == {
"type": "string"
}
assert schema.resolver.resolve_from_url("#/definitions/AList/items/1") == {
"type": "boolean"
}

with pytest.raises(RefResolutionError):
schema.resolver.resolve_from_url("#/definitions/AList/items/2")

with pytest.raises(RefResolutionError):
schema.resolver.resolve_from_url("#/properties/bar/key")

with pytest.raises(RefResolutionError):
schema.resolver.resolve_from_url("test#/properties/bar/key")

with pytest.raises(RefResolutionError):
schema.resolver.pop_scope()
schema.resolver.pop_scope()
4 changes: 4 additions & 0 deletions test/unit/rules/resources/cloudfront/test_aliases.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,10 @@ def validator():
"*.example.com",
[],
),
(
{},
[],
),
(
"email.*.example.com",
[
Expand Down
5 changes: 5 additions & 0 deletions test/unit/rules/resources/iam/test_role_arn_pattern.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@ def _message_errors(name, arn, errors, **kwargs):
)
],
),
(
"Valid but wrong type",
{},
[],
),
(
"Valid Arn",
"arn:aws:iam::123456789012:role/test",
Expand Down

0 comments on commit 3bc5d8a

Please sign in to comment.