From 78f7057af795b1f0e48b1044c475b2b8dd896ed9 Mon Sep 17 00:00:00 2001 From: Meng Han <4225835+meng-han@users.noreply.github.com> Date: Wed, 16 Oct 2024 15:16:27 -0700 Subject: [PATCH] Support Custom CA for certificate issuing (#438) previously we have one giant class certificatemanager which has a certificatauthority class which is basically ACM PCA. Now we want to add support for custom ca, without impacting existing contracts/apis on `/v1/certificates prefix`. So a new base class is created so that both acm pca and custom ca can inherit from. other changes are mostly tests and settings along side this. --- confidant/routes/certificates.py | 87 ++-- .../services/certificate_authority/acmpca.py | 364 ++++++++++++++ .../certificateauthoritybase.py | 175 +++++++ .../certificate_authority/customca.py | 230 +++++++++ confidant/services/certificatemanager.py | 464 ++---------------- confidant/settings.py | 25 + .../confidant/routes/certificates_test.py | 91 ++-- .../certificate_authority/acmpca_test.py | 311 ++++++++++++ .../certificate_authority/customca_test.py | 249 ++++++++++ .../services/certificatemanager_test.py | 300 ----------- 10 files changed, 1482 insertions(+), 814 deletions(-) create mode 100644 confidant/services/certificate_authority/acmpca.py create mode 100644 confidant/services/certificate_authority/certificateauthoritybase.py create mode 100644 confidant/services/certificate_authority/customca.py create mode 100644 tests/unit/confidant/services/certificate_authority/acmpca_test.py create mode 100644 tests/unit/confidant/services/certificate_authority/customca_test.py delete mode 100644 tests/unit/confidant/services/certificatemanager_test.py diff --git a/confidant/routes/certificates.py b/confidant/routes/certificates.py index a5d839f0..daefb9d0 100644 --- a/confidant/routes/certificates.py +++ b/confidant/routes/certificates.py @@ -4,6 +4,10 @@ from confidant import authnz, settings from confidant.services import certificatemanager +from confidant.services.certificate_authority.certificateauthoritybase import ( + CertificateAuthorityNotFoundError, + CertificateNotReadyError, +) from confidant.schema.certificates import ( certificate_authority_response_schema, certificate_authorities_response_schema, @@ -69,7 +73,7 @@ def get_certificate(ca, cn): ''' try: ca_object = certificatemanager.get_ca(ca) - except certificatemanager.CertificateAuthorityNotFoundError: + except CertificateAuthorityNotFoundError: return jsonify({'error': 'Provided CA not found.'}), 404 san = request.args.getlist('san') @@ -83,8 +87,9 @@ def get_certificate(ca, cn): 'san': san, }, ): - msg = ('{} does not have access to get certificate cn {} against' - ' ca {}').format( + msg = ( + '{} does not have access to get certificate cn {} against' ' ca {}' + ).format( authnz.get_logged_in_user(), cn, ca, @@ -93,11 +98,10 @@ def get_certificate(ca, cn): return jsonify(error_msg), 403 logger.info( - 'get_certificate called on id={} for ca={} by user={}'.format( - cn, - ca, - logged_in_user, - ) + 'get_certificate called on id=%s for ca=%s by user=%s', + cn, + ca, + logged_in_user, ) validity = request.args.get( @@ -111,7 +115,7 @@ def get_certificate(ca, cn): validity, san, ) - except certificatemanager.CertificateNotReadyError: + except CertificateNotReadyError: # Ratelimit response for a locked certificate in the cache error_msg = 'Certificate being requested, please wait and try again.' response = jsonify(error_msg) @@ -173,13 +177,17 @@ def get_certificate_from_csr(ca): ''' try: ca_object = certificatemanager.get_ca(ca) - except certificatemanager.CertificateAuthorityNotFoundError: + except CertificateAuthorityNotFoundError: return jsonify({'error': 'Provided CA not found.'}), 404 data = request.get_json() + if not data or not data.get('csr'): - return jsonify( - {'error': 'csr must be provided in the POST body.'}, - ), 400 + return ( + jsonify( + {'error': 'csr must be provided in the POST body.'}, + ), + 400, + ) validity = data.get( 'validity', ca_object.settings['max_validity_days'], @@ -188,9 +196,12 @@ def get_certificate_from_csr(ca): csr = ca_object.decode_csr(data['csr']) except Exception: logger.exception('Failed to decode PEM csr') - return jsonify( - {'error': 'csr could not be decoded'}, - ), 400 + return ( + jsonify( + {'error': 'csr could not be decoded'}, + ), + 400, + ) # Get the cn and san values from the csr object, so that we can use them # for the ACL check. cn = ca_object.get_csr_common_name(csr) @@ -206,28 +217,24 @@ def get_certificate_from_csr(ca): 'san': san, }, ): - msg = ('{} does not have access to get certificate cn {} against' - ' ca {}').format( - authnz.get_logged_in_user(), - cn, - ca, + msg = ( + f'{authnz.get_logged_in_user()} does not have access to get' + 'certificate cn {cn} against ca {ca}' ) error_msg = {'error': msg, 'reference': cn} return jsonify(error_msg), 403 logger.info( - 'get_certificate called on id={} for ca={} by user={}'.format( - cn, - ca, - logged_in_user, - ) + 'get_certificate called on id=%s for ca=%s by user=%s', + cn, + ca, + logged_in_user, ) - arn = ca_object.issue_certificate(data['csr'], validity) - certificate = ca_object.get_certificate_from_arn(arn) + certificate_json = ca_object.issue_certificate(data['csr'], validity) certificate_response = CertificateResponse( - certificate=certificate['certificate'], - certificate_chain=certificate['certificate_chain'], + certificate=certificate_json['certificate'], + certificate_chain=certificate_json['certificate_chain'], ) return certificate_response_schema.dumps(certificate_response) @@ -284,7 +291,7 @@ def list_cas(): cas = certificatemanager.list_cas() - logger.info('list_cas called by user={}'.format(logged_in_user)) + logger.info('list_cas called by user=%s', logged_in_user) cas_response = CertificateAuthoritiesResponse.from_cas(cas) return certificate_authorities_response_schema.dumps(cas_response) @@ -331,29 +338,23 @@ def get_ca(ca): ''' try: ca_object = certificatemanager.get_ca(ca) - except certificatemanager.CertificateAuthorityNotFoundError: + except CertificateAuthorityNotFoundError: return jsonify({'error': 'Provided CA not found.'}), 404 logged_in_user = authnz.get_logged_in_user() + if not acl_module_check( resource_type='ca', action='get', resource_id=ca, ): - msg = '{} does not have access to get ca {}'.format( - authnz.get_logged_in_user(), - ca, - ) + msg = f''' + {authnz.get_logged_in_user()} does not have access to get ca {ca} + ''' error_msg = {'error': msg, 'reference': ca} return jsonify(error_msg), 403 - logger.info( - 'get_ca called on id={} by user={}'.format( - ca, - logged_in_user, - ) - ) - + logger.info('get_ca called on id=%s by user=%s', ca, logged_in_user) _ca = ca_object.get_certificate_authority_certificate() ca_response = CertificateAuthorityResponse( ca=_ca['ca'], diff --git a/confidant/services/certificate_authority/acmpca.py b/confidant/services/certificate_authority/acmpca.py new file mode 100644 index 00000000..8f32f809 --- /dev/null +++ b/confidant/services/certificate_authority/acmpca.py @@ -0,0 +1,364 @@ +import datetime +import hashlib +import logging +import time + +from cryptography import x509 +from cryptography.hazmat.backends import default_backend +from cryptography.hazmat.primitives import hashes +from cryptography.hazmat.primitives.asymmetric import rsa +from cryptography.x509.oid import NameOID +from lru import LRU + +import confidant.clients +from confidant import settings +from confidant.services.certificate_authority.certificateauthoritybase import ( + CertificateAuthorityBase, + CertificateAuthorityNotFoundError, + CertificateNotReadyError, +) +from confidant.utils import stats + +logger = logging.getLogger(__name__) + + +class CachedCertificate: + def __init__(self, lock=False, response=None): + self._lock = lock + self._response = response + + @property + def lock(self): + return self._lock + + @lock.setter + def lock(self, value): + self._lock = value + + @property + def response(self): + return self._response + + @response.setter + def response(self, value): + self._response = value + + +class CertificateCache: + def __init__(self, cache_size): + self.certificates = LRU(cache_size) + + def get(self, cache_id): + """ + Get the CachedCertificate for the given cache_id. + """ + return self.certificates.get(cache_id) + + def lock(self, cache_id): + """ + Lock the CachedCertificate for the given cache_id. If the id is not in + the cache, create a CachedCertificate for the cache_id, add it to the + cache, and lock it. + """ + if cache_id in self.certificates: + self.certificates[cache_id].lock = True + else: + self.certificates[cache_id] = CachedCertificate( + lock=True, + response=None, + ) + + def release(self, cache_id): + if cache_id in self.certificates: + self.certificates[cache_id].lock = False + else: + logger.warning( + "Attempting to release a non-existent lock in the certificate" + " cache." + ) + + def set_response(self, cache_id, response): + self.certificates[cache_id].response = response + + def get_cache_id(self, cn, validity, san): + """ + Return a unique string from the provided arguments, for use in the + certificate cache. The current day is included in the id, to ensure + cache invalidation (minumum validity is 1 day). + """ + date = datetime.datetime.today().strftime("%Y-%m-%d") + return "{}{}{}{}".format(cn, validity, san, date) + + +class CertificateCacheNoOp: + def get(self, cache_id): + return None + + def lock(self, cache_id): + return None + + def release(self, cache_id): + return None + + def set_response(self, cache_id, response): + return None + + def get_cache_id(self, cn, validity, san): + return "" + + +class ACMPrivateCertificateAuthority(CertificateAuthorityBase): + """ + AWS ACM Private Certificate Authority implementation of + CertificateAuthorityBase. + + Args: + CertificateAuthorityBase (_type_): base class for certificate + authorities. + """ + + def __init__(self, ca): + try: + self.ca_name = ca + self.settings = settings.ACM_PRIVATE_CA_SETTINGS[ca] + except KeyError: + raise CertificateAuthorityNotFoundError() + if self.settings["certificate_use_cache"]: + self.cache = CertificateCache( + self.settings["certificate_cache_size"], + ) + else: + self.cache = CertificateCacheNoOp() + + def issue_certificate(self, csr_pem, validity): + """ + Given a PEM encoded csr, and a validity for the certificate (in number + of days), issue a certificate from ACM Private CA, and return the ARN + of the issued certificate. + """ + csr_pem = csr_pem.encode(encoding="UTF-8") + with stats.timer("issue_certificate"): + client = confidant.clients.get_boto_client("acm-pca") + response = client.issue_certificate( + CertificateAuthorityArn=self.settings["arn"], + Csr=csr_pem, + SigningAlgorithm=self.settings["signing_algorithm"], + Validity={ + "Value": min(validity, self.settings["max_validity_days"]), + "Type": "DAYS", + }, + # Quick/easy idempotent token is just a hash of the csr itself. + # The token must be 36 chars or less. + IdempotencyToken=hashlib.sha256(csr_pem).hexdigest()[:36], + ) + arn = response["CertificateArn"] + return self._get_certificate_from_arn(arn) + + def _get_cached_certificate_with_key(self, cache_id): + """ + For the cache id, get the cached response, or, if another thread is in + the process of issuing the same certificate, wait for the other thread + to populate the cache. + """ + with stats.timer("get_cached_certificate_with_key"): + item = self.cache.get(cache_id) + # We're the first thread attempting to get this certificate + if not item: + return {} + # A certificate hasn't been issued yet, but since the cache id + # exists, another thread has requested the certificate. + if not item.response and item.lock: + raise CertificateNotReadyError() + # If the other thread failed to get the certificate, we need to + # ensure that this thread attempts to fetch a certificate. + return item.response + + def issue_certificate_with_key(self, cn, validity, san=None): + """ + Given the string common name, the validity length of the certificate (in + number of days), and a list of subject alternative names, return a dict + with the PEM encoded certificate, certificate chain, and private RSA + key. + """ + with stats.timer("issue_certificate_with_key"): + cache_id = self.cache.get_cache_id(cn, validity, san) + cached_response = self._get_cached_certificate_with_key(cache_id) + if cached_response: + stats.incr("get_cached_certificate_with_key.hit") + logger.debug("Used cached response for %s", cache_id) + return cached_response + stats.incr("get_cached_certificate_with_key.miss") + key = self.generate_key() + encoded_key = self.encode_key(key) + if self.settings["self_sign"]: + cert = self.encode_certificate( + self.generate_self_signed_certificate( + key, + cn, + validity, + san, + ) + ) + return { + "certificate": cert, + "certificate_chain": cert, + "key": encoded_key, + } + csr = self.generate_csr(key, cn, san) + try: + # set a lock + self.cache.lock(cache_id) + arn = self.issue_certificate(self.encode_csr(csr), validity) + response = self._get_certificate_from_arn(arn) + response["key"] = encoded_key + self.cache.set_response(cache_id, response) + finally: + # release the lock + self.cache.release(cache_id) + return response + + def _get_certificate_from_arn(self, certificate_arn): + """ + Get the PEM encoded certificate from the provided ARN. + """ + with stats.timer("get_certificate_from_arn"): + client = confidant.clients.get_boto_client("acm-pca") + # When a certificate is issued, it may take a while before it's + # available via get_certificate. We need to keep retrying until it's + # fully issued. + i = 0 + while True: + try: + response = client.get_certificate( + CertificateAuthorityArn=self.settings["arn"], + CertificateArn=certificate_arn, + ) + break + except client.exceptions.RequestInProgressException: + # Sleep for a maximum of 10 seconds + if i >= 50: + raise + logger.debug( + "Sleeping in get_certificate_from_arn for %s", + certificate_arn, + ) + time.sleep(0.200) + i = i + 1 + return { + "certificate": response["Certificate"], + "certificate_chain": response["CertificateChain"], + } + + def get_certificate_authority_certificate(self): + """ + Return the PEM encoded CA certificate and certificate chain from the CA + ARN. + """ + client = confidant.clients.get_boto_client("acm-pca") + certificate = client.get_certificate_authority_certificate( + CertificateAuthorityArn=self.settings["arn"], + ) + # TODO: support pagination for this call + tags = client.list_tags( + CertificateAuthorityArn=self.settings["arn"], + ) + _tags = {} + for tag in tags["Tags"]: + _tags[tag["Key"]] = tag["Value"] + return { + "ca": self.ca_name, + "certificate": certificate["Certificate"], + "certificate_chain": certificate["CertificateChain"], + "tags": _tags, + } + + def generate_x509_name(self, cn): + """ + For the given common name string, generate and return an x509.Name, with + attributes configured in the settings. + """ + name_attributes = [ + x509.NameAttribute(NameOID.COMMON_NAME, cn), + ] + if self.settings["csr_country_name"]: + name_attributes.append( + x509.NameAttribute( + NameOID.COUNTRY_NAME, + self.settings["csr_country_name"], + ) + ) + if self.settings["csr_state_or_province_name"]: + name_attributes.append( + x509.NameAttribute( + NameOID.STATE_OR_PROVINCE_NAME, + self.settings["csr_state_or_province_name"], + ) + ) + if self.settings["csr_locality_name"]: + name_attributes.append( + x509.NameAttribute( + NameOID.LOCALITY_NAME, + self.settings["csr_locality_name"], + ) + ) + if self.settings["csr_organization_name"]: + name_attributes.append( + x509.NameAttribute( + NameOID.ORGANIZATION_NAME, + self.settings["csr_organization_name"], + ) + ) + return x509.Name(name_attributes) + + def generate_key(self): + """ + Generate and return a private RSA key object + """ + key = rsa.generate_private_key( + public_exponent=self.settings["key_public_exponent_size"], + key_size=self.settings["key_size"], + backend=default_backend(), + ) + return key + + def generate_self_signed_certificate(self, key, cn, validity, san=None): + """ + Using the provided rsa key, a string common name, a validity (in number + of days), and a list of subject alternative names (as strings), generate + and return a signed certificate object. + """ + _validity = min(validity, self.settings["max_validity_days"]) + subject = self.generate_x509_name(cn) + issuer = subject + # x509.CertificateBuilder functions return modified versions of the + # object, so it's weirdly meant to be chained as function calls, making + # this look weirdly javascript-like. + cert = ( + x509.CertificateBuilder() + .subject_name( + subject, + ) + .issuer_name( + issuer, + ) + .public_key( + key.public_key(), + ) + .serial_number( + x509.random_serial_number(), + ) + .not_valid_before( + datetime.datetime.utcnow(), + ) + .not_valid_after( + datetime.datetime.utcnow() + + datetime.timedelta(days=_validity), + ) + ) + if san: + dns_names = self.encode_san_dns_names(san) + cert = cert.add_extension( + x509.SubjectAlternativeName(dns_names), + critical=False, + ) + return cert.sign(key, hashes.SHA256(), default_backend()) diff --git a/confidant/services/certificate_authority/certificateauthoritybase.py b/confidant/services/certificate_authority/certificateauthoritybase.py new file mode 100644 index 00000000..2dd28af1 --- /dev/null +++ b/confidant/services/certificate_authority/certificateauthoritybase.py @@ -0,0 +1,175 @@ +import logging +from abc import ABC, abstractmethod + +from cryptography import x509 +from cryptography.hazmat.backends import default_backend +from cryptography.hazmat.primitives import hashes, serialization +from cryptography.x509.extensions import ExtensionNotFound +from cryptography.x509.oid import NameOID + +logger = logging.getLogger(__name__) + + +class CertificateAuthorityNotFoundError(Exception): + """ + Exception raised when a specified certificate authority is not found. + """ + + def __init__(self, message="Certificate Authority not found."): + self.message = message + super().__init__(self.message) + + +class CertificateNotReadyError(Exception): + """ + Exception raised when a certificate is not ready. + """ + + def __init__(self, message="Certificate Authority not ready."): + self.message = message + super().__init__(self.message) + + +class CertificateAuthorityBase(ABC): + """Base class for certificate authorities.""" + + @abstractmethod + def __init__(self, ca: str): + pass + + @abstractmethod + def issue_certificate(self, csr_pem, validity): + """ + Given a PEM encoded csr, and a validity for the certificate (in number + of days), issue a certificate from ACM Private CA, and return the ARN + of the issued certificate. + """ + pass + + @abstractmethod + def issue_certificate_with_key(self, cn, validity, san=None): + """ + Given the string common name, the validity length of the certificate (in + number of days), and a list of subject alternative names, return a dict + with the PEM encoded certificate, certificate chain, and private RSA + key. + """ + pass + + @abstractmethod + def generate_self_signed_certificate(self, key, cn, validity, san=None): + """ + Using the provided rsa key, a string common name, a validity (in number + of days), and a list of subject alternative names (as strings), generate + and return a signed certificate object. + """ + pass + + @abstractmethod + def get_certificate_authority_certificate(self): + """ + Return the PEM encoded CA certificate and certificate chain + """ + pass + + @abstractmethod + def generate_key(self): + """ + Generate and return a private RSA key object + """ + pass + + @abstractmethod + def generate_x509_name(self, cn): + """ + For the given common name string, generate and return an x509.Name, with + attributes configured in the settings. + """ + pass + + def encode_csr(self, csr): + """ + Return a PEM string encoded version of the csr object. + """ + return csr.public_bytes( + serialization.Encoding.PEM, + ).decode(encoding="UTF-8") + + def decode_csr(self, pem_csr): + """ + Return a csr object from the pem encoded csr. + """ + pem_csr = pem_csr.encode(encoding="UTF-8") + return x509.load_pem_x509_csr(pem_csr, default_backend()) + + def get_csr_common_name(self, csr): + """ + From the provided csr object, return the string value of the common + name attribute. + """ + cns = csr.subject.get_attributes_for_oid(NameOID.COMMON_NAME) + if cns: + # get_attributes_for_oid returns a list, but there should only be a + # single cn attribute, so just return the first item. + return cns[0].value + return None + + def get_csr_san(self, csr): + """ + From the provided csr object, return a list of the string values of the + subjust alternative name extension. + """ + dns_names = [] + try: + san = csr.extensions.get_extension_for_class( + x509.SubjectAlternativeName + ) + except ExtensionNotFound: + san = None + if san: + for dns_name in san.value: + dns_names.append(dns_name.value) + return dns_names + + def encode_san_dns_names(self, san): + """ + Return a list of x509.DNSName attributes from a list of strings. + """ + dns_names = [] + for dns_name in san: + dns_names.append(x509.DNSName(dns_name)) + return dns_names + + def encode_certificate(self, cert): + """ + Return the PEM string encoded version of the certificate object. + """ + return cert.public_bytes( + serialization.Encoding.PEM, + ).decode(encoding="UTF-8") + + def encode_key(self, key): + """ + Return the PEM encoded version of the provided private RSA key object + """ + return key.private_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PrivateFormat.TraditionalOpenSSL, + encryption_algorithm=serialization.NoEncryption(), + ).decode(encoding="UTF-8") + + def generate_csr(self, key, cn, san=None): + """ + Using the provided rsa key object, a string common name, and a list of + string subject alternative names, generate and return a csr object. + """ + csr = x509.CertificateSigningRequestBuilder().subject_name( + self.generate_x509_name(cn) + ) + if san: + dns_names = self.encode_san_dns_names(san) + csr = csr.add_extension( + x509.SubjectAlternativeName(dns_names), + critical=False, + ) + return csr.sign(key, hashes.SHA256(), default_backend()) diff --git a/confidant/services/certificate_authority/customca.py b/confidant/services/certificate_authority/customca.py new file mode 100644 index 00000000..32ac1495 --- /dev/null +++ b/confidant/services/certificate_authority/customca.py @@ -0,0 +1,230 @@ +""" +Custom Certificate Authority Module + +This module provides functionality for managing Custom Certificate +Authorities (CAs) supplied by the user. It supports the signing of +certificates for Certificate Signing Requests (CSRs). + +Note: This module does not provide functionality for generating keys or CSRs. +""" + +import logging +from datetime import datetime, timedelta, timezone + +from cerberus import Validator +from cryptography import x509 +from cryptography.hazmat.primitives import serialization +from cryptography.hazmat.primitives.hashes import SHA256 + +from confidant import settings +from confidant.services.certificate_authority.certificateauthoritybase import ( + CertificateAuthorityBase, + CertificateAuthorityNotFoundError, +) +from confidant.settings import ( + CUSTOM_CA_ACTIVE_KEYS, + CUSTOM_CERTIFICATE_AUTHORITIES, +) + +logger = logging.getLogger(__name__) + +CUSTOM_CA_SCHEMA = { + "rootcrt": {"type": "string", "required": True, "nullable": True}, + "crt": {"type": "string", "required": True}, + "key": {"type": "string", "required": True}, + "passphrase": {"type": "string", "required": True, "nullable": True}, + "kid": {"type": "string", "required": True}, +} + + +class CustomCertificateAuthority(CertificateAuthorityBase): + """Custom Certificate Authority + + Args: + CertificateAuthorityBase (_type_): Base class for Certificate Authority + """ + + def __init__(self, ca_env: str): + self.ca_env = ca_env + self.active_ca_id = None + self.ca_json = self._get_ca_in_json(ca_env) + self.ca_certificate = self._load_ca_certificate(self.ca_json) + self.root_ca_certificate = self._load_rootca_certificate(self.ca_json) + self.ca_private_key = self._load_private_key(self.ca_json) + self.ca_chain = self._load_ca_chain() + self.settings = settings.CUSTOM_CA_SETTINGS + + def _get_ca_in_json(self, ca_env: str): + if ( + not CUSTOM_CERTIFICATE_AUTHORITIES + or ca_env not in CUSTOM_CERTIFICATE_AUTHORITIES + ): + logger.error("Custom CA %s not found", ca_env) + raise CertificateAuthorityNotFoundError( + f"Custom CA {ca_env} not found" + ) + if not CUSTOM_CA_ACTIVE_KEYS or ca_env not in CUSTOM_CA_ACTIVE_KEYS: + logger.error("Custom CA %s has no active keys", ca_env) + raise CertificateAuthorityNotFoundError( + f"Custom CA {ca_env} has no active keys" + ) + validator = Validator(CUSTOM_CA_SCHEMA) + active_ca_id = CUSTOM_CA_ACTIVE_KEYS[ca_env] + self.active_ca_id = active_ca_id + active_ca = [ + ca + for ca in CUSTOM_CERTIFICATE_AUTHORITIES[ca_env] + if validator.validate(ca) and ca["kid"] == active_ca_id + ] + if not active_ca: + logger.error("Custom CA %s has no active keys", ca_env) + raise CertificateAuthorityNotFoundError( + ( + f"Custom CA {ca_env} has no matching valid active keys for " + f"{active_ca_id}" + ) + ) + return active_ca[0] + + def _load_ca_certificate(self, ca_json): + return x509.load_pem_x509_certificate(ca_json["crt"].encode("utf-8")) + + def _load_rootca_certificate(self, ca_json): + if "rootcrt" not in ca_json or not ca_json["rootcrt"]: + logger.warning( + "Custom CA %s has no root CA certificate provided", self.ca_env + ) + return None + return x509.load_pem_x509_certificate( + ca_json["rootcrt"].encode("utf-8") + ) + + def _load_ca_chain(self): + # Get the certificate in PEM format + intermediate_ca_pem = self.encode_certificate(self.ca_certificate) + if not self.root_ca_certificate: + return intermediate_ca_pem + root_ca_pem = self.encode_certificate(self.root_ca_certificate) + return intermediate_ca_pem + root_ca_pem + + def _load_private_key(self, ca_json): + private_key = serialization.load_pem_private_key( + ca_json["key"].encode("utf-8"), + password=ca_json["passphrase"].encode("utf-8"), + ) + return private_key + + def issue_certificate(self, csr_pem, validity): + # Load the CSR from PEM format + csr = x509.load_pem_x509_csr(csr_pem.encode("utf-8")) + + # Verify the CSR + if not csr.is_signature_valid: + raise ValueError("Invalid CSR signature") + + # Define the certificate builder using information from the CSR + builder = x509.CertificateBuilder() + builder = builder.subject_name(csr.subject) + builder = builder.issuer_name( + self.ca_certificate.subject + ) # Issued by our CA + builder = builder.public_key(csr.public_key()) + builder = builder.serial_number(x509.random_serial_number()) + current_time = datetime.now(timezone.utc) + builder = builder.not_valid_before(current_time) + + acceptable_validity = min(validity, self.settings["max_validity_days"]) + builder = builder.not_valid_after( + current_time + timedelta(days=acceptable_validity) + ) + + # add basic constraints extension, restricted for end entity + # certificates + builder = builder.add_extension( + x509.BasicConstraints(ca=False, path_length=None), + critical=True, + ) + + # add san extension from csr + if not self.get_csr_san(csr): + raise ValueError("CSR does not have a SAN extension") + + builder = builder.add_extension( + x509.SubjectAlternativeName( + csr.extensions.get_extension_for_class( + x509.SubjectAlternativeName + ).value + ), + critical=False, + ) + + # add key usage extension + # Note: this is configured to be a general purpose TLS certificate + builder = builder.add_extension( + x509.KeyUsage( + digital_signature=True, + content_commitment=False, + key_encipherment=True, + data_encipherment=False, + key_agreement=False, + key_cert_sign=False, + crl_sign=False, + encipher_only=False, + decipher_only=False, + ), + critical=True, + ) + + # add extended key usage extension + # Note: this is configured to be used for both server and client auth + builder = builder.add_extension( + x509.ExtendedKeyUsage( + [ + x509.oid.ExtendedKeyUsageOID.SERVER_AUTH, + x509.oid.ExtendedKeyUsageOID.CLIENT_AUTH, + ] + ), + critical=False, + ) + + # Sign the certificate with the CA's private key + certificate = builder.sign( + private_key=self.ca_private_key, algorithm=SHA256() + ) + + # Return the certificate in PEM format + response = { + "certificate": certificate.public_bytes( + serialization.Encoding.PEM + ).decode("utf-8"), + "certificate_chain": self.ca_chain, + } + return response + + def get_certificate_authority_certificate(self): + intermediate_ca_pem = self.encode_certificate(self.ca_certificate) + root_ca_pem = self.encode_certificate(self.root_ca_certificate) + return { + "ca": self.active_ca_id, + "certificate": intermediate_ca_pem, + "certificate_chain": intermediate_ca_pem + root_ca_pem, + "tags": [], + } + + def issue_certificate_with_key(self, cn, validity, san=None): + raise NotImplementedError( + "Custom CA does not support issuing certificates with key" + ) + + def generate_self_signed_certificate(self, key, cn, validity, san=None): + raise NotImplementedError( + "Custom CA does not support generating self signed certificates" + ) + + def generate_key(self): + raise NotImplementedError("Custom CA does not support generating keys") + + def generate_x509_name(self, cn): + raise NotImplementedError( + "Custom CA does not support generating x509 names" + ) diff --git a/confidant/services/certificatemanager.py b/confidant/services/certificatemanager.py index 8df3e552..3c63c099 100644 --- a/confidant/services/certificatemanager.py +++ b/confidant/services/certificatemanager.py @@ -1,449 +1,40 @@ -import datetime -import hashlib import logging -import time +from enum import Enum -from cryptography import x509 -from cryptography.x509.extensions import ExtensionNotFound -from cryptography.hazmat.backends import default_backend -from cryptography.hazmat.primitives import hashes, serialization -from cryptography.hazmat.primitives.asymmetric import rsa -from cryptography.x509.oid import NameOID -from lru import LRU +from confidant.services.certificate_authority.acmpca import ( + ACMPrivateCertificateAuthority, +) + +from confidant.services.certificate_authority.customca import ( + CustomCertificateAuthority, +) -import confidant.clients from confidant import settings -from confidant.utils import stats logger = logging.getLogger(__name__) -class CachedCertificate(object): - def __init__(self, lock=False, response=None): - self._lock = lock - self._response = response - - @property - def lock(self): - return self._lock - - @lock.setter - def lock(self, value): - self._lock = value - - @property - def response(self): - return self._response - - @response.setter - def response(self, value): - self._response = value - - -class CertificateCache(object): - def __init__(self, cache_size): - self.certificates = LRU(cache_size) - - def get(self, cache_id): - """ - Get the CachedCertificate for the given cache_id. - """ - return self.certificates.get(cache_id) - - def lock(self, cache_id): - """ - Lock the CachedCertificate for the given cache_id. If the id is not in - the cache, create a CachedCertificate for the cache_id, add it to the - cache, and lock it. - """ - if cache_id in self.certificates: - self.certificates[cache_id].lock = True - else: - self.certificates[cache_id] = CachedCertificate( - lock=True, - response=None, - ) - - def release(self, cache_id): - if cache_id in self.certificates: - self.certificates[cache_id].lock = False - else: - logger.warning( - 'Attempting to release a non-existent lock in the certificate' - ' cache.' - ) - - def set_response(self, cache_id, response): - self.certificates[cache_id].response = response - - def get_cache_id(self, cn, validity, san): - """ - Return a unique string from the provided arguments, for use in the - certificate cache. The current day is included in the id, to ensure - cache invalidation (minumum validity is 1 day). - """ - date = datetime.datetime.today().strftime('%Y-%m-%d') - return '{}{}{}{}'.format(cn, validity, san, date) - - -class CertificateCacheNoOp(object): - def get(self, cache_id): - return None - - def lock(self, cache_id): - return None - - def release(self, cache_id): - return None - - def set_response(self, cache_id, response): - return None - - def get_cache_id(self, cn, validity, san): - return '' - - -class CertificateAuthorityNotFoundError(Exception): - pass +class CAType(Enum): + """Enum for CA types.""" - -class CertificateNotReadyError(Exception): - pass - - -class CertificateAuthority(object): - def __init__(self, ca): - try: - self.ca_name = ca - self.settings = settings.ACM_PRIVATE_CA_SETTINGS[ca] - except KeyError: - raise CertificateAuthorityNotFoundError() - if self.settings['certificate_use_cache']: - self.cache = CertificateCache( - self.settings['certificate_cache_size'], - ) - else: - self.cache = CertificateCacheNoOp() - - def generate_key(self): - """ - Generate and return a private RSA key object - """ - key = rsa.generate_private_key( - public_exponent=self.settings['key_public_exponent_size'], - key_size=self.settings['key_size'], - backend=default_backend() - ) - return key - - def encode_key(self, key): - """ - Return the PEM encoded version of the provided private RSA key object - """ - return key.private_bytes( - encoding=serialization.Encoding.PEM, - format=serialization.PrivateFormat.TraditionalOpenSSL, - encryption_algorithm=serialization.NoEncryption(), - ).decode(encoding='UTF-8') - - def generate_x509_name(self, cn): - """ - For the given common name string, generate and return an x509.Name, with - attributes configured in the settings. - """ - name_attributes = [ - x509.NameAttribute(NameOID.COMMON_NAME, cn), - ] - if self.settings['csr_country_name']: - name_attributes.append( - x509.NameAttribute( - NameOID.COUNTRY_NAME, - self.settings['csr_country_name'], - ) - ) - if self.settings['csr_state_or_province_name']: - name_attributes.append( - x509.NameAttribute( - NameOID.STATE_OR_PROVINCE_NAME, - self.settings['csr_state_or_province_name'], - ) - ) - if self.settings['csr_locality_name']: - name_attributes.append( - x509.NameAttribute( - NameOID.LOCALITY_NAME, - self.settings['csr_locality_name'], - ) - ) - if self.settings['csr_organization_name']: - name_attributes.append( - x509.NameAttribute( - NameOID.ORGANIZATION_NAME, - self.settings['csr_organization_name'], - ) - ) - return x509.Name(name_attributes) - - def generate_csr(self, key, cn, san=None): - """ - Using the provided rsa key object, a string common name, and a list of - string subject alternative names, generate and return a csr object. - """ - csr = x509.CertificateSigningRequestBuilder().subject_name( - self.generate_x509_name(cn) - ) - if san: - dns_names = self.encode_san_dns_names(san) - csr = csr.add_extension( - x509.SubjectAlternativeName(dns_names), - critical=False, - ) - return csr.sign(key, hashes.SHA256(), default_backend()) - - def encode_csr(self, csr): - """ - Return a PEM string encoded version of the csr object. - """ - return csr.public_bytes( - serialization.Encoding.PEM, - ).decode(encoding='UTF-8') - - def decode_csr(self, pem_csr): - """ - Return a csr object from the pem encoded csr. - """ - pem_csr = pem_csr.encode(encoding='UTF-8') - return x509.load_pem_x509_csr(pem_csr, default_backend()) - - def get_csr_common_name(self, csr): - """ - From the provided csr object, return the string value of the common - name attribute. - """ - cns = csr.subject.get_attributes_for_oid(NameOID.COMMON_NAME) - if cns: - # get_attributes_for_oid returns a list, but there should only be a - # single cn attribute, so just return the first item. - return cns[0].value - return None - - def get_csr_san(self, csr): - """ - From the provided csr object, return a list of the string values of the - subjust alternative name extension. - """ - dns_names = [] - try: - san = csr.extensions.get_extension_for_class( - x509.SubjectAlternativeName - ) - except ExtensionNotFound: - san = None - if san: - for dns_name in san.value: - dns_names.append(dns_name.value) - return dns_names - - def encode_san_dns_names(self, san): - """ - Return a list of x509.DNSName attributes from a list of strings. - """ - dns_names = [] - for dns_name in san: - dns_names.append(x509.DNSName(dns_name)) - return dns_names - - def generate_self_signed_certificate(self, key, cn, validity, san=None): - """ - Using the provided rsa key, a string common name, a validity (in number - of days), and a list of subject alternative names (as strings), generate - and return a signed certificate object. - """ - _validity = min(validity, self.settings['max_validity_days']) - subject = self.generate_x509_name(cn) - issuer = subject - # x509.CertificateBuilder functions return modified versions of the - # object, so it's weirdly meant to be chained as function calls, making - # this look weirdly javascript-like. - cert = x509.CertificateBuilder( - ).subject_name( - subject, - ).issuer_name( - issuer, - ).public_key( - key.public_key(), - ).serial_number( - x509.random_serial_number(), - ).not_valid_before( - datetime.datetime.utcnow(), - ).not_valid_after( - datetime.datetime.utcnow() + datetime.timedelta(days=_validity), - ) - if san: - dns_names = self.encode_san_dns_names(san) - cert = cert.add_extension( - x509.SubjectAlternativeName(dns_names), - critical=False, - ) - return cert.sign(key, hashes.SHA256(), default_backend()) - - def encode_certificate(self, cert): - """ - Return the PEM string encoded version of the certificate object. - """ - return cert.public_bytes( - serialization.Encoding.PEM, - ).decode(encoding='UTF-8') - - def issue_certificate(self, csr, validity): - """ - Given a PEM encoded csr, and a validity for the certificate (in number - of days), issue a certificate from ACM Private CA, and return the ARN - of the issued certificate. - """ - csr = csr.encode(encoding='UTF-8') - with stats.timer('issue_certificate'): - client = confidant.clients.get_boto_client('acm-pca') - response = client.issue_certificate( - CertificateAuthorityArn=self.settings['arn'], - Csr=csr, - SigningAlgorithm=self.settings['signing_algorithm'], - Validity={ - 'Value': min(validity, self.settings['max_validity_days']), - 'Type': 'DAYS', - }, - # Quick/easy idempotent token is just a hash of the csr itself. - # The token must be 36 chars or less. - IdempotencyToken=hashlib.sha256(csr).hexdigest()[:36], - ) - return response['CertificateArn'] - - def _get_cached_certificate_with_key(self, cache_id): - """ - For the cache id, get the cached response, or, if another thread is in - the process of issuing the same certificate, wait for the other thread - to populate the cache. - """ - with stats.timer('get_cached_certificate_with_key'): - item = self.cache.get(cache_id) - # We're the first thread attempting to get this certificate - if not item: - return {} - # A certificate hasn't been issued yet, but since the cache id - # exists, another thread has requested the certificate. - if not item.response and item.lock: - raise CertificateNotReadyError() - # If the other thread failed to get the certificate, we need to - # ensure that this thread attempts to fetch a certificate. - return item.response - - def issue_certificate_with_key(self, cn, validity, san=None): - """ - Given the string common name, the validity length of the certificate (in - number of days), and a list of subject alternative names, return a dict - with the PEM encoded certificate, certificate chain, and private RSA - key. - """ - with stats.timer('issue_certificate_with_key'): - cache_id = self.cache.get_cache_id(cn, validity, san) - cached_response = self._get_cached_certificate_with_key(cache_id) - if cached_response: - stats.incr('get_cached_certificate_with_key.hit') - logger.debug('Used cached response for {}'.format(cache_id)) - return cached_response - stats.incr('get_cached_certificate_with_key.miss') - key = self.generate_key() - encoded_key = self.encode_key(key) - if self.settings['self_sign']: - cert = self.encode_certificate( - self.generate_self_signed_certificate( - key, - cn, - validity, - san, - ) - ) - return { - 'certificate': cert, - 'certificate_chain': cert, - 'key': encoded_key, - } - csr = self.generate_csr(key, cn, san) - try: - # set a lock - self.cache.lock(cache_id) - arn = self.issue_certificate(self.encode_csr(csr), validity) - response = self.get_certificate_from_arn(arn) - response['key'] = encoded_key - self.cache.set_response(cache_id, response) - finally: - # release the lock - self.cache.release(cache_id) - return response - - def get_certificate_from_arn(self, certificate_arn): - """ - Get the PEM encoded certificate from the provided ARN. - """ - with stats.timer('get_certificate_from_arn'): - client = confidant.clients.get_boto_client('acm-pca') - # When a certificate is issued, it may take a while before it's - # available via get_certificate. We need to keep retrying until it's - # fully issued. - i = 0 - while True: - try: - response = client.get_certificate( - CertificateAuthorityArn=self.settings['arn'], - CertificateArn=certificate_arn, - ) - break - except client.exceptions.RequestInProgressException: - # Sleep for a maximum of 10 seconds - if i >= 50: - raise - logger.debug( - 'Sleeping in get_certificate_from_arn for {}'.format( - certificate_arn, - ) - ) - time.sleep(.200) - i = i + 1 - return { - 'certificate': response['Certificate'], - 'certificate_chain': response['CertificateChain'], - } - - def get_certificate_authority_certificate(self): - """ - Return the PEM encoded CA certificate and certificate chain from the CA - ARN. - """ - client = confidant.clients.get_boto_client('acm-pca') - certificate = client.get_certificate_authority_certificate( - CertificateAuthorityArn=self.settings['arn'], - ) - # TODO: support pagination for this call - tags = client.list_tags( - CertificateAuthorityArn=self.settings['arn'], - ) - _tags = {} - for tag in tags['Tags']: - _tags[tag['Key']] = tag['Value'] - return { - 'ca': self.ca_name, - 'certificate': certificate['Certificate'], - 'certificate_chain': certificate['CertificateChain'], - 'tags': _tags, - } + AWS_ACM_PCA = "aws_acm_pca" + CUSTOM_CA = "custom_ca" _CAS = {} def get_ca(ca): + """get_ca returns a CertificateAuthority object for the given CA, + based on the CA type in settings. + """ if ca not in _CAS: - _CAS[ca] = CertificateAuthority(ca) + if settings.CA_TYPE == "aws_acm_pca": + _CAS[ca] = ACMPrivateCertificateAuthority(ca) + elif settings.CA_TYPE == "custom_ca": + _CAS[ca] = CustomCertificateAuthority(ca) + else: + raise ValueError(f"Unknown CA type: {settings.CA_TYPE}") return _CAS[ca] @@ -452,7 +43,14 @@ def list_cas(): Return detailed CA information for all CAs. """ cas = [] - for ca in settings.ACM_PRIVATE_CA_SETTINGS: - _ca = get_ca(ca) - cas.append(_ca.get_certificate_authority_certificate()) + if settings.CA_TYPE == "aws_acm_pca": + for ca in settings.ACM_PRIVATE_CA_SETTINGS: + _ca = get_ca(ca) + cas.append(_ca.get_certificate_authority_certificate()) + elif settings.CA_TYPE == "custom_ca": + for ca in settings.CUSTOM_CERTIFICATE_AUTHORITIES: + _ca = get_ca(ca) + cas.append(_ca.get_certificate_authority_certificate()) + else: + raise ValueError(f"Unknown CA type: {settings.CA_TYPE}") return cas diff --git a/confidant/settings.py b/confidant/settings.py index 6af6bbb8..216c626d 100644 --- a/confidant/settings.py +++ b/confidant/settings.py @@ -680,11 +680,16 @@ def str_env(var_name, default=''): else: decrypted_custom_cas = str_env('CUSTOM_CERTIFICATE_AUTHORITIES') +# CA_TYPE for issuing certificates, defaults to aws_acm_pca. +# options: aws_acm_pca, custom +CA_TYPE = str_env('CA_TYPE', "aws_acm_pca") + # CUSTOM_CERTIFICATE_AUTHORITIES # Should be in encrypted settings following this # format (where name is the name of the environment) and key ids must be unique: # {"":[{ # "key": "--- RSA...", +# "rootcrt": "--- CERT...", # "crt": "--- CERT...", # "passphrase": "some-key", # "kid": "some-kid" @@ -697,6 +702,26 @@ def str_env(var_name, default=''): # {"staging": "some_kid", "production": "some_kid"} CUSTOM_CA_ACTIVE_KEYS = json.loads(str_env('CUSTOM_CA_ACTIVE_KEYS', '{}')) +CUSTOM_CA_SETTINGS = {} +CUSTOM_CA_SETTINGS["max_validity_days"] = int_env( + "CUSTOM_CA_MAX_VALIDITY_DAYS", + 120, +) +# A regex to match against CN and SAN values for this CA. This regex must +# include a named group for service_name: (?P) +# If no named group is defined, then the default ACL will deny generation +# of the certificate. +# Any certificate issue attempt not matching this pattern for CN or values +# in SAN will be denied. If this is unset, all certificate issue attempts +# will be denied by the default_acl. +# Example: (?P[\w-]+)\.example\.com +# Example match: test-service.example.com +# service_name from example: test-service +CUSTOM_CA_SETTINGS["name_regex"] = str_env( + "CUSTOM_CA_DOMAIN_REGEX", + None, +) + # Configuration validation _settings_failures = False if len(set(SCOPED_AUTH_KEYS.values())) != len(SCOPED_AUTH_KEYS.values()): diff --git a/tests/unit/confidant/routes/certificates_test.py b/tests/unit/confidant/routes/certificates_test.py index 5d52c77b..2ccd6761 100644 --- a/tests/unit/confidant/routes/certificates_test.py +++ b/tests/unit/confidant/routes/certificates_test.py @@ -2,7 +2,12 @@ from pytest_mock.plugin import MockerFixture from confidant.app import create_app -from confidant.services import certificatemanager +from confidant.services.certificate_authority.certificateauthoritybase import ( + CertificateNotReadyError, +) +from confidant.services.certificate_authority.acmpca import ( + ACMPrivateCertificateAuthority, +) def test_get_certificate(mocker: MockerFixture): @@ -50,7 +55,7 @@ def test_get_certificate(mocker: MockerFixture): return_value=True, ) mocker.patch('confidant.authnz.get_logged_in_user', return_value='test') - ca_object = certificatemanager.CertificateAuthority('development') + ca_object = ACMPrivateCertificateAuthority('development') mocker.patch( ('confidant.routes.certificates.certificatemanager.get_ca'), return_value=ca_object, @@ -61,8 +66,8 @@ def test_get_certificate(mocker: MockerFixture): 'key': 'test_key', } mocker.patch( - 'confidant.services.certificatemanager.CertificateAuthority.issue_certificate_with_key', # noqa: E501 - return_value=issue_certificate_with_key_return_value + 'confidant.services.certificate_authority.acmpca.ACMPrivateCertificateAuthority.issue_certificate_with_key', # noqa: E501 + return_value=issue_certificate_with_key_return_value, ) ret = app.test_client().get( '/v1/certificates/development/test.example.com', @@ -76,8 +81,8 @@ def test_get_certificate(mocker: MockerFixture): 'key': 'test_key', } mocker.patch( - 'confidant.services.certificatemanager.CertificateAuthority.issue_certificate_with_key', # noqa: E501 - side_effect=certificatemanager.CertificateNotReadyError(), + 'confidant.services.certificate_authority.acmpca.ACMPrivateCertificateAuthority.issue_certificate_with_key', # noqa: E501 + side_effect=CertificateNotReadyError(), ) ret = app.test_client().get( '/v1/certificates/development/test.example.com', @@ -88,7 +93,7 @@ def test_get_certificate(mocker: MockerFixture): def test_get_certificate_from_csr(mocker: MockerFixture): - ca_object = certificatemanager.CertificateAuthority('development') + ca_object = ACMPrivateCertificateAuthority('development') key = ca_object.generate_key() csr = ca_object.generate_csr(key, 'test.example.com') pem_csr = ca_object.encode_csr(csr) @@ -112,6 +117,10 @@ def test_get_certificate_from_csr(mocker: MockerFixture): ) assert ret.status_code == 400 + mocker.patch( + ('confidant.routes.certificates.certificatemanager.get_ca'), + return_value=ca_object, + ) ret = app.test_client().post( '/v1/certificates/development', data=json.dumps({'csr': 'invalid_csr'}), @@ -134,10 +143,12 @@ def test_get_certificate_from_csr(mocker: MockerFixture): ) ret = app.test_client().post( '/v1/certificates/development', - data=json.dumps({ - 'csr': pem_csr, - 'validity': 7, - }), + data=json.dumps( + { + 'csr': pem_csr, + 'validity': 7, + } + ), content_type='application/json', follow_redirects=False, ) @@ -157,10 +168,12 @@ def test_get_certificate_from_csr(mocker: MockerFixture): ) ret = app.test_client().post( '/v1/certificates/development', - data=json.dumps({ - 'csr': pem_csr, - 'validity': 7, - }), + data=json.dumps( + { + 'csr': pem_csr, + 'validity': 7, + } + ), content_type='application/json', follow_redirects=False, ) @@ -175,11 +188,7 @@ def test_get_certificate_from_csr(mocker: MockerFixture): return_value=ca_object, ) mocker.patch( - 'confidant.services.certificatemanager.CertificateAuthority.issue_certificate', # noqa: E501 - return_value='test-certificate-arn', - ) - mocker.patch( - 'confidant.services.certificatemanager.CertificateAuthority.get_certificate_from_arn', # noqa: E501 + 'confidant.services.certificate_authority.acmpca.ACMPrivateCertificateAuthority.issue_certificate', # noqa: E501 return_value={ 'certificate': 'test_certificate', 'certificate_chain': 'test_certificate_chain', @@ -187,10 +196,12 @@ def test_get_certificate_from_csr(mocker: MockerFixture): ) ret = app.test_client().post( '/v1/certificates/development', - data=json.dumps({ - 'csr': pem_csr, - 'validity': 7, - }), + data=json.dumps( + { + 'csr': pem_csr, + 'validity': 7, + } + ), content_type='application/json', follow_redirects=False, ) @@ -225,12 +236,14 @@ def test_list_cas(mocker: MockerFixture): return_value=True, ) mocker.patch('confidant.authnz.get_logged_in_user', return_value='test') - cas = [{ - 'ca': 'development', - 'certificate': 'test_certificate', - 'certificate_chain': 'test_certificate_chain', - 'tags': {'environment': 'development'}, - }] + cas = [ + { + 'ca': 'development', + 'certificate': 'test_certificate', + 'certificate_chain': 'test_certificate_chain', + 'tags': {'environment': 'development'}, + } + ] mocker.patch( ('confidant.routes.certificates.certificatemanager.list_cas'), return_value=cas, @@ -239,12 +252,14 @@ def test_list_cas(mocker: MockerFixture): json_data = json.loads(ret.data) assert ret.status_code == 200 assert json_data == { - 'cas': [{ - 'ca': 'development', - 'certificate': 'test_certificate', - 'certificate_chain': 'test_certificate_chain', - 'tags': {'environment': 'development'}, - }], + 'cas': [ + { + 'ca': 'development', + 'certificate': 'test_certificate', + 'certificate_chain': 'test_certificate_chain', + 'tags': {'environment': 'development'}, + } + ], } @@ -271,13 +286,13 @@ def test_get_ca(mocker: MockerFixture): return_value=True, ) mocker.patch('confidant.authnz.get_logged_in_user', return_value='test') - ca_object = certificatemanager.CertificateAuthority('development') + ca_object = ACMPrivateCertificateAuthority('development') mocker.patch( ('confidant.routes.certificates.certificatemanager.get_ca'), return_value=ca_object, ) mocker.patch( - 'confidant.services.certificatemanager.CertificateAuthority.get_certificate_authority_certificate', # noqa: E501 + 'confidant.services.certificate_authority.acmpca.ACMPrivateCertificateAuthority.get_certificate_authority_certificate', # noqa: E501 return_value={ 'ca': 'development', 'certificate': 'test_certificate', diff --git a/tests/unit/confidant/services/certificate_authority/acmpca_test.py b/tests/unit/confidant/services/certificate_authority/acmpca_test.py new file mode 100644 index 00000000..5d9b79bf --- /dev/null +++ b/tests/unit/confidant/services/certificate_authority/acmpca_test.py @@ -0,0 +1,311 @@ +import datetime + +import pytest +from pytest_mock.plugin import MockerFixture +from cryptography.hazmat.primitives import hashes + +from confidant.services.certificate_authority.acmpca import ( + ACMPrivateCertificateAuthority, + CertificateCache, +) +from confidant.services.certificate_authority.certificateauthoritybase import ( + CertificateNotReadyError, +) + + +@pytest.fixture() +def ca_object(mocker: MockerFixture) -> ACMPrivateCertificateAuthority: + ca_object = ACMPrivateCertificateAuthority("development") + ca_object.settings["csr_country_name"] = "US" + ca_object.settings["csr_state_or_province_name"] = "California" + ca_object.settings["csr_locality_name"] = "San Francisco" + ca_object.settings["csr_organization_name"] = "Example Inc." + mocker.patch( + "confidant.services.certificatemanager.get_ca", return_value=ca_object + ) + return ca_object + + +def test_certificate_cache(): + cache = CertificateCache(5) + cache_id = cache.get_cache_id("test.example.com", 7, []) + cache.lock(cache_id) + item = cache.get(cache_id) + assert item.lock is True + cache.set_response(cache_id, {"test": "me"}) + assert item.response + cache.release(cache_id) + assert item.lock is False + # Cache size is 5, make sure if we stuff the cache, that the intial + # item is gone. + cache.lock(cache.get_cache_id("test1.example.com", 7, [])) + cache.lock(cache.get_cache_id("test2.example.com", 7, [])) + cache.lock(cache.get_cache_id("test3.example.com", 7, [])) + cache.lock(cache.get_cache_id("test4.example.com", 7, [])) + cache.lock(cache.get_cache_id("test5.example.com", 7, [])) + assert cache.get(cache_id) is None + + +def test_generate_key(ca_object: ACMPrivateCertificateAuthority): + print(type(ca_object)) + ca_object.settings["key_size"] = 1024 + key = ca_object.generate_key() + assert key.key_size == 1024 + + +def test_encode_key(ca_object: ACMPrivateCertificateAuthority): + key = ca_object.generate_key() + encoded_key = ca_object.encode_key(key) + assert encoded_key.startswith("-----BEGIN RSA PRIVATE KEY-----") + + +def test_generate_x509_name(ca_object: ACMPrivateCertificateAuthority): + x509_name = ca_object.generate_x509_name("test.example.com") + assert x509_name.rfc4514_string() == ( + "O=Example Inc.,L=San Francisco,ST=California,C=US,CN=test.example.com" + ) + + +def test_generate_csr(ca_object: ACMPrivateCertificateAuthority): + key = ca_object.generate_key() + san = ["test2.example.com", "test3.example.com"] + csr = ca_object.generate_csr(key, "test.example.com", san) + assert csr.is_signature_valid is True + assert csr.subject.rfc4514_string() == ( + "O=Example Inc.,L=San Francisco,ST=California,C=US,CN=test.example.com" + ) + + +def test_encode_csr(ca_object: ACMPrivateCertificateAuthority): + key = ca_object.generate_key() + csr = ca_object.generate_csr(key, "test.example.com") + encoded_csr = ca_object.encode_csr(csr) + assert encoded_csr.startswith("-----BEGIN CERTIFICATE REQUEST-----") + + +def test_decode_csr(ca_object: ACMPrivateCertificateAuthority): + encoded_csr = "-----BEGIN CERTIFICATE REQUEST-----\nMIICwDCCAagCAQAwajEZMBcGA1UEAwwQdGVzdC5leGFtcGxlLmNvbTELMAkGA1UE\nBhMCVVMxEzARBgNVBAgMCkNhbGlmb3JuaWExFjAUBgNVBAcMDVNhbiBGcmFuY2lz\nY28xEzARBgNVBAoMCkx5ZnQsIEluYy4wggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAw\nggEKAoIBAQDqAFNwMlG3DiPJzUgSfzInRlAYdZzycz/mRsw5Boucii4jBQpLfhp/\nwjkbClAuuwLIija5yv95zChxbJPJ6Je1FtcXtbXAEVjWnf+B1s/OEA+uSO8IoGiL\nsYRNFqXI2hyzcqMshnxc90+qfMB+/eAv17t0fkMjT028N5I/Rvqh0RQx9l+0AbvH\nPtNBzNSWj9s/Oy4mEaXary/S3VZPd+38hpXc3HQINczmSKQTG/pPKwQcg+dQMjQz\nvlPuntrgvy5S2mK5D0xOCfLUfNVT7qb89/Rd9siZw7VMzL/XDkNtVZEEuJAL16PN\n/1zPQO6jxqNes0PqeWz0brsrx6LqhxiPAgMBAAGgETAPBgkqhkiG9w0BCQ4xAjAA\nMA0GCSqGSIb3DQEBCwUAA4IBAQBZaU01DoLf4Ldum/gOrjc+R1lqgXna6Thu/DHs\nAKbPyztjQjRwGApoPUXqRs6MYpB8XJOal4rsYazybxRNsiIQV/yNtlToVsz86lys\nPP85zzk7nZTT28gMew/iuS7H0in4XJz3LWdDxVIk+P4ktiqTOQSQyqMBGM+Rw93Y\nBDCfk1/pigxis0umyfp6Ho/qfdKEr4MYi2UZfTIl8F8dLq+PKPzqK+sEEOBDOUtP\nc4Edeg3PL1XROiwv3uPhtfaIe1iVD4IjWxNN06anoa29xMmJ/vXkaqYLQSd+FKHe\ny00DRxiYx7zqqfByUAUV3pPRwytFMit5bsOEAlhYmTRc2PEx\n-----END CERTIFICATE REQUEST-----\n" # noqa:E501 + csr = ca_object.decode_csr(encoded_csr) + assert csr.is_signature_valid is True + + +def test_get_csr_common_name(ca_object: ACMPrivateCertificateAuthority): + key = ca_object.generate_key() + csr = ca_object.generate_csr(key, "test.example.com") + assert ca_object.get_csr_common_name(csr) == "test.example.com" + + +def test_get_csr_san(ca_object: ACMPrivateCertificateAuthority): + key = ca_object.generate_key() + san = ["test1.example.com", "test2.example.com"] + csr = ca_object.generate_csr(key, "test.example.com", san) + assert ca_object.get_csr_san(csr) == san + + +def test_encode_san_dns_names(ca_object: ACMPrivateCertificateAuthority): + san = ["test1.example.com", "test2.example.com"] + dns_names = ca_object.encode_san_dns_names(san) + assert len(dns_names) == len(san) + for dns_name in dns_names: + assert dns_name.value in san + + +def test_generate_self_signed_certificate( + ca_object: ACMPrivateCertificateAuthority, +): + key = ca_object.generate_key() + san = ["test1.example.com", "test2.example.com"] + cert = ca_object.generate_self_signed_certificate( + key, + "test.example.com", + 7, + san, + ) + delta = datetime.timedelta(days=7) + assert cert.not_valid_after - cert.not_valid_before == delta + assert cert.subject.rfc4514_string() == ( + "O=Example Inc.,L=San Francisco,ST=California,C=US,CN=test.example.com" + ) + assert isinstance(cert.signature_hash_algorithm, hashes.SHA256) + + +def test_encode_certificate(ca_object: ACMPrivateCertificateAuthority): + key = ca_object.generate_key() + cert = ca_object.generate_self_signed_certificate( + key, + "test.example.com", + 7, + ) + encoded_cert = ca_object.encode_certificate(cert) + assert encoded_cert.startswith("-----BEGIN CERTIFICATE-----") + + +def test_issue_certificate( + mocker: MockerFixture, ca_object: ACMPrivateCertificateAuthority +): + client_mock = mocker.patch( + "confidant.clients.get_boto_client", + autospec=True, + ) + issue_certificate_mock = mocker.MagicMock() + issue_certificate_mock.issue_certificate.return_value = { + "CertificateArn": "test", + } + client_mock.return_value = issue_certificate_mock + key = ca_object.generate_key() + csr = ca_object.generate_csr(key, "test.example.com", []) + encoded_csr = ca_object.encode_csr(csr) + mocker.patch( + "confidant.services.certificate_authority.acmpca.ACMPrivateCertificateAuthority._get_certificate_from_arn", # noqa:E501 + return_value={ + "certificate": "test_certificate", + "certificate_chain": "test_certificate_chain", + }, + ) + response = ca_object.issue_certificate(encoded_csr, 7) + assert response == { + "certificate": "test_certificate", + "certificate_chain": "test_certificate_chain", + } + + +def test__get_cached_certificate_with_key( + mocker: MockerFixture, ca_object: ACMPrivateCertificateAuthority +): + ca_object.settings["certificate_use_cache"] = False + assert ca_object._get_cached_certificate_with_key("test") == {} + ca_object.settings["certificate_use_cache"] = True + cache = CertificateCache(10) + ca_object.cache = cache + assert ca_object._get_cached_certificate_with_key("test") == {} + cache.lock("test") + cache.set_response("test", {"hello": "world"}) + assert ca_object._get_cached_certificate_with_key("test") == { + "hello": "world" + } # noqa:E501 + # test lock loop + cache.lock("test1") + item = mocker.MagicMock() + type(item).lock = mocker.PropertyMock(side_effect=[True, False]) + type(item).response = mocker.PropertyMock( + side_effect=[None, {"hello": "world"}] + ) + mocker.patch( + "confidant.services.certificate_authority.acmpca.CertificateCache.get", + return_value=item, + ) + with pytest.raises(CertificateNotReadyError): + ca_object._get_cached_certificate_with_key("test1") + + +def test_issue_certificate_with_key( + mocker: MockerFixture, ca_object: ACMPrivateCertificateAuthority +): + ca_object.settings["self_sign"] = True + data = ca_object.issue_certificate_with_key("test.example.com", 7) + assert data["certificate"].startswith("-----BEGIN CERTIFICATE-----") + assert data["certificate_chain"].startswith("-----BEGIN CERTIFICATE-----") + assert data["key"].startswith("-----BEGIN RSA PRIVATE KEY-----") + + mocker.patch( + "confidant.services.certificate_authority.acmpca.ACMPrivateCertificateAuthority._get_cached_certificate_with_key", # noqa:E501 + return_value={"hello": "world"}, + ) + data = ca_object.issue_certificate_with_key("test.example.com", 7) + assert data == {"hello": "world"} + mocker.patch( + "confidant.services.certificate_authority.acmpca.ACMPrivateCertificateAuthority._get_cached_certificate_with_key", # noqa:E501 + return_value={}, + ) + + ca_object.settings["self_sign"] = False + mocker.patch( + "confidant.services.certificate_authority.acmpca.ACMPrivateCertificateAuthority.issue_certificate", # noqa:E501 + return_value="test-certificate-arn", + ) + mocker.patch( + "confidant.services.certificate_authority.acmpca.ACMPrivateCertificateAuthority._get_certificate_from_arn", # noqa:E501 + return_value={ + "certificate": "test_certificate", + "certificate_chain": "test_certificate_chain", + }, + ) + data = ca_object.issue_certificate_with_key("test.example.com", 7) + assert data["certificate"] == "test_certificate" + assert data["certificate_chain"] == "test_certificate_chain" + assert data["key"].startswith("-----BEGIN RSA PRIVATE KEY-----") + + +def test_get_certificate_from_arn_no_exception( + mocker: MockerFixture, ca_object: ACMPrivateCertificateAuthority +): + time_mock = mocker.patch("time.sleep") + client_mock = mocker.patch( + "confidant.clients.get_boto_client", + autospec=True, + ) + get_certificate_mock = mocker.MagicMock() + get_certificate_mock.get_certificate.return_value = { + "Certificate": "test", + "CertificateChain": "test_chain", + } + client_mock.return_value = get_certificate_mock + data = ca_object._get_certificate_from_arn("test_arn") + assert time_mock.called is False + assert data == {"certificate": "test", "certificate_chain": "test_chain"} + + +def test_get_certificate_from_arn_with_exception( + mocker: MockerFixture, ca_object: ACMPrivateCertificateAuthority +): + class RequestInProgressException(Exception): + pass + + time_mock = mocker.patch("time.sleep") + client_mock = mocker.patch( + "confidant.clients.get_boto_client", + autospec=True, + ) + get_certificate_mock = mocker.MagicMock() + get_certificate_mock.exceptions.RequestInProgressException = ( + RequestInProgressException # noqa:E501 + ) + get_certificate_mock.get_certificate.side_effect = [ + RequestInProgressException(), + {"Certificate": "test", "CertificateChain": "test_chain"}, + ] + client_mock.return_value = get_certificate_mock + data = ca_object._get_certificate_from_arn("test_arn") + assert time_mock.called is True + assert data == {"certificate": "test", "certificate_chain": "test_chain"} + + +def test_get_certificate_authority_certificate( + mocker: MockerFixture, ca_object: ACMPrivateCertificateAuthority +): + client_mock = mocker.patch( + "confidant.clients.get_boto_client", + autospec=True, + ) + gcac_mock = mocker.MagicMock() + gcac_mock.get_certificate_authority_certificate.return_value = { + "Certificate": "test-certificate", + "CertificateChain": "test-certificate-chain", + } + gcac_mock.list_tags.return_value = { + "Tags": [ + {"Key": "environment", "Value": "development"}, + {"Key": "extra", "Value": "extra-value"}, + ], + } + client_mock.return_value = gcac_mock + data = ca_object.get_certificate_authority_certificate() + assert data == { + "ca": "development", + "certificate": "test-certificate", + "certificate_chain": "test-certificate-chain", + "tags": { + "environment": "development", + "extra": "extra-value", + }, + } diff --git a/tests/unit/confidant/services/certificate_authority/customca_test.py b/tests/unit/confidant/services/certificate_authority/customca_test.py new file mode 100644 index 00000000..84315ed1 --- /dev/null +++ b/tests/unit/confidant/services/certificate_authority/customca_test.py @@ -0,0 +1,249 @@ +import pytest +from unittest.mock import patch + +from confidant.services.certificate_authority.certificateauthoritybase import ( + CertificateAuthorityNotFoundError, +) +from confidant.services.certificate_authority.customca import ( + CustomCertificateAuthority, +) +from tests.conftest import TEST_CERTIFICATE +from cryptography.hazmat.primitives import serialization +from cryptography import x509 +from datetime import datetime, timedelta, timezone +from cryptography.hazmat.primitives.asymmetric import rsa +from cryptography.hazmat.primitives.hashes import SHA256 + + +@pytest.fixture +@patch.object( + CustomCertificateAuthority, "__init__", lambda self, ca_env: None +) +def ca_authority(): + # Mock the CA private key and certificate + private_key = rsa.generate_private_key( + public_exponent=65537, key_size=2048 + ) + certificate = ( + x509.CertificateBuilder() + .subject_name( + x509.Name( + [x509.NameAttribute(x509.oid.NameOID.COMMON_NAME, "Test CA")] + ) + ) + .issuer_name( + x509.Name( + [x509.NameAttribute(x509.oid.NameOID.COMMON_NAME, "Test CA")] + ) + ) + .serial_number(x509.random_serial_number()) + .not_valid_before(datetime.now(timezone.utc)) + .not_valid_after(datetime.now(timezone.utc) + timedelta(days=365)) + .public_key(private_key.public_key()) + .sign(private_key, SHA256()) + ) + + ca = CustomCertificateAuthority("test") + ca.ca_private_key = private_key + ca.ca_certificate = certificate + ca.ca_chain = [] # Mock CA chain for simplicity + ca.settings = {"max_validity_days": 30} + + return ca + + +def mock_constants(): + return { + "CUSTOM_CA_ACTIVE_KEYS": {"test": "test1"}, + "CUSTOM_CERTIFICATE_AUTHORITIES": { + "test": [ + { + "rootcrt": TEST_CERTIFICATE.decode("utf-8"), + "crt": "CERTIFICATE", + "key": "KEY", + "passphrase": "DOG-AND-FRIENDS", + "kid": "test1", + }, + ], + "invalid": [], + }, + "CUSTOM_CERTIFICATE_AUTHORITIES_INVALID_SCHEMA": { + "test": [ + { + "invalid_field": "invalid", + "crt": "CERTIFICATE", + "passphrase": "DOG-AND-FRIENDS", + }, + ], + }, + "CUSTOM_CA_SCHEMA": {}, + } + + +@patch( + "confidant.services.certificate_authority.customca.CUSTOM_CA_ACTIVE_KEYS", + mock_constants()["CUSTOM_CA_ACTIVE_KEYS"], +) +@patch( + "confidant.services.certificate_authority.customca.CUSTOM_CERTIFICATE_AUTHORITIES", # noqa: E501 + mock_constants()["CUSTOM_CERTIFICATE_AUTHORITIES"], +) +@patch.object( + CustomCertificateAuthority, "__init__", lambda self, ca_env: None +) +def test_get_ca_in_json_non_existent(): + ca_object = CustomCertificateAuthority("test") + with pytest.raises(CertificateAuthorityNotFoundError): + ca_object._get_ca_in_json("nonexistent") + + +@patch( + "confidant.services.certificate_authority.customca.CUSTOM_CA_ACTIVE_KEYS", + {}, + # intentionally left blank so that active key + # does not exist for 'test' environment +) +@patch( + "confidant.services.certificate_authority.customca.CUSTOM_CERTIFICATE_AUTHORITIES", # noqa: E501 + mock_constants()["CUSTOM_CERTIFICATE_AUTHORITIES"], +) +@patch.object( + CustomCertificateAuthority, "__init__", lambda self, ca_env: None +) +def test_get_ca_in_json_non_existent_active_key(): + ca_object = CustomCertificateAuthority("test") + with pytest.raises(CertificateAuthorityNotFoundError): + ca_object._get_ca_in_json("test") + + +@patch( + "confidant.services.certificate_authority.customca.CUSTOM_CA_ACTIVE_KEYS", + mock_constants()["CUSTOM_CA_ACTIVE_KEYS"], +) +@patch( + "confidant.services.certificate_authority.customca.CUSTOM_CERTIFICATE_AUTHORITIES", # noqa: E501 + mock_constants()["CUSTOM_CERTIFICATE_AUTHORITIES_INVALID_SCHEMA"], +) +@patch.object( + CustomCertificateAuthority, "__init__", lambda self, ca_env: None +) +def test_get_ca_in_json_wrong_schema(): + ca_object = CustomCertificateAuthority("test") + with pytest.raises(CertificateAuthorityNotFoundError): + ca_object._get_ca_in_json("test") + + +@patch( + "confidant.services.certificate_authority.customca.CUSTOM_CA_ACTIVE_KEYS", + mock_constants()["CUSTOM_CA_ACTIVE_KEYS"], +) +@patch( + "confidant.services.certificate_authority.customca.CUSTOM_CERTIFICATE_AUTHORITIES", # noqa: E501 + mock_constants()["CUSTOM_CERTIFICATE_AUTHORITIES"], +) +@patch.object( + CustomCertificateAuthority, "__init__", lambda self, ca_env: None +) +def test_get_ca_in_json_success(): + ca_object = CustomCertificateAuthority("test") + response = ca_object._get_ca_in_json("test") + assert ( + response + == mock_constants()["CUSTOM_CERTIFICATE_AUTHORITIES"]["test"][0] + ) + + +@patch.object( + CustomCertificateAuthority, "__init__", lambda self, ca_env: None +) +def test_load_rootca_certificate_success(): + ca_object = CustomCertificateAuthority("test") + response = ca_object._load_rootca_certificate( + mock_constants()["CUSTOM_CERTIFICATE_AUTHORITIES"]["test"][0] + ) + assert response is not None + + +@patch( + "confidant.services.certificate_authority.customca.CUSTOM_CA_ACTIVE_KEYS", + mock_constants()["CUSTOM_CA_ACTIVE_KEYS"], +) +@patch( + "confidant.services.certificate_authority.customca.CUSTOM_CERTIFICATE_AUTHORITIES", # noqa: E501 + {}, +) +@patch.object( + CustomCertificateAuthority, "__init__", lambda self, ca_env: None +) +def test_load_rootca_certificate_no_root_ca_provided(): + ca_object = CustomCertificateAuthority("test") + ca_object.ca_env = "test" + response = ca_object._load_rootca_certificate({}) + assert response is None + + +def test_issue_certificate_invalid_csr(ca_authority): + # Generate an invalid private key and CSR + private_key = rsa.generate_private_key( + public_exponent=65537, key_size=2048 + ) + subject = x509.Name( + [x509.NameAttribute(x509.oid.NameOID.COMMON_NAME, "example.com")] + ) + + # purposely not adding a subject alternative name + csr = ( + x509.CertificateSigningRequestBuilder() + .subject_name(subject) + .sign(private_key, SHA256()) + ) + csr_pem = csr.public_bytes(serialization.Encoding.PEM).decode("utf-8") + + # Issue the certificate + with pytest.raises(ValueError): + ca_authority.issue_certificate(csr_pem, validity=30) + + +def test_issue_certificate_valid_csr(ca_authority): + # Generate a valid private key and CSR + private_key = rsa.generate_private_key( + public_exponent=65537, key_size=2048 + ) + subject = x509.Name( + [x509.NameAttribute(x509.oid.NameOID.COMMON_NAME, "example.com")] + ) + + csr = ( + x509.CertificateSigningRequestBuilder() + .subject_name(subject) + .add_extension( + x509.SubjectAlternativeName( + [x509.DNSName("test1.com"), x509.DNSName("test2.com")] + ), + False, + ) + .sign(private_key, SHA256()) + ) + csr_pem = csr.public_bytes(serialization.Encoding.PEM).decode("utf-8") + print(csr_pem) + + # Issue the certificate + response = ca_authority.issue_certificate(csr_pem, validity=30) + + # Assert the response contains the expected fields + assert "certificate" in response + assert "certificate_chain" in response + + # Load the certificate to validate its structure + issued_cert = x509.load_pem_x509_certificate( + response["certificate"].encode("utf-8") + ) + + # Assert the issued certificate is valid + assert issued_cert.subject == subject + assert issued_cert.issuer == ca_authority.ca_certificate.subject + + # validate SAN + assert issued_cert.extensions.get_extension_for_class( + x509.SubjectAlternativeName + ).value.get_values_for_type(x509.DNSName) == ["test1.com", "test2.com"] diff --git a/tests/unit/confidant/services/certificatemanager_test.py b/tests/unit/confidant/services/certificatemanager_test.py deleted file mode 100644 index 41b23bad..00000000 --- a/tests/unit/confidant/services/certificatemanager_test.py +++ /dev/null @@ -1,300 +0,0 @@ -import datetime - -import pytest -from pytest_mock.plugin import MockerFixture -from cryptography.hazmat.primitives import hashes - -from confidant.services import certificatemanager - - -@pytest.fixture() -def ca_object(mocker: MockerFixture) -> certificatemanager.CertificateAuthority: - ca_object = certificatemanager.CertificateAuthority('development') - ca_object.settings['csr_country_name'] = 'US' - ca_object.settings['csr_state_or_province_name'] = 'California' - ca_object.settings['csr_locality_name'] = 'San Francisco' - ca_object.settings['csr_organization_name'] = 'Example Inc.' - mocker.patch( - 'confidant.services.certificatemanager.get_ca', return_value=ca_object - ) - return ca_object - - -def test_certificate_cache(): - cache = certificatemanager.CertificateCache(5) - cache_id = cache.get_cache_id('test.example.com', 7, []) - cache.lock(cache_id) - item = cache.get(cache_id) - assert item.lock is True - cache.set_response(cache_id, {'test': 'me'}) - assert item.response - cache.release(cache_id) - assert item.lock is False - # Cache size is 5, make sure if we stuff the cache, that the intial - # item is gone. - cache.lock(cache.get_cache_id('test1.example.com', 7, [])) - cache.lock(cache.get_cache_id('test2.example.com', 7, [])) - cache.lock(cache.get_cache_id('test3.example.com', 7, [])) - cache.lock(cache.get_cache_id('test4.example.com', 7, [])) - cache.lock(cache.get_cache_id('test5.example.com', 7, [])) - assert cache.get(cache_id) is None - - -def test_generate_key(ca_object: certificatemanager.CertificateAuthority): - print(type(ca_object)) - ca_object.settings['key_size'] = 1024 - key = ca_object.generate_key() - assert key.key_size == 1024 - - -def test_encode_key(ca_object: certificatemanager.CertificateAuthority): - key = ca_object.generate_key() - encoded_key = ca_object.encode_key(key) - assert encoded_key.startswith('-----BEGIN RSA PRIVATE KEY-----') - - -def test_generate_x509_name(ca_object: certificatemanager.CertificateAuthority): - x509_name = ca_object.generate_x509_name('test.example.com') - assert x509_name.rfc4514_string() == ( - 'O=Example Inc.,L=San Francisco,ST=California,C=US,CN=test.example.com' - ) - - -def test_generate_csr(ca_object: certificatemanager.CertificateAuthority): - key = ca_object.generate_key() - san = ['test2.example.com', 'test3.example.com'] - csr = ca_object.generate_csr(key, 'test.example.com', san) - assert csr.is_signature_valid is True - assert csr.subject.rfc4514_string() == ( - 'O=Example Inc.,L=San Francisco,ST=California,C=US,CN=test.example.com' - ) - - -def test_encode_csr(ca_object: certificatemanager.CertificateAuthority): - key = ca_object.generate_key() - csr = ca_object.generate_csr(key, 'test.example.com') - encoded_csr = ca_object.encode_csr(csr) - assert encoded_csr.startswith('-----BEGIN CERTIFICATE REQUEST-----') - - -def test_decode_csr(ca_object: certificatemanager.CertificateAuthority): - encoded_csr = '-----BEGIN CERTIFICATE REQUEST-----\nMIICwDCCAagCAQAwajEZMBcGA1UEAwwQdGVzdC5leGFtcGxlLmNvbTELMAkGA1UE\nBhMCVVMxEzARBgNVBAgMCkNhbGlmb3JuaWExFjAUBgNVBAcMDVNhbiBGcmFuY2lz\nY28xEzARBgNVBAoMCkx5ZnQsIEluYy4wggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAw\nggEKAoIBAQDqAFNwMlG3DiPJzUgSfzInRlAYdZzycz/mRsw5Boucii4jBQpLfhp/\nwjkbClAuuwLIija5yv95zChxbJPJ6Je1FtcXtbXAEVjWnf+B1s/OEA+uSO8IoGiL\nsYRNFqXI2hyzcqMshnxc90+qfMB+/eAv17t0fkMjT028N5I/Rvqh0RQx9l+0AbvH\nPtNBzNSWj9s/Oy4mEaXary/S3VZPd+38hpXc3HQINczmSKQTG/pPKwQcg+dQMjQz\nvlPuntrgvy5S2mK5D0xOCfLUfNVT7qb89/Rd9siZw7VMzL/XDkNtVZEEuJAL16PN\n/1zPQO6jxqNes0PqeWz0brsrx6LqhxiPAgMBAAGgETAPBgkqhkiG9w0BCQ4xAjAA\nMA0GCSqGSIb3DQEBCwUAA4IBAQBZaU01DoLf4Ldum/gOrjc+R1lqgXna6Thu/DHs\nAKbPyztjQjRwGApoPUXqRs6MYpB8XJOal4rsYazybxRNsiIQV/yNtlToVsz86lys\nPP85zzk7nZTT28gMew/iuS7H0in4XJz3LWdDxVIk+P4ktiqTOQSQyqMBGM+Rw93Y\nBDCfk1/pigxis0umyfp6Ho/qfdKEr4MYi2UZfTIl8F8dLq+PKPzqK+sEEOBDOUtP\nc4Edeg3PL1XROiwv3uPhtfaIe1iVD4IjWxNN06anoa29xMmJ/vXkaqYLQSd+FKHe\ny00DRxiYx7zqqfByUAUV3pPRwytFMit5bsOEAlhYmTRc2PEx\n-----END CERTIFICATE REQUEST-----\n' # noqa:E501 - csr = ca_object.decode_csr(encoded_csr) - assert csr.is_signature_valid is True - - -def test_get_csr_common_name( - ca_object: certificatemanager.CertificateAuthority -): - key = ca_object.generate_key() - csr = ca_object.generate_csr(key, 'test.example.com') - assert ca_object.get_csr_common_name(csr) == 'test.example.com' - - -def test_get_csr_san(ca_object: certificatemanager.CertificateAuthority): - key = ca_object.generate_key() - san = ['test1.example.com', 'test2.example.com'] - csr = ca_object.generate_csr(key, 'test.example.com', san) - assert ca_object.get_csr_san(csr) == san - - -def test_encode_san_dns_names( - ca_object: certificatemanager.CertificateAuthority -): - san = ['test1.example.com', 'test2.example.com'] - dns_names = ca_object.encode_san_dns_names(san) - assert len(dns_names) == len(san) - for dns_name in dns_names: - assert dns_name.value in san - - -def test_generate_self_signed_certificate( - ca_object: certificatemanager.CertificateAuthority -): - key = ca_object.generate_key() - san = ['test1.example.com', 'test2.example.com'] - cert = ca_object.generate_self_signed_certificate( - key, - 'test.example.com', - 7, - san, - ) - delta = datetime.timedelta(days=7) - assert cert.not_valid_after - cert.not_valid_before == delta - assert cert.subject.rfc4514_string() == ( - 'O=Example Inc.,L=San Francisco,ST=California,C=US,CN=test.example.com' - ) - assert isinstance(cert.signature_hash_algorithm, hashes.SHA256) - - -def test_encode_certificate(ca_object: certificatemanager.CertificateAuthority): - key = ca_object.generate_key() - cert = ca_object.generate_self_signed_certificate( - key, - 'test.example.com', - 7, - ) - encoded_cert = ca_object.encode_certificate(cert) - assert encoded_cert.startswith('-----BEGIN CERTIFICATE-----') - - -def test_issue_certificate( - mocker: MockerFixture, - ca_object: certificatemanager.CertificateAuthority -): - client_mock = mocker.patch( - 'confidant.clients.get_boto_client', - autospec=True, - ) - issue_certificate_mock = mocker.MagicMock() - issue_certificate_mock.issue_certificate.return_value = { - 'CertificateArn': 'test', - } - client_mock.return_value = issue_certificate_mock - key = ca_object.generate_key() - csr = ca_object.generate_csr(key, 'test.example.com', []) - encoded_csr = ca_object.encode_csr(csr) - assert ca_object.issue_certificate(encoded_csr, 7) == 'test' - - -def test__get_cached_certificate_with_key( - mocker: MockerFixture, - ca_object: certificatemanager.CertificateAuthority -): - ca_object.settings['certificate_use_cache'] = False - assert ca_object._get_cached_certificate_with_key('test') == {} - ca_object.settings['certificate_use_cache'] = True - cache = certificatemanager.CertificateCache(10) - ca_object.cache = cache - assert ca_object._get_cached_certificate_with_key('test') == {} - cache.lock('test') - cache.set_response('test', {'hello': 'world'}) - assert ca_object._get_cached_certificate_with_key('test') == {'hello': 'world'} # noqa:E501 - # test lock loop - cache.lock('test1') - item = mocker.MagicMock() - type(item).lock = mocker.PropertyMock(side_effect=[True, False]) - type(item).response = mocker.PropertyMock( - side_effect=[None, {'hello': 'world'}] - ) - mocker.patch( - 'confidant.services.certificatemanager.CertificateCache.get', - return_value=item - ) - with pytest.raises(certificatemanager.CertificateNotReadyError): - ca_object._get_cached_certificate_with_key('test1') - - -def test_issue_certificate_with_key( - mocker: MockerFixture, - ca_object: certificatemanager.CertificateAuthority -): - ca_object.settings['self_sign'] = True - data = ca_object.issue_certificate_with_key('test.example.com', 7) - assert data['certificate'].startswith('-----BEGIN CERTIFICATE-----') - assert data['certificate_chain'].startswith('-----BEGIN CERTIFICATE-----') - assert data['key'].startswith('-----BEGIN RSA PRIVATE KEY-----') - - mocker.patch( - 'confidant.services.certificatemanager.CertificateAuthority._get_cached_certificate_with_key', # noqa:E501 - return_value={'hello': 'world'}, - ) - data = ca_object.issue_certificate_with_key('test.example.com', 7) - assert data == {'hello': 'world'} - mocker.patch( - 'confidant.services.certificatemanager.CertificateAuthority._get_cached_certificate_with_key', # noqa:E501 - return_value={}, - ) - - ca_object.settings['self_sign'] = False - mocker.patch( - 'confidant.services.certificatemanager.CertificateAuthority.issue_certificate', # noqa:E501 - return_value='test-certificate-arn', - ) - mocker.patch( - 'confidant.services.certificatemanager.CertificateAuthority.get_certificate_from_arn', # noqa:E501 - return_value={ - 'certificate': 'test_certificate', - 'certificate_chain': 'test_certificate_chain', - }, - ) - data = ca_object.issue_certificate_with_key('test.example.com', 7) - assert data['certificate'] == 'test_certificate' - assert data['certificate_chain'] == 'test_certificate_chain' - assert data['key'].startswith('-----BEGIN RSA PRIVATE KEY-----') - - -def test_get_certificate_from_arn_no_exception( - mocker: MockerFixture, - ca_object: certificatemanager.CertificateAuthority -): - time_mock = mocker.patch('time.sleep') - client_mock = mocker.patch( - 'confidant.clients.get_boto_client', - autospec=True, - ) - get_certificate_mock = mocker.MagicMock() - get_certificate_mock.get_certificate.return_value = { - 'Certificate': 'test', - 'CertificateChain': 'test_chain', - } - client_mock.return_value = get_certificate_mock - data = ca_object.get_certificate_from_arn('test_arn') - assert time_mock.called is False - assert data == {'certificate': 'test', 'certificate_chain': 'test_chain'} - - -def test_get_certificate_from_arn_with_exception( - mocker: MockerFixture, - ca_object: certificatemanager.CertificateAuthority -): - class RequestInProgressException(Exception): - pass - - time_mock = mocker.patch('time.sleep') - client_mock = mocker.patch( - 'confidant.clients.get_boto_client', - autospec=True, - ) - get_certificate_mock = mocker.MagicMock() - get_certificate_mock.exceptions.RequestInProgressException = RequestInProgressException # noqa:E501 - get_certificate_mock.get_certificate.side_effect = [ - RequestInProgressException(), - {'Certificate': 'test', 'CertificateChain': 'test_chain'}, - ] - client_mock.return_value = get_certificate_mock - data = ca_object.get_certificate_from_arn('test_arn') - assert time_mock.called is True - assert data == {'certificate': 'test', 'certificate_chain': 'test_chain'} - - -def test_get_certificate_authority_certificate( - mocker: MockerFixture, - ca_object: certificatemanager.CertificateAuthority -): - client_mock = mocker.patch( - 'confidant.clients.get_boto_client', - autospec=True, - ) - gcac_mock = mocker.MagicMock() - gcac_mock.get_certificate_authority_certificate.return_value = { - 'Certificate': 'test-certificate', - 'CertificateChain': 'test-certificate-chain', - } - gcac_mock.list_tags.return_value = { - 'Tags': [ - {'Key': 'environment', 'Value': 'development'}, - {'Key': 'extra', 'Value': 'extra-value'}, - ], - } - client_mock.return_value = gcac_mock - data = ca_object.get_certificate_authority_certificate() - assert data == { - 'ca': 'development', - 'certificate': 'test-certificate', - 'certificate_chain': 'test-certificate-chain', - 'tags': { - 'environment': 'development', - 'extra': 'extra-value', - }, - }