Skip to content

Commit

Permalink
sigstore, test: honor PublicKeyDetails when loading Keyrings (#953)
Browse files Browse the repository at this point in the history
* sigstore, test: honor PublicKeyDetails when loading Keyrings

Signed-off-by: William Woodruff <william@trailofbits.com>

* lintage

Signed-off-by: William Woodruff <william@trailofbits.com>

---------

Signed-off-by: William Woodruff <william@trailofbits.com>
  • Loading branch information
woodruffw committed Apr 3, 2024
1 parent 1d2c924 commit 426120f
Show file tree
Hide file tree
Showing 5 changed files with 161 additions and 149 deletions.
181 changes: 117 additions & 64 deletions sigstore/_internal/trustroot.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,11 @@

from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime, timezone
from enum import Enum
from pathlib import Path
from typing import Iterable, List, NewType
from typing import ClassVar, Iterable, List, NewType

import cryptography.hazmat.primitives.asymmetric.padding as padding
from cryptography.exceptions import InvalidSignature
Expand All @@ -31,6 +32,10 @@
Certificate,
load_der_x509_certificate,
)
from sigstore_protobuf_specs.dev.sigstore.common.v1 import PublicKey as _PublicKey
from sigstore_protobuf_specs.dev.sigstore.common.v1 import (
PublicKeyDetails as _PublicKeyDetails,
)
from sigstore_protobuf_specs.dev.sigstore.common.v1 import TimeRange
from sigstore_protobuf_specs.dev.sigstore.trustroot.v1 import (
CertificateAuthority,
Expand All @@ -44,10 +49,9 @@
from sigstore._utils import (
InvalidKeyError,
KeyID,
UnexpectedKeyFormatError,
PublicKey,
key_id,
load_der_public_key,
load_pem_public_key,
)
from sigstore.errors import MetadataError

Expand Down Expand Up @@ -102,72 +106,119 @@ class KeyringSignatureError(KeyringError):
"""


class Keyring:
@dataclass(init=False)
class Key:
"""
Represents a set of CT signing keys, each of which is a potentially
valid signer for a Signed Certificate Timestamp (SCT).
This structure exists to facilitate key rotation in a CT log.
Represents a key in a `Keyring`.
"""

def __init__(self, keys: List[bytes] = []):
hash_algorithm: hashes.HashAlgorithm
key: PublicKey
key_id: KeyID

_RSA_SHA_256_DETAILS: ClassVar[set[_PublicKeyDetails]] = {
_PublicKeyDetails.PKCS1_RSA_PKCS1V5,
_PublicKeyDetails.PKIX_RSA_PKCS1V15_2048_SHA256,
_PublicKeyDetails.PKIX_RSA_PKCS1V15_3072_SHA256,
_PublicKeyDetails.PKIX_RSA_PKCS1V15_4096_SHA256,
}

_EC_DETAILS_TO_HASH: ClassVar[dict[_PublicKeyDetails, hashes.HashAlgorithm]] = {
_PublicKeyDetails.PKIX_ECDSA_P256_SHA_256: hashes.SHA256(),
_PublicKeyDetails.PKIX_ECDSA_P384_SHA_384: hashes.SHA384(),
_PublicKeyDetails.PKIX_ECDSA_P521_SHA_512: hashes.SHA512(),
}

def __init__(self, public_key: _PublicKey) -> None:
"""
Create a new `Keyring`, with `keys` as the initial set of signing
keys. These `keys` can be in either DER format or PEM encoded.
Construct a key from the given Sigstore PublicKey message.
"""
self._keyring = {}
for key_bytes in keys:
key = None

try:
key = load_pem_public_key(key_bytes)
except UnexpectedKeyFormatError as e:
raise e
except InvalidKeyError:
key = load_der_public_key(key_bytes)
hash_algorithm: hashes.HashAlgorithm
if public_key.key_details in self._RSA_SHA_256_DETAILS:
hash_algorithm = hashes.SHA256()
key = load_der_public_key(public_key.raw_bytes, types=(rsa.RSAPublicKey,))
elif public_key.key_details in self._EC_DETAILS_TO_HASH:
hash_algorithm = self._EC_DETAILS_TO_HASH[public_key.key_details]
key = load_der_public_key(
public_key.raw_bytes, types=(ec.EllipticCurvePublicKey,)
)
else:
raise InvalidKeyError(f"unsupported key type: {public_key.key_details}")

self.hash_algorithm = hash_algorithm
self.key = key
self.key_id = key_id(key)

def verify(self, signature: bytes, data: bytes) -> None:
"""
Verifies the given `data` against `signature` using the current key.
"""
if isinstance(self.key, rsa.RSAPublicKey):
self.key.verify(
signature=signature,
data=data,
# TODO: Parametrize this as well, for PSS.
padding=padding.PKCS1v15(),
algorithm=self.hash_algorithm,
)
elif isinstance(self.key, ec.EllipticCurvePublicKey):
self.key.verify(
signature=signature,
data=data,
signature_algorithm=ec.ECDSA(self.hash_algorithm),
)
else:
# Unreachable without API misuse.
raise KeyringSignatureError(f"unsupported key: {self.key}")

self._keyring[key_id(key)] = key

def add(self, key_pem: bytes) -> None:
class Keyring:
"""
Represents a set of keys, each of which is a potentially valid verifier.
"""

def __init__(self, public_keys: List[_PublicKey] = []):
"""
Adds a PEM-encoded key to the current keyring.
Create a new `Keyring`, with `keys` as the initial set of verifying keys.
"""
key = load_pem_public_key(key_pem)
self._keyring[key_id(key)] = key
self._keyring: dict[KeyID, Key] = {}

for public_key in public_keys:
key = Key(public_key)
self._keyring[key.key_id] = key

def verify(self, *, key_id: KeyID, signature: bytes, data: bytes) -> None:
"""
Verify that `signature` is a valid signature for `data`, using the
key identified by `key_id`.
Raises if `key_id` does not match a key in the `Keyring`, or if
the signature is invalid.
`key_id` is an unauthenticated hint; if no key matches the given key ID,
all keys in the keyring are tried.
Raises if the signature is invalid, i.e. is not valid for any of the
keys in the keyring.
"""

key = self._keyring.get(key_id)
if key is None:
# If we don't have a key corresponding to this key ID, we can't
# possibly verify the signature.
raise KeyringLookupError(f"no known key for key ID {key_id.hex()}")

try:
if isinstance(key, rsa.RSAPublicKey):
key.verify(
signature=signature,
data=data,
padding=padding.PKCS1v15(),
algorithm=hashes.SHA256(),
)
elif isinstance(key, ec.EllipticCurvePublicKey):
key.verify(
signature=signature,
data=data,
signature_algorithm=ec.ECDSA(hashes.SHA256()),
)
else:
# NOTE(ww): Unreachable without API misuse.
raise KeyringError(f"unsupported key type: {key}")
except InvalidSignature as exc:
raise KeyringSignatureError("invalid signature") from exc
if key is not None:
candidates = [key]
else:
candidates = list(self._keyring.values())

# Try to verify each candidate key. In the happy case, this will
# be exactly one candidate.
valid = False
for candidate in candidates:
try:
candidate.verify(signature, data)
valid = True
break
except InvalidSignature:
pass

if not valid:
raise KeyringSignatureError("invalid signature")


RekorKeyring = NewType("RekorKeyring", Keyring)
Expand Down Expand Up @@ -197,7 +248,7 @@ def from_file(
cls,
path: str,
purpose: KeyringPurpose = KeyringPurpose.VERIFY,
) -> "TrustedRoot":
) -> TrustedRoot:
"""Create a new trust root from file"""
trusted_root: TrustedRoot = cls().from_json(Path(path).read_bytes())
trusted_root.purpose = purpose
Expand All @@ -209,7 +260,7 @@ def from_tuf(
url: str,
offline: bool = False,
purpose: KeyringPurpose = KeyringPurpose.VERIFY,
) -> "TrustedRoot":
) -> TrustedRoot:
"""Create a new trust root from a TUF repository.
If `offline`, will use trust root in local TUF cache. Otherwise will
Expand All @@ -223,7 +274,7 @@ def production(
cls,
offline: bool = False,
purpose: KeyringPurpose = KeyringPurpose.VERIFY,
) -> "TrustedRoot":
) -> TrustedRoot:
"""Create new trust root from Sigstore production TUF repository.
If `offline`, will use trust root in local TUF cache. Otherwise will
Expand All @@ -236,7 +287,7 @@ def staging(
cls,
offline: bool = False,
purpose: KeyringPurpose = KeyringPurpose.VERIFY,
) -> "TrustedRoot":
) -> TrustedRoot:
"""Create new trust root from Sigstore staging TUF repository.
If `offline`, will use trust root in local TUF cache. Otherwise will
Expand All @@ -247,17 +298,19 @@ def staging(
@staticmethod
def _get_tlog_keys(
tlogs: list[TransparencyLogInstance], purpose: KeyringPurpose
) -> Iterable[bytes]:
"""Return public key contents given transparency log instances."""
) -> Iterable[_PublicKey]:
"""
Yields an iterator of public keys for transparency log instances that
are suitable for `purpose`.
"""
allow_expired = purpose is KeyringPurpose.VERIFY
for key in tlogs:
for tlog in tlogs:
if not _is_timerange_valid(
key.public_key.valid_for, allow_expired=allow_expired
tlog.public_key.valid_for, allow_expired=allow_expired
):
continue
key_bytes = key.public_key.raw_bytes
if key_bytes:
yield key_bytes

