Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Close lookupd connection before reader #83

Merged
merged 1 commit into from
Nov 6, 2022

Conversation

atugushev
Copy link
Collaborator

@atugushev atugushev commented Nov 5, 2022

This may reduce occasional ConnectionClosedError exceptions on reader.close() if the reader has a lookup connection. As a result – less flaky tests.

Example of flaky test
=================================== FAILURES ===================================
___________________________ test_restore_connection ____________________________

nsqlookupd = NSQLookupD('127.0.0.1', 4160), nsqd = NSQD('127.0.0.1', 4150)
wait_for = <function wait_for.<locals>.inner at 0x7f71eded9510>
register_producers = <function register_producers.<locals>._register_producers at 0x7f71eded92d0>

    async def test_restore_connection(nsqlookupd, nsqd, wait_for, register_producers):
        reader = await create_reader(
            topic="foo",
            channel="bar",
            lookupd_http_addresses=[nsqlookupd.http_address],
            lookupd_poll_interval=100,
        )
    
        await register_producers(nsqd)
        await wait_for(lambda: len(reader.connections) == 1)
    
        await nsqd.stop()
        await wait_for(lambda: len(reader.connections) == 0)
    
        await nsqd.start()
        await wait_for(lambda: len(reader.connections) == 1)
    
>       await reader.close()

tests/test_reader_lookupd.py:115: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
ansq/tcp/reader.py:162: in close
    await self._lookupd.close()
ansq/tcp/reader.py:261: in close
    await self.stop_polling()
ansq/tcp/reader.py:254: in stop_polling
    await self._poll_lookup_task
ansq/tcp/reader.py:236: in poll_lookup
    await self.query_lookup()
ansq/tcp/reader.py:224: in query_lookup
    await self._reader.connect_to_nsqd(address.host, address.port)
ansq/tcp/reader.py:150: in connect_to_nsqd
    await connection.subscribe(topic=self._topic, channel=self._channel)
ansq/tcp/connection.py:551: in subscribe
    sub_response = await self.sub(topic, channel)
ansq/tcp/connection.py:486: in sub
    response = await self.execute(NSQCommands.SUB, topic, channel)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <NSQConnection: endpoint=tcp://127.0.0.1:4150, status=ConnectionStatus.CLOSED>
command = b'SUB', data = None, callback = None, args = ('foo', 'bar')
future = <Future finished exception=ConnectionClosedError('Connection is closed')>

    async def execute(
        self,
        command: Union[str, bytes],
        *args: Any,
        data: Optional[Any] = None,
        callback: Optional[Callable[[TCPResponse], Any]] = None,
    ) -> TCPResponse:
        """Execute command
    
        Be careful: commands ``NOP``, ``FIN``, ``RDY``, ``REQ``, ``TOUCH``
            by NSQ spec returns ``None`` as  The class:`asyncio.Future` result.
    
        :returns: The response from NSQ.
        """
        if command is None:
            raise ValueError("Command must not be None")
        if None in set(args):
            raise ValueError("Args must not contain None")
    
        if (
            self.is_auth_required
            and not self.is_authorized
            and command != NSQCommands.AUTH
        ):
            raise NSQUnauthorized("NSQ server requires client authorization")
    
        if (
            self.status.is_reconnecting
            and self._reconnect_task
            and not self._reconnect_task.done()
        ):
            await self._reconnect_task
    
        assert self._reader, "You should call `connect` method first"
        if not self._status and not (command == NSQCommands.CLS):
            raise ConnectionClosedError("Connection is closed")
    
        future = self._loop.create_future()
        if command in (
            NSQCommands.NOP,
            NSQCommands.FIN,
            NSQCommands.RDY,
            NSQCommands.REQ,
            NSQCommands.TOUCH,
        ):
            future.set_result(None)
            callback and callback(None)
        else:
            self._cmd_waiters.append((future, callback))
    
        command_raw = self._parser.encode_command(command, *args, data=data)
        if command != NSQCommands.NOP:
            self.logger.debug("NSQ: Executing command %s", command_raw)
        assert self._writer is not None
        self._writer.write(command_raw)
    
        # track all processed and requeued messages
        if command in (
            NSQCommands.FIN,
            NSQCommands.REQ,
            NSQCommands.FIN.decode(),
            NSQCommands.REQ.decode(),
        ):
            self._in_flight = max(0, self._in_flight - 1)
    
>       return await future
E       ansq.tcp.exceptions.ConnectionClosedError: Connection is closed

This may reduce occasional ConnectionClosedError exceptions on reader.close()
@codecov-commenter
Copy link

codecov-commenter commented Nov 5, 2022

Codecov Report

Base: 87.55% // Head: 87.60% // Increases project coverage by +0.04% 🎉

Coverage data is based on head (ea6b52b) compared to base (42cd062).
Patch coverage: 100.00% of modified lines in pull request are covered.

Additional details and impacted files
@@            Coverage Diff             @@
##           master      #83      +/-   ##
==========================================
+ Coverage   87.55%   87.60%   +0.04%     
==========================================
  Files          33       33              
  Lines        2041     2041              
  Branches      234      234              
==========================================
+ Hits         1787     1788       +1     
  Misses        188      188              
+ Partials       66       65       -1     
Impacted Files Coverage Δ
ansq/tcp/reader.py 82.11% <100.00%> (ø)
ansq/tcp/connection.py 67.81% <0.00%> (+0.31%) ⬆️

Help us with your feedback. Take ten seconds to tell us how you rate us. Have a feature suggestion? Share it here.

☔ View full report at Codecov.
📢 Do you have feedback about the report comment? Let us know in this issue.

@atugushev atugushev merged commit 4bd4c44 into list-family:master Nov 6, 2022
@atugushev atugushev deleted the fix-connection-error branch November 6, 2022 17:11
@atugushev atugushev added this to the 0.2.0 milestone May 30, 2023
@atugushev atugushev added the bug Something isn't working label May 30, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug Something isn't working
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants