Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bug: await broker.publish do not waits actual message sends on Kafka(aiokafka) #1748

Closed
zpl opened this issue Aug 31, 2024 · 3 comments · Fixed by #1749
Closed

Bug: await broker.publish do not waits actual message sends on Kafka(aiokafka) #1748

zpl opened this issue Aug 31, 2024 · 3 comments · Fixed by #1749
Assignees
Labels
AioKafka Issues related to `faststream.kafka` module bug Something isn't working Confluent Issues related to `faststream.confluent` module

Comments

@zpl
Copy link

zpl commented Aug 31, 2024

Describe the bug

await broker.publish(....) do not wait for actual message send, even with enable_idempotence=True / acks='all'

I have fix for it.

so, this happend on Aiokafka, with remote servers. the reason is in faststream/kafka/publisher/producer.py

in

    @override
    async def publish(  # type: ignore[override]
        self,
        message: "SendableMessage",
        topic: str,
        *,
        correlation_id: str,
        key: Union[bytes, Any, None] = None,
        partition: Optional[int] = None,
        timestamp_ms: Optional[int] = None,
        headers: Optional[Dict[str, str]] = None,
        reply_to: str = "",
    ) -> None:
        """Publish a message to a topic."""
        message, content_type = encode_message(message)

        headers_to_send = {
            "content-type": content_type or "",
            "correlation_id": correlation_id,
            **(headers or {}),
        }

        if reply_to:
            headers_to_send["reply_to"] = headers_to_send.get(
                "reply_to",
                reply_to,
            )

        # should be send_and_wait here
        await self._producer.send( 
            topic=topic,
            value=message,
            key=key,
            partition=partition,
            timestamp_ms=timestamp_ms,
            headers=[(i, (j or "").encode()) for i, j in headers_to_send.items()],
        ) 

there is a call to await self._producer.send and send is not actually waits for message deliver - it returns a furute (https://aiokafka.readthedocs.io/en/stable/_modules/aiokafka/producer/producer.html#AIOKafkaProducer.send)

here we should use send_and_wait instead (https://aiokafka.readthedocs.io/en/stable/_modules/aiokafka/producer/producer.html#AIOKafkaProducer.send_and_wait)

Could you please help me with pull request? thanks.

How to reproduce
Include source code:

import asyncio
from faststream import FastStream

# KAFKA_BOOTSTRAP_SERVERS - some remote cluster (not localhost)
broker = KafkaBroker( config('KAFKA_BOOTSTRAP_SERVERS'), security=security, enable_idempotence=True)

async def send_message():
    await broker.connect()
    await broker.publish([1,2,3], "msg_answer1")

    # it will work, if I add 
    # await asyncio.sleep(5) 




if __name__ == "__main__":
    asyncio.run(send_message())
...

Expected behavior
Expected: Message sent and then app closes

Observed behavior
App closes before actual message sent

Additional context
Provide any other relevant context or information about the problem here.

@zpl zpl added the bug Something isn't working label Aug 31, 2024
@Lancetnik Lancetnik added Confluent Issues related to `faststream.confluent` module AioKafka Issues related to `faststream.kafka` module labels Sep 1, 2024
@Lancetnik
Copy link
Member

Lancetnik commented Sep 1, 2024

Thank you for the report! I found the same behavior with Confluent, but was not sure, how it works
Now I see the problem and try to fix it soon

@Lancetnik
Copy link
Member

@zpl I created a draft PR alreat, but still not sure, which method should be used as a default one: should we wait for confirmation everytime or the current behavior should be the default one?

@zpl
Copy link
Author

zpl commented Sep 2, 2024

@Lancetnik , I think it should be send_and_wait for default. probably in some rare cases we can skip it for performance?

well, I think even with performance I prefer to use create_task instead of await. but I expect that send will actually send's a message.

@zpl zpl closed this as completed Sep 2, 2024
@zpl zpl reopened this Sep 2, 2024
github-merge-queue bot pushed a commit that referenced this issue Sep 6, 2024
* fix (#1748): add Kafka publish no_wait option

* feat: add no_confirm option to Confluent

* chore: fix CI

* chore: fix CI

* docs: gen API

* refactor: mv confluent thread to asyncio task

* fix: add no_confirm option to Confluent publish batch method

* chore: fix precommit

---------

Co-authored-by: Kumaran Rajendhiran <kumaran@airt.ai>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
AioKafka Issues related to `faststream.kafka` module bug Something isn't working Confluent Issues related to `faststream.confluent` module
Projects
Archived in project
Development

Successfully merging a pull request may close this issue.

2 participants