yield tlog.public_key

@staticmethod
def _get_ca_keys(
Expand All @@ -274,14 +327,14 @@ def _get_ca_keys(
def rekor_keyring(self) -> RekorKeyring:
"""Return keyring with keys for Rekor."""

keys: list[bytes] = list(self._get_tlog_keys(self.tlogs, self.purpose))
keys: list[_PublicKey] = list(self._get_tlog_keys(self.tlogs, self.purpose))
if len(keys) != 1:
raise MetadataError("Did not find one Rekor key in trusted root")
return RekorKeyring(Keyring(keys))

def ct_keyring(self) -> CTKeyring:
"""Return keyring with key for CTFE."""
ctfes: list[bytes] = list(self._get_tlog_keys(self.ctlogs, self.purpose))
ctfes: list[_PublicKey] = list(self._get_tlog_keys(self.ctlogs, self.purpose))
if not ctfes:
raise MetadataError("CTFE keys not found in trusted root")
return CTKeyring(Keyring(ctfes))
Expand Down
40 changes: 21 additions & 19 deletions sigstore/_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import hashlib
import sys
from enum import Enum
from typing import IO, NewType, Union
from typing import IO, NewType, Type, Union

from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import ec, rsa
Expand All @@ -46,6 +46,8 @@

PublicKey = Union[rsa.RSAPublicKey, ec.EllipticCurvePublicKey]

PublicKeyTypes = Union[Type[rsa.RSAPublicKey], Type[ec.EllipticCurvePublicKey]]

HexStr = NewType("HexStr", str)
"""
A newtype for `str` objects that contain hexadecimal strings (e.g. `ffabcd00ff`).
Expand Down Expand Up @@ -97,33 +99,33 @@ class InvalidCertError(Error):
"""


class UnexpectedKeyFormatError(InvalidKeyError):
"""
Raised when loading a key produces a key of an unexpected type.
"""

pass


def load_pem_public_key(key_pem: bytes) -> PublicKey:
def load_pem_public_key(
key_pem: bytes,
*,
types: tuple[PublicKeyTypes, ...] = (rsa.RSAPublicKey, ec.EllipticCurvePublicKey),
) -> PublicKey:
"""
A specialization of `cryptography`'s `serialization.load_pem_public_key`
with a uniform exception type (`InvalidKeyError`) and additional restrictions
on key validity (only RSA and ECDSA keys are valid).
with a uniform exception type (`InvalidKeyError`) and filtering on valid key types
for Sigstore purposes.
"""

try:
key = serialization.load_pem_public_key(key_pem)
except Exception as exc:
raise InvalidKeyError("could not load PEM-formatted public key") from exc

if not isinstance(key, (rsa.RSAPublicKey, ec.EllipticCurvePublicKey)):
raise UnexpectedKeyFormatError(f"invalid key format (not ECDSA or RSA): {key}")
if not isinstance(key, types):
raise InvalidKeyError(f"invalid key format: not one of {types}")

return key
return key # type: ignore[return-value]


def load_der_public_key(key_der: bytes) -> PublicKey:
def load_der_public_key(
key_der: bytes,
*,
types: tuple[PublicKeyTypes, ...] = (rsa.RSAPublicKey, ec.EllipticCurvePublicKey),
) -> PublicKey:
"""
The `load_pem_public_key` specialization, but DER.
"""
Expand All @@ -133,10 +135,10 @@ def load_der_public_key(key_der: bytes) -> PublicKey:
except Exception as exc:
raise InvalidKeyError("could not load DER-formatted public key") from exc

if not isinstance(key, (rsa.RSAPublicKey, ec.EllipticCurvePublicKey)):
raise UnexpectedKeyFormatError(f"invalid key format (not ECDSA or RSA): {key}")
if not isinstance(key, types):
raise InvalidKeyError(f"invalid key format: not one of {types}")

return key
return key # type: ignore[return-value]


def base64_encode_pem_cert(cert: Certificate) -> B64Str:
Expand Down
Loading

0 comments on commit 426120f

Please sign in to comment.