From 50786d37815543fce8078a97feb30c840e4773a8 Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Mon, 12 Aug 2024 07:14:21 -0500 Subject: [PATCH 01/12] Fix exceptions from WebSocket ping task not being consumed related issue https://github.com/home-assistant/core/issues/123653 replaces and closes #7238 fixes #5182 --- aiohttp/client_ws.py | 4 ++++ aiohttp/web_ws.py | 4 ++++ 2 files changed, 8 insertions(+) diff --git a/aiohttp/client_ws.py b/aiohttp/client_ws.py index f7822df8645..5d55a8691c6 100644 --- a/aiohttp/client_ws.py +++ b/aiohttp/client_ws.py @@ -151,9 +151,13 @@ def _send_heartbeat(self) -> None: if not ping_task.done(): self._ping_task = ping_task ping_task.add_done_callback(self._ping_task_done) + else: + self._ping_task_done(ping_task) def _ping_task_done(self, task: "asyncio.Task[None]") -> None: """Callback for when the ping task completes.""" + if not task.cancelled() and (exc := task.exception()): + self._reader.feed_data(WSMessage(WSMsgType.ERROR, exc, None)) self._ping_task = None def _pong_not_received(self) -> None: diff --git a/aiohttp/web_ws.py b/aiohttp/web_ws.py index 3b76ab8eead..cb0fb486ff3 100644 --- a/aiohttp/web_ws.py +++ b/aiohttp/web_ws.py @@ -189,9 +189,13 @@ def _send_heartbeat(self) -> None: if not ping_task.done(): self._ping_task = ping_task ping_task.add_done_callback(self._ping_task_done) + else: + self._ping_task_done(ping_task) def _ping_task_done(self, task: "asyncio.Task[None]") -> None: """Callback for when the ping task completes.""" + if not task.cancelled() and (exc := task.exception()): + self._reader.feed_data(WSMessage(WSMsgType.ERROR, exc, None)) self._ping_task = None def _pong_not_received(self) -> None: From 8cd3d7cbb823162b557ff7aca3be76143dd29197 Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Mon, 12 Aug 2024 07:17:11 -0500 Subject: [PATCH 02/12] Fix exceptions from WebSocket ping task not being consumed related issue https://github.com/home-assistant/core/issues/123653 replaces and closes #7238 fixes #5182 --- aiohttp/web_ws.py | 1 + 1 file changed, 1 insertion(+) diff --git a/aiohttp/web_ws.py b/aiohttp/web_ws.py index cb0fb486ff3..5d0259336fa 100644 --- a/aiohttp/web_ws.py +++ b/aiohttp/web_ws.py @@ -195,6 +195,7 @@ def _send_heartbeat(self) -> None: def _ping_task_done(self, task: "asyncio.Task[None]") -> None: """Callback for when the ping task completes.""" if not task.cancelled() and (exc := task.exception()): + assert self._reader is not None self._reader.feed_data(WSMessage(WSMsgType.ERROR, exc, None)) self._ping_task = None From befa1ddb952c9dfe441fbe17349f64eeb142c4b4 Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Mon, 12 Aug 2024 15:38:24 -0500 Subject: [PATCH 03/12] test for ping in progress while connection is lost --- tests/test_client_ws_functional.py | 29 +++++++++++++++++++++ tests/test_web_websocket_functional.py | 36 ++++++++++++++++++++++++++ 2 files changed, 65 insertions(+) diff --git a/tests/test_client_ws_functional.py b/tests/test_client_ws_functional.py index a63d0d57979..6fed4aa7beb 100644 --- a/tests/test_client_ws_functional.py +++ b/tests/test_client_ws_functional.py @@ -661,6 +661,35 @@ async def handler(request: web.Request) -> NoReturn: assert ping_received +async def test_heartbeat_connection_closed(aiohttp_client: AiohttpClient) -> None: + """Test that the connection is closed while ping is in progress.""" + + async def handler(request: web.Request) -> NoReturn: + ws = web.WebSocketResponse(autoping=False) + await ws.prepare(request) + await ws.receive() + assert False + + app = web.Application() + app.router.add_route("GET", "/", handler) + + client = await aiohttp_client(app) + resp = await client.ws_connect("/", heartbeat=0.1) + ping_count = 0 + # We patch write here to simulate a connection reset error + # since if we closed the connection normally, the server would + # would cancel the heartbeat task and we wouldn't get a ping + with mock.patch.object( + resp._conn.transport, "write", side_effect=ConnectionResetError + ), mock.patch.object(resp._writer, "ping", wraps=resp._writer.ping) as ping: + await resp.receive() + ping_count = ping.call_count + # Connection should be closed roughly after 1.5x heartbeat. + await asyncio.sleep(0.2) + assert ping_count == 1 + assert resp.close_code is WSCloseCode.ABNORMAL_CLOSURE + + async def test_heartbeat_no_pong(aiohttp_client: AiohttpClient) -> None: """Test that the connection is closed if no pong is received without sending messages.""" ping_received = False diff --git a/tests/test_web_websocket_functional.py b/tests/test_web_websocket_functional.py index af7addf29b9..e9defca6172 100644 --- a/tests/test_web_websocket_functional.py +++ b/tests/test_web_websocket_functional.py @@ -5,12 +5,14 @@ import contextlib import sys from typing import Any, Optional +from unittest import mock import pytest import aiohttp from aiohttp import WSServerHandshakeError, web from aiohttp.http import WSCloseCode, WSMsgType +from aiohttp.pytest_plugin import AiohttpClient async def test_websocket_can_prepare(loop: Any, aiohttp_client: Any) -> None: @@ -715,6 +717,40 @@ async def handler(request): await ws.close() +async def test_heartbeat_connection_closed( + loop: asyncio.AbstractEventLoop, aiohttp_client: AiohttpClient +) -> None: + """Test that the connection is closed while ping is in progress.""" + ping_count = 0 + + async def handler(request: web.Request) -> web.WebSocketResponse: + nonlocal ping_count + ws_server = web.WebSocketResponse(heartbeat=0.05) + await ws_server.prepare(request) + # We patch write here to simulate a connection reset error + # since if we closed the connection normally, the server would + # would cancel the heartbeat task and we wouldn't get a ping + with mock.patch.object( + ws_server._req.transport, "write", side_effect=ConnectionResetError + ), mock.patch.object( + ws_server._writer, "ping", wraps=ws_server._writer.ping + ) as ping: + await ws_server.receive() + ping_count = ping.call_count + return ws + + app = web.Application() + app.router.add_get("/", handler) + + client = await aiohttp_client(app) + ws = await client.ws_connect("/", autoping=False) + msg = await ws.receive() + assert msg.type is aiohttp.WSMsgType.CLOSED + assert msg.extra is None + assert ping_count == 1 + await ws.close() + + async def test_heartbeat_no_pong_send_many_messages( loop: Any, aiohttp_client: Any ) -> None: From 0fe79af4d5616d71f76a6b5726d7a5c5e4207a5b Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Mon, 12 Aug 2024 15:38:59 -0500 Subject: [PATCH 04/12] Update tests/test_client_ws_functional.py --- tests/test_client_ws_functional.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_client_ws_functional.py b/tests/test_client_ws_functional.py index 6fed4aa7beb..93e6519de1e 100644 --- a/tests/test_client_ws_functional.py +++ b/tests/test_client_ws_functional.py @@ -677,7 +677,7 @@ async def handler(request: web.Request) -> NoReturn: resp = await client.ws_connect("/", heartbeat=0.1) ping_count = 0 # We patch write here to simulate a connection reset error - # since if we closed the connection normally, the server would + # since if we closed the connection normally, the client would # would cancel the heartbeat task and we wouldn't get a ping with mock.patch.object( resp._conn.transport, "write", side_effect=ConnectionResetError From db48f59ef0c4a1cadc1a0daec01e1304ff9a8f48 Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Mon, 12 Aug 2024 15:51:40 -0500 Subject: [PATCH 05/12] mypy --- tests/test_client_ws_functional.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/test_client_ws_functional.py b/tests/test_client_ws_functional.py index 6fed4aa7beb..624fe6bc8db 100644 --- a/tests/test_client_ws_functional.py +++ b/tests/test_client_ws_functional.py @@ -679,6 +679,7 @@ async def handler(request: web.Request) -> NoReturn: # We patch write here to simulate a connection reset error # since if we closed the connection normally, the server would # would cancel the heartbeat task and we wouldn't get a ping + assert resp._conn is not None with mock.patch.object( resp._conn.transport, "write", side_effect=ConnectionResetError ), mock.patch.object(resp._writer, "ping", wraps=resp._writer.ping) as ping: From bd3c8c9d148150a3931b24c623749c3248741481 Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Mon, 12 Aug 2024 15:54:14 -0500 Subject: [PATCH 06/12] timeline --- CHANGES/8685.bugfix.rst | 3 +++ 1 file changed, 3 insertions(+) create mode 100644 CHANGES/8685.bugfix.rst diff --git a/CHANGES/8685.bugfix.rst b/CHANGES/8685.bugfix.rst new file mode 100644 index 00000000000..8bd20196ee3 --- /dev/null +++ b/CHANGES/8685.bugfix.rst @@ -0,0 +1,3 @@ +Fixed unconsumed exceptions raised by the WebSocket heartbeat -- by :user:`bdraco`. + +If the heartbeat ping raised an exception, it would not be consumed and would be logged as an warning. From c7c4fbe62356ca366a1d5cca843ac38e86d168d2 Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Fri, 16 Aug 2024 20:43:19 -0500 Subject: [PATCH 07/12] exception is no loner swallowed --- aiohttp/client_ws.py | 23 +++++++++++++---------- aiohttp/web_ws.py | 17 ++++++++++++----- tests/test_web_websocket_functional.py | 6 ++++-- 3 files changed, 29 insertions(+), 17 deletions(-) diff --git a/aiohttp/client_ws.py b/aiohttp/client_ws.py index 5d55a8691c6..489cce43992 100644 --- a/aiohttp/client_ws.py +++ b/aiohttp/client_ws.py @@ -157,19 +157,22 @@ def _send_heartbeat(self) -> None: def _ping_task_done(self, task: "asyncio.Task[None]") -> None: """Callback for when the ping task completes.""" if not task.cancelled() and (exc := task.exception()): - self._reader.feed_data(WSMessage(WSMsgType.ERROR, exc, None)) + self._handle_ping_pong_exception(exc) self._ping_task = None def _pong_not_received(self) -> None: - if not self._closed: - self._set_closed() - self._close_code = WSCloseCode.ABNORMAL_CLOSURE - self._exception = ServerTimeoutError() - self._response.close() - if self._waiting and not self._closing: - self._reader.feed_data( - WSMessage(WSMsgType.ERROR, self._exception, None) - ) + self._handle_ping_pong_exception(ServerTimeoutError()) + + def _handle_ping_pong_exception(self, exc: BaseException) -> None: + """Handle exceptions raised during ping/pong processing.""" + if self._closed: + return + self._set_closed() + self._close_code = WSCloseCode.ABNORMAL_CLOSURE + self._exception = exc + self._response.close() + if self._waiting and not self._closing: + self._reader.feed_data(WSMessage(WSMsgType.ERROR, exc, None)) def _set_closed(self) -> None: """Set the connection to closed. diff --git a/aiohttp/web_ws.py b/aiohttp/web_ws.py index 3b678e6b9c7..fa2cbd07567 100644 --- a/aiohttp/web_ws.py +++ b/aiohttp/web_ws.py @@ -195,15 +195,22 @@ def _send_heartbeat(self) -> None: def _ping_task_done(self, task: "asyncio.Task[None]") -> None: """Callback for when the ping task completes.""" if not task.cancelled() and (exc := task.exception()): - assert self._reader is not None - self._reader.feed_data(WSMessage(WSMsgType.ERROR, exc, None)) + self._handle_ping_pong_exception(exc) self._ping_task = None def _pong_not_received(self) -> None: if self._req is not None and self._req.transport is not None: - self._set_closed() - self._set_code_close_transport(WSCloseCode.ABNORMAL_CLOSURE) - self._exception = asyncio.TimeoutError() + self._handle_ping_pong_exception(asyncio.TimeoutError()) + + def _handle_ping_pong_exception(self, exc: BaseException) -> None: + """Handle exceptions raised during ping/pong processing.""" + if self._closed: + return + self._set_closed() + self._set_code_close_transport(WSCloseCode.ABNORMAL_CLOSURE) + self._exception = exc + if self._waiting and not self._closing: + self._reader.feed_data(WSMessage(WSMsgType.ERROR, exc, None)) def _set_closed(self) -> None: """Set the connection to closed. diff --git a/tests/test_web_websocket_functional.py b/tests/test_web_websocket_functional.py index aaee23892a6..388d72e0e47 100644 --- a/tests/test_web_websocket_functional.py +++ b/tests/test_web_websocket_functional.py @@ -736,8 +736,10 @@ async def handler(request: web.Request) -> web.WebSocketResponse: ), mock.patch.object( ws_server._writer, "ping", wraps=ws_server._writer.ping ) as ping: - await ws_server.receive() - ping_count = ping.call_count + try: + await ws_server.receive() + finally: + ping_count = ping.call_count return ws app = web.Application() From cbe88401714341d656b56559808564b6cbf10725 Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Fri, 16 Aug 2024 20:46:41 -0500 Subject: [PATCH 08/12] guard --- aiohttp/web_ws.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aiohttp/web_ws.py b/aiohttp/web_ws.py index fa2cbd07567..134f7b10a24 100644 --- a/aiohttp/web_ws.py +++ b/aiohttp/web_ws.py @@ -209,7 +209,7 @@ def _handle_ping_pong_exception(self, exc: BaseException) -> None: self._set_closed() self._set_code_close_transport(WSCloseCode.ABNORMAL_CLOSURE) self._exception = exc - if self._waiting and not self._closing: + if self._waiting and not self._closing and self._reader: self._reader.feed_data(WSMessage(WSMsgType.ERROR, exc, None)) def _set_closed(self) -> None: From c6817f6e0e1e0a1b40c4c1162b8a8bf54b356323 Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Sat, 17 Aug 2024 08:27:12 -0500 Subject: [PATCH 09/12] add check for ABNORMAL_CLOSURE --- tests/test_web_websocket_functional.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/test_web_websocket_functional.py b/tests/test_web_websocket_functional.py index 388d72e0e47..f0e97b72f52 100644 --- a/tests/test_web_websocket_functional.py +++ b/tests/test_web_websocket_functional.py @@ -750,6 +750,7 @@ async def handler(request: web.Request) -> web.WebSocketResponse: msg = await ws.receive() assert msg.type is aiohttp.WSMsgType.CLOSED assert msg.extra is None + assert ws.close_code == WSCloseCode.ABNORMAL_CLOSURE assert ping_count == 1 await ws.close() From cc0da14e1ca96ac5b5a5a678e88b6c4301461fcb Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Sat, 17 Aug 2024 08:46:50 -0500 Subject: [PATCH 10/12] handler will raise --- tests/test_web_websocket_functional.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/test_web_websocket_functional.py b/tests/test_web_websocket_functional.py index f0e97b72f52..60a3ad190ad 100644 --- a/tests/test_web_websocket_functional.py +++ b/tests/test_web_websocket_functional.py @@ -5,7 +5,7 @@ import contextlib import sys import weakref -from typing import Any, Optional +from typing import Any, NoReturn, Optional from unittest import mock import pytest @@ -724,7 +724,7 @@ async def test_heartbeat_connection_closed( """Test that the connection is closed while ping is in progress.""" ping_count = 0 - async def handler(request: web.Request) -> web.WebSocketResponse: + async def handler(request: web.Request) -> NoReturn: nonlocal ping_count ws_server = web.WebSocketResponse(heartbeat=0.05) await ws_server.prepare(request) @@ -740,7 +740,7 @@ async def handler(request: web.Request) -> web.WebSocketResponse: await ws_server.receive() finally: ping_count = ping.call_count - return ws + assert False app = web.Application() app.router.add_get("/", handler) From a3cb0fb773cd457e55cce5993f87c3dfa8184cfc Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Sat, 17 Aug 2024 08:51:33 -0500 Subject: [PATCH 11/12] coverage --- tests/test_web_websocket_functional.py | 33 ++++++++++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/tests/test_web_websocket_functional.py b/tests/test_web_websocket_functional.py index 60a3ad190ad..3d1b5a98ef3 100644 --- a/tests/test_web_websocket_functional.py +++ b/tests/test_web_websocket_functional.py @@ -755,6 +755,39 @@ async def handler(request: web.Request) -> NoReturn: await ws.close() +async def test_heartbeat_failure_ends_receive( + loop: asyncio.AbstractEventLoop, aiohttp_client: AiohttpClient +) -> None: + """Test that no heartbeat response to the server ends the receive call.""" + ws_server_close_code = None + ws_server_exception = None + + async def handler(request: web.Request) -> NoReturn: + nonlocal ws_server_close_code, ws_server_exception + ws_server = web.WebSocketResponse(heartbeat=0.05) + await ws_server.prepare(request) + try: + await ws_server.receive() + finally: + ws_server_close_code = ws_server.close_code + ws_server_exception = ws_server.exception() + assert False + + app = web.Application() + app.router.add_get("/", handler) + + client = await aiohttp_client(app) + ws = await client.ws_connect("/", autoping=False) + msg = await ws.receive() + assert msg.type is aiohttp.WSMsgType.PING + msg = await ws.receive() + assert msg.type is aiohttp.WSMsgType.CLOSED + assert ws.close_code == WSCloseCode.ABNORMAL_CLOSURE + assert ws_server_close_code == WSCloseCode.ABNORMAL_CLOSURE + assert isinstance(ws_server_exception, asyncio.TimeoutError) + await ws.close() + + async def test_heartbeat_no_pong_send_many_messages( loop: Any, aiohttp_client: Any ) -> None: From 4a10ede7193c5fc5851a8de6df33f3489bc7cd69 Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Sat, 17 Aug 2024 10:28:28 -0500 Subject: [PATCH 12/12] fix check --- aiohttp/web_ws.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aiohttp/web_ws.py b/aiohttp/web_ws.py index 134f7b10a24..d5a48e478db 100644 --- a/aiohttp/web_ws.py +++ b/aiohttp/web_ws.py @@ -209,7 +209,7 @@ def _handle_ping_pong_exception(self, exc: BaseException) -> None: self._set_closed() self._set_code_close_transport(WSCloseCode.ABNORMAL_CLOSURE) self._exception = exc - if self._waiting and not self._closing and self._reader: + if self._waiting and not self._closing and self._reader is not None: self._reader.feed_data(WSMessage(WSMsgType.ERROR, exc, None)) def _set_closed(self) -> None: