Consumer.get_watermark_offsets Ignores Timeout Parameter in Some Situations #413
Comments
Thanks for reporting this @bpowers39. At a glance it would seem that once the broker transitions to the down state, it not only stops servicing the broker ops queue but also stops scanning for timeouts. As a result your request sits on the queue until a connection can be re-established:
  case RD_KAFKA_BROKER_STATE_DOWN -> rd_kafka_broker_connect
  case RD_KAFKA_BROKER_STATE_UP -> rd_kafka_broker_bufq_timeout_scan
I would agree this can cause some unexpected results. Perhaps adding a timeout scan at some configurable interval during the down state could help here. I'll open a librdkafka issue to report this behavior (or if you would like to, that's okay too) to see what the best approach is to take here.
Thanks for the quick reply! Feel free to open the issue; you understand it better than I do. This is particularly problematic in combination with #412, since the whole process stops when it happens.
I'm curious, is this being worked on currently? This issue can really be problematic in some cases.
Description
The get_watermark_offsets call ignores its timeout parameter and blocks indefinitely when the Kafka broker hosting the selected partition is down. Once the broker is back up, the function returns. I suspect this is a bug in librdkafka, but I'm posting it here first in case it's an issue with the Python bindings.
How to reproduce
Create a topic with one partition. Run the below example. Once the example has consumed a few messages, kill the broker hosting the partition. The call to get_watermark_offsets will block until the broker comes back up.
Note that this example probably depends on a bug in librdkafka, so it may not reproduce 100% of the time. You can also reproduce it with a multi-partition, multi-broker setup; in that case the function only blocks for as long as it takes for a new leader to be elected.
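Until the timeout handling is addressed in librdkafka itself, one possible client-side mitigation (my suggestion, not something proposed in this thread) is to enforce the deadline externally by issuing the blocking call from a worker thread:

```python
import concurrent.futures


def watermarks_with_deadline(consumer, partition, timeout):
    """Call consumer.get_watermark_offsets() in a worker thread and give up
    after roughly `timeout` seconds, even if the underlying call hangs.

    Returns the (low, high) tuple, or None if the deadline passed. Caveat:
    a hung call leaves its worker thread blocked until librdkafka eventually
    returns, so this leaks one thread per timed-out request.
    """
    pool = concurrent.futures.ThreadPoolExecutor(max_workers=1)
    future = pool.submit(consumer.get_watermark_offsets, partition,
                         timeout=timeout)
    try:
        # Allow a little slack beyond the timeout we asked librdkafka for.
        return future.result(timeout=timeout + 1.0)
    except concurrent.futures.TimeoutError:
        return None
    finally:
        # wait=False so we do not block on a stuck worker thread.
        pool.shutdown(wait=False)
```

This does not fix the hang inside librdkafka; it only keeps the calling thread responsive, which may matter given the interaction with #412 mentioned above.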
Checklist
Please provide the following information:
- confluent-kafka-python and librdkafka version (confluent_kafka.version() and confluent_kafka.libversion()): kafka-python: ('0.11.4', 721920), rdkafka: ('0.11.4', 722175)
- Client configuration: {'bootstrap.servers': 'gateway:9092', 'group.id': str(uuid4()), 'default.topic.config': {'auto.offset.reset': 'smallest'}}
- Provide client logs (with 'debug': '..' as necessary)