Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix noncompliant keyids #338

Merged
merged 3 commits into from
May 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 100 additions & 9 deletions signer/tuf_on_ci_sign/_signer_repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

"""Internal repository module for TUF-on-CI signer tools"""

import copy
import filecmp
import json
import logging
Expand Down Expand Up @@ -739,27 +740,117 @@ def status(self, rolename: str) -> str:

return "\n".join(output)

def _get_legacy_root_key(self, key: Key) -> Key | None:
# calculate keyid _without custom metadata_, then lookup key from known good
# root keys. This is useful in import situation where the legacy key does not
# have the custom metadata but is otherwise the same key
def _calculate_keyid(key: Key) -> str:
data: bytes = encode_canonical(key.to_dict()).encode()
hasher = digest("sha256")
hasher.update(data)
return hasher.hexdigest()

test_key = copy.deepcopy(key)
del test_key.unrecognized_fields["x-tuf-on-ci-keyowner"]
legacy_keyid = _calculate_keyid(test_key)
return self._known_good_root().keys.get(legacy_keyid)

def sign(self, rolename: str):
"""Sign without payload changes"""
md = self.open(rolename)
signing_keys: dict[str, Key] = {}
for key in self._get_keys(rolename):
keyowner = key.unrecognized_fields["x-tuf-on-ci-keyowner"]
if keyowner == self.user.name:
self._sign(rolename, md, key)
self._write(rolename, md)
return
signing_keys[key.keyid] = key

# Root is eligible to sign current root if the signer was valid
# in previous version
if rolename == "root":
# special case for import: if the same key was used with different keyid
# we want to sign with that keyid too
for key in list(signing_keys.values()):
legacy_key = self._get_legacy_root_key(key)
if legacy_key:
signing_keys[legacy_key.keyid] = legacy_key

# user is also eligible to sign current root if they were a signer
# in previous version
for key in self._get_keys(rolename, True):
keyowner = key.unrecognized_fields["x-tuf-on-ci-keyowner"]
if keyowner == self.user.name:
self._sign(rolename, md, key)
self._write(rolename, md)
return
signing_keys[key.keyid] = key

if not signing_keys:
raise ValueError(f"{rolename} signing key for {self.user.name} not found")

for key in signing_keys.values():
self._sign(rolename, md, key)
self._write(rolename, md)

def force_compliant_keyids(self, rolename: str) -> bool:
"""Make all keyids defined in rolename spec compliant

This is a hidden feature to fix issue #294. It updates all keyids defined
in the role so that they are spec compliant: this means changes in the
delegated roles that use the keyids and requires resigning the metadata
of those roles.

Requires resigning with care: Root signatures should be duplicated for new and
old keyids."""

def _calculate_keyid(key: Key) -> str:
data: bytes = encode_canonical(key.to_dict()).encode()
hasher = digest("sha256")
hasher.update(data)
return hasher.hexdigest()

changed = False
delegates = set()
if rolename == "root":
with self.edit_root() as root:
for key in list(root.keys.values()):
compliant_keyid = _calculate_keyid(key)
if key.keyid == compliant_keyid:
continue
# Update keyid in all roles
for rolename, role in root.roles.items():
for i, id in enumerate(role.keyids):
if id == key.keyid:
role.keyids[i] = compliant_keyid
if rolename == "targets":
delegates.add(rolename)

# update the actual key
del root.keys[key.keyid]
key.keyid = compliant_keyid
root.keys[key.keyid] = key

changed = True
elif rolename == "targets":
with self.edit_targets() as targets:
if not targets.delegations or not targets.delegations.roles:
raise AbortEdit
for key in list(targets.delegations.keys.values()):
compliant_keyid = _calculate_keyid(key)
if key.keyid == compliant_keyid:
continue
# Update keyid in the key and all roles
for rolename, role in targets.delegations.roles.items():
for i, id in enumerate(role.keyids):
if id == key.keyid:
role.keyids[i] = compliant_keyid
delegates.add(rolename)
# update the actual key
del targets.delegations.keys[key.keyid]
key.keyid = compliant_keyid
targets.delegations.keys[key.keyid] = key
changed = True

for delegate in delegates:
# Force resigning of delegates
with self.edit_targets(delegate):
pass

raise ValueError(f"{rolename} signing key for {self.user.name} not found")
return changed


def build_paths(rolename: str, depth: int) -> list[str]:
Expand Down
14 changes: 11 additions & 3 deletions signer/tuf_on_ci_sign/delegate.py
Original file line number Diff line number Diff line change
Expand Up @@ -338,9 +338,16 @@ def _update_offline_role(repo: SignerRepository, role: str) -> bool:
@click.version_option()
@click.option("-v", "--verbose", count=True, default=0)
@click.option("--push/--no-push", default=True)
@click.option("--force-compliant-keyids", hidden=True, is_flag=True)
@click.argument("event-name", metavar="SIGNING-EVENT")
@click.argument("role", required=False)
def delegate(verbose: int, push: bool, event_name: str, role: str | None):
def delegate(
verbose: int,
push: bool,
force_compliant_keyids: bool,
event_name: str,
role: str | None,
):
"""Tool for modifying TUF-on-CI delegations."""
logging.basicConfig(level=logging.WARNING - verbose * 10)

Expand All @@ -357,7 +364,9 @@ def delegate(verbose: int, push: bool, event_name: str, role: str | None):
if role is None:
role = click.prompt(bold("Enter name of role to modify"))

if role in ["timestamp", "snapshot"]:
if force_compliant_keyids:
changed = repo.force_compliant_keyids(role)
elif role in ["timestamp", "snapshot"]:
changed = _update_online_roles(repo)
else:
changed = _update_offline_role(repo, role)
Expand All @@ -381,7 +390,6 @@ def delegate(verbose: int, push: bool, event_name: str, role: str | None):
git_expect(
["commit", "-m", f"Signed by {user_config.name}", "--signoff"]
)

if push:
push_changes(user_config, event_name, msg)
else:
Expand Down
5 changes: 5 additions & 0 deletions signer/tuf_on_ci_sign/import_repo.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ def import_repo(verbose: int, push: bool, event_name: str, import_file: str | No
ok = _update_keys(root.keys, role_data) and ok
if not ok:
raise AbortEdit("Missing values")

else:
with repo.edit_targets(rolename) as targets:
ok = _update_expiry(targets, role_data) and ok
Expand All @@ -172,6 +173,10 @@ def import_repo(verbose: int, push: bool, event_name: str, import_file: str | No
if not ok:
raise AbortEdit("Missing values")

# we have updated keys defined in root/targets: make sure keyids are compliant
repo.force_compliant_keyids("root")
repo.force_compliant_keyids("targets")

if not ok:
print("Error: Undefined values found. please save this in a file,")
print("fill in the values and use the file as import-file argument:\n")
Expand Down