Skip to content

Commit

Permalink
security: import certificates in initramfs
Browse files Browse the repository at this point in the history
Also dump for transfer durig switchroot so that the certificates
can be potentially imported early after switchroot by a service.

Resolves: INSTALLER-4030
  • Loading branch information
rvykydal committed Jan 3, 2025
1 parent 4be880f commit f6a0113
Show file tree
Hide file tree
Showing 2 changed files with 110 additions and 0 deletions.
41 changes: 41 additions & 0 deletions dracut/parse-kickstart
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ TMPDIR = "/tmp"
ARPHRD_ETHER = "1"
ARPHRD_INFINIBAND = "32"

CERT_TRANSPORT_DIR = "/run/install/certificates"

# Helper function for reading simple files in /sys
def readsysfile(f):
'''Return the contents of f, or "" if missing.'''
Expand Down Expand Up @@ -403,6 +405,44 @@ def ksnet_to_dracut(args, lineno, net, bootdev=False):

return " ".join(line)


def _dump_certificate(cert, root="/", dump_dir=None):
"""Dump the certificate into specified file."""
dump_dir = dump_dir or cert.dir
if not dump_dir:
log.error("Certificate destination is missing for %s", cert.filename)
return

dst_dir = os.path.join(root, dump_dir.lstrip('/'))
log.debug("Dumping certificate %s into %s.", cert.filename, dst_dir)
if not os.path.exists(dst_dir):
log.debug("Path %s for certificate does not exist, creating.", dst_dir)
os.makedirs(dst_dir)

dst = os.path.join(dst_dir, cert.filename)

if os.path.exists(dst):
log.warning("Certificate file %s already exists, replacing.", dst)

with open(dst, 'w') as f:
f.write(cert.cert)
f.write('\n')


def process_certificates(handler):
"""Import certificates defined in %certificate sections."""
for cert in handler.certificates:
log.info("Processing kickstart certificate %s", cert.filename)

if not cert.filename:
log.error("Missing certificate file name, skipping.")
continue

_dump_certificate(cert)
# Dump for transport to switchroot
_dump_certificate(cert, root=CERT_TRANSPORT_DIR+"/path/")


def process_kickstart(ksfile):
handler = DracutHandler()
try:
Expand All @@ -422,6 +462,7 @@ def process_kickstart(ksfile):
with open(TMPDIR+"/ks.info", "a") as f:
f.write('parsed_kickstart="%s"\n' % processed_file)
log.info("finished parsing kickstart")
process_certificates(handler)
return processed_file, handler.output

if __name__ == '__main__':
Expand Down
69 changes: 69 additions & 0 deletions tests/unit_tests/dracut_tests/test_parse_kickstart.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,18 @@
import tempfile
import unittest

CERT_CONTENT = """-----BEGIN CERTIFICATE-----
MIIBjTCCATOgAwIBAgIUWR5HO3v/0I80Ne0jQWVZFODuWLEwCgYIKoZIzj0EAwIw
FDESMBAGA1UEAwwJUlZURVNUIENBMB4XDTI0MTEyMDEzNTk1N1oXDTM0MTExODEz
NTk1N1owFDESMBAGA1UEAwwJUlZURVNUIENBMFkwEwYHKoZIzj0CAQYIKoZIzj0D
AQcDQgAELghFKGEgS8+5/2nx50W0xOqTrKc2Jz/rD/jfL0m4z4fkeAslCOkIKv74
0wfBXMngxi+OF/b3Vh8FmokuNBQO5qNjMGEwHQYDVR0OBBYEFOJarl9Xkd13sLzI
mHqv6aESlvuCMB8GA1UdIwQYMBaAFOJarl9Xkd13sLzImHqv6aESlvuCMA8GA1Ud
EwEB/wQFMAMBAf8wDgYDVR0PAQH/BAQDAgEGMAoGCCqGSM49BAMCA0gAMEUCIAet
7nyre42ReoRKoyHWLDsQmQDzoyU3FQdC0cViqOtrAiEAxYIL+XTTp7Xy9RNE4Xg7
yNWXfdraC/AfMM8fqsxlVJM=
-----END CERTIFICATE-----"""


class BaseTestCase(unittest.TestCase):
def setUp(self):
Expand Down Expand Up @@ -243,3 +255,60 @@ def test_bootloader(self):
lines = self.execParseKickstart(ks_file.name)

assert lines[0] == "inst.extlinux", lines

def _check_cert_file(self, cert_file, content):
with open(cert_file) as f:
# Anaconda adds `\n` to the value when dumping it
assert f.read() == content+'\n'

def test_certificate(self):
filename = "rvtest.pem"
cdir = os.path.join(self.tmpdir, "cert_dir/subdir")
content = CERT_CONTENT
ks_cert = f"""
%certificate --filename={filename} --dir={cdir}
{content}
%end
"""
cert_file = os.path.join(cdir, filename)

with tempfile.NamedTemporaryFile(mode="w+t") as ks_file:
ks_file.write(ks_cert)
ks_file.flush()
lines = self.execParseKickstart(ks_file.name)
assert lines == []

self._check_cert_file(cert_file, content)

# Check existence for file for transport to root
CERT_TRANSPORT_DIR = "/run/install/certificates"
transport_file = os.path.join(CERT_TRANSPORT_DIR, cert_file)
self._check_cert_file(transport_file, content)

def test_certificate_existing(self):
filename = "rvtest.pem"
cdir = os.path.join(self.tmpdir, "cert_dir/subdir")
content = CERT_CONTENT
ks_cert = f"""
%certificate --filename={filename} --dir={cdir}
{content}
%end
"""
cert_file = os.path.join(cdir, filename)

# Existing file should be overwritten
os.makedirs(cdir)
open(cert_file, 'w')

with tempfile.NamedTemporaryFile(mode="w+t") as ks_file:
ks_file.write(ks_cert)
ks_file.flush()
lines = self.execParseKickstart(ks_file.name)
assert lines == []

self._check_cert_file(cert_file, content)

# Check existence for file for transport to root
CERT_TRANSPORT_DIR = "/run/install/certificates"
transport_file = os.path.join(CERT_TRANSPORT_DIR, cert_file)
self._check_cert_file(transport_file, content)

0 comments on commit f6a0113

Please sign in to comment.