From df28ea9b72e5a1ea090a3c16c361d01fe7c7f396 Mon Sep 17 00:00:00 2001 From: Jussi Kukkonen Date: Tue, 6 Feb 2024 13:26:23 +0200 Subject: [PATCH 1/2] Repository: Start using VerificationResult This simplifies the handling quite a lot: * status() result is now a lot easier to understand * root version is now checked by get_root_verification_result(): tuf-on-ci does not need to * No need to build a list of signed and unsigned signers anymore (just translate keys to signer names) --- repo/pyproject.toml | 2 +- repo/tuf_on_ci/_repository.py | 102 +++++++++----------------------- repo/tuf_on_ci/signing_event.py | 51 +++++++++------- signer/pyproject.toml | 6 +- 4 files changed, 62 insertions(+), 99 deletions(-) diff --git a/repo/pyproject.toml b/repo/pyproject.toml index 231dbb8e..5d7f885d 100644 --- a/repo/pyproject.toml +++ b/repo/pyproject.toml @@ -13,7 +13,7 @@ description = "TUF-on-CI repository tools, intended to be executed on a CI syste readme = "README.md" dependencies = [ "securesystemslib[awskms, azurekms, gcpkms, sigstore, pynacl] ~= 0.30", - "tuf ~= 3.0", + "tuf @ git+https://github.com/theupdateframework/python-tuf", "click ~= 8.1", ] requires-python = ">=3.10" diff --git a/repo/tuf_on_ci/_repository.py b/repo/tuf_on_ci/_repository.py index 16950556..d2926fa2 100644 --- a/repo/tuf_on_ci/_repository.py +++ b/repo/tuf_on_ci/_repository.py @@ -7,7 +7,6 @@ from enum import Enum, unique from glob import glob -from securesystemslib.exceptions import UnverifiedSignatureError from securesystemslib.signer import ( KEY_FOR_TYPE_AND_SCHEME, SIGNER_FOR_URI_SCHEME, @@ -22,12 +21,14 @@ Metadata, MetaFile, Root, + RootVerificationResult, Snapshot, TargetFile, Targets, Timestamp, + VerificationResult, ) -from tuf.api.serialization.json import CanonicalJSONSerializer, JSONSerializer +from tuf.api.serialization.json import JSONSerializer from tuf.repository import AbortEdit, Repository # sigstore is not a supported key by default @@ -59,9 +60,7 @@ def __str__(self): @dataclass class SigningStatus: invites: set[str] # invites to _delegations_ of the role - signed: set[str] - missing: set[str] - threshold: int + verification_result: VerificationResult | RootVerificationResult target_changes: list[TargetState] valid: bool message: str | None @@ -261,9 +260,7 @@ def open_prev(self, role: str) -> Metadata | None: return None - def _validate_role( - self, delegator: Metadata, rolename: str - ) -> tuple[bool, str | None]: + def _validate_update(self, rolename: str) -> tuple[bool, str | None]: """Validate role compatibility with this repository Returns bool for validity and optional error message""" @@ -286,11 +283,6 @@ def _validate_role( if not md.signed.consistent_snapshot: return False, "Consistent snapshot is not enabled" - # Specification: root version must be x+1, not just larger - if prev_md and prev_md.signed != md.signed: - if md.signed.version != prev_md.signed.version + 1: - return False, f"Version {md.signed.version} is not valid for root" - # tuf-on-ci online signer must be the same for both roles ts_role = md.signed.get_delegated_role(Timestamp.type) sn_role = md.signed.get_delegated_role(Snapshot.type) @@ -318,11 +310,6 @@ def _validate_role( # * check there are no delegations # * check that target files in metadata match the files in targets/ - try: - delegator.verify_delegate(rolename, md) - except UnsignedMetadataError: - return False, None - return True, None @staticmethod @@ -404,36 +391,30 @@ def _get_target_changes(self, rolename: str) -> list[TargetState]: return changes - def _get_signing_status( - self, rolename: str, known_good: bool - ) -> SigningStatus | None: - """Build signing status for role. + def status(self, rolename: str) -> SigningStatus: + """Returns signing status for role. + + Uses .signing-event-state file.""" + + if rolename in [Timestamp.type, Snapshot.type]: + raise ValueError("Not supported for online metadata") - This method relies on event state (.signing-event-state) to be accurate. - Returns None only when known_good is True, and then in two cases: if delegating - role is not root (because then the known good state is irrelevant) and also if - there is no known good version yet. - """ invites = set() - sigs = set() - missing_sigs = set() md = self.open(rolename) + bytes = md.signed_bytes + sigs = md.signatures - # Find delegating metadata. For root handle the special case of known good - # delegating metadata. - if known_good: - delegator = None - if rolename == "root": - delegator = self.open_prev("root") - if not delegator: - # Not root role or there is no known-good root metadata yet - return None - elif rolename == "root": - delegator = self.open("root") + # Get verification result. Handle previous root if it exists + if rolename == "root": + root = self.root() + prev_md: Metadata[Root] | None = self.open_prev("root") + prev_root = prev_md.signed if prev_md else None + vr: VerificationResult | RootVerificationResult + vr = root.get_root_verification_result(prev_root, bytes, sigs) elif rolename == "targets": - delegator = self.open("root") + vr = self.root().get_verification_result(rolename, bytes, sigs) else: - delegator = self.open("targets") + vr = self.targets().get_verification_result(rolename, bytes, sigs) # Build list of invites to all delegated roles of rolename delegation_names = [] @@ -445,45 +426,18 @@ def _get_signing_status( for delegation_name in delegation_names: invites.update(self.state.invited_signers_for_role(delegation_name)) - role = delegator.signed.get_delegated_role(rolename) - - # Build lists of signed signers and not signed signers - for key in self._get_keys(rolename, known_good): - keyowner = key.unrecognized_fields["x-tuf-on-ci-keyowner"] - try: - payload = CanonicalJSONSerializer().serialize(md.signed) - key.verify_signature(md.signatures[key.keyid], payload) - sigs.add(keyowner) - except (KeyError, UnverifiedSignatureError): - missing_sigs.add(keyowner) - # Document changes to targets metadata in this signing event target_changes = self._get_target_changes(rolename) - # Just to be sure: double check that delegation threshold is reached + # Calculate signing event state if invites: valid, msg = False, None + elif not vr: + valid, msg = False, None else: - valid, msg = self._validate_role(delegator, rolename) - - return SigningStatus( - invites, sigs, missing_sigs, role.threshold, target_changes, valid, msg - ) - - def status(self, rolename: str) -> tuple[SigningStatus, SigningStatus | None]: - """Returns signing status for role. - - In case of root, another SigningStatus may be returned for the previous - 'known good' root. - Uses .signing-event-state file.""" - if rolename in ["timestamp", "snapshot"]: - raise ValueError("Not supported for online metadata") - - known_good_status = self._get_signing_status(rolename, known_good=True) - signing_event_status = self._get_signing_status(rolename, known_good=False) - assert signing_event_status is not None + valid, msg = self._validate_update(rolename) - return signing_event_status, known_good_status + return SigningStatus(invites, vr, target_changes, valid, msg) def build(self, metadata_path: str, artifact_path: str | None): """Build a publishable directory of metadata and (optionally) artifacts""" diff --git a/repo/tuf_on_ci/signing_event.py b/repo/tuf_on_ci/signing_event.py index 56c197ee..88914de2 100644 --- a/repo/tuf_on_ci/signing_event.py +++ b/repo/tuf_on_ci/signing_event.py @@ -11,6 +11,7 @@ from tempfile import TemporaryDirectory import click +from tuf.api.metadata import Key, VerificationResult from tuf_on_ci._repository import CIRepository @@ -87,20 +88,25 @@ def _find_changed_target_roles( def _role_status(repo: CIRepository, role: str, event_name) -> bool: - status, prev_status = repo.status(role) - role_is_valid = status.valid - sig_counts = f"{len(status.signed)}/{status.threshold}" - signed = status.signed - missing = status.missing - - # Handle the additional status for the possible previous, known good root version: - if prev_status: - role_is_valid = role_is_valid and prev_status.valid - sig_counts = f"{len(prev_status.signed)}/{prev_status.threshold} ({sig_counts})" - signed = signed | prev_status.signed - missing = missing | prev_status.missing - - if role_is_valid and not status.invites: + def signer(key: Key) -> str: + return key.unrecognized_fields["x-tuf-on-ci-keyowner"] + + status = repo.status(role) + vr = status.verification_result + + # Build the signature count description string: + if isinstance(vr, VerificationResult): + sig_counts = f"{len(vr.signed)}/{vr.threshold}" + else: + sig_counts = ( + f"{len(vr.first.signed)}/{vr.first.threshold} " + + f"({len(vr.second.signed)}/{vr.second.threshold})" + ) + # build strings of signed and unsigned signer names + signed = ", ".join([signer(key) for key in vr.signed.values()]) + unsigned = ", ".join([signer(key) for key in vr.unsigned.values()]) + + if status.valid: emoji = "white_check_mark" else: emoji = "x" @@ -113,29 +119,28 @@ def _role_status(repo: CIRepository, role: str, event_name) -> bool: "Invitees can accept the invitations by running " f"`tuf-on-ci-sign {event_name}`" ) - - if not status.invites: + else: if status.target_changes: click.echo(f"Role `{role}` contains following artifact changes:") for target_state in status.target_changes: click.echo(f" * {target_state}") click.echo("") - if role_is_valid: + if status.valid: click.echo( f"Role `{role}` is verified and signed by {sig_counts} signers " - f"({', '.join(signed)})." + f"({signed})." ) - elif signed: + elif vr.signed: click.echo( f"Role `{role}` is not yet verified. It is signed by {sig_counts} " - f"signers ({', '.join(signed)})." + f"signers ({signed})." ) else: click.echo(f"Role `{role}` is unsigned and not yet verified") - if missing: - click.echo(f"Still missing signatures from {', '.join(missing)}") + if vr.unsigned: + click.echo(f"Still missing signatures from {unsigned}") click.echo( "Signers can sign these changes by running " f"`tuf-on-ci-sign {event_name}`" @@ -144,7 +149,7 @@ def _role_status(repo: CIRepository, role: str, event_name) -> bool: if status.message: click.echo(f"**Error**: {status.message}") - return role_is_valid and not status.invites + return status.valid @click.command() # type: ignore[arg-type] diff --git a/signer/pyproject.toml b/signer/pyproject.toml index af629356..0d76fc63 100644 --- a/signer/pyproject.toml +++ b/signer/pyproject.toml @@ -2,6 +2,10 @@ requires = ["hatchling"] build-backend = "hatchling.build" +[tool.hatch.metadata] +# for git dependencies +allow-direct-references = true + [project] name = "tuf-on-ci-sign" version = "0.5.0" @@ -9,7 +13,7 @@ description = "Signing tools for TUF-on-CI" readme = "README.md" dependencies = [ "securesystemslib[awskms,azurekms,gcpkms,hsm,sigstore] ~= 0.30", - "tuf ~= 3.0", + "tuf @ git+https://github.com/theupdateframework/python-tuf", "click ~= 8.1", ] requires-python = ">=3.10" From 3a3da4cf000bf2f76ec509656a31f64484ca59fc Mon Sep 17 00:00:00 2001 From: Jussi Kukkonen Date: Tue, 6 Feb 2024 14:03:31 +0200 Subject: [PATCH 2/2] repository: Update to some new tuf API * Use verify_delegate() from Root, Targets * Use helpers like Repository.root(), Repository.targets() --- repo/tuf_on_ci/_repository.py | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/repo/tuf_on_ci/_repository.py b/repo/tuf_on_ci/_repository.py index d2926fa2..67e87aa7 100644 --- a/repo/tuf_on_ci/_repository.py +++ b/repo/tuf_on_ci/_repository.py @@ -212,9 +212,8 @@ def close(self, rolename: str, md: Metadata) -> None: md.signatures[key.keyid] = Signature(key.keyid, "") if rolename in ["timestamp", "snapshot"]: - root_md: Metadata[Root] = self.open("root") # repository should never write unsigned online roles - root_md.verify_delegate(rolename, md) + self.root().verify_delegate(rolename, md.signed_bytes, md.signatures) filename = self._get_filename(rolename) data = md.to_bytes(JSONSerializer()) @@ -509,17 +508,17 @@ def is_signed(self, rolename: str) -> bool: false in this case: this is useful when repository decides if it needs a new online role version. """ - role_md = self.open(rolename) + md = self.open(rolename) if rolename in ["root", "timestamp", "snapshot", "targets"]: - delegator = self.open("root") + delegator: Root | Targets = self.root() else: - delegator = self.open("targets") + delegator = self.targets() try: - delegator.verify_delegate(rolename, role_md) + delegator.verify_delegate(rolename, md.signed_bytes, md.signatures) except UnsignedMetadataError: return False signing_days, _ = self.signing_expiry_period(rolename) delta = timedelta(days=signing_days) - return datetime.utcnow() + delta < role_md.signed.expires + return datetime.utcnow() + delta < md.signed.expires