diff --git a/pyatv/auth/hap_srp.py b/pyatv/auth/hap_srp.py index 0a7e613e7..3453bfd10 100644 --- a/pyatv/auth/hap_srp.py +++ b/pyatv/auth/hap_srp.py @@ -91,7 +91,7 @@ def verify1(self, credentials, session_pub_key, encrypted): "Pair-Verify-Encrypt-Salt", "Pair-Verify-Encrypt-Info", self._shared ) - chacha = chacha20.Chacha20Cipher(session_key, session_key) + chacha = chacha20.Chacha20Cipher8byteNonce(session_key, session_key) decrypted_tlv = read_tlv(chacha.decrypt(encrypted, nonce="PV-Msg02".encode())) identifier = decrypted_tlv[TlvValue.Identifier] @@ -199,14 +199,14 @@ def step3( if additional_data: tlv.update(additional_data) - chacha = chacha20.Chacha20Cipher(self._session_key, self._session_key) + chacha = chacha20.Chacha20Cipher8byteNonce(self._session_key, self._session_key) encrypted_data = chacha.encrypt(write_tlv(tlv), nonce="PS-Msg05".encode()) log_binary(_LOGGER, "Data", Encrypted=encrypted_data) return encrypted_data def step4(self, encrypted_data): """Last pairing step.""" - chacha = chacha20.Chacha20Cipher(self._session_key, self._session_key) + chacha = chacha20.Chacha20Cipher8byteNonce(self._session_key, self._session_key) decrypted_tlv_bytes = chacha.decrypt(encrypted_data, nonce="PS-Msg06".encode()) if not decrypted_tlv_bytes: diff --git a/pyatv/protocols/airplay/server_auth.py b/pyatv/protocols/airplay/server_auth.py index e5839cee1..9351273fc 100644 --- a/pyatv/protocols/airplay/server_auth.py +++ b/pyatv/protocols/airplay/server_auth.py @@ -284,7 +284,7 @@ def _m1_verify(self, pairing_data): {TlvValue.Identifier: self.unique_id, TlvValue.Signature: signature} ) - chacha = chacha20.Chacha20Cipher(session_key, session_key) + chacha = chacha20.Chacha20Cipher8byteNonce(session_key, session_key) encrypted = chacha.encrypt(tlv, nonce="PV-Msg02".encode()) tlv = { @@ -368,7 +368,7 @@ def _m5_setup(self, pairing_data, transient: bool): binascii.unhexlify(self.session.key), ) - chacha = chacha20.Chacha20Cipher(session_key, session_key) + chacha = chacha20.Chacha20Cipher8byteNonce(session_key, session_key) decrypted_tlv_bytes = chacha.decrypt( pairing_data[TlvValue.EncryptedData], nonce="PS-Msg05".encode() ) @@ -396,7 +396,7 @@ def _m5_setup(self, pairing_data, transient: bool): } ) - chacha = chacha20.Chacha20Cipher(session_key, session_key) + chacha = chacha20.Chacha20Cipher8byteNonce(session_key, session_key) encrypted = chacha.encrypt(tlv, nonce="PS-Msg06".encode()) self.has_paired() diff --git a/pyatv/protocols/mrp/connection.py b/pyatv/protocols/mrp/connection.py index 0ba549ce3..249c35f2b 100644 --- a/pyatv/protocols/mrp/connection.py +++ b/pyatv/protocols/mrp/connection.py @@ -92,7 +92,7 @@ def eof_received(self): def enable_encryption(self, output_key: bytes, input_key: bytes) -> None: """Enable encryption with the specified keys.""" - self._chacha = chacha20.Chacha20Cipher(output_key, input_key) + self._chacha = chacha20.Chacha20Cipher8byteNonce(output_key, input_key) @property def connected(self) -> bool: diff --git a/pyatv/protocols/mrp/server_auth.py b/pyatv/protocols/mrp/server_auth.py index c096b127d..c43568bf9 100644 --- a/pyatv/protocols/mrp/server_auth.py +++ b/pyatv/protocols/mrp/server_auth.py @@ -147,7 +147,7 @@ def _m1_verify(self, pairing_data): {TlvValue.Identifier: self.unique_id, TlvValue.Signature: signature} ) - chacha = chacha20.Chacha20Cipher(session_key, session_key) + chacha = chacha20.Chacha20Cipher8byteNonce(session_key, session_key) encrypted = chacha.encrypt(tlv, nonce="PV-Msg02".encode()) msg = messages.crypto_pairing( @@ -227,7 +227,7 @@ def _m5_setup(self, _): } ) - chacha = chacha20.Chacha20Cipher(session_key, session_key) + chacha = chacha20.Chacha20Cipher8byteNonce(session_key, session_key) encrypted = chacha.encrypt(tlv, nonce="PS-Msg06".encode()) msg = messages.crypto_pairing( diff --git a/pyatv/support/chacha20.py b/pyatv/support/chacha20.py index 768da7a4a..295faa4e9 100644 --- a/pyatv/support/chacha20.py +++ b/pyatv/support/chacha20.py @@ -8,15 +8,11 @@ NONCE_LENGTH = 12 -# The first 4 bytes are always 0, followed by 8 bytes of counter -# for a total of 12 bytes. -PACK_NONCE = partial(Struct(" None: """Initialize a new Chacha20Cipher.""" self._enc_out = ChaCha20Poly1305(out_key) self._enc_in = ChaCha20Poly1305(in_key) @@ -31,7 +27,11 @@ def out_nonce(self) -> bytes: This is the nonce that will be used by encrypt in the _next_ call if no custom nonce is specified. """ - return PACK_NONCE(self._out_counter) + nonce_length = self._nonce_length + nonce = self._out_counter.to_bytes(length=nonce_length, byteorder="little") + if nonce_length != NONCE_LENGTH: + return self._pad_nonce(nonce) + return nonce @property def in_nonce(self) -> bytes: @@ -40,7 +40,15 @@ def in_nonce(self) -> bytes: This is the nonce that will be used by decrypt in the _next_ call if no custom nonce is specified. """ - return PACK_NONCE(self._in_counter) + nonce_length = self._nonce_length + nonce = self._in_counter.to_bytes(length=nonce_length, byteorder="little") + if nonce_length != NONCE_LENGTH: + return self._pad_nonce(nonce) + return nonce + + def _pad_nonce(self, nonce: bytes) -> bytes: + """Pad nonce to 12 bytes.""" + return b"\x00" * (NONCE_LENGTH - len(nonce)) + nonce def encrypt( self, data: bytes, nonce: Optional[bytes] = None, aad: Optional[bytes] = None @@ -50,7 +58,7 @@ def encrypt( nonce = self.out_nonce self._out_counter += 1 elif len(nonce) < NONCE_LENGTH: - nonce = b"\x00" * (NONCE_LENGTH - len(nonce)) + nonce + nonce = self._pad_nonce(nonce) return self._enc_out.encrypt(nonce, data, aad) def decrypt( @@ -61,5 +69,38 @@ def decrypt( nonce = self.in_nonce self._in_counter += 1 elif len(nonce) < NONCE_LENGTH: - nonce = b"\x00" * (NONCE_LENGTH - len(nonce)) + nonce + nonce = self._pad_nonce(nonce) return self._enc_in.decrypt(nonce, data, aad) + + +_PACK_NONCE_WITH_4_BYTE_PAD = partial(Struct(" None: + """Initialize a new Chacha20Cipher8byteNonce.""" + super().__init__(out_key, in_key, nonce_length=8) + + @property + def out_nonce(self) -> bytes: + """Return next encrypt nonce. + + This is the nonce that will be used by encrypt in the _next_ call if no custom + nonce is specified. + """ + return _PACK_NONCE_WITH_4_BYTE_PAD(self._out_counter) + + @property + def in_nonce(self) -> bytes: + """Return next decrypt nonce. + + This is the nonce that will be used by decrypt in the _next_ call if no custom + nonce is specified. + """ + return _PACK_NONCE_WITH_4_BYTE_PAD(self._in_counter) diff --git a/requirements/requirements.txt b/requirements/requirements.txt index 1d4ee1f9a..e778205d1 100644 --- a/requirements/requirements.txt +++ b/requirements/requirements.txt @@ -1,13 +1,13 @@ aiohttp==3.9.5 async-timeout==4.0.3 -cryptography==42.0.5 +cryptography==42.0.8 chacha20poly1305-reuseable==0.12.1 ifaddr==0.2.0 ifaddr==0.2.0 mediafile==0.12.0 miniaudio==1.59 -protobuf==4.25.2 -pydantic==2.5.3 +protobuf==5.27.1 +pydantic==2.7.4 requests==2.32.2 srptools==1.0.1 tabulate==0.9.0 diff --git a/requirements/requirements_docs.txt b/requirements/requirements_docs.txt index 35ceac3d8..11cfd0646 100644 --- a/requirements/requirements_docs.txt +++ b/requirements/requirements_docs.txt @@ -1,2 +1,2 @@ codespell==2.2.6 -pdoc3==0.10.0 +pdoc3==0.11.0 diff --git a/requirements/requirements_test.txt b/requirements/requirements_test.txt index fe7500902..a47097f2c 100644 --- a/requirements/requirements_test.txt +++ b/requirements/requirements_test.txt @@ -1,6 +1,6 @@ -black==24.4.0 -deepdiff==6.7.1 -flake8==7.0.0 +black==24.4.2 +deepdiff==7.0.1 +flake8==7.1.0 isort==5.13.2 mutagen==1.47.0 pyfakefs==5.4.1 @@ -8,10 +8,10 @@ pylint==3.0.3 pytest==8.2.2 pytest-asyncio==0.23.6 pytest-cov==4.1.0 -pytest-timeout==2.2.0 +pytest-timeout==2.3.1 pytest-aiohttp==1.0.5 pytest-httpserver==1.0.8 -pytest-xdist==3.5.0 +pytest-xdist==3.6.1 pydocstyle==6.3.0 mypy==1.8.0 mypy-protobuf==3.5.0 diff --git a/tests/fake_device/mrp.py b/tests/fake_device/mrp.py index d9433ae66..5458a266a 100644 --- a/tests/fake_device/mrp.py +++ b/tests/fake_device/mrp.py @@ -392,7 +392,7 @@ def connection_lost(self, exc): def enable_encryption(self, output_key: bytes, input_key: bytes) -> None: """Enable encryption with specified keys.""" - self.chacha = chacha20.Chacha20Cipher(output_key, input_key) + self.chacha = chacha20.Chacha20Cipher8byteNonce(output_key, input_key) self.state.has_authenticated = True def send_to_client(self, message: ProtobufMessage) -> None: diff --git a/tests/support/test_chacha20.py b/tests/support/test_chacha20.py new file mode 100644 index 000000000..81fc34213 --- /dev/null +++ b/tests/support/test_chacha20.py @@ -0,0 +1,23 @@ +"""Unit tests for pyatv.support.chacha20.""" + +import logging + +from pyatv.support import chacha20 + +fake_key = b"k" * 32 + + +def test_12_bytes_nonce(): + cipher = chacha20.Chacha20Cipher(fake_key, fake_key, 12) + assert len(cipher.out_nonce) == chacha20.NONCE_LENGTH + assert len(cipher.in_nonce) == chacha20.NONCE_LENGTH + result = cipher.encrypt(b"test") + assert cipher.decrypt(result) == b"test" + + +def test_8_bytes_nonce(): + cipher = chacha20.Chacha20Cipher8byteNonce(fake_key, fake_key) + assert len(cipher.out_nonce) == chacha20.NONCE_LENGTH + assert len(cipher.in_nonce) == chacha20.NONCE_LENGTH + result = cipher.encrypt(b"test") + assert cipher.decrypt(result) == b"test"