Skip to content

Commit

Permalink
Support Custom CA for certificate issuing (#438)
Browse files Browse the repository at this point in the history
previously we have one giant class certificatemanager which has a
certificatauthority class which is basically ACM PCA.

Now we want to add support for custom ca, without impacting existing
contracts/apis on `/v1/certificates prefix`. So a new base class is
created so that both acm pca and custom ca can inherit from.

other changes are mostly tests and settings along side this.
  • Loading branch information
meng-han authored Oct 16, 2024
1 parent e682a69 commit 78f7057
Show file tree
Hide file tree
Showing 10 changed files with 1,482 additions and 814 deletions.
87 changes: 44 additions & 43 deletions confidant/routes/certificates.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@

from confidant import authnz, settings
from confidant.services import certificatemanager
from confidant.services.certificate_authority.certificateauthoritybase import (
CertificateAuthorityNotFoundError,
CertificateNotReadyError,
)
from confidant.schema.certificates import (
certificate_authority_response_schema,
certificate_authorities_response_schema,
Expand Down Expand Up @@ -69,7 +73,7 @@ def get_certificate(ca, cn):
'''
try:
ca_object = certificatemanager.get_ca(ca)
except certificatemanager.CertificateAuthorityNotFoundError:
except CertificateAuthorityNotFoundError:
return jsonify({'error': 'Provided CA not found.'}), 404
san = request.args.getlist('san')

Expand All @@ -83,8 +87,9 @@ def get_certificate(ca, cn):
'san': san,
},
):
msg = ('{} does not have access to get certificate cn {} against'
' ca {}').format(
msg = (
'{} does not have access to get certificate cn {} against' ' ca {}'
).format(
authnz.get_logged_in_user(),
cn,
ca,
Expand All @@ -93,11 +98,10 @@ def get_certificate(ca, cn):
return jsonify(error_msg), 403

logger.info(
'get_certificate called on id={} for ca={} by user={}'.format(
cn,
ca,
logged_in_user,
)
'get_certificate called on id=%s for ca=%s by user=%s',
cn,
ca,
logged_in_user,
)

validity = request.args.get(
Expand All @@ -111,7 +115,7 @@ def get_certificate(ca, cn):
validity,
san,
)
except certificatemanager.CertificateNotReadyError:
except CertificateNotReadyError:
# Ratelimit response for a locked certificate in the cache
error_msg = 'Certificate being requested, please wait and try again.'
response = jsonify(error_msg)
Expand Down Expand Up @@ -173,13 +177,17 @@ def get_certificate_from_csr(ca):
'''
try:
ca_object = certificatemanager.get_ca(ca)
except certificatemanager.CertificateAuthorityNotFoundError:
except CertificateAuthorityNotFoundError:
return jsonify({'error': 'Provided CA not found.'}), 404
data = request.get_json()

if not data or not data.get('csr'):
return jsonify(
{'error': 'csr must be provided in the POST body.'},
), 400
return (
jsonify(
{'error': 'csr must be provided in the POST body.'},
),
400,
)
validity = data.get(
'validity',
ca_object.settings['max_validity_days'],
Expand All @@ -188,9 +196,12 @@ def get_certificate_from_csr(ca):
csr = ca_object.decode_csr(data['csr'])
except Exception:
logger.exception('Failed to decode PEM csr')
return jsonify(
{'error': 'csr could not be decoded'},
), 400
return (
jsonify(
{'error': 'csr could not be decoded'},
),
400,
)
# Get the cn and san values from the csr object, so that we can use them
# for the ACL check.
cn = ca_object.get_csr_common_name(csr)
Expand All @@ -206,28 +217,24 @@ def get_certificate_from_csr(ca):
'san': san,
},
):
msg = ('{} does not have access to get certificate cn {} against'
' ca {}').format(
authnz.get_logged_in_user(),
cn,
ca,
msg = (
f'{authnz.get_logged_in_user()} does not have access to get'
'certificate cn {cn} against ca {ca}'
)
error_msg = {'error': msg, 'reference': cn}
return jsonify(error_msg), 403

logger.info(
'get_certificate called on id={} for ca={} by user={}'.format(
cn,
ca,
logged_in_user,
)
'get_certificate called on id=%s for ca=%s by user=%s',
cn,
ca,
logged_in_user,
)

arn = ca_object.issue_certificate(data['csr'], validity)
certificate = ca_object.get_certificate_from_arn(arn)
certificate_json = ca_object.issue_certificate(data['csr'], validity)
certificate_response = CertificateResponse(
certificate=certificate['certificate'],
certificate_chain=certificate['certificate_chain'],
certificate=certificate_json['certificate'],
certificate_chain=certificate_json['certificate_chain'],
)
return certificate_response_schema.dumps(certificate_response)

Expand Down Expand Up @@ -284,7 +291,7 @@ def list_cas():

cas = certificatemanager.list_cas()

logger.info('list_cas called by user={}'.format(logged_in_user))
logger.info('list_cas called by user=%s', logged_in_user)

cas_response = CertificateAuthoritiesResponse.from_cas(cas)
return certificate_authorities_response_schema.dumps(cas_response)
Expand Down Expand Up @@ -331,29 +338,23 @@ def get_ca(ca):
'''
try:
ca_object = certificatemanager.get_ca(ca)
except certificatemanager.CertificateAuthorityNotFoundError:
except CertificateAuthorityNotFoundError:
return jsonify({'error': 'Provided CA not found.'}), 404

logged_in_user = authnz.get_logged_in_user()

if not acl_module_check(
resource_type='ca',
action='get',
resource_id=ca,
):
msg = '{} does not have access to get ca {}'.format(
authnz.get_logged_in_user(),
ca,
)
msg = f'''
{authnz.get_logged_in_user()} does not have access to get ca {ca}
'''
error_msg = {'error': msg, 'reference': ca}
return jsonify(error_msg), 403

logger.info(
'get_ca called on id={} by user={}'.format(
ca,
logged_in_user,
)
)

logger.info('get_ca called on id=%s by user=%s', ca, logged_in_user)
_ca = ca_object.get_certificate_authority_certificate()
ca_response = CertificateAuthorityResponse(
ca=_ca['ca'],
Expand Down
Loading

0 comments on commit 78f7057

Please sign in to comment.