
Consumer.get_watermark_offsets Ignores Timeout Parameter in Some Situations #413

Open · 4 of 7 tasks

bpowers39 opened this issue Jun 28, 2018 · 3 comments

@bpowers39

Description

get_watermark_offsets ignores the timeout parameter and blocks forever when the Kafka broker for the selected partition is down. Once the broker is back up, the function returns. I suspect this is a bug in librdkafka, but I'm posting it here first in case it's an issue with the Python bindings.

How to reproduce

Create a topic with one partition and run the example below. Once the example has consumed a few messages, kill the broker hosting the partition. The call to get_watermark_offsets will block until the broker comes back up.

from confluent_kafka import Consumer
from uuid import uuid4

if __name__ == '__main__':
    client = Consumer({'bootstrap.servers': 'gateway:9092', 'group.id': str(uuid4()),
                       'default.topic.config': {'auto.offset.reset': 'smallest'}})

    def assigned(consumer, partitions):
        print("Assigned:", partitions)

    client.subscribe(['ibbot'], on_assign=assigned)

    while True:
        msg = client.poll(timeout=1)

        # Blocks here once the partition leader is down, despite timeout=1.
        for partition in client.assignment():
            print(client.get_watermark_offsets(partition, timeout=1))

        if msg is not None:
            if msg.error():
                print("Error: ", msg.error())
            else:
                print("Data")

    client.close()

Note that this example probably depends on a bug in rdkafka, so it may not reproduce 100% of the time. You can also reproduce it with a multi-partition, multi-broker setup; in that case the function only blocks for as long as it takes for a new leader to be elected.
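
As an aside, when the locally known watermarks are good enough, passing cached=True to get_watermark_offsets avoids the broker round trip entirely, so it does not block even while the leader is down (the cached values may be OFFSET_INVALID until messages have been fetched for the partition). This is only a workaround sketch reusing the configuration above, not a fix for the timeout being ignored:

from confluent_kafka import Consumer, OFFSET_INVALID
from uuid import uuid4

client = Consumer({'bootstrap.servers': 'gateway:9092', 'group.id': str(uuid4()),
                   'default.topic.config': {'auto.offset.reset': 'smallest'}})
client.subscribe(['ibbot'])
client.poll(timeout=1)

for partition in client.assignment():
    # cached=True reads the consumer's locally cached watermarks instead of
    # querying the broker, so it returns immediately even if the leader is down.
    low, high = client.get_watermark_offsets(partition, cached=True)
    if OFFSET_INVALID in (low, high):
        print("Watermarks not cached yet for", partition)
    else:
        print("Cached watermarks:", low, high)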

Checklist

Please provide the following information:

  • confluent-kafka-python and librdkafka version (confluent_kafka.version() and confluent_kafka.libversion()): confluent-kafka-python: ('0.11.4', 721920), librdkafka: ('0.11.4', 722175)
  • Apache Kafka broker version: 0.11.0.2
  • Client configuration: {'bootstrap.servers': 'gateway:9092', 'group.id': str(uuid4()), 'default.topic.config': {'auto.offset.reset': 'smallest'}}
  • Operating system: RHEL 7
  • Provide client logs (with 'debug': '..' as necessary)
  • Provide broker log excerpts
  • Critical issue
@rnpridgeon
Contributor

Thanks for reporting this @bpowers39. At a glance, it appears that once the broker transitions to the down state it not only stops servicing the broker ops queue but also stops scanning for timeouts. As a result, your request sits on the queue until a connection can be reestablished.

case RD_KAFKA_BROKER_STATE_DOWN:
https://github.com/edenhill/librdkafka/blob/master/src/rdkafka_broker.c#L3480-L3507

rd_kafka_broker_connect:
https://github.com/edenhill/librdkafka/blob/master/src/rdkafka_broker.c#L1457-L1490

case RD_KAFKA_BROKER_STATE_UP; rd_kafka_broker_bufq_timeout_scan:
https://github.com/edenhill/librdkafka/blob/master/src/rdkafka_broker.c#L3547-L3573

I would agree this can cause some unexpected results. Perhaps adding a timeout scan at some configurable interval during the down state could help here. I'll open a librdkafka issue to report this behavior (or if you would like to, that's okay too) so we can work out the best approach to take here.
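
Until something along those lines lands in librdkafka, one purely client-side mitigation is to bound the wait in the application so the polling loop stays responsive even while the request is parked on the broker ops queue. This is only a sketch (the helper name is made up, and it assumes calling into the consumer from a helper thread is acceptable in your setup); the worker thread itself still stays blocked until the connection recovers:

from concurrent.futures import ThreadPoolExecutor, TimeoutError

_watermark_executor = ThreadPoolExecutor(max_workers=1)

def get_watermark_offsets_bounded(consumer, partition, deadline=1.0):
    # Hypothetical helper: runs the potentially hanging call on a worker
    # thread and gives up after `deadline` seconds. The worker thread stays
    # blocked until librdkafka returns, so repeated timeouts queue up behind
    # it; this keeps the caller responsive, it does not fix the hang.
    future = _watermark_executor.submit(
        consumer.get_watermark_offsets, partition, timeout=deadline)
    try:
        return future.result(timeout=deadline)
    except TimeoutError:
        return None  # treat as "unknown" and retry on the next loop iteration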

@bpowers39
Author

Thanks for the quick reply! Feel free to open the issue; you understand it better than I do. This is particularly problematic in combination with #412, since the whole process stops when it happens.

@chinmaychandak

I'm curious, is this being worked on currently? This issue can really be problematic in some cases.
