From af18374d38d4a8b13b18b85f0c8f71122a39ad72 Mon Sep 17 00:00:00 2001 From: Bert Blommers Date: Fri, 23 Aug 2024 17:52:21 +0000 Subject: [PATCH] IAM: generate_credentials_report() now shows active certificates (#8032) --- moto/iam/models.py | 18 +- tests/test_iam/__init__.py | 47 ++-- tests/test_iam/test_iam.py | 68 ------ tests/test_iam/test_iam_cloudformation.py | 4 +- .../test_iam/test_iam_signing_certificates.py | 211 ++++++++++++++++++ 5 files changed, 263 insertions(+), 85 deletions(-) create mode 100644 tests/test_iam/test_iam_signing_certificates.py diff --git a/moto/iam/models.py b/moto/iam/models.py index b052aa4aa7a1..172e55aa82b4 100644 --- a/moto/iam/models.py +++ b/moto/iam/models.py @@ -1519,6 +1519,13 @@ def to_csv(self) -> str: else self.access_keys[1].last_used.strftime(date_format) ) + cert1_active = cert2_active = False + if len(self.signing_certificates) > 0: + cert1 = list(self.signing_certificates.values())[0] + cert1_active = cert1.status == "Active" + if len(self.signing_certificates) > 1: + cert2 = list(self.signing_certificates.values())[1] + cert2_active = cert2.status == "Active" fields = [ self.name, self.arn, @@ -1538,9 +1545,9 @@ def to_csv(self) -> str: access_key_2_last_used, "not_supported", "not_supported", - "false", + "true" if cert1_active else "false", "N/A", - "false", + "true" if cert2_active else "false", "N/A", ] return ",".join(fields) + "\n" @@ -2719,6 +2726,13 @@ def upload_signing_certificate( except Exception: raise MalformedCertificate(body) + if ( + len(user.signing_certificates) + >= self.account_summary._signing_certificates_per_user_quota + ): + raise IAMLimitExceededException( + "Cannot exceed quota for CertificatesPerUser: 2" + ) user.signing_certificates[cert_id] = SigningCertificate( cert_id, user_name, body ) diff --git a/tests/test_iam/__init__.py b/tests/test_iam/__init__.py index 466103fb01d4..402a3cf5c4d4 100644 --- a/tests/test_iam/__init__.py +++ b/tests/test_iam/__init__.py @@ -1,10 +1,13 @@ -import os from functools import wraps +from uuid import uuid4 + +import boto3 from moto import mock_aws +from tests import allow_aws_request -def iam_aws_verified(func): +def iam_aws_verified(create_user: bool = False): """ Function that is verified to work against AWS. Can be run against AWS at any time by setting: @@ -13,16 +16,34 @@ def iam_aws_verified(func): If this environment variable is not set, the function runs in a `mock_aws` context. """ - @wraps(func) - def pagination_wrapper(): - allow_aws_request = ( - os.environ.get("MOTO_TEST_ALLOW_AWS_REQUEST", "false").lower() == "true" - ) + def inner(func): + def create_user_and_invoke_test(): + client = boto3.client("iam", "us-east-1") + user_name = f"testuser_{str(uuid4())[0:6]}" + try: + if create_user: + client.create_user(UserName=user_name) + return func(user_name=user_name) + finally: + if create_user: + certificates = client.list_signing_certificates(UserName=user_name)[ + "Certificates" + ] + + for cert in certificates: + client.delete_signing_certificate( + UserName=user_name, CertificateId=cert["CertificateId"] + ) + client.delete_user(UserName=user_name) + + @wraps(func) + def pagination_wrapper(): + if allow_aws_request(): + return create_user_and_invoke_test() + else: + with mock_aws(): + return create_user_and_invoke_test() - if allow_aws_request: - return func() - else: - with mock_aws(): - return func() + return pagination_wrapper - return pagination_wrapper + return inner diff --git a/tests/test_iam/test_iam.py b/tests/test_iam/test_iam.py index 507377bc35a8..310d547cfb1d 100644 --- a/tests/test_iam/test_iam.py +++ b/tests/test_iam/test_iam.py @@ -2542,74 +2542,6 @@ def test_get_account_authorization_details(): assert len(result["Policies"]) > 1 -@mock_aws -def test_signing_certs(): - client = boto3.client("iam", region_name="us-east-1") - - # Create the IAM user first: - client.create_user(UserName="testing") - - # Upload the cert: - resp = client.upload_signing_certificate( - UserName="testing", CertificateBody=MOCK_CERT - )["Certificate"] - cert_id = resp["CertificateId"] - - assert resp["UserName"] == "testing" - assert resp["Status"] == "Active" - assert resp["CertificateBody"] == MOCK_CERT - assert resp["CertificateId"] - - # Upload a the cert with an invalid body: - with pytest.raises(ClientError) as ce: - client.upload_signing_certificate( - UserName="testing", CertificateBody="notacert" - ) - assert ce.value.response["Error"]["Code"] == "MalformedCertificate" - - # Upload with an invalid user: - with pytest.raises(ClientError): - client.upload_signing_certificate( - UserName="notauser", CertificateBody=MOCK_CERT - ) - - # Update: - client.update_signing_certificate( - UserName="testing", CertificateId=cert_id, Status="Inactive" - ) - - with pytest.raises(ClientError): - client.update_signing_certificate( - UserName="notauser", CertificateId=cert_id, Status="Inactive" - ) - - fake_id_name = "x" * 32 - with pytest.raises(ClientError) as ce: - client.update_signing_certificate( - UserName="testing", CertificateId=fake_id_name, Status="Inactive" - ) - - assert ( - ce.value.response["Error"]["Message"] - == f"The Certificate with id {fake_id_name} cannot be found." - ) - - # List the certs: - resp = client.list_signing_certificates(UserName="testing")["Certificates"] - assert len(resp) == 1 - assert resp[0]["CertificateBody"] == MOCK_CERT - assert resp[0]["Status"] == "Inactive" # Changed with the update call above. - - with pytest.raises(ClientError): - client.list_signing_certificates(UserName="notauser") - - # Delete: - client.delete_signing_certificate(UserName="testing", CertificateId=cert_id) - - with pytest.raises(ClientError): - client.delete_signing_certificate(UserName="notauser", CertificateId=cert_id) - - @mock_aws() def test_create_saml_provider(): conn = boto3.client("iam", region_name="us-east-1") diff --git a/tests/test_iam/test_iam_cloudformation.py b/tests/test_iam/test_iam_cloudformation.py index 68968870b024..ae66a3fb6490 100644 --- a/tests/test_iam/test_iam_cloudformation.py +++ b/tests/test_iam/test_iam_cloudformation.py @@ -1602,8 +1602,8 @@ def test_iam_roles(): @pytest.mark.aws_verified -@iam_aws_verified -def test_delete_instance_profile_with_existing_role(): +@iam_aws_verified() +def test_delete_instance_profile_with_existing_role(user_name=None): region = "us-east-1" iam = boto3.client("iam", region_name=region) iam_role_name = f"moto_{str(uuid4())[0:6]}" diff --git a/tests/test_iam/test_iam_signing_certificates.py b/tests/test_iam/test_iam_signing_certificates.py new file mode 100644 index 000000000000..d3a7f2fb4d67 --- /dev/null +++ b/tests/test_iam/test_iam_signing_certificates.py @@ -0,0 +1,211 @@ +from datetime import timedelta + +import boto3 +import cryptography +import pytest +from botocore.exceptions import ClientError +from cryptography.hazmat.primitives import hashes, serialization +from cryptography.hazmat.primitives.asymmetric import rsa +from cryptography.x509 import Name, NameAttribute +from cryptography.x509.oid import NameOID + +from moto.core.utils import utcnow +from tests.test_iam import iam_aws_verified + + +@iam_aws_verified(create_user=True) +@pytest.mark.aws_verified +def test_signing_certs(user_name=None): + client = boto3.client("iam", region_name="us-east-1") + certificate = create_certificate() + + # Upload the cert: + resp = client.upload_signing_certificate( + UserName=user_name, CertificateBody=certificate + )["Certificate"] + cert_id = resp["CertificateId"] + + assert resp["UserName"] == user_name + assert resp["Status"] == "Active" + assert resp["CertificateBody"] == certificate + assert resp["CertificateId"] + + # Update: + client.update_signing_certificate( + UserName=user_name, CertificateId=cert_id, Status="Inactive" + ) + + # List the certs: + resp = client.list_signing_certificates(UserName=user_name)["Certificates"] + assert len(resp) == 1 + assert resp[0]["CertificateBody"] == certificate + assert resp[0]["Status"] == "Inactive" # Changed with the update call above. + + # Delete: + client.delete_signing_certificate(UserName=user_name, CertificateId=cert_id) + + +@iam_aws_verified(create_user=True) +@pytest.mark.aws_verified +def test_create_too_many_certificates(user_name=None): + client = boto3.client("iam", region_name="us-east-1") + certificate1 = create_certificate() + certificate2 = create_certificate() + certificate3 = create_certificate() + + # Upload two certs + cert_id1 = client.upload_signing_certificate( + UserName=user_name, CertificateBody=certificate1 + )["Certificate"]["CertificateId"] + cert_id2 = client.upload_signing_certificate( + UserName=user_name, CertificateBody=certificate2 + )["Certificate"]["CertificateId"] + assert cert_id1 != cert_id2 + + # Verify that a third certificate exceeds the limit + with pytest.raises(ClientError) as exc: + client.upload_signing_certificate( + UserName=user_name, CertificateBody=certificate3 + ) + err = exc.value.response["Error"] + assert err["Code"] == "LimitExceeded" + assert err["Message"] == "Cannot exceed quota for CertificatesPerUser: 2" + + +@iam_aws_verified(create_user=True) +def test_retrieve_cert_details_using_credentials_report(user_name=None): + """ + AWS caches the Credentials Report for 4 hours + Once you generate the report, you can request the report over and over again, but you'll always get that same report back in the next 4 hours + # That makes it slightly impossible to verify this against AWS - that's why there is no `aws_verified`-marker + """ + client = boto3.client("iam", region_name="us-east-1") + certificate1 = create_certificate() + certificate2 = create_certificate() + + # Upload the cert: + cert_id1 = client.upload_signing_certificate( + UserName=user_name, CertificateBody=certificate1 + )["Certificate"]["CertificateId"] + + result = client.generate_credential_report() + while result["State"] != "COMPLETE": + result = client.generate_credential_report() + report = client.get_credential_report()["Content"].decode("utf-8") + + our_line = next(line for line in report.split("\n") if line.startswith(user_name)) + cert1_active = our_line.split(",")[-4] + assert cert1_active == "true" + cert2_active = our_line.split(",")[-2] + assert cert2_active == "false" + + client.upload_signing_certificate(UserName=user_name, CertificateBody=certificate2) + + result = client.generate_credential_report() + while result["State"] != "COMPLETE": + result = client.generate_credential_report() + report = client.get_credential_report()["Content"].decode("utf-8") + our_line = next(line for line in report.split("\n") if line.startswith(user_name)) + + cert1_active = our_line.split(",")[-4] + assert cert1_active == "true" + cert2_active = our_line.split(",")[-2] + assert cert2_active == "true" + + # Set Certificate to Inactive + client.update_signing_certificate( + UserName=user_name, CertificateId=cert_id1, Status="Inactive" + ) + + # Verify the credential report is updated + result = client.generate_credential_report() + while result["State"] != "COMPLETE": + result = client.generate_credential_report() + report = client.get_credential_report()["Content"].decode("utf-8") + our_line = next(line for line in report.split("\n") if line.startswith(user_name)) + + cert1_active = our_line.split(",")[-4] + assert cert1_active == "false" + cert2_active = our_line.split(",")[-2] + assert cert2_active == "true" + + +@iam_aws_verified() +@pytest.mark.aws_verified +def test_upload_cert_for_unknown_user(user_name=None): + client = boto3.client("iam", region_name="us-east-1") + with pytest.raises(ClientError) as exc: + client.upload_signing_certificate( + UserName="notauser", CertificateBody=create_certificate() + ) + err = exc.value.response["Error"] + assert err["Code"] == "NoSuchEntity" + assert err["Message"] == "The user with name notauser cannot be found." + + with pytest.raises(ClientError) as exc: + client.update_signing_certificate( + UserName="notauser", + CertificateId="asdfasdfasdfasdfasdfasdfasdasdf", + Status="Inactive", + ) + err = exc.value.response["Error"] + assert err["Code"] == "NoSuchEntity" + assert err["Message"] == "The user with name notauser cannot be found." + + with pytest.raises(ClientError) as exc: + client.list_signing_certificates(UserName="notauser") + err = exc.value.response["Error"] + assert err["Code"] == "NoSuchEntity" + assert err["Message"] == "The user with name notauser cannot be found." + + with pytest.raises(ClientError): + client.delete_signing_certificate(UserName="notauser", CertificateId="x" * 32) + err = exc.value.response["Error"] + assert err["Code"] == "NoSuchEntity" + assert err["Message"] == "The user with name notauser cannot be found." + + +@iam_aws_verified(create_user=True) +@pytest.mark.aws_verified +def test_upload_invalid_certificate(user_name=None): + client = boto3.client("iam", region_name="us-east-1") + with pytest.raises(ClientError) as ce: + client.upload_signing_certificate( + UserName=user_name, CertificateBody="notacert" + ) + assert ce.value.response["Error"]["Code"] == "MalformedCertificate" + + +@iam_aws_verified(create_user=True) +@pytest.mark.aws_verified +def test_update_unknown_certificate(user_name=None): + client = boto3.client("iam", region_name="us-east-1") + fake_id_name = "x" * 32 + with pytest.raises(ClientError) as ce: + client.update_signing_certificate( + UserName=user_name, CertificateId=fake_id_name, Status="Inactive" + ) + err = ce.value.response["Error"] + + assert err["Message"] == f"The Certificate with id {fake_id_name} cannot be found." + + +def create_certificate(): + key = rsa.generate_private_key(public_exponent=65537, key_size=2028) + cert_subject = [NameAttribute(NameOID.COMMON_NAME, "iam.amazonaws.com")] + issuer = [ + NameAttribute(NameOID.COUNTRY_NAME, "US"), + NameAttribute(NameOID.ORGANIZATION_NAME, "Amazon"), + NameAttribute(NameOID.COMMON_NAME, "Amazon RSA 2048 M01"), + ] + cert = ( + cryptography.x509.CertificateBuilder() + .subject_name(Name(cert_subject)) + .issuer_name(Name(issuer)) + .public_key(key.public_key()) + .serial_number(cryptography.x509.random_serial_number()) + .not_valid_before(utcnow()) + .not_valid_after(utcnow() + timedelta(days=365)) + .sign(key, hashes.SHA256()) + ) + return cert.public_bytes(serialization.Encoding.PEM).decode("utf-8")