Skip to content

Commit

Permalink
Add PemBundleTrustStore and update pki customizations
Browse files Browse the repository at this point in the history
Update customizations for generating ca-certificates on Linux systems.

Add trust_roots_key for an org's trust roots.

Add PemBundleTrustStore
  • Loading branch information
hartmans authored and kdienes committed Aug 5, 2024
1 parent aad5084 commit f5f807d
Show file tree
Hide file tree
Showing 4 changed files with 3,726 additions and 47 deletions.
146 changes: 100 additions & 46 deletions carthage/pki.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,46 +168,6 @@ def __init__(self, **kwargs):

__all__ += ['EntanglementPkiManager']

def install_root_cert_customization(get_certificate_info):
'''
Return a customization that installs one or more root certificates.
:param get_get_certificate_info: A callback function resolved in the context of the customization's :class:`AsyncInjector` that returns a sequence of [certificate_name, pem_certificate]. The *certificate_name* uniquely identifies the certificate and *pem_certificate* is the root certificate to add.
'''
class InstallRootCertCustomization(machine.FilesystemCustomization):

@setup_task("Install Root Certs")
async def install_root_cert(self):
certificates_to_install = await self.ainjector(get_certificate_info)
carthage_cert_dir = os.path.join(
self.path,
"usr/share/ca-certificates/carthage")
os.makedirs(carthage_cert_dir, exist_ok=True)
for name, pem_cert in certificates_to_install:
with open(os.path.join(
carthage_cert_dir, f"{name}.crt"),
"wt") as f:
f.write(pem_cert)
with open(os.path.join(
self.path, "etc/ca-certificates.conf"),
"ta") as f:
f.write(f"carthage/{name}.crt\n")
await self.run_command("/usr/sbin/update-ca-certificates")

return InstallRootCertCustomization

__all__ += ['install_root_cert_customization']

@inject(
pki=PkiManager)
def pki_manager_certificate_info(pki):
return [('carthage_pki', pki.ca_cert)]

PkiCustomizations = install_root_cert_customization(pki_manager_certificate_info)

__all__ += ["PkiCustomizations"]

class TrustStore(AsyncInjectable):


Expand Down Expand Up @@ -258,6 +218,84 @@ def __repr__(self):

__all__ += ['TrustStore']

#: The trust store for using when contacting some API service, instantiated on the injector of the service being contacted.
contact_trust_store_key = InjectionKey(TrustStore, role='contact')

__all__ += ['contact_trust_store_key']

#: The key for a trust store for validating client certificates; always optional
client_certificate_trust_store_key = InjectionKey(TrustStore, role='client_certificate', _optional=True)

__all__ += ['client_certificate_trust_store_key']

#: The organization's local trust roots (inherently optional)
trust_roots_key = InjectionKey(TrustStore, role='organization_trust_roots', _optional=True)

__all__ += ['trust_roots_key']

def install_root_cert_customization(get_certificate_info):
'''
Return a customization that installs one or more root certificates.
:param get_get_certificate_info: A callback function resolved in the context of the customization's :class:`AsyncInjector` that returns a sequence of [certificate_name, pem_certificate]. The *certificate_name* uniquely identifies the certificate and *pem_certificate* is the root certificate to add.
'''
class InstallRootCertCustomization(machine.FilesystemCustomization):

@setup_task("Install Root Certs")
async def install_root_cert(self):
certificates_to_install = await self.ainjector(get_certificate_info)
carthage_cert_dir = os.path.join(
self.path,
"usr/share/ca-certificates/carthage")
os.makedirs(carthage_cert_dir, exist_ok=True)
for name, pem_cert in certificates_to_install.items():
with open(os.path.join(
carthage_cert_dir, f"{name}.crt"),
"wt") as f:
f.write(pem_cert)
with open(os.path.join(
self.path, "etc/ca-certificates.conf"),
"ta") as f:
f.write(f"carthage/{name}.crt\n")
await self.run_command("/usr/sbin/update-ca-certificates")

return InstallRootCertCustomization

__all__ += ['install_root_cert_customization']

@inject(
pki=InjectionKey(PkiManager, _optional=True),
trust_roots=trust_roots_key)
async def pki_manager_certificate_info(pki, trust_roots):
'''
Returns two sets of potential trust roots:
* PkiManager.trust_store.trusted_certificates: the trust roots necessary to trust certificates issued by the PkiManager Carthage is using
* trust_roots_key.trusted_certificates: the trust roots local to the current organization.
These trust roots may be overlapping. Duplicate tags are suppressed. For example if the PkiManager is using the local organization's CA, the trust roots may be identical.
'''
certificates_to_install = {}
if pki:
pki_trust_store = await pki.trust_store()
else:
pki_trust_store = None
for store in (trust_roots, pki_trust_store):
if store is None: continue
async for tag, cert in store.trusted_certificates():
if tag in certificates_to_install: continue
certificates_to_install[tag] = cert
return certificates_to_install
return [('carthage_pki', pki.ca_cert)]

PkiCustomizations = install_root_cert_customization(pki_manager_certificate_info)

__all__ += ["PkiCustomizations"]


class SimpleTrustStore(TrustStore):

'''
Expand Down Expand Up @@ -305,12 +343,28 @@ async def trusted_certificates(self):

__all__ += ['LetsencryptTrustStore']

#: The trust store for using when contacting some API service, instantiated on the injector of the service being contacted.
contact_trust_store_key = InjectionKey(TrustStore, role='contact')
class PemBundleTrustStore(TrustStore):

__all__ += ['contact_trust_store_key']
'''
A trust store that takes certificates from a CA file containing multiple certificates in a file.
#: The key for a trust store for validating client certificates; always optional
client_certificate_trust_store_key = InjectionKey(TrustStore, role='client_certificate', _optional=True)
:param name: The name of the trust store
__all__ += ['client_certificate_trust_store_key']
:param pem_bundle: a file containing one or more PEM certificates.
'''

def __init__(self, name,pem_bundle, **kwargs):
super().__init__(name=name, **kwargs)
self.pem_bundle = pathlib.Path(pem_bundle)

async def trusted_certificates(self):
counter = 1
for cert in split_pem(self.pem_bundle.read_text()):
try:
cert = x509_modify(cert)
except (ImportError, NameError): pass
yield f'{self.name}_{counter}', cert
counter += 1

__all__ += ['PemBundleTrustStore']
Loading

0 comments on commit f5f807d

Please sign in to comment.