await websocket.recv() blocks the coroutine loop. #1468

Open
liyi-lee opened this issue May 20, 2024 · 1 comment
@liyi-lee
liyi-lee commented May 20, 2024

This problem is not consistently reproducible.

I have a client program that, when started, receives information from the server, processes it, and then sends the result back to the server. However, I have encountered an issue: after the client processes a batch of data, it enters a frozen state where all coroutines stop running. It remains in this state until I press Ctrl+C to exit. By this time, the server has already disconnected due to not receiving a 'pong' in time. The client then restarts the connection and the cycle repeats.

Here is my sample code.

import asyncio
import logging

import websockets


class WebSocketClient:
    def __init__(self, uri):
        self.uri = uri
        self.websocket = None
        self.lock = asyncio.Lock()

    async def connect(self):
        # Retry until the connection succeeds.
        while True:
            try:
                self.websocket = await websockets.connect(self.uri)
                break
            except OSError as e:
                logging.error(f"connect error: {e}")

    async def send_message(self, message):
        if message:
            async with self.lock:
                await self.websocket.send(message)

    async def listen(self):
        try:
            while True:
                message = await self.websocket.recv()
                logging.info(f"Message from {self.uri}: {message}")
                # await another coroutine
                # send message
                await self.send_message("xxx")
        except Exception as e:
            logging.error(f"listen error: {e}, stopping listen task.")
            raise

    async def run(self):
        while True:
            listen_task = None
            try:
                await self.connect()
                listen_task = asyncio.create_task(self.listen())
                await listen_task
            finally:
                if self.websocket and not self.websocket.closed:
                    await self.websocket.close()


async def main():
    # server_str is the server URI; its definition is not shown in this sample.
    client = WebSocketClient(server_str)
    await client.run()

if __name__ == "__main__":
    asyncio.run(main(), debug=True)

The log looks roughly like this.

2024-05-20 21:11:00 | websockets.client | DEBUG | < PING '' [0 bytes]
2024-05-20 21:11:00 | websockets.client | DEBUG | > PONG '' [0 bytes]
2024-05-20 21:11:30 | websockets.client | DEBUG | < PING '' [0 bytes]
2024-05-20 21:11:30 | websockets.client | DEBUG | > PONG '' [0 bytes]
2024-05-20 21:12:00 | websockets.client | DEBUG | < PING '' [0 bytes]

At this point, it entered a frozen state. When I press Ctrl+C, the following log is output.

2024-05-20 21:15:00 | websockets.client | DEBUG | > PONG '' [0 bytes]
2024-05-20 21:15:00 | websockets.client | DEBUG | < PING '' [0 bytes]
2024-05-20 21:15:00 | websockets.client | DEBUG | > PONG '' [0 bytes]
2024-05-20 21:15:00 | websockets.client | DEBUG | < PING '' [0 bytes]
2024-05-20 21:15:00 | websockets.client | DEBUG | > PONG '' [0 bytes]

It seems that while the log above is being output, only the coroutine awaiting websocket.recv() is running, and this appears to block the entire event loop. Even when I replaced it with await asyncio.wait_for(websocket.recv(), 60), the loop still blocked.
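One way to check whether the event loop itself is stalled, as opposed to recv() simply waiting for data, is a small heartbeat task that measures how late it wakes up. The sketch below is only a diagnostic idea, not something taken from the original program: the names heartbeat and demo, the intervals, and the time.sleep call standing in for a blocking step are all assumptions made for illustration. An awaited recv() yields to the loop, so a large lag would usually point at synchronous code running somewhere else.

```python
import asyncio
import time


async def heartbeat(interval, max_lag, lags):
    """Record how late each wake-up is; a large lag means the loop was blocked."""
    while True:
        start = time.monotonic()
        await asyncio.sleep(interval)
        lag = time.monotonic() - start - interval
        if lag > max_lag:
            lags.append(lag)


async def demo():
    lags = []
    task = asyncio.create_task(heartbeat(0.05, 0.1, lags))
    await asyncio.sleep(0.12)  # loop is free: heartbeats arrive on time
    time.sleep(0.5)            # hypothetical synchronous call: stalls the whole loop
    await asyncio.sleep(0.12)  # give the heartbeat a chance to observe the stall
    task.cancel()
    return lags


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

With asyncio.run(..., debug=True), as in the sample, asyncio also logs callbacks that run longer than loop.slow_callback_duration (0.1 seconds by default), which can help locate the offending call.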

Please help me solve this problem. Thanks

@aaugustin
Member

This doesn't look like a bug in websockets. With the info you provided, I have no hypothesis of what may cause the issue you're seeing.

There are at least two clear problems in the code, but I don't have enough information to tell if and how they could lead to blocking:

  • every time you create a new connection, you start a new task running self.listen, and it isn't guaranteed that self.listen terminates when that connection closes (because the definition of self.websocket can change while self.listen is running)

  • the lock is completely unnecessary
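A minimal sketch of how both points could be addressed, assuming a single coroutine handles each connection: the connection is passed to listen as an argument instead of being read from self.websocket, and the lock is dropped. The names run, handle, retry_delay and max_attempts are hypothetical, and connect is injected as a callable so the reconnect logic stays independent of the library; this is one possible arrangement, not the maintainer's recommended code.

```python
import asyncio
import logging


async def listen(websocket, handle):
    """Receive/reply loop bound to ONE connection, passed in as an argument."""
    while True:
        message = await websocket.recv()
        reply = await handle(message)
        if reply:
            # No lock: only this coroutine sends on this connection.
            await websocket.send(reply)


async def run(connect, handle, retry_delay=1.0, max_attempts=None):
    """Reconnect loop; `connect` is a callable returning an awaitable connection."""
    attempts = 0
    while max_attempts is None or attempts < max_attempts:
        attempts += 1
        try:
            websocket = await connect()
        except OSError as exc:
            logging.error("connect error: %s", exc)
            await asyncio.sleep(retry_delay)
            continue
        try:
            # listen() only ever sees this connection, so it ends when it closes.
            await listen(websocket, handle)
        except Exception as exc:
            logging.error("listen error: %s, reconnecting", exc)
        finally:
            await websocket.close()
        await asyncio.sleep(retry_delay)
```

With the websockets library, connect could be something like lambda: websockets.connect(uri), and handle would be the coroutine that processes a message and returns the reply to send.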
