diff --git a/signer/tuf_on_ci_sign/_signer_repository.py b/signer/tuf_on_ci_sign/_signer_repository.py index 26abc836..ab3ddca3 100644 --- a/signer/tuf_on_ci_sign/_signer_repository.py +++ b/signer/tuf_on_ci_sign/_signer_repository.py @@ -2,6 +2,7 @@ """Internal repository module for TUF-on-CI signer tools""" +import copy import filecmp import json import logging @@ -739,27 +740,117 @@ def status(self, rolename: str) -> str: return "\n".join(output) + def _get_legacy_root_key(self, key: Key) -> Key | None: + # calculate keyid _without custom metadata_, then lookup key from known good + # root keys. This is useful in import situation where the legacy key does not + # have the custom metadata but is otherwise the same key + def _calculate_keyid(key: Key) -> str: + data: bytes = encode_canonical(key.to_dict()).encode() + hasher = digest("sha256") + hasher.update(data) + return hasher.hexdigest() + + test_key = copy.deepcopy(key) + del test_key.unrecognized_fields["x-tuf-on-ci-keyowner"] + legacy_keyid = _calculate_keyid(test_key) + return self._known_good_root().keys.get(legacy_keyid) + def sign(self, rolename: str): """Sign without payload changes""" md = self.open(rolename) + signing_keys: dict[str, Key] = {} for key in self._get_keys(rolename): keyowner = key.unrecognized_fields["x-tuf-on-ci-keyowner"] if keyowner == self.user.name: - self._sign(rolename, md, key) - self._write(rolename, md) - return + signing_keys[key.keyid] = key - # Root is eligible to sign current root if the signer was valid - # in previous version if rolename == "root": + # special case for import: if the same key was used with different keyid + # we want to sign with that keyid too + for key in list(signing_keys.values()): + legacy_key = self._get_legacy_root_key(key) + if legacy_key: + signing_keys[legacy_key.keyid] = legacy_key + + # user is also eligible to sign current root if they were a signer + # in previous version for key in self._get_keys(rolename, True): keyowner = key.unrecognized_fields["x-tuf-on-ci-keyowner"] if keyowner == self.user.name: - self._sign(rolename, md, key) - self._write(rolename, md) - return + signing_keys[key.keyid] = key + + if not signing_keys: + raise ValueError(f"{rolename} signing key for {self.user.name} not found") + + for key in signing_keys.values(): + self._sign(rolename, md, key) + self._write(rolename, md) + + def force_compliant_keyids(self, rolename: str) -> bool: + """Make all keyids defined in rolename spec compliant + + This is a hidden feature to fix issue #294. It updates all keyids defined + in the role so that they are spec compliant: this means changes in the + delegated roles that use the keyids and requires resigning the metadata + of those roles. + + Requires resigning with care: Root signatures should be duplicated for new and + old keyids.""" + + def _calculate_keyid(key: Key) -> str: + data: bytes = encode_canonical(key.to_dict()).encode() + hasher = digest("sha256") + hasher.update(data) + return hasher.hexdigest() + + changed = False + delegates = set() + if rolename == "root": + with self.edit_root() as root: + for key in list(root.keys.values()): + compliant_keyid = _calculate_keyid(key) + if key.keyid == compliant_keyid: + continue + # Update keyid in all roles + for rolename, role in root.roles.items(): + for i, id in enumerate(role.keyids): + if id == key.keyid: + role.keyids[i] = compliant_keyid + if rolename == "targets": + delegates.add(rolename) + + # update the actual key + del root.keys[key.keyid] + key.keyid = compliant_keyid + root.keys[key.keyid] = key + + changed = True + elif rolename == "targets": + with self.edit_targets() as targets: + if not targets.delegations or not targets.delegations.roles: + raise AbortEdit + for key in list(targets.delegations.keys.values()): + compliant_keyid = _calculate_keyid(key) + if key.keyid == compliant_keyid: + continue + # Update keyid in the key and all roles + for rolename, role in targets.delegations.roles.items(): + for i, id in enumerate(role.keyids): + if id == key.keyid: + role.keyids[i] = compliant_keyid + delegates.add(rolename) + # update the actual key + del targets.delegations.keys[key.keyid] + key.keyid = compliant_keyid + targets.delegations.keys[key.keyid] = key + changed = True + + for delegate in delegates: + # Force resigning of delegates + with self.edit_targets(delegate): + pass - raise ValueError(f"{rolename} signing key for {self.user.name} not found") + return changed def build_paths(rolename: str, depth: int) -> list[str]: diff --git a/signer/tuf_on_ci_sign/delegate.py b/signer/tuf_on_ci_sign/delegate.py index 9a8b1cc3..22819e9c 100755 --- a/signer/tuf_on_ci_sign/delegate.py +++ b/signer/tuf_on_ci_sign/delegate.py @@ -338,9 +338,16 @@ def _update_offline_role(repo: SignerRepository, role: str) -> bool: @click.version_option() @click.option("-v", "--verbose", count=True, default=0) @click.option("--push/--no-push", default=True) +@click.option("--force-compliant-keyids", hidden=True, is_flag=True) @click.argument("event-name", metavar="SIGNING-EVENT") @click.argument("role", required=False) -def delegate(verbose: int, push: bool, event_name: str, role: str | None): +def delegate( + verbose: int, + push: bool, + force_compliant_keyids: bool, + event_name: str, + role: str | None, +): """Tool for modifying TUF-on-CI delegations.""" logging.basicConfig(level=logging.WARNING - verbose * 10) @@ -357,7 +364,9 @@ def delegate(verbose: int, push: bool, event_name: str, role: str | None): if role is None: role = click.prompt(bold("Enter name of role to modify")) - if role in ["timestamp", "snapshot"]: + if force_compliant_keyids: + changed = repo.force_compliant_keyids(role) + elif role in ["timestamp", "snapshot"]: changed = _update_online_roles(repo) else: changed = _update_offline_role(repo, role) @@ -381,7 +390,6 @@ def delegate(verbose: int, push: bool, event_name: str, role: str | None): git_expect( ["commit", "-m", f"Signed by {user_config.name}", "--signoff"] ) - if push: push_changes(user_config, event_name, msg) else: diff --git a/signer/tuf_on_ci_sign/import_repo.py b/signer/tuf_on_ci_sign/import_repo.py index ef48b52c..93fab151 100644 --- a/signer/tuf_on_ci_sign/import_repo.py +++ b/signer/tuf_on_ci_sign/import_repo.py @@ -162,6 +162,7 @@ def import_repo(verbose: int, push: bool, event_name: str, import_file: str | No ok = _update_keys(root.keys, role_data) and ok if not ok: raise AbortEdit("Missing values") + else: with repo.edit_targets(rolename) as targets: ok = _update_expiry(targets, role_data) and ok @@ -172,6 +173,10 @@ def import_repo(verbose: int, push: bool, event_name: str, import_file: str | No if not ok: raise AbortEdit("Missing values") + # we have updated keys defined in root/targets: make sure keyids are compliant + repo.force_compliant_keyids("root") + repo.force_compliant_keyids("targets") + if not ok: print("Error: Undefined values found. please save this in a file,") print("fill in the values and use the file as import-file argument:\n")