diff --git a/CHANGELOG.md b/CHANGELOG.md index 06216ad2..00ca597c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -29,6 +29,10 @@ All versions prior to 0.9.0 are untracked. signature verification on a pre-computed hash value ([#904](https://github.com/sigstore/sigstore-python/pull/904)) +* API: The `sigstore.dsse` module has been been added, including APIs + for representing in-toto statements and DSSE envelopes + ([#930](https://github.com/sigstore/sigstore-python/pull/930)) + ### Removed * **BREAKING API CHANGE**: `SigningResult.input_digest` has been removed; diff --git a/pyproject.toml b/pyproject.toml index c53a233b..bfc862a0 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -28,7 +28,6 @@ dependencies = [ "cryptography >= 42", "id >= 1.1.0", "importlib_resources ~= 5.7; python_version < '3.11'", - "in-toto-attestation == 0.9.3", "pydantic >= 2,< 3", "pyjwt >= 2.1", "pyOpenSSL >= 23.0.0", @@ -65,7 +64,6 @@ lint = [ # and let Dependabot periodically perform this update. "ruff < 0.3.3", "types-requests", - "types-protobuf", "types-pyOpenSSL", ] doc = ["pdoc"] diff --git a/sigstore/_internal/dsse.py b/sigstore/_internal/dsse.py deleted file mode 100644 index 5dc3d961..00000000 --- a/sigstore/_internal/dsse.py +++ /dev/null @@ -1,49 +0,0 @@ -# Copyright 2022 The Sigstore Authors -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" -Functionality for building and manipulating DSSE envelopes. -""" - -from cryptography.hazmat.primitives import hashes -from cryptography.hazmat.primitives.asymmetric import ec -from google.protobuf.json_format import MessageToJson -from in_toto_attestation.v1.statement import Statement -from sigstore_protobuf_specs.io.intoto import Envelope, Signature - - -def sign_intoto(key: ec.EllipticCurvePrivateKey, payload: Statement) -> Envelope: - """ - Create a DSSE envelope containing a signature over an in-toto formatted - attestation. - """ - - # See: - # https://github.com/secure-systems-lab/dsse/blob/v1.0.0/envelope.md - # https://github.com/in-toto/attestation/blob/v1.0/spec/v1.0/envelope.md - - type_ = "application/vnd.in-toto+json" - payload_encoded = MessageToJson(payload.pb, sort_keys=True).encode() - # NOTE: `payload_encoded.decode()` to avoid printing `repr(bytes)`, which would - # add `b'...'` around the formatted payload. - pae = ( - f"DSSEv1 {len(type_)} {type_} {len(payload_encoded)} {payload_encoded.decode()}" - ) - - signature = key.sign(pae.encode(), ec.ECDSA(hashes.SHA256())) - return Envelope( - payload=payload_encoded, - payload_type=type_, - signatures=[Signature(sig=signature, keyid=None)], - ) diff --git a/sigstore/_internal/rekor/client.py b/sigstore/_internal/rekor/client.py index 20c9a62d..93d56b32 100644 --- a/sigstore/_internal/rekor/client.py +++ b/sigstore/_internal/rekor/client.py @@ -77,7 +77,7 @@ def __init__(self, http_error: requests.HTTPError): """ Create a new `RekorClientError` from the given `requests.HTTPError`. """ - if http_error.response: + if http_error.response is not None: try: error = rekor_types.Error.model_validate_json(http_error.response.text) super().__init__(f"{error.code}: {error.message}") diff --git a/sigstore/dsse.py b/sigstore/dsse.py new file mode 100644 index 00000000..5dfd70b1 --- /dev/null +++ b/sigstore/dsse.py @@ -0,0 +1,211 @@ +# Copyright 2022 The Sigstore Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Functionality for building and manipulating in-toto Statements and DSSE envelopes. +""" + +from __future__ import annotations + +import logging +from typing import Any, Dict, List, Literal, Optional, Union + +from cryptography.hazmat.primitives import hashes +from cryptography.hazmat.primitives.asymmetric import ec +from pydantic import BaseModel, ConfigDict, Field, RootModel, StrictStr, ValidationError +from sigstore_protobuf_specs.io.intoto import Envelope as _Envelope +from sigstore_protobuf_specs.io.intoto import Signature + +_logger = logging.getLogger(__name__) + +_Digest = Union[ + Literal["sha256"], + Literal["sha384"], + Literal["sha512"], + Literal["sha3_256"], + Literal["sha3_384"], + Literal["sha3_512"], +] +""" +NOTE: in-toto's DigestSet contains all kinds of hash algorithms that +we intentionally do not support. This model is limited to common members of the +SHA-2 and SHA-3 family that are at least as strong as SHA-256. + +See: +""" + +_DigestSet = RootModel[Dict[_Digest, str]] +""" +An internal validation model for in-toto subject digest sets. +""" + + +class _Subject(BaseModel): + """ + A single in-toto statement subject. + """ + + name: Optional[StrictStr] + digest: _DigestSet = Field(...) + + +class _Statement(BaseModel): + """ + An internal validation model for in-toto statements. + """ + + model_config = ConfigDict(populate_by_name=True) + + type_: Literal["https://in-toto.io/Statement/v1"] = Field(..., alias="_type") + subjects: List[_Subject] = Field(..., min_length=1, alias="subject") + predicate_type: StrictStr = Field(..., alias="predicateType") + predicate: Optional[Dict[str, Any]] = Field(None, alias="predicate") + + +class Statement: + """ + Represents an in-toto statement. + + This type deals with opaque bytes to ensure that the encoding does not + change, but Statements are internally checked for conformance against + the JSON object layout defined in the in-toto attesation spec. + + See: + """ + + def __init__(self, contents: bytes) -> None: + """ + Construct a new Statement. + + This takes an opaque `bytes` containing the statement; use + `StatementBuilder` to manually construct an in-toto statement + from constituent pieces. + """ + self._contents = contents + try: + self._statement = _Statement.model_validate_json(contents) + except ValidationError: + raise ValueError("malformed in-toto statement") + + def _pae(self) -> bytes: + """ + Construct the PAE encoding for this statement. + """ + + # See: + # https://github.com/secure-systems-lab/dsse/blob/v1.0.0/envelope.md + # https://github.com/in-toto/attestation/blob/v1.0/spec/v1.0/envelope.md + pae = f"DSSEv1 {len(Envelope._TYPE)} {Envelope._TYPE} ".encode() + pae += b" ".join([str(len(self._contents)).encode(), self._contents]) + return pae + + +class _StatementBuilder: + """ + A builder-style API for constructing in-toto Statements. + """ + + def __init__( + self, + subjects: Optional[List[_Subject]] = None, + predicate_type: Optional[str] = None, + predicate: Optional[Dict[str, Any]] = None, + ): + """ + Create a new `_StatementBuilder`. + """ + self._subjects = subjects or [] + self._predicate_type = predicate_type + self._predicate = predicate + + def subjects(self, subjects: list[_Subject]) -> _StatementBuilder: + """ + Configure the subjects for this builder. + """ + self._subjects = subjects + return self + + def predicate_type(self, predicate_type: str) -> _StatementBuilder: + """ + Configure the predicate type for this builder. + """ + self._predicate_type = predicate_type + return self + + def predicate(self, predicate: dict[str, Any]) -> _StatementBuilder: + """ + Configure the predicate for this builder. + """ + self._predicate = predicate + return self + + def build(self) -> Statement: + """ + Build a `Statement` from the builder's state. + """ + try: + stmt = _Statement( + type_="https://in-toto.io/Statement/v1", + subjects=self._subjects, + predicate_type=self._predicate_type, + predicate=self._predicate, + ) + except ValidationError as e: + raise ValueError(f"invalid statement: {e}") + + return Statement(stmt.model_dump_json(by_alias=True).encode()) + + +class Envelope: + """ + Represents a DSSE envelope. + + This class cannot be constructed directly; you must use `sign`. + + See: + """ + + _TYPE = "application/vnd.in-toto+json" + + def __init__(self, inner: _Envelope) -> None: + """ + @private + """ + + self._inner = inner + + def to_json(self) -> str: + """ + Return a JSON string with this DSSE envelope's contents. + """ + # TODO: Unclear why mypy thinks this is returning `Any`. + return self._inner.to_json() # type: ignore[no-any-return] + + +def _sign(key: ec.EllipticCurvePrivateKey, stmt: Statement) -> Envelope: + """ + Sign for the given in-toto `Statement`, and encapsulate the resulting + signature in a DSSE `Envelope`. + """ + pae = stmt._pae() + _logger.debug(f"DSSE PAE: {pae!r}") + + signature = key.sign(pae, ec.ECDSA(hashes.SHA256())) + return Envelope( + _Envelope( + payload=stmt._contents, + payload_type=Envelope._TYPE, + signatures=[Signature(sig=signature, keyid=None)], + ) + ) diff --git a/sigstore/sign.py b/sigstore/sign.py index 7a684887..4d01a28d 100644 --- a/sigstore/sign.py +++ b/sigstore/sign.py @@ -50,7 +50,6 @@ from cryptography.hazmat.primitives import hashes, serialization from cryptography.hazmat.primitives.asymmetric import ec from cryptography.x509.oid import NameOID -from in_toto_attestation.v1.statement import Statement from sigstore_protobuf_specs.dev.sigstore.bundle.v1 import ( Bundle, VerificationMaterial, @@ -71,8 +70,8 @@ ) from sigstore_protobuf_specs.io.intoto import Envelope +from sigstore import dsse from sigstore import hashes as sigstore_hashes -from sigstore._internal import dsse from sigstore._internal.fulcio import ( ExpiredCertificate, FulcioCertificateSigningResponse, @@ -85,7 +84,7 @@ from sigstore.oidc import ExpiredIdentity, IdentityToken from sigstore.transparency import LogEntry -logger = logging.getLogger(__name__) +_logger = logging.getLogger(__name__) class Signer: @@ -119,16 +118,16 @@ def __init__( FulcioCertificateSigningResponse ] = None if cache: - logger.debug("Generating ephemeral keys...") + _logger.debug("Generating ephemeral keys...") self.__cached_private_key = ec.generate_private_key(ec.SECP256R1()) - logger.debug("Requesting ephemeral certificate...") + _logger.debug("Requesting ephemeral certificate...") self.__cached_signing_certificate = self._signing_cert(self._private_key) @property def _private_key(self) -> ec.EllipticCurvePrivateKey: """Get or generate a signing key.""" if self.__cached_private_key is None: - logger.debug("no cached key; generating ephemeral key") + _logger.debug("no cached key; generating ephemeral key") return ec.generate_private_key(ec.SECP256R1()) return self.__cached_private_key @@ -145,7 +144,7 @@ def _signing_cert( return self.__cached_signing_certificate else: - logger.debug("Retrieving signed certificate...") + _logger.debug("Retrieving signed certificate...") # Build an X.509 Certificiate Signing Request builder = ( @@ -174,7 +173,7 @@ def _signing_cert( def sign( self, - input_: bytes | Statement | sigstore_hashes.Hashed, + input_: bytes | dsse.Statement | sigstore_hashes.Hashed, ) -> Bundle: """ Sign an input, and return a `Bundle` corresponding to the signed result. @@ -207,7 +206,7 @@ def sign( verify_sct(sct, cert, chain, self._signing_ctx._rekor._ct_keyring) - logger.debug("Successfully verified SCT...") + _logger.debug("Successfully verified SCT...") # Prepare inputs b64_cert = base64.b64encode( @@ -217,8 +216,8 @@ def sign( # Sign artifact content: MessageSignature | Envelope proposed_entry: rekor_types.Hashedrekord | rekor_types.Dsse - if isinstance(input_, Statement): - content = dsse.sign_intoto(private_key, input_) + if isinstance(input_, dsse.Statement): + content = dsse._sign(private_key, input_) # Create the proposed DSSE entry proposed_entry = rekor_types.Dsse( @@ -265,7 +264,7 @@ def sign( # Submit the proposed entry to the transparency log entry = self._signing_ctx._rekor.log.entries.post(proposed_entry) - logger.debug(f"Transparency log entry created with index: {entry.log_index}") + _logger.debug(f"Transparency log entry created with index: {entry.log_index}") return _make_bundle( content=content, diff --git a/sigstore/verify/models.py b/sigstore/verify/models.py index e2c26bdf..59422d4e 100644 --- a/sigstore/verify/models.py +++ b/sigstore/verify/models.py @@ -64,7 +64,7 @@ from sigstore.hashes import Hashed from sigstore.transparency import LogEntry, LogInclusionProof -logger = logging.getLogger(__name__) +_logger = logging.getLogger(__name__) class VerificationResult(BaseModel): @@ -282,7 +282,7 @@ def from_bundle( # TODO: We should also retrieve the root of trust here and # cross-check against it. if cert_is_root_ca(chain_cert): - logger.warning( + _logger.warning( "this bundle contains a root CA, making it subject to misuse" ) @@ -316,7 +316,7 @@ def from_bundle( if not inclusion_promise: raise InvalidMaterials("bundle must contain an inclusion promise") if inclusion_proof and not inclusion_proof.checkpoint.envelope: - logger.debug( + _logger.debug( "0.1 bundle contains inclusion proof without checkpoint; ignoring" ) else: @@ -385,7 +385,7 @@ def rekor_entry(self, hashed_input: Hashed, client: RekorClient) -> LogEntry: and self._rekor_entry.inclusion_proof.checkpoint # type: ignore ) - logger.debug( + _logger.debug( f"has_inclusion_proof={has_inclusion_proof} " f"has_inclusion_promise={has_inclusion_promise}" ) @@ -412,7 +412,7 @@ def rekor_entry(self, hashed_input: Hashed, client: RekorClient) -> LogEntry: entry: LogEntry | None = None if offline: - logger.debug("offline mode; using offline log entry") + _logger.debug("offline mode; using offline log entry") # In offline mode, we require either an inclusion proof or an # inclusion promise. Every `LogEntry` has at least one as a # construction invariant, so no additional check is required here. @@ -421,7 +421,7 @@ def rekor_entry(self, hashed_input: Hashed, client: RekorClient) -> LogEntry: # In online mode, we require an inclusion proof. If our supplied log # entry doesn't have one, then we perform a lookup. if not has_inclusion_proof: - logger.debug("retrieving transparency log entry") + _logger.debug("retrieving transparency log entry") entry = client.log.entries.retrieve.post(expected_entry) else: entry = self._rekor_entry @@ -430,7 +430,7 @@ def rekor_entry(self, hashed_input: Hashed, client: RekorClient) -> LogEntry: if entry is None: raise RekorEntryMissing - logger.debug("Rekor entry: ensuring contents match signing materials") + _logger.debug("Rekor entry: ensuring contents match signing materials") # To catch a potentially dishonest or compromised Rekor instance, we compare # the expected entry (generated above) with the JSON structure returned diff --git a/sigstore/verify/policy.py b/sigstore/verify/policy.py index b3979915..a9bb5cd9 100644 --- a/sigstore/verify/policy.py +++ b/sigstore/verify/policy.py @@ -39,7 +39,7 @@ VerificationSuccess, ) -logger = logging.getLogger(__name__) +_logger = logging.getLogger(__name__) # From: https://github.com/sigstore/fulcio/blob/main/docs/oid-info.md _OIDC_ISSUER_OID = ObjectIdentifier("1.3.6.1.4.1.57264.1.1") @@ -249,7 +249,7 @@ def verify(self, cert: Certificate) -> VerificationResult: Verify `cert` against the policy. """ - logger.warning( + _logger.warning( "unsafe (no-op) verification policy used! no verification performed!" ) return VerificationSuccess() diff --git a/sigstore/verify/verifier.py b/sigstore/verify/verifier.py index 8dc9a36c..0204cd7f 100644 --- a/sigstore/verify/verifier.py +++ b/sigstore/verify/verifier.py @@ -67,7 +67,7 @@ ) from sigstore.verify.policy import VerificationPolicy -logger = logging.getLogger(__name__) +_logger = logging.getLogger(__name__) class LogEntryMissing(VerificationFailure): @@ -252,7 +252,7 @@ def verify( if not policy_check: return policy_check - logger.debug("Successfully verified signing certificate validity...") + _logger.debug("Successfully verified signing certificate validity...") # 4) Verify that the signature was signed by the public key in the signing certificate try: @@ -266,7 +266,7 @@ def verify( except InvalidSignature: return VerificationFailure(reason="Signature is invalid for input") - logger.debug("Successfully verified signature...") + _logger.debug("Successfully verified signature...") # 5) Retrieve the Rekor entry for this artifact (potentially from # an offline entry), confirming its consistency with the other @@ -300,7 +300,7 @@ def verify( except CheckpointError as exc: return VerificationFailure(reason=f"invalid Rekor root hash: {exc}") - logger.debug( + _logger.debug( f"successfully verified inclusion proof: index={entry.log_index}" ) elif not materials._offline: @@ -309,7 +309,7 @@ def verify( # then we've somehow entered an invalid state, so fail. return VerificationFailure(reason="missing Rekor inclusion proof") else: - logger.warning( + _logger.warning( "inclusion proof not present in bundle: skipping due to offline verification" ) @@ -317,7 +317,7 @@ def verify( if entry.inclusion_promise: try: verify_set(self._rekor, entry) - logger.debug( + _logger.debug( f"successfully verified inclusion promise: index={entry.log_index}" ) except InvalidSETError as inval_set: diff --git a/test/unit/test_sign.py b/test/unit/test_sign.py index ffd0a3cf..c6d27894 100644 --- a/test/unit/test_sign.py +++ b/test/unit/test_sign.py @@ -23,6 +23,7 @@ import sigstore.oidc from sigstore._internal.keyring import KeyringError, KeyringLookupError from sigstore._internal.sct import InvalidSCTError, InvalidSCTKeyError +from sigstore.dsse import _StatementBuilder, _Subject from sigstore.hashes import Hashed from sigstore.sign import SigningContext from sigstore.verify.models import VerificationMaterials @@ -148,3 +149,27 @@ def test_sign_prehashed(staging): verifier.verify(input_, materials=materials, policy=UnsafeNoOp()) # verifying against the prehash also works verifier.verify(hashed, materials=materials, policy=UnsafeNoOp()) + + +@pytest.mark.online +@pytest.mark.ambient_oidc +def test_sign_dsse(staging): + sign_ctx, _, identity = staging + + ctx = sign_ctx() + stmt = ( + _StatementBuilder() + .subjects( + [_Subject(name="null", digest={"sha256": hashlib.sha256(b"").hexdigest()})] + ) + .predicate_type("https://cosign.sigstore.dev/attestation/v1") + .predicate( + { + "Data": "", + "Timestamp": "2023-12-07T00:37:58Z", + } + ) + ).build() + + with ctx.signer(identity) as signer: + signer.sign(stmt) diff --git a/test/unit/verify/test_policy.py b/test/unit/verify/test_policy.py index 9281154f..6b9be8b5 100644 --- a/test/unit/verify/test_policy.py +++ b/test/unit/verify/test_policy.py @@ -29,7 +29,7 @@ def test_does_not_init(self): class TestUnsafeNoOp: def test_succeeds(self, monkeypatch): logger = pretend.stub(warning=pretend.call_recorder(lambda s: None)) - monkeypatch.setattr(policy, "logger", logger) + monkeypatch.setattr(policy, "_logger", logger) policy_ = policy.UnsafeNoOp() assert policy_.verify(pretend.stub())